Skip to content
Snippets Groups Projects
Commit e8cca92a authored by ik1g19's avatar ik1g19
Browse files

add start of the store operation

parent d744deee
No related branches found
No related tags found
No related merge requests found
Showing
with 96 additions and 277 deletions
File moved
File moved
File moved
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
public abstract class AbstractTest {
protected static final String SRC = "/README.md";
protected static final String TARGET = "/tmp/output";
protected static final long AWAIT_TEST_COMPLETE = 20000l;
protected File srcFile;
protected File targetFile;
@BeforeClass
public static void init() {
Thread.currentThread().getContextClassLoader().setDefaultAssertionStatus(true);
}
@AfterClass
public static void destroy() {
Thread.currentThread().getContextClassLoader().setDefaultAssertionStatus(false);
}
@Before
public void setUp() throws IOException {
this.srcFile = new File(SRC);
this.targetFile = new File(TARGET + "/" + this.srcFile.getName());
Files.deleteIfExists(this.targetFile.toPath());
}
protected final void compare() {
assertEquals("file did not copy completely", this.srcFile.length(), this.targetFile.length());
}
}
public final class Constants {
public static final String INSTANTIATION_NOT_ALLOWED = "Instantiation not allowed";
public static final long TRANSFER_MAX_SIZE = (1024 * 1024);
public static final int BUFFER_SIZE = 2048;
public static final String END_MESSAGE_MARKER = ":END";
public static final String MESSAGE_DELIMITTER = "#";
public static final String CONFIRMATION = "OK";
private Constants() {
throw new IllegalStateException(INSTANTIATION_NOT_ALLOWED);
}
}
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;
public class FileCopyTest extends AbstractTest {
private static final int PORT = 9999;
@Test
public void copyLargeFile() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final FileReceiver receiver = new FileReceiver(PORT, new FileWriter(TARGET + "/" + super.srcFile.getName()), super.srcFile.length());
new Thread() {
public void run() {
try {
receiver.receive();
} catch (IOException e) {
} finally {
latch.countDown();
}
}
}.start();
final FileReader reader = new FileReader(new FileSender(PORT), SRC);
reader.read();
latch.await();
compare();
}
}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Objects;
final class FileReader {
private final FileChannel channel;
private final FileSender sender;
FileReader(final FileSender sender, final String path) throws IOException {
if (Objects.isNull(sender) || path == "") {
throw new IllegalArgumentException("sender and path required");
}
this.sender = sender;
this.channel = FileChannel.open(Paths.get(path), StandardOpenOption.READ);
}
void read() throws IOException {
try {
transfer();
} finally {
close();
}
}
void close() throws IOException {
this.sender.close();
this.channel.close();
}
private void transfer() throws IOException {
this.sender.transfer(this.channel, 0l, this.channel.size());
}
}
\ No newline at end of file
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Objects;
final class FileReceiver {
private final int port;
private final FileWriter fileWriter;
private final long size;
FileReceiver(final int port, final FileWriter fileWriter, final long size) {
this.port = port;
this.fileWriter = fileWriter;
this.size = size;
}
void receive() throws IOException {
SocketChannel channel = null;
try (final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
init(serverSocketChannel);
channel = serverSocketChannel.accept();
doTransfer(channel);
} finally {
if (!Objects.isNull(channel)) {
channel.close();
}
this.fileWriter.close();
}
}
private void doTransfer(final SocketChannel channel) throws IOException {
assert !Objects.isNull(channel);
this.fileWriter.transfer(channel, this.size);
}
private void init(final ServerSocketChannel serverSocketChannel) throws IOException {
assert !Objects.isNull(serverSocketChannel);
serverSocketChannel.bind(new InetSocketAddress(this.port));
}
}
\ No newline at end of file
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.util.Objects;
final class FileSender {
private final InetSocketAddress hostAddress;
private SocketChannel client;
FileSender(final int port) throws IOException {
this.hostAddress = new InetSocketAddress(port);
this.client = SocketChannel.open(this.hostAddress);
}
void transfer(final FileChannel channel, long position, long size) throws IOException {
assert !Objects.isNull(channel);
while (position < size) {
position += channel.transferTo(position, Constants.TRANSFER_MAX_SIZE, this.client);
}
}
SocketChannel getChannel() {
return this.client;
}
void close() throws IOException {
this.client.close();
}
}
\ No newline at end of file
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Objects;
final class FileWriter {
private final FileChannel channel;
FileWriter(final String path) throws IOException {
if (path == "") {
throw new IllegalArgumentException("path required");
}
this.channel = FileChannel.open(Paths.get(path), StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
}
void transfer(final SocketChannel channel, final long bytes) throws IOException {
assert !Objects.isNull(channel);
long position = 0l;
while (position < bytes) {
position += this.channel.transferFrom(channel, position, Constants.TRANSFER_MAX_SIZE);
}
}
int write(final ByteBuffer buffer, long position) throws IOException {
assert !Objects.isNull(buffer);
int bytesWritten = 0;
while(buffer.hasRemaining()) {
bytesWritten += this.channel.write(buffer, position + bytesWritten);
}
return bytesWritten;
}
void close() throws IOException {
this.channel.close();
}
}
\ No newline at end of file
...@@ -11,10 +11,12 @@ import java.util.stream.Stream; ...@@ -11,10 +11,12 @@ import java.util.stream.Stream;
public class Controller extends Server { public class Controller extends Server {
int r; private int r;
int rbPeriod; private int rbPeriod;
private int nextID = 0;
private ArrayList<DstoreConnection> index; private DstoreIndex dstoreIndex;
private FileIndex fileIndex;
/** /**
...@@ -30,7 +32,8 @@ public class Controller extends Server { ...@@ -30,7 +32,8 @@ public class Controller extends Server {
this.timeout = timeout; this.timeout = timeout;
this.rbPeriod = rbPeriod; this.rbPeriod = rbPeriod;
index = new ArrayList<DstoreConnection>(); dstoreIndex = new DstoreIndex();
fileIndex = new FileIndex();
start(); start();
} }
...@@ -56,6 +59,8 @@ public class Controller extends Server { ...@@ -56,6 +59,8 @@ public class Controller extends Server {
switch(args[0]) { switch(args[0]) {
case "JOIN": case "JOIN":
Integer port = Integer.parseInt(args[1]);
send("LIST", client); send("LIST", client);
String files = readSocket(client); String files = readSocket(client);
...@@ -63,15 +68,38 @@ public class Controller extends Server { ...@@ -63,15 +68,38 @@ public class Controller extends Server {
// todo use stream instead // todo use stream instead
// //
List<String[]> filesInfo = new ArrayList<String[]>() {{ List<String[]> filesInfo = new ArrayList<String[]>() {{
String[] filesAndSizes = files.split(" | "); String[] filesAndSizes = files.split("\\|");
for (String file : filesAndSizes) add(file.split(" ")); for (String file : filesAndSizes) add(file.split(" "));
}}; }};
System.out.println("test"); dstoreIndex.addDstore(filesInfo, port, nextID);
threadIDOutput("New Dstore (ID: " + nextID + ") successfully joined");
nextID++;
send ("ACK", client);
break;
case "STORE":
String filename = args[1];
Long filesize = Long.parseLong(args[2]);
DstoreFile file = fileIndex.addFile(filename,filesize);
file.setStoreInProgress(true);
List<DstoreConnection> dstores = dstoreIndex.getFirstN(r);
dstores.stream().forEach(x -> x.addFile(file));
String ports = dstores.stream().
map(x -> Integer.toString(x.getPort())).
collect(Collectors.joining(" "));
index.add(new DstoreConnection(filesInfo, Integer.parseInt(args[1])));
threadIDOutput(index.get(0).getFile_index().getFile(0).getFilename()); send("STORE_TO " + ports, client);
break; break;
} }
......
...@@ -31,20 +31,37 @@ public class Dstore extends Server { ...@@ -31,20 +31,37 @@ public class Dstore extends Server {
* @param timeout timeout (ms) * @param timeout timeout (ms)
* @param file_folder where to store data locally * @param file_folder where to store data locally
*/ */
public Dstore(int port, int cport, int timeout, String file_folder) throws IOException{ public Dstore(int port, int cport, int timeout, String file_folder) {
this.port = port; this.port = port;
this.cport = cport; this.cport = cport;
this.timeout = timeout; this.timeout = timeout;
this.file_folder = file_folder; this.file_folder = file_folder;
Socket controller = new Socket("localhost",cport); Socket controller = null;
send("JOIN " + port, controller);
handleRequest(readSocket(controller), controller); Boolean joined = false;
for (int i = 0; (i < 10) && !joined; i++) {
try { controller = new Socket("localhost",cport); joined = true; }
catch(IOException e) {
threadIDErr(e.getMessage());
}
try {threadIDErr("Retrying Connection..."); Thread.sleep(1000);}
catch(InterruptedException e) {threadIDErr(e.getMessage());}
}
if (controller != null) {
send("JOIN " + port, controller);
handleRequest(readSocket(controller), controller);
start(); start();
} }
else {
threadIDErr("Unable to connect to Controller");
}
}
...@@ -55,11 +72,8 @@ public class Dstore extends Server { ...@@ -55,11 +72,8 @@ public class Dstore extends Server {
List<Integer> intArgs = str.map(x -> {return Integer.parseInt(x);}) List<Integer> intArgs = str.map(x -> {return Integer.parseInt(x);})
.collect(Collectors.toList()); .collect(Collectors.toList());
try {
Dstore dstore = new Dstore(intArgs.get(0), intArgs.get(1), intArgs.get(2), args[3]); Dstore dstore = new Dstore(intArgs.get(0), intArgs.get(1), intArgs.get(2), args[3]);
} catch (IOException e) {
System.out.println("IOException " + e.getMessage());
}
} }
...@@ -70,7 +84,7 @@ public class Dstore extends Server { ...@@ -70,7 +84,7 @@ public class Dstore extends Server {
switch(args[0]) { switch(args[0]) {
case "LIST": case "LIST":
File folder = new File("storage"); File folder = new File(file_folder);
//todo use stream instead //todo use stream instead
...@@ -79,15 +93,30 @@ public class Dstore extends Server { ...@@ -79,15 +93,30 @@ public class Dstore extends Server {
for (File file : folder.listFiles()) add(file.getName() + " " + file.length()); for (File file : folder.listFiles()) add(file.getName() + " " + file.length());
}}; }};
// todo use joining instead of reduce
//
String ident = files.get(0); String ident = files.get(0);
files.remove(0); files.remove(0);
String fileMessage = files.stream() String fileMessage = files.stream()
.reduce( ident, (file1, file2) -> (file1 + "|" + file2) ); .reduce( ident, (file1, file2) -> (file1 + "|" + file2) );
send(fileMessage, client); send(fileMessage, client);
String response = readSocket(client);
if (response.equals("ACK")) threadIDOutput("Successfully joined Controller");
break; break;
case "STORE":
String filename = args[1];
Long filesize = Long.parseLong(args[2]);
send("ACK", client);
} }
} }
......
...@@ -5,22 +5,28 @@ import java.util.stream.Collectors; ...@@ -5,22 +5,28 @@ import java.util.stream.Collectors;
public class DstoreConnection { public class DstoreConnection {
private Index file_index; private FileIndex file_index;
private int port; private int port;
private int id;
public DstoreConnection(List<String[]> files, int port) { public DstoreConnection(List<String[]> files, int port, int id) {
this.port = port; this.port = port;
this.id = id;
List<DstoreFile> dstoreFiles = files.stream() List<DstoreFile> dstoreFiles = files.stream()
.map(x -> {return new DstoreFile(x[0],Long.parseLong(x[1]));}) .map(x -> new DstoreFile(x[0],Long.parseLong(x[1])))
.collect(Collectors.toList()); .collect(Collectors.toList());
file_index = new Index(dstoreFiles); file_index = new FileIndex(dstoreFiles);
} }
public Index getFile_index() { public void addFile(String filename, Long filesize) { file_index.addFile(filename,filesize); }
return file_index;
} public void addFile(DstoreFile file) { file_index.add(file); }
public int getPort() { return port; }
} }
...@@ -24,6 +24,12 @@ public class DstoreFile { ...@@ -24,6 +24,12 @@ public class DstoreFile {
public boolean isRemoveComplete() {return removeComplete;} public boolean isRemoveComplete() {return removeComplete;}
public void setStoreInProgress(Boolean store) { storeInProgress = store; }
public void setStoreComplete(Boolean complete) { storeComplete = complete; }
public void setRemoveInProgress(Boolean remove) { removeInProgress = remove; }
public void setRemoveComplete(Boolean complete) { removeComplete = complete; }
public String getFilename() { return filename; } public String getFilename() { return filename; }
public Long getFilesize() { return filesize; } public Long getFilesize() { return filesize; }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment