Construire un pipeline MLOps complet : du prototype à la production

Niveau du tutoriel : Expert

Sommaire

💡 Prérequis
Cet article est de niveau avancé. Il suppose une connaissance solide de Python, des bases du Machine Learning (entraînement, validation, déploiement), de Git, et une familiarité avec Docker et les environnements cloud. Si vous débutez en ML, nous vous recommandons de commencer par nos articles d’introduction au Machine Learning avant de continuer ici.

Introduction : pourquoi le MLOps est devenu indispensable

Vous avez entraîné un modèle de Machine Learning dans un notebook Jupyter. Il fonctionne brillamment sur vos données de test. Votre démonstration a impressionné les stakeholders. Félicitations — vous avez fait 10 % du travail.

Les 90 % restants ? Mettre ce modèle en production, le maintenir, le monitorer, le ré-entraîner quand les données changent, gérer les versions, automatiser le pipeline, assurer la reproductibilité, documenter le tout. C’est ce que l’on appelle le MLOps (Machine Learning Operations), et c’est le gouffre dans lequel 87 % des projets de Machine Learning tombent et meurent, selon Gartner.

Le MLOps est né de la convergence de trois disciplines : le Machine Learning (modélisation, entraînement, évaluation), le DevOps (CI/CD, infrastructure as code, monitoring) et le Data Engineering (pipelines de données, qualité, gouvernance). L’objectif est simple à énoncer mais complexe à réaliser : rendre le cycle de vie complet d’un modèle ML aussi fiable, reproductible et automatisé que le cycle de vie d’une application logicielle classique.

Dans cet article, nous allons construire ensemble un pipeline MLOps complet, en utilisant les outils les plus matures de l’écosystème : MLflow pour le tracking d’expériences et le model registry, DVC pour le versioning des données, Evidently pour le monitoring de drift, Docker pour la conteneurisation, et GitHub Actions pour le CI/CD. À la fin de cet article, vous aurez une architecture de référence que vous pourrez adapter à vos propres projets.

Les fondamentaux du MLOps

Le cycle de vie d’un modèle ML

Un modèle ML en production passe par plusieurs phases, chacune avec ses propres défis :

  1. Exploration et prototypage : expérimentation rapide avec différents algorithmes, features et hyperparamètres. C’est la phase « notebook », souvent chaotique mais créative.
  2. Développement et validation : formalisation du pipeline d’entraînement, tests rigoureux, validation sur des données représentatives, documentation.
  3. Déploiement : mise en production du modèle, intégration aux systèmes existants, tests de charge, rollout progressif.
  4. Monitoring : surveillance des performances en production, détection de drift, alertes automatiques.
  5. Ré-entraînement : quand les performances se dégradent ou que de nouvelles données sont disponibles, le modèle doit être ré-entraîné, validé et redéployé.

Le MLOps vise à automatiser et fiabiliser chacune de ces phases, et surtout les transitions entre elles. Sans MLOps, ces transitions sont manuelles, fragiles et non reproductibles — exactement les conditions qui font échouer les projets ML.

Les niveaux de maturité MLOps

Google a proposé une classification en trois niveaux de maturité MLOps qui est devenue une référence dans l’industrie :

Dans cet article, nous allons construire un système de Niveau 2. C’est ambitieux, mais c’est la bonne cible — les équipes qui s’arrêtent au Niveau 0 finissent presque toujours par accumuler de la dette technique ML qui devient ingérable.

Architecture complète d’un pipeline MLOps

Voici l’architecture de référence que nous allons construire. Chaque composant sera détaillé dans les sections suivantes.

┌─────────────────────────────────────────────────────────────────┐
│                    ARCHITECTURE MLOPS COMPLÈTE                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                   │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐   │
│  │  Source   │───▶│  Feature │───▶│Training  │───▶│  Model   │   │
│  │  Data     │    │  Store   │    │Pipeline  │    │ Registry │   │
│  │  (DVC)    │    │ (Feast)  │    │(MLflow)  │    │(MLflow)  │   │
│  └──────────┘    └──────────┘    └──────────┘    └──────────┘   │
│       │                                               │          │
│       │              ┌──────────┐                     │          │
│       │              │  CI/CD   │                     │          │
│       └─────────────▶│ (GitHub  │◀────────────────────┘          │
│                      │ Actions) │                                │
│                      └────┬─────┘                                │
│                           │                                      │
│                      ┌────▼─────┐    ┌──────────┐               │
│                      │  Deploy  │───▶│Monitoring│               │
│                      │ (Docker/ │    │(Evidently│               │
│                      │  K8s)    │    │ + Alerts)│               │
│                      └──────────┘    └──────────┘               │
│                                                                   │
└─────────────────────────────────────────────────────────────────┘

Les composants principaux :

MLflow : tracking d’expériences et model registry

Pourquoi MLflow ?

MLflow est le standard de facto pour le tracking d’expériences ML. Créé par Databricks et open source, il résout un problème fondamental : comment garder une trace de toutes les expériences (hyperparamètres, métriques, artefacts, code) de manière organisée et requêtable ? Sans un tel outil, les data scientists finissent invariablement par nommer leurs modèles model_v2_final_FINAL_vraimentfinal.pkl — et tout le monde sait comment cette histoire se termine.

Installation et configuration

# Installation
pip install mlflow[extras]

# Lancer le serveur MLflow avec un backend PostgreSQL
mlflow server \
  --backend-store-uri postgresql://user:pass@localhost:5432/mlflow \
  --default-artifact-root s3://my-bucket/mlflow-artifacts \
  --host 0.0.0.0 \
  --port 5000
⚠️ Configuration production
En production, ne stockez jamais les artefacts sur le filesystem local du serveur MLflow. Utilisez un stockage objet (S3, GCS, Azure Blob) pour la durabilité et la scalabilité. Pour le backend store, utilisez PostgreSQL ou MySQL — SQLite convient pour le développement mais pas pour la production multi-utilisateurs.

Tracking d’expériences

Le tracking est le cœur de MLflow. Chaque « run » capture l’ensemble du contexte d’une expérience :

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
import pandas as pd

# Configuration du tracking server
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("fraud-detection-v2")

# Chargement des données
df = pd.read_parquet("data/processed/transactions.parquet")
X = df.drop(columns=["is_fraud"])
y = df["is_fraud"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Entraînement avec tracking MLflow
with mlflow.start_run(run_name="rf-baseline") as run:
    # Log des paramètres
    params = {
        "n_estimators": 200,
        "max_depth": 15,
        "min_samples_split": 5,
        "class_weight": "balanced",
        "random_state": 42
    }
    mlflow.log_params(params)
    
    # Log des métadonnées du dataset
    mlflow.log_param("dataset_size", len(df))
    mlflow.log_param("feature_count", X.shape[1])
    mlflow.log_param("fraud_ratio", y.mean())
    
    # Entraînement
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    
    # Évaluation
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred),
        "recall": recall_score(y_test, y_pred),
    }
    mlflow.log_metrics(metrics)
    
    # Log du modèle avec signature
    from mlflow.models import infer_signature
    signature = infer_signature(X_train, model.predict(X_train))
    mlflow.sklearn.log_model(
        model,
        "model",
        signature=signature,
        registered_model_name="fraud-detector"
    )
    
    # Log d'artefacts supplémentaires
    # Feature importance plot
    import matplotlib.pyplot as plt
    import numpy as np
    
    importances = model.feature_importances_
    indices = np.argsort(importances)[-20:]
    
    fig, ax = plt.subplots(figsize=(10, 8))
    ax.barh(range(len(indices)), importances[indices])
    ax.set_yticks(range(len(indices)))
    ax.set_yticklabels(X.columns[indices])
    ax.set_title("Top 20 Feature Importances")
    fig.tight_layout()
    fig.savefig("feature_importance.png", dpi=150)
    mlflow.log_artifact("feature_importance.png")
    
    print(f"Run ID: {run.info.run_id}")
    print(f"Metrics: {metrics}")

Model Registry

Le Model Registry de MLflow est l’endroit où les modèles passent du stade « expérimentation » au stade « production ». Il fournit un workflow de promotion de modèles avec des étapes (Staging, Production, Archived) et un historique complet des versions :

from mlflow import MlflowClient

client = MlflowClient()

# Récupérer la dernière version du modèle
latest_version = client.get_latest_versions(
    "fraud-detector", stages=["None"]
)[0]

# Promouvoir en staging pour validation
client.transition_model_version_stage(
    name="fraud-detector",
    version=latest_version.version,
    stage="Staging",
    archive_existing_versions=False
)

# Après validation, promouvoir en production
client.transition_model_version_stage(
    name="fraud-detector",
    version=latest_version.version,
    stage="Production",
    archive_existing_versions=True  # Archive l'ancienne version prod
)

# Charger le modèle de production pour l'inférence
import mlflow.pyfunc

model = mlflow.pyfunc.load_model("models:/fraud-detector/Production")
predictions = model.predict(new_data)

Comparaison d’expériences

L’un des grands avantages de MLflow est la possibilité de comparer systématiquement les expériences. Voici un script utilitaire pour comparer les N meilleurs runs d’une expérience :

import mlflow
import pandas as pd

# Rechercher les meilleurs runs
experiment = mlflow.get_experiment_by_name("fraud-detection-v2")
runs = mlflow.search_runs(
    experiment_ids=[experiment.experiment_id],
    order_by=["metrics.f1_score DESC"],
    max_results=10
)

# Afficher la comparaison
comparison = runs[[
    "run_id",
    "params.n_estimators",
    "params.max_depth",
    "metrics.accuracy",
    "metrics.f1_score",
    "metrics.precision",
    "metrics.recall",
    "start_time"
]]
print(comparison.to_string(index=False))

DVC : le versioning des données et des modèles

Le problème que DVC résout

Git est parfait pour versionner du code, mais il n’est pas conçu pour des fichiers volumineux (datasets, modèles entraînés, artefacts). DVC (Data Version Control) étend Git pour gérer ces fichiers lourds de manière transparente. Concrètement, DVC stocke les fichiers volumineux dans un stockage distant (S3, GCS, Azure, NFS) et ne garde dans Git que des fichiers de métadonnées légers (les fichiers .dvc) qui pointent vers les versions correctes.

Installation et initialisation

# Installation
pip install dvc[s3]  # Ajoutez le backend de votre choix

# Initialisation dans un repo Git existant
cd my-ml-project
dvc init

# Configurer le stockage distant
dvc remote add -d myremote s3://my-bucket/dvc-storage
dvc remote modify myremote region eu-west-1

# Committer la configuration
git add .dvc .dvcignore
git commit -m "Initialize DVC with S3 remote"

Versionner les données

# Ajouter un dataset au tracking DVC
dvc add data/raw/transactions_2025.csv

# DVC crée un fichier .dvc (métadonnées) et ajoute le fichier original au .gitignore
# Committer les métadonnées dans Git
git add data/raw/transactions_2025.csv.dvc data/raw/.gitignore
git commit -m "Add transactions dataset v1"

# Pousser les données vers le stockage distant
dvc push

# Quand le dataset est mis à jour, simplement :
dvc add data/raw/transactions_2025.csv
git add data/raw/transactions_2025.csv.dvc
git commit -m "Update transactions dataset v2"
dvc push

# Pour revenir à une version précédente :
git checkout HEAD~1 -- data/raw/transactions_2025.csv.dvc
dvc checkout

Pipelines DVC

DVC permet aussi de définir des pipelines reproductibles via un fichier dvc.yaml. C’est l’un de ses features les plus puissants :

# dvc.yaml
stages:
  prepare:
    cmd: python src/prepare.py
    deps:
      - src/prepare.py
      - data/raw/transactions_2025.csv
    params:
      - prepare.test_size
      - prepare.random_state
    outs:
      - data/processed/train.parquet
      - data/processed/test.parquet
    
  featurize:
    cmd: python src/featurize.py
    deps:
      - src/featurize.py
      - data/processed/train.parquet
      - data/processed/test.parquet
    params:
      - featurize.features
    outs:
      - data/features/train_features.parquet
      - data/features/test_features.parquet

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features/train_features.parquet
    params:
      - train.n_estimators
      - train.max_depth
      - train.learning_rate
    outs:
      - models/model.pkl
    metrics:
      - metrics/train_metrics.json:
          cache: false

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pkl
      - data/features/test_features.parquet
    metrics:
      - metrics/eval_metrics.json:
          cache: false
    plots:
      - metrics/confusion_matrix.csv:
          x: predicted
          y: actual
      - metrics/roc_curve.csv:
          x: fpr
          y: tpr
# params.yaml — fichier de paramètres centralisé
prepare:
  test_size: 0.2
  random_state: 42

featurize:
  features:
    - amount
    - hour_of_day
    - day_of_week
    - merchant_category
    - distance_from_home
    - transaction_frequency_1h
    - avg_amount_30d

train:
  n_estimators: 200
  max_depth: 15
  learning_rate: 0.1
# Exécuter le pipeline complet
dvc repro

# DVC détecte automatiquement quelles étapes doivent être ré-exécutées
# en fonction des changements dans les dépendances

# Comparer les métriques entre branches/commits
dvc metrics diff
dvc plots diff
💡 DVC + Git = reproductibilité parfaite
L’un des patterns les plus puissants de DVC est qu’un commit Git donné pointe toujours vers les mêmes données, les mêmes paramètres et les mêmes résultats. Quand vous faites git checkout v1.2.0 && dvc checkout, vous obtenez exactement l’état du projet (code + données + modèle + métriques) tel qu’il était à cette version. C’est la reproductibilité totale.

Feature Stores : centraliser l’ingénierie des features

Pourquoi un Feature Store ?

L’ingénierie des features (la transformation des données brutes en variables prédictives pour le modèle) représente typiquement 60 à 80 % du travail d’un projet ML. Sans Feature Store, les équipes réinventent constamment la roue : chaque data scientist calcule ses propres features, souvent de manière incohérente entre l’entraînement et l’inférence (le fameux « training-serving skew »).

Un Feature Store centralise la définition, le calcul, le stockage et le service des features, garantissant la cohérence entre l’entraînement (batch) et l’inférence (temps réel).

Feast : le Feature Store open source

Feast (Feature Store) est le Feature Store open source le plus populaire. Voici comment l’intégrer dans notre pipeline :

# feature_repo/feature_definitions.py
from feast import Entity, Feature, FeatureView, ValueType
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
    PostgreSQLSource,
)
from datetime import timedelta

# Entité principale
customer = Entity(
    name="customer_id",
    value_type=ValueType.STRING,
    description="Identifiant client unique"
)

# Source de données
transactions_source = PostgreSQLSource(
    name="transactions_source",
    query="""
        SELECT 
            customer_id,
            avg_transaction_amount_30d,
            transaction_count_7d,
            max_transaction_amount_24h,
            unique_merchants_30d,
            avg_time_between_transactions,
            event_timestamp,
            created_timestamp
        FROM feature_transactions
    """,
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# Feature View
customer_transaction_features = FeatureView(
    name="customer_transaction_features",
    entities=[customer],
    ttl=timedelta(days=90),
    schema=[
        Feature(name="avg_transaction_amount_30d", dtype=ValueType.FLOAT),
        Feature(name="transaction_count_7d", dtype=ValueType.INT64),
        Feature(name="max_transaction_amount_24h", dtype=ValueType.FLOAT),
        Feature(name="unique_merchants_30d", dtype=ValueType.INT64),
        Feature(name="avg_time_between_transactions", dtype=ValueType.FLOAT),
    ],
    source=transactions_source,
    online=True,  # Disponible pour l'inférence temps réel
    tags={"team": "fraud-detection", "version": "2.0"}
)
# Utilisation pour l'entraînement (batch)
from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path="feature_repo/")

