diff --git a/Controller.java b/Controller.java index 3e5f5593f75fccc522255311d3e2ea3cf1f2e47b..0d57d94952a4a47cc0982d2f60557c4c912983a6 100644 --- a/Controller.java +++ b/Controller.java @@ -16,6 +16,7 @@ public class Controller { public static ArrayList<DstoreListener> dstores; public static HashMap<String, GetStorageAcks> ongoingUploads; + public static HashMap<String, GetRemoveAcks> ongoingRemoves; public static Index index; @@ -23,6 +24,7 @@ public class Controller { ControllerLogger.init(Logger.LoggingType.ON_TERMINAL_ONLY); dstores = new ArrayList<DstoreListener>(); ongoingUploads = new HashMap<String, GetStorageAcks>(); + ongoingRemoves = new HashMap<String, GetRemoveAcks>(); index = new Index(); args = Parser.parse(args, 4); @@ -76,15 +78,28 @@ public class Controller { return reply; } + public static DstoreListener getDstoreListener(int port){ + for(DstoreListener dstoreListener : dstores){ + if(dstoreListener.port == port){ + return dstoreListener; + } + } + return null; + } + static class DstoreListener extends Thread { - Socket dstore; + public Socket dstore; public int port; public DstoreListener(Socket dstore, int port){ this.dstore = dstore; this.port = port; } + + public void send(String message) throws IOException{ + sendMessage(dstore, message); + } @Override public void run() { @@ -92,14 +107,20 @@ public class Controller { while(dstore.isConnected()){ try { String line = receiveMessage(dstore); - String[] parsedLine = Parser.parse(line); - String command = parsedLine[0]; - if(command.equals(Protocol.STORE_ACK_TOKEN)){ - String filename = parsedLine[1]; - ongoingUploads.get(filename).ack(port); + if(line != null){ + String[] parsedLine = Parser.parse(line); + String command = parsedLine[0]; + if(command.equals(Protocol.STORE_ACK_TOKEN)){ + String filename = parsedLine[1]; + ongoingUploads.get(filename).ack(port); + }else if(command.equals(Protocol.REMOVE_ACK_TOKEN)){ + String filename = parsedLine[1]; + ongoingRemoves.get(filename).ack(port); + } } } catch (Exception e) { + e.printStackTrace(); //TODO: handle exception } } @@ -120,47 +141,83 @@ public class Controller { ExecutorService executor = Executors.newSingleThreadExecutor(); while(client.isConnected()){ try { - String[] parsedLine = Parser.parse(line); - String command = parsedLine[0]; - if(command.equals(Protocol.STORE_TOKEN)){ - String filename = parsedLine[1]; - if(dstores.size() < R){ - sendMessage(client, Protocol.ERROR_NOT_ENOUGH_DSTORES_TOKEN); - }else if(index.fileExists(filename)){ - sendMessage(client, Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN); - }else{ - index.add(filename, "store in progress"); - int ports[] = new int[R]; - String portsList = ""; - for(int i = 0; i < R; i++){ - ports[i] = dstores.get(i).port; + if(line != null){ + String[] parsedLine = Parser.parse(line); + String command = parsedLine[0]; + if(command.equals(Protocol.STORE_TOKEN)){ + String filename = parsedLine[1]; + int filesize = Integer.parseInt(parsedLine[2]); + if(dstores.size() < R){ + sendMessage(client, Protocol.ERROR_NOT_ENOUGH_DSTORES_TOKEN); + }else if(index.fileExists(filename)){ + sendMessage(client, Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN); + }else{ + int ports[] = new int[R]; + String portsList = ""; + for(int i = 0; i < R; i++){ + ports[i] = dstores.get(i).port; + } + for(int port : ports){ + portsList += port + " "; + } + portsList = portsList.trim(); + + index.add(filename, filesize, "store in progress", ports); + sendMessage(client, Protocol.STORE_TO_TOKEN + " " + portsList); + GetStorageAcks storageAck = new GetStorageAcks(ports); + ongoingUploads.put(filename, storageAck); + Future<Boolean> future = executor.submit(storageAck); + if(future.get(timeout, TimeUnit.MILLISECONDS)){ + sendMessage(client, Protocol.STORE_COMPLETE_TOKEN); + index.changeStatus(filename, "store complete"); + ongoingUploads.remove(filename); + } } - for(int port : ports){ - portsList += port + " "; + }else if(command.equals(Protocol.LIST_TOKEN)){ + sendMessage(client, Protocol.LIST_TOKEN + " " + index.getFileList()); + }else if(command.equals(Protocol.LOAD_TOKEN)){ + String filename = parsedLine[1]; + if(dstores.size() < R){ + sendMessage(client, Protocol.ERROR_NOT_ENOUGH_DSTORES_TOKEN); + }else if(!index.fileExists(filename)){ + sendMessage(client, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN); + }else { + int port = index.getFileInfo(filename).storePorts[0]; + int filesize = index.getFileInfo(filename).filesize; + sendMessage(client, Protocol.LOAD_FROM_TOKEN + " " + port + " " + filesize); } - portsList = portsList.trim(); - - sendMessage(client, Protocol.STORE_TO_TOKEN + " " + portsList); - GetStorageAcks storageAck = new GetStorageAcks(ports); - ongoingUploads.put(filename, storageAck); - Future<Boolean> future = executor.submit(storageAck); - if(future.get(timeout, TimeUnit.MILLISECONDS)){ - sendMessage(client, Protocol.STORE_COMPLETE_TOKEN); - index.changeStatus(filename, "store complete"); + }else if(command.equals(Protocol.REMOVE_TOKEN)){ + String filename = parsedLine[1]; + if(dstores.size() < R){ + sendMessage(client, Protocol.ERROR_NOT_ENOUGH_DSTORES_TOKEN); + }else if(!index.fileExists(filename)){ + sendMessage(client, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN); + }else{ + index.changeStatus(filename, "remove in progress"); + int[] ports = index.getFileInfo(filename).storePorts; + for(int port : ports){ + getDstoreListener(port).send(Protocol.REMOVE_TOKEN + " " + filename); + } + GetRemoveAcks removeAck = new GetRemoveAcks(ports); + ongoingRemoves.put(filename, removeAck); + Future<Boolean> future = executor.submit(removeAck); + if(future.get(timeout, TimeUnit.MILLISECONDS)){ + System.out.println("REMOVE SUCCESS"); + sendMessage(client, Protocol.REMOVE_COMPLETE_TOKEN); + index.remove(filename); + ongoingRemoves.remove(filename); + } } } - }else if(command.equals(Protocol.LIST_TOKEN)){ - sendMessage(client, Protocol.LIST_TOKEN + " " + index.getFileList()); + line = receiveMessage(client); + } - line = receiveMessage(client); } catch (Exception e) { + e.printStackTrace(); break; } - } - - // TODO Auto-generated method stub - + } } } @@ -188,6 +245,31 @@ public class Controller { } } + + static class GetRemoveAcks implements Callable<Boolean> { + HashMap<Integer, Boolean> acks; + + public GetRemoveAcks(int[] ports){ + acks = new HashMap<Integer, Boolean>(); + for(int port : ports){ + acks.put(port, false); + } + } + + public void ack(int port){ + acks.put(port, true); + } + + @Override + public Boolean call() throws Exception { + while(true){ + if(!acks.values().contains(false)){ + return true; + } + } + } + + } } diff --git a/Dstore.java b/Dstore.java index be347b3bc43a8b341ba8a9253b5969b8221ab5b9..8fdc46ebaecfbc47e251b8c11017863b4fadd1c6 100644 --- a/Dstore.java +++ b/Dstore.java @@ -30,6 +30,7 @@ public class Dstore { clientListener.start(); } } catch (Exception e) { + e.printStackTrace(); //TODO: handle exception } } @@ -65,23 +66,46 @@ public class Dstore { while(client.isConnected()){ try { String line = receiveMessage(client); - String[] parsedLine = Parser.parse(line); - String command = parsedLine[0]; - if(command.equals(Protocol.STORE_TOKEN)){ - sendMessage(client, Protocol.ACK_TOKEN); - String filename = parsedLine[1]; - int filesize = Integer.parseInt(parsedLine[2]); - - File file = new File(file_folder, filename); - OutputStream fileOutput = new FileOutputStream(file); - InputStream clientInput = client.getInputStream(); - fileOutput.write(clientInput.readNBytes(filesize)); - fileOutput.close(); + if(line != null){ + String[] parsedLine = Parser.parse(line); + String command = parsedLine[0]; + if(command.equals(Protocol.STORE_TOKEN)){ + sendMessage(client, Protocol.ACK_TOKEN); + String filename = parsedLine[1]; + int filesize = Integer.parseInt(parsedLine[2]); + + File file = new File(file_folder, filename); + OutputStream fileOutput = new FileOutputStream(file); + InputStream clientInput = client.getInputStream(); + fileOutput.write(clientInput.readNBytes(filesize)); + fileOutput.close(); + + sendMessage(controller, Protocol.STORE_ACK_TOKEN + " " + filename); + }else if(command.equals(Protocol.LOAD_DATA_TOKEN)){ + String filename = parsedLine[1]; + + File file = new File(file_folder, filename); + + BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file)); + byte[] bytearray = bis.readAllBytes(); + + OutputStream os = client.getOutputStream(); + os.write(bytearray); + os.flush(); + bis.close(); + }else if(command.equals(Protocol.REMOVE_TOKEN)){ + String filename = parsedLine[1]; + + File file = new File(file_folder, filename); + if(file.delete()){ + sendMessage(controller, Protocol.REMOVE_ACK_TOKEN + " " + filename); + } + } + - sendMessage(controller, Protocol.STORE_ACK_TOKEN + " " + filename); } - } catch (Exception e) { + e.printStackTrace(); //TODO: handle exception } } diff --git a/Index.java b/Index.java index bd3217525c5085c2656074e56aadbd0d2757e8d4..9ec1651b5248c9b3bd4ae9e407affb23de9531c6 100644 --- a/Index.java +++ b/Index.java @@ -3,11 +3,15 @@ import java.util.ArrayList; public class Index { class FileInfo { public String name; + public int filesize; public String status; + public int[] storePorts; - public FileInfo(String name, String status){ + public FileInfo(String name, int filesize, String status, int[] storePorts){ this.name = name; + this.filesize = filesize; this.status = status; + this.storePorts = storePorts; } } @@ -17,15 +21,19 @@ public class Index { index = new ArrayList<FileInfo>(); } - public boolean add(String name, String status){ + public boolean add(String name, int filesize, String status, int[] storePorts){ if(getFileInfo(name) == null){ - index.add(new FileInfo(name, status)); + index.add(new FileInfo(name, filesize, status, storePorts)); return true; }else{ return false; } } + public void remove(String name){ + index.remove(getFileInfo(name)); + } + public String getFileList(){ String file_list = ""; for(FileInfo fInfo : index){ @@ -42,7 +50,7 @@ public class Index { public FileInfo getFileInfo(String name){ for(FileInfo fileInfo : index){ - if(fileInfo.name == name){ + if(fileInfo.name.equals(name)){ return fileInfo; } }