Select Git revision
Controller.java
Controller.java 12.11 KiB
import java.io.*;
import java.net.*;
import java.lang.Runnable;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Collection;
public class Controller {
protected int cport; //Port to listen on
protected int rFactor; //Replication factor; each file is replicated across r Dstores
protected int timeout; //in milliseconds
protected int rebalancePeriod; //How long to wait to start the next rebalance operation, in milliseconds
protected class IndexEntry {
protected int filesize;
protected List<Integer> storedBy;
protected int numberToStore;
protected String status;
protected Object storeAckLock;
protected List<Reloader> clientLoadList;
public IndexEntry() {
filesize = -1;
storedBy = new SyncList();
numberToStore = 0;
status = "store in progress";
storeAckLock = new Object();
clientLoadList = new ArrayList<Reloader>();
}
public synchronized void setFilesize(int filesize) {
this.filesize = filesize;
}
public synchronized int getFilesize() {
return filesize;
}
public void addStoredBy(int dstore) {
storedBy.add(Integer.valueOf(dstore));
if(storedBy.size() == numberToStore) storeAckLock.notify();
}
public void addStoredBy(List<Integer> dstores) {
storedBy.addAll(dstores);
if(storedBy.size() == numberToStore) storeAckLock.notify();
}
public void removeStoredBy(int dstore) {
storedBy.remove(Integer.valueOf(dstore));
if(storedBy.size() == 0) storeAckLock.notify();
}
public void removeStoredBy(List<Integer> dstores) {
storedBy.removeAll(dstores);
if(storedBy.size() == 0) storeAckLock.notify();
}
public List<Integer> getStoredBy() {
return storedBy;
}
public synchronized void setNumberToStore(int i) {
numberToStore = i;
}
public synchronized void setStatus(String status) {
this.status = status;
}
public synchronized String getStatus() {
return status;
}
public Object getLock() {
return storeAckLock;
}
public List<Reloader> getLoadList() {
return clientLoadList;
}
}
protected class SyncList extends ArrayList<Integer> {
public SyncList() {
super();
}
@Override
public boolean add(Integer i) {
synchronized(this) {
return super.add(i);
}
}
@Override
public boolean addAll(Collection<? extends Integer> c) {
synchronized(this) {
return super.addAll(c);
}
}
@Override
public Integer get(int i) {
synchronized(this) {
return super.get(i);
}
}
@Override
public int size() {
synchronized(this) {
return super.size();
}
}
public Integer remove(int i) {
synchronized(this) {
return super.remove(i);
}
}
public boolean remove(Integer i) {
synchronized(this) {
return super.remove(i);
}
}
}
protected class Reloader {
public boolean reload;
public Reloader() {
reload = false;
}
}
protected List<Integer> dstores;
protected Map<Integer,String[]> rebalanceMessages;
protected Map<String,IndexEntry> index;
public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) {
this.cport = cport;
this.rFactor = rFactor;
this.timeout = timeout;
this.rebalancePeriod = rebalancePeriod;
dstores = new SyncList();
index = new HashMap<String,IndexEntry>();
}
public static void main(String[] args) {
try {
int cport = Integer.parseInt(args[0]);
int rFactor = Integer.parseInt(args[1]);
int timeout = Integer.parseInt(args[2]);
int rebalancePeriod = Integer.parseInt(args[3]);
Controller controller = new Controller(cport, rFactor, timeout, rebalancePeriod);
controller.start();
}
catch(IndexOutOfBoundsException e) {
System.out.println("Command line arguments have not been provided");
return;
}
catch(NumberFormatException e) {
System.out.println("Command line arguments must be integers");
return;
}
}
public void start() {
try {
ServerSocket server = new ServerSocket(cport);
while(true) {
try {
Socket client = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String[] message = in.readLine().split(" ");
if(dstores.size() < rFactor && !message[0].equals("JOIN")) {
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
out.write("ERROR");
out.close();
}
else {
handleMessage(message, client);
}
in.close();
}
catch(Exception e) {
//Log error
e.printStackTrace();
System.out.println("Continue...");
}
}
}
catch(Exception e) {
e.printStackTrace();
}
}
void handleMessage(String[] message, Socket client) throws Exception {
if(message[0].equals("JOIN")) {
dstores.add(Integer.parseInt(message[1]));
System.out.println("Dstore at " + message[1] + " joined");
rebalance();
}
else if(message[0].equals("STORE")) {
store(client, message[1]);
}
else if(message[0].equals("STORE_ACK")) {
storeAck(client, message[1]);
}
else if(message[0].equals("LOAD")) {
load(client, message[1]);
}
else if(message[0].equals("RELOAD")) {
reload(message[1]);
}
else if(message[0].equals("REMOVE")) {
remove(client, message[1]);
}
else if(message[0].equals("LIST")) {
list(client);
}
else {
for(String name : message) {
if(!index.containsKey(name)) {
//Log error and continue (throw exception?)
return;
}
}
receiveDstoreList(client, message);
}
}
void store(Socket client, String filename) throws Exception {
new Thread(() -> {
try {
if(index.containsKey(filename)) {
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("ERROR ALREADY_EXISTS " + filename);
out.flush();
out.close();
return;
}
//Update index to "store in progress"
IndexEntry entry = new IndexEntry();
index.put(filename, entry);
//Select Dstores
int[] storesToStore = new int[rFactor];
for(int i=0; i<rFactor; i++) {
Integer thisStore = dstores.get(i);
storesToStore[i] = thisStore.intValue();
}
entry.setNumberToStore(rFactor);
//Send STORE_TO message
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("STORE_TO");
for(int port : storesToStore) {
out.print(" " + port);
}
out.flush();
//Wait for STORE_ACKs from datastores in storesToStore
try {
entry.getLock().wait(timeout);
}
catch(InterruptedException e) {
e.printStackTrace();
}
if(entry.getStoredBy().size() < rFactor) {
//Log error
}
//Update index to "store complete"
entry.status = "store complete";
//Send STORE_COMPLETE message
out.print("STORE_COMPLETE");
out.flush();
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void storeAck(Socket client, String filename) throws Exception {
new Thread(() -> {
if(!index.containsKey(filename)) {
//Throw logging exception
return;
}
IndexEntry thisEntry = index.get(filename);
thisEntry.addStoredBy(Integer.valueOf(client.getPort()));
}).start();
}
void load(Socket client, String filename) throws Exception {
new Thread(() -> {
try {
if(!index.containsKey(filename)) {
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("ERROR DOES_NOT_EXIST");
out.flush();
out.close();
return;
}
//Select a Dstore which contains the file
IndexEntry thisEntry = index.get(filename);
int thisStore = thisEntry.storedBy.get(0).intValue();
int thisSize = thisEntry.filesize;
//Send LOAD_FROM message
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("LOAD_FROM " + thisStore + " " + thisSize);
out.flush();
Reloader reloadLock = new Object();
thisEntry.getLoadList().add(reloadLock);
int trials = 0;
while(true) {
reloadLock.wait(10 * timeout);
trials ++;
if(trials >= rFactor || !reloadLock.reload) break;
out.print("LOAD_FROM " + thisEntry.storedBy.get(trials).intValue() + " " + thisSize);
out.flush();
reloadLock.reload = false;
}
thisEntry.getLoadList().remove(reloadLock);
if(trials >= rFactor && reloadLock.reload) {
out.print("ERROR LOAD");
out.flush();
}
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void reload(String filename) {
new Thread(() -> {
try {
for(Reloader r : index.get(filename).getLoadList()) {
r.reload = true;
r.notify();
}
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void remove(Socket client, String filename) throws Exception {
new Thread(() -> {
try {
if(!index.containsKey(filename)) {
PrintWriter clientOut = new PrintWriter(client.getOutputStream());
clientOut.print("ERROR DOES_NOT_EXIST");
clientOut.flush();
clientOut.close();
return;
}
//Update index to "remove in progress"
IndexEntry entry = index.get(filename);
entry.status = "remove in progress";
//Send REMOVE message to all Dstores storing the file
for(Integer dstore : entry.storedBy) {
Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue());
PrintWriter out = new PrintWriter(socket.getOutputStream());
out.write("REMOVE " + filename);
out.flush();
out.close();
socket.close();
}
//Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message
try {
entry.getLock().wait(timeout);
}
catch(InterruptedException e) {
e.printStackTrace();
}
if(entry.getStoredBy().size() > 0) {
//Log error
}
//Update index to "remove complete"
entry.status = "remove complete";
//Send REMOVE_COMPLETE to client
PrintWriter clientOut = new PrintWriter(client.getOutputStream());
clientOut.print("REMOVE_COMPLETE");
clientOut.flush();
clientOut.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void list(Socket client) throws Exception {
new Thread(() -> {
try {
//Send file list to client
PrintWriter out = new PrintWriter(client.getOutputStream());
for(String name : index.keySet()) {
out.println(name);
}
out.flush();
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void rebalance() throws Exception {
new Thread(() -> {
if(rebalanceMessages != null) return;
Map<Integer,String[]> dstoreFiles = new HashMap<Integer,String[]>();
rebalanceMessages = dstoreFiles;
try {
//Send LIST message to each Dstore and receive their file list
for(Integer dstore : dstores) {
Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue());
PrintWriter out = new PrintWriter(socket.getOutputStream());
out.write("LIST");
out.flush();
out.close();
socket.close();
}
dstoreFiles.wait(timeout);
if(dstoreFiles.size() < dstores.size()) {
//Log error
}
//Create a new file allocation so that:
//Each file appears rFactor times
//Each file appears at most once on each datastore
//Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists)
for(Integer i : reshuffle(dstoreFiles.keySet())) {
}
//Make a (files to send, files to remove) pair for each Dstore
//Send the respective REBALANCE message to each Dstore
//Wait for REBALANCE_COMPLETE from all Dstores
}
catch(IOException e) {
e.printStackTrace();
}
finally {
rebalanceMessages = null;
}
}).start();
}
void receiveDstoreList(Socket client, String[] list) {
new Thread(() -> {
if(rebalanceMessages == null) return;
rebalanceMessages.add(Integer.valueOf(client.getPort()), list);
if(rebalanceMessages.size() == dstores.size()) {
rebalanceMessages.notify();
}
}).start();
}
List<Integer> reshuffle(Collection<Integer> col) {
List<Integer> list = new ArrayList<Integer>();
for(Integer i : col) {
list.add(0, i);
}
return list;
}
}