05 – Workers
Version : 4.0.1 Date : 2026-01-13
1. Introduction
Les Workers sont les composants de traitement du Socle V4. Chaque Worker implémente une tâche spécifique et son cycle de vie est géré par le MOP (Main Orchestrator Process).
Caractéristiques
- Interface unique : Tous les workers implémentent
Worker - Cycle de vie géré : Le MOP orchestre start/stop/doWork
- Priorités : Ordre de démarrage/arrêt configurable
- Scheduling : Support cron et intervalle
- Health check : Supervision intégrée
2. Interface Worker
package eu.lmvi.socle.worker;
public interface Worker {
/**
* Nom unique du worker
*/
String getName();
/**
* Initialisation (appelé une fois au démarrage)
*/
void initialize();
/**
* Démarrage du worker
*/
void start();
/**
* Traitement principal (appelé cycliquement)
*/
void doWork();
/**
* Arrêt gracieux
*/
void stop();
/**
* État de santé
*/
boolean isHealthy();
/**
* Statistiques du worker
*/
Map<String, Object> getStats();
// === Priorités ===
/**
* Priorité au démarrage (plus petit = premier)
*/
default int getStartPriority() {
return 100;
}
/**
* Priorité à l'arrêt (plus petit = premier)
*/
default int getStopPriority() {
return 100;
}
// === Scheduling ===
/**
* Expression cron (ou null si non schedulé)
*/
default String getSchedule() {
return null;
}
/**
* Intervalle entre les cycles doWork() en ms
* Valeurs recommandees : 5000 (5s), 10000 (10s), 30000 (30s)
*/
default long getCycleIntervalMs() {
return 5000; // 5 secondes par defaut
}
/**
* Worker schedulé par cron ?
*/
default boolean isScheduled() {
return getSchedule() != null;
}
/**
* Worker passif (ne fait rien dans doWork) ?
*/
default boolean isPassive() {
return false;
}
}
3. Implémentation de base
3.1 Worker simple
package com.myapp.worker;
import eu.lmvi.socle.worker.Worker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class SimpleWorker implements Worker {
private static final Logger log = LoggerFactory.getLogger(SimpleWorker.class);
private volatile boolean running = false;
private long processedCount = 0;
@Override
public String getName() {
return "simple-worker";
}
@Override
public void initialize() {
log.info("Initializing SimpleWorker");
// Charger configuration, connexions, etc.
}
@Override
public void start() {
log.info("Starting SimpleWorker");
running = true;
}
@Override
public void doWork() {
if (!running) return;
try {
// Traitement principal
processedCount++;
log.debug("Processing item #{}", processedCount);
} catch (Exception e) {
log.error("Error in doWork", e);
}
}
@Override
public void stop() {
log.info("Stopping SimpleWorker");
running = false;
}
@Override
public boolean isHealthy() {
return running;
}
@Override
public Map<String, Object> getStats() {
return Map.of(
"running", running,
"processedCount", processedCount
);
}
}
3.2 Worker avec priorité
@Component
public class DatabaseWorker implements Worker {
@Override
public String getName() {
return "database-worker";
}
@Override
public int getStartPriority() {
return 10; // Démarre en premier (connexion DB)
}
@Override
public int getStopPriority() {
return 90; // S'arrête en dernier (flush des données)
}
// ... autres méthodes
}
3.3 Worker schedulé (cron)
@Component
public class ReportWorker implements Worker {
@Override
public String getName() {
return "report-worker";
}
@Override
public String getSchedule() {
return "0 0 6 * * ?"; // Tous les jours à 6h
}
@Override
public boolean isScheduled() {
return true;
}
@Override
public void doWork() {
log.info("Generating daily report...");
// Génération du rapport
}
// ... autres méthodes
}
3.4 Worker passif (HTTP)
@Component
public class ApiWorker implements Worker {
@Override
public String getName() {
return "api-worker";
}
@Override
public boolean isPassive() {
return true; // Pas de doWork() cyclique
}
@Override
public void doWork() {
// Ne fait rien - le traitement est déclenché par HTTP
}
// Les requêtes HTTP déclenchent le traitement via endpoints REST
}
4. AbstractWorker
Le Socle fournit une classe de base qui simplifie l’implémentation :
package eu.lmvi.socle.worker;
public abstract class AbstractWorker implements Worker {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected volatile boolean running = false;
protected volatile boolean healthy = true;
protected final AtomicLong processedCount = new AtomicLong(0);
protected final AtomicLong errorCount = new AtomicLong(0);
protected Instant lastActivity;
@Override
public void initialize() {
log.info("[{}] Initializing", getName());
doInitialize();
}
@Override
public void start() {
log.info("[{}] Starting", getName());
running = true;
doStart();
}
@Override
public void stop() {
log.info("[{}] Stopping", getName());
running = false;
doStop();
}
@Override
public void doWork() {
if (!running) return;
try {
doProcess();
lastActivity = Instant.now();
} catch (Exception e) {
errorCount.incrementAndGet();
handleError(e);
}
}
@Override
public boolean isHealthy() {
return running && healthy;
}
@Override
public Map<String, Object> getStats() {
return Map.of(
"running", running,
"healthy", healthy,
"processedCount", processedCount.get(),
"errorCount", errorCount.get(),
"lastActivity", lastActivity != null ? lastActivity.toString() : "never"
);
}
// === Méthodes à implémenter ===
protected void doInitialize() {}
protected void doStart() {}
protected abstract void doProcess();
protected void doStop() {}
protected void handleError(Exception e) {
log.error("[{}] Error in doWork", getName(), e);
}
protected void incrementProcessed() {
processedCount.incrementAndGet();
}
}
Utilisation
@Component
public class OrderWorker extends AbstractWorker {
@Override
public String getName() {
return "order-worker";
}
@Override
protected void doInitialize() {
// Initialisation spécifique
}
@Override
protected void doProcess() {
// Traitement principal
processOrders();
incrementProcessed();
}
private void processOrders() {
// ...
}
}
5. Cycle de vie
5.1 Séquence de démarrage
1. MOP.start()
2. Pour chaque worker (trié par startPriority ASC) :
a. worker.initialize()
b. worker.start()
c. Enregistrement dans Supervisor
3. Boucle principale :
- Pour chaque worker non-schedulé, non-passif :
- worker.doWork()
- Sleep(worker.getCycleIntervalMs())
5.2 Séquence d’arrêt
1. Signal SIGTERM reçu
2. MOP.gracefulShutdown()
3. Pour chaque worker (trié par stopPriority ASC) :
a. worker.stop()
b. Attendre terminaison
4. Cleanup final
5.3 Diagramme
┌──────────────┐
│ CREATED │
└──────┬───────┘
│ initialize()
▼
┌──────────────┐
│ INITIALIZED │
└──────┬───────┘
│ start()
▼
┌─────────────────────────────┐
│ RUNNING │
│ │
│ ┌──────────────────────┐ │
│ │ doWork() loop │ │
│ │ (si non-passif) │ │
│ └──────────────────────┘ │
│ │
└─────────────┬───────────────┘
│ stop()
▼
┌──────────────┐
│ STOPPED │
└──────────────┘
6. Communication entre Workers
6.1 Via SharedDataRegistry
@Component
public class ProducerWorker extends AbstractWorker {
@Autowired
private SharedDataRegistry registry;
@Override
protected void doProcess() {
// Publier des données
registry.put("orders.pending.count", pendingOrders.size());
registry.incrementSequence("orders.total");
}
}
@Component
public class ConsumerWorker extends AbstractWorker {
@Autowired
private SharedDataRegistry registry;
@Override
protected void doProcess() {
// Lire les données
int pending = registry.getInt("orders.pending.count").orElse(0);
if (pending > 0) {
processOrders();
}
}
}
6.2 Via KvBus
@Component
public class OrderWorker extends AbstractWorker {
@Autowired
private KvBus kvBus;
@Override
protected void doProcess() {
// Stocker l'état partagé (même entre instances)
kvBus.put("order:" + orderId, orderJson);
}
}
6.3 Via Events
@Component
public class EventProducerWorker extends AbstractWorker {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Override
protected void doProcess() {
// Publier un événement Spring
eventPublisher.publishEvent(new OrderCreatedEvent(order));
}
}
@Component
public class EventConsumerWorker extends AbstractWorker {
@EventListener
public void onOrderCreated(OrderCreatedEvent event) {
// Réagir à l'événement
processOrder(event.getOrder());
}
}
7. Workers et TechDB (V4)
7.1 Persistance d’état
@Component
public class KafkaConsumerWorker extends AbstractWorker {
@Autowired
private TechDbManager techDb;
private long currentOffset;
@Override
protected void doInitialize() {
// Restaurer l'offset au démarrage
currentOffset = techDb.getOffset("kafka", "my-topic-0")
.orElse(0L);
log.info("Starting from offset: {}", currentOffset);
}
@Override
protected void doProcess() {
// Traiter les messages
List<Message> messages = consume(currentOffset);
for (Message msg : messages) {
process(msg);
currentOffset = msg.getOffset();
}
// Persister périodiquement
if (currentOffset % 1000 == 0) {
techDb.saveOffset("kafka", "my-topic-0", currentOffset, null);
}
}
@Override
protected void doStop() {
// Sauvegarder l'offset final
techDb.saveOffset("kafka", "my-topic-0", currentOffset, null);
}
}
7.2 État du worker
@Component
public class BatchWorker extends AbstractWorker {
@Autowired
private TechDbManager techDb;
@Override
protected void doProcess() {
// Mettre à jour l'état
techDb.saveWorkerState(getName(), "PROCESSING",
Map.of(
"currentBatch", currentBatchId,
"progress", progress
));
// Traitement...
techDb.saveWorkerState(getName(), "IDLE", Map.of());
}
}
8. Patterns courants
8.1 Worker avec retry
@Component
public class RetryableWorker extends AbstractWorker {
@Autowired
private RetryTemplate retryTemplate;
@Override
protected void doProcess() {
retryTemplate.execute(context -> {
processWithRetry();
return null;
});
}
}
8.2 Worker avec circuit breaker
@Component
public class ResilientWorker extends AbstractWorker {
@Autowired
private CircuitBreakerTemplate circuitBreaker;
@Override
protected void doProcess() {
circuitBreaker.execute("external-api", () -> {
callExternalApi();
});
}
}
8.3 Worker batch
@Component
public class BatchWorker extends AbstractWorker {
private final int BATCH_SIZE = 100;
@Override
protected void doProcess() {
List<Item> batch = fetchBatch(BATCH_SIZE);
if (batch.isEmpty()) {
return;
}
for (Item item : batch) {
processItem(item);
incrementProcessed();
}
}
}
9. Tests
9.1 Test unitaire
@ExtendWith(MockitoExtension.class)
class SimpleWorkerTest {
@InjectMocks
private SimpleWorker worker;
@Test
void shouldInitialize() {
worker.initialize();
// Assertions...
}
@Test
void shouldProcessItems() {
worker.initialize();
worker.start();
worker.doWork();
Map<String, Object> stats = worker.getStats();
assertEquals(1L, stats.get("processedCount"));
}
@Test
void shouldStopGracefully() {
worker.initialize();
worker.start();
worker.stop();
assertFalse(worker.isHealthy());
}
}
9.2 Test d’intégration
@SpringBootTest
class WorkerIntegrationTest {
@Autowired
private List<Worker> workers;
@Test
void allWorkersShouldBeHealthyAfterStart() {
workers.forEach(Worker::initialize);
workers.forEach(Worker::start);
workers.forEach(w -> assertTrue(w.isHealthy(),
"Worker " + w.getName() + " should be healthy"));
}
}
10. Auto-Restart des Workers
Le MOP surveille automatiquement la sante des workers et peut les redemarrer en cas d’echec.
Fonctionnement
- Detection : Le MOP verifie
worker.isHealthy()a chaque cycle de la boucle principale - Compteur : Un compteur d’echecs consecutifs est maintenu pour chaque worker
- Seuil : Apres 3 echecs consecutifs (configurable), le worker est redemarre
- Restart : Sequence
stop()→initialize()→start()→ replanification doWork() - Reset : Le compteur est remis a zero si le worker redevient healthy
Logs associes
[step:worker_auto_restart] Worker 'xxx' unhealthy 3 fois, tentative de redemarrage
[step:worker_restarted] Worker 'xxx' redemarre avec succes
[step:worker_restart_failed] Echec du redemarrage de 'xxx': <error>
Implementation dans le Worker
Pour beneficier de l’auto-restart, le worker doit :
- Retourner
falsedansisHealthy()en cas de probleme - Supporter un cycle
stop()→initialize()→start() - Reinitialiser son etat interne dans
initialize()
@Override
public boolean isHealthy() {
// Retourner false declenche le compteur d'echecs
return lastApiCallSuccessful && !hasConsecutiveErrors;
}
Workers exclus
Les workers built-in du socle (http_worker, control_worker, etc.) ne sont pas soumis a l’auto-restart car ils sont geres par le Supervisor.
11. Workers Event-Driven
Les Event-Driven Workers sont des workers passifs declenches par des evenements externes (Kafka, queues, CDC, etc.).
Classe de base
package eu.lmvi.socle.worker.event;
public abstract class AbstractEventDrivenWorker<T> implements Worker {
// Concurrence configurable
protected AbstractEventDrivenWorker(int concurrency) { ... }
// Mode PASSIVE automatique
@Override
public final boolean isPassive() { return true; }
@Override
public final String getSchedule() { return "PASSIVE"; }
// Methodes abstraites a implementer
protected abstract T pollEvent() throws InterruptedException;
protected abstract void processEvent(T event);
// Hooks optionnels
protected void onInitialize() { }
protected void onStarted() { }
protected void onStopping() { }
protected void onStopped() { }
protected void handleError(T event, Exception e, int workerId) { }
public long getBacklog() { return 0; }
}
Exemple d’implementation
@Component
public class KafkaEventWorker extends AbstractEventDrivenWorker<ConsumerRecord<String, String>> {
private final BlockingQueue<ConsumerRecord<String, String>> eventQueue = new LinkedBlockingQueue<>();
public KafkaEventWorker() {
super(4); // 4 threads concurrents
}
@Override
public String getName() {
return "kafka_event_worker";
}
@Override
protected ConsumerRecord<String, String> pollEvent() throws InterruptedException {
return eventQueue.poll(100, TimeUnit.MILLISECONDS);
}
@Override
protected void processEvent(ConsumerRecord<String, String> event) {
// Traitement de l'evenement
log.info("Processing: {}", event.value());
}
@Override
public long getBacklog() {
return eventQueue.size();
}
}
Metriques standardisees
Les Event-Driven Workers exposent des metriques compatibles avec le StatusDashboard :
| Cle | Type | Description |
|---|---|---|
state |
String | "running" ou "stopped" |
execution_count |
long | Nombre d’evenements traites |
errors_count |
long | Nombre d’erreurs |
last_execution |
String | ISO-8601 du dernier traitement |
schedule |
String | Toujours "PASSIVE" |
messages_processed |
long | Alias de execution_count |
throughput_per_sec |
double | Debit calcule |
backlog |
long | Evenements en attente |
12. Convention des Stats Workers
Pour une integration correcte avec le StatusDashboard et le WorkerActivityTracker, tous les workers doivent exposer des cles standardisees dans getStats().
Cles obligatoires
| Cle | Type | Description |
|---|---|---|
state |
String | "running" ou "stopped" |
execution_count |
long | Nombre total d’executions |
errors_count |
long | Nombre d’erreurs |
last_execution |
String/long | Timestamp ISO-8601 ou epoch ms |
schedule |
String | Mode: "PASSIVE", "INTERVAL", "CRON", ou expression cron |
Cles optionnelles
| Cle | Type | Description |
|---|---|---|
total_duration_ms |
long | Duree totale cumulee |
avg_duration_ms |
double | Duree moyenne par execution |
throughput_per_sec |
double | Debit (ops/sec) |
messages_processed |
long | Pour Event-Driven (alias execution_count) |
backlog |
long | File d’attente |
Exemple d’implementation
@Override
public Map<String, Object> getStats() {
Map<String, Object> stats = new HashMap<>();
// Cles standardisees (obligatoires)
stats.put("state", running ? "running" : "stopped");
stats.put("execution_count", executionCount.get());
stats.put("errors_count", errorCount.get());
stats.put("last_execution", lastExecution != null
? lastExecution.toString() // ISO-8601
: null);
stats.put("schedule", getSchedule() != null ? getSchedule() : "INTERVAL");
// Cles optionnelles
stats.put("total_duration_ms", totalDurationMs.get());
stats.put("avg_duration_ms", executionCount.get() > 0
? (double) totalDurationMs.get() / executionCount.get()
: 0.0);
return stats;
}
13. JaninoWorker (Scripts Java Compiles)
Le JaninoWorker permet d’executer du code Java compile dynamiquement a la volee. Contrairement au ScriptEngine (interprete), Janino compile le code en bytecode JVM pour des performances natives.
13.1 Activation
socle:
janino:
enabled: true
scripts-path: ./repository/scripts/java
reload-interval-ms: 300000 # 5 minutes
13.2 Interfaces disponibles
Les scripts doivent implementer une des interfaces :
| Interface | Usage | Methode |
|---|---|---|
Calculator<T> |
Calculs (frais, taxes) | T calculate(Map<String, Object> context) |
Executable |
Execution generique | Object execute(Map<String, Object> context) |
Validator |
Validations | ValidationResult validate(Object input) |
13.3 Exemple de script
// Fichier: repository/scripts/java/KrakenFeeCalculator.java
import eu.lmvi.socle.janino.interfaces.Calculator;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Map;
public class KrakenFeeCalculator implements Calculator<BigDecimal> {
private static final BigDecimal FEE_RATE = new BigDecimal("0.0026");
@Override
public BigDecimal calculate(Map<String, Object> context) {
BigDecimal amount = (BigDecimal) context.get("amount");
return amount.multiply(FEE_RATE).setScale(8, RoundingMode.HALF_UP);
}
}
13.4 Utilisation dans un Worker
@Component
public class TradingWorker implements Worker {
@Autowired
private JaninoWorker janinoWorker;
@Override
public void doWork() {
Map<String, Object> context = Map.of(
"amount", new BigDecimal("1000.00")
);
BigDecimal fee = janinoWorker.execute(
"KrakenFeeCalculator",
context,
BigDecimal.class
);
log.info("Fee calculated: {}", fee);
}
}
13.5 Acces direct au moteur
@Autowired
private JaninoWorker janinoWorker;
// Compiler un script a la volee
janinoWorker.compileScript("MyScript", sourceCode);
// Verifier si compile
boolean ready = janinoWorker.isScriptCompiled("MyScript");
// Forcer le rechargement
janinoWorker.forceReload();
// Acces au moteur
JaninoEngine engine = janinoWorker.getEngine();
13.6 Securite
Janino bloque l’acces aux packages dangereux :
java.io– Acces fichiersjava.net– Acces reseaujava.lang.reflect– Reflectionsun.*,com.sun.*– Classes internes
13.7 Comparaison avec ScriptEngine
| Critere | ScriptEngine | JaninoWorker |
|---|---|---|
| Langages | JavaScript, BeanShell | Java pur |
| Performance | Interprete | Compile (natif JVM) |
| Typage | Dynamique | Statique |
| Hot reload | Non | Oui (automatique) |
| Use case | Prototypage, scripts simples | Calculs haute perf |
14. Bonnes pratiques
DO
- Implémenter
stop()pour un arrêt gracieux - Utiliser
isHealthy()pour signaler les problèmes - Logger les transitions d’état
- Gérer les exceptions dans
doWork() - Utiliser des priorités pour les dépendances
DON’T
- Ne pas bloquer indéfiniment dans
doWork() - Ne pas ignorer les signaux d’arrêt
- Ne pas oublier de décrémenter les ressources dans
stop() - Ne pas utiliser de variables statiques pour l’état
15. Références
- 02-ARCHITECTURE – Architecture MOP
- 06-KV-BUS – Stockage clé-valeur
- 07-SHARED-DATA – Données partagées
- 08-SUPERVISOR – Supervision
- 11-RESILIENCE – Patterns de résilience
- 21-H2-TECHDB – Base de données technique
- 27-STATUS-DASHBOARD – Dashboard de supervision
- Spec Janino – Specification complete









