diff --git a/Controller$SyncList.class b/Controller$SyncList.class deleted file mode 100644 index 3aa93869f34b5831c8f1487af11b00f6e1812c3e..0000000000000000000000000000000000000000 Binary files a/Controller$SyncList.class and /dev/null differ diff --git a/Controller.java b/Controller.java index cf4f95b0bc8fed95acfc32ebaf1acf3e1b2df6ea..df943c116c7ddec8ae056f79e6d35d2e8243dad2 100644 --- a/Controller.java +++ b/Controller.java @@ -83,52 +83,6 @@ public class Controller { } } - protected class SyncList extends ArrayList<Integer> { - public SyncList() { - super(); - } - - @Override - public boolean add(Integer i) { - synchronized(this) { - return super.add(i); - } - } - - @Override - public boolean addAll(Collection<? extends Integer> c) { - synchronized(this) { - return super.addAll(c); - } - } - - @Override - public Integer get(int i) { - synchronized(this) { - return super.get(i); - } - } - - @Override - public int size() { - synchronized(this) { - return super.size(); - } - } - - public Integer remove(int i) { - synchronized(this) { - return super.remove(i); - } - } - - public boolean remove(Integer i) { - synchronized(this) { - return super.remove(i); - } - } - } - protected class Reloader { public boolean reload; public Reloader() { @@ -136,7 +90,7 @@ public class Controller { } } - protected List<Integer> dstores; + protected Map<Integer,Socket> dstores; protected Map<Integer,String[]> rebalanceMessages; protected Map<String,IndexEntry> index; @@ -145,7 +99,7 @@ public class Controller { this.rFactor = rFactor; this.timeout = timeout; this.rebalancePeriod = rebalancePeriod; - dstores = new SyncList(); + dstores = Collections.synchronizedMap(new HashMap<Integer,Socket>()); index = new HashMap<String,IndexEntry>(); } @@ -201,16 +155,13 @@ public class Controller { void handleMessage(String[] message, Socket client) throws Exception { if(message[0].equals("JOIN")) { - dstores.add(Integer.parseInt(message[1])); + dstores.add(Integer.parseInt(message[1]), client); System.out.println("Dstore at " + message[1] + " joined"); rebalance(); } else if(message[0].equals("STORE")) { store(client, message[1]); } - else if(message[0].equals("STORE_ACK")) { - storeAck(client, message[1]); - } else if(message[0].equals("LOAD")) { load(client, message[1]); } @@ -260,10 +211,19 @@ public class Controller { //Send STORE_TO message PrintWriter out = new PrintWriter(client.getOutputStream()); out.print("STORE_TO"); + out.flush(); for(int port : storesToStore) { out.print(" " + port); + new Thread(() -> { + String[] message = dstores.get(Integer.valueOf(port)).receive().split(" "); + if(message[0].equals("STORE_ACK")) { + storeAck(Integer.valueOf(port), message[1]); + } + else { + //Log error + } + }).start(); } - out.flush(); //Wait for STORE_ACKs from datastores in storesToStore try { @@ -291,16 +251,14 @@ public class Controller { }).start(); } - void storeAck(Socket client, String filename) throws Exception { - new Thread(() -> { - if(!index.containsKey(filename)) { - //Throw logging exception - return; - } + void storeAck(Integer port, String filename) throws Exception { + if(!index.containsKey(filename)) { + //Throw logging exception + return; + } - IndexEntry thisEntry = index.get(filename); - thisEntry.addStoredBy(Integer.valueOf(client.getPort())); - }).start(); + IndexEntry thisEntry = index.get(filename); + thisEntry.addStoredBy(port); } void load(Socket client, String filename) throws Exception { @@ -380,13 +338,16 @@ public class Controller { entry.status = "remove in progress"; //Send REMOVE message to all Dstores storing the file - for(Integer dstore : entry.storedBy) { - Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); - PrintWriter out = new PrintWriter(socket.getOutputStream()); - out.write("REMOVE " + filename); - out.flush(); - out.close(); - socket.close(); + for(Integer dstore : entry.getStoredBy()) { + new Thread(() -> { + String[] message = dstores.get(dstore).sendAndReceive("REMOVE").split(" "); + if(message[0].equals("REMOVE_ACK") && message[1].equals(filename)) { + entry.removeStoredBy(dstore.intValue()); + } + else { + //Log error + } + }).start(); } //Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message @@ -440,13 +401,11 @@ public class Controller { rebalanceMessages = dstoreFiles; try { //Send LIST message to each Dstore and receive their file list - for(Integer dstore : dstores) { - Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); - PrintWriter out = new PrintWriter(socket.getOutputStream()); - out.write("LIST"); - out.flush(); - out.close(); - socket.close(); + for(Integer dstore : dstores.keySet()) { + new Thread(() -> { + String[] message = dstores.get(dstore).sendAndReceive("LIST").split(" "); + receiveDstoreList(dstore.intValue(), message); + }).start(); } dstoreFiles.wait(timeout); @@ -458,11 +417,41 @@ public class Controller { //Each file appears rFactor times //Each file appears at most once on each datastore //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) + List<Integer> storeOrder = reshuffle(dstoreFiles.keySet()); + List<String> fileList = new ArrayList<String>(); for(Integer i : reshuffle(dstoreFiles.keySet())) { - + for(String s : dstoreFiles.get(i)) { + if(!fileList.contains(s)) { + fileList.add(s); + } + } + } + + Map<Integer,List<String>> newAlloc = new HashMap<Integer,String[]>(); + int pos = 0; + int storeSize = Math.ceiling((fileList.size() * rFactor) / dstores.size()); + for(Integer i : dstoreFiles.keySet()) { + newAlloc.add(i, new ArrayList<String>(storeSize)); + } + for(String file : fileList) { + for(int j=0; j<rFactor; j++) { + newAlloc.get(pos).add(file); + pos ++; + if(pos >= newAlloc.size()) pos = 0; + } } //Make a (files to send, files to remove) pair for each Dstore + /* + Map<Integer,String> outMessages = new HashMap<Integer,String>(); + for(Integer dstore : storeOrder) { + String[] oldFiles = dstoreFiles.get(dstore); + List<String> newFiles = newAlloc.get(dstore); + for(String file : oldFiles) { + if(!newFiles.) + } + } + */ //Send the respective REBALANCE message to each Dstore @@ -471,20 +460,29 @@ public class Controller { catch(IOException e) { e.printStackTrace(); } + catch(Exception e) { + e.printStackTrace(); + } finally { rebalanceMessages = null; } }).start(); } - void receiveDstoreList(Socket client, String[] list) { - new Thread(() -> { - if(rebalanceMessages == null) return; - rebalanceMessages.add(Integer.valueOf(client.getPort()), list); - if(rebalanceMessages.size() == dstores.size()) { - rebalanceMessages.notify(); + void receiveDstoreList(int port, String[] list) { + if(rebalanceMessages == null) return; + + for(String file : list) { + if(!index.containsKey(file)) { + //Log error + return; //Throw exception? } - }).start(); + } + + rebalanceMessages.add(port, list); + if(rebalanceMessages.size() == dstores.size()) { + rebalanceMessages.notify(); + } } List<Integer> reshuffle(Collection<Integer> col) { diff --git a/Dstore.java b/Dstore.java index ee9a08ec5dc5bb29d9d7a55ca9413653335aee7d..6c23c0ee9cae12c287f3b5ef66842deda62f15ce 100644 --- a/Dstore.java +++ b/Dstore.java @@ -11,12 +11,17 @@ public class Dstore { protected int cport; //Controller's port to talk to protected int timeout; //in milliseconds protected String fileFolder; //Where to store the data locally + protected Map<String,Integer> fileSizes; + + protected BufferedReader controllerIn; + protected PrintWriter controllerOut; public Dstore(int port, int cport, int timeout, String fileFolder) { this.port = port; this.cport = cport; this.timeout = timeout; this.fileFolder = fileFolder; + fileSizes = new HashMap<String,Integer>(); } public static void main(String[] args) { @@ -42,10 +47,22 @@ public class Dstore { public void start() { try { Socket socket = new Socket(InetAddress.getLocalHost(), cport); - BufferedWriter out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); - out.write("JOIN " + port); - out.close(); - socket.close(); + controllerIn = new BufferedReader(new InputStreamReader(socket.getInputStream())); + controllerOut = new PrintWriter(socket.getOutputStream()); + controllerOut.print("JOIN " + port); + controllerOut.flush(); + + new Thread(() -> { + while(true) { + try { + String[] message = controllerIn.readLine().split(" "); + handleMessage(message, socket, controllerIn); + } + catch(Exception e) { + e.printStackTrace(); + } + } + }).start(); ServerSocket server = new ServerSocket(port); @@ -76,13 +93,13 @@ public class Dstore { load(client, message[1]); } else if(message[0].equals("REMOVE")) { - remove(client, message[1]); + remove(message[1]); } else if(message[0].equals("LIST")) { - list(client); + list(); } else if(message[0].equals("REBALANCE")) { - rebalance(client, message); + rebalance(message); } else { //Log error and continue (throw exception?) @@ -115,6 +132,9 @@ public class Dstore { controllerOut.print("STORE_ACK " + filename); controllerOut.flush(); controllerOut.close(); + + if(fileSizes.containsKey(filename)) fileSizes.remove(filename); + fileSizes.add(filename, filesize); } catch(IOException e) { e.printStackTrace(); @@ -153,24 +173,22 @@ public class Dstore { }).start(); } - void remove(Socket client, String filename) throws Exception { + void remove(String filename) throws Exception { new Thread(() -> { try { //Remove the file from fileFolder Path path = new File(fileFolder + "/" + filename).toPath(); - PrintWriter out = new PrintWriter(client.getOutputStream()); if(Files.deleteIfExists(path)) { //Send REMOVE_ACK message to client (the controller) - out.print("REMOVE_ACK"); + controllerOut.print("REMOVE_ACK"); } else { //Send DOES NOT EXIST error - out.print("ERROR DOES_NOT_EXIST " + filename); + controllerOut.print("ERROR DOES_NOT_EXIST " + filename); } - out.flush(); - out.close(); + controllerOut.flush(); } catch(IOException e) { e.printStackTrace(); @@ -178,16 +196,14 @@ public class Dstore { }).start(); } - void list(Socket client) throws Exception { + void list() throws Exception { new Thread(() -> { try { //Send a list of all files in fileFolder to client (the controller) - PrintWriter out = new PrintWriter(client.getOutputStream()); for(File file : new File(fileFolder).listFiles()) { - out.print(file.getName()); - out.flush(); + controllerOut.print(file.getName()); + controllerOut.flush(); } - out.close(); } catch(IOException e) { e.printStackTrace(); @@ -195,7 +211,7 @@ public class Dstore { }).start(); } - void rebalance(Socket client, String[] message) throws Exception { + void rebalance(String[] message) throws Exception { new Thread(() -> { try { //Interpret files to send and files to remove from the message @@ -232,7 +248,34 @@ public class Dstore { //Send each file to send to the Dstore at the specified port number for(String filename : filesToSend.keySet()) { for(Integer dstore : filesToSend.get(filename)) { - //Same store functions as used in the client object + try { + Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); + PrintWriter out = new PrintWriter(socket.getOutputStream()); + out.write("STORE " + filename + " " + fileSizes.get(filename)); + out.flush(); + + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + if(!in.readLine().equals("ACK")) { + //Log error + } + String line; + while(line = new BufferedReader(new FileInputStream(fileFolder + "/" + filename))) { + if(line == null) { + out.write(""); + out.flush(); + break; + } + else { + out.write(line); + out.flush(); + } + } + out.close(); + socket.close(); + } + catch(IOException e) { + e.printStackTrace(); + } } } @@ -242,10 +285,8 @@ public class Dstore { } //Send REBALANCE_COMPLETE message to client (the controller) - PrintWriter out = new PrintWriter(client.getOutputStream()); - out.print("REBALANCE COMPLETE"); - out.flush(); - out.close(); + controllerOut.print("REBALANCE COMPLETE"); + controllerOut.flush(); } catch(IOException e) { e.printStackTrace(); diff --git a/DstoreConnection.java b/DstoreConnection.java new file mode 100644 index 0000000000000000000000000000000000000000..56bae5a19b398aa561497ca77e29b10d5c9ebb5a --- /dev/null +++ b/DstoreConnection.java @@ -0,0 +1,43 @@ +import java.io.*; +import java.net.*; +import java.lang.Runnable; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.Collection; + +public class DstoreConnection { + protected Socket socket; + protected BufferedReader reader; + protected PrintWriter writer; + + public DstoreConnection(Socket socket) { + this.socket = socket; + reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + writer = new PrintWriter(socket.getOutputStream()); + } + + public String sendAndReceive(String message) { + try { + writer.print(message); + writer.flush(); + } + catch(IOException e) { + e.printStackTrace(); + return ""; + } + return receive(); + } + + public String receive() { + try { + String returnMessage = reader.readLine(); + return returnMessage; + } + catch(IOException e) { + e.printStackTrace(); + return ""; + } + } +}