diff --git a/ClientMain.java b/ClientMain.java new file mode 100644 index 0000000000000000000000000000000000000000..10a4789763510688bf21b2607809181eea710c6c --- /dev/null +++ b/ClientMain.java @@ -0,0 +1,84 @@ +import java.io.File; +import java.io.IOException; + +public class ClientMain { + + public static void main(String[] args) { + + int cport = -1; + int timeout = -1; + try { + // parse arguments + cport = Integer.parseInt(args[0]); + timeout = Integer.parseInt(args[1]); + } catch (NumberFormatException e) { + System.err.println("Error parsing arguments: " + e.getMessage()); + System.err.println("Expected: java ClientMain cport timeout"); + System.exit(-1); + } + + File downloadFolder = new File("downloads"); + if (!downloadFolder.exists()) + if (!downloadFolder.mkdir()) throw new RuntimeException("Cannot create download folder (folder absolute path: " + downloadFolder.getAbsolutePath() + ")"); + + testClient(cport, timeout, downloadFolder); + + // example to launch a number of concurrent clients, each doing the same operations + /*for (int i = 0; i < 10; i++) { + new Thread() { + public void run() { + testClient(cport, timeout, downloadFolder); + } + }.start(); + }*/ + } + + public static void testClient(int cport, int timeout, File downloadFolder) { + Client client = null; + + try { + + client = new Client(cport, timeout, Logger.LoggingType.ON_FILE_AND_TERMINAL); + + try { client.connect(); } catch(IOException e) { e.printStackTrace(); return; } + + try { list(client); } catch(IOException e) { e.printStackTrace(); } + + try { client.store(new File("Clipboard01.pdf")); } catch(IOException e) { e.printStackTrace(); } + + try { client.store(new File("Clipboard01.pdf")); } catch(IOException e) { e.printStackTrace(); } + + try { client.store(new File("Clipboard01.jpg")); } catch(IOException e) { e.printStackTrace(); } + + String list[] = null; + try { list = list(client); } catch(IOException e) { e.printStackTrace(); } + + if (list != null) + for (String filename : list) + try { client.load(filename, downloadFolder); } catch(IOException e) { e.printStackTrace(); } + + if (list != null) + for (String filename : list) + try { client.remove(filename); } catch(IOException e) { e.printStackTrace(); } + + try { list(client); } catch(IOException e) { e.printStackTrace(); } + + } finally { + if (client != null) + try { client.disconnect(); } catch(Exception e) { e.printStackTrace(); } + } + } + + public static String[] list(Client client) throws IOException, NotEnoughDstoresException { + System.out.println("Retrieving list of files..."); + String list[] = client.list(); + + System.out.println("Ok, " + list.length + " files:"); + int i = 0; + for (String filename : list) + System.out.println("[" + i++ + "] " + filename); + + return list; + } + +} diff --git a/Controller.java b/Controller.java index df943c116c7ddec8ae056f79e6d35d2e8243dad2..481360c65a0f74105b7ae335ed46dfe59fbf6804 100644 --- a/Controller.java +++ b/Controller.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.Map; import java.util.HashMap; import java.util.Collection; +import java.util.Collections; public class Controller { protected int cport; //Port to listen on @@ -23,7 +24,7 @@ public class Controller { public IndexEntry() { filesize = -1; - storedBy = new SyncList(); + storedBy = Collections.synchronizedList(new ArrayList<Integer>()); numberToStore = 0; status = "store in progress"; storeAckLock = new Object(); @@ -90,7 +91,7 @@ public class Controller { } } - protected Map<Integer,Socket> dstores; + protected Map<Integer,DstoreConnection> dstores; protected Map<Integer,String[]> rebalanceMessages; protected Map<String,IndexEntry> index; @@ -99,7 +100,7 @@ public class Controller { this.rFactor = rFactor; this.timeout = timeout; this.rebalancePeriod = rebalancePeriod; - dstores = Collections.synchronizedMap(new HashMap<Integer,Socket>()); + dstores = Collections.synchronizedMap(new HashMap<Integer,DstoreConnection>()); index = new HashMap<String,IndexEntry>(); } @@ -132,8 +133,9 @@ public class Controller { BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); String[] message = in.readLine().split(" "); if(dstores.size() < rFactor && !message[0].equals("JOIN")) { - BufferedWriter out = new BufferedWriter(new OutputStreamWriter(client.getOutputStream())); - out.write("ERROR"); + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.println("ERROR"); + out.flush(); out.close(); } else { @@ -155,7 +157,7 @@ public class Controller { void handleMessage(String[] message, Socket client) throws Exception { if(message[0].equals("JOIN")) { - dstores.add(Integer.parseInt(message[1]), client); + dstores.put(Integer.parseInt(message[1]), new DstoreConnection(client)); System.out.println("Dstore at " + message[1] + " joined"); rebalance(); } @@ -175,13 +177,7 @@ public class Controller { list(client); } else { - for(String name : message) { - if(!index.containsKey(name)) { - //Log error and continue (throw exception?) - return; - } - } - receiveDstoreList(client, message); + //Log error } } @@ -190,7 +186,7 @@ public class Controller { try { if(index.containsKey(filename)) { PrintWriter out = new PrintWriter(client.getOutputStream()); - out.print("ERROR ALREADY_EXISTS " + filename); + out.println("ERROR ALREADY_EXISTS " + filename); out.flush(); out.close(); return; @@ -210,10 +206,10 @@ public class Controller { //Send STORE_TO message PrintWriter out = new PrintWriter(client.getOutputStream()); - out.print("STORE_TO"); + out.println("STORE_TO"); out.flush(); for(int port : storesToStore) { - out.print(" " + port); + out.println(" " + port); new Thread(() -> { String[] message = dstores.get(Integer.valueOf(port)).receive().split(" "); if(message[0].equals("STORE_ACK")) { @@ -241,7 +237,7 @@ public class Controller { entry.status = "store complete"; //Send STORE_COMPLETE message - out.print("STORE_COMPLETE"); + out.println("STORE_COMPLETE"); out.flush(); out.close(); } @@ -266,7 +262,7 @@ public class Controller { try { if(!index.containsKey(filename)) { PrintWriter out = new PrintWriter(client.getOutputStream()); - out.print("ERROR DOES_NOT_EXIST"); + out.println("ERROR DOES_NOT_EXIST"); out.flush(); out.close(); return; @@ -279,24 +275,24 @@ public class Controller { //Send LOAD_FROM message PrintWriter out = new PrintWriter(client.getOutputStream()); - out.print("LOAD_FROM " + thisStore + " " + thisSize); + out.println("LOAD_FROM " + thisStore + " " + thisSize); out.flush(); - Reloader reloadLock = new Object(); + Reloader reloadLock = new Reloader(); thisEntry.getLoadList().add(reloadLock); int trials = 0; while(true) { reloadLock.wait(10 * timeout); trials ++; if(trials >= rFactor || !reloadLock.reload) break; - out.print("LOAD_FROM " + thisEntry.storedBy.get(trials).intValue() + " " + thisSize); + out.println("LOAD_FROM " + thisEntry.storedBy.get(trials).intValue() + " " + thisSize); out.flush(); reloadLock.reload = false; } thisEntry.getLoadList().remove(reloadLock); if(trials >= rFactor && reloadLock.reload) { - out.print("ERROR LOAD"); + out.println("ERROR LOAD"); out.flush(); } @@ -327,7 +323,7 @@ public class Controller { try { if(!index.containsKey(filename)) { PrintWriter clientOut = new PrintWriter(client.getOutputStream()); - clientOut.print("ERROR DOES_NOT_EXIST"); + clientOut.println("ERROR DOES_NOT_EXIST"); clientOut.flush(); clientOut.close(); return; @@ -367,7 +363,7 @@ public class Controller { //Send REMOVE_COMPLETE to client PrintWriter clientOut = new PrintWriter(client.getOutputStream()); - clientOut.print("REMOVE_COMPLETE"); + clientOut.println("REMOVE_COMPLETE"); clientOut.flush(); clientOut.close(); } @@ -427,11 +423,11 @@ public class Controller { } } - Map<Integer,List<String>> newAlloc = new HashMap<Integer,String[]>(); + Map<Integer,List<String>> newAlloc = new HashMap<Integer,List<String>>(); int pos = 0; int storeSize = Math.ceiling((fileList.size() * rFactor) / dstores.size()); for(Integer i : dstoreFiles.keySet()) { - newAlloc.add(i, new ArrayList<String>(storeSize)); + newAlloc.put(i, new ArrayList<String>(storeSize)); } for(String file : fileList) { for(int j=0; j<rFactor; j++) { @@ -479,7 +475,7 @@ public class Controller { } } - rebalanceMessages.add(port, list); + rebalanceMessages.put(port, list); if(rebalanceMessages.size() == dstores.size()) { rebalanceMessages.notify(); } diff --git a/Dstore.java b/Dstore.java index 6c23c0ee9cae12c287f3b5ef66842deda62f15ce..6989e271f1ddc84275ef9c0d7bc78fa685425897 100644 --- a/Dstore.java +++ b/Dstore.java @@ -49,7 +49,7 @@ public class Dstore { Socket socket = new Socket(InetAddress.getLocalHost(), cport); controllerIn = new BufferedReader(new InputStreamReader(socket.getInputStream())); controllerOut = new PrintWriter(socket.getOutputStream()); - controllerOut.print("JOIN " + port); + controllerOut.println("JOIN " + port); controllerOut.flush(); new Thread(() -> { @@ -111,25 +111,24 @@ public class Dstore { try { //Send ACK message to client PrintWriter out = new PrintWriter(client.getOutputStream()); - out.print("ACK"); + out.println("ACK"); out.flush(); out.close(); FileOutputStream writer = new FileOutputStream(fileFolder + "/" + filename, false); + InputStream reader = client.getInputStream(); //Receive + write file content from client - int byteCount = filesize; - while(byteCount > 0) { - byte[] nextLine = in.readLine().getBytes(); + byte[] nextLine = new byte[8]; + while(reader.readNBytes(nextLine, 0, 8) > 0) { writer.write(nextLine); writer.flush(); - byteCount -= nextLine.length; } writer.close(); //Send STORE_ACK message to the Controller PrintWriter controllerOut = new PrintWriter(new Socket(InetAddress.getLocalHost(), cport).getOutputStream()); - controllerOut.print("STORE_ACK " + filename); + controllerOut.println("STORE_ACK " + filename); controllerOut.flush(); controllerOut.close(); @@ -152,20 +151,22 @@ public class Dstore { reader = new FileInputStream(fileFolder + "/" + filename); } catch(FileNotFoundException e) { - out.print("ERROR DOES_NOT_EXIST"); + out.println("ERROR DOES_NOT_EXIST"); out.flush(); out.close(); return; } + OutputStream contentOut = client.getOutputStream(); byte[] buf = new byte[8]; while(reader.read(buf) != -1) { - out.print(new String(buf)); - out.flush(); + contentOut.write(new String(buf)); + contentOut.flush(); } reader.close(); out.close(); + contentOut.close(); } catch(IOException e) { e.printStackTrace(); @@ -181,11 +182,11 @@ public class Dstore { if(Files.deleteIfExists(path)) { //Send REMOVE_ACK message to client (the controller) - controllerOut.print("REMOVE_ACK"); + controllerOut.println("REMOVE_ACK"); } else { //Send DOES NOT EXIST error - controllerOut.print("ERROR DOES_NOT_EXIST " + filename); + controllerOut.println("ERROR DOES_NOT_EXIST " + filename); } controllerOut.flush(); @@ -201,7 +202,7 @@ public class Dstore { try { //Send a list of all files in fileFolder to client (the controller) for(File file : new File(fileFolder).listFiles()) { - controllerOut.print(file.getName()); + controllerOut.println(file.getName()); controllerOut.flush(); } } @@ -215,26 +216,27 @@ public class Dstore { new Thread(() -> { try { //Interpret files to send and files to remove from the message - Map<String,Integer[]> filesToSend; + Map<Integer,List<String>> filesToSend; String[] filesToRemove; int index; int numberToSend = Integer.parseInt(message[1]); index = 2; - filesToSend = new HashMap<String,Integer[]>(numberToSend); + filesToSend = new HashMap<Integer,List<String>>(); for(int i=0; i<numberToSend; i++) { String name = message[index]; index++; int numberOfReceivers = Integer.parseInt(message[index]); index++; - Integer[] receivers = new Integer[numberOfReceivers]; for(int j=0; j<numberOfReceivers; j++) { - receivers[j] = Integer.parseInt(message[index]); + Integer receiver = Integer.parseInt(message[index]); + if(!filesToSend.containsKey(receiver)) { + filesToSend.put(receiver,new ArrayList<String>()); + } + filesToSend.get(receiver).add(name); index++; } - - filesToSend.put(name, receivers); } int numberToRemove = Integer.parseInt(message[index]); @@ -246,36 +248,37 @@ public class Dstore { } //Send each file to send to the Dstore at the specified port number - for(String filename : filesToSend.keySet()) { - for(Integer dstore : filesToSend.get(filename)) { - try { - Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); - PrintWriter out = new PrintWriter(socket.getOutputStream()); - out.write("STORE " + filename + " " + fileSizes.get(filename)); - out.flush(); - - BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); - if(!in.readLine().equals("ACK")) { - //Log error - } - String line; - while(line = new BufferedReader(new FileInputStream(fileFolder + "/" + filename))) { - if(line == null) { - out.write(""); - out.flush(); - break; + for(Integer dstore : filesToSend.keySet()) { + for(String filename : filesToSend.get(dstore)) { + new Thread(() -> { + try { + Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); + PrintWriter out = new PrintWriter(socket.getOutputStream()); + out.println("STORE " + filename + " " + fileSizes.get(filename)); + out.flush(); + out.close(); + + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + if(!in.readLine().equals("ACK")) { + //Log error } - else { - out.write(line); - out.flush(); + in.close(); + + byte[] content = new byte[8]; + FileInputStream fileIn = new FileInputStream(fileFolder + "/" + filename); + OutputStream fileOut = client.getOutputStream(); + while(fileIn.read(content) > 0) { + fileOut.write(content); + fileOut.flush(); } + fileIn.close(); + fileOut.close(); + socket.close(); } - out.close(); - socket.close(); - } - catch(IOException e) { - e.printStackTrace(); - } + catch(IOException e) { + e.printStackTrace(); + } + }).start(); } } diff --git a/DstoreConnection.java b/DstoreConnection.java index 56bae5a19b398aa561497ca77e29b10d5c9ebb5a..39751a317f6d1b4e2a89dd36c240c3174aca5eaf 100644 --- a/DstoreConnection.java +++ b/DstoreConnection.java @@ -19,18 +19,26 @@ public class DstoreConnection { } public String sendAndReceive(String message) { - try { - writer.print(message); - writer.flush(); - } - catch(IOException e) { - e.printStackTrace(); - return ""; + synchronized(this) { + try { + writer.println(message); + writer.flush(); + } + catch(IOException e) { + e.printStackTrace(); + return ""; + } + return localReceive(); } - return receive(); } public String receive() { + synchronized(this) { + return localReceive(); + } + } + + protected String localReceive() { try { String returnMessage = reader.readLine(); return returnMessage; diff --git a/client-1.0.0.jar b/client-1.0.0.jar new file mode 100644 index 0000000000000000000000000000000000000000..343aedcebb38c4d4ae8580850d37bb6090bd8f9a Binary files /dev/null and b/client-1.0.0.jar differ