Skip to content
Snippets Groups Projects
Commit efdcc927 authored by ik1g19's avatar ik1g19
Browse files

add controller and client

parent 0e684ee3
Branches
No related tags found
No related merge requests found
package com.javacodegeeks.nio.large_file_transfer;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
public abstract class AbstractTest {
protected static final String SRC = "/tmp/input.tar.gz";
protected static final String TARGET = "/tmp/output";
protected static final long AWAIT_TEST_COMPLETE = 20000l;
protected File srcFile;
protected File targetFile;
@BeforeClass
public static void init() {
Thread.currentThread().getContextClassLoader().setDefaultAssertionStatus(true);
}
@AfterClass
public static void destroy() {
Thread.currentThread().getContextClassLoader().setDefaultAssertionStatus(false);
}
@Before
public void setUp() throws IOException {
this.srcFile = new File(SRC);
this.targetFile = new File(TARGET + "/" + this.srcFile.getName());
Files.deleteIfExists(this.targetFile.toPath());
}
protected final void compare() {
assertEquals("file did not copy completely", this.srcFile.length(), this.targetFile.length());
}
}
package com.javacodegeeks.nio.large_file_transfer.remote;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;
import com.javacodegeeks.nio.large_file_transfer.AbstractTest;
public class FileCopyTest extends AbstractTest {
private static final int PORT = 9999;
@Test
public void copyLargeFile() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final FileReceiver receiver = new FileReceiver(PORT, new FileWriter(TARGET + "/" + super.srcFile.getName()), super.srcFile.length());
new Thread() {
public void run() {
try {
receiver.receive();
} catch (IOException e) {
} finally {
latch.countDown();
}
}
}.start();
final FileReader reader = new FileReader(new FileSender(PORT), SRC);
reader.read();
latch.await();
compare();
}
}
final class FileReader {
private final FileChannel channel;
private final FileSender sender;
FileReader(final FileSender sender, final String path) throws IOException {
if (Objects.isNull(sender) || StringUtils.isEmpty(path)) {
throw new IllegalArgumentException("sender and path required");
}
this.sender = sender;
this.channel = FileChannel.open(Paths.get(path), StandardOpenOption.READ);
}
void read() throws IOException {
try {
transfer();
} finally {
close();
}
}
void close() throws IOException {
this.sender.close();
this.channel.close();
}
private void transfer() throws IOException {
this.sender.transfer(this.channel, 0l, this.channel.size());
}
}
\ No newline at end of file
final class FileReceiver {
private final int port;
private final FileWriter fileWriter;
private final long size;
FileReceiver(final int port, final FileWriter fileWriter, final long size) {
this.port = port;
this.fileWriter = fileWriter;
this.size = size;
}
void receive() throws IOException {
SocketChannel channel = null;
try (final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
init(serverSocketChannel);
channel = serverSocketChannel.accept();
doTransfer(channel);
} finally {
if (!Objects.isNull(channel)) {
channel.close();
}
this.fileWriter.close();
}
}
private void doTransfer(final SocketChannel channel) throws IOException {
assert !Objects.isNull(channel);
this.fileWriter.transfer(channel, this.size);
}
private void init(final ServerSocketChannel serverSocketChannel) throws IOException {
assert !Objects.isNull(serverSocketChannel);
serverSocketChannel.bind(new InetSocketAddress(this.port));
}
}
\ No newline at end of file
final class FileSender {
private final InetSocketAddress hostAddress;
private SocketChannel client;
FileSender(final int port) throws IOException {
this.hostAddress = new InetSocketAddress(port);
this.client = SocketChannel.open(this.hostAddress);
}
void transfer(final FileChannel channel, long position, long size) throws IOException {
assert !Objects.isNull(channel);
while (position < size) {
position += channel.transferTo(position, Constants.TRANSFER_MAX_SIZE, this.client);
}
}
SocketChannel getChannel() {
return this.client;
}
void close() throws IOException {
this.client.close();
}
}
\ No newline at end of file
final class FileWriter {
private final FileChannel channel;
FileWriter(final String path) throws IOException {
if (StringUtils.isEmpty(path)) {
throw new IllegalArgumentException("path required");
}
this.channel = FileChannel.open(Paths.get(path), StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
}
void transfer(final SocketChannel channel, final long bytes) throws IOException {
assert !Objects.isNull(channel);
long position = 0l;
while (position < bytes) {
position += this.channel.transferFrom(channel, position, Constants.TRANSFER_MAX_SIZE);
}
}
int write(final ByteBuffer buffer, long position) throws IOException {
assert !Objects.isNull(buffer);
int bytesWritten = 0;
while(buffer.hasRemaining()) {
bytesWritten += this.channel.write(buffer, position + bytesWritten);
}
return bytesWritten;
}
void close() throws IOException {
this.channel.close();
}
}
\ No newline at end of file
import FileReader
import FileSender
public class Main {
public static void main(String args[]) {
FileReader reader = new FileReader(new FileSender(3125),"test.txt");
}
}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$" isTestSource="false" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
package ftp;
public class Controller {
int cport;
int r;
int timeout;
int rbPeriod;
public Controller(int cport, int r, int timeout, int rbPeriod) {
this.cport = cport;
this.r = r;
this.timeout = timeout;
this.rbPeriod = rbPeriod;
}
public static void main(String args[]) {
Controller ctrl = new Controller(Integer.parseInt(args[0]),Integer.parseInt(args[1]),Integer.parseInt(args[2]),Integer.parseInt(args[3]));
}
}
\ No newline at end of file
# Distributed Systems Coursework
a distributed file system, maybe some day it will work
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$" isTestSource="false" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module-library">
<library name="JUnit4">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/junit/junit/4.12/junit-4.12.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES />
</library>
</orderEntry>
</component>
</module>
\ No newline at end of file
package ftp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class Client {
int cport;
int timeout;
private static SocketChannel client;
private static ByteBuffer buffer;
private static Client instance;
/**
* @desc constructs a client
* @param cport controller port to use
* @param timeout timeout (ms)
*/
public Client(int cport, int timeout) {
this.cport = cport;
this.timeout = timeout;
try {
client = SocketChannel.open(new InetSocketAddress(cport));
buffer = ByteBuffer.allocate(256);
} catch (IOException e) {
e.printStackTrace();
}
}
public String sendMessage(String msg) {
buffer = ByteBuffer.wrap(msg.getBytes());
String response = null;
try {
client.write(buffer);
buffer.clear();
client.read(buffer);
response = new String(buffer.array()).trim();
System.out.println("response=" + response);
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
return response;
}
public static void main(String args[]) {
Stream<String> str = Arrays.stream(args);
List<Integer> intArgs = str.map(x -> {return Integer.parseInt(x);})
.collect(Collectors.toList());
instance = new Client(intArgs.get(0),intArgs.get(1));
Scanner scanner = new Scanner(System.in);
String command = "";
while (!command.equals("quit")) {
System.out.println("Enter Command:");
command = scanner.nextLine();
String response = instance.sendMessage(command);
System.out.println(response);
}
}
}
package ftp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class Controller {
int cport;
int r;
int timeout;
int rbPeriod;
private static final String POISON_PILL = "POISON_PILL";
/**
* @desc constructs a controller
* @param cport to listen on
* @param r replication factor
* @param timeout timeout (ms)
* @param rbPeriod rebalance period (ms)
*/
public Controller(int cport, int r, int timeout, int rbPeriod) throws IOException {
this.cport = cport;
this.r = r;
this.timeout = timeout;
this.rbPeriod = rbPeriod;
Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(cport));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(256);
while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
register(selector, serverSocket);
}
if (key.isReadable()) {
answerWithEcho(buffer, key);
}
iter.remove();
}
}
}
private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
throws IOException {
SocketChannel client = (SocketChannel) key.channel();
client.read(buffer);
if (new String(buffer.array()).trim().equals(POISON_PILL)) {
client.close();
System.out.println("Not accepting client messages anymore");
}
else {
buffer.flip();
client.write(buffer);
buffer.clear();
}
}
private static void register(Selector selector, ServerSocketChannel serverSocket)
throws IOException {
SocketChannel client = serverSocket.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}
public static void main(String args[]) {
Stream<String> str = Arrays.stream(args);
List<Integer> intArgs = str.map(x -> {return Integer.parseInt(x);})
.collect(Collectors.toList());
try {
Controller ctrl = new Controller(intArgs.get(0), intArgs.get(1), intArgs.get(2), intArgs.get(3));
} catch (IOException e) {
System.out.println("IOException " + e.getMessage());
}
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment