Skip to content
Snippets Groups Projects
Commit e10ec28f authored by dl3g19's avatar dl3g19
Browse files

Modifications made so that only one connection per Dstore needs to open

parent 69f99016
No related branches found
No related tags found
No related merge requests found
File deleted
...@@ -83,52 +83,6 @@ public class Controller { ...@@ -83,52 +83,6 @@ public class Controller {
} }
} }
protected class SyncList extends ArrayList<Integer> {
public SyncList() {
super();
}
@Override
public boolean add(Integer i) {
synchronized(this) {
return super.add(i);
}
}
@Override
public boolean addAll(Collection<? extends Integer> c) {
synchronized(this) {
return super.addAll(c);
}
}
@Override
public Integer get(int i) {
synchronized(this) {
return super.get(i);
}
}
@Override
public int size() {
synchronized(this) {
return super.size();
}
}
public Integer remove(int i) {
synchronized(this) {
return super.remove(i);
}
}
public boolean remove(Integer i) {
synchronized(this) {
return super.remove(i);
}
}
}
protected class Reloader { protected class Reloader {
public boolean reload; public boolean reload;
public Reloader() { public Reloader() {
...@@ -136,7 +90,7 @@ public class Controller { ...@@ -136,7 +90,7 @@ public class Controller {
} }
} }
protected List<Integer> dstores; protected Map<Integer,Socket> dstores;
protected Map<Integer,String[]> rebalanceMessages; protected Map<Integer,String[]> rebalanceMessages;
protected Map<String,IndexEntry> index; protected Map<String,IndexEntry> index;
...@@ -145,7 +99,7 @@ public class Controller { ...@@ -145,7 +99,7 @@ public class Controller {
this.rFactor = rFactor; this.rFactor = rFactor;
this.timeout = timeout; this.timeout = timeout;
this.rebalancePeriod = rebalancePeriod; this.rebalancePeriod = rebalancePeriod;
dstores = new SyncList(); dstores = Collections.synchronizedMap(new HashMap<Integer,Socket>());
index = new HashMap<String,IndexEntry>(); index = new HashMap<String,IndexEntry>();
} }
...@@ -201,16 +155,13 @@ public class Controller { ...@@ -201,16 +155,13 @@ public class Controller {
void handleMessage(String[] message, Socket client) throws Exception { void handleMessage(String[] message, Socket client) throws Exception {
if(message[0].equals("JOIN")) { if(message[0].equals("JOIN")) {
dstores.add(Integer.parseInt(message[1])); dstores.add(Integer.parseInt(message[1]), client);
System.out.println("Dstore at " + message[1] + " joined"); System.out.println("Dstore at " + message[1] + " joined");
rebalance(); rebalance();
} }
else if(message[0].equals("STORE")) { else if(message[0].equals("STORE")) {
store(client, message[1]); store(client, message[1]);
} }
else if(message[0].equals("STORE_ACK")) {
storeAck(client, message[1]);
}
else if(message[0].equals("LOAD")) { else if(message[0].equals("LOAD")) {
load(client, message[1]); load(client, message[1]);
} }
...@@ -260,10 +211,19 @@ public class Controller { ...@@ -260,10 +211,19 @@ public class Controller {
//Send STORE_TO message //Send STORE_TO message
PrintWriter out = new PrintWriter(client.getOutputStream()); PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("STORE_TO"); out.print("STORE_TO");
out.flush();
for(int port : storesToStore) { for(int port : storesToStore) {
out.print(" " + port); out.print(" " + port);
new Thread(() -> {
String[] message = dstores.get(Integer.valueOf(port)).receive().split(" ");
if(message[0].equals("STORE_ACK")) {
storeAck(Integer.valueOf(port), message[1]);
}
else {
//Log error
}
}).start();
} }
out.flush();
//Wait for STORE_ACKs from datastores in storesToStore //Wait for STORE_ACKs from datastores in storesToStore
try { try {
...@@ -291,16 +251,14 @@ public class Controller { ...@@ -291,16 +251,14 @@ public class Controller {
}).start(); }).start();
} }
void storeAck(Socket client, String filename) throws Exception { void storeAck(Integer port, String filename) throws Exception {
new Thread(() -> {
if(!index.containsKey(filename)) { if(!index.containsKey(filename)) {
//Throw logging exception //Throw logging exception
return; return;
} }
IndexEntry thisEntry = index.get(filename); IndexEntry thisEntry = index.get(filename);
thisEntry.addStoredBy(Integer.valueOf(client.getPort())); thisEntry.addStoredBy(port);
}).start();
} }
void load(Socket client, String filename) throws Exception { void load(Socket client, String filename) throws Exception {
...@@ -380,13 +338,16 @@ public class Controller { ...@@ -380,13 +338,16 @@ public class Controller {
entry.status = "remove in progress"; entry.status = "remove in progress";
//Send REMOVE message to all Dstores storing the file //Send REMOVE message to all Dstores storing the file
for(Integer dstore : entry.storedBy) { for(Integer dstore : entry.getStoredBy()) {
Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); new Thread(() -> {
PrintWriter out = new PrintWriter(socket.getOutputStream()); String[] message = dstores.get(dstore).sendAndReceive("REMOVE").split(" ");
out.write("REMOVE " + filename); if(message[0].equals("REMOVE_ACK") && message[1].equals(filename)) {
out.flush(); entry.removeStoredBy(dstore.intValue());
out.close(); }
socket.close(); else {
//Log error
}
}).start();
} }
//Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message //Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message
...@@ -440,13 +401,11 @@ public class Controller { ...@@ -440,13 +401,11 @@ public class Controller {
rebalanceMessages = dstoreFiles; rebalanceMessages = dstoreFiles;
try { try {
//Send LIST message to each Dstore and receive their file list //Send LIST message to each Dstore and receive their file list
for(Integer dstore : dstores) { for(Integer dstore : dstores.keySet()) {
Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); new Thread(() -> {
PrintWriter out = new PrintWriter(socket.getOutputStream()); String[] message = dstores.get(dstore).sendAndReceive("LIST").split(" ");
out.write("LIST"); receiveDstoreList(dstore.intValue(), message);
out.flush(); }).start();
out.close();
socket.close();
} }
dstoreFiles.wait(timeout); dstoreFiles.wait(timeout);
...@@ -458,11 +417,41 @@ public class Controller { ...@@ -458,11 +417,41 @@ public class Controller {
//Each file appears rFactor times //Each file appears rFactor times
//Each file appears at most once on each datastore //Each file appears at most once on each datastore
//Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists)
List<Integer> storeOrder = reshuffle(dstoreFiles.keySet());
List<String> fileList = new ArrayList<String>();
for(Integer i : reshuffle(dstoreFiles.keySet())) { for(Integer i : reshuffle(dstoreFiles.keySet())) {
for(String s : dstoreFiles.get(i)) {
if(!fileList.contains(s)) {
fileList.add(s);
}
}
}
Map<Integer,List<String>> newAlloc = new HashMap<Integer,String[]>();
int pos = 0;
int storeSize = Math.ceiling((fileList.size() * rFactor) / dstores.size());
for(Integer i : dstoreFiles.keySet()) {
newAlloc.add(i, new ArrayList<String>(storeSize));
}
for(String file : fileList) {
for(int j=0; j<rFactor; j++) {
newAlloc.get(pos).add(file);
pos ++;
if(pos >= newAlloc.size()) pos = 0;
}
} }
//Make a (files to send, files to remove) pair for each Dstore //Make a (files to send, files to remove) pair for each Dstore
/*
Map<Integer,String> outMessages = new HashMap<Integer,String>();
for(Integer dstore : storeOrder) {
String[] oldFiles = dstoreFiles.get(dstore);
List<String> newFiles = newAlloc.get(dstore);
for(String file : oldFiles) {
if(!newFiles.)
}
}
*/
//Send the respective REBALANCE message to each Dstore //Send the respective REBALANCE message to each Dstore
...@@ -471,20 +460,29 @@ public class Controller { ...@@ -471,20 +460,29 @@ public class Controller {
catch(IOException e) { catch(IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
catch(Exception e) {
e.printStackTrace();
}
finally { finally {
rebalanceMessages = null; rebalanceMessages = null;
} }
}).start(); }).start();
} }
void receiveDstoreList(Socket client, String[] list) { void receiveDstoreList(int port, String[] list) {
new Thread(() -> {
if(rebalanceMessages == null) return; if(rebalanceMessages == null) return;
rebalanceMessages.add(Integer.valueOf(client.getPort()), list);
for(String file : list) {
if(!index.containsKey(file)) {
//Log error
return; //Throw exception?
}
}
rebalanceMessages.add(port, list);
if(rebalanceMessages.size() == dstores.size()) { if(rebalanceMessages.size() == dstores.size()) {
rebalanceMessages.notify(); rebalanceMessages.notify();
} }
}).start();
} }
List<Integer> reshuffle(Collection<Integer> col) { List<Integer> reshuffle(Collection<Integer> col) {
......
...@@ -11,12 +11,17 @@ public class Dstore { ...@@ -11,12 +11,17 @@ public class Dstore {
protected int cport; //Controller's port to talk to protected int cport; //Controller's port to talk to
protected int timeout; //in milliseconds protected int timeout; //in milliseconds
protected String fileFolder; //Where to store the data locally protected String fileFolder; //Where to store the data locally
protected Map<String,Integer> fileSizes;
protected BufferedReader controllerIn;
protected PrintWriter controllerOut;
public Dstore(int port, int cport, int timeout, String fileFolder) { public Dstore(int port, int cport, int timeout, String fileFolder) {
this.port = port; this.port = port;
this.cport = cport; this.cport = cport;
this.timeout = timeout; this.timeout = timeout;
this.fileFolder = fileFolder; this.fileFolder = fileFolder;
fileSizes = new HashMap<String,Integer>();
} }
public static void main(String[] args) { public static void main(String[] args) {
...@@ -42,10 +47,22 @@ public class Dstore { ...@@ -42,10 +47,22 @@ public class Dstore {
public void start() { public void start() {
try { try {
Socket socket = new Socket(InetAddress.getLocalHost(), cport); Socket socket = new Socket(InetAddress.getLocalHost(), cport);
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); controllerIn = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out.write("JOIN " + port); controllerOut = new PrintWriter(socket.getOutputStream());
out.close(); controllerOut.print("JOIN " + port);
socket.close(); controllerOut.flush();
new Thread(() -> {
while(true) {
try {
String[] message = controllerIn.readLine().split(" ");
handleMessage(message, socket, controllerIn);
}
catch(Exception e) {
e.printStackTrace();
}
}
}).start();
ServerSocket server = new ServerSocket(port); ServerSocket server = new ServerSocket(port);
...@@ -76,13 +93,13 @@ public class Dstore { ...@@ -76,13 +93,13 @@ public class Dstore {
load(client, message[1]); load(client, message[1]);
} }
else if(message[0].equals("REMOVE")) { else if(message[0].equals("REMOVE")) {
remove(client, message[1]); remove(message[1]);
} }
else if(message[0].equals("LIST")) { else if(message[0].equals("LIST")) {
list(client); list();
} }
else if(message[0].equals("REBALANCE")) { else if(message[0].equals("REBALANCE")) {
rebalance(client, message); rebalance(message);
} }
else { else {
//Log error and continue (throw exception?) //Log error and continue (throw exception?)
...@@ -115,6 +132,9 @@ public class Dstore { ...@@ -115,6 +132,9 @@ public class Dstore {
controllerOut.print("STORE_ACK " + filename); controllerOut.print("STORE_ACK " + filename);
controllerOut.flush(); controllerOut.flush();
controllerOut.close(); controllerOut.close();
if(fileSizes.containsKey(filename)) fileSizes.remove(filename);
fileSizes.add(filename, filesize);
} }
catch(IOException e) { catch(IOException e) {
e.printStackTrace(); e.printStackTrace();
...@@ -153,24 +173,22 @@ public class Dstore { ...@@ -153,24 +173,22 @@ public class Dstore {
}).start(); }).start();
} }
void remove(Socket client, String filename) throws Exception { void remove(String filename) throws Exception {
new Thread(() -> { new Thread(() -> {
try { try {
//Remove the file from fileFolder //Remove the file from fileFolder
Path path = new File(fileFolder + "/" + filename).toPath(); Path path = new File(fileFolder + "/" + filename).toPath();
PrintWriter out = new PrintWriter(client.getOutputStream());
if(Files.deleteIfExists(path)) { if(Files.deleteIfExists(path)) {
//Send REMOVE_ACK message to client (the controller) //Send REMOVE_ACK message to client (the controller)
out.print("REMOVE_ACK"); controllerOut.print("REMOVE_ACK");
} }
else { else {
//Send DOES NOT EXIST error //Send DOES NOT EXIST error
out.print("ERROR DOES_NOT_EXIST " + filename); controllerOut.print("ERROR DOES_NOT_EXIST " + filename);
} }
out.flush(); controllerOut.flush();
out.close();
} }
catch(IOException e) { catch(IOException e) {
e.printStackTrace(); e.printStackTrace();
...@@ -178,16 +196,14 @@ public class Dstore { ...@@ -178,16 +196,14 @@ public class Dstore {
}).start(); }).start();
} }
void list(Socket client) throws Exception { void list() throws Exception {
new Thread(() -> { new Thread(() -> {
try { try {
//Send a list of all files in fileFolder to client (the controller) //Send a list of all files in fileFolder to client (the controller)
PrintWriter out = new PrintWriter(client.getOutputStream());
for(File file : new File(fileFolder).listFiles()) { for(File file : new File(fileFolder).listFiles()) {
out.print(file.getName()); controllerOut.print(file.getName());
out.flush(); controllerOut.flush();
} }
out.close();
} }
catch(IOException e) { catch(IOException e) {
e.printStackTrace(); e.printStackTrace();
...@@ -195,7 +211,7 @@ public class Dstore { ...@@ -195,7 +211,7 @@ public class Dstore {
}).start(); }).start();
} }
void rebalance(Socket client, String[] message) throws Exception { void rebalance(String[] message) throws Exception {
new Thread(() -> { new Thread(() -> {
try { try {
//Interpret files to send and files to remove from the message //Interpret files to send and files to remove from the message
...@@ -232,7 +248,34 @@ public class Dstore { ...@@ -232,7 +248,34 @@ public class Dstore {
//Send each file to send to the Dstore at the specified port number //Send each file to send to the Dstore at the specified port number
for(String filename : filesToSend.keySet()) { for(String filename : filesToSend.keySet()) {
for(Integer dstore : filesToSend.get(filename)) { for(Integer dstore : filesToSend.get(filename)) {
//Same store functions as used in the client object try {
Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue());
PrintWriter out = new PrintWriter(socket.getOutputStream());
out.write("STORE " + filename + " " + fileSizes.get(filename));
out.flush();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
if(!in.readLine().equals("ACK")) {
//Log error
}
String line;
while(line = new BufferedReader(new FileInputStream(fileFolder + "/" + filename))) {
if(line == null) {
out.write("");
out.flush();
break;
}
else {
out.write(line);
out.flush();
}
}
out.close();
socket.close();
}
catch(IOException e) {
e.printStackTrace();
}
} }
} }
...@@ -242,10 +285,8 @@ public class Dstore { ...@@ -242,10 +285,8 @@ public class Dstore {
} }
//Send REBALANCE_COMPLETE message to client (the controller) //Send REBALANCE_COMPLETE message to client (the controller)
PrintWriter out = new PrintWriter(client.getOutputStream()); controllerOut.print("REBALANCE COMPLETE");
out.print("REBALANCE COMPLETE"); controllerOut.flush();
out.flush();
out.close();
} }
catch(IOException e) { catch(IOException e) {
e.printStackTrace(); e.printStackTrace();
......
import java.io.*;
import java.net.*;
import java.lang.Runnable;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Collection;
public class DstoreConnection {
protected Socket socket;
protected BufferedReader reader;
protected PrintWriter writer;
public DstoreConnection(Socket socket) {
this.socket = socket;
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
writer = new PrintWriter(socket.getOutputStream());
}
public String sendAndReceive(String message) {
try {
writer.print(message);
writer.flush();
}
catch(IOException e) {
e.printStackTrace();
return "";
}
return receive();
}
public String receive() {
try {
String returnMessage = reader.readLine();
return returnMessage;
}
catch(IOException e) {
e.printStackTrace();
return "";
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment