đ Apache Flink â Moteur de Streaming (Stateful, Event Time)
Guide complet IDEO-Lab sur le framework "Stream Processing" (State, Checkpoints, Watermarks).
Concept : Stateful Streaming
"Stream-First", Stateful (Ătat), Faible latence, "Exactly-Once".
Flink Streaming StatefulArchi : JobManager
Le "MaĂźtre" (Cerveau). (Coordonnateur, Checkpoint Coordinator).
JobManager MaĂźtreArchi : TaskManager & Slots
L'"Esclave" (Muscle). Slots (Unité de parallélisme).
TaskManager Slotsvs. Spark Streaming
Streaming "Natif" (Flink) vs Micro-Batch (Spark).
Spark Streaming Comparatifvs. Kafka Streams
Moteur (Flink) vs Librairie (Kafka Streams).
Kafka Streams ComparatifAPIs (DataStream vs Table)
DataStream (Impératif) vs Table/SQL (Déclaratif).
DataStream API Table APIConcept : Stateful Streaming
Garder une "mémoire" (SUM, COUNT) dans le flux.
Types d'Ătat (State)
ValueState, MapState, ListState. (Keyed State).
State Backends (Stockage)
HashMap (RAM/Dev) vs RocksDB (Disque/Prod).
Concept : Checkpointing
Le mécanisme de "Snapshot" (Exactly-Once).
Checkpointing SnapshotCheckpoint Barriers (BarriĂšres)
L'algorithme (Asynchrone) de Chandy-Lamport.
Barriers Chandy-LamportCheckpoints vs Savepoints
HA (Automatique) vs Maintenance (Manuel).
Checkpoint SavepointConcept : Temps
Event Time (Donnée) vs Processing Time (Machine).
Event Time Processing TimeWatermarks (Filigranes)
Gestion du retard ("Données en retard").
Watermarks Late DataTypes de FenĂȘtres (Windows)
Tumbling (Fixe), Sliding (Glissante), Session.
DataStream API (Base)
Source -> Transformations -> Sink.
Transformations (DataStream)
map, filter, keyBy, window, reduce.
Connecteurs (Sources/Sinks)
Kafka, Pulsar, RabbitMQ, Elasticsearch, JDBC.
Connectors KafkaTable API & Flink SQL
L'API "haut niveau" (Déclarative) (SELECT ...).
FlinkCEP (Patterns)
Complex Event Processing (Détection de séquences).
FlinkCEP Pattern APIRessources & Liens
Site officiel, Documentation, GitHub.
Documentation LiensQu'est-ce qu'Apache Flink ?
Apache Flink est un framework (cadre) et un moteur de calcul distribué open-source, conçu pour le Stateful Stream Processing (Traitement de flux "avec état").
Philosophie ("Stream-First") : Contrairement à Spark (Batch-first), Flink est un moteur de streaming "natif". Il traite les données événement par événement (ou micro-batch) avec une trÚs faible latence.
Caractéristiques Clés (Le "Pourquoi")
- Stateful (Avec Ătat) : (Sa force n°1) Flink est conçu pour gĂ©rer l'Ă©tat (state) (ex:
COUNT,SUM) localement (sur le worker), ce qui est trĂšs rapide (voir 3.1). - TolĂ©rance aux Pannes (Exactly-Once) : (Sa force n°2) Son mĂ©canisme de Checkpointing (4.1) (basĂ© sur Chandy-Lamport) garantit des sĂ©mantiques "Exactly-Once" (traitement garanti 1 fois) mĂȘme en cas de panne.
- Temps (Event Time) : (Sa force n°3) Support natif de l'Event Time (Temps de l'ĂvĂ©nement) (voir 5.1), ce qui permet de gĂ©rer les donnĂ©es en retard (late data) grĂące aux Watermarks (5.2).
- Batch (Unifié) : Flink considÚre le "Batch" (traitement par lots) comme un "cas particulier" du Streaming (un flux qui se termine).
L'architecture de Flink (comme Spark/Hadoop) est MaĂźtre/Esclave. Le "MaĂźtre" (Cerveau) est le JobManager.
RĂŽles du JobManager
Il y a 1 JobManager par Job (Application) Flink. (Ou 1 JobManager "Cluster" en mode HA/YARN).
- Réception du Job : Reçoit le "JobGraph" (le DAG) du client (ex:
flink run ...). - Planification (Scheduling) : Transforme le "JobGraph" (logique) en "ExecutionGraph" (physique).
- Coordination (Cerveau) : Coordonne les TaskManagers (1.3) (les "Esclaves") et leur assigne des "Tasks" (tĂąches).
- Checkpoint Coordinator : (RĂŽle vital) C'est le JobManager qui initie les Checkpoints (4.1) en injectant les "BarriĂšres" (4.2) dans le flux.
- HA (ZooKeeper) : Peut ĂȘtre configurĂ© en Haute DisponibilitĂ© (HA) (Active/Standby) en utilisant ZooKeeper (similaire Ă HDFS HA).
Le TaskManager (ou "TaskExecutor") est l'esclave (worker). C'est le "Muscle" qui exécute le travail.
Task Slots (Unités de Parallélisme)
Un TaskManager (ex: une VM ou un Conteneur K8s) est divisé en "Task Slots" (Emplacements).
Un Slot est l'unité de ressource (CPU/RAM) de Flink. C'est (conceptuellement) 1 thread d'exécution.
Exemple : Un TaskManager (VM) avec 8 CĆurs (CPU) peut ĂȘtre configurĂ© pour avoir 8 Slots.
[ TASKMANAGER (VM) ] (8 Coeurs, 32 Go RAM) (8 Slots) ââ [Slot 1] (1 Coeur, 4Go RAM) -> (ExĂ©cute TĂąche A (Job 1)) ââ [Slot 2] (1 Coeur, 4Go RAM) -> (ExĂ©cute TĂąche B (Job 1)) ââ [Slot 3] (1 Coeur, 4Go RAM) -> (ExĂ©cute TĂąche C (Job 2)) ââ ...
Parallélisme
Le Parallélisme (Parallelism) de votre Job (ex: parallelism=100) définit le nombre total de Tùches (Threads) nécessaires. Flink (JobManager) distribuera ces 100 Tùches sur les Slots disponibles (ex: sur 13 TaskManagers de 8 Slots).
Data Exchange (Shuffle) : Les TaskManagers communiquent directement (via réseau/Netty) pour le "Shuffle" (ex: aprÚs un keyBy).
C'est la comparaison "classique". (Note : Spark Structured Streaming (6.1) a brouillé les pistes).
Flink (True Streaming / Natif)
ModÚle : "Stream-First". Flink traite les événements un par un (ou par mini-batchs) dÚs qu'ils arrivent.
- Latence : TrĂšs Faible (millisecondes).
- Temps : Conçu pour Event Time (Temps de l'ĂvĂ©nement) (5.1) (via Watermarks).
- Ătat (State) : Gestion d'Ă©tat (3.1) trĂšs robuste (par Ă©vĂ©nement).
- Usage : Cas "temps réel" (Détection de fraude, Alerting).
Spark Streaming (Micro-Batch)
ModÚle : "Batch-First". Spark (Streaming) traite le flux comme une série de petits "Batchs" (lots) (ex: "Traite tous les événements des 5 derniÚres secondes").
- Latence : Plus élevée (secondes). (Liée à la taille du micro-batch).
- Temps : Historiquement "Processing Time". (Support "Event Time" ajouté, mais moins natif que Flink).
- Ătat (State) : Gestion d'Ă©tat (
updateStateByKey) (plus complexe/moins performant que Flink). - Usage : ETL (Micro-batch), si la latence n'est pas critique (secondes/minutes).
Ce sont deux moteurs de "Stateful Streaming" (3.1) trĂšs performants.
| CritĂšre | Apache Flink (Framework) | Kafka Streams (Librairie) |
|---|---|---|
| Type | Framework (Moteur). Un systĂšme "autonome" (Cluster). | Librairie (BibliothĂšque) (.jar). (Pas de cluster). |
| ĂcosystĂšme | Agnostique (Se connecte Ă Kafka, Pulsar, S3...). | Kafka-Only. (Ne lit que depuis Kafka, n'Ă©crit que vers Kafka). |
| Déploiement | Complexe (flink run, YARN, K8s). | Simple (Embarqué dans votre App Java/Scala). |
| Ătat (State) | State Backends (RocksDB). | State Stores (RocksDB, In-Memory). |
| Usage | Traitement complexe (multi-sources), multi-équipes. | Enrichissement "temps réel" (Microservices) (Stream-Table Join). |
Flink (comme Spark) offre deux niveaux d'API (qui sont unifiées : on peut passer de l'une à l'autre).
1. DataStream API (Niveau Bas/Impératif)
(Ăquiv. RDD de Spark). C'est l'API "cĆur" de Flink.
Usage : Logique "complexe" (Stateful) (ex: ProcessFunction), oĂč vous avez besoin d'un contrĂŽle total (par Ă©vĂ©nement).
(Java/Scala)
DataStream<String> stream = ...
stream
.map(new MyMapFunction())
.keyBy(event -> event.userID)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new MyReduceFunction())2. Table API & Flink SQL (Niveau Haut/Déclaratif)
(Ăquiv. DataFrame/SQL de Spark).
Usage : ETL, Analytique (BI). Plus simple, optimisé par Flink.
Table API (Code) :
(Java/Scala)
Table table = tEnv.fromDataStream(stream);
Table result = table
.window(Tumble.over(lit(5).seconds()).on(...))
.groupBy(col("userID"), window)
.select(col("userID"), window.end(), col("value").sum());Flink SQL (SQL) :
(SQL) SELECT userID, TUMBLE_END(event_time, INTERVAL '5' SECOND), SUM(value) FROM my_stream GROUP BY TUMBLE(event_time, INTERVAL '5' SECOND), userID
C'est la fonctionnalité centrale de Flink.
Stateless (Sans Ătat) : (Simple) map(), filter(). L'Ă©vĂ©nement (A) est traitĂ©, puis oubliĂ©. (L'Ă©vĂ©nement B ne sait rien de A).
Stateful (Avec Ătat) : (Complexe) keyBy(), reduce(), COUNT(). Le traitement de l'Ă©vĂ©nement B dĂ©pend de l'Ă©vĂ©nement A.
Exemple (Compteur de Clics)
Flux d'entrée : (userA, clic), (userB, clic), (userA, clic)
Pour compter les clics (COUNT), Flink doit mémoriser (stocker) l'état (le "compte") de userA et userB.
Gestion de l'Ătat (Local)
Flink (contrairement à d'autres) stocke cet état localement sur le TaskManager (1.3) (en RAM ou sur Disque/RocksDB (3.3)).
Avantage : L'accĂšs Ă l'Ă©tat (State.read(), State.write()) est extrĂȘmement rapide (accĂšs RAM/SSD local), au lieu d'un aller-retour rĂ©seau (ex: Redis/DB externe).
Keyed State)L'Ă©tat (State) le plus courant est le Keyed State (Ătat ClĂ©). C'est un "Ă©tat" qui est "scopĂ©" (limitĂ©) Ă une ClĂ© (Key) (ex: UserID).
(Il est obligatoire d'utiliser .keyBy() avant d'utiliser un Keyed State).
Types d'Ătat (API DataStream)
(Utilisé dans un RichFlatMapFunction ou ProcessFunction)
| Type d'Ătat | Description | Usage (Exemple) |
|---|---|---|
ValueState | Stocke 1 seule valeur (du type T). | (COUNT) ValueState (Garder le compte actuel). |
MapState | Stocke un Dictionnaire (Map) (Clé->Valeur). | Garder des attributs (ex: MapState (attribut, valeur)). |
ListState | Stocke une Liste de valeurs (T). | (Buffering) Garder les 5 derniers événements. |
ReducingState | (Optimisé) Stocke 1 valeur (agrégée). | (SUM) Ajoute la valeur (via .add()) (plus efficace que ValueState). |
Le State Backend (Moteur de Stockage d'Ătat) dĂ©finit oĂč (physiquement) le TaskManager (1.3) stocke l'Ătat (3.1) (ex: le ValueState).
1. HashMapStateBackend (Mémoire)
(Défaut) Stocke l'état en RAM (Mémoire) (sur la JVM) du TaskManager.
(Checkpointing (4.1)) : Le Checkpoint (Snapshot) est écrit (asynchronement) vers un stockage distant (ex: S3).
- Avantage : ExtrĂȘmement rapide (accĂšs RAM).
- Inconvénient (Taille) : L'état (ex:
COUNTde 1 milliard d'utilisateurs) est limité par la RAM du TaskManager (ex: 32 Go). - Usage : Jobs avec un petit état (ex:
window(5 sec)), ou Dev/Test.
2. RocksDBStateBackend (Disque)
(Recommandé en Production) Stocke l'état dans une base RocksDB (une BDD Clé-Valeur "embarquée" (embed)) sur le disque (SSD) local du TaskManager.
(Checkpointing (4.1)) : Le Checkpoint (Snapshot RocksDB) est écrit (asynchronement) vers un stockage distant (ex: S3).
- Avantage (Taille) : L'Ă©tat peut ĂȘtre immense (TĂ©raoctets, limitĂ© par le disque SSD).
- Inconvénient : Plus lent que la RAM (I/O Disque), nécessite de (dé)sérialiser les objets.
- Usage : Production. Jobs avec un état large (ex:
window(24h)).
ProblĂšme : L'Ătat (3.1) est stockĂ© localement (RAM/Disque) sur le TaskManager. Si le TaskManager (VM) crash, l'Ătat (ex: le COUNT) est perdu.
Solution : Checkpointing (Snapshots Distants)
Le Checkpointing est le mécanisme de Flink (Tolérance aux Pannes) pour garantir Exactly-Once.
C'est un Snapshot (InstantanĂ©), coordonnĂ© (par le JobManager), de l'Ătat (State) de tous les opĂ©rateurs (Tasks) ET de la position (Offset) dans la source (ex: Kafka).
Flux (Checkpoint)
- (Toutes les 60 sec) Le JobManager (1.2) lance un Checkpoint (via "Barriers" (4.2)).
- (Sur les TaskManagers) Tous les opĂ©rateurs "snapshot" (sauvegardent) leur Ătat (State) (ex: RocksDB) vers un stockage distant durable (ex: S3, HDFS, ADLS).
- (Source) Le connecteur Kafka "snapshot" son Offset (ex: "Partition 1, Offset 5000").
- Les TaskManagers confirment (ACK) au JobManager.
- Le Checkpoint (ex:
#50) est "Complet".
Flux (Reprise sur Panne)
- (Crash) TaskManager 3 meurt.
- JobManager (HA) détecte le crash.
- JobManager redémarre le Job (entier) depuis le dernier Checkpoint complet (
#50). - Les nouveaux TaskManagers restaurent leur Ătat (State) depuis S3 (RocksDB).
- Le connecteur Kafka recommence Ă lire Ă l'Offset 5000.
- Résultat : Aucune donnée n'est perdue (Exactly-Once).
ProblĂšme : Comment Flink prend-il un "Snapshot" (instantanĂ©) cohĂ©rent d'un systĂšme distribuĂ© (des milliers de threads) sans arrĂȘter (pause) le flux de donnĂ©es ?
Solution : L'algorithme (modifié) de Chandy-Lamport, qui utilise des "Checkpoint Barriers" (BarriÚres).
Flux (Barrier)
- Le JobManager (Checkpoint Coordinator) injecte une "BarriĂšre" (ex:
Barrier #50) (un message spécial) dans les Sources (ex: Kafka). - Cette BarriÚre "descend" (circule) dans le DAG (flux) avec les données (ex: entre l'événement A et B).
- (Opérateur 1 - ex: Map) :
- Il traite les événements (A).
- Il reçoit
Barrier #50. - Il "s'aligne" (align) (si plusieurs inputs).
- Il "snapshot" (sauvegarde) son propre Ătat (State) vers S3 (asynchrone).
- Il transmet (forward)
Barrier #50à l'opérateur suivant. - Il continue de traiter les événements (B).
- (Opérateur 2 - ex: Reduce) : (Reçoit Barrier #50, snapshot son état, transmet...).
- (Sink - Destination) : Reçoit
Barrier #50et notifie le JobManager (ACK).
Résultat : Le flux n'est jamais stoppé ("low-overhead"). Le "Snapshot" (Checkpoint) est asynchrone et distribué.
Les deux sont des "Snapshots" de l'état (State) du Job, mais avec des objectifs différents.
| CritĂšre | Checkpoint (Point de ContrĂŽle) | Savepoint (Point de Sauvegarde) |
|---|---|---|
| Objectif | Tolérance aux Pannes (HA). (Récupération *automatique* aprÚs crash). | Maintenance (Opérationnel). (Mise à jour *manuelle* du Job). |
| Déclenchement | Automatique (par le JobManager, ex: toutes les 60s). | Manuel (par l'Admin, via flink savepoint ...). |
| Propriété | Possédé par Flink (le Job). | Possédé par l'Utilisateur (l'Admin). |
| Nettoyage | Automatique (Flink supprime les anciens). | Manuel (L'utilisateur doit le supprimer). |
Cas d'Usage (Savepoint)
ProblĂšme : Je dois mettre Ă jour le code (.jar) de mon Job Flink (en production) sans perdre l'Ătat (State) (le COUNT).
Workflow (Mise Ă jour)
# 1. (Admin) CrĂ©er un 'Savepoint' (Snapshot) du Job (en cours) $ flink savepoint [JobID] /path/to/savepoints/ # 2. (Admin) ArrĂȘter l'ancien Job (v1) $ flink cancel [JobID] # 3. (Admin) DĂ©ployer le nouveau code (v2.jar) $ flink run -s /path/to/savepoints/savepoint-1234 ... new-job-v2.jar # (Flink (v2) dĂ©marre et restaure # automatiquement son Ătat (State) # depuis le 'savepoint-1234')
Dans le streaming, "le temps" est un concept complexe. Flink gĂšre 3 notions du temps.
| Type de Temps | Description | Usage |
|---|---|---|
| Processing Time (Temps de Traitement) | L'heure de la machine (locale) qui exĂ©cute l'opĂ©rateur (le TaskManager). | Simple, mais incorrect (non-dĂ©terministe). (Ne pas utiliser pour les fenĂȘtres). |
| Ingestion Time (Temps d'Ingestion) | L'heure oĂč l'Ă©vĂ©nement entre dans Flink (Ă la Source). | Compromis (meilleur que Processing). |
| Event Time (Temps de l'ĂvĂ©nement) | (RecommandĂ©) L'heure oĂč l'Ă©vĂ©nement a Ă©tĂ© créé (produit). (Timestamp embarquĂ© dans l'Ă©vĂ©nement). | Correct (DĂ©terministe). Permet de gĂ©rer les donnĂ©es en retard (late) (via Watermarks). |
Exemple (Le ProblĂšme)
Un capteur IoT (iPhone) dans un ascenseur (pas de réseau) envoie un événement "Clic" (créé à 10:00:00).
L'iPhone sort de l'ascenseur (réseau) à 10:05:00.
Flink (TaskManager) le traite (reçoit) à 10:05:01.
- Event Time : 10:00:00 (Correct)
- Ingestion Time : 10:05:00
- Processing Time : 10:05:01
Si vous faites un window("10:00-10:01"), seul l'Event Time placera (correctement) cet Ă©vĂ©nement dans la bonne fenĂȘtre.
ProblĂšme : Si on utilise l'Event Time (5.1) et qu'on crĂ©e une fenĂȘtre window("10:00-10:05"). Quand Flink doit-il "fermer" (calculer) cette fenĂȘtre ? Ă 10:05:00 ?
Non. (Car l'événement de 10:04:00 (en retard) peut arriver à 10:05:10).
Solution : Watermarks (Filigranes)
Un Watermark (Filigrane) est un message (métadonnée) injecté dans le flux, qui dit : "Je (Flink) suis certain qu'il n'y aura plus d'événements (Event Time) avant ce timestamp."
Flux d'entrée :
(Event: 10:00:02)
(Event: 10:00:03)
(Event: 10:00:01) <-- (Désordre)
(Event: 10:00:05)
(Event: 10:00:04)
(Watermark: 10:00:00) <-- "Je suis sûr à 100%
qu'il n'y aura plus
d'événements < 10:00:00"
(Le JobManager "ferme" la fenĂȘtre 09:55-10:00)
(Event: 10:00:07)
(Watermark: 10:00:05) <-- "Je suis sûr..."
(Le JobManager "ferme" la fenĂȘtre 10:00-10:05)
Bounded Out-of-Orderness (Désordre Limité)
C'est la stratégie de génération de Watermark la plus courante (WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))).
Traduction : "Générer des Watermarks basés sur le timestamp max vu (maxTs), moins 10 secondes (maxTs - 10s)". (TolÚre 10 sec de retard).
Une "FenĂȘtre" (Window) est un "bucket" (seau) de temps (basĂ© sur Event Time ou Processing Time) utilisĂ© pour agrĂ©ger (SUM, COUNT) un flux infini.
1. Tumbling Window (FenĂȘtre Tombante)
Taille fixe, non-chevauchante.
Exemple : window(TumblingEventTimeWindows.of(Time.seconds(5)))
[-- FenĂȘtre 1 --] [-- FenĂȘtre 2 --] [-- FenĂȘtre 3 --] (10:00:00 - 10:00:05) (10:00:05 - 10:00:10) (10:00:10 - 10:00:15)
Usage : "Combien de clics toutes les 5 secondes ?".
2. Sliding Window (FenĂȘtre Glissante)
Taille fixe, chevauchante.
Exemple : window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) (Taille 10 sec, Glisse (slide) de 5 sec).
[---- FenĂȘtre 1 (10s) ----]
(10:00:00 - 10:00:10)
[---- FenĂȘtre 2 (10s) ----]
(10:00:05 - 10:00:15)
[---- FenĂȘtre 3 (10s) ----]
(10:00:10 - 10:00:20)
Usage : "Quelle est la moyenne mobile (sur 10s) des ventes, mise Ă jour toutes les 5s ?".
3. Session Window (FenĂȘtre de Session)
Taille variable, basée sur l'inactivité.
Exemple : window(EventTimeSessionWindows.withGap(Time.minutes(30)))
Flux :
- (User A, Clic 1) (10:00) -> Ouvre FenĂȘtre 1
- (User A, Clic 2) (10:10) -> (Gap < 30m) -> Ajoute Ă FenĂȘtre 1
- (User A, Clic 3) (10:50) -> (Gap > 30m) -> Ferme FenĂȘtre 1. Ouvre FenĂȘtre 2.
Usage : "Calculer la durée de session (activité) par utilisateur."
La DataStream API est l'API "cĆur" (impĂ©rative) de Flink. Une application Flink est un graphe (DAG) de DataStream.
(Java)
// 1. Obtenir l'environnement
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. Créer une Source (ex: Kafka)
DataStream<String> kafkaStream = env.fromSource(kafkaSource, ...);
// 3. Transformations (keyBy, window, etc.)
DataStream<WordCount> resultStream = kafkaStream
.flatMap(...)
.keyBy(...)
.window(...)
.reduce(...);
// 4. Définir un Sink (Destination)
resultStream.sinkTo(elasticsearchSink);
// 5. Exécuter le Job (Lazy)
env.execute("Mon Job Flink");
| Transformation | Description | Type (Shuffle) |
|---|---|---|
map | (1-to-1) Transforme 1 élément (ex: String -> Int). | Narrow (Pas de shuffle) |
filter | (1-to-0/1) Garde/Jette l'élément (return bool). | Narrow |
flatMap | (1-to-N) Transforme 1 élément en 0, 1, ou N éléments (ex: 1 ligne -> N mots). | Narrow |
keyBy | (Partitionnement) Regroupe le flux par "ClĂ©" (ex: userID). (Requis pour l'Ătat (State)). | Wide (Shuffle) |
window | (AprĂšs keyBy) Applique une fenĂȘtre (Tumbling, Sliding...). | Narrow |
reduce / aggregate | (AprĂšs window) AgrĂšge (SUM, COUNT) les Ă©lĂ©ments de la fenĂȘtre. | Narrow |
Les Connecteurs sont le "pont" (Source ou Sink) entre Flink et les systĂšmes externes.
- Source (Lecture) : Apache Kafka, Apache Pulsar, RabbitMQ, Kinesis, Fichiers (S3, HDFS).
- Sink (Ăcriture) : Apache Kafka, Elasticsearch, JDBC (SQL DBs), Cassandra, Redis, Fichiers (S3, HDFS).
C'est l'API déclarative (haut niveau) de Flink pour le streaming et le batch (similaire à Spark SQL).
Flink SQL
Permet d'écrire du SQL standard (Analytique) sur des flux (streams).
-- (Exemple: Agréger un flux Kafka (clicks)
-- par fenĂȘtre (Tumbling) de 10 secondes)
CREATE TABLE clicks (
`user_id` BIGINT,
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
...
);
SELECT
window_start,
window_end,
COUNT(DISTINCT user_id)
FROM
TABLE(
TUMBLE(TABLE clicks, DESCRIPTOR(event_time), INTERVAL '10' SECONDS)
)
GROUP BY
window_start, window_end;
Table API (Code)
L'équivalent "programmatique" (Java/Python) de Flink SQL. (Similaire à PySpark DataFrames).
(Java)
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// (Convertir le DataStream en Table)
Table clicksTable = tEnv.fromDataStream(stream, $("user_id"), $("event_time").rowtime());
// (Appliquer les transformations)
Table resultTable = clicksTable
.window(Tumble.over(lit(10).seconds()).on($("event_time")).as("w"))
.groupBy($("w"), $("user_id"))
.select($("user_id"), $("w").end(), $("user_id").count().as("cnt"));
FlinkCEP est une librairie (module) de Flink pour la DĂ©tection d'ĂvĂ©nements Complexes (Complex Event Processing).
Objectif : Détecter des séquences (patterns) dans un flux d'événements.
Exemple (Détection de Fraude)
Pattern (ModĂšle) : "Je cherche un utilisateur (keyBy(userID)) qui fait :"
- (Ătape A) Une "Petite Transaction" (
amount < 10) - suivie par (
.next()) - (Ătape B) Une "Grosse Transaction" (
amount > 1000) - dans les (
.within()) 10 secondes.
(Java)
Pattern<Event, ?> fraudPattern = Pattern.<Event>begin("etape_A")
.where(evt -> evt.getAmount() < 10)
.next("etape_B")
.where(evt -> evt.getAmount() > 1000)
.within(Time.seconds(10));
PatternStream<Event> patternStream = CEP.pattern(inputStream.keyBy(...), fraudPattern);
DataStream<Alert> alerts = patternStream.select(
(Map<String, List<Event>> pattern) -> {
// (Le Pattern a matché !)
return new Alert("Fraude Détectée !");
}
);Site Officiel (Apache)
- Site Web :
https://flink.apache.org/ - Documentation (Stable) :
https://flink.apache.org/docs/stable/ - API (JavaDoc) :
https://flink.apache.org/docs/stable/api/java/
Communauté & Code
- GitHub (Code Source) :
https://github.com/apache/flink - Stack Overflow (Tag) :
https://stackoverflow.com/questions/tagged/apache-flink
Fournisseurs (Flink Managé)
- Amazon Kinesis Data Analytics for Apache Flink (AWS)
- Azure HDInsight (Microsoft)
- Ververica Platform (Par les créateurs originaux de Flink)
