diff --git a/Controller.java b/Controller.java index 7c79f4b145f642168b5ba37b452d33aa19dc2715..023336fcdbdfd03fe588b477efc338eb43cec645 100644 --- a/Controller.java +++ b/Controller.java @@ -44,6 +44,7 @@ public class Controller { private int rebalance_period; private Controller controller; + private AtomicBoolean rebalanceRunning = new AtomicBoolean(false); private List<Integer> storePorts; private List<Thread> runningThreads; @@ -51,13 +52,11 @@ public class Controller { private Hashtable<String, ArrayList<Integer>> reloadStores; - Object clientLock = new Object(); + final Object clientLock = new Object(); Object concurrentOperationLock = new Object(); private final ScheduledExecutorService rebalanceScheduler; - private AtomicBoolean rebalanceLock = new AtomicBoolean(false); - public static void main(String[] args) { if (args.length != 4) { System.err.println("Invalid number of arguments"); @@ -103,20 +102,6 @@ public class Controller { return floor; } - private int getCeilPlusOne() { - int numberOfStoredFiles = 0; - - for (Index i : index) { - if (i.getStatus().equals("store complete")) { - numberOfStoredFiles += 1; - } - } - - numberOfStoredFiles += 1; - int ceil = (int) Math.ceil((r * numberOfStoredFiles) / (float) storePorts.size()); - return ceil; - } - private int getNumberOfStoredFiles() { int numberOfStoredFiles = 0; for (Index i : index) { @@ -148,8 +133,17 @@ public class Controller { //This is synchronized so two rebalance operations can't happen simultaneously synchronized (concurrentOperationLock) { + rebalanceRunning.set(true); + try { + //sleep to ensure all running processes finish + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (getNumberOfStoredFiles() == 0 || storePorts.size() < r) { System.out.println("Rebalance Ended - not enough stores or files"); + rebalanceRunning.set(false); return; } @@ -277,7 +271,6 @@ public class Controller { fileToRemove = null; for (String file : portsThatNeedFiles.keySet()) { if (filesAlreadyInPort.contains(file)) { - System.out.println("port: " + port + " contains: " + file); Pair fileSwitch = new Pair(file, portsThatNeedFiles.get(file)); fileToRemove = file; outputArray.add(fileSwitch); @@ -301,13 +294,8 @@ public class Controller { } } - System.out.println("15 SECOND SLEEP RUN STORE NOW"); - try { - Thread.sleep(15000); - } catch (Exception e) { - e.printStackTrace(); - } + rebalanceRunning.set(false); System.out.println("REBALANCE FINISHED: " + newFilesStored); } @@ -535,12 +523,13 @@ public class Controller { boolean exists = false; int filesize = 0; - + ArrayList<Integer> stores = new ArrayList<>(); synchronized (clientLock) { for (Index i : index) { if (i.getFilename().equals(filename) && i.getStatus().equals("store complete")) { exists = true; filesize = i.getFilesize(); + stores.addAll(i.getStores()); } } } @@ -552,12 +541,6 @@ public class Controller { return; } - //Get List of DStores in case of failure - ArrayList<Integer> stores = new ArrayList<>(); - for (int n : storePorts) { - stores.add(n); - } - //if all DStores don't work Random random = new Random(); int storeIndex = random.nextInt(stores.size()); @@ -604,6 +587,9 @@ public class Controller { } private void removeOperation(String message) { + while (rebalanceRunning.get()) { + ; + } //split message to get filename String filename = null; try { @@ -612,8 +598,6 @@ public class Controller { System.err.println("Error getting filename to remove"); } - System.out.println("Remove operation started for file: " + filename); - boolean exists = false; synchronized (clientLock) { for (Index i : index) { @@ -625,21 +609,18 @@ public class Controller { } else if (i.getStatus().equals("store_in_progress")) { messageOutput.println("ERROR_FILE_DOES_NOT_EXIST"); return; + } else if (i.getStatus().equals("store complete")) { + i.setStatus("remove in progress"); } } } } + if (!exists) { messageOutput.println("ERROR_FILE_DOES_NOT_EXIST"); return; } - try { - updateIndex(filename, "remove in progress", 0); - } catch (Exception e) { - System.err.println(e.getMessage()); - } - //Send message to all stores to remove messageStoreThreads(message); @@ -661,6 +642,9 @@ public class Controller { private void storeOperation(String line) { //First split input to get filename and filesize + while (rebalanceRunning.get()) { + ; + } storeACkStores = new ArrayList<>(); String filename = null; int filesize = 0; @@ -672,7 +656,6 @@ public class Controller { } System.out.println("Starting store operation for file: " + filename); synchronized (clientLock) { - System.out.println("Synchronized section"); for (Index i : index) { System.out.println("Index: " + i.getFilename() + " " + i.getStatus()); if (i.getFilename().equals(filename)) { @@ -683,7 +666,6 @@ public class Controller { } try { updateIndex(filename, "store in progress", filesize); - System.out.println("index updated"); } catch (Exception e) { System.err.println("error updating index"); } @@ -711,6 +693,7 @@ public class Controller { rCount ++; } + messageOutput.println(portString); storeACK = new AtomicBoolean(false); @@ -764,6 +747,7 @@ public class Controller { } } else if (! storeACK.get()){ updateIndex(storeACKFilename, "remove", 0); + storeACKFilename = null; } } catch (Exception e) { e.printStackTrace(); @@ -771,25 +755,27 @@ public class Controller { } private void updateIndex(String filename, String update, int filesize){ - if (update.equals("store in progress")) { - Index newIndex = new Index(filename, update, filesize); - index.add(newIndex); - } else if (update.equals("remove")) { - Iterator indexIterator = index.listIterator(); - Index toRemove = null; - while (indexIterator.hasNext()) { - Index i = (Index) indexIterator.next(); - if (i.getFilename().equals(filename)) { - toRemove = i; + synchronized (clientLock) { + if (update.equals("store in progress")) { + Index newIndex = new Index(filename, update, filesize); + index.add(newIndex); + } else if (update.equals("remove")) { + Iterator indexIterator = index.listIterator(); + Index toRemove = null; + while (indexIterator.hasNext()) { + Index i = (Index) indexIterator.next(); + if (i.getFilename().equals(filename)) { + toRemove = i; + } } - } - if (toRemove != null) { - index.remove(toRemove); - } - } else if (update.equals("store complete")) { - for (Index i : index) { - if (i.getFilename().equals(filename)) { - i.setStatus(update); + if (toRemove != null) { + index.remove(toRemove); + } + } else if (update.equals("store complete")) { + for (Index i : index) { + if (i.getFilename().equals(filename)) { + i.setStatus(update); + } } } }