diff --git a/Controller$IndexEntry.class b/Controller$IndexEntry.class index d745c8e2de25162b50503edcae6b679006643ab7..adc524c042879907e90d45d3d52b889b6660c7ad 100644 Binary files a/Controller$IndexEntry.class and b/Controller$IndexEntry.class differ diff --git a/Controller$Reloader.class b/Controller$Reloader.class new file mode 100644 index 0000000000000000000000000000000000000000..1778f421b3e1ecd1109fa137402893ab772a7d5e Binary files /dev/null and b/Controller$Reloader.class differ diff --git a/Controller.class b/Controller.class index ef4ebb6110d7d4a7fe514ac5b4b7d62058a0eb68..86c939b219a47dffd1fcd6905d10b5000029bee8 100644 Binary files a/Controller.class and b/Controller.class differ diff --git a/Controller.java b/Controller.java index 481360c65a0f74105b7ae335ed46dfe59fbf6804..12eb9b80571c7407410d1febff722dca6fbcc4a9 100644 --- a/Controller.java +++ b/Controller.java @@ -1,6 +1,8 @@ import java.io.*; import java.net.*; import java.lang.Runnable; +import java.lang.Math; +import java.util.Iterator; import java.util.List; import java.util.ArrayList; import java.util.Map; @@ -111,15 +113,23 @@ public class Controller { int timeout = Integer.parseInt(args[2]); int rebalancePeriod = Integer.parseInt(args[3]); + if(cport < 0 || rFactor < 1 || timeout < 0 || rebalancePeriod < 0) { + throw new Exception("Infeasible values provided as arguments"); + } + Controller controller = new Controller(cport, rFactor, timeout, rebalancePeriod); controller.start(); } catch(IndexOutOfBoundsException e) { - System.out.println("Command line arguments have not been provided"); + System.err.println("Command line arguments have not been provided"); return; } catch(NumberFormatException e) { - System.out.println("Command line arguments must be integers"); + System.err.println("Command line arguments must be integers"); + return; + } + catch(Exception e) { + e.printStackTrace(); return; } } @@ -198,9 +208,12 @@ public class Controller { //Select Dstores int[] storesToStore = new int[rFactor]; - for(int i=0; i<rFactor; i++) { - Integer thisStore = dstores.get(i); - storesToStore[i] = thisStore.intValue(); + synchronized(dstores) { + Iterator<Integer> it = dstores.keySet().iterator(); + for(int i=0; i<rFactor; i++) { + Integer thisStore = it.next(); + storesToStore[i] = thisStore.intValue(); + } } entry.setNumberToStore(rFactor); @@ -213,7 +226,12 @@ public class Controller { new Thread(() -> { String[] message = dstores.get(Integer.valueOf(port)).receive().split(" "); if(message[0].equals("STORE_ACK")) { - storeAck(Integer.valueOf(port), message[1]); + try { + storeAck(Integer.valueOf(port), message[1]); + } + catch(Exception e) { + //Log error + } } else { //Log error @@ -282,7 +300,12 @@ public class Controller { thisEntry.getLoadList().add(reloadLock); int trials = 0; while(true) { - reloadLock.wait(10 * timeout); + try { + reloadLock.wait(10 * timeout); + } + catch(InterruptedException e) { + e.printStackTrace(); + } trials ++; if(trials >= rFactor || !reloadLock.reload) break; out.println("LOAD_FROM " + thisEntry.storedBy.get(trials).intValue() + " " + thisSize); @@ -306,14 +329,9 @@ public class Controller { void reload(String filename) { new Thread(() -> { - try { - for(Reloader r : index.get(filename).getLoadList()) { - r.reload = true; - r.notify(); - } - } - catch(IOException e) { - e.printStackTrace(); + for(Reloader r : index.get(filename).getLoadList()) { + r.reload = true; + r.notify(); } }).start(); } @@ -393,74 +411,136 @@ public class Controller { void rebalance() throws Exception { new Thread(() -> { if(rebalanceMessages != null) return; - Map<Integer,String[]> dstoreFiles = new HashMap<Integer,String[]>(); - rebalanceMessages = dstoreFiles; - try { - //Send LIST message to each Dstore and receive their file list - for(Integer dstore : dstores.keySet()) { - new Thread(() -> { - String[] message = dstores.get(dstore).sendAndReceive("LIST").split(" "); - receiveDstoreList(dstore.intValue(), message); - }).start(); - } - - dstoreFiles.wait(timeout); - if(dstoreFiles.size() < dstores.size()) { - //Log error - } - - //Create a new file allocation so that: - //Each file appears rFactor times - //Each file appears at most once on each datastore - //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) - List<Integer> storeOrder = reshuffle(dstoreFiles.keySet()); - List<String> fileList = new ArrayList<String>(); - for(Integer i : reshuffle(dstoreFiles.keySet())) { - for(String s : dstoreFiles.get(i)) { - if(!fileList.contains(s)) { - fileList.add(s); + Map<Integer,List<String>> dstoreFiles = new HashMap<Integer,List<String>>(); + synchronized(dstoreFiles) { + rebalanceMessages = dstoreFiles; + try { + //Send LIST message to each Dstore and receive their file list + for(Integer dstore : dstores.keySet()) { + dstoreFiles.put(dstore, new ArrayList<String>()); + + new Thread(() -> { + String[] message = dstores.get(dstore).sendAndReceive("LIST").split(" "); + receiveDstoreList(dstore.intValue(), message); + }).start(); + } + + dstoreFiles.wait(timeout); + if(dstoreFiles.size() < dstores.size()) { + //Log error + } + + //Create a new file allocation so that: + //Each file appears rFactor times + //Each file appears at most once on each datastore + //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) + List<Integer> storeOrder = reshuffle(dstoreFiles.keySet()); + List<String> fileList = new ArrayList<String>(); + for(Integer i : reshuffle(dstoreFiles.keySet())) { + for(String s : dstoreFiles.get(i)) { + if(!fileList.contains(s)) { + fileList.add(s); + } } } - } - - Map<Integer,List<String>> newAlloc = new HashMap<Integer,List<String>>(); - int pos = 0; - int storeSize = Math.ceiling((fileList.size() * rFactor) / dstores.size()); - for(Integer i : dstoreFiles.keySet()) { - newAlloc.put(i, new ArrayList<String>(storeSize)); - } - for(String file : fileList) { - for(int j=0; j<rFactor; j++) { - newAlloc.get(pos).add(file); - pos ++; - if(pos >= newAlloc.size()) pos = 0; + + Map<Integer,List<String>> requireIndex = new HashMap<Integer,List<String>>(); + Map<Integer,List<String>> removeIndex = new HashMap<Integer,List<String>>(); + int pos = 0; + int storeSize = (int) Math.ceil((fileList.size() * rFactor) / dstores.size()); + for(Integer i : dstoreFiles.keySet()) { + requireIndex.put(i, new ArrayList<String>()); + removeIndex.put(i, new ArrayList<String>()); } - } - - //Make a (files to send, files to remove) pair for each Dstore - /* - Map<Integer,String> outMessages = new HashMap<Integer,String>(); - for(Integer dstore : storeOrder) { - String[] oldFiles = dstoreFiles.get(dstore); - List<String> newFiles = newAlloc.get(dstore); - for(String file : oldFiles) { - if(!newFiles.) + Iterator<Integer> it; + for(String file : fileList) { + for(int j=0; j<rFactor; j++) { + if(it == null || !it.hasNext()) { + it = dstoreFiles.keySet().iterator(); + } + //If indexed dstore does not have the file, add it to its requireIndex entry + Integer thisStore = it.next(); + if(!dstoreFiles.get(thisStore).contains(file)) { + requireIndex.get(thisStore).add(file); + } + } + //Dstores not chosen in the above loop must have an entry added to removeIndex, if they have the file + for(int j=0; j<(requireIndex.size() - rFactor); j++) { + if(it == null || !it.hasNext()) { + it = dstoreFiles.keySet().iterator(); + } + + Integer thisStore = it.next(); + if(dstoreFiles.get(thisStore).contains(file)) { + removeIndex.get(thisStore).add(file); + } + } + } + + Integer acksReceived = new Integer(0); + for(Integer thisStore : storeOrder) { + List<String> sendMessages = new ArrayList<String>(); + for(String file : dstoreFiles.get(thisStore)) { + if(isEmptyListMap(requiredFiles)) break; + + String fileMessage = ""; + for(Integer otherStore : requiredFiles.keySet()) { + if(thisStore.equals(otherStore)) continue; + for(String otherFile : requiredFiles.get(otherStore)) { + if(file.equals(otherFile)) { + requiredFiles.get(otherStore).remove(otherFile); + fileMessage = fileMessage + " " + otherStore.toString(); + break; + } + } + } + fileMessage = file + " " + (fileMessage.trim().split(" ").length) + fileMessage; + sendMessages.add(fileMessage); + } + + String message = "REBALANCE " + sendMessages.size(); + for(String s : sendMessages) { + message = message + " " + s; + } + message = message + " " + removeIndex.get(thisStore).size(); + for(String f : removeIndex.get(thisStore)) { + message = message + " " + f; + } + + //Send message to the Dstore + new Thread(() -> { + String returnMessage = dstores.get(thisStore).sendAndReceive(message); + if(!returnMessage.equals("REBALANCE_COMPLETE")) { + //Log error + } + synchronized(acksReceived) { + acksReceived += 1; + if(acksReceived.intValue() == storeOrder.size()) { + acksReceived.notifyAll(); + } + } + }).start(); + } + + //Wait for REBALANCE_COMPLETE from all Dstores + synchronized(acksReceived) { + try { + acksReceived.wait(timeout); + if(acksReceived.intValue < storeOrder.size()) { + //Restart rebalance operation + } + } + catch(InterruptedException e) { + e.printStackTrace(); + } } } - */ - - //Send the respective REBALANCE message to each Dstore - - //Wait for REBALANCE_COMPLETE from all Dstores - } - catch(IOException e) { - e.printStackTrace(); - } - catch(Exception e) { - e.printStackTrace(); - } - finally { - rebalanceMessages = null; + catch(Exception e) { + e.printStackTrace(); + } + finally { + rebalanceMessages = null; + } } }).start(); } @@ -475,9 +555,11 @@ public class Controller { } } - rebalanceMessages.put(port, list); - if(rebalanceMessages.size() == dstores.size()) { - rebalanceMessages.notify(); + synchronized(rebalanceMessages) { + rebalanceMessages.put(port, list); + if(rebalanceMessages.size() == dstores.size()) { + rebalanceMessages.notify(); + } } } @@ -488,4 +570,13 @@ public class Controller { } return list; } + + boolean isEmptyListMap(Map<T,List<U>> map) { + for(List<U> list : map.entrySet()) { + if(!list.isEmpty()) { + return false; + } + } + return true; + } } diff --git a/Dstore.class b/Dstore.class index 4e4fda3f3e0789c67909d128772fc154545a4cf4..f175e51d292f7e107fa06774569a117094b0ff31 100644 Binary files a/Dstore.class and b/Dstore.class differ diff --git a/Dstore.java b/Dstore.java index 6989e271f1ddc84275ef9c0d7bc78fa685425897..018a925dc83089ee70663b21b142f117db112c29 100644 --- a/Dstore.java +++ b/Dstore.java @@ -3,6 +3,8 @@ import java.lang.Runnable; import java.nio.file.Files; import java.nio.file.Path; import java.net.*; +import java.util.List; +import java.util.ArrayList; import java.util.Map; import java.util.HashMap; @@ -31,6 +33,10 @@ public class Dstore { int timeout = Integer.parseInt(args[2]); String fileFolder = args[3]; + if(port < 0 || cport < 0 || timeout < 0) { + throw new Exception("Infeasible values provided as arguments"); + } + Dstore dstore = new Dstore(port, cport, timeout, fileFolder); dstore.start(); } @@ -42,6 +48,10 @@ public class Dstore { System.out.println("Command line arguments must be integers"); return; } + catch(Exception e) { + e.printStackTrace(); + return; + } } public void start() { @@ -55,8 +65,10 @@ public class Dstore { new Thread(() -> { while(true) { try { - String[] message = controllerIn.readLine().split(" "); - handleMessage(message, socket, controllerIn); + String message = controllerIn.readLine(); + if(message != null) { + handleMessage(message.split(" "), socket, controllerIn); + } } catch(Exception e) { e.printStackTrace(); @@ -133,7 +145,7 @@ public class Dstore { controllerOut.close(); if(fileSizes.containsKey(filename)) fileSizes.remove(filename); - fileSizes.add(filename, filesize); + fileSizes.put(filename, filesize); } catch(IOException e) { e.printStackTrace(); @@ -160,7 +172,7 @@ public class Dstore { OutputStream contentOut = client.getOutputStream(); byte[] buf = new byte[8]; while(reader.read(buf) != -1) { - contentOut.write(new String(buf)); + contentOut.write(buf); contentOut.flush(); } @@ -199,101 +211,92 @@ public class Dstore { void list() throws Exception { new Thread(() -> { - try { - //Send a list of all files in fileFolder to client (the controller) - for(File file : new File(fileFolder).listFiles()) { - controllerOut.println(file.getName()); - controllerOut.flush(); - } - } - catch(IOException e) { - e.printStackTrace(); + //Send a list of all files in fileFolder to client (the controller) + for(File file : new File(fileFolder).listFiles()) { + controllerOut.println(file.getName()); + controllerOut.flush(); } }).start(); } void rebalance(String[] message) throws Exception { new Thread(() -> { - try { - //Interpret files to send and files to remove from the message - Map<Integer,List<String>> filesToSend; - String[] filesToRemove; - int index; - - int numberToSend = Integer.parseInt(message[1]); - index = 2; - filesToSend = new HashMap<Integer,List<String>>(); - for(int i=0; i<numberToSend; i++) { - String name = message[index]; - index++; - - int numberOfReceivers = Integer.parseInt(message[index]); - index++; - for(int j=0; j<numberOfReceivers; j++) { - Integer receiver = Integer.parseInt(message[index]); - if(!filesToSend.containsKey(receiver)) { - filesToSend.put(receiver,new ArrayList<String>()); - } - filesToSend.get(receiver).add(name); - index++; - } - } + //Interpret files to send and files to remove from the message + Map<Integer,List<String>> filesToSend; + String[] filesToRemove; + int index; + + int numberToSend = Integer.parseInt(message[1]); + index = 2; + filesToSend = new HashMap<Integer,List<String>>(); + for(int i=0; i<numberToSend; i++) { + String name = message[index]; + index++; - int numberToRemove = Integer.parseInt(message[index]); + int numberOfReceivers = Integer.parseInt(message[index]); index++; - filesToRemove = new String[numberToRemove]; - for(int k=0; k<numberToRemove; k++) { - filesToRemove[k] = message[index]; + for(int j=0; j<numberOfReceivers; j++) { + Integer receiver = Integer.parseInt(message[index]); + if(!filesToSend.containsKey(receiver)) { + filesToSend.put(receiver,new ArrayList<String>()); + } + filesToSend.get(receiver).add(name); index++; } - - //Send each file to send to the Dstore at the specified port number - for(Integer dstore : filesToSend.keySet()) { - for(String filename : filesToSend.get(dstore)) { - new Thread(() -> { - try { - Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); - PrintWriter out = new PrintWriter(socket.getOutputStream()); - out.println("STORE " + filename + " " + fileSizes.get(filename)); - out.flush(); - out.close(); - - BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); - if(!in.readLine().equals("ACK")) { - //Log error - } - in.close(); - - byte[] content = new byte[8]; - FileInputStream fileIn = new FileInputStream(fileFolder + "/" + filename); - OutputStream fileOut = client.getOutputStream(); - while(fileIn.read(content) > 0) { - fileOut.write(content); - fileOut.flush(); - } - fileIn.close(); - fileOut.close(); - socket.close(); + } + + int numberToRemove = Integer.parseInt(message[index]); + index++; + filesToRemove = new String[numberToRemove]; + for(int k=0; k<numberToRemove; k++) { + filesToRemove[k] = message[index]; + index++; + } + + //Send each file to send to the Dstore at the specified port number + for(Integer dstore : filesToSend.keySet()) { + for(String filename : filesToSend.get(dstore)) { + new Thread(() -> { + try { + Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); + PrintWriter out = new PrintWriter(socket.getOutputStream()); + out.println("STORE " + filename + " " + fileSizes.get(filename)); + out.flush(); + out.close(); + + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + if(!in.readLine().equals("ACK")) { + //Log error } - catch(IOException e) { - e.printStackTrace(); + in.close(); + + byte[] content = new byte[8]; + FileInputStream fileIn = new FileInputStream(fileFolder + "/" + filename); + OutputStream fileOut = socket.getOutputStream(); + while(fileIn.read(content) > 0) { + fileOut.write(content); + fileOut.flush(); } - }).start(); - } - } - - //Remove each file to remove from fileFolder - for(String filename : filesToRemove) { - new File(fileFolder + "/" + filename).delete(); + fileIn.close(); + fileOut.close(); + socket.close(); + } + catch(IOException e) { + e.printStackTrace(); + } + }).start(); } - - //Send REBALANCE_COMPLETE message to client (the controller) - controllerOut.print("REBALANCE COMPLETE"); - controllerOut.flush(); } - catch(IOException e) { - e.printStackTrace(); + + //Remove each file to remove from fileFolder + for(String filename : filesToRemove) { + new File(fileFolder + "/" + filename).delete(); } + + //Send REBALANCE_COMPLETE message to client (the controller) + controllerOut.print("REBALANCE COMPLETE"); + controllerOut.flush(); + //TO DO: WAIT FOR RESPONSES BEFORE SENDING REBALANCE_COMPLETE }).start(); } } diff --git a/DstoreConnection.class b/DstoreConnection.class new file mode 100644 index 0000000000000000000000000000000000000000..46f9894507e1969d0e9e198cd8bf1d8fda9c031a Binary files /dev/null and b/DstoreConnection.class differ diff --git a/DstoreConnection.java b/DstoreConnection.java index 39751a317f6d1b4e2a89dd36c240c3174aca5eaf..2d426a52747b32dcac0edab3a0d2307e129f8ebe 100644 --- a/DstoreConnection.java +++ b/DstoreConnection.java @@ -11,29 +11,33 @@ public class DstoreConnection { protected Socket socket; protected BufferedReader reader; protected PrintWriter writer; + protected boolean available; public DstoreConnection(Socket socket) { this.socket = socket; - reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); - writer = new PrintWriter(socket.getOutputStream()); + try { + reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + writer = new PrintWriter(socket.getOutputStream()); + available = true; + } + catch(IOException e) { + e.printStackTrace(); + available = false; + } } public String sendAndReceive(String message) { synchronized(this) { - try { - writer.println(message); - writer.flush(); - } - catch(IOException e) { - e.printStackTrace(); - return ""; - } + if(!available) return "ERROR"; + writer.println(message); + writer.flush(); return localReceive(); } } public String receive() { synchronized(this) { + if(!available) return "ERROR"; return localReceive(); } } diff --git a/javac.20210419_161632.args b/javac.20210419_161632.args new file mode 100644 index 0000000000000000000000000000000000000000..159b3933b6658f4b9e6b60f45d28c5f300041a5d --- /dev/null +++ b/javac.20210419_161632.args @@ -0,0 +1 @@ +Controller.java