Skip to content
Snippets Groups Projects
Commit c50e21e3 authored by ik1g19's avatar ik1g19
Browse files

need to make send message flip

parent 7335234d
No related branches found
No related tags found
No related merge requests found
......@@ -3,19 +3,14 @@ package ftp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.*;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class Controller {
public class Controller extends Server {
int cport;
int r;
int timeout;
int rbPeriod;
......@@ -27,63 +22,13 @@ public class Controller {
* @param rbPeriod rebalance period (ms)
*/
public Controller(int cport, int r, int timeout, int rbPeriod) throws IOException {
this.cport = cport;
this.port = cport;
this.r = r;
this.timeout = timeout;
this.rbPeriod = rbPeriod;
Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(cport));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(256); //buffer to read/write
while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
register(selector, serverSocket);
}
if (key.isReadable()) {
takeRequest(buffer, key);
}
iter.remove();
}
}
}
private static void takeRequest(ByteBuffer buffer, SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
client.read(buffer);
String command = new String(buffer.array()).trim();
System.out.println("Received: " + command);
switch(command) {
case "DSTORE":
key.attach(new DstoreConnection(new Index()));
break;
}
buffer.flip(); //reset pos to 0, sets limit to what was wrote, ready for reading
client.write(buffer);
buffer.clear(); //empties buffer and clears, ready for writing
}
private static void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
SocketChannel client = serverSocket.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
openSelector();
}
......@@ -100,4 +45,27 @@ public class Controller {
}
}
@Override
protected void handleRequest(String request, SelectionKey key, ByteBuffer buffer) {
String args[] = request.split(" ");
switch(args[0]) {
case "JOIN":
key.attach(new DstoreConnection(new Index(), Integer.parseInt(args[1])));
SocketChannel toDstore = null;
try {
toDstore = SocketChannel.open(new InetSocketAddress(Integer.parseInt(args[1])));
toDstore.register(selector, SelectionKey.OP_WRITE);
} catch (IOException e) {
e.printStackTrace();
}
sendMessage("test",toDstore,buffer);
break;
}
}
}
\ No newline at end of file
......@@ -3,10 +3,12 @@ package ftp;
public class DstoreConnection extends Connection {
private Index file_index;
private int port;
public DstoreConnection(Index file_index) {
public DstoreConnection(Index file_index, int port) {
this.file_index = file_index;
this.port = port;
}
}
package ftp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
public abstract class Server {
public abstract void handleRequests();
protected int port;
protected int timeout;
protected Selector selector;
protected abstract void handleRequest(String request, SelectionKey key, ByteBuffer buffer);
protected void openSelector() throws IOException {
selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(256); //buffer to read/write
while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
register(selector, serverSocket);
}
if (key.isReadable()) {
String request = takeRequest(buffer, key);
handleRequest(request, key, buffer);
}
iter.remove();
}
}
}
protected void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
SocketChannel client = serverSocket.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}
protected String takeRequest(ByteBuffer buffer, SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
client.read(buffer);
String request = new String(buffer.array()).trim();
System.out.println("Received: " + request);
return request;
}
protected String sendMessage(String msg, SelectionKey key, ByteBuffer buffer) {
buffer.clear();
buffer = ByteBuffer.wrap(msg.getBytes());
System.out.println(msg);
SocketChannel channel = (SocketChannel) key.channel();
String response = null;
try {
channel.write(buffer);
buffer.clear();
channel.read(buffer);
response = new String(buffer.array()).trim();
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
return response;
}
protected String sendMessage(String msg, SocketChannel channel, ByteBuffer buffer) {
buffer.clear();
buffer = ByteBuffer.wrap(msg.getBytes());
String response = null;
try {
channel.write(buffer);
buffer.clear();
channel.read(buffer);
response = new String(buffer.array()).trim();
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
return response;
}
}
package ftp;
package ftp.dstore1;
import ftp.DstoreConnection;
import ftp.Index;
import ftp.Server;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class Dstore {
public class Dstore extends Server {
int port;
int cport;
int timeout;
String file_folder;
private static SocketChannel dstore;
private static ByteBuffer buffer;
private static SocketChannel toController;
/**
......@@ -44,16 +42,21 @@ public class Dstore {
// in blocking mode, so will wait for response before progressing
//
try {
dstore = SocketChannel.open(new InetSocketAddress(cport));
buffer = ByteBuffer.allocate(256);
toController = SocketChannel.open(new InetSocketAddress(cport));
} catch (IOException e) {
e.printStackTrace();
}
sendMessage("DSTORE");
ByteBuffer buffer = ByteBuffer.allocate(256);
sendMessage("JOIN " + port, toController, buffer);
openSelector();
// creating server socket to receive requests and files
/*
// creating server socket to receive files
//
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(port));
......@@ -63,49 +66,16 @@ public class Dstore {
new Thread( () -> {
} ).start();
}
}
public String sendMessage(String msg) {
buffer = ByteBuffer.wrap(msg.getBytes());
String response = null;
try {
dstore.write(buffer);
buffer.clear();
dstore.read(buffer);
response = new String(buffer.array()).trim();
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
return response;
*/
}
private static void takeRequest(ByteBuffer buffer, SelectionKey key)
throws IOException {
SocketChannel client = (SocketChannel) key.channel();
client.read(buffer);
String command = new String(buffer.array()).trim();
System.out.println("Received: " + command);
/*
buffer.flip(); //resets pos to 0 to read from buffer and sets limit to what was put there
client.write(buffer);
buffer.clear(); //empties buffer and clears, ready for writing
}
private static void register(Selector selector, ServerSocketChannel serverSocket)
throws IOException {
SocketChannel client = serverSocket.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}
*/
public static void main(String args[]) {
......@@ -122,4 +92,26 @@ public class Dstore {
}
}
private File[] list() {
File folder = new File("./storage");
return folder.listFiles();
}
@Override
protected void handleRequest(String request, SelectionKey key, ByteBuffer buffer) {
String args[] = request.split(" ");
switch(args[0]) {
case "LIST":
String fileNames = Arrays.stream(list())
.map(File::getName)
.reduce("", (file1, file2) -> file1 + " " + file2);
sendMessage(fileNames, key, buffer);
break;
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment