diff --git a/Controller$IndexEntry.class b/Controller$IndexEntry.class new file mode 100644 index 0000000000000000000000000000000000000000..1a3af9c4b1553341ae7057985d0c3bb2d51cc7fd Binary files /dev/null and b/Controller$IndexEntry.class differ diff --git a/Controller.class b/Controller.class index 366a48b3ad9a08deaf72845574e63c4516c2d101..338ca1cac64638e4f4032704a11ffa1948b256ee 100644 Binary files a/Controller.class and b/Controller.class differ diff --git a/Controller.java b/Controller.java index c44d8a7ffddd86b9293c9feeb8a5214d8dbf4a51..d9661c4ebbae6fa9bc9bcc774ef41419766e163b 100644 --- a/Controller.java +++ b/Controller.java @@ -1,7 +1,10 @@ import java.io.*; import java.net.*; +import java.lang.Runnable; import java.util.List; import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; public class Controller { protected int cport; //Port to listen on @@ -9,14 +12,118 @@ public class Controller { protected int timeout; //in milliseconds protected int rebalancePeriod; //How long to wait to start the next rebalance operation, in milliseconds + protected class IndexEntry { + protected int filesize; + protected List<Integer> storedBy; + protected int numberToStore; + protected String status; + protected Object storeAckLock; + + public IndexEntry() { + filesize = -1; + storedBy = new SyncList(); + numberToStore = 0; + status = "store in progress"; + storeAckLock = new Object(); + } + + public synchronized void setFilesize(int filesize) { + this.filesize = filesize; + } + + public synchronized int getFilesize() { + return filesize; + } + + public void addStoredBy(int dstore) { + storedBy.add(new Integer(dstore)); + if(storedBy.size() == numberToStore) storeAckLock.notify(); + } + + public void addStoredBy(List<Integer> dstores) { + storedBy.addAll(dstores); + if(storedBy.size() == numberToStore) storeAckLock.notify(); + } + + public List<Integer> getStoredBy() { + return storedBy; + } + + public synchronized void setNumberToStore(int i) { + numberToStore = i; + } + + public synchronized void setStatus(String status) { + this.status = status; + } + + public synchronized int getStatus() { + return status; + } + + public Object getLock() { + return storeAckLock; + } + } + + protected class SyncList extends ArrayList<Integer> { + public SyncList() { + super(); + } + + @Override + public boolean add(Integer i) { + synchronized(this) { + super.add(i); + } + } + + @Override + public boolean addAll(Collection<Integer> c) { + synchronized(this) { + super.addAll(c); + } + } + + @Override + public Integer get(int i) { + synchronized(this) { + super.get(i); + } + } + + @Override + public int size() { + synchronized(this) { + super.size(); + } + } + + @Override + public boolean remove(int i) { + synchronized(this) { + super.remove(i); + } + } + + @Override + public boolean remove(Integer i) { + synchronized(this) { + super.remove(i); + } + } + } + protected List<Integer> dstores; + protected Map<String,IndexEntry> index; public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) { this.cport = cport; this.rFactor = rFactor; this.timeout = timeout; this.rebalancePeriod = rebalancePeriod; - dstores = new ArrayList<Integer>(); + dstores = new SyncList(); + index = new HashMap<String,IndexEntry>(); } public static void main(String[] args) { @@ -58,6 +165,7 @@ public class Controller { in.close(); } catch(Exception e) { + //Log error e.printStackTrace(); System.out.println("Continue..."); } @@ -68,7 +176,7 @@ public class Controller { } } - void handleMessage(String[] message, Socket client) { + void handleMessage(String[] message, Socket client) throws Exception { if(message[0].equals("JOIN")) { dstores.add(Integer.parseInt(message[1])); rebalance(); @@ -76,8 +184,14 @@ public class Controller { else if(message[0].equals("STORE")) { store(client, message[1]); } + else if(message[0].equals("STORE_ACK")) { + storeAck(client, message[1]); + } else if(message[0].equals("LOAD")) { - load(client, message[1]); + load(client, message[1], false); + } + else if(message[0].equals("RELOAD")) { + load(client, message[1], true); } else if(message[0].equals("REMOVE")) { remove(client, message[1]); @@ -85,50 +199,126 @@ public class Controller { else if(message[0].equals("LIST")) { list(client); } + else { + //Log error and continue (throw exception?) + } } - void store(Socket client, String filename) { - //Update index to "store in progress" - - //Select Dstores - int[] storesToStore = new int[rFactor]; - for(int i=0; i<rFactor; i++) { - storesToStore[i] = dstores.get(i).intValue(); + void store(Socket client, String filename) throws Exception { + new Thread(() -> { + try { + if(index.containsKey(filename)) { + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("ERROR ALREADY_EXISTS " + filename); + out.flush(); + out.close(); + return; + } + + //Update index to "store in progress" + IndexEntry entry = new IndexEntry(); + index.put(filename, entry); + + //Select Dstores + int[] storesToStore = new int[rFactor]; + for(int i=0; i<rFactor; i++) { + Integer thisStore = dstores.get(i); + storesToStore[i] = thisStore.intValue(); + } + entry.setNumberToStore(rFactor); + + //Send STORE_TO message + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("STORE_TO"); + for(int port : storesToStore) { + out.print(" " + port); + } + out.flush(); + + //Wait for STORE_ACKs from datastores in storesToStore + try { + entry.getLock().wait(timeout); + } + catch(InterruptedException e) { + e.printStackTrace(); + } + + if(entry.getStoredBy().size() < rFactor) { + //Log error + } + + //Update index to "store complete" + entry.status = "store complete"; + + //Send STORE_COMPLETE message + out.print("STORE_COMPLETE"); + out.flush(); + out.close(); + } + catch(IOException e) { + e.printStackTrace(); + } + }).start(); + } + + void storeAck(Socket client, String filename) throws Exception { + if(!index.containsKey(filename)) { + //Throw logging exception + return; } - //Send STORE_TO message - PrintWriter out = new PrintWriter(client.getOutputStream()); - out.print("STORE_TO"); - for(int port : storesToStore) { - out.print(" "); - out.print(port); + IndexEntry thisEntry = index.get(filename); + thisEntry.addStoredBy(new Integer(client.getPort())); + } + + void load(Socket client, String filename, boolean reload) throws Exception { + if(!index.containsKey(filename)) { + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("ERROR DOES_NOT_EXIST"); + out.flush(); + out.close(); } - out.flush(); - //Wait for STORE_ACKs from datastores in storesToStore + //Select a Dstore which contains the file + IndexEntry thisEntry = index.get(filename); + int thisStore = thisEntry.storedBy.get(0).intValue(); + int thisSize = thisEntry.filesize; - //Update index to "store complete" + // !!TO DO: RELOAD COMMAND!! - //Send STORE_COMPLETE message - out.print("STORE_COMPLETE"); + //Send LOAD_FROM message + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("LOAD_FROM " + thisStore + " " + thisSize); out.flush(); out.close(); } - void load(Socket client, String filename) { - //Select a Dstore which contains the file + void remove(Socket client, String filename) throws Exception { + if(!index.containsKey(filename)) { + PrintWriter clientOut = new PrintWriter(client.getOutputStream()); + clientOut.print("ERROR DOES_NOT_EXIST"); + clientOut.flush(); + clientOut.close(); + } - //Send LOAD_FROM message - } - - void remove(Socket client, String filename) { //Update index to "remove in progress" + IndexEntry entry = index.get(filename); + entry.status = "remove in progress"; //Send REMOVE message to all Dstores storing the file + for(Integer dstore : entry.storedBy) { + Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); + PrintWriter out = new PrintWriter(socket.getOutputStream()); + out.write("REMOVE " + filename); + out.flush(); + out.close(); + socket.close(); + } //Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message //Update index to "remove complete" + entry.status = "remove complete"; //Send REMOVE_COMPLETE to client PrintWriter clientOut = new PrintWriter(client.getOutputStream()); @@ -137,11 +327,17 @@ public class Controller { clientOut.close(); } - void list(Socket client) { + void list(Socket client) throws Exception { //Send file list to client + PrintWriter out = new PrintWriter(client.getOutputStream()); + for(String name : index.keySet()) { + out.println(name); + } + out.flush(); + out.close(); } - void rebalance() { + void rebalance() throws Exception { //Send LIST message to each Dstore and receive their file list //Create a new file allocation so that: diff --git a/Dstore.class b/Dstore.class index a22d81e7a3552d109af70b70e6b19b6cf1041d70..508e972562b57c9e441796e3b9ff255a2b8d89b9 100644 Binary files a/Dstore.class and b/Dstore.class differ diff --git a/Dstore.java b/Dstore.java index da2d0a9591d268411f88ecb258fab8960917c92e..7a40e16fcc588aa6366b233d76d0318bdc151efa 100644 --- a/Dstore.java +++ b/Dstore.java @@ -1,4 +1,5 @@ import java.io.*; +import java.nio.file.Files; import java.net.*; import java.util.Map; import java.util.HashMap; @@ -51,10 +52,11 @@ public class Dstore { Socket client = server.accept(); BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); String[] message = in.readLine().split(" "); - handleMessage(message, client); + handleMessage(message, client, in); in.close(); } catch(Exception e) { + //Log error e.printStackTrace(); } } @@ -64,9 +66,9 @@ public class Dstore { } } - void handleMessage(String[] message, Socket client) { + void handleMessage(String[] message, Socket client, BufferedReader clientIn) throws Exception { if(message[0].equals("STORE")) { - store(client, message[1], Integer.parseInt(message[2])); + store(client, message[1], Integer.parseInt(message[2]), clientIn); } else if(message[0].equals("LOAD_DATA")) { load(client, message[1]); @@ -80,33 +82,90 @@ public class Dstore { else if(message[0].equals("REBALANCE")) { rebalance(client, message); } + else { + //Log error and continue (throw exception?) + } } - void store(Socket client, String filename, int filesize) { + void store(Socket client, String filename, int filesize, BufferedReader in) throws Exception { //Send ACK message to client + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("ACK"); + out.flush(); + out.close(); - //Receive file content from client + FileOutputStream writer = new FileOutputStream(fileFolder + "/" + filename, false); - //Store the file data in fileFolder + //Receive + write file content from client + int byteCount = filesize; + while(byteCount > 0) { + byte[] nextLine = in.readLine().getBytes(); + writer.write(nextLine); + writer.flush(); + byteCount -= nextLine.length; + } + writer.close(); //Send STORE_ACK message to the Controller + PrintWriter controllerOut = new PrintWriter(new Socket(InetAddress.getLocalHost(), cport).getOutputStream()); + controllerOut.print("STORE_ACK " + filename); + controllerOut.flush(); + controllerOut.close(); } - void load(Socket client, String filename) { + void load(Socket client, String filename) throws Exception { //Send the content of the file in fileFolder to the client + PrintWriter out = new PrintWriter(client.getOutputStream()); + FileInputStream reader; + try { + reader = new FileInputStream(fileFolder + "/" + filename); + } + catch(FileNotFoundException e) { + out.print("ERROR DOES_NOT_EXIST"); + out.flush(); + out.close(); + return; + } + + byte[] buf = new byte[8]; + while(reader.read(buf) != -1) { + out.print(new String(buf)); + out.flush(); + } + + reader.close(); + out.close(); } - void remove(Socket client, String filename) { + void remove(Socket client, String filename) throws Exception { //Remove the file from fileFolder + Path path = new File(fileFolder + "/" + filename).toPath(); + PrintWriter out = new PrintWriter(client.getOutputStream()); + + if(Files.deleteIfExists(path)) { + //Send REMOVE_ACK message to client (the controller) + out.print("REMOVE_ACK"); + } + else { + //Send DOES NOT EXIST error + out.print("ERROR DOES_NOT_EXIST " + filename); + } - //Send REMOVE_ACK message to client (the controller) + out.flush(); + out.close(); } - void list(Socket client) { + void list(Socket client) throws Exception { //Send a list of all files in fileFolder to client (the controller) + PrintWriter out = new PrintWriter(client.getOutputStream()); + for(File file : new File(fileFolder).listFiles()) { + out.print(file.getName()); + out.flush(); + } + out.close(); } - void rebalance(Socket client, String[] message) { + void rebalance(Socket client, String[] message) throws Exception { //Interpret files to send and files to remove from the message Map<String,Integer[]> filesToSend; String[] filesToRemove; @@ -139,9 +198,21 @@ public class Dstore { } //Send each file to send to the Dstore at the specified port number + for(String filename : filesToSend.keySet()) { + for(Integer dstore : filesToSend.get(filename)) { + //Same store functions as used in the client object + } + } //Remove each file to remove from fileFolder + for(String filename : filesToRemove) { + new File(fileFolder + "/" + filename).delete(); + } //Send REBALANCE_COMPLETE message to client (the controller) + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("REBALANCE COMPLETE"); + out.flush(); + out.close(); } }