Project Oxygen & Ideo-LabIDEO LAB Dashboard 2026

📊 Apache Airflow – Orchestration, DAGs & Executors

Guide complet IDEO-Lab sur l'orchestrateur de pipelines (ETL/ELT) en Python.

1.1

Concept : Orchestrateur

Orchestration de pipelines (ETL/ELT). DAGs (Python).

Airflow ETL Orchestration
1.2

Architecture (4 Composants)

Scheduler, Webserver, Executor, Metadata Database.

Architecture Scheduler
1.3

vs. Cron

Gestion des dépendances, retries, logs, UI (vs 1 ligne).

Cron Comparatif
2.1

Le CƓur : Le DAG (Graphe)

Directed Acyclic Graph. Le "plan" (Python).

DAG Python
2.2

Le "Quoi" : Operator

L'unité de travail (BashOperator, PythonOperator...).

Operator TĂąche
2.3

Le "Lien" : Dépendances

>> (Bitshift), <<, set_downstream().

Dépendances >>
3.1

DAG Run & Task Instance

L'instance (Exécution) vs la Définition (DAG).

DAG Run Task Instance
3.2

Sensors (Attente)

HttpSensor, FileSensor. "Poke" vs "Reschedule".

Sensor Poke
3.3

Hooks (Connexions)

HttpHook, PostgresHook. (Abstractions).

Hook Connexion
4.1

Composant : Scheduler

Le "cƓur" (daemon). Parse les DAGs, lance les tñches.

Scheduler Daemon
4.2

Composant : Webserver

L'interface (Flask) pour monitorer/déclencher.

Webserver UI
4.3

Composant : Metadata Database

Le "cerveau" (Postgres/MySQL) qui stocke l'état.

Database État
5.1

Composant : Executor (Concept)

Comment les tùches s'exécutent (le "moteur").

Executor
5.2

Executors (Locaux)

SequentialExecutor (Dev) vs LocalExecutor (Prod simple).

SequentialExecutor LocalExecutor
5.3

Executors (Distribués)

CeleryExecutor, KubernetesExecutor.

CeleryExecutor KubernetesExecutor
6.1

XCom (Communication)

Partage de données (Push/Pull) entre tùches.

XCom Communication
6.2

Variables & Connections

Gestion des (secrets, configs) via l'UI/Base de données.

Variables Connections
6.3

Pools & Queues

Gestion des ressources (Slots) & Routage (Celery).

Pools Queues
7.1

catchup & backfill

catchup=False (Ignorer le passé), airflow dags backfill.

catchup backfill
7.2

trigger_rule (RĂšgles)

all_success, one_failed, all_done (Nettoyage).

trigger_rule all_done
7.3

Ressources & Liens

Site officiel, Documentation, GitHub.

Documentation Liens
1.1 Concept : Orchestrateur de Pipelines (ETL/ELT)
Qu'est-ce qu'Apache Airflow ?

Apache Airflow (créé par Airbnb, maintenant projet Apache) est une plateforme d'orchestration de flux de travail (workflow) open-source. C'est l'outil de référence pour l'ETL/ELT (Extract, Transform, Load) et le Data Engineering.

"Workflows as Code" (Python)

La philosophie d'Airflow est "Workflows as Code" (Flux de travail en tant que Code). Vous ne "dessinez" pas (drag-and-drop) vos pipelines. Vous les écrivez en Python.

Avantages : Les pipelines (DAGs) sont dynamiques (générés par du code), versionnables (Git), et testables (Pytest).

DAG (Directed Acyclic Graph)

Airflow est basé sur le concept de DAG (Graphe Orienté Acyclique). C'est le "plan" (le "workflow") que vous définissez en Python.

  • Graphe (Graph) : Un ensemble de TĂąches (Tasks) (nƓuds) et de DĂ©pendances (arĂȘtes).
  • OrientĂ© (Directed) : Les liens (dĂ©pendances) ont un sens (TĂąche A -> TĂąche B).
  • Acyclique (Acyclic) : Le graphe n'a pas de boucle (A -> B -> C -> A est impossible).
        [TĂąche A: Extrait (E)]
           │
           ├─â–ș [TĂąche B1: Transform (T)] ─â–ș [TĂąche C1: Load (L)]
           │
           └─â–ș [TĂąche B2: Transform (T)] ─â–ș [TĂąche C2: Load (L)]
