Eric Redmond

Distributed Patterns in Ruby

Scalability today is no longer a question of which programming language you use, or (largely) which web architecture you choose. Instead, scalability is a matter of how you handle two things: data distribution and message passing. This talk covers a few ways of solving both: distributed data structures and messaging patterns.

Ancient City Ruby 2013

00:00:00.030 All right, so I'm from Portland, land of pasty skin and moisture, so I would first
00:00:07.259 love to thank you Floridians for bringing in the clouds and the rain; I feel like it was for me, and I'm very
00:00:13.320 comfortable, so thanks. So what I'm gonna talk about is distributed patterns in Ruby. I feel obliged to mention again that I
00:00:20.550 work at Basho. I also have a couple books out: Seven Databases in Seven Weeks and, more recently, a little Riak book
00:00:27.740 that's free to download from my GitHub account, and I'm going to post all of these
00:00:33.649 online so you don't need to feverishly write anything down. My GitHub account is coderoshi, and the little Riak book is there.
00:00:40.140 And you know, I do tell people that if you buy the book and don't like it, I won't offer a refund,
00:00:47.460 but what you can do is buy five more copies and burn them and you will have
00:00:52.770 taught me a valuable lesson and that'll show me
00:00:58.309 So there's this artist, Richard Arif, that I am kind of
00:01:04.680 a fan of, and he, a right-handed
00:01:09.990 artist, attempted to do a sketch with his other hand, and this is what he ended up with. You can actually tell that
00:01:18.210 he had a hard time holding the pen, as his hand isn't as strong in that way and
00:01:25.170 it's not something he's practiced, but you can tell this is not the work of an amateur. Anyway,
00:01:31.740 you can tell the character is female, the face isn't bad, and the girl on the far
00:01:37.200 side is exasperated with the classic anime sweat drop. He clearly knows what
00:01:42.479 he's doing. The skill wasn't in his hand, it's in his head: he knows what to look for, even
00:01:47.549 though his hand is not adept. And the reason I wanted to point that out is because you
00:01:52.860 can glue together toolkits, but it's not going to fix an ignorance problem.
00:01:58.110 But there's a flip side to that: just like with the artist, if you know what you're doing, you don't necessarily
00:02:03.630 have to have toolkits to stand on; you can do a pretty good job on your own.
00:02:11.069 You understand what the problem is, and then you pick the tool to solve it. You don't just pick tools and cargo-cult things, looking at other
00:02:19.049 people that have done similar things and trying to clone that, saying, okay, now my system is going to be like Google because I copied what Google did. So
00:02:27.930 what I'm going to talk about is distributed systems in particular, and this is one of my favorite definitions of a distributed system. I like this
00:02:34.079 definition because it specifically emphasizes certain words, and they're all important: if you take any of
00:02:39.180 these bolded terms away, you don't really have a distributed system anymore. And Rails users, take special note of this
00:02:45.930 definition, because if you have a bunch of autonomous nodes, servers that are all running and all
00:02:52.199 taking requests but not communicating with each other, you don't really have a distributed system, you just have an array of servers
00:02:57.930 that are serving information. Now,
00:03:06.799 distributed systems are great. I mean, we all understand, I hope, the reasoning why you would want a distributed system: it's
00:03:13.169 a great way of scaling horizontally. There's a limit to how much you can scale vertically,
00:03:18.239 having one server that gets bigger and bigger and bigger. But it's also really great for availability as well: if one of those individual servers goes
00:03:25.439 down, your entire system can still be up and running. And, as I hope you understand, the more servers you add, the
00:03:33.150 likelihood of any single one going down goes up dramatically. I mean, this is math: if you've got
00:03:39.470 a 0.1% chance of any single server going down,
00:03:44.669 and you have a thousand servers, the chance of at least one of those servers going down is about 63%.
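That arithmetic can be checked in a couple of lines (a sketch assuming independent failures, each with probability 0.001):

```ruby
# Chance that at least one of n servers fails, if each fails
# independently with probability p_fail: 1 - (1 - p_fail)^n.
p_fail = 0.001
n      = 1_000
p_any  = 1 - (1 - p_fail)**n
puts (p_any * 100).round(1)  # about 63.2
```

So even with very reliable individual machines, at scale some machine is essentially always failing.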
00:03:51.540 Something is going to go down. But there is a dark side to this, and that's
00:03:57.870 the CAP theorem, which is generally understood as a database problem, but is actually just a problem with
00:04:03.900 distributed systems in general. Who all is familiar with the CAP theorem, by the way? All right, OK, I am going to explain it rather than
00:04:10.799 skip over it. So, what the CAP theorem says is: if you have a distributed system,
00:04:18.829 you must be partition tolerant. P is not an optional thing if you have a
00:04:25.610 distributed system. But you can only have two of these three things. So if you have
00:04:31.639 a distributed system, you must be partition tolerant, and then you can either be consistent, meaning the entire
00:04:36.919 system will return the same answer to any request, or you can be available, meaning that every time you request
00:04:43.610 something from the system, you get a result. And I think it's pretty easy to explain, so let's take these three guys
00:04:49.039 in the front row and let's say that you guys are nodes in a distributed system
00:04:56.389 and then I as a client come to you (a software client, not a
00:05:02.870 paying-customer client, you know) and I say, okay, the word of the day is jump. So now you all know the word of
00:05:10.580 the day is jump. Now you get up, you go to the restroom, and while you're gone
00:05:16.310 I tell you guys, hey, guess what, I changed my mind: the word of the day is now leap. Now somebody stops you in the hall
00:05:21.560 and says, hey, what's the word of the day? You have a choice to make. You know you're partitioned from the
00:05:27.530 rest of the system, so you can either remain available and choose to answer the question, say, hey, the last thing I
00:05:33.080 knew, the answer was jump, not knowing that I changed it to leap. You are now
00:05:39.320 inconsistent: this system is an inconsistent system, because if I ask you versus you guys, I get a different answer.
00:05:45.020 Or you can just choose to not answer at all and say, you know what, I don't know. You're not available anymore, but you
00:05:52.430 haven't destroyed the integrity of the system as far as consistency is concerned, because you're not giving me a
00:05:58.280 different answer from them, you're just not giving me an answer at all. And so that's the CAP theorem in a nutshell: you pick one, you get consistent or available
00:06:06.080 when it's distributed, but you can't possibly have both. It's a physics problem. And just as a
00:06:12.620 side note, I am telling you with some semblance of authority that anybody
00:06:19.550 who claims that they have beaten the CAP theorem, or that it doesn't matter, or that they
00:06:24.840 can deliver both consistency and availability: I'm not going to say that they're intentionally lying to you;
00:06:30.630 probably they just don't actually understand the implications of what they're saying. (I don't know how in
00:06:40.860 the world these ended up up here.) All right, so we're gonna talk distributed patterns.
00:06:46.860 These are the distributed patterns that we're gonna write: a distributed hash table, message patterns, vector clocks,
00:06:51.960 Merkle trees, and MapReduce. And I picked these for a couple reasons: one, because they're actually really easy
00:06:57.510 to write in Ruby, and we're going to write some now; and two, these
00:07:04.290 are actually a lot of the distributed patterns that underlie many distributed
00:07:09.360 systems. Basho, the company I work for, makes the Riak database, and we use all of these patterns, but we're
00:07:17.729 not the only ones: Cassandra uses them. I mean, you guys have probably seen MapReduce before; almost all NoSQL
00:07:23.760 databases supply some type of it. Vector clocks are another common one. Message patterns,
00:07:29.630 that's not a data structure, that's just a pattern of how we communicate between servers, and everything that's
00:07:37.380 distributed is going to use some type of message pattern. Let's start with a distributed hash table. A distributed hash
00:07:47.460 table, also called a consistent hash, distributes data evenly, but the most
00:07:54.780 important part about it, unlike a classic hash table that you're all aware of (you know, a Hash in Ruby), is that there's minimal disruption when nodes
00:08:00.990 are added or removed. And I'm gonna show why, or how this happens, by
00:08:07.590 implementing my own hash. This is just a really naive hash implementation,
00:08:13.770 where I store the number of nodes and then I give a block of buckets in my
00:08:22.979 array. I'm sending requests to those nodes, and then my hash is just a SHA-1,
00:08:29.460 and it gets put in the array. These are the important
00:08:36.770 parts: I have my array of however many nodes I have. If I have four nodes and I want my
00:08:42.659 spread to be a thousand, meaning that each node can handle a thousand values,
00:08:48.019 I'm gonna have a four-thousand-element array, and then things get hashed just as you
00:08:53.790 would in a normal hash. I mean, does everybody understand the concept of a hashing algorithm? Basically,
00:09:01.860 we convert an object into a number and assign the hash to a bucket; that's it. But there's a problem with our standard
00:09:07.740 hash implementation. So here, say I start with three nodes. Now, the
00:09:14.339 number you hash to is always going to be much larger than the range of your array if you're using anything like SHA or
00:09:19.860 something like that, so what you do is you just do a modulo on it. So in this case, my object might convert to
00:09:27.480 the number four, but I do a modulo 3 because I only have three nodes, which puts it in bucket position one (it's zero-
00:09:32.490 based: zero, one, two), so it gets put in B. This is where my value will live: on node B. But if I add a
00:09:40.529 node, now I have D. My object still hashes to the number four, but modulo four, oh, now it's zero;
00:09:47.689 now it gets put here. This is a problem when you have a distributed system, because if I have a whole bunch of
00:09:54.779 data sitting on one server and I want to add a new server to my collection, I now have to take all of that data and
00:10:02.160 move it, which is ridiculously expensive. So I made a little program with my naive
00:10:09.660 hash. I said okay, each node can hold a hundred thousand elements, and
00:10:17.389 then I just said, okay, I put a
00:10:24.540 hundred thousand elements into my array of nodes that go from A through J, so I have ten nodes. Then I add
00:10:31.829 node K, and I tried again, and I wanted to count how many times, if I tried to
00:10:37.019 hash a value, it would miss the original bucket that I put it in. And my
00:10:42.600 miss rate was ninety percent, meaning that ninety percent of the data in all
00:10:47.910 of my nodes is now stale and worthless.
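A toy version of that experiment (my own sketch, not the talk's exact code: bucket = SHA-1 modulo the node count, ten nodes growing to eleven):

```ruby
require 'digest/sha1'

# Naive hashing: bucket = SHA-1(key) mod node_count.
def bucket(key, node_count)
  Digest::SHA1.hexdigest(key).to_i(16) % node_count
end

# Hash 100,000 keys across 10 nodes, add an 11th node, and count how
# many keys now map to a different node than the one they started on.
keys   = (1..100_000).map { |i| "key#{i}" }
misses = keys.count { |k| bucket(k, 10) != bucket(k, 11) }
puts "miss rate: #{100.0 * misses / keys.size}%"  # roughly 90%
```

The modulo changing out from under every key is exactly why almost everything moves: a key keeps its bucket only when its hash mod 10 and mod 11 happen to agree, which is rare.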
00:10:55.520 So what consistent hashing does is, rather than having very discrete buckets where things live, it instead will map
00:11:02.550 nodes (so we've got the red nodes, A, B, and C) to a
00:11:10.350 position on a circle. And what's nice about this is, as I add and remove nodes,
00:11:16.080 say I got rid of C and I added D, with the
00:11:23.550 values 1, 2, 3, and 4 here, the only things that are stale are in the range between
00:11:29.940 C and D. So 4 is stale, and that's it. So, our really simple consistent hashing
00:11:39.050 implementation: we initialize a ring. I mean, I'm kind of cheating here, because
00:11:44.550 I'm keeping track of the ring in a hash, but, you know, whatever. That's actually not important, because what
00:11:49.620 we're doing here, we're not trying to make an efficient hash that sits on a single server; we're trying to
00:11:57.020 minimize the amount of thrashing that goes on when we add and remove nodes
00:12:04.170 from our network. So what we do is we'll add a bunch of these nodes. The hash
00:12:09.930 implementation is exactly the same, it's still a SHA-1 algorithm, but the most
00:12:15.510 important thing to note here (I guess I have a better slide that shows it) is our
00:12:21.450 nodes: we keep a sorted list of nodes. When I want to know, hey, I have this key,
00:12:27.290 which before, in our circle picture, would have been 1, 2, 3, or 4, I have this key, return me the name of the node
00:12:35.730 that it should live in, what it'll do is iterate through the nodes, and each of these nodes is responsible
00:12:41.190 for a range of values, and it'll just say, hey, node A, are you responsible for this? Are you responsible for this? Are you
00:12:46.980 responsible for this?
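A minimal sketch of that ring (my own names, not the talk's exact code): nodes hash to points on the circle, and a key belongs to the first node at or past the key's own hash, wrapping around at the end.

```ruby
require 'digest/sha1'

class Ring
  def initialize(nodes)
    # Each node name is hashed to a fixed position on the ring.
    @ring = nodes.map { |n| [hash_of(n), n] }.sort
  end

  # Walk the sorted node positions and return the first node at or
  # past the key's hash, wrapping around to the start of the ring.
  def node_for(key)
    h = hash_of(key)
    found = @ring.find { |pos, _| pos >= h }
    (found || @ring.first).last
  end

  private

  def hash_of(str)
    Digest::SHA1.hexdigest(str.to_s).to_i(16)
  end
end

ring = Ring.new(%w[A B C])
puts ring.node_for("foo")  # one of A, B, or C; stable across calls
```

The property that matters: when a node is added, the only keys that move are the ones the new node now owns; everything else stays put.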
00:12:52.320 So in this example, I created this consistent hash. I should also mention,
00:12:59.610 you know, don't worry if every line of code doesn't necessarily make sense, because I have all of this code
00:13:04.860 available; you can download it and play with it at your leisure. I just want to
00:13:13.950 go over what it does, so that when you're looking through it on your own time, this
00:13:19.110 will make a lot more sense. But the important thing is: this is actually the exact same test I ran on my
00:13:24.600 naive hash. The difference is, now my cache miss rate isn't ninety percent, it's seven percent when I added an extra node.
00:13:34.460 So we're clearly on the right track. But what we have with this sort of
00:13:39.510 implementation is a smooth, continuous hash, where nodes can live at any point on the ring, but there's a clear problem of
00:13:46.560 balance here, and you can even see it just by looking at it: you've got A, B, and D all
00:13:53.250 kind of clustered up here at the top. What you really want to do is balance the ring, so that if you have
00:13:59.970 two nodes you have A and B, if you have three nodes you have A, B, and C, if you have four nodes you have A, B, C, and D; you
00:14:05.400 want it to balance around the ring. But there's no real enforcement on how many
00:14:10.830 keys we'd have to rehash when C became D; we only have the average case, and obviously
00:14:16.920 B owns far more key space than A or D. So, an alternative form of consistent
00:14:24.150 hashing, and this is the one that we use with Riak, this is the style that Cassandra uses, almost all real-world
00:14:31.050 implementations use this form. The other reason I wanted to show you the first form is because if you ever look up anything on
00:14:36.120 Wikipedia about consistent hashing, it goes on and on about it, and that's because it was sort of the first take.
00:14:41.790 This is the real production consistent hashing that people actually tend to use: you have a fixed number of key-
00:14:47.850 space partitions, where a partition is a range of keys, and then we just assign each partition to a node in a
00:14:53.400 round-robin fashion. And the benefit here is balance: you're getting relatively even distribution of keys, and more
00:14:59.490 importantly, adding and removing nodes can keep misses to a reliable amount.
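A sketch of that partitioned style (assumed names, toy numbers): a fixed count of equal hash-range slices, handed out round-robin.

```ruby
require 'digest/sha1'

HASH_MAX = 2**160  # SHA-1 output space

class PartitionedRing
  attr_reader :owners

  def initialize(nodes, partitions = 32)
    @partitions = partitions
    # Partition i goes to nodes[i % nodes.size]: round robin.
    @owners = (0...partitions).map { |i| nodes[i % nodes.size] }
  end

  def node_for(key)
    h = Digest::SHA1.hexdigest(key).to_i(16)
    @owners[h * @partitions / HASH_MAX]  # which equal-width slice h falls in
  end
end

ring = PartitionedRing.new(%w[A B C D])
# With 32 partitions and 4 nodes, each node owns exactly 8 partitions,
# so the key space is evenly balanced by construction.
```

Compare this to the free-floating ring above: balance no longer depends on where node hashes happen to land.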
00:15:04.740 So originally here I had these nodes, I added a third, and it's hard to see the colors, but
00:15:10.350 they do alternate from light yellow to light green to light blue (I didn't color it, but hopefully
00:15:19.250 the red is a little more obvious). But now I add a node and I'm still relatively balanced around the ring, and the whole thing is
00:15:25.190 about keeping these values balanced. Now you might say, you know, well,
00:15:31.070 what's the catch then? Why doesn't everybody use this as their consistent hashing algorithm all the time, since
00:15:37.400 it's so superior? Well, there's a downside, and that's that you're limited in how much you can grow: however many partitions you
00:15:42.770 have, in this case thirty-two partitions, you can't add more than 32 nodes; you're just sort of stuck at that point.
00:15:47.930 You can't have a 33rd node with this without completely repartitioning the entire
00:15:53.870 ring and then rebuilding, and then you're sort of back to where you were. Now, that said, the good news is, as a practical
00:15:59.630 matter, that doesn't necessarily happen, because you can have as many partitions as you want and still remain balanced. So you could have a thousand
00:16:05.840 partitions for ten nodes, and then you can add up to a thousand nodes, which you probably won't get to anyway, and once
00:16:12.560 you're at that size, then, all right, you know, bite the bullet and just rehash it. I'm not going to dig through the code of
00:16:17.840 that one because it's a little hairier; I would say look
00:16:23.300 through it on your own. Now, the misses are a little worse on this than the other one, but that's just the random
00:16:29.630 data set I picked. The thing to note is that this is pretty close to the number you'd expect: since I have
00:16:37.220 11 nodes, this is about 1/11, pretty similar, which is what you
00:16:44.030 would expect on a rebalance. So there is one more thing that we should keep in
00:16:49.370 mind with this special kind of consistent hash, and that's to build a preference list. In short, the
00:16:55.460 preference list is the N successive partitions that you wish to replicate an object to. So if you have the key foo, which we
00:17:03.260 have down here, which maps to node B (or node A in this drawing), the preference
00:17:10.610 list is the next N minus 1 nodes. So in this case, if we set N to 3 and
00:17:15.710 ask for the preference list on foo, we would get B, C, and D. (I drew this weirdly, I don't
00:17:20.900 know, so it's A, B, and D in the picture, but B, C, and D in the example.) And this is really important: this
00:17:27.829 is how we're going to get high availability, because it's one thing to consistently hash a value across the
00:17:34.700 ring, but if one of those nodes goes down, you have lost
00:17:40.760 that segment of your data. But if you've replicated it, you now have much higher
00:17:47.809 availability: in this case, if A goes down, or B goes down, or C goes down, you still have two copies of your data. So this
00:17:54.620 preference list is something we're going to use later on, but it's definitely a
00:18:00.010 very important piece of this.
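Sketched on top of a round-robin partition assignment (toy numbers and my own names, not the talk's code), the preference list is just the key's own partition plus the next N minus 1 partitions around the ring:

```ruby
require 'digest/sha1'

HASH_MAX = 2**160
# 8 partitions assigned round-robin to 4 nodes (a toy setup).
OWNERS = (0...8).map { |i| %w[A B C D][i % 4] }

def partition_index(key)
  Digest::SHA1.hexdigest(key).to_i(16) * OWNERS.size / HASH_MAX
end

# The preference list: the key's own partition plus the next n - 1
# partitions around the ring; their owners hold the n replicas.
def preference_list(key, n = 3)
  start = partition_index(key)
  (0...n).map { |i| OWNERS[(start + i) % OWNERS.size] }
end

p preference_list("foo")  # three consecutive owners, all distinct nodes
```

Because the round-robin assignment alternates owners, N consecutive partitions land on N distinct nodes (as long as N is at most the node count), which is what gives you N independent copies.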
00:18:05.330 Now, this is just a simple object: it contains a value, I convert it to JSON, and I'm able to serialize and deserialize it. The reason
00:18:13.400 for the serialization is, when we add our messaging, our network piece, that's how
00:18:21.190 these individual servers are going to be able to share data: they're just going to serialize it, convert it to a string, send
00:18:27.650 it to the next one, and then deserialize it. Now, this is our node class, and what
00:18:35.990 it does is it manages the hash ring as well as a hash of data. Remember what I
00:18:42.320 said before about our hash ring implementation: it didn't matter that we had a Hash in it, because efficiency
00:18:47.360 wasn't really that important. All it's for is looking up where a key goes; the
00:18:52.370 entire reason for this consistent hash is only to say what node a value is supposed to live on. It doesn't store
00:18:58.730 data, it just tells me where the data is supposed to live. This node
00:19:05.720 implementation, though, does actually store the data; this is what's going to live on any particular server. So does everybody get
00:19:14.390 that? OK. And so I'm just creating a standard put and get interface, where I
00:19:22.010 have my node, I have my consistent hash ring, I contain my data in a hash, I gave
00:19:29.150 it a name just so we can tell nodes apart, and then all I do is use my ring and look up the node by key
00:19:35.900 and say, hey, is this you? Am I in the right place?
00:19:41.169 If so, store it. Same thing with get: am I in the right place? Okay, get it.
00:19:49.079 And here's a sort of more truncated view; these are the more important aspects, but it's the same code. It's pretty simple:
00:19:56.889 you get a value, put a value, and it only stores that value locally if it's the correct node. Simple as that.
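Here's a self-contained sketch of that node (the ring here is a simple SHA-1-mod stand-in rather than the full consistent hash, and all names are my own):

```ruby
require 'digest/sha1'

# Stand-in ring for this sketch: SHA-1 modulo the node count fills in
# for the consistent hash built earlier.
class Ring
  def initialize(names)
    @names = names.sort
  end

  def node_for(key)
    @names[Digest::SHA1.hexdigest(key).to_i(16) % @names.size]
  end
end

# The node: it holds the ring plus a local data hash, and only stores
# a value when the ring says the key actually belongs to it.
class Node
  attr_reader :name

  def initialize(name, ring)
    @name, @ring, @data = name, ring, {}
  end

  def put(key, value)
    @data[key] = value if @ring.node_for(key) == @name
  end

  def get(key)
    @data[key]  # nil when the value isn't stored on this node
  end
end

ring  = Ring.new(%w[A B C])
nodes = %w[A B C].map { |n| Node.new(n, ring) }
nodes.each { |n| n.put("foo", "bar") }  # only the owner keeps it
owner = ring.node_for("foo")
```

Asking the wrong node returns nil, exactly the behavior walked through next.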
00:20:05.349 And here we run it: we've created three nodes (see, I gave them names, A, B, and C), and I also sent in the ring, so each of them is
00:20:11.769 aware that the ring is A, B, and C, and that's how they know about each other. I
00:20:17.709 put a value, foo, in node A, and it returns nil. Why? Because it was the
00:20:23.679 wrong node; it doesn't belong there. I put it in B, and it returns bar: that was the one
00:20:29.709 I put it in, and when I got it, it got me bar, which is what I expected. And then I tried it with C, and exactly what
00:20:35.679 we expect: C was the wrong node, so it doesn't actually store it. So then, there
00:20:44.349 are limits to how useful that is, because I just ran these all on one server. What
00:20:50.619 we've done is we've created our consistent hash, which can route
00:20:56.549 messages to all nodes, but before we can actually use it, we need to ensure that every node is on the same page, meaning
00:21:02.439 that every node needs to know the state of the cluster, to ensure routing is done as expected. So we're gonna play with a
00:21:10.419 few messaging patterns. Request-reply: it's a pattern that everyone is
00:21:16.989 intimately familiar with, anybody who's done any sort of networking whatsoever, if you've ever used the web or whatnot. Then publish-
00:21:22.839 subscribe, and pipeline is another one. We'll go into detail on what all of
00:21:27.879 these actually involve. What we're gonna use request-reply for is to query nodes. Each of
00:21:34.389 those nodes is ultimately going to become a server that runs on its own machine, and the idea is that you can
00:21:43.359 query one node, and if it's the correct node, it will return you the value. If
00:21:48.999 it's the wrong node, it's not going to just say, I don't have that value; what it's gonna do is forward that request to the
00:21:55.020 correct node, because it has a consistent hash, it knows where the real value lives, so it's just going to make another hop
00:22:01.890 on your behalf to get the real value and then return it to you. So we're going to use request-reply for both of those
00:22:08.010 cases. I'm using ZeroMQ as an implementation; it's a little higher level than sockets,
00:22:14.190 and much lower level than middleware. I love it. Who's familiar with ZeroMQ here? If
00:22:21.960 you're not familiar, I highly recommend taking a look; it is one of my favorite pieces of work. One of my
00:22:29.280 favorite things is that it's entirely transport agnostic: when you say I want
00:22:34.350 to connect to something, you tell it in what way you want to
00:22:40.170 connect. You can do in-memory, inter-process communication, TCP, multicast, all these different kinds
00:22:46.020 of communication, but your interface is exactly the same. All you have to worry about is the pattern: am I doing publish/subscribe, am I doing request-
00:22:52.110 reply, pipeline, or whatnot. ZeroMQ handles buffering and framing for you,
00:22:59.580 unlike raw sockets, where you've got to deal with that yourself. It's message oriented, not byte streams
00:23:06.330 or anything like that (although you can do that if you want). But for our purposes, this is all an implementation detail
00:23:11.370 anyway; I'm just using it to simplify the code. So, request-reply: I don't want to spend
00:23:17.160 too much time on this; I think we all understand what it is. It's bi-directional: you send, receive, send, receive; that's exactly what
00:23:22.380 a request-reply does. And in Zero-
00:23:29.220 MQ particularly, a request does round-robin, and the reply goes to the last peer. Messages
00:23:36.000 just come in, messages just go out; as Bill O'Reilly says, you can't explain that. So here we create two
00:23:44.430 threads, and we start them both up: a server followed by a client. This is just a
00:23:49.560 simple ZeroMQ script; you can forget what we did before, I'm just giving an example
00:23:54.990 of what ZeroMQ would look like. We create a
00:24:00.600 socket, a ZeroMQ reply socket. So this "thread do" stuff is just a little helper I
00:24:07.500 made (if you look, you require threads and include Thread if you download the code); it literally
00:24:13.800 just takes a block and runs it in a thread, and afterwards you call join_threads. So I'm creating a
00:24:21.060 little server here built to do replies, and all it does is a while
00:24:26.160 loop that just waits to receive anything that comes through
00:24:31.880 TCP on port 2200. If the message
00:24:37.290 contains put in the payload, then it just replies, hey, I called put. Then it
00:24:43.440 creates another thread, which is a client, and it connects to this exact same TCP port, as you would with a client,
00:24:50.430 and it'll send put foobar as a request, get the
00:24:57.210 response back, and output it, and then it'll do it again. And this is exactly
00:25:02.700 what it looks like: we've got our server here, we've got our client here. It sends
00:25:07.890 the request to the server, the server sends back, hey, I called put, and here's what it looks
00:25:14.910 like. Yeah, that's it.
00:25:22.790 It's not magic. But the important thing is, the fact that I was running them locally is almost irrelevant:
00:25:29.240 I could easily put these on two completely different machines.
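The same request-reply shape can be sketched with plain Ruby TCP sockets (ZeroMQ would handle framing and buffering for us; in this stand-in, one line is one message and the OS picks a free port):

```ruby
require 'socket'

# Request-reply over plain TCP: a stand-in for the ZeroMQ demo.
port_q = Queue.new

server = Thread.new do
  listener = TCPServer.new("127.0.0.1", 0)  # bind to any free port
  port_q << listener.addr[1]                # tell the client the port
  conn = listener.accept
  msg = conn.gets.chomp                     # receive the request
  conn.puts "I called #{msg}"               # send the reply
  conn.close
  listener.close
end

sock = TCPSocket.new("127.0.0.1", port_q.pop)
sock.puts "put"                             # the client's request
reply = sock.gets.chomp                     # the server's reply
sock.close
server.join
puts reply  # prints "I called put"
```

Nothing in the client cares that the server happens to be on the same machine; swap the loopback address for a remote host and the code is unchanged.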
00:25:41.300 Now, this is a little services module that I made. The
00:25:46.430 important thing here is, and you don't need to really pay attention to this, and I highly recommend you don't do things like
00:25:53.840 this: if you're kind of a big Ruby nerd, you might immediately
00:26:02.150 see the horror of this, because what I'm doing when I create a service is, I'm getting a request, and then whatever
00:26:08.600 the message is, I assume it's a function of that name, and then I'm just calling it and sending the payload to it. So yes,
00:26:15.200 you could send a message that says delete, and it would delete, and whatever. But this is a really simple, sort of
00:26:21.740 straightforward way of doing this, because now for my put messages I only have to implement a function called put,
00:26:28.610 and for get I just have to implement get, and for mapreduce I implement mapreduce. So yes, evil
00:26:34.460 aside, this is very evil, don't do this stuff; it normally should be a giant case statement, but just for sanity's
00:26:39.470 sake, I'm simplifying it. And that's what this does: with
00:26:44.630 method_missing, if a message doesn't have an implementation, it will just return, say, hey, I have a bad message, I don't know what that was for.
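The trick looks roughly like this (a hypothetical sketch; I've used a respond_to? guard where the talk's code falls through to method_missing, and as the talk warns, don't do this for real, since any public method becomes remotely callable):

```ruby
# Dynamic dispatch on the message name: whatever the message is,
# call the method of that name with the payload.
class Service
  def put(payload)
    "put: #{payload}"
  end

  def handle(message, payload)
    if respond_to?(message)
      send(message, payload)        # e.g. "put" dispatches to #put
    else
      "bad message: #{message}"     # unknown messages get an error reply
    end
  end
end

svc = Service.new
puts svc.handle("put", "foo=bar")   # prints "put: foo=bar"
puts svc.handle("evil", "x")        # prints "bad message: evil"
```

Adding a new message type is then just adding a method, which is the convenience being traded against safety here.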
00:26:55.520 So that was my services module. So we go back to our node that we wrote, and we had a couple things: we include the
00:27:00.770 threads thing, obviously; we have a configuration file, which is just a simple file that will contain the name
00:27:07.970 of each of these nodes, because I don't feel like typing it in every single time I start up one of these nodes; it's just
00:27:14.240 something that would live locally with the nodes, and also the ports that they own. But more importantly, we're
00:27:21.650 including services. So I start up a service, and my configuration, as I said, tells it what port it needs to start
00:27:28.280 up on. And then there's this remote_call implementation. Remember what I said,
00:27:33.660 that if the current node doesn't actually contain the value it expects, it forwards the request on to the one that
00:27:40.050 is right? That's exactly what this does. This name is the name of a node, and this message is the message that it got, and
00:27:46.770 so all this does is it connects to the remote node in exactly the same way that
00:27:54.180 a client does: it just connects to the remote node, sends the message, gets the response, and then it will
00:28:00.300 presumably forward it on to you. So here's what we do with put. Here's my
00:28:05.910 put function: it will send on the socket what it gets from do_put, but this is
00:28:11.430 where the real work happens. All it does is, similar to what we saw before, my do_put says, hey, is this key correct
00:28:19.380 for this node? If it is, store it. But if
00:28:24.600 it's not, forward that request on to the node that is actually responsible for storing
00:28:29.700 this data.
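Putting those pieces together in-process (a sketch: the NODES hash stands in for the network, so the "remote call" here is just a plain method call on another node object where the real code opens a socket):

```ruby
require 'digest/sha1'

# Self-contained sketch of request forwarding between nodes.
class Node
  NODES = {}  # stands in for the network: name -> node object

  attr_reader :name

  def initialize(name, all_names)
    @name, @all, @data = name, all_names.sort, {}
    NODES[name] = self
  end

  def owner_of(key)
    @all[Digest::SHA1.hexdigest(key).to_i(16) % @all.size]
  end

  def put(key, value)
    owner = owner_of(key)
    return @data[key] = value if owner == @name
    NODES[owner].put(key, value)  # one hop to the responsible node
  end

  def get(key)
    owner = owner_of(key)
    owner == @name ? @data[key] : NODES[owner].get(key)
  end
end

names = %w[A B C]
names.each { |n| Node.new(n, names) }
Node::NODES["A"].put("foo", "bar")
puts Node::NODES["C"].get("foo")  # "bar", no matter which node you ask
```

Contrast with the earlier version that returned nil from the wrong node: now any node can answer any request, which is the whole point of the extra hop.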
00:28:37.760 Oh, and this is the config file that I was talking about. So this is what I do: I start a node as a server and give it a name,
00:28:44.760 whatever argument I give, that's the name of this one. It also says,
00:28:52.980 hey, my ring looks like A, B, C. And then here's what my client looks like,
00:28:58.920 where it connects, sends a thousand requests to this one node, and then
00:29:05.270 receives a thousand back, and that's how I start it up. But it's more interesting
00:29:11.550 to look at, I suppose. So we're starting up A, B, and C and inserting values, and if
00:29:23.100 you notice, what I did in the client was connect to A only; the client was only
00:29:29.130 ever connected to A, and A then forwarded on the correct values to B and C, or, if it was correct on A, it
00:29:36.900 would store it and return it itself. So when you get this message where it says B get key, what
00:29:43.170 it's doing is forwarding on to B, or it's forwarding on to C, or, if it just gets it, then it was the correct one. And it
00:29:51.780 may seem a little slow, but I mean, that's because we weren't doing any sort of fancy connection pooling or
00:30:00.180 anything like that. But what we've done so far is actually pretty amazing: with a
00:30:05.790 very little bit of code, we've created a balanced key space; clients can connect to
00:30:10.800 any node; they can distribute objects across nodes; and the request-response from any node will forward to the
00:30:16.320 correct one. So it doesn't even matter what node we connect to, which is what you want on a distributed database. Again,
00:30:24.180 we're talking about high availability here, so your client can easily say, okay, I'm gonna ask A what the value is. Oh, A
00:30:30.540 is taking too long? All right, fine, connect to B. It's this sort of classic failover scenario, so that you can get responses as quickly as
00:30:37.740 you want. All right, are there any questions so far? Yes sir?
00:30:53.010 That is an excellent question. Remember when I talked about the preference list, where it replicates? We
00:30:59.920 haven't implemented it yet, but we're going to very soon, and that's an excellent question, because that should
00:31:05.410 probably be one of the first things going through your head: we haven't actually saved ourselves anything in availability yet.
00:31:18.850 [inaudible audience question]
00:31:32.659 Yes, and we're gonna deal with that situation as well. But there's one really big problem:
00:31:40.770 if you noticed in our implementations when we started up the node we had already hard-coded that it nodes a B and
00:31:46.110 C already exist so why not use another
00:31:51.390 messaging pattern: pub/sub. In this case we're gonna go a little off script; this isn't actually how Riak does it, but I wanted
00:31:57.330 to give an instance of pub/sub. What we're gonna do is elect one leader to manage the publication of ring
00:32:04.980 changes to all of the other nodes, so that when a new node comes in, it can
00:32:10.679 then inform the rest. Normally you don't actually want to do it that way, because as soon as you do leader election you've
00:32:16.320 created a single point of failure, and you're back to where you started, with the ability to lose
00:32:23.010 availability. So in this case we're creating a function called coordinate_cluster, and all it does is, see, this is
00:32:29.039 why I absolutely love ZeroMQ: look at this, ZMQ::PUB. I just created a
00:32:34.559 publisher in exactly the same way that I would create the request/reply. And so
00:32:41.700 what we do here, and this is the important part of the code, is whatever the ring says, we send that out to all of
00:32:49.289 the subscribers. That's all we do; everything else is just priming that.
00:32:57.330 track_cluster is the opposite side of it: these are the subscribers. We create a subscriber socket
00:33:03.500 and we subscribe to any message "ring" from the port that the publisher
00:33:11.280 lives on, and we will just accept any change it gives us and we will change
00:33:17.490 the cluster. This was a function on our consistent hash that will
00:33:23.070 just update the ring with the set of nodes that we've been given. But this is what it looks like in
00:33:29.270 pictorial form. Here's the problem: straight-up pub/sub doesn't
00:33:35.580 really help us, because when we add a new node, whoever the
00:33:41.460 master coordinator is has to know about it. So we again use request/reply: we're
00:33:47.290 gonna add a little reply socket here that will just sit and listen. If a node
00:33:53.140 joins, it'll add it to the node list; if it goes down, it'll remove it from the node
00:33:58.420 list, and then send out a publication to all the
00:34:04.750 subscribers that hey, the ring has changed. And then, informing the
00:34:10.780 coordinator: so here, when join happens, when down happens, same thing. When we start up
00:34:17.050 a node, we want to inform the coordinator that our join has occurred; when we close it, we want to inform the coordinator the
00:34:22.060 node has gone down. And here's our inform_coordinator; it's just a standard request/reply setup, but just to
00:34:29.860 the coordinator node. And then this
00:34:35.020 is if it's the leader: we're just passing in a boolean that says if it's the leader, make it the coordinator; if it's not the
00:34:41.620 leader, track the cluster. And if you're not the leader, then you're responsible for informing the cluster that
00:34:47.620 you've joined on start. So let's run that. I'm not typing here, by
00:34:56.050 the way; I'm not doing this live. There's
00:35:02.440 way too much for me to try and do live. But, you know, I did actually run these; all of these are
00:35:08.260 absolutely true. What's interesting here is, as you see, it got the message that the ring changed as we added B and as
00:35:14.620 we added C, and they were all informed of the ring changing. And then finally I
00:35:20.050 shut down the nodes, and it did them in reverse: I took down B, then I took down C. So awesome.
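The coordinator pattern just demoed, sketched without ZeroMQ: plain-Ruby blocks stand in for the SUB sockets, and inform plays the role of the coordinator's reply socket (the class and method names here are illustrative, not the talk's code):

```ruby
# The leader holds the membership list; nodes report join/down over
# what would be a REQ/REP socket, and every change is republished to
# all subscribers, the way the PUB socket broadcasts "ring" messages.
class Coordinator
  def initialize
    @nodes       = []   # current ring membership
    @subscribers = []   # blocks standing in for SUB sockets
  end

  def subscribe(&blk)
    @subscribers << blk
  end

  def inform(event, node)
    if event == 'join'
      @nodes |= [node]
    else
      @nodes -= [node]
    end
    @subscribers.each { |blk| blk.call(@nodes.dup) }  # publish new ring
  end
end

coord = Coordinator.new
seen  = []
coord.subscribe { |ring| seen << ring }  # a node tracking the cluster
coord.inform('join', 'A')
coord.inform('join', 'B')
coord.inform('down', 'A')
```

Each subscriber sees the ring as ['A'], then ['A', 'B'], then ['B'], mirroring the join/join/down sequence from the demo.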
00:35:28.450 Perfect: the leader node broadcasts state changes, and all those nodes inform the leader of state. But what if a node dies? If a node
00:35:36.100 goes down, as you asked, we lose all the keys in that partition. The obvious answer: replication. Now,
00:35:43.750 if you're familiar with Cassandra or anything like that, there are the N/R/W values. N is the number of nodes you
00:35:49.870 want to replicate to. R is the number of nodes that you want to read a value from; when you do a read, you have the
00:35:55.270 option of reading from any number up to N. W is how many writes have to be
00:36:01.309 successful before it returns to the client and says, hey, I was a success. It could be one if you want to be really fast, or
00:36:07.430 you can wait for all N to write if you want to be pretty sure that the write has been successful. So these are
00:36:14.690 the N/R/W values. N is the number of nodes replicated to. W: in this case, since I set W to two, it writes to these two gray
00:36:20.449 ones, but it replicates in the background to this one; it has responded before that replication has occurred.
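The W-ack-plus-background-replication behavior he's describing can be sketched like this; the helper names and the in-memory hashes standing in for nodes are assumptions for illustration, not the talk's implementation:

```ruby
# N/W/R in miniature: write to W replicas before acking, replicate to
# the remaining N - W in the background, and read back from R replicas.
REPLICAS_N = 3  # nodes each key lives on
WRITE_W    = 2  # acks needed before the write "succeeds"
READ_R     = 2  # replicas consulted on a read

def quorum_write(replicas, key, value)
  replicas.take(WRITE_W).each { |r| r[key] = value }  # synchronous writes
  replicas.drop(WRITE_W).each { |r| r[key] = value }  # background replication
  true  # acked once W writes landed
end

def quorum_read(replicas, key)
  replicas.take(READ_R).map { |r| r[key] }  # first R responders
end

replicas = Array.new(REPLICAS_N) { {} }
quorum_write(replicas, 'foo', 'bar')
quorum_read(replicas, 'foo')  # two answers you can compare for freshness
```

Tuning W down buys write speed at the cost of certainty; tuning R up buys read confidence at the cost of latency, which is exactly the trade-off described above.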
00:36:25.519 Same thing with R equals two: I read from any of these three, so I read from two, and
00:36:32.390 the first two to respond are the ones you get the result from. So I'm gonna skip over
00:36:41.959 eventual consistency, because wow, we are really running low on time. If anybody really
00:36:47.209 wants to talk about eventual consistency after the fact, I would be happy to
00:36:53.019 discuss it. So here's what our replication does: we are adding this
00:36:59.869 replicate into our put. See, do_put is the function; we're calling replicate. I've
00:37:05.269 created this special command, put zero, because if you noticed, all of our commands had the number of nodes, so it's
00:37:10.759 like put one, put two, put three. Put zero is just saying, hey, you're just a replicated value. This is really just a
00:37:17.359 cheap trick; you could just as easily create a separate code path only for replication, but the win is, well, we
00:37:23.420 already wrote the ability for a client to put values, so we may as well just make a special case for replicated
00:37:29.509 values. See where it says up here in the comments, zero means insert locally? That's
00:37:34.640 because under normal circumstances, if I were to say put one, then this would
00:37:39.650 take the normal code path and say, hey, I'm not the right node to store this value, and it would just refuse to do it. So we
00:37:45.349 had to make this special case. So now we've got our replication, and I mean,
00:37:50.630 this is it; that's the replication code. It does the remote call.
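The put-zero trick described above might look roughly like this; do_put here is reconstructed from the description, with an in-process call standing in for the ZeroMQ remote call to a peer:

```ruby
# Every put carries a replica count, and zero is a sentinel meaning
# "incoming replica write: store it locally, don't forward it again,
# and don't reject it for landing on the wrong node."
def do_put(key, value, n, store, peers)
  store[key] = value                  # n == 0: just a replicated value
  return if n.zero?
  peers.take(n - 1).each do |peer|    # coordinator forwards "put 0"
    do_put(key, value, 0, peer, [])   # stands in for the remote call
  end
end

a, b, c = {}, {}, {}
do_put('foo', 'bar', 3, a, [b, c])   # a put with n = 3: a plus two replicas
```

Passing zero for the count is what keeps replicas from re-replicating forever, since a replica write takes the early return instead of forwarding again.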
00:37:56.750 all right
00:38:01.799 So everybody might want to back up a little; I think this is... I'm turning into Kerry.
00:38:06.940 So anyway, the replication occurs to these nodes, and let's
00:38:14.980 see this in action. So I fire up the nodes, and the ring
00:38:20.829 state changes as we expect. Now our client is putting values in, but what's interesting is it's replicating all
00:38:28.390 these values. So at first it was putting in one value, then it was writing to two, and
00:38:33.789 then getting values from one node, getting values from two. Now, if you notice, it took double the time to get
00:38:41.799 values from two nodes versus getting the values from one node; that is almost exactly what you would expect to be the case. However, since you've gotten a
00:38:48.460 value from two nodes, you can be a little more confident that the value you're getting is the most recent one.
00:39:00.960 But there's a problem with what we've done so far, and it's this. So I have my
00:39:10.510 values: baz, bar, quux. Let's see if I have my
00:39:17.500 client that caused this... I don't. Anyway, the point is, since
00:39:25.359 I had gotten from two, I actually received three values back, and I expected to receive one. And the problem
00:39:31.569 is this: the more copies you have, the more problems you have. Different versions on a node can conflict. Say I
00:39:36.640 wrote one value for a key foo to node A, and I write another value to node B, and
00:39:43.809 I say, okay, read from both of those; it returns both of those values, but
00:39:49.299 they're not the same. So what our code does is it creates something called a sibling: it will just return them both
00:39:55.539 and then say, okay, it's up to you to figure it out. But there's something better we can do, which is called vector clocks.
00:40:00.880 You can't rely on system clocks to be synchronized; you just plain can't. But we
00:40:06.099 don't need to synchronize system clocks; all we need is logical ordering, to know which are the most recent values and
00:40:12.160 which are the older values. So this is an example of a vector clock. Say you have Alice and Bob, and they say,
00:40:17.830 okay, Alice says I want pizza, and Bob says I want a taco. And then
00:40:24.750 Alice goes to read the write and says, oh, Bob says taco, I said pizza; where did my
00:40:31.630 pizza go? I'm going to update this value and increment myself and Bob, and then
00:40:38.920 write taco-pizza. So now when Bob does the get, he's going to know that Alice took his suggestion into account
00:40:44.860 when coming up with a final value. This is just another view of a vector clock.
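A minimal vector clock supporting the increment, descends-from, and conflict checks walked through next; the class and actor names are illustrative, not the talk's implementation:

```ruby
# Each actor keeps its own counter; comparing the counters gives a
# logical ordering without ever consulting a system clock.
class VClock
  attr_reader :counts

  def initialize(counts = {})
    @counts = counts
  end

  def increment(actor)
    VClock.new(@counts.merge(actor => (@counts[actor] || 0) + 1))
  end

  # self descends from other if every counter is at least other's
  def descends?(other)
    other.counts.all? { |actor, n| (@counts[actor] || 0) >= n }
  end

  # neither descends from the other: the writes were concurrent
  def conflict?(other)
    !descends?(other) && !other.descends?(self)
  end
end

v1 = VClock.new.increment('alice')  # alice writes
v2 = v1.increment('bob')            # bob updates, having seen alice's write
a  = v2.increment('alice')          # two clients now diverge from v2...
b  = v2.increment('bob')            # ...and a.conflict?(b) is true
```

v2 descends from v1 (bob saw alice's write), but a and b each incremented independently, so comparing them reports a conflict: exactly the sibling situation described above.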
00:40:50.260 I'm not going to go into the implementation of a vector clock right now, but the important thing to know is
00:40:58.020 this: you can increment, just like we did before, and we can compare them. Now we
00:41:06.070 increment Adam; did v2 descend from v1? Yes, it did. Then we
00:41:14.050 increment Barb; is there a conflict? Yes, there is, because what I did here was I
00:41:19.360 had Adam increment it, then I cloned it, so that, you know, I added Barb and then I cloned
00:41:24.520 it, and then I incremented them both independently of each other. And now if you compare them, it'll tell
00:41:30.040 you these are a conflict: the values that you attempted to give me are in conflict. And this is what a
00:41:42.970 conflict looks like in our system. So here it just returned both of our values,
00:41:51.250 along with the vclock that we implemented. It gives us the value hello 1 or hello 2, but we have two
00:41:57.790 different vclocks. So the next thing our client does is resolve it and say, you know what, I wanted hello
00:42:03.520 2, and it sends the vclock with both C and B together, so that the system knows
00:42:10.150 that it has resolved the conflict. And this is what that client looked
00:42:16.600 like; that's why it slept. I'm sorry, I'm running through really fast right now; I don't know how much
00:42:22.230 time I have right now. All right, I do have a couple minutes. We do have some problems
00:42:30.210 with vector clocks: they do grow forever. But there are multiple ways of resolving conflicts. You can choose a value at
00:42:37.080 random, and this is actually what some implementations do now; they say "we use system clocks," but that's effectively
00:42:43.260 random, because as we said, you can't ever guarantee that multiple servers will have completely synchronized
00:42:48.900 clocks. Or siblings, which is what Riak does, and what we did in our implementation here. Or some predefined
00:42:55.020 resolution, and that's CRDTs: conflict-free replicated data types, also called
00:43:01.350 commutative replicated data types. So the problem is this: if we have two
00:43:08.670 services connecting to our cluster here. Say A does a read and says, okay,
00:43:14.310 give me the value of some counter; now I want to write the counter incremented; it's two. And B says, at the exact same time: read
00:43:22.080 it, okay, now increment the value, now write it; it's two. So A created a
00:43:29.010 conflict. When B goes to read it again, which one is it? Is the
00:43:35.010 incremented value two, or is it four? It's actually neither; it's three, because
00:43:40.650 we had the number one, we incremented it once over here, and we incremented it once over there, so we've got three. But we can't ever pick that out
00:43:47.520 of the data that's in there. So the solution instead is: only store
00:43:52.560 what the changes are, and try to let the system resolve those changes on the back
00:43:57.690 end if it can. So in this case we just say plus one to the one that's already there, and we get a value out. There could
00:44:04.440 have been a conflict; there's a conflict on both of these. But what happens is, it took this plus one, it created a two
00:44:10.619 inside here; this had the plus one; there's a conflict, so what it'll do is say, I don't know what to do with
00:44:18.060 this plus one, so I'm just going to return the two back to you and say, oh, there's also another pending plus one.
00:44:24.359 And that's easier to resolve, because you can manually just take that two and that plus one and add them
00:44:30.930 together and get three. But you don't even need to manually resolve it; that's what CRDTs are for.
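A sketch of a grow-only CRDT counter showing why the answer comes out to three: each replica records only its own increments, and merging takes the per-replica maximum (the class and method names are assumptions, not the talk's code):

```ruby
# A G-counter: concurrent +1s on different replicas converge to the
# right total instead of conflicting as "2 vs 4".
class GCounter
  attr_reader :counts

  def initialize(counts = {})
    @counts = counts
  end

  def increment(node, by = 1)
    GCounter.new(@counts.merge(node => (@counts[node] || 0) + by))
  end

  # commutative, associative, idempotent: replicas can merge in any order
  def merge(other)
    GCounter.new(@counts.merge(other.counts) { |_k, a, b| [a, b].max })
  end

  def value
    @counts.values.sum
  end
end

base  = GCounter.new.increment('A')  # the counter sits at 1
left  = base.increment('A')          # one client adds 1
right = base.increment('B')          # another adds 1 concurrently
left.merge(right).value              # => 3, resolved with no vector clock
```

Because the merge is a per-node max, replaying or reordering merges can't double-count, which is why the client never has to ship vclock information around.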
00:44:36.000 A CRDT counter in this case resolves it for you. I'm not gonna go through all the details of the code again; you can read it, as we're running out
00:44:41.580 of time. But the interesting thing here is that when you get the counter, if there exist multiple node objects, which
00:44:49.590 means there were siblings, it'll resolve them for you and send you a valid result. And the nice thing about it
00:44:54.840 is we don't have to send in any sort of vector clock information at all; we just say, okay, I'm putting one, I'm putting two,
00:45:02.550 I'm putting one; we expect to get the value four, and that is exactly what we'll
00:45:07.980 get. I'll just skip the demo; you all, I hope, know how addition works, so the
00:45:13.110 response would have been four. There are other types: we can do this with sets, we can do this with lists.
00:45:18.680 If we have a set, we can say, okay, add a value to this set, subtract a value from
00:45:24.750 this set, and if there are any conflicts the server is able to merge
00:45:30.360 them itself. So we're going to skip
00:45:37.020 through anti-entropy as well; it's very interesting, but we are almost entirely
00:45:43.470 out of time. Merkle trees: fantastic. I highly recommend looking up this
00:45:48.720 information. What it effectively does is create a hash tree, where each level
00:45:54.660 of the hash tree contains effectively a diff of what all of the
00:46:01.950 trees are holding. It will then compare one level at a time, so that it can
00:46:07.620 actively repair these siblings that are occurring on the back end, and you don't
00:46:14.190 have to actually repair them yourself. But I will do one more complex query; this is actually the last thing. Right, we
00:46:19.410 get two minutes? All right: MapReduce. I'm gonna do MapReduce because we've got this really
00:46:24.510 awesome consistent system now, but you know, it's still just a key-value store, and how boring is that? You want to do more
00:46:30.750 complex queries, and I want to do this because I want to show what MapReduce is. Is everybody familiar with MapReduce?
00:46:36.090 Has everybody used MapReduce? No one? Okay, so really quick: if you're a
00:46:42.630 Ruby programmer, this should actually be fairly straightforward; you're probably already familiar with these functions, and you might not even necessarily realize
00:46:48.240 it. There actually is a function called map in Ruby, and there is a function on Array called
00:46:53.370 reduce; that's what MapReduce is. What map does is change an array of
00:46:59.220 objects from one thing into something else; it's mapping them, one at a time, to something else. In this case, what
00:47:05.610 we're doing is, say we have an array of hashes, and all we're doing is saying, okay, get the value out of the hash. So we've turned this array of
00:47:12.660 hashes into an array of numbers: 1, 3, and 5. Step two is reduce, where we say, okay,
00:47:19.290 we're going to prime with 0 and we're just going to sum them all together, and we get 9. So it's iterating
00:47:24.390 through all of this and aggregating a result; that's the reduce phase. The interesting thing about this is you can actually distribute it. You can
00:47:31.020 say, okay, hey, you server, map these values, reduce them; you server over here, do the
00:47:36.420 same thing, map these values, reduce them, give me the reduced result; and then re-reduce it and get an answer.
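The two phases in plain Ruby, plus the distributed re-reduce just described; the array-of-hashes shape matches the talk's example, though the field name is assumed:

```ruby
# Map and reduce on one machine first.
data   = [{ value: 1 }, { value: 3 }, { value: 5 }]
mapped = data.map { |h| h[:value] }             # map: hashes -> [1, 3, 5]
total  = mapped.reduce(0) { |sum, n| sum + n }  # reduce: primed with 0 -> 9

# Distributed shape: each partition lives on a different server, which
# maps and reduces locally; only the small partials travel back.
partitions = [[{ value: 1 }], [{ value: 3 }, { value: 5 }]]
partials   = partitions.map { |part| part.map { |h| h[:value] }.reduce(0, :+) }
answer     = partials.reduce(0, :+)             # re-reduce: same 9
```

The one-machine and distributed versions compute the same answer; the distributed one just moves the algorithm to where the data lives.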
00:47:42.900 This runs on the premise that when you have, like, petabytes of data, it's way
00:47:50.190 cheaper to send the algorithm to the data than it is to pull the data to the algorithm. And that's all you're doing:
00:47:55.500 you're just sending it out there and then getting the result back; it's up to the nodes that contain the
00:48:00.660 data to actually run it. So this is what our MapReduce is going to look like; it's really simple,
00:48:06.570 it's really straightforward. We have a map function that just evaluates a function that it's
00:48:14.580 bound to. MR is the name of our MapReduce
00:48:19.860 command that we're adding into our node; if you notice, this says module MapReduce, because again we're creating all these separate modules so we can add them
00:48:25.800 into the node, and it adds this functionality when we import it. So what it's doing is,
00:48:32.190 it goes up here, where you see it says "call maps function"; that's down here,
00:48:40.860 and that's where it will then remote-call all of the nodes and say, hey, you
00:48:46.470 map, you map, you map, and then it will just sit and wait, and then aggregate all of the results. It also has
00:48:53.400 to map its own results locally, and then it returns them, which then get reduced.
00:49:00.210 And that's it: that's the actual remote call, sent out on the socket. And
00:49:05.490 this is probably my favorite part of this whole thing: we run MapReduce on
00:49:11.100 our values. So we stuck a whole bunch of values in, then we said, okay, I'm gonna
00:49:16.710 count all the objects and I'm gonna sum all the values. That was relatively fast, distributed across three servers, with
00:49:23.370 a minimal amount of code. So this is what we've done: a distributed, replicated, self-healing,
00:49:28.860 conflict-resolving, eventually consistent key-value database with MapReduce, in about 250 lines of code,
00:49:36.780 which is effectively what Riak does, though in considerably more lines of code; but it's Erlang, so we won't hold it against them for being implemented
00:49:43.470 that way. And that was all.