Skip to content
Snippets Groups Projects
Select Git revision
  • e3df9a8e67136fee1568d0093c79954853d6be9c
  • master default protected
2 results

Dstore.java

Blame
  • Dstore.java 11.74 KiB
    import java.io.*;
    import java.lang.Runnable;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.net.*;
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Map;
    import java.util.HashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    public class Dstore {
    	protected int port; //Port to listen on
    	protected int cport; //Controller's port to talk to
    	protected int timeout; //in milliseconds
    	protected File fileFolder; //Where to store the data locally
    	protected Map<String,Long> fileSizes;
    	
    	protected Socket controllerSocket;
    	protected BufferedReader controllerIn;
    	protected PrintWriter controllerOut;
    	
    	protected final int BUFFER_SIZE = 256;
    	
    	public Dstore(int port, int cport, int timeout, String fileFolderName) throws Exception {
    		this.port = port;
    		this.cport = cport;
    		this.timeout = timeout;
    		
    		DstoreLogger.init(Logger.LoggingType.ON_FILE_AND_TERMINAL, port);
    		
    		fileFolder = new File(fileFolderName);
    		if(fileFolder.exists() && !fileFolder.isDirectory()) {
    			throw new Exception("Folder name provided exists as a file and not a directory");
    		}
    		else if(!fileFolder.exists()) {
    			System.out.println("New folder will be created");
    			if(!fileFolder.mkdir()) throw new Exception("Folder could not be created");
    		}
    		
    		fileSizes = new HashMap<String,Long>();
    		
    		for(File file : fileFolder.listFiles()) {
    			if(!file.delete()) throw new Exception("Directory specified has undeletable files; please try a different directory");
    		}
    	}
    	
    	public static void main(String[] args) {
    		try {
    			int port = Integer.parseInt(args[0]);
    			int cport = Integer.parseInt(args[1]);
    			int timeout = Integer.parseInt(args[2]);
    			String fileFolder = args[3];
    			
    			if(port < 0 || cport < 0 || timeout < 0) {
    				throw new Exception("Infeasible values provided as arguments");
    			}
    			
    			Dstore dstore = new Dstore(port, cport, timeout, fileFolder);
    			dstore.start();
    		}
    		catch(IndexOutOfBoundsException e) {
    			System.out.println("Command line arguments have not been provided");
    			return;
    		}
    		catch(NumberFormatException e) {
    			System.out.println("Command line arguments must be integers");
    			return;
    		}
    		catch(Exception e) {
    			e.printStackTrace();
    			return;
    		}
    	}
    	
    	public void start() {
    		try(Socket controllerSocket = new Socket(InetAddress.getLocalHost(), cport)) {
    			this.controllerSocket = controllerSocket;
    			controllerIn = new BufferedReader(new InputStreamReader(controllerSocket.getInputStream()));
    			controllerOut = new PrintWriter(controllerSocket.getOutputStream(), true);
    			String joinMessage = Protocol.JOIN_TOKEN + " " + port;
    			controllerOut.println(joinMessage);
    			controllerOut.flush();
    			messageSent(controllerSocket, joinMessage);
    			
    			new Thread(() -> {
    				while(true) {
    					try {
    						String message = controllerIn.readLine();
    						if(message != null) {
    							messageReceived(controllerSocket, message);
    							handleMessage(message.split(" "), controllerSocket);
    						}
    					}
    					catch(Exception e) {
    						e.printStackTrace();
    					}
    				}
    			}).start();
    			
    			ServerSocket server = new ServerSocket(port);
    			
    			while(true) {
    				try {
    					Socket client = server.accept();
    					BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
    					String message = in.readLine();
    					messageReceived(client, message);
    					handleMessage(message.split(" "), client);
    				}
    				catch(Exception e) {
    					//Log error
    					e.printStackTrace();
    				}
    			}
    		}
    		catch(Exception e) {
    			e.printStackTrace();
    		}
    	}
    	
    	void handleMessage(String[] message, Socket client) throws Exception {
    		if(message[0].equals(Protocol.STORE_TOKEN)) {
    			store(client, message[1], Long.parseLong(message[2]), true);
    		}
    		else if(message[0].equals(Protocol.REBALANCE_STORE_TOKEN)) {
    			store(client, message[1], Long.parseLong(message[2]), false);
    		}
    		else if(message[0].equals(Protocol.LOAD_DATA_TOKEN)) {
    			load(client, message[1]);
    		}
    		else if(message[0].equals(Protocol.REMOVE_TOKEN)) {
    			remove(message[1]);
    		}
    		else if(message[0].equals(Protocol.LIST_TOKEN)) {
    			list();
    		}
    		else if(message[0].equals(Protocol.REBALANCE_TOKEN)) {
    			rebalance(message);
    		}
    		else {
    			//Log error and continue (throw exception?)
    			System.out.println("Dstore " + port + " has received a malformed message");
    		}
    	}
    	
    	void store(Socket client, String filename, long filesize, boolean acknowledged) throws Exception {
    		new Thread(() -> {
    			try {
    				//Send ACK message to client
    				PrintWriter out = new PrintWriter(client.getOutputStream(), true);
    				out.println(Protocol.ACK_TOKEN);
    				messageSent(client, Protocol.ACK_TOKEN);
    				
    				OutputStream writer = new FileOutputStream(new File(fileFolder, filename), false);
    				InputStream reader = client.getInputStream();
    				
    				//Receive + write file content from client
    				byte[] nextLine = new byte[BUFFER_SIZE];
    				int len;
    				do {
    					len = reader.readNBytes(nextLine, 0, BUFFER_SIZE);
    					writer.write(nextLine, 0, len);
    					writer.flush();
    				}
    				while(len == BUFFER_SIZE);
    				writer.close();
    				
    				//Send STORE_ACK message to the Controller
    				if(acknowledged) {
    					synchronized(controllerOut) {
    						String controllerMessage = Protocol.STORE_ACK_TOKEN + " " + filename;
    						controllerOut.println(controllerMessage);
    						messageSent(controllerSocket, controllerMessage);
    					}
    				}
    				
    				synchronized(fileSizes) {
    					if(fileSizes.containsKey(filename)) fileSizes.remove(filename);
    					fileSizes.put(filename, Long.valueOf(filesize));
    				}
    			}
    			catch(IOException e) {
    				e.printStackTrace();
    			}
    			finally {
    				try {client.close();} catch(IOException e) {e.printStackTrace();}
    			}
    		}).start();
    	}
    	
    	void load(Socket client, String filename) throws Exception {
    		new Thread(() -> {
    			try {
    				//Send the content of the file in fileFolder to the client
    				FileInputStream reader;
    				try {
    					reader = new FileInputStream(new File(fileFolder, filename));
    				}
    				catch(FileNotFoundException e) {
    					client.close();
    					return;
    				}
    				
    				OutputStream contentOut = client.getOutputStream();
    				byte[] buf = new byte[BUFFER_SIZE];
    				int len;
    				do {
    					len = reader.read(buf);
    					if(len >= 0) {
    						contentOut.write(buf, 0, len);
    						contentOut.flush();
    					}
    				}
    				while(len == BUFFER_SIZE);
    				
    				reader.close();
    				contentOut.close();
    			}
    			catch(IOException e) {
    				e.printStackTrace();
    			}
    			finally {
    				try {if(!client.isClosed()) client.close();} catch(IOException e) {e.printStackTrace();}
    			}
    		}).start();
    	}
    	
    	void remove(String filename) throws Exception {
    		new Thread(() -> {
    			try {
    				System.out.println("Store " + port + " removing " + filename + "...");
    				//Remove the file from fileFolder
    				Path path = new File(fileFolder, filename).toPath();
    				
    				String controllerMessage;
    				if(Files.deleteIfExists(path)) {
    					System.out.println("Store " + port + " removed " + filename);
    					//Send REMOVE_ACK message to client (the controller)
    					synchronized(controllerOut) {
    						controllerMessage = Protocol.REMOVE_ACK_TOKEN + " " + filename;
    					}
    				}
    				else {
    					System.out.println("Store " + port + " couldn't remove " + filename);
    					//Send DOES NOT EXIST error
    					synchronized(controllerOut) {
    						controllerMessage = Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN + " " + filename;
    					}
    				}
    				controllerOut.println(controllerMessage);
    				messageSent(controllerSocket, controllerMessage);
    			}
    			catch(IOException e) {
    				e.printStackTrace();
    			}
    		}).start();
    	}
    	
    	void list() throws Exception {
    		new Thread(() -> {
    			//Send a list of all files in fileFolder to client (the controller)
    			String message = Protocol.LIST_TOKEN;
    			for(File file : fileFolder.listFiles()) {
    				message = message + " " + file.getName();
    			}
    			synchronized(controllerOut) {
    				controllerOut.println(message);
    			}
    		}).start();
    	}
    	
    	void rebalance(String[] message) throws Exception {
    		System.out.println("Rebalance message received");
    		new Thread(() -> {
    			//Interpret files to send and files to remove from the message
    			Map<Integer,List<String>> filesToSend;
    			String[] filesToRemove;
    			int index;
    			
    			String tmessage = "";
    			for(String s : message) {
    				tmessage = tmessage + " " + s;
    			}
    			System.out.println("Interpreting message:" + tmessage);
    			int numberToSend = Integer.parseInt(message[1]);
    			int totalReceivers = 0;
    			index = 2;
    			filesToSend = new HashMap<Integer,List<String>>();
    			for(int i=0; i<numberToSend; i++) {
    				String name = message[index];
    				index++;
    				
    				int numberOfReceivers = Integer.parseInt(message[index]);
    				totalReceivers += numberOfReceivers;
    				index++;
    				for(int j=0; j<numberOfReceivers; j++) {
    					Integer receiver = Integer.parseInt(message[index]);
    					if(!filesToSend.containsKey(receiver)) {
    						filesToSend.put(receiver,new ArrayList<String>());
    					}
    					filesToSend.get(receiver).add(name);
    					index++;
    				}
    			}
    			
    			int numberToRemove = Integer.parseInt(message[index]);
    			index++;
    			filesToRemove = new String[numberToRemove];
    			for(int k=0; k<numberToRemove; k++) {
    				filesToRemove[k] = message[index];
    				index++;
    			}
    			System.out.println("Interpreting complete, will send " + numberToSend + " and remove " + numberToRemove);
    			
    			//Send each file to send to the Dstore at the specified port number
    			CountDownLatch latch = new CountDownLatch(totalReceivers);
    			for(Integer dstore : filesToSend.keySet()) {
    				for(String filename : filesToSend.get(dstore)) {
    					new Thread(() -> {
    						try {
    							System.out.println("Sending " + filename + " to store " + dstore);
    							Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue());
    							PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
    							long fileSize;
    							synchronized(fileSizes) {fileSize = fileSizes.get(filename);}
    							String dstoreMessage = Protocol.REBALANCE_STORE_TOKEN + " " + filename + " " + fileSize;
    							out.println(dstoreMessage);
    							messageSent(socket, dstoreMessage);
    							
    							BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    							String receivedMessage = in.readLine();
    							messageReceived(socket, receivedMessage);
    							if(!receivedMessage.equals(Protocol.ACK_TOKEN)) {
    								//Log error
    								System.err.println("Dstore " + dstore + " should have sent ACK but " + port + " received " + receivedMessage);
    							}
    							
    							byte[] content = new byte[BUFFER_SIZE];
    							int len;
    							FileInputStream fileIn = new FileInputStream(new File(fileFolder, filename));
    							OutputStream fileOut = socket.getOutputStream();
    							do {
    								len = fileIn.read(content);
    								if(len >= 0) {
    									fileOut.write(content, 0, len);
    									fileOut.flush();
    								}
    							}
    							while(len > 0);
    							fileIn.close();
    							fileOut.close();
    							in.close();
    							out.close();
    							socket.close();
    						}
    						catch(IOException e) {
    							e.printStackTrace();
    						}
    						finally {
    							try {latch.countDown();} catch(Exception e) {}
    						}
    					}).start();
    				}
    			}
    			try {latch.await(timeout, TimeUnit.MILLISECONDS);} catch(Exception e) {e.printStackTrace();}
    			
    			//Remove each file to remove from fileFolder
    			for(String filename : filesToRemove) {
    				System.out.println("Removing file " + filename);
    				new File(fileFolder, filename).delete();
    			}
    			
    			//Send REBALANCE_COMPLETE message to client (the controller)
    			synchronized(controllerOut) {
    				controllerOut.println(Protocol.REBALANCE_COMPLETE_TOKEN);
    				messageSent(controllerSocket, Protocol.REBALANCE_COMPLETE_TOKEN);
    			}
    		}).start();
    	}
    	
    	protected void messageSent(Socket socket, String message) {
    		DstoreLogger.getInstance().messageSent(socket, message);
    	}
    	
    	protected void messageReceived(Socket socket, String message) {
    		DstoreLogger.getInstance().messageReceived(socket, message);
    	}
    }