Skip to content
Snippets Groups Projects
Commit 6e07a95e authored by pr1n19's avatar pr1n19
Browse files

Synchronisation added to maintain continuity

List command & Load command completed
parent 86f442de
No related branches found
No related tags found
No related merge requests found
......@@ -10,10 +10,8 @@ public class Controller {
private static final HashMap<Integer,Socket> dStores = new HashMap<>();
//Ports of each dStore, and associated number of files
private static final HashMap<Integer,ArrayList<Integer>> storeNumbers = new HashMap<>();
//Store filename & ports
private static final HashMap<String,int[]> index = new HashMap<>();
private static int replicationFactor;
public static void main(String[] args) {
......@@ -32,23 +30,38 @@ public class Controller {
try {
//Accept new TCP connection & get command
Socket client = listeningSocket.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String received = in.readLine();
String command = received.split(" ")[0];
//Process depending on command received
switch (command) {
case "STORE" -> storeCommand(client);
case "LOAD" -> loadCommand(client, received.split(" ")[1]);
case "REMOVE" -> removeCommand(received.split(" ")[1], client);
case "LIST" -> listCommand();
case "JOIN" -> {
int portNo = Integer.parseInt(received.split(" ")[1]);
client.setSoTimeout(timeOut);
dStores.put(portNo, client);
addDStore(portNo, 0);
new Thread(() -> {
try {
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String received = in.readLine();
String command = received.split(" ")[0];
//If not enough DStores alert client except for JOIN requests
if(!command.equals("JOIN")){
synchronized (dStores) {
if(!enoughDStores(client)) {
return;
}
}
}
//Process depending on command received
switch (command) {
case "STORE" -> storeCommand(client);
case "LOAD" -> loadCommand(client, received.split(" ")[1]);
case "REMOVE" -> removeCommand(received.split(" ")[1], client);
case "LIST" -> listCommand(client);
case "JOIN" -> {
int portNo = Integer.parseInt(received.split(" ")[1]);
client.setSoTimeout(timeOut);
dStores.put(portNo, client);
addDStore(portNo, 0);
}
}
}catch (Exception e) {
System.out.println("error "+e);
}
}
}).start();
}catch (Exception e) {
System.out.println("error "+e);
}
......@@ -68,43 +81,154 @@ public class Controller {
}
/**
* TODO Send load commands to client
* Send load commands to client
*/
private static void loadCommand(Socket client, String fileName) {
try {
PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
int selectedStore = -1;
synchronized (index) {
synchronized (dStores) {
//If file does not exist, send error
if (!index.containsKey(fileName)) {
clientOut.println("ERROR_FILE_DOES_NOT_EXIST");
}
//Get port of a working DStore with required file
for (int store : index.get(fileName)) {
if (checkDStore(dStores.get(store))) {
selectedStore = store;
break;
}
}
}
}
//If no selected store, send "ERROR_LOAD", else send port no of open client
if (selectedStore == -1) {
clientOut.println("ERROR_LOAD");
} else {
clientOut.println("LOAD_DATA "+selectedStore);
}
//Flush message through & close print writer
clientOut.flush();
clientOut.close();
}catch (Exception e) {
System.out.println("error "+e);
}
}
/**
* TODO Send compiled list to client
* Send compiled list to client
*/
private static void listCommand(Socket client) {
StringBuilder list = new StringBuilder();
list.append("LIST");
//Build list
synchronized (index) {
for (String file : index.keySet()) {
list.append(" ");
list.append(file);
}
}
//Send list to client
try {
PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
clientOut.println(list);
clientOut.flush();
clientOut.close();
}catch (Exception e) {
System.out.println("error "+e);
}
}
/**
* Get r least populated dStores
* TODO Remove given file from all DStores
*/
private static ArrayList<Integer> emptiestDStores() throws Exception{
//If not enough DStores, throw exception
int stores = dStores.size();
int brokenStores = 0;
if(stores < replicationFactor) {
throw new Exception("Not enough DStores");
private static void removeCommand(String fileName, Socket client) {
try {
//Create output stream to client
PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
//If file does not exist
if (!index.containsKey(fileName)) {
clientOut.println("ERROR_FILE_DOES_NOT_EXIST");
clientOut.flush();
} else {
//Get list of DStores associated with given filename
int[] stores = index.get(fileName);
//For each DStore, send "delete filename" command in new thread
for (int store : stores) {
decreaseDStore(store);
new Thread(() -> {
try {
//Send "REMOVE filename"
PrintWriter out = new PrintWriter(dStores.get(store).getOutputStream());
out.println("REMOVE " + fileName);
out.flush();
//Wait for acknowledgement
BufferedReader in = new BufferedReader(new InputStreamReader(dStores.get(store).getInputStream()));
while (true) {
String response = in.readLine();
if(!(response == null) && response.equals("REMOVE_ACK " + fileName)){
break;
}
}
} catch (Exception e) {
System.out.println("error "+e);
}
}).start();
}
//Remove file from index
synchronized (index) {
index.remove(fileName);
}
//Send acknowledgement to client
clientOut.println("REMOVE_COMPLETE");
clientOut.flush();
clientOut.close();
}
}catch (Exception e) {
System.out.println("error "+e);
}
}
/**
* Get r least populated dStores
*/
private static ArrayList<Integer> emptiestDStores(Socket client) {
ArrayList<Integer> lowestList = new ArrayList<>();
int i = 0;
while ((lowestList.size() <= replicationFactor) && (brokenStores + replicationFactor < stores)) {
for(int store : storeNumbers.get(i)){
if (checkDStore(dStores.get(store))) {
lowestList.add(store);
} else {
brokenStores++;
//Lock list of DStores until fully consumed
synchronized (storeNumbers) {
synchronized (dStores) {
int stores = dStores.size();
int brokenStores = 0;
//Build list of DStores
int i = 0;
while ((lowestList.size() <= replicationFactor) && (brokenStores + replicationFactor < stores)) {
for (int store : storeNumbers.get(i)) {
if (checkDStore(dStores.get(store))) {
lowestList.add(store);
} else {
brokenStores++;
}
}
}
}
}
//Remove excess DStores
while (lowestList.size() > replicationFactor) {
lowestList.remove(lowestList.size() - 1);
}
......@@ -118,10 +242,12 @@ public class Controller {
private static ArrayList<Integer> workingDStores() {
ArrayList<Integer> workingPorts = new ArrayList<>();
//If dstore is still working add it to the list
for (Integer i:dStores.keySet()) {
if(checkDStore(dStores.get(i))){
workingPorts.add(i);
synchronized (dStores) {
//If dstore is still working add it to the list
for (Integer i : dStores.keySet()) {
if (checkDStore(dStores.get(i))) {
workingPorts.add(i);
}
}
}
......@@ -145,6 +271,8 @@ public class Controller {
in.readLine();
active = true;
out.close();
in.close();
}catch (Exception e){
active = false;
}
......@@ -152,74 +280,30 @@ public class Controller {
return active;
}
/**
* Remove given file from all DStores
*/
private static void removeCommand(String fileName, Socket client) {
try {
//Create output stream to client
PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
//If file does not exist
if (!index.containsKey(fileName)) {
clientOut.println("ERROR_FILE_DOES_NOT_EXIST");
clientOut.flush();
} else {
//Get list of DStores associated with given filename
int[] stores = index.get(fileName);
//For each DStore, send "delete filename" command in new thread
for (int store : stores) {
decreaseDStore(store);
new Thread(() -> {
try {
//Send "REMOVE filename"
PrintWriter out = new PrintWriter(dStores.get(store).getOutputStream());
out.println("REMOVE " + fileName);
out.flush();
//Wait for acknowledgement
BufferedReader in = new BufferedReader(new InputStreamReader(dStores.get(store).getInputStream()));
while (!in.readLine().equals("REMOVE_ACK " + fileName)) {
}
} catch (Exception e) {
System.out.println("error "+e);
}
}).start();
}
//Send acknowledgement to client
clientOut.println("REMOVE_COMPLETE");
clientOut.flush();
}
}catch (Exception e){
System.out.println("error "+e);
}
}
/**
* Add new DStore to list with 0 files
*/
private static void addDStore(int portNo, int depth){
if(!storeNumbers.containsKey(depth)) {
storeNumbers.put(depth,(new ArrayList<>()));
synchronized (dStores) {
if (!storeNumbers.containsKey(depth)) {
storeNumbers.put(depth, (new ArrayList<>()));
}
storeNumbers.get(depth).add(portNo);
}
storeNumbers.get(depth).add(portNo);
}
/**
* Increase number of files given DStore has by 1
*/
private static void increaseDStore(int portNo) {
for (int i = 0 ; ;i++) {
if(storeNumbers.containsKey(i) && storeNumbers.get(i).contains(portNo)) {
storeNumbers.get(i).remove(portNo);
synchronized (storeNumbers) {
for (int i = 0; ; i++) {
if (storeNumbers.get(i).contains(portNo)) {
storeNumbers.get(i).remove(portNo);
addDStore(portNo,i+1);
break;
addDStore(portNo, i + 1);
break;
}
}
}
}
......@@ -228,15 +312,34 @@ public class Controller {
* Decrease number of files given DStore has by 1
*/
private static void decreaseDStore(int portNo) {
for (int i = 0 ; ;i++) {
if(storeNumbers.containsKey(i) && storeNumbers.get(i).contains(portNo)) {
storeNumbers.get(i).remove(portNo);
synchronized (storeNumbers) {
for (int i = 0; ; i++) {
if (storeNumbers.get(i).contains(portNo)) {
storeNumbers.get(i).remove(portNo);
addDStore(portNo,i-1);
break;
addDStore(portNo, i - 1);
break;
}
}
}
}
/**
* If not enough DStores, send "ERROR_NOT_ENOUGH_DSTORES"
*/
private static boolean enoughDStores(Socket client) {
if (dStores.size() < replicationFactor) {
try {
PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
clientOut.println("ERROR_NOT_ENOUGH_DSTORES");
clientOut.flush();
clientOut.close();
}catch (Exception e){
System.out.println("error "+e);
}
return false;
} else {
return true;
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment