Socle V004 – Workers

Socle V004 - Workers

05 – Workers

Version : 4.0.1 Date : 2026-01-13

1. Introduction

Les Workers sont les composants de traitement du Socle V4. Chaque Worker implémente une tâche spécifique et son cycle de vie est géré par le MOP (Main Orchestrator Process).

Caractéristiques

  • Interface unique : Tous les workers implémentent Worker
  • Cycle de vie géré : Le MOP orchestre start/stop/doWork
  • Priorités : Ordre de démarrage/arrêt configurable
  • Scheduling : Support cron et intervalle
  • Health check : Supervision intégrée

2. Interface Worker

package eu.lmvi.socle.worker;

public interface Worker {

    /**
     * Nom unique du worker
     */
    String getName();

    /**
     * Initialisation (appelé une fois au démarrage)
     */
    void initialize();

    /**
     * Démarrage du worker
     */
    void start();

    /**
     * Traitement principal (appelé cycliquement)
     */
    void doWork();

    /**
     * Arrêt gracieux
     */
    void stop();

    /**
     * État de santé
     */
    boolean isHealthy();

    /**
     * Statistiques du worker
     */
    Map<String, Object> getStats();

    // === Priorités ===

    /**
     * Priorité au démarrage (plus petit = premier)
     */
    default int getStartPriority() {
        return 100;
    }

    /**
     * Priorité à l'arrêt (plus petit = premier)
     */
    default int getStopPriority() {
        return 100;
    }

    // === Scheduling ===

    /**
     * Expression cron (ou null si non schedulé)
     */
    default String getSchedule() {
        return null;
    }

    /**
     * Intervalle entre les cycles doWork() en ms
     * Valeurs recommandees : 5000 (5s), 10000 (10s), 30000 (30s)
     */
    default long getCycleIntervalMs() {
        return 5000; // 5 secondes par defaut
    }

    /**
     * Worker schedulé par cron ?
     */
    default boolean isScheduled() {
        return getSchedule() != null;
    }

    /**
     * Worker passif (ne fait rien dans doWork) ?
     */
    default boolean isPassive() {
        return false;
    }
}

3. Implémentation de base

3.1 Worker simple

package com.myapp.worker;

import eu.lmvi.socle.worker.Worker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class SimpleWorker implements Worker {

    private static final Logger log = LoggerFactory.getLogger(SimpleWorker.class);

    private volatile boolean running = false;
    private long processedCount = 0;

    @Override
    public String getName() {
        return "simple-worker";
    }

    @Override
    public void initialize() {
        log.info("Initializing SimpleWorker");
        // Charger configuration, connexions, etc.
    }

    @Override
    public void start() {
        log.info("Starting SimpleWorker");
        running = true;
    }

    @Override
    public void doWork() {
        if (!running) return;

        try {
            // Traitement principal
            processedCount++;
            log.debug("Processing item #{}", processedCount);
        } catch (Exception e) {
            log.error("Error in doWork", e);
        }
    }

    @Override
    public void stop() {
        log.info("Stopping SimpleWorker");
        running = false;
    }

    @Override
    public boolean isHealthy() {
        return running;
    }

    @Override
    public Map<String, Object> getStats() {
        return Map.of(
            "running", running,
            "processedCount", processedCount
        );
    }
}

3.2 Worker avec priorité

@Component
public class DatabaseWorker implements Worker {

    @Override
    public String getName() {
        return "database-worker";
    }

    @Override
    public int getStartPriority() {
        return 10;  // Démarre en premier (connexion DB)
    }

    @Override
    public int getStopPriority() {
        return 90;  // S'arrête en dernier (flush des données)
    }

    // ... autres méthodes
}

3.3 Worker schedulé (cron)

@Component
public class ReportWorker implements Worker {

    @Override
    public String getName() {
        return "report-worker";
    }

    @Override
    public String getSchedule() {
        return "0 0 6 * * ?";  // Tous les jours à 6h
    }

    @Override
    public boolean isScheduled() {
        return true;
    }

    @Override
    public void doWork() {
        log.info("Generating daily report...");
        // Génération du rapport
    }

    // ... autres méthodes
}

3.4 Worker passif (HTTP)

@Component
public class ApiWorker implements Worker {

    @Override
    public String getName() {
        return "api-worker";
    }

    @Override
    public boolean isPassive() {
        return true;  // Pas de doWork() cyclique
    }

    @Override
    public void doWork() {
        // Ne fait rien - le traitement est déclenché par HTTP
    }

    // Les requêtes HTTP déclenchent le traitement via endpoints REST
}

4. AbstractWorker

Le Socle fournit une classe de base qui simplifie l’implémentation :

package eu.lmvi.socle.worker;

public abstract class AbstractWorker implements Worker {

    protected final Logger log = LoggerFactory.getLogger(getClass());
    protected volatile boolean running = false;
    protected volatile boolean healthy = true;
    protected final AtomicLong processedCount = new AtomicLong(0);
    protected final AtomicLong errorCount = new AtomicLong(0);
    protected Instant lastActivity;

    @Override
    public void initialize() {
        log.info("[{}] Initializing", getName());
        doInitialize();
    }

    @Override
    public void start() {
        log.info("[{}] Starting", getName());
        running = true;
        doStart();
    }

    @Override
    public void stop() {
        log.info("[{}] Stopping", getName());
        running = false;
        doStop();
    }

    @Override
    public void doWork() {
        if (!running) return;

        try {
            doProcess();
            lastActivity = Instant.now();
        } catch (Exception e) {
            errorCount.incrementAndGet();
            handleError(e);
        }
    }

    @Override
    public boolean isHealthy() {
        return running && healthy;
    }

    @Override
    public Map<String, Object> getStats() {
        return Map.of(
            "running", running,
            "healthy", healthy,
            "processedCount", processedCount.get(),
            "errorCount", errorCount.get(),
            "lastActivity", lastActivity != null ? lastActivity.toString() : "never"
        );
    }

    // === Méthodes à implémenter ===

    protected void doInitialize() {}
    protected void doStart() {}
    protected abstract void doProcess();
    protected void doStop() {}

    protected void handleError(Exception e) {
        log.error("[{}] Error in doWork", getName(), e);
    }

    protected void incrementProcessed() {
        processedCount.incrementAndGet();
    }
}

Utilisation

@Component
public class OrderWorker extends AbstractWorker {

    @Override
    public String getName() {
        return "order-worker";
    }

    @Override
    protected void doInitialize() {
        // Initialisation spécifique
    }

    @Override
    protected void doProcess() {
        // Traitement principal
        processOrders();
        incrementProcessed();
    }

    private void processOrders() {
        // ...
    }
}

5. Cycle de vie

5.1 Séquence de démarrage

1. MOP.start()
2. Pour chaque worker (trié par startPriority ASC) :
   a. worker.initialize()
   b. worker.start()
   c. Enregistrement dans Supervisor
3. Boucle principale :
   - Pour chaque worker non-schedulé, non-passif :
     - worker.doWork()
     - Sleep(worker.getCycleIntervalMs())

5.2 Séquence d’arrêt

1. Signal SIGTERM reçu
2. MOP.gracefulShutdown()
3. Pour chaque worker (trié par stopPriority ASC) :
   a. worker.stop()
   b. Attendre terminaison
4. Cleanup final

5.3 Diagramme

                 ┌──────────────┐
                 │   CREATED    │
                 └──────┬───────┘
                        │ initialize()
                        ▼
                 ┌──────────────┐
                 │ INITIALIZED  │
                 └──────┬───────┘
                        │ start()
                        ▼
          ┌─────────────────────────────┐
          │          RUNNING            │
          │                             │
          │  ┌──────────────────────┐   │
          │  │   doWork() loop      │   │
          │  │   (si non-passif)    │   │
          │  └──────────────────────┘   │
          │                             │
          └─────────────┬───────────────┘
                        │ stop()
                        ▼
                 ┌──────────────┐
                 │   STOPPED    │
                 └──────────────┘

6. Communication entre Workers

6.1 Via SharedDataRegistry

@Component
public class ProducerWorker extends AbstractWorker {

    @Autowired
    private SharedDataRegistry registry;

    @Override
    protected void doProcess() {
        // Publier des données
        registry.put("orders.pending.count", pendingOrders.size());
        registry.incrementSequence("orders.total");
    }
}

@Component
public class ConsumerWorker extends AbstractWorker {

    @Autowired
    private SharedDataRegistry registry;

    @Override
    protected void doProcess() {
        // Lire les données
        int pending = registry.getInt("orders.pending.count").orElse(0);
        if (pending > 0) {
            processOrders();
        }
    }
}

6.2 Via KvBus

@Component
public class OrderWorker extends AbstractWorker {

    @Autowired
    private KvBus kvBus;

    @Override
    protected void doProcess() {
        // Stocker l'état partagé (même entre instances)
        kvBus.put("order:" + orderId, orderJson);
    }
}

6.3 Via Events

@Component
public class EventProducerWorker extends AbstractWorker {

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Override
    protected void doProcess() {
        // Publier un événement Spring
        eventPublisher.publishEvent(new OrderCreatedEvent(order));
    }
}

@Component
public class EventConsumerWorker extends AbstractWorker {

    @EventListener
    public void onOrderCreated(OrderCreatedEvent event) {
        // Réagir à l'événement
        processOrder(event.getOrder());
    }
}

7. Workers et TechDB (V4)

7.1 Persistance d’état

@Component
public class KafkaConsumerWorker extends AbstractWorker {

    @Autowired
    private TechDbManager techDb;

    private long currentOffset;

    @Override
    protected void doInitialize() {
        // Restaurer l'offset au démarrage
        currentOffset = techDb.getOffset("kafka", "my-topic-0")
            .orElse(0L);
        log.info("Starting from offset: {}", currentOffset);
    }

    @Override
    protected void doProcess() {
        // Traiter les messages
        List<Message> messages = consume(currentOffset);
        for (Message msg : messages) {
            process(msg);
            currentOffset = msg.getOffset();
        }

        // Persister périodiquement
        if (currentOffset % 1000 == 0) {
            techDb.saveOffset("kafka", "my-topic-0", currentOffset, null);
        }
    }

    @Override
    protected void doStop() {
        // Sauvegarder l'offset final
        techDb.saveOffset("kafka", "my-topic-0", currentOffset, null);
    }
}

7.2 État du worker

@Component
public class BatchWorker extends AbstractWorker {

    @Autowired
    private TechDbManager techDb;

    @Override
    protected void doProcess() {
        // Mettre à jour l'état
        techDb.saveWorkerState(getName(), "PROCESSING",
            Map.of(
                "currentBatch", currentBatchId,
                "progress", progress
            ));

        // Traitement...

        techDb.saveWorkerState(getName(), "IDLE", Map.of());
    }
}

8. Patterns courants

8.1 Worker avec retry

@Component
public class RetryableWorker extends AbstractWorker {

    @Autowired
    private RetryTemplate retryTemplate;

    @Override
    protected void doProcess() {
        retryTemplate.execute(context -> {
            processWithRetry();
            return null;
        });
    }
}

8.2 Worker avec circuit breaker

@Component
public class ResilientWorker extends AbstractWorker {

    @Autowired
    private CircuitBreakerTemplate circuitBreaker;

    @Override
    protected void doProcess() {
        circuitBreaker.execute("external-api", () -> {
            callExternalApi();
        });
    }
}

8.3 Worker batch

@Component
public class BatchWorker extends AbstractWorker {

    private final int BATCH_SIZE = 100;

    @Override
    protected void doProcess() {
        List<Item> batch = fetchBatch(BATCH_SIZE);
        if (batch.isEmpty()) {
            return;
        }

        for (Item item : batch) {
            processItem(item);
            incrementProcessed();
        }
    }
}

9. Tests

9.1 Test unitaire

@ExtendWith(MockitoExtension.class)
class SimpleWorkerTest {

    @InjectMocks
    private SimpleWorker worker;

    @Test
    void shouldInitialize() {
        worker.initialize();
        // Assertions...
    }

    @Test
    void shouldProcessItems() {
        worker.initialize();
        worker.start();

        worker.doWork();

        Map<String, Object> stats = worker.getStats();
        assertEquals(1L, stats.get("processedCount"));
    }

    @Test
    void shouldStopGracefully() {
        worker.initialize();
        worker.start();
        worker.stop();

        assertFalse(worker.isHealthy());
    }
}

9.2 Test d’intégration

@SpringBootTest
class WorkerIntegrationTest {

    @Autowired
    private List<Worker> workers;

    @Test
    void allWorkersShouldBeHealthyAfterStart() {
        workers.forEach(Worker::initialize);
        workers.forEach(Worker::start);

        workers.forEach(w -> assertTrue(w.isHealthy(),
            "Worker " + w.getName() + " should be healthy"));
    }
}

10. Auto-Restart des Workers

Le MOP surveille automatiquement la sante des workers et peut les redemarrer en cas d’echec.

Fonctionnement

  1. Detection : Le MOP verifie worker.isHealthy() a chaque cycle de la boucle principale
  2. Compteur : Un compteur d’echecs consecutifs est maintenu pour chaque worker
  3. Seuil : Apres 3 echecs consecutifs (configurable), le worker est redemarre
  4. Restart : Sequence stop()initialize()start() → replanification doWork()
  5. Reset : Le compteur est remis a zero si le worker redevient healthy

Logs associes

[step:worker_auto_restart] Worker 'xxx' unhealthy 3 fois, tentative de redemarrage
[step:worker_restarted] Worker 'xxx' redemarre avec succes
[step:worker_restart_failed] Echec du redemarrage de 'xxx': <error>

Implementation dans le Worker

Pour beneficier de l’auto-restart, le worker doit :

  1. Retourner false dans isHealthy() en cas de probleme
  2. Supporter un cycle stop()initialize()start()
  3. Reinitialiser son etat interne dans initialize()
@Override
public boolean isHealthy() {
    // Retourner false declenche le compteur d'echecs
    return lastApiCallSuccessful && !hasConsecutiveErrors;
}

Workers exclus

Les workers built-in du socle (http_worker, control_worker, etc.) ne sont pas soumis a l’auto-restart car ils sont geres par le Supervisor.

11. Workers Event-Driven

Les Event-Driven Workers sont des workers passifs declenches par des evenements externes (Kafka, queues, CDC, etc.).

Classe de base

package eu.lmvi.socle.worker.event;

public abstract class AbstractEventDrivenWorker<T> implements Worker {

    // Concurrence configurable
    protected AbstractEventDrivenWorker(int concurrency) { ... }

    // Mode PASSIVE automatique
    @Override
    public final boolean isPassive() { return true; }

    @Override
    public final String getSchedule() { return "PASSIVE"; }

    // Methodes abstraites a implementer
    protected abstract T pollEvent() throws InterruptedException;
    protected abstract void processEvent(T event);

    // Hooks optionnels
    protected void onInitialize() { }
    protected void onStarted() { }
    protected void onStopping() { }
    protected void onStopped() { }
    protected void handleError(T event, Exception e, int workerId) { }
    public long getBacklog() { return 0; }
}

Exemple d’implementation

@Component
public class KafkaEventWorker extends AbstractEventDrivenWorker<ConsumerRecord<String, String>> {

    private final BlockingQueue<ConsumerRecord<String, String>> eventQueue = new LinkedBlockingQueue<>();

    public KafkaEventWorker() {
        super(4); // 4 threads concurrents
    }

    @Override
    public String getName() {
        return "kafka_event_worker";
    }

    @Override
    protected ConsumerRecord<String, String> pollEvent() throws InterruptedException {
        return eventQueue.poll(100, TimeUnit.MILLISECONDS);
    }

    @Override
    protected void processEvent(ConsumerRecord<String, String> event) {
        // Traitement de l'evenement
        log.info("Processing: {}", event.value());
    }

    @Override
    public long getBacklog() {
        return eventQueue.size();
    }
}

Metriques standardisees

Les Event-Driven Workers exposent des metriques compatibles avec le StatusDashboard :

Cle Type Description
state String "running" ou "stopped"
execution_count long Nombre d’evenements traites
errors_count long Nombre d’erreurs
last_execution String ISO-8601 du dernier traitement
schedule String Toujours "PASSIVE"
messages_processed long Alias de execution_count
throughput_per_sec double Debit calcule
backlog long Evenements en attente

12. Convention des Stats Workers

Pour une integration correcte avec le StatusDashboard et le WorkerActivityTracker, tous les workers doivent exposer des cles standardisees dans getStats().

Cles obligatoires

Cle Type Description
state String "running" ou "stopped"
execution_count long Nombre total d’executions
errors_count long Nombre d’erreurs
last_execution String/long Timestamp ISO-8601 ou epoch ms
schedule String Mode: "PASSIVE", "INTERVAL", "CRON", ou expression cron

Cles optionnelles

Cle Type Description
total_duration_ms long Duree totale cumulee
avg_duration_ms double Duree moyenne par execution
throughput_per_sec double Debit (ops/sec)
messages_processed long Pour Event-Driven (alias execution_count)
backlog long File d’attente

Exemple d’implementation

@Override
public Map<String, Object> getStats() {
    Map<String, Object> stats = new HashMap<>();

    // Cles standardisees (obligatoires)
    stats.put("state", running ? "running" : "stopped");
    stats.put("execution_count", executionCount.get());
    stats.put("errors_count", errorCount.get());
    stats.put("last_execution", lastExecution != null
        ? lastExecution.toString()  // ISO-8601
        : null);
    stats.put("schedule", getSchedule() != null ? getSchedule() : "INTERVAL");

    // Cles optionnelles
    stats.put("total_duration_ms", totalDurationMs.get());
    stats.put("avg_duration_ms", executionCount.get() > 0
        ? (double) totalDurationMs.get() / executionCount.get()
        : 0.0);

    return stats;
}

13. JaninoWorker (Scripts Java Compiles)

Le JaninoWorker permet d’executer du code Java compile dynamiquement a la volee. Contrairement au ScriptEngine (interprete), Janino compile le code en bytecode JVM pour des performances natives.

13.1 Activation

socle:
  janino:
    enabled: true
    scripts-path: ./repository/scripts/java
    reload-interval-ms: 300000  # 5 minutes

13.2 Interfaces disponibles

Les scripts doivent implementer une des interfaces :

Interface Usage Methode
Calculator<T> Calculs (frais, taxes) T calculate(Map<String, Object> context)
Executable Execution generique Object execute(Map<String, Object> context)
Validator Validations ValidationResult validate(Object input)

13.3 Exemple de script

// Fichier: repository/scripts/java/KrakenFeeCalculator.java

import eu.lmvi.socle.janino.interfaces.Calculator;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Map;

public class KrakenFeeCalculator implements Calculator<BigDecimal> {

    private static final BigDecimal FEE_RATE = new BigDecimal("0.0026");

    @Override
    public BigDecimal calculate(Map<String, Object> context) {
        BigDecimal amount = (BigDecimal) context.get("amount");
        return amount.multiply(FEE_RATE).setScale(8, RoundingMode.HALF_UP);
    }
}

13.4 Utilisation dans un Worker

@Component
public class TradingWorker implements Worker {

    @Autowired
    private JaninoWorker janinoWorker;

    @Override
    public void doWork() {
        Map<String, Object> context = Map.of(
            "amount", new BigDecimal("1000.00")
        );

        BigDecimal fee = janinoWorker.execute(
            "KrakenFeeCalculator",
            context,
            BigDecimal.class
        );

        log.info("Fee calculated: {}", fee);
    }
}

13.5 Acces direct au moteur

@Autowired
private JaninoWorker janinoWorker;

// Compiler un script a la volee
janinoWorker.compileScript("MyScript", sourceCode);

// Verifier si compile
boolean ready = janinoWorker.isScriptCompiled("MyScript");

// Forcer le rechargement
janinoWorker.forceReload();

// Acces au moteur
JaninoEngine engine = janinoWorker.getEngine();

13.6 Securite

Janino bloque l’acces aux packages dangereux :

  • java.io – Acces fichiers
  • java.net – Acces reseau
  • java.lang.reflect – Reflection
  • sun.*, com.sun.* – Classes internes

13.7 Comparaison avec ScriptEngine

Critere ScriptEngine JaninoWorker
Langages JavaScript, BeanShell Java pur
Performance Interprete Compile (natif JVM)
Typage Dynamique Statique
Hot reload Non Oui (automatique)
Use case Prototypage, scripts simples Calculs haute perf

14. Bonnes pratiques

DO

  • Implémenter stop() pour un arrêt gracieux
  • Utiliser isHealthy() pour signaler les problèmes
  • Logger les transitions d’état
  • Gérer les exceptions dans doWork()
  • Utiliser des priorités pour les dépendances

DON’T

  • Ne pas bloquer indéfiniment dans doWork()
  • Ne pas ignorer les signaux d’arrêt
  • Ne pas oublier de décrémenter les ressources dans stop()
  • Ne pas utiliser de variables statiques pour l’état

15. Références

Commentaires

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *