đ Kafka Streams â Stream Processing Natif
Guide complet IDEO-Lab sur la librairie de streaming (KStream, KTable, Dualité).
Concept : Librairie (Client)
Librairie Java/Scala (pas un cluster). Embarquée.
Kafka Streams Librairie JavaArchitecture (Topologie)
Topologie (DAG), Processeurs (Source, Sink), TĂąches.
Topologie Processeurvs. Flink / Spark Streaming
Librairie (Streams) vs Moteur (Flink/Spark).
Flink Spark StreamingConcept : Dualité Stream-Table
Le concept fondamental (Changements vs Ătat).
Dualité Stream-TableAPI : KStream (Flux)
Un "Stream" (INSERT). (Historique, faits immuables).
API : KTable (Tableau)
Une "Table" (UPSERT). (Ătat actuel agrĂ©gĂ©).
Stateless : map / flatMap
Transformation 1:1 (map) ou 1:N (flatMap).
Stateless : filter / branch
Filtrer (filter) ou Router (branch).
Stateless : Sinks (to / foreach)
Ăcrire (to) ou Agir (foreach).
Stateful : groupByKey / groupBy
Regrouper (Pré-requis pour l'agrégation).
groupByKey groupByStateful : count / reduce
Agréger (Compter) un KStream -> KTable.
Stateful : aggregate
L'agrégation la plus flexible (Initializer, Adder, Subtractor).
aggregateConcept : State Store (Ătat)
Le "stockage" local (RAM, RocksDB) de l'agrégation.
State Store RocksDBHA : State Store & Changelog
Tolérance aux pannes (Topic "changelog" Kafka).
Changelog Topic HAQueryable State (Interrogation)
Interroger l'état (State Store) via API REST (IPC).
Queryable State API RESTJoin : KStream-KStream
Jointure (fenĂȘtrĂ©e obligatoire) de deux flux.
KStream-KStream Windowed JoinJoin : KStream-KTable
Le cas "classique" (Enrichissement de flux).
KStream-KTable EnrichissementTemps & FenĂȘtrage
Event Time (Défaut), Watermarks, Tumbling, Sliding...
Event Time WindowingĂcosystĂšme : ksqlDB
L'abstraction SQL par-dessus Kafka Streams.
ksqlDB SQLGarantie : Exactly-Once
Garantie (EOS) via Transactions Kafka.
Exactly-Once (EOS) TransactionsCheat-sheet (Java API)
Exemple (Builder, KStream, KTable).
Cheatsheet JavaQu'est-ce que Kafka Streams ?
Kafka Streams est une librairie (bibliothĂšque) client (Java/Scala) pour construire des applications et des microservices de stream processing (traitement de flux).
Ce que ce n'est PAS (vs Flink/Spark)
Ce n'est PAS un "moteur" ou un "cluster" (comme Flink ou Spark). C'est une simple librairie (.jar) que vous ajoutez Ă votre application (Java/Scala).
Il n'y a pas de "Cluster Streams". L'application que vous écrivez *est* l'application de streaming. La scalabilité est gérée par Kafka (le broker) (en lançant plusieurs instances de votre application).
Caractéristiques Clés
- Léger (Librairie) : Pas de cluster à gérer. S'embarque (embed) dans votre application.
- Scalable (via Kafka) : La scalabilité (parallélisme) est gérée par les Partitions Kafka et le Consumer Group. (Pour scaler, lancez juste une autre instance de votre
.jar). - Kafka-Only : (La grosse limitation) Ne lit que depuis Kafka, et n'écrit que vers Kafka. (Flink/Spark sont agnostiques).
- Stateful (Ătat) : Supporte le streaming "stateful" (comptages, agrĂ©gations) via les State Stores (5.1) (RocksDB).
- Exactly-Once (EOS) : Fournit une sémantique "Exactly-Once" (7.2).
1. Topologie (Topology) - Le Plan Logique (DAG)
Une "Topologie" (Topology) est le DAG (Graphe) logique de votre application Streams. C'est l'équivalent du Pipeline (Beam) ou du JobGraph (Flink).
Il est composé de Processeurs (Processor Nodes) :
- Source Processor : (DĂ©but) Un nĆud qui lit (consomme) un topic Kafka.
- Stream Processor : (Milieu) Un nĆud qui reçoit 1 Ă©vĂ©nement, le transforme (ex:
map,filter), et l'envoie (forward) au nĆud suivant. - Sink Processor : (Fin) Un nĆud qui reçoit un Ă©vĂ©nement et l'Ă©crit dans un topic Kafka.
(Source: 'topic-A') -> (Processor: 'Filter') -> (Processor: 'Map') -> (Sink: 'topic-B')
2. TĂąches & Threads - Le Plan Physique
Kafka Streams (la librairie) parallélise la Topologie (logique) en utilisant les Partitions Kafka.
- Tùche (Task) : Une "Tùche" est l'unité de parallélisme. C'est une copie (instance) de votre Topologie, assignée à 1 Partition Kafka.
- Thread : Une instance de votre application (le
.jar) peut avoir plusieurs Threads (ex: 4).
Exemple (Scalabilité)
Scénario : Topic Kafka "Input" (10 Partitions). Vous lancez 2 Instances de votre App (.jar) (Instance A, Instance B), chacune avec 5 Threads.
Flux (Kafka Consumer Group) :
- Kafka (Broker) voit 2 membres (A, B) dans le "Consumer Group".
- Kafka assigne 5 Partitions (0-4) Ă l'Instance A.
- Kafka assigne 5 Partitions (5-9) Ă l'Instance B.
- (Instance A) : Kafka Streams crée 5 Tùches (1 par partition) et les distribue sur ses 5 Threads.
- (Instance B) : Kafka Streams crée 5 Tùches...
Résultat : Le "Cluster" (vos 2 .jar) traite les 10 partitions en parallÚle.
| CritĂšre | Kafka Streams (Librairie) | Apache Flink (Framework) | Apache Spark (Streaming) |
|---|---|---|---|
| Type | Librairie (BibliothĂšque) (Java/Scala). | Framework (Moteur) (Cluster). | Framework (Moteur) (Cluster). |
| Usage | Microservices, Applications Temps Réel. | Traitement de flux (Stateful) complexe. | Traitement Batch & Micro-batch. |
| Déploiement | Embarqué (dans votre .jar, K8s, VM). | Complexe (Cluster Flink, YARN, K8s). | Complexe (Cluster Spark, YARN, K8s). |
| Dépendances | Kafka-Only (Source & Sink = Kafka). | Agnostique (Connecteurs : Kafka, Pulsar, S3...). | Agnostique (Connecteurs : Kafka, S3, Delta...). |
| Latence | TrĂšs Faible (millisecondes). | TrĂšs Faible (millisecondes). | ĂlevĂ©e (Micro-batch, secondes). |
| API | KStream/KTable (Java/Scala). | DataStream, Table API (Java/Scala/SQL). | DataFrame, DStream (Scala/Java/Python). |
C'est le concept théorique fondamental de Kafka Streams (et ksqlDB).
Un Stream (Flux) et une Table (Tableau) sont deux vues de la mĂȘme information (dualitĂ©).
Stream (Flux, ĂvĂ©nements)
Un "log" (historique) immuable (INSERT) de "ce qui s'est passé".
(Clé: "Alice", Val: "A cliqué") (Clé: "Bob", Val: "A payé") (Clé: "Alice", Val: "A déconnecté") (Clé: "Bob", Val: "A remboursé")
API : KStream (2.2)
Table (Tableau, Ătat)
Une "vue" (snapshot) agrégée de l'état actuel (UPSERT).
(Clé: "Alice", Val: "A déconnecté") (Clé: "Bob", Val: "A remboursé")
API : KTable (2.3)
La Dualité :
Un Stream peut ĂȘtre agrĂ©gĂ© (count) pour crĂ©er une Table (l'Ă©tat actuel).
Une Table peut ĂȘtre vue comme un Stream (son "changelog" / flux de changements).
KStream (Le Flux)Un KStream représente un flux (stream) d'enregistrements (records) infini et immuable. C'est la vue "Stream" (2.1) de la dualité.
Sémantique (INSERT)
Chaque nouvel enregistrement (ex: (Clé: "A", Val: 1)) est un fait indépendant (un INSERT). Il n'y a pas de relation (de mise à jour) avec les enregistrements précédents ((Clé: "A", Val: 0)).
Usage
UtilisĂ© pour les "Faits" (Facts) : Logs, Clics, Transactions, ĂvĂ©nements IoT. (Tout ce qui est un "historique").
(Java)
// (Crée un KStream (flux) depuis le topic "input-topic")
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> myStream = builder.stream("input-topic");
// (Transformer (stateless))
KStream<String, String> upperStream = myStream.mapValues(
(value) -> value.toUpperCase()
);
KTable (Le Tableau)Une KTable représente un snapshot (état) d'un flux. C'est la vue "Table" (2.1) de la dualité.
Sémantique (UPSERT)
Une KTable est basée sur la Clé (Key). Chaque nouvel enregistrement (ex: (Clé: "A", Val: 2)) est une mise à jour (UPSERT) de la valeur précédente pour cette clé.
Flux d'entrée (KStream) :
("userA", "Paris")
("userB", "Lyon")
("userA", "Lille") <-- (Mise Ă jour)
Résultat (KTable) :
(Clé: "userA", Val: "Lille")
(Clé: "userB", Val: "Lyon")
Usage
UtilisĂ© pour les "Ătats" (States) : DonnĂ©es de rĂ©fĂ©rence (ex: table "Utilisateurs"), AgrĂ©gations (ex: COUNT).
(Java)
// (Crée une KTable (état) depuis le topic "users-topic")
KTable<String, String> usersTable = builder.table("users-topic");
map, flatMap, selectKeyTransformations (1:1 ou 1:N) qui ne nécessitent pas d'état (State).
(Java API)
KStream<String, String> source = ...;
// --- mapValues (1:1, Valeur) ---
// (Transforme la Valeur, garde la Clé)
KStream<String, Integer> lengths = source.mapValues(
(value) -> value.length()
);
// --- map (1:1, Clé+Valeur) ---
// (Transforme Clé et Valeur -> Nouveau KeyValuePair)
KStream<String, String> mapped = source.map(
(key, value) -> new KeyValuePair<>(key.toUpperCase(), value + "!")
);
// --- flatMap (1:N, Clé+Valeur) ---
// (Ex: "Word Count")
KStream<String, String> words = source.flatMap(
(key, value) -> { // (value = "le chat")
List<KeyValuePair<String, String>> result = new ArrayList<>();
for (String word : value.split(" ")) {
result.add(new KeyValuePair<>(word, "1")); // (("le", 1), ("chat", 1))
}
return result;
}
);
// --- selectKey (Changer la Clé) ---
// (Crucial avant un 'groupByKey' ou 'join')
KStream<NewKey, String> rekeyed = source.selectKey(
(key, value) -> value.split(",")[0] // (Nouvelle clé)
);
filter & branchfilter (Filtrer)
Garde (true) ou jette (false) les enregistrements.
(Java)
KStream<String, String> source = ...;
// (Garder les messages dont la valeur > 100)
KStream<String, String> filtered = source.filter(
(key, value) -> Integer.parseInt(value) > 100
);
branch (Router / Diviser)
Divise 1 KStream en N KStream, basé sur des prédicats (conditions).
(Java)
KStream<String, String> source = ...;
// (Diviser le flux en 'erreurs' et 'autres')
Map<String, KStream<String, String>> branches = source.split(Named.as("B-"))
.branch(
(key, value) -> value.startsWith("ERROR"), // (Prédicat 1)
Branched.as("Erreurs") // (Nom de la branche 1)
)
.defaultBranch( // (Tout le reste)
Branched.as("Autres")
);
// (Récupérer les flux)
KStream<String, String> errorsStream = branches.get("B-Erreurs");
KStream<String, String> othersStream = branches.get("B-Autres");
to, foreach)Les "Sinks" (Destinations) sont les opérations terminales (la fin du DAG).
to (Ăcrire dans Kafka)
La destination la plus courante : écrire le KStream (résultat) dans un autre topic Kafka.
(Java)
KStream<String, String> resultStream = ...;
resultStream.to("output-topic");
// (Avec SerDes (Serialiseur/Désérialiseur) explicites)
resultStream.to("output-topic",
Produced.with(Serdes.String(), Serdes.Long())
);
foreach (Action externe)
Opération terminale pour "faire quelque chose" (Effet de bord) avec chaque enregistrement (ex: appeler une API externe, écrire dans une BDD).
Attention : foreach n'offre pas de garantie "Exactly-Once" (EOS) (7.2) pour l'Ă©criture externe. Si le job crash, l'API externe peut ĂȘtre appelĂ©e 2 fois (At-Least-Once).
(Java)
resultStream.foreach(
(key, value) -> {
// (Appel API externe, écriture BDD...)
myApiClient.send(key, value);
}
);
groupByKey & groupByC'est la premiĂšre Ă©tape d'une agrĂ©gation (count, reduce). Elle re-partitionne (Shuffle) le flux (KStream) pour que tous les enregistrements d'une mĂȘme clĂ© (ex: "userA") arrivent sur la mĂȘme TĂąche/Thread (1.2).
groupByKey()
Utilisé si les données sont déjà "keyées" (clées) correctement. (Ex: un flux (UserID, Clic)).
(Java) KStream<String, String> stream = ...; // (Clé=UserID) // (Groupe par la clé existante) KGroupedStream<String, String> grouped = stream.groupByKey();
groupBy()
Utilisé si vous devez changer la clé (similaire à selectKey + groupByKey).
(Java)
KStream<String, String> stream = ...; // (Clé=null, Val="Alice,Paris")
// (Grouper par la nouvelle clé (Ville))
KGroupedStream<String, String> grouped = stream.groupBy(
(key, value) -> value.split(",")[1] // (Clé = "Paris")
);
count & reduceLes opérations d'agrégation (sur un KGroupedStream) (4.1).
count() (Compter)
L'agrégation la plus simple. Transforme un KStream -> KTable.
(Java)
KStream<String, String> stream = ...;
// (Compter le nombre d'événements par Clé (ex: UserID))
KTable<String, Long> countsTable = stream
.groupByKey()
.count(); // (Retourne une KTable (état))
// Flux (Input KStream) : ("A", "clic"), ("B", "clic"), ("A", "clic")
// Résultat (KTable) :
// ("A", 1)
// ("B", 1)
// ("A", 2)
reduce() (Réduire)
Combine les Ă©vĂ©nements (de mĂȘme clĂ©) en 1 seule valeur (du mĂȘme type).
(Java)
KStream<String, Long> stream = ...; // (Clé=Produit, Val=Vente)
// (Sommer les ventes par Produit)
KTable<String, Long> sumsTable = stream
.groupByKey()
.reduce(
(aggValue, newValue) -> aggValue + newValue // (previous + current)
);
// Flux (Input KStream) : ("A", 10), ("B", 5), ("A", 20)
// Résultat (KTable) :
// ("A", 10)
// ("B", 5)
// ("A", 30) (10 + 20)
aggregateaggregate est l'agrégation la plus flexible (et complexe). Elle est nécessaire si le type de retour est différent du type d'entrée.
Exemple : Calculer la "moyenne" (Input=Long, Output=Double).
Nécessite 3 fonctions :
- Initializer : Crée l'objet d'agrégation (ex:
(Count: 0, Sum: 0)). - Adder (Ajouteur) : (
(agg, new) -> agg) Ajoute une nouvelle valeur (ex:(Count+1, Sum+new)). - Subtractor (Soustracteur) : (Pour KTables) GÚre la "rétraction" (retrait) d'une ancienne valeur.
(Java)
// (Input: KStream<String, Long> )
// (Output: KTable<String, Double> (la moyenne))
// (L'objet d'état)
class AverageState { long count = 0; long sum = 0; double avg = 0.0; }
KTable<String, Double> avgTable = stream
.groupByKey()
.aggregate(
// 1. Initializer
() -> new AverageState(),
// 2. Adder (agg, newValue, agg)
(key, newValue, aggState) -> {
aggState.count++;
aggState.sum += newValue;
aggState.avg = (double) aggState.sum / aggState.count;
return aggState;
},
// (Définit le 'State Store' (5.1))
Materialized.as("store-name").with(...)
)
// (Map finale pour ne garder que la moyenne)
.mapValues(aggState -> aggState.avg);
ProblĂšme : Quand vous faites un .count() (4.2), oĂč Kafka Streams stocke-t-il le "compte" (ex: ("A", 30)) ?
Solution : State Store (Stockage d'Ătat)
Un State Store est une base de données locale (embarquée), gérée par l'instance Kafka Streams (le .jar).
Par défaut, Kafka Streams utilise RocksDB (une BDD Clé-Valeur (K/V) embarquée, écrite en C++, optimisée pour le SSD).
Fonctionnement (Agrégation)
- (Input)
("A", 10) - (
reduce) Kafka Streams lit (GET) la clé "A" dans RocksDB (local) ->null. - (
reduce) Calcule0 + 10. - (
reduce) Ăcrit (PUT)("A", 10)dans RocksDB (local). - (Input)
("A", 20) - (
reduce) Kafka Streams lit (GET) "A" dans RocksDB ->10. - (
reduce) Calcule10 + 20. - (
reduce) Ăcrit (PUT)("A", 30)dans RocksDB.
Avantage : AccÚs à l'état trÚs rapide (RAM/SSD local), pas de latence réseau (vs Redis/DB externe).
ProblÚme : Le State Store (RocksDB) (5.1) est local (sur le disque/SSD de l'instance .jar). Si l'instance (ou le Pod K8s) crash (meurt), le RocksDB local (et tout l'état, ex: le COUNT) est perdu.
Solution (Tolérance aux Pannes) : Le "Changelog Topic"
Kafka Streams utilise Kafka (le Broker) pour assurer la durabilité (HA) de l'état.
Quand vous créez une agrégation (ex: .count()), Kafka Streams crée automatiquement (sur le Broker) :
- Le State Store (RocksDB) (local) (pour les lectures/écritures rapides).
- Un Topic Kafka "Changelog" (distant, répliqué 3x).
Flux (Double Ăcriture)
Input : ("A", 20) -> (Aggregator)
â
âââș 1. Ăcrit (PUT) -> [ RocksDB (Local) ]
â (Ătat: ("A", 30))
â
âââș 2. Ăcrit (PUSH) -> [ Topic "Changelog" (Kafka Broker) ]
(Log: ("A", 30))
Flux (Reprise sur Panne)
- Instance A (
.jar) crash. (RocksDB local est perdu). - Kafka (Broker) ré-équilibre les partitions. Instance B (
.jar) (un autre Pod) récupÚre la Tùche/Partition 1. - Instance B lit (consomme) l'intégralité du Topic "Changelog" (trÚs rapidement).
- Instance B reconstruit le State Store (RocksDB) local (ex: ("A", 30)).
- Instance B reprend le traitement (oĂč A s'est arrĂȘtĂ©).
ProblÚme : L'état (ex: le COUNT des clics) est "piégé" à l'intérieur de l'application Streams (dans RocksDB). Comment une application externe (ex: une API REST) peut-elle lire (GET) cet état (compte) actuel ?
Solution : Queryable State (Ătat Interrogeable)
Kafka Streams permet d'exposer (en lecture seule) le State Store (RocksDB) local de l'application (via IPC ou API REST).
Exemple (API REST pour lire l'état)
(Client HTTP) -> (GET /count/userA)
â
âŒ
[ Votre App (ex: Spring Boot / Ktor) ]
â (ContrĂŽleur REST)
â
ââ (Instance 1) -> [ KafkaStreams (Instance) ]
â â
â â (AccĂšs local)
â â
â âŒ
â [ State Store (RocksDB) ]
â (Contient ClĂ©s A-M)
â
ââ (Instance 2) -> [ KafkaStreams (Instance) ]
â
âŒ
[ State Store (RocksDB) ]
(Contient Clés N-Z)
Flux (RequĂȘte /count/userA) :
- L'API REST (Instance 1) reçoit la requĂȘte.
- Elle interroge son State Store local (RocksDB) : "As-tu la clé 'userA' ?".
- (Cas 1 : OK) Oui. Elle retourne la valeur.
- (Cas 2 : Non) (La clĂ© "userA" est sur l'Instance 2). Kafka Streams (via IPC) redirige (redirect) la requĂȘte vers l'Instance 2 (qui possĂšde cette clĂ©) et renvoie le rĂ©sultat.
Joindre (JOIN) deux flux (KStream).
ProblÚme : Les flux sont "infinis". Si (Clic_A, 10:00) arrive sur Flux 1, combien de temps doit-il "attendre" (en état) l'arrivée de (Impression_A, 10:01) sur Flux 2 ?
Solution : Jointure FenĂȘtrĂ©e (Windowed Join)
Une jointure KStream-KStream est obligatoirement fenĂȘtrĂ©e (JoinWindows).
(Java)
KStream<String, String> streamClicks = ...
KStream<String, String> streamImpressions = ...
// (DĂ©finir la fenĂȘtre de jointure :
// ex: 5 minutes d'écart max entre les 2 événements)
JoinWindows window = JoinWindows.of(Duration.ofMinutes(5));
KStream<String, String> joinedStream = streamClicks.join(
streamImpressions,
(clickValue, impressionValue) -> // (Le 'ValueJoiner')
"Clic: " + clickValue + ", Impression: " + impressionValue,
window
);
C'est le cas d'usage de jointure le plus courant : l'enrichissement de flux.
Objectif : Enrichir un Flux (Stream) (ex: "Clics") avec des Données (Table) (ex: "Profils Utilisateur").
Flux
KStream (Clics) : (userA, "clic_produit_123")
KTable (Users) : (userA, "Paris")
(Java)
KStream<String, String> streamClics = ... // (Clé=userID)
KTable<String, String> tableUsers = ... // (Clé=userID)
// (Pas besoin de fenĂȘtre !
// La KTable représente l'état 'actuel')
KStream<String, String> enrichedStream = streamClics.join(
tableUsers,
(clicValue, userValue) -> // (ValueJoiner)
clicValue + " (Ville: " + userValue + ")"
);
Résultat (KStream)
("userA", "clic_produit_123 (Ville: Paris)")
SĂ©mantique : C'est un "Lookup" (recherche) d'Ă©tat (Stateful). Le flux (KStream) "regarde" (lookup) l'Ă©tat actuel de la table (KTable) au moment oĂč l'Ă©vĂ©nement de flux arrive.
Kafka Streams (comme Flink) est un moteur "Event Time" (Temps de l'ĂvĂ©nement) par dĂ©faut.
Temps (TimestampExtractor)
- Event Time (Défaut) : (Recommandé) Utilise le timestamp embarqué dans l'enregistrement Kafka. Permet de gérer les données en retard (Lateness).
- Processing Time : (Alternative) Utilise l'horloge (locale) de la machine (Thread) qui traite l'événement. (Rapide, mais non-déterministe).
Types de FenĂȘtres (Windows)
(Similaire Ă Flink 5.3)
- Tumbling (Tombante) : Taille fixe, non-chevauchante (ex:
TumblingWindows.of(Duration.ofMinutes(5))). - Sliding (Glissante) : Taille fixe, chevauchante (ex:
SlidingWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1))). - Session : Taille variable, basée sur l'inactivité (ex:
SessionWindows.withInactivityGap(Duration.ofMinutes(30))). - Hopping (Sautante) : (Identique Ă Sliding).
ProblĂšme : Kafka Streams (API Java/Scala) est complexe pour les Analystes (qui ne connaissent que le SQL).
Solution : ksqlDB
ksqlDB (par Confluent) est une base de données de streaming (event-streaming database). C'est une abstraction SQL (un "wrapper") par-dessus Kafka Streams.
ksqlDB exécute un serveur (cluster) qui permet à l'utilisateur d'écrire du SQL. ksqlDB traduit ce SQL en une application Kafka Streams (Topologie) et l'exécute.
Exemple (SQL)
-- 1. (ksqlDB) DĂ©finir un STREAM (KStream) sur un topic CREATE STREAM clics ( user_id BIGINT KEY, url VARCHAR ) WITH ( KAFKA_TOPIC='clics_topic', VALUE_FORMAT='JSON' ); -- 2. (ksqlDB) DĂ©finir une TABLE (KTable) (agrĂ©gation) CREATE TABLE clics_par_user AS SELECT user_id, COUNT(*) AS total_clics FROM clics GROUP BY user_id EMIT CHANGES; -- (Ămet les changements de la KTable)
Kafka Streams (depuis Kafka 0.11) fournit une sémantique "Exactly-Once" (Traitement unique garanti).
Le ProblĂšme (At-Least-Once)
Par dĂ©faut (sans EOS), si un job (Read -> Process -> Write) crash aprĂšs le "Write" (Ăcriture) mais avant le "Commit" (de l'offset de lecture), il va retraiter (doublon) le message au redĂ©marrage.
Solution (Kafka Transactions)
En activant processing.guarantee="exactly_once_v2" (ou beta) :
Kafka Streams utilise les Transactions Kafka. Le "Commit" (de l'offset de lecture) ET le "Write" (de l'écriture du résultat) sont envoyés au Broker Kafka comme une seule transaction (atomique).
-- (Flux Transactionnel) 1. BeginTransaction 2. (Ăcrire RĂ©sultat -> Topic B) 3. (Ăcrire Offset (lu) -> Topic A) 4. CommitTransaction -- (Si crash avant "Commit", -- la transaction est "Abort" (annulĂ©e). -- Le "RĂ©sultat" (Ătape 2) n'est -- jamais visible par les consommateurs.)
(Java 8+)
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
// 1. Propriétés (Config)
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mon-app-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// (Pour EOS)
// props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once_v2");
// 2. Définir la Topologie (Builder)
StreamsBuilder builder = new StreamsBuilder();
// 3. Source (KStream)
KStream<String, String> sourceStream = builder.stream("input-topic");
// 4. Transformations (Stateless)
KStream<String, String> filteredStream = sourceStream
.filter((key, value) -> value.length() > 5);
KStream<String, String> wordsStream = filteredStream
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")));
// 5. Transformation (Stateful - Aggregation)
KTable<String, Long> wordCounts = wordsStream
.groupBy((key, word) -> word) // (Re-key)
.count(Materialized.as("word-counts-store")); // (Crée KTable)
// 6. Sink (Destination)
wordCounts.toStream().to("output-topic",
Produced.with(Serdes.String(), Serdes.Long())
);
// 7. Construire & Démarrer
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start(); // (Lance les threads)
// (Gestion de l'arrĂȘt)
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
