import java.io.*; import java.net.*; import java.nio.CharBuffer; import java.util.*; public class Server { private final Controller controller; public static FileSystem fileSystem; public static int pos; public static boolean rebalancing; public Timer timer; public Server(Controller controller, FileSystem fileSystem) { this.controller = controller; Server.fileSystem = fileSystem; rebalancing = false; // do the rebalancing operation every rebalance_period seconds with a delay of rebalance_period seconds runRebalancing(controller.getRebalance_period()); } public static FileSystem getFileSystem() { return fileSystem; } private void runRebalancing(long delay) { if (timer != null) { timer.cancel(); timer.purge(); } timer = new Timer("RebalanceOperation"); TimerTask timerTask = new TimerTask() { public void run() { try { new Rebalance(fileSystem, controller).handle(); } catch (InterruptedException e) { e.printStackTrace(); } } }; timer.scheduleAtFixedRate(timerTask, delay, controller.getRebalance_period()); } public void handleClient(Socket client) throws IOException { OutputStream out = client.getOutputStream(); InputStream in = client.getInputStream(); pos = 0; BufferedReader req = new BufferedReader(new InputStreamReader(in)); PrintWriter res = new PrintWriter(new OutputStreamWriter(out)); System.out.println("---------NEW CONNECTION---------"); String line; while((line = req.readLine()) != null) { String[] tokens = line.split(" "); String command = tokens[0]; if (!rebalancing && command.equals("JOIN")) { int publicPort = Integer.parseInt(tokens[1]); FSStore fsStore = new FSStore(client, publicPort); fileSystem.addDstore(client.getPort(), fsStore); ControllerLogger.getInstance().dstoreJoined(client, publicPort); runRebalancing(0); } else if (fileSystem.getDstores().size() < controller.getR()) { ControllerLogger.getInstance().messageSent(client, "ERROR_NOT_ENOUGH_DSTORES"); res.println("ERROR_NOT_ENOUGH_DSTORES"); res.flush(); } else { if (fileSystem.getDstores().containsKey(client.getPort())) { if(command.equals("STORE_ACK")) { ControllerLogger.getInstance().messageReceived(client, "STORE_ACK"); handleStoreACK(tokens); } else if (command.equals("REMOVE_ACK")) { ControllerLogger.getInstance().messageReceived(client, "REMOVE_ACK"); handleRemoveACK(tokens); } else if(command.equals("LIST")) { ControllerLogger.getInstance().messageReceived(client, "LIST"); // get the files of the dstore when rebalancing List<String> filenames = new ArrayList<>(); for(int i = 1; i < tokens.length; i++) { filenames.add(tokens[i]); } fileSystem.addRebalancingList(fileSystem. getDstores(). get(client.getPort()).getConnection().getPort(), filenames); } else if (command.equals("REBALANCE_COMPLETE")) { ControllerLogger.getInstance().messageReceived(client, "REBALANCE_COMPLETE"); System.out.println("Rebalance done for " + fileSystem.getDstores().get(client.getPort()).getPublicPort()); } else if (command.equals("ERROR_FILE_DOES_NOT_EXIST")) { ControllerLogger.getInstance().messageReceived(client, "ERROR_FILE_DOES_NOT_EXIST"); } else { System.out.println("Unknown command " + command); } } else if (!rebalancing) { switch (command) { case "STORE" -> handleStore(client, tokens, line); case "LOAD" -> handleLoad(client, tokens, line); case "RELOAD" -> handleReload(client, tokens, line); case "REMOVE" -> handleRemove(client, tokens, line); case "LIST" -> handleList(client, line); default -> System.out.println("Unknown command " + command); } } else if (rebalancing) { System.out.println("Rebalancing..."); } } } } /** * @param client * @param tokens gets the filename and the filesize * @throws IOException */ private void handleStore(Socket client, String[] tokens, String line) throws IOException { ControllerLogger.getInstance().messageReceived(client, line); try { String filename = tokens[1]; int filesize = Integer.parseInt(tokens[2]); PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); if(fileSystem.getStore().containsKey(filename)) { ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_ALREADY_EXISTS"); res.println("ERROR_FILE_ALREADY_EXISTS"); res.flush(); return; } // update index to store in progress fileSystem.addIndex(filename, "store in progress"); // select R Dstores and create a string of all of their endpoints String msg = ""; int i = 0; List<FSStore> temp = new ArrayList<>(); List<FSStore> sorted = new ArrayList<>(fileSystem.getDstores().values()); Collections.sort(sorted); for(FSStore fsStore : sorted) { if(i == controller.getR()) { break; } // in case all the ACK are received keep in memory all the dstores temp.add(fsStore); // construct the message msg = fsStore.getPublicPort() + " " + msg; i++; } ControllerLogger.getInstance().messageSent(client, "STORE_TO " + msg); res.println("STORE_TO " + msg); res.flush(); // check if all the dstores have sent an ACK and send appropriate messages boolean done = false; long limit = System.currentTimeMillis() + controller.getTimeout(); while(!done && System.currentTimeMillis() <= limit) { if(fileSystem.getStore().containsKey(filename) && !fileSystem.getStore().get(filename).isEmpty()) { if(fileSystem.getStore().get(filename).size() == controller.getR()) { done = true; fileSystem.addIndex(filename, "store complete"); fileSystem.addStore(filename, temp); for(FSStore fsStore : temp) { fsStore.getFiles().add(filename); } fileSystem.addFileSize(filename, filesize); ControllerLogger.getInstance().messageSent(client, "STORE_COMPLETE"); res.println("STORE_COMPLETE"); res.flush(); break; } } } // if the dstores didn't send a STORE_ACK in the timeout => store failed if(!done) { fileSystem.removeIndex(filename); System.out.println(filename + " failed to upload"); } } catch (IndexOutOfBoundsException e) { System.out.println("Arguments don't match in STORE operation"); } } private void handleLoad(Socket client, String[] tokens, String line) throws IOException { ControllerLogger.getInstance().messageReceived(client, line); try { String filename = tokens[1]; PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); if(!fileSystem.getStore().containsKey(filename)) { ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_DOES_NOT_EXIST"); res.println("ERROR_FILE_DOES_NOT_EXIST"); } else { // select a Dstore from there and give an appropriate error if all Dstores fail ControllerLogger.getInstance().messageSent(client, "LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() + " " + fileSystem.getFileSizes().get(filename)); res.println("LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() + " " + fileSystem.getFileSizes().get(filename)); } res.flush(); } catch (IndexOutOfBoundsException e) { System.out.println("Arguments don't match in LOAD operation"); } } private void handleReload(Socket client, String[] tokens, String line) throws IOException { ControllerLogger.getInstance().messageReceived(client, line); pos = pos + 1; PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); try { String filename = tokens[1]; if(!fileSystem.getStore().containsKey(filename)) { pos = 0; ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_DOES_NOT_EXIST"); res.println("ERROR_FILE_DOES_NOT_EXIST"); } else { if(pos == controller.getR()) { pos = 0; ControllerLogger.getInstance().messageSent(client, "ERROR_LOAD"); res.println("ERROR_LOAD"); res.flush(); return; } // select a Dstore from there and give an appropriate error if all Dstores fail ControllerLogger.getInstance().messageSent(client, "LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() + " " + fileSystem.getFileSizes().get(filename)); res.println("LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() + " " + fileSystem.getFileSizes().get(filename)); } res.flush(); } catch (IndexOutOfBoundsException e) { System.out.println("Arguments don't match in LOAD operation"); } } private void handleRemove(Socket client, String[] tokens, String line) throws IOException { ControllerLogger.getInstance().messageReceived(client, line); try { String filename = tokens[1]; fileSystem.addIndex(filename, "remove in progress"); System.out.println(fileSystem.index.get(filename)); if(!fileSystem.store.containsKey(filename)) { PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_DOES_NOT_EXIST"); res.println("ERROR_FILE_DOES_NOT_EXIST"); res.flush(); return; } for(FSStore fsStore : fileSystem.getStore().get(filename)) { fsStore.getFiles().remove(filename); PrintWriter res = fsStore.getOutput(); ControllerLogger.getInstance().messageSent(client, "REMOVE " + filename); res.println("REMOVE " + filename); res.flush(); } boolean done = false; long limit = System.currentTimeMillis() + controller.getTimeout(); PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); while(!done && System.currentTimeMillis() < limit) { if(fileSystem.getStore().get(filename).isEmpty()) { done = true; fileSystem.addIndex(filename, "remove complete"); fileSystem.getStore().remove(filename); fileSystem.getFileSizes().remove(filename); ControllerLogger.getInstance().messageSent(client, "REMOVE_COMPLETE"); res.println("REMOVE_COMPLETE"); res.flush(); } } if(!done) { fileSystem.removeIndex(filename); System.out.println(filename + " failed to remove"); } } catch (IndexOutOfBoundsException e) { System.out.println("Arguments don't match in REMOVE operation"); } } private void handleList(Socket client, String line) throws IOException { ControllerLogger.getInstance().messageReceived(client, line); String msg = ""; for(String filename : fileSystem.getStore().keySet()) { msg = filename + " " + msg; } PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); ControllerLogger.getInstance().messageSent(client, "LIST " + msg); res.println("LIST " + msg); res.flush(); } private void handleStoreACK(String[] tokens) throws IOException { String filename = tokens[1]; if(fileSystem.getStore().containsKey(filename)) { fileSystem.getStore().get(filename).add(null); } else { List<FSStore> d = new ArrayList<>(); d.add(null); Server.fileSystem.addStore(filename, d); } } private void handleRemoveACK(String[] tokens) { String filename = tokens[1]; for(FSStore fsStore : fileSystem.getStore().get(filename)) { if(!fsStore.getFiles().contains(filename)) { fileSystem.removeDstore(fsStore, filename); break; } } fileSystem.getFileSizes().remove(filename); } }