Construire un pipeline MLOps complet : du prototype à la production
Résumé rapide
Sommaire Introduction : pourquoi le MLOps est devenu indispensable Les fondamentaux du MLOps Architecture complète d'un pipeline MLOps MLflow : tracking d'expériences et model registry DVC : le versioning des données et des modèles Feature Stores : centraliser l'ingénierie des features CI/CD pour le Machine Learning Monitoring et détection de drift avec Evidently Infrastructure et conteneurisation avec Docker Pipeline complet : tout assembler Bonnes pratiques et anti-patterns Conclusion Sources 💡...
Sommaire
- Introduction : pourquoi le MLOps est devenu indispensable
- Les fondamentaux du MLOps
- Architecture complète d’un pipeline MLOps
- MLflow : tracking d’expériences et model registry
- DVC : le versioning des données et des modèles
- Feature Stores : centraliser l’ingénierie des features
- CI/CD pour le Machine Learning
- Monitoring et détection de drift avec Evidently
- Infrastructure et conteneurisation avec Docker
- Pipeline complet : tout assembler
- Bonnes pratiques et anti-patterns
- Conclusion
- Sources
Cet article est de niveau avancé. Il suppose une connaissance solide de Python, des bases du Machine Learning (entraînement, validation, déploiement), de Git, et une familiarité avec Docker et les environnements cloud. Si vous débutez en ML, nous vous recommandons de commencer par nos articles d’introduction au Machine Learning avant de continuer ici.
Introduction : pourquoi le MLOps est devenu indispensable
Vous avez entraîné un modèle de Machine Learning dans un notebook Jupyter. Il fonctionne brillamment sur vos données de test. Votre démonstration a impressionné les stakeholders. Félicitations — vous avez fait 10 % du travail.
Les 90 % restants ? Mettre ce modèle en production, le maintenir, le monitorer, le ré-entraîner quand les données changent, gérer les versions, automatiser le pipeline, assurer la reproductibilité, documenter le tout. C’est ce que l’on appelle le MLOps (Machine Learning Operations), et c’est le gouffre dans lequel 87 % des projets de Machine Learning tombent et meurent, selon Gartner.
Le MLOps est né de la convergence de trois disciplines : le Machine Learning (modélisation, entraînement, évaluation), le DevOps (CI/CD, infrastructure as code, monitoring) et le Data Engineering (pipelines de données, qualité, gouvernance). L’objectif est simple à énoncer mais complexe à réaliser : rendre le cycle de vie complet d’un modèle ML aussi fiable, reproductible et automatisé que le cycle de vie d’une application logicielle classique.
Dans cet article, nous allons construire ensemble un pipeline MLOps complet, en utilisant les outils les plus matures de l’écosystème : MLflow pour le tracking d’expériences et le model registry, DVC pour le versioning des données, Evidently pour le monitoring de drift, Docker pour la conteneurisation, et GitHub Actions pour le CI/CD. À la fin de cet article, vous aurez une architecture de référence que vous pourrez adapter à vos propres projets.
Les fondamentaux du MLOps
Le cycle de vie d’un modèle ML
Un modèle ML en production passe par plusieurs phases, chacune avec ses propres défis :
- Exploration et prototypage : expérimentation rapide avec différents algorithmes, features et hyperparamètres. C’est la phase « notebook », souvent chaotique mais créative.
- Développement et validation : formalisation du pipeline d’entraînement, tests rigoureux, validation sur des données représentatives, documentation.
- Déploiement : mise en production du modèle, intégration aux systèmes existants, tests de charge, rollout progressif.
- Monitoring : surveillance des performances en production, détection de drift, alertes automatiques.
- Ré-entraînement : quand les performances se dégradent ou que de nouvelles données sont disponibles, le modèle doit être ré-entraîné, validé et redéployé.
Le MLOps vise à automatiser et fiabiliser chacune de ces phases, et surtout les transitions entre elles. Sans MLOps, ces transitions sont manuelles, fragiles et non reproductibles — exactement les conditions qui font échouer les projets ML.
Les niveaux de maturité MLOps
Google a proposé une classification en trois niveaux de maturité MLOps qui est devenue une référence dans l’industrie :
- Niveau 0 — Manuel : tout est fait manuellement. L’entraînement se fait dans des notebooks, le déploiement est ad hoc (copier-coller de fichiers de modèle), il n’y a pas de monitoring. C’est le niveau de la plupart des prototypes et des POC.
- Niveau 1 — Pipeline ML automatisé : le pipeline d’entraînement est automatisé et reproductible. Le déploiement peut être semi-automatisé. Le monitoring existe mais est basique. C’est le niveau que la plupart des équipes devraient viser comme premier objectif.
- Niveau 2 — CI/CD pour ML : le pipeline complet est automatisé, de l’ingestion des données au déploiement et au monitoring. Le ré-entraînement est déclenché automatiquement quand nécessaire. Les tests couvrent les données, le modèle et l’infrastructure. C’est le niveau des organisations les plus matures.
Dans cet article, nous allons construire un système de Niveau 2. C’est ambitieux, mais c’est la bonne cible — les équipes qui s’arrêtent au Niveau 0 finissent presque toujours par accumuler de la dette technique ML qui devient ingérable.
Architecture complète d’un pipeline MLOps
Voici l’architecture de référence que nous allons construire. Chaque composant sera détaillé dans les sections suivantes.
┌─────────────────────────────────────────────────────────────────┐
│ ARCHITECTURE MLOPS COMPLÈTE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Source │───▶│ Feature │───▶│Training │───▶│ Model │ │
│ │ Data │ │ Store │ │Pipeline │ │ Registry │ │
│ │ (DVC) │ │ (Feast) │ │(MLflow) │ │(MLflow) │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │
│ │ ┌──────────┐ │ │
│ │ │ CI/CD │ │ │
│ └─────────────▶│ (GitHub │◀────────────────────┘ │
│ │ Actions) │ │
│ └────┬─────┘ │
│ │ │
│ ┌────▼─────┐ ┌──────────┐ │
│ │ Deploy │───▶│Monitoring│ │
│ │ (Docker/ │ │(Evidently│ │
│ │ K8s) │ │ + Alerts)│ │
│ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘Les composants principaux :
- DVC (Data Version Control) : versionne les données et les artefacts de manière intégrée à Git.
- Feature Store (Feast) : centralise et sert les features pour l’entraînement et l’inférence.
- MLflow : tracke les expériences, gère le registre de modèles et sert les modèles.
- GitHub Actions : orchestre le CI/CD pour le code, les données et les modèles.
- Docker : conteneurise les pipelines et les services pour la portabilité et la reproductibilité.
- Evidently : monitore la qualité des données et les performances des modèles en production.
MLflow : tracking d’expériences et model registry
Pourquoi MLflow ?
MLflow est le standard de facto pour le tracking d’expériences ML. Créé par Databricks et open source, il résout un problème fondamental : comment garder une trace de toutes les expériences (hyperparamètres, métriques, artefacts, code) de manière organisée et requêtable ? Sans un tel outil, les data scientists finissent invariablement par nommer leurs modèles model_v2_final_FINAL_vraimentfinal.pkl — et tout le monde sait comment cette histoire se termine.
Installation et configuration
# Installation
pip install mlflow[extras]
# Lancer le serveur MLflow avec un backend PostgreSQL
mlflow server \
--backend-store-uri postgresql://user:pass@localhost:5432/mlflow \
--default-artifact-root s3://my-bucket/mlflow-artifacts \
--host 0.0.0.0 \
--port 5000En production, ne stockez jamais les artefacts sur le filesystem local du serveur MLflow. Utilisez un stockage objet (S3, GCS, Azure Blob) pour la durabilité et la scalabilité. Pour le backend store, utilisez PostgreSQL ou MySQL — SQLite convient pour le développement mais pas pour la production multi-utilisateurs.
Tracking d’expériences
Le tracking est le cœur de MLflow. Chaque « run » capture l’ensemble du contexte d’une expérience :
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
import pandas as pd
# Configuration du tracking server
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("fraud-detection-v2")
# Chargement des données
df = pd.read_parquet("data/processed/transactions.parquet")
X = df.drop(columns=["is_fraud"])
y = df["is_fraud"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Entraînement avec tracking MLflow
with mlflow.start_run(run_name="rf-baseline") as run:
# Log des paramètres
params = {
"n_estimators": 200,
"max_depth": 15,
"min_samples_split": 5,
"class_weight": "balanced",
"random_state": 42
}
mlflow.log_params(params)
# Log des métadonnées du dataset
mlflow.log_param("dataset_size", len(df))
mlflow.log_param("feature_count", X.shape[1])
mlflow.log_param("fraud_ratio", y.mean())
# Entraînement
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# Évaluation
y_pred = model.predict(X_test)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"f1_score": f1_score(y_test, y_pred),
"precision": precision_score(y_test, y_pred),
"recall": recall_score(y_test, y_pred),
}
mlflow.log_metrics(metrics)
# Log du modèle avec signature
from mlflow.models import infer_signature
signature = infer_signature(X_train, model.predict(X_train))
mlflow.sklearn.log_model(
model,
"model",
signature=signature,
registered_model_name="fraud-detector"
)
# Log d'artefacts supplémentaires
# Feature importance plot
import matplotlib.pyplot as plt
import numpy as np
importances = model.feature_importances_
indices = np.argsort(importances)[-20:]
fig, ax = plt.subplots(figsize=(10, 8))
ax.barh(range(len(indices)), importances[indices])
ax.set_yticks(range(len(indices)))
ax.set_yticklabels(X.columns[indices])
ax.set_title("Top 20 Feature Importances")
fig.tight_layout()
fig.savefig("feature_importance.png", dpi=150)
mlflow.log_artifact("feature_importance.png")
print(f"Run ID: {run.info.run_id}")
print(f"Metrics: {metrics}")Model Registry
Le Model Registry de MLflow est l’endroit où les modèles passent du stade « expérimentation » au stade « production ». Il fournit un workflow de promotion de modèles avec des étapes (Staging, Production, Archived) et un historique complet des versions :
from mlflow import MlflowClient
client = MlflowClient()
# Récupérer la dernière version du modèle
latest_version = client.get_latest_versions(
"fraud-detector", stages=["None"]
)[0]
# Promouvoir en staging pour validation
client.transition_model_version_stage(
name="fraud-detector",
version=latest_version.version,
stage="Staging",
archive_existing_versions=False
)
# Après validation, promouvoir en production
client.transition_model_version_stage(
name="fraud-detector",
version=latest_version.version,
stage="Production",
archive_existing_versions=True # Archive l'ancienne version prod
)
# Charger le modèle de production pour l'inférence
import mlflow.pyfunc
model = mlflow.pyfunc.load_model("models:/fraud-detector/Production")
predictions = model.predict(new_data)Comparaison d’expériences
L’un des grands avantages de MLflow est la possibilité de comparer systématiquement les expériences. Voici un script utilitaire pour comparer les N meilleurs runs d’une expérience :
import mlflow
import pandas as pd
# Rechercher les meilleurs runs
experiment = mlflow.get_experiment_by_name("fraud-detection-v2")
runs = mlflow.search_runs(
experiment_ids=[experiment.experiment_id],
order_by=["metrics.f1_score DESC"],
max_results=10
)
# Afficher la comparaison
comparison = runs[[
"run_id",
"params.n_estimators",
"params.max_depth",
"metrics.accuracy",
"metrics.f1_score",
"metrics.precision",
"metrics.recall",
"start_time"
]]
print(comparison.to_string(index=False))DVC : le versioning des données et des modèles
Le problème que DVC résout
Git est parfait pour versionner du code, mais il n’est pas conçu pour des fichiers volumineux (datasets, modèles entraînés, artefacts). DVC (Data Version Control) étend Git pour gérer ces fichiers lourds de manière transparente. Concrètement, DVC stocke les fichiers volumineux dans un stockage distant (S3, GCS, Azure, NFS) et ne garde dans Git que des fichiers de métadonnées légers (les fichiers .dvc) qui pointent vers les versions correctes.
Installation et initialisation
# Installation
pip install dvc[s3] # Ajoutez le backend de votre choix
# Initialisation dans un repo Git existant
cd my-ml-project
dvc init
# Configurer le stockage distant
dvc remote add -d myremote s3://my-bucket/dvc-storage
dvc remote modify myremote region eu-west-1
# Committer la configuration
git add .dvc .dvcignore
git commit -m "Initialize DVC with S3 remote"Versionner les données
# Ajouter un dataset au tracking DVC
dvc add data/raw/transactions_2025.csv
# DVC crée un fichier .dvc (métadonnées) et ajoute le fichier original au .gitignore
# Committer les métadonnées dans Git
git add data/raw/transactions_2025.csv.dvc data/raw/.gitignore
git commit -m "Add transactions dataset v1"
# Pousser les données vers le stockage distant
dvc push
# Quand le dataset est mis à jour, simplement :
dvc add data/raw/transactions_2025.csv
git add data/raw/transactions_2025.csv.dvc
git commit -m "Update transactions dataset v2"
dvc push
# Pour revenir à une version précédente :
git checkout HEAD~1 -- data/raw/transactions_2025.csv.dvc
dvc checkoutPipelines DVC
DVC permet aussi de définir des pipelines reproductibles via un fichier dvc.yaml. C’est l’un de ses features les plus puissants :
# dvc.yaml
stages:
prepare:
cmd: python src/prepare.py
deps:
- src/prepare.py
- data/raw/transactions_2025.csv
params:
- prepare.test_size
- prepare.random_state
outs:
- data/processed/train.parquet
- data/processed/test.parquet
featurize:
cmd: python src/featurize.py
deps:
- src/featurize.py
- data/processed/train.parquet
- data/processed/test.parquet
params:
- featurize.features
outs:
- data/features/train_features.parquet
- data/features/test_features.parquet
train:
cmd: python src/train.py
deps:
- src/train.py
- data/features/train_features.parquet
params:
- train.n_estimators
- train.max_depth
- train.learning_rate
outs:
- models/model.pkl
metrics:
- metrics/train_metrics.json:
cache: false
evaluate:
cmd: python src/evaluate.py
deps:
- src/evaluate.py
- models/model.pkl
- data/features/test_features.parquet
metrics:
- metrics/eval_metrics.json:
cache: false
plots:
- metrics/confusion_matrix.csv:
x: predicted
y: actual
- metrics/roc_curve.csv:
x: fpr
y: tpr# params.yaml — fichier de paramètres centralisé
prepare:
test_size: 0.2
random_state: 42
featurize:
features:
- amount
- hour_of_day
- day_of_week
- merchant_category
- distance_from_home
- transaction_frequency_1h
- avg_amount_30d
train:
n_estimators: 200
max_depth: 15
learning_rate: 0.1# Exécuter le pipeline complet
dvc repro
# DVC détecte automatiquement quelles étapes doivent être ré-exécutées
# en fonction des changements dans les dépendances
# Comparer les métriques entre branches/commits
dvc metrics diff
dvc plots diffL’un des patterns les plus puissants de DVC est qu’un commit Git donné pointe toujours vers les mêmes données, les mêmes paramètres et les mêmes résultats. Quand vous faites
git checkout v1.2.0 && dvc checkout, vous obtenez exactement l’état du projet (code + données + modèle + métriques) tel qu’il était à cette version. C’est la reproductibilité totale.Feature Stores : centraliser l’ingénierie des features
Pourquoi un Feature Store ?
L’ingénierie des features (la transformation des données brutes en variables prédictives pour le modèle) représente typiquement 60 à 80 % du travail d’un projet ML. Sans Feature Store, les équipes réinventent constamment la roue : chaque data scientist calcule ses propres features, souvent de manière incohérente entre l’entraînement et l’inférence (le fameux « training-serving skew »).
Un Feature Store centralise la définition, le calcul, le stockage et le service des features, garantissant la cohérence entre l’entraînement (batch) et l’inférence (temps réel).
Feast : le Feature Store open source
Feast (Feature Store) est le Feature Store open source le plus populaire. Voici comment l’intégrer dans notre pipeline :
# feature_repo/feature_definitions.py
from feast import Entity, Feature, FeatureView, ValueType
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
PostgreSQLSource,
)
from datetime import timedelta
# Entité principale
customer = Entity(
name="customer_id",
value_type=ValueType.STRING,
description="Identifiant client unique"
)
# Source de données
transactions_source = PostgreSQLSource(
name="transactions_source",
query="""
SELECT
customer_id,
avg_transaction_amount_30d,
transaction_count_7d,
max_transaction_amount_24h,
unique_merchants_30d,
avg_time_between_transactions,
event_timestamp,
created_timestamp
FROM feature_transactions
""",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp",
)
# Feature View
customer_transaction_features = FeatureView(
name="customer_transaction_features",
entities=[customer],
ttl=timedelta(days=90),
schema=[
Feature(name="avg_transaction_amount_30d", dtype=ValueType.FLOAT),
Feature(name="transaction_count_7d", dtype=ValueType.INT64),
Feature(name="max_transaction_amount_24h", dtype=ValueType.FLOAT),
Feature(name="unique_merchants_30d", dtype=ValueType.INT64),
Feature(name="avg_time_between_transactions", dtype=ValueType.FLOAT),
],
source=transactions_source,
online=True, # Disponible pour l'inférence temps réel
tags={"team": "fraud-detection", "version": "2.0"}
)# Utilisation pour l'entraînement (batch)
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path="feature_repo/")
# Créer un DataFrame d'entités avec les timestamps souhaités
entity_df = pd.DataFrame({
"customer_id": ["C001", "C002", "C003"],
"event_timestamp": pd.to_datetime(["2025-12-01", "2025-12-01", "2025-12-01"])
})
# Récupérer les features historiques (point-in-time correct)
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"customer_transaction_features:avg_transaction_amount_30d",
"customer_transaction_features:transaction_count_7d",
"customer_transaction_features:max_transaction_amount_24h",
]
).to_df()
# Utilisation pour l'inférence (temps réel)
online_features = store.get_online_features(
features=[
"customer_transaction_features:avg_transaction_amount_30d",
"customer_transaction_features:transaction_count_7d",
],
entity_rows=[{"customer_id": "C001"}]
).to_dict()CI/CD pour le Machine Learning
Les spécificités du CI/CD ML
Le CI/CD pour le ML est plus complexe que le CI/CD logiciel classique car il y a trois artefacts à tester et déployer (au lieu d’un seul) : le code, les données et le modèle. Chacun a ses propres critères de qualité et ses propres pipelines de validation.
Pipeline GitHub Actions complet
# .github/workflows/ml-pipeline.yml
name: ML Pipeline CI/CD
on:
push:
branches: [main, develop]
paths:
- 'src/**'
- 'data/**'
- 'params.yaml'
- 'dvc.yaml'
pull_request:
branches: [main]
workflow_dispatch:
inputs:
force_retrain:
description: 'Force model retraining'
required: false
default: 'false'
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
jobs:
# Job 1 : Tests du code
code-quality:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Lint
run: |
ruff check src/ tests/
mypy src/ --ignore-missing-imports
- name: Unit tests
run: pytest tests/unit/ -v --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
# Job 2 : Validation des données
data-validation:
runs-on: ubuntu-latest
needs: code-quality
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Pull data from DVC
run: |
pip install dvc[s3]
dvc pull data/processed/
- name: Validate data schema
run: python src/validate_data.py
- name: Check data quality
run: python src/check_data_quality.py
- name: Great Expectations validation
run: python src/run_expectations.py
# Job 3 : Entraînement et évaluation
train-and-evaluate:
runs-on: ubuntu-latest
needs: data-validation
if: |
github.event_name == 'push' ||
github.event.inputs.force_retrain == 'true'
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Pull data
run: dvc pull
- name: Run DVC pipeline
run: dvc repro
- name: Evaluate model
run: python src/evaluate.py --compare-with-production
- name: Upload metrics
uses: actions/upload-artifact@v4
with:
name: model-metrics
path: metrics/
# Job 4 : Déploiement (uniquement sur main)
deploy:
runs-on: ubuntu-latest
needs: train-and-evaluate
if: github.ref == 'refs/heads/main'
environment: production
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Promote model to production
run: python src/promote_model.py
- name: Build Docker image
run: |
docker build -t fraud-detector:${{ github.sha }} .
docker tag fraud-detector:${{ github.sha }} \
${{ secrets.ECR_REGISTRY }}/fraud-detector:latest
- name: Push to ECR
run: |
aws ecr get-login-password --region eu-west-1 | \
docker login --username AWS --password-stdin \
${{ secrets.ECR_REGISTRY }}
docker push ${{ secrets.ECR_REGISTRY }}/fraud-detector:latest
- name: Deploy to ECS
run: |
aws ecs update-service \
--cluster ml-production \
--service fraud-detector \
--force-new-deploymentTests spécifiques au ML
Les tests ML vont au-delà des tests unitaires classiques. Voici les catégories de tests essentiels :
# tests/test_model.py
import pytest
import numpy as np
import pandas as pd
from src.train import train_model
from src.evaluate import evaluate_model
class TestModelQuality:
"""Tests de qualité du modèle."""
def test_minimum_accuracy(self, trained_model, test_data):
"""Le modèle doit atteindre un seuil minimum de performance."""
X_test, y_test = test_data
accuracy = trained_model.score(X_test, y_test)
assert accuracy >= 0.85, f"Accuracy {accuracy:.3f} below threshold 0.85"
def test_minimum_f1_score(self, trained_model, test_data):
"""Le F1-score doit être suffisant pour la classe minoritaire."""
from sklearn.metrics import f1_score
X_test, y_test = test_data
y_pred = trained_model.predict(X_test)
f1 = f1_score(y_test, y_pred)
assert f1 >= 0.70, f"F1 score {f1:.3f} below threshold 0.70"
def test_no_regression_vs_production(self, trained_model, production_model, test_data):
"""Le nouveau modèle ne doit pas être pire que le modèle en production."""
X_test, y_test = test_data
new_f1 = f1_score(y_test, trained_model.predict(X_test))
prod_f1 = f1_score(y_test, production_model.predict(X_test))
assert new_f1 >= prod_f1 * 0.95, (
f"New model F1 ({new_f1:.3f}) is more than 5% worse "
f"than production ({prod_f1:.3f})"
)
def test_prediction_latency(self, trained_model, test_data):
"""L'inférence doit être suffisamment rapide."""
import time
X_test, _ = test_data
single_sample = X_test.iloc[[0]]
start = time.perf_counter()
for _ in range(1000):
trained_model.predict(single_sample)
elapsed = (time.perf_counter() - start) / 1000
assert elapsed < 0.01, f"Prediction latency {elapsed*1000:.1f}ms exceeds 10ms"
def test_model_fairness(self, trained_model, test_data_with_demographics):
"""Le modèle ne doit pas être discriminatoire."""
X_test, y_test, demographics = test_data_with_demographics
y_pred = trained_model.predict(X_test)
for group in demographics["gender"].unique():
mask = demographics["gender"] == group
group_accuracy = accuracy_score(y_test[mask], y_pred[mask])
overall_accuracy = accuracy_score(y_test, y_pred)
ratio = group_accuracy / overall_accuracy
assert 0.8 <= ratio <= 1.2, (
f"Fairness violation for {group}: ratio={ratio:.3f}"
)
class TestDataIntegrity:
"""Tests d'intégrité des données."""
def test_no_data_leakage(self, train_data, test_data):
"""Pas de fuite de données entre train et test."""
train_ids = set(train_data[0].index)
test_ids = set(test_data[0].index)
overlap = train_ids & test_ids
assert len(overlap) == 0, f"Data leakage: {len(overlap)} shared samples"
def test_feature_distributions(self, train_data, test_data):
"""Les distributions des features doivent être similaires."""
from scipy import stats
X_train, _ = train_data
X_test, _ = test_data
for col in X_train.select_dtypes(include=[np.number]).columns:
stat, p_value = stats.ks_2samp(X_train[col], X_test[col])
assert p_value > 0.01, (
f"Distribution shift detected in {col}: KS p-value={p_value:.4f}"
)Monitoring et détection de drift avec Evidently
Pourquoi monitorer un modèle en production ?
Un modèle ML en production se dégrade inévitablement avec le temps. Les données changent (data drift), la relation entre les features et la cible évolue (concept drift), et les patterns que le modèle a appris deviennent obsolètes. Sans monitoring, vous ne découvrez ce problème que lorsqu’un utilisateur se plaint — et à ce moment-là, le modèle a peut-être pris des milliers de mauvaises décisions.
Evidently : monitoring open source
Evidently est une bibliothèque Python open source pour le monitoring de modèles ML. Elle permet de détecter le data drift, le concept drift, et de générer des rapports visuels détaillés.
# src/monitoring/drift_detector.py
import pandas as pd
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import (
DataDriftPreset,
DataQualityPreset,
TargetDriftPreset,
ClassificationPreset
)
from evidently.test_suite import TestSuite
from evidently.tests import (
TestNumberOfColumnsWithMissingValues,
TestNumberOfRowsWithMissingValues,
TestNumberOfDriftedColumns,
TestShareOfDriftedColumns,
TestColumnDrift,
)
import json
from datetime import datetime
class ModelMonitor:
"""Moniteur de modèle ML en production."""
def __init__(self, reference_data: pd.DataFrame, column_mapping: ColumnMapping):
self.reference_data = reference_data
self.column_mapping = column_mapping
def check_data_drift(self, current_data: pd.DataFrame) -> dict:
"""Vérifie le data drift entre les données de référence et actuelles."""
report = Report(metrics=[
DataDriftPreset(),
DataQualityPreset(),
])
report.run(
reference_data=self.reference_data,
current_data=current_data,
column_mapping=self.column_mapping
)
# Extraire les résultats
result = report.as_dict()
drift_detected = result["metrics"][0]["result"]["dataset_drift"]
drift_share = result["metrics"][0]["result"]["share_of_drifted_columns"]
# Sauvegarder le rapport HTML
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report.save_html(f"reports/drift_report_{timestamp}.html")
return {
"drift_detected": drift_detected,
"drift_share": drift_share,
"timestamp": timestamp,
"n_drifted_columns": result["metrics"][0]["result"]["number_of_drifted_columns"],
"details": result
}
def run_test_suite(self, current_data: pd.DataFrame) -> dict:
"""Exécute une suite de tests automatisés sur les données."""
tests = TestSuite(tests=[
TestNumberOfColumnsWithMissingValues(lte=2),
TestNumberOfRowsWithMissingValues(lte=current_data.shape[0] * 0.05),
TestNumberOfDriftedColumns(lte=3),
TestShareOfDriftedColumns(lte=0.3),
])
tests.run(
reference_data=self.reference_data,
current_data=current_data,
column_mapping=self.column_mapping
)
result = tests.as_dict()
all_passed = all(
test["status"] == "SUCCESS"
for test in result["tests"]
)
return {
"all_passed": all_passed,
"tests": result["tests"]
}
def check_prediction_drift(
self,
reference_predictions: pd.Series,
current_predictions: pd.Series
) -> dict:
"""Vérifie si la distribution des prédictions a changé."""
from scipy import stats
stat, p_value = stats.ks_2samp(
reference_predictions,
current_predictions
)
drift_detected = p_value < 0.05
return {
"prediction_drift_detected": drift_detected,
"ks_statistic": float(stat),
"p_value": float(p_value),
"reference_mean": float(reference_predictions.mean()),
"current_mean": float(current_predictions.mean()),
}
# Script de monitoring périodique (à exécuter via cron ou Airflow)
def run_monitoring():
"""Exécute le monitoring complet et alerte si nécessaire."""
import mlflow
# Charger les données de référence (données d'entraînement)
reference = pd.read_parquet("data/reference/training_data.parquet")
# Charger les données récentes de production
current = pd.read_parquet("data/production/last_24h.parquet")
column_mapping = ColumnMapping(
target="is_fraud",
prediction="prediction",
numerical_features=[
"amount", "hour_of_day", "distance_from_home",
"transaction_frequency_1h", "avg_amount_30d"
],
categorical_features=[
"merchant_category", "day_of_week"
]
)
monitor = ModelMonitor(reference, column_mapping)
# Vérifier le data drift
drift_result = monitor.check_data_drift(current)
# Exécuter les tests
test_result = monitor.run_test_suite(current)
# Logger les résultats dans MLflow
with mlflow.start_run(run_name=f"monitoring-{datetime.now().isoformat()}"):
mlflow.log_metric("drift_share", drift_result["drift_share"])
mlflow.log_metric("n_drifted_columns", drift_result["n_drifted_columns"])
mlflow.log_metric("tests_passed", int(test_result["all_passed"]))
# Alerter si nécessaire
if drift_result["drift_detected"] or not test_result["all_passed"]:
send_alert(
message=f"⚠️ Model monitoring alert!\n"
f"Drift detected: {drift_result['drift_detected']}\n"
f"Tests passed: {test_result['all_passed']}\n"
f"Drifted columns: {drift_result['n_drifted_columns']}",
channel="ml-alerts"
)
# Déclencher un ré-entraînement automatique si le drift est sévère
if drift_result["drift_share"] > 0.5:
trigger_retraining_pipeline()
if __name__ == "__main__":
run_monitoring()Infrastructure et conteneurisation avec Docker
Dockerfile pour le service de prédiction
# Dockerfile
FROM python:3.11-slim AS base
# Metadata
LABEL maintainer="ml-team@example.com"
LABEL description="Fraud detection model serving"
# Security: non-root user
RUN groupadd -r mluser && useradd -r -g mluser mluser
# System dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
&& rm -rf /var/lib/apt/lists/*
# Python dependencies
COPY requirements-serve.txt /tmp/
RUN pip install --no-cache-dir -r /tmp/requirements-serve.txt
# Application code
WORKDIR /app
COPY src/serve/ ./serve/
COPY src/monitoring/ ./monitoring/
# Health check
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
CMD curl -f http://localhost:8080/health || exit 1
# Switch to non-root user
USER mluser
# Expose port
EXPOSE 8080
# Start the server
CMD ["uvicorn", "serve.api:app", "--host", "0.0.0.0", "--port", "8080", "--workers", "4"]API de serving avec FastAPI
# src/serve/api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
import mlflow.pyfunc
import pandas as pd
import numpy as np
import time
import logging
from typing import List, Optional
from contextlib import asynccontextmanager
logger = logging.getLogger(__name__)
# Chargement du modèle au démarrage
model = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global model
logger.info("Loading model from MLflow registry...")
model = mlflow.pyfunc.load_model("models:/fraud-detector/Production")
logger.info("Model loaded successfully")
yield
logger.info("Shutting down...")
app = FastAPI(
title="Fraud Detection API",
version="2.0.0",
lifespan=lifespan
)
class Transaction(BaseModel):
amount: float = Field(..., gt=0, description="Transaction amount")
hour_of_day: int = Field(..., ge=0, le=23)
day_of_week: int = Field(..., ge=0, le=6)
merchant_category: str
distance_from_home: float = Field(..., ge=0)
transaction_frequency_1h: int = Field(..., ge=0)
avg_amount_30d: float = Field(..., ge=0)
class PredictionResponse(BaseModel):
is_fraud: bool
fraud_probability: float
model_version: str
latency_ms: float
class BatchPredictionRequest(BaseModel):
transactions: List[Transaction]
@app.get("/health")
async def health():
return {"status": "healthy", "model_loaded": model is not None}
@app.post("/predict", response_model=PredictionResponse)
async def predict(transaction: Transaction):
if model is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.perf_counter()
# Préparer les données
input_df = pd.DataFrame([transaction.model_dump()])
# Prédiction
prediction = model.predict(input_df)
proba = model._model_impl.predict_proba(input_df)[:, 1]
latency = (time.perf_counter() - start) * 1000
# Logger pour le monitoring
logger.info(
f"Prediction: fraud={bool(prediction[0])}, "
f"proba={float(proba[0]):.4f}, "
f"latency={latency:.1f}ms"
)
return PredictionResponse(
is_fraud=bool(prediction[0]),
fraud_probability=float(proba[0]),
model_version="2.0.0",
latency_ms=round(latency, 2)
)
@app.post("/predict/batch")
async def predict_batch(request: BatchPredictionRequest):
if model is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.perf_counter()
input_df = pd.DataFrame([t.model_dump() for t in request.transactions])
predictions = model.predict(input_df)
probas = model._model_impl.predict_proba(input_df)[:, 1]
latency = (time.perf_counter() - start) * 1000
results = [
{
"index": i,
"is_fraud": bool(pred),
"fraud_probability": float(proba)
}
for i, (pred, proba) in enumerate(zip(predictions, probas))
]
return {
"predictions": results,
"count": len(results),
"latency_ms": round(latency, 2)
}Docker Compose pour l’environnement complet
# docker-compose.yml
version: '3.8'
services:
mlflow:
image: ghcr.io/mlflow/mlflow:2.10.0
ports:
- "5000:5000"
environment:
- MLFLOW_BACKEND_STORE_URI=postgresql://mlflow:mlflow@postgres:5432/mlflow
- MLFLOW_DEFAULT_ARTIFACT_ROOT=s3://mlflow-artifacts/
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
command: >
mlflow server
--host 0.0.0.0
--port 5000
depends_on:
- postgres
postgres:
image: postgres:16
environment:
POSTGRES_DB: mlflow
POSTGRES_USER: mlflow
POSTGRES_PASSWORD: mlflow
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
model-api:
build: .
ports:
- "8080:8080"
environment:
- MLFLOW_TRACKING_URI=http://mlflow:5000
depends_on:
- mlflow
deploy:
resources:
limits:
cpus: '2'
memory: 4G
monitoring:
build:
context: .
dockerfile: Dockerfile.monitoring
environment:
- MLFLOW_TRACKING_URI=http://mlflow:5000
- ALERT_WEBHOOK=${ALERT_WEBHOOK}
depends_on:
- mlflow
- postgres
volumes:
postgres_data:Pipeline complet : tout assembler
Maintenant que nous avons vu chaque composant individuellement, voici comment ils s’articulent dans un workflow réel de bout en bout :
Workflow quotidien type
- Ingestion : les nouvelles données de transactions arrivent dans la base de données source (via un pipeline ETL/ELT classique).
- Feature Engineering : Feast calcule les nouvelles features et met à jour le Feature Store (batch job quotidien).
- Monitoring : Evidently compare les données du jour avec les données de référence. Si un drift significatif est détecté, une alerte est envoyée et un ré-entraînement peut être déclenché automatiquement.
- Ré-entraînement (si nécessaire) : DVC récupère les données les plus récentes, le pipeline d’entraînement est exécuté (
dvc repro), les résultats sont trackés dans MLflow. - Validation : la suite de tests ML vérifie que le nouveau modèle est au moins aussi bon que le modèle en production.
- Déploiement : si les tests passent, le modèle est promu en production dans le Model Registry, une nouvelle image Docker est construite et déployée.
- Serving : l’API FastAPI sert les prédictions en temps réel, avec logging de chaque prédiction pour le monitoring futur.
# src/orchestrate.py — Script d'orchestration principal
"""
Orchestrateur du pipeline MLOps complet.
Peut être déclenché par cron, Airflow, ou GitHub Actions.
"""
import subprocess
import sys
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mlops-orchestrator")
def run_step(name: str, command: str) -> bool:
"""Exécute une étape du pipeline et retourne True si succès."""
logger.info(f"▶ Starting step: {name}")
start = datetime.now()
result = subprocess.run(
command, shell=True, capture_output=True, text=True
)
duration = (datetime.now() - start).total_seconds()
if result.returncode == 0:
logger.info(f"✅ {name} completed in {duration:.1f}s")
return True
else:
logger.error(f"❌ {name} failed after {duration:.1f}s")
logger.error(f"STDERR: {result.stderr}")
return False
def main():
pipeline_start = datetime.now()
logger.info(f"🚀 Pipeline started at {pipeline_start.isoformat()}")
steps = [
("Pull latest data", "dvc pull"),
("Validate data quality", "python src/validate_data.py"),
("Check for drift", "python src/monitoring/drift_detector.py"),
("Run training pipeline", "dvc repro"),
("Run model tests", "pytest tests/test_model.py -v"),
("Evaluate vs production", "python src/evaluate.py --compare-prod"),
]
for name, command in steps:
if not run_step(name, command):
logger.error(f"Pipeline failed at step: {name}")
send_notification(f"❌ MLOps pipeline failed at: {name}")
sys.exit(1)
# Si on est sur la branche main, déployer
if is_main_branch():
deploy_steps = [
("Promote model", "python src/promote_model.py"),
("Build Docker image", "docker build -t fraud-detector:latest ."),
("Deploy", "python src/deploy.py"),
]
for name, command in deploy_steps:
if not run_step(name, command):
logger.error(f"Deployment failed at step: {name}")
send_notification(f"❌ Deployment failed at: {name}")
sys.exit(1)
duration = (datetime.now() - pipeline_start).total_seconds()
logger.info(f"🎉 Pipeline completed successfully in {duration:.1f}s")
send_notification(f"✅ MLOps pipeline completed in {duration/60:.1f} min")
if __name__ == "__main__":
main()Bonnes pratiques et anti-patterns
Les bonnes pratiques essentielles
- Tout versionner : code (Git), données (DVC), modèles (MLflow), configurations (Git), infrastructure (Terraform/Pulumi). Si ce n’est pas versionné, c’est perdu.
- Automatiser dès le début : ne pas attendre d’avoir le « modèle parfait » pour mettre en place le MLOps. Commencez avec un modèle simple et un pipeline robuste — c’est infiniment plus valuable qu’un modèle complexe dans un notebook.
- Monitorer tout : pas seulement la performance du modèle, mais aussi la latence, le volume de requêtes, la distribution des données d’entrée, le taux d’erreur de l’API.
- Tester comme en logiciel classique : tests unitaires, tests d’intégration, tests de performance. Plus des tests spécifiques ML (qualité du modèle, fairness, non-régression).
- Documenter les décisions : pourquoi tel algorithme a été choisi, quels compromis ont été faits, quelles hypothèses sont faites sur les données. Utilisez les tags et les notes de MLflow pour cela.
Les anti-patterns à éviter
1. Le « notebook en production » : exécuter un notebook Jupyter en production via cron. Fragile, non testable, non reproductible. À bannir absolument.
2. Le « pickle artisanal » : sauvegarder un modèle avec pickle et le copier manuellement sur le serveur de production. Aucune traçabilité, aucune gestion de version.
3. Le « training-serving skew silencieux » : utiliser un code de feature engineering différent entre l’entraînement et l’inférence. Le modèle fonctionne bien en test mais produit des résultats incohérents en production.
4. Le « monitoring par l’absence de plaintes » : considérer que le modèle fonctionne bien parce que personne ne s’est plaint. Le drift peut dégrader silencieusement les performances pendant des mois.
5. Le « one-shot training » : entraîner le modèle une fois et ne jamais le ré-entraîner. Les données changent, le monde change, le modèle doit évoluer.
Structure de projet recommandée
my-ml-project/
├── .github/
│ └── workflows/
│ └── ml-pipeline.yml # CI/CD
├── data/
│ ├── raw/ # Données brutes (DVC-tracked)
│ ├── processed/ # Données transformées (DVC-tracked)
│ ├── features/ # Features calculées (DVC-tracked)
│ └── reference/ # Données de référence pour monitoring
├── feature_repo/ # Définitions Feast
│ ├── feature_definitions.py
│ └── feature_store.yaml
├── models/ # Modèles locaux (DVC-tracked)
├── metrics/ # Métriques d'évaluation
├── reports/ # Rapports Evidently
├── src/
│ ├── prepare.py # Préparation des données
│ ├── featurize.py # Feature engineering
│ ├── train.py # Entraînement
│ ├── evaluate.py # Évaluation
│ ├── promote_model.py # Promotion dans le registry
│ ├── validate_data.py # Validation des données
│ ├── serve/
│ │ └── api.py # API FastAPI
│ └── monitoring/
│ └── drift_detector.py # Monitoring Evidently
├── tests/
│ ├── unit/ # Tests unitaires
│ ├── integration/ # Tests d'intégration
│ └── test_model.py # Tests ML
├── notebooks/ # Exploration (NON en production)
├── dvc.yaml # Pipeline DVC
├── dvc.lock # Lock file DVC
├── params.yaml # Hyperparamètres
├── Dockerfile # Image de serving
├── docker-compose.yml # Stack locale
├── requirements.txt # Dépendances
├── requirements-dev.txt # Dépendances de développement
└── README.mdConclusion
Construire un pipeline MLOps complet n’est pas un projet trivial. C’est un investissement significatif en temps et en complexité. Mais c’est un investissement qui paie — et qui paie massivement. Les équipes qui adoptent le MLOps passent moins de temps à combattre des incendies en production, plus de temps à améliorer leurs modèles, et livrent de la valeur de manière beaucoup plus prévisible et fiable.
Les outils que nous avons présentés — MLflow, DVC, Feast, Evidently, Docker, GitHub Actions — forment un stack cohérent et mature pour construire un pipeline MLOps de niveau production. Ils sont tous open source, bien documentés, et activement maintenus par des communautés importantes.
Mais les outils ne sont que la moitié de l’équation. L’autre moitié, c’est la culture. Le MLOps est autant une question d’organisation et de processus que de technologie. Les équipes qui réussissent le mieux sont celles qui traitent le ML avec la même rigueur que le logiciel classique : revues de code (et de modèles), tests automatisés, déploiement continu, monitoring proactif, documentation systématique.
Le message final est simple : ne commencez pas par un modèle parfait dans un notebook. Commencez par un modèle simple dans un pipeline robuste, puis itérez. Le MLOps n’est pas la cerise sur le gâteau — c’est le gâteau. Le modèle n’en est que la garniture.
« Le plus grand défi du machine learning n’est pas de construire un bon modèle — c’est de le maintenir en production. Le MLOps n’est pas optionnel, c’est existentiel. Sans lui, votre modèle est un prototype qui se fait passer pour un produit. » — Chip Huyen, auteure de « Designing Machine Learning Systems »
Sources
- Google Cloud, « MLOps: Continuous delivery and automation pipelines in machine learning », Architecture documentation, 2025.
- Chip Huyen, « Designing Machine Learning Systems », O’Reilly Media, 2022 (2nd edition 2025).
- MLflow Documentation, « MLflow: A Machine Learning Lifecycle Platform », mlflow.org, 2025.
- DVC Documentation, « Data Version Control: Git for Data & Models », dvc.org, 2025.
- Evidently AI, « ML Monitoring in Production: Best Practices », evidently.ai blog, 2025.
- Feast Documentation, « Feast: Feature Store for Machine Learning », feast.dev, 2025.
- Gartner, « Top Strategic Technology Trends 2026: AI Engineering », décembre 2025.
- Sculley et al., « Hidden Technical Debt in Machine Learning Systems », NeurIPS 2015 (toujours fondamental en 2026).