From 24843952de5149cb10c28a3d0a40b2320e365544 Mon Sep 17 00:00:00 2001 From: Daniel <dl3g19@soton.ac.uk> Date: Wed, 31 Mar 2021 16:23:58 +0100 Subject: [PATCH] Code for initializing Controller and Dstores implemented Functions for receiving the possible messages implemented Main structure of functions included as an outline --- Controller.class | Bin 0 -> 3078 bytes Controller.java | 155 +++++++++++++++++++++++++++++++++++++++++++++++ Dstore.class | Bin 0 -> 3972 bytes Dstore.java | 144 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 299 insertions(+) create mode 100644 Controller.class create mode 100644 Dstore.class diff --git a/Controller.class b/Controller.class new file mode 100644 index 0000000000000000000000000000000000000000..366a48b3ad9a08deaf72845574e63c4516c2d101 GIT binary patch literal 3078 zcmX^0Z`VEs1_mpJ-CPVz49x5dEIbUX3~Y=H0$GV=iTXK-dFlH8Nm;4MC5#MgHko;u zC3cJq%o>_uoD3Wcoa_u-JPh0nJd6xn&iQ#IMfo{7sYQ$otjPuWMJ0?3jGiD>d^`;N zASLWYZi&ey`9&ZxK^_JnuvkfEZfbsM2}n$Yhd~r1#$S}0l$ev4mz)}qT9lce!p<Ph z$RG@MPH9PIj=p12QDUV}W^oA@gBSxdNT(DJgEWH-BLjO%aY=qrYB3{&fDb}FNU1d! zgB*iAJA(obgCc_xBZDAV8tgmIypq&(kgGWg5{rscJ@ZN!8ALT8`oRi<ONuh{(ycW; zxfnPYR6xO_%E-X3;pyq=sTl?mQ|DpO00kLqaY<rP2|I%pBZD%s`6;Ot{-q`UX-@g2 zc`3!N70IauC7JnooD4b)y6g;kJPi5_1}Fg@Tv=R_n#;(*oL>rZ48$#&`T7AxnRz9_ zB}J);xz-#EMvM%)&iT2yiFqjsIhlE>3W-JOrManjCB+IEiDjt@dHE#@NvWxM3I#>^ zWtl0dDO?OD45sW1W;_h$3>L5;MKzX@fxQ4^NlqS)KnP=JuwrD8M|jV#G&d==$SuDp zH?ainH4X+FMh0~pj>|1A202V26B6*n><o5{45DBY^HNLngHwyjQj3D~le1GxxEO31 z96%xN$jHE=0S<0k24@}y7Y0{G2DZdxP)IT|2y19Uf(Wb~tkIgC!JUyo3~Xp-zP?jw zT3TvRYD!RQVoGWeJA)@9gCtDOGq0c&8mJIyE(UJ~A9e;`9tJ-Ke?|s=utJ2<j0}9~ zsU=9d7#YN24#>>cN0PASV(?%H1UWy5kwF5Qq|lUrQji-%2uM*V$R`kwWajHb++q!p z59eWsV2EU7;3!H>O!3LgOJ!sbf!T?iMmZRw85tNAxENv?;@BDDc^DEH5>ZnYI4v_W zuof5OWR@^8s6$<aY=~ww#1Y71+ziPKDeMfXJPc_J>5L3~;4lH_Mo{izWMC=ItV#tJ z44xbenT!l9UjCkbAZKLrFyt`gGBU8G7M3RF6r-goNXcTY8O6?!k0rXoi!w_<(N)OE zAO*HDGhg4ov;-bs5P49174tBZFqASf@TaGiAgcg{J2Y{i>9yu!C}1cDg?j}fgCsN< z(A9uKKaZgbq^cSvb|5xeYld+!)G{)#x&{UL2XQgvG1T)gG=TCJYdOg2Adw~>hGuZ# zkeriWoC+!~T6q}S7}^;bcrz07QgTv#Q;Ule(^EllfhFJ}QGrN(&;-%M&d>$R!Qgm; zXFV>4PKF*HhF*}J{NTJCT#}fa9a5B-oXWw_&&a?HDycK`N>lap^gyNJL>`7o43ild z*vk@gN>lyQ7#YMgJfUF*HX2$$T7%NSR33(Dpy*^yOi2OxWd;w!Opr~S@G_f&VKyTJ zYjB8vkSj>RTposb4D&$=4pcHTGN?gA8p%fxFM_Rt6}4d;3=0_<SbY2)T|kB`=3!U@ zGK3{3KQV=aVHqO>Tac@-e;C+?6+8?pL6U4msk!-OsT>TeK{`EyLqIy$@-VDpSkK77 zl9O3n0?ITH%aCl>3}a;A49-l?ODriZN@ZkFg{Xk1Z5s$5>^xXV*jY0&usG+Zq%tz_ zgK{IJ3=Bz3%1LEpV98C)%mW1?me>SY#tV)Q-^2p2dUl2#j10n9lrl1K1?QI*C8xS& zf=uNH#Tul3)dOV(21N!A1||ju237_ZPz}t$z`z2kX2G-&0}q%Mh0@XtG7O9i8yFZE zm>4!Pa56A5urjDJFfwdnU|<MkU}j)tU|>+y+RnhJCB2n_T}x&w1Fx3sRt5nrQI@R? z!dkl-#3HvdNNi=0+{nPdz{s$ffq_Aeft`VYfe&mcCxZY3AA=x+FoO_-B!e)63WEp( zC&LzotxzkrGO#hQF)%QAXfYe@V34)ZVgXUMTC5<-PKymhnX&9<;E0r#kridx&Y*0? zD#^NyL2WsMxX&gAZ6U*L48|M5ENc*JBiQ2x48{za3=9kg47LoG3`}5GSuyZ2Ffd3k zuro+9@G(d+h%!hs$TG+>Xfwz$7%-?a7&9m^m@p_Z*f1zD*fOXv*fFRwxG|_PxHD)l zY-3<x;AUVhW@pg!WM{Ahg^~sX8^jBYR-i@^0|UdP-3+#oT3Z;{w=>xLXm4Y1^4-ng z7P*_jBT{D@gV%P3fb9&ye!ANj!i1u>F~nH0Xl-GT+{uu{$e^`_fp-yukt7SVkx<4q zhOAi(YP%WoB6l+sfOHmZXDHjwPzmz3P|Y@mI*{)hmoso_NwVl{V`$Oc#?Z0+|72gC zZ4BL@@aWsdV7&Z)Gspqmpupf^sAQ;N=wx7E=waw#=!6D_AA>Lh1A`6&sC^;GV8Ecl zV921)V8r0fV9XHDV8W2bV9JoqV8)QgV8KwzV8u|$V9!v)V8c+$V9U_RV8_tOV9(IQ z;Kb0&;LI?Q!4(`pMhvWf7<kwjZ2vJNvNL$IGq|xcY-jt=z{bGD&cMOWV9UVB&d~Xv zfgeP3fC!MQ+8Ej);mB~3frWvYfq|hy8yXHX8Dv3Wqzz6II~XSHWSGLl5U#b2Vfu20 zP>`Zo+ZeKDGVp2XXtNk?W0<p?!5$>K07Z1sat2M1=u#BX<;xj_L87a`q8wT}+Zfh> z5-=pG7=iPK2Lm^QCxaY=7lSH;4}$@NFM~CMKZ6@X07D={5JL<@Fhe>+C_@oL7(+Qj z1ULlP8CX?CMMe1;7(fLZ)LW7a3}A0Dg3O0x5KzitV2EL002N(?lqE1QfC@1}%2F5@ lK*br7GEfPCVQB^f1E^F+QpO0%x9D!mVPIs~2`&?M0RRV7$&dg5 literal 0 HcmV?d00001 diff --git a/Controller.java b/Controller.java index 34356cf..c44d8a7 100644 --- a/Controller.java +++ b/Controller.java @@ -1,3 +1,158 @@ +import java.io.*; +import java.net.*; +import java.util.List; +import java.util.ArrayList; + public class Controller { + protected int cport; //Port to listen on + protected int rFactor; //Replication factor; each file is replicated across r Dstores + protected int timeout; //in milliseconds + protected int rebalancePeriod; //How long to wait to start the next rebalance operation, in milliseconds + protected List<Integer> dstores; + + public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) { + this.cport = cport; + this.rFactor = rFactor; + this.timeout = timeout; + this.rebalancePeriod = rebalancePeriod; + dstores = new ArrayList<Integer>(); + } + + public static void main(String[] args) { + try { + int cport = Integer.parseInt(args[0]); + int rFactor = Integer.parseInt(args[1]); + int timeout = Integer.parseInt(args[2]); + int rebalancePeriod = Integer.parseInt(args[3]); + + Controller controller = new Controller(cport, rFactor, timeout, rebalancePeriod); + controller.start(); + } + catch(IndexOutOfBoundsException e) { + System.out.println("Command line arguments have not been provided"); + return; + } + catch(NumberFormatException e) { + System.out.println("Command line arguments must be integers"); + return; + } + } + + public void start() { + try { + ServerSocket server = new ServerSocket(cport); + while(true) { + try { + Socket client = server.accept(); + BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); + String[] message = in.readLine().split(" "); + if(dstores.size() < rFactor && !message[0].equals("JOIN")) { + BufferedWriter out = new BufferedWriter(new OutputStreamWriter(client.getOutputStream())); + out.write("ERROR"); + out.close(); + } + else { + handleMessage(message, client); + } + in.close(); + } + catch(Exception e) { + e.printStackTrace(); + System.out.println("Continue..."); + } + } + } + catch(Exception e) { + e.printStackTrace(); + } + } + + void handleMessage(String[] message, Socket client) { + if(message[0].equals("JOIN")) { + dstores.add(Integer.parseInt(message[1])); + rebalance(); + } + else if(message[0].equals("STORE")) { + store(client, message[1]); + } + else if(message[0].equals("LOAD")) { + load(client, message[1]); + } + else if(message[0].equals("REMOVE")) { + remove(client, message[1]); + } + else if(message[0].equals("LIST")) { + list(client); + } + } + + void store(Socket client, String filename) { + //Update index to "store in progress" + + //Select Dstores + int[] storesToStore = new int[rFactor]; + for(int i=0; i<rFactor; i++) { + storesToStore[i] = dstores.get(i).intValue(); + } + + //Send STORE_TO message + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("STORE_TO"); + for(int port : storesToStore) { + out.print(" "); + out.print(port); + } + out.flush(); + + //Wait for STORE_ACKs from datastores in storesToStore + + //Update index to "store complete" + + //Send STORE_COMPLETE message + out.print("STORE_COMPLETE"); + out.flush(); + out.close(); + } + + void load(Socket client, String filename) { + //Select a Dstore which contains the file + + //Send LOAD_FROM message + } + + void remove(Socket client, String filename) { + //Update index to "remove in progress" + + //Send REMOVE message to all Dstores storing the file + + //Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message + + //Update index to "remove complete" + + //Send REMOVE_COMPLETE to client + PrintWriter clientOut = new PrintWriter(client.getOutputStream()); + clientOut.print("REMOVE_COMPLETE"); + clientOut.flush(); + clientOut.close(); + } + + void list(Socket client) { + //Send file list to client + } + + void rebalance() { + //Send LIST message to each Dstore and receive their file list + + //Create a new file allocation so that: + //Each file appears rFactor times + //Each file appears at most once on each datastore + //Files are evenly distributed (Dstores differ in capacity by at most 1, no 2 datastores have identical file lists) + + //Make a (files to send, files to remove) pair for each Dstore + + //Send the respective REBALANCE message to each Dstore + + //Wait for REBALANCE_COMPLETE from all Dstores + } } diff --git a/Dstore.class b/Dstore.class new file mode 100644 index 0000000000000000000000000000000000000000..a22d81e7a3552d109af70b70e6b19b6cf1041d70 GIT binary patch literal 3972 zcmX^0Z`VEs1_mpJ3tS9L49x5dEIbUX3~Y=H0$GV=iTXK-dFlH8Nm;4MC5#MgHko;u zC3cJq%o>_uoD3Wcoa_u-JPh0nJd6x%F2yDJMX8JoECu;RC5#M=o*)H$JPiCGS=MBb zC`d$*hd~G=!d{Y@o0?x*0umG9VGv~yV`Sh;%gjl2%g;$kEn;L4@<BKtxTGjEFWs7p zL4rY&ok5C+L7G8^kwFllz%#ETH65gjqad-UIMp++gpom11JeXePc8-y204%y<rx_y zG(0^$u_y`yDN^KNPy&SmYjH_pQ3*SP3L}FuvV|$B75=3q{%KD6rFkjEt`*6t1tppJ zd7KPt4C?F*8axb|3|c6m7+hIglA6oNz?@$Savj9p%zXWTqRhOK;F6-$#9V6*1|3EQ zUFZDV+{C;Tg`CX1RE5N%^wQkaypm#tjKs23g}nR{g{0KfJcWXy{IblH)D$iTJqCSt z1_K@jLk1&824S!pQH^C}U@rh!l9PucRKnO9Oc)vD!CJvF?pK<dlv?DLUzD3z0{0pR zgBc@(Iu6I>mKK8?rjQAViDGsJ3q}Thu!(u8CHlen$=RtTTnttW*6a*6JPfuBc8m-n zaB0uH)Dp*(l%mw)Vnzm@^wbic{N%(OkNn~iP%LRe!U|*#l0s`P26F}nP*gfHGDtxq z8(od3W*9qzGb4i-*s9EYeW%j2wA7;1l<=a=lGGx023JM~DVUspX-PpTG%6tSAS>K? z7(5s}85#J~Q%jIlAh|3vUmsb<nv21O!5ic<A4UdAsLRmRXohhyI5GHvRQWS92qRQM zY_`@66JlUs2;^Z1VhCnr5YJ7_PIb=DOHM2a&n(Gs&d)0@Nz5xL21TeRG<?7@2hS=_ z452&>VW1?zS`P9n$hHU`hDdOlPR_|MP6c@`f}J6TkwFw30-&%9PAw`+ErP{XG(#N7 zeesM8EE=952S+m`@-QSZBr`IwB_@MXJjmrx>%rQ=(QM7mkcuV9gHjVyKtZ0)$RG)} zFf(7@GY=l(5NVL3GI<!X7_u1|_|j8LkQ6X7h-*OYLe*)_#gN933vy{5C{iG)1x*Ra z^(hPmAVq~BN5d6CthI*77xOTbFqASfa1^B`ruby$r7|*z!0bd0UJizGMg~R&E`~~m zDt3ly9)=o*TGY%AF0mLHSc?mCGD{d4)S>Z<Y=~ww#1Y71ppvJahoOO?k&%HnBQY-} zC)GE#xHvIA6=Vh$4G<%c!XDz02zG`RSYZG*AD)Z37@8T{co^D2>3|<xECiP%CTE8f zB_^kGFmy69um*?t2f2d0*3HAv!_dpfz?NEAnwSGlYY^+eCPHcjYt1N-oBMefCNN9{ z`4>_P!2FBkFeI(eMCz#-#=$U|k%804-_a%B#WBPYWYkn1hG`7b85vk|@)J`S8PuR5 zhu0*KK{FW{*n(Vr{li>Arp)GHm;*9}ttd4&zbuu5VICs`i;rh;2uN}P55q!+MIb9O zK_!d`rWGIqmoPGL2Dv&p`Z)SIgUwvV!?2uT1tSAzQEF0RPGVj%I8q?~LUJCKSOS$o zt6()&X-Q^|zDHtlhHqj47sE=1I8eQ|mWN>-!+J&r_OisB(p3L6Mg}nrl#C6n;;gwD zHZp8tXV}ccu!UhOBLgouCxMOhO)Ow!U@j;vVPvp?`Up9qAnrsKfo3LTAw~uk=lqmZ zMh1RR>W5SVA&E&jpo%>=F*6V3EUf-xWZ(s-Bj3aVuzGfeJ&X*(Sd_9e>|<mQL6(5| zjFEvWIKQ+gIn^x_WFI%AMWF}sAR~i-Q+|F)aY<2Pfp2O_Mt({$KO4hgE`}ovN7)&U zfeOjvj0`FurI02|W?or-cB(!kp+U+`x5VU<{Gv)mh8f6#162qy)B~L1idB5_^Rr6} ztdT+tVi2+jipG%2f>dNlsB&aq!m<mJp->~76LWHcGfPsfIT%hdGO&61d-^FbGBWUZ z=H;apIp-u67pE4pGn`>$P{r^cHfOUloMU8=!>^Q)feqqE21N!A1||ju1{MYuP(u#P z;%DFi(?U>MltGMvfq|2Ok%5&#o`I2J8v_G_8v_#qD+2?Al-70zMlI>B4D4DmTN!w@ zWVbR1Xi2hcWf0!Tz`(%BupOj^fsKKIfro*GftP`kfscWYfuBK`L4ZM$L6CuyVF$xb zsF|A>*uW}XwU~`|Fo@e|v4ALBEmja^r^RMu#j=}$BT`yMR+42qgRB*+B<nT?h2;$5 zKARX+g|xRZ=xzkFOhK%TU~lO%7%(U^FfiycSTPtgFoB(8#lXkFz#z)N&LGCX#~{ui z${@iY#URC?!640`%b>zwz#z+D$RNjH$so^Q#h}Pw&7j2K%%IHR!l25qi-CcGn}NB2 zok7`?oxvCsCQ1x!5ce}mGq5tSGcYi8?Pf5K+`(X}wS|FqJA=K?ZU(2w-3%^~+S?f1 zwljEbXYlpa(b~emzKbD%fngg%h|V^KaP4gjQM(zUBSGTZ8Djl(w=pDuyqglan;|Vy zl4To1#&(9B?F{)4Ul$3LZDXjgV$+gj1Ig8aLZWH;|1(xBAa3jO|A&0Ew=uMW0>PP~ zoS}-LnSp_!g`tI^85#&)44~$PCIdf%7K0LlHiJHc4udg+E`vFPK7%uZF@q<A2}2x% zDMJ>68AB0+IYT*vHA5AHB|{B^6+<I~HA4%79YZUFJwqphBREjx8JPbuII}aDvokoc zGenoMGc^APB^h=G4hBYcu=NcLjgWw1*u}ugz|6qFkgLsXB-F8up=&0Ch?b5vi;*@s zs<$!pEoTS@$xQ;w@qy*GF-%#`U<(qRfg(C<IfFV#bS_wwTT5pf!~Eq8A|Ro~U?EN| z9qnxlOF`)ok^uA=xEUB2To||+Tp45-+!@pvJQ$1^ycld5ycs+gd>Min{1~Dc0vOU5 z0vU1`g2Db{XJC~P6%`U-U;tHgP=82*vk3zOBgkw>S_h>C28M7322dr2qzvR;3}w*_ z44_(ykg|9N22g#4qzsf}F-%KlU;x!m5M_)_3@qU6J5L9ki_F+q%~*Fctc(<8+s?4s zkJ*e}lwFi<5kruUD65ebM+++>h_Qn~+>En@l~I&)FN1^?7qc0+D7Pr*A_i$mE>Ui% zf*lNNf?HS_TeumQ|9_$@$sx(LlVJlhLwE~2<MRJ&p^D7dL8h?pWvH}bVK(Cs<q&0G z#2_lkBFX_$0M^gJxcvVsP{2VVC5}Oyfq@~LfrBB3ftw+hL6jknL7pL>L6xC^L4%=) z!H}Vt!GxiN!J46z!IhzmA%vlvA)KL<A(o+vA%h{0A&;S!p`4+Pp^BlNp^>41p_`$R zVG2VN!*qrQhPeza;7H74VEfM?!N9=|LgwrYyV)5wGB7eSGBEsSP-Wmj7iMRuWMpU9 z%gDg+i$U%`g8&0NnhKCUQBW_K1Dx<d#<4T(W(1q~i$Q{cnPEQz0|OJo0fvJN44{m_ baEO7C;RM4e1_lN$21bU{3}+e6Ge`me5d_P; literal 0 HcmV?d00001 diff --git a/Dstore.java b/Dstore.java index 07485ee..da2d0a9 100644 --- a/Dstore.java +++ b/Dstore.java @@ -1,3 +1,147 @@ +import java.io.*; +import java.net.*; +import java.util.Map; +import java.util.HashMap; + public class Dstore { + protected int port; //Port to listen on + protected int cport; //Controller's port to talk to + protected int timeout; //in milliseconds + protected String fileFolder; //Where to store the data locally + public Dstore(int port, int cport, int timeout, String fileFolder) { + this.port = port; + this.cport = cport; + this.timeout = timeout; + this.fileFolder = fileFolder; + } + + public static void main(String[] args) { + try { + int port = Integer.parseInt(args[0]); + int cport = Integer.parseInt(args[1]); + int timeout = Integer.parseInt(args[2]); + String fileFolder = args[3]; + + Dstore dstore = new Dstore(port, cport, timeout, fileFolder); + dstore.start(); + } + catch(IndexOutOfBoundsException e) { + System.out.println("Command line arguments have not been provided"); + return; + } + catch(NumberFormatException e) { + System.out.println("Command line arguments must be integers"); + return; + } + } + + public void start() { + try { + Socket socket = new Socket(InetAddress.getLocalHost(), cport); + BufferedWriter out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); + out.write("JOIN " + port); + out.close(); + socket.close(); + + ServerSocket server = new ServerSocket(port); + + while(true) { + try { + Socket client = server.accept(); + BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); + String[] message = in.readLine().split(" "); + handleMessage(message, client); + in.close(); + } + catch(Exception e) { + e.printStackTrace(); + } + } + } + catch(Exception e) { + e.printStackTrace(); + } + } + + void handleMessage(String[] message, Socket client) { + if(message[0].equals("STORE")) { + store(client, message[1], Integer.parseInt(message[2])); + } + else if(message[0].equals("LOAD_DATA")) { + load(client, message[1]); + } + else if(message[0].equals("REMOVE")) { + remove(client, message[1]); + } + else if(message[0].equals("LIST")) { + list(client); + } + else if(message[0].equals("REBALANCE")) { + rebalance(client, message); + } + } + + void store(Socket client, String filename, int filesize) { + //Send ACK message to client + + //Receive file content from client + + //Store the file data in fileFolder + + //Send STORE_ACK message to the Controller + } + + void load(Socket client, String filename) { + //Send the content of the file in fileFolder to the client + } + + void remove(Socket client, String filename) { + //Remove the file from fileFolder + + //Send REMOVE_ACK message to client (the controller) + } + + void list(Socket client) { + //Send a list of all files in fileFolder to client (the controller) + } + + void rebalance(Socket client, String[] message) { + //Interpret files to send and files to remove from the message + Map<String,Integer[]> filesToSend; + String[] filesToRemove; + int index; + + int numberToSend = Integer.parseInt(message[1]); + index = 2; + filesToSend = new HashMap<String,Integer[]>(numberToSend); + for(int i=0; i<numberToSend; i++) { + String name = message[index]; + index++; + + int numberOfReceivers = Integer.parseInt(message[index]); + index++; + Integer[] receivers = new Integer[numberOfReceivers]; + for(int j=0; j<numberOfReceivers; j++) { + receivers[j] = Integer.parseInt(message[index]); + index++; + } + + filesToSend.put(name, receivers); + } + + int numberToRemove = Integer.parseInt(message[index]); + index++; + filesToRemove = new String[numberToRemove]; + for(int k=0; k<numberToRemove; k++) { + filesToRemove[k] = message[index]; + index++; + } + + //Send each file to send to the Dstore at the specified port number + + //Remove each file to remove from fileFolder + + //Send REBALANCE_COMPLETE message to client (the controller) + } } -- GitLab