diff --git a/src/ftp/Controller.java b/src/ftp/Controller.java index 0e0aeda0c86f385e4f635a84a39f764f46d4d5a4..9ec34877474d622583de0c3c48300fb336b9a71e 100644 --- a/src/ftp/Controller.java +++ b/src/ftp/Controller.java @@ -3,19 +3,14 @@ package ftp; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; +import java.nio.channels.*; import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; -public class Controller { +public class Controller extends Server { - int cport; int r; - int timeout; int rbPeriod; @@ -27,63 +22,13 @@ public class Controller { * @param rbPeriod rebalance period (ms) */ public Controller(int cport, int r, int timeout, int rbPeriod) throws IOException { - this.cport = cport; + this.port = cport; this.r = r; this.timeout = timeout; this.rbPeriod = rbPeriod; - Selector selector = Selector.open(); - ServerSocketChannel serverSocket = ServerSocketChannel.open(); - serverSocket.bind(new InetSocketAddress(cport)); - serverSocket.configureBlocking(false); - serverSocket.register(selector, SelectionKey.OP_ACCEPT); - ByteBuffer buffer = ByteBuffer.allocate(256); //buffer to read/write - - while (true) { - selector.select(); - Set<SelectionKey> selectedKeys = selector.selectedKeys(); - Iterator<SelectionKey> iter = selectedKeys.iterator(); - while (iter.hasNext()) { - - SelectionKey key = iter.next(); - - if (key.isAcceptable()) { - register(selector, serverSocket); - } - - if (key.isReadable()) { - takeRequest(buffer, key); - } - - iter.remove(); - } - } - } - - - private static void takeRequest(ByteBuffer buffer, SelectionKey key) throws IOException { - SocketChannel client = (SocketChannel) key.channel(); - client.read(buffer); - String command = new String(buffer.array()).trim(); - System.out.println("Received: " + command); - - switch(command) { - case "DSTORE": - key.attach(new DstoreConnection(new Index())); - break; - } - - buffer.flip(); //reset pos to 0, sets limit to what was wrote, ready for reading - client.write(buffer); - buffer.clear(); //empties buffer and clears, ready for writing - } - - - private static void register(Selector selector, ServerSocketChannel serverSocket) throws IOException { - SocketChannel client = serverSocket.accept(); - client.configureBlocking(false); - client.register(selector, SelectionKey.OP_READ); + openSelector(); } @@ -91,7 +36,7 @@ public class Controller { Stream<String> str = Arrays.stream(args); List<Integer> intArgs = str.map(x -> {return Integer.parseInt(x);}) - .collect(Collectors.toList()); + .collect(Collectors.toList()); try { Controller ctrl = new Controller(intArgs.get(0), intArgs.get(1), intArgs.get(2), intArgs.get(3)); @@ -100,4 +45,27 @@ public class Controller { } } + + @Override + protected void handleRequest(String request, SelectionKey key, ByteBuffer buffer) { + String args[] = request.split(" "); + + switch(args[0]) { + case "JOIN": + key.attach(new DstoreConnection(new Index(), Integer.parseInt(args[1]))); + + SocketChannel toDstore = null; + try { + toDstore = SocketChannel.open(new InetSocketAddress(Integer.parseInt(args[1]))); + toDstore.register(selector, SelectionKey.OP_WRITE); + } catch (IOException e) { + e.printStackTrace(); + } + + sendMessage("test",toDstore,buffer); + + break; + } + } + } \ No newline at end of file diff --git a/src/ftp/DstoreConnection.java b/src/ftp/DstoreConnection.java index 90b7d34649655c1eb4fd0a221a53bb05caa95276..3acf37c83fdd982581ed83a6f69e0c60db29bd49 100644 --- a/src/ftp/DstoreConnection.java +++ b/src/ftp/DstoreConnection.java @@ -3,10 +3,12 @@ package ftp; public class DstoreConnection extends Connection { private Index file_index; + private int port; - public DstoreConnection(Index file_index) { + public DstoreConnection(Index file_index, int port) { this.file_index = file_index; + this.port = port; } } diff --git a/src/ftp/Server.java b/src/ftp/Server.java index 754bcfd84280eaabbcdfa0919c37b18087dbe2a2..f2682c33df9fa881fe14fa92bcb6c5215253b497 100644 --- a/src/ftp/Server.java +++ b/src/ftp/Server.java @@ -1,5 +1,106 @@ package ftp; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.util.Iterator; +import java.util.Set; + public abstract class Server { - public abstract void handleRequests(); + + protected int port; + protected int timeout; + + protected Selector selector; + + + protected abstract void handleRequest(String request, SelectionKey key, ByteBuffer buffer); + + + protected void openSelector() throws IOException { + selector = Selector.open(); + ServerSocketChannel serverSocket = ServerSocketChannel.open(); + serverSocket.bind(new InetSocketAddress(port)); + serverSocket.configureBlocking(false); + serverSocket.register(selector, SelectionKey.OP_ACCEPT); + ByteBuffer buffer = ByteBuffer.allocate(256); //buffer to read/write + + while (true) { + selector.select(); + Set<SelectionKey> selectedKeys = selector.selectedKeys(); + Iterator<SelectionKey> iter = selectedKeys.iterator(); + while (iter.hasNext()) { + + SelectionKey key = iter.next(); + + if (key.isAcceptable()) { + register(selector, serverSocket); + } + + if (key.isReadable()) { + String request = takeRequest(buffer, key); + handleRequest(request, key, buffer); + } + + iter.remove(); + } + } + } + + + protected void register(Selector selector, ServerSocketChannel serverSocket) throws IOException { + SocketChannel client = serverSocket.accept(); + client.configureBlocking(false); + client.register(selector, SelectionKey.OP_READ); + } + + + protected String takeRequest(ByteBuffer buffer, SelectionKey key) throws IOException { + SocketChannel client = (SocketChannel) key.channel(); + client.read(buffer); + String request = new String(buffer.array()).trim(); + System.out.println("Received: " + request); + return request; + } + + + protected String sendMessage(String msg, SelectionKey key, ByteBuffer buffer) { + buffer.clear(); + buffer = ByteBuffer.wrap(msg.getBytes()); +System.out.println(msg); + SocketChannel channel = (SocketChannel) key.channel(); + + String response = null; + try { + channel.write(buffer); + buffer.clear(); + channel.read(buffer); + response = new String(buffer.array()).trim(); + buffer.clear(); + } catch (IOException e) { + e.printStackTrace(); + } + return response; + } + + + protected String sendMessage(String msg, SocketChannel channel, ByteBuffer buffer) { + buffer.clear(); + buffer = ByteBuffer.wrap(msg.getBytes()); + + String response = null; + try { + channel.write(buffer); + buffer.clear(); + channel.read(buffer); + response = new String(buffer.array()).trim(); + buffer.clear(); + } catch (IOException e) { + e.printStackTrace(); + } + return response; + } + + } diff --git a/src/ftp/Dstore.java b/src/ftp/dstore1/Dstore.java similarity index 52% rename from src/ftp/Dstore.java rename to src/ftp/dstore1/Dstore.java index 962c931021797687544cd2bba37da44a7cbb723e..c02abeced73adccd33de31fd426be1947339f039 100644 --- a/src/ftp/Dstore.java +++ b/src/ftp/dstore1/Dstore.java @@ -1,28 +1,26 @@ -package ftp; +package ftp.dstore1; +import ftp.DstoreConnection; +import ftp.Index; +import ftp.Server; + +import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Arrays; -import java.util.Iterator; import java.util.List; -import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -public class Dstore { +public class Dstore extends Server { - int port; int cport; - int timeout; String file_folder; - private static SocketChannel dstore; - private static ByteBuffer buffer; + private static SocketChannel toController; /** @@ -44,16 +42,21 @@ public class Dstore { // in blocking mode, so will wait for response before progressing // try { - dstore = SocketChannel.open(new InetSocketAddress(cport)); - buffer = ByteBuffer.allocate(256); + toController = SocketChannel.open(new InetSocketAddress(cport)); } catch (IOException e) { e.printStackTrace(); } - sendMessage("DSTORE"); + ByteBuffer buffer = ByteBuffer.allocate(256); + + sendMessage("JOIN " + port, toController, buffer); + + + openSelector(); - // creating server socket to receive requests and files + /* + // creating server socket to receive files // ServerSocketChannel serverSocket = ServerSocketChannel.open(); serverSocket.bind(new InetSocketAddress(port)); @@ -63,49 +66,16 @@ public class Dstore { new Thread( () -> { } ).start(); - } + */ } - public String sendMessage(String msg) { - buffer = ByteBuffer.wrap(msg.getBytes()); - String response = null; - try { - dstore.write(buffer); - buffer.clear(); - dstore.read(buffer); - response = new String(buffer.array()).trim(); - buffer.clear(); - } catch (IOException e) { - e.printStackTrace(); - } - return response; - - } - - - private static void takeRequest(ByteBuffer buffer, SelectionKey key) - throws IOException { - - SocketChannel client = (SocketChannel) key.channel(); - client.read(buffer); - String command = new String(buffer.array()).trim(); - System.out.println("Received: " + command); - - buffer.flip(); //resets pos to 0 to read from buffer and sets limit to what was put there - client.write(buffer); - buffer.clear(); //empties buffer and clears, ready for writing - } - - - private static void register(Selector selector, ServerSocketChannel serverSocket) - throws IOException { - - SocketChannel client = serverSocket.accept(); - client.configureBlocking(false); - client.register(selector, SelectionKey.OP_READ); - } + /* + buffer.flip(); //resets pos to 0 to read from buffer and sets limit to what was put there + client.write(buffer); + buffer.clear(); //empties buffer and clears, ready for writing + */ public static void main(String args[]) { @@ -122,4 +92,26 @@ public class Dstore { } } + + private File[] list() { + File folder = new File("./storage"); + return folder.listFiles(); + } + + + @Override + protected void handleRequest(String request, SelectionKey key, ByteBuffer buffer) { + String args[] = request.split(" "); + + switch(args[0]) { + case "LIST": + String fileNames = Arrays.stream(list()) + .map(File::getName) + .reduce("", (file1, file2) -> file1 + " " + file2); + + sendMessage(fileNames, key, buffer); + break; + } + } + }