⚡ Apache Spark – Le Moteur du Big Data
Guide complet IDEO-Lab : PySpark, DataFrames, Lazy Evaluation, SQL, Streaming & MLlib.
Vue d'ensemble
Calcul distribué (cluster), In-Memory (rapide), API unifiée.
Big Data DistribuéPourquoi Spark ?
Successeur de Hadoop MapReduce (100x+ rapide). API (SQL, ML).
In-Memory MapReduceInstallation (PySpark)
pip install pyspark (pour le dev local). findspark.
Architecture Cluster
Driver (le "cerveau") vs Executors (les "bras", workers).
Driver ExecutorPoint d'entrée : SparkSession
SparkSession.builder.getOrCreate(). Le spark.
Concept N°1 : DataFrame
Abstraction principale (Table distribuée). Optimisée (Catalyst).
DataFrame RDD (Legacy)Concept N°2 : Lazy Evaluation
Spark attend. Les Transformations construisent un "plan" (DAG).
Lazy DAGTransformations vs Actions
Transformations (ex: .filter) (lazy) vs Actions (ex: .show) (déclenche).
I/O (Lecture & Écriture)
spark.read.parquet(), .csv(). df.write.parquet().
Schéma (StructType)
Définir la structure (StructField). (Évite l'inférence lente).
DataFrame API (Ops)
.select(), .filter(), .withColumn(). (Pandas-like).
Agrégation (.groupBy)
.groupBy('col').agg(F.sum('...')). (Wide transform / Shuffle).
Module : Spark SQL
spark.sql("SELECT ... FROM ..."). API SQL sur DataFrames.
Module : Structured Streaming
DataFrame "infini" (micro-batch). readStream, writeStream.
Module : MLlib (ML)
ML distribué (VectorAssembler, Pipeline, KMeans).
🐼 PySpark vs ⚡ Pandas
Distribué/Cluster (Lazy) vs Nœud unique/RAM (Eager).
Distribué Nœud uniqueDéploiement (Cluster)
Standalone (Dev), YARN (Hadoop), Kubernetes (Modern).
YARN KubernetesVitrine (Qui l'utilise ?)
Databricks, Netflix, Uber, Spotify, (toute la Tech).
Databricks NetflixLiens Utiles & Formation
spark.apache.org (Docs), Databricks (Blog/Tutoriels).
Cheat-sheet
Workflow (Session, Read, Transform, Action).
cheat WorkflowQu'est-ce qu'Apache Spark ?
Apache Spark est un **moteur de calcul distribué (cluster)** open-source, conçu pour le traitement de **Big Data** (Téraoctets/Pétaoctets).
Sa force principale est le **calcul In-Memory** (en RAM), ce qui le rend jusqu'à 100x plus rapide que son prédécesseur, **Hadoop MapReduce** (qui écrivait sur disque).
Ce n'est pas une base de données, c'est un moteur de **traitement** (ETL, SQL, ML, Streaming) qui lit depuis des SGBD (SQL), Data Lakes (S3, HDFS), ou des Queues (Kafka).
L'API Unifiée
Spark propose une API unifiée (principalement en Scala, Python/PySpark, SQL) pour :
- Spark SQL : Traitement de données structurées (DataFrames).
- Spark Streaming : Traitement en temps réel (Structured Streaming).
- MLlib : Machine Learning distribué.
- GraphX : Traitement de graphes (Legacy).
Spark a été créé pour résoudre les limitations de Hadoop MapReduce (MR).
| Critère | Hadoop MapReduce (Legacy) | Apache Spark (Moderne) |
|---|---|---|
| Traitement | Batch (Lots). | Batch, Streaming, SQL, ML (Unifié). |
| Stockage (Calcul) | Disque (HDFS). (Très lent, I/O disque à chaque étape). | En mémoire (In-Memory/RAM). (100x+ rapide). |
| API | Verbeux (Java). Map(), Reduce(). | Simple (Python, SQL, Scala). API haut niveau (DataFrame). |
Pour le développement, on n'a pas besoin d'un cluster. pip install pyspark lance Spark en **"Local Mode"** (le Driver et l'Executor sont sur votre machine).
# 1. (Recommandé) Créer un environnement virtuel python -m venv spark_env source spark_env/bin/activate # 2. Installer PySpark (API Python pour Spark) pip install pyspark # 3. (Optionnel: pour Jupyter) Trouver Spark pip install findspark
Lancement (Script ou Notebook)
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
# (Le point d'entrée, voir 2.1)
spark = SparkSession.builder.appName("MonAppLocal").getOrCreate()
print(spark.version)
Spark est un système **Driver/Executors** (Maître/Esclaves).
- Driver (Pilote) : Le "cerveau" (là où
SparkSessiontourne). Il planifie le travail (le DAG). - Cluster Manager : Le "RH" (YARN, Kubernetes). Négocie les ressources (machines).
- Executors (Exécuteurs) : Les "bras" (Workers). Exécutent les "Tasks" (calculs) sur les partitions de données.
+-------------------------+
| Votre App (PySpark) |
| (Contient le "Driver") |
| (Planifie le DAG) |
+-------------------------+
| (1. Demande des ressources)
▼
+-------------------------+
| Cluster Manager |
| (YARN / Kubernetes) |
+-------------------------+
| (2. Alloue les Executors)
▼
+-------------------------+ +-------------------------+
| Executor 1 (Worker) | | Executor 2 (Worker) |
| (Cache) (Task 1, Task 3)| | (Cache) (Task 2, Task 4)|
+-------------------------+ +-------------------------+
| (3. Lit/Écrit) | (3. Lit/Écrit)
▼ ▼
+----------------------------------------+
| Stockage Distribué (S3, HDFS, Parquet) |
+----------------------------------------+
Depuis Spark 2.0, le SparkSession (conventionnellement appelé spark) est le point d'entrée unique pour toutes les fonctionnalités de Spark (SQL, DataFrame, Streaming).
C'est lui qui configure et lance le "Driver".
from pyspark.sql import SparkSession
# 1. Créer (ou récupérer) la session
spark = SparkSession.builder \
.appName("MonAnalyseDeVentes") \
.master("local[4]") \ // ("local[4]" = 4 cœurs en mode local)
.config("spark.some.config.option", "valeur") \
.getOrCreate()
# 2. Utiliser 'spark' pour lire/écrire/SQL
df = spark.read.csv("data.csv")
spark.sql(...)
# 3. Accéder au SparkContext (API bas niveau)
sc = spark.sparkContext
Spark a deux API de données :
| API | Description | Usage |
|---|---|---|
| RDD (Resilient Distributed Dataset) | (Legacy) API bas niveau. Non-structurée, pas d'optimisation (boîte noire). | Éviter. (Nécessaire pour du code très bas niveau). |
| DataFrame (Dataset en Scala/Java) | (Moderne) API haut niveau (comme Pandas/SQL). Structurée (Schéma). | Toujours utiliser. Permet l'optimisation (Catalyst Optimizer). |
Le Catalyst Optimizer (moteur d'optimisation de Spark) ne comprend que les DataFrames. Il réorganise les opérations (ex: "pousser" un filter avant un join) pour la performance.
C'est le concept le plus important. Spark est **LAZY (Paresseux)**.
Quand vous appliquez une Transformation (ex: .filter()), Spark n'exécute **rien**. Il construit un "plan d'exécution", un **DAG (Directed Acyclic Graph)**.
Le calcul (le "Job") n'est déclenché que lorsqu'une Action (ex: .show(), .count()) est appelée.
Diagramme (Lazy Evaluation)
// (1. Transformation 1: Build DAG)
df = spark.read.parquet("ventes.parquet")
// (Plan: Lire 'ventes.parquet')
// (2. Transformation 2: Update DAG)
df_fr = df.filter(df.pays == "FR")
// (Plan: Lire -> Filtre)
// (3. Transformation 3: Update DAG)
df_agg = df_fr.groupBy("produit").sum("prix")
// (Plan: Lire -> Filtre -> GroupBy)
// (RIEN NE S'EST ENCORE PASSÉ)
// (4. ACTION: Déclenche l'exécution du DAG)
df_agg.show()
// (Spark lance le Job, les Executors tournent)
Transformations (Lazy)
Construisent le DAG. Créent un nouveau DataFrame.
Narrow (Rapide) : Pas de "Shuffle". (.select, .filter, .withColumn). Les données restent sur leur partition.
Wide (Lent) : Nécessite un "Shuffle" (déplacement des données entre Executors). (.groupBy, .join, .orderBy).
Actions (Eager)
Déclenchent le Job (exécution du DAG).
.show(n)(Affiche n lignes).count()(Compte les lignes).collect()(Ramène **TOUT** au Driver. DANGEREUX !).take(n)(Ramène n lignes au Driver).write.parquet(...)(Sauvegarde les données)
L'API spark.read et df.write.
Parquet est le format N°1 du Big Data (colonnaire, compressé, optimisé pour Spark).
Lecture (spark.read)
// 1. Parquet (Format préféré)
df = spark.read.parquet("s3a://mon-bucket/data/ventes/")
// 2. CSV (Moins performant)
df = spark.read.csv(
"data.csv",
header=True,
inferSchema=True // (Lent, évitez en prod, voir 3.2)
)
// 3. (Générique)
df = spark.read.format("jdbc").option(...).load()
Écriture (df.write)
// 1. Parquet (avec partitionnement)
df.write \
.mode("overwrite") \ // ('overwrite', 'append', 'error')
.partitionBy("annee", "mois") \ // (Crée des dossiers ex: /annee=2024/)
.parquet("s3a://mon-bucket/output/ventes_clean/")
// 2. CSV
df.write.mode("append").csv("output.csv")
StructType)Laisser Spark "deviner" le schéma (inferSchema=True) est lent (Spark doit lire le fichier 2 fois). En production, on définit **toujours** le schéma.
Exemple (StructType)
from pyspark.sql.types import (
StructType, StructField, StringType, IntegerType, DoubleType
)
# 1. Définir le schéma
schema = StructType([
StructField("id_client", IntegerType(), nullable=False),
StructField("nom", StringType(), nullable=True),
StructField("prix_total", DoubleType(), nullable=True)
])
# 2. Utiliser le schéma à la lecture
df = spark.read.csv(
"data.csv",
header=True,
schema=schema // (Rapide, pas d'inférence)
)
# 3. Afficher le schéma
df.printSchema()
L'API DataFrame ressemble beaucoup à Pandas, mais les opérations sont **Lazy**.
from pyspark.sql.functions import F
# (df = DataFrame chargé)
# 1. Sélection (Select)
# (Retourne un nouveau DF)
df_select = df.select("col1", "col2")
# (Syntaxe 'dot')
df_select_dot = df.select(df.col1, df.col2)
# 2. Filtre (Filter / Where)
df_filter = df.filter(df.age > 30)
df_filter_multi = df.filter(
(df.age > 30) & (df.ville == "Paris")
)
# 3. Ajouter/Modifier Colonne (withColumn)
df_new_col = df.withColumn(
"age_plus_un", // (Nom nouvelle colonne)
df.age + 1 // (Logique)
)
# (Utiliser F.* pour les fonctions SQL)
df_new_col_F = df.withColumn(
"nom_upper",
F.upper(df.nom)
)
# 4. Renommer
df_renamed = df.withColumnRenamed("nom", "nom_client")
# 5. Supprimer
df_dropped = df.drop("col_inutile")
.groupBy)Similaire à Pandas groupby (Split-Apply-Combine). C'est une **Wide Transformation** (implique un "Shuffle" coûteux des données sur le cluster).
On utilise pyspark.sql.functions (alias F) pour les fonctions d'agrégation.
from pyspark.sql.functions import F
// 1. Syntaxe simple
df_agg_simple = df.groupBy("ville").mean("salaire")
// (Renomme 'avg(salaire)')
// 2. Syntaxe .agg() (Préférée, multi-aggs)
df_agg_multi = df.groupBy("ville").agg(
F.mean("salaire").alias("salaire_moyen"),
F.sum("ventes").alias("ventes_totales"),
F.count("id_client").alias("nb_clients")
)
// 3. GroupBy Multi-colonnes
df_agg_multi_cols = df.groupBy("ville", "departement").agg(
F.max("age").alias("age_max")
)
// (Ceci est LAZY. Lancer .show() pour exécuter)
df_agg_multi_cols.show()
Spark SQL permet d'exécuter des requêtes SQL (ANSI-SQL) directement sur les DataFrames. C'est souvent plus simple que l'API Python pour des JOIN ou GROUP BY complexes.
# (df = DataFrame chargé)
# 1. Enregistrer le DataFrame comme "Vue" temporaire
df.createOrReplaceTempView("ventes")
df_clients.createOrReplaceTempView("clients")
# 2. Exécuter du SQL (via spark.sql)
# (Ceci est une Transformation LAZY)
df_sql = spark.sql("""
SELECT
c.nom,
SUM(v.prix) as total_depense
FROM ventes v
JOIN clients c ON v.client_id = c.client_id
WHERE v.pays = 'FR'
GROUP BY c.nom
ORDER BY total_depense DESC
""")
# 3. Déclenche le calcul
df_sql.show()
Structured Streaming est l'API haut niveau (basée sur les DataFrames) pour le streaming (ex: lire de Kafka, fichiers S3 entrants...).
Spark traite le flux (stream) comme un "DataFrame infini" (unbounded table). Il exécute des "micro-batchs" (ex: toutes les 30s) sur les nouvelles données.
# 1. Lecture (readStream) (ex: depuis Kafka)
df_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host:port") \
.option("subscribe", "mon_topic") \
.load()
# 2. Transformations (identiques à l'API DF/SQL)
df_processed = df_stream.selectExpr("CAST(value AS STRING)") \
.filter(...)
# 3. Écriture (writeStream) (Déclenche)
query = df_processed.writeStream \
.outputMode("append") \ // ('append', 'complete', 'update')
.format("parquet") \ // (Sink: Parquet, Kafka, Console)
.option("path", "/output/data") \
.option("checkpointLocation", "/checkpoints") \ // (Crucial pour la tolérance)
.trigger(processingTime='1 minute') \
.start()
query.awaitTermination()
MLlib est la bibliothèque ML de Spark (similaire à Scikit-learn, mais distribuée).
VectorAssembler : Étape N°1 obligatoire. MLlib requiert que toutes les features (X) soient dans une seule colonne "vecteur".
Pipeline : Comme sklearn, on utilise des Pipelines pour chaîner (Scaler, OHE, VectorAssembler, Modèle).
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
# 1. Assembler les features en 1 vecteur
assembler = VectorAssembler(
inputCols=["age", "salaire", "ohe_ville"],
outputCol="features"
)
# 2. Scaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
# 3. Modèle
lr = LogisticRegression(featuresCol="scaledFeatures", labelCol="target")
# 4. Pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])
# 5. Entraîner
model = pipeline.fit(train_data)
# 6. Prédire
predictions = model.transform(test_data)
La syntaxe est similaire, mais la philosophie est radicalement différente.
| Critère | Pandas | PySpark |
|---|---|---|
| Scope | Nœud unique (Single-Node) | Distribué (Cluster) |
| Taille Données | Limité par la RAM de la machine (Small Data). | Quasiment illimité (stockage cluster) (Big Data). |
| Exécution | Eager (Avide). (df['a'] + 1 s'exécute instantanément). | Lazy (Paresseux). (Construit un DAG, attend une Action). |
| Mutation | Mutable (ex: df['a'] = ... modifie df). | Immuable (.withColumn() crée un *nouveau* DataFrame). |
| API | Très riche (plotting, 1000s fonctions). | Plus limitée (focus sur ETL/ML distribué). |
Pandas API on Spark (pyspark.pandas) : Un effort pour implémenter l'API Pandas "Eager" au-dessus du moteur Spark "Lazy". Utile pour la transition.
Le Cluster Manager est le "RH" (voir 1.4) qui alloue les Executors (machines).
| Manager | Description |
|---|---|
| Local | (.master("local[*]")). Dev & Test. Driver et Executors sur la même machine (la vôtre). |
| Standalone | Le manager de cluster simple, inclus avec Spark. (Peu utilisé en prod). |
| Apache YARN | (Standard Legacy) Le manager de l'écosystème Hadoop. (Très courant "on-premise"). |
| Apache Mesos | (Legacy) Manager de cluster (ex: Twitter). (Moins courant). |
| Kubernetes (K8s) | (Standard Moderne) Spark tourne nativement sur K8s. (Standard du Cloud). |
Spark est le standard de facto pour l'ETL (Extract-Transform-Load) et le ML à grande échelle (Big Data).
| Entreprise | Cas d'usage |
|---|---|
| Databricks | L'entreprise fondée par les créateurs de Spark. (Leur plateforme EST Spark). |
| Netflix | Pipelines ETL, recommandation de contenu (MLlib, Spark SQL) (batch et streaming). |
| Uber | ETL (ex: traitement des données de courses/GPS), modélisation (MLlib), streaming. |
| Spotify | Recommandation musicale (MLlib), analyse comportementale (Spark SQL). |
| Amazon / Microsoft / Google | Fournissent des services Spark managés (EMR, Databricks, Dataproc). |
Ressources pour apprendre et travailler avec Apache Spark.
| Site | Description |
|---|---|
| spark.apache.org | Le site officiel. Point d'entrée pour la documentation. |
| Spark Programming Guide | (spark.apache.org/docs/latest/guide.html) Le "User Guide" officiel (conceptuel). |
| PySpark API Reference | (spark.apache.org/docs/latest/api/python/) La documentation de l'API PySpark (détail des fonctions). |
| Databricks (Blog/Learning) | Tutoriels de haute qualité par les créateurs de Spark (souvent la meilleure source). |
1. Setup (Session)
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder \
.appName("MonApp") \
.master("local[*]") \
.getOrCreate()
2. Read (Lazy)
df = spark.read \
.option("header", True) \
.option("inferSchema", True) \
.csv("data.csv")
3. Transform (Lazy)
// (Rien n'est exécuté ici)
// Select
df_trans = df.select("nom", "age", "salaire")
// Filter
df_trans = df_trans.filter(df.age > 30)
// GroupBy
df_agg = df_trans.groupBy("age").agg(
F.mean("salaire").alias("salaire_moyen")
)
// SQL
df.createOrReplaceTempView("personnes")
df_sql = spark.sql("SELECT * FROM personnes")
4. Action (Déclenche)
// Affiche le plan (DAG)
df_agg.explain()
// Déclenche le calcul et affiche
df_agg.show()
// Déclenche et sauvegarde
df_agg.write.mode("overwrite").parquet("output/")
// Stop
spark.stop()
