Celery Walkthrough
Celery implements a Task queue to:
execute instructions asynchronously
run periodic tasks
run cron tasks
…and much more. Celery docs at https://docs.celeryq.dev/
Celery requires a message broker capable of “outsourcing” tasks.
Main Ingredients
message has input data and name of the task.
task has executing instructions.
queue stores the messages.
broker fetches messages from the queue and delivers them to workers (the broker is a separate service, not part of Celery)
worker is the task executor (thread, VM, container…)
NOTE: in web_reflectivity, the worker is a CPU thread that delegates work to an external resource, such as the worker docker container or a compute node in the analysis cluster.
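The interplay of these ingredients can be sketched in pure Python, with a queue.Queue standing in for the broker's queue and a thread standing in for the worker. All names below are illustrative stand-ins, not Celery API:

```python
import queue
import threading

# Hypothetical stand-ins for the ingredients above; none of this is Celery API.
task_registry = {"add": lambda x, y: x + y}  # task: the executing instructions
message_queue = queue.Queue()                # queue: stores the messages
results = []

def worker():
    # broker role: fetch a message from the queue, deliver it to the worker
    task_name, input_data = message_queue.get()
    results.append(task_registry[task_name](*input_data))

# message: the name of the task plus its input data
message_queue.put(("add", (2, 3)))

t = threading.Thread(target=worker)  # worker: the task executor
t.start()
t.join()
print(results)  # → [5]
```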
Use of Celery in web_reflectivity
Celery, in concert with the Redis message broker, is used within app web_reflectivity to:
send fitting jobs to the remote worker (submit_job_to_server())
remove expired sessions and associated SSH keys (clean_expired_sessions())
Additional Celery tasks are invoked not as tasks added to the queue but as pure Python functions:
establish passwordless SSH tunnels (copy_key_to_server(), delete_key_from_server())
Where does the whole thing start?
The container entry point src/docker-entrypoint.sh starts two Celery processes:
celery --app fitting.celery worker --loglevel=${CELERY_LOG_LEVEL} --logfile=${CELERY_LOG_PATH} --detach
celery --app fitting.celery beat --scheduler django_celery_beat.schedulers:DatabaseScheduler --loglevel=${CELERY_LOG_LEVEL} --logfile=${CELERY_LOG_PATH} --detach
fitting.celery is the startup module
celery worker instantiates the task queue where new tasks can be added
celery beat instantiates the task scheduler, which stores periodic tasks and cron tasks to run at specific times.
Most of the contents of the startup module src/fitting/celery.py is boilerplate code:
app = Celery("web_reflectivity")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "web_reflectivity.settings.develop")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
Configuration discovery is here accomplished by parsing the settings attribute of module django.conf, which points to src/web_reflectivity/settings/develop.py and src/web_reflectivity/settings/base.py:
#####
# CELERY CONFIGURATION
#####
CELERY_RESULT_BACKEND = "django-db"
CELERY_BROKER_URL = "redis://redis:6379"
CELERY_TASK_SERIALIZER = "pickle"
CELERY_ACCEPT_CONTENT = ["pickle"]
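The choice of the pickle serializer matters because task arguments travel through the broker as serialized messages. A quick sketch of the difference from Celery's default json serializer (the datetime argument is just an illustration):

```python
import json
import pickle
from datetime import datetime

# Hypothetical task argument containing a type json cannot handle
args = {"submitted": datetime(2024, 1, 1)}

# pickle round-trips arbitrary Python objects...
restored = pickle.loads(pickle.dumps(args))
print(restored["submitted"].year)  # → 2024

# ...while a json serializer rejects them outright
try:
    json.dumps(args)
except TypeError as err:
    print("json cannot serialize:", err)
```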
Task discovery is here accomplished by scanning the source code of web_reflectivity and any of its “installed apps”, i.e. other Django apps installed as dependencies.
For every installed app, Celery checks whether the app’s source contains a “tasks.py” file. If so, it parses the file searching for task functions.
Periodic Tasks
Celery offers flexible scheduling of periodic tasks:
tasks happening at regular intervals
tasks happening at a specific time of day on certain days of the week (crontab)
Scheduling can happen at compile time or at runtime.
Creating Tasks at Compile time
Scheduling at compile time is defined in the CELERY_BEAT_SCHEDULE setting, in src/web_reflectivity/settings/base.py:
CELERY_BEAT_SCHEDULE = {
"clean-expired-sessions": {
"task": "users.tasks.clean_expired_sessions",
"schedule": SESSION_COOKIE_AGE,
},
}
task users.tasks.clean_expired_sessions runs every SESSION_COOKIE_AGE seconds. It cleans browser sessions that have had no activity for SESSION_COOKIE_AGE seconds or more.
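For cron-style scheduling instead of a fixed interval, the schedule value can be a crontab object. A hypothetical variant, not present in web_reflectivity:

```python
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "clean-expired-sessions": {
        "task": "users.tasks.clean_expired_sessions",
        # hypothetical: run at 03:30 every Monday, rather than every
        # SESSION_COOKIE_AGE seconds
        "schedule": crontab(hour=3, minute=30, day_of_week=1),
    },
}
```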
@shared_task
def clean_expired_sessions() -> None:
# ..body of the function..
The @shared_task decorator makes the task available to every Celery instance (web_reflectivity has two). To make a task available only to a specific Celery instance, decorate it with the task attribute of that instance. One (hopefully) clarifying example:
# Two Celery instances initialized in myapp/celery.py
app1 = Celery("web_reflectivity")
app2 = Celery("web_reflectivity")
# Two tasks defined in myapp/tasks.py
from myapp.celery import app1
@app1.task
def task_specific():
pass # task_specific is made available only to app1
@shared_task
def task_general():
pass # task_general is made available to app1 and app2
Notice that shared tasks require the Celery instances to be instantiated before the myapp/tasks.py file is interpreted, as well as imported into the namespace of myapp. This is accomplished with boilerplate code in myapp/__init__.py:
from .celery import app as celery_app
__all__ = ["celery_app"]
The same boilerplate code is in src/fitting/__init__.py
Creating Tasks at Runtime
Dependency django_celery_beat stores tasks in the app’s database and exposes them in the admin site. Besides viewing them, the app admin can edit them, as well as create new scheduled tasks out of any of the registered tasks.
The scheduler is specified when the Celery instance is created:
celery --app fitting.celery beat --scheduler django_celery_beat.schedulers:DatabaseScheduler --loglevel=${CELERY_LOG_LEVEL} --logfile=${CELERY_LOG_PATH} --detach
The vanilla scheduler stores the scheduled tasks in a separate file, which is more appropriate when we are not supposed to edit them nor schedule new tasks.
Checking Executed Tasks
Dependency django_celery_results collects information from executed tasks (e.g. the return value), stores it in the database, and exposes it in the admin site. Useful for debugging.
Also, printed and logged messages are redirected to log file /var/log/celery.log in the filesystem of the container running the web service. In web_reflectivity, directory /var/log is bind-mounted to directory /tmp/log/web_reflectivity/web of the host machine.
One-off Tasks
Tasks to be run once in asynchronous mode are invoked with the delay() method:
# in myapp/tasks.py
@shared_task
def my_task(greeting, target="World"):
print(f"{greeting}, {target}!")
# in myapp/views.py
from myapp.tasks import my_task
# queue the task for asynchronous execution with the `delay` attribute
my_task.delay("Hello") # will print "Hello, World!"
In web_reflectivity, django_remote_submission.task.submit_job_to_server is the only task invoked in this fashion.
submit_job_to_server.delay(
job_pk=job.pk,
key_filename=key_filename,
username=username,
log_policy=LogPolicy.LOG_TOTAL,
store_results="",
remote=(not settings.JOB_HANDLING_HOST == "localhost"),
)
Notice that argument job_pk of submit_job_to_server() is the primary key of the database record storing the state of an instance of class django_remote_submission.models.Job.
When passing information to a task:
pass the Python object if:
you want the task to use the state of the object at task creation, and
the selected serializer (pickle) can serialize the object.
pass the table index (primary key) if:
you want the task to use the state of the object at task execution, and
the worker has access to the database.
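The difference between the two choices can be sketched with pickle and a plain dict standing in for the database table (all names here are illustrative):

```python
import pickle

# Stand-in "database table"; in Django this would be the Job model's table.
table = {1: {"name": "fit-job", "status": "queued"}}

# Passing the object: its state is serialized into the message at task creation.
message_payload = pickle.dumps(table[1])

# Passing the table index: only the key travels in the message.
job_pk = 1

# The state changes between task creation and task execution...
table[1]["status"] = "running"

# ...so the object carries the creation-time state,
# while the index looks up the execution-time state.
print(pickle.loads(message_payload)["status"])  # → queued
print(table[job_pk]["status"])                  # → running
```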
Tasks invoked as functions
Tasks invoked as functions run in the main thread (synchronous mode). Functions decorated with Celery-related decorators can still be called as pure Python functions.
# in myapp/tasks.py
@shared_task
def my_task(greeting, target="World"):
print(f"{greeting}, {target}!")
# in myapp/views.py
from myapp.tasks import my_task
# call the task synchronously, as a plain Python function
my_task("Hello") # will print "Hello, World!"
In web_reflectivity, django_remote_submission.task.copy_key_to_server and django_remote_submission.task.delete_key_from_server are the only tasks invoked in this fashion.
delete_key_from_server(
public_key_filename=idfile.public,
username=idfile.executor,
password=None,
key_filename=idfile.private,
hostname=settings.JOB_HANDLING_HOST,
port=settings.JOB_HANDLING_PORT,
remote=True,
)
idfile.delete()
Notice that attributes of idfile are passed to delete_key_from_server(), so it needs to run before idfile is deleted. This is guaranteed because delete_key_from_server() runs on the same thread.