Skip to content
Snippets Groups Projects
Commit 5999e7b2 authored by dl3g19's avatar dl3g19
Browse files

Minor changes made to adapt to new spec points and failures found

parent 8f283439
No related branches found
No related tags found
No related merge requests found
...@@ -192,12 +192,7 @@ public class Controller { ...@@ -192,12 +192,7 @@ public class Controller {
protected class RebalanceThread implements Runnable { protected class RebalanceThread implements Runnable {
public void run() { public void run() {
while(true) { while(true) {
if(rebalanceLock.waitToRebalance()) { rebalanceLock.waitToRebalance();
//Another dstore joined, it requested a rebalance
try {runRebalance();} catch(Exception e) {e.printStackTrace();}
}
else {
//Timeout occured, i.e. rebalancePeriod has passed since the last rebalance
try { try {
if(dstores.size() >= rFactor) { if(dstores.size() >= rFactor) {
runRebalance(); runRebalance();
...@@ -206,7 +201,6 @@ public class Controller { ...@@ -206,7 +201,6 @@ public class Controller {
catch(Exception e) {e.printStackTrace();} catch(Exception e) {e.printStackTrace();}
} }
} }
}
protected void runRebalance() { protected void runRebalance() {
synchronized(rebalanceLock) { synchronized(rebalanceLock) {
...@@ -312,12 +306,13 @@ public class Controller { ...@@ -312,12 +306,13 @@ public class Controller {
String message = Protocol.STORE_TO_TOKEN; String message = Protocol.STORE_TO_TOKEN;
for(Integer thisStore : storesToStore) { for(Integer thisStore : storesToStore) {
message = message + " " + thisStore.intValue(); message = message + " " + thisStore.intValue();
IndexEntry entryf = entry;
new Thread(() -> { new Thread(() -> {
try { try {
String[] receivedMessage = dstores.get(thisStore).receive(Protocol.STORE_ACK_TOKEN).split(" "); String[] receivedMessage = dstores.get(thisStore).receive(Protocol.STORE_ACK_TOKEN + " " + filename).split(" ");
if(receivedMessage[0].equals(Protocol.STORE_ACK_TOKEN)) { if(receivedMessage[0].equals(Protocol.STORE_ACK_TOKEN)) {
try { try {
storeAck(thisStore, receivedMessage[1], latch); storeAck(thisStore, entryf, latch);
} }
catch(Exception e) { catch(Exception e) {
//Log error //Log error
...@@ -334,6 +329,9 @@ public class Controller { ...@@ -334,6 +329,9 @@ public class Controller {
e.printStackTrace(); e.printStackTrace();
removeDstore(e); removeDstore(e);
} }
catch(DeadStoreException e) {
System.err.println("Store for " + filename + " failed due to dead dstore");
}
}).start(); }).start();
} }
out.println(message); out.println(message);
...@@ -365,14 +363,8 @@ public class Controller { ...@@ -365,14 +363,8 @@ public class Controller {
} }
} }
void storeAck(Integer port, String filename, CountDownLatch latch) throws Exception { void storeAck(Integer port, IndexEntry entry, CountDownLatch latch) throws Exception {
if(!index.containsKey(filename)) { entry.addStoredBy(port);
//Throw logging exception
throw new Exception("Index does not contain " + filename);
}
IndexEntry thisEntry = index.get(filename);
thisEntry.addStoredBy(port);
latch.countDown(); latch.countDown();
} }
...@@ -409,9 +401,13 @@ public class Controller { ...@@ -409,9 +401,13 @@ public class Controller {
void sendLoadFrom(Socket client, String filename) { void sendLoadFrom(Socket client, String filename) {
try { try {
PrintWriter out = new PrintWriter(client.getOutputStream()); PrintWriter out = new PrintWriter(client.getOutputStream());
String message;
if(!index.containsKey(filename) || index.get(filename).getStatus() != IndexEntry.Status.STORE_COMPLETE) {
message = Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN;
}
else {
Reloader storedBy = loadRequests.get(client); Reloader storedBy = loadRequests.get(client);
System.out.println("Load requested for file " + filename + ", there are " + storedBy.size() + " dstores to select from"); System.out.println("Load requested for file " + filename + ", there are " + storedBy.size() + " dstores to select from");
String message;
if(storedBy.isEmpty()) { if(storedBy.isEmpty()) {
message = Protocol.ERROR_LOAD_TOKEN; message = Protocol.ERROR_LOAD_TOKEN;
} }
...@@ -420,6 +416,7 @@ public class Controller { ...@@ -420,6 +416,7 @@ public class Controller {
storedBy.remove(thisStore); storedBy.remove(thisStore);
message = Protocol.LOAD_FROM_TOKEN + thisStore + " " + storedBy.filesize; message = Protocol.LOAD_FROM_TOKEN + thisStore + " " + storedBy.filesize;
} }
}
out.println(message); out.println(message);
out.flush(); out.flush();
messageSent(client, message); messageSent(client, message);
...@@ -464,7 +461,7 @@ public class Controller { ...@@ -464,7 +461,7 @@ public class Controller {
Integer dstore = it.next(); Integer dstore = it.next();
new Thread(() -> { new Thread(() -> {
try { try {
String[] message = dstores.get(dstore).sendAndReceive(Protocol.REMOVE_TOKEN + " " + filename, Protocol.REMOVE_ACK_TOKEN, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN).split(" "); String[] message = dstores.get(dstore).sendAndReceive(Protocol.REMOVE_TOKEN + " " + filename, Protocol.REMOVE_ACK_TOKEN + " " + filename, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN).split(" ");
if((message[0].equals(Protocol.REMOVE_ACK_TOKEN) && message[1].equals(filename)) || message[0].equals(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN)) { if((message[0].equals(Protocol.REMOVE_ACK_TOKEN) && message[1].equals(filename)) || message[0].equals(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN)) {
entry.removeStoredBy(dstore.intValue()); entry.removeStoredBy(dstore.intValue());
latch.countDown(); latch.countDown();
...@@ -478,15 +475,14 @@ public class Controller { ...@@ -478,15 +475,14 @@ public class Controller {
e.printStackTrace(); e.printStackTrace();
removeDstore(e); removeDstore(e);
} }
catch(DeadStoreException e) {
System.err.println("Remove for " + filename + " failed due to dead dstore");
}
}).start(); }).start();
} }
//Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message //Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message
if(!latch.await(timeout, TimeUnit.MILLISECONDS)) { if(latch.await(timeout, TimeUnit.MILLISECONDS)) {
//Log error
System.err.println("Not all REMOVE_ACKs have been received");
}
//Update index to "remove complete" //Update index to "remove complete"
entry.setStatus(IndexEntry.Status.REMOVE_COMPLETE); entry.setStatus(IndexEntry.Status.REMOVE_COMPLETE);
synchronized(index) { synchronized(index) {
...@@ -499,6 +495,11 @@ public class Controller { ...@@ -499,6 +495,11 @@ public class Controller {
clientOut.flush(); clientOut.flush();
messageSent(client, Protocol.REMOVE_COMPLETE_TOKEN); messageSent(client, Protocol.REMOVE_COMPLETE_TOKEN);
} }
else {
//Log error
System.err.println("Not all REMOVE_ACKs have been received");
}
}
catch(IOException e) { catch(IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
...@@ -549,6 +550,7 @@ public class Controller { ...@@ -549,6 +550,7 @@ public class Controller {
removeDstore(e); removeDstore(e);
listLatch.countDown(); listLatch.countDown();
} }
catch(DeadStoreException e) {}
}); });
thisThread.start(); thisThread.start();
activeThreads.add(thisThread); activeThreads.add(thisThread);
...@@ -572,7 +574,7 @@ public class Controller { ...@@ -572,7 +574,7 @@ public class Controller {
} }
catch(Exception e) {e.printStackTrace();} catch(Exception e) {e.printStackTrace();}
if(dstoreFiles.isEmpty()) throw new Exception("All dstores have been disconnected!"); if(dstoreFiles.size() < rFactor) throw new Exception("Less than R dstores connected; connections may be faulty or timeout may be too strict");
Map<Integer,List<String>> newAlloc = allocate(dstoreFiles); Map<Integer,List<String>> newAlloc = allocate(dstoreFiles);
Map<Integer,String> sendIndex = composeRebalanceMessages(dstoreFiles, newAlloc); Map<Integer,String> sendIndex = composeRebalanceMessages(dstoreFiles, newAlloc);
...@@ -644,7 +646,14 @@ public class Controller { ...@@ -644,7 +646,14 @@ public class Controller {
for(Integer i : oldDstoreFiles.keySet()) { for(Integer i : oldDstoreFiles.keySet()) {
List<String> files = new ArrayList<String>(); List<String> files = new ArrayList<String>();
for(String s : oldDstoreFiles.get(i)) { for(String s : oldDstoreFiles.get(i)) {
if(index.containsKey(s)) files.add(s); if(index.containsKey(s)) {
if(index.get(s).getStatus() == IndexEntry.Status.STORE_COMPLETE) {
files.add(s);
}
else {
index.remove(s);
}
}
if(!availableFiles.contains(s)) availableFiles.add(s); if(!availableFiles.contains(s)) availableFiles.add(s);
} }
dstoreFiles.put(i, files); dstoreFiles.put(i, files);
......
import java.lang.Throwable;
import java.net.Socket;
public class DeadStoreException extends Exception {
DstoreConnection connection;
public DeadStoreException(DstoreConnection connection) {
super("Dstore at port " + connection.getPort() + " is unavailable");
this.connection = connection;
}
public DstoreConnection getConnection() {
return connection;
}
}
...@@ -335,7 +335,7 @@ public class Dstore { ...@@ -335,7 +335,7 @@ public class Dstore {
messageReceived(socket, receivedMessage); messageReceived(socket, receivedMessage);
if(!receivedMessage.equals(Protocol.ACK_TOKEN)) { if(!receivedMessage.equals(Protocol.ACK_TOKEN)) {
//Log error //Log error
System.out.println("Dstore " + dstore + " should have sent ACK but " + port + " received " + receivedMessage); System.err.println("Dstore " + dstore + " should have sent ACK but " + port + " received " + receivedMessage);
} }
byte[] content = new byte[BUFFER_SIZE]; byte[] content = new byte[BUFFER_SIZE];
...@@ -378,7 +378,6 @@ public class Dstore { ...@@ -378,7 +378,6 @@ public class Dstore {
controllerOut.println(Protocol.REBALANCE_COMPLETE_TOKEN); controllerOut.println(Protocol.REBALANCE_COMPLETE_TOKEN);
messageSent(controllerSocket, Protocol.REBALANCE_COMPLETE_TOKEN); messageSent(controllerSocket, Protocol.REBALANCE_COMPLETE_TOKEN);
} }
System.out.println("Sent message REBALANCE_COMPLETE");
}).start(); }).start();
} }
......
...@@ -52,12 +52,16 @@ public class DstoreConnection { ...@@ -52,12 +52,16 @@ public class DstoreConnection {
return new DstoreDisconnectException(this); return new DstoreDisconnectException(this);
} }
public String sendAndReceive(String message, String... expectedMessages) throws DstoreDisconnectException { public void checkAvailable() throws DeadStoreException {
if(!available) throw new DeadStoreException(this);
}
public String sendAndReceive(String message, String... expectedMessages) throws DstoreDisconnectException, DeadStoreException {
System.out.println("Getting lock..."); System.out.println("Getting lock...");
synchronized(this) { synchronized(this) {
try { try {
System.out.println("Lock acquired"); System.out.println("Lock acquired");
if(!available) throw getDisconnectData(); checkAvailable();
writer.println(message); writer.println(message);
writer.flush(); writer.flush();
//System.out.println("Controller sent " + message + " to port " + port); //System.out.println("Controller sent " + message + " to port " + port);
...@@ -72,7 +76,7 @@ public class DstoreConnection { ...@@ -72,7 +76,7 @@ public class DstoreConnection {
} }
} }
public String receive(String... expectedMessages) throws DstoreDisconnectException { public String receive(String... expectedMessages) throws DstoreDisconnectException, DeadStoreException {
String findMessage = checkQueue(expectedMessages); String findMessage = checkQueue(expectedMessages);
if(findMessage != null) { if(findMessage != null) {
return findMessage; return findMessage;
...@@ -81,7 +85,7 @@ public class DstoreConnection { ...@@ -81,7 +85,7 @@ public class DstoreConnection {
System.out.println("Getting lock..."); System.out.println("Getting lock...");
synchronized(this) { synchronized(this) {
System.out.println("Lock acquired"); System.out.println("Lock acquired");
if(!available) throw getDisconnectData(); checkAvailable();
//Check the queue twice: once incase the receiver is busy, twice incase the message was added by the last thread //Check the queue twice: once incase the receiver is busy, twice incase the message was added by the last thread
findMessage = checkQueue(expectedMessages); findMessage = checkQueue(expectedMessages);
...@@ -130,36 +134,12 @@ public class DstoreConnection { ...@@ -130,36 +134,12 @@ public class DstoreConnection {
e.printStackTrace(); e.printStackTrace();
} }
/*
String returnMessage = null;
do {
returnMessage = reader.readLine();
if(returnMessage == null) {
System.out.println("Dstore disconnected");
available = false;
throw new DstoreDisconnectException();
}
if(expectedMessage != null && !expectedMessage.equals(returnMessage.split(" ")[0])) {
queue.add(returnMessage);
if(queue.size() > MAX_QUEUE_SIZE) queue.remove(0);
returnMessage = null;
}
}
while(returnMessage == null);
System.out.println("Controller received " + returnMessage);
return returnMessage;
}
catch(IOException e) {
e.printStackTrace();
return "";
}
*/
return ""; return "";
} }
protected boolean isExpected(String message, String[] expectedMessages) { protected boolean isExpected(String message, String[] expectedMessages) {
for(String s : expectedMessages) { for(String s : expectedMessages) {
if(s.equals(message.split(" ")[0])) return true; if(s.equals(message)) return true;
} }
return false; return false;
} }
......
This diff is collapsed.
File deleted
File deleted
File deleted
File deleted
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment