Project Oxygen & Ideo-LabIDEO LAB Dashboard 2026

🐇 Celery – Tâches Asynchrones (Django, RabbitMQ & Redis)

Guide opérationnel complet pour débutants sur l'installation des brokers et l'intégration Django.

1.1 Facile

Vue d'ensemble

Qu'est-ce que Celery ? Tâches asynchrones (hors-requête).

Async Task Queue
1.2 Moyen

Architecture

Client (Django) -> Broker (Queue) -> Worker (Celery).

Client Broker Worker
1.3 Moyen

Concept : Broker (File d'attente)

RabbitMQ (AMQP) vs Redis. Le "facteur".

Broker RabbitMQ Redis
1.4 Avancé

Install: RabbitMQ (Linux)

apt/dnf, rabbitmqctl add_user, set_permissions.

RabbitMQ AMQP
1.5 Facile

Install: Redis (Linux)

apt/dnf, redis-server. Simple et rapide.

Redis Install
1.6 Facile

Installation Celery (Python)

pip install celery, pip install redis, librabbitmq.

pip celery
2.1 Moyen

Setup Django (celery.py)

Configuration : celery.py, __init__.py, settings.py.

Django celery.py
2.2 Moyen

Tâches Django (tasks.py)

@shared_task, appel depuis une view.

@shared_task tasks.py
2.3 Facile

Lancer un Worker

celery -A [proj] worker -l info.

worker CLI
2.4 Facile

Appeler des Tâches

.delay() (simple) vs .apply_async() (avancé).

.delay() .apply_async()
3.1 Moyen

Result Backend (Django)

Stocker les résultats (django-celery-results).

django-celery-results Result
3.2 Moyen

Récupérer les Résultats

AsyncResult(id), task.state, task.result.

AsyncResult state
3.3 Avancé

Tâches Planifiées (Beat)

django-celery-beat, CELERY_BEAT_SCHEDULE.

Beat Cron
3.4 Avancé

Lancer Worker & Beat (Prod)

Deux services systemd séparés (worker, beat).

systemd Prod
4.1 Facile

Monitoring (Flower)

Interface web de monitoring (celery flower).

Flower UI
4.2 Avancé

Bonnes Pratiques (Retries)

autoretry_for, retry_kwargs, bind=True.

Retry Idempotence
5.1 Moyen

Cheat-sheet Celery

celery worker, beat, flower, inspect.

cheat CLI
1.1 Vue d'ensemble : Tâches Asynchrones
Qu'est-ce que Celery ?

Celery est une **file d'attente de tâches (Task Queue) asynchrone** pour Python. Son but est de prendre en charge des tâches longues ou lourdes *en dehors* du cycle de requête-réponse normal de votre application web (ex: Django, Flask).

Problème : Un utilisateur demande un rapport qui prend 30 secondes à générer. Le navigateur de l'utilisateur attend et finit par "timeout".

Solution Celery : L'application web (Django) dit à Celery : "Génère ce rapport" (cela prend 10ms) et renvoie "OK, rapport en cours" à l'utilisateur. Un processus séparé (Worker Celery) prend la tâche et l'exécute en arrière-plan.

Cas d'usage
  • Envoi d'e-mails : (ex: e-mail de bienvenue, réinitialisation de mot de passe).
  • Traitement d'images/vidéos : (Génération de miniatures, encodage).
  • Appels API externes : (Contacter une API tierce lente).
  • Tâches planifiées (Cron) : (Nettoyage de la BDD tous les soirs à minuit).
  • Calculs lourds : (Génération de PDF, rapports complexes).
1.2 Architecture Celery (Client, Broker, Worker)
Les 3 Composants Clés
  1. Client (Producteur) : Votre application (ex: Django) qui *demande* l'exécution d'une tâche.
  2. Broker (File d'attente) : Le "facteur". Un service (ex: RabbitMQ, Redis) qui reçoit les messages (tâches) et les stocke dans une file d'attente.
  3. Worker (Consommateur) : Le processus Celery (un script Python séparé) qui surveille le Broker, prend les tâches, et les exécute.

(Optionnel) Result Backend : Une BDD (ex: Redis, Django DB) où le Worker écrit le résultat (ex: "SUCCÈS", ou l'erreur).

Schéma de flux
[Image d'une architecture Celery]
                                (HTTP)
[Utilisateur] <-----------------> [Django / FastAPI (Client)]
                                      |
                                      | 1. .delay()
                                      | (Envoie Tâche: "envoyer_email(to: 'a@b.c')")
                                      |
                                      ▼
+-------------------------------------------------------------+
| Broker (ex: RabbitMQ ou Redis)                              |
| [ Queue: "emails" | Tâche | Tâche | ... ]                   |
+-------------------------------------------------------------+
    ^                                     |
    | (Surveille la queue)                | 2. (Optionnel) Écrit le résultat
    |                                     |
+----------------------+                  |
| Worker (Celery)      |                  |
| (Processus Python)   |------------------+
| (Exécute la tâche)   |
+----------------------+
1.3 Concept : Le Broker (Message Queue)

Le "Broker" (courtier) est le cœur de Celery. C'est un service tiers qui gère les files d'attente de messages. Celery ne stocke *pas* les tâches lui-même, il les envoie au Broker.

BrokerProtocoleAvantagesInconvénientsUsage
RabbitMQ (Recommandé)AMQPRobuste, fiable (garantit la livraison), routage complexe.Plus lourd à installer/gérer.Production (si les tâches ne *doivent* pas être perdues).
Redis(Custom)Léger, rapide (en mémoire), très facile à installer.Moins de garanties (si le serveur crashe), routage simple.Développement, Tâches non-critiques, Tâches rapides.
1.4 Installation : RabbitMQ (Linux)
Ubuntu / Debian
sudo apt-get update
sudo apt-get install -y rabbitmq-server
RHEL / Rocky / AlmaLinux
# (Nécessite le dépôt EPEL)
sudo dnf install -y rabbitmq-server
Démarrage
sudo systemctl enable --now rabbitmq-server
sudo systemctl status rabbitmq-server
Configuration (rabbitmqctl)

Ne **pas** utiliser l'utilisateur guest (qui ne marche que sur localhost). Créez un utilisateur et un "vhost" (virtual host) dédiés à votre projet.

# 1. Créer un utilisateur (ex: ideo_user)
sudo rabbitmqctl add_user ideo_user "mot_de_passe_solide"

# 2. Créer un Virtual Host (un "silo" pour votre projet)
sudo rabbitmqctl add_vhost ideo_vhost

# 3. Donner les permissions à l'utilisateur SUR le vhost
# (Configure, Write, Read)
sudo rabbitmqctl set_permissions -p ideo_vhost ideo_user ".*" ".*" ".*"

# (Optionnel) Rendre l'utilisateur 'admin' (pour l'UI)
sudo rabbitmqctl set_user_tags ideo_user administrator

URL du Broker (pour Celery) : amqp://ideo_user:mot_de_passe_solide@localhost:5672/ideo_vhost

1.5 Installation : Redis (Linux)
Installation (Simple)

Redis est une base de données Clé-Valeur en mémoire, souvent utilisée pour le cache, les sessions, et comme Broker Celery simple.

Ubuntu / Debian
sudo apt update
sudo apt install -y redis-server
RHEL / Rocky / AlmaLinux
sudo dnf install -y redis
sudo systemctl enable --now redis
Vérification
# Tester la connexion
redis-cli ping
# PONG

Par défaut, Redis écoute sur localhost:6379 sans mot de passe.

URL du Broker (pour Celery) : redis://localhost:6379/0
(/0 est la base de données n°0).

1.6 Installation : Celery (Python)
1. pip install celery

Installez Celery dans votre environnement virtuel Python (venv).

(venv) $ pip install celery
2. Installer les dépendances du Broker

Vous devez aussi installer le "connecteur" (driver) pour votre Broker.

Pour Redis :
pip install redis
Pour RabbitMQ (AMQP) :
pip install librabbitmq
# (ou 'pip install "celery[librabbitmq]"')
2.1 Setup Django : (celery.py, __init__.py, settings.py)
mon_projet/settings.py

Ajoutez la configuration du Broker (et du Backend, voir 3.1) ici.

# mon_projet/settings.py

# --- CELERY SETTINGS ---
# (Utiliser Redis comme Broker)
CELERY_BROKER_URL = "redis://localhost:6379/0"

# (Utiliser RabbitMQ comme Broker)
# CELERY_BROKER_URL = "amqp://ideo_user:mot_de_passe_solide@localhost:5672/ideo_vhost"

# (Format des messages)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
mon_projet/celery.py (Nouveau fichier)

Créez ce fichier au même niveau que settings.py. C'est l'instance de l'application Celery.

import os
from celery import Celery

# 1. Définit le module 'settings' de Django pour Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mon_projet.settings')

app = Celery('mon_projet')

# 2. Charge la config depuis 'settings.py' (ex: CELERY_BROKER_URL)
app.config_from_object('django.conf:settings', namespace='CELERY')

# 3. Auto-découvre les 'tasks.py' dans les apps Django
app.autodiscover_tasks()
mon_projet/__init__.py

Modifiez ce fichier pour vous assurer que l'app Celery est chargée quand Django démarre.

# mon_projet/__init__.py

# (Importer l'app Celery)
from .celery import app as celery_app

__all__ = ('celery_app',)
2.2 Tâches Django (@shared_task) & Appel (View)
mon_app/tasks.py (Nouveau fichier)

Créez un fichier tasks.py dans votre app Django (ex: core/tasks.py). Celery (via autodiscover_tasks()) le trouvera.

Utilisez @shared_task pour créer une tâche qui utilise l'app Celery de Django.

from celery import shared_task
import time

@shared_task
def send_welcome_email(user_email, user_name):
    """
    Tâche (lente) qui envoie un email.
    """
    print(f"Début de l'envoi d'email à {user_name} ({user_email})...")
    # (Simulation d'un appel API SMTP lent)
    time.sleep(10) 
    print("Email envoyé !")
    return f"Email envoyé à {user_email}"

@shared_task
def process_heavy_report(report_id):
    # (Logique métier... ex: Article.objects.get(id=report_id))
    print(f"Traitement du rapport {report_id}...")
    time.sleep(30)
    print(f"Rapport {report_id} terminé.")
    return "OK"
mon_app/views.py (Appeler la tâche)

Dans votre vue (ou n'importe où), importez la tâche et utilisez .delay().

from django.http import HttpResponse
from django.shortcuts import redirect
from .tasks import send_welcome_email

def register_user(request):
    # (Logique de création d'utilisateur...)
    # ...
    # user = User.objects.create(...)
    
    # 1. La vue NE bloque PAS ici.
    #    La tâche est envoyée au Broker (Redis/RabbitMQ).
    send_welcome_email.delay(user.email, user.first_name)
    
    # 2. L'utilisateur reçoit la réponse immédiate.
    return redirect('home')
2.3 Lancer un Worker (Consommateur)
La commande worker

Un "Worker" est un processus (Python) que vous devez lancer *séparément* de Django (runserver).

Ouvrez un **nouveau terminal**, activez votre venv, et lancez :

# Syntaxe: celery -A [nom_projet] worker --loglevel=info
(venv) $ celery -A mon_projet worker --loglevel=info

Vous verrez le worker démarrer, se connecter au Broker (Redis/RabbitMQ) et lister les tâches qu'il a découvertes (ex: send_welcome_email).

Sortie (Log)
[2025-11-02 18:00:00] celery@mon-serveur ready.
[2025-11-02 18:00:00] ...
[2025-11-02 18:00:00] Tasks:
    - mon_app.tasks.send_welcome_email
    - mon_app.tasks.process_heavy_report
    ...
[2025-11-02 18:01:10] Received task: send_welcome_email[abc-123]
[2025-11-02 18:01:20] Task send_welcome_email[abc-123] succeeded in 10s
2.4 Appeler des Tâches (.delay vs .apply_async)
.delay() (Raccourci)

Le plus simple. Exécute la tâche dès que possible.

from .tasks import send_welcome_email
send_welcome_email.delay("user@mail.com", "Alice")

C'est un raccourci pour :

send_welcome_email.apply_async(
    args=["user@mail.com", "Alice"]
)
.apply_async() (Avancé)

Permet de spécifier des options d'exécution (tâches différées, routage...).

from datetime import timedelta

# Lancer dans 1 minute
send_welcome_email.apply_async(
    args=["user@mail.com", "Alice"],
    countdown=60
)

# Lancer demain à 9h00
send_welcome_email.apply_async(
    args=["user@mail.com", "Alice"],
    eta=datetime(2025, 11, 3, 9, 0, 0)
)

# Spécifier une file (Queue) (pour routage)
process_heavy_report.apply_async(
    args=[123],
    queue='heavy_reports'
)
3.1 Result Backend (django-celery-results)

Problème : Comment savoir si une tâche a réussi, échoué, ou quel est son résultat ?

Solution : Configurer un "Result Backend" (base de données de résultats).

django-celery-results est l'addon officiel pour stocker les résultats directement dans votre base de données Django (recommandé).

1. Installation
(venv) $ pip install django-celery-results
2. settings.py
# mon_projet/settings.py

INSTALLED_APPS = [
    ...
    'django_celery_results',
]

# Définir le backend (utiliser 'django-db')
CELERY_RESULT_BACKEND = 'django-db'
# (ou 'django-cache' si vous utilisez le cache Django)
3. migrate

Crée les tables celery_taskmeta et celery_groupmeta.

python manage.py migrate django_celery_results

Désormais, les résultats des tâches (ex: return "OK") seront stockés en BDD.

3.2 Récupérer les Résultats (AsyncResult)
AsyncResult

Quand vous appelez .delay() ou .apply_async(), Celery vous retourne un objet AsyncResult. Cet objet contient le task_id.

Exemple (views.py)
from celery.result import AsyncResult
from .tasks import process_heavy_report

def start_report(request):
    # 1. Lancer la tâche et récupérer l'ID
    task = process_heavy_report.delay(123)
    
    # 2. Stocker l'ID (ex: en session, ou BDD)
    request.session['report_task_id'] = task.id
    
    return redirect('check_report_status')

def check_report_status(request):
    task_id = request.session.get('report_task_id')
    
    # 3. Récupérer le résultat depuis le Backend
    task_result = AsyncResult(task_id)
    
    return render(request, 'status.html', {
        'status': task_result.state, // PENDING, SUCCESS, FAILURE
        'result': task_result.result, // Le 'return' (si SUCCESS)
    })
États (task.state)
ÉtatDescription
PENDINGLa tâche est dans la file d'attente (Broker), pas encore prise.
STARTEDLa tâche a été prise par un Worker.
SUCCESSTâche terminée (task.result contient le retour).
FAILURETâche échouée (task.result contient l'Exception).
RETRYLa tâche a échoué et va être ré-essayée.
3.3 Tâches Planifiées (Celery Beat)
Celery Beat (Le Planificateur)

Beat est un service (planificateur) qui lance des tâches à intervalles réguliers (ex: "toutes les 5 minutes", "tous les lundis à 9h"). C'est le "Cron" de Celery.

Il ne fait *pas* le travail. Il se contente de "dispatcher" les tâches dans le Broker (RabbitMQ/Redis) à l'heure dite. Les Workers (cf 2.3) font le travail.

Setup (django-celery-beat)

Le meilleur moyen d'utiliser Beat avec Django est d'utiliser l'addon qui stocke le planning (schedule) dans la BDD Django.

1. Installation
(venv) $ pip install django-celery-beat
2. settings.py
# mon_projet/settings.py

INSTALLED_APPS = [
    ...
    'django_celery_results',
    'django_celery_beat', // Ajouter Beat
]

# Utiliser le planificateur BDD de Beat
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
3. migrate
python manage.py migrate django_celery_beat

(Ceci crée les tables (django_periodictask...) dans l'Admin Django pour gérer les tâches planifiées via l'UI).

Alternative : Configuration Statique (settings.py)

Vous pouvez aussi définir le planning directement dans settings.py.

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    
    # Nom de la tâche
    'nettoyer-sessions-chaque-nuit': {
        # La tâche (ex: mon_app.tasks.cleanup)
        'task': 'mon_app.tasks.cleanup_sessions',
        
        # Le planning (crontab: min, h, jour/semaine)
        # (Ici, tous les jours à 4h05 du matin)
        'schedule': crontab(minute='5', hour='4', day_of_week='*'),
        
        # (Arguments)
        'args': (True,), 
    },
    
    'check-api-toutes-les-5-minutes': {
        'task': 'mon_app.tasks.check_api_status',
        # (Toutes les 5 minutes)
        'schedule': crontab(minute='*/5'),
    },
}
3.4 Lancer en Production (Worker + Beat)
Deux services, pas un

En production, vous devez lancer **deux** services Celery (en plus de Django/Gunicorn) :

  1. Le Worker : Le "travailleur" (celery worker).
  2. Le Beat : Le "planificateur" (celery beat).

On les gère (sous Linux) avec systemd (ou Supervisor).

/etc/systemd/system/celery_worker.service
[Unit]
Description=Celery Worker for Django
After=network.target

[Service]
# (Chemins à adapter)
User=ideo_user
Group=www-data
WorkingDirectory=/home/ideo_user/mon_projet
EnvironmentFile=/home/ideo_user/mon_projet/.env
ExecStart=/home/ideo_user/mon_projet/.venv/bin/celery \
    -A mon_projet worker \
    --loglevel=INFO \
    --concurrency=4 # (Nb de coeurs CPU)
Restart=always

[Install]
WantedBy=multi-user.target
/etc/systemd/system/celery_beat.service
[Unit]
Description=Celery Beat (Scheduler)
After=network.target

[Service]
User=ideo_user
Group=www-data
WorkingDirectory=/home/ideo_user/mon_projet
EnvironmentFile=/home/ideo_user/mon_projet/.env
ExecStart=/home/ideo_user/mon_projet/.venv/bin/celery \
    -A mon_projet beat \
    --loglevel=INFO \
    --scheduler=django_celery_beat.schedulers:DatabaseScheduler
Restart=always

[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable --now celery_worker celery_beat
4.1 Monitoring (Flower)
Flower (UI de Monitoring)

Flower est une interface web (UI) simple pour surveiller vos workers et vos tâches. (Indispensable en dev/staging).

Installation
(venv) $ pip install flower
Lancement (Dev)
(venv) $ celery -A mon_projet flower --port=5555
# (Accès: http://localhost:5555)

Flower vous montre :

  • Les Workers (en ligne / hors ligne).
  • Les tâches (en cours, réussies, échouées).
  • Les détails des tâches (Arguments, Résultat/Erreur).
4.2 Bonnes Pratiques (Retries & Idempotence)
Idempotence

Règle d'or : Vos tâches *doivent* être **idempotentes**. (Exécuter la tâche 5 fois doit donner le même résultat que l'exécuter 1 fois).

Mauvais (Non-idempotent)
@shared_task
def add_points(user_id, points):
    user = User.objects.get(id=user_id)
    user.points += points
    user.save()

Si la tâche est ré-exécutée (retry), l'utilisateur gagne 2x les points.

Bon (Idempotent)
@shared_task
def set_points(user_id, total_points):
    user = User.objects.get(id=user_id)
    user.points = total_points
    user.save()
Retries (Tentatives)

Gérer les échecs (ex: API externe timeout).

Retry Manuel (bind=True)
@shared_task(bind=True)
def call_api(self, data):
    try:
        response = requests.post('https://api.externe.com', data=data)
        response.raise_for_status() // (Lève une erreur si 4xx/5xx)
    except requests.RequestException as e:
        # (Réessayer dans 60s, max 3 fois)
        raise self.retry(exc=e, countdown=60, max_retries=3)
    
    return response.json()
Retry Automatique (autoretry_for)
@shared_task(
    autoretry_for=(requests.RequestException,),
    retry_kwargs={'max_retries': 3, 'countdown': 60}
)
def call_api_auto(data):
    response = requests.post('https://api.externe.com', data=data)
    response.raise_for_status()
    return response.json()
5.1 Cheat-sheet Celery (CLI)
Processus (-A [projet])
# Lancer un Worker (Prod)
celery -A mon_projet worker -l info --concurrency=4

# Lancer un Worker (Dev, auto-reload)
pip install watchfiles
celery -A mon_projet worker -l debug --autoloader

# Lancer Beat (Planificateur)
celery -A mon_projet beat -l info

# Lancer Flower (Monitoring)
celery -A mon_projet flower --port=5555
Inspection (inspect)
# Ping les workers
celery -A mon_projet inspect ping

# Lister les tâches enregistrées
celery -A mon_projet inspect registered

# Lister les tâches actives (en cours)
celery -A mon_projet inspect active

# Lister les tâches planifiées (Beat)
celery -A mon_projet inspect scheduled

# Purger (vider) la file d'attente (DANGER)
celery -A mon_projet purge