// Controller.java
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.lang.Runnable;
import java.lang.Math;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import java.util.Collection;
import java.util.Collections;
public class Controller {
protected int cport; //Port to listen on
protected int rFactor; //Replication factor; each file is replicated across r Dstores
protected int timeout; //in milliseconds
protected int rebalancePeriod; //How long to wait to start the next rebalance operation, in milliseconds
/**
 * Index record for one file: its size, the ports of the Dstores recorded as
 * holding it, and where it is in the store/remove lifecycle.
 * Accessors are synchronized on the entry; the port list is additionally a
 * synchronized list so callers can lock it while copying or iterating.
 */
protected static class IndexEntry {
    /** Lifecycle states of a file in the index. */
    public static enum Status {
        STORE_IN_PROGRESS,
        STORE_COMPLETE,
        REMOVE_IN_PROGRESS,
        REMOVE_COMPLETE
    }

    //-1 until setFilesize is called
    protected long filesize;
    protected List<Integer> storedBy;
    protected int numberToStore;
    protected Status status;

    /** Creates an entry with no size, no holders and status STORE_IN_PROGRESS. */
    public IndexEntry() {
        filesize = -1;
        storedBy = Collections.synchronizedList(new ArrayList<Integer>());
        status = Status.STORE_IN_PROGRESS;
    }

    public synchronized void setFilesize(long filesize) {
        this.filesize = filesize;
    }

    public synchronized long getFilesize() {
        return filesize;
    }

    /**
     * Records that the Dstore on the given port holds the file.
     * Adding a port that is already recorded is a no-op: removeStoredBy only
     * drops one occurrence, so a duplicate would survive a removal and would
     * also be offered twice to loading clients.
     */
    public synchronized void addStoredBy(int dstore) {
        Integer port = Integer.valueOf(dstore);
        //Lock the list itself so the check and the add are one atomic step
        synchronized(storedBy) {
            if(!storedBy.contains(port)) {
                storedBy.add(port);
            }
        }
    }

    /** Forgets that the Dstore on the given port holds the file; no-op if it was not recorded. */
    public synchronized void removeStoredBy(int dstore) {
        storedBy.remove(Integer.valueOf(dstore));
    }

    /**
     * Returns the live, synchronized list of holder ports (not a copy).
     * Callers that iterate or copy it must synchronize on the returned list.
     */
    public List<Integer> getStoredBy() {
        return storedBy;
    }

    public synchronized void setStatus(Status status) {
        this.status = status;
    }

    public synchronized Status getStatus() {
        return status;
    }
}
//List of Dstore ports that can still be offered to a client for its current load,
//together with the size of the file being loaded.
//load() fills it from the index entry; sendLoadFrom() removes one port per attempt.
protected class Reloader extends ArrayList<Integer> {
//Size of the file being loaded, copied from the index entry by load()
public long filesize;
}
//Thrown inside store() and remove() when the index entry for a file is missing or not in the status the operation needs
protected class InvalidStatusException extends Exception {}
//Connected Dstores, keyed by the port number given in their JOIN message (synchronized map)
protected Map<Integer,DstoreConnection> dstores;
//File index, keyed by filename (synchronized map)
protected Map<String,IndexEntry> index;
//Remaining load candidates for each client, keyed by the client's socket (synchronized map)
protected Map<Socket,Reloader> loadRequests;
//Used as a monitor by the JOIN handler and the rebalance thread, and to register client operations in handleMessage
//NOTE(review): RebalanceLock is declared elsewhere; its exact waiting semantics are not visible here
protected RebalanceLock rebalanceLock;
/**
 * Builds a controller with empty, thread-safe state and initialises the logger.
 *
 * @param cport           port to listen on
 * @param rFactor         replication factor
 * @param timeout         timeout in milliseconds
 * @param rebalancePeriod period handed to the RebalanceLock, in milliseconds
 */
public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) {
    //Shared state: every map is wrapped so single operations are thread-safe
    this.dstores = Collections.synchronizedMap(new HashMap<Integer,DstoreConnection>());
    this.index = Collections.synchronizedMap(new HashMap<String,IndexEntry>());
    this.loadRequests = Collections.synchronizedMap(new HashMap<Socket,Reloader>());

    //Configuration
    this.rebalancePeriod = rebalancePeriod;
    this.timeout = timeout;
    this.rFactor = rFactor;
    this.cport = cport;

    this.rebalanceLock = new RebalanceLock(rebalancePeriod);

    //A logger that cannot be initialised is reported but does not stop construction
    try {
        ControllerLogger.init(Logger.LoggingType.ON_FILE_AND_TERMINAL);
    }
    catch(IOException e) {
        e.printStackTrace();
    }
}
/**
 * Entry point. Expects four integer arguments in this order:
 * cport, rFactor, timeout, rebalancePeriod.
 * Missing or non-integer arguments are reported on stderr; infeasible values
 * produce a stack trace. In every failure case the method simply returns.
 */
public static void main(String[] args) {
    int[] values = new int[4];
    try {
        //Parse in argument order so the first bad argument decides which error is reported
        for(int i = 0; i < values.length; i++) {
            values[i] = Integer.parseInt(args[i]);
        }
        boolean feasible = values[0] >= 0 && values[1] >= 1 && values[2] >= 0 && values[3] >= 0;
        if(!feasible) {
            throw new Exception("Infeasible values provided as arguments");
        }
        new Controller(values[0], values[1], values[2], values[3]).start();
    }
    catch(IndexOutOfBoundsException e) {
        System.err.println("Command line arguments have not been provided");
    }
    catch(NumberFormatException e) {
        System.err.println("Command line arguments must be integers");
    }
    catch(Exception e) {
        e.printStackTrace();
    }
}
/**
 * Starts the rebalance thread, then accepts connections on cport forever.
 * Each accepted connection is served on its own thread, including the read of
 * its first message, so a peer that connects and stays silent (or closes
 * immediately) can neither stall nor terminate the accept loop.
 */
public void start() {
    try {
        Thread rebalanceThread = new Thread(new RebalanceThread());
        rebalanceThread.start();
        //try-with-resources releases the listening port if the loop is ever left
        try(ServerSocket server = new ServerSocket(cport)) {
            while(true) {
                try {
                    Socket client = server.accept();
                    new Thread(() -> serveConnection(client)).start();
                }
                catch(Exception e) {
                    //Log error
                    System.out.println("Error accepting new connection");
                    e.printStackTrace();
                    System.out.println("Continue...");
                }
            }
        }
    }
    catch(Exception e) {
        e.printStackTrace();
    }
}

//Reads the first message of a new connection and treats the peer as a Dstore (JOIN) or as a client (anything else)
private void serveConnection(Socket client) {
    BufferedReader in;
    String firstLine;
    try {
        in = new BufferedReader(new InputStreamReader(client.getInputStream()));
        firstLine = in.readLine();
    }
    catch(IOException e) {
        System.out.println("Error accepting new connection");
        e.printStackTrace();
        try {client.close();} catch(IOException ignored) {}
        return;
    }
    messageReceived(client, firstLine);
    if(firstLine == null) {
        //Peer closed without sending anything: drop only this connection
        try {client.close();} catch(IOException ignored) {}
        return;
    }
    String[] message = firstLine.split(" ");
    if(message.length > 0 && message[0].equals(Protocol.JOIN_TOKEN)) {
        int portNumber;
        try {
            portNumber = Integer.parseInt(message[1]);
        }
        catch(NumberFormatException | ArrayIndexOutOfBoundsException e) {
            //Log error
            System.out.println("Malformed message received by Controller");
            try {client.close();} catch(IOException ignored) {}
            return;
        }
        synchronized(rebalanceLock) {
            dstores.put(portNumber, new DstoreConnection(client, portNumber, timeout));
            System.out.println("Dstore at " + portNumber + " joined");
            ControllerLogger.getInstance().dstoreJoined(client, portNumber);
            rebalanceLock.queueRebalance();
        }
        return;
    }

    System.out.println("A new client has joined");
    try {
        handleMessage(message, client);
    }
    catch(Exception e) {
        e.printStackTrace();
    }
    String clientMessage;
    do {
        try {
            clientMessage = in.readLine();
        }
        catch(IOException e) {
            //A broken connection is treated like a closed one; retrying the read would spin forever
            e.printStackTrace();
            break;
        }
        try {
            messageReceived(client, clientMessage);
            if(clientMessage != null) {
                handleMessage(clientMessage.split(" "), client);
            }
        }
        catch(Exception e) {
            e.printStackTrace();
        }
    }
    while(clientMessage != null);
    System.out.println("Client closed");
    loadRequests.remove(client);
    try {client.close();} catch(IOException ignored) {}
}
/**
 * Background task: repeatedly waits on the rebalance lock and, when at least
 * rFactor Dstores are connected, runs one rebalance. Never returns.
 */
protected class RebalanceThread implements Runnable {
    public void run() {
        for(;;) {
            rebalanceLock.waitToRebalance();
            try {
                //With fewer than rFactor Dstores there is nothing to do; wait for the next trigger
                if(dstores.size() < rFactor) {
                    continue;
                }
                runRebalance();
            }
            catch(Exception e) {
                e.printStackTrace();
            }
        }
    }

