Skip to content
Snippets Groups Projects
Select Git revision
  • 8124d07cc0ca42523d8cac02d01965389aae9a73
  • master default protected
2 results

Controller.java

Blame
  • Controller.java 26.89 KiB
    import java.io.*;
    import java.net.*;
    import java.util.concurrent.*;
    import java.lang.Runnable;
    import java.lang.Math;
    import java.util.Iterator;
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Map;
    import java.util.HashMap;
    import java.util.Set;
    import java.util.HashSet;
    import java.util.Collection;
    import java.util.Collections;
    
    public class Controller {
    	protected int cport; //Port to listen on
    	protected int rFactor; //Replication factor; each file is replicated across r Dstores
    	protected int timeout; //in milliseconds
    	protected int rebalancePeriod; //How long to wait to start the next rebalance operation, in milliseconds
    	
    	/**
    	 * Index record for a single file: its size, the ports of the Dstores
    	 * recorded as holding it, and where it is in the store/remove lifecycle.
    	 * Scalar accessors synchronise on the entry; the port list is a
    	 * {@link Collections#synchronizedList} that callers may lock directly
    	 * when they need to iterate or copy it.
    	 */
    	protected static class IndexEntry {
    		/** Lifecycle states of an index entry. */
    		public static enum Status {
    			STORE_IN_PROGRESS,
    			STORE_COMPLETE,
    			REMOVE_IN_PROGRESS,
    			REMOVE_COMPLETE
    		}
    		
    		protected long filesize;
    		protected List<Integer> storedBy;
    		protected int numberToStore;
    		protected Status status;
    		
    		/** Creates an entry with unknown size (-1), no holders and status STORE_IN_PROGRESS. */
    		public IndexEntry() {
    			filesize = -1;
    			storedBy = Collections.synchronizedList(new ArrayList<Integer>());
    			status = Status.STORE_IN_PROGRESS;
    		}
    		
    		public synchronized void setFilesize(long filesize) {
    			this.filesize = filesize;
    		}
    		
    		public synchronized long getFilesize() {
    			return filesize;
    		}
    		
    		/**
    		 * Records that the Dstore on the given port holds the file.
    		 * Adding a port that is already recorded is a no-op, so a single
    		 * {@link #removeStoredBy(int)} always clears the port completely.
    		 *
    		 * @param dstore port of the Dstore
    		 */
    		public synchronized void addStoredBy(int dstore) {
    			Integer port = Integer.valueOf(dstore);
    			//contains + add must be atomic with respect to anyone locking the list itself
    			synchronized(storedBy) {
    				if(!storedBy.contains(port)) {
    					storedBy.add(port);
    				}
    			}
    		}
    		
    		/**
    		 * Forgets that the Dstore on the given port holds the file.
    		 *
    		 * @param dstore port of the Dstore (removed by value, not by list position)
    		 */
    		public synchronized void removeStoredBy(int dstore) {
    			storedBy.remove(Integer.valueOf(dstore));
    		}
    		
    		/**
    		 * @return the live (not copied) synchronized list of holder ports;
    		 *         lock it while iterating
    		 */
    		public List<Integer> getStoredBy() {
    			return storedBy;
    		}
    		
    		public synchronized void setStatus(Status status) {
    			this.status = status;
    		}
    		
    		public synchronized Status getStatus() {
    			return status;
    		}
    	}
    	
    	/**
    	 * List of Dstore ports that also carries a file size. load() fills one
    	 * per client socket with the ports recorded for the requested file, and
    	 * sendLoadFrom() takes the first port off it each time it answers.
    	 */
    	protected class Reloader extends ArrayList<Integer> {
    		//Size of the file this candidate list belongs to (copied from the index entry by load())
    		public long filesize;
    	}
    	
    	//Thrown (and caught locally) by store() and remove() when the index entry for a file is not in the state the operation needs
    	protected class InvalidStatusException extends Exception {}
    	
    	protected Map<Integer,DstoreConnection> dstores;
    	protected Map<String,IndexEntry> index;
    	protected Map<Socket,Reloader> loadRequests;
    	
    	protected RebalanceLock rebalanceLock;
    	
    	/**
    	 * Builds a controller with empty, synchronized Dstore / index /
    	 * load-request tables and initialises the shared ControllerLogger.
    	 *
    	 * @param cport           port to listen on
    	 * @param rFactor         replication factor
    	 * @param timeout         timeout in milliseconds
    	 * @param rebalancePeriod rebalance period in milliseconds
    	 */
    	public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) {
    		this.cport = cport;
    		this.rFactor = rFactor;
    		this.timeout = timeout;
    		this.rebalancePeriod = rebalancePeriod;
    		
    		this.dstores = Collections.synchronizedMap(new HashMap<>());
    		this.index = Collections.synchronizedMap(new HashMap<>());
    		this.loadRequests = Collections.synchronizedMap(new HashMap<>());
    		this.rebalanceLock = new RebalanceLock(rebalancePeriod);
    		
    		//A logger that fails to initialise is reported but does not stop construction
    		try {
    			ControllerLogger.init(Logger.LoggingType.ON_FILE_AND_TERMINAL);
    		}
    		catch(IOException loggerFailure) {
    			loggerFailure.printStackTrace();
    		}
    	}
    	
    	/**
    	 * Entry point. Expects four integer arguments, in order: cport, rFactor,
    	 * timeout, rebalancePeriod. Reports missing or non-integer arguments on
    	 * stderr; any other failure (including infeasible values) prints a stack trace.
    	 */
    	public static void main(String[] args) {
    		try {
    			//Parse in argument order so the first bad argument decides which error is reported
    			int[] parsed = new int[4];
    			for(int i = 0; i < parsed.length; i++) {
    				parsed[i] = Integer.parseInt(args[i]);
    			}
    			int cport = parsed[0];
    			int rFactor = parsed[1];
    			int timeout = parsed[2];
    			int rebalancePeriod = parsed[3];
    			
    			boolean feasible = cport >= 0 && rFactor >= 1 && timeout >= 0 && rebalancePeriod >= 0;
    			if(!feasible) {
    				throw new Exception("Infeasible values provided as arguments");
    			}
    			
    			new Controller(cport, rFactor, timeout, rebalancePeriod).start();
    		}
    		catch(IndexOutOfBoundsException e) {
    			System.err.println("Command line arguments have not been provided");
    		}
    		catch(NumberFormatException e) {
    			System.err.println("Command line arguments must be integers");
    		}
    		catch(Exception e) {
    			e.printStackTrace();
    		}
    	}
    	
    	/**
    	 * Starts the rebalance thread, then listens on {@code cport} and serves
    	 * connections until the server socket fails.
    	 *
    	 * The first line of every connection is read on the accepting thread.
    	 * A JOIN registers the socket as a Dstore and queues a rebalance; anything
    	 * else is treated as a client, whose messages are handled one by one on a
    	 * dedicated thread until the client closes the connection.
    	 *
    	 * A connection that closes before sending anything is dropped on its own;
    	 * it no longer terminates the accept loop. The server socket is closed
    	 * when the loop ends.
    	 */
    	public void start() {
    		try {
    			Thread rebalanceThread = new Thread(new RebalanceThread());
    			rebalanceThread.start();
    			
    			try(ServerSocket server = new ServerSocket(cport)) {
    				while(true) {
    					try {
    						Socket client = server.accept();
    						BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
    						String tMessage = in.readLine();
    						messageReceived(client, tMessage);
    						if(tMessage == null) {
    							//Peer hung up without sending a line: discard this socket only and keep accepting
    							try {client.close();} catch(IOException ignored) {}
    							continue;
    						}
    						String[] message = tMessage.split(" ");
    						
    						new Thread(() -> {
    							//split(" ") yields an empty array for a blank line, hence the length check
    							if(message.length > 0 && message[0].equals(Protocol.JOIN_TOKEN)) {
    								int portNumber;
    								try {
    									portNumber = Integer.parseInt(message[1]);
    								}
    								catch(NumberFormatException | ArrayIndexOutOfBoundsException e) {
    									//JOIN without a usable port: reject instead of dying with the socket still open
    									System.out.println("Malformed message received by Controller");
    									try {client.close();} catch(IOException ignored) {}
    									return;
    								}
    								synchronized(rebalanceLock) {
    									dstores.put(portNumber, new DstoreConnection(client, portNumber, timeout));
    									System.out.println("Dstore at " + portNumber + " joined");
    									ControllerLogger.getInstance().dstoreJoined(client, portNumber);
    									rebalanceLock.queueRebalance();
    								}
    							}
    							else {
    								System.out.println("A new client has joined");
    								try {
    									handleMessage(message, client);
    								}
    								catch(Exception e) {
    									e.printStackTrace();
    								}
    								
    								String clientMessage = "";
    								do {
    									try {
    										clientMessage = in.readLine();
    										messageReceived(client, clientMessage);
    										if(clientMessage != null) {
    											handleMessage(clientMessage.split(" "), client);
    										}
    									}
    									catch(Exception e) {
    										e.printStackTrace();
    									}
    								}
    								while(clientMessage != null);
    								System.out.println("Client closed");
    								loadRequests.remove(client);
    								try {client.close();} catch(IOException ignored) {}
    							}
    						}).start();
    					}
    					catch(Exception e) {
    						//Log error
    						System.out.println("Error accepting new connection");
    						e.printStackTrace();
    						System.out.println("Continue...");
    					}
    				}
    			}
    		}
    		catch(Exception e) {
    			e.printStackTrace();
    		}
    	}
    	
    	/**
    	 * Background task: blocks on the rebalance lock until a rebalance is due,
    	 * then runs one if at least rFactor Dstores are connected. Never returns.
    	 */
    	protected class RebalanceThread implements Runnable {
    		@Override
    		public void run() {
    			for(;;) {
    				rebalanceLock.waitToRebalance();
    				try {
    					if(dstores.size() < rFactor) {
    						continue;
    					}
    					runRebalance();
    				}
    				catch(Exception failure) {
    					failure.printStackTrace();
    				}
    			}
    		}
    		
    		/**
    		 * Takes the rebalance lock, waits for in-flight operations to finish
    		 * and performs one rebalance; failures are printed and swallowed.
    		 */
    		protected void runRebalance() {
    			System.out.println("About to start a rebalance...");
    			synchronized(rebalanceLock) {
    				try {
    					rebalanceLock.waitForFinish();
    					rebalance();
    				}
    				catch(Exception failure) {
    					failure.printStackTrace();
    				}
    			}
    		}
    	}
    	
    	/**
    	 * Dispatches one client message to the matching operation.
    	 * The call is registered with the rebalance lock for its duration.
    	 * If fewer than rFactor Dstores are connected the client is sent
    	 * ERROR_NOT_ENOUGH_DSTORES whatever the message was. Exceptions from the
    	 * operation are printed, not propagated.
    	 *
    	 * @param message the received line split on single spaces
    	 * @param client  socket the message arrived on
    	 */
    	void handleMessage(String[] message, Socket client) throws Exception {
    		try {
    			rebalanceLock.addProcess();
    			
    			try {
    				if(dstores.size() < rFactor) {
    					String refusal = Protocol.ERROR_NOT_ENOUGH_DSTORES_TOKEN;
    					PrintWriter reply = new PrintWriter(client.getOutputStream());
    					reply.println(refusal);
    					reply.flush();
    					messageSent(client, refusal);
    					return;
    				}
    				
    				String command = message[0];
    				if(command.equals(Protocol.STORE_TOKEN)) {
    					store(client, message[1], message[2]);
    				}
    				else if(command.equals(Protocol.LOAD_TOKEN)) {
    					load(client, message[1]);
    				}
    				else if(command.equals(Protocol.RELOAD_TOKEN)) {
    					sendLoadFrom(client, message[1]);
    				}
    				else if(command.equals(Protocol.REMOVE_TOKEN)) {
    					remove(client, message[1]);
    				}
    				else if(command.equals(Protocol.LIST_TOKEN)) {
    					list(client);
    				}
    				else {
    					//Log error
    					System.out.println("Malformed message received by Controller");
    				}
    			}
    			catch(Exception failure) {
    				failure.printStackTrace();
    			}
    			finally {
    				//Always deregister, including on the early return above
    				rebalanceLock.removeProcess();
    			}
    		}
    		catch(InterruptedException interrupted) {
    			interrupted.printStackTrace();
    		}
    	}
    	
    	/**
    	 * Handles a client STORE request.
    	 *
    	 * Steps: validate the size, reserve an index entry (status
    	 * STORE_IN_PROGRESS), pick rFactor Dstores, tell the client where to send
    	 * the file (STORE_TO), then wait up to {@code timeout} ms for a STORE_ACK
    	 * from every chosen Dstore. On success the entry becomes STORE_COMPLETE
    	 * and the client is sent STORE_COMPLETE; on timeout the entry is dropped.
    	 *
    	 * A request whose size is not an integer or is below 1 is logged and
    	 * ignored. If no Dstore can be selected, the reservation is rolled back
    	 * and the client is sent ERROR_NOT_ENOUGH_DSTORES.
    	 *
    	 * @param client         requesting client's socket
    	 * @param filename       name of the file to store
    	 * @param filesizeString file size as received on the wire
    	 */
    	void store(Socket client, String filename, String filesizeString) throws Exception {
    		long filesize;
    		try {
    			filesize = Long.parseLong(filesizeString);
    		}
    		catch(NumberFormatException e) {
    			//Log error
    			System.out.println("Client has not provided an integer as a filesize");
    			return;
    		}
    		if(filesize < 1) {
    			//Log error
    			System.out.println("A client is trying to store a file with size < 1");
    			return;
    		}
    		
    		try {
    			//Obtain the reply stream first so an I/O failure cannot leave a reservation behind
    			PrintWriter out = new PrintWriter(client.getOutputStream());
    			
    			//Create a new entry in the index, unless a live entry already owns the name
    			IndexEntry entry = new IndexEntry();
    			entry.setFilesize(filesize);
    			boolean alreadyExists;
    			synchronized(index) {
    				IndexEntry existing = index.get(filename);
    				alreadyExists = existing != null && existing.getStatus() != IndexEntry.Status.REMOVE_COMPLETE;
    				if(!alreadyExists) {
    					index.put(filename, entry);
    				}
    			}
    			if(alreadyExists) {
    				out.println(Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN);
    				out.flush();
    				messageSent(client, Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN);
    				return;
    			}
    			
    			//Select Dstores
    			Integer[] storesToStore = new Integer[rFactor];
    			for(int i=0; i<rFactor; i++) {
    				storesToStore[i] = nextStoreInSequence();
    				if(storesToStore[i] == null) {
    					//Every Dstore vanished since the request was admitted: undo the reservation
    					System.err.println("No Dstores available to store " + filename);
    					synchronized(index) {
    						if(index.get(filename) == entry) index.remove(filename);
    					}
    					out.println(Protocol.ERROR_NOT_ENOUGH_DSTORES_TOKEN);
    					out.flush();
    					messageSent(client, Protocol.ERROR_NOT_ENOUGH_DSTORES_TOKEN);
    					return;
    				}
    			}
    			
    			//Start one ACK listener per Dstore and compose the STORE_TO message
    			CountDownLatch latch = new CountDownLatch(rFactor);
    			String message = Protocol.STORE_TO_TOKEN;
    			for(Integer thisStore : storesToStore) {
    				message = message + " " + thisStore.intValue();
    				new Thread(() -> {
    					try {
    						DstoreConnection connection = dstores.get(thisStore);
    						if(connection == null) {
    							//Removed between selection and now; the latch will simply time out
    							System.err.println("Dstore " + thisStore + " disconnected before sending STORE_ACK");
    							return;
    						}
    						String receivedMessage = connection.receive(Protocol.STORE_ACK_TOKEN + " " + filename);
    						if(receivedMessage != null) {
    							try {
    								storeAck(thisStore, entry, latch);
    							}
    							catch(Exception e) {
    								//Log error
    								System.err.println("Error processing store ack from dstore " + thisStore);
    								e.printStackTrace();
    							}
    						}
    						else {
    							//Log error
    							System.err.println("Dstore " + thisStore + " timed out receiving STORE_ACK");
    						}
    					}
    					catch(DstoreDisconnectException e) {
    						e.printStackTrace();
    						removeDstore(e);
    					}
    					catch(DeadStoreException e) {
    						System.err.println("Store for " + filename + " failed due to dead dstore");
    					}
    				}).start();
    			}
    			
    			//Send STORE_TO message
    			out.println(message);
    			out.flush();
    			messageSent(client, message);
    			
    			//Wait for STORE_ACKs from datastores in storesToStore
    			if(latch.await(timeout, TimeUnit.MILLISECONDS)) {
    				//Update index to "store complete"
    				entry.setStatus(IndexEntry.Status.STORE_COMPLETE);
    				
    				//Send STORE_COMPLETE message
    				out.println(Protocol.STORE_COMPLETE_TOKEN);
    				out.flush();
    				messageSent(client, Protocol.STORE_COMPLETE_TOKEN);
    			}
    			else {
    				//Log error
    				System.err.println("Not all STORE_ACKs have been received");
    				
    				//Remove file from index (only if the slot still holds this reservation)
    				synchronized(index) {
    					if(index.get(filename) == entry) index.remove(filename);
    				}
    			}
    		}
    		catch(IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	/**
    	 * Records one STORE_ACK: adds the acknowledging Dstore to the entry's
    	 * holder list, then releases one count of the caller's latch.
    	 *
    	 * @param port  port of the Dstore that acknowledged
    	 * @param entry index entry of the file being stored
    	 * @param latch latch the storing thread is waiting on
    	 */
    	void storeAck(Integer port, IndexEntry entry, CountDownLatch latch) throws Exception {
    		int acknowledgedBy = port.intValue();
    		entry.addStoredBy(acknowledgedBy);
    		latch.countDown();
    	}
    	
    	/**
    	 * Handles a client LOAD request. If the file is not in the index with
    	 * status STORE_COMPLETE the client is sent ERROR_FILE_DOES_NOT_EXIST.
    	 * Otherwise the ports currently recorded for the file are snapshotted
    	 * into a per-client candidate list (replacing any previous one for that
    	 * socket) and the first candidate is offered via sendLoadFrom().
    	 *
    	 * @param client   requesting client's socket
    	 * @param filename name of the file to load
    	 */
    	void load(Socket client, String filename) throws Exception {
    		try {
    			//Single lookup: a separate containsKey()/get() pair could see the entry vanish in between
    			IndexEntry thisEntry = index.get(filename);
    			if(thisEntry == null || thisEntry.getStatus() != IndexEntry.Status.STORE_COMPLETE) {
    				PrintWriter out = new PrintWriter(client.getOutputStream());
    				out.println(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
    				out.flush();
    				messageSent(client, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
    				return;
    			}
    			
    			//Select a Dstore which contains the file
    			Reloader storedBy = new Reloader();
    			storedBy.filesize = thisEntry.getFilesize();
    			List<Integer> holders = thisEntry.getStoredBy();
    			//A synchronizedList must be locked by the caller while it is traversed
    			synchronized(holders) {
    				storedBy.addAll(holders);
    			}
    			for(Integer d : storedBy) {
    				System.out.println("Dstore " + d + " added to load list");
    			}
    			loadRequests.put(client,storedBy);
    			
    			//Send LOAD_FROM message
    			sendLoadFrom(client, filename);
    		}
    		catch(IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	/**
    	 * Answers a LOAD or RELOAD by offering the next untried Dstore for the file.
    	 *
    	 * Replies:
    	 *   ERROR_FILE_DOES_NOT_EXIST - the file is not in the index as STORE_COMPLETE;
    	 *   ERROR_LOAD                - this client has no candidates left (including
    	 *                               a RELOAD that was not preceded by a LOAD);
    	 *   LOAD_FROM port filesize   - otherwise; the offered port is taken off the
    	 *                               client's candidate list.
    	 *
    	 * @param client   requesting client's socket (key into loadRequests)
    	 * @param filename name of the file being loaded
    	 */
    	void sendLoadFrom(Socket client, String filename) {
    		try {
    			PrintWriter out = new PrintWriter(client.getOutputStream());
    			String message;
    			IndexEntry entry = index.get(filename);
    			if(entry == null || entry.getStatus() != IndexEntry.Status.STORE_COMPLETE) {
    				message = Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN;
    			}
    			else {
    				Reloader storedBy = loadRequests.get(client);
    				//No list at all means no LOAD was recorded for this socket: treat it as exhausted
    				int remaining = (storedBy == null) ? 0 : storedBy.size();
    				System.out.println("Load requested for file " + filename + ", there are " + remaining + " dstores to select from");
    				if(remaining == 0) {
    					message = Protocol.ERROR_LOAD_TOKEN;
    				}
    				else {
    					Integer thisStore = storedBy.remove(0);
    					//Token, port and size are space-separated like every other message in this class
    					message = Protocol.LOAD_FROM_TOKEN + " " + thisStore + " " + storedBy.filesize;
    				}
    			}
    			out.println(message);
    			out.flush();
    			messageSent(client, message);
    		}
    		catch(IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	/**
    	 * Handles a client REMOVE request. The file must be in the index as
    	 * STORE_COMPLETE, otherwise the client is sent ERROR_FILE_DOES_NOT_EXIST.
    	 * The entry is flagged REMOVE_IN_PROGRESS, every Dstore recorded as
    	 * holding the file is sent REMOVE on its own thread, and the method waits
    	 * up to {@code timeout} ms for all acknowledgements. When all arrive the
    	 * entry is dropped from the index and the client is sent REMOVE_COMPLETE;
    	 * otherwise only an error is logged.
    	 *
    	 * @param client   requesting client's socket
    	 * @param filename name of the file to remove
    	 */
    	void remove(Socket client, String filename) throws Exception {
    		try {
    			IndexEntry entry;
    			boolean removable;
    			synchronized(index) {
    				entry = index.get(filename);
    				removable = entry != null && entry.getStatus() == IndexEntry.Status.STORE_COMPLETE;
    				if(removable) {
    					//Update index to "remove in progress"
    					entry.setStatus(IndexEntry.Status.REMOVE_IN_PROGRESS);
    				}
    			}
    			if(!removable) {
    				PrintWriter refusal = new PrintWriter(client.getOutputStream());
    				refusal.println(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
    				refusal.flush();
    				messageSent(client, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
    				return;
    			}
    			
    			//Snapshot the holders under the list's own lock, then contact each one
    			List<Integer> holders;
    			synchronized(entry.getStoredBy()) {
    				holders = new ArrayList<Integer>(entry.getStoredBy());
    			}
    			CountDownLatch acks = new CountDownLatch(holders.size());
    			for(Integer dstore : holders) {
    				Runnable askToRemove = () -> {
    					try {
    						String reply = dstores.get(dstore).sendAndReceive(Protocol.REMOVE_TOKEN + " " + filename, Protocol.REMOVE_ACK_TOKEN + " " + filename, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
    						if(reply == null) {
    							//Log error
    							System.err.println("Dstore " + dstore + " timed out receiving REMOVE_ACK");
    						}
    						else {
    							entry.removeStoredBy(dstore.intValue());
    							acks.countDown();
    						}
    					}
    					catch(DstoreDisconnectException e) {
    						e.printStackTrace();
    						removeDstore(e);
    					}
    					catch(DeadStoreException e) {
    						System.err.println("Remove for " + filename + " failed due to dead dstore");
    					}
    				};
    				new Thread(askToRemove).start();
    			}
    			
    			//Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message
    			boolean allAcked = acks.await(timeout, TimeUnit.MILLISECONDS);
    			if(!allAcked) {
    				//Log error
    				System.err.println("Not all REMOVE_ACKs have been received");
    				return;
    			}
    			
    			//Update index to "remove complete" and drop the entry if it still owns the name
    			entry.setStatus(IndexEntry.Status.REMOVE_COMPLETE);
    			synchronized(index) {
    				if(index.get(filename) == entry) index.remove(filename);
    			}
    			
    			//Send REMOVE_COMPLETE to client
    			PrintWriter confirmation = new PrintWriter(client.getOutputStream());
    			confirmation.println(Protocol.REMOVE_COMPLETE_TOKEN);
    			confirmation.flush();
    			messageSent(client, Protocol.REMOVE_COMPLETE_TOKEN);
    		}
    		catch(IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	/**
    	 * Handles a client LIST request: replies with the LIST token followed by
    	 * the space-separated names of all files whose index entry is
    	 * STORE_COMPLETE (just the token when there are none).
    	 *
    	 * @param client requesting client's socket
    	 */
    	void list(Socket client) throws Exception {
    		try {
    			System.out.println("Fetching list...");
    			//Build the reply while holding the index lock so the map cannot change under the iteration
    			StringBuilder reply = new StringBuilder(Protocol.LIST_TOKEN + " ");
    			synchronized(index) {
    				for(Map.Entry<String,IndexEntry> record : index.entrySet()) {
    					IndexEntry entry = record.getValue();
    					synchronized(entry) {
    						if(entry.getStatus() == IndexEntry.Status.STORE_COMPLETE) {
    							reply.append(record.getKey()).append(" ");
    						}
    					}
    				}
    			}
    			PrintWriter out = new PrintWriter(client.getOutputStream());
    			System.out.println("Sending...");
    			//trim() drops the separator left after the last name (or after the bare token)
    			String listing = reply.toString().trim();
    			out.println(listing);
    			out.flush();
    			messageSent(client, listing);
    		}
    		catch(IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	/**
    	 * Runs one rebalance operation.
    	 *
    	 * 1. Sends LIST to every connected Dstore in parallel and waits up to
    	 *    {@code timeout} ms for the replies. Dstores that did not answer in
    	 *    time are removed.
    	 * 2. If fewer than rFactor Dstores answered, aborts (the failure is
    	 *    printed, not propagated).
    	 * 3. Computes a new allocation with allocate(), turns it into per-Dstore
    	 *    REBALANCE messages with composeRebalanceMessages(), sends them in
    	 *    parallel and waits up to {@code timeout} ms for the exchanges to end.
    	 * 4. Always finishes by printing the connection count and index keys and
    	 *    resetting the store-selection sequence.
    	 *
    	 * The set of ports is snapshotted under the dstores lock before any
    	 * thread is started, because worker threads may call removeDstore()
    	 * (which mutates the map) while this method is still looping.
    	 */
    	void rebalance() throws Exception {
    		Map<Integer,List<String>> dstoreFilesR = new HashMap<Integer,List<String>>();
    		List<Integer> ports;
    		synchronized(dstores) {
    			ports = new ArrayList<Integer>(dstores.keySet());
    		}
    		//Sized from the same snapshot that is iterated, so the count and the threads always agree
    		CountDownLatch listLatch = new CountDownLatch(ports.size());
    		try {
    			//Send LIST message to each Dstore and receive their file list
    			for(Integer dstore : ports) {
    				new Thread(() -> {
    					try {
    						DstoreConnection connection = dstores.get(dstore);
    						if(connection == null) {
    							//Already removed by another thread; nothing to wait for
    							listLatch.countDown();
    							return;
    						}
    						String message = connection.sendAndReceive(Protocol.LIST_TOKEN);
    						if(message != null) {
    							receiveDstoreList(dstore.intValue(), message, dstoreFilesR, listLatch);
    						}
    						else {
    							System.err.println("Dstore " + dstore + " timed out receiving file list");
    						}
    					}
    					catch(DstoreDisconnectException e) {
    						e.printStackTrace();
    						removeDstore(e);
    						listLatch.countDown();
    					}
    					catch(DeadStoreException e) {
    						//Deliberately no countDown: the latch then times out and the
    						//"not all received" branch below removes this Dstore
    					}
    				}).start();
    			}
    			
    			boolean allReceived = listLatch.await(timeout, TimeUnit.MILLISECONDS);
    			Map<Integer,List<String>> dstoreFiles;
    			synchronized(dstoreFilesR) {
    				if(!allReceived) {
    					//Log error
    					System.err.println("Not all file lists have been received");
    					Set<Integer> storesToRemove;
    					synchronized(dstores) {
    						storesToRemove = new HashSet<Integer>(dstores.keySet());
    					}
    					storesToRemove.removeAll(dstoreFilesR.keySet());
    					for(Integer dstore : storesToRemove) {
    						DstoreConnection silent = dstores.get(dstore);
    						if(silent != null) {
    							removeDstore(silent.getDisconnectData());
    						}
    					}
    				}
    				//Work on a private copy so a late reply cannot alter the data mid-allocation
    				dstoreFiles = new HashMap<Integer,List<String>>(dstoreFilesR);
    			}
    			
    			if(dstoreFiles.size() < rFactor) throw new Exception("Less than R dstores connected; connections may be faulty or timeout may be too strict");
    			
    			Map<Integer,List<String>> newAlloc = allocate(dstoreFiles);
    			Map<Integer,String> sendIndex = composeRebalanceMessages(dstoreFiles, newAlloc);
    			CountDownLatch latch = new CountDownLatch(sendIndex.size());
    			for(Integer dstore : sendIndex.keySet()) {
    				new Thread(() -> {
    					try {
    						DstoreConnection connection = dstores.get(dstore);
    						String returnMessage = connection.sendAndReceive(sendIndex.get(dstore), Protocol.REBALANCE_COMPLETE_TOKEN);
    						if(returnMessage == null) {
    							//Log error
    							System.out.println("Dstore " + dstore + " timed out receiving REBALANCE_COMPLETE");
    						}
    					}
    					catch(DstoreDisconnectException e) {
    						e.printStackTrace();
    						removeDstore(e);
    					}
    					catch(Exception e) {e.printStackTrace();}
    					finally {
    						//The exchange is over however it ended; do not make the waiter sit out the full timeout
    						latch.countDown();
    					}
    				}).start();
    			}
    			
    			//Wait for REBALANCE_COMPLETE from all Dstores
    			if(!latch.await(timeout, TimeUnit.MILLISECONDS)) {
    				System.err.println("Not all REBALANCE_COMPLETEs received");
    			}
    		}
    		catch(Exception e) {
    			e.printStackTrace();
    		}
    		finally {
    			System.out.println("There are " + dstores.size() + " dstores connected");
    			synchronized(index) {
    				for(String i : index.keySet()) {
    					System.out.print(i);
    				}
    			}
    			System.out.print("\n");
    			resetSequence();
    		}
    	}
    	
    	/**
    	 * Stores one Dstore's file listing and signals that it has arrived.
    	 *
    	 * @param port        port of the Dstore the listing came from
    	 * @param list        space-separated file names; the empty string means no files
    	 * @param dstoreFiles shared result map (locked while it is written)
    	 * @param latch       counted down once the listing has been recorded
    	 */
    	void receiveDstoreList(int port, String list, Map<Integer,List<String>> dstoreFiles, CountDownLatch latch) {
    		List<String> names = new ArrayList<String>();
    		if(!list.isEmpty()) {
    			Collections.addAll(names, list.split(" "));
    		}
    		
    		synchronized(dstoreFiles) {
    			dstoreFiles.put(port, names);
    		}
    		
    		latch.countDown();
    	}
    	
    	/**
    	 * Computes the target file placement for a rebalance.
    	 *
    	 * Allocate needs to:
    	 *   - allocate files that don't have enough storers to dstores that don't have them
    	 *   - move files from dstores that have too many files
    	 *   - prioritize storing these files to dstores that don't have enough files
    	 *
    	 * Side effects on the index: entries for reported files that are not
    	 * STORE_COMPLETE are removed, and so is every entry whose file no Dstore
    	 * reported at all.
    	 *
    	 * @param oldDstoreFiles files currently reported by each Dstore, keyed by port (not modified)
    	 * @return new placement keyed by port, containing only files that were
    	 *         STORE_COMPLETE in the index; empty if no Dstores were given
    	 */
    	Map<Integer,List<String>> allocate(Map<Integer,List<String>> oldDstoreFiles) {
    		Map<Integer,List<String>> dstoreFiles = new HashMap<Integer,List<String>>();
    		//A set, because membership of every reported file is tested below
    		Set<String> availableFiles = new HashSet<String>();
    		for(Integer i : oldDstoreFiles.keySet()) {
    			List<String> files = new ArrayList<String>();
    			for(String s : oldDstoreFiles.get(i)) {
    				IndexEntry known = index.get(s);
    				if(known != null) {
    					if(known.getStatus() == IndexEntry.Status.STORE_COMPLETE) {
    						files.add(s);
    					}
    					else {
    						index.remove(s);
    					}
    				}
    				availableFiles.add(s);
    			}
    			dstoreFiles.put(i, files);
    		}
    		
    		//These files have been lost to crashes and need to be removed from the index
    		synchronized(index) {
    			index.keySet().retainAll(availableFiles);
    		}
    		
    		//Nothing to balance across; also avoids dividing by zero / indexing an empty list below
    		if(dstoreFiles.isEmpty()) return dstoreFiles;
    		
    		//Order stores by how many files they are currently allocated. The sizes are read
    		//at sort time, so every sort below reflects the allocation as it stands then.
    		Comparator<Integer> fewestFirst = Comparator.comparingInt(store -> dstoreFiles.get(store).size());
    		Comparator<Integer> mostFirst = fewestFirst.reversed();
    		
    		//How many stores hold each file
    		Map<String,Integer> counts = new HashMap<String,Integer>();
    		for(List<String> files : dstoreFiles.values()) {
    			for(String file : files) {
    				counts.merge(file, 1, Integer::sum);
    			}
    		}
    		
    		List<Integer> priorityList = new ArrayList<Integer>(dstoreFiles.keySet());
    		
    		//Bring every file to exactly rFactor copies (as far as the stores allow)
    		for(String file : counts.keySet()) {
    			if(counts.get(file) > rFactor) {
    				System.out.println("Need to remove copies of " + file);
    				//Take surplus copies from the fullest stores first
    				priorityList.sort(mostFirst);
    				for(Integer thisStore : priorityList) {
    					if(counts.get(file) <= rFactor) break;
    					if(dstoreFiles.get(thisStore).remove(file)) {
    						counts.put(file, counts.get(file) - 1);
    						System.out.println(file + " removed from " + thisStore);
    					}
    				}
    			}
    			else if(counts.get(file) < rFactor) {
    				System.out.println("Need to make copies of " + file);
    				//Give missing copies to the emptiest stores first
    				priorityList.sort(fewestFirst);
    				for(Integer thisStore : priorityList) {
    					if(counts.get(file) >= rFactor) break;
    					if(!dstoreFiles.get(thisStore).contains(file)) {
    						dstoreFiles.get(thisStore).add(file);
    						counts.put(file, counts.get(file) + 1);
    						System.out.println(file + " allocated to " + thisStore);
    					}
    				}
    			}
    		}
    		
    		//Even out the load: move one file at a time from the fullest to the emptiest store
    		//until both are within floor/ceil of the ideal share, or no move is possible
    		double optimumStoreAmount = ((double) rFactor * (double) counts.size()) / (double) dstoreFiles.size();
    		priorityList.sort(fewestFirst);
    		Integer minStore = priorityList.get(0);
    		Integer maxStore = priorityList.get(priorityList.size() - 1);
    		boolean giveUp = false;
    		System.out.println(rFactor + " * " + counts.size() + " / " + dstoreFiles.size() + " = " + optimumStoreAmount);
    		while((dstoreFiles.get(maxStore).size() > Math.ceil(optimumStoreAmount)
    				|| dstoreFiles.get(minStore).size() < Math.floor(optimumStoreAmount))
    				&& !giveUp) {
    			giveUp = true;
    			
    			Iterator<String> jt = dstoreFiles.get(maxStore).iterator();
    			while(jt.hasNext()) {
    				String thisFile = jt.next();
    				if(!dstoreFiles.get(minStore).contains(thisFile)) {
    					dstoreFiles.get(minStore).add(thisFile);
    					//Remove through the iterator we are traversing with
    					jt.remove();
    					giveUp = false;
    					break;
    				}
    			}
    			
    			priorityList.sort(fewestFirst);
    			minStore = priorityList.get(0);
    			maxStore = priorityList.get(priorityList.size() - 1);
    			
    			if(giveUp) System.out.println("Gave up reallocating files");
    		}
    		
    		return dstoreFiles;
    	}
    	
    	/**
    	 * Turns the difference between the old and new allocation into one
    	 * REBALANCE message per affected Dstore.
    	 *
    	 * Message layout:
    	 *   REBALANCE nSend (file nTargets target...)* nRemove file*
    	 *
    	 * For every file some Dstore newly requires, the sending work is shared
    	 * between the Dstores that already hold it. The index is updated as the
    	 * messages are composed: new holders are added to, and dropped holders
    	 * removed from, each file's entry (entries no longer in the index are skipped).
    	 *
    	 * @param oldAlloc files each Dstore holds now, keyed by port
    	 * @param newAlloc files each Dstore should hold, keyed by port
    	 * @return message per port; Dstores with nothing to send or remove are omitted
    	 */
    	Map<Integer,String> composeRebalanceMessages(Map<Integer,List<String>> oldAlloc, Map<Integer,List<String>> newAlloc) {
    		Map<String,List<Integer>> requireIndex = new HashMap<String,List<Integer>>();
    		
    		//Compose a map of required files by finding files of the new allocation that weren't present in the old
    		for(Integer dstore : newAlloc.keySet()) {
    			List<String> oldFiles = oldAlloc.get(dstore);
    			for(String file : newAlloc.get(dstore)) {
    				if(!oldFiles.contains(file)) {
    					requireIndex.computeIfAbsent(file, k -> new ArrayList<Integer>()).add(dstore);
    					//Same guard as the removal path below: the entry may be gone from the index
    					IndexEntry entry = index.get(file);
    					if(entry != null) entry.addStoredBy(dstore);
    				}
    			}
    		}
    		
    		//Number of Dstores that currently hold each required file (i.e. potential senders)
    		Map<String,Integer> hasRequire = new HashMap<String,Integer>();
    		for(String file : requireIndex.keySet()) {
    			int count = 0;
    			for(List<String> held : oldAlloc.values()) {
    				if(held.contains(file)) {
    					count ++;
    				}
    			}
    			hasRequire.put(file, count);
    		}
    		
    		Map<Integer,String> messages = new HashMap<Integer,String>();
    		for(Integer dstore : newAlloc.keySet()) {
    			StringBuilder sendPart = new StringBuilder();
    			
    			//Compose files to send
    			int filesToSend = 0;
    			List<String> oldFiles = oldAlloc.get(dstore);
    			List<String> newFiles = newAlloc.get(dstore);
    			Iterator<String> it = requireIndex.keySet().iterator();
    			while(it.hasNext()) {
    				String file = it.next();
    				if(oldFiles.contains(file)) {
    					filesToSend ++;
    					List<Integer> thisRequire = requireIndex.get(file);
    					//This sender's share of the targets that are still waiting
    					int distribution = (int) Math.ceil((double) thisRequire.size() / (double) hasRequire.get(file));
    					int numberSentTo = 0;
    					StringBuilder sentTo = new StringBuilder();
    					while(numberSentTo < distribution && !thisRequire.isEmpty()) {
    						sentTo.append(" ").append(thisRequire.remove(0));
    						numberSentTo ++;
    					}
    					sendPart.append(" ").append(file).append(" ").append(numberSentTo).append(sentTo);
    					//Every target served: no later Dstore needs to send this file
    					if(thisRequire.isEmpty()) it.remove();
    				}
    			}
    			
    			//Compose files to remove
    			StringBuilder removePart = new StringBuilder();
    			int filesToRemove = 0;
    			for(String file : oldFiles) {
    				if(!newFiles.contains(file)) {
    					filesToRemove ++;
    					removePart.append(" ").append(file);
    					IndexEntry entry = index.get(file);
    					if(entry != null) entry.removeStoredBy(dstore);
    				}
    			}
    			
    			if(filesToSend == 0 && filesToRemove == 0) continue;
    			messages.put(dstore, Protocol.REBALANCE_TOKEN + " " + filesToSend + sendPart + " " + filesToRemove + removePart);
    		}
    		
    		return messages;
    	}
    	
    	/**
    	 * Drops a disconnected Dstore: unregisters its connection (only if the
    	 * registered connection is the one that failed), closes its socket,
    	 * removes its port from every index entry's holder list and queues a
    	 * rebalance.
    	 *
    	 * @param e exception carrying the failed connection
    	 */
    	void removeDstore(DstoreDisconnectException e) {
    		Integer port = e.getConnection().getPort();
    		synchronized(dstores) {
    			//A newer connection may already be registered on the same port; leave that one alone
    			if(dstores.containsKey(port) && dstores.get(port).equals(e.getConnection())) dstores.remove(port);
    		}
    		
    		//Best-effort close; the connection is being discarded either way
    		try {e.getConnection().getSocket().close();} catch(IOException ignored) {}
    		
    		//The lock must be held for the whole traversal, not just while obtaining the iterator
    		synchronized(index) {
    			for(IndexEntry entry : index.values()) {
    				entry.removeStoredBy(port);
    			}
    		}
    		
    		rebalanceLock.queueRebalance();
    	}
    	
    	Iterator<Integer> sequenceIt = null;
    	Object sequenceLock = new Object();
    	/**
    	 * Returns the next connected Dstore port from the selection sequence,
    	 * starting a fresh sequence whenever the current one is missing or
    	 * exhausted. Ports that are no longer registered are skipped.
    	 *
    	 * @return a registered port, or null if the sequence cannot be reset
    	 *         because no Dstores are connected
    	 */
    	Integer nextStoreInSequence() {
    		while(true) {
    			synchronized(sequenceLock) {
    				boolean exhausted = (sequenceIt == null) || !sequenceIt.hasNext();
    				if(exhausted && !resetSequence()) {
    					return null;
    				}
    				
    				Integer candidate = sequenceIt.next();
    				if(candidate != null && dstores.containsKey(candidate)) {
    					return candidate;
    				}
    			}
    			//Candidate was stale: release the lock and try the next one
    		}
    	}
    	
    	/**
    	 * Restarts the selection sequence over a snapshot of the currently
    	 * registered Dstore ports.
    	 *
    	 * @return false (leaving the sequence untouched) if no Dstores are
    	 *         registered, true otherwise
    	 */
    	boolean resetSequence() {
    		synchronized(sequenceLock) {
    			synchronized(dstores) {
    				boolean anyConnected = !dstores.isEmpty();
    				if(anyConnected) {
    					//Iterate a copy so later joins and removals cannot invalidate the iterator
    					sequenceIt = new HashSet<Integer>(dstores.keySet()).iterator();
    				}
    				return anyConnected;
    			}
    		}
    	}
    	
    	//Reports an outgoing message on the given socket to the shared ControllerLogger instance
    	void messageSent(Socket socket, String message) {
    		ControllerLogger.getInstance().messageSent(socket, message);
    	}
    	
    	//Reports an incoming message on the given socket to the shared ControllerLogger instance.
    	//Callers in start() also pass null here when readLine() hits end of stream.
    	void messageReceived(Socket socket, String message) {
    		ControllerLogger.getInstance().messageReceived(socket, message);
    	}
    }