diff --git a/src/ftp/Controller.java b/src/ftp/Controller.java index 008ccf6494c150909d8261abfb17e443205254e1..3efbd6efa22f729606be673f2f631a89caaadfff 100644 --- a/src/ftp/Controller.java +++ b/src/ftp/Controller.java @@ -39,6 +39,7 @@ public class Controller extends Server { } + public static void main(String args[]) { Stream<String> str = Arrays.stream(args); @@ -53,55 +54,73 @@ public class Controller extends Server { } + @Override protected void handleRequest(String request, Socket client) { String args[] = request.split(" "); - switch(args[0]) { - case "JOIN": - Integer port = Integer.parseInt(args[1]); - send("LIST", client); + String command = args[0]; + + if (command.equals("JOIN")) { + + Integer port = Integer.parseInt(args[1]); - String files = readSocket(client); + send("LIST", client); - // todo use stream instead - // - List<String[]> filesInfo = new ArrayList<String[]>() {{ - String[] filesAndSizes = files.split("\\|"); - for (String file : filesAndSizes) add(file.split(" ")); - }}; + String files = readSocket(client); - dstoreIndex.addDstore(filesInfo, port, nextID); - threadIDOutput("New Dstore (ID: " + nextID + ") successfully joined"); - nextID++; + // todo use stream instead + // + List<String[]> filesInfo = new ArrayList<String[]>() {{ + String[] filesAndSizes = files.split("\\|"); + for (String file : filesAndSizes) add(file.split(" ")); + }}; + dstoreIndex.addDstore(filesInfo, port, nextID); + threadIDOutput("New Dstore (ID: " + nextID + ") successfully joined"); + nextID++; - send ("ACK", client); - break; + send("ACK", client); + + } + + else if (command.equals("STORE")) { - case "STORE": - String filename = args[1]; - Long filesize = Long.parseLong(args[2]); + String filename = args[1]; + Long filesize = Long.parseLong(args[2]); - DstoreFile file = fileIndex.addFile(filename,filesize); + DstoreFile file = fileIndex.addFile(filename, filesize); - file.setStoreInProgress(true); + file.setStoreInProgress(true); - List<DstoreConnection> dstores = dstoreIndex.getFirstN(r); + List<DstoreConnection> dstores = dstoreIndex.getFirstN(r); + + dstores.stream().forEach(x -> x.addFile(file)); + + String ports = dstores.stream(). + map(x -> Integer.toString(x.getPort())). + collect(Collectors.joining(" ")); + + + send("STORE_TO " + ports, client); + + } - dstores.stream().forEach(x -> x.addFile(file)); + else if (command.equals("STORE_ACK")) { - String ports = dstores.stream(). - map(x -> Integer.toString(x.getPort())). - collect(Collectors.joining(" ")); + String filename = args[1]; + DstoreFile file = fileIndex.get(filename); + file.storeAck(); - send("STORE_TO " + ports, client); + if ( file.ackCheck() ) { + file.setStoreInProgress(false); + file.setStoreComplete(true); + } - break; } } diff --git a/src/ftp/Dstore.java b/src/ftp/Dstore.java index 8c9091df6b773c6642910c3e3fb749eb0be6e558..315694ec9bfb3c638d95649abb14583d89b408be 100644 --- a/src/ftp/Dstore.java +++ b/src/ftp/Dstore.java @@ -1,11 +1,9 @@ package ftp; import ftp.DstoreConnection; -import ftp.Index; import ftp.Server; -import java.io.File; -import java.io.IOException; +import java.io.*; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; @@ -32,35 +30,42 @@ public class Dstore extends Server { * @param file_folder where to store data locally */ public Dstore(int port, int cport, int timeout, String file_folder) { + this.port = port; this.cport = cport; this.timeout = timeout; this.file_folder = file_folder; + Socket controller = null; + connectToController(controller); + + + if (controller != null) { + send("JOIN " + port, controller); + handleRequest(readSocket(controller), controller); + start(); + } + else threadIDErr("Unable to connect to Controller"); + + } + + public Socket connectToController(Socket controller) { Boolean joined = false; + for (int i = 0; (i < 10) && !joined; i++) { try { controller = new Socket("localhost",cport); joined = true; } - catch(IOException e) { + catch (IOException e) { threadIDErr(e.getMessage()); } try {threadIDErr("Retrying Connection..."); Thread.sleep(1000);} - catch(InterruptedException e) {threadIDErr(e.getMessage());} - } - - - if (controller != null) { - send("JOIN " + port, controller); - handleRequest(readSocket(controller), controller); - start(); - } - else { - threadIDErr("Unable to connect to Controller"); + catch (InterruptedException e) {threadIDErr(e.getMessage());} } + return controller; } @@ -117,7 +122,41 @@ public class Dstore extends Server { send("ACK", client); + try { + receiveFile(client, file_folder + "\\" + filename); + } catch (IOException e) { + threadIDErr(e.getMessage()); + } + + + Socket controller = null; + connectToController(controller); + + if (controller != null) { + send("STORE_ACK " + filename, controller); + } + else threadIDErr("Unable to connect to Controller"); + } + } + + + + public void receiveFile(Socket client, String destination) throws IOException { + InputStream in = client.getInputStream(); + + byte[] buf = new byte[1000]; int buflen; + + File outputFile = new File(destination); + FileOutputStream out = new FileOutputStream(outputFile); + + + while ((buflen = in.read(buf)) != -1) { + out.write(buf,0,buflen); } + + + in.close(); out.close(); + } } diff --git a/src/ftp/DstoreConnection.java b/src/ftp/DstoreConnection.java index 8ded0f94e655c2938a2cc9254e18eb281e913d73..b77ca1725fd444b5dfef67b8ccd101d679106551 100644 --- a/src/ftp/DstoreConnection.java +++ b/src/ftp/DstoreConnection.java @@ -10,6 +10,7 @@ public class DstoreConnection { private int id; + public DstoreConnection(List<String[]> files, int port, int id) { this.port = port; this.id = id; @@ -22,11 +23,17 @@ public class DstoreConnection { } - public void addFile(String filename, Long filesize) { file_index.addFile(filename,filesize); } - public void addFile(DstoreFile file) { file_index.add(file); } + public void addFile(String filename, Long filesize) { file_index.addFile(filename,filesize).addDstore(this); } + + public void addFile(DstoreFile file) { file_index.put(file.getFilename(),file).addDstore(this); } + public int getPort() { return port; } + + + public int getID() { return id; } + } diff --git a/src/ftp/DstoreFile.java b/src/ftp/DstoreFile.java index 3e91f3cd15b94ede9eaefee44364e346d2dcb07a..677a1f0aa6c5974f4e80fee6bd227972e0d626d1 100644 --- a/src/ftp/DstoreFile.java +++ b/src/ftp/DstoreFile.java @@ -10,10 +10,18 @@ public class DstoreFile { private boolean removeInProgress; private boolean removeComplete; + private int storeAcksQuota; + private int storeAcks = 0; + + private DstoreIndex dstoreIndex; + + public DstoreFile(String filename, Long filesize) { this.filename = filename; this.filesize = filesize; + + dstoreIndex = new DstoreIndex(); } @@ -23,13 +31,27 @@ public class DstoreFile { public boolean isRemoveInProgress() {return removeInProgress;} public boolean isRemoveComplete() {return removeComplete;} - public void setStoreInProgress(Boolean store) { storeInProgress = store; } public void setStoreComplete(Boolean complete) { storeComplete = complete; } public void setRemoveInProgress(Boolean remove) { removeInProgress = remove; } public void setRemoveComplete(Boolean complete) { removeComplete = complete; } + public String getFilename() { return filename; } public Long getFilesize() { return filesize; } + + + + public void setStoreAcksQuota(int quota) { storeAcksQuota = quota; } + public int getStoreAcks() { return storeAcks; } + public int storeAck() { return storeAcks++; } + + public Boolean storeAckCheck() { return storeAcks++ == storeAcksQuota; } + public Boolean ackCheck() { return storeAcks == storeAcksQuota; } + + + + public void addDstore(DstoreConnection dstore) { dstoreIndex.put(dstore.getID(),dstore); } + } diff --git a/src/ftp/DstoreIndex.java b/src/ftp/DstoreIndex.java index 88532cf1090fa067ea1bdbda2643058fd98cc254..9a56ef153e9930b3fcfca4b1a2427af79a17b6bc 100644 --- a/src/ftp/DstoreIndex.java +++ b/src/ftp/DstoreIndex.java @@ -1,20 +1,29 @@ package ftp; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.stream.Collectors; -public class DstoreIndex extends Index<DstoreConnection> { +public class DstoreIndex extends HashMap<Integer,DstoreConnection> { public DstoreIndex() {} - public DstoreIndex(List<DstoreConnection> list) { - super(list); - } - public DstoreConnection addDstore(List<String[]> files, int port, int id) { + public DstoreConnection addDstore(List<String[]> files, int port, Integer id) { DstoreConnection dstore = new DstoreConnection(files,port,id); - add(dstore); + put(id, dstore); return dstore; } + + + public List<DstoreConnection> getFirstN(int n) { + return entrySet().stream() + .map(x -> x.getValue()) + .limit(n) + .collect(Collectors.toList()); + } + } diff --git a/src/ftp/FileIndex.java b/src/ftp/FileIndex.java index d79efc1319921b1a8199e276fc115b2f03e9f336..4e248322800c0e646189c0f572ce3cd09cad59dc 100644 --- a/src/ftp/FileIndex.java +++ b/src/ftp/FileIndex.java @@ -1,19 +1,21 @@ package ftp; +import java.util.HashMap; import java.util.List; -public class FileIndex extends Index<DstoreFile>{ +public class FileIndex extends HashMap<String,DstoreFile> { public FileIndex() {} public FileIndex(List<DstoreFile> list) { - super(list); + list.stream().forEach(x -> put(x.getFilename(),x)); } + public DstoreFile addFile(String filename, Long filesize) { DstoreFile file = new DstoreFile(filename,filesize); - add(file); + put(filename,file); return file; } diff --git a/src/ftp/Index.java b/src/ftp/Index.java deleted file mode 100644 index cebeb419a2b612d26afe03124b82f8fc72577030..0000000000000000000000000000000000000000 --- a/src/ftp/Index.java +++ /dev/null @@ -1,20 +0,0 @@ -package ftp; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -public class Index<E> extends ArrayList<E> { - - public Index() {} - - public Index(List<E> list) { - super(list); - } - - - public List<E> getFirstN(int n) { - return stream().limit(n).collect(Collectors.toList()); - } - -}