5) Orchestration RAG â Jours 12â18
SchĂ©ma canonique de requĂȘte : reformulation (opt.), retrieval hybride â rerank â construction de prompt â gĂ©nĂ©ration â post-process â guardrails. Livrables : service /rag/ask (REST/GraphQL), schĂ©mas Pydantic, logs structurĂ©s.
Reformulation Retrieval & Re-rank Prompt Génération Post-process Guardrails Livrables
Mise en Ćuvre des Outils
API /rag/ask (REST/GraphQL), Pydantic, snippets Python (FastAPI), requĂȘtes SQL/OpenSearch, construction de prompt (verbatim), post-process & guardrails, logs/OTel.
SchĂ©ma canonique de requĂȘte
Reformulation (optionnelle)
- Nettoyage : casse, ponctuation, question courte.
- Détection lang & intent (Q&A, how-to, code, policy).
- Ăquilibrage recall/precision : ajouter synonymes, âmust wordsâ.
Retrieval â Re-rank
- Hybride : BM25 âȘ Vecteur (k=20), dĂ©-dup.
- Cross-Encoder (CE) pour rerank â top=5 passages.
- Filtres : lang, bu, type, fraĂźcheur < 90j.
Prompt & Génération
- Instructions + contexte (citations & métadonnées).
- Style contraint : ton neutre, format attendu.
- ParamĂštres LLM : temperature=0.2, max_tokens=400.
Post-process & Guardrails
- Self-check factualité / consistency (mini-prompt).
- Ajout des sources + vérif présence citations.
- Guardrails : hors périmÚtre, PII leak, jailbreak, sécurité.
ImplĂ©mentation technique â /rag/ask (REST/GraphQL)
REST (FastAPI) â endpoint /rag/ask
# api.py (pseudo)
from fastapi import FastAPI
from pydantic import BaseModel, Field
app = FastAPI()
class AskRequest(BaseModel):
query: str = Field(..., max_length=2000)
lang: str | None = None
top_k: int = 20
bu: str | None = None
type: list[str] | None = None
user_id: str | None = None
class Passage(BaseModel):
id: str; score: float; source_id: str
title: str | None = None
meta: dict | None = None
text: str
class AskResponse(BaseModel):
answer: str
citations: list[Passage]
trace_id: str
metrics: dict
@app.post("/rag/ask", response_model=AskResponse)
async def rag_ask(req: AskRequest):
# 1) reformulation (opt.) + langue/intent
q = maybe_rewrite(req.query)
lang = req.lang or detect_lang(q)
# 2) retrieve â rerank
candidates = retrieve_hybrid(q, lang=lang, bu=req.bu, types=req.type, k=req.top_k)
top5 = rerank_ce(q, candidates)[:5]
# 3) prompt
prompt = build_prompt(q, lang, top5)
# 4) llm
answer = await llm_generate(prompt, temperature=0.2, max_tokens=400)
# 5) post-process + guardrails
answer, top5 = post_process(answer, top5, q)
answer = apply_guardrails(q, answer, top5, lang)
# 6) logs
trace = log_trace(req, top5, answer)
return AskResponse(answer=answer, citations=top5, trace_id=trace.id, metrics=trace.metrics)GraphQL (option)
# schema.gql
type Passage { id: ID!, score: Float!, source_id: String!, title: String, meta: JSON, text: String! }
type AskResponse { answer: String!, citations: [Passage!]!, trace_id: String!, metrics: JSON }
type Query { ragAsk(query: String!, lang: String, top_k: Int, bu: String, type: [String]): AskResponse! }Contrat JSON (exemple)
{
"query": "Quelle est la politique SSO ?",
"lang": "fr",
"top_k": 20,
"bu": "IT",
"type": ["documentation","policy"]
}Hybrid search (SQL + vecteur)
-- PostgreSQL (pgvector) + filtres
WITH dense AS (
SELECT id, text, meta, 1 - (embedding <-> :qvec) AS s
FROM rag_chunks
WHERE (meta->>'lang') = :lang
AND (meta->>'bu') = :bu
AND (meta->>'type') IN (:types)
AND (meta->>'last_modified')::date >= (now()::date - interval '90 days')
ORDER BY embedding <-> :qvec
LIMIT 40
),
sparse AS (
SELECT id, text, meta, ts_rank_cd(to_tsvector('french', text), plainto_tsquery('french', :q)) AS s
FROM rag_chunks
WHERE to_tsvector('french', text) @@ plainto_tsquery('french', :q)
LIMIT 40
)
SELECT * FROM (
SELECT *, s*0.6 AS score FROM dense
UNION
SELECT *, s*0.6 AS score FROM sparse
) u ORDER BY score DESC LIMIT 20;Reranking (Cross-Encoder)
# rerank.py
def rerank_ce(q, candidates):
scores = ce_model.score([(q, c.text) for c in candidates]) # CE donne une pertinence
ranked = [c for _, c in sorted(zip(scores, candidates), reverse=True)]
return rankedHeuristiques
- Limiter 1â2 passages par source_id pour diversitĂ©.
- Booster passages récents et ancrés (titres parents).
- Downrank si lang â langue utilisateur.
Gabarit (verbatim protégé)
System:
Tu es un assistant d'entreprise. Réponds UNIQUEMENT à partir des passages fournis.
Si l'info manque, réponds "Je ne sais pas" et propose une piste.
Guidelines:
- Ton neutre, concis. Citer les sources sous forme [source_id].
- Si plusieurs sources, fusionner les infos sans contradictions.
User (lang={{lang}}):
{{question}}
Passages:
{{#each passages}}
- [{{this.source_id}}] {{this.text}}
{{/each}}
Réponse (lang={{lang}}) avec citations:
Toujours imposer les citations dans le format attendu (ex: [conf:ENG:1234]).
Prompt Builder (pseudo)
# prompt.py
def build_prompt(q, lang, top):
ctx = "\n".join([f"- [{p.meta.get('source_id', p.id)}] {p.text}" for p in top])
return f"System: ...\nUser (lang={lang}): {q}\nPassages:\n{ctx}\nRéponse (lang={lang}) avec citations:"Appel LLM
# llm.py
async def llm_generate(prompt, temperature=0.2, max_tokens=400):
return await provider.completions(prompt, temperature=temperature, max_tokens=max_tokens)Self-check & citations
# post_process.py
def post_process(answer, passages, q):
# 1) vérifier présence de citations [source_id]
if "[" not in answer or "]" not in answer:
answer += "\n\nSources: " + ", ".join({p.meta.get("source_id", p.id) for p in passages})
# 2) mini self-check consistence
if not looks_consistent(answer, passages):
answer = soften_claims(answer) # utiliser un ton prudent
return answer, passagesFormats & contraintes
- Style contraint (neutre, pas dâaffirmations non sourcĂ©es).
- RĂ©ponses JSON (si besoin) â valider le schĂ©ma.
- Streaming facultatif pour UX.
Politiques
- Hors périmÚtre : si aucune source pertinente, refuser.
- PII leak : bloquer les demandes dâexfiltration.
- Jailbreak : refuser prompts malveillants.
- Sécurité : interdire code exécutable dangereux.
Toujours retourner un message sûr et loguer la catégorie de refus.
Classification & refus (pseudo)
# guardrails.py
def apply_guardrails(q, answer, passages, lang="fr"):
cls = classify_intent(q) # {"category":"ok|oos|pii|jailbreak|security"}
if cls["category"] != "ok":
return safe_refusal(cls["category"], lang)
return answer
def safe_refusal(cat, lang):
msg = {
"oos": "Désolé, cette question est hors périmÚtre de mes sources.",
"pii": "Je ne peux pas divulguer d'informations personnelles.",
"jailbreak": "Je ne peux pas répondre à cette demande.",
"security": "Je ne peux pas fournir ce type de contenu."
}.get(cat, "Je ne peux pas répondre à cette demande.")
return msgLogs structurés (JSON)
# logging.py
def log_trace(req, passages, answer):
trace = {
"trace_id": uuid4().hex,
"user_id": req.user_id,
"q": req.query, "lang": req.lang,
"retrieval": {"k": len(passages)+15, "top": [p.id for p in passages]},
"latency_ms": {"retrieve": 120, "rerank": 60, "llm": 540},
"cost_eur": 0.006, "guardrails": "ok",
"has_citations": "[" in answer and "]" in answer
}
write_json(trace)
return SimpleNamespace(id=trace["trace_id"], metrics={"cost": trace["cost_eur"]})Métriques & OTel
- Spans : retrieve, rerank, prompt, llm, post.
- KPIs : hit@5 (proxy), P50/P95 latence, coût/req, refus guardrails.
- Rate limit & cache (LFU 5â10 min) pour requĂȘtes similaires.
# cache.py
def cached_answer(key): ...
def set_cache(key, value, ttl=300): ...