Things I Wish I Knew Before I Tried Implementing a Task Queue
May 19, 2023 :: 7811 words

Let’s say you’re writing a web application. You have a long-running task that you want to run in the background. Maybe it’s a task that takes a long time to complete, or maybe it’s a task that you want to run at a later time; or maybe you just want to process it in aggregate somewhere else. Either way, you don’t want to block the user’s request while the task is running.
This is a very common problem. In fact, it’s so common, there are a number of services to help you solve it. ActiveJob is a built-in solution for Ruby on Rails. Celery is a popular solution for Python. Sidekiq is a popular solution for Ruby. If you don’t mind vendor lock-in, AWS’s SNS paired with SQS is a great solution; or Google Cloud’s PubSub.
But my application is small. I don’t need anything too fancy for what I’m doing; I just need something reliable that can handle a few hundred tasks a day. I don’t want to pay for a service, and I don’t want to have to manage making everything Just Right™. Surely, creating a task queue from scratch wouldn’t be hard, right? I could just use a Redis server to store the tasks, and a worker process to pull them off the queue and process them. How hard could it be?
The Basic Framework
Let’s start with the basics. We’ll need a way to add tasks to the queue, and a way to process them. We’ll also need a way to store the tasks. We’ll choose Redis here, as Redis is easy to set up, and has a number of data structures that are useful for this task.
We have some number of producers that generate tasks and push them into this queue. These producers would be the front-end of our application; ideally, once we’ve pushed into the queue, and we receive confirmation from the queue that it has indeed received it, we should be confident that we won’t lose that task1, so we can then return a response to the user.
Tasks should be processed in the order they are received, as oftentimes later tasks depend on earlier tasks having been completed. For example, if you allow users to upload avatars to your application, you might want to process those avatars before they are displayed to the user - but what if the user changes their avatar again before the first one is processed? You don’t want to display the second avatar (because it was processed first), and then have it replaced by the first avatar (because it was processed second) - so for some applications, ordering can be very important.
Finally, you have the consumers, which pull tasks off the queue and process them. These would be servers that are running in the background, and are responsible for actually doing the work. They take a task off of the queue, process it (for the avatar example, they might want to check the dimensions, check that it is actually an image, resize it, virus check it, all sorts of things), and save the results.
This is a good start. In Redis, all you’d need is a list - `LPUSH` items onto the list, and `BRPOP` them off on the consumer end. You can even use any version of Redis from 2.0 on, since `BRPOP`, the more restrictive of the two commands, has been available since 2.0.0. Simple. Nothing can go wrong.
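A minimal sketch of that naive setup, using the same Rust redis crate the later examples in this post use (the key name and payload here are just placeholders):

```rust
use redis::AsyncCommands;

async fn naive_queue() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://localhost:6379/")?;
    let mut conn = client.get_async_connection().await?;

    // Producer side: push a task onto the head of the list.
    let _: () = conn.lpush("queue:tasks", "do-something").await?;

    // Consumer side: block until a task is available at the tail, then pop
    // it off. BRPOP returns the (key, value) pair it popped.
    let (_key, task): (String, String) = conn.brpop("queue:tasks", 0).await?;
    println!("got task: {}", task);

    Ok(())
}
```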
Well…
What Can Go Wrong?
It is often prudent to take a look at a system to see what aspects of it can fail. This very simplistic model assumes two major things:
- The consumer will never fail to process a task.
- The consumer will never fail to acknowledge that it has processed - or failed to process - a task.
These seem very similar, but the second assumption is even more restrictive than the first. Let’s take a look at what happens if these assumptions are broken.
The Consumer Fails to Process a Task
Let’s say that while processing an avatar, a transient fault occurs - maybe there is a hiccup in the networking, and the consumer is unable to connect to the database2. As this connection is essential to completing the task, the consumer cannot finish the task - the task has errored. But it’s a transient fault - the database is fine, and the consumer will be able to connect to it again in a few seconds.
So we should retry the task. But we shouldn’t retry it immediately - we’re still unable to connect to the database. Assuming we don’t know the nature of the failure - while the error is happening, we can’t know it’s transient - we’ll push the task back onto the queue using `RPUSH`, so that another consumer, one that is hopefully able to connect to the database, can pick it up.
But what if the issue is caused by the task itself? What if the task is malformed, and the consumer is unable to process it? We can’t just keep retrying the task, as it will never succeed. We need to be able to handle errors in the task itself. So we’ll include a retry counter in the task, and if the consumer fails to process the task, it will increment the retry counter and push it back onto the queue. If the retry counter exceeds some threshold, we’ll just discard it.
But what if the database is down for a long time? We don’t want to keep discarding all tasks that are unable to connect to the database - that’s not an issue with the task, that’s an issue with the database; but the retry counter will keep incrementing, and eventually we’ll discard the task3. Or, maybe we want to know what keeps causing issues with the task queue, so we can fix it. We can’t do that if we keep discarding the task. So we’ll send tasks that fail to connect to the database to a different queue, a “dead letter queue” - a queue that is used to store tasks that have failed, and need to be looked at by a human. This way, we can keep track of what tasks are failing, and why, and we can fix the issue.
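To make that concrete, the task itself can carry this bookkeeping. A rough sketch of such an envelope and the retry-or-dead-letter decision (the field names and threshold are made up for illustration):

```rust
use serde::{Deserialize, Serialize};

// Hypothetical envelope wrapped around every task pushed onto the queue.
#[derive(Serialize, Deserialize)]
struct TaskEnvelope {
    id: uuid::Uuid,
    payload: serde_json::Value,
    // How many times a consumer has already failed to process this task.
    retries: u32,
}

// Threshold before a task is parked for a human to look at.
const MAX_RETRIES: u32 = 5;

// Where a failed task should go next.
enum Destination {
    Retry,      // push back onto the main queue with the counter bumped
    DeadLetter, // failed too many times; park it in the dead letter queue
}

fn on_failure(task: &mut TaskEnvelope) -> Destination {
    task.retries += 1;
    if task.retries > MAX_RETRIES {
        Destination::DeadLetter
    } else {
        Destination::Retry
    }
}
```

The consumer then serializes the envelope back onto either the main queue or the dead letter queue, depending on which destination it gets back.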
The Consumer Fails to Acknowledge a Task
Let’s take another look at the above scenario. The consumer is processing an avatar, and a fault occurs - only, instead of being a recoverable error, let’s say that the consumer process is killed randomly4. The task is lost to the abyss. As the process itself was the one keeping track of the task, including notifying the queue when it succeeded (or failed), if the process is unable to do so, the task is lost, never to be completed. To the queue, a task that is never acknowledged as failed looks exactly like one that succeeded.
So we need to be able to handle this. We need to be able to detect when a consumer has died, and requeue the task. But how do we know when a consumer has died? This is a hard problem5. We can’t just check if the process is running, as it might be just spinning its wheels, doing nothing. Or it could just be working away at the task, invisible to the outside world. For the sake of reducing the complexity, we’ll put a timeout on how long it takes to process a task. If a consumer takes longer than n seconds to process a task, we’ll assume it has died, and requeue the task.
Of course, we can’t rely on the consumer to tell us how long it’s taking to process a task - if it hard crashed, it won’t be able to tell us anything. So the queue system now needs to keep track of the consumer, and how long it’s taking to process a task. This is a bit of a problem, as the queue system doesn’t know anything about the consumer, the way we’ve set it up - it just has a list of items, maybe two, and that’s it. It doesn’t know who the consumers are, let alone which consumers have which task, let alone how long it’s taking them to process the task. So we need to add some sort of registration system to the queue, so that consumers can register themselves with the queue, and the queue can keep track of them.
Thus, for the sake of reliability, `LPUSH` and `BRPOP` are not enough.
Take Two
This time, we’ll try to resolve the issues we ran into above. We’ll start with the same basic setup - a producer pushes into a queue, and a consumer pops from the queue. But this time, we’ll add a few more things.
The same general outline is here. We’ve got the producers pushing into the queue, and consumers pulling items from the queue. But this time, we’ve got a few more things going on. We’ll focus on the queue system, as that’s where most of the changes are.
Instead of having one fixed queue for all consumers to pull from, we have one “general queue” that every new task is pushed into. Then, we have a number of consumer-specific queues, one for each consumer. When a consumer is ready to consume a task, it pops from its consumer-specific queue; if the consumer-specific queue is empty, it pops from the general queue. This way, we can have multiple consumers pulling from the same queue. The consumer-specific queue should only contain items that the consumer is currently working on, not items that the consumer will work on6. This is important. At the same time, the consumer registers its presence with the queue, so that the queue knows which consumers are currently active, and which consumer is currently working on which task.
If a consumer dies, the items in its queue can be released back into the general queue, and another consumer can pick them up. This way, we can detect when a consumer has died, and requeue the items that it was working on, preventing tasks from being lost. Note, however, that this necessarily requires that tasks can be processed more than once - if a consumer fails just before it would have acknowledged the task, and the task is requeued, then the task will be processed twice. This is a tradeoff that we have to make, in order to ensure that tasks are not processed zero times.
If a task fails on a consumer, the task can be pushed back into the general queue and have its retry counter incremented. This way, the task can be retried on a potentially different consumer, and the consumer that failed can continue processing other tasks. If it fails too many times, it can be pushed into a “dead letter queue,” where it can be inspected by a human, and the task can be retried manually.
Interestingly, this can still be implemented with Redis lists. The producer can `LPUSH` into the general queue. When a consumer starts, it must set a key specific to that consumer and queue (e.g. `queue:name:consumer:1:active`), set its expiration to the liveness timeout (e.g. 30 minutes; the consumer must then take care to “touch” the key at half that interval, to ensure that the key doesn’t expire while it’s alive), and add itself to the list of consumers for that queue. Then, when a consumer is ready to consume a task, it can do the following, in order:

1. Check its own queue for a task. If a consumer crashed while processing a task, and is able to come back up within the liveness time, the task will still be in its queue, and the consumer can pick it up again. It might make sense to increment the retry counter on the task here, as it’s possible that the consumer crashed because of a bug in the task; this is an implementation detail, though.
2. Check all other consumers’ queues for tasks. As all consumers add their information to the queue system, we know which consumers have been active; we can then filter down to the ones that aren’t currently active, and steal their tasks, assigning them to ourselves. See (3) on how to move a task from one queue to another.
3. If we’re unable to get a task any other way, then we can try pulling from the general queue. In version 6.2.0, Redis added `BLMOVE` - this takes an item from one list and moves it to another list, blocking if the former has no items within the given timeout. (It also allows you to specify from which end to remove and append the item, which is nice.) Before that, there was `BRPOPLPUSH` (available since 2.2.0); the documentation for both commands describes how to build a reliable queue in Redis, the same way we’re doing it! These commands are atomic - popping an item off the general queue and pushing it onto the consumer-specific queue is done in one operation, ensuring that we’re the only one that gets the task7.
To be fair, this isn’t incredibly efficient - there are multiple round trips here, and the number of round trips (if not careful) can potentially scale with the number of consumers8. But it’s a basic system that can be implemented pretty easily.
So, a producer’s code might look like this:
```rust
use std::collections::HashSet;

use anyhow::Context;
use redis::aio::Connection;
use redis::{AsyncCommands, Client, Direction};

static TASK_QUEUE_KEY: &str = "queue:main:tasks";
static CONSUMER_SET_KEY: &str = "queue:main:consumers";

fn consumer_liveness_key(name: &str) -> String {
    format!("queue:main:consumers:active:{}", name)
}

fn consumer_processing_list_key(name: &str) -> String {
    format!("queue:main:consumers:list:{}", name)
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // This will only fail if the url is badly formatted.
    let client = Client::open("redis://localhost:6379/10")?;
    let mut connection = client
        .get_async_connection()
        .await
        .context("failed to open connection to redis server!")?;
    let message = serde_json::to_string(&serde_json::json!({
        "id": uuid::Uuid::new_v4(),
        "payload": {}
    }))
    .context("failed to serialize message!")?;

    // LPUSH onto the head of the list; consumers take from the tail,
    // giving us FIFO ordering.
    let _: () = connection
        .lpush(TASK_QUEUE_KEY, message)
        .await
        .context("failed to push message to queue!")?;

    Ok(())
}
```
And a consumer’s code might look like this:
```rust
use std::collections::HashSet;

use anyhow::Context;
use redis::aio::Connection;
use redis::{AsyncCommands, Client, Direction};

static TASK_QUEUE_KEY: &str = "queue:main:tasks";
static CONSUMER_SET_KEY: &str = "queue:main:consumers";

fn consumer_liveness_key(name: &str) -> String {
    format!("queue:main:consumers:active:{}", name)
}

fn consumer_processing_list_key(name: &str) -> String {
    format!("queue:main:consumers:list:{}", name)
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let client = Client::open("redis://localhost:6379/10")?;
    let mut connection = client
        .get_async_connection()
        .await
        .context("failed to open connection to redis server!")?;

    // Define the consumer's name.
    let consumer_name = "consumer-1";
    // Define the blocking timeout for BLMOVE, in seconds.
    let blocking_timeout_secs = 5;

    // Set our liveness key to 1, with an expiration of 30 minutes.
    // Once this expires we're considered dead.
    // It is an exercise for the reader to implement a heartbeat.
    let _: () = connection
        .set_ex(
            consumer_liveness_key(consumer_name),
            "1",
            1800, // 30 minutes
        )
        .await
        .context("failed to set the liveness key!")?;

    // Now, we can add ourselves to the consumer set.
    let _: () = connection
        .sadd(CONSUMER_SET_KEY, consumer_name)
        .await
        .context("failed to add consumer to set!")?;

    loop {
        let incoming = next(&mut connection, consumer_name, blocking_timeout_secs)
            .await
            .context("failed to load next task!")?;

        // Pushing messages into a DLQ upon failure is an exercise
        // for the reader.
        for message in incoming {
            let task = serde_json::from_str(&message)
                .context("failed to deserialize message!")?;
            process(task).context("failed to process message!")?;
        }

        // We've finished processing, so we can remove the items in the
        // processing list.
        let _: () = connection
            .del(consumer_processing_list_key(consumer_name))
            .await?;
    }
}

async fn next(
    connection: &mut Connection,
    consumer_name: &str,
    timeout: usize,
) -> Result<Vec<String>, anyhow::Error> {
    'retry: loop {
        // Load our processing list.
        let processing_list: Vec<String> = connection
            .lrange(consumer_processing_list_key(consumer_name), 0, -1)
            .await
            .context("failed to load processing list!")?;
        // We've received an item from our processing list, so we can
        // return it. This is where we'd normally do some sort of
        // retry counter for the message.
        if !processing_list.is_empty() {
            // This is the only return in this function to make sure
            // that whatever we return is in the processing list.
            return Ok(processing_list);
        }

        // Now, we perform "work stealing." Get the list of consumers,
        // excluding ourselves, and check if any of them are dead.
        // If so, move something to ourselves.
        let mut consumers: HashSet<String> = connection
            .smembers(CONSUMER_SET_KEY)
            .await
            .context("failed to load consumer set!")?;
        consumers.remove(consumer_name);

        for consumer in consumers {
            const LIVENESS_MESSAGE: &str =
                "failed to load external consumer's liveness key";
            // Redis's EXISTS returns 1 if the key exists, 0 otherwise.
            let exists: u32 = connection
                .exists(consumer_liveness_key(&consumer))
                .await
                .context(LIVENESS_MESSAGE)?;
            if exists == 0 {
                const DEAD_MESSAGE: &str =
                    "failed to move message from dead consumer!";
                // The consumer is dead, so we can move something from
                // their processing list to ours.
                let message: Option<String> = connection
                    .lmove(
                        consumer_processing_list_key(&consumer),
                        consumer_processing_list_key(consumer_name),
                        Direction::Right,
                        Direction::Left,
                    )
                    .await
                    .context(DEAD_MESSAGE)?;
                if message.is_some() {
                    // We've successfully stolen a message; it's now in our
                    // processing list, so loop back around and return it.
                    continue 'retry;
                }
            }
        }

        // No consumers to steal from, and no items in our queue, so
        // remove from the main queue and add to our processing list.
        let _: Option<String> = connection
            .blmove(
                TASK_QUEUE_KEY,
                consumer_processing_list_key(consumer_name),
                Direction::Right,
                Direction::Left,
                timeout,
            )
            .await
            .context("failed to pop message from queue!")?;
    }
}

fn process(message: serde_json::Value) -> Result<(), anyhow::Error> {
    // Do something with the message.
    println!("processing message: {:?}", message);
    Ok(())
}
```
Packaging this up into a library, adding quality of life features, retries, and the liveness check are all left as exercises for the reader. I also make no guarantees that the above code is bug-free, but it should be a good starting point - it should not be used in production, especially not as-is.
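For the liveness piece, the heartbeat that the comments above leave as an exercise can be as small as a background task that refreshes the key before it expires. A rough sketch, assuming the `consumer_liveness_key` helper and the 30-minute expiration from the consumer code above (refreshing at half the interval is just one reasonable choice):

```rust
use std::time::Duration;

use anyhow::Context;
use redis::AsyncCommands;

// Refresh the liveness key at half its expiration interval, so it never
// lapses while this consumer is actually alive. Spawn this alongside the
// main consumer loop (e.g. with tokio::spawn).
async fn heartbeat(
    client: redis::Client,
    consumer_name: String,
) -> Result<(), anyhow::Error> {
    let mut connection = client
        .get_async_connection()
        .await
        .context("failed to open heartbeat connection!")?;
    let mut ticker = tokio::time::interval(Duration::from_secs(900)); // half of 30 minutes

    loop {
        ticker.tick().await;
        // EXPIRE resets the TTL and returns false if the key no longer exists.
        let refreshed: bool = connection
            .expire(consumer_liveness_key(&consumer_name), 1800)
            .await
            .context("failed to refresh liveness key!")?;
        if !refreshed {
            // The key somehow expired anyway; re-create it.
            let _: () = connection
                .set_ex(consumer_liveness_key(&consumer_name), "1", 1800)
                .await
                .context("failed to recreate liveness key!")?;
        }
    }
}
```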
It’s a shame that Redis doesn’t have a built-in for this, though. Really seems like one of those things they’d’ve added at some point, instead of having to abuse the list commands like this.
Redis’s Built-In For This
In Redis 5.0.0, the `XADD` command was added. This command is used with a new data type introduced in 5.0 called a stream. Streams are an append-only data structure where each entry has an ID and a set of field-value pairs, along with some stateful bookkeeping and optimization information kept around them.
Up until now, we’ve been using lists as a queue. But in reality, all we need is a data structure in which we can append items to the end, and mark items as being worked on (and by whom), or as being completed. An append-only log of tasks is a perfect fit - it’ll keep track of the order of tasks, as well as who’s working on what, what’s been completed, and what’s still pending9.
But before we can use streams, I need to introduce another concept we haven’t yet included in our model: consumer groups. So far, we’ve treated every consumer as performing the same job, doing the same thing for every task - so there’s been no need to differentiate between them. But what if we have multiple kinds of consumers, and we want every task to go to each kind? For example, we might want to gather image metadata quickly (e.g. dimensions, file size, etc.), and do more complex processing on other servers separately.
This is where consumer groups come in. A consumer group is a named group of consumers that all receive different tasks. Each consumer group essentially gets a “copy” of the queue, and each consumer in the group is assigned a task from that queue. This means that each task is only processed once within a consumer group, but can be processed multiple times across consumer groups.
A version of our system from above with consumer groups might look something like this:
Note that in this variant, the producer has to push into the general queues for both consumer groups. We could offload that responsibility onto consumers - before polling, consumers could check a shared queue and move all items in that queue into both consumer group queues - but that’s a lot of duplication and extra work in general10. Redis Streams, however, has this concept built in at its core, so it’s completely free - no extra work required.
So, let’s take a look at how this would be implemented with Redis Streams. Again, streams are an append-only log - and the reason we previously needed one queue per consumer group is that we were popping items off of lists to claim them. With a stream, we instead keep track of our position in the stream and move that position around - when we claim a new item, we move the position forward. Thus, each consumer group is just a pointer into the stream, and consumers are just moving that pointer around.
However, within a consumer group, we still need to keep track of which tasks are pending. We can’t just use the stream itself, because we would need to be able to update the tasks with consumer information - and that would conflict across consumer groups. So each consumer group has a “Pending Entries List,” which is a structure that contains just that information - the ID of the task, the consumer that’s working on it, and the number of times it was delivered.
To remove a task from the pending entries list, we can use the `XACK` command, which effectively marks it as complete. Redis doesn’t offer a version of `NACK`, or negative acknowledgement, which would mark a task as incomplete and put it back onto the effective queue; but it does allow recovery in case of a failure. There is a way to emulate `NACK` using `XCLAIM`, but I’ll explain that later.
To pull a task from the stream, for our use-case, we should use `XREADGROUP`. We’ll ignore `XREAD`, which is the non-grouped version, because we want to ensure that each task is only processed once within a consumer group. The command takes the group we’re in, and the consumer name, and returns the next n tasks from the streams specified. But you’ll notice that it takes an ID argument. For `XREADGROUP`, the ID we’ll care about here is `>` - this is a special ID to Redis, which means “messages that were never delivered to any other consumer in the group.” This is how we can ensure that each task is only processed once within a consumer group11.
But again, we want to reclaim tasks that have been abandoned by a consumer. Redis 5.0 includes the `XPENDING` and `XCLAIM` commands; before using `XREADGROUP`, we can use `XPENDING` to get a list of tasks that have been idle for a certain amount of time, and then use `XCLAIM` to claim them. `XCLAIM` allows us to specify the same idle timeout, so if we would claim a task that another consumer has already claimed (and reset the timeout for), we can ignore it. In addition, `XCLAIM` allows us to update the idle time (and retry count)12, so we can use this to emulate `NACK` - we can just set the idle time of the message to be past the idle timeout, and it’ll be picked up the next time someone calls `XPENDING`.
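As a sketch of what that `NACK` emulation could look like (the function and parameter names here are mine, not a Redis or redis-rs API; the inflated idle value just has to exceed whatever reclaim threshold your consumers use):

```rust
use anyhow::Context;

// Emulate a NACK: claim the entry back to ourselves, but report an inflated
// idle time, so the next XPENDING/XAUTOCLAIM pass sees it as abandoned and
// hands it to whichever consumer asks first.
async fn nack(
    connection: &mut redis::aio::Connection,
    stream: &str,
    group: &str,
    consumer: &str,
    entry_id: &str,
    reclaim_threshold_ms: usize,
) -> Result<(), anyhow::Error> {
    // JUSTID keeps the delivery counter untouched and skips returning the body.
    let _claimed: Vec<String> = redis::cmd("XCLAIM")
        .arg(stream)
        .arg(group)
        .arg(consumer)
        .arg(0) // min-idle-time of 0: claim it regardless of how fresh it is
        .arg(entry_id)
        .arg("IDLE")
        .arg(reclaim_threshold_ms * 2) // pretend it has sat idle past the threshold
        .arg("JUSTID")
        .query_async(connection)
        .await
        .context("failed to NACK message via XCLAIM!")?;
    Ok(())
}
```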
Redis 6.2 also includes the `XAUTOCLAIM` command, which is a combination of `XPENDING` and `XCLAIM` - you specify the group, the consumer, the minimum idle time, the starting ID (`0-0` is a good choice here), and the maximum number of tasks to claim, and it’ll do the rest. The return value is similar to `XCLAIM`’s. There is one key difference, though - `XAUTOCLAIM` returns only the next ID to pass to `XAUTOCLAIM` on the next call, and the items claimed (as well as message IDs that no longer exist, but we don’t need those). `XCLAIM` returns about the same information. However… `XPENDING` returns more. It returns the idle time of the message, but more importantly, it returns the number of times the message has been delivered. This is important, because we want to track that, so we can push messages into a dead letter queue when it exceeds a threshold. So, we’ll need to use `XPENDING` either way to get the number of deliveries.
So, this is what it would look like with redis streams:
That’s pretty neat! Now, let’s take a look at how we’d implement this. The code for the producer wouldn’t change much, as we’d just need to `XADD` to the stream instead of `LPUSH` to the list:
```rust
use anyhow::Context;
use redis::{AsyncCommands, Client};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    producer().await?;
    consumer().await?;
    Ok(())
}

async fn producer() -> Result<(), anyhow::Error> {
    // This will only fail if the url is badly formatted.
    let client = Client::open("redis://localhost:6379/11")?;
    let mut connection = client
        .get_async_connection()
        .await
        .context("failed to open connection to redis server!")?;

    let id = uuid::Uuid::new_v4().to_string();
    let message = [("id", id.as_str()), ("payload", "{}")];

    // XADD returns the generated entry ID; we don't need it here.
    let _: String = connection
        .xadd("stream:main", "*", &message)
        .await
        .context("failed to push message to queue!")?;

    Ok(())
}
```
The consumer, however, would change quite a bit.
1. We need to create the consumer group the consumer is in, if it doesn’t exist. This is done with the `XGROUP CREATE` command. Note that we must specify the ID where the consumer group’s position in the stream starts - we’ll use `0-0` here, which means “the first message in the stream.” This is because we want to process all messages in the stream, and not just new ones. (If the group already exists, it’ll return an error, so we’ll need to make sure to handle that.)
2. We need to create the consumer within the consumer group. This can be implicit with `XREADGROUP` - if the consumer’s name doesn’t exist, it’ll be created for us. We could use `XGROUP CREATECONSUMER` as well.
3. Then, we’ll start by pulling all of the consumer’s pending messages, in case we had crashed and need to recover our tasks. We’ll use `XREADGROUP` for this - if we specify `0` for the ID (rather than `>`), it’ll read our pending entries list and hand back every message that was delivered to us but never acknowledged. This has the side effect of incrementing the number of deliveries for each message, and resetting the idle time.
4. If we don’t have any pending messages, we’ll use `XAUTOCLAIM` to claim messages that have been idle for a certain amount of time.
5. If we get any messages from either of these, we’ll check their delivery count with `XPENDING` - if it’s over a certain threshold, we’ll push the message to a dead letter queue and `XACK` it. Otherwise, we’ll process it, and `XACK` it.
6. Finally, if there are no pending messages, and no idle messages, we will use `XREADGROUP` with `>` to pull the next message from the stream, blocking for a few seconds. If we get a message, we’ll process it, and `XACK` it.
7. Go to (3).
This might look something like this:
```rust
use std::collections::HashMap;

use anyhow::Context;
use redis::aio::Connection;
use redis::streams::{
    StreamId,
    StreamPendingCountReply,
    StreamReadOptions,
    StreamReadReply,
};
use redis::{cmd, AsyncCommands, Client, FromRedisValue};

static STREAM_NAME: &str = "stream:main";
// Left as an exercise to make this configurable.
static GROUP_NAME: &str = "group-a";
// Ditto.
static CONSUMER_NAME: &str = "consumer-1";
// Idle time in milliseconds before a message can be reclaimed.
static CONSUMER_IDLE_TIME_MS: usize = 10000;
// Maximum number of times a message can be retried before being sent
// to the dead letter queue.
static MAX_RETRIES: usize = 3;
// Name of the dead letter queue.
static DEAD_LETTER_QUEUE: &str = "queue:main:dead-letter";

async fn consumer() -> Result<(), anyhow::Error> {
    // This will only fail if the url is badly formatted.
    let client = Client::open("redis://localhost:6379/11")?;
    let mut connection = client
        .get_async_connection()
        .await
        .context("failed to open connection to redis server!")?;

    // Create the consumer group if it doesn't exist. We'll ignore the
    // error if it already exists.
    connection
        .xgroup_create_mkstream(STREAM_NAME, GROUP_NAME, "0-0")
        .await
        .or_else(|e| {
            if let Some("BUSYGROUP") = e.code() {
                Ok(())
            } else {
                Err(e)
            }
        })?;

    // Recover anything that was already delivered to us but never
    // acknowledged: an ID of "0" reads from our own pending entries list.
    let mut pending_messages: StreamReadReply = connection
        .xread_options(
            &[STREAM_NAME],
            &["0"],
            &StreamReadOptions::default()
                .group(GROUP_NAME, CONSUMER_NAME)
                .count(10),
        )
        .await?;

    for message in pending_messages.keys.pop().unwrap().ids {
        let StreamId { id, map } = message;
        process(&mut connection, id, map).await?;
    }

    let mut start_id = "0-0".to_string();

    loop {
        type RangeReply = Vec<(String, HashMap<String, redis::Value>)>;
        type AutoClaimReply = (String, RangeReply, Vec<String>);

        // The redis library I am using does not support XAUTOCLAIM; it
        // makes sense as XAUTOCLAIM is only available after 6.2, and the
        // library may not support newer commands. So, we'll use the
        // generic `cmd` function to send the command. We don't pass JUSTID,
        // because we want the full entries (ID plus fields) back.
        let idle: AutoClaimReply = cmd("XAUTOCLAIM")
            .arg(STREAM_NAME)
            .arg(GROUP_NAME)
            .arg(CONSUMER_NAME)
            .arg(CONSUMER_IDLE_TIME_MS)
            .arg(&start_id)
            .arg("COUNT")
            .arg(10)
            .query_async(&mut connection)
            .await?;

        start_id = idle.0;
        for (id, map) in idle.1 {
            process(&mut connection, id, map).await?;
        }
        if start_id == "0-0" {
            break;
        }
    }

    let mut messages: StreamReadReply = connection
        .xread_options(
            &[STREAM_NAME],
            &[">"],
            &StreamReadOptions::default()
                .group(GROUP_NAME, CONSUMER_NAME)
                .block(CONSUMER_IDLE_TIME_MS)
                .count(1),
        )
        .await?;

    // If the blocking read timed out without a new message, there's nothing
    // to do this round.
    if let Some(key) = messages.keys.pop() {
        for message in key.ids {
            let StreamId { id, map } = message;
            process(&mut connection, id, map).await?;
        }
    }

    Ok(())
}

async fn process(
    connection: &mut Connection,
    id: String,
    message: HashMap<String, redis::Value>,
) -> Result<(), anyhow::Error> {
    let pending_info: StreamPendingCountReply = connection
        .xpending_count(STREAM_NAME, GROUP_NAME, &id, &id, 1)
        .await?;
    let pending = pending_info
        .ids
        .into_iter()
        .find(|i| i.id == id)
        .context("failed to find pending id!?")?;

    let inner_id = message
        .get("id")
        .and_then(|v| String::from_redis_value(v).ok())
        .context("missing inner message id")?;
    let payload = message
        .get("payload")
        .and_then(|v| String::from_redis_value(v).ok())
        .context("missing inner message payload")?;

    if pending.times_delivered >= MAX_RETRIES {
        let _: String = connection
            .xadd(
                DEAD_LETTER_QUEUE,
                "*",
                &[("id", inner_id), ("payload", payload)],
            )
            .await?;
        let _: () = connection.xack(STREAM_NAME, GROUP_NAME, &[&id]).await?;
    } else {
        // Do something with the message.
        println!("processing message: {:?}", message);
        let _: () = connection.xack(STREAM_NAME, GROUP_NAME, &[&id]).await?;
    }

    Ok(())
}
```
`consumer` should be called in a loop, so that it will continue to process messages. This is, of course, far more complicated than the previous example. However, with more work, it can be simplified quite a bit - we don’t need to constantly pull our own pending messages except on start-up, for one. We can also use `XPENDING` more intelligently than we do here, as we can grab all of our own pending message information at once, instead of one at a time. Thus, the number of round-trips to Redis, on average, for every message that can be pulled, is far less than in the previous example - and it is optimized so that we can pull in multiple messages at once, and process them in bulk, without losing too much, as long as all messages can be processed within the idle time13.
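For instance, a single ranged `XPENDING` call can return the delivery counts for a whole batch of our pending entries in one round trip; a rough sketch, reusing the constants from the consumer code above (the count of 100 is arbitrary):

```rust
use std::collections::HashMap;

use redis::streams::StreamPendingCountReply;
use redis::AsyncCommands;

// Fetch delivery counts for up to 100 of our pending entries in one round
// trip, instead of calling XPENDING once per message inside process().
async fn pending_delivery_counts(
    connection: &mut redis::aio::Connection,
) -> Result<HashMap<String, usize>, anyhow::Error> {
    let reply: StreamPendingCountReply = connection
        .xpending_count(STREAM_NAME, GROUP_NAME, "-", "+", 100)
        .await?;
    Ok(reply
        .ids
        .into_iter()
        // XPENDING over a range covers the whole group, so keep only our own.
        .filter(|entry| entry.consumer == CONSUMER_NAME)
        .map(|entry| (entry.id, entry.times_delivered))
        .collect())
}
```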
This will also reduce the storage requirements on the server for this process. When adding a message to the stream, you can specify the maximum number of entries to keep in the stream - if you don’t specify this, it will keep all messages forever. If you specify that the max length of the queue is 1000, then the server will only keep the last 1000 messages in the stream14.
Trimming is optional. But considering that you only need to keep the active messages in the stream, i.e. messages that are being processed or will be processed, you can automatically clear out any messages that have been processed15.
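With the redis crate used above, that trimming can ride along with the producer’s `XADD`; a sketch (the cap of 1000 is arbitrary):

```rust
use redis::streams::StreamMaxlen;
use redis::AsyncCommands;

// Same as the producer's XADD, but with MAXLEN ~ 1000, so Redis trims old,
// already-processed entries as new ones arrive.
async fn push_with_trim(
    connection: &mut redis::aio::Connection,
    id: &str,
) -> Result<(), anyhow::Error> {
    let _: String = connection
        .xadd_maxlen(
            "stream:main",
            StreamMaxlen::Approx(1000),
            "*",
            &[("id", id), ("payload", "{}")],
        )
        .await?;
    Ok(())
}
```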
Beyond Redis
Redis isn’t always the best tool for scalability. Redis works best on a single server instance, where linearizability comes easily; unfortunately, when you try to scale it horizontally, it does not maintain any level of consistency that we would want for such a task system:
So if you want to use a distributed Redis system for your task queue, you will need to do your own research in this regard. I’m not an expert in distributed database systems (yet), but for a task queue where you don’t want to lose data, you will want at least sequential consistency, if not full serializability.
So, when you hit a certain scale, you are going to need to go beyond what Redis can offer you. And thankfully, there are systems that can offer you this consistency, and still be fast enough to handle your task queue16.
Kafka
Kafka is a distributed streaming platform. It is a distributed commit log, where you can write messages to a topic, and then read messages from that topic in the order that they were written. It is astonishingly similar to Redis’s Streams - though there are definitely big differences, so make sure you adjust your mental model accordingly - but it is also a lot more powerful, more scalable, and offers more throughput. Accordingly, it is also more complex.
See your doctor engineer to see if Kafka is right for you.
RabbitMQ
RabbitMQ is a message broker. It is fundamentally different from Kafka and Redis Streams, and is more similar to a combination of Redis’s Pub/Sub and lists. You define the structure of your queues, exchanges, and consumers, and RabbitMQ will route them accordingly. RabbitMQ is a lot more focused on message passing rather than event streaming, which might make sense for our workload here.
RabbitMQ can be configured to be in a cluster, but queues are not replicated across the cluster by default - instead, you must use a custom queue type to do so. Currently, RabbitMQ’s documentation states the following:
PostgreSQL
PostgreSQL is a relational database.
You could, theoretically, implement a lot of these things within PostgreSQL. There’s even extensions for this.
If you do want to go down this path, `FOR UPDATE SKIP LOCKED` will likely be your friend; but you will need to be careful about how you structure your queries, how you handle failures, and how you handle deadlocks.
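As a sketch of the kind of claim query that approach tends to revolve around (the tasks table and its columns here are hypothetical):

```rust
// Each worker runs this in its own transaction: the inner SELECT picks one
// unclaimed row, SKIP LOCKED ignores rows other workers currently hold locks
// on, and the UPDATE claims it atomically.
const CLAIM_ONE_TASK_SQL: &str = r#"
    UPDATE tasks
       SET claimed_by = $1,
           claimed_at = now(),
           attempts   = attempts + 1
     WHERE id = (
             SELECT id
               FROM tasks
              WHERE completed_at IS NULL
                AND claimed_at IS NULL
              ORDER BY id
                FOR UPDATE SKIP LOCKED
              LIMIT 1
           )
    RETURNING id, payload;
"#;
```

Reaping rows whose claim has gone stale (the equivalent of the liveness timeout above) is where most of the fiddly extra work ends up.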
I’ve tried implementing this, but I’ve found that it’s a lot of work to get right, let alone to get something that is nearly as performant as just using Redis17.
Discussion
This was a deep dive I had to take to understand the complexities of “just getting a task queue to work.” I hope you’re able to learn from my mistakes, and that you’re able to make a more informed decision about what you want to use for your task queue.
There is no shame in not reinventing the wheel. These systems are complex, for good reasons, and people have spent a lot of time and effort to make them reliable and performant. If you can use them, you should! But every system is geared for a specific purpose - and the context of what you’re trying to solve will determine what you should use. Even if you shouldn’t reinvent the wheel, fitting a square peg into a round hole is still a bad idea. I hope this article has helped you understand more about the problem in depth!
Good luck, and feel free to let me know if I’ve left something out!
-
This is a difficult invariant to prove, but it is essential. If, for example, you wanted to do post-processing of an upload (e.g. for a user’s avatar), if the task was never put on the queue, the avatar would never be processed, and the user would be left with a broken avatar. This is not only a bad user experience, but it leaves inconsistent state in the application - the file is uploaded somewhere, and has to be there for the task to eventually process it, but the task was never put on the queue, so the file is just sitting there, taking up space, when it can’t even be seen by the user. If you can’t guarantee that the task will be put on the queue, then you should have some periodic job (does that put a task on the queue?) that checks the state of the database to ensure e.g. avatar entries aren’t left in this inconsistent state for too long. ↩
-
There is a problem involving the state of the task here. Going back to the avatar example, when the user uploads a new avatar before the first one is done processing, both uploads are now in the queue. If we transfer all state about the avatar and user in the task, the consumer will never know that the first avatar was replaced, and will process both avatars. This is wasted work at best, but at worst, it can cause inconsistency issues in the database. So even if you transfer the entire snapshot of the state of the resource with the task, you still need to be able to pull the current state of the resource from the database, in the consumer, and compare it to the expected state of the resource in the task. If they differ, you can do something different - in this avatar example, you can clean up the files from the first avatar, as it’s no longer ever going to be used. ↩
-
It is similarly prudent to add some sort of delay to the retry. If there is a transient fault, and the consumer is unable to connect to the database, it makes sense to slow the retry so that the database has time to recover. If it is not a transient fault with e.g. the database, then it makes sense to reduce the number of tasks that are falsely flagged as failed and discarded. Similarly, if a consumer finds that it keeps failing tasks, it might make sense to kill the consumer, as it is likely in a bad state - it should not be failing to consume tasks. ↩
-
Even if you have the most hardened kernel, and a proofed application, a stray cosmic ray can still cause a bit to flip, and cause a process to randomly die. It’s not likely, but it’s possible. Best to be sure, just in case. ↩
-
It sorta feels like one of those problems that reduces to the halting problem at its core - and if you could solve that, you’d be a very rich person. ↩
-
This is actually false. The consumer-specific queue should contain items that the consumer has claimed, but not necessarily items that the consumer is currently working on. This is because the consumer might have claimed an item, but not started working on it yet. This is an optimization, though - consumers might pull multiple items at once to increase throughput of processing items. But considering we’re expecting each task here to take seconds, if not minutes, to complete, it’s not necessary to optimize for a couple milliseconds of network latency. ↩
-
Assuming multiple consumers are running at the same time, then Redis will have to decide which consumer gets a new item first. While redis does resolve this with FIFO semantics, the distribution of which consumers get what when, and how “fair” that algorithm is, is something to keep in mind, especially as you scale or get more complex systems. ↩
-
The whole algorithm can’t be done in a simple Lua script, as according to the Redis documentation, `EVAL` scripts “should only access keys whose names are given as input arguments. Scripts should never access keys with programmatically-generated names or based on the contents of data structures stored in the database.” This means that step (2) will have to be done in at least two parts; one to get the list of consumers, and one to get the tasks from each consumer’s queue. Step (3) can be done in one script, though. ↩
-
Streams have a ton more uses, too, and give us so much flexibility in seeing into the system; for example, we can inspect a stream at any point in time to see what tasks have been processed and what will be processed, assuming we haven’t trimmed the stream. ↩
-
It’s also not atomic, so we’d have to do this in a script. And for a very simple use-case where you have one type of consumer, it didn’t make sense to add this as a feature. ↩
-
There are other special IDs, like `$`, which means “the last message in the stream,” and `0`, which means “the first message in the stream.” Note that using these IDs with `XREADGROUP` only reads from the pending entries list, and not the stream itself; so this is a good way to recover from a failure. ↩
-
Though… apparently, these options are “mainly for internal use in order to transfer the effects of XCLAIM or other commands to the AOF file and to propagate the same effects to the replicas,” and “are unlikely to be useful to normal users.” So, uh, use at your own risk, I guess? ↩
-
We can again abuse `XCLAIM` (it’s getting a lot of abuse in this article) to extend the idle time of a message, if we need to - if we specify `JUSTID`, it won’t increment the attempted deliveries count, and will only update the idle time - and only return the ID of the message. We don’t need the body of the message anyway, so this works! There is one caveat - if the message hit the idle timeout, and was claimed by another consumer, this will silently steal it back, and now both consumers will be processing it. This process should only be done if you are certain that the message is still far within the idle threshold. ↩
-
It even offers an “approximate” trimming (because of the way streams are stored internally) - if you specify a max length of “approximately” 1000, it will trim the stream when it reaches 1000 messages, but it may include a few more messages than that. That’s probably fine for any use case of this. It also allows `MINID` selection - saying that the stream should only keep messages with an ID greater than a certain ID. ↩
-
`XPENDING` allows you to retrieve information about what messages are currently pending, including the smallest and largest IDs of pending messages. If you get the smallest ID across all consumer groups, then no messages before that smallest ID are pending, and they can be safely removed from the stream using `XTRIM`’s `MINID` strategy, which “Evicts entries with IDs lower than `threshold`, where `threshold` is a stream ID.” ↩
-
What kind of small web application are you building where you’ve out-grown Redis? Should I be concerned? ↩
-
I wouldn’t be surprised if it were just a skill issue. ↩