Commit 8d006e83 authored by es6g19's avatar es6g19
Browse files

Update client-1.0.2.jar, ClientMain.java, Controller.java,...

Update client-1.0.2.jar, ClientMain.java, Controller.java, ControllerLogger.java, Dstore.java, DstoreLogger.java, Index.java, Logger.java, Protocol.java, Rebalancer.java, Test.java files
parents
import java.io.File;
import java.io.IOException;
import java.util.Random;
public class ClientMain {
public static void main(String[] args) throws Exception{
final int cport = Integer.parseInt(args[0]);
int timeout = Integer.parseInt(args[1]);
File downloadFolder = new File("downloads");
if (!downloadFolder.exists())
if (!downloadFolder.mkdir()) throw new RuntimeException("Cannot create download folder (folder absolute path: " + downloadFolder.getAbsolutePath() + ")");
File uploadFolder = new File("to_store");
if (!uploadFolder.exists())
throw new RuntimeException("to_store folder does not exist");
// testClient(cport, timeout, downloadFolder);
// example to launch a number of concurrent clients, each doing the same operations
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
test2Client(cport, timeout, downloadFolder, uploadFolder);
}
}.start();
}
}
public static void test2Client(int cport, int timeout, File downloadFolder, File uploadFolder) {
Client client = null;
try {
client = new Client(cport, timeout, Logger.LoggingType.ON_FILE_AND_TERMINAL);
client.connect();
Random random = new Random(System.currentTimeMillis() * System.nanoTime());
File fileList[] = uploadFolder.listFiles();
for (int i=0; i<fileList.length/2; i++) {
File fileToStore = fileList[random.nextInt(fileList.length)];
try {
client.store(fileToStore);
} catch (Exception e) {
System.out.println("Error storing file " + fileToStore);
e.printStackTrace();
}
}
String list[] = null;
try { list = list(client); } catch(IOException e) { e.printStackTrace(); }
for (int i = 0; i < list.length/4; i++) {
String fileToRemove = list[random.nextInt(list.length)];
try {
client.remove(fileToRemove);
} catch (Exception e) {
System.out.println("Error remove file " + fileToRemove);
e.printStackTrace();
}
}
try { list = list(client); } catch(IOException e) { e.printStackTrace(); }
} catch(IOException e) {
e.printStackTrace();
} finally {
if (client != null)
try { client.disconnect(); } catch(Exception e) { e.printStackTrace(); }
}
}
public static void testClient(int cport, int timeout, File downloadFolder) {
Client client = null;
try {
client = new Client(cport, timeout, Logger.LoggingType.ON_FILE_AND_TERMINAL);
try { client.connect(); } catch(IOException e) { e.printStackTrace(); return; }
try { list(client); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("Clipboard01.pdf")); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("Clipboard01.pdf")); } catch(IOException e) { e.printStackTrace(); }
try { client.store(new File("Clipboard01.jpg")); } catch(IOException e) { e.printStackTrace(); }
String list[] = null;
try { list = list(client); } catch(IOException e) { e.printStackTrace(); }
if (list != null)
for (String filename : list)
try { client.load(filename, downloadFolder); } catch(IOException e) { e.printStackTrace(); }
/*if (list != null)
for (String filename : list)
try { client.remove(filename); } catch(IOException e) { e.printStackTrace(); }
try { client.remove(list[0]); } catch(IOException e) { e.printStackTrace(); }
try { list(client); } catch(IOException e) { e.printStackTrace(); }*/
} finally {
if (client != null)
try { client.disconnect(); } catch(Exception e) { e.printStackTrace(); }
}
}
public static String[] list(Client client) throws IOException, NotEnoughDstoresException {
System.out.println("Retrieving list of files...");
String list[] = client.list();
System.out.println("Ok, " + list.length + " files:");
int i = 0;
for (String filename : list)
System.out.println("[" + i++ + "] " + filename);
return list;
}
}
import java.io.*;
import java.net.*;
import java.util.*;
public final class Controller { //only one controller therefore final
private static ServerSocket server;
private static int cport, r, timeout, rebalancePeriod ;
private static int dStoreCount; //stores the number of dstores to be counted
private static ArrayList<Integer> dStores;
private static Index INDEX = new Index();
private static int storesUnAck;
private static int removesUnAck;
private static HashMap<Integer,ArrayList<Integer>> invalidPorts;
//is a map for each sockets invalid ports, so we know which socket has tried which to load a given file
private static int socketCount;
public static void main(String[] args){
// instantiating the variables required for the controllers
String _cport = args[0];
String _R = args[1];
String _timeout = args[2];
String _rebalance_period = args[3]; // time in seconds to rebalance the files
dStores = new ArrayList<Integer>();
invalidPorts = new HashMap<Integer,ArrayList<Integer>>();
try{
ControllerLogger.init(Logger.LoggingType.ON_FILE_AND_TERMINAL);
cport = Integer.parseInt(_cport);
r = Integer.parseInt(_R);
timeout = Integer.parseInt(_timeout);
rebalancePeriod = Integer.parseInt(_rebalance_period);
rebalanceTimer(rebalancePeriod);
socketServer();
}catch(IllegalArgumentException e){
ControllerLogger.getInstance().log("ARGUMENTS IN THE INCORRECT FORMAT");
}catch(IOException e){
ControllerLogger.getInstance().log("LOGGER ERROR: "+e.getMessage());
}
}
public static void rebalanceTimer(int intervalInSeconds){
Timer timer = new Timer();
timer.scheduleAtFixedRate(new Rebalancer(),0,intervalInSeconds*1000);
}
public static void rebalance(){
ControllerLogger.getInstance().log("rebalancing");
}
public static boolean isReady(){
if(dStoreCount>=r){
return true;
}else{
return false;
}
}
private static void decodeClientMessage(String msg, int id){
//All client messages start with operation, followed by 0 or more parameters
String[] msgSplit = msg.split(" ");
String operation = msgSplit[0];
String fileName;
//for every operation that isnt a reload, we reset the invalid ports for that socketID
//as it shows the client has been able to continue
switch (operation) {
case Protocol.STORE_TOKEN:
fileName = msgSplit[1];
String fileSize = msgSplit[2];
storeFileToClient(fileName, fileSize);
invalidPorts.get(id).clear();
return;
case Protocol.LOAD_TOKEN:
fileName = msgSplit[1];
loadFile(fileName,id);
invalidPorts.get(id).clear();
return;
case Protocol.RELOAD_TOKEN:
fileName = msgSplit[1];
reloadFile(fileName, id);
return;
case Protocol.REMOVE_TOKEN:
invalidPorts.get(id).clear();
fileName = msgSplit[1];
removeFile(fileName);
return;
case Protocol.LIST_TOKEN:
invalidPorts.get(id).clear();
return;
default:
ControllerLogger.getInstance().log("ERROR FROM CLIENT: Invalid message received");
return;
}
}
private static void decodeDstoreMessage(String msg, int port){
//All client messages start with operation, followed by 0 or more parameters
String[] msgSplit = msg.split(" ");
String operation = msgSplit[0];
switch (operation) {
case Protocol.STORE_ACK_TOKEN:
storeAck();
return;
case Protocol.REMOVE_ACK_TOKEN:
removeAck();
return;
default:
ControllerLogger.getInstance().log("ERROR FROM DSTORE:" + port + "Invalid initial message received");
}
}
private static void connect(BufferedReader inputStream, Socket socket)throws IOException{
String connectMsg = inputStream.readLine();
String[] msgSplit = connectMsg.split(" ");
String type = msgSplit[0];
String msg;
if(type.equals(Protocol.JOIN_TOKEN)){ //connects DSTORE (ie, performs DSTORE protocols)
int port = Integer.parseInt(msgSplit[1]);
dStores.add(port);//adds the port to the array of DStore ports
ControllerLogger.getInstance().dstoreJoined(socket,port);
dStoreCount +=1;
while((msg = inputStream.readLine())!= null){
ControllerLogger.getInstance().messageReceived(socket, msg);
decodeDstoreMessage(msg,port);
};
}else{//connects CLIENT (ie, performs DSTORE protocols)
boolean firstMessage = true;
int socketId = socketCount;
socketCount++;
invalidPorts.put(socketId,new ArrayList<Integer>());
while((msg = inputStream.readLine())!= null||firstMessage){
ControllerLogger.getInstance().messageReceived(socket, msg);
if(isReady()) {
decodeClientMessage(msg, socketId);
}else{
sendMsgToClient(Protocol.ERROR_NOT_ENOUGH_DSTORES_TOKEN);
}
firstMessage= false;
};
}
}
public static void socketServer(){
try{
ControllerLogger.getInstance().log("Starting Server on "+cport);
server = new ServerSocket(cport);
for(;;){
try{final Socket client = server.accept();
new Thread(new Runnable(){
public void run(){try{
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
connect(in, client);
client.close();
}catch(IOException e){}
}
}).start();
}catch(Exception e){ControllerLogger.getInstance().log("ERROR "+e);}
}
}catch(Exception e){ControllerLogger.getInstance().log("ERROR "+e);}
}
public static void sendMsg(String msg, int port){
try{
Socket socket = new Socket("localhost",port);
PrintWriter out = new PrintWriter(socket.getOutputStream());
out.println(msg);
out.flush();
ControllerLogger.getInstance().messageSent(socket, msg);
socket.close();
}catch(IOException e){
ControllerLogger.getInstance().log("ERROR SENDING MESSAGE TO"+port+" ERROR:" + e);
}
}
public static void sendMsgToClient(String msg){
sendMsg(msg,cport);
}
public static void sendMsgToAllDstores(String msg){
for(int port: dStores){
sendMsg(msg,port);
}
}
/** INDIVIDUAL MESSAGE FUNCTIONS**/
public static void storeFileToClient(String fileName, String fileSize){
if(INDEX.containsFile(fileName)){
sendMsgToClient(Protocol.ERROR_FILE_ALREADY_EXISTS_TOKEN);
return;
}
while((INDEX.getStatus().contentEquals("store in progress")||INDEX.getStatus().contentEquals("remove in progress"))) {
//DO NOTHING
}
String msg = Protocol.STORE_TO_TOKEN;
INDEX.setStatus("store in progress");
for(int x = 0; x<r;x++){
int port = dStores.get(x);
INDEX.storeFile(port,fileName,fileSize);
msg += (" " + port);
}
storesUnAck = r;
sendMsgToClient(msg);
}
public static void storeAck(){
storesUnAck--;
if(storesUnAck == 0){
INDEX.setStatus("store complete");
sendMsgToClient(Protocol.STORE_COMPLETE_TOKEN);
}
};
public static void removeAck(){
removesUnAck--;
if(removesUnAck == 0){
INDEX.setStatus("remove complete");
sendMsgToClient(Protocol.REMOVE_COMPLETE_TOKEN);
}
};
public static void loadFile(String filename, int socketId){
int dstorePort = INDEX.getDstorePortWithFile(filename);
if(dstorePort == -1){
sendMsgToClient(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
return;
}else{
invalidPorts.get(socketId).add(dstorePort);
sendMsgToClient(Protocol.LOAD_FROM_TOKEN+" "+ dstorePort + " " + INDEX.getFileSize(filename));
}
}
public static void reloadFile(String filename, int socketId){
int dstorePort = INDEX.getDstorePortWithFile(filename,invalidPorts.get(socketId));
if(dstorePort == -1){
sendMsgToClient(Protocol.ERROR_LOAD_TOKEN);
return;
}else{
sendMsgToClient(Protocol.LOAD_FROM_TOKEN+" "+ dstorePort + " " + INDEX.getFileSize(filename));
}
}
public static void removeFile(String filename){
INDEX.setStatus("remove in progress");
removesUnAck = r;
ArrayList<Integer> ports = INDEX.getDStoresWithFileAndRemove(filename);
if(ports.size() == 0){
sendMsgToClient(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN);
return;
}
String msg = Protocol.REMOVE_TOKEN+" "+filename;
for(int port: ports){
sendMsg(msg,port);
}
}
public static void listFiles(){
ArrayList<String> filenames = INDEX.getFileNames();
String msg = Protocol.LIST_TOKEN;
for(String filename: filenames){
msg += " "+filename;
}
sendMsgToClient(msg);
}
}
import java.io.IOException;
import java.net.Socket;
public class ControllerLogger extends Logger {
private static final String LOG_FILE_SUFFIX = "controller";
private static ControllerLogger instance = null;
public static void init(LoggingType loggingType) throws IOException {
if (instance == null)
instance = new ControllerLogger(loggingType);
else
throw new IOException("ControllerLogger already initialised");
}
public static ControllerLogger getInstance() {
if (instance == null)
throw new RuntimeException("ControllerLogger has not been initialised yet");
return instance;
}
protected ControllerLogger(LoggingType loggingType) throws IOException {
super(loggingType);
}
@Override
protected String getLogFileSuffix() {
return LOG_FILE_SUFFIX;
}
public void dstoreJoined(Socket socket, int dstorePort) {
log("[New Dstore " + dstorePort + " " + socket.getLocalPort() + "<-" + socket.getPort() + "]");
}
}
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Scanner;
public class Dstore {
private int port, cport, timeout;
private String file_folder;
private Socket toControllerSocket;
private ServerSocket fromSocket;
public static void main(String[] args){
try{
int port = Integer.parseInt(args[0]);
int cport = Integer.parseInt(args[1]);
int timeout = Integer.parseInt(args[2]);
String file_folder = args[3];
System.out.println("Creating new Dstore");
new Thread(new Runnable(){
public void run() {
Dstore dstore = new Dstore(port, cport,timeout,file_folder);
}
}).start();
}catch(IllegalArgumentException e){
System.out.println(e.getMessage());
}
}
public Dstore(int port, int cport, int timeout, String file_folder){
this.port = port; this. cport = cport; this.timeout = timeout; this.file_folder = file_folder;
try{
DstoreLogger.init(Logger.LoggingType.ON_FILE_AND_TERMINAL, port);
System.out.println("Creating new socket");
toControllerSocket = new Socket("localhost",cport);
fromSocket = new ServerSocket(port);
sendToController(Protocol.JOIN_TOKEN + " "+ port);
connectServer();
}catch(Exception e){
DstoreLogger.getInstance().log(e.getMessage());
}
}
public void connectServer(){
for(;;) {
try {
final Socket client = fromSocket.accept();
connect(client);
client.close();
} catch (Exception e) {
DstoreLogger.getInstance().log(e.getMessage());
}
}
}
public void connect(Socket client) throws IOException {
String msg;
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
while((msg = in.readLine())!= null){
final String fmsg = msg;
new Thread(new Runnable(){
public void run() {
DstoreLogger.getInstance().messageReceived(client, fmsg);
onMessage(fmsg, client);
}
}).start();
}
}
public void onMessage(String message, Socket client){
String[] msgSplit = message.split(" ");
String operation = msgSplit[0];
switch (operation) {
case Protocol.STORE_TOKEN:
Store(msgSplit[1], Integer.parseInt(msgSplit[2]), client);
break;
case Protocol.LOAD_DATA_TOKEN:
Load(msgSplit[1],client);
break;
case Protocol.REMOVE_TOKEN:
Remove(msgSplit[1]);
break;
case Protocol.REBALANCE_TOKEN:
break;
default:
System.out.println(message);
break;
}
}
public void Store(String filename, int filesize,Socket client){
try{
sendMsgToClient(Protocol.ACK_TOKEN);
File dstoreFile = new File(filename + port+".txt");
try (FileOutputStream outputStream = new FileOutputStream(dstoreFile)) {
outputStream.write(client.getInputStream().readNBytes(filesize));
}
sendToController(Protocol.STORE_ACK_TOKEN+" "+ filename);
}catch(IOException e){
DstoreLogger.getInstance().log(e.getMessage());
}
}
public void sendMsgToClient(String msg){
sendMsg(msg,port);
}
public static void sendMsg(String msg, int port){
try{
Socket socket = new Socket("localhost",port);
PrintWriter out = new PrintWriter(socket.getOutputStream());
out.println(msg);
out.flush();
socket.close();
DstoreLogger.getInstance().messageSent(socket, msg);
}catch(IOException e){
DstoreLogger.getInstance().log(e.getMessage());
}
}
public void sendToController(String serialisedMessage){
try{
PrintWriter out = new PrintWriter(toControllerSocket.getOutputStream());
out.println(serialisedMessage); out.flush();
DstoreLogger.getInstance().messageSent(toControllerSocket, serialisedMessage);
}catch(IOException e){
DstoreLogger.getInstance().log(e.getMessage());
}
}
public void Load(String filename, Socket socket){
try{
File file = new File(filename + port+".txt");
if(!file.exists()){
socket.close();
return;
}
Scanner myReader = new Scanner(file);
while (myReader.hasNextLine()) {
String msg = myReader.nextLine();
socket.getOutputStream().write(msg.getBytes());
}
myReader.close();
socket.getOutputStream().flush();
socket.getOutputStream().close();
socket.close();
}catch(IOException e){
DstoreLogger.getInstance().log(e.getMessage());
}
}
public void Remove(String filename) {
File file = new File(filename + port+".txt");
if(!file.exists()){
sendToController(Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN +" "+filename);
return;
}
if(file.delete())
{
sendToController(Protocol.REMOVE_ACK_TOKEN);
}