# Créer un DataFrame d'entités avec les timestamps souhaités
entity_df = pd.DataFrame({
    "customer_id": ["C001", "C002", "C003"],
    "event_timestamp": pd.to_datetime(["2025-12-01", "2025-12-01", "2025-12-01"])
})

# Récupérer les features historiques (point-in-time correct)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "customer_transaction_features:avg_transaction_amount_30d",
        "customer_transaction_features:transaction_count_7d",
        "customer_transaction_features:max_transaction_amount_24h",
    ]
).to_df()

# Utilisation pour l'inférence (temps réel)
online_features = store.get_online_features(
    features=[
        "customer_transaction_features:avg_transaction_amount_30d",
        "customer_transaction_features:transaction_count_7d",
    ],
    entity_rows=[{"customer_id": "C001"}]
).to_dict()

CI/CD pour le Machine Learning

Les spécificités du CI/CD ML

Le CI/CD pour le ML est plus complexe que le CI/CD logiciel classique car il y a trois artefacts à tester et déployer (au lieu d’un seul) : le code, les données et le modèle. Chacun a ses propres critères de qualité et ses propres pipelines de validation.

Pipeline GitHub Actions complet

# .github/workflows/ml-pipeline.yml
name: ML Pipeline CI/CD

on:
  push:
    branches: [main, develop]
    paths:
      - 'src/**'
      - 'data/**'
      - 'params.yaml'
      - 'dvc.yaml'
  pull_request:
    branches: [main]
  workflow_dispatch:
    inputs:
      force_retrain:
        description: 'Force model retraining'
        required: false
        default: 'false'

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

jobs:
  # Job 1 : Tests du code
  code-quality:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'
      
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      
      - name: Lint
        run: |
          ruff check src/ tests/
          mypy src/ --ignore-missing-imports
      
      - name: Unit tests
        run: pytest tests/unit/ -v --cov=src --cov-report=xml
      
      - name: Upload coverage
        uses: codecov/codecov-action@v3

  # Job 2 : Validation des données
  data-validation:
    runs-on: ubuntu-latest
    needs: code-quality
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'
      
      - name: Install dependencies
        run: pip install -r requirements.txt
      
      - name: Pull data from DVC
        run: |
          pip install dvc[s3]
          dvc pull data/processed/
      
      - name: Validate data schema
        run: python src/validate_data.py
      
      - name: Check data quality
        run: python src/check_data_quality.py
      
      - name: Great Expectations validation
        run: python src/run_expectations.py

  # Job 3 : Entraînement et évaluation
  train-and-evaluate:
    runs-on: ubuntu-latest
    needs: data-validation
    if: |
      github.event_name == 'push' || 
      github.event.inputs.force_retrain == 'true'
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'
      
      - name: Install dependencies
        run: pip install -r requirements.txt
      
      - name: Pull data
        run: dvc pull
      
      - name: Run DVC pipeline
        run: dvc repro
      
      - name: Evaluate model
        run: python src/evaluate.py --compare-with-production
      
      - name: Upload metrics
        uses: actions/upload-artifact@v4
        with:
          name: model-metrics
          path: metrics/

  # Job 4 : Déploiement (uniquement sur main)
  deploy:
    runs-on: ubuntu-latest
    needs: train-and-evaluate
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: pip install -r requirements.txt
      
      - name: Promote model to production
        run: python src/promote_model.py
      
      - name: Build Docker image
        run: |
          docker build -t fraud-detector:${{ github.sha }} .
          docker tag fraud-detector:${{ github.sha }} \
            ${{ secrets.ECR_REGISTRY }}/fraud-detector:latest
      
      - name: Push to ECR
        run: |
          aws ecr get-login-password --region eu-west-1 | \
            docker login --username AWS --password-stdin \
            ${{ secrets.ECR_REGISTRY }}
          docker push ${{ secrets.ECR_REGISTRY }}/fraud-detector:latest
      
      - name: Deploy to ECS
        run: |
          aws ecs update-service \
            --cluster ml-production \
            --service fraud-detector \
            --force-new-deployment

Tests spécifiques au ML

Les tests ML vont au-delà des tests unitaires classiques. Voici les catégories de tests essentiels :

# tests/test_model.py
import pytest
import numpy as np
import pandas as pd
from src.train import train_model
from src.evaluate import evaluate_model