1.2 Architecture (Les 4+ Composants)

Airflow (v2.0+) a une architecture "désassemblée" (scalable) basée sur 4 composants principaux (plus le dossier de DAGs).

(Utilisateur)
     │
     ▌ (HTTP)
+----------------+
| [ WEBSERVER ]  | (UI / API)
| (Flask/Gunicorn) |
+----------------+
     │ â–Č
(Lit/Écrit État) │ (Lit État)
     │ â–Œ
+----------------+   (Lit/Parse)   +-------------------+
| [ METADATA DB] | ◄--------------- | [ SCHEDULER ]     |
| (Postgres/MySQL) | (Stocke l'État) | (Daemon/CƓur)     |
| (Ex: Runs, Tasks) |                 +-------------------+
+----------------+                       │ â–Č
     │                                     │ │ (Parse/Lit)
     │ (Envoie TĂąche)                      │ â–Œ
     ▌                               +-------------+
+----------------+                     | [ DAGs ]    |
| [ EXECUTOR ]   |                     | (Python)    |
| (ex: Celery, K8s)|                     | (.py files) |
| (Le "Muscle")  |                     +-------------+
|   [Worker 1]   |
|   [Worker N]   |
+----------------+
  • 1. Webserver (Serveur Web) : (Flask) L'interface utilisateur (UI) pour visualiser les DAGs, monitorer les "Task Instances" (5.1), et dĂ©clencher (Trigger) les "DAG Runs" (5.1).
  • 2. Scheduler (Planificateur) : (Le "CƓur") Un daemon (service) qui lit (parse) vos fichiers .py (DAGs), vĂ©rifie les planifications (schedule) (ex: "cron"), et envoie les tĂąches prĂȘtes Ă  ĂȘtre exĂ©cutĂ©es Ă  l'Executor.
  • 3. Metadata Database (Base de DonnĂ©es) : (Le "Cerveau") Une base SQL (Postgres/MySQL, jamais SQLite en prod). C'est le SPOF. Elle stocke l'Ă©tat (state) de tous les DAGs, TĂąches, Runs, Logs, XComs, et Connections.
  • 4. Executor (ExĂ©cuteur) : (Le "Muscle") DĂ©finit comment les tĂąches sont exĂ©cutĂ©es (Local, Celery, Kubernetes...).
  • (5. Dossier de DAGs) : Un dossier (ex: /opt/airflow/dags) (souvent un volume Git-Sync) que le Scheduler et le Webserver lisent.
1.3 Comparaison : Airflow vs. Cron

Cron (voir guide Cron) est un "planificateur" (scheduler). Airflow est un "orchestrateur" (orchestrator).

CritĂšrecron (Classique)Apache Airflow
Concept"Time-based" (Basé sur le temps)."Event/Dependency-based" (Basé sur les dépendances).
DépendancesNon. (Tùche A ne sait pas si Tùche B a échoué).Oui (A >> B). (Gestion de graphe (DAG)).
Retries (Ré-essais)Non (Manuel).Oui (Intégré, ex: retries=3, retry_delay=...).
Backfilling (Rattrapage)Non (anacron basique).Oui (airflow dags backfill).
Monitoring (Logs)Faible (Email (MAILTO) ou > /dev/null).Excellent (UI Web, Logs centralisés par Tùche/Run).
ScalabilitĂ©LimitĂ©e (1 machine).ÉlevĂ©e (DistribuĂ© : Celery/Kubernetes Executors).
UsageTĂąche simple (backup.sh).Pipelines Data complexes (ETL, ELT, ML).
2.1 Le CƓur : Le DAG (Graphe)

Le DAG est la définition (le "plan") de votre pipeline. C'est un fichier Python (.py) qui est lu (parsé) par le Scheduler.

Exemple (Fichier .py)

Un DAG est défini en Python. Le code n'est pas exécuté, il est parsé par le Scheduler pour (extraire) la structure (Tùches + Dépendances).

from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from datetime import datetime

# (Arguments par défaut pour le DAG)
default_args = {
    'owner': 'ideo',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

# (Définition du DAG (le "Graphe"))
@dag(
    dag_id='mon_premier_dag',
    default_args=default_args,
    description='Un DAG simple',
    
    # (Planification : "Cron")
    # (Tous les jours Ă  2h du matin)
    schedule_interval='0 2 * * *',
    
    # (Date de début (obligatoire))
    start_date=datetime(2025, 1, 1),
    
    # (Ne pas rattraper le passé au 1er lancement)
    catchup=False,
    
    tags=['demo', 'ideo']
)
def ma_fonction_dag():
    
    # (TĂąche 1)
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date'
    )
    
    # (TĂąche 2)
    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5'
    )
    
    # (Dépendance : t1 PUIS t2)
    t1 >> t2

# (Instanciation du DAG)
ma_fonction_dag()
2.2 Le "Quoi" : Operator

Un Operator (OpĂ©rateur) est la TĂąche (Task) atomique. C'est le "nƓud" (node) du DAG. C'est le "Quoi" (l'action Ă  exĂ©cuter).

1. Action Operators

Ils exécutent une action (un "verbe").

  • BashOperator : (Le plus courant) ExĂ©cute une commande Shell/Bash (bash_command='...').
  • PythonOperator : (Le plus puissant) ExĂ©cute une fonction Python (python_callable=ma_fonction).
  • DockerOperator : Lance un conteneur Docker.
  • KubernetesPodOperator : Lance un Pod Kubernetes.
  • PostgresOperator : ExĂ©cute du SQL sur Postgres.
  • SparkSubmitOperator : Soumet un job Spark.
2. Transfer Operators

Déplacent des données (ex: S3ToRedshiftOperator).

3. Sensors (Capteurs) (Voir 3.2)

Ce sont des Operators "spéciaux" (mode='poke') qui attendent (poll) que quelque chose se produise.

  • HttpSensor : Attend qu'une URL (API) retourne 200 OK.
  • FileSensor : Attend qu'un fichier (.csv) arrive sur le disque.
  • SqlSensor : Attend qu'une requĂȘte SQL (SELECT COUNT...) retourne True.
Exemple : PythonOperator

Recommandé (vs BashOperator) car le code est testable (Pytest) et gÚre l'environnement (Python venv).

# --- 1. Définir la fonction Python ---
# (Peut ĂȘtre dans un autre fichier)
def ma_fonction_python(nom, age):
    print(f"Bonjour, {nom}. Vous avez {age} ans.")
    # (Doit retourner une valeur si xcom_push=True)
    return {"status": 200, "user": nom}

@dag(...)
def mon_dag():

    # --- 2. Définir l'Operator ---
    t_python = PythonOperator(
        task_id='tache_python',
        
        # (La fonction Ă  appeler)
        python_callable=ma_fonction_python,
        
        # (Arguments passés à la fonction)
        op_kwargs={
            "nom": "Alice",
            "age": 30
        }
        
        # (Par défaut, le 'return' est poussé (push)
        #  vers XCom (6.1))
        # do_xcom_push=True 
    )
2.3 Le "Lien" : Dépendances (Bitshift)

L'orchestration est définie par les dépendances (flux) entre les tùches (Operators).

Opérateurs "Bitshift" (>>, <<)

La syntaxe "moderne" (Pythonique) pour définir les dépendances (l'ordre).

(t1, t2, t3, t4 sont des Operators)

# --- 1. Séquentiel ---
# (Exécute t1, PUIS t2)
t1 >> t2

# (Équivalent : t2 << t1)
# (Équivalent : t2.set_upstream(t1))

# --- 2. ParallĂšle (Fan-Out) ---
# (Exécute t1, PUIS (t2 et t3 en parallÚle))
t1 >> [t2, t3]

# --- 3. ParallĂšle (Fan-In) ---
# (Exécute (t1 et t2 en parallÚle),
#  PUIS (quand les 2 sont finis) t3)
[t1, t2] >> t3

# --- 4. ChaĂźnage Complexe ---
start >> [t_extract_A, t_extract_B]
t_extract_A >> t_transform_A
t_extract_B >> t_transform_B
[t_transform_A, t_transform_B] >> t_load
t_load >> end
3.1 DAG Run & Task Instance (Les Exécutions)

Il faut différencier la Définition (le .py) de l'Exécution (le "Run").

  • DAG (Le Plan) : (Fichier mon_dag.py) C'est la DĂ©finition (le "plan", la "classe").
  • DAG Run (L'ExĂ©cution) : (L'"Instance" du plan) C'est 1 exĂ©cution du DAG pour un instant T (ex: "L'exĂ©cution du 10 Nov 2025").
  • Task Instance (L'Instance de TĂąche) : (L'"Instance" de la tĂąche) L'exĂ©cution d'1 TĂąche (Operator) pour 1 DAG Run spĂ©cifique. C'est l'unitĂ© la plus petite (ex: "TĂąche t1 du Run du 10 Nov").
États (State) d'une Task Instance

C'est ce que l'on voit dans l'UI (les carrés de couleur) :

  • none (Gris clair) : (Pas encore dĂ©marrĂ©)
  • queued (Gris foncĂ©) : (EnvoyĂ© Ă  l'Executor (Celery)).
  • running (Vert clair) : (En cours d'exĂ©cution).
  • success (Vert foncĂ©) : (TerminĂ© avec succĂšs (exit 0)).
  • failed (Rouge) : (Échec (exit non-zĂ©ro) aprĂšs N retries).
  • upstream_failed (Orange) : (IgnorĂ©, car une TĂąche "parent" (upstream) a Ă©chouĂ©).
  • skipped (Rose) : (IgnorĂ© (ex: BranchPythonOperator)).
  • up_for_retry (Jaune) : (Échec, en attente avant rĂ©-essai).
3.2 Sensors (L'Attente)

Un Sensor (Capteur) (une sous-classe d'Operator) est une tĂąche qui attend (poll) qu'une condition (externe) soit True.

Mode poke (Défaut)

Le Sensor prend un "Slot" (Worker) et tourne en boucle (sleep) jusqu'Ă  ce que la condition soit True.

# (Ex: Attendre un fichier S3)
s3_sensor = S3KeySensor(
    task_id='wait_for_s3_file',
    bucket_name='mon-bucket',
    bucket_key='data/input.csv',
    
    mode='poke', # (Défaut)
    poke_interval=60, # (Vérifie toutes les 60 sec)
    timeout=3600 # (Échoue aprùs 1h)
)

Inconvénient : Inefficace. Il "consomme" un slot Worker (ex: Celery) pendant 1h, juste pour attendre.

Mode reschedule (Recommandé)

Le Sensor (tĂąche) s'arrĂȘte (up_for_retry) et libĂšre son "Slot" (Worker), puis demande au Scheduler de le "rĂ©veiller" (re-planifier) plus tard (ex: dans 60 sec) pour rĂ©essayer.

s3_sensor = S3KeySensor(
    task_id='wait_for_s3_file',
    ...
    mode='reschedule', # (LibĂšre le slot)
    poke_interval=60,
    timeout=3600
)

Avantage : Efficace. Ne consomme pas de "Slot" Worker pendant l'attente. (Idéal pour les "sensors" qui attendent des heures).

3.3 Hooks (Connexions)

Un Hook (Crochet) est une abstraction (wrapper) Python (une classe) qui gĂšre l'interface avec un service externe (BDD, API, Cloud).

Hook vs Operator

Les Operators (2.2) utilisent les Hooks (3.3) (via les Connections (6.2)) pour faire le travail.

(Operator: PostgresOperator)
 "Exécute ce SQL"
   │
   │ (Utilise)
   ▌
(Hook: PostgresHook)
 "Je gĂšre la connexion (lib 'psycopg2'),
  j'ouvre le curseur, j'exécute, je ferme."
   │
   │ (Utilise)
   ▌
(Connection: "my_postgres_db")
 (ID dans l'UI : contient HĂŽte, Login, Pass)
Usage (Dans un PythonOperator)

On utilise les Hooks directement dans le code Python (PythonOperator) pour de la logique complexe (ex: SELECT -> Transform -> INSERT).

from airflow.providers.postgres.hooks.postgres import PostgresHook

def select_and_insert():
    # 1. (Utilise "my_postgres_db" (6.2))
    pg_hook = PostgresHook(postgres_conn_id='my_postgres_db')
    
    # 2. (Wrapper pour 'psycopg2')
    # (Utilise .get_conn(), .run(), .get_records()...)
    records = pg_hook.get_records("SELECT * FROM table_a")
    
    # (Transformation...)
    
    # 3. Insérer
    pg_hook.insert_rows(table='table_b', rows=...)
4.1 Composant : Scheduler (Planificateur)

Le Scheduler (Planificateur) (airflow scheduler) est le cƓur (daemon) d'Airflow.

RĂŽles (Boucle)
  1. 1. Parser (Analyser) les DAGs : (Toutes les N sec) Scanne le dossier dags_folder. Parse tous les fichiers .py (Python) pour détecter les nouveaux DAGs, les Tùches, et les dépendances.
  2. 2. Mettre à jour la BDD : Met à jour la Metadata Database (4.3) avec la structure (définition) des DAGs.
  3. 3. Planifier (Schedule) : (Toutes les N sec) Interroge la BDD :
    • "Y a-t-il des DAG Runs (schedule_interval) Ă  crĂ©er (ex: le cron '0 2 * * *' est arrivĂ©) ?"
    • "Y a-t-il des Task Instances (running) Ă  vĂ©rifier (timeout, retry) ?"
    • "Y a-t-il des Task Instances (success) dont les dĂ©pendances (A >> B) sont satisfaites ?"
  4. 4. Mettre en File (Queue) : Si une TĂąche (ex: B) est prĂȘte, le Scheduler la marque "queued" (en attente) et l'envoie Ă  l'Executor (5.1) (ex: Celery).
4.2 Composant : Webserver (UI)

Le Webserver (airflow webserver) est l'Interface Web (GUI) d'Airflow. C'est une application Flask (Python) (généralement servie via Gunicorn en production).

Architecture "Stateless"

L'Interface Web (UI) est (presque) stateless. Elle ne fait rien (pas de planification, pas d'exécution).

Son seul rÎle est de lire et écrire dans la Metadata Database (4.3).

Fonctionnalités (UI)
  • Vue (DAGs) : Lister, Mettre en Pause (Pause), DĂ©clencher (Trigger DAG).
  • Vue (Grid / Tree) : Visualiser l'Ă©tat (success, failed) des "DAG Runs" et "Task Instances".
  • Monitoring : Voir les Logs (rĂ©cupĂ©rĂ©s de la BDD ou du worker), voir les Gantt Charts (temps d'exĂ©cution).
  • Admin (Menu) : GĂ©rer les Connections (6.2), Variables (6.2), Pools (6.3).
4.3 Composant : Metadata Database (Le Cerveau)

La Metadata Database (Base de DonnĂ©es des MĂ©tadonnĂ©es) est le cƓur (cerveau) de l'architecture Airflow. C'est le SPOF (Single Point of Failure).

Stockage (État)

Elle stocke (via SQLAlchemy) tout l'état :

  • La liste des DAGs (parsĂ©s).
  • L'historique de tous les DAG Runs.
  • L'Ă©tat (success, failed...) de toutes les Task Instances.
  • Les Variables et Connections (chiffrĂ©es).
  • Les XComs (donnĂ©es inter-tĂąches).
  • (Si LocalExecutor) Les logs des tĂąches.
Choix (Dev vs Prod)
  • DĂ©veloppement (DĂ©faut) : SQLite (Fichier local airflow.db). (Facile, mais ne supporte pas le parallĂ©lisme (LocalExecutor) !).
  • Production (Requis) : PostgreSQL ou MySQL. (NĂ©cessaire pour LocalExecutor, CeleryExecutor, KubernetesExecutor).
5.1 Composant : Executor (Concept)

L'Executor (Exécuteur) est le "moteur" (le "comment"). C'est le composant (configuré dans airflow.cfg) que le Scheduler (4.1) utilise pour exécuter (lancer) physiquement une tùche.

(Scheduler) : "La TĂąche A est prĂȘte (queued)"
     │
     │ (Ordre : "ExĂ©cute TĂąche A")
     ▌
[ EXECUTOR ] (ex: Celery)
     │
     │ (Envoie à un Worker)
     ▌
[ Worker (Processus) ]
 (Exécute: 'airflow tasks run ...')
5.2 Executors (Locaux)
SequentialExecutor (Défaut)

Usage : Développement, Test, Débogage.

Fonctionnement : ExĂ©cute 1 seule tĂąche Ă  la fois, dans le mĂȘme processus que le Scheduler.

Inconvénient : Pas de parallélisme. (Utilise SQLite par défaut).

LocalExecutor

Usage : Production (Simple, 1 seule machine/VM).

Fonctionnement : Le Scheduler (MaĂźtre) "fork" (lance) des processus (workers) sur la mĂȘme machine pour exĂ©cuter les tĂąches en parallĂšle.

Pré-requis : Doit utiliser PostgreSQL ou MySQL (pas SQLite, qui bloque).

Inconvénient : SPOF (Si la VM meurt, le Scheduler et les Workers meurent).

5.3 Executors (Distribués / Scalés)
CeleryExecutor

Usage : Production (Scalabilité distribuée).

Architecture (Complexe) :

  1. (Scheduler) Envoie la TĂąche (Message) -> [ Message Broker ] (ex: RabbitMQ, Redis).
  2. (N Workers) Des Workers Celery (processus Python, sur N VMs) écoutent la Queue.
  3. (Worker 1) Prend la Tùche, l'exécute, met à jour la BDD (Metadata).

Avantage : TrĂšs scalable (ajouter des VMs Workers), robuste.

Inconvénient : Complexe (il faut gérer RabbitMQ/Redis en plus d'Airflow).

KubernetesExecutor

Usage : Production (Cloud Native, K8s).

Architecture :

  1. (Scheduler) (Tourne dans K8s) Détecte une Tùche.
  2. (Scheduler) Parle Ă  l'API K8s.
  3. (K8s) Lance un Nouveau Pod (Worker éphémÚre) juste pour cette Tùche (basé sur une image Docker).
  4. (Pod) Exécute la Tùche (airflow tasks run ...).
  5. (Pod) Se termine (Completed ou Error).

Avantage : Isolation parfaite (chaque tùche a son propre Pod/environnement). Scalabilité (K8s).

Inconvénient : "Cold Start" (démarrage d'un Pod) (latence).

6.1 XCom (Cross-Communication)

ProblÚme : Comment la Tùche B peut-elle récupérer une valeur (ex: un ID, un nom de fichier) générée par la Tùche A ?

Solution : XCom (Cross-Communication). C'est un systÚme (basé sur la Metadata DB) de "passage" (Push/Pull) de petites données (métadonnées).

1. XCom Push (Envoyer)

Automatique (PythonOperator) : Si une fonction python_callable (2.2) retourne (return) une valeur, Airflow la "Push" (sérialise, stocke dans la BDD) automatiquement.

Manuel (BashOperator) :

(TĂąche A)
def f_push(**context):
    # Pousse manuellement (Clé/Valeur)
    context['ti'].xcom_push(key='mon_id', value=12345)

t_push = PythonOperator(
    task_id='t_push',
    python_callable=f_push
)
2. XCom Pull (Recevoir)

Utilise Jinja Templating (dans bash_command) ou xcom_pull (dans PythonOperator) pour récupérer la valeur.

(TĂąche B - Python)
def f_pull(**context):
    # Tire la valeur (key) de la TĂąche A (task_ids)
    valeur = context['ti'].xcom_pull(
        key='mon_id',
        task_ids='t_push'
    )
    print(f"Valeur reçue: {valeur}") # 12345

t_pull = PythonOperator(...)

# (TĂąche C - Bash)
t_bash = BashOperator(
    task_id='t_bash',
    # (Utilise Jinja pour 'pull' la valeur de retour (default)
    #  de la tĂąche 't_push')
    
    # --- CORRECTION TEMPLATE DJANGO ---
    bash_command="echo 'Valeur reçue: {{ ti.xcom_pull(task_ids='t_push') }}'"
)

[t_push] >> [t_pull, t_bash]

Limite : Ne pas utiliser XCom pour de grosses données (ex: un DataFrame Pandas). XCom est stocké dans la BDD (limite ~48KB). (Pour les grosses données, Tùche A écrit sur S3/GCS, Tùche B lit depuis S3/GCS).

6.2 Variables & Connections

Ce sont les mécanismes (stockés dans la Metadata DB, gérés via l'UI Admin) pour externaliser la configuration (éviter le "hardcoding" dans les DAGs).

Variables

Stockage Clé/Valeur (K/V) simple (non-structuré).

(UI Admin) -> KEY: 'mon_seuil', VALUE: '100'

(Usage - Python)
from airflow.models import Variable
seuil = Variable.get("mon_seuil", default_var=50)

(Usage - Jinja)
{{ var.value.mon_seuil }}

Attention : Variable.get() fait un appel BDD (lent) à chaque fois. (Ne pas utiliser dans une boucle. OK au début du script).

Connections

Stockage structuré et chiffré (dans la BDD) pour les connexions externes (utilisé par les Hooks (3.3)).

(UI Admin) -> Créer Connexion :

  • Conn ID: my_postgres_db (L'ID)
  • Conn Type: Postgres
  • Host: db.ideolab.com
  • Login: admin
  • Password: *** (chiffrĂ©)
  • Schema: prod
(Usage - PythonOperator/Hook)
# (Le Hook lit 'my_postgres_db'
#  automatiquement depuis la BDD)
pg_hook = PostgresHook(postgres_conn_id='my_postgres_db')
6.3 Pools & Queues (Gestion des Ressources)

Outils pour gérer la concurrence (QoS) et éviter de surcharger les systÚmes externes (APIs, BDDs).

Pools (Piscines)

Un Pool (géré dans l'UI Admin) est une limite (Slot) de concurrence interne à Airflow.

Usage : "Mon API REST externe (api_meteo) n'accepte que 5 connexions parallĂšles."

Configuration :

  1. (UI) Créer un Pool api_meteo_pool (Slots = 5).
  2. (DAG) Assigner la tĂąche Ă  ce pool :

t_api = HttpSensor(
    task_id='call_api',
    pool='api_meteo_pool',
    ...
)

RĂ©sultat : MĂȘme si 100 DAGs tournent, le Scheduler ne lancera jamais plus de 5 tĂąches (api_meteo_pool) en mĂȘme temps.

Queues (Files d'attente)

Ceci est spécifique au CeleryExecutor (5.3). C'est du routage.

Usage : "J'ai des Tùches 'lourdes' (ML, 128Go RAM) et des Tùches 'légÚres' (Bash, 1Go RAM)."

Configuration :

  1. (Celery) Créer 2 types de Workers : worker_heavy (VMs 128Go), worker_light (VMs 4Go).
  2. (Celery) worker_heavy écoute la Queue heavy_tasks.
  3. (Celery) worker_light écoute la Queue light_tasks.
  4. (DAG) Assigner la tĂąche Ă  une queue :

t_ml = PythonOperator(
    task_id='train_model',
    queue='heavy_tasks',
    ...
)
t_bash = BashOperator(
    task_id='print_date',
    queue='light_tasks',
    ...
)
7.1 catchup & backfill
catchup (Rattrapage)

Le PiÚge : Vous créez un DAG (schedule_interval='@daily', start_date='2024-01-01'). Vous le déployez (activez) aujourd'hui (2025-11-13).

Comportement (Défaut : catchup=True) : Airflow voit qu'il a "manqué" ~680 exécutions (depuis 2024-01-01). Il va immédiatement lancer 680 DAG Runs (Backfills), un pour chaque jour manqué. (Spam/DoS).

Solution (Bonne Pratique) : Toujours mettre catchup=False dans le @dag (ou default_args), sauf si vous *voulez* ce comportement.

airflow dags backfill (Rattrapage Manuel)

La commande (CLI) pour exécuter (manuellement) un "rattrapage" (Backfill) sur une période passée, en ignorant les dépendances.

# (Relancer (manuellement) le DAG 'mon_dag'
#  pour la période du 1er au 5 Nov)
$ airflow dags backfill mon_dag \
    --start-date 2025-11-01 \
    --end-date 2025-11-05
7.2 trigger_rule (RÚgles de Déclenchement)

Par défaut (trigger_rule='all_success'), une Tùche (B) ne s'exécute que si tous ses parents (A) ont réussi (success).

La trigger_rule (un argument de l'Operator) change ce comportement.

[TĂąche A] >> [TĂąche C]
[TĂąche B] >> [TĂąche C]
RĂšgle (sur TĂąche C)Description
all_success (Défaut)Se lance si A=Success ET B=Success.
one_successSe lance si A=Success OU B=Success.
all_failedSe lance si A=Failed ET B=Failed.
one_failed(Alerte) Se lance si A=Failed OU B=Failed. (Utile pour une tĂąche d'alerte).
all_doneSe lance quand A et B sont "finis" (success, failed, ou skipped).
none_failed(Nettoyage) Se lance si A=Success/Skipped ET B=Success/Skipped. (Utile pour une tùche "cleanup" qui ne doit jamais tourner si une ingestion a échoué).