diff --git a/AbstractTest.java b/AbstractTest.java deleted file mode 100644 index 967b8aa45eb52602efe3642538943208da955e3a..0000000000000000000000000000000000000000 --- a/AbstractTest.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.javacodegeeks.nio.large_file_transfer; - -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; - -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; - -public abstract class AbstractTest { - - protected static final String SRC = "/tmp/input.tar.gz"; - protected static final String TARGET = "/tmp/output"; - protected static final long AWAIT_TEST_COMPLETE = 20000l; - - protected File srcFile; - protected File targetFile; - - @BeforeClass - public static void init() { - Thread.currentThread().getContextClassLoader().setDefaultAssertionStatus(true); - } - - @AfterClass - public static void destroy() { - Thread.currentThread().getContextClassLoader().setDefaultAssertionStatus(false); - } - - @Before - public void setUp() throws IOException { - this.srcFile = new File(SRC); - this.targetFile = new File(TARGET + "/" + this.srcFile.getName()); - - Files.deleteIfExists(this.targetFile.toPath()); - } - - protected final void compare() { - assertEquals("file did not copy completely", this.srcFile.length(), this.targetFile.length()); - } -} diff --git a/FileCopyTest.java b/FileCopyTest.java deleted file mode 100644 index d7248a9581eb9d7a8ca4a4bfd659e6ab656be057..0000000000000000000000000000000000000000 --- a/FileCopyTest.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.javacodegeeks.nio.large_file_transfer.remote; - -import java.io.IOException; -import java.util.concurrent.CountDownLatch; - -import org.junit.Test; - -import com.javacodegeeks.nio.large_file_transfer.AbstractTest; - -public class FileCopyTest extends AbstractTest { - - private static final int PORT = 9999; - - @Test - public void copyLargeFile() throws IOException, InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - final FileReceiver receiver = new FileReceiver(PORT, new FileWriter(TARGET + "/" + super.srcFile.getName()), super.srcFile.length()); - - new Thread() { - public void run() { - try { - receiver.receive(); - } catch (IOException e) { - - } finally { - latch.countDown(); - } - } - }.start(); - - final FileReader reader = new FileReader(new FileSender(PORT), SRC); - reader.read(); - - latch.await(); - compare(); - } -} diff --git a/FileReader.java b/FileReader.java deleted file mode 100644 index aeba501b40ffea0916e4588d6a57a57d2edbe247..0000000000000000000000000000000000000000 --- a/FileReader.java +++ /dev/null @@ -1,31 +0,0 @@ -final class FileReader { - - private final FileChannel channel; - private final FileSender sender; - - FileReader(final FileSender sender, final String path) throws IOException { - if (Objects.isNull(sender) || StringUtils.isEmpty(path)) { - throw new IllegalArgumentException("sender and path required"); - } - - this.sender = sender; - this.channel = FileChannel.open(Paths.get(path), StandardOpenOption.READ); - } - - void read() throws IOException { - try { - transfer(); - } finally { - close(); - } - } - - void close() throws IOException { - this.sender.close(); - this.channel.close(); - } - - private void transfer() throws IOException { - this.sender.transfer(this.channel, 0l, this.channel.size()); - } -} \ No newline at end of file diff --git a/FileReceiver.java b/FileReceiver.java deleted file mode 100644 index 98e4089644cf94f0e014d471793f22c2b6086f89..0000000000000000000000000000000000000000 --- a/FileReceiver.java +++ /dev/null @@ -1,42 +0,0 @@ -final class FileReceiver { - - private final int port; - private final FileWriter fileWriter; - private final long size; - - FileReceiver(final int port, final FileWriter fileWriter, final long size) { - this.port = port; - this.fileWriter = fileWriter; - this.size = size; - } - - void receive() throws IOException { - SocketChannel channel = null; - - try (final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) { - init(serverSocketChannel); - - channel = serverSocketChannel.accept(); - - doTransfer(channel); - } finally { - if (!Objects.isNull(channel)) { - channel.close(); - } - - this.fileWriter.close(); - } - } - - private void doTransfer(final SocketChannel channel) throws IOException { - assert !Objects.isNull(channel); - - this.fileWriter.transfer(channel, this.size); - } - - private void init(final ServerSocketChannel serverSocketChannel) throws IOException { - assert !Objects.isNull(serverSocketChannel); - - serverSocketChannel.bind(new InetSocketAddress(this.port)); - } -} \ No newline at end of file diff --git a/FileSender.java b/FileSender.java deleted file mode 100644 index 03dea3d4c6013cd089ea09854d27dd5ab6980433..0000000000000000000000000000000000000000 --- a/FileSender.java +++ /dev/null @@ -1,26 +0,0 @@ -final class FileSender { - - private final InetSocketAddress hostAddress; - private SocketChannel client; - - FileSender(final int port) throws IOException { - this.hostAddress = new InetSocketAddress(port); - this.client = SocketChannel.open(this.hostAddress); - } - - void transfer(final FileChannel channel, long position, long size) throws IOException { - assert !Objects.isNull(channel); - - while (position < size) { - position += channel.transferTo(position, Constants.TRANSFER_MAX_SIZE, this.client); - } - } - - SocketChannel getChannel() { - return this.client; - } - - void close() throws IOException { - this.client.close(); - } -} \ No newline at end of file diff --git a/FileWriter.java b/FileWriter.java deleted file mode 100644 index 1d655271d092a57e53cab1c78844ee7062b4f7a9..0000000000000000000000000000000000000000 --- a/FileWriter.java +++ /dev/null @@ -1,36 +0,0 @@ -final class FileWriter { - - private final FileChannel channel; - - FileWriter(final String path) throws IOException { - if (StringUtils.isEmpty(path)) { - throw new IllegalArgumentException("path required"); - } - - this.channel = FileChannel.open(Paths.get(path), StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); - } - - void transfer(final SocketChannel channel, final long bytes) throws IOException { - assert !Objects.isNull(channel); - - long position = 0l; - while (position < bytes) { - position += this.channel.transferFrom(channel, position, Constants.TRANSFER_MAX_SIZE); - } - } - - int write(final ByteBuffer buffer, long position) throws IOException { - assert !Objects.isNull(buffer); - - int bytesWritten = 0; - while(buffer.hasRemaining()) { - bytesWritten += this.channel.write(buffer, position + bytesWritten); - } - - return bytesWritten; - } - - void close() throws IOException { - this.channel.close(); - } -} \ No newline at end of file diff --git a/Main.java b/Main.java deleted file mode 100644 index 900ce894faff5915cd75902476d1a85a838333f3..0000000000000000000000000000000000000000 --- a/Main.java +++ /dev/null @@ -1,9 +0,0 @@ -import FileReader -import FileSender - - -public class Main { - public static void main(String args[]) { - FileReader reader = new FileReader(new FileSender(3125),"test.txt"); - } -} \ No newline at end of file diff --git a/distributed-systems-cw.iml b/distributed-systems-cw.iml deleted file mode 100644 index b107a2dd81165eaaf682ad3da030668b937fbb6c..0000000000000000000000000000000000000000 --- a/distributed-systems-cw.iml +++ /dev/null @@ -1,11 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<module type="JAVA_MODULE" version="4"> - <component name="NewModuleRootManager" inherit-compiler-output="true"> - <exclude-output /> - <content url="file://$MODULE_DIR$"> - <sourceFolder url="file://$MODULE_DIR$" isTestSource="false" /> - </content> - <orderEntry type="inheritedJdk" /> - <orderEntry type="sourceFolder" forTests="false" /> - </component> -</module> \ No newline at end of file diff --git a/ftp/Controller.java b/ftp/Controller.java deleted file mode 100644 index d53915f3ab67b8aa4b812f27702346eac40c3e70..0000000000000000000000000000000000000000 --- a/ftp/Controller.java +++ /dev/null @@ -1,19 +0,0 @@ -package ftp; - -public class Controller { - int cport; - int r; - int timeout; - int rbPeriod; - - public Controller(int cport, int r, int timeout, int rbPeriod) { - this.cport = cport; - this.r = r; - this.timeout = timeout; - this.rbPeriod = rbPeriod; - } - - public static void main(String args[]) { - Controller ctrl = new Controller(Integer.parseInt(args[0]),Integer.parseInt(args[1]),Integer.parseInt(args[2]),Integer.parseInt(args[3])); - } -} \ No newline at end of file diff --git a/nioExampleFiles/README.md b/nioExampleFiles/README.md deleted file mode 100644 index 75843ce39d2d20e40c8bcc2c11b12c32201873a5..0000000000000000000000000000000000000000 --- a/nioExampleFiles/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Distributed Systems Coursework - -a distributed file system, maybe some day it will work diff --git a/nioExampleFiles/distributed-systems-cw.iml b/nioExampleFiles/distributed-systems-cw.iml deleted file mode 100644 index c6df852fbd1b055c82563fd73a611d53d9355eb3..0000000000000000000000000000000000000000 --- a/nioExampleFiles/distributed-systems-cw.iml +++ /dev/null @@ -1,21 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<module type="JAVA_MODULE" version="4"> - <component name="NewModuleRootManager" inherit-compiler-output="true"> - <exclude-output /> - <content url="file://$MODULE_DIR$"> - <sourceFolder url="file://$MODULE_DIR$" isTestSource="false" /> - </content> - <orderEntry type="inheritedJdk" /> - <orderEntry type="sourceFolder" forTests="false" /> - <orderEntry type="module-library"> - <library name="JUnit4"> - <CLASSES> - <root url="jar://$MAVEN_REPOSITORY$/junit/junit/4.12/junit-4.12.jar!/" /> - <root url="jar://$MAVEN_REPOSITORY$/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar!/" /> - </CLASSES> - <JAVADOC /> - <SOURCES /> - </library> - </orderEntry> - </component> -</module> \ No newline at end of file diff --git a/src/ftp/Client.java b/src/ftp/Client.java new file mode 100644 index 0000000000000000000000000000000000000000..7da169f19b3119ac71f6c3f1752b1e5d2ec2a26a --- /dev/null +++ b/src/ftp/Client.java @@ -0,0 +1,82 @@ +package ftp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.Arrays; +import java.util.List; +import java.util.Scanner; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class Client { + + int cport; + int timeout; + + private static SocketChannel client; + private static ByteBuffer buffer; + private static Client instance; + + + /** + * @desc constructs a client + * @param cport controller port to use + * @param timeout timeout (ms) + */ + public Client(int cport, int timeout) { + this.cport = cport; + this.timeout = timeout; + + try { + client = SocketChannel.open(new InetSocketAddress(cport)); + buffer = ByteBuffer.allocate(256); + } catch (IOException e) { + e.printStackTrace(); + } + } + + + public String sendMessage(String msg) { + buffer = ByteBuffer.wrap(msg.getBytes()); + String response = null; + try { + client.write(buffer); + buffer.clear(); + client.read(buffer); + response = new String(buffer.array()).trim(); + System.out.println("response=" + response); + buffer.clear(); + } catch (IOException e) { + e.printStackTrace(); + } + return response; + + } + + + public static void main(String args[]) { + Stream<String> str = Arrays.stream(args); + + List<Integer> intArgs = str.map(x -> {return Integer.parseInt(x);}) + .collect(Collectors.toList()); + + instance = new Client(intArgs.get(0),intArgs.get(1)); + + Scanner scanner = new Scanner(System.in); + String command = ""; + + while (!command.equals("quit")) { + + System.out.println("Enter Command:"); + command = scanner.nextLine(); + + String response = instance.sendMessage(command); + + System.out.println(response); + + } + } + +} diff --git a/src/ftp/Controller.java b/src/ftp/Controller.java new file mode 100644 index 0000000000000000000000000000000000000000..ca52024765bfebc463ec846245d0a76d54799f48 --- /dev/null +++ b/src/ftp/Controller.java @@ -0,0 +1,105 @@ +package ftp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class Controller { + + int cport; + int r; + int timeout; + int rbPeriod; + + private static final String POISON_PILL = "POISON_PILL"; + + + /** + * @desc constructs a controller + * @param cport to listen on + * @param r replication factor + * @param timeout timeout (ms) + * @param rbPeriod rebalance period (ms) + */ + public Controller(int cport, int r, int timeout, int rbPeriod) throws IOException { + this.cport = cport; + this.r = r; + this.timeout = timeout; + this.rbPeriod = rbPeriod; + + + Selector selector = Selector.open(); + ServerSocketChannel serverSocket = ServerSocketChannel.open(); + serverSocket.bind(new InetSocketAddress(cport)); + serverSocket.configureBlocking(false); + serverSocket.register(selector, SelectionKey.OP_ACCEPT); + ByteBuffer buffer = ByteBuffer.allocate(256); + + while (true) { + selector.select(); + Set<SelectionKey> selectedKeys = selector.selectedKeys(); + Iterator<SelectionKey> iter = selectedKeys.iterator(); + while (iter.hasNext()) { + + SelectionKey key = iter.next(); + + if (key.isAcceptable()) { + register(selector, serverSocket); + } + + if (key.isReadable()) { + answerWithEcho(buffer, key); + } + iter.remove(); + } + } + } + + + private static void answerWithEcho(ByteBuffer buffer, SelectionKey key) + throws IOException { + + SocketChannel client = (SocketChannel) key.channel(); + client.read(buffer); + if (new String(buffer.array()).trim().equals(POISON_PILL)) { + client.close(); + System.out.println("Not accepting client messages anymore"); + } + else { + buffer.flip(); + client.write(buffer); + buffer.clear(); + } + } + + + private static void register(Selector selector, ServerSocketChannel serverSocket) + throws IOException { + + SocketChannel client = serverSocket.accept(); + client.configureBlocking(false); + client.register(selector, SelectionKey.OP_READ); + } + + + public static void main(String args[]) { + Stream<String> str = Arrays.stream(args); + + List<Integer> intArgs = str.map(x -> {return Integer.parseInt(x);}) + .collect(Collectors.toList()); + + try { + Controller ctrl = new Controller(intArgs.get(0), intArgs.get(1), intArgs.get(2), intArgs.get(3)); + } catch (IOException e) { + System.out.println("IOException " + e.getMessage()); + } + } + +} \ No newline at end of file