Déployer un modèle d’IA en production avec Docker et FastAPI

Niveau du tutoriel : Expert
🟡 Intermédiaire⏱️ 24 min

📋 Sommaire

  1. Introduction : pourquoi Docker + FastAPI pour l’IA ?
  2. Architecture de référence
  3. Créer l’API d’inférence avec FastAPI
  4. Dockerfile multi-stage optimisé
  5. Healthchecks et readiness probes
  6. Docker Compose pour l’orchestration locale
  7. Monitoring avec Prometheus et Grafana
  8. Scaling et gestion de la charge
  9. CI/CD : automatiser le déploiement
  10. Sécurité et bonnes pratiques
  11. Optimisations de performance
  12. Conclusion

1. Introduction : pourquoi Docker + FastAPI pour l’IA ?

Déployer un modèle d’intelligence artificielle en production est un défi radicalement différent de l’entraîner dans un notebook Jupyter. En recherche, on se concentre sur la précision et les métriques ; en production, il faut gérer la latence, la disponibilité, le scaling, le monitoring, la sécurité, et la reproductibilité. C’est le fameux gouffre entre le “ça marche sur ma machine” et le “ça tourne H24 en production”.

Docker résout le problème de la reproductibilité en empaquetant votre modèle, ses dépendances, et l’environnement d’exécution dans un conteneur portable. Que ce soit sur votre laptop, sur un serveur AWS, ou dans un cluster Kubernetes, le conteneur se comporte de manière identique. Fini les conflits de versions de CUDA, les bibliothèques Python incompatibles, et les “ça marchait avant la mise à jour”.

FastAPI est le framework Python idéal pour servir des modèles d’IA. Ses avantages sont convaincants : performances quasi-natives grâce à Starlette et Uvicorn (ASGI), documentation OpenAPI automatique, validation des entrées avec Pydantic, support natif de l’asynchrone, et typage statique qui réduit les bugs. En termes de performance, FastAPI rivalise avec Node.js et Go pour les applications I/O-bound, et son écosystème Python est un atout majeur pour l’IA.

Dans ce tutoriel, nous allons construire une architecture de production complète : une API d’inférence FastAPI conteneurisée avec Docker, incluant le monitoring, le healthcheck, le scaling, et les bases du CI/CD. Chaque composant sera expliqué en détail avec du code prêt à l’emploi.

💡 Astuce

Si vous débutez avec Docker, pas de panique. Ce tutoriel est conçu pour être suivi pas à pas. Assurez-vous simplement d’avoir Docker Desktop (Mac/Windows) ou Docker Engine (Linux) installé, ainsi que Python 3.10+. La commande docker --version doit retourner au moins la version 20.10.

2. Architecture de référence

Avant de coder, posons l’architecture. Un déploiement de modèle IA en production se compose généralement des briques suivantes :

Pour ce tutoriel, nous allons implémenter les composants essentiels : l’API FastAPI, le Dockerfile multi-stage, le docker-compose avec monitoring, et les healthchecks. C’est la base solide sur laquelle vous pourrez construire des architectures plus complexes.

La structure du projet sera la suivante :

Bash
ml-api/
├── app/
│   ├── __init__.py
│   ├── main.py              # Point d'entrée FastAPI
│   ├── config.py             # Configuration (env vars)
│   ├── models/
│   │   ├── __init__.py
│   │   └── inference.py      # Logique d'inférence
│   ├── schemas/
│   │   ├── __init__.py
│   │   └── prediction.py     # Schémas Pydantic
│   ├── middleware/
│   │   ├── __init__.py
│   │   └── metrics.py        # Middleware Prometheus
│   └── utils/
│       ├── __init__.py
│       └── logging.py        # Configuration logging
├── tests/
│   ├── test_api.py
│   └── test_inference.py
├── Dockerfile
├── Dockerfile.gpu            # Variante GPU
├── docker-compose.yml
├── docker-compose.prod.yml
├── prometheus.yml
├── requirements.txt
├── requirements-dev.txt
├── .env.example
├── .dockerignore
└── README.md

3. Créer l’API d’inférence avec FastAPI

Commençons par le cœur de notre système : l’API FastAPI. Nous allons construire une API robuste avec validation des entrées, gestion d’erreurs, logging structuré, et support du batching.

D’abord, les schémas Pydantic pour la validation des entrées et sorties :

Python
# app/schemas/prediction.py
from pydantic import BaseModel, Field, validator
from typing import List, Optional, Dict, Any
from enum import Enum
import time


class TaskType(str, Enum):
    CLASSIFICATION = "classification"
    GENERATION = "generation"
    EMBEDDING = "embedding"
    SUMMARIZATION = "summarization"


class PredictionRequest(BaseModel):
    """Schéma de requête d'inférence."""
    text: str = Field(
        ...,
        min_length=1,
        max_length=10000,
        description="Texte d'entrée pour l'inférence",
        examples=["Analysez le sentiment de ce commentaire client."]
    )
    task: TaskType = Field(
        default=TaskType.CLASSIFICATION,
        description="Type de tâche d'inférence"
    )
    parameters: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Paramètres optionnels (temperature, max_tokens, etc.)"
    )
    
    @validator('text')
    def text_not_empty(cls, v):
        if not v.strip():
            raise ValueError("Le texte ne peut pas être vide ou contenir uniquement des espaces")
        return v.strip()


class BatchPredictionRequest(BaseModel):
    """Requête de prédiction par batch."""
    items: List[PredictionRequest] = Field(
        ...,
        min_length=1,
        max_length=32,
        description="Liste de requêtes (max 32)"
    )


class PredictionResponse(BaseModel):
    """Schéma de réponse d'inférence."""
    prediction: Any = Field(description="Résultat de la prédiction")
    confidence: Optional[float] = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description="Score de confiance"
    )
    model_name: str = Field(description="Nom du modèle utilisé")
    model_version: str = Field(description="Version du modèle")
    processing_time_ms: float = Field(description="Temps de traitement en ms")
    metadata: Optional[Dict[str, Any]] = Field(default=None)


class BatchPredictionResponse(BaseModel):
    """Réponse batch."""
    results: List[PredictionResponse]
    total_processing_time_ms: float
    batch_size: int


class HealthResponse(BaseModel):
    """Réponse du healthcheck."""
    status: str
    model_loaded: bool
    gpu_available: bool
    uptime_seconds: float
    version: str

Ensuite, la configuration centralisée avec variables d’environnement :

Python
# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache


class Settings(BaseSettings):
    """Configuration de l'application via variables d'environnement."""
    
    # Application
    app_name: str = "ML Inference API"
    app_version: str = "1.0.0"
    debug: bool = False
    log_level: str = "INFO"
    
    # Modèle
    model_name: str = "distilbert-base-uncased-finetuned-sst-2-english"
    model_path: str = "./models"
    model_device: str = "auto"  # auto, cpu, cuda, cuda:0
    max_batch_size: int = 32
    max_sequence_length: int = 512
    
    # Serveur
    host: str = "0.0.0.0"
    port: int = 8000
    workers: int = 1  # 1 worker pour les modèles GPU
    
    # CORS
    cors_origins: str = "*"
    
    # Rate limiting
    rate_limit_per_minute: int = 60
    
    # Cache
    cache_enabled: bool = True
    cache_ttl_seconds: int = 3600
    cache_max_size: int = 1000
    
    class Config:
        env_file = ".env"
        env_prefix = "ML_"


@lru_cache()
def get_settings() -> Settings:
    return Settings()

Maintenant, le service d’inférence qui encapsule le chargement et l’utilisation du modèle :

Python
# app/models/inference.py
import torch
import logging
import time
from typing import List, Dict, Any, Optional
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline
from functools import lru_cache
from cachetools import TTLCache

from app.config import get_settings

logger = logging.getLogger(__name__)


class InferenceService:
    """Service d'inférence singleton avec cache et batching."""
    
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance
    
    def __init__(self):
        if self._initialized:
            return
        
        self.settings = get_settings()
        self.model = None
        self.tokenizer = None
        self.pipe = None
        self.device = None
        self.is_loaded = False
        self._cache = TTLCache(
            maxsize=self.settings.cache_max_size,
            ttl=self.settings.cache_ttl_seconds,
        )
        self._initialized = True
    
    def load_model(self):
        """Charge le modèle en mémoire."""
        start = time.time()
        logger.info(f"🔄 Chargement du modèle: {self.settings.model_name}")
        
        # Déterminer le device
        if self.settings.model_device == "auto":
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
        else:
            self.device = self.settings.model_device
        
        logger.info(f"   Device: {self.device}")
        
        # Charger le tokenizer et le modèle
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.settings.model_name,
            cache_dir=self.settings.model_path,
        )
        
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.settings.model_name,
            cache_dir=self.settings.model_path,
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
        )
        self.model.to(self.device)
        self.model.eval()
        
        # Pipeline Hugging Face pour simplicité
        self.pipe = pipeline(
            "text-classification",
            model=self.model,
            tokenizer=self.tokenizer,
            device=self.device,
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
        )
        
        elapsed = time.time() - start
        self.is_loaded = True
        logger.info(f"✅ Modèle chargé en {elapsed:.2f}s")
    
    def predict(self, text: str, parameters: Optional[Dict] = None) -> Dict[str, Any]:
        """Exécute une prédiction unique."""
        if not self.is_loaded:
            raise RuntimeError("Modèle non chargé")
        
        # Vérifier le cache
        cache_key = f"{text}:{str(parameters)}"
        if self.settings.cache_enabled and cache_key in self._cache:
            logger.debug("Cache hit")
            return self._cache[cache_key]
        
        start = time.time()
        
        # Inférence
        with torch.no_grad():
            result = self.pipe(
                text,
                truncation=True,
                max_length=self.settings.max_sequence_length,
            )
        
        elapsed_ms = (time.time() - start) * 1000
        
        output = {
            "prediction": result[0]["label"],
            "confidence": round(result[0]["score"], 4),
            "processing_time_ms": round(elapsed_ms, 2),
        }
        
        # Mettre en cache
        if self.settings.cache_enabled:
            self._cache[cache_key] = output
        
        return output
    
    def predict_batch(self, texts: List[str]) -> List[Dict[str, Any]]:
        """Exécute des prédictions par batch."""
        if not self.is_loaded:
            raise RuntimeError("Modèle non chargé")
        
        start = time.time()
        
        with torch.no_grad():
            results = self.pipe(
                texts,
                truncation=True,
                max_length=self.settings.max_sequence_length,
                batch_size=min(len(texts), self.settings.max_batch_size),
            )
        
        elapsed_ms = (time.time() - start) * 1000
        per_item_ms = elapsed_ms / len(texts)
        
        return [
            {
                "prediction": r["label"],
                "confidence": round(r["score"], 4),
                "processing_time_ms": round(per_item_ms, 2),
            }
            for r in results
        ]
    
    @property
    def gpu_available(self) -> bool:
        return torch.cuda.is_available()
    
    @property
    def model_info(self) -> Dict:
        return {
            "name": self.settings.model_name,
            "version": self.settings.app_version,
            "device": str(self.device),
            "loaded": self.is_loaded,
            "cache_size": len(self._cache),
        }

Enfin, le point d’entrée FastAPI qui assemble tous les composants :

Python
# app/main.py
import time
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from prometheus_client import (
    Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST,
)
from starlette.responses import Response

from app.config import get_settings, Settings
from app.models.inference import InferenceService
from app.schemas.prediction import (
    PredictionRequest, PredictionResponse,
    BatchPredictionRequest, BatchPredictionResponse,
    HealthResponse,
)

logger = logging.getLogger(__name__)

# ============================================================
# Métriques Prometheus
# ============================================================
REQUEST_COUNT = Counter(
    "ml_requests_total",
    "Total de requêtes d'inférence",
    ["method", "endpoint", "status"],
)
REQUEST_LATENCY = Histogram(
    "ml_request_duration_seconds",
    "Latence des requêtes d'inférence",
    ["endpoint"],
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)
MODEL_LOADED = Gauge(
    "ml_model_loaded",
    "Indique si le modèle est chargé (1) ou non (0)",
)
INFERENCE_ERRORS = Counter(
    "ml_inference_errors_total",
    "Total d'erreurs d'inférence",
    ["error_type"],
)

# ============================================================
# Lifespan (startup/shutdown)
# ============================================================
startup_time = None
inference_service = InferenceService()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Gestion du cycle de vie de l'application."""
    global startup_time
    startup_time = time.time()
    
    # Startup : charger le modèle
    logger.info("🚀 Démarrage de l'API...")
    try:
        inference_service.load_model()
        MODEL_LOADED.set(1)
    except Exception as e:
        logger.error(f"❌ Erreur au chargement du modèle: {e}")
        MODEL_LOADED.set(0)
        raise
    
    yield
    
    # Shutdown
    logger.info("👋 Arrêt de l'API...")
    MODEL_LOADED.set(0)


# ============================================================
# Application FastAPI
# ============================================================
settings = get_settings()

app = FastAPI(
    title=settings.app_name,
    version=settings.app_version,
    description="API d'inférence ML en production",
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc",
)

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins.split(","),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ============================================================
# Middleware de métriques
# ============================================================
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    elapsed = time.time() - start
    
    endpoint = request.url.path
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=endpoint,
        status=response.status_code,
    ).inc()
    REQUEST_LATENCY.labels(endpoint=endpoint).observe(elapsed)
    
    return response


# ============================================================
# Endpoints
# ============================================================

@app.get("/health", response_model=HealthResponse, tags=["Système"])
async def health_check():
    """Vérification de l'état de santé de l'API."""
    return HealthResponse(
        status="healthy" if inference_service.is_loaded else "degraded",
        model_loaded=inference_service.is_loaded,
        gpu_available=inference_service.gpu_available,
        uptime_seconds=round(time.time() - startup_time, 2),
        version=settings.app_version,
    )


@app.get("/ready", tags=["Système"])
async def readiness_check():
    """Vérification que l'API est prête à recevoir du trafic."""
    if not inference_service.is_loaded:
        raise HTTPException(
            status_code=503,
            detail="Le modèle n'est pas encore chargé"
        )
    return {"status": "ready"}


@app.get("/metrics", tags=["Système"])
async def prometheus_metrics():
    """Endpoint Prometheus pour le scraping de métriques."""
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST,
    )


@app.post("/predict", response_model=PredictionResponse, tags=["Inférence"])
async def predict(request: PredictionRequest):
    """Exécute une prédiction sur un texte."""
    if not inference_service.is_loaded:
        raise HTTPException(status_code=503, detail="Modèle non disponible")
    
    try:
        result = inference_service.predict(
            text=request.text,
            parameters=request.parameters,
        )
        
        return PredictionResponse(
            prediction=result["prediction"],
            confidence=result["confidence"],
            model_name=settings.model_name,
            model_version=settings.app_version,
            processing_time_ms=result["processing_time_ms"],
        )
    
    except Exception as e:
        INFERENCE_ERRORS.labels(error_type=type(e).__name__).inc()
        logger.error(f"Erreur d'inférence: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Erreur lors de l'inférence: {str(e)}"
        )


@app.post("/predict/batch", response_model=BatchPredictionResponse, tags=["Inférence"])
async def predict_batch(request: BatchPredictionRequest):
    """Exécute des prédictions sur un batch de textes."""
    if not inference_service.is_loaded:
        raise HTTPException(status_code=503, detail="Modèle non disponible")
    
    start = time.time()
    
    try:
        texts = [item.text for item in request.items]
        results = inference_service.predict_batch(texts)
        
        total_ms = (time.time() - start) * 1000
        
        return BatchPredictionResponse(
            results=[
                PredictionResponse(
                    prediction=r["prediction"],
                    confidence=r["confidence"],
                    model_name=settings.model_name,
                    model_version=settings.app_version,
                    processing_time_ms=r["processing_time_ms"],
                )
                for r in results
            ],
            total_processing_time_ms=round(total_ms, 2),
            batch_size=len(texts),
        )
    
    except Exception as e:
        INFERENCE_ERRORS.labels(error_type=type(e).__name__).inc()
        logger.error(f"Erreur batch: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/model/info", tags=["Modèle"])
async def model_info():
    """Informations sur le modèle chargé."""
    return inference_service.model_info


# ============================================================
# Gestion d'erreurs globale
# ============================================================
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    logger.error(f"Erreur non gérée: {exc}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={"detail": "Erreur interne du serveur"},
    )

💡 Astuce

En production, utilisez toujours workers=1 pour les modèles GPU. Chaque worker charge sa propre copie du modèle en mémoire. Avec un modèle de 2 Go et 4 workers, vous consommez 8 Go de VRAM pour un seul service. Utilisez plutôt le batching asynchrone pour maximiser le throughput avec un seul worker.

4. Dockerfile multi-stage optimisé

Un Dockerfile multi-stage sépare la phase de build (installation des dépendances, compilation) de la phase d’exécution. L’image finale ne contient que le strict nécessaire, réduisant la taille et la surface d’attaque.

Dockerfile
# ============================================================
# Stage 1 : Builder — installation des dépendances
# ============================================================
FROM python:3.11-slim AS builder

WORKDIR /build

# Installer les dépendances système pour la compilation
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Copier et installer les requirements
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# ============================================================
# Stage 2 : Runtime — image finale légère
# ============================================================
FROM python:3.11-slim AS runtime

# Métadonnées
LABEL maintainer="Thomas Klein "
LABEL description="ML Inference API"
LABEL version="1.0.0"

# Variables d'environnement
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PYTHONPATH=/app \
    ML_HOST=0.0.0.0 \
    ML_PORT=8000 \
    ML_WORKERS=1 \
    ML_LOG_LEVEL=INFO

# Créer un utilisateur non-root
RUN groupadd -r mluser && useradd -r -g mluser -d /app -s /sbin/nologin mluser

# Installer les dépendances runtime minimales
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copier les packages Python depuis le builder
COPY --from=builder /install /usr/local

# Copier le code de l'application
WORKDIR /app
COPY app/ ./app/

# Créer les répertoires nécessaires
RUN mkdir -p /app/models /app/logs && \
    chown -R mluser:mluser /app

# Passer à l'utilisateur non-root
USER mluser

# Healthcheck
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD curl -f http://localhost:${ML_PORT}/health || exit 1

# Port
EXPOSE ${ML_PORT}

# Commande de démarrage
CMD ["python", "-m", "uvicorn", "app.main:app", \
     "--host", "0.0.0.0", \
     "--port", "8000", \
     "--workers", "1", \
     "--log-level", "info", \
     "--access-log", \
     "--timeout-keep-alive", "65"]

Et voici la variante GPU pour les modèles qui nécessitent CUDA :

Dockerfile
# Dockerfile.gpu — Variante avec support CUDA
FROM nvidia/cuda:12.1.1-runtime-ubuntu22.04 AS runtime

ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PYTHONPATH=/app

# Installer Python et dépendances
RUN apt-get update && apt-get install -y --no-install-recommends \
    python3.11 python3.11-venv python3-pip curl \
    && ln -s /usr/bin/python3.11 /usr/bin/python \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Installer les dépendances Python
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copier l'application
COPY app/ ./app/

# Utilisateur non-root
RUN groupadd -r mluser && useradd -r -g mluser -d /app mluser && \
    mkdir -p /app/models /app/logs && chown -R mluser:mluser /app
USER mluser

HEALTHCHECK --interval=30s --timeout=10s --start-period=120s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

EXPOSE 8000

CMD ["python", "-m", "uvicorn", "app.main:app", \
     "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]

Le fichier .dockerignore est crucial pour éviter de copier des fichiers inutiles dans l’image :

Bash
# .dockerignore
__pycache__/
*.pyc
*.pyo
.git/
.gitignore
.env
.env.*
*.md
tests/
docs/
notebooks/
.venv/
venv/
.pytest_cache/
.mypy_cache/
*.egg-info/
dist/
build/
.DS_Store
*.log

⚠️ Attention

Ne mettez JAMAIS vos clés API, mots de passe ou secrets dans le Dockerfile ou l’image Docker. Utilisez des variables d’environnement (passées au runtime via docker run -e ou docker-compose) ou un gestionnaire de secrets (Docker Secrets, AWS Secrets Manager, HashiCorp Vault). Une clé dans une image Docker est une fuite de sécurité — les images sont souvent partagées ou stockées dans des registries.

5. Healthchecks et readiness probes

Les healthchecks sont essentiels en production pour que l’orchestrateur (Docker Compose, Kubernetes) sache si votre service est opérationnel. Il existe deux types de vérifications complémentaires :

La distinction est importante : pendant le chargement initial du modèle (qui peut prendre 30-120 secondes pour un LLM), le service est “alive” mais pas “ready”. L’orchestrateur ne doit pas le tuer (il se charge) ni lui envoyer de trafic (il n’est pas prêt).

Le paramètre --start-period dans le HEALTHCHECK Docker est crucial : il définit le délai avant le premier check, laissant au modèle le temps de se charger. Pour un modèle 7B, prévoyez 60-120 secondes en CPU, 30-60 secondes en GPU.

Pour Kubernetes, la configuration des probes se fait dans le manifest de déploiement :

YAML
# kubernetes/deployment.yaml (extrait)
containers:
  - name: ml-api
    image: ml-api:latest
    ports:
      - containerPort: 8000
    livenessProbe:
      httpGet:
        path: /health
        port: 8000
      initialDelaySeconds: 30
      periodSeconds: 15
      timeoutSeconds: 5
      failureThreshold: 3
    readinessProbe:
      httpGet:
        path: /ready
        port: 8000
      initialDelaySeconds: 60
      periodSeconds: 10
      timeoutSeconds: 5
      failureThreshold: 3
    startupProbe:
      httpGet:
        path: /health
        port: 8000
      initialDelaySeconds: 10
      periodSeconds: 10
      failureThreshold: 30  # 30 × 10s = 5 min max pour le startup
    resources:
      requests:
        memory: "4Gi"
        cpu: "2"
      limits:
        memory: "8Gi"
        nvidia.com/gpu: "1"

6. Docker Compose pour l’orchestration locale

Docker Compose orchestre plusieurs conteneurs comme un seul système. Voici un fichier complet incluant l’API, le monitoring Prometheus, Grafana, et un reverse proxy Nginx :

YAML
# docker-compose.yml
version: "3.9"

services:
  # ========================================
  # API d'inférence ML
  # ========================================
  ml-api:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: ml-api
    ports:
      - "8000:8000"
    environment:
      - ML_MODEL_NAME=distilbert-base-uncased-finetuned-sst-2-english
      - ML_MODEL_DEVICE=cpu
      - ML_LOG_LEVEL=INFO
      - ML_CACHE_ENABLED=true
    volumes:
      - model-cache:/app/models    # Cache des modèles téléchargés
      - ./logs:/app/logs           # Logs persistants
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s
    restart: unless-stopped
    networks:
      - ml-network
    deploy:
      resources:
        limits:
          memory: 4G
          cpus: "2.0"

  # ========================================
  # Nginx Reverse Proxy
  # ========================================
  nginx:
    image: nginx:alpine
    container_name: ml-nginx
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
    depends_on:
      ml-api:
        condition: service_healthy
    networks:
      - ml-network
    restart: unless-stopped

  # ========================================
  # Prometheus (métriques)
  # ========================================
  prometheus:
    image: prom/prometheus:latest
    container_name: ml-prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.retention.time=30d'
    networks:
      - ml-network
    restart: unless-stopped

  # ========================================
  # Grafana (dashboards)
  # ========================================
  grafana:
    image: grafana/grafana:latest
    container_name: ml-grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
      - GF_USERS_ALLOW_SIGN_UP=false
    volumes:
      - grafana-data:/var/lib/grafana
    depends_on:
      - prometheus
    networks:
      - ml-network
    restart: unless-stopped

volumes:
  model-cache:
  prometheus-data:
  grafana-data:

networks:
  ml-network:
    driver: bridge

Et la configuration Nginx pour le reverse proxy avec rate limiting :

Nginx
# nginx.conf
upstream ml_backend {
    server ml-api:8000;
    # Ajouter d'autres instances pour le load balancing :
    # server ml-api-2:8000;
}

limit_req_zone $binary_remote_addr zone=api_limit:10m rate=30r/m;

server {
    listen 80;
    server_name _;

    # Compression
    gzip on;
    gzip_types application/json;

    # Timeouts adaptés aux modèles ML (inférence peut être lente)
    proxy_connect_timeout 10s;
    proxy_send_timeout 60s;
    proxy_read_timeout 120s;

    # API d'inférence avec rate limiting
    location /predict {
        limit_req zone=api_limit burst=10 nodelay;
        proxy_pass http://ml_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Request-ID $request_id;
    }

    # Health et métriques (pas de rate limit)
    location ~ ^/(health|ready|metrics) {
        proxy_pass http://ml_backend;
    }

    # Documentation
    location ~ ^/(docs|redoc|openapi.json) {
        proxy_pass http://ml_backend;
    }
}

7. Monitoring avec Prometheus et Grafana

Le monitoring est non négociable en production. Sans visibilité sur le comportement de votre service, vous volez à l’aveugle. Prometheus collecte les métriques, Grafana les visualise.

Configuration Prometheus :

YAML
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: "ml-api"
    metrics_path: "/metrics"
    static_configs:
      - targets: ["ml-api:8000"]
    scrape_interval: 10s

Les métriques que nous avons définies dans notre API (via prometheus_client) sont automatiquement exposées sur l’endpoint /metrics. Voici les métriques clés à surveiller :

Pour les métriques GPU, ajoutez le NVIDIA DCGM Exporter à votre stack :

Python
# app/middleware/metrics.py — Métriques avancées
import psutil
import torch
from prometheus_client import Gauge, Info

# Métriques système
CPU_USAGE = Gauge("ml_cpu_usage_percent", "Utilisation CPU")
MEMORY_USAGE = Gauge("ml_memory_usage_bytes", "Utilisation mémoire")
GPU_MEMORY_USED = Gauge("ml_gpu_memory_used_bytes", "VRAM GPU utilisée")
GPU_MEMORY_TOTAL = Gauge("ml_gpu_memory_total_bytes", "VRAM GPU totale")
GPU_UTILIZATION = Gauge("ml_gpu_utilization_percent", "Utilisation GPU")
CACHE_SIZE = Gauge("ml_cache_entries", "Nombre d'entrées en cache")
MODEL_INFO = Info("ml_model", "Informations sur le modèle")


def update_system_metrics():
    """Met à jour les métriques système (appelé périodiquement)."""
    CPU_USAGE.set(psutil.cpu_percent())
    MEMORY_USAGE.set(psutil.Process().memory_info().rss)
    
    if torch.cuda.is_available():
        gpu_mem = torch.cuda.mem_get_info(0)
        GPU_MEMORY_USED.set(gpu_mem[1] - gpu_mem[0])
        GPU_MEMORY_TOTAL.set(gpu_mem[1])


# Appeler update_system_metrics() dans un background task toutes les 15s

💡 Astuce

Créez des alertes Grafana sur les métriques critiques : latence P99 > 2s, taux d’erreur > 1 %, VRAM GPU > 90 %, modèle non chargé. Configurez des notifications Slack ou email pour être prévenu en cas de problème. Un dashboard bien conçu vous fait gagner des heures de débugging.

8. Scaling et gestion de la charge

Quand votre API doit gérer plus de trafic, plusieurs stratégies de scaling s’offrent à vous. Le choix dépend de votre infrastructure et de vos contraintes.

Scaling horizontal avec Docker Compose :

Bash
# Lancer 3 répliques de l'API
docker compose up -d --scale ml-api=3

# Nginx ou Traefik se charge du load balancing automatiquement

Scaling avec Kubernetes (HPA — Horizontal Pod Autoscaler) :

YAML
# kubernetes/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Pods
      pods:
        metric:
          name: ml_request_duration_seconds
        target:
          type: AverageValue
          averageValue: "500m"  # 500ms de latence moyenne max

Optimisations de throughput :

9. CI/CD : automatiser le déploiement

Un pipeline CI/CD automatise les tests, le build de l’image Docker, et le déploiement. Voici un pipeline GitHub Actions complet :

YAML
# .github/workflows/deploy.yml
name: Build & Deploy ML API

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}/ml-api

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      
      - name: Lint
        run: |
          ruff check app/
          mypy app/ --ignore-missing-imports
      
      - name: Tests
        run: pytest tests/ -v --cov=app --cov-report=xml
      
      - name: Upload coverage
        uses: codecov/codecov-action@v3

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      
      - name: Login to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      
      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: .
          push: true
          tags: |
            ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
            ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

  deploy:
    needs: build
    runs-on: ubuntu-latest
    environment: production
    
    steps:
      - name: Deploy to production
        run: |
          # SSH et redéploiement (adapter à votre infra)
          echo "Deploying ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}"
          # kubectl set image deployment/ml-api ml-api=$IMAGE:$TAG
          # ou docker compose pull && docker compose up -d

10. Sécurité et bonnes pratiques

La sécurité d’une API ML est souvent négligée. Voici les mesures essentielles à mettre en place :

Python
# Authentification par clé API simple
from fastapi import Security, HTTPException
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")
VALID_API_KEYS = set(os.getenv("API_KEYS", "").split(","))

async def verify_api_key(api_key: str = Security(api_key_header)):
    if api_key not in VALID_API_KEYS:
        raise HTTPException(status_code=403, detail="Clé API invalide")
    return api_key

# Appliquer à un endpoint
@app.post("/predict", dependencies=[Depends(verify_api_key)])
async def predict(request: PredictionRequest):
    ...

⚠️ Attention

Scannez régulièrement vos images Docker avec trivy image mon-image:latest. Les images Python contiennent souvent des vulnérabilités dans les dépendances système. Mettez à jour votre image de base régulièrement et épinglez les versions de vos dépendances dans requirements.txt.

11. Optimisations de performance

Pour tirer le maximum de performance de votre déploiement, voici les optimisations avancées à considérer :

1. ONNX Runtime pour l’inférence CPU. Convertissez votre modèle PyTorch en ONNX pour des accélérations de 2-5x sur CPU grâce aux optimisations de graphe et à la quantification INT8 automatique.

2. TensorRT pour l’inférence GPU. NVIDIA TensorRT optimise le graphe de calcul pour vos GPU spécifiques, avec du kernel fusion et de la quantification FP16/INT8. Les gains sont typiquement de 2-4x par rapport à PyTorch vanilla.

3. Pré-chargement et warmup. Effectuez une inférence “à vide” au démarrage pour initialiser tous les caches CUDA et optimiser le graphe de calcul. Les premières requêtes sont toujours plus lentes sans warmup.

4. Connection pooling. Si votre API appelle des services externes (base de données, autre API), utilisez un pool de connexions pour éviter le overhead de création de connexion à chaque requête.

5. Profiling. Utilisez py-spy ou cProfile pour identifier les goulots d’étranglement. Souvent, ce n’est pas l’inférence elle-même mais le pre/post-processing qui consomme le plus de temps.

12. Conclusion

Déployer un modèle d’IA en production avec Docker et FastAPI n’est plus un luxe réservé aux grandes entreprises. Avec les outils et les patterns présentés dans ce tutoriel, vous disposez d’une base solide pour servir n’importe quel modèle de manière fiable, performante et maintenable.

Les points essentiels à retenir :

L’étape suivante est l’adoption de Kubernetes pour un scaling plus avancé, avec l’opérateur NVIDIA pour la gestion automatique des GPU, et des outils comme Seldon Core ou KServe pour des fonctionnalités MLOps avancées (A/B testing, canary deployments, model monitoring).

“Le déploiement est la partie la plus sous-estimée du machine learning. Un modèle qui ne tourne pas en production n’a aucune valeur business. Investissez autant d’énergie dans votre pipeline de déploiement que dans l’entraînement de vos modèles.”

— Marie Rousseau, VP Engineering chez ScaleAI France

📚 Sources

Retour à l'accueil