Skip to content
Snippets Groups Projects
Commit decfd0fa authored by dl3g19's avatar dl3g19
Browse files

Operations with client work, now need to fix rebalancing and multiple Dstores

parent bc82a441
No related branches found
No related tags found
No related merge requests found
Showing with 778 additions and 354 deletions
Somebody once told me the world is gonna roll me
I ain't the sharpest tool in the shed
She was looking kind of dumb with her finger and her thumb
In the shape of an "L" on her forehead
Well, the years start coming and they don't stop coming
Fed to the rules and I hit the ground running
Didn't make sense not to live for fun
Your brain gets smart but your head gets dumb
So much to do, so much to see
So what's wrong with taking the back streets?
You'll never know if you don't go
You'll never shine if you don't glow
Hey, now, you're an all-star, get your game on, go play
Hey, now, you're a rock star, get the show on, get paid
And all that glitters is gold
Only shooting stars break the mold
It's a cool place and they say it gets colder
You're bundled up now wait 'til you get older
But the meteor men beg to differ
Judging by the hole in the satellite picture
The ice we skate is getting pretty thin
The water's getting warm so you might as well swim
My world's on fire. How about yours?
That's the way I like it and I'll never get bored
Hey, now, you're an all-star, get your game on, go play
Hey, now, you're a rock star, get the show on, get paid
And all that glitters is gold
Only shooting stars break the mold
Go for the moon
Go for the moon
Go for the moon
Go for the moon
Hey, now, you're an all-star, get your game on, go play
Hey, now, you're a rock star, get the show on, get paid
And all that glitters is gold
Only shooting stars
Somebody once asked could I spare some change for gas
I need to get myself away from this place
I said yep, what a concept
I could use a little fuel myself
And we could all use a little change
Well, the years start coming and they don't stop coming
Fed to the rules and I hit the ground running
Didn't make sense not to live for fun
Your brain gets smart but your head gets dumb
So much to do, so much to see
So what's wrong with taking the back streets?
You'll never know if you don't go
You'll never shine if you don't glow
Hey, now, you're an all star, get your game on, go play
Hey, now, you're a rock star, get the show on, get paid
And all that glitters is gold
Only shooting stars break the mold
And all that glitters is gold
Only shooting stars break the mold
File added
......@@ -42,26 +42,36 @@ public class ClientMain {
try { client.connect(); } catch(IOException e) { e.printStackTrace(); return; }
String[] files = {"AllStar.txt", "Unknown.txt", "PumpkinHill.txt", "SnowHalation.txt", "Grandad.txt"};
for(String file : files) {
try { client.store(new File(file)); } catch(IOException e) { e.printStackTrace(); }
}
/*
try { list(client); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("Clipboard01.pdf")); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("AllStar.txt")); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("Clipboard01.pdf")); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("AllStar.txt")); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("Clipboard01.jpg")); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("Unknown.txt")); } catch(IOException e) { e.printStackTrace(); }
*/
String list[] = null;
try { list = list(client); } catch(IOException e) { e.printStackTrace(); }
if (list != null)
for (String filename : list)
try { client.load(filename, downloadFolder); } catch(IOException e) { e.printStackTrace(); }
/*
if (list != null)
for (String filename : list)
try { client.remove(filename); } catch(IOException e) { e.printStackTrace(); }
try { list(client); } catch(IOException e) { e.printStackTrace(); }
*/
} finally {
if (client != null)
......
File added
No preview for this file type
File added
No preview for this file type
No preview for this file type
......@@ -21,16 +21,12 @@ public class Controller {
protected List<Integer> storedBy;
protected int numberToStore;
protected String status;
protected Object storeAckLock;
protected List<Reloader> clientLoadList;
public IndexEntry() {
filesize = -1;
storedBy = Collections.synchronizedList(new ArrayList<Integer>());
numberToStore = 0;
status = "store in progress";
storeAckLock = new Object();
clientLoadList = new ArrayList<Reloader>();
}
public synchronized void setFilesize(int filesize) {
......@@ -41,24 +37,24 @@ public class Controller {
return filesize;
}
public void addStoredBy(int dstore) {
public synchronized void addStoredBy(int dstore) {
storedBy.add(Integer.valueOf(dstore));
if(storedBy.size() == numberToStore) storeAckLock.notify();
if(storedBy.size() >= numberToStore) notifyAll();
}
public void addStoredBy(List<Integer> dstores) {
public synchronized void addStoredBy(List<Integer> dstores) {
storedBy.addAll(dstores);
if(storedBy.size() == numberToStore) storeAckLock.notify();
if(storedBy.size() >= numberToStore) notifyAll();
}
public void removeStoredBy(int dstore) {
public synchronized void removeStoredBy(int dstore) {
storedBy.remove(Integer.valueOf(dstore));
if(storedBy.size() == 0) storeAckLock.notify();
if(storedBy.isEmpty()) notifyAll();
}
public void removeStoredBy(List<Integer> dstores) {
public synchronized void removeStoredBy(List<Integer> dstores) {
storedBy.removeAll(dstores);
if(storedBy.size() == 0) storeAckLock.notify();
if(storedBy.isEmpty()) notifyAll();
}
public List<Integer> getStoredBy() {
......@@ -76,26 +72,16 @@ public class Controller {
public synchronized String getStatus() {
return status;
}
public Object getLock() {
return storeAckLock;
}
public List<Reloader> getLoadList() {
return clientLoadList;
}
}
protected class Reloader {
public boolean reload;
public Reloader() {
reload = false;
}
protected class Reloader extends ArrayList<Integer> {
public int filesize;
}
protected Map<Integer,DstoreConnection> dstores;
protected Map<Integer,String[]> rebalanceMessages;
protected Map<Integer,List<String>> rebalanceMessages;
protected Map<String,IndexEntry> index;
protected Map<Socket,Reloader> loadRequests;
public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) {
this.cport = cport;
......@@ -103,7 +89,8 @@ public class Controller {
this.timeout = timeout;
this.rebalancePeriod = rebalancePeriod;
dstores = Collections.synchronizedMap(new HashMap<Integer,DstoreConnection>());
index = new HashMap<String,IndexEntry>();
index = Collections.synchronizedMap(new HashMap<String,IndexEntry>());
loadRequests = Collections.synchronizedMap(new HashMap<Socket,Reloader>());
}
public static void main(String[] args) {
......@@ -136,22 +123,44 @@ public class Controller {
public void start() {
try {
Thread rebalanceThread = new Thread(new RebalanceThread());
rebalanceThread.start();
ServerSocket server = new ServerSocket(cport);
while(true) {
try {
Socket client = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String[] message = in.readLine().split(" ");
if(dstores.size() < rFactor && !message[0].equals("JOIN")) {
PrintWriter out = new PrintWriter(client.getOutputStream());
out.println("ERROR");
out.flush();
out.close();
if(message[0].equals("JOIN")) {
dstores.put(Integer.parseInt(message[1]), new DstoreConnection(client));
System.out.println("Dstore at " + message[1] + " joined");
try {rebalanceThread.interrupt();} catch(SecurityException e) {e.printStackTrace();}
}
else {
System.out.println("A new client has joined");
new Thread(() -> {
try {
handleMessage(message, client);
}
in.close();
catch(Exception e) {
e.printStackTrace();
}
while(!client.isClosed()) {
try {
String clientMessage = in.readLine();
if(clientMessage != null) {
handleMessage(clientMessage.split(" "), client);
}
}
catch(Exception e) {
e.printStackTrace();
}
}
System.out.println("Client closed");
}).start();
}
}
catch(Exception e) {
//Log error
......@@ -165,20 +174,36 @@ public class Controller {
}
}
void handleMessage(String[] message, Socket client) throws Exception {
if(message[0].equals("JOIN")) {
dstores.put(Integer.parseInt(message[1]), new DstoreConnection(client));
System.out.println("Dstore at " + message[1] + " joined");
protected class RebalanceThread implements Runnable {
public void run() {
while(true) {
try {Thread.sleep(rebalancePeriod);} catch(InterruptedException e) {
try {rebalance();} catch(Exception ee) {ee.printStackTrace();}
}
try {
if(dstores.size() >= rFactor) {
rebalance();
}
}
catch(Exception e) {e.printStackTrace();}
}
}
}
void handleMessage(String[] message, Socket client) throws Exception {
if(dstores.size() < rFactor) {
PrintWriter out = new PrintWriter(client.getOutputStream());
out.println("ERROR_NOT_ENOUGH_DSTORES");
out.flush();
}
else if(message[0].equals("STORE")) {
store(client, message[1]);
store(client, message[1], message[2]);
}
else if(message[0].equals("LOAD")) {
load(client, message[1]);
}
else if(message[0].equals("RELOAD")) {
reload(message[1]);
sendLoadFrom(client, message[1]);
}
else if(message[0].equals("REMOVE")) {
remove(client, message[1]);
......@@ -191,43 +216,52 @@ public class Controller {
}
}
void store(Socket client, String filename) throws Exception {
new Thread(() -> {
void store(Socket client, String filename, String filesizeString) throws Exception {
int filesize = -1;
try {
filesize = Integer.parseInt(filesizeString);
if(filesize < 1) {
//Log error
}
}
catch(NumberFormatException e) {
//Log error
}
try {
if(index.containsKey(filename)) {
PrintWriter out = new PrintWriter(client.getOutputStream());
out.println("ERROR ALREADY_EXISTS " + filename);
out.println("ERROR_FILE_ALREADY_EXISTS " + filename);
out.flush();
out.close();
return;
}
//Update index to "store in progress"
IndexEntry entry = new IndexEntry();
index.put(filename, entry);
entry.setFilesize(filesize);
//Select Dstores
int[] storesToStore = new int[rFactor];
Integer[] storesToStore = new Integer[rFactor];
synchronized(dstores) {
Iterator<Integer> it = dstores.keySet().iterator();
for(int i=0; i<rFactor; i++) {
Integer thisStore = it.next();
storesToStore[i] = thisStore.intValue();
storesToStore[i] = it.next();
}
}
entry.setNumberToStore(rFactor);
//Send STORE_TO message
PrintWriter out = new PrintWriter(client.getOutputStream());
out.println("STORE_TO");
out.flush();
for(int port : storesToStore) {
out.println(" " + port);
String message = "STORE_TO";
for(Integer thisStore : storesToStore) {
message = message + " " + thisStore.intValue();
new Thread(() -> {
String[] message = dstores.get(Integer.valueOf(port)).receive().split(" ");
if(message[0].equals("STORE_ACK")) {
try {
storeAck(Integer.valueOf(port), message[1]);
String[] receivedMessage = dstores.get(thisStore).receive().split(" ");
if(receivedMessage[0].equals("STORE_ACK")) {
try {
storeAck(thisStore, receivedMessage[1]);
}
catch(Exception e) {
//Log error
......@@ -236,16 +270,24 @@ public class Controller {
else {
//Log error
}
}
catch(NullPointerException e) {
removeDstore(thisStore);
}
}).start();
}
out.println(message);
out.flush();
//Wait for STORE_ACKs from datastores in storesToStore
synchronized(entry) {
try {
entry.getLock().wait(timeout);
entry.wait(timeout);
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
if(entry.getStoredBy().size() < rFactor) {
//Log error
......@@ -257,12 +299,10 @@ public class Controller {
//Send STORE_COMPLETE message
out.println("STORE_COMPLETE");
out.flush();
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void storeAck(Integer port, String filename) throws Exception {
......@@ -276,74 +316,60 @@ public class Controller {
}
void load(Socket client, String filename) throws Exception {
new Thread(() -> {
try {
if(!index.containsKey(filename)) {
PrintWriter out = new PrintWriter(client.getOutputStream());
out.println("ERROR DOES_NOT_EXIST");
out.flush();
out.close();
return;
}
//Select a Dstore which contains the file
IndexEntry thisEntry = index.get(filename);
int thisStore = thisEntry.storedBy.get(0).intValue();
int thisSize = thisEntry.filesize;
Reloader storedBy = new Reloader();
storedBy.filesize = thisEntry.getFilesize();
Iterator<Integer> it = thisEntry.getStoredBy().iterator();
while(it.hasNext()) {
Integer d = it.next();
storedBy.add(d);
System.out.println("Dstore " + d + " added to load list");
}
loadRequests.put(client,storedBy);
//Send LOAD_FROM message
PrintWriter out = new PrintWriter(client.getOutputStream());
out.println("LOAD_FROM " + thisStore + " " + thisSize);
out.flush();
Reloader reloadLock = new Reloader();
thisEntry.getLoadList().add(reloadLock);
int trials = 0;
while(true) {
try {
reloadLock.wait(10 * timeout);
sendLoadFrom(client, filename);
}
catch(InterruptedException e) {
catch(IOException e) {
e.printStackTrace();
}
trials ++;
if(trials >= rFactor || !reloadLock.reload) break;
out.println("LOAD_FROM " + thisEntry.storedBy.get(trials).intValue() + " " + thisSize);
out.flush();
reloadLock.reload = false;
}
thisEntry.getLoadList().remove(reloadLock);
if(trials >= rFactor && reloadLock.reload) {
out.println("ERROR LOAD");
out.flush();
void sendLoadFrom(Socket client, String filename) {
try {
PrintWriter out = new PrintWriter(client.getOutputStream());
Reloader storedBy = loadRequests.get(client);
System.out.println("Load requested for file " + filename + ", there are " + storedBy.size() + " dstores to select from");
if(storedBy.isEmpty()) {
out.println("ERROR_LOAD");
}
out.close();
else {
Integer thisStore = storedBy.get(0);
storedBy.remove(thisStore);
out.println("LOAD_FROM " + thisStore + " " + storedBy.filesize);
}
out.flush();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void reload(String filename) {
new Thread(() -> {
for(Reloader r : index.get(filename).getLoadList()) {
r.reload = true;
r.notify();
}
}).start();
}
void remove(Socket client, String filename) throws Exception {
new Thread(() -> {
try {
if(!index.containsKey(filename)) {
PrintWriter clientOut = new PrintWriter(client.getOutputStream());
clientOut.println("ERROR DOES_NOT_EXIST");
clientOut.flush();
clientOut.close();
return;
}
......@@ -354,23 +380,30 @@ public class Controller {
//Send REMOVE message to all Dstores storing the file
for(Integer dstore : entry.getStoredBy()) {
new Thread(() -> {
String[] message = dstores.get(dstore).sendAndReceive("REMOVE").split(" ");
try {
String[] message = dstores.get(dstore).sendAndReceive("REMOVE " + filename).split(" ");
if(message[0].equals("REMOVE_ACK") && message[1].equals(filename)) {
entry.removeStoredBy(dstore.intValue());
}
else {
//Log error
}
}
catch(NullPointerException e) {
removeDstore(dstore);
}
}).start();
}
//Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message
synchronized(entry) {
try {
entry.getLock().wait(timeout);
entry.wait(timeout);
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
if(entry.getStoredBy().size() > 0) {
//Log error
......@@ -378,38 +411,37 @@ public class Controller {
//Update index to "remove complete"
entry.status = "remove complete";
index.remove(filename);
//Send REMOVE_COMPLETE to client
PrintWriter clientOut = new PrintWriter(client.getOutputStream());
clientOut.println("REMOVE_COMPLETE");
clientOut.flush();
clientOut.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void list(Socket client) throws Exception {
new Thread(() -> {
try {
System.out.println("Fetching list...");
//Send file list to client
PrintWriter out = new PrintWriter(client.getOutputStream());
String message = "LIST ";
for(String name : index.keySet()) {
out.println(name);
message = message + name + " ";
}
PrintWriter out = new PrintWriter(client.getOutputStream());
System.out.println("Sending...");
out.println(message.trim());
out.flush();
out.close();
}
catch(IOException e) {
e.printStackTrace();
}
}).start();
}
void rebalance() throws Exception {
new Thread(() -> {
if(rebalanceMessages != null) return;
Map<Integer,List<String>> dstoreFiles = new HashMap<Integer,List<String>>();
synchronized(dstoreFiles) {
......@@ -420,8 +452,13 @@ public class Controller {
dstoreFiles.put(dstore, new ArrayList<String>());
new Thread(() -> {
try {
String[] message = dstores.get(dstore).sendAndReceive("LIST").split(" ");
receiveDstoreList(dstore.intValue(), message);
}
catch(NullPointerException e) {
removeDstore(dstore);
}
}).start();
}
......@@ -452,7 +489,7 @@ public class Controller {
requireIndex.put(i, new ArrayList<String>());
removeIndex.put(i, new ArrayList<String>());
}
Iterator<Integer> it;
Iterator<Integer> it = null;
for(String file : fileList) {
for(int j=0; j<rFactor; j++) {
if(it == null || !it.hasNext()) {
......@@ -477,18 +514,30 @@ public class Controller {
}
}
Integer acksReceived = new Integer(0);
class AcksReceived {
int value;
public AcksReceived() {
value = 0;
}
public void incr() {
value ++;
}
public int getValue() {
return value;
}
}
AcksReceived acksReceived = new AcksReceived();
for(Integer thisStore : storeOrder) {
List<String> sendMessages = new ArrayList<String>();
for(String file : dstoreFiles.get(thisStore)) {
if(isEmptyListMap(requiredFiles)) break;
if(isEmptyListMap(requireIndex)) break;
String fileMessage = "";
for(Integer otherStore : requiredFiles.keySet()) {
for(Integer otherStore : requireIndex.keySet()) {
if(thisStore.equals(otherStore)) continue;
for(String otherFile : requiredFiles.get(otherStore)) {
for(String otherFile : requireIndex.get(otherStore)) {
if(file.equals(otherFile)) {
requiredFiles.get(otherStore).remove(otherFile);
requireIndex.get(otherStore).remove(otherFile);
fileMessage = fileMessage + " " + otherStore.toString();
break;
}
......@@ -508,25 +557,32 @@ public class Controller {
}
//Send message to the Dstore
String finalMessage = message;
new Thread(() -> {
String returnMessage = dstores.get(thisStore).sendAndReceive(message);
try {
String returnMessage = dstores.get(thisStore).sendAndReceive(finalMessage);
if(!returnMessage.equals("REBALANCE_COMPLETE")) {
//Log error
}
synchronized(acksReceived) {
acksReceived += 1;
if(acksReceived.intValue() == storeOrder.size()) {
acksReceived.incr();
if(acksReceived.getValue() == storeOrder.size()) {
acksReceived.notifyAll();
}
}
}
catch(NullPointerException e) {
removeDstore(thisStore);
}
}).start();
}
//Wait for REBALANCE_COMPLETE from all Dstores
synchronized(acksReceived) {
try {
System.out.println("Waiting for REBALANCE_COMPLETE...");
acksReceived.wait(timeout);
if(acksReceived.intValue < storeOrder.size()) {
if(acksReceived.getValue() < storeOrder.size()) {
//Restart rebalance operation
}
}
......@@ -542,27 +598,40 @@ public class Controller {
rebalanceMessages = null;
}
}
}).start();
}
void receiveDstoreList(int port, String[] list) {
if(rebalanceMessages == null) return;
List<String> toList = new ArrayList<String>();
if(!list[0].equals("ERROR_EMPTY")) {
for(String file : list) {
if(!index.containsKey(file)) {
//Log error
return; //Throw exception?
//return; //Throw exception?
//Ignore those, there might be new resources a joining Dstore wants to contribute
//Then again, there might be malformed messages. Think about this.
}
toList.add(file);
}
}
synchronized(rebalanceMessages) {
rebalanceMessages.put(port, list);
rebalanceMessages.put(port, toList);
if(rebalanceMessages.size() == dstores.size()) {
rebalanceMessages.notify();
}
}
}
void removeDstore(Integer dstore) {
dstores.remove(dstore);
Iterator<IndexEntry> it = index.values().iterator();
while(it.hasNext()) {
it.next().removeStoredBy(dstore);
}
}
List<Integer> reshuffle(Collection<Integer> col) {
List<Integer> list = new ArrayList<Integer>();
for(Integer i : col) {
......@@ -571,8 +640,8 @@ public class Controller {
return list;
}
boolean isEmptyListMap(Map<T,List<U>> map) {
for(List<U> list : map.entrySet()) {
<T,U> boolean isEmptyListMap(Map<T,List<U>> map) {
for(List<U> list : map.values()) {
if(!list.isEmpty()) {
return false;
}
......
No preview for this file type
......@@ -12,17 +12,27 @@ public class Dstore {
protected int port; //Port to listen on
protected int cport; //Controller's port to talk to
protected int timeout; //in milliseconds
protected String fileFolder; //Where to store the data locally
protected File fileFolder; //Where to store the data locally
protected Map<String,Integer> fileSizes;
protected Socket controllerSocket;
protected BufferedReader controllerIn;
protected PrintWriter controllerOut;
public Dstore(int port, int cport, int timeout, String fileFolder) {
public Dstore(int port, int cport, int timeout, String fileFolderName) throws Exception {
this.port = port;
this.cport = cport;
this.timeout = timeout;
this.fileFolder = fileFolder;
fileFolder = new File(fileFolderName);
if(fileFolder.exists() && !fileFolder.isDirectory()) {
throw new Exception("Folder name provided exists as a file and not a directory");
}
else if(!fileFolder.exists()) {
System.out.println("New folder will be created");
if(!fileFolder.mkdir()) throw new Exception("Folder could not be created");
}
fileSizes = new HashMap<String,Integer>();
}
......@@ -56,9 +66,9 @@ public class Dstore {
public void start() {
try {
Socket socket = new Socket(InetAddress.getLocalHost(), cport);
controllerIn = new BufferedReader(new InputStreamReader(socket.getInputStream()));
controllerOut = new PrintWriter(socket.getOutputStream());
controllerSocket = new Socket(InetAddress.getLocalHost(), cport);
controllerIn = new BufferedReader(new InputStreamReader(controllerSocket.getInputStream()));
controllerOut = new PrintWriter(controllerSocket.getOutputStream());
controllerOut.println("JOIN " + port);
controllerOut.flush();
......@@ -67,7 +77,7 @@ public class Dstore {
try {
String message = controllerIn.readLine();
if(message != null) {
handleMessage(message.split(" "), socket, controllerIn);
handleMessage(message.split(" "), controllerSocket);
}
}
catch(Exception e) {
......@@ -83,8 +93,7 @@ public class Dstore {
Socket client = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String[] message = in.readLine().split(" ");
handleMessage(message, client, in);
in.close();
handleMessage(message, client);
}
catch(Exception e) {
//Log error
......@@ -97,9 +106,9 @@ public class Dstore {
}
}
void handleMessage(String[] message, Socket client, BufferedReader clientIn) throws Exception {
void handleMessage(String[] message, Socket client) throws Exception {
if(message[0].equals("STORE")) {
store(client, message[1], Integer.parseInt(message[2]), clientIn);
store(client, message[1], Integer.parseInt(message[2]));
}
else if(message[0].equals("LOAD_DATA")) {
load(client, message[1]);
......@@ -118,16 +127,15 @@ public class Dstore {
}
}
void store(Socket client, String filename, int filesize, BufferedReader in) throws Exception {
void store(Socket client, String filename, int filesize) throws Exception {
new Thread(() -> {
try {
//Send ACK message to client
PrintWriter out = new PrintWriter(client.getOutputStream());
out.println("ACK");
out.flush();
out.close();
FileOutputStream writer = new FileOutputStream(fileFolder + "/" + filename, false);
FileOutputStream writer = new FileOutputStream(new File(fileFolder, filename), false);
InputStream reader = client.getInputStream();
//Receive + write file content from client
......@@ -139,10 +147,8 @@ public class Dstore {
writer.close();
//Send STORE_ACK message to the Controller
PrintWriter controllerOut = new PrintWriter(new Socket(InetAddress.getLocalHost(), cport).getOutputStream());
controllerOut.println("STORE_ACK " + filename);
controllerOut.flush();
controllerOut.close();
if(fileSizes.containsKey(filename)) fileSizes.remove(filename);
fileSizes.put(filename, filesize);
......@@ -150,6 +156,9 @@ public class Dstore {
catch(IOException e) {
e.printStackTrace();
}
finally {
try {client.close();} catch(IOException e) {e.printStackTrace();}
}
}).start();
}
......@@ -160,12 +169,13 @@ public class Dstore {
PrintWriter out = new PrintWriter(client.getOutputStream());
FileInputStream reader;
try {
reader = new FileInputStream(fileFolder + "/" + filename);
reader = new FileInputStream(new File(fileFolder, filename));
}
catch(FileNotFoundException e) {
out.println("ERROR DOES_NOT_EXIST");
out.flush();
out.close();
client.close();
return;
}
......@@ -183,6 +193,9 @@ public class Dstore {
catch(IOException e) {
e.printStackTrace();
}
finally {
try {client.close();} catch(IOException e) {e.printStackTrace();}
}
}).start();
}
......@@ -190,11 +203,11 @@ public class Dstore {
new Thread(() -> {
try {
//Remove the file from fileFolder
Path path = new File(fileFolder + "/" + filename).toPath();
Path path = new File(fileFolder, filename).toPath();
if(Files.deleteIfExists(path)) {
//Send REMOVE_ACK message to client (the controller)
controllerOut.println("REMOVE_ACK");
controllerOut.println("REMOVE_ACK " + filename);
}
else {
//Send DOES NOT EXIST error
......@@ -212,20 +225,25 @@ public class Dstore {
void list() throws Exception {
new Thread(() -> {
//Send a list of all files in fileFolder to client (the controller)
for(File file : new File(fileFolder).listFiles()) {
controllerOut.println(file.getName());
controllerOut.flush();
String message = "";
for(File file : fileFolder.listFiles()) {
message = message + " " + file.getName();
}
if(message.equals("")) message = "ERROR_EMPTY";
controllerOut.println(message.trim());
controllerOut.flush();
}).start();
}
void rebalance(String[] message) throws Exception {
System.out.println("Rebalance message received");
new Thread(() -> {
//Interpret files to send and files to remove from the message
Map<Integer,List<String>> filesToSend;
String[] filesToRemove;
int index;
System.out.println("Interpreting message...");
int numberToSend = Integer.parseInt(message[1]);
index = 2;
filesToSend = new HashMap<Integer,List<String>>();
......@@ -252,26 +270,26 @@ public class Dstore {
filesToRemove[k] = message[index];
index++;
}
System.out.println("Interpreting complete, will send " + numberToSend + " and remove " + numberToRemove);
//Send each file to send to the Dstore at the specified port number
for(Integer dstore : filesToSend.keySet()) {
for(String filename : filesToSend.get(dstore)) {
new Thread(() -> {
try {
System.out.println("Sending " + filename + " to store " + dstore);
Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue());
PrintWriter out = new PrintWriter(socket.getOutputStream());
out.println("STORE " + filename + " " + fileSizes.get(filename));
out.flush();
out.close();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
if(!in.readLine().equals("ACK")) {
//Log error
}
in.close();
byte[] content = new byte[8];
FileInputStream fileIn = new FileInputStream(fileFolder + "/" + filename);
FileInputStream fileIn = new FileInputStream(new File(fileFolder, filename));
OutputStream fileOut = socket.getOutputStream();
while(fileIn.read(content) > 0) {
fileOut.write(content);
......@@ -279,6 +297,8 @@ public class Dstore {
}
fileIn.close();
fileOut.close();
in.close();
out.close();
socket.close();
}
catch(IOException e) {
......@@ -290,12 +310,14 @@ public class Dstore {
//Remove each file to remove from fileFolder
for(String filename : filesToRemove) {
new File(fileFolder + "/" + filename).delete();
System.out.println("Removing file " + filename);
new File(fileFolder, filename).delete();
}
//Send REBALANCE_COMPLETE message to client (the controller)
controllerOut.print("REBALANCE COMPLETE");
controllerOut.println("REBALANCE_COMPLETE");
controllerOut.flush();
System.out.println("Sent message REBALANCE_COMPLETE");
//TO DO: WAIT FOR RESPONSES BEFORE SENDING REBALANCE_COMPLETE
}).start();
}
......
No preview for this file type
......@@ -24,32 +24,56 @@ public class DstoreConnection {
e.printStackTrace();
available = false;
}
catch(NullPointerException e) {
System.out.println("Dstore disconnected");
available = false;
}
}
public String sendAndReceive(String message) {
public String sendAndReceive(String message) throws NullPointerException {
System.out.println("Getting lock...");
synchronized(this) {
try {
System.out.println("Lock acquired");
if(!available) return "ERROR";
writer.println(message);
writer.flush();
return localReceive();
}
catch(NullPointerException e) {
System.out.println("Dstore disconnected");
available = false;
throw new NullPointerException();
}
}
}
public String receive() {
public String receive() throws NullPointerException {
System.out.println("Getting lock...");
synchronized(this) {
System.out.println("Lock acquired");
if(!available) return "ERROR";
return localReceive();
}
}
protected String localReceive() {
protected String localReceive() throws NullPointerException {
try {
String returnMessage = reader.readLine();
String returnMessage = "";
while(returnMessage.equals("")) {
returnMessage = reader.readLine();
}
System.out.println("Controller received " + returnMessage);
return returnMessage;
}
catch(IOException e) {
e.printStackTrace();
return "";
}
catch(NullPointerException e) {
System.out.println("Dstore disconnected");
available = false;
throw new NullPointerException();
}
}
}
(Yabba Dabba Doo!)
Flintstones. Meet the Flintstones.
They're the modern stone age family.
From the town of Bedrock,
They're a page right out of history.
Let's ride with the family down the street.
Through the courtesy of Fred's two feet.
When you're with the Flintstones
Have a yabba dabba doo time.
A dabba doo time.
We'll have a gay old time.
You know me, the fighting freak Knuckles,
And we're at Pumpkin Hill,
You ready?
I ain't gonna let it get to me, I'm just gonna creep,
Down in Pumpkin Hill I gots to find my lost piece.
I know that it's here, I can sense it in my feet,
The great Emerald's power allows me to feel.
I can't see a thing but it's around somewhere,
I'm gonna hold my head 'cause I have no fear.
This probably seems crazy, crazy, a graveyard theory,
A ghost tried to approach me and got leery.
Asked him a question and he vanished in a second,
I'm walkin' through valleys cryin' pumpkin in the alley.
Didn't seem happy but they sure tried to get me,
Had to back 'em up with the fist, metal crack 'em.
I'm hearing someone sayin' "You a chicken, don't be scared!"
It had to be the wind, 'cause nobody wasn't there.
I searched and I searched as I climbed up the wall,
And then I started to fly, I went in deeper!
Let it get to me? I'm just gonna creep,
Down in Pumpkin Hill I gots to find my lost piece.
I know that it's here, I sense it in my feet,
The great Emerald's power allows me to feel.
I can't see a thing but it's around somewhere,
I gotta hold my head, I have no fear.
It probably seems crazy, crazy, a graveyard theory,
A ghost tried to approach me, he got leery.
(This is Knuckles, who fears none.)
(It's real deal when it comes to my name, kid!)
I ain't gonna let it get to me, I'm just gonna creep,
Down in Pumpkin Hill I gots to find my lost piece.
I know that it's here, I can sense it in my feet,
The great Emerald's power allows me to feel.
I can't see a thing but it's around somewhere,
I'm gonna hold my head 'cause I have no fear.
This probably seems crazy, crazy, a graveyard theory,
A ghost tried to approach me and got leery.
Asked him a question and he vanished in a second,
I'm walkin' through valleys cryin' pumpkin in the alley.
Didn't seem happy but they sure tried to get me,
Had to back 'em up with the fist, metal crack 'em.
I'm hearing someone sayin' "You a chicken, don't be scared!"
It had to be the wind, 'cause nobody wasn't there.
I searched and I searched as I climbed up the wall,
And then I started to fly, I went in deeper!
Let it get to me? I'm just gonna creep,
Down in Pumpkin Hill I gots to find my lost piece.
I know that it's here, I sense it in my feet,
The great Emerald's power allows me to feel.
I can't see a thing but it's around somewhere,
I gotta hold my head, I have no fear.
It probably seems crazy, crazy, a graveyard theory,
A ghost tried to approach me, he got leery.
(Spooky up in here, it's crazy in here,)
(We still gon' keep it goin', I'm Knuckles.)
(Nobody scares me,)
(Whoever want it, bring it!)
(I don't care, we 'ka do this.)
(Then come step up to the plate, and meet your match,)
(It ain't no thang.)
Fushigi da ne ima no kimochi
Sora kara futte kita mitai
Tokubetsu na kisetsu no iro ga tokimeki o miseru yo
Hajimete deatta toki kara
Yokan ni sawagu kokoro no Melody
Tomerarenai tomaranai na・ze
Todokete
Setsunasa ni wa namae o tsukeyou ka "Snow halation"
Omoi ga kasanaru made matezu ni
Kuyashii kedo suki tte junjou
Binetsu no naka tameratte mo dame da ne
Tobikomu yuuki ni sansei mamonaku Start!!
Oto mo naku kehai mo naku
Shizuka ni unmei wa kawaru
Korekara no mirai ni mune no kodou ga hayaku naru
Tatoeba komatta toki ni wa
Sugu kaketsukete dakishimetakute
Doko ni ite mo dokodemo Fly high
Isoide
Itsu no ma ni ka ookiku nari sugita "True emotion"
Yume dake miteru you ja tsurai yo
Koibito wa kimi tte iitai
Yasashii me ga tomadotteru iya da yo
Kono mama ikki ni aijou azukete Please!!
Todokete
Setsunasa ni wa namae o tsukeyou ka "Snow halation"
Omoi ga kasanaru made matezu ni
Kuyashii kedo suki tte junjou
Binetsu no naka tameratte mo dame da ne
Tobikomu yuuki ni sansei mamonaku Start!!
Here I come, rougher than the rest of them
The best of them, tougher than leather
You can call me Knuckles, unlike Sonic I don't chuckle
I'd rather flex my muscles
I'm hard as nails, it ain't hard to tell
I break 'em down whether they're solid or frail
Unlike the rest I'm independent since my first breath
First test, feel the right, than the worst's left
Born on an island in the heavens
The blood of my ancestors flows inside me
My duty is to save the flower
From evil deterioration
I will be the one to set your heart free, true
Cleanse yourself of them evil spirits that's in you
Streaking lights, loud sounds, and instincts
Are the elements that keep me going
I am fighting my own mission
Nothing's gonna stand in my way
I will be the one to set your heart free, true
Cleanse yourself of them evil spirits that's in you
Won't be frightened, I'll stand up to all the pain and turmoil
Just believe in myself, won't rely on others
Get this power to wipe out the havoc and anarchy
This is my planet, gonna fight for my destiny
Here I come, rougher than the rest of them
The best of them, tougher than leather
You can call me Knuckles, unlike Sonic I don't chuckle
I'd rather flex my muscles
I'm hard as nails, it ain't hard to tell
I break 'em down whether they're solid or frail
Unlike the rest I'm independent since my first breath
First test, feel the right, than the worst's left
I have no such things as weak spots
Don't approve of him but gotta trust him
This alliance has a purpose
This partnership is only temporary
I will be the one to set your heart free, true
Cleanse yourself of evil spirits that got in you
Won't be frightened, I'll stand up to all the pain and turmoil
Just believe in myself, won't rely on others
Freedom will be waiting when serenity is restored
This is my planet, I shall not surrender
Won't be frightened, I'll stand up to all the pain and turmoil
Just believe in myself, won't rely on others
Get this power to wipe out the havoc and anarchy
This is my planet, gonna fight
Won't be frightened, I'll stand up to all the pain and turmoil
Just believe in myself, won't rely on others
Freedom will be waiting when serenity is restored
This is my planet, I shall not surrender
The new porcupine on the block with the buff chest
In the wilderness with the ruggedness
Knock, knock, it's Knuckles, the bloat thrower
Independent flower, Magical Emerald holder
I'll give you the coldest shoulder
My spikes go through boulders, that's why I stay a loner
I was born by myself, I don't need a posse
I get it on by myself, adversaries get shelved
Right on!
Cannot connect to the Controller on port 8082
Connection established to port 8080
Message sent to port 8080: LIST
List operation started
Message received from port 8080: ERROR
ERROR: Connection closed by the Controller
List operation failed
ERROR: File to store does not exist (absolute path: /home/danimal/Documents/comp2207-distributed-filesystem/Clipboard01.pdf)
ERROR: File to store does not exist (absolute path: /home/danimal/Documents/comp2207-distributed-filesystem/Clipboard01.pdf)
ERROR: File to store does not exist (absolute path: /home/danimal/Documents/comp2207-distributed-filesystem/Clipboard01.jpg)
Message sent to port 8080: LIST
List operation started
Message received from port 8080: null
ERROR: Connection closed by the Controller
List operation failed
Message sent to port 8080: LIST
List operation started
Message received from port 8080: null
ERROR: Connection closed by the Controller
List operation failed
Connection established to port 8080
Message sent to port 8080: LIST
List operation started
Message received from port 8080: null
ERROR: Connection closed by the Controller
List operation failed
ERROR: File to store does not exist (absolute path: /home/danimal/Documents/comp2207-distributed-filesystem/Clipboard01.pdf)
ERROR: File to store does not exist (absolute path: /home/danimal/Documents/comp2207-distributed-filesystem/Clipboard01.pdf)
ERROR: File to store does not exist (absolute path: /home/danimal/Documents/comp2207-distributed-filesystem/Clipboard01.jpg)
Message sent to port 8080: LIST
List operation started
Message received from port 8080: null
ERROR: Connection closed by the Controller
List operation failed
Message sent to port 8080: LIST
List operation started
Message received from port 8080: null
ERROR: Connection closed by the Controller
List operation failed
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment