Skip to content
Snippets Groups Projects
Commit af474d34 authored by tmp1u19's avatar tmp1u19 :octopus:
Browse files

Add one of the final projects

parent 14269f01
Branches
No related tags found
No related merge requests found
import java.io.*;
import java.net.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.*;
public class Dstore {
......@@ -10,14 +8,20 @@ public class Dstore {
private final int cport;
private final int timeout;
private final String file_folder;
public static Set<String> files;
public Set<String> files;
public Map<String, Integer> fileSizes;
public Dstore(int port, int cport, int timeout, String file_folder) {
this.port = port;
this.cport = cport;
this.timeout = timeout;
this.file_folder = file_folder;
this.files = Collections.synchronizedSet(new HashSet<>());
files = new HashSet<>();
fileSizes = new HashMap<>();
}
public Map<String, Integer> getFileSizes() {
return fileSizes;
}
public int getPort() {
......@@ -122,6 +126,7 @@ public class Dstore {
o.flush();
o.close();
dstore.getFiles().add(filename);
dstore.getFileSizes().put(filename, filesize);
}
if(!finished) {
......@@ -165,8 +170,31 @@ public class Dstore {
res.flush();
}
}
break;
case "REBALANCE_STORE" :
res.println("ACK");
res.flush();
String filename = tokens[1];
int filesize = Integer.parseInt(tokens[2]);
byte[] data = new byte[filesize];
in.readNBytes(data, 0, filesize);
FileOutputStream o = new FileOutputStream(dstore.getFile_folder() +
"/" + filename);
o.write(data);
dstore.addFile(filename);
o.flush();
o.close();
dstore.getFiles().add(filename);
dstore.getFileSizes().put(filename, filesize);
PrintWriter r = new PrintWriter(new OutputStreamWriter(controllerSocket.getOutputStream()));
r.println("REBALANCE_COMPLETE");
r.flush();
default:
System.out.println("Unknown command");
break;
......@@ -201,6 +229,7 @@ public class Dstore {
File file = new File(dstore.getFile_folder() + "/" + filename);
file.delete();
dstore.getFiles().remove(filename);
dstore.getFileSizes().remove(filename);
res.println("REMOVE_ACK " + filename);
} else {
res.println("ERROR_FILE_DOES_NOT_EXIST " + filename);
......@@ -212,6 +241,7 @@ public class Dstore {
System.out.println("Arguments don't match in REMOVE opretaion");
}
} else if (command.equals("LIST")) {
String file_list = "";
for(String filename : dstore.getFiles()) {
......@@ -219,6 +249,87 @@ public class Dstore {
}
res.println("LIST " + file_list);
res.flush();
} else if (command.equals("REBALANCE")) {
// show the distribution of files
for(int i = 0; i < tokens.length; i++) {
System.out.print(tokens[i] + " ");
}
System.out.println();
// send the messages to each dstore
int number_of_files_to_send = Integer.parseInt(tokens[1]);
int pos = 2;
for(int i = 0; i < number_of_files_to_send; i++) {
String filename = tokens[pos];
pos++;
int filesize = Integer.parseInt(tokens[pos]);
pos++;
int size = Integer.parseInt(tokens[pos]);
pos++;
for(int j = pos; j < pos + size; j++) {
int finalJ = j;
int port = Integer.parseInt(tokens[finalJ]);
Socket socket = new Socket();
socket.connect(new InetSocketAddress("localhost", port));
new Thread() {
@Override
public void run() {
try {
PrintWriter re = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
BufferedReader rs = new BufferedReader(new InputStreamReader(socket.getInputStream()));
re.println("REBALANCE_STORE " + filename + " " + filesize);
re.flush();
String l = rs.readLine();
long limit = System.currentTimeMillis() + dstore.getTimeout();
boolean done = false;
while(!done && System.currentTimeMillis() < limit) {
if(l.equals("ACK")) {
done = true;
// send the file content
File file = new File(dstore.getFile_folder() + "/" + filename);
byte[] data = new byte[filesize];
FileInputStream fis = new FileInputStream(file);
BufferedInputStream bis = new BufferedInputStream(fis);
bis.read(data, 0, data.length);
OutputStream os = null;
os = socket.getOutputStream();
os.write(data, 0, data.length);
os.flush();
file.delete();
dstore.getFiles().remove(filename);
dstore.getFileSizes().remove(filename);
}
}
if(!done) {
System.out.println("Didn't get the ACK from rebalance in time");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
}
}
if(number_of_files_to_send == 0) {
res.println("REBALANCE_COMPLETE");
res.flush();
}
} else {
System.out.println("Unknown command");
}
......
......@@ -9,7 +9,7 @@ public class Server {
public static FileSystem fileSystem;
public static int pos;
public static boolean rebalancing;
Timer timer;
public Timer timer;
public Server(Controller controller, FileSystem fileSystem) {
this.controller = controller;
......@@ -17,9 +17,22 @@ public class Server {
rebalancing = false;
// do the rebalancing operation every rebalance_period seconds with a delay of rebalance_period seconds
timer = new Timer();
runRebalancing(controller.getRebalance_period());
}
public static FileSystem getFileSystem() {
return fileSystem;
}
private void runRebalancing(long delay) {
if (timer != null) {
timer.cancel();
timer.purge();
}
timer = new Timer("RebalanceOperation");
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
try {
new Rebalance(fileSystem, controller).handle();
......@@ -28,7 +41,8 @@ public class Server {
}
}
};
timer.schedule(timerTask, controller.getRebalance_period(), controller.getRebalance_period());
timer.scheduleAtFixedRate(timerTask, delay, controller.getRebalance_period());
}
public void handleClient(Socket client) throws IOException {
......@@ -49,20 +63,11 @@ public class Server {
if (command.equals("JOIN")) {
int publicPort = Integer.parseInt(tokens[1]);
System.out.println("Dstore wants to connect");
FSStore fsStore = new FSStore(client, publicPort);
fileSystem.addDstore(client.getPort(), fsStore);
rebalancing = true;
new Thread() {
@Override
public void run() {
try {
new Rebalance(fileSystem, controller).handle();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
System.out.println("Dstore listening on port " + publicPort + " connected");
runRebalancing(0);
} else if (fileSystem.getDstores().size() < controller.getR()) {
res.println("ERROR_NOT_ENOUGH_DSTORES");
......@@ -73,7 +78,9 @@ public class Server {
handleStoreACK(tokens);
} else if (command.equals("REMOVE_ACK")) {
handleRemoveACK(tokens);
} else if(command.equals("LIST")) {
}
else if(command.equals("LIST")) {
// get the files of the dstore when rebalancing
List<String> filenames = new ArrayList<>();
for(int i = 1; i < tokens.length; i++) {
......@@ -82,26 +89,31 @@ public class Server {
fileSystem.addRebalancingList(fileSystem.
getDstores().
get(client.getPort()).
getPublicPort(),
get(client.getPort()).getConnection().getPort(),
filenames);
} else if (command.equals("REBALANCE_COMPLETE")) {
rebalancing = false;
System.out.println("Done rebalancing");
//fileSystem.increaseRebalanceComplete();
} else if (command.equals("ERROR_FILE_DOES_NOT_EXIST")) {
System.out.println(command + " " + tokens[1]);
} else {
System.out.println("Unknown command");
System.out.println("Unknown command " + command);
}
} else if (!rebalancing) {
switch (command) {
case "STORE" -> handleStore(client, tokens);
case "LOAD" -> handleLoad(client, tokens);
case "RELOAD" -> handleReload(client, tokens);
case "REMOVE" -> handleRemove(client, tokens);
case "LIST" -> handleList(client);
default -> System.out.println("Unknown command");
default -> System.out.println("Unknown command " + command);
}
} else if (rebalancing) {
System.out.println("Rebalancing...");
}
......@@ -157,12 +169,14 @@ public class Server {
boolean done = false;
long limit = System.currentTimeMillis() + controller.getTimeout();
while(!done && System.currentTimeMillis() < limit) {
if(fileSystem.getStore().containsKey(filename) && fileSystem.getStore().get(filename).size() == controller.getR()) {
while(!done && System.currentTimeMillis() <= limit) {
if(fileSystem.getStore().containsKey(filename) && !fileSystem.getStore().get(filename).isEmpty()) {
if(fileSystem.getStore().get(filename).size() == controller.getR()) {
done = true;
fileSystem.addIndex(filename, "store complete");
fileSystem.addStore(filename, temp);
for(FSStore fsStore : temp) {
fsStore.getFiles().add(filename);
}
......@@ -170,6 +184,8 @@ public class Server {
fileSystem.addFileSize(filename, filesize);
res.println("STORE_COMPLETE");
res.flush();
break;
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment