Skip to content
Snippets Groups Projects
Commit 69f99016 authored by dl3g19's avatar dl3g19
Browse files

More threading implemented

parent 5ee1ed82
No related branches found
No related tags found
No related merge requests found
No preview for this file type
File added
No preview for this file type
...@@ -5,6 +5,7 @@ import java.util.List; ...@@ -5,6 +5,7 @@ import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
import java.util.Collection;
public class Controller { public class Controller {
protected int cport; //Port to listen on protected int cport; //Port to listen on
...@@ -18,6 +19,7 @@ public class Controller { ...@@ -18,6 +19,7 @@ public class Controller {
protected int numberToStore; protected int numberToStore;
protected String status; protected String status;
protected Object storeAckLock; protected Object storeAckLock;
protected List<Reloader> clientLoadList;
public IndexEntry() { public IndexEntry() {
filesize = -1; filesize = -1;
...@@ -25,6 +27,7 @@ public class Controller { ...@@ -25,6 +27,7 @@ public class Controller {
numberToStore = 0; numberToStore = 0;
status = "store in progress"; status = "store in progress";
storeAckLock = new Object(); storeAckLock = new Object();
clientLoadList = new ArrayList<Reloader>();
} }
public synchronized void setFilesize(int filesize) { public synchronized void setFilesize(int filesize) {
...@@ -36,7 +39,7 @@ public class Controller { ...@@ -36,7 +39,7 @@ public class Controller {
} }
public void addStoredBy(int dstore) { public void addStoredBy(int dstore) {
storedBy.add(new Integer(dstore)); storedBy.add(Integer.valueOf(dstore));
if(storedBy.size() == numberToStore) storeAckLock.notify(); if(storedBy.size() == numberToStore) storeAckLock.notify();
} }
...@@ -45,6 +48,16 @@ public class Controller { ...@@ -45,6 +48,16 @@ public class Controller {
if(storedBy.size() == numberToStore) storeAckLock.notify(); if(storedBy.size() == numberToStore) storeAckLock.notify();
} }
public void removeStoredBy(int dstore) {
storedBy.remove(Integer.valueOf(dstore));
if(storedBy.size() == 0) storeAckLock.notify();
}
public void removeStoredBy(List<Integer> dstores) {
storedBy.removeAll(dstores);
if(storedBy.size() == 0) storeAckLock.notify();
}
public List<Integer> getStoredBy() { public List<Integer> getStoredBy() {
return storedBy; return storedBy;
} }
...@@ -57,13 +70,17 @@ public class Controller { ...@@ -57,13 +70,17 @@ public class Controller {
this.status = status; this.status = status;
} }
public synchronized int getStatus() { public synchronized String getStatus() {
return status; return status;
} }
public Object getLock() { public Object getLock() {
return storeAckLock; return storeAckLock;
} }
public List<Reloader> getLoadList() {
return clientLoadList;
}
} }
protected class SyncList extends ArrayList<Integer> { protected class SyncList extends ArrayList<Integer> {
...@@ -74,47 +91,53 @@ public class Controller { ...@@ -74,47 +91,53 @@ public class Controller {
@Override @Override
public boolean add(Integer i) { public boolean add(Integer i) {
synchronized(this) { synchronized(this) {
super.add(i); return super.add(i);
} }
} }
@Override @Override
public boolean addAll(Collection<Integer> c) { public boolean addAll(Collection<? extends Integer> c) {
synchronized(this) { synchronized(this) {
super.addAll(c); return super.addAll(c);
} }
} }
@Override @Override
public Integer get(int i) { public Integer get(int i) {
synchronized(this) { synchronized(this) {
super.get(i); return super.get(i);
} }
} }
@Override @Override
public int size() { public int size() {
synchronized(this) { synchronized(this) {
super.size(); return super.size();
} }
} }
@Override public Integer remove(int i) {
public boolean remove(int i) {
synchronized(this) { synchronized(this) {
super.remove(i); return super.remove(i);
} }
} }
@Override
public boolean remove(Integer i) { public boolean remove(Integer i) {
synchronized(this) { synchronized(this) {
super.remove(i); return super.remove(i);
} }
} }
} }
protected class Reloader {
public boolean reload;
public Reloader() {
reload = false;
}
}
protected List<Integer> dstores; protected List<Integer> dstores;
protected Map<Integer,String[]> rebalanceMessages;
protected Map<String,IndexEntry> index; protected Map<String,IndexEntry> index;
public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) { public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) {
...@@ -179,6 +202,7 @@ public class Controller { ...@@ -179,6 +202,7 @@ public class Controller {
void handleMessage(String[] message, Socket client) throws Exception { void handleMessage(String[] message, Socket client) throws Exception {
if(message[0].equals("JOIN")) { if(message[0].equals("JOIN")) {
dstores.add(Integer.parseInt(message[1])); dstores.add(Integer.parseInt(message[1]));
System.out.println("Dstore at " + message[1] + " joined");
rebalance(); rebalance();
} }
else if(message[0].equals("STORE")) { else if(message[0].equals("STORE")) {
...@@ -188,10 +212,10 @@ public class Controller { ...@@ -188,10 +212,10 @@ public class Controller {
storeAck(client, message[1]); storeAck(client, message[1]);
} }
else if(message[0].equals("LOAD")) { else if(message[0].equals("LOAD")) {
load(client, message[1], false); load(client, message[1]);
} }
else if(message[0].equals("RELOAD")) { else if(message[0].equals("RELOAD")) {
load(client, message[1], true); reload(message[1]);
} }
else if(message[0].equals("REMOVE")) { else if(message[0].equals("REMOVE")) {
remove(client, message[1]); remove(client, message[1]);
...@@ -200,7 +224,13 @@ public class Controller { ...@@ -200,7 +224,13 @@ public class Controller {
list(client); list(client);
} }
else { else {
//Log error and continue (throw exception?) for(String name : message) {
if(!index.containsKey(name)) {
//Log error and continue (throw exception?)
return;
}
}
receiveDstoreList(client, message);
} }
} }
...@@ -262,93 +292,206 @@ public class Controller { ...@@ -262,93 +292,206 @@ public class Controller {
} }
void storeAck(Socket client, String filename) throws Exception { void storeAck(Socket client, String filename) throws Exception {
if(!index.containsKey(filename)) { new Thread(() -> {
//Throw logging exception if(!index.containsKey(filename)) {
return; //Throw logging exception
} return;
}
IndexEntry thisEntry = index.get(filename);
thisEntry.addStoredBy(new Integer(client.getPort())); IndexEntry thisEntry = index.get(filename);
thisEntry.addStoredBy(Integer.valueOf(client.getPort()));
}).start();
} }
void load(Socket client, String filename, boolean reload) throws Exception { void load(Socket client, String filename) throws Exception {
if(!index.containsKey(filename)) { new Thread(() -> {
PrintWriter out = new PrintWriter(client.getOutputStream()); try {
out.print("ERROR DOES_NOT_EXIST"); if(!index.containsKey(filename)) {
out.flush(); PrintWriter out = new PrintWriter(client.getOutputStream());
out.close(); out.print("ERROR DOES_NOT_EXIST");
} out.flush();
out.close();
//Select a Dstore which contains the file return;
IndexEntry thisEntry = index.get(filename); }
int thisStore = thisEntry.storedBy.get(0).intValue();
int thisSize = thisEntry.filesize; //Select a Dstore which contains the file
IndexEntry thisEntry = index.get(filename);
// !!TO DO: RELOAD COMMAND!! int thisStore = thisEntry.storedBy.get(0).intValue();
int thisSize = thisEntry.filesize;
//Send LOAD_FROM message
PrintWriter out = new PrintWriter(client.getOutputStream()); //Send LOAD_FROM message
out.print("LOAD_FROM " + thisStore + " " + thisSize); PrintWriter out = new PrintWriter(client.getOutputStream());
out.flush(); out.print("LOAD_FROM " + thisStore + " " + thisSize);
out.close(); out.flush();
Reloader reloadLock = new Object();
thisEntry.getLoadList().add(reloadLock);
int trials = 0;
while(true) {
reloadLock.wait(10 * timeout);
trials ++;
if(trials >= rFactor || !reloadLock.reload) break;
out.print("LOAD_FROM " + thisEntry.storedBy.get(trials).intValue() + " " + thisSize);
out.flush();
reloadLock.reload = false;
}
thisEntry.getLoadList().remove(reloadLock);
if(trials >= rFactor && reloadLock.reload) {
out.print("ERROR LOAD");
out.flush();
}
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void reload(String filename) {
new Thread(() -> {
try {
for(Reloader r : index.get(filename).getLoadList()) {
r.reload = true;
r.notify();
}
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
} }
void remove(Socket client, String filename) throws Exception { void remove(Socket client, String filename) throws Exception {
if(!index.containsKey(filename)) { new Thread(() -> {
PrintWriter clientOut = new PrintWriter(client.getOutputStream()); try {
clientOut.print("ERROR DOES_NOT_EXIST"); if(!index.containsKey(filename)) {
clientOut.flush(); PrintWriter clientOut = new PrintWriter(client.getOutputStream());
clientOut.close(); clientOut.print("ERROR DOES_NOT_EXIST");
} clientOut.flush();
clientOut.close();
//Update index to "remove in progress" return;
IndexEntry entry = index.get(filename); }
entry.status = "remove in progress";
//Update index to "remove in progress"
//Send REMOVE message to all Dstores storing the file IndexEntry entry = index.get(filename);
for(Integer dstore : entry.storedBy) { entry.status = "remove in progress";
Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue());
PrintWriter out = new PrintWriter(socket.getOutputStream()); //Send REMOVE message to all Dstores storing the file
out.write("REMOVE " + filename); for(Integer dstore : entry.storedBy) {
out.flush(); Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue());
out.close(); PrintWriter out = new PrintWriter(socket.getOutputStream());
socket.close(); out.write("REMOVE " + filename);
} out.flush();
out.close();
//Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message socket.close();
}
//Update index to "remove complete"
entry.status = "remove complete"; //Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message
try {
//Send REMOVE_COMPLETE to client entry.getLock().wait(timeout);
PrintWriter clientOut = new PrintWriter(client.getOutputStream()); }
clientOut.print("REMOVE_COMPLETE"); catch(InterruptedException e) {
clientOut.flush(); e.printStackTrace();
clientOut.close(); }
if(entry.getStoredBy().size() > 0) {
//Log error
}
//Update index to "remove complete"
entry.status = "remove complete";
//Send REMOVE_COMPLETE to client
PrintWriter clientOut = new PrintWriter(client.getOutputStream());
clientOut.print("REMOVE_COMPLETE");
clientOut.flush();
clientOut.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
} }
void list(Socket client) throws Exception { void list(Socket client) throws Exception {
//Send file list to client new Thread(() -> {
PrintWriter out = new PrintWriter(client.getOutputStream()); try {
for(String name : index.keySet()) { //Send file list to client
out.println(name); PrintWriter out = new PrintWriter(client.getOutputStream());
} for(String name : index.keySet()) {
out.flush(); out.println(name);
out.close(); }
out.flush();
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
} }
void rebalance() throws Exception { void rebalance() throws Exception {
//Send LIST message to each Dstore and receive their file list new Thread(() -> {
if(rebalanceMessages != null) return;
//Create a new file allocation so that: Map<Integer,String[]> dstoreFiles = new HashMap<Integer,String[]>();
//Each file appears rFactor times rebalanceMessages = dstoreFiles;
//Each file appears at most once on each datastore try {
//Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) //Send LIST message to each Dstore and receive their file list
for(Integer dstore : dstores) {
//Make a (files to send, files to remove) pair for each Dstore Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue());
PrintWriter out = new PrintWriter(socket.getOutputStream());
//Send the respective REBALANCE message to each Dstore out.write("LIST");
out.flush();
//Wait for REBALANCE_COMPLETE from all Dstores out.close();
socket.close();
}
dstoreFiles.wait(timeout);
if(dstoreFiles.size() < dstores.size()) {
//Log error
}
//Create a new file allocation so that:
//Each file appears rFactor times
//Each file appears at most once on each datastore
//Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists)
for(Integer i : reshuffle(dstoreFiles.keySet())) {
}
//Make a (files to send, files to remove) pair for each Dstore
//Send the respective REBALANCE message to each Dstore
//Wait for REBALANCE_COMPLETE from all Dstores
}
catch(IOException e) {
e.printStackTrace();
}
finally {
rebalanceMessages = null;
}
}).start();
}
void receiveDstoreList(Socket client, String[] list) {
new Thread(() -> {
if(rebalanceMessages == null) return;
rebalanceMessages.add(Integer.valueOf(client.getPort()), list);
if(rebalanceMessages.size() == dstores.size()) {
rebalanceMessages.notify();
}
}).start();
}
List<Integer> reshuffle(Collection<Integer> col) {
List<Integer> list = new ArrayList<Integer>();
for(Integer i : col) {
list.add(0, i);
}
return list;
} }
} }
No preview for this file type
import java.io.*; import java.io.*;
import java.lang.Runnable;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path;
import java.net.*; import java.net.*;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
...@@ -88,131 +90,166 @@ public class Dstore { ...@@ -88,131 +90,166 @@ public class Dstore {
} }
void store(Socket client, String filename, int filesize, BufferedReader in) throws Exception { void store(Socket client, String filename, int filesize, BufferedReader in) throws Exception {
//Send ACK message to client new Thread(() -> {
PrintWriter out = new PrintWriter(client.getOutputStream()); try {
out.print("ACK"); //Send ACK message to client
out.flush(); PrintWriter out = new PrintWriter(client.getOutputStream());
out.close(); out.print("ACK");
out.flush();
FileOutputStream writer = new FileOutputStream(fileFolder + "/" + filename, false); out.close();
//Receive + write file content from client FileOutputStream writer = new FileOutputStream(fileFolder + "/" + filename, false);
int byteCount = filesize;
while(byteCount > 0) { //Receive + write file content from client
byte[] nextLine = in.readLine().getBytes(); int byteCount = filesize;
writer.write(nextLine); while(byteCount > 0) {
writer.flush(); byte[] nextLine = in.readLine().getBytes();
byteCount -= nextLine.length; writer.write(nextLine);
} writer.flush();
writer.close(); byteCount -= nextLine.length;
}
//Send STORE_ACK message to the Controller writer.close();
PrintWriter controllerOut = new PrintWriter(new Socket(InetAddress.getLocalHost(), cport).getOutputStream());
controllerOut.print("STORE_ACK " + filename); //Send STORE_ACK message to the Controller
controllerOut.flush(); PrintWriter controllerOut = new PrintWriter(new Socket(InetAddress.getLocalHost(), cport).getOutputStream());
controllerOut.close(); controllerOut.print("STORE_ACK " + filename);
controllerOut.flush();
controllerOut.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
} }
void load(Socket client, String filename) throws Exception { void load(Socket client, String filename) throws Exception {
//Send the content of the file in fileFolder to the client new Thread(() -> {
PrintWriter out = new PrintWriter(client.getOutputStream()); try {
FileInputStream reader; //Send the content of the file in fileFolder to the client
try { PrintWriter out = new PrintWriter(client.getOutputStream());
reader = new FileInputStream(fileFolder + "/" + filename); FileInputStream reader;
} try {
catch(FileNotFoundException e) { reader = new FileInputStream(fileFolder + "/" + filename);
out.print("ERROR DOES_NOT_EXIST"); }
out.flush(); catch(FileNotFoundException e) {
out.close(); out.print("ERROR DOES_NOT_EXIST");
return; out.flush();
} out.close();
return;
byte[] buf = new byte[8]; }
while(reader.read(buf) != -1) {
out.print(new String(buf)); byte[] buf = new byte[8];
out.flush(); while(reader.read(buf) != -1) {
} out.print(new String(buf));
out.flush();
reader.close(); }
out.close();
reader.close();
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
} }
void remove(Socket client, String filename) throws Exception { void remove(Socket client, String filename) throws Exception {
//Remove the file from fileFolder new Thread(() -> {
Path path = new File(fileFolder + "/" + filename).toPath(); try {
PrintWriter out = new PrintWriter(client.getOutputStream()); //Remove the file from fileFolder
Path path = new File(fileFolder + "/" + filename).toPath();
if(Files.deleteIfExists(path)) { PrintWriter out = new PrintWriter(client.getOutputStream());
//Send REMOVE_ACK message to client (the controller)
out.print("REMOVE_ACK"); if(Files.deleteIfExists(path)) {
} //Send REMOVE_ACK message to client (the controller)
else { out.print("REMOVE_ACK");
//Send DOES NOT EXIST error }
out.print("ERROR DOES_NOT_EXIST " + filename); else {
} //Send DOES NOT EXIST error
out.print("ERROR DOES_NOT_EXIST " + filename);
out.flush(); }
out.close();
out.flush();
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
} }
void list(Socket client) throws Exception { void list(Socket client) throws Exception {
//Send a list of all files in fileFolder to client (the controller) new Thread(() -> {
PrintWriter out = new PrintWriter(client.getOutputStream()); try {
for(File file : new File(fileFolder).listFiles()) { //Send a list of all files in fileFolder to client (the controller)
out.print(file.getName()); PrintWriter out = new PrintWriter(client.getOutputStream());
out.flush(); for(File file : new File(fileFolder).listFiles()) {
} out.print(file.getName());
out.close(); out.flush();
}
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
} }
void rebalance(Socket client, String[] message) throws Exception { void rebalance(Socket client, String[] message) throws Exception {
//Interpret files to send and files to remove from the message new Thread(() -> {
Map<String,Integer[]> filesToSend; try {
String[] filesToRemove; //Interpret files to send and files to remove from the message
int index; Map<String,Integer[]> filesToSend;
String[] filesToRemove;
int numberToSend = Integer.parseInt(message[1]); int index;
index = 2;
filesToSend = new HashMap<String,Integer[]>(numberToSend); int numberToSend = Integer.parseInt(message[1]);
for(int i=0; i<numberToSend; i++) { index = 2;
String name = message[index]; filesToSend = new HashMap<String,Integer[]>(numberToSend);
index++; for(int i=0; i<numberToSend; i++) {
String name = message[index];
int numberOfReceivers = Integer.parseInt(message[index]); index++;
index++;
Integer[] receivers = new Integer[numberOfReceivers]; int numberOfReceivers = Integer.parseInt(message[index]);
for(int j=0; j<numberOfReceivers; j++) { index++;
receivers[j] = Integer.parseInt(message[index]); Integer[] receivers = new Integer[numberOfReceivers];
for(int j=0; j<numberOfReceivers; j++) {
receivers[j] = Integer.parseInt(message[index]);
index++;
}
filesToSend.put(name, receivers);
}
int numberToRemove = Integer.parseInt(message[index]);
index++; index++;
filesToRemove = new String[numberToRemove];
for(int k=0; k<numberToRemove; k++) {
filesToRemove[k] = message[index];
index++;
}
//Send each file to send to the Dstore at the specified port number
for(String filename : filesToSend.keySet()) {
for(Integer dstore : filesToSend.get(filename)) {
//Same store functions as used in the client object
}
}
//Remove each file to remove from fileFolder
for(String filename : filesToRemove) {
new File(fileFolder + "/" + filename).delete();
}
//Send REBALANCE_COMPLETE message to client (the controller)
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("REBALANCE COMPLETE");
out.flush();
out.close();
} }
catch(IOException e) {
filesToSend.put(name, receivers); e.printStackTrace();
}
int numberToRemove = Integer.parseInt(message[index]);
index++;
filesToRemove = new String[numberToRemove];
for(int k=0; k<numberToRemove; k++) {
filesToRemove[k] = message[index];
index++;
}
//Send each file to send to the Dstore at the specified port number
for(String filename : filesToSend.keySet()) {
for(Integer dstore : filesToSend.get(filename)) {
//Same store functions as used in the client object
} }
} }).start();
//Remove each file to remove from fileFolder
for(String filename : filesToRemove) {
new File(fileFolder + "/" + filename).delete();
}
//Send REBALANCE_COMPLETE message to client (the controller)
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("REBALANCE COMPLETE");
out.flush();
out.close();
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment