Automatiser sa veille technologique avec l’IA : agents, RSS et résumés automatiques

Niveau du tutoriel : Expert

Veille technologique automatisée

1. Introduction : pourquoi automatiser sa veille ?

Dans un écosystème technologique qui évolue à une vitesse vertigineuse, rester informé est devenu un défi quotidien pour tout professionnel du numérique. Chaque jour, des centaines d’articles de blog, de papers de recherche, de tweets influents, de newsletters spécialisées et de posts sur des forums comme Hacker News ou Reddit viennent s’ajouter au flot continu d’informations. La veille technologique manuelle — parcourir ses bookmarks, scroller ses timelines, lire chaque newsletter — consomme un temps précieux qui pourrait être consacré à des tâches à plus forte valeur ajoutée.

L’intelligence artificielle, et en particulier les grands modèles de langage (LLM), offre aujourd’hui une solution élégante à ce problème. En combinant des techniques de collecte automatisée de données (flux RSS, scraping web), des capacités de traitement du langage naturel (résumé, classification, extraction de thèmes), et des mécanismes de notification intelligente, il est possible de construire un système de veille entièrement automatisé qui vous livre, chaque matin, un digest personnalisé des actualités les plus pertinentes dans vos domaines d’intérêt.

Ce tutoriel s’adresse aux développeurs débutants qui souhaitent construire, étape par étape, un pipeline de veille technologique alimenté par l’IA. Nous utiliserons exclusivement des outils open source et des bibliothèques Python populaires. À la fin de cet article, vous disposerez d’un système fonctionnel capable de collecter automatiquement des articles depuis des dizaines de sources, de les résumer intelligemment, et de vous notifier sur Telegram ou par email avec un digest quotidien personnalisé.

ℹ️ Ce que vous allez construire
Un pipeline complet qui : (1) collecte automatiquement les articles de vos sources favorites via RSS et scraping, (2) les résume avec un LLM local ou via API, (3) vous envoie un digest quotidien sur Telegram ou par email, (4) affiche un dashboard interactif pour explorer vos articles. Le tout orchestré par des tâches planifiées avec cron ou Celery.

2. Identifier et organiser ses sources

La première étape de toute stratégie de veille efficace est l’identification et l’organisation méthodique de vos sources d’information. La qualité de votre veille dépend directement de la qualité de vos sources : une curation initiale soignée vous évitera de noyer les signaux importants dans un océan de bruit informationnel.

2.1 Types de sources

Les sources de veille technologique se répartissent en plusieurs catégories complémentaires. Les flux RSS restent le moyen le plus fiable et structuré de suivre des publications régulières : blogs techniques d’entreprises (Google AI Blog, Meta AI, OpenAI), blogs personnels de chercheurs et développeurs influents, médias spécialisés (TechCrunch, The Verge, Ars Technica), et agrégateurs comme Hacker News. Les newsletters constituent une source curatée par des experts : TLDR, The Batch (Andrew Ng), Morning Brew Tech, Import AI, et Ben’s Bites sont parmi les plus populaires en IA. Les réseaux sociaux — principalement Twitter/X, LinkedIn et Mastodon — offrent des informations en temps réel mais nécessitent un filtrage plus agressif. Enfin, les repositories GitHub et les preprints arXiv sont indispensables pour suivre les avancées en recherche.

2.2 Structure de configuration

Nous allons organiser nos sources dans un fichier de configuration YAML qui servira de référence pour l’ensemble du pipeline. Cette approche modulaire permet d’ajouter ou de supprimer des sources facilement sans modifier le code.

# config/sources.yaml - Configuration des sources de veille
sources:
  rss:
    - name: "Hacker News - Best"
      url: "https://hnrss.org/best"
      category: "tech_general"
      priority: "high"
      language: "en"

    - name: "Google AI Blog"
      url: "https://blog.google/technology/ai/rss/"
      category: "ai_research"
      priority: "high"
      language: "en"

    - name: "OpenAI Blog"
      url: "https://openai.com/blog/rss.xml"
      category: "ai_research"
      priority: "high"
      language: "en"

    - name: "Hugging Face Blog"
      url: "https://huggingface.co/blog/feed.xml"
      category: "ml_tools"
      priority: "medium"
      language: "en"

    - name: "Towards Data Science"
      url: "https://towardsdatascience.com/feed"
      category: "data_science"
      priority: "medium"
      language: "en"

    - name: "Le Blog du Modérateur"
      url: "https://www.blogdumoderateur.com/feed/"
      category: "tech_general"
      priority: "medium"
      language: "fr"

    - name: "MIT Technology Review"
      url: "https://www.technologyreview.com/feed/"
      category: "tech_general"
      priority: "high"
      language: "en"

    - name: "ArXiv CS.AI"
      url: "http://export.arxiv.org/rss/cs.AI"
      category: "ai_research"
      priority: "high"
      language: "en"

  scraping:
    - name: "Product Hunt"
      url: "https://www.producthunt.com/"
      selector: ".post-name"
      category: "products"
      priority: "low"

  newsletters:
    - name: "TLDR"
      email_filter: "dan@tldrnewsletter.com"
      category: "tech_general"
      priority: "high"

settings:
  max_articles_per_source: 20
  max_age_hours: 48
  summary_max_words: 150
  digest_time: "08:00"
  timezone: "Europe/Paris"

categories:
  ai_research: "🧠 Recherche IA"
  ml_tools: "🔧 Outils ML"
  tech_general: "💻 Tech Général"
  data_science: "📊 Data Science"
  products: "🚀 Produits"

Cette configuration centralise toutes les informations nécessaires : l’URL de chaque source, sa catégorie thématique, son niveau de priorité, et les paramètres globaux du pipeline. L’utilisation du format YAML rend le fichier lisible et facile à maintenir, même pour des non-développeurs.

3. Collecter les flux RSS avec feedparser

Le protocole RSS (Really Simple Syndication) est un format standardisé basé sur XML qui permet aux sites web de publier un flux structuré de leurs contenus récents. Malgré la montée en puissance des réseaux sociaux, RSS reste le backbone de la syndication de contenu sur le web et le moyen le plus fiable de collecter automatiquement des articles. La bibliothèque Python feedparser simplifie considérablement le parsing de ces flux en gérant les différentes versions de RSS (0.9x, 1.0, 2.0) ainsi que le format Atom.

import feedparser
import yaml
import sqlite3
import hashlib
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import List, Optional
import time
import logging

# Configuration du logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('veille')

@dataclass
class Article:
    """Représente un article collecté depuis une source."""
    title: str
    url: str
    source: str
    category: str
    published: datetime
    summary: str = ""
    content: str = ""
    ai_summary: str = ""
    priority: str = "medium"
    language: str = "en"
    hash_id: str = ""

    def __post_init__(self):
        if not self.hash_id:
            # Identifiant unique basé sur l'URL
            self.hash_id = hashlib.md5(self.url.encode()).hexdigest()


class RSSCollector:
    """Collecte les articles depuis des flux RSS."""

    def __init__(self, config_path: str, db_path: str = "veille.db"):
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)

        self.db_path = db_path
        self._init_database()

    def _init_database(self):
        """Initialise la base SQLite pour stocker les articles."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS articles (
                hash_id TEXT PRIMARY KEY,
                title TEXT NOT NULL,
                url TEXT NOT NULL,
                source TEXT NOT NULL,
                category TEXT,
                published TIMESTAMP,
                summary TEXT,
                content TEXT,
                ai_summary TEXT,
                priority TEXT DEFAULT 'medium',
                language TEXT DEFAULT 'en',
                is_read BOOLEAN DEFAULT FALSE,
                is_sent BOOLEAN DEFAULT FALSE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_published
            ON articles(published DESC)
        ''')
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_category
            ON articles(category)
        ''')
        conn.commit()
        conn.close()
        logger.info(f"Base de données initialisée : {self.db_path}")

    def fetch_feed(self, feed_config: dict) -> List[Article]:
        """Récupère et parse un flux RSS individuel."""
        url = feed_config['url']
        name = feed_config['name']
        logger.info(f"Récupération du flux : {name}")

        try:
            feed = feedparser.parse(url)

            if feed.bozo and not feed.entries:
                logger.warning(f"Erreur de parsing pour {name}: {feed.bozo_exception}")
                return []

            articles = []
            max_age = timedelta(
                hours=self.config['settings']['max_age_hours']
            )
            cutoff = datetime.now() - max_age
            max_articles = self.config['settings']['max_articles_per_source']

            for entry in feed.entries[:max_articles]:
                # Parser la date de publication
                published = self._parse_date(entry)
                if published and published < cutoff:
                    continue  # Article trop ancien

                # Extraire le contenu
                content = ""
                if hasattr(entry, 'content'):
                    content = entry.content[0].get('value', '')
                elif hasattr(entry, 'description'):
                    content = entry.description

                article = Article(
                    title=entry.get('title', 'Sans titre'),
                    url=entry.get('link', ''),
                    source=name,
                    category=feed_config.get('category', 'general'),
                    published=published or datetime.now(),
                    summary=entry.get('summary', '')[:500],
                    content=content,
                    priority=feed_config.get('priority', 'medium'),
                    language=feed_config.get('language', 'en')
                )
                articles.append(article)

            logger.info(f"  → {len(articles)} articles récupérés depuis {name}")
            return articles

        except Exception as e:
            logger.error(f"Erreur lors de la récupération de {name}: {e}")
            return []

    def _parse_date(self, entry) -> Optional[datetime]:
        """Parse la date d'un entry RSS avec gestion des formats multiples."""
        for date_field in ['published_parsed', 'updated_parsed', 'created_parsed']:
            date_tuple = entry.get(date_field)
            if date_tuple:
                try:
                    return datetime(*date_tuple[:6])
                except (ValueError, TypeError):
                    continue
        return None

    def collect_all(self) -> List[Article]:
        """Collecte les articles de toutes les sources RSS configurées."""
        all_articles = []

        for feed_config in self.config['sources']['rss']:
            articles = self.fetch_feed(feed_config)
            all_articles.extend(articles)
            time.sleep(1)  # Respecter les serveurs

        # Dédupliquer par hash_id
        seen = set()
        unique_articles = []
        for article in all_articles:
            if article.hash_id not in seen:
                seen.add(article.hash_id)
                unique_articles.append(article)

        logger.info(f"Total : {len(unique_articles)} articles uniques collectés")
        return unique_articles

    def save_articles(self, articles: List[Article]):
        """Sauvegarde les articles dans la base SQLite."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        saved = 0

        for article in articles:
            try:
                cursor.execute('''
                    INSERT OR IGNORE INTO articles
                    (hash_id, title, url, source, category, published,
                     summary, content, priority, language)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    article.hash_id, article.title, article.url,
                    article.source, article.category,
                    article.published.isoformat(),
                    article.summary, article.content,
                    article.priority, article.language
                ))
                if cursor.rowcount > 0:
                    saved += 1
            except sqlite3.Error as e:
                logger.error(f"Erreur SQLite pour '{article.title}': {e}")

        conn.commit()
        conn.close()
        logger.info(f"{saved} nouveaux articles sauvegardés en base")

# Utilisation
collector = RSSCollector('config/sources.yaml')
articles = collector.collect_all()
collector.save_articles(articles)

Le code ci-dessus implémente un collecteur RSS robuste avec plusieurs bonnes pratiques essentielles. La base de données SQLite assure la persistance des articles entre les exécutions et permet la déduplication via le hash MD5 de l’URL. Le délai d’une seconde entre chaque requête respecte les serveurs et évite le rate limiting. La gestion des erreurs permet au collecteur de continuer même si une source individuelle échoue. Les index sur la date de publication et la catégorie garantissent des requêtes rapides même avec des milliers d’articles.

4. Scraping web avec BeautifulSoup

Certaines sources d’information ne proposent pas de flux RSS. Dans ces cas, le scraping web — l’extraction automatisée de données depuis des pages HTML — devient nécessaire. BeautifulSoup, combiné avec la bibliothèque requests, offre une solution simple et efficace pour extraire du contenu structuré depuis n’importe quelle page web. Pour les sites qui utilisent du rendu JavaScript côté client, nous utiliserons également requests-html ou Playwright comme alternative.

⚠️ Éthique du scraping
Avant de scraper un site, vérifiez toujours son fichier robots.txt et ses conditions d’utilisation. Respectez les délais entre les requêtes (minimum 1 seconde), identifiez-vous via un User-Agent descriptif, et ne surchargez jamais un serveur. Le scraping abusif peut entraîner le blocage de votre IP et des conséquences légales dans certaines juridictions.
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import re
import time

class WebScraper:
    """Scraper web éthique pour la veille technologique."""

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'VeilleTechBot/1.0 (veille automatique; +https://example.com/bot)',
            'Accept': 'text/html,application/xhtml+xml',
            'Accept-Language': 'fr,en;q=0.9',
        })
        self.rate_limit = 2  # secondes entre chaque requête

    def check_robots_txt(self, base_url: str, path: str = "/") -> bool:
        """Vérifie si le scraping est autorisé par robots.txt."""
        robots_url = urljoin(base_url, '/robots.txt')
        try:
            resp = self.session.get(robots_url, timeout=10)
            if resp.status_code == 200:
                # Vérification basique - en production, utiliser robotparser
                content = resp.text.lower()
                if 'disallow: /' in content and 'user-agent: *' in content:
                    return False
            return True
        except requests.RequestException:
            return True  # En cas d'erreur, on suppose que c'est autorisé

    def scrape_page(self, url: str) -> dict:
        """Scrape une page web et extrait le contenu principal."""
        time.sleep(self.rate_limit)

        try:
            response = self.session.get(url, timeout=15)
            response.raise_for_status()
            response.encoding = response.apparent_encoding

            soup = BeautifulSoup(response.text, 'html.parser')

            # Supprimer les éléments non pertinents
            for tag in soup.find_all(['script', 'style', 'nav', 'footer',
                                       'header', 'aside', 'iframe', 'noscript']):
                tag.decompose()

            # Extraire le titre
            title = ""
            if soup.find('h1'):
                title = soup.find('h1').get_text(strip=True)
            elif soup.find('title'):
                title = soup.find('title').get_text(strip=True)

            # Extraire le contenu principal
            # Chercher les conteneurs courants d'articles
            content_selectors = [
                'article', '.post-content', '.article-content',
                '.entry-content', '#content', 'main',
                '[role="main"]', '.story-body'
            ]

            content = ""
            for selector in content_selectors:
                element = soup.select_one(selector)
                if element:
                    content = element.get_text(separator='\n', strip=True)
                    break

            if not content:
                # Fallback : extraire tous les paragraphes
                paragraphs = soup.find_all('p')
                content = '\n'.join(p.get_text(strip=True) for p in paragraphs
                                   if len(p.get_text(strip=True)) > 50)

            # Extraire les métadonnées
            meta = {}
            for tag in soup.find_all('meta'):
                name = tag.get('name', tag.get('property', ''))
                if name and tag.get('content'):
                    meta[name] = tag['content']

            return {
                'title': title,
                'content': content[:5000],  # Limiter la taille
                'url': url,
                'description': meta.get('description',
                               meta.get('og:description', '')),
                'author': meta.get('author', ''),
                'published': meta.get('article:published_time', ''),
                'word_count': len(content.split()),
                'success': True
            }

        except requests.RequestException as e:
            logger.error(f"Erreur de scraping pour {url}: {e}")
            return {'url': url, 'success': False, 'error': str(e)}

    def scrape_hacker_news(self, pages: int = 1) -> List[dict]:
        """Scrape les articles populaires de Hacker News."""
        articles = []

        for page in range(1, pages + 1):
            url = f"https://news.ycombinator.com/news?p={page}"
            time.sleep(self.rate_limit)

            try:
                response = self.session.get(url, timeout=15)
                soup = BeautifulSoup(response.text, 'html.parser')

                # Parser la structure de HN
                rows = soup.find_all('tr', class_='athing')

                for row in rows:
                    title_cell = row.find('span', class_='titleline')
                    if not title_cell:
                        continue

                    link = title_cell.find('a')
                    if not link:
                        continue

                    title = link.get_text(strip=True)
                    href = link.get('href', '')

                    # Score et commentaires (ligne suivante)
                    subtext = row.find_next_sibling('tr')
                    score = 0
                    comments = 0
                    if subtext:
                        score_span = subtext.find('span', class_='score')
                        if score_span:
                            score_match = re.search(r'(\d+)', score_span.text)
                            score = int(score_match.group(1)) if score_match else 0

                    articles.append({
                        'title': title,
                        'url': href if href.startswith('http') else urljoin(url, href),
                        'score': score,
                        'source': 'Hacker News'
                    })

                logger.info(f"HN page {page}: {len(rows)} articles trouvés")

            except Exception as e:
                logger.error(f"Erreur scraping HN page {page}: {e}")

        # Trier par score décroissant
        articles.sort(key=lambda x: x['score'], reverse=True)
        return articles

# Utilisation
scraper = WebScraper()

# Scraper un article spécifique
result = scraper.scrape_page("https://example.com/article")
if result['success']:
    print(f"Titre: {result['title']}")
    print(f"Mots: {result['word_count']}")

# Scraper Hacker News
hn_articles = scraper.scrape_hacker_news(pages=2)
for article in hn_articles[:5]:
    print(f"[{article['score']}] {article['title']}")

Ce scraper implémente plusieurs principes importants : le respect du fichier robots.txt, le rate limiting entre les requêtes, un User-Agent identifiable, et une extraction de contenu intelligente qui essaie plusieurs sélecteurs CSS courants avant de se rabattre sur une extraction générique des paragraphes. La méthode spécialisée pour Hacker News montre comment adapter le scraping à la structure HTML spécifique d’un site.

5. Résumé automatique avec un LLM

Le cœur de notre système de veille intelligente réside dans sa capacité à résumer automatiquement les articles collectés. Plutôt que de lire intégralement chaque article, notre pipeline va utiliser un grand modèle de langage (LLM) pour générer des résumés concis, pertinents et informatifs. Nous allons explorer deux approches complémentaires : l’utilisation d’une API cloud (OpenAI, Anthropic) pour une qualité maximale, et l’exécution d’un modèle local avec Ollama pour la confidentialité et l’économie.

import openai
import requests
import json
from abc import ABC, abstractmethod
from typing import Optional

class BaseSummarizer(ABC):
    """Interface abstraite pour les résumeurs."""

    @abstractmethod
    def summarize(self, text: str, max_words: int = 150) -> str:
        pass

    @abstractmethod
    def classify_topic(self, text: str, categories: list) -> str:
        pass


class OpenAISummarizer(BaseSummarizer):
    """Résumeur utilisant l'API OpenAI."""

    def __init__(self, api_key: str, model: str = "gpt-4o-mini"):
        self.client = openai.OpenAI(api_key=api_key)
        self.model = model

    def summarize(self, text: str, max_words: int = 150) -> str:
        """Génère un résumé concis d'un article."""
        prompt = f"""Résume l'article suivant en {max_words} mots maximum.
Le résumé doit :
- Capturer les points clés et les informations nouvelles
- Être écrit dans la même langue que l'article
- Commencer directement par l'information principale
- Mentionner les chiffres ou données importants

Article :
{text[:4000]}

Résumé :"""

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "Tu es un assistant spécialisé dans la synthèse d'articles technologiques. Tu produis des résumés précis, concis et informatifs."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=300,
                temperature=0.3  # Faible température pour la cohérence
            )
            return response.choices[0].message.content.strip()
        except Exception as e:
            logger.error(f"Erreur OpenAI: {e}")
            return text[:200] + "..."  # Fallback : troncature simple

    def classify_topic(self, text: str, categories: list) -> str:
        """Classifie un article dans une catégorie."""
        prompt = f"""Classifie l'article suivant dans exactement UNE des catégories suivantes :
{', '.join(categories)}

Article : {text[:2000]}

Réponds uniquement par le nom de la catégorie, rien d'autre."""

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=20,
                temperature=0
            )
            category = response.choices[0].message.content.strip()
            return category if category in categories else categories[0]
        except Exception as e:
            logger.error(f"Erreur classification: {e}")
            return categories[0]

    def extract_key_points(self, text: str, num_points: int = 5) -> list:
        """Extrait les points clés d'un article sous forme de liste."""
        prompt = f"""Extrais les {num_points} points clés les plus importants
de l'article suivant. Réponds sous forme de liste numérotée.

Article : {text[:4000]}

Points clés :"""

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=500,
                temperature=0.3
            )
            points = response.choices[0].message.content.strip().split('\n')
            return [p.strip() for p in points if p.strip()]
        except Exception as e:
            logger.error(f"Erreur extraction: {e}")
            return []


class OllamaSummarizer(BaseSummarizer):
    """Résumeur utilisant Ollama (modèle local)."""

    def __init__(self, model: str = "mistral:7b",
                 base_url: str = "http://localhost:11434"):
        self.model = model
        self.base_url = base_url
        self._check_availability()

    def _check_availability(self):
        """Vérifie que Ollama est accessible."""
        try:
            resp = requests.get(f"{self.base_url}/api/tags", timeout=5)
            models = [m['name'] for m in resp.json().get('models', [])]
            if self.model not in models:
                logger.warning(f"Modèle {self.model} non trouvé. "
                              f"Disponibles: {models}")
        except requests.ConnectionError:
            logger.error("Ollama n'est pas accessible. "
                        "Lancez: ollama serve")

    def summarize(self, text: str, max_words: int = 150) -> str:
        """Résumé avec modèle local via Ollama."""
        prompt = f"""[INST] Résume cet article en {max_words} mots maximum.
Sois concis et factuel.

{text[:3000]} [/INST]"""

        try:
            response = requests.post(
                f"{self.base_url}/api/generate",
                json={
                    "model": self.model,
                    "prompt": prompt,
                    "stream": False,
                    "options": {
                        "temperature": 0.3,
                        "num_predict": 300,
                        "top_p": 0.9
                    }
                },
                timeout=60
            )
            return response.json()['response'].strip()
        except Exception as e:
            logger.error(f"Erreur Ollama: {e}")
            return text[:200] + "..."

    def classify_topic(self, text: str, categories: list) -> str:
        """Classification avec modèle local."""
        prompt = f"""Classifie dans une seule catégorie parmi : {', '.join(categories)}
Texte: {text[:1000]}
Catégorie:"""

        try:
            response = requests.post(
                f"{self.base_url}/api/generate",
                json={
                    "model": self.model,
                    "prompt": prompt,
                    "stream": False,
                    "options": {"temperature": 0, "num_predict": 20}
                },
                timeout=30
            )
            return response.json()['response'].strip().split('\n')[0]
        except Exception as e:
            return categories[0]


