RailsConf 2014

Supercharge Your Workers with Storm

Supercharge Your Workers with Storm

by Carl Lerche

In his talk at RailsConf 2014, Carl Lerche introduces Apache Storm, a distributed real-time computation system designed to enhance background job processing, specifically in scenarios where using a database for coordination leads to significant performance issues. Given the increasing demand for processing large data volumes quickly and reliably, Lerche articulates the value of adopting Storm in modern applications.

Key points covered in the presentation include:

- Overview of Storm: Lerche describes Storm as a powerful and distributed worker system, explaining its benefits of distribution and fault tolerance alongside its operational complexities, such as the need to manage a Zookeeper cluster, Nimbus processes, and worker processes.

- Use Case - Twitter Trending Topics: Lerche uses the example of processing Twitter hashtags to illustrate Storm’s capabilities. He explains how hashtags can be tracked and their trend rates calculated using an exponentially weighted moving average, which requires efficient data processing in real-time.

- Operational Overhead: The presentation highlights the operational challenges and overhead when scaling systems using traditional methods like Sidekiq or Redis. Lerche showcases how using memory caching can improve performance before introducing Storm to better manage background tasks.

- Core Concepts of Storm: He details Storm's fundamental abstractions, which include streams, tuples, spouts, and states, emphasizing how they facilitate data flow and processing across different systems.

- Building a Data Processing Topology: Lerche outlines how to implement a data processing pipeline using Storm's API and establish a topology for processing tweets, extracting hashtags, and aggregating counts efficiently.

- Handling Failures: He discusses the inevitability of failures in distributed systems and how Storm manages message processing guarantees such as at-least-once processing, explaining its approach to failure recovery and how it prevents message loss.

- Final Thoughts and Impact: Lerche concludes with a recap of Storm’s capabilities, emphasizing its power in handling stateful jobs, complex data processing flows, and its overall value for developers looking for scalable solutions to data handling problems.

Overall, Lerche's talk provides an in-depth exploration of Apache Storm's features and benefits for developers, reinforcing the importance of real-time data processing in today's applications.

00:00:16.320 Hello, my name is Carl Lerche, and I'm going to be talking about Apache Storm.
00:00:21.920 I am shooting for 40 minutes. It's going to be a long talk; I'm going to try to cram everything in. I've already got a late start. There probably won't be time for questions. I'm also going to gloss over a few things that might seem a bit hand-wavy.
00:00:34.640 If something comes up or if you have a question, just send me a tweet directly, and I will respond to it after the talk, or just come and talk to me afterward. So, I work for Tilde, and our product is Skylight.
00:00:48.480 We're building a smart profiler for your Rails application, and I've actually built the entire backend using Storm. If you want to talk about how that works and how I've ended up using Storm in the real world, I'd be happy to discuss it.
00:00:59.760 So, what is Storm? Let's start with how it describes itself on the website. It calls itself a distributed real-time computation system, and that sounds really fancy.
00:01:13.360 The first time I thought about using it, I was hesitant and thought it was too serious for me. I needed something simpler. It took me another six months to finally take the time to learn it, and as I got to know it, I really came to understand that it can be used for many, many other use cases. I'm going to try to get into the nitty-gritty sooner than later.
00:01:30.320 But for now, I'm just going to say Storm is a really powerful worker system. Some things about it: it is distributed, very very distributed, which is both good and bad. The good thing is you can spread your load across many servers. The bad thing is your operations system is going to look something like this, which can turn out to be somewhat of an operational headache.
00:02:03.360 You'll end up running a Zookeeper cluster, having to run the Storm Nimbus process somewhere, and there are going to be a number of other Storm coordination processes. Every server is going to have the worker process, and while you're at it, if you've gone this far, let's throw in a distributed database and some distributed queues. The main point is there is going to be operational overhead, so don't just do it for fun. There needs to be a good reason. If you can get your job done on a single server, go for that.
00:02:38.239 Storm is fault-tolerant, meaning it is able to recover and continue making progress in the event of a failure. Many existing systems claim to do this, but the reality is that handling faults is really hard. This does not make it seamless; the reality is it can't be seamless. However, I think the way it handles faults is about as easy as it can get.
00:03:20.960 It's really fast with very low overhead. Some completely useless benchmark numbers: it's been clocked at processing over a million messages per second on a single server. This number is completely useless, but it sounds good.
00:03:32.560 Storm is supposedly language agnostic, meaning you can use almost any language to build your data processing pipeline. There are examples on the website where they use Bash, but I don't know why—it's probably for novelty. I've personally only used it on the JVM; it's a JVM-based project written in Java and Clojure. It uses many Java libraries.
00:03:51.599 I recommend trying it on the JVM, and the good news is we have JRuby, which has done an extremely good job at integrating with Java. This means you can use Java libraries while writing Ruby. I also want to gloss over the details of getting it running on JRuby, but there is a GitHub project called RedStorm. You could probably just Google 'GitHub RedStorm.' I realized I didn't actually include any links, but I can provide them later. RedStorm does a JRuby binding for Storm and provides Ruby DSLs for everything related to Storm, allowing you to define all your data transformations in terms of Ruby DSL.
00:04:34.400 I'm going to build up a straw man to show how one might implement trending topics on Twitter. We all hopefully know what a trending topic is; it's when everybody tweets and can include hashtags like #RailsConf or anything else. As the rates of specific hashtags increase, the hashtags that occur at the highest rates get bubbled up, indicating the topics that are trending. This helps everyone see what's going on.
00:05:06.240 For example, around here, you might see #RailsConf trending. Another good reason to pick this topic is that it is very time-sensitive and ties into the real-time aspect of Storm. To implement this, I'm going to calculate the rate of how often a tweet is happening using an exponentially weighted moving average, which sounds fancy.
00:05:30.720 As we process the tweets, I want to count the number of occurrences of each hashtag every five seconds. The interval doesn't really matter, but then we're going to average them. Instead of doing a normal average, we'll do a weighted average, which will cause older occurrences to drop in weight exponentially. This is similar to how Linux calculates one-minute, five-minute, and fifteen-minute load averages. It's a nice way to smooth out the curves, and you can tune how you want the curves to look.
00:06:07.440 First, I want to build up some context. Let's look at how one might implement this with Sidekiq, Rescue, or any other simpler system that pops off of a queue with workers. In a Rails app, the process might involve pushing data into Redis, where each worker pops a message from Redis, reads state from the database, processes the message, and writes state back out to the database. The Rails app can then query the result by reading straight from the database.
00:06:38.560 This example isn't perfect, and I know it's naive, but the important part is that we have an entry method that takes in a tweet as an argument. The first step is to extract the hashtags from the body, creating an array of tags. I loop through each tag and check if there is already an existing record in the database. If so, I update the moving average for it and save it back.
00:07:10.560 The way I want to implement the moving average function is to keep track of how often we see certain hashtags. The first thing to do in the updates is to catch any updates, and I didn't pass it now, but in theory, it should update the rate for every bucket we decide we're going to smooth.
00:08:06.560 However, this poses a problem in our worker. If we do not receive any tweets with a specific hashtag, how do we handle that? We need to still update the record to lower the weight of that hashtag because we're not receiving any data. We have to implement a regularly scheduled task, possibly using cron, to run every bucket interval.
00:08:36.240 The first step in that task would be to delete all existing tags that are too old to clear the database and then load up all the tags to update the rates, making sure they are caught up. This points out a challenge: we now have to schedule a task that will read literally every record from the database at every tick.
00:09:06.480 Is it web-scale yet? Well, not yet, but we can start scaling out our workers. Let's spawn up three workers—now we're ready to handle 50,000 tweets per second! Or, maybe not quite yet, but there are more tricks we can employ.
00:09:30.080 As I mentioned, we know that the database will be a significant bottleneck because every single tweet will force us to read state, process it, and write it back. What if we could optimize by caching in memory? We can create a simple in-memory cache of hashtags such that every time we get a tweet that comes in, we check our cache before reading the database. If it’s not there, we create a new record.
00:10:03.760 Then, when the next tweet comes in, we check the cache again instead of querying the database. For instance, when we receive our next tweet, it could be something like 'What's for lunch at #RailsConf?' The worker processes that, increments our cache, and saves it back to the database.
00:10:32.160 While caching in this system is not trivial, there are multiple things we could do, such as pushing our cache externally to something like Memcached. But this comes at the expense of needing an additional process to preserve coordination. The main takeaway, even though we have many workers, is there will always be a high amount of coordination required.
00:11:02.079 Now, enter Storm. As you'd expect, all these problems can magically go away when we use Storm—though I wish it were that simple. First, let's discuss some abstractions. The core abstractions in Storm are streams and tuples. A stream is essentially a series of tubes through which data flows.
00:11:39.840 What they call streams, I think tubes might be a better term. Tuples are just like a list of values and can be anything—messages, strings, integers, any objects you want as long as you can serialize them. You're allowed to specify custom serialization, so as long as it can be serialized into JSON or any other format, it can be a tuple.
00:12:11.200 The rest of Storm consists of a series of primitives used to take these streams and tuples and transform the data from one representation to another. Let's discuss spouts and states: spouts are the sources of the streams.
00:12:39.040 This is how you get data into Storm; it's the starting point of the streams. You could read data from Redis, SQS, or even directly from the Twitter API. States, on the other hand, are how you get the results of the transformations out of Storm.
00:13:07.360 Thus far, we've established that the spout starts the stream and data flows through it to the states. Transformations are purely functional operations on the data—given one input, they will output the same values—so let's add some transformations.
00:13:45.760 For example, we have a spout that sends data, which we might filter and then aggregate. The data flows through to the state to get persisted. After this, we can perform more transformations.
00:14:18.360 The goal here is to model our data in terms of data flow to reach the endpoint. There is no limitation in terms of adding as many spouts as you wish, so long as you don’t end up with a convoluted understanding of the code.
00:14:50.160 Now, let's break down how these components connect together. The spout emits tuples to a filter. Generally, the spouts are standard, so if you are pulling data from Redis, you would use a Redis spout that already exists. Similarly, filters and transforms can often be abstracted into higher-level concepts.
00:15:29.440 For example, you might implement the filter function or transformation for Storm by extending the base function provided as part of the Storm API. The requirement here is to define a single method that takes the input tuple and produces the output stream of tuples. You get the message out of the tuple and process it.
00:15:54.960 The next step is to define the topology, which connects all the different transforms, spouts, and everything together. The important parts start at the topology object, where you define how data flows together. You typically start by defining the streams by naming them as your spout. For example, with my Queue Spout, you would specify the server details and parameters.
00:17:10.080 The spouts will output tuples as raw bytes, so the next step is to deserialize those raw bytes. Here, you would create a message deserializer, eventually chaining to the filter, which will filter based on predicates, and finally output the transformed tuples.
00:18:09.120 To get started with running this locally, you would simply initialize a new cluster object, define the topology, initialize the config, pass the topology, and submit it to the cluster—all while your local machine runs the entire process.
00:18:58.160 The next step is to persist the results. There are already existing libraries to store tuples entirely in Redis, which require no additional code. But I want to show you how to implement a state from scratch.
00:19:48.000 First, you create a class that inherits from state—this is part of the Storm API. You define the methods 'begin,' 'commit,' and 'commit.' The state updater also needs to define the method that will update the state with input tuples and allow for output tuples in case you need to emit more.
00:20:48.400 To hook this into the topology, you replace the logger with persisting the data to the state. The goal on the implementation side is to ensure you're expecting a tuple that contains the relevant message or data and that you use a basic updater.
00:21:38.640 Now, let's return to the initial Twitter hashtag example. The Storm topology might start with a tweet spout that gets tweets from a specified source. It would pass the tweets to the transform, which extracts hashtags and outputs them as tuples. An aggregating function will track counts and send that information to the state.
00:22:27.760 This is conceptual work where you define an extract function to process tweets, extract hashtags, and emit them as tuples. The aggregator then collects all hashtags and the state computes the moving average of the counts.
00:22:56.160 The aggregate function works similarly to Ruby’s 'inject' on 'Enumerable': it takes an initial state and iterates over the incoming tuples, updating the state and eventually outputting the aggregation as tuples down the stream.
00:23:36.000 Now, when implementing our state, it will inherit from 'state.' That piece will handle moving averages, which takes hashtags and counts for record-keeping. The key here is ensuring that the function allows us to find records by name, update them, and save them accordingly.
00:24:17.440 However, even though streams sound like unbounded sequences of tuples, we do need to determine when calls to certain operations will be made. Storm operates in batches: the coordinator assigns batch IDs for processing to organize workflow.
00:25:02.960 The ability for Storm to re-emit and hit the spout again means we can keep issueing tuples according to the processing batch. In effect, it creates a reliably managed data flow and emphasizes the importance of deterministic functions that do not alter states based on external actions.
00:25:50.640 The transformation functions must be purely functional. That means never using time.now; otherwise, they become non-deterministic. If you need the current time for a batch, it’s often better to have a spout emit the current time separately and capture it. Many spouts don’t engage in this deterministic process, so it’s worth considering that Kafka is an outstanding option for a queue that does maintain these guarantees.
00:26:55.680 Summarizing this all, Storm is a powerful platform for writing workers, especially those that are stateful. It's excellent for complex data processing flows—allowing for scalability and operational efficiency when dealing with high volumes of data.
00:27:45.440 The final challenge is ensuring we can recover from failures. We need to build resilience into our systems because failures will happen; that’s a matter of certainty in distributed systems.
00:28:26.960 When thinking about persistence and data loss, we need to analyze what guarantees are made when processing messages—like whether they're processed at least once or at most once. The best way to handle spikes in errors or fluctuations is to ensure proper bookkeeping while still balancing available resources.
00:29:23.360 Storm’s methodology indicates that each batch will be processed only after the previous one is fully committed, which helps prevent duplicate processing without significant downsides. As I mentioned earlier, returning current transaction IDs allows teams to establish consistency across messages.
00:30:22.720 Through the management of batch IDs and outputs, we can operate in a more predictable environment with fewer inconsistencies. Knowing how to handle failures forces you to build robust systems that can endure pressure in their operations.
00:31:20.240 In conclusion, understanding how to efficiently manage partitions and threads in your Storm processes can lead to a more distributed workload. This scalability, combined with proper error handling, will help ensure the integrity of your event processing.
00:32:06.080 Ultimately, whether you are processing tweets or anything else, the essential part is designing systems that work with concurrency in mind to prevent collapse under load while maintaining reactive responsiveness.
00:32:58.720 Thank you for your attention. I hope this session has helped you understand how to supercharge your workers with Storm and provided some ground to explore further.
00:33:55.520 Thank you all!
00:34:06.600 Thank you!