09 – Pipeline Engine
Version : 4.1.0 Date : 2025-12-28
1. Introduction
Le PipelineEngine permet de définir des chaînes de traitement composées d’étapes (steps) exécutées séquentiellement ou en parallèle.
Caractéristiques
- Définition déclarative des pipelines
- Exécution séquentielle ou parallèle
- Gestion des erreurs et retry
- Context partagé entre étapes
- Métriques et logging intégrés
2. Architecture
┌─────────────────────────────────────────────────────────────┐
│ PipelineEngine │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Pipeline │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ PipelineContext │ │
│ │ - input data │ │
│ │ - shared state │ │
│ │ - results │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Step 1 │──►│ Step 2 │──►│ Step 3 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
3. Interface Pipeline
package eu.lmvi.socle.pipeline;
public interface Pipeline<I, O> {
/**
* Nom du pipeline
*/
String getName();
/**
* Exécute le pipeline
*/
PipelineResult<O> execute(I input);
/**
* Exécute le pipeline avec context
*/
PipelineResult<O> execute(I input, PipelineContext context);
/**
* Liste des étapes
*/
List<PipelineStep<?, ?>> getSteps();
}
4. Interface PipelineStep
package eu.lmvi.socle.pipeline;
public interface PipelineStep<I, O> {
/**
* Nom de l'étape
*/
String getName();
/**
* Exécute l'étape
*/
StepResult<O> execute(I input, PipelineContext context);
/**
* L'étape peut-elle être retryée ?
*/
default boolean isRetryable() {
return true;
}
/**
* Nombre max de retries
*/
default int getMaxRetries() {
return 3;
}
/**
* L'étape est-elle optionnelle ?
*/
default boolean isOptional() {
return false;
}
}
5. Context et Result
5.1 PipelineContext
package eu.lmvi.socle.pipeline;
public class PipelineContext {
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
private final List<StepResult<?>> stepResults = new ArrayList<>();
private final String correlationId;
private final Instant startTime;
public PipelineContext() {
this.correlationId = UUID.randomUUID().toString();
this.startTime = Instant.now();
}
public void put(String key, Object value) {
attributes.put(key, value);
}
public <T> Optional<T> get(String key, Class<T> type) {
return Optional.ofNullable(attributes.get(key))
.filter(type::isInstance)
.map(type::cast);
}
public void addStepResult(StepResult<?> result) {
stepResults.add(result);
}
public List<StepResult<?>> getStepResults() {
return Collections.unmodifiableList(stepResults);
}
public String getCorrelationId() {
return correlationId;
}
public Duration getElapsedTime() {
return Duration.between(startTime, Instant.now());
}
}
5.2 StepResult
package eu.lmvi.socle.pipeline;
public record StepResult<T>(
String stepName,
StepStatus status,
T output,
Exception error,
Duration duration,
int attempts
) {
public boolean isSuccess() {
return status == StepStatus.SUCCESS;
}
public boolean isFailure() {
return status == StepStatus.FAILURE;
}
public boolean isSkipped() {
return status == StepStatus.SKIPPED;
}
public static <T> StepResult<T> success(String name, T output, Duration duration) {
return new StepResult<>(name, StepStatus.SUCCESS, output, null, duration, 1);
}
public static <T> StepResult<T> failure(String name, Exception error, Duration duration, int attempts) {
return new StepResult<>(name, StepStatus.FAILURE, null, error, duration, attempts);
}
public static <T> StepResult<T> skipped(String name) {
return new StepResult<>(name, StepStatus.SKIPPED, null, null, Duration.ZERO, 0);
}
}
public enum StepStatus {
SUCCESS,
FAILURE,
SKIPPED
}
5.3 PipelineResult
package eu.lmvi.socle.pipeline;
public record PipelineResult<T>(
String pipelineName,
PipelineStatus status,
T output,
PipelineContext context,
Duration totalDuration
) {
public boolean isSuccess() {
return status == PipelineStatus.SUCCESS;
}
public List<StepResult<?>> getFailedSteps() {
return context.getStepResults().stream()
.filter(StepResult::isFailure)
.toList();
}
}
public enum PipelineStatus {
SUCCESS,
PARTIAL_SUCCESS,
FAILURE
}
6. Implémentation
6.1 DefaultPipelineEngine
package eu.lmvi.socle.pipeline;
@Component
public class DefaultPipelineEngine implements PipelineEngine {
private static final Logger log = LoggerFactory.getLogger(DefaultPipelineEngine.class);
@Override
public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input) {
return execute(pipeline, input, new PipelineContext());
}
@Override
@SuppressWarnings("unchecked")
public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input, PipelineContext context) {
log.info("[{}] Starting pipeline execution", pipeline.getName());
Instant start = Instant.now();
Object currentInput = input;
O finalOutput = null;
boolean hasFailure = false;
for (PipelineStep<?, ?> step : pipeline.getSteps()) {
PipelineStep<Object, Object> typedStep = (PipelineStep<Object, Object>) step;
StepResult<Object> result = executeStep(typedStep, currentInput, context);
context.addStepResult(result);
if (result.isSuccess()) {
currentInput = result.output();
finalOutput = (O) result.output();
} else if (result.isFailure()) {
if (!step.isOptional()) {
hasFailure = true;
log.error("[{}] Pipeline failed at step: {}", pipeline.getName(), step.getName());
break;
}
log.warn("[{}] Optional step failed, continuing: {}", pipeline.getName(), step.getName());
}
}
Duration duration = Duration.between(start, Instant.now());
PipelineStatus status = hasFailure
? PipelineStatus.FAILURE
: (context.getStepResults().stream().anyMatch(StepResult::isFailure)
? PipelineStatus.PARTIAL_SUCCESS
: PipelineStatus.SUCCESS);
log.info("[{}] Pipeline completed with status: {} in {}ms",
pipeline.getName(), status, duration.toMillis());
return new PipelineResult<>(pipeline.getName(), status, finalOutput, context, duration);
}
private <I, O> StepResult<O> executeStep(PipelineStep<I, O> step, I input, PipelineContext context) {
log.debug("[{}] Executing step", step.getName());
Instant start = Instant.now();
int attempts = 0;
Exception lastError = null;
while (attempts < step.getMaxRetries()) {
attempts++;
try {
StepResult<O> result = step.execute(input, context);
if (result.isSuccess()) {
Duration duration = Duration.between(start, Instant.now());
log.debug("[{}] Step completed in {}ms", step.getName(), duration.toMillis());
return StepResult.success(step.getName(), result.output(), duration);
}
lastError = result.error();
} catch (Exception e) {
lastError = e;
log.warn("[{}] Step attempt {} failed: {}", step.getName(), attempts, e.getMessage());
if (!step.isRetryable() || attempts >= step.getMaxRetries()) {
break;
}
// Backoff
try {
Thread.sleep(attempts * 1000L);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
Duration duration = Duration.between(start, Instant.now());
return StepResult.failure(step.getName(), lastError, duration, attempts);
}
}
6.2 Pipeline Builder
package eu.lmvi.socle.pipeline;
public class PipelineBuilder<I, O> {
private final String name;
private final List<PipelineStep<?, ?>> steps = new ArrayList<>();
public PipelineBuilder(String name) {
this.name = name;
}
public static <I, O> PipelineBuilder<I, O> create(String name) {
return new PipelineBuilder<>(name);
}
public <NO> PipelineBuilder<I, NO> addStep(PipelineStep<O, NO> step) {
steps.add(step);
return (PipelineBuilder<I, NO>) this;
}
public <NO> PipelineBuilder<I, NO> addStep(String name, Function<O, NO> processor) {
return addStep(new FunctionalStep<>(name, processor));
}
public Pipeline<I, O> build() {
return new DefaultPipeline<>(name, List.copyOf(steps));
}
private record FunctionalStep<I, O>(String name, Function<I, O> processor) implements PipelineStep<I, O> {
@Override
public String getName() {
return name;
}
@Override
public StepResult<O> execute(I input, PipelineContext context) {
O output = processor.apply(input);
return StepResult.success(name, output, Duration.ZERO);
}
}
}
7. Utilisation
7.1 Pipeline simple
@Component
public class OrderPipeline {
@Autowired
private PipelineEngine engine;
public PipelineResult<OrderResult> processOrder(Order order) {
Pipeline<Order, OrderResult> pipeline = PipelineBuilder
.<Order, OrderResult>create("order-processing")
.addStep("validate", this::validateOrder)
.addStep("enrich", this::enrichOrder)
.addStep("process", this::processOrder)
.addStep("notify", this::notifyCustomer)
.build();
return engine.execute(pipeline, order);
}
private ValidatedOrder validateOrder(Order order) {
// Validation...
return new ValidatedOrder(order);
}
private EnrichedOrder enrichOrder(ValidatedOrder order) {
// Enrichissement...
return new EnrichedOrder(order);
}
private ProcessedOrder processOrder(EnrichedOrder order) {
// Traitement...
return new ProcessedOrder(order);
}
private OrderResult notifyCustomer(ProcessedOrder order) {
// Notification...
return new OrderResult(order, "SUCCESS");
}
}
7.2 Étapes personnalisées
public class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
@Override
public String getName() {
return "validation";
}
@Override
public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
List<String> errors = new ArrayList<>();
if (input.getCustomerId() == null) {
errors.add("Missing customer ID");
}
if (input.getItems().isEmpty()) {
errors.add("No items in order");
}
if (!errors.isEmpty()) {
return StepResult.failure(getName(),
new ValidationException(errors),
Duration.ZERO, 1);
}
// Stocker des données dans le context
context.put("customerId", input.getCustomerId());
return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
}
@Override
public boolean isRetryable() {
return false; // Validation ne doit pas être retryée
}
}
7.3 Étape optionnelle
public class NotificationStep implements PipelineStep<ProcessedOrder, ProcessedOrder> {
@Override
public String getName() {
return "notification";
}
@Override
public StepResult<ProcessedOrder> execute(ProcessedOrder input, PipelineContext context) {
try {
sendNotification(input);
return StepResult.success(getName(), input, Duration.ZERO);
} catch (Exception e) {
return StepResult.failure(getName(), e, Duration.ZERO, 1);
}
}
@Override
public boolean isOptional() {
return true; // Le pipeline continue même si la notif échoue
}
}
7.4 Utilisation du context
public class EnrichmentStep implements PipelineStep<ValidatedOrder, EnrichedOrder> {
@Autowired
private CustomerService customerService;
@Override
public String getName() {
return "enrichment";
}
@Override
public StepResult<EnrichedOrder> execute(ValidatedOrder input, PipelineContext context) {
// Lire depuis le context
String customerId = context.get("customerId", String.class).orElseThrow();
// Enrichir
Customer customer = customerService.getCustomer(customerId);
EnrichedOrder enriched = new EnrichedOrder(input, customer);
// Écrire dans le context pour les étapes suivantes
context.put("customerEmail", customer.getEmail());
return StepResult.success(getName(), enriched, Duration.ZERO);
}
}
8. Gestion des erreurs
8.1 Retry automatique
public class ExternalApiStep implements PipelineStep<Data, ApiResponse> {
@Override
public String getName() {
return "external-api-call";
}
@Override
public boolean isRetryable() {
return true;
}
@Override
public int getMaxRetries() {
return 5; // 5 tentatives max
}
@Override
public StepResult<ApiResponse> execute(Data input, PipelineContext context) {
// Appel API qui peut échouer
ApiResponse response = callApi(input);
return StepResult.success(getName(), response, Duration.ZERO);
}
}
8.2 Traitement des résultats
PipelineResult<OrderResult> result = engine.execute(pipeline, order);
if (result.isSuccess()) {
log.info("Order processed successfully: {}", result.output());
} else {
// Analyser les étapes en échec
for (StepResult<?> stepResult : result.getFailedSteps()) {
log.error("Step {} failed after {} attempts: {}",
stepResult.stepName(),
stepResult.attempts(),
stepResult.error().getMessage());
}
// Décider quoi faire
if (result.status() == PipelineStatus.PARTIAL_SUCCESS) {
// Certaines étapes optionnelles ont échoué
handlePartialSuccess(result);
} else {
// Échec complet
handleFailure(result);
}
}
9. Pipelines parallèles
public class ParallelEnrichmentStep implements PipelineStep<Order, EnrichedOrder> {
@Autowired
private ExecutorService executor;
@Override
public StepResult<EnrichedOrder> execute(Order input, PipelineContext context) {
// Exécuter plusieurs enrichissements en parallèle
CompletableFuture<Customer> customerFuture = CompletableFuture.supplyAsync(
() -> customerService.getCustomer(input.getCustomerId()), executor);
CompletableFuture<Inventory> inventoryFuture = CompletableFuture.supplyAsync(
() -> inventoryService.checkInventory(input.getItems()), executor);
CompletableFuture<Pricing> pricingFuture = CompletableFuture.supplyAsync(
() -> pricingService.calculatePrice(input), executor);
try {
CompletableFuture.allOf(customerFuture, inventoryFuture, pricingFuture).join();
EnrichedOrder enriched = new EnrichedOrder(
input,
customerFuture.get(),
inventoryFuture.get(),
pricingFuture.get()
);
return StepResult.success(getName(), enriched, Duration.ZERO);
} catch (Exception e) {
return StepResult.failure(getName(), e, Duration.ZERO, 1);
}
}
}
10. Métriques
@Component
public class PipelineMetrics {
private final MeterRegistry registry;
public void recordPipelineExecution(PipelineResult<?> result) {
Timer.builder("socle_pipeline_duration")
.tag("pipeline", result.pipelineName())
.tag("status", result.status().name())
.register(registry)
.record(result.totalDuration());
Counter.builder("socle_pipeline_executions")
.tag("pipeline", result.pipelineName())
.tag("status", result.status().name())
.register(registry)
.increment();
}
public void recordStepExecution(StepResult<?> result) {
Timer.builder("socle_pipeline_step_duration")
.tag("step", result.stepName())
.tag("status", result.status().name())
.register(registry)
.record(result.duration());
}
}
11. Bonnes pratiques
DO
- Garder les étapes petites et focalisées
- Utiliser le context pour partager des données entre étapes
- Marquer les étapes non-critiques comme optionnelles
- Logger au niveau des étapes pour le debugging
- Configurer des retries appropriés pour les appels externes
DON’T
- Ne pas faire de pipelines avec trop d’étapes (> 10)
- Ne pas mélanger logique métier et infrastructure dans une étape
- Ne pas ignorer les erreurs des étapes obligatoires
- Ne pas utiliser de pipelines pour du traitement simple
12. Références
Pipeline Engine V2 (Nouveau)
Ajouté en version 4.1.0
13. Introduction Pipeline V2
Le Pipeline V2 est une refonte complète du système de pipelines, conçue pour les applications à haute disponibilité nécessitant :
- Garantie at-least-once : Aucune perte de message grâce au pattern Queue/Claim/Ack
- Virtual Threads (Java 21) : Scalabilité maximale avec les threads virtuels
- Reprise après crash : État persisté permettant la reprise au dernier stage OK
- DLQ (Dead Letter Queue) : Gestion des échecs définitifs avec replay
Quand utiliser V2 vs V1 ?
| Critère |
Pipeline V1 |
Pipeline V2 |
| Exécution |
Synchrone, mono-thread |
Asynchrone, multi-thread |
| Garantie de livraison |
At-most-once |
At-least-once |
| Persistance |
Non |
Oui (TechDB / H2) |
| DLQ |
Non |
Oui |
| Use case |
Traitement simple |
Flux critiques, haute disponibilité |
14. Architecture V2
┌─────────────────────────────────────────────────────────────────┐
│ MOP │
│ (supervision, restart, health) │
└───────────────────────────┬─────────────────────────────────────┘
│
┌───────────────────────────▼─────────────────────────────────────┐
│ PipelineV2 │
│ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Stage A │────►│ Stage B │────►│ Stage C │ │
│ │ ┌────────┐ │ │ ┌────────┐ │ │ ┌────────┐ │ │
│ │ │ Queue │ │ │ │ Queue │ │ │ │ Queue │ │ │
│ │ └────────┘ │ │ └────────┘ │ │ └────────┘ │ │
│ │ VThreads │ │ VThreads │ │ VThreads │ │
│ └────────────┘ └────────────┘ └────────────┘ │
│ │
│ ┌─────────────────┴─────────────────┐ │
│ ┌────▼────┐ ┌────▼────┐ │
│ │ Context │ │ DLQ │ │
│ │ (state) │ │ (errors)│ │
│ └─────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────────────┘
Composants
- StageQueue : Queue avec sémantique Claim/Ack entre chaque stage
- VThreadStageExecutor : Workers Virtual Threads consommant les queues
- PersistentPipelineContext : État persisté pour reprise après crash
- DLQ : Messages en échec après max retries
15. Création d’un Pipeline V2
15.1 Builder Fluent
import eu.lmvi.socle.pipeline.v2.PipelineBuilderV2;
import eu.lmvi.socle.pipeline.v2.PipelineV2;
// Création du pipeline
PipelineV2<Order, Result> pipeline = PipelineBuilderV2
.<Order>create("order-processing")
.description("Pipeline de traitement des commandes")
// Stage 1: Validation (2 workers, timeout 30s)
.addStage("validate", this::validateOrder)
.concurrency(2)
.timeout(Duration.ofSeconds(30))
// Stage 2: Enrichissement (4 workers, queue 1000)
.addStage("enrich", this::enrichOrder)
.concurrency(4)
.queueSize(1000)
// Stage 3: Publication (2 workers)
.addStage("publish", this::publishOrder)
.concurrency(2)
.maxRetries(5)
.backoff(Duration.ofMillis(200), 2.0)
.build();
15.2 Configuration par stage
.addStage("nom", processor)
.concurrency(4) // Nombre de workers parallèles
.queueSize(500) // Taille max de la queue
.timeout(Duration.ofMinutes(5)) // Timeout par message
.maxRetries(3) // Tentatives avant DLQ
.backoff(base, multiplier) // Backoff exponentiel
.optional() // Stage optionnel (échec non bloquant)
16. Cycle de vie
16.1 Démarrage/Arrêt
// Démarrer le pipeline (lance tous les workers)
pipeline.start();
// Vérifier l'état
boolean running = pipeline.isRunning();
// Arrêt gracieux (attend la fin des traitements en cours)
pipeline.stop();
// Arrêt immédiat (interrompt tout)
pipeline.stopNow();
16.2 Soumission de messages
// Soumettre un message
String messageId = pipeline.submit(order, "exec-001");
// Avec métadonnées
String messageId = pipeline.submit(order, "exec-001", Map.of(
"source", "api",
"priority", "high"
));
16.3 Suivi d’exécution
// État d'une exécution
Optional<PipelineState> state = pipeline.getExecutionState("exec-001");
state.ifPresent(s -> {
System.out.println("Status: " + s.status());
System.out.println("Current stage: " + s.currentStage());
System.out.println("Duration: " + s.duration());
});
// Liste des exécutions actives
List<PipelineState> active = pipeline.listActiveExecutions();
// Statistiques globales
PipelineV2.PipelineStats stats = pipeline.getStats();
System.out.println("Submitted: " + stats.totalSubmitted());
System.out.println("Completed: " + stats.totalCompleted());
System.out.println("Failed: " + stats.totalFailed());
17. Queue/Claim/Ack
17.1 Pattern de consommation
Le Pipeline V2 utilise un pattern de consommation fiable :
1. CLAIM : Le worker réserve un message (lease)
2. PROCESS : Le worker traite le message
3. ACK : Succès → message supprimé
ou
NACK : Échec → retry ou DLQ
17.2 Lease et récupération
Si un worker crash pendant le traitement :
- Le lease expire après le timeout configuré
- Le message est automatiquement remis en queue
- Un autre worker le reprend
// Timeout de lease par défaut: 5 minutes
// Configurable par stage via timeout()
17.3 Implémentations disponibles
| Implémentation |
Persistance |
Use case |
InMemoryStageQueue |
Non |
Développement, tests |
TechDbStageQueue |
Oui (H2) |
Production |
18. Dead Letter Queue (DLQ)
18.1 Envoi en DLQ
Un message est envoyé en DLQ après :
maxRetries tentatives échouées
- Une erreur non-retryable
18.2 Consultation
// Via API REST
GET /admin/pipelines/v2/{name}/dlq
// Programmatiquement (via StageQueue)
List<DlqMessage<Order>> messages = queue.peekDlq(100);
for (DlqMessage<Order> msg : messages) {
System.out.println("ID: " + msg.messageId());
System.out.println("Error: " + msg.errorMessage());
System.out.println("Attempts: " + msg.attempts());
System.out.println("Failed at: " + msg.failedAt());
}
18.3 Replay
// Via API REST
POST /admin/pipelines/v2/{name}/dlq/{messageId}/replay
// Programmatiquement
boolean replayed = queue.replayFromDlq(messageId);
// Replay tous les messages
int count = queue.replayAllFromDlq("exec-001");
18.4 Suppression
// Via API REST
DELETE /admin/pipelines/v2/{name}/dlq/{messageId}
// Programmatiquement
queue.deleteFromDlq(messageId);
// Purge complète
queue.purgeDlq();
19. Persistance et reprise
19.1 Context persistant
Le Pipeline V2 peut persister son état pour survivre aux redémarrages :
import eu.lmvi.socle.pipeline.context.TechDbPipelineContext;
// Créer un context persistant
Supplier<Connection> connectionSupplier = () -> dataSource.getConnection();
PersistentPipelineContext context = new TechDbPipelineContext(connectionSupplier);
// Utiliser avec le builder
PipelineV2<Order, Result> pipeline = PipelineBuilderV2
.<Order>create("order-processing")
.context(context) // Utilise le context persistant
.addStage(...)
.build();
19.2 Reprise après crash
Au redémarrage, les messages non-ackés sont automatiquement re-traités :
- Les messages en status
CLAIMED dont le lease a expiré → remis en PENDING
- Les stages reprennent leur consommation normalement
- Grâce à l’idempotence, les messages déjà traités ne sont pas re-traités
20. Idempotence (OBLIGATOIRE)
20.1 Pourquoi ?
Le modèle at-least-once signifie qu’un message peut être traité plusieurs fois :
- Crash après traitement mais avant ACK
- Timeout de lease pendant un traitement long
- Replay depuis la DLQ
Chaque stage DOIT être idempotent.
20.2 Patterns recommandés
// Pattern 1: Vérifier "déjà traité"
public Order processOrder(Order order) {
if (orderRepository.exists(order.getId())) {
return orderRepository.get(order.getId()); // Déjà traité
}
// Traiter...
return orderRepository.save(order);
}
// Pattern 2: UPSERT au lieu d'INSERT
public void saveOrder(Order order) {
orderRepository.upsert(order); // Idempotent par nature
}
// Pattern 3: Utiliser executionId comme clé
public void processWithDedup(Order order, String executionId) {
String dedupKey = "order:" + order.getId() + ":" + executionId;
if (kvBus.exists(dedupKey)) {
return; // Déjà traité pour cette exécution
}
// Traiter...
kvBus.set(dedupKey, true, Duration.ofDays(7));
}
20.3 Anti-patterns
// MAUVAIS: INSERT sans vérification
db.insert(order); // Échoue si déjà inséré
// MAUVAIS: Compteur incrémental
counter.increment(); // Compté plusieurs fois en cas de replay
// MAUVAIS: Email sans déduplication
emailService.send(email); // Envoyé plusieurs fois
21. API REST d’administration
21.1 Endpoints disponibles
GET /admin/pipelines/v2 → Liste des pipelines
GET /admin/pipelines/v2/{name} → État d'un pipeline
GET /admin/pipelines/v2/{name}/stages → État des stages
GET /admin/pipelines/v2/{name}/executions → Exécutions en cours
GET /admin/pipelines/v2/{name}/dlq → Messages en DLQ
POST /admin/pipelines/v2/{name}/dlq/{id}/replay → Rejouer un message
DELETE /admin/pipelines/v2/{name}/dlq/{id} → Supprimer de la DLQ
POST /admin/pipelines/v2/{name}/dlq/replay-all → Rejouer tous
POST /admin/pipelines/v2/{name}/start → Démarrer le pipeline
POST /admin/pipelines/v2/{name}/stop → Arrêter gracieusement
POST /admin/pipelines/v2/{name}/stop-now → Arrêter immédiatement
21.2 Exemple de réponse
GET /admin/pipelines/v2/order-processing
{
"timestamp": 1735387200000,
"name": "order-processing",
"description": "Pipeline de traitement des commandes",
"running": true,
"stats": {
"stage_count": 3,
"active_executions": 5,
"total_submitted": 1500,
"total_completed": 1480,
"total_failed": 12
}
}
22. Métriques Prometheus
22.1 Métriques exposées
# Compteurs
socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="success"}
socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="failed"}
socle_pipeline_v2_dlq_total{pipeline="order-processing", stage="validate"}
# Gauges
socle_pipeline_v2_queue_pending{pipeline="order-processing", stage="validate"}
socle_pipeline_v2_queue_processing{pipeline="order-processing", stage="validate"}
socle_pipeline_v2_dlq_size{pipeline="order-processing", stage="validate"}
socle_pipeline_v2_active_executions{pipeline="order-processing"}
socle_pipeline_v2_running{pipeline="order-processing"}
# Histogrammes (durée de traitement)
socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.5"}
socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.95"}
socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.99"}
22.2 Alertes recommandées
# DLQ croissante
- alert: PipelineDLQGrowing
expr: increase(socle_pipeline_v2_dlq_total[1h]) > 10
labels:
severity: warning
# Queue qui s'accumule (backpressure)
- alert: PipelineQueueBacklog
expr: socle_pipeline_v2_queue_pending > 1000
for: 5m
labels:
severity: warning
# Pipeline arrêté
- alert: PipelineStopped
expr: socle_pipeline_v2_running == 0
for: 1m
labels:
severity: critical
23. Bonnes pratiques V2
DO
- Toujours implémenter l’idempotence dans chaque stage
- Utiliser
TechDbPipelineContext en production pour la persistance
- Configurer des
maxRetries appropriés (3-5 pour API externes)
- Monitorer les métriques DLQ et queue_pending
- Traiter régulièrement les messages en DLQ (replay ou suppression)
DON’T
- Ne jamais supposer qu’un message ne sera traité qu’une fois
- Ne pas ignorer les messages en DLQ
- Ne pas mettre de timeout trop courts (risque de faux positifs)
- Ne pas créer trop de stages (overhead de queues)
- Ne pas oublier d’appeler
pipeline.stop() à l’arrêt de l’application
24. Migration V1 → V2
24.1 Changements d’API
| V1 |
V2 |
PipelineEngine.execute(pipeline, input) |
pipeline.submit(input, executionId) |
Pipeline<I,O> |
PipelineV2<I,O> |
PipelineBuilder |
PipelineBuilderV2 |
| Exécution synchrone |
Exécution asynchrone |
24.2 Étapes de migration
- Rendre les steps idempotents (obligatoire avant migration)
- Créer le pipeline avec
PipelineBuilderV2
- Remplacer
execute() par submit()
- Ajouter le suivi asynchrone des résultats si nécessaire
- Configurer la persistance (
TechDbPipelineContext)
- Activer les métriques et monitoring DLQ
25. Configuration Dynamique (YAML)
Ajouté en version 4.1.0
25.1 Introduction
Le Pipeline V2 peut maintenant être configuré entièrement via YAML, permettant :
- Activation/désactivation des stages sans recompiler
- Configuration dynamique de la concurrence, timeouts, retries
- Ajout de nouveaux stages sans modifier le code de la factory
- Modularité des workers (un worker = un composant Spring)
25.2 Architecture
┌─────────────────────────────────────────────────────────────────┐
│ application.yml │
│ cdc: │
│ pipeline: │
│ stages: │
│ - name: filter │
│ worker: filterWorker │
│ enabled: true │
└───────────────────────────────┬─────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────────┐
│ PipelineYamlConfig (@ConfigurationProperties) │
│ │
│ - name: String │
│ - stages: List<StageYamlConfig> │
│ - getEnabledStages(): List<StageYamlConfig> │
└───────────────────────────────┬───────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────────┐
│ DynamicPipelineBuilder │
│ │
│ .config(pipelineConfig) │
│ .workers(Map<String, PipelineStageWorker>) │
│ .build() → PipelineV2<I, O> │
└───────────────────────────────────────────────────────────────────┘
25.3 Interface PipelineStageWorker
Chaque stage est implémenté comme un composant Spring implémentant PipelineStageWorker :
package eu.lmvi.socle.pipeline.v2.worker;
public interface PipelineStageWorker<I, O> {
/**
* Nom unique du worker (utilisé dans la config YAML)
*/
String getName();
/**
* Initialise le worker (chargement ressources, compilation scripts, etc.)
*/
default void initialize() throws Exception {}
/**
* Traite un élément d'entrée (DOIT être thread-safe)
*/
O process(I input) throws Exception;
/**
* Libère les ressources
*/
default void shutdown() {}
/**
* Indique si ce worker est activé
*/
default boolean isEnabled() { return true; }
/**
* Description pour le monitoring
*/
default String getDescription() { return getName(); }
/**
* Ordre de priorité (plus bas = plus tôt)
*/
default int getOrder() { return 100; }
}
25.4 Exemple d’implémentation
@Component("filterWorker")
public class FilterStageWorker implements PipelineStageWorker<WalEvent, PipelineMessage> {
@Autowired
private TableFilter tableFilter;
@Override
public String getName() {
return "filter";
}
@Override
public int getOrder() {
return 10; // Premier dans le pipeline
}
@Override
public PipelineMessage process(WalEvent event) throws Exception {
PipelineMessage message = new PipelineMessage(event);
if (!tableFilter.isAllowed(event.getTable())) {
message.markAsFiltered("table_not_in_whitelist");
}
return message;
}
}
25.5 Configuration YAML
cdc:
pipeline:
name: cdc-pipeline-v2
description: Pipeline CDC PostgreSQL vers Kafka
enabled: true
stages:
# Stage 1: Filtrage
- name: filter
worker: filterWorker # Nom du bean Spring
enabled: true
concurrency: 1
timeout-seconds: 5
max-retries: 1
order: 10
# Stage 2: Transformation
- name: transform
worker: transformWorker
enabled: true
concurrency: 2
timeout-seconds: 30
order: 20
# Stage 3a: Enrichissement Rules
- name: enrich-rules
worker: rulesEnrichmentWorker
enabled: true
concurrency: 2
order: 30
# Stage 3b: Enrichissement BeanShell
- name: enrich-beanshell
worker: beanshellEnrichmentWorker
enabled: ${CDC_ENRICH_BSH_ENABLED:true} # Variable d'environnement
concurrency: 2
order: 31
# Stage 3c: Enrichissement JavaScript
- name: enrich-javascript
worker: javascriptEnrichmentWorker
enabled: ${CDC_ENRICH_JS_ENABLED:true}
concurrency: 4
timeout-seconds: 60
order: 32
# Stage 3d: Enrichissement Janino (nouveau)
- name: enrich-janino
worker: janinoEnrichmentWorker
enabled: ${CDC_ENRICH_JANINO_ENABLED:false} # Désactivé par défaut
concurrency: 2
order: 33
# Stage 4: Publication Kafka
- name: publish
worker: publishWorker
enabled: true
concurrency: 2
max-retries: 5
order: 40
# Stage 5: Commit LSN
- name: commit
worker: commitWorker
enabled: true
concurrency: 1
order: 50
25.6 Classe de configuration Spring
@Configuration
@ConfigurationProperties(prefix = "cdc.pipeline")
public class CdcPipelineConfig extends PipelineYamlConfig {
// Hérite de toutes les propriétés
}
25.7 Construction du pipeline
@Component
public class CdcPipelineFactory {
@Autowired
private CdcPipelineConfig pipelineConfig;
@Autowired
private Map<String, PipelineStageWorker<?, ?>> availableWorkers;
@PostConstruct
public void init() {
// Log des workers disponibles
log.info("Available workers: {}", availableWorkers.keySet());
// Construire le pipeline dynamiquement
cdcPipeline = DynamicPipelineBuilder.create()
.config(pipelineConfig)
.workers(availableWorkers)
.build();
log.info("Pipeline created with {} stages",
pipelineConfig.countEnabledStages());
}
}
25.8 Avantages
| Aspect |
Avant (hardcodé) |
Après (YAML) |
| Modification |
Recompilation |
Redémarrage |
| Activation stage |
Code Java |
Variable d’env |
| Ajout stage |
Modifier factory |
Ajouter worker + config |
| Configuration |
Dans le code |
Centralisée |
| Environnements |
Branches différentes |
Même code, config différente |
25.9 Activation/Désactivation à chaud
Via variables d’environnement dans .env :
# Désactiver BeanShell
CDC_ENRICH_BSH_ENABLED=false
# Activer Janino
CDC_ENRICH_JANINO_ENABLED=true
# Redémarrer le service
sudo systemctl restart cdc-reflet-kafka-v02
25.10 Ajout d’un nouveau type d’enrichissement
- Créer le worker :
@Component("monNouveauWorker")
public class MonNouveauEnrichmentWorker
implements PipelineStageWorker<PipelineMessage, PipelineMessage> {
// ...
}
- Ajouter dans application.yml :
stages:
- name: enrich-nouveau
worker: monNouveauWorker
enabled: ${CDC_ENRICH_NOUVEAU_ENABLED:false}
order: 34
- Redémarrer – le pipeline inclura automatiquement le nouveau stage.
26. Références V2