A Rust implementation of Celery for producing and consuming asynchronous tasks with a distributed message queue.




What is Rusty Celery?

Simply put, this is a Rust implementation of the Celery protocol for producing and consuming asynchronous tasks with a distributed message broker. It comes with an idiomatic async API driven by the performant tokio.rs, and above all an emphasis on safety.

How does it work?

Celery revolves around the concept of a task. A task is a unit of work that is requested by a producer to be completed by a consumer / worker.

For example, a social media service may need tasks to notify a user's followers when they post new content. When a user uploads their content to the service's website, the website's backend would act as the producer sending out the tasks to a set of workers - usually deployed on a separate server or cluster - via a distributed message broker.

A Celery application instance is meant to serve as either the producer or the consumer. In this example, both the website backend and the worker applications would initialize a Celery app in the same way, with the exact same configuration. The web backend would then call Celery::send_task to produce a task which a worker would receive while it is consuming tasks through the Celery::consume method.

Built to scale

The Celery framework is a multiple producer, multiple consumer setup: any number of producer applications can send tasks to any number of workers. Naturally this allows seamless horizontal scaling.
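The multiple producer, multiple consumer shape can be sketched in-process with nothing but the standard library. This is only an analogy, not how Rusty Celery is implemented: a `std::sync::mpsc` channel stands in for the broker, threads stand in for separate applications, and `AddTask` and `run_cluster` are hypothetical names made up for this illustration.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a task message: add two numbers.
struct AddTask {
    x: i32,
    y: i32,
}

/// Spawns `producers` producer threads and `workers` worker threads that share
/// one in-process queue, and returns the sum of all task results.
fn run_cluster(producers: usize, workers: usize, tasks_per_producer: usize) -> i32 {
    let (task_tx, task_rx) = mpsc::channel::<AddTask>();
    let (result_tx, result_rx) = mpsc::channel::<i32>();
    // Workers share the receiving end, so each task goes to exactly one worker.
    let task_rx = Arc::new(Mutex::new(task_rx));

    let worker_handles: Vec<_> = (0..workers)
        .map(|_| {
            let task_rx = Arc::clone(&task_rx);
            let result_tx = result_tx.clone();
            thread::spawn(move || loop {
                // Hold the lock only while receiving, not while executing.
                let next = task_rx.lock().unwrap().recv();
                match next {
                    Ok(task) => result_tx.send(task.x + task.y).unwrap(),
                    Err(_) => break, // every producer is done and the queue is empty
                }
            })
        })
        .collect();
    drop(result_tx);

    let producer_handles: Vec<_> = (0..producers)
        .map(|_| {
            let task_tx = task_tx.clone();
            thread::spawn(move || {
                for i in 0..tasks_per_producer {
                    task_tx.send(AddTask { x: i as i32, y: 1 }).unwrap();
                }
            })
        })
        .collect();
    drop(task_tx);

    for handle in producer_handles {
        handle.join().unwrap();
    }
    for handle in worker_handles {
        handle.join().unwrap();
    }
    result_rx.iter().sum()
}
```

Changing the number of producers or workers passed to `run_cluster` does not change the result, only how the work is spread out, which is the property that makes horizontal scaling straightforward.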

What do I need?

The Broker is an integral part of all of this, providing the channel through which producers communicate with consumers and distributing tasks among the available workers. At the time of writing, the only officially supported broker is the AMQPBroker, which can be used with a RabbitMQ instance. The RabbitMQ instance would be the actual broker, while the AMQPBroker struct provides the API that the Celery app uses to communicate with it.

There are many RabbitMQ hosting services available, such as CloudAMQP and Compose. Both of these have free tier options for development purposes.




Rusty Celery is developed on GitHub as an open source community effort.

Quick Start

Rusty Celery is provided as the celery library on crates.io. To get started, add celery as a dependency to your project. Then you can define tasks by decorating functions with the task attribute:

use celery::prelude::*;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

And create a Celery app with the app macro:

use celery::prelude::*;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

let my_app = celery::app!(
    broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
    tasks = [add],
    task_routes = [],
);

The Celery app can be used as either a producer or consumer (worker). To send tasks to a queue for a worker to consume, use the Celery::send_task method:

use celery::prelude::*;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

#[tokio::main]
async fn main() -> Result<(), exitfailure::ExitFailure> {
    let my_app = celery::app!(
        broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
        tasks = [add],
        task_routes = [],
    );

    my_app.send_task(add::new(1, 2)).await?;

    Ok(())
}

And to act as a worker and consume tasks sent to a queue by a producer, use the Celery::consume method:

use celery::prelude::*;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

#[tokio::main]
async fn main() -> Result<(), exitfailure::ExitFailure> {
    let my_app = celery::app!(
        broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
        tasks = [add],
        task_routes = [],
    );

    my_app.consume().await?;

    Ok(())
}

Examples

A full working example is provided in the examples/ directory on GitHub. This includes a Celery app implemented in both Rust and Python with an AMQP broker. The only mandatory system requirement other than Rust is Docker, which is needed to run a RabbitMQ instance for the broker.

To play with the example, first clone the repository:

git clone https://github.com/rusty-celery/rusty-celery && cd rusty-celery

Then start the RabbitMQ instance:

./scripts/brokers/amqp.sh

Once the RabbitMQ Docker container has loaded, you can run a Rust worker in a separate terminal with

cargo run --example celery_app consume

From another terminal you can then send tasks to the worker from Rust with

cargo run --example celery_app produce

If you have Python and the celery Python library installed, you can also consume or produce tasks from the Python app with

python examples/celery_app.py consume

or

python examples/celery_app.py produce

The Rusty Celery Guide

This chapter will walk you through the details of defining and configuring tasks and applications.

Defining Tasks

A task represents a unit of work that a Celery app can produce or consume.

The recommended way to define a task is by decorating a function with the task attribute macro:

use celery::prelude::*;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

If the function has a return value the return type must be a TaskResult<T>.

Under the hood a task is just a struct that implements the Task trait. When you decorate a function with the task macro, this creates a struct and implements the Task trait so that Task::run calls the function you've defined.

The macro accepts a number of optional parameters.

For example, to give a task a custom name and set a time limit:

use tokio::time::{self, Duration};

#[celery::task(name = "sleep", time_limit = 5)]
async fn delay(secs: u64) {
    time::delay_for(Duration::from_secs(secs)).await;
}

Error handling

When a task executes, i.e. when the Task::run method is called, it returns a TaskResult<T> which is just a Result<T, TaskError>. When an Err(TaskError) is returned, the worker considers the task failed and may send it back to the broker to be retried.

A worker treats certain TaskError variants differently. So when your task has points of failure, such as in the read_some_file example below, you'll need to coerce those possible error types to the appropriate TaskError variant and propagate them upwards:

use celery::prelude::*;

#[celery::task]
async fn read_some_file() -> TaskResult<String> {
    tokio::fs::read_to_string("some_file")
        .await
        .with_unexpected_err(|| "File does not exist")
}

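Here tokio::fs::read_to_string("some_file").await produces a tokio::io::Result, so we use the helper method .with_unexpected_err from the TaskResultExt trait to convert any error into a TaskError::UnexpectedError. Because the converted result is the last expression in the function, it is returned to the caller directly; in the middle of a function body you would apply the ? operator to propagate the error upwards instead.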

There are two error kinds in particular that are meant as catch-alls for any other type of error that could arise in your task: TaskError::UnexpectedError and TaskError::ExpectedError. The latter should be used for errors that will occasionally happen due to factors outside of your control - such as a third party service being temporarily unavailable - while UnexpectedError should be reserved to indicate a bug or that a critical resource is missing.
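To make the distinction concrete, here is a minimal sketch of how one might decide which of the two kinds an I/O failure belongs to. The `Failure` enum and the `classify` function are hypothetical stand-ins written for this illustration; they are not the library's `TaskError` type, and the choice of which `io::ErrorKind` values count as transient is an assumption you would adapt to your own tasks.

```rust
use std::io;

/// Hypothetical stand-in for the two catch-all error kinds described above.
/// This is not the library's `TaskError` type.
#[derive(Debug, PartialEq)]
enum Failure {
    /// Happens occasionally due to factors outside of your control.
    Expected(String),
    /// Indicates a bug or a missing critical resource.
    Unexpected(String),
}

/// Classifies an I/O error: transient network conditions are treated as
/// expected, everything else as unexpected.
fn classify(err: &io::Error) -> Failure {
    match err.kind() {
        io::ErrorKind::TimedOut
        | io::ErrorKind::ConnectionRefused
        | io::ErrorKind::ConnectionReset
        | io::ErrorKind::Interrupted => Failure::Expected(err.to_string()),
        _ => Failure::Unexpected(err.to_string()),
    }
}
```

In a real task the same decision determines whether you reach for the expected or the unexpected conversion helper on the Result.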

Positional vs keyword parameters

Within the Celery protocol task parameters can be treated as either args (positional) or kwargs (key-word based). Both are supported in Rusty Celery, which means you could call the Rust add task defined above from another language like Python in any of the following ways:

celery_app.send_task("add", args=[1, 2])
celery_app.send_task("add", kwargs={"x": 1, "y": 2})
celery_app.send_task("add", args=[1], kwargs={"y": 2})
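The three calls above all end up supplying the same two values. The following sketch shows one plausible way a consumer could bind a mix of args and kwargs to a task's declared parameter names. It is an illustration only: `bind_params` is a hypothetical helper, not part of the Rusty Celery API, and it assumes integer parameters and silently ignores unknown keyword names.

```rust
use std::collections::HashMap;

/// Binds positional and keyword arguments to a task's declared parameter names.
/// Hypothetical helper for illustration; not part of the Rusty Celery API.
fn bind_params(
    names: &[&str],
    args: &[i64],
    kwargs: &HashMap<&str, i64>,
) -> Result<Vec<i64>, String> {
    if args.len() > names.len() {
        return Err(format!(
            "expected at most {} arguments, got {}",
            names.len(),
            args.len()
        ));
    }
    names
        .iter()
        .enumerate()
        .map(|(i, name)| match (args.get(i), kwargs.get(name)) {
            // The same parameter was given both positionally and by keyword.
            (Some(_), Some(_)) => Err(format!("multiple values for parameter `{}`", name)),
            // Exactly one source supplied the value.
            (Some(v), None) | (None, Some(v)) => Ok(*v),
            // Neither source supplied it.
            (None, None) => Err(format!("missing parameter `{}`", name)),
        })
        .collect()
}
```

With `names = ["x", "y"]`, passing `args = [1]` and `kwargs = {"y": 2}` yields the same bound values as passing `args = [1, 2]` alone.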

Optional parameters

Any parameters that are Option<T> types are automatically treated as optional with a default value of None. For example

use tokio::time::{self, Duration};

#[celery::task]
async fn delay(secs: Option<u64>) {
    let secs = secs.unwrap_or(10);
    time::delay_for(Duration::from_secs(secs)).await;
}

So you could call this task from Python with or without providing a value for secs:

celery_app.send_task("delay", args=[10])
celery_app.send_task("delay")

Callbacks

You can set custom callbacks to run when a task fails or succeeds through the on_failure and on_success options to the task macro:

use celery::task::Task;
use celery::error::TaskError;
use tokio::time::{self, Duration};

#[celery::task(
    time_limit = 10,
    on_failure = failure_callback,
    on_success = success_callback,
)]
async fn sleep(secs: u64) {
    time::delay_for(Duration::from_secs(secs)).await;
}

async fn failure_callback<T: Task>(task: &T, err: &TaskError) {
    match err {
        TaskError::TimeoutError => println!("Oops! Task {} timed out!", task.name()),
        _ => println!("Hmm task {} failed with {:?}", task.name(), err),
    };
}

async fn success_callback<T: Task>(task: &T, _ret: &T::Returns) {
    println!("{} succeeded!", task.name());
}

Summary

In summary, tasks are easily defined by decorating a function with the #[celery::task] macro. If the function returns anything the return type has to be a TaskResult<T>. Internally the function is wrapped in a struct that implements the Task trait.

The quickest way to propagate expected or unexpected errors from within your task is by using .with_expected_err(|| "...")? or .with_unexpected_err(|| "...")?, respectively, on the Result.

Running Workers

While the Python version of Celery provides a CLI that you can use to run a worker, in Rust you'll have to implement your own worker binary. However this is a lot easier than it sounds. At a minimum you just need to initialize your Celery application, define and register your tasks, and run the Celery::consume method within an async executor.

Here is a complete example of a worker application:

#![allow(non_upper_case_globals)]

use celery::prelude::*;
use exitfailure::ExitFailure;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

#[tokio::main]
async fn main() -> Result<(), ExitFailure> {
    env_logger::init();

    let celery_app = celery::app!(
        broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
        tasks = [add],
        task_routes = [],
        prefetch_count = 2,
        acks_late = true,
        default_queue = "celery-rs",
    );

    celery_app.consume().await?;

    Ok(())
}

The consume method will listen for SIGINT and SIGTERM signals - just like a Python worker - and will try to finish all pending tasks before shutting down unless it receives another signal.

Best practices

Acks early vs acks late

Tasks are only removed from a queue when they are acknowledged ("acked") by the worker that received them. The acks_late setting determines when a worker will ack a task. When set to true, tasks are acked after the worker finishes executing them. When set to false, they are acked right before the worker starts executing them.

The default for acks_late is false. However, if your tasks are idempotent, it's strongly recommended that you set acks_late to true. This has two major benefits.

First, it ensures that if a worker were to crash, any tasks currently executing will be retried automatically by the next available worker.

Second, it provides a better back pressure mechanism when used in conjunction with a suitable prefetch_count (see below).
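The first benefit can be seen in a toy model. The sketch below is not the library's implementation: it assumes a single worker that takes one task at a time from an in-memory queue and crashes while executing the task at a given position, and `tasks_left_after_crash` is a hypothetical function written for this illustration.

```rust
use std::collections::VecDeque;

/// Simulates one worker taking tasks from a queue and crashing while executing
/// the task at position `crash_at`. Returns the tasks the broker still holds
/// afterwards, i.e. the ones another worker could still pick up.
fn tasks_left_after_crash(tasks: &[u32], acks_late: bool, crash_at: usize) -> Vec<u32> {
    let mut queue: VecDeque<u32> = tasks.iter().copied().collect();
    let mut executed = 0;
    while !queue.is_empty() {
        if !acks_late {
            // Early ack: the broker forgets the task before it runs.
            queue.pop_front();
        }
        if executed == crash_at {
            // The worker dies mid-execution.
            break;
        }
        executed += 1;
        if acks_late {
            // Late ack: the broker forgets the task only after it finished.
            queue.pop_front();
        }
    }
    queue.into_iter().collect()
}
```

With tasks `[1, 2, 3]` and a crash during the second task, early acks leave only `[3]` on the queue, so task 2 is lost, while late acks leave `[2, 3]`, so task 2 will be executed again. That repeat execution is why late acks are only safe for idempotent tasks.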

Prefetch count

When initializing your Rust Celery app it's recommended that you set the prefetch_count to a number suitable for your application, especially if you have acks_late set to true.

If you have acks_late set to false, the default prefetch_count is probably sufficient.

The prefetch_count determines how many un-acked tasks (ignoring those with a future ETA) a worker can hold onto at any point in time. Having prefetch_count too low or too high can create a bottleneck.

If the number is set too low, workers could be under-utilized. If the number is set too high, workers could be hogging tasks that they can't execute yet, or worse: they could run out of memory from receiving too many tasks and crash.

Unfortunately finding an optimal prefetch count is easier said than done. It depends on a lot of factors, such as the hardware your workers are running on, the task throughput, and whether your tasks are more CPU-bound or IO-bound.

The last factor is especially important. A worker running on even a single CPU can probably handle hundreds, if not thousands, of (non-blocking) IO-bound tasks at once. But a worker consuming CPU-bound tasks is essentially limited to executing one task per CPU core. Therefore a good starting point for prefetch_count would be either 100 * NUM_CPUS for IO-bound tasks or 2 * NUM_CPUS for CPU-bound tasks.
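The rule of thumb above is simple enough to write down as a small helper. This is a sketch of the starting point only, not a tuned value: `Workload` and `suggested_prefetch_count` are hypothetical names introduced here, and the result is a `u16` on the assumption that the value ends up as an AMQP prefetch count, which is a 16-bit field.

```rust
/// Rough workload categories used by the rule of thumb above.
#[derive(Clone, Copy)]
enum Workload {
    IoBound,
    CpuBound,
}

/// Returns a starting point for `prefetch_count`:
/// 100 * NUM_CPUS for IO-bound tasks, 2 * NUM_CPUS for CPU-bound tasks.
/// Saturates at `u16::MAX` instead of overflowing.
fn suggested_prefetch_count(num_cpus: u16, workload: Workload) -> u16 {
    match workload {
        Workload::IoBound => num_cpus.saturating_mul(100),
        Workload::CpuBound => num_cpus.saturating_mul(2),
    }
}
```

On a 4-core machine this gives 400 for IO-bound tasks and 8 for CPU-bound tasks. The CPU count itself can be obtained at runtime, for example with `std::thread::available_parallelism`, and the result should still be adjusted after measuring real throughput and memory use.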

Consuming blocking / CPU-bound tasks

If your tasks are CPU-bound (or otherwise blocking), it's recommended that you use a multi-threaded async runtime, such as the one provided by tokio. Within the task body you can then call tokio::task::block_in_place where appropriate.

Coming from Python?

Though a lot of the Rusty Celery API is very similar to the Python equivalent - e.g. defining tasks by decorating functions - there are a few key differences listed here that have arisen because it was either not practical or just not possible to do it the same way as in Python.

In some cases this means the Rust equivalent is a little more verbose or takes a little more care on the user's end, but ultimately I think you'll find that the downsides of the Rust implementation are heavily outweighed by the benefits it brings: most notably speed, safety, and a much smaller memory footprint.

Registering tasks

In Python you can register tasks by dynamically importing them at runtime through the imports configuration field, but in Rust you need to manually register all tasks either as parameters to the app macro or using the Celery::register_task method:

#![allow(non_upper_case_globals)]

use celery::prelude::*;
use exitfailure::ExitFailure;

#[tokio::main]
async fn main() -> Result<(), ExitFailure> {
    let my_app = celery::app!(
        broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
        tasks = [],
        task_routes = [],
    );

    #[celery::task]
    fn add(x: i32, y: i32) -> TaskResult<i32> {
        Ok(x + y)
    }

    my_app.register_task::<add>().await.unwrap();

    Ok(())
}
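One way to see why explicit registration is needed: a worker receives only a task name and its arguments over the wire, so it needs a table, built ahead of time, that maps names to code. The sketch below illustrates that idea with a plain map of function pointers. It is not how Rusty Celery stores tasks internally; `Registry`, `TaskFn`, and the plain `add` function are hypothetical names for this illustration.

```rust
use std::collections::HashMap;

/// Hypothetical task signature for this sketch: two integers in, one out.
type TaskFn = fn(i32, i32) -> i32;

/// Hypothetical name-to-function table, standing in for an app's task registry.
struct Registry {
    tasks: HashMap<String, TaskFn>,
}

impl Registry {
    fn new() -> Self {
        Registry { tasks: HashMap::new() }
    }

    /// Makes `f` callable under `name`.
    fn register(&mut self, name: &str, f: TaskFn) {
        self.tasks.insert(name.to_string(), f);
    }

    /// Looks up a task by the name received in a message and runs it.
    fn dispatch(&self, name: &str, x: i32, y: i32) -> Result<i32, String> {
        match self.tasks.get(name) {
            Some(f) => Ok(f(x, y)),
            None => Err(format!("received unregistered task `{}`", name)),
        }
    }
}

fn add(x: i32, y: i32) -> i32 {
    x + y
}
```

After `registry.register("add", add)`, dispatching `"add"` with `1` and `2` returns `Ok(3)`, while dispatching a name that was never registered returns an error, because Rust has no way to import the missing function dynamically at runtime.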

Additional Resources

Quick links

Broker hosting options

RabbitMQ (AMQP)