diff --git a/Controller.java b/Controller.java index 0d57d94952a4a47cc0982d2f60557c4c912983a6..54a5d8205cdccba5f75bbb225eb299babb3c7deb 100644 --- a/Controller.java +++ b/Controller.java @@ -1,6 +1,7 @@ import java.io.*; import java.net.*; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -46,6 +47,17 @@ public class Controller { DstoreListener dstoreListener = new DstoreListener(client, port); dstores.add(dstoreListener); dstoreListener.start(); + + /*for(DstoreListener listener : dstores){ + Socket dstore = listener.dstore; + System.out.println("DSTORES: " + dstores.size()); + line = sendMessageAndAwaitReply(dstore, Protocol.LIST_TOKEN); + parsedLine = Parser.parse(line); + command = parsedLine[0]; + String[] dstorefile_list = Arrays.copyOfRange(parsedLine, 1, parsedLine.length); + + }*/ + }else{ ClientListener clientListener = new ClientListener(client, line); clientListener.start(); @@ -56,9 +68,15 @@ public class Controller { //SOCKET SEND AND RECEIVE public static void sendMessage(Socket socket, String message) throws IOException { - PrintWriter out = new PrintWriter(socket.getOutputStream(), true); - out.println(message); - ControllerLogger.getInstance().messageSent(socket, message); + try { + PrintWriter out = new PrintWriter(socket.getOutputStream(), true); + out.println(message); + ControllerLogger.getInstance().messageSent(socket, message); + + } catch (Exception e) { + System.out.println("ERROR SENDING MESSAGE:\n"); + e.printStackTrace(); + } } public static String receiveMessage(Socket socket) throws IOException{ @@ -88,6 +106,7 @@ public class Controller { } + static class DstoreListener extends Thread { public Socket dstore; public int port; @@ -100,6 +119,10 @@ public class Controller { public void send(String message) throws IOException{ sendMessage(dstore, message); } + + public void remove(){ + dstores.remove(this); + } @Override public void run() { @@ -117,6 +140,8 @@ public class Controller { String filename = parsedLine[1]; ongoingRemoves.get(filename).ack(port); } + }else{ + break; } } catch (Exception e) { @@ -124,6 +149,7 @@ public class Controller { //TODO: handle exception } } + System.out.println("Dstore disconnected!"); } } @@ -196,28 +222,37 @@ public class Controller { index.changeStatus(filename, "remove in progress"); int[] ports = index.getFileInfo(filename).storePorts; for(int port : ports){ - getDstoreListener(port).send(Protocol.REMOVE_TOKEN + " " + filename); + sendMessage(getDstoreListener(port).dstore, Protocol.REMOVE_TOKEN + " " + filename); } GetRemoveAcks removeAck = new GetRemoveAcks(ports); ongoingRemoves.put(filename, removeAck); + Future<Boolean> future = executor.submit(removeAck); if(future.get(timeout, TimeUnit.MILLISECONDS)){ - System.out.println("REMOVE SUCCESS"); sendMessage(client, Protocol.REMOVE_COMPLETE_TOKEN); index.remove(filename); ongoingRemoves.remove(filename); } } + }else if(command.equals(Protocol.RELOAD_TOKEN)){ + String filename = parsedLine[1]; + if(index.removeFirstPort(filename)){ + int port = index.getFileInfo(filename).storePorts[0]; + int filesize = index.getFileInfo(filename).filesize; + sendMessage(client, Protocol.LOAD_FROM_TOKEN + " " + port + " " + filesize); + }else{ + sendMessage(client, Protocol.ERROR_LOAD_TOKEN); + } } line = receiveMessage(client); } } catch (Exception e) { - e.printStackTrace(); + //If future times out break; } - } + } } } diff --git a/Dstore.java b/Dstore.java index 8fdc46ebaecfbc47e251b8c11017863b4fadd1c6..7918dba547a9cde30a77395a6eecf534c3ab670f 100644 --- a/Dstore.java +++ b/Dstore.java @@ -24,6 +24,8 @@ public class Dstore { try { Socket controller = new Socket("localhost", cport); sendMessage(controller, "JOIN " + port); + ControllerListener controllerListener = new ControllerListener(controller); + controllerListener.start(); while(true){ Socket client = dstoreServer.accept(); ClientListener clientListener = new ClientListener(client, controller); @@ -85,6 +87,9 @@ public class Dstore { String filename = parsedLine[1]; File file = new File(file_folder, filename); + if(!file.exists()){ + client.close(); + } BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file)); byte[] bytearray = bis.readAllBytes(); @@ -93,13 +98,6 @@ public class Dstore { os.write(bytearray); os.flush(); bis.close(); - }else if(command.equals(Protocol.REMOVE_TOKEN)){ - String filename = parsedLine[1]; - - File file = new File(file_folder, filename); - if(file.delete()){ - sendMessage(controller, Protocol.REMOVE_ACK_TOKEN + " " + filename); - } } @@ -112,4 +110,44 @@ public class Dstore { } } + + static class ControllerListener extends Thread { + Socket controller; + + public ControllerListener(Socket controller){ + this.controller = controller; + } + + @Override + public void run(){ + while(controller.isConnected()){ + try { + String line = receiveMessage(controller); + if(line != null){ + String[] parsedLine = Parser.parse(line); + String command = parsedLine[0]; + if(command.equals(Protocol.REMOVE_TOKEN)){ + String filename = parsedLine[1]; + + File file = new File(file_folder, filename); + if(file.delete()){ + sendMessage(controller, Protocol.REMOVE_ACK_TOKEN + " " + filename); + }else{ + sendMessage(controller, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN + " " + filename); + } + }else if(command.equals(Protocol.LIST_TOKEN)){ + String file_list = ""; + for(File file : file_folder.listFiles()){ + file_list += file.getName() + " "; + } + file_list = file_list.trim(); + sendMessage(controller, Protocol.LIST_TOKEN + " " + file_list); + } + } + } catch (Exception e) { + //TODO: handle exception + } + } + } + } } diff --git a/Index.java b/Index.java index 9ec1651b5248c9b3bd4ae9e407affb23de9531c6..4a0b4f225d17cc5b88025a57c8a22fe6de104d10 100644 --- a/Index.java +++ b/Index.java @@ -1,4 +1,5 @@ import java.util.ArrayList; +import java.util.Arrays; public class Index { class FileInfo { @@ -34,6 +35,16 @@ public class Index { index.remove(getFileInfo(name)); } + public boolean removeFirstPort(String name){ + int[] storePorts = getFileInfo(name).storePorts; + if(storePorts.length == 1){ + return false; + }else{ + getFileInfo(name).storePorts = Arrays.copyOfRange(storePorts, 1, storePorts.length); + return true; + } + } + public String getFileList(){ String file_list = ""; for(FileInfo fInfo : index){ @@ -60,4 +71,5 @@ public class Index { public boolean fileExists(String name){ return (getFileInfo(name) != null); } + } \ No newline at end of file diff --git a/Parser.java b/Parser.java new file mode 100644 index 0000000000000000000000000000000000000000..4aafa7e3f24b5342985de25e5d986c179b36bf1c --- /dev/null +++ b/Parser.java @@ -0,0 +1,22 @@ +import java.io.IOException; + +public class Parser { + public static void main(String[] args) throws IOException { + + } + + public static String[] parse(String args, int amount) throws IOException { + String[] splitArgs = args.split(" "); + if(splitArgs.length != (amount + 1)) throw new IOException("Invalid paramters\nExpected:" + amount + "\nActual:" + splitArgs.length); + return splitArgs; + } + + public static String[] parse(String[] args, int amount) throws IOException { + if(args.length != (amount)) throw new IOException("Invalid paramters\nExpected:" + amount + "\nActual:" + args.length); + return args; + } + + public static String[] parse(String args){ + return args.split(" "); + } +} \ No newline at end of file