diff --git a/src/Controller.java b/src/Controller.java index 08ef4329c103d8e0ddef9f5d2cba007a8283aab3..820124e34e1e19f1c448a57ad45c7c8d92064cf8 100644 --- a/src/Controller.java +++ b/src/Controller.java @@ -1,6 +1,7 @@ import java.io.*; import java.net.*; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; public class Controller { @@ -10,7 +11,7 @@ public class Controller { // waits for Dstores to join the datastore (rebalance operation) // it doesn't serve any client request until at least R Dstores have joined the system - File file = new File("output.txt"); + static File file = new File("output.txt"); public static void main (String[] args) { @@ -24,7 +25,15 @@ public class Controller { int rebalance_period = Integer.parseInt(args[3]); System.out.println("Started"); + + // list of all dstores List<Socket> ports = new ArrayList<>(); + // file, list of dstores + HashMap<String, List<Socket>> index = new HashMap<>(); + // file, status (e.g. in progress) + HashMap<String, String> status = new HashMap<>(); + + createLogFile(); try { ServerSocket socket = new ServerSocket(cport); @@ -37,31 +46,32 @@ public class Controller { ports.add(client); R = R - 1; - Thread t = new Thread() { - @Override - public void run() { - try { - handleDstores(client); - } catch (Exception e) { - e.printStackTrace(); - } - } - }; - t.start(); } else { + + //Socket clientDstore = socket.accept(); + + for(Socket client : ports) { + Thread t = new Thread() { + @Override + public void run() { + try { + handleDstores(client); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + t.start(); + } break; } - } catch (Exception e1) { System.out.println(e1); } } - /* - #TODO - - use threads to connect multiple clients (Dstore and Client) to a server - */ + } catch (IOException e) { System.out.println("error" + e); @@ -81,6 +91,7 @@ public class Controller { if(cmd.equals("ACK")) { } else if (cmd.equals("STORE_ACK")) { + System.out.println("STORE_ACK"); } else if (cmd.equals("REMOVE_ACK")) { @@ -88,12 +99,13 @@ public class Controller { clientSocket.close(); break; } else { - String msg = "unknown command " + line + "\n"; - outputStream.write(msg.getBytes()); + String msg = "unknown command " + line; + // outputStream.write(msg.getBytes()); + log(msg); } - String msg = "You typed " + line + "\n"; - outputStream.write(msg.getBytes()); + String msg = "You typed " + line; + log(msg); } if(!clientSocket.isClosed()) { @@ -101,7 +113,32 @@ public class Controller { } } - private void createLogFile() { + private static void handleClient(Socket clientSocket) throws IOException { + OutputStream outputStream = clientSocket.getOutputStream(); + InputStream inputStream = clientSocket.getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + String line; + + while((line = reader.readLine()) != null) { + String[] tokens = line.split(" "); + String cmd = tokens[0]; + + if (cmd.equals("STORE")) { + + } else if (cmd.equals("LOAD")) { + + } else if (cmd.equals("REMOVE")) { + + } else if (cmd.equals("LIST")) { + + } else { + String msg = "unknown command " + line; + log(msg); + } + } + } + + private static void createLogFile() { try { if(file.createNewFile()) { System.out.println("File created"); @@ -113,10 +150,11 @@ public class Controller { } } - private void log(String message) { + private static void log(String message) { try { - FileWriter writer = new FileWriter(file); - writer.write(message + " at " + System.currentTimeMillis() + "\n"); + FileWriter writer = new FileWriter(file, true); + writer.write(java.time.LocalDate.now() + ", " + + java.time.LocalTime.now() + ": " + message + '\n'); writer.close(); } catch (IOException e) { e.printStackTrace(); diff --git a/src/Dstore.java b/src/Dstore.java index 9dfb9ed5c31b20fe69a632a08112ec4ede8c0e01..945d63af4fb3793f594fa71160c992f953e97caa 100644 --- a/src/Dstore.java +++ b/src/Dstore.java @@ -1,8 +1,11 @@ import java.io.*; import java.net.*; +import java.nio.charset.StandardCharsets; public class Dstore { + static File file = new File("output.txt"); + public static void main (String[] args) { // port to listen on @@ -15,22 +18,131 @@ public class Dstore { String file_folder = args[3]; InetAddress ip; + createLogFile(); try { + ip = InetAddress.getLocalHost(); Socket socket = new Socket(); - socket.connect(new InetSocketAddress(ip, cport), timeout); + socket.connect(new InetSocketAddress(ip, cport)); - for(;;) { - PrintWriter out = new PrintWriter(socket.getOutputStream()); - out.println("adding Dstore"); - out.flush(); - System.out.println("Dstore was added"); - Thread.sleep(1000); - } + ServerSocket s = new ServerSocket(port); + Socket client = s.accept(); + + handleClient(client, socket, file_folder); } catch (Exception e) { System.out.println("error " + e); } } + + private static void handleClient(Socket clientSocket, Socket controllerSocket, String file_folder) throws IOException, InterruptedException { + OutputStream outputStream = clientSocket.getOutputStream(); + InputStream inputStream = clientSocket.getInputStream(); + + OutputStream cOutputStream = controllerSocket.getOutputStream(); + InputStream cInputStream = controllerSocket.getInputStream(); + + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + String line; + + while((line = reader.readLine()) != null) { + String[] tokens = line.split(" "); + String cmd = tokens[0]; + + if (cmd.equals("STORE")) { + + // ACK the command from the client + outputStream.write("ACK\n".getBytes()); + + // read the filename, filesize and create the data bytes to be read + String filename = tokens[1]; + int filesize = Integer.parseInt(tokens[2]); + + // read data and store the file + byte[] data = new byte[filesize]; + inputStream.readNBytes(data, 0, filesize); + FileOutputStream out = new FileOutputStream(file_folder + "/" + filename); + out.write(data); + out.flush(); + out.close(); + + cOutputStream.write(("STORE_ACK " + filename).getBytes()); + + } else if (cmd.equals("LOAD_DATA")) { + String filename = tokens[1]; + FileInputStream file = new FileInputStream(file_folder + "/" + filename); + int n; + while((n = file.read())!= -1) { + outputStream.write(n); + } + file.close(); + + } else if (cmd.equals("LIST")) { + + } else if (cmd.equals("QUIT")) { + clientSocket.close(); + break; + } else { + String msg = "unknown command " + line; + // outputStream.write(msg.getBytes()); + log(msg); + } + } + + if(!clientSocket.isClosed() || !controllerSocket.isClosed()) { + clientSocket.close(); + controllerSocket.close(); + } + } + + private static void handleController(Socket controllerSocket) throws IOException { + OutputStream outputStream = controllerSocket.getOutputStream(); + InputStream inputStream = controllerSocket.getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + String line; + + while((line = reader.readLine()) != null) { + String[] tokens = line.split(" "); + String cmd = tokens[0]; + + if (cmd.equals("REMOVE")) { + + } else if (cmd.equals("QUIT")) { + controllerSocket.close(); + break; + } else { + String msg = "unknown command " + line; + // outputStream.write(msg.getBytes()); + log(msg); + } + } + + if(!controllerSocket.isClosed()) { + controllerSocket.close(); + } + } + + private static void createLogFile() { + try { + if(file.createNewFile()) { + System.out.println("File created"); + } else { + System.out.println("File already exists"); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + private static void log(String message) { + try { + FileWriter writer = new FileWriter(file, true); + writer.write(java.time.LocalDate.now() + ", " + + java.time.LocalTime.now() + ": " + message + '\n'); + writer.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } } \ No newline at end of file