Auteur/autrice : jmh

  • Socle V004 – Log4j2 et LogForwarder

    Socle V004 – Log4j2 et LogForwarder

    22 – Log4j2 et LogForwarder (Nouveauté V4)

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Socle V4 remplace Logback par Log4j2 pour le logging, avec un LogForwarder intégré pour la centralisation des logs.

    Pourquoi Log4j2 ?

    Critère Logback (V3) Log4j2 (V4)
    Async natif AsyncAppender (wrapper) AsyncLoggers (LMAX Disruptor)
    Performance Bon 6-68x plus rapide
    Garbage-free Non Oui
    Custom Appender Complexe Plugin system simple
    JSON natif Via encoder externe JsonTemplateLayout intégré

    2. Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                      Application                             │
    │                                                              │
    │  Logger.info("message")                                      │
    │         │                                                    │
    │         ▼                                                    │
    │  ┌─────────────────────────────────────────────────────┐    │
    │  │              Log4j2 AsyncLoggers                     │    │
    │  │              (LMAX Disruptor)                        │    │
    │  └────────────────────┬────────────────────────────────┘    │
    │                       │                                      │
    │         ┌─────────────┼─────────────┐                       │
    │         ▼             ▼             ▼                       │
    │  ┌───────────┐ ┌───────────┐ ┌─────────────────────┐       │
    │  │  Console  │ │  File     │ │ SocleLogForwarder   │       │
    │  │  Appender │ │  Appender │ │ Appender            │       │
    │  └───────────┘ └───────────┘ └──────────┬──────────┘       │
    │                                         │                   │
    └─────────────────────────────────────────┼───────────────────┘
                                              │
                        ┌─────────────────────┴─────────────────────┐
                        │                                           │
                        ▼                                           ▼
                ┌──────────────┐                           ┌──────────────┐
                │ HTTP Transport│                           │ NATS Transport│
                │ → LogHub     │                           │ → JetStream  │
                └──────┬───────┘                           └──────┬───────┘
                       │                                          │
                       │  (si échec)                              │
                       ▼                                          │
                ┌──────────────┐                                  │
                │ H2 Fallback  │◄─────────────────────────────────┘
                │ Storage      │
                └──────────────┘
    

    3. Configuration

    3.1 Dépendances Maven

    <!-- Exclure Logback de Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    
    <!-- Log4j2 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    
    <!-- LMAX Disruptor (AsyncLoggers) -->
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>4.0.0</version>
    </dependency>
    
    <!-- JSON Template Layout -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-layout-template-json</artifactId>
        <version>2.22.1</version>
    </dependency>
    

    3.2 log4j2.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="WARN" monitorInterval="30">
    
        <Properties>
            <Property name="LOG_DIR">${env:LOG_DIR:-./logs}</Property>
            <Property name="APP_NAME">${env:APP_NAME:-socle-v4}</Property>
            <Property name="REGION">${env:REGION:-local}</Property>
        </Properties>
    
        <Appenders>
            <!-- Console (dev) -->
            <Console name="Console" target="SYSTEM_OUT">
                <PatternLayout pattern="%d{ISO8601} %highlight{%-5level} [%thread] %logger{36} - %msg%n"/>
            </Console>
    
            <!-- Fichier rotatif -->
            <RollingFile name="File"
                         fileName="${LOG_DIR}/${APP_NAME}.log"
                         filePattern="${LOG_DIR}/${APP_NAME}-%d{yyyy-MM-dd}-%i.log.gz">
                <PatternLayout pattern="%d{ISO8601} %-5level [%thread] %logger{36} - %msg%n"/>
                <Policies>
                    <TimeBasedTriggeringPolicy interval="1"/>
                    <SizeBasedTriggeringPolicy size="100MB"/>
                </Policies>
                <DefaultRolloverStrategy max="30"/>
            </RollingFile>
    
            <!-- LogForwarder (centralisation) -->
            <SocleLogForwarder name="LogForwarder"
                               transportMode="${env:LOG_TRANSPORT_MODE:-http}"
                               logHubUrl="${env:LOG_HUB_URL:-http://localhost:8080/api/ingest-logs}"
                               natsUrl="${env:NATS_URL:-nats://localhost:4222}"
                               batchSize="100"
                               flushIntervalMs="5000"
                               queueCapacity="10000"
                               serviceName="${APP_NAME}"
                               region="${REGION}">
                <ThresholdFilter level="INFO"/>
            </SocleLogForwarder>
        </Appenders>
    
        <Loggers>
            <!-- Socle -->
            <Logger name="eu.lmvi.socle" level="${env:LOG_LEVEL:-INFO}" additivity="false">
                <AppenderRef ref="Console"/>
                <AppenderRef ref="File"/>
                <AppenderRef ref="LogForwarder"/>
            </Logger>
    
            <!-- Frameworks (moins verbeux) -->
            <Logger name="org.springframework" level="WARN"/>
            <Logger name="org.apache.kafka" level="WARN"/>
            <Logger name="io.nats" level="WARN"/>
    
            <!-- Root -->
            <Root level="INFO">
                <AppenderRef ref="Console"/>
                <AppenderRef ref="File"/>
                <AppenderRef ref="LogForwarder"/>
            </Root>
        </Loggers>
    
    </Configuration>
    

    3.3 log4j2.component.properties

    # Activer AsyncLoggers globalement (LMAX Disruptor)
    Log4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector
    
    # Ring buffer (puissance de 2)
    AsyncLogger.RingBufferSize=262144
    
    # Politique d'attente
    AsyncLogger.WaitStrategy=Sleep
    
    # Sécurité (Log4Shell)
    log4j2.formatMsgNoLookups=true
    

    3.4 application.yml

    socle:
      logging:
        forwarder:
          enabled: ${LOG_FORWARDER_ENABLED:false}
          transport-mode: ${LOG_TRANSPORT_MODE:http}
          log-hub-url: ${LOG_HUB_URL:http://localhost:8080/api/ingest-logs}
          nats-url: ${NATS_URL:nats://localhost:4222}
          nats-subject-prefix: ${LOG_NATS_SUBJECT:logs}
          batch-size: ${LOG_BATCH_SIZE:100}
          flush-interval-ms: ${LOG_FLUSH_INTERVAL_MS:5000}
          queue-capacity: ${LOG_QUEUE_CAPACITY:10000}
    
    logging:
      config: classpath:log4j2.xml
    

    4. Variables d’environnement

    Variable Description Défaut
    LOG_LEVEL Niveau de log INFO
    LOG_DIR Répertoire des logs ./logs
    LOG_FORWARDER_ENABLED Activer LogForwarder false
    LOG_TRANSPORT_MODE Mode transport (http/nats) http
    LOG_HUB_URL URL du LogHub
    NATS_URL URL NATS
    LOG_BATCH_SIZE Taille des batches 100
    LOG_FLUSH_INTERVAL_MS Intervalle flush (ms) 5000

    5. Utilisation

    5.1 Logging standard

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MonService {
        private static final Logger log = LoggerFactory.getLogger(MonService.class);
    
        public void process() {
            log.debug("Processing started");
            log.info("Processing item: {}", itemId);
            log.warn("Slow processing detected");
            log.error("Processing failed", exception);
        }
    }
    

    5.2 MDC (Mapped Diagnostic Context)

    import org.slf4j.MDC;
    
    public class MonService {
        public void process(String correlationId) {
            MDC.put("correlationId", correlationId);
            MDC.put("worker", "order-processor");
            try {
                log.info("Processing order");
                // Les logs incluront correlationId et worker
            } finally {
                MDC.clear();
            }
        }
    }
    

    5.3 Structured logging

    // Les logs JSON incluent automatiquement :
    // - timestamp
    // - level
    // - logger
    // - thread
    // - message
    // - MDC (correlationId, execId, etc.)
    // - exception (si présente)
    
    log.info("Order processed: orderId={}, amount={}", orderId, amount);
    

    6. LogForwarder

    6.1 Principe

    Le LogForwarder :

    1. Collecte les logs dans une queue interne (non-bloquant)
    2. Envoie les logs en batch vers le LogHub (HTTP ou NATS)
    3. Stocke en H2 si le réseau est indisponible
    4. Rejoue automatiquement à la reconnexion

    6.2 Mode HTTP

    socle:
      logging:
        forwarder:
          enabled: true
          transport-mode: http
          log-hub-url: https://logs.mycompany.com/api/ingest-logs
    

    Les logs sont envoyés en POST avec JWT :

    POST /api/ingest-logs
    Authorization: Bearer <jwt>
    Content-Type: application/json
    
    [
      {"timestamp": "...", "level": "INFO", "message": "...", ...},
      {"timestamp": "...", "level": "ERROR", "message": "...", ...}
    ]
    

    6.3 Mode NATS

    socle:
      logging:
        forwarder:
          enabled: true
          transport-mode: nats
          nats-url: nats://nats.mycompany.com:4222
          nats-subject-prefix: logs
    

    Les logs sont publiés sur logs.<region>.<service> :

    logs.mtq.order-service
    logs.gua.sync-agent
    

    6.4 Fallback H2

    Si le transport échoue, les logs sont stockés dans socle_log_fallback :

    SELECT COUNT(*) FROM socle_log_fallback;  -- Logs en attente
    

    Ils sont automatiquement rejoués quand le transport redevient disponible.

    7. Format JSON des logs

    {
      "timestamp": "2025-12-09T10:30:00.123Z",
      "level": "INFO",
      "logger": "eu.lmvi.socle.mop.MainOrchestratorProcess",
      "thread": "main",
      "message": "MOP démarré avec succès",
      "service": "socle-v4",
      "region": "MTQ",
      "instanceId": "nuc-mtq-001",
      "execId": "20251209-1030-abc123",
      "correlationId": "MTQ-2025-12-09-000001",
      "mdc": {
        "worker": "http_worker",
        "phase": "startup"
      },
      "exception": null
    }
    

    8. Profils de configuration

    8.1 Développement

    <!-- log4j2-dev.xml -->
    <Configuration status="WARN">
        <Appenders>
            <Console name="Console" target="SYSTEM_OUT">
                <PatternLayout pattern="%d{HH:mm:ss.SSS} %highlight{%-5level} [%thread] %logger{36} - %msg%n"/>
            </Console>
        </Appenders>
        <Loggers>
            <Root level="DEBUG">
                <AppenderRef ref="Console"/>
            </Root>
        </Loggers>
    </Configuration>
    

    8.2 Production

    <!-- log4j2-prod.xml -->
    <Configuration status="ERROR">
        <Appenders>
            <Console name="ConsoleJson" target="SYSTEM_OUT">
                <JsonTemplateLayout eventTemplateUri="classpath:socle-log-template.json"/>
            </Console>
            <SocleLogForwarder name="LogForwarder" .../>
        </Appenders>
        <Loggers>
            <Root level="INFO">
                <AppenderRef ref="ConsoleJson"/>
                <AppenderRef ref="LogForwarder"/>
            </Root>
        </Loggers>
    </Configuration>
    

    8.3 Sélection du profil

    logging:
      config: classpath:log4j2-${spring.profiles.active}.xml
    

    9. Performances

    AsyncLoggers vs Sync

    Mode Throughput Latence
    Sync ~1M logs/sec Variable
    Async (LMAX) ~18M logs/sec Stable

    Bonnes pratiques

    // BON - Lazy evaluation
    log.debug("Processing: {}", () -> expensiveToString());
    
    // MAUVAIS - Toujours évalué
    log.debug("Processing: " + expensiveToString());
    

    10. Troubleshooting

    Logs non visibles

    1. Vérifier log4j2.xml dans src/main/resources/
    2. Vérifier les niveaux de log
    3. Vérifier logging.config dans application.yml

    AsyncLoggers non activés

    Vérifier que log4j2.component.properties existe et contient :

    Log4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector
    

    LogForwarder queue pleine

    WARN - Log queue full, storing to fallback
    

    Solutions :

    • Augmenter queue-capacity
    • Réduire batch-size
    • Vérifier la connectivité réseau

    Logs en fallback non rejoués

    -- Vérifier les logs en attente
    SELECT COUNT(*) FROM socle_log_fallback;
    
    -- Forcer le replay (via API admin)
    POST /admin/logforwarder/replay
    

    11. Sécurité

    Log4Shell (CVE-2021-44228)

    Log4j2 2.22.1 est protégé contre Log4Shell. De plus :

    # Désactiver les lookups JNDI
    log4j2.formatMsgNoLookups=true
    

    Données sensibles

    // NE PAS logger de données sensibles
    log.info("User logged in: {}", user.getEmail());  // OK
    log.info("Password: {}", password);  // INTERDIT
    

    12. Références

  • Socle V004 – Workers

    Socle V004 – Workers

    05 – Workers

    Version : 4.0.1 Date : 2026-01-13

    1. Introduction

    Les Workers sont les composants de traitement du Socle V4. Chaque Worker implémente une tâche spécifique et son cycle de vie est géré par le MOP (Main Orchestrator Process).

    Caractéristiques

    • Interface unique : Tous les workers implémentent Worker
    • Cycle de vie géré : Le MOP orchestre start/stop/doWork
    • Priorités : Ordre de démarrage/arrêt configurable
    • Scheduling : Support cron et intervalle
    • Health check : Supervision intégrée

    2. Interface Worker

    package eu.lmvi.socle.worker;
    
    public interface Worker {
    
        /**
         * Nom unique du worker
         */
        String getName();
    
        /**
         * Initialisation (appelé une fois au démarrage)
         */
        void initialize();
    
        /**
         * Démarrage du worker
         */
        void start();
    
        /**
         * Traitement principal (appelé cycliquement)
         */
        void doWork();
    
        /**
         * Arrêt gracieux
         */
        void stop();
    
        /**
         * État de santé
         */
        boolean isHealthy();
    
        /**
         * Statistiques du worker
         */
        Map<String, Object> getStats();
    
        // === Priorités ===
    
        /**
         * Priorité au démarrage (plus petit = premier)
         */
        default int getStartPriority() {
            return 100;
        }
    
        /**
         * Priorité à l'arrêt (plus petit = premier)
         */
        default int getStopPriority() {
            return 100;
        }
    
        // === Scheduling ===
    
        /**
         * Expression cron (ou null si non schedulé)
         */
        default String getSchedule() {
            return null;
        }
    
        /**
         * Intervalle entre les cycles doWork() en ms
         * Valeurs recommandees : 5000 (5s), 10000 (10s), 30000 (30s)
         */
        default long getCycleIntervalMs() {
            return 5000; // 5 secondes par defaut
        }
    
        /**
         * Worker schedulé par cron ?
         */
        default boolean isScheduled() {
            return getSchedule() != null;
        }
    
        /**
         * Worker passif (ne fait rien dans doWork) ?
         */
        default boolean isPassive() {
            return false;
        }
    }
    

    3. Implémentation de base

    3.1 Worker simple

    package com.myapp.worker;
    
    import eu.lmvi.socle.worker.Worker;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    @Component
    public class SimpleWorker implements Worker {
    
        private static final Logger log = LoggerFactory.getLogger(SimpleWorker.class);
    
        private volatile boolean running = false;
        private long processedCount = 0;
    
        @Override
        public String getName() {
            return "simple-worker";
        }
    
        @Override
        public void initialize() {
            log.info("Initializing SimpleWorker");
            // Charger configuration, connexions, etc.
        }
    
        @Override
        public void start() {
            log.info("Starting SimpleWorker");
            running = true;
        }
    
        @Override
        public void doWork() {
            if (!running) return;
    
            try {
                // Traitement principal
                processedCount++;
                log.debug("Processing item #{}", processedCount);
            } catch (Exception e) {
                log.error("Error in doWork", e);
            }
        }
    
        @Override
        public void stop() {
            log.info("Stopping SimpleWorker");
            running = false;
        }
    
        @Override
        public boolean isHealthy() {
            return running;
        }
    
        @Override
        public Map<String, Object> getStats() {
            return Map.of(
                "running", running,
                "processedCount", processedCount
            );
        }
    }
    

    3.2 Worker avec priorité

    @Component
    public class DatabaseWorker implements Worker {
    
        @Override
        public String getName() {
            return "database-worker";
        }
    
        @Override
        public int getStartPriority() {
            return 10;  // Démarre en premier (connexion DB)
        }
    
        @Override
        public int getStopPriority() {
            return 90;  // S'arrête en dernier (flush des données)
        }
    
        // ... autres méthodes
    }
    

    3.3 Worker schedulé (cron)

    @Component
    public class ReportWorker implements Worker {
    
        @Override
        public String getName() {
            return "report-worker";
        }
    
        @Override
        public String getSchedule() {
            return "0 0 6 * * ?";  // Tous les jours à 6h
        }
    
        @Override
        public boolean isScheduled() {
            return true;
        }
    
        @Override
        public void doWork() {
            log.info("Generating daily report...");
            // Génération du rapport
        }
    
        // ... autres méthodes
    }
    

    3.4 Worker passif (HTTP)

    @Component
    public class ApiWorker implements Worker {
    
        @Override
        public String getName() {
            return "api-worker";
        }
    
        @Override
        public boolean isPassive() {
            return true;  // Pas de doWork() cyclique
        }
    
        @Override
        public void doWork() {
            // Ne fait rien - le traitement est déclenché par HTTP
        }
    
        // Les requêtes HTTP déclenchent le traitement via endpoints REST
    }
    

    4. AbstractWorker

    Le Socle fournit une classe de base qui simplifie l’implémentation :

    package eu.lmvi.socle.worker;
    
    public abstract class AbstractWorker implements Worker {
    
        protected final Logger log = LoggerFactory.getLogger(getClass());
        protected volatile boolean running = false;
        protected volatile boolean healthy = true;
        protected final AtomicLong processedCount = new AtomicLong(0);
        protected final AtomicLong errorCount = new AtomicLong(0);
        protected Instant lastActivity;
    
        @Override
        public void initialize() {
            log.info("[{}] Initializing", getName());
            doInitialize();
        }
    
        @Override
        public void start() {
            log.info("[{}] Starting", getName());
            running = true;
            doStart();
        }
    
        @Override
        public void stop() {
            log.info("[{}] Stopping", getName());
            running = false;
            doStop();
        }
    
        @Override
        public void doWork() {
            if (!running) return;
    
            try {
                doProcess();
                lastActivity = Instant.now();
            } catch (Exception e) {
                errorCount.incrementAndGet();
                handleError(e);
            }
        }
    
        @Override
        public boolean isHealthy() {
            return running && healthy;
        }
    
        @Override
        public Map<String, Object> getStats() {
            return Map.of(
                "running", running,
                "healthy", healthy,
                "processedCount", processedCount.get(),
                "errorCount", errorCount.get(),
                "lastActivity", lastActivity != null ? lastActivity.toString() : "never"
            );
        }
    
        // === Méthodes à implémenter ===
    
        protected void doInitialize() {}
        protected void doStart() {}
        protected abstract void doProcess();
        protected void doStop() {}
    
        protected void handleError(Exception e) {
            log.error("[{}] Error in doWork", getName(), e);
        }
    
        protected void incrementProcessed() {
            processedCount.incrementAndGet();
        }
    }
    

    Utilisation

    @Component
    public class OrderWorker extends AbstractWorker {
    
        @Override
        public String getName() {
            return "order-worker";
        }
    
        @Override
        protected void doInitialize() {
            // Initialisation spécifique
        }
    
        @Override
        protected void doProcess() {
            // Traitement principal
            processOrders();
            incrementProcessed();
        }
    
        private void processOrders() {
            // ...
        }
    }
    

    5. Cycle de vie

    5.1 Séquence de démarrage

    1. MOP.start()
    2. Pour chaque worker (trié par startPriority ASC) :
       a. worker.initialize()
       b. worker.start()
       c. Enregistrement dans Supervisor
    3. Boucle principale :
       - Pour chaque worker non-schedulé, non-passif :
         - worker.doWork()
         - Sleep(worker.getCycleIntervalMs())
    

    5.2 Séquence d’arrêt

    1. Signal SIGTERM reçu
    2. MOP.gracefulShutdown()
    3. Pour chaque worker (trié par stopPriority ASC) :
       a. worker.stop()
       b. Attendre terminaison
    4. Cleanup final
    

    5.3 Diagramme

                     ┌──────────────┐
                     │   CREATED    │
                     └──────┬───────┘
                            │ initialize()
                            ▼
                     ┌──────────────┐
                     │ INITIALIZED  │
                     └──────┬───────┘
                            │ start()
                            ▼
              ┌─────────────────────────────┐
              │          RUNNING            │
              │                             │
              │  ┌──────────────────────┐   │
              │  │   doWork() loop      │   │
              │  │   (si non-passif)    │   │
              │  └──────────────────────┘   │
              │                             │
              └─────────────┬───────────────┘
                            │ stop()
                            ▼
                     ┌──────────────┐
                     │   STOPPED    │
                     └──────────────┘
    

    6. Communication entre Workers

    6.1 Via SharedDataRegistry

    @Component
    public class ProducerWorker extends AbstractWorker {
    
        @Autowired
        private SharedDataRegistry registry;
    
        @Override
        protected void doProcess() {
            // Publier des données
            registry.put("orders.pending.count", pendingOrders.size());
            registry.incrementSequence("orders.total");
        }
    }
    
    @Component
    public class ConsumerWorker extends AbstractWorker {
    
        @Autowired
        private SharedDataRegistry registry;
    
        @Override
        protected void doProcess() {
            // Lire les données
            int pending = registry.getInt("orders.pending.count").orElse(0);
            if (pending > 0) {
                processOrders();
            }
        }
    }
    

    6.2 Via KvBus

    @Component
    public class OrderWorker extends AbstractWorker {
    
        @Autowired
        private KvBus kvBus;
    
        @Override
        protected void doProcess() {
            // Stocker l'état partagé (même entre instances)
            kvBus.put("order:" + orderId, orderJson);
        }
    }
    

    6.3 Via Events

    @Component
    public class EventProducerWorker extends AbstractWorker {
    
        @Autowired
        private ApplicationEventPublisher eventPublisher;
    
        @Override
        protected void doProcess() {
            // Publier un événement Spring
            eventPublisher.publishEvent(new OrderCreatedEvent(order));
        }
    }
    
    @Component
    public class EventConsumerWorker extends AbstractWorker {
    
        @EventListener
        public void onOrderCreated(OrderCreatedEvent event) {
            // Réagir à l'événement
            processOrder(event.getOrder());
        }
    }
    

    7. Workers et TechDB (V4)

    7.1 Persistance d’état

    @Component
    public class KafkaConsumerWorker extends AbstractWorker {
    
        @Autowired
        private TechDbManager techDb;
    
        private long currentOffset;
    
        @Override
        protected void doInitialize() {
            // Restaurer l'offset au démarrage
            currentOffset = techDb.getOffset("kafka", "my-topic-0")
                .orElse(0L);
            log.info("Starting from offset: {}", currentOffset);
        }
    
        @Override
        protected void doProcess() {
            // Traiter les messages
            List<Message> messages = consume(currentOffset);
            for (Message msg : messages) {
                process(msg);
                currentOffset = msg.getOffset();
            }
    
            // Persister périodiquement
            if (currentOffset % 1000 == 0) {
                techDb.saveOffset("kafka", "my-topic-0", currentOffset, null);
            }
        }
    
        @Override
        protected void doStop() {
            // Sauvegarder l'offset final
            techDb.saveOffset("kafka", "my-topic-0", currentOffset, null);
        }
    }
    

    7.2 État du worker

    @Component
    public class BatchWorker extends AbstractWorker {
    
        @Autowired
        private TechDbManager techDb;
    
        @Override
        protected void doProcess() {
            // Mettre à jour l'état
            techDb.saveWorkerState(getName(), "PROCESSING",
                Map.of(
                    "currentBatch", currentBatchId,
                    "progress", progress
                ));
    
            // Traitement...
    
            techDb.saveWorkerState(getName(), "IDLE", Map.of());
        }
    }
    

    8. Patterns courants

    8.1 Worker avec retry

    @Component
    public class RetryableWorker extends AbstractWorker {
    
        @Autowired
        private RetryTemplate retryTemplate;
    
        @Override
        protected void doProcess() {
            retryTemplate.execute(context -> {
                processWithRetry();
                return null;
            });
        }
    }
    

    8.2 Worker avec circuit breaker

    @Component
    public class ResilientWorker extends AbstractWorker {
    
        @Autowired
        private CircuitBreakerTemplate circuitBreaker;
    
        @Override
        protected void doProcess() {
            circuitBreaker.execute("external-api", () -> {
                callExternalApi();
            });
        }
    }
    

    8.3 Worker batch

    @Component
    public class BatchWorker extends AbstractWorker {
    
        private final int BATCH_SIZE = 100;
    
        @Override
        protected void doProcess() {
            List<Item> batch = fetchBatch(BATCH_SIZE);
            if (batch.isEmpty()) {
                return;
            }
    
            for (Item item : batch) {
                processItem(item);
                incrementProcessed();
            }
        }
    }
    

    9. Tests

    9.1 Test unitaire

    @ExtendWith(MockitoExtension.class)
    class SimpleWorkerTest {
    
        @InjectMocks
        private SimpleWorker worker;
    
        @Test
        void shouldInitialize() {
            worker.initialize();
            // Assertions...
        }
    
        @Test
        void shouldProcessItems() {
            worker.initialize();
            worker.start();
    
            worker.doWork();
    
            Map<String, Object> stats = worker.getStats();
            assertEquals(1L, stats.get("processedCount"));
        }
    
        @Test
        void shouldStopGracefully() {
            worker.initialize();
            worker.start();
            worker.stop();
    
            assertFalse(worker.isHealthy());
        }
    }
    

    9.2 Test d’intégration

    @SpringBootTest
    class WorkerIntegrationTest {
    
        @Autowired
        private List<Worker> workers;
    
        @Test
        void allWorkersShouldBeHealthyAfterStart() {
            workers.forEach(Worker::initialize);
            workers.forEach(Worker::start);
    
            workers.forEach(w -> assertTrue(w.isHealthy(),
                "Worker " + w.getName() + " should be healthy"));
        }
    }
    

    10. Auto-Restart des Workers

    Le MOP surveille automatiquement la sante des workers et peut les redemarrer en cas d’echec.

    Fonctionnement

    1. Detection : Le MOP verifie worker.isHealthy() a chaque cycle de la boucle principale
    2. Compteur : Un compteur d’echecs consecutifs est maintenu pour chaque worker
    3. Seuil : Apres 3 echecs consecutifs (configurable), le worker est redemarre
    4. Restart : Sequence stop()initialize()start() → replanification doWork()
    5. Reset : Le compteur est remis a zero si le worker redevient healthy

    Logs associes

    [step:worker_auto_restart] Worker 'xxx' unhealthy 3 fois, tentative de redemarrage
    [step:worker_restarted] Worker 'xxx' redemarre avec succes
    [step:worker_restart_failed] Echec du redemarrage de 'xxx': <error>
    

    Implementation dans le Worker

    Pour beneficier de l’auto-restart, le worker doit :

    1. Retourner false dans isHealthy() en cas de probleme
    2. Supporter un cycle stop()initialize()start()
    3. Reinitialiser son etat interne dans initialize()
    @Override
    public boolean isHealthy() {
        // Retourner false declenche le compteur d'echecs
        return lastApiCallSuccessful && !hasConsecutiveErrors;
    }
    

    Workers exclus

    Les workers built-in du socle (http_worker, control_worker, etc.) ne sont pas soumis a l’auto-restart car ils sont geres par le Supervisor.

    11. Workers Event-Driven

    Les Event-Driven Workers sont des workers passifs declenches par des evenements externes (Kafka, queues, CDC, etc.).

    Classe de base

    package eu.lmvi.socle.worker.event;
    
    public abstract class AbstractEventDrivenWorker<T> implements Worker {
    
        // Concurrence configurable
        protected AbstractEventDrivenWorker(int concurrency) { ... }
    
        // Mode PASSIVE automatique
        @Override
        public final boolean isPassive() { return true; }
    
        @Override
        public final String getSchedule() { return "PASSIVE"; }
    
        // Methodes abstraites a implementer
        protected abstract T pollEvent() throws InterruptedException;
        protected abstract void processEvent(T event);
    
        // Hooks optionnels
        protected void onInitialize() { }
        protected void onStarted() { }
        protected void onStopping() { }
        protected void onStopped() { }
        protected void handleError(T event, Exception e, int workerId) { }
        public long getBacklog() { return 0; }
    }
    

    Exemple d’implementation

    @Component
    public class KafkaEventWorker extends AbstractEventDrivenWorker<ConsumerRecord<String, String>> {
    
        private final BlockingQueue<ConsumerRecord<String, String>> eventQueue = new LinkedBlockingQueue<>();
    
        public KafkaEventWorker() {
            super(4); // 4 threads concurrents
        }
    
        @Override
        public String getName() {
            return "kafka_event_worker";
        }
    
        @Override
        protected ConsumerRecord<String, String> pollEvent() throws InterruptedException {
            return eventQueue.poll(100, TimeUnit.MILLISECONDS);
        }
    
        @Override
        protected void processEvent(ConsumerRecord<String, String> event) {
            // Traitement de l'evenement
            log.info("Processing: {}", event.value());
        }
    
        @Override
        public long getBacklog() {
            return eventQueue.size();
        }
    }
    

    Metriques standardisees

    Les Event-Driven Workers exposent des metriques compatibles avec le StatusDashboard :

    Cle Type Description
    state String "running" ou "stopped"
    execution_count long Nombre d’evenements traites
    errors_count long Nombre d’erreurs
    last_execution String ISO-8601 du dernier traitement
    schedule String Toujours "PASSIVE"
    messages_processed long Alias de execution_count
    throughput_per_sec double Debit calcule
    backlog long Evenements en attente

    12. Convention des Stats Workers

    Pour une integration correcte avec le StatusDashboard et le WorkerActivityTracker, tous les workers doivent exposer des cles standardisees dans getStats().

    Cles obligatoires

    Cle Type Description
    state String "running" ou "stopped"
    execution_count long Nombre total d’executions
    errors_count long Nombre d’erreurs
    last_execution String/long Timestamp ISO-8601 ou epoch ms
    schedule String Mode: "PASSIVE", "INTERVAL", "CRON", ou expression cron

    Cles optionnelles

    Cle Type Description
    total_duration_ms long Duree totale cumulee
    avg_duration_ms double Duree moyenne par execution
    throughput_per_sec double Debit (ops/sec)
    messages_processed long Pour Event-Driven (alias execution_count)
    backlog long File d’attente

    Exemple d’implementation

    @Override
    public Map<String, Object> getStats() {
        Map<String, Object> stats = new HashMap<>();
    
        // Cles standardisees (obligatoires)
        stats.put("state", running ? "running" : "stopped");
        stats.put("execution_count", executionCount.get());
        stats.put("errors_count", errorCount.get());
        stats.put("last_execution", lastExecution != null
            ? lastExecution.toString()  // ISO-8601
            : null);
        stats.put("schedule", getSchedule() != null ? getSchedule() : "INTERVAL");
    
        // Cles optionnelles
        stats.put("total_duration_ms", totalDurationMs.get());
        stats.put("avg_duration_ms", executionCount.get() > 0
            ? (double) totalDurationMs.get() / executionCount.get()
            : 0.0);
    
        return stats;
    }
    

    13. JaninoWorker (Scripts Java Compiles)

    Le JaninoWorker permet d’executer du code Java compile dynamiquement a la volee. Contrairement au ScriptEngine (interprete), Janino compile le code en bytecode JVM pour des performances natives.

    13.1 Activation

    socle:
      janino:
        enabled: true
        scripts-path: ./repository/scripts/java
        reload-interval-ms: 300000  # 5 minutes
    

    13.2 Interfaces disponibles

    Les scripts doivent implementer une des interfaces :

    Interface Usage Methode
    Calculator<T> Calculs (frais, taxes) T calculate(Map<String, Object> context)
    Executable Execution generique Object execute(Map<String, Object> context)
    Validator Validations ValidationResult validate(Object input)

    13.3 Exemple de script

    // Fichier: repository/scripts/java/KrakenFeeCalculator.java
    
    import eu.lmvi.socle.janino.interfaces.Calculator;
    import java.math.BigDecimal;
    import java.math.RoundingMode;
    import java.util.Map;
    
    public class KrakenFeeCalculator implements Calculator<BigDecimal> {
    
        private static final BigDecimal FEE_RATE = new BigDecimal("0.0026");
    
        @Override
        public BigDecimal calculate(Map<String, Object> context) {
            BigDecimal amount = (BigDecimal) context.get("amount");
            return amount.multiply(FEE_RATE).setScale(8, RoundingMode.HALF_UP);
        }
    }
    

    13.4 Utilisation dans un Worker

    @Component
    public class TradingWorker implements Worker {
    
        @Autowired
        private JaninoWorker janinoWorker;
    
        @Override
        public void doWork() {
            Map<String, Object> context = Map.of(
                "amount", new BigDecimal("1000.00")
            );
    
            BigDecimal fee = janinoWorker.execute(
                "KrakenFeeCalculator",
                context,
                BigDecimal.class
            );
    
            log.info("Fee calculated: {}", fee);
        }
    }
    

    13.5 Acces direct au moteur

    @Autowired
    private JaninoWorker janinoWorker;
    
    // Compiler un script a la volee
    janinoWorker.compileScript("MyScript", sourceCode);
    
    // Verifier si compile
    boolean ready = janinoWorker.isScriptCompiled("MyScript");
    
    // Forcer le rechargement
    janinoWorker.forceReload();
    
    // Acces au moteur
    JaninoEngine engine = janinoWorker.getEngine();
    

    13.6 Securite

    Janino bloque l’acces aux packages dangereux :

    • java.io – Acces fichiers
    • java.net – Acces reseau
    • java.lang.reflect – Reflection
    • sun.*, com.sun.* – Classes internes

    13.7 Comparaison avec ScriptEngine

    Critere ScriptEngine JaninoWorker
    Langages JavaScript, BeanShell Java pur
    Performance Interprete Compile (natif JVM)
    Typage Dynamique Statique
    Hot reload Non Oui (automatique)
    Use case Prototypage, scripts simples Calculs haute perf

    14. Bonnes pratiques

    DO

    • Implémenter stop() pour un arrêt gracieux
    • Utiliser isHealthy() pour signaler les problèmes
    • Logger les transitions d’état
    • Gérer les exceptions dans doWork()
    • Utiliser des priorités pour les dépendances

    DON’T

    • Ne pas bloquer indéfiniment dans doWork()
    • Ne pas ignorer les signaux d’arrêt
    • Ne pas oublier de décrémenter les ressources dans stop()
    • Ne pas utiliser de variables statiques pour l’état

    15. Références

  • Socle V004 – Worker Registry

    Socle V004 – Worker Registry

    24 – Client Worker Registry (Nouveauté V4)

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le WorkerRegistryClient permet aux applications Socle V4 de s’auto-enregistrer auprès d’un Registry central pour la supervision.

    Bénéfices

    • Visibilité : Savoir quels workers sont actifs par région
    • Supervision : Détecter les workers « LOST » (heartbeat manquant)
    • Diagnostique : Informations version, capabilities, charge

    2. Architecture

    ┌─────────────────┐                    ┌─────────────────┐
    │   Application   │                    │  Registry       │
    │   Socle V4      │                    │  (central)      │
    └────────┬────────┘                    └────────┬────────┘
             │                                      │
             │  1. POST /workers/register           │
             │     (au démarrage)                   │
             │─────────────────────────────────────►│
             │                                      │
             │  2. POST /workers/heartbeat          │
             │     (toutes les 30s)                 │
             │─────────────────────────────────────►│
             │                                      │
             │  3. DELETE /workers/{id}             │
             │     (à l'arrêt)                      │
             │─────────────────────────────────────►│
             │                                      │
    
                        ┌─────────────────┐
                        │  Metabase /     │
                        │  Grafana        │◄────── Consultation
                        └─────────────────┘
    

    3. Configuration

    3.1 application.yml

    socle:
      worker-registry:
        enabled: ${WORKER_REGISTRY_ENABLED:false}
        server-url: ${WORKER_REGISTRY_URL:https://registry.lmvi.org}
        heartbeat-interval-ms: ${REGISTRY_HEARTBEAT_INTERVAL_MS:30000}
        connect-timeout-ms: 10000
        read-timeout-ms: 30000
    

    3.2 Variables d’environnement

    Variable Description Défaut
    WORKER_REGISTRY_ENABLED Activer le registry false
    WORKER_REGISTRY_URL URL du Registry
    REGISTRY_HEARTBEAT_INTERVAL_MS Intervalle heartbeat (ms) 30000

    4. Interface WorkerRegistryClient

    package eu.lmvi.socle.client.registry;
    
    /**
     * Client Registry pour auto-enregistrement des Workers
     */
    public interface WorkerRegistryClient {
    
        /**
         * Enregistrement initial au démarrage
         * @param registration Informations du worker
         * @throws RegistryException si échec
         */
        void register(WorkerRegistration registration) throws RegistryException;
    
        /**
         * Heartbeat périodique
         * @param heartbeat État courant du worker
         * @throws RegistryException si échec
         */
        void heartbeat(WorkerHeartbeat heartbeat) throws RegistryException;
    
        /**
         * Désenregistrement à l'arrêt
         * @param workerId ID du worker
         * @throws RegistryException si échec
         */
        void unregister(String workerId) throws RegistryException;
    
        /**
         * Vérifie si le worker est enregistré
         */
        boolean isRegistered();
    }
    

    5. DTOs

    5.1 WorkerRegistration

    package eu.lmvi.socle.client.registry;
    
    /**
     * Informations d'enregistrement d'un worker
     */
    public record WorkerRegistration(
        String workerId,          // Identifiant unique (ex: "AGENT-DB2-MTQ-001")
        String workerType,        // Type de worker (ex: "journal-reader")
        String region,            // Région (ex: "MTQ", "GUA", "REU")
        String host,              // Hostname
        String version,           // Version de l'application
        List<String> capabilities,// Capacités (ex: ["db2-cdc", "nats-publisher"])
        Map<String, Object> extra // Métadonnées additionnelles
    ) {
        public static WorkerRegistration of(SocleConfiguration config) {
            return new WorkerRegistration(
                config.getExec_id(),
                config.getApp_name(),
                config.getRegion(),
                getHostname(),
                config.getVersion(),
                List.of(),
                Map.of()
            );
        }
    
        private static String getHostname() {
            try {
                return InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                return "unknown";
            }
        }
    }
    

    5.2 WorkerHeartbeat

    package eu.lmvi.socle.client.registry;
    
    /**
     * Heartbeat périodique d'un worker
     */
    public record WorkerHeartbeat(
        String workerId,           // ID du worker
        String status,             // RUNNING, STOPPING, ERROR
        Map<String, Object> load   // Métriques de charge
    ) {
        public static WorkerHeartbeat running(String workerId, Map<String, Object> load) {
            return new WorkerHeartbeat(workerId, "RUNNING", load);
        }
    
        public static WorkerHeartbeat stopping(String workerId) {
            return new WorkerHeartbeat(workerId, "STOPPING", Map.of());
        }
    
        public static WorkerHeartbeat error(String workerId, String errorMessage) {
            return new WorkerHeartbeat(workerId, "ERROR", Map.of("error", errorMessage));
        }
    }
    

    6. Implémentation

    package eu.lmvi.socle.client.registry;
    
    @Component
    @ConditionalOnProperty(name = "socle.worker-registry.enabled", havingValue = "true")
    public class HttpWorkerRegistryClient implements WorkerRegistryClient {
    
        private static final Logger log = LoggerFactory.getLogger(HttpWorkerRegistryClient.class);
    
        private final SocleConfiguration config;
        private final SocleAuthClient authClient;
        private final OkHttpClient httpClient;
        private final ObjectMapper objectMapper;
    
        private volatile boolean registered = false;
        private volatile String currentWorkerId;
    
        public HttpWorkerRegistryClient(
                SocleConfiguration config,
                @Autowired(required = false) SocleAuthClient authClient) {
            this.config = config;
            this.authClient = authClient;
            this.objectMapper = new ObjectMapper();
    
            this.httpClient = new OkHttpClient.Builder()
                .connectTimeout(config.getRegistryConnectTimeoutMs(), TimeUnit.MILLISECONDS)
                .readTimeout(config.getRegistryReadTimeoutMs(), TimeUnit.MILLISECONDS)
                .build();
        }
    
        @Override
        public void register(WorkerRegistration registration) throws RegistryException {
            log.info("Registering worker: {} ({})", registration.workerId(), registration.workerType());
    
            try {
                String json = objectMapper.writeValueAsString(registration);
    
                Request.Builder requestBuilder = new Request.Builder()
                    .url(config.getRegistryServerUrl() + "/api/v1/workers/register")
                    .post(RequestBody.create(json, MediaType.parse("application/json")));
    
                // Add auth if available
                if (authClient != null && authClient.isAuthenticated()) {
                    requestBuilder.header("Authorization", "Bearer " + authClient.getValidAccessToken());
                }
    
                try (Response response = httpClient.newCall(requestBuilder.build()).execute()) {
                    if (!response.isSuccessful()) {
                        throw new RegistryException("Registration failed: " + response.code());
                    }
    
                    registered = true;
                    currentWorkerId = registration.workerId();
                    log.info("Worker registered successfully: {}", registration.workerId());
                }
            } catch (IOException e) {
                throw new RegistryException("Registration failed", e);
            }
        }
    
        @Override
        public void heartbeat(WorkerHeartbeat heartbeat) throws RegistryException {
            if (!registered) {
                log.warn("Cannot send heartbeat, worker not registered");
                return;
            }
    
            log.debug("Sending heartbeat: {} - {}", heartbeat.workerId(), heartbeat.status());
    
            try {
                String json = objectMapper.writeValueAsString(heartbeat);
    
                Request.Builder requestBuilder = new Request.Builder()
                    .url(config.getRegistryServerUrl() + "/api/v1/workers/heartbeat")
                    .post(RequestBody.create(json, MediaType.parse("application/json")));
    
                if (authClient != null && authClient.isAuthenticated()) {
                    requestBuilder.header("Authorization", "Bearer " + authClient.getValidAccessToken());
                }
    
                try (Response response = httpClient.newCall(requestBuilder.build()).execute()) {
                    if (!response.isSuccessful()) {
                        log.warn("Heartbeat failed: {}", response.code());
                        // Don't throw - heartbeat failure is not critical
                    }
                }
            } catch (IOException e) {
                log.warn("Heartbeat failed: {}", e.getMessage());
                // Don't throw - heartbeat failure is not critical
            }
        }
    
        @Override
        public void unregister(String workerId) throws RegistryException {
            if (!registered) {
                return;
            }
    
            log.info("Unregistering worker: {}", workerId);
    
            try {
                Request.Builder requestBuilder = new Request.Builder()
                    .url(config.getRegistryServerUrl() + "/api/v1/workers/" + workerId)
                    .delete();
    
                if (authClient != null && authClient.isAuthenticated()) {
                    requestBuilder.header("Authorization", "Bearer " + authClient.getValidAccessToken());
                }
    
                try (Response response = httpClient.newCall(requestBuilder.build()).execute()) {
                    // Ignore response - best effort
                    registered = false;
                    currentWorkerId = null;
                    log.info("Worker unregistered: {}", workerId);
                }
            } catch (IOException e) {
                log.warn("Unregister failed: {}", e.getMessage());
                // Don't throw - unregister failure is not critical
            }
        }
    
        @Override
        public boolean isRegistered() {
            return registered;
        }
    }
    

    7. Intégration avec MOP

    7.1 Enregistrement au démarrage

    // Dans MainOrchestratorProcess.start()
    if (registryClient != null && config.isWorkerRegistryEnabled()) {
        log.info("[step:registry_register] Enregistrement au Worker Registry");
        try {
            WorkerRegistration registration = new WorkerRegistration(
                config.getExec_id(),
                config.getApp_name(),
                config.getRegion(),
                InetAddress.getLocalHost().getHostName(),
                config.getVersion(),
                getWorkerCapabilities(),
                Map.of(
                    "startTime", Instant.now(),
                    "javaVersion", System.getProperty("java.version")
                )
            );
            registryClient.register(registration);
        } catch (RegistryException e) {
            log.warn("Registry registration failed, continuing", e);
        }
    }
    

    7.2 Heartbeat périodique

    // Dans la boucle principale ou via ScheduledExecutorService
    private void sendRegistryHeartbeat() {
        if (registryClient == null || !registryClient.isRegistered()) {
            return;
        }
    
        try {
            WorkerHeartbeat heartbeat = new WorkerHeartbeat(
                config.getExec_id(),
                "RUNNING",
                Map.of(
                    "uptime", getUptime(),
                    "workersCount", workers.size(),
                    "healthyWorkers", countHealthyWorkers(),
                    "memoryUsedMb", getMemoryUsedMb()
                )
            );
            registryClient.heartbeat(heartbeat);
        } catch (RegistryException e) {
            log.debug("Heartbeat failed: {}", e.getMessage());
        }
    }
    

    7.3 Désenregistrement à l’arrêt

    // Dans MainOrchestratorProcess.gracefulShutdown()
    if (registryClient != null && registryClient.isRegistered()) {
        log.info("[step:registry_unregister] Désenregistrement du Registry");
        try {
            registryClient.unregister(config.getExec_id());
        } catch (RegistryException e) {
            log.warn("Unregister failed", e);
        }
    }
    

    8. Exemple de données

    8.1 Registration

    {
      "workerId": "AGENT-DB2-MTQ-001",
      "workerType": "journal-reader",
      "region": "MTQ",
      "host": "mtq-nuc-01",
      "version": "4.0.0",
      "capabilities": ["db2-cdc", "nats-publisher"],
      "extra": {
        "journal": "QUSR0023",
        "library": "IMA001FDMQ",
        "startTime": "2025-12-09T10:00:00Z",
        "javaVersion": "21.0.1"
      }
    }
    

    8.2 Heartbeat

    {
      "workerId": "AGENT-DB2-MTQ-001",
      "status": "RUNNING",
      "load": {
        "uptime": 3600,
        "messagesPerMinute": 523,
        "lastSequence": 123456789,
        "memoryUsedMb": 256,
        "cpuPercent": 15
      }
    }
    

    9. Côté serveur (Registry central)

    9.1 Table worker_registry

    CREATE TABLE worker_registry (
        id BIGSERIAL PRIMARY KEY,
        worker_id VARCHAR(200) NOT NULL UNIQUE,
        worker_type VARCHAR(100) NOT NULL,
        region VARCHAR(50),
        host VARCHAR(200),
        version VARCHAR(50),
        status VARCHAR(20) DEFAULT 'UNKNOWN',
        registered_at TIMESTAMPTZ DEFAULT NOW(),
        last_heartbeat TIMESTAMPTZ,
        capabilities JSONB,
        extra JSONB,
        load JSONB
    );
    
    CREATE INDEX idx_worker_registry_region ON worker_registry(region);
    CREATE INDEX idx_worker_registry_type ON worker_registry(worker_type);
    CREATE INDEX idx_worker_registry_status ON worker_registry(status);
    

    9.2 Détection des workers LOST

    -- Workers sans heartbeat depuis plus de 2 minutes
    UPDATE worker_registry
    SET status = 'LOST'
    WHERE status = 'RUNNING'
      AND last_heartbeat < NOW() - INTERVAL '2 minutes';
    

    9.3 Dashboard Metabase/Grafana

    -- Workers actifs par région
    SELECT region, COUNT(*) as count
    FROM worker_registry
    WHERE status = 'RUNNING'
    GROUP BY region;
    
    -- Workers LOST
    SELECT worker_id, region, last_heartbeat
    FROM worker_registry
    WHERE status = 'LOST';
    

    10. Bonnes pratiques

    DO

    • ✅ Enregistrer au démarrage, désenregistrer à l’arrêt
    • ✅ Heartbeat régulier (30s recommandé)
    • ✅ Inclure des métriques utiles dans le heartbeat
    • ✅ Gérer gracieusement les échecs (non bloquant)

    DON’T

    • ❌ Heartbeat trop fréquent (< 10s)
    • ❌ Bloquer sur les erreurs registry
    • ❌ Stocker des données sensibles dans extra

    11. Troubleshooting

    Worker non visible dans le dashboard

    1. Vérifier WORKER_REGISTRY_ENABLED=true
    2. Vérifier WORKER_REGISTRY_URL
    3. Vérifier les logs : « Worker registered successfully »

    Worker marqué LOST

    1. Vérifier que l’application tourne
    2. Vérifier la connectivité réseau
    3. Vérifier les logs heartbeat

    Erreur 401

    1. Vérifier que AUTH_ENABLED=true
    2. Vérifier API_KEY
    3. Vérifier que l’auth fonctionne

    12. Références

  • Socle V004 – Standard TechDB

    Socle V004 – Standard TechDB

    28 – Standard de Creation de Table H2

    Version : 4.0.1 Date : 2026-01-13 Package : eu.lmvi.socle.techdb

    Introduction

    Ce document definit le standard de creation des tables dans la base technique H2 du Socle V004. Ce standard garantit la coherence, la tracabilite et la compatibilite PostgreSQL/H2.

    Structure de Base

    Toutes les tables TechDB doivent suivre cette structure:

    Champ Type Description
    x_id BIGINT GENERATED BY DEFAULT AS IDENTITY Identifiant technique auto-genere
    x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP Date creation
    x_dateChanged TIMESTAMP WITH TIME ZONE Date modification (NULL a l’insert, MAJ par appli)
    x_sub VARCHAR(255) Sujet/categorie
    x_partition VARCHAR(30) Partition logique
    x_comment CLOB Commentaires/historiques JSON texte
    [champs metier] Champs specifiques a la table
    datas CLOB Donnees metier JSON texte (toujours en fin)

    Regles

    Identite

    • Utiliser GENERATED BY DEFAULT AS IDENTITY, pas AUTO_INCREMENT (MySQL)
    • Pas de sequence explicite
    • La colonne x_id est toujours la cle primaire
    -- Correct (SQL standard / H2 / PostgreSQL)
    x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY
    
    -- Incorrect (MySQL uniquement)
    id BIGINT AUTO_INCREMENT PRIMARY KEY
    

    Timestamps

    • x_dateCreated: Toujours NOT NULL DEFAULT CURRENT_TIMESTAMP
    • x_dateChanged: NULL a l’insertion, mis a jour par l’application
    • Utiliser TIMESTAMP WITH TIME ZONE pour la compatibilite
    x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    x_dateChanged TIMESTAMP WITH TIME ZONE,
    

    Triggers

    • Aucun trigger dans H2
    • L’audit, l’historisation et la mise a jour de x_dateChanged sont geres par l’application

    Cle Primaire

    • Si une cle existante est presente (ex: worker_name), la conserver comme UNIQUE
    • x_id reste la PK technique
    x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    worker_name VARCHAR(255) NOT NULL UNIQUE,
    

    Conventions de Nommage

    Element Convention Exemple
    Champs techniques Prefixe x_ x_id, x_dateCreated
    Cles etrangeres id_<table_cible> id_user, id_order
    Champ id existant Renommer en x_id
    Donnees JSON datas en derniere position

    Permissions

    • Droits geres au niveau utilisateur H2
    • Proprietaire = utilisateur createur (socle par defaut)

    Contraintes H2 vs PostgreSQL

    PostgreSQL H2
    JSONB CLOB
    Triggers natifs Logique applicative
    Validation JSON DB Validation applicative
    SERIAL GENERATED BY DEFAULT AS IDENTITY
    Index GIN sur JSON Non supporte

    Exemple DDL Complet

    CREATE TABLE IF NOT EXISTS techdb_example (
        -- Champs techniques (standard)
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
    
        -- Champs metier specifiques
        example_key VARCHAR(255) NOT NULL UNIQUE,
        status VARCHAR(50) NOT NULL,
        counter INT DEFAULT 0,
        last_activity TIMESTAMP WITH TIME ZONE,
    
        -- Donnees JSON (toujours en dernier)
        datas CLOB
    );
    
    -- Index recommandes
    CREATE INDEX IF NOT EXISTS idx_example_key ON techdb_example(example_key);
    CREATE INDEX IF NOT EXISTS idx_example_status ON techdb_example(status);
    CREATE INDEX IF NOT EXISTS idx_example_created ON techdb_example(x_dateCreated);
    

    Tables TechDB du Socle

    Le Socle V004 definit 5 tables techniques:

    techdb_offsets

    Stockage des offsets de consommation (Kafka, NATS, etc.)

    CREATE TABLE IF NOT EXISTS techdb_offsets (
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
        offset_key VARCHAR(255) NOT NULL UNIQUE,
        topic VARCHAR(255) NOT NULL,
        partition_id INT DEFAULT 0,
        offset_value BIGINT NOT NULL,
        consumer_group VARCHAR(255),
        datas CLOB
    );
    

    techdb_worker_state

    Etat persistant des Workers

    CREATE TABLE IF NOT EXISTS techdb_worker_state (
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
        worker_name VARCHAR(255) NOT NULL UNIQUE,
        state VARCHAR(50) NOT NULL,
        last_run_at TIMESTAMP WITH TIME ZONE,
        next_run_at TIMESTAMP WITH TIME ZONE,
        error_count INT DEFAULT 0,
        last_error CLOB,
        datas CLOB
    );
    

    techdb_events

    Evenements techniques

    CREATE TABLE IF NOT EXISTS techdb_events (
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
        event_type VARCHAR(100) NOT NULL,
        source VARCHAR(255) NOT NULL,
        processed BOOLEAN DEFAULT FALSE,
        processed_at TIMESTAMP WITH TIME ZONE,
        datas CLOB
    );
    

    techdb_log_buffer

    Buffer de logs pour LogForwarder

    CREATE TABLE IF NOT EXISTS techdb_log_buffer (
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
        log_level VARCHAR(20) NOT NULL,
        logger_name VARCHAR(255),
        message CLOB NOT NULL,
        thread_name VARCHAR(255),
        log_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
        forwarded BOOLEAN DEFAULT FALSE,
        forwarded_at TIMESTAMP WITH TIME ZONE,
        datas CLOB
    );
    

    techdb_kv

    Stockage cle-valeur generique

    CREATE TABLE IF NOT EXISTS techdb_kv (
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
        kv_key VARCHAR(512) NOT NULL UNIQUE,
        value_type VARCHAR(50) DEFAULT 'string',
        expires_at TIMESTAMP WITH TIME ZONE,
        datas CLOB
    );
    

    Bonnes Pratiques

    DO

    • Toujours utiliser le format standard x_ pour les champs techniques
    • Mettre datas en derniere colonne
    • Creer des index sur les colonnes frequemment requetees
    • Utiliser TIMESTAMP WITH TIME ZONE pour tous les timestamps
    • Documenter le contenu JSON attendu dans datas

    DON’T

    • Ne pas utiliser AUTO_INCREMENT (syntaxe MySQL)
    • Ne pas creer de triggers (gestion applicative)
    • Ne pas stocker de BLOBs volumineux (utiliser stockage externe)
    • Ne pas utiliser JSONB (non supporte par H2)
    • Ne pas omettre les index sur les colonnes de recherche

    Migration depuis l’ancien format

    Si vous avez des tables existantes avec l’ancien format:

    -- 1. Sauvegarder les donnees
    CREATE TABLE techdb_events_backup AS SELECT * FROM techdb_events;
    
    -- 2. Supprimer l'ancienne table
    DROP TABLE techdb_events;
    
    -- 3. Recreer avec le nouveau format
    -- (voir DDL ci-dessus)
    
    -- 4. Migrer les donnees
    INSERT INTO techdb_events (event_type, source, processed, processed_at, datas)
    SELECT event_type, source, processed, processed_at, payload
    FROM techdb_events_backup;
    
    -- 5. Supprimer la sauvegarde
    DROP TABLE techdb_events_backup;
    

    Voir aussi

    H2 = dev/tests/outillage – Structure compatible PostgreSQL/H2

    Socle V004 – Standard TechDB

  • Socle V004 – Configuration

    Socle V004 – Configuration

    04 – Configuration

    Version : 4.0.0 Date : 2025-01-25

    1. Introduction

    Le Socle V4 utilise une configuration centralisée via SocleConfiguration qui charge les paramètres depuis application.yml et les variables d’environnement.

    Priorité de configuration

    1. Variables d'environnement (priorité maximale)
    2. application.yml
    3. Valeurs par défaut dans le code
    

    2. Fichier application.yml

    2.1 Configuration minimale

    socle:
      app_name: ${APP_NAME:my-app}
      env_name: ${ENV_NAME:DEV}
      exec_id: ${EXEC_ID:${socle.app_name}-${random.uuid}}
      region: ${REGION:local}
      version: ${APP_VERSION:4.0.0}
    
    spring:
      application:
        name: ${socle.app_name}
    
    server:
      port: ${HTTP_PORT:8080}
    
    logging:
      config: classpath:log4j2.xml
    

    2.2 Configuration complète

    socle:
      # === Identification ===
      app_name: ${APP_NAME:socle-v4}
      env_name: ${ENV_NAME:DEV}
      exec_id: ${EXEC_ID:${socle.app_name}-${random.uuid}}
      region: ${REGION:local}
      version: ${APP_VERSION:4.0.0}
    
      # === HTTP Server ===
      http:
        enabled: ${HTTP_ENABLED:true}
        port: ${HTTP_PORT:8080}
        context-path: ${CONTEXT_PATH:/}
    
      # === KvBus ===
      kvbus:
        mode: ${KVBUS_MODE:in_memory}
        redis:
          host: ${REDIS_HOST:localhost}
          port: ${REDIS_PORT:6379}
          password: ${REDIS_PASSWORD:}
          database: ${REDIS_DATABASE:0}
          prefix: ${REDIS_PREFIX:socle}
    
      # === Supervisor ===
      supervisor:
        heartbeat-interval-ms: ${SUPERVISOR_HEARTBEAT_MS:10000}
        unhealthy-threshold: ${SUPERVISOR_UNHEALTHY_THRESHOLD:3}
        check-interval-ms: ${SUPERVISOR_CHECK_INTERVAL_MS:5000}
        stale-timeout-ms: ${SUPERVISOR_STALE_TIMEOUT_MS:60000}
    
      # === StatusDashboard (V4) ===
      status_dashboard:
        enabled: ${STATUS_DASHBOARD_ENABLED:true}
        port: ${STATUS_DASHBOARD_PORT:9374}
        refresh_interval: ${STATUS_DASHBOARD_REFRESH:5}
    
      # === Scheduler ===
      scheduler:
        enabled: ${SCHEDULER_ENABLED:true}
        thread-pool-size: ${SCHEDULER_POOL_SIZE:4}
    
      # === Admin API ===
      admin:
        enabled: ${ADMIN_ENABLED:true}
        auth:
          enabled: ${ADMIN_AUTH_ENABLED:false}
          username: ${ADMIN_USERNAME:admin}
          password: ${ADMIN_PASSWORD:admin}
    
      # === TechDB (V4) ===
      techdb:
        enabled: ${TECHDB_ENABLED:true}
        url: jdbc:h2:file:${TECHDB_PATH:./data/socle-techdb};MODE=PostgreSQL;DB_CLOSE_DELAY=-1
        username: socle
        password: ${TECHDB_PASSWORD:socle}
        console:
          enabled: ${H2_CONSOLE_ENABLED:false}
          path: /h2-console
    
      # === Logging (V4) ===
      logging:
        forwarder:
          enabled: ${LOG_FORWARDER_ENABLED:false}
          transport-mode: ${LOG_TRANSPORT_MODE:http}
          log-hub-url: ${LOG_HUB_URL:}
          nats-url: ${NATS_URL:}
          batch-size: ${LOG_BATCH_SIZE:100}
          flush-interval-ms: ${LOG_FLUSH_INTERVAL_MS:5000}
    
      # === Auth Client (V4) ===
      auth:
        enabled: ${AUTH_ENABLED:false}
        server-url: ${AUTH_SERVER_URL:}
        source-name: ${SOURCE_NAME:${socle.app_name}}
        api-key: ${API_KEY:}
    
      # === Worker Registry (V4) ===
      worker-registry:
        enabled: ${WORKER_REGISTRY_ENABLED:false}
        server-url: ${WORKER_REGISTRY_URL:}
        heartbeat-interval-ms: ${REGISTRY_HEARTBEAT_MS:30000}
    
    spring:
      application:
        name: ${socle.app_name}
    
    server:
      port: ${socle.http.port}
    
    logging:
      config: classpath:log4j2.xml
    

    3. Variables d’environnement

    3.1 Variables essentielles

    Variable Description Défaut Obligatoire
    APP_NAME Nom de l’application socle-v4 Recommandé
    ENV_NAME Environnement (DEV/STAGING/PROD) DEV Recommandé
    REGION Région géographique local Recommandé
    HTTP_PORT Port HTTP 8080 Non

    3.2 Variables KvBus

    Variable Description Défaut
    KVBUS_MODE Mode (in_memory/redis) in_memory
    REDIS_HOST Hôte Redis localhost
    REDIS_PORT Port Redis 6379
    REDIS_PASSWORD Mot de passe Redis
    REDIS_DATABASE Database Redis 0
    REDIS_PREFIX Préfixe des clés socle

    3.3 Variables Supervisor et Dashboard

    Variable Description Défaut
    SUPERVISOR_HEARTBEAT_MS Intervalle heartbeat attendu 10000 (10s)
    SUPERVISOR_UNHEALTHY_THRESHOLD Heartbeats manqués avant UNHEALTHY 3
    SUPERVISOR_CHECK_INTERVAL_MS Fréquence de vérification 5000 (5s)
    SUPERVISOR_STALE_TIMEOUT_MS Timeout avant STALE 60000 (1min)
    STATUS_DASHBOARD_ENABLED Activer le dashboard true
    STATUS_DASHBOARD_PORT Port du dashboard 9374
    STATUS_DASHBOARD_REFRESH Rafraîchissement (secondes) 5

    3.4 Variables V4

    Variable Description Défaut
    TECHDB_ENABLED Activer H2 TechDB true
    TECHDB_PATH Chemin fichier H2 ./data/socle-techdb
    H2_CONSOLE_ENABLED Console H2 web false
    LOG_FORWARDER_ENABLED Activer LogForwarder false
    LOG_TRANSPORT_MODE Mode transport logs http
    AUTH_ENABLED Activer auth JWT false
    WORKER_REGISTRY_ENABLED Activer registry false

    4. Classe SocleConfiguration

    package eu.lmvi.socle.config;
    
    @Configuration
    @ConfigurationProperties(prefix = "socle")
    public class SocleConfiguration {
    
        // === Identification ===
        private String app_name = "socle-v4";
        private String env_name = "DEV";
        private String exec_id;
        private String region = "local";
        private String version = "4.0.0";
    
        // === HTTP ===
        private HttpConfig http = new HttpConfig();
    
        // === KvBus ===
        private KvBusConfig kvbus = new KvBusConfig();
    
        // === Supervisor ===
        private SupervisorConfig supervisor = new SupervisorConfig();
    
        // === TechDB (V4) ===
        private TechDbConfig techdb = new TechDbConfig();
    
        // === Logging (V4) ===
        private LoggingConfig logging = new LoggingConfig();
    
        // === Auth (V4) ===
        private AuthConfig auth = new AuthConfig();
    
        // === Worker Registry (V4) ===
        private WorkerRegistryConfig workerRegistry = new WorkerRegistryConfig();
    
        // Getters / Setters...
    
        @PostConstruct
        public void init() {
            if (exec_id == null || exec_id.isEmpty()) {
                exec_id = app_name + "-" + UUID.randomUUID().toString().substring(0, 8);
            }
        }
    }
    

    4.1 Sous-configurations

    public static class HttpConfig {
        private boolean enabled = true;
        private int port = 8080;
        private String contextPath = "/";
    }
    
    public static class KvBusConfig {
        private String mode = "in_memory";
        private RedisConfig redis = new RedisConfig();
    }
    
    public static class TechDbConfig {
        private boolean enabled = true;
        private String url = "jdbc:h2:file:./data/socle-techdb";
        private String username = "socle";
        private String password = "socle";
        private ConsoleConfig console = new ConsoleConfig();
    }
    
    public static class LoggingConfig {
        private ForwarderConfig forwarder = new ForwarderConfig();
    }
    
    public static class AuthConfig {
        private boolean enabled = false;
        private String serverUrl;
        private String sourceName;
        private String apiKey;
    }
    
    public static class WorkerRegistryConfig {
        private boolean enabled = false;
        private String serverUrl;
        private long heartbeatIntervalMs = 30000;
    }
    

    5. Accès à la configuration

    5.1 Injection

    @Service
    public class MonService {
    
        @Autowired
        private SocleConfiguration config;
    
        public void doSomething() {
            String appName = config.getApp_name();
            String region = config.getRegion();
            boolean techDbEnabled = config.getTechdb().isEnabled();
        }
    }
    

    5.2 Dans un Worker

    public class MonWorker implements Worker {
    
        private final SocleConfiguration config;
    
        public MonWorker(SocleConfiguration config) {
            this.config = config;
        }
    
        @Override
        public void doWork() {
            log.info("Running in region: {}", config.getRegion());
        }
    }
    

    6. Profils Spring

    6.1 Activation

    # Via variable d'environnement
    export SPRING_PROFILES_ACTIVE=prod
    
    # Via ligne de commande
    java -jar app.jar --spring.profiles.active=prod
    

    6.2 Fichiers par profil

    src/main/resources/
    ├── application.yml           # Configuration de base
    ├── application-dev.yml       # Overrides DEV
    ├── application-staging.yml   # Overrides STAGING
    └── application-prod.yml      # Overrides PROD
    

    6.3 Exemple application-prod.yml

    socle:
      env_name: PROD
      techdb:
        console:
          enabled: false
      logging:
        forwarder:
          enabled: true
      auth:
        enabled: true
      worker-registry:
        enabled: true
    
    logging:
      config: classpath:log4j2-prod.xml
    

    7. Configuration Docker

    7.1 Dockerfile

    FROM eclipse-temurin:21-jre
    
    WORKDIR /app
    
    COPY target/socle-v004-4.0.0.jar app.jar
    
    # Variables par défaut (peuvent être overridées)
    ENV APP_NAME=socle-v4
    ENV ENV_NAME=PROD
    ENV HTTP_PORT=8080
    ENV TECHDB_PATH=/app/data/techdb
    
    EXPOSE 8080
    
    ENTRYPOINT ["java", "-jar", "app.jar"]
    

    7.2 docker-compose.yml

    version: '3.8'
    
    services:
      socle-app:
        image: socle-v4:latest
        environment:
          - APP_NAME=my-service
          - ENV_NAME=PROD
          - REGION=MTQ
          - KVBUS_MODE=redis
          - REDIS_HOST=redis
          - LOG_FORWARDER_ENABLED=true
          - LOG_HUB_URL=http://loghub:8080/api/ingest-logs
        ports:
          - "8080:8080"
        volumes:
          - ./data:/app/data
        depends_on:
          - redis
    
      redis:
        image: redis:7-alpine
        ports:
          - "6379:6379"
    

    8. Configuration Kubernetes

    8.1 ConfigMap

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: socle-config
    data:
      APP_NAME: "my-service"
      ENV_NAME: "PROD"
      REGION: "MTQ"
      KVBUS_MODE: "redis"
      LOG_FORWARDER_ENABLED: "true"
    

    8.2 Secret

    apiVersion: v1
    kind: Secret
    metadata:
      name: socle-secrets
    type: Opaque
    stringData:
      REDIS_PASSWORD: "secret-password"
      API_KEY: "my-api-key"
      TECHDB_PASSWORD: "techdb-password"
    

    8.3 Deployment

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: socle-app
    spec:
      replicas: 2
      template:
        spec:
          containers:
            - name: socle
              image: socle-v4:latest
              envFrom:
                - configMapRef:
                    name: socle-config
                - secretRef:
                    name: socle-secrets
              ports:
                - containerPort: 8080
    

    9. Validation de configuration

    9.1 Validation au démarrage

    @Component
    public class ConfigurationValidator implements ApplicationListener<ApplicationReadyEvent> {
    
        @Autowired
        private SocleConfiguration config;
    
        @Override
        public void onApplicationEvent(ApplicationReadyEvent event) {
            validateRequired();
            validateConsistency();
        }
    
        private void validateRequired() {
            if (config.getApp_name() == null || config.getApp_name().isEmpty()) {
                throw new IllegalStateException("APP_NAME is required");
            }
        }
    
        private void validateConsistency() {
            if (config.getAuth().isEnabled() && config.getAuth().getApiKey() == null) {
                throw new IllegalStateException("API_KEY required when AUTH_ENABLED=true");
            }
        }
    }
    

    9.2 Endpoint de configuration

    GET /admin/config
    

    Retourne la configuration actuelle (sans les secrets).

    10. Bonnes pratiques

    DO

    • Utiliser les variables d’environnement pour les valeurs spécifiques à l’environnement
    • Définir des valeurs par défaut sensées
    • Utiliser des profils Spring pour les environnements
    • Valider la configuration au démarrage
    • Ne jamais committer de secrets

    DON’T

    • Ne pas hardcoder de valeurs dans le code
    • Ne pas mettre de mots de passe dans application.yml
    • Ne pas utiliser de valeurs par défaut dangereuses en prod

    11. Références

  • Socle V004 – Métriques

    Socle V004 – Métriques

    15 – Metrics

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Socle V4 expose des métriques au format Prometheus pour le monitoring et l’alerting.

    Types de métriques

    • Counter : Valeur qui ne fait qu’augmenter (requêtes, erreurs)
    • Gauge : Valeur qui peut monter et descendre (connexions actives)
    • Histogram : Distribution de valeurs (latences)
    • Summary : Similaire à histogram avec percentiles pré-calculés

    2. Configuration

    2.1 application.yml

    management:
      endpoints:
        web:
          exposure:
            include: prometheus,health,info,metrics
          base-path: /actuator
      endpoint:
        prometheus:
          enabled: true
      metrics:
        export:
          prometheus:
            enabled: true
        tags:
          application: ${socle.app_name}
          environment: ${socle.env_name}
          region: ${socle.region}
    

    2.2 Dépendances Maven

    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>
    

    3. Métriques Socle

    3.1 Métriques Workers

    # Nombre de workers
    socle_workers_total{application="socle-v4"} 5
    
    # Workers healthy
    socle_workers_healthy{application="socle-v4"} 5
    
    # Workers unhealthy
    socle_workers_unhealthy{application="socle-v4"} 0
    
    # État par worker
    socle_worker_status{worker="kafka-consumer",status="RUNNING"} 1
    socle_worker_status{worker="order-processor",status="RUNNING"} 1
    
    # Heartbeats par worker
    socle_worker_heartbeats_total{worker="kafka-consumer"} 1234
    socle_worker_missed_heartbeats{worker="kafka-consumer"} 0
    

    3.2 Métriques KvBus

    # Opérations
    socle_kvbus_operations_total{operation="get"} 12345
    socle_kvbus_operations_total{operation="put"} 6789
    socle_kvbus_operations_total{operation="delete"} 234
    
    # Latence
    socle_kvbus_operation_duration_seconds{operation="get",quantile="0.5"} 0.001
    socle_kvbus_operation_duration_seconds{operation="get",quantile="0.95"} 0.005
    socle_kvbus_operation_duration_seconds{operation="get",quantile="0.99"} 0.01
    
    # Nombre de clés
    socle_kvbus_keys_count 456
    

    3.3 Métriques Pipeline

    # Exécutions
    socle_pipeline_executions_total{pipeline="order-processing",status="SUCCESS"} 1234
    socle_pipeline_executions_total{pipeline="order-processing",status="FAILURE"} 12
    
    # Durée
    socle_pipeline_duration_seconds{pipeline="order-processing",quantile="0.5"} 0.5
    socle_pipeline_duration_seconds{pipeline="order-processing",quantile="0.95"} 2.0
    socle_pipeline_duration_seconds{pipeline="order-processing",quantile="0.99"} 5.0
    
    # Steps
    socle_pipeline_step_duration_seconds{step="validation",quantile="0.5"} 0.01
    socle_pipeline_step_duration_seconds{step="processing",quantile="0.5"} 0.3
    

    3.4 Métriques Resilience

    # Circuit breaker état (0=CLOSED, 1=HALF_OPEN, 2=OPEN)
    socle_circuit_breaker_state{name="payment-gateway"} 0
    
    # Tentatives de retry
    socle_retry_attempts_total{operation="external-api",attempt="1",success="true"} 1000
    socle_retry_attempts_total{operation="external-api",attempt="2",success="true"} 50
    socle_retry_attempts_total{operation="external-api",attempt="3",success="false"} 5
    

    3.5 Métriques TechDB (V4)

    # Opérations
    socle_techdb_operations_total{operation="saveOffset"} 5678
    socle_techdb_operations_total{operation="getOffset"} 12345
    
    # Taille des tables
    socle_techdb_rows_count{table="socle_offsets"} 23
    socle_techdb_rows_count{table="socle_events"} 456
    socle_techdb_rows_count{table="socle_log_fallback"} 0
    

    3.6 Métriques LogForwarder (V4)

    # Queue
    socle_logforwarder_queue_size 45
    socle_logforwarder_queue_capacity 10000
    
    # Logs envoyés
    socle_logforwarder_logs_sent_total 123456
    socle_logforwarder_logs_failed_total 23
    socle_logforwarder_logs_fallback_total 0
    
    # Batches
    socle_logforwarder_batches_sent_total 1234
    socle_logforwarder_batch_size{quantile="0.5"} 100
    

    4. Implémentation

    4.1 Enregistrement des métriques

    package eu.lmvi.socle.metrics;
    
    @Component
    public class SocleMetrics {
    
        private final MeterRegistry registry;
    
        // Counters
        private final Counter requestsTotal;
        private final Counter errorsTotal;
    
        // Gauges
        private final AtomicInteger activeConnections = new AtomicInteger(0);
    
        // Timers
        private final Timer requestDuration;
    
        public SocleMetrics(MeterRegistry registry) {
            this.registry = registry;
    
            // Counter
            this.requestsTotal = Counter.builder("socle_requests_total")
                .description("Total number of requests")
                .register(registry);
    
            this.errorsTotal = Counter.builder("socle_errors_total")
                .description("Total number of errors")
                .register(registry);
    
            // Gauge
            Gauge.builder("socle_active_connections", activeConnections, AtomicInteger::get)
                .description("Number of active connections")
                .register(registry);
    
            // Timer
            this.requestDuration = Timer.builder("socle_request_duration_seconds")
                .description("Request duration in seconds")
                .publishPercentiles(0.5, 0.95, 0.99)
                .register(registry);
        }
    
        public void recordRequest() {
            requestsTotal.increment();
        }
    
        public void recordError() {
            errorsTotal.increment();
        }
    
        public void connectionOpened() {
            activeConnections.incrementAndGet();
        }
    
        public void connectionClosed() {
            activeConnections.decrementAndGet();
        }
    
        public Timer.Sample startTimer() {
            return Timer.start(registry);
        }
    
        public void stopTimer(Timer.Sample sample) {
            sample.stop(requestDuration);
        }
    }
    

    4.2 Utilisation dans le code

    @Service
    public class OrderService {
    
        @Autowired
        private SocleMetrics metrics;
    
        public Order processOrder(Order order) {
            Timer.Sample sample = metrics.startTimer();
            metrics.recordRequest();
    
            try {
                Order result = doProcess(order);
                return result;
            } catch (Exception e) {
                metrics.recordError();
                throw e;
            } finally {
                metrics.stopTimer(sample);
            }
        }
    }
    

    4.3 Métriques avec tags

    @Component
    public class WorkerMetrics {
    
        private final MeterRegistry registry;
    
        public void recordWorkerStatus(String workerName, String status) {
            Gauge.builder("socle_worker_status", () -> 1)
                .tag("worker", workerName)
                .tag("status", status)
                .register(registry);
        }
    
        public void recordProcessed(String workerName, String type) {
            Counter.builder("socle_worker_processed_total")
                .tag("worker", workerName)
                .tag("type", type)
                .register(registry)
                .increment();
        }
    }
    

    5. Endpoint Prometheus

    5.1 Accès

    curl http://localhost:8080/actuator/prometheus
    

    5.2 Sortie

    # HELP socle_workers_total Number of workers
    # TYPE socle_workers_total gauge
    socle_workers_total{application="socle-v4",environment="PROD",region="MTQ"} 5
    
    # HELP socle_workers_healthy Number of healthy workers
    # TYPE socle_workers_healthy gauge
    socle_workers_healthy{application="socle-v4",environment="PROD",region="MTQ"} 5
    
    # HELP socle_requests_total Total number of requests
    # TYPE socle_requests_total counter
    socle_requests_total{application="socle-v4",environment="PROD",region="MTQ"} 12345
    
    # HELP socle_request_duration_seconds Request duration in seconds
    # TYPE socle_request_duration_seconds summary
    socle_request_duration_seconds{application="socle-v4",quantile="0.5"} 0.05
    socle_request_duration_seconds{application="socle-v4",quantile="0.95"} 0.2
    socle_request_duration_seconds{application="socle-v4",quantile="0.99"} 0.5
    socle_request_duration_seconds_count{application="socle-v4"} 12345
    socle_request_duration_seconds_sum{application="socle-v4"} 617.25
    

    6. Prometheus Configuration

    6.1 prometheus.yml

    global:
      scrape_interval: 15s
    
    scrape_configs:
      - job_name: 'socle-v4'
        metrics_path: '/actuator/prometheus'
        static_configs:
          - targets: ['socle-app:8080']
            labels:
              app: 'socle-v4'
              env: 'prod'
    
      - job_name: 'socle-v4-kubernetes'
        kubernetes_sd_configs:
          - role: pod
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
            action: keep
            regex: true
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
            action: replace
            target_label: __metrics_path__
            regex: (.+)
    

    6.2 Kubernetes annotations

    apiVersion: v1
    kind: Pod
    metadata:
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/path: "/actuator/prometheus"
        prometheus.io/port: "8080"
    

    7. Grafana Dashboards

    7.1 Exemple de requêtes

    # Taux de requêtes par seconde
    rate(socle_requests_total[5m])
    
    # Taux d'erreurs
    rate(socle_errors_total[5m]) / rate(socle_requests_total[5m]) * 100
    
    # Latence P95
    histogram_quantile(0.95, rate(socle_request_duration_seconds_bucket[5m]))
    
    # Workers unhealthy
    socle_workers_unhealthy
    
    # Circuit breakers ouverts
    socle_circuit_breaker_state == 2
    
    # Queue LogForwarder
    socle_logforwarder_queue_size / socle_logforwarder_queue_capacity * 100
    

    7.2 Dashboard JSON

    {
      "title": "Socle V4 Dashboard",
      "panels": [
        {
          "title": "Request Rate",
          "type": "graph",
          "targets": [
            {
              "expr": "rate(socle_requests_total[5m])",
              "legendFormat": "{{application}}"
            }
          ]
        },
        {
          "title": "Error Rate",
          "type": "graph",
          "targets": [
            {
              "expr": "rate(socle_errors_total[5m]) / rate(socle_requests_total[5m]) * 100",
              "legendFormat": "Error %"
            }
          ]
        },
        {
          "title": "P95 Latency",
          "type": "graph",
          "targets": [
            {
              "expr": "histogram_quantile(0.95, rate(socle_request_duration_seconds_bucket[5m]))",
              "legendFormat": "P95"
            }
          ]
        },
        {
          "title": "Workers Status",
          "type": "stat",
          "targets": [
            {
              "expr": "socle_workers_healthy",
              "legendFormat": "Healthy"
            }
          ]
        }
      ]
    }
    

    8. Alerting

    8.1 Prometheus Alertmanager rules

    groups:
      - name: socle-alerts
        rules:
          - alert: SocleHighErrorRate
            expr: rate(socle_errors_total[5m]) / rate(socle_requests_total[5m]) > 0.05
            for: 5m
            labels:
              severity: warning
            annotations:
              summary: "High error rate on {{ $labels.application }}"
              description: "Error rate is {{ $value | humanizePercentage }}"
    
          - alert: SocleWorkerUnhealthy
            expr: socle_workers_unhealthy > 0
            for: 2m
            labels:
              severity: critical
            annotations:
              summary: "Unhealthy workers on {{ $labels.application }}"
              description: "{{ $value }} workers are unhealthy"
    
          - alert: SocleCircuitBreakerOpen
            expr: socle_circuit_breaker_state == 2
            for: 5m
            labels:
              severity: warning
            annotations:
              summary: "Circuit breaker {{ $labels.name }} is OPEN"
    
          - alert: SocleLogForwarderQueueHigh
            expr: socle_logforwarder_queue_size / socle_logforwarder_queue_capacity > 0.8
            for: 5m
            labels:
              severity: warning
            annotations:
              summary: "LogForwarder queue is {{ $value | humanizePercentage }} full"
    

    9. Bonnes pratiques

    DO

    • Utiliser des noms de métriques cohérents (socle_*)
    • Ajouter des tags pertinents (application, environment, region)
    • Utiliser des histogrammes pour les latences
    • Définir des alertes sur les métriques critiques
    • Documenter les métriques

    DON’T

    • Ne pas créer trop de métriques (cardinalité)
    • Ne pas utiliser de valeurs à haute cardinalité dans les tags
    • Ne pas oublier les métriques d’erreur
    • Ne pas ignorer les métriques de queue/buffer

    10. Références

  • Socle V004 – Pipeline

    Socle V004 – Pipeline

    09 – Pipeline Engine

    Version : 4.1.0 Date : 2025-12-28

    1. Introduction

    Le PipelineEngine permet de définir des chaînes de traitement composées d’étapes (steps) exécutées séquentiellement ou en parallèle.

    Caractéristiques

    • Définition déclarative des pipelines
    • Exécution séquentielle ou parallèle
    • Gestion des erreurs et retry
    • Context partagé entre étapes
    • Métriques et logging intégrés

    2. Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                      PipelineEngine                          │
    └─────────────────────────────────────────────────────────────┘
                                  │
                                  ▼
    ┌─────────────────────────────────────────────────────────────┐
    │                      Pipeline                                │
    │  ┌─────────────────────────────────────────────────────┐    │
    │  │              PipelineContext                         │    │
    │  │  - input data                                        │    │
    │  │  - shared state                                      │    │
    │  │  - results                                           │    │
    │  └─────────────────────────────────────────────────────┘    │
    │                                                              │
    │  ┌──────────┐   ┌──────────┐   ┌──────────┐                │
    │  │  Step 1  │──►│  Step 2  │──►│  Step 3  │                │
    │  └──────────┘   └──────────┘   └──────────┘                │
    │                                                              │
    └─────────────────────────────────────────────────────────────┘
    

    3. Interface Pipeline

    package eu.lmvi.socle.pipeline;
    
    public interface Pipeline<I, O> {
    
        /**
         * Nom du pipeline
         */
        String getName();
    
        /**
         * Exécute le pipeline
         */
        PipelineResult<O> execute(I input);
    
        /**
         * Exécute le pipeline avec context
         */
        PipelineResult<O> execute(I input, PipelineContext context);
    
        /**
         * Liste des étapes
         */
        List<PipelineStep<?, ?>> getSteps();
    }
    

    4. Interface PipelineStep

    package eu.lmvi.socle.pipeline;
    
    public interface PipelineStep<I, O> {
    
        /**
         * Nom de l'étape
         */
        String getName();
    
        /**
         * Exécute l'étape
         */
        StepResult<O> execute(I input, PipelineContext context);
    
        /**
         * L'étape peut-elle être retryée ?
         */
        default boolean isRetryable() {
            return true;
        }
    
        /**
         * Nombre max de retries
         */
        default int getMaxRetries() {
            return 3;
        }
    
        /**
         * L'étape est-elle optionnelle ?
         */
        default boolean isOptional() {
            return false;
        }
    }
    

    5. Context et Result

    5.1 PipelineContext

    package eu.lmvi.socle.pipeline;
    
    public class PipelineContext {
        private final Map<String, Object> attributes = new ConcurrentHashMap<>();
        private final List<StepResult<?>> stepResults = new ArrayList<>();
        private final String correlationId;
        private final Instant startTime;
    
        public PipelineContext() {
            this.correlationId = UUID.randomUUID().toString();
            this.startTime = Instant.now();
        }
    
        public void put(String key, Object value) {
            attributes.put(key, value);
        }
    
        public <T> Optional<T> get(String key, Class<T> type) {
            return Optional.ofNullable(attributes.get(key))
                .filter(type::isInstance)
                .map(type::cast);
        }
    
        public void addStepResult(StepResult<?> result) {
            stepResults.add(result);
        }
    
        public List<StepResult<?>> getStepResults() {
            return Collections.unmodifiableList(stepResults);
        }
    
        public String getCorrelationId() {
            return correlationId;
        }
    
        public Duration getElapsedTime() {
            return Duration.between(startTime, Instant.now());
        }
    }
    

    5.2 StepResult

    package eu.lmvi.socle.pipeline;
    
    public record StepResult<T>(
        String stepName,
        StepStatus status,
        T output,
        Exception error,
        Duration duration,
        int attempts
    ) {
        public boolean isSuccess() {
            return status == StepStatus.SUCCESS;
        }
    
        public boolean isFailure() {
            return status == StepStatus.FAILURE;
        }
    
        public boolean isSkipped() {
            return status == StepStatus.SKIPPED;
        }
    
        public static <T> StepResult<T> success(String name, T output, Duration duration) {
            return new StepResult<>(name, StepStatus.SUCCESS, output, null, duration, 1);
        }
    
        public static <T> StepResult<T> failure(String name, Exception error, Duration duration, int attempts) {
            return new StepResult<>(name, StepStatus.FAILURE, null, error, duration, attempts);
        }
    
        public static <T> StepResult<T> skipped(String name) {
            return new StepResult<>(name, StepStatus.SKIPPED, null, null, Duration.ZERO, 0);
        }
    }
    
    public enum StepStatus {
        SUCCESS,
        FAILURE,
        SKIPPED
    }
    

    5.3 PipelineResult

    package eu.lmvi.socle.pipeline;
    
    public record PipelineResult<T>(
        String pipelineName,
        PipelineStatus status,
        T output,
        PipelineContext context,
        Duration totalDuration
    ) {
        public boolean isSuccess() {
            return status == PipelineStatus.SUCCESS;
        }
    
        public List<StepResult<?>> getFailedSteps() {
            return context.getStepResults().stream()
                .filter(StepResult::isFailure)
                .toList();
        }
    }
    
    public enum PipelineStatus {
        SUCCESS,
        PARTIAL_SUCCESS,
        FAILURE
    }
    

    6. Implémentation

    6.1 DefaultPipelineEngine

    package eu.lmvi.socle.pipeline;
    
    @Component
    public class DefaultPipelineEngine implements PipelineEngine {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultPipelineEngine.class);
    
        @Override
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input) {
            return execute(pipeline, input, new PipelineContext());
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input, PipelineContext context) {
            log.info("[{}] Starting pipeline execution", pipeline.getName());
            Instant start = Instant.now();
    
            Object currentInput = input;
            O finalOutput = null;
            boolean hasFailure = false;
    
            for (PipelineStep<?, ?> step : pipeline.getSteps()) {
                PipelineStep<Object, Object> typedStep = (PipelineStep<Object, Object>) step;
    
                StepResult<Object> result = executeStep(typedStep, currentInput, context);
                context.addStepResult(result);
    
                if (result.isSuccess()) {
                    currentInput = result.output();
                    finalOutput = (O) result.output();
                } else if (result.isFailure()) {
                    if (!step.isOptional()) {
                        hasFailure = true;
                        log.error("[{}] Pipeline failed at step: {}", pipeline.getName(), step.getName());
                        break;
                    }
                    log.warn("[{}] Optional step failed, continuing: {}", pipeline.getName(), step.getName());
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            PipelineStatus status = hasFailure
                ? PipelineStatus.FAILURE
                : (context.getStepResults().stream().anyMatch(StepResult::isFailure)
                    ? PipelineStatus.PARTIAL_SUCCESS
                    : PipelineStatus.SUCCESS);
    
            log.info("[{}] Pipeline completed with status: {} in {}ms",
                pipeline.getName(), status, duration.toMillis());
    
            return new PipelineResult<>(pipeline.getName(), status, finalOutput, context, duration);
        }
    
        private <I, O> StepResult<O> executeStep(PipelineStep<I, O> step, I input, PipelineContext context) {
            log.debug("[{}] Executing step", step.getName());
            Instant start = Instant.now();
            int attempts = 0;
            Exception lastError = null;
    
            while (attempts < step.getMaxRetries()) {
                attempts++;
                try {
                    StepResult<O> result = step.execute(input, context);
                    if (result.isSuccess()) {
                        Duration duration = Duration.between(start, Instant.now());
                        log.debug("[{}] Step completed in {}ms", step.getName(), duration.toMillis());
                        return StepResult.success(step.getName(), result.output(), duration);
                    }
                    lastError = result.error();
                } catch (Exception e) {
                    lastError = e;
                    log.warn("[{}] Step attempt {} failed: {}", step.getName(), attempts, e.getMessage());
    
                    if (!step.isRetryable() || attempts >= step.getMaxRetries()) {
                        break;
                    }
    
                    // Backoff
                    try {
                        Thread.sleep(attempts * 1000L);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            return StepResult.failure(step.getName(), lastError, duration, attempts);
        }
    }
    

    6.2 Pipeline Builder

    package eu.lmvi.socle.pipeline;
    
    public class PipelineBuilder<I, O> {
    
        private final String name;
        private final List<PipelineStep<?, ?>> steps = new ArrayList<>();
    
        public PipelineBuilder(String name) {
            this.name = name;
        }
    
        public static <I, O> PipelineBuilder<I, O> create(String name) {
            return new PipelineBuilder<>(name);
        }
    
        public <NO> PipelineBuilder<I, NO> addStep(PipelineStep<O, NO> step) {
            steps.add(step);
            return (PipelineBuilder<I, NO>) this;
        }
    
        public <NO> PipelineBuilder<I, NO> addStep(String name, Function<O, NO> processor) {
            return addStep(new FunctionalStep<>(name, processor));
        }
    
        public Pipeline<I, O> build() {
            return new DefaultPipeline<>(name, List.copyOf(steps));
        }
    
        private record FunctionalStep<I, O>(String name, Function<I, O> processor) implements PipelineStep<I, O> {
    
            @Override
            public String getName() {
                return name;
            }
    
            @Override
            public StepResult<O> execute(I input, PipelineContext context) {
                O output = processor.apply(input);
                return StepResult.success(name, output, Duration.ZERO);
            }
        }
    }
    

    7. Utilisation

    7.1 Pipeline simple

    @Component
    public class OrderPipeline {
    
        @Autowired
        private PipelineEngine engine;
    
        public PipelineResult<OrderResult> processOrder(Order order) {
            Pipeline<Order, OrderResult> pipeline = PipelineBuilder
                .<Order, OrderResult>create("order-processing")
                .addStep("validate", this::validateOrder)
                .addStep("enrich", this::enrichOrder)
                .addStep("process", this::processOrder)
                .addStep("notify", this::notifyCustomer)
                .build();
    
            return engine.execute(pipeline, order);
        }
    
        private ValidatedOrder validateOrder(Order order) {
            // Validation...
            return new ValidatedOrder(order);
        }
    
        private EnrichedOrder enrichOrder(ValidatedOrder order) {
            // Enrichissement...
            return new EnrichedOrder(order);
        }
    
        private ProcessedOrder processOrder(EnrichedOrder order) {
            // Traitement...
            return new ProcessedOrder(order);
        }
    
        private OrderResult notifyCustomer(ProcessedOrder order) {
            // Notification...
            return new OrderResult(order, "SUCCESS");
        }
    }
    

    7.2 Étapes personnalisées

    public class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
    
        @Override
        public String getName() {
            return "validation";
        }
    
        @Override
        public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
            List<String> errors = new ArrayList<>();
    
            if (input.getCustomerId() == null) {
                errors.add("Missing customer ID");
            }
            if (input.getItems().isEmpty()) {
                errors.add("No items in order");
            }
    
            if (!errors.isEmpty()) {
                return StepResult.failure(getName(),
                    new ValidationException(errors),
                    Duration.ZERO, 1);
            }
    
            // Stocker des données dans le context
            context.put("customerId", input.getCustomerId());
    
            return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
        }
    
        @Override
        public boolean isRetryable() {
            return false;  // Validation ne doit pas être retryée
        }
    }
    

    7.3 Étape optionnelle

    public class NotificationStep implements PipelineStep<ProcessedOrder, ProcessedOrder> {
    
        @Override
        public String getName() {
            return "notification";
        }
    
        @Override
        public StepResult<ProcessedOrder> execute(ProcessedOrder input, PipelineContext context) {
            try {
                sendNotification(input);
                return StepResult.success(getName(), input, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    
        @Override
        public boolean isOptional() {
            return true;  // Le pipeline continue même si la notif échoue
        }
    }
    

    7.4 Utilisation du context

    public class EnrichmentStep implements PipelineStep<ValidatedOrder, EnrichedOrder> {
    
        @Autowired
        private CustomerService customerService;
    
        @Override
        public String getName() {
            return "enrichment";
        }
    
        @Override
        public StepResult<EnrichedOrder> execute(ValidatedOrder input, PipelineContext context) {
            // Lire depuis le context
            String customerId = context.get("customerId", String.class).orElseThrow();
    
            // Enrichir
            Customer customer = customerService.getCustomer(customerId);
            EnrichedOrder enriched = new EnrichedOrder(input, customer);
    
            // Écrire dans le context pour les étapes suivantes
            context.put("customerEmail", customer.getEmail());
    
            return StepResult.success(getName(), enriched, Duration.ZERO);
        }
    }
    

    8. Gestion des erreurs

    8.1 Retry automatique

    public class ExternalApiStep implements PipelineStep<Data, ApiResponse> {
    
        @Override
        public String getName() {
            return "external-api-call";
        }
    
        @Override
        public boolean isRetryable() {
            return true;
        }
    
        @Override
        public int getMaxRetries() {
            return 5;  // 5 tentatives max
        }
    
        @Override
        public StepResult<ApiResponse> execute(Data input, PipelineContext context) {
            // Appel API qui peut échouer
            ApiResponse response = callApi(input);
            return StepResult.success(getName(), response, Duration.ZERO);
        }
    }
    

    8.2 Traitement des résultats

    PipelineResult<OrderResult> result = engine.execute(pipeline, order);
    
    if (result.isSuccess()) {
        log.info("Order processed successfully: {}", result.output());
    } else {
        // Analyser les étapes en échec
        for (StepResult<?> stepResult : result.getFailedSteps()) {
            log.error("Step {} failed after {} attempts: {}",
                stepResult.stepName(),
                stepResult.attempts(),
                stepResult.error().getMessage());
        }
    
        // Décider quoi faire
        if (result.status() == PipelineStatus.PARTIAL_SUCCESS) {
            // Certaines étapes optionnelles ont échoué
            handlePartialSuccess(result);
        } else {
            // Échec complet
            handleFailure(result);
        }
    }
    

    9. Pipelines parallèles

    public class ParallelEnrichmentStep implements PipelineStep<Order, EnrichedOrder> {
    
        @Autowired
        private ExecutorService executor;
    
        @Override
        public StepResult<EnrichedOrder> execute(Order input, PipelineContext context) {
            // Exécuter plusieurs enrichissements en parallèle
            CompletableFuture<Customer> customerFuture = CompletableFuture.supplyAsync(
                () -> customerService.getCustomer(input.getCustomerId()), executor);
    
            CompletableFuture<Inventory> inventoryFuture = CompletableFuture.supplyAsync(
                () -> inventoryService.checkInventory(input.getItems()), executor);
    
            CompletableFuture<Pricing> pricingFuture = CompletableFuture.supplyAsync(
                () -> pricingService.calculatePrice(input), executor);
    
            try {
                CompletableFuture.allOf(customerFuture, inventoryFuture, pricingFuture).join();
    
                EnrichedOrder enriched = new EnrichedOrder(
                    input,
                    customerFuture.get(),
                    inventoryFuture.get(),
                    pricingFuture.get()
                );
    
                return StepResult.success(getName(), enriched, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    }
    

    10. Métriques

    @Component
    public class PipelineMetrics {
    
        private final MeterRegistry registry;
    
        public void recordPipelineExecution(PipelineResult<?> result) {
            Timer.builder("socle_pipeline_duration")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .record(result.totalDuration());
    
            Counter.builder("socle_pipeline_executions")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .increment();
        }
    
        public void recordStepExecution(StepResult<?> result) {
            Timer.builder("socle_pipeline_step_duration")
                .tag("step", result.stepName())
                .tag("status", result.status().name())
                .register(registry)
                .record(result.duration());
        }
    }
    

    11. Bonnes pratiques

    DO

    • Garder les étapes petites et focalisées
    • Utiliser le context pour partager des données entre étapes
    • Marquer les étapes non-critiques comme optionnelles
    • Logger au niveau des étapes pour le debugging
    • Configurer des retries appropriés pour les appels externes

    DON’T

    • Ne pas faire de pipelines avec trop d’étapes (> 10)
    • Ne pas mélanger logique métier et infrastructure dans une étape
    • Ne pas ignorer les erreurs des étapes obligatoires
    • Ne pas utiliser de pipelines pour du traitement simple

    12. Références

    Pipeline Engine V2 (Nouveau)

    Ajouté en version 4.1.0

    13. Introduction Pipeline V2

    Le Pipeline V2 est une refonte complète du système de pipelines, conçue pour les applications à haute disponibilité nécessitant :

    • Garantie at-least-once : Aucune perte de message grâce au pattern Queue/Claim/Ack
    • Virtual Threads (Java 21) : Scalabilité maximale avec les threads virtuels
    • Reprise après crash : État persisté permettant la reprise au dernier stage OK
    • DLQ (Dead Letter Queue) : Gestion des échecs définitifs avec replay

    Quand utiliser V2 vs V1 ?

    Critère Pipeline V1 Pipeline V2
    Exécution Synchrone, mono-thread Asynchrone, multi-thread
    Garantie de livraison At-most-once At-least-once
    Persistance Non Oui (TechDB / H2)
    DLQ Non Oui
    Use case Traitement simple Flux critiques, haute disponibilité

    14. Architecture V2

    ┌─────────────────────────────────────────────────────────────────┐
    │                            MOP                                   │
    │               (supervision, restart, health)                     │
    └───────────────────────────┬─────────────────────────────────────┘
                                │
    ┌───────────────────────────▼─────────────────────────────────────┐
    │                     PipelineV2                                   │
    │                                                                  │
    │  ┌────────────┐     ┌────────────┐     ┌────────────┐          │
    │  │  Stage A   │────►│  Stage B   │────►│  Stage C   │          │
    │  │ ┌────────┐ │     │ ┌────────┐ │     │ ┌────────┐ │          │
    │  │ │ Queue  │ │     │ │ Queue  │ │     │ │ Queue  │ │          │
    │  │ └────────┘ │     │ └────────┘ │     │ └────────┘ │          │
    │  │ VThreads  │     │ VThreads  │     │ VThreads  │          │
    │  └────────────┘     └────────────┘     └────────────┘          │
    │                                                                  │
    │         ┌─────────────────┴─────────────────┐                   │
    │    ┌────▼────┐                        ┌────▼────┐              │
    │    │ Context │                        │   DLQ   │              │
    │    │ (state) │                        │ (errors)│              │
    │    └─────────┘                        └─────────┘              │
    └─────────────────────────────────────────────────────────────────┘
    

    Composants

    • StageQueue : Queue avec sémantique Claim/Ack entre chaque stage
    • VThreadStageExecutor : Workers Virtual Threads consommant les queues
    • PersistentPipelineContext : État persisté pour reprise après crash
    • DLQ : Messages en échec après max retries

    15. Création d’un Pipeline V2

    15.1 Builder Fluent

    import eu.lmvi.socle.pipeline.v2.PipelineBuilderV2;
    import eu.lmvi.socle.pipeline.v2.PipelineV2;
    
    // Création du pipeline
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .description("Pipeline de traitement des commandes")
    
        // Stage 1: Validation (2 workers, timeout 30s)
        .addStage("validate", this::validateOrder)
            .concurrency(2)
            .timeout(Duration.ofSeconds(30))
    
        // Stage 2: Enrichissement (4 workers, queue 1000)
        .addStage("enrich", this::enrichOrder)
            .concurrency(4)
            .queueSize(1000)
    
        // Stage 3: Publication (2 workers)
        .addStage("publish", this::publishOrder)
            .concurrency(2)
            .maxRetries(5)
            .backoff(Duration.ofMillis(200), 2.0)
    
        .build();
    

    15.2 Configuration par stage

    .addStage("nom", processor)
        .concurrency(4)           // Nombre de workers parallèles
        .queueSize(500)           // Taille max de la queue
        .timeout(Duration.ofMinutes(5))  // Timeout par message
        .maxRetries(3)            // Tentatives avant DLQ
        .backoff(base, multiplier) // Backoff exponentiel
        .optional()               // Stage optionnel (échec non bloquant)
    

    16. Cycle de vie

    16.1 Démarrage/Arrêt

    // Démarrer le pipeline (lance tous les workers)
    pipeline.start();
    
    // Vérifier l'état
    boolean running = pipeline.isRunning();
    
    // Arrêt gracieux (attend la fin des traitements en cours)
    pipeline.stop();
    
    // Arrêt immédiat (interrompt tout)
    pipeline.stopNow();
    

    16.2 Soumission de messages

    // Soumettre un message
    String messageId = pipeline.submit(order, "exec-001");
    
    // Avec métadonnées
    String messageId = pipeline.submit(order, "exec-001", Map.of(
        "source", "api",
        "priority", "high"
    ));
    

    16.3 Suivi d’exécution

    // État d'une exécution
    Optional<PipelineState> state = pipeline.getExecutionState("exec-001");
    state.ifPresent(s -> {
        System.out.println("Status: " + s.status());
        System.out.println("Current stage: " + s.currentStage());
        System.out.println("Duration: " + s.duration());
    });
    
    // Liste des exécutions actives
    List<PipelineState> active = pipeline.listActiveExecutions();
    
    // Statistiques globales
    PipelineV2.PipelineStats stats = pipeline.getStats();
    System.out.println("Submitted: " + stats.totalSubmitted());
    System.out.println("Completed: " + stats.totalCompleted());
    System.out.println("Failed: " + stats.totalFailed());
    

    17. Queue/Claim/Ack

    17.1 Pattern de consommation

    Le Pipeline V2 utilise un pattern de consommation fiable :

    1. CLAIM   : Le worker réserve un message (lease)
    2. PROCESS : Le worker traite le message
    3. ACK     : Succès → message supprimé
       ou
       NACK    : Échec → retry ou DLQ
    

    17.2 Lease et récupération

    Si un worker crash pendant le traitement :

    • Le lease expire après le timeout configuré
    • Le message est automatiquement remis en queue
    • Un autre worker le reprend
    // Timeout de lease par défaut: 5 minutes
    // Configurable par stage via timeout()
    

    17.3 Implémentations disponibles

    Implémentation Persistance Use case
    InMemoryStageQueue Non Développement, tests
    TechDbStageQueue Oui (H2) Production

    18. Dead Letter Queue (DLQ)

    18.1 Envoi en DLQ

    Un message est envoyé en DLQ après :

    • maxRetries tentatives échouées
    • Une erreur non-retryable

    18.2 Consultation

    // Via API REST
    GET /admin/pipelines/v2/{name}/dlq
    
    // Programmatiquement (via StageQueue)
    List<DlqMessage<Order>> messages = queue.peekDlq(100);
    for (DlqMessage<Order> msg : messages) {
        System.out.println("ID: " + msg.messageId());
        System.out.println("Error: " + msg.errorMessage());
        System.out.println("Attempts: " + msg.attempts());
        System.out.println("Failed at: " + msg.failedAt());
    }
    

    18.3 Replay

    // Via API REST
    POST /admin/pipelines/v2/{name}/dlq/{messageId}/replay
    
    // Programmatiquement
    boolean replayed = queue.replayFromDlq(messageId);
    
    // Replay tous les messages
    int count = queue.replayAllFromDlq("exec-001");
    

    18.4 Suppression

    // Via API REST
    DELETE /admin/pipelines/v2/{name}/dlq/{messageId}
    
    // Programmatiquement
    queue.deleteFromDlq(messageId);
    
    // Purge complète
    queue.purgeDlq();
    

    19. Persistance et reprise

    19.1 Context persistant

    Le Pipeline V2 peut persister son état pour survivre aux redémarrages :

    import eu.lmvi.socle.pipeline.context.TechDbPipelineContext;
    
    // Créer un context persistant
    Supplier<Connection> connectionSupplier = () -> dataSource.getConnection();
    PersistentPipelineContext context = new TechDbPipelineContext(connectionSupplier);
    
    // Utiliser avec le builder
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .context(context)  // Utilise le context persistant
        .addStage(...)
        .build();
    

    19.2 Reprise après crash

    Au redémarrage, les messages non-ackés sont automatiquement re-traités :

    1. Les messages en status CLAIMED dont le lease a expiré → remis en PENDING
    2. Les stages reprennent leur consommation normalement
    3. Grâce à l’idempotence, les messages déjà traités ne sont pas re-traités

    20. Idempotence (OBLIGATOIRE)

    20.1 Pourquoi ?

    Le modèle at-least-once signifie qu’un message peut être traité plusieurs fois :

    • Crash après traitement mais avant ACK
    • Timeout de lease pendant un traitement long
    • Replay depuis la DLQ

    Chaque stage DOIT être idempotent.

    20.2 Patterns recommandés

    // Pattern 1: Vérifier "déjà traité"
    public Order processOrder(Order order) {
        if (orderRepository.exists(order.getId())) {
            return orderRepository.get(order.getId()); // Déjà traité
        }
        // Traiter...
        return orderRepository.save(order);
    }
    
    // Pattern 2: UPSERT au lieu d'INSERT
    public void saveOrder(Order order) {
        orderRepository.upsert(order); // Idempotent par nature
    }
    
    // Pattern 3: Utiliser executionId comme clé
    public void processWithDedup(Order order, String executionId) {
        String dedupKey = "order:" + order.getId() + ":" + executionId;
        if (kvBus.exists(dedupKey)) {
            return; // Déjà traité pour cette exécution
        }
    
        // Traiter...
    
        kvBus.set(dedupKey, true, Duration.ofDays(7));
    }
    

    20.3 Anti-patterns

    // MAUVAIS: INSERT sans vérification
    db.insert(order); // Échoue si déjà inséré
    
    // MAUVAIS: Compteur incrémental
    counter.increment(); // Compté plusieurs fois en cas de replay
    
    // MAUVAIS: Email sans déduplication
    emailService.send(email); // Envoyé plusieurs fois
    

    21. API REST d’administration

    21.1 Endpoints disponibles

    GET  /admin/pipelines/v2                    → Liste des pipelines
    GET  /admin/pipelines/v2/{name}             → État d'un pipeline
    GET  /admin/pipelines/v2/{name}/stages      → État des stages
    GET  /admin/pipelines/v2/{name}/executions  → Exécutions en cours
    
    GET  /admin/pipelines/v2/{name}/dlq         → Messages en DLQ
    POST /admin/pipelines/v2/{name}/dlq/{id}/replay  → Rejouer un message
    DELETE /admin/pipelines/v2/{name}/dlq/{id}  → Supprimer de la DLQ
    POST /admin/pipelines/v2/{name}/dlq/replay-all   → Rejouer tous
    
    POST /admin/pipelines/v2/{name}/start       → Démarrer le pipeline
    POST /admin/pipelines/v2/{name}/stop        → Arrêter gracieusement
    POST /admin/pipelines/v2/{name}/stop-now    → Arrêter immédiatement
    

    21.2 Exemple de réponse

    GET /admin/pipelines/v2/order-processing
    
    {
      "timestamp": 1735387200000,
      "name": "order-processing",
      "description": "Pipeline de traitement des commandes",
      "running": true,
      "stats": {
        "stage_count": 3,
        "active_executions": 5,
        "total_submitted": 1500,
        "total_completed": 1480,
        "total_failed": 12
      }
    }
    

    22. Métriques Prometheus

    22.1 Métriques exposées

    # Compteurs
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="success"}
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="failed"}
    socle_pipeline_v2_dlq_total{pipeline="order-processing", stage="validate"}
    
    # Gauges
    socle_pipeline_v2_queue_pending{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_queue_processing{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_dlq_size{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_active_executions{pipeline="order-processing"}
    socle_pipeline_v2_running{pipeline="order-processing"}
    
    # Histogrammes (durée de traitement)
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.5"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.95"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.99"}
    

    22.2 Alertes recommandées

    # DLQ croissante
    - alert: PipelineDLQGrowing
      expr: increase(socle_pipeline_v2_dlq_total[1h]) > 10
      labels:
        severity: warning
    
    # Queue qui s'accumule (backpressure)
    - alert: PipelineQueueBacklog
      expr: socle_pipeline_v2_queue_pending > 1000
      for: 5m
      labels:
        severity: warning
    
    # Pipeline arrêté
    - alert: PipelineStopped
      expr: socle_pipeline_v2_running == 0
      for: 1m
      labels:
        severity: critical
    

    23. Bonnes pratiques V2

    DO

    • Toujours implémenter l’idempotence dans chaque stage
    • Utiliser TechDbPipelineContext en production pour la persistance
    • Configurer des maxRetries appropriés (3-5 pour API externes)
    • Monitorer les métriques DLQ et queue_pending
    • Traiter régulièrement les messages en DLQ (replay ou suppression)

    DON’T

    • Ne jamais supposer qu’un message ne sera traité qu’une fois
    • Ne pas ignorer les messages en DLQ
    • Ne pas mettre de timeout trop courts (risque de faux positifs)
    • Ne pas créer trop de stages (overhead de queues)
    • Ne pas oublier d’appeler pipeline.stop() à l’arrêt de l’application

    24. Migration V1 → V2

    24.1 Changements d’API

    V1 V2
    PipelineEngine.execute(pipeline, input) pipeline.submit(input, executionId)
    Pipeline<I,O> PipelineV2<I,O>
    PipelineBuilder PipelineBuilderV2
    Exécution synchrone Exécution asynchrone

    24.2 Étapes de migration

    1. Rendre les steps idempotents (obligatoire avant migration)
    2. Créer le pipeline avec PipelineBuilderV2
    3. Remplacer execute() par submit()
    4. Ajouter le suivi asynchrone des résultats si nécessaire
    5. Configurer la persistance (TechDbPipelineContext)
    6. Activer les métriques et monitoring DLQ

    25. Configuration Dynamique (YAML)

    Ajouté en version 4.1.0

    25.1 Introduction

    Le Pipeline V2 peut maintenant être configuré entièrement via YAML, permettant :

    • Activation/désactivation des stages sans recompiler
    • Configuration dynamique de la concurrence, timeouts, retries
    • Ajout de nouveaux stages sans modifier le code de la factory
    • Modularité des workers (un worker = un composant Spring)

    25.2 Architecture

    ┌─────────────────────────────────────────────────────────────────┐
    │                        application.yml                           │
    │  cdc:                                                            │
    │    pipeline:                                                     │
    │      stages:                                                     │
    │        - name: filter                                            │
    │          worker: filterWorker                                    │
    │          enabled: true                                           │
    └───────────────────────────────┬─────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │            PipelineYamlConfig (@ConfigurationProperties)          │
    │                                                                    │
    │  - name: String                                                    │
    │  - stages: List<StageYamlConfig>                                   │
    │  - getEnabledStages(): List<StageYamlConfig>                       │
    └───────────────────────────────┬───────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │                    DynamicPipelineBuilder                          │
    │                                                                    │
    │  .config(pipelineConfig)                                           │
    │  .workers(Map<String, PipelineStageWorker>)                        │
    │  .build() → PipelineV2<I, O>                                       │
    └───────────────────────────────────────────────────────────────────┘
    

    25.3 Interface PipelineStageWorker

    Chaque stage est implémenté comme un composant Spring implémentant PipelineStageWorker :

    package eu.lmvi.socle.pipeline.v2.worker;
    
    public interface PipelineStageWorker<I, O> {
    
        /**
         * Nom unique du worker (utilisé dans la config YAML)
         */
        String getName();
    
        /**
         * Initialise le worker (chargement ressources, compilation scripts, etc.)
         */
        default void initialize() throws Exception {}
    
        /**
         * Traite un élément d'entrée (DOIT être thread-safe)
         */
        O process(I input) throws Exception;
    
        /**
         * Libère les ressources
         */
        default void shutdown() {}
    
        /**
         * Indique si ce worker est activé
         */
        default boolean isEnabled() { return true; }
    
        /**
         * Description pour le monitoring
         */
        default String getDescription() { return getName(); }
    
        /**
         * Ordre de priorité (plus bas = plus tôt)
         */
        default int getOrder() { return 100; }
    }
    

    25.4 Exemple d’implémentation

    @Component("filterWorker")
    public class FilterStageWorker implements PipelineStageWorker<WalEvent, PipelineMessage> {
    
        @Autowired
        private TableFilter tableFilter;
    
        @Override
        public String getName() {
            return "filter";
        }
    
        @Override
        public int getOrder() {
            return 10;  // Premier dans le pipeline
        }
    
        @Override
        public PipelineMessage process(WalEvent event) throws Exception {
            PipelineMessage message = new PipelineMessage(event);
            if (!tableFilter.isAllowed(event.getTable())) {
                message.markAsFiltered("table_not_in_whitelist");
            }
            return message;
        }
    }
    

    25.5 Configuration YAML

    cdc:
      pipeline:
        name: cdc-pipeline-v2
        description: Pipeline CDC PostgreSQL vers Kafka
        enabled: true
        stages:
          # Stage 1: Filtrage
          - name: filter
            worker: filterWorker        # Nom du bean Spring
            enabled: true
            concurrency: 1
            timeout-seconds: 5
            max-retries: 1
            order: 10
    
          # Stage 2: Transformation
          - name: transform
            worker: transformWorker
            enabled: true
            concurrency: 2
            timeout-seconds: 30
            order: 20
    
          # Stage 3a: Enrichissement Rules
          - name: enrich-rules
            worker: rulesEnrichmentWorker
            enabled: true
            concurrency: 2
            order: 30
    
          # Stage 3b: Enrichissement BeanShell
          - name: enrich-beanshell
            worker: beanshellEnrichmentWorker
            enabled: ${CDC_ENRICH_BSH_ENABLED:true}  # Variable d'environnement
            concurrency: 2
            order: 31
    
          # Stage 3c: Enrichissement JavaScript
          - name: enrich-javascript
            worker: javascriptEnrichmentWorker
            enabled: ${CDC_ENRICH_JS_ENABLED:true}
            concurrency: 4
            timeout-seconds: 60
            order: 32
    
          # Stage 3d: Enrichissement Janino (nouveau)
          - name: enrich-janino
            worker: janinoEnrichmentWorker
            enabled: ${CDC_ENRICH_JANINO_ENABLED:false}  # Désactivé par défaut
            concurrency: 2
            order: 33
    
          # Stage 4: Publication Kafka
          - name: publish
            worker: publishWorker
            enabled: true
            concurrency: 2
            max-retries: 5
            order: 40
    
          # Stage 5: Commit LSN
          - name: commit
            worker: commitWorker
            enabled: true
            concurrency: 1
            order: 50
    

    25.6 Classe de configuration Spring

    @Configuration
    @ConfigurationProperties(prefix = "cdc.pipeline")
    public class CdcPipelineConfig extends PipelineYamlConfig {
        // Hérite de toutes les propriétés
    }
    

    25.7 Construction du pipeline

    @Component
    public class CdcPipelineFactory {
    
        @Autowired
        private CdcPipelineConfig pipelineConfig;
    
        @Autowired
        private Map<String, PipelineStageWorker<?, ?>> availableWorkers;
    
        @PostConstruct
        public void init() {
            // Log des workers disponibles
            log.info("Available workers: {}", availableWorkers.keySet());
    
            // Construire le pipeline dynamiquement
            cdcPipeline = DynamicPipelineBuilder.create()
                .config(pipelineConfig)
                .workers(availableWorkers)
                .build();
    
            log.info("Pipeline created with {} stages",
                pipelineConfig.countEnabledStages());
        }
    }
    

    25.8 Avantages

    Aspect Avant (hardcodé) Après (YAML)
    Modification Recompilation Redémarrage
    Activation stage Code Java Variable d’env
    Ajout stage Modifier factory Ajouter worker + config
    Configuration Dans le code Centralisée
    Environnements Branches différentes Même code, config différente

    25.9 Activation/Désactivation à chaud

    Via variables d’environnement dans .env :

    # Désactiver BeanShell
    CDC_ENRICH_BSH_ENABLED=false
    
    # Activer Janino
    CDC_ENRICH_JANINO_ENABLED=true
    
    # Redémarrer le service
    sudo systemctl restart cdc-reflet-kafka-v02
    

    25.10 Ajout d’un nouveau type d’enrichissement

    1. Créer le worker :
    @Component("monNouveauWorker")
    public class MonNouveauEnrichmentWorker
        implements PipelineStageWorker<PipelineMessage, PipelineMessage> {
        // ...
    }
    
    1. Ajouter dans application.yml :
    stages:
      - name: enrich-nouveau
        worker: monNouveauWorker
        enabled: ${CDC_ENRICH_NOUVEAU_ENABLED:false}
        order: 34
    
    1. Redémarrer – le pipeline inclura automatiquement le nouveau stage.

    26. Références V2

  • Socle V004 – Guides Pratiques

    Socle V004 – Guides Pratiques

    17 – How-To Guides

    Version : 4.0.0 Date : 2025-12-09

    1. Comment créer un nouveau Worker

    1.1 Worker simple

    package com.myapp.worker;
    
    import eu.lmvi.socle.worker.Worker;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MyWorker implements Worker {
    
        private static final Logger log = LoggerFactory.getLogger(MyWorker.class);
        private volatile boolean running = false;
    
        @Override
        public String getName() {
            return "my-worker";
        }
    
        @Override
        public void initialize() {
            log.info("[{}] Initializing", getName());
        }
    
        @Override
        public void start() {
            log.info("[{}] Starting", getName());
            running = true;
        }
    
        @Override
        public void doWork() {
            if (!running) return;
            // Votre logique ici
        }
    
        @Override
        public void stop() {
            log.info("[{}] Stopping", getName());
            running = false;
        }
    
        @Override
        public boolean isHealthy() {
            return running;
        }
    
        @Override
        public Map<String, Object> getStats() {
            return Map.of("running", running);
        }
    }
    

    1.2 Worker avec priorité

    @Override
    public int getStartPriority() {
        return 10;  // Démarre en premier
    }
    
    @Override
    public int getStopPriority() {
        return 90;  // S'arrête en dernier
    }
    

    1.3 Worker schedulé

    @Override
    public String getSchedule() {
        return "0 0 6 * * ?";  // Tous les jours à 6h
    }
    
    @Override
    public boolean isScheduled() {
        return true;
    }
    

    2. Comment utiliser KvBus

    2.1 Opérations basiques

    @Service
    public class MyService {
    
        @Autowired
        private KvBus kvBus;
    
        public void example() {
            // Stocker
            kvBus.put("key", "value");
            kvBus.put("key-with-ttl", "value", Duration.ofHours(1));
    
            // Récupérer
            Optional<String> value = kvBus.get("key");
    
            // Supprimer
            kvBus.delete("key");
    
            // Compteur atomique
            long count = kvBus.increment("counter");
        }
    }
    

    2.2 JSON

    // Stocker un objet
    kvBus.putJson("order:123", order);
    
    // Récupérer un objet
    Optional<Order> order = kvBus.getJson("order:123", Order.class);
    

    2.3 Lock distribué

    public boolean tryLock(String resource) {
        return kvBus.putIfAbsent("lock:" + resource, "locked", Duration.ofMinutes(5));
    }
    
    public void unlock(String resource) {
        kvBus.delete("lock:" + resource);
    }
    

    3. Comment utiliser SharedDataRegistry

    3.1 Key-Value

    @Service
    public class MyService {
    
        @Autowired
        private SharedDataRegistry registry;
    
        public void example() {
            // Stocker avec niveau de santé
            registry.put("database.connected", true, HealthLevel.CRITICAL);
    
            // Récupérer
            boolean connected = registry.getBoolean("database.connected").orElse(false);
        }
    }
    

    3.2 Compteurs

    // Créer une séquence
    registry.createSequence("orders.processed", 0, HealthLevel.NORMAL);
    
    // Incrémenter
    long count = registry.incrementSequence("orders.processed");
    
    // Lire
    long total = registry.getSequence("orders.processed");
    

    4. Comment utiliser TechDB (V4)

    4.1 Offsets

    @Service
    public class MyService {
    
        @Autowired
        private TechDbManager techDb;
    
        public void example() {
            // Sauvegarder un offset
            techDb.saveOffset("kafka", "my-topic-0", 123456L, null);
    
            // Récupérer un offset
            OptionalLong offset = techDb.getOffset("kafka", "my-topic-0");
        }
    }
    

    4.2 État des workers

    // Sauvegarder l'état
    techDb.saveWorkerState("my-worker", "RUNNING", Map.of("progress", 50));
    
    // Récupérer l'état
    Optional<WorkerState> state = techDb.getWorkerState("my-worker");
    

    4.3 Événements techniques

    // Logger un événement
    techDb.logEvent("ERROR", Map.of(
        "message", "Connection failed",
        "target", "database"
    ));
    
    // Récupérer les événements
    List<TechEvent> events = techDb.getEvents("ERROR", Instant.now().minus(1, ChronoUnit.HOURS), 100);
    

    5. Comment implémenter un Pipeline

    5.1 Pipeline simple

    Pipeline<Order, ProcessedOrder> pipeline = PipelineBuilder
        .<Order, ProcessedOrder>create("order-processing")
        .addStep("validate", this::validateOrder)
        .addStep("enrich", this::enrichOrder)
        .addStep("process", this::processOrder)
        .build();
    
    PipelineResult<ProcessedOrder> result = pipelineEngine.execute(pipeline, order);
    

    5.2 Étape personnalisée

    public class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
    
        @Override
        public String getName() {
            return "validation";
        }
    
        @Override
        public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
            // Validation...
            return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
        }
    
        @Override
        public boolean isRetryable() {
            return false;
        }
    }
    

    6. Comment configurer le logging (V4)

    6.1 Log4j2 basique

    <!-- src/main/resources/log4j2.xml -->
    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="WARN">
        <Appenders>
            <Console name="Console" target="SYSTEM_OUT">
                <PatternLayout pattern="%d{ISO8601} %-5level [%thread] %logger{36} - %msg%n"/>
            </Console>
        </Appenders>
        <Loggers>
            <Root level="INFO">
                <AppenderRef ref="Console"/>
            </Root>
        </Loggers>
    </Configuration>
    

    6.2 Avec LogForwarder

    # application.yml
    socle:
      logging:
        forwarder:
          enabled: true
          transport-mode: http
          log-hub-url: https://logs.mycompany.com/api/ingest
    

    7. Comment utiliser l’authentification JWT (V4)

    7.1 Configuration

    socle:
      auth:
        enabled: true
        server-url: https://auth.mycompany.com
        api-key: ${API_KEY}
    

    7.2 Utilisation

    @Service
    public class SecuredService {
    
        @Autowired(required = false)
        private SocleAuthClient authClient;
    
        public void callSecuredApi() {
            if (authClient == null) {
                throw new IllegalStateException("Auth not configured");
            }
    
            String token = authClient.getValidAccessToken();
    
            // Utiliser le token dans les requêtes HTTP
            Request request = new Request.Builder()
                .url("https://api.mycompany.com/data")
                .header("Authorization", "Bearer " + token)
                .build();
        }
    }
    

    8. Comment ajouter des métriques personnalisées

    8.1 Counter

    @Component
    public class MyMetrics {
    
        private final Counter ordersProcessed;
    
        public MyMetrics(MeterRegistry registry) {
            this.ordersProcessed = Counter.builder("my_orders_processed_total")
                .description("Total orders processed")
                .register(registry);
        }
    
        public void orderProcessed() {
            ordersProcessed.increment();
        }
    }
    

    8.2 Timer

    private final Timer processingTime;
    
    public MyMetrics(MeterRegistry registry) {
        this.processingTime = Timer.builder("my_processing_duration_seconds")
            .description("Processing duration")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(registry);
    }
    
    public void process() {
        Timer.Sample sample = Timer.start();
        try {
            doProcess();
        } finally {
            sample.stop(processingTime);
        }
    }
    

    9. Comment gérer la résilience

    9.1 Retry

    @Autowired
    private RetryTemplate retryTemplate;
    
    public Data fetchData() {
        return retryTemplate.execute(() -> httpClient.get("/api/data"));
    }
    

    9.2 Circuit Breaker

    @Autowired
    private CircuitBreakerRegistry cbRegistry;
    
    public Data fetchData() {
        CircuitBreaker cb = cbRegistry.getOrCreate("external-api");
    
        return cb.executeWithFallback(
            () -> httpClient.get("/api/data"),
            () -> getCachedData()
        );
    }
    

    10. Comment déployer sur Kubernetes

    10.1 Build de l’image

    # Build
    mvn clean package -DskipTests
    docker build -t my-app:1.0.0 .
    
    # Push
    docker push my-registry/my-app:1.0.0
    

    10.2 Déploiement

    # Appliquer les manifests
    kubectl apply -f k8s/
    
    # Ou avec Helm
    helm install my-app ./chart -n my-namespace
    

    10.3 Vérification

    # Logs
    kubectl logs -f deployment/my-app
    
    # Port forward
    kubectl port-forward svc/my-app 8080:80
    
    # Health check
    curl http://localhost:8080/admin/health
    

    11. Comment migrer de V3 à V4

    11.1 Dépendances Maven

    <!-- Remplacer Logback par Log4j2 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    
    <!-- Ajouter H2 -->
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
    </dependency>
    

    11.2 Configuration

    # Ajouter à application.yml
    socle:
      techdb:
        enabled: true
      logging:
        forwarder:
          enabled: false
      auth:
        enabled: false
      worker-registry:
        enabled: false
    
    logging:
      config: classpath:log4j2.xml
    

    11.3 Fichiers Log4j2

    Créer src/main/resources/log4j2.xml et log4j2.component.properties.

    Voir 25-MIGRATION-V3-V4 pour le guide complet.

    12. Comment debugger

    12.1 H2 Console

    socle:
      techdb:
        console:
          enabled: true
          path: /h2-console
    

    Accéder à http://localhost:8080/h2-console

    12.2 Endpoints Admin

    # État de santé
    curl http://localhost:8080/admin/health
    
    # Workers
    curl http://localhost:8080/admin/workers
    
    # Registry
    curl http://localhost:8080/admin/registry
    
    # Métriques
    curl http://localhost:8080/actuator/prometheus
    

    12.3 Logs

    // Activer le debug pour le Socle
    logging.level.eu.lmvi.socle=DEBUG
    

    13. Références

  • Socle V004 – Dépannage

    Socle V004 – Dépannage

    18 – Troubleshooting

    Version : 4.0.0 Date : 2025-12-09

    1. Problèmes de démarrage

    1.1 Application ne démarre pas

    Symptôme : L’application ne démarre pas ou crashe immédiatement.

    Causes possibles :

    1. Port déjà utilisé

      Error: Address already in use: bind
      

      Solution :

      # Trouver le processus
      lsof -i :8080
      # Changer le port
      export HTTP_PORT=8081
      
    2. Configuration manquante

      Failed to bind properties under 'socle.xxx'
      

      Solution : Vérifier les variables d’environnement et application.yml

    3. Erreur de logging

      ERROR StatusLogger Log4j2 could not find a logging implementation
      

      Solution : Vérifier que log4j2.xml existe dans src/main/resources/

    1.2 Workers ne démarrent pas

    Symptôme : Les workers sont enregistrés mais restent en état REGISTERED.

    Solutions :

    1. Vérifier les logs d’initialisation
    2. Vérifier les dépendances (base de données, Redis, etc.)
    3. Vérifier les priorités de démarrage
    curl http://localhost:8080/admin/workers
    

    2. Problèmes de connectivité

    2.1 Redis non accessible

    Symptôme :

    Cannot get Jedis connection
    

    Solutions :

    1. Vérifier l’hôte et le port Redis
      redis-cli -h localhost -p 6379 ping
      
    2. Vérifier le mot de passe
    3. Vérifier les firewalls

    2.2 Database non accessible

    Symptôme :

    Database may be already in use: "locked by another process"
    

    Solutions :

    1. Arrêter les autres instances
    2. Utiliser AUTO_SERVER=TRUE dans l’URL H2
    3. Supprimer les fichiers de lock
    rm ./data/socle-techdb.lock.db
    

    3. Problèmes de logging (V4)

    3.1 Logs non visibles

    Symptôme : Aucun log n’apparaît.

    Solutions :

    1. Vérifier que log4j2.xml existe
    2. Vérifier le niveau de log
    3. Vérifier logging.config dans application.yml
    logging:
      config: classpath:log4j2.xml
    

    3.2 Conflit Logback/Log4j2

    Symptôme :

    SLF4J: Class path contains multiple SLF4J bindings
    

    Solution : Exclure Logback de toutes les dépendances

    mvn dependency:tree | grep logback
    
    <exclusion>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </exclusion>
    

    3.3 AsyncLoggers non actifs

    Symptôme : Performance de logging dégradée.

    Solution : Vérifier log4j2.component.properties

    Log4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector
    

    3.4 LogForwarder queue pleine

    Symptôme :

    WARN - Log queue full, storing to fallback
    

    Solutions :

    1. Augmenter queue-capacity
    2. Vérifier la connectivité réseau vers le LogHub
    3. Réduire le volume de logs
    # Vérifier les logs en fallback
    curl http://localhost:8080/admin/logforwarder/status
    

    4. Problèmes de performance

    4.1 Haute consommation CPU

    Solutions :

    1. Vérifier les workers en boucle infinie
    2. Vérifier les logs en DEBUG
    3. Profiler avec JFR
    jcmd <pid> JFR.start duration=60s filename=recording.jfr
    

    4.2 Haute consommation mémoire

    Solutions :

    1. Analyser le heap dump
      jmap -dump:format=b,file=heap.hprof <pid>
      
    2. Vérifier les fuites dans KvBus/SharedDataRegistry
    3. Ajuster la configuration JVM

    4.3 Latence élevée

    Solutions :

    1. Vérifier les circuit breakers
      curl http://localhost:8080/admin/resilience/circuits
      
    2. Vérifier les connexions réseau
    3. Vérifier les métriques
      curl http://localhost:8080/actuator/prometheus | grep latency
      

    5. Problèmes de résilience

    5.1 Circuit Breaker bloqué en OPEN

    Symptôme : Un circuit reste ouvert malgré la récupération du service.

    Solutions :

    1. Reset manuel
      curl -X POST http://localhost:8080/admin/resilience/circuits/my-circuit/reset
      
    2. Vérifier le timeout configuré
    3. Vérifier que le service cible répond

    5.2 Retry infini

    Symptôme : L’application retry sans fin.

    Solutions :

    1. Vérifier max-attempts configuré
    2. Vérifier les exceptions retryables
    3. Ajouter un circuit breaker

    6. Problèmes H2 TechDB (V4)

    6.1 Base corrompue

    Symptôme :

    File corrupted
    

    Solution :

    rm -rf ./data/socle-techdb.*
    # Redémarrer l'application
    

    6.2 H2 Console inaccessible

    Solutions :

    1. Vérifier socle.techdb.console.enabled=true
    2. Vérifier l’URL : http://localhost:8080/h2-console
    3. Utiliser l’URL JDBC correcte : jdbc:h2:file:./data/socle-techdb

    6.3 Offsets perdus

    Solutions :

    1. Vérifier que TechDB est enabled
    2. Vérifier les logs de saveOffset
    3. Requêter directement H2
      SELECT * FROM socle_offsets;
      

    7. Problèmes d’authentification (V4)

    7.1 Login échoue

    Symptôme :

    AuthenticationException: Login failed: 401
    

    Solutions :

    1. Vérifier API_KEY
    2. Vérifier SOURCE_NAME
    3. Vérifier AUTH_SERVER_URL

    7.2 Token expiré

    Symptôme :

    Token expired
    

    Solutions :

    1. Vérifier l’horloge système (NTP)
    2. Augmenter access-token-buffer-seconds

    7.3 Admin API 401

    Solutions :

    1. Vérifier si l’auth est activée
      socle.admin.auth.enabled: true
      
    2. Utiliser les credentials corrects
      curl -u admin:password http://localhost:8080/admin/workers
      

    8. Problèmes Kubernetes

    8.1 Pod en CrashLoopBackOff

    Solutions :

    1. Vérifier les logs
      kubectl logs <pod-name> --previous
      
    2. Vérifier les ressources
    3. Vérifier les probes

    8.2 Probes échouent

    Solutions :

    1. Augmenter initialDelaySeconds
    2. Vérifier que l’endpoint /admin/health répond
    3. Vérifier le port

    8.3 OOMKilled

    Solutions :

    1. Augmenter les limites mémoire
    2. Ajuster les options JVM
      -XX:MaxRAMPercentage=75.0
      

    9. Commandes de diagnostic

    9.1 API Admin

    # Santé globale
    curl http://localhost:8080/admin/health
    
    # État des workers
    curl http://localhost:8080/admin/workers
    
    # Registry
    curl http://localhost:8080/admin/registry
    
    # Circuits breakers
    curl http://localhost:8080/admin/resilience/circuits
    
    # Configuration
    curl http://localhost:8080/admin/config
    
    # Métriques
    curl http://localhost:8080/actuator/prometheus
    

    9.2 JVM

    # Thread dump
    jstack <pid>
    
    # Heap info
    jmap -heap <pid>
    
    # GC stats
    jstat -gcutil <pid> 1000
    
    # JFR recording
    jcmd <pid> JFR.start duration=60s filename=recording.jfr
    

    9.3 Réseau

    # Test Redis
    redis-cli -h localhost ping
    
    # Test HTTP
    curl -v http://localhost:8080/admin/health
    
    # DNS
    nslookup myservice.namespace.svc.cluster.local
    

    10. Logs utiles à activer

    # application.yml ou variables d'environnement
    
    logging:
      level:
        eu.lmvi.socle: DEBUG
        eu.lmvi.socle.mop: DEBUG
        eu.lmvi.socle.supervisor: DEBUG
        eu.lmvi.socle.techdb: DEBUG
        eu.lmvi.socle.resilience: DEBUG
        org.springframework.web: DEBUG
        io.lettuce: DEBUG  # Redis
    

    11. Checklist de diagnostic

    □ L'application démarre-t-elle ?
      □ Logs de démarrage présents ?
      □ Port disponible ?
      □ Configuration valide ?
    
    □ Les workers sont-ils healthy ?
      □ GET /admin/workers
      □ Heartbeats reçus ?
      □ Erreurs dans les logs ?
    
    □ Les connexions externes fonctionnent-elles ?
      □ Redis accessible ?
      □ Base de données accessible ?
      □ APIs externes accessibles ?
    
    □ Les métriques sont-elles normales ?
      □ CPU < 80% ?
      □ Mémoire < 80% ?
      □ Latence acceptable ?
      □ Taux d'erreur bas ?
    
    □ Les logs sont-ils corrects ?
      □ Log4j2 configuré ?
      □ LogForwarder fonctionne ?
      □ Pas de logs en fallback ?
    

    12. Références

  • Socle V004 – Données Partagées

    Socle V004 – Données Partagées

    07 – SharedDataRegistry

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    SharedDataRegistry est un registre centralisé pour partager des données entre Workers au sein d’une même instance. Il fournit des opérations atomiques et un système de niveaux de santé.

    Différence avec KvBus

    Aspect SharedDataRegistry KvBus
    Scope Intra-instance Optionnel inter-instances (Redis)
    Performance Ultra rapide (mémoire) Variable (réseau si Redis)
    Types Fortement typés Strings/JSON
    Health levels Oui Non
    Callbacks Oui Non

    2. Interface SharedDataRegistry

    package eu.lmvi.socle.shared;
    
    public interface SharedDataRegistry {
    
        // === Key-Value basique ===
    
        void put(String key, Object value);
        void put(String key, Object value, HealthLevel level);
        Optional<Object> get(String key);
        <T> Optional<T> get(String key, Class<T> type);
        void delete(String key);
        boolean exists(String key);
    
        // === Typed getters ===
    
        Optional<String> getString(String key);
        Optional<Integer> getInt(String key);
        Optional<Long> getLong(String key);
        Optional<Double> getDouble(String key);
        Optional<Boolean> getBoolean(String key);
    
        // === Sequences (compteurs atomiques) ===
    
        void createSequence(String key, long initialValue, HealthLevel level);
        long incrementSequence(String key);
        long incrementSequence(String key, long delta);
        long getSequence(String key);
        void setSequence(String key, long value);
    
        // === Lists ===
    
        <T> void addToList(String key, T item);
        <T> List<T> getList(String key, Class<T> type);
        void clearList(String key);
    
        // === Maps ===
    
        <V> void putInMap(String key, String mapKey, V value);
        <V> Optional<V> getFromMap(String key, String mapKey, Class<V> type);
        <V> Map<String, V> getMap(String key, Class<V> type);
        void removeFromMap(String key, String mapKey);
    
        // === Health ===
    
        HealthLevel getHealthLevel(String key);
        Map<String, HealthLevel> getAllHealthLevels();
        List<String> getUnhealthyKeys();
    
        // === Callbacks ===
    
        void registerCallback(String key, Consumer<Object> callback);
        void unregisterCallback(String key);
    
        // === Introspection ===
    
        Set<String> keys();
        Set<String> keys(String pattern);
        Map<String, Object> getAll();
        int size();
        void clear();
    }
    

    3. Health Levels

    package eu.lmvi.socle.shared;
    
    public enum HealthLevel {
        /**
         * Informatif - pas d'impact sur la santé
         */
        INFO,
    
        /**
         * Normal - contribue à la santé normale
         */
        NORMAL,
    
        /**
         * Important - dégradation si problème
         */
        IMPORTANT,
    
        /**
         * Critique - unhealthy si problème
         */
        CRITICAL
    }
    

    Utilisation dans le Supervisor

    Le Supervisor consulte les HealthLevel pour déterminer l’état de santé global :

    • CRITICAL absent ou invalide → Instance UNHEALTHY
    • IMPORTANT absent ou invalide → Instance DEGRADED
    • NORMAL/INFO → Pas d’impact

    4. Implémentation

    package eu.lmvi.socle.shared;
    
    @Component
    public class InMemorySharedDataRegistry implements SharedDataRegistry {
    
        private final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();
        private final ConcurrentHashMap<String, Consumer<Object>> callbacks = new ConcurrentHashMap<>();
    
        @Override
        public void put(String key, Object value) {
            put(key, value, HealthLevel.NORMAL);
        }
    
        @Override
        public void put(String key, Object value, HealthLevel level) {
            Entry previous = store.put(key, new Entry(value, level));
            notifyCallback(key, value);
        }
    
        @Override
        public Optional<Object> get(String key) {
            Entry entry = store.get(key);
            return entry != null ? Optional.of(entry.value) : Optional.empty();
        }
    
        @Override
        public <T> Optional<T> get(String key, Class<T> type) {
            return get(key).filter(type::isInstance).map(type::cast);
        }
    
        @Override
        public Optional<String> getString(String key) {
            return get(key).map(Object::toString);
        }
    
        @Override
        public Optional<Integer> getInt(String key) {
            return get(key, Number.class).map(Number::intValue);
        }
    
        @Override
        public Optional<Long> getLong(String key) {
            return get(key, Number.class).map(Number::longValue);
        }
    
        @Override
        public void createSequence(String key, long initialValue, HealthLevel level) {
            store.put(key, new Entry(new AtomicLong(initialValue), level));
        }
    
        @Override
        public long incrementSequence(String key) {
            return incrementSequence(key, 1);
        }
    
        @Override
        public long incrementSequence(String key, long delta) {
            Entry entry = store.get(key);
            if (entry == null || !(entry.value instanceof AtomicLong)) {
                throw new IllegalStateException("Sequence not found: " + key);
            }
            long newValue = ((AtomicLong) entry.value).addAndGet(delta);
            notifyCallback(key, newValue);
            return newValue;
        }
    
        @Override
        public long getSequence(String key) {
            Entry entry = store.get(key);
            if (entry == null || !(entry.value instanceof AtomicLong)) {
                throw new IllegalStateException("Sequence not found: " + key);
            }
            return ((AtomicLong) entry.value).get();
        }
    
        @Override
        public HealthLevel getHealthLevel(String key) {
            Entry entry = store.get(key);
            return entry != null ? entry.level : null;
        }
    
        @Override
        public List<String> getUnhealthyKeys() {
            return store.entrySet().stream()
                .filter(e -> e.getValue().level == HealthLevel.CRITICAL)
                .filter(e -> !isValueHealthy(e.getValue().value))
                .map(Map.Entry::getKey)
                .toList();
        }
    
        @Override
        public void registerCallback(String key, Consumer<Object> callback) {
            callbacks.put(key, callback);
        }
    
        private void notifyCallback(String key, Object value) {
            Consumer<Object> callback = callbacks.get(key);
            if (callback != null) {
                try {
                    callback.accept(value);
                } catch (Exception e) {
                    // Log but don't propagate
                }
            }
        }
    
        private boolean isValueHealthy(Object value) {
            if (value == null) return false;
            if (value instanceof Boolean b) return b;
            if (value instanceof Number n) return n.doubleValue() >= 0;
            return true;
        }
    
        private record Entry(Object value, HealthLevel level) {}
    }
    

    5. Utilisation

    5.1 Injection

    @Service
    public class MonService {
    
        @Autowired
        private SharedDataRegistry registry;
    
        public void process() {
            // ...
        }
    }
    

    5.2 Key-Value simple

    // Stocker
    registry.put("config.maxRetries", 3);
    registry.put("status.lastSync", Instant.now().toString());
    
    // Récupérer
    int maxRetries = registry.getInt("config.maxRetries").orElse(5);
    String lastSync = registry.getString("status.lastSync").orElse("never");
    

    5.3 Avec Health Level

    // Donnée critique - instance unhealthy si absente
    registry.put("database.connected", true, HealthLevel.CRITICAL);
    
    // Donnée importante - instance degraded si absente
    registry.put("cache.available", true, HealthLevel.IMPORTANT);
    
    // Donnée normale
    registry.put("stats.requestsTotal", 0, HealthLevel.NORMAL);
    
    // Donnée informative
    registry.put("info.startTime", Instant.now(), HealthLevel.INFO);
    

    5.4 Sequences (Compteurs)

    // Créer une séquence
    registry.createSequence("orders.processed", 0, HealthLevel.NORMAL);
    
    // Incrémenter
    long count = registry.incrementSequence("orders.processed");
    log.info("Processed order #{}", count);
    
    // Incrémenter avec delta
    long bytes = registry.incrementSequence("bytes.transferred", 1024);
    
    // Lire
    long total = registry.getSequence("orders.processed");
    

    5.5 Listes

    // Ajouter à une liste
    registry.addToList("errors.recent", new ErrorRecord("timeout", Instant.now()));
    registry.addToList("errors.recent", new ErrorRecord("connection", Instant.now()));
    
    // Lire la liste
    List<ErrorRecord> errors = registry.getList("errors.recent", ErrorRecord.class);
    
    // Vider
    registry.clearList("errors.recent");
    

    5.6 Maps

    // Stocker dans une map
    registry.putInMap("workers.status", "worker-1", "RUNNING");
    registry.putInMap("workers.status", "worker-2", "STOPPED");
    
    // Lire une entrée
    Optional<String> status = registry.getFromMap("workers.status", "worker-1", String.class);
    
    // Lire toute la map
    Map<String, String> allStatus = registry.getMap("workers.status", String.class);
    

    5.7 Callbacks

    // Enregistrer un callback
    registry.registerCallback("config.maxRetries", newValue -> {
        log.info("maxRetries changed to: {}", newValue);
        reconfigure((Integer) newValue);
    });
    
    // La modification déclenche le callback
    registry.put("config.maxRetries", 5);  // Callback appelé
    

    6. Patterns courants

    6.1 État de connexion

    @Component
    public class DatabaseWorker implements Worker {
    
        @Autowired
        private SharedDataRegistry registry;
    
        @Override
        public void initialize() {
            registry.put("database.connected", false, HealthLevel.CRITICAL);
        }
    
        @Override
        public void start() {
            try {
                connect();
                registry.put("database.connected", true, HealthLevel.CRITICAL);
            } catch (Exception e) {
                registry.put("database.connected", false, HealthLevel.CRITICAL);
                throw e;
            }
        }
    
        @Override
        public void stop() {
            disconnect();
            registry.put("database.connected", false, HealthLevel.CRITICAL);
        }
    }
    

    6.2 Métriques temps réel

    @Component
    public class MetricsCollector {
    
        @Autowired
        private SharedDataRegistry registry;
    
        @PostConstruct
        public void init() {
            registry.createSequence("metrics.requests.total", 0, HealthLevel.INFO);
            registry.createSequence("metrics.requests.errors", 0, HealthLevel.NORMAL);
            registry.createSequence("metrics.bytes.in", 0, HealthLevel.INFO);
            registry.createSequence("metrics.bytes.out", 0, HealthLevel.INFO);
        }
    
        public void recordRequest(long bytesIn, long bytesOut, boolean success) {
            registry.incrementSequence("metrics.requests.total");
            registry.incrementSequence("metrics.bytes.in", bytesIn);
            registry.incrementSequence("metrics.bytes.out", bytesOut);
    
            if (!success) {
                registry.incrementSequence("metrics.requests.errors");
            }
        }
    
        public Map<String, Object> getMetrics() {
            return Map.of(
                "requests.total", registry.getSequence("metrics.requests.total"),
                "requests.errors", registry.getSequence("metrics.requests.errors"),
                "bytes.in", registry.getSequence("metrics.bytes.in"),
                "bytes.out", registry.getSequence("metrics.bytes.out")
            );
        }
    }
    

    6.3 Circuit Breaker state

    @Component
    public class CircuitBreakerStateManager {
    
        @Autowired
        private SharedDataRegistry registry;
    
        public void updateState(String circuitName, CircuitState state) {
            String key = "circuit." + circuitName + ".state";
            HealthLevel level = state == CircuitState.OPEN
                ? HealthLevel.IMPORTANT
                : HealthLevel.NORMAL;
            registry.put(key, state.name(), level);
        }
    
        public CircuitState getState(String circuitName) {
            return registry.getString("circuit." + circuitName + ".state")
                .map(CircuitState::valueOf)
                .orElse(CircuitState.CLOSED);
        }
    }
    

    6.4 Progress tracking

    @Component
    public class BatchProcessor {
    
        @Autowired
        private SharedDataRegistry registry;
    
        public void processBatch(String batchId, List<Item> items) {
            registry.put("batch." + batchId + ".total", items.size());
            registry.createSequence("batch." + batchId + ".processed", 0, HealthLevel.NORMAL);
    
            for (Item item : items) {
                processItem(item);
                registry.incrementSequence("batch." + batchId + ".processed");
            }
    
            registry.put("batch." + batchId + ".status", "COMPLETED");
        }
    
        public double getProgress(String batchId) {
            int total = registry.getInt("batch." + batchId + ".total").orElse(0);
            if (total == 0) return 0;
    
            long processed = registry.getSequence("batch." + batchId + ".processed");
            return (double) processed / total * 100;
        }
    }
    

    7. Intégration avec Supervisor

    @Component
    public class HealthAggregator {
    
        @Autowired
        private SharedDataRegistry registry;
    
        @Autowired
        private List<Worker> workers;
    
        public HealthStatus aggregateHealth() {
            // Check workers
            boolean allWorkersHealthy = workers.stream().allMatch(Worker::isHealthy);
    
            // Check critical registry entries
            List<String> unhealthyKeys = registry.getUnhealthyKeys();
            boolean hasCriticalFailure = !unhealthyKeys.isEmpty();
    
            if (!allWorkersHealthy || hasCriticalFailure) {
                return HealthStatus.UNHEALTHY;
            }
    
            // Check important entries
            Map<String, HealthLevel> levels = registry.getAllHealthLevels();
            boolean hasImportantFailure = levels.entrySet().stream()
                .filter(e -> e.getValue() == HealthLevel.IMPORTANT)
                .anyMatch(e -> !isHealthy(registry.get(e.getKey())));
    
            if (hasImportantFailure) {
                return HealthStatus.DEGRADED;
            }
    
            return HealthStatus.HEALTHY;
        }
    }
    

    8. Exposition API

    @RestController
    @RequestMapping("/admin/registry")
    public class SharedDataController {
    
        @Autowired
        private SharedDataRegistry registry;
    
        @GetMapping
        public Map<String, Object> getAll() {
            return registry.getAll();
        }
    
        @GetMapping("/{key}")
        public ResponseEntity<?> get(@PathVariable String key) {
            return registry.get(key)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
        }
    
        @GetMapping("/health")
        public Map<String, HealthLevel> getHealthLevels() {
            return registry.getAllHealthLevels();
        }
    
        @GetMapping("/unhealthy")
        public List<String> getUnhealthyKeys() {
            return registry.getUnhealthyKeys();
        }
    }
    

    9. Bonnes pratiques

    Conventions de nommage

    <category>.<subcategory>.<name>
    
    Exemples:
    - database.connected
    - worker.kafka.status
    - metrics.requests.total
    - batch.order-123.progress
    - circuit.external-api.state
    

    DO

    • Utiliser des noms de clés cohérents et hiérarchiques
    • Définir le HealthLevel approprié pour chaque donnée
    • Utiliser les sequences pour les compteurs (thread-safe)
    • Nettoyer les données obsolètes

    DON’T

    • Ne pas stocker de données volumineuses (logs, payloads)
    • Ne pas utiliser pour le stockage persistant (utiliser TechDB)
    • Ne pas créer de nouvelles clés dynamiquement sans contrôle
    • Ne pas oublier que c’est per-instance (pas de sync multi-instances)

    10. Références