class ArticleSummarizer:
    """Orchestrateur de résumé avec fallback automatique."""

    def __init__(self, openai_key: Optional[str] = None):
        self.summarizers = []

        if openai_key:
            self.summarizers.append(OpenAISummarizer(openai_key))
            logger.info("Résumeur OpenAI configuré (primaire)")

        # Ollama comme fallback
        self.summarizers.append(OllamaSummarizer())
        logger.info("Résumeur Ollama configuré (fallback)")

    def summarize(self, article: Article) -> str:
        """Résume un article avec fallback automatique."""
        text = article.content or article.summary
        if not text:
            return "Contenu non disponible pour le résumé."

        for summarizer in self.summarizers:
            try:
                summary = summarizer.summarize(text)
                if summary and len(summary) > 20:
                    return summary
            except Exception as e:
                logger.warning(f"Fallback: {e}")
                continue

        return text[:200] + "..."  # Dernier recours

# Utilisation
summarizer = ArticleSummarizer(openai_key="sk-...")

for article in articles[:5]:
    article.ai_summary = summarizer.summarize(article)
    print(f"\n📄 {article.title}")
    print(f"📝 {article.ai_summary}")

L’architecture avec une classe abstraite BaseSummarizer et des implémentations concrètes permet de basculer facilement entre différents backends de résumé. Le pattern de fallback dans ArticleSummarizer garantit que le pipeline ne s’arrête jamais : si l’API OpenAI est indisponible ou renvoie une erreur, le système bascule automatiquement sur le modèle local Ollama, et en dernier recours sur une simple troncature du texte.

6. Notifications : Telegram et email

Un système de veille n’a de valeur que s’il vous transmet l’information au bon moment et via le bon canal. Nous allons implémenter deux canaux de notification complémentaires : Telegram pour les notifications instantanées sur mobile (idéal pour les alertes urgentes et le digest quotidien), et l’email pour des rapports plus détaillés avec mise en forme HTML.

6.1 Bot Telegram

Telegram offre une API de bot simple et gratuite, parfaitement adaptée aux notifications automatisées. Pour créer un bot, il suffit de contacter @BotFather sur Telegram, qui vous fournira un token d’API. Ensuite, envoyez un premier message à votre bot et récupérez votre chat_id via l’API getUpdates.

import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime

class TelegramNotifier:
    """Envoie des notifications via un bot Telegram."""

    def __init__(self, bot_token: str, chat_id: str):
        self.bot_token = bot_token
        self.chat_id = chat_id
        self.base_url = f"https://api.telegram.org/bot{bot_token}"

    def send_message(self, text: str, parse_mode: str = "HTML"):
        """Envoie un message texte via Telegram."""
        # Telegram limite à 4096 caractères
        chunks = self._split_message(text, max_length=4000)

        for chunk in chunks:
            try:
                response = requests.post(
                    f"{self.base_url}/sendMessage",
                    json={
                        "chat_id": self.chat_id,
                        "text": chunk,
                        "parse_mode": parse_mode,
                        "disable_web_page_preview": True
                    },
                    timeout=10
                )
                response.raise_for_status()
            except requests.RequestException as e:
                logger.error(f"Erreur Telegram: {e}")

    def send_digest(self, articles: List[Article]):
        """Envoie un digest formaté des articles du jour."""
        now = datetime.now().strftime("%d/%m/%Y")
        header = f"📰 Veille Tech — {now}\n"
        header += f"📊 {len(articles)} articles collectés\n"
        header += "━" * 30 + "\n\n"

        # Grouper par catégorie
        by_category = {}
        for article in articles:
            cat = article.category
            if cat not in by_category:
                by_category[cat] = []
            by_category[cat].append(article)

        body = ""
        category_emojis = {
            'ai_research': '🧠', 'ml_tools': '🔧',
            'tech_general': '💻', 'data_science': '📊',
            'products': '🚀'
        }

        for category, cat_articles in by_category.items():
            emoji = category_emojis.get(category, '📌')
            body += f"\n{emoji} {category.replace('_', ' ').title()}\n\n"

            for article in cat_articles[:5]:  # Max 5 par catégorie
                priority_icon = "🔴" if article.priority == "high" else "🟡" if article.priority == "medium" else "⚪"
                summary = article.ai_summary or article.summary[:100]
                body += f"{priority_icon} {article.title}\n"
                body += f"   {summary[:120]}...\n\n"

        self.send_message(header + body)
        logger.info(f"Digest envoyé via Telegram ({len(articles)} articles)")

    def _split_message(self, text: str, max_length: int = 4000) -> list:
        """Découpe un message long en chunks compatibles Telegram."""
        if len(text) <= max_length:
            return [text]

        chunks = []
        while text:
            if len(text) <= max_length:
                chunks.append(text)
                break

            # Chercher un point de coupure naturel
            split_point = text.rfind('\n\n', 0, max_length)
            if split_point == -1:
                split_point = text.rfind('\n', 0, max_length)
            if split_point == -1:
                split_point = max_length

            chunks.append(text[:split_point])
            text = text[split_point:].lstrip()

        return chunks


class EmailNotifier:
    """Envoie des rapports par email avec mise en forme HTML."""

    def __init__(self, smtp_host: str, smtp_port: int,
                 username: str, password: str, from_email: str):
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.from_email = from_email

    def send_digest(self, articles: List[Article], to_email: str):
        """Envoie un digest HTML par email."""
        now = datetime.now().strftime("%d/%m/%Y")
        subject = f"📰 Veille Tech — {now} ({len(articles)} articles)"

        # Construire le HTML de l'email
        html = f"""
        
        
            

📰 Veille Tech

{now} — {len(articles)} articles sélectionnés pour vous

""" # Grouper par catégorie by_category = {} for article in articles: cat = article.category if cat not in by_category: by_category[cat] = [] by_category[cat].append(article) for category, cat_articles in by_category.items(): html += f"""

{category.replace('_', ' ').title()}

""" for article in cat_articles: summary = article.ai_summary or article.summary[:200] priority_color = "#ef4444" if article.priority == "high" else "#f59e0b" html += f"""

{article.title}

{article.source} · {article.published.strftime('%d/%m %H:%M')}

{summary}

""" html += """

Généré automatiquement par VeilleTech AI 🤖

""" # Envoyer l'email msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = self.from_email msg['To'] = to_email msg.attach(MIMEText(html, 'html')) try: with smtplib.SMTP(self.smtp_host, self.smtp_port) as server: server.starttls() server.login(self.username, self.password) server.send_message(msg) logger.info(f"Digest email envoyé à {to_email}") except smtplib.SMTPException as e: logger.error(f"Erreur email: {e}") # Configuration telegram = TelegramNotifier( bot_token="123456:ABC-DEF...", chat_id="987654321" ) email_notifier = EmailNotifier( smtp_host="smtp.gmail.com", smtp_port=587, username="votre.email@gmail.com", password="app_password_ici", from_email="votre.email@gmail.com" ) # Envoyer les notifications telegram.send_digest(articles) email_notifier.send_digest(articles, "destinataire@example.com")

7. Planification avec cron et Celery

Pour que notre système de veille fonctionne de manière autonome, nous devons planifier l’exécution automatique du pipeline de collecte, résumé et notification. Deux approches sont possibles selon la complexité de vos besoins : cron pour les planifications simples sur un serveur Linux, et Celery pour les architectures distribuées avec gestion avancée des tâches, des retries et du monitoring.

7.1 Planification avec cron

Cron est le planificateur de tâches natif de Linux, parfaitement adapté aux scripts simples qui doivent s’exécuter à intervalles réguliers. La syntaxe crontab utilise cinq champs (minute, heure, jour du mois, mois, jour de la semaine) suivis de la commande à exécuter.

# Éditer la crontab : crontab -e

# Collecte RSS toutes les 2 heures de 6h à 22h
0 6-22/2 * * * cd /home/user/veille && /usr/bin/python3 collect.py >> /var/log/veille/collect.log 2>&1

# Digest quotidien à 8h00 du lundi au vendredi
0 8 * * 1-5 cd /home/user/veille && /usr/bin/python3 digest.py >> /var/log/veille/digest.log 2>&1

# Nettoyage des articles de plus de 30 jours, le dimanche à 3h
0 3 * * 0 cd /home/user/veille && /usr/bin/python3 cleanup.py >> /var/log/veille/cleanup.log 2>&1
#!/usr/bin/env python3
# collect.py - Script de collecte planifié
"""
Script autonome de collecte et résumé des articles.
Conçu pour être exécuté via cron.
"""

import sys
import os
import logging
from pathlib import Path

# Ajouter le répertoire du projet au path
sys.path.insert(0, str(Path(__file__).parent))

from rss_collector import RSSCollector
from web_scraper import WebScraper
from summarizer import ArticleSummarizer

# Logging vers fichier
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler('/var/log/veille/collect.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def main():
    logger.info("=" * 50)
    logger.info("Début de la collecte planifiée")

    try:
        # 1. Collecter les flux RSS
        collector = RSSCollector('config/sources.yaml')
        articles = collector.collect_all()
        collector.save_articles(articles)

        # 2. Enrichir avec le scraping
        scraper = WebScraper()
        for article in articles:
            if not article.content or len(article.content) < 100:
                result = scraper.scrape_page(article.url)
                if result['success']:
                    article.content = result['content']

        # 3. Résumer avec l'IA
        api_key = os.environ.get('OPENAI_API_KEY')
        summarizer = ArticleSummarizer(openai_key=api_key)

        new_articles = [a for a in articles if not a.ai_summary]
        for i, article in enumerate(new_articles):
            article.ai_summary = summarizer.summarize(article)
            logger.info(f"Résumé {i+1}/{len(new_articles)}: {article.title[:50]}")

        # 4. Mettre à jour la base
        collector.save_articles(articles)

        logger.info(f"Collecte terminée: {len(articles)} articles traités")

    except Exception as e:
        logger.error(f"Erreur critique: {e}", exc_info=True)
        sys.exit(1)

if __name__ == '__main__':
    main()

7.2 Planification avancée avec Celery

Pour les systèmes plus complexes nécessitant une gestion fine des tâches (retry automatique, limites de concurrence, monitoring en temps réel), Celery est la solution de référence en Python. Celery utilise un broker de messages (Redis ou RabbitMQ) pour distribuer les tâches entre des workers indépendants.

# celery_app.py - Configuration Celery
from celery import Celery
from celery.schedules import crontab

app = Celery('veille',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Europe/Paris',
    enable_utc=True,
    task_track_started=True,
    task_acks_late=True,           # Acquitter après exécution
    worker_prefetch_multiplier=1,  # 1 tâche à la fois par worker
    task_soft_time_limit=300,      # Timeout souple : 5 min
    task_time_limit=600,           # Timeout dur : 10 min
    task_default_retry_delay=60,   # Retry après 60s
    task_max_retries=3,
)

# Tâches planifiées
app.conf.beat_schedule = {
    'collect-rss-every-2-hours': {
        'task': 'tasks.collect_rss',
        'schedule': crontab(minute=0, hour='6-22/2'),
    },
    'daily-digest': {
        'task': 'tasks.send_digest',
        'schedule': crontab(minute=0, hour=8, day_of_week='1-5'),
    },
    'weekly-cleanup': {
        'task': 'tasks.cleanup_old_articles',
        'schedule': crontab(minute=0, hour=3, day_of_week=0),
    },
}

# tasks.py - Définition des tâches
from celery_app import app

@app.task(bind=True, max_retries=3)
def collect_rss(self):
    """Tâche de collecte RSS."""
    try:
        collector = RSSCollector('config/sources.yaml')
        articles = collector.collect_all()
        collector.save_articles(articles)

        # Lancer le résumé en chaîne
        for article in articles:
            if not article.ai_summary:
                summarize_article.delay(article.hash_id)

        return {'status': 'success', 'count': len(articles)}

    except Exception as e:
        self.retry(exc=e, countdown=120)

@app.task(bind=True, rate_limit='10/m')
def summarize_article(self, article_hash):
    """Résume un article individuel (rate limited)."""
    try:
        summarizer = ArticleSummarizer()
        # Charger l'article depuis la DB et le résumer
        # ...
        return {'status': 'summarized', 'hash': article_hash}
    except Exception as e:
        self.retry(exc=e, countdown=60)

@app.task
def send_digest():
    """Envoie le digest quotidien."""
    # Charger les articles des dernières 24h
    # Envoyer via Telegram et email
    pass

La configuration Celery ci-dessus intègre plusieurs bonnes pratiques de production. Le rate_limit='10/m' sur la tâche de résumé empêche de surcharger l'API du LLM avec trop de requêtes simultanées. Le task_acks_late=True garantit que les tâches ne sont acquittées qu'après leur exécution complète, évitant les pertes en cas de crash du worker. Le mécanisme de retry automatique avec backoff exponentiel assure la résilience face aux erreurs transitoires.

8. Dashboard interactif avec Streamlit

Pour visualiser, explorer et interagir avec vos articles de veille, nous allons créer un dashboard web interactif avec Streamlit. Streamlit est un framework Python qui permet de créer des applications web de data science en quelques dizaines de lignes de code, sans aucune connaissance en HTML, CSS ou JavaScript. C'est l'outil idéal pour créer rapidement une interface utilisable pour notre système de veille.

# dashboard.py - Dashboard Streamlit pour la veille
import streamlit as st
import sqlite3
import pandas as pd
from datetime import datetime, timedelta
import plotly.express as px
import plotly.graph_objects as go

# Configuration de la page
st.set_page_config(
    page_title="🔍 VeilleTech AI",
    page_icon="📰",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Style personnalisé
st.markdown("""""", unsafe_allow_html=True)

@st.cache_resource
def get_database():
    """Connexion à la base SQLite."""
    return sqlite3.connect('veille.db', check_same_thread=False)

def load_articles(days: int = 7, category: str = None) -> pd.DataFrame:
    """Charge les articles depuis la base de données."""
    conn = get_database()
    query = """
        SELECT title, url, source, category, published,
               ai_summary, priority, language
        FROM articles
        WHERE published >= ?
    """
    params = [(datetime.now() - timedelta(days=days)).isoformat()]

    if category and category != "Toutes":
        query += " AND category = ?"
        params.append(category)

    query += " ORDER BY published DESC"

    df = pd.read_sql_query(query, conn, params=params)
    df['published'] = pd.to_datetime(df['published'])
    return df

# ===== SIDEBAR =====
st.sidebar.title("🔍 VeilleTech AI")
st.sidebar.markdown("---")

# Filtres
days_filter = st.sidebar.slider("Période (jours)", 1, 30, 7)

conn = get_database()
categories = pd.read_sql_query(
    "SELECT DISTINCT category FROM articles", conn
)['category'].tolist()
categories.insert(0, "Toutes")
category_filter = st.sidebar.selectbox("Catégorie", categories)

priority_filter = st.sidebar.multiselect(
    "Priorité",
    ["high", "medium", "low"],
    default=["high", "medium"]
)

search_query = st.sidebar.text_input("🔎 Recherche", "")

# Charger les données
df = load_articles(days=days_filter, category=category_filter)

if priority_filter:
    df = df[df['priority'].isin(priority_filter)]

if search_query:
    mask = (df['title'].str.contains(search_query, case=False, na=False) |
            df['ai_summary'].str.contains(search_query, case=False, na=False))
    df = df[mask]

# ===== MÉTRIQUES =====
st.title("📰 Dashboard de Veille Technologique")

col1, col2, col3, col4 = st.columns(4)
with col1:
    st.metric("Articles totaux", len(df))
with col2:
    st.metric("Sources actives", df['source'].nunique())
with col3:
    high_priority = len(df[df['priority'] == 'high'])
    st.metric("Haute priorité", high_priority)
with col4:
    today = len(df[df['published'].dt.date == datetime.now().date()])
    st.metric("Aujourd'hui", today)

st.markdown("---")

# ===== GRAPHIQUES =====
col_left, col_right = st.columns(2)

with col_left:
    st.subheader("📊 Articles par jour")
    daily = df.groupby(df['published'].dt.date).size().reset_index()
    daily.columns = ['date', 'count']
    fig = px.bar(daily, x='date', y='count',
                 color_discrete_sequence=['#667eea'])
    fig.update_layout(template='plotly_dark', height=300)
    st.plotly_chart(fig, use_container_width=True)

with col_right:
    st.subheader("📂 Répartition par catégorie")
    cat_counts = df['category'].value_counts().reset_index()
    cat_counts.columns = ['category', 'count']
    fig = px.pie(cat_counts, values='count', names='category',
                 color_discrete_sequence=px.colors.qualitative.Set2)
    fig.update_layout(template='plotly_dark', height=300)
    st.plotly_chart(fig, use_container_width=True)

st.markdown("---")

# ===== LISTE DES ARTICLES =====
st.subheader(f"📋 Articles ({len(df)} résultats)")

for _, row in df.iterrows():
    priority_emoji = {"high": "🔴", "medium": "🟡", "low": "⚪"}.get(
        row['priority'], "⚪"
    )

    with st.expander(f"{priority_emoji} {row['title']}", expanded=False):
        col_a, col_b = st.columns([3, 1])
        with col_a:
            if row['ai_summary']:
                st.markdown(f"**Résumé IA :** {row['ai_summary']}")
            st.markdown(f"🔗 [{row['url']}]({row['url']})")
        with col_b:
            st.caption(f"📌 {row['source']}")
            st.caption(f"📅 {row['published'].strftime('%d/%m/%Y %H:%M')}")
            st.caption(f"📂 {row['category']}")

# ===== LANCER LE DASHBOARD =====
# streamlit run dashboard.py --server.port 8501
ℹ️ Lancer le dashboard
Pour démarrer le dashboard Streamlit, exécutez simplement : streamlit run dashboard.py --server.port 8501. L'application sera accessible dans votre navigateur à l'adresse http://localhost:8501. Streamlit gère automatiquement le rechargement à chaud : toute modification du code est reflétée instantanément dans le navigateur.

9. Pipeline complet : tout assembler

Maintenant que nous avons tous les composants individuels, assemblons-les dans un pipeline cohérent et robuste. Ce script principal orchestre l'ensemble du processus : collecte, enrichissement, résumé, stockage et notification. Il peut être exécuté manuellement ou déclenché par cron ou Celery.

#!/usr/bin/env python3
"""
pipeline.py - Pipeline complet de veille technologique automatisée.

Usage:
    python pipeline.py --mode collect    # Collecte uniquement
    python pipeline.py --mode digest     # Digest uniquement
    python pipeline.py --mode full       # Pipeline complet
    python pipeline.py --mode cleanup    # Nettoyage
"""

import argparse
import os
import sys
import logging
from datetime import datetime, timedelta
from pathlib import Path

# Import des modules du projet
from rss_collector import RSSCollector
from web_scraper import WebScraper
from summarizer import ArticleSummarizer
from notifications import TelegramNotifier, EmailNotifier

# Configuration du logging
log_dir = Path('/var/log/veille')
log_dir.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    handlers=[
        logging.FileHandler(log_dir / 'pipeline.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('pipeline')


class VeillePipeline:
    """Pipeline complet de veille technologique."""

    def __init__(self, config_path='config/sources.yaml'):
        self.config_path = config_path
        self.collector = RSSCollector(config_path)
        self.scraper = WebScraper()

        # Initialiser le résumeur
        openai_key = os.environ.get('OPENAI_API_KEY')
        self.summarizer = ArticleSummarizer(openai_key=openai_key)

        # Initialiser les notifiers
        self.telegram = None
        tg_token = os.environ.get('TELEGRAM_BOT_TOKEN')
        tg_chat = os.environ.get('TELEGRAM_CHAT_ID')
        if tg_token and tg_chat:
            self.telegram = TelegramNotifier(tg_token, tg_chat)

        self.email = None
        smtp_host = os.environ.get('SMTP_HOST')
        if smtp_host:
            self.email = EmailNotifier(
                smtp_host=smtp_host,
                smtp_port=int(os.environ.get('SMTP_PORT', 587)),
                username=os.environ.get('SMTP_USER', ''),
                password=os.environ.get('SMTP_PASS', ''),
                from_email=os.environ.get('SMTP_FROM', '')
            )

    def collect(self):
        """Phase 1 : Collecte des articles."""
        logger.info("📥 Phase 1 : Collecte des flux RSS")
        articles = self.collector.collect_all()

        logger.info("🔍 Enrichissement par scraping")
        for article in articles:
            if not article.content or len(article.content) < 100:
                result = self.scraper.scrape_page(article.url)
                if result.get('success'):
                    article.content = result['content']

        self.collector.save_articles(articles)
        logger.info(f"✅ Collecte terminée : {len(articles)} articles")
        return articles

    def summarize(self, articles=None):
        """Phase 2 : Résumé IA des articles."""
        if articles is None:
            articles = self.collector.get_unsummarized_articles()

        logger.info(f"🤖 Phase 2 : Résumé de {len(articles)} articles")
        for i, article in enumerate(articles):
            if not article.ai_summary:
                article.ai_summary = self.summarizer.summarize(article)
                logger.info(f"  [{i+1}/{len(articles)}] {article.title[:50]}...")

        self.collector.save_articles(articles)
        logger.info("✅ Résumés générés")
        return articles

    def notify(self, articles=None):
        """Phase 3 : Envoi des notifications."""
        if articles is None:
            articles = self.collector.get_recent_articles(hours=24)

        logger.info(f"📤 Phase 3 : Notification ({len(articles)} articles)")

        if self.telegram:
            self.telegram.send_digest(articles)
            logger.info("  ✅ Telegram envoyé")

        if self.email:
            to_email = os.environ.get('DIGEST_EMAIL', '')
            if to_email:
                self.email.send_digest(articles, to_email)
                logger.info(f"  ✅ Email envoyé à {to_email}")

    def run_full(self):
        """Exécute le pipeline complet."""
        logger.info("🚀 Démarrage du pipeline complet")
        start = datetime.now()

        articles = self.collect()
        articles = self.summarize(articles)
        self.notify(articles)

        elapsed = (datetime.now() - start).total_seconds()
        logger.info(f"🏁 Pipeline terminé en {elapsed:.1f}s")

    def cleanup(self, max_age_days=30):
        """Supprime les articles anciens."""
        logger.info(f"🧹 Nettoyage des articles > {max_age_days} jours")
        self.collector.delete_old_articles(max_age_days)


def main():
    parser = argparse.ArgumentParser(description='Pipeline de veille tech')
    parser.add_argument('--mode', choices=['collect', 'summarize',
                        'digest', 'full', 'cleanup'],
                        default='full', help='Mode d\'exécution')
    parser.add_argument('--config', default='config/sources.yaml',
                        help='Chemin du fichier de configuration')
    args = parser.parse_args()

    pipeline = VeillePipeline(config_path=args.config)

    modes = {
        'collect': pipeline.collect,
        'summarize': pipeline.summarize,
        'digest': pipeline.notify,
        'full': pipeline.run_full,
        'cleanup': pipeline.cleanup,
    }

    modes[args.mode]()

if __name__ == '__main__':
    main()

Ce pipeline final est conçu pour la robustesse et la maintenabilité. Chaque phase peut être exécutée indépendamment via l'argument --mode, ce qui facilite le débogage et permet des planifications différenciées (collecte toutes les 2 heures, digest une fois par jour). La configuration par variables d'environnement pour les credentials sensibles (clés API, tokens) suit les bonnes pratiques de la méthodologie Twelve-Factor App et facilite le déploiement sur différents environnements.

10. Conclusion et prochaines étapes

Vous disposez maintenant d'un système complet de veille technologique automatisée, capable de collecter des articles depuis des dizaines de sources, de les résumer intelligemment grâce à l'IA, et de vous livrer un digest quotidien personnalisé sur vos canaux de communication préférés. Ce pipeline, bien que fonctionnel en l'état, peut être enrichi de nombreuses façons.

Parmi les améliorations possibles, vous pourriez ajouter un système de scoring de pertinence basé sur vos centres d'intérêt (en fine-tunant un petit modèle de classification sur vos articles lus vs ignorés). L'intégration de sources supplémentaires comme les preprints arXiv, les trending repositories GitHub ou les discussions Reddit enrichirait considérablement votre couverture. Un système de détection de tendances basé sur la fréquence des mots-clés pourrait vous alerter sur les sujets émergents avant qu'ils ne deviennent mainstream.

Le passage à l'échelle pourrait s'appuyer sur une base de données vectorielle (Qdrant, Pinecone) pour la recherche sémantique dans vos articles, permettant des requêtes du type « trouve-moi des articles similaires à celui-ci » ou « quelles sont les dernières avancées en fine-tuning de LLM ? ». L'ajout d'agents autonomes (LangChain, CrewAI) qui approfondissent automatiquement les sujets les plus intéressants représente l'étape suivante vers une veille véritablement intelligente et proactive.

N'oubliez pas que le meilleur système de veille est celui que vous utilisez effectivement au quotidien. Commencez simple avec quelques sources RSS et un digest Telegram, puis itérez progressivement en ajoutant de la complexité uniquement là où elle apporte une valeur réelle. Bonne veille technologique à tous !

📚 Sources et références

  1. feedparser documentation. Universal Feed Parser. feedparser.readthedocs.io
  2. Richardson, L. (2023). Beautiful Soup Documentation. crummy.com/software/BeautifulSoup
  3. OpenAI. Chat Completions API Reference. platform.openai.com/docs
  4. Celery Project. First Steps with Celery. docs.celeryq.dev
  5. Streamlit Documentation. Get Started. docs.streamlit.io
  6. Ollama Documentation. API Reference. github.com/ollama
  7. Telegram Bot API. Making Requests. core.telegram.org/bots/api
Retour à l'accueil