From 439cc54c71a053fdeaad6f03dbc04a1e588bfae1 Mon Sep 17 00:00:00 2001 From: Daniel Lucas <dl3g19@soton.ac.uk> Date: Mon, 3 May 2021 18:46:46 +0100 Subject: [PATCH] More changes to adjust for MORE late changes to the spec --- Controller.java | 172 +++++++++------------- Dstore.java | 24 ++- DstoreConnection.java | 43 ++++-- DstoreDisconnectException.java | 16 +- to_store/{Look Away.mp3 => Look_Away.mp3} | Bin 5 files changed, 128 insertions(+), 127 deletions(-) rename to_store/{Look Away.mp3 => Look_Away.mp3} (100%) diff --git a/Controller.java b/Controller.java index 9de796f..48b137a 100644 --- a/Controller.java +++ b/Controller.java @@ -8,6 +8,8 @@ import java.util.List; import java.util.ArrayList; import java.util.Map; import java.util.HashMap; +import java.util.Set; +import java.util.HashSet; import java.util.Collection; import java.util.Collections; @@ -199,6 +201,7 @@ public class Controller { } while(clientMessage != null); System.out.println("Client closed"); + loadRequests.remove(client); try {client.close();} catch(IOException e) {} } }).start(); @@ -335,7 +338,7 @@ public class Controller { entry.setNumberToStore(rFactor); //Send STORE_TO message - CyclicBarrier barrier = new CyclicBarrier(rFactor + 1); + CountDownLatch latch = new CountDownLatch(rFactor); PrintWriter out = new PrintWriter(client.getOutputStream()); String message = Protocol.STORE_TO_TOKEN; for(Integer thisStore : storesToStore) { @@ -345,7 +348,7 @@ public class Controller { String[] receivedMessage = dstores.get(thisStore).receive(Protocol.STORE_ACK_TOKEN).split(" "); if(receivedMessage[0].equals(Protocol.STORE_ACK_TOKEN)) { try { - storeAck(thisStore, receivedMessage[1], barrier); + storeAck(thisStore, receivedMessage[1], latch); } catch(Exception e) { //Log error @@ -359,7 +362,7 @@ public class Controller { } catch(DstoreDisconnectException e) { e.printStackTrace(); - removeDstore(thisStore); + removeDstore(e); } }).start(); } @@ -367,46 +370,31 @@ public class Controller { out.flush(); //Wait for STORE_ACKs from datastores in storesToStore - try { - barrier.await(timeout, TimeUnit.MILLISECONDS); + if(latch.await(timeout, TimeUnit.MILLISECONDS)) { + //Update index to "store complete" + entry.setStatus(IndexEntry.Status.STORE_COMPLETE); + + //Send STORE_COMPLETE message + out.println(Protocol.STORE_COMPLETE_TOKEN); + out.flush(); + messageSent(client, Protocol.STORE_COMPLETE_TOKEN); } - catch(TimeoutException e) { + else { //Log error System.err.println("Not all STORE_ACKs have been received"); - } - catch(Exception e) { - e.printStackTrace(); - } - /* - synchronized(entry) { - try { - entry.wait(timeout); - } - catch(InterruptedException e) { - e.printStackTrace(); - } - if(entry.getStoredBy().size() < rFactor) { - //Log error - System.out.println("Not all STORE_ACKs have been received"); + //Remove file from index + synchronized(index) { + index.remove(filename); } } - */ - - //Update index to "store complete" - entry.status = IndexEntry.Status.STORE_COMPLETE; - - //Send STORE_COMPLETE message - out.println(Protocol.STORE_COMPLETE_TOKEN); - out.flush(); - messageSent(client, Protocol.STORE_COMPLETE_TOKEN); } catch(IOException e) { e.printStackTrace(); } } - void storeAck(Integer port, String filename, CyclicBarrier barrier) throws Exception { + void storeAck(Integer port, String filename, CountDownLatch latch) throws Exception { if(!index.containsKey(filename)) { //Throw logging exception throw new Exception("Index does not contain " + filename); @@ -414,18 +402,12 @@ public class Controller { IndexEntry thisEntry = index.get(filename); thisEntry.addStoredBy(port); - try { - barrier.await(); - } - catch(BrokenBarrierException e) { - System.err.println("Late STORE_ACK received from " + port + " after timeout expired"); - } - catch(Exception e) {e.printStackTrace();} + latch.countDown(); } void load(Socket client, String filename) throws Exception { try { - if(!index.containsKey(filename) || index.get(filename).status != IndexEntry.Status.STORE_COMPLETE) { + if(!index.containsKey(filename) || index.get(filename).getStatus() != IndexEntry.Status.STORE_COMPLETE) { PrintWriter out = new PrintWriter(client.getOutputStream()); out.println(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN); out.flush(); @@ -478,7 +460,7 @@ public class Controller { void remove(Socket client, String filename) throws Exception { try { - if(!index.containsKey(filename) || index.get(filename).status != IndexEntry.Status.STORE_COMPLETE) { + if(!index.containsKey(filename) || index.get(filename).getStatus() != IndexEntry.Status.STORE_COMPLETE) { PrintWriter clientOut = new PrintWriter(client.getOutputStream()); clientOut.println(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN); clientOut.flush(); @@ -488,23 +470,17 @@ public class Controller { //Update index to "remove in progress" IndexEntry entry = index.get(filename); - entry.status = IndexEntry.Status.REMOVE_IN_PROGRESS; + entry.setStatus(IndexEntry.Status.REMOVE_IN_PROGRESS); //Send REMOVE message to all Dstores storing the file - CyclicBarrier barrier = new CyclicBarrier(entry.getStoredBy().size() + 1); + CountDownLatch latch = new CountDownLatch(entry.getStoredBy().size()); for(Integer dstore : entry.getStoredBy()) { new Thread(() -> { try { String[] message = dstores.get(dstore).sendAndReceive(Protocol.REMOVE_TOKEN + " " + filename, Protocol.REMOVE_ACK_TOKEN).split(" "); if(message[0].equals(Protocol.REMOVE_ACK_TOKEN) && message[1].equals(filename)) { entry.removeStoredBy(dstore.intValue()); - try { - barrier.await(); - } - catch(BrokenBarrierException e) { - System.err.println("Late REMOVE_ACK received from " + dstore + " after timeout expired"); - } - catch(Exception e) {e.printStackTrace();} + latch.countDown(); } else { //Log error @@ -513,39 +489,19 @@ public class Controller { } catch(DstoreDisconnectException e) { e.printStackTrace(); - removeDstore(dstore); + removeDstore(e); } }).start(); } //Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message - try { - barrier.await(timeout, TimeUnit.MILLISECONDS); - } - catch(TimeoutException e) { - //Log error - System.out.println("Not all REMOVE_ACKs have been received"); - } - catch(Exception e) {e.printStackTrace();} - /* - synchronized(entry) { - try { - entry.wait(timeout); - } - catch(InterruptedException e) { - e.printStackTrace(); - } - } - - - if(entry.getStoredBy().size() > 0) { + if(!latch.await(timeout, TimeUnit.MILLISECONDS)) { //Log error System.out.println("Not all REMOVE_ACKs have been received"); } - */ //Update index to "remove complete" - entry.status = IndexEntry.Status.REMOVE_COMPLETE; + entry.setStatus(IndexEntry.Status.REMOVE_COMPLETE); synchronized(index) { if(index.get(filename) == entry) index.remove(filename); } @@ -567,7 +523,7 @@ public class Controller { //Send file list to client String message = Protocol.LIST_TOKEN + " "; for(String name : index.keySet()) { - if(index.get(name).status == IndexEntry.Status.STORE_COMPLETE) message = message + name + " "; + if(index.get(name).getStatus() == IndexEntry.Status.STORE_COMPLETE) message = message + name + " "; } PrintWriter out = new PrintWriter(client.getOutputStream()); System.out.println("Sending..."); @@ -588,28 +544,32 @@ public class Controller { if(rebalanceMessages.dstoreFiles != null) return; rebalanceMessages.dstoreFiles = dstoreFiles; } - CyclicBarrier barrier = new CyclicBarrier(dstores.size() + 1); + CountDownLatch listLatch = new CountDownLatch(dstores.size()); try { //Send LIST message to each Dstore and receive their file list for(Integer dstore : dstores.keySet()) { new Thread(() -> { try { String[] message = dstores.get(dstore).sendAndReceive(Protocol.LIST_TOKEN).split(" "); - receiveDstoreList(dstore.intValue(), message, barrier); + receiveDstoreList(dstore.intValue(), message, listLatch); } catch(DstoreDisconnectException e) { e.printStackTrace(); - removeDstore(dstore); + removeDstore(e); } }).start(); } try { - barrier.await(timeout, TimeUnit.MILLISECONDS); - } - catch(TimeoutException e) { - //Log error - System.out.println("Not all file lists have been received"); + if(!listLatch.await(timeout, TimeUnit.MILLISECONDS)) { + //Log error + System.err.println("Not all file lists have been received"); + Set<Integer> storesToRemove = new HashSet<Integer>(dstores.keySet()); + storesToRemove.removeAll(dstoreFiles.keySet()); + for(Integer dstore : storesToRemove) { + removeDstore(dstores.get(dstore).getDisconnectData()); + } + } } catch(Exception e) {e.printStackTrace();} @@ -674,7 +634,7 @@ public class Controller { } //For each dstore, compile a new message based on the contents of the indexes then send this message and receive a reply - CyclicBarrier barrier2 = new CyclicBarrier(dstoreFiles.size() + 1); + CountDownLatch latch = new CountDownLatch(dstoreFiles.size()); for(Integer thisStore : dstoreFiles.keySet()) { //Compose files to send List<String> sendMessages = new ArrayList<String>(); @@ -701,13 +661,20 @@ public class Controller { sendMessages.add(fileMessage); } + //Don't need to send a rebalance message if there is nothing to update + List<String> thisRemove = removeIndex.get(thisStore); + if(sendMessages.isEmpty() && thisRemove.isEmpty()) { + latch.countDown(); + return; + } + String message = Protocol.REBALANCE_TOKEN + " " + sendMessages.size(); for(String s : sendMessages) { message = message + " " + s; } //Compose files to remove - message = message + " " + removeIndex.get(thisStore).size(); - for(String f : removeIndex.get(thisStore)) { + message = message + " " + thisRemove.size(); + for(String f : thisRemove) { message = message + " " + f; } @@ -722,7 +689,7 @@ public class Controller { System.out.println("Dstore " + thisStore + " should have sent REBALANCE_COMPLETE but Controller received " + returnMessage); } - new Thread(() -> {try {barrier2.await();} catch(Exception e) {e.printStackTrace();}}).start(); + try {latch.countDown();} catch(Exception e) {e.printStackTrace();} for(int i=0; i<requireIndex.get(thisStore).size(); i++) { returnMessage = connection.receive(Protocol.STORE_ACK_TOKEN); @@ -734,19 +701,18 @@ public class Controller { } catch(DstoreDisconnectException e) { e.printStackTrace(); - removeDstore(thisStore); + removeDstore(e); } }).start(); } //Wait for REBALANCE_COMPLETE from all Dstores try { - barrier2.await(timeout, TimeUnit.MILLISECONDS); - } - catch(TimeoutException e) { - //Restart rebalance operation - System.err.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation..."); - success = false; + if(!latch.await(timeout, TimeUnit.MILLISECONDS)) { + //Restart rebalance operation + System.err.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation..."); + success = false; + } } catch(Exception e) {e.printStackTrace();} /* @@ -801,7 +767,7 @@ public class Controller { if(!success) rebalance(); } - void receiveDstoreList(int port, String[] list, CyclicBarrier barrier) { + void receiveDstoreList(int port, String[] list, CountDownLatch latch) { List<String> toList = new ArrayList<String>(); if(!list[0].equals("ERROR_EMPTY")) { for(String file : list) { @@ -815,20 +781,18 @@ public class Controller { rebalanceMessages.dstoreFiles.put(port, toList); } - try { - barrier.await(); - } - catch(BrokenBarrierException e) { - System.err.println("Late file list received from " + port + " after timeout expired"); - } - catch(Exception e) {e.printStackTrace();} + latch.countDown(); } - void removeDstore(Integer dstore) { - dstores.remove(dstore); - Iterator<IndexEntry> it = index.values().iterator(); + void removeDstore(DstoreDisconnectException e) { + Integer port = e.getPort(); + dstores.remove(port); + try {e.getSocket().close();} catch(IOException ee) {} + + Iterator<IndexEntry> it; + synchronized(index) {it = index.values().iterator();} while(it.hasNext()) { - it.next().removeStoredBy(dstore); + it.next().removeStoredBy(port); } } diff --git a/Dstore.java b/Dstore.java index 6ac0414..9bd7979 100644 --- a/Dstore.java +++ b/Dstore.java @@ -7,7 +7,7 @@ import java.util.List; import java.util.ArrayList; import java.util.Map; import java.util.HashMap; -import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class Dstore { @@ -118,19 +118,19 @@ public class Dstore { } void handleMessage(String[] message, Socket client) throws Exception { - if(message[0].equals("STORE")) { + if(message[0].equals(Protocol.STORE_TOKEN) || message[0].equals(Protocol.REBALANCE_STORE_TOKEN)) { store(client, message[1], Long.parseLong(message[2])); } - else if(message[0].equals("LOAD_DATA")) { + else if(message[0].equals(Protocol.LOAD_DATA_TOKEN)) { load(client, message[1]); } - else if(message[0].equals("REMOVE")) { + else if(message[0].equals(Protocol.REMOVE_TOKEN)) { remove(message[1]); } - else if(message[0].equals("LIST")) { + else if(message[0].equals(Protocol.LIST_TOKEN)) { list(); } - else if(message[0].equals("REBALANCE")) { + else if(message[0].equals(Protocol.REBALANCE_TOKEN)) { rebalance(message); } else { @@ -206,7 +206,7 @@ public class Dstore { e.printStackTrace(); } finally { - try {client.close();} catch(IOException e) {e.printStackTrace();} + try {if(!client.isClosed()) client.close();} catch(IOException e) {e.printStackTrace();} } }).start(); } @@ -222,7 +222,6 @@ public class Dstore { //Send REMOVE_ACK message to client (the controller) synchronized(controllerOut) { controllerMessage = Protocol.REMOVE_ACK_TOKEN + " " + filename; - } } else { @@ -300,7 +299,7 @@ public class Dstore { System.out.println("Interpreting complete, will send " + numberToSend + " and remove " + numberToRemove); //Send each file to send to the Dstore at the specified port number - CyclicBarrier barrier = new CyclicBarrier(totalReceivers + 1); + CountDownLatch latch = new CountDownLatch(totalReceivers); for(Integer dstore : filesToSend.keySet()) { for(String filename : filesToSend.get(dstore)) { new Thread(() -> { @@ -308,7 +307,7 @@ public class Dstore { System.out.println("Sending " + filename + " to store " + dstore); Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); PrintWriter out = new PrintWriter(socket.getOutputStream()); - String dstoreMessage = Protocol.STORE_TOKEN + " " + filename + " " + fileSizes.get(filename); + String dstoreMessage = Protocol.REBALANCE_STORE_TOKEN + " " + filename + " " + fileSizes.get(filename); out.println(dstoreMessage); out.flush(); messageSent(socket, dstoreMessage); @@ -338,12 +337,12 @@ public class Dstore { e.printStackTrace(); } finally { - try {barrier.await();} catch(Exception e) {} + try {latch.countDown();} catch(Exception e) {} } }).start(); } } - try {barrier.await((long) timeout, TimeUnit.MILLISECONDS);} catch(Exception e) {e.printStackTrace();} + try {latch.await(timeout, TimeUnit.MILLISECONDS);} catch(Exception e) {e.printStackTrace();} //Remove each file to remove from fileFolder for(String filename : filesToRemove) { @@ -358,7 +357,6 @@ public class Dstore { messageSent(controllerSocket, Protocol.REBALANCE_COMPLETE_TOKEN); } System.out.println("Sent message REBALANCE_COMPLETE"); - //TO DO: WAIT FOR RESPONSES BEFORE SENDING REBALANCE_COMPLETE }).start(); } diff --git a/DstoreConnection.java b/DstoreConnection.java index d402186..3f82995 100644 --- a/DstoreConnection.java +++ b/DstoreConnection.java @@ -6,12 +6,13 @@ import java.util.ArrayList; import java.util.Map; import java.util.HashMap; import java.util.Collection; +import java.util.Iterator; public class DstoreConnection { protected final int MAX_QUEUE_SIZE = 50; protected Socket socket; - protected int port; //Solely used for debugging purposes + protected int port; protected BufferedReader reader; protected PrintWriter writer; protected boolean available; @@ -28,7 +29,7 @@ public class DstoreConnection { writer = new PrintWriter(socket.getOutputStream()); available = true; queue = new ArrayList<String>(); - disconnectException = new DstoreDisconnectException(port); + disconnectException = new DstoreDisconnectException(port, socket); } catch(IOException e) { e.printStackTrace(); @@ -40,6 +41,10 @@ public class DstoreConnection { } } + public DstoreDisconnectException getDisconnectData() { + return disconnectException; + } + public String sendAndReceive(String message, String expectedMessage) throws DstoreDisconnectException { System.out.println("Getting lock..."); synchronized(this) { @@ -65,24 +70,44 @@ public class DstoreConnection { } public String receive(String expectedMessage) throws DstoreDisconnectException { + String findMessage = checkQueue(expectedMessage); + if(findMessage != null) { + return findMessage; + } + System.out.println("Getting lock..."); synchronized(this) { System.out.println("Lock acquired"); if(!available) return "ERROR"; - //Check the queue for the message before trying to receive any new messages (if no expected message is specified, return the head of the queue) - for(int i=0; i<queue.size(); i++) { - String message = queue.get(i); - if(expectedMessage == null || expectedMessage.equals(message.split(" ")[0])) { - queue.remove(message); - return message; - } + //Check the queue twice: once incase the receiver is busy, twice incase the message was added by the last thread + findMessage = checkQueue(expectedMessage); + if(findMessage != null) { + return findMessage; } return localReceive(expectedMessage); } } + //Check the queue for the message before trying to receive any new messages (if no expected message is specified, return the head of the queue) + protected String checkQueue(String expectedMessage) { + Iterator<String> it; + synchronized(queue) { + it = queue.iterator(); + } + + while(it.hasNext()) { + String message = it.next(); + if(expectedMessage == null || expectedMessage.equals(message.split(" ")[0])) { + queue.remove(message); + return message; + } + } + + return null; + } + public String receive() throws DstoreDisconnectException { return receive(null); } diff --git a/DstoreDisconnectException.java b/DstoreDisconnectException.java index 82cd0fd..8187f03 100644 --- a/DstoreDisconnectException.java +++ b/DstoreDisconnectException.java @@ -1,7 +1,21 @@ import java.lang.Throwable; +import java.net.Socket; public class DstoreDisconnectException extends Exception { - public DstoreDisconnectException(int port) { + int port; + Socket socket; + + public DstoreDisconnectException(int port, Socket socket) { super("Dstore at port " + port + " has been disconnected"); + this.port = port; + this.socket = socket; + } + + public int getPort() { + return port; + } + + public Socket getSocket() { + return socket; } } diff --git a/to_store/Look Away.mp3 b/to_store/Look_Away.mp3 similarity index 100% rename from to_store/Look Away.mp3 rename to to_store/Look_Away.mp3 -- GitLab