đïž Delta Lake â Le "Lakehouse" (ACID, Time Travel, Z-Order)
Guide complet IDEO-Lab sur la couche de stockage (ACID) pour Data Lakes (Spark, Databricks).
Concept : Couche ACID
Couche de stockage (Storage Layer) ACID pour Data Lakes.
Delta Lake ACID Open SourceLe ProblĂšme : Data Lake (Parquet)
Pas d'ACID, Pas d'UPDATE, Corruption (Jobs échoués).
La Solution : Lakehouse
Fiabilité (DWH) + Coût (Data Lake). (Permis par Delta).
Lakehouse DatabricksArchi : Fichiers Parquet
Les données (Data) sont stockées en .parquet (ouvert).
Archi : _delta_log (Le Cerveau)
Le journal de transaction (JSON, Checkpoints).
_delta_log Transaction LogGarantie : ACID (Atomicité)
Comment le log (Commit) garantit l'atomicité.
ACID CommitFonction : Time Travel
TIMESTAMP AS OF, VERSION AS OF. (DĂ» au Log).
Fonction : DML (UPDATE, DELETE)
Support des opérations UPDATE, DELETE, MERGE.
Fonction : Schema
Schema Enforcement (Respect) & Evolution (Ăvolution).
Schema Enforcement Schema EvolutionOptimisation : OPTIMIZE
Compactage (Bin-Packing) (Résout le "Small Files Problem").
OPTIMIZE Bin-PackingOptimisation : Z-ORDER
Clustering (Colocalisation) des données (CLUSTER BY).
Maintenance : VACUUM
Nettoyage (DELETE physique) des anciens fichiers (Time Travel).
Streaming : Source
spark.readStream.format("delta").
Streaming : Sink (Destination)
df.writeStream.format("delta"). (Garantie Exactly-Once).
Streaming : Change Data Feed (CDC)
Capturer les changements (UPDATE, DELETE) d'une table Delta.
Usage : PySpark
spark.read/write.format("delta").
Usage : SQL
CREATE TABLE ... USING DELTA, MERGE INTO....
Partage : Delta Sharing
Protocole ouvert (REST) pour partager des tables Delta.
Delta Sharing PartageQu'est-ce que Delta Lake ?
Delta Lake est une couche de stockage (storage layer) open-source (gérée par la Linux Foundation) conçue pour s'exécuter par-dessus un Data Lake existant (ex: S3, ADLS, GCS).
Son unique objectif est d'apporter les garanties ACID (Atomicité, Cohérence, Isolation, Durabilité) (la fiabilité d'un Data Warehouse SQL) à des fichiers "Big Data" (comme Parquet).
ACID ?
ACID est ce qui garantit qu'une base de données est fiable. (Voir 2.3).
Format (Parquet + JSON)
Delta Lake n'est pas un nouveau format de données. C'est une combinaison :
- Les Données (Data) : Stockées au format Apache Parquet (ouvert, colonnaire, rapide).
- Le "Cerveau" (Log) : Un Journal de Transactions (
_delta_log) (fichiers JSON) qui décrit quelles actions (COMMIT) ont été appliquées à quels fichiers Parquet.
Delta Lake est la technologie fondamentale qui permet l'architecture "Lakehouse" (1.3).
Un "Data Lake" (ex: un bucket S3 rempli de fichiers Parquet) est bon marché et scalable, mais il souffre de problÚmes de fiabilité majeurs :
1. Pas d'ACID (Pas d'Atomicité)
Scénario (Job ETL) : Un Job Spark (100 serveurs) doit écrire 1 To de données (1000 fichiers Parquet) sur S3.
Le Crash : Le Job échoue (crash) à 90% (au fichier 900/1000).
Résultat (Data Lake Corrompu) : Le dossier /ventes/date=.../ contient 900 fichiers (partiels). La table est corrompue et inutilisable (les données sont incomplÚtes).
Solution Delta (ACID) : (Voir 2.3) Le job écrit les 900 fichiers (cachés). Le job crash. Le _delta_log (le "commit") n'est jamais écrit. La table n'a jamais changé (atomicité).
2. Pas de DML (UPDATE / DELETE)
Les fichiers Parquet (sur S3) sont immuables (Immutable). On ne peut pas "ouvrir" un fichier Parquet sur S3 et "changer une ligne".
ProblÚme : Comment gérer une demande RGPD/GDPR ("Supprimez-moi (DELETE) !") ou un UPDATE (ex: "Changer le statut de 'En cours' à 'Livré'") ?
Solution Delta (DML) : (Voir 3.2) Delta Lake gĂšre UPDATE/DELETE/MERGE en utilisant le Log (Copie-on-Write).
3. Le "Small Files Problem" (ProblĂšme des petits fichiers)
ProblÚme (Streaming) : Un job de streaming (ex: Kafka -> S3) qui tourne toutes les minutes va créer des milliers de trÚs petits fichiers Parquet (ex: 1 Mo).
RĂ©sultat : Les requĂȘtes (SELECT) sont extrĂȘmement lentes, car Spark/Redshift passe plus de temps Ă "lister les fichiers" (I/O S3) qu'Ă lire les donnĂ©es.
Solution Delta (Optimize) : (Voir 4.1) La commande OPTIMIZE (Compactage) fusionne (compacte) automatiquement les petits fichiers en grands fichiers (ex: 1 Go).
Le Lakehouse (terme inventé par Databricks) est une architecture qui combine le meilleur des deux mondes (Data Lake vs Data Warehouse).
(Data Warehouse) + (Data Lake) = Lakehouse (Fiabilité/ACID) + (Flexibilité/Coût)
La technologie qui permet le "Lakehouse" est Delta Lake (ou ses concurrents : Apache Iceberg, Apache Hudi).
Un Lakehouse (implémenté avec Delta Lake) fournit :
- Stockage : Formats ouverts (Parquet) sur stockage bon marché (S3/ADLS).
- Fiabilité : Transactions ACID (grùce au
_delta_log). - DML : Support
UPDATE,DELETE,MERGE. - Gouvernance : Schema Enforcement, Time Travel.
Le "Data Plane" (Plan de Données) de Delta Lake est composé de fichiers Apache Parquet.
Quand vous écrivez (df.write.format("delta")), Spark écrit (par défaut) de nouveaux fichiers .parquet (compressés, colonnaires) dans le dossier de la table.
/path/to/my_delta_table/ ââ part-00000-....snappy.parquet ââ part-00001-....snappy.parquet ââ ...
Ces fichiers .parquet sont immuables (Immutable). Ils ne sont jamais modifiés. Un UPDATE (3.2) ne modifie pas ces fichiers, il en crée de nouveaux.
_delta_log (Le Cerveau)Le "Control Plane" (Plan de ContrÎle) de Delta Lake est le dossier _delta_log. C'est la Source de Vérité (SSoT) de la table.
/path/to/my_delta_table/
ââ (Fichiers Parquet...)
ââ _delta_log/
ââ 000000.json
ââ 000001.json
ââ 000002.json
ââ 000003.json
ââ _last_checkpoint
Fichiers JSON (Commits)
Chaque transaction (écriture) sur la table crée un nouveau fichier .json (Commit) dans le log. Ce JSON est un "delta" (un changement) atomique.
(Ex: 000001.json)
{
"commitInfo": { ... "operation": "WRITE" ... },
"add": { "path": "part-00001.parquet", "size": 100MB, ... }
}
(Ex: 000002.json - (DELETE))
{
"commitInfo": { ... "operation": "DELETE" ... },
"remove": { "path": "part-00000.parquet", ... }
}Ătat (State)
L'état actuel de la table est la somme (agrégation) de tous les fichiers JSON (Commits) du log. C'est ce qui permet ACID (2.3) et Time Travel (3.1).
Checkpoints : Pour éviter d'avoir à lire 1 million de JSON, Delta Lake crée (par défaut tous les 10 commits) un _checkpoint.parquet qui "compacte" l'état (Snapshot).
Delta Lake fournit les garanties ACID (Atomicité, Cohérence, Isolation, Durabilité) aux Data Lakes.
Comment fonctionne l'Atomicité (Commit) ?
C'est la garantie la plus importante (résout le problÚme 1.2 "Job échoué").
(Flux d'un 'df.write.format("delta")')
1. [Spark Job] (Phase 1 : Ăcriture des donnĂ©es)
(Ăcrit les nouveaux fichiers (ex: part-0005.parquet)
dans le dossier, mais "cachés" ou "temporaires")
2. [Spark Job] (Phase 2 : Le "Commit")
(Tente d'écrire (atomiquement) le
fichier '000005.json' dans le dossier '_delta_log/')
(Le '000005.json' contient :
{ "add": "part-00005.parquet" } )
3. (Vérification de Conflit)
(Si '000005.json' existe déjà (conflit),
le Job échoue et recommence (Retry))
4. (SuccĂšs)
('000005.json' est écrit)
- Si le Job Ă©choue (Crash) en Phase 1 (Ăcriture Parquet) : Rien ne se passe. Le
_delta_log(le "commit") n'est jamais écrit. La table est intacte. - Si le Job réussit (Phase 2) : Le fichier
.json(le "commit") est écrit. La transaction est validée.
Le Time Travel (Voyage Temporel) est une conséquence directe du _delta_log (2.2). Puisque Delta Lake conserve l'historique (les JSONs) de toutes les versions de la table, il peut "reconstruire" l'état de la table à n'importe quel moment.
(Similaire Ă git checkout [hash]).
Usage (PySpark)
# 1. Lire la version la plus récente
df = spark.read.format("delta").load("/path/to/table")
# 2. Lire la version 5 (le 5Ăšme commit)
df_v5 = spark.read.format("delta") \
.option("versionAsOf", 5) \
.load("/path/to/table")
# 3. Lire l'état... tel qu'il était hier
df_yesterday = spark.read.format("delta") \
.option("timestampAsOf", "2025-11-09 10:00:00") \
.load("/path/to/table")
Usage (SQL)
-- (SQL) SELECT * FROM ma_table VERSION AS OF 5 SELECT * FROM ma_table TIMESTAMP AS OF '2025-11-09'
Cas d'Usage
- Audit / Débogage : (Comparer v5 et v6).
- Rollback (Restauration) : (Si le Job 6 (mauvais) a corrompu les données) ->
RESTORE TABLE ma_table TO VERSION AS OF 5.
UPDATE, DELETE, MERGE)Delta Lake supporte les commandes DML (Data Manipulation Language) (impossibles sur Parquet pur).
Fonctionnement (Copy-on-Write)
Les fichiers Parquet sont immuables. Un UPDATE ne modifie pas un fichier Parquet. Il réécrit le fichier (compactage).
- (Ătat V1) :
part-001.parquet(1000 lignes) - (RequĂȘte)
UPDATE table SET ... WHERE id = 5(Ligne 5 est danspart-001). - (Job Spark) Spark lit
part-001(1000 lignes). - (Job Spark) Réécrit un nouveau fichier :
part-002.parquet(les 999 lignes + 1 ligne modifiĂ©e). - (Commit) Ăcrit le
_delta_log(JSON) :{ "remove": "part-001", "add": "part-002" }.
MERGE INTO (Upsert)
La commande la plus puissante (ETL) : UPDATE (si existe) ou INSERT (si n'existe pas).
(SQL) MERGE INTO target_table T USING source_updates S ON T.id = S.id -- (Si l'ID existe) WHEN MATCHED THEN UPDATE SET T.value = S.value -- (Si l'ID n'existe pas) WHEN NOT MATCHED THEN INSERT (id, value) VALUES (S.id, S.value)
Schema Enforcement (Respect du Schéma)
(Défaut) C'est la protection contre les "données sales" (dirty data).
Delta Lake stocke le schéma (UserID: int, Email: string) dans le _delta_log.
Scénario : Vous essayez d'écrire (df.write) un nouveau DataFrame avec un mauvais type (ex: UserID: string) ou une colonne manquante.
Résultat (Parquet pur) : Le job écrit, corrompant la table (la moitié est int, l'autre string).
Résultat (Delta Lake) : Delta Lake rejette (reject) l'écriture (AnalysisException: Mismatch schema). La table est protégée (garantie de cohérence).
Schema Evolution (Ăvolution du SchĂ©ma)
ProblÚme : Que faire si je veux légitimement ajouter une nouvelle colonne (ex: "Ville") ? (Par défaut, "Enforcement" (ci-dessus) le bloque).
Solution (mergeSchema)
On utilise l'option .option("mergeSchema", "true") lors de l'écriture (df.write).
Flux :
- Delta Lake compare le schéma (Nouveau) et l'ancien (Log).
- Il voit la nouvelle colonne "
Ville". - Il fusionne (merge) les schémas, et l'ajoute (en
NULL) aux anciens fichiers (métadonnées). - Il autorise l'écriture (
COMMIT).
OPTIMIZE (Compactage)Le ProblĂšme (Small Files) : (Voir 1.2) L'ingestion (Streaming, DML) crĂ©e des milliers de petits fichiers Parquet (1 Ko - 1 Mo). Les requĂȘtes SELECT deviennent lentes (I/O S3 excessif).
Solution (OPTIMIZE - Bin-Packing)
La commande OPTIMIZE (Compactage) est une opération de "nettoyage" qui résout ce problÚme.
Flux (OPTIMIZE ma_table) :
_delta_log (Commit) : { "remove": [1000 petits], "add": [1 gros] }.-- (SQL) OPTIMIZE ma_table_delta -- (PySpark) deltaTable.optimize()
Résultat : Les SELECT sont (beaucoup) plus rapides. (à exécuter périodiquement, ex: cron).
Z-ORDER (Clustering)Z-ORDER (similaire au CLUSTER BY de Snowflake/BQ) est une technique de clustering (colocalisation) des données (utilisée avec OPTIMIZE).
ProblÚme : (Similaire au "Pruning") Si vous filtrez sur WHERE region='FR' AND produit='A' (colonnes non-partitionnées), Spark doit scanner tous les fichiers Parquet.
Solution (Z-ORDER)
Z-ORDER réécrit les fichiers Parquet (pendant OPTIMIZE) en triant (colocalisant) les données sur plusieurs dimensions (en utilisant une "Courbe Z").
-- (Optimise ET Trie physiquement -- les données par 'region' et 'produit') OPTIMIZE ma_table_delta ZORDER BY (region, produit)
RĂ©sultat : Les donnĂ©es (ex: "France/Produit A") sont regroupĂ©es dans les mĂȘmes fichiers Parquet. La requĂȘte (WHERE region='FR' AND produit='A') ne lit que ces fichiers (Data Skipping / Pruning).
Attention : Ne pas Z-Order sur des colonnes à haute cardinalité (ex: user_id).
VACUUM (Nettoyage)ProblÚme : UPDATE (3.2) et OPTIMIZE (4.1) ne suppriment pas les anciens fichiers Parquet (ils ne font que les marquer "removed" (supprimé) dans le _delta_log). Ils sont gardés pour le Time Travel (3.1).
Conséquence : La taille (S3/ADLS) de votre table augmente indéfiniment.
Solution : VACUUM (Nettoyage)
VACUUM est la commande (destructrice) qui supprime physiquement (DELETE) les fichiers de données (Parquet) qui ne sont plus référencés par le _delta_log (ET qui sont plus vieux que la période de rétention (défaut : 7 jours)).
-- (Analyse le Log, trouve les fichiers "morts" -- (plus vieux que 7 jours) et les supprime (DELETE) de S3) VACUUM ma_table_delta -- (Forcer (dangereux) : supprimer tout ce qui -- est plus vieux que 0 heure (casse le Time Travel)) VACUUM ma_table_delta RETAIN 0 HOURS
Delta Lake est une source (Source) de streaming "native" pour Structured Streaming (Spark).
Spark (Streaming) "surveille" (tail) le _delta_log (2.2). DÚs qu'un nouveau .json (Commit) apparaßt, Spark le lit et traite (en micro-batch) uniquement les nouveaux fichiers ("add") listés dans ce commit.
# (PySpark)
# (Crée un DF 'silver_stream' qui lit
# en continu la table Delta 'bronze_table')
silver_stream_df = spark.readStream \
.format("delta") \
.load("/path/to/bronze_table")
# (Appliquer des transformations...)
gold_df = silver_stream_df.where(...)
gold_df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "...") \
.start("/path/to/gold_table")Delta Lake est le Sink (Destination) recommandé pour le Structured Streaming.
ProblÚme (Parquet Sink) : Si vous écrivez (writeStream) sur Parquet, vous n'avez aucune garantie (ACID). Si le job crash au milieu, vous avez des données partielles (corrompues) (1.2).
Solution (Delta Sink)
Utiliser .format("delta") comme "Sink" (destination).
Grùce au Transaction Log (2.2), Delta Lake garantit la tolérance aux pannes et la sémantique Exactly-Once (via le "Checkpoint" du streaming + le "Commit" atomique de Delta).
# (df_stream = Données de Kafka)
query = df_stream.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/bronze/") \
.start("/path/to/bronze_delta_table")ProblÚme : readStream (5.1) ne voit que les nouvelles données (INSERT). Il ne voit pas les UPDATE ou DELETE (ex: MERGE (3.2)) effectués sur la table source.
Solution : Change Data Feed (CDC)
CDC (Change Data Feed) (une option Ă activer sur la table Delta) dit Ă Delta de stocker (logger) les changements (UPDATE, DELETE) dans le _delta_log (dans un dossier _change_data).
-- (Activer CDC sur la table Silver)
ALTER TABLE silver_table SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
-- (PySpark - Lire le *flux de changements* de Silver)
df_cdc = spark.readStream \
.format("delta") \
.option("readChangeData", True) \
.option("startingVersion", 1) \
.load("/path/to/silver_table")
# (df_cdc a 2 colonnes en plus :
# _change_type : 'insert', 'update_preimage',
# 'update_postimage', 'delete'
# _commit_version : ...)
# (Permet de propager les DELETEs/UPDATEs
# de Silver -> Gold)from delta.tables import *
from pyspark.sql.functions import *
# --- Ăcriture (Overwrite) ---
df.write.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save("/path/to/table")
# --- Ăcriture (Append) ---
df_new.write.format("delta") \
.mode("append") \
.save("/path/to/table")
# --- Ăcriture (Merge Schema) ---
df_new_columns.write.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.save("/path/to/table")
# --- Lecture ---
df_delta = spark.read.format("delta").load("/path/to/table")
# --- Lecture (Time Travel) ---
df_v5 = spark.read.format("delta") \
.option("versionAsOf", 5) \
.load("/path/to/table")
# --- DML (UPDATE/DELETE/MERGE) ---
# (Utiliser l'API DeltaTable)
deltaTable = DeltaTable.forPath(spark, "/path/to/table")
# (DELETE)
deltaTable.delete(col("date") < "2020-01-01")
# (UPDATE)
deltaTable.update(
condition = col("status") == "NEW",
set = { "status": lit("PROCESSED") }
)
# (MERGE - Upsert)
deltaTable.alias("target").merge(
source_df.alias("source"),
"target.id = source.id"
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# --- Maintenance ---
deltaTable.optimize().executeCompaction()
deltaTable.optimize().executeZOrderBy("region", "produit")
deltaTable.vacuum(retentionHours=168) # (7 jours)Delta Lake (via Spark SQL, Databricks, Synapse) supporte les commandes SQL DDL/DML.
-- 1. Créer une table Delta CREATE TABLE ma_table_delta ( id INT, nom STRING ) USING DELTA LOCATION 's3://.../path/to/table' PARTITIONED BY (date); -- 2. DML (UPDATE/DELETE/MERGE) UPDATE ma_table_delta SET nom = 'Nouveau' WHERE id = 1; DELETE FROM ma_table_delta WHERE id = 2; MERGE INTO ... (Voir 3.2); -- 3. Lecture (Time Travel) SELECT * FROM ma_table_delta VERSION AS OF 5; SELECT * FROM ma_table_delta TIMESTAMP AS OF '2025-11-10'; -- 4. Maintenance OPTIMIZE ma_table_delta; OPTIMIZE ma_table_delta ZORDER BY (col_a); VACUUM ma_table_delta; -- (Défaut 7 jours)
Delta Sharing (créé par Databricks, open-source) est un protocole ouvert pour partager (en lecture seule) des tables Delta Lake entre organisations, sans copier les données.
(C'est la réponse "ouverte" au "Secure Data Sharing" (5.3) (propriétaire) de Snowflake).
Fonctionnement (REST & S3)
- Provider (Fournisseur) :
- (Databricks) Crée un
SHARE(Partage). - (Databricks)
GRANT SELECT ON TABLE ... TO SHARE ... - (Databricks) Crée un "Recipient" (Destinataire) et lui donne un token (
.sharefile).
- (Databricks) Crée un
- Consumer (Consommateur) :
- (Peut ĂȘtre n'importe quoi : Python, Pandas, Power BI, Spark...)
- Utilise le
.share(token) pour s'authentifier auprÚs du Serveur Delta Sharing (REST) du Provider. - Le Serveur (REST) renvoie une liste d'URLs pré-signées (S3) pour les fichiers Parquet (de la table partagée).
- Le Consommateur lit les fichiers Parquet directement depuis le S3 du Provider.
Avantage : Ouvert. Le consommateur n'a pas besoin d'ĂȘtre sur Databricks (ni mĂȘme sur le mĂȘme Cloud).
from delta.tables import *
from pyspark.sql.functions import *
# --- Ăcriture (Overwrite) ---
df.write.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save("/path/to/table")
# --- Ăcriture (Append) ---
df_new.write.format("delta") \
.mode("append") \
.save("/path/to/table")
# --- Ăcriture (Merge Schema) ---
df_new_columns.write.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.save("/path/to/table")
# --- Lecture ---
df_delta = spark.read.format("delta").load("/path/to/table")
# --- Lecture (Time Travel) ---
df_v5 = spark.read.format("delta") \
.option("versionAsOf", 5) \
.load("/path/to/table")
# --- DML (UPDATE/DELETE/MERGE) ---
# (Utiliser l'API DeltaTable)
deltaTable = DeltaTable.forPath(spark, "/path/to/table")
# (DELETE)
deltaTable.delete(col("date") < "2020-01-01")
# (UPDATE)
deltaTable.update(
condition = col("status") == "NEW",
set = { "status": lit("PROCESSED") }
)
# (MERGE - Upsert)
deltaTable.alias("target").merge(
source_df.alias("source"),
"target.id = source.id"
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# --- Maintenance ---
deltaTable.optimize().executeCompaction()
deltaTable.optimize().executeZOrderBy("region", "produit")
deltaTable.vacuum(retentionHours=168) # (7 jours)
