Skip to content
Snippets Groups Projects
Commit 55372169 authored by ik1g19's avatar ik1g19
Browse files

selectors now work

 - list query to dstore throws nullexception
parent c50e21e3
Branches
No related tags found
No related merge requests found
...@@ -27,7 +27,6 @@ public class Controller extends Server { ...@@ -27,7 +27,6 @@ public class Controller extends Server {
this.timeout = timeout; this.timeout = timeout;
this.rbPeriod = rbPeriod; this.rbPeriod = rbPeriod;
openSelector(); openSelector();
} }
...@@ -53,17 +52,7 @@ public class Controller extends Server { ...@@ -53,17 +52,7 @@ public class Controller extends Server {
switch(args[0]) { switch(args[0]) {
case "JOIN": case "JOIN":
key.attach(new DstoreConnection(new Index(), Integer.parseInt(args[1]))); key.attach(new DstoreConnection(new Index(), Integer.parseInt(args[1])));
sendMessage("LIST",key,buffer);
SocketChannel toDstore = null;
try {
toDstore = SocketChannel.open(new InetSocketAddress(Integer.parseInt(args[1])));
toDstore.register(selector, SelectionKey.OP_WRITE);
} catch (IOException e) {
e.printStackTrace();
}
sendMessage("test",toDstore,buffer);
break; break;
} }
} }
......
...@@ -2,8 +2,10 @@ package ftp; ...@@ -2,8 +2,10 @@ package ftp;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set; import java.util.Set;
...@@ -15,9 +17,19 @@ public abstract class Server { ...@@ -15,9 +17,19 @@ public abstract class Server {
protected Selector selector; protected Selector selector;
/**
* @desc abstract method which handles server requests based on implmentation
* @param request request to be handled
* @param key corresponding key
* @param buffer buffer to be used for r/w
*/
protected abstract void handleRequest(String request, SelectionKey key, ByteBuffer buffer); protected abstract void handleRequest(String request, SelectionKey key, ByteBuffer buffer);
/**
* @desc starts the selector and begins the server loop
* @throws IOException
*/
protected void openSelector() throws IOException { protected void openSelector() throws IOException {
selector = Selector.open(); selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open(); ServerSocketChannel serverSocket = ServerSocketChannel.open();
...@@ -26,6 +38,59 @@ public abstract class Server { ...@@ -26,6 +38,59 @@ public abstract class Server {
serverSocket.register(selector, SelectionKey.OP_ACCEPT); serverSocket.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(256); //buffer to read/write ByteBuffer buffer = ByteBuffer.allocate(256); //buffer to read/write
System.out.println("Selector Started");
while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
System.out.println("Server socket ready to accept connection");
register(selector, serverSocket);
}
if (key.isConnectable()) {
System.out.println("Channel is ready to connect to the server");
}
if (key.isReadable()) {
System.out.println("Channel has data to be read");
String request = takeRequest(buffer, key);
handleRequest(request, key, buffer);
}
iter.remove();
}
}
}
/**
* @desc starts the selector and begins the server loop, registers given initial channels
* @param channels initial channels to be registered with the selector
* @throws IOException
*/
protected void openSelector(SocketChannel[] channels) throws IOException{
selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
Iterator<SocketChannel> sockIt = Arrays.stream(channels).iterator();
while (sockIt.hasNext()) {
SocketChannel channel = sockIt.next();
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
ByteBuffer buffer = ByteBuffer.allocate(256); //buffer to read/write
System.out.println("Selector Started");
while (true) { while (true) {
selector.select(); selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys(); Set<SelectionKey> selectedKeys = selector.selectedKeys();
...@@ -35,10 +100,16 @@ public abstract class Server { ...@@ -35,10 +100,16 @@ public abstract class Server {
SelectionKey key = iter.next(); SelectionKey key = iter.next();
if (key.isAcceptable()) { if (key.isAcceptable()) {
System.out.println("Server socket ready to accept connection");
register(selector, serverSocket); register(selector, serverSocket);
} }
if (key.isConnectable()) {
System.out.println("Channel is ready to connect to the server");
}
if (key.isReadable()) { if (key.isReadable()) {
System.out.println("Channel has data to be read");
String request = takeRequest(buffer, key); String request = takeRequest(buffer, key);
handleRequest(request, key, buffer); handleRequest(request, key, buffer);
} }
...@@ -49,13 +120,27 @@ public abstract class Server { ...@@ -49,13 +120,27 @@ public abstract class Server {
} }
/**
* @desc registers a newly accepted channel with the selector
* @param selector selector to be registered with
* @param serverSocket server socket which accepts the connection
* @throws IOException
*/
protected void register(Selector selector, ServerSocketChannel serverSocket) throws IOException { protected void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
SocketChannel client = serverSocket.accept(); SocketChannel client = serverSocket.accept();
System.out.println("Channel accepted");
client.configureBlocking(false); client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ); client.register(selector, SelectionKey.OP_READ);
} }
/**
* @desc reads request from buffer
* @param buffer buffer to read request from
* @param key key linking to the channel
* @return returns the request
* @throws IOException
*/
protected String takeRequest(ByteBuffer buffer, SelectionKey key) throws IOException { protected String takeRequest(ByteBuffer buffer, SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel(); SocketChannel client = (SocketChannel) key.channel();
client.read(buffer); client.read(buffer);
...@@ -65,42 +150,100 @@ public abstract class Server { ...@@ -65,42 +150,100 @@ public abstract class Server {
} }
protected String sendMessage(String msg, SelectionKey key, ByteBuffer buffer) { /**
buffer.clear(); * @desc sends a message using a given key and waits for a response
* @param msg message to be sent
* @param key key of corresponding channel to send message through
* @param buffer buffer to r/w messages
* @return
*/
protected String sendMessageResponse(String msg, SelectionKey key, ByteBuffer buffer) {
buffer = ByteBuffer.wrap(msg.getBytes()); buffer = ByteBuffer.wrap(msg.getBytes());
System.out.println(msg);
SocketChannel channel = (SocketChannel) key.channel(); SocketChannel channel = (SocketChannel) key.channel();
String response = null; String response = null;
try { try {
channel.write(buffer); channel.write(buffer);
buffer.clear(); buffer.clear();
System.out.println("Sent message: " + msg);
channel.read(buffer); channel.read(buffer);
response = new String(buffer.array()).trim(); response = new String(buffer.array()).trim();
buffer.clear();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
return response; return response;
} }
protected String sendMessage(String msg, SocketChannel channel, ByteBuffer buffer) { /**
buffer.clear(); * @desc sends a message using a given channel and waits for a response
* @param msg message to be sent
* @param channel channel to send message through
* @param buffer buffer to r/w messages
* @return
*/
protected String sendMessageResponse(String msg, SocketChannel channel, ByteBuffer buffer) {
buffer = ByteBuffer.wrap(msg.getBytes()); buffer = ByteBuffer.wrap(msg.getBytes());
String response = null; String response = null;
try { try {
channel.write(buffer); channel.write(buffer);
buffer.clear(); buffer.clear();
System.out.println("Sent message: " + msg);
channel.read(buffer); channel.read(buffer);
response = new String(buffer.array()).trim(); response = new String(buffer.array()).trim();
buffer.clear();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
return response; return response;
} }
/**
* @desc sends a message using a given key, does not wait for a response
* @param msg message to be sent
* @param key key of corresponding channel to send message through
* @param buffer buffer to r/w messages
* @return
*/
protected void sendMessage(String msg, SelectionKey key, ByteBuffer buffer) {
buffer = ByteBuffer.wrap(msg.getBytes());
SocketChannel channel = (SocketChannel) key.channel();
try {
channel.write(buffer);
buffer.clear();
System.out.println("Sent message: " + msg);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @desc sends a message using a given channel, does not wait for a response
* @param msg message to be sent
* @param channel channel to send message through
* @param buffer buffer to r/w messages
* @return
*/
protected void sendMessage(String msg, SocketChannel channel, ByteBuffer buffer) {
buffer = ByteBuffer.wrap(msg.getBytes());
try {
channel.write(buffer);
buffer.clear();
System.out.println("Sent message: " + msg);
} catch (IOException e) {
e.printStackTrace();
}
}
} }
...@@ -7,6 +7,7 @@ import ftp.Server; ...@@ -7,6 +7,7 @@ import ftp.Server;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
...@@ -37,37 +38,24 @@ public class Dstore extends Server { ...@@ -37,37 +38,24 @@ public class Dstore extends Server {
this.file_folder = file_folder; this.file_folder = file_folder;
ByteBuffer buffer = ByteBuffer.allocate(256);
// opening connection to controller // opening connection to controller
// open(addr) acts as convenience method for open() and connect() // open(addr) acts as convenience method for open() and connect()
// in blocking mode, so will wait for response before progressing // in blocking mode, so will wait for response before progressing
// //
try { try {
toController = SocketChannel.open(new InetSocketAddress(cport)); toController = SocketChannel.open(new InetSocketAddress(cport));
toController.configureBlocking(false);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
ByteBuffer buffer = ByteBuffer.allocate(256);
sendMessage("JOIN " + port, toController, buffer); sendMessage("JOIN " + port, toController, buffer);
openSelector(new SocketChannel[] {toController});
openSelector();
/*
// creating server socket to receive files
//
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(port));
while (true) {
SocketChannel client = serverSocket.accept();
new Thread( () -> {
} ).start();
}
*/
} }
...@@ -93,10 +81,14 @@ public class Dstore extends Server { ...@@ -93,10 +81,14 @@ public class Dstore extends Server {
} }
private File[] list() { // /**
File folder = new File("./storage"); // * @desc lists the files of the datastore
return folder.listFiles(); // * @return
} // */
// private File[] list() {
// File folder = new File("./storage");
// return folder.listFiles();
// }
@Override @Override
...@@ -105,9 +97,14 @@ public class Dstore extends Server { ...@@ -105,9 +97,14 @@ public class Dstore extends Server {
switch(args[0]) { switch(args[0]) {
case "LIST": case "LIST":
String fileNames = Arrays.stream(list()) File folder = new File("./storage");
.map(File::getName) String[] files = folder.list();
// String fileNames = Arrays.stream(list())
// .map(File::getName)
// .reduce("", (file1, file2) -> file1 + " " + file2);
String fileNames = Arrays.stream(files)
.reduce("", (file1, file2) -> file1 + " " + file2); .reduce("", (file1, file2) -> file1 + " " + file2);
System.out.println(fileNames);
sendMessage(fileNames, key, buffer); sendMessage(fileNames, key, buffer);
break; break;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment