Project Oxygen & Ideo-LabIDEO LAB Dashboard 2026

⚡ Apache Spark – Le Moteur du Big Data

Guide complet IDEO-Lab : PySpark, DataFrames, Lazy Evaluation, SQL, Streaming & MLlib.

1.1 Facile

Vue d'ensemble

Calcul distribué (cluster), In-Memory (rapide), API unifiée.

Big Data Distribué
1.2 Moyen

Pourquoi Spark ?

Successeur de Hadoop MapReduce (100x+ rapide). API (SQL, ML).

In-Memory MapReduce
1.3 Facile

Installation (PySpark)

pip install pyspark (pour le dev local). findspark.

pip pyspark
1.4 Moyen

Architecture Cluster

Driver (le "cerveau") vs Executors (les "bras", workers).

Driver Executor
2.1 Facile

Point d'entrée : SparkSession

SparkSession.builder.getOrCreate(). Le spark.

SparkSession Entry Point
2.2 Facile

Concept N°1 : DataFrame

Abstraction principale (Table distribuée). Optimisée (Catalyst).

DataFrame RDD (Legacy)
2.3 Avancé

Concept N°2 : Lazy Evaluation

Spark attend. Les Transformations construisent un "plan" (DAG).

Lazy DAG
2.4 Moyen

Transformations vs Actions

Transformations (ex: .filter) (lazy) vs Actions (ex: .show) (déclenche).

Transformation Action
3.1 Facile

I/O (Lecture & Écriture)

spark.read.parquet(), .csv(). df.write.parquet().

read write Parquet
3.2 Moyen

Schéma (StructType)

Définir la structure (StructField). (Évite l'inférence lente).

StructType Schema
3.3 Moyen

DataFrame API (Ops)

.select(), .filter(), .withColumn(). (Pandas-like).

.select() .filter()
3.4 Moyen

Agrégation (.groupBy)

.groupBy('col').agg(F.sum('...')). (Wide transform / Shuffle).

.groupBy() agg()
4.1 Facile

Module : Spark SQL

spark.sql("SELECT ... FROM ..."). API SQL sur DataFrames.

Spark SQL SELECT
4.2 Avancé

Module : Structured Streaming

DataFrame "infini" (micro-batch). readStream, writeStream.

Streaming readStream
4.3 Avancé

Module : MLlib (ML)

ML distribué (VectorAssembler, Pipeline, KMeans).

MLlib Pipeline (ML)
4.4 Moyen

🐼 PySpark vs ⚡ Pandas

Distribué/Cluster (Lazy) vs Nœud unique/RAM (Eager).

Distribué Nœud unique
5.1 Moyen

Déploiement (Cluster)

Standalone (Dev), YARN (Hadoop), Kubernetes (Modern).

YARN Kubernetes
5.2 Facile

Vitrine (Qui l'utilise ?)

Databricks, Netflix, Uber, Spotify, (toute la Tech).

Databricks Netflix
5.3 Facile

Liens Utiles & Formation

spark.apache.org (Docs), Databricks (Blog/Tutoriels).

Documentation Databricks
5.4 Facile

Cheat-sheet

Workflow (Session, Read, Transform, Action).

cheat Workflow
1.1 Vue d'ensemble : Le Moteur de Calcul Distribué
Qu'est-ce qu'Apache Spark ?

Apache Spark est un **moteur de calcul distribué (cluster)** open-source, conçu pour le traitement de **Big Data** (Téraoctets/Pétaoctets).

Sa force principale est le **calcul In-Memory** (en RAM), ce qui le rend jusqu'à 100x plus rapide que son prédécesseur, **Hadoop MapReduce** (qui écrivait sur disque).

Ce n'est pas une base de données, c'est un moteur de **traitement** (ETL, SQL, ML, Streaming) qui lit depuis des SGBD (SQL), Data Lakes (S3, HDFS), ou des Queues (Kafka).

L'API Unifiée

Spark propose une API unifiée (principalement en Scala, Python/PySpark, SQL) pour :

  • Spark SQL : Traitement de données structurées (DataFrames).
  • Spark Streaming : Traitement en temps réel (Structured Streaming).
  • MLlib : Machine Learning distribué.
  • GraphX : Traitement de graphes (Legacy).
1.2 Pourquoi Spark ? (vs Hadoop MapReduce)

Spark a été créé pour résoudre les limitations de Hadoop MapReduce (MR).

CritèreHadoop MapReduce (Legacy)Apache Spark (Moderne)
TraitementBatch (Lots).Batch, Streaming, SQL, ML (Unifié).
Stockage (Calcul)Disque (HDFS). (Très lent, I/O disque à chaque étape).En mémoire (In-Memory/RAM). (100x+ rapide).
APIVerbeux (Java). Map(), Reduce().Simple (Python, SQL, Scala). API haut niveau (DataFrame).
1.3 Installation (PySpark en "Local Mode")

Pour le développement, on n'a pas besoin d'un cluster. pip install pyspark lance Spark en **"Local Mode"** (le Driver et l'Executor sont sur votre machine).

# 1. (Recommandé) Créer un environnement virtuel
python -m venv spark_env
source spark_env/bin/activate

# 2. Installer PySpark (API Python pour Spark)
pip install pyspark

# 3. (Optionnel: pour Jupyter) Trouver Spark
pip install findspark
Lancement (Script ou Notebook)
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

# (Le point d'entrée, voir 2.1)
spark = SparkSession.builder.appName("MonAppLocal").getOrCreate()

print(spark.version)
1.4 Diagramme : Architecture Cluster

Spark est un système **Driver/Executors** (Maître/Esclaves).

  • Driver (Pilote) : Le "cerveau" (là où SparkSession tourne). Il planifie le travail (le DAG).
  • Cluster Manager : Le "RH" (YARN, Kubernetes). Négocie les ressources (machines).
  • Executors (Exécuteurs) : Les "bras" (Workers). Exécutent les "Tasks" (calculs) sur les partitions de données.
+-------------------------+
| Votre App (PySpark)     |
| (Contient le "Driver")  |
| (Planifie le DAG)       |
+-------------------------+
      | (1. Demande des ressources)
      ▼
+-------------------------+
| Cluster Manager         |
| (YARN / Kubernetes)     |
+-------------------------+
      | (2. Alloue les Executors)
      ▼
+-------------------------+  +-------------------------+
| Executor 1 (Worker)     |  | Executor 2 (Worker)     |
| (Cache) (Task 1, Task 3)|  | (Cache) (Task 2, Task 4)|
+-------------------------+  +-------------------------+
      | (3. Lit/Écrit)             | (3. Lit/Écrit)
      ▼                          ▼
+----------------------------------------+
| Stockage Distribué (S3, HDFS, Parquet) |
+----------------------------------------+
2.1 Point d'entrée : SparkSession

Depuis Spark 2.0, le SparkSession (conventionnellement appelé spark) est le point d'entrée unique pour toutes les fonctionnalités de Spark (SQL, DataFrame, Streaming).

C'est lui qui configure et lance le "Driver".

from pyspark.sql import SparkSession

# 1. Créer (ou récupérer) la session
spark = SparkSession.builder \
    .appName("MonAnalyseDeVentes") \
    .master("local[4]") \ // ("local[4]" = 4 cœurs en mode local)
    .config("spark.some.config.option", "valeur") \
    .getOrCreate()

# 2. Utiliser 'spark' pour lire/écrire/SQL
df = spark.read.csv("data.csv")
spark.sql(...)

# 3. Accéder au SparkContext (API bas niveau)
sc = spark.sparkContext
2.2 Concept N°1 : DataFrame (vs RDD)

Spark a deux API de données :

APIDescriptionUsage
RDD (Resilient Distributed Dataset)(Legacy) API bas niveau. Non-structurée, pas d'optimisation (boîte noire).Éviter. (Nécessaire pour du code très bas niveau).
DataFrame (Dataset en Scala/Java)(Moderne) API haut niveau (comme Pandas/SQL). Structurée (Schéma).Toujours utiliser. Permet l'optimisation (Catalyst Optimizer).

Le Catalyst Optimizer (moteur d'optimisation de Spark) ne comprend que les DataFrames. Il réorganise les opérations (ex: "pousser" un filter avant un join) pour la performance.

2.3 Concept N°2 : Lazy Evaluation & DAG

C'est le concept le plus important. Spark est **LAZY (Paresseux)**.

Quand vous appliquez une Transformation (ex: .filter()), Spark n'exécute **rien**. Il construit un "plan d'exécution", un **DAG (Directed Acyclic Graph)**.

Le calcul (le "Job") n'est déclenché que lorsqu'une Action (ex: .show(), .count()) est appelée.

Diagramme (Lazy Evaluation)
// (1. Transformation 1: Build DAG)
df = spark.read.parquet("ventes.parquet")
// (Plan: Lire 'ventes.parquet')

// (2. Transformation 2: Update DAG)
df_fr = df.filter(df.pays == "FR")
// (Plan: Lire -> Filtre)

// (3. Transformation 3: Update DAG)
df_agg = df_fr.groupBy("produit").sum("prix")
// (Plan: Lire -> Filtre -> GroupBy)

// (RIEN NE S'EST ENCORE PASSÉ)

// (4. ACTION: Déclenche l'exécution du DAG)
df_agg.show()
// (Spark lance le Job, les Executors tournent)
2.4 Transformations (Narrow/Wide) vs Actions
Transformations (Lazy)

Construisent le DAG. Créent un nouveau DataFrame.

Narrow (Rapide) : Pas de "Shuffle". (.select, .filter, .withColumn). Les données restent sur leur partition.

Wide (Lent) : Nécessite un "Shuffle" (déplacement des données entre Executors). (.groupBy, .join, .orderBy).

Actions (Eager)

Déclenchent le Job (exécution du DAG).

  • .show(n) (Affiche n lignes)
  • .count() (Compte les lignes)
  • .collect() (Ramène **TOUT** au Driver. DANGEREUX !)
  • .take(n) (Ramène n lignes au Driver)
  • .write.parquet(...) (Sauvegarde les données)
3.1 I/O (Lecture & Écriture)

L'API spark.read et df.write.

Parquet est le format N°1 du Big Data (colonnaire, compressé, optimisé pour Spark).

Lecture (spark.read)
// 1. Parquet (Format préféré)
df = spark.read.parquet("s3a://mon-bucket/data/ventes/")

// 2. CSV (Moins performant)
df = spark.read.csv(
    "data.csv",
    header=True,
    inferSchema=True // (Lent, évitez en prod, voir 3.2)
)

// 3. (Générique)
df = spark.read.format("jdbc").option(...).load()
Écriture (df.write)
// 1. Parquet (avec partitionnement)
df.write \
  .mode("overwrite") \ // ('overwrite', 'append', 'error')
  .partitionBy("annee", "mois") \ // (Crée des dossiers ex: /annee=2024/)
  .parquet("s3a://mon-bucket/output/ventes_clean/")
  
// 2. CSV
df.write.mode("append").csv("output.csv")
3.2 Schéma (StructType)

Laisser Spark "deviner" le schéma (inferSchema=True) est lent (Spark doit lire le fichier 2 fois). En production, on définit **toujours** le schéma.

Exemple (StructType)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType
)

# 1. Définir le schéma
schema = StructType([
    StructField("id_client", IntegerType(), nullable=False),
    StructField("nom", StringType(), nullable=True),
    StructField("prix_total", DoubleType(), nullable=True)
])

# 2. Utiliser le schéma à la lecture
df = spark.read.csv(
    "data.csv",
    header=True,
    schema=schema // (Rapide, pas d'inférence)
)

# 3. Afficher le schéma
df.printSchema()
3.3 DataFrame API (Ops) (Pandas-like)

L'API DataFrame ressemble beaucoup à Pandas, mais les opérations sont **Lazy**.

from pyspark.sql.functions import F

# (df = DataFrame chargé)

# 1. Sélection (Select)
# (Retourne un nouveau DF)
df_select = df.select("col1", "col2")

# (Syntaxe 'dot')
df_select_dot = df.select(df.col1, df.col2)

# 2. Filtre (Filter / Where)
df_filter = df.filter(df.age > 30)
df_filter_multi = df.filter(
    (df.age > 30) & (df.ville == "Paris")
)

# 3. Ajouter/Modifier Colonne (withColumn)
df_new_col = df.withColumn(
    "age_plus_un", // (Nom nouvelle colonne)
    df.age + 1     // (Logique)
)

# (Utiliser F.* pour les fonctions SQL)
df_new_col_F = df.withColumn(
    "nom_upper",
    F.upper(df.nom)
)

# 4. Renommer
df_renamed = df.withColumnRenamed("nom", "nom_client")

# 5. Supprimer
df_dropped = df.drop("col_inutile")
3.4 Agrégation (.groupBy)

Similaire à Pandas groupby (Split-Apply-Combine). C'est une **Wide Transformation** (implique un "Shuffle" coûteux des données sur le cluster).

On utilise pyspark.sql.functions (alias F) pour les fonctions d'agrégation.

from pyspark.sql.functions import F

// 1. Syntaxe simple
df_agg_simple = df.groupBy("ville").mean("salaire")
// (Renomme 'avg(salaire)')

// 2. Syntaxe .agg() (Préférée, multi-aggs)
df_agg_multi = df.groupBy("ville").agg(
    F.mean("salaire").alias("salaire_moyen"),
    F.sum("ventes").alias("ventes_totales"),
    F.count("id_client").alias("nb_clients")
)

// 3. GroupBy Multi-colonnes
df_agg_multi_cols = df.groupBy("ville", "departement").agg(
    F.max("age").alias("age_max")
)

// (Ceci est LAZY. Lancer .show() pour exécuter)
df_agg_multi_cols.show()
4.1 Module : Spark SQL

Spark SQL permet d'exécuter des requêtes SQL (ANSI-SQL) directement sur les DataFrames. C'est souvent plus simple que l'API Python pour des JOIN ou GROUP BY complexes.

# (df = DataFrame chargé)

# 1. Enregistrer le DataFrame comme "Vue" temporaire
df.createOrReplaceTempView("ventes")
df_clients.createOrReplaceTempView("clients")

# 2. Exécuter du SQL (via spark.sql)
# (Ceci est une Transformation LAZY)
df_sql = spark.sql("""
    SELECT 
        c.nom,
        SUM(v.prix) as total_depense
    FROM ventes v
    JOIN clients c ON v.client_id = c.client_id
    WHERE v.pays = 'FR'
    GROUP BY c.nom
    ORDER BY total_depense DESC
""")

# 3. Déclenche le calcul
df_sql.show()
4.2 Module : Structured Streaming

Structured Streaming est l'API haut niveau (basée sur les DataFrames) pour le streaming (ex: lire de Kafka, fichiers S3 entrants...).

Spark traite le flux (stream) comme un "DataFrame infini" (unbounded table). Il exécute des "micro-batchs" (ex: toutes les 30s) sur les nouvelles données.

# 1. Lecture (readStream) (ex: depuis Kafka)
df_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host:port") \
    .option("subscribe", "mon_topic") \
    .load()

# 2. Transformations (identiques à l'API DF/SQL)
df_processed = df_stream.selectExpr("CAST(value AS STRING)") \
    .filter(...)

# 3. Écriture (writeStream) (Déclenche)
query = df_processed.writeStream \
    .outputMode("append") \ // ('append', 'complete', 'update')
    .format("parquet") \ // (Sink: Parquet, Kafka, Console)
    .option("path", "/output/data") \
    .option("checkpointLocation", "/checkpoints") \ // (Crucial pour la tolérance)
    .trigger(processingTime='1 minute') \
    .start()

query.awaitTermination()
4.3 Module : MLlib (Machine Learning)

MLlib est la bibliothèque ML de Spark (similaire à Scikit-learn, mais distribuée).

VectorAssembler : Étape N°1 obligatoire. MLlib requiert que toutes les features (X) soient dans une seule colonne "vecteur".

Pipeline : Comme sklearn, on utilise des Pipelines pour chaîner (Scaler, OHE, VectorAssembler, Modèle).

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression

# 1. Assembler les features en 1 vecteur
assembler = VectorAssembler(
    inputCols=["age", "salaire", "ohe_ville"],
    outputCol="features"
)

# 2. Scaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# 3. Modèle
lr = LogisticRegression(featuresCol="scaledFeatures", labelCol="target")

# 4. Pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

# 5. Entraîner
model = pipeline.fit(train_data)

# 6. Prédire
predictions = model.transform(test_data)
4.4 🐼 PySpark vs ⚡ Pandas

La syntaxe est similaire, mais la philosophie est radicalement différente.

CritèrePandasPySpark
ScopeNœud unique (Single-Node)Distribué (Cluster)
Taille DonnéesLimité par la RAM de la machine (Small Data).Quasiment illimité (stockage cluster) (Big Data).
ExécutionEager (Avide). (df['a'] + 1 s'exécute instantanément).Lazy (Paresseux). (Construit un DAG, attend une Action).
MutationMutable (ex: df['a'] = ... modifie df).Immuable (.withColumn() crée un *nouveau* DataFrame).
APITrès riche (plotting, 1000s fonctions).Plus limitée (focus sur ETL/ML distribué).

Pandas API on Spark (pyspark.pandas) : Un effort pour implémenter l'API Pandas "Eager" au-dessus du moteur Spark "Lazy". Utile pour la transition.

5.1 Déploiement (Cluster Managers)

Le Cluster Manager est le "RH" (voir 1.4) qui alloue les Executors (machines).

ManagerDescription
Local(.master("local[*]")). Dev & Test. Driver et Executors sur la même machine (la vôtre).
StandaloneLe manager de cluster simple, inclus avec Spark. (Peu utilisé en prod).
Apache YARN(Standard Legacy) Le manager de l'écosystème Hadoop. (Très courant "on-premise").
Apache Mesos(Legacy) Manager de cluster (ex: Twitter). (Moins courant).
Kubernetes (K8s)(Standard Moderne) Spark tourne nativement sur K8s. (Standard du Cloud).
5.2 Vitrine (Qui utilise Spark ?)

Spark est le standard de facto pour l'ETL (Extract-Transform-Load) et le ML à grande échelle (Big Data).

EntrepriseCas d'usage
DatabricksL'entreprise fondée par les créateurs de Spark. (Leur plateforme EST Spark).
NetflixPipelines ETL, recommandation de contenu (MLlib, Spark SQL) (batch et streaming).
UberETL (ex: traitement des données de courses/GPS), modélisation (MLlib), streaming.
SpotifyRecommandation musicale (MLlib), analyse comportementale (Spark SQL).
Amazon / Microsoft / GoogleFournissent des services Spark managés (EMR, Databricks, Dataproc).
5.4 Cheat-sheet (PySpark Workflow)
1. Setup (Session)
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("MonApp") \
    .master("local[*]") \
    .getOrCreate()
2. Read (Lazy)
df = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv("data.csv")
3. Transform (Lazy)
// (Rien n'est exécuté ici)

// Select
df_trans = df.select("nom", "age", "salaire")

// Filter
df_trans = df_trans.filter(df.age > 30)

// GroupBy
df_agg = df_trans.groupBy("age").agg(
    F.mean("salaire").alias("salaire_moyen")
)

// SQL
df.createOrReplaceTempView("personnes")
df_sql = spark.sql("SELECT * FROM personnes")
4. Action (Déclenche)
// Affiche le plan (DAG)
df_agg.explain()

// Déclenche le calcul et affiche
df_agg.show()

// Déclenche et sauvegarde
df_agg.write.mode("overwrite").parquet("output/")

// Stop
spark.stop()