Project Oxygen & Ideo-LabIDEO LAB Dashboard 2026

🌊 Kafka Streams – Stream Processing Natif

Guide complet IDEO-Lab sur la librairie de streaming (KStream, KTable, Dualité).

1.1

Concept : Librairie (Client)

Librairie Java/Scala (pas un cluster). Embarquée.

Kafka Streams Librairie Java
1.2

Architecture (Topologie)

Topologie (DAG), Processeurs (Source, Sink), TĂąches.

Topologie Processeur
1.3

vs. Flink / Spark Streaming

Librairie (Streams) vs Moteur (Flink/Spark).

Flink Spark Streaming
2.1

Concept : Dualité Stream-Table

Le concept fondamental (Changements vs État).

Dualité Stream-Table
2.2

API : KStream (Flux)

Un "Stream" (INSERT). (Historique, faits immuables).

KStream Stream
2.3

API : KTable (Tableau)

Une "Table" (UPSERT). (État actuel agrĂ©gĂ©).

KTable Table
3.1

Stateless : map / flatMap

Transformation 1:1 (map) ou 1:N (flatMap).

map flatMap
3.2

Stateless : filter / branch

Filtrer (filter) ou Router (branch).

filter branch
3.3

Stateless : Sinks (to / foreach)

Écrire (to) ou Agir (foreach).

to foreach
4.1

Stateful : groupByKey / groupBy

Regrouper (Pré-requis pour l'agrégation).

groupByKey groupBy
4.2

Stateful : count / reduce

Agréger (Compter) un KStream -> KTable.

count reduce
4.3

Stateful : aggregate

L'agrégation la plus flexible (Initializer, Adder, Subtractor).

aggregate
5.1

Concept : State Store (État)

Le "stockage" local (RAM, RocksDB) de l'agrégation.

State Store RocksDB
5.2

HA : State Store & Changelog

Tolérance aux pannes (Topic "changelog" Kafka).

Changelog Topic HA
5.3

Queryable State (Interrogation)

Interroger l'état (State Store) via API REST (IPC).

Queryable State API REST
6.1

Join : KStream-KStream

Jointure (fenĂȘtrĂ©e obligatoire) de deux flux.

KStream-KStream Windowed Join
6.2

Join : KStream-KTable

Le cas "classique" (Enrichissement de flux).

KStream-KTable Enrichissement
6.3

Temps & FenĂȘtrage

Event Time (Défaut), Watermarks, Tumbling, Sliding...

Event Time Windowing
7.1

Écosystùme : ksqlDB

L'abstraction SQL par-dessus Kafka Streams.

ksqlDB SQL
7.2

Garantie : Exactly-Once

Garantie (EOS) via Transactions Kafka.

Exactly-Once (EOS) Transactions
7.3

Cheat-sheet (Java API)

Exemple (Builder, KStream, KTable).

Cheatsheet Java
1.1 Concept : Librairie (Client) de Stream Processing
Qu'est-ce que Kafka Streams ?

Kafka Streams est une librairie (bibliothĂšque) client (Java/Scala) pour construire des applications et des microservices de stream processing (traitement de flux).

Ce que ce n'est PAS (vs Flink/Spark)

Ce n'est PAS un "moteur" ou un "cluster" (comme Flink ou Spark). C'est une simple librairie (.jar) que vous ajoutez Ă  votre application (Java/Scala).

Il n'y a pas de "Cluster Streams". L'application que vous écrivez *est* l'application de streaming. La scalabilité est gérée par Kafka (le broker) (en lançant plusieurs instances de votre application).

Caractéristiques Clés
  • LĂ©ger (Librairie) : Pas de cluster Ă  gĂ©rer. S'embarque (embed) dans votre application.
  • Scalable (via Kafka) : La scalabilitĂ© (parallĂ©lisme) est gĂ©rĂ©e par les Partitions Kafka et le Consumer Group. (Pour scaler, lancez juste une autre instance de votre .jar).
  • Kafka-Only : (La grosse limitation) Ne lit que depuis Kafka, et n'Ă©crit que vers Kafka. (Flink/Spark sont agnostiques).
  • Stateful (État) : Supporte le streaming "stateful" (comptages, agrĂ©gations) via les State Stores (5.1) (RocksDB).
  • Exactly-Once (EOS) : Fournit une sĂ©mantique "Exactly-Once" (7.2).
1.2 Architecture (Topologie, Processeurs, TĂąches)
1. Topologie (Topology) - Le Plan Logique (DAG)

Une "Topologie" (Topology) est le DAG (Graphe) logique de votre application Streams. C'est l'équivalent du Pipeline (Beam) ou du JobGraph (Flink).

Il est composé de Processeurs (Processor Nodes) :

  • Source Processor : (DĂ©but) Un nƓud qui lit (consomme) un topic Kafka.
  • Stream Processor : (Milieu) Un nƓud qui reçoit 1 Ă©vĂ©nement, le transforme (ex: map, filter), et l'envoie (forward) au nƓud suivant.
  • Sink Processor : (Fin) Un nƓud qui reçoit un Ă©vĂ©nement et l'Ă©crit dans un topic Kafka.
(Source: 'topic-A') -> (Processor: 'Filter') -> (Processor: 'Map') -> (Sink: 'topic-B')
2. TĂąches & Threads - Le Plan Physique

Kafka Streams (la librairie) parallélise la Topologie (logique) en utilisant les Partitions Kafka.

  • TĂąche (Task) : Une "TĂąche" est l'unitĂ© de parallĂ©lisme. C'est une copie (instance) de votre Topologie, assignĂ©e Ă  1 Partition Kafka.
  • Thread : Une instance de votre application (le .jar) peut avoir plusieurs Threads (ex: 4).
Exemple (Scalabilité)

Scénario : Topic Kafka "Input" (10 Partitions). Vous lancez 2 Instances de votre App (.jar) (Instance A, Instance B), chacune avec 5 Threads.

Flux (Kafka Consumer Group) :

  1. Kafka (Broker) voit 2 membres (A, B) dans le "Consumer Group".
  2. Kafka assigne 5 Partitions (0-4) Ă  l'Instance A.
  3. Kafka assigne 5 Partitions (5-9) Ă  l'Instance B.
  4. (Instance A) : Kafka Streams crée 5 Tùches (1 par partition) et les distribue sur ses 5 Threads.
  5. (Instance B) : Kafka Streams crée 5 Tùches...

Résultat : Le "Cluster" (vos 2 .jar) traite les 10 partitions en parallÚle.

2.1 Concept : Dualité Stream-Table

C'est le concept théorique fondamental de Kafka Streams (et ksqlDB).

Un Stream (Flux) et une Table (Tableau) sont deux vues de la mĂȘme information (dualitĂ©).

Stream (Flux, ÉvĂ©nements)

Un "log" (historique) immuable (INSERT) de "ce qui s'est passé".

(Clé: "Alice", Val: "A cliqué")
(Clé: "Bob", Val: "A payé")
(Clé: "Alice", Val: "A déconnecté")
(Clé: "Bob", Val: "A remboursé")

API : KStream (2.2)

Table (Tableau, État)

Une "vue" (snapshot) agrégée de l'état actuel (UPSERT).

(Clé: "Alice", Val: "A déconnecté")
(Clé: "Bob", Val: "A remboursé")

API : KTable (2.3)

La Dualité :
Un Stream peut ĂȘtre agrĂ©gĂ© (count) pour crĂ©er une Table (l'Ă©tat actuel).
Une Table peut ĂȘtre vue comme un Stream (son "changelog" / flux de changements).

2.2 API : KStream (Le Flux)

Un KStream représente un flux (stream) d'enregistrements (records) infini et immuable. C'est la vue "Stream" (2.1) de la dualité.

Sémantique (INSERT)

Chaque nouvel enregistrement (ex: (Clé: "A", Val: 1)) est un fait indépendant (un INSERT). Il n'y a pas de relation (de mise à jour) avec les enregistrements précédents ((Clé: "A", Val: 0)).

Usage

UtilisĂ© pour les "Faits" (Facts) : Logs, Clics, Transactions, ÉvĂ©nements IoT. (Tout ce qui est un "historique").

(Java)
// (Crée un KStream (flux) depuis le topic "input-topic")
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> myStream = builder.stream("input-topic");

// (Transformer (stateless))
KStream<String, String> upperStream = myStream.mapValues(
    (value) -> value.toUpperCase()
);
2.3 API : KTable (Le Tableau)

Une KTable représente un snapshot (état) d'un flux. C'est la vue "Table" (2.1) de la dualité.

Sémantique (UPSERT)

Une KTable est basée sur la Clé (Key). Chaque nouvel enregistrement (ex: (Clé: "A", Val: 2)) est une mise à jour (UPSERT) de la valeur précédente pour cette clé.

Flux d'entrée (KStream) :
("userA", "Paris")
("userB", "Lyon")
("userA", "Lille") <-- (Mise Ă  jour)

Résultat (KTable) :
(Clé: "userA", Val: "Lille")
(Clé: "userB", Val: "Lyon")
Usage

UtilisĂ© pour les "États" (States) : DonnĂ©es de rĂ©fĂ©rence (ex: table "Utilisateurs"), AgrĂ©gations (ex: COUNT).

(Java)
// (Crée une KTable (état) depuis le topic "users-topic")
KTable<String, String> usersTable = builder.table("users-topic");
3.1 Transformations Stateless : map, flatMap, selectKey

Transformations (1:1 ou 1:N) qui ne nécessitent pas d'état (State).

(Java API)
KStream<String, String> source = ...;

// --- mapValues (1:1, Valeur) ---
// (Transforme la Valeur, garde la Clé)
KStream<String, Integer> lengths = source.mapValues(
    (value) -> value.length()
);

// --- map (1:1, Clé+Valeur) ---
// (Transforme Clé et Valeur -> Nouveau KeyValuePair)
KStream<String, String> mapped = source.map(
    (key, value) -> new KeyValuePair<>(key.toUpperCase(), value + "!")
);

// --- flatMap (1:N, Clé+Valeur) ---
// (Ex: "Word Count")
KStream<String, String> words = source.flatMap(
    (key, value) -> { // (value = "le chat")
        List<KeyValuePair<String, String>> result = new ArrayList<>();
        for (String word : value.split(" ")) {
            result.add(new KeyValuePair<>(word, "1")); // (("le", 1), ("chat", 1))
        }
        return result;
    }
);

// --- selectKey (Changer la Clé) ---
// (Crucial avant un 'groupByKey' ou 'join')
KStream<NewKey, String> rekeyed = source.selectKey(
    (key, value) -> value.split(",")[0] // (Nouvelle clé)
);
3.2 Transformations Stateless : filter & branch
filter (Filtrer)

Garde (true) ou jette (false) les enregistrements.

(Java)
KStream<String, String> source = ...;

// (Garder les messages dont la valeur > 100)
KStream<String, String> filtered = source.filter(
    (key, value) -> Integer.parseInt(value) > 100
);
branch (Router / Diviser)

Divise 1 KStream en N KStream, basé sur des prédicats (conditions).

(Java)
KStream<String, String> source = ...;

// (Diviser le flux en 'erreurs' et 'autres')
Map<String, KStream<String, String>> branches = source.split(Named.as("B-"))
    .branch(
        (key, value) -> value.startsWith("ERROR"), // (Prédicat 1)
        Branched.as("Erreurs") // (Nom de la branche 1)
    )
    .defaultBranch( // (Tout le reste)
        Branched.as("Autres")
    );

// (Récupérer les flux)
KStream<String, String> errorsStream = branches.get("B-Erreurs");
KStream<String, String> othersStream = branches.get("B-Autres");
3.3 Transformations Stateless : Sinks (to, foreach)

Les "Sinks" (Destinations) sont les opérations terminales (la fin du DAG).

to (Écrire dans Kafka)

La destination la plus courante : écrire le KStream (résultat) dans un autre topic Kafka.

(Java)
KStream<String, String> resultStream = ...;

resultStream.to("output-topic");

// (Avec SerDes (Serialiseur/Désérialiseur) explicites)
resultStream.to("output-topic",
    Produced.with(Serdes.String(), Serdes.Long())
);
foreach (Action externe)

Opération terminale pour "faire quelque chose" (Effet de bord) avec chaque enregistrement (ex: appeler une API externe, écrire dans une BDD).

Attention : foreach n'offre pas de garantie "Exactly-Once" (EOS) (7.2) pour l'Ă©criture externe. Si le job crash, l'API externe peut ĂȘtre appelĂ©e 2 fois (At-Least-Once).

(Java)
resultStream.foreach(
    (key, value) -> {
        // (Appel API externe, écriture BDD...)
        myApiClient.send(key, value);
    }
);
4.1 Transformations Stateful : groupByKey & groupBy

C'est la premiĂšre Ă©tape d'une agrĂ©gation (count, reduce). Elle re-partitionne (Shuffle) le flux (KStream) pour que tous les enregistrements d'une mĂȘme clĂ© (ex: "userA") arrivent sur la mĂȘme TĂąche/Thread (1.2).

groupByKey()

Utilisé si les données sont déjà "keyées" (clées) correctement. (Ex: un flux (UserID, Clic)).

(Java)
KStream<String, String> stream = ...; // (Clé=UserID)

// (Groupe par la clé existante)
KGroupedStream<String, String> grouped = stream.groupByKey();
groupBy()

Utilisé si vous devez changer la clé (similaire à selectKey + groupByKey).

(Java)
KStream<String, String> stream = ...; // (Clé=null, Val="Alice,Paris")

// (Grouper par la nouvelle clé (Ville))
KGroupedStream<String, String> grouped = stream.groupBy(
    (key, value) -> value.split(",")[1] // (Clé = "Paris")
);
4.2 Transformations Stateful : count & reduce

Les opérations d'agrégation (sur un KGroupedStream) (4.1).

count() (Compter)

L'agrégation la plus simple. Transforme un KStream -> KTable.

(Java)
KStream<String, String> stream = ...;

// (Compter le nombre d'événements par Clé (ex: UserID))
KTable<String, Long> countsTable = stream
    .groupByKey()
    .count(); // (Retourne une KTable (état))

// Flux (Input KStream) : ("A", "clic"), ("B", "clic"), ("A", "clic")
// Résultat (KTable) :
// ("A", 1)
// ("B", 1)
// ("A", 2)
reduce() (Réduire)

Combine les Ă©vĂ©nements (de mĂȘme clĂ©) en 1 seule valeur (du mĂȘme type).

(Java)
KStream<String, Long> stream = ...; // (Clé=Produit, Val=Vente)

// (Sommer les ventes par Produit)
KTable<String, Long> sumsTable = stream
    .groupByKey()
    .reduce(
        (aggValue, newValue) -> aggValue + newValue // (previous + current)
    );

// Flux (Input KStream) : ("A", 10), ("B", 5), ("A", 20)
// Résultat (KTable) :
// ("A", 10)
// ("B", 5)
// ("A", 30) (10 + 20)
4.3 Transformations Stateful : aggregate

aggregate est l'agrégation la plus flexible (et complexe). Elle est nécessaire si le type de retour est différent du type d'entrée.

Exemple : Calculer la "moyenne" (Input=Long, Output=Double).

Nécessite 3 fonctions :

  • Initializer : CrĂ©e l'objet d'agrĂ©gation (ex: (Count: 0, Sum: 0)).
  • Adder (Ajouteur) : ((agg, new) -> agg) Ajoute une nouvelle valeur (ex: (Count+1, Sum+new)).
  • Subtractor (Soustracteur) : (Pour KTables) GĂšre la "rĂ©traction" (retrait) d'une ancienne valeur.
(Java)
// (Input: KStream<String, Long> )
// (Output: KTable<String, Double> (la moyenne))

// (L'objet d'état)
class AverageState { long count = 0; long sum = 0; double avg = 0.0; }

KTable<String, Double> avgTable = stream
    .groupByKey()
    .aggregate(
        // 1. Initializer
        () -> new AverageState(),
        
        // 2. Adder (agg, newValue, agg)
        (key, newValue, aggState) -> {
            aggState.count++;
            aggState.sum += newValue;
            aggState.avg = (double) aggState.sum / aggState.count;
            return aggState;
        },
        
        // (Définit le 'State Store' (5.1))
        Materialized.as("store-name").with(...)
    )
    // (Map finale pour ne garder que la moyenne)
    .mapValues(aggState -> aggState.avg);
5.1 Concept : State Store (Stockage d'État)

ProblĂšme : Quand vous faites un .count() (4.2), oĂč Kafka Streams stocke-t-il le "compte" (ex: ("A", 30)) ?

Solution : State Store (Stockage d'État)

Un State Store est une base de données locale (embarquée), gérée par l'instance Kafka Streams (le .jar).

Par défaut, Kafka Streams utilise RocksDB (une BDD Clé-Valeur (K/V) embarquée, écrite en C++, optimisée pour le SSD).

Fonctionnement (Agrégation)
  1. (Input) ("A", 10)
  2. (reduce) Kafka Streams lit (GET) la clé "A" dans RocksDB (local) -> null.
  3. (reduce) Calcule 0 + 10.
  4. (reduce) Écrit (PUT) ("A", 10) dans RocksDB (local).
  5. (Input) ("A", 20)
  6. (reduce) Kafka Streams lit (GET) "A" dans RocksDB -> 10.
  7. (reduce) Calcule 10 + 20.
  8. (reduce) Écrit (PUT) ("A", 30) dans RocksDB.

Avantage : AccÚs à l'état trÚs rapide (RAM/SSD local), pas de latence réseau (vs Redis/DB externe).

5.2 HA : State Store & Changelog Topic

ProblÚme : Le State Store (RocksDB) (5.1) est local (sur le disque/SSD de l'instance .jar). Si l'instance (ou le Pod K8s) crash (meurt), le RocksDB local (et tout l'état, ex: le COUNT) est perdu.

Solution (Tolérance aux Pannes) : Le "Changelog Topic"

Kafka Streams utilise Kafka (le Broker) pour assurer la durabilité (HA) de l'état.

Quand vous créez une agrégation (ex: .count()), Kafka Streams crée automatiquement (sur le Broker) :

  1. Le State Store (RocksDB) (local) (pour les lectures/écritures rapides).
  2. Un Topic Kafka "Changelog" (distant, répliqué 3x).
Flux (Double Écriture)
Input : ("A", 20) -> (Aggregator)
   │
   ├─â–ș 1. Écrit (PUT) -> [ RocksDB (Local) ]
   │                     (État: ("A", 30))
   │
   └─â–ș 2. Écrit (PUSH) -> [ Topic "Changelog" (Kafka Broker) ]
                         (Log: ("A", 30))
Flux (Reprise sur Panne)
  1. Instance A (.jar) crash. (RocksDB local est perdu).
  2. Kafka (Broker) ré-équilibre les partitions. Instance B (.jar) (un autre Pod) récupÚre la Tùche/Partition 1.
  3. Instance B lit (consomme) l'intégralité du Topic "Changelog" (trÚs rapidement).
  4. Instance B reconstruit le State Store (RocksDB) local (ex: ("A", 30)).
  5. Instance B reprend le traitement (oĂč A s'est arrĂȘtĂ©).
5.3 Queryable State (Interrogation)

ProblÚme : L'état (ex: le COUNT des clics) est "piégé" à l'intérieur de l'application Streams (dans RocksDB). Comment une application externe (ex: une API REST) peut-elle lire (GET) cet état (compte) actuel ?

Solution : Queryable State (État Interrogeable)

Kafka Streams permet d'exposer (en lecture seule) le State Store (RocksDB) local de l'application (via IPC ou API REST).

Exemple (API REST pour lire l'état)
(Client HTTP) -> (GET /count/userA)
    │
    ▌
[ Votre App (ex: Spring Boot / Ktor) ]
    │ (Contrîleur REST)
    │
    ├─ (Instance 1) -> [ KafkaStreams (Instance) ]
    │    │
    │    │ (Accùs local)
    │    │
    │    â–Œ
    │  [ State Store (RocksDB) ]
    │    (Contient ClĂ©s A-M)
    │
    └─ (Instance 2) -> [ KafkaStreams (Instance) ]
         │
         ▌
       [ State Store (RocksDB) ]
         (Contient Clés N-Z)

Flux (RequĂȘte /count/userA) :

  1. L'API REST (Instance 1) reçoit la requĂȘte.
  2. Elle interroge son State Store local (RocksDB) : "As-tu la clé 'userA' ?".
  3. (Cas 1 : OK) Oui. Elle retourne la valeur.
  4. (Cas 2 : Non) (La clĂ© "userA" est sur l'Instance 2). Kafka Streams (via IPC) redirige (redirect) la requĂȘte vers l'Instance 2 (qui possĂšde cette clĂ©) et renvoie le rĂ©sultat.
6.1 Join : KStream-KStream

Joindre (JOIN) deux flux (KStream).

ProblÚme : Les flux sont "infinis". Si (Clic_A, 10:00) arrive sur Flux 1, combien de temps doit-il "attendre" (en état) l'arrivée de (Impression_A, 10:01) sur Flux 2 ?

Solution : Jointure FenĂȘtrĂ©e (Windowed Join)

Une jointure KStream-KStream est obligatoirement fenĂȘtrĂ©e (JoinWindows).

(Java)
KStream<String, String> streamClicks = ...
KStream<String, String> streamImpressions = ...

// (DĂ©finir la fenĂȘtre de jointure :
//  ex: 5 minutes d'écart max entre les 2 événements)
JoinWindows window = JoinWindows.of(Duration.ofMinutes(5));

KStream<String, String> joinedStream = streamClicks.join(
    streamImpressions,
    (clickValue, impressionValue) -> // (Le 'ValueJoiner')
        "Clic: " + clickValue + ", Impression: " + impressionValue,
    window
);
6.2 Join : KStream-KTable (Enrichissement)

C'est le cas d'usage de jointure le plus courant : l'enrichissement de flux.

Objectif : Enrichir un Flux (Stream) (ex: "Clics") avec des Données (Table) (ex: "Profils Utilisateur").

Flux

KStream (Clics) : (userA, "clic_produit_123")

KTable (Users) : (userA, "Paris")

(Java)
KStream<String, String> streamClics = ... // (Clé=userID)
KTable<String, String> tableUsers = ...  // (Clé=userID)

// (Pas besoin de fenĂȘtre !
//  La KTable représente l'état 'actuel')
KStream<String, String> enrichedStream = streamClics.join(
    tableUsers,
    (clicValue, userValue) -> // (ValueJoiner)
        clicValue + " (Ville: " + userValue + ")"
);
Résultat (KStream)
("userA", "clic_produit_123 (Ville: Paris)")

SĂ©mantique : C'est un "Lookup" (recherche) d'Ă©tat (Stateful). Le flux (KStream) "regarde" (lookup) l'Ă©tat actuel de la table (KTable) au moment oĂč l'Ă©vĂ©nement de flux arrive.

6.3 Temps & FenĂȘtrage

Kafka Streams (comme Flink) est un moteur "Event Time" (Temps de l'ÉvĂ©nement) par dĂ©faut.

Temps (TimestampExtractor)
  • Event Time (DĂ©faut) : (RecommandĂ©) Utilise le timestamp embarquĂ© dans l'enregistrement Kafka. Permet de gĂ©rer les donnĂ©es en retard (Lateness).
  • Processing Time : (Alternative) Utilise l'horloge (locale) de la machine (Thread) qui traite l'Ă©vĂ©nement. (Rapide, mais non-dĂ©terministe).
Types de FenĂȘtres (Windows)

(Similaire Ă  Flink 5.3)

  • Tumbling (Tombante) : Taille fixe, non-chevauchante (ex: TumblingWindows.of(Duration.ofMinutes(5))).
  • Sliding (Glissante) : Taille fixe, chevauchante (ex: SlidingWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1))).
  • Session : Taille variable, basĂ©e sur l'inactivitĂ© (ex: SessionWindows.withInactivityGap(Duration.ofMinutes(30))).
  • Hopping (Sautante) : (Identique Ă  Sliding).
7.1 Écosystùme : ksqlDB

ProblĂšme : Kafka Streams (API Java/Scala) est complexe pour les Analystes (qui ne connaissent que le SQL).

Solution : ksqlDB

ksqlDB (par Confluent) est une base de données de streaming (event-streaming database). C'est une abstraction SQL (un "wrapper") par-dessus Kafka Streams.

ksqlDB exécute un serveur (cluster) qui permet à l'utilisateur d'écrire du SQL. ksqlDB traduit ce SQL en une application Kafka Streams (Topologie) et l'exécute.

Exemple (SQL)
-- 1. (ksqlDB) Définir un STREAM (KStream) sur un topic
CREATE STREAM clics (
  user_id BIGINT KEY,
  url VARCHAR
) WITH (
  KAFKA_TOPIC='clics_topic',
  VALUE_FORMAT='JSON'
);

-- 2. (ksqlDB) Définir une TABLE (KTable) (agrégation)
CREATE TABLE clics_par_user AS
SELECT
  user_id,
  COUNT(*) AS total_clics
FROM clics
GROUP BY user_id
EMIT CHANGES; -- (Émet les changements de la KTable)
7.2 Garantie : Exactly-Once (EOS)

Kafka Streams (depuis Kafka 0.11) fournit une sémantique "Exactly-Once" (Traitement unique garanti).

Le ProblĂšme (At-Least-Once)

Par dĂ©faut (sans EOS), si un job (Read -> Process -> Write) crash aprĂšs le "Write" (Écriture) mais avant le "Commit" (de l'offset de lecture), il va retraiter (doublon) le message au redĂ©marrage.

Solution (Kafka Transactions)

En activant processing.guarantee="exactly_once_v2" (ou beta) :

Kafka Streams utilise les Transactions Kafka. Le "Commit" (de l'offset de lecture) ET le "Write" (de l'écriture du résultat) sont envoyés au Broker Kafka comme une seule transaction (atomique).

-- (Flux Transactionnel)
1. BeginTransaction
2. (Écrire RĂ©sultat -> Topic B)
3. (Écrire Offset (lu) -> Topic A)
4. CommitTransaction

-- (Si crash avant "Commit",
--  la transaction est "Abort" (annulée).
--  Le "RĂ©sultat" (Étape 2) n'est
--  jamais visible par les consommateurs.)
7.3 Cheat-sheet (Java API)
(Java 8+)
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;

// 1. Propriétés (Config)
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mon-app-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// (Pour EOS)
// props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once_v2");

// 2. Définir la Topologie (Builder)
StreamsBuilder builder = new StreamsBuilder();

// 3. Source (KStream)
KStream<String, String> sourceStream = builder.stream("input-topic");

// 4. Transformations (Stateless)
KStream<String, String> filteredStream = sourceStream
    .filter((key, value) -> value.length() > 5);
    
KStream<String, String> wordsStream = filteredStream
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")));

// 5. Transformation (Stateful - Aggregation)
KTable<String, Long> wordCounts = wordsStream
    .groupBy((key, word) -> word) // (Re-key)
    .count(Materialized.as("word-counts-store")); // (Crée KTable)

// 6. Sink (Destination)
wordCounts.toStream().to("output-topic",
    Produced.with(Serdes.String(), Serdes.Long())
);

// 7. Construire & Démarrer
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);

streams.start(); // (Lance les threads)

// (Gestion de l'arrĂȘt)
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));