diff --git a/Controller.java b/Controller.java index a3d5fdef04b6d14bd9fecdbebfc5712928327ad7..1866ab294e69098f9a8cc738461e85305cca98c8 100644 --- a/Controller.java +++ b/Controller.java @@ -42,7 +42,6 @@ public class Controller { public IndexEntry() { filesize = -1; storedBy = Collections.synchronizedList(new ArrayList<Integer>()); - numberToStore = 0; status = Status.STORE_IN_PROGRESS; } @@ -55,42 +54,17 @@ public class Controller { } public synchronized void addStoredBy(int dstore) { - addStoredBy(dstore, true); - } - - public synchronized void addStoredBy(int dstore, boolean notify) { storedBy.add(Integer.valueOf(dstore)); - if(storedBy.size() >= numberToStore && notify) notifyAll(); - } - - public synchronized void addStoredBy(List<Integer> dstores) { - storedBy.addAll(dstores); - if(storedBy.size() >= numberToStore) notifyAll(); - } - - public synchronized void newAllocation(List<Integer> dstores) { - storedBy.clear(); - storedBy.addAll(dstores); } public synchronized void removeStoredBy(int dstore) { storedBy.remove(Integer.valueOf(dstore)); - if(storedBy.isEmpty()) notifyAll(); - } - - public synchronized void removeStoredBy(List<Integer> dstores) { - storedBy.removeAll(dstores); - if(storedBy.isEmpty()) notifyAll(); } public List<Integer> getStoredBy() { return storedBy; } - public synchronized void setNumberToStore(int i) { - numberToStore = i; - } - public synchronized void setStatus(Status status) { this.status = status; } @@ -100,17 +74,13 @@ public class Controller { } } - protected class RebalanceMessages { - public Map<Integer,List<String>> dstoreFiles; - public RebalanceMessages() {dstoreFiles = null;} - } - protected class Reloader extends ArrayList<Integer> { public long filesize; } + protected class InvalidStatusException extends Exception {} + protected Map<Integer,DstoreConnection> dstores; - protected RebalanceMessages rebalanceMessages; protected Map<String,IndexEntry> index; protected Map<Socket,Reloader> loadRequests; @@ -122,7 +92,6 @@ public class Controller { this.timeout = timeout; this.rebalancePeriod = rebalancePeriod; dstores = Collections.synchronizedMap(new HashMap<Integer,DstoreConnection>()); - rebalanceMessages = new RebalanceMessages(); index = Collections.synchronizedMap(new HashMap<String,IndexEntry>()); loadRequests = Collections.synchronizedMap(new HashMap<Socket,Reloader>()); rebalanceLock = new RebalanceLock(rebalancePeriod); @@ -312,26 +281,29 @@ public class Controller { try { //Create a new entry in the index IndexEntry entry; - synchronized(index) { - if(!index.containsKey(filename)) { - entry = new IndexEntry(); - } - else { - entry = index.get(filename); - if(entry.getStatus() == IndexEntry.Status.REMOVE_IN_PROGRESS || entry.getStatus() == IndexEntry.Status.REMOVE_COMPLETE) { - index.remove(filename); - entry = new IndexEntry(); - } - else { - PrintWriter out = new PrintWriter(client.getOutputStream()); - out.println(Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN); - out.flush(); - messageSent(client, Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN); - return; + try { + synchronized(index) { + if(index.containsKey(filename)) { + entry = index.get(filename); + if(entry.getStatus() == IndexEntry.Status.REMOVE_IN_PROGRESS || entry.getStatus() == IndexEntry.Status.REMOVE_COMPLETE) { + index.remove(filename); + } + else { + throw new InvalidStatusException(); + } } + entry = new IndexEntry(); + index.put(filename, entry); } - index.put(filename, entry); } + catch(InvalidStatusException e) { + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.println(Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN); + out.flush(); + messageSent(client, Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN); + return; + } + entry.setFilesize(filesize); //Select Dstores @@ -339,7 +311,6 @@ public class Controller { for(int i=0; i<rFactor; i++) { storesToStore[i] = nextStoreInSequence(); } - entry.setNumberToStore(rFactor); //Send STORE_TO message CountDownLatch latch = new CountDownLatch(rFactor); @@ -372,6 +343,7 @@ public class Controller { } out.println(message); out.flush(); + messageSent(client, message); //Wait for STORE_ACKs from datastores in storesToStore if(latch.await(timeout, TimeUnit.MILLISECONDS)) { @@ -464,7 +436,19 @@ public class Controller { void remove(Socket client, String filename) throws Exception { try { - if(!index.containsKey(filename) || index.get(filename).getStatus() != IndexEntry.Status.STORE_COMPLETE) { + IndexEntry entry; + try { + synchronized(index) { + entry = index.get(filename); + if(entry == null || entry.getStatus() != IndexEntry.Status.STORE_COMPLETE) { + throw new InvalidStatusException(); + } + + //Update index to "remove in progress" + entry.setStatus(IndexEntry.Status.REMOVE_IN_PROGRESS); + } + } + catch(InvalidStatusException e) { PrintWriter clientOut = new PrintWriter(client.getOutputStream()); clientOut.println(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN); clientOut.flush(); @@ -472,10 +456,6 @@ public class Controller { return; } - //Update index to "remove in progress" - IndexEntry entry = index.get(filename); - entry.setStatus(IndexEntry.Status.REMOVE_IN_PROGRESS); - //Send REMOVE message to all Dstores storing the file CountDownLatch latch = new CountDownLatch(entry.getStoredBy().size()); Iterator<Integer> it = entry.getStoredBy().iterator(); @@ -544,50 +524,43 @@ public class Controller { void rebalance() throws Exception { Map<Integer,List<String>> dstoreFiles = new HashMap<Integer,List<String>>(); - - synchronized(rebalanceMessages) { - if(rebalanceMessages.dstoreFiles != null) return; - rebalanceMessages.dstoreFiles = dstoreFiles; - } CountDownLatch listLatch = new CountDownLatch(dstores.size()); try { //Send LIST message to each Dstore and receive their file list + List<Thread> activeThreads = new ArrayList<Thread>(); for(Integer dstore : dstores.keySet()) { - new Thread(() -> { + Thread thisThread = new Thread(() -> { try { String[] message = dstores.get(dstore).sendAndReceive(Protocol.LIST_TOKEN).split(" "); - receiveDstoreList(dstore.intValue(), message, listLatch); + receiveDstoreList(dstore.intValue(), message, dstoreFiles, listLatch); } catch(DstoreDisconnectException e) { e.printStackTrace(); removeDstore(e); } - }).start(); + }); + thisThread.start(); + activeThreads.add(thisThread); } try { if(!listLatch.await(timeout, TimeUnit.MILLISECONDS)) { //Log error System.err.println("Not all file lists have been received"); - Set<Integer> storesToRemove = new HashSet<Integer>(dstores.keySet()); - storesToRemove.removeAll(dstoreFiles.keySet()); - for(Integer dstore : storesToRemove) { - removeDstore(dstores.get(dstore).getDisconnectData()); + for(Thread t : activeThreads) { + t.interrupt(); + } + synchronized(dstoreFiles) { + Set<Integer> storesToRemove = new HashSet<Integer>(dstores.keySet()); + storesToRemove.removeAll(dstoreFiles.keySet()); + for(Integer dstore : storesToRemove) { + removeDstore(dstores.get(dstore).getDisconnectData()); + } } } } catch(Exception e) {e.printStackTrace();} - synchronized(rebalanceMessages) { - rebalanceMessages.dstoreFiles = null; - } - /* - if(dstoreFiles.size() < dstores.size()) { - //Log error - System.out.println("Not all file lists have been received"); - } - */ - Map<Integer,List<String>> newAlloc = allocate(dstoreFiles); Map<Integer,String> sendIndex = composeRebalanceMessages(dstoreFiles, newAlloc); CountDownLatch latch = new CountDownLatch(sendIndex.size()); @@ -611,122 +584,6 @@ public class Controller { }).start(); } - /* - //Create a new file allocation so that: - //Each file appears rFactor times - //Each file appears at most once on each datastore - //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) - - //First, compile all the received files from all dstores into one list - List<String> fileList = new ArrayList<String>(); - for(List<String> l : dstoreFiles.values()) { - for(String s : l) { - if(!fileList.contains(s)) { - fileList.add(s); - } - } - } - - //Insert files into the new indexes. These are allocated according to the new store order - List<Integer> storeOrder; - Iterator<Integer> it; - for(String file : fileList) { - storeOrder = reshuffle(dstoreFiles.keySet()); - it = storeOrder.iterator(); - if(index.containsKey(file)) { - for(int j=0; j<rFactor; j++) { - //If indexed dstore does not have the file, add it to its requireIndex entry - Integer thisStore = it.next(); - if(!dstoreFiles.get(thisStore).contains(file)) { - requireIndex.get(thisStore).add(new RequireHandle(file)); - } - } - } - //Dstores not chosen in the above loop must have an entry added to removeIndex, if they have the file - //This also covers files which were missed in a previous remove operation - while(it.hasNext()) { - Integer thisStore = it.next(); - if(dstoreFiles.get(thisStore).contains(file)) { - removeIndex.get(thisStore).add(file); - } - } - } - - - //For each dstore, compile a new message based on the contents of the indexes then send this message and receive a reply - CountDownLatch latch = new CountDownLatch(dstoreFiles.size()); - for(Integer thisStore : dstoreFiles.keySet()) { - //Compose files to send - List<String> sendMessages = new ArrayList<String>(); - for(String file : dstoreFiles.get(thisStore)) { - //All files required by other dstores are getting sent = no need for the following computation - if(allHandled(requireIndex)) break; - - String fileMessage = ""; - for(Integer otherStore : requireIndex.keySet()) { - if(thisStore.equals(otherStore)) continue; - for(RequireHandle otherHandle : requireIndex.get(otherStore)) { - if(file.equals(otherHandle.filename)) { - if(!otherHandle.handled) { - //Another store requires a file that this store has - send it there - otherHandle.handled = true; - fileMessage = fileMessage + " " + otherStore.toString(); - } - break; - } - } - } - if(fileMessage.equals("")) continue; //No files to send - fileMessage = file + " " + (fileMessage.trim().split(" ").length) + fileMessage; - sendMessages.add(fileMessage); - } - - //Don't need to send a rebalance message if there is nothing to update - List<String> thisRemove = removeIndex.get(thisStore); - if(sendMessages.isEmpty() && thisRemove.isEmpty()) { - latch.countDown(); - return; - } - - String message = Protocol.REBALANCE_TOKEN + " " + sendMessages.size(); - for(String s : sendMessages) { - message = message + " " + s; - } - //Compose files to remove - message = message + " " + thisRemove.size(); - for(String f : thisRemove) { - message = message + " " + f; - } - - //Send message to the Dstore - String finalMessage = message; - new Thread(() -> { - try { - DstoreConnection connection = dstores.get(thisStore); - String returnMessage = connection.sendAndReceive(finalMessage, Protocol.REBALANCE_COMPLETE_TOKEN); - if(!returnMessage.equals(Protocol.REBALANCE_COMPLETE_TOKEN)) { - //Log error - System.out.println("Dstore " + thisStore + " should have sent REBALANCE_COMPLETE but Controller received " + returnMessage); - } - - try {latch.countDown();} catch(Exception e) {e.printStackTrace();} - - for(int i=0; i<requireIndex.get(thisStore).size(); i++) { - returnMessage = connection.receive(Protocol.STORE_ACK_TOKEN); - if(!returnMessage.split(" ")[0].equals(Protocol.STORE_ACK_TOKEN)) { - //Log error - System.out.println("Dstore " + thisStore + " should have sent STORE_ACK but Controller received " + returnMessage); - } - } - } - catch(DstoreDisconnectException e) { - e.printStackTrace(); - removeDstore(e); - } - }).start(); - } - */ - //Wait for REBALANCE_COMPLETE from all Dstores try { if(!latch.await(timeout, TimeUnit.MILLISECONDS)) { @@ -740,9 +597,6 @@ public class Controller { e.printStackTrace(); } finally { - synchronized(rebalanceMessages) { - if(rebalanceMessages.dstoreFiles != null) rebalanceMessages.dstoreFiles = null; - } System.out.println("There are " + dstores.size() + " dstores connected"); for(String i : index.keySet()) { System.out.print(i); @@ -751,7 +605,7 @@ public class Controller { } } - void receiveDstoreList(int port, String[] list, CountDownLatch latch) { + void receiveDstoreList(int port, String[] list, Map<Integer,List<String>> dstoreFiles, CountDownLatch latch) { List<String> toList = new ArrayList<String>(); if(!list[0].equals("ERROR_EMPTY")) { for(String file : list) { @@ -759,10 +613,8 @@ public class Controller { } } - synchronized(rebalanceMessages) { - if(rebalanceMessages.dstoreFiles == null) return; - - rebalanceMessages.dstoreFiles.put(port, toList); + synchronized(dstoreFiles) { + dstoreFiles.put(port, toList); } latch.countDown(); @@ -784,8 +636,14 @@ public class Controller { } class AllocComparator implements Comparator<Integer> { + protected int m; + public AllocComparator(boolean ascending) { + if(ascending) m = 1; + else m = -1; + } + public int compare(Integer s1, Integer s2) { - return dstoreFiles.get(s1).size() - dstoreFiles.get(s2).size(); + return dstoreFiles.get(s1).size() - m * dstoreFiles.get(s2).size(); } } @@ -802,28 +660,45 @@ public class Controller { } List<Integer> priorityList = new ArrayList<Integer>(dstoreFiles.keySet()); - priorityList.sort(new AllocComparator()); Iterator<Integer> it; for(String file : counts.keySet()) { - if(counts.get(file) >= rFactor) continue; - it = priorityList.iterator(); - while(counts.get(file) < rFactor && it.hasNext()) { - Integer thisStore = it.next(); - if(!dstoreFiles.get(thisStore).contains(file)) { - dstoreFiles.get(thisStore).add(file); - counts.put(file, counts.get(file) + 1); + if(counts.get(file) > rFactor) { + System.out.println("Need to remove copies of " + file); + priorityList.sort(new AllocComparator(false)); + it = priorityList.iterator(); + while(counts.get(file) > rFactor && it.hasNext()) { + Integer thisStore = it.next(); + if(dstoreFiles.get(thisStore).contains(file)) { + dstoreFiles.get(thisStore).remove(file); + counts.put(file, counts.get(file) - 1); + System.out.println(file + " removed from " + thisStore); + } + } + } + else if(counts.get(file) < rFactor) { + System.out.println("Need to make copies of " + file); + priorityList.sort(new AllocComparator(true)); + it = priorityList.iterator(); + while(counts.get(file) < rFactor && it.hasNext()) { + Integer thisStore = it.next(); + if(!dstoreFiles.get(thisStore).contains(file)) { + dstoreFiles.get(thisStore).add(file); + counts.put(file, counts.get(file) + 1); + System.out.println(file + " allocated to " + thisStore); + } } } } - priorityList.sort(new AllocComparator()); - double optimumStoreAmount = (rFactor * counts.size()) / dstoreFiles.size(); + double optimumStoreAmount = ((double) rFactor * (double) counts.size()) / (double) dstoreFiles.size(); + priorityList.sort(new AllocComparator(true)); Integer minStore = priorityList.get(0); Integer maxStore = priorityList.get(priorityList.size() - 1); boolean giveUp = false; - while(dstoreFiles.get(maxStore).size() > Math.ceil(optimumStoreAmount) - && dstoreFiles.get(minStore).size() < Math.floor(optimumStoreAmount) + System.out.println(rFactor + " * " + counts.size() + " / " + dstoreFiles.size() + " = " + optimumStoreAmount); + while((dstoreFiles.get(maxStore).size() > Math.ceil(optimumStoreAmount) + || dstoreFiles.get(minStore).size() < Math.floor(optimumStoreAmount)) && !giveUp) { giveUp = true; @@ -831,6 +706,8 @@ public class Controller { while(jt.hasNext()) { String thisFile = jt.next(); if(!dstoreFiles.get(minStore).contains(thisFile)) { + //System.out.println(optimumStoreAmount); + //System.out.println("Moving " + thisFile + " from " + maxStore + "[" + dstoreFiles.get(maxStore).size() + "] to " + minStore + "[" + dstoreFiles.get(minStore).size() + "]"); dstoreFiles.get(minStore).add(thisFile); dstoreFiles.get(maxStore).remove(thisFile); giveUp = false; @@ -838,9 +715,11 @@ public class Controller { } } - priorityList.sort(new AllocComparator()); + priorityList.sort(new AllocComparator(true)); minStore = priorityList.get(0); maxStore = priorityList.get(priorityList.size() - 1); + + if(giveUp) System.out.println("Gave up reallocating files"); } return dstoreFiles; @@ -873,7 +752,9 @@ public class Controller { int filesToSend = 0; List<String> oldFiles = oldAlloc.get(dstore); List<String> newFiles = newAlloc.get(dstore); - for(String file : requireIndex.keySet()) { + Iterator<String> it = requireIndex.keySet().iterator(); + while(it.hasNext()) { + String file = it.next(); if(oldFiles.contains(file)) { filesToSend ++; List<Integer> thisRequire = requireIndex.get(file); @@ -881,7 +762,7 @@ public class Controller { for(Integer otherStore : thisRequire) { thisMessage = thisMessage + " " + otherStore; } - requireIndex.remove(file); + it.remove(); } } @@ -906,9 +787,11 @@ public class Controller { } void removeDstore(DstoreDisconnectException e) { - Integer port = e.getPort(); - dstores.remove(port); - try {e.getSocket().close();} catch(IOException ee) {} + Integer port = e.getConnection().getPort(); + synchronized(dstores) { + if(dstores.get(port).equals(e.getConnection())) dstores.remove(port); + } + try {e.getConnection().getSocket().close();} catch(IOException ee) {} Iterator<IndexEntry> it; synchronized(index) {it = index.values().iterator();} @@ -917,16 +800,6 @@ public class Controller { } } - <T> List<T> reshuffle(Collection<T> col) { - List<T> list = new ArrayList<T>(); - Iterator<T> it = col.iterator(); - while(it.hasNext()) { - list.add(it.next()); - } - Collections.shuffle(list); - return list; - } - Iterator<Integer> sequenceIt = null; Object sequenceLock = new Object(); Integer nextStoreInSequence() { @@ -947,27 +820,6 @@ public class Controller { return store; } - /* Deprecated - //Helper class for rebalance method - contains a filename and a boolean which is true if a dstore is going to send this file - protected class RequireHandle { - public String filename; - public boolean handled; - public RequireHandle(String filename) { - this.filename = filename; - handled = false; - } - } - - boolean allHandled(Map<Integer,List<RequireHandle>> map) { - for(List<RequireHandle> list : map.values()) { - for(RequireHandle handle : list) { - if(!handle.handled) return false; - } - } - return true; - } - */ - void messageSent(Socket socket, String message) { ControllerLogger.getInstance().messageSent(socket, message); } diff --git a/Dstore.java b/Dstore.java index 3264df8578f317cec551bea74cc84bbc04a3cd10..5a86d15f11032e450ed7958fe0bbde18476c580d 100644 --- a/Dstore.java +++ b/Dstore.java @@ -21,6 +21,8 @@ public class Dstore { protected BufferedReader controllerIn; protected PrintWriter controllerOut; + protected final int BUFFER_SIZE = 256; + public Dstore(int port, int cport, int timeout, String fileFolderName) throws Exception { this.port = port; this.cport = cport; @@ -38,6 +40,7 @@ public class Dstore { } fileSizes = new HashMap<String,Long>(); + for(File file : fileFolder.listFiles()) { if(!file.delete()) throw new Exception("Directory specified has undeletable files; please try a different directory"); } @@ -75,7 +78,7 @@ public class Dstore { try(Socket controllerSocket = new Socket(InetAddress.getLocalHost(), cport)) { this.controllerSocket = controllerSocket; controllerIn = new BufferedReader(new InputStreamReader(controllerSocket.getInputStream())); - controllerOut = new PrintWriter(controllerSocket.getOutputStream()); + controllerOut = new PrintWriter(controllerSocket.getOutputStream(), true); String joinMessage = Protocol.JOIN_TOKEN + " " + port; controllerOut.println(joinMessage); controllerOut.flush(); @@ -146,20 +149,22 @@ public class Dstore { new Thread(() -> { try { //Send ACK message to client - PrintWriter out = new PrintWriter(client.getOutputStream()); + PrintWriter out = new PrintWriter(client.getOutputStream(), true); out.println(Protocol.ACK_TOKEN); - out.flush(); messageSent(client, Protocol.ACK_TOKEN); - FileOutputStream writer = new FileOutputStream(new File(fileFolder, filename), false); + OutputStream writer = new FileOutputStream(new File(fileFolder, filename), false); InputStream reader = client.getInputStream(); //Receive + write file content from client - byte[] nextLine = new byte[8]; - while(reader.readNBytes(nextLine, 0, 8) > 0) { - writer.write(nextLine); + byte[] nextLine = new byte[BUFFER_SIZE]; + int len; + do { + len = reader.readNBytes(nextLine, 0, BUFFER_SIZE); + writer.write(nextLine, 0, len); writer.flush(); } + while(len == BUFFER_SIZE); writer.close(); //Send STORE_ACK message to the Controller @@ -167,13 +172,14 @@ public class Dstore { synchronized(controllerOut) { String controllerMessage = Protocol.STORE_ACK_TOKEN + " " + filename; controllerOut.println(controllerMessage); - controllerOut.flush(); messageSent(controllerSocket, controllerMessage); } } - if(fileSizes.containsKey(filename)) fileSizes.remove(filename); - fileSizes.put(filename, Long.valueOf(filesize)); + synchronized(fileSizes) { + if(fileSizes.containsKey(filename)) fileSizes.remove(filename); + fileSizes.put(filename, Long.valueOf(filesize)); + } } catch(IOException e) { e.printStackTrace(); @@ -198,11 +204,16 @@ public class Dstore { } OutputStream contentOut = client.getOutputStream(); - byte[] buf = new byte[8]; - while(reader.read(buf) != -1) { - contentOut.write(buf); - contentOut.flush(); + byte[] buf = new byte[BUFFER_SIZE]; + int len; + do { + len = reader.read(buf); + if(len >= 0) { + contentOut.write(buf, 0, len); + contentOut.flush(); + } } + while(len == BUFFER_SIZE); reader.close(); contentOut.close(); @@ -219,24 +230,26 @@ public class Dstore { void remove(String filename) throws Exception { new Thread(() -> { try { + System.out.println("Store " + port + " removing " + filename + "..."); //Remove the file from fileFolder Path path = new File(fileFolder, filename).toPath(); String controllerMessage; if(Files.deleteIfExists(path)) { + System.out.println("Store " + port + " removed " + filename); //Send REMOVE_ACK message to client (the controller) synchronized(controllerOut) { controllerMessage = Protocol.REMOVE_ACK_TOKEN + " " + filename; } } else { + System.out.println("Store " + port + " couldn't remove " + filename); //Send DOES NOT EXIST error synchronized(controllerOut) { controllerMessage = Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN + " " + filename; } } controllerOut.println(controllerMessage); - controllerOut.flush(); messageSent(controllerSocket, controllerMessage); } catch(IOException e) { @@ -255,7 +268,6 @@ public class Dstore { if(message.equals("")) message = "ERROR_EMPTY"; synchronized(controllerOut) { controllerOut.println(message.trim()); - controllerOut.flush(); } }).start(); } @@ -311,10 +323,11 @@ public class Dstore { try { System.out.println("Sending " + filename + " to store " + dstore); Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); - PrintWriter out = new PrintWriter(socket.getOutputStream()); - String dstoreMessage = Protocol.REBALANCE_STORE_TOKEN + " " + filename + " " + fileSizes.get(filename); + PrintWriter out = new PrintWriter(socket.getOutputStream(), true); + long fileSize; + synchronized(fileSizes) {fileSize = fileSizes.get(filename);} + String dstoreMessage = Protocol.REBALANCE_STORE_TOKEN + " " + filename + " " + fileSize; out.println(dstoreMessage); - out.flush(); messageSent(socket, dstoreMessage); BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); @@ -325,13 +338,18 @@ public class Dstore { System.out.println("Dstore " + dstore + " should have sent ACK but " + port + " received " + receivedMessage); } - byte[] content = new byte[8]; + byte[] content = new byte[BUFFER_SIZE]; + int len; FileInputStream fileIn = new FileInputStream(new File(fileFolder, filename)); OutputStream fileOut = socket.getOutputStream(); - while(fileIn.read(content) > 0) { - fileOut.write(content); - fileOut.flush(); + do { + len = fileIn.read(content); + if(len >= 0) { + fileOut.write(content, 0, len); + fileOut.flush(); + } } + while(len > 0); fileIn.close(); fileOut.close(); in.close(); @@ -358,7 +376,6 @@ public class Dstore { //Send REBALANCE_COMPLETE message to client (the controller) synchronized(controllerOut) { controllerOut.println(Protocol.REBALANCE_COMPLETE_TOKEN); - controllerOut.flush(); messageSent(controllerSocket, Protocol.REBALANCE_COMPLETE_TOKEN); } System.out.println("Sent message REBALANCE_COMPLETE"); diff --git a/DstoreConnection.java b/DstoreConnection.java index e7afb4a5a7b38a999242c462eb624f688947c400..3ecc064be2243e61d26cad76531f6f03ca1299b0 100644 --- a/DstoreConnection.java +++ b/DstoreConnection.java @@ -1,5 +1,6 @@ import java.io.*; import java.net.*; +import java.util.concurrent.*; import java.lang.Runnable; import java.util.List; import java.util.ArrayList; @@ -29,7 +30,7 @@ public class DstoreConnection { writer = new PrintWriter(socket.getOutputStream()); available = true; queue = new ArrayList<String>(); - disconnectException = new DstoreDisconnectException(port, socket); + disconnectException = new DstoreDisconnectException(this); } catch(IOException e) { e.printStackTrace(); @@ -41,6 +42,14 @@ public class DstoreConnection { } } + public int getPort() { + return port; + } + + public Socket getSocket() { + return socket; + } + public DstoreDisconnectException getDisconnectData() { return disconnectException; } @@ -50,7 +59,7 @@ public class DstoreConnection { synchronized(this) { try { System.out.println("Lock acquired"); - if(!available) return "ERROR"; + if(!available) throw disconnectException; writer.println(message); writer.flush(); //System.out.println("Controller sent " + message + " to port " + port); @@ -74,7 +83,7 @@ public class DstoreConnection { System.out.println("Getting lock..."); synchronized(this) { System.out.println("Lock acquired"); - if(!available) return "ERROR"; + if(!available) throw disconnectException; //Check the queue twice: once incase the receiver is busy, twice incase the message was added by the last thread findMessage = checkQueue(expectedMessages); @@ -109,14 +118,15 @@ public class DstoreConnection { protected String localReceive(String[] expectedMessages) throws DstoreDisconnectException { try { ReceiveContext rc = new ReceiveContext(expectedMessages); - synchronized(rc.lock) { - rc.start(); - rc.lock.wait(timeout); + rc.start(); + if(rc.latch.await(timeout, TimeUnit.MILLISECONDS)) { + if(rc.disconnected()) throw disconnectException; + else return rc.getReturnMessage(); + } + else { rc.end(); + return ""; } - String returnMessage = rc.getReturnMessage(); - if(returnMessage == null) throw disconnectException; - else return returnMessage; } catch(InterruptedException e) { e.printStackTrace(); @@ -161,18 +171,18 @@ public class DstoreConnection { protected String[] expectedMessages; protected String returnMessage; protected Thread thread; - public Object lock; + protected boolean disconnected; + public CountDownLatch latch; public ReceiveContext(String[] expectedMessages) { this.expectedMessages = expectedMessages; returnMessage = ""; - lock = new Object(); + disconnected = false; + latch = new CountDownLatch(1); } public String getReturnMessage() { - synchronized(lock) { - return returnMessage; - } + return returnMessage; } public void start() { @@ -183,7 +193,8 @@ public class DstoreConnection { message = reader.readLine(); if(message == null) { available = false; - throw disconnectException; + disconnected = true; + break; } ControllerLogger.getInstance().messageReceived(socket, message); if(expectedMessages.length > 0 && !isExpected(message, expectedMessages)) { @@ -200,14 +211,8 @@ public class DstoreConnection { e.printStackTrace(); returnMessage = ""; } - catch(DstoreDisconnectException e) { - e.printStackTrace(); - returnMessage = null; - } finally { - synchronized(lock) { - lock.notify(); - } + latch.countDown(); } }); thread.start(); @@ -216,5 +221,9 @@ public class DstoreConnection { public void end() { if(thread.isAlive()) thread.interrupt(); } + + public boolean disconnected() { + return disconnected; + } } } diff --git a/DstoreDisconnectException.java b/DstoreDisconnectException.java index 8187f03febef64e3ab454fe38b30e9ed6db8bf96..e720122bceb9d00117fc3197c7194b727d0635c0 100644 --- a/DstoreDisconnectException.java +++ b/DstoreDisconnectException.java @@ -2,20 +2,14 @@ import java.lang.Throwable; import java.net.Socket; public class DstoreDisconnectException extends Exception { - int port; - Socket socket; + DstoreConnection connection; - public DstoreDisconnectException(int port, Socket socket) { - super("Dstore at port " + port + " has been disconnected"); - this.port = port; - this.socket = socket; + public DstoreDisconnectException(DstoreConnection connection) { + super("Dstore at port " + connection.getPort() + " has been disconnected"); + this.connection = connection; } - public int getPort() { - return port; - } - - public Socket getSocket() { - return socket; + public DstoreConnection getConnection() { + return connection; } } diff --git a/Execute.sh b/Execute.sh index b6719668a1e1f998fc080203f9632646de2ccb3d..60fce9751d092f3c377e82e60b7c07ab984ef552 100755 --- a/Execute.sh +++ b/Execute.sh @@ -1,13 +1,13 @@ #!/bin/bash java -cp .:loggers Controller 8080 $1 $3 $4 & echo $! +processes=() for((i=1; i<=$2; i++)) do sleep 0.2 n=$((8080+$i)) echo $n s="store$i" java -cp .:loggers Dstore $n 8080 $3 $s & - echo $! done sleep 2 java -cp .:client-1.0.2.jar ClientMain 8080 $3 diff --git a/ExecuteWithCrash.sh b/ExecuteWithCrash.sh new file mode 100755 index 0000000000000000000000000000000000000000..78066640c7a05599854a10d4191415bc9b990e7b --- /dev/null +++ b/ExecuteWithCrash.sh @@ -0,0 +1,20 @@ +#!/bin/bash +java -cp .:loggers Controller 8080 $1 $3 $4 & +echo $! +processes=() +for((i=1; i<=$2; i++)) do + sleep 0.2 + n=$((8080+$i)) + echo $n + s="store$i" + java -cp .:loggers Dstore $n 8080 $3 $s & + processes+=($!) + echo ${processes[${i-1}]} +done +echo ${processes[0]} +sleep 2 +java -cp .:client-1.0.2.jar ClientMain 8080 $3 +sleep $(((2*$4)/3000)) +kill ${processes[0]} +sleep 2 +java -cp .:loggers Dstore 8081 8080 $3 store1 & diff --git a/to_store/GameDotCom.jpg b/to_store/GameDotCom.jpg new file mode 100644 index 0000000000000000000000000000000000000000..0895008ff721400e0df8cdc1f82e2307f1ffda5b Binary files /dev/null and b/to_store/GameDotCom.jpg differ diff --git a/to_store/rap.mp3 b/to_store/rap.mp3 new file mode 100644 index 0000000000000000000000000000000000000000..db4f5dfb9e7419cc0feaa6856fb5dc1e6f5a4a95 Binary files /dev/null and b/to_store/rap.mp3 differ diff --git a/to_store/spurk.jpg b/to_store/spurk.jpg new file mode 100644 index 0000000000000000000000000000000000000000..aeaa5bc7e96fa02e83492c0fa92e8f5f92d37255 Binary files /dev/null and b/to_store/spurk.jpg differ