Skip to content
Snippets Groups Projects
Commit 86f442de authored by pr1n19's avatar pr1n19
Browse files

Remove command updated to handle errors

Structure added to manage DStore file counts
Structure for store, load and list added
parent 27d7de0a
No related branches found
No related tags found
No related merge requests found
import java.io.BufferedReader; import java.io.*;
import java.io.InputStreamReader;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -10,20 +9,19 @@ public class Controller { ...@@ -10,20 +9,19 @@ public class Controller {
//Ports of each dStore, and associated socket //Ports of each dStore, and associated socket
private static final HashMap<Integer,Socket> dStores = new HashMap<>(); private static final HashMap<Integer,Socket> dStores = new HashMap<>();
//Ports of each dStore, and associated number of files //Ports of each dStore, and associated number of files
private static final HashMap<Integer,Integer> storeNumbers = new HashMap<>(); private static final HashMap<Integer,ArrayList<Integer>> storeNumbers = new HashMap<>();
//Store filename & ports //Store filename & ports
private static final HashMap<String,int[]> index = new HashMap<>(); private static final HashMap<String,int[]> index = new HashMap<>();
private static int replicationFactor;
public static void main(String[] args) { public static void main(String[] args) {
int cPort = Integer.parseInt(args[0]); int cPort = Integer.parseInt(args[0]);
int replicationFactor = Integer.parseInt(args[1]); replicationFactor = Integer.parseInt(args[1]);
int timeOut = Integer.parseInt(args[2]); int timeOut = Integer.parseInt(args[2]);
int rebalancePeriod = Integer.parseInt(args[3]); int rebalancePeriod = Integer.parseInt(args[3]);
try{ try{
//Create socket for listening //Create socket for listening
ServerSocket listeningSocket = new ServerSocket(cPort); ServerSocket listeningSocket = new ServerSocket(cPort);
...@@ -40,14 +38,15 @@ public class Controller { ...@@ -40,14 +38,15 @@ public class Controller {
//Process depending on command received //Process depending on command received
switch (command) { switch (command) {
case "STORE" -> System.out.println("STORE"); case "STORE" -> storeCommand(client);
case "LOAD" -> System.out.println("LOAD"); case "LOAD" -> loadCommand(client, received.split(" ")[1]);
case "REMOVE" -> removeFile(received.split(" ")[1], client); case "REMOVE" -> removeCommand(received.split(" ")[1], client);
case "LIST" -> System.out.println("LIST"); case "LIST" -> listCommand();
case "JOIN" -> { case "JOIN" -> {
int portNo = Integer.parseInt(received.split(" ")[1]); int portNo = Integer.parseInt(received.split(" ")[1]);
client.setSoTimeout(timeOut);
dStores.put(portNo, client); dStores.put(portNo, client);
storeNumbers.put(portNo, 0); addDStore(portNo, 0);
} }
} }
}catch (Exception e) { }catch (Exception e) {
...@@ -56,19 +55,66 @@ public class Controller { ...@@ -56,19 +55,66 @@ public class Controller {
} }
}).start(); }).start();
}catch (Exception e){ }catch (Exception e){
System.out.println("error "+e); System.out.println("error "+e);
} }
} }
//TODO Get r least populated DStores /**
private static ArrayList<Integer> emptyDStores() { * TODO Send store commands to client
return new ArrayList<>(); */
private static void storeCommand(Socket client) {
}
/**
* TODO Send load commands to client
*/
private static void loadCommand(Socket client, String fileName) {
}
/**
* TODO Send compiled list to client
*/
private static void listCommand(Socket client) {
}
/**
* Get r least populated dStores
*/
private static ArrayList<Integer> emptiestDStores() throws Exception{
//If not enough DStores, throw exception
int stores = dStores.size();
int brokenStores = 0;
if(stores < replicationFactor) {
throw new Exception("Not enough DStores");
}
ArrayList<Integer> lowestList = new ArrayList<>();
int i = 0;
while ((lowestList.size() <= replicationFactor) && (brokenStores + replicationFactor < stores)) {
for(int store : storeNumbers.get(i)){
if (checkDStore(dStores.get(store))) {
lowestList.add(store);
} else {
brokenStores++;
}
}
}
while (lowestList.size() > replicationFactor) {
lowestList.remove(lowestList.size() - 1);
} }
//Check for all working dStores return lowestList;
}
/**
* Check for all working dStores
*/
private static ArrayList<Integer> workingDStores() { private static ArrayList<Integer> workingDStores() {
ArrayList<Integer> workingPorts = new ArrayList<>(); ArrayList<Integer> workingPorts = new ArrayList<>();
...@@ -82,13 +128,115 @@ public class Controller { ...@@ -82,13 +128,115 @@ public class Controller {
return workingPorts; return workingPorts;
} }
//TODO Test if given DStore is still active /**
* Test if given DStore is still active
*/
private static boolean checkDStore(Socket dStore) { private static boolean checkDStore(Socket dStore) {
return false; boolean active;
try{
//Write out
PrintWriter out = new PrintWriter(new OutputStreamWriter(dStore.getOutputStream()));
out.println("LIST");
out.flush();
//Receive in
BufferedReader in = new BufferedReader(new InputStreamReader(dStore.getInputStream()));
in.readLine();
active = true;
}catch (Exception e){
active = false;
}
return active;
} }
//TODO Remove file from given DStores /**
private static void removeFile(String fileName, Socket client) { * Remove given file from all DStores
*/
private static void removeCommand(String fileName, Socket client) {
try {
//Create output stream to client
PrintWriter clientOut = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
//If file does not exist
if (!index.containsKey(fileName)) {
clientOut.println("ERROR_FILE_DOES_NOT_EXIST");
clientOut.flush();
} else {
//Get list of DStores associated with given filename
int[] stores = index.get(fileName);
//For each DStore, send "delete filename" command in new thread
for (int store : stores) {
decreaseDStore(store);
new Thread(() -> {
try {
//Send "REMOVE filename"
PrintWriter out = new PrintWriter(dStores.get(store).getOutputStream());
out.println("REMOVE " + fileName);
out.flush();
//Wait for acknowledgement
BufferedReader in = new BufferedReader(new InputStreamReader(dStores.get(store).getInputStream()));
while (!in.readLine().equals("REMOVE_ACK " + fileName)) {
}
} catch (Exception e) {
System.out.println("error "+e);
}
}).start();
}
//Send acknowledgement to client
clientOut.println("REMOVE_COMPLETE");
clientOut.flush();
}
}catch (Exception e){
System.out.println("error "+e);
}
} }
/**
* Add new DStore to list with 0 files
*/
private static void addDStore(int portNo, int depth){
if(!storeNumbers.containsKey(depth)) {
storeNumbers.put(depth,(new ArrayList<>()));
}
storeNumbers.get(depth).add(portNo);
}
/**
* Increase number of files given DStore has by 1
*/
private static void increaseDStore(int portNo) {
for (int i = 0 ; ;i++) {
if(storeNumbers.containsKey(i) && storeNumbers.get(i).contains(portNo)) {
storeNumbers.get(i).remove(portNo);
addDStore(portNo,i+1);
break;
}
}
}
/**
* Decrease number of files given DStore has by 1
*/
private static void decreaseDStore(int portNo) {
for (int i = 0 ; ;i++) {
if(storeNumbers.containsKey(i) && storeNumbers.get(i).contains(portNo)) {
storeNumbers.get(i).remove(portNo);
addDStore(portNo,i-1);
break;
}
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment