đ Apache Spark â Moteur Big Data (RDD, DataFrame, Spark SQL)
Guide complet IDEO-Lab sur le moteur de calcul distribué (Spark Core, SQL, Streaming & MLlib).
Concept : Moteur Distribué
Calcul (In-Memory) vs MapReduce (Disque).
Spark Big Data In-MemoryArchitecture Cluster
Driver (Cerveau) vs Executor (Muscle).
Driver Executor Cluster Managervs. Databricks / EMR
Le Moteur (Spark) vs La Plateforme (Databricks).
Databricks AWS EMRAPI (Legacy) : RDD
Resilient Distributed Dataset. Bas niveau (map, filter).
API (Moderne) : DataFrame
Données structurées (Colonnes). Haut niveau (select, where).
RDD vs DataFrame
Non-Structuré (RDD) vs Structuré (DF + Catalyst).
Catalyst OptimisationModule : Spark SQL
spark.sql(). (Moteur SQL sur DataFrames).
Le Cerveau : Catalyst Optimizer
Plan Logique -> Plan Physique (Optimisation).
Catalyst OptimizerSources de Données
Parquet, Delta Lake, Avro, JSON, CSV, JDBC.
Parquet Delta LakeConcept : Lazy Evaluation
Transformations (Lazy) vs Actions (Trigger).
Lazy Evaluation Transform ActionLe DAG : Job -> Stages -> Tasks
Plan d'exĂ©cution (Job), Ătapes (Stages), UnitĂ©s (Tasks).
DAG Job StageLe PiĂšge : Le Shuffle
Mouvement réseau (groupBy, join). (Goulot d'étranglement).
CLI : spark-submit
Soumission d'applications (.py, .jar) au cluster.
Modes de Déploiement
--deploy-mode client (Dev) vs cluster (Prod).
Outil : Spark UI (Port 4040)
L'interface web (diagnostic : DAG, Stages, TĂąches).
Spark UI Port 4040Structured Streaming (Moderne)
Le "DataFrame infini" (Micro-batch).
Structured Streaming Micro-batchIntégration : Kafka
Source (readStream) & Sink (writeStream).
Streaming : State & Watermarking
Gestion (flatMapGroupsWithState) & Retard (Watermark).
API : PySpark
L'API Python (Wrapper Py4J) pour Spark.
PySpark PythonModule : MLlib
Machine Learning (Pipelines, Featurization, ModĂšles).
MLlib MLCheat-sheet PySpark
Opérations DataFrame (select, withColumn, groupBy).
Qu'est-ce qu'Apache Spark ?
Apache Spark est un moteur de calcul (compute engine) open-source, conçu pour le traitement de données distribué (Big Data).
vs. Hadoop MapReduce (Le "Pourquoi")
Spark a été créé pour remplacer Hadoop MapReduce (l'ancien standard).
ProblÚme (MapReduce) : MapReduce était basé sur le disque (Disk-based). Chaque étape (Map -> Reduce) devait écrire ses résultats sur le disque (HDFS) avant que l'étape suivante ne puisse lire (I/O Disque massif, trÚs lent).
Solution (Spark) : Spark est basé sur la mémoire (In-Memory). Il charge les données (RDD/DataFrame) en RAM sur le cluster et exécute les opérations (le DAG) en mémoire. (10x à 100x plus rapide que MapReduce).
Les 4 Piliers (Modules)
Spark n'est pas un seul outil, c'est une "stack" (pile) :
- Spark Core : Le "cĆur" (le moteur). GĂšre le scheduling (TĂąches, DAG), la mĂ©moire, et l'API RDD (2.1).
- Spark SQL : (Le plus utilisé) Le module pour les données structurées. Fournit l'API DataFrame (2.2), Spark SQL (3.1) et l'optimiseur Catalyst (3.2).
- Spark Streaming / Structured Streaming (6.1) : Le module pour le traitement en temps réel (flux).
- MLlib (7.2) : La bibliothÚque de Machine Learning distribué.
Une application Spark (ex: un spark-submit) s'exécute comme un ensemble de processus indépendants sur un cluster.
(Votre Script .py)
â
â (1. Soumission : spark-submit)
âŒ
[ CLUSTER MANAGER ] (ex: YARN, Kubernetes)
(Le "RH" : Négocie les ressources)
â
â (2. Lance le Driver)
âŒ
+---------------------+
| [ DRIVER (JVM) ] | (Le "Cerveau")
| (Votre 'main()') |
| (SparkContext) |
| (Catalyst Optimizer)|
| (Planifie le DAG) |
+---------------------+
â
â (3. NĂ©gocie les Executors)
â
ââââââââââș [ CLUSTER MANAGER ]
â
â (4. Lance les Executors)
â
âââââââââââââââââââââŽââââââââââââââââââââ
⌠âŒ
+----------------+ +----------------+
| [ EXECUTOR 1 ] | (Le "Muscle") | [ EXECUTOR 2 ] |
| (JVM/Worker) | | (JVM/Worker) |
| (Exécute Task 1) | | (Exécute Task 2) |
| (Cache Data A) | | (Cache Data B) |
+----------------+ +----------------+
- Cluster Manager (Gestionnaire) : (ex: YARN, Kubernetes). Le "gestionnaire de ressources". Spark lui "demande" des conteneurs (CPU/RAM).
- Driver (Pilote) : (1 par App) Le "cerveau". C'est le processus
main()de votre application. Il planifie le travail (le DAG) et coordonne les Executors. - Executor (Exécuteur) : (N par App) Les "muscles" (workers). Lancés par le Cluster Manager. Ils exécutent les Tùches (Tasks) (le calcul) et stockent (cache) les partitions de données en RAM.
| CritĂšre | Apache Spark (Open Source) | Databricks (SaaS) | AWS EMR (PaaS) |
|---|---|---|---|
| Description | Le moteur (logiciel). (DIY). | La plateforme SaaS (Lakehouse) (par les créateurs de Spark). | La plateforme PaaS (AWS). |
| Gestion | Manuelle (Installer Spark, Hadoop, gérer les clusters). | EntiÚrement managée (Zéro admin, UI, Notebooks). | Managée (Provisionne un cluster Hadoop/Spark "vanilla"). |
| Performance | Standard (JVM). | Optimisée (Moteur Photon C++, Delta Lake, Caching). | Standard (JVM). |
| Interface | CLI (spark-submit), Shell. | Notebooks Web, Databricks SQL, Jobs UI. | CLI (AWS), Console EMR, Notebooks (EMR Studio). |
| Coût | Gratuit (Coût des VMs). | Cher (Coût des VMs + Coût DBU (Databricks Unit)). | Moyen (Coût EC2 + Marge EMR). |
Le RDD est l'API "bas niveau" originale de Spark. C'est une collection d'objets (immuable, distribuée) sur laquelle on peut opérer en parallÚle.
Resilient : Si une partition (Executor) est perdue, Spark peut la recalculer (grĂące au DAG).
Distribué : Le RDD est "coupé" en Partitions, stockées sur les Executors.
Dataset : Une collection de données (ex: lignes d'un log, objets Python).
RDD (Non-Structuré)
Le RDD est "agnostique" (il ne connaßt pas le schéma). C'est une "boßte noire" d'objets (ex: RDD[String], RDD[MaClassePerso]).
Exemple (PySpark)
# 1. Créer un RDD (depuis un fichier)
lines_rdd = sc.textFile("s3://.../logs.txt")
# (RDD de Strings (lignes))
# 2. Transformation (map) - Lazy
errors_rdd = lines_rdd.filter(lambda line: "ERROR" in line)
# 3. Transformation (map) - Lazy
msgs_rdd = errors_rdd.map(lambda line: line.split(" ")[-1])
# 4. Action (Trigger) - Exécute le Job
results = msgs_rdd.collect()
Le DataFrame (DF) est l'API "haut niveau" moderne (introduite dans Spark SQL). C'est le standard de facto aujourd'hui.
DataFrame (Structuré)
Un DataFrame est un RDD (distribué) + un Schéma (Schema).
C'est une table (structurée) avec des colonnes nommées (similaire à un DataFrame Pandas ou une table SQL).
Exemple (PySpark)
# 1. Créer un DataFrame (lit le Parquet/Delta)
# (Le schéma est lu automatiquement)
df = spark.read.format("parquet").load("s3://.../users.parquet")
# df.printSchema()
# |-- user_id: long
# |-- age: integer
# |-- city: string
# 2. Transformation (select) - Lazy
# (On manipule des *colonnes*, pas des "lignes")
df_filtered = df.select("user_id", "age") \
.where(df["age"] > 30)
# 3. Action (Trigger)
df_filtered.show()
Dataset (Statique - Scala/Java)
Un Dataset (Scala/Java) est un DataFrame "typé statiquement". (Ex: Dataset[Person]). C'est le "meilleur des deux mondes" (Structuré (DF) + Typé (RDD)). (L'API Python (PySpark) n'a pas de Datasets, elle n'a que des DataFrames).
RÚgle : Toujours utiliser les DataFrames (Spark SQL), sauf cas trÚs spécifique (données binaires non-structurées).
Le RĂŽle de Catalyst (L'Optimiseur)
La raison principale est la performance, grĂące Ă l'Optimiseur Catalyst (3.2).
RDD (BoĂźte Noire)
Quand vous utilisez un RDD (ex: .filter(lambda x: ...)), Spark ne "voit" que du code (Python/Scala) opaque.
Il ne peut pas l'optimiser. Il doit sérialiser le code (le lambda) et l'envoyer à l'Executor tel quel.
(Inefficace)
DataFrame (BoĂźte Transparente)
Quand vous utilisez un DataFrame (ex: .where(col("age") > 30)), vous ne donnez pas de code, vous décrivez (déclaratif) une intention.
Catalyst (Optimizer) intercepte cette intention ("WHERE age > 30").
Optimisation (Pushdown) : Catalyst va réécrire votre requĂȘte. (Ex: Si la source est Parquet, il va "pousser" (pushdown) le filtre age > 30 directement au lecteur Parquet, qui ne lira mĂȘme pas les autres lignes (Pruning)).
(TrĂšs efficace)
Spark SQL est le module (et l'API) qui permet d'exĂ©cuter des requĂȘtes SQL (Standard) sur des DataFrames (ou des sources de donnĂ©es).
Usage (PySpark)
Les APIs DataFrame et SQL sont unifiĂ©es (elles partagent le mĂȘme moteur Catalyst).
(Session Spark 'spark' est créée)
# 1. Charger un DataFrame
df = spark.read.parquet("s3://.../users.parquet")
# 2. Enregistrer le DF comme une "Table" (temporaire, en mémoire)
df.createOrReplaceTempView("users_view")
# 3. (Magie) RequĂȘter la "Vue" en SQL
# (Ceci est exécuté par le moteur Spark, pas par une BDD SQL)
df_results = spark.sql("""
SELECT
city,
AVG(age) as avg_age
FROM
users_view
WHERE
age > 20
GROUP BY
city
ORDER BY
avg_age DESC
""")
df_results.show()
Usage : Permet aux Data Analysts (qui connaissent SQL) de faire du Big Data, sans apprendre PySpark/Scala.
Catalyst est le moteur d'optimisation (query optimizer) de Spark SQL. C'est le "cerveau" qui rend les DataFrames (2.2) et Spark SQL (3.1) rapides.
Flux (Simplifié)
Quand vous exécutez df.show() ou spark.sql() :
- Code (DF/SQL) : (ex:
df.select(...).filter(...)). - Catalyst (Analyse) : Convertit le code en Plan Logique (Unresolved).
- Catalyst (Analyse) : Vérifie le "Catalogue" (Métadonnées) -> Plan Logique (Analyzed).
- Catalyst (Optimisation Logique) : Applique des rÚgles (ex: Predicate Pushdown (Pousser les filtres), Constant Folding). -> Plan Logique (Optimisé).
- Catalyst (Optimisation Physique) : GénÚre plusieurs Plans Physiques (ex: "Hash Join" vs "Broadcast Join") et choisit le meilleur (basé sur les coûts).
- GĂ©nĂ©ration de Code (Codegen) : (Ătape finale) GĂ©nĂšre du bytecode Java (trĂšs rapide) pour ce plan.
- Exécution : (Envoie le code (Tùches) aux Executors).
Spark (via spark.read et df.write) peut se connecter à (presque) n'importe quelle source de données.
spark.read.format(...).load(...)
| Format | Description | Notes |
|---|---|---|
parquet | (Recommandé) Format Colonnaire (rapide, compressé, schéma). | Défaut. Idéal pour Data Lakes. |
delta | (Recommandé) (Delta Lake) "Parquet" + ACID (Logs). | Requis pour MERGE/UPDATE. (Utilisé par Databricks/Synapse). |
csv | Texte (lent, parsing). | Nécessite .option("header", "true"), .option("inferSchema", "true"). |
json | Texte (lent, parsing). | GĂšre le JSON multi-lignes. |
orc | (Concurrent de Parquet) Colonnaire. | (Utilisé par Hive). |
jdbc | Base de donnĂ©es (SQL). | .option("url", ...), .option("dbtable", ...). (Lent, pousse les requĂȘtes (pushdown)). |
C'est le concept fondamental de l'exécution Spark.
Spark divise les opérations en deux types : Transformations (Lazy) et Actions (Eager).
1. Transformations (Lazy / Paresseuses)
Une "Transformation" (ex: select, filter, map, groupBy) ne fait rien (n'exécute aucun calcul) quand vous l'appelez.
Elle ajoute simplement une étape au Plan d'Exécution (DAG) que Spark (Catalyst) construit en mémoire.
2. Actions (Eager / Actives)
Une "Action" (ex: show, count, collect, write) déclenche (triggers) l'exécution du DAG.
C'est seulement quand vous appelez une Action que Spark :
- (Analyse le DAG complet)
- (Optimise le DAG via Catalyst)
- (GénÚre un "Job" et l'envoie aux Executors)
# (1. Transformation) -> Lazy. (0 sec)
# (Spark construit le DAG : "Etape 1: Lire")
df = spark.read.parquet("s3://...")
# (2. Transformation) -> Lazy. (0 sec)
# (Spark ajoute au DAG : "Etape 2: Filtrer")
df_filtered = df.where(col("age") > 30)
# (3. Action) -> Eager. (Déclenche le Job)
# (Maintenant, Spark exécute (1) et (2))
df_filtered.show()
Lorsqu'une "Action" (4.1) est appelée, le Driver (Catalyst) convertit le "Plan Logique" en DAG (Directed Acyclic Graph) d'exécution physique.
[ JOB 1 (ex: .show()) ]
â
âââș [ STAGE 1 (Narrow) ] (ex: read + filter)
â ââ Task 1 (Partition 1)
â ââ Task 2 (Partition 2)
â ââ ... (Task N)
â
â (--- SHUFFLE (4.3) ---)
â
âââș [ STAGE 2 (Wide) ] (ex: groupBy)
ââ Task 1 (Reducer 1)
ââ Task 2 (Reducer 2)
- Job : (1 Job = 1 Action). (ex:
df.write(...)). - Stage (Ătape) : Un "Stage" est un ensemble de TĂąches qui peuvent ĂȘtre exĂ©cutĂ©es en parallĂšle (Narrow Transformation) sans Shuffle (4.3). Un
Shuffle(ex:groupBy) coupe le DAG et crée un nouveau Stage. - Task (Tùche) : (L'unité de travail) 1 Tùche = 1 "Partition" de données, exécutée sur 1 "Core" (CPU) d'un "Executor".
Le Shuffle (Mélange) est l'opération (physique) de redistribution des données sur le réseau (entre les Executors). C'est l'opération la plus coûteuse (lente) dans Spark.
Transformations "Narrow" vs "Wide"
- Narrow (Ătroit) : (Rapide) Pas de Shuffle. Les donnĂ©es restent sur leur partition. (Ex:
map,filter,select). - Wide (Large) : (Lent) NĂ©cessite un Shuffle. Les donnĂ©es (clĂ©s) doivent ĂȘtre dĂ©placĂ©es entre les Executors. (Ex:
groupByKey,reduceByKey,join(sauf Broadcast),repartition).
Exemple (groupByKey)
Pour compter les "Clés" (A, B, C), qui sont réparties sur 4 Executors :
- (Stage 1) Executor 1 (A, B), Executor 2 (A, C), Executor 3 (B, C)...
- (Shuffle - I/O Disque/Réseau) Executor 1 doit envoyer ses "B" et "C" aux autres. Executor 2 doit envoyer... (etc.).
- (Stage 2) (AprĂšs Shuffle) Executor 1 a (A, A), Executor 2 a (B, B), Executor 3 a (C, C).
- (Stage 2) Les Executors comptent (réduisent) leurs clés locales.
Optimisation : Toujours préférer reduceByKey (qui pré-agrÚge sur le "Mapper" (Stage 1)) à groupByKey (qui envoie tout (brut) au Shuffle).
spark-submitspark-submit est l'outil CLI (standard) pour soumettre une application Spark (ex: un .py ou .jar) Ă un Cluster Manager (1.2).
Syntaxe (Exemple PySpark sur YARN)
/usr/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--name "Mon_App_Prod" \
--driver-memory 4g \
--num-executors 50 \
--executor-memory 8g \
--executor-cores 4 \
--py-files libs.zip \
mon_script.py \
arg1 \
arg2
| Option | Description |
|---|---|
--master [url] | Le Cluster Manager (ex: yarn, k8s://..., local[*] (Test)). |
--deploy-mode [mode] | OĂč exĂ©cuter le Driver : client (local) ou cluster (Prod). (Voir 5.2). |
--name [nom] | Nom de l'application (visible dans l'UI YARN/Spark). |
--driver-memory [Go] | RAM pour le "Cerveau" (Driver). |
--num-executors [N] | Nombre de "Muscles" (Workers) Ă demander. |
--executor-memory [Go] | RAM pour 1 Muscle (Worker). |
--executor-cores [N] | Nb de vCPUs (TĂąches parallĂšles) pour 1 Muscle. |
--py-files [zip/egg] | (PySpark) Librairies Python Ă envoyer aux Executors. |
mon_script.py | Le script (Point d'entrée). |
arg1 arg2 | Arguments passés au script main(). |
client vs clusterL'option --deploy-mode (dans spark-submit) est cruciale. Elle dĂ©finit oĂč (physiquement) le processus Driver (1.2) va s'exĂ©cuter.
Mode client (Défaut)
(Développement / Interactif)
Le Driver (Cerveau) s'exĂ©cute sur la machine oĂč vous avez lancĂ© spark-submit (ex: votre laptop, ou le "Edge Node" du cluster).
Les Executors (Muscles) tournent sur le cluster (YARN/K8s).
- Avantage : Interactif. Le shell
pyspark(REPL) est en mode "client". - Inconvénient (Prod) : Si vous fermez votre laptop (ou tuez le terminal SSH), vous tuez le Driver (le Cerveau). Le Job (Application) crash.
- Inconvénient (Réseau) : Le Driver (Cerveau) doit communiquer (beaucoup) avec les Executors (Muscles) (WAN). (Latence).
Mode cluster (Recommandé)
(Production / Batch)
Le Driver (Cerveau) est envoyĂ© (soumis) au Cluster Manager (YARN), qui le lance sur un nĆud (VM) quelconque du cluster (gĂ©nĂ©ralement le "Master Node" YARN).
Les Executors (Muscles) tournent aussi sur le cluster.
(Laptop) -> spark-submit (Job 1) (Laptop) -> (Le terminal se libĂšre) (Sur le Cluster YARN) -> [Node 1 (Master)] -> (Driver (Job 1)) -> [Node 2] -> (Executor 1) -> [Node 3] -> (Executor 2)
- Avantage (Prod) : Résilience. Vous pouvez fermer votre laptop. Le Job (Driver) continue de tourner sur le cluster.
- Avantage (RĂ©seau) : Le Driver et les Executors sont "proches" (mĂȘme Data Center/VPC). (Faible latence).
La Spark UI est l'interface web (GUI) de diagnostic de Spark. C'est l'outil n°1 pour déboguer la performance.
Elle est (par défaut) hébergée sur le Driver (1.2), sur le Port 4040 (http://[IP_Driver]:4040).
Onglets Clés (Diagnostic)
| Onglet | Description |
|---|---|
| Jobs | Liste des "Actions" (4.1) (ex: show, write). Permet de voir le DAG (4.2). |
| Stages | (Le plus important) Liste les "Stages" (étapes) du Job. Permet d'identifier le Shuffle (4.3) (les "Shuffle Read/Write"). |
| Tasks | Liste les Tùches (unités) individuelles (ex: 500 Tùches "failed"). |
| Storage | Montre quels RDDs/DataFrames sont en Cache (Mémoire). |
| Executors | Liste les "Muscles" (Workers) (IP, RAM/Disque utilisé, Nb Tùches). |
| SQL | Affiche le Plan d'Exécution (Optimisé) de Catalyst (3.2). (Permet de voir si le "Predicate Pushdown" a fonctionné). |
DStreams (Legacy) : L'ancien "Spark Streaming" (avant 2.x) était basé sur des RDDs (DStreams). (ObsolÚte).
Structured Streaming (Moderne)
Le Structured Streaming (Spark 2.x+) est le moteur de streaming moderne. Il est basé sur l'API DataFrame (2.2).
Le "DataFrame Infini"
Concept : Vous traitez un flux (Stream) de données (ex: Kafka) comme s'il s'agissait d'une table (DataFrame) statique qui "grandit" (append-only).
Vous Ă©crivez une requĂȘte (Batch) une seule fois (ex: df.groupBy("type").count()). Spark (Moteur) se charge de l'exĂ©cuter en continu (micro-batch).
Exemple (PySpark)
# 1. Lecture (Source : Socket)
# (Crée un DF 'lines' qui représente le flux)
lines_df = spark.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# 2. Transformation (DataFrame standard)
words_df = lines_df.select(
explode(split(lines_df.value, " ")).alias("word")
)
word_counts_df = words_df.groupBy("word").count()
# 3. Ăcriture (Sink : Console)
# (Action 'writeStream' qui déclenche le Job)
query = word_counts_df.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
Kafka (le Bus de Messages) est la Source (Input) et le Sink (Output) le plus courant pour Structured Streaming.
Source (readStream)
Lire (consommer) un topic Kafka.
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,...") \
.option("subscribe", "mon_topic") \
.load()
# (Le DF Kafka a des colonnes 'key', 'value' (binaire),
# 'topic', 'partition', 'timestamp')
df_string = df.selectExpr("CAST(value AS STRING)")
Sink (writeStream)
Ăcrire (produire) un DataFrame (rĂ©sultat) dans un topic Kafka.
# (df_results est le résultat du 'groupBy')
# (Doit avoir une colonne 'value', ou 'key'/'value')
query = df_results.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,...") \
.option("topic", "topic_resultats") \
.option("checkpointLocation", "/path/to/checkpoints") \
.start()
checkpointLocation (Crucial) : L'"état" (checkpoint) du streaming. C'est là que Spark écrit les "offsets" (positions) qu'il a lus (Garantie "Exactly-Once").
ProblÚme (Streaming) : Vous faites un groupBy(window("10 minutes")).count(). Que faire si un événement de 10h02 arrive en retard (à 10h15) (à cause du réseau) ?
Solution : Watermarking (Filigrane)
Le Watermarking (withWatermark) dit Ă Spark : "J'accepte les donnĂ©es en retard (ex: 10 minutes de retard), mais au-delĂ , ferme la fenĂȘtre (agrĂ©gation) et ignore-les."
C'est la façon dont Spark gÚre le compromis Latence vs Complétude.
df_logs = ... (avec "timestamp")
df_window = df_logs \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window("timestamp", "5 minutes") # (AgrĂšge par fenĂȘtre de 5 min)
) \
.count()
RĂ©sultat : Spark attendra 10 minutes (le "watermark") avant de finaliser (Ă©crire) le compte de la fenĂȘtre "10:00-10:05".
PySpark est l'API Python pour Spark. C'est le langage dominant en Data Science et Data Engineering (sur Spark).
Architecture (Py4J)
ProblÚme : Spark (Driver/Executor) tourne sur la JVM (Java/Scala). Python (CPython) tourne dans un processus séparé.
Solution : Py4J (la "glue").
[ Processus Python (PySpark) ]
(ex: df.select("age"))
â
â (Socket Py4J)
â
âŒ
[ Processus JVM (Spark Driver) ]
â
â (Catalyst Optimizer)
â (CrĂ©e le Plan)
âŒ
[ Executors (JVM) ]
Overhead : Les opérations DataFrame (3.1) sont rapides (le code Python est "traduit" en plan Spark). Les opérations RDD (rdd.map(lambda...)) sont lentes (Spark doit sérialiser le code Python, l'envoyer aux Executors, lancer un "Worker Python", exécuter, et resérialiser les résultats -> Coûteux).
MLlib est la bibliothÚque (intégrée) de Machine Learning distribué de Spark. (Basée sur l'API DataFrame).
Concept : Pipeline (ML)
MLlib (comme Scikit-learn) est basé sur le concept de Pipelines (chaßnes d'étapes).
(Exemple PySpark : Pipeline de Classification)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
# 1. Ătapes de "Featurization" (PrĂ©-traitement)
# (Transformer "ville" (string) en "ville_idx" (nombre))
indexer = StringIndexer(inputCol="ville", outputCol="ville_idx")
# (Assembler les colonnes "features" en 1 vecteur)
assembler = VectorAssembler(inputCols=["age", "ville_idx"],
outputCol="features")
# 2. Ătape de "ModĂšle" (EntraĂźnement)
lr = LogisticRegression(featuresCol="features", labelCol="a_achete")
# 3. Définir le Pipeline
pipeline = Pipeline(stages=[indexer, assembler, lr])
# 4. EntraĂźner (Fit)
model = pipeline.fit(training_data_df)
# 5. Prédire
predictions_df = model.transform(test_data_df)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, count, when, lit
# 1. Créer la Session (Driver)
spark = SparkSession.builder \
.appName("MonApp") \
.getOrCreate()
# 2. Lire (Source)
df = spark.read.format("parquet").load("/path/to/data")
# df = spark.read.csv("/path/to/csv", header=True, inferSchema=True)
# --- 3. TRANSFORMATIONS (Lazy) ---
# Sélectionner des colonnes
df_select = df.select("col_A", "col_B")
df_select = df.select(col("col_A"), (col("col_B") * 2).alias("col_B_double"))
# Filtrer (Where)
df_filter = df.where(col("age") > 30)
df_filter = df.filter((col("age") > 30) & (col("ville") == "Paris"))
# Ajouter/Modifier une colonne (withColumn)
df_with = df.withColumn("age_plus_10", col("age") + 10)
df_with = df.withColumn("categorie",
when(col("age") < 18, "Mineur")
.when(col("age") < 65, "Adulte")
.otherwise("Senior")
)
df_with = df.withColumn("constante", lit(123)) # Ajouter une constante
# Renommer / Supprimer
df_rename = df.withColumnRenamed("nom", "nouveau_nom")
df_drop = df.drop("col_inutile_1", "col_inutile_2")
# Jointure (Join)
df_joined = df_A.join(
df_B,
df_A["user_id"] == df_B["user_id"], # Clé de Join
"inner" # (Type: inner, left, right, full)
)
# Agrégation (GroupBy)
df_agg = df.groupBy("ville") \
.agg(
count("*").alias("nb_users"),
avg("age").alias("age_moyen")
)
# --- 4. ACTIONS (Eager) ---
# Afficher (Debug)
df_agg.show(10)
# Afficher le Schéma (Debug)
df_agg.printSchema()
# Compter (Action)
count = df_agg.count()
# Collecter (Action - DANGER : RAM Driver)
# (RamÚne TOUTES les données (résultat) au Driver)
results_list = df_agg.collect()
# Ăcrire (Sink)
df_agg.write \
.format("delta") \
.mode("overwrite") \
.save("/path/to/output")
