From 03c30ec763173e4b17a5585577fc2aa87ec85f7e Mon Sep 17 00:00:00 2001 From: Theodora-Mara Pislar <tmp1u19@soton.ac.uk> Date: Sat, 8 May 2021 18:30:34 +0100 Subject: [PATCH] Store, list, remove and load are now functional --- src/ClientMain.java | 2 +- src/Controller.java | 35 +++---- src/Dstore.java | 239 +++++++++++++++++++++++++++++++------------- src/Server.java | 120 ++++++++++++++-------- 4 files changed, 265 insertions(+), 131 deletions(-) diff --git a/src/ClientMain.java b/src/ClientMain.java index a5b72c5..6079050 100644 --- a/src/ClientMain.java +++ b/src/ClientMain.java @@ -9,7 +9,7 @@ public class ClientMain { final int cport = Integer.parseInt(args[0]); int timeout = Integer.parseInt(args[1]); - File downloadFolder = new File("to_store"); + File downloadFolder = new File("downloads"); if (!downloadFolder.exists()) if (!downloadFolder.mkdir()) throw new RuntimeException("Cannot create download folder (folder absolute path: " + downloadFolder.getAbsolutePath() + ")"); diff --git a/src/Controller.java b/src/Controller.java index 1ec54da..1ef5b44 100644 --- a/src/Controller.java +++ b/src/Controller.java @@ -1,21 +1,16 @@ -import java.io.*; -import java.net.*; -import java.util.*; +import java.net.ServerSocket; +import java.net.Socket; public class Controller { - static Map<Integer, Dstore> dstores = Collections.synchronizedMap(new HashMap<>()); - static Map<String, List<Dstore>> store = Collections.synchronizedMap(new HashMap<>()); - static Map<String, String> index = Collections.synchronizedMap(new HashMap<>()); - - private int cport; + private final int cport; static int R; - private int timeout; - private int rebalance_period; + private final int timeout; + private final int rebalance_period; public Controller(int cport, int R, int timeout, int rebalance_period) { this.cport = cport; - this.R = R; + Controller.R = R; this.timeout = timeout; this.rebalance_period = rebalance_period; } @@ -23,23 +18,21 @@ public class Controller { public void start() { try { ServerSocket controller = new ServerSocket(cport); - Server server = new Server(this); + FileSystem fileSystem = new FileSystem(); + Server server = new Server(this, fileSystem); System.out.println("Start listening for connections"); for(;;) { try { Socket client = controller.accept(); - new Thread() { - @Override - public void run() { - try { - server.handleClient(client); - } catch (Exception e) { - e.printStackTrace(); - } + new Thread(() -> { + try { + server.handleClient(client); + } catch (Exception e) { + e.printStackTrace(); } - }.start(); + }).start(); } catch (Exception e) { e.printStackTrace(); } diff --git a/src/Dstore.java b/src/Dstore.java index 65c0c28..9998daf 100644 --- a/src/Dstore.java +++ b/src/Dstore.java @@ -5,40 +5,54 @@ import java.util.Set; public class Dstore { - private int port; - private int cport; - private int timeout; - private String file_folder; - private Socket controllerSocket; - private Set<String> files; - - public Dstore(int port, int cport) throws IOException { + private final int port; + private final int cport; + private final int timeout; + private final String file_folder; + private final Set<String> files; + + public Dstore(int port, int cport, int timeout, String file_folder) { this.port = port; this.cport = cport; - InetAddress ip = InetAddress.getLocalHost(); - controllerSocket = new Socket(); - controllerSocket.connect(new InetSocketAddress(ip, cport)); - files = new HashSet<>(); + this.timeout = timeout; + this.file_folder = file_folder; + this.files = new HashSet<>(); + } + + public int getPort() { + return port; + } + + public int getCport() { + return cport; } - public void listenClient() { + public int getTimeout() { + return timeout; + } + + public String getFile_folder() { + return file_folder; + } + + public Set<String> getFiles() { + return files; + } + + public void listenClient(Socket socket) { try { ServerSocket dstore = new ServerSocket(port); for(;;) { try { Socket client = dstore.accept(); - new Thread() { - @Override - public void run() { - try { - controllerSocket.connect(new InetSocketAddress("localhost", cport)); - new Handler().handleDstoreClientReq(client, Dstore.this); - } catch (IOException e) { - e.printStackTrace(); - } + new Thread(() -> { + try { + handleDstoreClientReq(client, Dstore.this, socket); + } catch (IOException e) { + e.printStackTrace(); } - }.start(); + }).start(); } catch(Exception e) { e.printStackTrace(); } @@ -50,57 +64,139 @@ public class Dstore { } } - public void listenController() { - try { - new Thread() { - @Override - public void run() { - try { - controllerSocket.connect(new InetSocketAddress("localhost", cport)); - new Handler().handleDstoreControllerReq(Dstore.this); - } catch (IOException e) { - e.printStackTrace(); - } + public void listenController(Socket socket) { + new Thread(() -> { + try { + handleDstoreControllerReq(Dstore.this, socket); + } catch (IOException e) { + e.printStackTrace(); + } + }).start(); + } + + public void handleDstoreClientReq(Socket client, Dstore dstore, Socket controllerSocket) throws IOException { + + OutputStream out = client.getOutputStream(); + InputStream in = client.getInputStream(); + + BufferedReader req = new BufferedReader(new InputStreamReader(in)); + PrintWriter res = new PrintWriter(new OutputStreamWriter(out)); + String line; + boolean done = false; + + while(!done && (line = req.readLine()) != null) { + String[] tokens = line.split(" "); + String command = tokens[0]; + + if(dstore.getFiles().size() < Controller.R) { + res.println("ERROR_NOT_ENOUGH_DSTORES"); + res.flush(); + } else { + + switch (command) { + case "STORE": + try { + + res.println("ACK"); + res.flush(); + String filename = tokens[1]; + int filesize = Integer.parseInt(tokens[2]); + + // read data and store the file + byte[] data = new byte[filesize]; + client.getInputStream().readNBytes(data, 0, filesize); + FileOutputStream o = new FileOutputStream(dstore.getFile_folder() + + "/" + filename); + o.write(data); + o.flush(); + o.close(); + dstore.getFiles().add(filename); + + PrintWriter r = new PrintWriter(new OutputStreamWriter(controllerSocket.getOutputStream())); + r.println("STORE_ACK " + filename + " " + dstore.getPort()); + r.flush(); + + } catch (IndexOutOfBoundsException e) { + System.out.println("Arguments don;t match the STORE operation"); + } + break; + case "LOAD_DATA": + try { + + String filename = tokens[1]; + if (dstore.getFiles().contains(filename)) { + File file = new File(dstore.getFile_folder() + "/" + filename); + FileInputStream i = new FileInputStream(file); + int n; + while ((n = i.read()) != -1) { + client.getOutputStream().write(n); + } + i.close(); + } else { + done = true; + client.close(); + } + + } catch (IndexOutOfBoundsException e) { + + System.out.println("Arguments don't match in LOAD operation"); + } + break; + default: + System.out.println("Unknown command"); + break; } - }.start(); - } catch (Exception e) { - e.printStackTrace(); + } } } - public int getPort() { - return port; - } + public void handleDstoreControllerReq(Dstore dstore, Socket controllerSocket) throws IOException { - public void setTimeout(int timeout) { - this.timeout = timeout; - } + OutputStream out = controllerSocket.getOutputStream(); + InputStream in = controllerSocket.getInputStream(); - public void setFile_folder(String file_folder) { - this.file_folder = file_folder; - } + BufferedReader req = new BufferedReader(new InputStreamReader(in)); + PrintWriter res = new PrintWriter(new OutputStreamWriter(out)); + String line; - public int getCport() { - return cport; - } + System.out.println("Entered controller req"); - public int getTimeout() { - return timeout; - } + while((line = req.readLine()) != null) { + String[] tokens = line.split(" "); + String command = tokens[0]; - public String getFile_folder() { - return file_folder; - } + if(dstore.getFiles().size() < Controller.R) { + res.println("ERROR_NOT_ENOUGH_DSTORES"); + res.flush(); + } else { - public Set<String> getFiles() { - return files; - } + if(command.equals("REMOVE")) { + try { + String filename = tokens[1]; + if(dstore.getFiles().contains(filename)) { + System.out.println("REMOVE_ACK"); + File file = new File(dstore.getFile_folder() + "/" + filename); + file.delete(); + dstore.getFiles().remove(filename); + res.println("REMOVE_ACK " + filename); + } else { + res.println("ERROR_FILE_DOES_NOT_EXIST"); + } + + res.flush(); - public Socket getControllerSocket() { - return controllerSocket; + } catch(IndexOutOfBoundsException e) { + System.out.println("Arguments don't match in REMOVE opretaion"); + } + } else { + System.out.println("Unknown command"); + } + + } + } } - public static void main(String[] args) throws IOException { + public static void main(String[] args) { if(args.length != 4) { System.out.println("Incorrect number of arguments for the dstore"); @@ -112,13 +208,20 @@ public class Dstore { int timeout = Integer.parseInt(args[2]); String file_folder = args[3]; - Dstore dstore = new Dstore(port, cport); - dstore.setTimeout(timeout); - dstore.setFile_folder(file_folder); - - Controller.dstores.put(port, dstore); + Dstore dstore = new Dstore(port, cport, timeout, file_folder); - dstore.listenController(); - dstore.listenClient(); + try { + Socket socket = new Socket(); + socket.connect(new InetSocketAddress("localhost", cport)); + OutputStream out = socket.getOutputStream(); + PrintWriter res = new PrintWriter(new OutputStreamWriter(out)); + res.println("JOIN " + port); + res.flush(); + + dstore.listenController(socket); + dstore.listenClient(socket); + } catch (Exception e) { + e.printStackTrace(); + } } } diff --git a/src/Server.java b/src/Server.java index 43a222d..9fedf42 100644 --- a/src/Server.java +++ b/src/Server.java @@ -4,10 +4,12 @@ import java.util.*; public class Server { - private Controller controller; + private final Controller controller; + public static FileSystem fileSystem; - public Server(Controller controller) { + public Server(Controller controller, FileSystem fileSystem) { this.controller = controller; + Server.fileSystem = fileSystem; } public void handleClient(Socket client) throws IOException { @@ -24,23 +26,31 @@ public class Server { String[] tokens = line.split(" "); String command = tokens[0]; - if(command.equals("JOIN")) { - Dstore dstore = new Dstore(client.getPort(), controller.getCport()); - Controller.dstores.put(client.getPort(), dstore); - } else if(Controller.dstores.size() < controller.getR()) { + if (command.equals("JOIN")) { + int publicPort = Integer.parseInt(tokens[1]); + System.out.println("Dstore wants to connect"); + FSStore fsStore = new FSStore(client, publicPort); + fileSystem.addDstore(client.getPort(), fsStore); + } else if (fileSystem.getDstores().size() < controller.getR()) { res.println("ERROR_NOT_ENOUGH_DSTORES"); res.flush(); - } else if(Controller.dstores.containsKey(client.getPort())) { - switch(command) { - case "STORE_ACK" : handleStoreACK(); - case "REMOVE_ACK" : handleRemoveACK(); - } } else { - switch(command) { - case "STORE": handleStore(client, tokens); - case "LOAD": handleLoad(client, tokens); - case "REMOVE": handleRemove(client, tokens); - case "LIST": handleList(client); + if (fileSystem.getDstores().containsKey(client.getPort())) { + if(command.equals("STORE_ACK")) { + handleStoreACK(tokens); + } else if (command.equals("REMOVE_ACK")){ + handleRemoveACK(tokens); + } else { + System.out.println("Unkown command"); + } + } else { + switch (command) { + case "STORE" -> handleStore(client, tokens); + case "LOAD" -> handleLoad(client, tokens); + case "REMOVE" -> handleRemove(client, tokens); + case "LIST" -> handleList(client); + default -> System.out.println("Command unknown"); + } } } } @@ -53,18 +63,18 @@ public class Server { int filesize = Integer.parseInt(tokens[2]); PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); - Controller.index.put(filename, "store in progress"); + fileSystem.addIndex(filename, "store in progress"); String msg = ""; int i = 0; - List<Dstore> temp = new ArrayList<>(); + List<FSStore> temp = new ArrayList<>(); - for(int port : Controller.dstores.keySet()) { + for(int port : fileSystem.getDstores().keySet()) { if(i == controller.getR()) { break; } - temp.add(Controller.dstores.get(port)); - msg = Controller.dstores.get(port).getPort() + msg + " "; + temp.add(fileSystem.getDstores().get(port)); + msg = fileSystem.getDstores().get(port).getPublicPort() + msg + " "; } res.println("STORE_TO " + msg); @@ -74,12 +84,12 @@ public class Server { long limit = System.currentTimeMillis() + controller.getTimeout(); while(!done && System.currentTimeMillis() < limit) { - if(Controller.store.containsKey(filename) && Controller.store.get(filename).size() == controller.getR()) { - Controller.index.put(filename, "store complete"); - Controller.store.put(filename, temp); + if(fileSystem.getStore().containsKey(filename) && fileSystem.getStore().get(filename).size() == controller.getR()) { + done = true; + fileSystem.addIndex(filename, "store complete"); + fileSystem.addStore(filename, temp); res.println("STORE_COMPLETE"); res.flush(); - done = true; } } @@ -97,12 +107,15 @@ public class Server { try { String filename = tokens[1]; PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); - if(!Controller.store.containsKey(filename)) { + + if(!fileSystem.getStore().containsKey(filename)) { res.println("ERROR_FILE_DOES_NOT_EXIST"); } else { - // select a Dstore from there and give an approriate error if all Dstores fail - res.println("LOAD_FROM " + Controller.store.get(filename).get(0).getPort()); + // select a Dstore from there and give an appropriate error if all Dstores fail + res.println("LOAD_FROM " + fileSystem.getStore().get(filename).get(0).getPublicPort()); } + + res.flush(); } catch (IndexOutOfBoundsException e) { System.out.println("Arguments don't match in LOAD operation"); } @@ -112,12 +125,21 @@ public class Server { try { String filename = tokens[1]; - Controller.index.put(filename, "remove in progress"); - for(Dstore dstore : Controller.store.get(filename)) { - Socket socket = new Socket(); - socket.connect(new InetSocketAddress(dstore.getPort())); - PrintWriter res = new PrintWriter(new OutputStreamWriter(socket.getOutputStream())); + fileSystem.addIndex(filename, "remove in progress"); + System.out.println(fileSystem.index.get(filename)); + + if(!fileSystem.store.containsKey(filename)) { + System.out.println("Entered in Controller request"); + PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); + res.println("ERROR_FILE_DOES_NOT_EXIST"); + res.flush(); + return; + } + + for(FSStore fsStore : fileSystem.getStore().get(filename)) { + System.out.println("Sending to Dstore"); + PrintWriter res = fsStore.getOutput(); res.println("REMOVE " + filename); res.flush(); } @@ -127,12 +149,12 @@ public class Server { PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); while(!done && System.currentTimeMillis() < limit) { - if(Controller.store.get(filename).isEmpty()) { - Controller.index.put(filename, "remove complete"); - Controller.store.remove(filename); + if(fileSystem.getStore().get(filename).isEmpty()) { + done = true; + fileSystem.addIndex(filename, "remove complete"); + fileSystem.getStore().remove(filename); res.println("REMOVE_COMPLETE"); res.flush(); - done = true; } } @@ -147,21 +169,37 @@ public class Server { private void handleList(Socket client) throws IOException { String msg = ""; - for(String filename : Controller.store.keySet()) { + for(String filename : fileSystem.getStore().keySet()) { msg = filename + " " + msg; } PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); res.println("LIST " + msg); + res.flush(); } + private void handleStoreACK(String[] tokens) { - private void handleStoreACK() { + String filename = tokens[1]; + int dstorePort = Integer.parseInt(tokens[2]); - } + if(fileSystem.getStore().containsKey(filename)) { + fileSystem.getStore().get(filename).add(fileSystem.getDstores().get(dstorePort)); + } else { + List<FSStore> d = new ArrayList<>(); + d.add(fileSystem.getDstores().get(dstorePort)); + Server.fileSystem.addStore(filename,d); + } - private void handleRemoveACK() { + } + private void handleRemoveACK(String[] tokens) { + String filename = tokens[1]; + for(FSStore fsStore : fileSystem.getStore().get(filename)) { + if(!fsStore.getFiles().contains(filename)) { + fileSystem.removeDstore(fsStore, filename); + } + } } } -- GitLab