Celery Walkthrough

Celery implements a task queue to:

  • execute instructions asynchronously

  • run periodic tasks

  • run cron tasks

…and much more. Celery docs at https://docs.celeryq.dev/

Celery requires a message broker capable of “outsourcing” tasks.

Main Ingredients

  • A message contains the input data and the name of the task.

  • A task contains the executing instructions.

  • A queue stores the messages.

  • The broker fetches messages from the queue and delivers them to workers (the broker is not part of Celery).

  • A worker is the task executor (a thread, VM, container…).

[figure: Celery components]
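These ingredients map onto very little code. A minimal, self-contained sketch (a hypothetical demo app, not web_reflectivity; it assumes a Redis broker listening at redis://localhost:6379):

from celery import Celery

# the broker URL tells Celery where the queue lives (Redis in this sketch)
app = Celery("demo", broker="redis://localhost:6379")

# a task: the executing instructions, registered under a name
@app.task
def add(x, y):
    return x + y

# queue a message carrying the task name and the input data
add.delay(2, 3)  # a worker started with `celery --app demo worker` picks it up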

NOTE: in web_reflectivity, the worker is a CPU thread that delegates work to an external resource, such as the worker Docker container or a compute node in the analysis cluster.

[figure: proxy worker and SSH tunnel to worker]

Use of Celery in web_reflectivity

Celery, in concert with the message broker Redis, is used within the web_reflectivity app to:

  • send fitting jobs to the remote worker (submit_job_to_server())

  • remove expired sessions and associated SSH keys (clean_expired_sessions)

Additional Celery tasks are invoked not as tasks added to the queue but as plain Python functions (see “Tasks invoked as functions” below):

  • establish passwordless SSH tunnels (copy_key_to_server(), delete_key_from_server())

Where does the whole thing start?

The container startup entry point src/docker-entrypoint.sh creates two Celery instances:

celery --app fitting.celery worker --loglevel=${CELERY_LOG_LEVEL} --logfile=${CELERY_LOG_PATH} --detach
celery --app fitting.celery beat --scheduler django_celery_beat.schedulers:DatabaseScheduler --loglevel=${CELERY_LOG_LEVEL} --logfile=${CELERY_LOG_PATH} --detach
  • fitting.celery is the startup module

  • celery worker instantiates the task queue where new tasks can be added

  • celery beat instantiates the task scheduler, which stores periodic tasks and cron tasks to run at specific times.

[figure: celery worker and celery beat]

Most of the content of the startup module src/fitting/celery.py is boilerplate code:

import os

from celery import Celery

app = Celery("web_reflectivity")

# point Django to the settings module before loading the configuration
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "web_reflectivity.settings.develop")
# read every settings attribute prefixed with "CELERY_"
app.config_from_object("django.conf:settings", namespace="CELERY")

# scan the installed apps for tasks.py files (see below)
app.autodiscover_tasks()

Configuration discovery is accomplished by parsing the settings attribute of module django.conf, which points to src/web_reflectivity/settings/develop.py and src/web_reflectivity/settings/base.py:

#####
# CELERY CONFIGURATION
#####
CELERY_RESULT_BACKEND = "django-db"
CELERY_BROKER_URL = "redis://redis:6379"
CELERY_TASK_SERIALIZER = "pickle"
CELERY_ACCEPT_CONTENT = ["pickle"]

Task discovery is accomplished by scanning the source code of web_reflectivity and any of its “installed apps”, i.e. other Django apps included as dependencies.

[figure: autodiscover tasks]

For every installed app, Celery will check whether the app’s source contains a “tasks.py” file. If so, it will parse the file searching for task functions.
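For instance, a file like the following inside an installed app would be picked up automatically (hypothetical app and task names):

# myapp/tasks.py -- found by app.autodiscover_tasks()
from celery import shared_task

@shared_task
def my_discovered_task():
    pass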

Periodic Tasks

Celery offers flexible scheduling for task creation:

  • happening at regular intervals

  • happening at a specific time of day on certain days of the week (crontab)

Scheduling can happen at compile time or at runtime.

Creating Tasks at Compile time

Scheduling at compile time is defined in the CELERY_BEAT_SCHEDULE setting, in src/web_reflectivity/settings/base.py:

CELERY_BEAT_SCHEDULE = {
    "clean-expired-sessions": {
        "task": "users.tasks.clean_expired_sessions",
        "schedule": SESSION_COOKIE_AGE,
    },
}

Task users.tasks.clean_expired_sessions runs every SESSION_COOKIE_AGE seconds. It cleans browser sessions that have had no activity for SESSION_COOKIE_AGE seconds or more.
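For the crontab-style scheduling mentioned earlier, the schedule value can be a crontab object instead of a number of seconds. A hypothetical entry (not part of web_reflectivity):

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "weekly-report": {  # hypothetical task name
        "task": "users.tasks.send_weekly_report",
        "schedule": crontab(minute=30, hour=7, day_of_week=1),  # Mondays at 07:30
    },
}

Returning to clean_expired_sessions, the task itself is a plain Python function decorated with @shared_task: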

from celery import shared_task

@shared_task
def clean_expired_sessions() -> None:
    # ..body of the function..

The @shared_task decorator ensures the task is made available to every Celery instance (web_reflectivity has two). Making a task available only to a specific Celery instance requires decorating it with the task attribute of that instance. One (hopefully) clarifying example:

# Two Celery instances initialized in myapp/celery.py
from celery import Celery

app1 = Celery("web_reflectivity")
app2 = Celery("web_reflectivity")

# Two tasks defined in myapp/tasks.py
from celery import shared_task
from myapp.celery import app1

@app1.task
def task_specific():
    pass  # task_specific is made available only to app1

@shared_task
def task_general():
    pass  # task_general is made available to app1 and app2

Notice that shared tasks require the Celery instances to be instantiated before the myapp/tasks.py file is interpreted, as well as imported into the namespace of myapp. This is accomplished with boilerplate code in myapp/__init__.py:

from .celery import app as celery_app
__all__ = ["celery_app"]

The same boilerplate code is in src/fitting/__init__.py.

Creating Tasks at Runtime

The django_celery_beat dependency stores tasks in the app’s database and exposes them in the admin site. Besides viewing them, the app admin can edit them, as well as create new periodic tasks from any of the registered tasks.
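The same can be done programmatically through the models exposed by django_celery_beat. A minimal sketch (hypothetical schedule and display name):

from django_celery_beat.models import IntervalSchedule, PeriodicTask

# create (or reuse) a schedule that fires every hour
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=1,
    period=IntervalSchedule.HOURS,
)

# register a new periodic task backed by an already-registered task function
PeriodicTask.objects.create(
    interval=schedule,
    name="hourly session cleanup",  # display name shown in the admin site
    task="users.tasks.clean_expired_sessions",
)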

The scheduler is specified when the Celery instance is created:

celery --app fitting.celery beat --scheduler django_celery_beat.schedulers:DatabaseScheduler --loglevel=${CELERY_LOG_LEVEL} --logfile=${CELERY_LOG_PATH} --detach

The vanilla scheduler stores the scheduled tasks in a separate file instead, which is more appropriate when we are not supposed to edit them or schedule new tasks at runtime.

Checking Executed Tasks

The django_celery_results dependency collects pieces of information from executed tasks (e.g. the return value), stores them in the database, and exposes them in the admin site. Useful for debugging.
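The stored records can also be queried directly through the TaskResult model, e.g. from a Django shell. A sketch:

from django_celery_results.models import TaskResult

# inspect the ten most recent failed tasks
for result in TaskResult.objects.filter(status="FAILURE").order_by("-date_done")[:10]:
    print(result.task_name, result.result)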

Also, printed and logged messages are redirected to the log file /var/log/celery.log in the filesystem of the container running the web service. In web_reflectivity, directory /var/log is bind-mounted to directory /tmp/log/web_reflectivity/web of the host machine.
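Inside a task, the idiomatic way to write to that log file is Celery's task logger rather than bare print statements. A sketch:

from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@shared_task
def my_task():
    logger.info("this message ends up in /var/log/celery.log")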

One-off Tasks

Tasks to be run once, in asynchronous mode, are invoked through their delay() method:

# in myapp/tasks.py
from celery import shared_task

@shared_task
def my_task(greeting, target="World"):
    print(f"{greeting}, {target}!")

# in myapp/views.py
from myapp.tasks import my_task
# queue the task for asynchronous execution with `delay`
my_task.delay("Hello")  # a worker will print "Hello, World!"

In web_reflectivity, django_remote_submission.task.submit_job_to_server is the only task invoked in this fashion.

submit_job_to_server.delay(
    job_pk=job.pk,
    key_filename=key_filename,
    username=username,
    log_policy=LogPolicy.LOG_TOTAL,
    store_results="",
    remote=(not settings.JOB_HANDLING_HOST == "localhost"),
)

Notice that argument job_pk passed to submit_job_to_server() is the table index (primary key) of the database row storing the state of an instance of class django_remote_submission.models.Job.

When passing information to a task (see the sketch after this list):

  • pass the python object if:

    • you want the task to use the state of the object at task creation.

    • the selected serializer (pickle) can serialize the object.

  • pass the table index if:

    • you want the task to use the state of the object at task execution.

    • the worker has access to the database.
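A sketch contrasting the two options (the task names are hypothetical; Job is the model mentioned above):

from celery import shared_task
from django_remote_submission.models import Job

@shared_task
def process_job(job):
    ...  # sees the state `job` had when the message was queued (pickled copy)

@shared_task
def process_job_by_pk(job_pk):
    job = Job.objects.get(pk=job_pk)  # re-read from the database: state at execution time
    ...

process_job.delay(job)           # pass the python object
process_job_by_pk.delay(job.pk)  # pass the table index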

Tasks invoked as functions

Tasks invoked as functions run in the main thread (synchronous mode). Functions decorated with Celery-related decorators can still be called as plain Python functions:

# in myapp/tasks.py
from celery import shared_task

@shared_task
def my_task(greeting, target="World"):
    print(f"{greeting}, {target}!")

# in myapp/views.py
from myapp.tasks import my_task
# call the task synchronously, as a plain Python function
my_task("Hello")  # prints "Hello, World!" in the main thread

In web_reflectivity, django_remote_submission.task.copy_key_to_server and django_remote_submission.task.delete_key_from_server are the only tasks invoked in this fashion.

delete_key_from_server(
    public_key_filename=idfile.public,
    username=idfile.executor,
    password=None,
    key_filename=idfile.private,
    hostname=settings.JOB_HANDLING_HOST,
    port=settings.JOB_HANDLING_PORT,
    remote=True,
)
idfile.delete()

Notice that attributes of idfile are passed to delete_key_from_server(), so it needs to run before idfile is deleted. Running delete_key_from_server() synchronously, on the same thread, guarantees this ordering.