09 – Pipeline Engine
Version : 4.1.0 Date : 2025-12-28
1. Introduction
Le PipelineEngine permet de définir des chaînes de traitement composées d’étapes (steps) exécutées séquentiellement ou en parallèle.
Caractéristiques
- Définition déclarative des pipelines
- Exécution séquentielle ou parallèle
- Gestion des erreurs et retry
- Context partagé entre étapes
- Métriques et logging intégrés
2. Architecture
┌─────────────────────────────────────────────────────────────┐
│ PipelineEngine │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Pipeline │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ PipelineContext │ │
│ │ - input data │ │
│ │ - shared state │ │
│ │ - results │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Step 1 │──►│ Step 2 │──►│ Step 3 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
3. Interface Pipeline
package eu.lmvi.socle.pipeline;
public interface Pipeline<I, O> {
/**
* Nom du pipeline
*/
String getName();
/**
* Exécute le pipeline
*/
PipelineResult<O> execute(I input);
/**
* Exécute le pipeline avec context
*/
PipelineResult<O> execute(I input, PipelineContext context);
/**
* Liste des étapes
*/
List<PipelineStep<?, ?>> getSteps();
}
4. Interface PipelineStep
package eu.lmvi.socle.pipeline;
public interface PipelineStep<I, O> {
/**
* Nom de l'étape
*/
String getName();
/**
* Exécute l'étape
*/
StepResult<O> execute(I input, PipelineContext context);
/**
* L'étape peut-elle être retryée ?
*/
default boolean isRetryable() {
return true;
}
/**
* Nombre max de retries
*/
default int getMaxRetries() {
return 3;
}
/**
* L'étape est-elle optionnelle ?
*/
default boolean isOptional() {
return false;
}
}
5. Context et Result
5.1 PipelineContext
package eu.lmvi.socle.pipeline;
public class PipelineContext {
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
private final List<StepResult<?>> stepResults = new ArrayList<>();
private final String correlationId;
private final Instant startTime;
public PipelineContext() {
this.correlationId = UUID.randomUUID().toString();
this.startTime = Instant.now();
}
public void put(String key, Object value) {
attributes.put(key, value);
}
public <T> Optional<T> get(String key, Class<T> type) {
return Optional.ofNullable(attributes.get(key))
.filter(type::isInstance)
.map(type::cast);
}
public void addStepResult(StepResult<?> result) {
stepResults.add(result);
}
public List<StepResult<?>> getStepResults() {
return Collections.unmodifiableList(stepResults);
}
public String getCorrelationId() {
return correlationId;
}
public Duration getElapsedTime() {
return Duration.between(startTime, Instant.now());
}
}
5.2 StepResult
package eu.lmvi.socle.pipeline;
public record StepResult<T>(
String stepName,
StepStatus status,
T output,
Exception error,
Duration duration,
int attempts
) {
public boolean isSuccess() {
return status == StepStatus.SUCCESS;
}
public boolean isFailure() {
return status == StepStatus.FAILURE;
}
public boolean isSkipped() {
return status == StepStatus.SKIPPED;
}
public static <T> StepResult<T> success(String name, T output, Duration duration) {
return new StepResult<>(name, StepStatus.SUCCESS, output, null, duration, 1);
}
public static <T> StepResult<T> failure(String name, Exception error, Duration duration, int attempts) {
return new StepResult<>(name, StepStatus.FAILURE, null, error, duration, attempts);
}
public static <T> StepResult<T> skipped(String name) {
return new StepResult<>(name, StepStatus.SKIPPED, null, null, Duration.ZERO, 0);
}
}
public enum StepStatus {
SUCCESS,
FAILURE,
SKIPPED
}
5.3 PipelineResult
package eu.lmvi.socle.pipeline;
public record PipelineResult<T>(
String pipelineName,
PipelineStatus status,
T output,
PipelineContext context,
Duration totalDuration
) {
public boolean isSuccess() {
return status == PipelineStatus.SUCCESS;
}
public List<StepResult<?>> getFailedSteps() {
return context.getStepResults().stream()
.filter(StepResult::isFailure)
.toList();
}
}
public enum PipelineStatus {
SUCCESS,
PARTIAL_SUCCESS,
FAILURE
}
6. Implémentation
6.1 DefaultPipelineEngine
package eu.lmvi.socle.pipeline;
@Component
public class DefaultPipelineEngine implements PipelineEngine {
private static final Logger log = LoggerFactory.getLogger(DefaultPipelineEngine.class);
@Override
public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input) {
return execute(pipeline, input, new PipelineContext());
}
@Override
@SuppressWarnings("unchecked")
public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input, PipelineContext context) {
log.info("[{}] Starting pipeline execution", pipeline.getName());
Instant start = Instant.now();
Object currentInput = input;
O finalOutput = null;
boolean hasFailure = false;
for (PipelineStep<?, ?> step : pipeline.getSteps()) {
PipelineStep<Object, Object> typedStep = (PipelineStep<Object, Object>) step;
StepResult<Object> result = executeStep(typedStep, currentInput, context);
context.addStepResult(result);
if (result.isSuccess()) {
currentInput = result.output();
finalOutput = (O) result.output();
} else if (result.isFailure()) {
if (!step.isOptional()) {
hasFailure = true;
log.error("[{}] Pipeline failed at step: {}", pipeline.getName(), step.getName());
break;
}
log.warn("[{}] Optional step failed, continuing: {}", pipeline.getName(), step.getName());
}
}
Duration duration = Duration.between(start, Instant.now());
PipelineStatus status = hasFailure
? PipelineStatus.FAILURE
: (context.getStepResults().stream().anyMatch(StepResult::isFailure)
? PipelineStatus.PARTIAL_SUCCESS
: PipelineStatus.SUCCESS);
log.info("[{}] Pipeline completed with status: {} in {}ms",
pipeline.getName(), status, duration.toMillis());
return new PipelineResult<>(pipeline.getName(), status, finalOutput, context, duration);
}
private <I, O> StepResult<O> executeStep(PipelineStep<I, O> step, I input, PipelineContext context) {
log.debug("[{}] Executing step", step.getName());
Instant start = Instant.now();
int attempts = 0;
Exception lastError = null;
while (attempts < step.getMaxRetries()) {
attempts++;
try {
StepResult<O> result = step.execute(input, context);
if (result.isSuccess()) {
Duration duration = Duration.between(start, Instant.now());
log.debug("[{}] Step completed in {}ms", step.getName(), duration.toMillis());
return StepResult.success(step.getName(), result.output(), duration);
}
lastError = result.error();
} catch (Exception e) {
lastError = e;
log.warn("[{}] Step attempt {} failed: {}", step.getName(), attempts, e.getMessage());
if (!step.isRetryable() || attempts >= step.getMaxRetries()) {
break;
}
// Backoff
try {
Thread.sleep(attempts * 1000L);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
Duration duration = Duration.between(start, Instant.now());
return StepResult.failure(step.getName(), lastError, duration, attempts);
}
}
6.2 Pipeline Builder
package eu.lmvi.socle.pipeline;
public class PipelineBuilder<I, O> {
private final String name;
private final List<PipelineStep<?, ?>> steps = new ArrayList<>();
public PipelineBuilder(String name) {
this.name = name;
}
public static <I, O> PipelineBuilder<I, O> create(String name) {
return new PipelineBuilder<>(name);
}
public <NO> PipelineBuilder<I, NO> addStep(PipelineStep<O, NO> step) {
steps.add(step);
return (PipelineBuilder<I, NO>) this;
}
public <NO> PipelineBuilder<I, NO> addStep(String name, Function<O, NO> processor) {
return addStep(new FunctionalStep<>(name, processor));
}
public Pipeline<I, O> build() {
return new DefaultPipeline<>(name, List.copyOf(steps));
}
private record FunctionalStep<I, O>(String name, Function<I, O> processor) implements PipelineStep<I, O> {
@Override
public String getName() {
return name;
}
@Override
public StepResult<O> execute(I input, PipelineContext context) {
O output = processor.apply(input);
return StepResult.success(name, output, Duration.ZERO);
}
}
}
7. Utilisation
7.1 Pipeline simple
@Component
public class OrderPipeline {
@Autowired
private PipelineEngine engine;
public PipelineResult<OrderResult> processOrder(Order order) {
Pipeline<Order, OrderResult> pipeline = PipelineBuilder
.<Order, OrderResult>create("order-processing")
.addStep("validate", this::validateOrder)
.addStep("enrich", this::enrichOrder)
.addStep("process", this::processOrder)
.addStep("notify", this::notifyCustomer)
.build();
return engine.execute(pipeline, order);
}
private ValidatedOrder validateOrder(Order order) {
// Validation...
return new ValidatedOrder(order);
}
private EnrichedOrder enrichOrder(ValidatedOrder order) {
// Enrichissement...
return new EnrichedOrder(order);
}
private ProcessedOrder processOrder(EnrichedOrder order) {
// Traitement...
return new ProcessedOrder(order);
}
private OrderResult notifyCustomer(ProcessedOrder order) {
// Notification...
return new OrderResult(order, "SUCCESS");
}
}
7.2 Étapes personnalisées
public class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
@Override
public String getName() {
return "validation";
}
@Override
public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
List<String> errors = new ArrayList<>();
if (input.getCustomerId() == null) {
errors.add("Missing customer ID");
}
if (input.getItems().isEmpty()) {
errors.add("No items in order");
}
if (!errors.isEmpty()) {
return StepResult.failure(getName(),
new ValidationException(errors),
Duration.ZERO, 1);
}
// Stocker des données dans le context
context.put("customerId", input.getCustomerId());
return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
}
@Override
public boolean isRetryable() {
return false; // Validation ne doit pas être retryée
}
}
7.3 Étape optionnelle
public class NotificationStep implements PipelineStep<ProcessedOrder, ProcessedOrder> {
@Override
public String getName() {
return "notification";
}
@Override
public StepResult<ProcessedOrder> execute(ProcessedOrder input, PipelineContext context) {
try {
sendNotification(input);
return StepResult.success(getName(), input, Duration.ZERO);
} catch (Exception e) {
return StepResult.failure(getName(), e, Duration.ZERO, 1);
}
}
@Override
public boolean isOptional() {
return true; // Le pipeline continue même si la notif échoue
}
}
7.4 Utilisation du context
public class EnrichmentStep implements PipelineStep<ValidatedOrder, EnrichedOrder> {
@Autowired
private CustomerService customerService;
@Override
public String getName() {
return "enrichment";
}
@Override
public StepResult<EnrichedOrder> execute(ValidatedOrder input, PipelineContext context) {
// Lire depuis le context
String customerId = context.get("customerId", String.class).orElseThrow();
// Enrichir
Customer customer = customerService.getCustomer(customerId);
EnrichedOrder enriched = new EnrichedOrder(input, customer);
// Écrire dans le context pour les étapes suivantes
context.put("customerEmail", customer.getEmail());
return StepResult.success(getName(), enriched, Duration.ZERO);
}
}
8. Gestion des erreurs
8.1 Retry automatique
public class ExternalApiStep implements PipelineStep<Data, ApiResponse> {
@Override
public String getName() {
return "external-api-call";
}
@Override
public boolean isRetryable() {
return true;
}
@Override
public int getMaxRetries() {
return 5; // 5 tentatives max
}
@Override
public StepResult<ApiResponse> execute(Data input, PipelineContext context) {
// Appel API qui peut échouer
ApiResponse response = callApi(input);
return StepResult.success(getName(), response, Duration.ZERO);
}
}
8.2 Traitement des résultats
PipelineResult<OrderResult> result = engine.execute(pipeline, order);
if (result.isSuccess()) {
log.info("Order processed successfully: {}", result.output());
} else {
// Analyser les étapes en échec
for (StepResult<?> stepResult : result.getFailedSteps()) {
log.error("Step {} failed after {} attempts: {}",
stepResult.stepName(),
stepResult.attempts(),
stepResult.error().getMessage());
}
// Décider quoi faire
if (result.status() == PipelineStatus.PARTIAL_SUCCESS) {
// Certaines étapes optionnelles ont échoué
handlePartialSuccess(result);
} else {
// Échec complet
handleFailure(result);
}
}
9. Pipelines parallèles
public class ParallelEnrichmentStep implements PipelineStep<Order, EnrichedOrder> {
@Autowired
private ExecutorService executor;
@Override
public StepResult<EnrichedOrder> execute(Order input, PipelineContext context) {
// Exécuter plusieurs enrichissements en parallèle
CompletableFuture<Customer> customerFuture = CompletableFuture.supplyAsync(
() -> customerService.getCustomer(input.getCustomerId()), executor);
CompletableFuture<Inventory> inventoryFuture = CompletableFuture.supplyAsync(
() -> inventoryService.checkInventory(input.getItems()), executor);
CompletableFuture<Pricing> pricingFuture = CompletableFuture.supplyAsync(
() -> pricingService.calculatePrice(input), executor);
try {
CompletableFuture.allOf(customerFuture, inventoryFuture, pricingFuture).join();
EnrichedOrder enriched = new EnrichedOrder(
input,
customerFuture.get(),
inventoryFuture.get(),
pricingFuture.get()
);
return StepResult.success(getName(), enriched, Duration.ZERO);
} catch (Exception e) {
return StepResult.failure(getName(), e, Duration.ZERO, 1);
}
}
}
10. Métriques
@Component
public class PipelineMetrics {
private final MeterRegistry registry;
public void recordPipelineExecution(PipelineResult<?> result) {
Timer.builder("socle_pipeline_duration")
.tag("pipeline", result.pipelineName())
.tag("status", result.status().name())
.register(registry)
.record(result.totalDuration());
Counter.builder("socle_pipeline_executions")
.tag("pipeline", result.pipelineName())
.tag("status", result.status().name())
.register(registry)
.increment();
}
public void recordStepExecution(StepResult<?> result) {
Timer.builder("socle_pipeline_step_duration")
.tag("step", result.stepName())
.tag("status", result.status().name())
.register(registry)
.record(result.duration());
}
}
11. Bonnes pratiques
DO
- Garder les étapes petites et focalisées
- Utiliser le context pour partager des données entre étapes
- Marquer les étapes non-critiques comme optionnelles
- Logger au niveau des étapes pour le debugging
- Configurer des retries appropriés pour les appels externes
DON’T
- Ne pas faire de pipelines avec trop d’étapes (> 10)
- Ne pas mélanger logique métier et infrastructure dans une étape
- Ne pas ignorer les erreurs des étapes obligatoires
- Ne pas utiliser de pipelines pour du traitement simple
12. Références
- 05-WORKERS – Workers
- 11-RESILIENCE – Patterns de résilience
Pipeline Engine V2 (Nouveau)
Ajouté en version 4.1.0
13. Introduction Pipeline V2
Le Pipeline V2 est une refonte complète du système de pipelines, conçue pour les applications à haute disponibilité nécessitant :
- Garantie at-least-once : Aucune perte de message grâce au pattern Queue/Claim/Ack
- Virtual Threads (Java 21) : Scalabilité maximale avec les threads virtuels
- Reprise après crash : État persisté permettant la reprise au dernier stage OK
- DLQ (Dead Letter Queue) : Gestion des échecs définitifs avec replay
Quand utiliser V2 vs V1 ?
| Critère | Pipeline V1 | Pipeline V2 |
|---|---|---|
| Exécution | Synchrone, mono-thread | Asynchrone, multi-thread |
| Garantie de livraison | At-most-once | At-least-once |
| Persistance | Non | Oui (TechDB / H2) |
| DLQ | Non | Oui |
| Use case | Traitement simple | Flux critiques, haute disponibilité |
14. Architecture V2
┌─────────────────────────────────────────────────────────────────┐
│ MOP │
│ (supervision, restart, health) │
└───────────────────────────┬─────────────────────────────────────┘
│
┌───────────────────────────▼─────────────────────────────────────┐
│ PipelineV2 │
│ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Stage A │────►│ Stage B │────►│ Stage C │ │
│ │ ┌────────┐ │ │ ┌────────┐ │ │ ┌────────┐ │ │
│ │ │ Queue │ │ │ │ Queue │ │ │ │ Queue │ │ │
│ │ └────────┘ │ │ └────────┘ │ │ └────────┘ │ │
│ │ VThreads │ │ VThreads │ │ VThreads │ │
│ └────────────┘ └────────────┘ └────────────┘ │
│ │
│ ┌─────────────────┴─────────────────┐ │
│ ┌────▼────┐ ┌────▼────┐ │
│ │ Context │ │ DLQ │ │
│ │ (state) │ │ (errors)│ │
│ └─────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────────────┘
Composants
- StageQueue : Queue avec sémantique Claim/Ack entre chaque stage
- VThreadStageExecutor : Workers Virtual Threads consommant les queues
- PersistentPipelineContext : État persisté pour reprise après crash
- DLQ : Messages en échec après max retries
15. Création d’un Pipeline V2
15.1 Builder Fluent
import eu.lmvi.socle.pipeline.v2.PipelineBuilderV2;
import eu.lmvi.socle.pipeline.v2.PipelineV2;
// Création du pipeline
PipelineV2<Order, Result> pipeline = PipelineBuilderV2
.<Order>create("order-processing")
.description("Pipeline de traitement des commandes")
// Stage 1: Validation (2 workers, timeout 30s)
.addStage("validate", this::validateOrder)
.concurrency(2)
.timeout(Duration.ofSeconds(30))
// Stage 2: Enrichissement (4 workers, queue 1000)
.addStage("enrich", this::enrichOrder)
.concurrency(4)
.queueSize(1000)
// Stage 3: Publication (2 workers)
.addStage("publish", this::publishOrder)
.concurrency(2)
.maxRetries(5)
.backoff(Duration.ofMillis(200), 2.0)
.build();
15.2 Configuration par stage
.addStage("nom", processor)
.concurrency(4) // Nombre de workers parallèles
.queueSize(500) // Taille max de la queue
.timeout(Duration.ofMinutes(5)) // Timeout par message
.maxRetries(3) // Tentatives avant DLQ
.backoff(base, multiplier) // Backoff exponentiel
.optional() // Stage optionnel (échec non bloquant)
16. Cycle de vie
16.1 Démarrage/Arrêt
// Démarrer le pipeline (lance tous les workers)
pipeline.start();
// Vérifier l'état
boolean running = pipeline.isRunning();
// Arrêt gracieux (attend la fin des traitements en cours)
pipeline.stop();
// Arrêt immédiat (interrompt tout)
pipeline.stopNow();
16.2 Soumission de messages
// Soumettre un message
String messageId = pipeline.submit(order, "exec-001");
// Avec métadonnées
String messageId = pipeline.submit(order, "exec-001", Map.of(
"source", "api",
"priority", "high"
));
16.3 Suivi d’exécution
// État d'une exécution
Optional<PipelineState> state = pipeline.getExecutionState("exec-001");
state.ifPresent(s -> {
System.out.println("Status: " + s.status());
System.out.println("Current stage: " + s.currentStage());
System.out.println("Duration: " + s.duration());
});
// Liste des exécutions actives
List<PipelineState> active = pipeline.listActiveExecutions();
// Statistiques globales
PipelineV2.PipelineStats stats = pipeline.getStats();
System.out.println("Submitted: " + stats.totalSubmitted());
System.out.println("Completed: " + stats.totalCompleted());
System.out.println("Failed: " + stats.totalFailed());
17. Queue/Claim/Ack
17.1 Pattern de consommation
Le Pipeline V2 utilise un pattern de consommation fiable :
1. CLAIM : Le worker réserve un message (lease)
2. PROCESS : Le worker traite le message
3. ACK : Succès → message supprimé
ou
NACK : Échec → retry ou DLQ
17.2 Lease et récupération
Si un worker crash pendant le traitement :
- Le lease expire après le timeout configuré
- Le message est automatiquement remis en queue
- Un autre worker le reprend
// Timeout de lease par défaut: 5 minutes
// Configurable par stage via timeout()
17.3 Implémentations disponibles
| Implémentation | Persistance | Use case |
|---|---|---|
InMemoryStageQueue |
Non | Développement, tests |
TechDbStageQueue |
Oui (H2) | Production |
18. Dead Letter Queue (DLQ)
18.1 Envoi en DLQ
Un message est envoyé en DLQ après :
maxRetriestentatives échouées- Une erreur non-retryable
18.2 Consultation
// Via API REST
GET /admin/pipelines/v2/{name}/dlq
// Programmatiquement (via StageQueue)
List<DlqMessage<Order>> messages = queue.peekDlq(100);
for (DlqMessage<Order> msg : messages) {
System.out.println("ID: " + msg.messageId());
System.out.println("Error: " + msg.errorMessage());
System.out.println("Attempts: " + msg.attempts());
System.out.println("Failed at: " + msg.failedAt());
}
18.3 Replay
// Via API REST
POST /admin/pipelines/v2/{name}/dlq/{messageId}/replay
// Programmatiquement
boolean replayed = queue.replayFromDlq(messageId);
// Replay tous les messages
int count = queue.replayAllFromDlq("exec-001");
18.4 Suppression
// Via API REST
DELETE /admin/pipelines/v2/{name}/dlq/{messageId}
// Programmatiquement
queue.deleteFromDlq(messageId);
// Purge complète
queue.purgeDlq();
19. Persistance et reprise
19.1 Context persistant
Le Pipeline V2 peut persister son état pour survivre aux redémarrages :
import eu.lmvi.socle.pipeline.context.TechDbPipelineContext;
// Créer un context persistant
Supplier<Connection> connectionSupplier = () -> dataSource.getConnection();
PersistentPipelineContext context = new TechDbPipelineContext(connectionSupplier);
// Utiliser avec le builder
PipelineV2<Order, Result> pipeline = PipelineBuilderV2
.<Order>create("order-processing")
.context(context) // Utilise le context persistant
.addStage(...)
.build();
19.2 Reprise après crash
Au redémarrage, les messages non-ackés sont automatiquement re-traités :
- Les messages en status
CLAIMEDdont le lease a expiré → remis enPENDING - Les stages reprennent leur consommation normalement
- Grâce à l’idempotence, les messages déjà traités ne sont pas re-traités
20. Idempotence (OBLIGATOIRE)
20.1 Pourquoi ?
Le modèle at-least-once signifie qu’un message peut être traité plusieurs fois :
- Crash après traitement mais avant ACK
- Timeout de lease pendant un traitement long
- Replay depuis la DLQ
Chaque stage DOIT être idempotent.
20.2 Patterns recommandés
// Pattern 1: Vérifier "déjà traité"
public Order processOrder(Order order) {
if (orderRepository.exists(order.getId())) {
return orderRepository.get(order.getId()); // Déjà traité
}
// Traiter...
return orderRepository.save(order);
}
// Pattern 2: UPSERT au lieu d'INSERT
public void saveOrder(Order order) {
orderRepository.upsert(order); // Idempotent par nature
}
// Pattern 3: Utiliser executionId comme clé
public void processWithDedup(Order order, String executionId) {
String dedupKey = "order:" + order.getId() + ":" + executionId;
if (kvBus.exists(dedupKey)) {
return; // Déjà traité pour cette exécution
}
// Traiter...
kvBus.set(dedupKey, true, Duration.ofDays(7));
}
20.3 Anti-patterns
// MAUVAIS: INSERT sans vérification
db.insert(order); // Échoue si déjà inséré
// MAUVAIS: Compteur incrémental
counter.increment(); // Compté plusieurs fois en cas de replay
// MAUVAIS: Email sans déduplication
emailService.send(email); // Envoyé plusieurs fois
21. API REST d’administration
21.1 Endpoints disponibles
GET /admin/pipelines/v2 → Liste des pipelines
GET /admin/pipelines/v2/{name} → État d'un pipeline
GET /admin/pipelines/v2/{name}/stages → État des stages
GET /admin/pipelines/v2/{name}/executions → Exécutions en cours
GET /admin/pipelines/v2/{name}/dlq → Messages en DLQ
POST /admin/pipelines/v2/{name}/dlq/{id}/replay → Rejouer un message
DELETE /admin/pipelines/v2/{name}/dlq/{id} → Supprimer de la DLQ
POST /admin/pipelines/v2/{name}/dlq/replay-all → Rejouer tous
POST /admin/pipelines/v2/{name}/start → Démarrer le pipeline
POST /admin/pipelines/v2/{name}/stop → Arrêter gracieusement
POST /admin/pipelines/v2/{name}/stop-now → Arrêter immédiatement
21.2 Exemple de réponse
GET /admin/pipelines/v2/order-processing
{
"timestamp": 1735387200000,
"name": "order-processing",
"description": "Pipeline de traitement des commandes",
"running": true,
"stats": {
"stage_count": 3,
"active_executions": 5,
"total_submitted": 1500,
"total_completed": 1480,
"total_failed": 12
}
}
22. Métriques Prometheus
22.1 Métriques exposées
# Compteurs
socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="success"}
socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="failed"}
socle_pipeline_v2_dlq_total{pipeline="order-processing", stage="validate"}
# Gauges
socle_pipeline_v2_queue_pending{pipeline="order-processing", stage="validate"}
socle_pipeline_v2_queue_processing{pipeline="order-processing", stage="validate"}
socle_pipeline_v2_dlq_size{pipeline="order-processing", stage="validate"}
socle_pipeline_v2_active_executions{pipeline="order-processing"}
socle_pipeline_v2_running{pipeline="order-processing"}
# Histogrammes (durée de traitement)
socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.5"}
socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.95"}
socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.99"}
22.2 Alertes recommandées
# DLQ croissante
- alert: PipelineDLQGrowing
expr: increase(socle_pipeline_v2_dlq_total[1h]) > 10
labels:
severity: warning
# Queue qui s'accumule (backpressure)
- alert: PipelineQueueBacklog
expr: socle_pipeline_v2_queue_pending > 1000
for: 5m
labels:
severity: warning
# Pipeline arrêté
- alert: PipelineStopped
expr: socle_pipeline_v2_running == 0
for: 1m
labels:
severity: critical
23. Bonnes pratiques V2
DO
- Toujours implémenter l’idempotence dans chaque stage
- Utiliser
TechDbPipelineContexten production pour la persistance - Configurer des
maxRetriesappropriés (3-5 pour API externes) - Monitorer les métriques DLQ et queue_pending
- Traiter régulièrement les messages en DLQ (replay ou suppression)
DON’T
- Ne jamais supposer qu’un message ne sera traité qu’une fois
- Ne pas ignorer les messages en DLQ
- Ne pas mettre de timeout trop courts (risque de faux positifs)
- Ne pas créer trop de stages (overhead de queues)
- Ne pas oublier d’appeler
pipeline.stop()à l’arrêt de l’application
24. Migration V1 → V2
24.1 Changements d’API
| V1 | V2 |
|---|---|
PipelineEngine.execute(pipeline, input) |
pipeline.submit(input, executionId) |
Pipeline<I,O> |
PipelineV2<I,O> |
PipelineBuilder |
PipelineBuilderV2 |
| Exécution synchrone | Exécution asynchrone |
24.2 Étapes de migration
- Rendre les steps idempotents (obligatoire avant migration)
- Créer le pipeline avec
PipelineBuilderV2 - Remplacer
execute()parsubmit() - Ajouter le suivi asynchrone des résultats si nécessaire
- Configurer la persistance (
TechDbPipelineContext) - Activer les métriques et monitoring DLQ
25. Configuration Dynamique (YAML)
Ajouté en version 4.1.0
25.1 Introduction
Le Pipeline V2 peut maintenant être configuré entièrement via YAML, permettant :
- Activation/désactivation des stages sans recompiler
- Configuration dynamique de la concurrence, timeouts, retries
- Ajout de nouveaux stages sans modifier le code de la factory
- Modularité des workers (un worker = un composant Spring)
25.2 Architecture
┌─────────────────────────────────────────────────────────────────┐
│ application.yml │
│ cdc: │
│ pipeline: │
│ stages: │
│ - name: filter │
│ worker: filterWorker │
│ enabled: true │
└───────────────────────────────┬─────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────────┐
│ PipelineYamlConfig (@ConfigurationProperties) │
│ │
│ - name: String │
│ - stages: List<StageYamlConfig> │
│ - getEnabledStages(): List<StageYamlConfig> │
└───────────────────────────────┬───────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────────┐
│ DynamicPipelineBuilder │
│ │
│ .config(pipelineConfig) │
│ .workers(Map<String, PipelineStageWorker>) │
│ .build() → PipelineV2<I, O> │
└───────────────────────────────────────────────────────────────────┘
25.3 Interface PipelineStageWorker
Chaque stage est implémenté comme un composant Spring implémentant PipelineStageWorker :
package eu.lmvi.socle.pipeline.v2.worker;
public interface PipelineStageWorker<I, O> {
/**
* Nom unique du worker (utilisé dans la config YAML)
*/
String getName();
/**
* Initialise le worker (chargement ressources, compilation scripts, etc.)
*/
default void initialize() throws Exception {}
/**
* Traite un élément d'entrée (DOIT être thread-safe)
*/
O process(I input) throws Exception;
/**
* Libère les ressources
*/
default void shutdown() {}
/**
* Indique si ce worker est activé
*/
default boolean isEnabled() { return true; }
/**
* Description pour le monitoring
*/
default String getDescription() { return getName(); }
/**
* Ordre de priorité (plus bas = plus tôt)
*/
default int getOrder() { return 100; }
}
25.4 Exemple d’implémentation
@Component("filterWorker")
public class FilterStageWorker implements PipelineStageWorker<WalEvent, PipelineMessage> {
@Autowired
private TableFilter tableFilter;
@Override
public String getName() {
return "filter";
}
@Override
public int getOrder() {
return 10; // Premier dans le pipeline
}
@Override
public PipelineMessage process(WalEvent event) throws Exception {
PipelineMessage message = new PipelineMessage(event);
if (!tableFilter.isAllowed(event.getTable())) {
message.markAsFiltered("table_not_in_whitelist");
}
return message;
}
}
25.5 Configuration YAML
cdc:
pipeline:
name: cdc-pipeline-v2
description: Pipeline CDC PostgreSQL vers Kafka
enabled: true
stages:
# Stage 1: Filtrage
- name: filter
worker: filterWorker # Nom du bean Spring
enabled: true
concurrency: 1
timeout-seconds: 5
max-retries: 1
order: 10
# Stage 2: Transformation
- name: transform
worker: transformWorker
enabled: true
concurrency: 2
timeout-seconds: 30
order: 20
# Stage 3a: Enrichissement Rules
- name: enrich-rules
worker: rulesEnrichmentWorker
enabled: true
concurrency: 2
order: 30
# Stage 3b: Enrichissement BeanShell
- name: enrich-beanshell
worker: beanshellEnrichmentWorker
enabled: ${CDC_ENRICH_BSH_ENABLED:true} # Variable d'environnement
concurrency: 2
order: 31
# Stage 3c: Enrichissement JavaScript
- name: enrich-javascript
worker: javascriptEnrichmentWorker
enabled: ${CDC_ENRICH_JS_ENABLED:true}
concurrency: 4
timeout-seconds: 60
order: 32
# Stage 3d: Enrichissement Janino (nouveau)
- name: enrich-janino
worker: janinoEnrichmentWorker
enabled: ${CDC_ENRICH_JANINO_ENABLED:false} # Désactivé par défaut
concurrency: 2
order: 33
# Stage 4: Publication Kafka
- name: publish
worker: publishWorker
enabled: true
concurrency: 2
max-retries: 5
order: 40
# Stage 5: Commit LSN
- name: commit
worker: commitWorker
enabled: true
concurrency: 1
order: 50
25.6 Classe de configuration Spring
@Configuration
@ConfigurationProperties(prefix = "cdc.pipeline")
public class CdcPipelineConfig extends PipelineYamlConfig {
// Hérite de toutes les propriétés
}
25.7 Construction du pipeline
@Component
public class CdcPipelineFactory {
@Autowired
private CdcPipelineConfig pipelineConfig;
@Autowired
private Map<String, PipelineStageWorker<?, ?>> availableWorkers;
@PostConstruct
public void init() {
// Log des workers disponibles
log.info("Available workers: {}", availableWorkers.keySet());
// Construire le pipeline dynamiquement
cdcPipeline = DynamicPipelineBuilder.create()
.config(pipelineConfig)
.workers(availableWorkers)
.build();
log.info("Pipeline created with {} stages",
pipelineConfig.countEnabledStages());
}
}
25.8 Avantages
| Aspect | Avant (hardcodé) | Après (YAML) |
|---|---|---|
| Modification | Recompilation | Redémarrage |
| Activation stage | Code Java | Variable d’env |
| Ajout stage | Modifier factory | Ajouter worker + config |
| Configuration | Dans le code | Centralisée |
| Environnements | Branches différentes | Même code, config différente |
25.9 Activation/Désactivation à chaud
Via variables d’environnement dans .env :
# Désactiver BeanShell
CDC_ENRICH_BSH_ENABLED=false
# Activer Janino
CDC_ENRICH_JANINO_ENABLED=true
# Redémarrer le service
sudo systemctl restart cdc-reflet-kafka-v02
25.10 Ajout d’un nouveau type d’enrichissement
- Créer le worker :
@Component("monNouveauWorker")
public class MonNouveauEnrichmentWorker
implements PipelineStageWorker<PipelineMessage, PipelineMessage> {
// ...
}
- Ajouter dans application.yml :
stages:
- name: enrich-nouveau
worker: monNouveauWorker
enabled: ${CDC_ENRICH_NOUVEAU_ENABLED:false}
order: 34
- Redémarrer – le pipeline inclura automatiquement le nouveau stage.
26. Références V2
- PLAN-ACTION-PIPELINE-V2.md – Plan d’action détaillé
- DELTA-PIPELINE-V004.md – Analyse des écarts









