diff --git a/Controller$1AcksReceived.class b/Controller$1AcksReceived.class index de2954fe46168c4914df9082fb7b12d7647ea5ed..76f9c7cf8a9ab14a2ee5a059bcaaa1606c87a522 100644 Binary files a/Controller$1AcksReceived.class and b/Controller$1AcksReceived.class differ diff --git a/Controller$1RequireHandle.class b/Controller$1RequireHandle.class new file mode 100644 index 0000000000000000000000000000000000000000..43fd8ea05aa94b6a0013a6c872b46ea016bd4086 Binary files /dev/null and b/Controller$1RequireHandle.class differ diff --git a/Controller$RebalanceThread.class b/Controller$RebalanceThread.class index 25e1c787e72efd4e87508bd8fff5ab539839fb1a..d2090d52270f1056374e16e0e80540658b15610f 100644 Binary files a/Controller$RebalanceThread.class and b/Controller$RebalanceThread.class differ diff --git a/Controller$RequireHandle.class b/Controller$RequireHandle.class new file mode 100644 index 0000000000000000000000000000000000000000..d10860a1501c7cd444e6230df4a9437cf76f3b3a Binary files /dev/null and b/Controller$RequireHandle.class differ diff --git a/Controller.class b/Controller.class index f8f4f7fba84db7c0c456b8882bec45b194ed4beb..d9b8ef8a7fda28c4397e7c61a2fd5d1f92209ffe 100644 Binary files a/Controller.class and b/Controller.class differ diff --git a/Controller.java b/Controller.java index de3651a37cee5e217346e0e781f6def63ef82d9b..9b13ac0c8b2e8944accd3a9a849783b5aae59ee3 100644 --- a/Controller.java +++ b/Controller.java @@ -155,8 +155,9 @@ public class Controller { BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); String[] message = in.readLine().split(" "); if(message[0].equals("JOIN")) { - dstores.put(Integer.parseInt(message[1]), new DstoreConnection(client, timeout)); - System.out.println("Dstore at " + message[1] + " joined"); + int portNumber = Integer.parseInt(message[1]); + dstores.put(portNumber, new DstoreConnection(client, portNumber, timeout)); + System.out.println("Dstore at " + portNumber + " joined"); try {rebalanceThread.interrupt();} catch(SecurityException e) {e.printStackTrace();} } else { @@ -189,7 +190,7 @@ public class Controller { } catch(Exception e) { //Log error - System.out.println("Controller error while accepting connections!"); + System.out.println("Error accepting new connection"); System.out.println("Continue..."); } } @@ -521,42 +522,43 @@ public class Controller { //Each file appears rFactor times //Each file appears at most once on each datastore //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) - List<Integer> storeOrder = reshuffle(dstoreFiles.keySet()); + + //First, compile all the received files from all dstores into one list + //(This should contain all the elements of index.keySet(), so is probably made redundant) List<String> fileList = new ArrayList<String>(); - for(Integer i : storeOrder) { - for(String s : dstoreFiles.get(i)) { + for(List<String> l : dstoreFiles.values()) { + for(String s : l) { if(!fileList.contains(s)) { fileList.add(s); } } } - Map<Integer,List<String>> requireIndex = new HashMap<Integer,List<String>>(); + //Create a new index each for files required and files to remove + Map<Integer,List<RequireHandle>> requireIndex = new HashMap<Integer,List<RequireHandle>>(); Map<Integer,List<String>> removeIndex = new HashMap<Integer,List<String>>(); int pos = 0; int storeSize = (int) Math.ceil((fileList.size() * rFactor) / dstores.size()); for(Integer i : dstoreFiles.keySet()) { - requireIndex.put(i, new ArrayList<String>()); + requireIndex.put(i, new ArrayList<RequireHandle>()); removeIndex.put(i, new ArrayList<String>()); } - Iterator<Integer> it = null; + + //Insert files into the new indexes. These are allocated according to the new store order + List<Integer> storeOrder; + Iterator<Integer> it; for(String file : fileList) { + storeOrder = reshuffle(dstoreFiles.keySet()); + it = storeOrder.iterator(); for(int j=0; j<rFactor; j++) { - if(it == null || !it.hasNext()) { - it = storeOrder.iterator(); - } //If indexed dstore does not have the file, add it to its requireIndex entry Integer thisStore = it.next(); if(!dstoreFiles.get(thisStore).contains(file)) { - requireIndex.get(thisStore).add(file); + requireIndex.get(thisStore).add(new RequireHandle(file)); } } //Dstores not chosen in the above loop must have an entry added to removeIndex, if they have the file - for(int j=0; j<(requireIndex.size() - rFactor); j++) { - if(it == null || !it.hasNext()) { - it = storeOrder.iterator(); - } - + while(it.hasNext()) { Integer thisStore = it.next(); if(dstoreFiles.get(thisStore).contains(file)) { removeIndex.get(thisStore).add(file); @@ -564,6 +566,7 @@ public class Controller { } } + //This class acts as a holder for a modifiable integer value, so that threads can synchronize on its lock class AcksReceived { int value; public AcksReceived() { @@ -576,24 +579,31 @@ public class Controller { return value; } } + + //For each dstore, compile a new message based on the contents of the indexes then send this message and receive a reply AcksReceived acksReceived = new AcksReceived(); - for(Integer thisStore : storeOrder) { + for(Integer thisStore : dstoreFiles.keySet()) { + //Compose files to send List<String> sendMessages = new ArrayList<String>(); for(String file : dstoreFiles.get(thisStore)) { - if(isEmptyListMap(requireIndex)) break; + //All files required by other dstores are getting sent = no need for the following computation + if(allHandled(requireIndex)) break; String fileMessage = ""; for(Integer otherStore : requireIndex.keySet()) { if(thisStore.equals(otherStore)) continue; - for(String otherFile : requireIndex.get(otherStore)) { - if(file.equals(otherFile)) { - requireIndex.get(otherStore).remove(otherFile); - fileMessage = fileMessage + " " + otherStore.toString(); + for(RequireHandle otherHandle : requireIndex.get(otherStore)) { + if(file.equals(otherHandle.filename)) { + if(!otherHandle.handled) { + //Another store requires a file that this store has - send it there + otherHandle.handled = true; + fileMessage = fileMessage + " " + otherStore.toString(); + } break; } } } - if(fileMessage.equals("")) continue; + if(fileMessage.equals("")) continue; //No files to send fileMessage = file + " " + (fileMessage.trim().split(" ").length) + fileMessage; sendMessages.add(fileMessage); } @@ -602,6 +612,7 @@ public class Controller { for(String s : sendMessages) { message = message + " " + s; } + //Compose files to remove message = message + " " + removeIndex.get(thisStore).size(); for(String f : removeIndex.get(thisStore)) { message = message + " " + f; @@ -611,7 +622,8 @@ public class Controller { String finalMessage = message; new Thread(() -> { try { - String returnMessage = dstores.get(thisStore).sendAndReceive(finalMessage, "REBALANCE_COMPLETE"); + DstoreConnection connection = dstores.get(thisStore); + String returnMessage = connection.sendAndReceive(finalMessage, "REBALANCE_COMPLETE"); if(!returnMessage.equals("REBALANCE_COMPLETE")) { //Log error System.out.println("Dstore " + thisStore + " should have sent REBALANCE_COMPLETE but Controller received " + returnMessage); @@ -619,10 +631,18 @@ public class Controller { synchronized(acksReceived) { acksReceived.incr(); - if(acksReceived.getValue() == storeOrder.size()) { + if(acksReceived.getValue() == dstoreFiles.size()) { acksReceived.notifyAll(); } } + + for(int i=0; i<requireIndex.get(thisStore).size(); i++) { + returnMessage = connection.receive("STORE_ACK"); + if(!returnMessage.split(" ")[0].equals("STORE_ACK")) { + //Log error + System.out.println("Dstore " + thisStore + " should have sent STORE_ACK but Controller received " + returnMessage); + } + } } catch(DstoreDisconnectException e) { e.printStackTrace(); @@ -636,12 +656,12 @@ public class Controller { try { System.out.println("Waiting for REBALANCE_COMPLETE..."); acksReceived.wait(timeout); - if(acksReceived.getValue() < storeOrder.size()) { + if(acksReceived.getValue() < dstoreFiles.size()) { //Restart rebalance operation System.out.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation..."); success = false; } - else if(acksReceived.getValue() > storeOrder.size()) { + else if(acksReceived.getValue() > dstoreFiles.size()) { System.out.println("Too many REBALANCE_COMPLETEs received"); } } @@ -655,8 +675,8 @@ public class Controller { Iterator<Integer> jt = requireIndex.keySet().iterator(); while(jt.hasNext()) { Integer dstore = jt.next(); - for(String file : requireIndex.get(dstore)) { - index.get(file).addStoredBy(Integer.valueOf(dstore), false); + for(RequireHandle handle : requireIndex.get(dstore)) { + index.get(handle.filename).addStoredBy(Integer.valueOf(dstore), false); } for(String file : removeIndex.get(dstore)) { index.get(file).removeStoredBy(Integer.valueOf(dstore)); @@ -721,10 +741,20 @@ public class Controller { return list; } - <T,U> boolean isEmptyListMap(Map<T,List<U>> map) { - for(List<U> list : map.values()) { - if(!list.isEmpty()) { - return false; + //Helper class for rebalance method - contains a filename and a boolean which is true if a dstore is going to send this file + protected class RequireHandle { + public String filename; + public boolean handled; + public RequireHandle(String filename) { + this.filename = filename; + handled = false; + } + } + + boolean allHandled(Map<Integer,List<RequireHandle>> map) { + for(List<RequireHandle> list : map.values()) { + for(RequireHandle handle : list) { + if(!handle.handled) return false; } } return true; diff --git a/Dstore.class b/Dstore.class index ff1e5b46bfd884be9e7581f53b80d4c57d1bce07..18d98f5ac91d2e18bbf60ed629b2f407af49e0ce 100644 Binary files a/Dstore.class and b/Dstore.class differ diff --git a/Dstore.java b/Dstore.java index f9c67703844d390cfb9b4c8e245a6873ae7bb093..4bdde39763a54ff48a58c78cb531788628bf324a 100644 --- a/Dstore.java +++ b/Dstore.java @@ -7,6 +7,8 @@ import java.util.List; import java.util.ArrayList; import java.util.Map; import java.util.HashMap; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; public class Dstore { protected int port; //Port to listen on @@ -68,8 +70,8 @@ public class Dstore { } public void start() { - try { - controllerSocket = new Socket(InetAddress.getLocalHost(), cport); + try(Socket controllerSocket = new Socket(InetAddress.getLocalHost(), cport)) { + this.controllerSocket = controllerSocket; controllerIn = new BufferedReader(new InputStreamReader(controllerSocket.getInputStream())); controllerOut = new PrintWriter(controllerSocket.getOutputStream()); controllerOut.println("JOIN " + port); @@ -151,8 +153,10 @@ public class Dstore { writer.close(); //Send STORE_ACK message to the Controller - controllerOut.println("STORE_ACK " + filename); - controllerOut.flush(); + synchronized(controllerOut) { + controllerOut.println("STORE_ACK " + filename); + controllerOut.flush(); + } if(fileSizes.containsKey(filename)) fileSizes.remove(filename); fileSizes.put(filename, Long.valueOf(filesize)); @@ -211,14 +215,18 @@ public class Dstore { if(Files.deleteIfExists(path)) { //Send REMOVE_ACK message to client (the controller) - controllerOut.println("REMOVE_ACK " + filename); + synchronized(controllerOut) { + controllerOut.println("REMOVE_ACK " + filename); + controllerOut.flush(); + } } else { //Send DOES NOT EXIST error - controllerOut.println("ERROR DOES_NOT_EXIST " + filename); + synchronized(controllerOut) { + controllerOut.println("ERROR DOES_NOT_EXIST " + filename); + controllerOut.flush(); + } } - - controllerOut.flush(); } catch(IOException e) { e.printStackTrace(); @@ -234,8 +242,10 @@ public class Dstore { message = message + " " + file.getName(); } if(message.equals("")) message = "ERROR_EMPTY"; - controllerOut.println(message.trim()); - controllerOut.flush(); + synchronized(controllerOut) { + controllerOut.println(message.trim()); + controllerOut.flush(); + } }).start(); } @@ -253,6 +263,7 @@ public class Dstore { } System.out.println("Interpreting message:" + tmessage); int numberToSend = Integer.parseInt(message[1]); + int totalReceivers = 0; index = 2; filesToSend = new HashMap<Integer,List<String>>(); for(int i=0; i<numberToSend; i++) { @@ -260,6 +271,7 @@ public class Dstore { index++; int numberOfReceivers = Integer.parseInt(message[index]); + totalReceivers += numberOfReceivers; index++; for(int j=0; j<numberOfReceivers; j++) { Integer receiver = Integer.parseInt(message[index]); @@ -281,6 +293,7 @@ public class Dstore { System.out.println("Interpreting complete, will send " + numberToSend + " and remove " + numberToRemove); //Send each file to send to the Dstore at the specified port number + CyclicBarrier barrier = new CyclicBarrier(totalReceivers + 1); for(Integer dstore : filesToSend.keySet()) { for(String filename : filesToSend.get(dstore)) { new Thread(() -> { @@ -314,9 +327,13 @@ public class Dstore { catch(IOException e) { e.printStackTrace(); } + finally { + try {barrier.await();} catch(Exception e) {} + } }).start(); } } + try {barrier.await((long) timeout, TimeUnit.MILLISECONDS);} catch(Exception e) {e.printStackTrace();} //Remove each file to remove from fileFolder for(String filename : filesToRemove) { @@ -325,8 +342,10 @@ public class Dstore { } //Send REBALANCE_COMPLETE message to client (the controller) - controllerOut.println("REBALANCE_COMPLETE"); - controllerOut.flush(); + synchronized(controllerOut) { + controllerOut.println("REBALANCE_COMPLETE"); + controllerOut.flush(); + } System.out.println("Sent message REBALANCE_COMPLETE"); //TO DO: WAIT FOR RESPONSES BEFORE SENDING REBALANCE_COMPLETE }).start(); diff --git a/DstoreConnection$ReceiveContext.class b/DstoreConnection$ReceiveContext.class index 29a6f8871a554ee11deeab64ee52990e0810f08f..6276604e77c845f0e2b76c24946dc27deae658f0 100644 Binary files a/DstoreConnection$ReceiveContext.class and b/DstoreConnection$ReceiveContext.class differ diff --git a/DstoreConnection.class b/DstoreConnection.class index 78bdda2b66cc70e62e1b5a96c7af9d389228ec03..26f8d2d10198470a3bbf2d10eb9e8a59c8d2a38f 100644 Binary files a/DstoreConnection.class and b/DstoreConnection.class differ diff --git a/DstoreConnection.java b/DstoreConnection.java index 4778e9921fcc9a78d81dfa445f046bb585380266..2db4b30bded6f4e39473d6dc1acfcc1a7fce83ab 100644 --- a/DstoreConnection.java +++ b/DstoreConnection.java @@ -11,20 +11,24 @@ public class DstoreConnection { protected final int MAX_QUEUE_SIZE = 50; protected Socket socket; + protected int port; //Solely used for debugging purposes protected BufferedReader reader; protected PrintWriter writer; protected boolean available; protected List<String> queue; protected int timeout; + protected DstoreDisconnectException disconnectException; - public DstoreConnection(Socket socket, int timeout) { + public DstoreConnection(Socket socket, int port, int timeout) { this.socket = socket; + this.port = port; this.timeout = timeout; try { reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); writer = new PrintWriter(socket.getOutputStream()); available = true; queue = new ArrayList<String>(); + disconnectException = new DstoreDisconnectException(port); } catch(IOException e) { e.printStackTrace(); @@ -44,13 +48,13 @@ public class DstoreConnection { if(!available) return "ERROR"; writer.println(message); writer.flush(); - System.out.println("Controller sent " + message); + System.out.println("Controller sent " + message + " to port " + port); return localReceive(expectedMessage); } catch(NullPointerException e) { - System.out.println("Dstore disconnected"); + System.out.println("Dstore at port " + port + " disconnected"); available = false; - throw new DstoreDisconnectException(); + throw disconnectException; } } } @@ -91,7 +95,7 @@ public class DstoreConnection { rc.end(); } String returnMessage = rc.getReturnMessage(); - if(returnMessage == null) throw new DstoreDisconnectException(); + if(returnMessage == null) throw disconnectException; else return returnMessage; } catch(InterruptedException e) { @@ -151,9 +155,8 @@ public class DstoreConnection { do { message = reader.readLine(); if(message == null) { - System.out.println("Dstore disconnected"); available = false; - throw new DstoreDisconnectException(); + throw disconnectException; } if(expectedMessage != null && !expectedMessage.equals(message.split(" ")[0])) { queue.add(message); @@ -162,7 +165,7 @@ public class DstoreConnection { } } while(message == null); - System.out.println("Controller received " + message); + System.out.println("Controller received " + message + " from port " + port); returnMessage = message; } catch(IOException e) { diff --git a/DstoreDisconnectException.class b/DstoreDisconnectException.class index f1a70dafe9f2cfe9e9879cbf5b4fc17a31599470..22b4ca5149e1537a79318afa75ce420aba79414b 100644 Binary files a/DstoreDisconnectException.class and b/DstoreDisconnectException.class differ diff --git a/DstoreDisconnectException.java b/DstoreDisconnectException.java index 085512f94676526c5f35c1125e86d51051dcabbd..82cd0fdf8b00dac6745d0afd5c0c32cced9493d2 100644 --- a/DstoreDisconnectException.java +++ b/DstoreDisconnectException.java @@ -1,7 +1,7 @@ import java.lang.Throwable; public class DstoreDisconnectException extends Exception { - public DstoreDisconnectException() { - super("Dstore has been disconnected"); + public DstoreDisconnectException(int port) { + super("Dstore at port " + port + " has been disconnected"); } } diff --git a/Execute.sh b/Execute.sh new file mode 100755 index 0000000000000000000000000000000000000000..0f70e0c2b840b4fb518b2f3acfd8a219e99b2986 --- /dev/null +++ b/Execute.sh @@ -0,0 +1,13 @@ +#!/bin/bash +java Controller 8080 $1 $3 $4 & +echo $! +for((i=1; i<=$2; i++)) do + sleep 0.2 + n=$((8080+$i)) + echo $n + s="store$i" + java Dstore $n 8080 $3 $s & + echo $! +done +sleep 2 +java -cp .:client-1.0.0.jar ClientMain 8080 $3 diff --git a/javac.20210419_161632.args b/javac.20210419_161632.args deleted file mode 100644 index 159b3933b6658f4b9e6b60f45d28c5f300041a5d..0000000000000000000000000000000000000000 --- a/javac.20210419_161632.args +++ /dev/null @@ -1 +0,0 @@ -Controller.java