🐇 Celery – Tâches Asynchrones (Django, RabbitMQ & Redis)
Guide opérationnel complet pour débutants sur l'installation des brokers et l'intégration Django.
Vue d'ensemble
Qu'est-ce que Celery ? Tâches asynchrones (hors-requête).
Async Task QueueArchitecture
Client (Django) -> Broker (Queue) -> Worker (Celery).
Client Broker WorkerConcept : Broker (File d'attente)
RabbitMQ (AMQP) vs Redis. Le "facteur".
Broker RabbitMQ RedisInstall: RabbitMQ (Linux)
apt/dnf, rabbitmqctl add_user, set_permissions.
Install: Redis (Linux)
apt/dnf, redis-server. Simple et rapide.
Installation Celery (Python)
pip install celery, pip install redis, librabbitmq.
Setup Django (celery.py)
Configuration : celery.py, __init__.py, settings.py.
Tâches Django (tasks.py)
@shared_task, appel depuis une view.
Lancer un Worker
celery -A [proj] worker -l info.
Appeler des Tâches
.delay() (simple) vs .apply_async() (avancé).
Result Backend (Django)
Stocker les résultats (django-celery-results).
Récupérer les Résultats
AsyncResult(id), task.state, task.result.
Tâches Planifiées (Beat)
django-celery-beat, CELERY_BEAT_SCHEDULE.
Lancer Worker & Beat (Prod)
Deux services systemd séparés (worker, beat).
Monitoring (Flower)
Interface web de monitoring (celery flower).
Bonnes Pratiques (Retries)
autoretry_for, retry_kwargs, bind=True.
Cheat-sheet Celery
celery worker, beat, flower, inspect.
Qu'est-ce que Celery ?
Celery est une **file d'attente de tâches (Task Queue) asynchrone** pour Python. Son but est de prendre en charge des tâches longues ou lourdes *en dehors* du cycle de requête-réponse normal de votre application web (ex: Django, Flask).
Problème : Un utilisateur demande un rapport qui prend 30 secondes à générer. Le navigateur de l'utilisateur attend et finit par "timeout".
Solution Celery : L'application web (Django) dit à Celery : "Génère ce rapport" (cela prend 10ms) et renvoie "OK, rapport en cours" à l'utilisateur. Un processus séparé (Worker Celery) prend la tâche et l'exécute en arrière-plan.
Cas d'usage
- Envoi d'e-mails : (ex: e-mail de bienvenue, réinitialisation de mot de passe).
- Traitement d'images/vidéos : (Génération de miniatures, encodage).
- Appels API externes : (Contacter une API tierce lente).
- Tâches planifiées (Cron) : (Nettoyage de la BDD tous les soirs à minuit).
- Calculs lourds : (Génération de PDF, rapports complexes).
Les 3 Composants Clés
- Client (Producteur) : Votre application (ex: Django) qui *demande* l'exécution d'une tâche.
- Broker (File d'attente) : Le "facteur". Un service (ex: RabbitMQ, Redis) qui reçoit les messages (tâches) et les stocke dans une file d'attente.
- Worker (Consommateur) : Le processus Celery (un script Python séparé) qui surveille le Broker, prend les tâches, et les exécute.
(Optionnel) Result Backend : Une BDD (ex: Redis, Django DB) où le Worker écrit le résultat (ex: "SUCCÈS", ou l'erreur).
Schéma de flux
[Image d'une architecture Celery]
(HTTP)
[Utilisateur] <-----------------> [Django / FastAPI (Client)]
|
| 1. .delay()
| (Envoie Tâche: "envoyer_email(to: 'a@b.c')")
|
▼
+-------------------------------------------------------------+
| Broker (ex: RabbitMQ ou Redis) |
| [ Queue: "emails" | Tâche | Tâche | ... ] |
+-------------------------------------------------------------+
^ |
| (Surveille la queue) | 2. (Optionnel) Écrit le résultat
| |
+----------------------+ |
| Worker (Celery) | |
| (Processus Python) |------------------+
| (Exécute la tâche) |
+----------------------+
Le "Broker" (courtier) est le cœur de Celery. C'est un service tiers qui gère les files d'attente de messages. Celery ne stocke *pas* les tâches lui-même, il les envoie au Broker.
| Broker | Protocole | Avantages | Inconvénients | Usage |
|---|---|---|---|---|
| RabbitMQ (Recommandé) | AMQP | Robuste, fiable (garantit la livraison), routage complexe. | Plus lourd à installer/gérer. | Production (si les tâches ne *doivent* pas être perdues). |
| Redis | (Custom) | Léger, rapide (en mémoire), très facile à installer. | Moins de garanties (si le serveur crashe), routage simple. | Développement, Tâches non-critiques, Tâches rapides. |
Ubuntu / Debian
sudo apt-get update sudo apt-get install -y rabbitmq-server
RHEL / Rocky / AlmaLinux
# (Nécessite le dépôt EPEL) sudo dnf install -y rabbitmq-server
Démarrage
sudo systemctl enable --now rabbitmq-server sudo systemctl status rabbitmq-server
Configuration (rabbitmqctl)
Ne **pas** utiliser l'utilisateur guest (qui ne marche que sur localhost). Créez un utilisateur et un "vhost" (virtual host) dédiés à votre projet.
# 1. Créer un utilisateur (ex: ideo_user) sudo rabbitmqctl add_user ideo_user "mot_de_passe_solide" # 2. Créer un Virtual Host (un "silo" pour votre projet) sudo rabbitmqctl add_vhost ideo_vhost # 3. Donner les permissions à l'utilisateur SUR le vhost # (Configure, Write, Read) sudo rabbitmqctl set_permissions -p ideo_vhost ideo_user ".*" ".*" ".*" # (Optionnel) Rendre l'utilisateur 'admin' (pour l'UI) sudo rabbitmqctl set_user_tags ideo_user administrator
URL du Broker (pour Celery) : amqp://ideo_user:mot_de_passe_solide@localhost:5672/ideo_vhost
Installation (Simple)
Redis est une base de données Clé-Valeur en mémoire, souvent utilisée pour le cache, les sessions, et comme Broker Celery simple.
Ubuntu / Debian
sudo apt update sudo apt install -y redis-server
RHEL / Rocky / AlmaLinux
sudo dnf install -y redis sudo systemctl enable --now redis
Vérification
# Tester la connexion redis-cli ping # PONG
Par défaut, Redis écoute sur localhost:6379 sans mot de passe.
URL du Broker (pour Celery) : redis://localhost:6379/0
(/0 est la base de données n°0).
1. pip install celery
Installez Celery dans votre environnement virtuel Python (venv).
(venv) $ pip install celery
2. Installer les dépendances du Broker
Vous devez aussi installer le "connecteur" (driver) pour votre Broker.
Pour Redis :
pip install redis
Pour RabbitMQ (AMQP) :
pip install librabbitmq # (ou 'pip install "celery[librabbitmq]"')
mon_projet/settings.py
Ajoutez la configuration du Broker (et du Backend, voir 3.1) ici.
# mon_projet/settings.py # --- CELERY SETTINGS --- # (Utiliser Redis comme Broker) CELERY_BROKER_URL = "redis://localhost:6379/0" # (Utiliser RabbitMQ comme Broker) # CELERY_BROKER_URL = "amqp://ideo_user:mot_de_passe_solide@localhost:5672/ideo_vhost" # (Format des messages) CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json'
mon_projet/celery.py (Nouveau fichier)
Créez ce fichier au même niveau que settings.py. C'est l'instance de l'application Celery.
import os
from celery import Celery
# 1. Définit le module 'settings' de Django pour Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mon_projet.settings')
app = Celery('mon_projet')
# 2. Charge la config depuis 'settings.py' (ex: CELERY_BROKER_URL)
app.config_from_object('django.conf:settings', namespace='CELERY')
# 3. Auto-découvre les 'tasks.py' dans les apps Django
app.autodiscover_tasks()mon_projet/__init__.py
Modifiez ce fichier pour vous assurer que l'app Celery est chargée quand Django démarre.
# mon_projet/__init__.py
# (Importer l'app Celery)
from .celery import app as celery_app
__all__ = ('celery_app',)mon_app/tasks.py (Nouveau fichier)
Créez un fichier tasks.py dans votre app Django (ex: core/tasks.py). Celery (via autodiscover_tasks()) le trouvera.
Utilisez @shared_task pour créer une tâche qui utilise l'app Celery de Django.
from celery import shared_task
import time
@shared_task
def send_welcome_email(user_email, user_name):
"""
Tâche (lente) qui envoie un email.
"""
print(f"Début de l'envoi d'email à {user_name} ({user_email})...")
# (Simulation d'un appel API SMTP lent)
time.sleep(10)
print("Email envoyé !")
return f"Email envoyé à {user_email}"
@shared_task
def process_heavy_report(report_id):
# (Logique métier... ex: Article.objects.get(id=report_id))
print(f"Traitement du rapport {report_id}...")
time.sleep(30)
print(f"Rapport {report_id} terminé.")
return "OK"mon_app/views.py (Appeler la tâche)
Dans votre vue (ou n'importe où), importez la tâche et utilisez .delay().
from django.http import HttpResponse
from django.shortcuts import redirect
from .tasks import send_welcome_email
def register_user(request):
# (Logique de création d'utilisateur...)
# ...
# user = User.objects.create(...)
# 1. La vue NE bloque PAS ici.
# La tâche est envoyée au Broker (Redis/RabbitMQ).
send_welcome_email.delay(user.email, user.first_name)
# 2. L'utilisateur reçoit la réponse immédiate.
return redirect('home')La commande worker
Un "Worker" est un processus (Python) que vous devez lancer *séparément* de Django (runserver).
Ouvrez un **nouveau terminal**, activez votre venv, et lancez :
# Syntaxe: celery -A [nom_projet] worker --loglevel=info (venv) $ celery -A mon_projet worker --loglevel=info
Vous verrez le worker démarrer, se connecter au Broker (Redis/RabbitMQ) et lister les tâches qu'il a découvertes (ex: send_welcome_email).
Sortie (Log)
[2025-11-02 18:00:00] celery@mon-serveur ready.
[2025-11-02 18:00:00] ...
[2025-11-02 18:00:00] Tasks:
- mon_app.tasks.send_welcome_email
- mon_app.tasks.process_heavy_report
...
[2025-11-02 18:01:10] Received task: send_welcome_email[abc-123]
[2025-11-02 18:01:20] Task send_welcome_email[abc-123] succeeded in 10s.delay vs .apply_async).delay() (Raccourci)
Le plus simple. Exécute la tâche dès que possible.
from .tasks import send_welcome_email
send_welcome_email.delay("user@mail.com", "Alice")C'est un raccourci pour :
send_welcome_email.apply_async(
args=["user@mail.com", "Alice"]
).apply_async() (Avancé)
Permet de spécifier des options d'exécution (tâches différées, routage...).
from datetime import timedelta
# Lancer dans 1 minute
send_welcome_email.apply_async(
args=["user@mail.com", "Alice"],
countdown=60
)
# Lancer demain à 9h00
send_welcome_email.apply_async(
args=["user@mail.com", "Alice"],
eta=datetime(2025, 11, 3, 9, 0, 0)
)
# Spécifier une file (Queue) (pour routage)
process_heavy_report.apply_async(
args=[123],
queue='heavy_reports'
)django-celery-results)Problème : Comment savoir si une tâche a réussi, échoué, ou quel est son résultat ?
Solution : Configurer un "Result Backend" (base de données de résultats).
django-celery-results est l'addon officiel pour stocker les résultats directement dans votre base de données Django (recommandé).
1. Installation
(venv) $ pip install django-celery-results
2. settings.py
# mon_projet/settings.py
INSTALLED_APPS = [
...
'django_celery_results',
]
# Définir le backend (utiliser 'django-db')
CELERY_RESULT_BACKEND = 'django-db'
# (ou 'django-cache' si vous utilisez le cache Django)3. migrate
Crée les tables celery_taskmeta et celery_groupmeta.
python manage.py migrate django_celery_results
Désormais, les résultats des tâches (ex: return "OK") seront stockés en BDD.
AsyncResult
Quand vous appelez .delay() ou .apply_async(), Celery vous retourne un objet AsyncResult. Cet objet contient le task_id.
Exemple (views.py)
from celery.result import AsyncResult
from .tasks import process_heavy_report
def start_report(request):
# 1. Lancer la tâche et récupérer l'ID
task = process_heavy_report.delay(123)
# 2. Stocker l'ID (ex: en session, ou BDD)
request.session['report_task_id'] = task.id
return redirect('check_report_status')
def check_report_status(request):
task_id = request.session.get('report_task_id')
# 3. Récupérer le résultat depuis le Backend
task_result = AsyncResult(task_id)
return render(request, 'status.html', {
'status': task_result.state, // PENDING, SUCCESS, FAILURE
'result': task_result.result, // Le 'return' (si SUCCESS)
})États (task.state)
| État | Description |
|---|---|
PENDING | La tâche est dans la file d'attente (Broker), pas encore prise. |
STARTED | La tâche a été prise par un Worker. |
SUCCESS | Tâche terminée (task.result contient le retour). |
FAILURE | Tâche échouée (task.result contient l'Exception). |
RETRY | La tâche a échoué et va être ré-essayée. |
Celery Beat (Le Planificateur)
Beat est un service (planificateur) qui lance des tâches à intervalles réguliers (ex: "toutes les 5 minutes", "tous les lundis à 9h"). C'est le "Cron" de Celery.
Il ne fait *pas* le travail. Il se contente de "dispatcher" les tâches dans le Broker (RabbitMQ/Redis) à l'heure dite. Les Workers (cf 2.3) font le travail.
Setup (django-celery-beat)
Le meilleur moyen d'utiliser Beat avec Django est d'utiliser l'addon qui stocke le planning (schedule) dans la BDD Django.
1. Installation
(venv) $ pip install django-celery-beat
2. settings.py
# mon_projet/settings.py
INSTALLED_APPS = [
...
'django_celery_results',
'django_celery_beat', // Ajouter Beat
]
# Utiliser le planificateur BDD de Beat
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'3. migrate
python manage.py migrate django_celery_beat
(Ceci crée les tables (django_periodictask...) dans l'Admin Django pour gérer les tâches planifiées via l'UI).
Alternative : Configuration Statique (settings.py)
Vous pouvez aussi définir le planning directement dans settings.py.
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
# Nom de la tâche
'nettoyer-sessions-chaque-nuit': {
# La tâche (ex: mon_app.tasks.cleanup)
'task': 'mon_app.tasks.cleanup_sessions',
# Le planning (crontab: min, h, jour/semaine)
# (Ici, tous les jours à 4h05 du matin)
'schedule': crontab(minute='5', hour='4', day_of_week='*'),
# (Arguments)
'args': (True,),
},
'check-api-toutes-les-5-minutes': {
'task': 'mon_app.tasks.check_api_status',
# (Toutes les 5 minutes)
'schedule': crontab(minute='*/5'),
},
}Deux services, pas un
En production, vous devez lancer **deux** services Celery (en plus de Django/Gunicorn) :
- Le Worker : Le "travailleur" (
celery worker). - Le Beat : Le "planificateur" (
celery beat).
On les gère (sous Linux) avec systemd (ou Supervisor).
/etc/systemd/system/celery_worker.service
[Unit]
Description=Celery Worker for Django
After=network.target
[Service]
# (Chemins à adapter)
User=ideo_user
Group=www-data
WorkingDirectory=/home/ideo_user/mon_projet
EnvironmentFile=/home/ideo_user/mon_projet/.env
ExecStart=/home/ideo_user/mon_projet/.venv/bin/celery \
-A mon_projet worker \
--loglevel=INFO \
--concurrency=4 # (Nb de coeurs CPU)
Restart=always
[Install]
WantedBy=multi-user.target/etc/systemd/system/celery_beat.service
[Unit]
Description=Celery Beat (Scheduler)
After=network.target
[Service]
User=ideo_user
Group=www-data
WorkingDirectory=/home/ideo_user/mon_projet
EnvironmentFile=/home/ideo_user/mon_projet/.env
ExecStart=/home/ideo_user/mon_projet/.venv/bin/celery \
-A mon_projet beat \
--loglevel=INFO \
--scheduler=django_celery_beat.schedulers:DatabaseScheduler
Restart=always
[Install]
WantedBy=multi-user.targetsudo systemctl daemon-reload sudo systemctl enable --now celery_worker celery_beat
Flower (UI de Monitoring)
Flower est une interface web (UI) simple pour surveiller vos workers et vos tâches. (Indispensable en dev/staging).
Installation
(venv) $ pip install flower
Lancement (Dev)
(venv) $ celery -A mon_projet flower --port=5555 # (Accès: http://localhost:5555)
Flower vous montre :
- Les Workers (en ligne / hors ligne).
- Les tâches (en cours, réussies, échouées).
- Les détails des tâches (Arguments, Résultat/Erreur).
Idempotence
Règle d'or : Vos tâches *doivent* être **idempotentes**. (Exécuter la tâche 5 fois doit donner le même résultat que l'exécuter 1 fois).
Mauvais (Non-idempotent)
@shared_task
def add_points(user_id, points):
user = User.objects.get(id=user_id)
user.points += points
user.save()Si la tâche est ré-exécutée (retry), l'utilisateur gagne 2x les points.
Bon (Idempotent)
@shared_task
def set_points(user_id, total_points):
user = User.objects.get(id=user_id)
user.points = total_points
user.save()Retries (Tentatives)
Gérer les échecs (ex: API externe timeout).
Retry Manuel (bind=True)
@shared_task(bind=True)
def call_api(self, data):
try:
response = requests.post('https://api.externe.com', data=data)
response.raise_for_status() // (Lève une erreur si 4xx/5xx)
except requests.RequestException as e:
# (Réessayer dans 60s, max 3 fois)
raise self.retry(exc=e, countdown=60, max_retries=3)
return response.json()Retry Automatique (autoretry_for)
@shared_task(
autoretry_for=(requests.RequestException,),
retry_kwargs={'max_retries': 3, 'countdown': 60}
)
def call_api_auto(data):
response = requests.post('https://api.externe.com', data=data)
response.raise_for_status()
return response.json()Processus (-A [projet])
# Lancer un Worker (Prod) celery -A mon_projet worker -l info --concurrency=4 # Lancer un Worker (Dev, auto-reload) pip install watchfiles celery -A mon_projet worker -l debug --autoloader # Lancer Beat (Planificateur) celery -A mon_projet beat -l info # Lancer Flower (Monitoring) celery -A mon_projet flower --port=5555
Inspection (inspect)
# Ping les workers celery -A mon_projet inspect ping # Lister les tâches enregistrées celery -A mon_projet inspect registered # Lister les tâches actives (en cours) celery -A mon_projet inspect active # Lister les tâches planifiées (Beat) celery -A mon_projet inspect scheduled # Purger (vider) la file d'attente (DANGER) celery -A mon_projet purge
