Skip to content
Snippets Groups Projects
Commit 24843952 authored by dl3g19's avatar dl3g19
Browse files

Code for initializing Controller and Dstores implemented

Functions for receiving the possible messages implemented
Main structure of functions included as an outline
parent c293b181
Branches
No related tags found
No related merge requests found
File added
import java.io.*;
import java.net.*;
import java.util.List;
import java.util.ArrayList;
public class Controller {
protected int cport; //Port to listen on
protected int rFactor; //Replication factor; each file is replicated across r Dstores
protected int timeout; //in milliseconds
protected int rebalancePeriod; //How long to wait to start the next rebalance operation, in milliseconds
protected List<Integer> dstores;
public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) {
this.cport = cport;
this.rFactor = rFactor;
this.timeout = timeout;
this.rebalancePeriod = rebalancePeriod;
dstores = new ArrayList<Integer>();
}
public static void main(String[] args) {
try {
int cport = Integer.parseInt(args[0]);
int rFactor = Integer.parseInt(args[1]);
int timeout = Integer.parseInt(args[2]);
int rebalancePeriod = Integer.parseInt(args[3]);
Controller controller = new Controller(cport, rFactor, timeout, rebalancePeriod);
controller.start();
}
catch(IndexOutOfBoundsException e) {
System.out.println("Command line arguments have not been provided");
return;
}
catch(NumberFormatException e) {
System.out.println("Command line arguments must be integers");
return;
}
}
public void start() {
try {
ServerSocket server = new ServerSocket(cport);
while(true) {
try {
Socket client = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String[] message = in.readLine().split(" ");
if(dstores.size() < rFactor && !message[0].equals("JOIN")) {
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
out.write("ERROR");
out.close();
}
else {
handleMessage(message, client);
}
in.close();
}
catch(Exception e) {
e.printStackTrace();
System.out.println("Continue...");
}
}
}
catch(Exception e) {
e.printStackTrace();
}
}
void handleMessage(String[] message, Socket client) {
if(message[0].equals("JOIN")) {
dstores.add(Integer.parseInt(message[1]));
rebalance();
}
else if(message[0].equals("STORE")) {
store(client, message[1]);
}
else if(message[0].equals("LOAD")) {
load(client, message[1]);
}
else if(message[0].equals("REMOVE")) {
remove(client, message[1]);
}
else if(message[0].equals("LIST")) {
list(client);
}
}
void store(Socket client, String filename) {
//Update index to "store in progress"
//Select Dstores
int[] storesToStore = new int[rFactor];
for(int i=0; i<rFactor; i++) {
storesToStore[i] = dstores.get(i).intValue();
}
//Send STORE_TO message
PrintWriter out = new PrintWriter(client.getOutputStream());
out.print("STORE_TO");
for(int port : storesToStore) {
out.print(" ");
out.print(port);
}
out.flush();
//Wait for STORE_ACKs from datastores in storesToStore
//Update index to "store complete"
//Send STORE_COMPLETE message
out.print("STORE_COMPLETE");
out.flush();
out.close();
}
void load(Socket client, String filename) {
//Select a Dstore which contains the file
//Send LOAD_FROM message
}
void remove(Socket client, String filename) {
//Update index to "remove in progress"
//Send REMOVE message to all Dstores storing the file
//Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message
//Update index to "remove complete"
//Send REMOVE_COMPLETE to client
PrintWriter clientOut = new PrintWriter(client.getOutputStream());
clientOut.print("REMOVE_COMPLETE");
clientOut.flush();
clientOut.close();
}
void list(Socket client) {
//Send file list to client
}
void rebalance() {
//Send LIST message to each Dstore and receive their file list
//Create a new file allocation so that:
//Each file appears rFactor times
//Each file appears at most once on each datastore
//Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists)
//Make a (files to send, files to remove) pair for each Dstore
//Send the respective REBALANCE message to each Dstore
//Wait for REBALANCE_COMPLETE from all Dstores
}
}
File added
import java.io.*;
import java.net.*;
import java.util.Map;
import java.util.HashMap;
public class Dstore {
protected int port; //Port to listen on
protected int cport; //Controller's port to talk to
protected int timeout; //in milliseconds
protected String fileFolder; //Where to store the data locally
public Dstore(int port, int cport, int timeout, String fileFolder) {
this.port = port;
this.cport = cport;
this.timeout = timeout;
this.fileFolder = fileFolder;
}
public static void main(String[] args) {
try {
int port = Integer.parseInt(args[0]);
int cport = Integer.parseInt(args[1]);
int timeout = Integer.parseInt(args[2]);
String fileFolder = args[3];
Dstore dstore = new Dstore(port, cport, timeout, fileFolder);
dstore.start();
}
catch(IndexOutOfBoundsException e) {
System.out.println("Command line arguments have not been provided");
return;
}
catch(NumberFormatException e) {
System.out.println("Command line arguments must be integers");
return;
}
}
public void start() {
try {
Socket socket = new Socket(InetAddress.getLocalHost(), cport);
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
out.write("JOIN " + port);
out.close();
socket.close();
ServerSocket server = new ServerSocket(port);
while(true) {
try {
Socket client = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String[] message = in.readLine().split(" ");
handleMessage(message, client);
in.close();
}
catch(Exception e) {
e.printStackTrace();
}
}
}
catch(Exception e) {
e.printStackTrace();
}
}
void handleMessage(String[] message, Socket client) {
if(message[0].equals("STORE")) {
store(client, message[1], Integer.parseInt(message[2]));
}
else if(message[0].equals("LOAD_DATA")) {
load(client, message[1]);
}
else if(message[0].equals("REMOVE")) {
remove(client, message[1]);
}
else if(message[0].equals("LIST")) {
list(client);
}
else if(message[0].equals("REBALANCE")) {
rebalance(client, message);
}
}
void store(Socket client, String filename, int filesize) {
//Send ACK message to client
//Receive file content from client
//Store the file data in fileFolder
//Send STORE_ACK message to the Controller
}
void load(Socket client, String filename) {
//Send the content of the file in fileFolder to the client
}
void remove(Socket client, String filename) {
//Remove the file from fileFolder
//Send REMOVE_ACK message to client (the controller)
}
void list(Socket client) {
//Send a list of all files in fileFolder to client (the controller)
}
void rebalance(Socket client, String[] message) {
//Interpret files to send and files to remove from the message
Map<String,Integer[]> filesToSend;
String[] filesToRemove;
int index;
int numberToSend = Integer.parseInt(message[1]);
index = 2;
filesToSend = new HashMap<String,Integer[]>(numberToSend);
for(int i=0; i<numberToSend; i++) {
String name = message[index];
index++;
int numberOfReceivers = Integer.parseInt(message[index]);
index++;
Integer[] receivers = new Integer[numberOfReceivers];
for(int j=0; j<numberOfReceivers; j++) {
receivers[j] = Integer.parseInt(message[index]);
index++;
}
filesToSend.put(name, receivers);
}
int numberToRemove = Integer.parseInt(message[index]);
index++;
filesToRemove = new String[numberToRemove];
for(int k=0; k<numberToRemove; k++) {
filesToRemove[k] = message[index];
index++;
}
//Send each file to send to the Dstore at the specified port number
//Remove each file to remove from fileFolder
//Send REBALANCE_COMPLETE message to client (the controller)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment