Skip to content
Snippets Groups Projects
Commit 0ef6065f authored by dl3g19's avatar dl3g19
Browse files

Errors fixed in rebalancing and store/load buffer has better implementation + made larger

parent a471f8d3
Branches
No related tags found
No related merge requests found
...@@ -42,7 +42,6 @@ public class Controller { ...@@ -42,7 +42,6 @@ public class Controller {
public IndexEntry() { public IndexEntry() {
filesize = -1; filesize = -1;
storedBy = Collections.synchronizedList(new ArrayList<Integer>()); storedBy = Collections.synchronizedList(new ArrayList<Integer>());
numberToStore = 0;
status = Status.STORE_IN_PROGRESS; status = Status.STORE_IN_PROGRESS;
} }
...@@ -55,42 +54,17 @@ public class Controller { ...@@ -55,42 +54,17 @@ public class Controller {
} }
public synchronized void addStoredBy(int dstore) { public synchronized void addStoredBy(int dstore) {
addStoredBy(dstore, true);
}
public synchronized void addStoredBy(int dstore, boolean notify) {
storedBy.add(Integer.valueOf(dstore)); storedBy.add(Integer.valueOf(dstore));
if(storedBy.size() >= numberToStore && notify) notifyAll();
}
public synchronized void addStoredBy(List<Integer> dstores) {
storedBy.addAll(dstores);
if(storedBy.size() >= numberToStore) notifyAll();
}
public synchronized void newAllocation(List<Integer> dstores) {
storedBy.clear();
storedBy.addAll(dstores);
} }
public synchronized void removeStoredBy(int dstore) { public synchronized void removeStoredBy(int dstore) {
storedBy.remove(Integer.valueOf(dstore)); storedBy.remove(Integer.valueOf(dstore));
if(storedBy.isEmpty()) notifyAll();
}
public synchronized void removeStoredBy(List<Integer> dstores) {
storedBy.removeAll(dstores);
if(storedBy.isEmpty()) notifyAll();
} }
public List<Integer> getStoredBy() { public List<Integer> getStoredBy() {
return storedBy; return storedBy;
} }
public synchronized void setNumberToStore(int i) {
numberToStore = i;
}
public synchronized void setStatus(Status status) { public synchronized void setStatus(Status status) {
this.status = status; this.status = status;
} }
...@@ -100,17 +74,13 @@ public class Controller { ...@@ -100,17 +74,13 @@ public class Controller {
} }
} }
protected class RebalanceMessages {
public Map<Integer,List<String>> dstoreFiles;
public RebalanceMessages() {dstoreFiles = null;}
}
protected class Reloader extends ArrayList<Integer> { protected class Reloader extends ArrayList<Integer> {
public long filesize; public long filesize;
} }
protected class InvalidStatusException extends Exception {}
protected Map<Integer,DstoreConnection> dstores; protected Map<Integer,DstoreConnection> dstores;
protected RebalanceMessages rebalanceMessages;
protected Map<String,IndexEntry> index; protected Map<String,IndexEntry> index;
protected Map<Socket,Reloader> loadRequests; protected Map<Socket,Reloader> loadRequests;
...@@ -122,7 +92,6 @@ public class Controller { ...@@ -122,7 +92,6 @@ public class Controller {
this.timeout = timeout; this.timeout = timeout;
this.rebalancePeriod = rebalancePeriod; this.rebalancePeriod = rebalancePeriod;
dstores = Collections.synchronizedMap(new HashMap<Integer,DstoreConnection>()); dstores = Collections.synchronizedMap(new HashMap<Integer,DstoreConnection>());
rebalanceMessages = new RebalanceMessages();
index = Collections.synchronizedMap(new HashMap<String,IndexEntry>()); index = Collections.synchronizedMap(new HashMap<String,IndexEntry>());
loadRequests = Collections.synchronizedMap(new HashMap<Socket,Reloader>()); loadRequests = Collections.synchronizedMap(new HashMap<Socket,Reloader>());
rebalanceLock = new RebalanceLock(rebalancePeriod); rebalanceLock = new RebalanceLock(rebalancePeriod);
...@@ -312,26 +281,29 @@ public class Controller { ...@@ -312,26 +281,29 @@ public class Controller {
try { try {
//Create a new entry in the index //Create a new entry in the index
IndexEntry entry; IndexEntry entry;
try {
synchronized(index) { synchronized(index) {
if(!index.containsKey(filename)) { if(index.containsKey(filename)) {
entry = new IndexEntry();
}
else {
entry = index.get(filename); entry = index.get(filename);
if(entry.getStatus() == IndexEntry.Status.REMOVE_IN_PROGRESS || entry.getStatus() == IndexEntry.Status.REMOVE_COMPLETE) { if(entry.getStatus() == IndexEntry.Status.REMOVE_IN_PROGRESS || entry.getStatus() == IndexEntry.Status.REMOVE_COMPLETE) {
index.remove(filename); index.remove(filename);
entry = new IndexEntry();
} }
else { else {
throw new InvalidStatusException();
}
}
entry = new IndexEntry();
index.put(filename, entry);
}
}
catch(InvalidStatusException e) {
PrintWriter out = new PrintWriter(client.getOutputStream()); PrintWriter out = new PrintWriter(client.getOutputStream());
out.println(Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN); out.println(Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN);
out.flush(); out.flush();
messageSent(client, Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN); messageSent(client, Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN);
return; return;
} }
}
index.put(filename, entry);
}
entry.setFilesize(filesize); entry.setFilesize(filesize);
//Select Dstores //Select Dstores
...@@ -339,7 +311,6 @@ public class Controller { ...@@ -339,7 +311,6 @@ public class Controller {
for(int i=0; i<rFactor; i++) { for(int i=0; i<rFactor; i++) {
storesToStore[i] = nextStoreInSequence(); storesToStore[i] = nextStoreInSequence();
} }
entry.setNumberToStore(rFactor);
//Send STORE_TO message //Send STORE_TO message
CountDownLatch latch = new CountDownLatch(rFactor); CountDownLatch latch = new CountDownLatch(rFactor);
...@@ -372,6 +343,7 @@ public class Controller { ...@@ -372,6 +343,7 @@ public class Controller {
} }
out.println(message); out.println(message);
out.flush(); out.flush();
messageSent(client, message);
//Wait for STORE_ACKs from datastores in storesToStore //Wait for STORE_ACKs from datastores in storesToStore
if(latch.await(timeout, TimeUnit.MILLISECONDS)) { if(latch.await(timeout, TimeUnit.MILLISECONDS)) {
...@@ -464,7 +436,19 @@ public class Controller { ...@@ -464,7 +436,19 @@ public class Controller {
void remove(Socket client, String filename) throws Exception { void remove(Socket client, String filename) throws Exception {
try { try {
if(!index.containsKey(filename) || index.get(filename).getStatus() != IndexEntry.Status.STORE_COMPLETE) { IndexEntry entry;
try {
synchronized(index) {
entry = index.get(filename);
if(entry == null || entry.getStatus() != IndexEntry.Status.STORE_COMPLETE) {
throw new InvalidStatusException();
}
//Update index to "remove in progress"
entry.setStatus(IndexEntry.Status.REMOVE_IN_PROGRESS);
}
}
catch(InvalidStatusException e) {
PrintWriter clientOut = new PrintWriter(client.getOutputStream()); PrintWriter clientOut = new PrintWriter(client.getOutputStream());
clientOut.println(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN); clientOut.println(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
clientOut.flush(); clientOut.flush();
...@@ -472,10 +456,6 @@ public class Controller { ...@@ -472,10 +456,6 @@ public class Controller {
return; return;
} }
//Update index to "remove in progress"
IndexEntry entry = index.get(filename);
entry.setStatus(IndexEntry.Status.REMOVE_IN_PROGRESS);
//Send REMOVE message to all Dstores storing the file //Send REMOVE message to all Dstores storing the file
CountDownLatch latch = new CountDownLatch(entry.getStoredBy().size()); CountDownLatch latch = new CountDownLatch(entry.getStoredBy().size());
Iterator<Integer> it = entry.getStoredBy().iterator(); Iterator<Integer> it = entry.getStoredBy().iterator();
...@@ -544,31 +524,33 @@ public class Controller { ...@@ -544,31 +524,33 @@ public class Controller {
void rebalance() throws Exception { void rebalance() throws Exception {
Map<Integer,List<String>> dstoreFiles = new HashMap<Integer,List<String>>(); Map<Integer,List<String>> dstoreFiles = new HashMap<Integer,List<String>>();
synchronized(rebalanceMessages) {
if(rebalanceMessages.dstoreFiles != null) return;
rebalanceMessages.dstoreFiles = dstoreFiles;
}
CountDownLatch listLatch = new CountDownLatch(dstores.size()); CountDownLatch listLatch = new CountDownLatch(dstores.size());
try { try {
//Send LIST message to each Dstore and receive their file list //Send LIST message to each Dstore and receive their file list
List<Thread> activeThreads = new ArrayList<Thread>();
for(Integer dstore : dstores.keySet()) { for(Integer dstore : dstores.keySet()) {
new Thread(() -> { Thread thisThread = new Thread(() -> {
try { try {
String[] message = dstores.get(dstore).sendAndReceive(Protocol.LIST_TOKEN).split(" "); String[] message = dstores.get(dstore).sendAndReceive(Protocol.LIST_TOKEN).split(" ");
receiveDstoreList(dstore.intValue(), message, listLatch); receiveDstoreList(dstore.intValue(), message, dstoreFiles, listLatch);
} }
catch(DstoreDisconnectException e) { catch(DstoreDisconnectException e) {
e.printStackTrace(); e.printStackTrace();
removeDstore(e); removeDstore(e);
} }
}).start(); });
thisThread.start();
activeThreads.add(thisThread);
} }
try { try {
if(!listLatch.await(timeout, TimeUnit.MILLISECONDS)) { if(!listLatch.await(timeout, TimeUnit.MILLISECONDS)) {
//Log error //Log error
System.err.println("Not all file lists have been received"); System.err.println("Not all file lists have been received");
for(Thread t : activeThreads) {
t.interrupt();
}
synchronized(dstoreFiles) {
Set<Integer> storesToRemove = new HashSet<Integer>(dstores.keySet()); Set<Integer> storesToRemove = new HashSet<Integer>(dstores.keySet());
storesToRemove.removeAll(dstoreFiles.keySet()); storesToRemove.removeAll(dstoreFiles.keySet());
for(Integer dstore : storesToRemove) { for(Integer dstore : storesToRemove) {
...@@ -576,17 +558,8 @@ public class Controller { ...@@ -576,17 +558,8 @@ public class Controller {
} }
} }
} }
catch(Exception e) {e.printStackTrace();}
synchronized(rebalanceMessages) {
rebalanceMessages.dstoreFiles = null;
}
/*
if(dstoreFiles.size() < dstores.size()) {
//Log error
System.out.println("Not all file lists have been received");
} }
*/ catch(Exception e) {e.printStackTrace();}
Map<Integer,List<String>> newAlloc = allocate(dstoreFiles); Map<Integer,List<String>> newAlloc = allocate(dstoreFiles);
Map<Integer,String> sendIndex = composeRebalanceMessages(dstoreFiles, newAlloc); Map<Integer,String> sendIndex = composeRebalanceMessages(dstoreFiles, newAlloc);
...@@ -611,122 +584,6 @@ public class Controller { ...@@ -611,122 +584,6 @@ public class Controller {
}).start(); }).start();
} }
/*
//Create a new file allocation so that:
//Each file appears rFactor times
//Each file appears at most once on each datastore
//Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists)
//First, compile all the received files from all dstores into one list
List<String> fileList = new ArrayList<String>();
for(List<String> l : dstoreFiles.values()) {
for(String s : l) {
if(!fileList.contains(s)) {
fileList.add(s);
}
}
}
//Insert files into the new indexes. These are allocated according to the new store order
List<Integer> storeOrder;
Iterator<Integer> it;
for(String file : fileList) {
storeOrder = reshuffle(dstoreFiles.keySet());
it = storeOrder.iterator();
if(index.containsKey(file)) {
for(int j=0; j<rFactor; j++) {
//If indexed dstore does not have the file, add it to its requireIndex entry
Integer thisStore = it.next();
if(!dstoreFiles.get(thisStore).contains(file)) {
requireIndex.get(thisStore).add(new RequireHandle(file));
}
}
}
//Dstores not chosen in the above loop must have an entry added to removeIndex, if they have the file
//This also covers files which were missed in a previous remove operation
while(it.hasNext()) {
Integer thisStore = it.next();
if(dstoreFiles.get(thisStore).contains(file)) {
removeIndex.get(thisStore).add(file);
}
}
}
//For each dstore, compile a new message based on the contents of the indexes then send this message and receive a reply
CountDownLatch latch = new CountDownLatch(dstoreFiles.size());
for(Integer thisStore : dstoreFiles.keySet()) {
//Compose files to send
List<String> sendMessages = new ArrayList<String>();
for(String file : dstoreFiles.get(thisStore)) {
//All files required by other dstores are getting sent = no need for the following computation
if(allHandled(requireIndex)) break;
String fileMessage = "";
for(Integer otherStore : requireIndex.keySet()) {
if(thisStore.equals(otherStore)) continue;
for(RequireHandle otherHandle : requireIndex.get(otherStore)) {
if(file.equals(otherHandle.filename)) {
if(!otherHandle.handled) {
//Another store requires a file that this store has - send it there
otherHandle.handled = true;
fileMessage = fileMessage + " " + otherStore.toString();
}
break;
}
}
}
if(fileMessage.equals("")) continue; //No files to send
fileMessage = file + " " + (fileMessage.trim().split(" ").length) + fileMessage;
sendMessages.add(fileMessage);
}
//Don't need to send a rebalance message if there is nothing to update
List<String> thisRemove = removeIndex.get(thisStore);
if(sendMessages.isEmpty() && thisRemove.isEmpty()) {
latch.countDown();
return;
}
String message = Protocol.REBALANCE_TOKEN + " " + sendMessages.size();
for(String s : sendMessages) {
message = message + " " + s;
}
//Compose files to remove
message = message + " " + thisRemove.size();
for(String f : thisRemove) {
message = message + " " + f;
}
//Send message to the Dstore
String finalMessage = message;
new Thread(() -> {
try {
DstoreConnection connection = dstores.get(thisStore);
String returnMessage = connection.sendAndReceive(finalMessage, Protocol.REBALANCE_COMPLETE_TOKEN);
if(!returnMessage.equals(Protocol.REBALANCE_COMPLETE_TOKEN)) {
//Log error
System.out.println("Dstore " + thisStore + " should have sent REBALANCE_COMPLETE but Controller received " + returnMessage);
}
try {latch.countDown();} catch(Exception e) {e.printStackTrace();}
for(int i=0; i<requireIndex.get(thisStore).size(); i++) {
returnMessage = connection.receive(Protocol.STORE_ACK_TOKEN);
if(!returnMessage.split(" ")[0].equals(Protocol.STORE_ACK_TOKEN)) {
//Log error
System.out.println("Dstore " + thisStore + " should have sent STORE_ACK but Controller received " + returnMessage);
}
}
}
catch(DstoreDisconnectException e) {
e.printStackTrace();
removeDstore(e);
}
}).start();
}
*/
//Wait for REBALANCE_COMPLETE from all Dstores //Wait for REBALANCE_COMPLETE from all Dstores
try { try {
if(!latch.await(timeout, TimeUnit.MILLISECONDS)) { if(!latch.await(timeout, TimeUnit.MILLISECONDS)) {
...@@ -740,9 +597,6 @@ public class Controller { ...@@ -740,9 +597,6 @@ public class Controller {
e.printStackTrace(); e.printStackTrace();
} }
finally { finally {
synchronized(rebalanceMessages) {
if(rebalanceMessages.dstoreFiles != null) rebalanceMessages.dstoreFiles = null;
}
System.out.println("There are " + dstores.size() + " dstores connected"); System.out.println("There are " + dstores.size() + " dstores connected");
for(String i : index.keySet()) { for(String i : index.keySet()) {
System.out.print(i); System.out.print(i);
...@@ -751,7 +605,7 @@ public class Controller { ...@@ -751,7 +605,7 @@ public class Controller {
} }
} }
void receiveDstoreList(int port, String[] list, CountDownLatch latch) { void receiveDstoreList(int port, String[] list, Map<Integer,List<String>> dstoreFiles, CountDownLatch latch) {
List<String> toList = new ArrayList<String>(); List<String> toList = new ArrayList<String>();
if(!list[0].equals("ERROR_EMPTY")) { if(!list[0].equals("ERROR_EMPTY")) {
for(String file : list) { for(String file : list) {
...@@ -759,10 +613,8 @@ public class Controller { ...@@ -759,10 +613,8 @@ public class Controller {
} }
} }
synchronized(rebalanceMessages) { synchronized(dstoreFiles) {
if(rebalanceMessages.dstoreFiles == null) return; dstoreFiles.put(port, toList);
rebalanceMessages.dstoreFiles.put(port, toList);
} }
latch.countDown(); latch.countDown();
...@@ -784,8 +636,14 @@ public class Controller { ...@@ -784,8 +636,14 @@ public class Controller {
} }
class AllocComparator implements Comparator<Integer> { class AllocComparator implements Comparator<Integer> {
protected int m;
public AllocComparator(boolean ascending) {
if(ascending) m = 1;
else m = -1;
}
public int compare(Integer s1, Integer s2) { public int compare(Integer s1, Integer s2) {
return dstoreFiles.get(s1).size() - dstoreFiles.get(s2).size(); return dstoreFiles.get(s1).size() - m * dstoreFiles.get(s2).size();
} }
} }
...@@ -802,28 +660,45 @@ public class Controller { ...@@ -802,28 +660,45 @@ public class Controller {
} }
List<Integer> priorityList = new ArrayList<Integer>(dstoreFiles.keySet()); List<Integer> priorityList = new ArrayList<Integer>(dstoreFiles.keySet());
priorityList.sort(new AllocComparator());
Iterator<Integer> it; Iterator<Integer> it;
for(String file : counts.keySet()) { for(String file : counts.keySet()) {
if(counts.get(file) >= rFactor) continue; if(counts.get(file) > rFactor) {
System.out.println("Need to remove copies of " + file);
priorityList.sort(new AllocComparator(false));
it = priorityList.iterator();
while(counts.get(file) > rFactor && it.hasNext()) {
Integer thisStore = it.next();
if(dstoreFiles.get(thisStore).contains(file)) {
dstoreFiles.get(thisStore).remove(file);
counts.put(file, counts.get(file) - 1);
System.out.println(file + " removed from " + thisStore);
}
}
}
else if(counts.get(file) < rFactor) {
System.out.println("Need to make copies of " + file);
priorityList.sort(new AllocComparator(true));
it = priorityList.iterator(); it = priorityList.iterator();
while(counts.get(file) < rFactor && it.hasNext()) { while(counts.get(file) < rFactor && it.hasNext()) {
Integer thisStore = it.next(); Integer thisStore = it.next();
if(!dstoreFiles.get(thisStore).contains(file)) { if(!dstoreFiles.get(thisStore).contains(file)) {
dstoreFiles.get(thisStore).add(file); dstoreFiles.get(thisStore).add(file);
counts.put(file, counts.get(file) + 1); counts.put(file, counts.get(file) + 1);
System.out.println(file + " allocated to " + thisStore);
}
} }
} }
} }
priorityList.sort(new AllocComparator()); double optimumStoreAmount = ((double) rFactor * (double) counts.size()) / (double) dstoreFiles.size();
double optimumStoreAmount = (rFactor * counts.size()) / dstoreFiles.size(); priorityList.sort(new AllocComparator(true));
Integer minStore = priorityList.get(0); Integer minStore = priorityList.get(0);
Integer maxStore = priorityList.get(priorityList.size() - 1); Integer maxStore = priorityList.get(priorityList.size() - 1);
boolean giveUp = false; boolean giveUp = false;
while(dstoreFiles.get(maxStore).size() > Math.ceil(optimumStoreAmount) System.out.println(rFactor + " * " + counts.size() + " / " + dstoreFiles.size() + " = " + optimumStoreAmount);
&& dstoreFiles.get(minStore).size() < Math.floor(optimumStoreAmount) while((dstoreFiles.get(maxStore).size() > Math.ceil(optimumStoreAmount)
|| dstoreFiles.get(minStore).size() < Math.floor(optimumStoreAmount))
&& !giveUp) { && !giveUp) {
giveUp = true; giveUp = true;
...@@ -831,6 +706,8 @@ public class Controller { ...@@ -831,6 +706,8 @@ public class Controller {
while(jt.hasNext()) { while(jt.hasNext()) {
String thisFile = jt.next(); String thisFile = jt.next();
if(!dstoreFiles.get(minStore).contains(thisFile)) { if(!dstoreFiles.get(minStore).contains(thisFile)) {
//System.out.println(optimumStoreAmount);
//System.out.println("Moving " + thisFile + " from " + maxStore + "[" + dstoreFiles.get(maxStore).size() + "] to " + minStore + "[" + dstoreFiles.get(minStore).size() + "]");
dstoreFiles.get(minStore).add(thisFile); dstoreFiles.get(minStore).add(thisFile);
dstoreFiles.get(maxStore).remove(thisFile); dstoreFiles.get(maxStore).remove(thisFile);
giveUp = false; giveUp = false;
...@@ -838,9 +715,11 @@ public class Controller { ...@@ -838,9 +715,11 @@ public class Controller {
} }
} }
priorityList.sort(new AllocComparator()); priorityList.sort(new AllocComparator(true));
minStore = priorityList.get(0); minStore = priorityList.get(0);
maxStore = priorityList.get(priorityList.size() - 1); maxStore = priorityList.get(priorityList.size() - 1);
if(giveUp) System.out.println("Gave up reallocating files");
} }
return dstoreFiles; return dstoreFiles;
...@@ -873,7 +752,9 @@ public class Controller { ...@@ -873,7 +752,9 @@ public class Controller {
int filesToSend = 0; int filesToSend = 0;
List<String> oldFiles = oldAlloc.get(dstore); List<String> oldFiles = oldAlloc.get(dstore);
List<String> newFiles = newAlloc.get(dstore); List<String> newFiles = newAlloc.get(dstore);
for(String file : requireIndex.keySet()) { Iterator<String> it = requireIndex.keySet().iterator();
while(it.hasNext()) {
String file = it.next();
if(oldFiles.contains(file)) { if(oldFiles.contains(file)) {
filesToSend ++; filesToSend ++;
List<Integer> thisRequire = requireIndex.get(file); List<Integer> thisRequire = requireIndex.get(file);
...@@ -881,7 +762,7 @@ public class Controller { ...@@ -881,7 +762,7 @@ public class Controller {
for(Integer otherStore : thisRequire) { for(Integer otherStore : thisRequire) {
thisMessage = thisMessage + " " + otherStore; thisMessage = thisMessage + " " + otherStore;
} }
requireIndex.remove(file); it.remove();
} }
} }
...@@ -906,9 +787,11 @@ public class Controller { ...@@ -906,9 +787,11 @@ public class Controller {
} }
void removeDstore(DstoreDisconnectException e) { void removeDstore(DstoreDisconnectException e) {
Integer port = e.getPort(); Integer port = e.getConnection().getPort();
dstores.remove(port); synchronized(dstores) {
try {e.getSocket().close();} catch(IOException ee) {} if(dstores.get(port).equals(e.getConnection())) dstores.remove(port);
}
try {e.getConnection().getSocket().close();} catch(IOException ee) {}
Iterator<IndexEntry> it; Iterator<IndexEntry> it;
synchronized(index) {it = index.values().iterator();} synchronized(index) {it = index.values().iterator();}
...@@ -917,16 +800,6 @@ public class Controller { ...@@ -917,16 +800,6 @@ public class Controller {
} }
} }
<T> List<T> reshuffle(Collection<T> col) {
List<T> list = new ArrayList<T>();
Iterator<T> it = col.iterator();
while(it.hasNext()) {
list.add(it.next());
}
Collections.shuffle(list);
return list;
}
Iterator<Integer> sequenceIt = null; Iterator<Integer> sequenceIt = null;
Object sequenceLock = new Object(); Object sequenceLock = new Object();
Integer nextStoreInSequence() { Integer nextStoreInSequence() {
...@@ -947,27 +820,6 @@ public class Controller { ...@@ -947,27 +820,6 @@ public class Controller {
return store; return store;
} }
/* Deprecated
//Helper class for rebalance method - contains a filename and a boolean which is true if a dstore is going to send this file
protected class RequireHandle {
public String filename;
public boolean handled;
public RequireHandle(String filename) {
this.filename = filename;
handled = false;
}
}
boolean allHandled(Map<Integer,List<RequireHandle>> map) {
for(List<RequireHandle> list : map.values()) {
for(RequireHandle handle : list) {
if(!handle.handled) return false;
}
}
return true;
}
*/
void messageSent(Socket socket, String message) { void messageSent(Socket socket, String message) {
ControllerLogger.getInstance().messageSent(socket, message); ControllerLogger.getInstance().messageSent(socket, message);
} }
......
...@@ -21,6 +21,8 @@ public class Dstore { ...@@ -21,6 +21,8 @@ public class Dstore {
protected BufferedReader controllerIn; protected BufferedReader controllerIn;
protected PrintWriter controllerOut; protected PrintWriter controllerOut;
protected final int BUFFER_SIZE = 256;
public Dstore(int port, int cport, int timeout, String fileFolderName) throws Exception { public Dstore(int port, int cport, int timeout, String fileFolderName) throws Exception {
this.port = port; this.port = port;
this.cport = cport; this.cport = cport;
...@@ -38,6 +40,7 @@ public class Dstore { ...@@ -38,6 +40,7 @@ public class Dstore {
} }
fileSizes = new HashMap<String,Long>(); fileSizes = new HashMap<String,Long>();
for(File file : fileFolder.listFiles()) { for(File file : fileFolder.listFiles()) {
if(!file.delete()) throw new Exception("Directory specified has undeletable files; please try a different directory"); if(!file.delete()) throw new Exception("Directory specified has undeletable files; please try a different directory");
} }
...@@ -75,7 +78,7 @@ public class Dstore { ...@@ -75,7 +78,7 @@ public class Dstore {
try(Socket controllerSocket = new Socket(InetAddress.getLocalHost(), cport)) { try(Socket controllerSocket = new Socket(InetAddress.getLocalHost(), cport)) {
this.controllerSocket = controllerSocket; this.controllerSocket = controllerSocket;
controllerIn = new BufferedReader(new InputStreamReader(controllerSocket.getInputStream())); controllerIn = new BufferedReader(new InputStreamReader(controllerSocket.getInputStream()));
controllerOut = new PrintWriter(controllerSocket.getOutputStream()); controllerOut = new PrintWriter(controllerSocket.getOutputStream(), true);
String joinMessage = Protocol.JOIN_TOKEN + " " + port; String joinMessage = Protocol.JOIN_TOKEN + " " + port;
controllerOut.println(joinMessage); controllerOut.println(joinMessage);
controllerOut.flush(); controllerOut.flush();
...@@ -146,20 +149,22 @@ public class Dstore { ...@@ -146,20 +149,22 @@ public class Dstore {
new Thread(() -> { new Thread(() -> {
try { try {
//Send ACK message to client //Send ACK message to client
PrintWriter out = new PrintWriter(client.getOutputStream()); PrintWriter out = new PrintWriter(client.getOutputStream(), true);
out.println(Protocol.ACK_TOKEN); out.println(Protocol.ACK_TOKEN);
out.flush();
messageSent(client, Protocol.ACK_TOKEN); messageSent(client, Protocol.ACK_TOKEN);
FileOutputStream writer = new FileOutputStream(new File(fileFolder, filename), false); OutputStream writer = new FileOutputStream(new File(fileFolder, filename), false);
InputStream reader = client.getInputStream(); InputStream reader = client.getInputStream();
//Receive + write file content from client //Receive + write file content from client
byte[] nextLine = new byte[8]; byte[] nextLine = new byte[BUFFER_SIZE];
while(reader.readNBytes(nextLine, 0, 8) > 0) { int len;
writer.write(nextLine); do {
len = reader.readNBytes(nextLine, 0, BUFFER_SIZE);
writer.write(nextLine, 0, len);
writer.flush(); writer.flush();
} }
while(len == BUFFER_SIZE);
writer.close(); writer.close();
//Send STORE_ACK message to the Controller //Send STORE_ACK message to the Controller
...@@ -167,14 +172,15 @@ public class Dstore { ...@@ -167,14 +172,15 @@ public class Dstore {
synchronized(controllerOut) { synchronized(controllerOut) {
String controllerMessage = Protocol.STORE_ACK_TOKEN + " " + filename; String controllerMessage = Protocol.STORE_ACK_TOKEN + " " + filename;
controllerOut.println(controllerMessage); controllerOut.println(controllerMessage);
controllerOut.flush();
messageSent(controllerSocket, controllerMessage); messageSent(controllerSocket, controllerMessage);
} }
} }
synchronized(fileSizes) {
if(fileSizes.containsKey(filename)) fileSizes.remove(filename); if(fileSizes.containsKey(filename)) fileSizes.remove(filename);
fileSizes.put(filename, Long.valueOf(filesize)); fileSizes.put(filename, Long.valueOf(filesize));
} }
}
catch(IOException e) { catch(IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
...@@ -198,11 +204,16 @@ public class Dstore { ...@@ -198,11 +204,16 @@ public class Dstore {
} }
OutputStream contentOut = client.getOutputStream(); OutputStream contentOut = client.getOutputStream();
byte[] buf = new byte[8]; byte[] buf = new byte[BUFFER_SIZE];
while(reader.read(buf) != -1) { int len;
contentOut.write(buf); do {
len = reader.read(buf);
if(len >= 0) {
contentOut.write(buf, 0, len);
contentOut.flush(); contentOut.flush();
} }
}
while(len == BUFFER_SIZE);
reader.close(); reader.close();
contentOut.close(); contentOut.close();
...@@ -219,24 +230,26 @@ public class Dstore { ...@@ -219,24 +230,26 @@ public class Dstore {
void remove(String filename) throws Exception { void remove(String filename) throws Exception {
new Thread(() -> { new Thread(() -> {
try { try {
System.out.println("Store " + port + " removing " + filename + "...");
//Remove the file from fileFolder //Remove the file from fileFolder
Path path = new File(fileFolder, filename).toPath(); Path path = new File(fileFolder, filename).toPath();
String controllerMessage; String controllerMessage;
if(Files.deleteIfExists(path)) { if(Files.deleteIfExists(path)) {
System.out.println("Store " + port + " removed " + filename);
//Send REMOVE_ACK message to client (the controller) //Send REMOVE_ACK message to client (the controller)
synchronized(controllerOut) { synchronized(controllerOut) {
controllerMessage = Protocol.REMOVE_ACK_TOKEN + " " + filename; controllerMessage = Protocol.REMOVE_ACK_TOKEN + " " + filename;
} }
} }
else { else {
System.out.println("Store " + port + " couldn't remove " + filename);
//Send DOES NOT EXIST error //Send DOES NOT EXIST error
synchronized(controllerOut) { synchronized(controllerOut) {
controllerMessage = Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN + " " + filename; controllerMessage = Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN + " " + filename;
} }
} }
controllerOut.println(controllerMessage); controllerOut.println(controllerMessage);
controllerOut.flush();
messageSent(controllerSocket, controllerMessage); messageSent(controllerSocket, controllerMessage);
} }
catch(IOException e) { catch(IOException e) {
...@@ -255,7 +268,6 @@ public class Dstore { ...@@ -255,7 +268,6 @@ public class Dstore {
if(message.equals("")) message = "ERROR_EMPTY"; if(message.equals("")) message = "ERROR_EMPTY";
synchronized(controllerOut) { synchronized(controllerOut) {
controllerOut.println(message.trim()); controllerOut.println(message.trim());
controllerOut.flush();
} }
}).start(); }).start();
} }
...@@ -311,10 +323,11 @@ public class Dstore { ...@@ -311,10 +323,11 @@ public class Dstore {
try { try {
System.out.println("Sending " + filename + " to store " + dstore); System.out.println("Sending " + filename + " to store " + dstore);
Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue());
PrintWriter out = new PrintWriter(socket.getOutputStream()); PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
String dstoreMessage = Protocol.REBALANCE_STORE_TOKEN + " " + filename + " " + fileSizes.get(filename); long fileSize;
synchronized(fileSizes) {fileSize = fileSizes.get(filename);}
String dstoreMessage = Protocol.REBALANCE_STORE_TOKEN + " " + filename + " " + fileSize;
out.println(dstoreMessage); out.println(dstoreMessage);
out.flush();
messageSent(socket, dstoreMessage); messageSent(socket, dstoreMessage);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
...@@ -325,13 +338,18 @@ public class Dstore { ...@@ -325,13 +338,18 @@ public class Dstore {
System.out.println("Dstore " + dstore + " should have sent ACK but " + port + " received " + receivedMessage); System.out.println("Dstore " + dstore + " should have sent ACK but " + port + " received " + receivedMessage);
} }
byte[] content = new byte[8]; byte[] content = new byte[BUFFER_SIZE];
int len;
FileInputStream fileIn = new FileInputStream(new File(fileFolder, filename)); FileInputStream fileIn = new FileInputStream(new File(fileFolder, filename));
OutputStream fileOut = socket.getOutputStream(); OutputStream fileOut = socket.getOutputStream();
while(fileIn.read(content) > 0) { do {
fileOut.write(content); len = fileIn.read(content);
if(len >= 0) {
fileOut.write(content, 0, len);
fileOut.flush(); fileOut.flush();
} }
}
while(len > 0);
fileIn.close(); fileIn.close();
fileOut.close(); fileOut.close();
in.close(); in.close();
...@@ -358,7 +376,6 @@ public class Dstore { ...@@ -358,7 +376,6 @@ public class Dstore {
//Send REBALANCE_COMPLETE message to client (the controller) //Send REBALANCE_COMPLETE message to client (the controller)
synchronized(controllerOut) { synchronized(controllerOut) {
controllerOut.println(Protocol.REBALANCE_COMPLETE_TOKEN); controllerOut.println(Protocol.REBALANCE_COMPLETE_TOKEN);
controllerOut.flush();
messageSent(controllerSocket, Protocol.REBALANCE_COMPLETE_TOKEN); messageSent(controllerSocket, Protocol.REBALANCE_COMPLETE_TOKEN);
} }
System.out.println("Sent message REBALANCE_COMPLETE"); System.out.println("Sent message REBALANCE_COMPLETE");
......
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.util.concurrent.*;
import java.lang.Runnable; import java.lang.Runnable;
import java.util.List; import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -29,7 +30,7 @@ public class DstoreConnection { ...@@ -29,7 +30,7 @@ public class DstoreConnection {
writer = new PrintWriter(socket.getOutputStream()); writer = new PrintWriter(socket.getOutputStream());
available = true; available = true;
queue = new ArrayList<String>(); queue = new ArrayList<String>();
disconnectException = new DstoreDisconnectException(port, socket); disconnectException = new DstoreDisconnectException(this);
} }
catch(IOException e) { catch(IOException e) {
e.printStackTrace(); e.printStackTrace();
...@@ -41,6 +42,14 @@ public class DstoreConnection { ...@@ -41,6 +42,14 @@ public class DstoreConnection {
} }
} }
public int getPort() {
return port;
}
public Socket getSocket() {
return socket;
}
public DstoreDisconnectException getDisconnectData() { public DstoreDisconnectException getDisconnectData() {
return disconnectException; return disconnectException;
} }
...@@ -50,7 +59,7 @@ public class DstoreConnection { ...@@ -50,7 +59,7 @@ public class DstoreConnection {
synchronized(this) { synchronized(this) {
try { try {
System.out.println("Lock acquired"); System.out.println("Lock acquired");
if(!available) return "ERROR"; if(!available) throw disconnectException;
writer.println(message); writer.println(message);
writer.flush(); writer.flush();
//System.out.println("Controller sent " + message + " to port " + port); //System.out.println("Controller sent " + message + " to port " + port);
...@@ -74,7 +83,7 @@ public class DstoreConnection { ...@@ -74,7 +83,7 @@ public class DstoreConnection {
System.out.println("Getting lock..."); System.out.println("Getting lock...");
synchronized(this) { synchronized(this) {
System.out.println("Lock acquired"); System.out.println("Lock acquired");
if(!available) return "ERROR"; if(!available) throw disconnectException;
//Check the queue twice: once incase the receiver is busy, twice incase the message was added by the last thread //Check the queue twice: once incase the receiver is busy, twice incase the message was added by the last thread
findMessage = checkQueue(expectedMessages); findMessage = checkQueue(expectedMessages);
...@@ -109,14 +118,15 @@ public class DstoreConnection { ...@@ -109,14 +118,15 @@ public class DstoreConnection {
protected String localReceive(String[] expectedMessages) throws DstoreDisconnectException { protected String localReceive(String[] expectedMessages) throws DstoreDisconnectException {
try { try {
ReceiveContext rc = new ReceiveContext(expectedMessages); ReceiveContext rc = new ReceiveContext(expectedMessages);
synchronized(rc.lock) {
rc.start(); rc.start();
rc.lock.wait(timeout); if(rc.latch.await(timeout, TimeUnit.MILLISECONDS)) {
if(rc.disconnected()) throw disconnectException;
else return rc.getReturnMessage();
}
else {
rc.end(); rc.end();
return "";
} }
String returnMessage = rc.getReturnMessage();
if(returnMessage == null) throw disconnectException;
else return returnMessage;
} }
catch(InterruptedException e) { catch(InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
...@@ -161,19 +171,19 @@ public class DstoreConnection { ...@@ -161,19 +171,19 @@ public class DstoreConnection {
protected String[] expectedMessages; protected String[] expectedMessages;
protected String returnMessage; protected String returnMessage;
protected Thread thread; protected Thread thread;
public Object lock; protected boolean disconnected;
public CountDownLatch latch;
public ReceiveContext(String[] expectedMessages) { public ReceiveContext(String[] expectedMessages) {
this.expectedMessages = expectedMessages; this.expectedMessages = expectedMessages;
returnMessage = ""; returnMessage = "";
lock = new Object(); disconnected = false;
latch = new CountDownLatch(1);
} }
public String getReturnMessage() { public String getReturnMessage() {
synchronized(lock) {
return returnMessage; return returnMessage;
} }
}
public void start() { public void start() {
thread = new Thread(() -> { thread = new Thread(() -> {
...@@ -183,7 +193,8 @@ public class DstoreConnection { ...@@ -183,7 +193,8 @@ public class DstoreConnection {
message = reader.readLine(); message = reader.readLine();
if(message == null) { if(message == null) {
available = false; available = false;
throw disconnectException; disconnected = true;
break;
} }
ControllerLogger.getInstance().messageReceived(socket, message); ControllerLogger.getInstance().messageReceived(socket, message);
if(expectedMessages.length > 0 && !isExpected(message, expectedMessages)) { if(expectedMessages.length > 0 && !isExpected(message, expectedMessages)) {
...@@ -200,14 +211,8 @@ public class DstoreConnection { ...@@ -200,14 +211,8 @@ public class DstoreConnection {
e.printStackTrace(); e.printStackTrace();
returnMessage = ""; returnMessage = "";
} }
catch(DstoreDisconnectException e) {
e.printStackTrace();
returnMessage = null;
}
finally { finally {
synchronized(lock) { latch.countDown();
lock.notify();
}
} }
}); });
thread.start(); thread.start();
...@@ -216,5 +221,9 @@ public class DstoreConnection { ...@@ -216,5 +221,9 @@ public class DstoreConnection {
public void end() { public void end() {
if(thread.isAlive()) thread.interrupt(); if(thread.isAlive()) thread.interrupt();
} }
public boolean disconnected() {
return disconnected;
}
} }
} }
...@@ -2,20 +2,14 @@ import java.lang.Throwable; ...@@ -2,20 +2,14 @@ import java.lang.Throwable;
import java.net.Socket; import java.net.Socket;
public class DstoreDisconnectException extends Exception { public class DstoreDisconnectException extends Exception {
int port; DstoreConnection connection;
Socket socket;
public DstoreDisconnectException(int port, Socket socket) { public DstoreDisconnectException(DstoreConnection connection) {
super("Dstore at port " + port + " has been disconnected"); super("Dstore at port " + connection.getPort() + " has been disconnected");
this.port = port; this.connection = connection;
this.socket = socket;
} }
public int getPort() { public DstoreConnection getConnection() {
return port; return connection;
}
public Socket getSocket() {
return socket;
} }
} }
#!/bin/bash #!/bin/bash
java -cp .:loggers Controller 8080 $1 $3 $4 & java -cp .:loggers Controller 8080 $1 $3 $4 &
echo $! echo $!
processes=()
for((i=1; i<=$2; i++)) do for((i=1; i<=$2; i++)) do
sleep 0.2 sleep 0.2
n=$((8080+$i)) n=$((8080+$i))
echo $n echo $n
s="store$i" s="store$i"
java -cp .:loggers Dstore $n 8080 $3 $s & java -cp .:loggers Dstore $n 8080 $3 $s &
echo $!
done done
sleep 2 sleep 2
java -cp .:client-1.0.2.jar ClientMain 8080 $3 java -cp .:client-1.0.2.jar ClientMain 8080 $3
#!/bin/bash
java -cp .:loggers Controller 8080 $1 $3 $4 &
echo $!
processes=()
for((i=1; i<=$2; i++)) do
sleep 0.2
n=$((8080+$i))
echo $n
s="store$i"
java -cp .:loggers Dstore $n 8080 $3 $s &
processes+=($!)
echo ${processes[${i-1}]}
done
echo ${processes[0]}
sleep 2
java -cp .:client-1.0.2.jar ClientMain 8080 $3
sleep $(((2*$4)/3000))
kill ${processes[0]}
sleep 2
java -cp .:loggers Dstore 8081 8080 $3 store1 &
to_store/GameDotCom.jpg

1010 KiB

File added
to_store/spurk.jpg

162 KiB

0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment