Skip to content
Snippets Groups Projects
Commit 5ee1ed82 authored by dl3g19's avatar dl3g19
Browse files

Failure handling started

Threading started
parent 24843952
No related branches found
No related tags found
No related merge requests found
File added
No preview for this file type
import java.io.*;
import java.net.*;
import java.lang.Runnable;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
public class Controller {
protected int cport; //Port to listen on
......@@ -9,14 +12,118 @@ public class Controller {
protected int timeout; //in milliseconds
protected int rebalancePeriod; //How long to wait to start the next rebalance operation, in milliseconds
protected class IndexEntry {
protected int filesize;
protected List<Integer> storedBy;
protected int numberToStore;
protected String status;
protected Object storeAckLock;
public IndexEntry() {
filesize = -1;
storedBy = new SyncList();
numberToStore = 0;
status = "store in progress";
storeAckLock = new Object();
}
public synchronized void setFilesize(int filesize) {
this.filesize = filesize;
}
public synchronized int getFilesize() {
return filesize;
}
public void addStoredBy(int dstore) {
storedBy.add(new Integer(dstore));
if(storedBy.size() == numberToStore) storeAckLock.notify();
}
public void addStoredBy(List<Integer> dstores) {
storedBy.addAll(dstores);
if(storedBy.size() == numberToStore) storeAckLock.notify();
}
public List<Integer> getStoredBy() {
return storedBy;
}
public synchronized void setNumberToStore(int i) {
numberToStore = i;
}
public synchronized void setStatus(String status) {
this.status = status;
}
public synchronized int getStatus() {
return status;
}
public Object getLock() {
return storeAckLock;
}
}
protected class SyncList extends ArrayList<Integer> {
public SyncList() {
super();
}
@Override
public boolean add(Integer i) {
synchronized(this) {
super.add(i);
}
}
@Override
public boolean addAll(Collection<Integer> c) {
synchronized(this) {
super.addAll(c);
}
}
@Override
public Integer get(int i) {
synchronized(this) {
super.get(i);
}
}
@Override
public int size() {
synchronized(this) {
super.size();
}
}
@Override
public boolean remove(int i) {
synchronized(this) {
super.remove(i);
}
}
@Override
public boolean remove(Integer i) {
synchronized(this) {
super.remove(i);
}
}
}
protected List<Integer> dstores;
protected Map<String,IndexEntry> index;
public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) {
this.cport = cport;
this.rFactor = rFactor;
this.timeout = timeout;
this.rebalancePeriod = rebalancePeriod;
dstores = new ArrayList<Integer>();
dstores = new SyncList();
index = new HashMap<String,IndexEntry>();
}
public static void main(String[] args) {
......@@ -58,6 +165,7 @@ public class Controller {
in.close();
}
catch(Exception e) {
//Log error
e.printStackTrace();
System.out.println("Continue...");
}
......@@ -68,7 +176,7 @@ public class Controller {
}
}
void handleMessage(String[] message, Socket client) {
void handleMessage(String[] message, Socket client) throws Exception {
if(message[0].equals("JOIN")) {
dstores.add(Integer.parseInt(message[1]));
rebalance();
......@@ -76,8 +184,14 @@ public class Controller {
else if(message[0].equals("STORE")) {
store(client, message[1]);
}
else if(message[0].equals("STORE_ACK")) {
storeAck(client, message[1]);
}
else if(message[0].equals("LOAD")) {
load(client, message[1]);
load(client, message[1], false);
}
else if(message[0].equals("RELOAD")) {
load(client, message[1], true);
}
else if(message[0].equals("REMOVE")) {
remove(client, message[1]);
......@@ -85,50 +199,126 @@ public class Controller {
else if(message[0].equals("LIST")) {
list(client);
}
else {
//Log error and continue (throw exception?)
}
}
void store(Socket client, String filename) {
//Update index to "store in progress"
//Select Dstores
int[] storesToStore = new int[rFactor];
for(int i=0; i<rFactor; i++) {
storesToStore[i] = dstores.get(i).intValue();
void store(Socket client, String filename) throws Exception {
new Thread(() -> {
try {
if(index.containsKey(filename)) {
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("ERROR ALREADY_EXISTS " + filename);
out.flush();
out.close();
return;
}
//Update index to "store in progress"
IndexEntry entry = new IndexEntry();
index.put(filename, entry);
//Select Dstores
int[] storesToStore = new int[rFactor];
for(int i=0; i<rFactor; i++) {
Integer thisStore = dstores.get(i);
storesToStore[i] = thisStore.intValue();
}
entry.setNumberToStore(rFactor);
//Send STORE_TO message
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("STORE_TO");
for(int port : storesToStore) {
out.print(" " + port);
}
out.flush();
//Wait for STORE_ACKs from datastores in storesToStore
try {
entry.getLock().wait(timeout);
}
catch(InterruptedException e) {
e.printStackTrace();
}
if(entry.getStoredBy().size() < rFactor) {
//Log error
}
//Update index to "store complete"
entry.status = "store complete";
//Send STORE_COMPLETE message
out.print("STORE_COMPLETE");
out.flush();
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void storeAck(Socket client, String filename) throws Exception {
if(!index.containsKey(filename)) {
//Throw logging exception
return;
}
//Send STORE_TO message
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("STORE_TO");
for(int port : storesToStore) {
out.print(" ");
out.print(port);
IndexEntry thisEntry = index.get(filename);
thisEntry.addStoredBy(new Integer(client.getPort()));
}
void load(Socket client, String filename, boolean reload) throws Exception {
if(!index.containsKey(filename)) {
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("ERROR DOES_NOT_EXIST");
out.flush();
out.close();
}
out.flush();
//Wait for STORE_ACKs from datastores in storesToStore
//Select a Dstore which contains the file
IndexEntry thisEntry = index.get(filename);
int thisStore = thisEntry.storedBy.get(0).intValue();
int thisSize = thisEntry.filesize;
//Update index to "store complete"
// !!TO DO: RELOAD COMMAND!!
//Send STORE_COMPLETE message
out.print("STORE_COMPLETE");
//Send LOAD_FROM message
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("LOAD_FROM " + thisStore + " " + thisSize);
out.flush();
out.close();
}
void load(Socket client, String filename) {
//Select a Dstore which contains the file
void remove(Socket client, String filename) throws Exception {
if(!index.containsKey(filename)) {
PrintWriter clientOut = new PrintWriter(client.getOutputStream());
clientOut.print("ERROR DOES_NOT_EXIST");
clientOut.flush();
clientOut.close();
}
//Send LOAD_FROM message
}
void remove(Socket client, String filename) {
//Update index to "remove in progress"
IndexEntry entry = index.get(filename);
entry.status = "remove in progress";
//Send REMOVE message to all Dstores storing the file
for(Integer dstore : entry.storedBy) {
Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue());
PrintWriter out = new PrintWriter(socket.getOutputStream());
out.write("REMOVE " + filename);
out.flush();
out.close();
socket.close();
}
//Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message
//Update index to "remove complete"
entry.status = "remove complete";
//Send REMOVE_COMPLETE to client
PrintWriter clientOut = new PrintWriter(client.getOutputStream());
......@@ -137,11 +327,17 @@ public class Controller {
clientOut.close();
}
void list(Socket client) {
void list(Socket client) throws Exception {
//Send file list to client
PrintWriter out = new PrintWriter(client.getOutputStream());
for(String name : index.keySet()) {
out.println(name);
}
out.flush();
out.close();
}
void rebalance() {
void rebalance() throws Exception {
//Send LIST message to each Dstore and receive their file list
//Create a new file allocation so that:
......
No preview for this file type
import java.io.*;
import java.nio.file.Files;
import java.net.*;
import java.util.Map;
import java.util.HashMap;
......@@ -51,10 +52,11 @@ public class Dstore {
Socket client = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String[] message = in.readLine().split(" ");
handleMessage(message, client);
handleMessage(message, client, in);
in.close();
}
catch(Exception e) {
//Log error
e.printStackTrace();
}
}
......@@ -64,9 +66,9 @@ public class Dstore {
}
}
void handleMessage(String[] message, Socket client) {
void handleMessage(String[] message, Socket client, BufferedReader clientIn) throws Exception {
if(message[0].equals("STORE")) {
store(client, message[1], Integer.parseInt(message[2]));
store(client, message[1], Integer.parseInt(message[2]), clientIn);
}
else if(message[0].equals("LOAD_DATA")) {
load(client, message[1]);
......@@ -80,33 +82,90 @@ public class Dstore {
else if(message[0].equals("REBALANCE")) {
rebalance(client, message);
}
else {
//Log error and continue (throw exception?)
}
}
void store(Socket client, String filename, int filesize) {
void store(Socket client, String filename, int filesize, BufferedReader in) throws Exception {
//Send ACK message to client
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("ACK");
out.flush();
out.close();
//Receive file content from client
FileOutputStream writer = new FileOutputStream(fileFolder + "/" + filename, false);
//Store the file data in fileFolder
//Receive + write file content from client
int byteCount = filesize;
while(byteCount > 0) {
byte[] nextLine = in.readLine().getBytes();
writer.write(nextLine);
writer.flush();
byteCount -= nextLine.length;
}
writer.close();
//Send STORE_ACK message to the Controller
PrintWriter controllerOut = new PrintWriter(new Socket(InetAddress.getLocalHost(), cport).getOutputStream());
controllerOut.print("STORE_ACK " + filename);
controllerOut.flush();
controllerOut.close();
}
void load(Socket client, String filename) {
void load(Socket client, String filename) throws Exception {
//Send the content of the file in fileFolder to the client
PrintWriter out = new PrintWriter(client.getOutputStream());
FileInputStream reader;
try {
reader = new FileInputStream(fileFolder + "/" + filename);
}
catch(FileNotFoundException e) {
out.print("ERROR DOES_NOT_EXIST");
out.flush();
out.close();
return;
}
byte[] buf = new byte[8];
while(reader.read(buf) != -1) {
out.print(new String(buf));
out.flush();
}
reader.close();
out.close();
}
void remove(Socket client, String filename) {
void remove(Socket client, String filename) throws Exception {
//Remove the file from fileFolder
Path path = new File(fileFolder + "/" + filename).toPath();
PrintWriter out = new PrintWriter(client.getOutputStream());
if(Files.deleteIfExists(path)) {
//Send REMOVE_ACK message to client (the controller)
out.print("REMOVE_ACK");
}
else {
//Send DOES NOT EXIST error
out.print("ERROR DOES_NOT_EXIST " + filename);
}
//Send REMOVE_ACK message to client (the controller)
out.flush();
out.close();
}
void list(Socket client) {
void list(Socket client) throws Exception {
//Send a list of all files in fileFolder to client (the controller)
PrintWriter out = new PrintWriter(client.getOutputStream());
for(File file : new File(fileFolder).listFiles()) {
out.print(file.getName());
out.flush();
}
out.close();
}
void rebalance(Socket client, String[] message) {
void rebalance(Socket client, String[] message) throws Exception {
//Interpret files to send and files to remove from the message
Map<String,Integer[]> filesToSend;
String[] filesToRemove;
......@@ -139,9 +198,21 @@ public class Dstore {
}
//Send each file to send to the Dstore at the specified port number
for(String filename : filesToSend.keySet()) {
for(Integer dstore : filesToSend.get(filename)) {
//Same store functions as used in the client object
}
}
//Remove each file to remove from fileFolder
for(String filename : filesToRemove) {
new File(fileFolder + "/" + filename).delete();
}
//Send REBALANCE_COMPLETE message to client (the controller)
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("REBALANCE COMPLETE");
out.flush();
out.close();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment