Skip to content
Snippets Groups Projects
Commit eb3412db authored by cg4g19's avatar cg4g19
Browse files

Updated for load and almost remove

parent 6d4aac5b
Branches
No related tags found
No related merge requests found
...@@ -16,6 +16,7 @@ public class Controller { ...@@ -16,6 +16,7 @@ public class Controller {
public static ArrayList<DstoreListener> dstores; public static ArrayList<DstoreListener> dstores;
public static HashMap<String, GetStorageAcks> ongoingUploads; public static HashMap<String, GetStorageAcks> ongoingUploads;
public static HashMap<String, GetRemoveAcks> ongoingRemoves;
public static Index index; public static Index index;
...@@ -23,6 +24,7 @@ public class Controller { ...@@ -23,6 +24,7 @@ public class Controller {
ControllerLogger.init(Logger.LoggingType.ON_TERMINAL_ONLY); ControllerLogger.init(Logger.LoggingType.ON_TERMINAL_ONLY);
dstores = new ArrayList<DstoreListener>(); dstores = new ArrayList<DstoreListener>();
ongoingUploads = new HashMap<String, GetStorageAcks>(); ongoingUploads = new HashMap<String, GetStorageAcks>();
ongoingRemoves = new HashMap<String, GetRemoveAcks>();
index = new Index(); index = new Index();
args = Parser.parse(args, 4); args = Parser.parse(args, 4);
...@@ -76,9 +78,18 @@ public class Controller { ...@@ -76,9 +78,18 @@ public class Controller {
return reply; return reply;
} }
public static DstoreListener getDstoreListener(int port){
for(DstoreListener dstoreListener : dstores){
if(dstoreListener.port == port){
return dstoreListener;
}
}
return null;
}
static class DstoreListener extends Thread { static class DstoreListener extends Thread {
Socket dstore; public Socket dstore;
public int port; public int port;
public DstoreListener(Socket dstore, int port){ public DstoreListener(Socket dstore, int port){
...@@ -86,20 +97,30 @@ public class Controller { ...@@ -86,20 +97,30 @@ public class Controller {
this.port = port; this.port = port;
} }
public void send(String message) throws IOException{
sendMessage(dstore, message);
}
@Override @Override
public void run() { public void run() {
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor();
while(dstore.isConnected()){ while(dstore.isConnected()){
try { try {
String line = receiveMessage(dstore); String line = receiveMessage(dstore);
if(line != null){
String[] parsedLine = Parser.parse(line); String[] parsedLine = Parser.parse(line);
String command = parsedLine[0]; String command = parsedLine[0];
if(command.equals(Protocol.STORE_ACK_TOKEN)){ if(command.equals(Protocol.STORE_ACK_TOKEN)){
String filename = parsedLine[1]; String filename = parsedLine[1];
ongoingUploads.get(filename).ack(port); ongoingUploads.get(filename).ack(port);
}else if(command.equals(Protocol.REMOVE_ACK_TOKEN)){
String filename = parsedLine[1];
ongoingRemoves.get(filename).ack(port);
}
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace();
//TODO: handle exception //TODO: handle exception
} }
} }
...@@ -120,16 +141,17 @@ public class Controller { ...@@ -120,16 +141,17 @@ public class Controller {
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor();
while(client.isConnected()){ while(client.isConnected()){
try { try {
if(line != null){
String[] parsedLine = Parser.parse(line); String[] parsedLine = Parser.parse(line);
String command = parsedLine[0]; String command = parsedLine[0];
if(command.equals(Protocol.STORE_TOKEN)){ if(command.equals(Protocol.STORE_TOKEN)){
String filename = parsedLine[1]; String filename = parsedLine[1];
int filesize = Integer.parseInt(parsedLine[2]);
if(dstores.size() < R){ if(dstores.size() < R){
sendMessage(client, Protocol.ERROR_NOT_ENOUGH_DSTORES_TOKEN); sendMessage(client, Protocol.ERROR_NOT_ENOUGH_DSTORES_TOKEN);
}else if(index.fileExists(filename)){ }else if(index.fileExists(filename)){
sendMessage(client, Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN); sendMessage(client, Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN);
}else{ }else{
index.add(filename, "store in progress");
int ports[] = new int[R]; int ports[] = new int[R];
String portsList = ""; String portsList = "";
for(int i = 0; i < R; i++){ for(int i = 0; i < R; i++){
...@@ -140,6 +162,7 @@ public class Controller { ...@@ -140,6 +162,7 @@ public class Controller {
} }
portsList = portsList.trim(); portsList = portsList.trim();
index.add(filename, filesize, "store in progress", ports);
sendMessage(client, Protocol.STORE_TO_TOKEN + " " + portsList); sendMessage(client, Protocol.STORE_TO_TOKEN + " " + portsList);
GetStorageAcks storageAck = new GetStorageAcks(ports); GetStorageAcks storageAck = new GetStorageAcks(ports);
ongoingUploads.put(filename, storageAck); ongoingUploads.put(filename, storageAck);
...@@ -147,20 +170,54 @@ public class Controller { ...@@ -147,20 +170,54 @@ public class Controller {
if(future.get(timeout, TimeUnit.MILLISECONDS)){ if(future.get(timeout, TimeUnit.MILLISECONDS)){
sendMessage(client, Protocol.STORE_COMPLETE_TOKEN); sendMessage(client, Protocol.STORE_COMPLETE_TOKEN);
index.changeStatus(filename, "store complete"); index.changeStatus(filename, "store complete");
ongoingUploads.remove(filename);
} }
} }
}else if(command.equals(Protocol.LIST_TOKEN)){ }else if(command.equals(Protocol.LIST_TOKEN)){
sendMessage(client, Protocol.LIST_TOKEN + " " + index.getFileList()); sendMessage(client, Protocol.LIST_TOKEN + " " + index.getFileList());
}else if(command.equals(Protocol.LOAD_TOKEN)){
String filename = parsedLine[1];
if(dstores.size() < R){
sendMessage(client, Protocol.ERROR_NOT_ENOUGH_DSTORES_TOKEN);
}else if(!index.fileExists(filename)){
sendMessage(client, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
}else {
int port = index.getFileInfo(filename).storePorts[0];
int filesize = index.getFileInfo(filename).filesize;
sendMessage(client, Protocol.LOAD_FROM_TOKEN + " " + port + " " + filesize);
}
}else if(command.equals(Protocol.REMOVE_TOKEN)){
String filename = parsedLine[1];
if(dstores.size() < R){
sendMessage(client, Protocol.ERROR_NOT_ENOUGH_DSTORES_TOKEN);
}else if(!index.fileExists(filename)){
sendMessage(client, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
}else{
index.changeStatus(filename, "remove in progress");
int[] ports = index.getFileInfo(filename).storePorts;
for(int port : ports){
getDstoreListener(port).send(Protocol.REMOVE_TOKEN + " " + filename);
}
GetRemoveAcks removeAck = new GetRemoveAcks(ports);
ongoingRemoves.put(filename, removeAck);
Future<Boolean> future = executor.submit(removeAck);
if(future.get(timeout, TimeUnit.MILLISECONDS)){
System.out.println("REMOVE SUCCESS");
sendMessage(client, Protocol.REMOVE_COMPLETE_TOKEN);
index.remove(filename);
ongoingRemoves.remove(filename);
}
}
} }
line = receiveMessage(client); line = receiveMessage(client);
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace();
break; break;
} }
} }
// TODO Auto-generated method stub
} }
} }
...@@ -188,6 +245,31 @@ public class Controller { ...@@ -188,6 +245,31 @@ public class Controller {
} }
} }
static class GetRemoveAcks implements Callable<Boolean> {
HashMap<Integer, Boolean> acks;
public GetRemoveAcks(int[] ports){
acks = new HashMap<Integer, Boolean>();
for(int port : ports){
acks.put(port, false);
}
}
public void ack(int port){
acks.put(port, true);
}
@Override
public Boolean call() throws Exception {
while(true){
if(!acks.values().contains(false)){
return true;
}
}
}
}
} }
...@@ -30,6 +30,7 @@ public class Dstore { ...@@ -30,6 +30,7 @@ public class Dstore {
clientListener.start(); clientListener.start();
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace();
//TODO: handle exception //TODO: handle exception
} }
} }
...@@ -65,6 +66,7 @@ public class Dstore { ...@@ -65,6 +66,7 @@ public class Dstore {
while(client.isConnected()){ while(client.isConnected()){
try { try {
String line = receiveMessage(client); String line = receiveMessage(client);
if(line != null){
String[] parsedLine = Parser.parse(line); String[] parsedLine = Parser.parse(line);
String command = parsedLine[0]; String command = parsedLine[0];
if(command.equals(Protocol.STORE_TOKEN)){ if(command.equals(Protocol.STORE_TOKEN)){
...@@ -79,9 +81,31 @@ public class Dstore { ...@@ -79,9 +81,31 @@ public class Dstore {
fileOutput.close(); fileOutput.close();
sendMessage(controller, Protocol.STORE_ACK_TOKEN + " " + filename); sendMessage(controller, Protocol.STORE_ACK_TOKEN + " " + filename);
}else if(command.equals(Protocol.LOAD_DATA_TOKEN)){
String filename = parsedLine[1];
File file = new File(file_folder, filename);
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
byte[] bytearray = bis.readAllBytes();
OutputStream os = client.getOutputStream();
os.write(bytearray);
os.flush();
bis.close();
}else if(command.equals(Protocol.REMOVE_TOKEN)){
String filename = parsedLine[1];
File file = new File(file_folder, filename);
if(file.delete()){
sendMessage(controller, Protocol.REMOVE_ACK_TOKEN + " " + filename);
}
} }
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace();
//TODO: handle exception //TODO: handle exception
} }
} }
......
...@@ -3,11 +3,15 @@ import java.util.ArrayList; ...@@ -3,11 +3,15 @@ import java.util.ArrayList;
public class Index { public class Index {
class FileInfo { class FileInfo {
public String name; public String name;
public int filesize;
public String status; public String status;
public int[] storePorts;
public FileInfo(String name, String status){ public FileInfo(String name, int filesize, String status, int[] storePorts){
this.name = name; this.name = name;
this.filesize = filesize;
this.status = status; this.status = status;
this.storePorts = storePorts;
} }
} }
...@@ -17,15 +21,19 @@ public class Index { ...@@ -17,15 +21,19 @@ public class Index {
index = new ArrayList<FileInfo>(); index = new ArrayList<FileInfo>();
} }
public boolean add(String name, String status){ public boolean add(String name, int filesize, String status, int[] storePorts){
if(getFileInfo(name) == null){ if(getFileInfo(name) == null){
index.add(new FileInfo(name, status)); index.add(new FileInfo(name, filesize, status, storePorts));
return true; return true;
}else{ }else{
return false; return false;
} }
} }
public void remove(String name){
index.remove(getFileInfo(name));
}
public String getFileList(){ public String getFileList(){
String file_list = ""; String file_list = "";
for(FileInfo fInfo : index){ for(FileInfo fInfo : index){
...@@ -42,7 +50,7 @@ public class Index { ...@@ -42,7 +50,7 @@ public class Index {
public FileInfo getFileInfo(String name){ public FileInfo getFileInfo(String name){
for(FileInfo fileInfo : index){ for(FileInfo fileInfo : index){
if(fileInfo.name == name){ if(fileInfo.name.equals(name)){
return fileInfo; return fileInfo;
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment