Select Git revision
Controller.java 3.78 KiB
package ftp;
import java.io.IOException;
import java.net.Socket;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class Controller extends Server {
private int r;
private int rbPeriod;
private int nextID = 0;
private DStoreIndex dStoreIndex;
private FileIndex fileIndex;
/**
* @desc constructs a controller
* @param cport to listen on
* @param r replication factor
* @param timeout timeout (ms)
* @param rbPeriod rebalance period (ms)
*/
public Controller(int cport, int r, int timeout, int rbPeriod) throws IOException {
this.port = cport;
this.r = r;
this.timeout = timeout;
this.rbPeriod = rbPeriod;
dStoreIndex = new DStoreIndex();
fileIndex = new FileIndex();
start();
}
public static void main(String args[]) {
Stream<String> str = Arrays.stream(args);
List<Integer> intArgs = str.map(x -> {return Integer.parseInt(x);})
.collect(Collectors.toList());
try {
Controller ctrl = new Controller(intArgs.get(0), intArgs.get(1), intArgs.get(2), intArgs.get(3));
} catch (IOException e) {
System.out.println("IOException " + e.getMessage());
}
}
@Override
protected void handleRequest(String request, Socket client) {
String args[] = request.split(" ");
String command = args[0];
if (command.equals("JOIN")) {
Integer port = Integer.parseInt(args[1]);
send("LIST", client);
String files = readSocket(client);
DStoreConnection dStore;
if (!files.equals("empty")) {
List<DStoreFile> dStoreFiles = Arrays.stream(files.split("\\|")).
map(x -> x.split(" ")).
map(x -> new DStoreFile(x[0], Long.parseLong(x[1]))).
collect(Collectors.toList());
dStore = new DStoreConnection(dStoreFiles, port, nextID);
dStoreFiles.stream().forEach(x -> x.addDstore(dStore));
}
else dStore = new DStoreConnection(port, nextID);
dStoreIndex.addDStore(dStore);
threadIDOutput("New Dstore (ID: " + nextID + ") successfully joined");
nextID++;
send("ACK", client);
}
else if (command.equals("STORE")) {
String filename = args[1];
Long filesize = Long.parseLong(args[2]);
DStoreFile file = fileIndex.addFile(filename, filesize);
file.setStoreInProgress(true);
List<DStoreConnection> dStores = dStoreIndex.getFirstN(r);
dStores.stream().
forEach(x -> {
x.addFile(file);
file.addDstore(x);
});
String ports = dStores.stream().
map(x -> Integer.toString(x.getPort())).
collect(Collectors.joining(" "));
file.setStoreAcksQuota(r);
send("STORE_TO " + ports, client);
}
else if (command.equals("STORE_ACK")) {
String filename = args[1];
DStoreFile file = fileIndex.get(filename);
file.storeAck();
if ( file.ackCheck() ) {
file.setStoreInProgress(false);
file.setStoreComplete(true);
threadIDOutput("Store of file " + filename + " complete");
}
}
else if (command.equals("LOAD")) {
String filename = args[1];
DStoreFile file = fileIndex.get(filename);
int dStorePort = file.getDstore().getPort();
send("LOAD_FROM " + dStorePort + " " + file.getFilesize(), client);
}
}
}