Skip to content
Snippets Groups Projects
Commit a10f2bce authored by ik1g19's avatar ik1g19
Browse files

add more progress to store operation

parent e8cca92a
No related branches found
No related tags found
No related merge requests found
...@@ -39,6 +39,7 @@ public class Controller extends Server { ...@@ -39,6 +39,7 @@ public class Controller extends Server {
} }
public static void main(String args[]) { public static void main(String args[]) {
Stream<String> str = Arrays.stream(args); Stream<String> str = Arrays.stream(args);
...@@ -53,12 +54,16 @@ public class Controller extends Server { ...@@ -53,12 +54,16 @@ public class Controller extends Server {
} }
@Override @Override
protected void handleRequest(String request, Socket client) { protected void handleRequest(String request, Socket client) {
String args[] = request.split(" "); String args[] = request.split(" ");
switch(args[0]) {
case "JOIN": String command = args[0];
if (command.equals("JOIN")) {
Integer port = Integer.parseInt(args[1]); Integer port = Integer.parseInt(args[1]);
send("LIST", client); send("LIST", client);
...@@ -79,9 +84,10 @@ public class Controller extends Server { ...@@ -79,9 +84,10 @@ public class Controller extends Server {
send("ACK", client); send("ACK", client);
break; }
else if (command.equals("STORE")) {
case "STORE":
String filename = args[1]; String filename = args[1];
Long filesize = Long.parseLong(args[2]); Long filesize = Long.parseLong(args[2]);
...@@ -101,7 +107,20 @@ public class Controller extends Server { ...@@ -101,7 +107,20 @@ public class Controller extends Server {
send("STORE_TO " + ports, client); send("STORE_TO " + ports, client);
break; }
else if (command.equals("STORE_ACK")) {
String filename = args[1];
DstoreFile file = fileIndex.get(filename);
file.storeAck();
if ( file.ackCheck() ) {
file.setStoreInProgress(false);
file.setStoreComplete(true);
}
} }
} }
......
package ftp; package ftp;
import ftp.DstoreConnection; import ftp.DstoreConnection;
import ftp.Index;
import ftp.Server; import ftp.Server;
import java.io.File; import java.io.*;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
...@@ -32,15 +30,31 @@ public class Dstore extends Server { ...@@ -32,15 +30,31 @@ public class Dstore extends Server {
* @param file_folder where to store data locally * @param file_folder where to store data locally
*/ */
public Dstore(int port, int cport, int timeout, String file_folder) { public Dstore(int port, int cport, int timeout, String file_folder) {
this.port = port; this.port = port;
this.cport = cport; this.cport = cport;
this.timeout = timeout; this.timeout = timeout;
this.file_folder = file_folder; this.file_folder = file_folder;
Socket controller = null; Socket controller = null;
connectToController(controller);
if (controller != null) {
send("JOIN " + port, controller);
handleRequest(readSocket(controller), controller);
start();
}
else threadIDErr("Unable to connect to Controller");
}
public Socket connectToController(Socket controller) {
Boolean joined = false; Boolean joined = false;
for (int i = 0; (i < 10) && !joined; i++) { for (int i = 0; (i < 10) && !joined; i++) {
try { controller = new Socket("localhost",cport); joined = true; } try { controller = new Socket("localhost",cport); joined = true; }
catch (IOException e) { catch (IOException e) {
...@@ -51,16 +65,7 @@ public class Dstore extends Server { ...@@ -51,16 +65,7 @@ public class Dstore extends Server {
catch (InterruptedException e) {threadIDErr(e.getMessage());} catch (InterruptedException e) {threadIDErr(e.getMessage());}
} }
return controller;
if (controller != null) {
send("JOIN " + port, controller);
handleRequest(readSocket(controller), controller);
start();
}
else {
threadIDErr("Unable to connect to Controller");
}
} }
...@@ -117,7 +122,41 @@ public class Dstore extends Server { ...@@ -117,7 +122,41 @@ public class Dstore extends Server {
send("ACK", client); send("ACK", client);
try {
receiveFile(client, file_folder + "\\" + filename);
} catch (IOException e) {
threadIDErr(e.getMessage());
}
Socket controller = null;
connectToController(controller);
if (controller != null) {
send("STORE_ACK " + filename, controller);
}
else threadIDErr("Unable to connect to Controller");
}
}
public void receiveFile(Socket client, String destination) throws IOException {
InputStream in = client.getInputStream();
byte[] buf = new byte[1000]; int buflen;
File outputFile = new File(destination);
FileOutputStream out = new FileOutputStream(outputFile);
while ((buflen = in.read(buf)) != -1) {
out.write(buf,0,buflen);
} }
in.close(); out.close();
} }
} }
...@@ -10,6 +10,7 @@ public class DstoreConnection { ...@@ -10,6 +10,7 @@ public class DstoreConnection {
private int id; private int id;
public DstoreConnection(List<String[]> files, int port, int id) { public DstoreConnection(List<String[]> files, int port, int id) {
this.port = port; this.port = port;
this.id = id; this.id = id;
...@@ -22,11 +23,17 @@ public class DstoreConnection { ...@@ -22,11 +23,17 @@ public class DstoreConnection {
} }
public void addFile(String filename, Long filesize) { file_index.addFile(filename,filesize); }
public void addFile(DstoreFile file) { file_index.add(file); } public void addFile(String filename, Long filesize) { file_index.addFile(filename,filesize).addDstore(this); }
public void addFile(DstoreFile file) { file_index.put(file.getFilename(),file).addDstore(this); }
public int getPort() { return port; } public int getPort() { return port; }
public int getID() { return id; }
} }
...@@ -10,10 +10,18 @@ public class DstoreFile { ...@@ -10,10 +10,18 @@ public class DstoreFile {
private boolean removeInProgress; private boolean removeInProgress;
private boolean removeComplete; private boolean removeComplete;
private int storeAcksQuota;
private int storeAcks = 0;
private DstoreIndex dstoreIndex;
public DstoreFile(String filename, Long filesize) { public DstoreFile(String filename, Long filesize) {
this.filename = filename; this.filename = filename;
this.filesize = filesize; this.filesize = filesize;
dstoreIndex = new DstoreIndex();
} }
...@@ -23,13 +31,27 @@ public class DstoreFile { ...@@ -23,13 +31,27 @@ public class DstoreFile {
public boolean isRemoveInProgress() {return removeInProgress;} public boolean isRemoveInProgress() {return removeInProgress;}
public boolean isRemoveComplete() {return removeComplete;} public boolean isRemoveComplete() {return removeComplete;}
public void setStoreInProgress(Boolean store) { storeInProgress = store; } public void setStoreInProgress(Boolean store) { storeInProgress = store; }
public void setStoreComplete(Boolean complete) { storeComplete = complete; } public void setStoreComplete(Boolean complete) { storeComplete = complete; }
public void setRemoveInProgress(Boolean remove) { removeInProgress = remove; } public void setRemoveInProgress(Boolean remove) { removeInProgress = remove; }
public void setRemoveComplete(Boolean complete) { removeComplete = complete; } public void setRemoveComplete(Boolean complete) { removeComplete = complete; }
public String getFilename() { return filename; } public String getFilename() { return filename; }
public Long getFilesize() { return filesize; } public Long getFilesize() { return filesize; }
public void setStoreAcksQuota(int quota) { storeAcksQuota = quota; }
public int getStoreAcks() { return storeAcks; }
public int storeAck() { return storeAcks++; }
public Boolean storeAckCheck() { return storeAcks++ == storeAcksQuota; }
public Boolean ackCheck() { return storeAcks == storeAcksQuota; }
public void addDstore(DstoreConnection dstore) { dstoreIndex.put(dstore.getID(),dstore); }
} }
package ftp; package ftp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
public class DstoreIndex extends Index<DstoreConnection> { public class DstoreIndex extends HashMap<Integer,DstoreConnection> {
public DstoreIndex() {} public DstoreIndex() {}
public DstoreIndex(List<DstoreConnection> list) {
super(list);
}
public DstoreConnection addDstore(List<String[]> files, int port, int id) { public DstoreConnection addDstore(List<String[]> files, int port, Integer id) {
DstoreConnection dstore = new DstoreConnection(files,port,id); DstoreConnection dstore = new DstoreConnection(files,port,id);
add(dstore); put(id, dstore);
return dstore; return dstore;
} }
public List<DstoreConnection> getFirstN(int n) {
return entrySet().stream()
.map(x -> x.getValue())
.limit(n)
.collect(Collectors.toList());
}
} }
package ftp; package ftp;
import java.util.HashMap;
import java.util.List; import java.util.List;
public class FileIndex extends Index<DstoreFile>{ public class FileIndex extends HashMap<String,DstoreFile> {
public FileIndex() {} public FileIndex() {}
public FileIndex(List<DstoreFile> list) { public FileIndex(List<DstoreFile> list) {
super(list); list.stream().forEach(x -> put(x.getFilename(),x));
} }
public DstoreFile addFile(String filename, Long filesize) { public DstoreFile addFile(String filename, Long filesize) {
DstoreFile file = new DstoreFile(filename,filesize); DstoreFile file = new DstoreFile(filename,filesize);
add(file); put(filename,file);
return file; return file;
} }
......
package ftp;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class Index<E> extends ArrayList<E> {
public Index() {}
public Index(List<E> list) {
super(list);
}
public List<E> getFirstN(int n) {
return stream().limit(n).collect(Collectors.toList());
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment