Defining Tasks

A task represents a unit of work that a Celery app can produce or consume.

The recommended way to define a task is by decorating a function with the task attribute macro:

use celery::prelude::*; #[celery::task] fn add(x: i32, y: i32) -> TaskResult<i32> { Ok(x + y) }

If the function has a return value the return type must be a TaskResult<T>.

Under the hood a task is just a struct that implements the Task trait. When you decorate a function with the task macro, this creates a struct and implements the Task trait so that Task::run calls the function you've defined.

The macro accepts a number of optional parameters.

For example, to give a task a custom name and set a time limit:

use tokio::time::{self, Duration}; #[celery::task(name = "sleep", time_limit = 5)] async fn delay(secs: u64) { time::delay_for(Duration::from_secs(secs)).await; }

Error handling

When a task executes, i.e. when the Task::run method is called, it returns a TaskResult<T> which is just a Result<T, TaskError>. When an Err(TaskError) is returned, the worker considers the task failed and may send it back to the broker to be retried.

A worker treats certain TaskError variants differently. So when your task has points of failure, such as in the read_some_file example below, you'll need to coerce those possible error types to the appropriate TaskError variant and propogate them upwards:

use celery::prelude::*; #[celery::task] async fn read_some_file() -> TaskResult<String> { tokio::fs::read_to_string("some_file") .await .with_unexpected_err(|| "File does not exist") }

Here tokio::fs::read_to_string("some_file").await produces a tokio::io::Result, so we use the helper method .with_unexpected_err from the TaskResultExt trait to convert this into a TaskError::UnexpectedError and then apply the ? operator to propogate it upwards.

There are two error kinds in particular that are meant as catch-alls for any other type of error that could arise in your task: TaskError::UnexpectedError and TaskError::ExpectedError. The latter should be used for errors that will occasionally happen due to factors outside of your control - such as a third party service being temporarily unavailable - while UnexpectedError should be reserved to indicate a bug or that a critical resource is missing.

Positional vs keyword parameters

Within the Celery protocol task parameters can be treated as either args (positional) or kwargs (key-word based). Both are supported in Rusty Celery, which means you could call the Rust add task defined above from another language like Python in any of the following ways:

celery_app.send_task("add", args=[1, 2]) celery_app.send_task("add", kwargs={"x": 1, "y": 2}) celery_app.send_task("add", args=[1], kwargs={"y": 2})

Optional parameters

Any parameters that are Option<T> types are automatically treated as optional with a default value of None. For example

use tokio::time::{self, Duration}; #[celery::task] async fn delay(secs: Option<u64>) { let secs = secs.unwrap_or(10); time::delay_for(Duration::from_secs(secs)).await; }

So you could call this task from Python with or without providing a value for secs:

celery_app.send_task("sleep", args=[10]) celery_app.send_task("sleep")

Callbacks

You can set custom callbacks to run when a task fails or succeeds through the on_failure and on_success options to the task macro:

use celery::task::Task; use celery::error::TaskError; use tokio::time::{self, Duration}; #[celery::task( time_limit = 10, on_failure = failure_callback, on_success = success_callback, )] async fn sleep(secs: u64) { time::delay_for(Duration::from_secs(secs)).await; } async fn failure_callback<T: Task>(task: &T, err: &TaskError) { match err { TaskError::TimeoutError => println!("Oops! Task {} timed out!", task.name()), _ => println!("Hmm task {} failed with {:?}", task.name(), err), }; } async fn success_callback<T: Task>(task: &T, _ret: &T::Returns) { println!("{} succeeded!", task.name()); }

Summary

In summary, tasks are easily defined by decorating a function with the #[celery::task] macro. If the function returns anything the return type has to be a TaskResult<T>. Internally the function is wrapped in a struct that implements the Task trait.

The quickest way to propogate expected or unexpected errors from within your task is by using .with_expected_err("...")? or .with_unexpected_err("...")?, respectively, on the Result.