Pourquoi PipeFlow ?
PipeFlow permet d’écrire des traitements sur des listes, générateurs ou flux de données sous forme de pipeline lisible, chaînable et lazy. L’objectif est simple : éviter les “soupes” de comprehensions, de boucles imbriquées et de variables temporaires.
map()/filter()/take()/drop() ne calculent rien tant que tu ne demandes pas un résultat (ex: to_list()). C’est idéal pour les gros volumes ou les flux.Enchaînement lisible
Même pipeline, lisible comme une phrase : filtrer → transformer → dédupliquer → limiter.
from pipeflow import pipe
out = (
pipe([1,2,3,4,5])
.filter(lambda x: x % 2 == 0)
.map(lambda x: x * 10)
.to_list()
)Zéro dépendance
PipeFlow reste minimaliste : pas de pandas, pas de framework. Parfait pour les scripts, cron, management commands, parsing de logs, ETL léger.
Matérialisation
Certaines opérations matérialisent (donc chargent en mémoire) : sort(), group_by(), to_list(), peek().
| Lazy | Matérialise |
|---|---|
| map, filter, take, drop, chunk, flatten, tap | sort, group_by, to_list, to_set, peek, reduce |
Quand l’utiliser ?
- Pré-traitement d’objets Python (list/generator) avant DB / API / export CSV
- Traitement de logs (filtrer, extraire, dédupliquer, regrouper)
- Nettoyage de payloads JSON (active users, champs, normalisations)
- Pipelines “métier” courts : lisible, testable, réutilisable
Installation (.whl)
PipeFlow est livré en wheel pour une installation rapide dans un venv. L’URL ci-dessous sert au téléchargement ; côté serveur, pip préfère un chemin fichier.
/static/toolbox/pipeflow-0.1.0-py3-none-any.whl
Commande d’installation
# venv activé
pip install /chemin/local/pipeflow-0.1.0-py3-none-any.whlTest rapide (sanity check)
python -c "from pipeflow import pipe; print(pipe([1,2,3,4]).filter(lambda x:x%2==0).map(lambda x:x*10).to_list())"Exemples pratiques (avec résultats)
1) Nettoyage de données (dict) : filter + pluck + map
Cas typique : extraire les emails actifs, normaliser en lowercase.
from pipeflow import pipe
users = [
{"email":"A@X.com","active":True},
{"email":"B@X.com","active":False},
{"email":"A@X.com","active":True},
]
emails = (
pipe(users)
.filter(lambda u: u["active"])
.pluck("email")
.map(str.lower)
.unique()
.to_list()
)
print(emails)2) Flatten / FlatMap
Flatten : 1 niveau. FlatMap : transformer puis aplatir.
from pipeflow import pipe
print(pipe([[1,2],[3],[4,5]]).flatten().to_list())
print(pipe([1,2,3]).flat_map(lambda x: [x, x]).to_list())3) Unique stable (conserve le 1er)
Important : stable = l’ordre original est respecté.
from pipeflow import pipe
print(pipe([3,1,3,2,1]).unique().to_list())4) GroupBy (regroupement)
Regrouper par longueur de string (clé = len).
from pipeflow import pipe
g = pipe(["aa","b","ccc","dd"]).group_by(len)
print(g)5) Pagination / batch : chunk()
Découper un flux en “lots” (useful pour appels API / DB batch / jobs).
from pipeflow import pipe
print(pipe(range(10)).chunk(3).to_list())6) take() / drop() : fenêtrage simple
from pipeflow import pipe
print(pipe(range(100)).drop(10).take(5).to_list())7) sort() (matérialise) + take()
Tri + top N : attention, sort charge en mémoire.
from pipeflow import pipe
scores = [("alice", 10), ("bob", 99), ("chris", 42)]
top = (
pipe(scores)
.sort(key=lambda x: x[1], reverse=True)
.take(2)
.to_list()
)
print(top)8) reduce() : agrégat
Somme, concat, etc. (avec ou sans initial).
from pipeflow import pipe
print(pipe([1,2,3]).reduce(lambda a,b: a+b))
print(pipe([]).reduce(lambda a,b: a+b, 0))Avancé & Bonnes pratiques
Cas réels (API / JSON / Logs)
Exemple : tu as une liste d’events (logs/JSON). Tu veux les erreurs, messages uniques, puis limiter.
from pipeflow import pipe
events = [
{"status":"OK", "message":"health"},
{"status":"ERROR", "message":"timeout"},
{"status":"ERROR", "message":"timeout"},
{"status":"ERROR", "message":"db down"},
]
out = (
pipe(events)
.filter(lambda e: e["status"] == "ERROR")
.pluck("message")
.unique()
.take(10)
.to_list()
)
print(out)Django / ORM : “attention lazy”
pipe(qs), il est évalué au moment où tu matérialises (to_list, group_by, sort, etc.). C’est souvent OK, mais garde en tête :sort()va charger tous les élémentsgroup_by()construit un dict complet (mémoire)
chunk().Debug : tap() / peek()
tap() injecte un effet de bord (log) sans casser la chaîne. peek() affiche les N premiers éléments.
from pipeflow import pipe
data = [1,2,3,4,5]
out = (
pipe(data)
.tap(lambda x: print("seen:", x))
.filter(lambda x: x > 2)
.peek(2)
)
print(out)tee() : duplication (avec buffer)
itertools.tee bufferise si un côté consomme plus vite. Évite sur des flux infinis ou énormes sans contrôle.Cheat sheet
| Besoin | Pipeline |
|---|---|
| Nettoyage emails actifs | pipe(users).filter(...).pluck("email").map(str.lower).unique().to_list() |
| Top N par score | pipe(items).sort(key=..., reverse=True).take(N).to_list() |
| Traitement batch API | pipe(records).chunk(100).map(call_api).to_list() |
| Recherche 1er match | pipe(xs).find(pred, default=None) |
| Aplatir listes | pipe(list_of_lists).flatten().to_list() |
