Skip to content
Snippets Groups Projects
Commit 38ae8cc5 authored by dl3g19's avatar dl3g19
Browse files

Concurrent handling implementing with rebalance period

parent dcfee31c
No related branches found
No related tags found
No related merge requests found
......@@ -100,11 +100,70 @@ public class Controller {
public long filesize;
}
//Rebalances only start when there are no executing threads
protected class RebalanceLock {
protected int processes;
protected boolean highPriorityWait;
protected CountDownLatch periodBlock;
protected Object blockLock;
public RebalanceLock() {
periodBlock = new CountDownLatch(1);
blockLock = new Object();
}
public synchronized void addProcess() throws InterruptedException {
while(highPriorityWait) {
this.wait();
}
processes ++;
}
public synchronized void removeProcess() {
processes --;
if(processes == 0) this.notifyAll();
}
public void waitForFinish() {
while(processes > 0) {
highPriorityWait = true;
try {
this.wait();
}
catch(InterruptedException e) {e.printStackTrace();}
}
highPriorityWait = false;
}
public void queueRebalance() {
synchronized(blockLock) {
periodBlock.countDown();
}
}
public boolean waitToRebalance() {
try {
boolean dstoreJoined = periodBlock.await(rebalancePeriod, TimeUnit.MILLISECONDS);
if(dstoreJoined) {
synchronized(blockLock) {
periodBlock = new CountDownLatch(1);
}
}
return dstoreJoined;
}
catch(InterruptedException e) {e.printStackTrace();}
return true;
}
}
protected Map<Integer,DstoreConnection> dstores;
protected RebalanceMessages rebalanceMessages;
protected Map<String,IndexEntry> index;
protected Map<Socket,Reloader> loadRequests;
protected RebalanceLock rebalanceLock;
public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) {
this.cport = cport;
this.rFactor = rFactor;
......@@ -114,6 +173,7 @@ public class Controller {
rebalanceMessages = new RebalanceMessages();
index = Collections.synchronizedMap(new HashMap<String,IndexEntry>());
loadRequests = Collections.synchronizedMap(new HashMap<Socket,Reloader>());
rebalanceLock = new RebalanceLock();
}
public static void main(String[] args) {
......@@ -159,15 +219,17 @@ public class Controller {
if(tMessage == null) {message = new String[]{""};}
else {message = tMessage.split(" ");}
new Thread(() -> {
if(message[0].equals("JOIN")) {
int portNumber = Integer.parseInt(message[1]);
synchronized(rebalanceLock) {
dstores.put(portNumber, new DstoreConnection(client, portNumber, timeout));
System.out.println("Dstore at " + portNumber + " joined");
try {rebalanceThread.interrupt();} catch(SecurityException e) {e.printStackTrace();}
rebalanceLock.queueRebalance();
}
}
else {
System.out.println("A new client has joined");
new Thread(() -> {
try {
handleMessage(message, client);
}
......@@ -191,8 +253,8 @@ public class Controller {
while(clientMessage != null);
System.out.println("Client closed");
try {client.close();} catch(IOException e) {}
}).start();
}
}).start();
}
catch(Exception e) {
//Log error
......@@ -210,20 +272,36 @@ public class Controller {
protected class RebalanceThread implements Runnable {
public void run() {
while(true) {
try {Thread.sleep(rebalancePeriod);} catch(InterruptedException e) {
try {rebalance();} catch(Exception ee) {ee.printStackTrace();}
if(rebalanceLock.waitToRebalance()) {
try {runRebalance();} catch(Exception e) {e.printStackTrace();}
}
else {
try {
if(dstores.size() >= rFactor) {
rebalance();
runRebalance();
}
}
catch(Exception e) {e.printStackTrace();}
}
}
}
protected void runRebalance() {
synchronized(rebalanceLock) {
try {
rebalanceLock.waitForFinish();
rebalance();
}
catch(Exception e) {e.printStackTrace();}
}
}
}
void handleMessage(String[] message, Socket client) throws Exception {
try {
rebalanceLock.addProcess();
try {
if(dstores.size() < rFactor) {
PrintWriter out = new PrintWriter(client.getOutputStream());
out.println("ERROR_NOT_ENOUGH_DSTORES");
......@@ -249,6 +327,13 @@ public class Controller {
System.out.println("Malformed message received by Controller");
}
}
catch(Exception e) {e.printStackTrace();}
finally {
rebalanceLock.removeProcess();
}
}
catch(InterruptedException e) {e.printStackTrace();}
}
void store(Socket client, String filename, String filesizeString) throws Exception {
long filesize = -1;
......@@ -449,7 +534,7 @@ public class Controller {
entry.status = IndexEntry.Status.REMOVE_IN_PROGRESS;
//Send REMOVE message to all Dstores storing the file
CyclicBarrier barrier = new CyclicBarrier(entry.getStoredBy().size());
CyclicBarrier barrier = new CyclicBarrier(entry.getStoredBy().size() + 1);
for(Integer dstore : entry.getStoredBy()) {
new Thread(() -> {
try {
......@@ -505,7 +590,7 @@ public class Controller {
//Update index to "remove complete"
entry.status = IndexEntry.Status.REMOVE_COMPLETE;
synchronized(index) {
if(index.containsKey(filename)) index.remove(filename);
if(index.get(filename) == entry) index.remove(filename);
}
//Send REMOVE_COMPLETE to client
......@@ -543,7 +628,8 @@ public class Controller {
synchronized(rebalanceMessages) {
if(rebalanceMessages.dstoreFiles != null) return;
rebalanceMessages.dstoreFiles = dstoreFiles;
CyclicBarrier barrier = new CyclicBarrier(dstores.size());
}
CyclicBarrier barrier = new CyclicBarrier(dstores.size() + 1);
try {
//Send LIST message to each Dstore and receive their file list
for(Integer dstore : dstores.keySet()) {
......@@ -567,7 +653,10 @@ public class Controller {
System.out.println("Not all file lists have been received");
}
catch(Exception e) {e.printStackTrace();}
synchronized(rebalanceMessages) {
rebalanceMessages.dstoreFiles = null;
}
/*
if(dstoreFiles.size() < dstores.size()) {
//Log error
......@@ -626,7 +715,7 @@ public class Controller {
}
//For each dstore, compile a new message based on the contents of the indexes then send this message and receive a reply
CyclicBarrier barrier2 = new CyclicBarrier(dstoreFiles.size());
CyclicBarrier barrier2 = new CyclicBarrier(dstoreFiles.size() + 1);
for(Integer thisStore : dstoreFiles.keySet()) {
//Compose files to send
List<String> sendMessages = new ArrayList<String>();
......@@ -740,14 +829,15 @@ public class Controller {
e.printStackTrace();
}
finally {
rebalanceMessages.dstoreFiles = null;
synchronized(rebalanceMessages) {
if(rebalanceMessages.dstoreFiles != null) rebalanceMessages.dstoreFiles = null;
}
System.out.println("There are " + dstores.size() + " dstores connected");
for(String i : index.keySet()) {
System.out.print(i);
}
System.out.print("\n");
}
}
if(!success) rebalance();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment