From efdcc92719b9ff9959ffa1f16fcccaa76619aee4 Mon Sep 17 00:00:00 2001 From: ik1g19 <ik1g19@soton.ac.uk> Date: Sat, 10 Apr 2021 00:55:07 +0100 Subject: [PATCH] add controller and client --- AbstractTest.java | 43 --------- FileCopyTest.java | 37 -------- FileReader.java | 31 ------ FileReceiver.java | 42 --------- FileSender.java | 26 ----- FileWriter.java | 36 ------- Main.java | 9 -- distributed-systems-cw.iml | 11 --- ftp/Controller.java | 19 ---- nioExampleFiles/README.md | 3 - nioExampleFiles/distributed-systems-cw.iml | 21 ----- src/ftp/Client.java | 82 ++++++++++++++++ src/ftp/Controller.java | 105 +++++++++++++++++++++ 13 files changed, 187 insertions(+), 278 deletions(-) delete mode 100644 AbstractTest.java delete mode 100644 FileCopyTest.java delete mode 100644 FileReader.java delete mode 100644 FileReceiver.java delete mode 100644 FileSender.java delete mode 100644 FileWriter.java delete mode 100644 Main.java delete mode 100644 distributed-systems-cw.iml delete mode 100644 ftp/Controller.java delete mode 100644 nioExampleFiles/README.md delete mode 100644 nioExampleFiles/distributed-systems-cw.iml create mode 100644 src/ftp/Client.java create mode 100644 src/ftp/Controller.java diff --git a/AbstractTest.java b/AbstractTest.java deleted file mode 100644 index 967b8aa..0000000 --- a/AbstractTest.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.javacodegeeks.nio.large_file_transfer; - -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; - -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; - -public abstract class AbstractTest { - - protected static final String SRC = "/tmp/input.tar.gz"; - protected static final String TARGET = "/tmp/output"; - protected static final long AWAIT_TEST_COMPLETE = 20000l; - - protected File srcFile; - protected File targetFile; - - @BeforeClass - public static void init() { - Thread.currentThread().getContextClassLoader().setDefaultAssertionStatus(true); - } - - @AfterClass - public static void destroy() { - Thread.currentThread().getContextClassLoader().setDefaultAssertionStatus(false); - } - - @Before - public void setUp() throws IOException { - this.srcFile = new File(SRC); - this.targetFile = new File(TARGET + "/" + this.srcFile.getName()); - - Files.deleteIfExists(this.targetFile.toPath()); - } - - protected final void compare() { - assertEquals("file did not copy completely", this.srcFile.length(), this.targetFile.length()); - } -} diff --git a/FileCopyTest.java b/FileCopyTest.java deleted file mode 100644 index d7248a9..0000000 --- a/FileCopyTest.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.javacodegeeks.nio.large_file_transfer.remote; - -import java.io.IOException; -import java.util.concurrent.CountDownLatch; - -import org.junit.Test; - -import com.javacodegeeks.nio.large_file_transfer.AbstractTest; - -public class FileCopyTest extends AbstractTest { - - private static final int PORT = 9999; - - @Test - public void copyLargeFile() throws IOException, InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - final FileReceiver receiver = new FileReceiver(PORT, new FileWriter(TARGET + "/" + super.srcFile.getName()), super.srcFile.length()); - - new Thread() { - public void run() { - try { - receiver.receive(); - } catch (IOException e) { - - } finally { - latch.countDown(); - } - } - }.start(); - - final FileReader reader = new FileReader(new FileSender(PORT), SRC); - reader.read(); - - latch.await(); - compare(); - } -} diff --git a/FileReader.java b/FileReader.java deleted file mode 100644 index aeba501..0000000 --- a/FileReader.java +++ /dev/null @@ -1,31 +0,0 @@ -final class FileReader { - - private final FileChannel channel; - private final FileSender sender; - - FileReader(final FileSender sender, final String path) throws IOException { - if (Objects.isNull(sender) || StringUtils.isEmpty(path)) { - throw new IllegalArgumentException("sender and path required"); - } - - this.sender = sender; - this.channel = FileChannel.open(Paths.get(path), StandardOpenOption.READ); - } - - void read() throws IOException { - try { - transfer(); - } finally { - close(); - } - } - - void close() throws IOException { - this.sender.close(); - this.channel.close(); - } - - private void transfer() throws IOException { - this.sender.transfer(this.channel, 0l, this.channel.size()); - } -} \ No newline at end of file diff --git a/FileReceiver.java b/FileReceiver.java deleted file mode 100644 index 98e4089..0000000 --- a/FileReceiver.java +++ /dev/null @@ -1,42 +0,0 @@ -final class FileReceiver { - - private final int port; - private final FileWriter fileWriter; - private final long size; - - FileReceiver(final int port, final FileWriter fileWriter, final long size) { - this.port = port; - this.fileWriter = fileWriter; - this.size = size; - } - - void receive() throws IOException { - SocketChannel channel = null; - - try (final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) { - init(serverSocketChannel); - - channel = serverSocketChannel.accept(); - - doTransfer(channel); - } finally { - if (!Objects.isNull(channel)) { - channel.close(); - } - - this.fileWriter.close(); - } - } - - private void doTransfer(final SocketChannel channel) throws IOException { - assert !Objects.isNull(channel); - - this.fileWriter.transfer(channel, this.size); - } - - private void init(final ServerSocketChannel serverSocketChannel) throws IOException { - assert !Objects.isNull(serverSocketChannel); - - serverSocketChannel.bind(new InetSocketAddress(this.port)); - } -} \ No newline at end of file diff --git a/FileSender.java b/FileSender.java deleted file mode 100644 index 03dea3d..0000000 --- a/FileSender.java +++ /dev/null @@ -1,26 +0,0 @@ -final class FileSender { - - private final InetSocketAddress hostAddress; - private SocketChannel client; - - FileSender(final int port) throws IOException { - this.hostAddress = new InetSocketAddress(port); - this.client = SocketChannel.open(this.hostAddress); - } - - void transfer(final FileChannel channel, long position, long size) throws IOException { - assert !Objects.isNull(channel); - - while (position < size) { - position += channel.transferTo(position, Constants.TRANSFER_MAX_SIZE, this.client); - } - } - - SocketChannel getChannel() { - return this.client; - } - - void close() throws IOException { - this.client.close(); - } -} \ No newline at end of file diff --git a/FileWriter.java b/FileWriter.java deleted file mode 100644 index 1d65527..0000000 --- a/FileWriter.java +++ /dev/null @@ -1,36 +0,0 @@ -final class FileWriter { - - private final FileChannel channel; - - FileWriter(final String path) throws IOException { - if (StringUtils.isEmpty(path)) { - throw new IllegalArgumentException("path required"); - } - - this.channel = FileChannel.open(Paths.get(path), StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); - } - - void transfer(final SocketChannel channel, final long bytes) throws IOException { - assert !Objects.isNull(channel); - - long position = 0l; - while (position < bytes) { - position += this.channel.transferFrom(channel, position, Constants.TRANSFER_MAX_SIZE); - } - } - - int write(final ByteBuffer buffer, long position) throws IOException { - assert !Objects.isNull(buffer); - - int bytesWritten = 0; - while(buffer.hasRemaining()) { - bytesWritten += this.channel.write(buffer, position + bytesWritten); - } - - return bytesWritten; - } - - void close() throws IOException { - this.channel.close(); - } -} \ No newline at end of file diff --git a/Main.java b/Main.java deleted file mode 100644 index 900ce89..0000000 --- a/Main.java +++ /dev/null @@ -1,9 +0,0 @@ -import FileReader -import FileSender - - -public class Main { - public static void main(String args[]) { - FileReader reader = new FileReader(new FileSender(3125),"test.txt"); - } -} \ No newline at end of file diff --git a/distributed-systems-cw.iml b/distributed-systems-cw.iml deleted file mode 100644 index b107a2d..0000000 --- a/distributed-systems-cw.iml +++ /dev/null @@ -1,11 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<module type="JAVA_MODULE" version="4"> - <component name="NewModuleRootManager" inherit-compiler-output="true"> - <exclude-output /> - <content url="file://$MODULE_DIR$"> - <sourceFolder url="file://$MODULE_DIR$" isTestSource="false" /> - </content> - <orderEntry type="inheritedJdk" /> - <orderEntry type="sourceFolder" forTests="false" /> - </component> -</module> \ No newline at end of file diff --git a/ftp/Controller.java b/ftp/Controller.java deleted file mode 100644 index d53915f..0000000 --- a/ftp/Controller.java +++ /dev/null @@ -1,19 +0,0 @@ -package ftp; - -public class Controller { - int cport; - int r; - int timeout; - int rbPeriod; - - public Controller(int cport, int r, int timeout, int rbPeriod) { - this.cport = cport; - this.r = r; - this.timeout = timeout; - this.rbPeriod = rbPeriod; - } - - public static void main(String args[]) { - Controller ctrl = new Controller(Integer.parseInt(args[0]),Integer.parseInt(args[1]),Integer.parseInt(args[2]),Integer.parseInt(args[3])); - } -} \ No newline at end of file diff --git a/nioExampleFiles/README.md b/nioExampleFiles/README.md deleted file mode 100644 index 75843ce..0000000 --- a/nioExampleFiles/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Distributed Systems Coursework - -a distributed file system, maybe some day it will work diff --git a/nioExampleFiles/distributed-systems-cw.iml b/nioExampleFiles/distributed-systems-cw.iml deleted file mode 100644 index c6df852..0000000 --- a/nioExampleFiles/distributed-systems-cw.iml +++ /dev/null @@ -1,21 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<module type="JAVA_MODULE" version="4"> - <component name="NewModuleRootManager" inherit-compiler-output="true"> - <exclude-output /> - <content url="file://$MODULE_DIR$"> - <sourceFolder url="file://$MODULE_DIR$" isTestSource="false" /> - </content> - <orderEntry type="inheritedJdk" /> - <orderEntry type="sourceFolder" forTests="false" /> - <orderEntry type="module-library"> - <library name="JUnit4"> - <CLASSES> - <root url="jar://$MAVEN_REPOSITORY$/junit/junit/4.12/junit-4.12.jar!/" /> - <root url="jar://$MAVEN_REPOSITORY$/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar!/" /> - </CLASSES> - <JAVADOC /> - <SOURCES /> - </library> - </orderEntry> - </component> -</module> \ No newline at end of file diff --git a/src/ftp/Client.java b/src/ftp/Client.java new file mode 100644 index 0000000..7da169f --- /dev/null +++ b/src/ftp/Client.java @@ -0,0 +1,82 @@ +package ftp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.Arrays; +import java.util.List; +import java.util.Scanner; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class Client { + + int cport; + int timeout; + + private static SocketChannel client; + private static ByteBuffer buffer; + private static Client instance; + + + /** + * @desc constructs a client + * @param cport controller port to use + * @param timeout timeout (ms) + */ + public Client(int cport, int timeout) { + this.cport = cport; + this.timeout = timeout; + + try { + client = SocketChannel.open(new InetSocketAddress(cport)); + buffer = ByteBuffer.allocate(256); + } catch (IOException e) { + e.printStackTrace(); + } + } + + + public String sendMessage(String msg) { + buffer = ByteBuffer.wrap(msg.getBytes()); + String response = null; + try { + client.write(buffer); + buffer.clear(); + client.read(buffer); + response = new String(buffer.array()).trim(); + System.out.println("response=" + response); + buffer.clear(); + } catch (IOException e) { + e.printStackTrace(); + } + return response; + + } + + + public static void main(String args[]) { + Stream<String> str = Arrays.stream(args); + + List<Integer> intArgs = str.map(x -> {return Integer.parseInt(x);}) + .collect(Collectors.toList()); + + instance = new Client(intArgs.get(0),intArgs.get(1)); + + Scanner scanner = new Scanner(System.in); + String command = ""; + + while (!command.equals("quit")) { + + System.out.println("Enter Command:"); + command = scanner.nextLine(); + + String response = instance.sendMessage(command); + + System.out.println(response); + + } + } + +} diff --git a/src/ftp/Controller.java b/src/ftp/Controller.java new file mode 100644 index 0000000..ca52024 --- /dev/null +++ b/src/ftp/Controller.java @@ -0,0 +1,105 @@ +package ftp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class Controller { + + int cport; + int r; + int timeout; + int rbPeriod; + + private static final String POISON_PILL = "POISON_PILL"; + + + /** + * @desc constructs a controller + * @param cport to listen on + * @param r replication factor + * @param timeout timeout (ms) + * @param rbPeriod rebalance period (ms) + */ + public Controller(int cport, int r, int timeout, int rbPeriod) throws IOException { + this.cport = cport; + this.r = r; + this.timeout = timeout; + this.rbPeriod = rbPeriod; + + + Selector selector = Selector.open(); + ServerSocketChannel serverSocket = ServerSocketChannel.open(); + serverSocket.bind(new InetSocketAddress(cport)); + serverSocket.configureBlocking(false); + serverSocket.register(selector, SelectionKey.OP_ACCEPT); + ByteBuffer buffer = ByteBuffer.allocate(256); + + while (true) { + selector.select(); + Set<SelectionKey> selectedKeys = selector.selectedKeys(); + Iterator<SelectionKey> iter = selectedKeys.iterator(); + while (iter.hasNext()) { + + SelectionKey key = iter.next(); + + if (key.isAcceptable()) { + register(selector, serverSocket); + } + + if (key.isReadable()) { + answerWithEcho(buffer, key); + } + iter.remove(); + } + } + } + + + private static void answerWithEcho(ByteBuffer buffer, SelectionKey key) + throws IOException { + + SocketChannel client = (SocketChannel) key.channel(); + client.read(buffer); + if (new String(buffer.array()).trim().equals(POISON_PILL)) { + client.close(); + System.out.println("Not accepting client messages anymore"); + } + else { + buffer.flip(); + client.write(buffer); + buffer.clear(); + } + } + + + private static void register(Selector selector, ServerSocketChannel serverSocket) + throws IOException { + + SocketChannel client = serverSocket.accept(); + client.configureBlocking(false); + client.register(selector, SelectionKey.OP_READ); + } + + + public static void main(String args[]) { + Stream<String> str = Arrays.stream(args); + + List<Integer> intArgs = str.map(x -> {return Integer.parseInt(x);}) + .collect(Collectors.toList()); + + try { + Controller ctrl = new Controller(intArgs.get(0), intArgs.get(1), intArgs.get(2), intArgs.get(3)); + } catch (IOException e) { + System.out.println("IOException " + e.getMessage()); + } + } + +} \ No newline at end of file -- GitLab