Skip to content
Snippets Groups Projects
Commit 7752b3e6 authored by tmp1u19's avatar tmp1u19 :octopus:
Browse files

Final version of coursework. It's done!!!

parent af474d34
No related branches found
No related tags found
No related merge requests found
......@@ -17,7 +17,7 @@ public class ClientMain {
if (!uploadFolder.exists())
throw new RuntimeException("to_store folder does not exist");
testClient(cport, timeout, downloadFolder);
//testClient(cport, timeout, downloadFolder);
// example to launch a number of concurrent clients, each doing the same operations
for (int i = 0; i < 10; i++) {
......
......@@ -17,6 +17,7 @@ public class Controller {
public void start() {
try {
ControllerLogger.init(Logger.LoggingType.ON_FILE_AND_TERMINAL);
ServerSocket controller = new ServerSocket(cport);
FileSystem fileSystem = new FileSystem();
Server server = new Server(this, fileSystem);
......
import java.io.IOException;
import java.net.Socket;
public class ControllerLogger extends Logger {
private static final String LOG_FILE_SUFFIX = "controller";
private static ControllerLogger instance = null;
public static void init(LoggingType loggingType) throws IOException {
if (instance == null)
instance = new ControllerLogger(loggingType);
else
throw new IOException("ControllerLogger already initialised");
}
public static ControllerLogger getInstance() {
if (instance == null)
throw new RuntimeException("ControllerLogger has not been initialised yet");
return instance;
}
protected ControllerLogger(LoggingType loggingType) throws IOException {
super(loggingType);
}
@Override
protected String getLogFileSuffix() {
return LOG_FILE_SUFFIX;
}
public void dstoreJoined(Socket socket, int dstorePort) {
log("[New Dstore " + dstorePort + " " + socket.getLocalPort() + "<-" + socket.getPort() + "]");
}
}
......@@ -98,6 +98,7 @@ public class Dstore {
String command = tokens[0];
if(dstore.getFiles().size() < Controller.R) {
DstoreLogger.getInstance().messageSent(client, "ERROR_NOT_ENOUGH_DSTORES");
res.println("ERROR_NOT_ENOUGH_DSTORES");
res.flush();
} else {
......@@ -106,35 +107,51 @@ public class Dstore {
case "STORE":
try {
DstoreLogger.getInstance().messageReceived(client, line);
// send response to client
DstoreLogger.getInstance().messageSent(client, "ACK");
res.println("ACK");
res.flush();
// get the filename and the filesize of the file to be stored
String filename = tokens[1];
int filesize = Integer.parseInt(tokens[2]);
// read data and store the file
long limit = System.currentTimeMillis() + dstore.getTimeout();
boolean finished = false;
while(!finished && System.currentTimeMillis() < limit) {
// read data and store the file
// create the file and read its bytes
byte[] data = new byte[filesize];
client.getInputStream().readNBytes(data, 0, filesize);
in.readNBytes(data, 0, filesize);
FileOutputStream o = new FileOutputStream(dstore.getFile_folder() +
"/" + filename);
// write the bytes in the file created and close it after
o.write(data);
dstore.addFile(filename);
finished = true;
o.flush();
o.close();
// update the files from the dstore
dstore.addFile(filename);
dstore.getFiles().add(filename);
dstore.getFileSizes().put(filename, filesize);
// exit while loop
finished = true;
}
// timeout from dstore
if(!finished) {
System.out.println("ERROR: time expired");
System.out.println("ERROR: time expired in dstore while storing");
return;
}
// send response to the Controller
PrintWriter r = new PrintWriter(new OutputStreamWriter(controllerSocket.getOutputStream()));
DstoreLogger.getInstance().messageSent(controllerSocket, "STORE_ACK " + filename);
r.println("STORE_ACK " + filename);
r.flush();
......@@ -145,38 +162,40 @@ public class Dstore {
case "LOAD_DATA":
DstoreLogger.getInstance().messageReceived(client, line);
if(tokens.length != 2) {
System.out.println("Arguments don't match in LOAD operation");
} else {
String filename = tokens[1];
try {
if (dstore.getFiles().contains(filename)) {
File file = new File(dstore.getFile_folder() + "/" + filename);
FileInputStream i = new FileInputStream(file);
int n;
while ((n = i.read()) != -1) {
client.getOutputStream().write(n);
}
i.close();
} else {
done = true;
client.close();
if (dstore.getFiles().contains(filename)) {
File file = new File(dstore.getFile_folder() + "/" + filename);
FileInputStream i = new FileInputStream(file);
int n;
while ((n = i.read()) != -1) {
client.getOutputStream().write(n);
}
Server.pos = 0;
} catch (Exception ingored) {
res.println("RELOAD " + filename);
res.flush();
i.close();
} else {
done = true;
client.close();
}
Server.pos = 0;
}
break;
case "REBALANCE_STORE" :
DstoreLogger.getInstance().messageReceived(client, line);
res.println("ACK");
res.flush();
DstoreLogger.getInstance().messageSent(client, "ACK");
String filename = tokens[1];
int filesize = Integer.parseInt(tokens[2]);
......@@ -192,6 +211,7 @@ public class Dstore {
dstore.getFileSizes().put(filename, filesize);
PrintWriter r = new PrintWriter(new OutputStreamWriter(controllerSocket.getOutputStream()));
DstoreLogger.getInstance().messageSent(controllerSocket, "REBALANCE_COMPLETE");
r.println("REBALANCE_COMPLETE");
r.flush();
......@@ -217,21 +237,25 @@ public class Dstore {
String command = tokens[0];
if(dstore.getFiles().size() < Controller.R) {
DstoreLogger.getInstance().messageSent(controllerSocket, "ERROR_NOT_ENOUGH_DSTORES");
res.println("ERROR_NOT_ENOUGH_DSTORES");
res.flush();
} else {
if(command.equals("REMOVE")) {
DstoreLogger.getInstance().messageReceived(controllerSocket, line);
try {
String filename = tokens[1];
if(dstore.getFiles().contains(filename)) {
System.out.println("REMOVE_ACK");
File file = new File(dstore.getFile_folder() + "/" + filename);
file.delete();
dstore.getFiles().remove(filename);
dstore.getFileSizes().remove(filename);
DstoreLogger.getInstance().messageSent(controllerSocket, "REMOVE_ACK " + filename);
res.println("REMOVE_ACK " + filename);
} else {
DstoreLogger.getInstance().messageSent(controllerSocket, "ERROR_FILE_DOES_NOT_EXIST " + filename);
res.println("ERROR_FILE_DOES_NOT_EXIST " + filename);
}
......@@ -242,16 +266,22 @@ public class Dstore {
}
} else if (command.equals("LIST")) {
DstoreLogger.getInstance().messageReceived(controllerSocket, line);
String file_list = "";
for(String filename : dstore.getFiles()) {
file_list = filename + " " + file_list;
}
DstoreLogger.getInstance().messageSent(controllerSocket, "LIST " + file_list);
res.println("LIST " + file_list);
res.flush();
} else if (command.equals("REBALANCE")) {
DstoreLogger.getInstance().messageReceived(controllerSocket, line);
// show the distribution of files
for(int i = 0; i < tokens.length; i++) {
System.out.print(tokens[i] + " ");
......@@ -283,10 +313,12 @@ public class Dstore {
PrintWriter re = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
BufferedReader rs = new BufferedReader(new InputStreamReader(socket.getInputStream()));
DstoreLogger.getInstance().messageSent(socket, "REBALANCE_STORE " + filename + " " + filesize);
re.println("REBALANCE_STORE " + filename + " " + filesize);
re.flush();
String l = rs.readLine();
DstoreLogger.getInstance().messageReceived(socket, l);
long limit = System.currentTimeMillis() + dstore.getTimeout();
boolean done = false;
......@@ -325,11 +357,6 @@ public class Dstore {
}
}
if(number_of_files_to_send == 0) {
res.println("REBALANCE_COMPLETE");
res.flush();
}
} else {
System.out.println("Unknown command");
}
......@@ -353,10 +380,12 @@ public class Dstore {
Dstore dstore = new Dstore(port, cport, timeout, file_folder);
try {
DstoreLogger.init(Logger.LoggingType.ON_FILE_AND_TERMINAL, port);
Socket socket = new Socket();
socket.connect(new InetSocketAddress("localhost", cport));
OutputStream out = socket.getOutputStream();
PrintWriter res = new PrintWriter(new OutputStreamWriter(out));
DstoreLogger.getInstance().messageSent(socket, "JOIN " + port);
res.println("JOIN " + port);
res.flush();
......
import java.io.IOException;
public class DstoreLogger extends Logger {
private static final String LOG_FILE_SUFFIX = "dstore";
private static DstoreLogger instance = null;
private final String logFileSuffix;
public static void init(LoggingType loggingType, int port) throws IOException {
if (instance == null)
instance = new DstoreLogger(loggingType, port);
else
throw new IOException("DstoreLogger already initialised");
}
public static DstoreLogger getInstance() {
if (instance == null)
throw new RuntimeException("DstoreLogger has not been initialised yet");
return instance;
}
protected DstoreLogger(LoggingType loggingType, int port) throws IOException {
super(loggingType);
logFileSuffix = LOG_FILE_SUFFIX + "_" + port;
}
@Override
protected String getLogFileSuffix() {
return logFileSuffix;
}
}
import java.io.IOException;
import java.io.PrintStream;
import java.net.Socket;
public abstract class Logger {
public enum LoggingType {
NO_LOG, // no log at all
ON_TERMINAL_ONLY, // log to System.out only
ON_FILE_ONLY, // log to file only
ON_FILE_AND_TERMINAL // log to both System.out and file
}
protected final LoggingType loggingType;
protected PrintStream ps;
protected Logger(LoggingType loggingType) {
this.loggingType = loggingType;
}
protected abstract String getLogFileSuffix();
protected synchronized PrintStream getPrintStream() throws IOException {
if (ps == null)
ps = new PrintStream(getLogFileSuffix() + "_" + System.currentTimeMillis() + ".log");
return ps;
}
protected boolean logToFile() {
return loggingType == LoggingType.ON_FILE_ONLY || loggingType == LoggingType.ON_FILE_AND_TERMINAL;
}
protected boolean logToTerminal() {
return loggingType == LoggingType.ON_TERMINAL_ONLY || loggingType == LoggingType.ON_FILE_AND_TERMINAL;
}
protected void log(String message) {
if (logToFile())
try { getPrintStream().println(message); } catch(Exception e) { e.printStackTrace(); }
if (logToTerminal())
System.out.println(message);
}
public void messageSent(Socket socket, String message) {
log("[" + socket.getLocalPort() + "->" + socket.getPort() + "] " + message);
}
public void messageReceived(Socket socket, String message) {
log("[" + socket.getLocalPort() + "<-" + socket.getPort() + "] " + message);
}
}
public class Protocol {
// messages from Clients
public final static String LIST_TOKEN = "LIST"; // also from Controller and Dstores
public final static String STORE_TOKEN = "STORE"; // also from Dstores
public final static String LOAD_TOKEN = "LOAD";
public final static String LOAD_DATA_TOKEN = "LOAD_DATA";
public final static String RELOAD_TOKEN = "RELOAD";
public final static String REMOVE_TOKEN = "REMOVE"; // also from Controller
// messages from Controller
public final static String STORE_TO_TOKEN = "STORE_TO";
public final static String STORE_COMPLETE_TOKEN = "STORE_COMPLETE";
public final static String LOAD_FROM_TOKEN = "LOAD_FROM";
public final static String REMOVE_COMPLETE_TOKEN = "REMOVE_COMPLETE";
public final static String REBALANCE_TOKEN = "REBALANCE";
public final static String ERROR_FILE_DOES_NOT_EXIST_TOKEN = "ERROR_FILE_DOES_NOT_EXIST"; // also from Dstores
public final static String ERROR_FILE_ALREADY_EXISTS_TOKEN = "ERROR_FILE_ALREADY_EXISTS";
public final static String ERROR_NOT_ENOUGH_DSTORES_TOKEN = "ERROR_NOT_ENOUGH_DSTORES";
public final static String ERROR_LOAD_TOKEN = "ERROR_LOAD";
// messages from Dstores
public final static String ACK_TOKEN = "ACK";
public final static String STORE_ACK_TOKEN = "STORE_ACK";
public final static String REMOVE_ACK_TOKEN = "REMOVE_ACK";
public final static String JOIN_TOKEN = "JOIN";
public final static String REBALANCE_STORE_TOKEN = "REBALANCE_STORE";
public final static String REBALANCE_COMPLETE_TOKEN = "REBALANCE_COMPLETE";
}
......@@ -60,27 +60,33 @@ public class Server {
String[] tokens = line.split(" ");
String command = tokens[0];
if (command.equals("JOIN")) {
if (!rebalancing && command.equals("JOIN")) {
int publicPort = Integer.parseInt(tokens[1]);
FSStore fsStore = new FSStore(client, publicPort);
fileSystem.addDstore(client.getPort(), fsStore);
System.out.println("Dstore listening on port " + publicPort + " connected");
ControllerLogger.getInstance().dstoreJoined(client, publicPort);
runRebalancing(0);
} else if (fileSystem.getDstores().size() < controller.getR()) {
ControllerLogger.getInstance().messageSent(client, "ERROR_NOT_ENOUGH_DSTORES");
res.println("ERROR_NOT_ENOUGH_DSTORES");
res.flush();
} else {
if (fileSystem.getDstores().containsKey(client.getPort())) {
if(command.equals("STORE_ACK")) {
ControllerLogger.getInstance().messageReceived(client, "STORE_ACK");
handleStoreACK(tokens);
} else if (command.equals("REMOVE_ACK")) {
ControllerLogger.getInstance().messageReceived(client, "REMOVE_ACK");
handleRemoveACK(tokens);
}
else if(command.equals("LIST")) {
ControllerLogger.getInstance().messageReceived(client, "LIST");
// get the files of the dstore when rebalancing
List<String> filenames = new ArrayList<>();
for(int i = 1; i < tokens.length; i++) {
......@@ -93,24 +99,22 @@ public class Server {
filenames);
} else if (command.equals("REBALANCE_COMPLETE")) {
//fileSystem.increaseRebalanceComplete();
ControllerLogger.getInstance().messageReceived(client, "REBALANCE_COMPLETE");
System.out.println("Rebalance done for " + fileSystem.getDstores().get(client.getPort()).getPublicPort());
} else if (command.equals("ERROR_FILE_DOES_NOT_EXIST")) {
System.out.println(command + " " + tokens[1]);
ControllerLogger.getInstance().messageReceived(client, "ERROR_FILE_DOES_NOT_EXIST");
} else {
System.out.println("Unknown command " + command);
}
} else if (!rebalancing) {
switch (command) {
case "STORE" -> handleStore(client, tokens);
case "LOAD" -> handleLoad(client, tokens);
case "RELOAD" -> handleReload(client, tokens);
case "REMOVE" -> handleRemove(client, tokens);
case "LIST" -> handleList(client);
case "STORE" -> handleStore(client, tokens, line);
case "LOAD" -> handleLoad(client, tokens, line);
case "RELOAD" -> handleReload(client, tokens, line);
case "REMOVE" -> handleRemove(client, tokens, line);
case "LIST" -> handleList(client, line);
default -> System.out.println("Unknown command " + command);
}
......@@ -126,14 +130,17 @@ public class Server {
* @param tokens gets the filename and the filesize
* @throws IOException
*/
private void handleStore(Socket client, String[] tokens) throws IOException {
try {
private void handleStore(Socket client, String[] tokens, String line) throws IOException {
ControllerLogger.getInstance().messageReceived(client, line);
try {
String filename = tokens[1];
int filesize = Integer.parseInt(tokens[2]);
PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
if(fileSystem.getStore().containsKey(filename)) {
ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_ALREADY_EXISTS");
res.println("ERROR_FILE_ALREADY_EXISTS");
res.flush();
return;
......@@ -162,6 +169,7 @@ public class Server {
i++;
}
ControllerLogger.getInstance().messageSent(client, "STORE_TO " + msg);
res.println("STORE_TO " + msg);
res.flush();
......@@ -182,6 +190,7 @@ public class Server {
}
fileSystem.addFileSize(filename, filesize);
ControllerLogger.getInstance().messageSent(client, "STORE_COMPLETE");
res.println("STORE_COMPLETE");
res.flush();
break;
......@@ -200,27 +209,32 @@ public class Server {
}
}
private void handleLoad(Socket client, String[] tokens) throws IOException {
private void handleLoad(Socket client, String[] tokens, String line) throws IOException {
ControllerLogger.getInstance().messageReceived(client, line);
try {
String filename = tokens[1];
PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
if(!fileSystem.getStore().containsKey(filename)) {
ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_DOES_NOT_EXIST");
res.println("ERROR_FILE_DOES_NOT_EXIST");
} else {
// select a Dstore from there and give an appropriate error if all Dstores fail
// #TODO fix NullPointerException from here
ControllerLogger.getInstance().messageSent(client, "LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() +
" " + fileSystem.getFileSizes().get(filename));
res.println("LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() +
" " + fileSystem.getFileSizes().get(filename));
}
res.flush();
} catch (IndexOutOfBoundsException e) {
System.out.println("Arguments don't match in LOAD operation");
}
}
private void handleReload(Socket client, String[] tokens) throws IOException {
private void handleReload(Socket client, String[] tokens, String line) throws IOException {
ControllerLogger.getInstance().messageReceived(client, line);
pos = pos + 1;
PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
......@@ -228,17 +242,21 @@ public class Server {
String filename = tokens[1];
if(!fileSystem.getStore().containsKey(filename)) {
pos = 0;
ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_DOES_NOT_EXIST");
res.println("ERROR_FILE_DOES_NOT_EXIST");
} else {
if(pos == controller.getR()) {
pos = 0;
ControllerLogger.getInstance().messageSent(client, "ERROR_LOAD");
res.println("ERROR_LOAD");
res.flush();
return;
}
// select a Dstore from there and give an appropriate error if all Dstores fail
ControllerLogger.getInstance().messageSent(client, "LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() +
" " + fileSystem.getFileSizes().get(filename));
res.println("LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() +
" " + fileSystem.getFileSizes().get(filename));
}
......@@ -249,7 +267,10 @@ public class Server {
}
}
private void handleRemove(Socket client, String[] tokens) throws IOException {
private void handleRemove(Socket client, String[] tokens, String line) throws IOException {
ControllerLogger.getInstance().messageReceived(client, line);
try {
String filename = tokens[1];
......@@ -259,15 +280,16 @@ public class Server {
if(!fileSystem.store.containsKey(filename)) {
PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_DOES_NOT_EXIST");
res.println("ERROR_FILE_DOES_NOT_EXIST");
res.flush();
return;
}
for(FSStore fsStore : fileSystem.getStore().get(filename)) {
System.out.println("Sending to Dstore");
fsStore.getFiles().remove(filename);
PrintWriter res = fsStore.getOutput();
ControllerLogger.getInstance().messageSent(client, "REMOVE " + filename);
res.println("REMOVE " + filename);
res.flush();
}
......@@ -282,6 +304,7 @@ public class Server {
fileSystem.addIndex(filename, "remove complete");
fileSystem.getStore().remove(filename);
fileSystem.getFileSizes().remove(filename);
ControllerLogger.getInstance().messageSent(client, "REMOVE_COMPLETE");
res.println("REMOVE_COMPLETE");
res.flush();
}
......@@ -297,13 +320,17 @@ public class Server {
}
}
private void handleList(Socket client) throws IOException {
private void handleList(Socket client, String line) throws IOException {
ControllerLogger.getInstance().messageReceived(client, line);
String msg = "";
for(String filename : fileSystem.getStore().keySet()) {
msg = filename + " " + msg;
}
PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
ControllerLogger.getInstance().messageSent(client, "LIST " + msg);
res.println("LIST " + msg);
res.flush();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment