Skip to content
Snippets Groups Projects
Commit 67868ba6 authored by tmp1u19's avatar tmp1u19 :octopus:
Browse files

Add the functionality for List and store in the controller

parent c7292a34
Branches
No related tags found
No related merge requests found
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.zip.InflaterInputStream;
public class Controller { public class Controller {
...@@ -12,13 +14,20 @@ public class Controller { ...@@ -12,13 +14,20 @@ public class Controller {
// it doesn't serve any client request until at least R Dstores have joined the system // it doesn't serve any client request until at least R Dstores have joined the system
static File file = new File("output.txt"); static File file = new File("output.txt");
static int R;
// list of all dstores
static List<Socket> ports = new ArrayList<>();
// file, list of dstores
static HashMap<String, List<Socket>> index = new HashMap<>();
// file, status (e.g. in progress)
static HashMap<String, String> status = new HashMap<>();
public static void main (String[] args) { public static void main (String[] args) {
// port to listen on // port to listen on
final int cport = Integer.parseInt(args[0]); final int cport = Integer.parseInt(args[0]);
// replication factor => number of Dstores to join // replication factor => number of Dstores to join
int R = Integer.parseInt(args[1]); R = Integer.parseInt(args[1]);
// timeout in milliseconds // timeout in milliseconds
int timeout = Integer.parseInt(args[2]); int timeout = Integer.parseInt(args[2]);
// how long to wait to start the next rebalance operation // how long to wait to start the next rebalance operation
...@@ -26,53 +35,39 @@ public class Controller { ...@@ -26,53 +35,39 @@ public class Controller {
System.out.println("Started"); System.out.println("Started");
// list of all dstores
List<Socket> ports = new ArrayList<>();
// file, list of dstores
HashMap<String, List<Socket>> index = new HashMap<>();
// file, status (e.g. in progress)
HashMap<String, String> status = new HashMap<>();
createLogFile(); createLogFile();
try { try {
ServerSocket socket = new ServerSocket(cport); ServerSocket socket = new ServerSocket(cport);
for(;;) { for(;;) {
try { try {
if(R > 0) { while(R > 0) {
// accept Dstores
System.out.println("waiting for Dstore to join"); System.out.println("waiting for Dstore to join");
Socket client = socket.accept(); // establish a connection between client and server Socket client = socket.accept(); // establish a connection between client and server
System.out.print(R + ": "); System.out.println(R);
ports.add(client); ports.add(client);
R = R - 1; R = R - 1;
} else {
//Socket clientDstore = socket.accept();
for(Socket client : ports) {
Thread t = new Thread() {
@Override
public void run() {
try {
handleDstores(client);
} catch (Exception e) {
e.printStackTrace();
}
}
};
t.start();
}
break;
} }
// create a Thread for each client
Socket client = socket.accept();
Thread t = new Thread() {
@Override
public void run() {
try {
handleClient(client);
} catch (Exception e) {
e.printStackTrace();
}
}
};
t.start();
} catch (Exception e1) { } catch (Exception e1) {
System.out.println(e1); System.out.println(e1);
} }
} }
} catch (IOException e) { } catch (IOException e) {
System.out.println("error" + e); System.out.println("error" + e);
} }
...@@ -125,17 +120,70 @@ public class Controller { ...@@ -125,17 +120,70 @@ public class Controller {
if (cmd.equals("STORE")) { if (cmd.equals("STORE")) {
String filename = tokens[1];
int filesize = Integer.parseInt(tokens[2]);
status.put(filename, "store in progress");
log("store in progress");
String msg = "";
for(Socket socket : ports) {
msg = socket.getPort() + msg + " ";
}
// test and see if this actually gets all the ports
System.out.println("STORE_TO " + msg);
// send this message to the client so the client knows where to store the file
outputStream.write(("STORE_TO " + msg).getBytes());
outputStream.flush();
final int[] num = {0};
for(Socket socket : ports) {
Thread t = new Thread() {
@Override
public void run() {
try {
InputStream inputStream = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String line = reader.readLine();
String[] ts = line.split(" ");
String cmd = ts[0];
if(cmd == "STORE_ACK") {
num[0]++;
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
t.start();
}
status.put(filename, "store complete");
outputStream.write("STORE_COMPLETE".getBytes());
} else if (cmd.equals("LOAD")) { } else if (cmd.equals("LOAD")) {
} else if (cmd.equals("REMOVE")) { } else if (cmd.equals("REMOVE")) {
} else if (cmd.equals("LIST")) { } else if (cmd.equals("LIST")) {
String msg = "";
for(String filename : status.keySet()) {
msg = filename + " " + msg;
}
System.out.println(msg);
outputStream.write(("LIST " + msg).getBytes());
} else { } else {
String msg = "unknown command " + line; String msg = "unknown command " + line;
log(msg); log(msg);
} }
} }
clientSocket.close();
} }
private static void createLogFile() { private static void createLogFile() {
...@@ -154,7 +202,7 @@ public class Controller { ...@@ -154,7 +202,7 @@ public class Controller {
try { try {
FileWriter writer = new FileWriter(file, true); FileWriter writer = new FileWriter(file, true);
writer.write(java.time.LocalDate.now() + ", " + writer.write(java.time.LocalDate.now() + ", " +
java.time.LocalTime.now() + ": " + message + '\n'); java.time.LocalTime.now() + ", CONTROLLER: " + message + '\n');
writer.close(); writer.close();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
......
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.nio.charset.StandardCharsets;
public class Dstore { public class Dstore {
...@@ -71,14 +70,17 @@ public class Dstore { ...@@ -71,14 +70,17 @@ public class Dstore {
} else if (cmd.equals("LOAD_DATA")) { } else if (cmd.equals("LOAD_DATA")) {
String filename = tokens[1]; String filename = tokens[1];
FileInputStream file = new FileInputStream(file_folder + "/" + filename); File file = new File(file_folder + "/" + filename);
int n; if(file.exists()) {
while((n = file.read())!= -1) { FileInputStream in = new FileInputStream(file_folder + "/" + filename);
outputStream.write(n); int n;
while((n = in.read())!= -1) {
outputStream.write(n);
}
in.close();
} else {
clientSocket.close();
} }
file.close();
} else if (cmd.equals("LIST")) {
} else if (cmd.equals("QUIT")) { } else if (cmd.equals("QUIT")) {
clientSocket.close(); clientSocket.close();
...@@ -96,7 +98,7 @@ public class Dstore { ...@@ -96,7 +98,7 @@ public class Dstore {
} }
} }
private static void handleController(Socket controllerSocket) throws IOException { private static void handleController(Socket controllerSocket, String file_folder) throws IOException {
OutputStream outputStream = controllerSocket.getOutputStream(); OutputStream outputStream = controllerSocket.getOutputStream();
InputStream inputStream = controllerSocket.getInputStream(); InputStream inputStream = controllerSocket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
...@@ -108,6 +110,15 @@ public class Dstore { ...@@ -108,6 +110,15 @@ public class Dstore {
if (cmd.equals("REMOVE")) { if (cmd.equals("REMOVE")) {
String filename = tokens[1];
File file = new File(file_folder + "/" + filename);
if(file.exists()) {
file.delete();
outputStream.write(("REMOVE_ACK " + filename).getBytes());
} else {
outputStream.write(("ERROR_FILE_DOES_NOT_EXIST " + filename).getBytes());
}
} else if (cmd.equals("QUIT")) { } else if (cmd.equals("QUIT")) {
controllerSocket.close(); controllerSocket.close();
break; break;
...@@ -139,7 +150,7 @@ public class Dstore { ...@@ -139,7 +150,7 @@ public class Dstore {
try { try {
FileWriter writer = new FileWriter(file, true); FileWriter writer = new FileWriter(file, true);
writer.write(java.time.LocalDate.now() + ", " + writer.write(java.time.LocalDate.now() + ", " +
java.time.LocalTime.now() + ": " + message + '\n'); java.time.LocalTime.now() + ", DSTORE: " + message + '\n');
writer.close(); writer.close();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment