diff --git a/Controller.java b/Controller.java new file mode 100644 index 0000000000000000000000000000000000000000..4305b5770c47c6b1a0464d2cacbeddd999ef425d --- /dev/null +++ b/Controller.java @@ -0,0 +1,982 @@ +import java.lang.reflect.Array; +import java.net.*; +import java.io.*; +import java.time.LocalDateTime; +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +//todo: test two clients joining simultanously + +class Pair <A, B> { + private A first; + private B second; + public Pair(A first, B second) { + this.first = first; + this.second = second; + } + + public A getFirst() { + return this.first; + } + + public B getSecond() { + return this.second; + } + + public void setFirst(A first) { + this.first = first; + } + + public void setSecond(B first) { + this.second = second; + } +} + +public class Controller { + + private int cport; + private int r; + private int timeout; + private int rebalance_period; + + private Controller controller; + + private List<Integer> storePorts; + private List<Thread> runningThreads; + private List<Index> index; + + private Hashtable<String, ArrayList<Integer>> reloadStores; + + Object clientLock = new Object(); + Object concurrentOperationLock = new Object(); + + private final ScheduledExecutorService rebalanceScheduler; + + private AtomicBoolean rebalanceLock = new AtomicBoolean(false); + + public static void main(String[] args) { + if (args.length != 4) { + System.err.println("Invalid number of arguments"); + } else{ + int cport = Integer.parseInt(args[0]); + int r = Integer.parseInt(args[1]); + int timeout = Integer.parseInt(args[2]); + int rebalancePeriod = Integer.parseInt(args[3]); + Controller controller = new Controller(cport, r, timeout, rebalancePeriod); + } + } + + + public Controller(int c, int r, int t, int rp) { + System.out.println("Controller created on port: " + c); + controller = this; + + this.cport = c; + this.r = r; + this.timeout = t; + this.rebalance_period = rp; + + storePorts = Collections.synchronizedList(new ArrayList<Integer>()); + runningThreads = Collections.synchronizedList(new ArrayList<Thread>()); + index = Collections.synchronizedList(new ArrayList<Index>()); + reloadStores = new Hashtable<String, ArrayList<Integer>>(); + + new Thread(new waitForConnections(this)).start(); + + rebalanceScheduler = Executors.newScheduledThreadPool(1); + Runnable rebalance = () -> startRebalance(); + rebalanceScheduler.scheduleAtFixedRate(rebalance, rebalance_period, rebalance_period, TimeUnit.SECONDS); + } + + private int getFloor() { + int numberOfStoredFiles = 0; + for (Index i : index) { + if (i.getStatus().equals("store complete")) { + numberOfStoredFiles += 1; + } + } + int floor = Math.floorDiv((r * numberOfStoredFiles), storePorts.size()); + return floor; + } + + private int getCeilPlusOne() { + int numberOfStoredFiles = 0; + + for (Index i : index) { + if (i.getStatus().equals("store complete")) { + numberOfStoredFiles += 1; + } + } + + numberOfStoredFiles += 1; + int ceil = (int) Math.ceil((r * numberOfStoredFiles) / (float) storePorts.size()); + return ceil; + } + + private int getNumberOfStoredFiles() { + int numberOfStoredFiles = 0; + for (Index i : index) { + if (i.getStatus().equals("store complete")) { + numberOfStoredFiles += 1; + } + } + return numberOfStoredFiles; + } + + private int getCeil() { + int numberOfStoredFiles = 0; + + for (Index i : index) { + if (i.getStatus().equals("store complete")) { + numberOfStoredFiles += 1; + } + } + + numberOfStoredFiles = getNumberOfStoredFiles(); + int ceil = (int) Math.ceil((r * numberOfStoredFiles) / (float) storePorts.size()); + return ceil; + } + + + protected void startRebalance() { + + System.out.println("Rebalance Starting"); + + //This is synchronized so two rebalance operations can't happen simultaneously + synchronized (concurrentOperationLock) { + if (getNumberOfStoredFiles() == 0 || storePorts.size() < r) { + System.out.println("Rebalance Ended - not enough stores or files"); + return; + } + + + int ceil = getCeil(); + int floor = getFloor(); + Random random = new Random(); + + + HashMap<Integer, ArrayList<String>> localFilesStored = new HashMap<>(); + HashMap<Integer, Pair<ArrayList<Pair<String, Integer>>, ArrayList<String>>> swaps = new HashMap<>(); + + for (Thread t : runningThreads) { + if (t instanceof DStoreCommunication) { + ArrayList<String> localFiles = ((DStoreCommunication) t).getFiles(); + localFilesStored.put(((DStoreCommunication) t).port, new ArrayList<>(localFiles)); + } + } + + + //Duplicate the fileStore + HashMap<Integer, ArrayList<String>> newFilesStored = new HashMap<>(); + for (Integer port : localFilesStored.keySet()) { + int copyOfPort = Integer.valueOf(port); + ArrayList<String> copyOfList = new ArrayList<String>(List.copyOf(localFilesStored.get(port))); + newFilesStored.put(copyOfPort, copyOfList); + } + + ArrayList<Integer> sortedBySize = sortHashMap(localFilesStored); + + //First check if each file is replicated at least r times + HashMap<String, Integer> fileCount = new HashMap<>(); + for (int port : localFilesStored.keySet()) { + for (String file : localFilesStored.get(port)) { + if (fileCount.containsKey(file)) { + fileCount.put(file, fileCount.get(file) + 1); + } else { + fileCount.put(file, 1); + } + } + } + + + //Any files that remain need to be replicated + for (String file : fileCount.keySet()) { + if (fileCount.get(file) < r) { + while (fileCount.get(file) < r) { + for (int port : sortedBySize) { + if (newFilesStored.get(port).size() + 1 <= ceil && !newFilesStored.get(port).contains(file)) { + newFilesStored.get(port).add(file); + fileCount.put(file, fileCount.get(file) + 1); + } + } + } + } + } + //Now every file should be replicated at least r times so we can sort the stores + int count = 1; + while (!checkValid(newFilesStored, ceil, floor)) { + count++; + //If not valid then try to take from the highest store and place in the lowest + ArrayList<Integer> highestFirst = sortHashMap(newFilesStored); + sortedBySize = new ArrayList<>(highestFirst); + Collections.reverse(sortedBySize); + boolean movedFile = false; + while (!movedFile) { + int highestPort = highestFirst.get(highestFirst.size() - 1); + ArrayList<String> filesInHighestPort = newFilesStored.get(highestPort); + String randomFile = filesInHighestPort.get(random.nextInt(filesInHighestPort.size())); + for (int port : sortedBySize) { + if (newFilesStored.get(port).size() + 1 <= ceil && !newFilesStored.get(port).contains(randomFile)) { + filesInHighestPort.remove(randomFile); + newFilesStored.get(port).add(randomFile); + movedFile = true; + } + } + } + } + + + //Update the index + HashMap<String, ArrayList<Integer>> indexUpdate = new HashMap<>(); + for (int port : newFilesStored.keySet()) { + ArrayList<String> filesInPort = newFilesStored.get(port); + for (String file : filesInPort) { + if (indexUpdate.containsKey(file)) { + indexUpdate.get(file).add(port); + } else { + indexUpdate.put(file, new ArrayList<>(Arrays.asList(port))); + } + } + } + + + for (Index i : index) { + i.setStores(indexUpdate.get(i.getFilename())); + } + + HashMap<String, ArrayList<Integer>> portsThatNeedFiles = new HashMap<>(); + + + for (int port : newFilesStored.keySet()) { + for (String file : newFilesStored.get(port)) { + if (!localFilesStored.get(port).contains(file)) { + if (portsThatNeedFiles.containsKey(file)) { + portsThatNeedFiles.get(file).add(port); + } else { + portsThatNeedFiles.put(file, new ArrayList<>(Arrays.asList(port))); + } + } + } + } + + + HashMap<Integer, String> outputStrings = new HashMap<>(); + + String fileToRemove = null; + + for (int port : newFilesStored.keySet()) { + ArrayList<String> filesAlreadyInPort = localFilesStored.get(port); + ArrayList<Pair<String, ArrayList<Integer>>> outputArray = new ArrayList<>(); + if (fileToRemove != null) { + portsThatNeedFiles.remove(fileToRemove); + } + fileToRemove = null; + for (String file : portsThatNeedFiles.keySet()) { + if (filesAlreadyInPort.contains(file)) { + System.out.println("port: " + port + " contains: " + file); + Pair fileSwitch = new Pair(file, portsThatNeedFiles.get(file)); + fileToRemove = file; + outputArray.add(fileSwitch); + } + } + String outputString = "REBALANCE " + outputArray.size() + " "; + if (!(outputArray.size() == 0)) { + for (Pair<String, ArrayList<Integer>> pair : outputArray) { + outputString += pair.getFirst() + " " + pair.getSecond().size() + " "; + for (int outputPorts : pair.getSecond()) { + outputString += outputPorts + " "; + } + } + } + outputStrings.put(port, outputString); + } + + for (Thread t : runningThreads) { + if (t instanceof DStoreCommunication) { + ((DStoreCommunication) t).rebalanceOperation(newFilesStored, outputStrings.get(((DStoreCommunication) t).port)); + } + } + + System.out.println("15 SECOND SLEEP RUN STORE NOW"); + try { + Thread.sleep(15000); + } catch (Exception e) { + e.printStackTrace(); + } + + System.out.println("REBALANCE FINISHED: " + newFilesStored); + } + + + + } + + private boolean checkValid(HashMap<Integer, ArrayList<String>> map, int ceil, int floor) { + for (int port : map.keySet()) { + if (map.get(port).size() < floor || map.get(port).size() > ceil) { + return false; + } + } + return true; + } + + private ArrayList<Integer> sortHashMap(HashMap<Integer, ArrayList<String>> localFilesStored) { + ArrayList<Integer> sortedFileMap = new ArrayList<>(localFilesStored.keySet()); + Collections.sort(sortedFileMap, new Comparator<Integer>() { + @Override + public int compare(Integer o1, Integer o2) { + return localFilesStored.get(o1).size() - localFilesStored.get(o2).size(); + } + }); + return sortedFileMap; + } + + protected HashMap<Integer, ArrayList<String>> getStoreFileMap() { + HashMap<Integer, ArrayList<String>> storeFileMap = new HashMap<>(); + for (Index i : index) { + if (i.getStatus().equals("store complete")) { + for (int port : i.getStores()) { + if (storeFileMap.containsKey(port)) { + storeFileMap.get(port).add(i.getFilename()); + } else{ + ArrayList<String> arr = new ArrayList<>(); + arr.add(i.getFilename()); + storeFileMap.put(port, arr); + } + } + } + } + for (int port : storePorts) { + if (! storeFileMap.containsKey(port)) { + storeFileMap.put(port, new ArrayList<String>()); + } + } + return storeFileMap; + } + + + protected void messageStoreThreads(String message) { + for (Thread t : runningThreads) { + if (t instanceof DStoreCommunication) { + ((DStoreCommunication) t).receiveMessage(message); + } + } + } + + protected void messageClientThreads(String message) { + synchronized (clientLock) { + for (Thread t : runningThreads) { + if (t instanceof ClientCommunication) { + ((ClientCommunication) t).receiveMessage(message); + } + } + } + } + + + private class waitForConnections implements Runnable { + + + //controller required for listeners + private Controller controller; + + public waitForConnections(Controller c) { + controller = c; + } + + public void run() { + ServerSocket ss = null; + //Creating serverSocket to accept connections + try { + ss = new ServerSocket(cport); + } catch (IOException e) { + e.printStackTrace(); + } + + //running while loop to always accept new connections + while (true) { + try { + Socket s = ss.accept(); + System.out.println("New socket connection"); + new WaitForFirstMessage(s).start(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + //Class to handle the first message and start the appropriate thread + private class WaitForFirstMessage extends Thread { + + private Socket socket; + private PrintWriter messageOutput; + private BufferedReader messageInput; + + + public WaitForFirstMessage(Socket s) { + this.socket = s; + } + + @Override + public void run() { + try { + messageInput = new BufferedReader(new InputStreamReader(socket.getInputStream())); + messageOutput = new PrintWriter(socket.getOutputStream(), true); + } catch (IOException e) { + System.out.println("Error Creating message input and output"); + } + //Wait for first message to create correct type of thread + String line; + try { + while ((line = messageInput.readLine()) != null) { + if (line.split(" ")[0].equals("JOIN")) { + //This means a DStore has connected + DStoreCommunication newThread = new DStoreCommunication(line, socket, messageOutput, messageInput); + newThread.start(); + runningThreads.add(newThread); + break; + } else { + ClientCommunication newThread = new ClientCommunication(line, socket, messageOutput, messageInput); + newThread.start(); + runningThreads.add(newThread); + break; + } + } + } catch (IOException e) { + System.err.println("Lost connection with connection before first message sent"); + } + } + } + + private class ClientCommunication extends Thread { + + private Socket socket; + private PrintWriter messageOutput; + private BufferedReader messageInput; + + private String intialMessage; + + //variables responsible for tracking store acks + private AtomicBoolean storeACK; + private String storeACKFilename; + private AtomicInteger storeACKCount; + private AtomicInteger storeACKCountRequired; + private ArrayList<Integer> storeACkStores = new ArrayList<>(); + + //variables responsible for tracking remove acks + private AtomicBoolean removeACK; + private String removeACKFilename; + private AtomicInteger removeACKCount; + private AtomicInteger removeACKCountRequired; + + + public ClientCommunication(String line, Socket s, PrintWriter p, BufferedReader b) { + System.out.println("ClientCommunication thread started"); + this.socket = s; + this.messageOutput = p; + this.messageInput = b; + this.intialMessage = line; + } + + @Override + public void run() { + processClientMessage(intialMessage); + try { + String line; + while ((line = messageInput.readLine()) != null) { + processClientMessage(line); + } + } catch (SocketException e) { + System.out.println("Lost connection with a client"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void processClientMessage(String message) { + //split message so we know what to do + String command = message.split(" ")[0]; + if (! (storePorts.size() < r)) { + if (command.equals("STORE")) { + storeOperation(message); + } else if (command.equals("LIST")) { + listOperation(); + } else if (command.equals("REMOVE")) { + removeOperation(message); + } else if (command.equals("LOAD")) { + loadOperation(message); + } else if (command.equals("RELOAD")) { + reloadOperation(message); + } + } else { + messageOutput.println("ERROR_NOT_ENOUGH_DSTORES"); + } + } + + + + private void loadOperation(String message) { + //Split message to get filename + String filename = null; + try { + filename = message.split(" ")[1]; + } catch (IndexOutOfBoundsException e) { + System.err.println("Malformed load message: " + message); + return; + } + System.out.println("Load Operation started for file:" + filename); + + + boolean exists = false; + int filesize = 0; + + + synchronized (clientLock) { + for (Index i : index) { + if (i.getFilename().equals(filename) && i.getStatus().equals("store complete")) { + exists = true; + filesize = i.getFilesize(); + } + } + } + + + if (!exists) { + messageOutput.println("ERROR_FILE_DOES_NOT_EXIST"); + System.out.println("Cannot load file: " + filename + "(does not exist)"); + return; + } + + //Get List of DStores in case of failure + ArrayList<Integer> stores = new ArrayList<>(); + for (int n : storePorts) { + stores.add(n); + } + + //if all DStores don't work + Random random = new Random(); + int storeIndex = random.nextInt(stores.size()); + int store = stores.get(storeIndex); + //Remove from store incase of reload so same store isn't created + stores.remove(storeIndex); + messageOutput.println("LOAD_FROM " + store + " " + filesize); + //We create a dictionary entry to say which stores have already been tried + reloadStores.put(filename, stores); + //if program gets to this point then all stores were tried and all failed so + + } + + private void reloadOperation(String message) { + String filename = null; + try { + filename = message.split(" ")[1]; + } catch (IndexOutOfBoundsException e) { + System.err.println("Malformed reload message: " + message); + return; + } + System.out.println("Reload operation started for file: " + filename); + + + Random random = new Random(); + + //If list of possible stores is empty then it has tried all possible stores so error loading + if (reloadStores.get(filename).size() == 0) { + messageOutput.println("ERROR_LOAD"); + return; + } else { + ArrayList<Integer> stores = reloadStores.get(filename); + int storeIndex = random.nextInt(stores.size()); + int store = stores.get(storeIndex); + stores.remove(storeIndex); + int filesize = 0; + for (Index i : index) { + if (i.getFilename().equals(filename)) { + filesize = i.getFilesize(); + } + } + messageOutput.println("LOAD_FROM " + store + " " + filesize); + } + } + + private void removeOperation(String message) { + //split message to get filename + String filename = null; + try { + filename = message.split(" ")[1]; + } catch (Exception e) { + System.err.println("Error getting filename to remove"); + } + + System.out.println("Remove operation started for file: " + filename); + + boolean exists = false; + for (Index i : index) { + if (i.getFilename().equals(filename)) { + exists = true; + if (i.getStatus().equals("remove in progress")) { + messageOutput.println("ERROR_FILE_DOES_NOT_EXIST"); + return; + } else if (i.getStatus().equals("store_in_progress")) { + messageOutput.println("ERROR_FILE_DOES_NOT_EXIST"); + return; + } + } + } + if (!exists) { + messageOutput.println("ERROR_FILE_DOES_NOT_EXIST"); + return; + } + + try { + updateIndex(filename, "remove in progress", 0); + } catch (Exception e) { + System.err.println(e.getMessage()); + } + + //Send message to all stores to remove + messageStoreThreads(message); + + removeACK = new AtomicBoolean(false); + removeACKFilename = filename; + removeACKCount = new AtomicInteger(0); + removeACKCountRequired = new AtomicInteger(storePorts.size()); + + TimerTask removeACKTimeout = new TimerTask() { + @Override + public void run() { + removeCheck(true); + } + }; + + Timer timer = new Timer(); + timer.schedule(removeACKTimeout, timeout); + } + + private void storeOperation(String line) { + //First split input to get filename and filesize + storeACkStores = new ArrayList<>(); + String filename = null; + int filesize = 0; + try { + filename = line.split(" ")[1]; + filesize = Integer.parseInt(line.split(" ")[2]); + } catch (Exception e) { + System.err.println("Malformed store message: " + line); + } + System.out.println("Starting store operation for file: " + filename); + synchronized (clientLock) { + System.out.println("Synchronized section"); + for (Index i : index) { + System.out.println("Index: " + i.getFilename() + " " + i.getStatus()); + if (i.getFilename().equals(filename)) { + messageOutput.println("ERROR_FILE_ALREADY_EXISTS"); + System.out.println("Sent file already exists"); + return; + } + } + try { + updateIndex(filename, "store in progress", filesize); + System.out.println("index updated"); + } catch (Exception e) { + System.err.println("error updating index"); + } + } + + //create string to send back the stores + String portString = "STORE_TO"; + + HashMap<Integer, ArrayList<String>> storeFileMap = getStoreFileMap(); + ArrayList<Integer> sortedFileMap = new ArrayList<>(storeFileMap.keySet()); + Collections.sort(sortedFileMap, new Comparator<Integer>() { + @Override + public int compare(Integer o1, Integer o2) { + return storeFileMap.get(o1).size() - storeFileMap.get(o2).size(); + } + }); + + + int rCount = 0; + while (rCount < r) { + portString += " " + sortedFileMap.get(rCount); + if (!storeACkStores.contains(sortedFileMap.get(rCount))) { + storeACkStores.add(sortedFileMap.get(rCount)); + } + rCount ++; + } + + messageOutput.println(portString); + + storeACK = new AtomicBoolean(false); + storeACKFilename = filename; + storeACKCount = new AtomicInteger(0); + storeACKCountRequired = new AtomicInteger(r); + + TimerTask storeACKTimeout = new TimerTask() { + @Override + public void run() { + storeCheck(true); + } + }; + + Timer timer = new Timer(); + timer.schedule(storeACKTimeout, timeout); + } + + private void removeCheck(boolean timer) { + try { + if (removeACKCount.get() >= removeACKCountRequired.get() && !removeACK.get() && !timer) { + updateIndex(removeACKFilename, "remove", 0); + messageOutput.println("REMOVE_COMPLETE"); + removeACK.set(true); + removeACKFilename = null; + } else if (timer) { + removeACKFilename = null; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void storeCheck(boolean timer) { + try { + if (!timer) { + if (storeACKCount.get() >= storeACKCountRequired.get() && !storeACK.get()) { + messageOutput.println("STORE_COMPLETE"); + updateIndex(storeACKFilename, "store complete", 0); + for (Index i : index) { + if (i.getFilename().equals(storeACKFilename)) { + if (i.getStores().isEmpty()) { + i.setStores(storeACkStores); + } else { + i.getStores().addAll(storeACkStores); + } + } + } + storeACKFilename = null; + storeACK.set(true); + } + } else if (! storeACK.get()){ + updateIndex(storeACKFilename, "remove", 0); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void updateIndex(String filename, String update, int filesize){ + if (update.equals("store in progress")) { + Index newIndex = new Index(filename, update, filesize); + index.add(newIndex); + } else if (update.equals("remove")) { + Iterator indexIterator = index.listIterator(); + Index toRemove = null; + while (indexIterator.hasNext()) { + Index i = (Index) indexIterator.next(); + if (i.getFilename().equals(filename)) { + toRemove = i; + } + } + if (toRemove != null) { + index.remove(toRemove); + } + } else if (update.equals("store complete")) { + for (Index i : index) { + if (i.getFilename().equals(filename)) { + i.setStatus(update); + } + } + } + } + + private void listOperation() { + String filenameString = "LIST"; + for (Index i : index) { + if (i.getStatus().equals("store complete")) { + filenameString = filenameString + " " + i.getFilename(); + } + } + messageOutput.println(filenameString); + } + + + protected void receiveMessage(String message) { + if (message.split(" ")[0].equals("STORE_ACK")) { + if (message.split(" ")[1].equals(storeACKFilename)) { + int tempCount = storeACKCount.get(); + storeACKCount.set(tempCount + 1); + storeCheck(false); + } + } else if (message.split(" ")[0].equals("REMOVE_ACK")) { + if (message.split(" ")[1].equals(removeACKFilename)) { + int tempCount = removeACKCount.get(); + removeACKCount.set(tempCount + 1); + removeCheck(false); + } + } + } + } + + private class DStoreCommunication extends Thread { + + private Socket socket; + private PrintWriter messageOutput; + private BufferedReader messageInput; + private String listResponse; + + private ArrayList<String> storedFiles = new ArrayList<>(); + + private int port; + + + public DStoreCommunication(String line, Socket s, PrintWriter p, BufferedReader b) { + + this.socket = s; + this.messageOutput = p; + this.messageInput = b; + + //line is initial JOIN message so contains port + String[] splitLine = line.split(" "); + try { + port = Integer.parseInt(splitLine[1]); + storePorts.add(port); + } catch (IndexOutOfBoundsException e) { + System.err.println("Can't split initial join message"); + } + setName("DSTORE Thread:"+port); + System.out.println("DstoreThread started port:" + port); + } + + + protected void rebalanceOperation(HashMap<Integer, ArrayList<String>> updated, String rebalanceString) { + listResponse = null; + messageOutput.println("LIST"); + while (listResponse == null) { + //Stupid java breaks if I remove this + System.out.print(""); + } + String[] splitLine = listResponse.split(" "); + ArrayList<String> filesInStore = new ArrayList<String>(Arrays.asList(Arrays.copyOfRange(splitLine, 1, splitLine.length))); + ArrayList<String> updatedFileList = updated.get(port); + ArrayList<String> removeList = new ArrayList<>(); + int removeCount = 0; + for (String filesCurrentlyThere : filesInStore) { + if (! updatedFileList.contains(filesCurrentlyThere)) { + removeCount ++; + removeList.add(filesCurrentlyThere); + } + } + rebalanceString += removeCount; + for (String filesToRemove : removeList) { + rebalanceString += " " + filesToRemove; + } + + storedFiles = updated.get(port); + messageOutput.println(rebalanceString); + + } + + + @Override + public void run() { + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(100); + startRebalance(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }).start(); + String line; + //Listen for new inputs from DStore + try { + while (true) { + line = messageInput.readLine(); + if (line == null) { + if (socket.getInputStream().read() == -1) { + lostConnection(); + break; + } + } else { + String command = line.split(" ")[0]; + if (command.equals("STORE_ACK")) { + storeACK(line); + } else if (command.equals("REMOVE_ACK")) { + removeACK(line); + } else if (command.equals("REBALANCE_ACK")) { + rebalanceACK(line); + } else if (command.equals("ERROR_FILE_DOES_NOT_EXIST")) { + controller.messageClientThreads("REMOVE_ACK " + line.split(" ")[1]); + } else if (command.equals("LIST")) { + listResponse = line; + } else { + System.out.println("DSTORE:" + port + " received unfamiliar command: " + line); + } + } + } + } catch (SocketException e) { + lostConnection(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + //function to deal with storeACK from DStore + private void storeACK(String line) { + String file = line.split(" ")[1]; + storedFiles.add(file); + controller.messageClientThreads(line); + } + + //function to deal with removeACK from DStore + private void removeACK(String line) { + String file = line.split(" ")[1]; + storedFiles.remove(file); + controller.messageClientThreads(line); + } + + //function to deal with rebalanceACK from DStore + private void rebalanceACK(String line) { + ; + } + + //to get messages from Controller to communicate between threads + protected void receiveMessage(String message) { + if (message.split(" ")[0].equals("REMOVE")) { + messageOutput.println(message); + } + } + + protected ArrayList<String> getFiles() { + return this.storedFiles; + } + + private void lostConnection() { + runningThreads.remove(this); + storePorts.remove(Integer.valueOf(port)); + System.out.println("Connection lost with DSTORE:" + port); + //remove port from index + for (Index i : index) { + if (i.getStores().contains(port)) { + i.getStores().remove(Integer.valueOf(port)); + } + } + } + + + } + + + + + + + +} diff --git a/Dstore.java b/Dstore.java new file mode 100644 index 0000000000000000000000000000000000000000..4ffc7f0e31ac097a028916efb1fff976db5416c5 --- /dev/null +++ b/Dstore.java @@ -0,0 +1,485 @@ +import java.net.*; +import java.io.*; +import java.util.ArrayList; +import java.util.Arrays; + + +//todo: set timeout on store and rebalance +//todo: Check why remove is not sending an ack +//todo: check for malformed messages + +public class Dstore { + + private int port; + private int cport; + private int timeout; + private File directory; + + private ControllerCommunicationThread controllerCommunication; + private ArrayList<ClientCommunicationThread> clientCommunication; + + public static void main(String[] args) { + if (args.length != 4) { + System.err.println("Invalid number of arguments"); + } else{ + int port = Integer.parseInt(args[0]); + int cport = Integer.parseInt(args[1]); + int timeout = Integer.parseInt(args[2]); + String fileFolder = args[3]; + Dstore dstore = new Dstore(port, cport, timeout, fileFolder); + } + } + + public Dstore(int p, int c, int t, String f) { + this.port = p; + this.cport = c; + this.timeout = t; + createDirectory(f); + + + new Thread(new WaitForConnections(this)).start(); + + this.clientCommunication = new ArrayList<>(); + controllerCommunication = new ControllerCommunicationThread(); + controllerCommunication.run(); + + } + + protected void messageController(String message) { + controllerCommunication.sendMessage(message); + } + + private void createDirectory(String file_folder) { + File folder = new File(file_folder); + emptyDirectory(folder); + this.directory = folder; + } + + private void emptyDirectory(File folder) { + if (! folder.exists()) { + folder.mkdir(); + } else { + if (folder.listFiles() != null) { + for (File f : folder.listFiles()) { + if (f.isDirectory()) { + emptyDirectory(f); + } else { + f.delete(); + } + } + } + } + } + + protected void addClientListener(ClientCommunicationThread c) { + this.clientCommunication.add(c); + } + + + private class WaitForConnections implements Runnable { + + private Dstore dstore; + + public WaitForConnections(Dstore d) { + this.dstore = d; + } + + @Override + public void run() { + ServerSocket serverSocket = null; + try { + serverSocket = new ServerSocket(port); + } catch (IOException e) { + errorMessage("Problem setting up server socket"); + } + + while (true) { + try { + Socket socket = serverSocket.accept(); + new ClientCommunicationThread(this.dstore, socket).start(); + } catch (IOException e) { + errorMessage("Problem accepting connection"); + } + } + } + } + + protected void errorMessage(String message) { + System.err.println(message + " | DSTORE port: " + port); + } + + public class ControllerCommunicationThread extends Thread { + + private InetAddress localAddress; + + private PrintWriter messageOutput; + private BufferedReader messageInput; + + public void run() { + //Attempt to connect to controller + Socket socket = null; + try { + InetAddress localAddress = InetAddress.getLocalHost(); + socket = new Socket(localAddress, cport); + } catch (Exception e) { + errorMessage("Error connecting to controller"); + e.printStackTrace(); + } + + //Setup outbound connection + messageOutput = null; + try { + messageOutput = new PrintWriter(socket.getOutputStream(), true); + } catch (IOException e) { + errorMessage("Error with messageOutput to controller"); + } + + //Setup inbound connection + messageInput = null; + try { + messageInput = new BufferedReader(new InputStreamReader(socket.getInputStream())); + } catch (IOException e) { + errorMessage("Error with messageInput from controller"); + } + + //Send port to controller to identify DStore + messageOutput.println("JOIN " + port); + + try { + String line; + while ((line = messageInput.readLine()) != null) { + if (line.split(" ")[0].equals("REMOVE")) { + removeOperation(line); + } else if (line.split(" ")[0].equals("LIST")) { + listOperation(line); + } else if (line.split(" ")[0].equals("REBALANCE")) { + rebalanceOperation(line); + } + } + } catch (Exception e) { + errorMessage("Error with message from controller"); + } + + } + + private void rebalanceOperation(String line) { + System.out.println("\nREBALANCE OPERATION STARTED IN DSTORE: " + port); + System.out.println(line); + ArrayList<String> splitLine = new ArrayList<>(Arrays.asList(line.split(" "))); + int numToSend = Integer.parseInt(splitLine.get(1)); + String filename = null; + //REBALANCE 1 test2.txt 1 5004 1 test2.txt + int stringIterate = 2; + for (int i = 0; i < numToSend; i++) { + filename = splitLine.get(stringIterate); //test.txt + stringIterate ++; //3 + int numOfStores = Integer.parseInt(splitLine.get(stringIterate)); //1 + stringIterate ++; + for (int j = 0; j < numOfStores; j++) { + int port = Integer.parseInt(splitLine.get(stringIterate)); + new RebalanceSending(port, filename); + stringIterate ++; + } + } + int removeCount = Integer.parseInt(splitLine.get(stringIterate)); + stringIterate ++; + for (int r = 0; r<removeCount; r++) { + filename = splitLine.get(removeCount); + deleteFile(filename); + } + + messageOutput.println("REBALANCE_COMPLETE"); + } + + private void deleteFile(String filename) { + File fileToRemove = new File(directory.getAbsolutePath() + "/" + filename); + if (fileToRemove.exists()) { + fileToRemove.delete(); + } else { + errorMessage("Can't delete file [" + filename + "] as does not exist"); + } + } + + private void listOperation(String line) { + String outputString = "LIST"; + File[] listOfFiles = directory.listFiles(); + for (File f : listOfFiles) { + String filename = f.getName(); + outputString += " " + filename; + } + messageOutput.println(outputString); + } + + private void removeOperation(String line) { + String filename = null; + try { + filename = line.split(" ")[1]; + } catch (Exception e) { + errorMessage("Can't extract filename for remove"); + } + + //Check if file already exists + File fileToRemove = new File(directory.getAbsolutePath() + "/" + filename); + if (fileToRemove.exists()) { + fileToRemove.delete(); + sendMessage("REMOVE_ACK " + filename); + } else { + sendMessage("ERROR_FILE_DOES_NOT_EXIST " + filename); + } + } + + //function to send messages from other thread + protected void sendMessage(String message) { + messageOutput.println(message); + } + + } + + public class RebalanceSending extends Thread { + private Socket socket; + private BufferedReader messageInput; + private PrintWriter messageOutput; + + private int toPort; + private String filename; + + public RebalanceSending(int toPort, String filename) { + this.toPort = toPort; + this.filename = filename; + run(); + } + + @Override + public void run() { + try { + Thread.sleep(100); + socket = new Socket(InetAddress.getLocalHost(), toPort); + } catch (Exception e) { + errorMessage("Problem setting up connection with dstore: " + port); + } + + messageInput = null; + messageOutput = null; + try { + messageInput = new BufferedReader(new InputStreamReader(socket.getInputStream())); + } catch (Exception e) { + errorMessage("Can't establish message input in client communication"); + } + + try { + messageOutput = new PrintWriter(socket.getOutputStream(), true); + } catch (Exception e) { + errorMessage("Can't establish message output in client communication"); + } + + + File file = new File(directory.getAbsolutePath() + "/" + filename); + String message = "REBALANCE_STORE " + filename + " " + file.length(); + messageOutput.println(message); + + try { + String line; + while ((line = messageInput.readLine()) != null) { + if (line.equals("ACK")) { + break; + } + } + } catch (Exception e) { + System.out.println("Problem getting rebalance ack"); + } + + if (!file.exists()) { + try { + socket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } else { + try { + OutputStream fileOutput = socket.getOutputStream(); + InputStream fileInput = new FileInputStream(file); + byte[] bytes; + bytes = fileInput.readNBytes((int) file.length()); + fileOutput.write(bytes); + } catch (IOException e) { + errorMessage("Error sending file"); + } + } + } + + } + + public class ClientCommunicationThread extends Thread { + + private Dstore dstore; + private Socket socket; + + private BufferedReader messageInput; + private PrintWriter messageOutput; + + public ClientCommunicationThread(Dstore d, Socket s) { + this.dstore = d; + this.socket = s; + this.dstore.addClientListener(this); + } + + public void run() { + messageInput = null; + messageOutput = null; + try { + messageInput = new BufferedReader(new InputStreamReader(socket.getInputStream())); + } catch (Exception e) { + errorMessage("Can't establish message input in client communication"); + } + + try { + messageOutput = new PrintWriter(socket.getOutputStream(), true); + } catch (Exception e) { + errorMessage("Can't establish message output in client communication"); + } + + + String line = null; + try { + while ((line = messageInput.readLine()) != null) { + String[] splitLine = line.split(" "); + String command = splitLine[0]; + if (command.equals("STORE")) { + storeOperation(line); + } else if (command.equals("LOAD_DATA")) { + loadOperation(line); + } else if (command.equals("REBALANCE_STORE")) { + rebalanceOperation(line); + } else { + System.out.println("DSTORE received unfamiliar command: " + command); + } + break; + } + } catch (Exception e) { + errorMessage("Error reading message in client communication"); + e.printStackTrace(); + } + } + + private void rebalanceOperation(String line) { + String filename = ""; + int filesize = 0; + try { + filename = line.split(" ")[1]; + filesize = Integer.parseInt(line.split(" ")[2]); + } catch (Exception e) { + errorMessage("Error splitting store command"); + this.interrupt(); + } + try { + messageOutput = new PrintWriter(socket.getOutputStream(), true); + messageOutput.println("ACK"); + } catch (Exception e) { + System.err.println("Error establishing output in rebalance operation"); + } + + OutputStream fileOutput = null; + try { + File outputFile = new File(directory.getAbsolutePath()+"/"+filename); + outputFile.createNewFile(); + outputFile.setWritable(true); + fileOutput = new FileOutputStream(outputFile); + } catch (Exception e) { + errorMessage("Error creating and writing to file"); + } + + InputStream fileInput = null; + try { + fileInput = socket.getInputStream(); + } catch (IOException e) { + errorMessage("Error establishing inbound file input"); + } + + try { + byte[] bytes; + bytes = fileInput.readNBytes(filesize); + fileOutput.write(bytes); + fileOutput.flush(); + } catch (IOException e) { + errorMessage("error reading data and writing to file"); + e.printStackTrace(); + } + + } + + private void loadOperation(String line) { + String filename = null; + try { + filename = line.split(" ")[1]; + } catch (IndexOutOfBoundsException e) { + errorMessage("Malformed load message"); + } + + File file = new File(directory.getAbsolutePath() + "/" + filename); + if (!file.exists()) { + try { + socket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } else { + try { + OutputStream fileOutput = socket.getOutputStream(); + InputStream fileInput = new FileInputStream(file); + byte[] bytes; + bytes = fileInput.readNBytes((int) file.length()); + fileOutput.write(bytes); + } catch (IOException e) { + errorMessage("Error sending file"); + } + } + } + + private void storeOperation(String line) { + String filename = null; + int filesize = 0; + try { + filename = line.split(" ")[1]; + filesize = Integer.parseInt(line.split(" ")[2]); + } catch (Exception e) { + errorMessage("Error splitting store command"); + this.interrupt(); + } + + try { + messageOutput = new PrintWriter(socket.getOutputStream(), true); + messageOutput.println("ACK"); + } catch (Exception e) { + System.err.println("Error establishing output in store operation"); + } + + OutputStream fileOutput = null; + try { + File outputFile = new File(directory.getAbsolutePath()+"/"+filename); + outputFile.createNewFile(); + outputFile.setWritable(true); + fileOutput = new FileOutputStream(outputFile); + } catch (Exception e) { + errorMessage("Error creating and writing to file"); + } + + InputStream fileInput = null; + try { + fileInput = socket.getInputStream(); + } catch (IOException e) { + errorMessage("Error establishing inbound file input"); + } + + try { + byte[] bytes; + bytes = fileInput.readNBytes(filesize); + fileOutput.write(bytes); + fileOutput.flush(); + } catch (IOException e) { + errorMessage("error reading data and writing to file"); + e.printStackTrace(); + } + controllerCommunication.sendMessage("STORE_ACK " + filename); + } + } +} diff --git a/Index.java b/Index.java new file mode 100644 index 0000000000000000000000000000000000000000..423c07fba8c4af1f8f754e898ab15416390fa8f8 --- /dev/null +++ b/Index.java @@ -0,0 +1,42 @@ +import java.util.ArrayList; + +public class Index { + + private String filename; + private String status; + private int filesize; + private ArrayList<Integer> stores = new ArrayList<Integer>(); + + public Index(String filename, String status, int filesize) { + this.filename = filename; + this.status = status; + this.filesize = filesize; + } + + public void setFilename(String filename) { + this.filename = filename; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getFilename() { + return filename; + } + + public String getStatus() { + return status; + } + + public int getFilesize() {return filesize;} + + public ArrayList<Integer> getStores() { + return this.stores; + } + + public void setStores(ArrayList<Integer> stores) { + this.stores = stores; + } + +}