Skip to content
Snippets Groups Projects
Server.java 14.21 KiB
import java.io.*;
import java.net.*;
import java.nio.CharBuffer;
import java.util.*;

public class Server {

    private final Controller controller;
    public static FileSystem fileSystem;
    public static int pos;
    public static boolean rebalancing;
    public Timer timer;

    public Server(Controller controller, FileSystem fileSystem) {
        this.controller = controller;
        Server.fileSystem = fileSystem;
        rebalancing = false;

        // do the rebalancing operation every rebalance_period seconds with a delay of rebalance_period seconds
        runRebalancing(controller.getRebalance_period());
    }

    public static FileSystem getFileSystem() {
        return fileSystem;
    }

    private void runRebalancing(long delay) {
        if (timer != null) {
            timer.cancel();
            timer.purge();
        }

        timer = new Timer("RebalanceOperation");

        TimerTask timerTask = new TimerTask() {
            public void run() {
                try {
                    new Rebalance(fileSystem, controller).handle();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        timer.scheduleAtFixedRate(timerTask, delay, controller.getRebalance_period());
    }

    public void handleClient(Socket client) throws IOException {
        OutputStream out = client.getOutputStream();
        InputStream in = client.getInputStream();
        pos = 0;

        BufferedReader req = new BufferedReader(new InputStreamReader(in));
        PrintWriter res = new PrintWriter(new OutputStreamWriter(out));

        System.out.println("---------NEW CONNECTION---------");

        String line;
        while((line = req.readLine()) != null) {
            String[] tokens = line.split(" ");
            String command = tokens[0];

            if (!rebalancing && command.equals("JOIN")) {

                int publicPort = Integer.parseInt(tokens[1]);
                FSStore fsStore = new FSStore(client, publicPort);
                fileSystem.addDstore(client.getPort(), fsStore);

                ControllerLogger.getInstance().dstoreJoined(client, publicPort);
                runRebalancing(0);

            } else if (fileSystem.getDstores().size() < controller.getR()) {

                ControllerLogger.getInstance().messageSent(client, "ERROR_NOT_ENOUGH_DSTORES");
                res.println("ERROR_NOT_ENOUGH_DSTORES");
                res.flush();

            } else {
                if (fileSystem.getDstores().containsKey(client.getPort())) {
                    if(command.equals("STORE_ACK")) {
                        ControllerLogger.getInstance().messageReceived(client, "STORE_ACK");
                        handleStoreACK(tokens);
                    } else if (command.equals("REMOVE_ACK")) {
                        ControllerLogger.getInstance().messageReceived(client, "REMOVE_ACK");
                        handleRemoveACK(tokens);
                    }
                    else if(command.equals("LIST")) {

                        ControllerLogger.getInstance().messageReceived(client, "LIST");
                        // get the files of the dstore when rebalancing
                        List<String> filenames = new ArrayList<>();
                        for(int i = 1; i < tokens.length; i++) {
                            filenames.add(tokens[i]);
                        }

                        fileSystem.addRebalancingList(fileSystem.
                                        getDstores().
                                        get(client.getPort()).getConnection().getPort(),
                                        filenames);

                    } else if (command.equals("REBALANCE_COMPLETE")) {
                        ControllerLogger.getInstance().messageReceived(client, "REBALANCE_COMPLETE");
                        System.out.println("Rebalance done for " + fileSystem.getDstores().get(client.getPort()).getPublicPort());

                    } else if (command.equals("ERROR_FILE_DOES_NOT_EXIST")) {
                        ControllerLogger.getInstance().messageReceived(client, "ERROR_FILE_DOES_NOT_EXIST");
                    } else {
                        System.out.println("Unknown command " + command);
                    }
                } else if (!rebalancing) {

                    switch (command) {
                        case "STORE" -> handleStore(client, tokens, line);
                        case "LOAD" -> handleLoad(client, tokens, line);
                        case "RELOAD" -> handleReload(client, tokens, line);
                        case "REMOVE" -> handleRemove(client, tokens, line);
                        case "LIST" -> handleList(client, line);
                        default -> System.out.println("Unknown command " + command);
                    }

                } else if (rebalancing) {
                    System.out.println("Rebalancing...");
                }
            }
        }
    }

    /**
     * @param client
     * @param tokens gets the filename and the filesize
     * @throws IOException
     */
    private void handleStore(Socket client, String[] tokens, String line) throws IOException {

        ControllerLogger.getInstance().messageReceived(client, line);

        try {
            String filename = tokens[1];
            int filesize = Integer.parseInt(tokens[2]);
            PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));

            if(fileSystem.getStore().containsKey(filename)) {
                ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_ALREADY_EXISTS");
                res.println("ERROR_FILE_ALREADY_EXISTS");
                res.flush();
                return;
            }

            // update index to store in progress
            fileSystem.addIndex(filename, "store in progress");

            // select R Dstores and create a string of all of their endpoints
            String msg = "";
            int i = 0;
            List<FSStore> temp = new ArrayList<>();
            List<FSStore> sorted = new ArrayList<>(fileSystem.getDstores().values());
            Collections.sort(sorted);

            for(FSStore fsStore : sorted) {
                if(i == controller.getR()) {
                    break;
                }

                // in case all the ACK are received keep in memory all the dstores
                temp.add(fsStore);

                // construct the message
                msg = fsStore.getPublicPort() + " " + msg;
                i++;
            }

            ControllerLogger.getInstance().messageSent(client, "STORE_TO " + msg);
            res.println("STORE_TO " + msg);
            res.flush();

            // check if all the dstores have sent an ACK and send appropriate messages
            boolean done = false;
            long limit = System.currentTimeMillis() + controller.getTimeout();

            while(!done && System.currentTimeMillis() <= limit) {
                if(fileSystem.getStore().containsKey(filename) && !fileSystem.getStore().get(filename).isEmpty())  {
                    if(fileSystem.getStore().get(filename).size() == controller.getR()) {
                        done = true;
                        fileSystem.addIndex(filename, "store complete");
                        fileSystem.addStore(filename, temp);


                        for(FSStore fsStore : temp) {
                            fsStore.getFiles().add(filename);
                        }

                        fileSystem.addFileSize(filename, filesize);
                        ControllerLogger.getInstance().messageSent(client, "STORE_COMPLETE");
                        res.println("STORE_COMPLETE");
                        res.flush();
                        break;
                    }
                }
            }

            // if the dstores didn't send a STORE_ACK in the timeout => store failed
            if(!done) {
                fileSystem.removeIndex(filename);
                System.out.println(filename + " failed to upload");
            }

        } catch (IndexOutOfBoundsException e) {
            System.out.println("Arguments don't match in STORE operation");
        }
    }

    private void handleLoad(Socket client, String[] tokens, String line) throws IOException {

        ControllerLogger.getInstance().messageReceived(client, line);

        try {
            String filename = tokens[1];
            PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
            if(!fileSystem.getStore().containsKey(filename)) {
                ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_DOES_NOT_EXIST");
                res.println("ERROR_FILE_DOES_NOT_EXIST");
            } else {
                // select a Dstore from there and give an appropriate error if all Dstores fail
                ControllerLogger.getInstance().messageSent(client, "LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() +
                        " " + fileSystem.getFileSizes().get(filename));
                res.println("LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() +
                        " " + fileSystem.getFileSizes().get(filename));
            }
            res.flush();
        } catch (IndexOutOfBoundsException e) {
            System.out.println("Arguments don't match in LOAD operation");
        }
    }

    private void handleReload(Socket client, String[] tokens, String line) throws IOException {
        ControllerLogger.getInstance().messageReceived(client, line);

        pos = pos + 1;
        PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));

        try {
            String filename = tokens[1];
            if(!fileSystem.getStore().containsKey(filename)) {
                pos = 0;
                ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_DOES_NOT_EXIST");
                res.println("ERROR_FILE_DOES_NOT_EXIST");
            } else {

                if(pos == controller.getR()) {
                    pos = 0;
                    ControllerLogger.getInstance().messageSent(client, "ERROR_LOAD");
                    res.println("ERROR_LOAD");
                    res.flush();
                    return;
                }

                // select a Dstore from there and give an appropriate error if all Dstores fail
                ControllerLogger.getInstance().messageSent(client, "LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() +
                        " " + fileSystem.getFileSizes().get(filename));
                res.println("LOAD_FROM " + fileSystem.getStore().get(filename).get(pos).getPublicPort() +
                        " " + fileSystem.getFileSizes().get(filename));
            }

            res.flush();
        } catch (IndexOutOfBoundsException e) {
            System.out.println("Arguments don't match in LOAD operation");
        }
    }

    private void handleRemove(Socket client, String[] tokens, String line) throws IOException {

        ControllerLogger.getInstance().messageReceived(client, line);

        try {

            String filename = tokens[1];

            fileSystem.addIndex(filename, "remove in progress");
            System.out.println(fileSystem.index.get(filename));
            if(!fileSystem.store.containsKey(filename)) {
                PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
                ControllerLogger.getInstance().messageSent(client, "ERROR_FILE_DOES_NOT_EXIST");
                res.println("ERROR_FILE_DOES_NOT_EXIST");
                res.flush();
                return;
            }

            for(FSStore fsStore : fileSystem.getStore().get(filename)) {
                fsStore.getFiles().remove(filename);
                PrintWriter res = fsStore.getOutput();
                ControllerLogger.getInstance().messageSent(client, "REMOVE " + filename);
                res.println("REMOVE " + filename);
                res.flush();
            }

            boolean done = false;
            long limit = System.currentTimeMillis() + controller.getTimeout();
            PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));

            while(!done && System.currentTimeMillis() < limit) {
                if(fileSystem.getStore().get(filename).isEmpty()) {
                    done = true;
                    fileSystem.addIndex(filename, "remove complete");
                    fileSystem.getStore().remove(filename);
                    fileSystem.getFileSizes().remove(filename);
                    ControllerLogger.getInstance().messageSent(client, "REMOVE_COMPLETE");
                    res.println("REMOVE_COMPLETE");
                    res.flush();
                }
            }

            if(!done) {
                fileSystem.removeIndex(filename);
                System.out.println(filename + " failed to remove");
            }

        } catch (IndexOutOfBoundsException e) {
            System.out.println("Arguments don't match in REMOVE operation");
        }
    }

    private void handleList(Socket client, String line) throws IOException {

        ControllerLogger.getInstance().messageReceived(client, line);

        String msg = "";
        for(String filename : fileSystem.getStore().keySet()) {
            msg = filename + " " + msg;
        }

        PrintWriter res = new PrintWriter(new OutputStreamWriter(client.getOutputStream()));
        ControllerLogger.getInstance().messageSent(client, "LIST " + msg);
        res.println("LIST " + msg);
        res.flush();
    }

    private void handleStoreACK(String[] tokens) throws IOException {

        String filename = tokens[1];

        if(fileSystem.getStore().containsKey(filename)) {
            fileSystem.getStore().get(filename).add(null);
        } else {
            List<FSStore> d = new ArrayList<>();
            d.add(null);
            Server.fileSystem.addStore(filename, d);
        }

    }

    private void handleRemoveACK(String[] tokens) {
        String filename = tokens[1];
        for(FSStore fsStore : fileSystem.getStore().get(filename)) {
            if(!fsStore.getFiles().contains(filename)) {
                fileSystem.removeDstore(fsStore, filename);
                break;
            }
        }

        fileSystem.getFileSizes().remove(filename);
    }
}