Fernando Alves

Using celery with multiple queues, retries and scheduled tasks

Celery

On this post, I’ll show how to work with multiple queues, scheduled tasks, and retry when something goes wrong.

If you don’t know how to use celery, read this post first: http://3.230.47.72/executing-time-consuming-tasks-asynchronously-with-django-and-celery/

Retrying a task

Let’s say your task depends on an external API or connects to another web service and for any reason, it’s raising a ConnectionError, for instance. It’s plausible to think that after a few seconds the API, web service, or anything you are using may be back on track and working again. In this cases, you may want to catch an exception and retry your task.


from celery import shared_task
 
@shared_task(bind=True, max_retries=3)  # you can determine the max_retries here
def access_awful_system(self, my_obj_id):
    from core.models import Object
    from requests import ConnectionError
 
    o = Object.objects.get(pk=my_obj_id)
 
    # If ConnectionError try again in 180 seconds
    try:
 
        o.access_awful_system()
  
    except ConnectionError as exc:
        self.retry(exc=exc, countdown=180)  # the task goes back to the queue

The self.retry inside a function is what’s interesting here. That’s possible thanks to bind=True on the shared_task decorator. It turns our function access_awful_system into a method of Task class. And it forced us to use self as the first argument of the function too.

Another nice way to retry a function is using exponential backoff:

self.retry(exc=exc, countdown=2 ** self.request.retries)

ETA – Scheduling a task for later

Now, imagine that your application has to call an asynchronous task, but need to wait one hour until running it.

In this case, we just need to call the task using the ETA(estimated time of arrival)  property and it means your task will be executed any time after ETA. To be precise not exactly in ETA time because it will depend if there are workers available at that time. If you want to schedule tasks exactly as you do in crontab, you may want to take a look at CeleryBeat).


from django.utils import timezone
from datetime import timedelta

now = timezone.now() 
 
# later is one hour from now
later = now + timedelta(hours=1)

access_awful_system.apply_async((object_id), eta=later) 

Using more queues

When you execute celery, it creates a queue on your broker (in the last blog post it was RabbitMQ). If you have a few asynchronous tasks and you use just the celery default queue, all tasks will be going to the same queue.

Suppose that we have another task called too_long_task and one more called quick_task and imagine that we have one single queue and four workers.

In that scenario, imagine if the producer sends ten messages to the queue to be executed by too_long_task and right after that, it produces ten more messages to quick_task. What is going to happen? All your workers may be occupied executing too_long_task that went first on the queue and you don’t have workers on quick_task.

The solution for this is routing each task using named queues.


# CELERY ROUTES
CELERY_ROUTES = {
    'core.tasks.too_long_task': {'queue': 'too_long_queue'},
    'core.tasks.quick_task': {'queue': 'quick_queue'},
}

Now we can split the workers, determining which queue they will be consuming.


# For too long queue
celery --app=proj_name worker -Q too_long_queue -c 2

# For quick queue
celery --app=proj_name worker -Q quick_queue -c 2

I’m using 2 workers for each queue, but it depends on your system.

As, in the last post, you may want to run it on Supervisord

There is a lot of interesting things to do with your workers here.

Calling Sequential Tasks

Another common issue is having to call two asynchronous tasks one after the other. It can happen in a lot of scenarios, e.g. if the second tasks use the first task as a parameter.

You can use chain to do that


from celery import chain
from tasks import first_task, second_task
 
chain(first_task.s(meu_objeto_id) | second_task.s())

The chain is a task too, so you can use parameters on apply_async, for instance, using an ETA:


chain(salvar_dados.s(meu_objeto_id) | trabalhar_dados.s()).apply_async(eta=depois)

Ignoring the results from ResultBackend

If you just use tasks to execute something that doesn’t need the return from the task you can ignore the results and improve your performance.

If you’re just saving something on your models, you’d like to use this in your settings.py:


CELERY_IGNORE_RESULT = True

 

Sources:

http://docs.celeryproject.org/en/latest/userguide/tasks.html

http://docs.celeryproject.org/en/latest/userguide/optimizing.html#guide-optimizing

https://denibertovic.com/posts/celery-best-practices/

https://news.ycombinator.com/item?id=7909201

http://docs.celeryproject.org/en/latest/userguide/workers.html

http://docs.celeryproject.org/en/latest/userguide/canvas.html

 

Super Bônus

Celery Messaging at Scale at Instagram – Pycon 2013

Sair da versão mobile