// Server.java 14.21 KiB
import java.io.*;
import java.net.*;
import java.nio.CharBuffer;
import java.util.*;
public class Server {
// Source of the settings read by this class: replication factor (getR), ACK timeout
// (getTimeout) and rebalance period (getRebalance_period).
private final Controller controller;
// File-system state used by every handler. Static: within this file it is assigned only by
// the constructor, so the most recently constructed Server decides the value all code sees.
public static FileSystem fileSystem;
// Index into a file's replica list used when answering LOAD/RELOAD. Reset to 0 at the start
// of each handleClient call and advanced by handleReload. Static, hence shared by all connections.
public static int pos;
// Gate read by handleClient: while true, JOIN is not accepted and client commands are not
// dispatched. This file only ever sets it to false — presumably Rebalance toggles it; TODO confirm.
public static boolean rebalancing;
// Timer currently running the periodic rebalance task; replaced on every runRebalancing call.
public Timer timer;
public Server(Controller controller, FileSystem fileSystem) {
this.controller = controller;
Server.fileSystem = fileSystem;
rebalancing = false;
// do the rebalancing operation every rebalance_period seconds with a delay of rebalance_period seconds
runRebalancing(controller.getRebalance_period());
}
/**
 * Returns the shared file-system state.
 *
 * @return the value of the static {@code fileSystem} field; {@code null} until something
 *         assigns it (within this class only the constructor does)
 */
public static FileSystem getFileSystem() {
    return Server.fileSystem;
}
/**
 * (Re)starts the periodic rebalance operation.
 *
 * <p>Any previously created timer is cancelled and replaced by a new one whose task runs
 * {@code new Rebalance(fileSystem, controller).handle()} first after {@code delay} and then
 * at a fixed rate of {@code controller.getRebalance_period()}.
 *
 * <p>The method is synchronized because it is reached from {@code handleClient} on every
 * JOIN; without it two concurrent callers could each install a timer and leave one of them
 * running unreferenced.
 *
 * @param delay time before the first run, in milliseconds (the unit used by
 *              {@link Timer#scheduleAtFixedRate(TimerTask, long, long)})
 */
private synchronized void runRebalancing(long delay) {
    if (timer != null) {
        timer.cancel();
        timer.purge();
    }
    timer = new Timer("RebalanceOperation");
    TimerTask timerTask = new TimerTask() {
        @Override
        public void run() {
            try {
                new Rebalance(fileSystem, controller).handle();
            } catch (InterruptedException e) {
                // The thread belongs to java.util.Timer, whose loop discards interrupts, so
                // nothing upstream would observe a restored flag; leaving it set would only
                // make the next run fail at its first blocking call.
                e.printStackTrace();
            } catch (RuntimeException e) {
                // An exception escaping a TimerTask terminates the timer thread, which would
                // silently stop all future rebalances. Report it and keep the schedule alive.
                System.err.println("Rebalance operation failed: " + e);
                e.printStackTrace();
            }
        }
    };
    timer.scheduleAtFixedRate(timerTask, delay, controller.getRebalance_period());
}
/**
 * Serves one connection: reads lines until end of stream and dispatches each of them.
 *
 * <p>Dispatch order for every received line (first token is the command):
 * <ol>
 *   <li>{@code JOIN} while no rebalance is running registers the peer as a Dstore;</li>
 *   <li>otherwise, if fewer than R Dstores are registered, {@code ERROR_NOT_ENOUGH_DSTORES}
 *       is sent back, whoever the sender is;</li>
 *   <li>otherwise, if the peer's remote port is a registered Dstore, the line is treated as
 *       a Dstore message;</li>
 *   <li>otherwise it is a client command, which is only dispatched while no rebalance is
 *       running.</li>
 * </ol>
 * The socket is not closed by this method.
 *
 * @param client the accepted connection
 * @throws IOException if the socket streams cannot be obtained or reading fails
 */
public void handleClient(Socket client) throws IOException {
    OutputStream out = client.getOutputStream();
    InputStream in = client.getInputStream();
    // A new connection starts LOAD/RELOAD from the first replica.
    pos = 0;
    BufferedReader req = new BufferedReader(new InputStreamReader(in));
    PrintWriter res = new PrintWriter(new OutputStreamWriter(out));
    System.out.println("---------NEW CONNECTION---------");
    String line;
    while ((line = req.readLine()) != null) {
        // split never returns an empty array here, so tokens[0] always exists.
        String[] tokens = line.split(" ");
        String command = tokens[0];
        if (!rebalancing && command.equals("JOIN")) {
            handleJoin(client, tokens);
        } else if (fileSystem.getDstores().size() < controller.getR()) {
            ControllerLogger.getInstance().messageSent(client, "ERROR_NOT_ENOUGH_DSTORES");
            res.println("ERROR_NOT_ENOUGH_DSTORES");
            res.flush();
        } else if (fileSystem.getDstores().containsKey(client.getPort())) {
            handleDstoreMessage(client, command, tokens);
        } else if (!rebalancing) {
            switch (command) {
                case "STORE" -> handleStore(client, tokens, line);
                case "LOAD" -> handleLoad(client, tokens, line);
                case "RELOAD" -> handleReload(client, tokens, line);
                case "REMOVE" -> handleRemove(client, tokens, line);
                case "LIST" -> handleList(client, line);
                default -> System.out.println("Unknown command " + command);
            }
        } else {
            System.out.println("Rebalancing...");
        }
    }
}

/**
 * Registers the peer as a Dstore and triggers an immediate rebalance.
 *
 * <p>A JOIN without a numeric port is reported and ignored instead of letting the unchecked
 * exception abort the whole connection loop.
 */
private void handleJoin(Socket client, String[] tokens) throws IOException {
    if (tokens.length < 2) {
        System.out.println("Arguments don't match in JOIN operation");
        return;
    }
    final int publicPort;
    try {
        publicPort = Integer.parseInt(tokens[1]);
    } catch (NumberFormatException e) {
        System.out.println("Arguments don't match in JOIN operation");
        return;
    }
    FSStore fsStore = new FSStore(client, publicPort);
    // Dstores are keyed by the remote port of their connection to this server.
    fileSystem.addDstore(client.getPort(), fsStore);
    ControllerLogger.getInstance().dstoreJoined(client, publicPort);
    runRebalancing(0);
}

/** Handles one message received from a connection that is registered as a Dstore. */
private void handleDstoreMessage(Socket client, String command, String[] tokens) throws IOException {
    switch (command) {
        case "STORE_ACK" -> {
            ControllerLogger.getInstance().messageReceived(client, "STORE_ACK");
            handleStoreACK(tokens);
        }
        case "REMOVE_ACK" -> {
            ControllerLogger.getInstance().messageReceived(client, "REMOVE_ACK");
            handleRemoveACK(tokens);
        }
        case "LIST" -> {
            ControllerLogger.getInstance().messageReceived(client, "LIST");
            // Everything after the command is a filename the Dstore reports during rebalancing.
            List<String> filenames = new ArrayList<>(Arrays.asList(tokens).subList(1, tokens.length));
            fileSystem.addRebalancingList(
                    fileSystem.getDstores().get(client.getPort()).getConnection().getPort(),
                    filenames);
        }
        case "REBALANCE_COMPLETE" -> {
            ControllerLogger.getInstance().messageReceived(client, "REBALANCE_COMPLETE");
            System.out.println("Rebalance done for "
                    + fileSystem.getDstores().get(client.getPort()).getPublicPort());
        }
        case "ERROR_FILE_DOES_NOT_EXIST" ->
            ControllerLogger.getInstance().messageReceived(client, "ERROR_FILE_DOES_NOT_EXIST");
        default -> System.out.println("Unknown command " + command);
    }
}
/**
 * Handles a client {@code STORE filename filesize} request.
 *
 * <p>Replies {@code ERROR_FILE_ALREADY_EXISTS} if the store map already has the file.
 * Otherwise marks the file "store in progress", sends {@code STORE_TO} with the public ports
 * of the first R Dstores (in {@code FSStore} natural order), and waits up to the controller
 * timeout for R acknowledgements, which {@code handleStoreACK} records as placeholder
 * entries in the store map. On success the file is committed and {@code STORE_COMPLETE} is
 * sent; on timeout nothing is sent and all traces of the attempt are removed.
 *
 * @param client the requesting client connection
 * @param tokens the split request; {@code tokens[1]} is the filename, {@code tokens[2]} the filesize
 * @param line   the raw request line, used for logging
 * @throws IOException if the client's output stream cannot be obtained
 */
private void handleStore(Socket client, String[] tokens, String line) throws IOException {
    ControllerLogger.getInstance().messageReceived(client, line);
    try {
        String filename = tokens[1];
        int filesize = Integer.parseInt(tokens[2]);
        PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
        if (fileSystem.getStore().containsKey(filename)) {
            ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_ALREADY_EXISTS");
            res.println("ERROR_FILE_ALREADY_EXISTS");
            res.flush();
            return;
        }
        // update index to store in progress
        fileSystem.addIndex(filename, "store in progress");
        // select R Dstores and create a string of all of their endpoints
        String msg = "";
        int i = 0;
        List<FSStore> temp = new ArrayList<>();
        List<FSStore> sorted = new ArrayList<>(fileSystem.getDstores().values());
        Collections.sort(sorted);
        for (FSStore fsStore : sorted) {
            if (i == controller.getR()) {
                break;
            }
            // remembered so the real Dstores can replace the ACK placeholders on success
            temp.add(fsStore);
            // ports are prepended, so the message lists them in reverse selection order
            msg = fsStore.getPublicPort() + " " + msg;
            i++;
        }
        ControllerLogger.getInstance().messageSent(client, "STORE_TO " + msg);
        res.println("STORE_TO " + msg);
        res.flush();

        long limit = System.currentTimeMillis() + controller.getTimeout();
        if (awaitStoreAcks(filename, limit)) {
            fileSystem.addIndex(filename, "store complete");
            // replaces the placeholder list written by handleStoreACK
            fileSystem.addStore(filename, temp);
            for (FSStore fsStore : temp) {
                fsStore.getFiles().add(filename);
            }
            fileSystem.addFileSize(filename, filesize);
            ControllerLogger.getInstance().messageSent(client, "STORE_COMPLETE");
            res.println("STORE_COMPLETE");
            res.flush();
        } else {
            // Not enough STORE_ACKs in time => store failed. Besides the index entry, drop
            // the partial placeholder list; otherwise the file would keep looking stored
            // (ERROR_FILE_ALREADY_EXISTS on retry, null replicas on LOAD).
            fileSystem.removeIndex(filename);
            fileSystem.getStore().remove(filename);
            System.out.println(filename + " failed to upload");
        }
    } catch (IndexOutOfBoundsException | NumberFormatException e) {
        // missing argument or non-numeric filesize
        System.out.println("Arguments don't match in STORE operation");
    }
}

/**
 * Polls the store map until R acknowledgements for {@code filename} are recorded or
 * {@code deadline} (epoch milliseconds) has passed; sleeps briefly between polls instead of
 * spinning. Returns {@code false} on timeout or if the waiting thread is interrupted.
 */
private boolean awaitStoreAcks(String filename, long deadline) {
    final long pollIntervalMillis = 10;
    while (true) {
        List<FSStore> acks = fileSystem.getStore().get(filename);
        if (acks != null && !acks.isEmpty() && acks.size() == controller.getR()) {
            return true;
        }
        long remaining = deadline - System.currentTimeMillis();
        if (remaining < 0) {
            return false;
        }
        try {
            Thread.sleep(Math.max(1, Math.min(pollIntervalMillis, remaining)));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
/**
 * Handles a client {@code LOAD filename} request.
 *
 * <p>Replies {@code LOAD_FROM port filesize} using the first replica of the file, or
 * {@code ERROR_FILE_DOES_NOT_EXIST} when the store map has no usable entry for it. An entry
 * is unusable when it is missing, empty, or still holds the {@code null} placeholders that
 * {@code handleStoreACK} writes while a store is in progress.
 *
 * @param client the requesting client connection
 * @param tokens the split request; {@code tokens[1]} is the filename
 * @param line   the raw request line, used for logging
 * @throws IOException if the client's output stream cannot be obtained
 */
private void handleLoad(Socket client, String[] tokens, String line) throws IOException {
    ControllerLogger.getInstance().messageReceived(client, line);
    try {
        String filename = tokens[1];
        // Every LOAD starts from the first replica; RELOAD advances from there. Without the
        // reset, a LOAD issued after an earlier RELOAD would begin in the middle of the list.
        pos = 0;
        PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
        List<FSStore> replicas = fileSystem.getStore().get(filename);
        FSStore replica = (replicas == null || replicas.isEmpty()) ? null : replicas.get(pos);
        final String reply;
        if (replica == null) {
            reply = "ERROR_FILE_DOES_NOT_EXIST";
        } else {
            reply = "LOAD_FROM " + replica.getPublicPort()
                    + " " + fileSystem.getFileSizes().get(filename);
        }
        ControllerLogger.getInstance().messageSent(client, reply);
        res.println(reply);
        res.flush();
    } catch (IndexOutOfBoundsException e) {
        System.out.println("Arguments don't match in LOAD operation");
    }
}
/**
 * Handles a client {@code RELOAD filename} request by offering the next replica.
 *
 * <p>Advances the shared replica cursor {@code pos} and replies:
 * <ul>
 *   <li>{@code ERROR_FILE_DOES_NOT_EXIST} if the store map has no entry for the file, or the
 *       selected entry is a {@code null} placeholder of a store still in progress;</li>
 *   <li>{@code ERROR_LOAD} once the cursor has reached R or the end of the replica list;</li>
 *   <li>{@code LOAD_FROM port filesize} otherwise.</li>
 * </ul>
 * The cursor is reset to 0 whenever an error is returned.
 *
 * @param client the requesting client connection
 * @param tokens the split request; {@code tokens[1]} is the filename
 * @param line   the raw request line, used for logging
 * @throws IOException if the client's output stream cannot be obtained
 */
private void handleReload(Socket client, String[] tokens, String line) throws IOException {
    ControllerLogger.getInstance().messageReceived(client, line);
    pos = pos + 1;
    PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
    try {
        String filename = tokens[1];
        List<FSStore> replicas = fileSystem.getStore().get(filename);
        final String reply;
        if (replicas == null) {
            pos = 0;
            reply = "ERROR_FILE_DOES_NOT_EXIST";
        } else if (pos >= controller.getR() || pos >= replicas.size()) {
            // All replicas have been offered. The size check covers lists shorter than R,
            // which previously ended in an IndexOutOfBoundsException and no reply at all.
            pos = 0;
            reply = "ERROR_LOAD";
        } else {
            FSStore replica = replicas.get(pos);
            if (replica == null) {
                pos = 0;
                reply = "ERROR_FILE_DOES_NOT_EXIST";
            } else {
                reply = "LOAD_FROM " + replica.getPublicPort()
                        + " " + fileSystem.getFileSizes().get(filename);
            }
        }
        ControllerLogger.getInstance().messageSent(client, reply);
        res.println(reply);
        res.flush();
    } catch (IndexOutOfBoundsException e) {
        System.out.println("Arguments don't match in RELOAD operation");
    }
}
/**
 * Handles a client {@code REMOVE filename} request.
 *
 * <p>Replies {@code ERROR_FILE_DOES_NOT_EXIST} if the store map has no entry for the file.
 * Otherwise marks the file "remove in progress", sends {@code REMOVE filename} to every
 * Dstore listed for it, and waits up to the controller timeout for the list to drain
 * ({@code handleRemoveACK} removes one Dstore per acknowledgement). On success the file is
 * dropped from the store and size maps and {@code REMOVE_COMPLETE} is sent; on timeout only
 * the index entry is removed and nothing is sent.
 *
 * @param client the requesting client connection
 * @param tokens the split request; {@code tokens[1]} is the filename
 * @param line   the raw request line, used for logging
 * @throws IOException if the client's output stream cannot be obtained
 */
private void handleRemove(Socket client, String[] tokens, String line) throws IOException {
    ControllerLogger.getInstance().messageReceived(client, line);
    try {
        String filename = tokens[1];
        PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
        // Check existence before touching the index, so an unknown filename does not leave
        // a stale "remove in progress" entry behind.
        if (!fileSystem.store.containsKey(filename)) {
            ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_DOES_NOT_EXIST");
            res.println("ERROR_FILE_DOES_NOT_EXIST");
            res.flush();
            return;
        }
        fileSystem.addIndex(filename, "remove in progress");
        System.out.println(fileSystem.index.get(filename));

        // Iterate over a snapshot: acknowledgements handled on other connections shrink the
        // live list while REMOVE messages are still being sent.
        List<FSStore> holders = new ArrayList<>(fileSystem.getStore().get(filename));
        for (FSStore fsStore : holders) {
            fsStore.getFiles().remove(filename);
            PrintWriter dstoreOut = fsStore.getOutput();
            // NOTE(review): logged against the client socket although the message goes to the Dstore.
            ControllerLogger.getInstance().messageSent(client, "REMOVE " + filename);
            dstoreOut.println("REMOVE " + filename);
            dstoreOut.flush();
        }

        long limit = System.currentTimeMillis() + controller.getTimeout();
        if (awaitRemoveAcks(filename, limit)) {
            fileSystem.addIndex(filename, "remove complete");
            fileSystem.getStore().remove(filename);
            fileSystem.getFileSizes().remove(filename);
            ControllerLogger.getInstance().messageSent(client, "REMOVE_COMPLETE");
            res.println("REMOVE_COMPLETE");
            res.flush();
        } else {
            fileSystem.removeIndex(filename);
            System.out.println(filename + " failed to remove");
        }
    } catch (IndexOutOfBoundsException e) {
        System.out.println("Arguments don't match in REMOVE operation");
    }
}

/**
 * Polls the store map until no Dstore is listed for {@code filename} any more (or the entry
 * itself is gone) or {@code deadline} (epoch milliseconds) is reached; sleeps briefly between
 * polls instead of spinning. Returns {@code false} on timeout or if the thread is interrupted.
 */
private boolean awaitRemoveAcks(String filename, long deadline) {
    final long pollIntervalMillis = 10;
    while (true) {
        List<FSStore> remainingHolders = fileSystem.getStore().get(filename);
        if (remainingHolders == null || remainingHolders.isEmpty()) {
            return true;
        }
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
            return false;
        }
        try {
            Thread.sleep(Math.min(pollIntervalMillis, remaining));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
/**
 * Handles a client {@code LIST} request by replying with every key of the store map.
 *
 * <p>Names are prepended one by one, so the reply lists them in reverse iteration order and
 * each name is followed by a single space (an empty store yields {@code "LIST "}).
 *
 * @param client the requesting client connection
 * @param line   the raw request line, used for logging
 * @throws IOException if the client's output stream cannot be obtained
 */
private void handleList(Socket client, String line) throws IOException {
    ControllerLogger.getInstance().messageReceived(client, line);

    StringBuilder names = new StringBuilder();
    for (String filename : fileSystem.getStore().keySet()) {
        names.insert(0, filename + " ");
    }
    final String reply = "LIST " + names;

    PrintWriter writer = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
    ControllerLogger.getInstance().messageSent(client, reply);
    writer.println(reply);
    writer.flush();
}
/**
 * Records one {@code STORE_ACK filename} received from a Dstore.
 *
 * <p>Acknowledgements are only counted: each one appends a {@code null} placeholder to the
 * file's list in the store map (creating the list on the first ACK). {@code handleStore}
 * watches the size of that list and replaces it with the real Dstores once it reaches R.
 *
 * @param tokens the split message; {@code tokens[1]} is the filename
 * @throws IOException never thrown by this implementation; kept for existing callers
 */
private void handleStoreACK(String[] tokens) throws IOException {
    if (tokens.length < 2) {
        // Report a malformed ACK instead of letting an unchecked exception end the Dstore's
        // connection loop.
        System.out.println("Arguments don't match in STORE_ACK operation");
        return;
    }
    String filename = tokens[1];
    List<FSStore> acks = fileSystem.getStore().get(filename);
    if (acks != null) {
        acks.add(null);
    } else {
        List<FSStore> firstAck = new ArrayList<>();
        firstAck.add(null);
        Server.fileSystem.addStore(filename, firstAck);
    }
}
/**
 * Records one {@code REMOVE_ACK filename} received from a Dstore.
 *
 * <p>The message does not identify its sender here, so the first listed Dstore whose own
 * file set no longer contains the file ({@code handleRemove} clears it before sending
 * REMOVE) is taken off the file's list via {@code fileSystem.removeDstore}. The file's size
 * entry is removed as well.
 *
 * <p>An ACK for a file without a store entry (for example one arriving after the removal
 * already completed) is tolerated rather than failing with a NullPointerException.
 *
 * @param tokens the split message; {@code tokens[1]} is the filename
 */
private void handleRemoveACK(String[] tokens) {
    if (tokens.length < 2) {
        System.out.println("Arguments don't match in REMOVE_ACK operation");
        return;
    }
    String filename = tokens[1];
    List<FSStore> holders = fileSystem.getStore().get(filename);
    if (holders != null) {
        for (FSStore fsStore : holders) {
            // null entries are STORE_ACK placeholders, not Dstores
            if (fsStore != null && !fsStore.getFiles().contains(filename)) {
                fileSystem.removeDstore(fsStore, filename);
                // stop right away: the list being iterated was presumably just modified
                break;
            }
        }
    }
    fileSystem.getFileSizes().remove(filename);
}
}