diff --git a/src/ClientMain.java b/src/ClientMain.java index dc61bdb0abb574bae424f866cd17efa21c6ff3a4..15116847f728067b25d53f3ad7687e0a61bd4af2 100644 --- a/src/ClientMain.java +++ b/src/ClientMain.java @@ -17,7 +17,7 @@ public class ClientMain { if (!uploadFolder.exists()) throw new RuntimeException("to_store folder does not exist"); - testClient(cport, timeout, downloadFolder); + //testClient(cport, timeout, downloadFolder); // example to launch a number of concurrent clients, each doing the same operations for (int i = 0; i < 10; i++) { diff --git a/src/Controller.java b/src/Controller.java index 1ef5b448b34b59c279f6b083db109b81b5eef41b..283612b4d27b0150a0328a119daf05f1eeb727ae 100644 --- a/src/Controller.java +++ b/src/Controller.java @@ -17,6 +17,7 @@ public class Controller { public void start() { try { + ControllerLogger.init(Logger.LoggingType.ON_FILE_AND_TERMINAL); ServerSocket controller = new ServerSocket(cport); FileSystem fileSystem = new FileSystem(); Server server = new Server(this, fileSystem); diff --git a/src/ControllerLogger.java b/src/ControllerLogger.java new file mode 100644 index 0000000000000000000000000000000000000000..30f0f8f76ca72bf2f3344c2540b09ecd526727b1 --- /dev/null +++ b/src/ControllerLogger.java @@ -0,0 +1,36 @@ +import java.io.IOException; +import java.net.Socket; + +public class ControllerLogger extends Logger { + + private static final String LOG_FILE_SUFFIX = "controller"; + + private static ControllerLogger instance = null; + + public static void init(LoggingType loggingType) throws IOException { + if (instance == null) + instance = new ControllerLogger(loggingType); + else + throw new IOException("ControllerLogger already initialised"); + } + + public static ControllerLogger getInstance() { + if (instance == null) + throw new RuntimeException("ControllerLogger has not been initialised yet"); + return instance; + } + + protected ControllerLogger(LoggingType loggingType) throws IOException { + super(loggingType); + } + + @Override + protected String getLogFileSuffix() { + return LOG_FILE_SUFFIX; + } + + public void dstoreJoined(Socket socket, int dstorePort) { + log("[New Dstore " + dstorePort + " " + socket.getLocalPort() + "<-" + socket.getPort() + "]"); + } + +} diff --git a/src/Dstore.java b/src/Dstore.java index c46b01fdd5d84f0d36598e9029976c8bce6e275b..3de15be706ed2b3f9e678dd4ff67aa093b6fed18 100644 --- a/src/Dstore.java +++ b/src/Dstore.java @@ -98,6 +98,7 @@ public class Dstore { String command = tokens[0]; if(dstore.getFiles().size() < Controller.R) { + DstoreLogger.getInstance().messageSent(client, "ERROR_NOT_ENOUGH_DSTORES"); res.println("ERROR_NOT_ENOUGH_DSTORES"); res.flush(); } else { @@ -106,35 +107,51 @@ public class Dstore { case "STORE": try { + DstoreLogger.getInstance().messageReceived(client, line); + + // send response to client + DstoreLogger.getInstance().messageSent(client, "ACK"); res.println("ACK"); res.flush(); + + // get the filename and the filesize of the file to be stored String filename = tokens[1]; int filesize = Integer.parseInt(tokens[2]); + // read data and store the file long limit = System.currentTimeMillis() + dstore.getTimeout(); boolean finished = false; while(!finished && System.currentTimeMillis() < limit) { - // read data and store the file + + // create the file and read its bytes byte[] data = new byte[filesize]; - client.getInputStream().readNBytes(data, 0, filesize); + in.readNBytes(data, 0, filesize); FileOutputStream o = new FileOutputStream(dstore.getFile_folder() + "/" + filename); + // write the bytes in the file created and close it after o.write(data); - dstore.addFile(filename); - finished = true; o.flush(); o.close(); + + // update the files from the dstore + dstore.addFile(filename); dstore.getFiles().add(filename); dstore.getFileSizes().put(filename, filesize); + + // exit while loop + finished = true; } + // timeout from dstore if(!finished) { - System.out.println("ERROR: time expired"); + System.out.println("ERROR: time expired in dstore while storing"); return; } + // send response to the Controller PrintWriter r = new PrintWriter(new OutputStreamWriter(controllerSocket.getOutputStream())); + DstoreLogger.getInstance().messageSent(controllerSocket, "STORE_ACK " + filename); r.println("STORE_ACK " + filename); r.flush(); @@ -145,38 +162,40 @@ public class Dstore { case "LOAD_DATA": + DstoreLogger.getInstance().messageReceived(client, line); + if(tokens.length != 2) { System.out.println("Arguments don't match in LOAD operation"); } else { String filename = tokens[1]; - try { - if (dstore.getFiles().contains(filename)) { - File file = new File(dstore.getFile_folder() + "/" + filename); - FileInputStream i = new FileInputStream(file); - int n; - while ((n = i.read()) != -1) { - client.getOutputStream().write(n); - } - i.close(); - } else { - done = true; - client.close(); + if (dstore.getFiles().contains(filename)) { + File file = new File(dstore.getFile_folder() + "/" + filename); + FileInputStream i = new FileInputStream(file); + int n; + while ((n = i.read()) != -1) { + client.getOutputStream().write(n); } - Server.pos = 0; - } catch (Exception ingored) { - res.println("RELOAD " + filename); - res.flush(); + i.close(); + } else { + done = true; + client.close(); } + Server.pos = 0; + } break; case "REBALANCE_STORE" : + DstoreLogger.getInstance().messageReceived(client, line); + res.println("ACK"); res.flush(); + DstoreLogger.getInstance().messageSent(client, "ACK"); + String filename = tokens[1]; int filesize = Integer.parseInt(tokens[2]); @@ -192,6 +211,7 @@ public class Dstore { dstore.getFileSizes().put(filename, filesize); PrintWriter r = new PrintWriter(new OutputStreamWriter(controllerSocket.getOutputStream())); + DstoreLogger.getInstance().messageSent(controllerSocket, "REBALANCE_COMPLETE"); r.println("REBALANCE_COMPLETE"); r.flush(); @@ -217,21 +237,25 @@ public class Dstore { String command = tokens[0]; if(dstore.getFiles().size() < Controller.R) { + DstoreLogger.getInstance().messageSent(controllerSocket, "ERROR_NOT_ENOUGH_DSTORES"); res.println("ERROR_NOT_ENOUGH_DSTORES"); res.flush(); } else { if(command.equals("REMOVE")) { + DstoreLogger.getInstance().messageReceived(controllerSocket, line); try { String filename = tokens[1]; if(dstore.getFiles().contains(filename)) { - System.out.println("REMOVE_ACK"); + File file = new File(dstore.getFile_folder() + "/" + filename); file.delete(); dstore.getFiles().remove(filename); dstore.getFileSizes().remove(filename); + DstoreLogger.getInstance().messageSent(controllerSocket, "REMOVE_ACK " + filename); res.println("REMOVE_ACK " + filename); } else { + DstoreLogger.getInstance().messageSent(controllerSocket, "ERROR_FILE_DOES_NOT_EXIST " + filename); res.println("ERROR_FILE_DOES_NOT_EXIST " + filename); } @@ -242,16 +266,22 @@ public class Dstore { } } else if (command.equals("LIST")) { + DstoreLogger.getInstance().messageReceived(controllerSocket, line); + String file_list = ""; for(String filename : dstore.getFiles()) { file_list = filename + " " + file_list; } + + DstoreLogger.getInstance().messageSent(controllerSocket, "LIST " + file_list); res.println("LIST " + file_list); res.flush(); } else if (command.equals("REBALANCE")) { + DstoreLogger.getInstance().messageReceived(controllerSocket, line); + // show the distribution of files for(int i = 0; i < tokens.length; i++) { System.out.print(tokens[i] + " "); @@ -283,10 +313,12 @@ public class Dstore { PrintWriter re = new PrintWriter(new OutputStreamWriter(socket.getOutputStream())); BufferedReader rs = new BufferedReader(new InputStreamReader(socket.getInputStream())); + DstoreLogger.getInstance().messageSent(socket, "REBALANCE_STORE " + filename + " " + filesize); re.println("REBALANCE_STORE " + filename + " " + filesize); re.flush(); String l = rs.readLine(); + DstoreLogger.getInstance().messageReceived(socket, l); long limit = System.currentTimeMillis() + dstore.getTimeout(); boolean done = false; @@ -325,11 +357,6 @@ public class Dstore { } } - if(number_of_files_to_send == 0) { - res.println("REBALANCE_COMPLETE"); - res.flush(); - } - } else { System.out.println("Unknown command"); } @@ -353,10 +380,12 @@ public class Dstore { Dstore dstore = new Dstore(port, cport, timeout, file_folder); try { + DstoreLogger.init(Logger.LoggingType.ON_FILE_AND_TERMINAL, port); Socket socket = new Socket(); socket.connect(new InetSocketAddress("localhost", cport)); OutputStream out = socket.getOutputStream(); PrintWriter res = new PrintWriter(new OutputStreamWriter(out)); + DstoreLogger.getInstance().messageSent(socket, "JOIN " + port); res.println("JOIN " + port); res.flush(); diff --git a/src/DstoreLogger.java b/src/DstoreLogger.java new file mode 100644 index 0000000000000000000000000000000000000000..24982ce092cb7e790305a8d5961e1dfec92250ca --- /dev/null +++ b/src/DstoreLogger.java @@ -0,0 +1,34 @@ +import java.io.IOException; + +public class DstoreLogger extends Logger { + +private static final String LOG_FILE_SUFFIX = "dstore"; + + private static DstoreLogger instance = null; + + private final String logFileSuffix; + + public static void init(LoggingType loggingType, int port) throws IOException { + if (instance == null) + instance = new DstoreLogger(loggingType, port); + else + throw new IOException("DstoreLogger already initialised"); + } + + public static DstoreLogger getInstance() { + if (instance == null) + throw new RuntimeException("DstoreLogger has not been initialised yet"); + return instance; + } + + protected DstoreLogger(LoggingType loggingType, int port) throws IOException { + super(loggingType); + logFileSuffix = LOG_FILE_SUFFIX + "_" + port; + } + + @Override + protected String getLogFileSuffix() { + return logFileSuffix; + } + +} diff --git a/src/Logger.java b/src/Logger.java new file mode 100644 index 0000000000000000000000000000000000000000..14f3bf8b9fd6e1f35be840558b71146128a3fb31 --- /dev/null +++ b/src/Logger.java @@ -0,0 +1,52 @@ +import java.io.IOException; +import java.io.PrintStream; +import java.net.Socket; + +public abstract class Logger { + + public enum LoggingType { + NO_LOG, // no log at all + ON_TERMINAL_ONLY, // log to System.out only + ON_FILE_ONLY, // log to file only + ON_FILE_AND_TERMINAL // log to both System.out and file + } + + protected final LoggingType loggingType; + protected PrintStream ps; + + protected Logger(LoggingType loggingType) { + this.loggingType = loggingType; + } + + protected abstract String getLogFileSuffix(); + + protected synchronized PrintStream getPrintStream() throws IOException { + if (ps == null) + ps = new PrintStream(getLogFileSuffix() + "_" + System.currentTimeMillis() + ".log"); + return ps; + } + + protected boolean logToFile() { + return loggingType == LoggingType.ON_FILE_ONLY || loggingType == LoggingType.ON_FILE_AND_TERMINAL; + } + + protected boolean logToTerminal() { + return loggingType == LoggingType.ON_TERMINAL_ONLY || loggingType == LoggingType.ON_FILE_AND_TERMINAL; + } + + protected void log(String message) { + if (logToFile()) + try { getPrintStream().println(message); } catch(Exception e) { e.printStackTrace(); } + if (logToTerminal()) + System.out.println(message); + } + + public void messageSent(Socket socket, String message) { + log("[" + socket.getLocalPort() + "->" + socket.getPort() + "] " + message); + } + + public void messageReceived(Socket socket, String message) { + log("[" + socket.getLocalPort() + "<-" + socket.getPort() + "] " + message); + } + +} diff --git a/src/Protocol.java b/src/Protocol.java new file mode 100644 index 0000000000000000000000000000000000000000..25181131b0d0226678818b39344f787b47cbc66e --- /dev/null +++ b/src/Protocol.java @@ -0,0 +1,30 @@ + +public class Protocol { + + // messages from Clients + public final static String LIST_TOKEN = "LIST"; // also from Controller and Dstores + public final static String STORE_TOKEN = "STORE"; // also from Dstores + public final static String LOAD_TOKEN = "LOAD"; + public final static String LOAD_DATA_TOKEN = "LOAD_DATA"; + public final static String RELOAD_TOKEN = "RELOAD"; + public final static String REMOVE_TOKEN = "REMOVE"; // also from Controller + + // messages from Controller + public final static String STORE_TO_TOKEN = "STORE_TO"; + public final static String STORE_COMPLETE_TOKEN = "STORE_COMPLETE"; + public final static String LOAD_FROM_TOKEN = "LOAD_FROM"; + public final static String REMOVE_COMPLETE_TOKEN = "REMOVE_COMPLETE"; + public final static String REBALANCE_TOKEN = "REBALANCE"; + public final static String ERROR_FILE_DOES_NOT_EXIST_TOKEN = "ERROR_FILE_DOES_NOT_EXIST"; // also from Dstores + public final static String ERROR_FILE_ALREADY_EXISTS_TOKEN = "ERROR_FILE_ALREADY_EXISTS"; + public final static String ERROR_NOT_ENOUGH_DSTORES_TOKEN = "ERROR_NOT_ENOUGH_DSTORES"; + public final static String ERROR_LOAD_TOKEN = "ERROR_LOAD"; + + // messages from Dstores + public final static String ACK_TOKEN = "ACK"; + public final static String STORE_ACK_TOKEN = "STORE_ACK"; + public final static String REMOVE_ACK_TOKEN = "REMOVE_ACK"; + public final static String JOIN_TOKEN = "JOIN"; + public final static String REBALANCE_STORE_TOKEN = "REBALANCE_STORE"; + public final static String REBALANCE_COMPLETE_TOKEN = "REBALANCE_COMPLETE"; +} diff --git a/src/Server.java b/src/Server.java index edf5807c1cdf22e9b870f97f1969b5863ffa3b13..e0cab541fa1fae97cdb9be9fb9454a10975c9c0c 100644 --- a/src/Server.java +++ b/src/Server.java @@ -60,27 +60,33 @@ public class Server { String[] tokens = line.split(" "); String command = tokens[0]; - if (command.equals("JOIN")) { + if (!rebalancing && command.equals("JOIN")) { int publicPort = Integer.parseInt(tokens[1]); FSStore fsStore = new FSStore(client, publicPort); fileSystem.addDstore(client.getPort(), fsStore); - System.out.println("Dstore listening on port " + publicPort + " connected"); + ControllerLogger.getInstance().dstoreJoined(client, publicPort); runRebalancing(0); } else if (fileSystem.getDstores().size() < controller.getR()) { + + ControllerLogger.getInstance().messageSent(client, "ERROR_NOT_ENOUGH_DSTORES"); res.println("ERROR_NOT_ENOUGH_DSTORES"); res.flush(); + } else { if (fileSystem.getDstores().containsKey(client.getPort())) { if(command.equals("STORE_ACK")) { + ControllerLogger.getInstance().messageReceived(client, "STORE_ACK"); handleStoreACK(tokens); } else if (command.equals("REMOVE_ACK")) { + ControllerLogger.getInstance().messageReceived(client, "REMOVE_ACK"); handleRemoveACK(tokens); } else if(command.equals("LIST")) { + ControllerLogger.getInstance().messageReceived(client, "LIST"); // get the files of the dstore when rebalancing List<String> filenames = new ArrayList<>(); for(int i = 1; i < tokens.length; i++) { @@ -93,24 +99,22 @@ public class Server { filenames); } else if (command.equals("REBALANCE_COMPLETE")) { - - //fileSystem.increaseRebalanceComplete(); + ControllerLogger.getInstance().messageReceived(client, "REBALANCE_COMPLETE"); + System.out.println("Rebalance done for " + fileSystem.getDstores().get(client.getPort()).getPublicPort()); } else if (command.equals("ERROR_FILE_DOES_NOT_EXIST")) { - - System.out.println(command + " " + tokens[1]); - + ControllerLogger.getInstance().messageReceived(client, "ERROR_FILE_DOES_NOT_EXIST"); } else { System.out.println("Unknown command " + command); } } else if (!rebalancing) { switch (command) { - case "STORE" -> handleStore(client, tokens); - case "LOAD" -> handleLoad(client, tokens); - case "RELOAD" -> handleReload(client, tokens); - case "REMOVE" -> handleRemove(client, tokens); - case "LIST" -> handleList(client); + case "STORE" -> handleStore(client, tokens, line); + case "LOAD" -> handleLoad(client, tokens, line); + case "RELOAD" -> handleReload(client, tokens, line); + case "REMOVE" -> handleRemove(client, tokens, line); + case "LIST" -> handleList(client, line); default -> System.out.println("Unknown command " + command); } @@ -126,14 +130,17 @@ public class Server { * @param tokens gets the filename and the filesize * @throws IOException */ - private void handleStore(Socket client, String[] tokens) throws IOException { - try { + private void handleStore(Socket client, String[] tokens, String line) throws IOException { + + ControllerLogger.getInstance().messageReceived(client, line); + try { String filename = tokens[1]; int filesize = Integer.parseInt(tokens[2]); PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); if(fileSystem.getStore().containsKey(filename)) { + ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_ALREADY_EXISTS"); res.println("ERROR_FILE_ALREADY_EXISTS"); res.flush(); return; @@ -162,6 +169,7 @@ public class Server { i++; } + ControllerLogger.getInstance().messageSent(client, "STORE_TO " + msg); res.println("STORE_TO " + msg); res.flush(); @@ -182,6 +190,7 @@ public class Server { } fileSystem.addFileSize(filename, filesize); + ControllerLogger.getInstance().messageSent(client, "STORE_COMPLETE"); res.println("STORE_COMPLETE"); res.flush(); break; @@ -200,27 +209,32 @@ public class Server { } } - private void handleLoad(Socket client, String[] tokens) throws IOException { + private void handleLoad(Socket client, String[] tokens, String line) throws IOException { + + ControllerLogger.getInstance().messageReceived(client, line); try { String filename = tokens[1]; PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); if(!fileSystem.getStore().containsKey(filename)) { + ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_DOES_NOT_EXIST"); res.println("ERROR_FILE_DOES_NOT_EXIST"); } else { // select a Dstore from there and give an appropriate error if all Dstores fail - // #TODO fix NullPointerException from here + ControllerLogger.getInstance().messageSent(client, "LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() + + " " + fileSystem.getFileSizes().get(filename)); res.println("LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() + " " + fileSystem.getFileSizes().get(filename)); } - res.flush(); } catch (IndexOutOfBoundsException e) { System.out.println("Arguments don't match in LOAD operation"); } } - private void handleReload(Socket client, String[] tokens) throws IOException { + private void handleReload(Socket client, String[] tokens, String line) throws IOException { + ControllerLogger.getInstance().messageReceived(client, line); + pos = pos + 1; PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); @@ -228,17 +242,21 @@ public class Server { String filename = tokens[1]; if(!fileSystem.getStore().containsKey(filename)) { pos = 0; + ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_DOES_NOT_EXIST"); res.println("ERROR_FILE_DOES_NOT_EXIST"); } else { if(pos == controller.getR()) { pos = 0; + ControllerLogger.getInstance().messageSent(client, "ERROR_LOAD"); res.println("ERROR_LOAD"); res.flush(); return; } // select a Dstore from there and give an appropriate error if all Dstores fail + ControllerLogger.getInstance().messageSent(client, "LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() + + " " + fileSystem.getFileSizes().get(filename)); res.println("LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() + " " + fileSystem.getFileSizes().get(filename)); } @@ -249,7 +267,10 @@ public class Server { } } - private void handleRemove(Socket client, String[] tokens) throws IOException { + private void handleRemove(Socket client, String[] tokens, String line) throws IOException { + + ControllerLogger.getInstance().messageReceived(client, line); + try { String filename = tokens[1]; @@ -259,15 +280,16 @@ public class Server { if(!fileSystem.store.containsKey(filename)) { PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); + ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_DOES_NOT_EXIST"); res.println("ERROR_FILE_DOES_NOT_EXIST"); res.flush(); return; } for(FSStore fsStore : fileSystem.getStore().get(filename)) { - System.out.println("Sending to Dstore"); fsStore.getFiles().remove(filename); PrintWriter res = fsStore.getOutput(); + ControllerLogger.getInstance().messageSent(client, "REMOVE " + filename); res.println("REMOVE " + filename); res.flush(); } @@ -282,6 +304,7 @@ public class Server { fileSystem.addIndex(filename, "remove complete"); fileSystem.getStore().remove(filename); fileSystem.getFileSizes().remove(filename); + ControllerLogger.getInstance().messageSent(client, "REMOVE_COMPLETE"); res.println("REMOVE_COMPLETE"); res.flush(); } @@ -297,13 +320,17 @@ public class Server { } } - private void handleList(Socket client) throws IOException { + private void handleList(Socket client, String line) throws IOException { + + ControllerLogger.getInstance().messageReceived(client, line); + String msg = ""; for(String filename : fileSystem.getStore().keySet()) { msg = filename + " " + msg; } PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); + ControllerLogger.getInstance().messageSent(client, "LIST " + msg); res.println("LIST " + msg); res.flush(); }