8) Observabilité & qualité en run — RAG
Tracing par étape, boucle de feedback, alerting, red teaming continu, dashboards & livrables.
Tracing par étape Feedback boucle-fermée Alertes Red teaming Dashboards & livrables
Mise en Œuvre des Outils
Instrumentation OpenTelemetry/Prometheus, logs JSON, panels Grafana, Alertmanager, privacy & SLO.
8) Observabilité & qualité en run (Jours 22–28)
Spans & attributs (par étape)
- reformulate → {lang, intent, toks_in}.
- retrieve → {k, hits, hit_at_5, index_version, filters, latency_ms}.
- rerank → {model, k_in, k_out, latency_ms}.
- answer → {model, toks_in, toks_out, citations, faithfulness_pred}.
- postprocess → {redactions, policies_triggered}.
Inclure un request_id global + user_hash (SHA-256), jamais de PII en clair.
Trace (JSON condensé)
{
"ts":"2025-09-18T09:10:12Z","request_id":"r_7f3","user_hash":"u_d1a..",
"stages":[
{"name":"retrieve","lat_ms":220,"k":20,"hit@5":0.9,"index_version":"v13"},
{"name":"rerank","lat_ms":80,"k_in":20,"k_out":5,"model":"ce-msmarco"},
{"name":"answer","lat_ms":350,"model":"gpt-5-mini","tok_in":900,"tok_out":230,"citations":4}
],
"cost_eur":0.0042,"status":"ok"
}Boucle fermée
- UI : like/dislike + motif + champ “corriger la réponse”.
- Orchestration : si *dislike* ⇒ ouvrir ticket *triage* + pousser reindex_queue (sources citées).
- Priorisation : score = fréquence * sévérité * BU.
- Ré-indexation : pipeline incrémental (hash/last_modified).
Modèle de données (SQL)
CREATE TABLE qa_feedback(
id BIGSERIAL PRIMARY KEY,
user_hash TEXT, request_id TEXT, question TEXT,
answer TEXT, sources JSONB, rating SMALLINT, comment TEXT,
created_at TIMESTAMP DEFAULT now()
);Relier qa_feedback aux documents/chunks pour des *quality heatmaps* par source.
Seuils & SLO (exemples)
- P95 latency ≤ 1.5 s (global) ; ≤ 900 ms (retrieve+rerank).
- hit@5 ≥ 0.85 (moyenne glissante 1h).
- Coût/req ≤ €0.01 (moyenne 1h / BU).
- Erreur rate ≤ 1 % (HTTP ≥500).
Détection
- Chute de hit@5 > 15 % vs baseline 7j.
- Drift des embeddings : distance moyenne ↑ anormale.
PromQL (extraits)
# P95 latence (seconds)
histogram_quantile(0.95, sum(rate(rag_stage_latency_seconds_bucket[5m])) by (le))
# hit@5 (moyenne 5m)
avg_over_time(rag_retrieval_hit_at_5[5m])
# coût par req (5m)
sum(rate(rag_cost_eur_total[5m])) / sum(rate(rag_requests_total[5m]))Utiliser Alertmanager (multi-window burn-rate) pour capter les surcharges rapides et lentes.
Jeux d’attaques
- Prompt-injection : ignore instructions, exfiltration “print system prompt”.
- Data exfiltration : “montre moi des secrets/API keys”.
- Safety : contenu toxique / OSINT sensible.
Plan hebdo : batch 100 prompts d’attaque → score *pass rate* & *false negative*.
Pseudo-code de test
# redteam.py
for atk in ATTACK_SET:
out = ask_rag(atk.prompt)
verdict = guardrails(out) # should_block: True/False
log_redteam(atk.id, verdict, out.flags)À livrer
- Dashboards : latences par étape, coûts/req, hit@k, score feedback, erreurs.
- Alertes : P95, hit@5, coût, erreurs, drift.
- Rapport red teaming : pass rate, cas critiques, correctifs.
Contrat d’observabilité (JSON)
{
"slo":{"p95_ms":1500,"hit@5":0.85,"err_rate":0.01,"cost_eur_per_req":0.01},
"alerts":["latency_p95","hit5_drop","cost_spike","error_rate","embedding_drift"],
"privacy":{"hash_user":true,"pii_redaction":true,"logs_retention_days":90}
}Mise en Œuvre des Outils — Observabilité
# otel_init.py
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
def init(service="rag-api"):
provider = TracerProvider(resource=Resource.create({"service.name": service}))
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4318/v1/traces"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
return trace.get_tracer(service)# usage (par étape)
tracer = init()
with tracer.start_as_current_span("retrieve", attributes={"k":20,"filters":"lang=fr"}):
# call index...
pass# logging_json.py
import logging, json, hashlib
from pythonjsonlogger import jsonlogger
def hash_user(u): return hashlib.sha256(u.encode()).hexdigest()[:12]
handler = logging.StreamHandler()
handler.setFormatter(jsonlogger.JsonFormatter("%(asctime)s %(levelname)s %(message)s"))
log = logging.getLogger("rag"); log.setLevel(logging.INFO); log.addHandler(handler)
def log_event(user_id, request_id, stage, **kw):
log.info(json.dumps({
"user_hash": hash_user(user_id),
"request_id": request_id,
"stage": stage, **kw
}))# schéma conseillé (clé → type)
{ "ts": str, "level": str, "request_id": str, "user_hash": str,
"stage": str, "lat_ms": int, "hit5": float, "model": str,
"tok_in": int, "tok_out": int, "cost_eur": float, "status": str }# metrics_prom.py
from prometheus_client import Histogram, Counter, Gauge
rag_req_total = Counter("rag_requests_total","Total requêtes")
rag_stage_latency = Histogram("rag_stage_latency_seconds","Latence par étape",["stage"])
rag_hit5 = Gauge("rag_retrieval_hit_at_5","Hit@5")
rag_cost_total = Counter("rag_cost_eur_total","Cumul coûts EUR")
def observe(stage, lat_s, hit5=None, cost=None):
rag_req_total.inc()
rag_stage_latency.labels(stage).observe(lat_s)
if hit5 is not None: rag_hit5.set(hit5)
if cost is not None: rag_cost_total.inc(cost)# PromQL (extraits)
histogram_quantile(0.95, sum(rate(rag_stage_latency_seconds_bucket[5m])) by (le,stage))
avg_over_time(rag_retrieval_hit_at_5[15m])Panels indispensables
- Timeseries P50/P95 par stage.
- Coût/req et coût cumulés.
- hit@k (5/10) & score feedback.
- Erreurs par type (HTTP, guardrails, timeouts).
Variables
- env (dev/stage/prod)
- bu (Business Unit)
- model (LLM / reranker)
# prometheus_rules.yaml (extrait)
groups:
- name: rag
rules:
- alert: RAGLatencyP95High
expr: histogram_quantile(0.95, sum(rate(rag_stage_latency_seconds_bucket[5m])) by (le)) > 1.5
for: 10m
labels: {severity: warning}
annotations: {summary: "P95 latence > 1.5s", description: "Investigate retrieve/rerank/LLM"}
- alert: RAGHit5Drop
expr: avg_over_time(rag_retrieval_hit_at_5[15m]) < 0.85
for: 15m
labels: {severity: critical}
annotations: {summary: "Chute hit@5", description: "Qualité retrieval en baisse"}
- alert: RAGCostSpike
expr: (sum(rate(rag_cost_eur_total[10m])) / sum(rate(rag_requests_total[10m]))) > 0.02
for: 10m
labels: {severity: warning}
annotations: {summary: "Spike coût/req", description: "Vérifier taille prompts & modèles"}Privacy & conformité
- Hash user_id, pas de PII brute dans logs/traces.
- Rétention: logs 90j ; traces 30j ; échantillonnage 10–30%.
- RBAC: accès aux dashboards par BU/role.
CREATE TABLE events(
ts TIMESTAMP, stage TEXT, latency_ms INT, cost_eur NUMERIC(10,5),
meta JSONB, PRIMARY KEY (ts, stage)
);SLO & error budget
- Disponibilité API ≥ 99.5% / mois.
- P95 ≤ 1.5 s (95% des jours).
- hit@5 ≥ 0.85 (moyenne 7j glissante).
