// Dstore.java 8.39 KiB
import java.io.*;
import java.net.*;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class Dstore {
private final int port;
private final int cport;
private final int timeout;
private final String file_folder;
private final Set<String> files;
public Dstore(int port, int cport, int timeout, String file_folder) {
this.port = port;
this.cport = cport;
this.timeout = timeout;
this.file_folder = file_folder;
this.files = new HashSet<>();
}
public int getPort() {
return port;
}
public int getCport() {
return cport;
}
public int getTimeout() {
return timeout;
}
public String getFile_folder() {
return file_folder;
}
public Set<String> getFiles() {
return files;
}
public void listenClient(Socket socket) {
try {
ServerSocket dstore = new ServerSocket(port);
for(;;) {
try {
Socket client = dstore.accept();
new Thread(() -> {
try {
handleDstoreClientReq(client, Dstore.this, socket);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
} catch(Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void listenController(Socket socket) {
new Thread(() -> {
try {
handleDstoreControllerReq(Dstore.this, socket);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
public void handleDstoreClientReq(Socket client, Dstore dstore, Socket controllerSocket) throws IOException {
OutputStream out = client.getOutputStream();
InputStream in = client.getInputStream();
BufferedReader req = new BufferedReader(new InputStreamReader(in));
PrintWriter res = new PrintWriter(new OutputStreamWriter(out));
String line;
boolean done = false;
while(!done && (line = req.readLine()) != null) {
String[] tokens = line.split(" ");
String command = tokens[0];
if(dstore.getFiles().size() < Controller.R) {
res.println("ERROR_NOT_ENOUGH_DSTORES");
res.flush();
} else {
switch (command) {
case "STORE":
try {
res.println("ACK");
res.flush();
String filename = tokens[1];
int filesize = Integer.parseInt(tokens[2]);
long limit = System.currentTimeMillis() + dstore.getTimeout();
boolean finished = false;
while(!finished && System.currentTimeMillis() < limit) {
// read data and store the file
byte[] data = new byte[filesize];
client.getInputStream().readNBytes(data, 0, filesize);
FileOutputStream o = new FileOutputStream(dstore.getFile_folder() +
"/" + filename);
o.write(data);
o.flush();
o.close();
dstore.getFiles().add(filename);
finished = true;
}
if(!finished) {
System.out.println("ERROR: time expired");
return;
}
PrintWriter r = new PrintWriter(new OutputStreamWriter(controllerSocket.getOutputStream()));
r.println("STORE_ACK " + filename + " " + dstore.getPort());
r.flush();
} catch (IndexOutOfBoundsException e) {
System.out.println("Arguments don;t match the STORE operation");
}
break;
case "LOAD_DATA":
if(tokens.length != 2) {
System.out.println("Arguments don't match in LOAD operation");
} else {
String filename = tokens[1];
try {
if (dstore.getFiles().contains(filename)) {
File file = new File(dstore.getFile_folder() + "/" + filename);
FileInputStream i = new FileInputStream(file);
int n;
while ((n = i.read()) != -1) {
client.getOutputStream().write(n);
}
i.close();
} else {
done = true;
client.close();
}
Server.pos = 0;
} catch (Exception ingored) {
res.println("RELOAD " + filename);
res.flush();
}
}
break;
default:
System.out.println("Unknown command");
break;
}
}
}
}
public void handleDstoreControllerReq(Dstore dstore, Socket controllerSocket) throws IOException {
OutputStream out = controllerSocket.getOutputStream();
InputStream in = controllerSocket.getInputStream();
BufferedReader req = new BufferedReader(new InputStreamReader(in));
PrintWriter res = new PrintWriter(new OutputStreamWriter(out));
String line;
System.out.println("Entered controller req");
while((line = req.readLine()) != null) {
String[] tokens = line.split(" ");
String command = tokens[0];
if(dstore.getFiles().size() < Controller.R) {
res.println("ERROR_NOT_ENOUGH_DSTORES");
res.flush();
} else {
if(command.equals("REMOVE")) {
try {
String filename = tokens[1];
if(dstore.getFiles().contains(filename)) {
System.out.println("REMOVE_ACK");
File file = new File(dstore.getFile_folder() + "/" + filename);
file.delete();
dstore.getFiles().remove(filename);
res.println("REMOVE_ACK " + filename);
} else {
res.println("ERROR_FILE_DOES_NOT_EXIST " + filename);
}
res.flush();
} catch(IndexOutOfBoundsException e) {
System.out.println("Arguments don't match in REMOVE opretaion");
}
} else {
System.out.println("Unknown command");
}
}
}
}
public static void main(String[] args) {
if(args.length != 4) {
System.out.println("Incorrect number of arguments for the dstore");
return;
}
int port = Integer.parseInt(args[0]);
int cport = Integer.parseInt(args[1]);
int timeout = Integer.parseInt(args[2]);
String file_folder = args[3];
Dstore dstore = new Dstore(port, cport, timeout, file_folder);
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress("localhost", cport));
OutputStream out = socket.getOutputStream();
PrintWriter res = new PrintWriter(new OutputStreamWriter(out));
res.println("JOIN " + port);
res.flush();
dstore.listenController(socket);
dstore.listenClient(socket);
} catch (Exception e) {
e.printStackTrace();
}
}
}