Skip to content
Snippets Groups Projects
Commit e3df9a8e authored by dl3g19's avatar dl3g19
Browse files

Fixing bugs with the new implementation

parent 8124d07c
Branches
No related tags found
No related merge requests found
......@@ -323,7 +323,7 @@ public class Controller {
}
else {
//Log error
System.err.println("Dstore " + thisStore + " timed out receiving STORE_ACK");
System.err.println("Dstore " + thisStore + " timed out receiving STORE_ACK for " + filename);
}
}
catch(DstoreDisconnectException e) {
......@@ -545,7 +545,7 @@ public class Controller {
Thread thisThread = new Thread(() -> {
try {
//String[] message = dstores.get(dstore).sendAndReceive(Protocol.LIST_TOKEN).split(" ");
String message = dstores.get(dstore).sendAndReceive(Protocol.LIST_TOKEN);
String message = dstores.get(dstore).sendAndReceive(Protocol.LIST_TOKEN, Protocol.LIST_TOKEN);
if(message != null) {
receiveDstoreList(dstore.intValue(), message, dstoreFilesR, listLatch);
}
......@@ -635,8 +635,9 @@ public class Controller {
void receiveDstoreList(int port, String list, Map<Integer,List<String>> dstoreFiles, CountDownLatch latch) {
List<String> toList = new ArrayList<String>();
if(!list.equals("")) {
for(String file : list.split(" ")) {
toList.add(file);
String[] files = list.split(" ");
for(int i=1; i<files.length; i++) {
toList.add(files[i]);
}
}
......
......@@ -261,12 +261,12 @@ public class Dstore {
void list() throws Exception {
new Thread(() -> {
//Send a list of all files in fileFolder to client (the controller)
String message = "";
String message = Protocol.LIST_TOKEN;
for(File file : fileFolder.listFiles()) {
message = message + " " + file.getName();
}
synchronized(controllerOut) {
controllerOut.println(message.trim());
controllerOut.println(message);
}
}).start();
}
......
......@@ -12,7 +12,7 @@ import java.util.Iterator;
import java.util.Arrays;
public class DstoreConnection {
protected final int MAX_QUEUE_SIZE = 50;
protected final int MAX_QUEUE_SIZE = 20;
protected Socket socket;
protected int port;
......@@ -20,21 +20,19 @@ public class DstoreConnection {
protected PrintWriter writer;
protected boolean available;
protected boolean disconnectThrown;
protected List<String> queue;
protected final List<String> TOKENS;
protected List<Message> queue;
protected int timeout;
public DstoreConnection(Socket socket, int port, int timeout) {
this.socket = socket;
this.port = port;
this.timeout = timeout;
TOKENS = Arrays.asList(Protocol.ACK_TOKEN, Protocol.STORE_ACK_TOKEN, Protocol.REMOVE_ACK_TOKEN, Protocol.JOIN_TOKEN, Protocol.REBALANCE_STORE_TOKEN, Protocol.REBALANCE_COMPLETE_TOKEN);
try {
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
writer = new PrintWriter(socket.getOutputStream(), true);
available = true;
disconnectThrown = false;
queue = new ArrayList<String>();
queue = new ArrayList<Message>();
new Thread(new Receiver()).start();
}
catch(IOException e) {
......@@ -47,6 +45,14 @@ public class DstoreConnection {
}
}
protected class Message {
public boolean taken = false;
public String string;
public Message(String string) {
this.string = string;
}
}
public int getPort() {
return port;
}
......@@ -73,40 +79,40 @@ public class DstoreConnection {
protected void enqueue(String message) {
synchronized(queue) {
queue.add(0, message);
queue.add(0, new Message(message));
if(queue.size() > MAX_QUEUE_SIZE) queue.remove(queue.size() - 1);
}
}
//Check the queue for the message before trying to receive any new messages
protected String checkQueue(String[] expectedMessages) {
Iterator<String> it;
List<Message> queueState;
synchronized(queue) {
it = queue.iterator();
queueState = new ArrayList<Message>(queue);
}
try {
while(it.hasNext()) {
String message = it.next();
if(isExpected(message, expectedMessages)) {
synchronized(queue) {it.remove();}
return message;
for(Message message : queueState) {
if(isExpected(message.string, expectedMessages)) {
synchronized(message) {
if(message.taken) continue;
message.taken = true;
synchronized(queue) {queue.remove(message);}
return message.string;
}
}
}
}
catch(Exception e) {}
catch(Exception e) {System.err.println(port + ": queue modified, must restart search");}
return null;
}
protected boolean isExpected(String message, String[] expectedMessages) {
if(expectedMessages.length == 0) {
return !TOKENS.contains(message);
}
else {
for(String s : expectedMessages) {
if(s.equals(message)) return true;
for(String s : expectedMessages) {
if(s.equals(Protocol.LIST_TOKEN)) {
if(message.split(" ")[0].equals(Protocol.LIST_TOKEN)) return true;
}
else if(s.equals(message)) return true;
}
return false;
......
This diff is collapsed.
to_store/GameDotCom.jpg

1010 KiB

File deleted
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment