Skip to content
Snippets Groups Projects
Select Git revision
  • a248f149e99a62379834df3343c8f87b4b50b3e5
  • main default
  • feat_deconstruct_irq_gen
  • laudantium-unde-et-iste-et
  • ea-dolor-quia-et-sint
  • ipsum-consequatur-et-in-et
  • sapiente-et-possimus-neque-est
  • qui-in-quod-nam-voluptatem
  • aut-deleniti-est-voluptatum-repellat
  • modi-et-quam-sunt-consequatur
  • et-laudantium-voluptas-quos-pariatur
  • voluptatem-quia-fugit-ut-perferendis
  • at-adipisci-ducimus-qui-nihil
  • dolorem-ratione-sed-illum-minima
  • inventore-temporibus-ipsum-neque-rerum
  • autem-at-dolore-molestiae-et
  • doloribus-dolorem-quos-adipisci-et
  • sed-sit-tempore-expedita-possimus
  • et-recusandae-deleniti-voluptas-consectetur
  • atque-corrupti-laboriosam-nobis-explicabo
  • nostrum-ut-vel-voluptates-et
21 results

sourceme

Blame
  • Dstore.java 6.69 KiB
    import java.io.*;
    import java.lang.Runnable;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.net.*;
    import java.util.Map;
    import java.util.HashMap;
    
    public class Dstore {
    	protected int port; //Port to listen on
    	protected int cport; //Controller's port to talk to
    	protected int timeout; //in milliseconds
    	protected String fileFolder; //Where to store the data locally
    	
    	public Dstore(int port, int cport, int timeout, String fileFolder) {
    		this.port = port;
    		this.cport = cport;
    		this.timeout = timeout;
    		this.fileFolder = fileFolder;
    	}
    	
    	public static void main(String[] args) {
    		try {
    			int port = Integer.parseInt(args[0]);
    			int cport = Integer.parseInt(args[1]);
    			int timeout = Integer.parseInt(args[2]);
    			String fileFolder = args[3];
    			
    			Dstore dstore = new Dstore(port, cport, timeout, fileFolder);
    			dstore.start();
    		}
    		catch(IndexOutOfBoundsException e) {
    			System.out.println("Command line arguments have not been provided");
    			return;
    		}
    		catch(NumberFormatException e) {
    			System.out.println("Command line arguments must be integers");
    			return;
    		}
    	}
    	
    	public void start() {
    		try {
    			Socket socket = new Socket(InetAddress.getLocalHost(), cport);
    			BufferedWriter out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
    			out.write("JOIN " + port);
    			out.close();
    			socket.close();
    			
    			ServerSocket server = new ServerSocket(port);
    			
    			while(true) {
    				try {
    					Socket client = server.accept();
    					BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
    					String[] message = in.readLine().split(" ");
    					handleMessage(message, client, in);
    					in.close();
    				}
    				catch(Exception e) {
    					//Log error
    					e.printStackTrace();
    				}
    			}
    		}
    		catch(Exception e) {
    			e.printStackTrace();
    		}
    	}
    	
    	void handleMessage(String[] message, Socket client, BufferedReader clientIn) throws Exception {
    		if(message[0].equals("STORE")) {
    			store(client, message[1], Integer.parseInt(message[2]), clientIn);
    		}
    		else if(message[0].equals("LOAD_DATA")) {
    			load(client, message[1]);
    		}
    		else if(message[0].equals("REMOVE")) {
    			remove(client, message[1]);
    		}
    		else if(message[0].equals("LIST")) {
    			list(client);
    		}
    		else if(message[0].equals("REBALANCE")) {
    			rebalance(client, message);
    		}
    		else {
    			//Log error and continue (throw exception?)
    		}
    	}
    	
    	void store(Socket client, String filename, int filesize, BufferedReader in) throws Exception {
    		new Thread(() -> {
    			try {
    				//Send ACK message to client
    				PrintWriter out = new PrintWriter(client.getOutputStream());
    				out.print("ACK");
    				out.flush();
    				out.close();
    				
    				FileOutputStream writer = new FileOutputStream(fileFolder + "/" + filename, false);
    				
    				//Receive + write file content from client
    				int byteCount = filesize;
    				while(byteCount > 0) {
    					byte[] nextLine = in.readLine().getBytes();
    					writer.write(nextLine);
    					writer.flush();
    					byteCount -= nextLine.length;
    				}
    				writer.close();
    				
    				//Send STORE_ACK message to the Controller
    				PrintWriter controllerOut = new PrintWriter(new Socket(InetAddress.getLocalHost(), cport).getOutputStream());
    				controllerOut.print("STORE_ACK " + filename);
    				controllerOut.flush();
    				controllerOut.close();
    			}
    			catch(IOException e) {
    				e.printStackTrace();
    			}
    		}).start();
    	}
    	
    	void load(Socket client, String filename) throws Exception {
    		new Thread(() -> {
    			try {
    				//Send the content of the file in fileFolder to the client
    				PrintWriter out = new PrintWriter(client.getOutputStream());
    				FileInputStream reader;
    				try {
    					reader = new FileInputStream(fileFolder + "/" + filename);
    				}
    				catch(FileNotFoundException e) {
    					out.print("ERROR DOES_NOT_EXIST");
    					out.flush();
    					out.close();
    					return;
    				}
    				
    				byte[] buf = new byte[8];
    				while(reader.read(buf) != -1) {
    					out.print(new String(buf));
    					out.flush();
    				}
    				
    				reader.close();
    				out.close();
    			}
    			catch(IOException e) {
    				e.printStackTrace();
    			}
    		}).start();
    	}
    	
    	void remove(Socket client, String filename) throws Exception {
    		new Thread(() -> {
    			try {
    				//Remove the file from fileFolder
    				Path path = new File(fileFolder + "/" + filename).toPath();
    				PrintWriter out = new PrintWriter(client.getOutputStream());
    				
    				if(Files.deleteIfExists(path)) {
    					//Send REMOVE_ACK message to client (the controller)
    					out.print("REMOVE_ACK");
    				}
    				else {
    					//Send DOES NOT EXIST error
    					out.print("ERROR DOES_NOT_EXIST " + filename);
    				}
    				
    				out.flush();
    				out.close();
    			}
    			catch(IOException e) {
    				e.printStackTrace();
    			}
    		}).start();
    	}
    	
    	void list(Socket client) throws Exception {
    		new Thread(() -> {
    			try {
    				//Send a list of all files in fileFolder to client (the controller)
    				PrintWriter out = new PrintWriter(client.getOutputStream());
    				for(File file : new File(fileFolder).listFiles()) {
    					out.print(file.getName());
    					out.flush();
    				}
    				out.close();
    			}
    			catch(IOException e) {
    				e.printStackTrace();
    			}
    		}).start();
    	}
    	
    	void rebalance(Socket client, String[] message) throws Exception {
    		new Thread(() -> {
    			try {
    				//Interpret files to send and files to remove from the message
    				Map<String,Integer[]> filesToSend;
    				String[] filesToRemove;
    				int index;
    				
    				int numberToSend = Integer.parseInt(message[1]);
    				index = 2;
    				filesToSend = new HashMap<String,Integer[]>(numberToSend);
    				for(int i=0; i<numberToSend; i++) {
    					String name = message[index];
    					index++;
    					
    					int numberOfReceivers = Integer.parseInt(message[index]);
    					index++;
    					Integer[] receivers = new Integer[numberOfReceivers];
    					for(int j=0; j<numberOfReceivers; j++) {
    						receivers[j] = Integer.parseInt(message[index]);
    						index++;
    					}
    					
    					filesToSend.put(name, receivers);
    				}
    				
    				int numberToRemove = Integer.parseInt(message[index]);
    				index++;
    				filesToRemove = new String[numberToRemove];
    				for(int k=0; k<numberToRemove; k++) {
    					filesToRemove[k] = message[index];
    					index++;
    				}
    				
    				//Send each file to send to the Dstore at the specified port number
    				for(String filename : filesToSend.keySet()) {
    					for(Integer dstore : filesToSend.get(filename)) {
    						//Same store functions as used in the client object
    					}
    				}
    				
    				//Remove each file to remove from fileFolder
    				for(String filename : filesToRemove) {
    					new File(fileFolder + "/" + filename).delete();
    				}
    				
    				//Send REBALANCE_COMPLETE message to client (the controller)
    				PrintWriter out = new PrintWriter(client.getOutputStream());
    				out.print("REBALANCE COMPLETE");
    				out.flush();
    				out.close();
    			}
    			catch(IOException e) {
    				e.printStackTrace();
    			}
    		}).start();
    	}
    }