Étiquette : Computers

  • Socle V004 – Pipeline

    Socle V004 – Pipeline

    09 – Pipeline Engine

    Version : 4.1.0 Date : 2025-12-28

    1. Introduction

    Le PipelineEngine permet de définir des chaînes de traitement composées d’étapes (steps) exécutées séquentiellement ou en parallèle.

    Caractéristiques

    • Définition déclarative des pipelines
    • Exécution séquentielle ou parallèle
    • Gestion des erreurs et retry
    • Context partagé entre étapes
    • Métriques et logging intégrés

    2. Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                      PipelineEngine                          │
    └─────────────────────────────────────────────────────────────┘
                                  │
                                  ▼
    ┌─────────────────────────────────────────────────────────────┐
    │                      Pipeline                                │
    │  ┌─────────────────────────────────────────────────────┐    │
    │  │              PipelineContext                         │    │
    │  │  - input data                                        │    │
    │  │  - shared state                                      │    │
    │  │  - results                                           │    │
    │  └─────────────────────────────────────────────────────┘    │
    │                                                              │
    │  ┌──────────┐   ┌──────────┐   ┌──────────┐                │
    │  │  Step 1  │──►│  Step 2  │──►│  Step 3  │                │
    │  └──────────┘   └──────────┘   └──────────┘                │
    │                                                              │
    └─────────────────────────────────────────────────────────────┘
    

    3. Interface Pipeline

    package eu.lmvi.socle.pipeline;
    
    public interface Pipeline<I, O> {
    
        /**
         * Nom du pipeline
         */
        String getName();
    
        /**
         * Exécute le pipeline
         */
        PipelineResult<O> execute(I input);
    
        /**
         * Exécute le pipeline avec context
         */
        PipelineResult<O> execute(I input, PipelineContext context);
    
        /**
         * Liste des étapes
         */
        List<PipelineStep<?, ?>> getSteps();
    }
    

    4. Interface PipelineStep

    package eu.lmvi.socle.pipeline;
    
    public interface PipelineStep<I, O> {
    
        /**
         * Nom de l'étape
         */
        String getName();
    
        /**
         * Exécute l'étape
         */
        StepResult<O> execute(I input, PipelineContext context);
    
        /**
         * L'étape peut-elle être retryée ?
         */
        default boolean isRetryable() {
            return true;
        }
    
        /**
         * Nombre max de retries
         */
        default int getMaxRetries() {
            return 3;
        }
    
        /**
         * L'étape est-elle optionnelle ?
         */
        default boolean isOptional() {
            return false;
        }
    }
    

    5. Context et Result

    5.1 PipelineContext

    package eu.lmvi.socle.pipeline;
    
    public class PipelineContext {
        private final Map<String, Object> attributes = new ConcurrentHashMap<>();
        private final List<StepResult<?>> stepResults = new ArrayList<>();
        private final String correlationId;
        private final Instant startTime;
    
        public PipelineContext() {
            this.correlationId = UUID.randomUUID().toString();
            this.startTime = Instant.now();
        }
    
        public void put(String key, Object value) {
            attributes.put(key, value);
        }
    
        public <T> Optional<T> get(String key, Class<T> type) {
            return Optional.ofNullable(attributes.get(key))
                .filter(type::isInstance)
                .map(type::cast);
        }
    
        public void addStepResult(StepResult<?> result) {
            stepResults.add(result);
        }
    
        public List<StepResult<?>> getStepResults() {
            return Collections.unmodifiableList(stepResults);
        }
    
        public String getCorrelationId() {
            return correlationId;
        }
    
        public Duration getElapsedTime() {
            return Duration.between(startTime, Instant.now());
        }
    }
    

    5.2 StepResult

    package eu.lmvi.socle.pipeline;
    
    public record StepResult<T>(
        String stepName,
        StepStatus status,
        T output,
        Exception error,
        Duration duration,
        int attempts
    ) {
        public boolean isSuccess() {
            return status == StepStatus.SUCCESS;
        }
    
        public boolean isFailure() {
            return status == StepStatus.FAILURE;
        }
    
        public boolean isSkipped() {
            return status == StepStatus.SKIPPED;
        }
    
        public static <T> StepResult<T> success(String name, T output, Duration duration) {
            return new StepResult<>(name, StepStatus.SUCCESS, output, null, duration, 1);
        }
    
        public static <T> StepResult<T> failure(String name, Exception error, Duration duration, int attempts) {
            return new StepResult<>(name, StepStatus.FAILURE, null, error, duration, attempts);
        }
    
        public static <T> StepResult<T> skipped(String name) {
            return new StepResult<>(name, StepStatus.SKIPPED, null, null, Duration.ZERO, 0);
        }
    }
    
    public enum StepStatus {
        SUCCESS,
        FAILURE,
        SKIPPED
    }
    

    5.3 PipelineResult

    package eu.lmvi.socle.pipeline;
    
    public record PipelineResult<T>(
        String pipelineName,
        PipelineStatus status,
        T output,
        PipelineContext context,
        Duration totalDuration
    ) {
        public boolean isSuccess() {
            return status == PipelineStatus.SUCCESS;
        }
    
        public List<StepResult<?>> getFailedSteps() {
            return context.getStepResults().stream()
                .filter(StepResult::isFailure)
                .toList();
        }
    }
    
    public enum PipelineStatus {
        SUCCESS,
        PARTIAL_SUCCESS,
        FAILURE
    }
    

    6. Implémentation

    6.1 DefaultPipelineEngine

    package eu.lmvi.socle.pipeline;
    
    @Component
    public class DefaultPipelineEngine implements PipelineEngine {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultPipelineEngine.class);
    
        @Override
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input) {
            return execute(pipeline, input, new PipelineContext());
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input, PipelineContext context) {
            log.info("[{}] Starting pipeline execution", pipeline.getName());
            Instant start = Instant.now();
    
            Object currentInput = input;
            O finalOutput = null;
            boolean hasFailure = false;
    
            for (PipelineStep<?, ?> step : pipeline.getSteps()) {
                PipelineStep<Object, Object> typedStep = (PipelineStep<Object, Object>) step;
    
                StepResult<Object> result = executeStep(typedStep, currentInput, context);
                context.addStepResult(result);
    
                if (result.isSuccess()) {
                    currentInput = result.output();
                    finalOutput = (O) result.output();
                } else if (result.isFailure()) {
                    if (!step.isOptional()) {
                        hasFailure = true;
                        log.error("[{}] Pipeline failed at step: {}", pipeline.getName(), step.getName());
                        break;
                    }
                    log.warn("[{}] Optional step failed, continuing: {}", pipeline.getName(), step.getName());
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            PipelineStatus status = hasFailure
                ? PipelineStatus.FAILURE
                : (context.getStepResults().stream().anyMatch(StepResult::isFailure)
                    ? PipelineStatus.PARTIAL_SUCCESS
                    : PipelineStatus.SUCCESS);
    
            log.info("[{}] Pipeline completed with status: {} in {}ms",
                pipeline.getName(), status, duration.toMillis());
    
            return new PipelineResult<>(pipeline.getName(), status, finalOutput, context, duration);
        }
    
        private <I, O> StepResult<O> executeStep(PipelineStep<I, O> step, I input, PipelineContext context) {
            log.debug("[{}] Executing step", step.getName());
            Instant start = Instant.now();
            int attempts = 0;
            Exception lastError = null;
    
            while (attempts < step.getMaxRetries()) {
                attempts++;
                try {
                    StepResult<O> result = step.execute(input, context);
                    if (result.isSuccess()) {
                        Duration duration = Duration.between(start, Instant.now());
                        log.debug("[{}] Step completed in {}ms", step.getName(), duration.toMillis());
                        return StepResult.success(step.getName(), result.output(), duration);
                    }
                    lastError = result.error();
                } catch (Exception e) {
                    lastError = e;
                    log.warn("[{}] Step attempt {} failed: {}", step.getName(), attempts, e.getMessage());
    
                    if (!step.isRetryable() || attempts >= step.getMaxRetries()) {
                        break;
                    }
    
                    // Backoff
                    try {
                        Thread.sleep(attempts * 1000L);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            return StepResult.failure(step.getName(), lastError, duration, attempts);
        }
    }
    

    6.2 Pipeline Builder

    package eu.lmvi.socle.pipeline;
    
    public class PipelineBuilder<I, O> {
    
        private final String name;
        private final List<PipelineStep<?, ?>> steps = new ArrayList<>();
    
        public PipelineBuilder(String name) {
            this.name = name;
        }
    
        public static <I, O> PipelineBuilder<I, O> create(String name) {
            return new PipelineBuilder<>(name);
        }
    
        public <NO> PipelineBuilder<I, NO> addStep(PipelineStep<O, NO> step) {
            steps.add(step);
            return (PipelineBuilder<I, NO>) this;
        }
    
        public <NO> PipelineBuilder<I, NO> addStep(String name, Function<O, NO> processor) {
            return addStep(new FunctionalStep<>(name, processor));
        }
    
        public Pipeline<I, O> build() {
            return new DefaultPipeline<>(name, List.copyOf(steps));
        }
    
        private record FunctionalStep<I, O>(String name, Function<I, O> processor) implements PipelineStep<I, O> {
    
            @Override
            public String getName() {
                return name;
            }
    
            @Override
            public StepResult<O> execute(I input, PipelineContext context) {
                O output = processor.apply(input);
                return StepResult.success(name, output, Duration.ZERO);
            }
        }
    }
    

    7. Utilisation

    7.1 Pipeline simple

    @Component
    public class OrderPipeline {
    
        @Autowired
        private PipelineEngine engine;
    
        public PipelineResult<OrderResult> processOrder(Order order) {
            Pipeline<Order, OrderResult> pipeline = PipelineBuilder
                .<Order, OrderResult>create("order-processing")
                .addStep("validate", this::validateOrder)
                .addStep("enrich", this::enrichOrder)
                .addStep("process", this::processOrder)
                .addStep("notify", this::notifyCustomer)
                .build();
    
            return engine.execute(pipeline, order);
        }
    
        private ValidatedOrder validateOrder(Order order) {
            // Validation...
            return new ValidatedOrder(order);
        }
    
        private EnrichedOrder enrichOrder(ValidatedOrder order) {
            // Enrichissement...
            return new EnrichedOrder(order);
        }
    
        private ProcessedOrder processOrder(EnrichedOrder order) {
            // Traitement...
            return new ProcessedOrder(order);
        }
    
        private OrderResult notifyCustomer(ProcessedOrder order) {
            // Notification...
            return new OrderResult(order, "SUCCESS");
        }
    }
    

    7.2 Étapes personnalisées

    public class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
    
        @Override
        public String getName() {
            return "validation";
        }
    
        @Override
        public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
            List<String> errors = new ArrayList<>();
    
            if (input.getCustomerId() == null) {
                errors.add("Missing customer ID");
            }
            if (input.getItems().isEmpty()) {
                errors.add("No items in order");
            }
    
            if (!errors.isEmpty()) {
                return StepResult.failure(getName(),
                    new ValidationException(errors),
                    Duration.ZERO, 1);
            }
    
            // Stocker des données dans le context
            context.put("customerId", input.getCustomerId());
    
            return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
        }
    
        @Override
        public boolean isRetryable() {
            return false;  // Validation ne doit pas être retryée
        }
    }
    

    7.3 Étape optionnelle

    public class NotificationStep implements PipelineStep<ProcessedOrder, ProcessedOrder> {
    
        @Override
        public String getName() {
            return "notification";
        }
    
        @Override
        public StepResult<ProcessedOrder> execute(ProcessedOrder input, PipelineContext context) {
            try {
                sendNotification(input);
                return StepResult.success(getName(), input, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    
        @Override
        public boolean isOptional() {
            return true;  // Le pipeline continue même si la notif échoue
        }
    }
    

    7.4 Utilisation du context

    public class EnrichmentStep implements PipelineStep<ValidatedOrder, EnrichedOrder> {
    
        @Autowired
        private CustomerService customerService;
    
        @Override
        public String getName() {
            return "enrichment";
        }
    
        @Override
        public StepResult<EnrichedOrder> execute(ValidatedOrder input, PipelineContext context) {
            // Lire depuis le context
            String customerId = context.get("customerId", String.class).orElseThrow();
    
            // Enrichir
            Customer customer = customerService.getCustomer(customerId);
            EnrichedOrder enriched = new EnrichedOrder(input, customer);
    
            // Écrire dans le context pour les étapes suivantes
            context.put("customerEmail", customer.getEmail());
    
            return StepResult.success(getName(), enriched, Duration.ZERO);
        }
    }
    

    8. Gestion des erreurs

    8.1 Retry automatique

    public class ExternalApiStep implements PipelineStep<Data, ApiResponse> {
    
        @Override
        public String getName() {
            return "external-api-call";
        }
    
        @Override
        public boolean isRetryable() {
            return true;
        }
    
        @Override
        public int getMaxRetries() {
            return 5;  // 5 tentatives max
        }
    
        @Override
        public StepResult<ApiResponse> execute(Data input, PipelineContext context) {
            // Appel API qui peut échouer
            ApiResponse response = callApi(input);
            return StepResult.success(getName(), response, Duration.ZERO);
        }
    }
    

    8.2 Traitement des résultats

    PipelineResult<OrderResult> result = engine.execute(pipeline, order);
    
    if (result.isSuccess()) {
        log.info("Order processed successfully: {}", result.output());
    } else {
        // Analyser les étapes en échec
        for (StepResult<?> stepResult : result.getFailedSteps()) {
            log.error("Step {} failed after {} attempts: {}",
                stepResult.stepName(),
                stepResult.attempts(),
                stepResult.error().getMessage());
        }
    
        // Décider quoi faire
        if (result.status() == PipelineStatus.PARTIAL_SUCCESS) {
            // Certaines étapes optionnelles ont échoué
            handlePartialSuccess(result);
        } else {
            // Échec complet
            handleFailure(result);
        }
    }
    

    9. Pipelines parallèles

    public class ParallelEnrichmentStep implements PipelineStep<Order, EnrichedOrder> {
    
        @Autowired
        private ExecutorService executor;
    
        @Override
        public StepResult<EnrichedOrder> execute(Order input, PipelineContext context) {
            // Exécuter plusieurs enrichissements en parallèle
            CompletableFuture<Customer> customerFuture = CompletableFuture.supplyAsync(
                () -> customerService.getCustomer(input.getCustomerId()), executor);
    
            CompletableFuture<Inventory> inventoryFuture = CompletableFuture.supplyAsync(
                () -> inventoryService.checkInventory(input.getItems()), executor);
    
            CompletableFuture<Pricing> pricingFuture = CompletableFuture.supplyAsync(
                () -> pricingService.calculatePrice(input), executor);
    
            try {
                CompletableFuture.allOf(customerFuture, inventoryFuture, pricingFuture).join();
    
                EnrichedOrder enriched = new EnrichedOrder(
                    input,
                    customerFuture.get(),
                    inventoryFuture.get(),
                    pricingFuture.get()
                );
    
                return StepResult.success(getName(), enriched, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    }
    

    10. Métriques

    @Component
    public class PipelineMetrics {
    
        private final MeterRegistry registry;
    
        public void recordPipelineExecution(PipelineResult<?> result) {
            Timer.builder("socle_pipeline_duration")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .record(result.totalDuration());
    
            Counter.builder("socle_pipeline_executions")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .increment();
        }
    
        public void recordStepExecution(StepResult<?> result) {
            Timer.builder("socle_pipeline_step_duration")
                .tag("step", result.stepName())
                .tag("status", result.status().name())
                .register(registry)
                .record(result.duration());
        }
    }
    

    11. Bonnes pratiques

    DO

    • Garder les étapes petites et focalisées
    • Utiliser le context pour partager des données entre étapes
    • Marquer les étapes non-critiques comme optionnelles
    • Logger au niveau des étapes pour le debugging
    • Configurer des retries appropriés pour les appels externes

    DON’T

    • Ne pas faire de pipelines avec trop d’étapes (> 10)
    • Ne pas mélanger logique métier et infrastructure dans une étape
    • Ne pas ignorer les erreurs des étapes obligatoires
    • Ne pas utiliser de pipelines pour du traitement simple

    12. Références

    Pipeline Engine V2 (Nouveau)

    Ajouté en version 4.1.0

    13. Introduction Pipeline V2

    Le Pipeline V2 est une refonte complète du système de pipelines, conçue pour les applications à haute disponibilité nécessitant :

    • Garantie at-least-once : Aucune perte de message grâce au pattern Queue/Claim/Ack
    • Virtual Threads (Java 21) : Scalabilité maximale avec les threads virtuels
    • Reprise après crash : État persisté permettant la reprise au dernier stage OK
    • DLQ (Dead Letter Queue) : Gestion des échecs définitifs avec replay

    Quand utiliser V2 vs V1 ?

    Critère Pipeline V1 Pipeline V2
    Exécution Synchrone, mono-thread Asynchrone, multi-thread
    Garantie de livraison At-most-once At-least-once
    Persistance Non Oui (TechDB / H2)
    DLQ Non Oui
    Use case Traitement simple Flux critiques, haute disponibilité

    14. Architecture V2

    ┌─────────────────────────────────────────────────────────────────┐
    │                            MOP                                   │
    │               (supervision, restart, health)                     │
    └───────────────────────────┬─────────────────────────────────────┘
                                │
    ┌───────────────────────────▼─────────────────────────────────────┐
    │                     PipelineV2                                   │
    │                                                                  │
    │  ┌────────────┐     ┌────────────┐     ┌────────────┐          │
    │  │  Stage A   │────►│  Stage B   │────►│  Stage C   │          │
    │  │ ┌────────┐ │     │ ┌────────┐ │     │ ┌────────┐ │          │
    │  │ │ Queue  │ │     │ │ Queue  │ │     │ │ Queue  │ │          │
    │  │ └────────┘ │     │ └────────┘ │     │ └────────┘ │          │
    │  │ VThreads  │     │ VThreads  │     │ VThreads  │          │
    │  └────────────┘     └────────────┘     └────────────┘          │
    │                                                                  │
    │         ┌─────────────────┴─────────────────┐                   │
    │    ┌────▼────┐                        ┌────▼────┐              │
    │    │ Context │                        │   DLQ   │              │
    │    │ (state) │                        │ (errors)│              │
    │    └─────────┘                        └─────────┘              │
    └─────────────────────────────────────────────────────────────────┘
    

    Composants

    • StageQueue : Queue avec sémantique Claim/Ack entre chaque stage
    • VThreadStageExecutor : Workers Virtual Threads consommant les queues
    • PersistentPipelineContext : État persisté pour reprise après crash
    • DLQ : Messages en échec après max retries

    15. Création d’un Pipeline V2

    15.1 Builder Fluent

    import eu.lmvi.socle.pipeline.v2.PipelineBuilderV2;
    import eu.lmvi.socle.pipeline.v2.PipelineV2;
    
    // Création du pipeline
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .description("Pipeline de traitement des commandes")
    
        // Stage 1: Validation (2 workers, timeout 30s)
        .addStage("validate", this::validateOrder)
            .concurrency(2)
            .timeout(Duration.ofSeconds(30))
    
        // Stage 2: Enrichissement (4 workers, queue 1000)
        .addStage("enrich", this::enrichOrder)
            .concurrency(4)
            .queueSize(1000)
    
        // Stage 3: Publication (2 workers)
        .addStage("publish", this::publishOrder)
            .concurrency(2)
            .maxRetries(5)
            .backoff(Duration.ofMillis(200), 2.0)
    
        .build();
    

    15.2 Configuration par stage

    .addStage("nom", processor)
        .concurrency(4)           // Nombre de workers parallèles
        .queueSize(500)           // Taille max de la queue
        .timeout(Duration.ofMinutes(5))  // Timeout par message
        .maxRetries(3)            // Tentatives avant DLQ
        .backoff(base, multiplier) // Backoff exponentiel
        .optional()               // Stage optionnel (échec non bloquant)
    

    16. Cycle de vie

    16.1 Démarrage/Arrêt

    // Démarrer le pipeline (lance tous les workers)
    pipeline.start();
    
    // Vérifier l'état
    boolean running = pipeline.isRunning();
    
    // Arrêt gracieux (attend la fin des traitements en cours)
    pipeline.stop();
    
    // Arrêt immédiat (interrompt tout)
    pipeline.stopNow();
    

    16.2 Soumission de messages

    // Soumettre un message
    String messageId = pipeline.submit(order, "exec-001");
    
    // Avec métadonnées
    String messageId = pipeline.submit(order, "exec-001", Map.of(
        "source", "api",
        "priority", "high"
    ));
    

    16.3 Suivi d’exécution

    // État d'une exécution
    Optional<PipelineState> state = pipeline.getExecutionState("exec-001");
    state.ifPresent(s -> {
        System.out.println("Status: " + s.status());
        System.out.println("Current stage: " + s.currentStage());
        System.out.println("Duration: " + s.duration());
    });
    
    // Liste des exécutions actives
    List<PipelineState> active = pipeline.listActiveExecutions();
    
    // Statistiques globales
    PipelineV2.PipelineStats stats = pipeline.getStats();
    System.out.println("Submitted: " + stats.totalSubmitted());
    System.out.println("Completed: " + stats.totalCompleted());
    System.out.println("Failed: " + stats.totalFailed());
    

    17. Queue/Claim/Ack

    17.1 Pattern de consommation

    Le Pipeline V2 utilise un pattern de consommation fiable :

    1. CLAIM   : Le worker réserve un message (lease)
    2. PROCESS : Le worker traite le message
    3. ACK     : Succès → message supprimé
       ou
       NACK    : Échec → retry ou DLQ
    

    17.2 Lease et récupération

    Si un worker crash pendant le traitement :

    • Le lease expire après le timeout configuré
    • Le message est automatiquement remis en queue
    • Un autre worker le reprend
    // Timeout de lease par défaut: 5 minutes
    // Configurable par stage via timeout()
    

    17.3 Implémentations disponibles

    Implémentation Persistance Use case
    InMemoryStageQueue Non Développement, tests
    TechDbStageQueue Oui (H2) Production

    18. Dead Letter Queue (DLQ)

    18.1 Envoi en DLQ

    Un message est envoyé en DLQ après :

    • maxRetries tentatives échouées
    • Une erreur non-retryable

    18.2 Consultation

    // Via API REST
    GET /admin/pipelines/v2/{name}/dlq
    
    // Programmatiquement (via StageQueue)
    List<DlqMessage<Order>> messages = queue.peekDlq(100);
    for (DlqMessage<Order> msg : messages) {
        System.out.println("ID: " + msg.messageId());
        System.out.println("Error: " + msg.errorMessage());
        System.out.println("Attempts: " + msg.attempts());
        System.out.println("Failed at: " + msg.failedAt());
    }
    

    18.3 Replay

    // Via API REST
    POST /admin/pipelines/v2/{name}/dlq/{messageId}/replay
    
    // Programmatiquement
    boolean replayed = queue.replayFromDlq(messageId);
    
    // Replay tous les messages
    int count = queue.replayAllFromDlq("exec-001");
    

    18.4 Suppression

    // Via API REST
    DELETE /admin/pipelines/v2/{name}/dlq/{messageId}
    
    // Programmatiquement
    queue.deleteFromDlq(messageId);
    
    // Purge complète
    queue.purgeDlq();
    

    19. Persistance et reprise

    19.1 Context persistant

    Le Pipeline V2 peut persister son état pour survivre aux redémarrages :

    import eu.lmvi.socle.pipeline.context.TechDbPipelineContext;
    
    // Créer un context persistant
    Supplier<Connection> connectionSupplier = () -> dataSource.getConnection();
    PersistentPipelineContext context = new TechDbPipelineContext(connectionSupplier);
    
    // Utiliser avec le builder
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .context(context)  // Utilise le context persistant
        .addStage(...)
        .build();
    

    19.2 Reprise après crash

    Au redémarrage, les messages non-ackés sont automatiquement re-traités :

    1. Les messages en status CLAIMED dont le lease a expiré → remis en PENDING
    2. Les stages reprennent leur consommation normalement
    3. Grâce à l’idempotence, les messages déjà traités ne sont pas re-traités

    20. Idempotence (OBLIGATOIRE)

    20.1 Pourquoi ?

    Le modèle at-least-once signifie qu’un message peut être traité plusieurs fois :

    • Crash après traitement mais avant ACK
    • Timeout de lease pendant un traitement long
    • Replay depuis la DLQ

    Chaque stage DOIT être idempotent.

    20.2 Patterns recommandés

    // Pattern 1: Vérifier "déjà traité"
    public Order processOrder(Order order) {
        if (orderRepository.exists(order.getId())) {
            return orderRepository.get(order.getId()); // Déjà traité
        }
        // Traiter...
        return orderRepository.save(order);
    }
    
    // Pattern 2: UPSERT au lieu d'INSERT
    public void saveOrder(Order order) {
        orderRepository.upsert(order); // Idempotent par nature
    }
    
    // Pattern 3: Utiliser executionId comme clé
    public void processWithDedup(Order order, String executionId) {
        String dedupKey = "order:" + order.getId() + ":" + executionId;
        if (kvBus.exists(dedupKey)) {
            return; // Déjà traité pour cette exécution
        }
    
        // Traiter...
    
        kvBus.set(dedupKey, true, Duration.ofDays(7));
    }
    

    20.3 Anti-patterns

    // MAUVAIS: INSERT sans vérification
    db.insert(order); // Échoue si déjà inséré
    
    // MAUVAIS: Compteur incrémental
    counter.increment(); // Compté plusieurs fois en cas de replay
    
    // MAUVAIS: Email sans déduplication
    emailService.send(email); // Envoyé plusieurs fois
    

    21. API REST d’administration

    21.1 Endpoints disponibles

    GET  /admin/pipelines/v2                    → Liste des pipelines
    GET  /admin/pipelines/v2/{name}             → État d'un pipeline
    GET  /admin/pipelines/v2/{name}/stages      → État des stages
    GET  /admin/pipelines/v2/{name}/executions  → Exécutions en cours
    
    GET  /admin/pipelines/v2/{name}/dlq         → Messages en DLQ
    POST /admin/pipelines/v2/{name}/dlq/{id}/replay  → Rejouer un message
    DELETE /admin/pipelines/v2/{name}/dlq/{id}  → Supprimer de la DLQ
    POST /admin/pipelines/v2/{name}/dlq/replay-all   → Rejouer tous
    
    POST /admin/pipelines/v2/{name}/start       → Démarrer le pipeline
    POST /admin/pipelines/v2/{name}/stop        → Arrêter gracieusement
    POST /admin/pipelines/v2/{name}/stop-now    → Arrêter immédiatement
    

    21.2 Exemple de réponse

    GET /admin/pipelines/v2/order-processing
    
    {
      "timestamp": 1735387200000,
      "name": "order-processing",
      "description": "Pipeline de traitement des commandes",
      "running": true,
      "stats": {
        "stage_count": 3,
        "active_executions": 5,
        "total_submitted": 1500,
        "total_completed": 1480,
        "total_failed": 12
      }
    }
    

    22. Métriques Prometheus

    22.1 Métriques exposées

    # Compteurs
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="success"}
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="failed"}
    socle_pipeline_v2_dlq_total{pipeline="order-processing", stage="validate"}
    
    # Gauges
    socle_pipeline_v2_queue_pending{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_queue_processing{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_dlq_size{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_active_executions{pipeline="order-processing"}
    socle_pipeline_v2_running{pipeline="order-processing"}
    
    # Histogrammes (durée de traitement)
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.5"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.95"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.99"}
    

    22.2 Alertes recommandées

    # DLQ croissante
    - alert: PipelineDLQGrowing
      expr: increase(socle_pipeline_v2_dlq_total[1h]) > 10
      labels:
        severity: warning
    
    # Queue qui s'accumule (backpressure)
    - alert: PipelineQueueBacklog
      expr: socle_pipeline_v2_queue_pending > 1000
      for: 5m
      labels:
        severity: warning
    
    # Pipeline arrêté
    - alert: PipelineStopped
      expr: socle_pipeline_v2_running == 0
      for: 1m
      labels:
        severity: critical
    

    23. Bonnes pratiques V2

    DO

    • Toujours implémenter l’idempotence dans chaque stage
    • Utiliser TechDbPipelineContext en production pour la persistance
    • Configurer des maxRetries appropriés (3-5 pour API externes)
    • Monitorer les métriques DLQ et queue_pending
    • Traiter régulièrement les messages en DLQ (replay ou suppression)

    DON’T

    • Ne jamais supposer qu’un message ne sera traité qu’une fois
    • Ne pas ignorer les messages en DLQ
    • Ne pas mettre de timeout trop courts (risque de faux positifs)
    • Ne pas créer trop de stages (overhead de queues)
    • Ne pas oublier d’appeler pipeline.stop() à l’arrêt de l’application

    24. Migration V1 → V2

    24.1 Changements d’API

    V1 V2
    PipelineEngine.execute(pipeline, input) pipeline.submit(input, executionId)
    Pipeline<I,O> PipelineV2<I,O>
    PipelineBuilder PipelineBuilderV2
    Exécution synchrone Exécution asynchrone

    24.2 Étapes de migration

    1. Rendre les steps idempotents (obligatoire avant migration)
    2. Créer le pipeline avec PipelineBuilderV2
    3. Remplacer execute() par submit()
    4. Ajouter le suivi asynchrone des résultats si nécessaire
    5. Configurer la persistance (TechDbPipelineContext)
    6. Activer les métriques et monitoring DLQ

    25. Configuration Dynamique (YAML)

    Ajouté en version 4.1.0

    25.1 Introduction

    Le Pipeline V2 peut maintenant être configuré entièrement via YAML, permettant :

    • Activation/désactivation des stages sans recompiler
    • Configuration dynamique de la concurrence, timeouts, retries
    • Ajout de nouveaux stages sans modifier le code de la factory
    • Modularité des workers (un worker = un composant Spring)

    25.2 Architecture

    ┌─────────────────────────────────────────────────────────────────┐
    │                        application.yml                           │
    │  cdc:                                                            │
    │    pipeline:                                                     │
    │      stages:                                                     │
    │        - name: filter                                            │
    │          worker: filterWorker                                    │
    │          enabled: true                                           │
    └───────────────────────────────┬─────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │            PipelineYamlConfig (@ConfigurationProperties)          │
    │                                                                    │
    │  - name: String                                                    │
    │  - stages: List<StageYamlConfig>                                   │
    │  - getEnabledStages(): List<StageYamlConfig>                       │
    └───────────────────────────────┬───────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │                    DynamicPipelineBuilder                          │
    │                                                                    │
    │  .config(pipelineConfig)                                           │
    │  .workers(Map<String, PipelineStageWorker>)                        │
    │  .build() → PipelineV2<I, O>                                       │
    └───────────────────────────────────────────────────────────────────┘
    

    25.3 Interface PipelineStageWorker

    Chaque stage est implémenté comme un composant Spring implémentant PipelineStageWorker :

    package eu.lmvi.socle.pipeline.v2.worker;
    
    public interface PipelineStageWorker<I, O> {
    
        /**
         * Nom unique du worker (utilisé dans la config YAML)
         */
        String getName();
    
        /**
         * Initialise le worker (chargement ressources, compilation scripts, etc.)
         */
        default void initialize() throws Exception {}
    
        /**
         * Traite un élément d'entrée (DOIT être thread-safe)
         */
        O process(I input) throws Exception;
    
        /**
         * Libère les ressources
         */
        default void shutdown() {}
    
        /**
         * Indique si ce worker est activé
         */
        default boolean isEnabled() { return true; }
    
        /**
         * Description pour le monitoring
         */
        default String getDescription() { return getName(); }
    
        /**
         * Ordre de priorité (plus bas = plus tôt)
         */
        default int getOrder() { return 100; }
    }
    

    25.4 Exemple d’implémentation

    @Component("filterWorker")
    public class FilterStageWorker implements PipelineStageWorker<WalEvent, PipelineMessage> {
    
        @Autowired
        private TableFilter tableFilter;
    
        @Override
        public String getName() {
            return "filter";
        }
    
        @Override
        public int getOrder() {
            return 10;  // Premier dans le pipeline
        }
    
        @Override
        public PipelineMessage process(WalEvent event) throws Exception {
            PipelineMessage message = new PipelineMessage(event);
            if (!tableFilter.isAllowed(event.getTable())) {
                message.markAsFiltered("table_not_in_whitelist");
            }
            return message;
        }
    }
    

    25.5 Configuration YAML

    cdc:
      pipeline:
        name: cdc-pipeline-v2
        description: Pipeline CDC PostgreSQL vers Kafka
        enabled: true
        stages:
          # Stage 1: Filtrage
          - name: filter
            worker: filterWorker        # Nom du bean Spring
            enabled: true
            concurrency: 1
            timeout-seconds: 5
            max-retries: 1
            order: 10
    
          # Stage 2: Transformation
          - name: transform
            worker: transformWorker
            enabled: true
            concurrency: 2
            timeout-seconds: 30
            order: 20
    
          # Stage 3a: Enrichissement Rules
          - name: enrich-rules
            worker: rulesEnrichmentWorker
            enabled: true
            concurrency: 2
            order: 30
    
          # Stage 3b: Enrichissement BeanShell
          - name: enrich-beanshell
            worker: beanshellEnrichmentWorker
            enabled: ${CDC_ENRICH_BSH_ENABLED:true}  # Variable d'environnement
            concurrency: 2
            order: 31
    
          # Stage 3c: Enrichissement JavaScript
          - name: enrich-javascript
            worker: javascriptEnrichmentWorker
            enabled: ${CDC_ENRICH_JS_ENABLED:true}
            concurrency: 4
            timeout-seconds: 60
            order: 32
    
          # Stage 3d: Enrichissement Janino (nouveau)
          - name: enrich-janino
            worker: janinoEnrichmentWorker
            enabled: ${CDC_ENRICH_JANINO_ENABLED:false}  # Désactivé par défaut
            concurrency: 2
            order: 33
    
          # Stage 4: Publication Kafka
          - name: publish
            worker: publishWorker
            enabled: true
            concurrency: 2
            max-retries: 5
            order: 40
    
          # Stage 5: Commit LSN
          - name: commit
            worker: commitWorker
            enabled: true
            concurrency: 1
            order: 50
    

    25.6 Classe de configuration Spring

    @Configuration
    @ConfigurationProperties(prefix = "cdc.pipeline")
    public class CdcPipelineConfig extends PipelineYamlConfig {
        // Hérite de toutes les propriétés
    }
    

    25.7 Construction du pipeline

    @Component
    public class CdcPipelineFactory {
    
        @Autowired
        private CdcPipelineConfig pipelineConfig;
    
        @Autowired
        private Map<String, PipelineStageWorker<?, ?>> availableWorkers;
    
        @PostConstruct
        public void init() {
            // Log des workers disponibles
            log.info("Available workers: {}", availableWorkers.keySet());
    
            // Construire le pipeline dynamiquement
            cdcPipeline = DynamicPipelineBuilder.create()
                .config(pipelineConfig)
                .workers(availableWorkers)
                .build();
    
            log.info("Pipeline created with {} stages",
                pipelineConfig.countEnabledStages());
        }
    }
    

    25.8 Avantages

    Aspect Avant (hardcodé) Après (YAML)
    Modification Recompilation Redémarrage
    Activation stage Code Java Variable d’env
    Ajout stage Modifier factory Ajouter worker + config
    Configuration Dans le code Centralisée
    Environnements Branches différentes Même code, config différente

    25.9 Activation/Désactivation à chaud

    Via variables d’environnement dans .env :

    # Désactiver BeanShell
    CDC_ENRICH_BSH_ENABLED=false
    
    # Activer Janino
    CDC_ENRICH_JANINO_ENABLED=true
    
    # Redémarrer le service
    sudo systemctl restart cdc-reflet-kafka-v02
    

    25.10 Ajout d’un nouveau type d’enrichissement

    1. Créer le worker :
    @Component("monNouveauWorker")
    public class MonNouveauEnrichmentWorker
        implements PipelineStageWorker<PipelineMessage, PipelineMessage> {
        // ...
    }
    
    1. Ajouter dans application.yml :
    stages:
      - name: enrich-nouveau
        worker: monNouveauWorker
        enabled: ${CDC_ENRICH_NOUVEAU_ENABLED:false}
        order: 34
    
    1. Redémarrer – le pipeline inclura automatiquement le nouveau stage.

    26. Références V2

  • Socle V004 – Supervisor

    Socle V004 – Supervisor

    08 – Supervisor

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Supervisor est le composant de supervision du Socle V4. Il surveille l’état de santé des Workers via heartbeats et expose des métriques de santé.

    Fonctionnalités

    • Collecte des heartbeats des Workers
    • Détection des Workers défaillants
    • Agrégation de l’état de santé global
    • Exposition via API REST et métriques
    • Intégration avec SharedDataRegistry

    2. Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                        Supervisor                            │
    │                                                              │
    │   ┌──────────────────┐    ┌──────────────────┐              │
    │   │ HeartbeatCollector│    │ HealthAggregator │              │
    │   └────────┬─────────┘    └────────┬─────────┘              │
    │            │                       │                         │
    │            ▼                       ▼                         │
    │   ┌──────────────────┐    ┌──────────────────┐              │
    │   │ Worker States    │    │ Global Health    │              │
    │   │ (per worker)     │    │ (aggregated)     │              │
    │   └──────────────────┘    └──────────────────┘              │
    │                                                              │
    └──────────────────────────────────────────────────────────────┘
             │                           │
             ▼                           ▼
    ┌─────────────────┐         ┌─────────────────┐
    │ /admin/health   │         │ /admin/workers  │
    └─────────────────┘         └─────────────────┘
    

    3. Configuration

    3.1 application.yml

    socle:
      supervisor:
        heartbeat-interval-ms: ${SUPERVISOR_HEARTBEAT_MS:10000}
        unhealthy-threshold: ${SUPERVISOR_UNHEALTHY_THRESHOLD:3}
        check-interval-ms: ${SUPERVISOR_CHECK_INTERVAL_MS:5000}
        stale-timeout-ms: ${SUPERVISOR_STALE_TIMEOUT_MS:60000}
    

    3.2 Variables d’environnement

    Variable Description Défaut
    SUPERVISOR_HEARTBEAT_MS Intervalle heartbeat attendu 10000 (10s)
    SUPERVISOR_UNHEALTHY_THRESHOLD Heartbeats manqués avant UNHEALTHY 3
    SUPERVISOR_CHECK_INTERVAL_MS Intervalle de vérification 5000 (5s)
    SUPERVISOR_STALE_TIMEOUT_MS Timeout avant worker STALE 60000 (1min)

    4. Interface Supervisor

    package eu.lmvi.socle.supervisor;
    
    public interface Supervisor {
    
        /**
         * Enregistre un worker dans le supervisor
         */
        void registerWorker(Worker worker);
    
        /**
         * Désenregistre un worker
         */
        void unregisterWorker(String workerName);
    
        /**
         * Reçoit un heartbeat d'un worker
         */
        void heartbeat(String workerName);
    
        /**
         * Reçoit un heartbeat avec métriques
         */
        void heartbeat(String workerName, Map<String, Object> metrics);
    
        /**
         * Récupère l'état d'un worker
         */
        WorkerState getWorkerState(String workerName);
    
        /**
         * Récupère l'état de tous les workers
         */
        Map<String, WorkerState> getAllWorkerStates();
    
        /**
         * Vérifie si un worker est healthy
         */
        boolean isWorkerHealthy(String workerName);
    
        /**
         * Récupère l'état de santé global
         */
        HealthStatus getGlobalHealth();
    
        /**
         * Liste les workers unhealthy
         */
        List<String> getUnhealthyWorkers();
    
        /**
         * Démarre la supervision
         */
        void start();
    
        /**
         * Arrête la supervision
         */
        void stop();
    }
    

    5. États des Workers

    5.1 WorkerState

    package eu.lmvi.socle.supervisor;
    
    public record WorkerState(
        String workerName,
        WorkerStatus status,
        Instant lastHeartbeat,
        int missedHeartbeats,
        Map<String, Object> lastMetrics,
        Instant registeredAt
    ) {
        public boolean isHealthy() {
            return status == WorkerStatus.RUNNING;
        }
    
        public boolean isStale() {
            return status == WorkerStatus.STALE;
        }
    }
    

    5.2 WorkerStatus

    public enum WorkerStatus {
        /**
         * Worker enregistré mais pas encore démarré
         */
        REGISTERED,
    
        /**
         * Worker en cours d'exécution, heartbeats reçus
         */
        RUNNING,
    
        /**
         * Heartbeats manqués mais pas encore timeout
         */
        DEGRADED,
    
        /**
         * Trop de heartbeats manqués, worker considéré unhealthy
         */
        UNHEALTHY,
    
        /**
         * Aucun heartbeat depuis longtemps, worker potentiellement mort
         */
        STALE,
    
        /**
         * Worker arrêté proprement
         */
        STOPPED
    }
    

    5.3 Diagramme d’états

                        ┌────────────────┐
                        │   REGISTERED   │
                        └───────┬────────┘
                                │ first heartbeat
                                ▼
                        ┌────────────────┐
             ┌─────────│    RUNNING     │─────────┐
             │         └───────┬────────┘         │
             │                 │                  │
             │ heartbeat       │ missed           │ stop()
             │ received        │ heartbeat        │
             │                 ▼                  │
             │         ┌────────────────┐         │
             └────────►│   DEGRADED     │         │
                       └───────┬────────┘         │
                               │ threshold        │
                               │ exceeded         │
                               ▼                  │
                       ┌────────────────┐         │
                       │   UNHEALTHY    │         │
                       └───────┬────────┘         │
                               │ stale            │
                               │ timeout          │
                               ▼                  │
                       ┌────────────────┐         │
                       │     STALE      │         │
                       └────────────────┘         │
                                                  │
                       ┌────────────────┐         │
                       │    STOPPED     │◄────────┘
                       └────────────────┘
    

    6. Implémentation

    package eu.lmvi.socle.supervisor;
    
    @Component
    public class DefaultSupervisor implements Supervisor {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultSupervisor.class);
    
        private final SocleConfiguration config;
        private final ConcurrentHashMap<String, WorkerStateInternal> workers = new ConcurrentHashMap<>();
        private final ScheduledExecutorService scheduler;
        private volatile boolean running = false;
    
        public DefaultSupervisor(SocleConfiguration config) {
            this.config = config;
            this.scheduler = Executors.newSingleThreadScheduledExecutor(
                r -> new Thread(r, "supervisor-checker"));
        }
    
        @Override
        public void registerWorker(Worker worker) {
            String name = worker.getName();
            workers.put(name, new WorkerStateInternal(
                name, WorkerStatus.REGISTERED, Instant.now(), 0, Map.of(), Instant.now()));
            log.info("Worker registered: {}", name);
        }
    
        @Override
        public void unregisterWorker(String workerName) {
            WorkerStateInternal state = workers.remove(workerName);
            if (state != null) {
                log.info("Worker unregistered: {}", workerName);
            }
        }
    
        @Override
        public void heartbeat(String workerName) {
            heartbeat(workerName, Map.of());
        }
    
        @Override
        public void heartbeat(String workerName, Map<String, Object> metrics) {
            workers.computeIfPresent(workerName, (name, state) -> {
                log.debug("Heartbeat received: {}", name);
                return new WorkerStateInternal(
                    name,
                    WorkerStatus.RUNNING,
                    Instant.now(),
                    0,
                    metrics,
                    state.registeredAt
                );
            });
        }
    
        @Override
        public WorkerState getWorkerState(String workerName) {
            WorkerStateInternal internal = workers.get(workerName);
            return internal != null ? internal.toPublic() : null;
        }
    
        @Override
        public Map<String, WorkerState> getAllWorkerStates() {
            return workers.entrySet().stream()
                .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    e -> e.getValue().toPublic()
                ));
        }
    
        @Override
        public boolean isWorkerHealthy(String workerName) {
            WorkerStateInternal state = workers.get(workerName);
            return state != null && state.status == WorkerStatus.RUNNING;
        }
    
        @Override
        public HealthStatus getGlobalHealth() {
            if (workers.isEmpty()) {
                return HealthStatus.HEALTHY;
            }
    
            boolean hasUnhealthy = workers.values().stream()
                .anyMatch(s -> s.status == WorkerStatus.UNHEALTHY || s.status == WorkerStatus.STALE);
    
            if (hasUnhealthy) {
                return HealthStatus.UNHEALTHY;
            }
    
            boolean hasDegraded = workers.values().stream()
                .anyMatch(s -> s.status == WorkerStatus.DEGRADED);
    
            if (hasDegraded) {
                return HealthStatus.DEGRADED;
            }
    
            return HealthStatus.HEALTHY;
        }
    
        @Override
        public List<String> getUnhealthyWorkers() {
            return workers.entrySet().stream()
                .filter(e -> e.getValue().status == WorkerStatus.UNHEALTHY
                          || e.getValue().status == WorkerStatus.STALE)
                .map(Map.Entry::getKey)
                .toList();
        }
    
        @Override
        public void start() {
            running = true;
            long checkInterval = config.getSupervisor().getCheckIntervalMs();
            scheduler.scheduleAtFixedRate(
                this::checkWorkers,
                checkInterval,
                checkInterval,
                TimeUnit.MILLISECONDS
            );
            log.info("Supervisor started with check interval: {}ms", checkInterval);
        }
    
        @Override
        public void stop() {
            running = false;
            scheduler.shutdown();
            log.info("Supervisor stopped");
        }
    
        private void checkWorkers() {
            if (!running) return;
    
            long heartbeatInterval = config.getSupervisor().getHeartbeatIntervalMs();
            int unhealthyThreshold = config.getSupervisor().getUnhealthyThreshold();
            long staleTimeout = config.getSupervisor().getStaleTimeoutMs();
    
            Instant now = Instant.now();
    
            workers.replaceAll((name, state) -> {
                if (state.status == WorkerStatus.STOPPED) {
                    return state;
                }
    
                long msSinceLastHeartbeat = Duration.between(state.lastHeartbeat, now).toMillis();
    
                // Stale check
                if (msSinceLastHeartbeat > staleTimeout) {
                    if (state.status != WorkerStatus.STALE) {
                        log.warn("Worker STALE: {} (no heartbeat for {}ms)", name, msSinceLastHeartbeat);
                    }
                    return state.withStatus(WorkerStatus.STALE);
                }
    
                // Missed heartbeat check
                int expectedHeartbeats = (int) (msSinceLastHeartbeat / heartbeatInterval);
                if (expectedHeartbeats > 0) {
                    int newMissedCount = state.missedHeartbeats + 1;
    
                    if (newMissedCount >= unhealthyThreshold) {
                        if (state.status != WorkerStatus.UNHEALTHY) {
                            log.warn("Worker UNHEALTHY: {} (missed {} heartbeats)", name, newMissedCount);
                        }
                        return state.withStatus(WorkerStatus.UNHEALTHY).withMissedCount(newMissedCount);
                    } else {
                        if (state.status == WorkerStatus.RUNNING) {
                            log.info("Worker DEGRADED: {} (missed {} heartbeats)", name, newMissedCount);
                        }
                        return state.withStatus(WorkerStatus.DEGRADED).withMissedCount(newMissedCount);
                    }
                }
    
                return state;
            });
        }
    
        private record WorkerStateInternal(
            String workerName,
            WorkerStatus status,
            Instant lastHeartbeat,
            int missedHeartbeats,
            Map<String, Object> lastMetrics,
            Instant registeredAt
        ) {
            WorkerState toPublic() {
                return new WorkerState(workerName, status, lastHeartbeat, missedHeartbeats, lastMetrics, registeredAt);
            }
    
            WorkerStateInternal withStatus(WorkerStatus newStatus) {
                return new WorkerStateInternal(workerName, newStatus, lastHeartbeat, missedHeartbeats, lastMetrics, registeredAt);
            }
    
            WorkerStateInternal withMissedCount(int newCount) {
                return new WorkerStateInternal(workerName, status, lastHeartbeat, newCount, lastMetrics, registeredAt);
            }
        }
    }
    

    7. Heartbeat depuis les Workers

    7.1 Heartbeat manuel

    @Component
    public class MyWorker implements Worker {
    
        @Autowired
        private Supervisor supervisor;
    
        @Override
        public void doWork() {
            // Envoyer heartbeat avec métriques
            supervisor.heartbeat(getName(), Map.of(
                "processed", processedCount,
                "queueSize", queue.size()
            ));
    
            // Traitement...
        }
    }
    

    7.2 Heartbeat automatique via AbstractWorker

    public abstract class AbstractWorker implements Worker {
    
        @Autowired
        private Supervisor supervisor;
    
        @Override
        public final void doWork() {
            // Heartbeat automatique
            supervisor.heartbeat(getName(), getStats());
    
            // Appel au traitement réel
            doProcess();
        }
    
        protected abstract void doProcess();
    }
    

    7.3 Heartbeat via thread dédié

    @Component
    public class LongRunningWorker implements Worker {
    
        @Autowired
        private Supervisor supervisor;
    
        private ScheduledExecutorService heartbeatExecutor;
    
        @Override
        public void start() {
            heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
            heartbeatExecutor.scheduleAtFixedRate(
                () -> supervisor.heartbeat(getName()),
                0, 10, TimeUnit.SECONDS
            );
        }
    
        @Override
        public void stop() {
            if (heartbeatExecutor != null) {
                heartbeatExecutor.shutdown();
            }
        }
    
        @Override
        public void doWork() {
            // Long processing - heartbeat handled by separate thread
            processLongTask();
        }
    }
    

    8. API REST

    8.1 Endpoints

    @RestController
    @RequestMapping("/admin")
    public class SupervisorController {
    
        @Autowired
        private Supervisor supervisor;
    
        @GetMapping("/health")
        public ResponseEntity<HealthResponse> health() {
            HealthStatus status = supervisor.getGlobalHealth();
            return ResponseEntity
                .status(status == HealthStatus.HEALTHY ? 200 : 503)
                .body(new HealthResponse(status, supervisor.getUnhealthyWorkers()));
        }
    
        @GetMapping("/workers")
        public Map<String, WorkerState> workers() {
            return supervisor.getAllWorkerStates();
        }
    
        @GetMapping("/workers/{name}")
        public ResponseEntity<WorkerState> worker(@PathVariable String name) {
            WorkerState state = supervisor.getWorkerState(name);
            return state != null
                ? ResponseEntity.ok(state)
                : ResponseEntity.notFound().build();
        }
    }
    

    8.2 Réponses

    // GET /admin/health
    {
      "status": "HEALTHY",
      "unhealthyWorkers": []
    }
    
    // GET /admin/workers
    {
      "kafka-consumer": {
        "workerName": "kafka-consumer",
        "status": "RUNNING",
        "lastHeartbeat": "2025-12-09T10:30:00Z",
        "missedHeartbeats": 0,
        "lastMetrics": {
          "processed": 12345,
          "lag": 23
        },
        "registeredAt": "2025-12-09T10:00:00Z"
      },
      "order-processor": {
        "workerName": "order-processor",
        "status": "DEGRADED",
        "lastHeartbeat": "2025-12-09T10:29:45Z",
        "missedHeartbeats": 1,
        "lastMetrics": {},
        "registeredAt": "2025-12-09T10:00:00Z"
      }
    }
    

    9. Intégration Kubernetes

    9.1 Liveness Probe

    livenessProbe:
      httpGet:
        path: /admin/health
        port: 8080
      initialDelaySeconds: 30
      periodSeconds: 10
      failureThreshold: 3
    

    9.2 Readiness Probe

    readinessProbe:
      httpGet:
        path: /admin/health
        port: 8080
      initialDelaySeconds: 5
      periodSeconds: 5
    

    9.3 Health Controller adapté

    @GetMapping("/health/live")
    public ResponseEntity<Void> live() {
        // Liveness: l'application répond
        return ResponseEntity.ok().build();
    }
    
    @GetMapping("/health/ready")
    public ResponseEntity<Void> ready() {
        // Readiness: tous les workers sont healthy
        HealthStatus status = supervisor.getGlobalHealth();
        return status == HealthStatus.HEALTHY
            ? ResponseEntity.ok().build()
            : ResponseEntity.status(503).build();
    }
    

    10. Métriques Prometheus

    @Component
    public class SupervisorMetrics {
    
        private final Supervisor supervisor;
        private final MeterRegistry registry;
    
        @PostConstruct
        public void registerMetrics() {
            Gauge.builder("socle_workers_total", supervisor,
                s -> s.getAllWorkerStates().size())
                .register(registry);
    
            Gauge.builder("socle_workers_healthy", supervisor,
                s -> s.getAllWorkerStates().values().stream()
                    .filter(WorkerState::isHealthy).count())
                .register(registry);
    
            Gauge.builder("socle_workers_unhealthy", supervisor,
                s -> s.getUnhealthyWorkers().size())
                .register(registry);
        }
    }
    

    11. Bonnes pratiques

    DO

    • Envoyer des heartbeats réguliers depuis tous les workers actifs
    • Inclure des métriques utiles dans les heartbeats
    • Configurer des timeouts adaptés à vos workers
    • Utiliser les probes Kubernetes pour la haute disponibilité

    DON’T

    • Ne pas oublier d’envoyer des heartbeats dans les workers long-running
    • Ne pas ignorer les états DEGRADED
    • Ne pas configurer des timeouts trop courts (faux positifs)
    • Ne pas bloquer l’envoi de heartbeat avec du traitement lourd

    12. Références

  • Socle V004 – Pipeline

    Socle V004 – Pipeline

    09 – Pipeline Engine

    Version : 4.1.0 Date : 2025-12-28

    1. Introduction

    Le PipelineEngine permet de définir des chaînes de traitement composées d’étapes (steps) exécutées séquentiellement ou en parallèle.

    Caractéristiques

    • Définition déclarative des pipelines
    • Exécution séquentielle ou parallèle
    • Gestion des erreurs et retry
    • Context partagé entre étapes
    • Métriques et logging intégrés

    2. Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                      PipelineEngine                          │
    └─────────────────────────────────────────────────────────────┘
                                  │
                                  ▼
    ┌─────────────────────────────────────────────────────────────┐
    │                      Pipeline                                │
    │  ┌─────────────────────────────────────────────────────┐    │
    │  │              PipelineContext                         │    │
    │  │  - input data                                        │    │
    │  │  - shared state                                      │    │
    │  │  - results                                           │    │
    │  └─────────────────────────────────────────────────────┘    │
    │                                                              │
    │  ┌──────────┐   ┌──────────┐   ┌──────────┐                │
    │  │  Step 1  │──►│  Step 2  │──►│  Step 3  │                │
    │  └──────────┘   └──────────┘   └──────────┘                │
    │                                                              │
    └─────────────────────────────────────────────────────────────┘
    

    3. Interface Pipeline

    package eu.lmvi.socle.pipeline;
    
    public interface Pipeline<I, O> {
    
        /**
         * Nom du pipeline
         */
        String getName();
    
        /**
         * Exécute le pipeline
         */
        PipelineResult<O> execute(I input);
    
        /**
         * Exécute le pipeline avec context
         */
        PipelineResult<O> execute(I input, PipelineContext context);
    
        /**
         * Liste des étapes
         */
        List<PipelineStep<?, ?>> getSteps();
    }
    

    4. Interface PipelineStep

    package eu.lmvi.socle.pipeline;
    
    public interface PipelineStep<I, O> {
    
        /**
         * Nom de l'étape
         */
        String getName();
    
        /**
         * Exécute l'étape
         */
        StepResult<O> execute(I input, PipelineContext context);
    
        /**
         * L'étape peut-elle être retryée ?
         */
        default boolean isRetryable() {
            return true;
        }
    
        /**
         * Nombre max de retries
         */
        default int getMaxRetries() {
            return 3;
        }
    
        /**
         * L'étape est-elle optionnelle ?
         */
        default boolean isOptional() {
            return false;
        }
    }
    

    5. Context et Result

    5.1 PipelineContext

    package eu.lmvi.socle.pipeline;
    
    public class PipelineContext {
        private final Map<String, Object> attributes = new ConcurrentHashMap<>();
        private final List<StepResult<?>> stepResults = new ArrayList<>();
        private final String correlationId;
        private final Instant startTime;
    
        public PipelineContext() {
            this.correlationId = UUID.randomUUID().toString();
            this.startTime = Instant.now();
        }
    
        public void put(String key, Object value) {
            attributes.put(key, value);
        }
    
        public <T> Optional<T> get(String key, Class<T> type) {
            return Optional.ofNullable(attributes.get(key))
                .filter(type::isInstance)
                .map(type::cast);
        }
    
        public void addStepResult(StepResult<?> result) {
            stepResults.add(result);
        }
    
        public List<StepResult<?>> getStepResults() {
            return Collections.unmodifiableList(stepResults);
        }
    
        public String getCorrelationId() {
            return correlationId;
        }
    
        public Duration getElapsedTime() {
            return Duration.between(startTime, Instant.now());
        }
    }
    

    5.2 StepResult

    package eu.lmvi.socle.pipeline;
    
    public record StepResult<T>(
        String stepName,
        StepStatus status,
        T output,
        Exception error,
        Duration duration,
        int attempts
    ) {
        public boolean isSuccess() {
            return status == StepStatus.SUCCESS;
        }
    
        public boolean isFailure() {
            return status == StepStatus.FAILURE;
        }
    
        public boolean isSkipped() {
            return status == StepStatus.SKIPPED;
        }
    
        public static <T> StepResult<T> success(String name, T output, Duration duration) {
            return new StepResult<>(name, StepStatus.SUCCESS, output, null, duration, 1);
        }
    
        public static <T> StepResult<T> failure(String name, Exception error, Duration duration, int attempts) {
            return new StepResult<>(name, StepStatus.FAILURE, null, error, duration, attempts);
        }
    
        public static <T> StepResult<T> skipped(String name) {
            return new StepResult<>(name, StepStatus.SKIPPED, null, null, Duration.ZERO, 0);
        }
    }
    
    public enum StepStatus {
        SUCCESS,
        FAILURE,
        SKIPPED
    }
    

    5.3 PipelineResult

    package eu.lmvi.socle.pipeline;
    
    public record PipelineResult<T>(
        String pipelineName,
        PipelineStatus status,
        T output,
        PipelineContext context,
        Duration totalDuration
    ) {
        public boolean isSuccess() {
            return status == PipelineStatus.SUCCESS;
        }
    
        public List<StepResult<?>> getFailedSteps() {
            return context.getStepResults().stream()
                .filter(StepResult::isFailure)
                .toList();
        }
    }
    
    public enum PipelineStatus {
        SUCCESS,
        PARTIAL_SUCCESS,
        FAILURE
    }
    

    6. Implémentation

    6.1 DefaultPipelineEngine

    package eu.lmvi.socle.pipeline;
    
    @Component
    public class DefaultPipelineEngine implements PipelineEngine {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultPipelineEngine.class);
    
        @Override
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input) {
            return execute(pipeline, input, new PipelineContext());
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input, PipelineContext context) {
            log.info("[{}] Starting pipeline execution", pipeline.getName());
            Instant start = Instant.now();
    
            Object currentInput = input;
            O finalOutput = null;
            boolean hasFailure = false;
    
            for (PipelineStep<?, ?> step : pipeline.getSteps()) {
                PipelineStep<Object, Object> typedStep = (PipelineStep<Object, Object>) step;
    
                StepResult<Object> result = executeStep(typedStep, currentInput, context);
                context.addStepResult(result);
    
                if (result.isSuccess()) {
                    currentInput = result.output();
                    finalOutput = (O) result.output();
                } else if (result.isFailure()) {
                    if (!step.isOptional()) {
                        hasFailure = true;
                        log.error("[{}] Pipeline failed at step: {}", pipeline.getName(), step.getName());
                        break;
                    }
                    log.warn("[{}] Optional step failed, continuing: {}", pipeline.getName(), step.getName());
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            PipelineStatus status = hasFailure
                ? PipelineStatus.FAILURE
                : (context.getStepResults().stream().anyMatch(StepResult::isFailure)
                    ? PipelineStatus.PARTIAL_SUCCESS
                    : PipelineStatus.SUCCESS);
    
            log.info("[{}] Pipeline completed with status: {} in {}ms",
                pipeline.getName(), status, duration.toMillis());
    
            return new PipelineResult<>(pipeline.getName(), status, finalOutput, context, duration);
        }
    
        private <I, O> StepResult<O> executeStep(PipelineStep<I, O> step, I input, PipelineContext context) {
            log.debug("[{}] Executing step", step.getName());
            Instant start = Instant.now();
            int attempts = 0;
            Exception lastError = null;
    
            while (attempts < step.getMaxRetries()) {
                attempts++;
                try {
                    StepResult<O> result = step.execute(input, context);
                    if (result.isSuccess()) {
                        Duration duration = Duration.between(start, Instant.now());
                        log.debug("[{}] Step completed in {}ms", step.getName(), duration.toMillis());
                        return StepResult.success(step.getName(), result.output(), duration);
                    }
                    lastError = result.error();
                } catch (Exception e) {
                    lastError = e;
                    log.warn("[{}] Step attempt {} failed: {}", step.getName(), attempts, e.getMessage());
    
                    if (!step.isRetryable() || attempts >= step.getMaxRetries()) {
                        break;
                    }
    
                    // Backoff
                    try {
                        Thread.sleep(attempts * 1000L);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            return StepResult.failure(step.getName(), lastError, duration, attempts);
        }
    }
    

    6.2 Pipeline Builder

    package eu.lmvi.socle.pipeline;
    
    public class PipelineBuilder<I, O> {
    
        private final String name;
        private final List<PipelineStep<?, ?>> steps = new ArrayList<>();
    
        public PipelineBuilder(String name) {
            this.name = name;
        }
    
        public static <I, O> PipelineBuilder<I, O> create(String name) {
            return new PipelineBuilder<>(name);
        }
    
        public <NO> PipelineBuilder<I, NO> addStep(PipelineStep<O, NO> step) {
            steps.add(step);
            return (PipelineBuilder<I, NO>) this;
        }
    
        public <NO> PipelineBuilder<I, NO> addStep(String name, Function<O, NO> processor) {
            return addStep(new FunctionalStep<>(name, processor));
        }
    
        public Pipeline<I, O> build() {
            return new DefaultPipeline<>(name, List.copyOf(steps));
        }
    
        private record FunctionalStep<I, O>(String name, Function<I, O> processor) implements PipelineStep<I, O> {
    
            @Override
            public String getName() {
                return name;
            }
    
            @Override
            public StepResult<O> execute(I input, PipelineContext context) {
                O output = processor.apply(input);
                return StepResult.success(name, output, Duration.ZERO);
            }
        }
    }
    

    7. Utilisation

    7.1 Pipeline simple

    @Component
    public class OrderPipeline {
    
        @Autowired
        private PipelineEngine engine;
    
        public PipelineResult<OrderResult> processOrder(Order order) {
            Pipeline<Order, OrderResult> pipeline = PipelineBuilder
                .<Order, OrderResult>create("order-processing")
                .addStep("validate", this::validateOrder)
                .addStep("enrich", this::enrichOrder)
                .addStep("process", this::processOrder)
                .addStep("notify", this::notifyCustomer)
                .build();
    
            return engine.execute(pipeline, order);
        }
    
        private ValidatedOrder validateOrder(Order order) {
            // Validation...
            return new ValidatedOrder(order);
        }
    
        private EnrichedOrder enrichOrder(ValidatedOrder order) {
            // Enrichissement...
            return new EnrichedOrder(order);
        }
    
        private ProcessedOrder processOrder(EnrichedOrder order) {
            // Traitement...
            return new ProcessedOrder(order);
        }
    
        private OrderResult notifyCustomer(ProcessedOrder order) {
            // Notification...
            return new OrderResult(order, "SUCCESS");
        }
    }
    

    7.2 Étapes personnalisées

    public class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
    
        @Override
        public String getName() {
            return "validation";
        }
    
        @Override
        public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
            List<String> errors = new ArrayList<>();
    
            if (input.getCustomerId() == null) {
                errors.add("Missing customer ID");
            }
            if (input.getItems().isEmpty()) {
                errors.add("No items in order");
            }
    
            if (!errors.isEmpty()) {
                return StepResult.failure(getName(),
                    new ValidationException(errors),
                    Duration.ZERO, 1);
            }
    
            // Stocker des données dans le context
            context.put("customerId", input.getCustomerId());
    
            return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
        }
    
        @Override
        public boolean isRetryable() {
            return false;  // Validation ne doit pas être retryée
        }
    }
    

    7.3 Étape optionnelle

    public class NotificationStep implements PipelineStep<ProcessedOrder, ProcessedOrder> {
    
        @Override
        public String getName() {
            return "notification";
        }
    
        @Override
        public StepResult<ProcessedOrder> execute(ProcessedOrder input, PipelineContext context) {
            try {
                sendNotification(input);
                return StepResult.success(getName(), input, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    
        @Override
        public boolean isOptional() {
            return true;  // Le pipeline continue même si la notif échoue
        }
    }
    

    7.4 Utilisation du context

    public class EnrichmentStep implements PipelineStep<ValidatedOrder, EnrichedOrder> {
    
        @Autowired
        private CustomerService customerService;
    
        @Override
        public String getName() {
            return "enrichment";
        }
    
        @Override
        public StepResult<EnrichedOrder> execute(ValidatedOrder input, PipelineContext context) {
            // Lire depuis le context
            String customerId = context.get("customerId", String.class).orElseThrow();
    
            // Enrichir
            Customer customer = customerService.getCustomer(customerId);
            EnrichedOrder enriched = new EnrichedOrder(input, customer);
    
            // Écrire dans le context pour les étapes suivantes
            context.put("customerEmail", customer.getEmail());
    
            return StepResult.success(getName(), enriched, Duration.ZERO);
        }
    }
    

    8. Gestion des erreurs

    8.1 Retry automatique

    public class ExternalApiStep implements PipelineStep<Data, ApiResponse> {
    
        @Override
        public String getName() {
            return "external-api-call";
        }
    
        @Override
        public boolean isRetryable() {
            return true;
        }
    
        @Override
        public int getMaxRetries() {
            return 5;  // 5 tentatives max
        }
    
        @Override
        public StepResult<ApiResponse> execute(Data input, PipelineContext context) {
            // Appel API qui peut échouer
            ApiResponse response = callApi(input);
            return StepResult.success(getName(), response, Duration.ZERO);
        }
    }
    

    8.2 Traitement des résultats

    PipelineResult<OrderResult> result = engine.execute(pipeline, order);
    
    if (result.isSuccess()) {
        log.info("Order processed successfully: {}", result.output());
    } else {
        // Analyser les étapes en échec
        for (StepResult<?> stepResult : result.getFailedSteps()) {
            log.error("Step {} failed after {} attempts: {}",
                stepResult.stepName(),
                stepResult.attempts(),
                stepResult.error().getMessage());
        }
    
        // Décider quoi faire
        if (result.status() == PipelineStatus.PARTIAL_SUCCESS) {
            // Certaines étapes optionnelles ont échoué
            handlePartialSuccess(result);
        } else {
            // Échec complet
            handleFailure(result);
        }
    }
    

    9. Pipelines parallèles

    public class ParallelEnrichmentStep implements PipelineStep<Order, EnrichedOrder> {
    
        @Autowired
        private ExecutorService executor;
    
        @Override
        public StepResult<EnrichedOrder> execute(Order input, PipelineContext context) {
            // Exécuter plusieurs enrichissements en parallèle
            CompletableFuture<Customer> customerFuture = CompletableFuture.supplyAsync(
                () -> customerService.getCustomer(input.getCustomerId()), executor);
    
            CompletableFuture<Inventory> inventoryFuture = CompletableFuture.supplyAsync(
                () -> inventoryService.checkInventory(input.getItems()), executor);
    
            CompletableFuture<Pricing> pricingFuture = CompletableFuture.supplyAsync(
                () -> pricingService.calculatePrice(input), executor);
    
            try {
                CompletableFuture.allOf(customerFuture, inventoryFuture, pricingFuture).join();
    
                EnrichedOrder enriched = new EnrichedOrder(
                    input,
                    customerFuture.get(),
                    inventoryFuture.get(),
                    pricingFuture.get()
                );
    
                return StepResult.success(getName(), enriched, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    }
    

    10. Métriques

    @Component
    public class PipelineMetrics {
    
        private final MeterRegistry registry;
    
        public void recordPipelineExecution(PipelineResult<?> result) {
            Timer.builder("socle_pipeline_duration")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .record(result.totalDuration());
    
            Counter.builder("socle_pipeline_executions")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .increment();
        }
    
        public void recordStepExecution(StepResult<?> result) {
            Timer.builder("socle_pipeline_step_duration")
                .tag("step", result.stepName())
                .tag("status", result.status().name())
                .register(registry)
                .record(result.duration());
        }
    }
    

    11. Bonnes pratiques

    DO

    • Garder les étapes petites et focalisées
    • Utiliser le context pour partager des données entre étapes
    • Marquer les étapes non-critiques comme optionnelles
    • Logger au niveau des étapes pour le debugging
    • Configurer des retries appropriés pour les appels externes

    DON’T

    • Ne pas faire de pipelines avec trop d’étapes (> 10)
    • Ne pas mélanger logique métier et infrastructure dans une étape
    • Ne pas ignorer les erreurs des étapes obligatoires
    • Ne pas utiliser de pipelines pour du traitement simple

    12. Références

    Pipeline Engine V2 (Nouveau)

    Ajouté en version 4.1.0

    13. Introduction Pipeline V2

    Le Pipeline V2 est une refonte complète du système de pipelines, conçue pour les applications à haute disponibilité nécessitant :

    • Garantie at-least-once : Aucune perte de message grâce au pattern Queue/Claim/Ack
    • Virtual Threads (Java 21) : Scalabilité maximale avec les threads virtuels
    • Reprise après crash : État persisté permettant la reprise au dernier stage OK
    • DLQ (Dead Letter Queue) : Gestion des échecs définitifs avec replay

    Quand utiliser V2 vs V1 ?

    Critère Pipeline V1 Pipeline V2
    Exécution Synchrone, mono-thread Asynchrone, multi-thread
    Garantie de livraison At-most-once At-least-once
    Persistance Non Oui (TechDB / H2)
    DLQ Non Oui
    Use case Traitement simple Flux critiques, haute disponibilité

    14. Architecture V2

    ┌─────────────────────────────────────────────────────────────────┐
    │                            MOP                                   │
    │               (supervision, restart, health)                     │
    └───────────────────────────┬─────────────────────────────────────┘
                                │
    ┌───────────────────────────▼─────────────────────────────────────┐
    │                     PipelineV2                                   │
    │                                                                  │
    │  ┌────────────┐     ┌────────────┐     ┌────────────┐          │
    │  │  Stage A   │────►│  Stage B   │────►│  Stage C   │          │
    │  │ ┌────────┐ │     │ ┌────────┐ │     │ ┌────────┐ │          │
    │  │ │ Queue  │ │     │ │ Queue  │ │     │ │ Queue  │ │          │
    │  │ └────────┘ │     │ └────────┘ │     │ └────────┘ │          │
    │  │ VThreads  │     │ VThreads  │     │ VThreads  │          │
    │  └────────────┘     └────────────┘     └────────────┘          │
    │                                                                  │
    │         ┌─────────────────┴─────────────────┐                   │
    │    ┌────▼────┐                        ┌────▼────┐              │
    │    │ Context │                        │   DLQ   │              │
    │    │ (state) │                        │ (errors)│              │
    │    └─────────┘                        └─────────┘              │
    └─────────────────────────────────────────────────────────────────┘
    

    Composants

    • StageQueue : Queue avec sémantique Claim/Ack entre chaque stage
    • VThreadStageExecutor : Workers Virtual Threads consommant les queues
    • PersistentPipelineContext : État persisté pour reprise après crash
    • DLQ : Messages en échec après max retries

    15. Création d’un Pipeline V2

    15.1 Builder Fluent

    import eu.lmvi.socle.pipeline.v2.PipelineBuilderV2;
    import eu.lmvi.socle.pipeline.v2.PipelineV2;
    
    // Création du pipeline
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .description("Pipeline de traitement des commandes")
    
        // Stage 1: Validation (2 workers, timeout 30s)
        .addStage("validate", this::validateOrder)
            .concurrency(2)
            .timeout(Duration.ofSeconds(30))
    
        // Stage 2: Enrichissement (4 workers, queue 1000)
        .addStage("enrich", this::enrichOrder)
            .concurrency(4)
            .queueSize(1000)
    
        // Stage 3: Publication (2 workers)
        .addStage("publish", this::publishOrder)
            .concurrency(2)
            .maxRetries(5)
            .backoff(Duration.ofMillis(200), 2.0)
    
        .build();
    

    15.2 Configuration par stage

    .addStage("nom", processor)
        .concurrency(4)           // Nombre de workers parallèles
        .queueSize(500)           // Taille max de la queue
        .timeout(Duration.ofMinutes(5))  // Timeout par message
        .maxRetries(3)            // Tentatives avant DLQ
        .backoff(base, multiplier) // Backoff exponentiel
        .optional()               // Stage optionnel (échec non bloquant)
    

    16. Cycle de vie

    16.1 Démarrage/Arrêt

    // Démarrer le pipeline (lance tous les workers)
    pipeline.start();
    
    // Vérifier l'état
    boolean running = pipeline.isRunning();
    
    // Arrêt gracieux (attend la fin des traitements en cours)
    pipeline.stop();
    
    // Arrêt immédiat (interrompt tout)
    pipeline.stopNow();
    

    16.2 Soumission de messages

    // Soumettre un message
    String messageId = pipeline.submit(order, "exec-001");
    
    // Avec métadonnées
    String messageId = pipeline.submit(order, "exec-001", Map.of(
        "source", "api",
        "priority", "high"
    ));
    

    16.3 Suivi d’exécution

    // État d'une exécution
    Optional<PipelineState> state = pipeline.getExecutionState("exec-001");
    state.ifPresent(s -> {
        System.out.println("Status: " + s.status());
        System.out.println("Current stage: " + s.currentStage());
        System.out.println("Duration: " + s.duration());
    });
    
    // Liste des exécutions actives
    List<PipelineState> active = pipeline.listActiveExecutions();
    
    // Statistiques globales
    PipelineV2.PipelineStats stats = pipeline.getStats();
    System.out.println("Submitted: " + stats.totalSubmitted());
    System.out.println("Completed: " + stats.totalCompleted());
    System.out.println("Failed: " + stats.totalFailed());
    

    17. Queue/Claim/Ack

    17.1 Pattern de consommation

    Le Pipeline V2 utilise un pattern de consommation fiable :

    1. CLAIM   : Le worker réserve un message (lease)
    2. PROCESS : Le worker traite le message
    3. ACK     : Succès → message supprimé
       ou
       NACK    : Échec → retry ou DLQ
    

    17.2 Lease et récupération

    Si un worker crash pendant le traitement :

    • Le lease expire après le timeout configuré
    • Le message est automatiquement remis en queue
    • Un autre worker le reprend
    // Timeout de lease par défaut: 5 minutes
    // Configurable par stage via timeout()
    

    17.3 Implémentations disponibles

    Implémentation Persistance Use case
    InMemoryStageQueue Non Développement, tests
    TechDbStageQueue Oui (H2) Production

    18. Dead Letter Queue (DLQ)

    18.1 Envoi en DLQ

    Un message est envoyé en DLQ après :

    • maxRetries tentatives échouées
    • Une erreur non-retryable

    18.2 Consultation

    // Via API REST
    GET /admin/pipelines/v2/{name}/dlq
    
    // Programmatiquement (via StageQueue)
    List<DlqMessage<Order>> messages = queue.peekDlq(100);
    for (DlqMessage<Order> msg : messages) {
        System.out.println("ID: " + msg.messageId());
        System.out.println("Error: " + msg.errorMessage());
        System.out.println("Attempts: " + msg.attempts());
        System.out.println("Failed at: " + msg.failedAt());
    }
    

    18.3 Replay

    // Via API REST
    POST /admin/pipelines/v2/{name}/dlq/{messageId}/replay
    
    // Programmatiquement
    boolean replayed = queue.replayFromDlq(messageId);
    
    // Replay tous les messages
    int count = queue.replayAllFromDlq("exec-001");
    

    18.4 Suppression

    // Via API REST
    DELETE /admin/pipelines/v2/{name}/dlq/{messageId}
    
    // Programmatiquement
    queue.deleteFromDlq(messageId);
    
    // Purge complète
    queue.purgeDlq();
    

    19. Persistance et reprise

    19.1 Context persistant

    Le Pipeline V2 peut persister son état pour survivre aux redémarrages :

    import eu.lmvi.socle.pipeline.context.TechDbPipelineContext;
    
    // Créer un context persistant
    Supplier<Connection> connectionSupplier = () -> dataSource.getConnection();
    PersistentPipelineContext context = new TechDbPipelineContext(connectionSupplier);
    
    // Utiliser avec le builder
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .context(context)  // Utilise le context persistant
        .addStage(...)
        .build();
    

    19.2 Reprise après crash

    Au redémarrage, les messages non-ackés sont automatiquement re-traités :

    1. Les messages en status CLAIMED dont le lease a expiré → remis en PENDING
    2. Les stages reprennent leur consommation normalement
    3. Grâce à l’idempotence, les messages déjà traités ne sont pas re-traités

    20. Idempotence (OBLIGATOIRE)

    20.1 Pourquoi ?

    Le modèle at-least-once signifie qu’un message peut être traité plusieurs fois :

    • Crash après traitement mais avant ACK
    • Timeout de lease pendant un traitement long
    • Replay depuis la DLQ

    Chaque stage DOIT être idempotent.

    20.2 Patterns recommandés

    // Pattern 1: Vérifier "déjà traité"
    public Order processOrder(Order order) {
        if (orderRepository.exists(order.getId())) {
            return orderRepository.get(order.getId()); // Déjà traité
        }
        // Traiter...
        return orderRepository.save(order);
    }
    
    // Pattern 2: UPSERT au lieu d'INSERT
    public void saveOrder(Order order) {
        orderRepository.upsert(order); // Idempotent par nature
    }
    
    // Pattern 3: Utiliser executionId comme clé
    public void processWithDedup(Order order, String executionId) {
        String dedupKey = "order:" + order.getId() + ":" + executionId;
        if (kvBus.exists(dedupKey)) {
            return; // Déjà traité pour cette exécution
        }
    
        // Traiter...
    
        kvBus.set(dedupKey, true, Duration.ofDays(7));
    }
    

    20.3 Anti-patterns

    // MAUVAIS: INSERT sans vérification
    db.insert(order); // Échoue si déjà inséré
    
    // MAUVAIS: Compteur incrémental
    counter.increment(); // Compté plusieurs fois en cas de replay
    
    // MAUVAIS: Email sans déduplication
    emailService.send(email); // Envoyé plusieurs fois
    

    21. API REST d’administration

    21.1 Endpoints disponibles

    GET  /admin/pipelines/v2                    → Liste des pipelines
    GET  /admin/pipelines/v2/{name}             → État d'un pipeline
    GET  /admin/pipelines/v2/{name}/stages      → État des stages
    GET  /admin/pipelines/v2/{name}/executions  → Exécutions en cours
    
    GET  /admin/pipelines/v2/{name}/dlq         → Messages en DLQ
    POST /admin/pipelines/v2/{name}/dlq/{id}/replay  → Rejouer un message
    DELETE /admin/pipelines/v2/{name}/dlq/{id}  → Supprimer de la DLQ
    POST /admin/pipelines/v2/{name}/dlq/replay-all   → Rejouer tous
    
    POST /admin/pipelines/v2/{name}/start       → Démarrer le pipeline
    POST /admin/pipelines/v2/{name}/stop        → Arrêter gracieusement
    POST /admin/pipelines/v2/{name}/stop-now    → Arrêter immédiatement
    

    21.2 Exemple de réponse

    GET /admin/pipelines/v2/order-processing
    
    {
      "timestamp": 1735387200000,
      "name": "order-processing",
      "description": "Pipeline de traitement des commandes",
      "running": true,
      "stats": {
        "stage_count": 3,
        "active_executions": 5,
        "total_submitted": 1500,
        "total_completed": 1480,
        "total_failed": 12
      }
    }
    

    22. Métriques Prometheus

    22.1 Métriques exposées

    # Compteurs
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="success"}
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="failed"}
    socle_pipeline_v2_dlq_total{pipeline="order-processing", stage="validate"}
    
    # Gauges
    socle_pipeline_v2_queue_pending{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_queue_processing{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_dlq_size{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_active_executions{pipeline="order-processing"}
    socle_pipeline_v2_running{pipeline="order-processing"}
    
    # Histogrammes (durée de traitement)
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.5"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.95"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.99"}
    

    22.2 Alertes recommandées

    # DLQ croissante
    - alert: PipelineDLQGrowing
      expr: increase(socle_pipeline_v2_dlq_total[1h]) > 10
      labels:
        severity: warning
    
    # Queue qui s'accumule (backpressure)
    - alert: PipelineQueueBacklog
      expr: socle_pipeline_v2_queue_pending > 1000
      for: 5m
      labels:
        severity: warning
    
    # Pipeline arrêté
    - alert: PipelineStopped
      expr: socle_pipeline_v2_running == 0
      for: 1m
      labels:
        severity: critical
    

    23. Bonnes pratiques V2

    DO

    • Toujours implémenter l’idempotence dans chaque stage
    • Utiliser TechDbPipelineContext en production pour la persistance
    • Configurer des maxRetries appropriés (3-5 pour API externes)
    • Monitorer les métriques DLQ et queue_pending
    • Traiter régulièrement les messages en DLQ (replay ou suppression)

    DON’T

    • Ne jamais supposer qu’un message ne sera traité qu’une fois
    • Ne pas ignorer les messages en DLQ
    • Ne pas mettre de timeout trop courts (risque de faux positifs)
    • Ne pas créer trop de stages (overhead de queues)
    • Ne pas oublier d’appeler pipeline.stop() à l’arrêt de l’application

    24. Migration V1 → V2

    24.1 Changements d’API

    V1 V2
    PipelineEngine.execute(pipeline, input) pipeline.submit(input, executionId)
    Pipeline<I,O> PipelineV2<I,O>
    PipelineBuilder PipelineBuilderV2
    Exécution synchrone Exécution asynchrone

    24.2 Étapes de migration

    1. Rendre les steps idempotents (obligatoire avant migration)
    2. Créer le pipeline avec PipelineBuilderV2
    3. Remplacer execute() par submit()
    4. Ajouter le suivi asynchrone des résultats si nécessaire
    5. Configurer la persistance (TechDbPipelineContext)
    6. Activer les métriques et monitoring DLQ

    25. Configuration Dynamique (YAML)

    Ajouté en version 4.1.0

    25.1 Introduction

    Le Pipeline V2 peut maintenant être configuré entièrement via YAML, permettant :

    • Activation/désactivation des stages sans recompiler
    • Configuration dynamique de la concurrence, timeouts, retries
    • Ajout de nouveaux stages sans modifier le code de la factory
    • Modularité des workers (un worker = un composant Spring)

    25.2 Architecture

    ┌─────────────────────────────────────────────────────────────────┐
    │                        application.yml                           │
    │  cdc:                                                            │
    │    pipeline:                                                     │
    │      stages:                                                     │
    │        - name: filter                                            │
    │          worker: filterWorker                                    │
    │          enabled: true                                           │
    └───────────────────────────────┬─────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │            PipelineYamlConfig (@ConfigurationProperties)          │
    │                                                                    │
    │  - name: String                                                    │
    │  - stages: List<StageYamlConfig>                                   │
    │  - getEnabledStages(): List<StageYamlConfig>                       │
    └───────────────────────────────┬───────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │                    DynamicPipelineBuilder                          │
    │                                                                    │
    │  .config(pipelineConfig)                                           │
    │  .workers(Map<String, PipelineStageWorker>)                        │
    │  .build() → PipelineV2<I, O>                                       │
    └───────────────────────────────────────────────────────────────────┘
    

    25.3 Interface PipelineStageWorker

    Chaque stage est implémenté comme un composant Spring implémentant PipelineStageWorker :

    package eu.lmvi.socle.pipeline.v2.worker;
    
    public interface PipelineStageWorker<I, O> {
    
        /**
         * Nom unique du worker (utilisé dans la config YAML)
         */
        String getName();
    
        /**
         * Initialise le worker (chargement ressources, compilation scripts, etc.)
         */
        default void initialize() throws Exception {}
    
        /**
         * Traite un élément d'entrée (DOIT être thread-safe)
         */
        O process(I input) throws Exception;
    
        /**
         * Libère les ressources
         */
        default void shutdown() {}
    
        /**
         * Indique si ce worker est activé
         */
        default boolean isEnabled() { return true; }
    
        /**
         * Description pour le monitoring
         */
        default String getDescription() { return getName(); }
    
        /**
         * Ordre de priorité (plus bas = plus tôt)
         */
        default int getOrder() { return 100; }
    }
    

    25.4 Exemple d’implémentation

    @Component("filterWorker")
    public class FilterStageWorker implements PipelineStageWorker<WalEvent, PipelineMessage> {
    
        @Autowired
        private TableFilter tableFilter;
    
        @Override
        public String getName() {
            return "filter";
        }
    
        @Override
        public int getOrder() {
            return 10;  // Premier dans le pipeline
        }
    
        @Override
        public PipelineMessage process(WalEvent event) throws Exception {
            PipelineMessage message = new PipelineMessage(event);
            if (!tableFilter.isAllowed(event.getTable())) {
                message.markAsFiltered("table_not_in_whitelist");
            }
            return message;
        }
    }
    

    25.5 Configuration YAML

    cdc:
      pipeline:
        name: cdc-pipeline-v2
        description: Pipeline CDC PostgreSQL vers Kafka
        enabled: true
        stages:
          # Stage 1: Filtrage
          - name: filter
            worker: filterWorker        # Nom du bean Spring
            enabled: true
            concurrency: 1
            timeout-seconds: 5
            max-retries: 1
            order: 10
    
          # Stage 2: Transformation
          - name: transform
            worker: transformWorker
            enabled: true
            concurrency: 2
            timeout-seconds: 30
            order: 20
    
          # Stage 3a: Enrichissement Rules
          - name: enrich-rules
            worker: rulesEnrichmentWorker
            enabled: true
            concurrency: 2
            order: 30
    
          # Stage 3b: Enrichissement BeanShell
          - name: enrich-beanshell
            worker: beanshellEnrichmentWorker
            enabled: ${CDC_ENRICH_BSH_ENABLED:true}  # Variable d'environnement
            concurrency: 2
            order: 31
    
          # Stage 3c: Enrichissement JavaScript
          - name: enrich-javascript
            worker: javascriptEnrichmentWorker
            enabled: ${CDC_ENRICH_JS_ENABLED:true}
            concurrency: 4
            timeout-seconds: 60
            order: 32
    
          # Stage 3d: Enrichissement Janino (nouveau)
          - name: enrich-janino
            worker: janinoEnrichmentWorker
            enabled: ${CDC_ENRICH_JANINO_ENABLED:false}  # Désactivé par défaut
            concurrency: 2
            order: 33
    
          # Stage 4: Publication Kafka
          - name: publish
            worker: publishWorker
            enabled: true
            concurrency: 2
            max-retries: 5
            order: 40
    
          # Stage 5: Commit LSN
          - name: commit
            worker: commitWorker
            enabled: true
            concurrency: 1
            order: 50
    

    25.6 Classe de configuration Spring

    @Configuration
    @ConfigurationProperties(prefix = "cdc.pipeline")
    public class CdcPipelineConfig extends PipelineYamlConfig {
        // Hérite de toutes les propriétés
    }
    

    25.7 Construction du pipeline

    @Component
    public class CdcPipelineFactory {
    
        @Autowired
        private CdcPipelineConfig pipelineConfig;
    
        @Autowired
        private Map<String, PipelineStageWorker<?, ?>> availableWorkers;
    
        @PostConstruct
        public void init() {
            // Log des workers disponibles
            log.info("Available workers: {}", availableWorkers.keySet());
    
            // Construire le pipeline dynamiquement
            cdcPipeline = DynamicPipelineBuilder.create()
                .config(pipelineConfig)
                .workers(availableWorkers)
                .build();
    
            log.info("Pipeline created with {} stages",
                pipelineConfig.countEnabledStages());
        }
    }
    

    25.8 Avantages

    Aspect Avant (hardcodé) Après (YAML)
    Modification Recompilation Redémarrage
    Activation stage Code Java Variable d’env
    Ajout stage Modifier factory Ajouter worker + config
    Configuration Dans le code Centralisée
    Environnements Branches différentes Même code, config différente

    25.9 Activation/Désactivation à chaud

    Via variables d’environnement dans .env :

    # Désactiver BeanShell
    CDC_ENRICH_BSH_ENABLED=false
    
    # Activer Janino
    CDC_ENRICH_JANINO_ENABLED=true
    
    # Redémarrer le service
    sudo systemctl restart cdc-reflet-kafka-v02
    

    25.10 Ajout d’un nouveau type d’enrichissement

    1. Créer le worker :
    @Component("monNouveauWorker")
    public class MonNouveauEnrichmentWorker
        implements PipelineStageWorker<PipelineMessage, PipelineMessage> {
        // ...
    }
    
    1. Ajouter dans application.yml :
    stages:
      - name: enrich-nouveau
        worker: monNouveauWorker
        enabled: ${CDC_ENRICH_NOUVEAU_ENABLED:false}
        order: 34
    
    1. Redémarrer – le pipeline inclura automatiquement le nouveau stage.

    26. Références V2

  • Socle V004 – Supervisor

    Socle V004 – Supervisor

    08 – Supervisor

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Supervisor est le composant de supervision du Socle V4. Il surveille l’état de santé des Workers via heartbeats et expose des métriques de santé.

    Fonctionnalités

    • Collecte des heartbeats des Workers
    • Détection des Workers défaillants
    • Agrégation de l’état de santé global
    • Exposition via API REST et métriques
    • Intégration avec SharedDataRegistry

    2. Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                        Supervisor                            │
    │                                                              │
    │   ┌──────────────────┐    ┌──────────────────┐              │
    │   │ HeartbeatCollector│    │ HealthAggregator │              │
    │   └────────┬─────────┘    └────────┬─────────┘              │
    │            │                       │                         │
    │            ▼                       ▼                         │
    │   ┌──────────────────┐    ┌──────────────────┐              │
    │   │ Worker States    │    │ Global Health    │              │
    │   │ (per worker)     │    │ (aggregated)     │              │
    │   └──────────────────┘    └──────────────────┘              │
    │                                                              │
    └──────────────────────────────────────────────────────────────┘
             │                           │
             ▼                           ▼
    ┌─────────────────┐         ┌─────────────────┐
    │ /admin/health   │         │ /admin/workers  │
    └─────────────────┘         └─────────────────┘
    

    3. Configuration

    3.1 application.yml

    socle:
      supervisor:
        heartbeat-interval-ms: ${SUPERVISOR_HEARTBEAT_MS:10000}
        unhealthy-threshold: ${SUPERVISOR_UNHEALTHY_THRESHOLD:3}
        check-interval-ms: ${SUPERVISOR_CHECK_INTERVAL_MS:5000}
        stale-timeout-ms: ${SUPERVISOR_STALE_TIMEOUT_MS:60000}
    

    3.2 Variables d’environnement

    Variable Description Défaut
    SUPERVISOR_HEARTBEAT_MS Intervalle heartbeat attendu 10000 (10s)
    SUPERVISOR_UNHEALTHY_THRESHOLD Heartbeats manqués avant UNHEALTHY 3
    SUPERVISOR_CHECK_INTERVAL_MS Intervalle de vérification 5000 (5s)
    SUPERVISOR_STALE_TIMEOUT_MS Timeout avant worker STALE 60000 (1min)

    4. Interface Supervisor

    package eu.lmvi.socle.supervisor;
    
    public interface Supervisor {
    
        /**
         * Enregistre un worker dans le supervisor
         */
        void registerWorker(Worker worker);
    
        /**
         * Désenregistre un worker
         */
        void unregisterWorker(String workerName);
    
        /**
         * Reçoit un heartbeat d'un worker
         */
        void heartbeat(String workerName);
    
        /**
         * Reçoit un heartbeat avec métriques
         */
        void heartbeat(String workerName, Map<String, Object> metrics);
    
        /**
         * Récupère l'état d'un worker
         */
        WorkerState getWorkerState(String workerName);
    
        /**
         * Récupère l'état de tous les workers
         */
        Map<String, WorkerState> getAllWorkerStates();
    
        /**
         * Vérifie si un worker est healthy
         */
        boolean isWorkerHealthy(String workerName);
    
        /**
         * Récupère l'état de santé global
         */
        HealthStatus getGlobalHealth();
    
        /**
         * Liste les workers unhealthy
         */
        List<String> getUnhealthyWorkers();
    
        /**
         * Démarre la supervision
         */
        void start();
    
        /**
         * Arrête la supervision
         */
        void stop();
    }
    

    5. États des Workers

    5.1 WorkerState

    package eu.lmvi.socle.supervisor;
    
    public record WorkerState(
        String workerName,
        WorkerStatus status,
        Instant lastHeartbeat,
        int missedHeartbeats,
        Map<String, Object> lastMetrics,
        Instant registeredAt
    ) {
        public boolean isHealthy() {
            return status == WorkerStatus.RUNNING;
        }
    
        public boolean isStale() {
            return status == WorkerStatus.STALE;
        }
    }
    

    5.2 WorkerStatus

    public enum WorkerStatus {
        /**
         * Worker enregistré mais pas encore démarré
         */
        REGISTERED,
    
        /**
         * Worker en cours d'exécution, heartbeats reçus
         */
        RUNNING,
    
        /**
         * Heartbeats manqués mais pas encore timeout
         */
        DEGRADED,
    
        /**
         * Trop de heartbeats manqués, worker considéré unhealthy
         */
        UNHEALTHY,
    
        /**
         * Aucun heartbeat depuis longtemps, worker potentiellement mort
         */
        STALE,
    
        /**
         * Worker arrêté proprement
         */
        STOPPED
    }
    

    5.3 Diagramme d’états

                        ┌────────────────┐
                        │   REGISTERED   │
                        └───────┬────────┘
                                │ first heartbeat
                                ▼
                        ┌────────────────┐
             ┌─────────│    RUNNING     │─────────┐
             │         └───────┬────────┘         │
             │                 │                  │
             │ heartbeat       │ missed           │ stop()
             │ received        │ heartbeat        │
             │                 ▼                  │
             │         ┌────────────────┐         │
             └────────►│   DEGRADED     │         │
                       └───────┬────────┘         │
                               │ threshold        │
                               │ exceeded         │
                               ▼                  │
                       ┌────────────────┐         │
                       │   UNHEALTHY    │         │
                       └───────┬────────┘         │
                               │ stale            │
                               │ timeout          │
                               ▼                  │
                       ┌────────────────┐         │
                       │     STALE      │         │
                       └────────────────┘         │
                                                  │
                       ┌────────────────┐         │
                       │    STOPPED     │◄────────┘
                       └────────────────┘
    

    6. Implémentation

    package eu.lmvi.socle.supervisor;
    
    @Component
    public class DefaultSupervisor implements Supervisor {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultSupervisor.class);
    
        private final SocleConfiguration config;
        private final ConcurrentHashMap<String, WorkerStateInternal> workers = new ConcurrentHashMap<>();
        private final ScheduledExecutorService scheduler;
        private volatile boolean running = false;
    
        public DefaultSupervisor(SocleConfiguration config) {
            this.config = config;
            this.scheduler = Executors.newSingleThreadScheduledExecutor(
                r -> new Thread(r, "supervisor-checker"));
        }
    
        @Override
        public void registerWorker(Worker worker) {
            String name = worker.getName();
            workers.put(name, new WorkerStateInternal(
                name, WorkerStatus.REGISTERED, Instant.now(), 0, Map.of(), Instant.now()));
            log.info("Worker registered: {}", name);
        }
    
        @Override
        public void unregisterWorker(String workerName) {
            WorkerStateInternal state = workers.remove(workerName);
            if (state != null) {
                log.info("Worker unregistered: {}", workerName);
            }
        }
    
        @Override
        public void heartbeat(String workerName) {
            heartbeat(workerName, Map.of());
        }
    
        @Override
        public void heartbeat(String workerName, Map<String, Object> metrics) {
            workers.computeIfPresent(workerName, (name, state) -> {
                log.debug("Heartbeat received: {}", name);
                return new WorkerStateInternal(
                    name,
                    WorkerStatus.RUNNING,
                    Instant.now(),
                    0,
                    metrics,
                    state.registeredAt
                );
            });
        }
    
        @Override
        public WorkerState getWorkerState(String workerName) {
            WorkerStateInternal internal = workers.get(workerName);
            return internal != null ? internal.toPublic() : null;
        }
    
        @Override
        public Map<String, WorkerState> getAllWorkerStates() {
            return workers.entrySet().stream()
                .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    e -> e.getValue().toPublic()
                ));
        }
    
        @Override
        public boolean isWorkerHealthy(String workerName) {
            WorkerStateInternal state = workers.get(workerName);
            return state != null && state.status == WorkerStatus.RUNNING;
        }
    
        @Override
        public HealthStatus getGlobalHealth() {
            if (workers.isEmpty()) {
                return HealthStatus.HEALTHY;
            }
    
            boolean hasUnhealthy = workers.values().stream()
                .anyMatch(s -> s.status == WorkerStatus.UNHEALTHY || s.status == WorkerStatus.STALE);
    
            if (hasUnhealthy) {
                return HealthStatus.UNHEALTHY;
            }
    
            boolean hasDegraded = workers.values().stream()
                .anyMatch(s -> s.status == WorkerStatus.DEGRADED);
    
            if (hasDegraded) {
                return HealthStatus.DEGRADED;
            }
    
            return HealthStatus.HEALTHY;
        }
    
        @Override
        public List<String> getUnhealthyWorkers() {
            return workers.entrySet().stream()
                .filter(e -> e.getValue().status == WorkerStatus.UNHEALTHY
                          || e.getValue().status == WorkerStatus.STALE)
                .map(Map.Entry::getKey)
                .toList();
        }
    
        @Override
        public void start() {
            running = true;
            long checkInterval = config.getSupervisor().getCheckIntervalMs();
            scheduler.scheduleAtFixedRate(
                this::checkWorkers,
                checkInterval,
                checkInterval,
                TimeUnit.MILLISECONDS
            );
            log.info("Supervisor started with check interval: {}ms", checkInterval);
        }
    
        @Override
        public void stop() {
            running = false;
            scheduler.shutdown();
            log.info("Supervisor stopped");
        }
    
        private void checkWorkers() {
            if (!running) return;
    
            long heartbeatInterval = config.getSupervisor().getHeartbeatIntervalMs();
            int unhealthyThreshold = config.getSupervisor().getUnhealthyThreshold();
            long staleTimeout = config.getSupervisor().getStaleTimeoutMs();
    
            Instant now = Instant.now();
    
            workers.replaceAll((name, state) -> {
                if (state.status == WorkerStatus.STOPPED) {
                    return state;
                }
    
                long msSinceLastHeartbeat = Duration.between(state.lastHeartbeat, now).toMillis();
    
                // Stale check
                if (msSinceLastHeartbeat > staleTimeout) {
                    if (state.status != WorkerStatus.STALE) {
                        log.warn("Worker STALE: {} (no heartbeat for {}ms)", name, msSinceLastHeartbeat);
                    }
                    return state.withStatus(WorkerStatus.STALE);
                }
    
                // Missed heartbeat check
                int expectedHeartbeats = (int) (msSinceLastHeartbeat / heartbeatInterval);
                if (expectedHeartbeats > 0) {
                    int newMissedCount = state.missedHeartbeats + 1;
    
                    if (newMissedCount >= unhealthyThreshold) {
                        if (state.status != WorkerStatus.UNHEALTHY) {
                            log.warn("Worker UNHEALTHY: {} (missed {} heartbeats)", name, newMissedCount);
                        }
                        return state.withStatus(WorkerStatus.UNHEALTHY).withMissedCount(newMissedCount);
                    } else {
                        if (state.status == WorkerStatus.RUNNING) {
                            log.info("Worker DEGRADED: {} (missed {} heartbeats)", name, newMissedCount);
                        }
                        return state.withStatus(WorkerStatus.DEGRADED).withMissedCount(newMissedCount);
                    }
                }
    
                return state;
            });
        }
    
        private record WorkerStateInternal(
            String workerName,
            WorkerStatus status,
            Instant lastHeartbeat,
            int missedHeartbeats,
            Map<String, Object> lastMetrics,
            Instant registeredAt
        ) {
            WorkerState toPublic() {
                return new WorkerState(workerName, status, lastHeartbeat, missedHeartbeats, lastMetrics, registeredAt);
            }
    
            WorkerStateInternal withStatus(WorkerStatus newStatus) {
                return new WorkerStateInternal(workerName, newStatus, lastHeartbeat, missedHeartbeats, lastMetrics, registeredAt);
            }
    
            WorkerStateInternal withMissedCount(int newCount) {
                return new WorkerStateInternal(workerName, status, lastHeartbeat, newCount, lastMetrics, registeredAt);
            }
        }
    }
    

    7. Heartbeat depuis les Workers

    7.1 Heartbeat manuel

    @Component
    public class MyWorker implements Worker {
    
        @Autowired
        private Supervisor supervisor;
    
        @Override
        public void doWork() {
            // Envoyer heartbeat avec métriques
            supervisor.heartbeat(getName(), Map.of(
                "processed", processedCount,
                "queueSize", queue.size()
            ));
    
            // Traitement...
        }
    }
    

    7.2 Heartbeat automatique via AbstractWorker

    public abstract class AbstractWorker implements Worker {
    
        @Autowired
        private Supervisor supervisor;
    
        @Override
        public final void doWork() {
            // Heartbeat automatique
            supervisor.heartbeat(getName(), getStats());
    
            // Appel au traitement réel
            doProcess();
        }
    
        protected abstract void doProcess();
    }
    

    7.3 Heartbeat via thread dédié

    @Component
    public class LongRunningWorker implements Worker {
    
        @Autowired
        private Supervisor supervisor;
    
        private ScheduledExecutorService heartbeatExecutor;
    
        @Override
        public void start() {
            heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
            heartbeatExecutor.scheduleAtFixedRate(
                () -> supervisor.heartbeat(getName()),
                0, 10, TimeUnit.SECONDS
            );
        }
    
        @Override
        public void stop() {
            if (heartbeatExecutor != null) {
                heartbeatExecutor.shutdown();
            }
        }
    
        @Override
        public void doWork() {
            // Long processing - heartbeat handled by separate thread
            processLongTask();
        }
    }
    

    8. API REST

    8.1 Endpoints

    @RestController
    @RequestMapping("/admin")
    public class SupervisorController {
    
        @Autowired
        private Supervisor supervisor;
    
        @GetMapping("/health")
        public ResponseEntity<HealthResponse> health() {
            HealthStatus status = supervisor.getGlobalHealth();
            return ResponseEntity
                .status(status == HealthStatus.HEALTHY ? 200 : 503)
                .body(new HealthResponse(status, supervisor.getUnhealthyWorkers()));
        }
    
        @GetMapping("/workers")
        public Map<String, WorkerState> workers() {
            return supervisor.getAllWorkerStates();
        }
    
        @GetMapping("/workers/{name}")
        public ResponseEntity<WorkerState> worker(@PathVariable String name) {
            WorkerState state = supervisor.getWorkerState(name);
            return state != null
                ? ResponseEntity.ok(state)
                : ResponseEntity.notFound().build();
        }
    }
    

    8.2 Réponses

    // GET /admin/health
    {
      "status": "HEALTHY",
      "unhealthyWorkers": []
    }
    
    // GET /admin/workers
    {
      "kafka-consumer": {
        "workerName": "kafka-consumer",
        "status": "RUNNING",
        "lastHeartbeat": "2025-12-09T10:30:00Z",
        "missedHeartbeats": 0,
        "lastMetrics": {
          "processed": 12345,
          "lag": 23
        },
        "registeredAt": "2025-12-09T10:00:00Z"
      },
      "order-processor": {
        "workerName": "order-processor",
        "status": "DEGRADED",
        "lastHeartbeat": "2025-12-09T10:29:45Z",
        "missedHeartbeats": 1,
        "lastMetrics": {},
        "registeredAt": "2025-12-09T10:00:00Z"
      }
    }
    

    9. Intégration Kubernetes

    9.1 Liveness Probe

    livenessProbe:
      httpGet:
        path: /admin/health
        port: 8080
      initialDelaySeconds: 30
      periodSeconds: 10
      failureThreshold: 3
    

    9.2 Readiness Probe

    readinessProbe:
      httpGet:
        path: /admin/health
        port: 8080
      initialDelaySeconds: 5
      periodSeconds: 5
    

    9.3 Health Controller adapté

    @GetMapping("/health/live")
    public ResponseEntity<Void> live() {
        // Liveness: l'application répond
        return ResponseEntity.ok().build();
    }
    
    @GetMapping("/health/ready")
    public ResponseEntity<Void> ready() {
        // Readiness: tous les workers sont healthy
        HealthStatus status = supervisor.getGlobalHealth();
        return status == HealthStatus.HEALTHY
            ? ResponseEntity.ok().build()
            : ResponseEntity.status(503).build();
    }
    

    10. Métriques Prometheus

    @Component
    public class SupervisorMetrics {
    
        private final Supervisor supervisor;
        private final MeterRegistry registry;
    
        @PostConstruct
        public void registerMetrics() {
            Gauge.builder("socle_workers_total", supervisor,
                s -> s.getAllWorkerStates().size())
                .register(registry);
    
            Gauge.builder("socle_workers_healthy", supervisor,
                s -> s.getAllWorkerStates().values().stream()
                    .filter(WorkerState::isHealthy).count())
                .register(registry);
    
            Gauge.builder("socle_workers_unhealthy", supervisor,
                s -> s.getUnhealthyWorkers().size())
                .register(registry);
        }
    }
    

    11. Bonnes pratiques

    DO

    • Envoyer des heartbeats réguliers depuis tous les workers actifs
    • Inclure des métriques utiles dans les heartbeats
    • Configurer des timeouts adaptés à vos workers
    • Utiliser les probes Kubernetes pour la haute disponibilité

    DON’T

    • Ne pas oublier d’envoyer des heartbeats dans les workers long-running
    • Ne pas ignorer les états DEGRADED
    • Ne pas configurer des timeouts trop courts (faux positifs)
    • Ne pas bloquer l’envoi de heartbeat avec du traitement lourd

    12. Références

  • Socle V004 – Pipeline

    Socle V004 – Pipeline

    09 – Pipeline Engine

    Version : 4.1.0 Date : 2025-12-28

    1. Introduction

    Le PipelineEngine permet de définir des chaînes de traitement composées d’étapes (steps) exécutées séquentiellement ou en parallèle.

    Caractéristiques

    • Définition déclarative des pipelines
    • Exécution séquentielle ou parallèle
    • Gestion des erreurs et retry
    • Context partagé entre étapes
    • Métriques et logging intégrés

    2. Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                      PipelineEngine                          │
    └─────────────────────────────────────────────────────────────┘
                                  │
                                  ▼
    ┌─────────────────────────────────────────────────────────────┐
    │                      Pipeline                                │
    │  ┌─────────────────────────────────────────────────────┐    │
    │  │              PipelineContext                         │    │
    │  │  - input data                                        │    │
    │  │  - shared state                                      │    │
    │  │  - results                                           │    │
    │  └─────────────────────────────────────────────────────┘    │
    │                                                              │
    │  ┌──────────┐   ┌──────────┐   ┌──────────┐                │
    │  │  Step 1  │──►│  Step 2  │──►│  Step 3  │                │
    │  └──────────┘   └──────────┘   └──────────┘                │
    │                                                              │
    └─────────────────────────────────────────────────────────────┘
    

    3. Interface Pipeline

    package eu.lmvi.socle.pipeline;
    
    public interface Pipeline<I, O> {
    
        /**
         * Nom du pipeline
         */
        String getName();
    
        /**
         * Exécute le pipeline
         */
        PipelineResult<O> execute(I input);
    
        /**
         * Exécute le pipeline avec context
         */
        PipelineResult<O> execute(I input, PipelineContext context);
    
        /**
         * Liste des étapes
         */
        List<PipelineStep<?, ?>> getSteps();
    }
    

    4. Interface PipelineStep

    package eu.lmvi.socle.pipeline;
    
    public interface PipelineStep<I, O> {
    
        /**
         * Nom de l'étape
         */
        String getName();
    
        /**
         * Exécute l'étape
         */
        StepResult<O> execute(I input, PipelineContext context);
    
        /**
         * L'étape peut-elle être retryée ?
         */
        default boolean isRetryable() {
            return true;
        }
    
        /**
         * Nombre max de retries
         */
        default int getMaxRetries() {
            return 3;
        }
    
        /**
         * L'étape est-elle optionnelle ?
         */
        default boolean isOptional() {
            return false;
        }
    }
    

    5. Context et Result

    5.1 PipelineContext

    package eu.lmvi.socle.pipeline;
    
    public class PipelineContext {
        private final Map<String, Object> attributes = new ConcurrentHashMap<>();
        private final List<StepResult<?>> stepResults = new ArrayList<>();
        private final String correlationId;
        private final Instant startTime;
    
        public PipelineContext() {
            this.correlationId = UUID.randomUUID().toString();
            this.startTime = Instant.now();
        }
    
        public void put(String key, Object value) {
            attributes.put(key, value);
        }
    
        public <T> Optional<T> get(String key, Class<T> type) {
            return Optional.ofNullable(attributes.get(key))
                .filter(type::isInstance)
                .map(type::cast);
        }
    
        public void addStepResult(StepResult<?> result) {
            stepResults.add(result);
        }
    
        public List<StepResult<?>> getStepResults() {
            return Collections.unmodifiableList(stepResults);
        }
    
        public String getCorrelationId() {
            return correlationId;
        }
    
        public Duration getElapsedTime() {
            return Duration.between(startTime, Instant.now());
        }
    }
    

    5.2 StepResult

    package eu.lmvi.socle.pipeline;
    
    public record StepResult<T>(
        String stepName,
        StepStatus status,
        T output,
        Exception error,
        Duration duration,
        int attempts
    ) {
        public boolean isSuccess() {
            return status == StepStatus.SUCCESS;
        }
    
        public boolean isFailure() {
            return status == StepStatus.FAILURE;
        }
    
        public boolean isSkipped() {
            return status == StepStatus.SKIPPED;
        }
    
        public static <T> StepResult<T> success(String name, T output, Duration duration) {
            return new StepResult<>(name, StepStatus.SUCCESS, output, null, duration, 1);
        }
    
        public static <T> StepResult<T> failure(String name, Exception error, Duration duration, int attempts) {
            return new StepResult<>(name, StepStatus.FAILURE, null, error, duration, attempts);
        }
    
        public static <T> StepResult<T> skipped(String name) {
            return new StepResult<>(name, StepStatus.SKIPPED, null, null, Duration.ZERO, 0);
        }
    }
    
    public enum StepStatus {
        SUCCESS,
        FAILURE,
        SKIPPED
    }
    

    5.3 PipelineResult

    package eu.lmvi.socle.pipeline;
    
    public record PipelineResult<T>(
        String pipelineName,
        PipelineStatus status,
        T output,
        PipelineContext context,
        Duration totalDuration
    ) {
        public boolean isSuccess() {
            return status == PipelineStatus.SUCCESS;
        }
    
        public List<StepResult<?>> getFailedSteps() {
            return context.getStepResults().stream()
                .filter(StepResult::isFailure)
                .toList();
        }
    }
    
    public enum PipelineStatus {
        SUCCESS,
        PARTIAL_SUCCESS,
        FAILURE
    }
    

    6. Implémentation

    6.1 DefaultPipelineEngine

    package eu.lmvi.socle.pipeline;
    
    @Component
    public class DefaultPipelineEngine implements PipelineEngine {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultPipelineEngine.class);
    
        @Override
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input) {
            return execute(pipeline, input, new PipelineContext());
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input, PipelineContext context) {
            log.info("[{}] Starting pipeline execution", pipeline.getName());
            Instant start = Instant.now();
    
            Object currentInput = input;
            O finalOutput = null;
            boolean hasFailure = false;
    
            for (PipelineStep<?, ?> step : pipeline.getSteps()) {
                PipelineStep<Object, Object> typedStep = (PipelineStep<Object, Object>) step;
    
                StepResult<Object> result = executeStep(typedStep, currentInput, context);
                context.addStepResult(result);
    
                if (result.isSuccess()) {
                    currentInput = result.output();
                    finalOutput = (O) result.output();
                } else if (result.isFailure()) {
                    if (!step.isOptional()) {
                        hasFailure = true;
                        log.error("[{}] Pipeline failed at step: {}", pipeline.getName(), step.getName());
                        break;
                    }
                    log.warn("[{}] Optional step failed, continuing: {}", pipeline.getName(), step.getName());
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            PipelineStatus status = hasFailure
                ? PipelineStatus.FAILURE
                : (context.getStepResults().stream().anyMatch(StepResult::isFailure)
                    ? PipelineStatus.PARTIAL_SUCCESS
                    : PipelineStatus.SUCCESS);
    
            log.info("[{}] Pipeline completed with status: {} in {}ms",
                pipeline.getName(), status, duration.toMillis());
    
            return new PipelineResult<>(pipeline.getName(), status, finalOutput, context, duration);
        }
    
        private <I, O> StepResult<O> executeStep(PipelineStep<I, O> step, I input, PipelineContext context) {
            log.debug("[{}] Executing step", step.getName());
            Instant start = Instant.now();
            int attempts = 0;
            Exception lastError = null;
    
            while (attempts < step.getMaxRetries()) {
                attempts++;
                try {
                    StepResult<O> result = step.execute(input, context);
                    if (result.isSuccess()) {
                        Duration duration = Duration.between(start, Instant.now());
                        log.debug("[{}] Step completed in {}ms", step.getName(), duration.toMillis());
                        return StepResult.success(step.getName(), result.output(), duration);
                    }
                    lastError = result.error();
                } catch (Exception e) {
                    lastError = e;
                    log.warn("[{}] Step attempt {} failed: {}", step.getName(), attempts, e.getMessage());
    
                    if (!step.isRetryable() || attempts >= step.getMaxRetries()) {
                        break;
                    }
    
                    // Backoff
                    try {
                        Thread.sleep(attempts * 1000L);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            return StepResult.failure(step.getName(), lastError, duration, attempts);
        }
    }
    

    6.2 Pipeline Builder

    package eu.lmvi.socle.pipeline;
    
    public class PipelineBuilder<I, O> {
    
        private final String name;
        private final List<PipelineStep<?, ?>> steps = new ArrayList<>();
    
        public PipelineBuilder(String name) {
            this.name = name;
        }
    
        public static <I, O> PipelineBuilder<I, O> create(String name) {
            return new PipelineBuilder<>(name);
        }
    
        public <NO> PipelineBuilder<I, NO> addStep(PipelineStep<O, NO> step) {
            steps.add(step);
            return (PipelineBuilder<I, NO>) this;
        }
    
        public <NO> PipelineBuilder<I, NO> addStep(String name, Function<O, NO> processor) {
            return addStep(new FunctionalStep<>(name, processor));
        }
    
        public Pipeline<I, O> build() {
            return new DefaultPipeline<>(name, List.copyOf(steps));
        }
    
        private record FunctionalStep<I, O>(String name, Function<I, O> processor) implements PipelineStep<I, O> {
    
            @Override
            public String getName() {
                return name;
            }
    
            @Override
            public StepResult<O> execute(I input, PipelineContext context) {
                O output = processor.apply(input);
                return StepResult.success(name, output, Duration.ZERO);
            }
        }
    }
    

    7. Utilisation

    7.1 Pipeline simple

    @Component
    public class OrderPipeline {
    
        @Autowired
        private PipelineEngine engine;
    
        public PipelineResult<OrderResult> processOrder(Order order) {
            Pipeline<Order, OrderResult> pipeline = PipelineBuilder
                .<Order, OrderResult>create("order-processing")
                .addStep("validate", this::validateOrder)
                .addStep("enrich", this::enrichOrder)
                .addStep("process", this::processOrder)
                .addStep("notify", this::notifyCustomer)
                .build();
    
            return engine.execute(pipeline, order);
        }
    
        private ValidatedOrder validateOrder(Order order) {
            // Validation...
            return new ValidatedOrder(order);
        }
    
        private EnrichedOrder enrichOrder(ValidatedOrder order) {
            // Enrichissement...
            return new EnrichedOrder(order);
        }
    
        private ProcessedOrder processOrder(EnrichedOrder order) {
            // Traitement...
            return new ProcessedOrder(order);
        }
    
        private OrderResult notifyCustomer(ProcessedOrder order) {
            // Notification...
            return new OrderResult(order, "SUCCESS");
        }
    }
    

    7.2 Étapes personnalisées

    public class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
    
        @Override
        public String getName() {
            return "validation";
        }
    
        @Override
        public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
            List<String> errors = new ArrayList<>();
    
            if (input.getCustomerId() == null) {
                errors.add("Missing customer ID");
            }
            if (input.getItems().isEmpty()) {
                errors.add("No items in order");
            }
    
            if (!errors.isEmpty()) {
                return StepResult.failure(getName(),
                    new ValidationException(errors),
                    Duration.ZERO, 1);
            }
    
            // Stocker des données dans le context
            context.put("customerId", input.getCustomerId());
    
            return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
        }
    
        @Override
        public boolean isRetryable() {
            return false;  // Validation ne doit pas être retryée
        }
    }
    

    7.3 Étape optionnelle

    public class NotificationStep implements PipelineStep<ProcessedOrder, ProcessedOrder> {
    
        @Override
        public String getName() {
            return "notification";
        }
    
        @Override
        public StepResult<ProcessedOrder> execute(ProcessedOrder input, PipelineContext context) {
            try {
                sendNotification(input);
                return StepResult.success(getName(), input, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    
        @Override
        public boolean isOptional() {
            return true;  // Le pipeline continue même si la notif échoue
        }
    }
    

    7.4 Utilisation du context

    public class EnrichmentStep implements PipelineStep<ValidatedOrder, EnrichedOrder> {
    
        @Autowired
        private CustomerService customerService;
    
        @Override
        public String getName() {
            return "enrichment";
        }
    
        @Override
        public StepResult<EnrichedOrder> execute(ValidatedOrder input, PipelineContext context) {
            // Lire depuis le context
            String customerId = context.get("customerId", String.class).orElseThrow();
    
            // Enrichir
            Customer customer = customerService.getCustomer(customerId);
            EnrichedOrder enriched = new EnrichedOrder(input, customer);
    
            // Écrire dans le context pour les étapes suivantes
            context.put("customerEmail", customer.getEmail());
    
            return StepResult.success(getName(), enriched, Duration.ZERO);
        }
    }
    

    8. Gestion des erreurs

    8.1 Retry automatique

    public class ExternalApiStep implements PipelineStep<Data, ApiResponse> {
    
        @Override
        public String getName() {
            return "external-api-call";
        }
    
        @Override
        public boolean isRetryable() {
            return true;
        }
    
        @Override
        public int getMaxRetries() {
            return 5;  // 5 tentatives max
        }
    
        @Override
        public StepResult<ApiResponse> execute(Data input, PipelineContext context) {
            // Appel API qui peut échouer
            ApiResponse response = callApi(input);
            return StepResult.success(getName(), response, Duration.ZERO);
        }
    }
    

    8.2 Traitement des résultats

    PipelineResult<OrderResult> result = engine.execute(pipeline, order);
    
    if (result.isSuccess()) {
        log.info("Order processed successfully: {}", result.output());
    } else {
        // Analyser les étapes en échec
        for (StepResult<?> stepResult : result.getFailedSteps()) {
            log.error("Step {} failed after {} attempts: {}",
                stepResult.stepName(),
                stepResult.attempts(),
                stepResult.error().getMessage());
        }
    
        // Décider quoi faire
        if (result.status() == PipelineStatus.PARTIAL_SUCCESS) {
            // Certaines étapes optionnelles ont échoué
            handlePartialSuccess(result);
        } else {
            // Échec complet
            handleFailure(result);
        }
    }
    

    9. Pipelines parallèles

    public class ParallelEnrichmentStep implements PipelineStep<Order, EnrichedOrder> {
    
        @Autowired
        private ExecutorService executor;
    
        @Override
        public StepResult<EnrichedOrder> execute(Order input, PipelineContext context) {
            // Exécuter plusieurs enrichissements en parallèle
            CompletableFuture<Customer> customerFuture = CompletableFuture.supplyAsync(
                () -> customerService.getCustomer(input.getCustomerId()), executor);
    
            CompletableFuture<Inventory> inventoryFuture = CompletableFuture.supplyAsync(
                () -> inventoryService.checkInventory(input.getItems()), executor);
    
            CompletableFuture<Pricing> pricingFuture = CompletableFuture.supplyAsync(
                () -> pricingService.calculatePrice(input), executor);
    
            try {
                CompletableFuture.allOf(customerFuture, inventoryFuture, pricingFuture).join();
    
                EnrichedOrder enriched = new EnrichedOrder(
                    input,
                    customerFuture.get(),
                    inventoryFuture.get(),
                    pricingFuture.get()
                );
    
                return StepResult.success(getName(), enriched, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    }
    

    10. Métriques

    @Component
    public class PipelineMetrics {
    
        private final MeterRegistry registry;
    
        public void recordPipelineExecution(PipelineResult<?> result) {
            Timer.builder("socle_pipeline_duration")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .record(result.totalDuration());
    
            Counter.builder("socle_pipeline_executions")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .increment();
        }
    
        public void recordStepExecution(StepResult<?> result) {
            Timer.builder("socle_pipeline_step_duration")
                .tag("step", result.stepName())
                .tag("status", result.status().name())
                .register(registry)
                .record(result.duration());
        }
    }
    

    11. Bonnes pratiques

    DO

    • Garder les étapes petites et focalisées
    • Utiliser le context pour partager des données entre étapes
    • Marquer les étapes non-critiques comme optionnelles
    • Logger au niveau des étapes pour le debugging
    • Configurer des retries appropriés pour les appels externes

    DON’T

    • Ne pas faire de pipelines avec trop d’étapes (> 10)
    • Ne pas mélanger logique métier et infrastructure dans une étape
    • Ne pas ignorer les erreurs des étapes obligatoires
    • Ne pas utiliser de pipelines pour du traitement simple

    12. Références

    Pipeline Engine V2 (Nouveau)

    Ajouté en version 4.1.0

    13. Introduction Pipeline V2

    Le Pipeline V2 est une refonte complète du système de pipelines, conçue pour les applications à haute disponibilité nécessitant :

    • Garantie at-least-once : Aucune perte de message grâce au pattern Queue/Claim/Ack
    • Virtual Threads (Java 21) : Scalabilité maximale avec les threads virtuels
    • Reprise après crash : État persisté permettant la reprise au dernier stage OK
    • DLQ (Dead Letter Queue) : Gestion des échecs définitifs avec replay

    Quand utiliser V2 vs V1 ?

    Critère Pipeline V1 Pipeline V2
    Exécution Synchrone, mono-thread Asynchrone, multi-thread
    Garantie de livraison At-most-once At-least-once
    Persistance Non Oui (TechDB / H2)
    DLQ Non Oui
    Use case Traitement simple Flux critiques, haute disponibilité

    14. Architecture V2

    ┌─────────────────────────────────────────────────────────────────┐
    │                            MOP                                   │
    │               (supervision, restart, health)                     │
    └───────────────────────────┬─────────────────────────────────────┘
                                │
    ┌───────────────────────────▼─────────────────────────────────────┐
    │                     PipelineV2                                   │
    │                                                                  │
    │  ┌────────────┐     ┌────────────┐     ┌────────────┐          │
    │  │  Stage A   │────►│  Stage B   │────►│  Stage C   │          │
    │  │ ┌────────┐ │     │ ┌────────┐ │     │ ┌────────┐ │          │
    │  │ │ Queue  │ │     │ │ Queue  │ │     │ │ Queue  │ │          │
    │  │ └────────┘ │     │ └────────┘ │     │ └────────┘ │          │
    │  │ VThreads  │     │ VThreads  │     │ VThreads  │          │
    │  └────────────┘     └────────────┘     └────────────┘          │
    │                                                                  │
    │         ┌─────────────────┴─────────────────┐                   │
    │    ┌────▼────┐                        ┌────▼────┐              │
    │    │ Context │                        │   DLQ   │              │
    │    │ (state) │                        │ (errors)│              │
    │    └─────────┘                        └─────────┘              │
    └─────────────────────────────────────────────────────────────────┘
    

    Composants

    • StageQueue : Queue avec sémantique Claim/Ack entre chaque stage
    • VThreadStageExecutor : Workers Virtual Threads consommant les queues
    • PersistentPipelineContext : État persisté pour reprise après crash
    • DLQ : Messages en échec après max retries

    15. Création d’un Pipeline V2

    15.1 Builder Fluent

    import eu.lmvi.socle.pipeline.v2.PipelineBuilderV2;
    import eu.lmvi.socle.pipeline.v2.PipelineV2;
    
    // Création du pipeline
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .description("Pipeline de traitement des commandes")
    
        // Stage 1: Validation (2 workers, timeout 30s)
        .addStage("validate", this::validateOrder)
            .concurrency(2)
            .timeout(Duration.ofSeconds(30))
    
        // Stage 2: Enrichissement (4 workers, queue 1000)
        .addStage("enrich", this::enrichOrder)
            .concurrency(4)
            .queueSize(1000)
    
        // Stage 3: Publication (2 workers)
        .addStage("publish", this::publishOrder)
            .concurrency(2)
            .maxRetries(5)
            .backoff(Duration.ofMillis(200), 2.0)
    
        .build();
    

    15.2 Configuration par stage

    .addStage("nom", processor)
        .concurrency(4)           // Nombre de workers parallèles
        .queueSize(500)           // Taille max de la queue
        .timeout(Duration.ofMinutes(5))  // Timeout par message
        .maxRetries(3)            // Tentatives avant DLQ
        .backoff(base, multiplier) // Backoff exponentiel
        .optional()               // Stage optionnel (échec non bloquant)
    

    16. Cycle de vie

    16.1 Démarrage/Arrêt

    // Démarrer le pipeline (lance tous les workers)
    pipeline.start();
    
    // Vérifier l'état
    boolean running = pipeline.isRunning();
    
    // Arrêt gracieux (attend la fin des traitements en cours)
    pipeline.stop();
    
    // Arrêt immédiat (interrompt tout)
    pipeline.stopNow();
    

    16.2 Soumission de messages

    // Soumettre un message
    String messageId = pipeline.submit(order, "exec-001");
    
    // Avec métadonnées
    String messageId = pipeline.submit(order, "exec-001", Map.of(
        "source", "api",
        "priority", "high"
    ));
    

    16.3 Suivi d’exécution

    // État d'une exécution
    Optional<PipelineState> state = pipeline.getExecutionState("exec-001");
    state.ifPresent(s -> {
        System.out.println("Status: " + s.status());
        System.out.println("Current stage: " + s.currentStage());
        System.out.println("Duration: " + s.duration());
    });
    
    // Liste des exécutions actives
    List<PipelineState> active = pipeline.listActiveExecutions();
    
    // Statistiques globales
    PipelineV2.PipelineStats stats = pipeline.getStats();
    System.out.println("Submitted: " + stats.totalSubmitted());
    System.out.println("Completed: " + stats.totalCompleted());
    System.out.println("Failed: " + stats.totalFailed());
    

    17. Queue/Claim/Ack

    17.1 Pattern de consommation

    Le Pipeline V2 utilise un pattern de consommation fiable :

    1. CLAIM   : Le worker réserve un message (lease)
    2. PROCESS : Le worker traite le message
    3. ACK     : Succès → message supprimé
       ou
       NACK    : Échec → retry ou DLQ
    

    17.2 Lease et récupération

    Si un worker crash pendant le traitement :

    • Le lease expire après le timeout configuré
    • Le message est automatiquement remis en queue
    • Un autre worker le reprend
    // Timeout de lease par défaut: 5 minutes
    // Configurable par stage via timeout()
    

    17.3 Implémentations disponibles

    Implémentation Persistance Use case
    InMemoryStageQueue Non Développement, tests
    TechDbStageQueue Oui (H2) Production

    18. Dead Letter Queue (DLQ)

    18.1 Envoi en DLQ

    Un message est envoyé en DLQ après :

    • maxRetries tentatives échouées
    • Une erreur non-retryable

    18.2 Consultation

    // Via API REST
    GET /admin/pipelines/v2/{name}/dlq
    
    // Programmatiquement (via StageQueue)
    List<DlqMessage<Order>> messages = queue.peekDlq(100);
    for (DlqMessage<Order> msg : messages) {
        System.out.println("ID: " + msg.messageId());
        System.out.println("Error: " + msg.errorMessage());
        System.out.println("Attempts: " + msg.attempts());
        System.out.println("Failed at: " + msg.failedAt());
    }
    

    18.3 Replay

    // Via API REST
    POST /admin/pipelines/v2/{name}/dlq/{messageId}/replay
    
    // Programmatiquement
    boolean replayed = queue.replayFromDlq(messageId);
    
    // Replay tous les messages
    int count = queue.replayAllFromDlq("exec-001");
    

    18.4 Suppression

    // Via API REST
    DELETE /admin/pipelines/v2/{name}/dlq/{messageId}
    
    // Programmatiquement
    queue.deleteFromDlq(messageId);
    
    // Purge complète
    queue.purgeDlq();
    

    19. Persistance et reprise

    19.1 Context persistant

    Le Pipeline V2 peut persister son état pour survivre aux redémarrages :

    import eu.lmvi.socle.pipeline.context.TechDbPipelineContext;
    
    // Créer un context persistant
    Supplier<Connection> connectionSupplier = () -> dataSource.getConnection();
    PersistentPipelineContext context = new TechDbPipelineContext(connectionSupplier);
    
    // Utiliser avec le builder
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .context(context)  // Utilise le context persistant
        .addStage(...)
        .build();
    

    19.2 Reprise après crash

    Au redémarrage, les messages non-ackés sont automatiquement re-traités :

    1. Les messages en status CLAIMED dont le lease a expiré → remis en PENDING
    2. Les stages reprennent leur consommation normalement
    3. Grâce à l’idempotence, les messages déjà traités ne sont pas re-traités

    20. Idempotence (OBLIGATOIRE)

    20.1 Pourquoi ?

    Le modèle at-least-once signifie qu’un message peut être traité plusieurs fois :

    • Crash après traitement mais avant ACK
    • Timeout de lease pendant un traitement long
    • Replay depuis la DLQ

    Chaque stage DOIT être idempotent.

    20.2 Patterns recommandés

    // Pattern 1: Vérifier "déjà traité"
    public Order processOrder(Order order) {
        if (orderRepository.exists(order.getId())) {
            return orderRepository.get(order.getId()); // Déjà traité
        }
        // Traiter...
        return orderRepository.save(order);
    }
    
    // Pattern 2: UPSERT au lieu d'INSERT
    public void saveOrder(Order order) {
        orderRepository.upsert(order); // Idempotent par nature
    }
    
    // Pattern 3: Utiliser executionId comme clé
    public void processWithDedup(Order order, String executionId) {
        String dedupKey = "order:" + order.getId() + ":" + executionId;
        if (kvBus.exists(dedupKey)) {
            return; // Déjà traité pour cette exécution
        }
    
        // Traiter...
    
        kvBus.set(dedupKey, true, Duration.ofDays(7));
    }
    

    20.3 Anti-patterns

    // MAUVAIS: INSERT sans vérification
    db.insert(order); // Échoue si déjà inséré
    
    // MAUVAIS: Compteur incrémental
    counter.increment(); // Compté plusieurs fois en cas de replay
    
    // MAUVAIS: Email sans déduplication
    emailService.send(email); // Envoyé plusieurs fois
    

    21. API REST d’administration

    21.1 Endpoints disponibles

    GET  /admin/pipelines/v2                    → Liste des pipelines
    GET  /admin/pipelines/v2/{name}             → État d'un pipeline
    GET  /admin/pipelines/v2/{name}/stages      → État des stages
    GET  /admin/pipelines/v2/{name}/executions  → Exécutions en cours
    
    GET  /admin/pipelines/v2/{name}/dlq         → Messages en DLQ
    POST /admin/pipelines/v2/{name}/dlq/{id}/replay  → Rejouer un message
    DELETE /admin/pipelines/v2/{name}/dlq/{id}  → Supprimer de la DLQ
    POST /admin/pipelines/v2/{name}/dlq/replay-all   → Rejouer tous
    
    POST /admin/pipelines/v2/{name}/start       → Démarrer le pipeline
    POST /admin/pipelines/v2/{name}/stop        → Arrêter gracieusement
    POST /admin/pipelines/v2/{name}/stop-now    → Arrêter immédiatement
    

    21.2 Exemple de réponse

    GET /admin/pipelines/v2/order-processing
    
    {
      "timestamp": 1735387200000,
      "name": "order-processing",
      "description": "Pipeline de traitement des commandes",
      "running": true,
      "stats": {
        "stage_count": 3,
        "active_executions": 5,
        "total_submitted": 1500,
        "total_completed": 1480,
        "total_failed": 12
      }
    }
    

    22. Métriques Prometheus

    22.1 Métriques exposées

    # Compteurs
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="success"}
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="failed"}
    socle_pipeline_v2_dlq_total{pipeline="order-processing", stage="validate"}
    
    # Gauges
    socle_pipeline_v2_queue_pending{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_queue_processing{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_dlq_size{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_active_executions{pipeline="order-processing"}
    socle_pipeline_v2_running{pipeline="order-processing"}
    
    # Histogrammes (durée de traitement)
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.5"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.95"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.99"}
    

    22.2 Alertes recommandées

    # DLQ croissante
    - alert: PipelineDLQGrowing
      expr: increase(socle_pipeline_v2_dlq_total[1h]) > 10
      labels:
        severity: warning
    
    # Queue qui s'accumule (backpressure)
    - alert: PipelineQueueBacklog
      expr: socle_pipeline_v2_queue_pending > 1000
      for: 5m
      labels:
        severity: warning
    
    # Pipeline arrêté
    - alert: PipelineStopped
      expr: socle_pipeline_v2_running == 0
      for: 1m
      labels:
        severity: critical
    

    23. Bonnes pratiques V2

    DO

    • Toujours implémenter l’idempotence dans chaque stage
    • Utiliser TechDbPipelineContext en production pour la persistance
    • Configurer des maxRetries appropriés (3-5 pour API externes)
    • Monitorer les métriques DLQ et queue_pending
    • Traiter régulièrement les messages en DLQ (replay ou suppression)

    DON’T

    • Ne jamais supposer qu’un message ne sera traité qu’une fois
    • Ne pas ignorer les messages en DLQ
    • Ne pas mettre de timeout trop courts (risque de faux positifs)
    • Ne pas créer trop de stages (overhead de queues)
    • Ne pas oublier d’appeler pipeline.stop() à l’arrêt de l’application

    24. Migration V1 → V2

    24.1 Changements d’API

    V1 V2
    PipelineEngine.execute(pipeline, input) pipeline.submit(input, executionId)
    Pipeline<I,O> PipelineV2<I,O>
    PipelineBuilder PipelineBuilderV2
    Exécution synchrone Exécution asynchrone

    24.2 Étapes de migration

    1. Rendre les steps idempotents (obligatoire avant migration)
    2. Créer le pipeline avec PipelineBuilderV2
    3. Remplacer execute() par submit()
    4. Ajouter le suivi asynchrone des résultats si nécessaire
    5. Configurer la persistance (TechDbPipelineContext)
    6. Activer les métriques et monitoring DLQ

    25. Configuration Dynamique (YAML)

    Ajouté en version 4.1.0

    25.1 Introduction

    Le Pipeline V2 peut maintenant être configuré entièrement via YAML, permettant :

    • Activation/désactivation des stages sans recompiler
    • Configuration dynamique de la concurrence, timeouts, retries
    • Ajout de nouveaux stages sans modifier le code de la factory
    • Modularité des workers (un worker = un composant Spring)

    25.2 Architecture

    ┌─────────────────────────────────────────────────────────────────┐
    │                        application.yml                           │
    │  cdc:                                                            │
    │    pipeline:                                                     │
    │      stages:                                                     │
    │        - name: filter                                            │
    │          worker: filterWorker                                    │
    │          enabled: true                                           │
    └───────────────────────────────┬─────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │            PipelineYamlConfig (@ConfigurationProperties)          │
    │                                                                    │
    │  - name: String                                                    │
    │  - stages: List<StageYamlConfig>                                   │
    │  - getEnabledStages(): List<StageYamlConfig>                       │
    └───────────────────────────────┬───────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │                    DynamicPipelineBuilder                          │
    │                                                                    │
    │  .config(pipelineConfig)                                           │
    │  .workers(Map<String, PipelineStageWorker>)                        │
    │  .build() → PipelineV2<I, O>                                       │
    └───────────────────────────────────────────────────────────────────┘
    

    25.3 Interface PipelineStageWorker

    Chaque stage est implémenté comme un composant Spring implémentant PipelineStageWorker :

    package eu.lmvi.socle.pipeline.v2.worker;
    
    public interface PipelineStageWorker<I, O> {
    
        /**
         * Nom unique du worker (utilisé dans la config YAML)
         */
        String getName();
    
        /**
         * Initialise le worker (chargement ressources, compilation scripts, etc.)
         */
        default void initialize() throws Exception {}
    
        /**
         * Traite un élément d'entrée (DOIT être thread-safe)
         */
        O process(I input) throws Exception;
    
        /**
         * Libère les ressources
         */
        default void shutdown() {}
    
        /**
         * Indique si ce worker est activé
         */
        default boolean isEnabled() { return true; }
    
        /**
         * Description pour le monitoring
         */
        default String getDescription() { return getName(); }
    
        /**
         * Ordre de priorité (plus bas = plus tôt)
         */
        default int getOrder() { return 100; }
    }
    

    25.4 Exemple d’implémentation

    @Component("filterWorker")
    public class FilterStageWorker implements PipelineStageWorker<WalEvent, PipelineMessage> {
    
        @Autowired
        private TableFilter tableFilter;
    
        @Override
        public String getName() {
            return "filter";
        }
    
        @Override
        public int getOrder() {
            return 10;  // Premier dans le pipeline
        }
    
        @Override
        public PipelineMessage process(WalEvent event) throws Exception {
            PipelineMessage message = new PipelineMessage(event);
            if (!tableFilter.isAllowed(event.getTable())) {
                message.markAsFiltered("table_not_in_whitelist");
            }
            return message;
        }
    }
    

    25.5 Configuration YAML

    cdc:
      pipeline:
        name: cdc-pipeline-v2
        description: Pipeline CDC PostgreSQL vers Kafka
        enabled: true
        stages:
          # Stage 1: Filtrage
          - name: filter
            worker: filterWorker        # Nom du bean Spring
            enabled: true
            concurrency: 1
            timeout-seconds: 5
            max-retries: 1
            order: 10
    
          # Stage 2: Transformation
          - name: transform
            worker: transformWorker
            enabled: true
            concurrency: 2
            timeout-seconds: 30
            order: 20
    
          # Stage 3a: Enrichissement Rules
          - name: enrich-rules
            worker: rulesEnrichmentWorker
            enabled: true
            concurrency: 2
            order: 30
    
          # Stage 3b: Enrichissement BeanShell
          - name: enrich-beanshell
            worker: beanshellEnrichmentWorker
            enabled: ${CDC_ENRICH_BSH_ENABLED:true}  # Variable d'environnement
            concurrency: 2
            order: 31
    
          # Stage 3c: Enrichissement JavaScript
          - name: enrich-javascript
            worker: javascriptEnrichmentWorker
            enabled: ${CDC_ENRICH_JS_ENABLED:true}
            concurrency: 4
            timeout-seconds: 60
            order: 32
    
          # Stage 3d: Enrichissement Janino (nouveau)
          - name: enrich-janino
            worker: janinoEnrichmentWorker
            enabled: ${CDC_ENRICH_JANINO_ENABLED:false}  # Désactivé par défaut
            concurrency: 2
            order: 33
    
          # Stage 4: Publication Kafka
          - name: publish
            worker: publishWorker
            enabled: true
            concurrency: 2
            max-retries: 5
            order: 40
    
          # Stage 5: Commit LSN
          - name: commit
            worker: commitWorker
            enabled: true
            concurrency: 1
            order: 50
    

    25.6 Classe de configuration Spring

    @Configuration
    @ConfigurationProperties(prefix = "cdc.pipeline")
    public class CdcPipelineConfig extends PipelineYamlConfig {
        // Hérite de toutes les propriétés
    }
    

    25.7 Construction du pipeline

    @Component
    public class CdcPipelineFactory {
    
        @Autowired
        private CdcPipelineConfig pipelineConfig;
    
        @Autowired
        private Map<String, PipelineStageWorker<?, ?>> availableWorkers;
    
        @PostConstruct
        public void init() {
            // Log des workers disponibles
            log.info("Available workers: {}", availableWorkers.keySet());
    
            // Construire le pipeline dynamiquement
            cdcPipeline = DynamicPipelineBuilder.create()
                .config(pipelineConfig)
                .workers(availableWorkers)
                .build();
    
            log.info("Pipeline created with {} stages",
                pipelineConfig.countEnabledStages());
        }
    }
    

    25.8 Avantages

    Aspect Avant (hardcodé) Après (YAML)
    Modification Recompilation Redémarrage
    Activation stage Code Java Variable d’env
    Ajout stage Modifier factory Ajouter worker + config
    Configuration Dans le code Centralisée
    Environnements Branches différentes Même code, config différente

    25.9 Activation/Désactivation à chaud

    Via variables d’environnement dans .env :

    # Désactiver BeanShell
    CDC_ENRICH_BSH_ENABLED=false
    
    # Activer Janino
    CDC_ENRICH_JANINO_ENABLED=true
    
    # Redémarrer le service
    sudo systemctl restart cdc-reflet-kafka-v02
    

    25.10 Ajout d’un nouveau type d’enrichissement

    1. Créer le worker :
    @Component("monNouveauWorker")
    public class MonNouveauEnrichmentWorker
        implements PipelineStageWorker<PipelineMessage, PipelineMessage> {
        // ...
    }
    
    1. Ajouter dans application.yml :
    stages:
      - name: enrich-nouveau
        worker: monNouveauWorker
        enabled: ${CDC_ENRICH_NOUVEAU_ENABLED:false}
        order: 34
    
    1. Redémarrer – le pipeline inclura automatiquement le nouveau stage.

    26. Références V2

  • Socle V004 – Exemples de Code

    Socle V004 – Exemples de Code

    19 – Exemples

    Version : 4.0.0 Date : 2025-12-09

    1. Application minimale

    1.1 pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
             https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>3.2.1</version>
        </parent>
    
        <groupId>com.example</groupId>
        <artifactId>my-socle-app</artifactId>
        <version>1.0.0</version>
    
        <properties>
            <java.version>21</java.version>
        </properties>
    
        <dependencies>
            <!-- Socle V4 -->
            <dependency>
                <groupId>eu.lmvi</groupId>
                <artifactId>socle-v004</artifactId>
                <version>4.0.0</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
    

    1.2 Application.java

    package com.example;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.ComponentScan;
    
    @SpringBootApplication
    @ComponentScan(basePackages = {"com.example", "eu.lmvi.socle"})
    public class Application {
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    }
    

    1.3 application.yml

    socle:
      app_name: my-app
      env_name: ${ENV_NAME:DEV}
      region: ${REGION:local}
    
    server:
      port: ${HTTP_PORT:8080}
    
    logging:
      config: classpath:log4j2.xml
    

    1.4 Worker simple

    package com.example.worker;
    
    import eu.lmvi.socle.worker.Worker;
    import org.springframework.stereotype.Component;
    
    @Component
    public class HelloWorker implements Worker {
    
        @Override
        public String getName() {
            return "hello-worker";
        }
    
        @Override
        public void initialize() {
            System.out.println("Hello Worker initialized");
        }
    
        @Override
        public void start() {
            System.out.println("Hello Worker started");
        }
    
        @Override
        public void doWork() {
            System.out.println("Hello from worker!");
        }
    
        @Override
        public void stop() {
            System.out.println("Hello Worker stopped");
        }
    
        @Override
        public boolean isHealthy() {
            return true;
        }
    
        @Override
        public Map<String, Object> getStats() {
            return Map.of("status", "running");
        }
    }
    

    2. Worker Kafka Consumer

    package com.example.worker;
    
    import eu.lmvi.socle.worker.AbstractWorker;
    import eu.lmvi.socle.techdb.TechDbManager;
    import org.apache.kafka.clients.consumer.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaConsumerWorker extends AbstractWorker {
    
        @Autowired
        private TechDbManager techDb;
    
        private KafkaConsumer<String, String> consumer;
        private String topic = "my-topic";
        private long lastOffset = 0;
    
        @Override
        public String getName() {
            return "kafka-consumer";
        }
    
        @Override
        public int getStartPriority() {
            return 10;
        }
    
        @Override
        protected void doInitialize() {
            // Restaurer l'offset
            lastOffset = techDb.getOffset("kafka", topic + "-0").orElse(0L);
            log.info("Starting from offset: {}", lastOffset);
    
            // Créer le consumer
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "false");
    
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(List.of(topic));
        }
    
        @Override
        protected void doProcess() {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    
            for (ConsumerRecord<String, String> record : records) {
                processMessage(record);
                lastOffset = record.offset();
                incrementProcessed();
            }
    
            // Persister périodiquement
            if (processedCount.get() % 100 == 0) {
                techDb.saveOffset("kafka", topic + "-0", lastOffset, null);
            }
        }
    
        private void processMessage(ConsumerRecord<String, String> record) {
            log.debug("Processing: key={}, value={}", record.key(), record.value());
            // Traitement...
        }
    
        @Override
        protected void doStop() {
            // Sauvegarder l'offset final
            techDb.saveOffset("kafka", topic + "-0", lastOffset, null);
    
            if (consumer != null) {
                consumer.close();
            }
        }
    
        @Override
        public Map<String, Object> getStats() {
            Map<String, Object> stats = new HashMap<>(super.getStats());
            stats.put("lastOffset", lastOffset);
            return stats;
        }
    }
    

    3. Worker HTTP API

    package com.example.worker;
    
    import eu.lmvi.socle.worker.Worker;
    import eu.lmvi.socle.kv.KvBus;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.web.bind.annotation.*;
    
    @Component
    @RestController
    @RequestMapping("/api/orders")
    public class OrderApiWorker implements Worker {
    
        @Autowired
        private KvBus kvBus;
    
        @Autowired
        private OrderService orderService;
    
        private volatile boolean running = false;
    
        @Override
        public String getName() {
            return "order-api";
        }
    
        @Override
        public boolean isPassive() {
            return true;  // Pas de doWork cyclique
        }
    
        @Override
        public void initialize() {}
    
        @Override
        public void start() {
            running = true;
        }
    
        @Override
        public void doWork() {
            // Passif - traitement via endpoints REST
        }
    
        @Override
        public void stop() {
            running = false;
        }
    
        @Override
        public boolean isHealthy() {
            return running;
        }
    
        @Override
        public Map<String, Object> getStats() {
            return Map.of("running", running);
        }
    
        // === REST Endpoints ===
    
        @PostMapping
        public ResponseEntity<Order> createOrder(@RequestBody CreateOrderRequest request) {
            Order order = orderService.create(request);
    
            // Cache l'order
            kvBus.putJson("order:" + order.getId(), order);
            kvBus.setTtl("order:" + order.getId(), Duration.ofHours(1));
    
            return ResponseEntity.status(HttpStatus.CREATED).body(order);
        }
    
        @GetMapping("/{id}")
        public ResponseEntity<Order> getOrder(@PathVariable String id) {
            // Vérifier le cache d'abord
            Optional<Order> cached = kvBus.getJson("order:" + id, Order.class);
            if (cached.isPresent()) {
                return ResponseEntity.ok(cached.get());
            }
    
            // Sinon charger depuis la DB
            return orderService.findById(id)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
        }
    }
    

    4. Worker Schedulé (Cron)

    package com.example.worker;
    
    import eu.lmvi.socle.worker.AbstractWorker;
    import org.springframework.stereotype.Component;
    
    @Component
    public class DailyReportWorker extends AbstractWorker {
    
        @Override
        public String getName() {
            return "daily-report";
        }
    
        @Override
        public String getSchedule() {
            return "0 0 6 * * ?";  // Tous les jours à 6h
        }
    
        @Override
        public boolean isScheduled() {
            return true;
        }
    
        @Override
        protected void doProcess() {
            log.info("Generating daily report...");
    
            // Collecter les données
            ReportData data = collectReportData();
    
            // Générer le rapport
            Report report = generateReport(data);
    
            // Envoyer par email
            sendReportByEmail(report);
    
            log.info("Daily report sent successfully");
            incrementProcessed();
        }
    
        private ReportData collectReportData() {
            // ...
            return new ReportData();
        }
    
        private Report generateReport(ReportData data) {
            // ...
            return new Report();
        }
    
        private void sendReportByEmail(Report report) {
            // ...
        }
    }
    

    5. Pipeline de traitement

    package com.example.pipeline;
    
    import eu.lmvi.socle.pipeline.*;
    import org.springframework.stereotype.Component;
    
    @Component
    public class OrderProcessingPipeline {
    
        @Autowired
        private PipelineEngine engine;
    
        @Autowired
        private OrderValidator validator;
    
        @Autowired
        private OrderEnricher enricher;
    
        @Autowired
        private PaymentProcessor paymentProcessor;
    
        @Autowired
        private NotificationService notificationService;
    
        public PipelineResult<ProcessedOrder> process(Order order) {
            Pipeline<Order, ProcessedOrder> pipeline = PipelineBuilder
                .<Order, ProcessedOrder>create("order-processing")
                .addStep(new ValidationStep(validator))
                .addStep(new EnrichmentStep(enricher))
                .addStep(new PaymentStep(paymentProcessor))
                .addStep(new NotificationStep(notificationService))
                .build();
    
            return engine.execute(pipeline, order);
        }
    }
    
    // Étape de validation
    class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
    
        private final OrderValidator validator;
    
        public ValidationStep(OrderValidator validator) {
            this.validator = validator;
        }
    
        @Override
        public String getName() {
            return "validation";
        }
    
        @Override
        public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
            List<String> errors = validator.validate(input);
            if (!errors.isEmpty()) {
                return StepResult.failure(getName(),
                    new ValidationException(errors), Duration.ZERO, 1);
            }
            return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
        }
    
        @Override
        public boolean isRetryable() {
            return false;
        }
    }
    
    // Étape de notification (optionnelle)
    class NotificationStep implements PipelineStep<ProcessedOrder, ProcessedOrder> {
    
        private final NotificationService notificationService;
    
        @Override
        public String getName() {
            return "notification";
        }
    
        @Override
        public StepResult<ProcessedOrder> execute(ProcessedOrder input, PipelineContext context) {
            try {
                notificationService.sendOrderConfirmation(input);
                return StepResult.success(getName(), input, Duration.ZERO);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    
        @Override
        public boolean isOptional() {
            return true;  // Le pipeline continue même si la notif échoue
        }
    }
    

    6. Service avec résilience

    package com.example.service;
    
    import eu.lmvi.socle.resilience.*;
    import org.springframework.stereotype.Service;
    
    @Service
    public class ExternalApiService {
    
        @Autowired
        private RetryTemplate retryTemplate;
    
        @Autowired
        private CircuitBreakerRegistry cbRegistry;
    
        @Autowired
        private KvBus kvBus;
    
        private final OkHttpClient httpClient;
    
        public Data fetchData(String id) {
            // Circuit breaker + Retry + Cache fallback
            CircuitBreaker cb = cbRegistry.getOrCreate("external-api");
    
            return cb.executeWithFallback(
                () -> retryTemplate.execute(() -> doFetchData(id)),
                () -> getCachedData(id)
            );
        }
    
        private Data doFetchData(String id) throws IOException {
            Request request = new Request.Builder()
                .url("https://api.example.com/data/" + id)
                .build();
    
            try (Response response = httpClient.newCall(request).execute()) {
                if (!response.isSuccessful()) {
                    throw new IOException("API returned " + response.code());
                }
    
                Data data = parseResponse(response.body().string());
    
                // Mettre en cache
                kvBus.putJson("cache:data:" + id, data);
                kvBus.setTtl("cache:data:" + id, Duration.ofMinutes(5));
    
                return data;
            }
        }
    
        private Data getCachedData(String id) {
            return kvBus.getJson("cache:data:" + id, Data.class)
                .orElseThrow(() -> new RuntimeException("No cached data available"));
        }
    }
    

    7. Configuration multi-environnement

    application.yml (base)

    socle:
      app_name: ${APP_NAME:my-app}
      env_name: ${ENV_NAME:DEV}
      region: ${REGION:local}
    
    spring:
      profiles:
        active: ${PROFILE:dev}
    

    application-dev.yml

    socle:
      kvbus:
        mode: in_memory
      techdb:
        console:
          enabled: true
      admin:
        auth:
          enabled: false
    
    logging:
      level:
        eu.lmvi.socle: DEBUG
    

    application-prod.yml

    socle:
      kvbus:
        mode: redis
        redis:
          host: ${REDIS_HOST}
          password: ${REDIS_PASSWORD}
      techdb:
        console:
          enabled: false
      logging:
        forwarder:
          enabled: true
      auth:
        enabled: true
      admin:
        auth:
          enabled: true
    
    logging:
      level:
        eu.lmvi.socle: INFO
    

    8. Dockerfile complet

    # Build stage
    FROM eclipse-temurin:21-jdk-alpine AS build
    WORKDIR /app
    COPY pom.xml .
    COPY src ./src
    RUN apk add --no-cache maven && \
        mvn clean package -DskipTests
    
    # Runtime stage
    FROM eclipse-temurin:21-jre-alpine
    WORKDIR /app
    
    # Security
    RUN addgroup -S app && adduser -S app -G app
    USER app
    
    # Copy artifact
    COPY --from=build --chown=app:app /app/target/*.jar app.jar
    
    # Config
    ENV JAVA_OPTS="-XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0 -Djava.security.egd=file:/dev/./urandom"
    
    # Health check
    HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
        CMD wget -qO- http://localhost:8080/admin/health/live || exit 1
    
    EXPOSE 8080
    
    ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar app.jar"]
    

    9. Kubernetes deployment complet

    Voir 16-KUBERNETES pour l’exemple complet de déploiement K8s.

    10. Références

  • Socle V004 – Métriques

    Socle V004 – Métriques

    15 – Metrics

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Socle V4 expose des métriques au format Prometheus pour le monitoring et l’alerting.

    Types de métriques

    • Counter : Valeur qui ne fait qu’augmenter (requêtes, erreurs)
    • Gauge : Valeur qui peut monter et descendre (connexions actives)
    • Histogram : Distribution de valeurs (latences)
    • Summary : Similaire à histogram avec percentiles pré-calculés

    2. Configuration

    2.1 application.yml

    management:
      endpoints:
        web:
          exposure:
            include: prometheus,health,info,metrics
          base-path: /actuator
      endpoint:
        prometheus:
          enabled: true
      metrics:
        export:
          prometheus:
            enabled: true
        tags:
          application: ${socle.app_name}
          environment: ${socle.env_name}
          region: ${socle.region}
    

    2.2 Dépendances Maven

    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>
    

    3. Métriques Socle

    3.1 Métriques Workers

    # Nombre de workers
    socle_workers_total{application="socle-v4"} 5
    
    # Workers healthy
    socle_workers_healthy{application="socle-v4"} 5
    
    # Workers unhealthy
    socle_workers_unhealthy{application="socle-v4"} 0
    
    # État par worker
    socle_worker_status{worker="kafka-consumer",status="RUNNING"} 1
    socle_worker_status{worker="order-processor",status="RUNNING"} 1
    
    # Heartbeats par worker
    socle_worker_heartbeats_total{worker="kafka-consumer"} 1234
    socle_worker_missed_heartbeats{worker="kafka-consumer"} 0
    

    3.2 Métriques KvBus

    # Opérations
    socle_kvbus_operations_total{operation="get"} 12345
    socle_kvbus_operations_total{operation="put"} 6789
    socle_kvbus_operations_total{operation="delete"} 234
    
    # Latence
    socle_kvbus_operation_duration_seconds{operation="get",quantile="0.5"} 0.001
    socle_kvbus_operation_duration_seconds{operation="get",quantile="0.95"} 0.005
    socle_kvbus_operation_duration_seconds{operation="get",quantile="0.99"} 0.01
    
    # Nombre de clés
    socle_kvbus_keys_count 456
    

    3.3 Métriques Pipeline

    # Exécutions
    socle_pipeline_executions_total{pipeline="order-processing",status="SUCCESS"} 1234
    socle_pipeline_executions_total{pipeline="order-processing",status="FAILURE"} 12
    
    # Durée
    socle_pipeline_duration_seconds{pipeline="order-processing",quantile="0.5"} 0.5
    socle_pipeline_duration_seconds{pipeline="order-processing",quantile="0.95"} 2.0
    socle_pipeline_duration_seconds{pipeline="order-processing",quantile="0.99"} 5.0
    
    # Steps
    socle_pipeline_step_duration_seconds{step="validation",quantile="0.5"} 0.01
    socle_pipeline_step_duration_seconds{step="processing",quantile="0.5"} 0.3
    

    3.4 Métriques Resilience

    # Circuit breaker état (0=CLOSED, 1=HALF_OPEN, 2=OPEN)
    socle_circuit_breaker_state{name="payment-gateway"} 0
    
    # Tentatives de retry
    socle_retry_attempts_total{operation="external-api",attempt="1",success="true"} 1000
    socle_retry_attempts_total{operation="external-api",attempt="2",success="true"} 50
    socle_retry_attempts_total{operation="external-api",attempt="3",success="false"} 5
    

    3.5 Métriques TechDB (V4)

    # Opérations
    socle_techdb_operations_total{operation="saveOffset"} 5678
    socle_techdb_operations_total{operation="getOffset"} 12345
    
    # Taille des tables
    socle_techdb_rows_count{table="socle_offsets"} 23
    socle_techdb_rows_count{table="socle_events"} 456
    socle_techdb_rows_count{table="socle_log_fallback"} 0
    

    3.6 Métriques LogForwarder (V4)

    # Queue
    socle_logforwarder_queue_size 45
    socle_logforwarder_queue_capacity 10000
    
    # Logs envoyés
    socle_logforwarder_logs_sent_total 123456
    socle_logforwarder_logs_failed_total 23
    socle_logforwarder_logs_fallback_total 0
    
    # Batches
    socle_logforwarder_batches_sent_total 1234
    socle_logforwarder_batch_size{quantile="0.5"} 100
    

    4. Implémentation

    4.1 Enregistrement des métriques

    package eu.lmvi.socle.metrics;
    
    @Component
    public class SocleMetrics {
    
        private final MeterRegistry registry;
    
        // Counters
        private final Counter requestsTotal;
        private final Counter errorsTotal;
    
        // Gauges
        private final AtomicInteger activeConnections = new AtomicInteger(0);
    
        // Timers
        private final Timer requestDuration;
    
        public SocleMetrics(MeterRegistry registry) {
            this.registry = registry;
    
            // Counter
            this.requestsTotal = Counter.builder("socle_requests_total")
                .description("Total number of requests")
                .register(registry);
    
            this.errorsTotal = Counter.builder("socle_errors_total")
                .description("Total number of errors")
                .register(registry);
    
            // Gauge
            Gauge.builder("socle_active_connections", activeConnections, AtomicInteger::get)
                .description("Number of active connections")
                .register(registry);
    
            // Timer
            this.requestDuration = Timer.builder("socle_request_duration_seconds")
                .description("Request duration in seconds")
                .publishPercentiles(0.5, 0.95, 0.99)
                .register(registry);
        }
    
        public void recordRequest() {
            requestsTotal.increment();
        }
    
        public void recordError() {
            errorsTotal.increment();
        }
    
        public void connectionOpened() {
            activeConnections.incrementAndGet();
        }
    
        public void connectionClosed() {
            activeConnections.decrementAndGet();
        }
    
        public Timer.Sample startTimer() {
            return Timer.start(registry);
        }
    
        public void stopTimer(Timer.Sample sample) {
            sample.stop(requestDuration);
        }
    }
    

    4.2 Utilisation dans le code

    @Service
    public class OrderService {
    
        @Autowired
        private SocleMetrics metrics;
    
        public Order processOrder(Order order) {
            Timer.Sample sample = metrics.startTimer();
            metrics.recordRequest();
    
            try {
                Order result = doProcess(order);
                return result;
            } catch (Exception e) {
                metrics.recordError();
                throw e;
            } finally {
                metrics.stopTimer(sample);
            }
        }
    }
    

    4.3 Métriques avec tags

    @Component
    public class WorkerMetrics {
    
        private final MeterRegistry registry;
    
        public void recordWorkerStatus(String workerName, String status) {
            Gauge.builder("socle_worker_status", () -> 1)
                .tag("worker", workerName)
                .tag("status", status)
                .register(registry);
        }
    
        public void recordProcessed(String workerName, String type) {
            Counter.builder("socle_worker_processed_total")
                .tag("worker", workerName)
                .tag("type", type)
                .register(registry)
                .increment();
        }
    }
    

    5. Endpoint Prometheus

    5.1 Accès

    curl http://localhost:8080/actuator/prometheus
    

    5.2 Sortie

    # HELP socle_workers_total Number of workers
    # TYPE socle_workers_total gauge
    socle_workers_total{application="socle-v4",environment="PROD",region="MTQ"} 5
    
    # HELP socle_workers_healthy Number of healthy workers
    # TYPE socle_workers_healthy gauge
    socle_workers_healthy{application="socle-v4",environment="PROD",region="MTQ"} 5
    
    # HELP socle_requests_total Total number of requests
    # TYPE socle_requests_total counter
    socle_requests_total{application="socle-v4",environment="PROD",region="MTQ"} 12345
    
    # HELP socle_request_duration_seconds Request duration in seconds
    # TYPE socle_request_duration_seconds summary
    socle_request_duration_seconds{application="socle-v4",quantile="0.5"} 0.05
    socle_request_duration_seconds{application="socle-v4",quantile="0.95"} 0.2
    socle_request_duration_seconds{application="socle-v4",quantile="0.99"} 0.5
    socle_request_duration_seconds_count{application="socle-v4"} 12345
    socle_request_duration_seconds_sum{application="socle-v4"} 617.25
    

    6. Prometheus Configuration

    6.1 prometheus.yml

    global:
      scrape_interval: 15s
    
    scrape_configs:
      - job_name: 'socle-v4'
        metrics_path: '/actuator/prometheus'
        static_configs:
          - targets: ['socle-app:8080']
            labels:
              app: 'socle-v4'
              env: 'prod'
    
      - job_name: 'socle-v4-kubernetes'
        kubernetes_sd_configs:
          - role: pod
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
            action: keep
            regex: true
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
            action: replace
            target_label: __metrics_path__
            regex: (.+)
    

    6.2 Kubernetes annotations

    apiVersion: v1
    kind: Pod
    metadata:
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/path: "/actuator/prometheus"
        prometheus.io/port: "8080"
    

    7. Grafana Dashboards

    7.1 Exemple de requêtes

    # Taux de requêtes par seconde
    rate(socle_requests_total[5m])
    
    # Taux d'erreurs
    rate(socle_errors_total[5m]) / rate(socle_requests_total[5m]) * 100
    
    # Latence P95
    histogram_quantile(0.95, rate(socle_request_duration_seconds_bucket[5m]))
    
    # Workers unhealthy
    socle_workers_unhealthy
    
    # Circuit breakers ouverts
    socle_circuit_breaker_state == 2
    
    # Queue LogForwarder
    socle_logforwarder_queue_size / socle_logforwarder_queue_capacity * 100
    

    7.2 Dashboard JSON

    {
      "title": "Socle V4 Dashboard",
      "panels": [
        {
          "title": "Request Rate",
          "type": "graph",
          "targets": [
            {
              "expr": "rate(socle_requests_total[5m])",
              "legendFormat": "{{application}}"
            }
          ]
        },
        {
          "title": "Error Rate",
          "type": "graph",
          "targets": [
            {
              "expr": "rate(socle_errors_total[5m]) / rate(socle_requests_total[5m]) * 100",
              "legendFormat": "Error %"
            }
          ]
        },
        {
          "title": "P95 Latency",
          "type": "graph",
          "targets": [
            {
              "expr": "histogram_quantile(0.95, rate(socle_request_duration_seconds_bucket[5m]))",
              "legendFormat": "P95"
            }
          ]
        },
        {
          "title": "Workers Status",
          "type": "stat",
          "targets": [
            {
              "expr": "socle_workers_healthy",
              "legendFormat": "Healthy"
            }
          ]
        }
      ]
    }
    

    8. Alerting

    8.1 Prometheus Alertmanager rules

    groups:
      - name: socle-alerts
        rules:
          - alert: SocleHighErrorRate
            expr: rate(socle_errors_total[5m]) / rate(socle_requests_total[5m]) > 0.05
            for: 5m
            labels:
              severity: warning
            annotations:
              summary: "High error rate on {{ $labels.application }}"
              description: "Error rate is {{ $value | humanizePercentage }}"
    
          - alert: SocleWorkerUnhealthy
            expr: socle_workers_unhealthy > 0
            for: 2m
            labels:
              severity: critical
            annotations:
              summary: "Unhealthy workers on {{ $labels.application }}"
              description: "{{ $value }} workers are unhealthy"
    
          - alert: SocleCircuitBreakerOpen
            expr: socle_circuit_breaker_state == 2
            for: 5m
            labels:
              severity: warning
            annotations:
              summary: "Circuit breaker {{ $labels.name }} is OPEN"
    
          - alert: SocleLogForwarderQueueHigh
            expr: socle_logforwarder_queue_size / socle_logforwarder_queue_capacity > 0.8
            for: 5m
            labels:
              severity: warning
            annotations:
              summary: "LogForwarder queue is {{ $value | humanizePercentage }} full"
    

    9. Bonnes pratiques

    DO

    • Utiliser des noms de métriques cohérents (socle_*)
    • Ajouter des tags pertinents (application, environment, region)
    • Utiliser des histogrammes pour les latences
    • Définir des alertes sur les métriques critiques
    • Documenter les métriques

    DON’T

    • Ne pas créer trop de métriques (cardinalité)
    • Ne pas utiliser de valeurs à haute cardinalité dans les tags
    • Ne pas oublier les métriques d’erreur
    • Ne pas ignorer les métriques de queue/buffer

    10. Références

  • Socle V004 – gRPC Inter-Socles

    Socle V004 – gRPC Inter-Socles

    31 – Communication gRPC Inter-Socles

    Vue d’ensemble

    Le module gRPC permet aux instances Socle V4 de communiquer entre elles via streaming bidirectionnel. Il offre :

    • Sessions : Gestion de sessions multi-participants avec TTL
    • Streaming bidirectionnel : Communication temps reel entre Socles
    • Pipeline de traitement : Transformation des messages via Janino et JDM
    • Fan-out : Routage broadcast ou cible vers les participants
    • Pool de connexions : Connexions persistantes vers les peers

    Architecture

    ┌─────────────────────────────────────────────────────────────────┐
    │                         Socle A                                  │
    │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
    │  │ SessionMgr  │  │  Pipeline   │  │    GrpcServerWorker     │  │
    │  │             │  │  Executor   │  │    (port 9400)          │  │
    │  └─────────────┘  └─────────────┘  └─────────────────────────┘  │
    │         │               │                      │                 │
    │         └───────────────┼──────────────────────┘                 │
    │                         │                                        │
    │              ┌──────────┴──────────┐                            │
    │              │   gRPC Services     │                            │
    │              │  - SocleComm        │                            │
    │              │  - SessionService   │                            │
    │              │  - DiscoveryService │                            │
    │              └──────────┬──────────┘                            │
    └─────────────────────────┼───────────────────────────────────────┘
                              │ gRPC/HTTP2
                              ▼
    ┌─────────────────────────────────────────────────────────────────┐
    │                         Socle B                                  │
    │              ┌──────────────────────┐                           │
    │              │  PeerConnectionPool  │                           │
    │              └──────────────────────┘                           │
    └─────────────────────────────────────────────────────────────────┘
    

    Configuration

    application.yml

    socle:
      grpc:
        # Activation du module
        enabled: ${GRPC_ENABLED:false}
    
        # Port du serveur gRPC
        port: ${GRPC_PORT:9400}
    
        # Identification du Socle
        socle-id: ${SOCLE_ID:${socle.app_name}}
        socle-version: ${socle.version}
    
        # Limites serveur
        max-inbound-message-size: ${GRPC_MAX_MESSAGE_SIZE:4194304}  # 4MB
        max-concurrent-calls-per-connection: ${GRPC_MAX_CONCURRENT_CALLS:100}
    
        # Sessions
        session:
          ttl-seconds: ${GRPC_SESSION_TTL:1800}           # 30 min
          max-participants: ${GRPC_MAX_PARTICIPANTS:100}
          persist-to-tech-db: ${GRPC_PERSIST_SESSIONS:true}
          cache-in-redis: ${GRPC_CACHE_REDIS:true}
    
        # Pipeline de traitement
        pipeline:
          enabled: ${GRPC_PIPELINE_ENABLED:true}
          config-cache-ttl-seconds: ${GRPC_PIPELINE_CACHE_TTL:300}
    
        # Connexions peer
        peer:
          max-channels-per-peer: ${GRPC_PEER_MAX_CHANNELS:4}
          connection-timeout-ms: ${GRPC_PEER_CONNECT_TIMEOUT:5000}
          idle-timeout-seconds: ${GRPC_PEER_IDLE_TIMEOUT:300}
          keep-alive-enabled: ${GRPC_PEER_KEEPALIVE:true}
          keep-alive-time-seconds: ${GRPC_PEER_KEEPALIVE_TIME:30}
          keep-alive-timeout-seconds: ${GRPC_PEER_KEEPALIVE_TIMEOUT:10}
    

    Variables d’environnement

    Variable Default Description
    GRPC_ENABLED false Active le module gRPC
    GRPC_PORT 9400 Port du serveur gRPC
    SOCLE_ID ${app_name} Identifiant unique du Socle
    GRPC_SESSION_TTL 1800 TTL des sessions en secondes
    GRPC_MAX_PARTICIPANTS 100 Max participants par session
    GRPC_PIPELINE_ENABLED true Active le pipeline de traitement

    Services gRPC

    DiscoveryService

    Service de decouverte et health check.

    service DiscoveryService {
        rpc GetCapabilities(CapabilitiesRequest) returns (CapabilitiesResponse);
        rpc Ping(PingRequest) returns (PingResponse);
    }
    

    Test avec grpcurl :

    # Ping
    grpcurl -plaintext localhost:9400 socle.DiscoveryService/Ping
    
    # Capabilities
    grpcurl -plaintext localhost:9400 socle.DiscoveryService/GetCapabilities
    

    SessionService

    Gestion du cycle de vie des sessions.

    service SessionService {
        rpc CreateSession(CreateSessionRequest) returns (SessionInfo);
        rpc JoinSession(JoinSessionRequest) returns (JoinSessionResponse);
        rpc LeaveSession(LeaveSessionRequest) returns (LeaveSessionResponse);
        rpc GetSession(GetSessionRequest) returns (SessionInfo);
        rpc CloseSession(CloseSessionRequest) returns (CloseSessionResponse);
    }
    

    Exemples :

    # Creer une session
    grpcurl -plaintext -d '{
      "session_type": "chat",
      "owner_id": "user1",
      "ttl_seconds": 3600
    }' localhost:9400 socle.SessionService/CreateSession
    
    # Joindre une session
    grpcurl -plaintext -d '{
      "session_id": "uuid-de-la-session",
      "participant_id": "user2",
      "display_name": "User 2"
    }' localhost:9400 socle.SessionService/JoinSession
    
    # Obtenir info session
    grpcurl -plaintext -d '{
      "session_id": "uuid-de-la-session"
    }' localhost:9400 socle.SessionService/GetSession
    

    SocleComm

    Streaming bidirectionnel pour l’echange de messages.

    service SocleComm {
        rpc Exchange(stream SessionMessage) returns (stream SessionMessage);
    }
    

    Format des messages :

    message SessionMessage {
        string session_id = 1;
        string sender_id = 2;
        MessageKind kind = 3;        // JOIN, LEAVE, DATA, REQUEST, RESPONSE, etc.
        repeated string target_ids = 4;  // Vide = broadcast
        string correlation_id = 5;
        int64 timestamp = 6;
        string payload = 7;          // JSON
        map<string, string> headers = 8;
    }
    

    Sessions

    Cycle de vie

    CREATE ──► ACTIVE ──► CLOSING ──► CLOSED
                  │
                  └──► EXPIRED (TTL depasse)
    

    Stockage

    Les sessions sont stockees dans :

    1. Cache memoire : Acces rapide, stream observers
    2. Redis (si KvBus en mode redis) : Partage entre instances
    3. TechDB : Audit et persistance

    Tables TechDB

    -- Sessions (audit)
    CREATE TABLE grpc_sessions (
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
        session_id VARCHAR(36) NOT NULL UNIQUE,
        session_type VARCHAR(100) NOT NULL,
        owner_id VARCHAR(100) NOT NULL,
        status VARCHAR(20) DEFAULT 'ACTIVE',
        datas CLOB
    );
    
    -- Configuration pipeline par type de session
    CREATE TABLE grpc_pipeline_configs (
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
        session_type VARCHAR(100) NOT NULL UNIQUE,
        janino_pre VARCHAR(255),
        jdm_rules VARCHAR(255),
        janino_post VARCHAR(255),
        default_targets VARCHAR(50) DEFAULT 'broadcast',
        enabled BOOLEAN DEFAULT TRUE,
        datas CLOB
    );
    

    Pipeline de traitement

    Le pipeline permet de transformer les messages et determiner leur routage.

    Etapes

    Message entrant
          │
          ▼
    ┌─────────────────┐
    │  1. Janino PRE  │  Transformation du payload
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │  2. JDM Rules   │  Determination des cibles (routing)
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │  3. Janino POST │  Transformation finale
    └────────┬────────┘
             │
             ▼
        Routage
       /        \
    Broadcast   Cible
    

    Configuration du pipeline

    Inserez dans grpc_pipeline_configs :

    INSERT INTO grpc_pipeline_configs
    (session_type, janino_pre, jdm_rules, janino_post, default_targets, enabled)
    VALUES
    ('chat', NULL, 'chat_routing', NULL, 'broadcast', TRUE);
    

    Exemple script Janino PRE

    // repository/scripts/java/grpc/chat_filter.java
    public class ChatFilter {
        public Object execute(Map<String, Object> context) {
            Map<String, Object> payload = (Map<String, Object>) context.get("payload");
            String senderId = (String) context.get("senderId");
    
            // Ajouter metadata
            payload.put("processed_at", System.currentTimeMillis());
            payload.put("sender", senderId);
    
            return payload;
        }
    }
    

    Exemple modele JDM pour routing

    {
      "name": "chat_routing",
      "nodes": [
        {
          "id": "input",
          "type": "inputNode",
          "content": {
            "fields": [
              {"field": "payload.target", "type": "string"},
              {"field": "payload.type", "type": "string"}
            ]
          }
        },
        {
          "id": "decision",
          "type": "decisionTableNode",
          "content": {
            "hitPolicy": "first",
            "inputs": [
              {"field": "payload.type"}
            ],
            "outputs": [
              {"field": "targets", "type": "string"},
              {"field": "broadcast", "type": "boolean"}
            ],
            "rules": [
              {"_input": ["private"], "_output": ["${payload.target}", false]},
              {"_input": ["broadcast"], "_output": ["", true]},
              {"_input": ["*"], "_output": ["", true]}
            ]
          }
        }
      ]
    }
    

    Connexions Peer

    Ajouter un peer programmatiquement

    @Autowired
    private PeerConnectionPool peerPool;
    
    public void connectToPeer() {
        // Ajouter et connecter
        boolean connected = peerPool.addPeer("socle-b", "192.168.1.100", 9400);
    
        if (connected) {
            // Ouvrir un stream
            peerPool.openStream("socle-b", message -> {
                // Handler pour messages entrants
                System.out.println("Received: " + message.getPayload());
            });
    
            // Envoyer un message
            SessionMessage msg = SessionMessage.newBuilder()
                .setSessionId("...")
                .setSenderId("local-participant")
                .setKind(MessageKind.DATA)
                .setPayload("{\"text\":\"Hello\"}")
                .build();
    
            peerPool.sendToPeer("socle-b", msg);
        }
    }
    

    Broadcast vers tous les peers

    SessionMessage msg = SessionMessage.newBuilder()
        .setSessionId(sessionId)
        .setSenderId(myId)
        .setKind(MessageKind.DATA)
        .setPayload(jsonPayload)
        .build();
    
    int sent = peerPool.broadcast(msg, null);  // null = inclure tous
    

    Worker gRPC

    Priorites

    Methode Valeur Description
    getStartPriority() 800 Demarre apres Janino (25) et JDM (30)
    getStopPriority() 10 S’arrete tot pour drain des connexions

    Statistiques

    @Autowired
    private GrpcServerWorker grpcWorker;
    
    Map<String, Object> stats = grpcWorker.getStats();
    // {
    //   "running": true,
    //   "port": 9400,
    //   "sessions": { "cached_sessions": 5, "active_streams": 12 },
    //   "messaging": { "messages_received": 1234, "messages_sent": 5678 },
    //   "peers": { "connected_peers": 2, "total_messages_sent": 100 }
    // }
    

    Types de sessions supportes

    Type Description
    chat Communication temps reel
    sync Synchronisation de donnees
    broadcast Diffusion un-vers-plusieurs
    pipeline Traitement en chaine
    custom Type personnalise

    Securite

    TLS (a venir)

    Le support TLS sera ajoute dans une version future :

    socle:
      grpc:
        tls:
          enabled: true
          cert-path: /path/to/server.crt
          key-path: /path/to/server.key
          ca-path: /path/to/ca.crt  # Pour mTLS
    

    Authentification

    L’authentification peut etre implementee via :

    • Metadata gRPC (tokens dans headers)
    • Intercepteurs personnalises
    • Integration avec le module Auth existant

    Monitoring

    Metriques exposees

    Les metriques sont disponibles via l’endpoint Prometheus /actuator/prometheus :

    • grpc_sessions_active : Nombre de sessions actives
    • grpc_messages_received_total : Total messages recus
    • grpc_messages_sent_total : Total messages envoyes
    • grpc_peers_connected : Nombre de peers connectes

    Logs

    [grpc_server] Started on port 9400
    [grpc_session] Session created: abc-123 (type=chat, owner=user1)
    [grpc_session] Participant joined: user2 -> abc-123 (total: 2)
    [grpc_comm] Message received: session=abc-123, sender=user1, kind=DATA
    [grpc_pipeline] Executed for session abc-123 (targets=broadcast)
    

    Troubleshooting

    Le serveur ne demarre pas

    1. Verifiez que le port 9400 n’est pas utilise
    2. Verifiez GRPC_ENABLED=true
    3. Consultez les logs pour les erreurs d’initialisation

    Sessions expirent trop vite

    Augmentez le TTL :

    socle:
      grpc:
        session:
          ttl-seconds: 7200  # 2 heures
    

    Messages non delivres

    1. Verifiez que le destinataire a un stream actif
    2. Verifiez les logs du pipeline pour les erreurs
    3. Verifiez la configuration du routing dans grpc_pipeline_configs

    Erreurs de connexion peer

    1. Verifiez la connectivite reseau
    2. Verifiez que le peer a gRPC active
    3. Augmentez le timeout de connexion :
    socle:
      grpc:
        peer:
          connection-timeout-ms: 10000
    

    Structure des fichiers

    src/main/java/eu/lmvi/socle/grpc/
    ├── GrpcConfiguration.java        # Configuration Spring
    ├── GrpcServerWorker.java         # Worker principal
    ├── session/
    │   ├── Session.java              # Record session
    │   ├── Participant.java          # Record participant
    │   ├── SessionStatus.java        # Enum ACTIVE/CLOSING/CLOSED/EXPIRED
    │   ├── SessionManager.java       # Gestion sessions + streams
    │   └── SessionRedisRepository.java  # Persistance Redis
    ├── pipeline/
    │   ├── PipelineConfig.java       # Config pipeline
    │   ├── PipelineResult.java       # Resultat pipeline
    │   └── PipelineExecutor.java     # Execution Janino + JDM
    ├── peer/
    │   ├── PeerConnection.java       # Connexion unique
    │   └── PeerConnectionPool.java   # Pool avec reconnexion
    └── service/
        ├── SocleCommService.java     # Streaming bidirectionnel
        ├── SessionServiceImpl.java   # CRUD sessions
        └── DiscoveryServiceImpl.java # Discovery + Ping
    
    src/main/proto/
    └── socle_comm.proto              # Definition Protocol Buffers
    
  • Socle V004 – Métriques

    Socle V004 – Métriques

    15 – Metrics

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Socle V4 expose des métriques au format Prometheus pour le monitoring et l’alerting.

    Types de métriques

    • Counter : Valeur qui ne fait qu’augmenter (requêtes, erreurs)
    • Gauge : Valeur qui peut monter et descendre (connexions actives)
    • Histogram : Distribution de valeurs (latences)
    • Summary : Similaire à histogram avec percentiles pré-calculés

    2. Configuration

    2.1 application.yml

    management:
      endpoints:
        web:
          exposure:
            include: prometheus,health,info,metrics
          base-path: /actuator
      endpoint:
        prometheus:
          enabled: true
      metrics:
        export:
          prometheus:
            enabled: true
        tags:
          application: ${socle.app_name}
          environment: ${socle.env_name}
          region: ${socle.region}
    

    2.2 Dépendances Maven

    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>
    

    3. Métriques Socle

    3.1 Métriques Workers

    # Nombre de workers
    socle_workers_total{application="socle-v4"} 5
    
    # Workers healthy
    socle_workers_healthy{application="socle-v4"} 5
    
    # Workers unhealthy
    socle_workers_unhealthy{application="socle-v4"} 0
    
    # État par worker
    socle_worker_status{worker="kafka-consumer",status="RUNNING"} 1
    socle_worker_status{worker="order-processor",status="RUNNING"} 1
    
    # Heartbeats par worker
    socle_worker_heartbeats_total{worker="kafka-consumer"} 1234
    socle_worker_missed_heartbeats{worker="kafka-consumer"} 0
    

    3.2 Métriques KvBus

    # Opérations
    socle_kvbus_operations_total{operation="get"} 12345
    socle_kvbus_operations_total{operation="put"} 6789
    socle_kvbus_operations_total{operation="delete"} 234
    
    # Latence
    socle_kvbus_operation_duration_seconds{operation="get",quantile="0.5"} 0.001
    socle_kvbus_operation_duration_seconds{operation="get",quantile="0.95"} 0.005
    socle_kvbus_operation_duration_seconds{operation="get",quantile="0.99"} 0.01
    
    # Nombre de clés
    socle_kvbus_keys_count 456
    

    3.3 Métriques Pipeline

    # Exécutions
    socle_pipeline_executions_total{pipeline="order-processing",status="SUCCESS"} 1234
    socle_pipeline_executions_total{pipeline="order-processing",status="FAILURE"} 12
    
    # Durée
    socle_pipeline_duration_seconds{pipeline="order-processing",quantile="0.5"} 0.5
    socle_pipeline_duration_seconds{pipeline="order-processing",quantile="0.95"} 2.0
    socle_pipeline_duration_seconds{pipeline="order-processing",quantile="0.99"} 5.0
    
    # Steps
    socle_pipeline_step_duration_seconds{step="validation",quantile="0.5"} 0.01
    socle_pipeline_step_duration_seconds{step="processing",quantile="0.5"} 0.3
    

    3.4 Métriques Resilience

    # Circuit breaker état (0=CLOSED, 1=HALF_OPEN, 2=OPEN)
    socle_circuit_breaker_state{name="payment-gateway"} 0
    
    # Tentatives de retry
    socle_retry_attempts_total{operation="external-api",attempt="1",success="true"} 1000
    socle_retry_attempts_total{operation="external-api",attempt="2",success="true"} 50
    socle_retry_attempts_total{operation="external-api",attempt="3",success="false"} 5
    

    3.5 Métriques TechDB (V4)

    # Opérations
    socle_techdb_operations_total{operation="saveOffset"} 5678
    socle_techdb_operations_total{operation="getOffset"} 12345
    
    # Taille des tables
    socle_techdb_rows_count{table="socle_offsets"} 23
    socle_techdb_rows_count{table="socle_events"} 456
    socle_techdb_rows_count{table="socle_log_fallback"} 0
    

    3.6 Métriques LogForwarder (V4)

    # Queue
    socle_logforwarder_queue_size 45
    socle_logforwarder_queue_capacity 10000
    
    # Logs envoyés
    socle_logforwarder_logs_sent_total 123456
    socle_logforwarder_logs_failed_total 23
    socle_logforwarder_logs_fallback_total 0
    
    # Batches
    socle_logforwarder_batches_sent_total 1234
    socle_logforwarder_batch_size{quantile="0.5"} 100
    

    4. Implémentation

    4.1 Enregistrement des métriques

    package eu.lmvi.socle.metrics;
    
    @Component
    public class SocleMetrics {
    
        private final MeterRegistry registry;
    
        // Counters
        private final Counter requestsTotal;
        private final Counter errorsTotal;
    
        // Gauges
        private final AtomicInteger activeConnections = new AtomicInteger(0);
    
        // Timers
        private final Timer requestDuration;
    
        public SocleMetrics(MeterRegistry registry) {
            this.registry = registry;
    
            // Counter
            this.requestsTotal = Counter.builder("socle_requests_total")
                .description("Total number of requests")
                .register(registry);
    
            this.errorsTotal = Counter.builder("socle_errors_total")
                .description("Total number of errors")
                .register(registry);
    
            // Gauge
            Gauge.builder("socle_active_connections", activeConnections, AtomicInteger::get)
                .description("Number of active connections")
                .register(registry);
    
            // Timer
            this.requestDuration = Timer.builder("socle_request_duration_seconds")
                .description("Request duration in seconds")
                .publishPercentiles(0.5, 0.95, 0.99)
                .register(registry);
        }
    
        public void recordRequest() {
            requestsTotal.increment();
        }
    
        public void recordError() {
            errorsTotal.increment();
        }
    
        public void connectionOpened() {
            activeConnections.incrementAndGet();
        }
    
        public void connectionClosed() {
            activeConnections.decrementAndGet();
        }
    
        public Timer.Sample startTimer() {
            return Timer.start(registry);
        }
    
        public void stopTimer(Timer.Sample sample) {
            sample.stop(requestDuration);
        }
    }
    

    4.2 Utilisation dans le code

    @Service
    public class OrderService {
    
        @Autowired
        private SocleMetrics metrics;
    
        public Order processOrder(Order order) {
            Timer.Sample sample = metrics.startTimer();
            metrics.recordRequest();
    
            try {
                Order result = doProcess(order);
                return result;
            } catch (Exception e) {
                metrics.recordError();
                throw e;
            } finally {
                metrics.stopTimer(sample);
            }
        }
    }
    

    4.3 Métriques avec tags

    @Component
    public class WorkerMetrics {
    
        private final MeterRegistry registry;
    
        public void recordWorkerStatus(String workerName, String status) {
            Gauge.builder("socle_worker_status", () -> 1)
                .tag("worker", workerName)
                .tag("status", status)
                .register(registry);
        }
    
        public void recordProcessed(String workerName, String type) {
            Counter.builder("socle_worker_processed_total")
                .tag("worker", workerName)
                .tag("type", type)
                .register(registry)
                .increment();
        }
    }
    

    5. Endpoint Prometheus

    5.1 Accès

    curl http://localhost:8080/actuator/prometheus
    

    5.2 Sortie

    # HELP socle_workers_total Number of workers
    # TYPE socle_workers_total gauge
    socle_workers_total{application="socle-v4",environment="PROD",region="MTQ"} 5
    
    # HELP socle_workers_healthy Number of healthy workers
    # TYPE socle_workers_healthy gauge
    socle_workers_healthy{application="socle-v4",environment="PROD",region="MTQ"} 5
    
    # HELP socle_requests_total Total number of requests
    # TYPE socle_requests_total counter
    socle_requests_total{application="socle-v4",environment="PROD",region="MTQ"} 12345
    
    # HELP socle_request_duration_seconds Request duration in seconds
    # TYPE socle_request_duration_seconds summary
    socle_request_duration_seconds{application="socle-v4",quantile="0.5"} 0.05
    socle_request_duration_seconds{application="socle-v4",quantile="0.95"} 0.2
    socle_request_duration_seconds{application="socle-v4",quantile="0.99"} 0.5
    socle_request_duration_seconds_count{application="socle-v4"} 12345
    socle_request_duration_seconds_sum{application="socle-v4"} 617.25
    

    6. Prometheus Configuration

    6.1 prometheus.yml

    global:
      scrape_interval: 15s
    
    scrape_configs:
      - job_name: 'socle-v4'
        metrics_path: '/actuator/prometheus'
        static_configs:
          - targets: ['socle-app:8080']
            labels:
              app: 'socle-v4'
              env: 'prod'
    
      - job_name: 'socle-v4-kubernetes'
        kubernetes_sd_configs:
          - role: pod
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
            action: keep
            regex: true
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
            action: replace
            target_label: __metrics_path__
            regex: (.+)
    

    6.2 Kubernetes annotations

    apiVersion: v1
    kind: Pod
    metadata:
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/path: "/actuator/prometheus"
        prometheus.io/port: "8080"
    

    7. Grafana Dashboards

    7.1 Exemple de requêtes

    # Taux de requêtes par seconde
    rate(socle_requests_total[5m])
    
    # Taux d'erreurs
    rate(socle_errors_total[5m]) / rate(socle_requests_total[5m]) * 100
    
    # Latence P95
    histogram_quantile(0.95, rate(socle_request_duration_seconds_bucket[5m]))
    
    # Workers unhealthy
    socle_workers_unhealthy
    
    # Circuit breakers ouverts
    socle_circuit_breaker_state == 2
    
    # Queue LogForwarder
    socle_logforwarder_queue_size / socle_logforwarder_queue_capacity * 100
    

    7.2 Dashboard JSON

    {
      "title": "Socle V4 Dashboard",
      "panels": [
        {
          "title": "Request Rate",
          "type": "graph",
          "targets": [
            {
              "expr": "rate(socle_requests_total[5m])",
              "legendFormat": "{{application}}"
            }
          ]
        },
        {
          "title": "Error Rate",
          "type": "graph",
          "targets": [
            {
              "expr": "rate(socle_errors_total[5m]) / rate(socle_requests_total[5m]) * 100",
              "legendFormat": "Error %"
            }
          ]
        },
        {
          "title": "P95 Latency",
          "type": "graph",
          "targets": [
            {
              "expr": "histogram_quantile(0.95, rate(socle_request_duration_seconds_bucket[5m]))",
              "legendFormat": "P95"
            }
          ]
        },
        {
          "title": "Workers Status",
          "type": "stat",
          "targets": [
            {
              "expr": "socle_workers_healthy",
              "legendFormat": "Healthy"
            }
          ]
        }
      ]
    }
    

    8. Alerting

    8.1 Prometheus Alertmanager rules

    groups:
      - name: socle-alerts
        rules:
          - alert: SocleHighErrorRate
            expr: rate(socle_errors_total[5m]) / rate(socle_requests_total[5m]) > 0.05
            for: 5m
            labels:
              severity: warning
            annotations:
              summary: "High error rate on {{ $labels.application }}"
              description: "Error rate is {{ $value | humanizePercentage }}"
    
          - alert: SocleWorkerUnhealthy
            expr: socle_workers_unhealthy > 0
            for: 2m
            labels:
              severity: critical
            annotations:
              summary: "Unhealthy workers on {{ $labels.application }}"
              description: "{{ $value }} workers are unhealthy"
    
          - alert: SocleCircuitBreakerOpen
            expr: socle_circuit_breaker_state == 2
            for: 5m
            labels:
              severity: warning
            annotations:
              summary: "Circuit breaker {{ $labels.name }} is OPEN"
    
          - alert: SocleLogForwarderQueueHigh
            expr: socle_logforwarder_queue_size / socle_logforwarder_queue_capacity > 0.8
            for: 5m
            labels:
              severity: warning
            annotations:
              summary: "LogForwarder queue is {{ $value | humanizePercentage }} full"
    

    9. Bonnes pratiques

    DO

    • Utiliser des noms de métriques cohérents (socle_*)
    • Ajouter des tags pertinents (application, environment, region)
    • Utiliser des histogrammes pour les latences
    • Définir des alertes sur les métriques critiques
    • Documenter les métriques

    DON’T

    • Ne pas créer trop de métriques (cardinalité)
    • Ne pas utiliser de valeurs à haute cardinalité dans les tags
    • Ne pas oublier les métriques d’erreur
    • Ne pas ignorer les métriques de queue/buffer

    10. Références

  • Socle V004 – gRPC Inter-Socles

    Socle V004 – gRPC Inter-Socles

    31 – Communication gRPC Inter-Socles

    Vue d’ensemble

    Le module gRPC permet aux instances Socle V4 de communiquer entre elles via streaming bidirectionnel. Il offre :

    • Sessions : Gestion de sessions multi-participants avec TTL
    • Streaming bidirectionnel : Communication temps reel entre Socles
    • Pipeline de traitement : Transformation des messages via Janino et JDM
    • Fan-out : Routage broadcast ou cible vers les participants
    • Pool de connexions : Connexions persistantes vers les peers

    Architecture

    ┌─────────────────────────────────────────────────────────────────┐
    │                         Socle A                                  │
    │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
    │  │ SessionMgr  │  │  Pipeline   │  │    GrpcServerWorker     │  │
    │  │             │  │  Executor   │  │    (port 9400)          │  │
    │  └─────────────┘  └─────────────┘  └─────────────────────────┘  │
    │         │               │                      │                 │
    │         └───────────────┼──────────────────────┘                 │
    │                         │                                        │
    │              ┌──────────┴──────────┐                            │
    │              │   gRPC Services     │                            │
    │              │  - SocleComm        │                            │
    │              │  - SessionService   │                            │
    │              │  - DiscoveryService │                            │
    │              └──────────┬──────────┘                            │
    └─────────────────────────┼───────────────────────────────────────┘
                              │ gRPC/HTTP2
                              ▼
    ┌─────────────────────────────────────────────────────────────────┐
    │                         Socle B                                  │
    │              ┌──────────────────────┐                           │
    │              │  PeerConnectionPool  │                           │
    │              └──────────────────────┘                           │
    └─────────────────────────────────────────────────────────────────┘
    

    Configuration

    application.yml

    socle:
      grpc:
        # Activation du module
        enabled: ${GRPC_ENABLED:false}
    
        # Port du serveur gRPC
        port: ${GRPC_PORT:9400}
    
        # Identification du Socle
        socle-id: ${SOCLE_ID:${socle.app_name}}
        socle-version: ${socle.version}
    
        # Limites serveur
        max-inbound-message-size: ${GRPC_MAX_MESSAGE_SIZE:4194304}  # 4MB
        max-concurrent-calls-per-connection: ${GRPC_MAX_CONCURRENT_CALLS:100}
    
        # Sessions
        session:
          ttl-seconds: ${GRPC_SESSION_TTL:1800}           # 30 min
          max-participants: ${GRPC_MAX_PARTICIPANTS:100}
          persist-to-tech-db: ${GRPC_PERSIST_SESSIONS:true}
          cache-in-redis: ${GRPC_CACHE_REDIS:true}
    
        # Pipeline de traitement
        pipeline:
          enabled: ${GRPC_PIPELINE_ENABLED:true}
          config-cache-ttl-seconds: ${GRPC_PIPELINE_CACHE_TTL:300}
    
        # Connexions peer
        peer:
          max-channels-per-peer: ${GRPC_PEER_MAX_CHANNELS:4}
          connection-timeout-ms: ${GRPC_PEER_CONNECT_TIMEOUT:5000}
          idle-timeout-seconds: ${GRPC_PEER_IDLE_TIMEOUT:300}
          keep-alive-enabled: ${GRPC_PEER_KEEPALIVE:true}
          keep-alive-time-seconds: ${GRPC_PEER_KEEPALIVE_TIME:30}
          keep-alive-timeout-seconds: ${GRPC_PEER_KEEPALIVE_TIMEOUT:10}
    

    Variables d’environnement

    Variable Default Description
    GRPC_ENABLED false Active le module gRPC
    GRPC_PORT 9400 Port du serveur gRPC
    SOCLE_ID ${app_name} Identifiant unique du Socle
    GRPC_SESSION_TTL 1800 TTL des sessions en secondes
    GRPC_MAX_PARTICIPANTS 100 Max participants par session
    GRPC_PIPELINE_ENABLED true Active le pipeline de traitement

    Services gRPC

    DiscoveryService

    Service de decouverte et health check.

    service DiscoveryService {
        rpc GetCapabilities(CapabilitiesRequest) returns (CapabilitiesResponse);
        rpc Ping(PingRequest) returns (PingResponse);
    }
    

    Test avec grpcurl :

    # Ping
    grpcurl -plaintext localhost:9400 socle.DiscoveryService/Ping
    
    # Capabilities
    grpcurl -plaintext localhost:9400 socle.DiscoveryService/GetCapabilities
    

    SessionService

    Gestion du cycle de vie des sessions.

    service SessionService {
        rpc CreateSession(CreateSessionRequest) returns (SessionInfo);
        rpc JoinSession(JoinSessionRequest) returns (JoinSessionResponse);
        rpc LeaveSession(LeaveSessionRequest) returns (LeaveSessionResponse);
        rpc GetSession(GetSessionRequest) returns (SessionInfo);
        rpc CloseSession(CloseSessionRequest) returns (CloseSessionResponse);
    }
    

    Exemples :

    # Creer une session
    grpcurl -plaintext -d '{
      "session_type": "chat",
      "owner_id": "user1",
      "ttl_seconds": 3600
    }' localhost:9400 socle.SessionService/CreateSession
    
    # Joindre une session
    grpcurl -plaintext -d '{
      "session_id": "uuid-de-la-session",
      "participant_id": "user2",
      "display_name": "User 2"
    }' localhost:9400 socle.SessionService/JoinSession
    
    # Obtenir info session
    grpcurl -plaintext -d '{
      "session_id": "uuid-de-la-session"
    }' localhost:9400 socle.SessionService/GetSession
    

    SocleComm

    Streaming bidirectionnel pour l’echange de messages.

    service SocleComm {
        rpc Exchange(stream SessionMessage) returns (stream SessionMessage);
    }
    

    Format des messages :

    message SessionMessage {
        string session_id = 1;
        string sender_id = 2;
        MessageKind kind = 3;        // JOIN, LEAVE, DATA, REQUEST, RESPONSE, etc.
        repeated string target_ids = 4;  // Vide = broadcast
        string correlation_id = 5;
        int64 timestamp = 6;
        string payload = 7;          // JSON
        map<string, string> headers = 8;
    }
    

    Sessions

    Cycle de vie

    CREATE ──► ACTIVE ──► CLOSING ──► CLOSED
                  │
                  └──► EXPIRED (TTL depasse)
    

    Stockage

    Les sessions sont stockees dans :

    1. Cache memoire : Acces rapide, stream observers
    2. Redis (si KvBus en mode redis) : Partage entre instances
    3. TechDB : Audit et persistance

    Tables TechDB

    -- Sessions (audit)
    CREATE TABLE grpc_sessions (
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
        session_id VARCHAR(36) NOT NULL UNIQUE,
        session_type VARCHAR(100) NOT NULL,
        owner_id VARCHAR(100) NOT NULL,
        status VARCHAR(20) DEFAULT 'ACTIVE',
        datas CLOB
    );
    
    -- Configuration pipeline par type de session
    CREATE TABLE grpc_pipeline_configs (
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
        session_type VARCHAR(100) NOT NULL UNIQUE,
        janino_pre VARCHAR(255),
        jdm_rules VARCHAR(255),
        janino_post VARCHAR(255),
        default_targets VARCHAR(50) DEFAULT 'broadcast',
        enabled BOOLEAN DEFAULT TRUE,
        datas CLOB
    );
    

    Pipeline de traitement

    Le pipeline permet de transformer les messages et determiner leur routage.

    Etapes

    Message entrant
          │
          ▼
    ┌─────────────────┐
    │  1. Janino PRE  │  Transformation du payload
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │  2. JDM Rules   │  Determination des cibles (routing)
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │  3. Janino POST │  Transformation finale
    └────────┬────────┘
             │
             ▼
        Routage
       /        \
    Broadcast   Cible
    

    Configuration du pipeline

    Inserez dans grpc_pipeline_configs :

    INSERT INTO grpc_pipeline_configs
    (session_type, janino_pre, jdm_rules, janino_post, default_targets, enabled)
    VALUES
    ('chat', NULL, 'chat_routing', NULL, 'broadcast', TRUE);
    

    Exemple script Janino PRE

    // repository/scripts/java/grpc/chat_filter.java
    public class ChatFilter {
        public Object execute(Map<String, Object> context) {
            Map<String, Object> payload = (Map<String, Object>) context.get("payload");
            String senderId = (String) context.get("senderId");
    
            // Ajouter metadata
            payload.put("processed_at", System.currentTimeMillis());
            payload.put("sender", senderId);
    
            return payload;
        }
    }
    

    Exemple modele JDM pour routing

    {
      "name": "chat_routing",
      "nodes": [
        {
          "id": "input",
          "type": "inputNode",
          "content": {
            "fields": [
              {"field": "payload.target", "type": "string"},
              {"field": "payload.type", "type": "string"}
            ]
          }
        },
        {
          "id": "decision",
          "type": "decisionTableNode",
          "content": {
            "hitPolicy": "first",
            "inputs": [
              {"field": "payload.type"}
            ],
            "outputs": [
              {"field": "targets", "type": "string"},
              {"field": "broadcast", "type": "boolean"}
            ],
            "rules": [
              {"_input": ["private"], "_output": ["${payload.target}", false]},
              {"_input": ["broadcast"], "_output": ["", true]},
              {"_input": ["*"], "_output": ["", true]}
            ]
          }
        }
      ]
    }
    

    Connexions Peer

    Ajouter un peer programmatiquement

    @Autowired
    private PeerConnectionPool peerPool;
    
    public void connectToPeer() {
        // Ajouter et connecter
        boolean connected = peerPool.addPeer("socle-b", "192.168.1.100", 9400);
    
        if (connected) {
            // Ouvrir un stream
            peerPool.openStream("socle-b", message -> {
                // Handler pour messages entrants
                System.out.println("Received: " + message.getPayload());
            });
    
            // Envoyer un message
            SessionMessage msg = SessionMessage.newBuilder()
                .setSessionId("...")
                .setSenderId("local-participant")
                .setKind(MessageKind.DATA)
                .setPayload("{\"text\":\"Hello\"}")
                .build();
    
            peerPool.sendToPeer("socle-b", msg);
        }
    }
    

    Broadcast vers tous les peers

    SessionMessage msg = SessionMessage.newBuilder()
        .setSessionId(sessionId)
        .setSenderId(myId)
        .setKind(MessageKind.DATA)
        .setPayload(jsonPayload)
        .build();
    
    int sent = peerPool.broadcast(msg, null);  // null = inclure tous
    

    Worker gRPC

    Priorites

    Methode Valeur Description
    getStartPriority() 800 Demarre apres Janino (25) et JDM (30)
    getStopPriority() 10 S’arrete tot pour drain des connexions

    Statistiques

    @Autowired
    private GrpcServerWorker grpcWorker;
    
    Map<String, Object> stats = grpcWorker.getStats();
    // {
    //   "running": true,
    //   "port": 9400,
    //   "sessions": { "cached_sessions": 5, "active_streams": 12 },
    //   "messaging": { "messages_received": 1234, "messages_sent": 5678 },
    //   "peers": { "connected_peers": 2, "total_messages_sent": 100 }
    // }
    

    Types de sessions supportes

    Type Description
    chat Communication temps reel
    sync Synchronisation de donnees
    broadcast Diffusion un-vers-plusieurs
    pipeline Traitement en chaine
    custom Type personnalise

    Securite

    TLS (a venir)

    Le support TLS sera ajoute dans une version future :

    socle:
      grpc:
        tls:
          enabled: true
          cert-path: /path/to/server.crt
          key-path: /path/to/server.key
          ca-path: /path/to/ca.crt  # Pour mTLS
    

    Authentification

    L’authentification peut etre implementee via :

    • Metadata gRPC (tokens dans headers)
    • Intercepteurs personnalises
    • Integration avec le module Auth existant

    Monitoring

    Metriques exposees

    Les metriques sont disponibles via l’endpoint Prometheus /actuator/prometheus :

    • grpc_sessions_active : Nombre de sessions actives
    • grpc_messages_received_total : Total messages recus
    • grpc_messages_sent_total : Total messages envoyes
    • grpc_peers_connected : Nombre de peers connectes

    Logs

    [grpc_server] Started on port 9400
    [grpc_session] Session created: abc-123 (type=chat, owner=user1)
    [grpc_session] Participant joined: user2 -> abc-123 (total: 2)
    [grpc_comm] Message received: session=abc-123, sender=user1, kind=DATA
    [grpc_pipeline] Executed for session abc-123 (targets=broadcast)
    

    Troubleshooting

    Le serveur ne demarre pas

    1. Verifiez que le port 9400 n’est pas utilise
    2. Verifiez GRPC_ENABLED=true
    3. Consultez les logs pour les erreurs d’initialisation

    Sessions expirent trop vite

    Augmentez le TTL :

    socle:
      grpc:
        session:
          ttl-seconds: 7200  # 2 heures
    

    Messages non delivres

    1. Verifiez que le destinataire a un stream actif
    2. Verifiez les logs du pipeline pour les erreurs
    3. Verifiez la configuration du routing dans grpc_pipeline_configs

    Erreurs de connexion peer

    1. Verifiez la connectivite reseau
    2. Verifiez que le peer a gRPC active
    3. Augmentez le timeout de connexion :
    socle:
      grpc:
        peer:
          connection-timeout-ms: 10000
    

    Structure des fichiers

    src/main/java/eu/lmvi/socle/grpc/
    ├── GrpcConfiguration.java        # Configuration Spring
    ├── GrpcServerWorker.java         # Worker principal
    ├── session/
    │   ├── Session.java              # Record session
    │   ├── Participant.java          # Record participant
    │   ├── SessionStatus.java        # Enum ACTIVE/CLOSING/CLOSED/EXPIRED
    │   ├── SessionManager.java       # Gestion sessions + streams
    │   └── SessionRedisRepository.java  # Persistance Redis
    ├── pipeline/
    │   ├── PipelineConfig.java       # Config pipeline
    │   ├── PipelineResult.java       # Resultat pipeline
    │   └── PipelineExecutor.java     # Execution Janino + JDM
    ├── peer/
    │   ├── PeerConnection.java       # Connexion unique
    │   └── PeerConnectionPool.java   # Pool avec reconnexion
    └── service/
        ├── SocleCommService.java     # Streaming bidirectionnel
        ├── SessionServiceImpl.java   # CRUD sessions
        └── DiscoveryServiceImpl.java # Discovery + Ping
    
    src/main/proto/
    └── socle_comm.proto              # Definition Protocol Buffers