Skip to content
Snippets Groups Projects
Commit ee59b3ab authored by ewrh1g20's avatar ewrh1g20
Browse files

updated controller so processes won't start while its rebalancing

parent ad4c0e95
No related branches found
No related tags found
No related merge requests found
......@@ -44,6 +44,7 @@ public class Controller {
private int rebalance_period;
private Controller controller;
private AtomicBoolean rebalanceRunning = new AtomicBoolean(false);
private List<Integer> storePorts;
private List<Thread> runningThreads;
......@@ -51,13 +52,11 @@ public class Controller {
private Hashtable<String, ArrayList<Integer>> reloadStores;
Object clientLock = new Object();
final Object clientLock = new Object();
Object concurrentOperationLock = new Object();
private final ScheduledExecutorService rebalanceScheduler;
private AtomicBoolean rebalanceLock = new AtomicBoolean(false);
public static void main(String[] args) {
if (args.length != 4) {
System.err.println("Invalid number of arguments");
......@@ -103,20 +102,6 @@ public class Controller {
return floor;
}
private int getCeilPlusOne() {
int numberOfStoredFiles = 0;
for (Index i : index) {
if (i.getStatus().equals("store complete")) {
numberOfStoredFiles += 1;
}
}
numberOfStoredFiles += 1;
int ceil = (int) Math.ceil((r * numberOfStoredFiles) / (float) storePorts.size());
return ceil;
}
private int getNumberOfStoredFiles() {
int numberOfStoredFiles = 0;
for (Index i : index) {
......@@ -148,8 +133,17 @@ public class Controller {
//This is synchronized so two rebalance operations can't happen simultaneously
synchronized (concurrentOperationLock) {
rebalanceRunning.set(true);
try {
//sleep to ensure all running processes finish
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (getNumberOfStoredFiles() == 0 || storePorts.size() < r) {
System.out.println("Rebalance Ended - not enough stores or files");
rebalanceRunning.set(false);
return;
}
......@@ -277,7 +271,6 @@ public class Controller {
fileToRemove = null;
for (String file : portsThatNeedFiles.keySet()) {
if (filesAlreadyInPort.contains(file)) {
System.out.println("port: " + port + " contains: " + file);
Pair fileSwitch = new Pair(file, portsThatNeedFiles.get(file));
fileToRemove = file;
outputArray.add(fileSwitch);
......@@ -301,13 +294,8 @@ public class Controller {
}
}
System.out.println("15 SECOND SLEEP RUN STORE NOW");
try {
Thread.sleep(15000);
} catch (Exception e) {
e.printStackTrace();
}
rebalanceRunning.set(false);
System.out.println("REBALANCE FINISHED: " + newFilesStored);
}
......@@ -535,12 +523,13 @@ public class Controller {
boolean exists = false;
int filesize = 0;
ArrayList<Integer> stores = new ArrayList<>();
synchronized (clientLock) {
for (Index i : index) {
if (i.getFilename().equals(filename) && i.getStatus().equals("store complete")) {
exists = true;
filesize = i.getFilesize();
stores.addAll(i.getStores());
}
}
}
......@@ -552,12 +541,6 @@ public class Controller {
return;
}
//Get List of DStores in case of failure
ArrayList<Integer> stores = new ArrayList<>();
for (int n : storePorts) {
stores.add(n);
}
//if all DStores don't work
Random random = new Random();
int storeIndex = random.nextInt(stores.size());
......@@ -604,6 +587,9 @@ public class Controller {
}
private void removeOperation(String message) {
while (rebalanceRunning.get()) {
;
}
//split message to get filename
String filename = null;
try {
......@@ -612,8 +598,6 @@ public class Controller {
System.err.println("Error getting filename to remove");
}
System.out.println("Remove operation started for file: " + filename);
boolean exists = false;
synchronized (clientLock) {
for (Index i : index) {
......@@ -625,21 +609,18 @@ public class Controller {
} else if (i.getStatus().equals("store_in_progress")) {
messageOutput.println("ERROR_FILE_DOES_NOT_EXIST");
return;
} else if (i.getStatus().equals("store complete")) {
i.setStatus("remove in progress");
}
}
}
}
if (!exists) {
messageOutput.println("ERROR_FILE_DOES_NOT_EXIST");
return;
}
try {
updateIndex(filename, "remove in progress", 0);
} catch (Exception e) {
System.err.println(e.getMessage());
}
//Send message to all stores to remove
messageStoreThreads(message);
......@@ -661,6 +642,9 @@ public class Controller {
private void storeOperation(String line) {
//First split input to get filename and filesize
while (rebalanceRunning.get()) {
;
}
storeACkStores = new ArrayList<>();
String filename = null;
int filesize = 0;
......@@ -672,7 +656,6 @@ public class Controller {
}
System.out.println("Starting store operation for file: " + filename);
synchronized (clientLock) {
System.out.println("Synchronized section");
for (Index i : index) {
System.out.println("Index: " + i.getFilename() + " " + i.getStatus());
if (i.getFilename().equals(filename)) {
......@@ -683,7 +666,6 @@ public class Controller {
}
try {
updateIndex(filename, "store in progress", filesize);
System.out.println("index updated");
} catch (Exception e) {
System.err.println("error updating index");
}
......@@ -711,6 +693,7 @@ public class Controller {
rCount ++;
}
messageOutput.println(portString);
storeACK = new AtomicBoolean(false);
......@@ -764,6 +747,7 @@ public class Controller {
}
} else if (! storeACK.get()){
updateIndex(storeACKFilename, "remove", 0);
storeACKFilename = null;
}
} catch (Exception e) {
e.printStackTrace();
......@@ -771,25 +755,27 @@ public class Controller {
}
private void updateIndex(String filename, String update, int filesize){
if (update.equals("store in progress")) {
Index newIndex = new Index(filename, update, filesize);
index.add(newIndex);
} else if (update.equals("remove")) {
Iterator indexIterator = index.listIterator();
Index toRemove = null;
while (indexIterator.hasNext()) {
Index i = (Index) indexIterator.next();
if (i.getFilename().equals(filename)) {
toRemove = i;
synchronized (clientLock) {
if (update.equals("store in progress")) {
Index newIndex = new Index(filename, update, filesize);
index.add(newIndex);
} else if (update.equals("remove")) {
Iterator indexIterator = index.listIterator();
Index toRemove = null;
while (indexIterator.hasNext()) {
Index i = (Index) indexIterator.next();
if (i.getFilename().equals(filename)) {
toRemove = i;
}
}
}
if (toRemove != null) {
index.remove(toRemove);
}
} else if (update.equals("store complete")) {
for (Index i : index) {
if (i.getFilename().equals(filename)) {
i.setStatus(update);
if (toRemove != null) {
index.remove(toRemove);
}
} else if (update.equals("store complete")) {
for (Index i : index) {
if (i.getFilename().equals(filename)) {
i.setStatus(update);
}
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment