import java.io.*; import java.net.*; import java.util.HashSet; import java.util.Set; public class Dstore { private final int port; private final int cport; private final int timeout; private final String file_folder; private final Set<String> files; public Dstore(int port, int cport, int timeout, String file_folder) { this.port = port; this.cport = cport; this.timeout = timeout; this.file_folder = file_folder; this.files = new HashSet<>(); } public int getPort() { return port; } public int getCport() { return cport; } public int getTimeout() { return timeout; } public String getFile_folder() { return file_folder; } public Set<String> getFiles() { return files; } public void listenClient(Socket socket) { try { ServerSocket dstore = new ServerSocket(port); for(;;) { try { Socket client = dstore.accept(); new Thread(() -> { try { handleDstoreClientReq(client, Dstore.this, socket); } catch (IOException e) { e.printStackTrace(); } }).start(); } catch(Exception e) { e.printStackTrace(); } } } catch (Exception e) { e.printStackTrace(); } } public void listenController(Socket socket) { new Thread(() -> { try { handleDstoreControllerReq(Dstore.this, socket); } catch (IOException e) { e.printStackTrace(); } }).start(); } public void handleDstoreClientReq(Socket client, Dstore dstore, Socket controllerSocket) throws IOException { OutputStream out = client.getOutputStream(); InputStream in = client.getInputStream(); BufferedReader req = new BufferedReader(new InputStreamReader(in)); PrintWriter res = new PrintWriter(new OutputStreamWriter(out)); String line; boolean done = false; while(!done && (line = req.readLine()) != null) { String[] tokens = line.split(" "); String command = tokens[0]; if(dstore.getFiles().size() < Controller.R) { res.println("ERROR_NOT_ENOUGH_DSTORES"); res.flush(); } else { switch (command) { case "STORE": try { res.println("ACK"); res.flush(); String filename = tokens[1]; int filesize = Integer.parseInt(tokens[2]); long limit = System.currentTimeMillis() + dstore.getTimeout(); boolean finished = false; while(!finished && System.currentTimeMillis() < limit) { // read data and store the file byte[] data = new byte[filesize]; client.getInputStream().readNBytes(data, 0, filesize); FileOutputStream o = new FileOutputStream(dstore.getFile_folder() + "/" + filename); o.write(data); o.flush(); o.close(); dstore.getFiles().add(filename); finished = true; } if(!finished) { System.out.println("ERROR: time expired"); return; } PrintWriter r = new PrintWriter(new OutputStreamWriter(controllerSocket.getOutputStream())); r.println("STORE_ACK " + filename + " " + dstore.getPort()); r.flush(); } catch (IndexOutOfBoundsException e) { System.out.println("Arguments don;t match the STORE operation"); } break; case "LOAD_DATA": if(tokens.length != 2) { System.out.println("Arguments don't match in LOAD operation"); } else { String filename = tokens[1]; try { if (dstore.getFiles().contains(filename)) { File file = new File(dstore.getFile_folder() + "/" + filename); FileInputStream i = new FileInputStream(file); int n; while ((n = i.read()) != -1) { client.getOutputStream().write(n); } i.close(); } else { done = true; client.close(); } Server.pos = 0; } catch (Exception ingored) { res.println("RELOAD " + filename); res.flush(); } } break; default: System.out.println("Unknown command"); break; } } } } public void handleDstoreControllerReq(Dstore dstore, Socket controllerSocket) throws IOException { OutputStream out = controllerSocket.getOutputStream(); InputStream in = controllerSocket.getInputStream(); BufferedReader req = new BufferedReader(new InputStreamReader(in)); PrintWriter res = new PrintWriter(new OutputStreamWriter(out)); String line; System.out.println("Entered controller req"); while((line = req.readLine()) != null) { String[] tokens = line.split(" "); String command = tokens[0]; if(dstore.getFiles().size() < Controller.R) { res.println("ERROR_NOT_ENOUGH_DSTORES"); res.flush(); } else { if(command.equals("REMOVE")) { try { String filename = tokens[1]; if(dstore.getFiles().contains(filename)) { System.out.println("REMOVE_ACK"); File file = new File(dstore.getFile_folder() + "/" + filename); file.delete(); dstore.getFiles().remove(filename); res.println("REMOVE_ACK " + filename); } else { res.println("ERROR_FILE_DOES_NOT_EXIST " + filename); } res.flush(); } catch(IndexOutOfBoundsException e) { System.out.println("Arguments don't match in REMOVE opretaion"); } } else { System.out.println("Unknown command"); } } } } public static void main(String[] args) { if(args.length != 4) { System.out.println("Incorrect number of arguments for the dstore"); return; } int port = Integer.parseInt(args[0]); int cport = Integer.parseInt(args[1]); int timeout = Integer.parseInt(args[2]); String file_folder = args[3]; Dstore dstore = new Dstore(port, cport, timeout, file_folder); try { Socket socket = new Socket(); socket.connect(new InetSocketAddress("localhost", cport)); OutputStream out = socket.getOutputStream(); PrintWriter res = new PrintWriter(new OutputStreamWriter(out)); res.println("JOIN " + port); res.flush(); dstore.listenController(socket); dstore.listenClient(socket); } catch (Exception e) { e.printStackTrace(); } } }