diff --git a/Controller$IndexEntry.class b/Controller$IndexEntry.class index 1a3af9c4b1553341ae7057985d0c3bb2d51cc7fd..d745c8e2de25162b50503edcae6b679006643ab7 100644 Binary files a/Controller$IndexEntry.class and b/Controller$IndexEntry.class differ diff --git a/Controller$SyncList.class b/Controller$SyncList.class new file mode 100644 index 0000000000000000000000000000000000000000..3aa93869f34b5831c8f1487af11b00f6e1812c3e Binary files /dev/null and b/Controller$SyncList.class differ diff --git a/Controller.class b/Controller.class index 338ca1cac64638e4f4032704a11ffa1948b256ee..ef4ebb6110d7d4a7fe514ac5b4b7d62058a0eb68 100644 Binary files a/Controller.class and b/Controller.class differ diff --git a/Controller.java b/Controller.java index d9661c4ebbae6fa9bc9bcc774ef41419766e163b..cf4f95b0bc8fed95acfc32ebaf1acf3e1b2df6ea 100644 --- a/Controller.java +++ b/Controller.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.ArrayList; import java.util.Map; import java.util.HashMap; +import java.util.Collection; public class Controller { protected int cport; //Port to listen on @@ -18,6 +19,7 @@ public class Controller { protected int numberToStore; protected String status; protected Object storeAckLock; + protected List<Reloader> clientLoadList; public IndexEntry() { filesize = -1; @@ -25,6 +27,7 @@ public class Controller { numberToStore = 0; status = "store in progress"; storeAckLock = new Object(); + clientLoadList = new ArrayList<Reloader>(); } public synchronized void setFilesize(int filesize) { @@ -36,7 +39,7 @@ public class Controller { } public void addStoredBy(int dstore) { - storedBy.add(new Integer(dstore)); + storedBy.add(Integer.valueOf(dstore)); if(storedBy.size() == numberToStore) storeAckLock.notify(); } @@ -45,6 +48,16 @@ public class Controller { if(storedBy.size() == numberToStore) storeAckLock.notify(); } + public void removeStoredBy(int dstore) { + storedBy.remove(Integer.valueOf(dstore)); + if(storedBy.size() == 0) storeAckLock.notify(); + } + + public void removeStoredBy(List<Integer> dstores) { + storedBy.removeAll(dstores); + if(storedBy.size() == 0) storeAckLock.notify(); + } + public List<Integer> getStoredBy() { return storedBy; } @@ -57,13 +70,17 @@ public class Controller { this.status = status; } - public synchronized int getStatus() { + public synchronized String getStatus() { return status; } public Object getLock() { return storeAckLock; } + + public List<Reloader> getLoadList() { + return clientLoadList; + } } protected class SyncList extends ArrayList<Integer> { @@ -74,47 +91,53 @@ public class Controller { @Override public boolean add(Integer i) { synchronized(this) { - super.add(i); + return super.add(i); } } @Override - public boolean addAll(Collection<Integer> c) { + public boolean addAll(Collection<? extends Integer> c) { synchronized(this) { - super.addAll(c); + return super.addAll(c); } } @Override public Integer get(int i) { synchronized(this) { - super.get(i); + return super.get(i); } } @Override public int size() { synchronized(this) { - super.size(); + return super.size(); } } - @Override - public boolean remove(int i) { + public Integer remove(int i) { synchronized(this) { - super.remove(i); + return super.remove(i); } } - @Override public boolean remove(Integer i) { synchronized(this) { - super.remove(i); + return super.remove(i); } } } + protected class Reloader { + public boolean reload; + public Reloader() { + reload = false; + } + } + protected List<Integer> dstores; + protected Map<Integer,String[]> rebalanceMessages; protected Map<String,IndexEntry> index; public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) { @@ -179,6 +202,7 @@ public class Controller { void handleMessage(String[] message, Socket client) throws Exception { if(message[0].equals("JOIN")) { dstores.add(Integer.parseInt(message[1])); + System.out.println("Dstore at " + message[1] + " joined"); rebalance(); } else if(message[0].equals("STORE")) { @@ -188,10 +212,10 @@ public class Controller { storeAck(client, message[1]); } else if(message[0].equals("LOAD")) { - load(client, message[1], false); + load(client, message[1]); } else if(message[0].equals("RELOAD")) { - load(client, message[1], true); + reload(message[1]); } else if(message[0].equals("REMOVE")) { remove(client, message[1]); @@ -200,7 +224,13 @@ public class Controller { list(client); } else { - //Log error and continue (throw exception?) + for(String name : message) { + if(!index.containsKey(name)) { + //Log error and continue (throw exception?) + return; + } + } + receiveDstoreList(client, message); } } @@ -262,93 +292,206 @@ public class Controller { } void storeAck(Socket client, String filename) throws Exception { - if(!index.containsKey(filename)) { - //Throw logging exception - return; - } - - IndexEntry thisEntry = index.get(filename); - thisEntry.addStoredBy(new Integer(client.getPort())); + new Thread(() -> { + if(!index.containsKey(filename)) { + //Throw logging exception + return; + } + + IndexEntry thisEntry = index.get(filename); + thisEntry.addStoredBy(Integer.valueOf(client.getPort())); + }).start(); } - void load(Socket client, String filename, boolean reload) throws Exception { - if(!index.containsKey(filename)) { - PrintWriter out = new PrintWriter(client.getOutputStream()); - out.print("ERROR DOES_NOT_EXIST"); - out.flush(); - out.close(); - } - - //Select a Dstore which contains the file - IndexEntry thisEntry = index.get(filename); - int thisStore = thisEntry.storedBy.get(0).intValue(); - int thisSize = thisEntry.filesize; - - // !!TO DO: RELOAD COMMAND!! - - //Send LOAD_FROM message - PrintWriter out = new PrintWriter(client.getOutputStream()); - out.print("LOAD_FROM " + thisStore + " " + thisSize); - out.flush(); - out.close(); + void load(Socket client, String filename) throws Exception { + new Thread(() -> { + try { + if(!index.containsKey(filename)) { + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("ERROR DOES_NOT_EXIST"); + out.flush(); + out.close(); + return; + } + + //Select a Dstore which contains the file + IndexEntry thisEntry = index.get(filename); + int thisStore = thisEntry.storedBy.get(0).intValue(); + int thisSize = thisEntry.filesize; + + //Send LOAD_FROM message + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("LOAD_FROM " + thisStore + " " + thisSize); + out.flush(); + + Reloader reloadLock = new Object(); + thisEntry.getLoadList().add(reloadLock); + int trials = 0; + while(true) { + reloadLock.wait(10 * timeout); + trials ++; + if(trials >= rFactor || !reloadLock.reload) break; + out.print("LOAD_FROM " + thisEntry.storedBy.get(trials).intValue() + " " + thisSize); + out.flush(); + reloadLock.reload = false; + } + + thisEntry.getLoadList().remove(reloadLock); + if(trials >= rFactor && reloadLock.reload) { + out.print("ERROR LOAD"); + out.flush(); + } + + out.close(); + } + catch(IOException e) { + e.printStackTrace(); + } + }).start(); + } + + void reload(String filename) { + new Thread(() -> { + try { + for(Reloader r : index.get(filename).getLoadList()) { + r.reload = true; + r.notify(); + } + } + catch(IOException e) { + e.printStackTrace(); + } + }).start(); } void remove(Socket client, String filename) throws Exception { - if(!index.containsKey(filename)) { - PrintWriter clientOut = new PrintWriter(client.getOutputStream()); - clientOut.print("ERROR DOES_NOT_EXIST"); - clientOut.flush(); - clientOut.close(); - } - - //Update index to "remove in progress" - IndexEntry entry = index.get(filename); - entry.status = "remove in progress"; - - //Send REMOVE message to all Dstores storing the file - for(Integer dstore : entry.storedBy) { - Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); - PrintWriter out = new PrintWriter(socket.getOutputStream()); - out.write("REMOVE " + filename); - out.flush(); - out.close(); - socket.close(); - } - - //Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message - - //Update index to "remove complete" - entry.status = "remove complete"; - - //Send REMOVE_COMPLETE to client - PrintWriter clientOut = new PrintWriter(client.getOutputStream()); - clientOut.print("REMOVE_COMPLETE"); - clientOut.flush(); - clientOut.close(); + new Thread(() -> { + try { + if(!index.containsKey(filename)) { + PrintWriter clientOut = new PrintWriter(client.getOutputStream()); + clientOut.print("ERROR DOES_NOT_EXIST"); + clientOut.flush(); + clientOut.close(); + return; + } + + //Update index to "remove in progress" + IndexEntry entry = index.get(filename); + entry.status = "remove in progress"; + + //Send REMOVE message to all Dstores storing the file + for(Integer dstore : entry.storedBy) { + Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); + PrintWriter out = new PrintWriter(socket.getOutputStream()); + out.write("REMOVE " + filename); + out.flush(); + out.close(); + socket.close(); + } + + //Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message + try { + entry.getLock().wait(timeout); + } + catch(InterruptedException e) { + e.printStackTrace(); + } + + if(entry.getStoredBy().size() > 0) { + //Log error + } + + //Update index to "remove complete" + entry.status = "remove complete"; + + //Send REMOVE_COMPLETE to client + PrintWriter clientOut = new PrintWriter(client.getOutputStream()); + clientOut.print("REMOVE_COMPLETE"); + clientOut.flush(); + clientOut.close(); + } + catch(IOException e) { + e.printStackTrace(); + } + }).start(); } void list(Socket client) throws Exception { - //Send file list to client - PrintWriter out = new PrintWriter(client.getOutputStream()); - for(String name : index.keySet()) { - out.println(name); - } - out.flush(); - out.close(); + new Thread(() -> { + try { + //Send file list to client + PrintWriter out = new PrintWriter(client.getOutputStream()); + for(String name : index.keySet()) { + out.println(name); + } + out.flush(); + out.close(); + } + catch(IOException e) { + e.printStackTrace(); + } + }).start(); } void rebalance() throws Exception { - //Send LIST message to each Dstore and receive their file list - - //Create a new file allocation so that: - //Each file appears rFactor times - //Each file appears at most once on each datastore - //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) - - //Make a (files to send, files to remove) pair for each Dstore - - //Send the respective REBALANCE message to each Dstore - - //Wait for REBALANCE_COMPLETE from all Dstores + new Thread(() -> { + if(rebalanceMessages != null) return; + Map<Integer,String[]> dstoreFiles = new HashMap<Integer,String[]>(); + rebalanceMessages = dstoreFiles; + try { + //Send LIST message to each Dstore and receive their file list + for(Integer dstore : dstores) { + Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); + PrintWriter out = new PrintWriter(socket.getOutputStream()); + out.write("LIST"); + out.flush(); + out.close(); + socket.close(); + } + + dstoreFiles.wait(timeout); + if(dstoreFiles.size() < dstores.size()) { + //Log error + } + + //Create a new file allocation so that: + //Each file appears rFactor times + //Each file appears at most once on each datastore + //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) + for(Integer i : reshuffle(dstoreFiles.keySet())) { + + } + + //Make a (files to send, files to remove) pair for each Dstore + + //Send the respective REBALANCE message to each Dstore + + //Wait for REBALANCE_COMPLETE from all Dstores + } + catch(IOException e) { + e.printStackTrace(); + } + finally { + rebalanceMessages = null; + } + }).start(); + } + + void receiveDstoreList(Socket client, String[] list) { + new Thread(() -> { + if(rebalanceMessages == null) return; + rebalanceMessages.add(Integer.valueOf(client.getPort()), list); + if(rebalanceMessages.size() == dstores.size()) { + rebalanceMessages.notify(); + } + }).start(); + } + + List<Integer> reshuffle(Collection<Integer> col) { + List<Integer> list = new ArrayList<Integer>(); + for(Integer i : col) { + list.add(0, i); + } + return list; } } diff --git a/Dstore.class b/Dstore.class index 508e972562b57c9e441796e3b9ff255a2b8d89b9..4e4fda3f3e0789c67909d128772fc154545a4cf4 100644 Binary files a/Dstore.class and b/Dstore.class differ diff --git a/Dstore.java b/Dstore.java index 7a40e16fcc588aa6366b233d76d0318bdc151efa..ee9a08ec5dc5bb29d9d7a55ca9413653335aee7d 100644 --- a/Dstore.java +++ b/Dstore.java @@ -1,5 +1,7 @@ import java.io.*; +import java.lang.Runnable; import java.nio.file.Files; +import java.nio.file.Path; import java.net.*; import java.util.Map; import java.util.HashMap; @@ -88,131 +90,166 @@ public class Dstore { } void store(Socket client, String filename, int filesize, BufferedReader in) throws Exception { - //Send ACK message to client - PrintWriter out = new PrintWriter(client.getOutputStream()); - out.print("ACK"); - out.flush(); - out.close(); - - FileOutputStream writer = new FileOutputStream(fileFolder + "/" + filename, false); - - //Receive + write file content from client - int byteCount = filesize; - while(byteCount > 0) { - byte[] nextLine = in.readLine().getBytes(); - writer.write(nextLine); - writer.flush(); - byteCount -= nextLine.length; - } - writer.close(); - - //Send STORE_ACK message to the Controller - PrintWriter controllerOut = new PrintWriter(new Socket(InetAddress.getLocalHost(), cport).getOutputStream()); - controllerOut.print("STORE_ACK " + filename); - controllerOut.flush(); - controllerOut.close(); + new Thread(() -> { + try { + //Send ACK message to client + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("ACK"); + out.flush(); + out.close(); + + FileOutputStream writer = new FileOutputStream(fileFolder + "/" + filename, false); + + //Receive + write file content from client + int byteCount = filesize; + while(byteCount > 0) { + byte[] nextLine = in.readLine().getBytes(); + writer.write(nextLine); + writer.flush(); + byteCount -= nextLine.length; + } + writer.close(); + + //Send STORE_ACK message to the Controller + PrintWriter controllerOut = new PrintWriter(new Socket(InetAddress.getLocalHost(), cport).getOutputStream()); + controllerOut.print("STORE_ACK " + filename); + controllerOut.flush(); + controllerOut.close(); + } + catch(IOException e) { + e.printStackTrace(); + } + }).start(); } void load(Socket client, String filename) throws Exception { - //Send the content of the file in fileFolder to the client - PrintWriter out = new PrintWriter(client.getOutputStream()); - FileInputStream reader; - try { - reader = new FileInputStream(fileFolder + "/" + filename); - } - catch(FileNotFoundException e) { - out.print("ERROR DOES_NOT_EXIST"); - out.flush(); - out.close(); - return; - } - - byte[] buf = new byte[8]; - while(reader.read(buf) != -1) { - out.print(new String(buf)); - out.flush(); - } - - reader.close(); - out.close(); + new Thread(() -> { + try { + //Send the content of the file in fileFolder to the client + PrintWriter out = new PrintWriter(client.getOutputStream()); + FileInputStream reader; + try { + reader = new FileInputStream(fileFolder + "/" + filename); + } + catch(FileNotFoundException e) { + out.print("ERROR DOES_NOT_EXIST"); + out.flush(); + out.close(); + return; + } + + byte[] buf = new byte[8]; + while(reader.read(buf) != -1) { + out.print(new String(buf)); + out.flush(); + } + + reader.close(); + out.close(); + } + catch(IOException e) { + e.printStackTrace(); + } + }).start(); } void remove(Socket client, String filename) throws Exception { - //Remove the file from fileFolder - Path path = new File(fileFolder + "/" + filename).toPath(); - PrintWriter out = new PrintWriter(client.getOutputStream()); - - if(Files.deleteIfExists(path)) { - //Send REMOVE_ACK message to client (the controller) - out.print("REMOVE_ACK"); - } - else { - //Send DOES NOT EXIST error - out.print("ERROR DOES_NOT_EXIST " + filename); - } - - out.flush(); - out.close(); + new Thread(() -> { + try { + //Remove the file from fileFolder + Path path = new File(fileFolder + "/" + filename).toPath(); + PrintWriter out = new PrintWriter(client.getOutputStream()); + + if(Files.deleteIfExists(path)) { + //Send REMOVE_ACK message to client (the controller) + out.print("REMOVE_ACK"); + } + else { + //Send DOES NOT EXIST error + out.print("ERROR DOES_NOT_EXIST " + filename); + } + + out.flush(); + out.close(); + } + catch(IOException e) { + e.printStackTrace(); + } + }).start(); } void list(Socket client) throws Exception { - //Send a list of all files in fileFolder to client (the controller) - PrintWriter out = new PrintWriter(client.getOutputStream()); - for(File file : new File(fileFolder).listFiles()) { - out.print(file.getName()); - out.flush(); - } - out.close(); + new Thread(() -> { + try { + //Send a list of all files in fileFolder to client (the controller) + PrintWriter out = new PrintWriter(client.getOutputStream()); + for(File file : new File(fileFolder).listFiles()) { + out.print(file.getName()); + out.flush(); + } + out.close(); + } + catch(IOException e) { + e.printStackTrace(); + } + }).start(); } void rebalance(Socket client, String[] message) throws Exception { - //Interpret files to send and files to remove from the message - Map<String,Integer[]> filesToSend; - String[] filesToRemove; - int index; - - int numberToSend = Integer.parseInt(message[1]); - index = 2; - filesToSend = new HashMap<String,Integer[]>(numberToSend); - for(int i=0; i<numberToSend; i++) { - String name = message[index]; - index++; - - int numberOfReceivers = Integer.parseInt(message[index]); - index++; - Integer[] receivers = new Integer[numberOfReceivers]; - for(int j=0; j<numberOfReceivers; j++) { - receivers[j] = Integer.parseInt(message[index]); + new Thread(() -> { + try { + //Interpret files to send and files to remove from the message + Map<String,Integer[]> filesToSend; + String[] filesToRemove; + int index; + + int numberToSend = Integer.parseInt(message[1]); + index = 2; + filesToSend = new HashMap<String,Integer[]>(numberToSend); + for(int i=0; i<numberToSend; i++) { + String name = message[index]; + index++; + + int numberOfReceivers = Integer.parseInt(message[index]); + index++; + Integer[] receivers = new Integer[numberOfReceivers]; + for(int j=0; j<numberOfReceivers; j++) { + receivers[j] = Integer.parseInt(message[index]); + index++; + } + + filesToSend.put(name, receivers); + } + + int numberToRemove = Integer.parseInt(message[index]); index++; + filesToRemove = new String[numberToRemove]; + for(int k=0; k<numberToRemove; k++) { + filesToRemove[k] = message[index]; + index++; + } + + //Send each file to send to the Dstore at the specified port number + for(String filename : filesToSend.keySet()) { + for(Integer dstore : filesToSend.get(filename)) { + //Same store functions as used in the client object + } + } + + //Remove each file to remove from fileFolder + for(String filename : filesToRemove) { + new File(fileFolder + "/" + filename).delete(); + } + + //Send REBALANCE_COMPLETE message to client (the controller) + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("REBALANCE COMPLETE"); + out.flush(); + out.close(); } - - filesToSend.put(name, receivers); - } - - int numberToRemove = Integer.parseInt(message[index]); - index++; - filesToRemove = new String[numberToRemove]; - for(int k=0; k<numberToRemove; k++) { - filesToRemove[k] = message[index]; - index++; - } - - //Send each file to send to the Dstore at the specified port number - for(String filename : filesToSend.keySet()) { - for(Integer dstore : filesToSend.get(filename)) { - //Same store functions as used in the client object + catch(IOException e) { + e.printStackTrace(); } - } - - //Remove each file to remove from fileFolder - for(String filename : filesToRemove) { - new File(fileFolder + "/" + filename).delete(); - } - - //Send REBALANCE_COMPLETE message to client (the controller) - PrintWriter out = new PrintWriter(client.getOutputStream()); - out.print("REBALANCE COMPLETE"); - out.flush(); - out.close(); + }).start(); } }