diff --git a/Controller.java b/Controller.java index 48b137ae261f9676c96c8294b4f27893cbb6d7c7..a3d5fdef04b6d14bd9fecdbebfc5712928327ad7 100644 --- a/Controller.java +++ b/Controller.java @@ -6,6 +6,7 @@ import java.lang.Math; import java.util.Iterator; import java.util.List; import java.util.ArrayList; +import java.util.Comparator; import java.util.Map; import java.util.HashMap; import java.util.Set; @@ -13,6 +14,12 @@ import java.util.HashSet; import java.util.Collection; import java.util.Collections; +/* +TO DO: +Get rid of missing REMOVE_ACKs problem +Distrbute files evenly (check spec for correct number of files on each store) +*/ + public class Controller { protected int cport; //Port to listen on protected int rFactor; //Replication factor; each file is replicated across r Dstores @@ -329,11 +336,8 @@ public class Controller { //Select Dstores Integer[] storesToStore = new Integer[rFactor]; - synchronized(dstores) { - Iterator<Integer> it = reshuffle(dstores.keySet()).iterator(); - for(int i=0; i<rFactor; i++) { - storesToStore[i] = it.next(); - } + for(int i=0; i<rFactor; i++) { + storesToStore[i] = nextStoreInSequence(); } entry.setNumberToStore(rFactor); @@ -474,11 +478,13 @@ public class Controller { //Send REMOVE message to all Dstores storing the file CountDownLatch latch = new CountDownLatch(entry.getStoredBy().size()); - for(Integer dstore : entry.getStoredBy()) { + Iterator<Integer> it = entry.getStoredBy().iterator(); + while(it.hasNext()) { + Integer dstore = it.next(); new Thread(() -> { try { - String[] message = dstores.get(dstore).sendAndReceive(Protocol.REMOVE_TOKEN + " " + filename, Protocol.REMOVE_ACK_TOKEN).split(" "); - if(message[0].equals(Protocol.REMOVE_ACK_TOKEN) && message[1].equals(filename)) { + String[] message = dstores.get(dstore).sendAndReceive(Protocol.REMOVE_TOKEN + " " + filename, Protocol.REMOVE_ACK_TOKEN, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN).split(" "); + if((message[0].equals(Protocol.REMOVE_ACK_TOKEN) && message[1].equals(filename)) || message[0].equals(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN)) { entry.removeStoredBy(dstore.intValue()); latch.countDown(); } @@ -537,7 +543,6 @@ public class Controller { } void rebalance() throws Exception { - boolean success = true; Map<Integer,List<String>> dstoreFiles = new HashMap<Integer,List<String>>(); synchronized(rebalanceMessages) { @@ -583,6 +588,30 @@ public class Controller { } */ + Map<Integer,List<String>> newAlloc = allocate(dstoreFiles); + Map<Integer,String> sendIndex = composeRebalanceMessages(dstoreFiles, newAlloc); + CountDownLatch latch = new CountDownLatch(sendIndex.size()); + for(Integer dstore : sendIndex.keySet()) { + new Thread(() -> { + try { + DstoreConnection connection = dstores.get(dstore); + String returnMessage = connection.sendAndReceive(sendIndex.get(dstore), Protocol.REBALANCE_COMPLETE_TOKEN); + if(!returnMessage.equals(Protocol.REBALANCE_COMPLETE_TOKEN)) { + //Log error + System.out.println("Dstore " + dstore + " should have sent REBALANCE_COMPLETE but Controller received " + returnMessage); + } + + latch.countDown(); + } + catch(DstoreDisconnectException e) { + e.printStackTrace(); + removeDstore(e); + } + catch(Exception e) {e.printStackTrace();} + }).start(); + } + + /* //Create a new file allocation so that: //Each file appears rFactor times //Each file appears at most once on each datastore @@ -598,16 +627,6 @@ public class Controller { } } - //Create a new index each for files required and files to remove - Map<Integer,List<RequireHandle>> requireIndex = new HashMap<Integer,List<RequireHandle>>(); - Map<Integer,List<String>> removeIndex = new HashMap<Integer,List<String>>(); - int pos = 0; - int storeSize = (int) Math.ceil((fileList.size() * rFactor) / dstores.size()); - for(Integer i : dstoreFiles.keySet()) { - requireIndex.put(i, new ArrayList<RequireHandle>()); - removeIndex.put(i, new ArrayList<String>()); - } - //Insert files into the new indexes. These are allocated according to the new store order List<Integer> storeOrder; Iterator<Integer> it; @@ -633,6 +652,7 @@ public class Controller { } } + //For each dstore, compile a new message based on the contents of the indexes then send this message and receive a reply CountDownLatch latch = new CountDownLatch(dstoreFiles.size()); for(Integer thisStore : dstoreFiles.keySet()) { @@ -705,50 +725,16 @@ public class Controller { } }).start(); } + */ //Wait for REBALANCE_COMPLETE from all Dstores try { if(!latch.await(timeout, TimeUnit.MILLISECONDS)) { //Restart rebalance operation - System.err.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation..."); - success = false; + System.err.println("Not all REBALANCE_COMPLETEs received"); } } catch(Exception e) {e.printStackTrace();} - /* - synchronized(acksReceived) { - try { - System.out.println("Waiting for REBALANCE_COMPLETE..."); - acksReceived.wait(timeout); - if(acksReceived.getValue() < dstoreFiles.size()) { - //Restart rebalance operation - System.out.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation..."); - success = false; - } - else if(acksReceived.getValue() > dstoreFiles.size()) { - System.out.println("Too many REBALANCE_COMPLETEs received"); - } - } - catch(InterruptedException e) { - e.printStackTrace(); - } - } - */ - - if(success) { - synchronized(index) { - Iterator<Integer> jt = requireIndex.keySet().iterator(); - while(jt.hasNext()) { - Integer dstore = jt.next(); - for(RequireHandle handle : requireIndex.get(dstore)) { - index.get(handle.filename).addStoredBy(Integer.valueOf(dstore), false); - } - for(String file : removeIndex.get(dstore)) { - if(index.containsKey(file)) index.get(file).removeStoredBy(Integer.valueOf(dstore)); - } - } - } - } } catch(Exception e) { e.printStackTrace(); @@ -763,8 +749,6 @@ public class Controller { } System.out.print("\n"); } - - if(!success) rebalance(); } void receiveDstoreList(int port, String[] list, CountDownLatch latch) { @@ -784,6 +768,143 @@ public class Controller { latch.countDown(); } + //Allocate needs to: + //allocate files that don't have enough storers to dstores that don't have them + //move files from dstores that have too many files + //prioritize storing these files to dstores that don't have enough files + Map<Integer,List<String>> allocate(Map<Integer,List<String>> oldDstoreFiles) { + //Precaution made so that the input map is not modified + Map<Integer,List<String>> dstoreFiles = new HashMap<Integer,List<String>>(); + for(Integer i : oldDstoreFiles.keySet()) { + List<String> files = new ArrayList<String>(); + for(String s : oldDstoreFiles.get(i)) { + if(index.containsKey(s)) files.add(s); + } + dstoreFiles.put(i, files); + } + + class AllocComparator implements Comparator<Integer> { + public int compare(Integer s1, Integer s2) { + return dstoreFiles.get(s1).size() - dstoreFiles.get(s2).size(); + } + } + + Map<String,Integer> counts = new HashMap<String,Integer>(); + for(Integer dstore : dstoreFiles.keySet()) { + for(String file : dstoreFiles.get(dstore)) { + if(counts.get(file) == null) { + counts.put(file, 1); + } + else { + counts.put(file, counts.get(file) + 1); + } + } + } + + List<Integer> priorityList = new ArrayList<Integer>(dstoreFiles.keySet()); + priorityList.sort(new AllocComparator()); + + Iterator<Integer> it; + for(String file : counts.keySet()) { + if(counts.get(file) >= rFactor) continue; + it = priorityList.iterator(); + while(counts.get(file) < rFactor && it.hasNext()) { + Integer thisStore = it.next(); + if(!dstoreFiles.get(thisStore).contains(file)) { + dstoreFiles.get(thisStore).add(file); + counts.put(file, counts.get(file) + 1); + } + } + } + + priorityList.sort(new AllocComparator()); + double optimumStoreAmount = (rFactor * counts.size()) / dstoreFiles.size(); + Integer minStore = priorityList.get(0); + Integer maxStore = priorityList.get(priorityList.size() - 1); + boolean giveUp = false; + while(dstoreFiles.get(maxStore).size() > Math.ceil(optimumStoreAmount) + && dstoreFiles.get(minStore).size() < Math.floor(optimumStoreAmount) + && !giveUp) { + giveUp = true; + + Iterator<String> jt = dstoreFiles.get(maxStore).iterator(); + while(jt.hasNext()) { + String thisFile = jt.next(); + if(!dstoreFiles.get(minStore).contains(thisFile)) { + dstoreFiles.get(minStore).add(thisFile); + dstoreFiles.get(maxStore).remove(thisFile); + giveUp = false; + break; + } + } + + priorityList.sort(new AllocComparator()); + minStore = priorityList.get(0); + maxStore = priorityList.get(priorityList.size() - 1); + } + + return dstoreFiles; + } + + Map<Integer,String> composeRebalanceMessages(Map<Integer,List<String>> oldAlloc, Map<Integer,List<String>> newAlloc) { + Map<String,List<Integer>> requireIndex = new HashMap<String,List<Integer>>(); + + //Compose a map of required files by finding files of the new allocation that weren't present in the old + for(Integer dstore : newAlloc.keySet()) { + List<String> oldFiles = oldAlloc.get(dstore); + for(String file : newAlloc.get(dstore)) { + if(!oldFiles.contains(file)) { + List<Integer> requires = requireIndex.get(file); + if(requires == null) { + requires = new ArrayList<Integer>(); + requireIndex.put(file, requires); + } + requires.add(dstore); + index.get(file).addStoredBy(dstore); + } + } + } + + Map<Integer,String> messages = new HashMap<Integer,String>(); + for(Integer dstore : newAlloc.keySet()) { + String thisMessage = ""; + + //Compose files to send + int filesToSend = 0; + List<String> oldFiles = oldAlloc.get(dstore); + List<String> newFiles = newAlloc.get(dstore); + for(String file : requireIndex.keySet()) { + if(oldFiles.contains(file)) { + filesToSend ++; + List<Integer> thisRequire = requireIndex.get(file); + thisMessage = thisMessage + " " + file + " " + thisRequire.size(); + for(Integer otherStore : thisRequire) { + thisMessage = thisMessage + " " + otherStore; + } + requireIndex.remove(file); + } + } + + thisMessage = Protocol.REBALANCE_TOKEN + " " + filesToSend + thisMessage; + + String removeMessage = ""; + int filesToRemove = 0; + for(String file : oldFiles) { + if(!newFiles.contains(file)) { + filesToRemove ++; + removeMessage = removeMessage + " " + file; + if(index.get(file) != null) index.get(file).removeStoredBy(dstore); + } + } + + if(filesToSend == 0 && filesToRemove == 0) continue; + thisMessage = thisMessage + " " + filesToRemove + removeMessage; + messages.put(dstore, thisMessage); + } + + return messages; + } + void removeDstore(DstoreDisconnectException e) { Integer port = e.getPort(); dstores.remove(port); @@ -806,6 +927,27 @@ public class Controller { return list; } + Iterator<Integer> sequenceIt = null; + Object sequenceLock = new Object(); + Integer nextStoreInSequence() { + Integer store = null; + while(store == null) { + synchronized(sequenceLock) { + if(sequenceIt == null || !sequenceIt.hasNext()) { + synchronized(dstores) { + if(dstores.isEmpty()) return null; + sequenceIt = dstores.keySet().iterator(); + } + } + + store = sequenceIt.next(); + if(!dstores.containsKey(store)) store = null; + } + } + return store; + } + + /* Deprecated //Helper class for rebalance method - contains a filename and a boolean which is true if a dstore is going to send this file protected class RequireHandle { public String filename; @@ -824,6 +966,7 @@ public class Controller { } return true; } + */ void messageSent(Socket socket, String message) { ControllerLogger.getInstance().messageSent(socket, message); diff --git a/Dstore.java b/Dstore.java index 9bd7979f86112b111afb8b6be5ea43669afe6adf..3264df8578f317cec551bea74cc84bbc04a3cd10 100644 --- a/Dstore.java +++ b/Dstore.java @@ -118,8 +118,11 @@ public class Dstore { } void handleMessage(String[] message, Socket client) throws Exception { - if(message[0].equals(Protocol.STORE_TOKEN) || message[0].equals(Protocol.REBALANCE_STORE_TOKEN)) { - store(client, message[1], Long.parseLong(message[2])); + if(message[0].equals(Protocol.STORE_TOKEN)) { + store(client, message[1], Long.parseLong(message[2]), true); + } + else if(message[0].equals(Protocol.REBALANCE_STORE_TOKEN)) { + store(client, message[1], Long.parseLong(message[2]), false); } else if(message[0].equals(Protocol.LOAD_DATA_TOKEN)) { load(client, message[1]); @@ -139,7 +142,7 @@ public class Dstore { } } - void store(Socket client, String filename, long filesize) throws Exception { + void store(Socket client, String filename, long filesize, boolean acknowledged) throws Exception { new Thread(() -> { try { //Send ACK message to client @@ -160,11 +163,13 @@ public class Dstore { writer.close(); //Send STORE_ACK message to the Controller - synchronized(controllerOut) { - String controllerMessage = Protocol.STORE_ACK_TOKEN + " " + filename; - controllerOut.println(controllerMessage); - controllerOut.flush(); - messageSent(controllerSocket, controllerMessage); + if(acknowledged) { + synchronized(controllerOut) { + String controllerMessage = Protocol.STORE_ACK_TOKEN + " " + filename; + controllerOut.println(controllerMessage); + controllerOut.flush(); + messageSent(controllerSocket, controllerMessage); + } } if(fileSizes.containsKey(filename)) fileSizes.remove(filename); diff --git a/DstoreConnection.java b/DstoreConnection.java index 3f82995d4433c664c6af50485ffc2e175db8f828..e7afb4a5a7b38a999242c462eb624f688947c400 100644 --- a/DstoreConnection.java +++ b/DstoreConnection.java @@ -45,7 +45,7 @@ public class DstoreConnection { return disconnectException; } - public String sendAndReceive(String message, String expectedMessage) throws DstoreDisconnectException { + public String sendAndReceive(String message, String... expectedMessages) throws DstoreDisconnectException { System.out.println("Getting lock..."); synchronized(this) { try { @@ -55,7 +55,7 @@ public class DstoreConnection { writer.flush(); //System.out.println("Controller sent " + message + " to port " + port); ControllerLogger.getInstance().messageSent(socket, message); - return localReceive(expectedMessage); + return localReceive(expectedMessages); } catch(NullPointerException e) { System.out.println("Dstore at port " + port + " disconnected"); @@ -65,12 +65,8 @@ public class DstoreConnection { } } - public String sendAndReceive(String message) throws DstoreDisconnectException { - return sendAndReceive(message, null); - } - - public String receive(String expectedMessage) throws DstoreDisconnectException { - String findMessage = checkQueue(expectedMessage); + public String receive(String... expectedMessages) throws DstoreDisconnectException { + String findMessage = checkQueue(expectedMessages); if(findMessage != null) { return findMessage; } @@ -81,17 +77,19 @@ public class DstoreConnection { if(!available) return "ERROR"; //Check the queue twice: once incase the receiver is busy, twice incase the message was added by the last thread - findMessage = checkQueue(expectedMessage); + findMessage = checkQueue(expectedMessages); if(findMessage != null) { return findMessage; } - return localReceive(expectedMessage); + return localReceive(expectedMessages); } } //Check the queue for the message before trying to receive any new messages (if no expected message is specified, return the head of the queue) - protected String checkQueue(String expectedMessage) { + protected String checkQueue(String[] expectedMessages) { + if(expectedMessages.length == 0) return null; + Iterator<String> it; synchronized(queue) { it = queue.iterator(); @@ -99,7 +97,7 @@ public class DstoreConnection { while(it.hasNext()) { String message = it.next(); - if(expectedMessage == null || expectedMessage.equals(message.split(" ")[0])) { + if(isExpected(message, expectedMessages)) { queue.remove(message); return message; } @@ -108,13 +106,9 @@ public class DstoreConnection { return null; } - public String receive() throws DstoreDisconnectException { - return receive(null); - } - - protected String localReceive(String expectedMessage) throws DstoreDisconnectException { + protected String localReceive(String[] expectedMessages) throws DstoreDisconnectException { try { - ReceiveContext rc = new ReceiveContext(expectedMessage); + ReceiveContext rc = new ReceiveContext(expectedMessages); synchronized(rc.lock) { rc.start(); rc.lock.wait(timeout); @@ -155,15 +149,22 @@ public class DstoreConnection { return ""; } + protected boolean isExpected(String message, String[] expectedMessages) { + for(String s : expectedMessages) { + if(s.equals(message.split(" ")[0])) return true; + } + return false; + } + //Seperate class for enabling timeouts while receiving messages protected class ReceiveContext { - protected String expectedMessage; + protected String[] expectedMessages; protected String returnMessage; protected Thread thread; public Object lock; - public ReceiveContext(String expectedMessage) { - this.expectedMessage = expectedMessage; + public ReceiveContext(String[] expectedMessages) { + this.expectedMessages = expectedMessages; returnMessage = ""; lock = new Object(); } @@ -185,7 +186,7 @@ public class DstoreConnection { throw disconnectException; } ControllerLogger.getInstance().messageReceived(socket, message); - if(expectedMessage != null && !expectedMessage.equals(message.split(" ")[0])) { + if(expectedMessages.length > 0 && !isExpected(message, expectedMessages)) { queue.add(message); if(queue.size() > MAX_QUEUE_SIZE) queue.remove(0); message = null; diff --git a/RebalanceLock.java b/RebalanceLock.java index 8f25a524fdf9781c9310a1d09320caf383ddcd49..c2ea2974b5fa1818862b60a0ffa4ac834d613826 100644 --- a/RebalanceLock.java +++ b/RebalanceLock.java @@ -59,6 +59,6 @@ public class RebalanceLock { return dstoreJoined; } catch(InterruptedException e) {e.printStackTrace();} - return true; + return false; } }