2) Cartographie & Ingestion des données — RAG
Sources (HTML/MD, PDF, DOCX, Confluence, Notion, DB, S3/GCS, Git), normalisation, versioning/fraîcheur, pipeline incrémental et livrables.
Sources Normalisation Versioning & fraîcheur Livrables
Mise en Œuvre des Outils
Connecteurs, parsers, normalisation, calcul de hash/version, incrémental (Cron/Celery), et schémas SQL.
2) Cartographie & Ingestion (Jours 3–10)
Inventaire des sources
| Type | Accès | Notes |
|---|---|---|
| Fichiers (HTML/MD, PDF, DOCX) | FS / partage / S3 | OCR si scannés, tables à extraire |
| Confluence / Notion | API token | Export HTML/MD + hiérarchie |
| DB (PostgreSQL/MySQL) | Lecture seule | Vues dédiées par périmètre |
| Stockage objet (S3/GCS) | IAM / key | Prefix par projet + tags |
| Git | SSH / PAT | Docs, READMEs, wiki |
Définir pour chaque source : propriétaire, SLA de mise à jour, volumétrie estimée, langues, règles RBAC.
Modèle “source” (contrat JSON)
{
"sources":[
{"code":"pdf_repo","type":"pdf","path":"/data/pdfs","rbac":["EMPLOYEE"],"lang":["fr","en"]},
{"code":"conf_space","type":"confluence","base_url":"https://confluence.example","space":"ENG","token":"***"},
{"code":"notion_hr","type":"notion","workspace":"HR","token":"***"},
{"code":"db_hr","type":"postgres","dsn":"postgresql://ro@db/hr","table":"kb_articles"},
{"code":"s3_manuals","type":"s3","bucket":"kb-manuals","prefix":"2025/","region":"eu-west-1"},
{"code":"git_docs","type":"git","url":"git@github.com:org/docs.git","branch":"main"}
]
}Extraction & nettoyage
- Extraire texte + métadonnées: titre, auteur, date, URL/chemin.
- Nettoyage : retirer headers/footers, numéros de page, notes, doublons.
- Tables → markdown/CSV normalisé ; images → alt-text/ocr si nécessaire.
- Détection langue + normalisation unicode + trim espaces.
Mappage métadonnées
| Champ | Description |
|---|---|
| source_id | Identifiant stable de la source (ex: conf:ENG:pageId) |
| version | Numéro/version logique ou horodatage |
| hash | Hash du contenu normalisé (sha256) |
| last_modified | Datetime source |
| is_current | Flag de fraîcheur pour l’index |
| rbac | Liste de claims d’accès |
| tags | Mots-clés, business unit, langue… |
Exemple normalisation (pseudo-Python)
# normalize.py
from hashlib import sha256
def normalize(doc):
text = clean_text(doc.raw_text) # remove headers/footers/tables converties, etc.
meta = {
"title": doc.title, "author": doc.author, "date": doc.date,
"url": doc.url, "path": doc.path, "lang": detect_lang(text),
"source_id": doc.source_id, "last_modified": doc.last_modified,
"rbac": doc.rbac, "tags": doc.tags
}
h = sha256((text + "|" + str(meta)).encode("utf-8")).hexdigest()
meta["hash"] = h
return text, metaStratégie
- Stocker source_id + version + hash + last_modified + is_current.
- Incrémental : ne (ré)indexer que si hash change ou last_modified plus récent.
- Garder historique N versions (audit) ; marquer la plus récente is_current=true.
Schéma SQL (PostgreSQL)
CREATE TABLE rag_docs (
id TEXT PRIMARY KEY,
source_id TEXT NOT NULL,
version TEXT NOT NULL,
last_modified TIMESTAMP WITH TIME ZONE,
lang TEXT,
meta JSONB,
hash TEXT NOT NULL,
is_current BOOLEAN DEFAULT TRUE,
text TEXT
);
CREATE INDEX ON rag_docs (source_id, version);
CREATE INDEX ON rag_docs ((meta->>'rbac'));
CREATE INDEX ON rag_docs ((meta->>'tags'));Upsert & mise à jour
# upsert.py
def upsert_doc(db, doc_id, text, meta):
# récupérer version courante
db.execute("UPDATE rag_docs SET is_current=false WHERE source_id=%s", [meta["source_id"]])
db.execute("""
INSERT INTO rag_docs (id, source_id, version, last_modified, lang, meta, hash, is_current, text)
VALUES (%s,%s,%s,%s,%s,%s,%s,true,%s)
ON CONFLICT (id) DO UPDATE SET
last_modified=EXCLUDED.last_modified, lang=EXCLUDED.lang,
meta=EXCLUDED.meta, hash=EXCLUDED.hash, is_current=true, text=EXCLUDED.text
""", [doc_id, meta["source_id"], meta["version"], meta["last_modified"], meta["lang"], json.dumps(meta), meta["hash"], text])L’ID peut être un hash de source_id+version pour garantir l’unicité.
Livrables attendus
- Connecteurs d’ingestion (fichiers, Confluence, Notion, DB, S3/GCS, Git).
- Schéma des métadonnées & politique de versioning.
- Pipeline incrémental (Cron/Celery) + logs & tableaux de bord.
- Rapport d’inventaire des sources (SLA, volumétrie, RBAC, langues).
Contrat JSON de sortie (exemple)
{
"doc_id":"conf:ENG:1234@2025-09-15T12:00:00Z",
"source_id":"conf:ENG:1234",
"version":"2025-09-15T12:00:00Z",
"last_modified":"2025-09-15T11:58:03Z",
"lang":"fr",
"hash":"sha256:...",
"is_current":true,
"meta":{"title":"Politique SSO","author":"IT","url":"...","rbac":["EMPLOYEE"],"tags":["IT","SSO"]},
"text":"Contenu normalisé…"
}Implémentation technique – Ingestion
Parsers recommandés
- HTML/MD : BeautifulSoup / markdown-it
- PDF : pdfminer.six / pypdf + OCR (Tesseract) pour scannés
- DOCX : python-docx
Extraction (pseudo-code)
# files_ingest.py
for path in walk("/data"):
ext = path.suffix.lower()
if ext in (".html",".htm",".md"):
text, meta = parse_html_md(path)
elif ext == ".pdf":
text, meta = parse_pdf(path, ocr_if_scanned=True)
elif ext == ".docx":
text, meta = parse_docx(path)
text, meta = normalize(Document(text=text, path=str(path), **meta))
yield text, metaBonnes pratiques
- Uniformiser les titres (H1–H3) et les listes.
- Convertir tables → markdown/CSV ; conserver l’ordre de lecture.
- Remplir source_id (ex: file:/data/relatif) + last_modified FS.
Confluence
# confluence_ingest.py
def fetch_confluence(space, base_url, token):
for page in list_pages(space):
html = export_html(page.id, base_url, token)
text, meta = html_to_text_with_meta(html, title=page.title, url=page.url)
meta.update({"source_id": f"conf:{space}:{page.id}", "last_modified": page.updated})
yield normalize(Document(text=text, **meta))Notion
# notion_ingest.py
def fetch_notion(db_id, token):
for npage in query_database(db_id):
md = notion_to_markdown(npage)
text, meta = md_to_text_with_meta(md, title=npage.title, url=npage.url)
meta.update({"source_id": f"notion:{db_id}:{npage.id}", "last_modified": npage.updated})
yield normalize(Document(text=text, **meta))Lecture seule, vues dédiées
-- Exemple PostgreSQL
CREATE VIEW kb_articles_v AS
SELECT id, title, body AS text, author, updated_at AS last_modified, lang, rbac, array['KB']::text[] AS tags
FROM kb_articles
WHERE published = true;Extraction Python
# db_ingest.py
for row in db.query("SELECT * FROM kb_articles_v"):
text = row["text"]
meta = {k: row[k] for k in ("title","author","last_modified","lang")}
meta.update({"source_id": f"db:kb:{row['id']}", "rbac": row["rbac"], "tags": row["tags"]})
yield normalize(Document(text=text, **meta))Listing & métadonnées
# s3_ingest.py
for obj in s3.list_objects(bucket, prefix):
body = s3.get_object_text(bucket, obj.key)
text, meta = parse_auto(body, key=obj.key) # détecte type
meta.update({"source_id": f"s3:{bucket}:{obj.key}", "last_modified": obj.last_modified})
yield normalize(Document(text=text, **meta))Conseils
- Utiliser des prefix par projet pour limiter le scope.
- Stocker ETag/VersionId dans version si activé.
Extraction
# git_ingest.py
for file in git_ls(repo="git@github.com:org/docs.git", branch="main", paths=["docs","wiki"]):
text, meta = parse_textual(file.path, file.content)
meta.update({"source_id": f"git:{file.commit[:7]}:{file.path}", "version": file.commit, "last_modified": file.timestamp})
yield normalize(Document(text=text, **meta))Bonnes pratiques
- Ignorer binaires ; ne garder que formats textuels.
- Utiliser le SHA commit comme version.
Cron Linux (ex.)
# /etc/cron.d/rag_ingest
*/15 * * * * www-data /venv/bin/python /app/manage.py rag_ingest --incremental true --max 500Management command (squelette)
# manage.py command: rag_ingest
class Command(BaseCommand):
def add_arguments(self, p):
p.add_argument("--incremental", type=bool, default=True)
p.add_argument("--max", type=int, default=1000)
def handle(self, *args, **o):
for text, meta in ingest_all(incremental=o["incremental"], limit=o["max"]):
upsert_doc(db, make_id(meta["source_id"], meta.get("version")), text, meta)Celery (option)
# tasks.py
@app.task(rate_limit="10/s", soft_time_limit=30)
def ingest_source_task(src_conf):
for text, meta in run_connector(src_conf):
upsert_doc(db, make_id(meta["source_id"], meta.get("version")), text, meta)
# planification
@app.on_after_configure.connect
def setup_periodic(sender, **kwargs):
sender.add_periodic_task(900.0, sweep_sources.s()) # toutes les 15 min