Select Git revision
Dstore.java 11.74 KiB
import java.io.*;
import java.lang.Runnable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.net.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class Dstore {
protected int port; //Port to listen on
protected int cport; //Controller's port to talk to
protected int timeout; //in milliseconds
protected File fileFolder; //Where to store the data locally
protected Map<String,Long> fileSizes;
protected Socket controllerSocket;
protected BufferedReader controllerIn;
protected PrintWriter controllerOut;
protected final int BUFFER_SIZE = 256;
public Dstore(int port, int cport, int timeout, String fileFolderName) throws Exception {
this.port = port;
this.cport = cport;
this.timeout = timeout;
DstoreLogger.init(Logger.LoggingType.ON_FILE_AND_TERMINAL, port);
fileFolder = new File(fileFolderName);
if(fileFolder.exists() && !fileFolder.isDirectory()) {
throw new Exception("Folder name provided exists as a file and not a directory");
}
else if(!fileFolder.exists()) {
System.out.println("New folder will be created");
if(!fileFolder.mkdir()) throw new Exception("Folder could not be created");
}
fileSizes = new HashMap<String,Long>();
for(File file : fileFolder.listFiles()) {
if(!file.delete()) throw new Exception("Directory specified has undeletable files; please try a different directory");
}
}
public static void main(String[] args) {
try {
int port = Integer.parseInt(args[0]);
int cport = Integer.parseInt(args[1]);
int timeout = Integer.parseInt(args[2]);
String fileFolder = args[3];
if(port < 0 || cport < 0 || timeout < 0) {
throw new Exception("Infeasible values provided as arguments");
}
Dstore dstore = new Dstore(port, cport, timeout, fileFolder);
dstore.start();
}
catch(IndexOutOfBoundsException e) {
System.out.println("Command line arguments have not been provided");
return;
}
catch(NumberFormatException e) {
System.out.println("Command line arguments must be integers");
return;
}
catch(Exception e) {
e.printStackTrace();
return;
}
}
public void start() {
try(Socket controllerSocket = new Socket(InetAddress.getLocalHost(), cport)) {
this.controllerSocket = controllerSocket;
controllerIn = new BufferedReader(new InputStreamReader(controllerSocket.getInputStream()));
controllerOut = new PrintWriter(controllerSocket.getOutputStream(), true);
String joinMessage = Protocol.JOIN_TOKEN + " " + port;
controllerOut.println(joinMessage);
controllerOut.flush();
messageSent(controllerSocket, joinMessage);
new Thread(() -> {
while(true) {
try {
String message = controllerIn.readLine();
if(message != null) {
messageReceived(controllerSocket, message);
handleMessage(message.split(" "), controllerSocket);
}
}
catch(Exception e) {
e.printStackTrace();
}
}
}).start();
ServerSocket server = new ServerSocket(port);
while(true) {
try {
Socket client = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String message = in.readLine();
messageReceived(client, message);
handleMessage(message.split(" "), client);
}
catch(Exception e) {
//Log error
e.printStackTrace();
}
}
}
catch(Exception e) {
e.printStackTrace();
}
}
void handleMessage(String[] message, Socket client) throws Exception {
if(message[0].equals(Protocol.STORE_TOKEN)) {
store(client, message[1], Long.parseLong(message[2]), true);
}
else if(message[0].equals(Protocol.REBALANCE_STORE_TOKEN)) {
store(client, message[1], Long.parseLong(message[2]), false);
}
else if(message[0].equals(Protocol.LOAD_DATA_TOKEN)) {
load(client, message[1]);
}
else if(message[0].equals(Protocol.REMOVE_TOKEN)) {
remove(message[1]);
}
else if(message[0].equals(Protocol.LIST_TOKEN)) {
list();
}
else if(message[0].equals(Protocol.REBALANCE_TOKEN)) {
rebalance(message);
}
else {
//Log error and continue (throw exception?)
System.out.println("Dstore " + port + " has received a malformed message");
}
}
void store(Socket client, String filename, long filesize, boolean acknowledged) throws Exception {
new Thread(() -> {
try {
//Send ACK message to client
PrintWriter out = new PrintWriter(client.getOutputStream(), true);
out.println(Protocol.ACK_TOKEN);
messageSent(client, Protocol.ACK_TOKEN);
OutputStream writer = new FileOutputStream(new File(fileFolder, filename), false);
InputStream reader = client.getInputStream();
//Receive + write file content from client
byte[] nextLine = new byte[BUFFER_SIZE];
int len;
do {
len = reader.readNBytes(nextLine, 0, BUFFER_SIZE);
writer.write(nextLine, 0, len);
writer.flush();
}
while(len == BUFFER_SIZE);
writer.close();
//Send STORE_ACK message to the Controller
if(acknowledged) {
synchronized(controllerOut) {
String controllerMessage = Protocol.STORE_ACK_TOKEN + " " + filename;
controllerOut.println(controllerMessage);
messageSent(controllerSocket, controllerMessage);
}
}
synchronized(fileSizes) {
if(fileSizes.containsKey(filename)) fileSizes.remove(filename);
fileSizes.put(filename, Long.valueOf(filesize));
}
}
catch(IOException e) {
e.printStackTrace();
}
finally {
try {client.close();} catch(IOException e) {e.printStackTrace();}
}
}).start();
}
void load(Socket client, String filename) throws Exception {
new Thread(() -> {
try {
//Send the content of the file in fileFolder to the client
FileInputStream reader;
try {
reader = new FileInputStream(new File(fileFolder, filename));
}
catch(FileNotFoundException e) {
client.close();
return;
}
OutputStream contentOut = client.getOutputStream();
byte[] buf = new byte[BUFFER_SIZE];
int len;
do {
len = reader.read(buf);
if(len >= 0) {
contentOut.write(buf, 0, len);
contentOut.flush();
}
}
while(len == BUFFER_SIZE);
reader.close();
contentOut.close();
}
catch(IOException e) {
e.printStackTrace();
}
finally {
try {if(!client.isClosed()) client.close();} catch(IOException e) {e.printStackTrace();}
}
}).start();
}
void remove(String filename) throws Exception {
new Thread(() -> {
try {
System.out.println("Store " + port + " removing " + filename + "...");
//Remove the file from fileFolder
Path path = new File(fileFolder, filename).toPath();
String controllerMessage;
if(Files.deleteIfExists(path)) {
System.out.println("Store " + port + " removed " + filename);
//Send REMOVE_ACK message to client (the controller)
synchronized(controllerOut) {
controllerMessage = Protocol.REMOVE_ACK_TOKEN + " " + filename;
}
}
else {
System.out.println("Store " + port + " couldn't remove " + filename);
//Send DOES NOT EXIST error
synchronized(controllerOut) {
controllerMessage = Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN + " " + filename;
}
}
controllerOut.println(controllerMessage);
messageSent(controllerSocket, controllerMessage);
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void list() throws Exception {
new Thread(() -> {
//Send a list of all files in fileFolder to client (the controller)
String message = Protocol.LIST_TOKEN;
for(File file : fileFolder.listFiles()) {
message = message + " " + file.getName();
}
synchronized(controllerOut) {
controllerOut.println(message);
}
}).start();
}
void rebalance(String[] message) throws Exception {
System.out.println("Rebalance message received");
new Thread(() -> {
//Interpret files to send and files to remove from the message
Map<Integer,List<String>> filesToSend;
String[] filesToRemove;
int index;
String tmessage = "";
for(String s : message) {
tmessage = tmessage + " " + s;
}
System.out.println("Interpreting message:" + tmessage);
int numberToSend = Integer.parseInt(message[1]);
int totalReceivers = 0;
index = 2;
filesToSend = new HashMap<Integer,List<String>>();
for(int i=0; i<numberToSend; i++) {
String name = message[index];
index++;
int numberOfReceivers = Integer.parseInt(message[index]);
totalReceivers += numberOfReceivers;
index++;
for(int j=0; j<numberOfReceivers; j++) {
Integer receiver = Integer.parseInt(message[index]);
if(!filesToSend.containsKey(receiver)) {
filesToSend.put(receiver,new ArrayList<String>());
}
filesToSend.get(receiver).add(name);
index++;
}
}
int numberToRemove = Integer.parseInt(message[index]);
index++;
filesToRemove = new String[numberToRemove];
for(int k=0; k<numberToRemove; k++) {
filesToRemove[k] = message[index];
index++;
}
System.out.println("Interpreting complete, will send " + numberToSend + " and remove " + numberToRemove);
//Send each file to send to the Dstore at the specified port number
CountDownLatch latch = new CountDownLatch(totalReceivers);
for(Integer dstore : filesToSend.keySet()) {
for(String filename : filesToSend.get(dstore)) {
new Thread(() -> {
try {
System.out.println("Sending " + filename + " to store " + dstore);
Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue());
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
long fileSize;
synchronized(fileSizes) {fileSize = fileSizes.get(filename);}
String dstoreMessage = Protocol.REBALANCE_STORE_TOKEN + " " + filename + " " + fileSize;
out.println(dstoreMessage);
messageSent(socket, dstoreMessage);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String receivedMessage = in.readLine();
messageReceived(socket, receivedMessage);
if(!receivedMessage.equals(Protocol.ACK_TOKEN)) {
//Log error
System.err.println("Dstore " + dstore + " should have sent ACK but " + port + " received " + receivedMessage);
}
byte[] content = new byte[BUFFER_SIZE];
int len;
FileInputStream fileIn = new FileInputStream(new File(fileFolder, filename));
OutputStream fileOut = socket.getOutputStream();
do {
len = fileIn.read(content);
if(len >= 0) {
fileOut.write(content, 0, len);
fileOut.flush();
}
}
while(len > 0);
fileIn.close();
fileOut.close();
in.close();
out.close();
socket.close();
}
catch(IOException e) {
e.printStackTrace();
}
finally {
try {latch.countDown();} catch(Exception e) {}
}
}).start();
}
}
try {latch.await(timeout, TimeUnit.MILLISECONDS);} catch(Exception e) {e.printStackTrace();}
//Remove each file to remove from fileFolder
for(String filename : filesToRemove) {
System.out.println("Removing file " + filename);
new File(fileFolder, filename).delete();
}
//Send REBALANCE_COMPLETE message to client (the controller)
synchronized(controllerOut) {
controllerOut.println(Protocol.REBALANCE_COMPLETE_TOKEN);
messageSent(controllerSocket, Protocol.REBALANCE_COMPLETE_TOKEN);
}
}).start();
}
protected void messageSent(Socket socket, String message) {
DstoreLogger.getInstance().messageSent(socket, message);
}
protected void messageReceived(Socket socket, String message) {
DstoreLogger.getInstance().messageReceived(socket, message);
}
}