Skip to content
Snippets Groups Projects
Select Git revision
  • 69f99016fff7bcd5cea27cb2835a77c3f4587ed2
  • master default protected
2 results

Controller.java

Blame
  • Controller.java 12.11 KiB
    import java.io.*;
    import java.net.*;
    import java.lang.Runnable;
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Map;
    import java.util.HashMap;
    import java.util.Collection;
    
    public class Controller {
    	protected int cport; //Port the controller's server socket listens on
    	protected int rFactor; //Replication factor; each file is replicated across r Dstores
    	protected int timeout; //Maximum time to wait for acknowledgements, in milliseconds
    	protected int rebalancePeriod; //How long to wait to start the next rebalance operation, in milliseconds (stored by the constructor; not read anywhere in the visible code)
    	
    	/**
    	 * Index record for one file: its size, the Dstores recorded as holding it,
    	 * its current status string and the objects other threads wait on.
    	 *
    	 * A new entry starts with filesize -1, no Dstores, numberToStore 0 and the
    	 * status "store in progress".
    	 */
    	protected class IndexEntry {
    		protected int filesize;
    		protected List<Integer> storedBy;
    		protected int numberToStore;
    		protected String status;
    		protected Object storeAckLock;
    		protected List<Reloader> clientLoadList;
    		
    		public IndexEntry() {
    			filesize = -1;
    			storedBy = new SyncList();
    			numberToStore = 0;
    			status = "store in progress";
    			storeAckLock = new Object();
    			clientLoadList = new ArrayList<Reloader>();
    		}
    		
    		public synchronized void setFilesize(int filesize) {
    			this.filesize = filesize;
    		}
    		
    		public synchronized int getFilesize() {
    			return filesize;
    		}
    		
    		/** Records one Dstore as holding the file and wakes waiters once numberToStore Dstores are recorded. */
    		public void addStoredBy(int dstore) {
    			storedBy.add(Integer.valueOf(dstore));
    			if(storedBy.size() == getNumberToStore()) signalWaiters();
    		}
    		
    		/** Records several Dstores as holding the file and wakes waiters once numberToStore Dstores are recorded. */
    		public void addStoredBy(List<Integer> dstores) {
    			storedBy.addAll(dstores);
    			if(storedBy.size() == getNumberToStore()) signalWaiters();
    		}
    		
    		/** Forgets one Dstore and wakes waiters once no Dstore is recorded any more. */
    		public void removeStoredBy(int dstore) {
    			storedBy.remove(Integer.valueOf(dstore));
    			if(storedBy.size() == 0) signalWaiters();
    		}
    		
    		/** Forgets several Dstores and wakes waiters once no Dstore is recorded any more. */
    		public void removeStoredBy(List<Integer> dstores) {
    			storedBy.removeAll(dstores);
    			if(storedBy.size() == 0) signalWaiters();
    		}
    		
    		public List<Integer> getStoredBy() {
    			return storedBy;
    		}
    		
    		public synchronized void setNumberToStore(int i) {
    			numberToStore = i;
    		}
    		
    		public synchronized void setStatus(String status) {
    			this.status = status;
    		}
    		
    		public synchronized String getStatus() {
    			return status;
    		}
    		
    		/** Returns the object that threads wait on for acknowledgements; waiters must synchronize on it. */
    		public Object getLock() {
    			return storeAckLock;
    		}
    		
    		public List<Reloader> getLoadList() {
    			return clientLoadList;
    		}
    		
    		//Reads numberToStore under the same monitor setNumberToStore writes it with
    		private synchronized int getNumberToStore() {
    			return numberToStore;
    		}
    		
    		//notify/notifyAll may only be called while owning the monitor, otherwise
    		//IllegalMonitorStateException is thrown; notifyAll is used because more
    		//than one thread may be waiting on the same lock object
    		private void signalWaiters() {
    			synchronized(storeAckLock) {
    				storeAckLock.notifyAll();
    			}
    		}
    	}
    	
    	/**
    	 * ArrayList of Integers whose individual operations are synchronized on
    	 * the list itself.
    	 *
    	 * Only the methods overridden here are synchronized. Iteration is not:
    	 * code that iterates over a SyncList while other threads may modify it
    	 * must hold the list's monitor for the duration of the loop.
    	 */
    	protected class SyncList extends ArrayList<Integer> {
    		public SyncList() {
    			super();
    		}
    		
    		@Override
    		public synchronized boolean add(Integer i) {
    			return super.add(i);
    		}
    		
    		@Override
    		public synchronized boolean addAll(Collection<? extends Integer> c) {
    			return super.addAll(c);
    		}
    		
    		@Override
    		public synchronized Integer get(int i) {
    			return super.get(i);
    		}
    		
    		@Override
    		public synchronized int size() {
    			return super.size();
    		}
    		
    		/** Removes the element at index i. */
    		@Override
    		public synchronized Integer remove(int i) {
    			return super.remove(i);
    		}
    		
    		/**
    		 * Removes the first occurrence of o. This is the method that is called
    		 * when the list is used through a List<Integer> reference, so it has to
    		 * be overridden for removal by value to be synchronized.
    		 */
    		@Override
    		public synchronized boolean remove(Object o) {
    			return super.remove(o);
    		}
    		
    		/** Kept for callers holding a SyncList reference; delegates to remove(Object). */
    		public boolean remove(Integer i) {
    			return remove((Object) i);
    		}
    		
    		@Override
    		public synchronized boolean removeAll(Collection<?> c) {
    			return super.removeAll(c);
    		}
    	}
    	
    	/**
    	 * Holder for a single mutable flag. reload() sets the flag to true on
    	 * every Reloader in an entry's load list and calls notify() on it.
    	 */
    	protected class Reloader {
    		//Starts out false: no reload has been requested yet
    		public boolean reload = false;
    		
    		public Reloader() {
    		}
    	}
    	
    	//Ports of the Dstores that have joined; handleMessage appends one on every JOIN
    	protected List<Integer> dstores;
    	//Non-null only while a rebalance is running: rebalance() assigns it and resets it to null when done
    	protected Map<Integer,String[]> rebalanceMessages;
    	//Maps each filename to its IndexEntry
    	protected Map<String,IndexEntry> index;
    	
    	/**
    	 * Creates a controller; nothing is opened until start() is called.
    	 *
    	 * @param cport port to listen on
    	 * @param rFactor replication factor
    	 * @param timeout time to wait for acknowledgements, in milliseconds
    	 * @param rebalancePeriod time between rebalance operations, in milliseconds
    	 */
    	public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) {
    		this.cport = cport;
    		this.rFactor = rFactor;
    		this.timeout = timeout;
    		this.rebalancePeriod = rebalancePeriod;
    		dstores = new SyncList();
    		//Every request handler runs on its own thread and touches the index, so
    		//the map itself has to tolerate concurrent access (a plain HashMap does not)
    		index = new java.util.concurrent.ConcurrentHashMap<String,IndexEntry>();
    		//rebalanceMessages stays null on purpose: null means "no rebalance running"
    	}
    	
    	/**
    	 * Entry point. Expects four integer arguments, in order: cport, rFactor,
    	 * timeout and rebalancePeriod. Prints a message and returns if any is
    	 * missing or not an integer; otherwise starts a controller.
    	 */
    	public static void main(String[] args) {
    		int[] settings = new int[4];
    		try {
    			//Arguments are read and parsed one after another, left to right
    			for(int i = 0; i < settings.length; i++) {
    				settings[i] = Integer.parseInt(args[i]);
    			}
    			new Controller(settings[0], settings[1], settings[2], settings[3]).start();
    		}
    		catch(IndexOutOfBoundsException e) {
    			System.out.println("Command line arguments have not been provided");
    		}
    		catch(NumberFormatException e) {
    			System.out.println("Command line arguments must be integers");
    		}
    	}
    	
    	/**
    	 * Opens the server socket on cport and serves connections forever: reads
    	 * the first line of each connection, replies "ERROR" to anything other
    	 * than JOIN while fewer than rFactor Dstores have joined, and passes every
    	 * other message to handleMessage.
    	 *
    	 * Errors on a single connection are printed and the loop continues.
    	 */
    	public void start() {
    		try(ServerSocket server = new ServerSocket(cport)) {
    			while(true) {
    				Socket client = null;
    				//True once a worker thread has taken over the socket and will close it
    				boolean handedOff = false;
    				try {
    					client = server.accept();
    					BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
    					String line = in.readLine();
    					if(line == null) {
    						//Peer closed the connection without sending anything
    						System.out.println("Connection closed before a message was received");
    						continue;
    					}
    					String[] message = line.split(" ");
    					if(message.length == 0) {
    						//A line consisting only of spaces splits into nothing
    						System.out.println("Blank message ignored");
    						continue;
    					}
    					if(dstores.size() < rFactor && !message[0].equals("JOIN")) {
    						BufferedWriter out = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
    						out.write("ERROR");
    						out.flush();
    					}
    					else {
    						handleMessage(message, client);
    						//Closing the reader here would close the whole socket while the
    						//worker thread is still writing its reply, so leave it open for
    						//the commands that answer from a worker thread
    						handedOff = repliesFromWorkerThread(message[0]);
    					}
    				}
    				catch(Exception e) {
    					//Log error
    					e.printStackTrace();
    					System.out.println("Continue...");
    				}
    				finally {
    					if(client != null && !handedOff) {
    						try {
    							client.close();
    						}
    						catch(IOException e) {
    							e.printStackTrace();
    						}
    					}
    				}
    			}
    		}
    		catch(Exception e) {
    			e.printStackTrace();
    		}
    	}
    	
    	//True for the commands whose handler writes to the client socket from its own thread and closes it there
    	private boolean repliesFromWorkerThread(String command) {
    		return command.equals("STORE") || command.equals("LOAD")
    			|| command.equals("REMOVE") || command.equals("LIST");
    	}
    	
    	/**
    	 * Dispatches one message, already split on spaces, to its handler.
    	 * The first token selects the command; any message whose first token is
    	 * not a known command is treated as a Dstore file list, but only if every
    	 * token is a filename present in the index.
    	 *
    	 * @param message the tokens of the received line
    	 * @param client the connection the message arrived on
    	 * @throws IllegalArgumentException if the message is empty or a command
    	 *         is missing its argument (this includes NumberFormatException for
    	 *         a JOIN whose port is not an integer)
    	 * @throws Exception whatever the selected handler throws
    	 */
    	void handleMessage(String[] message, Socket client) throws Exception {
    		if(message.length == 0) {
    			throw new IllegalArgumentException("Empty message");
    		}
    		switch(message[0]) {
    			case "JOIN":
    				dstores.add(Integer.parseInt(requireArgument(message)));
    				System.out.println("Dstore at " + message[1] + " joined");
    				rebalance();
    				break;
    			case "STORE":
    				store(client, requireArgument(message));
    				break;
    			case "STORE_ACK":
    				storeAck(client, requireArgument(message));
    				break;
    			case "LOAD":
    				load(client, requireArgument(message));
    				break;
    			case "RELOAD":
    				reload(requireArgument(message));
    				break;
    			case "REMOVE":
    				remove(client, requireArgument(message));
    				break;
    			case "LIST":
    				list(client);
    				break;
    			default:
    				for(String name : message) {
    					if(!index.containsKey(name)) {
    						//Log error and continue (throw exception?)
    						return;
    					}
    				}
    				receiveDstoreList(client, message);
    		}
    	}
    	
    	//Returns the first argument of a command, or fails with a message naming the command
    	private static String requireArgument(String[] message) {
    		if(message.length < 2) {
    			throw new IllegalArgumentException(message[0] + " message is missing its argument");
    		}
    		return message[1];
    	}
    	
    	/**
    	 * Handles a STORE request on a new thread: registers the file in the
    	 * index, tells the client which rFactor Dstores to send it to, waits up
    	 * to timeout milliseconds for the entry to list rFactor Dstores, then
    	 * marks the entry "store complete" and sends STORE_COMPLETE.
    	 *
    	 * Replies "ERROR ALREADY_EXISTS <filename>" if the file is already indexed.
    	 * The client socket is always closed when the thread finishes.
    	 *
    	 * @param client connection to reply on
    	 * @param filename name of the file being stored
    	 */
    	void store(Socket client, String filename) throws Exception {
    		new Thread(() -> {
    			try {
    				PrintWriter out = new PrintWriter(client.getOutputStream());
    				
    				//Select Dstores (before touching the index, so a failure here leaves no half-made entry behind)
    				int[] storesToStore = new int[rFactor];
    				for(int i=0; i<rFactor; i++) {
    					storesToStore[i] = dstores.get(i).intValue();
    				}
    				
    				//Update index to "store in progress"; putIfAbsent folds the
    				//existence check and the insertion into a single map operation
    				IndexEntry entry = new IndexEntry();
    				entry.setNumberToStore(rFactor);
    				if(index.putIfAbsent(filename, entry) != null) {
    					out.print("ERROR ALREADY_EXISTS " + filename);
    					out.flush();
    					return;
    				}
    				
    				//Send STORE_TO message; it is line-terminated because STORE_COMPLETE
    				//follows on the same stream and the two would otherwise run together
    				out.print("STORE_TO");
    				for(int port : storesToStore) {
    					out.print(" " + port);
    				}
    				out.println();
    				out.flush();
    				
    				//Wait for STORE_ACKs. wait() requires owning the monitor, and is
    				//looped on the real condition so an early wake-up is harmless
    				Object lock = entry.getLock();
    				long deadline = System.currentTimeMillis() + timeout;
    				synchronized(lock) {
    					long remaining = timeout;
    					while(entry.getStoredBy().size() < rFactor && remaining > 0) {
    						try {
    							lock.wait(remaining);
    						}
    						catch(InterruptedException e) {
    							Thread.currentThread().interrupt();
    							break;
    						}
    						remaining = deadline - System.currentTimeMillis();
    					}
    				}
    				
    				int acknowledged = entry.getStoredBy().size();
    				if(acknowledged < rFactor) {
    					//NOTE(review): the store is still reported as complete below, as in
    					//the original flow; confirm whether a timeout should instead fail it
    					System.out.println("Store of " + filename + " acknowledged by " + acknowledged + " of " + rFactor + " Dstores");
    				}
    				
    				//Update index to "store complete"
    				entry.setStatus("store complete");
    				
    				//Send STORE_COMPLETE message
    				out.println("STORE_COMPLETE");
    				out.flush();
    			}
    			catch(IOException e) {
    				e.printStackTrace();
    			}
    			finally {
    				try {
    					client.close();
    				}
    				catch(IOException e) {
    					e.printStackTrace();
    				}
    			}
    		}).start();
    	}
    	
    	/**
    	 * Handles a STORE_ACK on a new thread: if the file is in the index, adds
    	 * the remote port of the acknowledging connection to the entry's list of
    	 * Dstores. Unknown filenames are ignored.
    	 *
    	 * NOTE(review): client.getPort() is the remote port of this connection;
    	 * confirm it is the same number the Dstore announced in its JOIN.
    	 */
    	void storeAck(Socket client, String filename) throws Exception {
    		Runnable recordAck = () -> {
    			if(index.containsKey(filename)) {
    				index.get(filename).addStoredBy(client.getPort());
    			}
    			//else: throw logging exception
    		};
    		new Thread(recordAck).start();
    	}
    	
    	/**
    	 * Handles a LOAD request on a new thread: sends "LOAD_FROM <port> <size>"
    	 * for the first Dstore recorded for the file, then waits up to
    	 * 10 * timeout milliseconds for a reload request. Each reload request
    	 * moves on to the next recorded Dstore; once every Dstore has been
    	 * offered, "ERROR LOAD" is sent. The thread ends when a wait passes
    	 * without a reload request.
    	 *
    	 * Replies "ERROR DOES_NOT_EXIST" if the file is not indexed or no Dstore
    	 * is recorded for it yet. The client socket is always closed at the end.
    	 *
    	 * @param client connection to reply on
    	 * @param filename name of the file to load
    	 */
    	void load(Socket client, String filename) throws Exception {
    		new Thread(() -> {
    			try {
    				PrintWriter out = new PrintWriter(client.getOutputStream());
    				IndexEntry thisEntry = index.get(filename);
    				if(thisEntry == null) {
    					out.print("ERROR DOES_NOT_EXIST");
    					out.flush();
    					return;
    				}
    				
    				//Work on a private copy so later changes to the entry cannot shift indices under us
    				List<Integer> stores;
    				List<Integer> storedBy = thisEntry.getStoredBy();
    				synchronized(storedBy) {
    					stores = new ArrayList<Integer>(storedBy);
    				}
    				if(stores.isEmpty()) {
    					//Indexed, but no Dstore is recorded as holding it: nothing to load from
    					out.print("ERROR DOES_NOT_EXIST");
    					out.flush();
    					return;
    				}
    				int thisSize = thisEntry.getFilesize();
    				
    				//Register before the first LOAD_FROM goes out so that a reload
    				//request arriving straight away already finds this waiter
    				Reloader reloadLock = new Reloader();
    				List<Reloader> loadList = thisEntry.getLoadList();
    				synchronized(loadList) {
    					loadList.add(reloadLock);
    				}
    				try {
    					//Messages are line-terminated because several may be sent on this stream
    					int attempt = 0;
    					out.println("LOAD_FROM " + stores.get(attempt).intValue() + " " + thisSize);
    					out.flush();
    					while(awaitReload(reloadLock, 10L * timeout)) {
    						attempt++;
    						if(attempt >= stores.size()) {
    							//Every Dstore holding the file has been offered
    							out.println("ERROR LOAD");
    							out.flush();
    							break;
    						}
    						out.println("LOAD_FROM " + stores.get(attempt).intValue() + " " + thisSize);
    						out.flush();
    					}
    				}
    				finally {
    					synchronized(loadList) {
    						loadList.remove(reloadLock);
    					}
    				}
    			}
    			catch(IOException e) {
    				e.printStackTrace();
    			}
    			finally {
    				try {
    					client.close();
    				}
    				catch(IOException e) {
    					e.printStackTrace();
    				}
    			}
    		}).start();
    	}
    	
    	//Waits up to waitMillis for the reload flag to be set; returns whether it was set, and clears it
    	private boolean awaitReload(Reloader reloader, long waitMillis) {
    		long deadline = System.currentTimeMillis() + waitMillis;
    		synchronized(reloader) {
    			long remaining = waitMillis;
    			while(!reloader.reload && remaining > 0) {
    				try {
    					reloader.wait(remaining);
    				}
    				catch(InterruptedException e) {
    					Thread.currentThread().interrupt();
    					return false;
    				}
    				remaining = deadline - System.currentTimeMillis();
    			}
    			boolean requested = reloader.reload;
    			reloader.reload = false;
    			return requested;
    		}
    	}
    	
    	/**
    	 * Handles a RELOAD request on a new thread: sets the reload flag on every
    	 * Reloader registered in the file's load list and wakes the threads
    	 * waiting on it. Every registered load of that file is signalled, not
    	 * only the one belonging to the requesting client. Unknown filenames are
    	 * ignored.
    	 *
    	 * @param filename name of the file whose load should be retried
    	 */
    	void reload(String filename) {
    		new Thread(() -> {
    			IndexEntry entry = index.get(filename);
    			if(entry == null) {
    				return;
    			}
    			
    			//Copy under the list's monitor so load threads adding or removing
    			//themselves cannot disturb the iteration
    			List<Reloader> loadList = entry.getLoadList();
    			List<Reloader> waiting;
    			synchronized(loadList) {
    				waiting = new ArrayList<Reloader>(loadList);
    			}
    			
    			for(Reloader r : waiting) {
    				//The flag is set and the waiter woken under the Reloader's own
    				//monitor; notify without owning it throws IllegalMonitorStateException
    				synchronized(r) {
    					r.reload = true;
    					r.notifyAll();
    				}
    			}
    		}).start();
    	}
    	
    	/**
    	 * Handles a REMOVE request on a new thread: marks the entry "remove in
    	 * progress", sends "REMOVE <filename>" to every Dstore recorded for the
    	 * file, waits up to timeout milliseconds for the entry's Dstore list to
    	 * become empty, then marks the entry "remove complete" and sends
    	 * REMOVE_COMPLETE to the client.
    	 *
    	 * Replies "ERROR DOES_NOT_EXIST" if the file is not indexed. The client
    	 * socket is always closed when the thread finishes.
    	 *
    	 * NOTE(review): nothing in this file calls removeStoredBy, so as it stands
    	 * the wait can only end by timeout unless the list was empty to begin
    	 * with. The entry also stays in the index after completion.
    	 *
    	 * @param client connection to reply on
    	 * @param filename name of the file to remove
    	 */
    	void remove(Socket client, String filename) throws Exception {
    		new Thread(() -> {
    			try {
    				IndexEntry entry = index.get(filename);
    				if(entry == null) {
    					PrintWriter clientOut = new PrintWriter(client.getOutputStream());
    					clientOut.print("ERROR DOES_NOT_EXIST");
    					clientOut.flush();
    					return;
    				}
    				
    				//Update index to "remove in progress"
    				entry.setStatus("remove in progress");
    				
    				//Send REMOVE message to all Dstores storing the file. The list is
    				//copied under its monitor so it cannot change during the loop
    				List<Integer> holders;
    				List<Integer> storedBy = entry.getStoredBy();
    				synchronized(storedBy) {
    					holders = new ArrayList<Integer>(storedBy);
    				}
    				for(Integer dstore : holders) {
    					//try-with-resources closes the Dstore socket even if the write fails
    					try(Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue())) {
    						PrintWriter out = new PrintWriter(socket.getOutputStream());
    						out.write("REMOVE " + filename);
    						out.flush();
    					}
    				}
    				
    				//Wait for REMOVE_ACKs. wait() requires owning the monitor, and is
    				//looped on the real condition so an early wake-up is harmless
    				Object lock = entry.getLock();
    				long deadline = System.currentTimeMillis() + timeout;
    				synchronized(lock) {
    					long remaining = timeout;
    					while(entry.getStoredBy().size() > 0 && remaining > 0) {
    						try {
    							lock.wait(remaining);
    						}
    						catch(InterruptedException e) {
    							Thread.currentThread().interrupt();
    							break;
    						}
    						remaining = deadline - System.currentTimeMillis();
    					}
    				}
    				
    				int outstanding = entry.getStoredBy().size();
    				if(outstanding > 0) {
    					System.out.println("Remove of " + filename + " not acknowledged by " + outstanding + " Dstores");
    				}
    				
    				//Update index to "remove complete"
    				entry.setStatus("remove complete");
    				
    				//Send REMOVE_COMPLETE to client
    				PrintWriter clientOut = new PrintWriter(client.getOutputStream());
    				clientOut.print("REMOVE_COMPLETE");
    				clientOut.flush();
    			}
    			catch(IOException e) {
    				e.printStackTrace();
    			}
    			finally {
    				try {
    					client.close();
    				}
    				catch(IOException e) {
    					e.printStackTrace();
    				}
    			}
    		}).start();
    	}
    	
    	/**
    	 * Handles a LIST request on a new thread: writes every filename in the
    	 * index to the client, one per line, then closes the stream.
    	 *
    	 * @param client connection to reply on
    	 */
    	void list(Socket client) throws Exception {
    		Runnable sendListing = () -> {
    			try {
    				PrintWriter listing = new PrintWriter(client.getOutputStream());
    				//One line per indexed file
    				for(String indexedFile : index.keySet()) {
    					listing.println(indexedFile);
    				}
    				listing.flush();
    				listing.close();
    			}
    			catch(IOException failure) {
    				failure.printStackTrace();
    			}
    		};
    		new Thread(sendListing).start();
    	}
    	
    	/**
    	 * Starts a rebalance on a new thread unless one is already running.
    	 * Sends "LIST" to every joined Dstore, then waits up to timeout
    	 * milliseconds for receiveDstoreList to collect one file list per Dstore
    	 * in rebalanceMessages. The reallocation itself is not implemented yet
    	 * (see the remaining step comments). rebalanceMessages is reset to null
    	 * when the thread finishes.
    	 */
    	void rebalance() throws Exception {
    		new Thread(() -> {
    			Map<Integer,String[]> dstoreFiles = new HashMap<Integer,String[]>();
    			//Check and claim in one step so two threads cannot both start a rebalance
    			synchronized(this) {
    				if(rebalanceMessages != null) return;
    				rebalanceMessages = dstoreFiles;
    			}
    			try {
    				//Copy the Dstore list under its monitor; a JOIN may add to it at any time
    				List<Integer> targets;
    				synchronized(dstores) {
    					targets = new ArrayList<Integer>(dstores);
    				}
    				
    				//Send LIST message to each Dstore and receive their file list
    				for(Integer dstore : targets) {
    					//try-with-resources closes the socket even if the write fails
    					try(Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue())) {
    						PrintWriter out = new PrintWriter(socket.getOutputStream());
    						out.write("LIST");
    						out.flush();
    					}
    				}
    				
    				//wait() requires owning the monitor, and is looped on the real
    				//condition so an early wake-up is harmless
    				List<Integer> responders;
    				long deadline = System.currentTimeMillis() + timeout;
    				synchronized(dstoreFiles) {
    					long remaining = timeout;
    					while(dstoreFiles.size() < targets.size() && remaining > 0) {
    						dstoreFiles.wait(remaining);
    						remaining = deadline - System.currentTimeMillis();
    					}
    					if(dstoreFiles.size() < targets.size()) {
    						System.out.println("Rebalance received file lists from " + dstoreFiles.size() + " of " + targets.size() + " Dstores");
    					}
    					//Late replies may still arrive, so take the key set while holding the monitor
    					responders = new ArrayList<Integer>(dstoreFiles.keySet());
    				}
    				
    				//Create a new file allocation so that:
    				  //Each file appears rFactor times
    				  //Each file appears at most once on each datastore
    				  //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists)
    				for(Integer i : reshuffle(responders)) {
    					
    				}
    				
    				//Make a (files to send, files to remove) pair for each Dstore
    				
    				//Send the respective REBALANCE message to each Dstore
    				
    				//Wait for REBALANCE_COMPLETE from all Dstores
    			}
    			catch(IOException e) {
    				e.printStackTrace();
    			}
    			catch(InterruptedException e) {
    				//Restore the interrupt flag and abandon this rebalance
    				Thread.currentThread().interrupt();
    			}
    			finally {
    				synchronized(this) {
    					rebalanceMessages = null;
    				}
    			}
    		}).start();
    	}
    	
    	/**
    	 * Records one Dstore's file list for the rebalance in progress, on a new
    	 * thread, keyed by the remote port of the connection it arrived on.
    	 * Wakes the rebalance thread once as many lists as joined Dstores have
    	 * been collected. Does nothing when no rebalance is running.
    	 *
    	 * NOTE(review): client.getPort() is the remote port of this connection;
    	 * confirm it is the same number the Dstore announced in its JOIN.
    	 *
    	 * @param client connection the list arrived on
    	 * @param list the tokens of the received message
    	 */
    	void receiveDstoreList(Socket client, String[] list) {
    		new Thread(() -> {
    			//Read the field once: rebalance() sets it back to null when it finishes
    			Map<Integer,String[]> pending = rebalanceMessages;
    			if(pending == null) return;
    			//The map is shared with the rebalance thread, and notifying requires
    			//owning the monitor, so both happen under the map's lock
    			synchronized(pending) {
    				pending.put(Integer.valueOf(client.getPort()), list);
    				if(pending.size() == dstores.size()) {
    					pending.notifyAll();
    				}
    			}
    		}).start();
    	}
    	
    	/**
    	 * Returns a new, modifiable list holding the elements of col in the
    	 * reverse of col's iteration order.
    	 *
    	 * @param col the elements to reorder; not modified
    	 * @return a new list with the elements reversed
    	 */
    	List<Integer> reshuffle(Collection<Integer> col) {
    		List<Integer> forward = new ArrayList<Integer>(col);
    		List<Integer> reversed = new ArrayList<Integer>(forward.size());
    		//Appending while walking backwards avoids shifting every element on
    		//each insert, which inserting at index 0 would do
    		for(int i = forward.size() - 1; i >= 0; i--) {
    			reversed.add(forward.get(i));
    		}
    		return reversed;
    	}
    }