📡 MQTT – Architecture, Brokers, IoT, Security & Real-Time Ingestion
IDEO-Lab guide: the MQTT protocol, real-time ingestion, industrial IoT, brokers, QoS, security, Python, cloud and production operations.
- Intro & Positioning: MQTT in one sentence: lightweight publish/subscribe linking sensors, gateways, backend services and real-time dashboards. Tags: IoT, M2M, Pub/Sub.
- MQTT Architecture: central broker, publishers, subscribers, topics, sessions, retained messages, Last Will. Tags: Broker, Topics, Sessions.
- Protocol & Packets: CONNECT, PUBLISH, SUBSCRIBE, PUBACK/PUBREC/PUBREL/PUBCOMP, PINGREQ and DISCONNECT. Tags: MQTT 5, Control Packets, Wire.
- QoS & Delivery: choosing QoS 0, 1 or 2 according to cost, duplication, latency, criticality and consumer-side idempotence. Tags: QoS 0, QoS 1, QoS 2.
- Topics & Design: naming conventions, wildcards, multi-tenant namespaces, retained state, anti-patterns. Tags: +, #, Namespace.
- MQTT Brokers: Mosquitto, EMQX, HiveMQ, VerneMQ, RabbitMQ plugin, AWS IoT Core, Azure IoT Hub. Tags: Mosquitto, EMQX, HiveMQ.
- Security: TLS, mTLS, per-topic ACLs, device certificates, rotation, rate limiting and network segmentation. Tags: TLS, ACL, mTLS.
- Python Backend: Paho, asyncio, ingestion into Django/Celery/Redis, payload validation, backpressure, storage. Tags: Paho, Django, Celery.
- Applications: home IoT, connected vehicles, industry, energy, healthcare, logistics, edge observability. Tags: IIoT, Fleet, Telemetry.
- Sparkplug / Industry: standardizing machine state, birth, death, metrics and industrial context. Tags: Sparkplug B, SCADA, OPC UA.
- Monitoring & Run: connections, messages/sec, queue depth, dropped messages, retained store, incidents and runbooks. Tags: Metrics, Alerts, SRE.
- Cloud & Scaling: clusters, bridges, shared subscriptions, gateways, cloud IoT, multi-region, cost and limits. Tags: AWS, Azure, Bridge.
- MQTT vs Rabbit/Kafka: when to use MQTT, AMQP/RabbitMQ, Kafka, WebSocket, HTTP streaming or NATS. Tags: RabbitMQ, Kafka, NATS.
- Anti-patterns: poorly designed topics, QoS 2 everywhere, unversioned payloads, unauthenticated device_id, single broker. Tags: Design, Risks, Prod.
- Project Roadmap: delivery plan for a complete MQTT platform: MVP, security, ingestion, dashboard, HA. Tags: MVP, HA, Django.
- MQTT / IoT Portfolio: five concrete MQTT and IoT implementation projects: smart metering, energy, building operations, fleet telemetry and reference platform. Tags: Portfolio, IoT, MQTT.
Operational definition
MQTT is a lightweight, application-level messaging protocol built on publish/subscribe. It carries messages between clients that do not know each other directly. Clients publish on topics; the MQTT broker receives the messages, applies security, session and quality-of-service rules, then redistributes them to the subscribed clients.
MQTT is a natural fit for connecting sensors, connected devices, industrial gateways, mobile apps, backend services and real-time dashboards through a very lightweight Pub/Sub broker.
The promise of MQTT
- Decouple message producers from message consumers.
- Tolerate unstable networks: edge, mobile, industrial Wi-Fi, 4G/5G, satellite.
- Use less bandwidth than repeated HTTP exchanges.
- Centralize distribution through a broker: filtering, security, QoS, sessions.
- Simplify real-time ingestion into backends, storage, alerting, supervision or AI.
MQTT as a mental picture
┌──────────────────────┐
│ MQTT Broker │
│──────────────────────│
│ Auth / ACL / TLS │
│ Sessions │
│ QoS 0 / 1 / 2 │
│ Retained messages │
│ Last Will │
│ Topic routing │
└──────────┬───────────┘
│
┌─────────────────────┼─────────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌────────────────┐ ┌────────────────┐
│ Dashboard │ │ Backend Python │ │ Alerting / ML │
│ real-time │ │ ingestion │ │ rules engine │
└──────────────┘ └────────────────┘ └────────────────┘
▲
│ subscribe
│
┌───────┴─────────────────────────────────────────────┐
│ Devices / Gateways / Sensors / PLCs / Mobile apps │
│ publish telemetry, events, state, commands │
└─────────────────────────────────────────────────────┘
Where does MQTT fit in the Web / Dev landscape?
In a web or backend development taxonomy, MQTT belongs to the Messaging / Message Brokers / Event-driven architecture family. Technically MQTT is a protocol, but technical portals often file it next to the brokers that implement it: Mosquitto, EMQX, HiveMQ, VerneMQ, RabbitMQ MQTT plugin.
| Category | Relevance | Explanation |
|---|---|---|
| Message Brokers | Very high | MQTT is almost always used with a central broker. |
| Networking & Protocols | High | MQTT is an application protocol on top of TCP, often secured with TLS. |
| Data Engineering | Medium to high | Used to ingest telemetry, events and sensor measurements. |
| DevOps & Monitoring | Medium | Can carry metrics, states, heartbeats and edge alerts. |
| Frontend | Low to medium | Possible via MQTT over WebSocket, but that is not its historical core. |
Recommended classification tree
Web / Dev
└── Backend
└── Distributed architecture
└── Messaging & Event-driven architecture
└── Message Brokers
└── MQTT
├── Mosquitto
├── EMQX
├── HiveMQ
├── VerneMQ
└── RabbitMQ MQTT Plugin
Useful badges for an IDEO-Lab sheet
# Messaging
# Message Broker
# Pub/Sub
# IoT
# Edge Computing
# Real-Time
# Telemetry
# M2M
# MQTT 3.1.1
# MQTT 5
# Lightweight Protocol
# Event-Driven Architecture
Secondary classification: Networking & Protocols.
MQTT architecture: producers, broker, topics, subscribers
MQTT relies on a very simple model: clients never call each other directly. They go through a broker. A producer publishes a message on a topic. Consumers subscribe to exact topics or to patterns with wildcards.
| Element | Role | Example |
|---|---|---|
| Publisher | Client that sends a message | Temperature sensor, PLC gateway, mobile app |
| Broker | MQTT server that routes messages | Mosquitto, EMQX, HiveMQ, VerneMQ |
| Subscriber | Client that receives the messages of a topic | Dashboard, Python worker, alerting service |
| Topic | Logical address of the message | factory/line1/motor7/temperature |
| Payload | Free-form message content | JSON, binary, Protobuf, text, CBOR |
| QoS | Delivery guarantee level | 0, 1 or 2 |
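To make the decoupling concrete, the roles in this table can be mimicked in a few lines. `ToyBroker` is a hypothetical, in-memory illustration only: it routes on exact topic names, ignores QoS, sessions and wildcards, and does no networking; real brokers such as Mosquitto or EMQX handle all of that.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

# A subscriber callback receives (topic, payload).
Handler = Callable[[str, bytes], None]


class ToyBroker:
    """Hypothetical in-memory broker: exact-topic routing and fan-out only."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, payload: bytes) -> int:
        """Deliver payload to every subscriber of topic and return how many got it."""
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(topic, payload)
        return len(handlers)


if __name__ == "__main__":
    broker = ToyBroker()
    dashboard: List[bytes] = []
    storage: List[bytes] = []
    topic = "factory/line1/motor7/temperature"
    # Two consumers that know nothing about the producer.
    broker.subscribe(topic, lambda t, p: dashboard.append(p))
    broker.subscribe(topic, lambda t, p: storage.append(p))
    # The producer only knows the broker and the topic name.
    print(broker.publish(topic, b'{"value": 72.4, "unit": "C"}'))
    print(dashboard == storage)
```

Adding a third consumer (for example an alerting service) is one more `subscribe` call; the publisher does not change.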
Publish / subscribe flow
1. The device connects to the broker
CONNECT client_id=device-042 username=gw_paris
2. The dashboard subscribes to a topic
SUBSCRIBE factory/line1/+/temperature
3. The device publishes a measurement
PUBLISH factory/line1/motor7/temperature
Payload: {"value": 72.4, "unit": "C", "ts": "2026-04-25T10:00:00Z"}
4. The broker routes the message
Every subscriber with a matching filter receives the message
5. Consumers process it
- dashboard display
- time-series storage
- alert if a threshold is exceeded
- ML or prediction pipeline
Logical topology
Exact topic:
factory/line1/motor7/temperature
Wildcard +:
factory/line1/+/temperature
=> receives:
factory/line1/motor7/temperature
factory/line1/motor8/temperature
Wildcard #:
factory/line1/#
=> receives everything under factory/line1
Warning:
# is powerful but dangerous in production
when used too broadly.
Why MQTT is used: field constraints and technical answers
| Field constraint | MQTT answer | Benefit | Watch out for |
|---|---|---|---|
| Unstable network | Persistent connection, keepalive, reconnection, session expiry, QoS | Better edge/mobile resilience | Plan for backend idempotence |
| Low bandwidth | Compact header, no HTTP verbosity on every message | Less network traffic | Design the payload carefully |
| Many devices | Lightweight client, central broker, hierarchical topics | Scalable architecture | Size sessions, ACLs, file descriptors |
| Multiple consumers | Native Pub/Sub fan-out | One message can feed dashboard, storage, alerting | Avoid overly broad subscriptions |
| Intermittent devices | Last Will, retained messages, persistent sessions | Clean online/offline status handling | Keep current state distinct from historical events |
| Edge computing | Local broker + bridge to the cloud | Local autonomy and remote synchronization | Handle duplicates, buffers and conflicts |
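To illustrate the "compact header" row, this sketch hand-encodes an MQTT 3.1.1 PUBLISH packet with the standard library only: one fixed-header byte (packet type 3 plus the DUP, QoS and RETAIN flags), a variable-length "remaining length", the length-prefixed UTF-8 topic and, for QoS 1/2, a 2-byte packet identifier. It is meant for understanding the wire format, not as a client (use a library such as paho-mqtt for that); MQTT 5 adds a properties field that is not shown here.

```python
import struct


def encode_remaining_length(n: int) -> bytes:
    """MQTT variable byte integer: 7 bits per byte, high bit = 'more bytes follow'."""
    if not 0 <= n <= 268_435_455:
        raise ValueError("remaining length out of range")
    out = bytearray()
    while True:
        byte, n = n % 128, n // 128
        if n > 0:
            byte |= 0x80
        out.append(byte)
        if n == 0:
            return bytes(out)


def encode_publish(topic: str, payload: bytes, qos: int = 0, packet_id: int = 0,
                   retain: bool = False, dup: bool = False) -> bytes:
    """Build an MQTT 3.1.1 PUBLISH packet (no MQTT 5 properties)."""
    if qos not in (0, 1, 2):
        raise ValueError("qos must be 0, 1 or 2")
    topic_bytes = topic.encode("utf-8")
    variable_header = struct.pack("!H", len(topic_bytes)) + topic_bytes
    if qos > 0:
        if not 1 <= packet_id <= 0xFFFF:
            raise ValueError("QoS 1/2 requires a non-zero packet identifier")
        variable_header += struct.pack("!H", packet_id)
    # Fixed header: type 3 in the high nibble, then DUP, QoS (2 bits), RETAIN.
    first_byte = (3 << 4) | (int(dup) << 3) | (qos << 1) | int(retain)
    body = variable_header + payload
    return bytes([first_byte]) + encode_remaining_length(len(body)) + body


if __name__ == "__main__":
    payload = b'{"v":21.5}'
    packet = encode_publish("home/livingroom/temperature", payload, qos=1, packet_id=10)
    # Framing = fixed header + remaining length + topic length + topic + packet id.
    print(len(packet), len(packet) - len(payload))
```

For this 10-byte reading at QoS 1, the framing is 33 bytes (excluding TCP/IP and TLS overhead), 27 of which are the topic string itself: short topic names matter for high-frequency telemetry.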
MQTT works very well when...
- Clients are numerous, distributed and sometimes unstable.
- Messages are small to medium-sized.
- The same information must reach several consumers.
- You work in real time or near real time.
- You need to report telemetry, state, events and commands.
- You want to decouple devices from the application backend.
MQTT is a poorer fit when...
- You must transfer large files.
- You need Kafka-style durable history.
- You want massive batch analytics directly in the broker.
- You need a classic public request/response API.
- You do not want to operate a broker or manage connection state.
Concrete use cases
| Domain | MQTT application | Example topic |
|---|---|---|
| Home IoT | Sensors, home automation, equipment state | home/livingroom/temperature |
| Industry | Machines, PLCs, production lines | plant/a/line/3/motor/7/vibration |
| Energy | Meters, solar panels, charging stations, smart grid | energy/site42/meter17/power |
| Transport | Fleets, GPS, vehicle telemetry | fleet/truck/984/location |
| Monitoring | Heartbeats, edge state, local alarms | edge/gateway/paris-01/status |
| Real-time backend | Ingestion, workers, alerting, dashboards | events/orders/created |
Typical industrial architecture
┌─────────────────────────┐
│ Machines / PLC / Sensors│
└────────────┬────────────┘
│ Modbus / OPC-UA / Serial / GPIO
▼
┌─────────────────────────┐
│ Edge Gateway │
│ - normalizes the data │
│ - enriches messages │
│ - local buffer │
│ - publishes MQTT │
└────────────┬────────────┘
│ MQTT/TLS
▼
┌─────────────────────────┐
│ Local/cloud MQTT broker │
└───────┬─────────┬───────┘
│ │
▼ ▼
┌────────────┐ ┌──────────────┐
│ Historian │ │ Dashboard │
│ TimeSeries │ │ Supervision │
└────────────┘ └──────────────┘
│
▼
┌─────────────────────────┐
│ Analytics / AI / Alerts │
└─────────────────────────┘
Useful numbers, ports, dimensions and orders of magnitude
Ports:
1883: MQTT over TCP without TLS
8883: MQTT over TLS
WebSocket: varies by broker
QoS levels:
0: at most once
1: at least once
2: exactly once
MQTT 5 additions:
Reason codes, user properties,
session expiry, message expiry,
topic alias, flow control.
| Dimension | Orders of magnitude | Architectural impact |
|---|---|---|
| Messages/second | 100, 1k, 10k, 100k+ | Drives broker, storage, worker and backpressure choices |
| Concurrent connections | 1k, 10k, 100k, millions depending on broker/cloud | Impacts memory, sockets, file descriptors, clustering |
| Payload | A few bytes to several KB | Large payloads usually belong in object storage |
| Keepalive | Typically 30 s to 300 s | Trade-off between fast detection and battery/network usage |
| Topic depth | Often 3 to 8 levels | The deeper it is, the more governance matters |
| QoS 2 | More expensive than QoS 0/1 | Reserve for truly critical messages |
Quick QoS selection grid
| Message type | Recommended QoS | Why |
|---|---|---|
| Temperature sent every 5 seconds | QoS 0 or 1 | Losing one reading may be acceptable depending on context |
| Machine stop command | QoS 1 or 2 | Critical message, but plan for idempotence |
| Gateway heartbeat | QoS 0 | The next heartbeat arrives soon |
| Unique business event | QoS 1 | Good trade-off; duplicates possible but manageable |
| Retained current state | QoS 1 | Useful to synchronize new subscribers |
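Why the grid pairs QoS 1 with idempotence can be shown with a simplified model of the receiving side of one hop. When an acknowledgement is lost, the sender retransmits the same PUBLISH with DUP set. Under QoS 1 the receiver delivers it again; under QoS 2 the receiver remembers the packet identifier from PUBREC until PUBREL arrives, so the retransmission is acknowledged but not delivered twice. `Receiver` and `deliveries_after_lost_ack` are hypothetical names; network, timers and the sender side are reduced to function calls.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class Receiver:
    """Receiving side of one MQTT hop (hypothetical, heavily simplified)."""

    delivered: List[bytes] = field(default_factory=list)
    # QoS 2 only: packet identifiers answered with PUBREC and awaiting PUBREL.
    awaiting_pubrel: Set[int] = field(default_factory=set)

    def on_publish(self, qos: int, packet_id: int, payload: bytes) -> str:
        """Handle an incoming PUBLISH and return the acknowledgement sent back."""
        if qos == 0:
            self.delivered.append(payload)
            return "none"
        if qos == 1:
            # At least once: a retransmitted PUBLISH (DUP=1) is delivered again.
            self.delivered.append(payload)
            return "PUBACK"
        # QoS 2: deliver on first receipt only; repeats with the same id get PUBREC only.
        if packet_id not in self.awaiting_pubrel:
            self.awaiting_pubrel.add(packet_id)
            self.delivered.append(payload)
        return "PUBREC"

    def on_pubrel(self, packet_id: int) -> str:
        # After PUBREL the identifier is released and may be reused for a new message.
        self.awaiting_pubrel.discard(packet_id)
        return "PUBCOMP"


def deliveries_after_lost_ack(qos: int) -> List[bytes]:
    """Send one message, lose the first acknowledgement, let the sender retransmit."""
    if qos not in (1, 2):
        raise ValueError("only QoS 1 and 2 retransmit")
    rx = Receiver()
    rx.on_publish(qos, packet_id=7, payload=b"stop-machine")  # ack lost on the way back
    rx.on_publish(qos, packet_id=7, payload=b"stop-machine")  # sender retries with DUP=1
    if qos == 2:
        rx.on_pubrel(7)  # rest of the handshake: PUBREL, then PUBCOMP
    return rx.delivered


if __name__ == "__main__":
    print("QoS 1:", deliveries_after_lost_ack(1))
    print("QoS 2:", deliveries_after_lost_ack(2))
```

QoS 1 delivers the command twice and QoS 2 once. The QoS 2 guarantee holds per hop (publisher to broker, broker to subscriber); an application that crashes after receiving a message can still act on it twice, which is why the grid keeps recommending idempotence.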
MQTT vs HTTP, WebSocket, AMQP, Kafka, CoAP
| Technology | Family | Great for | Less suited to | Comparison with MQTT |
|---|---|---|---|---|
| HTTP/REST | Request/response | Web APIs, CRUD, simple integrations | Numerous, frequent real-time flows | MQTT is lighter for frequent telemetry. |
| WebSocket | Bidirectional channel | Real-time browser/server communication | Structured broker-side Pub/Sub routing | MQTT can run over WebSocket for browsers. |
| AMQP | Enterprise messaging | Queues, complex routing, enterprise backends | Highly constrained devices | MQTT is lighter; AMQP is richer on the broker side. |
| Kafka | Event streaming log | Durable history, replay, large-scale analytics | Intermittent devices connecting directly | MQTT often ingests upstream; Kafka stores and replays downstream. |
| CoAP | REST-like IoT protocol | Very constrained devices, UDP | Classic brokered Pub/Sub architecture | MQTT is more common in brokered Pub/Sub architectures. |
MQTT + Kafka pattern
Devices / Gateways
│
│ MQTT
▼
MQTT Broker
│
│ bridge / connector / worker
▼
Kafka / Redpanda / Pulsar
│
├── Stream processing
├── Data lake
├── Analytics
└── ML / alerting
MQTT + web dashboard pattern
Device ──MQTT/TLS──► MQTT broker
│
├── Python backend worker
├── Time-series DB
└── MQTT over WebSocket
│
▼
Browser UI
MQTT limits, pitfalls and anti-patterns
| Pitfall | Why it is dangerous | Good practice |
|---|---|---|
| Ungoverned topics | Conventions sprawl chaotically | Define an official naming scheme |
| Unversioned payload | Consumers break when the payload evolves | Add schema_version to the payload |
| QoS 2 everywhere | Unnecessary protocol overhead | Default to QoS 0/1; use QoS 2 only when justified |
| Wildcard # in production | Over-consumption, data leaks, broker load | Restrict by ACL and by scope |
| Single unmonitored broker | Central point of failure | Monitoring, HA, config backup, clustering if needed |
| No idempotence | QoS 1 can deliver duplicates | Add message_id, timestamp, source_id |
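The last rows of the table (versioning, idempotence) can be handled by wrapping every payload in a small envelope. The field names `schema_version`, `message_id`, `source_id` and `ts` come from the table; the envelope layout, the `data` key and the `make_envelope` helper are assumptions for illustration, not a standard format.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional

SCHEMA_VERSION = 1  # hypothetical current version of the payload contract


def make_envelope(source_id: str, data: Dict[str, Any],
                  message_id: Optional[str] = None,
                  ts: Optional[datetime] = None) -> bytes:
    """Wrap measurement data in a versioned, identifiable JSON payload."""
    ts = ts or datetime.now(timezone.utc)
    envelope = {
        "schema_version": SCHEMA_VERSION,  # lets consumers route or reject by version
        # Generate once per logical message and reuse it if the application
        # re-publishes, so consumers can drop QoS 1 duplicates.
        "message_id": message_id or str(uuid.uuid4()),
        "source_id": source_id,
        "ts": ts.astimezone(timezone.utc).isoformat().replace("+00:00", "Z"),
        "data": data,
    }
    return json.dumps(envelope, separators=(",", ":")).encode("utf-8")


if __name__ == "__main__":
    print(make_envelope("gw-001", {"value": 72.4, "unit": "C"}).decode())
```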
Design checklist
Before putting MQTT into production:
[ ] Officially name the topics
[ ] Define ACLs per client/group
[ ] Enable TLS unless the network is fully private
[ ] Choose QoS per message type
[ ] Add message_id for idempotence
[ ] Version the payloads
[ ] Monitor the broker, connections, messages/sec
[ ] Use retained messages for current states
[ ] Define a Last Will for online/offline status
[ ] Test reconnection, network outages, duplicates
[ ] Document conventions and examples
What MQTT does not do on its own
- It does not automatically guarantee business-level “exactly once” semantics.
- It does not replace a time-series database or an event store.
- It does not validate the payload schema by default.
- It provides no protection without TLS/auth/ACL configuration.
- It does not solve topic governance problems.
Reference URLs and ecosystem
Standards & documentation
MQTT official:
https://mqtt.org/
MQTT 5.0 OASIS standard:
https://www.oasis-open.org/standard/mqtt-v5-0-os/
MQTT 5.0 specification:
https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
MQTT 3.1.1 OASIS / ISO information:
https://www.oasis-open.org/2016/07/19/oasis-mqtt-internet-of-things-standard-now-approved-by-iso-iec-jtc1/
Eclipse Sparkplug:
https://sparkplug.eclipse.org/specification/
MQTT brokers
Eclipse Mosquitto:
https://mosquitto.org/
EMQX:
https://www.emqx.com/
HiveMQ:
https://www.hivemq.com/
VerneMQ:
https://vernemq.com/
RabbitMQ MQTT plugin:
https://www.rabbitmq.com/docs/mqtt
Cloud IoT & integrations
AWS IoT Core:
https://aws.amazon.com/iot-core/
Azure IoT Hub:
https://azure.microsoft.com/products/iot-hub/
Google Cloud IoT alternatives / partner ecosystem:
https://cloud.google.com/solutions/iot
Node-RED:
https://nodered.org/
Telegraf MQTT Consumer:
https://docs.influxdata.com/telegraf/
Paho MQTT clients:
https://www.eclipse.org/paho/
Common client libraries
| Language | Library | Typical use |
|---|---|---|
| Python | paho-mqtt | Workers, ingestion, tests, scripts |
| JavaScript | mqtt.js | Node.js, browser via WebSocket |
| Java | Eclipse Paho Java | Enterprise backend, Android, gateways |
| Go | paho.mqtt.golang | Lightweight services, agents, edge |
| C/C++ | Paho C/C++ | Embedded, industrial, devices |
MQTT architecture: the mental model
MQTT relies on a client/broker architecture. Clients may be sensors, gateways, mobile apps, backend services, dashboards or industrial agents. They never communicate directly with each other: everything goes through an MQTT broker.
The broker is the logical heart of the system. It receives publications, checks the client's identity, enforces ACLs, manages sessions, routes messages to subscribers, optionally stores retained messages and publishes Last Will messages when a client disappears abruptly.
| Component | Role | Examples |
|---|---|---|
| Publisher | Produces a message on a topic. | Sensor, gateway, mobile app, microservice. |
| Broker | Authenticates, routes, enforces QoS/ACLs, manages sessions and retained messages. | Mosquitto, EMQX, HiveMQ, VerneMQ, RabbitMQ MQTT plugin. |
| Subscriber | Consumes messages from one or more topics. | Python worker, dashboard, alerting engine, Kafka connector. |
| Topic | Hierarchical logical address of the message. | factory/a1/line/7/motor/temp |
| Payload | Content carried by MQTT. | JSON, binary, Protobuf, CBOR, text. |
Global diagram
┌──────────────────────────────┐
│ MQTT BROKER │
│──────────────────────────────│
│ Authentication │
│ ACL publish / subscribe │
│ Topic routing │
│ QoS state │
│ Session store │
│ Retained messages │
│ Last Will messages │
│ Metrics / logs │
└──────────────┬───────────────┘
│
┌─────────────────────────┼─────────────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ Time-series DB │ │ Real-time │ │ Alerting / Rules │
│ Influx/Timescale │ │ dashboard / WS │ │ ML / Automation │
└──────────────────┘ └──────────────────┘ └──────────────────┘
▲
│ subscribe
│
┌─────────┴──────────────────────────────────────────────────────┐
│ Publishers: devices, gateways, PLCs, agents, apps, services │
└────────────────────────────────────────────────────────────────┘
Separation of responsibilities
MQTT broker
├── Real-time transport
├── Pub/Sub routing
├── Sessions / QoS / Retained / LWT
└── Protocol-level security
Application backend
├── Business validation
├── Idempotence
├── Durable storage
├── History
├── Business alerting
└── API / UI / Reporting
The MQTT broker: the nerve center of the architecture
The MQTT broker is the server that accepts client connections, manages topics, controls who may publish or subscribe, routes messages, maintains sessions and exposes metrics. In any serious architecture it is treated as a critical infrastructure component.
| Broker function | Description | Architectural impact |
|---|---|---|
| Connection handling | Accepts TCP/TLS/WebSocket connections, keepalive, reconnections. | Sizing of sockets, file descriptors, CPU. |
| Authentication | Identifies clients: login, token, certificate, external plugin. | Device/backend security. |
| Authorization / ACL | Controls publish/subscribe per topic. | Client isolation, multi-tenancy, industrial security. |
| Routing | Delivers messages to matching subscribers. | Performance depends on the number of subscriptions. |
| Session store | Keeps subscriptions and QoS messages for offline clients. | Consumes memory/disk depending on the broker. |
| Retained store | Keeps the last known value of a topic. | Very useful for current states. |
| Observability | Exposes connections, throughput, errors, latency, drops. | Essential in production. |
The broker as a control point
Client CONNECT
│
▼
┌──────────────────────────────┐
│ MQTT broker │
├──────────────────────────────┤
│ 1. Accepts or rejects TLS │
│ 2. Authenticates the client │
│ 3. Loads the ACLs │
│ 4. Attaches the session │
│ 5. Receives publish/subscribe │
│ 6. Routes messages │
│ 7. Logs / exposes statistics │
└──────────────────────────────┘
Quick broker comparison
| Broker | Profile | Good fit |
|---|---|---|
| Mosquitto | Simple, lightweight, very well known. | POC, edge, small/medium deployments. |
| EMQX | Clustering, high volume, plugins. | Massive IoT, cloud, enterprise. |
| HiveMQ | Enterprise, clustering, observability. | Critical production, large fleets. |
| VerneMQ | Distributed, Erlang, scalable. | Clustered MQTT architecture. |
| RabbitMQ MQTT plugin | MQTT connected to the RabbitMQ ecosystem. | Integration with an existing AMQP/backend stack. |
Topics, routing and naming conventions
The topic is the logical address of an MQTT message. It is hierarchical, with levels separated by slashes. The broker does not necessarily understand the topic's business meaning, but it uses the topic to route messages to matching subscriptions.
Industrial topic example
factory/fr/paris/site-01/line-03/motor-07/temperature
factory = domain
fr = country or region
paris = zone
site-01 = industrial site
line-03 = production line
motor-07 = equipment
temperature = measurement
MQTT wildcards
| Wildcard | Name | Example | Effect |
|---|---|---|---|
| + | Single level | factory/+/site-01/status | Replaces exactly one level. |
| # | Multi-level | factory/fr/# | Captures everything under the prefix (and the prefix itself); must be the last level of the filter. |
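The + and # rules above, plus two edge cases from the MQTT specification (# also matches the parent level, and a filter starting with a wildcard does not match topics beginning with `$`, such as `$SYS/...`), fit in a short matcher. `topic_matches` is an illustrative sketch of filter semantics, not a broker's routing engine; production brokers typically index subscriptions in a tree rather than testing filters one by one.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if the concrete topic name matches the MQTT topic filter."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    # Validate the filter: '+' and '#' must fill a whole level, '#' only at the end.
    for i, level in enumerate(f_levels):
        if "#" in level and (level != "#" or i != len(f_levels) - 1):
            raise ValueError(f"invalid '#' in filter {topic_filter!r}")
        if "+" in level and level != "+":
            raise ValueError(f"invalid '+' in filter {topic_filter!r}")
    # Filters starting with a wildcard do not match topics such as $SYS/...
    if topic.startswith("$") and f_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(f_levels):
        if level == "#":
            return True  # matches the parent level and everything below it
        if i >= len(t_levels):
            return False  # the filter is deeper than the topic
        if level != "+" and level != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


if __name__ == "__main__":
    published = "factory/fr/paris/site-01/line-03/motor-07/temperature"
    for f in ("factory/fr/paris/site-01/line-03/+/temperature",
              "factory/+/temperature",
              "factory/fr/paris/site-01/#"):
        print(f, topic_matches(f, published))
```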
For example, factory/+/temperature matches factory/line1/temperature but not factory/line1/motor7/temperature, because + stands for exactly one level.
Topic routing
PUBLISH:
factory/fr/paris/site-01/line-03/motor-07/temperature
SUBSCRIBERS:
1) factory/fr/paris/site-01/line-03/motor-07/temperature
=> receives only this specific motor
2) factory/fr/paris/site-01/line-03/+/temperature
=> receives the temperature of every equipment item on line-03
3) factory/fr/paris/site-01/#
=> receives the whole site
4) factory/#
=> receives the entire factory domain
=> dangerous at high volume
Good conventions
| Rule | Why |
|---|---|
| Use stable, documented names. | Avoids breaking subscribers. |
| Avoid spaces and ambiguous characters. | Makes scripts, ACLs, logs and monitoring easier. |
| Put the business domain first. | Enables simple prefix-based ACLs. |
| Do not mix state, commands and events. | Avoids confusion about meaning. |
| Version when needed. | Enables progressive migration. |
Clean separation example
devices/gw-001/status
devices/gw-001/telemetry/temperature
devices/gw-001/telemetry/vibration
devices/gw-001/events/alarm
devices/gw-001/commands/reboot
devices/gw-001/commands/config/update
MQTT sessions: identity, persistence, offline clients, pending messages
An MQTT session is the logical state associated with a client: its Client ID, its subscriptions, its pending QoS messages and its unacknowledged exchanges. Session handling is essential for intermittent clients.
| Concept | MQTT 3.1.1 | MQTT 5 | Usage |
|---|---|---|---|
| Client ID | Session identity | Session identity | Must be stable and unique. |
| Clean Session | Boolean | Replaced by Clean Start + Session Expiry | Determines whether the session is kept. |
| Clean Start | Not available | Yes | Starts with a clean session. |
| Session Expiry | Not available | Yes | How long the session is kept. |
| Queued messages | Depends on persistent session and QoS | Finer control with expiry/quotas | Messages kept for an offline subscriber. |
| Inflight messages | Unacknowledged messages | Receive Maximum | Flow and pressure control. |
Session lifecycle
CONNECT client_id="gateway-paris-001"
clean_start=false
session_expiry=86400
│
▼
Broker checks whether a session exists
│
├── Yes:
│ ├── restores subscriptions
│ ├── keeps pending QoS messages
│ └── resumes inflight exchanges
│
└── No:
├── creates a new session
├── waits for subscriptions
└── initializes the client state
Recommended session profiles
| Client | Session | QoS | Comment |
|---|---|---|---|
| Simple sensor | Clean | 0 or 1 | Simple reconnection, little state. |
| Industrial gateway | Persistent | 1 | Stable Client ID, Last Will, idempotence. |
| Web dashboard | Clean | 0 or 1 | Often via WebSocket, no history in the session. |
| Critical backend worker | Persistent | 1 | Must handle duplicates and DB errors. |
| Machine command | Depends on criticality | 1 or 2 | Plan a separate application-level ack. |
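The session lifecycle and profiles above can be modelled as a small session table keyed by Client ID, using MQTT 5 semantics: Clean Start discards any previous session, a Session Expiry Interval of 0 ends the session with the connection, and CONNACK reports whether a session was resumed. `SessionStore`, its methods and the explicit `now` clock argument are hypothetical; a real broker also persists inflight state, handles SUBSCRIBE packets and enforces quotas.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple


@dataclass
class Session:
    subscriptions: Set[str] = field(default_factory=set)
    queued: List[Tuple[str, bytes]] = field(default_factory=list)  # QoS 1/2 kept offline
    expiry_s: int = 0
    disconnected_at: Optional[float] = None  # None while the client is connected


class SessionStore:
    """Hypothetical broker-side session table keyed by Client ID (MQTT 5 semantics)."""

    def __init__(self) -> None:
        self._sessions: Dict[str, Session] = {}

    def _live(self, client_id: str, now: float) -> Optional[Session]:
        s = self._sessions.get(client_id)
        if s is not None and s.disconnected_at is not None \
                and now - s.disconnected_at >= s.expiry_s:
            del self._sessions[client_id]  # Session Expiry Interval elapsed
            return None
        return s

    def connect(self, client_id: str, clean_start: bool, session_expiry_s: int,
                now: float) -> Tuple[bool, Session]:
        """Return (session_present, session), as reported in CONNACK."""
        old = self._live(client_id, now)
        if clean_start or old is None:
            new = Session(expiry_s=session_expiry_s)
            self._sessions[client_id] = new
            return False, new
        old.expiry_s = session_expiry_s  # the new CONNECT's expiry applies from now on
        old.disconnected_at = None
        return True, old

    def disconnect(self, client_id: str, now: float) -> None:
        s = self._sessions[client_id]
        if s.expiry_s == 0:
            del self._sessions[client_id]  # expiry 0: session ends with the connection
        else:
            s.disconnected_at = now

    def queue_if_offline(self, client_id: str, topic: str, payload: bytes,
                         qos: int, now: float) -> bool:
        """Keep a QoS 1/2 message for an offline subscriber (exact-topic match only)."""
        s = self._live(client_id, now)
        if s is None or s.disconnected_at is None or qos == 0 or topic not in s.subscriptions:
            return False
        s.queued.append((topic, payload))
        return True


if __name__ == "__main__":
    store = SessionStore()
    cmd = "devices/gateway-paris-001/commands/reboot"
    _, s = store.connect("gateway-paris-001", False, 86400, now=0)
    s.subscriptions.add(cmd)
    store.disconnect("gateway-paris-001", now=10)
    store.queue_if_offline("gateway-paris-001", cmd, b'{"delay_s": 5}', qos=1, now=20)
    present, s = store.connect("gateway-paris-001", False, 86400, now=30)
    print(present, s.queued)
```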
Common mistake
Two clients with the same Client ID
│
▼
The broker disconnects the older client
│
▼
Symptoms:
- unstable connections
- devices that keep disconnecting
- lost messages or overwritten sessions
Solution:
- generate a unique, stable Client ID
- control device onboarding
- audit broker connections
Retained messages and Last Will: current state and absence detection
Retained messages
A retained message is kept by the broker as the last known value of a topic. When a new subscriber subscribes, it immediately receives that last value without waiting for the next publication.
Topic:
devices/gw-034/status
Retained payload:
{
"state": "online",
"firmware": "1.8.2",
"ip": "10.0.4.12",
"ts": "2026-04-25T10:00:00Z"
}
| Good use of retained | Bad use of retained |
|---|---|
| Last known state of a device. | Full measurement history. |
| Active configuration. | Non-replayable business events. |
| Latest sensor value for a dashboard. | Message queue for an offline subscriber. |
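On the broker side, retained handling reduces to a map from topic to last payload: a retained PUBLISH replaces the stored value, a retained PUBLISH with an empty payload deletes it (an MQTT rule), and a new subscription immediately receives the stored value. `RetainedStore` is a hypothetical sketch; to stay short it only serves exact-topic subscriptions, whereas real brokers also match retained messages against wildcard filters.

```python
from typing import Dict, List, Tuple


class RetainedStore:
    """Hypothetical retained-message table: at most one message per topic."""

    def __init__(self) -> None:
        self._last: Dict[str, bytes] = {}

    def on_publish(self, topic: str, payload: bytes, retain: bool) -> None:
        if not retain:
            return  # non-retained messages are routed but never stored
        if payload == b"":
            self._last.pop(topic, None)  # empty retained payload clears the topic
        else:
            self._last[topic] = payload  # replaces any previous value

    def on_subscribe(self, topic: str) -> List[Tuple[str, bytes]]:
        """Messages sent to a new subscriber right after it subscribes (exact topic)."""
        return [(topic, self._last[topic])] if topic in self._last else []


if __name__ == "__main__":
    store = RetainedStore()
    store.on_publish("devices/gw-034/status", b'{"state": "online"}', retain=True)
    print(store.on_subscribe("devices/gw-034/status"))
```

Publishing an empty retained payload is also how the status of a decommissioned device is removed, so new dashboards stop showing it.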
Last Will and Testament
The Last Will is a message the client registers when it connects. If the client disappears abruptly, the broker publishes this message on its behalf. It is the standard mechanism for signaling that a device is offline.
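The Will logic can be simulated from the broker's point of view: the Will registered at CONNECT is published only when the connection ends without a clean DISCONNECT (keepalive timeout, crash, network loss) and is discarded otherwise. The MQTT specification has the broker drop a client that stays silent for more than 1.5 times its keepalive period. `Will`, `Connection` and the `publish` callback are hypothetical names; MQTT 5 details such as the Will Delay Interval or a DISCONNECT with reason code 0x04 (disconnect with Will message) are ignored here.

```python
from dataclasses import dataclass
from typing import Callable, Optional

# Hypothetical broker-internal hook: publish(topic, payload, qos, retain).
Publish = Callable[[str, bytes, int, bool], None]


@dataclass
class Will:
    topic: str
    payload: bytes
    qos: int = 1
    retain: bool = True


class Connection:
    """One client connection as seen by the broker (hypothetical sketch)."""

    def __init__(self, will: Optional[Will], keepalive_s: int,
                 publish: Publish, now: float) -> None:
        self.will = will
        self.keepalive_s = keepalive_s
        self.publish = publish
        self.last_packet_at = now
        self.open = True

    def on_packet(self, now: float) -> None:
        self.last_packet_at = now  # any control packet, e.g. PINGREQ, counts

    def check_keepalive(self, now: float) -> None:
        # Silent for more than 1.5x keepalive: treated as a lost connection.
        if self.open and self.keepalive_s > 0 \
                and now - self.last_packet_at > 1.5 * self.keepalive_s:
            self.close(clean=False)

    def close(self, clean: bool) -> None:
        """clean=True models a normal DISCONNECT; False models a lost connection."""
        if not self.open:
            return
        self.open = False
        if not clean and self.will is not None:
            w = self.will
            self.publish(w.topic, w.payload, w.qos, w.retain)
        self.will = None  # the Will is either published once or discarded


if __name__ == "__main__":
    sent = []
    will = Will("devices/gateway-paris-001/status",
                b'{"state": "offline", "reason": "connection_lost"}')
    conn = Connection(will, keepalive_s=60, publish=lambda *msg: sent.append(msg), now=0)
    conn.on_packet(now=60)          # PINGREQ
    conn.check_keepalive(now=120)   # 60 s of silence: still alive
    conn.check_keepalive(now=151)   # 91 s > 90 s: dropped, Will published
    print(sent)
```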
CONNECT gateway-paris-001
will_topic = devices/gateway-paris-001/status
will_payload = {"state": "offline", "reason": "connection_lost"}
will_qos = 1
will_retain = true
After a normal connection:
PUBLISH retained devices/gateway-paris-001/status
{"state": "online"}
On an abrupt disconnect:
The broker automatically publishes:
{"state": "offline", "reason": "connection_lost"}
Online/offline state diagram
Gateway starts
│
├── CONNECT with an offline Last Will
│
├── PUBLISH retained online
│
▼
Device shown as ONLINE
│
├── Clean disconnect:
│ DISCONNECT
│ No Last Will (publish a retained offline status before DISCONNECT,
│ otherwise the retained status stays "online")
│
└── Crash / network loss:
Broker publishes the Last Will
Device shown as OFFLINE
Message flow: publish, subscribe, QoS and acknowledgements
QoS 0 flow: fire and forget
Publisher Broker Subscriber
│ │ │
│ PUBLISH QoS 0 │ │
├───────────────────►│ │
│ │ PUBLISH QoS 0 │
│ ├──────────────────────►│
│ │ │
Guarantee:
- at most once
- no protocol-level acknowledgement
- fast, lightweight
- messages can be lost
QoS 1 flow: at least once
Publisher Broker
│ │
│ PUBLISH QoS 1 │
├───────────────────►│
│ PUBACK │
◄────────────────────┤
Guarantee:
- at least once
- duplicates possible
- requires consumer-side idempotence
QoS 2 flow: exactly once at protocol level (per hop)
Publisher Broker
│ │
│ PUBLISH QoS 2 │
├───────────────────►│
│ PUBREC │
◄────────────────────┤
│ PUBREL │
├───────────────────►│
│ PUBCOMP │
◄────────────────────┤
Guarantee:
- 4-step exchange
- more expensive
- only worth it when truly justified
Complete ingestion pipeline
Device
│
│ PUBLISH telemetry QoS 1
▼
MQTT broker
│
│ route vers subscribers
▼
Ingestion worker
│
├── validate payload
├── check idempotency key
├── enrich metadata
├── write hot state
├── write historical record
└── trigger alert if needed
Typical architecture with Django / Celery / Redis / PostgreSQL
In a Django backend, MQTT should not be wired directly into HTTP views. A clean architecture isolates MQTT ingestion in a dedicated service or management command, then pushes work to Redis/Celery or to an internal queue.
[MQTT Broker]
│
│ SUBSCRIBE devices/+/telemetry/#
▼
[ingestion_service.py]
│
├── connect MQTT
├── validate topic
├── validate payload schema
├── normalize timestamp
├── enrich with Device/Site metadata
├── build idempotency key
├── push job to Celery / Redis Stream
└── ack / log / retry policy
[Celery Workers]
│
├── write current state to Redis
├── write history to PostgreSQL / TimescaleDB
├── trigger alerting rules
├── notify WebSocket dashboard
└── store failures in admin-visible table
[Django Admin / API]
│
├── devices
├── certificates
├── ACL / topic map
├── ingestion failures
├── replay tools
└── monitoring dashboard
Recommended split of responsibilities
| Module | Responsibility |
|---|---|
| mqtt_client.py | Connection, subscriptions, reconnection, callbacks. |
| topic_router.py | Parses topics and maps them to handlers. |
| schemas.py | Validates payloads and versions. |
| ingestion_service.py | Normalizes, enriches, builds jobs. |
| tasks.py | Writes to the DB, Redis, alerts, WebSocket. |
| models.py | Devices, measurements, states, ingestion errors. |
| admin.py | Tracks devices, errors, latest messages. |
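A minimal, framework-free sketch of what `topic_router.py`, `schemas.py` and `ingestion_service.py` could do together: parse the topic, validate the versioned payload, compute an idempotency key and drop duplicates. The topic layout `devices/{device_id}/telemetry/{metric}` follows this guide's examples; the required fields, `Job`, `IngestionError` and the in-memory `seen` set (a stand-in for a Redis key or a database unique constraint) are assumptions.

```python
import hashlib
import json
import re
from dataclasses import dataclass
from typing import Optional, Set

# Assumed topic layout: devices/{device_id}/telemetry/{metric}
TELEMETRY_TOPIC = re.compile(
    r"devices/(?P<device_id>[A-Za-z0-9_-]+)/telemetry/(?P<metric>[A-Za-z0-9_-]+)")
SUPPORTED_VERSIONS = (1,)
REQUIRED_FIELDS = ("schema_version", "message_id", "ts", "value")


class IngestionError(ValueError):
    """Rejected message (it would be stored in the ingestion-failures table)."""


@dataclass(frozen=True)
class Job:
    device_id: str
    metric: str
    value: float
    ts: str
    idempotency_key: str


def build_job(topic: str, raw: bytes, seen: Set[str]) -> Optional[Job]:
    """Validate one MQTT message; return a Job to enqueue, or None for a duplicate."""
    match = TELEMETRY_TOPIC.fullmatch(topic)  # topic_router.py
    if match is None:
        raise IngestionError(f"unexpected topic: {topic}")
    try:  # schemas.py
        payload = json.loads(raw)
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise IngestionError(f"invalid JSON: {exc}") from exc
    if not isinstance(payload, dict):
        raise IngestionError("payload must be a JSON object")
    missing = [name for name in REQUIRED_FIELDS if name not in payload]
    if missing:
        raise IngestionError(f"missing fields: {missing}")
    if payload["schema_version"] not in SUPPORTED_VERSIONS:
        raise IngestionError(f"unsupported schema_version: {payload['schema_version']}")
    value = payload["value"]
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise IngestionError("value must be a number")
    # ingestion_service.py: device_id comes from the topic, because subscribers
    # never see the publisher's MQTT Client ID.
    device_id = match.group("device_id")
    key_source = "|".join([device_id, topic, str(payload["message_id"]), str(payload["ts"])])
    key = hashlib.sha256(key_source.encode("utf-8")).hexdigest()
    if key in seen:
        return None  # QoS 1 redelivery or replay: already accepted
    seen.add(key)
    return Job(device_id, match.group("metric"), float(value), str(payload["ts"]), key)
```

Here the key is recorded as soon as the job is built. In production, record it (or rely on a unique constraint in PostgreSQL) only once the write has succeeded, so that a failed job can still be retried.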
Idempotency key example
idempotency_key =
sha256(
device_id + "|" +
topic + "|" +
payload.message_id + "|" +
payload.ts
)
device_id comes from the topic or the payload: subscribers never see the publisher's Client ID.
Goal:
- avoid double inserts
- support QoS 1
- replay without corrupting the DB
- trace duplicates
High availability, clustering, bridging and edge architecture
In production the broker is critical. Depending on volume and service level, you can use a single broker, an MQTT cluster, or an edge architecture with a local broker bridged to a central/cloud broker.
| Architecture | Usage | Advantage | Limit |
|---|---|---|---|
| Single broker | POC, lab, small edge. | Simple, quick to deploy. | Single point of failure. |
| Broker + backup | Moderate production. | Failover possible. | Client reconnection must be handled. |
| MQTT cluster | Massive fleet, high SLA. | Scalability, resilience. | Operational complexity. |
| Edge broker + bridge | Factories, remote sites, unstable networks. | Local autonomy. | Buffering, duplicates and conflicts to manage. |
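Whatever the HA option, clients must reconnect gracefully after a failover: if thousands of devices retry at the same instant, they can overload the broker that just came back. A common technique is capped exponential backoff with "full jitter" (a random delay between zero and the current ceiling). This sketch only computes the delays; `reconnect_delays`, `base_s`, `cap_s` and the injected random generator are illustrative assumptions, and client libraries often provide their own setting (paho-mqtt, for instance, exposes `reconnect_delay_set`).

```python
import random
from itertools import islice
from typing import Iterator, Optional


def reconnect_delays(base_s: float = 1.0, cap_s: float = 120.0,
                     rng: Optional[random.Random] = None) -> Iterator[float]:
    """Yield reconnect delays using capped exponential backoff with full jitter.

    Attempt n waits a random time in [0, min(cap_s, base_s * 2**n)], so a fleet
    disconnected at the same moment spreads its reconnections out over time.
    """
    rng = rng or random.Random()
    ceiling = base_s
    while True:
        yield rng.uniform(0.0, min(ceiling, cap_s))
        if ceiling < cap_s:
            ceiling *= 2  # stop growing once the cap is reached


if __name__ == "__main__":
    # Seeded generator only to make the demo repeatable.
    for delay in islice(reconnect_delays(rng=random.Random(7)), 6):
        print(f"wait {delay:.2f}s before reconnecting")
```

Restart the sequence from the first delay after a successful connection; otherwise a device that flaps keeps waiting at the cap.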
Edge + cloud architecture
┌───────────────────────────────┐
│ Local industrial site │
│ │
│ Sensors / PLC / Machines │
│ │ │
│ ▼ │
│ Edge Gateway │
│ │ │
│ ▼ │
│ Local MQTT Broker │
│ │ │
│ ├── Local dashboard │
│ ├── Local rules │
│ └── Bridge MQTT/TLS │
└──────────┼────────────────────┘
│ Internet / VPN / 4G
▼
┌───────────────────────────────┐
│ Central MQTT Broker / Cloud │
│ │ │
│ ├── Data Lake │
│ ├── Time-series DB │
│ ├── Alerting │
│ └── Global dashboard │
└───────────────────────────────┘
MQTT bridge: what to watch
- Topic prefixes between the local and central brokers.
- Avoid bridge loops.
- Limit overly broad wildcards.
- Define the reconnection and buffering policy.
- Authenticate and encrypt inter-site connections.
- Monitor the lag between edge and cloud.
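The first two points (prefixes and loops) can be illustrated with a topic-remapping pair for a hypothetical edge-to-cloud bridge: device topics gain a `sites/{site_id}/` prefix on the way up, commands lose it on the way down, and each direction refuses what the other one produces, so nothing can bounce back and forth. Brokers such as Mosquitto or EMQX configure bridges declaratively; the function names and the prefix scheme here are assumptions for illustration.

```python
from typing import Optional


def to_cloud(local_topic: str, site_id: str) -> Optional[str]:
    """Uplink: map a local topic to the central broker, or None to keep it local."""
    # Forward only the agreed subtree ($SYS and everything else stays local),
    # and never commands: they flow downward only, so sending them up would loop.
    if not local_topic.startswith("devices/") or "/commands/" in local_topic:
        return None
    return f"sites/{site_id}/{local_topic}"


def to_local(cloud_topic: str, site_id: str) -> Optional[str]:
    """Downlink: map a central topic to this site's broker, or None to ignore it."""
    prefix = f"sites/{site_id}/"
    if not cloud_topic.startswith(prefix):
        return None  # traffic for another site
    local_topic = cloud_topic[len(prefix):]
    # Only commands come down; telemetry echoed back would be re-forwarded up.
    if local_topic.startswith("devices/") and "/commands/" in local_topic:
        return local_topic
    return None


if __name__ == "__main__":
    print(to_cloud("devices/gw-001/telemetry/temperature", "paris-01"))
    print(to_local("sites/paris-01/devices/gw-001/commands/reboot", "paris-01"))
```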
MQTT architecture observability
An MQTT architecture without metrics is dangerous: the broker can look healthy while it accumulates sessions, rejects messages, saturates its CPU, loses connections or suffers from an overly expensive wildcard subscription.
| Metric | Why it matters | Possible symptom |
|---|---|---|
| Clients connected | Measures the active fleet. | Sudden drop = network/broker incident. |
| Connect/disconnect rate | Detects instability. | Reconnection loops. |
| Messages in/out per sec | Measures throughput. | Abnormal spike or loss of activity. |
| Dropped messages | Indicates overload or quotas. | Data loss. |
| Queued messages | Measures offline/slow subscribers. | Memory/disk build-up. |
| Auth failures | Security and provisioning errors. | Attack or bad certificates. |
| Subscription count | Impacts routing. | Subscription explosion. |
| Broker CPU/RAM | Infrastructure health. | Saturation, latency, disconnects. |
Recommended MQTT dashboard
MQTT Production Dashboard
├── Broker health
│ ├── CPU / RAM / FD usage
│ ├── uptime
│ └── cluster status
│
├── Clients
│ ├── connected clients
│ ├── connect/disconnect rate
│ ├── top clients by messages
│ └── auth failures
│
├── Traffic
│ ├── messages in/sec
│ ├── messages out/sec
│ ├── bytes in/out
│ └── dropped messages
│
├── Sessions
│ ├── persistent sessions
│ ├── queued messages
│ └── inflight messages
│
└── Topics
├── hottest prefixes
├── wildcard subscriptions
└── retained count
Useful alerts
| Alert | Typical threshold | Action |
|---|---|---|
| Connected clients drop sharply | -30% in 5 minutes | Check network, TLS, broker, DNS. |
| Auth failures increase | Abnormal spike | Audit certificates, look for an attack, check provisioning. |
| Queued messages explode | Continuous increase | Subscriber offline or too slow. |
| Dropped messages | > 0 sustained | Analyze quotas, overload, backpressure. |
| High broker CPU | > 80% sustained | Hot topics, wildcards, cluster, scaling. |
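As a minimal sketch, the numeric thresholds in the table can be evaluated over a 5-minute window of broker samples. The `BrokerSample` fields are assumptions (how they are scraped depends on the broker's metrics endpoint), and `dropped_messages` is assumed to be a per-interval count rather than a cumulative counter.

```python
from dataclasses import dataclass


@dataclass
class BrokerSample:
    """One scrape of broker metrics (field names are assumptions, not a broker API)."""
    connected_clients: int
    dropped_messages: int   # messages dropped during this sampling interval
    cpu_percent: float


def evaluate_alerts(window: list) -> list:
    """Apply the table's numeric thresholds to a 5-minute window of samples (oldest first)."""
    alerts = []
    if len(window) < 2:
        return alerts  # not enough data to talk about a trend
    first, last = window[0], window[-1]
    # "Connected clients drop sharply": -30% or worse over the window
    if first.connected_clients > 0:
        change = (last.connected_clients - first.connected_clients) / first.connected_clients
        if change <= -0.30:
            alerts.append("connected_clients_drop")
    # "Dropped messages > 0 sustained": every interval in the window dropped something
    if all(s.dropped_messages > 0 for s in window):
        alerts.append("dropped_messages_sustained")
    # "High broker CPU > 80% sustained"
    if all(s.cpu_percent > 80 for s in window):
        alerts.append("broker_cpu_high")
    return alerts
```

The "auth failures spike" and "queued messages keep growing" rows need a baseline or a trend model, so they are deliberately left out of this sketch.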
Common architecture pitfalls and design mistakes
| Anti-pattern | Problem | Fix |
|---|---|---|
| A single global topic | Impossible to filter cleanly. | Create a clear business hierarchy. |
| Wildcard # everywhere | Huge load, potential data leak. | Restrict with ACLs and by prefix. |
| Broker used as a DB | Not designed for long-term history. | Store in a DB, time-series store or event log. |
| Random Client ID for a critical device | The session cannot be recovered. | Stable, unique Client ID. |
| QoS 2 everywhere | Unnecessary overhead. | QoS per message type. |
| No backend idempotence | Duplicates with QoS 1. | Message ID, application constraint, deduplication. |
| No topic ACL | A client can read/publish too broadly. | Strict ACLs per device, site, role. |
| Unversioned payload | Breaking changes. | Add schema_version. |
Pre-production architecture checklist
[ ] Broker chosen according to actual volume
[ ] Topics documented and governed
[ ] Publish/subscribe ACLs defined
[ ] TLS enabled unless the network is fully private
[ ] Stable Client IDs for critical devices
[ ] Last Will configured for gateways/devices
[ ] Retained reserved for current state
[ ] QoS chosen per message type
[ ] Backend idempotence in place
[ ] Versioned payloads
[ ] Broker monitoring enabled
[ ] Connect/disconnect alerts configured
[ ] HA or backup strategy defined
[ ] Network outage tests performed
[ ] Reconnection tests performed
[ ] QoS 1 duplication tests performed
Healthy architecture in one sentence
Anti-chaos diagram
Bad:
devices/all/data
└── heterogeneous payloads, no version, no fine-grained ACL
Good:
devices/{site_id}/{device_id}/telemetry/{metric}
devices/{site_id}/{device_id}/status
devices/{site_id}/{device_id}/events/{event_type}
devices/{site_id}/{device_id}/commands/{command_name}
Result:
- simple ACLs
- filterable dashboards
- more robust ingestion
- faster debugging
- cleaner evolution
MQTT at the protocol level
MQTT is a lightweight binary protocol. The client and the broker exchange Control Packets. Every MQTT packet starts with a Fixed Header, which may be followed by a Variable Header and a Payload.
The protocol is designed to be compact, and the broker does not need to understand the business content of the payload. It reads the MQTT metadata (topic, QoS, retain flag, session state) and then routes the messages.
| Layer | Content | Example |
|---|---|---|
| Fixed Header | Packet type, flags, Remaining Length. | PUBLISH, SUBSCRIBE, PINGREQ. |
| Variable Header | Fields that depend on the packet type. | Topic name, packet identifier, MQTT 5 properties. |
| Payload | Application data or list of subscriptions. | Sensor JSON, topic filters, credentials. |
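The Remaining Length field of the Fixed Header is a variable byte integer. Each byte carries 7 bits of value, and its high bit signals that another byte follows. The field is at most 4 bytes long, so the maximum is 268,435,455. The following is a minimal Python sketch of encoding and decoding, written from the specification rather than taken from a client library:

```python
def encode_remaining_length(length: int) -> bytes:
    """Encode the Fixed Header Remaining Length as an MQTT variable byte integer."""
    if not 0 <= length <= 268_435_455:
        raise ValueError("Remaining Length must fit in 4 bytes")
    out = bytearray()
    while True:
        byte = length % 128       # low 7 bits carry the value
        length //= 128
        if length > 0:
            byte |= 0x80          # high bit set: another byte follows
        out.append(byte)
        if length == 0:
            return bytes(out)


def decode_remaining_length(data: bytes) -> tuple:
    """Return (value, bytes_consumed) read from the start of `data`."""
    value, multiplier = 0, 1
    for index, byte in enumerate(data[:4]):
        value += (byte & 0x7F) * multiplier
        if (byte & 0x80) == 0:
            return value, index + 1
        multiplier *= 128
    raise ValueError("malformed or truncated Remaining Length")
```

For example, `encode_remaining_length(321)` gives `b"\xc1\x02"`. Any packet body between 128 and 16,383 bytes therefore costs only two bytes of length prefix.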
MQTT packet diagram
┌──────────────────────────────────────────────────────────┐
│ MQTT Control Packet │
├──────────────────────────────────────────────────────────┤
│ Fixed Header │
│ - Packet Type │
│ - Flags │
│ - Remaining Length │
├──────────────────────────────────────────────────────────┤
│ Variable Header │
│ - Packet Identifier if required │
│ - Topic Name for PUBLISH │
│ - Properties in MQTT 5 │
├──────────────────────────────────────────────────────────┤
│ Payload │
│ - Application data │
│ - Subscription list │
│ - Will payload │
│ - Auth data │
└──────────────────────────────────────────────────────────┘
Minimal full cycle
Client Broker
│ │
│ CONNECT │
├─────────────────────────────►│
│ CONNACK │
◄─────────────────────────────┤
│ │
│ SUBSCRIBE │
├─────────────────────────────►│
│ SUBACK │
◄─────────────────────────────┤
│ │
│ PUBLISH │
├─────────────────────────────►│
│ │
│ PINGREQ / PINGRESP │
├─────────────────────────────►│
◄─────────────────────────────┤
│ │
│ DISCONNECT │
├─────────────────────────────►│
Main MQTT Control Packets
| Packet | Direction | Role | Often contains | Production note |
|---|---|---|---|---|
| CONNECT | Client → Broker | Opens the MQTT connection. | Client ID, keepalive, clean start, auth, will. | Critical point for security and session handling. |
| CONNACK | Broker → Client | Response to CONNECT. | Return code, session present, reason code. | Helps diagnose auth/ACL/version rejections. |
| PUBLISH | Client ↔ Broker | Carries an application message. | Topic, payload, QoS, retain, packet id if QoS > 0. | The central MQTT packet. |
| PUBACK | QoS 1 response | QoS 1 acknowledgement. | Packet ID, MQTT 5 reason code. | Duplicates possible if the ack is lost. |
| PUBREC | QoS 2 response | First step of QoS 2 reception. | Packet ID. | Starts the exactly-once handshake. |
| PUBREL | QoS 2 | Release after PUBREC. | Packet ID. | Intermediate QoS 2 step. |
| PUBCOMP | QoS 2 | End of the QoS 2 handshake. | Packet ID. | Cleans up client/broker state. |
| SUBSCRIBE | Client → Broker | Requests a subscription to topic filters. | Topic filters, requested QoS. | Subject to ACLs and quotas. |
| SUBACK | Broker → Client | Response to SUBSCRIBE. | Granted QoS or failure code. | Important to check that the subscription was actually accepted. |
| UNSUBSCRIBE | Client → Broker | Unsubscription. | List of topic filters. | Worth logging if subscriptions are dynamic. |
| UNSUBACK | Broker → Client | Unsubscription confirmation. | Packet ID, MQTT 5 reason codes. | Useful for client debugging. |
| PINGREQ | Client → Broker | Keepalive. | No payload. | Detects dead connections. |
| PINGRESP | Broker → Client | Keepalive response. | No payload. | If it never arrives, the client should reconnect. |
| DISCONNECT | Client → Broker, or Broker → Client in MQTT 5 | Clean shutdown or notification of closure. | Reason code, MQTT 5 session expiry. | Prevents the Last Will on a voluntary shutdown. |
| AUTH | Client ↔ Broker | MQTT 5 enhanced authentication. | Method, auth data. | Used for advanced mechanisms. |
CONNECT / CONNACK: opening the MQTT session
The CONNECT packet starts the MQTT session. It carries the session parameters, the client identity, the keepalive, the authentication information and, optionally, the Last Will message.
| Field | Role | Example |
|---|---|---|
| Protocol Name / Level | MQTT version in use. | MQTT 3.1.1 or MQTT 5. |
| Client ID | Logical identity of the client. | gateway-paris-001 |
| Clean Start / Clean Session | Whether the session must start fresh. | true / false. |
| Keep Alive | Maximum time the client may stay silent before it must send a ping. | 30s, 60s, 300s. |
| Username / Password | Simple authentication. | Token, technical login. |
| Will Topic / Payload | Message published if the client disappears abruptly. | devices/gw-01/status |
| MQTT 5 Properties | Advanced parameters. | Session Expiry, Receive Maximum, User Properties. |
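Several of these fields are booleans or small integers packed into a single Connect Flags byte, which has the same bit layout in MQTT 3.1.1 and MQTT 5. The sketch below builds that byte. The function and parameter names are illustrative, and it covers only the flags byte, not the full CONNECT packet.

```python
from typing import Optional


def build_connect_flags(clean_start: bool, will_qos: Optional[int] = None,
                        will_retain: bool = False, has_username: bool = False,
                        has_password: bool = False) -> int:
    """Build the CONNECT Connect Flags byte (illustrative helper, not a library API)."""
    flags = 0
    if has_username:
        flags |= 0x80              # bit 7: User Name Flag
    if has_password:
        flags |= 0x40              # bit 6: Password Flag
    if will_qos is not None:       # a Last Will is declared
        if will_qos not in (0, 1, 2):
            raise ValueError("Will QoS must be 0, 1 or 2")
        flags |= 0x04              # bit 2: Will Flag
        flags |= will_qos << 3     # bits 4-3: Will QoS
        if will_retain:
            flags |= 0x20          # bit 5: Will Retain
    elif will_retain:
        raise ValueError("Will Retain requires a Will message")
    if clean_start:
        flags |= 0x02              # bit 1: Clean Start (MQTT 5) / Clean Session (3.1.1)
    return flags                   # bit 0 is reserved and stays 0
```

For example, a gateway with a persistent session, username and password, and a Last Will at QoS 1 with retain gives `0xEC`. The QoS 1 Will and the retain flag are assumptions for the example.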
Connection handshake
Client Broker
│ │
│ CONNECT │
│ client_id = gateway-paris-001 │
│ clean_start = false │
│ keepalive = 60 │
│ will_topic = devices/gw-001/status │
│ username / password or certificate │
├──────────────────────────────────────────►│
│ │
│ Authentication │
│ ACL / quotas │
│ session restore │
│ │
│ CONNACK │
│ success / failure │
│ session_present │
│ reason_code │
◄──────────────────────────────────────────┤
Typical failure causes
| Symptom | Possible cause | Action |
|---|---|---|
| Connection refused | Wrong login, certificate or token. | Check the auth backend and the broker logs. |
| Client disconnected unexpectedly | Client ID already used by another connection (session takeover). | Guarantee Client ID uniqueness. |
| Session not restored | Clean start or session expiry misconfigured. | Check clean start / session expiry. |
| Last Will missing | Will not declared, or retain set to false. | Check CONNECT and the retained status. |
PUBLISH: the central MQTT packet
PUBLISH carries the application message. It contains at least a topic and a payload. Depending on the QoS, it also contains a Packet Identifier. In MQTT 5, it can also carry properties such as Message Expiry, Response Topic, Correlation Data, Payload Format Indicator or User Properties.
| Field | Role | Example |
|---|---|---|
| Topic Name | Logical destination. | factory/fr/site-01/line-3/temp |
| QoS | Delivery guarantee level. | 0, 1, 2. |
| Retain | Keep as the last known value. | true for current state. |
| DUP | Marks a retransmission. | Possible with QoS 1/2. |
| Packet Identifier | Links a publish to its ack. | 42. |
| Payload | Application data. | JSON, binary, Protobuf. |
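To make these fields concrete, the following is a minimal encoder for an MQTT 3.1.1 PUBLISH packet. It omits MQTT 5 properties, which would add a properties block after the packet identifier. It is a sketch for understanding the wire format. In production, a maintained client library such as Paho handles this.

```python
from typing import Optional


def _remaining_length(n: int) -> bytes:
    """Variable byte integer used by the Fixed Header."""
    out = bytearray()
    while True:
        byte, n = n % 128, n // 128
        out.append(byte | 0x80 if n else byte)  # continuation bit if more bytes follow
        if not n:
            return bytes(out)


def encode_publish(topic: str, payload: bytes, qos: int = 0, retain: bool = False,
                   dup: bool = False, packet_id: Optional[int] = None) -> bytes:
    """Encode an MQTT 3.1.1 PUBLISH packet (sketch, no MQTT 5 properties)."""
    if qos not in (0, 1, 2):
        raise ValueError("QoS must be 0, 1 or 2")
    if qos > 0 and (packet_id is None or not 1 <= packet_id <= 65535):
        raise ValueError("QoS 1/2 requires a packet identifier in 1..65535")
    if qos == 0 and (dup or packet_id is not None):
        raise ValueError("QoS 0 has no DUP flag and no packet identifier")
    # Fixed Header byte 1: type 3 (PUBLISH) in the high nibble, then DUP, QoS, RETAIN bits
    first_byte = 0x30 | (int(dup) << 3) | (qos << 1) | int(retain)
    topic_bytes = topic.encode("utf-8")
    # Variable Header: length-prefixed topic name, then packet id only for QoS 1/2
    variable_header = len(topic_bytes).to_bytes(2, "big") + topic_bytes
    if qos > 0:
        variable_header += packet_id.to_bytes(2, "big")
    body = variable_header + payload
    return bytes([first_byte]) + _remaining_length(len(body)) + body
```

For example, `encode_publish("a/b", b"hi", qos=1, retain=True, packet_id=10)` returns `b"\x33\x09\x00\x03a/b\x00\x0ahi"`. Here `0x33` is PUBLISH with QoS 1 and retain set, and `0x09` is the remaining length. The topic is sent with every message, which is why MQTT 5 added Topic Alias for long topics.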
Example of a clean payload
Topic:
factory/fr/paris/site-01/line-03/motor-07/telemetry/temperature
Payload:
{
"schema_version": "1.0",
"message_id": "01HX9F4Z7C3X",
"device_id": "motor-07",
"metric": "temperature",
"value": 72.4,
"unit": "C",
"ts": "2026-04-25T10:00:00Z"
}
Logical structure of PUBLISH
PUBLISH Packet
├── Fixed Header
│ ├── Packet Type = PUBLISH
│ ├── DUP flag
│ ├── QoS level
│ ├── Retain flag
│ └── Remaining Length
│
├── Variable Header
│ ├── Topic Name
│ ├── Packet Identifier if QoS 1 or 2
│ └── Properties if MQTT 5
│
└── Payload
└── Application bytes
Important flags
| Flag | Meaning | Caution |
|---|---|---|
| QoS | Determines the ack mechanism. | The higher the QoS, the higher the cost. |
| Retain | Asks the broker to keep this message. | Reserve it for current state. |
| DUP | The message is a retransmission. | Not sufficient for business-level deduplication. |
QoS 0 / 1 / 2: the three MQTT guarantees
QoS 0: at most once
Publisher Broker
│ │
│ PUBLISH QoS 0 │
├─────────────────────────────────►│
│ │
Characteristics:
- no protocol-level acknowledgement
- no MQTT retransmission
- very fast
- loss possible
- ideal for frequent, non-critical measurements
QoS 1: at least once
Publisher Broker
│ │
│ PUBLISH QoS 1 packet_id=42 │
├─────────────────────────────────►│
│ PUBACK packet_id=42 │
◄─────────────────────────────────┤
If PUBACK is lost:
- the publisher may resend the PUBLISH
- the broker or the subscriber may receive a duplicate
Consequence:
- the backend must be idempotent
QoS 2: exactly once at the protocol level
Publisher Broker
│ │
│ PUBLISH QoS 2 packet_id=77 │
├─────────────────────────────────►│
│ PUBREC packet_id=77 │
◄─────────────────────────────────┤
│ PUBREL packet_id=77 │
├─────────────────────────────────►│
│ PUBCOMP packet_id=77 │
◄─────────────────────────────────┤
Characteristics:
- 4-step handshake
- more state on the client and broker
- more expensive
- only useful for critical messages
QoS decision table
| Message type | Recommended QoS | Reason |
|---|---|---|
| Temperature every 5 seconds | 0 or 1 | A lost reading can be replaced by the next one. |
| Device heartbeat | 0 | Frequent, lightweight, not critical individually. |
| Alarm event | 1 | Must arrive; duplicates are manageable. |
| Machine command | 1 or 2 | Critical, but requires an application-level ack. |
| Unique business write | 1 | Good compromise with backend idempotence. |
commands/.../ack.
SUBSCRIBE / SUBACK: subscriptions and topic filters
The SUBSCRIBE packet lets a client ask the broker for the messages that match one or more topic filters. The broker answers with SUBACK, which indicates the QoS actually granted or a failure.
| Element | Description | Example |
|---|---|---|
| Topic Filter | Subscription filter, with or without wildcards. | factory/+/+/temperature |
| Requested QoS | QoS requested by the subscriber. | 0, 1, 2. |
| Granted QoS | QoS actually granted by the broker. | May be lower. |
| Subscription ID | MQTT 5 identifier used to trace the subscription. | Useful for observability. |
| Shared Subscription | Load balancing between subscribers. | $share/group/factory/# |
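The matching rules for topic filters fit in a few lines. `+` matches exactly one level. `#` matches the parent level and everything below it, and must be the last level. Topics starting with `$` (such as `$SYS`) are not matched by a leading wildcard. For shared subscriptions, the `$share/{group}/` prefix is stripped before matching. This is an illustrative function, not a broker's implementation, and it assumes the filter has already been validated.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a topic filter ('+' and '#' wildcards)."""
    if topic_filter.startswith("$share/"):
        # $share/{group}/{filter}: the group only selects the worker, not the topics
        parts = topic_filter.split("/", 2)
        if len(parts) < 3:
            raise ValueError("invalid shared subscription filter")
        topic_filter = parts[2]
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    # '$' topics (e.g. $SYS) are not matched by a filter starting with a wildcard
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' must be the last level; it also matches the parent level itself
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)
```

With the classic example that follows, `factory/fr/paris/site-01/+/temperature` matches `factory/fr/paris/site-01/line-03/temperature`. It does not match `.../line-03/motor-07/temperature`, because `+` covers exactly one level.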
Classic example
SUBSCRIBE
topic_filter = factory/fr/paris/site-01/+/temperature
requested_qos = 1
SUBACK
granted_qos = 1
Result:
The client receives every temperature under site-01,
exactly one level below site-01.
Shared subscriptions
Without a shared subscription:
Broker
├── message A -> worker-1
├── message A -> worker-2
└── message A -> worker-3
Each worker receives the same message.
With a shared subscription:
$share/ingestion-workers/factory/+/telemetry
Broker
├── message A -> worker-1
├── message B -> worker-2
├── message C -> worker-3
└── message D -> worker-1
Messages are distributed across the group (the distribution strategy, such as round-robin, random or hash-based, depends on the broker).
Backend usage
| Case | Subscription type | Why |
|---|---|---|
| Global dashboard | Regular subscription | It must receive every relevant message. |
| Scalable ingestion workers | Shared subscription | Load distribution. |
| Security audit | Filtered subscription | Avoid reading too broadly. |
| Temporary debugging | Restricted wildcard | Never leave # in production without control. |
PINGREQ / PINGRESP: keepalive and dead-connection detection
MQTT keeps a connection open. To avoid connections that die silently, the client announces a Keep Alive value in CONNECT. If the client sends no other packet during that period, it sends a PINGREQ, and the broker answers with PINGRESP. On the broker side, if nothing is received from the client for one and a half times the Keep Alive period, the broker closes the connection.
CONNECT keepalive=60
For 60 seconds:
- if the client sends a PUBLISH, SUBSCRIBE or any other packet,
no PINGREQ is needed.
If silent:
Client -> PINGREQ
Broker -> PINGRESP
If no PINGRESP arrives:
The client considers the connection dead and reconnects.
| Keepalive | Usage | Trade-off |
|---|---|---|
| 10-30s | Fast detection. | Higher network/battery cost. |
| 60s | Common value. | Good general compromise. |
| 300s+ | Constrained devices or expensive networks. | Slower detection. |
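The client-side logic can be sketched as a small state machine driven by explicit timestamps, which makes it easy to test without a network. The PINGRESP timeout is an assumption, since client libraries expose their own setting. The broker-side deadline of 1.5 × Keep Alive comes from the specification.

```python
from typing import Optional


class KeepAliveTracker:
    """Client-side keepalive bookkeeping; timestamps (seconds) are passed in explicitly."""

    def __init__(self, keepalive_s: float, pingresp_timeout_s: float = 10.0):
        self.keepalive_s = keepalive_s                # value announced in CONNECT
        self.pingresp_timeout_s = pingresp_timeout_s  # assumed client setting
        self.last_sent = 0.0
        self.ping_sent_at: Optional[float] = None

    def on_packet_sent(self, now: float) -> None:
        """Any control packet (PUBLISH, SUBSCRIBE, ...) resets the keepalive timer."""
        self.last_sent = now

    def on_pingresp(self) -> None:
        self.ping_sent_at = None

    def tick(self, now: float) -> str:
        """Return 'ok', 'send_pingreq' or 'reconnect'."""
        if self.keepalive_s == 0:
            return "ok"  # a Keep Alive of 0 disables the mechanism
        if self.ping_sent_at is not None:
            waited = now - self.ping_sent_at
            return "reconnect" if waited > self.pingresp_timeout_s else "ok"
        if now - self.last_sent >= self.keepalive_s:
            self.ping_sent_at = now
            self.last_sent = now  # the PINGREQ itself is a sent packet
            return "send_pingreq"
        return "ok"


def broker_close_deadline(last_received: float, keepalive_s: float) -> float:
    """Broker side: with no packet for 1.5 x Keep Alive, the broker closes the connection."""
    return last_received + 1.5 * keepalive_s
```

With `keepalive=60` and silence since t=0, `tick(60)` asks for a PINGREQ. If no PINGRESP has arrived by t=71, `tick(71)` returns `"reconnect"`.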
Keepalive diagram
Client Broker
│ │
│ CONNECT keepalive=60 │
├──────────────────────────────────────►│
│ CONNACK │
◄──────────────────────────────────────┤
│ │
│ ... no traffic for 60s ... │
│ │
│ PINGREQ │
├──────────────────────────────────────►│
│ PINGRESP │
◄──────────────────────────────────────┤
│ │
│ Connection considered alive │
Keepalive vs Last Will
| Mechanism | Purpose | Example |
|---|---|---|
| Keepalive | Check that the MQTT connection is alive. | PINGREQ / PINGRESP. |
| Last Will | Publish a message if the client disappears abruptly. | {"state":"offline"} |
| Application heartbeat | Check that the application/device is actually working. | devices/gw-1/heartbeat |
DISCONNECT: clean shutdown and error signaling
DISCONNECT closes an MQTT session cleanly. When a client sends DISCONNECT, the broker knows the closure is voluntary and does not publish the Last Will. The MQTT 5 exception is the reason code Disconnect with Will Message. In MQTT 5, DISCONNECT can also carry a reason code and properties.
| Case | Effect | Last Will? |
|---|---|---|
| Client sends DISCONNECT | Clean shutdown. | No. |
| Client crashes | Broker detects a timeout or a closed socket. | Yes, if configured. |
| Broker closes on a protocol error | Forced disconnection. | Generally yes, if configured. |
| MQTT 5 server disconnect | Broker can send a reason code. | Clearer diagnosis. |
Logical example
Case 1: voluntary shutdown
Client:
PUBLISH devices/gw-001/status {"state":"shutting_down"} retain=true
DISCONNECT
Broker:
closes cleanly
does not publish the Last Will
Case 2: abrupt crash
Client:
disappears without DISCONNECT
Broker:
detects the loss
publishes the offline Last Will
MQTT 5 DISCONNECT reason codes
| Reason | Meaning | Action |
|---|---|---|
| Normal disconnection | Voluntary shutdown. | No error. |
| Disconnect with Will Message | Asks the broker to publish the Will. | Advanced MQTT 5 case. |
| Protocol Error | The client violates the protocol. | Fix the library/client. |
| Receive Maximum exceeded | Client/broker saturated with inflight messages. | Reduce throughput or adjust flow control. |
| Topic Alias invalid | Incorrect alias. | Fix MQTT 5 alias handling. |
| Quota exceeded | Broker limit reached. | Check quotas, sessions, rate limits. |
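In logs and packet captures, MQTT 5 reason codes appear as numbers. Below is a minimal lookup for the codes in the table. The numeric values come from the MQTT 5.0 specification, while the action texts are this guide's suggestions. It relies on the rule that codes below 0x80 indicate success and codes of 0x80 or more indicate an error.

```python
# MQTT 5 DISCONNECT reason codes listed in the table above (values from the MQTT 5.0 spec)
DISCONNECT_REASONS = {
    0x00: ("Normal disconnection", "no error"),
    0x04: ("Disconnect with Will Message", "expected: the Will is published"),
    0x82: ("Protocol Error", "fix the client library"),
    0x93: ("Receive Maximum exceeded", "reduce throughput or tune flow control"),
    0x94: ("Topic Alias invalid", "fix topic alias handling"),
    0x97: ("Quota exceeded", "check quotas, sessions, rate limits"),
}


def triage_disconnect(reason_code: int) -> str:
    """Turn a DISCONNECT reason code into a one-line log message for the runbook."""
    name, action = DISCONNECT_REASONS.get(reason_code, ("Unknown reason", "check broker logs"))
    severity = "info" if reason_code < 0x80 else "error"  # >= 0x80 means failure
    return f"[{severity}] 0x{reason_code:02X} {name}: {action}"
```

For example, `triage_disconnect(0x97)` yields `[error] 0x97 Quota exceeded: check quotas, sessions, rate limits`.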
MQTT 5: the improvements that really matter
| MQTT 5 feature | Why it is useful | Field example |
|---|---|---|
| Reason Codes | Precise diagnostics. | SUBSCRIBE rejected because of an ACL. |
| User Properties | Key/value metadata. | tenant_id, trace_id, firmware. |
| Message Expiry | Avoids delivering stale data. | Command valid for 30 seconds. |
| Session Expiry | Fine-grained control of session persistence. | Keep an offline session for 1h. |
| Topic Alias | Reduces overhead on long topics. | Long industrial topic replaced by a numeric alias. |
| Receive Maximum | Flow control: caps unacknowledged QoS 1/2 messages in flight. | Avoid overwhelming a slow subscriber. |
| Maximum Packet Size | Protects the broker/client. | Reject huge payloads. |
| Response Topic | Request/response pattern on top of MQTT. | Device command with a dedicated reply topic. |
| Correlation Data | Correlates request and response. | Command/ack tracing. |
| Subscription Identifier | Traces which subscription delivered the message. | Multi-subscription debugging. |
MQTT 5 example: command with expiry
PUBLISH
topic = devices/gw-001/commands/reboot
qos = 1
payload = {
"command_id": "cmd-20260425-001",
"action": "reboot"
}
MQTT 5 properties:
message_expiry_interval = 30
response_topic = devices/gw-001/commands/ack
correlation_data = cmd-20260425-001
user_properties:
tenant_id = customer-42
trace_id = trace-abc-789
Effect:
- if the message has not been delivered to the gateway within 30 seconds, the broker discards it as expired
- the response comes back on a known topic
- the backend correlates command and ack
MQTT 3.1.1 vs MQTT 5
| Aspect | MQTT 3.1.1 | MQTT 5 |
|---|---|---|
| Error diagnostics | Limited. | Detailed reason codes. |
| Session persistence | Clean Session boolean. | Clean Start + Session Expiry. |
| Metadata | In the payload. | User Properties. |
| Message expiry | Not standard. | Message Expiry. |
| Flow control | Coarser. | Receive Maximum, Maximum Packet Size. |
Wire debugging: reading MQTT exchanges like a backend engineer
Normal scenario
[CONNECT]
client_id=gateway-paris-001
keepalive=60
clean_start=false
will=devices/gateway-paris-001/status
[CONNACK]
success=true
session_present=true
[SUBSCRIBE]
topic=devices/gateway-paris-001/commands/#
qos=1
[SUBACK]
granted_qos=1
[PUBLISH]
topic=devices/gateway-paris-001/status
retain=true
qos=1
payload={"state":"online"}
[PUBACK]
packet_id=10
[PINGREQ]
[PINGRESP]
[DISCONNECT]
reason=normal
What to check first
| Problem | Packet to inspect | Hypothesis |
|---|---|---|
| Client cannot connect | CONNECT / CONNACK | TLS, auth, version, Client ID. |
| Messages not received | SUBSCRIBE / SUBACK | ACL, topic filter, QoS refused. |
| Duplicates | PUBLISH / PUBACK | QoS 1, lost ack, retry. |
| Regular disconnections | PINGREQ / PINGRESP | Keepalive, network, broker timeout. |
| Offline device not detected | CONNECT Will + keepalive | LWT missing or misconfigured. |
MQTT debugging runbook
1. Check the TCP/TLS connection
   - port 1883 / 8883
   - certificate
   - DNS
   - firewall
2. Check CONNECT
   - unique Client ID
   - keepalive
   - clean start / session expiry
   - username/password/token
   - Last Will
3. Read CONNACK
   - success
   - reason code
   - session present
4. Check SUBSCRIBE/SUBACK
   - exact topic filter
   - correct wildcards
   - granted QoS
   - subscribe ACL
5. Check PUBLISH
   - exact topic
   - QoS
   - retain
   - valid payload
   - packet id if QoS 1/2
6. Check the ACKs
   - PUBACK for QoS 1
   - PUBREC/PUBREL/PUBCOMP for QoS 2
7. Check keepalive
   - PINGREQ/PINGRESP
   - timeouts
   - reconnect loops
8. Check the backend
   - idempotence
   - payload validation
   - ingestion logs
   - DB errors
Useful Mosquitto commands
Debug subscription:
mosquitto_sub -h broker.example.com -p 8883 \
--cafile ca.crt \
-u user -P password \
-t 'factory/fr/#' -v
Test publish:
mosquitto_pub -h broker.example.com -p 8883 \
--cafile ca.crt \
-u user -P password \
-t 'factory/fr/test/device-1/status' \
-m '{"state":"online"}' \
-q 1 -r
Local debugging:
mosquitto_sub -h localhost -t '#' -v
-t '#' is useful for local debugging, but dangerous in production if the broker carries a large volume of messages or sensitive data.
MQTT QoS: what it really guarantees
MQTT QoS defines the delivery guarantee between two MQTT participants: first between a client and the broker, then between the broker and the subscribers. The QoS actually delivered to a subscriber is the lower of the published QoS and the QoS granted for its subscription. QoS does not automatically guarantee that your application processing, SQL write, API call or business workflow runs exactly once.
| QoS | Name | MQTT guarantee | Cost | Main risk | Typical use |
|---|---|---|---|---|---|
| 0 | At most once | At most once. | Very low. | Possible loss. | Frequent measurements, heartbeat, GPS, temperature. |
| 1 | At least once | At least once. | Medium. | Possible duplicates. | Important events, alarms, states, traceable commands. |
| 2 | Exactly once | Exactly once at the MQTT level. | High. | Latency, state, complexity. | Rare and critical messages, if broker/client support is solid. |
Guarantees diagram
QoS 0: fast, no confirmation
Device ──PUBLISH──► Broker ──PUBLISH──► Subscriber
loss possible if the network, broker or client is unavailable
QoS 1: simple confirmation, duplicates possible
Device ──PUBLISH──► Broker
Device ◄─PUBACK──── Broker
If PUBACK is lost:
Device may resend the same message
QoS 2: full handshake
Device ──PUBLISH──► Broker
Device ◄─PUBREC──── Broker
Device ──PUBREL───► Broker
Device ◄─PUBCOMP─── Broker
Questions to ask before choosing
1. Is it a problem if the message is lost?
2. Is it a problem if the message arrives twice?
3. Is it a problem if the message arrives late?
4. Can the consumer deduplicate?
5. Is the message frequent or rare?
6. Is the message a measurement, a state, an event or a command?
7. Is the network stable?
8. Do the broker and the clients support QoS 2 well?
9. Is a separate application-level ack needed?
10. Does the history need to be stored durably?
QoS 0: “at most once”, ultra lightweight but loss is possible
With QoS 0, the client sends a PUBLISH packet and waits for no acknowledgement. It is the fastest and cheapest mode. In return, the message may be lost if the network drops, if the broker is unavailable or if the subscriber is not connected.
| Aspect | QoS 0 |
|---|---|
| Guarantee | At most once. |
| MQTT ack | None. |
| Latency | Very low. |
| Broker load | Low. |
| Duplicates | Not caused by the protocol, but possible if the application republishes. |
| Loss | Possible. |
Good uses of QoS 0
- Temperature sent every 5 or 10 seconds.
- Very frequent GPS position.
- High-frequency heartbeat.
- Non-critical observability metrics.
- Data quickly superseded by a more recent measurement.
QoS 0 flow
Publisher Broker
│ │
│ PUBLISH QoS 0 │
├─────────────────────────────────►│
│ │
│ No PUBACK │
│ No MQTT retry │
│ │
Consequence:
- fast
- simple
- no delivery guarantee
- no inflight state to maintain
Decision example
Flow:
devices/gw-001/telemetry/temperature
Frequency:
1 message every 5 seconds
Criticality:
low to medium
History:
compressed time-series
Choice:
QoS 0 if loss is acceptable
QoS 1 if every reading must be auditable
QoS 1: “at least once”, reliable but duplicates are possible
QoS 1 is often the best compromise in production. The publisher sends PUBLISH and the receiver answers with PUBACK. If the acknowledgement does not arrive, the message may be retransmitted. This improves reliability but introduces a major constraint: the consumer must tolerate duplicates.
| Aspect | QoS 1 |
|---|---|
| Guarantee | At least once. |
| MQTT ack | PUBACK. |
| Latency | Medium. |
| Broker load | Medium. |
| Duplicates | Possible. |
| Backend requirement | Idempotence or deduplication. |
Good uses of QoS 1
- Machine alarm.
- Business event.
- Online/offline state change.
- Important configuration message.
- Non-destructive command that must be traced.
- Measurements that must be kept for audit.
QoS 1 flow
Publisher Broker
│ │
│ PUBLISH packet_id=42 QoS 1 │
├─────────────────────────────────►│
│ │
│ PUBACK packet_id=42 │
◄─────────────────────────────────┤
Problematic case:
1. Broker receives PUBLISH
2. Broker processes the message
3. PUBACK is lost on the network
4. Publisher resends PUBLISH with the DUP flag set
5. The backend may see the same event again
Backend implication
Recommended payload:
{
"message_id": "01HX9F4Z7C3X",
"device_id": "gw-001",
"sequence": 18422,
"metric": "temperature",
"value": 72.4,
"ts": "2026-04-25T10:00:00Z"
}
Deduplication:
UNIQUE(device_id, sequence)
or
UNIQUE(message_id)
QoS 2: MQTT “exactly once”, powerful but expensive
QoS 2 uses a four-step exchange to avoid duplicate delivery at the MQTT level. It is the protocol's strongest guarantee, but also the most expensive in latency, client/broker state and operational complexity.
| Aspect | QoS 2 |
|---|---|
| Guarantee | Exactly once at the MQTT level. |
| Exchanges | PUBLISH, PUBREC, PUBREL, PUBCOMP. |
| Latency | Higher. |
| Broker load | Heavier. |
| Inflight state | Larger. |
| Use | Rare, critical, well-controlled messages. |
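The extra state is easiest to see on the receiver side. One approach the specification allows is for the receiver to store the packet identifier and deliver the message once. It then keeps answering PUBREC to any retransmission until PUBREL arrives. The following is a minimal sketch of that approach, with illustrative class and attribute names. It only removes duplicates at the MQTT level, not those created by application retries.

```python
class QoS2Receiver:
    """Receiver side of the QoS 2 handshake (store-packet-id approach, sketch)."""

    def __init__(self):
        self.awaiting_pubrel = set()   # packet ids answered with PUBREC, not yet released
        self.delivered = []            # what the application actually received

    def on_publish(self, packet_id: int, payload: bytes):
        if packet_id not in self.awaiting_pubrel:
            # First time this packet id is seen: deliver once and remember it
            self.awaiting_pubrel.add(packet_id)
            self.delivered.append(payload)
        # A retransmission (DUP=1 after a lost PUBREC) is not delivered again, only re-acked
        return ("PUBREC", packet_id)

    def on_pubrel(self, packet_id: int):
        # The sender will not resend this PUBLISH any more: the packet id can be reused
        self.awaiting_pubrel.discard(packet_id)
        return ("PUBCOMP", packet_id)
```

Every in-flight QoS 2 message keeps an entry in `awaiting_pubrel` until PUBREL arrives. This is the "larger inflight state" in the table, multiplied by the number of connected clients on the broker.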
Detailed QoS 2 flow
Publisher Broker
│ │
│ PUBLISH packet_id=77 QoS 2 │
├─────────────────────────────────►│
│ │
│ PUBREC packet_id=77 │
◄─────────────────────────────────┤
│ │
│ PUBREL packet_id=77 │
├─────────────────────────────────►│
│ │
│ PUBCOMP packet_id=77 │
◄─────────────────────────────────┤
Purpose:
- avoid duplicate MQTT delivery
- keep publisher and broker state in sync
- clean up state after PUBCOMP
When should you use QoS 2?
| Case | Is QoS 2 justified? | Comment |
|---|---|---|
| Rare, critical command | Possibly | But add a business-level ack. |
| High-frequency telemetry | No | Too expensive. |
| Safety alarm | Sometimes | QoS 1 + audit may be enough. |
| Retained state | Rarely | QoS 1 is usually enough. |
| Backend already idempotent | Not necessarily | QoS 1 may perform better. |
QoS decision matrix by flow type
| Flow | Frequency | Criticality | Loss acceptable? | Duplicate acceptable? | Recommended QoS | Recommended storage |
|---|---|---|---|---|---|---|
| Room temperature | 10s - 60s | Low | Yes | Yes | QoS 0 or 1 | Compressed time-series. |
| Motor vibration | 1s - 10s | Medium | Depends on context | Yes with dedup | QoS 0/1 | Time-series + aggregation. |
| Online/offline presence | On change | Medium | Not desired | Yes | QoS 1 + retained + LWT | Redis state + history. |
| Safety alarm | Event | High | No | Yes if idempotent | QoS 1 + application ack | Events table + audit. |
| Gateway reboot command | Rare | High | No | No without command_id | QoS 1/2 + command_id | Command journal. |
| Machine stop command | Very rare | Very high | No | No | QoS 1/2 + strong business ack | Command journal + audit + operator validation. |
| Firmware logs | Batch | Low | Sometimes | Yes | QoS 0/1 | Blob/log pipeline. |
| Device configuration | Rare | High | No | No without version | QoS 1 + retained if current state | Versioned config store. |
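The reasoning behind this matrix can be sketched as a function that mirrors the simple decision tree that follows. Checking deduplication before criticality is an assumption of this sketch. The output is a starting point to confirm against the matrix, not a rule.

```python
def recommend_qos(loss_acceptable: bool, backend_dedup: bool, rare_and_critical: bool) -> str:
    """Encode the simple QoS decision tree (sketch; question order is an assumption)."""
    if loss_acceptable:
        return "QoS 0"
    if not backend_dedup:
        # Without idempotence, QoS 1 retries (and even QoS 2 app-level retries) create duplicates
        return "add idempotence before production"
    if rare_and_critical:
        return "QoS 1/2 + application ack"
    return "QoS 1"
```

For example, the "Safety alarm" row (loss not acceptable, idempotent backend) with a rare, critical message gives `"QoS 1/2 + application ack"`. The matrix itself leans towards QoS 1 plus an application ack for that row.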
Simple decision tree
Can the message be lost?
├── Yes
│   └── QoS 0 possible
│
└── No
    │
    ├── Can the backend deduplicate?
    │   ├── Yes -> QoS 1 recommended
    │   └── No -> add idempotence before production
    │
    └── Rare and critical message?
        ├── Yes -> QoS 1/2 + application ack
        └── No -> QoS 1 is often enough
Reading by cost
Approximate cost per hop (publisher → broker, then again broker → subscriber):
QoS 0 = 1 PUBLISH packet
QoS 1 = PUBLISH + PUBACK
QoS 2 = PUBLISH + PUBREC + PUBREL + PUBCOMP
As QoS goes up:
- latency increases
- inflight state grows
- the broker does more work
- backlog effects become more noticeable
Duplication: why it happens and how to handle it
Duplication is not an exceptional bug; it is part of the “at least once” delivery model. A message can be sent several times if an acknowledgement is lost, if the client reconnects, if a session resumes or if the backend replays a buffer.
| Cause | Example | Fix |
|---|---|---|
| Lost PUBACK | Publisher resends the QoS 1 message. | Deduplication by message_id. |
| Client reconnect | Client republishes its local buffer. | Per-device sequence number. |
| Backend worker crash | Job resumed after a partial write. | Transaction + unique key. |
| MQTT bridge | Message received both locally and in the cloud. | source_id + bridge_id + message_id. |
| Application retry | HTTP/DB temporarily unavailable. | Outbox / inbox pattern. |
The DUP flag is not enough
The DUP flag marks an MQTT retransmission. It must not be the only business-level deduplication mechanism, because duplicates can come from other layers: local buffer, bridge, worker, DB retry, application restart.
Timeline of a QoS 1 duplicate
1. Device publishes:
PUBLISH packet_id=42 message_id=A
2. Broker receives the message.
3. Broker forwards it to the subscriber.
4. Subscriber writes to the database.
5. PUBACK (or the ack on the return path) is lost.
6. Device retransmits:
PUBLISH packet_id=42 DUP=1 message_id=A
7. Backend sees the same message again.
Without idempotence:
double insertion
With idempotence:
message recognized as already processed
Anti-duplicate strategies
Good candidate keys:
- global message_id in the payload
- device_id + sequence_number
- device_id + metric_name + timestamp
- command_id for commands
- MQTT 5 user property trace_id
Storage:
- inbox_mqtt_message table
- unique application constraint
- status processed / failed / ignored
- first_seen / last_seen timestamps
- duplicate_count
Backend idempotence: making messages safe to reprocess
Idempotence means that processing the same message several times produces the same final state as processing it once. It is essential with QoS 1 and strongly recommended even with QoS 2, because retries can also happen outside MQTT.
Conceptual model
MQTT message
│
▼
Validate schema
│
▼
Build idempotency_key
│
▼
Check inbox table / cache
│
├── already processed
│ └── ignore or update duplicate_count
│
└── new message
├── process in transaction
├── write target table
├── mark as processed
└── commit
Idempotency key candidates
| Key | Best suited for | Risk |
|---|---|---|
| message_id | Unique events. | Must be generated correctly. |
| device_id + sequence | Ordered device flow. | Counter resets must be handled. |
| device_id + metric + ts | Time-series. | Collision if several readings share a timestamp. |
| command_id | Commands. | Must be enforced by the backend. |
| MQTT 5 trace_id | Distributed tracing. | Not available in MQTT 3.1.1. |
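The processing code further down in this section calls a `build_idempotency_key` helper that is not defined. One hypothetical version picks the most reliable key available, following the order of the table with `command_id` first. That ordering is an assumption, as is the premise that `message_id` is globally unique. The chosen key is then hashed to a fixed 64-character string so the indexed column stays short.

```python
import hashlib
import json


def build_idempotency_key(topic: str, data: dict, client_id: str) -> str:
    """Hypothetical helper: choose the most reliable key, then hash it to 64 hex characters."""
    if "command_id" in data:
        raw = f"cmd:{data['command_id']}"
    elif "message_id" in data:
        raw = f"msg:{data['message_id']}"  # assumed globally unique
    elif "device_id" in data and "sequence" in data:
        raw = f"seq:{data['device_id']}:{data['sequence']}"
    elif all(k in data for k in ("device_id", "metric", "ts")):
        raw = f"ts:{data['device_id']}:{data['metric']}:{data['ts']}"
    else:
        # Last resort: same client, same topic and same content give the same key
        canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
        raw = f"raw:{client_id}:{topic}:{canonical}"
    # A fixed-length digest keeps the unique index small (fits max_length=128)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()
```

A retransmitted QoS 1 message with the same `message_id` produces the same key even if it arrives through a different client (for example via a bridge). The inbox's unique constraint then rejects it.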
Django example: inbox table
from django.db import models


class MQTTInboxMessage(models.Model):
    # Short, indexed key (e.g. a hash), never the raw payload or a long topic
    idempotency_key = models.CharField(max_length=128)
    topic = models.CharField(max_length=512)
    client_id = models.CharField(max_length=128)
    payload_hash = models.CharField(max_length=64)
    status = models.CharField(max_length=32, default="pending")
    first_seen_at = models.DateTimeField(auto_now_add=True)
    last_seen_at = models.DateTimeField(auto_now=True)
    duplicate_count = models.IntegerField(default=0)
    error_message = models.TextField(blank=True, default="")

    class Meta:
        constraints = [
            # The database, not application code, guarantees uniqueness
            models.UniqueConstraint(
                fields=["idempotency_key"],
                name="uniq_mqtt_inbox_idempotency_key",
            )
        ]
Processing code
def handle_mqtt_message(topic, payload, client_id):
data = validate_payload(payload)
key = build_idempotency_key(topic, data, client_id)
with transaction.atomic():
inbox, created = MQTTInboxMessage.objects.get_or_create(
idempotency_key=key,
defaults={
"topic": topic,
"client_id": client_id,
"payload_hash": sha256(payload),
"status": "pending",
}
)
if not created:
inbox.duplicate_count += 1
inbox.save(update_fields=["duplicate_count", "last_seen_at"])
return "duplicate_ignored"
write_business_data(data)
inbox.status = "processed"
inbox.save(update_fields=["status"])
return "processed"idempotency_key, pas un payload ou un topic énorme.Backpressure : quand MQTT reçoit plus vite que le backend ne traite
Backpressure occurs when publishers send faster than consumers, workers or databases can absorb. Depending on sessions and QoS, the broker can buffer some messages, but that buffer is finite. You need an explicit strategy.
| Symptom | Probable cause | Action |
|---|---|---|
| Queued messages keep growing | Subscriber offline or slow. | Scale consumers, use shared subscriptions. |
| High broker CPU | Hot topics, wildcards, too many acks. | Reduce wildcards, partition flows, cluster. |
| Saturated DB | Writes too frequent. | Batch writes, Redis/Kafka buffer, aggregation. |
| Ingestion latency | Not enough workers. | Autoscaling, profiling, separate queue. |
| Expired messages | Consumption too slow. | Prioritize critical flows, drop low-priority ones. |
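The "batch writes" action can be sketched as a small micro-batcher that flushes either on size or on age. `MicroBatcher`, `max_size` and `max_age_s` are hypothetical names. The flush callback stands in for whatever bulk write you actually use, such as a multi-row INSERT, a COPY or a Redis pipeline.

```python
import time
from typing import Callable, List, Optional


class MicroBatcher:
    """Collects rows and hands them to `flush` in batches.

    A batch is flushed when `max_size` rows are pending or when the oldest
    pending row is at least `max_age_s` seconds old.
    """

    def __init__(
        self,
        flush: Callable[[List[dict]], None],
        max_size: int = 500,
        max_age_s: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._flush = flush
        self._max_size = max_size
        self._max_age_s = max_age_s
        self._clock = clock
        self._rows: List[dict] = []
        self._oldest: Optional[float] = None

    def add(self, row: dict) -> None:
        if not self._rows:
            self._oldest = self._clock()
        self._rows.append(row)
        self.maybe_flush()

    def maybe_flush(self) -> None:
        """Also call this from a periodic timer so quiet periods still flush."""
        if not self._rows:
            return
        too_big = len(self._rows) >= self._max_size
        too_old = self._clock() - self._oldest >= self._max_age_s
        if too_big or too_old:
            self.flush()

    def flush(self) -> None:
        if self._rows:
            batch, self._rows, self._oldest = self._rows, [], None
            self._flush(batch)  # e.g. one multi-row INSERT or COPY
```

`flush()` detaches the batch before calling the callback. If the write fails, those rows are lost unless the callback re-queues them, which a real pipeline would have to handle.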
Useful mechanisms
- Receive Maximum (MQTT 5) to limit in-flight messages.
- Shared subscriptions to spread load across workers.
- Redis Streams / Kafka as a durable backend-side buffer.
- Batched database writes to reduce SQL cost.
- Prioritization between critical alarms and secondary metrics.
- Message Expiry to avoid processing stale commands.
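Prioritization and low-priority dropping can be combined in a bounded in-memory buffer that sheds the least important messages first. The `PRIORITY` mapping, the capacity and the `PriorityBuffer` name are assumptions. In production this logic usually sits in front of a durable buffer such as Redis Streams or Kafka, not in place of one.

```python
from collections import deque
from typing import Deque, Dict, Optional, Tuple

# Hypothetical priorities: a lower number means more important.
PRIORITY = {"alarm": 0, "event": 1, "status": 2, "telemetry": 3}
LOWEST = max(PRIORITY.values())


class PriorityBuffer:
    """Bounded buffer that sheds the least important messages first."""

    def __init__(self, capacity: int) -> None:
        if capacity < 1:
            raise ValueError("capacity must be >= 1")
        self.capacity = capacity
        self.queues: Dict[int, Deque[Tuple[str, bytes]]] = {
            p: deque() for p in sorted(set(PRIORITY.values()))
        }
        self.dropped = 0

    def __len__(self) -> int:
        return sum(len(q) for q in self.queues.values())

    def put(self, stream: str, topic: str, payload: bytes) -> bool:
        """Enqueue a message; return False if it was dropped instead."""
        prio = PRIORITY.get(stream, LOWEST)  # unknown streams: least important
        if len(self) >= self.capacity:
            # Least important priority level that currently holds messages.
            victim = max(p for p, q in self.queues.items() if q)
            if victim < prio:
                self.dropped += 1  # everything queued matters more: reject
                return False
            self.queues[victim].popleft()  # evict that level's oldest message
            self.dropped += 1
        self.queues[prio].append((topic, payload))
        return True

    def get(self) -> Optional[Tuple[str, bytes]]:
        """Return the oldest message of the most important non-empty level."""
        for p in sorted(self.queues):
            if self.queues[p]:
                return self.queues[p].popleft()
        return None
```

When the buffer is full, an incoming alarm evicts the oldest telemetry message. An incoming telemetry message is rejected if only more important messages are queued.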
Architecture with a buffer
MQTT Broker
│
│ shared subscription
▼
Ingestion Workers
│
├── validate
├── deduplicate
└── push to buffer
│
▼
Redis Stream / Kafka
│
▼
Processing Workers
│
├── batch write PostgreSQL
├── update Redis hot state
├── trigger alerts
└── push WebSocket dashboard
Degradation strategy
Under overload:
1. protect critical alarms
2. slow down or reject non-critical metrics
3. add consumer workers
4. batch DB writes
5. lower QoS on non-critical flows
6. increase broker capacity if needed
7. monitor queue depth and lag
8. expose a degraded state in the admin
Critical commands: QoS is not enough, you need a business-level ack
For critical commands, MQTT QoS confirms protocol-level delivery, not business execution. A command can be delivered to the device and still fail on the machine because of insufficient rights, a local safety interlock, an incompatible state, a timeout, or because an operator is required.
Command / ack pattern
Backend publishes:
devices/gw-001/commands/reboot
Payload:
{
"command_id": "cmd-20260425-0001",
"action": "reboot",
"requested_by": "operator-7",
"expires_at": "2026-04-25T10:01:00Z"
}
Device ack:
devices/gw-001/commands/ack
Payload:
{
"command_id": "cmd-20260425-0001",
"status": "accepted",
"received_at": "2026-04-25T10:00:05Z"
}
Device result:
devices/gw-001/commands/result
Payload:
{
"command_id": "cmd-20260425-0001",
"status": "done",
"completed_at": "2026-04-25T10:00:28Z"
}
Command state machine
CREATED
│
├── published to MQTT
▼
SENT
│
├── device received
▼
ACKED
│
├── device starts execution
▼
RUNNING
│
├── success
▼
DONE
Error branches:
SENT -> EXPIRED
SENT -> REJECTED
ACKED -> FAILED
RUNNING -> TIMEOUT
ANY -> CANCELLED
| Element | Why |
|---|---|
| command_id | Deduplication, tracking, audit. |
| expires_at | Prevents dangerously late execution. |
| ack | The device received the command. |
| result | The command was actually executed or rejected. |
| audit | Operator/system traceability. |
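The command state machine can be enforced in code with an explicit transition table. This way an out-of-order ack or a late result cannot move a command backwards. The transitions follow the diagram of this section. The names are hypothetical, and reading "ANY -> CANCELLED" as "any non-terminal state" is an interpretation.

```python
from enum import Enum


class CommandState(str, Enum):
    CREATED = "CREATED"
    SENT = "SENT"
    ACKED = "ACKED"
    RUNNING = "RUNNING"
    DONE = "DONE"
    EXPIRED = "EXPIRED"
    REJECTED = "REJECTED"
    FAILED = "FAILED"
    TIMEOUT = "TIMEOUT"
    CANCELLED = "CANCELLED"


S = CommandState
TERMINAL = {S.DONE, S.EXPIRED, S.REJECTED, S.FAILED, S.TIMEOUT, S.CANCELLED}

# Allowed transitions, as drawn in the command state machine.
TRANSITIONS = {
    S.CREATED: {S.SENT},
    S.SENT: {S.ACKED, S.EXPIRED, S.REJECTED},
    S.ACKED: {S.RUNNING, S.FAILED},
    S.RUNNING: {S.DONE, S.TIMEOUT},
}


def transition(current: CommandState, target: CommandState) -> CommandState:
    """Return the new state, or raise ValueError if the move is illegal."""
    if current in TERMINAL:
        raise ValueError(f"{current.value} is terminal")
    # "ANY -> CANCELLED" is read here as: any non-terminal state.
    if target is S.CANCELLED or target in TRANSITIONS.get(current, set()):
        return target
    raise ValueError(f"illegal transition {current.value} -> {target.value}")
```

Persisting the state with each transition, for example in the same table as the audit trail, gives the command journal that the runbook below relies on.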
Production runbook: diagnosing QoS, duplicates, losses and latency
Symptom → diagnosis
| Symptom | Hypothesis | Check | Fix |
|---|---|---|---|
| Missing messages | QoS 0, subscriber offline, ACL. | Broker logs, SUBACK, drops. | QoS 1, persistent session, monitoring. |
| Duplicates in the DB | QoS 1 retry, worker retry. | Compare message_id, sequence. | Idempotence + unique constraint on a short key. |
| High latency | QoS too high, slow DB, queueing. | Lag, in-flight, broker CPU, DB metrics. | Batching, shared subscription, lower QoS on non-critical flows. |
| Commands executed twice | No command_id / business ack. | Command log. | State machine + device-side idempotence. |
| High broker memory | Persistent sessions and queues. | Queued messages per client. | Quotas, expiry, purge, more consumers. |
| Slow subscriber | Synchronous processing too heavy. | Callback duration, queue depth. | Decouple the MQTT callback from business processing. |
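The last fix, decoupling the MQTT callback from business processing, can be sketched with the standard library. The callback only enqueues into a bounded queue, and a separate thread does the work. The callback signature mirrors Paho's `on_message(client, userdata, message)`. A namedtuple stands in for Paho's message object so the sketch runs without a broker. The queue size and the `dropped` counter in `userdata` are assumptions.

```python
import queue
import threading
from collections import namedtuple

# Stand-in for Paho's MQTTMessage; only .topic and .payload are used here.
Message = namedtuple("Message", "topic payload")

work_queue: "queue.Queue" = queue.Queue(maxsize=10_000)
processed = []  # stands in for the real business side effects


def on_message(client, userdata, msg) -> None:
    """MQTT callback: must return quickly, so it only enqueues."""
    try:
        work_queue.put_nowait(Message(msg.topic, bytes(msg.payload)))
    except queue.Full:
        # Backpressure signal: count it (and alert) instead of blocking
        # the MQTT network loop.
        userdata["dropped"] += 1


def worker() -> None:
    """Business processing, outside the MQTT network thread."""
    while True:
        item = work_queue.get()
        try:
            if item is None:  # shutdown sentinel
                return
            # Heavy work (validation, DB writes, alerts) would go here.
            processed.append((item.topic, item.payload.decode()))
        finally:
            work_queue.task_done()


def start_worker() -> threading.Thread:
    t = threading.Thread(target=worker, name="mqtt-worker", daemon=True)
    t.start()
    return t
```

With Paho, this would be wired with `client.on_message = on_message` and `client.user_data_set({"dropped": 0})`. The network loop then only pays for one queue insert per message.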
QoS checklist before going to production
[ ] Every flow has a documented QoS
[ ] The reasons for each QoS choice are written down
[ ] Important payloads have a message_id
[ ] Critical devices have a sequence_number
[ ] The backend is idempotent for QoS 1
[ ] Commands have a command_id
[ ] Commands have an ack and a result
[ ] Expirations are defined for commands
[ ] Shared subscriptions are tested
[ ] Offline queues have limits
[ ] Ingestion lag is measured
[ ] Duplicates are visible in the admin
[ ] Broker drops raise alerts
[ ] Network-cut tests are done
[ ] Retry tests are done
[ ] Worker-crash tests are done
Simple chaos test
Scenario:
1. Publish 1000 QoS 1 messages, each with a message_id.
2. Stop the backend worker after 300 messages.
3. Restart the worker.
4. Check that:
- there is no duplicate business insert
- duplicate_count increases if retries happened
- lag returns to zero
- errors are visible in the admin
- no critical command was executed twice
Why topic design is critical
In MQTT, the topic is much more than a name. It is the logical address that lets the broker route messages, consumers filter flows, ACLs protect data, dashboards subscribe cleanly and backends organize ingestion.
A poor topic design almost always ends in chaos: overly broad subscriptions, data that is hard to isolate, fragile security, painful migrations, noisy dashboards, overloaded workers and conventions nobody can maintain.
| Dimension | Role in the topic | Example |
|---|---|---|
| Tenant | Customer or organization isolation. | acme |
| Site | Location or logical scope. | paris, factory-01 |
| Area | Sub-zone, line, floor, workshop. | line-02, floor-04 |
| Asset | Device, gateway, machine or equipment. | gw-123, motor-09 |
| Stream | Nature of the flow. | telemetry, status, event, command |
| Signal | Specific measurement, event or action. | temperature, alarm, setpoint |
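The six dimensions can be encoded as a small value object that builds and parses topics, so that no service concatenates topic strings by hand. `TopicAddress` and the closed `STREAMS` list are assumptions based on the examples in this section.

```python
from dataclasses import dataclass

LEVELS = ("tenant", "site", "area", "asset", "stream", "signal")
# Assumed closed list of streams, taken from this section's examples.
STREAMS = {"telemetry", "status", "event", "command", "ack", "result"}


@dataclass(frozen=True)
class TopicAddress:
    tenant: str
    site: str
    area: str
    asset: str
    stream: str
    signal: str

    def to_topic(self) -> str:
        """Build {tenant}/{site}/{area}/{asset}/{stream}/{signal}."""
        parts = [getattr(self, name) for name in LEVELS]
        for part in parts:
            # Empty levels, separators and wildcards are not allowed here.
            if not part or "/" in part or "+" in part or "#" in part:
                raise ValueError(f"invalid topic level: {part!r}")
        if self.stream not in STREAMS:
            raise ValueError(f"unknown stream: {self.stream!r}")
        return "/".join(parts)

    @classmethod
    def parse(cls, topic: str) -> "TopicAddress":
        parts = topic.split("/")
        if len(parts) != len(LEVELS):
            raise ValueError(f"expected {len(LEVELS)} levels, got {len(parts)}")
        addr = cls(*parts)
        addr.to_topic()  # re-validate levels and stream
        return addr
```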
Mental model of a topic
Well-structured MQTT topic:
{tenant}/{site}/{area}/{asset}/{stream}/{signal}
Example:
acme/paris/line-02/motor-09/telemetry/temperature
Breakdown:
┌─────────┬────────┬─────────┬──────────┬───────────┬─────────────┐
│ tenant  │ site   │ area    │ asset    │ stream    │ signal      │
├─────────┼────────┼─────────┼──────────┼───────────┼─────────────┤
│ acme    │ paris  │ line-02 │ motor-09 │ telemetry │ temperature │
└─────────┴────────┴─────────┴──────────┴───────────┴─────────────┘
Why this structure works
Isolation:
acme/#
Site filter:
acme/paris/#
Line filter:
acme/paris/line-02/#
Asset filter:
acme/paris/line-02/motor-09/#
Stream filter:
acme/+/+/+/telemetry/#
Signal filter:
acme/+/+/+/telemetry/temperature
Robust naming convention
Recommended convention
{tenant}/{site}/{area}/{asset}/{stream}/{signal}
Examples:
acme/paris/building-a/boiler-07/telemetry/temperature
acme/paris/building-a/boiler-07/telemetry/pressure
acme/paris/building-a/boiler-07/status/availability
acme/paris/building-a/boiler-07/event/alarm
acme/paris/building-a/boiler-07/command/setpoint
acme/paris/building-a/boiler-07/ack/setpoint
| Level | Purpose | Example | Advice |
|---|---|---|---|
| tenant | Organization, customer, logical space. | acme | Stable and controlled. |
| site | Site, city, plant, data center. | paris, factory-01 | Avoid human-friendly names that may change. |
| area | Line, floor, zone, building. | line-02 | Prefer stable IDs. |
| asset | Machine, device, gateway, equipment. | motor-09 | Must map to your device registry. |
| stream | Nature of the message. | telemetry, status | Closed, documented list. |
| signal | Specific measurement, command or event. | temperature | Stable name, in technical English. |
Practical naming rules
| Rule | Why |
|---|---|
| Use lowercase. | Avoids inconsistencies such as Paris vs paris. |
| Use kebab-case or snake_case, but not both. | Consistent conventions. |
| Avoid accents, spaces and special characters. | Interoperability with logs, scripts and ACLs. |
| Do not put needless dynamic values in the topic. | Avoids a cardinality explosion. |
| Avoid editable human-readable names. | Prefer stable IDs. |
| Document each level. | Prevents divergent interpretations. |
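These rules can be checked automatically, for example in CI or when a device is registered. This sketch assumes the project chose kebab-case (lowercase ASCII letters, digits and single hyphens). The regex and the `naming_violations` helper are hypothetical.

```python
import re

# Assumption: the project picked kebab-case for every topic level.
SEGMENT = re.compile(r"[a-z0-9]+(?:-[a-z0-9]+)*")


def naming_violations(topic: str) -> list:
    """Return human-readable rule violations (an empty list means OK)."""
    problems = []
    for i, level in enumerate(topic.split("/")):
        if SEGMENT.fullmatch(level):
            continue
        if level != level.lower():
            problems.append(f"level {i} {level!r}: not lowercase")
        elif not level.isascii():
            problems.append(f"level {i} {level!r}: non-ASCII characters")
        elif " " in level:
            problems.append(f"level {i} {level!r}: contains spaces")
        else:
            problems.append(f"level {i} {level!r}: not kebab-case")
    return problems
```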
Good vs bad
Bad:
ACME/Paris/Bâtiment A/Chaudière 07/Température
device123
data
telemetry/all
factory/#/temperature
Good:
acme/paris/building-a/boiler-07/telemetry/temperature
acme/paris/building-a/boiler-07/status/availability
acme/paris/building-a/boiler-07/event/alarm
acme/paris/building-a/boiler-07/command/setpoint
MQTT wildcards: + and #
MQTT wildcards are only valid in subscriptions (topic filters); a PUBLISH must target a concrete topic. Wildcards let a client receive every topic that matches a pattern. They are powerful, but dangerous when too broad.
| Wildcard | Name | Meaning | Example |
|---|---|---|---|
| + | Single-level wildcard | Matches exactly one level. | acme/+/+/+/status/availability |
| # | Multi-level wildcard | Matches the parent level and all remaining levels; must be the last level. | acme/paris/# |
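To check exactly which topics a filter will receive, here is a sketch of topic filter matching that follows the MQTT specification's rules. '+' matches exactly one level. '#' must be the last level and also matches the parent level. A filter starting with a wildcard does not match topics beginning with '$', such as $SYS. `topic_matches` is a hypothetical helper, useful in tests or ACL tooling.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """MQTT topic filter matching for '+' and '#' (subscription side)."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    # Wildcards at the first level do not match topics starting with '$'.
    if topic.startswith("$") and f_levels[0] in ("+", "#"):
        return False
    for i, f in enumerate(f_levels):
        if f == "#":
            # '#' is only valid as the last level; it also matches the parent.
            return i == len(f_levels) - 1
        if i >= len(t_levels):
            return False
        if f != "+" and f != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)
```

An invalid filter such as `factory/#/temperature` (from the "bad" list above) never matches anything here.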
Examples
Subscription:
acme/+/+/+/telemetry/temperature
Receives:
acme/paris/line-01/motor-01/telemetry/temperature
acme/madrid/line-04/motor-09/telemetry/temperature
Does not receive:
acme/paris/line-01/motor-01/telemetry/vibration
acme/paris/line-01/motor-01/status/availability
Subscription:
acme/paris/#
Receives:
acme/paris/line-01/motor-01/telemetry/temperature
acme/paris/line-01/motor-01/status/availability
acme/paris/line-02/gw-07/event/alarm
acme/paris/building-a/boiler-07/command/setpoint
Risk of overly broad wildcards
Dangerous subscription:
#
Effect:
- receives everything on the broker
- can pull in a huge amount of traffic
- can expose sensitive data
- can saturate a slow client
- can make debugging misleading
Reserve it for:
- local debugging
- a temporary admin account
- a test environment
- a tightly controlled bridge
Wildcard best practices
| Need | Recommended subscription |
|---|---|
| All telemetry for a tenant. | acme/+/+/+/telemetry/# |
| A whole site. | acme/paris/# |
| All alarms. | acme/+/+/+/event/alarm |
| All temperatures. | acme/+/+/+/telemetry/temperature |
| Commands for one device. | acme/paris/line-02/gw-123/command/# |
Avoid a bare #; scope broad subscriptions to at least tenant/#.
Multi-tenant namespaces: isolating customers, sites and domains
In a multi-tenant architecture, the first topic level often isolates customers, organizations, environments or business domains. This makes ACLs, monitoring, quotas, bridges and exports easier.
Recommended model
{env}/{tenant}/{site}/{area}/{asset}/{stream}/{signal}
Examples:
prod/acme/paris/line-02/motor-09/telemetry/temperature
prod/acme/paris/line-02/motor-09/status/availability
prod/acme/paris/line-02/motor-09/event/alarm
staging/acme/paris/line-02/motor-09/telemetry/temperature
| Level | Purpose | Example |
|---|---|---|
| env | Separates prod, staging, dev. | prod, staging |
| tenant | Separates customers. | acme, globex |
| site | Separates sites. | paris, madrid |
| area | Separates zones or lines. | line-02, floor-04 |
Simplified multi-tenant ACLs
ACME tenant backend:
allow subscribe: prod/acme/+/+/+/telemetry/#
allow subscribe: prod/acme/+/+/+/event/#
allow publish: prod/acme/+/+/+/command/#
GLOBEX tenant backend:
allow subscribe: prod/globex/+/+/+/telemetry/#
deny subscribe: prod/acme/#
ACME device gw-123:
allow publish: prod/acme/paris/line-02/gw-123/telemetry/#
allow publish: prod/acme/paris/line-02/gw-123/status/#
allow subscribe: prod/acme/paris/line-02/gw-123/command/#
deny subscribe: prod/acme/#
Isolation strategies
| Strategy | Advantage | Limitation |
|---|---|---|
| One broker per tenant. | Strong isolation. | Higher cost and operational load. |
| One namespace per tenant. | Simple, effective, economical. | ACLs must be flawless. |
| A shared cluster with quotas. | Scalable. | Requires governance and monitoring. |
| One bridge per tenant. | Fine-grained control of outbound flows. | Bridge and mapping complexity. |
Separate telemetry, status and event
A clean topic design distinguishes measurements, current states and events. These three families behave differently in frequency, storage, retained usage, QoS, consumption and alerting.
| Stream | Meaning | Frequency | Retained | Typical QoS |
|---|---|---|---|---|
| telemetry | Numeric measurements or observations. | Frequent. | No, except for a specific last-value state. | 0 or 1. |
| status | Current state of a device or component. | On change. | Often yes. | 1. |
| event | One-off timestamped event. | Irregular. | Usually no. | 1. |
| alarm | Critical or safety event. | Irregular. | Sometimes, for the active state. | 1, sometimes 2. |
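The table can be turned into a per-stream publish policy so that publishers do not pick QoS and retain flags ad hoc. The values below are one reading of the table: QoS 0 for telemetry, retained status, and commands that are never retained. `PUBLISH_POLICY` and `publish_options` are hypothetical names. Alarms are published on the event stream here, as in the topic examples.

```python
# Hypothetical per-stream defaults derived from the table above.
PUBLISH_POLICY = {
    "telemetry": {"qos": 0, "retain": False},
    "status": {"qos": 1, "retain": True},
    "event": {"qos": 1, "retain": False},
    "command": {"qos": 1, "retain": False},  # commands are never retained
    "ack": {"qos": 1, "retain": False},
    "result": {"qos": 1, "retain": False},
}

STREAM_LEVEL = 4  # {tenant}/{site}/{area}/{asset}/{stream}/{signal}


def publish_options(topic: str) -> dict:
    """Pick QoS/retain from the stream level; unknown streams are refused."""
    levels = topic.split("/")
    if len(levels) <= STREAM_LEVEL:
        raise ValueError(f"topic too short: {topic!r}")
    stream = levels[STREAM_LEVEL]
    if stream not in PUBLISH_POLICY:
        raise ValueError(f"no publish policy for stream {stream!r}")
    return dict(PUBLISH_POLICY[stream])  # copy, so callers cannot mutate it
```

With Paho this could be used as `client.publish(topic, payload, **publish_options(topic))`, since `publish` accepts `qos` and `retain` keyword arguments.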
Clean examples
Telemetry:
acme/paris/line-02/motor-09/telemetry/temperature
acme/paris/line-02/motor-09/telemetry/vibration
acme/paris/line-02/motor-09/telemetry/rpm
Status:
acme/paris/line-02/motor-09/status/availability
acme/paris/line-02/motor-09/status/mode
acme/paris/line-02/motor-09/status/health
Events:
acme/paris/line-02/motor-09/event/alarm
acme/paris/line-02/motor-09/event/maintenance
acme/paris/line-02/motor-09/event/error
Diagram: nature of the flows
Telemetry
├── many measurements
├── time-series storage
├── aggregation possible
└── loss sometimes acceptable
Status
├── current state
├── retained is useful
├── immediate dashboard
└── last value matters
Event
├── timestamped event
├── audit / history
├── not necessarily retained
└── idempotence matters
Command
├── order sent to the device
├── command_id mandatory
├── expiration recommended
└── ack/result required
Typical payloads
Telemetry payload:
{
"schema_version": "1.0",
"message_id": "msg-001",
"value": 72.4,
"unit": "C",
"ts": "2026-04-25T10:00:00Z"
}
Status payload:
{
"schema_version": "1.0",
"state": "online",
"since": "2026-04-25T09:58:00Z",
"firmware": "1.8.2"
}
Event payload:
{
"schema_version": "1.0",
"event_id": "evt-001",
"severity": "critical",
"code": "OVERHEAT",
"ts": "2026-04-25T10:00:00Z"
}
Commands and application-level acknowledgements
Commands must be handled differently from telemetry. A command is an intent to act. It must have an identifier, an expiration, a status and a receipt acknowledgement, and ideally an execution result.
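The device-side handling can be sketched as follows. The device deduplicates on command_id, checks the expiration and the local state, acks, executes, then publishes the result. `CommandHandler`, the `execute`, `publish` and `can_run` callables, and the "reason" field in rejection acks are hypothetical. The set of seen IDs would have to be persisted on a real device to survive reboots.

```python
import json
from datetime import datetime, timezone
from typing import Callable, Set


def parse_ts(value: str) -> datetime:
    """Parse ISO-8601 timestamps such as "2026-04-25T10:01:00Z"."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def utc_now() -> datetime:
    return datetime.now(timezone.utc)


class CommandHandler:
    """Device-side sketch: dedup, expiry, local state, ack, execute, result."""

    def __init__(
        self,
        execute: Callable[[str, dict], bool],  # runs the action, True on success
        publish: Callable[[str, dict], None],  # publishes on ack/... or result/...
        can_run: Callable[[dict], bool] = lambda cmd: True,  # local state check
        now: Callable[[], datetime] = utc_now,
    ) -> None:
        self.execute = execute
        self.publish = publish
        self.can_run = can_run
        self.now = now
        self.seen: Set[str] = set()  # must be persisted on a real device

    def _reject(self, cid: str, reason: str, received_at: datetime) -> None:
        self.publish("ack", {"command_id": cid, "status": "rejected",
                             "reason": reason,
                             "received_at": received_at.isoformat()})

    def handle(self, raw: bytes) -> None:
        cmd = json.loads(raw)
        cid = cmd["command_id"]
        if cid in self.seen:
            return  # redelivery (e.g. QoS 1 retry): never execute twice
        self.seen.add(cid)
        received_at = self.now()
        if received_at >= parse_ts(cmd["expires_at"]):
            return self._reject(cid, "expired", received_at)
        if not self.can_run(cmd):
            return self._reject(cid, "local_state", received_at)
        # Ack first, so the backend knows the command arrived before it runs.
        self.publish("ack", {"command_id": cid, "status": "accepted",
                             "received_at": received_at.isoformat()})
        ok = self.execute(cmd["action"], cmd)
        self.publish("result", {"command_id": cid,
                                "status": "done" if ok else "failed",
                                "completed_at": self.now().isoformat()})
```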
Recommended topics
Command:
acme/paris/line-02/gw-123/command/reboot
acme/paris/line-02/gw-123/command/setpoint
acme/paris/line-02/gw-123/command/config-update
Ack:
acme/paris/line-02/gw-123/ack/reboot
acme/paris/line-02/gw-123/ack/setpoint
acme/paris/line-02/gw-123/ack/config-update
Result:
acme/paris/line-02/gw-123/result/reboot
acme/paris/line-02/gw-123/result/setpoint
Command payload
{
"schema_version": "1.0",
"command_id": "cmd-20260425-0001",
"action": "setpoint",
"value": 21.5,
"unit": "C",
"requested_by": "backend-rules-engine",
"expires_at": "2026-04-25T10:01:00Z"
}
Command / ack / result flow
Backend
│
│ PUBLISH command/setpoint
▼
MQTT broker
│
▼
Device
│
├── checks command_id
├── checks expiration
├── checks local state
├── publishes ack/accepted or ack/rejected
└── executes if accepted
│
▼
publishes result/done or result/failed
Command state machine
CREATED
│
▼
PUBLISHED
│
├── ACK accepted
▼
ACKED
│
├── execution started
▼
RUNNING
│
├── success
▼
DONE
Branches:
PUBLISHED -> EXPIRED
PUBLISHED -> REJECTED
ACKED -> FAILED
RUNNING -> TIMEOUT
ANY -> CANCELLED
Retained state: last known value, not history
The retained flag asks the broker to keep the last message published on a topic. When a new subscriber subscribes, it immediately receives that last value. This is ideal for current states, but dangerous when used as history.
| Retained usage | Recommended? | Why |
|---|---|---|
| Last online/offline state. | Yes | The dashboard is consistent immediately. |
| Current machine mode. | Yes | A new subscriber sees the current state. |
| Last active setpoint. | Yes, with care | Important current state. |
| Historical temperature measurements. | No | Retained keeps only one value. |
| Past alarm events. | Usually no | Store them in an events table. |
| Command to execute. | Very dangerous | A reconnecting device could receive an old command. |
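Retained semantics can be illustrated with a tiny in-memory model of a broker's retained store. It keeps one value per topic, overwrites it on each retained publish, and delivers it to every new matching subscriber. The MQTT specification states that a retained PUBLISH with an empty payload removes the retained message for that topic. This is a teaching model, not broker code. `RetainedStore` is a hypothetical name, and the filter matcher is simplified because it assumes valid filters.

```python
class RetainedStore:
    """In-memory model of a broker's retained-message store."""

    def __init__(self) -> None:
        self._by_topic = {}

    def publish(self, topic: str, payload: bytes, retain: bool) -> None:
        if not retain:
            return  # non-retained messages are routed, never stored here
        if payload == b"":
            self._by_topic.pop(topic, None)  # empty retained payload clears it
        else:
            self._by_topic[topic] = payload  # only the LAST value is kept

    def on_subscribe(self, topic_filter: str) -> dict:
        """Retained messages delivered immediately to a new subscriber."""
        return {t: p for t, p in self._by_topic.items()
                if _matches(topic_filter, t)}


def _matches(topic_filter: str, topic: str) -> bool:
    # Simplified '+'/'#' matching; the filter is assumed to be valid.
    f, t = topic_filter.split("/"), topic.split("/")
    for i, level in enumerate(f):
        if level == "#":
            return True
        if i >= len(t) or (level != "+" and level != t[i]):
            return False
    return len(f) == len(t)
```

This is also why retained commands are dangerous: whatever was retained last is replayed to any client that subscribes later.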
Retained status example
Topic:
acme/paris/line-02/gw-123/status/availability
Retained payload:
{
"schema_version": "1.0",
"state": "online",
"since": "2026-04-25T09:58:00Z",
"firmware": "1.8.2"
}
Retained: behavior
1. The device publishes a retained message:
status/availability = online
2. The broker keeps this last value.
3. A dashboard starts later.
4. The dashboard SUBSCRIBEs to status/availability.
5. The broker immediately sends:
online
6. The dashboard does not need to wait
for the next device message.
Retained vs queue vs history
| Mechanism | Purpose | What does it keep? |
|---|---|---|
| Retained | Last known value. | One message per topic. |
| Session queue | Messages for an offline subscriber. | Messages, depending on QoS and session settings. |
| Database | Durable history. | All useful values. |
| Kafka/Event log | Replay and durable streaming. | An ordered, replayable stream. |
Per-topic ACLs: publish / subscribe security
MQTT ACLs must be designed together with the topics. A good namespace keeps the rules simple. A poor one forces you to write complex, fragile and dangerous ACLs.
Device ACL example
Device gw-123:
allow publish:
acme/paris/line-02/gw-123/telemetry/#
acme/paris/line-02/gw-123/status/#
acme/paris/line-02/gw-123/event/#
allow subscribe:
acme/paris/line-02/gw-123/command/#
deny subscribe:
acme/#
deny publish:
acme/paris/line-02/other-device/#
Ingestion backend ACL example
Ingestion backend:
allow subscribe:
acme/+/+/+/telemetry/#
acme/+/+/+/status/#
acme/+/+/+/event/#
deny publish:
acme/+/+/+/command/#
allow publish only for command service:
acme/+/+/+/command/#
MQTT role table
| Role | Publish | Subscribe | Forbidden |
|---|---|---|---|
| Device | Its own telemetry, status and events. | Its own commands. | Reading the whole tenant. |
| Gateway | Flows of its assets. | Commands within its scope. | Publishing for another site. |
| Dashboard | Usually nothing. | Topics filtered per user. | Global #. |
| Ingestion worker | Rarely. | Telemetry/status/event. | Commands unless authorized. |
| Command service | Command topics. | Ack/result topics. | Raw telemetry if not needed. |
| Admin debug | Limited or none. | Broad, temporarily. | Permanent unaudited access. |
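The roles can be written as ordered rules and unit-tested before being translated into the broker's own ACL syntax. This evaluator uses first-match-wins ordering with a default deny, which matches how the device ACL above is written (specific allows, then a broad deny). That ordering is an assumption: each broker has its own evaluation rules, so check its documentation. For subscriptions, the sketch checks a concrete topic, not whether one filter is contained in another. `is_allowed` and `GW_123` are hypothetical names.

```python
from typing import List, Tuple

Rule = Tuple[str, str, str]  # (effect, action, pattern)


def _matches(pattern: str, topic: str) -> bool:
    # Simplified '+'/'#' matching; patterns are assumed to be valid filters.
    p, t = pattern.split("/"), topic.split("/")
    for i, level in enumerate(p):
        if level == "#":
            return True
        if i >= len(t) or (level != "+" and level != t[i]):
            return False
    return len(p) == len(t)


def is_allowed(rules: List[Rule], action: str, topic: str) -> bool:
    """First matching rule for this action decides; no match means deny."""
    for effect, rule_action, pattern in rules:
        if rule_action == action and _matches(pattern, topic):
            return effect == "allow"
    return False


# The gw-123 device ACL from this section, as ordered rules.
GW_123: List[Rule] = [
    ("allow", "publish", "acme/paris/line-02/gw-123/telemetry/#"),
    ("allow", "publish", "acme/paris/line-02/gw-123/status/#"),
    ("allow", "publish", "acme/paris/line-02/gw-123/event/#"),
    ("allow", "subscribe", "acme/paris/line-02/gw-123/command/#"),
    ("deny", "subscribe", "acme/#"),
]
```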
ACL diagram
Device gw-123
├── publish telemetry/status/event
└── subscribe command
Ingestion backend
└── subscribe telemetry/status/event
Command service
├── publish command
└── subscribe ack/result
Dashboard
└── subscribe user scope
Admin
└── limited, temporary, audited debugging
Versioning: evolving topics and payloads without breaking consumers
MQTT systems are long-lived: devices, gateways and dashboards get updated at different paces, so you need a versioning strategy. Most of the time you version the payload. You version the topic only for a structural break.
| Strategy | Example | When to use it |
|---|---|---|
| Version in the payload. | "schema_version": "1.0" | Recommended default. |
| Version in the topic. | v2/acme/paris/... | Major structural change. |
| Parallel topic. | telemetry-v2 | Progressive migration. |
| MQTT 5 User Properties. | schema=temperature.v2 | If the MQTT 5 ecosystem is under control. |
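Versioning the payload implies that the backend accepts several versions at once, as in phase 3 of the migration. Below is a minimal sketch of a version dispatcher. It upgrades the v1 telemetry payload from the typical payloads section into the v2 structure used in this section. Deriving device_id and metric from the topic, defaulting quality to "unknown", and treating a missing schema_version as v1 are all assumptions.

```python
import json

# {tenant}/{site}/{area}/{asset}/{stream}/{signal}
ASSET_LEVEL, SIGNAL_LEVEL = 3, 5


def _from_v1(data: dict, topic: str) -> dict:
    levels = topic.split("/")
    return {
        "schema_version": "2.0",
        "message_id": data["message_id"],
        # v1 has no device_id/metric: derive them from the topic (assumption)
        "device_id": levels[ASSET_LEVEL],
        "metric": levels[SIGNAL_LEVEL],
        "value": data["value"],
        "unit": data["unit"],
        "quality": "unknown",  # hypothetical default, v1 has no quality
        "ts": data["ts"],
    }


def _from_v2(data: dict, topic: str) -> dict:
    return data  # already in the target shape


UPGRADERS = {"1": _from_v1, "2": _from_v2}


def normalize(topic: str, raw: bytes) -> dict:
    """Accept v1 and v2 telemetry payloads and always return the v2 shape."""
    data = json.loads(raw)
    version = str(data.get("schema_version", "1.0"))  # assumption: missing = v1
    major = version.split(".")[0]
    if major not in UPGRADERS:
        raise ValueError(f"unsupported schema_version {version!r}")
    return UPGRADERS[major](data, topic)
```

Logging which major version each message arrives with gives the "remaining v1 consumers and producers" signal needed before decommissioning v1.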
Versioned payload
{
"schema_version": "2.0",
"message_id": "msg-20260425-001",
"device_id": "motor-09",
"metric": "temperature",
"value": 72.4,
"unit": "C",
"quality": "good",
"ts": "2026-04-25T10:00:00Z"
}
Non-breaking migration
Phase 1:
Device publishes the old v1 payload
Backend accepts v1
Phase 2:
Device publishes v1 + new compatible fields
Backend accepts enriched v1
Phase 3:
Device publishes schema_version v2
Backend accepts v1 and v2
Phase 4:
Dashboard is v2-compatible
Remaining v1 consumers are monitored
Phase 5:
Documented v1 decommissioning
Typical breaking changes
| Change | Risk | Solution |
|---|---|---|
| Renaming a field. | Broken consumers. | Add the new field, deprecate the old one. |
| Changing a unit. | Wrong calculations. | Include unit and version. |
| Changing a numeric/string type. | Validation fails. | Version the schema. |
| Moving a dimension between topic and payload. | Broken routing. | Topic v2 or a parallel period. |
MQTT design anti-patterns
| Anti-pattern | Why it is bad | Fix |
|---|---|---|
| Topic = device_id only. | Impossible to filter by site, flow or metric. | Stable hierarchy. |
| Generic topic data. | The broker cannot route intelligently. | Put routing dimensions in the topic. |
| Everything in the payload. | Subscribers must receive everything, then filter. | Topic = routing, payload = data. |
| Wildcard # in production. | Performance and security risk. | Restrict to precise prefixes. |
| Retained commands. | An old command is replayed on reconnection. | Non-retained commands + expiration. |
| No command ack. | Execution cannot be proven. | command_id + ack/result. |
| No payload version. | Silent breaking changes. | schema_version. |
| Uppercase, accents, spaces. | Fragile interoperability. | lowercase, ASCII, kebab-case. |
| No tenant. | Multi-customer ACLs are hard. | Tenant prefix or separate broker. |
| State and events mixed. | Retained values and history get confused. | Separate status and event. |
Topic design quality checklist
[ ] The topic allows filtering by tenant
[ ] The topic allows filtering by site
[ ] The topic allows filtering by asset
[ ] The topic distinguishes telemetry/status/event/command
[ ] Commands have ack/result
[ ] Commands are not retained
[ ] Important statuses can be retained
[ ] Events are stored in the DB
[ ] The payload contains schema_version
[ ] The payload contains message_id when it matters
[ ] Names are lowercase ASCII
[ ] ACLs are simple to write
[ ] Broad wildcards are avoided
[ ] Conventions are documented
[ ] v1/v2 migrations are planned
Recommended final design
prod/{tenant}/{site}/{area}/{asset}/telemetry/{metric}
prod/{tenant}/{site}/{area}/{asset}/status/{state_name}
prod/{tenant}/{site}/{area}/{asset}/event/{event_type}
prod/{tenant}/{site}/{area}/{asset}/command/{command_name}
prod/{tenant}/{site}/{area}/{asset}/ack/{command_name}
prod/{tenant}/{site}/{area}/{asset}/result/{command_name}
The MQTT broker: the operational core
The MQTT broker is the central component of the architecture. It accepts client connections, authenticates devices, enforces ACLs, routes messages, manages sessions, stores retained messages, publishes Last Will messages and exposes production metrics.
Choosing an MQTT broker is not just choosing software. It means choosing an operating model: lightweight edge, enterprise cluster, cloud managed service, AMQP integration, rules engine, observability, security, multi-tenancy, HA and operational cost.
| Family | Brokers | Main use |
|---|---|---|
| Lightweight / Edge | Mosquitto | Lab, Raspberry Pi, simple VM, local gateway, small edge. |
| Massive IoT cluster | EMQX, HiveMQ, VerneMQ | Large number of connections, HA, multi-tenant, observability. |
| Enterprise messaging | RabbitMQ MQTT Plugin | MQTT-to-AMQP integration, exchanges, queues, existing workers. |
| Cloud managed IoT | AWS IoT Core, Azure IoT Hub | Device certificates, rules, cloud integration, managed scalability. |
Broker mind map
MQTT Brokers
├── Simple / Edge
│ └── Mosquitto
│ ├── very lightweight
│ ├── simple configuration
│ └── ideal for lab / gateway / small production
│
├── Enterprise / Cluster
│ ├── EMQX
│ ├── HiveMQ
│ └── VerneMQ
│ ├── HA / cluster
│ ├── observability
│ ├── auth plugins
│ ├── shared subscriptions
│ └── advanced MQTT 5
│
├── Enterprise messaging
│ └── RabbitMQ MQTT Plugin
│ ├── MQTT on the device side
│ ├── AMQP internally
│ └── business queues
│
└── Cloud managed IoT
├── AWS IoT Core
└── Azure IoT Hub
├── device registry
├── certificates
├── rules
└── cloud integration
Selection questions
1. How many concurrent connections?
2. How many messages per second?
3. Is active-active HA required?
4. Local edge broker or central cloud broker?
5. Are advanced MQTT 5 features required?
6. Is a rules engine required?
7. Local or external authentication?
8. Kafka / SQL / webhook integration?
9. Strict multi-tenancy?
10. Can the team operate a cluster?
Comparison of the main MQTT brokers
| Broker | Profile | Strengths | Limitations | Good choice if... | URL |
|---|---|---|---|---|---|
| Mosquitto | Lightweight, simple, edge. | Quick to install, robust, well known, small footprint. | More limited cluster/enterprise features. | POC, lab, gateway, small production. | https://mosquitto.org/ |
| EMQX | Massive IoT broker. | Cluster, dashboard, auth plugins, rules engine, MQTT 5. | Heavier to operate than a simple broker. | High volume, multi-tenant, data integrations. | https://www.emqx.com/ |
| HiveMQ | Enterprise MQTT. | Cluster, extension SDK, observability, enterprise support. | Enterprise cost depending on edition and usage. | Critical production with SLA and support. | https://www.hivemq.com/ |
| VerneMQ | Distributed Erlang broker. | Cluster, distributed architecture, plugins. | Smaller ecosystem than EMQX/HiveMQ. | You want a mature distributed open-source broker. | https://vernemq.com/ |
| RabbitMQ MQTT Plugin | MQTT into RabbitMQ. | AMQP integration, exchanges, queues, dead-lettering. | Not always ideal for pure, massive IoT. | RabbitMQ is already the internal messaging bus. | https://www.rabbitmq.com/docs/mqtt |
| AWS IoT Core | Cloud managed IoT. | Device certificates, registry, rules, AWS integration. | Quotas, cost, lock-in, AWS-centric architecture. | Your IT runs on AWS and you want a managed service. | https://docs.aws.amazon.com/iot/ |
| Azure IoT Hub | Microsoft cloud managed IoT. | Device identity, cloud-to-device messaging, Azure integration. | Azure-specific model, cost, lock-in. | Your IT runs on Azure or your industrial stack is Microsoft-centric. | https://learn.microsoft.com/azure/iot-hub/ |
Mosquitto: the simple, robust, lightweight broker
Mosquitto is often the best starting point. It is easy to install, very lightweight and well documented. It fits a lab, a VM, a local gateway, a Raspberry Pi or a small, controlled production setup.
| Point | Mosquitto |
|---|---|
| Profile | Lightweight MQTT broker. |
| Ideal use | Edge, gateway, POC, small/medium production. |
| Strengths | Simplicity, small footprint, stability. |
| Security | TLS, password file, ACL file, certificates. |
| Limitation | Less capable for enterprise clustering and rules engines. |
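Because the ACL file mirrors the topic convention, it can be generated from the device registry instead of being edited by hand. `render_acl` and the username-to-prefix mapping are hypothetical. The output uses the same `user` / `topic read|write` lines as the Mosquitto ACL example in this section.

```python
from typing import Dict, List


def render_acl(devices: Dict[str, str]) -> str:
    """Render Mosquitto acl_file entries for devices.

    `devices` maps a Mosquitto username to the device's topic prefix
    (hypothetical registry export, e.g. {"gw_123": "acme/paris/line-02/gw-123"}).
    """
    lines: List[str] = []
    for user, prefix in sorted(devices.items()):
        # A wildcard in the prefix would silently widen the device's rights.
        if any(ch in prefix for ch in "+#") or prefix.endswith("/"):
            raise ValueError(f"invalid prefix for {user}: {prefix!r}")
        lines.append(f"user {user}")
        for stream in ("telemetry", "status", "event"):
            lines.append(f"topic write {prefix}/{stream}/#")
        lines.append(f"topic read {prefix}/command/#")
        lines.append("")  # blank line between users, for readability
    return "\n".join(lines)
```

After writing the file, the broker must reload it. Mosquitto re-reads `password_file` and `acl_file` on SIGHUP.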
TLS + password + ACL configuration
# /etc/mosquitto/conf.d/prod.conf
listener 8883
protocol mqtt
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
require_certificate false
allow_anonymous false
password_file /etc/mosquitto/passwd
acl_file /etc/mosquitto/acl
persistence true
persistence_location /var/lib/mosquitto/
log_type error
log_type warning
log_type notice
log_type information
Typical Mosquitto ACL
# /etc/mosquitto/acl
user gw_123
topic write acme/paris/line-02/gw-123/telemetry/#
topic write acme/paris/line-02/gw-123/status/#
topic write acme/paris/line-02/gw-123/event/#
topic read acme/paris/line-02/gw-123/command/#
user ingestion_worker
topic read acme/+/+/+/telemetry/#
topic read acme/+/+/+/status/#
topic read acme/+/+/+/event/#
user command_service
topic write acme/+/+/+/command/#
topic read acme/+/+/+/ack/#
topic read acme/+/+/+/result/#
Useful commands
# Create a user (-c creates or overwrites the password file)
sudo mosquitto_passwd -c /etc/mosquitto/passwd gw_123
# Add a user to the existing file
sudo mosquitto_passwd /etc/mosquitto/passwd ingestion_worker
# Test subscribe
mosquitto_sub -h localhost -p 8883 \
--cafile ca.crt \
-u ingestion_worker -P password \
-t 'acme/+/+/+/telemetry/#' -v
# Test publish
mosquitto_pub -h localhost -p 8883 \
--cafile ca.crt \
-u gw_123 -P password \
-t 'acme/paris/line-02/gw-123/status/availability' \
-m '{"state":"online"}' -q 1 -r
EMQX: massive, clustered MQTT broker for IoT and integrations
EMQX is a modern MQTT broker built for scalability, clustering, MQTT 5, dashboards, authentication plugins, external integrations and a rules engine. It becomes the logical choice when the broker is no longer just a transit point but an IoT platform.
When EMQX becomes relevant
- High number of concurrent connections.
- Need for an active cluster.
- Need for a built-in broker dashboard.
- External auth: HTTP, JWT, LDAP, SQL or OAuth, depending on edition/plugins.
- Rules engine to push data to Kafka, SQL, HTTP, S3 or other integrations.
- Quotas, rate limits, observability and advanced MQTT 5.
- Multi-tenancy or strong customer segmentation.
| Feature | Value |
|---|---|
| Cluster | Horizontal scalability and resilience. |
| Dashboard | Operations, metrics, connections, subscriptions. |
| Rules engine | Routing to external systems without coding the whole pipeline. |
| Auth plugins | Integration with existing systems. |
| MQTT 5 | Reason codes, expiry, flow control, properties. |
Typical EMQX architecture
┌─────────────────────┐
Devices ────────►│ TCP/TLS Load Balancer│
└──────────┬──────────┘
│
┌─────────────────┼─────────────────┐
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ EMQX Node 1 │ │ EMQX Node 2 │ │ EMQX Node 3 │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└─────────────────┼─────────────────┘
▼
┌─────────────────┐
│ Rules / Bridges │
└───────┬─────────┘
│
┌───────────────────┼────────────────────┐
▼ ▼ ▼
Kafka PostgreSQL HTTP
S3 ClickHouse Webhooks
Points to watch
| Point | Risk | Control |
|---|---|---|
| Cluster | Network and operational complexity. | Failover tests, node health monitoring. |
| Rules engine | Logic hidden inside the broker. | Version and document the rules. |
| External auth | Dependency on an external service. | Timeouts, caching, controlled fallback. |
| Quotas | Clients blocked if badly sized. | One profile per client type. |
HiveMQ: enterprise MQTT broker, support, extensions and observability
HiveMQ is strongly oriented toward enterprise production: clustering, MQTT 5 support, observability, an extension SDK, integrations, security and professional support. It is a natural choice where the broker becomes a critical component with SLAs, support and governance.
HiveMQ strengths
- Enterprise and critical-production orientation.
- Strong MQTT 5 support.
- Clustering and high availability.
- Extension SDK to customize authentication, authorization and integration.
- Observability and monitoring integrations.
- Professional approach for large IoT deployments.
| Need | HiveMQ benefit |
|---|---|
| High SLA | Enterprise approach, support, HA architecture. |
| Custom auth | Dedicated extensions. |
| Critical IoT | Robustness, monitoring, MQTT 5. |
| IT governance | A more structured product for large organizations. |
Enterprise HiveMQ architecture
Devices
│
▼
Load Balancer TCP/TLS
│
├── HiveMQ Node A
├── HiveMQ Node B
└── HiveMQ Node C
│
├── Extension Auth / ACL
├── Extension Integration
├── Metrics / Monitoring
└── Data pipeline
├── Kafka
├── Database
├── Webhook
└── Analytics
When to choose HiveMQ
| Situation | Decision |
|---|---|
| Large industrial account. | HiveMQ is a serious candidate. |
| Vendor support required. | HiveMQ becomes the logical choice. |
| Custom extensions required. | The SDK is useful. |
| Simple local broker. | Mosquitto is often enough. |
| Very tight budget. | Compare EMQX/VerneMQ/Mosquitto. |
VerneMQ: distributed, Erlang-based MQTT broker
VerneMQ is a distributed MQTT broker, historically valued for cluster architectures and the robustness of the Erlang ecosystem. It is worth considering if you want a distributed open-source broker with plugins and clustering.
| Point | VerneMQ |
|---|---|
| Profile | Distributed MQTT broker. |
| Technology | Erlang. |
| Strengths | Cluster, plugins, distributed design. |
| Use | Distributed MQTT production, given operational expertise. |
| Limitation | Ecosystem and popularity sometimes less visible than EMQX/HiveMQ. |
When VerneMQ is relevant
- You want a distributed open-source broker.
- You need an MQTT cluster.
- You accept operating an Erlang component.
- You want to evaluate an alternative to EMQX/HiveMQ.
- You need plugins or custom auth.
VerneMQ cluster topology
┌─────────────────────┐
Devices ────►│ Load Balancer MQTT │
└──────────┬──────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ VerneMQ N1 │ │ VerneMQ N2 │ │ VerneMQ N3 │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
└────────────────┼────────────────┘
▼
Cluster state / routing
│
▼
Backend subscribers / bridgesPoints d’exploitation
| Topic | What to watch |
|---|---|
| Cluster membership | Visible nodes, network partitions. |
| Sessions | Persistence, reconnection, distribution. |
| Plugins | Auth/ACL, compatibility, latency. |
| Monitoring | Connections, message rate, dropped messages, queues. |
RabbitMQ MQTT Plugin: MQTT inbound, AMQP internally
RabbitMQ is not primarily a pure MQTT broker. It is an enterprise AMQP messaging broker with exchanges, queues, routing keys, dead-lettering, acknowledgements and plugins. The MQTT plugin lets MQTT clients publish and receive through RabbitMQ, which is very useful if your organization already runs RabbitMQ.
Relevant use cases
- RabbitMQ already exists in the architecture.
- Devices must publish over MQTT.
- Internal services consume over AMQP.
- You want to use exchanges, queues, dead-lettering and retries.
- You want to connect ingestion, storage, alerting and WebSocket through separate queues.
| MQTT | RabbitMQ | Interpretation |
|---|---|---|
| MQTT topic | Routing key / exchange mapping | Translated to the AMQP model. |
| MQTT subscriber | Queue / consumer | Consumption through a queue. |
| MQTT QoS | Ack / delivery semantics | Watch for semantic differences. |
| MQTT session | Internal queues | Understand how persistence works. |
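The topic mapping in the table above can be illustrated with a small helper. This is a minimal sketch. It assumes the RabbitMQ MQTT plugin convention: `/`-separated topic levels become `.`-separated AMQP routing keys, `+` maps to the AMQP single-word wildcard `*`, and `#` keeps its multi-level meaning. The function name is hypothetical. Topic levels that contain a dot are refused here rather than guessing how a given plugin version treats them.

```python
def mqtt_filter_to_amqp_binding_key(topic_filter: str) -> str:
    """Translate an MQTT topic filter into an AMQP 0-9-1 topic binding key.

    Assumed convention: "/" -> ".", "+" -> "*", "#" unchanged.
    """
    translated = []
    for level in topic_filter.split("/"):
        if "." in level:
            # Conservative choice for this sketch: a dot would collide with
            # the AMQP level separator, so refuse it instead of guessing.
            raise ValueError(f"topic level contains '.': {level!r}")
        # Single-level wildcard: MQTT "+" becomes AMQP "*".
        translated.append("*" if level == "+" else level)
    return ".".join(translated)
```

A worker could bind its queue to the plugin's topic exchange (`amq.topic` by default) with the resulting key. For the backend telemetry filter, that key would be `prod.acme.*.*.*.telemetry.#`. Verify the mapping against the plugin documentation for your RabbitMQ version.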
RabbitMQ + MQTT architecture
MQTT Devices
│
│ MQTT publish
▼
RabbitMQ MQTT Listener
│
▼
AMQP Exchange
│
├── Queue storage
│ └── PostgreSQL / TimescaleDB worker
│
├── Queue alerts
│ └── rule engine
│
├── Queue websocket
│ └── realtime dashboard
│
└── Dead-letter queue
  └── failed messages / replay
Strengths and limits
| Strengths | Limits |
|---|---|
| Excellent if RabbitMQ is already the internal standard. | Not always ideal for millions of IoT devices. |
| Queues, DLQ, retry, business routing. | The translation of MQTT semantics to AMQP must be understood. |
| Tight integration with backend workers. | Less natural than a pure MQTT broker for massive fleets. |
| Clean separation of consumers. | More subtle configuration. |
Cloud IoT: AWS IoT Core, Azure IoT Hub and managed services
Cloud IoT services are more than MQTT brokers. They usually add a device registry, identity management, certificates, policies, a rules engine, and integration with storage, serverless, monitoring and cloud security.
| Service | Strengths | Limits | Good choice if... |
|---|---|---|---|
| AWS IoT Core | X.509 certificates, policies, rules engine, AWS integration. | Cost, quotas, lock-in, IAM/policy complexity. | AWS stack, need for a managed service, certified devices. |
| Azure IoT Hub | Device identity, cloud-to-device, Azure integration. | Azure-specific model, cost, lock-in. | Azure stack, Microsoft-oriented industry, Device Twin. |
| Self-hosted broker in the cloud | Full control, choice of broker, predictable cost. | Operations, HA and security to manage yourself. | Strong infrastructure team, need for control. |
AWS IoT Core: typical pipeline
Devices
│
│ MQTT/TLS + certificates
▼
AWS IoT Core
│
├── IoT Policy
├── Device Registry
├── Rules Engine
│ ├── Lambda
│ ├── Kinesis
│ ├── DynamoDB
│ ├── S3
│ └── SNS/SQS
│
└── Monitoring / Logs
Azure IoT Hub: typical pipeline
Devices
│
│ MQTT/TLS
▼
Azure IoT Hub
│
├── Device Identity
├── Device-to-cloud messages
├── Cloud-to-device messages
├── Device Twin
├── Routes
│ ├── Event Hubs
│ ├── Storage
│ ├── Service Bus
│ └── Functions
│
└── Monitoring / Defender / Logs
Cloud managed vs self-hosted
| Criterion | Cloud managed | Self-hosted |
|---|---|---|
| Time-to-market | Fast. | Longer. |
| Control | Limited by the service. | Full. |
| Cost | Varies with messages/devices. | Infrastructure + operations. |
| Lock-in | Strong. | Weaker. |
| Operations | Reduced. | Fully your responsibility. |
HA, cluster, bridge and edge architecture
High availability depends on the chosen broker. A single Mosquitto instance can be enough for a local edge. A critical central service, by contrast, usually needs a cluster, a load balancer, monitoring, configuration backups and failover procedures.
| Model | Usage | Advantage | Limit |
|---|---|---|---|
| Single broker | Lab, POC, non-critical edge. | Simple. | Single point of failure. |
| Local broker + bridge | Remote sites, industry. | Local autonomy. | Buffering, duplicates and loops to manage. |
| Broker cluster | Critical central service. | Scalability, resilience. | Complexity. |
| Cloud managed | Cloud IoT. | Reduced operations. | Lock-in, quotas, costs. |
Edge architecture with bridge
Industrial site
   │
   ├── Sensors / PLC / Machines
   │
   ├── Edge Gateway
   │
   └── Local MQTT Broker
          │
          ├── Local dashboard
          ├── Local rules
          └── Bridge MQTT/TLS
                 │
                 ▼
          Central Broker / Cloud
                 │
                 ├── Data lake
                 ├── Time-series DB
                 ├── Alerting
                 └── Global dashboard
Broker cluster architecture
Devices
   │
   ▼
TCP/TLS Load Balancer
   │
   ├── Broker Node 1
   ├── Broker Node 2
   └── Broker Node 3
          │
          ├── Session / subscription management
          ├── Metrics / logs
          ├── Auth / ACL
          └── Bridges / connectors
                 │
                 ├── Kafka
                 ├── PostgreSQL
                 ├── Redis
                 ├── S3
                 └── Webhooks
HA broker checklist
[ ] TCP/TLS load balancer configured
[ ] Broker health checks
[ ] Renewable TLS certificates
[ ] Versioned broker configuration
[ ] ACLs backed up
[ ] Persistence configured if needed
[ ] Retained messages strategy
[ ] Client quotas
[ ] CPU/RAM/FD monitoring
[ ] Messages in/out metrics
[ ] Connect/disconnect alerts
[ ] Node failover test
[ ] Client reconnection test
[ ] Bridge loop prevention test
[ ] Configuration rollback plan
Choosing the right broker: decision matrix and runbook
Quick decision matrix
| Situation | Likely choice | Why |
|---|---|---|
| Guide, lab, training, prototype. | Mosquitto | Simple, fast, very readable. |
| Local factory gateway. | Mosquitto or EMQX edge | Local, robust, low latency. |
| Massive multi-tenant IoT. | EMQX or HiveMQ | Clustering, observability, MQTT 5, integrations. |
| Enterprise production with support. | HiveMQ | Support, governance, SLA. |
| Organization already running RabbitMQ. | RabbitMQ MQTT Plugin | Existing AMQP integration. |
| Full AWS stack. | AWS IoT Core | Device certs, rules, AWS integration. |
| Industrial Azure stack. | Azure IoT Hub | Device identity, IoT Hub routes, Device Twin. |
Broker scorecard
Score each broker from 1 to 5:
[ ] Ease of installation
[ ] MQTT 5 support
[ ] TLS / certificates
[ ] Fine-grained ACLs
[ ] External auth
[ ] Cluster / HA
[ ] Observability
[ ] Rules engine
[ ] Data integrations
[ ] License cost
[ ] Operations cost
[ ] Team skills
[ ] Documentation
[ ] Vendor support
[ ] Portability
Two-day evaluation runbook
Day 1: functionality
1. Install the broker
2. Enable TLS
3. Create 3 users: device, ingestion, command
4. Configure ACLs
5. Test telemetry publishing
6. Test backend subscription
7. Test retained status
8. Test Last Will
9. Test QoS 1 and duplicates
10. Test logs and metrics
Day 2: production readiness
1. Restart the broker
2. Check persistence
3. Simulate a device outage
4. Simulate a slow subscriber
5. Load 10k messages
6. Test wildcard restrictions
7. Test certificate rotation
8. Export the configuration
9. Configure alerts
10. Write up the final decision
Practical conclusion
- Learning / prototype / edge: Mosquitto.
- Large self-hosted IoT: EMQX or HiveMQ.
- Enterprise support: HiveMQ.
- RabbitMQ already in place: RabbitMQ MQTT Plugin.
- Committed to AWS/Azure cloud: IoT Core or IoT Hub.
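The 1-to-5 scorecard is easier to compare across candidates once each criterion is weighted by what matters for the project. This is a minimal sketch. The criterion keys and weights are hypothetical. Any candidate scores you feed it are your own evaluation results, not properties of the brokers.

```python
from typing import Dict, List, Tuple

# Hypothetical weights: raise the ones that matter most for your project.
WEIGHTS: Dict[str, float] = {
    "mqtt5_support": 2.0,
    "tls_certificates": 3.0,
    "fine_acl": 3.0,
    "cluster_ha": 2.0,
    "observability": 2.0,
    "operations_cost": 1.5,
    "team_skills": 1.5,
}


def weighted_score(scores: Dict[str, int], weights: Dict[str, float]) -> float:
    """Weighted average of 1-5 scores; the result stays on the 1-5 scale."""
    missing = set(weights) - set(scores)
    if missing:
        raise ValueError(f"missing scores: {sorted(missing)}")
    for criterion in weights:
        if not 1 <= scores[criterion] <= 5:
            raise ValueError(f"{criterion} must be scored 1-5")
    total_weight = sum(weights.values())
    return sum(scores[c] * w for c, w in weights.items()) / total_weight


def rank(candidates: Dict[str, Dict[str, int]],
         weights: Dict[str, float]) -> List[Tuple[str, float]]:
    """Sort candidate brokers by weighted score, best first."""
    ranked = [(name, round(weighted_score(s, weights), 2))
              for name, s in candidates.items()]
    return sorted(ranked, key=lambda item: item[1], reverse=True)
```

Keeping the weights explicit makes the final decision of the evaluation runbook traceable: anyone can see why one broker ranked above another.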
Useful URLs
Mosquitto:
https://mosquitto.org/
EMQX:
https://www.emqx.com/
HiveMQ:
https://www.hivemq.com/
VerneMQ:
https://vernemq.com/
RabbitMQ MQTT plugin:
https://www.rabbitmq.com/docs/mqtt
AWS IoT Core:
https://docs.aws.amazon.com/iot/
Azure IoT Hub:
https://learn.microsoft.com/azure/iot-hub/
Security model overview
MQTT security is not only about enabling TLS. A secure MQTT architecture requires strong client identity, topic-level authorization, network segmentation, certificate lifecycle management, payload validation, command auditing, rate limiting and full broker observability.
The broker is a critical control point. If it is exposed, misconfigured or too permissive, attackers can inject fake telemetry, subscribe to sensitive topics, overload the broker, send dangerous commands or pivot into backend systems.
| Threat | Impact | Protection |
|---|---|---|
| Fake device | False telemetry injection, fake status, wrong dashboards. | Per-device certificate, strict client ID mapping, ACLs. |
| Network sniffing | Industrial or business data exposure. | TLS on 8883, no public plaintext 1883. |
| Wildcard abuse | Massive data exfiltration through broad subscriptions. | Deny-by-default ACLs, no unrestricted #. |
| Payload attack | Parser crash, backend exception, storage pollution. | Schema validation, size limits, strict parsing. |
| Connection flood | Broker saturation, CPU/RAM exhaustion, socket exhaustion. | Rate limits, quotas, firewall, connection limits. |
| Unauthorized command | Dangerous device or machine action. | Dedicated command service, RBAC, audit, command ack/result. |
| Certificate theft | Device impersonation. | Revocation, rotation, secure provisioning, short validity. |
Security layers
MQTT Security Stack
├── Network layer
│   ├── private subnet
│   ├── firewall
│   ├── VPN / private link
│   └── no public plaintext listener
│
├── Transport layer
│   ├── TLS
│   ├── mTLS
│   ├── certificate validation
│   └── secure cipher configuration
│
├── Identity layer
│   ├── client_id
│   ├── username / JWT
│   ├── device certificate
│   └── identity registry
│
├── Authorization layer
│   ├── publish ACL
│   ├── subscribe ACL
│   ├── deny-by-default
│   └── topic namespace isolation
│
├── Application layer
│   ├── payload schema validation
│   ├── idempotency
│   ├── command audit
│   └── backend RBAC
│
└── Operations layer
    ├── logs
    ├── metrics
    ├── alerting
    ├── revocation
    └── incident response
TLS and mTLS: securing the transport layer
TLS server authentication
With standard TLS, the client validates the broker certificate. This protects the connection against passive sniffing and man-in-the-middle attacks, provided the client correctly validates the broker certificate chain and hostname.
Client validates broker certificate:
Device
│
│ TLS handshake
│ verify broker certificate
│ verify CA
│ verify hostname if applicable
▼
MQTT Broker
Client authentication may still be:
- username/password
- JWT token
- API key
- external auth plugin
| Mode | Broker identity | Client identity | Usage |
|---|---|---|---|
| TLS only | Certificate | Password, token or plugin | Web apps, backend services, simple deployments. |
| mTLS | Certificate | Client certificate | Industrial IoT, device fleets, high security. |
| Plain TCP | None | Weak or none | Local lab only, never public Internet. |
mTLS mutual authentication
With mTLS, both sides validate certificates. The client validates the broker certificate, and the broker validates the client certificate. This is the preferred model for serious industrial and IoT deployments.
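On the client side, TLS and mTLS differ only in whether a client certificate is loaded into the TLS context. This is a minimal sketch using Python's standard `ssl` module. The function name and any certificate file paths are hypothetical. Whether the broker actually demands a client certificate is decided by its listener configuration, not by this code.

```python
import ssl
from typing import Optional


def build_tls_context(
    ca_file: Optional[str] = None,
    client_cert: Optional[str] = None,
    client_key: Optional[str] = None,
) -> ssl.SSLContext:
    """Client-side TLS context; loading a client certificate makes it mTLS.

    ca_file: CA bundle that signed the broker certificate (None = system CAs).
    client_cert / client_key: device certificate and private key (PEM paths).
    """
    # create_default_context enables CERT_REQUIRED and hostname checking,
    # so the broker certificate chain and hostname are both verified.
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_file)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    if client_cert:
        # mTLS: the broker validates this certificate during the handshake.
        ctx.load_cert_chain(certfile=client_cert, keyfile=client_key)
    return ctx
```

With Paho, the context can be passed to `client.tls_set_context(ctx)` before connecting to port 8883.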
mTLS handshake:
Device certificate
         │
         ▼
┌─────────────────┐  validates broker cert   ┌─────────────────┐
│ MQTT Client     │ ───────────────────────► │ MQTT Broker     │
│ device gw-001   │ ◄─────────────────────── │ listener 8883   │
└─────────────────┘ validates client cert    └────────┬────────┘
                                                      │
                                                      └── device identity mapped to:
                                                          - client_id
                                                          - tenant
                                                          - site
                                                          - allowed topics
                                                          - device registry record
Recommended listener policy
Public or semi-public network:
- expose 8883 only
- require TLS
- prefer mTLS
- disable anonymous access
- disable public 1883
- restrict by firewall
- log all auth failures
Private lab:
- 1883 allowed only inside trusted network
- never expose 1883 to the Internet
- use test credentials only
- no production devices
Never expose 1883 to the public Internet. Use 8883 with TLS, strict authentication and firewall rules.
Identity model: client ID, certificate, registry and ownership
MQTT identity should not be improvised. A secure deployment ties the MQTT client ID to a device registry entry, a certificate or token, and a strict topic namespace. A device should not be able to choose another device identity by changing its client ID.
| Identity element | Role | Example |
|---|---|---|
| client_id | MQTT session identity. | gw-paris-001 |
| certificate CN / SAN | Cryptographic device identity. | device:gw-paris-001 |
| tenant_id | Business isolation. | acme |
| site_id | Operational perimeter. | paris |
| device_id | Asset identity in backend registry. | gw-001 |
| role | Authorization profile. | device, ingestion, command-service |
Identity mapping rule
client_id = gw-paris-001
Certificate subject:
CN = gw-paris-001
Device registry:
device_id = gw-paris-001
tenant = acme
site = paris
role = gateway
status = active
Allowed namespace:
prod/acme/paris/+/gw-paris-001/#
Secure connection decision flow
Client CONNECT
│
▼
Validate TLS
│
▼
Validate client certificate or token
│
▼
Extract identity
│
▼
Lookup device registry
│
├── not found -> reject
├── disabled -> reject
├── expired cert -> reject
└── active -> continue
│
▼
Load ACL profile
│
▼
Accept connection
Identity anti-patterns
| Anti-pattern | Risk | Correction |
|---|---|---|
| Client chooses arbitrary client_id. | Device impersonation. | Bind client_id to certificate or registry. |
| Shared credential for all devices. | One leak compromises the fleet. | Per-device identity. |
| No disabled-device state. | Lost device remains active. | Registry with active/disabled/revoked status. |
| No owner metadata. | Incident response is slow. | Track tenant, site, owner, installation date. |
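The CONNECT decision flow and the identity mapping rule can be expressed as one function. This sketch rests on three assumptions. First, the registry is an in-memory dict standing in for a real device registry. Second, identity is taken from the certificate CN only; SAN-based mapping would be a variant. Third, in production this logic would typically run inside a broker auth plugin or an external auth service that the broker calls. `DeviceRecord` and `authorize_connect` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional


@dataclass
class DeviceRecord:
    device_id: str
    tenant: str
    site: str
    status: str  # "active", "disabled" or "revoked"
    cert_not_after: datetime


def authorize_connect(
    client_id: str,
    cert_cn: str,
    registry: Dict[str, DeviceRecord],
    now: Optional[datetime] = None,
) -> str:
    """Apply the CONNECT decision flow; return the allowed topic namespace."""
    now = now or datetime.now(timezone.utc)
    # The client must not claim an identity other than its certificate's.
    if cert_cn != client_id:
        raise PermissionError("client_id does not match certificate CN")
    record = registry.get(client_id)
    if record is None:
        raise PermissionError("device not found")
    if record.status != "active":
        raise PermissionError(f"device is {record.status}")
    if record.cert_not_after <= now:
        raise PermissionError("certificate expired")
    # Namespace shape taken from the identity mapping rule.
    return f"prod/{record.tenant}/{record.site}/+/{record.device_id}/#"
```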
Topic ACLs: publish and subscribe authorization
MQTT ACLs must be designed around topic namespaces. The default posture should be deny all. Each role receives the minimum publish and subscribe rights required. Devices should publish only under their own namespace and subscribe only to their own command topics.
ACL principles
deny all by default
allow device:
- publish own telemetry
- publish own status
- publish own events
- subscribe own commands
allow ingestion backend:
- subscribe telemetry
- subscribe status
- subscribe events
- no command publishing
allow command service:
- publish command topics
- subscribe ack/result topics
allow dashboard:
- subscribe only authorized tenant/site scope
log:
- denied publish
- denied subscribe
- auth failures
- unexpected wildcard attempts
ACL example
Device gw-123:
allow publish:
prod/acme/paris/line-02/gw-123/telemetry/#
prod/acme/paris/line-02/gw-123/status/#
prod/acme/paris/line-02/gw-123/event/#
allow subscribe:
prod/acme/paris/line-02/gw-123/command/#
deny subscribe:
prod/acme/#
deny publish:
prod/acme/paris/line-02/other-device/#
Role matrix
| Actor | Publish | Subscribe | Forbidden |
|---|---|---|---|
| Device | Own telemetry, status, events. | Own commands. | Tenant-wide read. |
| Gateway | Its local assets only. | Commands for its scope. | Other site topics. |
| Ingestion worker | None or internal ack only. | Telemetry/status/events. | Command publishing. |
| Command service | Command topics. | Ack/result/status topics. | Raw telemetry unless required. |
| Dashboard | None or controlled test topics. | User-authorized scope. | Global #. |
| Admin debug | Controlled only. | Temporary scoped debug. | Permanent unrestricted access. |
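The deny-by-default rule and the ACL example can be checked with a small policy evaluator. This is a minimal sketch, assuming ACLs are stored as lists of allowed filters per action. Brokers ship their own ACL engines, so a helper like this is mainly useful for testing policies or automating the security acceptance tests. `filter_covers`, `is_allowed` and `GW_123_ACL` are hypothetical names.

```python
from typing import Dict, List


def filter_covers(allowed: str, requested: str) -> bool:
    """True if every topic matched by `requested` is also matched by `allowed`.

    `requested` is a concrete PUBLISH topic or a SUBSCRIBE filter.
    '+' matches exactly one level; '#' matches the remaining levels,
    including the parent level, as in the MQTT specification.
    """
    a_levels = allowed.split("/")
    r_levels = requested.split("/")
    for i, a in enumerate(a_levels):
        if a == "#":
            return True
        if i >= len(r_levels):
            return False
        r = r_levels[i]
        if r == "#":
            return False  # the request is broader than this allowed level
        if a != "+" and a != r:
            return False
    return len(a_levels) == len(r_levels)


def is_allowed(acl: Dict[str, List[str]], action: str, topic: str) -> bool:
    """Deny by default: allow only if an allow filter for `action` covers it."""
    if action == "publish" and ("+" in topic or "#" in topic):
        return False  # wildcards are not valid in a PUBLISH topic name
    return any(filter_covers(f, topic) for f in acl.get(action, []))


# The device gw-123 ACL from the example, as allow lists only.
GW_123_ACL: Dict[str, List[str]] = {
    "publish": [
        "prod/acme/paris/line-02/gw-123/telemetry/#",
        "prod/acme/paris/line-02/gw-123/status/#",
        "prod/acme/paris/line-02/gw-123/event/#",
    ],
    "subscribe": ["prod/acme/paris/line-02/gw-123/command/#"],
}
```

Because nothing is allowed unless an allow filter covers the request, the explicit deny lines of the example are implied. `prod/acme/#` is refused because it is broader than the device's own command filter. A publish to `.../gw-123/command/reboot` is refused because no publish filter covers command topics.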
Dangerous subscriptions
High risk:
#
High risk:
prod/#
Still broad:
prod/acme/#
Safer:
prod/acme/paris/+/+/telemetry/temperature
Very specific:
prod/acme/paris/line-02/gw-123/status/availability
Certificate lifecycle: provisioning, rotation and revocation
Certificates are powerful, but they create lifecycle responsibilities. A secure MQTT deployment needs a clear process for certificate generation, provisioning, storage, renewal, rotation, revocation and incident response.
| Step | Goal | Control |
|---|---|---|
| Generate | Create device identity. | Unique key pair per device. |
| Provision | Install certificate securely. | No shared secret in firmware image. |
| Register | Bind cert to device record. | Device registry mapping. |
| Operate | Use cert for mTLS. | Monitor certificate age and usage. |
| Rotate | Replace before expiry or compromise. | Grace period and dual trust if required. |
| Revoke | Disable lost or compromised device. | CRL, broker deny list, registry disabled flag. |
| Audit | Trace identity usage. | Connection logs and certificate fingerprint. |
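Two registry fields, the certificate fingerprint and the rotation due date, are simple to derive. This is a minimal sketch using the standard library. The 30-day lead time is a hypothetical policy value and the function names are illustrative.

```python
import hashlib
import ssl
from datetime import datetime, timedelta

ROTATION_LEAD_TIME = timedelta(days=30)  # hypothetical policy value


def der_fingerprint(der_bytes: bytes) -> str:
    """SHA-256 fingerprint (lowercase hex) of a DER-encoded certificate."""
    return hashlib.sha256(der_bytes).hexdigest()


def pem_fingerprint(pem_text: str) -> str:
    """Same fingerprint, starting from a PEM certificate string."""
    return der_fingerprint(ssl.PEM_cert_to_DER_cert(pem_text))


def rotation_state(not_after: datetime, now: datetime,
                   lead_time: timedelta = ROTATION_LEAD_TIME) -> str:
    """Classify a certificate for the registry: ok, rotation_due or expired."""
    if now >= not_after:
        return "expired"
    if now >= not_after - lead_time:
        return "rotation_due"
    return "ok"
```

Storing `not_after - ROTATION_LEAD_TIME` as `rotation_due_at` lets a daily job list the devices to rotate. If the certificate is read as an OpenSSL-style `notAfter` string, `ssl.cert_time_to_seconds` can convert it into a timestamp.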
Registry fields
Device registry fields:
- device_id
- tenant_id
- site_id
- client_id
- certificate_fingerprint
- certificate_serial
- certificate_not_before
- certificate_not_after
- status: active / disabled / revoked
- last_seen_at
- last_ip
- firmware_version
- owner
- rotation_due_at
Certificate rotation flow
Rotation workflow:
1. Mark device as rotation_due.
2. Generate new certificate.
3. Provision new certificate securely.
4. Allow old and new certificate during grace window.
5. Device reconnects with new certificate.
6. Confirm new fingerprint in registry.
7. Revoke old certificate.
8. Remove old certificate from trusted set.
9. Audit rotation event.
10. Alert if old certificate is still used.
Lost device incident
Lost or compromised device:
1. Set registry status = revoked.
2. Revoke certificate.
3. Remove active session from broker.
4. Block client_id and certificate fingerprint.
5. Search recent publish activity.
6. Search command activity.
7. Invalidate pending commands.
8. Notify tenant/site owner.
9. Rotate related secrets if shared.
10. Keep forensic logs.
Payload protection: validation, limits and safe parsing
MQTT does not validate the business payload. A malicious or broken client can send malformed JSON, huge payloads, unexpected types, missing fields, invalid timestamps, command injection strings or data that causes backend errors.
| Risk | Example | Protection |
|---|---|---|
| Oversized payload | 10 MB JSON sent to telemetry topic. | Broker max packet size + backend max size. |
| Malformed JSON | Broken syntax crashes parser. | Strict parser, safe exception handling. |
| Schema mismatch | value is string instead of number. | Schema validation. |
| Timestamp abuse | Future date or year 1970. | Timestamp sanity window. |
| Metric pollution | Random metric names create high cardinality. | Allowed metric registry. |
| Command injection | Payload contains shell-like values. | Command schema, allowlist, no shell execution. |
Safe payload contract
Telemetry payload:
{
  "schema_version": "1.0",
  "message_id": "msg-20260425-001",
  "device_id": "gw-001",
  "metric": "temperature",
  "value": 21.7,
  "unit": "C",
  "ts": "2026-04-25T10:00:00Z"
}
Validation rules:
- schema_version required
- message_id required for QoS 1
- device_id must match authenticated identity
- metric must be allowed
- value must match metric type
- timestamp must be sane
- payload size must be limited
Validation pipeline
MQTT PUBLISH
│
▼
Check topic ACL
│
▼
Check payload size
│
▼
Parse payload safely
│
▼
Validate schema version
│
▼
Validate identity consistency
│
▼
Validate metric allowlist
│
▼
Build idempotency key
│
▼
Write to buffer or database
│
▼
Expose failures in admin
Backend failure table
| Field | Purpose |
|---|---|
| topic | Find broken namespace or wrong publisher. |
| client_id | Identify device or service. |
| payload_hash | Trace payload without storing sensitive data. |
| error_code | Classify failure. |
| error_message | Explain rejection. |
| first_seen_at | Incident timeline. |
| count | Detect repeated attack or broken firmware. |
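The validation rules and pipeline can be sketched as a single function. It either returns the payload with its idempotency key or raises a coded error that is ready for the failure table. Several assumptions apply: only the standard library is used instead of Pydantic, the metric registry is hypothetical and contains only `temperature`, the size and timestamp windows are hypothetical, and `message_id` is required on every message for simplicity.

```python
import hashlib
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional, Tuple

# Hypothetical limits and registries; tune them per deployment.
MAX_PAYLOAD_BYTES = 64 * 1024
SUPPORTED_SCHEMAS = {"1.0"}
ALLOWED_METRICS = {"temperature": (int, float)}
MAX_FUTURE_SKEW = timedelta(minutes=5)
MAX_AGE = timedelta(days=1)
REQUIRED = ("schema_version", "message_id", "device_id", "metric", "value", "ts")


class PayloadError(Exception):
    def __init__(self, code: str, message: str) -> None:
        super().__init__(message)
        self.code = code


def validate_telemetry(raw: bytes, authenticated_device_id: str,
                       now: Optional[datetime] = None) -> Tuple[Dict[str, Any], str]:
    """Apply the validation rules; return (payload, idempotency_key)."""
    now = now or datetime.now(timezone.utc)
    if len(raw) > MAX_PAYLOAD_BYTES:  # size check before any parsing
        raise PayloadError("too_large", f"{len(raw)} bytes")
    try:
        data = json.loads(raw)
    except ValueError as exc:  # JSONDecodeError and UnicodeDecodeError
        raise PayloadError("bad_json", str(exc)) from None
    if not isinstance(data, dict):
        raise PayloadError("bad_json", "payload is not a JSON object")
    for field in REQUIRED:
        if field not in data:
            raise PayloadError("missing_field", field)
    version = data["schema_version"]
    if not isinstance(version, str) or version not in SUPPORTED_SCHEMAS:
        raise PayloadError("bad_schema_version", str(version))
    if data["device_id"] != authenticated_device_id:
        raise PayloadError("identity_mismatch", str(data["device_id"]))
    metric = data["metric"]
    expected = ALLOWED_METRICS.get(metric) if isinstance(metric, str) else None
    if expected is None:
        raise PayloadError("unknown_metric", str(metric))
    value = data["value"]
    if isinstance(value, bool) or not isinstance(value, expected):
        raise PayloadError("bad_value_type", type(value).__name__)
    try:
        ts = datetime.fromisoformat(str(data["ts"]).replace("Z", "+00:00"))
    except ValueError:
        raise PayloadError("bad_timestamp", str(data["ts"])) from None
    if ts.tzinfo is None or ts > now + MAX_FUTURE_SKEW or ts < now - MAX_AGE:
        raise PayloadError("timestamp_out_of_window", str(data["ts"]))
    return data, f"{data['device_id']}:{data['message_id']}"


def failure_record(topic: str, client_id: str, raw: bytes,
                   err: PayloadError) -> Dict[str, str]:
    """Row for the failure table: a hash instead of the raw payload."""
    return {
        "topic": topic,
        "client_id": client_id,
        "payload_hash": hashlib.sha256(raw).hexdigest(),
        "error_code": err.code,
        "error_message": str(err),
    }
```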
Rate limiting, quotas and flood protection
MQTT brokers can be overloaded by connection storms, reconnect loops, message floods, broad subscriptions, oversized payloads or too many retained messages. Rate limiting and quotas protect the broker and the backend.
| Control | Purpose | Example policy |
|---|---|---|
| Connection rate limit | Prevent reconnect storms. | Max N new connections per IP per minute. |
| Message rate limit | Prevent publisher flood. | Max messages/sec per client. |
| Payload size limit | Prevent memory and parser abuse. | Max 64 KB or lower depending on use case. |
| Inflight limit | Protect slow consumers and broker state. | Receive Maximum in MQTT 5. |
| Subscription limit | Prevent wildcard or subscription abuse. | Max subscriptions per client. |
| Retained limit | Prevent retained store explosion. | Limit retained by namespace or role. |
| IP firewall | Block unwanted sources. | Allow VPN/private ranges only. |
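Per-client message rate limits are usually configured in the broker itself, with option names that vary by product. The logic behind a "max messages/sec per client" policy is often a token bucket. It is sketched here as it could run in a gateway or an ingestion service. The class names and the rate and burst values are hypothetical.

```python
import time
from typing import Callable, Dict


class TokenBucket:
    """Token bucket: refills `rate` tokens/second, holds up to `capacity`."""

    def __init__(self, rate: float, capacity: float, now: float) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.updated = now

    def allow(self, now: float, cost: float = 1.0) -> bool:
        # Refill for the elapsed time, capped at the burst capacity.
        elapsed = max(0.0, now - self.updated)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


class PerClientLimiter:
    """One bucket per client_id, plus a rejection counter for alerting."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.buckets: Dict[str, TokenBucket] = {}
        self.rejected: Dict[str, int] = {}

    def allow(self, client_id: str) -> bool:
        now = self.clock()
        bucket = self.buckets.get(client_id)
        if bucket is None:
            bucket = TokenBucket(self.rate, self.capacity, now)
            self.buckets[client_id] = bucket
        ok = bucket.allow(now)
        if not ok:
            self.rejected[client_id] = self.rejected.get(client_id, 0) + 1
        return ok
```

`rate` bounds the sustained throughput and `capacity` bounds the burst. The `rejected` counter can feed a message flood alert.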
Flood symptoms
- Broker CPU suddenly high.
- Connection rate spikes.
- Authentication failures increase.
- Message rate jumps unexpectedly.
- Queued messages grow continuously.
- Backend ingestion lag increases.
- Subscribers disconnect repeatedly.
Protection architecture
Internet / WAN
│
▼
Firewall / Security Group
│
├── allow trusted IP ranges
├── block known bad sources
└── expose 8883 only
│
▼
Load Balancer TCP/TLS
│
├── connection limits
├── health checks
└── TLS policy if terminated here
│
▼
MQTT Broker
│
├── auth
├── ACL
├── quotas
├── max packet size
├── max inflight
└── rate limits
│
▼
Backend buffer
│
├── Redis Stream
├── Kafka
└── worker queue
Alerting table
| Alert | Signal | First action |
|---|---|---|
| Connection storm | New connections per minute spike. | Check client version, IPs, network outage. |
| Auth attack | Auth failures spike. | Block source, rotate credentials if needed. |
| Message flood | Messages/sec above baseline. | Identify top publishers. |
| Queue growth | Queued messages increase. | Scale consumers or throttle publishers. |
| Oversized payloads | Rejected packet size count. | Identify client and firmware version. |
Network segmentation: public, private, edge and backend zones
MQTT should be placed in a controlled network architecture. The broker should not have unrestricted access to all internal systems. Devices, brokers, backend workers, databases and admin tools should live in separated zones with explicit flows.
| Zone | Contains | Allowed flows |
|---|---|---|
| Device zone | Devices, gateways, edge clients. | Outbound MQTT/TLS to broker. |
| Broker zone | MQTT broker nodes. | Accept MQTT, send to backend connectors. |
| Backend zone | Ingestion workers, APIs, command service. | Subscribe/publish controlled topics. |
| Data zone | PostgreSQL, TimescaleDB, Kafka, S3. | Only backend services, not devices. |
| Admin zone | Ops consoles, dashboards, broker admin. | VPN, MFA, RBAC, audit. |
Rule of thumb
Devices should never connect directly to:
- PostgreSQL
- Redis
- internal admin APIs
- command database
- broker admin UI
- cloud credentials
- internal message queues
Devices should only talk to:
- MQTT listener
- controlled provisioning endpoint if needed
Network diagram
┌───────────────────────────────┐
│ Device / Edge Zone            │
│ gateways, sensors, PLC bridge │
└───────────────┬───────────────┘
                │ MQTT/TLS 8883 only
                ▼
┌───────────────────────────────┐
│ MQTT Broker Zone              │
│ broker nodes, auth, ACL       │
└───────────────┬───────────────┘
                │ controlled subscribe/publish
                ▼
┌───────────────────────────────┐
│ Backend Zone                  │
│ ingestion, command, API       │
└───────────────┬───────────────┘
                │ DB/Kafka access
                ▼
┌───────────────────────────────┐
│ Data Zone                     │
│ PostgreSQL, Kafka, S3, Redis  │
└───────────────────────────────┘
Admin access:
VPN + MFA + RBAC + audit logs
Segmentation checklist
[ ] MQTT listener exposed only where needed
[ ] 1883 disabled or private-only
[ ] 8883 protected by firewall
[ ] Broker admin UI not public
[ ] Backend databases not reachable from devices
[ ] Command service separated from ingestion
[ ] Admin access via VPN or private network
[ ] Security groups are least-privilege
[ ] Logs are centralized
[ ] Secrets are not stored on public images
Command security: the most sensitive MQTT flow
Commands are more dangerous than telemetry. A fake telemetry message can pollute data, but a fake command can reboot a gateway, change a setpoint, stop a machine or trigger unsafe behavior. Command topics require stricter controls.
| Control | Purpose | Example |
|---|---|---|
| Dedicated command service | Only one trusted service can publish commands. | command-service role. |
| Command ID | Deduplication and audit. | cmd-20260425-001 |
| Expiration | Prevent late execution. | expires_at. |
| Device-side validation | Reject unsafe or invalid commands. | Allowed command list. |
| Ack and result topics | Prove receipt and execution outcome. | ack/reboot, result/reboot. |
| Audit log | Trace operator/system action. | who, when, why, target, result. |
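The device-side controls in the table above (command ID deduplication, expiration and an allowed command list) can be sketched as a guard that runs before execution. The allowed actions, the setpoint range and the class name are hypothetical. Publishing the ack and the result on the device's own ack/result topics is left to the caller.

```python
import json
from datetime import datetime, timezone
from typing import Optional, Set

# Hypothetical device policy: which actions exist and which values are safe.
ALLOWED_ACTIONS = {"reboot", "setpoint"}
SETPOINT_RANGE = (5.0, 30.0)


class CommandGuard:
    """Device-side checks run before executing an MQTT command."""

    def __init__(self) -> None:
        # Accepted command IDs; a real device would bound and persist this set.
        self.seen: Set[str] = set()

    def check(self, raw: bytes, now: Optional[datetime] = None) -> str:
        """Return "accept" or a rejection reason; never raises on bad input."""
        now = now or datetime.now(timezone.utc)
        try:
            cmd = json.loads(raw)
            command_id = cmd["command_id"]
            action = cmd["action"]
            expires_at = datetime.fromisoformat(
                str(cmd["expires_at"]).replace("Z", "+00:00"))
        except (ValueError, KeyError, TypeError):
            return "malformed"
        if not isinstance(command_id, str) or not isinstance(action, str):
            return "malformed"
        if command_id in self.seen:
            return "duplicate"
        if expires_at.tzinfo is None or now >= expires_at:
            return "expired"
        if action not in ALLOWED_ACTIONS:
            return "action_not_allowed"
        if action == "setpoint":
            value = cmd.get("value")
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                return "bad_value"
            low, high = SETPOINT_RANGE
            if not low <= value <= high:
                return "out_of_range"
        self.seen.add(command_id)
        return "accept"
```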
Secure command payload
{
  "schema_version": "1.0",
  "command_id": "cmd-20260425-001",
  "action": "setpoint",
  "value": 21.5,
  "unit": "C",
  "requested_by": "rules-engine",
  "reason": "energy-optimization",
  "expires_at": "2026-04-25T10:01:00Z"
}
Command flow with audit
Operator / Rule Engine
│
▼
Command Service
│
├── RBAC check
├── device state check
├── command policy check
├── create command_id
├── write command journal
└── publish MQTT command
│
▼
MQTT Broker
│
▼
Device
│
├── validate command
├── check expiry
├── publish ack
├── execute
└── publish result
Command topic ACL
Device gw-123:
allow subscribe:
prod/acme/paris/line-02/gw-123/command/#
allow publish:
prod/acme/paris/line-02/gw-123/ack/#
prod/acme/paris/line-02/gw-123/result/#
deny publish:
prod/acme/paris/line-02/gw-123/command/#
Command service:
allow publish:
prod/acme/+/+/+/command/#
allow subscribe:
prod/acme/+/+/+/ack/#
prod/acme/+/+/+/result/#
Production security checklist
MQTT broker hardening
[ ] Disable anonymous access
[ ] Disable public plaintext 1883
[ ] Enable TLS on 8883
[ ] Prefer mTLS for devices
[ ] Use per-device identity
[ ] Bind client_id to certificate or registry
[ ] Enforce deny-by-default ACLs
[ ] Restrict wildcard subscriptions
[ ] Set max payload size
[ ] Set connection limits
[ ] Set message rate limits
[ ] Set max subscriptions per client
[ ] Set inflight limits
[ ] Log auth failures
[ ] Log denied publish/subscribe
[ ] Monitor broker metrics
[ ] Protect admin UI with VPN/MFA
[ ] Version broker configuration
[ ] Backup ACL and certificate configuration
Backend hardening
[ ] Validate every payload schema
[ ] Enforce device_id identity consistency
[ ] Add idempotency keys
[ ] Store ingestion failures visibly
[ ] Reject unknown metrics
[ ] Reject stale or future timestamps
[ ] Audit command creation
[ ] Require command_id
[ ] Require command expiration
[ ] Require ack and result for commands
[ ] Keep command journal
[ ] Separate ingestion service and command service
[ ] Do not trust topic alone
[ ] Do not trust payload alone
Incident response checklist
Suspected compromised device:
1. Disable device in registry.
2. Revoke certificate or token.
3. Disconnect active broker session.
4. Block client_id and fingerprint.
5. Search recent telemetry from device.
6. Search command activity.
7. Invalidate pending commands.
8. Check same firmware batch.
9. Rotate shared secrets if any.
10. Preserve logs for forensics.
Suspected credential leak:
1. Rotate credential immediately.
2. Identify all affected clients.
3. Review auth failure logs.
4. Review unexpected topic access.
5. Tighten ACL if needed.
6. Add alert rule for reuse attempt.
Security acceptance test
| Test | Expected result |
|---|---|
| Anonymous CONNECT. | Rejected. |
| Wrong certificate. | Rejected. |
| Device subscribes to prod/acme/#. | Rejected and logged. |
| Device publishes to another device namespace. | Rejected and logged. |
| Device publishes command topic. | Rejected and logged. |
| Oversized payload. | Rejected before backend damage. |
| Revoked device reconnects. | Rejected. |
| Reconnect storm. | Rate limited and alerted. |
Backend architecture overview
A robust Python MQTT backend should not process heavy business logic directly inside the MQTT callback. The callback must stay fast: parse minimally, validate the envelope, build metadata, then push work to a durable or semi-durable processing layer such as Redis Streams, Celery, Kafka, RabbitMQ or a staging table.
The production pattern is: receive, validate, buffer, deduplicate, process, store, alert and expose failures in the Django Admin.
| Layer | Responsibility | Python component |
|---|---|---|
| MQTT client | Connect, subscribe, reconnect, receive messages. | Paho, asyncio-mqtt, gmqtt. |
| Topic parser | Extract tenant, site, asset, stream, metric. | Pure Python service. |
| Payload validator | Validate JSON, schema, metric, timestamp and identity. | Pydantic, dataclasses or custom validation. |
| Buffer | Decouple broker speed from backend speed. | Redis Streams, Celery queue, Kafka. |
| Workers | Deduplicate, enrich, write state/history, trigger rules. | Celery workers, async workers, management commands. |
| Storage | Persist current state, history and rejected messages. | Django ORM, PostgreSQL, TimescaleDB, Redis. |
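The topic parser layer can stay a few lines of pure Python. This is a minimal sketch, assuming the `env/tenant/site/area/device/stream/metric` layout used by the subscriber and publisher examples in this guide (for instance `prod/acme/paris/line-02/gw-123/telemetry/temperature`). `TopicParts`, `parse_topic` and the stream set are hypothetical names.

```python
from dataclasses import dataclass

# Streams that appear in this guide's topic examples.
KNOWN_STREAMS = {"telemetry", "status", "event", "command", "ack", "result"}


@dataclass(frozen=True)
class TopicParts:
    env: str
    tenant: str
    site: str
    area: str
    device_id: str
    stream: str
    metric: str  # remaining levels joined with "/"; empty if none


def parse_topic(topic: str) -> TopicParts:
    """Split env/tenant/site/area/device/stream[/metric...] into named parts."""
    levels = topic.split("/")
    if len(levels) < 6 or "" in levels:
        raise ValueError(f"unexpected topic shape: {topic!r}")
    env, tenant, site, area, device_id, stream, *rest = levels
    if stream not in KNOWN_STREAMS:
        raise ValueError(f"unknown stream {stream!r} in {topic!r}")
    return TopicParts(env, tenant, site, area, device_id, stream, "/".join(rest))
```

The extracted `device_id` still has to be compared with the authenticated identity and with the payload's `device_id`, because the topic alone is not proof of origin.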
Recommended ingestion flow
MQTT Broker
│
│ PUBLISH telemetry/status/event
▼
Python MQTT Subscriber
│
├── fast callback
├── parse topic
├── parse payload
├── basic validation
└── enqueue record
│
▼
Redis Stream / Celery / Kafka
│
▼
Processing Workers
│
├── strict validation
├── identity check
├── idempotency check
├── enrich metadata
├── write current state
├── write history
├── trigger rules
└── record failures
Django Admin
├── devices
├── latest state
├── ingestion errors
├── duplicate messages
├── command journal
└── operational metrics
Robust Paho subscriber
Paho is the most common Python MQTT client. For a backend ingestion service, it is usually started from a Django management command or a dedicated daemon. The callback receives the MQTT message and quickly pushes it to a buffer.
import logging
import ssl
import time
from typing import Any, Dict
from paho.mqtt import client as mqtt
LOG = logging.getLogger("mqtt_ingestion")
BROKER_HOST = "mqtt.example.com"
BROKER_PORT = 8883
CLIENT_ID = "backend-ingestion-01"
MAX_PAYLOAD_BYTES = 65536
TOPICS = [
    ("prod/acme/+/+/+/telemetry/#", 1),
    ("prod/acme/+/+/+/status/#", 1),
    ("prod/acme/+/+/+/event/#", 1),
]
def build_record(msg: mqtt.MQTTMessage) -> Dict[str, Any]:
    # Lenient decode: strict parsing and schema validation happen in workers.
    payload_raw = msg.payload.decode("utf-8", errors="replace")
    return {
        "topic": msg.topic,
        "qos": msg.qos,
        "retain": bool(msg.retain),
        "payload_raw": payload_raw,
        "size_bytes": len(msg.payload),
        "received_at_epoch": time.time(),
    }
def enqueue_record(record: Dict[str, Any]) -> None:
    """
    Push to Redis Stream, Celery, Kafka or a staging table.
    This function must stay fast and reliable.
    """
    LOG.info("enqueue mqtt topic=%s qos=%s", record["topic"], record["qos"])
def on_connect(client, userdata, flags, reason_code, properties):
    LOG.info("mqtt connected reason=%s flags=%s", reason_code, flags)
    # Subscribing here restores the subscriptions after every reconnect.
    for topic, qos in TOPICS:
        result, mid = client.subscribe(topic, qos=qos)
        LOG.info("subscribe topic=%s qos=%s result=%s mid=%s", topic, qos, result, mid)
def on_disconnect(client, userdata, disconnect_flags, reason_code, properties):
    # With CallbackAPIVersion.VERSION2, on_disconnect receives five arguments.
    LOG.warning("mqtt disconnected reason=%s", reason_code)
def on_message(client, userdata, msg):
    try:
        # Reject oversized payloads on the raw byte length, before decoding.
        if len(msg.payload) > MAX_PAYLOAD_BYTES:
            LOG.warning("mqtt payload too large topic=%s size=%d", msg.topic, len(msg.payload))
            return
        enqueue_record(build_record(msg))
    except Exception:
        LOG.exception("mqtt callback failed topic=%s", getattr(msg, "topic", "unknown"))
def create_client() -> mqtt.Client:
    client = mqtt.Client(
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        client_id=CLIENT_ID,
        protocol=mqtt.MQTTv5,
    )
    client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
    # Placeholder credentials: load them from a secret store in production.
    client.username_pw_set("backend_ingestion", "CHANGE_ME")
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message
    client.reconnect_delay_set(min_delay=1, max_delay=60)
    return client
def main() -> None:
    logging.basicConfig(level=logging.INFO)
    client = create_client()
    client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
    # loop_forever reconnects automatically, using the delays set above.
    client.loop_forever()
if __name__ == "__main__":
    main()
Subscriber production rules
| Rule | Reason |
|---|---|
| Keep callback fast. | Prevents broker-side and client-side backlog. |
| Log topic, qos, retain and size. | Makes incidents diagnosable. |
| Never trust payload. | Payload can be malformed or malicious. |
| Do not write directly to multiple tables in callback. | Slow and fragile under load. |
| Use reconnect backoff. | Prevents reconnect storms. |
| Use a stable, unique client ID per backend subscriber. | Allows persistent sessions if needed; two instances sharing one ID keep disconnecting each other. |
Common failure modes
Failure:
Database is slow.
Bad design:
on_message writes directly to database.
Result:
MQTT callback blocks.
Client cannot consume fast enough.
Broker queues grow.
Messages are delayed or dropped.
Better design:
on_message pushes to Redis Stream.
Workers write to database in batches.
Failures are stored and visible in admin.
Python publisher for device or gateway simulation
A publisher can represent a device, a gateway, an edge agent or a backend command service. For telemetry, the payload should include schema version, message ID, device ID, metric, value, unit and timestamp.
import json
import ssl
import time
import uuid
from datetime import datetime, timezone

from paho.mqtt import client as mqtt

BROKER_HOST = "mqtt.example.com"
BROKER_PORT = 8883
TENANT = "acme"
SITE = "paris"
AREA = "line-02"
DEVICE_ID = "gw-123"
CLIENT_ID = DEVICE_ID
USERNAME = "gw_123"
PASSWORD = "CHANGE_ME"

def utc_now() -> str:
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

def build_payload(sequence: int) -> dict:
    return {
        "schema_version": "1.0",
        "message_id": str(uuid.uuid4()),
        "device_id": DEVICE_ID,
        "sequence": sequence,
        "metric": "temperature",
        "value": 21.8,
        "unit": "C",
        "ts": utc_now(),
    }

def build_topic() -> str:
    return f"prod/{TENANT}/{SITE}/{AREA}/{DEVICE_ID}/telemetry/temperature"

client = mqtt.Client(
    client_id=CLIENT_ID,
    protocol=mqtt.MQTTv5,
    callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
client.username_pw_set(USERNAME, PASSWORD)
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
client.loop_start()  # the network loop runs in a background thread

sequence = 0
try:
    while True:
        sequence += 1
        payload = build_payload(sequence)
        topic = build_topic()
        result = client.publish(
            topic,
            json.dumps(payload, separators=(",", ":")),
            qos=1,
            retain=False,
        )
        # Wait for the PUBACK (QoS 1) so delivery problems are visible.
        result.wait_for_publish(timeout=5)
        print(f"published topic={topic} mid={result.mid} rc={result.rc}")
        time.sleep(10)
finally:
    # Disconnect first so the loop thread can still send DISCONNECT.
    client.disconnect()
    client.loop_stop()
Publisher design checklist
| Element | Required? | Reason |
|---|---|---|
| Stable client_id | Yes | Identity, sessions, logs and ACL mapping. |
| message_id | Yes for QoS 1 | Backend idempotency. |
| sequence | Recommended | Detect gaps, duplicates and ordering issues. |
| schema_version | Yes | Safe evolution of payloads. |
| timestamp | Yes | Event time vs ingestion time. |
| retain flag | Only for state | Telemetry should usually not be retained. |
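The `sequence` field only helps if the backend reads it. A small per-device tracker can classify each message as first, in order, duplicate, gap or late. This is a sketch assuming sequences increase by one per message and that state lives in process memory; `SequenceTracker` and its status strings are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class SequenceTracker:
    """Remembers the highest sequence seen per device (in memory only)."""

    last_seen: Dict[str, int] = field(default_factory=dict)

    def observe(self, device_id: str, sequence: int) -> str:
        last = self.last_seen.get(device_id)
        if last is None or sequence > last:
            self.last_seen[device_id] = sequence
            if last is None:
                return "first"
            missing = sequence - last - 1
            return "in_order" if missing == 0 else f"gap:{missing}"
        if sequence == last:
            return "duplicate"
        # Lower than the last one: a late replay or an older duplicate,
        # which cannot be told apart without keeping more history.
        return "late_or_duplicate"
```

A device reboot usually resets its counter, which this sketch reports as `late_or_duplicate`. A real tracker needs a reset rule, for example a boot identifier in the payload (an assumption: the payload above does not carry one).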
Status publisher with retained message
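status_topic = "prod/acme/paris/line-02/gw-123/status/availability"
online_payload = {
    "schema_version": "1.0",
    "device_id": "gw-123",
    "state": "online",
    "ts": utc_now(),
}
# The matching offline message must be registered as Last Will *before* connect():
# client.will_set(status_topic, json.dumps(offline_payload), qos=1, retain=True)
# where offline_payload has the same shape with "state": "offline".
client.publish(
    status_topic,
    json.dumps(online_payload),
    qos=1,
    retain=True,
)
Django project layout for MQTT ingestion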
In Django, the cleanest approach is to isolate MQTT ingestion in a dedicated app. The MQTT service should be executable as a management command, supervised by systemd, Docker, Kubernetes or your orchestrator.
mqtt_app/
    __init__.py
    management/
        __init__.py
        commands/
            __init__.py
            mqtt_ingest.py
            mqtt_publish_test.py
            mqtt_healthcheck.py
    services/
        __init__.py
        mqtt_client.py
        topic_parser.py
        payload_validator.py
        idempotency.py
        redis_streams.py
        metrics.py
        command_router.py
    models.py
    admin.py
    tasks.py
    apps.py
    urls.py
    views.py
Management command pattern
from django.core.management.base import BaseCommand

from mqtt_app.services.mqtt_client import run_mqtt_ingestion

class Command(BaseCommand):
    help = "Run MQTT ingestion service."

    def add_arguments(self, parser):
        parser.add_argument("--client-id", default="backend-ingestion-01")
        parser.add_argument("--log-level", default="INFO")

    def handle(self, *args, **options):
        client_id = options["client_id"]
        log_level = options["log_level"]
        self.stdout.write(f"starting mqtt ingestion client_id={client_id}")
        run_mqtt_ingestion(client_id=client_id, log_level=log_level)
Service responsibilities
| File | Responsibility |
|---|---|
| mqtt_client.py | Create Paho client, callbacks, reconnect, subscriptions. |
| topic_parser.py | Parse tenant, site, area, asset, stream and signal. |
| payload_validator.py | Validate JSON, schema version, identity and metric. |
| idempotency.py | Build keys, detect duplicates, update duplicate counters. |
| redis_streams.py | Push raw messages to Redis Stream and read consumer groups. |
| tasks.py | Celery tasks for processing and storage. |
| models.py | Device, state, telemetry, rejected messages, inbox. |
| admin.py | Operational visibility for ingestion and failures. |
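Whatever the file layout, the ingestion command should stop cleanly: systemd, Docker and Kubernetes all send SIGTERM first and force-kill the process only after a grace period. A minimal sketch using only the standard library; `install_stop_handlers` and `run_until_stopped` are hypothetical helpers, and the 0.5 s polling interval is an assumption.

```python
import signal
import threading
from typing import Callable


def install_stop_handlers(stop: threading.Event) -> None:
    """Set `stop` on SIGTERM (systemd, Docker, Kubernetes) and SIGINT (Ctrl+C).

    signal.signal() must be called from the main thread.
    """

    def _handler(signum, frame):
        stop.set()

    signal.signal(signal.SIGTERM, _handler)
    signal.signal(signal.SIGINT, _handler)


def run_until_stopped(
    stop: threading.Event,
    tick: Callable[[], None],
    on_shutdown: Callable[[], None],
    poll_seconds: float = 0.5,
) -> None:
    """Call `tick` until a stop is requested, then always run `on_shutdown`."""
    try:
        while not stop.is_set():
            tick()
            stop.wait(poll_seconds)
    finally:
        on_shutdown()
```

With Paho, `client.loop_start()` would run before this loop, and `on_shutdown` would call `client.disconnect()`, then `client.loop_stop()`, and flush any local buffer before the grace period expires.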
Runtime options
Deployment options:
- systemd service running manage.py mqtt_ingest
- Docker container running mqtt_ingest
- Kubernetes Deployment with one or more subscribers
- Celery worker consuming Redis/Kafka buffer
- separate command service for outbound MQTT commands
Avoid:
- running MQTT client inside a web request
- starting MQTT loop from AppConfig.ready()
- hidden background threads inside Django web workers
Payload and topic validation
Validation must combine topic validation and payload validation. A payload claiming device_id=gw-999 must not be accepted if it was published under gw-123. The backend should reject mismatches and store them as security or quality failures.
| Check | Reason | Failure action |
|---|---|---|
| Topic shape | Prevent invalid routing. | Reject and log. |
| JSON parse | Prevent backend crashes. | Reject and store error. |
| schema_version | Support safe evolution. | Route to correct parser or reject. |
| device_id matches topic | Prevent impersonation. | Security reject. |
| timestamp sanity | Prevent future or stale data pollution. | Quarantine or reject. |
| metric allowlist | Prevent cardinality explosion. | Reject. |
| payload size | Anti-flood control. | Drop, reject or alert. |
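The topic parser in this section only checks the number of levels and the stream name. A stricter shape check can also refuse wildcards (which MQTT forbids in PUBLISH topic names, so a compliant broker should already reject them), empty levels and `$`-prefixed topics, which MQTT reserves for broker use such as `$SYS`. A sketch; the segment naming convention (lowercase letters, digits, `-` and `_`, at most 64 characters) is an assumption.

```python
import re
from typing import List

# Assumed naming convention for every topic level.
SEGMENT_RE = re.compile(r"[a-z0-9][a-z0-9_-]{0,63}")


def check_topic_segments(topic: str, expected_levels: int = 7) -> List[str]:
    """Return a list of problems; an empty list means the topic shape is acceptable."""
    problems: List[str] = []
    if topic.startswith("$"):
        problems.append("reserved $ prefix")
    if "+" in topic or "#" in topic:
        problems.append("wildcard in published topic")
    parts = topic.split("/")
    if len(parts) != expected_levels:
        problems.append(f"expected {expected_levels} levels, got {len(parts)}")
    for index, part in enumerate(parts):
        # fullmatch avoids the trailing-newline loophole of "^...$" with match().
        if not SEGMENT_RE.fullmatch(part):
            problems.append(f"invalid segment {index}: {part!r}")
    return problems
```

Returning a list instead of raising on the first problem lets the rejected-message store record every defect of a malformed topic at once.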
Topic parser example
from dataclasses import dataclass

@dataclass(frozen=True)
class ParsedTopic:
    env: str
    tenant: str
    site: str
    area: str
    asset: str
    stream: str
    signal: str

def parse_topic(topic: str) -> ParsedTopic:
    parts = topic.split("/")
    if len(parts) != 7:
        raise ValueError(f"invalid topic shape: {topic}")
    env, tenant, site, area, asset, stream, signal = parts
    allowed_streams = {"telemetry", "status", "event", "command", "ack", "result"}
    if stream not in allowed_streams:
        raise ValueError(f"invalid stream: {stream}")
    return ParsedTopic(
        env=env,
        tenant=tenant,
        site=site,
        area=area,
        asset=asset,
        stream=stream,
        signal=signal,
    )
Payload validator example
import json
import math
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

ALLOWED_METRICS = {"temperature", "pressure", "vibration", "rpm"}

def parse_json_payload(payload_raw: str) -> Dict[str, Any]:
    try:
        payload = json.loads(payload_raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"invalid json: {exc}") from exc
    if not isinstance(payload, dict):
        raise ValueError("payload must be a JSON object")
    return payload

def validate_telemetry_payload(parsed_topic, payload: Dict[str, Any]) -> Dict[str, Any]:
    if payload.get("schema_version") != "1.0":
        raise ValueError("unsupported schema_version")
    if payload.get("device_id") != parsed_topic.asset:
        raise ValueError("device_id does not match topic asset")
    metric = payload.get("metric")
    if metric not in ALLOWED_METRICS:
        raise ValueError(f"metric is not allowed: {metric}")
    if metric != parsed_topic.signal:
        raise ValueError("metric does not match topic signal")
    value = payload.get("value")
    # bool is a subclass of int, so True/False must be excluded explicitly.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise ValueError("value must be numeric")
    if isinstance(value, float) and not math.isfinite(value):
        # json.loads accepts NaN and Infinity by default.
        raise ValueError("value must be finite")
    ts_raw = payload.get("ts")
    if not isinstance(ts_raw, str):
        raise ValueError("ts is required")
    # fromisoformat raises ValueError on malformed input.
    ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00"))
    if ts.tzinfo is None:
        # Comparing naive and aware datetimes would raise TypeError.
        raise ValueError("ts must include a timezone")
    now = datetime.now(timezone.utc)
    if ts > now + timedelta(minutes=5):
        raise ValueError("timestamp is too far in the future")
    if ts < now - timedelta(days=30):
        raise ValueError("timestamp is too old")
    return payload
Failure categories
Validation failure categories:
- invalid_topic
- invalid_json
- unsupported_schema
- identity_mismatch
- unknown_metric
- invalid_value_type
- stale_timestamp
- future_timestamp
- oversized_payload
- security_violation
Celery processing pipeline
Celery is useful when MQTT ingestion must trigger asynchronous processing, database writes, alerting, enrichment or WebSocket updates. The MQTT callback can enqueue a task, and workers process messages independently.
tasks.py example
import hashlib
import logging

from celery import shared_task
from django.db import transaction

from mqtt_app.services.idempotency import build_idempotency_key
from mqtt_app.services.payload_validator import (
    parse_json_payload,
    validate_telemetry_payload,
)
from mqtt_app.services.topic_parser import parse_topic

# process_status, process_event, store_rejected_message, create_inbox_if_new,
# write_telemetry and update_current_state are project helpers not shown here.

LOG = logging.getLogger("mqtt_tasks")

def payload_hash(payload_raw: str) -> str:
    return hashlib.sha256(payload_raw.encode("utf-8")).hexdigest()

@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_kwargs={"max_retries": 5},
)
def process_mqtt_record(self, record: dict) -> str:
    topic = record["topic"]
    payload_raw = record["payload_raw"]
    try:
        parsed_topic = parse_topic(topic)
        payload = parse_json_payload(payload_raw)
        if parsed_topic.stream == "telemetry":
            payload = validate_telemetry_payload(parsed_topic, payload)
            return process_telemetry(parsed_topic, payload, payload_raw)
        if parsed_topic.stream == "status":
            return process_status(parsed_topic, payload, payload_raw)
        if parsed_topic.stream == "event":
            return process_event(parsed_topic, payload, payload_raw)
        return "ignored"
    except ValueError as exc:
        # Validation errors are permanent: store them instead of retrying.
        # Other exceptions (database, network) trigger the autoretry policy.
        store_rejected_message(record, str(exc))
        return "rejected"

def process_telemetry(parsed_topic, payload: dict, payload_raw: str) -> str:
    key = build_idempotency_key(parsed_topic, payload)
    with transaction.atomic():
        inbox_created = create_inbox_if_new(
            idempotency_key=key,
            topic="/".join([
                parsed_topic.env,
                parsed_topic.tenant,
                parsed_topic.site,
                parsed_topic.area,
                parsed_topic.asset,
                parsed_topic.stream,
                parsed_topic.signal,
            ]),
            payload_hash_value=payload_hash(payload_raw),
        )
        if not inbox_created:
            return "duplicate"
        write_telemetry(parsed_topic, payload)
        update_current_state(parsed_topic, payload)
    return "processed"
Celery topology
MQTT Subscriber
│
│ delay()
▼
Celery Broker
│
├── queue: mqtt.telemetry
├── queue: mqtt.status
├── queue: mqtt.event
└── queue: mqtt.command_result
│
▼
Celery Workers
│
├── validation
├── idempotency
├── database writes
├── alerting
└── websocket fanout
Queue separation
| Queue | Content | Priority |
|---|---|---|
| mqtt.telemetry | Frequent measurements. | Medium. |
| mqtt.status | Current device state. | High. |
| mqtt.event | Alarms and business events. | High. |
| mqtt.low_priority | Verbose logs or optional metrics. | Low. |
| mqtt.dead_letter | Failed records. | Manual review. |
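Routing each record to the queue that matches its stream keeps bursts of telemetry from delaying status and events. A sketch, assuming the 7-level topic layout used by `parse_topic` and queue names taken from the Celery topology and queue table in this section; the mapping itself is an assumption to adapt.

```python
from typing import Dict

# Stream level of the topic -> Celery queue name (assumed mapping).
STREAM_QUEUES: Dict[str, str] = {
    "telemetry": "mqtt.telemetry",
    "status": "mqtt.status",
    "event": "mqtt.event",
    "ack": "mqtt.command_result",
    "result": "mqtt.command_result",
}
DEFAULT_QUEUE = "mqtt.low_priority"
DEAD_LETTER_QUEUE = "mqtt.dead_letter"


def queue_for_topic(topic: str) -> str:
    """Pick a Celery queue from env/tenant/site/area/asset/stream/signal topics."""
    parts = topic.split("/")
    if len(parts) != 7:
        # The worker would reject it anyway; route it straight to review.
        return DEAD_LETTER_QUEUE
    stream = parts[5]
    return STREAM_QUEUES.get(stream, DEFAULT_QUEUE)
```

The subscriber could then call `process_mqtt_record.apply_async(args=[record], queue=queue_for_topic(record["topic"]))`, and dedicated workers could be started with `celery -A project worker -Q mqtt.status,mqtt.event` (where `project` is a placeholder) so high-priority queues never wait behind telemetry.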
Redis Streams as MQTT ingestion buffer
Redis Streams can act as a fast buffer between the MQTT subscriber and the processing workers. This gives you consumer groups, pending entries, replay, lag visibility and better backpressure control than a direct database write inside the callback.
Producer example
import redis

redis_client = redis.Redis(
    host="localhost",
    port=6379,
    db=0,
    decode_responses=True,
)
STREAM_NAME = "mqtt:ingestion"

def enqueue_record_to_stream(record: dict) -> str:
    message_id = redis_client.xadd(
        STREAM_NAME,
        fields={
            "topic": record["topic"],
            "qos": str(record["qos"]),
            "retain": "1" if record["retain"] else "0",
            "payload_raw": record["payload_raw"],
            "received_at_epoch": str(record["received_at_epoch"]),
        },
        # Trims the stream to roughly 100000 entries. Trimming ignores acks,
        # so entries can be lost if workers fall that far behind.
        maxlen=100000,
        approximate=True,
    )
    return message_id
Consumer group example
STREAM_NAME = "mqtt:ingestion"
GROUP_NAME = "mqtt-workers"
CONSUMER_NAME = "worker-01"  # must be unique per worker process

def ensure_group():
    try:
        redis_client.xgroup_create(
            STREAM_NAME,
            GROUP_NAME,
            id="0",
            mkstream=True,
        )
    except redis.exceptions.ResponseError as exc:
        # BUSYGROUP means the group already exists, which is fine.
        if "BUSYGROUP" not in str(exc):
            raise

def consume_forever():
    ensure_group()
    while True:
        # ">" requests only entries never delivered to this group.
        response = redis_client.xreadgroup(
            groupname=GROUP_NAME,
            consumername=CONSUMER_NAME,
            streams={STREAM_NAME: ">"},
            count=50,
            block=5000,
        )
        for stream_name, messages in response or []:
            for message_id, fields in messages:
                try:
                    process_record(fields)
                    redis_client.xack(STREAM_NAME, GROUP_NAME, message_id)
                except Exception:
                    # Not acked: the entry stays in the pending entries list
                    # and must be reclaimed or dead-lettered later.
                    LOG.exception("stream processing failed id=%s", message_id)
Redis Streams architecture
MQTT callback
│
│ XADD mqtt:ingestion
▼
Redis Stream
│
├── consumer group: mqtt-workers
│ ├── worker-01
│ ├── worker-02
│ └── worker-03
│
├── pending entries list
├── lag metrics
└── replay support
What to monitor
| Metric | Meaning | Action |
|---|---|---|
| Stream length | Buffer growth. | Scale workers or reduce input. |
| Consumer lag | Workers behind producers. | Increase workers, optimize storage. |
| Pending entries | Messages delivered but not acked. | Recover stuck consumers. |
| Processing time | Worker speed. | Batch writes or profile code. |
| Rejected count | Bad payloads or attacks. | Inspect firmware, ACL or threat. |
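Pending entries that never get acked need an explicit policy: reclaim them for another attempt, or move them to a dead-letter path once they have failed too often. This sketch works on dictionaries shaped like the entries returned by redis-py's `xpending_range()` (`message_id`, `consumer`, `time_since_delivered`, `times_delivered`); the 60 s idle threshold and the 5-delivery limit are assumptions.

```python
from typing import Any, Dict, List, Tuple


def triage_pending(
    entries: List[Dict[str, Any]],
    min_idle_ms: int = 60_000,
    max_deliveries: int = 5,
) -> Tuple[List[str], List[str]]:
    """Split pending entries into (to_reclaim, to_dead_letter) message IDs."""
    reclaim: List[str] = []
    dead_letter: List[str] = []
    for entry in entries:
        if entry["time_since_delivered"] < min_idle_ms:
            continue  # its consumer may still be working on it
        if entry["times_delivered"] >= max_deliveries:
            dead_letter.append(entry["message_id"])
        else:
            reclaim.append(entry["message_id"])
    return reclaim, dead_letter
```

Reclaimed IDs can be taken over with `redis_client.xclaim(STREAM_NAME, GROUP_NAME, CONSUMER_NAME, min_idle_time=60000, message_ids=reclaim)`. Dead-letter IDs can be read with `xrange(STREAM_NAME, min=msg_id, max=msg_id)`, copied to a separate stream (for example `mqtt:dead_letter`, a hypothetical name) and then acked so they leave the pending list.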
Idempotency for MQTT messages
MQTT QoS 1 may deliver duplicates. Worker retries may also create duplicates. Idempotency ensures the same message can be received more than once without causing duplicate business records or unsafe repeated actions.
Django models example
from django.db import models

class MQTTInboxMessage(models.Model):
    idempotency_key = models.CharField(max_length=128)
    topic_hash = models.CharField(max_length=64)
    payload_hash = models.CharField(max_length=64)
    client_id = models.CharField(max_length=128, blank=True, default="")
    topic_preview = models.CharField(max_length=512, blank=True, default="")
    status = models.CharField(max_length=32, default="pending")
    duplicate_count = models.IntegerField(default=0)
    first_seen_at = models.DateTimeField(auto_now_add=True)
    last_seen_at = models.DateTimeField(auto_now=True)

    class Meta:
        constraints = [
            models.UniqueConstraint(
                fields=["idempotency_key"],
                name="uniq_mqtt_inbox_key",
            )
        ]

class MQTTRejectedMessage(models.Model):
    topic_preview = models.CharField(max_length=512, blank=True, default="")
    payload_hash = models.CharField(max_length=64, blank=True, default="")
    error_code = models.CharField(max_length=64)
    error_message = models.TextField(blank=True, default="")
    count = models.IntegerField(default=1)
    first_seen_at = models.DateTimeField(auto_now_add=True)
    last_seen_at = models.DateTimeField(auto_now=True)
Idempotency key builder
import hashlib

def sha256_text(value: str) -> str:
    return hashlib.sha256(value.encode("utf-8")).hexdigest()

def build_idempotency_key(parsed_topic, payload: dict) -> str:
    message_id = payload.get("message_id")
    if message_id:
        return sha256_text(f"message_id:{message_id}")
    sequence = payload.get("sequence")
    if sequence is not None:
        raw = (
            f"device_sequence:"
            f"{parsed_topic.tenant}:"
            f"{parsed_topic.site}:"
            f"{parsed_topic.asset}:"
            f"{sequence}"
        )
        return sha256_text(raw)
    ts = payload.get("ts", "")
    metric = payload.get("metric", parsed_topic.signal)
    raw = (
        f"fallback:"
        f"{parsed_topic.tenant}:"
        f"{parsed_topic.site}:"
        f"{parsed_topic.asset}:"
        f"{metric}:"
        f"{ts}"
    )
    return sha256_text(raw)
Processing decision
Incoming message
│
▼
Build idempotency key
│
▼
Try insert MQTTInboxMessage
│
├── insert succeeds
│ └── process normally
│
└── unique conflict
├── increment duplicate_count
└── skip business write
| Key strategy | Best for |
|---|---|
| message_id | Events and QoS 1 messages. |
| device_id + sequence | Ordered telemetry from devices. |
| device_id + metric + ts | Fallback for time-series data. |
| command_id | Commands and command results. |
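The processing decision above relies on the database unique constraint rather than a prior SELECT, so two workers racing on the same message cannot both win. To stay runnable anywhere, this sketch uses the standard-library `sqlite3` module and a simplified `inbox` table instead of the Django model; the table and function names are illustrative.

```python
import sqlite3


def open_inbox() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE inbox ("
        " idempotency_key TEXT PRIMARY KEY,"
        " duplicate_count INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def claim_message(conn: sqlite3.Connection, key: str) -> bool:
    """Return True if the key is new (process it), False if it is a duplicate."""
    try:
        with conn:  # commits on success, rolls back if the INSERT fails
            conn.execute("INSERT INTO inbox (idempotency_key) VALUES (?)", (key,))
        return True
    except sqlite3.IntegrityError:
        # Unique conflict: count the duplicate and skip the business write.
        with conn:
            conn.execute(
                "UPDATE inbox SET duplicate_count = duplicate_count + 1"
                " WHERE idempotency_key = ?",
                (key,),
            )
        return False
```

In Django the equivalent is `MQTTInboxMessage.objects.create(...)` inside a nested `transaction.atomic()` block, catching `django.db.IntegrityError`, then `filter(idempotency_key=key).update(duplicate_count=F("duplicate_count") + 1)`. The nested atomic block matters on PostgreSQL: without its savepoint, the failed INSERT aborts the surrounding transaction.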
Storage model: hot state, history, rejects and audit
MQTT messages should usually be split into several storage targets: hot state for dashboards, historical measurements for analytics, event tables for audit and rejected messages for operational troubleshooting.
| Data type | Storage | Purpose |
|---|---|---|
| Current device state | Redis + relational snapshot | Fast dashboard and latest known state. |
| Telemetry history | PostgreSQL partitioning, TimescaleDB, InfluxDB, ClickHouse | Time-series analysis. |
| Events and alarms | Relational event table | Audit, alerting, incident review. |
| Commands | Command journal | Trace request, ack, result and timeout. |
| Rejected messages | Django model | Debug firmware, attacks, schema issues. |
| Raw archive | S3 or object storage | Forensics and replay if required. |
Telemetry model example
class MQTTTelemetryPoint(models.Model):
    tenant = models.CharField(max_length=64)
    site = models.CharField(max_length=64)
    area = models.CharField(max_length=64)
    asset = models.CharField(max_length=128)
    metric = models.CharField(max_length=64)
    value_float = models.FloatField(null=True, blank=True)
    unit = models.CharField(max_length=32, blank=True, default="")
    event_ts = models.DateTimeField()
    ingestion_ts = models.DateTimeField(auto_now_add=True)
    message_key = models.CharField(max_length=128)
    quality = models.CharField(max_length=32, blank=True, default="good")
Storage architecture
Processing worker
│
├── Redis hot state
│ └── latest value per asset/metric
│
├── Time-series table
│ └── historical telemetry
│
├── Event table
│ └── alarms, errors, lifecycle events
│
├── Command journal
│ └── command, ack, result, timeout
│
├── Rejected messages
│ └── validation and security failures
│
└── Object archive
└── optional raw payload retention
Hot state Redis key example
Key:
mqtt:state:prod:acme:paris:line-02:gw-123:temperature
Value:
{
"value": 21.8,
"unit": "C",
"event_ts": "2026-04-25T10:00:00Z",
"ingestion_ts": "2026-04-25T10:00:02Z",
"quality": "good"
}
Retention strategy
| Data | Retention | Reason |
|---|---|---|
| Hot state | Current only | Dashboard and latest status. |
| Raw telemetry | Days to months | Depends on volume and compliance. |
| Aggregates | Months to years | Trend and analytics. |
| Command journal | Longer retention | Audit and safety. |
| Rejected messages | Short to medium | Debug and security forensics. |
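Replayed or delayed messages, for example from a gateway that was offline, must not overwrite a newer hot state. A sketch of a newest-event-wins rule, assuming the key format and JSON value from the hot state example in this section and ISO 8601 timestamps with a `Z` suffix; `state_key` and `merge_hot_state` are hypothetical helpers.

```python
import json
from datetime import datetime
from typing import Any, Dict, Optional


def state_key(env: str, tenant: str, site: str, area: str, asset: str, metric: str) -> str:
    return f"mqtt:state:{env}:{tenant}:{site}:{area}:{asset}:{metric}"


def _parse_ts(value: str) -> datetime:
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def merge_hot_state(current_json: Optional[str], candidate: Dict[str, Any]) -> Optional[str]:
    """Return the JSON to store, or None when the stored state is as new or newer."""
    if current_json is not None:
        current = json.loads(current_json)
        if _parse_ts(current["event_ts"]) >= _parse_ts(candidate["event_ts"]):
            return None
    return json.dumps(candidate, separators=(",", ":"))
```

With Redis, a `get` followed by a `set` is not atomic when several workers update the same key; a Lua script or a `WATCH`/`MULTI` transaction closes that race. This sketch only shows the comparison rule.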
Production runbook for Python MQTT backend
Deployment checklist
[ ] MQTT daemon runs outside web workers
[ ] Stable backend client_id
[ ] TLS enabled
[ ] Credentials loaded from environment or secret manager
[ ] Topics configured by settings
[ ] Reconnect backoff enabled
[ ] Callback does not block
[ ] Payload size limit enforced
[ ] Validation errors stored
[ ] Idempotency implemented
[ ] Redis/Celery/Kafka buffer monitored
[ ] Worker lag monitored
[ ] Database write latency monitored
[ ] Duplicate count visible
[ ] Rejected messages visible in admin
[ ] Dead-letter or retry policy defined
[ ] Graceful shutdown implemented
[ ] systemd/Docker/Kubernetes restart policy set
[ ] Healthcheck command available
[ ] Logs include topic, client_id, message_id and error code
systemd service example
[Unit]
Description=MQTT ingestion service
After=network-online.target
Wants=network-online.target
[Service]
User=www-data
WorkingDirectory=/srv/ideo-lab
Environment=DJANGO_SETTINGS_MODULE=project.settings
ExecStart=/srv/venv/bin/python manage.py mqtt_ingest --client-id backend-ingestion-01
Restart=always
RestartSec=5
TimeoutStopSec=30
[Install]
WantedBy=multi-user.target
Incident diagnosis table
| Symptom | Likely cause | First checks |
|---|---|---|
| No messages received | Subscription rejected, ACL, broker connection. | CONNACK, SUBACK, broker logs. |
| Duplicate rows | QoS 1 retry, missing idempotency. | message_id, inbox table, unique key. |
| High ingestion lag | Workers too slow or database bottleneck. | Redis lag, Celery queue, DB latency. |
| Many rejected messages | Firmware bug, schema drift, attack. | Error categories, top devices, payload hashes. |
| MQTT reconnect loop | TLS/auth/client ID conflict. | Broker logs, certificates, client_id reuse. |
| Callback crashes | Uncaught parsing or encoding error. | Exception logs and rejected message path. |
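Several symptoms in this table (no messages received, high ingestion lag, many rejected messages) can be turned into automated checks. The sketch assumes a snapshot already collected from Redis, Celery and the database; the `IngestionSnapshot` fields and all thresholds are hypothetical.

```python
import time
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class IngestionSnapshot:
    last_processed_epoch: Optional[float]  # None if nothing was processed yet
    worker_lag: int                        # e.g. stream entries not yet consumed
    rejected_last_5min: int


def evaluate_health(
    s: IngestionSnapshot,
    now: Optional[float] = None,
    max_age_s: float = 300.0,
    max_lag: int = 10_000,
    max_rejected: int = 500,
) -> List[str]:
    """Return human-readable failures; an empty list means healthy."""
    now = time.time() if now is None else now
    failures: List[str] = []
    if s.last_processed_epoch is None:
        failures.append("no message processed yet")
    elif now - s.last_processed_epoch > max_age_s:
        failures.append(f"last message is {now - s.last_processed_epoch:.0f}s old")
    if s.worker_lag > max_lag:
        failures.append(f"worker lag {s.worker_lag} > {max_lag}")
    if s.rejected_last_5min > max_rejected:
        failures.append(f"rejected spike {s.rejected_last_5min} > {max_rejected}")
    return failures
```

A `mqtt_healthcheck` management command could call this and raise `django.core.management.base.CommandError` when the list is not empty, so the process exits with a non-zero status that Docker or Kubernetes probes can act on.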
Healthcheck command idea
Healthcheck should verify:
- broker reachable
- TLS handshake works
- backend credentials accepted
- test subscription succeeds
- Redis or Celery broker reachable
- database reachable
- last processed message age below threshold
- rejected message spike below threshold
- worker lag below threshold
Why MQTT is used in real applications
MQTT is useful whenever many distributed clients need to send small or medium messages to one or several backend consumers in near real time. Its strongest domains are IoT, industrial telemetry, fleet tracking, energy monitoring, logistics, smart buildings and edge observability.
The common pattern is always the same: devices publish state, telemetry or events; the broker routes messages; backend services store, alert, visualize or command devices.
| Domain | Typical data | MQTT value | Backend target |
|---|---|---|---|
| Smart home / building | Temperature, humidity, presence, heating state. | Live state and lightweight device communication. | Home Assistant, Django dashboard, Redis, time-series DB. |
| Industrial IoT | PLC data, machine state, alarms, vibration, pressure. | Decouples shop floor from backend systems. | Historian, SCADA bridge, alerting, analytics. |
| Energy | Power, kWh, inverter status, load shedding commands. | Real-time monitoring and control. | Billing, audit, time-series, rules engine. |
| Fleet / vehicles | GPS, battery, speed, shock, connectivity. | Efficient mobile telemetry over unstable networks. | Maps, alerts, tracking, maintenance. |
| Logistics | Cold chain, asset location, door events, package sensors. | Event-driven monitoring and exceptions. | WMS, TMS, alerting, compliance records. |
| Health / care | Device status, room sensors, non-critical telemetry. | Lightweight edge connectivity. | Monitoring dashboards, alerts, audit. |
Generic MQTT application architecture
Devices / Gateways / Edge Agents
│
│ MQTT/TLS
▼
MQTT Broker
│
├── retained state
├── Last Will status
├── ACL by topic
├── QoS delivery
└── topic routing
│
├── Dashboard subscriber
├── Ingestion worker
├── Alerting engine
├── Command service
└── Data pipeline
├── Redis hot state
├── PostgreSQL / TimescaleDB
├── Kafka / object archive
└── ML / analytics
Message families
Telemetry:
- frequent measurements
- QoS 0 or 1
- usually not retained
- time-series storage
Status:
- current state
- QoS 1
- often retained
- dashboard and availability
Event:
- punctual business or technical event
- QoS 1
- audit/event table
Command:
- action request
- QoS 1 or 2
- command_id required
- ack and result topics required
Smart home / smart building
MQTT is widely used in home automation and smart building scenarios because devices are small, local networks may be unstable, messages are lightweight, and dashboards need immediate state updates.
Typical architecture
Sensors / Switches / Thermostats
│
│ Wi-Fi / Zigbee gateway / Ethernet
▼
Local MQTT Broker
│
├── Home Assistant
├── Django dashboard
├── Rules engine
├── Redis hot state
└── Time-series DB
Topic examples
home/floor-1/room-12/temperature
home/floor-1/room-12/humidity
home/floor-1/room-12/presence
home/floor-1/room-12/status/heater
home/floor-1/room-12/command/heater
home/floor-1/room-12/ack/heater
| Need | MQTT pattern | Reason |
|---|---|---|
| Latest known temperature | Retained state | Dashboard gets state immediately. |
| Device offline detection | Last Will retained status | Broker publishes offline if device disappears. |
| Heating command | Command topic + ack topic | Proves command was received. |
| Live dashboard | MQTT subscriber to WebSocket | Browser updates in real time. |
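On the device side, a command is only safe if the device checks `command_id` and `expires_at` before acting and always answers on the ack topic. A sketch under stated assumptions: the statuses `duplicate`, `expired` and `rejected` are hypothetical (only `accepted` appears in the ack example of this section), and `seen_ids` lives in memory, whereas a real device would persist or bound it.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Set


def _parse_ts(value: str) -> datetime:
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def _iso_z(value: datetime) -> str:
    return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")


def handle_command(command: Dict[str, Any], seen_ids: Set[str], now: datetime) -> Dict[str, Any]:
    """Validate a command and return the ack payload the device would publish."""
    command_id = command.get("command_id")
    if not command_id:
        return {"command_id": None, "status": "rejected", "ts": _iso_z(now)}
    if command_id in seen_ids:
        # QoS 1 redelivery or replay: acknowledge again, do not act again.
        return {"command_id": command_id, "status": "duplicate", "ts": _iso_z(now)}
    expires_at = command.get("expires_at")
    if expires_at and _parse_ts(expires_at) < now:
        return {"command_id": command_id, "status": "expired", "ts": _iso_z(now)}
    seen_ids.add(command_id)
    # The device-specific action goes here, e.g. applying command["mode"].
    return {"command_id": command_id, "status": "accepted", "ts": _iso_z(now)}
```

Checking `expires_at` prevents a heater from applying a setpoint that was sent minutes earlier and only delivered after a reconnect.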
Heating control flow
Temperature sensor
│
│ PUBLISH home/floor-1/room-12/temperature
▼
MQTT Broker
│
├── Dashboard updates value
├── Time-series stores history
└── Rules engine checks threshold
│
▼
Command service publishes:
home/floor-1/room-12/command/heater
│
▼
Heater device receives command
│
├── validates command_id
├── applies setpoint
└── publishes ack/heater
Payload examples
Temperature:
{
"schema_version": "1.0",
"message_id": "msg-001",
"value": 21.7,
"unit": "C",
"ts": "2026-04-25T10:00:00Z"
}
Heater command:
{
"schema_version": "1.0",
"command_id": "cmd-001",
"action": "set_mode",
"mode": "eco",
"expires_at": "2026-04-25T10:01:00Z"
}
Ack:
{
"command_id": "cmd-001",
"status": "accepted",
"ts": "2026-04-25T10:00:05Z"
}
Industrial IoT / factory / machine telemetry
Industrial MQTT connects machines, PLCs, gateways, historians, dashboards and backend systems. The gateway often translates Modbus, OPC UA, serial protocols or vendor-specific APIs into MQTT topics.
Industrial architecture
PLC / Sensors / Machines
│
│ OPC UA / Modbus / Serial / Vendor API
▼
Edge Gateway
│
├── normalize values
├── map asset model
├── buffer during network outage
└── publish MQTT
│
▼
MQTT Broker
│
├── Historian
├── SCADA bridge
├── Alerting
├── Maintenance analytics
└── Enterprise backend
Topic examples
factory/a1/line-7/press-2/telemetry/pressure
factory/a1/line-7/press-2/telemetry/temperature
factory/a1/line-7/press-2/telemetry/vibration
factory/a1/line-7/press-2/event/alarm
factory/a1/line-7/press-2/status/availability
factory/a1/line-7/press-2/command/reset
factory/a1/line-7/press-2/ack/reset
| Industrial need | MQTT pattern | Storage |
|---|---|---|
| Machine measurements | Telemetry topics, QoS 0/1. | Historian or time-series DB. |
| Machine availability | Status retained + LWT. | Redis state + event history. |
| Alarm events | Event topics, QoS 1. | Audit/event table. |
| Machine commands | Command + ack + result. | Command journal. |
Industrial semantic layer
Raw machine signal:
PLC register 40021 = 723
Gateway normalization:
{
"asset": "press-2",
"metric": "pressure",
"value": 72.3,
"unit": "bar",
"quality": "good",
"ts": "2026-04-25T10:00:00Z"
}
MQTT topic:
factory/a1/line-7/press-2/telemetry/pressure
Sparkplug-style thinking
Industrial MQTT often needs more than basic topics:
- asset model
- birth certificate
- death certificate
- typed metrics
- device state
- metric quality
- node/device hierarchy
- consistent payload structure
- lifecycle events
This is why Sparkplug or OPC UA bridge patterns are common in production IIoT deployments.
Risk points
| Risk | Mitigation |
|---|---|
| Machine command executed twice. | command_id + device idempotency + result topic. |
| Network outage between site and cloud. | Edge broker + local buffer + replay with sequence. |
| Bad unit conversion. | Metric registry with unit and type. |
| Too many raw tags. | Normalize and aggregate at gateway. |
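The gateway normalization step (register 40021 = 723 becoming 72.3 bar in the semantic layer example) is exactly where the "bad unit conversion" risk lives. A sketch of a metric registry that owns asset, metric, unit and scale per register; the register map is hypothetical, and the 0.1 scale is inferred from that example only.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class MetricSpec:
    asset: str
    metric: str
    unit: str
    scale: float  # multiply the raw register value by this factor


# Hypothetical register map: real addresses and scales come from the PLC documentation.
REGISTRY: Dict[int, MetricSpec] = {
    40021: MetricSpec(asset="press-2", metric="pressure", unit="bar", scale=0.1),
}


def normalize(register: int, raw: int, ts: str) -> Dict[str, Any]:
    """Turn a raw register value into the normalized payload published over MQTT."""
    spec = REGISTRY.get(register)
    if spec is None:
        raise KeyError(f"unmapped register {register}")
    return {
        "asset": spec.asset,
        "metric": spec.metric,
        # round() hides binary float noise such as 72.30000000000001
        "value": round(raw * spec.scale, 6),
        "unit": spec.unit,
        "quality": "good",
        "ts": ts,
    }


def topic_for(site: str, line: str, payload: Dict[str, Any]) -> str:
    return f"factory/{site}/{line}/{payload['asset']}/telemetry/{payload['metric']}"
```

Raising on unmapped registers, instead of publishing raw tags, keeps unknown signals out of the broker, which also addresses the "too many raw tags" risk.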
Energy: meters, solar, batteries and load control
Energy systems use MQTT for real-time production, consumption, battery, inverter and load control data. MQTT fits well because measurements are frequent, distributed and often produced by gateways or controllers.
| Flow | Topic | QoS | Storage |
|---|---|---|---|
| Solar production | energy/site-01/inverter-3/telemetry/power | 0/1 | Time-series. |
| Consumption | energy/site-01/meter-main/telemetry/kwh | 1 | Billing/audit. |
| Battery state | energy/site-01/battery-1/status/soc | 1 | Current state + history. |
| Inverter alarm | energy/site-01/inverter-3/event/alarm | 1 | Events. |
| Load shedding | energy/site-01/controller/command/load-shed | 1/2 | Command journal. |
Energy site architecture
Meters / Inverters / Batteries
│
▼
Energy Gateway
│
├── normalize kW, kWh, voltage, current
├── compute local aggregates
├── buffer if WAN down
└── publish MQTT
│
▼
MQTT Broker
│
├── live dashboard
├── billing/audit pipeline
├── optimization engine
└── load control command service
Energy control loop
Meter publishes consumption
│
▼
Broker routes telemetry
│
├── dashboard updates
├── time-series stores kWh
└── optimization engine evaluates threshold
│
▼
Command service publishes:
energy/site-01/controller/command/load-shed
│
▼
Controller acknowledges:
energy/site-01/controller/ack/load-shed
│
▼
Controller result:
energy/site-01/controller/result/load-shed
Payload examples
Power telemetry:
{
"schema_version": "1.0",
"message_id": "msg-pwr-001",
"metric": "power",
"value": 4.82,
"unit": "kW",
"ts": "2026-04-25T10:00:00Z"
}
Load shed command:
{
"schema_version": "1.0",
"command_id": "cmd-ls-001",
"action": "load_shed",
"target_kw": 2.5,
"duration_seconds": 300,
"expires_at": "2026-04-25T10:01:00Z"
}
Connected vehicles and fleet telemetry
Vehicles and fleets are a natural MQTT use case: mobile networks are unstable, data is frequent, payloads are small, and events must be routed to dashboards, alerts, maintenance and analytics.
Topic examples
vehicle/fleet-a/truck-984/telemetry/gps
vehicle/fleet-a/truck-984/telemetry/speed
vehicle/fleet-a/truck-984/telemetry/battery
vehicle/fleet-a/truck-984/telemetry/fuel
vehicle/fleet-a/truck-984/event/shock
vehicle/fleet-a/truck-984/event/door-open
vehicle/fleet-a/truck-984/status/connectivity
vehicle/fleet-a/truck-984/command/diagnostic
| Flow | QoS | Reason |
|---|---|---|
| GPS high frequency | 0 or 1 | Next position may replace previous one. |
| Shock event | 1 | Rare and important event. |
| Door open | 1 | Security and logistics event. |
| Connectivity status | 1 + retained | Latest vehicle state for dashboard. |
| Remote diagnostic command | 1/2 + ack | Command must be tracked. |
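Cellular links drop, so a vehicle gateway needs a store-and-forward buffer that keeps the sequence number, original event timestamp, topic, payload and priority of each message. A minimal in-memory sketch; the class name and the priority convention (0 = most urgent) are assumptions.

```python
import itertools
from typing import Any, Dict, List


class OfflineBuffer:
    """In-memory store-and-forward sketch; a real gateway persists this to flash or disk."""

    def __init__(self) -> None:
        self._next_sequence = itertools.count(1)
        self._records: Dict[int, Dict[str, Any]] = {}

    def store(self, topic: str, payload: Dict[str, Any], event_ts: str, priority: int) -> int:
        """Buffer one message; a lower priority value is replayed first."""
        sequence = next(self._next_sequence)
        self._records[sequence] = {
            "sequence_number": sequence,
            "event_ts": event_ts,
            "topic": topic,
            "payload": payload,
            "priority": priority,
        }
        return sequence

    def replay_order(self) -> List[Dict[str, Any]]:
        """Messages to resend, by priority then by original sequence number."""
        return sorted(self._records.values(), key=lambda r: (r["priority"], r["sequence_number"]))

    def ack(self, sequence: int) -> None:
        """Forget a message only once the broker has confirmed it (PUBACK for QoS 1)."""
        self._records.pop(sequence, None)

    def __len__(self) -> int:
        return len(self._records)
```

On reconnect, each record would be sent with `info = client.publish(record["topic"], json.dumps(record["payload"]), qos=1)`, followed by `info.wait_for_publish(timeout=5)` and `buffer.ack(record["sequence_number"])` only if `info.is_published()`. Because the backend deduplicates on vehicle_id + sequence_number, the counter must survive reboots in a real gateway; this sketch restarts it at 1.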
Fleet architecture
Vehicle Gateway
│
├── GPS
├── CAN bus
├── battery / fuel
├── door sensor
├── shock sensor
└── cellular modem
│
│ MQTT/TLS
▼
Cloud MQTT Broker
│
├── map dashboard
├── alerting
├── maintenance analytics
├── route optimization
└── historical storage
Offline buffering pattern
Network available:
vehicle publishes live MQTT
Network unavailable:
gateway stores locally:
- sequence_number
- event_ts
- topic
- payload
- priority
Network restored:
gateway replays buffered messages
with original event_ts and sequence_number
Backend:
deduplicates using vehicle_id + sequence_number
GPS payload
{
"schema_version": "1.0",
"message_id": "gps-00018422",
"vehicle_id": "truck-984",
"sequence": 18422,
"lat": 48.8566,
"lon": 2.3522,
"speed_kmh": 72.4,
"heading": 184,
"ts": "2026-04-25T10:00:00Z"
}
Logistics, cold chain, warehouses and asset tracking
Logistics systems use MQTT to monitor mobile assets, containers, warehouses, cold rooms, pallets, doors, temperature, humidity and exceptions. The key value is exception-driven monitoring: alert only when something deviates from the expected path or conditions.
Topic examples
logistics/warehouse-01/cold-room-03/telemetry/temperature
logistics/warehouse-01/cold-room-03/telemetry/humidity
logistics/container/cntr-991/telemetry/location
logistics/container/cntr-991/event/door-open
logistics/container/cntr-991/event/shock
logistics/pallet/pallet-883/status/availability
| Use case | MQTT pattern | Backend action |
|---|---|---|
| Cold chain monitoring | Temperature telemetry + threshold alert. | Compliance event and notification. |
| Door open event | Event topic, QoS 1. | Security alert and audit. |
| Asset tracking | GPS telemetry, batch replay. | Map and route history. |
| Warehouse state | Retained status topics. | Dashboard latest state. |
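Cold chain monitoring reduces to a threshold rule: compare each reading with its allowed range and emit a compliance event only when it is out of range. A sketch, assuming readings carry `allowed_min` and `allowed_max` as in the cold chain payload of this section; deriving `event_id` from `message_id` is an assumption that makes replays produce the same event, which the event table can then deduplicate.

```python
from typing import Any, Dict, Optional


def check_cold_chain(reading: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return a compliance event when the reading is outside its allowed range."""
    value = reading["value"]
    if reading["allowed_min"] <= value <= reading["allowed_max"]:
        return None  # normal: no alert
    return {
        "schema_version": "1.0",
        "event_id": f"evt-{reading['message_id']}",  # deterministic, replay-safe
        "asset_id": reading["asset_id"],
        "severity": "high",
        "code": "TEMPERATURE_OUT_OF_RANGE",
        "value": value,
        "unit": reading["unit"],
        "ts": reading["ts"],
    }
```

Keeping the event timestamp equal to the reading's `ts`, not the processing time, is what makes the stored event usable as compliance evidence.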
Cold chain flow
Container sensor
│
│ PUBLISH temperature
▼
MQTT Broker
│
├── store time-series
├── update latest state
└── threshold rule
│
├── normal -> no alert
└── out of range
│
▼
event/cold-chain-violation
│
├── notify operator
├── create incident
└── store compliance evidence
Cold chain payload
{
"schema_version": "1.0",
"message_id": "temp-991-0001",
"asset_id": "cntr-991",
"metric": "temperature",
"value": 8.7,
"unit": "C",
"allowed_min": 2.0,
"allowed_max": 8.0,
"ts": "2026-04-25T10:00:00Z"
}
Compliance event payload
{
"schema_version": "1.0",
"event_id": "evt-cold-001",
"asset_id": "cntr-991",
"severity": "high",
"code": "TEMPERATURE_OUT_OF_RANGE",
"value": 8.7,
"unit": "C",
"ts": "2026-04-25T10:00:00Z"
}
Health, care facilities and medical-adjacent telemetry
MQTT can be used in healthcare-adjacent environments for non-critical telemetry: room sensors, bed presence, equipment status, environmental monitoring, facility alerts and device availability. Medical use cases require stronger compliance, validation, privacy and safety controls.
| Use case | Topic example | Important control |
|---|---|---|
| Room temperature | care/site-01/room-204/telemetry/temperature | Privacy-safe environmental data. |
| Bed presence | care/site-01/room-204/bed-1/status/presence | Access control and audit. |
| Equipment status | care/site-01/equipment/pump-88/status/availability | Retained state and LWT. |
| Facility alert | care/site-01/room-204/event/call-button | QoS 1, alert workflow, audit. |
Architecture care facility
Room Sensors / Equipment
│
▼
Local MQTT Broker
│
├── nurse dashboard
├── facility alerting
├── equipment state monitor
└── audit/event storage
│
▼
Optional secure cloud sync
Safety and privacy considerations
| Concern | Control |
|---|---|
| Personal data exposure | Minimize payload, strict ACL, encryption. |
| Unauthorized subscriptions | No broad wildcard, role-based access. |
| Missed alert | QoS 1, ack workflow, monitoring. |
| Device offline | LWT retained status and heartbeat. |
| Audit requirement | Store events with actor/time/source. |
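Brokers enforce ACLs through their own configuration, for example Mosquitto's ACL file or EMQX authorization rules. The sketch below only illustrates the "no broad wildcard" rule from the table: a requested subscription filter is accepted only if it is no broader than a filter already granted to the role. The role names and granted filters are hypothetical. Malformed filters (such as `a/#/b`) and `$SYS` topics are not handled.

```python
# Hypothetical role -> granted subscription filters. Unknown roles get nothing.
ROLE_SUBSCRIBE_ACL = {
    "nurse-dashboard": [
        "care/site-01/+/event/call-button",
        "care/site-01/+/+/status/presence",
    ],
    "facility-monitor": ["care/site-01/+/telemetry/temperature"],
}


def filter_covers(allowed: str, requested: str) -> bool:
    """True if every topic matched by `requested` is also matched by `allowed`."""
    a_levels = allowed.split("/")
    r_levels = requested.split("/")
    for i, level in enumerate(a_levels):
        if level == "#":
            return True  # '#' covers this level and everything below it
        if i >= len(r_levels):
            return False  # requested filter stops before the allowed one does
        if r_levels[i] == "#":
            return False  # requested multi-level wildcard is broader than allowed
        if level != "+" and level != r_levels[i]:
            return False  # a literal level only covers the same literal
    return len(a_levels) == len(r_levels)


def can_subscribe(role: str, requested: str) -> bool:
    return any(filter_covers(a, requested) for a in ROLE_SUBSCRIBE_ACL.get(role, []))
```

With these grants, `can_subscribe("nurse-dashboard", "care/#")` is refused, while the exact call-button topic of one room is accepted.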
Alert event payload
{
"schema_version": "1.0",
"event_id": "evt-call-001",
"room_id": "room-204",
"source": "call-button",
"severity": "high",
"code": "ASSISTANCE_REQUESTED",
"ts": "2026-04-25T10:00:00Z"
}
Edge observability: device health, agents and distributed sites
MQTT is excellent for edge observability: gateways, small servers, remote agents, industrial PCs and distributed sites can publish health, metrics, logs, status and connectivity events to a central backend.
Topic examples
edge/site-01/gateway-01/status/availability
edge/site-01/gateway-01/telemetry/cpu
edge/site-01/gateway-01/telemetry/memory
edge/site-01/gateway-01/telemetry/disk
edge/site-01/gateway-01/event/service-down
edge/site-01/gateway-01/event/network-loss
edge/site-01/gateway-01/command/restart-service
| Signal | QoS | Retained | Action |
|---|---|---|---|
| Availability | 1 | Yes | Dashboard latest online/offline. |
| CPU/memory | 0/1 | No | Trend and alert threshold. |
| Disk full | 1 | No | Create alert. |
| Service down | 1 | No | Incident workflow. |
| Restart service command | 1 | No | Command ack/result. |
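An edge agent can derive publish options from the topic kind so that the table above is applied consistently. It can also build its offline Last Will from the same policy. This sketch assumes the `edge/{site}/{agent}/{kind}/{name}` layout of the topic examples and picks QoS 0 for telemetry, although the table allows 0 or 1. Both the helper names and that QoS choice are illustrative assumptions.

```python
import json
from typing import NamedTuple


class PublishOptions(NamedTuple):
    qos: int
    retain: bool


# Defaults per topic kind, following the signal table. Telemetry uses QoS 0 here
# (assumption); switch to 1 if CPU/memory samples must not be lost.
KIND_POLICY = {
    "status": PublishOptions(qos=1, retain=True),
    "telemetry": PublishOptions(qos=0, retain=False),
    "event": PublishOptions(qos=1, retain=False),
    "command": PublishOptions(qos=1, retain=False),
}


def publish_options(topic: str) -> PublishOptions:
    """Pick QoS/retain from the topic kind: edge/{site}/{agent}/{kind}/{name}."""
    kind = topic.split("/")[3]
    return KIND_POLICY[kind]


def last_will(site: str, agent_id: str) -> tuple:
    """Offline availability message to register at CONNECT time as the Last Will."""
    topic = f"edge/{site}/{agent_id}/status/availability"
    payload = json.dumps({"schema_version": "1.0", "agent_id": agent_id, "state": "offline"})
    opts = KIND_POLICY["status"]
    return topic, payload, opts.qos, opts.retain
```

With Paho, the returned tuple maps onto `client.will_set(topic, payload, qos, retain)`, which must be called before `connect()`. After connecting, the agent publishes the retained `online` status on the same topic.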
Edge monitoring architecture
Edge Agent
│
├── reads local metrics
├── checks services
├── monitors disk
├── publishes heartbeat
├── publishes events
└── receives safe commands
│
▼
MQTT Broker
│
├── operations dashboard
├── alert manager
├── time-series storage
├── command service
└── incident history
Agent status payload
{
"schema_version": "1.0",
"agent_id": "gateway-01",
"state": "online",
"version": "2.4.1",
"hostname": "edge-site-01",
"uptime_seconds": 88422,
"ts": "2026-04-25T10:00:00Z"
}
Service-down event
{
"schema_version": "1.0",
"event_id": "evt-svc-001",
"service": "local-ingestion",
"state": "down",
"severity": "critical",
"ts": "2026-04-25T10:00:00Z"
}
Common architecture patterns across applications
Pattern 1: MQTT to time-series storage
Devices
│
▼
MQTT Broker
│
▼
Ingestion worker
│
├── validate
├── deduplicate
├── normalize units
└── batch write
│
▼
TimescaleDB / InfluxDB / ClickHouse
Pattern 2: MQTT to dashboard
Devices
│
▼
MQTT Broker
│
├── Backend subscriber
│ └── WebSocket push
│
└── Retained status
└── dashboard initial state
Pattern 3: MQTT to alerting
Telemetry / Event
│
▼
MQTT Broker
│
▼
Rules engine
│
├── threshold check
├── anomaly check
├── state transition
└── alert creation
Pattern 4: Edge broker to cloud broker
Local site
│
├── devices
├── local MQTT broker
├── local dashboard
└── local rules
│
│ bridge MQTT/TLS
▼
Cloud broker
│
├── central dashboard
├── data lake
├── analytics
└── command service
Pattern 5: MQTT to Kafka
MQTT Broker
│
▼
Connector / Worker
│
▼
Kafka topic
│
├── stream processing
├── data lake
├── ML pipeline
└── audit replay
Pattern 6: Command and result
Backend command service
│
▼
MQTT command topic
│
▼
Device validates command
│
├── publish ack
└── publish result
Decision matrix: when MQTT is the right tool
| Question | If yes | If no |
|---|---|---|
| Many distributed clients? | MQTT is a strong candidate. | HTTP/REST may be enough. |
| Small frequent messages? | MQTT fits very well. | For large files, use object storage. |
| Unstable networks? | MQTT helps with persistent connections and sessions. | Classic APIs may be simpler. |
| Multiple consumers? | Pub/Sub is useful. | Direct request/response may be simpler. |
| Need replay/history? | Add Kafka or database. | MQTT alone is enough for transport. |
| Need commands to devices? | Use command/ack/result topics. | Telemetry-only design is simpler. |
| Need browser real-time UI? | Use backend bridge or MQTT over WebSocket. | Normal dashboards can poll API. |
MQTT is excellent for
- IoT and distributed devices.
- Telemetry and edge observability.
- Real-time dashboards.
- Machine status and events.
- Fleet and mobile connectivity.
- Lightweight command channels with ack/result.
MQTT is not ideal for
| Need | Better option | Why |
|---|---|---|
| Large file upload. | HTTP/S3/object storage. | MQTT is for messages, not big files. |
| Long-term event replay. | Kafka/Redpanda/Pulsar. | MQTT broker is not an event log. |
| CRUD business API. | REST/GraphQL. | Request/response fits better. |
| Complex transactional workflow. | Database + workflow engine. | MQTT only transports messages. |
| Public web API. | HTTP API gateway. | Auth, documentation and client model are simpler. |
Final design checklist
[ ] Use case really benefits from Pub/Sub
[ ] Topic namespace is documented
[ ] QoS is chosen per flow
[ ] Status uses retained where useful
[ ] Offline uses Last Will
[ ] Commands have command_id
[ ] Commands have ack and result
[ ] Backend validates payloads
[ ] Backend is idempotent
[ ] Storage is designed for history
[ ] Broker is monitored
[ ] Security ACLs are tested
[ ] Failure cases are documented
Why Sparkplug exists
MQTT is a transport protocol. It defines how clients connect, publish, subscribe and exchange messages, but it does not define an industrial information model by itself. MQTT alone does not say which machines exist, which metrics are valid, how a gateway announces its state, how a SCADA system discovers tags, or how a backend knows that a device has disappeared.
Sparkplug adds a semantic layer on top of MQTT for industrial use cases. It defines a standardized topic structure, message types, birth certificates, death certificates, metric payloads, sequence numbers and command patterns. In practice, Sparkplug turns raw MQTT into an interoperable IIoT data contract.
| Layer | MQTT alone | With Sparkplug |
|---|---|---|
| Transport | Publish / subscribe. | Still MQTT transport. |
| Topic structure | Custom convention. | Standardized industrial namespace. |
| Device discovery | Not defined. | Birth messages declare metrics. |
| Online / offline | Custom LWT. | Death messages and state model. |
| Metrics | Custom payload. | Typed metrics with metadata. |
| Commands | Custom topics and payloads. | Command message types and conventions. |
The problem Sparkplug solves
Without Sparkplug:
Gateway A publishes:
factory/line1/temp
Gateway B publishes:
plant/a/machine/temperature
Gateway C publishes:
devices/123/data
Problems:
- no common meaning
- no standard birth message
- no standard death message
- no metric catalog
- no consistent command model
- difficult SCADA auto-discovery
- difficult historian mapping
- backend-specific custom code everywhere
With Sparkplug:
- edge nodes announce themselves
- devices announce metrics
- offline state is standardized
- metrics are typed
- SCADA can discover tags
- historian can map data consistently
- backend can process lifecycle events
Where Sparkplug fits
Industrial Systems
├── PLC / sensors / controllers
├── Edge gateway
│ ├── OPC UA / Modbus / vendor drivers
│ ├── Sparkplug encoder
│ └── MQTT client
├── MQTT broker
├── SCADA
├── Historian
├── MES / ERP bridge
└── Operations dashboard
Core Sparkplug concepts
Sparkplug introduces a vocabulary that is essential for industrial MQTT. The main objects are the application namespace, group, edge node, device, metric and lifecycle message type.
| Concept | Meaning | Example |
|---|---|---|
| Namespace | Top-level Sparkplug namespace. | spBv1.0 |
| Group ID | Logical group, site, plant or tenant. | factory-a |
| Edge Node ID | Gateway or node publishing on behalf of devices. | edge-01 |
| Device ID | Device or asset behind the edge node. | press-02 |
| Metric | Typed value with name, datatype, timestamp and quality. | pressure, temperature |
| Birth | Message declaring current state and metric definitions. | NBIRTH, DBIRTH |
| Death | Message indicating node or device is offline. | NDEATH, DDEATH |
Industrial hierarchy
Sparkplug hierarchy:
Application Namespace
└── Group ID
└── Edge Node ID
├── Node metrics
│ ├── CPU
│ ├── uptime
│ └── network status
│
└── Device ID
├── temperature
├── pressure
├── vibration
├── running state
└── alarm state
Message type families
| Family | Messages | Purpose |
|---|---|---|
| Node lifecycle | NBIRTH, NDEATH | Edge node online/offline and metric declaration. |
| Device lifecycle | DBIRTH, DDEATH | Device online/offline and metric declaration. |
| Data | NDATA, DDATA | Regular metric updates. |
| Commands | NCMD, DCMD | Commands sent to node or device. |
| State | STATE | Application state awareness. |
Birth and death messages
Birth and death messages are the heart of Sparkplug. A birth message announces that a node or device is online and declares its metrics. A death message announces that it is offline. This allows SCADA systems, historians and dashboards to maintain a reliable live model of the industrial system.
| Message | Direction | Role | Typical trigger |
|---|---|---|---|
| NBIRTH | Edge node to broker | Node announces online state and node metrics. | Gateway startup or reconnect. |
| NDEATH | Broker via Last Will, or the edge node itself before a clean disconnect | Node is considered offline. | Unexpected gateway loss or planned shutdown. |
| DBIRTH | Edge node to broker | Device announces metrics and initial state. | Device discovered or reconnects. |
| DDEATH | Edge node to broker | Device becomes unavailable. | Device disconnect, PLC link loss. |
| NDATA | Edge node to broker | Node metric updates. | Periodic or change-based update. |
| DDATA | Device to broker via node | Device metric updates. | Sensor or PLC value changes. |
| NCMD | Application to node | Command to edge node. | Rebirth request, configuration, operation. |
| DCMD | Application to device | Command to industrial device. | Setpoint, reset, mode change. |
Lifecycle diagram
Edge node starts
│
├── CONNECT to MQTT broker
│ └── registers NDEATH as Last Will
│
├── publish NBIRTH
│ └── node metrics are declared
│
├── publish DBIRTH for each device
│ └── device metrics are declared
│
├── publish NDATA / DDATA
│ └── regular metric updates
│
├── normal shutdown
│ ├── publish NDEATH explicitly
│ └── graceful disconnect (the broker does not send the Will)
│
└── unexpected failure
└── broker publishes NDEATH
Why birth matters
Without birth:
- consumer sees values but not full metric catalog
- SCADA may not know what tags exist
- historian mapping must be hardcoded
- restart creates ambiguity
- offline/online is custom
With birth:
- node announces complete metric set
- consumers can rebuild state
- SCADA can discover tags
- backend can detect restarts
- sequence and lifecycle are explicit
Sparkplug topic namespace
Sparkplug defines a topic namespace that carries the namespace version, group, message type, edge node and optionally device ID. This makes routing and interpretation standardized across industrial consumers.
Generic structure
spBv1.0/{group_id}/{message_type}/{edge_node_id}
spBv1.0/{group_id}/{message_type}/{edge_node_id}/{device_id}
Examples
Node birth:
spBv1.0/factory-a/NBIRTH/edge-01
Node data:
spBv1.0/factory-a/NDATA/edge-01
Node death:
spBv1.0/factory-a/NDEATH/edge-01
Device birth:
spBv1.0/factory-a/DBIRTH/edge-01/press-02
Device data:
spBv1.0/factory-a/DDATA/edge-01/press-02
Device death:
spBv1.0/factory-a/DDEATH/edge-01/press-02
Device command:
spBv1.0/factory-a/DCMD/edge-01/press-02
Topic fields
| Field | Role | Example |
|---|---|---|
| Namespace | Identifies Sparkplug namespace/version. | spBv1.0 |
| Group ID | Logical plant, tenant or system group. | factory-a |
| Message Type | Lifecycle, data or command type. | NBIRTH, DDATA |
| Edge Node ID | Gateway or edge node identity. | edge-01 |
| Device ID | Optional device behind edge node. | press-02 |
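A consumer usually starts by parsing the topic into these fields and checking that the message type matches the number of levels. Node types carry no device ID, while device types require one. This is a minimal sketch. STATE messages are not handled, and the `SparkplugTopic` type is an illustrative name, not part of any library.

```python
from dataclasses import dataclass
from typing import Optional

NODE_TYPES = {"NBIRTH", "NDEATH", "NDATA", "NCMD"}
DEVICE_TYPES = {"DBIRTH", "DDEATH", "DDATA", "DCMD"}


@dataclass(frozen=True)
class SparkplugTopic:
    group_id: str
    message_type: str
    edge_node_id: str
    device_id: Optional[str] = None


def parse_sparkplug_topic(topic: str) -> SparkplugTopic:
    """Parse spBv1.0/{group_id}/{message_type}/{edge_node_id}[/{device_id}]."""
    parts = topic.split("/")
    if len(parts) < 4 or parts[0] != "spBv1.0":
        raise ValueError(f"not a Sparkplug B node/device topic: {topic}")
    _, group_id, message_type, edge_node_id, *rest = parts
    if message_type in NODE_TYPES and not rest:
        return SparkplugTopic(group_id, message_type, edge_node_id)
    if message_type in DEVICE_TYPES and len(rest) == 1:
        return SparkplugTopic(group_id, message_type, edge_node_id, rest[0])
    raise ValueError(f"unexpected message type or level count: {topic}")
```

Rejected topics belong in the "Rejected messages" view rather than being silently dropped.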
Subscription patterns
All Sparkplug messages:
spBv1.0/#
All factory-a messages:
spBv1.0/factory-a/#
All device data:
spBv1.0/+/DDATA/+/+
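All birth messages (+ must occupy a whole topic level, so +BIRTH is not a valid filter; use one filter per type):
spBv1.0/+/NBIRTH/+
spBv1.0/+/DBIRTH/+/+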
One edge node:
spBv1.0/factory-a/+/edge-01/#
Sparkplug payload model and metrics
Sparkplug payloads are designed to represent industrial metrics with structure. A metric is not only a raw value: it can include a name, alias, datatype, timestamp, value, quality and metadata. This is essential for industrial interoperability.
| Metric field | Role | Example |
|---|---|---|
| Name | Human and system-readable metric name. | pressure |
| Alias | Numeric shorthand for repeated data messages. | 12 |
| Datatype | Metric type. | Float, Int, Boolean, String. |
| Value | Actual measurement or state. | 72.4 |
| Timestamp | Time of the measurement, encoded on the wire as UTC milliseconds since the Unix epoch. | 2026-04-25T10:00:00Z |
| Quality | Signal quality or validity. | Good, bad, uncertain. |
| Properties | Metadata such as engineering units. | bar, C, rpm |
Birth payload idea
DBIRTH for press-02 declares:
metrics:
- pressure
datatype: Float
unit: bar
alias: 1
- temperature
datatype: Float
unit: C
alias: 2
- running
datatype: Boolean
alias: 3
- alarm_active
datatype: Boolean
alias: 4
Data payload idea
DDATA for press-02 sends updates:
metrics:
- alias: 1
value: 72.4
timestamp: 2026-04-25T10:00:00Z
- alias: 2
value: 41.8
timestamp: 2026-04-25T10:00:00Z
- alias: 3
value: true
timestamp: 2026-04-25T10:00:00Z
Meaning is known because DBIRTH mapped:
alias 1 -> pressure
alias 2 -> temperature
alias 3 -> running
Metric design rules
| Rule | Reason |
|---|---|
| Use stable metric names. | SCADA and historian mapping depend on them. |
| Declare units. | Prevents wrong interpretation. |
| Use correct datatypes. | Prevents parsing and storage errors. |
| Keep aliases stable during session. | Efficient and consistent DDATA processing. |
| Preserve timestamps. | Industrial data often arrives after acquisition time. |
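The birth-to-data alias mapping described above can be kept in a small per-device registry. This sketch assumes payloads have already been decoded from protobuf into dicts shaped like the birth and data "payload ideas" (Eclipse Tahu provides Sparkplug B decoding). The class and exception names are hypothetical. An unknown alias raises an exception so the caller can request a rebirth instead of guessing.

```python
class UnknownAlias(KeyError):
    """DDATA referenced an alias that no DBIRTH declared: request a rebirth."""


class AliasRegistry:
    """Per-device alias -> metric name catalog, rebuilt on every DBIRTH."""

    def __init__(self) -> None:
        # (group_id, edge_node_id, device_id) -> {alias: metric_name}
        self._catalogs = {}

    def on_birth(self, key: tuple, metrics: list) -> None:
        # A birth replaces the whole catalog: aliases are only valid for this session.
        self._catalogs[key] = {m["alias"]: m["name"] for m in metrics}

    def on_death(self, key: tuple) -> None:
        self._catalogs.pop(key, None)

    def resolve(self, key: tuple, metrics: list) -> list:
        catalog = self._catalogs.get(key, {})
        resolved = []
        for m in metrics:
            # Data messages may carry the full name instead of the alias.
            name = m.get("name") or catalog.get(m.get("alias"))
            if name is None:
                raise UnknownAlias((key, m.get("alias")))
            resolved.append({"name": name, "value": m["value"], "timestamp": m["timestamp"]})
        return resolved
```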
Industrial Sparkplug architecture
A Sparkplug architecture usually starts at the edge. The edge node talks to PLCs, sensors and controllers, normalizes data into Sparkplug metrics, publishes lifecycle messages and forwards data to MQTT consumers such as SCADA, historians and dashboards.
Reference architecture
PLC / Sensors / Controllers
│
│ OPC UA / Modbus / vendor protocol
▼
Sparkplug Edge Node / Gateway
│
├── reads industrial tags
├── maps tags to metrics
├── publishes NBIRTH
├── publishes DBIRTH
├── publishes NDATA / DDATA
├── registers NDEATH Last Will
└── handles NCMD / DCMD
│
▼
MQTT Broker
│
├── SCADA
├── Historian
├── Operations dashboard
├── Alerting engine
├── MES / ERP bridge
└── Data lake connector
Consumer roles
| Consumer | Consumes | Purpose |
|---|---|---|
| SCADA | Birth, data, state, commands. | Operational supervision and control. |
| Historian | Data metrics. | Long-term industrial data storage. |
| Dashboard | Status, data, alarms. | Operational visibility. |
| Alerting | Metrics and alarm state. | Detect thresholds, anomalies and downtime. |
| MES / ERP bridge | Selected metrics/events. | Business integration. |
| Django backend | State, events, selected telemetry. | Admin, audit, workflows, analytics. |
Data path
Raw PLC tag
│
▼
Gateway driver
│
▼
Metric mapping
│
▼
Sparkplug payload
│
▼
MQTT broker
│
├── SCADA live display
├── historian time-series
└── backend event processing
SCADA, historian and operations integration
Sparkplug is valuable because it makes MQTT easier for industrial systems to consume. A SCADA system can discover nodes and devices through birth messages. A historian can map metric names, types and timestamps. Operations dashboards can distinguish offline state from missing data.
| System | What it needs | How Sparkplug helps |
|---|---|---|
| SCADA | Tags, state, commands, online/offline. | Birth/death and command message types. |
| Historian | Metric name, type, timestamp, quality. | Structured metrics and lifecycle. |
| Dashboard | Current state and recent values. | Birth rebuild and death awareness. |
| Alerting | Reliable state transitions. | Death messages and metric quality. |
| Backend | Consistent ingestion model. | Standard topics and payload structure. |
SCADA discovery flow
SCADA subscribes:
spBv1.0/+/+/+/#
Edge node starts:
publishes NBIRTH
Devices appear:
publishes DBIRTH for each device
SCADA builds:
- node list
- device list
- metric tags
- current state
- command capabilities
Regular updates:
NDATA / DDATA update live values
Historian mapping flow
DBIRTH declares metrics:
press-02.pressure
press-02.temperature
press-02.running
press-02.alarm_active
Historian creates or maps tags:
factory-a.edge-01.press-02.pressure
factory-a.edge-01.press-02.temperature
DDATA arrives:
alias 1 = 72.4
alias 2 = 41.8
Historian writes:
tag, value, event_ts, quality, ingestion_ts
Operations dashboard model
| Dashboard widget | Source | Meaning |
|---|---|---|
| Node online/offline | NBIRTH / NDEATH | Gateway state. |
| Device online/offline | DBIRTH / DDEATH | Machine or asset state. |
| Metric value | DDATA | Current telemetry. |
| Metric quality | Metric quality field | Trust level of value. |
| Command result | Command response pattern | Control feedback. |
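The online/offline widgets in this table can be driven by a small in-memory lifecycle model. Following the backend pipeline described later in this guide, an NDEATH also marks every device behind that node offline. This is a simplified sketch with an illustrative class name. A production host should also compare the NDEATH bdSeq with the one from the latest NBIRTH, so that a late Will from an old session does not mark a reconnected node offline. That check is omitted here.

```python
from collections import defaultdict
from typing import Optional


class LifecycleState:
    """In-memory online/offline model driven by Sparkplug lifecycle messages."""

    def __init__(self) -> None:
        self.nodes = {}                    # (group, node) -> "online" | "offline"
        self.devices = {}                  # (group, node, device) -> "online" | "offline"
        self._children = defaultdict(set)  # (group, node) -> {device_id, ...}

    def apply(self, message_type: str, group: str, node: str,
              device: Optional[str] = None) -> None:
        node_key = (group, node)
        if message_type == "NBIRTH":
            self.nodes[node_key] = "online"
        elif message_type == "NDEATH":
            self.nodes[node_key] = "offline"
            # A node death implies every device behind that node is offline too.
            for dev in self._children[node_key]:
                self.devices[(group, node, dev)] = "offline"
        elif message_type == "DBIRTH":
            self._children[node_key].add(device)
            self.devices[(group, node, device)] = "online"
        elif message_type == "DDEATH":
            self.devices[(group, node, device)] = "offline"
        # NDATA / DDATA / commands do not change lifecycle state in this sketch.
```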
OPC UA bridge and industrial protocol mapping
In many factories, MQTT does not directly read the machine. The edge gateway reads industrial protocols such as OPC UA, Modbus, serial protocols or vendor APIs, then publishes a normalized Sparkplug model to MQTT.
OPC UA to Sparkplug mapping
OPC UA source:
ns=2;s=Press02.Pressure
ns=2;s=Press02.Temperature
ns=2;s=Press02.Running
ns=2;s=Press02.AlarmActive
Gateway mapping:
Press02.Pressure -> metric pressure
Press02.Temperature -> metric temperature
Press02.Running -> metric running
Press02.AlarmActive -> metric alarm_active
Sparkplug device:
group_id = factory-a
edge_node_id = edge-01
device_id = press-02
| Source concept | Sparkplug concept | Example |
|---|---|---|
| OPC UA server | Edge node data source | opcua-server-01 |
| OPC UA node | Metric | pressure |
| OPC UA namespace | Group or metadata | factory-a |
| PLC / machine | Device ID | press-02 |
| OPC UA quality | Metric quality | Good, bad, uncertain. |
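The gateway mapping above can be expressed as an explicit table from OPC UA NodeId to device, metric and unit. Unmapped raw tags are dropped rather than published. Quality is preserved by reading the severity bits (31-30) of the OPC UA StatusCode: 00 is good, 01 is uncertain and 10 is bad. This sketch assumes the OPC UA client library exposes the raw 32-bit status code and a source timestamp. The mapping contents and units are hypothetical.

```python
from typing import Optional

# Hypothetical gateway mapping: OPC UA NodeId -> (device_id, metric name, unit).
NODE_MAP = {
    "ns=2;s=Press02.Pressure": ("press-02", "pressure", "bar"),
    "ns=2;s=Press02.Temperature": ("press-02", "temperature", "C"),
    "ns=2;s=Press02.Running": ("press-02", "running", None),
    "ns=2;s=Press02.AlarmActive": ("press-02", "alarm_active", None),
}


def quality_from_status(status_code: int) -> str:
    """Map the severity bits (31-30) of an OPC UA StatusCode to a quality label."""
    severity = (status_code >> 30) & 0b11
    return {0: "good", 1: "uncertain"}.get(severity, "bad")


def to_metric(node_id: str, value, status_code: int, source_ts: str) -> Optional[dict]:
    mapping = NODE_MAP.get(node_id)
    if mapping is None:
        return None  # unmapped raw tag: drop it rather than inflate cardinality
    device_id, name, unit = mapping
    return {
        "device_id": device_id,
        "name": name,
        "value": value,
        "unit": unit,
        "quality": quality_from_status(status_code),  # preserve source quality
        "timestamp": source_ts,  # source time, not gateway read time
    }
```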
Bridge architecture
PLC / Machine
│
▼
OPC UA Server
│
▼
Edge Gateway
│
├── browse OPC UA nodes
├── map nodes to assets
├── read values
├── preserve quality
├── preserve timestamp
├── build Sparkplug metrics
└── publish MQTT
│
▼
MQTT Broker
│
├── SCADA
├── Historian
└── Backend
Mapping risks
| Risk | Impact | Mitigation |
|---|---|---|
| Wrong unit mapping | Bad analytics or unsafe control. | Metric registry and engineering units. |
| Missing quality | Bad values treated as valid. | Preserve source quality. |
| Timestamp overwritten | Wrong event chronology. | Store source time and ingestion time. |
| Unstable node names | Broken historian mapping. | Stable metric naming contract. |
| Too many raw tags | Cardinality explosion. | Curate and normalize at gateway. |
Django backend design for Sparkplug ingestion
A Django backend consuming Sparkplug should model lifecycle separately from metric history. Birth and death messages update node/device state. Data messages write time-series points. Commands and command results should be audited.
Recommended Django models
SparkplugGroup
- group_id
- description
- tenant
- enabled
SparkplugEdgeNode
- group
- edge_node_id
- online_state
- last_birth_at
- last_death_at
- last_sequence
- last_seen_at
SparkplugDevice
- edge_node
- device_id
- online_state
- last_birth_at
- last_death_at
- last_seen_at
SparkplugMetricDefinition
- device or node
- metric_name
- alias
- datatype
- unit
- properties
- active
SparkplugMetricSample
- metric_definition
- value
- quality
- event_ts
- ingestion_ts
SparkplugCommandJournal
- command_id
- target
- command_name
- payload
- status
- requested_by
- created_at
- ack_at
- result_at
Processing pipeline
MQTT message
│
▼
Parse Sparkplug topic
│
▼
Identify message type
│
├── NBIRTH
│ ├── upsert edge node
│ ├── mark online
│ └── upsert node metric definitions
│
├── NDEATH
│ ├── mark edge node offline
│ └── mark child devices uncertain/offline
│
├── DBIRTH
│ ├── upsert device
│ ├── mark online
│ └── upsert device metric definitions
│
├── DDEATH
│ └── mark device offline
│
├── NDATA / DDATA
│ ├── resolve metric alias/name
│ ├── write samples
│ └── update current state
│
└── NCMD / DCMD
└── audit command flow
Admin views
| Admin page | What to show |
|---|---|
| Groups | Group ID, tenant, node count, online nodes. |
| Edge nodes | Online state, last birth/death, sequence, devices. |
| Devices | Online state, metrics, last seen, parent node. |
| Metrics | Name, alias, datatype, unit, latest value. |
| Rejected messages | Invalid topic, unknown alias, bad payload, sequence issue. |
| Commands | Command ID, target, status, ack, result, timeout. |
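The status, ack, result and timeout columns of the command view imply a small state machine behind SparkplugCommandJournal. This is a framework-free sketch, not Django code. The status names, the allowed transitions and the 5-second ack default are assumptions, and the default mirrors the command ack SLO example given later in this guide. A late ack after a timeout is rejected here; a real journal might record it separately.

```python
import time
from dataclasses import dataclass, field
from typing import Optional

# Allowed transitions for a journal entry (hypothetical status names).
TRANSITIONS = {
    "sent": {"acked", "timeout"},
    "acked": {"succeeded", "failed", "timeout"},
}


@dataclass
class CommandEntry:
    command_id: str
    target: str
    command_name: str
    status: str = "sent"
    created_at: float = field(default_factory=time.time)
    ack_at: Optional[float] = None
    result_at: Optional[float] = None

    def transition(self, new_status: str, at: float) -> None:
        if new_status not in TRANSITIONS.get(self.status, set()):
            raise ValueError(f"{self.command_id}: {self.status} -> {new_status} not allowed")
        self.status = new_status
        if new_status == "acked":
            self.ack_at = at
        elif new_status in ("succeeded", "failed"):
            self.result_at = at

    def check_timeout(self, now: float, ack_slo_s: float = 5.0) -> bool:
        """Mark the command timed out if no ack arrived within the SLO."""
        if self.status == "sent" and now - self.created_at > ack_slo_s:
            self.transition("timeout", now)
            return True
        return False
```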
Production checklist and anti-patterns
Sparkplug production checklist
[ ] Group IDs are documented
[ ] Edge node IDs are stable
[ ] Device IDs are stable
[ ] NBIRTH is published on startup
[ ] NDEATH is configured as Last Will
[ ] DBIRTH is published for devices
[ ] Metric definitions are stable
[ ] Metric units are declared
[ ] Metric datatypes are correct
[ ] Metric aliases are tracked
[ ] Sequence numbers are monitored
[ ] NDATA / DDATA are stored correctly
[ ] Death messages update online state
[ ] Rebirth handling is implemented
[ ] Commands use NCMD / DCMD patterns
[ ] Command audit is stored
[ ] SCADA subscriptions are scoped
[ ] Historian mapping is tested
[ ] Backend rejects unknown metrics safely
[ ] Dashboard distinguishes offline from stale data
Operational alerts
| Alert | Meaning | Action |
|---|---|---|
| NDEATH received | Edge node offline. | Check gateway, network, broker connection. |
| Missing DBIRTH | Device not properly declared. | Check gateway mapping. |
| Unknown metric alias | Data without known birth mapping. | Request rebirth or inspect gateway. |
| Sequence gap | Potential lost data or restart. | Check node logs and broker stability. |
| Metric quality bad | Source signal unreliable. | Inspect PLC, sensor or OPC UA source. |
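The "Sequence gap" alert relies on the Sparkplug B `seq` field, which runs from 0 to 255 and wraps back to 0. Each message an edge node publishes after its NBIRTH (NDATA, DBIRTH, DDATA, DDEATH) should carry the previous value plus one, modulo 256. NDEATH is registered at CONNECT time and is tracked through its bdSeq metric instead, so it is not passed to this tracker. This is a minimal sketch with a hypothetical class name. It resynchronises on the last seen value after a gap; whether to request a rebirth at that point is a policy choice left to the caller.

```python
class SequenceTracker:
    """Track Sparkplug B seq (0-255, wrapping) per edge node and report gaps."""

    def __init__(self) -> None:
        self._last = {}  # (group_id, edge_node_id) -> last seq seen

    def on_birth(self, node_key: tuple, seq: int) -> None:
        self._last[node_key] = seq  # NBIRTH starts a new sequence for the session

    def on_message(self, node_key: tuple, seq: int) -> bool:
        """Return True if seq is the expected next value, False on a gap."""
        last = self._last.get(node_key)
        self._last[node_key] = seq
        if last is None:
            return False  # data before any birth: treat as a gap, request rebirth
        return seq == (last + 1) % 256
```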
Anti-patterns
| Anti-pattern | Impact | Correction |
|---|---|---|
| Using MQTT topics without lifecycle. | Consumers cannot know if node is alive. | Use NBIRTH/NDEATH lifecycle. |
| Changing metric names frequently. | SCADA/historian mapping breaks. | Maintain stable metric contracts. |
| Ignoring birth messages. | Metric catalog becomes stale. | Process birth as state rebuild. |
| Ignoring metric quality. | Bad values look valid. | Store and display quality. |
| No rebirth handling. | Consumers stay desynchronized. | Implement rebirth request and processing. |
| Commands without audit. | No traceability for machine actions. | Command journal with ack/result. |
Final architecture rule
Useful URLs
Sparkplug specification:
https://sparkplug.eclipse.org/specification/
Eclipse Sparkplug project:
https://sparkplug.eclipse.org/
MQTT official:
https://mqtt.org/
OPC Foundation:
https://opcfoundation.org/
Eclipse Tahu:
https://github.com/eclipse/tahu
MQTT operations model
Running MQTT in production means observing the broker, the clients, the subscriptions, the session queues, the retained store, the backend consumers and the business impact. A broker can be technically online while messages are delayed, rejected, dropped or silently accumulating in offline sessions.
The operational goal is simple: know how many clients are connected, how many messages flow through the broker, whether consumers are keeping up, whether security controls are rejecting abnormal traffic, and whether critical business flows are healthy.
| Layer | What to observe | Typical failure |
|---|---|---|
| Network | DNS, TLS, firewall, load balancer, latency. | Devices cannot connect or reconnect storms. |
| Broker | Connections, messages/sec, CPU, RAM, retained, dropped. | Overload, memory pressure, routing latency. |
| Sessions | Persistent sessions, inflight, queued messages. | Offline subscribers accumulating messages. |
| Security | Auth failures, ACL denied, invalid certs, suspicious wildcards. | Attack, broken firmware, bad provisioning. |
| Backend | Consumer lag, queue depth, processing time, DB latency. | Ingestion behind real time. |
| Business | Freshness of data, missing devices, command success rate. | Dashboard stale, commands not acknowledged. |
End-to-end observability path
Device
│
├── connect success/failure
├── publish rate
├── reconnect count
└── last message timestamp
│
▼
MQTT Broker
│
├── connected clients
├── messages in/out
├── dropped/rejected
├── session queues
├── retained count
└── auth/ACL failures
│
▼
Backend subscriber
│
├── consumer lag
├── queue depth
├── processing latency
├── duplicate count
└── rejected payloads
│
▼
Storage / Dashboard
│
├── latest state freshness
├── time-series write rate
├── alarm creation rate
└── command ack/result rate
Golden signals for MQTT
Traffic:
- messages in/sec
- messages out/sec
- bytes in/out
Errors:
- dropped messages
- auth failures
- ACL denied
- invalid payloads
Latency:
- broker routing latency
- backend consumer lag
- time from event_ts to stored_at
Saturation:
- CPU / RAM / network
- file descriptors
- session queue depth
- Redis/Celery/Kafka backlog
Broker metrics: the minimum production set
| Metric | Why it matters | Typical alert |
|---|---|---|
| Connected clients | Fleet health and connectivity. | Sudden drop or abnormal spike. |
| Messages in/sec | Incoming publisher load. | Outside baseline for 5-10 minutes. |
| Messages out/sec | Subscriber fan-out pressure. | Out rate much higher than in rate unexpectedly. |
| Bytes in/out | Network and payload size trend. | Large increase may indicate bad firmware or flood. |
| Dropped messages | Potential data loss. | Any persistent value above zero. |
| Rejected messages | Protocol, quota, auth or validation issue. | Spike by client, IP or topic. |
| Retained messages count | Retained store growth. | Unexpected growth trend. |
| Session queue depth | Offline or slow subscribers. | Queue grows continuously. |
| Inflight messages | QoS state pressure. | High sustained inflight count. |
| CPU / RAM / FD | Broker saturation. | CPU/RAM/FD above threshold with trend. |
Broker dashboard layout
MQTT Broker Dashboard
├── Broker health
│ ├── uptime
│ ├── node status
│ ├── CPU / RAM
│ ├── disk
│ └── file descriptors
│
├── Connections
│ ├── connected clients
│ ├── connects/sec
│ ├── disconnects/sec
│ ├── reconnect loops
│ └── top clients by traffic
│
├── Traffic
│ ├── messages in/sec
│ ├── messages out/sec
│ ├── bytes in/out
│ ├── publish rate by topic prefix
│ └── fan-out ratio
│
├── Reliability
│ ├── dropped messages
│ ├── rejected messages
│ ├── queued messages
│ ├── retained messages
│ └── inflight messages
│
└── Security
├── auth failures
├── ACL denied
├── invalid certificate
└── suspicious wildcard attempts
Useful derived metrics
| Derived metric | Formula idea | Meaning |
|---|---|---|
| Fan-out ratio | messages_out / messages_in | How many subscribers receive each message. |
| Average payload size | bytes_in / messages_in | Detect firmware or payload drift. |
| Reconnect ratio | disconnects / connected_clients | Detect unstable clients. |
| Error rate | rejected / total messages | Detect protocol or security problems. |
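The derived metrics are ratios of counter deltas taken over the same observation window, with a guard against empty windows. This sketch assumes `messages_in` counts everything the broker received, including rejected messages, so the error rate is `rejected / messages_in`. The `BrokerWindow` structure is illustrative.

```python
from dataclasses import dataclass


@dataclass
class BrokerWindow:
    """Counter deltas over one observation window (for example 5 minutes)."""
    messages_in: int
    messages_out: int
    bytes_in: int
    rejected: int
    disconnects: int
    connected_clients: int


def ratio(numerator: float, denominator: float) -> float:
    return numerator / denominator if denominator else 0.0  # empty window -> 0


def derived_metrics(w: BrokerWindow) -> dict:
    return {
        "fan_out_ratio": ratio(w.messages_out, w.messages_in),
        "avg_payload_bytes": ratio(w.bytes_in, w.messages_in),
        "reconnect_ratio": ratio(w.disconnects, w.connected_clients),
        "error_rate": ratio(w.rejected, w.messages_in),
    }
```

For example, 1,000 messages in and 3,000 out gives a fan-out ratio of 3.0, meaning each message reaches about three subscribers on average.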
Client and fleet metrics
Broker metrics show the platform view. Fleet metrics show the device reality: which clients are online, which firmware versions are unstable, which sites are losing connectivity, and which devices are producing abnormal traffic.
| Metric | Scope | Why it matters |
|---|---|---|
| last_seen_at | Device | Detect stale devices. |
| online_state | Device/site | Dashboard availability. |
| reconnect_count | Device/firmware | Detect unstable network or firmware. |
| publish_rate | Device/topic | Detect flood or silent device. |
| payload_size_avg | Device/firmware | Detect schema or firmware drift. |
| command_ack_rate | Device/service | Command reliability. |
| firmware_version | Device | Correlate incidents with deployments. |
| site_online_percentage | Site | Operational view by location. |
Device state model
Device state:
- online
- offline
- stale
- degraded
- unknown
- disabled
- revoked
Transitions:
online -> stale if no data for threshold
stale -> offline if Last Will or long timeout
offline -> online on new retained status or birth
online -> degraded if errors or bad quality increase
any -> disabled operator action
any -> revoked security incident
Fleet dashboard
Fleet / Device Dashboard
├── Global
│ ├── total devices
│ ├── online devices
│ ├── stale devices
│ ├── offline devices
│ └── disabled/revoked devices
│
├── By site
│ ├── online percentage
│ ├── last message age
│ ├── reconnect rate
│ └── top failing devices
│
├── By firmware
│ ├── version distribution
│ ├── error rate
│ ├── reconnect rate
│ └── payload validation failures
│
└── By topic prefix
├── message rate
├── rejected count
├── payload size
└── retained count
Staleness thresholds
| Flow | Expected frequency | Stale threshold |
|---|---|---|
| Heartbeat | 30s | 90s |
| Temperature | 60s | 180s |
| GPS | 10s | 60s |
| Industrial status | On change + heartbeat | 2x to 5x heartbeat window |
| Rare event | Irregular | Use device heartbeat, not event frequency. |
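The time-based part of the device state model can be computed from `last_seen_at` and the stale thresholds in this table. Flows with irregular events should be classified with the heartbeat flow, as the last row recommends. This sketch covers only online, stale, offline and unknown. The offline factor of 10x the stale threshold is an assumption, and operator states such as disabled or revoked are left to the caller.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Stale thresholds from the table above.
STALE_AFTER = {
    "heartbeat": timedelta(seconds=90),
    "temperature": timedelta(seconds=180),
    "gps": timedelta(seconds=60),
}
OFFLINE_FACTOR = 10  # assumption: offline after 10x the stale threshold


def device_state(flow: str, last_seen: Optional[datetime], now: datetime,
                 lwt_offline: bool = False) -> str:
    if last_seen is None:
        return "unknown"
    if lwt_offline:
        return "offline"  # Last Will published by the broker
    age = now - last_seen
    stale_after = STALE_AFTER[flow]
    if age > stale_after * OFFLINE_FACTOR:
        return "offline"  # long timeout without Last Will
    if age > stale_after:
        return "stale"
    return "online"


now = datetime(2026, 4, 25, 10, 0, 0, tzinfo=timezone.utc)
print(device_state("heartbeat", now - timedelta(seconds=120), now))  # stale
```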
Backend lag, queue depth and ingestion freshness
MQTT broker health does not guarantee backend health. Your backend can be slow while the broker continues receiving messages. You must monitor queue depth, consumer lag, oldest pending message age, database latency and end-to-end freshness.
| Metric | Where | Meaning | Alert |
|---|---|---|---|
| Redis stream length | Redis | Buffered messages not fully processed. | Growth for 10-15 minutes. |
| Celery queue depth | Celery broker | Task backlog. | Above baseline and growing. |
| Kafka consumer lag | Kafka | Consumer behind partition offsets. | Lag grows continuously. |
| Oldest pending age | Queue/buffer | Worst-case delay. | Exceeds business freshness SLO. |
| DB write latency | Database | Storage bottleneck. | P95/P99 above baseline. |
| Rejected payload rate | Backend | Bad firmware, schema drift or attack. | Spike per device/firmware/topic. |
| Duplicate count | Inbox/idempotency | Retries or reconnects. | Spike by device/site. |
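QoS 1 delivery is at-least-once, so retries and reconnects produce duplicates. An idempotency inbox keyed on `message_id` drops them and feeds the duplicate count metric. This in-memory sketch only works within one worker process, and its bounded window forgets old IDs. Several workers need a shared store instead, such as a unique constraint on `message_id` in the database or a Redis `SET ... NX EX` key. The class name and default capacity are illustrative.

```python
from collections import OrderedDict


class Inbox:
    """Bounded idempotency inbox: remembers recent message IDs to drop duplicates."""

    def __init__(self, capacity: int = 100_000) -> None:
        self.capacity = capacity
        self._seen = OrderedDict()
        self.duplicate_count = 0

    def accept(self, message_id: str) -> bool:
        """Return True the first time an ID is seen, False for a duplicate."""
        if message_id in self._seen:
            self._seen.move_to_end(message_id)
            self.duplicate_count += 1
            return False
        self._seen[message_id] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # evict the oldest ID
        return True
```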
End-to-end freshness
event_ts:
time when device measured the data
received_at:
time when backend subscriber received MQTT message
stored_at:
time when message was stored in database
dashboard_seen_at:
time when UI displayed updated state
Freshness metrics:
- received_at - event_ts
- stored_at - event_ts
- dashboard_seen_at - event_ts
- stored_at - received_at
Lag diagnosis diagram
Messages are late
│
├── Broker messages in normal?
│ ├── no -> publisher/device/network issue
│ └── yes
│
├── Subscriber receiving?
│ ├── no -> subscription/ACL/session issue
│ └── yes
│
├── Buffer growing?
│ ├── yes -> workers too slow or DB slow
│ └── no
│
├── DB write latency high?
│ ├── yes -> optimize DB/batch/partition
│ └── no
│
└── Dashboard stale?
├── yes -> WebSocket/cache/API issue
└── no -> incident resolved
Backend SLO examples
| Flow | Freshness SLO | Why |
|---|---|---|
| Industrial alarm | < 5 seconds | Operational response. |
| Dashboard state | < 10 seconds | Human supervision. |
| Temperature history | < 60 seconds | Analytics tolerant. |
| Batch replay after outage | Catch up within N minutes | Recovery objective. |
| Command ack | < 5 seconds | Operator confidence. |
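The freshness deltas defined earlier can be computed per message and checked against these SLOs. This sketch applies every SLO to `stored_at - event_ts`. That is an assumption: a dashboard SLO could instead be measured against `dashboard_seen_at`. The flow keys are illustrative.

```python
from datetime import datetime


def parse_ts(value: str) -> datetime:
    # fromisoformat() only accepts a trailing "Z" from Python 3.11 on, so normalise it.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def freshness(event_ts: str, received_at: str, stored_at: str) -> dict:
    e, r, s = parse_ts(event_ts), parse_ts(received_at), parse_ts(stored_at)
    return {
        "received_lag_s": (r - e).total_seconds(),
        "stored_lag_s": (s - e).total_seconds(),
        "backend_processing_s": (s - r).total_seconds(),
    }


# Freshness SLOs from the table above, in seconds (applied to stored_at - event_ts).
FRESHNESS_SLO_S = {"industrial_alarm": 5, "dashboard_state": 10, "temperature_history": 60}


def breaches_slo(flow: str, lags: dict) -> bool:
    return lags["stored_lag_s"] > FRESHNESS_SLO_S[flow]
```

A large `backend_processing_s` with a small `received_lag_s` points at workers or the database rather than at the devices or the broker.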
Alert design: actionable, owned and tied to runbooks
MQTT alerts must be actionable. Avoid alerts that only say “broker problem”. Good alerts define the symptom, likely impact, owner, severity, first diagnostic steps and definition of done.
Alert principles
Alert design:
- warning: metric outside baseline for a short period
- critical: sustained issue with user/business impact
- every alert has an owner
- every alert links to a runbook
- every alert has a definition of done
- alerts use rate and trend, not only absolute value
- alerts are grouped by site, tenant, broker node or firmware
| Alert | Condition | Severity | Owner |
|---|---|---|---|
| Connected clients drop | Drop by 30% in 10 min. | Critical if production fleet. | Platform / Network. |
| Auth failures spike | Above baseline + threshold. | Warning/Critical. | Security / Platform. |
| ACL denied spike | New client or firmware causing denies. | Warning. | Platform / Firmware. |
| Backend lag growing | Oldest pending age above SLO. | Critical for alarms. | Backend / Data. |
| Dropped messages | Persistently above 0. | Critical for critical flows. | Platform. |
| Retained growth | Unexpected growth over baseline. | Warning. | Platform. |
| Command ack timeout | Ack missing after SLO. | Critical for command flows. | Operations / Backend. |
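Conditions such as "drop by 30% in 10 min" or "persistently above 0" usually live in the alerting system itself, for example as Prometheus alerting rules. The Python sketch below only makes their logic explicit and testable. The function names, the sample format and the three-evaluation persistence rule are all assumptions.

```python
from typing import Optional, Sequence, Tuple

Sample = Tuple[float, float]  # (unix timestamp in seconds, metric value), sorted by time


def value_at_or_before(samples: Sequence[Sample], ts: float) -> Optional[float]:
    """Return the latest sample value whose timestamp is <= ts, or None."""
    candidates = [value for t, value in samples if t <= ts]
    return candidates[-1] if candidates else None


def connected_clients_drop(samples: Sequence[Sample],
                           window_s: float = 600,
                           drop_ratio: float = 0.30) -> bool:
    """True when the latest value is at least drop_ratio below the value window_s earlier."""
    if not samples:
        return False
    now, current = samples[-1]
    baseline = value_at_or_before(samples, now - window_s)
    if baseline is None or baseline <= 0:
        return False  # not enough history to compare against
    return (baseline - current) / baseline >= drop_ratio


def persistently_nonzero(values: Sequence[float], min_consecutive: int = 3) -> bool:
    """True when the last min_consecutive evaluations (e.g. dropped messages) are all > 0."""
    tail = list(values[-min_consecutive:])
    return len(tail) == min_consecutive and all(v > 0 for v in tail)


# 1000 clients ten minutes ago, 650 now: a 35% drop fires the alert.
print(connected_clients_drop([(0, 1000), (300, 950), (600, 650)]))
```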
Example alert definitions
Alert: MQTTConnectedClientsDrop
Condition:
connected_clients{env="prod"} drops by 30% over 10 minutes
Impact:
Devices may be unable to publish telemetry.
First checks:
1. Broker health
2. TLS certificate expiry
3. Load balancer health
4. DNS
5. Network/firewall changes
6. Recent deployment
Definition of done:
connected_clients returns to expected baseline
and no abnormal reconnect rate remains.
Alert: MQTTBackendConsumerLag
Condition:
oldest_pending_message_age_seconds > freshness_slo_seconds
Impact:
Dashboard and alerts are delayed.
First checks:
1. Queue depth
2. Worker count
3. DB latency
4. rejected payload spike
5. CPU/RAM workers
Definition of done:
queue depth stable or decreasing
oldest pending age below SLO
Alert routing
| Alert family | Primary owner |
|---|---|
| Broker unavailable | Platform / Infrastructure. |
| Auth / certificate failures | Security / Platform. |
| Firmware-specific rejects | Firmware / Device team. |
| Backend queue lag | Backend / Data team. |
| Command failures | Operations / Backend. |
Dashboards: broker, fleet, backend and business views
One dashboard is not enough. MQTT production needs at least four core views (broker health, fleet/device health, backend ingestion, and business/application state), plus a security view for authentication and ACL failures.
| Dashboard | Audience | Main questions |
|---|---|---|
| Broker dashboard | Infrastructure / SRE | Is MQTT infrastructure healthy? |
| Fleet dashboard | Operations / Support | Which devices/sites are online, stale or broken? |
| Ingestion dashboard | Backend / Data | Is the backend keeping up? |
| Security dashboard | Security / Platform | Are auth/ACL failures abnormal? |
| Business dashboard | Product / Operations | Are critical workflows working? |
Broker panels
Broker dashboard panels:
- connected clients
- messages in/sec
- messages out/sec
- dropped messages
- rejected messages
- retained messages count
- session queue depth
- auth failures
- ACL denied
- CPU / RAM / network
- top topic prefixes by message rate
Ingestion dashboard panels
Backend ingestion dashboard:
- Redis stream length
- Celery queue depth
- Kafka consumer lag
- oldest pending message age
- processing time p50/p95/p99
- DB write latency p95/p99
- rejected messages by reason
- duplicate messages by device
- top noisy devices
- latest stored timestamp by flow
Business panels
| Panel | Purpose |
|---|---|
| Data freshness by site | Detect stale operational data. |
| Command success rate | Know if devices act on commands. |
| Alarm processing delay | Measure operational responsiveness. |
| Offline devices by tenant | Support and customer impact. |
| Rejected payloads by firmware | Detect broken deployments. |
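The "Data freshness by site" and "Offline devices by tenant" panels both reduce to a last-seen check. This sketch makes two assumptions:

- the ingestion worker maintains a hypothetical `device_id -> (site, last received_at)` map;
- "stale" means no message for longer than `max_age`, typically a small multiple of the device heartbeat interval.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple

# Hypothetical last-seen view: device_id -> (site, received_at of last message)
LastSeen = Dict[str, Tuple[str, datetime]]


def stale_devices_by_site(last_seen: LastSeen, now: datetime,
                          max_age: timedelta) -> Dict[str, List[str]]:
    """Group devices with no message for longer than max_age by site."""
    stale: Dict[str, List[str]] = defaultdict(list)
    for device_id, (site, seen_at) in last_seen.items():
        if now - seen_at > max_age:
            stale[site].append(device_id)
    return {site: sorted(ids) for site, ids in stale.items()}


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
fleet = {
    "gw-1": ("paris", now - timedelta(minutes=1)),
    "gw-2": ("paris", now - timedelta(minutes=20)),
    "gw-3": ("lyon", now - timedelta(hours=2)),
}
print(stale_devices_by_site(fleet, now, max_age=timedelta(minutes=10)))
```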
Incident patterns and immediate diagnostics
| Incident | Immediate diagnostic | Likely causes |
|---|---|---|
| Mass devices offline | Broker up? TLS expired? DNS? Firewall? LB? | Certificate, network, broker restart, DNS, config rollback. |
| Messages missing | QoS? dropped? backend lag? ACL denied? | QoS 0 loss, subscriber offline, ACL, backend overload. |
| Broker CPU high | Wildcards? reconnect storm? retained flood? | Noisy client, broad subscriptions, fan-out spike. |
| Backend subscriber slow | DB slow? queue backlog? heavy parsing? | Database bottleneck, worker shortage, bad payloads. |
| ACL denies spike | New firmware? wrong topic? intrusion? | Firmware bug, misprovisioning, attack attempt. |
| Auth failures spike | Source IP? cert expiry? password rotation? | Credential issue, attack, expired certs. |
| Retained store growth | Which topic prefix? new firmware? | Telemetry accidentally retained. |
| Command ack timeout | Device online? subscribed? command ACL? | Offline device, wrong topic, command service issue. |
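For the retained store growth incident, cleanup relies on a protocol rule: publishing a zero-byte payload with the retain flag set removes the retained message stored for that exact topic. This sketch assumes two things:

- the list of currently retained topics has already been collected, for example by subscribing to the suspicious prefix and keeping messages whose retain flag is set;
- `publish` is any function with the keyword arguments of paho-mqtt's `Client.publish`.

The helper names are hypothetical. The default `dry_run=True` only reports what would be cleared.

```python
from typing import Callable, Iterable, List

# Any function accepting (topic, payload=..., qos=..., retain=...), such as
# paho-mqtt's Client.publish, can be passed as `publish`.
PublishFn = Callable[..., object]


def topic_has_prefix(topic: str, prefix: str) -> bool:
    """Level-aware prefix check: 'a/b' covers 'a/b' and 'a/b/c' but not 'a/bc'."""
    prefix = prefix.rstrip("/")
    return topic == prefix or topic.startswith(prefix + "/")


def clear_retained(publish: PublishFn, retained_topics: Iterable[str],
                   bad_prefixes: Iterable[str], dry_run: bool = True) -> List[str]:
    """Clear retained messages under bad_prefixes; returns the targeted topics.

    A zero-byte payload published with retain=True makes the broker delete the
    retained message stored for that exact topic (wildcards are not allowed in
    PUBLISH, so each topic is cleared individually).
    """
    prefixes = list(bad_prefixes)
    targets = sorted(t for t in retained_topics
                     if any(topic_has_prefix(t, p) for p in prefixes))
    if not dry_run:
        for topic in targets:
            publish(topic, payload=b"", qos=1, retain=True)
    return targets
```

Run it with `dry_run=True` first and review the list. The client doing the cleanup also needs ACL permission to publish on those topics.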
Incident triage diagram
Incident detected
│
▼
Is broker reachable?
├── no -> infra incident
└── yes
│
▼
Are clients connected?
├── no -> TLS/DNS/network/auth
└── yes
│
▼
Are messages entering broker?
├── no -> publisher/device issue
└── yes
│
▼
Are messages leaving broker?
├── no -> subscription/ACL/session issue
└── yes
│
▼
Is backend consuming?
├── no -> subscriber/queue/worker issue
└── yes
│
▼
Is storage/dashboard fresh?
├── no -> DB/API/cache/WebSocket issue
└── yes -> incident may be scoped or resolved
First 5 minutes checklist
[ ] Check broker health
[ ] Check connected clients trend
[ ] Check messages in/out trend
[ ] Check auth and ACL failures
[ ] Check dropped/rejected messages
[ ] Check backend lag
[ ] Check recent deployments
[ ] Check TLS certificate expiry
[ ] Check firewall/LB/DNS changes
[ ] Check top noisy clients and topic prefixes
Runbooks for common MQTT incidents
Runbook: mass device offline
Symptoms:
- connected clients drop sharply
- dashboards show offline devices
- reconnect attempts may spike
Immediate checks:
1. Broker process/node status
2. Load balancer health
3. TLS certificate expiry
4. DNS resolution
5. Firewall/security group changes
6. Recent broker config deployment
7. Auth backend availability
8. Network provider or VPN status
Actions:
1. Roll back recent broker/TLS/LB config if suspected.
2. Restore previous certificate if rotation failed.
3. Move traffic to healthy broker node if cluster.
4. Temporarily increase logs for connect failures.
5. Communicate impacted sites/tenants.
6. Confirm clients reconnect and publish data.
Definition of done:
- connected clients back to baseline
- reconnect rate normal
- data freshness below SLO
- no auth failure spike remains
Runbook: retained store flood
Symptoms:
- retained message count grows unexpectedly
- memory or storage increases
- new dashboards receive old irrelevant values
Immediate checks:
1. Top retained topic prefixes
2. Recent firmware deployment
3. Publish retain flag usage
4. Telemetry topics accidentally retained
5. ACL allowing retained publish too broadly
Actions:
1. Stop offending publisher or firmware.
2. Disable retain on telemetry flow.
3. Clear retained messages for bad prefixes carefully.
4. Add alert on retained growth.
5. Add validation in publisher library.
Definition of done:
- retained count stable
- bad retained topics removed
- publisher fixed
- alert added
Runbook: backend consumer lag
Symptoms:
- queue depth grows
- oldest pending age exceeds SLO
- dashboard stale
- alarms delayed
Immediate checks:
1. Worker count and health
2. Redis/Celery/Kafka lag
3. DB write latency
4. rejected payload spike
5. top topic prefixes by volume
6. recent backend deployment
7. database locks or slow queries
Actions:
1. Scale workers if safe.
2. Reduce low-priority ingestion.
3. Enable batch writes.
4. Pause noisy clients if needed.
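5. Add temporary consumers to the existing consumer group (a new group would receive every message again instead of sharing the load).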
6. Fix DB bottleneck or rollback deployment.
7. Prioritize alarm/event queues.
Definition of done:
- queue depth decreasing
- oldest pending age below SLO
- dashboard fresh
- alarm flow normal
Runbook: ACL denied spike
Symptoms:
- ACL denied rate spikes
- devices cannot publish or subscribe
- new firmware may fail
Immediate checks:
1. Which client IDs?
2. Which topic prefixes?
3. Publish or subscribe denied?
4. Firmware version?
5. Recent ACL changes?
6. Suspicious source IPs?
Actions:
1. If firmware bug: rollback or hotfix topic convention.
2. If ACL config bug: rollback ACL.
3. If attack: block source and preserve logs.
4. If provisioning issue: fix registry/identity mapping.
5. Add regression test for denied/allowed cases.
Definition of done:
- denied rate back to baseline
- affected clients confirmed healthy
- root cause documented
Capacity planning and scaling
MQTT capacity depends on connected clients, publish rate, subscriber fan-out, QoS level, retained messages, persistent sessions, payload size and backend throughput. The broker can often handle large message rates, but the backend database may become the real bottleneck.
| Capacity driver | Impact | Scaling lever |
|---|---|---|
| Connected clients | Sockets, memory, keepalive traffic. | Broker tuning, cluster, LB, FD limits. |
| Messages/sec | CPU, network, routing, backend load. | Horizontal broker scale, workers, batching. |
| QoS 1/2 | More state and acknowledgements. | Use QoS by flow, tune inflight. |
| Payload size | Bandwidth, memory, parsing time. | Limit size, compress carefully, use object storage for large data. |
| Subscriber fan-out | messages_out can greatly exceed messages_in. | Reduce duplicate subscribers, use bridges carefully. |
| Persistent sessions | Queued messages and memory/disk. | Session expiry, quotas, cleanup. |
| Retained messages | Memory/store growth. | Retain only status/current state. |
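The capacity drivers above combine into the back-of-envelope sizing formulas of this section. The helper below uses hypothetical parameter names and makes two simplifications:

- it ignores MQTT, TCP and TLS overhead;
- `write_amplification` counts database writes per stored message (raw row, hot-state upsert, index and so on).

```python
def mqtt_sizing(
    devices: int,
    seconds_between_messages: float,
    average_fanout: float,
    average_payload_bytes: int,
    stored_ratio: float = 1.0,
    write_amplification: float = 1.0,
) -> dict:
    """Back-of-envelope throughput estimate, without protocol or TLS overhead."""
    incoming_msg_s = devices / seconds_between_messages
    return {
        "incoming_msg_per_s": incoming_msg_s,
        "outgoing_msg_per_s": incoming_msg_s * average_fanout,
        # decimal kilobytes (1 KB = 1000 bytes)
        "incoming_kb_per_s": incoming_msg_s * average_payload_bytes / 1000,
        "backend_writes_per_s": incoming_msg_s * stored_ratio * write_amplification,
    }


# Same figures as the worked sizing example in this section.
estimate = mqtt_sizing(devices=10_000, seconds_between_messages=10,
                       average_fanout=3, average_payload_bytes=500)
print(estimate)  # 1000 msg/s in, 3000 msg/s out, 500 KB/s in, 1000 writes/s
```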
Sizing formula thinking
Incoming messages/sec:
devices * messages_per_device_per_sec
Outgoing messages/sec:
incoming_messages * average_subscriber_fanout
Incoming bandwidth:
incoming_messages * average_payload_size
Backend writes/sec:
messages_to_store * write_amplification
Example:
10,000 devices
1 message / 10 sec each
= 1,000 messages/sec incoming
If fanout = 3:
= 3,000 messages/sec outgoing
If payload = 500 bytes:
= 500 KB/sec incoming before overhead
Scaling patterns
Small scale:
Devices -> Mosquitto -> Python worker -> PostgreSQL
Medium scale:
Devices -> MQTT broker -> Redis Stream -> worker pool -> TimescaleDB
Large scale:
Devices -> MQTT cluster -> Kafka bridge -> stream processors -> data lake
Industrial edge:
Machines -> local broker -> local dashboard
-> bridge -> central broker -> analytics
Load test plan
Load test dimensions:
[ ] connected clients
[ ] publish rate
[ ] payload size
[ ] QoS level
[ ] retained flag usage
[ ] subscriber fan-out
[ ] persistent sessions
[ ] disconnect/reconnect storm
[ ] backend database write rate
[ ] command ack latency
Measure:
[ ] broker CPU/RAM
[ ] messages in/out
[ ] dropped messages
[ ] backend lag
[ ] DB latency
[ ] dashboard freshness
[ ] error rate
SRE checklist for MQTT production readiness
Monitoring checklist
[ ] Connected clients monitored
[ ] Connect/disconnect rate monitored
[ ] Messages in/sec monitored
[ ] Messages out/sec monitored
[ ] Bytes in/out monitored
[ ] Dropped messages monitored
[ ] Rejected messages monitored
[ ] Auth failures monitored
[ ] ACL denied monitored
[ ] Session queue depth monitored
[ ] Inflight messages monitored
[ ] Retained messages monitored
[ ] Broker CPU/RAM/disk monitored
[ ] File descriptors monitored
[ ] Backend queue depth monitored
[ ] Backend lag monitored
[ ] DB write latency monitored
[ ] Rejected payloads visible in admin
[ ] Duplicate messages visible in admin
[ ] Data freshness SLO monitored
[ ] Command ack/result latency monitored
Alert checklist
[ ] Every alert has severity
[ ] Every alert has owner
[ ] Every alert has runbook
[ ] Every alert has impact description
[ ] Every alert has definition of done
[ ] Alerts use baseline when possible
[ ] Alerts are grouped by tenant/site/node
[ ] Alerts avoid noisy one-minute spikes
[ ] Critical alarms detect business impact
[ ] Security alerts preserve forensic context
Operational checklist
[ ] Broker config versioned
[ ] ACL config versioned
[ ] TLS certificates tracked
[ ] Certificate expiry alerts enabled
[ ] Rollback procedure documented
[ ] Load test performed
[ ] Failover test performed
[ ] Reconnect storm test performed
[ ] Slow subscriber test performed
[ ] Retained cleanup procedure documented
[ ] Device revocation procedure documented
[ ] Command incident procedure documented
[ ] Backup and restore tested
[ ] Dashboard reviewed with operations
[ ] On-call runbooks available
[ ] Post-incident review template ready
Definition of a healthy MQTT platform
A healthy MQTT platform is one where clients are connected, messages flow within expected baselines, consumers keep up, dropped messages are zero or understood, security failures are explainable, retained state is controlled, data freshness is within SLO, and command workflows are acknowledged.
Minimum run command examples
Smoke subscribe:
mosquitto_sub -h broker.example.com -p 8883 \
--cafile ca.crt \
-u monitor -P password \
-t 'prod/acme/+/+/+/status/#' -v
Smoke publish:
mosquitto_pub -h broker.example.com -p 8883 \
--cafile ca.crt \
-u test_device -P password \
-t 'prod/acme/test/line-01/test-device/status/availability' \
-m '{"schema_version":"1.0","state":"online"}' \
-q 1 -r
Debug local only:
mosquitto_sub -h localhost -t '#' -v
Cloud and scaling overview
Scaling MQTT is not only about increasing broker capacity. A production MQTT platform must scale connections, authentication, authorization, message routing, backend consumers, storage, dashboards, monitoring, offline replay, bridge links and command flows.
The right architecture depends on the number of clients, message rate, payload size, QoS level, fan-out ratio, offline behavior, retained state, security model and whether the deployment is edge, cloud, industrial, multi-tenant or multi-region.
| Scaling dimension | Impact | Typical control |
|---|---|---|
| Connected clients | Sockets, memory, keepalive traffic. | Broker tuning, cluster, load balancer, file descriptor limits. |
| Messages/sec | CPU, network, routing, backend pressure. | Horizontal scale, topic partitioning, backend buffering. |
| Subscriber fan-out | Messages out can greatly exceed messages in. | Reduce duplicate subscriptions, use shared subscriptions. |
| QoS 1/2 | More acknowledgements and inflight state. | Use QoS per flow, tune inflight and retries. |
| Offline sessions | Queued messages and memory/disk growth. | Session expiry, quotas, queue limits. |
| Storage throughput | Database can become the bottleneck. | Redis/Kafka buffer, batch writes, time-series storage. |
End-to-end scaling map
Devices / Gateways
│
├── connection count
├── publish rate
├── payload size
├── QoS level
└── reconnect behavior
│
▼
MQTT Broker Layer
│
├── listener capacity
├── auth / ACL throughput
├── topic routing
├── session queues
├── retained store
└── cluster coordination
│
▼
Backend Consumers
│
├── shared subscriptions
├── Redis / Kafka buffer
├── Celery / workers
├── validation
└── idempotency
│
▼
Storage / Analytics
│
├── hot state
├── time-series
├── event store
├── data lake
└── dashboards
Common scaling traps
Trap 1:
Broker benchmark is excellent, but database cannot keep up.
Trap 2:
Adding broker nodes does not fix slow subscribers.
Trap 3:
Wildcard subscriptions create huge fan-out.
Trap 4:
Persistent sessions accumulate queues for offline clients.
Trap 5:
Edge bridge replicates too many topics to central.
Trap 6:
Cloud IoT cost explodes because every tiny message is billable.
MQTT scaling patterns
| Pattern | Usage | Strength | Limit |
|---|---|---|---|
| Single broker VM | Lab, prototype, small production. | Simple and cheap. | Single point of failure. |
| Broker cluster | Large production, many clients. | High availability and scale. | Operational complexity. |
| Edge broker per site | Factories, stores, remote sites. | Local autonomy during WAN outage. | Bridge and replay complexity. |
| Managed cloud IoT | AWS/Azure-centric IoT platform. | Device identity and cloud integration. | Cost, quotas, lock-in. |
| MQTT ingress to Kafka | Analytics, replay, data lake. | Durable streaming downstream. | Extra architecture layer. |
| Shared subscriptions | Horizontal backend consumers. | Load balancing. | Ordering and partitioning must be designed. |
Pattern selection
Small project:
Devices -> Mosquitto -> Python worker -> PostgreSQL
Medium project:
Devices -> EMQX/HiveMQ -> Redis Stream -> workers -> TimescaleDB
Large analytics:
Devices -> MQTT cluster -> Kafka bridge -> stream processors -> data lake
Industrial edge:
Machines -> local broker -> bridge -> central broker -> historian
Cloud managed:
Devices -> AWS IoT Core / Azure IoT Hub -> cloud rules -> storage/services
Scaling decision tree
Need local autonomy?
├── yes -> edge broker per site + bridge
└── no
│
▼
Need managed cloud identity?
├── yes -> AWS IoT Core / Azure IoT Hub
└── no
│
▼
Need high connection scale?
├── yes -> EMQX / HiveMQ / VerneMQ cluster
└── no
│
▼
Need simple broker?
├── yes -> Mosquitto
└── no -> RabbitMQ plugin if AMQP ecosystem exists
What must scale together
- Broker listener capacity.
- Authentication backend.
- ACL evaluation path.
- Backend subscribers.
- Buffer layer such as Redis or Kafka.
- Database write throughput.
- Dashboard fan-out.
- Monitoring and alerting pipeline.
Broker clusters and load balancing
MQTT clusters are used when one broker instance is not enough for availability, connection count, message rate or operational resilience. The cluster is usually placed behind a TCP/TLS load balancer.
Cluster architecture
Devices
│
│ MQTT/TLS
▼
TCP/TLS Load Balancer
│
├── MQTT Broker Node 1
├── MQTT Broker Node 2
├── MQTT Broker Node 3
└── MQTT Broker Node N
│
├── shared routing
├── session handling
├── retained store
├── auth / ACL
└── metrics
│
▼
Backend subscribers / bridges / rules engine
| Cluster concern | Why it matters | Control |
|---|---|---|
| Session distribution | Client reconnect may land on another node. | Broker-specific cluster/session model. |
| Retained messages | Need consistent retained state. | Cluster retained store or external store. |
| Shared subscriptions | Load balanced consumers must coordinate. | Use broker-supported shared subscription semantics. |
| TLS termination | LB or broker termination changes identity model. | Preserve mTLS if required. |
| Node failure | Clients reconnect and sessions recover. | Reconnect backoff and failover testing. |
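Reconnect backoff matters most when a node fails and thousands of clients lose their connection at the same instant. A common technique is exponential backoff with full jitter: each attempt waits a random delay between 0 and an exponentially growing ceiling.

The sketch below uses hypothetical names and defaults. Client libraries often expose their own setting (paho-mqtt has `Client.reconnect_delay_set`), so this mainly documents the policy to aim for.

```python
import random
from typing import Iterator, Optional


def reconnect_delays(
    base_s: float = 1.0,
    cap_s: float = 120.0,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield successive reconnect delays (seconds) using exponential backoff with full jitter.

    Attempt n waits uniformly in [0, min(cap_s, base_s * 2**n)], so clients that
    disconnected together spread their reconnects instead of storming the broker.
    """
    rng = rng or random.Random()
    attempt = 0
    while True:
        ceiling = min(cap_s, base_s * (2 ** attempt))
        yield rng.uniform(0, ceiling)
        attempt = min(attempt + 1, 30)  # stop growing the exponent; the cap already applies


# Example: first five delays for one client, seeded for reproducibility.
delays = reconnect_delays(base_s=1.0, cap_s=60.0, rng=random.Random(7))
print([round(next(delays), 2) for _ in range(5)])
```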
Load balancer choices
| Mode | Pros | Cons |
|---|---|---|
| TCP passthrough | Broker handles TLS/mTLS directly. | Less L7 visibility at LB. |
| TLS termination at LB | Central TLS management. | mTLS client identity must be forwarded safely. |
| Sticky sessions | Can reduce session movement. | May reduce balancing quality. |
| Non-sticky balancing | Better distribution. | Requires strong cluster session handling. |
Cluster readiness checklist
[ ] Load balancer health checks
[ ] Broker node metrics
[ ] Cluster membership metrics
[ ] Reconnect behavior tested
[ ] Node failure tested
[ ] TLS certificate rotation tested
[ ] Retained messages behavior tested
[ ] Persistent session behavior tested
[ ] Shared subscription behavior tested
[ ] Auth backend failure tested
[ ] ACL changes tested
[ ] Backup and restore procedure
[ ] Rollback broker config procedure
Shared subscriptions for horizontal backend consumers
Shared subscriptions allow multiple backend consumers to share the same subscription group. Instead of every consumer receiving every message, the broker distributes messages across members of the group. This is essential for scaling ingestion workers.
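Two details matter when writing shared subscriptions. First, the share name (`ingestion-workers` below) must not contain `/`, `+` or `#`. Second, the broker matches topics against the filter that follows `$share/<group>/`.

The sketch below uses hypothetical helper names. It builds such a filter and implements the standard MQTT wildcard rules:

- `+` matches exactly one level;
- `#` matches the current level and everything below;
- wildcards at the first level do not match topics starting with `$`.

This is handy for unit-testing topic conventions and ACL expectations.

```python
def shared_filter(group: str, topic_filter: str) -> str:
    """Build a shared subscription filter of the form $share/<group>/<filter>."""
    if not group or any(ch in group for ch in "/+#"):
        raise ValueError("share name must be non-empty and must not contain '/', '+' or '#'")
    return f"$share/{group}/{topic_filter}"


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Standard MQTT wildcard matching of a topic name against a filter."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    # Wildcards in the first level never match topics starting with '$' (e.g. $SYS).
    if topic.startswith("$") and f_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(f_levels):
        if level == "#":
            return True  # matches this level and everything below, including the parent
        if i >= len(t_levels):
            return False
        if level != "+" and level != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


print(shared_filter("ingestion-workers", "prod/acme/+/+/+/telemetry/#"))
print(topic_matches("prod/acme/+/+/+/telemetry/#",
                    "prod/acme/paris/line-02/gw-123/telemetry/temperature"))  # True
```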
Normal subscription
Subscription:
prod/acme/+/+/+/telemetry/#
Message:
prod/acme/paris/line-02/gw-123/telemetry/temperature
Result:
worker-1 receives message
worker-2 receives message
worker-3 receives message
Problem:
duplicate processing unless consumers are designed for fan-out.
Shared subscription
Subscription:
$share/ingestion-workers/prod/acme/+/+/+/telemetry/#
Message A -> worker-1
Message B -> worker-2
Message C -> worker-3
Message D -> worker-1
Result:
load is distributed across the worker group.
Shared subscription architecture
MQTT Broker
│
└── $share/ingestion-workers/prod/acme/+/+/+/telemetry/#
│
├── ingestion-worker-01
├── ingestion-worker-02
├── ingestion-worker-03
└── ingestion-worker-04
│
▼
Redis / Kafka / Database
Design rules
| Rule | Reason |
|---|---|
| Use one group per processing role. | Separates ingestion, alerting and analytics. |
| Keep workers idempotent. | Retries and reconnects can still duplicate messages. |
| Do not assume global ordering. | Messages are distributed across consumers. |
| Monitor per-worker throughput. | Detect stuck or slow consumers. |
| Use separate queues for critical flows. | Telemetry spikes should not delay alarms. |
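The "keep workers idempotent" rule can be sketched as a wrapper that skips any message whose `message_id` was already processed. This assumes every payload envelope carries a unique `message_id`. The class name and the in-memory window are illustrative only.

```python
from collections import OrderedDict
from typing import Callable, Dict


class IdempotentHandler:
    """Process each message_id at most once within a bounded in-memory window."""

    def __init__(self, handler: Callable[[Dict], None], max_ids: int = 100_000):
        self._handler = handler
        self._max_ids = max_ids
        self._seen: OrderedDict = OrderedDict()  # message_id -> None, oldest first

    def __call__(self, message: Dict) -> bool:
        """Handle the message; return False when it is a duplicate and was skipped."""
        message_id = message["message_id"]
        if message_id in self._seen:
            self._seen.move_to_end(message_id)  # keep recently seen ids longer
            return False
        self._handler(message)  # mark as seen only after the handler succeeded
        self._seen[message_id] = None
        if len(self._seen) > self._max_ids:
            self._seen.popitem(last=False)  # evict the oldest id
        return True


stored = []
handle = IdempotentHandler(stored.append)
handle({"message_id": "m-1", "value": 21.5})
handle({"message_id": "m-1", "value": 21.5})  # redelivery after a reconnect: skipped
print(len(stored))  # 1
```

After a reconnect, a shared subscription may redeliver a message to a different worker, so a per-process window is not enough on its own. In production the same check is usually also enforced in a shared store, for example a unique constraint on `message_id` in the database.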
Edge broker to central broker bridge
An edge bridge connects a local site broker to a central broker. This is common in factories, buildings, stores, energy sites and remote facilities. The local site can continue operating during WAN outages, then synchronize selected topics when the link returns.
Bridge architecture
Factory local devices
│
▼
Local MQTT Broker
│
├── local dashboard
├── local rules
├── local buffer
└── bridge MQTT/TLS/mTLS
│
▼
Central MQTT Cluster
│
├── global dashboard
├── storage
├── alerting
├── analytics
└── ERP / CMMS integration
| Bridge design point | Recommendation |
|---|---|
| Topic filtering | Bridge only required topics, not everything. |
| Direction | Separate upstream telemetry and downstream commands. |
| Security | Use TLS or mTLS between brokers. |
| Loop prevention | Avoid republishing central messages back to local endlessly. |
| Retained messages | Replicate retained state carefully. |
| Offline replay | Use sequence numbers and idempotency. |
Bridge topic mapping
Local topic:
site/paris/line-02/gw-123/telemetry/temperature
Central topic:
prod/acme/paris/line-02/gw-123/telemetry/temperature
Upstream bridge:
site/+/+/+/telemetry/# -> prod/acme/{site}/{area}/{asset}/telemetry/#
site/+/+/+/status/# -> prod/acme/{site}/{area}/{asset}/status/#
site/+/+/+/event/# -> prod/acme/{site}/{area}/{asset}/event/#
Downstream bridge:
prod/acme/paris/+/+/command/# -> site/paris/{area}/{asset}/command/#
Bridge failure modes
| Failure | Symptom | Mitigation |
|---|---|---|
| WAN outage | Central stops receiving site data. | Local buffer, replay, offline status. |
| Bridge loop | Message rate explodes. | Directional mapping and loop markers. |
| Retained flood | Old retained messages propagate widely. | Retained policy per prefix. |
| Command arrives late | Unsafe delayed action. | Command expiration and device validation. |
| Duplicate replay | Data inserted twice. | message_id or sequence idempotency. |
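The upstream and downstream bridge mappings are prefix rewrites plus a direction filter. That filter is also what prevents loops: telemetry, status and events only go up, and commands only come down.

Brokers normally do this in their bridge configuration. Mosquitto's `topic` bridge option, for example, accepts local and remote prefixes. The Python sketch below uses hypothetical prefixes and function names and only makes the rules testable.

```python
from typing import Optional

LOCAL_PREFIX = "site/"         # hypothetical local namespace on the edge broker
CENTRAL_PREFIX = "prod/acme/"  # hypothetical central namespace
UPSTREAM_KINDS = {"telemetry", "status", "event"}


def to_central(local_topic: str) -> Optional[str]:
    """Upstream: site/<site>/<area>/<asset>/<kind>/... -> prod/acme/<site>/..."""
    if not local_topic.startswith(LOCAL_PREFIX):
        return None
    rest = local_topic[len(LOCAL_PREFIX):]
    levels = rest.split("/")
    if len(levels) < 4 or levels[3] not in UPSTREAM_KINDS:
        return None  # commands and unknown kinds never travel upstream
    return CENTRAL_PREFIX + rest


def to_local(central_topic: str, site: str) -> Optional[str]:
    """Downstream: prod/acme/<site>/<area>/<asset>/command/... -> site/<site>/..."""
    if not central_topic.startswith(f"{CENTRAL_PREFIX}{site}/"):
        return None  # other sites' commands are not bridged to this edge broker
    rest = central_topic[len(CENTRAL_PREFIX):]
    levels = rest.split("/")
    if len(levels) < 4 or levels[3] != "command":
        return None  # only commands travel downstream, so nothing loops back up
    return LOCAL_PREFIX + rest


print(to_central("site/paris/line-02/gw-123/telemetry/temperature"))
# prod/acme/paris/line-02/gw-123/telemetry/temperature
```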
AWS IoT Core, Azure IoT Hub and managed cloud IoT
Managed cloud IoT services provide MQTT connectivity plus device identity, policies, routing rules, cloud integrations, monitoring and operational tooling. They reduce broker operations but introduce quotas, cost models, service-specific conventions and cloud lock-in.
| Service | Usage | Strengths | Watch out |
|---|---|---|---|
| AWS IoT Core | Managed IoT MQTT in AWS. | Certificates, policies, rules, AWS integration. | Quotas, cost per message, IAM/policy complexity, lock-in. |
| Azure IoT Hub | Managed IoT in Azure. | Device identity, cloud-to-device, routes, Device Twin. | Azure-specific MQTT conventions, quotas, cost, lock-in. |
| Self-hosted EMQX/HiveMQ | Controlled MQTT platform. | Flexibility, data residency, custom auth, portable architecture. | Operations, HA, patching, security and monitoring to manage. |
| Cloud VM broker | Simple hosted broker. | Low cost, simple deployment. | HA and scaling are your responsibility. |
AWS IoT Core pattern
Devices
│
│ MQTT/TLS + device certificate
▼
AWS IoT Core
│
├── thing registry
├── IoT policies
├── rules engine
│ ├── Lambda
│ ├── Kinesis
│ ├── DynamoDB
│ ├── S3
│ └── SNS / SQS
└── CloudWatch logs and metrics
Azure IoT Hub pattern
Devices
│
│ MQTT/TLS
▼
Azure IoT Hub
│
├── device identity
├── device-to-cloud messages
├── cloud-to-device messages
├── Device Twin
├── routing
│ ├── Event Hubs
│ ├── Storage
│ ├── Service Bus
│ └── Azure Functions
└── Azure Monitor / logs
Managed vs self-hosted
| Criterion | Managed cloud IoT | Self-hosted MQTT |
|---|---|---|
| Time to market | Fast. | Slower. |
| Operations | Reduced. | You own it. |
| Identity | Built-in device registry. | Custom or broker plugin. |
| Cost | Usage-based, can grow. | Infra and team cost. |
| Portability | Lower. | Higher. |
| Customization | Service-limited. | Full control. |
MQTT ingress to Kafka, Kinesis, Event Hubs or analytics pipelines
MQTT brokers are excellent for device connectivity, but they are not long-term event logs. For replay, analytics, machine learning, data lakes and high-volume stream processing, MQTT is often connected to Kafka, Redpanda, Pulsar, Kinesis or Event Hubs.
MQTT to streaming architecture
Devices
│
▼
MQTT Broker
│
├── operational subscribers
└── streaming connector
│
▼
Kafka / Redpanda / Pulsar / Kinesis / Event Hubs
│
├── stream processing
├── data lake
├── anomaly detection
├── batch analytics
└── replay consumers
| Need | MQTT role | Streaming role |
|---|---|---|
| Device connectivity | Primary. | Not ideal directly. |
| Pub/Sub routing | Primary. | Possible but different model. |
| Replay | Limited. | Primary. |
| Data lake ingestion | Ingress layer. | Pipeline backbone. |
| Analytics | Transport source. | Processing and replay source. |
Connector design
MQTT subscriber connector
│
├── subscribes to selected topics
├── validates payload envelope
├── adds metadata
│ ├── topic
│ ├── tenant
│ ├── site
│ ├── device_id
│ ├── received_at
│ └── qos
├── builds partition key
└── writes to streaming topic
Partition key choices
| Partition key | Good for | Risk |
|---|---|---|
| device_id | Ordering per device. | Hot partitions for noisy devices. |
| site_id | Site-level processing. | Large sites may dominate. |
| tenant_id | Tenant isolation. | Uneven tenants create imbalance. |
| hash(topic) | Balanced distribution. | Less semantic ordering. |
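The connector steps and partition key choices above can be sketched as follows. The sketch makes these assumptions:

- topics follow the layout `<env>/<tenant>/<site>/<area>/<device_id>/<kind>/...` used throughout this guide;
- the JSON envelope must carry `schema_version`;
- the function names are hypothetical;
- the producer call (Kafka, Kinesis, Event Hubs) is left out, and the code returns a `(key, value)` pair in the shape most streaming producers accept.

```python
import json
from datetime import datetime, timezone
from typing import Dict, Tuple


def parse_topic(topic: str) -> Dict[str, str]:
    """Split <env>/<tenant>/<site>/<area>/<device_id>/<kind>/... into named fields."""
    levels = topic.split("/")
    if len(levels) < 6:
        raise ValueError(f"unexpected topic layout: {topic}")
    keys = ("env", "tenant", "site", "area", "device_id", "kind")
    return dict(zip(keys, levels[:6]))


def build_record(topic: str, payload: bytes, qos: int,
                 key_field: str = "device_id") -> Tuple[bytes, bytes]:
    """Validate the envelope, add metadata and return (partition_key, value)."""
    body = json.loads(payload)  # invalid JSON raises ValueError -> reject path
    if not isinstance(body, dict) or "schema_version" not in body:
        raise ValueError("payload envelope without schema_version")
    meta = parse_topic(topic)
    record = {**meta, "topic": topic, "qos": qos,
              "received_at": datetime.now(timezone.utc).isoformat(),
              "payload": body}
    # The producer's partitioner hashes the key, so key_field="topic" gives the
    # hash(topic) distribution; "device_id", "site" or "tenant" keep per-key ordering.
    key = topic if key_field == "topic" else meta[key_field]
    return key.encode(), json.dumps(record).encode()
```

In a multi-tenant platform where device IDs are unique only within a tenant, a combined tenant and device key avoids unrelated devices sharing a key.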
Multi-region and disaster recovery
Multi-region MQTT is difficult because clients maintain persistent connections, sessions, retained state and QoS flows. The architecture must decide whether regions are active/passive, active/active, site-affine, tenant-affine or edge-first.
| Model | Usage | Strength | Complexity |
|---|---|---|---|
| Active/passive | DR standby region. | Simpler mental model. | Failover and data replication required. |
| Active/active by tenant | Tenants pinned to regions. | Good isolation. | Tenant migration complexity. |
| Active/active by geography | Devices connect to nearest region. | Low latency. | Global state and command routing complexity. |
| Edge-first | Sites operate locally and sync to cloud. | WAN outage resilience. | Replay and conflict handling. |
Active/passive diagram
Normal state:
Devices -> Region A MQTT Cluster -> Backend A -> Storage A
│
└── replicated data/config -> Region B standby
Failover:
Devices reconnect -> Region B MQTT Cluster -> Backend B -> Storage B
Multi-region risks
| Risk | Why it happens | Mitigation |
|---|---|---|
| Duplicate messages | Replay from edge or region failover. | Global message_id and idempotency. |
| Command sent twice | Active/active command services. | Single command authority per device. |
| Retained state divergence | Regions hold different last values. | Region ownership or replicated state strategy. |
| Session loss | Persistent MQTT sessions are region-local. | Accept reconnect and rebuild state. |
| DNS failover delay | TTL and client caching. | Low TTL, client fallback list. |
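The "command sent twice" and "command arrives late" risks are usually also guarded on the device. This sketch assumes a hypothetical command envelope with `command_id`, `issued_at_s` and `ttl_s`, and a device clock that is reasonably synchronized (for example via NTP).

MQTT 5's Message Expiry Interval lets the broker drop expired messages that have not yet been delivered. That complements this check but does not replace it.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class CommandGuard:
    """Device-side checks run before executing a command."""

    executed_ids: Set[str] = field(default_factory=set)  # bounded in real firmware

    def accept(self, command: dict, now_s: float) -> str:
        # Late delivery (WAN outage, region failover): refuse to act on stale intent.
        if now_s > command["issued_at_s"] + command["ttl_s"]:
            return "rejected: expired"
        # Duplicate delivery (retry, replay, two active command services).
        if command["command_id"] in self.executed_ids:
            return "rejected: duplicate"
        self.executed_ids.add(command["command_id"])
        return "accepted"


guard = CommandGuard()
restart = {"command_id": "cmd-42", "issued_at_s": 1_000.0, "ttl_s": 30.0}
print(guard.accept(restart, now_s=1_010.0))  # accepted
print(guard.accept(restart, now_s=1_012.0))  # rejected: duplicate
```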
DR checklist
[ ] Broker config replicated
[ ] ACLs replicated
[ ] Certificates available in DR region
[ ] Device registry replicated
[ ] Backend consumers deployed
[ ] Storage replication tested
[ ] DNS or endpoint failover tested
[ ] Client reconnect behavior tested
[ ] Command authority defined
[ ] Idempotency works across regions
[ ] Retained state recovery defined
[ ] Runbook documented
[ ] RTO and RPO defined
Cost model and limits
MQTT costs come from broker infrastructure, cloud message billing, TLS processing, bandwidth, storage writes, monitoring, retained state, logs, data egress and operational team time. Cloud managed IoT can be very efficient, but high-frequency telemetry can generate large message counts quickly.
Cost drivers
| Driver | Self-hosted impact | Managed cloud impact |
|---|---|---|
| Connected clients | Broker memory/sockets. | Service quotas and pricing tier. |
| Messages/sec | CPU/network/backend load. | Message billing can dominate. |
| Payload size | Bandwidth and parsing cost. | Message units, bandwidth and storage. |
| Rules/bridges | Compute and connector ops. | Rules engine and downstream service cost. |
| Storage | Database infrastructure. | Cloud storage, time-series, data lake cost. |
| Monitoring/logs | Observability stack cost. | Cloud logs and metrics can become expensive. |
| Egress | Bandwidth provider. | Cloud egress charges. |
Message volume estimate
Formula:
messages_per_month =
devices
* messages_per_device_per_minute
* 60
* 24
* 30
Example:
10,000 devices
1 message every 10 seconds = 6 messages/minute
10,000 * 6 * 60 * 24 * 30
= 2,592,000,000 messages/month
That is why telemetry frequency matters.
Cost reduction levers
| Lever | Effect | Risk |
|---|---|---|
| Reduce frequency | Large message count reduction. | Less granular data. |
| Publish on change | Avoid useless repeated values. | Need heartbeat for freshness. |
| Aggregate at edge | Less cloud traffic. | Raw detail may be lost. |
| Filter bridge topics | Less central traffic. | Missing data if filters are wrong. |
| Batch low-priority logs | Lower message overhead. | Less real-time visibility. |
| Store aggregates longer than raw data | Lower storage cost. | Less forensic detail. |
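The monthly volume formula and the "publish on change" lever can be checked together. The names are hypothetical:

- `messages_per_month` reproduces the message volume estimate;
- `DeadbandPublisher` publishes a reading only when it moves more than a deadband or when a heartbeat is due. The heartbeat keeps the freshness signal that the levers table warns about.

```python
from typing import Optional


def messages_per_month(devices: int, messages_per_device_per_minute: float,
                       days: int = 30) -> float:
    """Monthly message count, same formula as the message volume estimate."""
    return devices * messages_per_device_per_minute * 60 * 24 * days


class DeadbandPublisher:
    """Publish on change (beyond a deadband) or when a heartbeat is due."""

    def __init__(self, deadband: float, heartbeat_s: float):
        self.deadband = deadband
        self.heartbeat_s = heartbeat_s
        self._last_value: Optional[float] = None
        self._last_ts: Optional[float] = None

    def should_publish(self, value: float, ts: float) -> bool:
        first = self._last_value is None
        changed = first or abs(value - self._last_value) > self.deadband
        heartbeat_due = first or ts - self._last_ts >= self.heartbeat_s
        if changed or heartbeat_due:
            self._last_value, self._last_ts = value, ts
            return True
        return False


print(messages_per_month(10_000, 6))  # 2592000000 with the example figures

pub = DeadbandPublisher(deadband=0.5, heartbeat_s=300)
readings = [(0, 21.0), (10, 21.2), (20, 21.4), (30, 21.6), (40, 21.7), (330, 21.7)]
print([pub.should_publish(v, t) for t, v in readings])
# [True, False, False, True, False, True]
```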
Limits to define upfront
Per client:
- max messages/sec
- max payload size
- max subscriptions
- max inflight
- max queued messages
- session expiry
Per tenant:
- max devices
- max messages/day
- max retained messages
- max command rate
- max storage retention
Per platform:
- broker nodes
- backend worker capacity
- database write capacity
- dashboard fan-out capacity
Cloud and scaling decision checklist
Architecture checklist
[ ] Expected connected clients defined
[ ] Expected messages/sec defined
[ ] Average payload size estimated
[ ] Fan-out ratio estimated
[ ] QoS policy defined per flow
[ ] Offline behavior defined
[ ] Retained state policy defined
[ ] Persistent session policy defined
[ ] Broker choice justified
[ ] Cluster or single broker decision justified
[ ] Edge bridge strategy defined
[ ] Shared subscription groups defined
[ ] Backend buffer chosen
[ ] Storage throughput tested
[ ] Dashboard fan-out tested
[ ] Security and ACLs tested
[ ] Monitoring and alerting ready
[ ] Cost model estimated
[ ] Quotas and limits documented
Cloud decision matrix
| Requirement | Best fit |
|---|---|
| Fast prototype | Single Mosquitto or managed cloud trial. |
| Industrial edge autonomy | Local broker + bridge. |
| Large self-hosted fleet | EMQX/HiveMQ cluster. |
| AWS-native IoT | AWS IoT Core. |
| Azure-native IoT | Azure IoT Hub. |
| Analytics and replay | MQTT to Kafka/Event Hubs/Kinesis. |
| Existing RabbitMQ platform | RabbitMQ MQTT plugin if requirements fit. |
Production acceptance tests
[ ] Load test with expected client count
[ ] Load test with expected message rate
[ ] Burst test above expected peak
[ ] Reconnect storm test
[ ] Slow subscriber test
[ ] Broker node failure test
[ ] TLS certificate rotation test
[ ] ACL negative tests
[ ] Retained flood test
[ ] Offline edge bridge test
[ ] Replay duplicate test
[ ] Shared subscription scaling test
[ ] Backend database saturation test
[ ] Dashboard freshness test
[ ] Command ack timeout test
[ ] Disaster recovery failover test
Final architecture rule
Broker clustering, edge bridges, shared subscriptions and cloud IoT services only work well when topic design, QoS, idempotency, buffering, storage, monitoring and cost control are designed together.
Useful URLs
AWS IoT Core:
https://docs.aws.amazon.com/iot/
Azure IoT Hub:
https://learn.microsoft.com/azure/iot-hub/
EMQX:
https://www.emqx.com/
HiveMQ:
https://www.hivemq.com/
Mosquitto:
https://mosquitto.org/
RabbitMQ MQTT plugin:
https://www.rabbitmq.com/docs/mqtt
MQTT official:
https://mqtt.org/
Where MQTT fits
MQTT is best understood as a lightweight Pub/Sub protocol for distributed clients, devices, gateways, edge agents and real-time telemetry. It is not a universal replacement for RabbitMQ, Kafka, WebSocket, HTTP APIs or NATS. Each technology solves a different communication problem.
The best architectures often combine several of them: MQTT for field ingestion, RabbitMQ for backend task routing, Kafka for durable event streaming, WebSocket for browser live updates, HTTP for admin APIs and NATS for lightweight cloud-native messaging.
| Technology | Best usage | Avoid when | Complement with MQTT |
|---|---|---|---|
| MQTT | Devices, IoT, M2M, edge, telemetry, unreliable networks. | You need direct large-scale replay or heavy analytics in the broker. | Field ingress layer. |
| RabbitMQ / AMQP | Enterprise queues, routing, back-office workflows, worker jobs. | Millions of constrained IoT devices connecting directly without a dedicated design. | MQTT ingress to AMQP queues. |
| Kafka | Durable event streaming, replay, analytics, data lake pipelines. | Constrained devices connecting directly. | MQTT broker to Kafka connector. |
| WebSocket | Real-time browser UI and bidirectional web sessions. | Constrained devices, topic ACLs, MQTT QoS semantics. | MQTT backend to WebSocket dashboard. |
| HTTP / REST | CRUD APIs, admin APIs, synchronous request/response. | High-frequency telemetry and persistent device streams. | HTTP admin API plus MQTT telemetry. |
| NATS | Fast cloud-native messaging, service communication, request/reply. | The MQTT ecosystem, device-oriented brokers and IoT tooling are required. | Bridge or backend messaging layer. |
Mental model
Device / Field side:
MQTT
├── lightweight clients
├── Pub/Sub topics
├── QoS 0/1/2
├── retained state
└── Last Will
Backend workflow side:
RabbitMQ
├── queues
├── exchanges
├── routing keys
├── retries
└── dead-letter queues
Analytics side:
Kafka
├── durable log
├── partitions
├── replay
├── stream processing
└── data lake feed
Browser side:
WebSocket
├── live UI
├── bidirectional session
└── frontend updates
API side:
HTTP
├── CRUD
├── admin
├── synchronous calls
└── public APIs
Cloud-native side:
NATS
├── lightweight messaging
├── request/reply
├── subjects
└── service mesh style communication
Decision matrix by technical need
| Need | Best fit | Second option | Why |
|---|---|---|---|
| IoT devices publishing telemetry | MQTT | HTTP if low frequency | Lightweight Pub/Sub, QoS, retained, Last Will. |
| Backend jobs and worker queues | RabbitMQ | Redis Queue, Celery broker, NATS | Queue semantics, routing, retries, DLQ. |
| Event replay and analytics | Kafka | Redpanda, Pulsar, Event Hubs, Kinesis | Durable ordered log and consumer replay. |
| Browser live dashboard | WebSocket | SSE, MQTT over WebSocket | Native web real-time channel. |
| Public CRUD API | HTTP REST | GraphQL | Simple request/response and tooling. |
| Cloud-native service messaging | NATS | RabbitMQ | Low latency, simple subjects, request/reply. |
| Industrial machine state | MQTT + Sparkplug | OPC UA | MQTT transport plus industrial semantic layer. |
| Large file transfer | HTTP / object storage | S3-compatible storage | MQTT is not designed for large files. |
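
The decision tree of this section is an ordered list of questions: the first "yes" wins. That ordering can be written as a small helper. `choose_transport` and its parameter names are illustrative only and are not part of any library.

```python
def choose_transport(
    producer_is_field_device: bool,
    needs_durable_replay: bool = False,
    needs_queue_retries: bool = False,
    consumer_is_browser: bool = False,
    is_synchronous_api: bool = False,
) -> str:
    """Walk the decision tree top to bottom; the first matching question wins."""
    if producer_is_field_device:
        return "MQTT"
    if needs_durable_replay:
        return "Kafka / Redpanda / Pulsar"
    if needs_queue_retries:
        return "RabbitMQ"
    if consumer_is_browser:
        return "WebSocket / SSE"
    if is_synchronous_api:
        return "HTTP / REST / GraphQL"
    return "NATS or lightweight messaging"
```

A real platform usually gets a different answer at each hop. The same telemetry flow can be MQTT from the device, Kafka for replay and WebSocket to the browser, so apply the helper per link, not once per project.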
Decision tree
Is the producer a device, gateway or edge agent?
├── yes -> MQTT
└── no
│
▼
Do you need durable replay?
├── yes -> Kafka / Redpanda / Pulsar
└── no
│
▼
Do you need backend queues and retries?
├── yes -> RabbitMQ
└── no
│
▼
Is the consumer a browser UI?
├── yes -> WebSocket / SSE
└── no
│
▼
Is it a synchronous API?
├── yes -> HTTP / REST / GraphQL
└── no -> NATS or lightweight messaging
Short classification
MQTT:
field Pub/Sub transport
RabbitMQ:
enterprise queue broker
Kafka:
durable event streaming log
WebSocket:
browser real-time channel
HTTP:
request/response API
NATS:
fast cloud-native messaging
MQTT vs RabbitMQ / AMQP
MQTT and RabbitMQ do not target the same primary domain. MQTT was designed for lightweight Pub/Sub communication with devices over unstable networks. RabbitMQ is an enterprise messaging broker built around exchanges, queues, routing keys, acknowledgements, retries and dead-lettering.
| Aspect | MQTT | RabbitMQ / AMQP |
|---|---|---|
| Primary model | Topic Pub/Sub. | Exchange to queue routing. |
| Best producers | Devices, gateways, edge agents. | Backend services, jobs, business systems. |
| Delivery | QoS 0/1/2. | Queue acknowledgements, durable queues, retries. |
| Offline clients | Persistent sessions, queued QoS messages. | Queues hold messages until consumers process them. |
| Routing | Topic filters and wildcards. | Exchanges, bindings, routing keys. |
| Dead-lettering | Not defined by the MQTT specification. | Strong DLQ patterns. |
| Device ecosystem | Very strong. | Less natural for constrained devices. |
When RabbitMQ is better
- Backend job queues.
- Business workflows with retries.
- Dead-letter queues and retry policies.
- Routing to multiple internal queues.
- Celery or worker processing.
- Enterprise integration with AMQP.
Hybrid MQTT to RabbitMQ pattern
Devices
│
│ MQTT
▼
MQTT Broker or RabbitMQ MQTT Plugin
│
▼
AMQP Exchange
│
├── queue.storage
│ └── database writer
│
├── queue.alerts
│ └── alert engine
│
├── queue.websocket
│ └── live dashboard
│
└── queue.dead_letter
└── failed messages
Decision examples
| Scenario | Choice | Reason |
|---|---|---|
| Sensor sends temperature every 10 seconds. | MQTT | Lightweight device telemetry. |
| Django sends invoice generation jobs. | RabbitMQ | Worker queue and retry semantics. |
| Device ingress then multiple backend queues. | MQTT + RabbitMQ | MQTT first mile, RabbitMQ internal routing. |
| Need DLQ for failed enrichment. | RabbitMQ | Dead-letter pattern is native and clean. |
MQTT vs Kafka
MQTT and Kafka are frequently complementary. MQTT is a connectivity and Pub/Sub protocol for clients and devices. Kafka is a durable distributed event log for replay, analytics, stream processing and data pipelines.
| Aspect | MQTT | Kafka |
|---|---|---|
| Primary role | Device connectivity and live Pub/Sub. | Durable event streaming and replay. |
| Data retention | Not an event log by default. | Retention by time or size. |
| Replay | Limited. | Core feature. |
| Consumer model | Subscriptions and session behavior. | Consumer groups and offsets. |
| Ordering | Topic/session dependent. | Ordering within partition. |
| Device suitability | High. | Low for constrained devices. |
| Analytics | Needs downstream storage. | Designed for analytics pipelines. |
When Kafka is better
- Long-term event retention.
- Replay from offsets.
- Stream processing.
- Data lake ingestion.
- Multiple analytics consumers.
- High-throughput backend event pipelines.
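
A connector between the broker and Kafka has to derive three things from each MQTT message: a Kafka topic, a record key and an enriched value, as in the topic mapping example of this section. The sketch below makes these assumptions:

- Topics follow the seven-level layout `prod/{tenant}/{site}/{area}/{asset}/{stream}/{signal}` used in this guide.
- The Kafka topic is `iot.{stream}` and the key is `device_id`, one of the listed options.

`to_kafka_record` is a hypothetical helper. The actual producer call depends on the Kafka client library.

```python
import json
from datetime import datetime, timezone
from typing import Tuple

# Assumed layout: prod/{tenant}/{site}/{area}/{asset}/{stream}/{signal}
TOPIC_FIELDS = ("env", "tenant", "site", "area", "asset", "stream", "signal")


def to_kafka_record(mqtt_topic: str, payload: bytes) -> Tuple[str, bytes, bytes]:
    """Map one MQTT message to (kafka_topic, key, value)."""
    levels = mqtt_topic.split("/")
    if len(levels) != len(TOPIC_FIELDS):
        raise ValueError(f"unexpected topic depth: {mqtt_topic!r}")
    fields = dict(zip(TOPIC_FIELDS, levels))
    body = json.loads(payload)
    if not isinstance(body, dict):
        raise ValueError("payload must be a JSON object")
    envelope = {
        "mqtt_topic": mqtt_topic,
        "tenant": fields["tenant"],
        "site": fields["site"],
        "device_id": fields["asset"],
        "metric": fields["signal"],
        "event_ts": body.get("event_ts"),  # device clock; None if absent
        "ingestion_ts": datetime.now(timezone.utc).isoformat(),
        "payload": body,
    }
    kafka_topic = f"iot.{fields['stream']}"  # e.g. iot.telemetry
    key = fields["asset"].encode()           # same device -> same partition
    return kafka_topic, key, json.dumps(envelope).encode()
```

Keying by `device_id` keeps each device's events in one partition, which preserves per-device ordering in Kafka. Keying by `site_id` instead groups a whole site in one partition, at the risk of uneven partition load.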
MQTT to Kafka architecture
Devices / Gateways
│
│ MQTT/TLS
▼
MQTT Broker
│
├── operational dashboard
├── alerting subscriber
└── Kafka connector
│
▼
Kafka / Redpanda
│
├── stream processing
├── data lake
├── ML pipeline
├── audit replay
└── batch analytics
Topic mapping example
MQTT topic:
prod/acme/paris/line-02/gw-123/telemetry/temperature
Kafka topic options:
iot.telemetry
iot.telemetry.acme
iot.telemetry.temperature
Kafka key options:
device_id = gw-123
site_id = paris
hash(topic)
Recommended event metadata:
{
"mqtt_topic": "...",
"tenant": "acme",
"site": "paris",
"device_id": "gw-123",
"metric": "temperature",
"event_ts": "...",
"ingestion_ts": "..."
}
MQTT vs WebSocket
WebSocket is a browser and application transport for bidirectional real-time connections. MQTT is a Pub/Sub protocol with topics, QoS, retained messages and Last Will. MQTT can run over WebSocket, but they are not the same abstraction.
| Aspect | MQTT | WebSocket |
|---|---|---|
| Primary usage | Devices, brokers, Pub/Sub. | Browser-to-server real-time channel. |
| Routing | Topic-based routing. | Application-defined. |
| QoS | QoS 0/1/2. | No MQTT-style QoS by default. |
| Retained state | Native retained messages. | Application must implement. |
| Last Will | Native. | Application must implement. |
| Browser fit | Possible via MQTT over WebSocket. | Native and common. |
When WebSocket is better
- Interactive browser dashboards.
- Chat-like UI.
- Live admin screens.
- Application-specific push from backend to browser.
- When direct MQTT exposure to frontend is not desired.
Recommended dashboard pattern
Devices
│
│ MQTT
▼
MQTT Broker
│
▼
Backend subscriber
│
├── validates data
├── applies user permissions
├── aggregates state
└── pushes safe updates
│
▼
WebSocket Gateway
│
▼
Browser Dashboard
Direct MQTT over WebSocket
Browser
│
│ MQTT over WebSocket
▼
MQTT Broker
Use only if:
- broker supports WebSocket
- ACLs are extremely strict
- browser credentials are short-lived
- topic scope is limited per user
- sensitive industrial topics are not exposed
- rate limits are enforced
MQTT vs HTTP REST / HTTP Streaming / SSE
HTTP is the standard model for synchronous APIs, CRUD operations, public integrations and admin actions. MQTT is better for persistent, lightweight, topic-based telemetry and device communication. HTTP streaming and SSE can push events to clients, but they do not replace MQTT for device fleets.
| Aspect | MQTT | HTTP / REST | SSE / HTTP streaming |
|---|---|---|---|
| Communication style | Pub/Sub. | Request/response. | Server-to-client stream. |
| Connection | Persistent broker connection. | Request-based. | Persistent HTTP stream. |
| Device telemetry | Excellent. | OK for low frequency. | Not ideal. |
| CRUD API | Not ideal. | Excellent. | Not intended. |
| Browser push | Possible with MQTT over WebSocket. | Polling unless custom. | Good for one-way updates. |
| QoS semantics | Native MQTT QoS. | HTTP status and retries. | Application-defined. |
When HTTP is better
- Admin APIs.
- CRUD operations.
- Public APIs.
- File uploads and downloads.
- Simple low-frequency device reporting.
- Human-driven requests.
MQTT + HTTP pattern
Device telemetry:
Device -> MQTT Broker -> Ingestion backend -> Time-series DB
Admin operations:
Browser -> HTTP API -> Django backend -> Database
Commands:
Browser -> HTTP API -> Command service -> MQTT command topic
Dashboard:
Backend -> WebSocket/SSE -> Browser
Avoid this confusion
| Bad idea | Better design |
|---|---|
| Use REST polling every second for 50k devices. | Use MQTT telemetry and backend aggregation. |
| Use MQTT for public CRUD API. | Use REST or GraphQL. |
| Send firmware files through MQTT payloads. | Use HTTP/object storage and MQTT notification. |
| Expose device command directly from browser MQTT. | Use HTTP API to command service, then MQTT. |
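
The last row describes the command path. The HTTP API authenticates and authorizes the operator, then a command service publishes to MQTT. The sketch below covers the publish side only and assumes:

- a 60-second default expiry;
- the `command/`, `ack/` and `result/` stream names of this guide's topic layout.

`build_command` is a hypothetical helper that prepares the publish; it does not talk to a broker.

```python
import json
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_command(device_prefix: str, action: str, params: dict,
                  ttl_seconds: int = 60,
                  now: Optional[datetime] = None) -> dict:
    """Prepare one MQTT command publish; RBAC and device checks happen before this.

    device_prefix is the device namespace, e.g. prod/acme/paris/line-02/gw-123.
    """
    now = now or datetime.now(timezone.utc)
    command_id = f"cmd-{uuid.uuid4()}"
    body = {
        "schema_version": "1.0",
        "command_id": command_id,
        "action": action,
        "params": params,
        "issued_at": now.isoformat(),
        "expires_at": (now + timedelta(seconds=ttl_seconds)).isoformat(),
    }
    return {
        "command_id": command_id,  # written to the command journal first
        "topic": f"{device_prefix}/command/{action}",
        "payload": json.dumps(body),
        "qos": 1,
        "retain": False,           # commands are never retained
        "ack_topic": f"{device_prefix}/ack/{action}",
        "result_topic": f"{device_prefix}/result/{action}",
    }
```

With Paho, the `topic`, `payload`, `qos` and `retain` fields map onto `client.publish(topic, payload, qos=qos, retain=retain)`. Record the `command_id` in the command journal before publishing, so that later ack and result messages can be correlated.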
MQTT vs NATS
NATS is a very fast cloud-native messaging system with subjects, request/reply, Pub/Sub and optional persistence through JetStream. It is excellent for service-to-service communication. MQTT is stronger when the problem involves IoT device connectivity, retained state, Last Will and the MQTT broker ecosystem.
| Aspect | MQTT | NATS |
|---|---|---|
| Primary ecosystem | IoT, devices, brokers, edge. | Cloud-native services and messaging. |
| Subjects/topics | MQTT topics and wildcards. | NATS subjects and wildcards. |
| QoS | QoS 0/1/2. | Different delivery model, JetStream for persistence. |
| Retained state | Native retained messages. | Different pattern, usually application or JetStream. |
| Last Will | Native. | Application-defined. |
| Request/reply | Possible with the MQTT 5 Response Topic and Correlation Data properties. | Very natural. |
| Device tooling | Very strong. | Less standard in IoT device ecosystem. |
When NATS is better
- Fast service-to-service messaging.
- Cloud-native microservices.
- Request/reply between internal services.
- Lightweight internal event bus.
- JetStream persistence for selected flows.
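
When an ingestion service republishes MQTT traffic onto NATS, as in the hybrid pattern of this section, topic names and filters must be translated:

- MQTT separates levels with `/` and uses `+` and `#` as wildcards.
- NATS separates tokens with `.` and uses `*` and `>`.

A minimal sketch of that translation follows. The function name is illustrative, and levels containing characters that are special in NATS subjects are rejected rather than escaped.

```python
def mqtt_to_nats_subject(mqtt: str) -> str:
    """Translate an MQTT topic or filter into a NATS subject.

    '/' becomes '.', '+' becomes '*', and a trailing '#' becomes '>'.
    """
    levels = mqtt.split("/")
    tokens = []
    for i, level in enumerate(levels):
        if level == "+":
            tokens.append("*")
        elif level == "#":
            if i != len(levels) - 1:
                raise ValueError("'#' is only valid as the last MQTT level")
            tokens.append(">")
        elif not level or any(ch in level for ch in ". \t*>"):
            # Empty levels and NATS-reserved characters have no 1:1 token.
            raise ValueError(f"MQTT level {level!r} cannot become a NATS token")
        else:
            tokens.append(level)
    return ".".join(tokens)
```

One semantic gap remains. The MQTT filter `prod/acme/paris/#` also matches the parent topic `prod/acme/paris`, while the NATS subject `prod.acme.paris.>` requires at least one more token.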
MQTT + NATS hybrid
Devices
│
│ MQTT
▼
MQTT Broker
│
▼
Ingestion service
│
├── validates payload
├── normalizes event
└── publishes internal event
│
▼
NATS
│
├── alert service
├── dashboard service
├── enrichment service
└── command orchestrator
Decision examples
| Scenario | Choice |
|---|---|
| Gateway publishes telemetry over cellular network. | MQTT. |
| Microservice asks another service for current calculation. | NATS request/reply. |
| Internal event bus with low latency. | NATS. |
| Device online/offline with Last Will. | MQTT. |
Hybrid architecture patterns
Pattern 1: MQTT to Kafka for analytics
Devices
│
▼
MQTT Broker
│
▼
Kafka Connector
│
▼
Kafka
│
├── stream processing
├── data lake
├── ML features
└── replay consumers
Pattern 2: MQTT to RabbitMQ for workers
Devices
│
▼
MQTT Broker
│
▼
Ingestion service
│
▼
RabbitMQ exchange
│
├── queue.storage
├── queue.alerting
├── queue.enrichment
└── queue.dead_letter
Pattern 3: MQTT to WebSocket for UI
Devices
│
▼
MQTT Broker
│
▼
Backend subscriber
│
├── checks permissions
├── aggregates state
└── pushes safe updates
│
▼
WebSocket
│
▼
Browser dashboard
Pattern 4: HTTP API to MQTT command
Operator browser
│
│ HTTP POST /commands
▼
Django API
│
├── RBAC check
├── device state check
├── command journal
└── publish MQTT command
│
▼
Device
│
├── ack topic
└── result topic
Pattern 5: MQTT to NATS internal bus
MQTT Broker
│
▼
Ingestion service
│
├── validate
├── normalize
└── publish internal event
│
▼
NATS
│
├── alert service
├── state service
├── notification service
└── command service
Anti-patterns and wrong choices
| Anti-pattern | Why it is wrong | Better design |
|---|---|---|
| Use MQTT as a long-term event log. | MQTT broker is not Kafka. | Bridge MQTT to Kafka or database. |
| Use Kafka directly from tiny devices. | Heavy client and operational mismatch. | Use MQTT first mile, Kafka downstream. |
| Use REST polling for high-frequency telemetry. | Wasteful and hard to scale. | Use MQTT or streaming architecture. |
| Expose all MQTT topics to browsers. | Security and data leakage risk. | Backend bridge to WebSocket. |
| Use RabbitMQ as a raw, large-scale IoT broker without dedicated design. | Device sessions and MQTT semantics may not fit. | Use an MQTT broker, or the RabbitMQ MQTT plugin with care. |
| Use WebSocket as device protocol for constrained clients. | No native QoS, retained messages or Last Will. | Use MQTT. |
| Use MQTT for file upload. | Large payloads stress broker and clients. | Use object storage and MQTT notification. |
Architecture smell checklist
[ ] One technology is used for every communication pattern
[ ] Devices talk directly to Kafka
[ ] Browser subscribes to unrestricted MQTT wildcard
[ ] MQTT broker stores business history
[ ] Commands are sent without ack/result
[ ] RabbitMQ queues are used as time-series storage
[ ] REST endpoint receives thousands of telemetry posts per second
[ ] WebSocket server implements custom broker logic
[ ] No idempotency between MQTT and downstream queue
[ ] No clear ownership between MQTT, Kafka and database
Correction map
Need telemetry from devices:
MQTT
Need durable replay:
Kafka / Redpanda / Pulsar
Need worker retry and DLQ:
RabbitMQ
Need live browser UI:
WebSocket / SSE
Need CRUD API:
HTTP REST / GraphQL
Need internal fast messaging:
NATS
Need large file transfer:
Object storage + HTTP
Final choice guide
Recommended combinations
| Project type | Recommended stack | Reason |
|---|---|---|
| Smart home / building | MQTT + WebSocket dashboard + PostgreSQL/Redis. | Device telemetry, live UI, current state. |
| Industrial telemetry | MQTT + Sparkplug + historian + alerting. | Industrial semantics and time-series. |
| Fleet tracking | MQTT + Kafka + time-series + map UI. | Mobile telemetry plus replay and analytics. |
| Backend job system | RabbitMQ + Celery. | Queues, retries, worker routing. |
| Large data platform | MQTT ingress + Kafka backbone + data lake. | Device ingress and durable analytics. |
| Microservices internal bus | NATS or RabbitMQ. | Fast internal messaging or enterprise queues. |
| Public SaaS API | HTTP REST/GraphQL + async queue behind. | Standard client integration. |
One-line recommendations
Use MQTT when:
devices, telemetry, edge, Pub/Sub, QoS, retained state, Last Will.
Use RabbitMQ when:
backend queues, retries, DLQ, enterprise routing.
Use Kafka when:
replay, analytics, durable event log, stream processing.
Use WebSocket when:
browser needs live updates.
Use HTTP when:
request/response, CRUD, public API, file transfer.
Use NATS when:
fast internal cloud-native messaging or request/reply.
Final architecture example
Industrial SaaS platform:
Field layer:
Devices / PLC gateways
│
▼
MQTT + Sparkplug
Messaging layer:
MQTT broker cluster
│
├── live operations
├── command service
└── connector
Streaming layer:
Kafka / Redpanda
│
├── replay
├── analytics
└── data lake
Backend layer:
RabbitMQ / Celery
│
├── enrichment
├── notifications
└── reports
UI layer:
WebSocket
│
▼
Browser dashboards
API layer:
HTTP REST / GraphQL
│
▼
Admin, users, configuration
Final checklist
[ ] Communication pattern identified
[ ] Producer type identified
[ ] Consumer type identified
[ ] Need for replay decided
[ ] Need for queue retries decided
[ ] Need for browser live UI decided
[ ] Need for public API decided
[ ] Need for device QoS decided
[ ] Need for retained state decided
[ ] Need for Last Will decided
[ ] Storage responsibility separated
[ ] Command responsibility separated
[ ] Security boundary defined
[ ] Operational ownership defined
Why MQTT anti-patterns are dangerous
MQTT looks simple at first: connect, publish, subscribe. In production, most failures do not come from the protocol itself. They come from weak topic design, poor identity mapping, missing ACLs, wrong QoS choices, unversioned payloads, retained misuse, invisible backend lag and insufficient operational controls.
The most dangerous MQTT mistakes are architectural. They may work in a demo with ten devices, then fail badly with thousands of devices, unstable networks, firmware updates, command flows, multi-tenant access and real-time dashboards.
| Anti-pattern family | Typical symptom | Production impact | Primary correction |
|---|---|---|---|
| Bad topics | Subscribers receive too much or too little. | Routing chaos, weak ACLs, hard debugging. | Stable namespace and documented topic contract. |
| Wrong QoS | Latency, duplicates, losses or broker pressure. | Slow ingestion or unreliable delivery. | QoS per flow plus idempotent backend. |
| Weak identity | Devices impersonate each other or disconnect each other. | Security incident and data corruption. | Unique client ID tied to certificate or registry. |
| No schema version | Firmware update breaks consumers. | Rejected data, silent wrong parsing, incident. | Payload versioning and compatibility policy. |
| Retained misuse | Old messages appear as current state. | Wrong dashboard state or unsafe command replay. | Retained only for current state. |
| Single fragile broker | One outage stops all devices. | Full ingestion outage. | HA, backup, edge broker or managed service. |
Anti-pattern map
MQTT production failures usually start here:
Design
├── topics are not structured
├── telemetry, status, event and command are mixed
├── no namespace per tenant/site/device
└── no versioning strategy
Security
├── anonymous access
├── no topic ACLs
├── shared credentials
├── client_id not tied to identity
└── browser can subscribe too broadly
Delivery
├── QoS 2 everywhere
├── no idempotency
├── commands without ack/result
└── no offline replay strategy
Operations
├── no broker metrics
├── no backend lag metric
├── no dropped message alert
├── no retained cleanup procedure
└── no incident runbooks
Core correction principle
Explicit topics, explicit identity, explicit ACLs, explicit QoS, explicit payload schemas, explicit retained policy, explicit command lifecycle and explicit monitoring.
Topic design anti-patterns
| Bad design | Symptom | Risk | Correction |
|---|---|---|---|
| data | Everything goes to one topic. | No routing, no filtering, no clean ACL. | Use a hierarchical namespace. |
| device123 | No tenant, site, stream or metric. | Hard to subscribe by business scope. | Add tenant/site/area/asset/stream/signal. |
| factory/# everywhere | Subscribers receive massive traffic. | Performance and security issue. | Use narrow topic filters. |
| Mixed telemetry and commands | Same prefix carries values and actions. | Unsafe ACLs and confusing routing. | Separate telemetry, status, event, command. |
| Human labels in topics | Renaming breaks consumers. | Topic instability. | Use stable IDs and map labels in backend. |
| Accents, spaces, inconsistent case | Parsing and ACL mistakes. | Interop and debugging problems. | Use lowercase ASCII and one naming convention. |
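
The corrections in this table (hierarchical namespace, stable IDs, separate streams, lowercase ASCII) are easy to enforce automatically at provisioning time or in CI. The minimal checker below assumes:

- the seven-level layout `prod/{tenant}/{site}/{area}/{asset}/{stream}/{signal}` recommended in this section;
- a character set of lowercase letters, digits and `-`;
- an illustrative list of environments.

`validate_topic` is a hypothetical helper, not a broker feature.

```python
import re
from typing import List

# Assumed naming rule: each level uses lowercase ASCII letters, digits and '-'.
LEVEL_RE = re.compile(r"[a-z0-9][a-z0-9-]*")
ALLOWED_ENVS = {"prod", "staging", "dev"}  # illustrative environment list
ALLOWED_STREAMS = {"telemetry", "status", "event", "command", "ack", "result"}
EXPECTED_LEVELS = 7  # prod/{tenant}/{site}/{area}/{asset}/{stream}/{signal}


def validate_topic(topic: str) -> List[str]:
    """Return human-readable problems; an empty list means the topic is compliant."""
    levels = topic.split("/")
    if len(levels) != EXPECTED_LEVELS:
        return [f"expected {EXPECTED_LEVELS} levels, got {len(levels)}"]
    problems = []
    for level in levels:
        # Rejects uppercase, spaces, accents, underscores and wildcards.
        if not LEVEL_RE.fullmatch(level):
            problems.append(f"invalid level {level!r}")
    if levels[0] not in ALLOWED_ENVS:
        problems.append(f"unknown environment {levels[0]!r}")
    if levels[5] not in ALLOWED_STREAMS:
        problems.append(f"unknown stream {levels[5]!r}")
    return problems
```

Run over the bad topic examples of this section, the checker rejects every one of them, while the recommended examples pass.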
Bad topic examples
Bad:
data
temperature
Device 123
Factory/Paris/Line 1/Temperature
acme/all
prod/+/+/+/#
commands
telemetry_and_commands/gw-123
Correct topic structure
Recommended:
prod/{tenant}/{site}/{area}/{asset}/{stream}/{signal}
Examples:
prod/acme/paris/line-02/gw-123/telemetry/temperature
prod/acme/paris/line-02/gw-123/status/availability
prod/acme/paris/line-02/gw-123/event/alarm
prod/acme/paris/line-02/gw-123/command/reboot
prod/acme/paris/line-02/gw-123/ack/reboot
prod/acme/paris/line-02/gw-123/result/reboot
Good subscription examples
All telemetry for one tenant:
prod/acme/+/+/+/telemetry/#
One site:
prod/acme/paris/#
One device:
prod/acme/paris/line-02/gw-123/#
All alarms:
prod/acme/+/+/+/event/alarm
All command results:
prod/acme/+/+/+/result/#
QoS and delivery anti-patterns
| Anti-pattern | Symptom | Why it hurts | Better design |
|---|---|---|---|
| QoS 2 everywhere | High latency, low throughput, more broker state. | Unnecessary protocol overhead. | QoS 1 plus idempotency for most critical flows. |
| QoS 0 for alarms | Important events can disappear. | No delivery acknowledgement. | QoS 1, event_id, audit table. |
| No backend idempotency | Duplicate rows, repeated actions. | QoS 1 can redeliver. | message_id, command_id, sequence_number. |
| QoS used as business guarantee | Command delivered but not executed. | MQTT ack is not business ack. | Command ack/result topics. |
| No offline replay | Data lost during network outage. | Edge disappears from central view. | Local buffer with sequence and replay. |
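
QoS 1 can redeliver a message, so the backend must tolerate duplicates. The duplicate-safe processing flow of this section (build an idempotency key, try to insert an inbox record, skip the business write on conflict) can be sketched with SQLite from the standard library. The sketch assumes:

- the key is `device_id:message_id`;
- the inbox and the business table live in the same database, so both writes commit in one transaction.

`handle_once` is a hypothetical helper.

```python
import sqlite3
from typing import Callable


def open_inbox(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS inbox (
               idempotency_key TEXT PRIMARY KEY,
               duplicate_count INTEGER NOT NULL DEFAULT 0,
               processed INTEGER NOT NULL DEFAULT 0
           )"""
    )
    return conn


def handle_once(conn: sqlite3.Connection, device_id: str, message_id: str,
                business_write: Callable[[sqlite3.Connection], object]) -> bool:
    """Apply business_write at most once per (device_id, message_id).

    Returns True for a new message, False for a duplicate.
    """
    key = f"{device_id}:{message_id}"
    with conn:  # commit on success, roll back everything on exception
        cur = conn.execute(
            "INSERT OR IGNORE INTO inbox (idempotency_key) VALUES (?)", (key,))
        if cur.rowcount == 0:  # key already present: duplicate delivery
            conn.execute(
                "UPDATE inbox SET duplicate_count = duplicate_count + 1 "
                "WHERE idempotency_key = ?", (key,))
            return False
        business_write(conn)
        conn.execute(
            "UPDATE inbox SET processed = 1 WHERE idempotency_key = ?", (key,))
        return True
```

If `business_write` raises, the transaction rolls back, including the inbox row. A redelivered copy is then processed again instead of being silently skipped. On PostgreSQL the same idea uses `INSERT ... ON CONFLICT DO NOTHING`.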
QoS selection errors
Bad:
- QoS 2 for every temperature message
- QoS 0 for safety alarm
- QoS 1 without message_id
- command without command_id
- assuming QoS 2 means database exactly once
- no duplicate handling after reconnect
Correct delivery model
Telemetry high frequency:
QoS 0 or 1
message_id optional but recommended
time-series storage
Status:
QoS 1
retained true
Last Will for offline state
Event / alarm:
QoS 1
event_id required
idempotent event storage
Command:
QoS 1 or 2
command_id required
expires_at required
ack topic required
result topic required
Backend:
deduplicate before business write
Duplicate-safe processing
Incoming message
│
▼
Build idempotency key
│
▼
Try insert inbox record
│
├── new
│ └── process and mark processed
│
└── already exists
├── increment duplicate_count
└── skip business write
Payload and schema anti-patterns
| Anti-pattern | Symptom | Risk | Correction |
|---|---|---|---|
| No schema_version | Firmware update breaks backend. | Silent wrong parsing or rejected data. | Add version and parser per version. |
| No message_id | Cannot deduplicate. | Duplicate writes. | Add message_id or sequence. |
| No timestamp | Cannot distinguish event time from ingestion time. | Wrong analytics. | Add event_ts and store ingestion_ts. |
| Unit missing | Celsius, Fahrenheit, bar, psi confusion. | Wrong decisions. | Add unit and metric registry. |
| Dynamic metric names | Cardinality explosion. | Storage and dashboard chaos. | Allowed metric list. |
| Huge payloads | Broker and backend pressure. | Memory, latency, denial-of-service. | Max packet size and payload limits. |
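
The validation pipeline of this section checks, in order: size, JSON, schema version, identity, metric allowlist, value and unit, and timestamp. It can be sketched as one function that returns a visible rejection reason instead of raising. Assumptions and choices:

- The size limit, the metric/unit registry and the five-minute future clock-skew tolerance are illustrative.
- Old timestamps are accepted on purpose, because edge buffers may replay data after an outage.
- `validate` is a hypothetical name.

```python
import json
import math
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

MAX_PAYLOAD_BYTES = 4096                                   # illustrative limit
SUPPORTED_VERSIONS = ("1.0",)
METRIC_UNITS = {"temperature": ("C",), "pressure": ("bar",)}  # illustrative registry
MAX_FUTURE_SKEW = timedelta(minutes=5)                     # tolerated clock drift


def validate(raw: bytes, topic_device_id: str,
             now: Optional[datetime] = None) -> Tuple[Optional[dict], Optional[str]]:
    """Return (message, None) if valid, otherwise (None, rejection_reason)."""
    now = now or datetime.now(timezone.utc)
    if len(raw) > MAX_PAYLOAD_BYTES:
        return None, "too_large"
    try:
        msg = json.loads(raw)
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return None, "invalid_json"
    if not isinstance(msg, dict):
        return None, "not_an_object"
    if msg.get("schema_version") not in SUPPORTED_VERSIONS:
        return None, "unsupported_schema_version"
    if msg.get("device_id") != topic_device_id:  # topic identity wins
        return None, "device_id_mismatch"
    if not msg.get("message_id"):
        return None, "missing_message_id"
    metric = msg.get("metric")
    units = METRIC_UNITS.get(metric) if isinstance(metric, str) else None
    if units is None:
        return None, "metric_not_allowed"
    value = msg.get("value")
    if (isinstance(value, bool) or not isinstance(value, (int, float))
            or not math.isfinite(value)):
        return None, "value_not_numeric"
    if msg.get("unit") not in units:
        return None, "unit_not_allowed"
    try:
        ts = datetime.fromisoformat(msg["event_ts"].replace("Z", "+00:00"))
    except (KeyError, AttributeError, ValueError):
        return None, "bad_timestamp"
    if ts.tzinfo is None or ts - now > MAX_FUTURE_SKEW:
        return None, "timestamp_not_sane"
    return msg, None
```

Each rejection reason can feed the rejected-payload counter used in monitoring, so that a firmware regression shows up as a spike of a single reason.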
Bad payload
{
"temp": "72",
"time": "now",
"device": "abc"
}
Problems:
- no schema_version
- no message_id
- no numeric guarantee
- no unit
- weak timestamp
- unclear device identity
- no quality field
- no extensibility
Correct payload
{
"schema_version": "1.0",
"message_id": "msg-20260425-0001",
"device_id": "gw-123",
"sequence": 18422,
"metric": "temperature",
"value": 21.7,
"unit": "C",
"quality": "good",
"event_ts": "2026-04-25T10:00:00Z"
}
Validation pipeline
Payload received
│
▼
Check max size
│
▼
Parse JSON safely
│
▼
Validate schema_version
│
▼
Validate device_id against topic identity
│
▼
Validate metric allowlist
│
▼
Validate value type and unit
│
▼
Validate timestamp sanity
│
▼
Build idempotency key
│
▼
Process or reject with visible reason
Identity and client ID anti-patterns
| Anti-pattern | Symptom | Risk | Correction |
|---|---|---|---|
| Shared client_id | Clients disconnect each other. | Unstable sessions and missing data. | Unique client ID per device or app instance. |
| Client chooses any client_id | Device impersonation. | Security breach. | Bind client_id to certificate or registry. |
| Shared password for all devices | One leak compromises the fleet. | Mass compromise. | Per-device credentials or mTLS. |
| No device registry | Cannot revoke or audit devices. | Slow incident response. | Registry with active, disabled, revoked states. |
| Payload identity trusted blindly | Payload claims another device. | Data corruption. | Compare topic, auth identity and payload. |
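
The corrections in this table converge on the rule of the secure identity model in this section: client_id, certificate identity, registry entry, topic asset and payload device_id must all agree. The sketch below assumes:

- the certificate CN equals the registered `device_id`;
- the registry is an in-memory dict (in practice a database or the broker's auth plugin).

`RegistryEntry` and `check_identity` are hypothetical names. A quarantined device is simply rejected here rather than routed to diagnostic topics.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class RegistryEntry:
    device_id: str
    tenant: str
    site: str
    status: str  # active, disabled, revoked or quarantined


def check_identity(client_id: str, cert_cn: str, topic: str,
                   payload_device_id: str,
                   registry: Dict[str, RegistryEntry]) -> Optional[str]:
    """Return None if all identities agree, otherwise a rejection reason.

    Topic layout assumed: prod/{tenant}/{site}/{area}/{asset}/{stream}/{signal}.
    """
    entry = registry.get(cert_cn)              # certificate identity -> registry
    if entry is None:
        return "unknown_device"
    if entry.status != "active":
        return f"device_{entry.status}"        # disabled, revoked, quarantined
    if client_id != cert_cn:                   # client_id bound to certificate
        return "client_id_mismatch"
    levels = topic.split("/")
    if len(levels) < 5 or levels[1] != entry.tenant or levels[2] != entry.site:
        return "topic_namespace_mismatch"
    if levels[4] != entry.device_id:           # topic asset == registry device
        return "topic_asset_mismatch"
    if payload_device_id != entry.device_id:   # payload claim == registry device
        return "payload_device_id_mismatch"
    return None
```

Logging the returned reason alongside `client_id` makes the shared client ID and impersonation failures described below visible instead of silent.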
Shared client ID failure
Device A connects with:
client_id = gateway
Device B connects with:
client_id = gateway
Broker behavior:
- Device B connection replaces Device A
- Device A reconnects
- Device A replaces Device B
- Both devices enter reconnect loop
Symptoms:
- unstable devices
- missing telemetry
- repeated disconnect logs
- Last Will false alarms
Secure identity model
MQTT CONNECT:
client_id = gw-123
Certificate:
CN = gw-123
fingerprint = abc123...
Registry:
device_id = gw-123
tenant = acme
site = paris
status = active
allowed_namespace = prod/acme/paris/line-02/gw-123/#
Payload:
device_id = gw-123
Validation:
client_id == certificate identity
certificate identity == registry device
topic asset == registry device
payload device_id == registry device
Device states
active:
device can connect and publish
disabled:
device is known but blocked by operator
revoked:
device credential is compromised or retired
unknown:
device identity is not registered
quarantined:
device can connect only to restricted diagnostic topics
Security and ACL anti-patterns
| Anti-pattern | Symptom | Impact | Correction |
|---|---|---|---|
| Anonymous access enabled | Unknown clients can connect. | Data injection or exfiltration. | Disable anonymous access. |
| Public plaintext 1883 | Traffic readable on network. | Credential and data exposure. | Use 8883 with TLS or mTLS. |
| No topic ACL | Any client can publish or subscribe anywhere. | Total tenant isolation failure. | Deny-by-default ACLs. |
| Device can subscribe to tenant wildcard | Device reads other devices. | Data leak. | Device reads only own command namespace. |
| Device can publish command topics | Device can command itself or others. | Unsafe operations. | Only command service publishes commands. |
| Admin UI public | Broker management exposed. | Platform takeover risk. | VPN, MFA, private access only. |
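
Deny-by-default ACLs need two operations:

- checking that an allowed pattern covers the whole requested topic or subscription filter;
- checking whether a deny rule overlaps it.

The sketch below assumes that `+` matches one level, that `#` matches the parent level and everything below (as in the MQTT specification), and that an explicit deny always wins. The `Acl` class and helper names are hypothetical; real brokers express ACLs in their own configuration formats. Under deny-wins semantics, the `deny subscribe prod/acme/#` line of the model in this section would also block the device's own `command/#` filter. The test profile therefore relies on deny-by-default for the broad subscription.

```python
from dataclasses import dataclass, field
from typing import List


def covers(pattern: str, requested: str) -> bool:
    """True if every topic matched by `requested` is also matched by `pattern`."""
    p, r = pattern.split("/"), requested.split("/")
    if r[0].startswith("$") and p[0] in ("+", "#"):
        return False  # leading wildcards must not match $-prefixed system topics
    for i, level in enumerate(p):
        if level == "#":
            return True
        if i >= len(r) or r[i] == "#":
            return False  # requested is shorter, or broader from this level on
        if level != "+" and level != r[i]:
            return False  # a literal level cannot cover '+' or another name
    return len(p) == len(r)


def overlaps(a: str, b: str) -> bool:
    """True if at least one topic could be matched by both filters."""
    x, y = a.split("/"), b.split("/")
    for i in range(max(len(x), len(y))):
        if (i < len(x) and x[i] == "#") or (i < len(y) and y[i] == "#"):
            return True
        if i >= len(x) or i >= len(y):
            return False
        if "+" not in (x[i], y[i]) and x[i] != y[i]:
            return False
    return True


@dataclass
class Acl:
    allow_publish: List[str] = field(default_factory=list)
    allow_subscribe: List[str] = field(default_factory=list)
    deny_publish: List[str] = field(default_factory=list)
    deny_subscribe: List[str] = field(default_factory=list)


def is_allowed(acl: Acl, action: str, requested: str) -> bool:
    """Deny by default; any overlapping deny rule wins over an allow."""
    if action == "publish":
        allows, denies = acl.allow_publish, acl.deny_publish
    elif action == "subscribe":
        allows, denies = acl.allow_subscribe, acl.deny_subscribe
    else:
        raise ValueError(f"unknown action {action!r}")
    if any(overlaps(d, requested) for d in denies):
        return False
    return any(covers(a, requested) for a in allows)
```

Subscription filters are checked as a whole. A device asking for `prod/acme/#` is refused even though part of that filter, its own command namespace, would be allowed.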
Bad ACL
Bad:
allow all users publish #
allow all users subscribe #
allow anonymous true
listener 1883 public
Correct ACL model
Device gw-123:
allow publish:
prod/acme/paris/line-02/gw-123/telemetry/#
prod/acme/paris/line-02/gw-123/status/#
prod/acme/paris/line-02/gw-123/event/#
allow subscribe:
prod/acme/paris/line-02/gw-123/command/#
deny publish:
prod/acme/paris/line-02/gw-123/command/#
deny subscribe:
prod/acme/#
Command service:
allow publish:
prod/acme/+/+/+/command/#
allow subscribe:
prod/acme/+/+/+/ack/#
prod/acme/+/+/+/result/#
Security smoke tests
[ ] Anonymous connect is rejected
[ ] Wrong certificate is rejected
[ ] Device cannot subscribe to prod/acme/#
[ ] Device cannot publish to another device namespace
[ ] Device cannot publish command topics
[ ] Dashboard cannot subscribe outside user scope
[ ] Revoked device cannot reconnect
[ ] ACL denied events are logged and alertable
Retained message and Last Will anti-patterns
| Anti-pattern | Symptom | Risk | Correction |
|---|---|---|---|
| Telemetry retained | Every new subscriber receives old measurement. | Confusing state and broker growth. | Do not retain high-frequency telemetry. |
| Commands retained | Device reconnects and receives old command. | Unsafe repeated action. | Do not retain commands; use expiration. |
| No retained status | Dashboard starts with unknown state. | Poor visibility. | Retain current availability/status. |
| No Last Will | Dead devices remain online. | False operational state. | Configure Last Will offline retained message. |
| Retained cleanup ignored | Old topics stay forever. | Stale state and memory growth. | Cleanup procedure and retained policy. |
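
For commands, this table recommends both never retaining them and using expiration. A misconfigured publisher can still retain a command, so the device can defend itself too. The device-side sketch below assumes commands carry a `command_id` and an ISO 8601 `expires_at`, as in the correct delivery model of this guide. `CommandGuard` is a hypothetical class.

```python
import json
from collections import deque
from datetime import datetime, timezone
from typing import Optional, Tuple


class CommandGuard:
    """Device-side checks run before any command is executed."""

    def __init__(self, remember: int = 1000):
        # Recently executed command_ids; the oldest entries fall off automatically.
        self.seen = deque(maxlen=remember)

    def accept(self, payload: bytes, retained: bool,
               now: Optional[datetime] = None) -> Tuple[bool, str]:
        now = now or datetime.now(timezone.utc)
        if retained:
            return False, "retained_command_ignored"  # replayed on (re)subscribe
        try:
            cmd = json.loads(payload)
            command_id = cmd["command_id"]
            expires_at = datetime.fromisoformat(cmd["expires_at"].replace("Z", "+00:00"))
        except (ValueError, KeyError, TypeError, AttributeError):
            return False, "malformed_command"
        if expires_at.tzinfo is None:
            return False, "malformed_command"  # naive timestamps are ambiguous
        if expires_at <= now:
            return False, "expired"
        if command_id in self.seen:
            return False, "duplicate"  # QoS redelivery or repeated publish
        self.seen.append(command_id)
        return True, "accepted"
```

With Paho, the `retained` argument would come from the received message's `retain` attribute. A broker sets that flag when it delivers a stored retained message because of a new subscription, which is exactly the replay this guard rejects. The remembered IDs are lost on reboot, so expiry remains the main protection.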
Bad retained command
Topic:
prod/acme/paris/line-02/gw-123/command/reboot
Payload:
{"command_id":"cmd-001","action":"reboot"}
retain = true
Problem:
A device reconnecting later may receive an old reboot command.
Correct retained usage
Retained status:
prod/acme/paris/line-02/gw-123/status/availability
Payload:
{
"schema_version": "1.0",
"device_id": "gw-123",
"state": "online",
"ts": "2026-04-25T10:00:00Z"
}
retain = true
Last Will:
same topic
payload = {"state":"offline","reason":"connection_lost"}
retain = true
Retained policy
Allowed retained:
- status/availability
- status/mode
- status/health
- current configuration state
- latest safe dashboard state
Forbidden retained:
- telemetry high frequency
- event history
- alarm history
- command topics
- result topics unless explicitly modeled
Scaling and architecture anti-patterns
| Anti-pattern | Symptom | Failure mode | Correction |
|---|---|---|---|
| Single broker for critical production | One VM outage stops all ingestion. | Total platform outage. | HA cluster, backup broker, edge autonomy. |
| One subscriber processes everything | Backlog grows, no isolation. | Alarms delayed by telemetry spike. | Shared subscriptions and separated flows. |
| No backend buffer | DB slowness blocks MQTT callback. | Message delays and drops. | Redis Streams, Kafka, RabbitMQ or staging table. |
| Bridge everything | Central broker receives useless traffic. | Cost and performance explosion. | Filter bridge topics and aggregate at edge. |
| No replay design | WAN outage loses data. | Missing history. | Edge buffer and sequence_number. |
| No load test | Production fails at first spike. | Unknown capacity. | Test clients, message rate, QoS and backend. |
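
The correct pipeline of this section keeps the MQTT callback minimal and moves validation, storage and notifications to workers behind a buffer. The single-process sketch below uses the standard library `queue.Queue`. It assumes that dropping (and counting) messages when the buffer is full is acceptable for the flow. `IngestBuffer` is a hypothetical class. A production buffer would more likely be Redis Streams, Kafka, RabbitMQ or a staging table, because an in-process queue does not survive a restart.

```python
import queue
import time
from typing import List, Tuple

Item = Tuple[str, bytes, float]  # (topic, raw payload, enqueue time)


class IngestBuffer:
    """Bounded hand-off between the MQTT network thread and worker threads."""

    def __init__(self, max_size: int = 10_000, max_payload_bytes: int = 65_536):
        self.q: "queue.Queue[Item]" = queue.Queue(maxsize=max_size)
        self.max_payload_bytes = max_payload_bytes
        self.dropped = 0  # updated only from the network thread; export as a metric

    def on_message(self, topic: str, payload: bytes) -> None:
        """Callback side: size check and enqueue only. No parsing, no I/O."""
        if len(payload) > self.max_payload_bytes:
            self.dropped += 1
            return
        try:
            self.q.put_nowait((topic, payload, time.monotonic()))
        except queue.Full:
            self.dropped += 1  # never block the network loop

    def drain_batch(self, max_items: int = 500, timeout: float = 1.0) -> List[Item]:
        """Worker side: wait for one item, then take whatever is already queued."""
        batch: List[Item] = []
        try:
            batch.append(self.q.get(timeout=timeout))
            while len(batch) < max_items:
                batch.append(self.q.get_nowait())
        except queue.Empty:
            pass
        return batch
```

A worker thread would loop on `drain_batch()`, then validate, deduplicate and write each batch in one database transaction. Batching is what lets storage keep up with bursts.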
Bad pipeline
MQTT callback
│
├── parse JSON
├── call external API
├── write multiple DB tables
├── send email
└── push WebSocket
Problem:
callback becomes slow
subscriber cannot keep up
broker queues grow
messages become late
Correct pipeline
MQTT callback
│
├── parse minimal envelope
├── check size
└── enqueue quickly
│
▼
Buffer
├── Redis Stream
├── Kafka
├── RabbitMQ
└── staging table
│
▼
Workers
├── validate
├── deduplicate
├── enrich
├── store
├── alert
└── notify dashboard
Scaling correction map
Too many messages:
- reduce frequency
- publish on change
- aggregate at edge
- filter bridge topics
Too much backend lag:
- shared subscriptions
- more workers
- batch writes
- separate critical queues
Too many clients:
- broker cluster
- tune file descriptors
- load balancer
- reconnect backoff
Too many duplicates:
- message_id
- sequence_number
- inbox table
- idempotent writes
Operations and monitoring anti-patterns
| Anti-pattern | Symptom | Risk | Correction |
|---|---|---|---|
| Only checking broker uptime | Broker is up but data is stale. | False confidence. | Monitor end-to-end freshness. |
| No dropped message alert | Data loss discovered late. | Missing audit and history. | Alert on dropped/rejected messages. |
| No queue depth metric | Lag grows silently. | Delayed dashboards and alarms. | Monitor oldest pending age. |
| No auth failure monitoring | Attack or provisioning issue hidden. | Security incident. | Alert on auth and ACL spikes. |
| No certificate expiry alert | Mass disconnect on expiry. | Full fleet outage. | Track expiry and rotate early. |
| No retained cleanup runbook | Stale state remains forever. | Wrong dashboards. | Document cleanup command and policy. |
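
Two corrections in this table, end-to-end freshness and oldest pending age, are simple to compute once the backend records when each buffered record was enqueued and when each stream last produced valid data. The sketch below assumes the caller collects those timestamps, for example from a Redis Stream or a staging table. `ingestion_health` and `alerts` are hypothetical helpers, and the thresholds are placeholders.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, List


@dataclass
class IngestionHealth:
    queue_depth: int
    oldest_pending_age_s: float
    stale_streams: List[str]


def ingestion_health(pending_enqueued_at: List[datetime],
                     last_seen_by_stream: Dict[str, datetime],
                     now: datetime,
                     max_silence: timedelta) -> IngestionHealth:
    """Compute the signals that broker uptime alone cannot show."""
    oldest = min(pending_enqueued_at, default=None)
    oldest_age = (now - oldest).total_seconds() if oldest is not None else 0.0
    stale = sorted(s for s, seen in last_seen_by_stream.items() if now - seen > max_silence)
    return IngestionHealth(len(pending_enqueued_at), oldest_age, stale)


def alerts(health: IngestionHealth, max_pending_age_s: float) -> List[str]:
    """Turn health numbers into human-readable alert messages (empty list = healthy)."""
    out: List[str] = []
    if health.oldest_pending_age_s > max_pending_age_s:
        out.append(f"backend lag: oldest pending message is {health.oldest_pending_age_s:.0f}s old")
    out.extend(f"stale data: {stream}" for stream in health.stale_streams)
    return out
```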
Missing metrics checklist
Danger if you do not monitor:
[ ] connected clients
[ ] messages in/sec
[ ] messages out/sec
[ ] dropped messages
[ ] rejected messages
[ ] auth failures
[ ] ACL denied
[ ] session queue depth
[ ] retained count
[ ] backend queue depth
[ ] oldest pending age
[ ] processing latency
[ ] duplicate count
[ ] rejected payload count
[ ] data freshness
[ ] command ack latency
Correct production dashboard
MQTT production dashboard
├── Broker health
│ ├── uptime
│ ├── CPU / RAM
│ ├── connected clients
│ └── messages in/out
│
├── Reliability
│ ├── dropped messages
│ ├── rejected messages
│ ├── queued messages
│ └── inflight messages
│
├── Security
│ ├── auth failures
│ ├── ACL denied
│ ├── invalid certificates
│ └── suspicious wildcards
│
├── Backend ingestion
│ ├── queue depth
│ ├── oldest pending age
│ ├── processing latency
│ └── DB write latency
│
└── Business freshness
├── latest telemetry age
├── alarm delay
├── command ack time
└── offline devices
Incident triage
Data is stale
│
├── Are devices connected?
│ ├── no -> network/TLS/auth
│ └── yes
│
├── Are messages entering broker?
│ ├── no -> publisher issue
│ └── yes
│
├── Are messages leaving broker?
│ ├── no -> subscription/ACL/session issue
│ └── yes
│
├── Is backend queue growing?
│ ├── yes -> worker or DB bottleneck
│ └── no
│
└── Is dashboard stale?
├── yes -> cache/API/WebSocket issue
└── no -> incident scoped or resolved
Production anti-pattern checklist
Red flags before production
[ ] Topics are not documented
[ ] Topic namespace has no tenant or site
[ ] Telemetry, status, event and command are mixed
[ ] Devices share the same client_id
[ ] Devices share the same password
[ ] Anonymous access is enabled
[ ] Port 1883 is public
[ ] ACLs allow subscribe #
[ ] Payload has no schema_version
[ ] Payload has no message_id for QoS 1
[ ] Backend has no idempotency
[ ] Commands have no command_id
[ ] Commands have no ack/result
[ ] Commands are retained
[ ] Telemetry is retained
[ ] Last Will is not configured
[ ] Broker is a single point of failure
[ ] MQTT callback writes directly to slow systems
[ ] No backend queue or buffer
[ ] No dropped message alert
[ ] No backend lag metric
[ ] No certificate expiry alert
[ ] No incident runbook
Corrective architecture checklist
[ ] Stable topic contract
[ ] Separate telemetry/status/event/command
[ ] Unique client_id per device
[ ] Client identity tied to certificate or registry
[ ] Deny-by-default ACLs
[ ] TLS or mTLS enabled
[ ] Payload schema_version
[ ] message_id or sequence_number
[ ] Backend idempotency
[ ] Command journal
[ ] Command ack and result topics
[ ] Retained only for current state
[ ] Last Will offline status
[ ] Shared subscriptions for workers
[ ] Redis/Kafka/RabbitMQ buffer
[ ] Time-series storage for history
[ ] Broker metrics dashboard
[ ] Backend lag dashboard
[ ] Security alerts
[ ] Load test
[ ] Reconnect storm test
[ ] Negative ACL tests
Final production rule
MQTT production failures are usually caused by missing conventions, missing identity binding, missing idempotency, missing ACLs, missing monitoring or missing operational runbooks. Fix the architecture before increasing traffic.
Minimal safe baseline
Minimum safe MQTT baseline:
- TLS on 8883
- no anonymous access
- unique client_id
- deny-by-default ACLs
- documented topic namespace
- schema_version in payload
- message_id for important flows
- Last Will for devices
- retained only for status
- idempotent backend
- broker and backend monitoring
- command ack/result for actions
mosquitto_pub / sub
# Subscribe
mosquitto_sub -h mqtt.example.com -p 8883 --cafile ca.crt \
-u backend -P 'secret' -t 'acme/+/+/+/telemetry/#' -q 1 -v
# Publish
mosquitto_pub -h mqtt.example.com -p 8883 --cafile ca.crt \
-u gw_123 -P 'secret' \
-t 'acme/paris/line-2/gw-123/telemetry/temperature' \
-m '{"value":21.7,"unit":"C","schema_version":1}' -q 1
# Retained status
mosquitto_pub -h mqtt.example.com -p 8883 --cafile ca.crt \
-u gw_123 -P 'secret' \
-t 'acme/paris/line-2/gw-123/status/availability' \
-m '{"state":"online"}' -r -q 1
Quick decisions
Use MQTT when:
- devices are many, remote, intermittent
- pub/sub decoupling is needed
- payloads are small/medium
- near-real-time state matters
Do not use MQTT alone for:
- durable analytics replay
- huge files
- complex transactions
- database-like querying
Pair with:
- Redis for hot state
- PostgreSQL/TimescaleDB for history
- Kafka for event replay
- WebSocket for browser UI
Project goal
The goal is to build a complete MQTT platform: broker, topic governance, secure device identity, Python/Django ingestion, Redis or queue buffering, validated payloads, idempotent storage, real-time dashboard, command workflow, monitoring, alerting, high availability and production runbooks.
The project should not start with a huge distributed architecture. The clean path is progressive: first make a local MVP reliable, then add security, then add ingestion durability, then dashboarding, then commands, then HA and cloud scaling.
| Stage | Goal | Main deliverable | Exit criteria |
|---|---|---|---|
| Phase 0 | Define architecture and conventions. | Topic contract, payload contract, broker choice. | Design reviewed and documented. |
| MVP | Publish, subscribe, store, view. | Mosquitto + Python subscriber + Django Admin. | End-to-end telemetry works. |
| Security | Lock down clients and topics. | TLS, users, ACLs, device registry. | Negative security tests pass. |
| Ingestion | Make processing reliable. | Redis/Celery buffer, validation, idempotency. | No duplicate writes under retry. |
| Dashboard | Expose live operational state. | Device status, telemetry, errors, lag. | Operators can diagnose issues. |
| HA | Prepare production scale. | Cluster or managed broker, monitoring, runbooks. | Failover and load tests pass. |
Target architecture
Devices / Simulators / Gateways
│
│ MQTT/TLS
▼
MQTT Broker
│
├── auth
├── ACL
├── retained status
├── Last Will
└── topic routing
│
▼
Python MQTT Ingestion Service
│
├── fast callback
├── topic parser
├── payload envelope validation
└── enqueue record
│
▼
Redis Stream / Celery / Kafka
│
▼
Processing Workers
│
├── strict validation
├── identity check
├── idempotency
├── storage
├── alerts
└── dashboard updates
│
▼
Django Platform
├── Admin
├── API
├── dashboard
├── command journal
├── rejected messages
└── monitoring views
Phase 0: architecture, contracts and technical baseline
Phase 0 prevents chaos later. Before writing ingestion code, define the broker, topic namespace, payload fields, QoS policy, retained policy, security model, storage targets and operational metrics.
| Decision | Recommended baseline | Reason |
|---|---|---|
| Broker for MVP | Mosquitto | Simple, fast, easy to test locally. |
| Topic format | prod/{tenant}/{site}/{area}/{asset}/{stream}/{signal} | Clean routing and ACLs. |
| Payload format | JSON with schema_version and message_id. | Validation and evolution. |
| QoS baseline | QoS 1 for important flows, QoS 0 for frequent non-critical telemetry. | Good reliability/cost balance. |
| Retained policy | Status only. | Retained is latest state, not history. |
| Backend stack | Django + Redis + Celery or Redis Streams. | Simple reliable processing path. |
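
The topic format row can be enforced by a small parser. It either returns the seven named levels or raises with an `invalid_topic` reason. This is a minimal sketch with three assumptions to align with `topic_namespace.md`: the allowed environments, the allowed streams (taken from the telemetry/status/event/command/ack/result split used in this guide) and the lowercase-and-hyphen naming rule.

```python
import re
from dataclasses import dataclass

ALLOWED_ENVS = {"prod"}  # assumption: add "staging" or "dev" if those namespaces exist
ALLOWED_STREAMS = {"telemetry", "status", "event", "command", "ack", "result"}
SEGMENT = re.compile(r"[a-z0-9][a-z0-9-]*")  # assumed naming rule: lowercase, digits, hyphens


class TopicError(ValueError):
    """Raised for topics that break the contract; the message is the rejection reason."""


@dataclass(frozen=True)
class ParsedTopic:
    env: str
    tenant: str
    site: str
    area: str
    asset: str
    stream: str
    signal: str


def parse_topic(topic: str) -> ParsedTopic:
    parts = topic.split("/")
    if len(parts) != 7:
        raise TopicError(f"invalid_topic: expected 7 levels, got {len(parts)}")
    for part in parts:
        if not SEGMENT.fullmatch(part):  # also rejects empty levels and '+'/'#' wildcards
            raise TopicError(f"invalid_topic: bad level {part!r}")
    parsed = ParsedTopic(*parts)
    if parsed.env not in ALLOWED_ENVS:
        raise TopicError(f"invalid_topic: unknown env {parsed.env!r}")
    if parsed.stream not in ALLOWED_STREAMS:
        raise TopicError(f"invalid_topic: unknown stream {parsed.stream!r}")
    return parsed
```

A strict parser like this rejects any topic that skips a level, for example a gateway addressed directly under its site. Such topics need either a placeholder area level or an explicit variant in the contract.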
Contracts to write before code
Documents:
- topic_namespace.md
- payload_contract.md
- qos_policy.md
- retained_policy.md
- acl_policy.md
- device_identity_model.md
- command_lifecycle.md
- monitoring_metrics.md
- incident_runbooks.md
Phase 0 deliverables
Deliverables:
[ ] Broker selected for MVP
[ ] Topic namespace documented
[ ] Payload schema v1 documented
[ ] Allowed streams documented
[ ] Allowed metrics documented
[ ] QoS policy documented
[ ] Retained policy documented
[ ] Device identity model documented
[ ] Initial Django app structure defined
[ ] Storage model drafted
[ ] Monitoring metrics listed
[ ] MVP test scenarios listed
Initial topic contract
Telemetry:
prod/{tenant}/{site}/{area}/{asset}/telemetry/{metric}
Status:
prod/{tenant}/{site}/{area}/{asset}/status/{state_name}
Event:
prod/{tenant}/{site}/{area}/{asset}/event/{event_type}
Command:
prod/{tenant}/{site}/{area}/{asset}/command/{command_name}
Ack:
prod/{tenant}/{site}/{area}/{asset}/ack/{command_name}
Result:
prod/{tenant}/{site}/{area}/{asset}/result/{command_name}
MVP: first working platform
The MVP must prove the complete path: a simulator publishes telemetry and status; the broker receives it; the Python subscriber consumes it; Django stores it; the Admin displays devices, latest state, telemetry points and rejected messages.
MVP components
MVP stack:
- Mosquitto broker
- Python Paho publisher simulator
- Python Paho backend subscriber
- Django app mqtt_app
- Device model
- TelemetryPoint model
- DeviceState model
- MQTTInboxMessage model
- MQTTRejectedMessage model
- Django Admin pages
- Basic smoke commands
| Feature | MVP scope | Not yet |
|---|---|---|
| Broker | Single Mosquitto instance. | Cluster or cloud managed broker. |
| Security | Username/password and local ACL. | Full mTLS and certificate lifecycle. |
| Ingestion | Subscriber writes via service layer. | Large distributed pipeline. |
| Dashboard | Django Admin and simple status view. | Full WebSocket dashboard. |
| Testing | Smoke tests and retry tests. | Massive load testing. |
MVP flow
Simulator
│
├── publishes telemetry/temperature
├── publishes status/availability retained
└── uses QoS 1
│
▼
Mosquitto
│
├── routes to backend subscriber
└── keeps retained status
│
▼
Django MQTT subscriber
│
├── parses topic
├── validates payload
├── builds idempotency key
├── writes telemetry
├── updates latest state
└── stores rejects
│
▼
Django Admin
├── devices
├── telemetry points
├── latest state
├── inbox messages
└── rejected messages
MVP acceptance tests
[ ] Publish one telemetry point
[ ] Telemetry appears in Django Admin
[ ] Publish retained online status
[ ] New subscriber receives retained status
[ ] Invalid JSON is rejected visibly
[ ] Invalid topic is rejected visibly
[ ] Duplicate message_id is not inserted twice
[ ] Device offline status works with Last Will
[ ] Subscriber restart does not break ingestion
[ ] Broker restart recovery is documented
Security milestone: TLS, ACLs, identity and negative tests
After the MVP works, lock it down. Security must be tested with denied actions: anonymous access, wrong topic, wrong user, wrong device namespace, command publishing by a device and revoked device connection.
| Security feature | Implementation | Validation |
|---|---|---|
| TLS | Broker listener on 8883. | Plain 1883 disabled or private only. |
| Authentication | Per-device user or certificate identity. | Unknown client rejected. |
| ACL | Deny-by-default topic rules. | Forbidden publish/subscribe fails. |
| Device registry | Django model with active/disabled/revoked. | Revoked device cannot connect or process data. |
| Command isolation | Only command service publishes command topics. | Device command publish rejected. |
| Audit | Auth failures, ACL denied, rejected messages. | Admin shows reason and source. |
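
Deny-by-default ACLs reduce to one question: does an allowed topic filter cover the topic being published, or the filter being subscribed to? The sketch below encodes the MQTT wildcard rules. `+` matches exactly one level, `#` matches the parent level and everything below it, and wildcards are not allowed in PUBLISH topic names. Subscriptions are checked by filter inclusion. This is a minimal sketch, not a broker plugin. `covers` and `DenyByDefaultAcl` are hypothetical names, and the rules reproduce the `gw-123` baseline listed below.

```python
from typing import Iterable, List, Tuple


def covers(allowed: str, requested: str) -> bool:
    """True if every topic matched by `requested` is also matched by the `allowed` filter."""
    a_parts, r_parts = allowed.split("/"), requested.split("/")
    for i, a in enumerate(a_parts):
        if a == "#":
            return True  # multi-level wildcard: this level, everything below, and the parent
        if i >= len(r_parts) or r_parts[i] == "#":
            return False  # requested is shorter, or wider than one allowed level
        if a != "+" and a != r_parts[i]:
            return False  # a literal level must match exactly; '+' in requested is wider
    return len(a_parts) == len(r_parts)


class DenyByDefaultAcl:
    def __init__(self, rules: Iterable[Tuple[str, str]]):
        self.rules: List[Tuple[str, str]] = list(rules)  # (action, allowed filter) pairs

    def is_allowed(self, action: str, topic_or_filter: str) -> bool:
        if action == "publish" and ("+" in topic_or_filter or "#" in topic_or_filter):
            return False  # MQTT forbids wildcards in PUBLISH topic names
        # No matching rule means deny.
        return any(act == action and covers(f, topic_or_filter) for act, f in self.rules)


PREFIX = "prod/acme/paris/line-02/gw-123"
gw_123_acl = DenyByDefaultAcl([
    ("publish", f"{PREFIX}/telemetry/#"),
    ("publish", f"{PREFIX}/status/#"),
    ("publish", f"{PREFIX}/event/#"),
    ("subscribe", f"{PREFIX}/command/#"),
])
```

The broker enforces the real ACL. A function like this is more useful in the test suite, where it can generate the expected allow or deny result for each negative test case.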
Device ACL baseline
Device gw-123 can publish:
prod/acme/paris/line-02/gw-123/telemetry/#
prod/acme/paris/line-02/gw-123/status/#
prod/acme/paris/line-02/gw-123/event/#
Device gw-123 can subscribe:
prod/acme/paris/line-02/gw-123/command/#
Device gw-123 cannot:
- subscribe prod/acme/#
- publish another device namespace
- publish command topics
- access admin topics
Security architecture
MQTT CONNECT
│
▼
TLS handshake
│
▼
Authenticate client
│
▼
Resolve identity in registry
│
├── unknown -> reject
├── disabled -> reject
├── revoked -> reject
└── active -> load ACL
│
▼
Enforce publish/subscribe rules
│
▼
Log denied operations
Negative test plan
[ ] Anonymous connect is rejected
[ ] Wrong password is rejected
[ ] Wrong certificate is rejected
[ ] Device cannot subscribe to tenant wildcard
[ ] Device cannot publish to another device
[ ] Device cannot publish command topic
[ ] Backend worker cannot publish commands unless authorized
[ ] Revoked device is rejected
[ ] Invalid payload is rejected
[ ] Oversized payload is rejected
[ ] All denials are logged
Ingestion milestone: buffering, validation, idempotency and storage
The ingestion milestone turns the MVP into a robust data pipeline. The MQTT callback should enqueue quickly. Workers should validate, deduplicate, store and expose failures. This avoids blocking the subscriber when the database or downstream services are slow.
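A fast callback only copies the topic and payload bytes, checks the size and enqueues. It does no parsing, database writes or HTTP calls. The sketch below is shaped like Paho's `on_message(client, userdata, message)` handler, where `message` exposes `topic`, `payload` and `qos`. Several parts are assumptions: the 64 KiB limit, the `RawRecord` and `FastIngestCallback` names, and the use of an in-memory `queue.Queue` as a stand-in for a Redis Stream or Kafka topic. An in-memory queue loses its contents if the process crashes, which is why a durable buffer is recommended above.

```python
import queue
import time
from dataclasses import dataclass

MAX_PAYLOAD_BYTES = 64 * 1024  # assumption: keep aligned with the broker's maximum packet size


@dataclass(frozen=True)
class RawRecord:
    topic: str
    payload: bytes
    qos: int
    received_at: float


class FastIngestCallback:
    """Callable usable as an on_message(client, userdata, message) handler."""

    def __init__(self, buffer: "queue.Queue[RawRecord]"):
        self.buffer = buffer
        self.oversized = 0    # expose as a metric: payloads rejected before any parsing
        self.buffer_full = 0  # expose as a metric and alert: records that could not be enqueued

    def __call__(self, client, userdata, message) -> None:
        payload = bytes(message.payload)
        if len(payload) > MAX_PAYLOAD_BYTES:
            self.oversized += 1
            return
        record = RawRecord(message.topic, payload, message.qos, time.time())
        try:
            self.buffer.put_nowait(record)  # never block the network loop on a slow consumer
        except queue.Full:
            self.buffer_full += 1
```

With Paho, this would be wired as `client.on_message = FastIngestCallback(buf)`. Workers then drain `buf` and perform validation, deduplication and storage.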
Recommended modules
mqtt_app/
  services/
    mqtt_client.py
    topic_parser.py
    payload_validator.py
    idempotency.py
    ingestion_buffer.py
    storage_writer.py
    metrics.py
  management/
    commands/
      mqtt_ingest.py
      mqtt_consume_buffer.py
      mqtt_publish_test.py
      mqtt_replay_failed.py
  models.py
  admin.py
  tasks.py
| Component | Responsibility | Failure visible in Admin? |
|---|---|---|
| Topic parser | Extract env, tenant, site, area, asset, stream, signal. | Yes, invalid_topic. |
| Payload validator | Schema, type, unit, timestamp, identity. | Yes, validation error. |
| Idempotency service | Prevent duplicate business writes. | Yes, duplicate_count. |
| Buffer | Decouple MQTT from backend speed. | Lag metrics. |
| Storage writer | Write state, telemetry, events, rejects. | Yes, failed writes. |
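
The payload validator row can be sketched as a pure function. Instead of raising on the first problem, it returns a list of rejection codes, so the Admin can show every reason at once. Several values are placeholders: the supported schema versions, the `ALLOWED_UNITS` metric registry, the 24-hour age limit and the 5-minute clock-skew tolerance. `identity_field` names whichever payload field (for example `meter_id`) must match the asset parsed from the topic.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple

SUPPORTED_SCHEMA_VERSIONS = {"1.0"}                                          # assumption
ALLOWED_UNITS = {"temperature": {"C"}, "heat-kwh": {"kWh"}, "co2": {"ppm"}}  # hypothetical registry
MAX_AGE = timedelta(hours=24)            # assumption: older readings are rejected (or quarantined)
MAX_FUTURE_SKEW = timedelta(minutes=5)   # assumption: tolerated device clock drift


def validate_telemetry(raw: bytes, metric: str, expected_asset: str, identity_field: str,
                       now: datetime) -> Tuple[Optional[dict], List[str]]:
    """Return (payload, []) when valid, or (None, [reason codes]) for the rejected table."""
    try:
        payload = json.loads(raw)
    except ValueError:  # covers JSONDecodeError and undecodable bytes
        return None, ["invalid_json"]
    if not isinstance(payload, dict):
        return None, ["invalid_json"]
    errors: List[str] = []
    if str(payload.get("schema_version")) not in SUPPORTED_SCHEMA_VERSIONS:
        errors.append("unsupported_schema_version")
    if not payload.get("message_id"):
        errors.append("missing_message_id")
    value = payload.get("value")
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        errors.append("invalid_value_type")
    if payload.get("unit") not in ALLOWED_UNITS.get(metric, set()):
        errors.append("invalid_unit")
    if payload.get(identity_field) != expected_asset:
        errors.append("identity_mismatch")  # payload claims a different device than the topic
    try:
        ts = datetime.fromisoformat(str(payload.get("event_ts")).replace("Z", "+00:00"))
    except ValueError:
        errors.append("invalid_timestamp")
    else:
        if ts.tzinfo is None:
            errors.append("naive_timestamp")
        elif now - ts > MAX_AGE:
            errors.append("timestamp_too_old")
        elif ts - now > MAX_FUTURE_SKEW:
            errors.append("timestamp_in_future")
    return (None if errors else payload), errors
```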
Ingestion pipeline
on_message callback
│
├── read topic
├── read payload bytes
├── check max size
└── enqueue raw record
│
▼
Buffer consumer
│
├── parse topic
├── parse JSON
├── validate schema
├── check identity
├── build idempotency key
├── write inbox record
├── write business data
├── update latest state
└── ack buffer message
Storage targets
Current state:
Redis + DeviceState model
Telemetry:
TelemetryPoint table or TimescaleDB
Events:
MQTTEvent table
Rejected messages:
MQTTRejectedMessage table
Duplicates:
MQTTInboxMessage duplicate_count
Commands:
MQTTCommandJournal table
Acceptance tests
[ ] Callback stays fast under DB latency
[ ] Queue depth is visible
[ ] Duplicate message_id is ignored
[ ] Invalid topic is rejected
[ ] Invalid payload is rejected
[ ] Identity mismatch is rejected
[ ] Old timestamp is quarantined or rejected
[ ] Worker crash does not lose message
[ ] Replay failed message works
[ ] Admin explains rejection reason clearly
Dashboard milestone: visibility for devices, data, errors and lag
A platform is not production-ready if failures are invisible. The dashboard should show device state, latest telemetry, ingestion errors, duplicates, backend lag, broker metrics, security denials and command status.
| Dashboard page | Content | Audience |
|---|---|---|
| Fleet overview | Total devices, online, stale, offline, disabled. | Operations. |
| Device detail | Latest state, telemetry, events, errors, commands. | Support / Engineering. |
| Ingestion health | Queue depth, oldest pending age, processing latency. | Backend / SRE. |
| Rejected messages | Error category, topic, payload hash, count. | Firmware / Backend. |
| Security | Auth failures, ACL denied, revoked attempts. | Security / Platform. |
| Commands | Created, sent, acked, done, failed, timeout. | Operations. |
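
The fleet overview counts need one explicit rule for what online, stale and offline mean. This minimal sketch assumes each device exposes three values: its registry state, its last retained availability value (set to offline by the Last Will) and a `last_seen_at` timestamp. The 5-minute and 30-minute thresholds are placeholders. Revoked devices are counted together with disabled ones.

```python
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, Optional

STALE_AFTER = timedelta(minutes=5)     # assumption
OFFLINE_AFTER = timedelta(minutes=30)  # assumption


@dataclass
class DeviceSnapshot:
    device_id: str
    registry_state: str              # "active", "disabled" or "revoked"
    availability: Optional[str]      # last retained status/availability, e.g. "online"/"offline"
    last_seen_at: Optional[datetime]


def classify(d: DeviceSnapshot, now: datetime) -> str:
    if d.registry_state != "active":
        return "disabled"
    if d.availability == "offline" or d.last_seen_at is None:
        return "offline"  # Last Will fired, or the device was never seen
    age = now - d.last_seen_at
    if age <= STALE_AFTER:
        return "online"
    return "stale" if age <= OFFLINE_AFTER else "offline"


def fleet_summary(devices: Iterable[DeviceSnapshot], now: datetime) -> Dict[str, int]:
    counts = Counter(classify(d, now) for d in devices)
    summary = {k: counts.get(k, 0) for k in ("online", "stale", "offline", "disabled")}
    summary["total"] = sum(counts.values())
    return summary
```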
Dashboard metrics
Fleet:
- total devices
- online devices
- stale devices
- offline devices
- last_seen_at
- firmware versions
Ingestion:
- messages received per minute
- rejected messages
- duplicate messages
- queue depth
- oldest pending age
- processing latency
Broker:
- connected clients
- messages in/out
- dropped messages
- retained count
- auth failures
- ACL denied
Dashboard architecture
MQTT ingestion workers
│
├── update database
├── update Redis hot state
└── emit internal event
│
▼
Django dashboard API
│
├── fleet summary
├── device detail
├── error list
├── lag metrics
└── command status
│
▼
Frontend
├── polling for MVP
└── WebSocket for real-time mode
Admin-first rollout
Step 1:
Django Admin pages for all models
Step 2:
Read-only dashboard page
Step 3:
Filtering by tenant, site, stream and device
Step 4:
Operational widgets and counters
Step 5:
WebSocket live updates
Step 6:
Operator actions for commands and replay
Command milestone: safe command lifecycle
Commands are more sensitive than telemetry. The platform must not simply publish a command and hope it worked. A command needs a journal, a command ID, validation, expiration, ack, result, timeout handling and audit.
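The rule "journal first, publish second" can be made concrete in a small command service. This is a minimal sketch with several assumptions. A dict stands in for the command journal table. The publish function is injected rather than tied to a specific client library. RBAC and target checks are assumed to have run before `send`. The 60-second default TTL is a placeholder. `CommandService` and `JournalEntry` are hypothetical names.

```python
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Callable, Dict, List

# publish(topic, payload, qos) -> bool; injected so the sketch does not depend on a client library
PublishFn = Callable[[str, bytes, int], bool]


@dataclass
class JournalEntry:
    command_id: str
    topic: str
    payload: dict
    status: str = "CREATED"
    history: List[str] = field(default_factory=lambda: ["CREATED"])


class CommandService:
    def __init__(self, publish: PublishFn, ttl: timedelta = timedelta(seconds=60)):
        self.publish = publish
        self.ttl = ttl
        self.journal: Dict[str, JournalEntry] = {}  # stands in for a command journal table

    def send(self, device_prefix: str, action: str, requested_by: str, now: datetime) -> JournalEntry:
        command_id = f"cmd-{uuid.uuid4().hex[:12]}"
        expires_at = (now + self.ttl).astimezone(timezone.utc)  # `now` must be timezone-aware
        payload = {
            "schema_version": "1.0",
            "command_id": command_id,
            "action": action,
            "requested_by": requested_by,
            "expires_at": expires_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
        }
        entry = JournalEntry(command_id, f"{device_prefix}/command/{action}", payload)
        self.journal[command_id] = entry  # 1. journal first, so every published command has a record
        ok = self.publish(entry.topic, json.dumps(payload).encode(), 1)  # 2. QoS 1, never retained
        entry.status = "PUBLISHED" if ok else "FAILED"
        entry.history.append(entry.status)
        return entry
```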
Command lifecycle
CREATED
│
├── RBAC validated
├── target device validated
├── command payload validated
▼
PUBLISHED
│
├── MQTT publish succeeded
▼
ACKED
│
├── device accepted command
▼
RUNNING
│
├── device executing
▼
DONE
Error states:
- rejected
- expired
- timeout
- failed
- cancelled
| Command field | Purpose | Required? |
|---|---|---|
| command_id | Idempotency and audit. | Yes. |
| target_device | Device scope. | Yes. |
| requested_by | Trace operator or system. | Yes. |
| expires_at | Prevent late execution. | Yes. |
| ack_at | Device received command. | Yes for critical commands. |
| result_at | Device completed command. | Yes for critical commands. |
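
The lifecycle above can be enforced as an explicit transition table, so a late ack cannot revive a command that already timed out. This is a sketch under assumptions. The guide lists the error states but not where each one can occur, so the reachable error states per step are a design choice here, as is allowing `ACKED -> DONE` when a device skips the running report. The two timeouts are passed in by the caller.

```python
from datetime import datetime, timedelta
from typing import Dict, FrozenSet

TERMINAL: FrozenSet[str] = frozenset({"DONE", "REJECTED", "EXPIRED", "TIMEOUT", "FAILED", "CANCELLED"})

# Allowed transitions; which error states are reachable from where is an assumption.
TRANSITIONS: Dict[str, FrozenSet[str]] = {
    "CREATED": frozenset({"PUBLISHED", "REJECTED", "CANCELLED"}),
    "PUBLISHED": frozenset({"ACKED", "REJECTED", "EXPIRED", "TIMEOUT", "CANCELLED"}),
    "ACKED": frozenset({"RUNNING", "DONE", "FAILED", "TIMEOUT"}),
    "RUNNING": frozenset({"DONE", "FAILED", "TIMEOUT"}),
}


class InvalidTransition(Exception):
    """Raised for impossible moves, e.g. a late ack after the command timed out."""


def transition(current: str, new: str) -> str:
    if new not in TRANSITIONS.get(current, frozenset()):
        raise InvalidTransition(f"{current} -> {new}")
    return new


def check_timeout(state: str, published_at: datetime, now: datetime,
                  ack_timeout: timedelta, result_timeout: timedelta) -> str:
    """Move a pending command to TIMEOUT once its ack or result deadline has passed."""
    if state in TERMINAL or state == "CREATED":
        return state
    deadline = ack_timeout if state == "PUBLISHED" else result_timeout
    return transition(state, "TIMEOUT") if now - published_at > deadline else state
```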
Command topics
Command:
prod/acme/paris/line-02/gw-123/command/reboot
Ack:
prod/acme/paris/line-02/gw-123/ack/reboot
Result:
prod/acme/paris/line-02/gw-123/result/reboot
Command payloads
Command:
{
"schema_version": "1.0",
"command_id": "cmd-20260425-001",
"action": "reboot",
"requested_by": "operator-7",
"expires_at": "2026-04-25T10:01:00Z"
}
Ack:
{
"schema_version": "1.0",
"command_id": "cmd-20260425-001",
"status": "accepted",
"ts": "2026-04-25T10:00:05Z"
}
Result:
{
"schema_version": "1.0",
"command_id": "cmd-20260425-001",
"status": "done",
"ts": "2026-04-25T10:00:28Z"
}
Command acceptance tests
[ ] Command is written to journal before publish
[ ] command_id is unique
[ ] Device rejects expired command
[ ] Device publishes ack
[ ] Device publishes result
[ ] Timeout is detected
[ ] Duplicate command_id is not executed twice
[ ] Unauthorized user cannot create command
[ ] Device cannot publish command topic
[ ] Command failure is visible in Admin
Scaling and HA milestone
HA and scaling should come after the core pipeline is correct. Scaling a bad design only makes failures faster and harder to diagnose. The HA milestone introduces broker resilience, shared subscriptions, backend workers, monitoring and failover tests.
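Client-side reconnect backoff is a prerequisite for the broker restart and reconnect storm tests below. It spreads reconnects randomly, so a large fleet does not hit a restarted broker at the same instant. The "full jitter" variant shown here is one common choice and is an assumption, not a Paho feature. The `base` and `cap` values are placeholders.

```python
import random
from typing import Iterator, Optional


def reconnect_delays(base: float = 1.0, cap: float = 60.0,
                     rng: Optional[random.Random] = None) -> Iterator[float]:
    """Full-jitter exponential backoff: attempt n waits uniform(0, min(cap, base * 2**n)) seconds."""
    rng = rng or random.Random()
    attempt = 0
    while True:
        yield rng.uniform(0.0, min(cap, base * 2 ** attempt))
        attempt = min(attempt + 1, 30)  # bound the exponent so the float math cannot overflow
```

A device loop would sleep for `next(delays)` seconds between failed connection attempts and create a fresh generator after a successful connect.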
| Scale level | Architecture | When to use |
|---|---|---|
| Small | Single Mosquitto + Django subscriber. | Demo, lab, low-volume MVP. |
| Medium | Broker + Redis Stream + worker pool. | Real ingestion with controlled volume. |
| Large | EMQX/HiveMQ cluster + shared subscriptions + Timescale/Kafka. | Many devices, high rate, HA required. |
| Industrial edge | Local broker + bridge to central broker. | Sites must survive WAN outage. |
| Cloud managed | AWS IoT Core or Azure IoT Hub. | Managed identity and cloud integration. |
HA components
Broker HA:
- broker cluster or standby broker
- load balancer
- health checks
- config backup
- TLS certificate rotation
Backend HA:
- multiple subscribers
- shared subscriptions
- worker pool
- queue/buffer monitoring
- database scaling
Operational HA:
- dashboards
- alerts
- runbooks
- backup/restore
- failover drills
Production HA architecture
Devices
│
▼
TCP/TLS Load Balancer
│
├── Broker Node 1
├── Broker Node 2
└── Broker Node 3
│
└── $share/ingestion-workers/prod/acme/+/+/+/telemetry/#
│
├── worker-01
├── worker-02
├── worker-03
└── worker-04
│
▼
Redis / Kafka buffer
│
▼
Storage / Dashboard / Alerts
HA acceptance tests
[ ] Broker node restart
[ ] Subscriber process restart
[ ] Worker crash and recovery
[ ] Database slowdown
[ ] Queue backlog and catch-up
[ ] Device reconnect storm
[ ] TLS certificate rotation
[ ] ACL rollback
[ ] Retained cleanup
[ ] Bridge WAN outage
[ ] Replay duplicate handling
[ ] Command timeout during outage
Testing strategy: functional, security, load and chaos
MQTT testing must cover happy path and failure path. The most important tests are duplicates, reconnects, invalid payloads, unauthorized topics, backend lag, retained misuse and command timeout.
| Test family | Examples | Goal |
|---|---|---|
| Functional | Publish telemetry, retained status, event, command ack. | Validate end-to-end behavior. |
| Security | Anonymous, wrong ACL, wrong namespace, revoked device. | Prove forbidden actions fail. |
| Reliability | Duplicate QoS 1, reconnect, broker restart, worker crash. | Prove no duplicate business writes. |
| Load | Many clients, high messages/sec, large payloads. | Find capacity and bottlenecks. |
| Chaos | DB slow, Redis down, network loss, bridge outage. | Validate recovery and visibility. |
| Operations | Alert triggers, dashboard freshness, runbook execution. | Validate on-call readiness. |
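
The goal in the reliability row, no duplicate business writes under QoS 1 redelivery, usually comes down to one mechanism. An inbox table holds a unique idempotency key, and the inbox row is written in the same transaction as the business row. The sketch below uses the standard-library `sqlite3` module as a stand-in for PostgreSQL, and the table and function names are hypothetical. In Django, the same idea maps to a unique constraint on the inbox model plus `transaction.atomic()`.

```python
import sqlite3

SCHEMA = """
CREATE TABLE inbox (
    idempotency_key TEXT PRIMARY KEY,
    duplicate_count INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE telemetry_point (
    idempotency_key TEXT NOT NULL,
    metric TEXT NOT NULL,
    value REAL NOT NULL
);
"""


def idempotency_key(asset: str, message_id: str) -> str:
    # Scope message_id by asset so two devices cannot collide on the same id.
    return f"{asset}:{message_id}"


def store_once(conn: sqlite3.Connection, asset: str, message_id: str,
               metric: str, value: float) -> bool:
    """Write the business row only the first time a key is seen; count later duplicates."""
    key = idempotency_key(asset, message_id)
    with conn:  # one transaction: inbox row and business row commit or roll back together
        cur = conn.execute("INSERT OR IGNORE INTO inbox (idempotency_key) VALUES (?)", (key,))
        if cur.rowcount == 0:  # key already present: QoS 1 redelivery or replay
            conn.execute(
                "UPDATE inbox SET duplicate_count = duplicate_count + 1 WHERE idempotency_key = ?",
                (key,),
            )
            return False
        conn.execute("INSERT INTO telemetry_point VALUES (?, ?, ?)", (key, metric, value))
        return True
```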
Minimum automated tests
[ ] topic parser valid topic
[ ] topic parser invalid topic
[ ] payload validator valid payload
[ ] payload validator invalid schema
[ ] payload validator identity mismatch
[ ] idempotency duplicate message_id
[ ] command lifecycle timeout
[ ] retained status behavior
[ ] rejected message creation
[ ] device state transition online/offline
Chaos test scenarios
Scenario 1: worker crash
1. Publish 1000 QoS 1 messages
2. Kill worker after 300 messages
3. Restart worker
4. Verify no duplicate business rows
5. Verify queue catches up
Scenario 2: broker restart
1. Devices publish status and telemetry
2. Restart broker
3. Devices reconnect with backoff
4. Verify Last Will behavior
5. Verify ingestion resumes
Scenario 3: DB slow
1. Add artificial DB latency
2. Publish telemetry burst
3. Verify MQTT callback does not block
4. Verify queue depth grows visibly
5. Verify workers catch up later
Scenario 4: command timeout
1. Publish command to offline device
2. Wait for timeout
3. Verify journal status is timeout
4. Verify no false success is displayed
Load test dimensions
Vary:
- number of clients
- message rate per client
- payload size
- QoS level
- retained flag
- subscriber count
- worker count
- database latency
- reconnect storms
- command frequency
Final project roadmap
Roadmap by increments
| Increment | Scope | Deliverable |
|---|---|---|
| R0 | Design baseline. | Topic contract, payload contract, QoS policy. |
| R1 | Broker MVP. | Mosquitto config, users, basic topics. |
| R2 | Python subscriber. | Paho management command with fast callback. |
| R3 | Django storage. | Device, telemetry, state, rejected messages. |
| R4 | Validation and idempotency. | Parser, schema validation, inbox table. |
| R5 | Security. | TLS, ACL, device registry, negative tests. |
| R6 | Buffer and workers. | Redis/Celery pipeline and lag metrics. |
| R7 | Dashboard. | Fleet, device, ingestion, errors, security views. |
| R8 | Commands. | Command journal, ack, result, timeout. |
| R9 | Monitoring. | Metrics, alerts, runbooks. |
| R10 | HA and scale. | Shared subscriptions, cluster or managed broker. |
One-page execution plan
Week 1:
- topic contract
- payload contract
- Mosquitto MVP
- simulator publish/sub
Week 2:
- Django mqtt_app
- subscriber management command
- telemetry and state storage
- admin views
Week 3:
- validation
- idempotency
- rejected messages
- Last Will and retained status
Week 4:
- TLS
- ACLs
- device registry
- negative security tests
Week 5:
- Redis or Celery buffer
- worker pipeline
- lag metrics
- replay failed messages
Week 6:
- dashboard
- alerts
- command journal
- command ack/result
Week 7+:
- load tests
- HA design
- bridge or cloud IoT
- production runbooks
- failover tests
Production readiness gate
[ ] End-to-end telemetry works
[ ] Status retained works
[ ] Last Will works
[ ] Invalid messages are visible
[ ] Duplicates are safe
[ ] ACL negative tests pass
[ ] Commands have ack/result
[ ] Backend lag is monitored
[ ] Dropped messages are alerted
[ ] Certificate expiry is tracked
[ ] Load test is documented
[ ] Runbooks exist
[ ] Backup and rollback are tested
Portfolio positioning
This portfolio presents five realistic MQTT and IoT implementation projects across energy, smart metering, building operations, industrial telemetry and platform engineering. Each project follows the same core pattern: connected assets publish telemetry and status through MQTT, a secure broker routes the messages, backend workers validate and store the data, and dashboards expose operational state.
| Project | Domain | Main MQTT role | Backend focus |
|---|---|---|---|
| Techem | Smart metering and consumption telemetry | Meter data ingestion | Validation, billing-grade history, anomalies |
| Idex | Building energy operations | Heating, plant room and equipment telemetry | Supervision, alerts, energy optimization |
| Kocliko | Connected building and HVAC monitoring | Sensor and HVAC state streaming | Occupancy, comfort, predictive maintenance |
| E.ON Energy | Energy production and grid-side telemetry | Distributed energy asset monitoring | Load control, storage, forecasting |
| IDEO-Lab Reference Platform | Reusable MQTT / IoT platform | Multi-tenant MQTT architecture | Django, Redis, Celery, dashboards, HA |
Common architecture across projects
Field devices
│
├── meters
├── gateways
├── sensors
├── HVAC equipment
├── energy controllers
└── edge agents
│
│ MQTT/TLS
▼
MQTT Broker
│
├── authentication
├── ACL by topic
├── retained status
├── Last Will
└── topic routing
│
▼
Python / Django backend
│
├── validation
├── idempotency
├── ingestion buffer
├── time-series storage
├── alerts
└── dashboards
Portfolio keywords
MQTT
IoT
Smart metering
Energy telemetry
Building operations
HVAC monitoring
Edge gateway
Django backend
Redis buffer
Celery workers
TimescaleDB
Device identity
ACL by topic
Retained status
Last Will
Command ack/result
Monitoring
High availability
Project 1: Techem - smart metering MQTT ingestion platform
The Techem-oriented project focuses on smart metering telemetry: heating meters, water meters, energy counters and gateway-based collection. MQTT is used as the lightweight transport between distributed metering gateways and the central backend.
| Dimension | Implementation |
|---|---|
| Assets | Heat meters, water meters, building gateways, consumption sensors. |
| Messages | Consumption index, temperature, battery, gateway status, meter alarms. |
| MQTT design | Tenant, building, floor, meter, stream and metric namespace. |
| QoS | QoS 1 for billing-related measurements and alarms. |
| Storage | Time-series history, billing snapshots, anomaly table, rejected messages. |
| Business value | Reliable consumption collection, data quality, anomaly detection. |
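
Because the gateway adds `meter_id` and `sequence`, the backend can separate duplicates from genuinely missing readings. This is a minimal in-memory sketch that assumes sequences increase by one per reading for each meter. `MeterTracker` is a hypothetical helper. A production version would persist its state and bound the `seen` sets to a recent window.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class MeterTracker:
    """Per-meter sequence bookkeeping keyed by meter_id + sequence."""
    highest: Dict[str, int] = field(default_factory=dict)
    seen: Dict[str, Set[int]] = field(default_factory=dict)  # unbounded here; prune in production

    def observe(self, meter_id: str, sequence: int) -> str:
        seen = self.seen.setdefault(meter_id, set())
        if sequence in seen:
            return "duplicate"  # QoS 1 redelivery or gateway replay: skip the business write
        seen.add(sequence)
        last = self.highest.get(meter_id)
        if last is None or sequence > last:
            self.highest[meter_id] = sequence
        if last is None or sequence == last + 1:
            return "ok"
        if sequence > last + 1:
            return f"gap:{sequence - last - 1}"  # number of readings missing before this one
        return "late_arrival"  # older than the highest seen; may fill an earlier gap
```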
Topic examples
prod/techem/building-042/floor-03/meter-778/telemetry/heat-kwh
prod/techem/building-042/floor-03/meter-778/telemetry/water-m3
prod/techem/building-042/floor-03/meter-778/status/battery
prod/techem/building-042/gateway-01/status/availability
prod/techem/building-042/gateway-01/event/meter-offline
Target pipeline
Meter
│
▼
Building gateway
│
├── reads meter values
├── normalizes units
├── adds meter_id and sequence
└── publishes MQTT
│
▼
MQTT Broker
│
▼
Django ingestion
│
├── validates schema
├── checks meter registry
├── deduplicates by meter_id + sequence
├── stores consumption history
├── detects abnormal gaps
└── updates dashboard
Payload example
{
"schema_version": "1.0",
"message_id": "msg-meter-778-00018422",
"meter_id": "meter-778",
"sequence": 18422,
"metric": "heat-kwh",
"value": 12842.72,
"unit": "kWh",
"quality": "good",
"event_ts": "2026-04-25T10:00:00Z"
}
Key deliverables
[ ] Meter registry
[ ] Gateway registry
[ ] Topic contract
[ ] Payload schema
[ ] MQTT ingestion daemon
[ ] Idempotency key by meter_id + sequence
[ ] Consumption history table
[ ] Anomaly detection for missing readings
[ ] Battery and offline alerts
[ ] Admin dashboard for rejected readings
Project 2: Idex - building energy operations and supervision
The Idex-oriented project focuses on building energy systems: boiler rooms, heat pumps, substations, HVAC equipment, energy counters and plant-room supervision. MQTT is used to collect equipment status, sensor telemetry and operational events.
| Dimension | Implementation |
|---|---|
| Assets | Boilers, pumps, heat exchangers, HVAC controllers, gateways. |
| Messages | Temperature, pressure, flow, mode, alarm, availability, energy counters. |
| MQTT design | Site, plant room, equipment, stream and signal hierarchy. |
| Commands | Setpoint update, mode switch, diagnostic request, restart gateway. |
| Dashboard | Equipment state, energy performance, alarms, stale devices. |
| Business value | Reduced downtime, better supervision, energy optimization. |
Topic examples
prod/idex/site-075/plant-room-01/boiler-02/telemetry/temperature
prod/idex/site-075/plant-room-01/pump-04/telemetry/pressure
prod/idex/site-075/plant-room-01/boiler-02/status/mode
prod/idex/site-075/plant-room-01/boiler-02/event/alarm
prod/idex/site-075/plant-room-01/boiler-02/command/setpoint
prod/idex/site-075/plant-room-01/boiler-02/ack/setpoint
Operations architecture
Building equipment
│
├── sensors
├── PLC / controller
├── local gateway
└── plant room network
│
▼
MQTT Broker
│
├── live dashboard
├── alarm engine
├── energy analytics
├── command service
└── maintenance workflow
Command flow
Operator
│
│ HTTP action in dashboard
▼
Command service
│
├── validates RBAC
├── checks equipment state
├── creates command_id
├── writes command journal
└── publishes MQTT command
│
▼
Equipment gateway
│
├── validates expiry
├── applies setpoint
├── publishes ack
└── publishes result
Main technical challenges
| Challenge | Solution |
|---|---|
| Heterogeneous equipment | Gateway normalization and metric registry. |
| Operational alarms | QoS 1 events, idempotent event storage, alert rules. |
| Unsafe commands | Command journal, expiration, ack and result. |
| Stale data | Last Will, retained status and freshness dashboard. |
Project 3: Kocliko - connected building and HVAC intelligence
The Kocliko-oriented project focuses on connected buildings, indoor comfort, HVAC optimization, occupancy signals and sensor-based building intelligence. MQTT is used to stream room-level telemetry and equipment state to a backend platform.
| Dimension | Implementation |
|---|---|
| Assets | Rooms, sensors, HVAC units, gateways, comfort controllers. |
| Messages | Temperature, humidity, CO2, presence, comfort score, HVAC mode. |
| Analytics | Comfort drift, occupancy trends, energy waste, predictive maintenance. |
| Storage | Hot state in Redis, history in time-series, events in relational tables. |
| Dashboard | Room comfort, stale sensors, HVAC state, alerts, optimization hints. |
| Business value | Better comfort, lower energy waste, data-driven HVAC operations. |
Topic examples
prod/kocliko/building-a/floor-04/room-412/telemetry/temperature
prod/kocliko/building-a/floor-04/room-412/telemetry/co2
prod/kocliko/building-a/floor-04/room-412/telemetry/presence
prod/kocliko/building-a/floor-04/hvac-09/status/mode
prod/kocliko/building-a/floor-04/hvac-09/command/set-mode
prod/kocliko/building-a/floor-04/hvac-09/result/set-mode
Smart building flow
Room sensors
│
├── temperature
├── humidity
├── CO2
├── presence
└── comfort signal
│
▼
MQTT Broker
│
▼
Backend intelligence
│
├── validates room registry
├── updates hot state
├── stores history
├── computes comfort score
├── detects stale sensors
└── proposes HVAC action
Comfort payload example
{
"schema_version": "1.0",
"message_id": "room-412-000843",
"room_id": "room-412",
"metric": "co2",
"value": 982,
"unit": "ppm",
"quality": "good",
"event_ts": "2026-04-25T10:00:00Z"
}
Dashboard widgets
Building dashboard:
- comfort score by floor
- CO2 heatmap
- temperature drift
- stale sensors
- HVAC mode
- occupancy trend
- energy waste indicators
- command success rate
- rejected payloads by firmware
Project 4: E.ON Energy - distributed energy telemetry and control
The E.ON Energy-oriented project focuses on distributed energy assets: meters, inverters, batteries, substations, grid-edge controllers and load-control devices. MQTT provides a lightweight real-time channel for telemetry, state and controlled command workflows.
| Dimension | Implementation |
|---|---|
| Assets | Meters, solar inverters, batteries, controllers, grid-edge gateways. |
| Messages | Power, voltage, current, kWh, battery SOC, inverter status, alarms. |
| Control | Load shedding, battery mode changes, inverter diagnostics, setpoint commands. |
| Storage | Time-series, event log, command journal, data lake export. |
| Analytics | Forecasting, anomaly detection, peak shaving, energy optimization. |
| Business value | Operational visibility, faster response, grid-side optimization. |
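This project's critical controls answer large telemetry volume with edge aggregation. Below is a minimal sketch of that idea for the power, voltage and current messages listed above. A gateway buffers raw `(epoch_seconds, value)` samples and publishes one min/avg/max summary per window instead of every sample. The 60-second window, the summary field names and the `aggregate` function are assumptions for illustration, not a documented payload contract.

```python
from collections import defaultdict

WINDOW_SECONDS = 60  # assumption: one summary per minute per signal


def aggregate(samples, window_seconds=WINDOW_SECONDS):
    """Reduce (epoch_seconds, value) samples to one summary per time window.

    Windows are aligned on multiples of window_seconds, so every gateway
    produces the same window boundaries for the same clock time.
    """
    buckets = defaultdict(list)
    for ts, value in samples:
        start = int(ts) - int(ts) % window_seconds
        buckets[start].append(value)
    return [
        {
            "window_start": start,
            "window_seconds": window_seconds,
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "avg": sum(values) / len(values),
        }
        for start, values in sorted(buckets.items())
    ]


# One sample per second of inverter power (kW) becomes one message per minute.
samples = [(1_777_111_200 + i, 4.0 + (i % 3) * 0.5) for i in range(120)]
for summary in aggregate(samples):
    print(summary)
```

Averaging suits instantaneous signals such as `power-kw`. For a cumulative counter such as `meter-main/telemetry/kwh`, forwarding the last reading of each window keeps the counter exact.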
Topic examples
prod/eon/site-129/inverter-03/telemetry/power-kw
prod/eon/site-129/battery-01/status/soc
prod/eon/site-129/meter-main/telemetry/kwh
prod/eon/site-129/controller-01/event/grid-alarm
prod/eon/site-129/controller-01/command/load-shed
prod/eon/site-129/controller-01/result/load-shed
Energy platform architecture
Distributed energy assets
│
├── meters
├── inverters
├── batteries
├── controllers
└── edge gateways
│
▼
MQTT Broker / Cloud IoT
│
├── rules engine
├── stream connector
├── time-series storage
├── alert engine
├── forecasting engine
└── command service
Load control command
{
"schema_version": "1.0",
"command_id": "cmd-load-20260425-001",
"action": "load_shed",
"target_kw": 2.5,
"duration_seconds": 300,
"requested_by": "optimization-engine",
"expires_at": "2026-04-25T10:01:00Z"
}
Critical controls
| Risk | Control |
|---|---|
| Late command execution | expires_at and device-side validation. |
| Duplicate command | command_id and idempotent device execution. |
| Bad billing-grade data | Quality field, audit trail and validation. |
| Large telemetry volume | Edge aggregation and frequency policy. |
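The first two controls can be made concrete on the device. MQTT QoS 1 delivers at least once, so a redelivered `load_shed` must not shed load twice, and a command that arrives after `expires_at` must be refused. This is a minimal sketch under stated assumptions. The device clock is assumed to be synchronized (for example via NTP). `execute` is a hypothetical callback into the device's control logic, and `CommandGuard` is an illustrative name. The in-memory result cache is lost on reboot, so a real device would persist recent `command_id`s.

```python
from datetime import datetime, timezone


def parse_ts(value: str) -> datetime:
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


class CommandGuard:
    """Device-side expiry check and command_id idempotency (hypothetical sketch)."""

    def __init__(self, execute, max_remembered=1000):
        self._execute = execute      # callback that actually applies the command
        self._results = {}           # command_id -> result already reported
        self._max = max_remembered

    def handle(self, command: dict, now: datetime) -> dict:
        command_id = command["command_id"]
        # Redelivery: report the stored outcome again without acting a second time.
        if command_id in self._results:
            return {**self._results[command_id], "duplicate": True}
        if now >= parse_ts(command["expires_at"]):
            result = {"command_id": command_id, "status": "expired"}
        else:
            self._execute(command)
            result = {"command_id": command_id, "status": "executed"}
        if len(self._results) >= self._max:
            # Evict the oldest entry (dicts preserve insertion order).
            self._results.pop(next(iter(self._results)))
        self._results[command_id] = result
        return result


executed = []
guard = CommandGuard(execute=executed.append)
command = {
    "schema_version": "1.0",
    "command_id": "cmd-load-20260425-001",
    "action": "load_shed",
    "target_kw": 2.5,
    "duration_seconds": 300,
    "requested_by": "optimization-engine",
    "expires_at": "2026-04-25T10:01:00Z",
}
at = datetime(2026, 4, 25, 10, 0, 30, tzinfo=timezone.utc)
print(guard.handle(command, at))   # executed once
print(guard.handle(command, at))   # same result, flagged as duplicate
```

The two controls reinforce each other. A duplicate that arrives after its entry has been evicted is still refused once `expires_at` has passed. The bounded cache is therefore safe as long as it holds more commands than can arrive within one expiry window.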
Project 5: IDEO-Lab Reference Platform - reusable MQTT / IoT backbone
The IDEO-Lab reference project is a reusable MQTT / IoT platform template. Its goal is to provide a generic technical foundation that can be adapted to smart metering, energy supervision, HVAC monitoring, industrial telemetry or edge observability.
Core modules
Django apps:
- mqtt_devices
- mqtt_ingestion
- mqtt_topics
- mqtt_security
- mqtt_commands
- mqtt_dashboard
- mqtt_monitoring
- mqtt_rejected
- mqtt_simulators
- mqtt_reports
| Module | Responsibility |
|---|---|
| Device registry | Clients, devices, tenants, sites, credentials, state. |
| Topic governance | Contracts, allowed streams, metrics, naming rules. |
| Ingestion engine | Paho subscriber, Redis buffer, validation, idempotency. |
| Command center | Command journal, publish, ack, result, timeout. |
| Dashboard | Fleet state, telemetry, lag, rejected messages, alerts. |
| Runbooks | Operational checks, failure diagnosis, incident workflow. |
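The Command center row lists publish, ack, result and timeout. One way to journal them is a small state machine per `command_id`: published, then acked, then succeeded or failed, plus a periodic sweep that marks open commands as timed out. This is a minimal in-memory sketch. The `Status` values, the 30-second default timeout and the method names are hypothetical. `now` is passed in (for example from `time.monotonic()`) so the logic stays testable.

```python
from dataclasses import dataclass, field
from enum import Enum


class Status(str, Enum):
    PUBLISHED = "published"
    ACKED = "acked"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    TIMED_OUT = "timed_out"


TERMINAL = {Status.SUCCEEDED, Status.FAILED, Status.TIMED_OUT}


@dataclass
class JournalEntry:
    command_id: str
    topic: str
    published_at: float
    timeout_s: float
    status: Status = Status.PUBLISHED
    history: list = field(default_factory=list)  # (time, event) pairs for the audit trail


class CommandJournal:
    """In-memory command journal; a Django model could persist the same fields."""

    def __init__(self):
        self.entries = {}

    def record_publish(self, command_id, topic, now, timeout_s=30.0):
        entry = JournalEntry(command_id, topic, now, timeout_s)
        entry.history.append((now, "published"))
        self.entries[command_id] = entry
        return entry

    def on_ack(self, command_id, now):
        entry = self.entries.get(command_id)
        if entry is None:
            return None  # unknown command_id: the caller may count this as an anomaly
        entry.history.append((now, "ack"))
        if entry.status is Status.PUBLISHED:
            entry.status = Status.ACKED
        return entry.status

    def on_result(self, command_id, ok, now):
        entry = self.entries.get(command_id)
        if entry is None:
            return None
        entry.history.append((now, "result:ok" if ok else "result:error"))
        # A result that arrives after a timeout is kept in the history,
        # but the final status is not rewritten.
        if entry.status not in TERMINAL:
            entry.status = Status.SUCCEEDED if ok else Status.FAILED
        return entry.status

    def expire(self, now):
        """Mark commands still open after their timeout as timed out."""
        expired = []
        for entry in self.entries.values():
            if entry.status not in TERMINAL and now - entry.published_at > entry.timeout_s:
                entry.status = Status.TIMED_OUT
                entry.history.append((now, "timeout"))
                expired.append(entry.command_id)
        return expired


journal = CommandJournal()
journal.record_publish("cmd-load-20260425-001",
                       "prod/eon/site-129/controller-01/command/load-shed", now=0.0)
journal.on_ack("cmd-load-20260425-001", now=1.2)
journal.on_result("cmd-load-20260425-001", ok=True, now=4.8)
```

Keeping late acks and results in `history` without rewriting a terminal status preserves the audit trail. The dashboard can still report "timed out, then succeeded late" as a distinct situation.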
Reference architecture
MQTT Broker
│
▼
mqtt_ingest management command
│
├── fast callback
├── topic parser
└── enqueue raw record
│
▼
Redis Stream
│
▼
Worker pool
│
├── validate
├── deduplicate
├── enrich
├── store
├── alert
└── push dashboard state
│
▼
Django Admin and dashboard
├── devices
├── current state
├── telemetry history
├── rejected messages
├── duplicates
├── commands
└── monitoring
Reusable deliverables
[ ] Django models
[ ] Django Admin pages
[ ] Management commands
[ ] MQTT simulator
[ ] Topic parser
[ ] Payload validators
[ ] Idempotency service
[ ] Redis Stream adapter
[ ] Command journal
[ ] Dashboard template
[ ] Monitoring counters
[ ] Security checklist
[ ] Runbook library
Shared technical architecture for the 5 projects
Common data model
Tenant
└── Site
└── Area
└── Asset / Device
├── TelemetryPoint
├── DeviceState
├── MQTTEvent
├── MQTTInboxMessage
├── MQTTRejectedMessage
└── MQTTCommandJournal
Common backend services
| Service | Purpose |
|---|---|
| TopicParser | Extract env, tenant, site, area, asset, stream, signal. |
| PayloadValidator | Validate schema, identity, metric, unit, timestamp. |
| IdempotencyService | Prevent duplicate business writes. |
| StateService | Update latest status and retained-style state. |
| TelemetryWriter | Write time-series history. |
| CommandService | Publish commands and track ack/result. |
| MonitoringService | Expose lag, rejects, duplicates, freshness. |
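TopicParser, PayloadValidator and IdempotencyService can be sketched together as the worker step of the shared flow: parse the topic, validate telemetry, deduplicate, then route by stream. This is an illustrative sketch, not the platform's code, and it rests on three assumptions:
- `sinks` is a dict of lists standing in for the time-series, hot-state, event, journal, rejected and duplicate writers.
- An in-memory set stands in for a shared store such as a Redis `SET key value NX EX ttl`.
- Six-level topics, like the E.ON examples, are read as having no area level.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

# Inbound streams and their writers; "command" is published by the backend, so not routed.
ROUTES = {
    "telemetry": "timeseries",
    "status": "hot_state",
    "event": "events",
    "ack": "command_journal",
    "result": "command_journal",
}
TELEMETRY_FIELDS = ("schema_version", "message_id", "metric", "value", "unit", "event_ts")


@dataclass(frozen=True)
class ParsedTopic:
    env: str
    tenant: str
    site: str
    area: Optional[str]
    asset: str
    stream: str
    signal: str


def parse_topic(topic: str) -> ParsedTopic:
    """TopicParser: split {env}/{tenant}/{site}/{area}/{asset}/{stream}/{signal}."""
    levels = topic.split("/")
    if any(not lvl or "+" in lvl or "#" in lvl for lvl in levels):
        raise ValueError(f"empty or wildcard level in {topic!r}")
    if len(levels) == 7:
        return ParsedTopic(*levels)
    if len(levels) == 6:
        # Assumption: six-level topics (as in the E.ON examples) omit the area.
        env, tenant, site, asset, stream, signal = levels
        return ParsedTopic(env, tenant, site, None, asset, stream, signal)
    raise ValueError(f"expected 6 or 7 levels, got {len(levels)} in {topic!r}")


def validate_telemetry(parsed: ParsedTopic, payload: dict) -> list:
    """PayloadValidator: return a list of problems (empty means valid)."""
    missing = [f for f in TELEMETRY_FIELDS if f not in payload]
    if missing:
        return [f"missing fields: {missing}"]
    problems = []
    if payload["metric"] != parsed.signal:
        problems.append("metric does not match topic signal")
    value = payload["value"]
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        problems.append("value is not numeric")
    try:
        datetime.fromisoformat(str(payload["event_ts"]).replace("Z", "+00:00"))
    except ValueError:
        problems.append("event_ts is not ISO 8601")
    return problems


class IdempotencyService:
    """In-memory stand-in for a shared store such as Redis SET ... NX EX."""

    def __init__(self):
        self._seen = set()

    def first_time(self, key) -> bool:
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


def handle(topic: str, payload: dict, idem: IdempotencyService, sinks: dict) -> str:
    """Route one decoded message; returns the name of the sink it went to."""
    try:
        parsed = parse_topic(topic)
    except ValueError as exc:
        sinks["rejected"].append((topic, str(exc)))
        return "rejected"
    target = ROUTES.get(parsed.stream)
    if target is None:
        sinks["rejected"].append((topic, f"not an inbound stream: {parsed.stream}"))
        return "rejected"
    problems = validate_telemetry(parsed, payload) if parsed.stream == "telemetry" else []
    msg_key = payload.get("message_id") or payload.get("command_id")
    if msg_key is None:
        problems.append("no message_id or command_id")
    if problems:
        sinks["rejected"].append((topic, "; ".join(problems)))
        return "rejected"
    # Key on tenant and stream too: an ack and a result share the same command_id.
    if not idem.first_time((parsed.tenant, parsed.stream, msg_key)):
        sinks["duplicates"].append((topic, msg_key))
        return "duplicates"
    sinks[target].append((parsed, payload))
    return target
```

Feeding the Kocliko CO2 payload on `prod/kocliko/building-a/floor-04/room-412/telemetry/co2` routes it to `timeseries`, and feeding it a second time routes it to `duplicates`. Rejected messages keep their reason, which is what the rejected-messages admin page and the MonitoringService counters need.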
End-to-end flow
Device publishes MQTT
│
▼
Broker authenticates and authorizes
│
▼
Backend subscriber receives
│
▼
Raw record goes to buffer
│
▼
Worker validates and deduplicates
│
├── telemetry -> time-series
├── status -> hot state
├── event -> event table
├── ack -> command journal
└── invalid -> rejected messages
│
▼
Dashboard and alerts update
Topic design reused everywhere
prod/{tenant}/{site}/{area}/{asset}/telemetry/{metric}
prod/{tenant}/{site}/{area}/{asset}/status/{state_name}
prod/{tenant}/{site}/{area}/{asset}/event/{event_type}
prod/{tenant}/{site}/{area}/{asset}/command/{command_name}
prod/{tenant}/{site}/{area}/{asset}/ack/{command_name}
prod/{tenant}/{site}/{area}/{asset}/result/{command_name}
Business and technical value summary
| Project | Technical value | Business value |
|---|---|---|
| Techem | Reliable metering ingestion and data quality. | Better consumption tracking and anomaly visibility. |
| Idex | Building equipment supervision and command workflow. | Lower downtime and improved energy operations. |
| Kocliko | Room-level telemetry and HVAC intelligence. | Comfort optimization and energy waste reduction. |
| E.ON Energy | Distributed energy telemetry and controlled actions. | Grid-edge visibility and optimization. |
| IDEO-Lab | Reusable Django MQTT platform architecture. | Faster delivery of future IoT projects. |
Skills demonstrated
Architecture:
- MQTT broker design
- topic namespace
- QoS policy
- retained and Last Will
- edge gateway pattern
Backend:
- Python Paho ingestion
- Django data model
- Redis / Celery buffering
- idempotency
- validation
- time-series storage
Operations:
- security ACLs
- monitoring
- rejected messages
- dashboarding
- command audit
- HA planning
Portfolio pitch
I designed MQTT and IoT-oriented backend architectures
for smart metering, energy supervision, connected buildings,
HVAC monitoring and reusable platform engineering.
The work covers:
- secure MQTT topic architecture
- device and gateway identity
- TLS and ACL design
- Python ingestion services
- Django storage and admin visibility
- Redis/Celery buffering
- payload validation
- idempotent processing
- telemetry dashboards
- command ack/result workflows
- monitoring and production runbooks