Skip to content
Snippets Groups Projects
Commit 7842cf8a authored by cg4g19's avatar cg4g19
Browse files

Multiple clients storing loading and removing now working

parent eb3412db
No related branches found
No related tags found
No related merge requests found
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
...@@ -46,6 +47,17 @@ public class Controller { ...@@ -46,6 +47,17 @@ public class Controller {
DstoreListener dstoreListener = new DstoreListener(client, port); DstoreListener dstoreListener = new DstoreListener(client, port);
dstores.add(dstoreListener); dstores.add(dstoreListener);
dstoreListener.start(); dstoreListener.start();
/*for(DstoreListener listener : dstores){
Socket dstore = listener.dstore;
System.out.println("DSTORES: " + dstores.size());
line = sendMessageAndAwaitReply(dstore, Protocol.LIST_TOKEN);
parsedLine = Parser.parse(line);
command = parsedLine[0];
String[] dstorefile_list = Arrays.copyOfRange(parsedLine, 1, parsedLine.length);
}*/
}else{ }else{
ClientListener clientListener = new ClientListener(client, line); ClientListener clientListener = new ClientListener(client, line);
clientListener.start(); clientListener.start();
...@@ -56,9 +68,15 @@ public class Controller { ...@@ -56,9 +68,15 @@ public class Controller {
//SOCKET SEND AND RECEIVE //SOCKET SEND AND RECEIVE
public static void sendMessage(Socket socket, String message) throws IOException { public static void sendMessage(Socket socket, String message) throws IOException {
PrintWriter out = new PrintWriter(socket.getOutputStream(), true); try {
out.println(message); PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
ControllerLogger.getInstance().messageSent(socket, message); out.println(message);
ControllerLogger.getInstance().messageSent(socket, message);
} catch (Exception e) {
System.out.println("ERROR SENDING MESSAGE:\n");
e.printStackTrace();
}
} }
public static String receiveMessage(Socket socket) throws IOException{ public static String receiveMessage(Socket socket) throws IOException{
...@@ -88,6 +106,7 @@ public class Controller { ...@@ -88,6 +106,7 @@ public class Controller {
} }
static class DstoreListener extends Thread { static class DstoreListener extends Thread {
public Socket dstore; public Socket dstore;
public int port; public int port;
...@@ -100,6 +119,10 @@ public class Controller { ...@@ -100,6 +119,10 @@ public class Controller {
public void send(String message) throws IOException{ public void send(String message) throws IOException{
sendMessage(dstore, message); sendMessage(dstore, message);
} }
public void remove(){
dstores.remove(this);
}
@Override @Override
public void run() { public void run() {
...@@ -117,6 +140,8 @@ public class Controller { ...@@ -117,6 +140,8 @@ public class Controller {
String filename = parsedLine[1]; String filename = parsedLine[1];
ongoingRemoves.get(filename).ack(port); ongoingRemoves.get(filename).ack(port);
} }
}else{
break;
} }
} catch (Exception e) { } catch (Exception e) {
...@@ -124,6 +149,7 @@ public class Controller { ...@@ -124,6 +149,7 @@ public class Controller {
//TODO: handle exception //TODO: handle exception
} }
} }
System.out.println("Dstore disconnected!");
} }
} }
...@@ -196,28 +222,37 @@ public class Controller { ...@@ -196,28 +222,37 @@ public class Controller {
index.changeStatus(filename, "remove in progress"); index.changeStatus(filename, "remove in progress");
int[] ports = index.getFileInfo(filename).storePorts; int[] ports = index.getFileInfo(filename).storePorts;
for(int port : ports){ for(int port : ports){
getDstoreListener(port).send(Protocol.REMOVE_TOKEN + " " + filename); sendMessage(getDstoreListener(port).dstore, Protocol.REMOVE_TOKEN + " " + filename);
} }
GetRemoveAcks removeAck = new GetRemoveAcks(ports); GetRemoveAcks removeAck = new GetRemoveAcks(ports);
ongoingRemoves.put(filename, removeAck); ongoingRemoves.put(filename, removeAck);
Future<Boolean> future = executor.submit(removeAck); Future<Boolean> future = executor.submit(removeAck);
if(future.get(timeout, TimeUnit.MILLISECONDS)){ if(future.get(timeout, TimeUnit.MILLISECONDS)){
System.out.println("REMOVE SUCCESS");
sendMessage(client, Protocol.REMOVE_COMPLETE_TOKEN); sendMessage(client, Protocol.REMOVE_COMPLETE_TOKEN);
index.remove(filename); index.remove(filename);
ongoingRemoves.remove(filename); ongoingRemoves.remove(filename);
} }
} }
}else if(command.equals(Protocol.RELOAD_TOKEN)){
String filename = parsedLine[1];
if(index.removeFirstPort(filename)){
int port = index.getFileInfo(filename).storePorts[0];
int filesize = index.getFileInfo(filename).filesize;
sendMessage(client, Protocol.LOAD_FROM_TOKEN + " " + port + " " + filesize);
}else{
sendMessage(client, Protocol.ERROR_LOAD_TOKEN);
}
} }
line = receiveMessage(client); line = receiveMessage(client);
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); //If future times out
break; break;
} }
} }
} }
} }
......
...@@ -24,6 +24,8 @@ public class Dstore { ...@@ -24,6 +24,8 @@ public class Dstore {
try { try {
Socket controller = new Socket("localhost", cport); Socket controller = new Socket("localhost", cport);
sendMessage(controller, "JOIN " + port); sendMessage(controller, "JOIN " + port);
ControllerListener controllerListener = new ControllerListener(controller);
controllerListener.start();
while(true){ while(true){
Socket client = dstoreServer.accept(); Socket client = dstoreServer.accept();
ClientListener clientListener = new ClientListener(client, controller); ClientListener clientListener = new ClientListener(client, controller);
...@@ -85,6 +87,9 @@ public class Dstore { ...@@ -85,6 +87,9 @@ public class Dstore {
String filename = parsedLine[1]; String filename = parsedLine[1];
File file = new File(file_folder, filename); File file = new File(file_folder, filename);
if(!file.exists()){
client.close();
}
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file)); BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
byte[] bytearray = bis.readAllBytes(); byte[] bytearray = bis.readAllBytes();
...@@ -93,13 +98,6 @@ public class Dstore { ...@@ -93,13 +98,6 @@ public class Dstore {
os.write(bytearray); os.write(bytearray);
os.flush(); os.flush();
bis.close(); bis.close();
}else if(command.equals(Protocol.REMOVE_TOKEN)){
String filename = parsedLine[1];
File file = new File(file_folder, filename);
if(file.delete()){
sendMessage(controller, Protocol.REMOVE_ACK_TOKEN + " " + filename);
}
} }
...@@ -112,4 +110,44 @@ public class Dstore { ...@@ -112,4 +110,44 @@ public class Dstore {
} }
} }
static class ControllerListener extends Thread {
Socket controller;
public ControllerListener(Socket controller){
this.controller = controller;
}
@Override
public void run(){
while(controller.isConnected()){
try {
String line = receiveMessage(controller);
if(line != null){
String[] parsedLine = Parser.parse(line);
String command = parsedLine[0];
if(command.equals(Protocol.REMOVE_TOKEN)){
String filename = parsedLine[1];
File file = new File(file_folder, filename);
if(file.delete()){
sendMessage(controller, Protocol.REMOVE_ACK_TOKEN + " " + filename);
}else{
sendMessage(controller, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN + " " + filename);
}
}else if(command.equals(Protocol.LIST_TOKEN)){
String file_list = "";
for(File file : file_folder.listFiles()){
file_list += file.getName() + " ";
}
file_list = file_list.trim();
sendMessage(controller, Protocol.LIST_TOKEN + " " + file_list);
}
}
} catch (Exception e) {
//TODO: handle exception
}
}
}
}
} }
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
public class Index { public class Index {
class FileInfo { class FileInfo {
...@@ -34,6 +35,16 @@ public class Index { ...@@ -34,6 +35,16 @@ public class Index {
index.remove(getFileInfo(name)); index.remove(getFileInfo(name));
} }
public boolean removeFirstPort(String name){
int[] storePorts = getFileInfo(name).storePorts;
if(storePorts.length == 1){
return false;
}else{
getFileInfo(name).storePorts = Arrays.copyOfRange(storePorts, 1, storePorts.length);
return true;
}
}
public String getFileList(){ public String getFileList(){
String file_list = ""; String file_list = "";
for(FileInfo fInfo : index){ for(FileInfo fInfo : index){
...@@ -60,4 +71,5 @@ public class Index { ...@@ -60,4 +71,5 @@ public class Index {
public boolean fileExists(String name){ public boolean fileExists(String name){
return (getFileInfo(name) != null); return (getFileInfo(name) != null);
} }
} }
\ No newline at end of file
import java.io.IOException;
public class Parser {
public static void main(String[] args) throws IOException {
}
public static String[] parse(String args, int amount) throws IOException {
String[] splitArgs = args.split(" ");
if(splitArgs.length != (amount + 1)) throw new IOException("Invalid paramters\nExpected:" + amount + "\nActual:" + splitArgs.length);
return splitArgs;
}
public static String[] parse(String[] args, int amount) throws IOException {
if(args.length != (amount)) throw new IOException("Invalid paramters\nExpected:" + amount + "\nActual:" + args.length);
return args;
}
public static String[] parse(String args){
return args.split(" ");
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment