Skip to content
Snippets Groups Projects
Commit d9ecb07d authored by tmp1u19's avatar tmp1u19 :octopus:
Browse files

Change the architecture of the project completely!!

parent e4c07a68
No related branches found
No related tags found
No related merge requests found
import java.io.*;
import java.net.*;
import java.util.*;
public class Controller {
static Map<Integer, Dstore> dstores = Collections.synchronizedMap(new HashMap<>());
static Map<String, List<Dstore>> store = Collections.synchronizedMap(new HashMap<>());
static Map<String, String> index = Collections.synchronizedMap(new HashMap<>());
private int cport;
static int R;
private int timeout;
private int rebalance_period;
public Controller(int cport, int R, int timeout, int rebalance_period) {
this.cport = cport;
this.R = R;
this.timeout = timeout;
this.rebalance_period = rebalance_period;
}
public void start() {
try {
ServerSocket controller = new ServerSocket(cport);
Server server = new Server(this);
System.out.println("Start listening for connections");
for(;;) {
try {
Socket client = controller.accept();
new Thread() {
@Override
public void run() {
try {
server.handleClient(client);
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
} catch (Exception e) {
e.printStackTrace();
}
}
} catch(Exception e) {
e.printStackTrace();
}
}
public int getCport() {
return cport;
}
public int getR() {
return R;
}
public int getRebalance_period() {
return rebalance_period;
}
public int getTimeout() {
return timeout;
}
public static void main(String[] args) {
if(args.length != 4) {
System.out.println("Incorrect number of arguments for the controller");
return;
}
int cport = Integer.parseInt(args[0]);
int R = Integer.parseInt(args[1]);
int timeout = Integer.parseInt(args[2]);
int rebalance_perios = Integer.parseInt(args[3]);
new Controller(cport, R, timeout,rebalance_perios).start();
}
}
import java.io.*;
import java.net.*;
import java.util.HashSet;
import java.util.Set;
public class Dstore {
private int port;
private int cport;
private int timeout;
private String file_folder;
private Socket controllerSocket;
private Set<String> files;
public Dstore(int port, int cport) throws IOException {
this.port = port;
this.cport = cport;
InetAddress ip = InetAddress.getLocalHost();
controllerSocket = new Socket();
controllerSocket.connect(new InetSocketAddress(ip, cport));
files = new HashSet<>();
}
public void listenClient() {
try {
ServerSocket dstore = new ServerSocket(port);
for(;;) {
try {
Socket client = dstore.accept();
new Thread() {
@Override
public void run() {
try {
controllerSocket.connect(new InetSocketAddress("localhost", cport));
new Handler().handleDstoreClientReq(client, Dstore.this);
} catch (IOException e) {
e.printStackTrace();
}
}
}.start();
} catch(Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void listenController() {
try {
new Thread() {
@Override
public void run() {
try {
controllerSocket.connect(new InetSocketAddress("localhost", cport));
new Handler().handleDstoreControllerReq(Dstore.this);
} catch (IOException e) {
e.printStackTrace();
}
}
}.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public int getPort() {
return port;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public void setFile_folder(String file_folder) {
this.file_folder = file_folder;
}
public int getCport() {
return cport;
}
public int getTimeout() {
return timeout;
}
public String getFile_folder() {
return file_folder;
}
public Set<String> getFiles() {
return files;
}
public Socket getControllerSocket() {
return controllerSocket;
}
public static void main(String[] args) throws IOException {
if(args.length != 4) {
System.out.println("Incorrect number of arguments for the dstore");
return;
}
int port = Integer.parseInt(args[0]);
int cport = Integer.parseInt(args[1]);
int timeout = Integer.parseInt(args[2]);
String file_folder = args[3];
Dstore dstore = new Dstore(port, cport);
dstore.setTimeout(timeout);
dstore.setFile_folder(file_folder);
Controller.dstores.put(port, dstore);
dstore.listenController();
dstore.listenClient();
}
}
import java.io.*;
import java.net.*;
import java.util.ArrayList;
import java.util.List;
public class Handler {
public void handleDstoreClientReq(Socket client, Dstore dstore) throws IOException {
OutputStream out = client.getOutputStream();
InputStream in = client.getInputStream();
BufferedReader req = new BufferedReader(new InputStreamReader(in));
PrintWriter res = new PrintWriter(new OutputStreamWriter(out));
String line;
while((line = req.readLine()) != null) {
String[] tokens = line.split(" ");
String command = tokens[0];
if(dstore.getFiles().size() < Controller.R) {
res.println("ERROR_NOT_ENOUGH_DSTORES");
res.flush();
} else {
switch(command) {
case "STORE" : {
try {
res.println("ACK");
res.flush();
String filename = tokens[1];
int filesize = Integer.parseInt(tokens[2]);
// read data and store the file
byte[] data = new byte[filesize];
client.getInputStream().readNBytes(data, 0, filesize);
FileOutputStream o = new FileOutputStream(dstore.getFile_folder() +
"/" + filename);
o.write(data);
o.flush();
o.close();
dstore.getFiles().add(filename);
if(Controller.store.containsKey(filename)) {
Controller.store.get(filename).add(dstore);
} else {
List<Dstore> d = new ArrayList<>();
d.add(dstore);
Controller.store.put(filename, d);
}
PrintWriter r = new PrintWriter(new OutputStreamWriter(dstore.getControllerSocket().getOutputStream()));
r.println("STORE_COMPLETE");
} catch (IndexOutOfBoundsException e) {
System.out.println("Arguments don;t match the STORE operation");
}
}
case "LOAD_DATA" : {
try {
String filename = tokens[1];
if(dstore.getFiles().contains(filename)) {
File file = new File(dstore.getFile_folder() + "/" + filename);
int n;
while((n = in.read())!= -1) {
client.getOutputStream().write(n);
}
} else {
client.close();
}
} catch (IndexOutOfBoundsException e) {
System.out.println("Arguments don't match in LOAD operation");
}
}
}
}
}
}
public void handleDstoreControllerReq(Dstore dstore) throws IOException {
OutputStream out = dstore.getControllerSocket().getOutputStream();
InputStream in = dstore.getControllerSocket().getInputStream();
BufferedReader req = new BufferedReader(new InputStreamReader(in));
PrintWriter res = new PrintWriter(new OutputStreamWriter(out));
String line;
while((line = req.readLine()) != null) {
String[] tokens = line.split(" ");
String command = tokens[0];
if(dstore.getFiles().size() < Controller.R) {
res.println("ERROR_NOT_ENOUGH_DSTORES");
res.flush();
} else {
switch (command) {
case "REMOVE" : {
try {
String filename = tokens[1];
if(dstore.getFiles().contains(filename)) {
File file = new File(dstore.getFile_folder() + "/" + filename);
file.delete();
Controller.store.get(filename).remove(dstore);
res.println("REMOVE_ACK");
res.flush();
} else {
res.println("ERROR_FILE_DOES_NOT_EXIST");
res.flush();
}
} catch(IndexOutOfBoundsException e) {
System.out.println("Arguments don't match in REMOVE opretaion");
}
}
}
}
}
}
}
import java.io.*;
import java.net.*;
import java.util.*;
public class Server {
private Controller controller;
public Server(Controller controller) {
this.controller = controller;
}
public void handleClient(Socket client) throws IOException {
OutputStream out = client.getOutputStream();
InputStream in = client.getInputStream();
BufferedReader req = new BufferedReader(new InputStreamReader(in));
PrintWriter res = new PrintWriter(new OutputStreamWriter(out));
System.out.println("---------NEW CONNECTION---------");
String line;
while((line = req.readLine()) != null) {
String[] tokens = line.split(" ");
String command = tokens[0];
if(command.equals("JOIN")) {
Dstore dstore = new Dstore(client.getPort(), controller.getCport());
Controller.dstores.put(client.getPort(), dstore);
} else if(Controller.dstores.size() < controller.getR()) {
res.println("ERROR_NOT_ENOUGH_DSTORES");
res.flush();
} else if(Controller.dstores.containsKey(client.getPort())) {
switch(command) {
case "STORE_ACK" : handleStoreACK();
case "REMOVE_ACK" : handleRemoveACK();
}
} else {
switch(command) {
case "STORE": handleStore(client, tokens);
case "LOAD": handleLoad(client, tokens);
case "REMOVE": handleRemove(client, tokens);
case "LIST": handleList(client);
}
}
}
}
private void handleStore(Socket client, String[] tokens) throws IOException {
try {
String filename = tokens[1];
int filesize = Integer.parseInt(tokens[2]);
PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
Controller.index.put(filename, "store in progress");
String msg = "";
int i = 0;
List<Dstore> temp = new ArrayList<>();
for(int port : Controller.dstores.keySet()) {
if(i == controller.getR()) {
break;
}
temp.add(Controller.dstores.get(port));
msg = Controller.dstores.get(port).getPort() + msg + " ";
}
res.println("STORE_TO " + msg);
res.flush();
boolean done = false;
long limit = System.currentTimeMillis() + controller.getTimeout();
while(!done && System.currentTimeMillis() < limit) {
if(Controller.store.containsKey(filename) && Controller.store.get(filename).size() == controller.getR()) {
Controller.index.put(filename, "store complete");
Controller.store.put(filename, temp);
res.println("STORE_COMPLETE");
res.flush();
done = true;
}
}
if(!done) {
System.out.println(filename + " failed to upload");
}
} catch (IndexOutOfBoundsException e) {
System.out.println("Arguments don't match in STORE operation");
}
}
private void handleLoad(Socket client, String[] tokens) throws IOException {
try {
String filename = tokens[1];
PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
if(!Controller.store.containsKey(filename)) {
res.println("ERROR_FILE_DOES_NOT_EXIST");
} else {
// select a Dstore from there and give an approriate error if all Dstores fail
res.println("LOAD_FROM " + Controller.store.get(filename).get(0).getPort());
}
} catch (IndexOutOfBoundsException e) {
System.out.println("Arguments don't match in LOAD operation");
}
}
private void handleRemove(Socket client, String[] tokens) throws IOException {
try {
String filename = tokens[1];
Controller.index.put(filename, "remove in progress");
for(Dstore dstore : Controller.store.get(filename)) {
Socket socket = new Socket();
socket.connect(new InetSocketAddress(dstore.getPort()));
PrintWriter res = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
res.println("REMOVE " + filename);
res.flush();
}
boolean done = false;
long limit = System.currentTimeMillis() + controller.getTimeout();
PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
while(!done && System.currentTimeMillis() < limit) {
if(Controller.store.get(filename).isEmpty()) {
Controller.index.put(filename, "remove complete");
Controller.store.remove(filename);
res.println("REMOVE_COMPLETE");
res.flush();
done = true;
}
}
if(!done) {
System.out.println(filename + " failed to remove");
}
} catch (IndexOutOfBoundsException e) {
System.out.println("Arguments don't match in REMOVE operation");
}
}
private void handleList(Socket client) throws IOException {
String msg = "";
for(String filename : Controller.store.keySet()) {
msg = filename + " " + msg;
}
PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
res.println("LIST " + msg);
}
private void handleStoreACK() {
}
private void handleRemoveACK() {
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment