đ Apache Airflow â Orchestration, DAGs & Executors
Guide complet IDEO-Lab sur l'orchestrateur de pipelines (ETL/ELT) en Python.
Concept : Orchestrateur
Orchestration de pipelines (ETL/ELT). DAGs (Python).
Airflow ETL OrchestrationArchitecture (4 Composants)
Scheduler, Webserver, Executor, Metadata Database.
Architecture Schedulervs. Cron
Gestion des dépendances, retries, logs, UI (vs 1 ligne).
Cron ComparatifLe CĆur : Le DAG (Graphe)
Directed Acyclic Graph. Le "plan" (Python).
DAG PythonLe "Quoi" : Operator
L'unité de travail (BashOperator, PythonOperator...).
Le "Lien" : Dépendances
>> (Bitshift), <<, set_downstream().
DAG Run & Task Instance
L'instance (Exécution) vs la Définition (DAG).
DAG Run Task InstanceSensors (Attente)
HttpSensor, FileSensor. "Poke" vs "Reschedule".
Hooks (Connexions)
HttpHook, PostgresHook. (Abstractions).
Composant : Scheduler
Le "cĆur" (daemon). Parse les DAGs, lance les tĂąches.
Scheduler DaemonComposant : Webserver
L'interface (Flask) pour monitorer/déclencher.
Webserver UIComposant : Metadata Database
Le "cerveau" (Postgres/MySQL) qui stocke l'état.
Database ĂtatComposant : Executor (Concept)
Comment les tùches s'exécutent (le "moteur").
ExecutorExecutors (Locaux)
SequentialExecutor (Dev) vs LocalExecutor (Prod simple).
Executors (Distribués)
CeleryExecutor, KubernetesExecutor.
XCom (Communication)
Partage de données (Push/Pull) entre tùches.
XCom CommunicationVariables & Connections
Gestion des (secrets, configs) via l'UI/Base de données.
Variables ConnectionsPools & Queues
Gestion des ressources (Slots) & Routage (Celery).
Pools Queuescatchup & backfill
catchup=False (Ignorer le passé), airflow dags backfill.
trigger_rule (RĂšgles)
all_success, one_failed, all_done (Nettoyage).
Ressources & Liens
Site officiel, Documentation, GitHub.
Documentation LiensQu'est-ce qu'Apache Airflow ?
Apache Airflow (créé par Airbnb, maintenant projet Apache) est une plateforme d'orchestration de flux de travail (workflow) open-source. C'est l'outil de référence pour l'ETL/ELT (Extract, Transform, Load) et le Data Engineering.
"Workflows as Code" (Python)
La philosophie d'Airflow est "Workflows as Code" (Flux de travail en tant que Code). Vous ne "dessinez" pas (drag-and-drop) vos pipelines. Vous les écrivez en Python.
Avantages : Les pipelines (DAGs) sont dynamiques (générés par du code), versionnables (Git), et testables (Pytest).
DAG (Directed Acyclic Graph)
Airflow est basé sur le concept de DAG (Graphe Orienté Acyclique). C'est le "plan" (le "workflow") que vous définissez en Python.
- Graphe (Graph) : Un ensemble de TĂąches (Tasks) (nĆuds) et de DĂ©pendances (arĂȘtes).
- Orienté (Directed) : Les liens (dépendances) ont un sens (
TĂąche A->TĂąche B). - Acyclique (Acyclic) : Le graphe n'a pas de boucle (
A -> B -> C -> Aest impossible).
[TĂąche A: Extrait (E)]
â
âââș [TĂąche B1: Transform (T)] ââș [TĂąche C1: Load (L)]
â
âââș [TĂąche B2: Transform (T)] ââș [TĂąche C2: Load (L)]
Airflow (v2.0+) a une architecture "désassemblée" (scalable) basée sur 4 composants principaux (plus le dossier de DAGs).
(Utilisateur)
â
⌠(HTTP)
+----------------+
| [ WEBSERVER ] | (UI / API)
| (Flask/Gunicorn) |
+----------------+
â âČ
(Lit/Ăcrit Ătat) â (Lit Ătat)
â âŒ
+----------------+ (Lit/Parse) +-------------------+
| [ METADATA DB] | â--------------- | [ SCHEDULER ] |
| (Postgres/MySQL) | (Stocke l'Ătat) | (Daemon/CĆur) |
| (Ex: Runs, Tasks) | +-------------------+
+----------------+ â âČ
â â â (Parse/Lit)
â (Envoie TĂąche) â âŒ
⌠+-------------+
+----------------+ | [ DAGs ] |
| [ EXECUTOR ] | | (Python) |
| (ex: Celery, K8s)| | (.py files) |
| (Le "Muscle") | +-------------+
| [Worker 1] |
| [Worker N] |
+----------------+
- 1. Webserver (Serveur Web) : (Flask) L'interface utilisateur (UI) pour visualiser les DAGs, monitorer les "Task Instances" (5.1), et déclencher (Trigger) les "DAG Runs" (5.1).
- 2. Scheduler (Planificateur) : (Le "CĆur") Un daemon (service) qui lit (parse) vos fichiers
.py(DAGs), vĂ©rifie les planifications (schedule) (ex: "cron"), et envoie les tĂąches prĂȘtes Ă ĂȘtre exĂ©cutĂ©es Ă l'Executor. - 3. Metadata Database (Base de DonnĂ©es) : (Le "Cerveau") Une base SQL (Postgres/MySQL, jamais SQLite en prod). C'est le SPOF. Elle stocke l'Ă©tat (state) de tous les DAGs, TĂąches, Runs, Logs, XComs, et Connections.
- 4. Executor (Exécuteur) : (Le "Muscle") Définit comment les tùches sont exécutées (Local, Celery, Kubernetes...).
- (5. Dossier de DAGs) : Un dossier (ex:
/opt/airflow/dags) (souvent un volume Git-Sync) que le Scheduler et le Webserver lisent.
Cron (voir guide Cron) est un "planificateur" (scheduler). Airflow est un "orchestrateur" (orchestrator).
| CritĂšre | cron (Classique) | Apache Airflow |
|---|---|---|
| Concept | "Time-based" (Basé sur le temps). | "Event/Dependency-based" (Basé sur les dépendances). |
| Dépendances | Non. (Tùche A ne sait pas si Tùche B a échoué). | Oui (A >> B). (Gestion de graphe (DAG)). |
| Retries (Ré-essais) | Non (Manuel). | Oui (Intégré, ex: retries=3, retry_delay=...). |
| Backfilling (Rattrapage) | Non (anacron basique). | Oui (airflow dags backfill). |
| Monitoring (Logs) | Faible (Email (MAILTO) ou > /dev/null). | Excellent (UI Web, Logs centralisés par Tùche/Run). |
| ScalabilitĂ© | LimitĂ©e (1 machine). | ĂlevĂ©e (DistribuĂ© : Celery/Kubernetes Executors). |
| Usage | TĂąche simple (backup.sh). | Pipelines Data complexes (ETL, ELT, ML). |
Le DAG est la définition (le "plan") de votre pipeline. C'est un fichier Python (.py) qui est lu (parsé) par le Scheduler.
Exemple (Fichier .py)
Un DAG est défini en Python. Le code n'est pas exécuté, il est parsé par le Scheduler pour (extraire) la structure (Tùches + Dépendances).
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from datetime import datetime
# (Arguments par défaut pour le DAG)
default_args = {
'owner': 'ideo',
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
# (Définition du DAG (le "Graphe"))
@dag(
dag_id='mon_premier_dag',
default_args=default_args,
description='Un DAG simple',
# (Planification : "Cron")
# (Tous les jours Ă 2h du matin)
schedule_interval='0 2 * * *',
# (Date de début (obligatoire))
start_date=datetime(2025, 1, 1),
# (Ne pas rattraper le passé au 1er lancement)
catchup=False,
tags=['demo', 'ideo']
)
def ma_fonction_dag():
# (TĂąche 1)
t1 = BashOperator(
task_id='print_date',
bash_command='date'
)
# (TĂąche 2)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5'
)
# (Dépendance : t1 PUIS t2)
t1 >> t2
# (Instanciation du DAG)
ma_fonction_dag()
Un Operator (OpĂ©rateur) est la TĂąche (Task) atomique. C'est le "nĆud" (node) du DAG. C'est le "Quoi" (l'action Ă exĂ©cuter).
1. Action Operators
Ils exécutent une action (un "verbe").
BashOperator: (Le plus courant) Exécute une commande Shell/Bash (bash_command='...').PythonOperator: (Le plus puissant) Exécute une fonction Python (python_callable=ma_fonction).DockerOperator: Lance un conteneur Docker.KubernetesPodOperator: Lance un Pod Kubernetes.PostgresOperator: Exécute du SQL sur Postgres.SparkSubmitOperator: Soumet un job Spark.
2. Transfer Operators
Déplacent des données (ex: S3ToRedshiftOperator).
3. Sensors (Capteurs) (Voir 3.2)
Ce sont des Operators "spéciaux" (mode='poke') qui attendent (poll) que quelque chose se produise.
HttpSensor: Attend qu'une URL (API) retourne 200 OK.FileSensor: Attend qu'un fichier (.csv) arrive sur le disque.SqlSensor: Attend qu'une requĂȘte SQL (SELECT COUNT...) retourneTrue.
Exemple : PythonOperator
Recommandé (vs BashOperator) car le code est testable (Pytest) et gÚre l'environnement (Python venv).
# --- 1. Définir la fonction Python ---
# (Peut ĂȘtre dans un autre fichier)
def ma_fonction_python(nom, age):
print(f"Bonjour, {nom}. Vous avez {age} ans.")
# (Doit retourner une valeur si xcom_push=True)
return {"status": 200, "user": nom}
@dag(...)
def mon_dag():
# --- 2. Définir l'Operator ---
t_python = PythonOperator(
task_id='tache_python',
# (La fonction Ă appeler)
python_callable=ma_fonction_python,
# (Arguments passés à la fonction)
op_kwargs={
"nom": "Alice",
"age": 30
}
# (Par défaut, le 'return' est poussé (push)
# vers XCom (6.1))
# do_xcom_push=True
)
L'orchestration est définie par les dépendances (flux) entre les tùches (Operators).
Opérateurs "Bitshift" (>>, <<)
La syntaxe "moderne" (Pythonique) pour définir les dépendances (l'ordre).
(t1, t2, t3, t4 sont des Operators) # --- 1. SĂ©quentiel --- # (ExĂ©cute t1, PUIS t2) t1 >> t2 # (Ăquivalent : t2 << t1) # (Ăquivalent : t2.set_upstream(t1)) # --- 2. ParallĂšle (Fan-Out) --- # (ExĂ©cute t1, PUIS (t2 et t3 en parallĂšle)) t1 >> [t2, t3] # --- 3. ParallĂšle (Fan-In) --- # (ExĂ©cute (t1 et t2 en parallĂšle), # PUIS (quand les 2 sont finis) t3) [t1, t2] >> t3 # --- 4. ChaĂźnage Complexe --- start >> [t_extract_A, t_extract_B] t_extract_A >> t_transform_A t_extract_B >> t_transform_B [t_transform_A, t_transform_B] >> t_load t_load >> end
DAG Run & Task Instance (Les Exécutions)Il faut différencier la Définition (le .py) de l'Exécution (le "Run").
DAG(Le Plan) : (Fichiermon_dag.py) C'est la Définition (le "plan", la "classe").DAG Run(L'Exécution) : (L'"Instance" du plan) C'est 1 exécution du DAG pour un instant T (ex: "L'exécution du 10 Nov 2025").Task Instance(L'Instance de Tùche) : (L'"Instance" de la tùche) L'exécution d'1 Tùche (Operator) pour 1 DAG Run spécifique. C'est l'unité la plus petite (ex: "Tùchet1du Run du 10 Nov").
Ătats (State) d'une Task Instance
C'est ce que l'on voit dans l'UI (les carrés de couleur) :
none(Gris clair) : (Pas encore dĂ©marrĂ©)queued(Gris foncĂ©) : (EnvoyĂ© Ă l'Executor (Celery)).running(Vert clair) : (En cours d'exĂ©cution).success(Vert foncĂ©) : (TerminĂ© avec succĂšs (exit 0)).failed(Rouge) : (Ăchec (exit non-zĂ©ro) aprĂšs Nretries).upstream_failed(Orange) : (IgnorĂ©, car une TĂąche "parent" (upstream) a Ă©chouĂ©).skipped(Rose) : (IgnorĂ© (ex:BranchPythonOperator)).up_for_retry(Jaune) : (Ăchec, en attente avant rĂ©-essai).
Sensors (L'Attente)Un Sensor (Capteur) (une sous-classe d'Operator) est une tĂąche qui attend (poll) qu'une condition (externe) soit True.
Mode poke (Défaut)
Le Sensor prend un "Slot" (Worker) et tourne en boucle (sleep) jusqu'Ă ce que la condition soit True.
# (Ex: Attendre un fichier S3)
s3_sensor = S3KeySensor(
task_id='wait_for_s3_file',
bucket_name='mon-bucket',
bucket_key='data/input.csv',
mode='poke', # (Défaut)
poke_interval=60, # (Vérifie toutes les 60 sec)
timeout=3600 # (Ăchoue aprĂšs 1h)
)
Inconvénient : Inefficace. Il "consomme" un slot Worker (ex: Celery) pendant 1h, juste pour attendre.
Mode reschedule (Recommandé)
Le Sensor (tĂąche) s'arrĂȘte (up_for_retry) et libĂšre son "Slot" (Worker), puis demande au Scheduler de le "rĂ©veiller" (re-planifier) plus tard (ex: dans 60 sec) pour rĂ©essayer.
s3_sensor = S3KeySensor(
task_id='wait_for_s3_file',
...
mode='reschedule', # (LibĂšre le slot)
poke_interval=60,
timeout=3600
)
Avantage : Efficace. Ne consomme pas de "Slot" Worker pendant l'attente. (Idéal pour les "sensors" qui attendent des heures).
Hooks (Connexions)Un Hook (Crochet) est une abstraction (wrapper) Python (une classe) qui gĂšre l'interface avec un service externe (BDD, API, Cloud).
Hook vs Operator
Les Operators (2.2) utilisent les Hooks (3.3) (via les Connections (6.2)) pour faire le travail.
(Operator: PostgresOperator) "ExĂ©cute ce SQL" â â (Utilise) ⌠(Hook: PostgresHook) "Je gĂšre la connexion (lib 'psycopg2'), j'ouvre le curseur, j'exĂ©cute, je ferme." â â (Utilise) ⌠(Connection: "my_postgres_db") (ID dans l'UI : contient HĂŽte, Login, Pass)
Usage (Dans un PythonOperator)
On utilise les Hooks directement dans le code Python (PythonOperator) pour de la logique complexe (ex: SELECT -> Transform -> INSERT).
from airflow.providers.postgres.hooks.postgres import PostgresHook
def select_and_insert():
# 1. (Utilise "my_postgres_db" (6.2))
pg_hook = PostgresHook(postgres_conn_id='my_postgres_db')
# 2. (Wrapper pour 'psycopg2')
# (Utilise .get_conn(), .run(), .get_records()...)
records = pg_hook.get_records("SELECT * FROM table_a")
# (Transformation...)
# 3. Insérer
pg_hook.insert_rows(table='table_b', rows=...)
Le Scheduler (Planificateur) (airflow scheduler) est le cĆur (daemon) d'Airflow.
RĂŽles (Boucle)
- 1. Parser (Analyser) les DAGs : (Toutes les N sec) Scanne le dossier
dags_folder. Parse tous les fichiers.py(Python) pour détecter les nouveaux DAGs, les Tùches, et les dépendances. - 2. Mettre à jour la BDD : Met à jour la Metadata Database (4.3) avec la structure (définition) des DAGs.
- 3. Planifier (Schedule) : (Toutes les N sec) Interroge la BDD :
- "Y a-t-il des DAG Runs (
schedule_interval) à créer (ex: le cron '0 2 * * *' est arrivé) ?" - "Y a-t-il des Task Instances (
running) à vérifier (timeout, retry) ?" - "Y a-t-il des Task Instances (
success) dont les dépendances (A >> B) sont satisfaites ?"
- "Y a-t-il des DAG Runs (
- 4. Mettre en File (Queue) : Si une TĂąche (ex:
B) est prĂȘte, le Scheduler la marque "queued" (en attente) et l'envoie Ă l'Executor (5.1) (ex: Celery).
Le Webserver (airflow webserver) est l'Interface Web (GUI) d'Airflow. C'est une application Flask (Python) (généralement servie via Gunicorn en production).
Architecture "Stateless"
L'Interface Web (UI) est (presque) stateless. Elle ne fait rien (pas de planification, pas d'exécution).
Son seul rÎle est de lire et écrire dans la Metadata Database (4.3).
Fonctionnalités (UI)
- Vue (DAGs) : Lister, Mettre en Pause (
Pause), Déclencher (Trigger DAG). - Vue (Grid / Tree) : Visualiser l'état (
success,failed) des "DAG Runs" et "Task Instances". - Monitoring : Voir les Logs (récupérés de la BDD ou du worker), voir les Gantt Charts (temps d'exécution).
- Admin (Menu) : Gérer les Connections (6.2), Variables (6.2), Pools (6.3).
La Metadata Database (Base de DonnĂ©es des MĂ©tadonnĂ©es) est le cĆur (cerveau) de l'architecture Airflow. C'est le SPOF (Single Point of Failure).
Stockage (Ătat)
Elle stocke (via SQLAlchemy) tout l'état :
- La liste des DAGs (parsés).
- L'historique de tous les
DAG Runs. - L'état (
success,failed...) de toutes lesTask Instances. - Les
VariablesetConnections(chiffrées). - Les
XComs(données inter-tùches). - (Si
LocalExecutor) Les logs des tĂąches.
Choix (Dev vs Prod)
- Développement (Défaut) :
SQLite(Fichier localairflow.db). (Facile, mais ne supporte pas le parallélisme (LocalExecutor) !). - Production (Requis) :
PostgreSQLouMySQL. (Nécessaire pourLocalExecutor,CeleryExecutor,KubernetesExecutor).
L'Executor (Exécuteur) est le "moteur" (le "comment"). C'est le composant (configuré dans airflow.cfg) que le Scheduler (4.1) utilise pour exécuter (lancer) physiquement une tùche.
(Scheduler) : "La TĂąche A est prĂȘte (queued)"
â
â (Ordre : "ExĂ©cute TĂąche A")
âŒ
[ EXECUTOR ] (ex: Celery)
â
â (Envoie Ă un Worker)
âŒ
[ Worker (Processus) ]
(Exécute: 'airflow tasks run ...')
SequentialExecutor (Défaut)
Usage : Développement, Test, Débogage.
Fonctionnement : ExĂ©cute 1 seule tĂąche Ă la fois, dans le mĂȘme processus que le Scheduler.
Inconvénient : Pas de parallélisme. (Utilise SQLite par défaut).
LocalExecutor
Usage : Production (Simple, 1 seule machine/VM).
Fonctionnement : Le Scheduler (MaĂźtre) "fork" (lance) des processus (workers) sur la mĂȘme machine pour exĂ©cuter les tĂąches en parallĂšle.
Pré-requis : Doit utiliser PostgreSQL ou MySQL (pas SQLite, qui bloque).
Inconvénient : SPOF (Si la VM meurt, le Scheduler et les Workers meurent).
CeleryExecutor
Usage : Production (Scalabilité distribuée).
Architecture (Complexe) :
- (Scheduler) Envoie la TĂąche (Message) -> [ Message Broker ] (ex: RabbitMQ, Redis).
- (N Workers) Des Workers Celery (processus Python, sur N VMs) écoutent la Queue.
- (Worker 1) Prend la Tùche, l'exécute, met à jour la BDD (Metadata).
Avantage : TrĂšs scalable (ajouter des VMs Workers), robuste.
Inconvénient : Complexe (il faut gérer RabbitMQ/Redis en plus d'Airflow).
KubernetesExecutor
Usage : Production (Cloud Native, K8s).
Architecture :
- (Scheduler) (Tourne dans K8s) Détecte une Tùche.
- (Scheduler) Parle Ă l'API K8s.
- (K8s) Lance un Nouveau Pod (Worker éphémÚre) juste pour cette Tùche (basé sur une image Docker).
- (Pod) Exécute la Tùche (
airflow tasks run ...). - (Pod) Se termine (
CompletedouError).
Avantage : Isolation parfaite (chaque tùche a son propre Pod/environnement). Scalabilité (K8s).
Inconvénient : "Cold Start" (démarrage d'un Pod) (latence).
XCom (Cross-Communication)ProblÚme : Comment la Tùche B peut-elle récupérer une valeur (ex: un ID, un nom de fichier) générée par la Tùche A ?
Solution : XCom (Cross-Communication). C'est un systÚme (basé sur la Metadata DB) de "passage" (Push/Pull) de petites données (métadonnées).
1. XCom Push (Envoyer)
Automatique (PythonOperator) : Si une fonction python_callable (2.2) retourne (return) une valeur, Airflow la "Push" (sérialise, stocke dans la BDD) automatiquement.
Manuel (BashOperator) :
(TĂąche A)
def f_push(**context):
# Pousse manuellement (Clé/Valeur)
context['ti'].xcom_push(key='mon_id', value=12345)
t_push = PythonOperator(
task_id='t_push',
python_callable=f_push
)2. XCom Pull (Recevoir)
Utilise Jinja Templating (dans bash_command) ou xcom_pull (dans PythonOperator) pour récupérer la valeur.
(TĂąche B - Python)
def f_pull(**context):
# Tire la valeur (key) de la TĂąche A (task_ids)
valeur = context['ti'].xcom_pull(
key='mon_id',
task_ids='t_push'
)
print(f"Valeur reçue: {valeur}") # 12345
t_pull = PythonOperator(...)
# (TĂąche C - Bash)
t_bash = BashOperator(
task_id='t_bash',
# (Utilise Jinja pour 'pull' la valeur de retour (default)
# de la tĂąche 't_push')
# --- CORRECTION TEMPLATE DJANGO ---
bash_command="echo 'Valeur reçue: {{ ti.xcom_pull(task_ids='t_push') }}'"
)
[t_push] >> [t_pull, t_bash]
Limite : Ne pas utiliser XCom pour de grosses données (ex: un DataFrame Pandas). XCom est stocké dans la BDD (limite ~48KB). (Pour les grosses données, Tùche A écrit sur S3/GCS, Tùche B lit depuis S3/GCS).
Variables & ConnectionsCe sont les mécanismes (stockés dans la Metadata DB, gérés via l'UI Admin) pour externaliser la configuration (éviter le "hardcoding" dans les DAGs).
Variables
Stockage Clé/Valeur (K/V) simple (non-structuré).
(UI Admin) -> KEY: 'mon_seuil', VALUE: '100'
(Usage - Python)
from airflow.models import Variable
seuil = Variable.get("mon_seuil", default_var=50)
(Usage - Jinja)
{{ var.value.mon_seuil }}Attention : Variable.get() fait un appel BDD (lent) à chaque fois. (Ne pas utiliser dans une boucle. OK au début du script).
Connections
Stockage structuré et chiffré (dans la BDD) pour les connexions externes (utilisé par les Hooks (3.3)).
(UI Admin) -> Créer Connexion :
- Conn ID:
my_postgres_db(L'ID) - Conn Type:
Postgres - Host:
db.ideolab.com - Login:
admin - Password:
*** (chiffré) - Schema:
prod
(Usage - PythonOperator/Hook) # (Le Hook lit 'my_postgres_db' # automatiquement depuis la BDD) pg_hook = PostgresHook(postgres_conn_id='my_postgres_db')
Pools & Queues (Gestion des Ressources)Outils pour gérer la concurrence (QoS) et éviter de surcharger les systÚmes externes (APIs, BDDs).
Pools (Piscines)
Un Pool (géré dans l'UI Admin) est une limite (Slot) de concurrence interne à Airflow.
Usage : "Mon API REST externe (api_meteo) n'accepte que 5 connexions parallĂšles."
Configuration :
- (UI) Créer un Pool
api_meteo_pool(Slots = 5). - (DAG) Assigner la tĂąche Ă ce pool :
t_api = HttpSensor(
task_id='call_api',
pool='api_meteo_pool',
...
)
RĂ©sultat : MĂȘme si 100 DAGs tournent, le Scheduler ne lancera jamais plus de 5 tĂąches (api_meteo_pool) en mĂȘme temps.
Queues (Files d'attente)
Ceci est spécifique au CeleryExecutor (5.3). C'est du routage.
Usage : "J'ai des Tùches 'lourdes' (ML, 128Go RAM) et des Tùches 'légÚres' (Bash, 1Go RAM)."
Configuration :
- (Celery) Créer 2 types de Workers :
worker_heavy(VMs 128Go),worker_light(VMs 4Go). - (Celery)
worker_heavyécoute la Queueheavy_tasks. - (Celery)
worker_lightécoute la Queuelight_tasks. - (DAG) Assigner la tùche à une queue :
t_ml = PythonOperator(
task_id='train_model',
queue='heavy_tasks',
...
)
t_bash = BashOperator(
task_id='print_date',
queue='light_tasks',
...
)catchup & backfillcatchup (Rattrapage)
Le PiÚge : Vous créez un DAG (schedule_interval='@daily', start_date='2024-01-01'). Vous le déployez (activez) aujourd'hui (2025-11-13).
Comportement (Défaut : catchup=True) : Airflow voit qu'il a "manqué" ~680 exécutions (depuis 2024-01-01). Il va immédiatement lancer 680 DAG Runs (Backfills), un pour chaque jour manqué. (Spam/DoS).
Solution (Bonne Pratique) : Toujours mettre catchup=False dans le @dag (ou default_args), sauf si vous *voulez* ce comportement.
airflow dags backfill (Rattrapage Manuel)
La commande (CLI) pour exécuter (manuellement) un "rattrapage" (Backfill) sur une période passée, en ignorant les dépendances.
# (Relancer (manuellement) le DAG 'mon_dag'
# pour la période du 1er au 5 Nov)
$ airflow dags backfill mon_dag \
--start-date 2025-11-01 \
--end-date 2025-11-05
trigger_rule (RÚgles de Déclenchement)Par défaut (trigger_rule='all_success'), une Tùche (B) ne s'exécute que si tous ses parents (A) ont réussi (success).
La trigger_rule (un argument de l'Operator) change ce comportement.
[TĂąche A] >> [TĂąche C] [TĂąche B] >> [TĂąche C]
| RĂšgle (sur TĂąche C) | Description |
|---|---|
all_success (Défaut) | Se lance si A=Success ET B=Success. |
one_success | Se lance si A=Success OU B=Success. |
all_failed | Se lance si A=Failed ET B=Failed. |
one_failed | (Alerte) Se lance si A=Failed OU B=Failed. (Utile pour une tĂąche d'alerte). |
all_done | Se lance quand A et B sont "finis" (success, failed, ou skipped). |
none_failed | (Nettoyage) Se lance si A=Success/Skipped ET B=Success/Skipped. (Utile pour une tùche "cleanup" qui ne doit jamais tourner si une ingestion a échoué). |
Ressources officielles pour Apache Airflow :
Page d'accueil du projet, nouvelles, communauté. Documentation Officielle
Guides (Installation, Concepts, How-to). Concepts : Opérateurs
Documentation sur les opérateurs de base. Concepts : Executors
Documentation sur les types d'exécuteurs (Local, Celery, K8s). Code Source (GitHub)
Le dépÎt Git officiel (Apache). Astronomer Registry
Registre (par Astronomer) des "Providers" (Hooks/Operators) tiers.
