From 5ee1ed8217d8839198eb08620bf6c3bbe47af96c Mon Sep 17 00:00:00 2001 From: Daniel <dl3g19@soton.ac.uk> Date: Fri, 9 Apr 2021 13:47:09 +0100 Subject: [PATCH] Failure handling started Threading started --- Controller$IndexEntry.class | Bin 0 -> 629 bytes Controller.class | Bin 3078 -> 5880 bytes Controller.java | 254 ++++++++++++++++++++++++++++++++---- Dstore.class | Bin 3972 -> 5800 bytes Dstore.java | 93 +++++++++++-- 5 files changed, 307 insertions(+), 40 deletions(-) create mode 100644 Controller$IndexEntry.class diff --git a/Controller$IndexEntry.class b/Controller$IndexEntry.class new file mode 100644 index 0000000000000000000000000000000000000000..1a3af9c4b1553341ae7057985d0c3bb2d51cc7fd GIT binary patch literal 629 zcmX^0Z`VEs1_mnzElvg|24;2!79Ivx1~x_pQRn=;lA`>aoYW!}&%Bh>3fH`nqDn>v zwvvp@Vif~M1|A=T3TrL~4hBwk1}+{3ZU!Dk27#=^vPAuy#JqHU|D>$c<Pt^(Hk-`6 z%o00B24)S-Fpyn*JPiB{0*nkCX_+~x#hF#9j0}vP><mJT48maDr6rj;`i@0KiIqN? z#U)$}f()D>ZK6C3VhrMp3>?KJ`9-NIPL+%d0zL>0Ahp&U43dltf?x#-nRyBYMfvGP zsl~-0?b199G7Pee3~a?Ei6y1Qj0{2$&0x0%mlS2@rCT#Ha0X|l=Ovbu7Ns&Ws6rHg zodYu62Eqqx@XRYoO;0VdwzFnr;MYJ75X~?~1{UZ1lvG9rexJ;|RKL>Pq|~C2#H5^5 zMh33n{L-T2RJTkJhaVA)dLVx=GI02%7MFPB7niUzs53HfA><hucs%p+Qj45(5{rvd ziy0ZX5HZ7`$iT$F!N9=4!oUa$SOx|LMo>sGFfzz7FfcGN$TKj4yw1QNAOMPCuskCJ zH-iELBZDFX1A_+xGXoO?1A~H=_ErW)t?dl#T1;CRc(ryj2u5ya5Z=llq9r7;l|gDF zSf>&L1A`a?D+2?AFasxp2m>F3D1#(}7=t{6I0G|-GT25H230Vv#-PE#z`(`8$iT#) J#-Pc-0sx8mhDZPa literal 0 HcmV?d00001 diff --git a/Controller.class b/Controller.class index 366a48b3ad9a08deaf72845574e63c4516c2d101..338ca1cac64638e4f4032704a11ffa1948b256ee 100644 GIT binary patch literal 5880 zcmX^0Z`VEs1_moeXD$XN24;2!79Ivx1~x_pfvm)`ME#t^ymWp4q^#8B5=I6#o6Nk- z5<5l)W)00SP6iGJPId+^9tLg(9!3T(=lr~qqWqkk)FMU(*5rcxq7p_1Mo*9`J{|^s zkP`MHx5VU<{34K;AP<8OSga&7H#NVs1SBTH!ypP0<1b20O3X>jOHK_)Ey~PKVP_C$ zWDo{Br?ez9N8hoiD6!Hfv$%wdL5zVJq*IEAL7G8^k%2v>xFo+QwV07Xzz3loq|}<7 zL5`6@5FzN1Se)UTSir>~3o$~Ghe3%!nUR4tGcP5zf{}qA$pGKP0&6Y?RR%S726Y|= z4F*k^N#M}&%qvMv2l<+#AhD=8)ibYzkwH`gq8+RtxTGjEFWp+xlZ%0aK^qjjI*bhL z8lIk>o|<7GF+CmzeNd3I7MCOzm9R4yGBPNGT?*Fj338BsX^DTDQ+{b)O0jE2a%w?I zW_}(ggE4~%JA)|?gBgQ4N`wVh7MG;vGBPmdmx3GvaZ6^ten3%XUP*9CQEFnYH3x$w zBZIDUer|4JUW!6aW?rg7Vo`c&Zfaghu|h^-S*k)_eu+X-YHFTBK~a8LW=d)b7lSo} z4LgG^4}%?pJuFI4jb&tDF92DRlZPV^!q^!c85!ge-t#NXO-e0t%P-1JEP;ECgTa}R zK^=$Va!ZRr4pYd41bi_&gDWG0DA>fj)Dr#R)S|M~qTu}G?9>u21{Vf*P>6dlGO%cX zgWH9{i-*CR!H1E7EioAsl8g+(8k&$G0&53rv}R}UV`LBm8=9G~?^K$WmRgjW5|o;l zl3K*h5WvVF36t~8D=38qDny!#A&4QEogsvWA(SDEk%1qq5MeYU17CV-36d^G2631J zGV}G3B&@j@{23xZ&W~hdkbovBG$o)E<jW8ZQWOL73B)6r`T7vISVQFFco^at5*Qgc zic%9(d@}P=85u-ic7hWbG>vjFBr!5DDsVBRFr>0Gr13DMGi0EqEO1(8WMC~W$jK~W zWKf5?3fU0NXow?_#kd)=7_!+Ja(EbW8S)qz_`qQT&ds3Q$;iM`oLQ9$E?7J{7z!8} zSiJl_{Xotr;$bLe0F^zdg{6r(#b~JtQZiX<MzJ%LVTrErqRbLdbX71iNP%t4%-8oX zErG`uL>?4hRXhyU3^j}l{OPGB$SOeL4ow_rdabz_${FfF;a<<kAPEfybTy#RFJ)*1 zscJ%r9f-}=nqeFaEsPATu0cWmL0k-_3~f9N?V!BHS`KnLNTid8p$l9%B<JK8r-F)$ z9v+5XhCW6H-i*Y&l$=!G)Z*gA^i)t>U<r6gR3K6xG(mK-Gfaf#U~oLavmO`21cu2x z3{yaM@`Lkoa7kivc1TfTaw-SIG)4w)P)VPeSDLD)rw1w(XYer0WSGUsz+RS^Q=00Z z#>gP1;Ry{hu+h*0(i)To=I}7g1w|)wVoD0gFY|dA7JzKxgqQgo42u{USc60SgIqxh zmhdnvWmpDEaG;W%kwFa_(nvmncoA$BG#P_}bp;~>i;usf3&@aFJPfND)-W=#<m4x& zfC37l1+N)VAVbzMGOz`?f{fu{*Z^kx`iFttwTXveGss<RMX9;@WvLttTS4}D28V#` z+s?zVgJCDgzRcnhP*%gRPcw|0VK>7bc80w?4EsO<!V6BH;2hzbSis1@ot&Rnl9-uS z?44T4&Ts%0SFq9+l76@t_A}H8F)%P3=3zL(aFmfjJU1~r)j2;eIk6->vn0bgKd-nX zF|VW;5pF2K1aTI!5Xi*iJPaqmX%HOyAdyo%45vY{#F~~<TATqA>11a($H*WGN(zul zUj<xEx#pD=RdO+$Ww-!J@)sEy_%(bGYOEo?zRbgLh2bhA19L%X2_u6AG(y3yfaYf; zcR@vv(-Tw(l$@^fFx+6c$;iN*o(ihyP}3b$9Vm_7=3%%4Dt9<C^Gd=%$&{1fEW<q> zhWnsa0|&U#km6Ly!SIlgfdicM;zRsF-h2!S3J(s3r;H4IU{wmq`MCu-sU@i(1D^9R zykL0A$iP-yl2}q&%*Y^w<Zx(G<Y0Ks$iN3N)7jrQz{fSjm4o3eBZCMy&nvk2y9USm z`G>^2MtBB?fD*-fke@!F#Te8tSb_oMkWV}epFu4rj<n32R8UDO#K6e#6_h5vfpa5D zc7!|Z2P1<JB=ss}f-Cv-qSWGIP_X{uVffAPhmnB;RD33u<QIWVf!YmDqMjwGMIdo& zZiash|JfNCco-QOnHU*_5UCiZgpq+gBeB>owW0)Eyhm{}{9|O{VPs`wV`N~-0}G>8 zMNqGDF>)|+vNLk=Fmf~UFfxdMO$Oyy&%D$U$CMP1TNxR6(o;)(@{<#DJn}&~O#)V_ zp((WHVhCd7<6&TB<Y#1%f~GihHJ+MbLJUj{hd~kF2C5V~IT%G48Tg@L1dDkNMlnVP z{vcOSNe~Z<Qf`Lbj1oMIl8jP}3~bq{mBFC&C<t>6ILv}mORTvWWf*?3Gs-eDAm#U9 zP&;KmgAF4CXK-eEUSdgUQ7R*YDkLp|wSwwM8wekqO<*Oeoi!ta6+|4Y&Ns0DSs65@ zk#ZnXF0}?}W^v9>Nd+}_GxJg*4Y82Kq?}Yn2A15!%sf!6W2sU!!x$NO!Igq<VgXn^ zJEI08gD@7Qj0{}xilvyHQJay0DcX~ffh#z_v?w{%EfZuFKcaxp14S|;1Gislafxp# zs9gwZ$~)!fmlT&2B^LOmmSp6o6!Wt&8geljF&eWon(#0jVKilAPyuO#bXGF+%JQ>Q z^&x(T6ors(M<pY}3~0=Q0}-kaVyFkWDlJy=$<NO&EwF}!DOefAAY>5~jUkl<smPL0 z<;Y<HP0-N%12z<DgmYp}PH<*Psx=3rIU|D@q(pG^337FGiG&mg!3vBVjFyZHyr4QL z-Yv-ASAh{EWX;IH0ZD=ijEoFCo_TqxMb0^i#l@h`4;P{=W@of#WKhKj9c-RvXLMv_ zki)N(k%0{oLJW!w91Khh3=C`xET9e`0|NsKsFw$(g&26iv?!F8W{_cEWRz!MU|?cY zU|<1twBUS222KV>237_g21Z6D1_p){24)6U1_lNLt?dkqTGCq?*tKM~GVp52Ze<YA z5@p%SAgr~UK`e4RgTz(_Nv+)svXR>v<hL>?Y-C_yU}RKgU|>*U;9y{2kY!+DkYnIv zkZ0gyP+$;dP-Ku~P-0MFP-f6)P+{O?RAE$w+PIa0je(7Ufx$zI*=PrYij5Wvh_cmU z1yOcdY#_>vWj6yyq_m8zD9d&REh|<@)@=;B%NfLdHZd3pS!`pl+6ZPjfmj>C;bOpG z%wPcW2ZJqx0|OJ-WmXJ)3=9k!4D1Y=415e)45AF$46+Qm4B8BO3<eD5U>zn5h72|g zMhvzLCJc5ArVMTjW(@8O7K~~P3=G^1%*E^s2A=E;4xm8OXJCVPfzb-oWoKYun6#V0 zB~oh(1N(LcHy`b744%Hb8GIvmGx$g9Y-0%A&JezxA<9p88$+y6;x>k4D;BLS43aw; zG8q}PwlMH6Vla|qVKx%V-^Nfli$QHSLuur0hH{Y3%Iyrb+Zh@_-WF=!#?T7#eaCVJ zE-gtGoox)=y4x7~m;ayatFw(^5-2>TZey@o{=XUIfJvah;9;m_s9~7Ez`)SM(8DkR z8W?^I!VC-yHVixr_6&jy4h$*`jtu$?P7K}*&J6JkE(~c5t_;}>ZVY)09t@=nUJR8C z0Sq+^J`A-CehiHa{tTTA0SrA1!3@0&Aq*24!oUG!#K8K8frp*J<sU;LJ3|mVgD*Ry zI@^B+HU=hk1`c)x7Y0Ulh6(=}_(3!Whyb~&m!Xe=37q&|F|aT&GcYhr(1wP?Oa@s{ z7-@r(#14k(I~is(F@$SvW0<#`As3`*;WmcCnGAedI@&Bo+ZYxvX9x$0E(eS9gGHIQ zF|1t9-~bX{izLhf7S;p_uLldm%-Fb`K^P>u1uV*;rL&D;8z^xjC!t6NZiXlZIfiHk zRfbpw1BN&TYlZ{{H-<!pAckayScVjaOolXu5{7h!dWKAfE`}_IeujK-2yrm5sfvn; ziSjdQg0m<iI9`@AEQ5pq<75VA1}+8$h6h?(7!-7NGVEevn8~2Dn_+*X_BMvf?F<L~ zbai$y9AaSD#&Aq`8^cN6Z475N?q)a}skNQqydPMT?oNhF%nae6B(;ajiiO#XRg@LP zS;U|Z3QJMeoebBQ7!EL~TCqv8h_XtuZDY9Que*iet|S{Icw1N*m;XP9WG6V`J=n%@ zOp?WlO_FV&89TEXhbV_A`yvK8Nj6aqJu^;8R#1{YCdn$wxsBmb3kT!!{|`aQ^rWuP zldTNTBw2-Cf%LO(V>k&CK7$C+ECwkC28JRAUWQ@@Nrn;zWrk7)b%rtqZH96N9foQK zQ-&G_M}~R^Plg7DP=-c^RE8#oe1>L*YK8`eMuv8VK86m4sSKSA>lwNjwlQ=w>}BX= zILy$?aDrhH!#Rej3|ASZG2CL9&TyY$1~>)`8CZ4xG03wsoMmU!V*JHm@}EH-G?2yt zPOvEuadw9NAX!ERh93-yC{eYVVGSgz7)~=VgG;(9aHxZ$Y8Hbca<H*TvIxBad7A~~ zZI*2eXP`>ofx`c+6^kUx7KXdbu=v=<aLbGpB=pgY4cQoWNp?|IQMO$SU%=5W$qq7& z9Tf545?z^rfx(P{mw|y{HUlri90n<dxeSU7^BB|^<}+wBEM~A`Si<1Tu#_Q$VHHCn z!)k_nhBXW&3~L!G8P<UVgqeX+8^w#8;i=Jvfd%Zv{ZM!BVq%!ZAdBQSKV7f~VJQ)+ z4B|aMU7_z=8J>Ys)=q|>j0_;}?qvAO$S{+^TaslbBQqm75wh-P2#Van$PP*tx3)9# zT7mrzN|m7GZpALiuCt3#fPrBfqaecPl5E=;g_i%{s4L7Uyp`b@l8ZpL2s4VJggPRz zg)xXSFfeRn;APmvAjz<qL7rg?gEGTb26cv=45kdb7_1rgGQ=?KW5{LL&rr>9fT4}y zAVW99A%+PIhZ&|a9ATKjaFSs@!x@G(3}+d(Fq~u9$#9-wAHxN3IPx<vtN&$?10^7a zf4>+4QNnZw!%k?jX@yp>j!2I71qIwrMsY@los80q41T)Ma1z&r22+X^i;g7AHb%MS z|DWk>V>qdU68mxt><kPHR~UF0t}^H|Tw^e0xXxg~aD%~y;U<Fv!!5Au_!yY}GDw5c zHOO%?D2`hIDxkpyuq3D!2Vqc<L8@~$21W)3hL=bhSQvG{2~U?%4_bzuWz=V2U=U$o nW;9@6WHbYl7GTnf(T0J60o1HN%fP{C%fP~D$LPT5#2^U(P&e9& delta 1816 zcmeyN+a|$v>ff$?3=9lb47(?C-Q(qAkYkW%XHei_P-IY=Y|3b@s=~v-%Am@~z^>ux z>FKE%#>K$Fpw7df!NA7Iz*=0ASX9E!pf!0aqXxGQgDyLR9uI>)gTdqzjK+*clm9Rp zahouhvNM?RFqkt~OtxgQVzioE!lc4t!^ogM`9Guh<jqV%CU%SrqFIS$iTZh|CHlds zMP;c)!THJAsU=(twhRs+`yCk>STw+v+cG%wFt{+dPX5WH$ml*<npv9BbFw+J9Je=v z4?BY|4}%|r|KtQ_dkznVK#<O$$;+8#IoudRK%CIYXP6~8+!(@n7$O)VC%<7<5sGGH zU{v5@h-HXlXNc!vNMJ~uti<BZoy?HJ&XCH(kj9Wcxr9YQD3g(a#mnE*52P}ihara{ zck(h8Wybu;r&y#J3nxEgk>~JcDCS`(VJMx<%c@^rz)%jdu!506Qo{%Ax6FKf|I(6z z(vskkqSVA(Yfwn$F;sz6RWmXOBUFVKWtOBCS!;%IFw`<Ku(}2X`3G?^<T2FqFf=e& zFfy=~gA{;7ns^wR!O@VMlV6+)ikMa&hBk(FMuy3|S*?QI7@F7_x)>RR!S?4Q=B4Yq zRwSnulw{`TaWQl<^zbnBg0%A&6lLa>1eYWxXNMFeCZ}>R^fNMWJLl(>WagEo>gnl$ z62(LwhDi*QCws7|S|&40<zbk{kju!xoS2dVvUUa!!%UD#oJFZgi8+aRATwq&GOz}R z_y@Uy6wKven8z@G@-jAKpM{JJEI$5@E+FB>JPb=1<})&|<m4x&a4;-mWMB(&_4N+} z>si6Wuo5K6R+O5XUzW<juo|S(GdKjKb1e_UI)?R=quJ#qcXOLf{>Sa%$gqQvK{y%` z*kI2FmlS2@rCT#Ha0Ta=7A2>;W#*(ZGVp^uR+OKUlUk$)3K#}O1`Y-$1_lP!$&P#) z3Je<<7#NrsHZpKBFoF_310%yG1_p*u24)6U1_lPz$us#>>NhelFfcM~W?*2DV_;`s zVBlk5Vc=)rWDsEBV-REzW)NbKWDsUhVGv>9WZ1&66>7#-1~vvZ1_lNXEoP$~46-&_ zEFj8OixouKX|aJQGnU;99Ffv8vZ5^88I-M9C0VyIs4Zs@_u0gtEo8Wj!FZ$3CWd-# zA!`tGBiP*r48{za3=9kg47LoG3`}5WSuyZ2Ffd3kuro+9@G(d+h%!hs$TG+>Xfwz$ z7%-?a7&9m^m@p_Z*f1zD*fOXv*fFRwxG|_PxHD)lY-3<x;AUVhW@pg!WM{Ahg@XnI z8^jNcRt&5RZ1oHb43l;<*hXq?VPN0RVDF>7jls!xH-lT`ZU&D?oox(W+Zh73GX(qT zZes`&irU5yW5uGig+X#BLlPr{))ofdMGQugEX+ni8QU1LW-+MkX2^@&%}@Z+S+t#@ zY&%0G$m2pa+ZgIV{%>5)z@;V0qO*;mMRyxR$MXO6lYMozF?551q;DI8@$&!8AQyCl zf`f;llA(s7lYxPuhoOg|6B-<T48jZy3_1*;vPqD^fI)@9kU^iph{2n|m?56QgdvT= zlp&kJj3JM~f}xbbilLIho}q@phM|_hmZ6csj-ivmo}q`qiJ_OlnPDP>YdtuOj2Kw| zFz~Q5*#2WkWM}YZXK-U@*v|H!fsKKQoq>a$!IpuMouTtT13!r701+T(wK23qf|B7R z0}BH)0|P^aHZ&w=GRT5LNgJFjb}&rX$uNb9AzW)4!}R40p&&)GwlQSQWZ=`P*U@G% z+Qu+vIfFe&cmY%xBD!cfgC<CHDT?Uw<qX0g(N$nk4lSK+3~NBC7?NC!7<fSGl7XAS zlR=Kbi$Rsahrxism%*CBpTUhGfFY0}h#`g{m?51Zl%a?rte&BqAp#r*><p}`qN1Yw z3=E+B4fU8L0|VG&j35gj*#wk)7#Lz07(n@*kg@~@22jq&tt=j5TnYmNs1QI>2g=46 cwq`IefJ!GMWsIQgi|)7_21bUR;Jm*J0FRJmzyJUM diff --git a/Controller.java b/Controller.java index c44d8a7..d9661c4 100644 --- a/Controller.java +++ b/Controller.java @@ -1,7 +1,10 @@ import java.io.*; import java.net.*; +import java.lang.Runnable; import java.util.List; import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; public class Controller { protected int cport; //Port to listen on @@ -9,14 +12,118 @@ public class Controller { protected int timeout; //in milliseconds protected int rebalancePeriod; //How long to wait to start the next rebalance operation, in milliseconds + protected class IndexEntry { + protected int filesize; + protected List<Integer> storedBy; + protected int numberToStore; + protected String status; + protected Object storeAckLock; + + public IndexEntry() { + filesize = -1; + storedBy = new SyncList(); + numberToStore = 0; + status = "store in progress"; + storeAckLock = new Object(); + } + + public synchronized void setFilesize(int filesize) { + this.filesize = filesize; + } + + public synchronized int getFilesize() { + return filesize; + } + + public void addStoredBy(int dstore) { + storedBy.add(new Integer(dstore)); + if(storedBy.size() == numberToStore) storeAckLock.notify(); + } + + public void addStoredBy(List<Integer> dstores) { + storedBy.addAll(dstores); + if(storedBy.size() == numberToStore) storeAckLock.notify(); + } + + public List<Integer> getStoredBy() { + return storedBy; + } + + public synchronized void setNumberToStore(int i) { + numberToStore = i; + } + + public synchronized void setStatus(String status) { + this.status = status; + } + + public synchronized int getStatus() { + return status; + } + + public Object getLock() { + return storeAckLock; + } + } + + protected class SyncList extends ArrayList<Integer> { + public SyncList() { + super(); + } + + @Override + public boolean add(Integer i) { + synchronized(this) { + super.add(i); + } + } + + @Override + public boolean addAll(Collection<Integer> c) { + synchronized(this) { + super.addAll(c); + } + } + + @Override + public Integer get(int i) { + synchronized(this) { + super.get(i); + } + } + + @Override + public int size() { + synchronized(this) { + super.size(); + } + } + + @Override + public boolean remove(int i) { + synchronized(this) { + super.remove(i); + } + } + + @Override + public boolean remove(Integer i) { + synchronized(this) { + super.remove(i); + } + } + } + protected List<Integer> dstores; + protected Map<String,IndexEntry> index; public Controller(int cport, int rFactor, int timeout, int rebalancePeriod) { this.cport = cport; this.rFactor = rFactor; this.timeout = timeout; this.rebalancePeriod = rebalancePeriod; - dstores = new ArrayList<Integer>(); + dstores = new SyncList(); + index = new HashMap<String,IndexEntry>(); } public static void main(String[] args) { @@ -58,6 +165,7 @@ public class Controller { in.close(); } catch(Exception e) { + //Log error e.printStackTrace(); System.out.println("Continue..."); } @@ -68,7 +176,7 @@ public class Controller { } } - void handleMessage(String[] message, Socket client) { + void handleMessage(String[] message, Socket client) throws Exception { if(message[0].equals("JOIN")) { dstores.add(Integer.parseInt(message[1])); rebalance(); @@ -76,8 +184,14 @@ public class Controller { else if(message[0].equals("STORE")) { store(client, message[1]); } + else if(message[0].equals("STORE_ACK")) { + storeAck(client, message[1]); + } else if(message[0].equals("LOAD")) { - load(client, message[1]); + load(client, message[1], false); + } + else if(message[0].equals("RELOAD")) { + load(client, message[1], true); } else if(message[0].equals("REMOVE")) { remove(client, message[1]); @@ -85,50 +199,126 @@ public class Controller { else if(message[0].equals("LIST")) { list(client); } + else { + //Log error and continue (throw exception?) + } } - void store(Socket client, String filename) { - //Update index to "store in progress" - - //Select Dstores - int[] storesToStore = new int[rFactor]; - for(int i=0; i<rFactor; i++) { - storesToStore[i] = dstores.get(i).intValue(); + void store(Socket client, String filename) throws Exception { + new Thread(() -> { + try { + if(index.containsKey(filename)) { + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("ERROR ALREADY_EXISTS " + filename); + out.flush(); + out.close(); + return; + } + + //Update index to "store in progress" + IndexEntry entry = new IndexEntry(); + index.put(filename, entry); + + //Select Dstores + int[] storesToStore = new int[rFactor]; + for(int i=0; i<rFactor; i++) { + Integer thisStore = dstores.get(i); + storesToStore[i] = thisStore.intValue(); + } + entry.setNumberToStore(rFactor); + + //Send STORE_TO message + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("STORE_TO"); + for(int port : storesToStore) { + out.print(" " + port); + } + out.flush(); + + //Wait for STORE_ACKs from datastores in storesToStore + try { + entry.getLock().wait(timeout); + } + catch(InterruptedException e) { + e.printStackTrace(); + } + + if(entry.getStoredBy().size() < rFactor) { + //Log error + } + + //Update index to "store complete" + entry.status = "store complete"; + + //Send STORE_COMPLETE message + out.print("STORE_COMPLETE"); + out.flush(); + out.close(); + } + catch(IOException e) { + e.printStackTrace(); + } + }).start(); + } + + void storeAck(Socket client, String filename) throws Exception { + if(!index.containsKey(filename)) { + //Throw logging exception + return; } - //Send STORE_TO message - PrintWriter out = new PrintWriter(client.getOutputStream()); - out.print("STORE_TO"); - for(int port : storesToStore) { - out.print(" "); - out.print(port); + IndexEntry thisEntry = index.get(filename); + thisEntry.addStoredBy(new Integer(client.getPort())); + } + + void load(Socket client, String filename, boolean reload) throws Exception { + if(!index.containsKey(filename)) { + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("ERROR DOES_NOT_EXIST"); + out.flush(); + out.close(); } - out.flush(); - //Wait for STORE_ACKs from datastores in storesToStore + //Select a Dstore which contains the file + IndexEntry thisEntry = index.get(filename); + int thisStore = thisEntry.storedBy.get(0).intValue(); + int thisSize = thisEntry.filesize; - //Update index to "store complete" + // !!TO DO: RELOAD COMMAND!! - //Send STORE_COMPLETE message - out.print("STORE_COMPLETE"); + //Send LOAD_FROM message + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("LOAD_FROM " + thisStore + " " + thisSize); out.flush(); out.close(); } - void load(Socket client, String filename) { - //Select a Dstore which contains the file + void remove(Socket client, String filename) throws Exception { + if(!index.containsKey(filename)) { + PrintWriter clientOut = new PrintWriter(client.getOutputStream()); + clientOut.print("ERROR DOES_NOT_EXIST"); + clientOut.flush(); + clientOut.close(); + } - //Send LOAD_FROM message - } - - void remove(Socket client, String filename) { //Update index to "remove in progress" + IndexEntry entry = index.get(filename); + entry.status = "remove in progress"; //Send REMOVE message to all Dstores storing the file + for(Integer dstore : entry.storedBy) { + Socket socket = new Socket(InetAddress.getLocalHost(), dstore.intValue()); + PrintWriter out = new PrintWriter(socket.getOutputStream()); + out.write("REMOVE " + filename); + out.flush(); + out.close(); + socket.close(); + } //Wait for REMOVE_ACKs from all Dstores which were sent the REMOVE message //Update index to "remove complete" + entry.status = "remove complete"; //Send REMOVE_COMPLETE to client PrintWriter clientOut = new PrintWriter(client.getOutputStream()); @@ -137,11 +327,17 @@ public class Controller { clientOut.close(); } - void list(Socket client) { + void list(Socket client) throws Exception { //Send file list to client + PrintWriter out = new PrintWriter(client.getOutputStream()); + for(String name : index.keySet()) { + out.println(name); + } + out.flush(); + out.close(); } - void rebalance() { + void rebalance() throws Exception { //Send LIST message to each Dstore and receive their file list //Create a new file allocation so that: diff --git a/Dstore.class b/Dstore.class index a22d81e7a3552d109af70b70e6b19b6cf1041d70..508e972562b57c9e441796e3b9ff255a2b8d89b9 100644 GIT binary patch delta 2829 zcmZpXU!lu&>ff$?3=9lbjJg}S7P2zhPhP{SD(;h&SeB@tnXm6unwFMYl$sKhnwXMW zH2DH+JEQ&N2sTv-94f3Q*Rp-67lvsMD9X$$2`|blNiE`HSjphc!LXK*f!Wd78^l}B z!?1zDh>?M{0Hg{evWbUbGXonV18Z7NX>kTfB!Zn`8zX}R%sjWuoK*kPl7iBb;F6-$ z#9Sc;Mur_c3_%P#85t}zAa2S@ocxQCYw`hBw#n>_yz(%?ymbBGlA_GKbZah#tqi+) z7+4whFfxcu{>Uio5v3W%#ZbwxkB4DD!vRJHj`Y+Lr^=GlVnzlQ4b5mLkcLA%3}Fn1 z85vkLqMbmhw=!%32}cMqFfkklx!?pNgWBYej3V`z7P2#(Vq_3U_Je00+zVU`CmGIw zTw?@co#SCR&v1c}fu$%lF$HRyC&+D=KuQjSlti#ITxDe7K{k$y;R?eU5OITtq5dYr zEk*{ml+>Kml2k?pW)00K4u(673|v93zW!mZ@!-I}!f=m=;XcCyMh4ED%;FMIuog2i z2x@3XLtLMkuMZNl2ATPYhv6|pDI)`WdTNPZVs0us!&62EL9q3uC7C(;9*M;nzKI1~ z3{M#1xELhZ8D8)(ykvMaxsP3y`whcec7}I64DT5}Oy0*HSkKMyhT#(r!)Jysj0|kq zsg=R0C17hIwt|fdPA#$KX86YNot@za55rG}UyKaA2+P14I6&zsu_V8UkwHvD6G^XU zNoo;D+?t!=55r$}hJQQ^|3RT41T`QqGe;k$hLM3iBeB>owW5TZ;SVDt4<i#JGa~~_ zUTQ@NBZCOc*qp??bbbG%tkh(XtKKlO@-VV7vNJL;r-NLsrZG8=Q_d19&(6rn$RGj@ zL9j;8ypq)P)FNvRMs7w1p&(Z$M;}K&XIBMhf8PKf*AUmq{2ZQ6j0{|^70IauC7Jno z#q5m2j0{ZCPV9`Lj12rxvtY5o&M3~vAPf}*TL4MXle;(!`59%n7-bpd*cs({7=joT zCZFOsY{jU=$iU|1@9C$&$ib+>$iU2~&&a{3#>l`O9O54YP74Z*j0`-Ud3mWt&N+$2 z#i_;YjGBxLs+09O#V5b#Wam}!$<NO&EnsKVW@M0qin=f{@=P}6(qdxdot(&}z`@AJ z$j`vY$TPW<D-|U6fy<7GQDCwGw>Tdo4+8_EECVY8I|Bnl&*V_<d}B%0Z47lFqG|d6 zQ&udJEZZ1bm;XQDqrHuxbt3~4gEK=VLk&YS0|P@FLmNXg1IWa$-1_w;4CV}#3^oik z43-Ra4Au<I3^ojH4E7A|3=Rz43{H#!3=9nN49x!+oY@)7*%_SJ8KNuL8Jhnya56Bm zGjM<ikf99>jSNg6XEE$%U}a!oU|`78W;PP)*v8N`lR;EVN1Me+o7HFsgShTChQ8$t z!63;=+ZehgFXHj7XXIsIV9;aWW?*1&W#DFTV~}O=V9;RjWH4s%X0T)MVen+|V+djJ zXNX}4WJqTSV#s9(2Rn?NfmK#iR7ikPkb!}Lg@F<5zX=Qzp-%bFz{bGIz`!tJH^a(E z?QINh+Zk3{u}ZQEt=Yz~4ir`!LBy8&-3(hJwYD$_>+E9K&cMLDone<1o2(hTDEmAH z6;MDHZDZJL#UaTi$+3;$pd{Nih9jcvk{tU|%-EOzzYF3Y1zE70!8~#YgQeCM2Hx!q z_E6h6Bsp|;F&txH0NKc~jbS6mj4c}(7(fxgz`)?eAjDA5z!1&A%MimL!Vt?K$q>gN z$B@8a#E{Hj%8<ff#gNM2!jQ(`&5+Izz>vWZ!H~%i&5*^Az>v#O#gNC)%8<{{#ZbV| z&rk>s5?%(Te+;JV3@h0gwlb=s1k5ys>Ck|g!@$D8#=yYfznkG?q*guL>)RPl`+>Xy z3o8}@jy)V!th%6JI?FVlK~j=sH$!ElB<psDiy#lNY-8BC{QqO!Z48$|;e_NbHE_U} zG6*r0F~~DiFsL$AGUzi@G1xLxGq^I;F?cf6Gx#wyfSp{=4|20AJHttkvl)ftQCvL- z?&=r@7I4Zk*v)VSDZaKdT-(NQJsip1e!4=pw=t~K-Nvv{cN+uPRgB;?CI$9&3xg;_ zJA)!a2ZK68CxbRaeHVj1LpO@03mFzcgK9bhGdP#IBbo0Da{$=tyMDU+%vhMsSVdVy zSr#$K=!&u$S+VJW(#6AV4C{2ZF>GvMWnBLMu@2NNOklUjfsO5F;9;nrz@X1Ck->st z5`zuH1O_LDDGY85Q^7$a#lZBRL5YEloq+?K{Z_IwiZC)VF#KRZ4W#7^D<JM=TExJ@ zz{kMA=%m8}%5G+CtY)mc8J<LnvTbL0=En?5)uL>R7=m;_uH<N8Wz-R!%)_T4{5-gY zm9d4Jaryrzx{@4{Tss+FPj=_CsMp=e@R5;WC&O1p1}hFpj-3p@85w3WSW9y3WMp7u zIKWV8#i=XFxs#EFiQxbvhZUD3*FH0DW-}g99#QT^3<8o|qC7@cye&M8%m2S${{NsP z3&@mxW}M7sT%uf}oQoLLBsoO6jI6j}VJgWD&V}Hhy&hiQ!o|4!|4pR82k94P<N_r> zEcvO8L6L!hVJ-s)!#oCVhWQMl3=0_K85T0AGAv@yU|7On$gq^bgkc$jHN$cSSB4b~ zAq*=S!Wot`#4@a5$Y5B&kjJo|p`2j@LlwhDhDL@>4E5a%n;E7sY+;zru!&(V!w!Z6 z3_BTKGVEb^!?2%`jo|>J48wj#C59u6`V2=I%^8j{+AthvbYwUIj=e(+JpUOaKv4&d zJw`EhhBpk1p!oaGpvu4lja0ZWJ3}QSJ0k}p1H&%{x&I6T4D9R-^(Z<S82&SeGH@bm zWM>p(1RME_LE;a{%s>Abv>13Xm9R7XVP~iWna>C^Mi`qh><oWEmNWfh;0BxfgTY=C wDPJ%%N`MP4Nk%CK1_luZW=3fSMn*+2sSGAn8PypW7`PZ188sNS7<CvV0eGB}cmMzZ delta 1037 zcmZ3X+ak|(>ff$?3=9lb3>P+XEo5a@*NC1xkw=0nFSSHJI6pZ%wPf-O)^<kq$pvhx zlh?9+P!!BcEKAfcEy>K$_ed<x@J%e>Vpz!#$HgGQu$G5m9mD#`XE;^4H!^HuXV}cc zu!Ujk<UgE&lS{Zgjo2CXFfs^7`+yC|Nz6;v4=yRn%uBasXV}NcAc8F6nOBmUo?0~d zKer)2!(lFlBMe8`8IJKV1Th?+Y|nF8mf<8L1Dlt>r=J2NBLk0TUS4XEb53G$acVI; z!<osJyqc^G=NK8}Cfo4JO>X72Wn$Pi`3SET6T|k&A9xiw7<Mr1WME|2Hd%r%6(m;A zXUD{_Yw~_RaXyA^3=E9Y46F?73=9lilb`VA8*XE$lVsh-(6s#j87me^mTe5J%l{wp z(cZ?;x{-m2!I`0)p^BlIfq|igp@pHD0c2hqzkYoYgE>PvgEd1HgC#=^gB3#~gEd17 zgB?RFgFQnhgCoN(1_lOs2IhYZ&g=~4><muq4AEum49));I2jn(88|=$$j}CcMvzMx z7#Majure?+Ffin5GaCtYY-8w}$snSoqs?NZ&1$rRL3|rS-*SdvklZAY++<S$pL&Mv z3=9l<4BQM13@!}Z46Y0^4DJl-3?2+d3|<Vj4BiYL489D(41NsJ3;_&j41o+e48dTB zu`{sBh>8jcFfcGoV3-JXfFuJ00}BHK1LH;z0XCbBfsuiMA)JAMVH(49BxN9XGH$F# zQy0y^z%U1kIy7bR3=9kl85SWK#{^P=Zfi0F1H*EL6%b{NO$;myJPZsB^K@81p<%|x zYR1aBn_*?7DBE_1)svqH+X=4;Zee9?;bvU^|B0?7ha}fdh7FTVMJ(z;1~Y-Z6UQLV zz`&5rz`>Biz|D}$Aj*)(AkUD`pvq9dputeYV8~F+V8T$sV9ij<;L1?O5W-N-5YAA_ z5X(@-kin40kjGHVP|i@tP{mNs(8$oh(9O`uFomIsVLC$t!(4_IaM0#4u>EI{VBlZ} zA#-+y-SzAY8yOfG85tP<GpI7~z{R1$><pER><oJu85n*s$o*#!U|>g60n#VRz=<r% z&aj&iY~n8l2?l0{{R|8YObiDY4l*!+5*EWD21bSx45t_v7`PZ18Q4!VoMkxAAPE5Q C9oh8& diff --git a/Dstore.java b/Dstore.java index da2d0a9..7a40e16 100644 --- a/Dstore.java +++ b/Dstore.java @@ -1,4 +1,5 @@ import java.io.*; +import java.nio.file.Files; import java.net.*; import java.util.Map; import java.util.HashMap; @@ -51,10 +52,11 @@ public class Dstore { Socket client = server.accept(); BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); String[] message = in.readLine().split(" "); - handleMessage(message, client); + handleMessage(message, client, in); in.close(); } catch(Exception e) { + //Log error e.printStackTrace(); } } @@ -64,9 +66,9 @@ public class Dstore { } } - void handleMessage(String[] message, Socket client) { + void handleMessage(String[] message, Socket client, BufferedReader clientIn) throws Exception { if(message[0].equals("STORE")) { - store(client, message[1], Integer.parseInt(message[2])); + store(client, message[1], Integer.parseInt(message[2]), clientIn); } else if(message[0].equals("LOAD_DATA")) { load(client, message[1]); @@ -80,33 +82,90 @@ public class Dstore { else if(message[0].equals("REBALANCE")) { rebalance(client, message); } + else { + //Log error and continue (throw exception?) + } } - void store(Socket client, String filename, int filesize) { + void store(Socket client, String filename, int filesize, BufferedReader in) throws Exception { //Send ACK message to client + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("ACK"); + out.flush(); + out.close(); - //Receive file content from client + FileOutputStream writer = new FileOutputStream(fileFolder + "/" + filename, false); - //Store the file data in fileFolder + //Receive + write file content from client + int byteCount = filesize; + while(byteCount > 0) { + byte[] nextLine = in.readLine().getBytes(); + writer.write(nextLine); + writer.flush(); + byteCount -= nextLine.length; + } + writer.close(); //Send STORE_ACK message to the Controller + PrintWriter controllerOut = new PrintWriter(new Socket(InetAddress.getLocalHost(), cport).getOutputStream()); + controllerOut.print("STORE_ACK " + filename); + controllerOut.flush(); + controllerOut.close(); } - void load(Socket client, String filename) { + void load(Socket client, String filename) throws Exception { //Send the content of the file in fileFolder to the client + PrintWriter out = new PrintWriter(client.getOutputStream()); + FileInputStream reader; + try { + reader = new FileInputStream(fileFolder + "/" + filename); + } + catch(FileNotFoundException e) { + out.print("ERROR DOES_NOT_EXIST"); + out.flush(); + out.close(); + return; + } + + byte[] buf = new byte[8]; + while(reader.read(buf) != -1) { + out.print(new String(buf)); + out.flush(); + } + + reader.close(); + out.close(); } - void remove(Socket client, String filename) { + void remove(Socket client, String filename) throws Exception { //Remove the file from fileFolder + Path path = new File(fileFolder + "/" + filename).toPath(); + PrintWriter out = new PrintWriter(client.getOutputStream()); + + if(Files.deleteIfExists(path)) { + //Send REMOVE_ACK message to client (the controller) + out.print("REMOVE_ACK"); + } + else { + //Send DOES NOT EXIST error + out.print("ERROR DOES_NOT_EXIST " + filename); + } - //Send REMOVE_ACK message to client (the controller) + out.flush(); + out.close(); } - void list(Socket client) { + void list(Socket client) throws Exception { //Send a list of all files in fileFolder to client (the controller) + PrintWriter out = new PrintWriter(client.getOutputStream()); + for(File file : new File(fileFolder).listFiles()) { + out.print(file.getName()); + out.flush(); + } + out.close(); } - void rebalance(Socket client, String[] message) { + void rebalance(Socket client, String[] message) throws Exception { //Interpret files to send and files to remove from the message Map<String,Integer[]> filesToSend; String[] filesToRemove; @@ -139,9 +198,21 @@ public class Dstore { } //Send each file to send to the Dstore at the specified port number + for(String filename : filesToSend.keySet()) { + for(Integer dstore : filesToSend.get(filename)) { + //Same store functions as used in the client object + } + } //Remove each file to remove from fileFolder + for(String filename : filesToRemove) { + new File(fileFolder + "/" + filename).delete(); + } //Send REBALANCE_COMPLETE message to client (the controller) + PrintWriter out = new PrintWriter(client.getOutputStream()); + out.print("REBALANCE COMPLETE"); + out.flush(); + out.close(); } } -- GitLab