diff --git a/src/Dstore.java b/src/Dstore.java index fce2b56c3f35f5f564151e29dbd4c9c32d84a93b..c46b01fdd5d84f0d36598e9029976c8bce6e275b 100644 --- a/src/Dstore.java +++ b/src/Dstore.java @@ -1,8 +1,6 @@ import java.io.*; import java.net.*; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import java.util.*; public class Dstore { @@ -10,14 +8,20 @@ public class Dstore { private final int cport; private final int timeout; private final String file_folder; - public static Set<String> files; + public Set<String> files; + public Map<String, Integer> fileSizes; public Dstore(int port, int cport, int timeout, String file_folder) { this.port = port; this.cport = cport; this.timeout = timeout; this.file_folder = file_folder; - this.files = Collections.synchronizedSet(new HashSet<>()); + files = new HashSet<>(); + fileSizes = new HashMap<>(); + } + + public Map<String, Integer> getFileSizes() { + return fileSizes; } public int getPort() { @@ -122,6 +126,7 @@ public class Dstore { o.flush(); o.close(); dstore.getFiles().add(filename); + dstore.getFileSizes().put(filename, filesize); } if(!finished) { @@ -165,8 +170,31 @@ public class Dstore { res.flush(); } } - break; + + case "REBALANCE_STORE" : + + res.println("ACK"); + res.flush(); + + String filename = tokens[1]; + int filesize = Integer.parseInt(tokens[2]); + + byte[] data = new byte[filesize]; + in.readNBytes(data, 0, filesize); + FileOutputStream o = new FileOutputStream(dstore.getFile_folder() + + "/" + filename); + o.write(data); + dstore.addFile(filename); + o.flush(); + o.close(); + dstore.getFiles().add(filename); + dstore.getFileSizes().put(filename, filesize); + + PrintWriter r = new PrintWriter(new OutputStreamWriter(controllerSocket.getOutputStream())); + r.println("REBALANCE_COMPLETE"); + r.flush(); + default: System.out.println("Unknown command"); break; @@ -201,6 +229,7 @@ public class Dstore { File file = new File(dstore.getFile_folder() + "/" + filename); file.delete(); dstore.getFiles().remove(filename); + dstore.getFileSizes().remove(filename); res.println("REMOVE_ACK " + filename); } else { res.println("ERROR_FILE_DOES_NOT_EXIST " + filename); @@ -212,6 +241,7 @@ public class Dstore { System.out.println("Arguments don't match in REMOVE opretaion"); } } else if (command.equals("LIST")) { + String file_list = ""; for(String filename : dstore.getFiles()) { @@ -219,6 +249,87 @@ public class Dstore { } res.println("LIST " + file_list); res.flush(); + + } else if (command.equals("REBALANCE")) { + + // show the distribution of files + for(int i = 0; i < tokens.length; i++) { + System.out.print(tokens[i] + " "); + } + System.out.println(); + + + // send the messages to each dstore + int number_of_files_to_send = Integer.parseInt(tokens[1]); + int pos = 2; + for(int i = 0; i < number_of_files_to_send; i++) { + String filename = tokens[pos]; + pos++; + int filesize = Integer.parseInt(tokens[pos]); + pos++; + int size = Integer.parseInt(tokens[pos]); + pos++; + for(int j = pos; j < pos + size; j++) { + int finalJ = j; + int port = Integer.parseInt(tokens[finalJ]); + + Socket socket = new Socket(); + socket.connect(new InetSocketAddress("localhost", port)); + + new Thread() { + @Override + public void run() { + try { + + PrintWriter re = new PrintWriter(new OutputStreamWriter(socket.getOutputStream())); + BufferedReader rs = new BufferedReader(new InputStreamReader(socket.getInputStream())); + re.println("REBALANCE_STORE " + filename + " " + filesize); + re.flush(); + + String l = rs.readLine(); + + long limit = System.currentTimeMillis() + dstore.getTimeout(); + boolean done = false; + while(!done && System.currentTimeMillis() < limit) { + if(l.equals("ACK")) { + done = true; + + // send the file content + File file = new File(dstore.getFile_folder() + "/" + filename); + byte[] data = new byte[filesize]; + FileInputStream fis = new FileInputStream(file); + BufferedInputStream bis = new BufferedInputStream(fis); + + bis.read(data, 0, data.length); + + OutputStream os = null; + os = socket.getOutputStream(); + os.write(data, 0, data.length); + os.flush(); + + file.delete(); + dstore.getFiles().remove(filename); + dstore.getFileSizes().remove(filename); + } + } + + if(!done) { + System.out.println("Didn't get the ACK from rebalance in time"); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + }.start(); + } + } + + if(number_of_files_to_send == 0) { + res.println("REBALANCE_COMPLETE"); + res.flush(); + } + } else { System.out.println("Unknown command"); } diff --git a/src/Server.java b/src/Server.java index 4e7c1a1b98c945d771d7dd80a5e2ab5eba9f3c36..edf5807c1cdf22e9b870f97f1969b5863ffa3b13 100644 --- a/src/Server.java +++ b/src/Server.java @@ -9,7 +9,7 @@ public class Server { public static FileSystem fileSystem; public static int pos; public static boolean rebalancing; - Timer timer; + public Timer timer; public Server(Controller controller, FileSystem fileSystem) { this.controller = controller; @@ -17,9 +17,22 @@ public class Server { rebalancing = false; // do the rebalancing operation every rebalance_period seconds with a delay of rebalance_period seconds - timer = new Timer(); + runRebalancing(controller.getRebalance_period()); + } + + public static FileSystem getFileSystem() { + return fileSystem; + } + + private void runRebalancing(long delay) { + if (timer != null) { + timer.cancel(); + timer.purge(); + } + + timer = new Timer("RebalanceOperation"); + TimerTask timerTask = new TimerTask() { - @Override public void run() { try { new Rebalance(fileSystem, controller).handle(); @@ -28,7 +41,8 @@ public class Server { } } }; - timer.schedule(timerTask, controller.getRebalance_period(), controller.getRebalance_period()); + + timer.scheduleAtFixedRate(timerTask, delay, controller.getRebalance_period()); } public void handleClient(Socket client) throws IOException { @@ -49,20 +63,11 @@ public class Server { if (command.equals("JOIN")) { int publicPort = Integer.parseInt(tokens[1]); - System.out.println("Dstore wants to connect"); FSStore fsStore = new FSStore(client, publicPort); fileSystem.addDstore(client.getPort(), fsStore); - rebalancing = true; - new Thread() { - @Override - public void run() { - try { - new Rebalance(fileSystem, controller).handle(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - }.start(); + + System.out.println("Dstore listening on port " + publicPort + " connected"); + runRebalancing(0); } else if (fileSystem.getDstores().size() < controller.getR()) { res.println("ERROR_NOT_ENOUGH_DSTORES"); @@ -73,7 +78,9 @@ public class Server { handleStoreACK(tokens); } else if (command.equals("REMOVE_ACK")) { handleRemoveACK(tokens); - } else if(command.equals("LIST")) { + } + else if(command.equals("LIST")) { + // get the files of the dstore when rebalancing List<String> filenames = new ArrayList<>(); for(int i = 1; i < tokens.length; i++) { @@ -82,26 +89,31 @@ public class Server { fileSystem.addRebalancingList(fileSystem. getDstores(). - get(client.getPort()). - getPublicPort(), + get(client.getPort()).getConnection().getPort(), filenames); + } else if (command.equals("REBALANCE_COMPLETE")) { - rebalancing = false; - System.out.println("Done rebalancing"); + + //fileSystem.increaseRebalanceComplete(); + } else if (command.equals("ERROR_FILE_DOES_NOT_EXIST")) { + System.out.println(command + " " + tokens[1]); + } else { - System.out.println("Unknown command"); + System.out.println("Unknown command " + command); } } else if (!rebalancing) { + switch (command) { case "STORE" -> handleStore(client, tokens); case "LOAD" -> handleLoad(client, tokens); case "RELOAD" -> handleReload(client, tokens); case "REMOVE" -> handleRemove(client, tokens); case "LIST" -> handleList(client); - default -> System.out.println("Unknown command"); + default -> System.out.println("Unknown command " + command); } + } else if (rebalancing) { System.out.println("Rebalancing..."); } @@ -157,19 +169,23 @@ public class Server { boolean done = false; long limit = System.currentTimeMillis() + controller.getTimeout(); - while(!done && System.currentTimeMillis() < limit) { - if(fileSystem.getStore().containsKey(filename) && fileSystem.getStore().get(filename).size() == controller.getR()) { - done = true; - fileSystem.addIndex(filename, "store complete"); - fileSystem.addStore(filename, temp); + while(!done && System.currentTimeMillis() <= limit) { + if(fileSystem.getStore().containsKey(filename) && !fileSystem.getStore().get(filename).isEmpty()) { + if(fileSystem.getStore().get(filename).size() == controller.getR()) { + done = true; + fileSystem.addIndex(filename, "store complete"); + fileSystem.addStore(filename, temp); - for(FSStore fsStore : temp) { - fsStore.getFiles().add(filename); - } - fileSystem.addFileSize(filename, filesize); - res.println("STORE_COMPLETE"); - res.flush(); + for(FSStore fsStore : temp) { + fsStore.getFiles().add(filename); + } + + fileSystem.addFileSize(filename, filesize); + res.println("STORE_COMPLETE"); + res.flush(); + break; + } } }