Select Git revision
Dstore.java
Dstore.java 6.69 KiB
import java.io.*;
import java.lang.Runnable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.net.*;
import java.util.Map;
import java.util.HashMap;
public class Dstore {
protected int port; //Port to listen on
protected int cport; //Controller's port to talk to
protected int timeout; //in milliseconds
protected String fileFolder; //Where to store the data locally
public Dstore(int port, int cport, int timeout, String fileFolder) {
this.port = port;
this.cport = cport;
this.timeout = timeout;
this.fileFolder = fileFolder;
}
public static void main(String[] args) {
try {
int port = Integer.parseInt(args[0]);
int cport = Integer.parseInt(args[1]);
int timeout = Integer.parseInt(args[2]);
String fileFolder = args[3];
Dstore dstore = new Dstore(port, cport, timeout, fileFolder);
dstore.start();
}
catch(IndexOutOfBoundsException e) {
System.out.println("Command line arguments have not been provided");
return;
}
catch(NumberFormatException e) {
System.out.println("Command line arguments must be integers");
return;
}
}
public void start() {
try {
Socket socket = new Socket(InetAddress.getLocalHost(), cport);
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
out.write("JOIN " + port);
out.close();
socket.close();
ServerSocket server = new ServerSocket(port);
while(true) {
try {
Socket client = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String[] message = in.readLine().split(" ");
handleMessage(message, client, in);
in.close();
}
catch(Exception e) {
//Log error
e.printStackTrace();
}
}
}
catch(Exception e) {
e.printStackTrace();
}
}
void handleMessage(String[] message, Socket client, BufferedReader clientIn) throws Exception {
if(message[0].equals("STORE")) {
store(client, message[1], Integer.parseInt(message[2]), clientIn);
}
else if(message[0].equals("LOAD_DATA")) {
load(client, message[1]);
}
else if(message[0].equals("REMOVE")) {
remove(client, message[1]);
}
else if(message[0].equals("LIST")) {
list(client);
}
else if(message[0].equals("REBALANCE")) {
rebalance(client, message);
}
else {
//Log error and continue (throw exception?)
}
}
void store(Socket client, String filename, int filesize, BufferedReader in) throws Exception {
new Thread(() -> {
try {
//Send ACK message to client
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("ACK");
out.flush();
out.close();
FileOutputStream writer = new FileOutputStream(fileFolder + "/" + filename, false);
//Receive + write file content from client
int byteCount = filesize;
while(byteCount > 0) {
byte[] nextLine = in.readLine().getBytes();
writer.write(nextLine);
writer.flush();
byteCount -= nextLine.length;
}
writer.close();
//Send STORE_ACK message to the Controller
PrintWriter controllerOut = new PrintWriter(new Socket(InetAddress.getLocalHost(), cport).getOutputStream());
controllerOut.print("STORE_ACK " + filename);
controllerOut.flush();
controllerOut.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void load(Socket client, String filename) throws Exception {
new Thread(() -> {
try {
//Send the content of the file in fileFolder to the client
PrintWriter out = new PrintWriter(client.getOutputStream());
FileInputStream reader;
try {
reader = new FileInputStream(fileFolder + "/" + filename);
}
catch(FileNotFoundException e) {
out.print("ERROR DOES_NOT_EXIST");
out.flush();
out.close();
return;
}
byte[] buf = new byte[8];
while(reader.read(buf) != -1) {
out.print(new String(buf));
out.flush();
}
reader.close();
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void remove(Socket client, String filename) throws Exception {
new Thread(() -> {
try {
//Remove the file from fileFolder
Path path = new File(fileFolder + "/" + filename).toPath();
PrintWriter out = new PrintWriter(client.getOutputStream());
if(Files.deleteIfExists(path)) {
//Send REMOVE_ACK message to client (the controller)
out.print("REMOVE_ACK");
}
else {
//Send DOES NOT EXIST error
out.print("ERROR DOES_NOT_EXIST " + filename);
}
out.flush();
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void list(Socket client) throws Exception {
new Thread(() -> {
try {
//Send a list of all files in fileFolder to client (the controller)
PrintWriter out = new PrintWriter(client.getOutputStream());
for(File file : new File(fileFolder).listFiles()) {
out.print(file.getName());
out.flush();
}
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void rebalance(Socket client, String[] message) throws Exception {
new Thread(() -> {
try {
//Interpret files to send and files to remove from the message
Map<String,Integer[]> filesToSend;
String[] filesToRemove;
int index;
int numberToSend = Integer.parseInt(message[1]);
index = 2;
filesToSend = new HashMap<String,Integer[]>(numberToSend);
for(int i=0; i<numberToSend; i++) {
String name = message[index];
index++;
int numberOfReceivers = Integer.parseInt(message[index]);
index++;
Integer[] receivers = new Integer[numberOfReceivers];
for(int j=0; j<numberOfReceivers; j++) {
receivers[j] = Integer.parseInt(message[index]);
index++;
}
filesToSend.put(name, receivers);
}
int numberToRemove = Integer.parseInt(message[index]);
index++;
filesToRemove = new String[numberToRemove];
for(int k=0; k<numberToRemove; k++) {
filesToRemove[k] = message[index];
index++;
}
//Send each file to send to the Dstore at the specified port number
for(String filename : filesToSend.keySet()) {
for(Integer dstore : filesToSend.get(filename)) {
//Same store functions as used in the client object
}
}
//Remove each file to remove from fileFolder
for(String filename : filesToRemove) {
new File(fileFolder + "/" + filename).delete();
}
//Send REBALANCE_COMPLETE message to client (the controller)
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("REBALANCE COMPLETE");
out.flush();
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
}