Skip to content
Snippets Groups Projects
Commit a471f8d3 authored by dl3g19's avatar dl3g19
Browse files

Improved rebalance process among other things. With r=5 and 10 dstores, there...

Improved rebalance process among other things. With r=5 and 10 dstores, there always seems to be 1 store that doesn't like remove messages.
parent 439cc54c
Branches
No related tags found
No related merge requests found
...@@ -6,6 +6,7 @@ import java.lang.Math; ...@@ -6,6 +6,7 @@ import java.lang.Math;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
import java.util.Set; import java.util.Set;
...@@ -13,6 +14,12 @@ import java.util.HashSet; ...@@ -13,6 +14,12 @@ import java.util.HashSet;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
/*
TO DO:
Get rid of missing REMOVE_ACKs problem
Distrbute files evenly (check spec for correct number of files on each store)
*/
public class Controller { public class Controller {
protected int cport; //Port to listen on protected int cport; //Port to listen on
protected int rFactor; //Replication factor; each file is replicated across r Dstores protected int rFactor; //Replication factor; each file is replicated across r Dstores
...@@ -329,11 +336,8 @@ public class Controller { ...@@ -329,11 +336,8 @@ public class Controller {
//Select Dstores //Select Dstores
Integer[] storesToStore = new Integer[rFactor]; Integer[] storesToStore = new Integer[rFactor];
synchronized(dstores) { for(int i=0; i<rFactor; i++) {
Iterator<Integer> it = reshuffle(dstores.keySet()).iterator(); storesToStore[i] = nextStoreInSequence();
for(int i=0; i<rFactor; i++) {
storesToStore[i] = it.next();
}
} }
entry.setNumberToStore(rFactor); entry.setNumberToStore(rFactor);
...@@ -474,11 +478,13 @@ public class Controller { ...@@ -474,11 +478,13 @@ public class Controller {
//Send REMOVE message to all Dstores storing the file //Send REMOVE message to all Dstores storing the file
CountDownLatch latch = new CountDownLatch(entry.getStoredBy().size()); CountDownLatch latch = new CountDownLatch(entry.getStoredBy().size());
for(Integer dstore : entry.getStoredBy()) { Iterator<Integer> it = entry.getStoredBy().iterator();
while(it.hasNext()) {
Integer dstore = it.next();
new Thread(() -> { new Thread(() -> {
try { try {
String[] message = dstores.get(dstore).sendAndReceive(Protocol.REMOVE_TOKEN + " " + filename, Protocol.REMOVE_ACK_TOKEN).split(" "); String[] message = dstores.get(dstore).sendAndReceive(Protocol.REMOVE_TOKEN + " " + filename, Protocol.REMOVE_ACK_TOKEN, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN).split(" ");
if(message[0].equals(Protocol.REMOVE_ACK_TOKEN) && message[1].equals(filename)) { if((message[0].equals(Protocol.REMOVE_ACK_TOKEN) && message[1].equals(filename)) || message[0].equals(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN)) {
entry.removeStoredBy(dstore.intValue()); entry.removeStoredBy(dstore.intValue());
latch.countDown(); latch.countDown();
} }
...@@ -537,7 +543,6 @@ public class Controller { ...@@ -537,7 +543,6 @@ public class Controller {
} }
void rebalance() throws Exception { void rebalance() throws Exception {
boolean success = true;
Map<Integer,List<String>> dstoreFiles = new HashMap<Integer,List<String>>(); Map<Integer,List<String>> dstoreFiles = new HashMap<Integer,List<String>>();
synchronized(rebalanceMessages) { synchronized(rebalanceMessages) {
...@@ -583,6 +588,30 @@ public class Controller { ...@@ -583,6 +588,30 @@ public class Controller {
} }
*/ */
Map<Integer,List<String>> newAlloc = allocate(dstoreFiles);
Map<Integer,String> sendIndex = composeRebalanceMessages(dstoreFiles, newAlloc);
CountDownLatch latch = new CountDownLatch(sendIndex.size());
for(Integer dstore : sendIndex.keySet()) {
new Thread(() -> {
try {
DstoreConnection connection = dstores.get(dstore);
String returnMessage = connection.sendAndReceive(sendIndex.get(dstore), Protocol.REBALANCE_COMPLETE_TOKEN);
if(!returnMessage.equals(Protocol.REBALANCE_COMPLETE_TOKEN)) {
//Log error
System.out.println("Dstore " + dstore + " should have sent REBALANCE_COMPLETE but Controller received " + returnMessage);
}
latch.countDown();
}
catch(DstoreDisconnectException e) {
e.printStackTrace();
removeDstore(e);
}
catch(Exception e) {e.printStackTrace();}
}).start();
}
/*
//Create a new file allocation so that: //Create a new file allocation so that:
//Each file appears rFactor times //Each file appears rFactor times
//Each file appears at most once on each datastore //Each file appears at most once on each datastore
...@@ -598,16 +627,6 @@ public class Controller { ...@@ -598,16 +627,6 @@ public class Controller {
} }
} }
//Create a new index each for files required and files to remove
Map<Integer,List<RequireHandle>> requireIndex = new HashMap<Integer,List<RequireHandle>>();
Map<Integer,List<String>> removeIndex = new HashMap<Integer,List<String>>();
int pos = 0;
int storeSize = (int) Math.ceil((fileList.size() * rFactor) / dstores.size());
for(Integer i : dstoreFiles.keySet()) {
requireIndex.put(i, new ArrayList<RequireHandle>());
removeIndex.put(i, new ArrayList<String>());
}
//Insert files into the new indexes. These are allocated according to the new store order //Insert files into the new indexes. These are allocated according to the new store order
List<Integer> storeOrder; List<Integer> storeOrder;
Iterator<Integer> it; Iterator<Integer> it;
...@@ -633,6 +652,7 @@ public class Controller { ...@@ -633,6 +652,7 @@ public class Controller {
} }
} }
//For each dstore, compile a new message based on the contents of the indexes then send this message and receive a reply //For each dstore, compile a new message based on the contents of the indexes then send this message and receive a reply
CountDownLatch latch = new CountDownLatch(dstoreFiles.size()); CountDownLatch latch = new CountDownLatch(dstoreFiles.size());
for(Integer thisStore : dstoreFiles.keySet()) { for(Integer thisStore : dstoreFiles.keySet()) {
...@@ -705,50 +725,16 @@ public class Controller { ...@@ -705,50 +725,16 @@ public class Controller {
} }
}).start(); }).start();
} }
*/
//Wait for REBALANCE_COMPLETE from all Dstores //Wait for REBALANCE_COMPLETE from all Dstores
try { try {
if(!latch.await(timeout, TimeUnit.MILLISECONDS)) { if(!latch.await(timeout, TimeUnit.MILLISECONDS)) {
//Restart rebalance operation //Restart rebalance operation
System.err.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation..."); System.err.println("Not all REBALANCE_COMPLETEs received");
success = false;
} }
} }
catch(Exception e) {e.printStackTrace();} catch(Exception e) {e.printStackTrace();}
/*
synchronized(acksReceived) {
try {
System.out.println("Waiting for REBALANCE_COMPLETE...");
acksReceived.wait(timeout);
if(acksReceived.getValue() < dstoreFiles.size()) {
//Restart rebalance operation
System.out.println("Not all REBALANCE_COMPLETEs received. Restarting rebalance operation...");
success = false;
}
else if(acksReceived.getValue() > dstoreFiles.size()) {
System.out.println("Too many REBALANCE_COMPLETEs received");
}
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
*/
if(success) {
synchronized(index) {
Iterator<Integer> jt = requireIndex.keySet().iterator();
while(jt.hasNext()) {
Integer dstore = jt.next();
for(RequireHandle handle : requireIndex.get(dstore)) {
index.get(handle.filename).addStoredBy(Integer.valueOf(dstore), false);
}
for(String file : removeIndex.get(dstore)) {
if(index.containsKey(file)) index.get(file).removeStoredBy(Integer.valueOf(dstore));
}
}
}
}
} }
catch(Exception e) { catch(Exception e) {
e.printStackTrace(); e.printStackTrace();
...@@ -763,8 +749,6 @@ public class Controller { ...@@ -763,8 +749,6 @@ public class Controller {
} }
System.out.print("\n"); System.out.print("\n");
} }
if(!success) rebalance();
} }
void receiveDstoreList(int port, String[] list, CountDownLatch latch) { void receiveDstoreList(int port, String[] list, CountDownLatch latch) {
...@@ -784,6 +768,143 @@ public class Controller { ...@@ -784,6 +768,143 @@ public class Controller {
latch.countDown(); latch.countDown();
} }
//Allocate needs to:
//allocate files that don't have enough storers to dstores that don't have them
//move files from dstores that have too many files
//prioritize storing these files to dstores that don't have enough files
Map<Integer,List<String>> allocate(Map<Integer,List<String>> oldDstoreFiles) {
//Precaution made so that the input map is not modified
Map<Integer,List<String>> dstoreFiles = new HashMap<Integer,List<String>>();
for(Integer i : oldDstoreFiles.keySet()) {
List<String> files = new ArrayList<String>();
for(String s : oldDstoreFiles.get(i)) {
if(index.containsKey(s)) files.add(s);
}
dstoreFiles.put(i, files);
}
class AllocComparator implements Comparator<Integer> {
public int compare(Integer s1, Integer s2) {
return dstoreFiles.get(s1).size() - dstoreFiles.get(s2).size();
}
}
Map<String,Integer> counts = new HashMap<String,Integer>();
for(Integer dstore : dstoreFiles.keySet()) {
for(String file : dstoreFiles.get(dstore)) {
if(counts.get(file) == null) {
counts.put(file, 1);
}
else {
counts.put(file, counts.get(file) + 1);
}
}
}
List<Integer> priorityList = new ArrayList<Integer>(dstoreFiles.keySet());
priorityList.sort(new AllocComparator());
Iterator<Integer> it;
for(String file : counts.keySet()) {
if(counts.get(file) >= rFactor) continue;
it = priorityList.iterator();
while(counts.get(file) < rFactor && it.hasNext()) {
Integer thisStore = it.next();
if(!dstoreFiles.get(thisStore).contains(file)) {
dstoreFiles.get(thisStore).add(file);
counts.put(file, counts.get(file) + 1);
}
}
}
priorityList.sort(new AllocComparator());
double optimumStoreAmount = (rFactor * counts.size()) / dstoreFiles.size();
Integer minStore = priorityList.get(0);
Integer maxStore = priorityList.get(priorityList.size() - 1);
boolean giveUp = false;
while(dstoreFiles.get(maxStore).size() > Math.ceil(optimumStoreAmount)
&& dstoreFiles.get(minStore).size() < Math.floor(optimumStoreAmount)
&& !giveUp) {
giveUp = true;
Iterator<String> jt = dstoreFiles.get(maxStore).iterator();
while(jt.hasNext()) {
String thisFile = jt.next();
if(!dstoreFiles.get(minStore).contains(thisFile)) {
dstoreFiles.get(minStore).add(thisFile);
dstoreFiles.get(maxStore).remove(thisFile);
giveUp = false;
break;
}
}
priorityList.sort(new AllocComparator());
minStore = priorityList.get(0);
maxStore = priorityList.get(priorityList.size() - 1);
}
return dstoreFiles;
}
Map<Integer,String> composeRebalanceMessages(Map<Integer,List<String>> oldAlloc, Map<Integer,List<String>> newAlloc) {
Map<String,List<Integer>> requireIndex = new HashMap<String,List<Integer>>();
//Compose a map of required files by finding files of the new allocation that weren't present in the old
for(Integer dstore : newAlloc.keySet()) {
List<String> oldFiles = oldAlloc.get(dstore);
for(String file : newAlloc.get(dstore)) {
if(!oldFiles.contains(file)) {
List<Integer> requires = requireIndex.get(file);
if(requires == null) {
requires = new ArrayList<Integer>();
requireIndex.put(file, requires);
}
requires.add(dstore);
index.get(file).addStoredBy(dstore);
}
}
}
Map<Integer,String> messages = new HashMap<Integer,String>();
for(Integer dstore : newAlloc.keySet()) {
String thisMessage = "";
//Compose files to send
int filesToSend = 0;
List<String> oldFiles = oldAlloc.get(dstore);
List<String> newFiles = newAlloc.get(dstore);
for(String file : requireIndex.keySet()) {
if(oldFiles.contains(file)) {
filesToSend ++;
List<Integer> thisRequire = requireIndex.get(file);
thisMessage = thisMessage + " " + file + " " + thisRequire.size();
for(Integer otherStore : thisRequire) {
thisMessage = thisMessage + " " + otherStore;
}
requireIndex.remove(file);
}
}
thisMessage = Protocol.REBALANCE_TOKEN + " " + filesToSend + thisMessage;
String removeMessage = "";
int filesToRemove = 0;
for(String file : oldFiles) {
if(!newFiles.contains(file)) {
filesToRemove ++;
removeMessage = removeMessage + " " + file;
if(index.get(file) != null) index.get(file).removeStoredBy(dstore);
}
}
if(filesToSend == 0 && filesToRemove == 0) continue;
thisMessage = thisMessage + " " + filesToRemove + removeMessage;
messages.put(dstore, thisMessage);
}
return messages;
}
void removeDstore(DstoreDisconnectException e) { void removeDstore(DstoreDisconnectException e) {
Integer port = e.getPort(); Integer port = e.getPort();
dstores.remove(port); dstores.remove(port);
...@@ -806,6 +927,27 @@ public class Controller { ...@@ -806,6 +927,27 @@ public class Controller {
return list; return list;
} }
Iterator<Integer> sequenceIt = null;
Object sequenceLock = new Object();
Integer nextStoreInSequence() {
Integer store = null;
while(store == null) {
synchronized(sequenceLock) {
if(sequenceIt == null || !sequenceIt.hasNext()) {
synchronized(dstores) {
if(dstores.isEmpty()) return null;
sequenceIt = dstores.keySet().iterator();
}
}
store = sequenceIt.next();
if(!dstores.containsKey(store)) store = null;
}
}
return store;
}
/* Deprecated
//Helper class for rebalance method - contains a filename and a boolean which is true if a dstore is going to send this file //Helper class for rebalance method - contains a filename and a boolean which is true if a dstore is going to send this file
protected class RequireHandle { protected class RequireHandle {
public String filename; public String filename;
...@@ -824,6 +966,7 @@ public class Controller { ...@@ -824,6 +966,7 @@ public class Controller {
} }
return true; return true;
} }
*/
void messageSent(Socket socket, String message) { void messageSent(Socket socket, String message) {
ControllerLogger.getInstance().messageSent(socket, message); ControllerLogger.getInstance().messageSent(socket, message);
......
...@@ -118,8 +118,11 @@ public class Dstore { ...@@ -118,8 +118,11 @@ public class Dstore {
} }
void handleMessage(String[] message, Socket client) throws Exception { void handleMessage(String[] message, Socket client) throws Exception {
if(message[0].equals(Protocol.STORE_TOKEN) || message[0].equals(Protocol.REBALANCE_STORE_TOKEN)) { if(message[0].equals(Protocol.STORE_TOKEN)) {
store(client, message[1], Long.parseLong(message[2])); store(client, message[1], Long.parseLong(message[2]), true);
}
else if(message[0].equals(Protocol.REBALANCE_STORE_TOKEN)) {
store(client, message[1], Long.parseLong(message[2]), false);
} }
else if(message[0].equals(Protocol.LOAD_DATA_TOKEN)) { else if(message[0].equals(Protocol.LOAD_DATA_TOKEN)) {
load(client, message[1]); load(client, message[1]);
...@@ -139,7 +142,7 @@ public class Dstore { ...@@ -139,7 +142,7 @@ public class Dstore {
} }
} }
void store(Socket client, String filename, long filesize) throws Exception { void store(Socket client, String filename, long filesize, boolean acknowledged) throws Exception {
new Thread(() -> { new Thread(() -> {
try { try {
//Send ACK message to client //Send ACK message to client
...@@ -160,11 +163,13 @@ public class Dstore { ...@@ -160,11 +163,13 @@ public class Dstore {
writer.close(); writer.close();
//Send STORE_ACK message to the Controller //Send STORE_ACK message to the Controller
synchronized(controllerOut) { if(acknowledged) {
String controllerMessage = Protocol.STORE_ACK_TOKEN + " " + filename; synchronized(controllerOut) {
controllerOut.println(controllerMessage); String controllerMessage = Protocol.STORE_ACK_TOKEN + " " + filename;
controllerOut.flush(); controllerOut.println(controllerMessage);
messageSent(controllerSocket, controllerMessage); controllerOut.flush();
messageSent(controllerSocket, controllerMessage);
}
} }
if(fileSizes.containsKey(filename)) fileSizes.remove(filename); if(fileSizes.containsKey(filename)) fileSizes.remove(filename);
......
...@@ -45,7 +45,7 @@ public class DstoreConnection { ...@@ -45,7 +45,7 @@ public class DstoreConnection {
return disconnectException; return disconnectException;
} }
public String sendAndReceive(String message, String expectedMessage) throws DstoreDisconnectException { public String sendAndReceive(String message, String... expectedMessages) throws DstoreDisconnectException {
System.out.println("Getting lock..."); System.out.println("Getting lock...");
synchronized(this) { synchronized(this) {
try { try {
...@@ -55,7 +55,7 @@ public class DstoreConnection { ...@@ -55,7 +55,7 @@ public class DstoreConnection {
writer.flush(); writer.flush();
//System.out.println("Controller sent " + message + " to port " + port); //System.out.println("Controller sent " + message + " to port " + port);
ControllerLogger.getInstance().messageSent(socket, message); ControllerLogger.getInstance().messageSent(socket, message);
return localReceive(expectedMessage); return localReceive(expectedMessages);
} }
catch(NullPointerException e) { catch(NullPointerException e) {
System.out.println("Dstore at port " + port + " disconnected"); System.out.println("Dstore at port " + port + " disconnected");
...@@ -65,12 +65,8 @@ public class DstoreConnection { ...@@ -65,12 +65,8 @@ public class DstoreConnection {
} }
} }
public String sendAndReceive(String message) throws DstoreDisconnectException { public String receive(String... expectedMessages) throws DstoreDisconnectException {
return sendAndReceive(message, null); String findMessage = checkQueue(expectedMessages);
}
public String receive(String expectedMessage) throws DstoreDisconnectException {
String findMessage = checkQueue(expectedMessage);
if(findMessage != null) { if(findMessage != null) {
return findMessage; return findMessage;
} }
...@@ -81,17 +77,19 @@ public class DstoreConnection { ...@@ -81,17 +77,19 @@ public class DstoreConnection {
if(!available) return "ERROR"; if(!available) return "ERROR";
//Check the queue twice: once incase the receiver is busy, twice incase the message was added by the last thread //Check the queue twice: once incase the receiver is busy, twice incase the message was added by the last thread
findMessage = checkQueue(expectedMessage); findMessage = checkQueue(expectedMessages);
if(findMessage != null) { if(findMessage != null) {
return findMessage; return findMessage;
} }
return localReceive(expectedMessage); return localReceive(expectedMessages);
} }
} }
//Check the queue for the message before trying to receive any new messages (if no expected message is specified, return the head of the queue) //Check the queue for the message before trying to receive any new messages (if no expected message is specified, return the head of the queue)
protected String checkQueue(String expectedMessage) { protected String checkQueue(String[] expectedMessages) {
if(expectedMessages.length == 0) return null;
Iterator<String> it; Iterator<String> it;
synchronized(queue) { synchronized(queue) {
it = queue.iterator(); it = queue.iterator();
...@@ -99,7 +97,7 @@ public class DstoreConnection { ...@@ -99,7 +97,7 @@ public class DstoreConnection {
while(it.hasNext()) { while(it.hasNext()) {
String message = it.next(); String message = it.next();
if(expectedMessage == null || expectedMessage.equals(message.split(" ")[0])) { if(isExpected(message, expectedMessages)) {
queue.remove(message); queue.remove(message);
return message; return message;
} }
...@@ -108,13 +106,9 @@ public class DstoreConnection { ...@@ -108,13 +106,9 @@ public class DstoreConnection {
return null; return null;
} }
public String receive() throws DstoreDisconnectException { protected String localReceive(String[] expectedMessages) throws DstoreDisconnectException {
return receive(null);
}
protected String localReceive(String expectedMessage) throws DstoreDisconnectException {
try { try {
ReceiveContext rc = new ReceiveContext(expectedMessage); ReceiveContext rc = new ReceiveContext(expectedMessages);
synchronized(rc.lock) { synchronized(rc.lock) {
rc.start(); rc.start();
rc.lock.wait(timeout); rc.lock.wait(timeout);
...@@ -155,15 +149,22 @@ public class DstoreConnection { ...@@ -155,15 +149,22 @@ public class DstoreConnection {
return ""; return "";
} }
protected boolean isExpected(String message, String[] expectedMessages) {
for(String s : expectedMessages) {
if(s.equals(message.split(" ")[0])) return true;
}
return false;
}
//Seperate class for enabling timeouts while receiving messages //Seperate class for enabling timeouts while receiving messages
protected class ReceiveContext { protected class ReceiveContext {
protected String expectedMessage; protected String[] expectedMessages;
protected String returnMessage; protected String returnMessage;
protected Thread thread; protected Thread thread;
public Object lock; public Object lock;
public ReceiveContext(String expectedMessage) { public ReceiveContext(String[] expectedMessages) {
this.expectedMessage = expectedMessage; this.expectedMessages = expectedMessages;
returnMessage = ""; returnMessage = "";
lock = new Object(); lock = new Object();
} }
...@@ -185,7 +186,7 @@ public class DstoreConnection { ...@@ -185,7 +186,7 @@ public class DstoreConnection {
throw disconnectException; throw disconnectException;
} }
ControllerLogger.getInstance().messageReceived(socket, message); ControllerLogger.getInstance().messageReceived(socket, message);
if(expectedMessage != null && !expectedMessage.equals(message.split(" ")[0])) { if(expectedMessages.length > 0 && !isExpected(message, expectedMessages)) {
queue.add(message); queue.add(message);
if(queue.size() > MAX_QUEUE_SIZE) queue.remove(0); if(queue.size() > MAX_QUEUE_SIZE) queue.remove(0);
message = null; message = null;
......
...@@ -59,6 +59,6 @@ public class RebalanceLock { ...@@ -59,6 +59,6 @@ public class RebalanceLock {
return dstoreJoined; return dstoreJoined;
} }
catch(InterruptedException e) {e.printStackTrace();} catch(InterruptedException e) {e.printStackTrace();}
return true; return false;
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment