Event Sourcing

Summarized using AI

High availability by offloading work

Kerstin Puschke • April 12, 2021 • online

In this talk, Kerstin Puschke, an expert from Shopify, discusses high availability in applications, particularly in the context of handling unpredictable traffic spikes and ensuring seamless user experiences. Puschke emphasizes the importance of offloading intensive tasks, thereby avoiding system degradation during peak loads. The main approaches explored include background jobs, message-oriented middleware, and event logs like Kafka.

Key Points:

- Definition and Importance of High Availability:
  - High availability is more than uptime; it involves ensuring users can interact with the system without degradation of service during heavy traffic.
- Background Jobs:
  - Background jobs are utilized to offload tasks from the user-facing part of the system, ensuring that heavy processing does not interrupt ongoing user requests.
  - At Shopify, background jobs handle up to tens of thousands of tasks per second, improving system responsiveness and availability.
- Advantages of Task Queues:
  - Task queues act as buffers, allowing the system to queue jobs without impacting user-facing requests.
  - They enable horizontal scaling by allowing multiple workers to handle tasks concurrently.
- Challenges with Background Jobs:
  - Managing versioning and job parameters can lead to errors if not properly synchronized.
  - Handling failures and maintaining consistency is crucial for maintaining high availability.
- Message-Oriented Middleware:
  - This approach separates message routing and management from service logic, allowing for a more decoupled system.
  - Events can be published and consumed without tight coupling, improving overall availability.
  - It can handle consumer downtime by queuing messages, ensuring no data is lost.
- Event Logs (e.g., Kafka):
  - Event logs provide a shared append-only log, allowing multiple consumers access to the same data without losing messages.
  - They are useful for real-time applications and provide a single source of truth, allowing for event sourcing.
- Conclusion:
  - Puschke concludes that while each approach has its complexities, they are essential for maintaining high availability in modern applications. Each strategy has its advantages depending on the architectural requirements: background jobs work well in monolithic structures, message-oriented middleware is suited for microservices, and event logs are beneficial for high-throughput needs.

Through her insights and case studies tailored around Shopify’s operational needs, Puschke highlights that choosing the right tool for offloading work is crucial for sustaining high availability and accommodating growth.


Unpredictable traffic spikes, slow requests to a third party, and time-consuming tasks like image processing shouldn’t degrade the user facing availability of an application. In this talk, you’ll learn about different approaches to maintain high availability by offloading work into the background: background jobs, message oriented middleware based on queues, and event logs like Kafka. I’ll explain their foundations and how they compare to each other, helping you to choose the right tool for the job.

RailsConf 2021

00:00:05.240 Thanksgiving is a particularly busy shopping weekend in the U.S., also known as Black Friday and Cyber Monday, or BF/CM. It's becoming increasingly popular in other countries as well.
00:00:12.059 Over BF/CM 2020, merchants made sales exceeding 5 billion USD, peaking at over 100 million USD per hour. For us at Shopify, even the shortest downtime matters, and high availability of our applications is key.
00:00:22.980 Today, I'm going to compare different approaches to offload work and how they can help us ensure high availability while keeping our codebase as simple as possible. I'll start with a brief introduction about high availability and offloading work. I'll discuss background jobs, the features they provide, the challenges they might pose, and how we are using them at Shopify to ensure high availability of one of the largest Rails applications.
00:00:45.719 I will compare background jobs to message-oriented middleware like RabbitMQ and event logs like Kafka.
00:01:04.500 So what is high availability? Availability is much more than just uptime. The user needs to be able to interact with the system, and simply telling them to come back later is not good enough. We're talking about meaningful interactions, not just for one individual user, but for all users.
00:01:33.720 High availability is achieved if this holds true whenever users need it. For our application to have high availability, it must be capable of accepting incoming requests. More specifically, the user-facing part of the system has to handle this.
00:01:50.460 If the same part of the system is doing all the heavy lifting related to processing the requests, it might become overwhelmed and too busy to accept more requests. That’s why we prefer to offload work away from the user-facing part of the system into the background whenever possible. This means whenever there is something that can be done outside of the main request-response cycle.
00:02:21.959 In the Ruby world, background jobs are a popular way to offload work into the background. At Shopify, we utilize widely used background job backends. A background job is a unit of work to be completed later.
00:02:49.080 The application server asks a worker to run the job. The worker can even be on a different machine, but the two must communicate, for example to address failures. The application server, however, shouldn't have to wait for the worker to finish the task, so we require some form of asynchronous communication between the application server and the worker.
00:03:07.739 We can facilitate this by placing a message in a queue, whereby producers and consumers of messages can interact with the queue independently at different times. The message is the task assigned to the worker, which is why we also refer to this queue as a task queue.
00:03:39.180 The application server does not have to wait for the worker to process a message, and by setting up several workers, we can tackle multiple tasks simultaneously. This is at the core of a background job backend: a task queue along with broker code for managing the workers.
00:04:21.900 It encapsulates the asynchronous communication between the application server and the workers, simplifying our background job code since it simply does not need to be aware of being executed in the future.
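The task queue described above can be illustrated with a minimal in-process sketch. It uses Ruby's core `Queue` and threads as stand-ins; a real background job backend keeps the queue in an external store and runs workers as separate processes, so everything here is an assumption made for illustration only.

```ruby
# Minimal in-process sketch of a task queue with several workers.
# Real backends keep the queue in an external data store.
task_queue = Queue.new # thread-safe FIFO from Ruby's core library
results    = Queue.new

workers = 3.times.map do
  Thread.new do
    # Each worker pulls tasks until it receives the :stop marker.
    while (task = task_queue.pop) != :stop
      results << task.call
    end
  end
end

# The "application server" enqueues work and moves on immediately.
5.times { |i| task_queue << -> { i * i } }

# Shut down: one stop marker per worker, then wait for them to finish.
workers.size.times { task_queue << :stop }
workers.each(&:join)

processed = []
processed << results.pop until results.empty?
```

The producer never waits for a result; it only pushes onto the queue, which is the property that keeps the user-facing side responsive.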
00:04:29.880 Shopify queues tens of thousands of jobs per second, utilizing them for a broad spectrum of use cases, such as firing webhooks or purchasing shipping labels. The features that make background jobs attractive include decoupling user-facing requests served by the application server from any time-consuming backend tasks handled by a worker. This improves response times, benefiting all users.
00:05:07.139 Shopify faces traffic spikes, sometimes reaching up to 170,000 requests per second. A sudden spike, such as image uploads, won’t adversely affect us since the actual image processing is conducted by a background job.
00:05:20.580 Though our queue may grow, the speed of queuing more jobs is not constrained by the speed at which they are processed. Users can still interact with the application as quickly as usual. Our message queue acts as a buffer, providing high availability in the face of unpredictable traffic.
00:05:49.560 When dealing with a large task, we can break it down into several smaller subtasks, treating these as individual background jobs. If we set up multiple workers, they can process these subtasks in parallel. Our job code remains simple, as it does not need to consider parallel processing; it only needs to manage one of the subtasks.
00:06:36.600 When a worker encounters an error while running a job, we can try again later. The background job code is not aware of this retry process, keeping it straightforward. All of this error handling occurs in the background, ensuring user-facing application server availability and response times remain unaffected.
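As a rough illustration of retries that stay invisible to the job itself, the sketch below wraps a block in a hypothetical `run_with_retries` helper. The helper name and the `max_attempts` parameter are assumptions; actual backends ship their own retry mechanisms, usually with delays between attempts.

```ruby
# Hypothetical retry wrapper; real backends implement this for us.
def run_with_retries(max_attempts:)
  attempts = 0
  begin
    attempts += 1
    yield
  rescue StandardError
    # Re-run the begin block until the attempt budget is used up.
    retry if attempts < max_attempts
    raise
  end
end

# A job body that fails twice before succeeding, e.g. a flaky third party.
calls = 0
result = run_with_retries(max_attempts: 5) do
  calls += 1
  raise "temporary failure" if calls < 3
  :done
end
```

The job body contains no retry logic at all; it simply raises, and the surrounding layer decides what happens next.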
00:07:06.360 The background job backend encapsulates the asynchronous communication between the application server and the worker, but challenges remain. The application server queuing the job and the worker processing it may run different versions of our codebase, especially during rollouts.
00:07:49.659 Changes to job parameters could disrupt our application. A job queued with new parameters might be processed on a worker expecting the old ones, necessitating smooth changes for backwards compatibility until our legacy jobs are cleared from the queue.
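One way to keep a parameter change backwards compatible is to let the worker accept both the old and the new shape until the old jobs have drained. The job class, the parameter names, and the default below are all made up for illustration.

```ruby
# Hypothetical job class; the parameter names are illustrative only.
class ResizeImageJob
  DEFAULT_WIDTH = 800

  # Old jobs were queued as { "image_id" => 1 }.
  # New jobs add a "width" key. Until the old jobs have left the queue,
  # the worker has to accept both shapes.
  def perform(params)
    image_id = params.fetch("image_id")
    width    = params.fetch("width", DEFAULT_WIDTH)
    "image #{image_id} resized to #{width}px"
  end
end

job = ResizeImageJob.new
old_style = job.perform({ "image_id" => 1 })
new_style = job.perform({ "image_id" => 2, "width" => 400 })
```

Once no old-style jobs remain in the queue, the fallback can be removed in a later rollout.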
00:08:30.000 When a job finishes successfully, the worker confirms it, permitting its removal from the queue. If a job is not confirmed in time, we can allow a second worker to pick it up, but if the first worker is simply slow, the job will execute twice.
00:09:07.020 If we do not allow a second worker to pick up the unconfirmed job, and the first worker crashes, the job will not run at all. Hence, we must choose between at-least-once and at-most-once delivery. The latter is preferable if the consequences of running the job twice are worse than not running it at all.
00:09:47.940 However, this often indicates a flaw in our code. For instance, if we worry about charging a customer twice, we should carefully track all charges made and confirm that state before attempting to charge again.
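The idea of tracking charges so that a repeated delivery becomes harmless could be sketched as follows. The class and the in-memory `Set` are assumptions standing in for a persisted record of charges; in a real system the check and the write would need to be atomic, for example via a unique constraint.

```ruby
require "set"

# Hypothetical payment job; a Set stands in for a persisted charge ledger.
class ChargeCustomerJob
  def initialize(ledger)
    @ledger = ledger
  end

  # Returns :charged the first time and :skipped on any repeated delivery.
  def perform(order_id)
    return :skipped if @ledger.include?(order_id)

    # ... the call to the payment provider would go here ...
    @ledger.add(order_id)
    :charged
  end
end

ledger = Set.new
job    = ChargeCustomerJob.new(ledger)
first  = job.perform(42)
second = job.perform(42) # the same job delivered a second time
```

With this in place, at-least-once delivery no longer risks a double charge, because the second run finds the recorded state and does nothing.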
00:10:31.020 Our job queue is likely persisted in a different data store than our operational data; operational data might reside in MySQL while the job queue is in Redis. This separation means queuing a job does not fall under any enclosing database transaction.
00:10:55.050 Even if the job is queued inside that transaction, a worker might process it before the enclosing transaction commits, or the job might still run after the transaction rolls back, leaving our data in an inconsistent state. We can mitigate this by deferring job queuing and first writing the job parameters into a staging table within our main data store.
00:11:46.899 This staging can be part of an enclosing database transaction, with a scheduler checking the table to queue jobs accordingly. The scheduler will report back when jobs are successfully queued, confirming our jobs are executed. However, we still risk spreading database write operations over multiple local transactions.
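The staging-table approach can be sketched with in-memory stand-ins. The `StagedJobs` class, its `transaction` helper, and the row layout are hypothetical; a real implementation would use an actual database transaction and a separate scheduler process.

```ruby
# In-memory sketch: @staged plays the staging table in the main data
# store, @queue plays the external job queue.
class StagedJobs
  attr_reader :staged, :queue

  def initialize
    @staged = []
    @queue  = Queue.new
  end

  # Stand-in for a database transaction: staged rows are kept only if
  # the block completes without raising.
  def transaction
    pending = []
    yield pending
    @staged.concat(pending) # "commit"
    true
  rescue StandardError
    false                   # "rollback": nothing reaches the table
  end

  # The scheduler moves committed rows into the real queue and marks them.
  def run_scheduler
    @staged.each do |row|
      next if row[:queued]

      @queue << row[:job]
      row[:queued] = true
    end
  end
end

jobs = StagedJobs.new
jobs.transaction { |rows| rows << { job: "send_receipt(1)", queued: false } }
jobs.transaction do |rows|
  rows << { job: "send_receipt(2)", queued: false }
  raise "order could not be saved" # rolls back, so the job is never staged
end
jobs.run_scheduler
```

Only the job from the committed transaction reaches the queue, which is the guarantee that direct queuing inside a transaction cannot give.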
00:12:31.680 Reaching eventual consistency might be impeded if some jobs fail. We can apply a pattern called Saga, specifically the command orchestrator pattern, which means a job reports back to the staging table upon successful completion, allowing the scheduler to spot inconsistencies for resolution.
00:13:10.680 Jobs leave the queue in a predefined order yet are processed by different workers. If a previous job encounters an error, its retry may run after subsequent jobs have already been processed. This could lead to executing jobs out of order. However, the first job can queue the second as a follow-up to ensure execution in the correct order.
00:14:06.939 This form is seen in a different flavor of the Saga pattern called event choreography. While easy to build, it can prove complicated to debug, as it may push all problems down an extended chain of jobs.
00:14:57.600 Alternatively, we can employ the command orchestrator flavor of the Saga pattern to sidestep these complexities, albeit with more initial implementation effort.
00:15:49.919 A background job does not need to match the speed of a user-facing request, but long-running jobs can create challenges. Resque, for instance, prevents worker shutdown while jobs remain active, limiting deployment options. This aspect is not very cloud-friendly, as resources may not be available indefinitely.
00:16:31.800 Sidekiq behaves better by aborting and requeuing jobs if the worker attempts to shut down while processing, but if deployments outpace job completion, those jobs will never succeed. With Shopify deploying about 40 times per day on average, this isn't a mere theoretical issue.
00:17:21.480 Most long-running jobs tend to iterate over extensive collections of data. We split our job into iterable collections and the specific tasks for each iteration. After each iteration, we set a checkpoint, allowing job processing to resume at that point during the next queue cycle.
00:17:44.760 This results in interruptible jobs that support automatic resuming, allowing us to shut down workers at any time, including for more frequent deployments. If our database is strained, we can throttle jobs or pause them entirely to avert exacerbating ongoing incidents.
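The checkpointing idea can be illustrated with a small sketch. This is not the API of Shopify's job iteration framework; the class, the `interrupt` parameter, and the in-memory cursor are assumptions chosen to show the mechanism only.

```ruby
# Hypothetical interruptible job; only an illustration of checkpointing.
class InterruptibleJob
  attr_reader :cursor, :processed

  def initialize(collection)
    @collection = collection
    @cursor     = 0 # checkpoint: index of the next item to process
    @processed  = []
  end

  # Processes items until done or until `interrupt` returns true.
  # Returns :finished or :interrupted; the cursor survives either way.
  def run(interrupt: -> { false })
    while @cursor < @collection.size
      return :interrupted if interrupt.call

      @processed << @collection[@cursor] * 2 # the per-iteration work
      @cursor += 1                           # checkpoint after each item
    end
    :finished
  end
end

job = InterruptibleJob.new([1, 2, 3, 4])
first_run  = job.run(interrupt: -> { job.cursor == 2 }) # e.g. a deploy
second_run = job.run                                    # resumes at item 3
```

Because the checkpoint is recorded after every iteration, no item is processed twice and none is skipped when the job resumes.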
00:18:02.760 When data moves among different database shards, we can interrupt jobs safely. Our job iteration framework is open-source and available on GitHub. These results can be achieved by breaking jobs into sub-tasks and managing them independently, but keeping everything unified as one job allows us to leverage regular retry mechanisms.
00:18:54.660 It also simplifies tracking overall success or failure since failures aren't retries. This effectively encapsulates scalability issues in a distinct abstraction layer, enabling us to keep our background job class implementations remarkably simple.
00:19:28.560 Our background job backends are Ruby gems, used by both application servers and workers. The task to be completed is an instance of a background job class, a Ruby object that gets passed around. This class must be implemented on both the application server and the worker side.
00:20:20.700 Thus, background jobs are excellent for offloading work to a worker running the same codebase as the application server, characteristic of a monolithic architecture.
00:21:02.280 To sum it up, background jobs are based on task queues, which can inadvertently complicate the system due to involved asynchronous communication and the absence of guarantees. However, abstraction of this complexity allows us to maintain the simplicity of our specific background job classes.
00:21:43.320 While background jobs work well with monoliths, extracting a service, such as image processing, into a separate microservice requires a different communication method, which is where message-oriented middleware comes into play. This type of middleware can leverage various protocols, with several implementations, such as RabbitMQ, being quite popular.
00:22:34.920 Essentially, message-oriented middleware also relies on task queues, but the logic for message routing and managing workers exists within this middleware, kept separate from our services. Therefore, we no longer pass Ruby objects around; instead, message producers and consumers agree on a standardized interface, like a JSON payload.
00:23:22.320 This allows each service to be replaced with a different implementation, even employing entirely different technologies, as long as it complies with our standardized interface.
00:24:02.760 By utilizing task queues, message-oriented middleware retains many features of background jobs while adapting to a decoupled microservices architecture.
00:24:45.840 Although I don't work directly with this technology at Shopify, I crafted an example to explain. Imagine a business partner going out of business, which necessitates updates to their data in the business partner’s service.
00:25:32.880 This could mean canceling pending orders or putting their support contracts on hold. Other services must also learn about this status, and message-oriented middleware can be used to broadcast this information.
00:26:10.440 The business partner's microservice produces a message about this event, while client services can consume it. If a message consumer happens to be down, the messages remain in the queue until the service returns, ensuring that no information is lost during temporary outages.
00:26:53.240 The message producer can drop the message into the queue and proceed, unaffected by any downtime from message consumers, thus maintaining the high availability of our message producer, which often serves as the user-facing application server.
00:27:38.100 All this functionality is embedded within the broker, the messaging middleware, meaning our concrete message consumer code can remain straightforward.
00:28:20.799 Message-oriented middleware typically doesn’t employ a single point-to-point queue but rather utilizes topics; for example, in AMQP, each message consumer has its own queue set up, with the middleware duplicating messages into the appropriate queues.
00:29:03.480 Thus, unlike HTTP communication, we aren’t restricted to one-to-one routing. Topics provide us with publish-subscribe capabilities.
00:29:48.480 A new service can subscribe to messages it finds relevant by creating its own queues, without the message producer needing to be aware of this new consumer. For instance, if microservices issue warning messages upon detecting suspicious activity, a message consumer can aggregate these into a comprehensive fraud score.
00:30:34.620 This means that new message producers can integrate seamlessly into the system and start dispatching messages with no adjustments needed from consumers. Since message producers and consumers are not aware of one another’s existence, the failures of any single service have minimal impact on others, improving overall system availability.
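The publish-subscribe behavior of topics can be sketched with a tiny in-memory broker. The `Broker` class and the topic name are hypothetical; real middleware such as RabbitMQ runs as a separate service with its own client libraries, none of which are used here.

```ruby
# Hypothetical in-memory broker illustrating topic-based fan-out.
class Broker
  def initialize
    @subscriptions = Hash.new { |hash, topic| hash[topic] = [] }
  end

  # Each subscriber gets its own queue for the topic.
  def subscribe(topic)
    queue = []
    @subscriptions[topic] << queue
    queue
  end

  # The broker copies the message into every subscribed queue.
  def publish(topic, message)
    @subscriptions[topic].each { |queue| queue << message }
  end
end

broker = Broker.new
orders_queue  = broker.subscribe("partner.closed")
support_queue = broker.subscribe("partner.closed")

broker.publish("partner.closed", { "partner_id" => 7 })

# A consumer added later needs no change on the producer side.
audit_queue = broker.subscribe("partner.closed")
broker.publish("partner.closed", { "partner_id" => 8 })
```

The producer only knows the topic name, so consumers can come and go without the producer's code changing.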
00:31:06.720 Using these features does present challenges similar to background jobs, as we again encounter queues, which can lead to delivery inconsistencies. Designing our messages means designing the data interface between producers and consumers, which requires careful consideration of versioning.
00:31:44.240 As illustrated in my examples, we route messages from one producer to multiple consumers or several producers to one consumer. Although we could technically route messages n-to-n, doing so complicates breaking changes as coordination is required across multiple producers and consumers.
00:32:16.920 In my experience, our business domains are more accurately modeled by various messages, some routed one-to-one and others one-to-n, rather than a single comprehensive message.
00:32:58.680 Background jobs generally signify commands, such as processing this image or sending this email. Thus, in a microservices architecture, we might replace background commands with messages conveying similar information—for example, sending a message to the email service to deliver a welcome email to a new user.
00:33:37.560 However, this setup forces our message producer to depend on knowing about message consumers and what actions to take with the information handed over, leading to unfortunate coupling.
00:34:05.640 Conversely, we can use event-driven designs, providing information about actions already executed. For example, we could generate an event notifying of a new user sign-up. All interested services can subscribe to this message, with the email service sending out the corresponding welcome email.
00:34:49.440 This decouples services, allowing the message producer to remain unaware of what consumers do with the sign-up information, and the knowledge about sending welcome emails stays encapsulated within the email service.
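A small sketch may help show what an event message looks like from the consumers' side. The payload fields and the two consumer lambdas are assumptions for illustration; the point is that the event only states what happened, and each consumer decides on its own what to do with it.

```ruby
require "json"

# The producer publishes a fact, not an instruction.
event = JSON.generate({ "type" => "user_signed_up", "user_id" => 1 })

# Hypothetical email service: it alone knows that a sign-up
# should trigger a welcome email.
email_service = lambda do |raw|
  message = JSON.parse(raw)
  "welcome email for user #{message['user_id']}" if message["type"] == "user_signed_up"
end

# A second, independent consumer of the same event.
analytics_service = lambda do |raw|
  message = JSON.parse(raw)
  "sign-up counted" if message["type"] == "user_signed_up"
end

reactions = [email_service, analytics_service].map { |service| service.call(event) }
```

Adding the analytics consumer required no change to the event or to the producer.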
00:35:32.280 There are multiple flavors of event messages to consider; for instance, if a user updates the same information several times consecutively, the last message received by the service may not stem from the most current update due to the possibility of out-of-order messages.
00:36:23.280 If messages are commutative, in that their payloads just include, say, the user ID without any additional detail, the order does not cause an issue. An ID suffices for some operations, like cache invalidation, but often consumers must reach back to message producers for the complete updated data.
00:37:07.440 This could result in significant traffic, and the consumer may lack a complete history of state changes, learning only the state at the time it fetches the latest data and missing interim updates.
00:37:52.680 This contrasts with event-carried state transfer, where payloads contain all the updated data, enabling consumers to avoid unnecessary fetches from producers and to maintain a detailed history of changes.
00:38:42.180 Nevertheless, consumers must address out-of-order delivery, a responsibility that typically falls upon them.
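One common way for a consumer to cope with out-of-order delivery is to compare a version carried in the payload against what it already holds. The `version` field and the `UserProjection` class are assumptions; the talk does not say how any particular system handles this.

```ruby
# Hypothetical consumer-side store; the "version" field is an assumed
# part of the payload, for example a counter maintained by the producer.
class UserProjection
  attr_reader :users

  def initialize
    @users = {}
  end

  # Applies an event only if it is newer than what we already hold.
  def apply(event)
    current = @users[event["user_id"]]
    return :stale if current && current["version"] >= event["version"]

    @users[event["user_id"]] = event
    :applied
  end
end

projection = UserProjection.new
newer = { "user_id" => 1, "version" => 2, "email" => "new@example.com" }
older = { "user_id" => 1, "version" => 1, "email" => "old@example.com" }

first  = projection.apply(newer) # arrives first despite being newer
second = projection.apply(older) # late arrival is ignored
```

The consumer ends up with the most recent state regardless of arrival order, at the cost of requiring the producer to supply a reliable ordering field.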
00:39:10.680 As developers, there is a natural inclination to build our message producers around Active Record models, but if our thinking is tied too closely to Active Record, it’s easy to inadvertently expose implementation details.
00:39:54.240 For instance, a message consumer may not care if a specific field in the producer's database is modified; in fact, they should remain ignorant of its existence, as that's an implementation detail of the message producer. Similarly, users do not wish to update fields in databases; they want to accomplish real-life tasks within their business contexts.
00:40:37.680 This intent lives in our business logic rather than in our persistence layer. We can avert the risk of leaking implementation details by focusing on business domain events, firing events from within the application's business logic rather than from the persistence layer.
00:41:23.160 Messages are removed from the queue after consumption, preventing us from replaying the message stream. Each service operates within its own reality, yielding a high level of decoupling, but while these services may represent the truth for a specific dataset, we lose a cohesive overall state, complicating debugging.
00:42:01.440 This challenge hampers actions like event sourcing; however, message-oriented middleware enables us to offload work into highly decoupled microservices. This decoupling is beneficial for high availability, but it leaves us without a comprehensive system-wide state.
00:42:43.140 In summary, message-oriented middleware is built on queues and topics, which may complicate the system due to asynchronous communication and the lack of guarantees; yet the middleware itself abstracts away much of this complexity.
00:43:34.440 This allows us to build straightforward concrete message consumers, making message-oriented middleware an effective solution for decoupled microservices but leaving behind the idea of a universal system state.
00:44:13.440 Event logs approach event messages differently, with Kafka as a prominent example. Events are persisted within an append-only log shared by all consumers, who can read from it at any time.
00:45:02.760 The broker is stateless: it does not track which events have been consumed, which simplifies the architecture. With this stateless broker and append-only writes, we achieve an incredible level of throughput, which is great for high availability and well suited to real-time applications and data streams.
00:45:56.520 Events are not purged after processing, thus enabling us to replay the stream of events at will, which forms a solid foundation for event sourcing.
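The combination of an append-only log, consumer-side offsets, and replay can be sketched in a few lines. The `EventLog` class and the deposit and withdrawal events are invented for illustration, and no Kafka client API is used.

```ruby
# Hypothetical in-memory event log; it does not use any Kafka client API.
class EventLog
  def initialize
    @events = []
  end

  # Appends an event and returns its offset in the log.
  def append(event)
    @events << event
    @events.size - 1
  end

  # Consumers track their own offset; the log does not record
  # who has read what, and nothing is removed after reading.
  def read_from(offset)
    @events[offset..] || []
  end
end

log = EventLog.new
log.append({ "type" => "deposited", "amount" => 50 })
log.append({ "type" => "withdrawn", "amount" => 20 })
log.append({ "type" => "deposited", "amount" => 5 })

# Event sourcing: rebuild state by replaying the log from the start.
balance = log.read_from(0).sum do |event|
  event["type"] == "deposited" ? event["amount"] : -event["amount"]
end

# Another consumer resumes from its own stored offset.
unread = log.read_from(2)
```

Since reading does not remove events, any consumer can start from offset 0 at any time and derive the same state.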
00:46:39.240 Event logs represent an excellent method of offloading work to microservices while concurrently maintaining the notion of a system-wide state.
00:47:39.840 To summarize, background jobs utilize queues and are a great fit for monolithic architectures. Message-oriented middleware employs topics and queues, thriving in decoupled microservices. In contrast, event logs utilize a shared log, lacking queues, providing significant advantages for event sourcing and high-throughput applications.
00:48:53.520 Shopify processes millions of Kafka messages per second. This is a recording of a Kafka-powered real-time visualization of Shopify merchants making sales during BF/CM 2020, with each particle representing a real order. Thank you for listening.