diff --git a/Controller.class b/Controller.class new file mode 100644 index 0000000000000000000000000000000000000000..366a48b3ad9a08deaf72845574e63c4516c2d101 Binary files /dev/null and b/Controller.class differ diff --git a/Controller.java b/Controller.java index 34356cf381446edf549b652fc40e17070ec9b668..c44d8a7ffddd86b9293c9feeb8a5214d8dbf4a51 100644 --- a/Controller.java +++ b/Controller.java @@ -1,3 +1,158 @@ +import java.io.*; +import java.net.*; +import java.util.List; +import java.util.ArrayList; + public class Controller { + protected int cport; //Port to listen on + protected int rFactor; //Replication factor; each file is replicated across r Dstores + protected int timeout; //in milliseconds + protected int rebalancePeriod; //How long to wait to start the next rebalance operation, in milliseconds + protected List<Integer> dstores; + + public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) { + this.cport = cport; + this.rFactor = rFactor; + this.timeout = timeout; + this.rebalancePeriod = rebalancePeriod; + dstores = new ArrayList<Integer>(); + } + + public static void main(String[] args) { + try { + int cport = Integer.parseInt(args[0]); + int rFactor = Integer.parseInt(args[1]); + int timeout = Integer.parseInt(args[2]); + int rebalancePeriod = Integer.parseInt(args[3]); + + Controller controller = new Controller(cport, rFactor, timeout, rebalancePeriod); + controller.start(); + } + catch(IndexOutOfBoundsException e) { + System.out.println("Command line arguments have not been provided"); + return; + } + catch(NumberFormatException e) { + System.out.println("Command line arguments must be integers"); + return; + } + } + + public void start() { + try { + ServerSocket server = new ServerSocket(cport); + while(true) { + try { + Socket client = server.accept(); + BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); + String[] message = in.readLine().split(" "); + if(dstores.size() < rFactor && !message[0].equals("JOIN")) { + BufferedWriter out = new BufferedWriter(new OutputStreamWriter(client.getOutputStream())); + out.write("ERROR"); + out.close(); + } + else { + handleMessage(message, client); + } + in.close(); + } + catch(Exception e) { + e.printStackTrace(); + System.out.println("Continue..."); + } + } + } + catch(Exception e) { + e.printStackTrace(); + } + } + + void handleMessage(String[] message, Socket client) { + if(message[0].equals("JOIN")) { + dstores.add(Integer.parseInt(message[1])); + rebalance(); + } + else if(message[0].equals("STORE")) { + store(client, message[1]); + } + else if(message[0].equals("LOAD")) { + load(client, message[1]); + } + else if(message[0].equals("REMOVE")) { + remove(client, message[1]); + } + else if(message[0].equals("LIST")) { + list(client); + } + } + + void store(Socket client, String filename) { + //Update index to "store in progress" + + //Select Dstores + int[] storesToStore = new int[rFactor]; + for(int i=0; i<rFactor; i++) { + storesToStore[i] = dstores.get(i).intValue(); + } + + //Send STORE_TO message + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("STORE_TO"); + for(int port : storesToStore) { + out.print(" "); + out.print(port); + } + out.flush(); + + //Wait for STORE_ACKs from datastores in storesToStore + + //Update index to "store complete" + + //Send STORE_COMPLETE message + out.print("STORE_COMPLETE"); + out.flush(); + out.close(); + } + + void load(Socket client, String filename) { + //Select a Dstore which contains the file + + //Send LOAD_FROM message + } + + void remove(Socket client, String filename) { + //Update index to "remove in progress" + + //Send REMOVE message to all Dstores storing the file + + //Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message + + //Update index to "remove complete" + + //Send REMOVE_COMPLETE to client + PrintWriter clientOut = new PrintWriter(client.getOutputStream()); + clientOut.print("REMOVE_COMPLETE"); + clientOut.flush(); + clientOut.close(); + } + + void list(Socket client) { + //Send file list to client + } + + void rebalance() { + //Send LIST message to each Dstore and receive their file list + + //Create a new file allocation so that: + //Each file appears rFactor times + //Each file appears at most once on each datastore + //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) + + //Make a (files to send, files to remove) pair for each Dstore + + //Send the respective REBALANCE message to each Dstore + + //Wait for REBALANCE_COMPLETE from all Dstores + } } diff --git a/Dstore.class b/Dstore.class new file mode 100644 index 0000000000000000000000000000000000000000..a22d81e7a3552d109af70b70e6b19b6cf1041d70 Binary files /dev/null and b/Dstore.class differ diff --git a/Dstore.java b/Dstore.java index 07485eeeb6c9c217735dc5f8d5aaa12d67cdc8d4..da2d0a9591d268411f88ecb258fab8960917c92e 100644 --- a/Dstore.java +++ b/Dstore.java @@ -1,3 +1,147 @@ +import java.io.*; +import java.net.*; +import java.util.Map; +import java.util.HashMap; + public class Dstore { + protected int port; //Port to listen on + protected int cport; //Controller's port to talk to + protected int timeout; //in milliseconds + protected String fileFolder; //Where to store the data locally + public Dstore(int port, int cport, int timeout, String fileFolder) { + this.port = port; + this.cport = cport; + this.timeout = timeout; + this.fileFolder = fileFolder; + } + + public static void main(String[] args) { + try { + int port = Integer.parseInt(args[0]); + int cport = Integer.parseInt(args[1]); + int timeout = Integer.parseInt(args[2]); + String fileFolder = args[3]; + + Dstore dstore = new Dstore(port, cport, timeout, fileFolder); + dstore.start(); + } + catch(IndexOutOfBoundsException e) { + System.out.println("Command line arguments have not been provided"); + return; + } + catch(NumberFormatException e) { + System.out.println("Command line arguments must be integers"); + return; + } + } + + public void start() { + try { + Socket socket = new Socket(InetAddress.getLocalHost(), cport); + BufferedWriter out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); + out.write("JOIN " + port); + out.close(); + socket.close(); + + ServerSocket server = new ServerSocket(port); + + while(true) { + try { + Socket client = server.accept(); + BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); + String[] message = in.readLine().split(" "); + handleMessage(message, client); + in.close(); + } + catch(Exception e) { + e.printStackTrace(); + } + } + } + catch(Exception e) { + e.printStackTrace(); + } + } + + void handleMessage(String[] message, Socket client) { + if(message[0].equals("STORE")) { + store(client, message[1], Integer.parseInt(message[2])); + } + else if(message[0].equals("LOAD_DATA")) { + load(client, message[1]); + } + else if(message[0].equals("REMOVE")) { + remove(client, message[1]); + } + else if(message[0].equals("LIST")) { + list(client); + } + else if(message[0].equals("REBALANCE")) { + rebalance(client, message); + } + } + + void store(Socket client, String filename, int filesize) { + //Send ACK message to client + + //Receive file content from client + + //Store the file data in fileFolder + + //Send STORE_ACK message to the Controller + } + + void load(Socket client, String filename) { + //Send the content of the file in fileFolder to the client + } + + void remove(Socket client, String filename) { + //Remove the file from fileFolder + + //Send REMOVE_ACK message to client (the controller) + } + + void list(Socket client) { + //Send a list of all files in fileFolder to client (the controller) + } + + void rebalance(Socket client, String[] message) { + //Interpret files to send and files to remove from the message + Map<String,Integer[]> filesToSend; + String[] filesToRemove; + int index; + + int numberToSend = Integer.parseInt(message[1]); + index = 2; + filesToSend = new HashMap<String,Integer[]>(numberToSend); + for(int i=0; i<numberToSend; i++) { + String name = message[index]; + index++; + + int numberOfReceivers = Integer.parseInt(message[index]); + index++; + Integer[] receivers = new Integer[numberOfReceivers]; + for(int j=0; j<numberOfReceivers; j++) { + receivers[j] = Integer.parseInt(message[index]); + index++; + } + + filesToSend.put(name, receivers); + } + + int numberToRemove = Integer.parseInt(message[index]); + index++; + filesToRemove = new String[numberToRemove]; + for(int k=0; k<numberToRemove; k++) { + filesToRemove[k] = message[index]; + index++; + } + + //Send each file to send to the Dstore at the specified port number + + //Remove each file to remove from fileFolder + + //Send REBALANCE_COMPLETE message to client (the controller) + } }