diff --git a/src/Controller.java b/src/Controller.java index 489015a653654f86d56d698f72f2c71ab817b780..129de965e6a359a816176ade5533ede48af1d6da 100644 --- a/src/Controller.java +++ b/src/Controller.java @@ -10,10 +10,8 @@ public class Controller { private static final HashMap<Integer,Socket> dStores = new HashMap<>(); //Ports of each dStore, and associated number of files private static final HashMap<Integer,ArrayList<Integer>> storeNumbers = new HashMap<>(); - //Store filename & ports private static final HashMap<String,int[]> index = new HashMap<>(); - private static int replicationFactor; public static void main(String[] args) { @@ -32,23 +30,38 @@ public class Controller { try { //Accept new TCP connection & get command Socket client = listeningSocket.accept(); - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); - String received = in.readLine(); - String command = received.split(" ")[0]; - - //Process depending on command received - switch (command) { - case "STORE" -> storeCommand(client); - case "LOAD" -> loadCommand(client, received.split(" ")[1]); - case "REMOVE" -> removeCommand(received.split(" ")[1], client); - case "LIST" -> listCommand(); - case "JOIN" -> { - int portNo = Integer.parseInt(received.split(" ")[1]); - client.setSoTimeout(timeOut); - dStores.put(portNo, client); - addDStore(portNo, 0); + new Thread(() -> { + try { + BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); + String received = in.readLine(); + String command = received.split(" ")[0]; + + //If not enough DStores alert client except for JOIN requests + if(!command.equals("JOIN")){ + synchronized (dStores) { + if(!enoughDStores(client)) { + return; + } + } + } + + //Process depending on command received + switch (command) { + case "STORE" -> storeCommand(client); + case "LOAD" -> loadCommand(client, received.split(" ")[1]); + case "REMOVE" -> removeCommand(received.split(" ")[1], client); + case "LIST" -> listCommand(client); + case "JOIN" -> { + int portNo = Integer.parseInt(received.split(" ")[1]); + client.setSoTimeout(timeOut); + dStores.put(portNo, client); + addDStore(portNo, 0); + } + } + }catch (Exception e) { + System.out.println("error "+e); } - } + }).start(); }catch (Exception e) { System.out.println("error "+e); } @@ -68,43 +81,154 @@ public class Controller { } /** - * TODO Send load commands to client + * Send load commands to client */ private static void loadCommand(Socket client, String fileName) { + try { + PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); + int selectedStore = -1; + + synchronized (index) { + synchronized (dStores) { + //If file does not exist, send error + if (!index.containsKey(fileName)) { + clientOut.println("ERROR_FILE_DOES_NOT_EXIST"); + } + + //Get port of a working DStore with required file + for (int store : index.get(fileName)) { + if (checkDStore(dStores.get(store))) { + selectedStore = store; + break; + } + } + } + } + //If no selected store, send "ERROR_LOAD", else send port no of open client + if (selectedStore == -1) { + clientOut.println("ERROR_LOAD"); + } else { + clientOut.println("LOAD_DATA "+selectedStore); + } + + //Flush message through & close print writer + clientOut.flush(); + clientOut.close(); + }catch (Exception e) { + System.out.println("error "+e); + } } /** - * TODO Send compiled list to client + * Send compiled list to client */ private static void listCommand(Socket client) { + StringBuilder list = new StringBuilder(); + list.append("LIST"); + + //Build list + synchronized (index) { + for (String file : index.keySet()) { + list.append(" "); + list.append(file); + } + } + //Send list to client + try { + PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); + clientOut.println(list); + clientOut.flush(); + clientOut.close(); + }catch (Exception e) { + System.out.println("error "+e); + } } /** - * Get r least populated dStores + * TODO Remove given file from all DStores */ - private static ArrayList<Integer> emptiestDStores() throws Exception{ - //If not enough DStores, throw exception - int stores = dStores.size(); - int brokenStores = 0; - if(stores < replicationFactor) { - throw new Exception("Not enough DStores"); + private static void removeCommand(String fileName, Socket client) { + try { + //Create output stream to client + PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); + + //If file does not exist + if (!index.containsKey(fileName)) { + clientOut.println("ERROR_FILE_DOES_NOT_EXIST"); + clientOut.flush(); + } else { + //Get list of DStores associated with given filename + int[] stores = index.get(fileName); + + //For each DStore, send "delete filename" command in new thread + for (int store : stores) { + decreaseDStore(store); + + new Thread(() -> { + try { + //Send "REMOVE filename" + PrintWriter out = new PrintWriter(dStores.get(store).getOutputStream()); + out.println("REMOVE " + fileName); + out.flush(); + + //Wait for acknowledgement + BufferedReader in = new BufferedReader(new InputStreamReader(dStores.get(store).getInputStream())); + while (true) { + String response = in.readLine(); + if(!(response == null) && response.equals("REMOVE_ACK " + fileName)){ + break; + } + } + } catch (Exception e) { + System.out.println("error "+e); + } + }).start(); + } + + //Remove file from index + synchronized (index) { + index.remove(fileName); + } + + //Send acknowledgement to client + clientOut.println("REMOVE_COMPLETE"); + clientOut.flush(); + clientOut.close(); + } + }catch (Exception e) { + System.out.println("error "+e); } + } + /** + * Get r least populated dStores + */ + private static ArrayList<Integer> emptiestDStores(Socket client) { ArrayList<Integer> lowestList = new ArrayList<>(); - int i = 0; - while ((lowestList.size() <= replicationFactor) && (brokenStores + replicationFactor < stores)) { - for(int store : storeNumbers.get(i)){ - if (checkDStore(dStores.get(store))) { - lowestList.add(store); - } else { - brokenStores++; + //Lock list of DStores until fully consumed + synchronized (storeNumbers) { + synchronized (dStores) { + int stores = dStores.size(); + int brokenStores = 0; + + //Build list of DStores + int i = 0; + while ((lowestList.size() <= replicationFactor) && (brokenStores + replicationFactor < stores)) { + for (int store : storeNumbers.get(i)) { + if (checkDStore(dStores.get(store))) { + lowestList.add(store); + } else { + brokenStores++; + } + } } } } + //Remove excess DStores while (lowestList.size() > replicationFactor) { lowestList.remove(lowestList.size() - 1); } @@ -118,10 +242,12 @@ public class Controller { private static ArrayList<Integer> workingDStores() { ArrayList<Integer> workingPorts = new ArrayList<>(); - //If dstore is still working add it to the list - for (Integer i:dStores.keySet()) { - if(checkDStore(dStores.get(i))){ - workingPorts.add(i); + synchronized (dStores) { + //If dstore is still working add it to the list + for (Integer i : dStores.keySet()) { + if (checkDStore(dStores.get(i))) { + workingPorts.add(i); + } } } @@ -145,6 +271,8 @@ public class Controller { in.readLine(); active = true; + out.close(); + in.close(); }catch (Exception e){ active = false; } @@ -152,74 +280,30 @@ public class Controller { return active; } - /** - * Remove given file from all DStores - */ - private static void removeCommand(String fileName, Socket client) { - try { - //Create output stream to client - PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); - - //If file does not exist - if (!index.containsKey(fileName)) { - clientOut.println("ERROR_FILE_DOES_NOT_EXIST"); - clientOut.flush(); - } else { - //Get list of DStores associated with given filename - int[] stores = index.get(fileName); - - //For each DStore, send "delete filename" command in new thread - for (int store : stores) { - decreaseDStore(store); - - new Thread(() -> { - try { - //Send "REMOVE filename" - PrintWriter out = new PrintWriter(dStores.get(store).getOutputStream()); - out.println("REMOVE " + fileName); - out.flush(); - - //Wait for acknowledgement - BufferedReader in = new BufferedReader(new InputStreamReader(dStores.get(store).getInputStream())); - while (!in.readLine().equals("REMOVE_ACK " + fileName)) { - - } - - } catch (Exception e) { - System.out.println("error "+e); - } - }).start(); - } - - //Send acknowledgement to client - clientOut.println("REMOVE_COMPLETE"); - clientOut.flush(); - } - }catch (Exception e){ - System.out.println("error "+e); - } - } - /** * Add new DStore to list with 0 files */ private static void addDStore(int portNo, int depth){ - if(!storeNumbers.containsKey(depth)) { - storeNumbers.put(depth,(new ArrayList<>())); + synchronized (dStores) { + if (!storeNumbers.containsKey(depth)) { + storeNumbers.put(depth, (new ArrayList<>())); + } + storeNumbers.get(depth).add(portNo); } - storeNumbers.get(depth).add(portNo); } /** * Increase number of files given DStore has by 1 */ private static void increaseDStore(int portNo) { - for (int i = 0 ; ;i++) { - if(storeNumbers.containsKey(i) && storeNumbers.get(i).contains(portNo)) { - storeNumbers.get(i).remove(portNo); + synchronized (storeNumbers) { + for (int i = 0; ; i++) { + if (storeNumbers.get(i).contains(portNo)) { + storeNumbers.get(i).remove(portNo); - addDStore(portNo,i+1); - break; + addDStore(portNo, i + 1); + break; + } } } } @@ -228,15 +312,34 @@ public class Controller { * Decrease number of files given DStore has by 1 */ private static void decreaseDStore(int portNo) { - for (int i = 0 ; ;i++) { - if(storeNumbers.containsKey(i) && storeNumbers.get(i).contains(portNo)) { - storeNumbers.get(i).remove(portNo); + synchronized (storeNumbers) { + for (int i = 0; ; i++) { + if (storeNumbers.get(i).contains(portNo)) { + storeNumbers.get(i).remove(portNo); - addDStore(portNo,i-1); - break; + addDStore(portNo, i - 1); + break; + } } } } - + /** + * If not enough DStores, send "ERROR_NOT_ENOUGH_DSTORES" + */ + private static boolean enoughDStores(Socket client) { + if (dStores.size() < replicationFactor) { + try { + PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); + clientOut.println("ERROR_NOT_ENOUGH_DSTORES"); + clientOut.flush(); + clientOut.close(); + }catch (Exception e){ + System.out.println("error "+e); + } + return false; + } else { + return true; + } + } }