Skip to content
Snippets Groups Projects
Commit c239b089 authored by dl3g19's avatar dl3g19
Browse files

Different input/output objects used. Client jar finally arrived

parent e10ec28f
No related branches found
No related tags found
No related merge requests found
import java.io.File;
import java.io.IOException;
public class ClientMain {
public static void main(String[] args) {
int cport = -1;
int timeout = -1;
try {
// parse arguments
cport = Integer.parseInt(args[0]);
timeout = Integer.parseInt(args[1]);
} catch (NumberFormatException e) {
System.err.println("Error parsing arguments: " + e.getMessage());
System.err.println("Expected: java ClientMain cport timeout");
System.exit(-1);
}
File downloadFolder = new File("downloads");
if (!downloadFolder.exists())
if (!downloadFolder.mkdir()) throw new RuntimeException("Cannot create download folder (folder absolute path: " + downloadFolder.getAbsolutePath() + ")");
testClient(cport, timeout, downloadFolder);
// example to launch a number of concurrent clients, each doing the same operations
/*for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
testClient(cport, timeout, downloadFolder);
}
}.start();
}*/
}
public static void testClient(int cport, int timeout, File downloadFolder) {
Client client = null;
try {
client = new Client(cport, timeout, Logger.LoggingType.ON_FILE_AND_TERMINAL);
try { client.connect(); } catch(IOException e) { e.printStackTrace(); return; }
try { list(client); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("Clipboard01.pdf")); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("Clipboard01.pdf")); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("Clipboard01.jpg")); } catch(IOException e) { e.printStackTrace(); }
String list[] = null;
try { list = list(client); } catch(IOException e) { e.printStackTrace(); }
if (list != null)
for (String filename : list)
try { client.load(filename, downloadFolder); } catch(IOException e) { e.printStackTrace(); }
if (list != null)
for (String filename : list)
try { client.remove(filename); } catch(IOException e) { e.printStackTrace(); }
try { list(client); } catch(IOException e) { e.printStackTrace(); }
} finally {
if (client != null)
try { client.disconnect(); } catch(Exception e) { e.printStackTrace(); }
}
}
public static String[] list(Client client) throws IOException, NotEnoughDstoresException {
System.out.println("Retrieving list of files...");
String list[] = client.list();
System.out.println("Ok, " + list.length + " files:");
int i = 0;
for (String filename : list)
System.out.println("[" + i++ + "] " + filename);
return list;
}
}
...@@ -6,6 +6,7 @@ import java.util.ArrayList; ...@@ -6,6 +6,7 @@ import java.util.ArrayList;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
public class Controller { public class Controller {
protected int cport; //Port to listen on protected int cport; //Port to listen on
...@@ -23,7 +24,7 @@ public class Controller { ...@@ -23,7 +24,7 @@ public class Controller {
public IndexEntry() { public IndexEntry() {
filesize = -1; filesize = -1;
storedBy = new SyncList(); storedBy = Collections.synchronizedList(new ArrayList<Integer>());
numberToStore = 0; numberToStore = 0;
status = "store in progress"; status = "store in progress";
storeAckLock = new Object(); storeAckLock = new Object();
...@@ -90,7 +91,7 @@ public class Controller { ...@@ -90,7 +91,7 @@ public class Controller {
} }
} }
protected Map<Integer,Socket> dstores; protected Map<Integer,DstoreConnection> dstores;
protected Map<Integer,String[]> rebalanceMessages; protected Map<Integer,String[]> rebalanceMessages;
protected Map<String,IndexEntry> index; protected Map<String,IndexEntry> index;
...@@ -99,7 +100,7 @@ public class Controller { ...@@ -99,7 +100,7 @@ public class Controller {
this.rFactor = rFactor; this.rFactor = rFactor;
this.timeout = timeout; this.timeout = timeout;
this.rebalancePeriod = rebalancePeriod; this.rebalancePeriod = rebalancePeriod;
dstores = Collections.synchronizedMap(new HashMap<Integer,Socket>()); dstores = Collections.synchronizedMap(new HashMap<Integer,DstoreConnection>());
index = new HashMap<String,IndexEntry>(); index = new HashMap<String,IndexEntry>();
} }
...@@ -132,8 +133,9 @@ public class Controller { ...@@ -132,8 +133,9 @@ public class Controller {
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String[] message = in.readLine().split(" "); String[] message = in.readLine().split(" ");
if(dstores.size() < rFactor && !message[0].equals("JOIN")) { if(dstores.size() < rFactor && !message[0].equals("JOIN")) {
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(client.getOutputStream())); PrintWriter out = new PrintWriter(client.getOutputStream());
out.write("ERROR"); out.println("ERROR");
out.flush();
out.close(); out.close();
} }
else { else {
...@@ -155,7 +157,7 @@ public class Controller { ...@@ -155,7 +157,7 @@ public class Controller {
void handleMessage(String[] message, Socket client) throws Exception { void handleMessage(String[] message, Socket client) throws Exception {
if(message[0].equals("JOIN")) { if(message[0].equals("JOIN")) {
dstores.add(Integer.parseInt(message[1]), client); dstores.put(Integer.parseInt(message[1]), new DstoreConnection(client));
System.out.println("Dstore at " + message[1] + " joined"); System.out.println("Dstore at " + message[1] + " joined");
rebalance(); rebalance();
} }
...@@ -175,13 +177,7 @@ public class Controller { ...@@ -175,13 +177,7 @@ public class Controller {
list(client); list(client);
} }
else { else {
for(String name : message) { //Log error
if(!index.containsKey(name)) {
//Log error and continue (throw exception?)
return;
}
}
receiveDstoreList(client, message);
} }
} }
...@@ -190,7 +186,7 @@ public class Controller { ...@@ -190,7 +186,7 @@ public class Controller {
try { try {
if(index.containsKey(filename)) { if(index.containsKey(filename)) {
PrintWriter out = new PrintWriter(client.getOutputStream()); PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("ERROR ALREADY_EXISTS " + filename); out.println("ERROR ALREADY_EXISTS " + filename);
out.flush(); out.flush();
out.close(); out.close();
return; return;
...@@ -210,10 +206,10 @@ public class Controller { ...@@ -210,10 +206,10 @@ public class Controller {
//Send STORE_TO message //Send STORE_TO message
PrintWriter out = new PrintWriter(client.getOutputStream()); PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("STORE_TO"); out.println("STORE_TO");
out.flush(); out.flush();
for(int port : storesToStore) { for(int port : storesToStore) {
out.print(" " + port); out.println(" " + port);
new Thread(() -> { new Thread(() -> {
String[] message = dstores.get(Integer.valueOf(port)).receive().split(" "); String[] message = dstores.get(Integer.valueOf(port)).receive().split(" ");
if(message[0].equals("STORE_ACK")) { if(message[0].equals("STORE_ACK")) {
...@@ -241,7 +237,7 @@ public class Controller { ...@@ -241,7 +237,7 @@ public class Controller {
entry.status = "store complete"; entry.status = "store complete";
//Send STORE_COMPLETE message //Send STORE_COMPLETE message
out.print("STORE_COMPLETE"); out.println("STORE_COMPLETE");
out.flush(); out.flush();
out.close(); out.close();
} }
...@@ -266,7 +262,7 @@ public class Controller { ...@@ -266,7 +262,7 @@ public class Controller {
try { try {
if(!index.containsKey(filename)) { if(!index.containsKey(filename)) {
PrintWriter out = new PrintWriter(client.getOutputStream()); PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("ERROR DOES_NOT_EXIST"); out.println("ERROR DOES_NOT_EXIST");
out.flush(); out.flush();
out.close(); out.close();
return; return;
...@@ -279,24 +275,24 @@ public class Controller { ...@@ -279,24 +275,24 @@ public class Controller {
//Send LOAD_FROM message //Send LOAD_FROM message
PrintWriter out = new PrintWriter(client.getOutputStream()); PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("LOAD_FROM " + thisStore + " " + thisSize); out.println("LOAD_FROM " + thisStore + " " + thisSize);
out.flush(); out.flush();
Reloader reloadLock = new Object(); Reloader reloadLock = new Reloader();
thisEntry.getLoadList().add(reloadLock); thisEntry.getLoadList().add(reloadLock);
int trials = 0; int trials = 0;
while(true) { while(true) {
reloadLock.wait(10 * timeout); reloadLock.wait(10 * timeout);
trials ++; trials ++;
if(trials >= rFactor || !reloadLock.reload) break; if(trials >= rFactor || !reloadLock.reload) break;
out.print("LOAD_FROM " + thisEntry.storedBy.get(trials).intValue() + " " + thisSize); out.println("LOAD_FROM " + thisEntry.storedBy.get(trials).intValue() + " " + thisSize);
out.flush(); out.flush();
reloadLock.reload = false; reloadLock.reload = false;
} }
thisEntry.getLoadList().remove(reloadLock); thisEntry.getLoadList().remove(reloadLock);
if(trials >= rFactor && reloadLock.reload) { if(trials >= rFactor && reloadLock.reload) {
out.print("ERROR LOAD"); out.println("ERROR LOAD");
out.flush(); out.flush();
} }
...@@ -327,7 +323,7 @@ public class Controller { ...@@ -327,7 +323,7 @@ public class Controller {
try { try {
if(!index.containsKey(filename)) { if(!index.containsKey(filename)) {
PrintWriter clientOut = new PrintWriter(client.getOutputStream()); PrintWriter clientOut = new PrintWriter(client.getOutputStream());
clientOut.print("ERROR DOES_NOT_EXIST"); clientOut.println("ERROR DOES_NOT_EXIST");
clientOut.flush(); clientOut.flush();
clientOut.close(); clientOut.close();
return; return;
...@@ -367,7 +363,7 @@ public class Controller { ...@@ -367,7 +363,7 @@ public class Controller {
//Send REMOVE_COMPLETE to client //Send REMOVE_COMPLETE to client
PrintWriter clientOut = new PrintWriter(client.getOutputStream()); PrintWriter clientOut = new PrintWriter(client.getOutputStream());
clientOut.print("REMOVE_COMPLETE"); clientOut.println("REMOVE_COMPLETE");
clientOut.flush(); clientOut.flush();
clientOut.close(); clientOut.close();
} }
...@@ -427,11 +423,11 @@ public class Controller { ...@@ -427,11 +423,11 @@ public class Controller {
} }
} }
Map<Integer,List<String>> newAlloc = new HashMap<Integer,String[]>(); Map<Integer,List<String>> newAlloc = new HashMap<Integer,List<String>>();
int pos = 0; int pos = 0;
int storeSize = Math.ceiling((fileList.size() * rFactor) / dstores.size()); int storeSize = Math.ceiling((fileList.size() * rFactor) / dstores.size());
for(Integer i : dstoreFiles.keySet()) { for(Integer i : dstoreFiles.keySet()) {
newAlloc.add(i, new ArrayList<String>(storeSize)); newAlloc.put(i, new ArrayList<String>(storeSize));
} }
for(String file : fileList) { for(String file : fileList) {
for(int j=0; j<rFactor; j++) { for(int j=0; j<rFactor; j++) {
...@@ -479,7 +475,7 @@ public class Controller { ...@@ -479,7 +475,7 @@ public class Controller {
} }
} }
rebalanceMessages.add(port, list); rebalanceMessages.put(port, list);
if(rebalanceMessages.size() == dstores.size()) { if(rebalanceMessages.size() == dstores.size()) {
rebalanceMessages.notify(); rebalanceMessages.notify();
} }
......
...@@ -49,7 +49,7 @@ public class Dstore { ...@@ -49,7 +49,7 @@ public class Dstore {
Socket socket = new Socket(InetAddress.getLocalHost(), cport); Socket socket = new Socket(InetAddress.getLocalHost(), cport);
controllerIn = new BufferedReader(new InputStreamReader(socket.getInputStream())); controllerIn = new BufferedReader(new InputStreamReader(socket.getInputStream()));
controllerOut = new PrintWriter(socket.getOutputStream()); controllerOut = new PrintWriter(socket.getOutputStream());
controllerOut.print("JOIN " + port); controllerOut.println("JOIN " + port);
controllerOut.flush(); controllerOut.flush();
new Thread(() -> { new Thread(() -> {
...@@ -111,25 +111,24 @@ public class Dstore { ...@@ -111,25 +111,24 @@ public class Dstore {
try { try {
//Send ACK message to client //Send ACK message to client
PrintWriter out = new PrintWriter(client.getOutputStream()); PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("ACK"); out.println("ACK");
out.flush(); out.flush();
out.close(); out.close();
FileOutputStream writer = new FileOutputStream(fileFolder + "/" + filename, false); FileOutputStream writer = new FileOutputStream(fileFolder + "/" + filename, false);
InputStream reader = client.getInputStream();
//Receive + write file content from client //Receive + write file content from client
int byteCount = filesize; byte[] nextLine = new byte[8];
while(byteCount > 0) { while(reader.readNBytes(nextLine, 0, 8) > 0) {
byte[] nextLine = in.readLine().getBytes();
writer.write(nextLine); writer.write(nextLine);
writer.flush(); writer.flush();
byteCount -= nextLine.length;
} }
writer.close(); writer.close();
//Send STORE_ACK message to the Controller //Send STORE_ACK message to the Controller
PrintWriter controllerOut = new PrintWriter(new Socket(InetAddress.getLocalHost(), cport).getOutputStream()); PrintWriter controllerOut = new PrintWriter(new Socket(InetAddress.getLocalHost(), cport).getOutputStream());
controllerOut.print("STORE_ACK " + filename); controllerOut.println("STORE_ACK " + filename);
controllerOut.flush(); controllerOut.flush();
controllerOut.close(); controllerOut.close();
...@@ -152,20 +151,22 @@ public class Dstore { ...@@ -152,20 +151,22 @@ public class Dstore {
reader = new FileInputStream(fileFolder + "/" + filename); reader = new FileInputStream(fileFolder + "/" + filename);
} }
catch(FileNotFoundException e) { catch(FileNotFoundException e) {
out.print("ERROR DOES_NOT_EXIST"); out.println("ERROR DOES_NOT_EXIST");
out.flush(); out.flush();
out.close(); out.close();
return; return;
} }
OutputStream contentOut = client.getOutputStream();
byte[] buf = new byte[8]; byte[] buf = new byte[8];
while(reader.read(buf) != -1) { while(reader.read(buf) != -1) {
out.print(new String(buf)); contentOut.write(new String(buf));
out.flush(); contentOut.flush();
} }
reader.close(); reader.close();
out.close(); out.close();
contentOut.close();
} }
catch(IOException e) { catch(IOException e) {
e.printStackTrace(); e.printStackTrace();
...@@ -181,11 +182,11 @@ public class Dstore { ...@@ -181,11 +182,11 @@ public class Dstore {
if(Files.deleteIfExists(path)) { if(Files.deleteIfExists(path)) {
//Send REMOVE_ACK message to client (the controller) //Send REMOVE_ACK message to client (the controller)
controllerOut.print("REMOVE_ACK"); controllerOut.println("REMOVE_ACK");
} }
else { else {
//Send DOES NOT EXIST error //Send DOES NOT EXIST error
controllerOut.print("ERROR DOES_NOT_EXIST " + filename); controllerOut.println("ERROR DOES_NOT_EXIST " + filename);
} }
controllerOut.flush(); controllerOut.flush();
...@@ -201,7 +202,7 @@ public class Dstore { ...@@ -201,7 +202,7 @@ public class Dstore {
try { try {
//Send a list of all files in fileFolder to client (the controller) //Send a list of all files in fileFolder to client (the controller)
for(File file : new File(fileFolder).listFiles()) { for(File file : new File(fileFolder).listFiles()) {
controllerOut.print(file.getName()); controllerOut.println(file.getName());
controllerOut.flush(); controllerOut.flush();
} }
} }
...@@ -215,26 +216,27 @@ public class Dstore { ...@@ -215,26 +216,27 @@ public class Dstore {
new Thread(() -> { new Thread(() -> {
try { try {
//Interpret files to send and files to remove from the message //Interpret files to send and files to remove from the message
Map<String,Integer[]> filesToSend; Map<Integer,List<String>> filesToSend;
String[] filesToRemove; String[] filesToRemove;
int index; int index;
int numberToSend = Integer.parseInt(message[1]); int numberToSend = Integer.parseInt(message[1]);
index = 2; index = 2;
filesToSend = new HashMap<String,Integer[]>(numberToSend); filesToSend = new HashMap<Integer,List<String>>();
for(int i=0; i<numberToSend; i++) { for(int i=0; i<numberToSend; i++) {
String name = message[index]; String name = message[index];
index++; index++;
int numberOfReceivers = Integer.parseInt(message[index]); int numberOfReceivers = Integer.parseInt(message[index]);
index++; index++;
Integer[] receivers = new Integer[numberOfReceivers];
for(int j=0; j<numberOfReceivers; j++) { for(int j=0; j<numberOfReceivers; j++) {
receivers[j] = Integer.parseInt(message[index]); Integer receiver = Integer.parseInt(message[index]);
if(!filesToSend.containsKey(receiver)) {
filesToSend.put(receiver,new ArrayList<String>());
}
filesToSend.get(receiver).add(name);
index++; index++;
} }
filesToSend.put(name, receivers);
} }
int numberToRemove = Integer.parseInt(message[index]); int numberToRemove = Integer.parseInt(message[index]);
...@@ -246,36 +248,37 @@ public class Dstore { ...@@ -246,36 +248,37 @@ public class Dstore {
} }
//Send each file to send to the Dstore at the specified port number //Send each file to send to the Dstore at the specified port number
for(String filename : filesToSend.keySet()) { for(Integer dstore : filesToSend.keySet()) {
for(Integer dstore : filesToSend.get(filename)) { for(String filename : filesToSend.get(dstore)) {
try { new Thread(() -> {
Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); try {
PrintWriter out = new PrintWriter(socket.getOutputStream()); Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue());
out.write("STORE " + filename + " " + fileSizes.get(filename)); PrintWriter out = new PrintWriter(socket.getOutputStream());
out.flush(); out.println("STORE " + filename + " " + fileSizes.get(filename));
out.flush();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out.close();
if(!in.readLine().equals("ACK")) {
//Log error BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
} if(!in.readLine().equals("ACK")) {
String line; //Log error
while(line = new BufferedReader(new FileInputStream(fileFolder + "/" + filename))) {
if(line == null) {
out.write("");
out.flush();
break;
} }
else { in.close();
out.write(line);
out.flush(); byte[] content = new byte[8];
FileInputStream fileIn = new FileInputStream(fileFolder + "/" + filename);
OutputStream fileOut = client.getOutputStream();
while(fileIn.read(content) > 0) {
fileOut.write(content);
fileOut.flush();
} }
fileIn.close();
fileOut.close();
socket.close();
} }
out.close(); catch(IOException e) {
socket.close(); e.printStackTrace();
} }
catch(IOException e) { }).start();
e.printStackTrace();
}
} }
} }
......
...@@ -19,18 +19,26 @@ public class DstoreConnection { ...@@ -19,18 +19,26 @@ public class DstoreConnection {
} }
public String sendAndReceive(String message) { public String sendAndReceive(String message) {
try { synchronized(this) {
writer.print(message); try {
writer.flush(); writer.println(message);
} writer.flush();
catch(IOException e) { }
e.printStackTrace(); catch(IOException e) {
return ""; e.printStackTrace();
return "";
}
return localReceive();
} }
return receive();
} }
public String receive() { public String receive() {
synchronized(this) {
return localReceive();
}
}
protected String localReceive() {
try { try {
String returnMessage = reader.readLine(); String returnMessage = reader.readLine();
return returnMessage; return returnMessage;
......
File added
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment