Skip to content
Snippets Groups Projects
Commit dcfee31c authored by dl3g19's avatar dl3g19
Browse files

Upgraded from synchronizing on locks to using barrier

parent 6f22fd1e
Branches
No related tags found
No related merge requests found
Showing
with 157 additions and 76 deletions
File deleted
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Random;
public class ClientMain { public class ClientMain {
public static void main(String[] args) { public static void main(String[] args) throws Exception{
int cport = -1; final int cport = Integer.parseInt(args[0]);
int timeout = -1; int timeout = Integer.parseInt(args[1]);
try {
// parse arguments
cport = Integer.parseInt(args[0]);
timeout = Integer.parseInt(args[1]);
} catch (NumberFormatException e) {
System.err.println("Error parsing arguments: " + e.getMessage());
System.err.println("Expected: java ClientMain cport timeout");
System.exit(-1);
}
File downloadFolder = new File("downloads"); File downloadFolder = new File("downloads");
if (!downloadFolder.exists()) if (!downloadFolder.exists())
if (!downloadFolder.mkdir()) throw new RuntimeException("Cannot create download folder (folder absolute path: " + downloadFolder.getAbsolutePath() + ")"); if (!downloadFolder.mkdir()) throw new RuntimeException("Cannot create download folder (folder absolute path: " + downloadFolder.getAbsolutePath() + ")");
testClient(cport, timeout, downloadFolder); File uploadFolder = new File("to_store");
if (!uploadFolder.exists())
throw new RuntimeException("to_store folder does not exist");
// testClient(cport, timeout, downloadFolder);
// example to launch a number of concurrent clients, each doing the same operations // example to launch a number of concurrent clients, each doing the same operations
/*for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
new Thread() { new Thread() {
public void run() { public void run() {
testClient(cport, timeout, downloadFolder); test2Client(cport, timeout, downloadFolder, uploadFolder);
} }
}.start(); }.start();
}*/ }
} }
public static void testClient(int cport, int timeout, File downloadFolder) { public static void test2Client(int cport, int timeout, File downloadFolder, File uploadFolder) {
Client client = null; Client client = null;
try { try {
client = new Client(cport, timeout, Logger.LoggingType.ON_FILE_AND_TERMINAL); client = new Client(cport, timeout, Logger.LoggingType.ON_FILE_AND_TERMINAL);
client.connect();
Random random = new Random(System.currentTimeMillis() * System.nanoTime());
try { client.connect(); } catch(IOException e) { e.printStackTrace(); return; } File fileList[] = uploadFolder.listFiles();
for (int i=0; i<fileList.length/2; i++) {
File fileToStore = fileList[random.nextInt(fileList.length)];
try {
client.store(fileToStore);
} catch (Exception e) {
System.out.println("Error storing file " + fileToStore);
e.printStackTrace();
}
}
String[] files = {"AllStar.txt", "Unknown.txt", "PumpkinHill.txt", "SnowHalation.txt", "Grandad.txt"}; String list[] = null;
try { list = list(client); } catch(IOException e) { e.printStackTrace(); }
for(String file : files) { for (int i = 0; i < list.length/4; i++) {
try { client.store(new File(file)); Thread.sleep(500);} String fileToRemove = list[random.nextInt(list.length)];
catch(IOException e) { e.printStackTrace(); } try {
catch(InterruptedException e) {e.printStackTrace();} client.remove(fileToRemove);
} catch (Exception e) {
System.out.println("Error remove file " + fileToRemove);
e.printStackTrace();
}
} }
/* try { list = list(client); } catch(IOException e) { e.printStackTrace(); }
} catch(IOException e) {
e.printStackTrace();
} finally {
if (client != null)
try { client.disconnect(); } catch(Exception e) { e.printStackTrace(); }
}
}
public static void testClient(int cport, int timeout, File downloadFolder) {
Client client = null;
try {
client = new Client(cport, timeout, Logger.LoggingType.ON_FILE_AND_TERMINAL);
try { client.connect(); } catch(IOException e) { e.printStackTrace(); return; }
try { list(client); } catch(IOException e) { e.printStackTrace(); } try { list(client); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("AllStar.txt")); } catch(IOException e) { e.printStackTrace(); } try { client.store(new File("Clipboard01.pdf")); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("AllStar.txt")); } catch(IOException e) { e.printStackTrace(); } try { client.store(new File("Clipboard01.pdf")); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("Unknown.txt")); } catch(IOException e) { e.printStackTrace(); } try { client.store(new File("Clipboard01.jpg")); } catch(IOException e) { e.printStackTrace(); }
*/
String list[] = null; String list[] = null;
try { list = list(client); } catch(IOException e) { e.printStackTrace(); } try { list = list(client); } catch(IOException e) { e.printStackTrace(); }
if (list != null) if (list != null)
for (String filename : list) for (String filename : list)
try { client.load(filename, downloadFolder); } catch(IOException e) { e.printStackTrace(); } try { client.load(filename, downloadFolder); } catch(IOException e) { e.printStackTrace(); }
/*
if (list != null) /*if (list != null)
for (String filename : list) for (String filename : list)
try { client.remove(filename); } catch(IOException e) { e.printStackTrace(); } try { client.remove(filename); } catch(IOException e) { e.printStackTrace(); }
try { client.remove(list[0]); } catch(IOException e) { e.printStackTrace(); }
try { list(client); } catch(IOException e) { e.printStackTrace(); }*/
try { list(client); } catch(IOException e) { e.printStackTrace(); }
*/
} finally { } finally {
if (client != null) if (client != null)
try { client.disconnect(); } catch(Exception e) { e.printStackTrace(); } try { client.disconnect(); } catch(Exception e) { e.printStackTrace(); }
......
File deleted
File deleted
File deleted
File deleted
File deleted
File deleted
File deleted
File deleted
File deleted
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.util.concurrent.*;
import java.lang.Runnable; import java.lang.Runnable;
import java.lang.Math; import java.lang.Math;
import java.util.Iterator; import java.util.Iterator;
...@@ -153,7 +154,11 @@ public class Controller { ...@@ -153,7 +154,11 @@ public class Controller {
try { try {
Socket client = server.accept(); Socket client = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String[] message = in.readLine().split(" "); String tMessage = in.readLine();
String[] message;
if(tMessage == null) {message = new String[]{""};}
else {message = tMessage.split(" ");}
if(message[0].equals("JOIN")) { if(message[0].equals("JOIN")) {
int portNumber = Integer.parseInt(message[1]); int portNumber = Integer.parseInt(message[1]);
dstores.put(portNumber, new DstoreConnection(client, portNumber, timeout)); dstores.put(portNumber, new DstoreConnection(client, portNumber, timeout));
...@@ -185,12 +190,14 @@ public class Controller { ...@@ -185,12 +190,14 @@ public class Controller {
} }
while(clientMessage != null); while(clientMessage != null);
System.out.println("Client closed"); System.out.println("Client closed");
try {client.close();} catch(IOException e) {}
}).start(); }).start();
} }
} }
catch(Exception e) { catch(Exception e) {
//Log error //Log error
System.out.println("Error accepting new connection"); System.out.println("Error accepting new connection");
e.printStackTrace();
System.out.println("Continue..."); System.out.println("Continue...");
} }
} }
...@@ -292,6 +299,7 @@ public class Controller { ...@@ -292,6 +299,7 @@ public class Controller {
entry.setNumberToStore(rFactor); entry.setNumberToStore(rFactor);
//Send STORE_TO message //Send STORE_TO message
CyclicBarrier barrier = new CyclicBarrier(rFactor + 1);
PrintWriter out = new PrintWriter(client.getOutputStream()); PrintWriter out = new PrintWriter(client.getOutputStream());
String message = "STORE_TO"; String message = "STORE_TO";
for(Integer thisStore : storesToStore) { for(Integer thisStore : storesToStore) {
...@@ -301,7 +309,7 @@ public class Controller { ...@@ -301,7 +309,7 @@ public class Controller {
String[] receivedMessage = dstores.get(thisStore).receive("STORE_ACK").split(" "); String[] receivedMessage = dstores.get(thisStore).receive("STORE_ACK").split(" ");
if(receivedMessage[0].equals("STORE_ACK")) { if(receivedMessage[0].equals("STORE_ACK")) {
try { try {
storeAck(thisStore, receivedMessage[1]); storeAck(thisStore, receivedMessage[1], barrier);
} }
catch(Exception e) { catch(Exception e) {
//Log error //Log error
...@@ -323,6 +331,17 @@ public class Controller { ...@@ -323,6 +331,17 @@ public class Controller {
out.flush(); out.flush();
//Wait for STORE_ACKs from datastores in storesToStore //Wait for STORE_ACKs from datastores in storesToStore
try {
barrier.await(timeout, TimeUnit.MILLISECONDS);
}
catch(TimeoutException e) {
//Log error
System.err.println("Not all STORE_ACKs have been received");
}
catch(Exception e) {
e.printStackTrace();
}
/*
synchronized(entry) { synchronized(entry) {
try { try {
entry.wait(timeout); entry.wait(timeout);
...@@ -336,6 +355,7 @@ public class Controller { ...@@ -336,6 +355,7 @@ public class Controller {
System.out.println("Not all STORE_ACKs have been received"); System.out.println("Not all STORE_ACKs have been received");
} }
} }
*/
//Update index to "store complete" //Update index to "store complete"
entry.status = IndexEntry.Status.STORE_COMPLETE; entry.status = IndexEntry.Status.STORE_COMPLETE;
...@@ -349,7 +369,7 @@ public class Controller { ...@@ -349,7 +369,7 @@ public class Controller {
} }
} }
void storeAck(Integer port, String filename) throws Exception { void storeAck(Integer port, String filename, CyclicBarrier barrier) throws Exception {
if(!index.containsKey(filename)) { if(!index.containsKey(filename)) {
//Throw logging exception //Throw logging exception
throw new Exception("Index does not contain " + filename); throw new Exception("Index does not contain " + filename);
...@@ -357,6 +377,13 @@ public class Controller { ...@@ -357,6 +377,13 @@ public class Controller {
IndexEntry thisEntry = index.get(filename); IndexEntry thisEntry = index.get(filename);
thisEntry.addStoredBy(port); thisEntry.addStoredBy(port);
try {
barrier.await();
}
catch(BrokenBarrierException e) {
System.err.println("Late STORE_ACK received from " + port + " after timeout expired");
}
catch(Exception e) {e.printStackTrace();}
} }
void load(Socket client, String filename) throws Exception { void load(Socket client, String filename) throws Exception {
...@@ -422,12 +449,20 @@ public class Controller { ...@@ -422,12 +449,20 @@ public class Controller {
entry.status = IndexEntry.Status.REMOVE_IN_PROGRESS; entry.status = IndexEntry.Status.REMOVE_IN_PROGRESS;
//Send REMOVE message to all Dstores storing the file //Send REMOVE message to all Dstores storing the file
CyclicBarrier barrier = new CyclicBarrier(entry.getStoredBy().size());
for(Integer dstore : entry.getStoredBy()) { for(Integer dstore : entry.getStoredBy()) {
new Thread(() -> { new Thread(() -> {
try { try {
String[] message = dstores.get(dstore).sendAndReceive("REMOVE " + filename, "REMOVE_ACK").split(" "); String[] message = dstores.get(dstore).sendAndReceive("REMOVE " + filename, "REMOVE_ACK").split(" ");
if(message[0].equals("REMOVE_ACK") && message[1].equals(filename)) { if(message[0].equals("REMOVE_ACK") && message[1].equals(filename)) {
entry.removeStoredBy(dstore.intValue()); entry.removeStoredBy(dstore.intValue());
try {
barrier.await();
}
catch(BrokenBarrierException e) {
System.err.println("Late REMOVE_ACK received from " + dstore + " after timeout expired");
}
catch(Exception e) {e.printStackTrace();}
} }
else { else {
//Log error //Log error
...@@ -442,6 +477,15 @@ public class Controller { ...@@ -442,6 +477,15 @@ public class Controller {
} }
//Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message //Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message
try {
barrier.await(timeout, TimeUnit.MILLISECONDS);
}
catch(TimeoutException e) {
//Log error
System.out.println("Not all REMOVE_ACKs have been received");
}
catch(Exception e) {e.printStackTrace();}
/*
synchronized(entry) { synchronized(entry) {
try { try {
entry.wait(timeout); entry.wait(timeout);
...@@ -451,15 +495,17 @@ public class Controller { ...@@ -451,15 +495,17 @@ public class Controller {
} }
} }
if(entry.getStoredBy().size() > 0) { if(entry.getStoredBy().size() > 0) {
//Log error //Log error
System.out.println("Not all REMOVE_ACKs have been received"); System.out.println("Not all REMOVE_ACKs have been received");
} }
*/
//Update index to "remove complete" //Update index to "remove complete"
entry.status = IndexEntry.Status.REMOVE_COMPLETE; entry.status = IndexEntry.Status.REMOVE_COMPLETE;
synchronized(index) { synchronized(index) {
if(index.get(filename) == entry) index.remove(filename); if(index.containsKey(filename)) index.remove(filename);
} }
//Send REMOVE_COMPLETE to client //Send REMOVE_COMPLETE to client
...@@ -497,13 +543,14 @@ public class Controller { ...@@ -497,13 +543,14 @@ public class Controller {
synchronized(rebalanceMessages) { synchronized(rebalanceMessages) {
if(rebalanceMessages.dstoreFiles != null) return; if(rebalanceMessages.dstoreFiles != null) return;
rebalanceMessages.dstoreFiles = dstoreFiles; rebalanceMessages.dstoreFiles = dstoreFiles;
CyclicBarrier barrier = new CyclicBarrier(dstores.size());
try { try {
//Send LIST message to each Dstore and receive their file list //Send LIST message to each Dstore and receive their file list
for(Integer dstore : dstores.keySet()) { for(Integer dstore : dstores.keySet()) {
new Thread(() -> { new Thread(() -> {
try { try {
String[] message = dstores.get(dstore).sendAndReceive("LIST").split(" "); String[] message = dstores.get(dstore).sendAndReceive("LIST").split(" ");
receiveDstoreList(dstore.intValue(), message); receiveDstoreList(dstore.intValue(), message, barrier);
} }
catch(DstoreDisconnectException e) { catch(DstoreDisconnectException e) {
e.printStackTrace(); e.printStackTrace();
...@@ -512,11 +559,21 @@ public class Controller { ...@@ -512,11 +559,21 @@ public class Controller {
}).start(); }).start();
} }
rebalanceMessages.wait(timeout); try {
barrier.await(timeout, TimeUnit.MILLISECONDS);
}
catch(TimeoutException e) {
//Log error
System.out.println("Not all file lists have been received");
}
catch(Exception e) {e.printStackTrace();}
rebalanceMessages.dstoreFiles = null;
/*
if(dstoreFiles.size() < dstores.size()) { if(dstoreFiles.size() < dstores.size()) {
//Log error //Log error
System.out.println("Not all file lists have been received"); System.out.println("Not all file lists have been received");
} }
*/
//Create a new file allocation so that: //Create a new file allocation so that:
//Each file appears rFactor times //Each file appears rFactor times
...@@ -524,7 +581,6 @@ public class Controller { ...@@ -524,7 +581,6 @@ public class Controller {
//Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists)
//First, compile all the received files from all dstores into one list //First, compile all the received files from all dstores into one list
//(This should contain all the elements of index.keySet(), so is probably made redundant)
List<String> fileList = new ArrayList<String>(); List<String> fileList = new ArrayList<String>();
for(List<String> l : dstoreFiles.values()) { for(List<String> l : dstoreFiles.values()) {
for(String s : l) { for(String s : l) {
...@@ -550,6 +606,7 @@ public class Controller { ...@@ -550,6 +606,7 @@ public class Controller {
for(String file : fileList) { for(String file : fileList) {
storeOrder = reshuffle(dstoreFiles.keySet()); storeOrder = reshuffle(dstoreFiles.keySet());
it = storeOrder.iterator(); it = storeOrder.iterator();
if(index.containsKey(file)) {
for(int j=0; j<rFactor; j++) { for(int j=0; j<rFactor; j++) {
//If indexed dstore does not have the file, add it to its requireIndex entry //If indexed dstore does not have the file, add it to its requireIndex entry
Integer thisStore = it.next(); Integer thisStore = it.next();
...@@ -557,7 +614,9 @@ public class Controller { ...@@ -557,7 +614,9 @@ public class Controller {
requireIndex.get(thisStore).add(new RequireHandle(file)); requireIndex.get(thisStore).add(new RequireHandle(file));
} }
} }
}
//Dstores not chosen in the above loop must have an entry added to removeIndex, if they have the file //Dstores not chosen in the above loop must have an entry added to removeIndex, if they have the file
//This also covers files which were missed in a previous remove operation
while(it.hasNext()) { while(it.hasNext()) {
Integer thisStore = it.next(); Integer thisStore = it.next();
if(dstoreFiles.get(thisStore).contains(file)) { if(dstoreFiles.get(thisStore).contains(file)) {
...@@ -566,22 +625,8 @@ public class Controller { ...@@ -566,22 +625,8 @@ public class Controller {
} }
} }
//This class acts as a holder for a modifiable integer value, so that threads can synchronize on its lock
class AcksReceived {
int value;
public AcksReceived() {
value = 0;
}
public void incr() {
value ++;
}
public int getValue() {
return value;
}
}
//For each dstore, compile a new message based on the contents of the indexes then send this message and receive a reply //For each dstore, compile a new message based on the contents of the indexes then send this message and receive a reply
AcksReceived acksReceived = new AcksReceived(); CyclicBarrier barrier2 = new CyclicBarrier(dstoreFiles.size());
for(Integer thisStore : dstoreFiles.keySet()) { for(Integer thisStore : dstoreFiles.keySet()) {
//Compose files to send //Compose files to send
List<String> sendMessages = new ArrayList<String>(); List<String> sendMessages = new ArrayList<String>();
...@@ -629,12 +674,7 @@ public class Controller { ...@@ -629,12 +674,7 @@ public class Controller {
System.out.println("Dstore " + thisStore + " should have sent REBALANCE_COMPLETE but Controller received " + returnMessage); System.out.println("Dstore " + thisStore + " should have sent REBALANCE_COMPLETE but Controller received " + returnMessage);
} }
synchronized(acksReceived) { new Thread(() -> {try {barrier2.await();} catch(Exception e) {e.printStackTrace();}}).start();
acksReceived.incr();
if(acksReceived.getValue() == dstoreFiles.size()) {
acksReceived.notifyAll();
}
}
for(int i=0; i<requireIndex.get(thisStore).size(); i++) { for(int i=0; i<requireIndex.get(thisStore).size(); i++) {
returnMessage = connection.receive("STORE_ACK"); returnMessage = connection.receive("STORE_ACK");
...@@ -652,6 +692,16 @@ public class Controller { ...@@ -652,6 +692,16 @@ public class Controller {
} }
//Wait for REBALANCE_COMPLETE from all Dstores //Wait for REBALANCE_COMPLETE from all Dstores
try {
barrier2.await(timeout, TimeUnit.MILLISECONDS);
}
catch(TimeoutException e) {
//Restart rebalance operation
System.err.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation...");
success = false;
}
catch(Exception e) {e.printStackTrace();}
/*
synchronized(acksReceived) { synchronized(acksReceived) {
try { try {
System.out.println("Waiting for REBALANCE_COMPLETE..."); System.out.println("Waiting for REBALANCE_COMPLETE...");
...@@ -669,6 +719,7 @@ public class Controller { ...@@ -669,6 +719,7 @@ public class Controller {
e.printStackTrace(); e.printStackTrace();
} }
} }
*/
if(success) { if(success) {
synchronized(index) { synchronized(index) {
...@@ -679,7 +730,7 @@ public class Controller { ...@@ -679,7 +730,7 @@ public class Controller {
index.get(handle.filename).addStoredBy(Integer.valueOf(dstore), false); index.get(handle.filename).addStoredBy(Integer.valueOf(dstore), false);
} }
for(String file : removeIndex.get(dstore)) { for(String file : removeIndex.get(dstore)) {
index.get(file).removeStoredBy(Integer.valueOf(dstore)); if(index.containsKey(file)) index.get(file).removeStoredBy(Integer.valueOf(dstore));
} }
} }
} }
...@@ -701,14 +752,10 @@ public class Controller { ...@@ -701,14 +752,10 @@ public class Controller {
if(!success) rebalance(); if(!success) rebalance();
} }
void receiveDstoreList(int port, String[] list) { void receiveDstoreList(int port, String[] list, CyclicBarrier barrier) {
List<String> toList = new ArrayList<String>(); List<String> toList = new ArrayList<String>();
if(!list[0].equals("ERROR_EMPTY")) { if(!list[0].equals("ERROR_EMPTY")) {
for(String file : list) { for(String file : list) {
if(!index.containsKey(file)) {
//Log error
return; //Throw exception?
}
toList.add(file); toList.add(file);
} }
} }
...@@ -717,10 +764,15 @@ public class Controller { ...@@ -717,10 +764,15 @@ public class Controller {
if(rebalanceMessages.dstoreFiles == null) return; if(rebalanceMessages.dstoreFiles == null) return;
rebalanceMessages.dstoreFiles.put(port, toList); rebalanceMessages.dstoreFiles.put(port, toList);
if(rebalanceMessages.dstoreFiles.size() == dstores.size()) {
rebalanceMessages.notify();
} }
try {
barrier.await();
}
catch(BrokenBarrierException e) {
System.err.println("Late file list received from " + port + " after timeout expired");
} }
catch(Exception e) {e.printStackTrace();}
} }
void removeDstore(Integer dstore) { void removeDstore(Integer dstore) {
......
File deleted
File deleted
File deleted
File deleted
...@@ -10,4 +10,4 @@ for((i=1; i<=$2; i++)) do ...@@ -10,4 +10,4 @@ for((i=1; i<=$2; i++)) do
echo $! echo $!
done done
sleep 2 sleep 2
java -cp .:client-1.0.0.jar ClientMain 8080 $3 java -cp .:client-1.0.2.jar ClientMain 8080 $3
File added
File moved
File moved
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment