Socle V004 – Pipeline

Socle V004 - Pipeline

09 – Pipeline Engine

Version : 4.1.0 Date : 2025-12-28

1. Introduction

Le PipelineEngine permet de définir des chaînes de traitement composées d’étapes (steps) exécutées séquentiellement ou en parallèle.

Caractéristiques

  • Définition déclarative des pipelines
  • Exécution séquentielle ou parallèle
  • Gestion des erreurs et retry
  • Context partagé entre étapes
  • Métriques et logging intégrés

2. Architecture

┌─────────────────────────────────────────────────────────────┐
│                      PipelineEngine                          │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                      Pipeline                                │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              PipelineContext                         │    │
│  │  - input data                                        │    │
│  │  - shared state                                      │    │
│  │  - results                                           │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                              │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐                │
│  │  Step 1  │──►│  Step 2  │──►│  Step 3  │                │
│  └──────────┘   └──────────┘   └──────────┘                │
│                                                              │
└─────────────────────────────────────────────────────────────┘

3. Interface Pipeline

package eu.lmvi.socle.pipeline;

public interface Pipeline<I, O> {

    /**
     * Nom du pipeline
     */
    String getName();

    /**
     * Exécute le pipeline
     */
    PipelineResult<O> execute(I input);

    /**
     * Exécute le pipeline avec context
     */
    PipelineResult<O> execute(I input, PipelineContext context);

    /**
     * Liste des étapes
     */
    List<PipelineStep<?, ?>> getSteps();
}

4. Interface PipelineStep

package eu.lmvi.socle.pipeline;

public interface PipelineStep<I, O> {

    /**
     * Nom de l'étape
     */
    String getName();

    /**
     * Exécute l'étape
     */
    StepResult<O> execute(I input, PipelineContext context);

    /**
     * L'étape peut-elle être retryée ?
     */
    default boolean isRetryable() {
        return true;
    }

    /**
     * Nombre max de retries
     */
    default int getMaxRetries() {
        return 3;
    }

    /**
     * L'étape est-elle optionnelle ?
     */
    default boolean isOptional() {
        return false;
    }
}

5. Context et Result

5.1 PipelineContext

package eu.lmvi.socle.pipeline;

public class PipelineContext {
    private final Map<String, Object> attributes = new ConcurrentHashMap<>();
    private final List<StepResult<?>> stepResults = new ArrayList<>();
    private final String correlationId;
    private final Instant startTime;

    public PipelineContext() {
        this.correlationId = UUID.randomUUID().toString();
        this.startTime = Instant.now();
    }

    public void put(String key, Object value) {
        attributes.put(key, value);
    }

    public <T> Optional<T> get(String key, Class<T> type) {
        return Optional.ofNullable(attributes.get(key))
            .filter(type::isInstance)
            .map(type::cast);
    }

    public void addStepResult(StepResult<?> result) {
        stepResults.add(result);
    }

    public List<StepResult<?>> getStepResults() {
        return Collections.unmodifiableList(stepResults);
    }

    public String getCorrelationId() {
        return correlationId;
    }

    public Duration getElapsedTime() {
        return Duration.between(startTime, Instant.now());
    }
}

5.2 StepResult

package eu.lmvi.socle.pipeline;

public record StepResult<T>(
    String stepName,
    StepStatus status,
    T output,
    Exception error,
    Duration duration,
    int attempts
) {
    public boolean isSuccess() {
        return status == StepStatus.SUCCESS;
    }

    public boolean isFailure() {
        return status == StepStatus.FAILURE;
    }

    public boolean isSkipped() {
        return status == StepStatus.SKIPPED;
    }

    public static <T> StepResult<T> success(String name, T output, Duration duration) {
        return new StepResult<>(name, StepStatus.SUCCESS, output, null, duration, 1);
    }

    public static <T> StepResult<T> failure(String name, Exception error, Duration duration, int attempts) {
        return new StepResult<>(name, StepStatus.FAILURE, null, error, duration, attempts);
    }

    public static <T> StepResult<T> skipped(String name) {
        return new StepResult<>(name, StepStatus.SKIPPED, null, null, Duration.ZERO, 0);
    }
}

public enum StepStatus {
    SUCCESS,
    FAILURE,
    SKIPPED
}

5.3 PipelineResult

package eu.lmvi.socle.pipeline;

public record PipelineResult<T>(
    String pipelineName,
    PipelineStatus status,
    T output,
    PipelineContext context,
    Duration totalDuration
) {
    public boolean isSuccess() {
        return status == PipelineStatus.SUCCESS;
    }

    public List<StepResult<?>> getFailedSteps() {
        return context.getStepResults().stream()
            .filter(StepResult::isFailure)
            .toList();
    }
}

public enum PipelineStatus {
    SUCCESS,
    PARTIAL_SUCCESS,
    FAILURE
}

6. Implémentation

6.1 DefaultPipelineEngine

package eu.lmvi.socle.pipeline;

@Component
public class DefaultPipelineEngine implements PipelineEngine {

    private static final Logger log = LoggerFactory.getLogger(DefaultPipelineEngine.class);

    @Override
    public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input) {
        return execute(pipeline, input, new PipelineContext());
    }

    @Override
    @SuppressWarnings("unchecked")
    public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input, PipelineContext context) {
        log.info("[{}] Starting pipeline execution", pipeline.getName());
        Instant start = Instant.now();

        Object currentInput = input;
        O finalOutput = null;
        boolean hasFailure = false;

        for (PipelineStep<?, ?> step : pipeline.getSteps()) {
            PipelineStep<Object, Object> typedStep = (PipelineStep<Object, Object>) step;

            StepResult<Object> result = executeStep(typedStep, currentInput, context);
            context.addStepResult(result);

            if (result.isSuccess()) {
                currentInput = result.output();
                finalOutput = (O) result.output();
            } else if (result.isFailure()) {
                if (!step.isOptional()) {
                    hasFailure = true;
                    log.error("[{}] Pipeline failed at step: {}", pipeline.getName(), step.getName());
                    break;
                }
                log.warn("[{}] Optional step failed, continuing: {}", pipeline.getName(), step.getName());
            }
        }

        Duration duration = Duration.between(start, Instant.now());
        PipelineStatus status = hasFailure
            ? PipelineStatus.FAILURE
            : (context.getStepResults().stream().anyMatch(StepResult::isFailure)
                ? PipelineStatus.PARTIAL_SUCCESS
                : PipelineStatus.SUCCESS);

        log.info("[{}] Pipeline completed with status: {} in {}ms",
            pipeline.getName(), status, duration.toMillis());

        return new PipelineResult<>(pipeline.getName(), status, finalOutput, context, duration);
    }

    private <I, O> StepResult<O> executeStep(PipelineStep<I, O> step, I input, PipelineContext context) {
        log.debug("[{}] Executing step", step.getName());
        Instant start = Instant.now();
        int attempts = 0;
        Exception lastError = null;

        while (attempts < step.getMaxRetries()) {
            attempts++;
            try {
                StepResult<O> result = step.execute(input, context);
                if (result.isSuccess()) {
                    Duration duration = Duration.between(start, Instant.now());
                    log.debug("[{}] Step completed in {}ms", step.getName(), duration.toMillis());
                    return StepResult.success(step.getName(), result.output(), duration);
                }
                lastError = result.error();
            } catch (Exception e) {
                lastError = e;
                log.warn("[{}] Step attempt {} failed: {}", step.getName(), attempts, e.getMessage());

                if (!step.isRetryable() || attempts >= step.getMaxRetries()) {
                    break;
                }

                // Backoff
                try {
                    Thread.sleep(attempts * 1000L);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        Duration duration = Duration.between(start, Instant.now());
        return StepResult.failure(step.getName(), lastError, duration, attempts);
    }
}

6.2 Pipeline Builder

package eu.lmvi.socle.pipeline;

public class PipelineBuilder<I, O> {

    private final String name;
    private final List<PipelineStep<?, ?>> steps = new ArrayList<>();

    public PipelineBuilder(String name) {
        this.name = name;
    }

    public static <I, O> PipelineBuilder<I, O> create(String name) {
        return new PipelineBuilder<>(name);
    }

    public <NO> PipelineBuilder<I, NO> addStep(PipelineStep<O, NO> step) {
        steps.add(step);
        return (PipelineBuilder<I, NO>) this;
    }

    public <NO> PipelineBuilder<I, NO> addStep(String name, Function<O, NO> processor) {
        return addStep(new FunctionalStep<>(name, processor));
    }

    public Pipeline<I, O> build() {
        return new DefaultPipeline<>(name, List.copyOf(steps));
    }

    private record FunctionalStep<I, O>(String name, Function<I, O> processor) implements PipelineStep<I, O> {

        @Override
        public String getName() {
            return name;
        }

        @Override
        public StepResult<O> execute(I input, PipelineContext context) {
            O output = processor.apply(input);
            return StepResult.success(name, output, Duration.ZERO);
        }
    }
}

7. Utilisation

7.1 Pipeline simple

@Component
public class OrderPipeline {

    @Autowired
    private PipelineEngine engine;

    public PipelineResult<OrderResult> processOrder(Order order) {
        Pipeline<Order, OrderResult> pipeline = PipelineBuilder
            .<Order, OrderResult>create("order-processing")
            .addStep("validate", this::validateOrder)
            .addStep("enrich", this::enrichOrder)
            .addStep("process", this::processOrder)
            .addStep("notify", this::notifyCustomer)
            .build();

        return engine.execute(pipeline, order);
    }

    private ValidatedOrder validateOrder(Order order) {
        // Validation...
        return new ValidatedOrder(order);
    }

    private EnrichedOrder enrichOrder(ValidatedOrder order) {
        // Enrichissement...
        return new EnrichedOrder(order);
    }

    private ProcessedOrder processOrder(EnrichedOrder order) {
        // Traitement...
        return new ProcessedOrder(order);
    }

    private OrderResult notifyCustomer(ProcessedOrder order) {
        // Notification...
        return new OrderResult(order, "SUCCESS");
    }
}

7.2 Étapes personnalisées

public class ValidationStep implements PipelineStep<Order, ValidatedOrder> {

    @Override
    public String getName() {
        return "validation";
    }

    @Override
    public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
        List<String> errors = new ArrayList<>();

        if (input.getCustomerId() == null) {
            errors.add("Missing customer ID");
        }
        if (input.getItems().isEmpty()) {
            errors.add("No items in order");
        }

        if (!errors.isEmpty()) {
            return StepResult.failure(getName(),
                new ValidationException(errors),
                Duration.ZERO, 1);
        }

        // Stocker des données dans le context
        context.put("customerId", input.getCustomerId());

        return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
    }

    @Override
    public boolean isRetryable() {
        return false;  // Validation ne doit pas être retryée
    }
}

7.3 Étape optionnelle

public class NotificationStep implements PipelineStep<ProcessedOrder, ProcessedOrder> {

    @Override
    public String getName() {
        return "notification";
    }

    @Override
    public StepResult<ProcessedOrder> execute(ProcessedOrder input, PipelineContext context) {
        try {
            sendNotification(input);
            return StepResult.success(getName(), input, Duration.ZERO);
        } catch (Exception e) {
            return StepResult.failure(getName(), e, Duration.ZERO, 1);
        }
    }

    @Override
    public boolean isOptional() {
        return true;  // Le pipeline continue même si la notif échoue
    }
}

7.4 Utilisation du context

public class EnrichmentStep implements PipelineStep<ValidatedOrder, EnrichedOrder> {

    @Autowired
    private CustomerService customerService;

    @Override
    public String getName() {
        return "enrichment";
    }

    @Override
    public StepResult<EnrichedOrder> execute(ValidatedOrder input, PipelineContext context) {
        // Lire depuis le context
        String customerId = context.get("customerId", String.class).orElseThrow();

        // Enrichir
        Customer customer = customerService.getCustomer(customerId);
        EnrichedOrder enriched = new EnrichedOrder(input, customer);

        // Écrire dans le context pour les étapes suivantes
        context.put("customerEmail", customer.getEmail());

        return StepResult.success(getName(), enriched, Duration.ZERO);
    }
}

8. Gestion des erreurs

8.1 Retry automatique

public class ExternalApiStep implements PipelineStep<Data, ApiResponse> {

    @Override
    public String getName() {
        return "external-api-call";
    }

    @Override
    public boolean isRetryable() {
        return true;
    }

    @Override
    public int getMaxRetries() {
        return 5;  // 5 tentatives max
    }

    @Override
    public StepResult<ApiResponse> execute(Data input, PipelineContext context) {
        // Appel API qui peut échouer
        ApiResponse response = callApi(input);
        return StepResult.success(getName(), response, Duration.ZERO);
    }
}

8.2 Traitement des résultats

PipelineResult<OrderResult> result = engine.execute(pipeline, order);

if (result.isSuccess()) {
    log.info("Order processed successfully: {}", result.output());
} else {
    // Analyser les étapes en échec
    for (StepResult<?> stepResult : result.getFailedSteps()) {
        log.error("Step {} failed after {} attempts: {}",
            stepResult.stepName(),
            stepResult.attempts(),
            stepResult.error().getMessage());
    }

    // Décider quoi faire
    if (result.status() == PipelineStatus.PARTIAL_SUCCESS) {
        // Certaines étapes optionnelles ont échoué
        handlePartialSuccess(result);
    } else {
        // Échec complet
        handleFailure(result);
    }
}

9. Pipelines parallèles

public class ParallelEnrichmentStep implements PipelineStep<Order, EnrichedOrder> {

    @Autowired
    private ExecutorService executor;

    @Override
    public StepResult<EnrichedOrder> execute(Order input, PipelineContext context) {
        // Exécuter plusieurs enrichissements en parallèle
        CompletableFuture<Customer> customerFuture = CompletableFuture.supplyAsync(
            () -> customerService.getCustomer(input.getCustomerId()), executor);

        CompletableFuture<Inventory> inventoryFuture = CompletableFuture.supplyAsync(
            () -> inventoryService.checkInventory(input.getItems()), executor);

        CompletableFuture<Pricing> pricingFuture = CompletableFuture.supplyAsync(
            () -> pricingService.calculatePrice(input), executor);

        try {
            CompletableFuture.allOf(customerFuture, inventoryFuture, pricingFuture).join();

            EnrichedOrder enriched = new EnrichedOrder(
                input,
                customerFuture.get(),
                inventoryFuture.get(),
                pricingFuture.get()
            );

            return StepResult.success(getName(), enriched, Duration.ZERO);
        } catch (Exception e) {
            return StepResult.failure(getName(), e, Duration.ZERO, 1);
        }
    }
}

10. Métriques

@Component
public class PipelineMetrics {

    private final MeterRegistry registry;

    public void recordPipelineExecution(PipelineResult<?> result) {
        Timer.builder("socle_pipeline_duration")
            .tag("pipeline", result.pipelineName())
            .tag("status", result.status().name())
            .register(registry)
            .record(result.totalDuration());

        Counter.builder("socle_pipeline_executions")
            .tag("pipeline", result.pipelineName())
            .tag("status", result.status().name())
            .register(registry)
            .increment();
    }

    public void recordStepExecution(StepResult<?> result) {
        Timer.builder("socle_pipeline_step_duration")
            .tag("step", result.stepName())
            .tag("status", result.status().name())
            .register(registry)
            .record(result.duration());
    }
}

11. Bonnes pratiques

DO

  • Garder les étapes petites et focalisées
  • Utiliser le context pour partager des données entre étapes
  • Marquer les étapes non-critiques comme optionnelles
  • Logger au niveau des étapes pour le debugging
  • Configurer des retries appropriés pour les appels externes

DON’T

  • Ne pas faire de pipelines avec trop d’étapes (> 10)
  • Ne pas mélanger logique métier et infrastructure dans une étape
  • Ne pas ignorer les erreurs des étapes obligatoires
  • Ne pas utiliser de pipelines pour du traitement simple

12. Références

Pipeline Engine V2 (Nouveau)

Ajouté en version 4.1.0

13. Introduction Pipeline V2

Le Pipeline V2 est une refonte complète du système de pipelines, conçue pour les applications à haute disponibilité nécessitant :

  • Garantie at-least-once : Aucune perte de message grâce au pattern Queue/Claim/Ack
  • Virtual Threads (Java 21) : Scalabilité maximale avec les threads virtuels
  • Reprise après crash : État persisté permettant la reprise au dernier stage OK
  • DLQ (Dead Letter Queue) : Gestion des échecs définitifs avec replay

Quand utiliser V2 vs V1 ?

Critère Pipeline V1 Pipeline V2
Exécution Synchrone, mono-thread Asynchrone, multi-thread
Garantie de livraison At-most-once At-least-once
Persistance Non Oui (TechDB / H2)
DLQ Non Oui
Use case Traitement simple Flux critiques, haute disponibilité

14. Architecture V2

┌─────────────────────────────────────────────────────────────────┐
│                            MOP                                   │
│               (supervision, restart, health)                     │
└───────────────────────────┬─────────────────────────────────────┘
                            │
┌───────────────────────────▼─────────────────────────────────────┐
│                     PipelineV2                                   │
│                                                                  │
│  ┌────────────┐     ┌────────────┐     ┌────────────┐          │
│  │  Stage A   │────►│  Stage B   │────►│  Stage C   │          │
│  │ ┌────────┐ │     │ ┌────────┐ │     │ ┌────────┐ │          │
│  │ │ Queue  │ │     │ │ Queue  │ │     │ │ Queue  │ │          │
│  │ └────────┘ │     │ └────────┘ │     │ └────────┘ │          │
│  │ VThreads  │     │ VThreads  │     │ VThreads  │          │
│  └────────────┘     └────────────┘     └────────────┘          │
│                                                                  │
│         ┌─────────────────┴─────────────────┐                   │
│    ┌────▼────┐                        ┌────▼────┐              │
│    │ Context │                        │   DLQ   │              │
│    │ (state) │                        │ (errors)│              │
│    └─────────┘                        └─────────┘              │
└─────────────────────────────────────────────────────────────────┘

Composants

  • StageQueue : Queue avec sémantique Claim/Ack entre chaque stage
  • VThreadStageExecutor : Workers Virtual Threads consommant les queues
  • PersistentPipelineContext : État persisté pour reprise après crash
  • DLQ : Messages en échec après max retries

15. Création d’un Pipeline V2

15.1 Builder Fluent

import eu.lmvi.socle.pipeline.v2.PipelineBuilderV2;
import eu.lmvi.socle.pipeline.v2.PipelineV2;

// Création du pipeline
PipelineV2<Order, Result> pipeline = PipelineBuilderV2
    .<Order>create("order-processing")
    .description("Pipeline de traitement des commandes")

    // Stage 1: Validation (2 workers, timeout 30s)
    .addStage("validate", this::validateOrder)
        .concurrency(2)
        .timeout(Duration.ofSeconds(30))

    // Stage 2: Enrichissement (4 workers, queue 1000)
    .addStage("enrich", this::enrichOrder)
        .concurrency(4)
        .queueSize(1000)

    // Stage 3: Publication (2 workers)
    .addStage("publish", this::publishOrder)
        .concurrency(2)
        .maxRetries(5)
        .backoff(Duration.ofMillis(200), 2.0)

    .build();

15.2 Configuration par stage

.addStage("nom", processor)
    .concurrency(4)           // Nombre de workers parallèles
    .queueSize(500)           // Taille max de la queue
    .timeout(Duration.ofMinutes(5))  // Timeout par message
    .maxRetries(3)            // Tentatives avant DLQ
    .backoff(base, multiplier) // Backoff exponentiel
    .optional()               // Stage optionnel (échec non bloquant)

16. Cycle de vie

16.1 Démarrage/Arrêt

// Démarrer le pipeline (lance tous les workers)
pipeline.start();

// Vérifier l'état
boolean running = pipeline.isRunning();

// Arrêt gracieux (attend la fin des traitements en cours)
pipeline.stop();

// Arrêt immédiat (interrompt tout)
pipeline.stopNow();

16.2 Soumission de messages

// Soumettre un message
String messageId = pipeline.submit(order, "exec-001");

// Avec métadonnées
String messageId = pipeline.submit(order, "exec-001", Map.of(
    "source", "api",
    "priority", "high"
));

16.3 Suivi d’exécution

// État d'une exécution
Optional<PipelineState> state = pipeline.getExecutionState("exec-001");
state.ifPresent(s -> {
    System.out.println("Status: " + s.status());
    System.out.println("Current stage: " + s.currentStage());
    System.out.println("Duration: " + s.duration());
});

// Liste des exécutions actives
List<PipelineState> active = pipeline.listActiveExecutions();

// Statistiques globales
PipelineV2.PipelineStats stats = pipeline.getStats();
System.out.println("Submitted: " + stats.totalSubmitted());
System.out.println("Completed: " + stats.totalCompleted());
System.out.println("Failed: " + stats.totalFailed());

17. Queue/Claim/Ack

17.1 Pattern de consommation

Le Pipeline V2 utilise un pattern de consommation fiable :

1. CLAIM   : Le worker réserve un message (lease)
2. PROCESS : Le worker traite le message
3. ACK     : Succès → message supprimé
   ou
   NACK    : Échec → retry ou DLQ

17.2 Lease et récupération

Si un worker crash pendant le traitement :

  • Le lease expire après le timeout configuré
  • Le message est automatiquement remis en queue
  • Un autre worker le reprend
// Timeout de lease par défaut: 5 minutes
// Configurable par stage via timeout()

17.3 Implémentations disponibles

Implémentation Persistance Use case
InMemoryStageQueue Non Développement, tests
TechDbStageQueue Oui (H2) Production

18. Dead Letter Queue (DLQ)

18.1 Envoi en DLQ

Un message est envoyé en DLQ après :

  • maxRetries tentatives échouées
  • Une erreur non-retryable

18.2 Consultation

// Via API REST
GET /admin/pipelines/v2/{name}/dlq

// Programmatiquement (via StageQueue)
List<DlqMessage<Order>> messages = queue.peekDlq(100);
for (DlqMessage<Order> msg : messages) {
    System.out.println("ID: " + msg.messageId());
    System.out.println("Error: " + msg.errorMessage());
    System.out.println("Attempts: " + msg.attempts());
    System.out.println("Failed at: " + msg.failedAt());
}

18.3 Replay

// Via API REST
POST /admin/pipelines/v2/{name}/dlq/{messageId}/replay

// Programmatiquement
boolean replayed = queue.replayFromDlq(messageId);

// Replay tous les messages
int count = queue.replayAllFromDlq("exec-001");

18.4 Suppression

// Via API REST
DELETE /admin/pipelines/v2/{name}/dlq/{messageId}

// Programmatiquement
queue.deleteFromDlq(messageId);

// Purge complète
queue.purgeDlq();

19. Persistance et reprise

19.1 Context persistant

Le Pipeline V2 peut persister son état pour survivre aux redémarrages :

import eu.lmvi.socle.pipeline.context.TechDbPipelineContext;

// Créer un context persistant
Supplier<Connection> connectionSupplier = () -> dataSource.getConnection();
PersistentPipelineContext context = new TechDbPipelineContext(connectionSupplier);

// Utiliser avec le builder
PipelineV2<Order, Result> pipeline = PipelineBuilderV2
    .<Order>create("order-processing")
    .context(context)  // Utilise le context persistant
    .addStage(...)
    .build();

19.2 Reprise après crash

Au redémarrage, les messages non-ackés sont automatiquement re-traités :

  1. Les messages en status CLAIMED dont le lease a expiré → remis en PENDING
  2. Les stages reprennent leur consommation normalement
  3. Grâce à l’idempotence, les messages déjà traités ne sont pas re-traités

20. Idempotence (OBLIGATOIRE)

20.1 Pourquoi ?

Le modèle at-least-once signifie qu’un message peut être traité plusieurs fois :

  • Crash après traitement mais avant ACK
  • Timeout de lease pendant un traitement long
  • Replay depuis la DLQ

Chaque stage DOIT être idempotent.

20.2 Patterns recommandés

// Pattern 1: Vérifier "déjà traité"
public Order processOrder(Order order) {
    if (orderRepository.exists(order.getId())) {
        return orderRepository.get(order.getId()); // Déjà traité
    }
    // Traiter...
    return orderRepository.save(order);
}

// Pattern 2: UPSERT au lieu d'INSERT
public void saveOrder(Order order) {
    orderRepository.upsert(order); // Idempotent par nature
}

// Pattern 3: Utiliser executionId comme clé
public void processWithDedup(Order order, String executionId) {
    String dedupKey = "order:" + order.getId() + ":" + executionId;
    if (kvBus.exists(dedupKey)) {
        return; // Déjà traité pour cette exécution
    }

    // Traiter...

    kvBus.set(dedupKey, true, Duration.ofDays(7));
}

20.3 Anti-patterns

// MAUVAIS: INSERT sans vérification
db.insert(order); // Échoue si déjà inséré

// MAUVAIS: Compteur incrémental
counter.increment(); // Compté plusieurs fois en cas de replay

// MAUVAIS: Email sans déduplication
emailService.send(email); // Envoyé plusieurs fois

21. API REST d’administration

21.1 Endpoints disponibles

GET  /admin/pipelines/v2                    → Liste des pipelines
GET  /admin/pipelines/v2/{name}             → État d'un pipeline
GET  /admin/pipelines/v2/{name}/stages      → État des stages
GET  /admin/pipelines/v2/{name}/executions  → Exécutions en cours

GET  /admin/pipelines/v2/{name}/dlq         → Messages en DLQ
POST /admin/pipelines/v2/{name}/dlq/{id}/replay  → Rejouer un message
DELETE /admin/pipelines/v2/{name}/dlq/{id}  → Supprimer de la DLQ
POST /admin/pipelines/v2/{name}/dlq/replay-all   → Rejouer tous

POST /admin/pipelines/v2/{name}/start       → Démarrer le pipeline
POST /admin/pipelines/v2/{name}/stop        → Arrêter gracieusement
POST /admin/pipelines/v2/{name}/stop-now    → Arrêter immédiatement

21.2 Exemple de réponse

GET /admin/pipelines/v2/order-processing

{
  "timestamp": 1735387200000,
  "name": "order-processing",
  "description": "Pipeline de traitement des commandes",
  "running": true,
  "stats": {
    "stage_count": 3,
    "active_executions": 5,
    "total_submitted": 1500,
    "total_completed": 1480,
    "total_failed": 12
  }
}

22. Métriques Prometheus

22.1 Métriques exposées

# Compteurs
socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="success"}
socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="failed"}
socle_pipeline_v2_dlq_total{pipeline="order-processing", stage="validate"}

# Gauges
socle_pipeline_v2_queue_pending{pipeline="order-processing", stage="validate"}
socle_pipeline_v2_queue_processing{pipeline="order-processing", stage="validate"}
socle_pipeline_v2_dlq_size{pipeline="order-processing", stage="validate"}
socle_pipeline_v2_active_executions{pipeline="order-processing"}
socle_pipeline_v2_running{pipeline="order-processing"}

# Histogrammes (durée de traitement)
socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.5"}
socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.95"}
socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.99"}

22.2 Alertes recommandées

# DLQ croissante
- alert: PipelineDLQGrowing
  expr: increase(socle_pipeline_v2_dlq_total[1h]) > 10
  labels:
    severity: warning

# Queue qui s'accumule (backpressure)
- alert: PipelineQueueBacklog
  expr: socle_pipeline_v2_queue_pending > 1000
  for: 5m
  labels:
    severity: warning

# Pipeline arrêté
- alert: PipelineStopped
  expr: socle_pipeline_v2_running == 0
  for: 1m
  labels:
    severity: critical

23. Bonnes pratiques V2

DO

  • Toujours implémenter l’idempotence dans chaque stage
  • Utiliser TechDbPipelineContext en production pour la persistance
  • Configurer des maxRetries appropriés (3-5 pour API externes)
  • Monitorer les métriques DLQ et queue_pending
  • Traiter régulièrement les messages en DLQ (replay ou suppression)

DON’T

  • Ne jamais supposer qu’un message ne sera traité qu’une fois
  • Ne pas ignorer les messages en DLQ
  • Ne pas mettre de timeout trop courts (risque de faux positifs)
  • Ne pas créer trop de stages (overhead de queues)
  • Ne pas oublier d’appeler pipeline.stop() à l’arrêt de l’application

24. Migration V1 → V2

24.1 Changements d’API

V1 V2
PipelineEngine.execute(pipeline, input) pipeline.submit(input, executionId)
Pipeline<I,O> PipelineV2<I,O>
PipelineBuilder PipelineBuilderV2
Exécution synchrone Exécution asynchrone

24.2 Étapes de migration

  1. Rendre les steps idempotents (obligatoire avant migration)
  2. Créer le pipeline avec PipelineBuilderV2
  3. Remplacer execute() par submit()
  4. Ajouter le suivi asynchrone des résultats si nécessaire
  5. Configurer la persistance (TechDbPipelineContext)
  6. Activer les métriques et monitoring DLQ

25. Configuration Dynamique (YAML)

Ajouté en version 4.1.0

25.1 Introduction

Le Pipeline V2 peut maintenant être configuré entièrement via YAML, permettant :

  • Activation/désactivation des stages sans recompiler
  • Configuration dynamique de la concurrence, timeouts, retries
  • Ajout de nouveaux stages sans modifier le code de la factory
  • Modularité des workers (un worker = un composant Spring)

25.2 Architecture

┌─────────────────────────────────────────────────────────────────┐
│                        application.yml                           │
│  cdc:                                                            │
│    pipeline:                                                     │
│      stages:                                                     │
│        - name: filter                                            │
│          worker: filterWorker                                    │
│          enabled: true                                           │
└───────────────────────────────┬─────────────────────────────────┘
                                │
                                ▼
┌───────────────────────────────────────────────────────────────────┐
│            PipelineYamlConfig (@ConfigurationProperties)          │
│                                                                    │
│  - name: String                                                    │
│  - stages: List<StageYamlConfig>                                   │
│  - getEnabledStages(): List<StageYamlConfig>                       │
└───────────────────────────────┬───────────────────────────────────┘
                                │
                                ▼
┌───────────────────────────────────────────────────────────────────┐
│                    DynamicPipelineBuilder                          │
│                                                                    │
│  .config(pipelineConfig)                                           │
│  .workers(Map<String, PipelineStageWorker>)                        │
│  .build() → PipelineV2<I, O>                                       │
└───────────────────────────────────────────────────────────────────┘

25.3 Interface PipelineStageWorker

Chaque stage est implémenté comme un composant Spring implémentant PipelineStageWorker :

package eu.lmvi.socle.pipeline.v2.worker;

public interface PipelineStageWorker<I, O> {

    /**
     * Nom unique du worker (utilisé dans la config YAML)
     */
    String getName();

    /**
     * Initialise le worker (chargement ressources, compilation scripts, etc.)
     */
    default void initialize() throws Exception {}

    /**
     * Traite un élément d'entrée (DOIT être thread-safe)
     */
    O process(I input) throws Exception;

    /**
     * Libère les ressources
     */
    default void shutdown() {}

    /**
     * Indique si ce worker est activé
     */
    default boolean isEnabled() { return true; }

    /**
     * Description pour le monitoring
     */
    default String getDescription() { return getName(); }

    /**
     * Ordre de priorité (plus bas = plus tôt)
     */
    default int getOrder() { return 100; }
}

25.4 Exemple d’implémentation

@Component("filterWorker")
public class FilterStageWorker implements PipelineStageWorker<WalEvent, PipelineMessage> {

    @Autowired
    private TableFilter tableFilter;

    @Override
    public String getName() {
        return "filter";
    }

    @Override
    public int getOrder() {
        return 10;  // Premier dans le pipeline
    }

    @Override
    public PipelineMessage process(WalEvent event) throws Exception {
        PipelineMessage message = new PipelineMessage(event);
        if (!tableFilter.isAllowed(event.getTable())) {
            message.markAsFiltered("table_not_in_whitelist");
        }
        return message;
    }
}

25.5 Configuration YAML

cdc:
  pipeline:
    name: cdc-pipeline-v2
    description: Pipeline CDC PostgreSQL vers Kafka
    enabled: true
    stages:
      # Stage 1: Filtrage
      - name: filter
        worker: filterWorker        # Nom du bean Spring
        enabled: true
        concurrency: 1
        timeout-seconds: 5
        max-retries: 1
        order: 10

      # Stage 2: Transformation
      - name: transform
        worker: transformWorker
        enabled: true
        concurrency: 2
        timeout-seconds: 30
        order: 20

      # Stage 3a: Enrichissement Rules
      - name: enrich-rules
        worker: rulesEnrichmentWorker
        enabled: true
        concurrency: 2
        order: 30

      # Stage 3b: Enrichissement BeanShell
      - name: enrich-beanshell
        worker: beanshellEnrichmentWorker
        enabled: ${CDC_ENRICH_BSH_ENABLED:true}  # Variable d'environnement
        concurrency: 2
        order: 31

      # Stage 3c: Enrichissement JavaScript
      - name: enrich-javascript
        worker: javascriptEnrichmentWorker
        enabled: ${CDC_ENRICH_JS_ENABLED:true}
        concurrency: 4
        timeout-seconds: 60
        order: 32

      # Stage 3d: Enrichissement Janino (nouveau)
      - name: enrich-janino
        worker: janinoEnrichmentWorker
        enabled: ${CDC_ENRICH_JANINO_ENABLED:false}  # Désactivé par défaut
        concurrency: 2
        order: 33

      # Stage 4: Publication Kafka
      - name: publish
        worker: publishWorker
        enabled: true
        concurrency: 2
        max-retries: 5
        order: 40

      # Stage 5: Commit LSN
      - name: commit
        worker: commitWorker
        enabled: true
        concurrency: 1
        order: 50

25.6 Classe de configuration Spring

@Configuration
@ConfigurationProperties(prefix = "cdc.pipeline")
public class CdcPipelineConfig extends PipelineYamlConfig {
    // Hérite de toutes les propriétés
}

25.7 Construction du pipeline

@Component
public class CdcPipelineFactory {

    @Autowired
    private CdcPipelineConfig pipelineConfig;

    @Autowired
    private Map<String, PipelineStageWorker<?, ?>> availableWorkers;

    @PostConstruct
    public void init() {
        // Log des workers disponibles
        log.info("Available workers: {}", availableWorkers.keySet());

        // Construire le pipeline dynamiquement
        cdcPipeline = DynamicPipelineBuilder.create()
            .config(pipelineConfig)
            .workers(availableWorkers)
            .build();

        log.info("Pipeline created with {} stages",
            pipelineConfig.countEnabledStages());
    }
}

25.8 Avantages

Aspect Avant (hardcodé) Après (YAML)
Modification Recompilation Redémarrage
Activation stage Code Java Variable d’env
Ajout stage Modifier factory Ajouter worker + config
Configuration Dans le code Centralisée
Environnements Branches différentes Même code, config différente

25.9 Activation/Désactivation à chaud

Via variables d’environnement dans .env :

# Désactiver BeanShell
CDC_ENRICH_BSH_ENABLED=false

# Activer Janino
CDC_ENRICH_JANINO_ENABLED=true

# Redémarrer le service
sudo systemctl restart cdc-reflet-kafka-v02

25.10 Ajout d’un nouveau type d’enrichissement

  1. Créer le worker :
@Component("monNouveauWorker")
public class MonNouveauEnrichmentWorker
    implements PipelineStageWorker<PipelineMessage, PipelineMessage> {
    // ...
}
  1. Ajouter dans application.yml :
stages:
  - name: enrich-nouveau
    worker: monNouveauWorker
    enabled: ${CDC_ENRICH_NOUVEAU_ENABLED:false}
    order: 34
  1. Redémarrer – le pipeline inclura automatiquement le nouveau stage.

26. Références V2

Commentaires

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *