Project Oxygen & Ideo-LabIDEO LAB Dashboard 2026

Toolbox Dev — Ideo-Lab

PipeFlow — Pipelines fonctionnels typés (lazy) pour Python

Map / Filter / Flatten / GroupBy / Unique / Reduce — sans dépendances, en restant “Pythonic”.

v0.1.0  •  Python 3.10+

Pourquoi PipeFlow ?

PipeFlow permet d’écrire des traitements sur des listes, générateurs ou flux de données sous forme de pipeline lisible, chaînable et lazy. L’objectif est simple : éviter les “soupes” de comprehensions, de boucles imbriquées et de variables temporaires.

Lazy signifie : les opérations map()/filter()/take()/drop() ne calculent rien tant que tu ne demandes pas un résultat (ex: to_list()). C’est idéal pour les gros volumes ou les flux.

Enchaînement lisible

Même pipeline, lisible comme une phrase : filtrer → transformer → dédupliquer → limiter.

from pipeflow import pipe

out = (
  pipe([1,2,3,4,5])
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x * 10)
    .to_list()
)
Résultat →[20, 40]

Zéro dépendance

PipeFlow reste minimaliste : pas de pandas, pas de framework. Parfait pour les scripts, cron, management commands, parsing de logs, ETL léger.

mapfilterflattenunique group_bychunkreduce

Matérialisation

Certaines opérations matérialisent (donc chargent en mémoire) : sort(), group_by(), to_list(), peek().

LazyMatérialise
map, filter, take, drop, chunk, flatten, tapsort, group_by, to_list, to_set, peek, reduce

Quand l’utiliser ?

  • Pré-traitement d’objets Python (list/generator) avant DB / API / export CSV
  • Traitement de logs (filtrer, extraire, dédupliquer, regrouper)
  • Nettoyage de payloads JSON (active users, champs, normalisations)
  • Pipelines “métier” courts : lisible, testable, réutilisable

Installation (.whl)

PipeFlow est livré en wheel pour une installation rapide dans un venv. L’URL ci-dessous sert au téléchargement ; côté serveur, pip préfère un chemin fichier.

Télécharger le Wheel (pipeflow v0.1.0)

/static/toolbox/pipeflow-0.1.0-py3-none-any.whl

Commande d’installation

# venv activé
pip install /chemin/local/pipeflow-0.1.0-py3-none-any.whl

Test rapide (sanity check)

python -c "from pipeflow import pipe; print(pipe([1,2,3,4]).filter(lambda x:x%2==0).map(lambda x:x*10).to_list())"
Résultat →[20, 40]
Astuce prod : si tu déploies sur un serveur (Django), installe le wheel dans ton venv système (ou celui du service systemd) et versionne le wheel dans ton stockage statique / artefacts.

Exemples pratiques (avec résultats)

1) Nettoyage de données (dict) : filter + pluck + map

Cas typique : extraire les emails actifs, normaliser en lowercase.

from pipeflow import pipe

users = [
  {"email":"A@X.com","active":True},
  {"email":"B@X.com","active":False},
  {"email":"A@X.com","active":True},
]

emails = (
  pipe(users)
    .filter(lambda u: u["active"])
    .pluck("email")
    .map(str.lower)
    .unique()
    .to_list()
)

print(emails)
Résultat →["a@x.com"]

2) Flatten / FlatMap

Flatten : 1 niveau. FlatMap : transformer puis aplatir.

from pipeflow import pipe

print(pipe([[1,2],[3],[4,5]]).flatten().to_list())
print(pipe([1,2,3]).flat_map(lambda x: [x, x]).to_list())
Résultat →[1,2,3,4,5]  |  [1,1,2,2,3,3]

3) Unique stable (conserve le 1er)

Important : stable = l’ordre original est respecté.

from pipeflow import pipe

print(pipe([3,1,3,2,1]).unique().to_list())
Résultat →[3,1,2]

4) GroupBy (regroupement)

Regrouper par longueur de string (clé = len).

from pipeflow import pipe

g = pipe(["aa","b","ccc","dd"]).group_by(len)
print(g)
Résultat →{2:["aa","dd"], 1:["b"], 3:["ccc"]}

5) Pagination / batch : chunk()

Découper un flux en “lots” (useful pour appels API / DB batch / jobs).

from pipeflow import pipe

print(pipe(range(10)).chunk(3).to_list())
Résultat →[[0,1,2],[3,4,5],[6,7,8],[9]]

6) take() / drop() : fenêtrage simple

from pipeflow import pipe

print(pipe(range(100)).drop(10).take(5).to_list())
Résultat →[10, 11, 12, 13, 14]

7) sort() (matérialise) + take()

Tri + top N : attention, sort charge en mémoire.

from pipeflow import pipe

scores = [("alice", 10), ("bob", 99), ("chris", 42)]
top = (
  pipe(scores)
    .sort(key=lambda x: x[1], reverse=True)
    .take(2)
    .to_list()
)
print(top)
Résultat →[("bob",99), ("chris",42)]

8) reduce() : agrégat

Somme, concat, etc. (avec ou sans initial).

from pipeflow import pipe

print(pipe([1,2,3]).reduce(lambda a,b: a+b))
print(pipe([]).reduce(lambda a,b: a+b, 0))
Résultat →6  |  0

Avancé & Bonnes pratiques

Cas réels (API / JSON / Logs)

Exemple : tu as une liste d’events (logs/JSON). Tu veux les erreurs, messages uniques, puis limiter.

from pipeflow import pipe

events = [
  {"status":"OK", "message":"health"},
  {"status":"ERROR", "message":"timeout"},
  {"status":"ERROR", "message":"timeout"},
  {"status":"ERROR", "message":"db down"},
]

out = (
  pipe(events)
    .filter(lambda e: e["status"] == "ERROR")
    .pluck("message")
    .unique()
    .take(10)
    .to_list()
)

print(out)
Résultat →["timeout", "db down"]

Django / ORM : “attention lazy”

Si tu passes un QuerySet Django dans pipe(qs), il est évalué au moment où tu matérialises (to_list, group_by, sort, etc.). C’est souvent OK, mais garde en tête :
  • sort() va charger tous les éléments
  • group_by() construit un dict complet (mémoire)
Pour gros volumes : utilise pagination DB (limit/offset) ou streaming + chunk().

Debug : tap() / peek()

tap() injecte un effet de bord (log) sans casser la chaîne. peek() affiche les N premiers éléments.

from pipeflow import pipe

data = [1,2,3,4,5]
out = (
  pipe(data)
    .tap(lambda x: print("seen:", x))
    .filter(lambda x: x > 2)
    .peek(2)
)

print(out)
Résultat →[3, 4]

tee() : duplication (avec buffer)

tee() duplique un flux en 2. Mais itertools.tee bufferise si un côté consomme plus vite. Évite sur des flux infinis ou énormes sans contrôle.

Cheat sheet

BesoinPipeline
Nettoyage emails actifspipe(users).filter(...).pluck("email").map(str.lower).unique().to_list()
Top N par scorepipe(items).sort(key=..., reverse=True).take(N).to_list()
Traitement batch APIpipe(records).chunk(100).map(call_api).to_list()
Recherche 1er matchpipe(xs).find(pred, default=None)
Aplatir listespipe(list_of_lists).flatten().to_list()

Performance : quand c’est utile ?

PipeFlow vise la lisibilité et la composabilité. Sur des micro-cas, une list comprehension peut être plus rapide. Sur des traitements “métier” avec 6–10 étapes, PipeFlow réduit la complexité cognitive et les bugs. Sur gros flux, le côté lazy évite des listes intermédiaires.