đ Apache Beam â ModĂšle UnifiĂ© (Batch & Stream) & Runners
Guide complet IDEO-Lab sur le SDK de data processing portable (Spark, Flink, Dataflow).
Concept : ModÚle Unifié
ModÚle de programmation unifié (Batch & Stream).
Apache Beam Batch StreamConcept : SDK vs Runner
Le CĆur : Ăcrire 1 fois (SDK), ExĂ©cuter N fois (Runner).
SDK Runner Portabilitévs. Spark / Flink
ModĂšle/Abstraction (Beam) vs Moteur (Spark/Flink).
Spark FlinkLe "P" n°1 : Pipeline
Le DAG (Graphe) complet des transformations (le "Job").
Pipeline DAGLe "P" n°2 : PCollection
L'ensemble de données (Data) (RDD, DataFrame).
Le "P" n°3 : PTransform
L'opération (Map, Filter, GroupByKey).
Transform : ParDo (DoFn)
Le "couteau suisse" (map, filter, flatMap).
Transform : GroupByKey
Le "Shuffle". Regrouper par clé (K/V).
GroupByKey ShuffleTransform : Composite
Encapsuler N PTransform dans 1 seule (Réutilisable).
Concept : Runner (Moteur)
Le "traducteur" (Beam -> Spark, Beam -> Flink).
Runner PortabilitéRunner : Apache Spark
Exécute le pipeline Beam sur un cluster Spark (Batch/Stream).
Spark RunnerRunner : Google Cloud Dataflow
Runner "natif" Serverless (managé) sur GCP.
Dataflow GCPStreaming : Temps (Event vs Processing)
Heure de l'événement (Donnée) vs Heure de la machine (SystÚme).
Event Time Processing TimeStreaming : Watermarks (Filigranes)
Gestion du retard ("Données en retard", AfterWatermark).
Streaming : FenĂȘtres (Windows)
Fixed (Tumbling), Sliding (Glissante), Session.
Streaming : Triggers (Déclencheurs)
"Quand" Ă©mettre le rĂ©sultat (Watermark, Heure, Nb ĂlĂ©ments).
TriggersStreaming : State & Timers API
L'API "Stateful" bas niveau (ValueState, Timer).
Schemas & Beam SQL
L'API "DataFrame" (PCollection) & SQL de Beam.
SDK : Python
pip install apache-beam. (import apache_beam as beam).
SDK : Java
beam-sdks-java-core (Maven).
Ressources & Liens (Officiels)
Site web, Documentation, GitHub, Runners.
Documentation LiensQu'est-ce qu'Apache Beam ?
Apache Beam (acronyme : Batch + Stream) est un modÚle de programmation (programming model) open-source, unifié, pour définir des pipelines (flux de travail) de traitement de données.
Ce n'est pas un moteur d'exécution (comme Spark ou Flink). C'est une couche d'abstraction (SDK).
ModÚle Unifié (Batch & Stream)
La philosophie de Beam est que le "Batch" (lots) est juste un "cas particulier" du "Streaming" (un flux borné).
Beam fournit une seule API (le SDK Beam) pour définir des pipelines qui fonctionnent aussi bien pour des données "finies" (Batch, ex: un fichier CSV) que pour des données "infinies" (Stream, ex: un topic Kafka).
C'est le concept fondamental de Beam : la portabilité (Write-Once, Run-Anywhere).
Beam sépare la Définition (le "Quoi") de l'Exécution (le "Comment").
1. Le SDK (ModĂšle)
Vous (le développeur) écrivez votre pipeline en utilisant le SDK Beam (en Java, Python, ou Go).
(Exemple PySpark : 'WordCount') pipeline | 'Read' >> ReadFromText(...) | 'Split' >> beam.FlatMap(...) | 'Count' >> beam.CombinePerKey(sum) | 'Write' >> WriteToText(...)
Ce code définit le DAG (Graphe) logique, mais il ne dit rien sur *comment* l'exécuter.
2. Le Runner (Moteur)
Le Runner (Exécuteur) est le "traducteur". C'est le moteur de calcul distribué (ex: Spark) que vous choisissez au moment de l'exécution.
(Code Beam)
â
âââș (Traduction) ââș [ Spark Runner ] ââș (ExĂ©cute comme un Job Spark)
â
âââș (Traduction) ââș [ Flink Runner ] ââș (ExĂ©cute comme un Job Flink)
â
âââș (Traduction) ââș [ Dataflow Runner ] ââș (ExĂ©cute sur GCP)Avantage : Vous pouvez tester votre pipeline localement (DirectRunner), le dĂ©ployer en production (Batch) sur Spark, puis le redĂ©ployer (Streaming) sur Flink, sans changer 1 ligne de code (juste les options de pipeline).
Beam n'est pas un concurrent de Spark ou Flink. C'est une abstraction au-dessus d'eux.
| CritĂšre | Apache Beam | Apache Spark | Apache Flink |
|---|---|---|---|
| Type | ModĂšle de Programmation (SDK) | Moteur de Calcul (Batch/Micro-batch) | Moteur de Calcul (Stream-first) |
| RÎle | Définit le pipeline (le "Quoi"). | Exécute le pipeline. | Exécute le pipeline. |
| PortabilitĂ© | ĂlevĂ©e (Java, Python) -> (Spark, Flink...). | Moyenne (API Spark, tourne sur YARN/K8s). | Moyenne (API Flink, tourne sur YARN/K8s). |
| Usage | Ăcrire du code "agnostique" (portable). | Ăcrire du code "spĂ©cifique" (optimisĂ©) pour Spark. | Ăcrire du code "spĂ©cifique" (optimisĂ©) pour Flink. |
PipelineLe Pipeline est l'objet "racine" de toute application Beam. Il représente le graphe (DAG) complet de votre flux de travail (de la Source au Sink).
Exemple (Python SDK)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# 1. Options (ex: le Runner Ă utiliser)
options = PipelineOptions([
'--runner=DirectRunner' # (Test Local)
# '--runner=FlinkRunner'
# '--runner=SparkRunner'
])
# 2. Créer le Pipeline (le Graphe)
p = beam.Pipeline(options=options)
# 3. Définir le Pipeline (Source, Transforms, Sink)
(p
| 'Read' >> beam.io.ReadFromText('input.txt')
| 'Split' >> beam.FlatMap(...)
| 'Write' >> beam.io.WriteToText('output.txt')
)
# 4. Exécuter le Pipeline
result = p.run()
result.wait_until_finish()
Note (Python) : L'opérateur "Pipe" | est une surcharge (syntaxe) pour la fonction .apply(). p | 'Nom' >> Transform est égal à p.apply('Nom', Transform).
PCollection (Données)Une PCollection (Parallel Collection) est l'abstraction de Beam pour représenter un ensemble de données (Dataset) distribué.
C'est l'équivalent du RDD (Spark) ou du DataStream (Flink). C'est ce qui "transite" (le "flux") entre les étapes (PTransforms).
Caractéristiques
- Immuable (Immutable) : On ne "modifie" jamais une PCollection. Une transformation (
Map) crée une nouvelle PCollection. - Distribuée : (Au runtime) La PCollection est "partitionnée" et gérée par les différents workers (Executors/TaskManagers).
- Bornée (Bounded) ou Non-Bornée (Unbounded) :
- Bounded : (Batch) Données finies (ex: lire un fichier GCS).
- Unbounded : (Stream) Données infinies (ex: lire un topic Kafka).
PTransform (Opération)Une PTransform (Parallel Transform) est une opération (étape) dans le Pipeline.
(PCollection_In) | 'Nom_Etape' >> (PTransform) | (PCollection_Out)
Types de PTransforms
- Core Transforms (Noyau) : Les "primitives" (ex:
ParDo,GroupByKey,Combine,Flatten). - Composite Transforms (Composites) : Un "wrapper" qui regroupe plusieurs PTransforms (ex:
Count()est un composite deParDo+GroupByKey+Combine). - I/O Transforms (Connecteurs) : Transformations "Source" (Lecture) ou "Sink" (Ăcriture). (ex:
beam.io.ReadFromText,beam.io.WriteToKafka).
ParDo (DoFn)ParDo (Parallel Do) est la transformation fondamentale (le "couteau suisse") de Beam. C'est l'équivalent de flatMap (Spark/Flink).
ParDo applique une DoFn (Do Function) (votre code) à chaque élément de la PCollection.
Un DoFn peut :
- (Map) Ămettre (
yield) 1 Ă©lĂ©ment (1-to-1). - (Filter) Ămettre 0 Ă©lĂ©ment (
1-to-0). - (FlatMap) Ămettre N Ă©lĂ©ments (
1-to-N).
Python (Raccourcis Map, Filter, FlatMap)
# (Note: Ce sont des raccourcis 'ParDo' composites)
# Map (1-to-1)
pcoll | 'ToUpper' >> beam.Map(lambda s: s.upper())
# Filter (1-to-0/1)
pcoll | 'Filter' >> beam.Filter(lambda s: s.startswith("Error"))
# FlatMap (1-to-N) (Ex: WordCount)
lines | 'Split' >> beam.FlatMap(lambda line: line.split(' '))
Python (beam.DoFn)
Pour une logique complexe (ex: compteurs, état), on utilise une classe DoFn.
class SplitWordsFn(beam.DoFn):
def process(self, element):
# (element = 1 ligne d'entrée)
# (yield = émettre N sorties)
for word in element.split(' '):
yield word
lines | 'Split' >> beam.ParDo(SplitWordsFn())
Java (DoFn)
public class SplitWordsFn extends DoFn<String, String> {
@ProcessElement
public void processElement(@Element String line, OutputReceiver<String> out) {
// (line = 1 ligne d'entrée)
for (String word : line.split(" ")) {
// (out.output = émettre N sorties)
out.output(word);
}
}
}
lines.apply("Split", ParDo.of(new SplitWordsFn()));
GroupByKey (Le Shuffle)GroupByKey (GBK) est la transformation (Wide/Shuffle) (4.3) fondamentale pour l'agrégation.
Elle prend une PCollection de Key-Value (K/V) (Paires) et la regroupe par Clé.
Flux (WordCount)
(Input PCollection)
("le", 1)
("chat", 1)
("le", 1)
â
â | 'Group' >> beam.GroupByKey()
âŒ
(Output PCollection)
("le", [1, 1])
("chat", [1])
Combine (Optimisation)
ProblÚme : GroupByKey est trÚs coûteux (Shuffle). (Si 1 Clé a 1 Milliard de "1", l'iterable [1, 1, ...] sature la RAM du Reducer).
Solution : Utiliser Combine (ex: CombinePerKey, Count.PerKey) qui effectue une pré-agrégation (Combiner) cÎté "Map" (avant le Shuffle).
Une transformation Composite est une "macro" (un "wrapper") qui encapsule une chaßne de transformations (un sous-graphe) dans une seule PTransform réutilisable.
Exemple (Python) : WordCount
Le "WordCount" (Split + Count) est un PTransform composite.
# 1. Définir le PTransform Composite
# (Hérite de beam.PTransform)
class CountWords(beam.PTransform):
def expand(self, pcoll): # 'expand' est la méthode à implémenter
# (pcoll = PCollection d'entrée)
# (Le "sous-graphe" encapsulé)
counts = (
pcoll
| 'Split' >> beam.FlatMap(lambda line: line.split(' '))
| 'Pair' >> beam.Map(lambda word: (word, 1))
| 'Count' >> beam.CombinePerKey(sum)
)
return counts
# 2. Utiliser le PTransform Composite
(p
| 'Read' >> ReadFromText('input.txt')
| 'Compter les Mots' >> CountWords() # (Appel de notre "macro")
| 'Write' >> WriteToText('output.txt')
)
Le Runner est le moteur d'exécution. C'sest le "traducteur" qui prend le DAG (Graphe) logique (défini par le SDK Beam) et le traduit en un Job natif pour un moteur spécifique (Spark, Flink...).
Runners (Courants)
| Runner | Usage | Description |
|---|---|---|
DirectRunner | Test / Local | Exécute le pipeline localement (sur 1 machine). (Inclus dans le SDK). |
SparkRunner | Production (Batch/Stream) | Traduit le pipeline Beam en un Job Spark. (Exécute sur YARN/K8s). |
FlinkRunner | Production (Streaming) | Traduit le pipeline Beam en un Job Flink. |
DataflowRunner | Production (GCP) | Traduit et exécute le pipeline sur Google Cloud Dataflow (Managé/Serverless). |
SamzaRunner | (Niche) | Exécute sur Apache Samza. |
Le Spark Runner permet d'exécuter un pipeline Beam (Batch ou Streaming) sur un cluster Apache Spark (voir guide Spark).
Traduction (Beam -> Spark)
Pipeline(Beam) ->Spark Job(Spark)PCollection(Beam) ->RDDouDataFrame(Spark)ParDo(Beam) ->map/flatMap(Spark)GroupByKey(Beam) ->groupByKey/reduceByKey(Spark)
Exécution (CLI Python)
# (Exécuter 'mon_script_beam.py'
# en utilisant le 'SparkRunner'
# sur un 'master' YARN)
$ python mon_script_beam.py \
--runner=SparkRunner \
--spark_master=yarn \
--deploy_mode=cluster \
--... (options spark-submit)
Google Cloud Dataflow est le Runner "natif" et "serverless" de Beam (sur GCP). (Beam est issu des technologies internes de Google (FlumeJava, MillWheel)).
Fonctionnement (Managé)
Quand vous lancez un job avec le DataflowRunner :
- Le SDK Beam "traduit" le pipeline en "Job Dataflow".
- Il l'envoie Ă l'API Dataflow (Control Plane GCP).
- Dataflow provisionne (auto-scale) automatiquement un pool de "Workers" (VMs) pour exécuter le job.
- (Auto-scaling) Dataflow ajuste le nombre de workers (milliers) en fonction de la charge (Shuffle).
- Dataflow arrĂȘte les VMs Ă la fin (Batch) ou continue (Streaming).
Exécution (CLI Python)
# (Exécuter sur GCP)
$ python mon_script_beam.py \
--runner=DataflowRunner \
--project=mon-projet-gcp \
--region=europe-west1 \
--temp_location=gs://mon-bucket/temp \
--streaming (Si streaming)Dans le streaming, "le temps" est un concept complexe. Beam (comme Flink) gĂšre 3 notions du temps.
| Type de Temps | Description | Usage |
|---|---|---|
| Processing Time (Temps de Traitement) | L'heure de la machine (locale) qui exĂ©cute l'opĂ©rateur (le Worker Spark/Flink). | Simple, mais incorrect (non-dĂ©terministe). (Ne pas utiliser pour les fenĂȘtres). |
| Ingestion Time (Temps d'Ingestion) | L'heure oĂč l'Ă©vĂ©nement entre dans Beam (Ă la Source). | Compromis (meilleur que Processing). |
| Event Time (Temps de l'ĂvĂ©nement) | (RecommandĂ©) L'heure oĂč l'Ă©vĂ©nement a Ă©tĂ© créé (produit). (Timestamp embarquĂ© dans l'Ă©vĂ©nement). | Correct (DĂ©terministe). Permet de gĂ©rer les donnĂ©es en retard (late) (via Watermarks). |
Exemple (Le ProblĂšme)
Un capteur IoT (iPhone) dans un ascenseur (pas de réseau) envoie un événement "Clic" (créé à 10:00:00).
L'iPhone sort de l'ascenseur (réseau) à 10:05:00.
Beam (Worker) le traite (reçoit) à 10:05:01.
- Event Time : 10:00:00 (Correct)
- Ingestion Time : 10:05:00
- Processing Time : 10:05:01
Si vous faites un window("10:00-10:01"), seul l'Event Time placera (correctement) cet Ă©vĂ©nement dans la bonne fenĂȘtre.
ProblĂšme : Si on utilise l'Event Time (5.1) et qu'on crĂ©e une fenĂȘtre window("10:00-10:05"). Quand Beam doit-il "fermer" (calculer) cette fenĂȘtre ? Ă 10:05:00 ?
Non. (Car l'événement de 10:04:00 (en retard) peut arriver à 10:05:10).
Solution : Watermarks (Filigranes)
Un Watermark (Filigrane) est la "notion du temps" (heuristique) de Beam. C'est une métadonnée qui dit : "Je (Beam) suis certain (ou 'assez certain') qu'il n'y aura plus d'événements (Event Time) avant ce timestamp."
Quand le "Watermark" (l'horloge d'Event Time) dĂ©passe la fin d'une fenĂȘtre (ex: 10:05:00), Beam dĂ©clenche (fires) le calcul de cette fenĂȘtre.
Gestion du Retard (Lateness)
Les Watermarks sont (par dĂ©finition) imparfaits. (L'Ă©vĂ©nement de 10:04:00 peut arriver Ă 10:05:30, *aprĂšs* la fermeture de la fenĂȘtre 10:00-10:05).
Beam (via les Triggers (6.1)) permet de gérer ces données en retard (LATE) (ex: les ignorer (Discard), ou les inclure (Accumulate)).
Une "FenĂȘtre" (Window) est un "bucket" (seau) de temps (basĂ© sur Event Time ou Processing Time) utilisĂ© pour agrĂ©ger (GroupByKey) un flux infini (Unbounded).
1. Fixed Window (FenĂȘtre Fixe / Tumbling)
Taille fixe, non-chevauchante.
Exemple : beam.WindowInto(window.FixedWindows(5 * 60)) (FenĂȘtre de 5 minutes).
[-- FenĂȘtre 1 --] [-- FenĂȘtre 2 --] [-- FenĂȘtre 3 --] (10:00 - 10:05) (10:05 - 10:10) (10:10 - 10:15)
Usage : "Combien de clics toutes les 5 minutes ?".
2. Sliding Window (FenĂȘtre Glissante)
Taille fixe, chevauchante.
Exemple : beam.WindowInto(window.SlidingWindows(60, 10)) (Taille 60 sec, Glisse (slide) de 10 sec).
[---- FenĂȘtre 1 (60s) ----]
(10:00:00 - 10:01:00)
[---- FenĂȘtre 2 (60s) ----]
(10:00:10 - 10:01:10)
Usage : "Quelle est la moyenne mobile (sur 60s) des ventes, mise Ă jour toutes les 10s ?".
3. Session Window (FenĂȘtre de Session)
Taille variable, basée sur l'inactivité.
Exemple : beam.WindowInto(window.Sessions(30 * 60)) (Gap (écart) de 30 minutes).
Flux :
- (User A, Clic 1) (10:00) -> Ouvre FenĂȘtre 1
- (User A, Clic 2) (10:10) -> (Gap < 30m) -> Ajoute Ă FenĂȘtre 1
- (User A, Clic 3) (10:50) -> (Gap > 30m) -> Ferme FenĂȘtre 1. Ouvre FenĂȘtre 2.
Usage : "Calculer la durée de session (activité) par utilisateur."
Le Trigger (DĂ©clencheur) rĂ©pond Ă la question : "Quand (dans une fenĂȘtre) le rĂ©sultat (l'agrĂ©gation) doit-il ĂȘtre Ă©mis (fired) ?".
DĂ©faut : Le trigger par dĂ©faut est AfterWatermark(). (Attend que le Watermark (5.2) passe la fin de la fenĂȘtre).
Triggers (Exemples)
AfterWatermark(): (Défaut) Basé sur l'Event Time (Correct).AfterProcessingTime(): Basé sur l'horloge (Temps réel, mais incorrect).AfterCount(N): Déclencher aprÚs N éléments (ex: "tous les 100 éléments").
Combinaison & Rétraction
Les Triggers permettent des scénarios complexes : "Montre-moi un résultat (rapide) toutes les minutes (AfterProcessingTime), ET (OrFinally) le résultat correct (final) quand le Watermark arrive (AfterWatermark), ET (AfterEach) continue de mettre à jour (Accumulating) si des données en retard (LATE) arrivent."
L'API State & Timers est l'API "bas niveau" (impérative) (dans un ParDo/DoFn) pour le Stateful Streaming (3.1). (Similaire à Flink 3.2).
Elle permet Ă un DoFn (sur un flux "Keyed" (keyBy)) de stocker (State) et de planifier (Timer).
Exemple (Détection de "double-clic")
class DetectDoubleClickFn(beam.DoFn):
# 1. DĂ©finir l'Ătat (State)
# (Stocke le timestamp du dernier clic vu)
LAST_CLICK_STATE = beam.transforms.userstate.ValueStateSpec(
'last_click', beam.coders.FloatCoder())
def process(self, element,
last_click=beam.DoFn.StateParam(LAST_CLICK_STATE)):
# (Lire l'état)
prev_click_ts = last_click.read()
if prev_click_ts is not None:
if element.timestamp - prev_click_ts < 1.0: # (1 sec)
yield "Double Clic !" # (Ămettre)
# (Ăcrire l'Ă©tat)
last_click.write(element.timestamp)
ProblÚme : L'API PCollection (2.2) (comme le RDD) est "non-structurée".
Solution : L'API Schema (similaire à Spark DataFrames) permet de définir une PCollection structurée (ex: PCollection ou PCollection).
Beam SQL
Une fois qu'une PCollection a un Schéma, on peut lui appliquer des transformations SQL (beam.SqlTransform) (similaire à Spark SQL).
(Python)
# (PCollection 'ventes' (avec schéma))
ventes | 'SQL Query' >> beam.SqlTransform("""
SELECT
userID,
SUM(montant) AS total_ventes
FROM
PCOLLECTION -- (Mot-clé pour la source)
WHERE
montant > 10
GROUP BY
userID
""")
Usage : Permet aux analystes (SQL) d'écrire des pipelines Beam.
Le SDK Python (apache-beam) est le plus populaire pour la Data Science et le ML.
Installation (pip)
# Installation de base $ pip install apache-beam # (Optionnel) Installer les "extras" # (Pour Google Cloud Dataflow) $ pip install apache-beam[gcp] # (Pour Spark Runner) $ pip install apache-beam[spark] # (Pour Flink Runner) $ pip install apache-beam[flink] # (Pour Kafka) $ pip install apache-beam[kafka]
Usage (PySpark)
import apache_beam as beam
with beam.Pipeline() as p:
(p | "Read" >> beam.Create([1, 2, 3])
| "Square" >> beam.Map(lambda x: x * x)
| "Print" >> beam.Map(print)
)
Le SDK Java est le SDK "natif" (le plus ancien et le plus stable), souvent utilisé en production "entreprise".
Installation (Maven pom.xml)
<!-- (Dépendance de base) --> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <version>2.56.0</version> </dependency> <!-- (Ajouter le Runner) --> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-spark-3</artifactId> <version>2.56.0</version> </dependency> <!-- (ou Dataflow Runner) --> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> <version>2.56.0</version> </dependency>
Ressources officielles pour Apache Beam :
Page d'accueil du projet, nouvelles, communauté. Documentation Officielle
Guides de programmation (Python, Java), Concepts (Streaming, Windows...). Matrice de Capacité des Runners
(Page cruciale) Montre quelles fonctionnalités (ex: Triggers, State) sont supportées par quels Runners (Spark, Flink...). Code Source (GitHub)
Le dépÎt Git officiel (Apache).