    //Runs one rebalance while holding the rebalance lock's monitor; failures are printed and swallowed
    protected void runRebalance() {
        System.out.println("About to start a rebalance...");
        synchronized(rebalanceLock) {
            try {
                rebalanceLock.waitForFinish();
                rebalance();
            }
            catch(Exception e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * Dispatches one client message to the matching operation.
 * The operation is registered with the rebalance lock for its whole duration.
 * If fewer than rFactor Dstores are connected, every message is answered with
 * ERROR_NOT_ENOUGH_DSTORES. A message with an unknown command or too few
 * arguments is logged as malformed and otherwise ignored.
 *
 * @param message the received line split on single spaces
 * @param client  socket of the client that sent it
 */
void handleMessage(String[] message, Socket client) throws Exception {
    try {
        rebalanceLock.addProcess();
        try {
            //An empty array (e.g. a line of only spaces) is treated as an unknown command
            String command = message.length > 0 ? message[0] : "";
            int arguments = message.length - 1;
            if(dstores.size() < rFactor) {
                PrintWriter out = new PrintWriter(client.getOutputStream());
                out.println(Protocol.ERROR_NOT_ENOUGH_DSTORES_TOKEN);
                out.flush();
                messageSent(client, Protocol.ERROR_NOT_ENOUGH_DSTORES_TOKEN);
            }
            else if(command.equals(Protocol.STORE_TOKEN) && arguments >= 2) {
                store(client, message[1], message[2]);
            }
            else if(command.equals(Protocol.LOAD_TOKEN) && arguments >= 1) {
                load(client, message[1]);
            }
            else if(command.equals(Protocol.RELOAD_TOKEN) && arguments >= 1) {
                sendLoadFrom(client, message[1]);
            }
            else if(command.equals(Protocol.REMOVE_TOKEN) && arguments >= 1) {
                remove(client, message[1]);
            }
            else if(command.equals(Protocol.LIST_TOKEN)) {
                list(client);
            }
            else {
                //Log error
                System.out.println("Malformed message received by Controller");
            }
        }
        catch(Exception e) {e.printStackTrace();}
        finally {
            rebalanceLock.removeProcess();
        }
    }
    catch(InterruptedException e) {e.printStackTrace();}
}
/**
 * Handles a STORE request.
 * Creates an index entry, picks rFactor Dstores, tells the client where to
 * store (STORE_TO), then waits up to timeout ms for a STORE_ACK from each
 * chosen Dstore. On success the entry becomes STORE_COMPLETE and the client
 * receives STORE_COMPLETE. On any failure (timeout or exception) the entry is
 * removed again so the filename does not stay blocked.
 *
 * @param client         socket of the requesting client
 * @param filename       name of the file to store
 * @param filesizeString file size as sent by the client; must parse as a non-negative integer
 */
void store(Socket client, String filename, String filesizeString) throws Exception {
    long filesize;
    try {
        filesize = Long.parseLong(filesizeString);
    }
    catch(NumberFormatException e) {
        //Log error
        System.out.println("Client has not provided an integer as a filesize");
        //Without a usable size the request is malformed; indexing it would later advertise size -1 to loaders
        return;
    }
    if(filesize < 1) {
        //Log error
        System.out.println("A client is trying to store a file with size < 1");
        //A negative size is never valid; an empty file is still let through, as before
        if(filesize < 0) return;
    }
    try {
        //Create a new entry in the index, unless the file already exists
        final IndexEntry entry = new IndexEntry();
        entry.setFilesize(filesize);
        boolean alreadyExists;
        synchronized(index) {
            IndexEntry existing = index.get(filename);
            alreadyExists = existing != null && existing.getStatus() != IndexEntry.Status.REMOVE_COMPLETE;
            if(!alreadyExists) index.put(filename, entry);
        }
        if(alreadyExists) {
            PrintWriter out = new PrintWriter(client.getOutputStream());
            out.println(Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN);
            out.flush();
            messageSent(client, Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN);
            return;
        }

        boolean completed = false;
        try {
            //Select Dstores
            Integer[] storesToStore = new Integer[rFactor];
            for(int i=0; i<rFactor; i++) {
                storesToStore[i] = nextStoreInSequence();
                if(storesToStore[i] == null) {
                    //Every Dstore disconnected since the request was accepted
                    System.err.println("Store for " + filename + " failed: no Dstores are connected");
                    return;
                }
            }

            //Start one ack-waiter per chosen Dstore, then send STORE_TO
            CountDownLatch latch = new CountDownLatch(rFactor);
            PrintWriter out = new PrintWriter(client.getOutputStream());
            StringBuilder message = new StringBuilder(Protocol.STORE_TO_TOKEN);
            for(Integer thisStore : storesToStore) {
                message.append(" ").append(thisStore.intValue());
                new Thread(() -> {
                    try {
                        DstoreConnection connection = dstores.get(thisStore);
                        if(connection == null) {
                            //The Dstore was removed between selection and now
                            System.err.println("Store for " + filename + " failed due to dead dstore");
                            return;
                        }
                        String receivedMessage = connection.receive(Protocol.STORE_ACK_TOKEN + " " + filename);
                        if(receivedMessage != null) {
                            try {
                                storeAck(thisStore, entry, latch);
                            }
                            catch(Exception e) {
                                //Log error
                                System.err.println("Error processing store ack from dstore " + thisStore);
                                e.printStackTrace();
                            }
                        }
                        else {
                            //Log error
                            System.err.println("Dstore " + thisStore + " timed out receiving STORE_ACK");
                        }
                    }
                    catch(DstoreDisconnectException e) {
                        e.printStackTrace();
                        removeDstore(e);
                    }
                    catch(DeadStoreException e) {
                        System.err.println("Store for " + filename + " failed due to dead dstore");
                    }
                }).start();
            }
            String storeTo = message.toString();
            out.println(storeTo);
            out.flush();
            messageSent(client, storeTo);

            //Wait for STORE_ACKs from datastores in storesToStore
            if(latch.await(timeout, TimeUnit.MILLISECONDS)) {
                //Update index to "store complete"
                entry.setStatus(IndexEntry.Status.STORE_COMPLETE);
                completed = true;
                //Send STORE_COMPLETE message
                out.println(Protocol.STORE_COMPLETE_TOKEN);
                out.flush();
                messageSent(client, Protocol.STORE_COMPLETE_TOKEN);
            }
            else {
                //Log error
                System.err.println("Not all STORE_ACKs have been received");
            }
        }
        finally {
            //Remove file from index on timeout AND on any exception, but only if the entry is still ours
            if(!completed) {
                synchronized(index) {
                    if(index.get(filename) == entry) index.remove(filename);
                }
            }
        }
    }
    catch(IOException e) {
        e.printStackTrace();
    }
}
/**
 * Registers one STORE_ACK: records the acknowledging Dstore on the index
 * entry, then releases one count of the latch the storing thread waits on.
 */
void storeAck(Integer port, IndexEntry entry, CountDownLatch latch) throws Exception {
    int acknowledgedBy = port;
    entry.addStoredBy(acknowledgedBy);
    latch.countDown();
}
/**
 * Handles a LOAD request.
 * Replies ERROR_FILE_DOES_NOT_EXIST unless the file is indexed with status
 * STORE_COMPLETE. Otherwise snapshots the Dstores holding the file as this
 * client's candidate list and sends the first LOAD_FROM via sendLoadFrom.
 *
 * @param client   socket of the requesting client
 * @param filename name of the file to load
 */
void load(Socket client, String filename) throws Exception {
    try {
        //Single lookup: the entry cannot vanish between the existence check and its use
        IndexEntry thisEntry = index.get(filename);
        if(thisEntry == null || thisEntry.getStatus() != IndexEntry.Status.STORE_COMPLETE) {
            PrintWriter out = new PrintWriter(client.getOutputStream());
            out.println(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
            out.flush();
            messageSent(client, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
            return;
        }
        //Select the Dstores which contain the file
        Reloader storedBy = new Reloader();
        storedBy.filesize = thisEntry.getFilesize();
        List<Integer> holders = thisEntry.getStoredBy();
        //A synchronized list must be locked by the caller while it is traversed
        synchronized(holders) {
            storedBy.addAll(holders);
        }
        for(Integer d : storedBy) {
            System.out.println("Dstore " + d + " added to load list");
        }
        loadRequests.put(client,storedBy);
        //Send LOAD_FROM message
        sendLoadFrom(client, filename);
    }
    catch(IOException e) {
        e.printStackTrace();
    }
}
/**
 * Sends the next load instruction for a client (used by LOAD and RELOAD).
 * Replies ERROR_FILE_DOES_NOT_EXIST if the file is not indexed as
 * STORE_COMPLETE, ERROR_LOAD if the client has no untried Dstore left (or
 * never issued a LOAD), and otherwise "LOAD_FROM port filesize" for the next
 * candidate, which is removed from the client's list.
 *
 * @param client   socket of the requesting client
 * @param filename name of the file being loaded
 */
void sendLoadFrom(Socket client, String filename) {
    try {
        PrintWriter out = new PrintWriter(client.getOutputStream());
        String message;
        //Single lookup: avoids the entry disappearing between containsKey and get
        IndexEntry entry = index.get(filename);
        if(entry == null || entry.getStatus() != IndexEntry.Status.STORE_COMPLETE) {
            message = Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN;
        }
        else {
            Reloader storedBy = loadRequests.get(client);
            if(storedBy == null) {
                //RELOAD without a preceding LOAD: there is nothing to retry from
                message = Protocol.ERROR_LOAD_TOKEN;
            }
            else {
                System.out.println("Load requested for file " + filename + ", there are " + storedBy.size() + " dstores to select from");
                if(storedBy.isEmpty()) {
                    message = Protocol.ERROR_LOAD_TOKEN;
                }
                else {
                    Integer thisStore = storedBy.remove(0);
                    //Token and arguments are space-separated, as for every other message built in this class
                    message = Protocol.LOAD_FROM_TOKEN + " " + thisStore + " " + storedBy.filesize;
                }
            }
        }
        out.println(message);
        out.flush();
        messageSent(client, message);
    }
    catch(IOException e) {
        e.printStackTrace();
    }
}
/**
 * Handles a REMOVE request.
 * Replies ERROR_FILE_DOES_NOT_EXIST unless the file is indexed as
 * STORE_COMPLETE. Otherwise marks it REMOVE_IN_PROGRESS, asks every Dstore
 * recorded as holding it to remove it, and waits up to timeout ms for all
 * acknowledgements. Only when all arrive is the entry dropped from the index
 * and REMOVE_COMPLETE sent; on timeout the entry stays REMOVE_IN_PROGRESS.
 *
 * @param client   socket of the requesting client
 * @param filename name of the file to remove
 */
void remove(Socket client, String filename) throws Exception {
    try {
        IndexEntry entry;
        try {
            synchronized(index) {
                entry = index.get(filename);
                boolean removable = entry != null && entry.getStatus() == IndexEntry.Status.STORE_COMPLETE;
                if(!removable) {
                    throw new InvalidStatusException();
                }
                //Update index to "remove in progress"
                entry.setStatus(IndexEntry.Status.REMOVE_IN_PROGRESS);
            }
        }
        catch(InvalidStatusException e) {
            PrintWriter clientOut = new PrintWriter(client.getOutputStream());
            clientOut.println(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
            clientOut.flush();
            messageSent(client, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
            return;
        }

        //Snapshot the holders so acknowledgements can shrink the live list while we work
        List<Integer> holders;
        synchronized(entry.getStoredBy()) {
            holders = new ArrayList<Integer>(entry.getStoredBy());
        }

        //Send REMOVE message to all Dstores storing the file, one thread each
        CountDownLatch acks = new CountDownLatch(holders.size());
        for(Integer dstore : holders) {
            Runnable askDstore = () -> {
                try {
                    String message = dstores.get(dstore).sendAndReceive(Protocol.REMOVE_TOKEN + " " + filename, Protocol.REMOVE_ACK_TOKEN + " " + filename, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
                    if(message == null) {
                        //Log error
                        System.err.println("Dstore " + dstore + " timed out receiving REMOVE_ACK");
                    }
                    else {
                        entry.removeStoredBy(dstore.intValue());
                        acks.countDown();
                    }
                }
                catch(DstoreDisconnectException e) {
                    e.printStackTrace();
                    removeDstore(e);
                }
                catch(DeadStoreException e) {
                    System.err.println("Remove for " + filename + " failed due to dead dstore");
                }
            };
            new Thread(askDstore).start();
        }

        //Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message
        boolean allAcked = acks.await(timeout, TimeUnit.MILLISECONDS);
        if(!allAcked) {
            //Log error
            System.err.println("Not all REMOVE_ACKs have been received");
            return;
        }

        //Update index to "remove complete" and drop the entry if it is still the one we worked on
        entry.setStatus(IndexEntry.Status.REMOVE_COMPLETE);
        synchronized(index) {
            if(index.get(filename) == entry) {
                index.remove(filename);
            }
        }

        //Send REMOVE_COMPLETE to client
        PrintWriter clientOut = new PrintWriter(client.getOutputStream());
        clientOut.println(Protocol.REMOVE_COMPLETE_TOKEN);
        clientOut.flush();
        messageSent(client, Protocol.REMOVE_COMPLETE_TOKEN);
    }
    catch(IOException e) {
        e.printStackTrace();
    }
}
/**
 * Handles a LIST request: sends the LIST token followed by the names of all
 * files whose index status is STORE_COMPLETE, separated by single spaces.
 * With no such files only the token is sent.
 *
 * @param client socket of the requesting client
 */
void list(Socket client) throws Exception {
    try {
        System.out.println("Fetching list...");
        //Send file list to client
        StringBuilder names = new StringBuilder(Protocol.LIST_TOKEN + " ");
        //Hold the index lock for the whole traversal; a synchronized map does not protect iteration
        synchronized(index) {
            for(Map.Entry<String,IndexEntry> indexed : index.entrySet()) {
                IndexEntry entry = indexed.getValue();
                //Null check comes before any use of the entry; getStatus does its own locking
                if(entry != null && entry.getStatus() == IndexEntry.Status.STORE_COMPLETE) {
                    names.append(indexed.getKey()).append(" ");
                }
            }
        }
        String message = names.toString().trim();
        PrintWriter out = new PrintWriter(client.getOutputStream());
        System.out.println("Sending...");
        out.println(message);
        out.flush();
        messageSent(client, message);
    }
    catch(IOException e) {
        e.printStackTrace();
    }
}
/**
 * Runs one rebalance operation:
 * 1. asks every connected Dstore for its file list and waits up to timeout ms;
 * 2. removes Dstores that did not answer in time;
 * 3. computes a new allocation (allocate) and the per-Dstore REBALANCE
 *    messages (composeRebalanceMessages);
 * 4. sends those messages and waits up to timeout ms for REBALANCE_COMPLETE.
 * All failures are printed; the Dstore selection sequence is reset at the end
 * in every case.
 */
void rebalance() throws Exception {
    Map<Integer,List<String>> dstoreFilesR = new HashMap<Integer,List<String>>();
    try {
        //Snapshot the ports under the map's lock: worker threads may call removeDstore while we loop,
        //and the latch must be sized from exactly the set we contact
        List<Integer> ports;
        synchronized(dstores) {
            ports = new ArrayList<Integer>(dstores.keySet());
        }
        CountDownLatch listLatch = new CountDownLatch(ports.size());

        //Send LIST message to each Dstore and receive their file list
        for(Integer dstore : ports) {
            new Thread(() -> {
                try {
                    DstoreConnection connection = dstores.get(dstore);
                    if(connection == null) {
                        //Removed since the snapshot; nothing to wait for
                        listLatch.countDown();
                        return;
                    }
                    String message = connection.sendAndReceive(Protocol.LIST_TOKEN);
                    if(message != null) {
                        receiveDstoreList(dstore.intValue(), message, dstoreFilesR, listLatch);
                    }
                    else {
                        System.err.println("Dstore " + dstore + " timed out receiving file list");
                    }
                }
                catch(DstoreDisconnectException e) {
                    e.printStackTrace();
                    removeDstore(e);
                    listLatch.countDown();
                }
                //No count-down here: the latch then times out and the Dstore is removed below
                catch(DeadStoreException e) {}
            }).start();
        }

        Map<Integer,List<String>> dstoreFiles = null;
        try {
            boolean allReceived = listLatch.await(timeout, TimeUnit.MILLISECONDS);
            synchronized(dstoreFilesR) {
                if(!allReceived) {
                    //Log error
                    System.err.println("Not all file lists have been received");
                    Set<Integer> storesToRemove;
                    synchronized(dstores) {
                        storesToRemove = new HashSet<Integer>(dstores.keySet());
                    }
                    storesToRemove.removeAll(dstoreFilesR.keySet());
                    for(Integer dstore : storesToRemove) {
                        DstoreConnection connection = dstores.get(dstore);
                        if(connection != null) {
                            removeDstore(connection.getDisconnectData());
                        }
                    }
                    //Copy, so a list arriving late cannot change the data being planned on
                    dstoreFiles = new HashMap<Integer,List<String>>(dstoreFilesR);
                }
                else {
                    dstoreFiles = dstoreFilesR;
                }
            }
        }
        catch(Exception e) {e.printStackTrace();}
        //The failure was already printed above; without file lists there is nothing to plan
        if(dstoreFiles == null) return;
        if(dstoreFiles.size() < rFactor) throw new Exception("Less than R dstores connected; connections may be faulty or timeout may be too strict");

        Map<Integer,List<String>> newAlloc = allocate(dstoreFiles);
        Map<Integer,String> sendIndex = composeRebalanceMessages(dstoreFiles, newAlloc);
        CountDownLatch latch = new CountDownLatch(sendIndex.size());
        for(Integer dstore : sendIndex.keySet()) {
            new Thread(() -> {
                try {
                    DstoreConnection connection = dstores.get(dstore);
                    if(connection == null) {
                        System.err.println("Dstore " + dstore + " disconnected before REBALANCE could be sent");
                        return;
                    }
                    String returnMessage = connection.sendAndReceive(sendIndex.get(dstore), Protocol.REBALANCE_COMPLETE_TOKEN);
                    if(returnMessage == null) {
                        //Log error
                        System.out.println("Dstore " + dstore + " timed out receiving REBALANCE_COMPLETE");
                    }
                    latch.countDown();
                }
                catch(DstoreDisconnectException e) {
                    e.printStackTrace();
                    removeDstore(e);
                }
                catch(Exception e) {e.printStackTrace();}
            }).start();
        }

        //Wait for REBALANCE_COMPLETE from all Dstores
        try {
            if(!latch.await(timeout, TimeUnit.MILLISECONDS)) {
                System.err.println("Not all REBALANCE_COMPLETEs received");
            }
        }
        catch(Exception e) {e.printStackTrace();}
    }
    catch(Exception e) {
        e.printStackTrace();
    }
    finally {
        System.out.println("There are " + dstores.size() + " dstores connected");
        //Iterating a synchronized map requires holding its lock
        synchronized(index) {
            for(String i : index.keySet()) {
                System.out.print(i);
            }
        }
        System.out.print("\n");
        resetSequence();
    }
}
/**
 * Records the file list reported by one Dstore and counts the latch down.
 *
 * @param port        port of the reporting Dstore (used as the map key)
 * @param list        space-separated file names; the empty string means no files
 * @param dstoreFiles shared result map; locked while it is updated
 * @param latch       latch to count down once the list is recorded
 */
void receiveDstoreList(int port, String list, Map<Integer,List<String>> dstoreFiles, CountDownLatch latch) {
    List<String> reported = new ArrayList<String>();
    boolean hasFiles = !list.equals("");
    if(hasFiles) {
        Collections.addAll(reported, list.split(" "));
    }
    synchronized(dstoreFiles) {
        dstoreFiles.put(port, reported);
    }
    latch.countDown();
}
/**
 * Computes the target allocation of files to Dstores for a rebalance.
 * Steps:
 * - keep, per Dstore, only reported files that are indexed as STORE_COMPLETE;
 *   index entries in any other status are dropped, as are index entries for
 *   files no Dstore reported (lost to crashes);
 * - for files with more than rFactor copies, drop copies starting with the
 *   fullest Dstores; for files with fewer, add copies starting with the
 *   emptiest Dstores;
 * - move files from the fullest to the emptiest Dstore until every Dstore
 *   holds between floor and ceil of rFactor * files / dstores, or no move is
 *   possible.
 *
 * @param oldDstoreFiles files currently reported by each Dstore, keyed by port; must not be empty
 * @return the new allocation, keyed by port (the input map is not modified)
 */
Map<Integer,List<String>> allocate(Map<Integer,List<String>> oldDstoreFiles) {
    Map<Integer,List<String>> dstoreFiles = new HashMap<Integer,List<String>>();
    //A set: membership is tested once per reported file and once per index entry
    Set<String> availableFiles = new HashSet<String>();
    for(Integer i : oldDstoreFiles.keySet()) {
        List<String> files = new ArrayList<String>();
        for(String s : oldDstoreFiles.get(i)) {
            IndexEntry entry = index.get(s);
            if(entry != null) {
                if(entry.getStatus() == IndexEntry.Status.STORE_COMPLETE) {
                    files.add(s);
                }
                else {
                    index.remove(s);
                }
            }
            availableFiles.add(s);
        }
        dstoreFiles.put(i, files);
    }
    //These files have been lost to crashes and need to be removed from the index
    synchronized(index) {
        index.keySet().retainAll(availableFiles);
    }

    //Order Dstores by how many files they are currently allocated.
    //The previous descending form computed a + b, which is not a valid ordering.
    Comparator<Integer> ascending = (s1, s2) -> Integer.compare(dstoreFiles.get(s1).size(), dstoreFiles.get(s2).size());
    Comparator<Integer> descending = ascending.reversed();

    //Number of copies of each file across all Dstores
    Map<String,Integer> counts = new HashMap<String,Integer>();
    for(Integer dstore : dstoreFiles.keySet()) {
        for(String file : dstoreFiles.get(dstore)) {
            counts.merge(file, 1, Integer::sum);
        }
    }

    List<Integer> priorityList = new ArrayList<Integer>(dstoreFiles.keySet());
    Iterator<Integer> it;
    //Only values of existing keys are replaced below, so iterating the key set stays valid
    for(String file : counts.keySet()) {
        if(counts.get(file) > rFactor) {
            System.out.println("Need to remove copies of " + file);
            priorityList.sort(descending);
            it = priorityList.iterator();
            while(counts.get(file) > rFactor && it.hasNext()) {
                Integer thisStore = it.next();
                if(dstoreFiles.get(thisStore).contains(file)) {
                    dstoreFiles.get(thisStore).remove(file);
                    counts.put(file, counts.get(file) - 1);
                    System.out.println(file + " removed from " + thisStore);
                }
            }
        }
        else if(counts.get(file) < rFactor) {
            System.out.println("Need to make copies of " + file);
            priorityList.sort(ascending);
            it = priorityList.iterator();
            while(counts.get(file) < rFactor && it.hasNext()) {
                Integer thisStore = it.next();
                if(!dstoreFiles.get(thisStore).contains(file)) {
                    dstoreFiles.get(thisStore).add(file);
                    counts.put(file, counts.get(file) + 1);
                    System.out.println(file + " allocated to " + thisStore);
                }
            }
        }
    }

    //Even out the load between the fullest and the emptiest Dstore
    double optimumStoreAmount = ((double) rFactor * (double) counts.size()) / (double) dstoreFiles.size();
    priorityList.sort(ascending);
    Integer minStore = priorityList.get(0);
    Integer maxStore = priorityList.get(priorityList.size() - 1);
    boolean giveUp = false;
    System.out.println(rFactor + " * " + counts.size() + " / " + dstoreFiles.size() + " = " + optimumStoreAmount);
    while((dstoreFiles.get(maxStore).size() > Math.ceil(optimumStoreAmount)
            || dstoreFiles.get(minStore).size() < Math.floor(optimumStoreAmount))
            && !giveUp) {
        //Stays true only if the fullest Dstore has no file the emptiest one lacks
        giveUp = true;
        for(String thisFile : dstoreFiles.get(maxStore)) {
            if(!dstoreFiles.get(minStore).contains(thisFile)) {
                dstoreFiles.get(minStore).add(thisFile);
                dstoreFiles.get(maxStore).remove(thisFile);
                giveUp = false;
                //Leave the loop at once: the list being iterated has just been modified
                break;
            }
        }
        priorityList.sort(ascending);
        minStore = priorityList.get(0);
        maxStore = priorityList.get(priorityList.size() - 1);
        if(giveUp) System.out.println("Gave up reallocating files");
    }
    return dstoreFiles;
}
/**
 * Builds the REBALANCE message for each Dstore from the old and the new
 * allocation, and updates the index's holder lists to match the new one.
 * Message layout:
 * REBALANCE filesToSend {file count port...} filesToRemove {file...}
 * Dstores that neither send nor remove anything get no message.
 *
 * @param oldAlloc files each Dstore currently holds, keyed by port
 * @param newAlloc files each Dstore should hold afterwards, keyed by port
 * @return the message to send to each Dstore, keyed by port
 */
Map<Integer,String> composeRebalanceMessages(Map<Integer,List<String>> oldAlloc, Map<Integer,List<String>> newAlloc) {
    //For each file, the Dstores that must receive a copy: in the new allocation but not in the old
    Map<String,List<Integer>> requireIndex = new HashMap<String,List<Integer>>();
    for(Integer dstore : newAlloc.keySet()) {
        List<String> oldFiles = oldAlloc.get(dstore);
        for(String file : newAlloc.get(dstore)) {
            if(oldFiles.contains(file)) continue;
            List<Integer> requires = requireIndex.get(file);
            if(requires == null) {
                requires = new ArrayList<Integer>();
                requireIndex.put(file, requires);
            }
            requires.add(dstore);
            //Guarded like the removal below: a missing entry must not abort the whole rebalance
            IndexEntry entry = index.get(file);
            if(entry != null) entry.addStoredBy(dstore);
        }
    }

    //For each required file, how many Dstores currently hold it and can act as a sender
    Map<String,Integer> hasRequire = new HashMap<String,Integer>();
    for(String file : requireIndex.keySet()) {
        int count = 0;
        for(Integer dstore : oldAlloc.keySet()) {
            if(oldAlloc.get(dstore).contains(file)) {
                count ++;
            }
        }
        hasRequire.put(file, count);
    }

    Map<Integer,String> messages = new HashMap<Integer,String>();
    for(Integer dstore : newAlloc.keySet()) {
        List<String> oldFiles = oldAlloc.get(dstore);
        List<String> newFiles = newAlloc.get(dstore);

        //Compose files to send
        StringBuilder sendPart = new StringBuilder();
        int filesToSend = 0;
        Iterator<String> it = requireIndex.keySet().iterator();
        while(it.hasNext()) {
            String file = it.next();
            if(!oldFiles.contains(file)) continue;
            filesToSend ++;
            List<Integer> thisRequire = requireIndex.get(file);
            //Share the receivers between the current holders: each holder serves at most ceil(receivers / holders)
            int distribution = (int) Math.ceil((double) thisRequire.size() / (double) hasRequire.get(file));
            int numberSentTo = 0;
            StringBuilder sentTo = new StringBuilder();
            while(numberSentTo < distribution && !thisRequire.isEmpty()) {
                sentTo.append(" ").append(thisRequire.remove(0));
                numberSentTo ++;
            }
            sendPart.append(" ").append(file).append(" ").append(numberSentTo).append(sentTo);
            //Every receiver is served; later Dstores must not send this file again
            if(thisRequire.isEmpty()) it.remove();
        }

        //Compose files to remove: held now but absent from the new allocation
        StringBuilder removePart = new StringBuilder();
        int filesToRemove = 0;
        for(String file : oldFiles) {
            if(!newFiles.contains(file)) {
                filesToRemove ++;
                removePart.append(" ").append(file);
                IndexEntry entry = index.get(file);
                if(entry != null) entry.removeStoredBy(dstore);
            }
        }

        if(filesToSend == 0 && filesToRemove == 0) continue;
        messages.put(dstore, Protocol.REBALANCE_TOKEN + " " + filesToSend + sendPart + " " + filesToRemove + removePart);
    }
    return messages;
}
/**
 * Forgets a disconnected Dstore: unregisters its connection (only if the
 * registered connection is still the one that failed), closes its socket,
 * removes it from every index entry's holder list and queues a rebalance.
 *
 * @param e disconnect notification carrying the failed connection
 */
void removeDstore(DstoreDisconnectException e) {
    Integer port = e.getConnection().getPort();
    synchronized(dstores) {
        //A Dstore may have re-joined on the same port; never evict the newer connection
        if(dstores.containsKey(port) && dstores.get(port).equals(e.getConnection())) dstores.remove(port);
    }
    try {e.getConnection().getSocket().close();} catch(IOException ee) {}
    //The index lock must be held for the whole traversal, not just while the iterator is obtained
    synchronized(index) {
        for(IndexEntry entry : index.values()) {
            entry.removeStoredBy(port);
        }
    }
    rebalanceLock.queueRebalance();
}
//Position in the current pass over the Dstore ports; null until the first reset
Iterator<Integer> sequenceIt = null;
//Guards sequenceIt
Object sequenceLock = new Object();

/**
 * Returns the port of the next Dstore in the selection sequence, starting a
 * fresh pass when the current one is exhausted. Ports that are no longer
 * connected are skipped.
 *
 * @return a connected Dstore's port, or null if no Dstore is connected
 */
Integer nextStoreInSequence() {
    for(;;) {
        synchronized(sequenceLock) {
            boolean exhausted = sequenceIt == null || !sequenceIt.hasNext();
            if(exhausted && !resetSequence()) {
                return null;
            }
            Integer candidate = sequenceIt.next();
            //The pass is a snapshot, so the Dstore may have gone since; try the next one
            if(dstores.containsKey(candidate)) {
                return candidate;
            }
        }
    }
}
/**
 * Restarts the selection sequence from a snapshot of the currently connected
 * Dstore ports.
 *
 * @return false (leaving the sequence untouched) if no Dstore is connected, true otherwise
 */
boolean resetSequence() {
    synchronized(sequenceLock) {
        synchronized(dstores) {
            if(!dstores.isEmpty()) {
                //Iterate over a copy so later joins and removals cannot invalidate the iterator
                sequenceIt = new HashSet<Integer>(dstores.keySet()).iterator();
                return true;
            }
            return false;
        }
    }
}
//Passes a message this controller has just sent on the given socket to the ControllerLogger
void messageSent(Socket socket, String message) {
ControllerLogger.getInstance().messageSent(socket, message);
}
//Passes a message read from the given socket to the ControllerLogger.
//Callers in start() also pass null here when the peer has closed the connection.
void messageReceived(Socket socket, String message) {
ControllerLogger.getInstance().messageReceived(socket, message);
}
}