Project Oxygen & Ideo-LabIDEO LAB Dashboard 2026

🚀 Apache Spark – Moteur Big Data (RDD, DataFrame, Spark SQL)

Guide complet IDEO-Lab sur le moteur de calcul distribué (Spark Core, SQL, Streaming & MLlib).

1.1

Concept : Moteur Distribué

Calcul (In-Memory) vs MapReduce (Disque).

Spark Big Data In-Memory
1.2

Architecture Cluster

Driver (Cerveau) vs Executor (Muscle).

Driver Executor Cluster Manager
1.3

vs. Databricks / EMR

Le Moteur (Spark) vs La Plateforme (Databricks).

Databricks AWS EMR
2.1

API (Legacy) : RDD

Resilient Distributed Dataset. Bas niveau (map, filter).

RDD map filter
2.2

API (Moderne) : DataFrame

Données structurées (Colonnes). Haut niveau (select, where).

DataFrame Dataset
2.3

RDD vs DataFrame

Non-Structuré (RDD) vs Structuré (DF + Catalyst).

Catalyst Optimisation
3.1

Module : Spark SQL

spark.sql(). (Moteur SQL sur DataFrames).

Spark SQL SQL
3.2

Le Cerveau : Catalyst Optimizer

Plan Logique -> Plan Physique (Optimisation).

Catalyst Optimizer
3.3

Sources de Données

Parquet, Delta Lake, Avro, JSON, CSV, JDBC.

Parquet Delta Lake
4.1

Concept : Lazy Evaluation

Transformations (Lazy) vs Actions (Trigger).

Lazy Evaluation Transform Action
4.2

Le DAG : Job -> Stages -> Tasks

Plan d'exĂ©cution (Job), Étapes (Stages), UnitĂ©s (Tasks).

DAG Job Stage
4.3

Le PiĂšge : Le Shuffle

Mouvement réseau (groupBy, join). (Goulot d'étranglement).

Shuffle Performance
5.1

CLI : spark-submit

Soumission d'applications (.py, .jar) au cluster.

spark-submit CLI
5.2

Modes de Déploiement

--deploy-mode client (Dev) vs cluster (Prod).

client mode cluster mode
5.3

Outil : Spark UI (Port 4040)

L'interface web (diagnostic : DAG, Stages, TĂąches).

Spark UI Port 4040
6.1

Structured Streaming (Moderne)

Le "DataFrame infini" (Micro-batch).

Structured Streaming Micro-batch
6.2

Intégration : Kafka

Source (readStream) & Sink (writeStream).

Kafka Streaming
6.3

Streaming : State & Watermarking

Gestion (flatMapGroupsWithState) & Retard (Watermark).

Stateful Watermarking
7.1

API : PySpark

L'API Python (Wrapper Py4J) pour Spark.

PySpark Python
7.2

Module : MLlib

Machine Learning (Pipelines, Featurization, ModĂšles).

MLlib ML
7.3

Cheat-sheet PySpark

Opérations DataFrame (select, withColumn, groupBy).

Cheatsheet PySpark
1.1 Concept : Moteur de Calcul Distribué
Qu'est-ce qu'Apache Spark ?

Apache Spark est un moteur de calcul (compute engine) open-source, conçu pour le traitement de données distribué (Big Data).

vs. Hadoop MapReduce (Le "Pourquoi")

Spark a été créé pour remplacer Hadoop MapReduce (l'ancien standard).

ProblÚme (MapReduce) : MapReduce était basé sur le disque (Disk-based). Chaque étape (Map -> Reduce) devait écrire ses résultats sur le disque (HDFS) avant que l'étape suivante ne puisse lire (I/O Disque massif, trÚs lent).

Solution (Spark) : Spark est basé sur la mémoire (In-Memory). Il charge les données (RDD/DataFrame) en RAM sur le cluster et exécute les opérations (le DAG) en mémoire. (10x à 100x plus rapide que MapReduce).

Les 4 Piliers (Modules)

Spark n'est pas un seul outil, c'est une "stack" (pile) :

  • Spark Core : Le "cƓur" (le moteur). GĂšre le scheduling (TĂąches, DAG), la mĂ©moire, et l'API RDD (2.1).
  • Spark SQL : (Le plus utilisĂ©) Le module pour les donnĂ©es structurĂ©es. Fournit l'API DataFrame (2.2), Spark SQL (3.1) et l'optimiseur Catalyst (3.2).
  • Spark Streaming / Structured Streaming (6.1) : Le module pour le traitement en temps rĂ©el (flux).
  • MLlib (7.2) : La bibliothĂšque de Machine Learning distribuĂ©.
1.2 Architecture : Driver, Executor & Cluster Manager

Une application Spark (ex: un spark-submit) s'exécute comme un ensemble de processus indépendants sur un cluster.

(Votre Script .py)
       │
       │ (1. Soumission : spark-submit)
       ▌
[ CLUSTER MANAGER ] (ex: YARN, Kubernetes)
 (Le "RH" : Négocie les ressources)
       │
       │ (2. Lance le Driver)
       ▌
+---------------------+
| [ DRIVER (JVM) ]    | (Le "Cerveau")
| (Votre 'main()')    |
| (SparkContext)      |
| (Catalyst Optimizer)|
| (Planifie le DAG)   |
+---------------------+
       │
       │ (3. NĂ©gocie les Executors)
       │
       └────────â–ș [ CLUSTER MANAGER ]
                        │
                        │ (4. Lance les Executors)
                        │
    ┌───────────────────┮───────────────────┐
    ▌                                       ▌
+----------------+                      +----------------+
| [ EXECUTOR 1 ] | (Le "Muscle")        | [ EXECUTOR 2 ] |
| (JVM/Worker)   |                      | (JVM/Worker)   |
| (Exécute Task 1) |                      | (Exécute Task 2) |
| (Cache Data A) |                      | (Cache Data B) |
+----------------+                      +----------------+
  • Cluster Manager (Gestionnaire) : (ex: YARN, Kubernetes). Le "gestionnaire de ressources". Spark lui "demande" des conteneurs (CPU/RAM).
  • Driver (Pilote) : (1 par App) Le "cerveau". C'est le processus main() de votre application. Il planifie le travail (le DAG) et coordonne les Executors.
  • Executor (ExĂ©cuteur) : (N par App) Les "muscles" (workers). LancĂ©s par le Cluster Manager. Ils exĂ©cutent les TĂąches (Tasks) (le calcul) et stockent (cache) les partitions de donnĂ©es en RAM.
1.3 Comparaison : Apache Spark vs. Databricks vs. EMR
CritĂšreApache Spark (Open Source)Databricks (SaaS)AWS EMR (PaaS)
DescriptionLe moteur (logiciel). (DIY).La plateforme SaaS (Lakehouse) (par les créateurs de Spark).La plateforme PaaS (AWS).
GestionManuelle (Installer Spark, Hadoop, gérer les clusters).EntiÚrement managée (Zéro admin, UI, Notebooks).Managée (Provisionne un cluster Hadoop/Spark "vanilla").
PerformanceStandard (JVM).Optimisée (Moteur Photon C++, Delta Lake, Caching).Standard (JVM).
InterfaceCLI (spark-submit), Shell.Notebooks Web, Databricks SQL, Jobs UI.CLI (AWS), Console EMR, Notebooks (EMR Studio).
CoûtGratuit (Coût des VMs).Cher (Coût des VMs + Coût DBU (Databricks Unit)).Moyen (Coût EC2 + Marge EMR).
2.1 API (Legacy) : RDD (Resilient Distributed Dataset)

Le RDD est l'API "bas niveau" originale de Spark. C'est une collection d'objets (immuable, distribuée) sur laquelle on peut opérer en parallÚle.

Resilient : Si une partition (Executor) est perdue, Spark peut la recalculer (grĂące au DAG).

Distribué : Le RDD est "coupé" en Partitions, stockées sur les Executors.

Dataset : Une collection de données (ex: lignes d'un log, objets Python).

RDD (Non-Structuré)

Le RDD est "agnostique" (il ne connaßt pas le schéma). C'est une "boßte noire" d'objets (ex: RDD[String], RDD[MaClassePerso]).

Exemple (PySpark)
# 1. Créer un RDD (depuis un fichier)
lines_rdd = sc.textFile("s3://.../logs.txt")
# (RDD de Strings (lignes))

# 2. Transformation (map) - Lazy
errors_rdd = lines_rdd.filter(lambda line: "ERROR" in line)

# 3. Transformation (map) - Lazy
msgs_rdd = errors_rdd.map(lambda line: line.split(" ")[-1])

# 4. Action (Trigger) - Exécute le Job
results = msgs_rdd.collect()
2.2 API (Moderne) : DataFrame (DF) / Dataset

Le DataFrame (DF) est l'API "haut niveau" moderne (introduite dans Spark SQL). C'est le standard de facto aujourd'hui.

DataFrame (Structuré)

Un DataFrame est un RDD (distribué) + un Schéma (Schema).

C'est une table (structurée) avec des colonnes nommées (similaire à un DataFrame Pandas ou une table SQL).

Exemple (PySpark)
# 1. Créer un DataFrame (lit le Parquet/Delta)
# (Le schéma est lu automatiquement)
df = spark.read.format("parquet").load("s3://.../users.parquet")
# df.printSchema()
# |-- user_id: long
# |-- age: integer
# |-- city: string

# 2. Transformation (select) - Lazy
# (On manipule des *colonnes*, pas des "lignes")
df_filtered = df.select("user_id", "age") \
                .where(df["age"] > 30)

# 3. Action (Trigger)
df_filtered.show()
Dataset (Statique - Scala/Java)

Un Dataset (Scala/Java) est un DataFrame "typé statiquement". (Ex: Dataset[Person]). C'est le "meilleur des deux mondes" (Structuré (DF) + Typé (RDD)). (L'API Python (PySpark) n'a pas de Datasets, elle n'a que des DataFrames).

2.3 Comparaison : RDD vs DataFrame (Catalyst)

RÚgle : Toujours utiliser les DataFrames (Spark SQL), sauf cas trÚs spécifique (données binaires non-structurées).

Le RĂŽle de Catalyst (L'Optimiseur)

La raison principale est la performance, grĂące Ă  l'Optimiseur Catalyst (3.2).

RDD (BoĂźte Noire)

Quand vous utilisez un RDD (ex: .filter(lambda x: ...)), Spark ne "voit" que du code (Python/Scala) opaque.

Il ne peut pas l'optimiser. Il doit sérialiser le code (le lambda) et l'envoyer à l'Executor tel quel.

(Inefficace)

DataFrame (BoĂźte Transparente)

Quand vous utilisez un DataFrame (ex: .where(col("age") > 30)), vous ne donnez pas de code, vous décrivez (déclaratif) une intention.

Catalyst (Optimizer) intercepte cette intention ("WHERE age > 30").

Optimisation (Pushdown) : Catalyst va réécrire votre requĂȘte. (Ex: Si la source est Parquet, il va "pousser" (pushdown) le filtre age > 30 directement au lecteur Parquet, qui ne lira mĂȘme pas les autres lignes (Pruning)).

(TrĂšs efficace)

3.1 Module : Spark SQL

Spark SQL est le module (et l'API) qui permet d'exĂ©cuter des requĂȘtes SQL (Standard) sur des DataFrames (ou des sources de donnĂ©es).

Usage (PySpark)

Les APIs DataFrame et SQL sont unifiĂ©es (elles partagent le mĂȘme moteur Catalyst).

(Session Spark 'spark' est créée)

# 1. Charger un DataFrame
df = spark.read.parquet("s3://.../users.parquet")

# 2. Enregistrer le DF comme une "Table" (temporaire, en mémoire)
df.createOrReplaceTempView("users_view")

# 3. (Magie) RequĂȘter la "Vue" en SQL
# (Ceci est exécuté par le moteur Spark, pas par une BDD SQL)
df_results = spark.sql("""
    SELECT
        city,
        AVG(age) as avg_age
    FROM
        users_view
    WHERE
        age > 20
    GROUP BY
        city
    ORDER BY
        avg_age DESC
""")

df_results.show()

Usage : Permet aux Data Analysts (qui connaissent SQL) de faire du Big Data, sans apprendre PySpark/Scala.

3.2 Le Cerveau : Catalyst Optimizer

Catalyst est le moteur d'optimisation (query optimizer) de Spark SQL. C'est le "cerveau" qui rend les DataFrames (2.2) et Spark SQL (3.1) rapides.

Flux (Simplifié)

Quand vous exécutez df.show() ou spark.sql() :

  1. Code (DF/SQL) : (ex: df.select(...).filter(...)).
  2. Catalyst (Analyse) : Convertit le code en Plan Logique (Unresolved).
  3. Catalyst (Analyse) : Vérifie le "Catalogue" (Métadonnées) -> Plan Logique (Analyzed).
  4. Catalyst (Optimisation Logique) : Applique des rÚgles (ex: Predicate Pushdown (Pousser les filtres), Constant Folding). -> Plan Logique (Optimisé).
  5. Catalyst (Optimisation Physique) : GénÚre plusieurs Plans Physiques (ex: "Hash Join" vs "Broadcast Join") et choisit le meilleur (basé sur les coûts).
  6. GĂ©nĂ©ration de Code (Codegen) : (Étape finale) GĂ©nĂšre du bytecode Java (trĂšs rapide) pour ce plan.
  7. Exécution : (Envoie le code (Tùches) aux Executors).
3.3 Sources de DonnĂ©es (Lecture/Écriture)

Spark (via spark.read et df.write) peut se connecter à (presque) n'importe quelle source de données.

spark.read.format(...).load(...)
FormatDescriptionNotes
parquet(Recommandé) Format Colonnaire (rapide, compressé, schéma).Défaut. Idéal pour Data Lakes.
delta(Recommandé) (Delta Lake) "Parquet" + ACID (Logs).Requis pour MERGE/UPDATE. (Utilisé par Databricks/Synapse).
csvTexte (lent, parsing).Nécessite .option("header", "true"), .option("inferSchema", "true").
jsonTexte (lent, parsing).GĂšre le JSON multi-lignes.
orc(Concurrent de Parquet) Colonnaire.(Utilisé par Hive).
jdbcBase de donnĂ©es (SQL)..option("url", ...), .option("dbtable", ...). (Lent, pousse les requĂȘtes (pushdown)).
4.1 Concept : Lazy Evaluation (Évaluation Paresseuse)

C'est le concept fondamental de l'exécution Spark.

Spark divise les opérations en deux types : Transformations (Lazy) et Actions (Eager).

1. Transformations (Lazy / Paresseuses)

Une "Transformation" (ex: select, filter, map, groupBy) ne fait rien (n'exécute aucun calcul) quand vous l'appelez.

Elle ajoute simplement une étape au Plan d'Exécution (DAG) que Spark (Catalyst) construit en mémoire.

2. Actions (Eager / Actives)

Une "Action" (ex: show, count, collect, write) déclenche (triggers) l'exécution du DAG.

C'est seulement quand vous appelez une Action que Spark :

  1. (Analyse le DAG complet)
  2. (Optimise le DAG via Catalyst)
  3. (GénÚre un "Job" et l'envoie aux Executors)

# (1. Transformation) -> Lazy. (0 sec)
# (Spark construit le DAG : "Etape 1: Lire")
df = spark.read.parquet("s3://...")

# (2. Transformation) -> Lazy. (0 sec)
# (Spark ajoute au DAG : "Etape 2: Filtrer")
df_filtered = df.where(col("age") > 30)

# (3. Action) -> Eager. (Déclenche le Job)
# (Maintenant, Spark exécute (1) et (2))
df_filtered.show()
4.2 Le DAG : Job -> Stages -> Tasks

Lorsqu'une "Action" (4.1) est appelée, le Driver (Catalyst) convertit le "Plan Logique" en DAG (Directed Acyclic Graph) d'exécution physique.

[ JOB 1 (ex: .show()) ]
   │
   ├─â–ș [ STAGE 1 (Narrow) ] (ex: read + filter)
   │     ├─ Task 1 (Partition 1)
   │     ├─ Task 2 (Partition 2)
   │     └─ ... (Task N)
   │
   │ (--- SHUFFLE (4.3) ---)
   │
   └─â–ș [ STAGE 2 (Wide) ] (ex: groupBy)
         ├─ Task 1 (Reducer 1)
         └─ Task 2 (Reducer 2)
  • Job : (1 Job = 1 Action). (ex: df.write(...)).
  • Stage (Étape) : Un "Stage" est un ensemble de TĂąches qui peuvent ĂȘtre exĂ©cutĂ©es en parallĂšle (Narrow Transformation) sans Shuffle (4.3). Un Shuffle (ex: groupBy) coupe le DAG et crĂ©e un nouveau Stage.
  • Task (TĂąche) : (L'unitĂ© de travail) 1 TĂąche = 1 "Partition" de donnĂ©es, exĂ©cutĂ©e sur 1 "Core" (CPU) d'un "Executor".
4.3 Le Piùge : Le Shuffle (Goulot d'Étranglement)

Le Shuffle (Mélange) est l'opération (physique) de redistribution des données sur le réseau (entre les Executors). C'est l'opération la plus coûteuse (lente) dans Spark.

Transformations "Narrow" vs "Wide"
  • Narrow (Étroit) : (Rapide) Pas de Shuffle. Les donnĂ©es restent sur leur partition. (Ex: map, filter, select).
  • Wide (Large) : (Lent) NĂ©cessite un Shuffle. Les donnĂ©es (clĂ©s) doivent ĂȘtre dĂ©placĂ©es entre les Executors. (Ex: groupByKey, reduceByKey, join (sauf Broadcast), repartition).
Exemple (groupByKey)

Pour compter les "Clés" (A, B, C), qui sont réparties sur 4 Executors :

  1. (Stage 1) Executor 1 (A, B), Executor 2 (A, C), Executor 3 (B, C)...
  2. (Shuffle - I/O Disque/Réseau) Executor 1 doit envoyer ses "B" et "C" aux autres. Executor 2 doit envoyer... (etc.).
  3. (Stage 2) (AprĂšs Shuffle) Executor 1 a (A, A), Executor 2 a (B, B), Executor 3 a (C, C).
  4. (Stage 2) Les Executors comptent (réduisent) leurs clés locales.

Optimisation : Toujours préférer reduceByKey (qui pré-agrÚge sur le "Mapper" (Stage 1)) à groupByKey (qui envoie tout (brut) au Shuffle).

5.1 CLI : spark-submit

spark-submit est l'outil CLI (standard) pour soumettre une application Spark (ex: un .py ou .jar) Ă  un Cluster Manager (1.2).

Syntaxe (Exemple PySpark sur YARN)
/usr/bin/spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --name "Mon_App_Prod" \
    --driver-memory 4g \
    --num-executors 50 \
    --executor-memory 8g \
    --executor-cores 4 \
    --py-files libs.zip \
    mon_script.py \
    arg1 \
    arg2
OptionDescription
--master [url]Le Cluster Manager (ex: yarn, k8s://..., local[*] (Test)).
--deploy-mode [mode]OĂč exĂ©cuter le Driver : client (local) ou cluster (Prod). (Voir 5.2).
--name [nom]Nom de l'application (visible dans l'UI YARN/Spark).
--driver-memory [Go]RAM pour le "Cerveau" (Driver).
--num-executors [N]Nombre de "Muscles" (Workers) Ă  demander.
--executor-memory [Go]RAM pour 1 Muscle (Worker).
--executor-cores [N]Nb de vCPUs (TĂąches parallĂšles) pour 1 Muscle.
--py-files [zip/egg](PySpark) Librairies Python Ă  envoyer aux Executors.
mon_script.pyLe script (Point d'entrée).
arg1 arg2Arguments passés au script main().
5.2 Modes de Déploiement : client vs cluster

L'option --deploy-mode (dans spark-submit) est cruciale. Elle dĂ©finit oĂč (physiquement) le processus Driver (1.2) va s'exĂ©cuter.

Mode client (Défaut)

(Développement / Interactif)

Le Driver (Cerveau) s'exĂ©cute sur la machine oĂč vous avez lancĂ© spark-submit (ex: votre laptop, ou le "Edge Node" du cluster).

Les Executors (Muscles) tournent sur le cluster (YARN/K8s).

  • Avantage : Interactif. Le shell pyspark (REPL) est en mode "client".
  • InconvĂ©nient (Prod) : Si vous fermez votre laptop (ou tuez le terminal SSH), vous tuez le Driver (le Cerveau). Le Job (Application) crash.
  • InconvĂ©nient (RĂ©seau) : Le Driver (Cerveau) doit communiquer (beaucoup) avec les Executors (Muscles) (WAN). (Latence).
Mode cluster (Recommandé)

(Production / Batch)

Le Driver (Cerveau) est envoyĂ© (soumis) au Cluster Manager (YARN), qui le lance sur un nƓud (VM) quelconque du cluster (gĂ©nĂ©ralement le "Master Node" YARN).

Les Executors (Muscles) tournent aussi sur le cluster.

(Laptop) -> spark-submit (Job 1)
(Laptop) -> (Le terminal se libĂšre)

(Sur le Cluster YARN)
  -> [Node 1 (Master)] -> (Driver (Job 1))
  -> [Node 2] -> (Executor 1)
  -> [Node 3] -> (Executor 2)
  • Avantage (Prod) : RĂ©silience. Vous pouvez fermer votre laptop. Le Job (Driver) continue de tourner sur le cluster.
  • Avantage (RĂ©seau) : Le Driver et les Executors sont "proches" (mĂȘme Data Center/VPC). (Faible latence).
5.3 Outil : Spark UI (Port 4040)

La Spark UI est l'interface web (GUI) de diagnostic de Spark. C'est l'outil n°1 pour déboguer la performance.

Elle est (par défaut) hébergée sur le Driver (1.2), sur le Port 4040 (http://[IP_Driver]:4040).

Onglets Clés (Diagnostic)
OngletDescription
JobsListe des "Actions" (4.1) (ex: show, write). Permet de voir le DAG (4.2).
Stages(Le plus important) Liste les "Stages" (étapes) du Job. Permet d'identifier le Shuffle (4.3) (les "Shuffle Read/Write").
TasksListe les Tùches (unités) individuelles (ex: 500 Tùches "failed").
StorageMontre quels RDDs/DataFrames sont en Cache (Mémoire).
ExecutorsListe les "Muscles" (Workers) (IP, RAM/Disque utilisé, Nb Tùches).
SQLAffiche le Plan d'Exécution (Optimisé) de Catalyst (3.2). (Permet de voir si le "Predicate Pushdown" a fonctionné).
6.1 Streaming (Moderne) : Structured Streaming

DStreams (Legacy) : L'ancien "Spark Streaming" (avant 2.x) était basé sur des RDDs (DStreams). (ObsolÚte).

Structured Streaming (Moderne)

Le Structured Streaming (Spark 2.x+) est le moteur de streaming moderne. Il est basé sur l'API DataFrame (2.2).

Le "DataFrame Infini"

Concept : Vous traitez un flux (Stream) de données (ex: Kafka) comme s'il s'agissait d'une table (DataFrame) statique qui "grandit" (append-only).

Vous Ă©crivez une requĂȘte (Batch) une seule fois (ex: df.groupBy("type").count()). Spark (Moteur) se charge de l'exĂ©cuter en continu (micro-batch).

Exemple (PySpark)
# 1. Lecture (Source : Socket)
# (Crée un DF 'lines' qui représente le flux)
lines_df = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# 2. Transformation (DataFrame standard)
words_df = lines_df.select(
    explode(split(lines_df.value, " ")).alias("word")
)
word_counts_df = words_df.groupBy("word").count()

# 3. Écriture (Sink : Console)
# (Action 'writeStream' qui déclenche le Job)
query = word_counts_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
6.2 Intégration : Kafka

Kafka (le Bus de Messages) est la Source (Input) et le Sink (Output) le plus courant pour Structured Streaming.

Source (readStream)

Lire (consommer) un topic Kafka.

df = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,...") \
  .option("subscribe", "mon_topic") \
  .load()

# (Le DF Kafka a des colonnes 'key', 'value' (binaire),
#  'topic', 'partition', 'timestamp')
df_string = df.selectExpr("CAST(value AS STRING)")
Sink (writeStream)

Écrire (produire) un DataFrame (rĂ©sultat) dans un topic Kafka.

# (df_results est le résultat du 'groupBy')
# (Doit avoir une colonne 'value', ou 'key'/'value')

query = df_results.writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,...") \
  .option("topic", "topic_resultats") \
  .option("checkpointLocation", "/path/to/checkpoints") \
  .start()

checkpointLocation (Crucial) : L'"état" (checkpoint) du streaming. C'est là que Spark écrit les "offsets" (positions) qu'il a lus (Garantie "Exactly-Once").

6.3 Streaming : State & Watermarking

ProblÚme (Streaming) : Vous faites un groupBy(window("10 minutes")).count(). Que faire si un événement de 10h02 arrive en retard (à 10h15) (à cause du réseau) ?

Solution : Watermarking (Filigrane)

Le Watermarking (withWatermark) dit Ă  Spark : "J'accepte les donnĂ©es en retard (ex: 10 minutes de retard), mais au-delĂ , ferme la fenĂȘtre (agrĂ©gation) et ignore-les."

C'est la façon dont Spark gÚre le compromis Latence vs Complétude.

df_logs = ... (avec "timestamp")

df_window = df_logs \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window("timestamp", "5 minutes") # (AgrĂšge par fenĂȘtre de 5 min)
    ) \
    .count()

RĂ©sultat : Spark attendra 10 minutes (le "watermark") avant de finaliser (Ă©crire) le compte de la fenĂȘtre "10:00-10:05".

7.1 API : PySpark (Python)

PySpark est l'API Python pour Spark. C'est le langage dominant en Data Science et Data Engineering (sur Spark).

Architecture (Py4J)

ProblÚme : Spark (Driver/Executor) tourne sur la JVM (Java/Scala). Python (CPython) tourne dans un processus séparé.

Solution : Py4J (la "glue").

[ Processus Python (PySpark) ]
  (ex: df.select("age"))
  │
  │ (Socket Py4J)
  │
  ▌
[ Processus JVM (Spark Driver) ]
  │
  │ (Catalyst Optimizer)
  │ (CrĂ©e le Plan)
  ▌
[ Executors (JVM) ]

Overhead : Les opérations DataFrame (3.1) sont rapides (le code Python est "traduit" en plan Spark). Les opérations RDD (rdd.map(lambda...)) sont lentes (Spark doit sérialiser le code Python, l'envoyer aux Executors, lancer un "Worker Python", exécuter, et resérialiser les résultats -> Coûteux).

7.2 Module : MLlib (Machine Learning)

MLlib est la bibliothÚque (intégrée) de Machine Learning distribué de Spark. (Basée sur l'API DataFrame).

Concept : Pipeline (ML)

MLlib (comme Scikit-learn) est basé sur le concept de Pipelines (chaßnes d'étapes).

(Exemple PySpark : Pipeline de Classification)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

# 1. Étapes de "Featurization" (PrĂ©-traitement)
# (Transformer "ville" (string) en "ville_idx" (nombre))
indexer = StringIndexer(inputCol="ville", outputCol="ville_idx")

# (Assembler les colonnes "features" en 1 vecteur)
assembler = VectorAssembler(inputCols=["age", "ville_idx"],
                            outputCol="features")

# 2. Étape de "Modùle" (Entraünement)
lr = LogisticRegression(featuresCol="features", labelCol="a_achete")

# 3. Définir le Pipeline
pipeline = Pipeline(stages=[indexer, assembler, lr])

# 4. EntraĂźner (Fit)
model = pipeline.fit(training_data_df)

# 5. Prédire
predictions_df = model.transform(test_data_df)
7.3 Cheat-sheet : PySpark DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, count, when, lit

# 1. Créer la Session (Driver)
spark = SparkSession.builder \
    .appName("MonApp") \
    .getOrCreate()

# 2. Lire (Source)
df = spark.read.format("parquet").load("/path/to/data")
# df = spark.read.csv("/path/to/csv", header=True, inferSchema=True)

# --- 3. TRANSFORMATIONS (Lazy) ---

# Sélectionner des colonnes
df_select = df.select("col_A", "col_B")
df_select = df.select(col("col_A"), (col("col_B") * 2).alias("col_B_double"))

# Filtrer (Where)
df_filter = df.where(col("age") > 30)
df_filter = df.filter((col("age") > 30) & (col("ville") == "Paris"))

# Ajouter/Modifier une colonne (withColumn)
df_with = df.withColumn("age_plus_10", col("age") + 10)
df_with = df.withColumn("categorie",
    when(col("age") < 18, "Mineur")
    .when(col("age") < 65, "Adulte")
    .otherwise("Senior")
)
df_with = df.withColumn("constante", lit(123)) # Ajouter une constante

# Renommer / Supprimer
df_rename = df.withColumnRenamed("nom", "nouveau_nom")
df_drop = df.drop("col_inutile_1", "col_inutile_2")

# Jointure (Join)
df_joined = df_A.join(
    df_B,
    df_A["user_id"] == df_B["user_id"], # Clé de Join
    "inner" # (Type: inner, left, right, full)
)

# Agrégation (GroupBy)
df_agg = df.groupBy("ville") \
           .agg(
                count("*").alias("nb_users"),
                avg("age").alias("age_moyen")
           )

# --- 4. ACTIONS (Eager) ---

# Afficher (Debug)
df_agg.show(10)

# Afficher le Schéma (Debug)
df_agg.printSchema()

# Compter (Action)
count = df_agg.count()

# Collecter (Action - DANGER : RAM Driver)
# (RamÚne TOUTES les données (résultat) au Driver)
results_list = df_agg.collect()

# Écrire (Sink)
df_agg.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/path/to/output")