
Things I Wish I Knew Before I Tried Implementing a Task Queue

Let’s say you’re writing a web application. You have a task that you want to run in the background. Maybe it takes a long time to complete, maybe you want it to run at a later time, or maybe you just want to process it in aggregate somewhere else. Either way, you don’t want to block the user’s request while the task is running.

This is a very common problem. In fact, it’s so common that there are a number of tools and services to help you solve it. ActiveJob is built into Ruby on Rails. Celery is a popular solution for Python. Sidekiq is a popular one for Ruby. If you don’t mind vendor lock-in, AWS’s SNS paired with SQS is a great solution, as is Google Cloud’s Pub/Sub.

But my application is small. I don’t need anything too fancy for what I’m doing; I just need something reliable that can handle a few hundred tasks a day. I don’t want to pay for a service, and I don’t want to have to manage making everything Just Right™. Surely, creating a task queue from scratch wouldn’t be hard, right? I could just use a Redis server to store the tasks, and a worker process to pull them off the queue and process them. How hard could it be?

The Basic Framework

Let’s start with the basics. We’ll need a way to add tasks to the queue, and a way to process them. We’ll also need a way to store the tasks. We’ll choose Redis here, as Redis is easy to set up, and has a number of data structures that are useful for this task.

A diagram depicting a basic task queue. On the left, there is a box labeled 'producers', in which producer 1 pushes into a set of tasks, t3, t2, t1, in a box labeled queue in the center; then, that gets pulled into consumer 1, in a box labeled consumers, on the right.

We have some number of producers that generate tasks and push them into this queue. These producers would be the front end of our application; ideally, once we’ve pushed a task into the queue and received confirmation that the queue has indeed received it, we should be confident that we won’t lose that task [1], and we can then return a response to the user.

Tasks should be processed in the order they are received, as oftentimes later tasks depend on earlier tasks having been completed. For example, if you allow users to upload avatars to your application, you might want to process those avatars before they are displayed to the user - but what if the user changes their avatar again before the first one is processed? You don’t want to briefly show the second avatar (because it happened to be processed first) and then end up displaying the first one (because it was processed second), so for some applications, ordering can be very important.

Finally, you have the consumers, which pull tasks off the queue and process them. These would be servers that are running in the background, and are responsible for actually doing the work. They take a task off of the queue, process it (for the avatar example, they might want to check the dimensions, check that it is actually an image, resize it, virus check it, all sorts of things), and save the results.

This is a good start. In Redis, all you’d need is a list - LPUSH items onto it on the producer end, and BRPOP them off on the consumer end. You can even use any version of Redis from 2.0 onward, since BRPOP is the most restrictive command here, being available in 2.0.0 and on. Simple. Nothing can go wrong.
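In fact, the whole naive version fits in one small program. Here’s a minimal sketch (the producer and consumer sides are squashed together, the key name is arbitrary, and it uses the same redis and tokio crates as the later examples):

use anyhow::Context;
use redis::AsyncCommands;

static TASK_QUEUE_KEY: &str = "queue:naive:tasks";

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let client = redis::Client::open("redis://localhost:6379/10")?;
    let mut connection = client
        .get_async_connection()
        .await
        .context("failed to open connection to redis server!")?;

    // Producer side: push a task onto the head (left) of the list.
    let _: () = connection
        .lpush(TASK_QUEUE_KEY, "resize-avatar:42")
        .await
        .context("failed to push task!")?;

    // Consumer side: block until a task is available at the tail (right),
    // waiting at most 5 seconds.  BRPOP returns the key and the value.
    let task: Option<(String, String)> = connection
        .brpop(TASK_QUEUE_KEY, 5)
        .await
        .context("failed to pop task!")?;
    println!("got task: {:?}", task);

    Ok(())
}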

Well…

What Can Go Wrong?

It is often prudent to take a look at a system to see what aspects of it can fail. This very simplistic model assumes two major things:

  1. The consumer will never fail to process a task.
  2. The consumer will never fail to acknowledge that it has processed - or failed to process - a task.

These seem very similar, but the second assumption is even more restrictive than the first. Let’s take a look at what happens if these assumptions are broken.

The Consumer Fails to Process a Task

Let’s say that while processing an avatar, a transient fault occurs - maybe there is a hiccup in the networking, and the consumer is unable to connect to the database [2]. As this connection is essential to completing the task, the consumer cannot finish the task - the task has errored. But it’s a transient fault - the database is fine, and the consumer will be able to connect to it again in a few seconds.

So we should retry the task. But we shouldn’t retry it immediately - we’re still unable to connect to the database. Assuming we don’t know the nature of the failure - while the error is happening, we can’t know it’s transient - we’ll push the task back onto the queue using RPUSH, so that another consumer, one that is hopefully able to connect to the database, can pick it up.

But what if the issue is caused by the task itself? What if the task is malformed, and the consumer is unable to process it? We can’t just keep retrying the task, as it will never succeed. We need to be able to handle errors in the task itself. So we’ll include a retry counter in the task, and if the consumer fails to process the task, it will increment the retry counter and push it back onto the queue. If the retry counter exceeds some threshold, we’ll just discard it.

But what if the database is down for a long time? We don’t want to keep discarding all tasks that are unable to connect to the database - that’s not an issue with the task, that’s an issue with the database; but the retry counter will keep incrementing, and eventually we’ll discard the task [3]. Or, maybe we want to know what keeps causing issues with the task queue, so we can fix it. We can’t do that if we keep discarding the task. So, instead of discarding tasks that exceed their retry threshold, we’ll send them to a different queue, a “dead letter queue” - a queue that is used to store tasks that have failed, and need to be looked at by a human. This way, we can keep track of which tasks are failing, and why, and we can fix the issue.
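Here’s a rough sketch of what that failure handling could look like on the consumer side - the key names, the retry limit, and the idea of carrying the retry counter as a field inside the task’s JSON are all just illustrative choices:

use anyhow::Context;
use redis::aio::Connection;
use redis::AsyncCommands;

static TASK_QUEUE_KEY: &str = "queue:naive:tasks";
static DEAD_LETTER_QUEUE_KEY: &str = "queue:naive:dead-letter";
static MAX_RETRIES: u64 = 3;

// Called when processing a task has failed: either requeue it with an
// incremented retry counter, or move it to the dead letter queue.
async fn handle_failure(
    connection: &mut Connection,
    mut task: serde_json::Value,
) -> Result<(), anyhow::Error> {
    let retries = task["retries"].as_u64().unwrap_or(0) + 1;
    task["retries"] = serde_json::json!(retries);
    let serialized =
        serde_json::to_string(&task).context("failed to serialize task!")?;

    if retries > MAX_RETRIES {
        // Too many attempts: park it for a human to look at.
        let _: () = connection
            .lpush(DEAD_LETTER_QUEUE_KEY, serialized)
            .await
            .context("failed to push task to the dead letter queue!")?;
    } else {
        // Consumers pop from the right, so RPUSH puts it back at the front
        // of the line to be retried soon.
        let _: () = connection
            .rpush(TASK_QUEUE_KEY, serialized)
            .await
            .context("failed to requeue task!")?;
    }
    Ok(())
}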

The Consumer Fails to Acknowledge a Task

Let’s take another look at the above scenario. The consumer is processing an avatar, and a fault occurs - only, instead of being a recoverable error, let’s say that the consumer process is killed randomly [4]. The task is lost to the abyss. As the process itself was the one keeping track of the task, including notifying the queue when it succeeded (or failed), if the process is unable to do so, the task is lost, never to be completed. To the queue, a task that is never acknowledged looks like a success rather than a failure - once it has been popped, it’s simply gone.

So we need to be able to handle this. We need to be able to detect when a consumer has died, and requeue the task. But how do we know when a consumer has died? This is a hard problem [5]. We can’t just check if the process is running, as it might be just spinning its wheels, doing nothing. Or it could just be working away at the task, invisible to the outside world. For the sake of reducing the complexity, we’ll put a timeout on how long it takes to process a task. If a consumer takes longer than n seconds to process a task, we’ll assume it has died, and requeue the task.

Of course, we can’t rely on the consumer to tell us how long it’s taking to process a task - if it hard crashed, it won’t be able to tell us anything. So the queue system now needs to keep track of the consumer, and how long it’s taking to process a task. This is a bit of a problem, as the queue system doesn’t know anything about the consumer, the way we’ve set it up - it just has a list of items, maybe two, and that’s it. It doesn’t know who the consumers are, let alone which consumers have which task, let alone how long it’s taking them to process the task. So we need to add some sort of registration system to the queue, so that consumers can register themselves with the queue, and the queue can keep track of them.

Thus, for the sake of reliability, LPUSH and BRPOP are not enough.

Take Two

This time, we’ll try to resolve the issues we ran into above. We’ll start with the same basic setup - a producer pushes into a queue, and a consumer pops from the queue. But this time, we’ll add a few more things.

Similar to the previous image. The producers on the left are pushing into the queue; specifically, the 'general queue' under the queue label. The general queue is pushing into 3 consumer-specific queues underneath it, with varying numbers of items in each. Below that is an empty queue labeled 'dead letter queue'. The consumer-specific queues are pushing into 3 separate consumers under the 'consumer' box on the right; one for each.

The same general outline is here. We’ve got the producers pushing into the queue, and consumers pulling items from the queue. But this time, we’ve got a few more things going on. We’ll focus on the queue system, as that’s where most of the changes are.

Instead of having one fixed queue for all consumers to pull from, we have one “general queue,” into which every new item is pushed. Then, we have a number of consumer-specific queues, one for each consumer. When a consumer is ready to consume a task, it pops from its consumer-specific queue; if that queue is empty, it pulls from the general queue. This way, we can have multiple consumers pulling from the same queue. The consumer-specific queue should only contain items that the consumer is currently working on, not items that the consumer will work on [6]. This is important. At the same time, each consumer registers its presence with the queue system, so that it knows which consumers are currently active, and which consumers are currently working on which tasks.

If a consumer dies, the items in its queue can be released back into the general queue, and another consumer can pick them up. This way, we can detect when a consumer has died, and requeue the items that it was working on, preventing tasks from being lost. Note, however, that this necessarily requires that tasks can be processed more than once - if a consumer fails just before it would have acknowledged the task, and the task is requeued, then the task will be processed twice. This is a tradeoff that we have to make, in order to ensure that tasks are not processed zero times.

If a task fails on a consumer, the task can be pushed back into the general queue and have its retry counter incremented. This way, the task can be retried on a potentially different consumer, and the consumer that failed can continue processing other tasks. If it fails too many times, it can be pushed into a “dead letter queue,” where it can be inspected by a human, and the task can be retried manually.

Interestingly, this can still be implemented with plain Redis lists. The producer can LPUSH into the general queue. When a consumer starts, it must set a key specific to that consumer and queue (e.g. queue:name:consumer:1:active), with the expiration set to the liveness timeout (e.g. 30 minutes; the consumer must then take care to “touch” the key every half of that interval, to ensure that it doesn’t expire while the consumer is alive), and add itself to the list of consumers for that queue. Then, when a consumer is ready to consume a task, it can do the following, in order:

  1. Check its own queue for a task. If a consumer crashed while processing a task, and is able to come back up within the liveness timeout, the task will still be in its queue, and the consumer can pick it up again. It might make sense to increment the retry counter on the task here, as it’s possible that the consumer crashed because of a bug in the task; this is an implementation detail, though.
  2. Check all other consumers’ queues for tasks. As all consumers add their information to the queue system, we know which consumers have been active; we can then filter down to those that are no longer active, and steal their tasks, assigning them to ourselves. See (3) on how to move a task from one queue to another.
  3. If we’re unable to get a task any other way, then we can try pulling from the general queue. In version 6.2.0, Redis added the BLMOVE command - it takes an item from one list and moves it to another, blocking for up to the given timeout if the source list is empty. (It also allows you to specify which end to pop from and which end to push to, which is nice.) Before that, there was BRPOPLPUSH (available since 2.2.0); the documentation for both commands describes how to create a reliable queue system in Redis, the same way we’re doing it! These commands are atomic - popping an item off the general queue and pushing it onto the consumer-specific queue is done in one operation, ensuring that we’re the only consumer that gets the task [7].

To be fair, this isn’t incredibly efficient - there are multiple round trips here, and the number of round trips (if not careful) can potentially scale with the number of consumers [8]. But it’s a basic system that can be implemented pretty easily.

So, a producer’s code might look like this:

use std::collections::HashSet;

use anyhow::Context;
use redis::aio::Connection;
use redis::{AsyncCommands, Client, Direction};

static TASK_QUEUE_KEY: &str = "queue:main:tasks";
static CONSUMER_SET_KEY: &str = "queue:main:consumers";

fn consumer_liveness_key(name: &str) -> String {
    format!("queue:main:consumers:active:{}", name)
}

fn consumer_processing_list_key(name: &str) -> String {
    format!("queue:main:consumers:list:{}", name)
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // This will only fail if the url is badly formatted.
    let client = Client::open("redis://localhost:6379/10")?;
    let mut connection = client
        .get_async_connection()
        .await
        .context("failed to open connection to redis server!")?;
    let message = serde_json::to_string(&serde_json::json!({
        "id": uuid::Uuid::new_v4(),
        "payload": {}
    }))
    .context("failed to serialize message!")?;

    // Consumers pop from the right, so pushing onto the left keeps the
    // queue in FIFO order.
    let _: () = connection
        .lpush(TASK_QUEUE_KEY, message)
        .await
        .context("failed to push message to queue!")?;

    Ok(())
}

And a consumer’s code might look like this:

use std::collections::HashSet;

use anyhow::Context;
use redis::aio::Connection;
use redis::{AsyncCommands, Client, Direction};

static TASK_QUEUE_KEY: &str = "queue:main:tasks";
static CONSUMER_SET_KEY: &str = "queue:main:consumers";

fn consumer_liveness_key(name: &str) -> String {
    format!("queue:main:consumers:active:{}", name)
}

fn consumer_processing_list_key(name: &str) -> String {
    format!("queue:main:consumers:list:{}", name)
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let client = Client::open("redis://localhost:6379/10")?;
    let mut connection = client
        .get_async_connection()
        .await
        .context("failed to open connection to redis server!")?;

    // define the consumer's name.
    let consumer_name = "consumer-1";
    // define the blocking timeout for blmove in seconds.
    let blocking_timeout_secs = 5;

    // Set our liveness key to 1, with an expiration of 30 minutes.
    // Once this expires we're considered dead.
    // It is an exercise for the reader to implement a heartbeat.
    let _: () = connection
        .set_ex(
            consumer_liveness_key(consumer_name),
            "1",
            1800, // 30 minutes
        )
        .await
        .context("failed to set the liveness key!")?;

    // Now, we can add ourselves to the consumer set.
    let _: () = connection
        .sadd(CONSUMER_SET_KEY, consumer_name)
        .await
        .context("failed to add consumer to set!")?;

    loop {
        let incoming =
            next(&mut connection, consumer_name, blocking_timeout_secs)
                .await
                .context("failed to load next task!")?;

        // Pushing messages into a DLQ upon failure is an exercise
        // for the reader.
        for message in incoming {
            let task = serde_json::from_str(&message)
                .context("failed to deserialize message!")?;
            process(task).context("failed to process message!")?;
        }

        // We've finished processing, so we can remove the items in the
        // processing list.
        let _: () = connection
            .del(consumer_processing_list_key(consumer_name))
            .await?;
    }
}

async fn next(
    connection: &mut Connection,
    consumer_name: &str,
    timeout: usize,
) -> Result<Vec<String>, anyhow::Error> {
    'retry: loop {
        // Load our processing list.
        let processing_list: Vec<String> = connection
            .lrange(consumer_processing_list_key(consumer_name), 0, -1)
            .await
            .context("failed to load processing list!")?;
        // We've received an item from our processing list, so we can
        // return it.  This is where we'd normally do some sort of
        // retry counter for the message.
        if !processing_list.is_empty() {
            // This is the only return in this function to make sure
            // that whatever we return is in the processing list.
            return Ok(processing_list);
        }

        // Now, we perform "work stealing." Get the list of consumers,
        // excluding ourselves, and check if any of them are dead.
        // If so, move something to ourselves.
        let mut consumers: HashSet<String> = connection
            .smembers(CONSUMER_SET_KEY)
            .await
            .context("failed to load consumer set!")?;
        consumers.remove(consumer_name);

        for consumer in consumers {
            const LIVENESS_MESSAGE: &str =
                "failed to load external consumer's liveness key";
            // redis's exists returns a 1 if the key exists, 0 otherwise.
            let exists: u32 = connection
                .exists(consumer_liveness_key(&consumer))
                .await
                .context(LIVENESS_MESSAGE)?;
            if exists == 0 {
                const DEAD_MESSAGE: &str =
                    "failed to move message from dead consumer!";
                // The consumer is dead, so we can move something from
                // their processing list to ours.
                let message: Option<()> = connection
                    .lmove(
                        consumer_processing_list_key(&consumer),
                        consumer_processing_list_key(consumer_name),
                        redis::Direction::Right,
                        redis::Direction::Left,
                    )
                    .await
                    .context(DEAD_MESSAGE)?;
                if message.is_some() {
                    // We've successfully stolen a message; loop back
                    // around so it's picked up from our processing list.
                    continue 'retry;
                }
            }
        }

        // No consumers to steal from, and no items in our queue, so
        // remove from the main queue and add to our processing list.
        // The moved item (if any) is picked up from the processing list
        // on the next iteration.
        let _: Option<String> = connection
            .blmove(
                TASK_QUEUE_KEY,
                consumer_processing_list_key(consumer_name),
                Direction::Right,
                Direction::Left,
                timeout,
            )
            .await
            .context("failed to pop message from queue!")?;
    }
}

fn process(message: serde_json::Value) -> Result<(), anyhow::Error> {
    // Do something with the message.
    println!("processing message: {:?}", message);
    Ok(())
}

Packaging this up into a library, adding quality of life features, retries, and the liveness check are all left as exercises for the reader. I also make no guarantees that the above code is bug-free, but it should be a good starting point - it should not be used in production, especially not as-is.
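As a hint for the liveness-check exercise, the heartbeat can be as small as a loop that re-sets the key’s TTL at half the expiry interval, running on its own task with its own connection. This sketch re-declares the consumer_liveness_key helper and the 30-minute expiry from the example above so that it stands alone:

use std::time::Duration;

use redis::aio::Connection;
use redis::AsyncCommands;

fn consumer_liveness_key(name: &str) -> String {
    format!("queue:main:consumers:active:{}", name)
}

// Keep the liveness key from expiring while the consumer is healthy by
// refreshing its TTL every 15 minutes (half of the 30 minute expiry).
async fn heartbeat(
    mut connection: Connection,
    consumer_name: String,
) -> Result<(), redis::RedisError> {
    loop {
        let _: () = connection
            .expire(consumer_liveness_key(&consumer_name), 1800)
            .await?;
        tokio::time::sleep(Duration::from_secs(900)).await;
    }
}

Spawning it with tokio::spawn alongside the main loop, and tearing it down when the consumer exits, is left to taste.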

It’s a shame that Redis doesn’t have a built-in for this, though. Really seems like one of those things they’d’ve added at some point, instead of having to abuse the list commands like this.

Redis’s Built-In For This

In Redis 5.0.0, the XADD command was added, alongside a new data type: the stream. Streams are an append-only data structure in which each entry has an ID and a set of field-value pairs, with the stream itself also carrying stateful and optimization information (such as consumer-group bookkeeping) around those entries.

Up until now, we’ve been using lists as a queue. But in reality, all we need is a data structure in which we can append items to the end, and mark items as being worked on (and by whom), or as being completed. An append-only log of tasks is a perfect fit - it’ll keep track of the order of tasks, as well as who’s working on what, what’s been completed, and what’s still pending [9].

But before we can use streams, I need to introduce another concept we haven’t yet included in our model: consumer groups. We’ve been treating all consumers as performing the same job, doing the same thing for every task - so there’s no need to differentiate between them. But what if we have multiple kinds of consumers, and we want the same task to go to each of them? For example, we might want to gather image metadata quickly (e.g. dimensions, file size, etc.) on one set of servers, and do more complex processing on other servers separately.

This is where consumer groups come in. A consumer group is a named group of consumers that all receive different tasks. Each consumer group essentially gets a “copy” of the queue, and each consumer in the group is assigned a task from that queue. This means that each task is only processed once within a consumer group, but can be processed multiple times across consumer groups.

A version of our system from above with consumer groups might look something like this:

A version of the above diagram. This time, the queue box is separated into 'consumer group 1' and 'consumer group 2', which contain the same things the queue used to; producer pushes into both general queues now. The consumers box now similarly contains 'consumer group 1' and 'consumer group 2', but they draw from the respective consumer-specific queues now.

Note that in this variant, the producer has to push into both general queues, one per consumer group. We could offload that responsibility onto consumers - before polling, consumers could check a shared incoming queue, and move all items in that queue into both consumer group queues - but that’s a lot of duplication and extra work in general [10]. Redis Streams, however, has this concept built into its core, so it comes essentially for free - no extra work required.

So, let’s take a look at how this would be implemented with Redis Streams. Again, streams are an append-only log - and the reason we previously needed one queue per consumer group is that we were popping items off of lists to claim them. With a stream, instead, we can keep track of our position in the stream and move that position around - when we claim a new item, we move the position forward. Thus, each consumer group is just a pointer into the stream, and consumers are just moving that pointer around.

However, within a consumer group, we still need to keep track of which tasks are pending. We can’t just use the stream itself, because we would need to be able to update the tasks with consumer information - and that would conflict across consumer groups. So each consumer group has a “Pending Entries List,” which is a structure that contains just that information - the ID of the task, the consumer that’s working on it, and the number of times it was delivered.

To remove a task from the pending entries list, we can use the XACK command, which effectively marks it as complete. Redis doesn’t offer a version of NACK, or negative acknowledgement, which would mark a task as incomplete and put it back onto the effective queue; but it does allow recovery in case of a failure. There is a way to emulate NACK using XCLAIM, but I’ll explain that later.

To pull a task from the stream, for our use-case, we should use XREADGROUP. We’ll ignore XREAD, which is the non-grouped version, because we want to ensure that each task is only processed once within a consumer group. The command takes the group we’re in, and the consumer name, and returns the next n tasks from the streams specified. But you’ll notice that it takes an ID argument. For XREADGROUP, the ID we’ll care about here is > - this is a special ID to Redis, which means “messages that were never delivered to any other consumer in the group.” This is how we can ensure that each task is only processed once within a consumer group [11].

But again, we want to reclaim tasks that have been abandoned by their consumer. Redis 5.0 includes the XPENDING and XCLAIM commands; before using XREADGROUP, we can use XPENDING to get a list of tasks that have been idle for a certain amount of time, and then use XCLAIM to claim them. XCLAIM allows us to specify the same idle timeout, so if we would claim a task that another consumer has already claimed (and reset the timeout for), we can ignore it. In addition, XCLAIM allows us to update the idle time (and retry count) [12], so we can use this to emulate NACK - we can just set the idle time of the message to be past the idle timeout, and it’ll be picked up the next time someone calls XPENDING.
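A rough sketch of that NACK emulation (nothing later in this article depends on it, and the doubling of the idle threshold is an arbitrary choice): claim the entry back to ourselves with a min-idle-time of 0, and push its idle time past the reclaim threshold, so the next scan for stale entries picks it up.

use redis::{aio::Connection, cmd};

// "NACK" a stream entry by making it look long-idle, so another consumer's
// next XPENDING/XCLAIM (or XAUTOCLAIM) pass will reclaim it.
async fn nack(
    connection: &mut Connection,
    stream: &str,
    group: &str,
    consumer: &str,
    id: &str,
    idle_threshold_ms: usize,
) -> Result<(), redis::RedisError> {
    // A min-idle-time of 0 means we can always (re)claim the entry we hold;
    // IDLE pushes its idle time past the reclaim threshold, and JUSTID
    // avoids bumping the delivery counter.
    let _ids: Vec<String> = cmd("XCLAIM")
        .arg(stream)
        .arg(group)
        .arg(consumer)
        .arg(0) // min-idle-time
        .arg(id)
        .arg("IDLE")
        .arg(idle_threshold_ms * 2)
        .arg("JUSTID")
        .query_async(connection)
        .await?;
    Ok(())
}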

Redis 6.2 also includes the XAUTOCLAIM command, which is a combination of XPENDING and XCLAIM - you specify the group, the consumer, the minimum idle time, the starting ID (0-0 is a good choice here), and the maximum number of tasks to claim, and it’ll do the rest. The return value is similar to XCLAIM’s. There is one key difference, though - XAUTOCLAIM returns only the cursor to pass to the next XAUTOCLAIM call and the entries it claimed (as well as message IDs that no longer exist, but we don’t need those). XCLAIM returns about the same information. XPENDING, however, returns more: it includes the idle time of each message, but more importantly, the number of times the message has been delivered. That matters, because we want to track the delivery count so we can push messages into a dead letter queue when it exceeds a threshold. So, we’ll need to use XPENDING either way to get the number of deliveries.

So, this is what it would look like with Redis Streams:

A heavily modified version of the above diagram. The queue block now has a boundless stream of items at the top; two consumer group blocks within the queue point to different points in the stream. Each consumer group has a 'pending entries list' within it. The consumers point to these consumer groups.

That’s pretty neat! Now, let’s take a look at how we’d implement this. The code for the producer wouldn’t change much, as we’d just need to XADD to the stream instead of LPUSH to the list:

use anyhow::Context;
use redis::{AsyncCommands, Client};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    producer().await?;
    consumer().await?;
    Ok(())
}

async fn producer() -> Result<(), anyhow::Error> {
    // This will only fail if the url is badly formatted.
    let client = Client::open("redis://localhost:6379/11")?;
    let mut connection = client
        .get_async_connection()
        .await
        .context("failed to open connection to redis server!")?;

    let id = uuid::Uuid::new_v4().to_string();
    let message = [("id", id.as_str()), ("payload", "{}")];

    let _: () = connection
        .xadd("stream:main", "*", &message)
        .await
        .context("failed to push message to queue!")?;

    Ok(())
}

The consumer, however, would change quite a bit.

  1. We need to create the consumer group the consumer is in, if it doesn’t exist. This is done with the XGROUP CREATE command. Note that we must specify the ID of where the consumer group’s position is in the stream - we’ll use 0-0 here, which means “the first message in the stream.” This is because we want to process all messages in the stream, and not just new ones. (If the group already exists, it’ll return an error, so we’ll need to make sure to handle that.)
  2. We need to create the consumer within the consumer group. This can be implicit with XREADGROUP - if the consumer’s name doesn’t exist, it’ll create it for us. We could use XGROUP CREATECONSUMER as well.
  3. Then, we’ll start by pulling all of the consumer’s pending messages, in case we had crashed and need to recover our tasks. We’ll use XREADGROUP for this - if we specify 0 as the ID (rather than >), it’ll replay our own pending entries for us. This has the side effect of incrementing the number of deliveries for each message, and resetting the idle time.
  4. If we don’t have any pending messages, we’ll use XAUTOCLAIM to claim messages that have been idle for a certain amount of time.
  5. If we get any messages for either of these, we’ll check their delivery count with XPENDING - if it’s over a certain threshold, we’ll push it to a dead letter queue, and XACK it. Otherwise, we’ll process it, and XACK it.
  6. Finally, if there are no pending messages, and no idle messages, we will use XREADGROUP with > to pull the next message from the stream, blocking for a few seconds. If we get a message, we’ll process it, and XACK it.
  7. Go to (3).

This might look something like this:

use std::collections::HashMap;

use anyhow::Context;
use redis::aio::Connection;
use redis::streams::{
    StreamId,
    StreamPendingCountReply,
    StreamReadOptions,
    StreamReadReply
};
use redis::{cmd, AsyncCommands, Client, FromRedisValue};

static STREAM_NAME: &str = "stream:main";
// Left as an exercise to make this configurable.
static GROUP_NAME: &str = "group-a";
// Ditto.
static CONSUMER_NAME: &str = "consumer-1";
// Idle time in milliseconds before a message can be reclaimed.
static CONSUMER_IDLE_TIME_MS: usize = 10000;
// Maximum number of times a message can be retried before being sent
// to the dead letter queue.
static MAX_RETRIES: usize = 3;
// Name of the dead letter queue.
static DEAD_LETTER_QUEUE: &str = "queue:main:dead-letter";

async fn consumer() -> Result<(), anyhow::Error> {
    // This will only fail if the url is badly formatted.
    let client = Client::open("redis://localhost:6379/11")?;
    let mut connection = client
        .get_async_connection()
        .await
        .context("failed to open connection to redis server!")?;

    // Create the consumer group if it doesn't exist.  We'll ignore the
    // error if it already exists.
    connection
        .xgroup_create_mkstream(STREAM_NAME, GROUP_NAME, "0-0")
        .await
        .or_else(|e| {
            if let Some("BUSYGROUP") = e.code() {
                Ok(())
            } else {
                Err(e)
            }
        })?;

    // Replay our own pending entries first (an ID of 0 reads from our
    // pending entries list, not the stream), in case we crashed mid-task.
    let mut pending_messages: StreamReadReply = connection
        .xread_options(
            &[STREAM_NAME],
            &["0"],
            &StreamReadOptions::default()
                .group(GROUP_NAME, CONSUMER_NAME)
                .count(10),
        )
        .await?;

    if let Some(key) = pending_messages.keys.pop() {
        for message in key.ids {
            let StreamId { id, map } = message;
            process(&mut connection, id, map).await?;
        }
    }

    let mut start_id = "0-0".to_string();

    loop {
        type RangeReply = Vec<(String, HashMap<String, redis::Value>)>;
        type AutoClaimReply = (String, RangeReply, Vec<String>);

        // The redis library I am using does not support XAUTOCLAIM; it
        // makes sense as XAUTOCLAIM is only available after 6.2, and the
        // library may not support newer commands.  So, we'll use the
        // generic `cmd` function to send the command.  (No JUSTID here:
        // we need the full entries so that we can process them.)
        let idle: AutoClaimReply = cmd("XAUTOCLAIM")
            .arg(STREAM_NAME)
            .arg(GROUP_NAME)
            .arg(CONSUMER_NAME)
            .arg(CONSUMER_IDLE_TIME_MS)
            .arg(&start_id)
            .arg("COUNT")
            .arg(10)
            .query_async(&mut connection)
            .await?;

        start_id = idle.0;
        for (id, map) in idle.1 {
            process(&mut connection, id, map).await?;
        }
        if start_id == "0-0" {
            break;
        }
    }

    let mut messages: StreamReadReply = connection
        .xread_options(
            &[STREAM_NAME],
            &[">"],
            &StreamReadOptions::default()
                .group(GROUP_NAME, CONSUMER_NAME)
                .block(CONSUMER_IDLE_TIME_MS)
                .count(1),
        )
        .await?;

    if let Some(key) = messages.keys.pop() {
        for message in key.ids {
            let StreamId { id, map } = message;
            process(&mut connection, id, map).await?;
        }
    }

    Ok(())
}

async fn process(
    connection: &mut Connection,
    id: String,
    message: HashMap<String, redis::Value>,
) -> Result<(), anyhow::Error> {
    let pending_info: StreamPendingCountReply = connection
        .xpending_count(STREAM_NAME, GROUP_NAME, &id, &id, 1)
        .await?;
    let pending = pending_info
        .ids
        .into_iter()
        .find(|i| i.id == id)
        .context("failed to find pending id!?")?;

    let inner_id = message
        .get("id")
        .and_then(|v| String::from_redis_value(v).ok())
        .context("missing inner message id")?;
    let payload = message
        .get("payload")
        .and_then(|v| String::from_redis_value(v).ok())
        .context("missing inner message payload")?;

    if pending.times_delivered >= MAX_RETRIES {
        let _: () = connection
            .xadd(
                DEAD_LETTER_QUEUE,
                "*",
                &[("id", inner_id), ("payload", payload)],
            )
            .await?;
        let _: () = connection.xack(STREAM_NAME, GROUP_NAME, &[&id]).await?;
    } else {
        // Do something with the message.
        println!("processing message: {:?}", message);
        let _: () = connection.xack(STREAM_NAME, GROUP_NAME, &[&id]).await?;
    }
    Ok(())
}

The consumer should be called in a loop, so that it continues to process messages. This is, of course, far more complicated than the previous example. However, with more work, it can be simplified a lot more - we don’t need to constantly pull our own pending messages except on start-up, for one. We can also use XPENDING more intelligently than we do here, as we can grab all of our own pending message information at once, instead of one message at a time. Thus, the number of round-trips to Redis, on average, for every message that can be pulled is far less than in the previous example - and it is optimized so that we can pull in multiple messages at once, and process them in bulk, without losing too much, as long as all messages can be processed within the idle time [13].
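For instance, that “use XPENDING more intelligently” point could look like the sketch below: one call that returns the delivery counts for a whole batch of our own pending entries, issued through the generic cmd interface (the - and + arguments cover the entire range of pending IDs):

use std::collections::HashMap;

use redis::aio::Connection;
use redis::cmd;

// One XPENDING call for a batch of our own pending entries; each row is
// (entry id, consumer name, idle time in ms, delivery count).
async fn delivery_counts(
    connection: &mut Connection,
    stream: &str,
    group: &str,
    consumer: &str,
    count: usize,
) -> Result<HashMap<String, u64>, redis::RedisError> {
    let rows: Vec<(String, String, u64, u64)> = cmd("XPENDING")
        .arg(stream)
        .arg(group)
        .arg("-") // smallest possible ID
        .arg("+") // largest possible ID
        .arg(count)
        .arg(consumer)
        .query_async(connection)
        .await?;
    Ok(rows
        .into_iter()
        .map(|(id, _consumer, _idle_ms, delivered)| (id, delivered))
        .collect())
}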

Trimming the stream will also reduce the storage requirements on the server for this process. When adding a message to the stream, you can specify the maximum number of entries to keep in the stream - if you don’t specify this, it will keep all messages forever. If you specify that the max length of the queue is 1000, then the server will only keep the last 1000 messages in the stream [14].

Trimming is optional. But considering that you only need to keep the active messages in the stream, i.e. messages that are being processed or will be processed, you can automatically clear out any messages that have been processed [15].
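A sketch of both trimming options, via the generic cmd interface (the stream name matches the earlier examples; XTRIM’s MINID option requires Redis 6.2):

use redis::{aio::Connection, cmd};

// Cap the stream at roughly 1000 entries while adding a task.  The `~` makes
// the trim approximate, which is cheaper for Redis to maintain.
async fn add_capped(
    connection: &mut Connection,
    id: &str,
) -> Result<String, redis::RedisError> {
    cmd("XADD")
        .arg("stream:main")
        .arg("MAXLEN")
        .arg("~")
        .arg(1000)
        .arg("*")
        .arg("id")
        .arg(id)
        .arg("payload")
        .arg("{}")
        .query_async(connection)
        .await
}

// Alternatively, drop everything older than the smallest still-pending ID
// (as reported by XPENDING across all consumer groups).
async fn trim_processed(
    connection: &mut Connection,
    smallest_pending_id: &str,
) -> Result<usize, redis::RedisError> {
    cmd("XTRIM")
        .arg("stream:main")
        .arg("MINID")
        .arg(smallest_pending_id)
        .query_async(connection)
        .await
}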

Beyond Redis

Redis isn’t always the best tool once you need to scale. Redis works best on a single server instance, where commands are executed serially and consistency is easy; unfortunately, when trying to scale it horizontally, it does not maintain any level of consistency that we would want for such a task system:

Out of 2000 writes, Redis claimed that 1998 of them completed successfully. However, only 872 of those integers were present in the final set. Redis threw away 56% of the writes it told us succeeded.

Jepsen: Redis, on Redis Sentinel

A second replication strategy, Redis Cluster, provides transparent sharding and majority availability. Like Redis Sentinel, it uses asynchronous replication, and can lose acknowledged writes during some types of network failures[.]

[...]

Redis Sentinel, Redis Cluster, and Active-Active Geo-Distribution all allow lost updates—at least for some workloads.

[...]

In discussions with Jepsen, Redis Labs clarified that Redis Enterprise can offer ACID characteristics, but only with significant limitations. Replication must be disabled, the write-ahead log must be set to fsync on every write, and there is no way to roll back when transactions fail. These factors are not clearly documented, but Redis Labs plans to document them in the future.

In short, users who want fault-tolerance and not lost updates need something stronger than existing Redis replication systems. Whereupon: the creation of Redis-Raft.

[...]

Tangentially, we were surprised to discover that Redis Enterprise’s claim of “full ACID compliance” only applied to specially-configured non-redundant deployments, rather than replicated clusters. While Jepsen has not experimentally confirmed data loss in Redis Enterprise, our discussions with Redis Labs suggest that users should not, in general, assume that multi-node Redis deployments offer ACID guarantees. We agree with Redis Labs that the documentation should make this clear.

Jepsen: Redis-Raft 1b3fbf6

So if you want to use a distributed Redis system for your task queue, you will need to do your own research in this regard. I’m not an expert in distributed database systems (yet), but for a task queue where you don’t want to lose data, you will want at least sequential consistency.

So, when you hit a certain scale, you are going to need to go beyond what Redis can offer you. And thankfully, there are systems that can offer you this consistency, and still be fast enough to handle your task queue [16].

Kafka

Kafka is a distributed streaming platform. It is a distributed commit log, where you can write messages to a topic, and then read messages from that topic in the order that they were written (within a partition). It is astonishingly similar to Redis’s Streams - though there are definitely big differences, so make sure you adjust your mental model accordingly - but it is also a lot more powerful, more scalable, and offers more throughput. Accordingly, it is also more complex.

See your doctor engineer to see if Kafka is right for you.

RabbitMQ

RabbitMQ is a message broker. It is fundamentally different from Kafka and Redis Streams, and is more similar to a combination of Redis’s Pub/Sub and lists. You define the structure of your queues, exchanges, and bindings, and RabbitMQ will route messages accordingly. RabbitMQ is a lot more focused on message passing than on event streaming, which might make sense for our workload here.

RabbitMQ can be configured as a cluster, but queues are not replicated across the cluster by default - instead, you must use a queue type that supports replication. Currently, RabbitMQ’s documentation states the following:

The quorum queue is a modern queue type for RabbitMQ implementing a durable, replicated FIFO queue based on the Raft consensus algorithm. It is available as of RabbitMQ 3.8.0.

Quorum queues and streams replace durable mirrored queues, the original replicated queue type which is now deprecated and scheduled for removal.

Quorum queues are optimized for set of use cases where data safety is a top priority. This is covered in Motivation. Quorum queues should be considered the default option for a replicated queue type.

Quorum Queues, RabbitMQ

PostgreSQL

PostgreSQL is a relational database.

You could, theoretically, implement a lot of these things within PostgreSQL. There are even extensions for this.

If you do want to go down this path, FOR UPDATE SKIP LOCKED will likely be your friend; but you will need to be careful about how you structure your queries, how you will handle failures, and how you will handle deadlocks.
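A minimal sketch of that pattern with tokio-postgres, assuming a hypothetical tasks table with id (bigint) and payload (text) columns; deleting the row in the same transaction plays the role of the acknowledgement:

use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    let (mut client, connection) =
        tokio_postgres::connect("host=localhost user=postgres dbname=queue", NoTls)
            .await?;
    // tokio-postgres drives the socket on a separate task.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    // Claim one task inside a transaction: the row stays locked until we
    // commit, and SKIP LOCKED hides it from other workers in the meantime.
    let transaction = client.transaction().await?;
    let row = transaction
        .query_opt(
            "SELECT id, payload FROM tasks
             ORDER BY id
             LIMIT 1
             FOR UPDATE SKIP LOCKED",
            &[],
        )
        .await?;

    if let Some(row) = row {
        let id: i64 = row.get("id");
        let payload: String = row.get("payload");
        println!("processing task {}: {}", id, payload);
        // Deleting (or marking done) in the same transaction acts as the ack.
        transaction
            .execute("DELETE FROM tasks WHERE id = $1", &[&id])
            .await?;
    }
    transaction.commit().await?;
    Ok(())
}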

I’ve tried implementing this, but I’ve found that it’s a lot of work to get right, let alone to get something that is nearly as performant as just using Redis [17].

Discussion

This was a deep dive I had to take to understand the complexities of “just getting a task queue to work.” I hope you’re able to learn from my mistakes, and that you’re able to make a more informed decision about what you want to use for your task queue.

There is no shame in not reinventing the wheel. These systems are complex, for good reasons, and people have spent a lot of time and effort to make them reliable and performant. If you can use them, you should! But every system is geared for a specific purpose - and the context of what you’re trying to solve will determine what you should use. Even if you shouldn’t reinvent the wheel, fitting a square peg into a round hole is still a bad idea. I hope this article has helped you understand more about the problem in depth!

Good luck, and feel free to let me know if I’ve left something out!


  1. This is a difficult invariant to prove, but it is essential. If, for example, you wanted to do post-processing of an upload (e.g. for a user’s avatar), if the task was never put on the queue, the avatar would never be processed, and the user would be left with a broken avatar. This is not only a bad user experience, but it leaves inconsistent state in the application - the file is uploaded somewhere, and has to be there for the task to eventually process it, but the task was never put on the queue, so the file is just sitting there, taking up space, when it can’t even be seen by the user. If you can’t guarantee that the task will be put on the queue, then you should have some periodic job (does that put a task on the queue?) that checks the state of the database to ensure e.g. avatar entries aren’t left in this inconsistent state for too long. 

  2. There is a problem involving the state of the task here. Going back to the avatar example, when the user uploads a new avatar before the first one is done processing, both uploads are now in the queue. If we transfer all state about the avatar and user in the task, the consumer will never know that the first avatar was replaced, and will process both avatars. This is wasted work at best, but at worst, it can cause inconsistency issues in the database. So even if you transfer the entire snapshot of the state of the resource with the task, you still need to be able to pull the current state of the resource from the database, in the consumer, and compare it to the expected state of the resource in the task. If they differ, you can do something different - in this avatar example, you can clean up the files from the first avatar, as it’s no longer ever going to be used. 

  3. It is similarly prudent to add some sort of delay to the retry. If there is a transient fault, and the consumer is unable to connect to the database, it makes sense to slow the retry so that the database has time to recover. If it is not a transient fault with e.g. the database, then it makes sense to reduce the number of tasks that are falsely flagged as failed and discarded. Similarly, if a consumer finds that it keeps failing tasks, it might make sense to kill the consumer, as it is likely in a bad state - it should not be failing to consume tasks. 

  4. Even if you have the most hardened kernel, and a proofed application, a stray cosmic ray can still cause a bit to flip, and cause a process to randomly die. It’s not likely, but it’s possible. Best to be sure, just in case. 

  5. It sorta feels like one of those problems that reduces to the halting problem at its core - and if you could solve that, you’d be a very rich person. 

  6. This is actually false. The consumer-specific queue should contain items that the consumer has claimed, but not necessarily items that the consumer is currently working on. This is because the consumer might have claimed an item, but not started working on it yet. This is an optimization, though - consumers might pull multiple items at once to increase throughput of processing items. But considering we’re expecting each task here to take seconds, if not minutes, to complete, it’s not necessary to optimize for a couple milliseconds of network latency. 

  7. Assuming multiple consumers are running at the same time, then Redis will have to decide which consumer gets a new item first. While redis does resolve this with FIFO semantics, the distribution of which consumers get what when, and how “fair” that algorithm is, is something to keep in mind, especially as you scale or get more complex systems. 

  8. The whole algorithm can’t be done in a simple Lua script, as according to the Redis documentation, EVAL scripts “should only access keys whose names are given as input arguments. Scripts should never access keys with programmatically-generated names or based on the contents of data structures stored in the database.” This means that step (2) will have to be done in at least two parts; one to get the list of consumers, and one to get the tasks from each consumer’s queue. Step (3) can be done in one script, though. 

  9. Streams have a ton more uses, too, and give us so much flexibility in seeing into the system; for example, we can inspect a stream at any point in time to see what tasks have been processed and what will be processed, assuming we haven’t trimmed the stream. 

  10. It’s also not atomic, so we’d have to do this in a script. And for a very simple use-case where you have one type of consumer, it didn’t make sense to add this as a feature. 

  11. There are other special IDs, like $, which means “the last message in the stream,” and 0, which means “the first message in the stream.” Note that using these IDs with XREADGROUP only reads from the pending entries list, and not the stream itself; so this is a good way to recover from a failure.

  12. Though… apparently, these options are “mainly for internal use in order to transfer the effects of XCLAIM or other commands to the AOF file and to propagate the same effects to the replicas,” and “are unlikely to be useful to normal users.” So, uh, use at your own risk, I guess? 

  13. We can again abuse XCLAIM (it’s getting a lot of abuse in this article) to extend the idle time of a message, if we need to - if we specify JUSTID, it won’t increment the attempted deliveries count, and will only update the idle time - and only return the ID of the message. We don’t need the body of the message anyway, so this works! There is one caveat - if the message hit the idle timeout, and was claimed by another consumer, this will silently steal it back, and now both consumers will be processing it. This process should only be done if you are certain that the message is still far within the idle threshold. 

  14. It even offers an “approximate” trimming (because of the way streams are stored internally) - if you specify a max length of “approximately” 1000, it will trim the stream when it reaches 1000 messages, but it may include a few more messages than that. That’s probably fine for any use case of this. It also allows MINID selection - saying that the stream should only keep messages with an ID greater than a certain ID. 

  15. XPENDING allows you to retrieve information about what messages are currently pending, including the smallest and largest IDs of pending messages. If you get the smallest ID across all consumer groups, then no messages before that smallest ID are pending, and can be safely removed from the stream using XTRIM’s MINID entry, which “Evicts entries with IDs lower than threshold, where threshold is a stream ID.” 

  16. What kind of small web application are you building where you’ve out-grown Redis? Should I be concerned? 

  17. I wouldn’t be surprised if it were just a skill issue.