diff --git a/ClientMain.class b/ClientMain.class deleted file mode 100644 index 0718a90f548d9eaebf0d35de7656a22ea70bea63..0000000000000000000000000000000000000000 Binary files a/ClientMain.class and /dev/null differ diff --git a/ClientMain.java b/ClientMain.java index 8645bd682e707662cb6b3dafbffaad6d205d8f18..607905026543e20433e37ae9b5eb919c3c4b531a 100644 --- a/ClientMain.java +++ b/ClientMain.java @@ -1,78 +1,107 @@ import java.io.File; import java.io.IOException; +import java.util.Random; public class ClientMain { - public static void main(String[] args) { + public static void main(String[] args) throws Exception{ - int cport = -1; - int timeout = -1; - try { - // parse arguments - cport = Integer.parseInt(args[0]); - timeout = Integer.parseInt(args[1]); - } catch (NumberFormatException e) { - System.err.println("Error parsing arguments: " + e.getMessage()); - System.err.println("Expected: java ClientMain cport timeout"); - System.exit(-1); - } + final int cport = Integer.parseInt(args[0]); + int timeout = Integer.parseInt(args[1]); File downloadFolder = new File("downloads"); if (!downloadFolder.exists()) if (!downloadFolder.mkdir()) throw new RuntimeException("Cannot create download folder (folder absolute path: " + downloadFolder.getAbsolutePath() + ")"); - testClient(cport, timeout, downloadFolder); + File uploadFolder = new File("to_store"); + if (!uploadFolder.exists()) + throw new RuntimeException("to_store folder does not exist"); + + // testClient(cport, timeout, downloadFolder); // example to launch a number of concurrent clients, each doing the same operations - /*for (int i = 0; i < 10; i++) { + for (int i = 0; i < 10; i++) { new Thread() { public void run() { - testClient(cport, timeout, downloadFolder); + test2Client(cport, timeout, downloadFolder, uploadFolder); } }.start(); - }*/ + } } - public static void testClient(int cport, int timeout, File downloadFolder) { + public static void test2Client(int cport, int timeout, File downloadFolder, File uploadFolder) { Client client = null; try { client = new Client(cport, timeout, Logger.LoggingType.ON_FILE_AND_TERMINAL); - - try { client.connect(); } catch(IOException e) { e.printStackTrace(); return; } + client.connect(); + Random random = new Random(System.currentTimeMillis() * System.nanoTime()); - String[] files = {"AllStar.txt", "Unknown.txt", "PumpkinHill.txt", "SnowHalation.txt", "Grandad.txt"}; + File fileList[] = uploadFolder.listFiles(); + for (int i=0; i<fileList.length/2; i++) { + File fileToStore = fileList[random.nextInt(fileList.length)]; + try { + client.store(fileToStore); + } catch (Exception e) { + System.out.println("Error storing file " + fileToStore); + e.printStackTrace(); + } + } - for(String file : files) { - try { client.store(new File(file)); Thread.sleep(500);} - catch(IOException e) { e.printStackTrace(); } - catch(InterruptedException e) {e.printStackTrace();} + String list[] = null; + try { list = list(client); } catch(IOException e) { e.printStackTrace(); } + + for (int i = 0; i < list.length/4; i++) { + String fileToRemove = list[random.nextInt(list.length)]; + try { + client.remove(fileToRemove); + } catch (Exception e) { + System.out.println("Error remove file " + fileToRemove); + e.printStackTrace(); + } } - /* + try { list = list(client); } catch(IOException e) { e.printStackTrace(); } + + } catch(IOException e) { + e.printStackTrace(); + } finally { + if (client != null) + try { client.disconnect(); } catch(Exception e) { e.printStackTrace(); } + } + } + + public static void testClient(int cport, int timeout, File downloadFolder) { + Client client = null; + + try { + + client = new Client(cport, timeout, Logger.LoggingType.ON_FILE_AND_TERMINAL); + + try { client.connect(); } catch(IOException e) { e.printStackTrace(); return; } try { list(client); } catch(IOException e) { e.printStackTrace(); } - try { client.store(new File("AllStar.txt")); } catch(IOException e) { e.printStackTrace(); } + try { client.store(new File("Clipboard01.pdf")); } catch(IOException e) { e.printStackTrace(); } - try { client.store(new File("AllStar.txt")); } catch(IOException e) { e.printStackTrace(); } + try { client.store(new File("Clipboard01.pdf")); } catch(IOException e) { e.printStackTrace(); } - try { client.store(new File("Unknown.txt")); } catch(IOException e) { e.printStackTrace(); } + try { client.store(new File("Clipboard01.jpg")); } catch(IOException e) { e.printStackTrace(); } - */ String list[] = null; try { list = list(client); } catch(IOException e) { e.printStackTrace(); } if (list != null) for (String filename : list) try { client.load(filename, downloadFolder); } catch(IOException e) { e.printStackTrace(); } - /* - if (list != null) + + /*if (list != null) for (String filename : list) try { client.remove(filename); } catch(IOException e) { e.printStackTrace(); } + try { client.remove(list[0]); } catch(IOException e) { e.printStackTrace(); } + + try { list(client); } catch(IOException e) { e.printStackTrace(); }*/ - try { list(client); } catch(IOException e) { e.printStackTrace(); } - */ } finally { if (client != null) try { client.disconnect(); } catch(Exception e) { e.printStackTrace(); } diff --git a/Controller$1AcksReceived.class b/Controller$1AcksReceived.class deleted file mode 100644 index 76f9c7cf8a9ab14a2ee5a059bcaaa1606c87a522..0000000000000000000000000000000000000000 Binary files a/Controller$1AcksReceived.class and /dev/null differ diff --git a/Controller$1RequireHandle.class b/Controller$1RequireHandle.class deleted file mode 100644 index 43fd8ea05aa94b6a0013a6c872b46ea016bd4086..0000000000000000000000000000000000000000 Binary files a/Controller$1RequireHandle.class and /dev/null differ diff --git a/Controller$IndexEntry$Status.class b/Controller$IndexEntry$Status.class deleted file mode 100644 index 538373f672d4086f88c7da4147a9b795369b667b..0000000000000000000000000000000000000000 Binary files a/Controller$IndexEntry$Status.class and /dev/null differ diff --git a/Controller$IndexEntry.class b/Controller$IndexEntry.class deleted file mode 100644 index dff648ecf50c78eda664bec5691882c2ca402a68..0000000000000000000000000000000000000000 Binary files a/Controller$IndexEntry.class and /dev/null differ diff --git a/Controller$RebalanceMessages.class b/Controller$RebalanceMessages.class deleted file mode 100644 index 8f171388f336657f23cacef98aa5d8958240084c..0000000000000000000000000000000000000000 Binary files a/Controller$RebalanceMessages.class and /dev/null differ diff --git a/Controller$RebalanceThread.class b/Controller$RebalanceThread.class deleted file mode 100644 index d2090d52270f1056374e16e0e80540658b15610f..0000000000000000000000000000000000000000 Binary files a/Controller$RebalanceThread.class and /dev/null differ diff --git a/Controller$Reloader.class b/Controller$Reloader.class deleted file mode 100644 index fe4456dcdad54991ba96e5ef3e7972b65c43e9ab..0000000000000000000000000000000000000000 Binary files a/Controller$Reloader.class and /dev/null differ diff --git a/Controller$RequireHandle.class b/Controller$RequireHandle.class deleted file mode 100644 index d10860a1501c7cd444e6230df4a9437cf76f3b3a..0000000000000000000000000000000000000000 Binary files a/Controller$RequireHandle.class and /dev/null differ diff --git a/Controller.class b/Controller.class deleted file mode 100644 index d9b8ef8a7fda28c4397e7c61a2fd5d1f92209ffe..0000000000000000000000000000000000000000 Binary files a/Controller.class and /dev/null differ diff --git a/Controller.java b/Controller.java index 9b13ac0c8b2e8944accd3a9a849783b5aae59ee3..756576d5330958e8884de8a1eb3eaf072debf8cc 100644 --- a/Controller.java +++ b/Controller.java @@ -1,5 +1,6 @@ import java.io.*; import java.net.*; +import java.util.concurrent.*; import java.lang.Runnable; import java.lang.Math; import java.util.Iterator; @@ -153,7 +154,11 @@ public class Controller { try { Socket client = server.accept(); BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); - String[] message = in.readLine().split(" "); + String tMessage = in.readLine(); + String[] message; + if(tMessage == null) {message = new String[]{""};} + else {message = tMessage.split(" ");} + if(message[0].equals("JOIN")) { int portNumber = Integer.parseInt(message[1]); dstores.put(portNumber, new DstoreConnection(client, portNumber, timeout)); @@ -185,12 +190,14 @@ public class Controller { } while(clientMessage != null); System.out.println("Client closed"); + try {client.close();} catch(IOException e) {} }).start(); } } catch(Exception e) { //Log error System.out.println("Error accepting new connection"); + e.printStackTrace(); System.out.println("Continue..."); } } @@ -292,6 +299,7 @@ public class Controller { entry.setNumberToStore(rFactor); //Send STORE_TO message + CyclicBarrier barrier = new CyclicBarrier(rFactor + 1); PrintWriter out = new PrintWriter(client.getOutputStream()); String message = "STORE_TO"; for(Integer thisStore : storesToStore) { @@ -301,7 +309,7 @@ public class Controller { String[] receivedMessage = dstores.get(thisStore).receive("STORE_ACK").split(" "); if(receivedMessage[0].equals("STORE_ACK")) { try { - storeAck(thisStore, receivedMessage[1]); + storeAck(thisStore, receivedMessage[1], barrier); } catch(Exception e) { //Log error @@ -323,6 +331,17 @@ public class Controller { out.flush(); //Wait for STORE_ACKs from datastores in storesToStore + try { + barrier.await(timeout, TimeUnit.MILLISECONDS); + } + catch(TimeoutException e) { + //Log error + System.err.println("Not all STORE_ACKs have been received"); + } + catch(Exception e) { + e.printStackTrace(); + } + /* synchronized(entry) { try { entry.wait(timeout); @@ -336,6 +355,7 @@ public class Controller { System.out.println("Not all STORE_ACKs have been received"); } } + */ //Update index to "store complete" entry.status = IndexEntry.Status.STORE_COMPLETE; @@ -349,7 +369,7 @@ public class Controller { } } - void storeAck(Integer port, String filename) throws Exception { + void storeAck(Integer port, String filename, CyclicBarrier barrier) throws Exception { if(!index.containsKey(filename)) { //Throw logging exception throw new Exception("Index does not contain " + filename); @@ -357,6 +377,13 @@ public class Controller { IndexEntry thisEntry = index.get(filename); thisEntry.addStoredBy(port); + try { + barrier.await(); + } + catch(BrokenBarrierException e) { + System.err.println("Late STORE_ACK received from " + port + " after timeout expired"); + } + catch(Exception e) {e.printStackTrace();} } void load(Socket client, String filename) throws Exception { @@ -422,12 +449,20 @@ public class Controller { entry.status = IndexEntry.Status.REMOVE_IN_PROGRESS; //Send REMOVE message to all Dstores storing the file + CyclicBarrier barrier = new CyclicBarrier(entry.getStoredBy().size()); for(Integer dstore : entry.getStoredBy()) { new Thread(() -> { try { String[] message = dstores.get(dstore).sendAndReceive("REMOVE " + filename, "REMOVE_ACK").split(" "); if(message[0].equals("REMOVE_ACK") && message[1].equals(filename)) { entry.removeStoredBy(dstore.intValue()); + try { + barrier.await(); + } + catch(BrokenBarrierException e) { + System.err.println("Late REMOVE_ACK received from " + dstore + " after timeout expired"); + } + catch(Exception e) {e.printStackTrace();} } else { //Log error @@ -442,6 +477,15 @@ public class Controller { } //Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message + try { + barrier.await(timeout, TimeUnit.MILLISECONDS); + } + catch(TimeoutException e) { + //Log error + System.out.println("Not all REMOVE_ACKs have been received"); + } + catch(Exception e) {e.printStackTrace();} + /* synchronized(entry) { try { entry.wait(timeout); @@ -451,15 +495,17 @@ public class Controller { } } + if(entry.getStoredBy().size() > 0) { //Log error System.out.println("Not all REMOVE_ACKs have been received"); } + */ //Update index to "remove complete" entry.status = IndexEntry.Status.REMOVE_COMPLETE; synchronized(index) { - if(index.get(filename) == entry) index.remove(filename); + if(index.containsKey(filename)) index.remove(filename); } //Send REMOVE_COMPLETE to client @@ -497,13 +543,14 @@ public class Controller { synchronized(rebalanceMessages) { if(rebalanceMessages.dstoreFiles != null) return; rebalanceMessages.dstoreFiles = dstoreFiles; + CyclicBarrier barrier = new CyclicBarrier(dstores.size()); try { //Send LIST message to each Dstore and receive their file list for(Integer dstore : dstores.keySet()) { new Thread(() -> { try { String[] message = dstores.get(dstore).sendAndReceive("LIST").split(" "); - receiveDstoreList(dstore.intValue(), message); + receiveDstoreList(dstore.intValue(), message, barrier); } catch(DstoreDisconnectException e) { e.printStackTrace(); @@ -512,11 +559,21 @@ public class Controller { }).start(); } - rebalanceMessages.wait(timeout); + try { + barrier.await(timeout, TimeUnit.MILLISECONDS); + } + catch(TimeoutException e) { + //Log error + System.out.println("Not all file lists have been received"); + } + catch(Exception e) {e.printStackTrace();} + rebalanceMessages.dstoreFiles = null; + /* if(dstoreFiles.size() < dstores.size()) { //Log error System.out.println("Not all file lists have been received"); } + */ //Create a new file allocation so that: //Each file appears rFactor times @@ -524,7 +581,6 @@ public class Controller { //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) //First, compile all the received files from all dstores into one list - //(This should contain all the elements of index.keySet(), so is probably made redundant) List<String> fileList = new ArrayList<String>(); for(List<String> l : dstoreFiles.values()) { for(String s : l) { @@ -550,14 +606,17 @@ public class Controller { for(String file : fileList) { storeOrder = reshuffle(dstoreFiles.keySet()); it = storeOrder.iterator(); - for(int j=0; j<rFactor; j++) { - //If indexed dstore does not have the file, add it to its requireIndex entry - Integer thisStore = it.next(); - if(!dstoreFiles.get(thisStore).contains(file)) { - requireIndex.get(thisStore).add(new RequireHandle(file)); + if(index.containsKey(file)) { + for(int j=0; j<rFactor; j++) { + //If indexed dstore does not have the file, add it to its requireIndex entry + Integer thisStore = it.next(); + if(!dstoreFiles.get(thisStore).contains(file)) { + requireIndex.get(thisStore).add(new RequireHandle(file)); + } } } //Dstores not chosen in the above loop must have an entry added to removeIndex, if they have the file + //This also covers files which were missed in a previous remove operation while(it.hasNext()) { Integer thisStore = it.next(); if(dstoreFiles.get(thisStore).contains(file)) { @@ -566,22 +625,8 @@ public class Controller { } } - //This class acts as a holder for a modifiable integer value, so that threads can synchronize on its lock - class AcksReceived { - int value; - public AcksReceived() { - value = 0; - } - public void incr() { - value ++; - } - public int getValue() { - return value; - } - } - //For each dstore, compile a new message based on the contents of the indexes then send this message and receive a reply - AcksReceived acksReceived = new AcksReceived(); + CyclicBarrier barrier2 = new CyclicBarrier(dstoreFiles.size()); for(Integer thisStore : dstoreFiles.keySet()) { //Compose files to send List<String> sendMessages = new ArrayList<String>(); @@ -629,12 +674,7 @@ public class Controller { System.out.println("Dstore " + thisStore + " should have sent REBALANCE_COMPLETE but Controller received " + returnMessage); } - synchronized(acksReceived) { - acksReceived.incr(); - if(acksReceived.getValue() == dstoreFiles.size()) { - acksReceived.notifyAll(); - } - } + new Thread(() -> {try {barrier2.await();} catch(Exception e) {e.printStackTrace();}}).start(); for(int i=0; i<requireIndex.get(thisStore).size(); i++) { returnMessage = connection.receive("STORE_ACK"); @@ -652,6 +692,16 @@ public class Controller { } //Wait for REBALANCE_COMPLETE from all Dstores + try { + barrier2.await(timeout, TimeUnit.MILLISECONDS); + } + catch(TimeoutException e) { + //Restart rebalance operation + System.err.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation..."); + success = false; + } + catch(Exception e) {e.printStackTrace();} + /* synchronized(acksReceived) { try { System.out.println("Waiting for REBALANCE_COMPLETE..."); @@ -669,6 +719,7 @@ public class Controller { e.printStackTrace(); } } + */ if(success) { synchronized(index) { @@ -679,7 +730,7 @@ public class Controller { index.get(handle.filename).addStoredBy(Integer.valueOf(dstore), false); } for(String file : removeIndex.get(dstore)) { - index.get(file).removeStoredBy(Integer.valueOf(dstore)); + if(index.containsKey(file)) index.get(file).removeStoredBy(Integer.valueOf(dstore)); } } } @@ -701,14 +752,10 @@ public class Controller { if(!success) rebalance(); } - void receiveDstoreList(int port, String[] list) { + void receiveDstoreList(int port, String[] list, CyclicBarrier barrier) { List<String> toList = new ArrayList<String>(); if(!list[0].equals("ERROR_EMPTY")) { for(String file : list) { - if(!index.containsKey(file)) { - //Log error - return; //Throw exception? - } toList.add(file); } } @@ -717,10 +764,15 @@ public class Controller { if(rebalanceMessages.dstoreFiles == null) return; rebalanceMessages.dstoreFiles.put(port, toList); - if(rebalanceMessages.dstoreFiles.size() == dstores.size()) { - rebalanceMessages.notify(); - } } + + try { + barrier.await(); + } + catch(BrokenBarrierException e) { + System.err.println("Late file list received from " + port + " after timeout expired"); + } + catch(Exception e) {e.printStackTrace();} } void removeDstore(Integer dstore) { diff --git a/Dstore.class b/Dstore.class deleted file mode 100644 index 18d98f5ac91d2e18bbf60ed629b2f407af49e0ce..0000000000000000000000000000000000000000 Binary files a/Dstore.class and /dev/null differ diff --git a/DstoreConnection$ReceiveContext.class b/DstoreConnection$ReceiveContext.class deleted file mode 100644 index 6276604e77c845f0e2b76c24946dc27deae658f0..0000000000000000000000000000000000000000 Binary files a/DstoreConnection$ReceiveContext.class and /dev/null differ diff --git a/DstoreConnection.class b/DstoreConnection.class deleted file mode 100644 index 26f8d2d10198470a3bbf2d10eb9e8a59c8d2a38f..0000000000000000000000000000000000000000 Binary files a/DstoreConnection.class and /dev/null differ diff --git a/DstoreDisconnectException.class b/DstoreDisconnectException.class deleted file mode 100644 index 22b4ca5149e1537a79318afa75ce420aba79414b..0000000000000000000000000000000000000000 Binary files a/DstoreDisconnectException.class and /dev/null differ diff --git a/Execute.sh b/Execute.sh index 0f70e0c2b840b4fb518b2f3acfd8a219e99b2986..1090c62a9162734b753ea5be6a6968a9c1a47193 100755 --- a/Execute.sh +++ b/Execute.sh @@ -10,4 +10,4 @@ for((i=1; i<=$2; i++)) do echo $! done sleep 2 -java -cp .:client-1.0.0.jar ClientMain 8080 $3 +java -cp .:client-1.0.2.jar ClientMain 8080 $3 diff --git a/client-1.0.2.jar b/client-1.0.2.jar new file mode 100644 index 0000000000000000000000000000000000000000..e10e391d3ba673d02192327bed3383618c2e4249 Binary files /dev/null and b/client-1.0.2.jar differ diff --git a/AllStar.txt b/to_store/AllStar.txt similarity index 100% rename from AllStar.txt rename to to_store/AllStar.txt diff --git a/Grandad.txt b/to_store/Grandad.txt similarity index 100% rename from Grandad.txt rename to to_store/Grandad.txt diff --git a/PumpkinHill.txt b/to_store/PumpkinHill.txt similarity index 100% rename from PumpkinHill.txt rename to to_store/PumpkinHill.txt diff --git a/SnowHalation.txt b/to_store/SnowHalation.txt similarity index 100% rename from SnowHalation.txt rename to to_store/SnowHalation.txt diff --git a/Unknown.txt b/to_store/Unknown.txt similarity index 100% rename from Unknown.txt rename to to_store/Unknown.txt