diff --git a/src/Controller.java b/src/Controller.java index 46c08b82b92a295f874a7e4616ae54d013ccade0..8e9cba62ecfb42a6bb9f847bf4801e2ff157e176 100644 --- a/src/Controller.java +++ b/src/Controller.java @@ -3,6 +3,8 @@ import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; public class Controller { @@ -47,7 +49,7 @@ public class Controller { //Process depending on command received switch (command) { - case "STORE" -> storeCommand(client); + case "STORE" -> storeCommand(client, received.split(" ")[1]); case "LOAD" -> loadCommand(client, received.split(" ")[1]); case "REMOVE" -> removeCommand(received.split(" ")[1], client); case "LIST" -> listCommand(client); @@ -76,19 +78,77 @@ public class Controller { /** * TODO Send store commands to client */ - private static synchronized void storeCommand(Socket client) { - //If file already exists, send error + private static synchronized void storeCommand(Socket client, String fileName) { + try { + //Prepare out put stream to client + PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream())); - //If file does not exist, add as empty entry, select r DStores + //If file already exists, send error + synchronized (index) { + if (index.containsKey(fileName)) { + //TODO Work out what this is meant to say lol + clientOut.println(""); + clientOut.flush(); + return; + } + } - //Send list of store ports to client + //Marker that all acknowledgements received + AtomicBoolean allReceived = new AtomicBoolean(true); + ArrayList<Integer> candidateStores; - //Wait for store ack from each client + synchronized (dStores) { + //If file does not exist, select r DStores + candidateStores = emptiestDStores(); + + //Build command to send to client + StringBuilder listOfPorts = new StringBuilder(); + listOfPorts.append("STORE_TO"); + for (int port : candidateStores) { + listOfPorts.append(" ").append(port); + } - //If any not received, remove from index and decrease DStore numbers + //Send to client + clientOut.println(listOfPorts); + clientOut.flush(); - //If all received, send ack to client, store list to index & increment DStore numbers + //Create latch to wait for all acknowledgements + CountDownLatch ackLatch = new CountDownLatch(candidateStores.size()); + //New threads to sait for store ack from each client + for (int store : candidateStores) { + new Thread(new WaitForAcknowledgement(ackLatch, allReceived, dStores.get(store),fileName)).start(); + } + + //Await countdown before releasing hold on DStore list + ackLatch.await(); + } + + + if (allReceived.get()) { + //TODO Send store ACK to client + clientOut.println(""); + clientOut.flush(); + + synchronized (index) { + synchronized (storeNumbers) { + //Add DStores to index + index.put(fileName, candidateStores.stream().mapToInt(i -> i).toArray()); + + //Add increment numbers for each DStore + for (int dStore : candidateStores) { + increaseDStore(dStore); + } + } + } + } + + //Close streams + clientOut.close(); + client.close(); + } catch (Exception e) { + System.out.println("error "+e); + } } /** @@ -216,24 +276,22 @@ public class Controller { /** * Get r least populated dStores */ - private static ArrayList<Integer> emptiestDStores(Socket client) { + private static ArrayList<Integer> emptiestDStores() { ArrayList<Integer> lowestList = new ArrayList<>(); //Lock list of DStores until fully consumed synchronized (storeNumbers) { - synchronized (dStores) { - int stores = dStores.size(); - int brokenStores = 0; - - //Build list of DStores - int i = 0; - while ((lowestList.size() <= replicationFactor) && (brokenStores + replicationFactor < stores)) { - for (int store : storeNumbers.get(i)) { - if (checkDStore(dStores.get(store))) { - lowestList.add(store); - } else { - brokenStores++; - } + int stores = dStores.size(); + int brokenStores = 0; + + //Build list of DStores + int i = 0; + while ((lowestList.size() <= replicationFactor) && (brokenStores + replicationFactor < stores)) { + for (int store : storeNumbers.get(i)) { + if (checkDStore(dStores.get(store))) { + lowestList.add(store); + } else { + brokenStores++; } } } @@ -247,24 +305,6 @@ public class Controller { return lowestList; } - /** - * Check for all working dStores - */ - private static ArrayList<Integer> workingDStores() { - ArrayList<Integer> workingPorts = new ArrayList<>(); - - synchronized (dStores) { - //If dstore is still working add it to the list - for (Integer i : dStores.keySet()) { - if (checkDStore(dStores.get(i))) { - workingPorts.add(i); - } - } - } - - return workingPorts; - } - /** * Test if given DStore is still active */ diff --git a/src/WaitForAcknowledgement.java b/src/WaitForAcknowledgement.java new file mode 100644 index 0000000000000000000000000000000000000000..5c7da691316ac25709610b5efb2dd16765f754ed --- /dev/null +++ b/src/WaitForAcknowledgement.java @@ -0,0 +1,37 @@ +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +public class WaitForAcknowledgement implements Runnable { + private final CountDownLatch countDownLatch; + private final AtomicBoolean allReceived; + private final Socket client; + private final String fileName; + + public WaitForAcknowledgement (CountDownLatch c, AtomicBoolean a, Socket cl, String f) { + countDownLatch = c; + allReceived = a; + client = cl; + fileName = f; + } + + @Override + public void run() { + try { + BufferedReader receiver = new BufferedReader(new InputStreamReader(client.getInputStream())); + String response = receiver.readLine(); + while(true) { + if (!(response == null) && response.equals("STORE_ACK " + fileName)) { + break; + } + } + } catch (Exception e) { + allReceived.set(false); + } + + //Alert countdown latch of completion + countDownLatch.countDown(); + } +}