diff --git a/src/Controller.java b/src/Controller.java index 820124e34e1e19f1c448a57ad45c7c8d92064cf8..d5cfe8443615f5fe45b9956666316eb1c654c26d 100644 --- a/src/Controller.java +++ b/src/Controller.java @@ -1,8 +1,10 @@ import java.io.*; import java.net.*; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.zip.InflaterInputStream; public class Controller { @@ -12,13 +14,20 @@ public class Controller { // it doesn't serve any client request until at least R Dstores have joined the system static File file = new File("output.txt"); + static int R; + // list of all dstores + static List<Socket> ports = new ArrayList<>(); + // file, list of dstores + static HashMap<String, List<Socket>> index = new HashMap<>(); + // file, status (e.g. in progress) + static HashMap<String, String> status = new HashMap<>(); public static void main (String[] args) { // port to listen on final int cport = Integer.parseInt(args[0]); // replication factor => number of Dstores to join - int R = Integer.parseInt(args[1]); + R = Integer.parseInt(args[1]); // timeout in milliseconds int timeout = Integer.parseInt(args[2]); // how long to wait to start the next rebalance operation @@ -26,53 +35,39 @@ public class Controller { System.out.println("Started"); - // list of all dstores - List<Socket> ports = new ArrayList<>(); - // file, list of dstores - HashMap<String, List<Socket>> index = new HashMap<>(); - // file, status (e.g. in progress) - HashMap<String, String> status = new HashMap<>(); - createLogFile(); try { ServerSocket socket = new ServerSocket(cport); for(;;) { try { - if(R > 0) { + while(R > 0) { + // accept Dstores System.out.println("waiting for Dstore to join"); Socket client = socket.accept(); // establish a connection between client and server - System.out.print(R + ": "); + System.out.println(R); ports.add(client); R = R - 1; - - } else { - - //Socket clientDstore = socket.accept(); - - for(Socket client : ports) { - Thread t = new Thread() { - @Override - public void run() { - try { - handleDstores(client); - } catch (Exception e) { - e.printStackTrace(); - } - } - }; - t.start(); - } - break; } + // create a Thread for each client + Socket client = socket.accept(); + Thread t = new Thread() { + @Override + public void run() { + try { + handleClient(client); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + t.start(); } catch (Exception e1) { System.out.println(e1); } } - - } catch (IOException e) { System.out.println("error" + e); } @@ -125,17 +120,70 @@ public class Controller { if (cmd.equals("STORE")) { + String filename = tokens[1]; + int filesize = Integer.parseInt(tokens[2]); + status.put(filename, "store in progress"); + log("store in progress"); + + String msg = ""; + + for(Socket socket : ports) { + msg = socket.getPort() + msg + " "; + } + + // test and see if this actually gets all the ports + System.out.println("STORE_TO " + msg); + + // send this message to the client so the client knows where to store the file + outputStream.write(("STORE_TO " + msg).getBytes()); + outputStream.flush(); + + final int[] num = {0}; + + for(Socket socket : ports) { + Thread t = new Thread() { + @Override + public void run() { + try { + InputStream inputStream = socket.getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + String line = reader.readLine(); + String[] ts = line.split(" "); + String cmd = ts[0]; + if(cmd == "STORE_ACK") { + num[0]++; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + t.start(); + } + + status.put(filename, "store complete"); + outputStream.write("STORE_COMPLETE".getBytes()); + } else if (cmd.equals("LOAD")) { } else if (cmd.equals("REMOVE")) { } else if (cmd.equals("LIST")) { + String msg = ""; + for(String filename : status.keySet()) { + msg = filename + " " + msg; + } + + System.out.println(msg); + outputStream.write(("LIST " + msg).getBytes()); } else { String msg = "unknown command " + line; log(msg); } } + + clientSocket.close(); } private static void createLogFile() { @@ -154,7 +202,7 @@ public class Controller { try { FileWriter writer = new FileWriter(file, true); writer.write(java.time.LocalDate.now() + ", " + - java.time.LocalTime.now() + ": " + message + '\n'); + java.time.LocalTime.now() + ", CONTROLLER: " + message + '\n'); writer.close(); } catch (IOException e) { e.printStackTrace(); diff --git a/src/Dstore.java b/src/Dstore.java index 945d63af4fb3793f594fa71160c992f953e97caa..0de5ae7537f4cf24a09cb6f24a531993b9c94be0 100644 --- a/src/Dstore.java +++ b/src/Dstore.java @@ -1,6 +1,5 @@ import java.io.*; import java.net.*; -import java.nio.charset.StandardCharsets; public class Dstore { @@ -71,14 +70,17 @@ public class Dstore { } else if (cmd.equals("LOAD_DATA")) { String filename = tokens[1]; - FileInputStream file = new FileInputStream(file_folder + "/" + filename); - int n; - while((n = file.read())!= -1) { - outputStream.write(n); + File file = new File(file_folder + "/" + filename); + if(file.exists()) { + FileInputStream in = new FileInputStream(file_folder + "/" + filename); + int n; + while((n = in.read())!= -1) { + outputStream.write(n); + } + in.close(); + } else { + clientSocket.close(); } - file.close(); - - } else if (cmd.equals("LIST")) { } else if (cmd.equals("QUIT")) { clientSocket.close(); @@ -96,7 +98,7 @@ public class Dstore { } } - private static void handleController(Socket controllerSocket) throws IOException { + private static void handleController(Socket controllerSocket, String file_folder) throws IOException { OutputStream outputStream = controllerSocket.getOutputStream(); InputStream inputStream = controllerSocket.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); @@ -108,6 +110,15 @@ public class Dstore { if (cmd.equals("REMOVE")) { + String filename = tokens[1]; + File file = new File(file_folder + "/" + filename); + if(file.exists()) { + file.delete(); + outputStream.write(("REMOVE_ACK " + filename).getBytes()); + } else { + outputStream.write(("ERROR_FILE_DOES_NOT_EXIST " + filename).getBytes()); + } + } else if (cmd.equals("QUIT")) { controllerSocket.close(); break; @@ -139,7 +150,7 @@ public class Dstore { try { FileWriter writer = new FileWriter(file, true); writer.write(java.time.LocalDate.now() + ", " + - java.time.LocalTime.now() + ": " + message + '\n'); + java.time.LocalTime.now() + ", DSTORE: " + message + '\n'); writer.close(); } catch (IOException e) { e.printStackTrace();