Defining Tasks

A task represents a unit of work that a Celery app can produce or consume.

The recommended way to define a task is by decorating a function with the task attribute macro:

use celery::prelude::*;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

If the function has a return value, the return type must be TaskResult<T>.

Under the hood, a task is just a struct that implements the Task trait. When you decorate a function with the task macro, the macro generates a struct and implements the Task trait for it so that Task::run calls the function you've defined.
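To make that expansion more concrete, here is a rough, dependency-free sketch of the pattern. The Task trait below is a simplified, synchronous stand-in written for illustration and is not the real celery Task trait; the names AddParams and AddTask are hypothetical, and the code the macro actually generates may look quite different.

```rust
// Simplified stand-ins for illustration only. These are NOT the real
// `celery` types, which are async and carry more configuration.
#[derive(Debug, PartialEq)]
pub enum TaskError {
    UnexpectedError(String),
}

pub type TaskResult<T> = Result<T, TaskError>;

pub trait Task {
    /// Name the task is registered under.
    const NAME: &'static str;
    /// Arguments the task accepts.
    type Params;
    /// Value produced on success.
    type Returns;

    fn run(&self, params: Self::Params) -> TaskResult<Self::Returns>;
}

// The function as the user wrote it.
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

// Roughly what a macro could generate (hypothetical names): a params
// struct, a task struct, and a `Task` impl that forwards to `add`.
pub struct AddParams {
    pub x: i32,
    pub y: i32,
}

pub struct AddTask;

impl Task for AddTask {
    const NAME: &'static str = "add";
    type Params = AddParams;
    type Returns = i32;

    fn run(&self, params: Self::Params) -> TaskResult<Self::Returns> {
        add(params.x, params.y)
    }
}
```

In this sketch, calling AddTask.run(AddParams { x: 1, y: 2 }) simply forwards to add(1, 2), which is the relationship between Task::run and the decorated function described above.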

The macro accepts a number of optional parameters.

For example, to give a task a custom name and set a time limit:

use tokio::time::{self, Duration};

#[celery::task(name = "sleep", time_limit = 5)]
async fn delay(secs: u64) {
    time::delay_for(Duration::from_secs(secs)).await;
}

Error handling

When a task executes, i.e. when the Task::run method is called, it returns a TaskResult<T> which is just a Result<T, TaskError>. When an Err(TaskError) is returned, the worker considers the task failed and may send it back to the broker to be retried.

A worker treats certain TaskError variants differently. So when your task has points of failure, such as in the read_some_file example below, you'll need to coerce those possible error types to the appropriate TaskError variant and propagate them upwards:

use celery::prelude::*;

#[celery::task]
async fn read_some_file() -> TaskResult<String> {
    tokio::fs::read_to_string("some_file")
        .await
        .with_unexpected_err(|| "File does not exist")
}

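Here tokio::fs::read_to_string("some_file").await produces a tokio::io::Result, so we use the helper method .with_unexpected_err from the TaskResultExt trait to convert a failure into a TaskError::UnexpectedError. The converted value is already a TaskResult<String>, so it is returned directly as the function's final expression; in the middle of a longer function you would apply the ? operator to propagate the error upwards.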

There are two error kinds in particular that are meant as catch-alls for any other type of error that could arise in your task: TaskError::UnexpectedError and TaskError::ExpectedError. The latter should be used for errors that will occasionally happen due to factors outside of your control, such as a third-party service being temporarily unavailable, while UnexpectedError should be reserved to indicate a bug or that a critical resource is missing.
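As an illustration of how conversion helpers of this kind can work, the following sketch defines a simplified extension trait on Result using only the standard library. The TaskError enum and TaskResultExt trait here are stand-ins that mimic the shape used in this guide and are not the library's actual definitions; parse_port is a hypothetical helper, and this sketch discards the original error rather than preserving it.

```rust
use std::fmt::Display;

// Simplified stand-ins, NOT the real `celery` definitions.
#[derive(Debug, PartialEq)]
pub enum TaskError {
    /// Failures that are anticipated from time to time.
    ExpectedError(String),
    /// Bugs or missing critical resources.
    UnexpectedError(String),
}

pub type TaskResult<T> = Result<T, TaskError>;

/// Hypothetical extension trait that adds conversion helpers to any `Result`.
pub trait TaskResultExt<T> {
    fn with_expected_err<F, C>(self, f: F) -> TaskResult<T>
    where
        F: FnOnce() -> C,
        C: Display;

    fn with_unexpected_err<F, C>(self, f: F) -> TaskResult<T>
    where
        F: FnOnce() -> C,
        C: Display;
}

impl<T, E> TaskResultExt<T> for Result<T, E> {
    fn with_expected_err<F, C>(self, f: F) -> TaskResult<T>
    where
        F: FnOnce() -> C,
        C: Display,
    {
        // The closure only runs on the error path, so the message is
        // never built when the operation succeeds.
        self.map_err(|_| TaskError::ExpectedError(f().to_string()))
    }

    fn with_unexpected_err<F, C>(self, f: F) -> TaskResult<T>
    where
        F: FnOnce() -> C,
        C: Display,
    {
        self.map_err(|_| TaskError::UnexpectedError(f().to_string()))
    }
}

/// Hypothetical helper: bad input is treated as an expected failure,
/// and `?` propagates it out of the function early.
pub fn parse_port(raw: &str) -> TaskResult<u16> {
    let port = raw
        .trim()
        .parse::<u16>()
        .with_expected_err(|| format!("invalid port: {}", raw))?;
    Ok(port)
}
```

Taking a closure rather than a ready-made message means the error text is only constructed when something actually goes wrong.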

Positional vs keyword parameters

Within the Celery protocol, task parameters can be treated as either args (positional) or kwargs (keyword-based). Both are supported in Rusty Celery, which means you could call the Rust add task defined above from another language like Python in any of the following ways:

celery_app.send_task("add", args=[1, 2])
celery_app.send_task("add", kwargs={"x": 1, "y": 2})
celery_app.send_task("add", args=[1], kwargs={"y": 2})
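The three calls above all resolve to the same pair of values. One way to picture that resolution is the sketch below, which binds positional and keyword values to declared parameter names in order. The function bind_params is hypothetical and is not how Rusty Celery necessarily deserializes messages; it also assumes all parameters are integers and silently ignores unknown keyword names.

```rust
use std::collections::HashMap;

/// Binds positional `args` and named `kwargs` to the declared parameter
/// names, in declaration order. Hypothetical helper, not part of `celery`.
pub fn bind_params(
    names: &[&str],
    args: &[i64],
    kwargs: &HashMap<String, i64>,
) -> Result<Vec<i64>, String> {
    if args.len() > names.len() {
        return Err(format!(
            "expected at most {} arguments, got {}",
            names.len(),
            args.len()
        ));
    }

    let mut bound = Vec::with_capacity(names.len());
    for (i, name) in names.iter().enumerate() {
        // A parameter may be supplied positionally or by keyword, but not both.
        let value = match (args.get(i), kwargs.get(*name)) {
            (Some(_), Some(_)) => return Err(format!("multiple values for `{}`", name)),
            (Some(v), None) | (None, Some(v)) => *v,
            (None, None) => return Err(format!("missing value for `{}`", name)),
        };
        bound.push(value);
    }
    Ok(bound)
}
```

With names set to ["x", "y"], the inputs args=[1, 2], kwargs={"x": 1, "y": 2}, and args=[1] with kwargs={"y": 2} all bind to the same values, while supplying x both positionally and by keyword is rejected.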

Optional parameters

Any parameters that are Option<T> types are automatically treated as optional with a default value of None. For example:

use tokio::time::{self, Duration};

#[celery::task(name = "sleep")]
async fn delay(secs: Option<u64>) {
    let secs = secs.unwrap_or(10);
    time::delay_for(Duration::from_secs(secs)).await;
}

So you could call this task from Python with or without providing a value for secs:

celery_app.send_task("sleep", args=[10])
celery_app.send_task("sleep")
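The defaulting behaviour can be pictured with the small sketch below, in which a parameter that is absent from both args and kwargs resolves to None and the task body then falls back to 10. Both functions are hypothetical helpers written for this illustration, not part of Rusty Celery, and the sketch assumes the parameter is a u64.

```rust
use std::collections::HashMap;

/// Looks up an optional parameter: positional value first, then keyword,
/// otherwise `None`. Hypothetical helper, not part of `celery`.
pub fn optional_param(
    index: usize,
    name: &str,
    args: &[u64],
    kwargs: &HashMap<String, u64>,
) -> Option<u64> {
    args.get(index).or_else(|| kwargs.get(name)).copied()
}

/// Mirrors the fallback in the task body above, without actually sleeping.
pub fn resolve_secs(secs: Option<u64>) -> u64 {
    secs.unwrap_or(10)
}
```

A call with no arguments therefore yields None from the lookup and 10 from the fallback, while an explicit value, positional or keyword, is passed through unchanged.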

Callbacks

You can set custom callbacks to run when a task fails or succeeds through the on_failure and on_success options to the task macro:

use celery::task::Task;
use celery::error::TaskError;
use tokio::time::{self, Duration};

#[celery::task(
    time_limit = 10,
    on_failure = failure_callback,
    on_success = success_callback,
)]
async fn sleep(secs: u64) {
    time::delay_for(Duration::from_secs(secs)).await;
}

async fn failure_callback<T: Task>(task: &T, err: &TaskError) {
    match err {
        TaskError::TimeoutError => println!("Oops! Task {} timed out!", task.name()),
        _ => println!("Hmm task {} failed with {:?}", task.name(), err),
    };
}

async fn success_callback<T: Task>(task: &T, _ret: &T::Returns) {
    println!("{} succeeded!", task.name());
}
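For intuition about when each callback fires, here is a simplified, synchronous sketch of a runner that executes a task and then invokes exactly one of the two callbacks depending on the outcome. The function run_with_callbacks, the helper describe_failure, and the reduced TaskError enum are all hypothetical; the real worker is async and its dispatch logic may differ.

```rust
// Reduced stand-in for illustration, NOT the real `celery` error type.
#[derive(Debug, PartialEq)]
pub enum TaskError {
    TimeoutError,
    UnexpectedError(String),
}

/// Runs `task`, then calls `on_success` or `on_failure` with a reference
/// to the outcome before handing the result back to the caller.
pub fn run_with_callbacks<T, R, S, F>(
    task: T,
    on_success: S,
    on_failure: F,
) -> Result<R, TaskError>
where
    T: FnOnce() -> Result<R, TaskError>,
    S: FnOnce(&R),
    F: FnOnce(&TaskError),
{
    let result = task();
    match &result {
        Ok(ret) => on_success(ret),
        Err(err) => on_failure(err),
    }
    result
}

/// Builds the same messages as the failure callback above, but returns
/// them as strings so they are easy to inspect.
pub fn describe_failure(name: &str, err: &TaskError) -> String {
    match err {
        TaskError::TimeoutError => format!("Oops! Task {} timed out!", name),
        _ => format!("Hmm task {} failed with {:?}", name, err),
    }
}
```

The callbacks only borrow the result, so the runner can still return it to whatever code decides what happens to the task next.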

Summary

In summary, tasks are easily defined by decorating a function with the #[celery::task] macro. If the function returns anything the return type has to be a TaskResult<T>. Internally the function is wrapped in a struct that implements the Task trait.

The quickest way to propagate expected or unexpected errors from within your task is by using .with_expected_err(|| "...")? or .with_unexpected_err(|| "...")?, respectively, on the Result.