From 6e07a95e79f5ecb0d932f9830142f53d0c16b6f7 Mon Sep 17 00:00:00 2001
From: pr1n19 <pr1n19@soton.ac.uk>
Date: Sat, 30 Apr 2022 02:36:13 +0100
Subject: [PATCH] Synchronisation added to maintain continuity List command &
 Load command completed

---
 src/Controller.java | 303 +++++++++++++++++++++++++++++---------------
 1 file changed, 203 insertions(+), 100 deletions(-)

diff --git a/src/Controller.java b/src/Controller.java
index 489015a..129de96 100644
--- a/src/Controller.java
+++ b/src/Controller.java
@@ -10,10 +10,8 @@ public class Controller {
     private static final HashMap<Integer,Socket> dStores = new HashMap<>();
     //Ports of each dStore, and associated number of files
     private static final HashMap<Integer,ArrayList<Integer>> storeNumbers = new HashMap<>();
-
     //Store filename & ports
     private static final HashMap<String,int[]> index = new HashMap<>();
-
     private static int replicationFactor;
 
     public static void main(String[] args) {
@@ -32,23 +30,38 @@ public class Controller {
                     try {
                         //Accept new TCP connection & get command
                         Socket client = listeningSocket.accept();
-                        BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
-                        String received = in.readLine();
-                        String command = received.split(" ")[0];
-
-                        //Process depending on command received
-                        switch (command) {
-                            case "STORE" -> storeCommand(client);
-                            case "LOAD" -> loadCommand(client, received.split(" ")[1]);
-                            case "REMOVE" -> removeCommand(received.split(" ")[1], client);
-                            case "LIST" -> listCommand();
-                            case "JOIN" -> {
-                                int portNo = Integer.parseInt(received.split(" ")[1]);
-                                client.setSoTimeout(timeOut);
-                                dStores.put(portNo, client);
-                                addDStore(portNo, 0);
+                        new Thread(() -> {
+                            try {
+                                BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
+                                String received = in.readLine();
+                                String command = received.split(" ")[0];
+
+                                //If not enough DStores alert client except for JOIN requests
+                                if(!command.equals("JOIN")){
+                                    synchronized (dStores) {
+                                        if(!enoughDStores(client)) {
+                                            return;
+                                        }
+                                    }
+                                }
+
+                                //Process depending on command received
+                                switch (command) {
+                                    case "STORE" -> storeCommand(client);
+                                    case "LOAD" -> loadCommand(client, received.split(" ")[1]);
+                                    case "REMOVE" -> removeCommand(received.split(" ")[1], client);
+                                    case "LIST" -> listCommand(client);
+                                    case "JOIN" -> {
+                                        int portNo = Integer.parseInt(received.split(" ")[1]);
+                                        client.setSoTimeout(timeOut);
+                                        dStores.put(portNo, client);
+                                        addDStore(portNo, 0);
+                                    }
+                                }
+                            }catch (Exception e) {
+                                System.out.println("error "+e);
                             }
-                        }
+                        }).start();
                     }catch (Exception e) {
                         System.out.println("error "+e);
                     }
@@ -68,43 +81,154 @@ public class Controller {
     }
 
     /**
-     * TODO Send load commands to client
+     * Send load commands to client
      */
     private static void loadCommand(Socket client, String fileName) {
+        try {
+            PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
+            int selectedStore = -1;
+
+            synchronized (index) {
+                synchronized (dStores) {
+                    //If file does not exist, send error
+                    if (!index.containsKey(fileName)) {
+                        clientOut.println("ERROR_FILE_DOES_NOT_EXIST");
+                    }
+
+                    //Get port of a working DStore with required file
+                    for (int store : index.get(fileName)) {
+                        if (checkDStore(dStores.get(store))) {
+                            selectedStore = store;
+                            break;
+                        }
+                    }
+                }
+            }
 
+            //If no selected store, send "ERROR_LOAD", else send port no of open client
+            if (selectedStore == -1) {
+                clientOut.println("ERROR_LOAD");
+            } else {
+                clientOut.println("LOAD_DATA "+selectedStore);
+            }
+
+            //Flush message through & close print writer
+            clientOut.flush();
+            clientOut.close();
+        }catch (Exception e) {
+            System.out.println("error "+e);
+        }
     }
 
     /**
-     * TODO Send compiled list to client
+     * Send compiled list to client
      */
     private static void listCommand(Socket client) {
+        StringBuilder list = new StringBuilder();
+        list.append("LIST");
+
+        //Build list
+        synchronized (index) {
+            for (String file : index.keySet()) {
+                list.append(" ");
+                list.append(file);
+            }
+        }
 
+        //Send list to client
+        try {
+            PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
+            clientOut.println(list);
+            clientOut.flush();
+            clientOut.close();
+        }catch (Exception e) {
+            System.out.println("error "+e);
+        }
     }
 
     /**
-     * Get r least populated dStores
+     * TODO Remove given file from all DStores
      */
-    private static ArrayList<Integer> emptiestDStores() throws Exception{
-        //If not enough DStores, throw exception
-        int stores = dStores.size();
-        int brokenStores = 0;
-        if(stores < replicationFactor) {
-            throw new Exception("Not enough DStores");
+    private static void removeCommand(String fileName, Socket client) {
+        try {
+            //Create output stream to client
+            PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
+
+            //If file does not exist
+            if (!index.containsKey(fileName)) {
+                clientOut.println("ERROR_FILE_DOES_NOT_EXIST");
+                clientOut.flush();
+            } else {
+                //Get list of DStores associated with given filename
+                int[] stores = index.get(fileName);
+
+                //For each DStore, send "delete filename" command in new thread
+                for (int store : stores) {
+                    decreaseDStore(store);
+
+                    new Thread(() -> {
+                        try {
+                            //Send "REMOVE filename"
+                            PrintWriter out = new PrintWriter(dStores.get(store).getOutputStream());
+                            out.println("REMOVE " + fileName);
+                            out.flush();
+
+                            //Wait for acknowledgement
+                            BufferedReader in = new BufferedReader(new InputStreamReader(dStores.get(store).getInputStream()));
+                            while (true) {
+                                String response = in.readLine();
+                                if(!(response == null) && response.equals("REMOVE_ACK " + fileName)){
+                                    break;
+                                }
+                            }
+                        } catch (Exception e) {
+                            System.out.println("error "+e);
+                        }
+                    }).start();
+                }
+
+                //Remove file from index
+                synchronized (index) {
+                    index.remove(fileName);
+                }
+
+                //Send acknowledgement to client
+                clientOut.println("REMOVE_COMPLETE");
+                clientOut.flush();
+                clientOut.close();
+            }
+        }catch (Exception e) {
+            System.out.println("error "+e);
         }
+    }
 
+    /**
+     * Get r least populated dStores
+     */
+    private static ArrayList<Integer> emptiestDStores(Socket client) {
         ArrayList<Integer> lowestList = new ArrayList<>();
 
-        int i = 0;
-        while ((lowestList.size() <= replicationFactor) && (brokenStores + replicationFactor < stores)) {
-            for(int store : storeNumbers.get(i)){
-                if (checkDStore(dStores.get(store))) {
-                    lowestList.add(store);
-                } else {
-                    brokenStores++;
+        //Lock list of DStores until fully consumed
+        synchronized (storeNumbers) {
+            synchronized (dStores) {
+                int stores = dStores.size();
+                int brokenStores = 0;
+
+                //Build list of DStores
+                int i = 0;
+                while ((lowestList.size() <= replicationFactor) && (brokenStores + replicationFactor < stores)) {
+                    for (int store : storeNumbers.get(i)) {
+                        if (checkDStore(dStores.get(store))) {
+                            lowestList.add(store);
+                        } else {
+                            brokenStores++;
+                        }
+                    }
                 }
             }
         }
 
+        //Remove excess DStores
         while (lowestList.size() > replicationFactor) {
             lowestList.remove(lowestList.size() - 1);
         }
@@ -118,10 +242,12 @@ public class Controller {
     private static ArrayList<Integer> workingDStores() {
         ArrayList<Integer> workingPorts = new ArrayList<>();
 
-        //If dstore is still working add it to the list
-        for (Integer i:dStores.keySet()) {
-            if(checkDStore(dStores.get(i))){
-                workingPorts.add(i);
+        synchronized (dStores) {
+            //If dstore is still working add it to the list
+            for (Integer i : dStores.keySet()) {
+                if (checkDStore(dStores.get(i))) {
+                    workingPorts.add(i);
+                }
             }
         }
 
@@ -145,6 +271,8 @@ public class Controller {
             in.readLine();
 
             active = true;
+            out.close();
+            in.close();
         }catch (Exception e){
             active = false;
         }
@@ -152,74 +280,30 @@ public class Controller {
         return active;
     }
 
-    /**
-     * Remove given file from all DStores
-     */
-    private static void removeCommand(String fileName, Socket client) {
-        try {
-            //Create output stream to client
-            PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
-
-            //If file does not exist
-            if (!index.containsKey(fileName)) {
-                clientOut.println("ERROR_FILE_DOES_NOT_EXIST");
-                clientOut.flush();
-            } else {
-                //Get list of DStores associated with given filename
-                int[] stores = index.get(fileName);
-
-                //For each DStore, send "delete filename" command in new thread
-                for (int store : stores) {
-                    decreaseDStore(store);
-
-                    new Thread(() -> {
-                        try {
-                            //Send "REMOVE filename"
-                            PrintWriter out = new PrintWriter(dStores.get(store).getOutputStream());
-                            out.println("REMOVE " + fileName);
-                            out.flush();
-
-                            //Wait for acknowledgement
-                            BufferedReader in = new BufferedReader(new InputStreamReader(dStores.get(store).getInputStream()));
-                            while (!in.readLine().equals("REMOVE_ACK " + fileName)) {
-
-                            }
-
-                        } catch (Exception e) {
-                            System.out.println("error "+e);
-                        }
-                    }).start();
-                }
-
-                //Send acknowledgement to client
-                clientOut.println("REMOVE_COMPLETE");
-                clientOut.flush();
-            }
-        }catch (Exception e){
-            System.out.println("error "+e);
-        }
-    }
-
     /**
      * Add new DStore to list with 0 files
      */
     private static void addDStore(int portNo, int depth){
-        if(!storeNumbers.containsKey(depth)) {
-            storeNumbers.put(depth,(new ArrayList<>()));
+        synchronized (dStores) {
+            if (!storeNumbers.containsKey(depth)) {
+                storeNumbers.put(depth, (new ArrayList<>()));
+            }
+            storeNumbers.get(depth).add(portNo);
         }
-        storeNumbers.get(depth).add(portNo);
     }
 
     /**
      * Increase number of files given DStore has by 1
      */
     private static void increaseDStore(int portNo) {
-        for (int i = 0 ; ;i++) {
-            if(storeNumbers.containsKey(i) && storeNumbers.get(i).contains(portNo)) {
-                storeNumbers.get(i).remove(portNo);
+        synchronized (storeNumbers) {
+            for (int i = 0; ; i++) {
+                if (storeNumbers.get(i).contains(portNo)) {
+                    storeNumbers.get(i).remove(portNo);
 
-                addDStore(portNo,i+1);
-                break;
+                    addDStore(portNo, i + 1);
+                    break;
+                }
             }
         }
     }
@@ -228,15 +312,34 @@ public class Controller {
      * Decrease number of files given DStore has by 1
      */
     private static void decreaseDStore(int portNo) {
-        for (int i = 0 ; ;i++) {
-            if(storeNumbers.containsKey(i) && storeNumbers.get(i).contains(portNo)) {
-                storeNumbers.get(i).remove(portNo);
+        synchronized (storeNumbers) {
+            for (int i = 0; ; i++) {
+                if (storeNumbers.get(i).contains(portNo)) {
+                    storeNumbers.get(i).remove(portNo);
 
-                addDStore(portNo,i-1);
-                break;
+                    addDStore(portNo, i - 1);
+                    break;
+                }
             }
         }
     }
 
-
+    /**
+     * If not enough DStores, send "ERROR_NOT_ENOUGH_DSTORES"
+     */
+    private static boolean enoughDStores(Socket client) {
+        if (dStores.size() < replicationFactor) {
+            try {
+                PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
+                clientOut.println("ERROR_NOT_ENOUGH_DSTORES");
+                clientOut.flush();
+                clientOut.close();
+            }catch (Exception e){
+                System.out.println("error "+e);
+            }
+            return false;
+        } else {
+            return true;
+        }
+    }
 }
-- 
GitLab