A Rust implementation of Celery for producing and consuming asynchronous tasks with a distributed message queue.
What is Rusty Celery?
Simply put, this is a Rust implementation of the Celery protocol for producing and consuming asynchronous tasks with a distributed message broker. It comes with an idiomatic async API driven by the performant tokio.rs, and above all an emphasis on safety.
How does it work?
Celery revolves around the concept of a task. A task is a unit of work that is requested by a producer to be completed by a consumer / worker.
For example, a social media service may need tasks to notify a user's followers when they post new content. When a user uploads their content to the service's website, the website's backend would act as the producer sending out the tasks to a set of workers - usually deployed on a separate server or cluster - via a distributed message broker.
A Celery application instance is meant to serve as either the producer or the consumer. In this example, both the website backend and the worker applications would initialize a Celery app in the same way, with the exact same configuration. The web backend would then call Celery::send_task to produce a task which a worker would receive while it is consuming tasks through the Celery::consume method.
Built to scale
The Celery framework is a multiple-producer, multiple-consumer setup: any number of producer applications can send tasks to any number of workers. Naturally, this allows seamless horizontal scaling.
What do I need?
The Broker is an integral part of all of this, providing the channel through which producers communicate with consumers and distributing tasks among the available workers. As of writing this, the only officially supported broker is the AMQPBroker, which can be used with a RabbitMQ instance. The RabbitMQ instance would be the actual broker, while the AMQPBroker struct provides the API that the Celery app uses to communicate with it.
There are many RabbitMQ hosting services available, such as CloudAMQP and Compose. Both of these have free tier options for development purposes.
Rusty Celery is developed on GitHub as an open source community effort.
Quick Start
Rusty Celery is provided as the celery library on crates.io. To get started, add celery as a dependency to your project. Then you can define tasks by decorating functions with the task attribute:
use celery::prelude::*;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}
And create a Celery app with the app macro:
use celery::prelude::*;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

let my_app = celery::app!(
    broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
    tasks = [add],
    task_routes = [],
);
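The task_routes field maps task name patterns to queues. As a sketch of a non-trivial routing table (the queue names here are hypothetical; the pattern => queue syntax follows the examples in the repository):

let my_app = celery::app!(
    broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
    tasks = [add],
    task_routes = [
        // Hypothetical queue names; a catch-all pattern can come last.
        "add" => "math-queue",
        "*" => "celery",
    ],
);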
The Celery app can be used as either a producer or consumer (worker). To send tasks to a queue for a worker to consume, use the Celery::send_task method:
use celery::prelude::*;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

#[tokio::main]
async fn main() -> Result<(), exitfailure::ExitFailure> {
    let my_app = celery::app!(
        broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
        tasks = [add],
        task_routes = [],
    );

    my_app.send_task(add::new(1, 2)).await?;

    Ok(())
}
And to act as a worker and consume tasks sent to a queue by a producer, use the Celery::consume method:
use celery::prelude::*;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

#[tokio::main]
async fn main() -> Result<(), exitfailure::ExitFailure> {
    let my_app = celery::app!(
        broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
        tasks = [add],
        task_routes = [],
    );

    my_app.consume().await?;

    Ok(())
}
Examples
A full working example is provided in the examples/ directory on GitHub. It includes a Celery app implemented in both Rust and Python with an AMQP broker. The only mandatory system requirement other than Rust is Docker, which is needed to run a RabbitMQ instance for the broker.
To play with the example, first clone the repository:
git clone https://github.com/rusty-celery/rusty-celery && cd rusty-celery
Then start the RabbitMQ instance:
./scripts/brokers/amqp.sh
Once the RabbitMQ Docker container has loaded, you can run a Rust worker in a separate terminal with
cargo run --example celery_app consume
From another terminal you can then send tasks to the worker from Rust with
cargo run --example celery_app produce
If you have Python and the celery Python library installed, you can also consume or produce tasks from the Python app with
python examples/celery_app.py consume
or
python examples/celery_app.py produce
The Rusty Celery Guide
This chapter will walk you through the details of defining and configuring tasks and applications.
Defining Tasks
A task represents a unit of work that a Celery app can produce or consume. The recommended way to define a task is by decorating a function with the task attribute macro:
use celery::prelude::*;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}
If the function has a return value, the return type must be a TaskResult<T>. Under the hood a task is just a struct that implements the Task trait. When you decorate a function with the task macro, this creates a struct and implements the Task trait so that Task::run calls the function you've defined.
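To make that concrete, here is a simplified, self-contained sketch of that pattern. This is not rusty-celery's actual Task trait or the macro's real expansion, just an illustration of the struct-plus-trait idea:

// A toy stand-in for the real Task trait, for illustration only.
trait SimpleTask {
    type Params;
    type Returns;
    fn run(&self, params: Self::Params) -> Self::Returns;
}

// Conceptually, #[celery::task] fn add(...) generates a struct named
// after the function...
#[allow(non_camel_case_types)]
struct add;

// ...and a trait impl whose run method invokes the original function body.
impl SimpleTask for add {
    type Params = (i32, i32);
    type Returns = i32;

    fn run(&self, (x, y): Self::Params) -> Self::Returns {
        x + y
    }
}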
The macro accepts a number of optional parameters.
For example, to give a task a custom name and set a time limit:
use tokio::time::{self, Duration};

#[celery::task(name = "sleep", time_limit = 5)]
async fn delay(secs: u64) {
    time::delay_for(Duration::from_secs(secs)).await;
}
Error handling
When a task executes, i.e. when the Task::run method is called, it returns a TaskResult<T>, which is just a Result<T, TaskError>. When an Err(TaskError) is returned, the worker considers the task failed and may send it back to the broker to be retried.
A worker treats certain TaskError variants differently. So when your task has points of failure, such as in the read_some_file example below, you'll need to coerce those possible error types to the appropriate TaskError variant and propagate them upwards:
use celery::prelude::*;

#[celery::task]
async fn read_some_file() -> TaskResult<String> {
    tokio::fs::read_to_string("some_file")
        .await
        .with_unexpected_err(|| "File does not exist")
}
Here tokio::fs::read_to_string("some_file").await produces a tokio::io::Result<String>, so we use the helper method .with_unexpected_err from the TaskResultExt trait to convert it into a TaskError::UnexpectedError, which is then either returned directly (as above) or propagated upwards with the ? operator.
There are two error kinds in particular that are meant as catch-alls for any other type of error that could arise in your task: TaskError::UnexpectedError and TaskError::ExpectedError. The latter should be used for errors that will occasionally happen due to factors outside of your control - such as a third party service being temporarily unavailable - while UnexpectedError should be reserved to indicate a bug or that a critical resource is missing.
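For example, a failure caused by a flaky upstream dependency would use .with_expected_err instead. Here is a minimal sketch, where fetch_from_service is a hypothetical stand-in for your own client code:

use celery::prelude::*;

// Hypothetical stand-in for a call to a third-party service.
async fn fetch_from_service() -> Result<String, std::io::Error> {
    Err(std::io::Error::new(
        std::io::ErrorKind::ConnectionRefused,
        "service down",
    ))
}

#[celery::task]
async fn sync_data() -> TaskResult<String> {
    // A temporarily unavailable service is an *expected* failure, so we
    // convert the error to TaskError::ExpectedError instead.
    fetch_from_service()
        .await
        .with_expected_err(|| "Third party service temporarily unavailable")
}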
Positional vs keyword parameters
Within the Celery protocol, task parameters can be treated as either args (positional) or kwargs (keyword-based).
Both are supported in Rusty Celery, which means you could call the Rust add task defined above from another language like Python in any of the following ways:
celery_app.send_task("add", args=[1, 2])
celery_app.send_task("add", kwargs={"x": 1, "y": 2})
celery_app.send_task("add", args=[1], kwargs={"y": 2})
Optional parameters
Any parameters that are Option<T> types are automatically treated as optional with a default value of None. For example:
use tokio::time::{self, Duration};

#[celery::task(name = "sleep")]
async fn delay(secs: Option<u64>) {
    let secs = secs.unwrap_or(10);
    time::delay_for(Duration::from_secs(secs)).await;
}
So you could call this task from Python with or without providing a value for secs:
celery_app.send_task("sleep", args=[10])
celery_app.send_task("sleep")
Callbacks
You can set custom callbacks to run when a task fails or succeeds through the on_failure and on_success options to the task macro:
use celery::task::Task;
use celery::error::TaskError;
use tokio::time::{self, Duration};

#[celery::task(
    time_limit = 10,
    on_failure = failure_callback,
    on_success = success_callback,
)]
async fn sleep(secs: u64) {
    time::delay_for(Duration::from_secs(secs)).await;
}

async fn failure_callback<T: Task>(task: &T, err: &TaskError) {
    match err {
        TaskError::TimeoutError => println!("Oops! Task {} timed out!", task.name()),
        _ => println!("Hmm task {} failed with {:?}", task.name(), err),
    };
}

async fn success_callback<T: Task>(task: &T, _ret: &T::Returns) {
    println!("{} succeeded!", task.name());
}
Summary
In summary, tasks are easily defined by decorating a function with the #[celery::task] macro. If the function returns anything, the return type has to be a TaskResult<T>. Internally the function is wrapped in a struct that implements the Task trait.
The quickest way to propagate expected or unexpected errors from within your task is by using .with_expected_err(|| "...")? or .with_unexpected_err(|| "...")?, respectively, on the Result.
Running Workers
While the Python version of Celery provides a CLI that you can use to run a worker, in Rust you'll have to implement your own worker binary. However, this is a lot easier than it sounds. At a minimum you just need to initialize your Celery application, define and register your tasks, and run the Celery::consume method within an async executor.
Here is a complete example of a worker application:
#![allow(non_upper_case_globals)]

use celery::prelude::*;
use exitfailure::ExitFailure;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

#[tokio::main]
async fn main() -> Result<(), ExitFailure> {
    env_logger::init();

    let celery_app = celery::app!(
        broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
        tasks = [add],
        task_routes = [],
        prefetch_count = 2,
        acks_late = true,
        default_queue = "celery-rs",
    );

    celery_app.consume().await?;

    Ok(())
}
The consume method will listen for SIGINT and SIGTERM signals - just like a Python worker - and will try to finish all pending tasks before shutting down unless it receives another signal.
Best practices
Acks early vs acks late
Tasks are only removed from a queue when they are acknowledged ("acked") by the worker that received them. The acks_late setting determines when a worker will ack a task. When set to true, tasks are acked after the worker finishes executing them. When set to false, they are acked right before the worker starts executing them.
The default of acks_late is false; however, if your tasks are idempotent it's strongly recommended that you set acks_late to true. This has two major benefits.
First, it ensures that if a worker were to crash, any tasks currently executing will be retried automatically by the next available worker.
Second, it provides a better back pressure mechanism when used in conjunction with a suitable prefetch_count (see below).
Prefetch count
When initializing your Rust Celery app it's recommended that you set the prefetch_count to a number suitable for your application, especially if you have acks_late set to true.
If you have acks_late set to false, the default prefetch_count is probably sufficient.
The prefetch_count determines how many un-acked tasks (ignoring those with a future ETA) a worker can hold onto at any point in time. Setting the prefetch_count too low or too high can create a bottleneck.
If the number is set too low, workers could be under-utilized. If the number is set too high, workers could be hogging tasks that they can't execute yet, or worse: they could run out of memory from receiving too many tasks and crash.
Unfortunately finding an optimal prefetch count is easier said than done. It depends on a lot of factors, such as the hardware your workers are running on, the task throughput, and whether your tasks are more CPU-bound or IO-bound.
The last factor is especially important. A worker running on even a single CPU can probably handle hundreds, if not thousands, of (non-blocking) IO-bound tasks at once. But a worker consuming CPU-bound tasks is essentially limited to executing one task per CPU core. Therefore a good starting point for prefetch_count would be either 100 * NUM_CPUS for IO-bound tasks or 2 * NUM_CPUS for CPU-bound tasks.
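As a rough sketch of that heuristic (this assumes the num_cpus crate, and assumes the app! macro accepts an arbitrary expression for prefetch_count):

use celery::prelude::*;

#[tokio::main]
async fn main() -> Result<(), exitfailure::ExitFailure> {
    // Mostly IO-bound workload: start around 100 * NUM_CPUS.
    // For CPU-bound tasks, 2 * NUM_CPUS is a better starting point.
    let prefetch = 100 * num_cpus::get() as u16;

    let my_app = celery::app!(
        broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
        tasks = [],
        task_routes = [],
        prefetch_count = prefetch,
        acks_late = true,
    );

    my_app.consume().await?;
    Ok(())
}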
Consuming blocking / CPU-bound tasks
If your tasks are CPU-bound (or otherwise blocking), it's recommended that you use a multi-threaded async runtime, such as the one provided by tokio. Within the task body you can then call tokio::task::block_in_place where appropriate.
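For instance, here is a minimal sketch of a CPU-bound task (assuming the worker runs on tokio's multi-threaded runtime, since block_in_place panics on a current-thread runtime):

use celery::prelude::*;

#[celery::task]
async fn fib(n: u64) -> TaskResult<u64> {
    // Move the CPU-heavy work off the async scheduler so the worker's
    // other tasks can keep making progress.
    let result = tokio::task::block_in_place(|| {
        // Naive, intentionally CPU-heavy recursion.
        fn fibonacci(n: u64) -> u64 {
            if n < 2 { n } else { fibonacci(n - 1) + fibonacci(n - 2) }
        }
        fibonacci(n)
    });
    Ok(result)
}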
Coming from Python?
Though a lot of the Rusty Celery API is very similar to the Python equivalent - e.g. defining tasks by decorating functions - there are a few key differences, listed here, that have arisen because it was either not practical or simply not possible to do things the same way as in Python.
In some cases this means the Rust equivalent is a little more verbose or takes a little more care on the user's end, but ultimately I think you'll find that the downsides of the Rust implementation are heavily outweighed by the benefits it brings: most notably speed, safety, and a much smaller memory footprint.
Registering tasks
In Python you can register tasks by dynamically importing them at runtime through the imports configuration field, but in Rust you need to manually register all tasks either as parameters to the app macro or using the Celery::register_task method:
#![allow(non_upper_case_globals)]

use exitfailure::ExitFailure;
use celery::prelude::*;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

#[tokio::main]
async fn main() -> Result<(), ExitFailure> {
    let my_app = celery::app!(
        broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
        tasks = [],
        task_routes = [],
    );

    my_app.register_task::<add>().await.unwrap();

    Ok(())
}