From d9ecb07d1fceca62d312bfb5089112828f81c38b Mon Sep 17 00:00:00 2001 From: Theodora-Mara Pislar <tmp1u19@soton.ac.uk> Date: Wed, 5 May 2021 19:35:59 +0100 Subject: [PATCH] Change the architecture of the project completely!! --- src/Controller.java | 83 ++++++++++++++++++++++ src/Dstore.java | 124 ++++++++++++++++++++++++++++++++ src/Handler.java | 129 ++++++++++++++++++++++++++++++++++ src/Server.java | 167 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 503 insertions(+) create mode 100644 src/Controller.java create mode 100644 src/Dstore.java create mode 100644 src/Handler.java create mode 100644 src/Server.java diff --git a/src/Controller.java b/src/Controller.java new file mode 100644 index 0000000..1ec54da --- /dev/null +++ b/src/Controller.java @@ -0,0 +1,83 @@ +import java.io.*; +import java.net.*; +import java.util.*; + +public class Controller { + + static Map<Integer, Dstore> dstores = Collections.synchronizedMap(new HashMap<>()); + static Map<String, List<Dstore>> store = Collections.synchronizedMap(new HashMap<>()); + static Map<String, String> index = Collections.synchronizedMap(new HashMap<>()); + + private int cport; + static int R; + private int timeout; + private int rebalance_period; + + public Controller(int cport, int R, int timeout, int rebalance_period) { + this.cport = cport; + this.R = R; + this.timeout = timeout; + this.rebalance_period = rebalance_period; + } + + public void start() { + try { + ServerSocket controller = new ServerSocket(cport); + Server server = new Server(this); + System.out.println("Start listening for connections"); + + for(;;) { + try { + Socket client = controller.accept(); + + new Thread() { + @Override + public void run() { + try { + server.handleClient(client); + } catch (Exception e) { + e.printStackTrace(); + } + } + }.start(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + } catch(Exception e) { + e.printStackTrace(); + } + + } + + public int getCport() { + return cport; + } + + public int getR() { + return R; + } + + public int getRebalance_period() { + return rebalance_period; + } + + public int getTimeout() { + return timeout; + } + + public static void main(String[] args) { + if(args.length != 4) { + System.out.println("Incorrect number of arguments for the controller"); + return; + } + + int cport = Integer.parseInt(args[0]); + int R = Integer.parseInt(args[1]); + int timeout = Integer.parseInt(args[2]); + int rebalance_perios = Integer.parseInt(args[3]); + + new Controller(cport, R, timeout,rebalance_perios).start(); + } +} diff --git a/src/Dstore.java b/src/Dstore.java new file mode 100644 index 0000000..65c0c28 --- /dev/null +++ b/src/Dstore.java @@ -0,0 +1,124 @@ +import java.io.*; +import java.net.*; +import java.util.HashSet; +import java.util.Set; + +public class Dstore { + + private int port; + private int cport; + private int timeout; + private String file_folder; + private Socket controllerSocket; + private Set<String> files; + + public Dstore(int port, int cport) throws IOException { + this.port = port; + this.cport = cport; + InetAddress ip = InetAddress.getLocalHost(); + controllerSocket = new Socket(); + controllerSocket.connect(new InetSocketAddress(ip, cport)); + files = new HashSet<>(); + } + + public void listenClient() { + + try { + ServerSocket dstore = new ServerSocket(port); + for(;;) { + try { + Socket client = dstore.accept(); + new Thread() { + @Override + public void run() { + try { + controllerSocket.connect(new InetSocketAddress("localhost", cport)); + new Handler().handleDstoreClientReq(client, Dstore.this); + } catch (IOException e) { + e.printStackTrace(); + } + } + }.start(); + } catch(Exception e) { + e.printStackTrace(); + } + + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void listenController() { + try { + new Thread() { + @Override + public void run() { + try { + controllerSocket.connect(new InetSocketAddress("localhost", cport)); + new Handler().handleDstoreControllerReq(Dstore.this); + } catch (IOException e) { + e.printStackTrace(); + } + } + }.start(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public int getPort() { + return port; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public void setFile_folder(String file_folder) { + this.file_folder = file_folder; + } + + public int getCport() { + return cport; + } + + public int getTimeout() { + return timeout; + } + + public String getFile_folder() { + return file_folder; + } + + public Set<String> getFiles() { + return files; + } + + public Socket getControllerSocket() { + return controllerSocket; + } + + public static void main(String[] args) throws IOException { + + if(args.length != 4) { + System.out.println("Incorrect number of arguments for the dstore"); + return; + } + + int port = Integer.parseInt(args[0]); + int cport = Integer.parseInt(args[1]); + int timeout = Integer.parseInt(args[2]); + String file_folder = args[3]; + + Dstore dstore = new Dstore(port, cport); + dstore.setTimeout(timeout); + dstore.setFile_folder(file_folder); + + Controller.dstores.put(port, dstore); + + dstore.listenController(); + dstore.listenClient(); + } +} diff --git a/src/Handler.java b/src/Handler.java new file mode 100644 index 0000000..042ee34 --- /dev/null +++ b/src/Handler.java @@ -0,0 +1,129 @@ +import java.io.*; +import java.net.*; +import java.util.ArrayList; +import java.util.List; + +public class Handler { + public void handleDstoreClientReq(Socket client, Dstore dstore) throws IOException { + + OutputStream out = client.getOutputStream(); + InputStream in = client.getInputStream(); + + BufferedReader req = new BufferedReader(new InputStreamReader(in)); + PrintWriter res = new PrintWriter(new OutputStreamWriter(out)); + String line; + + while((line = req.readLine()) != null) { + String[] tokens = line.split(" "); + String command = tokens[0]; + + if(dstore.getFiles().size() < Controller.R) { + res.println("ERROR_NOT_ENOUGH_DSTORES"); + res.flush(); + } else { + switch(command) { + case "STORE" : { + try { + + res.println("ACK"); + res.flush(); + String filename = tokens[1]; + int filesize = Integer.parseInt(tokens[2]); + + // read data and store the file + byte[] data = new byte[filesize]; + client.getInputStream().readNBytes(data, 0, filesize); + FileOutputStream o = new FileOutputStream(dstore.getFile_folder() + + "/" + filename); + o.write(data); + o.flush(); + o.close(); + dstore.getFiles().add(filename); + + if(Controller.store.containsKey(filename)) { + Controller.store.get(filename).add(dstore); + } else { + List<Dstore> d = new ArrayList<>(); + d.add(dstore); + Controller.store.put(filename, d); + } + + PrintWriter r = new PrintWriter(new OutputStreamWriter(dstore.getControllerSocket().getOutputStream())); + r.println("STORE_COMPLETE"); + + } catch (IndexOutOfBoundsException e) { + System.out.println("Arguments don;t match the STORE operation"); + } + } + case "LOAD_DATA" : { + + try { + + String filename = tokens[1]; + if(dstore.getFiles().contains(filename)) { + File file = new File(dstore.getFile_folder() + "/" + filename); + int n; + while((n = in.read())!= -1) { + client.getOutputStream().write(n); + } + } else { + client.close(); + } + + } catch (IndexOutOfBoundsException e) { + + System.out.println("Arguments don't match in LOAD operation"); + } + } + } + } + } + + } + + public void handleDstoreControllerReq(Dstore dstore) throws IOException { + + OutputStream out = dstore.getControllerSocket().getOutputStream(); + InputStream in = dstore.getControllerSocket().getInputStream(); + + BufferedReader req = new BufferedReader(new InputStreamReader(in)); + PrintWriter res = new PrintWriter(new OutputStreamWriter(out)); + String line; + + while((line = req.readLine()) != null) { + String[] tokens = line.split(" "); + String command = tokens[0]; + + if(dstore.getFiles().size() < Controller.R) { + res.println("ERROR_NOT_ENOUGH_DSTORES"); + res.flush(); + } else { + switch (command) { + + case "REMOVE" : { + + try { + + String filename = tokens[1]; + if(dstore.getFiles().contains(filename)) { + File file = new File(dstore.getFile_folder() + "/" + filename); + file.delete(); + Controller.store.get(filename).remove(dstore); + res.println("REMOVE_ACK"); + res.flush(); + } else { + res.println("ERROR_FILE_DOES_NOT_EXIST"); + res.flush(); + } + + } catch(IndexOutOfBoundsException e) { + System.out.println("Arguments don't match in REMOVE opretaion"); + } + + } + } + } + } + + } +} diff --git a/src/Server.java b/src/Server.java new file mode 100644 index 0000000..43a222d --- /dev/null +++ b/src/Server.java @@ -0,0 +1,167 @@ +import java.io.*; +import java.net.*; +import java.util.*; + +public class Server { + + private Controller controller; + + public Server(Controller controller) { + this.controller = controller; + } + + public void handleClient(Socket client) throws IOException { + OutputStream out = client.getOutputStream(); + InputStream in = client.getInputStream(); + + BufferedReader req = new BufferedReader(new InputStreamReader(in)); + PrintWriter res = new PrintWriter(new OutputStreamWriter(out)); + + System.out.println("---------NEW CONNECTION---------"); + + String line; + while((line = req.readLine()) != null) { + String[] tokens = line.split(" "); + String command = tokens[0]; + + if(command.equals("JOIN")) { + Dstore dstore = new Dstore(client.getPort(), controller.getCport()); + Controller.dstores.put(client.getPort(), dstore); + } else if(Controller.dstores.size() < controller.getR()) { + res.println("ERROR_NOT_ENOUGH_DSTORES"); + res.flush(); + } else if(Controller.dstores.containsKey(client.getPort())) { + switch(command) { + case "STORE_ACK" : handleStoreACK(); + case "REMOVE_ACK" : handleRemoveACK(); + } + } else { + switch(command) { + case "STORE": handleStore(client, tokens); + case "LOAD": handleLoad(client, tokens); + case "REMOVE": handleRemove(client, tokens); + case "LIST": handleList(client); + } + } + } + } + + private void handleStore(Socket client, String[] tokens) throws IOException { + try { + + String filename = tokens[1]; + int filesize = Integer.parseInt(tokens[2]); + PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); + + Controller.index.put(filename, "store in progress"); + String msg = ""; + int i = 0; + List<Dstore> temp = new ArrayList<>(); + + for(int port : Controller.dstores.keySet()) { + if(i == controller.getR()) { + break; + } + + temp.add(Controller.dstores.get(port)); + msg = Controller.dstores.get(port).getPort() + msg + " "; + } + + res.println("STORE_TO " + msg); + res.flush(); + + boolean done = false; + long limit = System.currentTimeMillis() + controller.getTimeout(); + + while(!done && System.currentTimeMillis() < limit) { + if(Controller.store.containsKey(filename) && Controller.store.get(filename).size() == controller.getR()) { + Controller.index.put(filename, "store complete"); + Controller.store.put(filename, temp); + res.println("STORE_COMPLETE"); + res.flush(); + done = true; + } + } + + if(!done) { + System.out.println(filename + " failed to upload"); + } + + } catch (IndexOutOfBoundsException e) { + System.out.println("Arguments don't match in STORE operation"); + } + } + + private void handleLoad(Socket client, String[] tokens) throws IOException { + + try { + String filename = tokens[1]; + PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); + if(!Controller.store.containsKey(filename)) { + res.println("ERROR_FILE_DOES_NOT_EXIST"); + } else { + // select a Dstore from there and give an approriate error if all Dstores fail + res.println("LOAD_FROM " + Controller.store.get(filename).get(0).getPort()); + } + } catch (IndexOutOfBoundsException e) { + System.out.println("Arguments don't match in LOAD operation"); + } + } + + private void handleRemove(Socket client, String[] tokens) throws IOException { + try { + + String filename = tokens[1]; + Controller.index.put(filename, "remove in progress"); + + for(Dstore dstore : Controller.store.get(filename)) { + Socket socket = new Socket(); + socket.connect(new InetSocketAddress(dstore.getPort())); + PrintWriter res = new PrintWriter(new OutputStreamWriter(socket.getOutputStream())); + res.println("REMOVE " + filename); + res.flush(); + } + + boolean done = false; + long limit = System.currentTimeMillis() + controller.getTimeout(); + PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); + + while(!done && System.currentTimeMillis() < limit) { + if(Controller.store.get(filename).isEmpty()) { + Controller.index.put(filename, "remove complete"); + Controller.store.remove(filename); + res.println("REMOVE_COMPLETE"); + res.flush(); + done = true; + } + } + + if(!done) { + System.out.println(filename + " failed to remove"); + } + + } catch (IndexOutOfBoundsException e) { + System.out.println("Arguments don't match in REMOVE operation"); + } + } + + private void handleList(Socket client) throws IOException { + String msg = ""; + for(String filename : Controller.store.keySet()) { + msg = filename + " " + msg; + } + + PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); + res.println("LIST " + msg); + } + + + private void handleStoreACK() { + + } + + private void handleRemoveACK() { + + } + +} -- GitLab