Skip to content
Snippets Groups Projects
Commit 6f22fd1e authored by dl3g19's avatar dl3g19
Browse files

Better concurrency and debugging implemented, code has been tested with r=3, n=5 without problems

parent 6bec82a1
No related branches found
No related tags found
No related merge requests found
No preview for this file type
File added
No preview for this file type
File added
No preview for this file type
...@@ -155,8 +155,9 @@ public class Controller { ...@@ -155,8 +155,9 @@ public class Controller {
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String[] message = in.readLine().split(" "); String[] message = in.readLine().split(" ");
if(message[0].equals("JOIN")) { if(message[0].equals("JOIN")) {
dstores.put(Integer.parseInt(message[1]), new DstoreConnection(client, timeout)); int portNumber = Integer.parseInt(message[1]);
System.out.println("Dstore at " + message[1] + " joined"); dstores.put(portNumber, new DstoreConnection(client, portNumber, timeout));
System.out.println("Dstore at " + portNumber + " joined");
try {rebalanceThread.interrupt();} catch(SecurityException e) {e.printStackTrace();} try {rebalanceThread.interrupt();} catch(SecurityException e) {e.printStackTrace();}
} }
else { else {
...@@ -189,7 +190,7 @@ public class Controller { ...@@ -189,7 +190,7 @@ public class Controller {
} }
catch(Exception e) { catch(Exception e) {
//Log error //Log error
System.out.println("Controller error while accepting connections!"); System.out.println("Error accepting new connection");
System.out.println("Continue..."); System.out.println("Continue...");
} }
} }
...@@ -521,42 +522,43 @@ public class Controller { ...@@ -521,42 +522,43 @@ public class Controller {
//Each file appears rFactor times //Each file appears rFactor times
//Each file appears at most once on each datastore //Each file appears at most once on each datastore
//Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists)
List<Integer> storeOrder = reshuffle(dstoreFiles.keySet());
//First, compile all the received files from all dstores into one list
//(This should contain all the elements of index.keySet(), so is probably made redundant)
List<String> fileList = new ArrayList<String>(); List<String> fileList = new ArrayList<String>();
for(Integer i : storeOrder) { for(List<String> l : dstoreFiles.values()) {
for(String s : dstoreFiles.get(i)) { for(String s : l) {
if(!fileList.contains(s)) { if(!fileList.contains(s)) {
fileList.add(s); fileList.add(s);
} }
} }
} }
Map<Integer,List<String>> requireIndex = new HashMap<Integer,List<String>>(); //Create a new index each for files required and files to remove
Map<Integer,List<RequireHandle>> requireIndex = new HashMap<Integer,List<RequireHandle>>();
Map<Integer,List<String>> removeIndex = new HashMap<Integer,List<String>>(); Map<Integer,List<String>> removeIndex = new HashMap<Integer,List<String>>();
int pos = 0; int pos = 0;
int storeSize = (int) Math.ceil((fileList.size() * rFactor) / dstores.size()); int storeSize = (int) Math.ceil((fileList.size() * rFactor) / dstores.size());
for(Integer i : dstoreFiles.keySet()) { for(Integer i : dstoreFiles.keySet()) {
requireIndex.put(i, new ArrayList<String>()); requireIndex.put(i, new ArrayList<RequireHandle>());
removeIndex.put(i, new ArrayList<String>()); removeIndex.put(i, new ArrayList<String>());
} }
Iterator<Integer> it = null;
//Insert files into the new indexes. These are allocated according to the new store order
List<Integer> storeOrder;
Iterator<Integer> it;
for(String file : fileList) { for(String file : fileList) {
for(int j=0; j<rFactor; j++) { storeOrder = reshuffle(dstoreFiles.keySet());
if(it == null || !it.hasNext()) {
it = storeOrder.iterator(); it = storeOrder.iterator();
} for(int j=0; j<rFactor; j++) {
//If indexed dstore does not have the file, add it to its requireIndex entry //If indexed dstore does not have the file, add it to its requireIndex entry
Integer thisStore = it.next(); Integer thisStore = it.next();
if(!dstoreFiles.get(thisStore).contains(file)) { if(!dstoreFiles.get(thisStore).contains(file)) {
requireIndex.get(thisStore).add(file); requireIndex.get(thisStore).add(new RequireHandle(file));
} }
} }
//Dstores not chosen in the above loop must have an entry added to removeIndex, if they have the file //Dstores not chosen in the above loop must have an entry added to removeIndex, if they have the file
for(int j=0; j<(requireIndex.size() - rFactor); j++) { while(it.hasNext()) {
if(it == null || !it.hasNext()) {
it = storeOrder.iterator();
}
Integer thisStore = it.next(); Integer thisStore = it.next();
if(dstoreFiles.get(thisStore).contains(file)) { if(dstoreFiles.get(thisStore).contains(file)) {
removeIndex.get(thisStore).add(file); removeIndex.get(thisStore).add(file);
...@@ -564,6 +566,7 @@ public class Controller { ...@@ -564,6 +566,7 @@ public class Controller {
} }
} }
//This class acts as a holder for a modifiable integer value, so that threads can synchronize on its lock
class AcksReceived { class AcksReceived {
int value; int value;
public AcksReceived() { public AcksReceived() {
...@@ -576,24 +579,31 @@ public class Controller { ...@@ -576,24 +579,31 @@ public class Controller {
return value; return value;
} }
} }
//For each dstore, compile a new message based on the contents of the indexes then send this message and receive a reply
AcksReceived acksReceived = new AcksReceived(); AcksReceived acksReceived = new AcksReceived();
for(Integer thisStore : storeOrder) { for(Integer thisStore : dstoreFiles.keySet()) {
//Compose files to send
List<String> sendMessages = new ArrayList<String>(); List<String> sendMessages = new ArrayList<String>();
for(String file : dstoreFiles.get(thisStore)) { for(String file : dstoreFiles.get(thisStore)) {
if(isEmptyListMap(requireIndex)) break; //All files required by other dstores are getting sent = no need for the following computation
if(allHandled(requireIndex)) break;
String fileMessage = ""; String fileMessage = "";
for(Integer otherStore : requireIndex.keySet()) { for(Integer otherStore : requireIndex.keySet()) {
if(thisStore.equals(otherStore)) continue; if(thisStore.equals(otherStore)) continue;
for(String otherFile : requireIndex.get(otherStore)) { for(RequireHandle otherHandle : requireIndex.get(otherStore)) {
if(file.equals(otherFile)) { if(file.equals(otherHandle.filename)) {
requireIndex.get(otherStore).remove(otherFile); if(!otherHandle.handled) {
//Another store requires a file that this store has - send it there
otherHandle.handled = true;
fileMessage = fileMessage + " " + otherStore.toString(); fileMessage = fileMessage + " " + otherStore.toString();
}
break; break;
} }
} }
} }
if(fileMessage.equals("")) continue; if(fileMessage.equals("")) continue; //No files to send
fileMessage = file + " " + (fileMessage.trim().split(" ").length) + fileMessage; fileMessage = file + " " + (fileMessage.trim().split(" ").length) + fileMessage;
sendMessages.add(fileMessage); sendMessages.add(fileMessage);
} }
...@@ -602,6 +612,7 @@ public class Controller { ...@@ -602,6 +612,7 @@ public class Controller {
for(String s : sendMessages) { for(String s : sendMessages) {
message = message + " " + s; message = message + " " + s;
} }
//Compose files to remove
message = message + " " + removeIndex.get(thisStore).size(); message = message + " " + removeIndex.get(thisStore).size();
for(String f : removeIndex.get(thisStore)) { for(String f : removeIndex.get(thisStore)) {
message = message + " " + f; message = message + " " + f;
...@@ -611,7 +622,8 @@ public class Controller { ...@@ -611,7 +622,8 @@ public class Controller {
String finalMessage = message; String finalMessage = message;
new Thread(() -> { new Thread(() -> {
try { try {
String returnMessage = dstores.get(thisStore).sendAndReceive(finalMessage, "REBALANCE_COMPLETE"); DstoreConnection connection = dstores.get(thisStore);
String returnMessage = connection.sendAndReceive(finalMessage, "REBALANCE_COMPLETE");
if(!returnMessage.equals("REBALANCE_COMPLETE")) { if(!returnMessage.equals("REBALANCE_COMPLETE")) {
//Log error //Log error
System.out.println("Dstore " + thisStore + " should have sent REBALANCE_COMPLETE but Controller received " + returnMessage); System.out.println("Dstore " + thisStore + " should have sent REBALANCE_COMPLETE but Controller received " + returnMessage);
...@@ -619,10 +631,18 @@ public class Controller { ...@@ -619,10 +631,18 @@ public class Controller {
synchronized(acksReceived) { synchronized(acksReceived) {
acksReceived.incr(); acksReceived.incr();
if(acksReceived.getValue() == storeOrder.size()) { if(acksReceived.getValue() == dstoreFiles.size()) {
acksReceived.notifyAll(); acksReceived.notifyAll();
} }
} }
for(int i=0; i<requireIndex.get(thisStore).size(); i++) {
returnMessage = connection.receive("STORE_ACK");
if(!returnMessage.split(" ")[0].equals("STORE_ACK")) {
//Log error
System.out.println("Dstore " + thisStore + " should have sent STORE_ACK but Controller received " + returnMessage);
}
}
} }
catch(DstoreDisconnectException e) { catch(DstoreDisconnectException e) {
e.printStackTrace(); e.printStackTrace();
...@@ -636,12 +656,12 @@ public class Controller { ...@@ -636,12 +656,12 @@ public class Controller {
try { try {
System.out.println("Waiting for REBALANCE_COMPLETE..."); System.out.println("Waiting for REBALANCE_COMPLETE...");
acksReceived.wait(timeout); acksReceived.wait(timeout);
if(acksReceived.getValue() < storeOrder.size()) { if(acksReceived.getValue() < dstoreFiles.size()) {
//Restart rebalance operation //Restart rebalance operation
System.out.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation..."); System.out.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation...");
success = false; success = false;
} }
else if(acksReceived.getValue() > storeOrder.size()) { else if(acksReceived.getValue() > dstoreFiles.size()) {
System.out.println("Too many REBALANCE_COMPLETEs received"); System.out.println("Too many REBALANCE_COMPLETEs received");
} }
} }
...@@ -655,8 +675,8 @@ public class Controller { ...@@ -655,8 +675,8 @@ public class Controller {
Iterator<Integer> jt = requireIndex.keySet().iterator(); Iterator<Integer> jt = requireIndex.keySet().iterator();
while(jt.hasNext()) { while(jt.hasNext()) {
Integer dstore = jt.next(); Integer dstore = jt.next();
for(String file : requireIndex.get(dstore)) { for(RequireHandle handle : requireIndex.get(dstore)) {
index.get(file).addStoredBy(Integer.valueOf(dstore), false); index.get(handle.filename).addStoredBy(Integer.valueOf(dstore), false);
} }
for(String file : removeIndex.get(dstore)) { for(String file : removeIndex.get(dstore)) {
index.get(file).removeStoredBy(Integer.valueOf(dstore)); index.get(file).removeStoredBy(Integer.valueOf(dstore));
...@@ -721,10 +741,20 @@ public class Controller { ...@@ -721,10 +741,20 @@ public class Controller {
return list; return list;
} }
<T,U> boolean isEmptyListMap(Map<T,List<U>> map) { //Helper class for rebalance method - contains a filename and a boolean which is true if a dstore is going to send this file
for(List<U> list : map.values()) { protected class RequireHandle {
if(!list.isEmpty()) { public String filename;
return false; public boolean handled;
public RequireHandle(String filename) {
this.filename = filename;
handled = false;
}
}
boolean allHandled(Map<Integer,List<RequireHandle>> map) {
for(List<RequireHandle> list : map.values()) {
for(RequireHandle handle : list) {
if(!handle.handled) return false;
} }
} }
return true; return true;
......
No preview for this file type
...@@ -7,6 +7,8 @@ import java.util.List; ...@@ -7,6 +7,8 @@ import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class Dstore { public class Dstore {
protected int port; //Port to listen on protected int port; //Port to listen on
...@@ -68,8 +70,8 @@ public class Dstore { ...@@ -68,8 +70,8 @@ public class Dstore {
} }
public void start() { public void start() {
try { try(Socket controllerSocket = new Socket(InetAddress.getLocalHost(), cport)) {
controllerSocket = new Socket(InetAddress.getLocalHost(), cport); this.controllerSocket = controllerSocket;
controllerIn = new BufferedReader(new InputStreamReader(controllerSocket.getInputStream())); controllerIn = new BufferedReader(new InputStreamReader(controllerSocket.getInputStream()));
controllerOut = new PrintWriter(controllerSocket.getOutputStream()); controllerOut = new PrintWriter(controllerSocket.getOutputStream());
controllerOut.println("JOIN " + port); controllerOut.println("JOIN " + port);
...@@ -151,8 +153,10 @@ public class Dstore { ...@@ -151,8 +153,10 @@ public class Dstore {
writer.close(); writer.close();
//Send STORE_ACK message to the Controller //Send STORE_ACK message to the Controller
synchronized(controllerOut) {
controllerOut.println("STORE_ACK " + filename); controllerOut.println("STORE_ACK " + filename);
controllerOut.flush(); controllerOut.flush();
}
if(fileSizes.containsKey(filename)) fileSizes.remove(filename); if(fileSizes.containsKey(filename)) fileSizes.remove(filename);
fileSizes.put(filename, Long.valueOf(filesize)); fileSizes.put(filename, Long.valueOf(filesize));
...@@ -211,15 +215,19 @@ public class Dstore { ...@@ -211,15 +215,19 @@ public class Dstore {
if(Files.deleteIfExists(path)) { if(Files.deleteIfExists(path)) {
//Send REMOVE_ACK message to client (the controller) //Send REMOVE_ACK message to client (the controller)
synchronized(controllerOut) {
controllerOut.println("REMOVE_ACK " + filename); controllerOut.println("REMOVE_ACK " + filename);
controllerOut.flush();
}
} }
else { else {
//Send DOES NOT EXIST error //Send DOES NOT EXIST error
synchronized(controllerOut) {
controllerOut.println("ERROR DOES_NOT_EXIST " + filename); controllerOut.println("ERROR DOES_NOT_EXIST " + filename);
}
controllerOut.flush(); controllerOut.flush();
} }
}
}
catch(IOException e) { catch(IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
...@@ -234,8 +242,10 @@ public class Dstore { ...@@ -234,8 +242,10 @@ public class Dstore {
message = message + " " + file.getName(); message = message + " " + file.getName();
} }
if(message.equals("")) message = "ERROR_EMPTY"; if(message.equals("")) message = "ERROR_EMPTY";
synchronized(controllerOut) {
controllerOut.println(message.trim()); controllerOut.println(message.trim());
controllerOut.flush(); controllerOut.flush();
}
}).start(); }).start();
} }
...@@ -253,6 +263,7 @@ public class Dstore { ...@@ -253,6 +263,7 @@ public class Dstore {
} }
System.out.println("Interpreting message:" + tmessage); System.out.println("Interpreting message:" + tmessage);
int numberToSend = Integer.parseInt(message[1]); int numberToSend = Integer.parseInt(message[1]);
int totalReceivers = 0;
index = 2; index = 2;
filesToSend = new HashMap<Integer,List<String>>(); filesToSend = new HashMap<Integer,List<String>>();
for(int i=0; i<numberToSend; i++) { for(int i=0; i<numberToSend; i++) {
...@@ -260,6 +271,7 @@ public class Dstore { ...@@ -260,6 +271,7 @@ public class Dstore {
index++; index++;
int numberOfReceivers = Integer.parseInt(message[index]); int numberOfReceivers = Integer.parseInt(message[index]);
totalReceivers += numberOfReceivers;
index++; index++;
for(int j=0; j<numberOfReceivers; j++) { for(int j=0; j<numberOfReceivers; j++) {
Integer receiver = Integer.parseInt(message[index]); Integer receiver = Integer.parseInt(message[index]);
...@@ -281,6 +293,7 @@ public class Dstore { ...@@ -281,6 +293,7 @@ public class Dstore {
System.out.println("Interpreting complete, will send " + numberToSend + " and remove " + numberToRemove); System.out.println("Interpreting complete, will send " + numberToSend + " and remove " + numberToRemove);
//Send each file to send to the Dstore at the specified port number //Send each file to send to the Dstore at the specified port number
CyclicBarrier barrier = new CyclicBarrier(totalReceivers + 1);
for(Integer dstore : filesToSend.keySet()) { for(Integer dstore : filesToSend.keySet()) {
for(String filename : filesToSend.get(dstore)) { for(String filename : filesToSend.get(dstore)) {
new Thread(() -> { new Thread(() -> {
...@@ -314,9 +327,13 @@ public class Dstore { ...@@ -314,9 +327,13 @@ public class Dstore {
catch(IOException e) { catch(IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
finally {
try {barrier.await();} catch(Exception e) {}
}
}).start(); }).start();
} }
} }
try {barrier.await((long) timeout, TimeUnit.MILLISECONDS);} catch(Exception e) {e.printStackTrace();}
//Remove each file to remove from fileFolder //Remove each file to remove from fileFolder
for(String filename : filesToRemove) { for(String filename : filesToRemove) {
...@@ -325,8 +342,10 @@ public class Dstore { ...@@ -325,8 +342,10 @@ public class Dstore {
} }
//Send REBALANCE_COMPLETE message to client (the controller) //Send REBALANCE_COMPLETE message to client (the controller)
synchronized(controllerOut) {
controllerOut.println("REBALANCE_COMPLETE"); controllerOut.println("REBALANCE_COMPLETE");
controllerOut.flush(); controllerOut.flush();
}
System.out.println("Sent message REBALANCE_COMPLETE"); System.out.println("Sent message REBALANCE_COMPLETE");
//TO DO: WAIT FOR RESPONSES BEFORE SENDING REBALANCE_COMPLETE //TO DO: WAIT FOR RESPONSES BEFORE SENDING REBALANCE_COMPLETE
}).start(); }).start();
......
No preview for this file type
No preview for this file type
...@@ -11,20 +11,24 @@ public class DstoreConnection { ...@@ -11,20 +11,24 @@ public class DstoreConnection {
protected final int MAX_QUEUE_SIZE = 50; protected final int MAX_QUEUE_SIZE = 50;
protected Socket socket; protected Socket socket;
protected int port; //Solely used for debugging purposes
protected BufferedReader reader; protected BufferedReader reader;
protected PrintWriter writer; protected PrintWriter writer;
protected boolean available; protected boolean available;
protected List<String> queue; protected List<String> queue;
protected int timeout; protected int timeout;
protected DstoreDisconnectException disconnectException;
public DstoreConnection(Socket socket, int timeout) { public DstoreConnection(Socket socket, int port, int timeout) {
this.socket = socket; this.socket = socket;
this.port = port;
this.timeout = timeout; this.timeout = timeout;
try { try {
reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
writer = new PrintWriter(socket.getOutputStream()); writer = new PrintWriter(socket.getOutputStream());
available = true; available = true;
queue = new ArrayList<String>(); queue = new ArrayList<String>();
disconnectException = new DstoreDisconnectException(port);
} }
catch(IOException e) { catch(IOException e) {
e.printStackTrace(); e.printStackTrace();
...@@ -44,13 +48,13 @@ public class DstoreConnection { ...@@ -44,13 +48,13 @@ public class DstoreConnection {
if(!available) return "ERROR"; if(!available) return "ERROR";
writer.println(message); writer.println(message);
writer.flush(); writer.flush();
System.out.println("Controller sent " + message); System.out.println("Controller sent " + message + " to port " + port);
return localReceive(expectedMessage); return localReceive(expectedMessage);
} }
catch(NullPointerException e) { catch(NullPointerException e) {
System.out.println("Dstore disconnected"); System.out.println("Dstore at port " + port + " disconnected");
available = false; available = false;
throw new DstoreDisconnectException(); throw disconnectException;
} }
} }
} }
...@@ -91,7 +95,7 @@ public class DstoreConnection { ...@@ -91,7 +95,7 @@ public class DstoreConnection {
rc.end(); rc.end();
} }
String returnMessage = rc.getReturnMessage(); String returnMessage = rc.getReturnMessage();
if(returnMessage == null) throw new DstoreDisconnectException(); if(returnMessage == null) throw disconnectException;
else return returnMessage; else return returnMessage;
} }
catch(InterruptedException e) { catch(InterruptedException e) {
...@@ -151,9 +155,8 @@ public class DstoreConnection { ...@@ -151,9 +155,8 @@ public class DstoreConnection {
do { do {
message = reader.readLine(); message = reader.readLine();
if(message == null) { if(message == null) {
System.out.println("Dstore disconnected");
available = false; available = false;
throw new DstoreDisconnectException(); throw disconnectException;
} }
if(expectedMessage != null && !expectedMessage.equals(message.split(" ")[0])) { if(expectedMessage != null && !expectedMessage.equals(message.split(" ")[0])) {
queue.add(message); queue.add(message);
...@@ -162,7 +165,7 @@ public class DstoreConnection { ...@@ -162,7 +165,7 @@ public class DstoreConnection {
} }
} }
while(message == null); while(message == null);
System.out.println("Controller received " + message); System.out.println("Controller received " + message + " from port " + port);
returnMessage = message; returnMessage = message;
} }
catch(IOException e) { catch(IOException e) {
......
No preview for this file type
import java.lang.Throwable; import java.lang.Throwable;
public class DstoreDisconnectException extends Exception { public class DstoreDisconnectException extends Exception {
public DstoreDisconnectException() { public DstoreDisconnectException(int port) {
super("Dstore has been disconnected"); super("Dstore at port " + port + " has been disconnected");
} }
} }
#!/bin/bash
java Controller 8080 $1 $3 $4 &
echo $!
for((i=1; i<=$2; i++)) do
sleep 0.2
n=$((8080+$i))
echo $n
s="store$i"
java Dstore $n 8080 $3 $s &
echo $!
done
sleep 2
java -cp .:client-1.0.0.jar ClientMain 8080 $3
Controller.java
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment