Étiquette : Coding

  • Socle V004 – Guides Pratiques

    Socle V004 – Guides Pratiques

    17 – How-To Guides

    Version : 4.0.0 Date : 2025-12-09

    1. Comment créer un nouveau Worker

    1.1 Worker simple

    package com.myapp.worker;
    
    import eu.lmvi.socle.worker.Worker;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MyWorker implements Worker {
    
        private static final Logger log = LoggerFactory.getLogger(MyWorker.class);
        private volatile boolean running = false;
    
        @Override
        public String getName() {
            return "my-worker";
        }
    
        @Override
        public void initialize() {
            log.info("[{}] Initializing", getName());
        }
    
        @Override
        public void start() {
            log.info("[{}] Starting", getName());
            running = true;
        }
    
        @Override
        public void doWork() {
            if (!running) return;
            // Votre logique ici
        }
    
        @Override
        public void stop() {
            log.info("[{}] Stopping", getName());
            running = false;
        }
    
        @Override
        public boolean isHealthy() {
            return running;
        }
    
        @Override
        public Map<String, Object> getStats() {
            return Map.of("running", running);
        }
    }
    

    1.2 Worker avec priorité

    @Override
    public int getStartPriority() {
        return 10;  // Démarre en premier
    }
    
    @Override
    public int getStopPriority() {
        return 90;  // S'arrête en dernier
    }
    

    1.3 Worker schedulé

    @Override
    public String getSchedule() {
        return "0 0 6 * * ?";  // Tous les jours à 6h
    }
    
    @Override
    public boolean isScheduled() {
        return true;
    }
    

    2. Comment utiliser KvBus

    2.1 Opérations basiques

    @Service
    public class MyService {
    
        @Autowired
        private KvBus kvBus;
    
        public void example() {
            // Stocker
            kvBus.put("key", "value");
            kvBus.put("key-with-ttl", "value", Duration.ofHours(1));
    
            // Récupérer
            Optional<String> value = kvBus.get("key");
    
            // Supprimer
            kvBus.delete("key");
    
            // Compteur atomique
            long count = kvBus.increment("counter");
        }
    }
    

    2.2 JSON

    // Stocker un objet
    kvBus.putJson("order:123", order);
    
    // Récupérer un objet
    Optional<Order> order = kvBus.getJson("order:123", Order.class);
    

    2.3 Lock distribué

    public boolean tryLock(String resource) {
        return kvBus.putIfAbsent("lock:" + resource, "locked", Duration.ofMinutes(5));
    }
    
    public void unlock(String resource) {
        kvBus.delete("lock:" + resource);
    }
    

    3. Comment utiliser SharedDataRegistry

    3.1 Key-Value

    @Service
    public class MyService {
    
        @Autowired
        private SharedDataRegistry registry;
    
        public void example() {
            // Stocker avec niveau de santé
            registry.put("database.connected", true, HealthLevel.CRITICAL);
    
            // Récupérer
            boolean connected = registry.getBoolean("database.connected").orElse(false);
        }
    }
    

    3.2 Compteurs

    // Créer une séquence
    registry.createSequence("orders.processed", 0, HealthLevel.NORMAL);
    
    // Incrémenter
    long count = registry.incrementSequence("orders.processed");
    
    // Lire
    long total = registry.getSequence("orders.processed");
    

    4. Comment utiliser TechDB (V4)

    4.1 Offsets

    @Service
    public class MyService {
    
        @Autowired
        private TechDbManager techDb;
    
        public void example() {
            // Sauvegarder un offset
            techDb.saveOffset("kafka", "my-topic-0", 123456L, null);
    
            // Récupérer un offset
            OptionalLong offset = techDb.getOffset("kafka", "my-topic-0");
        }
    }
    

    4.2 État des workers

    // Sauvegarder l'état
    techDb.saveWorkerState("my-worker", "RUNNING", Map.of("progress", 50));
    
    // Récupérer l'état
    Optional<WorkerState> state = techDb.getWorkerState("my-worker");
    

    4.3 Événements techniques

    // Logger un événement
    techDb.logEvent("ERROR", Map.of(
        "message", "Connection failed",
        "target", "database"
    ));
    
    // Récupérer les événements
    List<TechEvent> events = techDb.getEvents("ERROR", Instant.now().minus(1, ChronoUnit.HOURS), 100);
    

    5. Comment implémenter un Pipeline

    5.1 Pipeline simple

    Pipeline<Order, ProcessedOrder> pipeline = PipelineBuilder
        .<Order, ProcessedOrder>create("order-processing")
        .addStep("validate", this::validateOrder)
        .addStep("enrich", this::enrichOrder)
        .addStep("process", this::processOrder)
        .build();
    
    PipelineResult<ProcessedOrder> result = pipelineEngine.execute(pipeline, order);
    

    5.2 Étape personnalisée

    public class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
    
        @Override
        public String getName() {
            return "validation";
        }
    
        @Override
        public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
            // Validation...
            return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
        }
    
        @Override
        public boolean isRetryable() {
            return false;
        }
    }
    

    6. Comment configurer le logging (V4)

    6.1 Log4j2 basique

    <!-- src/main/resources/log4j2.xml -->
    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="WARN">
        <Appenders>
            <Console name="Console" target="SYSTEM_OUT">
                <PatternLayout pattern="%d{ISO8601} %-5level [%thread] %logger{36} - %msg%n"/>
            </Console>
        </Appenders>
        <Loggers>
            <Root level="INFO">
                <AppenderRef ref="Console"/>
            </Root>
        </Loggers>
    </Configuration>
    

    6.2 Avec LogForwarder

    # application.yml
    socle:
      logging:
        forwarder:
          enabled: true
          transport-mode: http
          log-hub-url: https://logs.mycompany.com/api/ingest
    

    7. Comment utiliser l’authentification JWT (V4)

    7.1 Configuration

    socle:
      auth:
        enabled: true
        server-url: https://auth.mycompany.com
        api-key: ${API_KEY}
    

    7.2 Utilisation

    @Service
    public class SecuredService {
    
        @Autowired(required = false)
        private SocleAuthClient authClient;
    
        public void callSecuredApi() {
            if (authClient == null) {
                throw new IllegalStateException("Auth not configured");
            }
    
            String token = authClient.getValidAccessToken();
    
            // Utiliser le token dans les requêtes HTTP
            Request request = new Request.Builder()
                .url("https://api.mycompany.com/data")
                .header("Authorization", "Bearer " + token)
                .build();
        }
    }
    

    8. Comment ajouter des métriques personnalisées

    8.1 Counter

    @Component
    public class MyMetrics {
    
        private final Counter ordersProcessed;
    
        public MyMetrics(MeterRegistry registry) {
            this.ordersProcessed = Counter.builder("my_orders_processed_total")
                .description("Total orders processed")
                .register(registry);
        }
    
        public void orderProcessed() {
            ordersProcessed.increment();
        }
    }
    

    8.2 Timer

    private final Timer processingTime;
    
    public MyMetrics(MeterRegistry registry) {
        this.processingTime = Timer.builder("my_processing_duration_seconds")
            .description("Processing duration")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(registry);
    }
    
    public void process() {
        Timer.Sample sample = Timer.start();
        try {
            doProcess();
        } finally {
            sample.stop(processingTime);
        }
    }
    

    9. Comment gérer la résilience

    9.1 Retry

    @Autowired
    private RetryTemplate retryTemplate;
    
    public Data fetchData() {
        return retryTemplate.execute(() -> httpClient.get("/api/data"));
    }
    

    9.2 Circuit Breaker

    @Autowired
    private CircuitBreakerRegistry cbRegistry;
    
    public Data fetchData() {
        CircuitBreaker cb = cbRegistry.getOrCreate("external-api");
    
        return cb.executeWithFallback(
            () -> httpClient.get("/api/data"),
            () -> getCachedData()
        );
    }
    

    10. Comment déployer sur Kubernetes

    10.1 Build de l’image

    # Build
    mvn clean package -DskipTests
    docker build -t my-app:1.0.0 .
    
    # Push
    docker push my-registry/my-app:1.0.0
    

    10.2 Déploiement

    # Appliquer les manifests
    kubectl apply -f k8s/
    
    # Ou avec Helm
    helm install my-app ./chart -n my-namespace
    

    10.3 Vérification

    # Logs
    kubectl logs -f deployment/my-app
    
    # Port forward
    kubectl port-forward svc/my-app 8080:80
    
    # Health check
    curl http://localhost:8080/admin/health
    

    11. Comment migrer de V3 à V4

    11.1 Dépendances Maven

    <!-- Remplacer Logback par Log4j2 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    
    <!-- Ajouter H2 -->
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
    </dependency>
    

    11.2 Configuration

    # Ajouter à application.yml
    socle:
      techdb:
        enabled: true
      logging:
        forwarder:
          enabled: false
      auth:
        enabled: false
      worker-registry:
        enabled: false
    
    logging:
      config: classpath:log4j2.xml
    

    11.3 Fichiers Log4j2

    Créer src/main/resources/log4j2.xml et log4j2.component.properties.

    Voir 25-MIGRATION-V3-V4 pour le guide complet.

    12. Comment debugger

    12.1 H2 Console

    socle:
      techdb:
        console:
          enabled: true
          path: /h2-console
    

    Accéder à http://localhost:8080/h2-console

    12.2 Endpoints Admin

    # État de santé
    curl http://localhost:8080/admin/health
    
    # Workers
    curl http://localhost:8080/admin/workers
    
    # Registry
    curl http://localhost:8080/admin/registry
    
    # Métriques
    curl http://localhost:8080/actuator/prometheus
    

    12.3 Logs

    // Activer le debug pour le Socle
    logging.level.eu.lmvi.socle=DEBUG
    

    13. Références

  • Socle V004 – Exemples de Code

    Socle V004 – Exemples de Code

    19 – Exemples

    Version : 4.0.0 Date : 2025-12-09

    1. Application minimale

    1.1 pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
             https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>3.2.1</version>
        </parent>
    
        <groupId>com.example</groupId>
        <artifactId>my-socle-app</artifactId>
        <version>1.0.0</version>
    
        <properties>
            <java.version>21</java.version>
        </properties>
    
        <dependencies>
            <!-- Socle V4 -->
            <dependency>
                <groupId>eu.lmvi</groupId>
                <artifactId>socle-v004</artifactId>
                <version>4.0.0</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
    

    1.2 Application.java

    package com.example;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.ComponentScan;
    
    @SpringBootApplication
    @ComponentScan(basePackages = {"com.example", "eu.lmvi.socle"})
    public class Application {
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    }
    

    1.3 application.yml

    socle:
      app_name: my-app
      env_name: ${ENV_NAME:DEV}
      region: ${REGION:local}
    
    server:
      port: ${HTTP_PORT:8080}
    
    logging:
      config: classpath:log4j2.xml
    

    1.4 Worker simple

    package com.example.worker;
    
    import eu.lmvi.socle.worker.Worker;
    import org.springframework.stereotype.Component;
    
    @Component
    public class HelloWorker implements Worker {
    
        @Override
        public String getName() {
            return "hello-worker";
        }
    
        @Override
        public void initialize() {
            System.out.println("Hello Worker initialized");
        }
    
        @Override
        public void start() {
            System.out.println("Hello Worker started");
        }
    
        @Override
        public void doWork() {
            System.out.println("Hello from worker!");
        }
    
        @Override
        public void stop() {
            System.out.println("Hello Worker stopped");
        }
    
        @Override
        public boolean isHealthy() {
            return true;
        }
    
        @Override
        public Map<String, Object> getStats() {
            return Map.of("status", "running");
        }
    }
    

    2. Worker Kafka Consumer

    package com.example.worker;
    
    import eu.lmvi.socle.worker.AbstractWorker;
    import eu.lmvi.socle.techdb.TechDbManager;
    import org.apache.kafka.clients.consumer.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaConsumerWorker extends AbstractWorker {
    
        @Autowired
        private TechDbManager techDb;
    
        private KafkaConsumer<String, String> consumer;
        private String topic = "my-topic";
        private long lastOffset = 0;
    
        @Override
        public String getName() {
            return "kafka-consumer";
        }
    
        @Override
        public int getStartPriority() {
            return 10;
        }
    
        @Override
        protected void doInitialize() {
            // Restaurer l'offset
            lastOffset = techDb.getOffset("kafka", topic + "-0").orElse(0L);
            log.info("Starting from offset: {}", lastOffset);
    
            // Créer le consumer
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "false");
    
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(List.of(topic));
        }
    
        @Override
        protected void doProcess() {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    
            for (ConsumerRecord<String, String> record : records) {
                processMessage(record);
                lastOffset = record.offset();
                incrementProcessed();
            }
    
            // Persister périodiquement
            if (processedCount.get() % 100 == 0) {
                techDb.saveOffset("kafka", topic + "-0", lastOffset, null);
            }
        }
    
        private void processMessage(ConsumerRecord<String, String> record) {
            log.debug("Processing: key={}, value={}", record.key(), record.value());
            // Traitement...
        }
    
        @Override
        protected void doStop() {
            // Sauvegarder l'offset final
            techDb.saveOffset("kafka", topic + "-0", lastOffset, null);
    
            if (consumer != null) {
                consumer.close();
            }
        }
    
        @Override
        public Map<String, Object> getStats() {
            Map<String, Object> stats = new HashMap<>(super.getStats());
            stats.put("lastOffset", lastOffset);
            return stats;
        }
    }
    

    3. Worker HTTP API

    package com.example.worker;
    
    import eu.lmvi.socle.worker.Worker;
    import eu.lmvi.socle.kv.KvBus;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.web.bind.annotation.*;
    
    @Component
    @RestController
    @RequestMapping("/api/orders")
    public class OrderApiWorker implements Worker {
    
        @Autowired
        private KvBus kvBus;
    
        @Autowired
        private OrderService orderService;
    
        private volatile boolean running = false;
    
        @Override
        public String getName() {
            return "order-api";
        }
    
        @Override
        public boolean isPassive() {
            return true;  // Pas de doWork cyclique
        }
    
        @Override
        public void initialize() {}
    
        @Override
        public void start() {
            running = true;
        }
    
        @Override
        public void doWork() {
            // Passif - traitement via endpoints REST
        }
    
        @Override
        public void stop() {
            running = false;
        }
    
        @Override
        public boolean isHealthy() {
            return running;
        }
    
        @Override
        public Map<String, Object> getStats() {
            return Map.of("running", running);
        }
    
        // === REST Endpoints ===
    
        @PostMapping
        public ResponseEntity<Order> createOrder(@RequestBody CreateOrderRequest request) {
            Order order = orderService.create(request);
    
            // Cache l'order
            kvBus.putJson("order:" + order.getId(), order);
            kvBus.setTtl("order:" + order.getId(), Duration.ofHours(1));
    
            return ResponseEntity.status(HttpStatus.CREATED).body(order);
        }
    
        @GetMapping("/{id}")
        public ResponseEntity<Order> getOrder(@PathVariable String id) {
            // Vérifier le cache d'abord
            Optional<Order> cached = kvBus.getJson("order:" + id, Order.class);
            if (cached.isPresent()) {
                return ResponseEntity.ok(cached.get());
            }
    
            // Sinon charger depuis la DB
            return orderService.findById(id)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
        }
    }
    

    4. Worker Schedulé (Cron)

    package com.example.worker;
    
    import eu.lmvi.socle.worker.AbstractWorker;
    import org.springframework.stereotype.Component;
    
    @Component
    public class DailyReportWorker extends AbstractWorker {
    
        @Override
        public String getName() {
            return "daily-report";
        }
    
        @Override
        public String getSchedule() {
            return "0 0 6 * * ?";  // Tous les jours à 6h
        }
    
        @Override
        public boolean isScheduled() {
            return true;
        }
    
        @Override
        protected void doProcess() {
            log.info("Generating daily report...");
    
            // Collecter les données
            ReportData data = collectReportData();
    
            // Générer le rapport
            Report report = generateReport(data);
    
            // Envoyer par email
            sendReportByEmail(report);
    
            log.info("Daily report sent successfully");
            incrementProcessed();
        }
    
        private ReportData collectReportData() {
            // ...
            return new ReportData();
        }
    
        private Report generateReport(ReportData data) {
            // ...
            return new Report();
        }
    
        private void sendReportByEmail(Report report) {
            // ...
        }
    }
    

    5. Pipeline de traitement

    package com.example.pipeline;
    
    import eu.lmvi.socle.pipeline.*;
    import org.springframework.stereotype.Component;
    
    @Component
    public class OrderProcessingPipeline {
    
        @Autowired
        private PipelineEngine engine;
    
        @Autowired
        private OrderValidator validator;
    
        @Autowired
        private OrderEnricher enricher;
    
        @Autowired
        private PaymentProcessor paymentProcessor;
    
        @Autowired
        private NotificationService notificationService;
    
        public PipelineResult<ProcessedOrder> process(Order order) {
            Pipeline<Order, ProcessedOrder> pipeline = PipelineBuilder
                .<Order, ProcessedOrder>create("order-processing")
                .addStep(new ValidationStep(validator))
                .addStep(new EnrichmentStep(enricher))
                .addStep(new PaymentStep(paymentProcessor))
                .addStep(new NotificationStep(notificationService))
                .build();
    
            return engine.execute(pipeline, order);
        }
    }
    
    // Étape de validation
    class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
    
        private final OrderValidator validator;
    
        public ValidationStep(OrderValidator validator) {
            this.validator = validator;
        }
    
        @Override
        public String getName() {
            return "validation";
        }
    
        @Override
        public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
            List<String> errors = validator.validate(input);
            if (!errors.isEmpty()) {
                return StepResult.failure(getName(),
                    new ValidationException(errors), Duration.ZERO, 1);
            }
            return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
        }
    
        @Override
        public boolean isRetryable() {
            return false;
        }
    }
    
    // Étape de notification (optionnelle)
    class NotificationStep implements PipelineStep<ProcessedOrder, ProcessedOrder> {
    
        private final NotificationService notificationService;
    
        @Override
        public String getName() {
            return "notification";
        }
    
        @Override
        public StepResult<ProcessedOrder> execute(ProcessedOrder input, PipelineContext context) {
            try {
                notificationService.sendOrderConfirmation(input);
                return StepResult.success(getName(), input, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    
        @Override
        public boolean isOptional() {
            return true;  // Le pipeline continue même si la notif échoue
        }
    }
    

    6. Service avec résilience

    package com.example.service;
    
    import eu.lmvi.socle.resilience.*;
    import org.springframework.stereotype.Service;
    
    @Service
    public class ExternalApiService {
    
        @Autowired
        private RetryTemplate retryTemplate;
    
        @Autowired
        private CircuitBreakerRegistry cbRegistry;
    
        @Autowired
        private KvBus kvBus;
    
        private final OkHttpClient httpClient;
    
        public Data fetchData(String id) {
            // Circuit breaker + Retry + Cache fallback
            CircuitBreaker cb = cbRegistry.getOrCreate("external-api");
    
            return cb.executeWithFallback(
                () -> retryTemplate.execute(() -> doFetchData(id)),
                () -> getCachedData(id)
            );
        }
    
        private Data doFetchData(String id) throws IOException {
            Request request = new Request.Builder()
                .url("https://api.example.com/data/" + id)
                .build();
    
            try (Response response = httpClient.newCall(request).execute()) {
                if (!response.isSuccessful()) {
                    throw new IOException("API returned " + response.code());
                }
    
                Data data = parseResponse(response.body().string());
    
                // Mettre en cache
                kvBus.putJson("cache:data:" + id, data);
                kvBus.setTtl("cache:data:" + id, Duration.ofMinutes(5));
    
                return data;
            }
        }
    
        private Data getCachedData(String id) {
            return kvBus.getJson("cache:data:" + id, Data.class)
                .orElseThrow(() -> new RuntimeException("No cached data available"));
        }
    }
    

    7. Configuration multi-environnement

    application.yml (base)

    socle:
      app_name: ${APP_NAME:my-app}
      env_name: ${ENV_NAME:DEV}
      region: ${REGION:local}
    
    spring:
      profiles:
        active: ${PROFILE:dev}
    

    application-dev.yml

    socle:
      kvbus:
        mode: in_memory
      techdb:
        console:
          enabled: true
      admin:
        auth:
          enabled: false
    
    logging:
      level:
        eu.lmvi.socle: DEBUG
    

    application-prod.yml

    socle:
      kvbus:
        mode: redis
        redis:
          host: ${REDIS_HOST}
          password: ${REDIS_PASSWORD}
      techdb:
        console:
          enabled: false
      logging:
        forwarder:
          enabled: true
      auth:
        enabled: true
      admin:
        auth:
          enabled: true
    
    logging:
      level:
        eu.lmvi.socle: INFO
    

    8. Dockerfile complet

    # Build stage
    FROM eclipse-temurin:21-jdk-alpine AS build
    WORKDIR /app
    COPY pom.xml .
    COPY src ./src
    RUN apk add --no-cache maven && \
        mvn clean package -DskipTests
    
    # Runtime stage
    FROM eclipse-temurin:21-jre-alpine
    WORKDIR /app
    
    # Security
    RUN addgroup -S app && adduser -S app -G app
    USER app
    
    # Copy artifact
    COPY --from=build --chown=app:app /app/target/*.jar app.jar
    
    # Config
    ENV JAVA_OPTS="-XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0 -Djava.security.egd=file:/dev/./urandom"
    
    # Health check
    HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
        CMD wget -qO- http://localhost:8080/admin/health/live || exit 1
    
    EXPOSE 8080
    
    ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar app.jar"]
    

    9. Kubernetes deployment complet

    Voir 16-KUBERNETES pour l’exemple complet de déploiement K8s.

    10. Références

  • Socle V004 – Guides Pratiques

    Socle V004 – Guides Pratiques

    17 – How-To Guides

    Version : 4.0.0 Date : 2025-12-09

    1. Comment créer un nouveau Worker

    1.1 Worker simple

    package com.myapp.worker;
    
    import eu.lmvi.socle.worker.Worker;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MyWorker implements Worker {
    
        private static final Logger log = LoggerFactory.getLogger(MyWorker.class);
        private volatile boolean running = false;
    
        @Override
        public String getName() {
            return "my-worker";
        }
    
        @Override
        public void initialize() {
            log.info("[{}] Initializing", getName());
        }
    
        @Override
        public void start() {
            log.info("[{}] Starting", getName());
            running = true;
        }
    
        @Override
        public void doWork() {
            if (!running) return;
            // Votre logique ici
        }
    
        @Override
        public void stop() {
            log.info("[{}] Stopping", getName());
            running = false;
        }
    
        @Override
        public boolean isHealthy() {
            return running;
        }
    
        @Override
        public Map<String, Object> getStats() {
            return Map.of("running", running);
        }
    }
    

    1.2 Worker avec priorité

    @Override
    public int getStartPriority() {
        return 10;  // Démarre en premier
    }
    
    @Override
    public int getStopPriority() {
        return 90;  // S'arrête en dernier
    }
    

    1.3 Worker schedulé

    @Override
    public String getSchedule() {
        return "0 0 6 * * ?";  // Tous les jours à 6h
    }
    
    @Override
    public boolean isScheduled() {
        return true;
    }
    

    2. Comment utiliser KvBus

    2.1 Opérations basiques

    @Service
    public class MyService {
    
        @Autowired
        private KvBus kvBus;
    
        public void example() {
            // Stocker
            kvBus.put("key", "value");
            kvBus.put("key-with-ttl", "value", Duration.ofHours(1));
    
            // Récupérer
            Optional<String> value = kvBus.get("key");
    
            // Supprimer
            kvBus.delete("key");
    
            // Compteur atomique
            long count = kvBus.increment("counter");
        }
    }
    

    2.2 JSON

    // Stocker un objet
    kvBus.putJson("order:123", order);
    
    // Récupérer un objet
    Optional<Order> order = kvBus.getJson("order:123", Order.class);
    

    2.3 Lock distribué

    public boolean tryLock(String resource) {
        return kvBus.putIfAbsent("lock:" + resource, "locked", Duration.ofMinutes(5));
    }
    
    public void unlock(String resource) {
        kvBus.delete("lock:" + resource);
    }
    

    3. Comment utiliser SharedDataRegistry

    3.1 Key-Value

    @Service
    public class MyService {
    
        @Autowired
        private SharedDataRegistry registry;
    
        public void example() {
            // Stocker avec niveau de santé
            registry.put("database.connected", true, HealthLevel.CRITICAL);
    
            // Récupérer
            boolean connected = registry.getBoolean("database.connected").orElse(false);
        }
    }
    

    3.2 Compteurs

    // Créer une séquence
    registry.createSequence("orders.processed", 0, HealthLevel.NORMAL);
    
    // Incrémenter
    long count = registry.incrementSequence("orders.processed");
    
    // Lire
    long total = registry.getSequence("orders.processed");
    

    4. Comment utiliser TechDB (V4)

    4.1 Offsets

    @Service
    public class MyService {
    
        @Autowired
        private TechDbManager techDb;
    
        public void example() {
            // Sauvegarder un offset
            techDb.saveOffset("kafka", "my-topic-0", 123456L, null);
    
            // Récupérer un offset
            OptionalLong offset = techDb.getOffset("kafka", "my-topic-0");
        }
    }
    

    4.2 État des workers

    // Sauvegarder l'état
    techDb.saveWorkerState("my-worker", "RUNNING", Map.of("progress", 50));
    
    // Récupérer l'état
    Optional<WorkerState> state = techDb.getWorkerState("my-worker");
    

    4.3 Événements techniques

    // Logger un événement
    techDb.logEvent("ERROR", Map.of(
        "message", "Connection failed",
        "target", "database"
    ));
    
    // Récupérer les événements
    List<TechEvent> events = techDb.getEvents("ERROR", Instant.now().minus(1, ChronoUnit.HOURS), 100);
    

    5. Comment implémenter un Pipeline

    5.1 Pipeline simple

    Pipeline<Order, ProcessedOrder> pipeline = PipelineBuilder
        .<Order, ProcessedOrder>create("order-processing")
        .addStep("validate", this::validateOrder)
        .addStep("enrich", this::enrichOrder)
        .addStep("process", this::processOrder)
        .build();
    
    PipelineResult<ProcessedOrder> result = pipelineEngine.execute(pipeline, order);
    

    5.2 Étape personnalisée

    public class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
    
        @Override
        public String getName() {
            return "validation";
        }
    
        @Override
        public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
            // Validation...
            return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
        }
    
        @Override
        public boolean isRetryable() {
            return false;
        }
    }
    

    6. Comment configurer le logging (V4)

    6.1 Log4j2 basique

    <!-- src/main/resources/log4j2.xml -->
    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="WARN">
        <Appenders>
            <Console name="Console" target="SYSTEM_OUT">
                <PatternLayout pattern="%d{ISO8601} %-5level [%thread] %logger{36} - %msg%n"/>
            </Console>
        </Appenders>
        <Loggers>
            <Root level="INFO">
                <AppenderRef ref="Console"/>
            </Root>
        </Loggers>
    </Configuration>
    

    6.2 Avec LogForwarder

    # application.yml
    socle:
      logging:
        forwarder:
          enabled: true
          transport-mode: http
          log-hub-url: https://logs.mycompany.com/api/ingest
    

    7. Comment utiliser l’authentification JWT (V4)

    7.1 Configuration

    socle:
      auth:
        enabled: true
        server-url: https://auth.mycompany.com
        api-key: ${API_KEY}
    

    7.2 Utilisation

    @Service
    public class SecuredService {
    
        @Autowired(required = false)
        private SocleAuthClient authClient;
    
        public void callSecuredApi() {
            if (authClient == null) {
                throw new IllegalStateException("Auth not configured");
            }
    
            String token = authClient.getValidAccessToken();
    
            // Utiliser le token dans les requêtes HTTP
            Request request = new Request.Builder()
                .url("https://api.mycompany.com/data")
                .header("Authorization", "Bearer " + token)
                .build();
        }
    }
    

    8. Comment ajouter des métriques personnalisées

    8.1 Counter

    @Component
    public class MyMetrics {
    
        private final Counter ordersProcessed;
    
        public MyMetrics(MeterRegistry registry) {
            this.ordersProcessed = Counter.builder("my_orders_processed_total")
                .description("Total orders processed")
                .register(registry);
        }
    
        public void orderProcessed() {
            ordersProcessed.increment();
        }
    }
    

    8.2 Timer

    private final Timer processingTime;
    
    public MyMetrics(MeterRegistry registry) {
        this.processingTime = Timer.builder("my_processing_duration_seconds")
            .description("Processing duration")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(registry);
    }
    
    public void process() {
        Timer.Sample sample = Timer.start();
        try {
            doProcess();
        } finally {
            sample.stop(processingTime);
        }
    }
    

    9. Comment gérer la résilience

    9.1 Retry

    @Autowired
    private RetryTemplate retryTemplate;
    
    public Data fetchData() {
        return retryTemplate.execute(() -> httpClient.get("/api/data"));
    }
    

    9.2 Circuit Breaker

    @Autowired
    private CircuitBreakerRegistry cbRegistry;
    
    public Data fetchData() {
        CircuitBreaker cb = cbRegistry.getOrCreate("external-api");
    
        return cb.executeWithFallback(
            () -> httpClient.get("/api/data"),
            () -> getCachedData()
        );
    }
    

    10. Comment déployer sur Kubernetes

    10.1 Build de l’image

    # Build
    mvn clean package -DskipTests
    docker build -t my-app:1.0.0 .
    
    # Push
    docker push my-registry/my-app:1.0.0
    

    10.2 Déploiement

    # Appliquer les manifests
    kubectl apply -f k8s/
    
    # Ou avec Helm
    helm install my-app ./chart -n my-namespace
    

    10.3 Vérification

    # Logs
    kubectl logs -f deployment/my-app
    
    # Port forward
    kubectl port-forward svc/my-app 8080:80
    
    # Health check
    curl http://localhost:8080/admin/health
    

    11. Comment migrer de V3 à V4

    11.1 Dépendances Maven

    <!-- Remplacer Logback par Log4j2 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    
    <!-- Ajouter H2 -->
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
    </dependency>
    

    11.2 Configuration

    # Ajouter à application.yml
    socle:
      techdb:
        enabled: true
      logging:
        forwarder:
          enabled: false
      auth:
        enabled: false
      worker-registry:
        enabled: false
    
    logging:
      config: classpath:log4j2.xml
    

    11.3 Fichiers Log4j2

    Créer src/main/resources/log4j2.xml et log4j2.component.properties.

    Voir 25-MIGRATION-V3-V4 pour le guide complet.

    12. Comment debugger

    12.1 H2 Console

    socle:
      techdb:
        console:
          enabled: true
          path: /h2-console
    

    Accéder à http://localhost:8080/h2-console

    12.2 Endpoints Admin

    # État de santé
    curl http://localhost:8080/admin/health
    
    # Workers
    curl http://localhost:8080/admin/workers
    
    # Registry
    curl http://localhost:8080/admin/registry
    
    # Métriques
    curl http://localhost:8080/actuator/prometheus
    

    12.3 Logs

    // Activer le debug pour le Socle
    logging.level.eu.lmvi.socle=DEBUG
    

    13. Références

  • Socle V004 – Introduction

    Socle V004 – Introduction

    01 – Introduction au Socle V4

    Version : 4.0.0 Date : 2025-01-25

    1. Qu’est-ce que le Socle V4 ?

    Le Socle V4 est un framework Java de grade production construit sur Spring Boot 3.2.1 qui implémente le pattern MOP (Main Orchestrator Process). Il fournit une base solide pour construire des applications d’entreprise robustes et observables.

    Évolution depuis V3

    Le Socle V4 conserve et étend l’architecture V3 en ajoutant :

    Nouveauté V4 Description
    H2 TechDB Base embarquée pour état technique (remplace Nitrite)
    Log4j2 Framework logging haute performance (remplace Logback)
    LogForwarder Centralisation des logs vers LogHub (HTTP/NATS)
    SocleAuthClient Client authentification JWT
    WorkerRegistryClient Auto-enregistrement des workers
    StatusDashboard Dashboard HTML de supervision temps réel (port 9374)
    Pipeline V2 Pipeline asynchrone avec garantie at-least-once (Queue/Claim/Ack)

    2. Philosophie « MOP Pilote Tout »

    Le Main Orchestrator Process est le cœur du framework :

    ┌─────────────────────────────────────────────────────────────────┐
    │                           MOP                                    │
    │  - Orchestre tous les Workers                                   │
    │  - Gère le lifecycle (start/stop)                               │
    │  - Appelle doWork() automatiquement                             │
    │  - Garantit le shutdown gracieux                                │
    └─────────────────────────────────────────────────────────────────┘
                                  │
             ┌────────────────────┼────────────────────┐
             ▼                    ▼                    ▼
        ┌─────────┐         ┌─────────┐         ┌─────────┐
        │ Worker  │         │ Worker  │         │  HTTP   │
        │ Métier  │         │ Métier  │         │ Worker  │
        └─────────┘         └─────────┘         └─────────┘
    

    Principes clés

    1. Orchestration centralisée : Le MOP contrôle tout le lifecycle
    2. Démarrage ordonné : Workers par priorité (petit → grand), HTTP en dernier
    3. Arrêt gracieux : HTTP d’abord (drain), puis Workers
    4. Scheduling automatique : doWork() appelé selon cron ou interval

    3. Les 4 principes fondamentaux V4

    3.1 Portabilité

    • Fonctionne sur ARM/AMD64, Linux/macOS
    • Aucune dépendance serveur externe obligatoire
    • Base H2 embarquée pour l’état technique

    3.2 Sécurité

    • Aucun port entrant sur les NUC/agents
    • Communication sortante uniquement (HTTP/NATS)
    • Authentification JWT pour les services centraux

    3.3 Observabilité

    • Logs centralisés via LogForwarder
    • Corrélation par correlationId / execId
    • Suivi des workers via Registry

    3.4 Standardisation

    • Même authentification partout
    • Même format de logs
    • Même enregistrement des workers

    4. Stack technique

    Composant Version Usage
    Java 21 LTS Runtime
    Spring Boot 3.2.1 Framework
    Log4j2 2.22.1 Logging (nouveau V4)
    LMAX Disruptor 4.0.0 AsyncLoggers
    H2 2.2.x Base technique embarquée (nouveau V4)
    Kafka 3.6.0 Messaging
    NATS 2.17.0 Messaging
    Redisson 3.24.3 Redis client
    OkHttp 4.12.0 HTTP client
    Micrometer 1.12.0 Metrics

    5. Composants du Socle

    Composants V3 (conservés)

    Package Description
    mop Main Orchestrator Process
    worker Interface Worker
    config SocleConfiguration
    kv KvBus (in_memory / Redis)
    shared SharedDataRegistry
    supervisor Supervision heartbeats
    http HttpWorker, TomcatManager
    admin AdminRestApi
    metrics SocleMetrics
    pipeline PipelineEngine
    resilience CircuitBreaker, Retry
    scheduler WorkerScheduler
    security AdminAuthFilter, RateLimit

    Nouveaux composants V4

    Package Description
    techdb H2 TechDB Manager
    logging Log4j2 + LogForwarder
    client/auth SocleAuthClient
    client/registry WorkerRegistryClient

    6. Cas d’usage

    Le Socle V4 est idéal pour :

    • Agents de collecte (DB2 Journal Reader, CDC)
    • Services de synchronisation (ODH-sync)
    • Proxies et bridges (Kafka Proxy)
    • Workers de traitement (ETL, pipelines)
    • Services multi-région (MTQ, GUA, REU, etc.)

    7. Prérequis

    Développement

    • JDK 21+
    • Maven 3.9+
    • IDE (IntelliJ IDEA recommandé)

    Production

    • JRE 21+
    • Docker (optionnel)
    • Accès NATS ou HTTP pour LogForwarder (optionnel)

    8. Premiers pas

    # Cloner le projet
    git clone <repo>/socle-v004.git
    
    # Build
    cd socle-v004
    mvn clean package -DskipTests
    
    # Run
    java -jar target/socle-v004-4.0.0.jar
    
    # Vérifier
    curl http://localhost:8080/health
    

    9. Documentation

    Document Description
    02-ARCHITECTURE Architecture détaillée
    03-QUICKSTART Guide de démarrage
    08-SUPERVISOR Supervision et heartbeats
    09-PIPELINE Pipeline V1 et V2
    21-H2-TECHDB Base H2 (V4)
    22-LOG4J2-LOGFORWARDER Logging V4
    25-MIGRATION-V3-V4 Migration
    27-STATUS-DASHBOARD Dashboard supervision
    GUIDE-METHODOLOGIQUE Bonnes pratiques

    10. Support

    • Issues : GitHub Issues
    • Documentation : Ce dossier docs/Help/
    • Exemples : 20-EXEMPLES
  • Socle V004 – Pipeline

    Socle V004 – Pipeline

    09 – Pipeline Engine

    Version : 4.1.0 Date : 2025-12-28

    1. Introduction

    Le PipelineEngine permet de définir des chaînes de traitement composées d’étapes (steps) exécutées séquentiellement ou en parallèle.

    Caractéristiques

    • Définition déclarative des pipelines
    • Exécution séquentielle ou parallèle
    • Gestion des erreurs et retry
    • Context partagé entre étapes
    • Métriques et logging intégrés

    2. Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                      PipelineEngine                          │
    └─────────────────────────────────────────────────────────────┘
                                  │
                                  ▼
    ┌─────────────────────────────────────────────────────────────┐
    │                      Pipeline                                │
    │  ┌─────────────────────────────────────────────────────┐    │
    │  │              PipelineContext                         │    │
    │  │  - input data                                        │    │
    │  │  - shared state                                      │    │
    │  │  - results                                           │    │
    │  └─────────────────────────────────────────────────────┘    │
    │                                                              │
    │  ┌──────────┐   ┌──────────┐   ┌──────────┐                │
    │  │  Step 1  │──►│  Step 2  │──►│  Step 3  │                │
    │  └──────────┘   └──────────┘   └──────────┘                │
    │                                                              │
    └─────────────────────────────────────────────────────────────┘
    

    3. Interface Pipeline

    package eu.lmvi.socle.pipeline;
    
    public interface Pipeline<I, O> {
    
        /**
         * Nom du pipeline
         */
        String getName();
    
        /**
         * Exécute le pipeline
         */
        PipelineResult<O> execute(I input);
    
        /**
         * Exécute le pipeline avec context
         */
        PipelineResult<O> execute(I input, PipelineContext context);
    
        /**
         * Liste des étapes
         */
        List<PipelineStep<?, ?>> getSteps();
    }
    

    4. Interface PipelineStep

    package eu.lmvi.socle.pipeline;
    
    public interface PipelineStep<I, O> {
    
        /**
         * Nom de l'étape
         */
        String getName();
    
        /**
         * Exécute l'étape
         */
        StepResult<O> execute(I input, PipelineContext context);
    
        /**
         * L'étape peut-elle être retryée ?
         */
        default boolean isRetryable() {
            return true;
        }
    
        /**
         * Nombre max de retries
         */
        default int getMaxRetries() {
            return 3;
        }
    
        /**
         * L'étape est-elle optionnelle ?
         */
        default boolean isOptional() {
            return false;
        }
    }
    

    5. Context et Result

    5.1 PipelineContext

    package eu.lmvi.socle.pipeline;
    
    public class PipelineContext {
        private final Map<String, Object> attributes = new ConcurrentHashMap<>();
        private final List<StepResult<?>> stepResults = new ArrayList<>();
        private final String correlationId;
        private final Instant startTime;
    
        public PipelineContext() {
            this.correlationId = UUID.randomUUID().toString();
            this.startTime = Instant.now();
        }
    
        public void put(String key, Object value) {
            attributes.put(key, value);
        }
    
        public <T> Optional<T> get(String key, Class<T> type) {
            return Optional.ofNullable(attributes.get(key))
                .filter(type::isInstance)
                .map(type::cast);
        }
    
        public void addStepResult(StepResult<?> result) {
            stepResults.add(result);
        }
    
        public List<StepResult<?>> getStepResults() {
            return Collections.unmodifiableList(stepResults);
        }
    
        public String getCorrelationId() {
            return correlationId;
        }
    
        public Duration getElapsedTime() {
            return Duration.between(startTime, Instant.now());
        }
    }
    

    5.2 StepResult

    package eu.lmvi.socle.pipeline;
    
    public record StepResult<T>(
        String stepName,
        StepStatus status,
        T output,
        Exception error,
        Duration duration,
        int attempts
    ) {
        public boolean isSuccess() {
            return status == StepStatus.SUCCESS;
        }
    
        public boolean isFailure() {
            return status == StepStatus.FAILURE;
        }
    
        public boolean isSkipped() {
            return status == StepStatus.SKIPPED;
        }
    
        public static <T> StepResult<T> success(String name, T output, Duration duration) {
            return new StepResult<>(name, StepStatus.SUCCESS, output, null, duration, 1);
        }
    
        public static <T> StepResult<T> failure(String name, Exception error, Duration duration, int attempts) {
            return new StepResult<>(name, StepStatus.FAILURE, null, error, duration, attempts);
        }
    
        public static <T> StepResult<T> skipped(String name) {
            return new StepResult<>(name, StepStatus.SKIPPED, null, null, Duration.ZERO, 0);
        }
    }
    
    public enum StepStatus {
        SUCCESS,
        FAILURE,
        SKIPPED
    }
    

    5.3 PipelineResult

    package eu.lmvi.socle.pipeline;
    
    public record PipelineResult<T>(
        String pipelineName,
        PipelineStatus status,
        T output,
        PipelineContext context,
        Duration totalDuration
    ) {
        public boolean isSuccess() {
            return status == PipelineStatus.SUCCESS;
        }
    
        public List<StepResult<?>> getFailedSteps() {
            return context.getStepResults().stream()
                .filter(StepResult::isFailure)
                .toList();
        }
    }
    
    public enum PipelineStatus {
        SUCCESS,
        PARTIAL_SUCCESS,
        FAILURE
    }
    

    6. Implémentation

    6.1 DefaultPipelineEngine

    package eu.lmvi.socle.pipeline;
    
    @Component
    public class DefaultPipelineEngine implements PipelineEngine {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultPipelineEngine.class);
    
        @Override
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input) {
            return execute(pipeline, input, new PipelineContext());
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input, PipelineContext context) {
            log.info("[{}] Starting pipeline execution", pipeline.getName());
            Instant start = Instant.now();
    
            Object currentInput = input;
            O finalOutput = null;
            boolean hasFailure = false;
    
            for (PipelineStep<?, ?> step : pipeline.getSteps()) {
                PipelineStep<Object, Object> typedStep = (PipelineStep<Object, Object>) step;
    
                StepResult<Object> result = executeStep(typedStep, currentInput, context);
                context.addStepResult(result);
    
                if (result.isSuccess()) {
                    currentInput = result.output();
                    finalOutput = (O) result.output();
                } else if (result.isFailure()) {
                    if (!step.isOptional()) {
                        hasFailure = true;
                        log.error("[{}] Pipeline failed at step: {}", pipeline.getName(), step.getName());
                        break;
                    }
                    log.warn("[{}] Optional step failed, continuing: {}", pipeline.getName(), step.getName());
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            PipelineStatus status = hasFailure
                ? PipelineStatus.FAILURE
                : (context.getStepResults().stream().anyMatch(StepResult::isFailure)
                    ? PipelineStatus.PARTIAL_SUCCESS
                    : PipelineStatus.SUCCESS);
    
            log.info("[{}] Pipeline completed with status: {} in {}ms",
                pipeline.getName(), status, duration.toMillis());
    
            return new PipelineResult<>(pipeline.getName(), status, finalOutput, context, duration);
        }
    
        private <I, O> StepResult<O> executeStep(PipelineStep<I, O> step, I input, PipelineContext context) {
            log.debug("[{}] Executing step", step.getName());
            Instant start = Instant.now();
            int attempts = 0;
            Exception lastError = null;
    
            while (attempts < step.getMaxRetries()) {
                attempts++;
                try {
                    StepResult<O> result = step.execute(input, context);
                    if (result.isSuccess()) {
                        Duration duration = Duration.between(start, Instant.now());
                        log.debug("[{}] Step completed in {}ms", step.getName(), duration.toMillis());
                        return StepResult.success(step.getName(), result.output(), duration);
                    }
                    lastError = result.error();
                } catch (Exception e) {
                    lastError = e;
                    log.warn("[{}] Step attempt {} failed: {}", step.getName(), attempts, e.getMessage());
    
                    if (!step.isRetryable() || attempts >= step.getMaxRetries()) {
                        break;
                    }
    
                    // Backoff
                    try {
                        Thread.sleep(attempts * 1000L);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            return StepResult.failure(step.getName(), lastError, duration, attempts);
        }
    }
    

    6.2 Pipeline Builder

    package eu.lmvi.socle.pipeline;
    
    public class PipelineBuilder<I, O> {
    
        private final String name;
        private final List<PipelineStep<?, ?>> steps = new ArrayList<>();
    
        public PipelineBuilder(String name) {
            this.name = name;
        }
    
        public static <I, O> PipelineBuilder<I, O> create(String name) {
            return new PipelineBuilder<>(name);
        }
    
        public <NO> PipelineBuilder<I, NO> addStep(PipelineStep<O, NO> step) {
            steps.add(step);
            return (PipelineBuilder<I, NO>) this;
        }
    
        public <NO> PipelineBuilder<I, NO> addStep(String name, Function<O, NO> processor) {
            return addStep(new FunctionalStep<>(name, processor));
        }
    
        public Pipeline<I, O> build() {
            return new DefaultPipeline<>(name, List.copyOf(steps));
        }
    
        private record FunctionalStep<I, O>(String name, Function<I, O> processor) implements PipelineStep<I, O> {
    
            @Override
            public String getName() {
                return name;
            }
    
            @Override
            public StepResult<O> execute(I input, PipelineContext context) {
                O output = processor.apply(input);
                return StepResult.success(name, output, Duration.ZERO);
            }
        }
    }
    

    7. Utilisation

    7.1 Pipeline simple

    @Component
    public class OrderPipeline {
    
        @Autowired
        private PipelineEngine engine;
    
        public PipelineResult<OrderResult> processOrder(Order order) {
            Pipeline<Order, OrderResult> pipeline = PipelineBuilder
                .<Order, OrderResult>create("order-processing")
                .addStep("validate", this::validateOrder)
                .addStep("enrich", this::enrichOrder)
                .addStep("process", this::processOrder)
                .addStep("notify", this::notifyCustomer)
                .build();
    
            return engine.execute(pipeline, order);
        }
    
        private ValidatedOrder validateOrder(Order order) {
            // Validation...
            return new ValidatedOrder(order);
        }
    
        private EnrichedOrder enrichOrder(ValidatedOrder order) {
            // Enrichissement...
            return new EnrichedOrder(order);
        }
    
        private ProcessedOrder processOrder(EnrichedOrder order) {
            // Traitement...
            return new ProcessedOrder(order);
        }
    
        private OrderResult notifyCustomer(ProcessedOrder order) {
            // Notification...
            return new OrderResult(order, "SUCCESS");
        }
    }
    

    7.2 Étapes personnalisées

    public class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
    
        @Override
        public String getName() {
            return "validation";
        }
    
        @Override
        public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
            List<String> errors = new ArrayList<>();
    
            if (input.getCustomerId() == null) {
                errors.add("Missing customer ID");
            }
            if (input.getItems().isEmpty()) {
                errors.add("No items in order");
            }
    
            if (!errors.isEmpty()) {
                return StepResult.failure(getName(),
                    new ValidationException(errors),
                    Duration.ZERO, 1);
            }
    
            // Stocker des données dans le context
            context.put("customerId", input.getCustomerId());
    
            return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
        }
    
        @Override
        public boolean isRetryable() {
            return false;  // Validation ne doit pas être retryée
        }
    }
    

    7.3 Étape optionnelle

    public class NotificationStep implements PipelineStep<ProcessedOrder, ProcessedOrder> {
    
        @Override
        public String getName() {
            return "notification";
        }
    
        @Override
        public StepResult<ProcessedOrder> execute(ProcessedOrder input, PipelineContext context) {
            try {
                sendNotification(input);
                return StepResult.success(getName(), input, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    
        @Override
        public boolean isOptional() {
            return true;  // Le pipeline continue même si la notif échoue
        }
    }
    

    7.4 Utilisation du context

    public class EnrichmentStep implements PipelineStep<ValidatedOrder, EnrichedOrder> {
    
        @Autowired
        private CustomerService customerService;
    
        @Override
        public String getName() {
            return "enrichment";
        }
    
        @Override
        public StepResult<EnrichedOrder> execute(ValidatedOrder input, PipelineContext context) {
            // Lire depuis le context
            String customerId = context.get("customerId", String.class).orElseThrow();
    
            // Enrichir
            Customer customer = customerService.getCustomer(customerId);
            EnrichedOrder enriched = new EnrichedOrder(input, customer);
    
            // Écrire dans le context pour les étapes suivantes
            context.put("customerEmail", customer.getEmail());
    
            return StepResult.success(getName(), enriched, Duration.ZERO);
        }
    }
    

    8. Gestion des erreurs

    8.1 Retry automatique

    public class ExternalApiStep implements PipelineStep<Data, ApiResponse> {
    
        @Override
        public String getName() {
            return "external-api-call";
        }
    
        @Override
        public boolean isRetryable() {
            return true;
        }
    
        @Override
        public int getMaxRetries() {
            return 5;  // 5 tentatives max
        }
    
        @Override
        public StepResult<ApiResponse> execute(Data input, PipelineContext context) {
            // Appel API qui peut échouer
            ApiResponse response = callApi(input);
            return StepResult.success(getName(), response, Duration.ZERO);
        }
    }
    

    8.2 Traitement des résultats

    PipelineResult<OrderResult> result = engine.execute(pipeline, order);
    
    if (result.isSuccess()) {
        log.info("Order processed successfully: {}", result.output());
    } else {
        // Analyser les étapes en échec
        for (StepResult<?> stepResult : result.getFailedSteps()) {
            log.error("Step {} failed after {} attempts: {}",
                stepResult.stepName(),
                stepResult.attempts(),
                stepResult.error().getMessage());
        }
    
        // Décider quoi faire
        if (result.status() == PipelineStatus.PARTIAL_SUCCESS) {
            // Certaines étapes optionnelles ont échoué
            handlePartialSuccess(result);
        } else {
            // Échec complet
            handleFailure(result);
        }
    }
    

    9. Pipelines parallèles

    public class ParallelEnrichmentStep implements PipelineStep<Order, EnrichedOrder> {
    
        @Autowired
        private ExecutorService executor;
    
        @Override
        public StepResult<EnrichedOrder> execute(Order input, PipelineContext context) {
            // Exécuter plusieurs enrichissements en parallèle
            CompletableFuture<Customer> customerFuture = CompletableFuture.supplyAsync(
                () -> customerService.getCustomer(input.getCustomerId()), executor);
    
            CompletableFuture<Inventory> inventoryFuture = CompletableFuture.supplyAsync(
                () -> inventoryService.checkInventory(input.getItems()), executor);
    
            CompletableFuture<Pricing> pricingFuture = CompletableFuture.supplyAsync(
                () -> pricingService.calculatePrice(input), executor);
    
            try {
                CompletableFuture.allOf(customerFuture, inventoryFuture, pricingFuture).join();
    
                EnrichedOrder enriched = new EnrichedOrder(
                    input,
                    customerFuture.get(),
                    inventoryFuture.get(),
                    pricingFuture.get()
                );
    
                return StepResult.success(getName(), enriched, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    }
    

    10. Métriques

    @Component
    public class PipelineMetrics {
    
        private final MeterRegistry registry;
    
        public void recordPipelineExecution(PipelineResult<?> result) {
            Timer.builder("socle_pipeline_duration")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .record(result.totalDuration());
    
            Counter.builder("socle_pipeline_executions")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .increment();
        }
    
        public void recordStepExecution(StepResult<?> result) {
            Timer.builder("socle_pipeline_step_duration")
                .tag("step", result.stepName())
                .tag("status", result.status().name())
                .register(registry)
                .record(result.duration());
        }
    }
    

    11. Bonnes pratiques

    DO

    • Garder les étapes petites et focalisées
    • Utiliser le context pour partager des données entre étapes
    • Marquer les étapes non-critiques comme optionnelles
    • Logger au niveau des étapes pour le debugging
    • Configurer des retries appropriés pour les appels externes

    DON’T

    • Ne pas faire de pipelines avec trop d’étapes (> 10)
    • Ne pas mélanger logique métier et infrastructure dans une étape
    • Ne pas ignorer les erreurs des étapes obligatoires
    • Ne pas utiliser de pipelines pour du traitement simple

    12. Références

    Pipeline Engine V2 (Nouveau)

    Ajouté en version 4.1.0

    13. Introduction Pipeline V2

    Le Pipeline V2 est une refonte complète du système de pipelines, conçue pour les applications à haute disponibilité nécessitant :

    • Garantie at-least-once : Aucune perte de message grâce au pattern Queue/Claim/Ack
    • Virtual Threads (Java 21) : Scalabilité maximale avec les threads virtuels
    • Reprise après crash : État persisté permettant la reprise au dernier stage OK
    • DLQ (Dead Letter Queue) : Gestion des échecs définitifs avec replay

    Quand utiliser V2 vs V1 ?

    Critère Pipeline V1 Pipeline V2
    Exécution Synchrone, mono-thread Asynchrone, multi-thread
    Garantie de livraison At-most-once At-least-once
    Persistance Non Oui (TechDB / H2)
    DLQ Non Oui
    Use case Traitement simple Flux critiques, haute disponibilité

    14. Architecture V2

    ┌─────────────────────────────────────────────────────────────────┐
    │                            MOP                                   │
    │               (supervision, restart, health)                     │
    └───────────────────────────┬─────────────────────────────────────┘
                                │
    ┌───────────────────────────▼─────────────────────────────────────┐
    │                     PipelineV2                                   │
    │                                                                  │
    │  ┌────────────┐     ┌────────────┐     ┌────────────┐          │
    │  │  Stage A   │────►│  Stage B   │────►│  Stage C   │          │
    │  │ ┌────────┐ │     │ ┌────────┐ │     │ ┌────────┐ │          │
    │  │ │ Queue  │ │     │ │ Queue  │ │     │ │ Queue  │ │          │
    │  │ └────────┘ │     │ └────────┘ │     │ └────────┘ │          │
    │  │ VThreads  │     │ VThreads  │     │ VThreads  │          │
    │  └────────────┘     └────────────┘     └────────────┘          │
    │                                                                  │
    │         ┌─────────────────┴─────────────────┐                   │
    │    ┌────▼────┐                        ┌────▼────┐              │
    │    │ Context │                        │   DLQ   │              │
    │    │ (state) │                        │ (errors)│              │
    │    └─────────┘                        └─────────┘              │
    └─────────────────────────────────────────────────────────────────┘
    

    Composants

    • StageQueue : Queue avec sémantique Claim/Ack entre chaque stage
    • VThreadStageExecutor : Workers Virtual Threads consommant les queues
    • PersistentPipelineContext : État persisté pour reprise après crash
    • DLQ : Messages en échec après max retries

    15. Création d’un Pipeline V2

    15.1 Builder Fluent

    import eu.lmvi.socle.pipeline.v2.PipelineBuilderV2;
    import eu.lmvi.socle.pipeline.v2.PipelineV2;
    
    // Création du pipeline
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .description("Pipeline de traitement des commandes")
    
        // Stage 1: Validation (2 workers, timeout 30s)
        .addStage("validate", this::validateOrder)
            .concurrency(2)
            .timeout(Duration.ofSeconds(30))
    
        // Stage 2: Enrichissement (4 workers, queue 1000)
        .addStage("enrich", this::enrichOrder)
            .concurrency(4)
            .queueSize(1000)
    
        // Stage 3: Publication (2 workers)
        .addStage("publish", this::publishOrder)
            .concurrency(2)
            .maxRetries(5)
            .backoff(Duration.ofMillis(200), 2.0)
    
        .build();
    

    15.2 Configuration par stage

    .addStage("nom", processor)
        .concurrency(4)           // Nombre de workers parallèles
        .queueSize(500)           // Taille max de la queue
        .timeout(Duration.ofMinutes(5))  // Timeout par message
        .maxRetries(3)            // Tentatives avant DLQ
        .backoff(base, multiplier) // Backoff exponentiel
        .optional()               // Stage optionnel (échec non bloquant)
    

    16. Cycle de vie

    16.1 Démarrage/Arrêt

    // Démarrer le pipeline (lance tous les workers)
    pipeline.start();
    
    // Vérifier l'état
    boolean running = pipeline.isRunning();
    
    // Arrêt gracieux (attend la fin des traitements en cours)
    pipeline.stop();
    
    // Arrêt immédiat (interrompt tout)
    pipeline.stopNow();
    

    16.2 Soumission de messages

    // Soumettre un message
    String messageId = pipeline.submit(order, "exec-001");
    
    // Avec métadonnées
    String messageId = pipeline.submit(order, "exec-001", Map.of(
        "source", "api",
        "priority", "high"
    ));
    

    16.3 Suivi d’exécution

    // État d'une exécution
    Optional<PipelineState> state = pipeline.getExecutionState("exec-001");
    state.ifPresent(s -> {
        System.out.println("Status: " + s.status());
        System.out.println("Current stage: " + s.currentStage());
        System.out.println("Duration: " + s.duration());
    });
    
    // Liste des exécutions actives
    List<PipelineState> active = pipeline.listActiveExecutions();
    
    // Statistiques globales
    PipelineV2.PipelineStats stats = pipeline.getStats();
    System.out.println("Submitted: " + stats.totalSubmitted());
    System.out.println("Completed: " + stats.totalCompleted());
    System.out.println("Failed: " + stats.totalFailed());
    

    17. Queue/Claim/Ack

    17.1 Pattern de consommation

    Le Pipeline V2 utilise un pattern de consommation fiable :

    1. CLAIM   : Le worker réserve un message (lease)
    2. PROCESS : Le worker traite le message
    3. ACK     : Succès → message supprimé
       ou
       NACK    : Échec → retry ou DLQ
    

    17.2 Lease et récupération

    Si un worker crash pendant le traitement :

    • Le lease expire après le timeout configuré
    • Le message est automatiquement remis en queue
    • Un autre worker le reprend
    // Timeout de lease par défaut: 5 minutes
    // Configurable par stage via timeout()
    

    17.3 Implémentations disponibles

    Implémentation Persistance Use case
    InMemoryStageQueue Non Développement, tests
    TechDbStageQueue Oui (H2) Production

    18. Dead Letter Queue (DLQ)

    18.1 Envoi en DLQ

    Un message est envoyé en DLQ après :

    • maxRetries tentatives échouées
    • Une erreur non-retryable

    18.2 Consultation

    // Via API REST
    GET /admin/pipelines/v2/{name}/dlq
    
    // Programmatiquement (via StageQueue)
    List<DlqMessage<Order>> messages = queue.peekDlq(100);
    for (DlqMessage<Order> msg : messages) {
        System.out.println("ID: " + msg.messageId());
        System.out.println("Error: " + msg.errorMessage());
        System.out.println("Attempts: " + msg.attempts());
        System.out.println("Failed at: " + msg.failedAt());
    }
    

    18.3 Replay

    // Via API REST
    POST /admin/pipelines/v2/{name}/dlq/{messageId}/replay
    
    // Programmatiquement
    boolean replayed = queue.replayFromDlq(messageId);
    
    // Replay tous les messages
    int count = queue.replayAllFromDlq("exec-001");
    

    18.4 Suppression

    // Via API REST
    DELETE /admin/pipelines/v2/{name}/dlq/{messageId}
    
    // Programmatiquement
    queue.deleteFromDlq(messageId);
    
    // Purge complète
    queue.purgeDlq();
    

    19. Persistance et reprise

    19.1 Context persistant

    Le Pipeline V2 peut persister son état pour survivre aux redémarrages :

    import eu.lmvi.socle.pipeline.context.TechDbPipelineContext;
    
    // Créer un context persistant
    Supplier<Connection> connectionSupplier = () -> dataSource.getConnection();
    PersistentPipelineContext context = new TechDbPipelineContext(connectionSupplier);
    
    // Utiliser avec le builder
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .context(context)  // Utilise le context persistant
        .addStage(...)
        .build();
    

    19.2 Reprise après crash

    Au redémarrage, les messages non-ackés sont automatiquement re-traités :

    1. Les messages en status CLAIMED dont le lease a expiré → remis en PENDING
    2. Les stages reprennent leur consommation normalement
    3. Grâce à l’idempotence, les messages déjà traités ne sont pas re-traités

    20. Idempotence (OBLIGATOIRE)

    20.1 Pourquoi ?

    Le modèle at-least-once signifie qu’un message peut être traité plusieurs fois :

    • Crash après traitement mais avant ACK
    • Timeout de lease pendant un traitement long
    • Replay depuis la DLQ

    Chaque stage DOIT être idempotent.

    20.2 Patterns recommandés

    // Pattern 1: Vérifier "déjà traité"
    public Order processOrder(Order order) {
        if (orderRepository.exists(order.getId())) {
            return orderRepository.get(order.getId()); // Déjà traité
        }
        // Traiter...
        return orderRepository.save(order);
    }
    
    // Pattern 2: UPSERT au lieu d'INSERT
    public void saveOrder(Order order) {
        orderRepository.upsert(order); // Idempotent par nature
    }
    
    // Pattern 3: Utiliser executionId comme clé
    public void processWithDedup(Order order, String executionId) {
        String dedupKey = "order:" + order.getId() + ":" + executionId;
        if (kvBus.exists(dedupKey)) {
            return; // Déjà traité pour cette exécution
        }
    
        // Traiter...
    
        kvBus.set(dedupKey, true, Duration.ofDays(7));
    }
    

    20.3 Anti-patterns

    // MAUVAIS: INSERT sans vérification
    db.insert(order); // Échoue si déjà inséré
    
    // MAUVAIS: Compteur incrémental
    counter.increment(); // Compté plusieurs fois en cas de replay
    
    // MAUVAIS: Email sans déduplication
    emailService.send(email); // Envoyé plusieurs fois
    

    21. API REST d’administration

    21.1 Endpoints disponibles

    GET  /admin/pipelines/v2                    → Liste des pipelines
    GET  /admin/pipelines/v2/{name}             → État d'un pipeline
    GET  /admin/pipelines/v2/{name}/stages      → État des stages
    GET  /admin/pipelines/v2/{name}/executions  → Exécutions en cours
    
    GET  /admin/pipelines/v2/{name}/dlq         → Messages en DLQ
    POST /admin/pipelines/v2/{name}/dlq/{id}/replay  → Rejouer un message
    DELETE /admin/pipelines/v2/{name}/dlq/{id}  → Supprimer de la DLQ
    POST /admin/pipelines/v2/{name}/dlq/replay-all   → Rejouer tous
    
    POST /admin/pipelines/v2/{name}/start       → Démarrer le pipeline
    POST /admin/pipelines/v2/{name}/stop        → Arrêter gracieusement
    POST /admin/pipelines/v2/{name}/stop-now    → Arrêter immédiatement
    

    21.2 Exemple de réponse

    GET /admin/pipelines/v2/order-processing
    
    {
      "timestamp": 1735387200000,
      "name": "order-processing",
      "description": "Pipeline de traitement des commandes",
      "running": true,
      "stats": {
        "stage_count": 3,
        "active_executions": 5,
        "total_submitted": 1500,
        "total_completed": 1480,
        "total_failed": 12
      }
    }
    

    22. Métriques Prometheus

    22.1 Métriques exposées

    # Compteurs
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="success"}
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="failed"}
    socle_pipeline_v2_dlq_total{pipeline="order-processing", stage="validate"}
    
    # Gauges
    socle_pipeline_v2_queue_pending{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_queue_processing{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_dlq_size{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_active_executions{pipeline="order-processing"}
    socle_pipeline_v2_running{pipeline="order-processing"}
    
    # Histogrammes (durée de traitement)
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.5"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.95"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.99"}
    

    22.2 Alertes recommandées

    # DLQ croissante
    - alert: PipelineDLQGrowing
      expr: increase(socle_pipeline_v2_dlq_total[1h]) > 10
      labels:
        severity: warning
    
    # Queue qui s'accumule (backpressure)
    - alert: PipelineQueueBacklog
      expr: socle_pipeline_v2_queue_pending > 1000
      for: 5m
      labels:
        severity: warning
    
    # Pipeline arrêté
    - alert: PipelineStopped
      expr: socle_pipeline_v2_running == 0
      for: 1m
      labels:
        severity: critical
    

    23. Bonnes pratiques V2

    DO

    • Toujours implémenter l’idempotence dans chaque stage
    • Utiliser TechDbPipelineContext en production pour la persistance
    • Configurer des maxRetries appropriés (3-5 pour API externes)
    • Monitorer les métriques DLQ et queue_pending
    • Traiter régulièrement les messages en DLQ (replay ou suppression)

    DON’T

    • Ne jamais supposer qu’un message ne sera traité qu’une fois
    • Ne pas ignorer les messages en DLQ
    • Ne pas mettre de timeout trop courts (risque de faux positifs)
    • Ne pas créer trop de stages (overhead de queues)
    • Ne pas oublier d’appeler pipeline.stop() à l’arrêt de l’application

    24. Migration V1 → V2

    24.1 Changements d’API

    V1 V2
    PipelineEngine.execute(pipeline, input) pipeline.submit(input, executionId)
    Pipeline<I,O> PipelineV2<I,O>
    PipelineBuilder PipelineBuilderV2
    Exécution synchrone Exécution asynchrone

    24.2 Étapes de migration

    1. Rendre les steps idempotents (obligatoire avant migration)
    2. Créer le pipeline avec PipelineBuilderV2
    3. Remplacer execute() par submit()
    4. Ajouter le suivi asynchrone des résultats si nécessaire
    5. Configurer la persistance (TechDbPipelineContext)
    6. Activer les métriques et monitoring DLQ

    25. Configuration Dynamique (YAML)

    Ajouté en version 4.1.0

    25.1 Introduction

    Le Pipeline V2 peut maintenant être configuré entièrement via YAML, permettant :

    • Activation/désactivation des stages sans recompiler
    • Configuration dynamique de la concurrence, timeouts, retries
    • Ajout de nouveaux stages sans modifier le code de la factory
    • Modularité des workers (un worker = un composant Spring)

    25.2 Architecture

    ┌─────────────────────────────────────────────────────────────────┐
    │                        application.yml                           │
    │  cdc:                                                            │
    │    pipeline:                                                     │
    │      stages:                                                     │
    │        - name: filter                                            │
    │          worker: filterWorker                                    │
    │          enabled: true                                           │
    └───────────────────────────────┬─────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │            PipelineYamlConfig (@ConfigurationProperties)          │
    │                                                                    │
    │  - name: String                                                    │
    │  - stages: List<StageYamlConfig>                                   │
    │  - getEnabledStages(): List<StageYamlConfig>                       │
    └───────────────────────────────┬───────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │                    DynamicPipelineBuilder                          │
    │                                                                    │
    │  .config(pipelineConfig)                                           │
    │  .workers(Map<String, PipelineStageWorker>)                        │
    │  .build() → PipelineV2<I, O>                                       │
    └───────────────────────────────────────────────────────────────────┘
    

    25.3 Interface PipelineStageWorker

    Chaque stage est implémenté comme un composant Spring implémentant PipelineStageWorker :

    package eu.lmvi.socle.pipeline.v2.worker;
    
    public interface PipelineStageWorker<I, O> {
    
        /**
         * Nom unique du worker (utilisé dans la config YAML)
         */
        String getName();
    
        /**
         * Initialise le worker (chargement ressources, compilation scripts, etc.)
         */
        default void initialize() throws Exception {}
    
        /**
         * Traite un élément d'entrée (DOIT être thread-safe)
         */
        O process(I input) throws Exception;
    
        /**
         * Libère les ressources
         */
        default void shutdown() {}
    
        /**
         * Indique si ce worker est activé
         */
        default boolean isEnabled() { return true; }
    
        /**
         * Description pour le monitoring
         */
        default String getDescription() { return getName(); }
    
        /**
         * Ordre de priorité (plus bas = plus tôt)
         */
        default int getOrder() { return 100; }
    }
    

    25.4 Exemple d’implémentation

    @Component("filterWorker")
    public class FilterStageWorker implements PipelineStageWorker<WalEvent, PipelineMessage> {
    
        @Autowired
        private TableFilter tableFilter;
    
        @Override
        public String getName() {
            return "filter";
        }
    
        @Override
        public int getOrder() {
            return 10;  // Premier dans le pipeline
        }
    
        @Override
        public PipelineMessage process(WalEvent event) throws Exception {
            PipelineMessage message = new PipelineMessage(event);
            if (!tableFilter.isAllowed(event.getTable())) {
                message.markAsFiltered("table_not_in_whitelist");
            }
            return message;
        }
    }
    

    25.5 Configuration YAML

    cdc:
      pipeline:
        name: cdc-pipeline-v2
        description: Pipeline CDC PostgreSQL vers Kafka
        enabled: true
        stages:
          # Stage 1: Filtrage
          - name: filter
            worker: filterWorker        # Nom du bean Spring
            enabled: true
            concurrency: 1
            timeout-seconds: 5
            max-retries: 1
            order: 10
    
          # Stage 2: Transformation
          - name: transform
            worker: transformWorker
            enabled: true
            concurrency: 2
            timeout-seconds: 30
            order: 20
    
          # Stage 3a: Enrichissement Rules
          - name: enrich-rules
            worker: rulesEnrichmentWorker
            enabled: true
            concurrency: 2
            order: 30
    
          # Stage 3b: Enrichissement BeanShell
          - name: enrich-beanshell
            worker: beanshellEnrichmentWorker
            enabled: ${CDC_ENRICH_BSH_ENABLED:true}  # Variable d'environnement
            concurrency: 2
            order: 31
    
          # Stage 3c: Enrichissement JavaScript
          - name: enrich-javascript
            worker: javascriptEnrichmentWorker
            enabled: ${CDC_ENRICH_JS_ENABLED:true}
            concurrency: 4
            timeout-seconds: 60
            order: 32
    
          # Stage 3d: Enrichissement Janino (nouveau)
          - name: enrich-janino
            worker: janinoEnrichmentWorker
            enabled: ${CDC_ENRICH_JANINO_ENABLED:false}  # Désactivé par défaut
            concurrency: 2
            order: 33
    
          # Stage 4: Publication Kafka
          - name: publish
            worker: publishWorker
            enabled: true
            concurrency: 2
            max-retries: 5
            order: 40
    
          # Stage 5: Commit LSN
          - name: commit
            worker: commitWorker
            enabled: true
            concurrency: 1
            order: 50
    

    25.6 Classe de configuration Spring

    @Configuration
    @ConfigurationProperties(prefix = "cdc.pipeline")
    public class CdcPipelineConfig extends PipelineYamlConfig {
        // Hérite de toutes les propriétés
    }
    

    25.7 Construction du pipeline

    @Component
    public class CdcPipelineFactory {
    
        @Autowired
        private CdcPipelineConfig pipelineConfig;
    
        @Autowired
        private Map<String, PipelineStageWorker<?, ?>> availableWorkers;
    
        @PostConstruct
        public void init() {
            // Log des workers disponibles
            log.info("Available workers: {}", availableWorkers.keySet());
    
            // Construire le pipeline dynamiquement
            cdcPipeline = DynamicPipelineBuilder.create()
                .config(pipelineConfig)
                .workers(availableWorkers)
                .build();
    
            log.info("Pipeline created with {} stages",
                pipelineConfig.countEnabledStages());
        }
    }
    

    25.8 Avantages

    Aspect Avant (hardcodé) Après (YAML)
    Modification Recompilation Redémarrage
    Activation stage Code Java Variable d’env
    Ajout stage Modifier factory Ajouter worker + config
    Configuration Dans le code Centralisée
    Environnements Branches différentes Même code, config différente

    25.9 Activation/Désactivation à chaud

    Via variables d’environnement dans .env :

    # Désactiver BeanShell
    CDC_ENRICH_BSH_ENABLED=false
    
    # Activer Janino
    CDC_ENRICH_JANINO_ENABLED=true
    
    # Redémarrer le service
    sudo systemctl restart cdc-reflet-kafka-v02
    

    25.10 Ajout d’un nouveau type d’enrichissement

    1. Créer le worker :
    @Component("monNouveauWorker")
    public class MonNouveauEnrichmentWorker
        implements PipelineStageWorker<PipelineMessage, PipelineMessage> {
        // ...
    }
    
    1. Ajouter dans application.yml :
    stages:
      - name: enrich-nouveau
        worker: monNouveauWorker
        enabled: ${CDC_ENRICH_NOUVEAU_ENABLED:false}
        order: 34
    
    1. Redémarrer – le pipeline inclura automatiquement le nouveau stage.

    26. Références V2

  • Socle V004 – Supervisor

    Socle V004 – Supervisor

    08 – Supervisor

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Supervisor est le composant de supervision du Socle V4. Il surveille l’état de santé des Workers via heartbeats et expose des métriques de santé.

    Fonctionnalités

    • Collecte des heartbeats des Workers
    • Détection des Workers défaillants
    • Agrégation de l’état de santé global
    • Exposition via API REST et métriques
    • Intégration avec SharedDataRegistry

    2. Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                        Supervisor                            │
    │                                                              │
    │   ┌──────────────────┐    ┌──────────────────┐              │
    │   │ HeartbeatCollector│    │ HealthAggregator │              │
    │   └────────┬─────────┘    └────────┬─────────┘              │
    │            │                       │                         │
    │            ▼                       ▼                         │
    │   ┌──────────────────┐    ┌──────────────────┐              │
    │   │ Worker States    │    │ Global Health    │              │
    │   │ (per worker)     │    │ (aggregated)     │              │
    │   └──────────────────┘    └──────────────────┘              │
    │                                                              │
    └──────────────────────────────────────────────────────────────┘
             │                           │
             ▼                           ▼
    ┌─────────────────┐         ┌─────────────────┐
    │ /admin/health   │         │ /admin/workers  │
    └─────────────────┘         └─────────────────┘
    

    3. Configuration

    3.1 application.yml

    socle:
      supervisor:
        heartbeat-interval-ms: ${SUPERVISOR_HEARTBEAT_MS:10000}
        unhealthy-threshold: ${SUPERVISOR_UNHEALTHY_THRESHOLD:3}
        check-interval-ms: ${SUPERVISOR_CHECK_INTERVAL_MS:5000}
        stale-timeout-ms: ${SUPERVISOR_STALE_TIMEOUT_MS:60000}
    

    3.2 Variables d’environnement

    Variable Description Défaut
    SUPERVISOR_HEARTBEAT_MS Intervalle heartbeat attendu 10000 (10s)
    SUPERVISOR_UNHEALTHY_THRESHOLD Heartbeats manqués avant UNHEALTHY 3
    SUPERVISOR_CHECK_INTERVAL_MS Intervalle de vérification 5000 (5s)
    SUPERVISOR_STALE_TIMEOUT_MS Timeout avant worker STALE 60000 (1min)

    4. Interface Supervisor

    package eu.lmvi.socle.supervisor;
    
    public interface Supervisor {
    
        /**
         * Enregistre un worker dans le supervisor
         */
        void registerWorker(Worker worker);
    
        /**
         * Désenregistre un worker
         */
        void unregisterWorker(String workerName);
    
        /**
         * Reçoit un heartbeat d'un worker
         */
        void heartbeat(String workerName);
    
        /**
         * Reçoit un heartbeat avec métriques
         */
        void heartbeat(String workerName, Map<String, Object> metrics);
    
        /**
         * Récupère l'état d'un worker
         */
        WorkerState getWorkerState(String workerName);
    
        /**
         * Récupère l'état de tous les workers
         */
        Map<String, WorkerState> getAllWorkerStates();
    
        /**
         * Vérifie si un worker est healthy
         */
        boolean isWorkerHealthy(String workerName);
    
        /**
         * Récupère l'état de santé global
         */
        HealthStatus getGlobalHealth();
    
        /**
         * Liste les workers unhealthy
         */
        List<String> getUnhealthyWorkers();
    
        /**
         * Démarre la supervision
         */
        void start();
    
        /**
         * Arrête la supervision
         */
        void stop();
    }
    

    5. États des Workers

    5.1 WorkerState

    package eu.lmvi.socle.supervisor;
    
    public record WorkerState(
        String workerName,
        WorkerStatus status,
        Instant lastHeartbeat,
        int missedHeartbeats,
        Map<String, Object> lastMetrics,
        Instant registeredAt
    ) {
        public boolean isHealthy() {
            return status == WorkerStatus.RUNNING;
        }
    
        public boolean isStale() {
            return status == WorkerStatus.STALE;
        }
    }
    

    5.2 WorkerStatus

    public enum WorkerStatus {
        /**
         * Worker enregistré mais pas encore démarré
         */
        REGISTERED,
    
        /**
         * Worker en cours d'exécution, heartbeats reçus
         */
        RUNNING,
    
        /**
         * Heartbeats manqués mais pas encore timeout
         */
        DEGRADED,
    
        /**
         * Trop de heartbeats manqués, worker considéré unhealthy
         */
        UNHEALTHY,
    
        /**
         * Aucun heartbeat depuis longtemps, worker potentiellement mort
         */
        STALE,
    
        /**
         * Worker arrêté proprement
         */
        STOPPED
    }
    

    5.3 Diagramme d’états

                        ┌────────────────┐
                        │   REGISTERED   │
                        └───────┬────────┘
                                │ first heartbeat
                                ▼
                        ┌────────────────┐
             ┌─────────│    RUNNING     │─────────┐
             │         └───────┬────────┘         │
             │                 │                  │
             │ heartbeat       │ missed           │ stop()
             │ received        │ heartbeat        │
             │                 ▼                  │
             │         ┌────────────────┐         │
             └────────►│   DEGRADED     │         │
                       └───────┬────────┘         │
                               │ threshold        │
                               │ exceeded         │
                               ▼                  │
                       ┌────────────────┐         │
                       │   UNHEALTHY    │         │
                       └───────┬────────┘         │
                               │ stale            │
                               │ timeout          │
                               ▼                  │
                       ┌────────────────┐         │
                       │     STALE      │         │
                       └────────────────┘         │
                                                  │
                       ┌────────────────┐         │
                       │    STOPPED     │◄────────┘
                       └────────────────┘
    

    6. Implémentation

    package eu.lmvi.socle.supervisor;
    
    @Component
    public class DefaultSupervisor implements Supervisor {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultSupervisor.class);
    
        private final SocleConfiguration config;
        private final ConcurrentHashMap<String, WorkerStateInternal> workers = new ConcurrentHashMap<>();
        private final ScheduledExecutorService scheduler;
        private volatile boolean running = false;
    
        public DefaultSupervisor(SocleConfiguration config) {
            this.config = config;
            this.scheduler = Executors.newSingleThreadScheduledExecutor(
                r -> new Thread(r, "supervisor-checker"));
        }
    
        @Override
        public void registerWorker(Worker worker) {
            String name = worker.getName();
            workers.put(name, new WorkerStateInternal(
                name, WorkerStatus.REGISTERED, Instant.now(), 0, Map.of(), Instant.now()));
            log.info("Worker registered: {}", name);
        }
    
        @Override
        public void unregisterWorker(String workerName) {
            WorkerStateInternal state = workers.remove(workerName);
            if (state != null) {
                log.info("Worker unregistered: {}", workerName);
            }
        }
    
        @Override
        public void heartbeat(String workerName) {
            heartbeat(workerName, Map.of());
        }
    
        @Override
        public void heartbeat(String workerName, Map<String, Object> metrics) {
            workers.computeIfPresent(workerName, (name, state) -> {
                log.debug("Heartbeat received: {}", name);
                return new WorkerStateInternal(
                    name,
                    WorkerStatus.RUNNING,
                    Instant.now(),
                    0,
                    metrics,
                    state.registeredAt
                );
            });
        }
    
        @Override
        public WorkerState getWorkerState(String workerName) {
            WorkerStateInternal internal = workers.get(workerName);
            return internal != null ? internal.toPublic() : null;
        }
    
        @Override
        public Map<String, WorkerState> getAllWorkerStates() {
            return workers.entrySet().stream()
                .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    e -> e.getValue().toPublic()
                ));
        }
    
        @Override
        public boolean isWorkerHealthy(String workerName) {
            WorkerStateInternal state = workers.get(workerName);
            return state != null && state.status == WorkerStatus.RUNNING;
        }
    
        @Override
        public HealthStatus getGlobalHealth() {
            if (workers.isEmpty()) {
                return HealthStatus.HEALTHY;
            }
    
            boolean hasUnhealthy = workers.values().stream()
                .anyMatch(s -> s.status == WorkerStatus.UNHEALTHY || s.status == WorkerStatus.STALE);
    
            if (hasUnhealthy) {
                return HealthStatus.UNHEALTHY;
            }
    
            boolean hasDegraded = workers.values().stream()
                .anyMatch(s -> s.status == WorkerStatus.DEGRADED);
    
            if (hasDegraded) {
                return HealthStatus.DEGRADED;
            }
    
            return HealthStatus.HEALTHY;
        }
    
        @Override
        public List<String> getUnhealthyWorkers() {
            return workers.entrySet().stream()
                .filter(e -> e.getValue().status == WorkerStatus.UNHEALTHY
                          || e.getValue().status == WorkerStatus.STALE)
                .map(Map.Entry::getKey)
                .toList();
        }
    
        @Override
        public void start() {
            running = true;
            long checkInterval = config.getSupervisor().getCheckIntervalMs();
            scheduler.scheduleAtFixedRate(
                this::checkWorkers,
                checkInterval,
                checkInterval,
                TimeUnit.MILLISECONDS
            );
            log.info("Supervisor started with check interval: {}ms", checkInterval);
        }
    
        @Override
        public void stop() {
            running = false;
            scheduler.shutdown();
            log.info("Supervisor stopped");
        }
    
        private void checkWorkers() {
            if (!running) return;
    
            long heartbeatInterval = config.getSupervisor().getHeartbeatIntervalMs();
            int unhealthyThreshold = config.getSupervisor().getUnhealthyThreshold();
            long staleTimeout = config.getSupervisor().getStaleTimeoutMs();
    
            Instant now = Instant.now();
    
            workers.replaceAll((name, state) -> {
                if (state.status == WorkerStatus.STOPPED) {
                    return state;
                }
    
                long msSinceLastHeartbeat = Duration.between(state.lastHeartbeat, now).toMillis();
    
                // Stale check
                if (msSinceLastHeartbeat > staleTimeout) {
                    if (state.status != WorkerStatus.STALE) {
                        log.warn("Worker STALE: {} (no heartbeat for {}ms)", name, msSinceLastHeartbeat);
                    }
                    return state.withStatus(WorkerStatus.STALE);
                }
    
                // Missed heartbeat check
                int expectedHeartbeats = (int) (msSinceLastHeartbeat / heartbeatInterval);
                if (expectedHeartbeats > 0) {
                    int newMissedCount = state.missedHeartbeats + 1;
    
                    if (newMissedCount >= unhealthyThreshold) {
                        if (state.status != WorkerStatus.UNHEALTHY) {
                            log.warn("Worker UNHEALTHY: {} (missed {} heartbeats)", name, newMissedCount);
                        }
                        return state.withStatus(WorkerStatus.UNHEALTHY).withMissedCount(newMissedCount);
                    } else {
                        if (state.status == WorkerStatus.RUNNING) {
                            log.info("Worker DEGRADED: {} (missed {} heartbeats)", name, newMissedCount);
                        }
                        return state.withStatus(WorkerStatus.DEGRADED).withMissedCount(newMissedCount);
                    }
                }
    
                return state;
            });
        }
    
        private record WorkerStateInternal(
            String workerName,
            WorkerStatus status,
            Instant lastHeartbeat,
            int missedHeartbeats,
            Map<String, Object> lastMetrics,
            Instant registeredAt
        ) {
            WorkerState toPublic() {
                return new WorkerState(workerName, status, lastHeartbeat, missedHeartbeats, lastMetrics, registeredAt);
            }
    
            WorkerStateInternal withStatus(WorkerStatus newStatus) {
                return new WorkerStateInternal(workerName, newStatus, lastHeartbeat, missedHeartbeats, lastMetrics, registeredAt);
            }
    
            WorkerStateInternal withMissedCount(int newCount) {
                return new WorkerStateInternal(workerName, status, lastHeartbeat, newCount, lastMetrics, registeredAt);
            }
        }
    }
    

    7. Heartbeat depuis les Workers

    7.1 Heartbeat manuel

    @Component
    public class MyWorker implements Worker {
    
        @Autowired
        private Supervisor supervisor;
    
        @Override
        public void doWork() {
            // Envoyer heartbeat avec métriques
            supervisor.heartbeat(getName(), Map.of(
                "processed", processedCount,
                "queueSize", queue.size()
            ));
    
            // Traitement...
        }
    }
    

    7.2 Heartbeat automatique via AbstractWorker

    public abstract class AbstractWorker implements Worker {
    
        @Autowired
        private Supervisor supervisor;
    
        @Override
        public final void doWork() {
            // Heartbeat automatique
            supervisor.heartbeat(getName(), getStats());
    
            // Appel au traitement réel
            doProcess();
        }
    
        protected abstract void doProcess();
    }
    

    7.3 Heartbeat via thread dédié

    @Component
    public class LongRunningWorker implements Worker {
    
        @Autowired
        private Supervisor supervisor;
    
        private ScheduledExecutorService heartbeatExecutor;
    
        @Override
        public void start() {
            heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
            heartbeatExecutor.scheduleAtFixedRate(
                () -> supervisor.heartbeat(getName()),
                0, 10, TimeUnit.SECONDS
            );
        }
    
        @Override
        public void stop() {
            if (heartbeatExecutor != null) {
                heartbeatExecutor.shutdown();
            }
        }
    
        @Override
        public void doWork() {
            // Long processing - heartbeat handled by separate thread
            processLongTask();
        }
    }
    

    8. API REST

    8.1 Endpoints

    @RestController
    @RequestMapping("/admin")
    public class SupervisorController {
    
        @Autowired
        private Supervisor supervisor;
    
        @GetMapping("/health")
        public ResponseEntity<HealthResponse> health() {
            HealthStatus status = supervisor.getGlobalHealth();
            return ResponseEntity
                .status(status == HealthStatus.HEALTHY ? 200 : 503)
                .body(new HealthResponse(status, supervisor.getUnhealthyWorkers()));
        }
    
        @GetMapping("/workers")
        public Map<String, WorkerState> workers() {
            return supervisor.getAllWorkerStates();
        }
    
        @GetMapping("/workers/{name}")
        public ResponseEntity<WorkerState> worker(@PathVariable String name) {
            WorkerState state = supervisor.getWorkerState(name);
            return state != null
                ? ResponseEntity.ok(state)
                : ResponseEntity.notFound().build();
        }
    }
    

    8.2 Réponses

    // GET /admin/health
    {
      "status": "HEALTHY",
      "unhealthyWorkers": []
    }
    
    // GET /admin/workers
    {
      "kafka-consumer": {
        "workerName": "kafka-consumer",
        "status": "RUNNING",
        "lastHeartbeat": "2025-12-09T10:30:00Z",
        "missedHeartbeats": 0,
        "lastMetrics": {
          "processed": 12345,
          "lag": 23
        },
        "registeredAt": "2025-12-09T10:00:00Z"
      },
      "order-processor": {
        "workerName": "order-processor",
        "status": "DEGRADED",
        "lastHeartbeat": "2025-12-09T10:29:45Z",
        "missedHeartbeats": 1,
        "lastMetrics": {},
        "registeredAt": "2025-12-09T10:00:00Z"
      }
    }
    

    9. Intégration Kubernetes

    9.1 Liveness Probe

    livenessProbe:
      httpGet:
        path: /admin/health
        port: 8080
      initialDelaySeconds: 30
      periodSeconds: 10
      failureThreshold: 3
    

    9.2 Readiness Probe

    readinessProbe:
      httpGet:
        path: /admin/health
        port: 8080
      initialDelaySeconds: 5
      periodSeconds: 5
    

    9.3 Health Controller adapté

    @GetMapping("/health/live")
    public ResponseEntity<Void> live() {
        // Liveness: l'application répond
        return ResponseEntity.ok().build();
    }
    
    @GetMapping("/health/ready")
    public ResponseEntity<Void> ready() {
        // Readiness: tous les workers sont healthy
        HealthStatus status = supervisor.getGlobalHealth();
        return status == HealthStatus.HEALTHY
            ? ResponseEntity.ok().build()
            : ResponseEntity.status(503).build();
    }
    

    10. Métriques Prometheus

    @Component
    public class SupervisorMetrics {
    
        private final Supervisor supervisor;
        private final MeterRegistry registry;
    
        @PostConstruct
        public void registerMetrics() {
            Gauge.builder("socle_workers_total", supervisor,
                s -> s.getAllWorkerStates().size())
                .register(registry);
    
            Gauge.builder("socle_workers_healthy", supervisor,
                s -> s.getAllWorkerStates().values().stream()
                    .filter(WorkerState::isHealthy).count())
                .register(registry);
    
            Gauge.builder("socle_workers_unhealthy", supervisor,
                s -> s.getUnhealthyWorkers().size())
                .register(registry);
        }
    }
    

    11. Bonnes pratiques

    DO

    • Envoyer des heartbeats réguliers depuis tous les workers actifs
    • Inclure des métriques utiles dans les heartbeats
    • Configurer des timeouts adaptés à vos workers
    • Utiliser les probes Kubernetes pour la haute disponibilité

    DON’T

    • Ne pas oublier d’envoyer des heartbeats dans les workers long-running
    • Ne pas ignorer les états DEGRADED
    • Ne pas configurer des timeouts trop courts (faux positifs)
    • Ne pas bloquer l’envoi de heartbeat avec du traitement lourd

    12. Références

  • Socle V004 – Pipeline

    Socle V004 – Pipeline

    09 – Pipeline Engine

    Version : 4.1.0 Date : 2025-12-28

    1. Introduction

    Le PipelineEngine permet de définir des chaînes de traitement composées d’étapes (steps) exécutées séquentiellement ou en parallèle.

    Caractéristiques

    • Définition déclarative des pipelines
    • Exécution séquentielle ou parallèle
    • Gestion des erreurs et retry
    • Context partagé entre étapes
    • Métriques et logging intégrés

    2. Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                      PipelineEngine                          │
    └─────────────────────────────────────────────────────────────┘
                                  │
                                  ▼
    ┌─────────────────────────────────────────────────────────────┐
    │                      Pipeline                                │
    │  ┌─────────────────────────────────────────────────────┐    │
    │  │              PipelineContext                         │    │
    │  │  - input data                                        │    │
    │  │  - shared state                                      │    │
    │  │  - results                                           │    │
    │  └─────────────────────────────────────────────────────┘    │
    │                                                              │
    │  ┌──────────┐   ┌──────────┐   ┌──────────┐                │
    │  │  Step 1  │──►│  Step 2  │──►│  Step 3  │                │
    │  └──────────┘   └──────────┘   └──────────┘                │
    │                                                              │
    └─────────────────────────────────────────────────────────────┘
    

    3. Interface Pipeline

    package eu.lmvi.socle.pipeline;
    
    public interface Pipeline<I, O> {
    
        /**
         * Nom du pipeline
         */
        String getName();
    
        /**
         * Exécute le pipeline
         */
        PipelineResult<O> execute(I input);
    
        /**
         * Exécute le pipeline avec context
         */
        PipelineResult<O> execute(I input, PipelineContext context);
    
        /**
         * Liste des étapes
         */
        List<PipelineStep<?, ?>> getSteps();
    }
    

    4. Interface PipelineStep

    package eu.lmvi.socle.pipeline;
    
    public interface PipelineStep<I, O> {
    
        /**
         * Nom de l'étape
         */
        String getName();
    
        /**
         * Exécute l'étape
         */
        StepResult<O> execute(I input, PipelineContext context);
    
        /**
         * L'étape peut-elle être retryée ?
         */
        default boolean isRetryable() {
            return true;
        }
    
        /**
         * Nombre max de retries
         */
        default int getMaxRetries() {
            return 3;
        }
    
        /**
         * L'étape est-elle optionnelle ?
         */
        default boolean isOptional() {
            return false;
        }
    }
    

    5. Context et Result

    5.1 PipelineContext

    package eu.lmvi.socle.pipeline;
    
    public class PipelineContext {
        private final Map<String, Object> attributes = new ConcurrentHashMap<>();
        private final List<StepResult<?>> stepResults = new ArrayList<>();
        private final String correlationId;
        private final Instant startTime;
    
        public PipelineContext() {
            this.correlationId = UUID.randomUUID().toString();
            this.startTime = Instant.now();
        }
    
        public void put(String key, Object value) {
            attributes.put(key, value);
        }
    
        public <T> Optional<T> get(String key, Class<T> type) {
            return Optional.ofNullable(attributes.get(key))
                .filter(type::isInstance)
                .map(type::cast);
        }
    
        public void addStepResult(StepResult<?> result) {
            stepResults.add(result);
        }
    
        public List<StepResult<?>> getStepResults() {
            return Collections.unmodifiableList(stepResults);
        }
    
        public String getCorrelationId() {
            return correlationId;
        }
    
        public Duration getElapsedTime() {
            return Duration.between(startTime, Instant.now());
        }
    }
    

    5.2 StepResult

    package eu.lmvi.socle.pipeline;
    
    public record StepResult<T>(
        String stepName,
        StepStatus status,
        T output,
        Exception error,
        Duration duration,
        int attempts
    ) {
        public boolean isSuccess() {
            return status == StepStatus.SUCCESS;
        }
    
        public boolean isFailure() {
            return status == StepStatus.FAILURE;
        }
    
        public boolean isSkipped() {
            return status == StepStatus.SKIPPED;
        }
    
        public static <T> StepResult<T> success(String name, T output, Duration duration) {
            return new StepResult<>(name, StepStatus.SUCCESS, output, null, duration, 1);
        }
    
        public static <T> StepResult<T> failure(String name, Exception error, Duration duration, int attempts) {
            return new StepResult<>(name, StepStatus.FAILURE, null, error, duration, attempts);
        }
    
        public static <T> StepResult<T> skipped(String name) {
            return new StepResult<>(name, StepStatus.SKIPPED, null, null, Duration.ZERO, 0);
        }
    }
    
    public enum StepStatus {
        SUCCESS,
        FAILURE,
        SKIPPED
    }
    

    5.3 PipelineResult

    package eu.lmvi.socle.pipeline;
    
    public record PipelineResult<T>(
        String pipelineName,
        PipelineStatus status,
        T output,
        PipelineContext context,
        Duration totalDuration
    ) {
        public boolean isSuccess() {
            return status == PipelineStatus.SUCCESS;
        }
    
        public List<StepResult<?>> getFailedSteps() {
            return context.getStepResults().stream()
                .filter(StepResult::isFailure)
                .toList();
        }
    }
    
    public enum PipelineStatus {
        SUCCESS,
        PARTIAL_SUCCESS,
        FAILURE
    }
    

    6. Implémentation

    6.1 DefaultPipelineEngine

    package eu.lmvi.socle.pipeline;
    
    @Component
    public class DefaultPipelineEngine implements PipelineEngine {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultPipelineEngine.class);
    
        @Override
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input) {
            return execute(pipeline, input, new PipelineContext());
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input, PipelineContext context) {
            log.info("[{}] Starting pipeline execution", pipeline.getName());
            Instant start = Instant.now();
    
            Object currentInput = input;
            O finalOutput = null;
            boolean hasFailure = false;
    
            for (PipelineStep<?, ?> step : pipeline.getSteps()) {
                PipelineStep<Object, Object> typedStep = (PipelineStep<Object, Object>) step;
    
                StepResult<Object> result = executeStep(typedStep, currentInput, context);
                context.addStepResult(result);
    
                if (result.isSuccess()) {
                    currentInput = result.output();
                    finalOutput = (O) result.output();
                } else if (result.isFailure()) {
                    if (!step.isOptional()) {
                        hasFailure = true;
                        log.error("[{}] Pipeline failed at step: {}", pipeline.getName(), step.getName());
                        break;
                    }
                    log.warn("[{}] Optional step failed, continuing: {}", pipeline.getName(), step.getName());
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            PipelineStatus status = hasFailure
                ? PipelineStatus.FAILURE
                : (context.getStepResults().stream().anyMatch(StepResult::isFailure)
                    ? PipelineStatus.PARTIAL_SUCCESS
                    : PipelineStatus.SUCCESS);
    
            log.info("[{}] Pipeline completed with status: {} in {}ms",
                pipeline.getName(), status, duration.toMillis());
    
            return new PipelineResult<>(pipeline.getName(), status, finalOutput, context, duration);
        }
    
        private <I, O> StepResult<O> executeStep(PipelineStep<I, O> step, I input, PipelineContext context) {
            log.debug("[{}] Executing step", step.getName());
            Instant start = Instant.now();
            int attempts = 0;
            Exception lastError = null;
    
            while (attempts < step.getMaxRetries()) {
                attempts++;
                try {
                    StepResult<O> result = step.execute(input, context);
                    if (result.isSuccess()) {
                        Duration duration = Duration.between(start, Instant.now());
                        log.debug("[{}] Step completed in {}ms", step.getName(), duration.toMillis());
                        return StepResult.success(step.getName(), result.output(), duration);
                    }
                    lastError = result.error();
                } catch (Exception e) {
                    lastError = e;
                    log.warn("[{}] Step attempt {} failed: {}", step.getName(), attempts, e.getMessage());
    
                    if (!step.isRetryable() || attempts >= step.getMaxRetries()) {
                        break;
                    }
    
                    // Backoff
                    try {
                        Thread.sleep(attempts * 1000L);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            return StepResult.failure(step.getName(), lastError, duration, attempts);
        }
    }
    

    6.2 Pipeline Builder

    package eu.lmvi.socle.pipeline;
    
    public class PipelineBuilder<I, O> {
    
        private final String name;
        private final List<PipelineStep<?, ?>> steps = new ArrayList<>();
    
        public PipelineBuilder(String name) {
            this.name = name;
        }
    
        public static <I, O> PipelineBuilder<I, O> create(String name) {
            return new PipelineBuilder<>(name);
        }
    
        public <NO> PipelineBuilder<I, NO> addStep(PipelineStep<O, NO> step) {
            steps.add(step);
            return (PipelineBuilder<I, NO>) this;
        }
    
        public <NO> PipelineBuilder<I, NO> addStep(String name, Function<O, NO> processor) {
            return addStep(new FunctionalStep<>(name, processor));
        }
    
        public Pipeline<I, O> build() {
            return new DefaultPipeline<>(name, List.copyOf(steps));
        }
    
        private record FunctionalStep<I, O>(String name, Function<I, O> processor) implements PipelineStep<I, O> {
    
            @Override
            public String getName() {
                return name;
            }
    
            @Override
            public StepResult<O> execute(I input, PipelineContext context) {
                O output = processor.apply(input);
                return StepResult.success(name, output, Duration.ZERO);
            }
        }
    }
    

    7. Utilisation

    7.1 Pipeline simple

    @Component
    public class OrderPipeline {
    
        @Autowired
        private PipelineEngine engine;
    
        public PipelineResult<OrderResult> processOrder(Order order) {
            Pipeline<Order, OrderResult> pipeline = PipelineBuilder
                .<Order, OrderResult>create("order-processing")
                .addStep("validate", this::validateOrder)
                .addStep("enrich", this::enrichOrder)
                .addStep("process", this::processOrder)
                .addStep("notify", this::notifyCustomer)
                .build();
    
            return engine.execute(pipeline, order);
        }
    
        private ValidatedOrder validateOrder(Order order) {
            // Validation...
            return new ValidatedOrder(order);
        }
    
        private EnrichedOrder enrichOrder(ValidatedOrder order) {
            // Enrichissement...
            return new EnrichedOrder(order);
        }
    
        private ProcessedOrder processOrder(EnrichedOrder order) {
            // Traitement...
            return new ProcessedOrder(order);
        }
    
        private OrderResult notifyCustomer(ProcessedOrder order) {
            // Notification...
            return new OrderResult(order, "SUCCESS");
        }
    }
    

    7.2 Étapes personnalisées

    public class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
    
        @Override
        public String getName() {
            return "validation";
        }
    
        @Override
        public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
            List<String> errors = new ArrayList<>();
    
            if (input.getCustomerId() == null) {
                errors.add("Missing customer ID");
            }
            if (input.getItems().isEmpty()) {
                errors.add("No items in order");
            }
    
            if (!errors.isEmpty()) {
                return StepResult.failure(getName(),
                    new ValidationException(errors),
                    Duration.ZERO, 1);
            }
    
            // Stocker des données dans le context
            context.put("customerId", input.getCustomerId());
    
            return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
        }
    
        @Override
        public boolean isRetryable() {
            return false;  // Validation ne doit pas être retryée
        }
    }
    

    7.3 Étape optionnelle

    public class NotificationStep implements PipelineStep<ProcessedOrder, ProcessedOrder> {
    
        @Override
        public String getName() {
            return "notification";
        }
    
        @Override
        public StepResult<ProcessedOrder> execute(ProcessedOrder input, PipelineContext context) {
            try {
                sendNotification(input);
                return StepResult.success(getName(), input, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    
        @Override
        public boolean isOptional() {
            return true;  // Le pipeline continue même si la notif échoue
        }
    }
    

    7.4 Utilisation du context

    public class EnrichmentStep implements PipelineStep<ValidatedOrder, EnrichedOrder> {
    
        @Autowired
        private CustomerService customerService;
    
        @Override
        public String getName() {
            return "enrichment";
        }
    
        @Override
        public StepResult<EnrichedOrder> execute(ValidatedOrder input, PipelineContext context) {
            // Lire depuis le context
            String customerId = context.get("customerId", String.class).orElseThrow();
    
            // Enrichir
            Customer customer = customerService.getCustomer(customerId);
            EnrichedOrder enriched = new EnrichedOrder(input, customer);
    
            // Écrire dans le context pour les étapes suivantes
            context.put("customerEmail", customer.getEmail());
    
            return StepResult.success(getName(), enriched, Duration.ZERO);
        }
    }
    

    8. Gestion des erreurs

    8.1 Retry automatique

    public class ExternalApiStep implements PipelineStep<Data, ApiResponse> {
    
        @Override
        public String getName() {
            return "external-api-call";
        }
    
        @Override
        public boolean isRetryable() {
            return true;
        }
    
        @Override
        public int getMaxRetries() {
            return 5;  // 5 tentatives max
        }
    
        @Override
        public StepResult<ApiResponse> execute(Data input, PipelineContext context) {
            // Appel API qui peut échouer
            ApiResponse response = callApi(input);
            return StepResult.success(getName(), response, Duration.ZERO);
        }
    }
    

    8.2 Traitement des résultats

    PipelineResult<OrderResult> result = engine.execute(pipeline, order);
    
    if (result.isSuccess()) {
        log.info("Order processed successfully: {}", result.output());
    } else {
        // Analyser les étapes en échec
        for (StepResult<?> stepResult : result.getFailedSteps()) {
            log.error("Step {} failed after {} attempts: {}",
                stepResult.stepName(),
                stepResult.attempts(),
                stepResult.error().getMessage());
        }
    
        // Décider quoi faire
        if (result.status() == PipelineStatus.PARTIAL_SUCCESS) {
            // Certaines étapes optionnelles ont échoué
            handlePartialSuccess(result);
        } else {
            // Échec complet
            handleFailure(result);
        }
    }
    

    9. Pipelines parallèles

    public class ParallelEnrichmentStep implements PipelineStep<Order, EnrichedOrder> {
    
        @Autowired
        private ExecutorService executor;
    
        @Override
        public StepResult<EnrichedOrder> execute(Order input, PipelineContext context) {
            // Exécuter plusieurs enrichissements en parallèle
            CompletableFuture<Customer> customerFuture = CompletableFuture.supplyAsync(
                () -> customerService.getCustomer(input.getCustomerId()), executor);
    
            CompletableFuture<Inventory> inventoryFuture = CompletableFuture.supplyAsync(
                () -> inventoryService.checkInventory(input.getItems()), executor);
    
            CompletableFuture<Pricing> pricingFuture = CompletableFuture.supplyAsync(
                () -> pricingService.calculatePrice(input), executor);
    
            try {
                CompletableFuture.allOf(customerFuture, inventoryFuture, pricingFuture).join();
    
                EnrichedOrder enriched = new EnrichedOrder(
                    input,
                    customerFuture.get(),
                    inventoryFuture.get(),
                    pricingFuture.get()
                );
    
                return StepResult.success(getName(), enriched, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    }
    

    10. Métriques

    @Component
    public class PipelineMetrics {
    
        private final MeterRegistry registry;
    
        public void recordPipelineExecution(PipelineResult<?> result) {
            Timer.builder("socle_pipeline_duration")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .record(result.totalDuration());
    
            Counter.builder("socle_pipeline_executions")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .increment();
        }
    
        public void recordStepExecution(StepResult<?> result) {
            Timer.builder("socle_pipeline_step_duration")
                .tag("step", result.stepName())
                .tag("status", result.status().name())
                .register(registry)
                .record(result.duration());
        }
    }
    

    11. Bonnes pratiques

    DO

    • Garder les étapes petites et focalisées
    • Utiliser le context pour partager des données entre étapes
    • Marquer les étapes non-critiques comme optionnelles
    • Logger au niveau des étapes pour le debugging
    • Configurer des retries appropriés pour les appels externes

    DON’T

    • Ne pas faire de pipelines avec trop d’étapes (> 10)
    • Ne pas mélanger logique métier et infrastructure dans une étape
    • Ne pas ignorer les erreurs des étapes obligatoires
    • Ne pas utiliser de pipelines pour du traitement simple

    12. Références

    Pipeline Engine V2 (Nouveau)

    Ajouté en version 4.1.0

    13. Introduction Pipeline V2

    Le Pipeline V2 est une refonte complète du système de pipelines, conçue pour les applications à haute disponibilité nécessitant :

    • Garantie at-least-once : Aucune perte de message grâce au pattern Queue/Claim/Ack
    • Virtual Threads (Java 21) : Scalabilité maximale avec les threads virtuels
    • Reprise après crash : État persisté permettant la reprise au dernier stage OK
    • DLQ (Dead Letter Queue) : Gestion des échecs définitifs avec replay

    Quand utiliser V2 vs V1 ?

    Critère Pipeline V1 Pipeline V2
    Exécution Synchrone, mono-thread Asynchrone, multi-thread
    Garantie de livraison At-most-once At-least-once
    Persistance Non Oui (TechDB / H2)
    DLQ Non Oui
    Use case Traitement simple Flux critiques, haute disponibilité

    14. Architecture V2

    ┌─────────────────────────────────────────────────────────────────┐
    │                            MOP                                   │
    │               (supervision, restart, health)                     │
    └───────────────────────────┬─────────────────────────────────────┘
                                │
    ┌───────────────────────────▼─────────────────────────────────────┐
    │                     PipelineV2                                   │
    │                                                                  │
    │  ┌────────────┐     ┌────────────┐     ┌────────────┐          │
    │  │  Stage A   │────►│  Stage B   │────►│  Stage C   │          │
    │  │ ┌────────┐ │     │ ┌────────┐ │     │ ┌────────┐ │          │
    │  │ │ Queue  │ │     │ │ Queue  │ │     │ │ Queue  │ │          │
    │  │ └────────┘ │     │ └────────┘ │     │ └────────┘ │          │
    │  │ VThreads  │     │ VThreads  │     │ VThreads  │          │
    │  └────────────┘     └────────────┘     └────────────┘          │
    │                                                                  │
    │         ┌─────────────────┴─────────────────┐                   │
    │    ┌────▼────┐                        ┌────▼────┐              │
    │    │ Context │                        │   DLQ   │              │
    │    │ (state) │                        │ (errors)│              │
    │    └─────────┘                        └─────────┘              │
    └─────────────────────────────────────────────────────────────────┘
    

    Composants

    • StageQueue : Queue avec sémantique Claim/Ack entre chaque stage
    • VThreadStageExecutor : Workers Virtual Threads consommant les queues
    • PersistentPipelineContext : État persisté pour reprise après crash
    • DLQ : Messages en échec après max retries

    15. Création d’un Pipeline V2

    15.1 Builder Fluent

    import eu.lmvi.socle.pipeline.v2.PipelineBuilderV2;
    import eu.lmvi.socle.pipeline.v2.PipelineV2;
    
    // Création du pipeline
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .description("Pipeline de traitement des commandes")
    
        // Stage 1: Validation (2 workers, timeout 30s)
        .addStage("validate", this::validateOrder)
            .concurrency(2)
            .timeout(Duration.ofSeconds(30))
    
        // Stage 2: Enrichissement (4 workers, queue 1000)
        .addStage("enrich", this::enrichOrder)
            .concurrency(4)
            .queueSize(1000)
    
        // Stage 3: Publication (2 workers)
        .addStage("publish", this::publishOrder)
            .concurrency(2)
            .maxRetries(5)
            .backoff(Duration.ofMillis(200), 2.0)
    
        .build();
    

    15.2 Configuration par stage

    .addStage("nom", processor)
        .concurrency(4)           // Nombre de workers parallèles
        .queueSize(500)           // Taille max de la queue
        .timeout(Duration.ofMinutes(5))  // Timeout par message
        .maxRetries(3)            // Tentatives avant DLQ
        .backoff(base, multiplier) // Backoff exponentiel
        .optional()               // Stage optionnel (échec non bloquant)
    

    16. Cycle de vie

    16.1 Démarrage/Arrêt

    // Démarrer le pipeline (lance tous les workers)
    pipeline.start();
    
    // Vérifier l'état
    boolean running = pipeline.isRunning();
    
    // Arrêt gracieux (attend la fin des traitements en cours)
    pipeline.stop();
    
    // Arrêt immédiat (interrompt tout)
    pipeline.stopNow();
    

    16.2 Soumission de messages

    // Soumettre un message
    String messageId = pipeline.submit(order, "exec-001");
    
    // Avec métadonnées
    String messageId = pipeline.submit(order, "exec-001", Map.of(
        "source", "api",
        "priority", "high"
    ));
    

    16.3 Suivi d’exécution

    // État d'une exécution
    Optional<PipelineState> state = pipeline.getExecutionState("exec-001");
    state.ifPresent(s -> {
        System.out.println("Status: " + s.status());
        System.out.println("Current stage: " + s.currentStage());
        System.out.println("Duration: " + s.duration());
    });
    
    // Liste des exécutions actives
    List<PipelineState> active = pipeline.listActiveExecutions();
    
    // Statistiques globales
    PipelineV2.PipelineStats stats = pipeline.getStats();
    System.out.println("Submitted: " + stats.totalSubmitted());
    System.out.println("Completed: " + stats.totalCompleted());
    System.out.println("Failed: " + stats.totalFailed());
    

    17. Queue/Claim/Ack

    17.1 Pattern de consommation

    Le Pipeline V2 utilise un pattern de consommation fiable :

    1. CLAIM   : Le worker réserve un message (lease)
    2. PROCESS : Le worker traite le message
    3. ACK     : Succès → message supprimé
       ou
       NACK    : Échec → retry ou DLQ
    

    17.2 Lease et récupération

    Si un worker crash pendant le traitement :

    • Le lease expire après le timeout configuré
    • Le message est automatiquement remis en queue
    • Un autre worker le reprend
    // Timeout de lease par défaut: 5 minutes
    // Configurable par stage via timeout()
    

    17.3 Implémentations disponibles

    Implémentation Persistance Use case
    InMemoryStageQueue Non Développement, tests
    TechDbStageQueue Oui (H2) Production

    18. Dead Letter Queue (DLQ)

    18.1 Envoi en DLQ

    Un message est envoyé en DLQ après :

    • maxRetries tentatives échouées
    • Une erreur non-retryable

    18.2 Consultation

    // Via API REST
    GET /admin/pipelines/v2/{name}/dlq
    
    // Programmatiquement (via StageQueue)
    List<DlqMessage<Order>> messages = queue.peekDlq(100);
    for (DlqMessage<Order> msg : messages) {
        System.out.println("ID: " + msg.messageId());
        System.out.println("Error: " + msg.errorMessage());
        System.out.println("Attempts: " + msg.attempts());
        System.out.println("Failed at: " + msg.failedAt());
    }
    

    18.3 Replay

    // Via API REST
    POST /admin/pipelines/v2/{name}/dlq/{messageId}/replay
    
    // Programmatiquement
    boolean replayed = queue.replayFromDlq(messageId);
    
    // Replay tous les messages
    int count = queue.replayAllFromDlq("exec-001");
    

    18.4 Suppression

    // Via API REST
    DELETE /admin/pipelines/v2/{name}/dlq/{messageId}
    
    // Programmatiquement
    queue.deleteFromDlq(messageId);
    
    // Purge complète
    queue.purgeDlq();
    

    19. Persistance et reprise

    19.1 Context persistant

    Le Pipeline V2 peut persister son état pour survivre aux redémarrages :

    import eu.lmvi.socle.pipeline.context.TechDbPipelineContext;
    
    // Créer un context persistant
    Supplier<Connection> connectionSupplier = () -> dataSource.getConnection();
    PersistentPipelineContext context = new TechDbPipelineContext(connectionSupplier);
    
    // Utiliser avec le builder
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .context(context)  // Utilise le context persistant
        .addStage(...)
        .build();
    

    19.2 Reprise après crash

    Au redémarrage, les messages non-ackés sont automatiquement re-traités :

    1. Les messages en status CLAIMED dont le lease a expiré → remis en PENDING
    2. Les stages reprennent leur consommation normalement
    3. Grâce à l’idempotence, les messages déjà traités ne sont pas re-traités

    20. Idempotence (OBLIGATOIRE)

    20.1 Pourquoi ?

    Le modèle at-least-once signifie qu’un message peut être traité plusieurs fois :

    • Crash après traitement mais avant ACK
    • Timeout de lease pendant un traitement long
    • Replay depuis la DLQ

    Chaque stage DOIT être idempotent.

    20.2 Patterns recommandés

    // Pattern 1: Vérifier "déjà traité"
    public Order processOrder(Order order) {
        if (orderRepository.exists(order.getId())) {
            return orderRepository.get(order.getId()); // Déjà traité
        }
        // Traiter...
        return orderRepository.save(order);
    }
    
    // Pattern 2: UPSERT au lieu d'INSERT
    public void saveOrder(Order order) {
        orderRepository.upsert(order); // Idempotent par nature
    }
    
    // Pattern 3: Utiliser executionId comme clé
    public void processWithDedup(Order order, String executionId) {
        String dedupKey = "order:" + order.getId() + ":" + executionId;
        if (kvBus.exists(dedupKey)) {
            return; // Déjà traité pour cette exécution
        }
    
        // Traiter...
    
        kvBus.set(dedupKey, true, Duration.ofDays(7));
    }
    

    20.3 Anti-patterns

    // MAUVAIS: INSERT sans vérification
    db.insert(order); // Échoue si déjà inséré
    
    // MAUVAIS: Compteur incrémental
    counter.increment(); // Compté plusieurs fois en cas de replay
    
    // MAUVAIS: Email sans déduplication
    emailService.send(email); // Envoyé plusieurs fois
    

    21. API REST d’administration

    21.1 Endpoints disponibles

    GET  /admin/pipelines/v2                    → Liste des pipelines
    GET  /admin/pipelines/v2/{name}             → État d'un pipeline
    GET  /admin/pipelines/v2/{name}/stages      → État des stages
    GET  /admin/pipelines/v2/{name}/executions  → Exécutions en cours
    
    GET  /admin/pipelines/v2/{name}/dlq         → Messages en DLQ
    POST /admin/pipelines/v2/{name}/dlq/{id}/replay  → Rejouer un message
    DELETE /admin/pipelines/v2/{name}/dlq/{id}  → Supprimer de la DLQ
    POST /admin/pipelines/v2/{name}/dlq/replay-all   → Rejouer tous
    
    POST /admin/pipelines/v2/{name}/start       → Démarrer le pipeline
    POST /admin/pipelines/v2/{name}/stop        → Arrêter gracieusement
    POST /admin/pipelines/v2/{name}/stop-now    → Arrêter immédiatement
    

    21.2 Exemple de réponse

    GET /admin/pipelines/v2/order-processing
    
    {
      "timestamp": 1735387200000,
      "name": "order-processing",
      "description": "Pipeline de traitement des commandes",
      "running": true,
      "stats": {
        "stage_count": 3,
        "active_executions": 5,
        "total_submitted": 1500,
        "total_completed": 1480,
        "total_failed": 12
      }
    }
    

    22. Métriques Prometheus

    22.1 Métriques exposées

    # Compteurs
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="success"}
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="failed"}
    socle_pipeline_v2_dlq_total{pipeline="order-processing", stage="validate"}
    
    # Gauges
    socle_pipeline_v2_queue_pending{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_queue_processing{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_dlq_size{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_active_executions{pipeline="order-processing"}
    socle_pipeline_v2_running{pipeline="order-processing"}
    
    # Histogrammes (durée de traitement)
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.5"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.95"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.99"}
    

    22.2 Alertes recommandées

    # DLQ croissante
    - alert: PipelineDLQGrowing
      expr: increase(socle_pipeline_v2_dlq_total[1h]) > 10
      labels:
        severity: warning
    
    # Queue qui s'accumule (backpressure)
    - alert: PipelineQueueBacklog
      expr: socle_pipeline_v2_queue_pending > 1000
      for: 5m
      labels:
        severity: warning
    
    # Pipeline arrêté
    - alert: PipelineStopped
      expr: socle_pipeline_v2_running == 0
      for: 1m
      labels:
        severity: critical
    

    23. Bonnes pratiques V2

    DO

    • Toujours implémenter l’idempotence dans chaque stage
    • Utiliser TechDbPipelineContext en production pour la persistance
    • Configurer des maxRetries appropriés (3-5 pour API externes)
    • Monitorer les métriques DLQ et queue_pending
    • Traiter régulièrement les messages en DLQ (replay ou suppression)

    DON’T

    • Ne jamais supposer qu’un message ne sera traité qu’une fois
    • Ne pas ignorer les messages en DLQ
    • Ne pas mettre de timeout trop courts (risque de faux positifs)
    • Ne pas créer trop de stages (overhead de queues)
    • Ne pas oublier d’appeler pipeline.stop() à l’arrêt de l’application

    24. Migration V1 → V2

    24.1 Changements d’API

    V1 V2
    PipelineEngine.execute(pipeline, input) pipeline.submit(input, executionId)
    Pipeline<I,O> PipelineV2<I,O>
    PipelineBuilder PipelineBuilderV2
    Exécution synchrone Exécution asynchrone

    24.2 Étapes de migration

    1. Rendre les steps idempotents (obligatoire avant migration)
    2. Créer le pipeline avec PipelineBuilderV2
    3. Remplacer execute() par submit()
    4. Ajouter le suivi asynchrone des résultats si nécessaire
    5. Configurer la persistance (TechDbPipelineContext)
    6. Activer les métriques et monitoring DLQ

    25. Configuration Dynamique (YAML)

    Ajouté en version 4.1.0

    25.1 Introduction

    Le Pipeline V2 peut maintenant être configuré entièrement via YAML, permettant :

    • Activation/désactivation des stages sans recompiler
    • Configuration dynamique de la concurrence, timeouts, retries
    • Ajout de nouveaux stages sans modifier le code de la factory
    • Modularité des workers (un worker = un composant Spring)

    25.2 Architecture

    ┌─────────────────────────────────────────────────────────────────┐
    │                        application.yml                           │
    │  cdc:                                                            │
    │    pipeline:                                                     │
    │      stages:                                                     │
    │        - name: filter                                            │
    │          worker: filterWorker                                    │
    │          enabled: true                                           │
    └───────────────────────────────┬─────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │            PipelineYamlConfig (@ConfigurationProperties)          │
    │                                                                    │
    │  - name: String                                                    │
    │  - stages: List<StageYamlConfig>                                   │
    │  - getEnabledStages(): List<StageYamlConfig>                       │
    └───────────────────────────────┬───────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │                    DynamicPipelineBuilder                          │
    │                                                                    │
    │  .config(pipelineConfig)                                           │
    │  .workers(Map<String, PipelineStageWorker>)                        │
    │  .build() → PipelineV2<I, O>                                       │
    └───────────────────────────────────────────────────────────────────┘
    

    25.3 Interface PipelineStageWorker

    Chaque stage est implémenté comme un composant Spring implémentant PipelineStageWorker :

    package eu.lmvi.socle.pipeline.v2.worker;
    
    public interface PipelineStageWorker<I, O> {
    
        /**
         * Nom unique du worker (utilisé dans la config YAML)
         */
        String getName();
    
        /**
         * Initialise le worker (chargement ressources, compilation scripts, etc.)
         */
        default void initialize() throws Exception {}
    
        /**
         * Traite un élément d'entrée (DOIT être thread-safe)
         */
        O process(I input) throws Exception;
    
        /**
         * Libère les ressources
         */
        default void shutdown() {}
    
        /**
         * Indique si ce worker est activé
         */
        default boolean isEnabled() { return true; }
    
        /**
         * Description pour le monitoring
         */
        default String getDescription() { return getName(); }
    
        /**
         * Ordre de priorité (plus bas = plus tôt)
         */
        default int getOrder() { return 100; }
    }
    

    25.4 Exemple d’implémentation

    @Component("filterWorker")
    public class FilterStageWorker implements PipelineStageWorker<WalEvent, PipelineMessage> {
    
        @Autowired
        private TableFilter tableFilter;
    
        @Override
        public String getName() {
            return "filter";
        }
    
        @Override
        public int getOrder() {
            return 10;  // Premier dans le pipeline
        }
    
        @Override
        public PipelineMessage process(WalEvent event) throws Exception {
            PipelineMessage message = new PipelineMessage(event);
            if (!tableFilter.isAllowed(event.getTable())) {
                message.markAsFiltered("table_not_in_whitelist");
            }
            return message;
        }
    }
    

    25.5 Configuration YAML

    cdc:
      pipeline:
        name: cdc-pipeline-v2
        description: Pipeline CDC PostgreSQL vers Kafka
        enabled: true
        stages:
          # Stage 1: Filtrage
          - name: filter
            worker: filterWorker        # Nom du bean Spring
            enabled: true
            concurrency: 1
            timeout-seconds: 5
            max-retries: 1
            order: 10
    
          # Stage 2: Transformation
          - name: transform
            worker: transformWorker
            enabled: true
            concurrency: 2
            timeout-seconds: 30
            order: 20
    
          # Stage 3a: Enrichissement Rules
          - name: enrich-rules
            worker: rulesEnrichmentWorker
            enabled: true
            concurrency: 2
            order: 30
    
          # Stage 3b: Enrichissement BeanShell
          - name: enrich-beanshell
            worker: beanshellEnrichmentWorker
            enabled: ${CDC_ENRICH_BSH_ENABLED:true}  # Variable d'environnement
            concurrency: 2
            order: 31
    
          # Stage 3c: Enrichissement JavaScript
          - name: enrich-javascript
            worker: javascriptEnrichmentWorker
            enabled: ${CDC_ENRICH_JS_ENABLED:true}
            concurrency: 4
            timeout-seconds: 60
            order: 32
    
          # Stage 3d: Enrichissement Janino (nouveau)
          - name: enrich-janino
            worker: janinoEnrichmentWorker
            enabled: ${CDC_ENRICH_JANINO_ENABLED:false}  # Désactivé par défaut
            concurrency: 2
            order: 33
    
          # Stage 4: Publication Kafka
          - name: publish
            worker: publishWorker
            enabled: true
            concurrency: 2
            max-retries: 5
            order: 40
    
          # Stage 5: Commit LSN
          - name: commit
            worker: commitWorker
            enabled: true
            concurrency: 1
            order: 50
    

    25.6 Classe de configuration Spring

    @Configuration
    @ConfigurationProperties(prefix = "cdc.pipeline")
    public class CdcPipelineConfig extends PipelineYamlConfig {
        // Hérite de toutes les propriétés
    }
    

    25.7 Construction du pipeline

    @Component
    public class CdcPipelineFactory {
    
        @Autowired
        private CdcPipelineConfig pipelineConfig;
    
        @Autowired
        private Map<String, PipelineStageWorker<?, ?>> availableWorkers;
    
        @PostConstruct
        public void init() {
            // Log des workers disponibles
            log.info("Available workers: {}", availableWorkers.keySet());
    
            // Construire le pipeline dynamiquement
            cdcPipeline = DynamicPipelineBuilder.create()
                .config(pipelineConfig)
                .workers(availableWorkers)
                .build();
    
            log.info("Pipeline created with {} stages",
                pipelineConfig.countEnabledStages());
        }
    }
    

    25.8 Avantages

    Aspect Avant (hardcodé) Après (YAML)
    Modification Recompilation Redémarrage
    Activation stage Code Java Variable d’env
    Ajout stage Modifier factory Ajouter worker + config
    Configuration Dans le code Centralisée
    Environnements Branches différentes Même code, config différente

    25.9 Activation/Désactivation à chaud

    Via variables d’environnement dans .env :

    # Désactiver BeanShell
    CDC_ENRICH_BSH_ENABLED=false
    
    # Activer Janino
    CDC_ENRICH_JANINO_ENABLED=true
    
    # Redémarrer le service
    sudo systemctl restart cdc-reflet-kafka-v02
    

    25.10 Ajout d’un nouveau type d’enrichissement

    1. Créer le worker :
    @Component("monNouveauWorker")
    public class MonNouveauEnrichmentWorker
        implements PipelineStageWorker<PipelineMessage, PipelineMessage> {
        // ...
    }
    
    1. Ajouter dans application.yml :
    stages:
      - name: enrich-nouveau
        worker: monNouveauWorker
        enabled: ${CDC_ENRICH_NOUVEAU_ENABLED:false}
        order: 34
    
    1. Redémarrer – le pipeline inclura automatiquement le nouveau stage.

    26. Références V2

  • Socle V004 – Supervisor

    Socle V004 – Supervisor

    08 – Supervisor

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Supervisor est le composant de supervision du Socle V4. Il surveille l’état de santé des Workers via heartbeats et expose des métriques de santé.

    Fonctionnalités

    • Collecte des heartbeats des Workers
    • Détection des Workers défaillants
    • Agrégation de l’état de santé global
    • Exposition via API REST et métriques
    • Intégration avec SharedDataRegistry

    2. Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                        Supervisor                            │
    │                                                              │
    │   ┌──────────────────┐    ┌──────────────────┐              │
    │   │ HeartbeatCollector│    │ HealthAggregator │              │
    │   └────────┬─────────┘    └────────┬─────────┘              │
    │            │                       │                         │
    │            ▼                       ▼                         │
    │   ┌──────────────────┐    ┌──────────────────┐              │
    │   │ Worker States    │    │ Global Health    │              │
    │   │ (per worker)     │    │ (aggregated)     │              │
    │   └──────────────────┘    └──────────────────┘              │
    │                                                              │
    └──────────────────────────────────────────────────────────────┘
             │                           │
             ▼                           ▼
    ┌─────────────────┐         ┌─────────────────┐
    │ /admin/health   │         │ /admin/workers  │
    └─────────────────┘         └─────────────────┘
    

    3. Configuration

    3.1 application.yml

    socle:
      supervisor:
        heartbeat-interval-ms: ${SUPERVISOR_HEARTBEAT_MS:10000}
        unhealthy-threshold: ${SUPERVISOR_UNHEALTHY_THRESHOLD:3}
        check-interval-ms: ${SUPERVISOR_CHECK_INTERVAL_MS:5000}
        stale-timeout-ms: ${SUPERVISOR_STALE_TIMEOUT_MS:60000}
    

    3.2 Variables d’environnement

    Variable Description Défaut
    SUPERVISOR_HEARTBEAT_MS Intervalle heartbeat attendu 10000 (10s)
    SUPERVISOR_UNHEALTHY_THRESHOLD Heartbeats manqués avant UNHEALTHY 3
    SUPERVISOR_CHECK_INTERVAL_MS Intervalle de vérification 5000 (5s)
    SUPERVISOR_STALE_TIMEOUT_MS Timeout avant worker STALE 60000 (1min)

    4. Interface Supervisor

    package eu.lmvi.socle.supervisor;
    
    public interface Supervisor {
    
        /**
         * Enregistre un worker dans le supervisor
         */
        void registerWorker(Worker worker);
    
        /**
         * Désenregistre un worker
         */
        void unregisterWorker(String workerName);
    
        /**
         * Reçoit un heartbeat d'un worker
         */
        void heartbeat(String workerName);
    
        /**
         * Reçoit un heartbeat avec métriques
         */
        void heartbeat(String workerName, Map<String, Object> metrics);
    
        /**
         * Récupère l'état d'un worker
         */
        WorkerState getWorkerState(String workerName);
    
        /**
         * Récupère l'état de tous les workers
         */
        Map<String, WorkerState> getAllWorkerStates();
    
        /**
         * Vérifie si un worker est healthy
         */
        boolean isWorkerHealthy(String workerName);
    
        /**
         * Récupère l'état de santé global
         */
        HealthStatus getGlobalHealth();
    
        /**
         * Liste les workers unhealthy
         */
        List<String> getUnhealthyWorkers();
    
        /**
         * Démarre la supervision
         */
        void start();
    
        /**
         * Arrête la supervision
         */
        void stop();
    }
    

    5. États des Workers

    5.1 WorkerState

    package eu.lmvi.socle.supervisor;
    
    public record WorkerState(
        String workerName,
        WorkerStatus status,
        Instant lastHeartbeat,
        int missedHeartbeats,
        Map<String, Object> lastMetrics,
        Instant registeredAt
    ) {
        public boolean isHealthy() {
            return status == WorkerStatus.RUNNING;
        }
    
        public boolean isStale() {
            return status == WorkerStatus.STALE;
        }
    }
    

    5.2 WorkerStatus

    public enum WorkerStatus {
        /**
         * Worker enregistré mais pas encore démarré
         */
        REGISTERED,
    
        /**
         * Worker en cours d'exécution, heartbeats reçus
         */
        RUNNING,
    
        /**
         * Heartbeats manqués mais pas encore timeout
         */
        DEGRADED,
    
        /**
         * Trop de heartbeats manqués, worker considéré unhealthy
         */
        UNHEALTHY,
    
        /**
         * Aucun heartbeat depuis longtemps, worker potentiellement mort
         */
        STALE,
    
        /**
         * Worker arrêté proprement
         */
        STOPPED
    }
    

    5.3 Diagramme d’états

                        ┌────────────────┐
                        │   REGISTERED   │
                        └───────┬────────┘
                                │ first heartbeat
                                ▼
                        ┌────────────────┐
             ┌─────────│    RUNNING     │─────────┐
             │         └───────┬────────┘         │
             │                 │                  │
             │ heartbeat       │ missed           │ stop()
             │ received        │ heartbeat        │
             │                 ▼                  │
             │         ┌────────────────┐         │
             └────────►│   DEGRADED     │         │
                       └───────┬────────┘         │
                               │ threshold        │
                               │ exceeded         │
                               ▼                  │
                       ┌────────────────┐         │
                       │   UNHEALTHY    │         │
                       └───────┬────────┘         │
                               │ stale            │
                               │ timeout          │
                               ▼                  │
                       ┌────────────────┐         │
                       │     STALE      │         │
                       └────────────────┘         │
                                                  │
                       ┌────────────────┐         │
                       │    STOPPED     │◄────────┘
                       └────────────────┘
    

    6. Implémentation

    package eu.lmvi.socle.supervisor;
    
    @Component
    public class DefaultSupervisor implements Supervisor {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultSupervisor.class);
    
        private final SocleConfiguration config;
        private final ConcurrentHashMap<String, WorkerStateInternal> workers = new ConcurrentHashMap<>();
        private final ScheduledExecutorService scheduler;
        private volatile boolean running = false;
    
        public DefaultSupervisor(SocleConfiguration config) {
            this.config = config;
            this.scheduler = Executors.newSingleThreadScheduledExecutor(
                r -> new Thread(r, "supervisor-checker"));
        }
    
        @Override
        public void registerWorker(Worker worker) {
            String name = worker.getName();
            workers.put(name, new WorkerStateInternal(
                name, WorkerStatus.REGISTERED, Instant.now(), 0, Map.of(), Instant.now()));
            log.info("Worker registered: {}", name);
        }
    
        @Override
        public void unregisterWorker(String workerName) {
            WorkerStateInternal state = workers.remove(workerName);
            if (state != null) {
                log.info("Worker unregistered: {}", workerName);
            }
        }
    
        @Override
        public void heartbeat(String workerName) {
            heartbeat(workerName, Map.of());
        }
    
        @Override
        public void heartbeat(String workerName, Map<String, Object> metrics) {
            workers.computeIfPresent(workerName, (name, state) -> {
                log.debug("Heartbeat received: {}", name);
                return new WorkerStateInternal(
                    name,
                    WorkerStatus.RUNNING,
                    Instant.now(),
                    0,
                    metrics,
                    state.registeredAt
                );
            });
        }
    
        @Override
        public WorkerState getWorkerState(String workerName) {
            WorkerStateInternal internal = workers.get(workerName);
            return internal != null ? internal.toPublic() : null;
        }
    
        @Override
        public Map<String, WorkerState> getAllWorkerStates() {
            return workers.entrySet().stream()
                .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    e -> e.getValue().toPublic()
                ));
        }
    
        @Override
        public boolean isWorkerHealthy(String workerName) {
            WorkerStateInternal state = workers.get(workerName);
            return state != null && state.status == WorkerStatus.RUNNING;
        }
    
        @Override
        public HealthStatus getGlobalHealth() {
            if (workers.isEmpty()) {
                return HealthStatus.HEALTHY;
            }
    
            boolean hasUnhealthy = workers.values().stream()
                .anyMatch(s -> s.status == WorkerStatus.UNHEALTHY || s.status == WorkerStatus.STALE);
    
            if (hasUnhealthy) {
                return HealthStatus.UNHEALTHY;
            }
    
            boolean hasDegraded = workers.values().stream()
                .anyMatch(s -> s.status == WorkerStatus.DEGRADED);
    
            if (hasDegraded) {
                return HealthStatus.DEGRADED;
            }
    
            return HealthStatus.HEALTHY;
        }
    
        @Override
        public List<String> getUnhealthyWorkers() {
            return workers.entrySet().stream()
                .filter(e -> e.getValue().status == WorkerStatus.UNHEALTHY
                          || e.getValue().status == WorkerStatus.STALE)
                .map(Map.Entry::getKey)
                .toList();
        }
    
        @Override
        public void start() {
            running = true;
            long checkInterval = config.getSupervisor().getCheckIntervalMs();
            scheduler.scheduleAtFixedRate(
                this::checkWorkers,
                checkInterval,
                checkInterval,
                TimeUnit.MILLISECONDS
            );
            log.info("Supervisor started with check interval: {}ms", checkInterval);
        }
    
        @Override
        public void stop() {
            running = false;
            scheduler.shutdown();
            log.info("Supervisor stopped");
        }
    
        private void checkWorkers() {
            if (!running) return;
    
            long heartbeatInterval = config.getSupervisor().getHeartbeatIntervalMs();
            int unhealthyThreshold = config.getSupervisor().getUnhealthyThreshold();
            long staleTimeout = config.getSupervisor().getStaleTimeoutMs();
    
            Instant now = Instant.now();
    
            workers.replaceAll((name, state) -> {
                if (state.status == WorkerStatus.STOPPED) {
                    return state;
                }
    
                long msSinceLastHeartbeat = Duration.between(state.lastHeartbeat, now).toMillis();
    
                // Stale check
                if (msSinceLastHeartbeat > staleTimeout) {
                    if (state.status != WorkerStatus.STALE) {
                        log.warn("Worker STALE: {} (no heartbeat for {}ms)", name, msSinceLastHeartbeat);
                    }
                    return state.withStatus(WorkerStatus.STALE);
                }
    
                // Missed heartbeat check
                int expectedHeartbeats = (int) (msSinceLastHeartbeat / heartbeatInterval);
                if (expectedHeartbeats > 0) {
                    int newMissedCount = state.missedHeartbeats + 1;
    
                    if (newMissedCount >= unhealthyThreshold) {
                        if (state.status != WorkerStatus.UNHEALTHY) {
                            log.warn("Worker UNHEALTHY: {} (missed {} heartbeats)", name, newMissedCount);
                        }
                        return state.withStatus(WorkerStatus.UNHEALTHY).withMissedCount(newMissedCount);
                    } else {
                        if (state.status == WorkerStatus.RUNNING) {
                            log.info("Worker DEGRADED: {} (missed {} heartbeats)", name, newMissedCount);
                        }
                        return state.withStatus(WorkerStatus.DEGRADED).withMissedCount(newMissedCount);
                    }
                }
    
                return state;
            });
        }
    
        private record WorkerStateInternal(
            String workerName,
            WorkerStatus status,
            Instant lastHeartbeat,
            int missedHeartbeats,
            Map<String, Object> lastMetrics,
            Instant registeredAt
        ) {
            WorkerState toPublic() {
                return new WorkerState(workerName, status, lastHeartbeat, missedHeartbeats, lastMetrics, registeredAt);
            }
    
            WorkerStateInternal withStatus(WorkerStatus newStatus) {
                return new WorkerStateInternal(workerName, newStatus, lastHeartbeat, missedHeartbeats, lastMetrics, registeredAt);
            }
    
            WorkerStateInternal withMissedCount(int newCount) {
                return new WorkerStateInternal(workerName, status, lastHeartbeat, newCount, lastMetrics, registeredAt);
            }
        }
    }
    

    7. Heartbeat depuis les Workers

    7.1 Heartbeat manuel

    @Component
    public class MyWorker implements Worker {
    
        @Autowired
        private Supervisor supervisor;
    
        @Override
        public void doWork() {
            // Envoyer heartbeat avec métriques
            supervisor.heartbeat(getName(), Map.of(
                "processed", processedCount,
                "queueSize", queue.size()
            ));
    
            // Traitement...
        }
    }
    

    7.2 Heartbeat automatique via AbstractWorker

    public abstract class AbstractWorker implements Worker {
    
        @Autowired
        private Supervisor supervisor;
    
        @Override
        public final void doWork() {
            // Heartbeat automatique
            supervisor.heartbeat(getName(), getStats());
    
            // Appel au traitement réel
            doProcess();
        }
    
        protected abstract void doProcess();
    }
    

    7.3 Heartbeat via thread dédié

    @Component
    public class LongRunningWorker implements Worker {
    
        @Autowired
        private Supervisor supervisor;
    
        private ScheduledExecutorService heartbeatExecutor;
    
        @Override
        public void start() {
            heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
            heartbeatExecutor.scheduleAtFixedRate(
                () -> supervisor.heartbeat(getName()),
                0, 10, TimeUnit.SECONDS
            );
        }
    
        @Override
        public void stop() {
            if (heartbeatExecutor != null) {
                heartbeatExecutor.shutdown();
            }
        }
    
        @Override
        public void doWork() {
            // Long processing - heartbeat handled by separate thread
            processLongTask();
        }
    }
    

    8. API REST

    8.1 Endpoints

    @RestController
    @RequestMapping("/admin")
    public class SupervisorController {
    
        @Autowired
        private Supervisor supervisor;
    
        @GetMapping("/health")
        public ResponseEntity<HealthResponse> health() {
            HealthStatus status = supervisor.getGlobalHealth();
            return ResponseEntity
                .status(status == HealthStatus.HEALTHY ? 200 : 503)
                .body(new HealthResponse(status, supervisor.getUnhealthyWorkers()));
        }
    
        @GetMapping("/workers")
        public Map<String, WorkerState> workers() {
            return supervisor.getAllWorkerStates();
        }
    
        @GetMapping("/workers/{name}")
        public ResponseEntity<WorkerState> worker(@PathVariable String name) {
            WorkerState state = supervisor.getWorkerState(name);
            return state != null
                ? ResponseEntity.ok(state)
                : ResponseEntity.notFound().build();
        }
    }
    

    8.2 Réponses

    // GET /admin/health
    {
      "status": "HEALTHY",
      "unhealthyWorkers": []
    }
    
    // GET /admin/workers
    {
      "kafka-consumer": {
        "workerName": "kafka-consumer",
        "status": "RUNNING",
        "lastHeartbeat": "2025-12-09T10:30:00Z",
        "missedHeartbeats": 0,
        "lastMetrics": {
          "processed": 12345,
          "lag": 23
        },
        "registeredAt": "2025-12-09T10:00:00Z"
      },
      "order-processor": {
        "workerName": "order-processor",
        "status": "DEGRADED",
        "lastHeartbeat": "2025-12-09T10:29:45Z",
        "missedHeartbeats": 1,
        "lastMetrics": {},
        "registeredAt": "2025-12-09T10:00:00Z"
      }
    }
    

    9. Intégration Kubernetes

    9.1 Liveness Probe

    livenessProbe:
      httpGet:
        path: /admin/health
        port: 8080
      initialDelaySeconds: 30
      periodSeconds: 10
      failureThreshold: 3
    

    9.2 Readiness Probe

    readinessProbe:
      httpGet:
        path: /admin/health
        port: 8080
      initialDelaySeconds: 5
      periodSeconds: 5
    

    9.3 Health Controller adapté

    @GetMapping("/health/live")
    public ResponseEntity<Void> live() {
        // Liveness: l'application répond
        return ResponseEntity.ok().build();
    }
    
    @GetMapping("/health/ready")
    public ResponseEntity<Void> ready() {
        // Readiness: tous les workers sont healthy
        HealthStatus status = supervisor.getGlobalHealth();
        return status == HealthStatus.HEALTHY
            ? ResponseEntity.ok().build()
            : ResponseEntity.status(503).build();
    }
    

    10. Métriques Prometheus

    @Component
    public class SupervisorMetrics {
    
        private final Supervisor supervisor;
        private final MeterRegistry registry;
    
        @PostConstruct
        public void registerMetrics() {
            Gauge.builder("socle_workers_total", supervisor,
                s -> s.getAllWorkerStates().size())
                .register(registry);
    
            Gauge.builder("socle_workers_healthy", supervisor,
                s -> s.getAllWorkerStates().values().stream()
                    .filter(WorkerState::isHealthy).count())
                .register(registry);
    
            Gauge.builder("socle_workers_unhealthy", supervisor,
                s -> s.getUnhealthyWorkers().size())
                .register(registry);
        }
    }
    

    11. Bonnes pratiques

    DO

    • Envoyer des heartbeats réguliers depuis tous les workers actifs
    • Inclure des métriques utiles dans les heartbeats
    • Configurer des timeouts adaptés à vos workers
    • Utiliser les probes Kubernetes pour la haute disponibilité

    DON’T

    • Ne pas oublier d’envoyer des heartbeats dans les workers long-running
    • Ne pas ignorer les états DEGRADED
    • Ne pas configurer des timeouts trop courts (faux positifs)
    • Ne pas bloquer l’envoi de heartbeat avec du traitement lourd

    12. Références

  • Socle V004 – Pipeline

    Socle V004 – Pipeline

    09 – Pipeline Engine

    Version : 4.1.0 Date : 2025-12-28

    1. Introduction

    Le PipelineEngine permet de définir des chaînes de traitement composées d’étapes (steps) exécutées séquentiellement ou en parallèle.

    Caractéristiques

    • Définition déclarative des pipelines
    • Exécution séquentielle ou parallèle
    • Gestion des erreurs et retry
    • Context partagé entre étapes
    • Métriques et logging intégrés

    2. Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                      PipelineEngine                          │
    └─────────────────────────────────────────────────────────────┘
                                  │
                                  ▼
    ┌─────────────────────────────────────────────────────────────┐
    │                      Pipeline                                │
    │  ┌─────────────────────────────────────────────────────┐    │
    │  │              PipelineContext                         │    │
    │  │  - input data                                        │    │
    │  │  - shared state                                      │    │
    │  │  - results                                           │    │
    │  └─────────────────────────────────────────────────────┘    │
    │                                                              │
    │  ┌──────────┐   ┌──────────┐   ┌──────────┐                │
    │  │  Step 1  │──►│  Step 2  │──►│  Step 3  │                │
    │  └──────────┘   └──────────┘   └──────────┘                │
    │                                                              │
    └─────────────────────────────────────────────────────────────┘
    

    3. Interface Pipeline

    package eu.lmvi.socle.pipeline;
    
    public interface Pipeline<I, O> {
    
        /**
         * Nom du pipeline
         */
        String getName();
    
        /**
         * Exécute le pipeline
         */
        PipelineResult<O> execute(I input);
    
        /**
         * Exécute le pipeline avec context
         */
        PipelineResult<O> execute(I input, PipelineContext context);
    
        /**
         * Liste des étapes
         */
        List<PipelineStep<?, ?>> getSteps();
    }
    

    4. Interface PipelineStep

    package eu.lmvi.socle.pipeline;
    
    public interface PipelineStep<I, O> {
    
        /**
         * Nom de l'étape
         */
        String getName();
    
        /**
         * Exécute l'étape
         */
        StepResult<O> execute(I input, PipelineContext context);
    
        /**
         * L'étape peut-elle être retryée ?
         */
        default boolean isRetryable() {
            return true;
        }
    
        /**
         * Nombre max de retries
         */
        default int getMaxRetries() {
            return 3;
        }
    
        /**
         * L'étape est-elle optionnelle ?
         */
        default boolean isOptional() {
            return false;
        }
    }
    

    5. Context et Result

    5.1 PipelineContext

    package eu.lmvi.socle.pipeline;
    
    public class PipelineContext {
        private final Map<String, Object> attributes = new ConcurrentHashMap<>();
        private final List<StepResult<?>> stepResults = new ArrayList<>();
        private final String correlationId;
        private final Instant startTime;
    
        public PipelineContext() {
            this.correlationId = UUID.randomUUID().toString();
            this.startTime = Instant.now();
        }
    
        public void put(String key, Object value) {
            attributes.put(key, value);
        }
    
        public <T> Optional<T> get(String key, Class<T> type) {
            return Optional.ofNullable(attributes.get(key))
                .filter(type::isInstance)
                .map(type::cast);
        }
    
        public void addStepResult(StepResult<?> result) {
            stepResults.add(result);
        }
    
        public List<StepResult<?>> getStepResults() {
            return Collections.unmodifiableList(stepResults);
        }
    
        public String getCorrelationId() {
            return correlationId;
        }
    
        public Duration getElapsedTime() {
            return Duration.between(startTime, Instant.now());
        }
    }
    

    5.2 StepResult

    package eu.lmvi.socle.pipeline;
    
    public record StepResult<T>(
        String stepName,
        StepStatus status,
        T output,
        Exception error,
        Duration duration,
        int attempts
    ) {
        public boolean isSuccess() {
            return status == StepStatus.SUCCESS;
        }
    
        public boolean isFailure() {
            return status == StepStatus.FAILURE;
        }
    
        public boolean isSkipped() {
            return status == StepStatus.SKIPPED;
        }
    
        public static <T> StepResult<T> success(String name, T output, Duration duration) {
            return new StepResult<>(name, StepStatus.SUCCESS, output, null, duration, 1);
        }
    
        public static <T> StepResult<T> failure(String name, Exception error, Duration duration, int attempts) {
            return new StepResult<>(name, StepStatus.FAILURE, null, error, duration, attempts);
        }
    
        public static <T> StepResult<T> skipped(String name) {
            return new StepResult<>(name, StepStatus.SKIPPED, null, null, Duration.ZERO, 0);
        }
    }
    
    public enum StepStatus {
        SUCCESS,
        FAILURE,
        SKIPPED
    }
    

    5.3 PipelineResult

    package eu.lmvi.socle.pipeline;
    
    public record PipelineResult<T>(
        String pipelineName,
        PipelineStatus status,
        T output,
        PipelineContext context,
        Duration totalDuration
    ) {
        public boolean isSuccess() {
            return status == PipelineStatus.SUCCESS;
        }
    
        public List<StepResult<?>> getFailedSteps() {
            return context.getStepResults().stream()
                .filter(StepResult::isFailure)
                .toList();
        }
    }
    
    public enum PipelineStatus {
        SUCCESS,
        PARTIAL_SUCCESS,
        FAILURE
    }
    

    6. Implémentation

    6.1 DefaultPipelineEngine

    package eu.lmvi.socle.pipeline;
    
    @Component
    public class DefaultPipelineEngine implements PipelineEngine {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultPipelineEngine.class);
    
        @Override
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input) {
            return execute(pipeline, input, new PipelineContext());
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input, PipelineContext context) {
            log.info("[{}] Starting pipeline execution", pipeline.getName());
            Instant start = Instant.now();
    
            Object currentInput = input;
            O finalOutput = null;
            boolean hasFailure = false;
    
            for (PipelineStep<?, ?> step : pipeline.getSteps()) {
                PipelineStep<Object, Object> typedStep = (PipelineStep<Object, Object>) step;
    
                StepResult<Object> result = executeStep(typedStep, currentInput, context);
                context.addStepResult(result);
    
                if (result.isSuccess()) {
                    currentInput = result.output();
                    finalOutput = (O) result.output();
                } else if (result.isFailure()) {
                    if (!step.isOptional()) {
                        hasFailure = true;
                        log.error("[{}] Pipeline failed at step: {}", pipeline.getName(), step.getName());
                        break;
                    }
                    log.warn("[{}] Optional step failed, continuing: {}", pipeline.getName(), step.getName());
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            PipelineStatus status = hasFailure
                ? PipelineStatus.FAILURE
                : (context.getStepResults().stream().anyMatch(StepResult::isFailure)
                    ? PipelineStatus.PARTIAL_SUCCESS
                    : PipelineStatus.SUCCESS);
    
            log.info("[{}] Pipeline completed with status: {} in {}ms",
                pipeline.getName(), status, duration.toMillis());
    
            return new PipelineResult<>(pipeline.getName(), status, finalOutput, context, duration);
        }
    
        private <I, O> StepResult<O> executeStep(PipelineStep<I, O> step, I input, PipelineContext context) {
            log.debug("[{}] Executing step", step.getName());
            Instant start = Instant.now();
            int attempts = 0;
            Exception lastError = null;
    
            while (attempts < step.getMaxRetries()) {
                attempts++;
                try {
                    StepResult<O> result = step.execute(input, context);
                    if (result.isSuccess()) {
                        Duration duration = Duration.between(start, Instant.now());
                        log.debug("[{}] Step completed in {}ms", step.getName(), duration.toMillis());
                        return StepResult.success(step.getName(), result.output(), duration);
                    }
                    lastError = result.error();
                } catch (Exception e) {
                    lastError = e;
                    log.warn("[{}] Step attempt {} failed: {}", step.getName(), attempts, e.getMessage());
    
                    if (!step.isRetryable() || attempts >= step.getMaxRetries()) {
                        break;
                    }
    
                    // Backoff
                    try {
                        Thread.sleep(attempts * 1000L);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            return StepResult.failure(step.getName(), lastError, duration, attempts);
        }
    }
    

    6.2 Pipeline Builder

    package eu.lmvi.socle.pipeline;
    
    public class PipelineBuilder<I, O> {
    
        private final String name;
        private final List<PipelineStep<?, ?>> steps = new ArrayList<>();
    
        public PipelineBuilder(String name) {
            this.name = name;
        }
    
        public static <I, O> PipelineBuilder<I, O> create(String name) {
            return new PipelineBuilder<>(name);
        }
    
        public <NO> PipelineBuilder<I, NO> addStep(PipelineStep<O, NO> step) {
            steps.add(step);
            return (PipelineBuilder<I, NO>) this;
        }
    
        public <NO> PipelineBuilder<I, NO> addStep(String name, Function<O, NO> processor) {
            return addStep(new FunctionalStep<>(name, processor));
        }
    
        public Pipeline<I, O> build() {
            return new DefaultPipeline<>(name, List.copyOf(steps));
        }
    
        private record FunctionalStep<I, O>(String name, Function<I, O> processor) implements PipelineStep<I, O> {
    
            @Override
            public String getName() {
                return name;
            }
    
            @Override
            public StepResult<O> execute(I input, PipelineContext context) {
                O output = processor.apply(input);
                return StepResult.success(name, output, Duration.ZERO);
            }
        }
    }
    

    7. Utilisation

    7.1 Pipeline simple

    @Component
    public class OrderPipeline {
    
        @Autowired
        private PipelineEngine engine;
    
        public PipelineResult<OrderResult> processOrder(Order order) {
            Pipeline<Order, OrderResult> pipeline = PipelineBuilder
                .<Order, OrderResult>create("order-processing")
                .addStep("validate", this::validateOrder)
                .addStep("enrich", this::enrichOrder)
                .addStep("process", this::processOrder)
                .addStep("notify", this::notifyCustomer)
                .build();
    
            return engine.execute(pipeline, order);
        }
    
        private ValidatedOrder validateOrder(Order order) {
            // Validation...
            return new ValidatedOrder(order);
        }
    
        private EnrichedOrder enrichOrder(ValidatedOrder order) {
            // Enrichissement...
            return new EnrichedOrder(order);
        }
    
        private ProcessedOrder processOrder(EnrichedOrder order) {
            // Traitement...
            return new ProcessedOrder(order);
        }
    
        private OrderResult notifyCustomer(ProcessedOrder order) {
            // Notification...
            return new OrderResult(order, "SUCCESS");
        }
    }
    

    7.2 Étapes personnalisées

    public class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
    
        @Override
        public String getName() {
            return "validation";
        }
    
        @Override
        public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
            List<String> errors = new ArrayList<>();
    
            if (input.getCustomerId() == null) {
                errors.add("Missing customer ID");
            }
            if (input.getItems().isEmpty()) {
                errors.add("No items in order");
            }
    
            if (!errors.isEmpty()) {
                return StepResult.failure(getName(),
                    new ValidationException(errors),
                    Duration.ZERO, 1);
            }
    
            // Stocker des données dans le context
            context.put("customerId", input.getCustomerId());
    
            return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
        }
    
        @Override
        public boolean isRetryable() {
            return false;  // Validation ne doit pas être retryée
        }
    }
    

    7.3 Étape optionnelle

    public class NotificationStep implements PipelineStep<ProcessedOrder, ProcessedOrder> {
    
        @Override
        public String getName() {
            return "notification";
        }
    
        @Override
        public StepResult<ProcessedOrder> execute(ProcessedOrder input, PipelineContext context) {
            try {
                sendNotification(input);
                return StepResult.success(getName(), input, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    
        @Override
        public boolean isOptional() {
            return true;  // Le pipeline continue même si la notif échoue
        }
    }
    

    7.4 Utilisation du context

    public class EnrichmentStep implements PipelineStep<ValidatedOrder, EnrichedOrder> {
    
        @Autowired
        private CustomerService customerService;
    
        @Override
        public String getName() {
            return "enrichment";
        }
    
        @Override
        public StepResult<EnrichedOrder> execute(ValidatedOrder input, PipelineContext context) {
            // Lire depuis le context
            String customerId = context.get("customerId", String.class).orElseThrow();
    
            // Enrichir
            Customer customer = customerService.getCustomer(customerId);
            EnrichedOrder enriched = new EnrichedOrder(input, customer);
    
            // Écrire dans le context pour les étapes suivantes
            context.put("customerEmail", customer.getEmail());
    
            return StepResult.success(getName(), enriched, Duration.ZERO);
        }
    }
    

    8. Gestion des erreurs

    8.1 Retry automatique

    public class ExternalApiStep implements PipelineStep<Data, ApiResponse> {
    
        @Override
        public String getName() {
            return "external-api-call";
        }
    
        @Override
        public boolean isRetryable() {
            return true;
        }
    
        @Override
        public int getMaxRetries() {
            return 5;  // 5 tentatives max
        }
    
        @Override
        public StepResult<ApiResponse> execute(Data input, PipelineContext context) {
            // Appel API qui peut échouer
            ApiResponse response = callApi(input);
            return StepResult.success(getName(), response, Duration.ZERO);
        }
    }
    

    8.2 Traitement des résultats

    PipelineResult<OrderResult> result = engine.execute(pipeline, order);
    
    if (result.isSuccess()) {
        log.info("Order processed successfully: {}", result.output());
    } else {
        // Analyser les étapes en échec
        for (StepResult<?> stepResult : result.getFailedSteps()) {
            log.error("Step {} failed after {} attempts: {}",
                stepResult.stepName(),
                stepResult.attempts(),
                stepResult.error().getMessage());
        }
    
        // Décider quoi faire
        if (result.status() == PipelineStatus.PARTIAL_SUCCESS) {
            // Certaines étapes optionnelles ont échoué
            handlePartialSuccess(result);
        } else {
            // Échec complet
            handleFailure(result);
        }
    }
    

    9. Pipelines parallèles

    public class ParallelEnrichmentStep implements PipelineStep<Order, EnrichedOrder> {
    
        @Autowired
        private ExecutorService executor;
    
        @Override
        public StepResult<EnrichedOrder> execute(Order input, PipelineContext context) {
            // Exécuter plusieurs enrichissements en parallèle
            CompletableFuture<Customer> customerFuture = CompletableFuture.supplyAsync(
                () -> customerService.getCustomer(input.getCustomerId()), executor);
    
            CompletableFuture<Inventory> inventoryFuture = CompletableFuture.supplyAsync(
                () -> inventoryService.checkInventory(input.getItems()), executor);
    
            CompletableFuture<Pricing> pricingFuture = CompletableFuture.supplyAsync(
                () -> pricingService.calculatePrice(input), executor);
    
            try {
                CompletableFuture.allOf(customerFuture, inventoryFuture, pricingFuture).join();
    
                EnrichedOrder enriched = new EnrichedOrder(
                    input,
                    customerFuture.get(),
                    inventoryFuture.get(),
                    pricingFuture.get()
                );
    
                return StepResult.success(getName(), enriched, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    }
    

    10. Métriques

    @Component
    public class PipelineMetrics {
    
        private final MeterRegistry registry;
    
        public void recordPipelineExecution(PipelineResult<?> result) {
            Timer.builder("socle_pipeline_duration")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .record(result.totalDuration());
    
            Counter.builder("socle_pipeline_executions")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .increment();
        }
    
        public void recordStepExecution(StepResult<?> result) {
            Timer.builder("socle_pipeline_step_duration")
                .tag("step", result.stepName())
                .tag("status", result.status().name())
                .register(registry)
                .record(result.duration());
        }
    }
    

    11. Bonnes pratiques

    DO

    • Garder les étapes petites et focalisées
    • Utiliser le context pour partager des données entre étapes
    • Marquer les étapes non-critiques comme optionnelles
    • Logger au niveau des étapes pour le debugging
    • Configurer des retries appropriés pour les appels externes

    DON’T

    • Ne pas faire de pipelines avec trop d’étapes (> 10)
    • Ne pas mélanger logique métier et infrastructure dans une étape
    • Ne pas ignorer les erreurs des étapes obligatoires
    • Ne pas utiliser de pipelines pour du traitement simple

    12. Références

    Pipeline Engine V2 (Nouveau)

    Ajouté en version 4.1.0

    13. Introduction Pipeline V2

    Le Pipeline V2 est une refonte complète du système de pipelines, conçue pour les applications à haute disponibilité nécessitant :

    • Garantie at-least-once : Aucune perte de message grâce au pattern Queue/Claim/Ack
    • Virtual Threads (Java 21) : Scalabilité maximale avec les threads virtuels
    • Reprise après crash : État persisté permettant la reprise au dernier stage OK
    • DLQ (Dead Letter Queue) : Gestion des échecs définitifs avec replay

    Quand utiliser V2 vs V1 ?

    Critère Pipeline V1 Pipeline V2
    Exécution Synchrone, mono-thread Asynchrone, multi-thread
    Garantie de livraison At-most-once At-least-once
    Persistance Non Oui (TechDB / H2)
    DLQ Non Oui
    Use case Traitement simple Flux critiques, haute disponibilité

    14. Architecture V2

    ┌─────────────────────────────────────────────────────────────────┐
    │                            MOP                                   │
    │               (supervision, restart, health)                     │
    └───────────────────────────┬─────────────────────────────────────┘
                                │
    ┌───────────────────────────▼─────────────────────────────────────┐
    │                     PipelineV2                                   │
    │                                                                  │
    │  ┌────────────┐     ┌────────────┐     ┌────────────┐          │
    │  │  Stage A   │────►│  Stage B   │────►│  Stage C   │          │
    │  │ ┌────────┐ │     │ ┌────────┐ │     │ ┌────────┐ │          │
    │  │ │ Queue  │ │     │ │ Queue  │ │     │ │ Queue  │ │          │
    │  │ └────────┘ │     │ └────────┘ │     │ └────────┘ │          │
    │  │ VThreads  │     │ VThreads  │     │ VThreads  │          │
    │  └────────────┘     └────────────┘     └────────────┘          │
    │                                                                  │
    │         ┌─────────────────┴─────────────────┐                   │
    │    ┌────▼────┐                        ┌────▼────┐              │
    │    │ Context │                        │   DLQ   │              │
    │    │ (state) │                        │ (errors)│              │
    │    └─────────┘                        └─────────┘              │
    └─────────────────────────────────────────────────────────────────┘
    

    Composants

    • StageQueue : Queue avec sémantique Claim/Ack entre chaque stage
    • VThreadStageExecutor : Workers Virtual Threads consommant les queues
    • PersistentPipelineContext : État persisté pour reprise après crash
    • DLQ : Messages en échec après max retries

    15. Création d’un Pipeline V2

    15.1 Builder Fluent

    import eu.lmvi.socle.pipeline.v2.PipelineBuilderV2;
    import eu.lmvi.socle.pipeline.v2.PipelineV2;
    
    // Création du pipeline
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .description("Pipeline de traitement des commandes")
    
        // Stage 1: Validation (2 workers, timeout 30s)
        .addStage("validate", this::validateOrder)
            .concurrency(2)
            .timeout(Duration.ofSeconds(30))
    
        // Stage 2: Enrichissement (4 workers, queue 1000)
        .addStage("enrich", this::enrichOrder)
            .concurrency(4)
            .queueSize(1000)
    
        // Stage 3: Publication (2 workers)
        .addStage("publish", this::publishOrder)
            .concurrency(2)
            .maxRetries(5)
            .backoff(Duration.ofMillis(200), 2.0)
    
        .build();
    

    15.2 Configuration par stage

    .addStage("nom", processor)
        .concurrency(4)           // Nombre de workers parallèles
        .queueSize(500)           // Taille max de la queue
        .timeout(Duration.ofMinutes(5))  // Timeout par message
        .maxRetries(3)            // Tentatives avant DLQ
        .backoff(base, multiplier) // Backoff exponentiel
        .optional()               // Stage optionnel (échec non bloquant)
    

    16. Cycle de vie

    16.1 Démarrage/Arrêt

    // Démarrer le pipeline (lance tous les workers)
    pipeline.start();
    
    // Vérifier l'état
    boolean running = pipeline.isRunning();
    
    // Arrêt gracieux (attend la fin des traitements en cours)
    pipeline.stop();
    
    // Arrêt immédiat (interrompt tout)
    pipeline.stopNow();
    

    16.2 Soumission de messages

    // Soumettre un message
    String messageId = pipeline.submit(order, "exec-001");
    
    // Avec métadonnées
    String messageId = pipeline.submit(order, "exec-001", Map.of(
        "source", "api",
        "priority", "high"
    ));
    

    16.3 Suivi d’exécution

    // État d'une exécution
    Optional<PipelineState> state = pipeline.getExecutionState("exec-001");
    state.ifPresent(s -> {
        System.out.println("Status: " + s.status());
        System.out.println("Current stage: " + s.currentStage());
        System.out.println("Duration: " + s.duration());
    });
    
    // Liste des exécutions actives
    List<PipelineState> active = pipeline.listActiveExecutions();
    
    // Statistiques globales
    PipelineV2.PipelineStats stats = pipeline.getStats();
    System.out.println("Submitted: " + stats.totalSubmitted());
    System.out.println("Completed: " + stats.totalCompleted());
    System.out.println("Failed: " + stats.totalFailed());
    

    17. Queue/Claim/Ack

    17.1 Pattern de consommation

    Le Pipeline V2 utilise un pattern de consommation fiable :

    1. CLAIM   : Le worker réserve un message (lease)
    2. PROCESS : Le worker traite le message
    3. ACK     : Succès → message supprimé
       ou
       NACK    : Échec → retry ou DLQ
    

    17.2 Lease et récupération

    Si un worker crash pendant le traitement :

    • Le lease expire après le timeout configuré
    • Le message est automatiquement remis en queue
    • Un autre worker le reprend
    // Timeout de lease par défaut: 5 minutes
    // Configurable par stage via timeout()
    

    17.3 Implémentations disponibles

    Implémentation Persistance Use case
    InMemoryStageQueue Non Développement, tests
    TechDbStageQueue Oui (H2) Production

    18. Dead Letter Queue (DLQ)

    18.1 Envoi en DLQ

    Un message est envoyé en DLQ après :

    • maxRetries tentatives échouées
    • Une erreur non-retryable

    18.2 Consultation

    // Via API REST
    GET /admin/pipelines/v2/{name}/dlq
    
    // Programmatiquement (via StageQueue)
    List<DlqMessage<Order>> messages = queue.peekDlq(100);
    for (DlqMessage<Order> msg : messages) {
        System.out.println("ID: " + msg.messageId());
        System.out.println("Error: " + msg.errorMessage());
        System.out.println("Attempts: " + msg.attempts());
        System.out.println("Failed at: " + msg.failedAt());
    }
    

    18.3 Replay

    // Via API REST
    POST /admin/pipelines/v2/{name}/dlq/{messageId}/replay
    
    // Programmatiquement
    boolean replayed = queue.replayFromDlq(messageId);
    
    // Replay tous les messages
    int count = queue.replayAllFromDlq("exec-001");
    

    18.4 Suppression

    // Via API REST
    DELETE /admin/pipelines/v2/{name}/dlq/{messageId}
    
    // Programmatiquement
    queue.deleteFromDlq(messageId);
    
    // Purge complète
    queue.purgeDlq();
    

    19. Persistance et reprise

    19.1 Context persistant

    Le Pipeline V2 peut persister son état pour survivre aux redémarrages :

    import eu.lmvi.socle.pipeline.context.TechDbPipelineContext;
    
    // Créer un context persistant
    Supplier<Connection> connectionSupplier = () -> dataSource.getConnection();
    PersistentPipelineContext context = new TechDbPipelineContext(connectionSupplier);
    
    // Utiliser avec le builder
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .context(context)  // Utilise le context persistant
        .addStage(...)
        .build();
    

    19.2 Reprise après crash

    Au redémarrage, les messages non-ackés sont automatiquement re-traités :

    1. Les messages en status CLAIMED dont le lease a expiré → remis en PENDING
    2. Les stages reprennent leur consommation normalement
    3. Grâce à l’idempotence, les messages déjà traités ne sont pas re-traités

    20. Idempotence (OBLIGATOIRE)

    20.1 Pourquoi ?

    Le modèle at-least-once signifie qu’un message peut être traité plusieurs fois :

    • Crash après traitement mais avant ACK
    • Timeout de lease pendant un traitement long
    • Replay depuis la DLQ

    Chaque stage DOIT être idempotent.

    20.2 Patterns recommandés

    // Pattern 1: Vérifier "déjà traité"
    public Order processOrder(Order order) {
        if (orderRepository.exists(order.getId())) {
            return orderRepository.get(order.getId()); // Déjà traité
        }
        // Traiter...
        return orderRepository.save(order);
    }
    
    // Pattern 2: UPSERT au lieu d'INSERT
    public void saveOrder(Order order) {
        orderRepository.upsert(order); // Idempotent par nature
    }
    
    // Pattern 3: Utiliser executionId comme clé
    public void processWithDedup(Order order, String executionId) {
        String dedupKey = "order:" + order.getId() + ":" + executionId;
        if (kvBus.exists(dedupKey)) {
            return; // Déjà traité pour cette exécution
        }
    
        // Traiter...
    
        kvBus.set(dedupKey, true, Duration.ofDays(7));
    }
    

    20.3 Anti-patterns

    // MAUVAIS: INSERT sans vérification
    db.insert(order); // Échoue si déjà inséré
    
    // MAUVAIS: Compteur incrémental
    counter.increment(); // Compté plusieurs fois en cas de replay
    
    // MAUVAIS: Email sans déduplication
    emailService.send(email); // Envoyé plusieurs fois
    

    21. API REST d’administration

    21.1 Endpoints disponibles

    GET  /admin/pipelines/v2                    → Liste des pipelines
    GET  /admin/pipelines/v2/{name}             → État d'un pipeline
    GET  /admin/pipelines/v2/{name}/stages      → État des stages
    GET  /admin/pipelines/v2/{name}/executions  → Exécutions en cours
    
    GET  /admin/pipelines/v2/{name}/dlq         → Messages en DLQ
    POST /admin/pipelines/v2/{name}/dlq/{id}/replay  → Rejouer un message
    DELETE /admin/pipelines/v2/{name}/dlq/{id}  → Supprimer de la DLQ
    POST /admin/pipelines/v2/{name}/dlq/replay-all   → Rejouer tous
    
    POST /admin/pipelines/v2/{name}/start       → Démarrer le pipeline
    POST /admin/pipelines/v2/{name}/stop        → Arrêter gracieusement
    POST /admin/pipelines/v2/{name}/stop-now    → Arrêter immédiatement
    

    21.2 Exemple de réponse

    GET /admin/pipelines/v2/order-processing
    
    {
      "timestamp": 1735387200000,
      "name": "order-processing",
      "description": "Pipeline de traitement des commandes",
      "running": true,
      "stats": {
        "stage_count": 3,
        "active_executions": 5,
        "total_submitted": 1500,
        "total_completed": 1480,
        "total_failed": 12
      }
    }
    

    22. Métriques Prometheus

    22.1 Métriques exposées

    # Compteurs
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="success"}
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="failed"}
    socle_pipeline_v2_dlq_total{pipeline="order-processing", stage="validate"}
    
    # Gauges
    socle_pipeline_v2_queue_pending{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_queue_processing{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_dlq_size{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_active_executions{pipeline="order-processing"}
    socle_pipeline_v2_running{pipeline="order-processing"}
    
    # Histogrammes (durée de traitement)
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.5"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.95"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.99"}
    

    22.2 Alertes recommandées

    # DLQ croissante
    - alert: PipelineDLQGrowing
      expr: increase(socle_pipeline_v2_dlq_total[1h]) > 10
      labels:
        severity: warning
    
    # Queue qui s'accumule (backpressure)
    - alert: PipelineQueueBacklog
      expr: socle_pipeline_v2_queue_pending > 1000
      for: 5m
      labels:
        severity: warning
    
    # Pipeline arrêté
    - alert: PipelineStopped
      expr: socle_pipeline_v2_running == 0
      for: 1m
      labels:
        severity: critical
    

    23. Bonnes pratiques V2

    DO

    • Toujours implémenter l’idempotence dans chaque stage
    • Utiliser TechDbPipelineContext en production pour la persistance
    • Configurer des maxRetries appropriés (3-5 pour API externes)
    • Monitorer les métriques DLQ et queue_pending
    • Traiter régulièrement les messages en DLQ (replay ou suppression)

    DON’T

    • Ne jamais supposer qu’un message ne sera traité qu’une fois
    • Ne pas ignorer les messages en DLQ
    • Ne pas mettre de timeout trop courts (risque de faux positifs)
    • Ne pas créer trop de stages (overhead de queues)
    • Ne pas oublier d’appeler pipeline.stop() à l’arrêt de l’application

    24. Migration V1 → V2

    24.1 Changements d’API

    V1 V2
    PipelineEngine.execute(pipeline, input) pipeline.submit(input, executionId)
    Pipeline<I,O> PipelineV2<I,O>
    PipelineBuilder PipelineBuilderV2
    Exécution synchrone Exécution asynchrone

    24.2 Étapes de migration

    1. Rendre les steps idempotents (obligatoire avant migration)
    2. Créer le pipeline avec PipelineBuilderV2
    3. Remplacer execute() par submit()
    4. Ajouter le suivi asynchrone des résultats si nécessaire
    5. Configurer la persistance (TechDbPipelineContext)
    6. Activer les métriques et monitoring DLQ

    25. Configuration Dynamique (YAML)

    Ajouté en version 4.1.0

    25.1 Introduction

    Le Pipeline V2 peut maintenant être configuré entièrement via YAML, permettant :

    • Activation/désactivation des stages sans recompiler
    • Configuration dynamique de la concurrence, timeouts, retries
    • Ajout de nouveaux stages sans modifier le code de la factory
    • Modularité des workers (un worker = un composant Spring)

    25.2 Architecture

    ┌─────────────────────────────────────────────────────────────────┐
    │                        application.yml                           │
    │  cdc:                                                            │
    │    pipeline:                                                     │
    │      stages:                                                     │
    │        - name: filter                                            │
    │          worker: filterWorker                                    │
    │          enabled: true                                           │
    └───────────────────────────────┬─────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │            PipelineYamlConfig (@ConfigurationProperties)          │
    │                                                                    │
    │  - name: String                                                    │
    │  - stages: List<StageYamlConfig>                                   │
    │  - getEnabledStages(): List<StageYamlConfig>                       │
    └───────────────────────────────┬───────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │                    DynamicPipelineBuilder                          │
    │                                                                    │
    │  .config(pipelineConfig)                                           │
    │  .workers(Map<String, PipelineStageWorker>)                        │
    │  .build() → PipelineV2<I, O>                                       │
    └───────────────────────────────────────────────────────────────────┘
    

    25.3 Interface PipelineStageWorker

    Chaque stage est implémenté comme un composant Spring implémentant PipelineStageWorker :

    package eu.lmvi.socle.pipeline.v2.worker;
    
    public interface PipelineStageWorker<I, O> {
    
        /**
         * Nom unique du worker (utilisé dans la config YAML)
         */
        String getName();
    
        /**
         * Initialise le worker (chargement ressources, compilation scripts, etc.)
         */
        default void initialize() throws Exception {}
    
        /**
         * Traite un élément d'entrée (DOIT être thread-safe)
         */
        O process(I input) throws Exception;
    
        /**
         * Libère les ressources
         */
        default void shutdown() {}
    
        /**
         * Indique si ce worker est activé
         */
        default boolean isEnabled() { return true; }
    
        /**
         * Description pour le monitoring
         */
        default String getDescription() { return getName(); }
    
        /**
         * Ordre de priorité (plus bas = plus tôt)
         */
        default int getOrder() { return 100; }
    }
    

    25.4 Exemple d’implémentation

    @Component("filterWorker")
    public class FilterStageWorker implements PipelineStageWorker<WalEvent, PipelineMessage> {
    
        @Autowired
        private TableFilter tableFilter;
    
        @Override
        public String getName() {
            return "filter";
        }
    
        @Override
        public int getOrder() {
            return 10;  // Premier dans le pipeline
        }
    
        @Override
        public PipelineMessage process(WalEvent event) throws Exception {
            PipelineMessage message = new PipelineMessage(event);
            if (!tableFilter.isAllowed(event.getTable())) {
                message.markAsFiltered("table_not_in_whitelist");
            }
            return message;
        }
    }
    

    25.5 Configuration YAML

    cdc:
      pipeline:
        name: cdc-pipeline-v2
        description: Pipeline CDC PostgreSQL vers Kafka
        enabled: true
        stages:
          # Stage 1: Filtrage
          - name: filter
            worker: filterWorker        # Nom du bean Spring
            enabled: true
            concurrency: 1
            timeout-seconds: 5
            max-retries: 1
            order: 10
    
          # Stage 2: Transformation
          - name: transform
            worker: transformWorker
            enabled: true
            concurrency: 2
            timeout-seconds: 30
            order: 20
    
          # Stage 3a: Enrichissement Rules
          - name: enrich-rules
            worker: rulesEnrichmentWorker
            enabled: true
            concurrency: 2
            order: 30
    
          # Stage 3b: Enrichissement BeanShell
          - name: enrich-beanshell
            worker: beanshellEnrichmentWorker
            enabled: ${CDC_ENRICH_BSH_ENABLED:true}  # Variable d'environnement
            concurrency: 2
            order: 31
    
          # Stage 3c: Enrichissement JavaScript
          - name: enrich-javascript
            worker: javascriptEnrichmentWorker
            enabled: ${CDC_ENRICH_JS_ENABLED:true}
            concurrency: 4
            timeout-seconds: 60
            order: 32
    
          # Stage 3d: Enrichissement Janino (nouveau)
          - name: enrich-janino
            worker: janinoEnrichmentWorker
            enabled: ${CDC_ENRICH_JANINO_ENABLED:false}  # Désactivé par défaut
            concurrency: 2
            order: 33
    
          # Stage 4: Publication Kafka
          - name: publish
            worker: publishWorker
            enabled: true
            concurrency: 2
            max-retries: 5
            order: 40
    
          # Stage 5: Commit LSN
          - name: commit
            worker: commitWorker
            enabled: true
            concurrency: 1
            order: 50
    

    25.6 Classe de configuration Spring

    @Configuration
    @ConfigurationProperties(prefix = "cdc.pipeline")
    public class CdcPipelineConfig extends PipelineYamlConfig {
        // Hérite de toutes les propriétés
    }
    

    25.7 Construction du pipeline

    @Component
    public class CdcPipelineFactory {
    
        @Autowired
        private CdcPipelineConfig pipelineConfig;
    
        @Autowired
        private Map<String, PipelineStageWorker<?, ?>> availableWorkers;
    
        @PostConstruct
        public void init() {
            // Log des workers disponibles
            log.info("Available workers: {}", availableWorkers.keySet());
    
            // Construire le pipeline dynamiquement
            cdcPipeline = DynamicPipelineBuilder.create()
                .config(pipelineConfig)
                .workers(availableWorkers)
                .build();
    
            log.info("Pipeline created with {} stages",
                pipelineConfig.countEnabledStages());
        }
    }
    

    25.8 Avantages

    Aspect Avant (hardcodé) Après (YAML)
    Modification Recompilation Redémarrage
    Activation stage Code Java Variable d’env
    Ajout stage Modifier factory Ajouter worker + config
    Configuration Dans le code Centralisée
    Environnements Branches différentes Même code, config différente

    25.9 Activation/Désactivation à chaud

    Via variables d’environnement dans .env :

    # Désactiver BeanShell
    CDC_ENRICH_BSH_ENABLED=false
    
    # Activer Janino
    CDC_ENRICH_JANINO_ENABLED=true
    
    # Redémarrer le service
    sudo systemctl restart cdc-reflet-kafka-v02
    

    25.10 Ajout d’un nouveau type d’enrichissement

    1. Créer le worker :
    @Component("monNouveauWorker")
    public class MonNouveauEnrichmentWorker
        implements PipelineStageWorker<PipelineMessage, PipelineMessage> {
        // ...
    }
    
    1. Ajouter dans application.yml :
    stages:
      - name: enrich-nouveau
        worker: monNouveauWorker
        enabled: ${CDC_ENRICH_NOUVEAU_ENABLED:false}
        order: 34
    
    1. Redémarrer – le pipeline inclura automatiquement le nouveau stage.

    26. Références V2

  • Socle V004 – Exemples de Code

    Socle V004 – Exemples de Code

    19 – Exemples

    Version : 4.0.0 Date : 2025-12-09

    1. Application minimale

    1.1 pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
             https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>3.2.1</version>
        </parent>
    
        <groupId>com.example</groupId>
        <artifactId>my-socle-app</artifactId>
        <version>1.0.0</version>
    
        <properties>
            <java.version>21</java.version>
        </properties>
    
        <dependencies>
            <!-- Socle V4 -->
            <dependency>
                <groupId>eu.lmvi</groupId>
                <artifactId>socle-v004</artifactId>
                <version>4.0.0</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
    

    1.2 Application.java

    package com.example;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.ComponentScan;
    
    @SpringBootApplication
    @ComponentScan(basePackages = {"com.example", "eu.lmvi.socle"})
    public class Application {
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    }
    

    1.3 application.yml

    socle:
      app_name: my-app
      env_name: ${ENV_NAME:DEV}
      region: ${REGION:local}
    
    server:
      port: ${HTTP_PORT:8080}
    
    logging:
      config: classpath:log4j2.xml
    

    1.4 Worker simple

    package com.example.worker;
    
    import eu.lmvi.socle.worker.Worker;
    import org.springframework.stereotype.Component;
    
    @Component
    public class HelloWorker implements Worker {
    
        @Override
        public String getName() {
            return "hello-worker";
        }
    
        @Override
        public void initialize() {
            System.out.println("Hello Worker initialized");
        }
    
        @Override
        public void start() {
            System.out.println("Hello Worker started");
        }
    
        @Override
        public void doWork() {
            System.out.println("Hello from worker!");
        }
    
        @Override
        public void stop() {
            System.out.println("Hello Worker stopped");
        }
    
        @Override
        public boolean isHealthy() {
            return true;
        }
    
        @Override
        public Map<String, Object> getStats() {
            return Map.of("status", "running");
        }
    }
    

    2. Worker Kafka Consumer

    package com.example.worker;
    
    import eu.lmvi.socle.worker.AbstractWorker;
    import eu.lmvi.socle.techdb.TechDbManager;
    import org.apache.kafka.clients.consumer.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaConsumerWorker extends AbstractWorker {
    
        @Autowired
        private TechDbManager techDb;
    
        private KafkaConsumer<String, String> consumer;
        private String topic = "my-topic";
        private long lastOffset = 0;
    
        @Override
        public String getName() {
            return "kafka-consumer";
        }
    
        @Override
        public int getStartPriority() {
            return 10;
        }
    
        @Override
        protected void doInitialize() {
            // Restaurer l'offset
            lastOffset = techDb.getOffset("kafka", topic + "-0").orElse(0L);
            log.info("Starting from offset: {}", lastOffset);
    
            // Créer le consumer
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "false");
    
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(List.of(topic));
        }
    
        @Override
        protected void doProcess() {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    
            for (ConsumerRecord<String, String> record : records) {
                processMessage(record);
                lastOffset = record.offset();
                incrementProcessed();
            }
    
            // Persister périodiquement
            if (processedCount.get() % 100 == 0) {
                techDb.saveOffset("kafka", topic + "-0", lastOffset, null);
            }
        }
    
        private void processMessage(ConsumerRecord<String, String> record) {
            log.debug("Processing: key={}, value={}", record.key(), record.value());
            // Traitement...
        }
    
        @Override
        protected void doStop() {
            // Sauvegarder l'offset final
            techDb.saveOffset("kafka", topic + "-0", lastOffset, null);
    
            if (consumer != null) {
                consumer.close();
            }
        }
    
        @Override
        public Map<String, Object> getStats() {
            Map<String, Object> stats = new HashMap<>(super.getStats());
            stats.put("lastOffset", lastOffset);
            return stats;
        }
    }
    

    3. Worker HTTP API

    package com.example.worker;
    
    import eu.lmvi.socle.worker.Worker;
    import eu.lmvi.socle.kv.KvBus;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.web.bind.annotation.*;
    
    @Component
    @RestController
    @RequestMapping("/api/orders")
    public class OrderApiWorker implements Worker {
    
        @Autowired
        private KvBus kvBus;
    
        @Autowired
        private OrderService orderService;
    
        private volatile boolean running = false;
    
        @Override
        public String getName() {
            return "order-api";
        }
    
        @Override
        public boolean isPassive() {
            return true;  // Pas de doWork cyclique
        }
    
        @Override
        public void initialize() {}
    
        @Override
        public void start() {
            running = true;
        }
    
        @Override
        public void doWork() {
            // Passif - traitement via endpoints REST
        }
    
        @Override
        public void stop() {
            running = false;
        }
    
        @Override
        public boolean isHealthy() {
            return running;
        }
    
        @Override
        public Map<String, Object> getStats() {
            return Map.of("running", running);
        }
    
        // === REST Endpoints ===
    
        @PostMapping
        public ResponseEntity<Order> createOrder(@RequestBody CreateOrderRequest request) {
            Order order = orderService.create(request);
    
            // Cache l'order
            kvBus.putJson("order:" + order.getId(), order);
            kvBus.setTtl("order:" + order.getId(), Duration.ofHours(1));
    
            return ResponseEntity.status(HttpStatus.CREATED).body(order);
        }
    
        @GetMapping("/{id}")
        public ResponseEntity<Order> getOrder(@PathVariable String id) {
            // Vérifier le cache d'abord
            Optional<Order> cached = kvBus.getJson("order:" + id, Order.class);
            if (cached.isPresent()) {
                return ResponseEntity.ok(cached.get());
            }
    
            // Sinon charger depuis la DB
            return orderService.findById(id)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
        }
    }
    

    4. Worker Schedulé (Cron)

    package com.example.worker;
    
    import eu.lmvi.socle.worker.AbstractWorker;
    import org.springframework.stereotype.Component;
    
    @Component
    public class DailyReportWorker extends AbstractWorker {
    
        @Override
        public String getName() {
            return "daily-report";
        }
    
        @Override
        public String getSchedule() {
            return "0 0 6 * * ?";  // Tous les jours à 6h
        }
    
        @Override
        public boolean isScheduled() {
            return true;
        }
    
        @Override
        protected void doProcess() {
            log.info("Generating daily report...");
    
            // Collecter les données
            ReportData data = collectReportData();
    
            // Générer le rapport
            Report report = generateReport(data);
    
            // Envoyer par email
            sendReportByEmail(report);
    
            log.info("Daily report sent successfully");
            incrementProcessed();
        }
    
        private ReportData collectReportData() {
            // ...
            return new ReportData();
        }
    
        private Report generateReport(ReportData data) {
            // ...
            return new Report();
        }
    
        private void sendReportByEmail(Report report) {
            // ...
        }
    }
    

    5. Pipeline de traitement

    package com.example.pipeline;
    
    import eu.lmvi.socle.pipeline.*;
    import org.springframework.stereotype.Component;
    
    @Component
    public class OrderProcessingPipeline {
    
        @Autowired
        private PipelineEngine engine;
    
        @Autowired
        private OrderValidator validator;
    
        @Autowired
        private OrderEnricher enricher;
    
        @Autowired
        private PaymentProcessor paymentProcessor;
    
        @Autowired
        private NotificationService notificationService;
    
        public PipelineResult<ProcessedOrder> process(Order order) {
            Pipeline<Order, ProcessedOrder> pipeline = PipelineBuilder
                .<Order, ProcessedOrder>create("order-processing")
                .addStep(new ValidationStep(validator))
                .addStep(new EnrichmentStep(enricher))
                .addStep(new PaymentStep(paymentProcessor))
                .addStep(new NotificationStep(notificationService))
                .build();
    
            return engine.execute(pipeline, order);
        }
    }
    
    // Étape de validation
    class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
    
        private final OrderValidator validator;
    
        public ValidationStep(OrderValidator validator) {
            this.validator = validator;
        }
    
        @Override
        public String getName() {
            return "validation";
        }
    
        @Override
        public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
            List<String> errors = validator.validate(input);
            if (!errors.isEmpty()) {
                return StepResult.failure(getName(),
                    new ValidationException(errors), Duration.ZERO, 1);
            }
            return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
        }
    
        @Override
        public boolean isRetryable() {
            return false;
        }
    }
    
    // Étape de notification (optionnelle)
    class NotificationStep implements PipelineStep<ProcessedOrder, ProcessedOrder> {
    
        private final NotificationService notificationService;
    
        @Override
        public String getName() {
            return "notification";
        }
    
        @Override
        public StepResult<ProcessedOrder> execute(ProcessedOrder input, PipelineContext context) {
            try {
                notificationService.sendOrderConfirmation(input);
                return StepResult.success(getName(), input, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    
        @Override
        public boolean isOptional() {
            return true;  // Le pipeline continue même si la notif échoue
        }
    }
    

    6. Service avec résilience

    package com.example.service;
    
    import eu.lmvi.socle.resilience.*;
    import org.springframework.stereotype.Service;
    
    @Service
    public class ExternalApiService {
    
        @Autowired
        private RetryTemplate retryTemplate;
    
        @Autowired
        private CircuitBreakerRegistry cbRegistry;
    
        @Autowired
        private KvBus kvBus;
    
        private final OkHttpClient httpClient;
    
        public Data fetchData(String id) {
            // Circuit breaker + Retry + Cache fallback
            CircuitBreaker cb = cbRegistry.getOrCreate("external-api");
    
            return cb.executeWithFallback(
                () -> retryTemplate.execute(() -> doFetchData(id)),
                () -> getCachedData(id)
            );
        }
    
        private Data doFetchData(String id) throws IOException {
            Request request = new Request.Builder()
                .url("https://api.example.com/data/" + id)
                .build();
    
            try (Response response = httpClient.newCall(request).execute()) {
                if (!response.isSuccessful()) {
                    throw new IOException("API returned " + response.code());
                }
    
                Data data = parseResponse(response.body().string());
    
                // Mettre en cache
                kvBus.putJson("cache:data:" + id, data);
                kvBus.setTtl("cache:data:" + id, Duration.ofMinutes(5));
    
                return data;
            }
        }
    
        private Data getCachedData(String id) {
            return kvBus.getJson("cache:data:" + id, Data.class)
                .orElseThrow(() -> new RuntimeException("No cached data available"));
        }
    }
    

    7. Configuration multi-environnement

    application.yml (base)

    socle:
      app_name: ${APP_NAME:my-app}
      env_name: ${ENV_NAME:DEV}
      region: ${REGION:local}
    
    spring:
      profiles:
        active: ${PROFILE:dev}
    

    application-dev.yml

    socle:
      kvbus:
        mode: in_memory
      techdb:
        console:
          enabled: true
      admin:
        auth:
          enabled: false
    
    logging:
      level:
        eu.lmvi.socle: DEBUG
    

    application-prod.yml

    socle:
      kvbus:
        mode: redis
        redis:
          host: ${REDIS_HOST}
          password: ${REDIS_PASSWORD}
      techdb:
        console:
          enabled: false
      logging:
        forwarder:
          enabled: true
      auth:
        enabled: true
      admin:
        auth:
          enabled: true
    
    logging:
      level:
        eu.lmvi.socle: INFO
    

    8. Dockerfile complet

    # Build stage
    FROM eclipse-temurin:21-jdk-alpine AS build
    WORKDIR /app
    COPY pom.xml .
    COPY src ./src
    RUN apk add --no-cache maven && \
        mvn clean package -DskipTests
    
    # Runtime stage
    FROM eclipse-temurin:21-jre-alpine
    WORKDIR /app
    
    # Security
    RUN addgroup -S app && adduser -S app -G app
    USER app
    
    # Copy artifact
    COPY --from=build --chown=app:app /app/target/*.jar app.jar
    
    # Config
    ENV JAVA_OPTS="-XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0 -Djava.security.egd=file:/dev/./urandom"
    
    # Health check
    HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
        CMD wget -qO- http://localhost:8080/admin/health/live || exit 1
    
    EXPOSE 8080
    
    ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar app.jar"]
    

    9. Kubernetes deployment complet

    Voir 16-KUBERNETES pour l’exemple complet de déploiement K8s.

    10. Références