Skip to content
Snippets Groups Projects
Commit 5f6d8cca authored by pr1n19's avatar pr1n19
Browse files

Completed store command

parent 85ca8e1f
No related branches found
No related tags found
No related merge requests found
...@@ -3,6 +3,8 @@ import java.net.ServerSocket; ...@@ -3,6 +3,8 @@ import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public class Controller { public class Controller {
...@@ -47,7 +49,7 @@ public class Controller { ...@@ -47,7 +49,7 @@ public class Controller {
//Process depending on command received //Process depending on command received
switch (command) { switch (command) {
case "STORE" -> storeCommand(client); case "STORE" -> storeCommand(client, received.split(" ")[1]);
case "LOAD" -> loadCommand(client, received.split(" ")[1]); case "LOAD" -> loadCommand(client, received.split(" ")[1]);
case "REMOVE" -> removeCommand(received.split(" ")[1], client); case "REMOVE" -> removeCommand(received.split(" ")[1], client);
case "LIST" -> listCommand(client); case "LIST" -> listCommand(client);
...@@ -76,19 +78,77 @@ public class Controller { ...@@ -76,19 +78,77 @@ public class Controller {
/** /**
* TODO Send store commands to client * TODO Send store commands to client
*/ */
private static synchronized void storeCommand(Socket client) { private static synchronized void storeCommand(Socket client, String fileName) {
try {
//Prepare out put stream to client
PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
//If file already exists, send error //If file already exists, send error
synchronized (index) {
if (index.containsKey(fileName)) {
//TODO Work out what this is meant to say lol
clientOut.println("");
clientOut.flush();
return;
}
}
//If file does not exist, add as empty entry, select r DStores //Marker that all acknowledgements received
AtomicBoolean allReceived = new AtomicBoolean(true);
ArrayList<Integer> candidateStores;
//Send list of store ports to client synchronized (dStores) {
//If file does not exist, select r DStores
candidateStores = emptiestDStores();
//Wait for store ack from each client //Build command to send to client
StringBuilder listOfPorts = new StringBuilder();
listOfPorts.append("STORE_TO");
for (int port : candidateStores) {
listOfPorts.append(" ").append(port);
}
//If any not received, remove from index and decrease DStore numbers //Send to client
clientOut.println(listOfPorts);
clientOut.flush();
//Create latch to wait for all acknowledgements
CountDownLatch ackLatch = new CountDownLatch(candidateStores.size());
//New threads to sait for store ack from each client
for (int store : candidateStores) {
new Thread(new WaitForAcknowledgement(ackLatch, allReceived, dStores.get(store),fileName)).start();
}
//Await countdown before releasing hold on DStore list
ackLatch.await();
}
if (allReceived.get()) {
//TODO Send store ACK to client
clientOut.println("");
clientOut.flush();
synchronized (index) {
synchronized (storeNumbers) {
//Add DStores to index
index.put(fileName, candidateStores.stream().mapToInt(i -> i).toArray());
//If all received, send ack to client, store list to index & increment DStore numbers //Add increment numbers for each DStore
for (int dStore : candidateStores) {
increaseDStore(dStore);
}
}
}
}
//Close streams
clientOut.close();
client.close();
} catch (Exception e) {
System.out.println("error "+e);
}
} }
/** /**
...@@ -216,12 +276,11 @@ public class Controller { ...@@ -216,12 +276,11 @@ public class Controller {
/** /**
* Get r least populated dStores * Get r least populated dStores
*/ */
private static ArrayList<Integer> emptiestDStores(Socket client) { private static ArrayList<Integer> emptiestDStores() {
ArrayList<Integer> lowestList = new ArrayList<>(); ArrayList<Integer> lowestList = new ArrayList<>();
//Lock list of DStores until fully consumed //Lock list of DStores until fully consumed
synchronized (storeNumbers) { synchronized (storeNumbers) {
synchronized (dStores) {
int stores = dStores.size(); int stores = dStores.size();
int brokenStores = 0; int brokenStores = 0;
...@@ -237,7 +296,6 @@ public class Controller { ...@@ -237,7 +296,6 @@ public class Controller {
} }
} }
} }
}
//Remove excess DStores //Remove excess DStores
while (lowestList.size() > replicationFactor) { while (lowestList.size() > replicationFactor) {
...@@ -247,24 +305,6 @@ public class Controller { ...@@ -247,24 +305,6 @@ public class Controller {
return lowestList; return lowestList;
} }
/**
* Check for all working dStores
*/
private static ArrayList<Integer> workingDStores() {
ArrayList<Integer> workingPorts = new ArrayList<>();
synchronized (dStores) {
//If dstore is still working add it to the list
for (Integer i : dStores.keySet()) {
if (checkDStore(dStores.get(i))) {
workingPorts.add(i);
}
}
}
return workingPorts;
}
/** /**
* Test if given DStore is still active * Test if given DStore is still active
*/ */
......
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public class WaitForAcknowledgement implements Runnable {
private final CountDownLatch countDownLatch;
private final AtomicBoolean allReceived;
private final Socket client;
private final String fileName;
public WaitForAcknowledgement (CountDownLatch c, AtomicBoolean a, Socket cl, String f) {
countDownLatch = c;
allReceived = a;
client = cl;
fileName = f;
}
@Override
public void run() {
try {
BufferedReader receiver = new BufferedReader(new InputStreamReader(client.getInputStream()));
String response = receiver.readLine();
while(true) {
if (!(response == null) && response.equals("STORE_ACK " + fileName)) {
break;
}
}
} catch (Exception e) {
allReceived.set(false);
}
//Alert countdown latch of completion
countDownLatch.countDown();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment