Skip to content
Snippets Groups Projects
Commit 50849405 authored by tmp1u19's avatar tmp1u19 :octopus:
Browse files

Fix some bugs and add some rebalancing operations

parent 93d1175c
Branches
No related tags found
No related merge requests found
import java.io.*;
import java.net.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
......@@ -9,14 +10,14 @@ public class Dstore {
private final int cport;
private final int timeout;
private final String file_folder;
private final Set<String> files;
public static Set<String> files;
public Dstore(int port, int cport, int timeout, String file_folder) {
this.port = port;
this.cport = cport;
this.timeout = timeout;
this.file_folder = file_folder;
this.files = new HashSet<>();
this.files = Collections.synchronizedSet(new HashSet<>());
}
public int getPort() {
......@@ -35,10 +36,14 @@ public class Dstore {
return file_folder;
}
public Set<String> getFiles() {
synchronized public Set<String> getFiles() {
return files;
}
synchronized public void addFile(String filename) {
getFiles().add(filename);
}
public void listenClient(Socket socket) {
try {
......@@ -112,10 +117,11 @@ public class Dstore {
FileOutputStream o = new FileOutputStream(dstore.getFile_folder() +
"/" + filename);
o.write(data);
dstore.addFile(filename);
finished = true;
o.flush();
o.close();
dstore.getFiles().add(filename);
finished = true;
}
if(!finished) {
......@@ -124,7 +130,7 @@ public class Dstore {
}
PrintWriter r = new PrintWriter(new OutputStreamWriter(controllerSocket.getOutputStream()));
r.println("STORE_ACK " + filename + " " + dstore.getPort());
r.println("STORE_ACK " + filename);
r.flush();
} catch (IndexOutOfBoundsException e) {
......@@ -207,6 +213,14 @@ public class Dstore {
} catch(IndexOutOfBoundsException e) {
System.out.println("Arguments don't match in REMOVE opretaion");
}
} else if (command.equals("LIST")) {
String file_list = "";
for(String filename : dstore.getFiles()) {
file_list = filename + " " + file_list;
}
res.println("LIST " + file_list);
res.flush();
} else {
System.out.println("Unknown command");
}
......
......@@ -8,10 +8,27 @@ public class Server {
private final Controller controller;
public static FileSystem fileSystem;
public static int pos;
public static boolean rebalancing;
Timer timer;
public Server(Controller controller, FileSystem fileSystem) {
this.controller = controller;
Server.fileSystem = fileSystem;
rebalancing = false;
// do the rebalancing operation every rebalance_period seconds with a delay of rebalance_period seconds
timer = new Timer();
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
try {
new Rebalance(fileSystem, controller).handle();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
timer.schedule(timerTask, controller.getRebalance_period(), controller.getRebalance_period());
}
public void handleClient(Socket client) throws IOException {
......@@ -30,10 +47,23 @@ public class Server {
String command = tokens[0];
if (command.equals("JOIN")) {
int publicPort = Integer.parseInt(tokens[1]);
System.out.println("Dstore wants to connect");
FSStore fsStore = new FSStore(client, publicPort);
fileSystem.addDstore(client.getPort(), fsStore);
rebalancing = true;
new Thread() {
@Override
public void run() {
try {
new Rebalance(fileSystem, controller).handle();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
} else if (fileSystem.getDstores().size() < controller.getR()) {
res.println("ERROR_NOT_ENOUGH_DSTORES");
res.flush();
......@@ -43,10 +73,27 @@ public class Server {
handleStoreACK(tokens);
} else if (command.equals("REMOVE_ACK")) {
handleRemoveACK(tokens);
} else if(command.equals("LIST")) {
// get the files of the dstore when rebalancing
List<String> filenames = new ArrayList<>();
for(int i = 1; i < tokens.length; i++) {
filenames.add(tokens[i]);
}
fileSystem.addRebalancingList(fileSystem.
getDstores().
get(client.getPort()).
getPublicPort(),
filenames);
} else if (command.equals("REBALANCE_COMPLETE")) {
rebalancing = false;
System.out.println("Done rebalancing");
} else if (command.equals("ERROR_FILE_DOES_NOT_EXIST")) {
System.out.println(command + " " + tokens[1]);
} else {
System.out.println("Unknown command");
}
} else {
} else if (!rebalancing) {
switch (command) {
case "STORE" -> handleStore(client, tokens);
case "LOAD" -> handleLoad(client, tokens);
......@@ -55,6 +102,8 @@ public class Server {
case "LIST" -> handleList(client);
default -> System.out.println("Unknown command");
}
} else if (rebalancing) {
System.out.println("Rebalancing...");
}
}
}
......@@ -85,17 +134,19 @@ public class Server {
String msg = "";
int i = 0;
List<FSStore> temp = new ArrayList<>();
List<FSStore> sorted = new ArrayList<>(fileSystem.getDstores().values());
Collections.sort(sorted);
for(int port : fileSystem.getDstores().keySet()) {
for(FSStore fsStore : sorted) {
if(i == controller.getR()) {
break;
}
// in case all the ACK are received keep in memory all the dstores
temp.add(fileSystem.getDstores().get(port));
temp.add(fsStore);
// construct the message
msg = fileSystem.getDstores().get(port).getPublicPort() + " " + msg;
msg = fsStore.getPublicPort() + " " + msg;
i++;
}
......@@ -111,6 +162,11 @@ public class Server {
done = true;
fileSystem.addIndex(filename, "store complete");
fileSystem.addStore(filename, temp);
for(FSStore fsStore : temp) {
fsStore.getFiles().add(filename);
}
fileSystem.addFileSize(filename, filesize);
res.println("STORE_COMPLETE");
res.flush();
......@@ -193,6 +249,7 @@ public class Server {
for(FSStore fsStore : fileSystem.getStore().get(filename)) {
System.out.println("Sending to Dstore");
fsStore.getFiles().remove(filename);
PrintWriter res = fsStore.getOutput();
res.println("REMOVE " + filename);
res.flush();
......@@ -214,6 +271,7 @@ public class Server {
}
if(!done) {
fileSystem.removeIndex(filename);
System.out.println(filename + " failed to remove");
}
......@@ -233,16 +291,15 @@ public class Server {
res.flush();
}
private void handleStoreACK(String[] tokens) {
private void handleStoreACK(String[] tokens) throws IOException {
String filename = tokens[1];
int dstorePort = Integer.parseInt(tokens[2]);
if(fileSystem.getStore().containsKey(filename)) {
fileSystem.getStore().get(filename).add(fileSystem.getDstores().get(dstorePort));
fileSystem.getStore().get(filename).add(null);
} else {
List<FSStore> d = new ArrayList<>();
d.add(fileSystem.getDstores().get(dstorePort));
d.add(null);
Server.fileSystem.addStore(filename, d);
}
......@@ -256,6 +313,7 @@ public class Server {
break;
}
}
}
fileSystem.getFileSizes().remove(filename);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment