diff --git a/Controller.java b/Controller.java index 756576d5330958e8884de8a1eb3eaf072debf8cc..1da9704e17bb11d075d3cd8cc652ea297c1b3a93 100644 --- a/Controller.java +++ b/Controller.java @@ -100,11 +100,70 @@ public class Controller { public long filesize; } + //Rebalances only start when there are no executing threads + protected class RebalanceLock { + protected int processes; + protected boolean highPriorityWait; + protected CountDownLatch periodBlock; + protected Object blockLock; + + public RebalanceLock() { + periodBlock = new CountDownLatch(1); + blockLock = new Object(); + } + + public synchronized void addProcess() throws InterruptedException { + while(highPriorityWait) { + this.wait(); + } + + processes ++; + } + + public synchronized void removeProcess() { + processes --; + if(processes == 0) this.notifyAll(); + } + + public void waitForFinish() { + while(processes > 0) { + highPriorityWait = true; + try { + this.wait(); + } + catch(InterruptedException e) {e.printStackTrace();} + } + highPriorityWait = false; + } + + public void queueRebalance() { + synchronized(blockLock) { + periodBlock.countDown(); + } + } + + public boolean waitToRebalance() { + try { + boolean dstoreJoined = periodBlock.await(rebalancePeriod, TimeUnit.MILLISECONDS); + if(dstoreJoined) { + synchronized(blockLock) { + periodBlock = new CountDownLatch(1); + } + } + return dstoreJoined; + } + catch(InterruptedException e) {e.printStackTrace();} + return true; + } + } + protected Map<Integer,DstoreConnection> dstores; protected RebalanceMessages rebalanceMessages; protected Map<String,IndexEntry> index; protected Map<Socket,Reloader> loadRequests; + protected RebalanceLock rebalanceLock; + public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) { this.cport = cport; this.rFactor = rFactor; @@ -114,6 +173,7 @@ public class Controller { rebalanceMessages = new RebalanceMessages(); index = Collections.synchronizedMap(new HashMap<String,IndexEntry>()); loadRequests = Collections.synchronizedMap(new HashMap<Socket,Reloader>()); + rebalanceLock = new RebalanceLock(); } public static void main(String[] args) { @@ -159,15 +219,17 @@ public class Controller { if(tMessage == null) {message = new String[]{""};} else {message = tMessage.split(" ");} - if(message[0].equals("JOIN")) { - int portNumber = Integer.parseInt(message[1]); - dstores.put(portNumber, new DstoreConnection(client, portNumber, timeout)); - System.out.println("Dstore at " + portNumber + " joined"); - try {rebalanceThread.interrupt();} catch(SecurityException e) {e.printStackTrace();} - } - else { - System.out.println("A new client has joined"); - new Thread(() -> { + new Thread(() -> { + if(message[0].equals("JOIN")) { + int portNumber = Integer.parseInt(message[1]); + synchronized(rebalanceLock) { + dstores.put(portNumber, new DstoreConnection(client, portNumber, timeout)); + System.out.println("Dstore at " + portNumber + " joined"); + rebalanceLock.queueRebalance(); + } + } + else { + System.out.println("A new client has joined"); try { handleMessage(message, client); } @@ -191,8 +253,8 @@ public class Controller { while(clientMessage != null); System.out.println("Client closed"); try {client.close();} catch(IOException e) {} - }).start(); - } + } + }).start(); } catch(Exception e) { //Log error @@ -210,13 +272,25 @@ public class Controller { protected class RebalanceThread implements Runnable { public void run() { while(true) { - try {Thread.sleep(rebalancePeriod);} catch(InterruptedException e) { - try {rebalance();} catch(Exception ee) {ee.printStackTrace();} + if(rebalanceLock.waitToRebalance()) { + try {runRebalance();} catch(Exception e) {e.printStackTrace();} } - try { - if(dstores.size() >= rFactor) { - rebalance(); + else { + try { + if(dstores.size() >= rFactor) { + runRebalance(); + } } + catch(Exception e) {e.printStackTrace();} + } + } + } + + protected void runRebalance() { + synchronized(rebalanceLock) { + try { + rebalanceLock.waitForFinish(); + rebalance(); } catch(Exception e) {e.printStackTrace();} } @@ -224,30 +298,41 @@ public class Controller { } void handleMessage(String[] message, Socket client) throws Exception { - if(dstores.size() < rFactor) { - PrintWriter out = new PrintWriter(client.getOutputStream()); - out.println("ERROR_NOT_ENOUGH_DSTORES"); - out.flush(); - } - else if(message[0].equals("STORE")) { - store(client, message[1], message[2]); - } - else if(message[0].equals("LOAD")) { - load(client, message[1]); - } - else if(message[0].equals("RELOAD")) { - sendLoadFrom(client, message[1]); - } - else if(message[0].equals("REMOVE")) { - remove(client, message[1]); - } - else if(message[0].equals("LIST")) { - list(client); - } - else { - //Log error - System.out.println("Malformed message received by Controller"); + try { + rebalanceLock.addProcess(); + + try { + if(dstores.size() < rFactor) { + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.println("ERROR_NOT_ENOUGH_DSTORES"); + out.flush(); + } + else if(message[0].equals("STORE")) { + store(client, message[1], message[2]); + } + else if(message[0].equals("LOAD")) { + load(client, message[1]); + } + else if(message[0].equals("RELOAD")) { + sendLoadFrom(client, message[1]); + } + else if(message[0].equals("REMOVE")) { + remove(client, message[1]); + } + else if(message[0].equals("LIST")) { + list(client); + } + else { + //Log error + System.out.println("Malformed message received by Controller"); + } + } + catch(Exception e) {e.printStackTrace();} + finally { + rebalanceLock.removeProcess(); + } } + catch(InterruptedException e) {e.printStackTrace();} } void store(Socket client, String filename, String filesizeString) throws Exception { @@ -449,7 +534,7 @@ public class Controller { entry.status = IndexEntry.Status.REMOVE_IN_PROGRESS; //Send REMOVE message to all Dstores storing the file - CyclicBarrier barrier = new CyclicBarrier(entry.getStoredBy().size()); + CyclicBarrier barrier = new CyclicBarrier(entry.getStoredBy().size() + 1); for(Integer dstore : entry.getStoredBy()) { new Thread(() -> { try { @@ -505,7 +590,7 @@ public class Controller { //Update index to "remove complete" entry.status = IndexEntry.Status.REMOVE_COMPLETE; synchronized(index) { - if(index.containsKey(filename)) index.remove(filename); + if(index.get(filename) == entry) index.remove(filename); } //Send REMOVE_COMPLETE to client @@ -543,210 +628,215 @@ public class Controller { synchronized(rebalanceMessages) { if(rebalanceMessages.dstoreFiles != null) return; rebalanceMessages.dstoreFiles = dstoreFiles; - CyclicBarrier barrier = new CyclicBarrier(dstores.size()); + } + CyclicBarrier barrier = new CyclicBarrier(dstores.size() + 1); + try { + //Send LIST message to each Dstore and receive their file list + for(Integer dstore : dstores.keySet()) { + new Thread(() -> { + try { + String[] message = dstores.get(dstore).sendAndReceive("LIST").split(" "); + receiveDstoreList(dstore.intValue(), message, barrier); + } + catch(DstoreDisconnectException e) { + e.printStackTrace(); + removeDstore(dstore); + } + }).start(); + } + try { - //Send LIST message to each Dstore and receive their file list - for(Integer dstore : dstores.keySet()) { - new Thread(() -> { - try { - String[] message = dstores.get(dstore).sendAndReceive("LIST").split(" "); - receiveDstoreList(dstore.intValue(), message, barrier); - } - catch(DstoreDisconnectException e) { - e.printStackTrace(); - removeDstore(dstore); - } - }).start(); - } - - try { - barrier.await(timeout, TimeUnit.MILLISECONDS); - } - catch(TimeoutException e) { - //Log error - System.out.println("Not all file lists have been received"); - } - catch(Exception e) {e.printStackTrace();} + barrier.await(timeout, TimeUnit.MILLISECONDS); + } + catch(TimeoutException e) { + //Log error + System.out.println("Not all file lists have been received"); + } + catch(Exception e) {e.printStackTrace();} + + synchronized(rebalanceMessages) { rebalanceMessages.dstoreFiles = null; - /* - if(dstoreFiles.size() < dstores.size()) { - //Log error - System.out.println("Not all file lists have been received"); - } - */ - - //Create a new file allocation so that: - //Each file appears rFactor times - //Each file appears at most once on each datastore - //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) - - //First, compile all the received files from all dstores into one list - List<String> fileList = new ArrayList<String>(); - for(List<String> l : dstoreFiles.values()) { - for(String s : l) { - if(!fileList.contains(s)) { - fileList.add(s); - } + } + /* + if(dstoreFiles.size() < dstores.size()) { + //Log error + System.out.println("Not all file lists have been received"); + } + */ + + //Create a new file allocation so that: + //Each file appears rFactor times + //Each file appears at most once on each datastore + //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) + + //First, compile all the received files from all dstores into one list + List<String> fileList = new ArrayList<String>(); + for(List<String> l : dstoreFiles.values()) { + for(String s : l) { + if(!fileList.contains(s)) { + fileList.add(s); } } - - //Create a new index each for files required and files to remove - Map<Integer,List<RequireHandle>> requireIndex = new HashMap<Integer,List<RequireHandle>>(); - Map<Integer,List<String>> removeIndex = new HashMap<Integer,List<String>>(); - int pos = 0; - int storeSize = (int) Math.ceil((fileList.size() * rFactor) / dstores.size()); - for(Integer i : dstoreFiles.keySet()) { - requireIndex.put(i, new ArrayList<RequireHandle>()); - removeIndex.put(i, new ArrayList<String>()); - } - - //Insert files into the new indexes. These are allocated according to the new store order - List<Integer> storeOrder; - Iterator<Integer> it; - for(String file : fileList) { - storeOrder = reshuffle(dstoreFiles.keySet()); - it = storeOrder.iterator(); - if(index.containsKey(file)) { - for(int j=0; j<rFactor; j++) { - //If indexed dstore does not have the file, add it to its requireIndex entry - Integer thisStore = it.next(); - if(!dstoreFiles.get(thisStore).contains(file)) { - requireIndex.get(thisStore).add(new RequireHandle(file)); - } - } - } - //Dstores not chosen in the above loop must have an entry added to removeIndex, if they have the file - //This also covers files which were missed in a previous remove operation - while(it.hasNext()) { + } + + //Create a new index each for files required and files to remove + Map<Integer,List<RequireHandle>> requireIndex = new HashMap<Integer,List<RequireHandle>>(); + Map<Integer,List<String>> removeIndex = new HashMap<Integer,List<String>>(); + int pos = 0; + int storeSize = (int) Math.ceil((fileList.size() * rFactor) / dstores.size()); + for(Integer i : dstoreFiles.keySet()) { + requireIndex.put(i, new ArrayList<RequireHandle>()); + removeIndex.put(i, new ArrayList<String>()); + } + + //Insert files into the new indexes. These are allocated according to the new store order + List<Integer> storeOrder; + Iterator<Integer> it; + for(String file : fileList) { + storeOrder = reshuffle(dstoreFiles.keySet()); + it = storeOrder.iterator(); + if(index.containsKey(file)) { + for(int j=0; j<rFactor; j++) { + //If indexed dstore does not have the file, add it to its requireIndex entry Integer thisStore = it.next(); - if(dstoreFiles.get(thisStore).contains(file)) { - removeIndex.get(thisStore).add(file); + if(!dstoreFiles.get(thisStore).contains(file)) { + requireIndex.get(thisStore).add(new RequireHandle(file)); } } } - - //For each dstore, compile a new message based on the contents of the indexes then send this message and receive a reply - CyclicBarrier barrier2 = new CyclicBarrier(dstoreFiles.size()); - for(Integer thisStore : dstoreFiles.keySet()) { - //Compose files to send - List<String> sendMessages = new ArrayList<String>(); - for(String file : dstoreFiles.get(thisStore)) { - //All files required by other dstores are getting sent = no need for the following computation - if(allHandled(requireIndex)) break; - - String fileMessage = ""; - for(Integer otherStore : requireIndex.keySet()) { - if(thisStore.equals(otherStore)) continue; - for(RequireHandle otherHandle : requireIndex.get(otherStore)) { - if(file.equals(otherHandle.filename)) { - if(!otherHandle.handled) { - //Another store requires a file that this store has - send it there - otherHandle.handled = true; - fileMessage = fileMessage + " " + otherStore.toString(); - } - break; - } - } - } - if(fileMessage.equals("")) continue; //No files to send - fileMessage = file + " " + (fileMessage.trim().split(" ").length) + fileMessage; - sendMessages.add(fileMessage); - } - - String message = "REBALANCE " + sendMessages.size(); - for(String s : sendMessages) { - message = message + " " + s; - } - //Compose files to remove - message = message + " " + removeIndex.get(thisStore).size(); - for(String f : removeIndex.get(thisStore)) { - message = message + " " + f; + //Dstores not chosen in the above loop must have an entry added to removeIndex, if they have the file + //This also covers files which were missed in a previous remove operation + while(it.hasNext()) { + Integer thisStore = it.next(); + if(dstoreFiles.get(thisStore).contains(file)) { + removeIndex.get(thisStore).add(file); } + } + } + + //For each dstore, compile a new message based on the contents of the indexes then send this message and receive a reply + CyclicBarrier barrier2 = new CyclicBarrier(dstoreFiles.size() + 1); + for(Integer thisStore : dstoreFiles.keySet()) { + //Compose files to send + List<String> sendMessages = new ArrayList<String>(); + for(String file : dstoreFiles.get(thisStore)) { + //All files required by other dstores are getting sent = no need for the following computation + if(allHandled(requireIndex)) break; - //Send message to the Dstore - String finalMessage = message; - new Thread(() -> { - try { - DstoreConnection connection = dstores.get(thisStore); - String returnMessage = connection.sendAndReceive(finalMessage, "REBALANCE_COMPLETE"); - if(!returnMessage.equals("REBALANCE_COMPLETE")) { - //Log error - System.out.println("Dstore " + thisStore + " should have sent REBALANCE_COMPLETE but Controller received " + returnMessage); - } - - new Thread(() -> {try {barrier2.await();} catch(Exception e) {e.printStackTrace();}}).start(); - - for(int i=0; i<requireIndex.get(thisStore).size(); i++) { - returnMessage = connection.receive("STORE_ACK"); - if(!returnMessage.split(" ")[0].equals("STORE_ACK")) { - //Log error - System.out.println("Dstore " + thisStore + " should have sent STORE_ACK but Controller received " + returnMessage); + String fileMessage = ""; + for(Integer otherStore : requireIndex.keySet()) { + if(thisStore.equals(otherStore)) continue; + for(RequireHandle otherHandle : requireIndex.get(otherStore)) { + if(file.equals(otherHandle.filename)) { + if(!otherHandle.handled) { + //Another store requires a file that this store has - send it there + otherHandle.handled = true; + fileMessage = fileMessage + " " + otherStore.toString(); } + break; } } - catch(DstoreDisconnectException e) { - e.printStackTrace(); - removeDstore(thisStore); - } - }).start(); + } + if(fileMessage.equals("")) continue; //No files to send + fileMessage = file + " " + (fileMessage.trim().split(" ").length) + fileMessage; + sendMessages.add(fileMessage); } - //Wait for REBALANCE_COMPLETE from all Dstores - try { - barrier2.await(timeout, TimeUnit.MILLISECONDS); + String message = "REBALANCE " + sendMessages.size(); + for(String s : sendMessages) { + message = message + " " + s; } - catch(TimeoutException e) { - //Restart rebalance operation - System.err.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation..."); - success = false; + //Compose files to remove + message = message + " " + removeIndex.get(thisStore).size(); + for(String f : removeIndex.get(thisStore)) { + message = message + " " + f; } - catch(Exception e) {e.printStackTrace();} - /* - synchronized(acksReceived) { + + //Send message to the Dstore + String finalMessage = message; + new Thread(() -> { try { - System.out.println("Waiting for REBALANCE_COMPLETE..."); - acksReceived.wait(timeout); - if(acksReceived.getValue() < dstoreFiles.size()) { - //Restart rebalance operation - System.out.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation..."); - success = false; + DstoreConnection connection = dstores.get(thisStore); + String returnMessage = connection.sendAndReceive(finalMessage, "REBALANCE_COMPLETE"); + if(!returnMessage.equals("REBALANCE_COMPLETE")) { + //Log error + System.out.println("Dstore " + thisStore + " should have sent REBALANCE_COMPLETE but Controller received " + returnMessage); } - else if(acksReceived.getValue() > dstoreFiles.size()) { - System.out.println("Too many REBALANCE_COMPLETEs received"); + + new Thread(() -> {try {barrier2.await();} catch(Exception e) {e.printStackTrace();}}).start(); + + for(int i=0; i<requireIndex.get(thisStore).size(); i++) { + returnMessage = connection.receive("STORE_ACK"); + if(!returnMessage.split(" ")[0].equals("STORE_ACK")) { + //Log error + System.out.println("Dstore " + thisStore + " should have sent STORE_ACK but Controller received " + returnMessage); + } } } - catch(InterruptedException e) { + catch(DstoreDisconnectException e) { e.printStackTrace(); + removeDstore(thisStore); + } + }).start(); + } + + //Wait for REBALANCE_COMPLETE from all Dstores + try { + barrier2.await(timeout, TimeUnit.MILLISECONDS); + } + catch(TimeoutException e) { + //Restart rebalance operation + System.err.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation..."); + success = false; + } + catch(Exception e) {e.printStackTrace();} + /* + synchronized(acksReceived) { + try { + System.out.println("Waiting for REBALANCE_COMPLETE..."); + acksReceived.wait(timeout); + if(acksReceived.getValue() < dstoreFiles.size()) { + //Restart rebalance operation + System.out.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation..."); + success = false; + } + else if(acksReceived.getValue() > dstoreFiles.size()) { + System.out.println("Too many REBALANCE_COMPLETEs received"); } } - */ - - if(success) { - synchronized(index) { - Iterator<Integer> jt = requireIndex.keySet().iterator(); - while(jt.hasNext()) { - Integer dstore = jt.next(); - for(RequireHandle handle : requireIndex.get(dstore)) { - index.get(handle.filename).addStoredBy(Integer.valueOf(dstore), false); - } - for(String file : removeIndex.get(dstore)) { - if(index.containsKey(file)) index.get(file).removeStoredBy(Integer.valueOf(dstore)); - } + catch(InterruptedException e) { + e.printStackTrace(); + } + } + */ + + if(success) { + synchronized(index) { + Iterator<Integer> jt = requireIndex.keySet().iterator(); + while(jt.hasNext()) { + Integer dstore = jt.next(); + for(RequireHandle handle : requireIndex.get(dstore)) { + index.get(handle.filename).addStoredBy(Integer.valueOf(dstore), false); + } + for(String file : removeIndex.get(dstore)) { + if(index.containsKey(file)) index.get(file).removeStoredBy(Integer.valueOf(dstore)); } } } } - catch(Exception e) { - e.printStackTrace(); + } + catch(Exception e) { + e.printStackTrace(); + } + finally { + synchronized(rebalanceMessages) { + if(rebalanceMessages.dstoreFiles != null) rebalanceMessages.dstoreFiles = null; } - finally { - rebalanceMessages.dstoreFiles = null; - System.out.println("There are " + dstores.size() + " dstores connected"); - for(String i : index.keySet()) { - System.out.print(i); - } - System.out.print("\n"); + System.out.println("There are " + dstores.size() + " dstores connected"); + for(String i : index.keySet()) { + System.out.print(i); } + System.out.print("\n"); } if(!success) rebalance();