class TestModelQuality:
    """Tests de qualité du modèle."""
    
    def test_minimum_accuracy(self, trained_model, test_data):
        """Le modèle doit atteindre un seuil minimum de performance."""
        X_test, y_test = test_data
        accuracy = trained_model.score(X_test, y_test)
        assert accuracy >= 0.85, f"Accuracy {accuracy:.3f} below threshold 0.85"
    
    def test_minimum_f1_score(self, trained_model, test_data):
        """Le F1-score doit être suffisant pour la classe minoritaire."""
        from sklearn.metrics import f1_score
        X_test, y_test = test_data
        y_pred = trained_model.predict(X_test)
        f1 = f1_score(y_test, y_pred)
        assert f1 >= 0.70, f"F1 score {f1:.3f} below threshold 0.70"
    
    def test_no_regression_vs_production(self, trained_model, production_model, test_data):
        """Le nouveau modèle ne doit pas être pire que le modèle en production."""
        X_test, y_test = test_data
        new_f1 = f1_score(y_test, trained_model.predict(X_test))
        prod_f1 = f1_score(y_test, production_model.predict(X_test))
        assert new_f1 >= prod_f1 * 0.95, (
            f"New model F1 ({new_f1:.3f}) is more than 5% worse "
            f"than production ({prod_f1:.3f})"
        )
    
    def test_prediction_latency(self, trained_model, test_data):
        """L'inférence doit être suffisamment rapide."""
        import time
        X_test, _ = test_data
        single_sample = X_test.iloc[[0]]
        
        start = time.perf_counter()
        for _ in range(1000):
            trained_model.predict(single_sample)
        elapsed = (time.perf_counter() - start) / 1000
        
        assert elapsed < 0.01, f"Prediction latency {elapsed*1000:.1f}ms exceeds 10ms"
    
    def test_model_fairness(self, trained_model, test_data_with_demographics):
        """Le modèle ne doit pas être discriminatoire."""
        X_test, y_test, demographics = test_data_with_demographics
        y_pred = trained_model.predict(X_test)
        
        for group in demographics["gender"].unique():
            mask = demographics["gender"] == group
            group_accuracy = accuracy_score(y_test[mask], y_pred[mask])
            overall_accuracy = accuracy_score(y_test, y_pred)
            
            ratio = group_accuracy / overall_accuracy
            assert 0.8 <= ratio <= 1.2, (
                f"Fairness violation for {group}: ratio={ratio:.3f}"
            )


class TestDataIntegrity:
    """Tests d'intégrité des données."""
    
    def test_no_data_leakage(self, train_data, test_data):
        """Pas de fuite de données entre train et test."""
        train_ids = set(train_data[0].index)
        test_ids = set(test_data[0].index)
        overlap = train_ids & test_ids
        assert len(overlap) == 0, f"Data leakage: {len(overlap)} shared samples"
    
    def test_feature_distributions(self, train_data, test_data):
        """Les distributions des features doivent être similaires."""
        from scipy import stats
        X_train, _ = train_data
        X_test, _ = test_data
        
        for col in X_train.select_dtypes(include=[np.number]).columns:
            stat, p_value = stats.ks_2samp(X_train[col], X_test[col])
            assert p_value > 0.01, (
                f"Distribution shift detected in {col}: KS p-value={p_value:.4f}"
            )

Monitoring et détection de drift avec Evidently

Pourquoi monitorer un modèle en production ?

Un modèle ML en production se dégrade inévitablement avec le temps. Les données changent (data drift), la relation entre les features et la cible évolue (concept drift), et les patterns que le modèle a appris deviennent obsolètes. Sans monitoring, vous ne découvrez ce problème que lorsqu’un utilisateur se plaint — et à ce moment-là, le modèle a peut-être pris des milliers de mauvaises décisions.

Evidently : monitoring open source

Evidently est une bibliothèque Python open source pour le monitoring de modèles ML. Elle permet de détecter le data drift, le concept drift, et de générer des rapports visuels détaillés.

# src/monitoring/drift_detector.py
import pandas as pd
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import (
    DataDriftPreset,
    DataQualityPreset,
    TargetDriftPreset,
    ClassificationPreset
)
from evidently.test_suite import TestSuite
from evidently.tests import (
    TestNumberOfColumnsWithMissingValues,
    TestNumberOfRowsWithMissingValues,
    TestNumberOfDriftedColumns,
    TestShareOfDriftedColumns,
    TestColumnDrift,
)
import json
from datetime import datetime

class ModelMonitor:
    """Moniteur de modèle ML en production."""
    
    def __init__(self, reference_data: pd.DataFrame, column_mapping: ColumnMapping):
        self.reference_data = reference_data
        self.column_mapping = column_mapping
    
    def check_data_drift(self, current_data: pd.DataFrame) -> dict:
        """Vérifie le data drift entre les données de référence et actuelles."""
        report = Report(metrics=[
            DataDriftPreset(),
            DataQualityPreset(),
        ])
        report.run(
            reference_data=self.reference_data,
            current_data=current_data,
            column_mapping=self.column_mapping
        )
        
        # Extraire les résultats
        result = report.as_dict()
        
        drift_detected = result["metrics"][0]["result"]["dataset_drift"]
        drift_share = result["metrics"][0]["result"]["share_of_drifted_columns"]
        
        # Sauvegarder le rapport HTML
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report.save_html(f"reports/drift_report_{timestamp}.html")
        
        return {
            "drift_detected": drift_detected,
            "drift_share": drift_share,
            "timestamp": timestamp,
            "n_drifted_columns": result["metrics"][0]["result"]["number_of_drifted_columns"],
            "details": result
        }
    
    def run_test_suite(self, current_data: pd.DataFrame) -> dict:
        """Exécute une suite de tests automatisés sur les données."""
        tests = TestSuite(tests=[
            TestNumberOfColumnsWithMissingValues(lte=2),
            TestNumberOfRowsWithMissingValues(lte=current_data.shape[0] * 0.05),
            TestNumberOfDriftedColumns(lte=3),
            TestShareOfDriftedColumns(lte=0.3),
        ])
        
        tests.run(
            reference_data=self.reference_data,
            current_data=current_data,
            column_mapping=self.column_mapping
        )
        
        result = tests.as_dict()
        all_passed = all(
            test["status"] == "SUCCESS" 
            for test in result["tests"]
        )
        
        return {
            "all_passed": all_passed,
            "tests": result["tests"]
        }
    
    def check_prediction_drift(
        self, 
        reference_predictions: pd.Series, 
        current_predictions: pd.Series
    ) -> dict:
        """Vérifie si la distribution des prédictions a changé."""
        from scipy import stats
        
        stat, p_value = stats.ks_2samp(
            reference_predictions, 
            current_predictions
        )
        
        drift_detected = p_value < 0.05
        
        return {
            "prediction_drift_detected": drift_detected,
            "ks_statistic": float(stat),
            "p_value": float(p_value),
            "reference_mean": float(reference_predictions.mean()),
            "current_mean": float(current_predictions.mean()),
        }


# Script de monitoring périodique (à exécuter via cron ou Airflow)
def run_monitoring():
    """Exécute le monitoring complet et alerte si nécessaire."""
    import mlflow
    
    # Charger les données de référence (données d'entraînement)
    reference = pd.read_parquet("data/reference/training_data.parquet")
    
    # Charger les données récentes de production
    current = pd.read_parquet("data/production/last_24h.parquet")
    
    column_mapping = ColumnMapping(
        target="is_fraud",
        prediction="prediction",
        numerical_features=[
            "amount", "hour_of_day", "distance_from_home",
            "transaction_frequency_1h", "avg_amount_30d"
        ],
        categorical_features=[
            "merchant_category", "day_of_week"
        ]
    )
    
    monitor = ModelMonitor(reference, column_mapping)
    
    # Vérifier le data drift
    drift_result = monitor.check_data_drift(current)
    
    # Exécuter les tests
    test_result = monitor.run_test_suite(current)
    
    # Logger les résultats dans MLflow
    with mlflow.start_run(run_name=f"monitoring-{datetime.now().isoformat()}"):
        mlflow.log_metric("drift_share", drift_result["drift_share"])
        mlflow.log_metric("n_drifted_columns", drift_result["n_drifted_columns"])
        mlflow.log_metric("tests_passed", int(test_result["all_passed"]))
    
    # Alerter si nécessaire
    if drift_result["drift_detected"] or not test_result["all_passed"]:
        send_alert(
            message=f"⚠️ Model monitoring alert!\n"
                    f"Drift detected: {drift_result['drift_detected']}\n"
                    f"Tests passed: {test_result['all_passed']}\n"
                    f"Drifted columns: {drift_result['n_drifted_columns']}",
            channel="ml-alerts"
        )
        
        # Déclencher un ré-entraînement automatique si le drift est sévère
        if drift_result["drift_share"] > 0.5:
            trigger_retraining_pipeline()

if __name__ == "__main__":
    run_monitoring()

Infrastructure et conteneurisation avec Docker

Dockerfile pour le service de prédiction

# Dockerfile
FROM python:3.11-slim AS base

# Metadata
LABEL maintainer="ml-team@example.com"
LABEL description="Fraud detection model serving"

# Security: non-root user
RUN groupadd -r mluser && useradd -r -g mluser mluser

# System dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies
COPY requirements-serve.txt /tmp/
RUN pip install --no-cache-dir -r /tmp/requirements-serve.txt

# Application code
WORKDIR /app
COPY src/serve/ ./serve/
COPY src/monitoring/ ./monitoring/

# Health check
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
  CMD curl -f http://localhost:8080/health || exit 1

# Switch to non-root user
USER mluser

# Expose port
EXPOSE 8080

# Start the server
CMD ["uvicorn", "serve.api:app", "--host", "0.0.0.0", "--port", "8080", "--workers", "4"]

API de serving avec FastAPI

# src/serve/api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
import mlflow.pyfunc
import pandas as pd
import numpy as np
import time
import logging
from typing import List, Optional
from contextlib import asynccontextmanager

logger = logging.getLogger(__name__)

# Chargement du modèle au démarrage
model = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global model
    logger.info("Loading model from MLflow registry...")
    model = mlflow.pyfunc.load_model("models:/fraud-detector/Production")
    logger.info("Model loaded successfully")
    yield
    logger.info("Shutting down...")

app = FastAPI(
    title="Fraud Detection API",
    version="2.0.0",
    lifespan=lifespan
)

class Transaction(BaseModel):
    amount: float = Field(..., gt=0, description="Transaction amount")
    hour_of_day: int = Field(..., ge=0, le=23)
    day_of_week: int = Field(..., ge=0, le=6)
    merchant_category: str
    distance_from_home: float = Field(..., ge=0)
    transaction_frequency_1h: int = Field(..., ge=0)
    avg_amount_30d: float = Field(..., ge=0)

class PredictionResponse(BaseModel):
    is_fraud: bool
    fraud_probability: float
    model_version: str
    latency_ms: float

class BatchPredictionRequest(BaseModel):
    transactions: List[Transaction]

@app.get("/health")
async def health():
    return {"status": "healthy", "model_loaded": model is not None}

@app.post("/predict", response_model=PredictionResponse)
async def predict(transaction: Transaction):
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    
    start = time.perf_counter()
    
    # Préparer les données
    input_df = pd.DataFrame([transaction.model_dump()])
    
    # Prédiction
    prediction = model.predict(input_df)
    proba = model._model_impl.predict_proba(input_df)[:, 1]
    
    latency = (time.perf_counter() - start) * 1000
    
    # Logger pour le monitoring
    logger.info(
        f"Prediction: fraud={bool(prediction[0])}, "
        f"proba={float(proba[0]):.4f}, "
        f"latency={latency:.1f}ms"
    )
    
    return PredictionResponse(
        is_fraud=bool(prediction[0]),
        fraud_probability=float(proba[0]),
        model_version="2.0.0",
        latency_ms=round(latency, 2)
    )

@app.post("/predict/batch")
async def predict_batch(request: BatchPredictionRequest):
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    
    start = time.perf_counter()
    
    input_df = pd.DataFrame([t.model_dump() for t in request.transactions])
    predictions = model.predict(input_df)
    probas = model._model_impl.predict_proba(input_df)[:, 1]
    
    latency = (time.perf_counter() - start) * 1000
    
    results = [
        {
            "index": i,
            "is_fraud": bool(pred),
            "fraud_probability": float(proba)
        }
        for i, (pred, proba) in enumerate(zip(predictions, probas))
    ]
    
    return {
        "predictions": results,
        "count": len(results),
        "latency_ms": round(latency, 2)
    }

Docker Compose pour l’environnement complet

# docker-compose.yml
version: '3.8'

services:
  mlflow:
    image: ghcr.io/mlflow/mlflow:2.10.0
    ports:
      - "5000:5000"
    environment:
      - MLFLOW_BACKEND_STORE_URI=postgresql://mlflow:mlflow@postgres:5432/mlflow
      - MLFLOW_DEFAULT_ARTIFACT_ROOT=s3://mlflow-artifacts/
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
    command: >
      mlflow server 
      --host 0.0.0.0 
      --port 5000
    depends_on:
      - postgres

  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: mlflow
      POSTGRES_USER: mlflow
      POSTGRES_PASSWORD: mlflow
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  model-api:
    build: .
    ports:
      - "8080:8080"
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
    depends_on:
      - mlflow
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 4G

  monitoring:
    build:
      context: .
      dockerfile: Dockerfile.monitoring
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
      - ALERT_WEBHOOK=${ALERT_WEBHOOK}
    depends_on:
      - mlflow
      - postgres

volumes:
  postgres_data:

Pipeline complet : tout assembler

Maintenant que nous avons vu chaque composant individuellement, voici comment ils s’articulent dans un workflow réel de bout en bout :

Workflow quotidien type

  1. Ingestion : les nouvelles données de transactions arrivent dans la base de données source (via un pipeline ETL/ELT classique).
  2. Feature Engineering : Feast calcule les nouvelles features et met à jour le Feature Store (batch job quotidien).
  3. Monitoring : Evidently compare les données du jour avec les données de référence. Si un drift significatif est détecté, une alerte est envoyée et un ré-entraînement peut être déclenché automatiquement.
  4. Ré-entraînement (si nécessaire) : DVC récupère les données les plus récentes, le pipeline d’entraînement est exécuté (dvc repro), les résultats sont trackés dans MLflow.
  5. Validation : la suite de tests ML vérifie que le nouveau modèle est au moins aussi bon que le modèle en production.
  6. Déploiement : si les tests passent, le modèle est promu en production dans le Model Registry, une nouvelle image Docker est construite et déployée.
  7. Serving : l’API FastAPI sert les prédictions en temps réel, avec logging de chaque prédiction pour le monitoring futur.
# src/orchestrate.py — Script d'orchestration principal
"""
Orchestrateur du pipeline MLOps complet.
Peut être déclenché par cron, Airflow, ou GitHub Actions.
"""

import subprocess
import sys
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mlops-orchestrator")


def run_step(name: str, command: str) -> bool:
    """Exécute une étape du pipeline et retourne True si succès."""
    logger.info(f"▶ Starting step: {name}")
    start = datetime.now()
    
    result = subprocess.run(
        command, shell=True, capture_output=True, text=True
    )
    
    duration = (datetime.now() - start).total_seconds()
    
    if result.returncode == 0:
        logger.info(f"✅ {name} completed in {duration:.1f}s")
        return True
    else:
        logger.error(f"❌ {name} failed after {duration:.1f}s")
        logger.error(f"STDERR: {result.stderr}")
        return False


def main():
    pipeline_start = datetime.now()
    logger.info(f"🚀 Pipeline started at {pipeline_start.isoformat()}")
    
    steps = [
        ("Pull latest data", "dvc pull"),
        ("Validate data quality", "python src/validate_data.py"),
        ("Check for drift", "python src/monitoring/drift_detector.py"),
        ("Run training pipeline", "dvc repro"),
        ("Run model tests", "pytest tests/test_model.py -v"),
        ("Evaluate vs production", "python src/evaluate.py --compare-prod"),
    ]
    
    for name, command in steps:
        if not run_step(name, command):
            logger.error(f"Pipeline failed at step: {name}")
            send_notification(f"❌ MLOps pipeline failed at: {name}")
            sys.exit(1)
    
    # Si on est sur la branche main, déployer
    if is_main_branch():
        deploy_steps = [
            ("Promote model", "python src/promote_model.py"),
            ("Build Docker image", "docker build -t fraud-detector:latest ."),
            ("Deploy", "python src/deploy.py"),
        ]
        for name, command in deploy_steps:
            if not run_step(name, command):
                logger.error(f"Deployment failed at step: {name}")
                send_notification(f"❌ Deployment failed at: {name}")
                sys.exit(1)
    
    duration = (datetime.now() - pipeline_start).total_seconds()
    logger.info(f"🎉 Pipeline completed successfully in {duration:.1f}s")
    send_notification(f"✅ MLOps pipeline completed in {duration/60:.1f} min")


if __name__ == "__main__":
    main()

Bonnes pratiques et anti-patterns

Les bonnes pratiques essentielles

Les anti-patterns à éviter

🚫 Anti-patterns MLOps courants

1. Le « notebook en production » : exécuter un notebook Jupyter en production via cron. Fragile, non testable, non reproductible. À bannir absolument.

2. Le « pickle artisanal » : sauvegarder un modèle avec pickle et le copier manuellement sur le serveur de production. Aucune traçabilité, aucune gestion de version.

3. Le « training-serving skew silencieux » : utiliser un code de feature engineering différent entre l’entraînement et l’inférence. Le modèle fonctionne bien en test mais produit des résultats incohérents en production.

4. Le « monitoring par l’absence de plaintes » : considérer que le modèle fonctionne bien parce que personne ne s’est plaint. Le drift peut dégrader silencieusement les performances pendant des mois.

5. Le « one-shot training » : entraîner le modèle une fois et ne jamais le ré-entraîner. Les données changent, le monde change, le modèle doit évoluer.

Structure de projet recommandée

my-ml-project/
├── .github/
│   └── workflows/
│       └── ml-pipeline.yml        # CI/CD
├── data/
│   ├── raw/                       # Données brutes (DVC-tracked)
│   ├── processed/                 # Données transformées (DVC-tracked)
│   ├── features/                  # Features calculées (DVC-tracked)
│   └── reference/                 # Données de référence pour monitoring
├── feature_repo/                  # Définitions Feast
│   ├── feature_definitions.py
│   └── feature_store.yaml
├── models/                        # Modèles locaux (DVC-tracked)
├── metrics/                       # Métriques d'évaluation
├── reports/                       # Rapports Evidently
├── src/
│   ├── prepare.py                 # Préparation des données
│   ├── featurize.py               # Feature engineering
│   ├── train.py                   # Entraînement
│   ├── evaluate.py                # Évaluation
│   ├── promote_model.py           # Promotion dans le registry
│   ├── validate_data.py           # Validation des données
│   ├── serve/
│   │   └── api.py                 # API FastAPI
│   └── monitoring/
│       └── drift_detector.py      # Monitoring Evidently
├── tests/
│   ├── unit/                      # Tests unitaires
│   ├── integration/               # Tests d'intégration
│   └── test_model.py              # Tests ML
├── notebooks/                     # Exploration (NON en production)
├── dvc.yaml                       # Pipeline DVC
├── dvc.lock                       # Lock file DVC
├── params.yaml                    # Hyperparamètres
├── Dockerfile                     # Image de serving
├── docker-compose.yml             # Stack locale
├── requirements.txt               # Dépendances
├── requirements-dev.txt           # Dépendances de développement
└── README.md

Conclusion

Construire un pipeline MLOps complet n’est pas un projet trivial. C’est un investissement significatif en temps et en complexité. Mais c’est un investissement qui paie — et qui paie massivement. Les équipes qui adoptent le MLOps passent moins de temps à combattre des incendies en production, plus de temps à améliorer leurs modèles, et livrent de la valeur de manière beaucoup plus prévisible et fiable.

Les outils que nous avons présentés — MLflow, DVC, Feast, Evidently, Docker, GitHub Actions — forment un stack cohérent et mature pour construire un pipeline MLOps de niveau production. Ils sont tous open source, bien documentés, et activement maintenus par des communautés importantes.

Mais les outils ne sont que la moitié de l’équation. L’autre moitié, c’est la culture. Le MLOps est autant une question d’organisation et de processus que de technologie. Les équipes qui réussissent le mieux sont celles qui traitent le ML avec la même rigueur que le logiciel classique : revues de code (et de modèles), tests automatisés, déploiement continu, monitoring proactif, documentation systématique.

Le message final est simple : ne commencez pas par un modèle parfait dans un notebook. Commencez par un modèle simple dans un pipeline robuste, puis itérez. Le MLOps n’est pas la cerise sur le gâteau — c’est le gâteau. Le modèle n’en est que la garniture.

« Le plus grand défi du machine learning n’est pas de construire un bon modèle — c’est de le maintenir en production. Le MLOps n’est pas optionnel, c’est existentiel. Sans lui, votre modèle est un prototype qui se fait passer pour un produit. » — Chip Huyen, auteure de « Designing Machine Learning Systems »

Sources

  1. Google Cloud, « MLOps: Continuous delivery and automation pipelines in machine learning », Architecture documentation, 2025.
  2. Chip Huyen, « Designing Machine Learning Systems », O’Reilly Media, 2022 (2nd edition 2025).
  3. MLflow Documentation, « MLflow: A Machine Learning Lifecycle Platform », mlflow.org, 2025.
  4. DVC Documentation, « Data Version Control: Git for Data & Models », dvc.org, 2025.
  5. Evidently AI, « ML Monitoring in Production: Best Practices », evidently.ai blog, 2025.
  6. Feast Documentation, « Feast: Feature Store for Machine Learning », feast.dev, 2025.
  7. Gartner, « Top Strategic Technology Trends 2026: AI Engineering », décembre 2025.
  8. Sculley et al., « Hidden Technical Debt in Machine Learning Systems », NeurIPS 2015 (toujours fondamental en 2026).
Retour à l'accueil