From 50849405c77788107703a1fc3396bb60db9c3252 Mon Sep 17 00:00:00 2001 From: Theodora-Mara Pislar <tmp1u19@soton.ac.uk> Date: Mon, 10 May 2021 22:01:34 +0100 Subject: [PATCH] Fix some bugs and add some rebalancing operations --- src/Dstore.java | 24 +++++++++++---- src/Server.java | 80 ++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 88 insertions(+), 16 deletions(-) diff --git a/src/Dstore.java b/src/Dstore.java index dc6ba4e..b930b89 100644 --- a/src/Dstore.java +++ b/src/Dstore.java @@ -1,5 +1,6 @@ import java.io.*; import java.net.*; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -9,14 +10,14 @@ public class Dstore { private final int cport; private final int timeout; private final String file_folder; - private final Set<String> files; + public static Set<String> files; public Dstore(int port, int cport, int timeout, String file_folder) { this.port = port; this.cport = cport; this.timeout = timeout; this.file_folder = file_folder; - this.files = new HashSet<>(); + this.files = Collections.synchronizedSet(new HashSet<>()); } public int getPort() { @@ -35,10 +36,14 @@ public class Dstore { return file_folder; } - public Set<String> getFiles() { + synchronized public Set<String> getFiles() { return files; } + synchronized public void addFile(String filename) { + getFiles().add(filename); + } + public void listenClient(Socket socket) { try { @@ -112,10 +117,11 @@ public class Dstore { FileOutputStream o = new FileOutputStream(dstore.getFile_folder() + "/" + filename); o.write(data); + dstore.addFile(filename); + finished = true; o.flush(); o.close(); dstore.getFiles().add(filename); - finished = true; } if(!finished) { @@ -124,7 +130,7 @@ public class Dstore { } PrintWriter r = new PrintWriter(new OutputStreamWriter(controllerSocket.getOutputStream())); - r.println("STORE_ACK " + filename + " " + dstore.getPort()); + r.println("STORE_ACK " + filename); r.flush(); } catch (IndexOutOfBoundsException e) { @@ -207,6 +213,14 @@ public class Dstore { } catch(IndexOutOfBoundsException e) { System.out.println("Arguments don't match in REMOVE opretaion"); } + } else if (command.equals("LIST")) { + String file_list = ""; + + for(String filename : dstore.getFiles()) { + file_list = filename + " " + file_list; + } + res.println("LIST " + file_list); + res.flush(); } else { System.out.println("Unknown command"); } diff --git a/src/Server.java b/src/Server.java index 9ca3fb9..3d573bf 100644 --- a/src/Server.java +++ b/src/Server.java @@ -8,10 +8,27 @@ public class Server { private final Controller controller; public static FileSystem fileSystem; public static int pos; + public static boolean rebalancing; + Timer timer; public Server(Controller controller, FileSystem fileSystem) { this.controller = controller; Server.fileSystem = fileSystem; + rebalancing = false; + + // do the rebalancing operation every rebalance_period seconds with a delay of rebalance_period seconds + timer = new Timer(); + TimerTask timerTask = new TimerTask() { + @Override + public void run() { + try { + new Rebalance(fileSystem, controller).handle(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + timer.schedule(timerTask, controller.getRebalance_period(), controller.getRebalance_period()); } public void handleClient(Socket client) throws IOException { @@ -30,10 +47,23 @@ public class Server { String command = tokens[0]; if (command.equals("JOIN")) { + int publicPort = Integer.parseInt(tokens[1]); System.out.println("Dstore wants to connect"); FSStore fsStore = new FSStore(client, publicPort); fileSystem.addDstore(client.getPort(), fsStore); + rebalancing = true; + new Thread() { + @Override + public void run() { + try { + new Rebalance(fileSystem, controller).handle(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }.start(); + } else if (fileSystem.getDstores().size() < controller.getR()) { res.println("ERROR_NOT_ENOUGH_DSTORES"); res.flush(); @@ -41,12 +71,29 @@ public class Server { if (fileSystem.getDstores().containsKey(client.getPort())) { if(command.equals("STORE_ACK")) { handleStoreACK(tokens); - } else if (command.equals("REMOVE_ACK")){ + } else if (command.equals("REMOVE_ACK")) { handleRemoveACK(tokens); + } else if(command.equals("LIST")) { + // get the files of the dstore when rebalancing + List<String> filenames = new ArrayList<>(); + for(int i = 1; i < tokens.length; i++) { + filenames.add(tokens[i]); + } + + fileSystem.addRebalancingList(fileSystem. + getDstores(). + get(client.getPort()). + getPublicPort(), + filenames); + } else if (command.equals("REBALANCE_COMPLETE")) { + rebalancing = false; + System.out.println("Done rebalancing"); + } else if (command.equals("ERROR_FILE_DOES_NOT_EXIST")) { + System.out.println(command + " " + tokens[1]); } else { System.out.println("Unknown command"); } - } else { + } else if (!rebalancing) { switch (command) { case "STORE" -> handleStore(client, tokens); case "LOAD" -> handleLoad(client, tokens); @@ -55,6 +102,8 @@ public class Server { case "LIST" -> handleList(client); default -> System.out.println("Unknown command"); } + } else if (rebalancing) { + System.out.println("Rebalancing..."); } } } @@ -85,17 +134,19 @@ public class Server { String msg = ""; int i = 0; List<FSStore> temp = new ArrayList<>(); + List<FSStore> sorted = new ArrayList<>(fileSystem.getDstores().values()); + Collections.sort(sorted); - for(int port : fileSystem.getDstores().keySet()) { + for(FSStore fsStore : sorted) { if(i == controller.getR()) { break; } // in case all the ACK are received keep in memory all the dstores - temp.add(fileSystem.getDstores().get(port)); + temp.add(fsStore); // construct the message - msg = fileSystem.getDstores().get(port).getPublicPort() + " " + msg; + msg = fsStore.getPublicPort() + " " + msg; i++; } @@ -111,6 +162,11 @@ public class Server { done = true; fileSystem.addIndex(filename, "store complete"); fileSystem.addStore(filename, temp); + + for(FSStore fsStore : temp) { + fsStore.getFiles().add(filename); + } + fileSystem.addFileSize(filename, filesize); res.println("STORE_COMPLETE"); res.flush(); @@ -193,6 +249,7 @@ public class Server { for(FSStore fsStore : fileSystem.getStore().get(filename)) { System.out.println("Sending to Dstore"); + fsStore.getFiles().remove(filename); PrintWriter res = fsStore.getOutput(); res.println("REMOVE " + filename); res.flush(); @@ -214,6 +271,7 @@ public class Server { } if(!done) { + fileSystem.removeIndex(filename); System.out.println(filename + " failed to remove"); } @@ -233,17 +291,16 @@ public class Server { res.flush(); } - private void handleStoreACK(String[] tokens) { + private void handleStoreACK(String[] tokens) throws IOException { String filename = tokens[1]; - int dstorePort = Integer.parseInt(tokens[2]); if(fileSystem.getStore().containsKey(filename)) { - fileSystem.getStore().get(filename).add(fileSystem.getDstores().get(dstorePort)); + fileSystem.getStore().get(filename).add(null); } else { List<FSStore> d = new ArrayList<>(); - d.add(fileSystem.getDstores().get(dstorePort)); - Server.fileSystem.addStore(filename,d); + d.add(null); + Server.fileSystem.addStore(filename, d); } } @@ -256,6 +313,7 @@ public class Server { break; } } - } + fileSystem.getFileSizes().remove(filename); + } } -- GitLab