Socle V004 – Supervisor

08 – Supervisor

Version: 4.0.0
Date: 2025-12-09

1. Introduction

The Supervisor is the monitoring component of Socle V4. It tracks the health of the Workers through the heartbeats they send, and exposes health metrics.

Features

  • Collection of Worker heartbeats
  • Detection of failing Workers
  • Aggregation of the global health status
  • Exposure through a REST API and metrics
  • Integration with SharedDataRegistry

2. Architecture

┌──────────────────────────────────────────────────────────────┐
│                          Supervisor                          │
│                                                              │
│   ┌────────────────────┐    ┌────────────────────┐           │
│   │ HeartbeatCollector │    │ HealthAggregator   │           │
│   └─────────┬──────────┘    └─────────┬──────────┘           │
│             │                         │                      │
│             ▼                         ▼                      │
│   ┌────────────────────┐    ┌────────────────────┐           │
│   │ Worker States      │    │ Global Health      │           │
│   │ (per worker)       │    │ (aggregated)       │           │
│   └────────────────────┘    └────────────────────┘           │
│                                                              │
└──────────────────────────────────────────────────────────────┘
         │                           │
         ▼                           ▼
┌─────────────────┐         ┌─────────────────┐
│ /admin/health   │         │ /admin/workers  │
└─────────────────┘         └─────────────────┘

3. Configuration

3.1 application.yml

socle:
  supervisor:
    heartbeat-interval-ms: ${SUPERVISOR_HEARTBEAT_MS:10000}
    unhealthy-threshold: ${SUPERVISOR_UNHEALTHY_THRESHOLD:3}
    check-interval-ms: ${SUPERVISOR_CHECK_INTERVAL_MS:5000}
    stale-timeout-ms: ${SUPERVISOR_STALE_TIMEOUT_MS:60000}

3.2 Environment variables

Variable                         Description                             Default
SUPERVISOR_HEARTBEAT_MS          Expected heartbeat interval             10000 (10 s)
SUPERVISOR_UNHEALTHY_THRESHOLD   Missed heartbeats before UNHEALTHY      3
SUPERVISOR_CHECK_INTERVAL_MS     Interval between checks                 5000 (5 s)
SUPERVISOR_STALE_TIMEOUT_MS      Time without heartbeat before STALE     60000 (1 min)
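To see how these four values interact, the sketch below derives when each status is reached after a worker's last heartbeat. It assumes, as the table describes, that a heartbeat counts as missed once per elapsed heartbeat interval. SupervisorTimings is a hypothetical helper for illustration, not part of Socle.

```java
import java.time.Duration;

/**
 * Hypothetical helper (not part of Socle): when each status is reached after the
 * last heartbeat, for a given supervisor configuration.
 */
record SupervisorTimings(long heartbeatIntervalMs, int unhealthyThreshold,
                         long checkIntervalMs, long staleTimeoutMs) {

    /** The defaults from application.yml. */
    static SupervisorTimings defaults() {
        return new SupervisorTimings(10_000, 3, 5_000, 60_000);
    }

    /** One whole missed interval is enough to leave RUNNING. */
    Duration degradedAfter() {
        return Duration.ofMillis(heartbeatIntervalMs);
    }

    /** unhealthy-threshold missed intervals lead to UNHEALTHY. */
    Duration unhealthyAfter() {
        return Duration.ofMillis(heartbeatIntervalMs * unhealthyThreshold);
    }

    /** STALE once the silence exceeds stale-timeout-ms. */
    Duration staleAfter() {
        return Duration.ofMillis(staleTimeoutMs);
    }

    /** Checks run every check-interval-ms, so a transition is seen up to this much later. */
    Duration worstCaseDetectionDelay() {
        return Duration.ofMillis(checkIntervalMs);
    }

    /** If STALE is not strictly later than UNHEALTHY, a silent worker effectively skips UNHEALTHY. */
    boolean isConsistent() {
        return staleAfter().compareTo(unhealthyAfter()) > 0;
    }
}
```

With the defaults, a silent worker becomes DEGRADED after 10 s, UNHEALTHY after 30 s and STALE after 60 s, each transition being observed up to one check interval (5 s) later. If stale-timeout-ms is not greater than unhealthy-threshold × heartbeat-interval-ms, a silent worker goes straight to STALE and UNHEALTHY is effectively never observed.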

4. Supervisor interface

package eu.lmvi.socle.supervisor;

import java.util.List;
import java.util.Map;

public interface Supervisor {

    /**
     * Registers a worker with the supervisor
     */
    void registerWorker(Worker worker);

    /**
     * Unregisters a worker
     */
    void unregisterWorker(String workerName);

    /**
     * Receives a heartbeat from a worker
     */
    void heartbeat(String workerName);

    /**
     * Receives a heartbeat along with metrics
     */
    void heartbeat(String workerName, Map<String, Object> metrics);

    /**
     * Returns the state of a worker, or null if it is not registered
     */
    WorkerState getWorkerState(String workerName);

    /**
     * Returns the state of every worker, keyed by worker name
     */
    Map<String, WorkerState> getAllWorkerStates();

    /**
     * Checks whether a worker is healthy (status RUNNING)
     */
    boolean isWorkerHealthy(String workerName);

    /**
     * Returns the global health status
     */
    HealthStatus getGlobalHealth();

    /**
     * Lists the workers that are UNHEALTHY or STALE
     */
    List<String> getUnhealthyWorkers();

    /**
     * Starts supervision
     */
    void start();

    /**
     * Stops supervision
     */
    void stop();
}

5. Worker states

5.1 WorkerState

package eu.lmvi.socle.supervisor;

import java.time.Instant;
import java.util.Map;

public record WorkerState(
    String workerName,
    WorkerStatus status,
    Instant lastHeartbeat,
    int missedHeartbeats,
    Map<String, Object> lastMetrics,
    Instant registeredAt
) {
    public boolean isHealthy() {
        return status == WorkerStatus.RUNNING;
    }

    public boolean isStale() {
        return status == WorkerStatus.STALE;
    }
}

5.2 WorkerStatus

public enum WorkerStatus {
    /**
     * Worker registered but not yet started
     */
    REGISTERED,

    /**
     * Worker running, heartbeats being received
     */
    RUNNING,

    /**
     * At least one heartbeat missed, but fewer than the unhealthy threshold
     */
    DEGRADED,

    /**
     * Too many missed heartbeats, worker considered unhealthy
     */
    UNHEALTHY,

    /**
     * No heartbeat for longer than the stale timeout, worker possibly dead
     */
    STALE,

    /**
     * Worker stopped cleanly
     */
    STOPPED
}

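5.3 State diagram

In the implementation in section 6, a heartbeat received in any state (including UNHEALTHY and STALE) moves the worker straight back to RUNNING, and a REGISTERED worker that never sends a heartbeat follows the same missed-heartbeat path as a RUNNING one. None of the Supervisor methods sets STOPPED: unregisterWorker() removes the worker from supervision entirely.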

                   ┌────────────────┐
                   │   REGISTERED   │
                   └───────┬────────┘
                           │ first heartbeat
                           ▼
                   ┌────────────────┐
         ┌────────►│    RUNNING     │─────────┐
         │         └───────┬────────┘         │
         │                 │                  │
         │ heartbeat       │ missed           │ stop()
         │ received        │ heartbeat        │
         │                 ▼                  │
         │         ┌────────────────┐         │
         └─────────│   DEGRADED     │         │
                   └───────┬────────┘         │
                           │ threshold        │
                           │ exceeded         │
                           ▼                  │
                   ┌────────────────┐         │
                   │   UNHEALTHY    │         │
                   └───────┬────────┘         │
                           │ stale            │
                           │ timeout          │
                           ▼                  │
                   ┌────────────────┐         │
                   │     STALE      │         │
                   └────────────────┘         │
                                              │
                   ┌────────────────┐         │
                   │    STOPPED     │◄────────┘
                   └────────────────┘
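The transition rules can be isolated from threads, maps and clocks as two pure functions, which makes them easy to unit test. This is a sketch, not the Socle API: WorkerTransitions is hypothetical, the enum is redeclared so the block compiles on its own, missed heartbeats are counted as whole elapsed intervals, and STOPPED is treated as terminal, as the diagram suggests.

```java
/** Redeclared from section 5.2 so that this block compiles on its own. */
enum WorkerStatus { REGISTERED, RUNNING, DEGRADED, UNHEALTHY, STALE, STOPPED }

/** Hypothetical helper: the transition rules as pure functions (not part of Socle). */
final class WorkerTransitions {

    private WorkerTransitions() {}

    /** A heartbeat brings the worker back to RUNNING; STOPPED is kept terminal. */
    static WorkerStatus onHeartbeat(WorkerStatus current) {
        return current == WorkerStatus.STOPPED ? WorkerStatus.STOPPED : WorkerStatus.RUNNING;
    }

    /** Status after a periodic check, from the time elapsed since the last heartbeat. */
    static WorkerStatus onCheck(WorkerStatus current, long msSinceLastHeartbeat,
                                long heartbeatIntervalMs, int unhealthyThreshold,
                                long staleTimeoutMs) {
        if (current == WorkerStatus.STOPPED) {
            return current;
        }
        if (msSinceLastHeartbeat > staleTimeoutMs) {
            return WorkerStatus.STALE;
        }
        // Missed heartbeats = whole heartbeat intervals elapsed since the last one
        long missed = msSinceLastHeartbeat / heartbeatIntervalMs;
        if (missed >= unhealthyThreshold) {
            return WorkerStatus.UNHEALTHY;
        }
        if (missed > 0) {
            return WorkerStatus.DEGRADED;
        }
        return current;
    }
}
```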

6. Implementation

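package eu.lmvi.socle.supervisor;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;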

@Component
public class DefaultSupervisor implements Supervisor {

    private static final Logger log = LoggerFactory.getLogger(DefaultSupervisor.class);

    private final SocleConfiguration config;
    private final ConcurrentHashMap<String, WorkerStateInternal> workers = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler;
    private volatile boolean running = false;

    public DefaultSupervisor(SocleConfiguration config) {
        this.config = config;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(
            r -> new Thread(r, "supervisor-checker"));
    }

    @Override
    public void registerWorker(Worker worker) {
        String name = worker.getName();
        workers.put(name, new WorkerStateInternal(
            name, WorkerStatus.REGISTERED, Instant.now(), 0, Map.of(), Instant.now()));
        log.info("Worker registered: {}", name);
    }

    @Override
    public void unregisterWorker(String workerName) {
        WorkerStateInternal state = workers.remove(workerName);
        if (state != null) {
            log.info("Worker unregistered: {}", workerName);
        }
    }

    @Override
    public void heartbeat(String workerName) {
        heartbeat(workerName, Map.of());
    }

    @Override
    public void heartbeat(String workerName, Map<String, Object> metrics) {
        workers.computeIfPresent(workerName, (name, state) -> {
            log.debug("Heartbeat received: {}", name);
            return new WorkerStateInternal(
                name,
                WorkerStatus.RUNNING,
                Instant.now(),
                0,
                metrics,
                state.registeredAt
            );
        });
    }

    @Override
    public WorkerState getWorkerState(String workerName) {
        WorkerStateInternal internal = workers.get(workerName);
        return internal != null ? internal.toPublic() : null;
    }

    @Override
    public Map<String, WorkerState> getAllWorkerStates() {
        return workers.entrySet().stream()
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> e.getValue().toPublic()
            ));
    }

    @Override
    public boolean isWorkerHealthy(String workerName) {
        WorkerStateInternal state = workers.get(workerName);
        return state != null && state.status == WorkerStatus.RUNNING;
    }

    @Override
    public HealthStatus getGlobalHealth() {
        if (workers.isEmpty()) {
            return HealthStatus.HEALTHY;
        }

        boolean hasUnhealthy = workers.values().stream()
            .anyMatch(s -> s.status == WorkerStatus.UNHEALTHY || s.status == WorkerStatus.STALE);

        if (hasUnhealthy) {
            return HealthStatus.UNHEALTHY;
        }

        boolean hasDegraded = workers.values().stream()
            .anyMatch(s -> s.status == WorkerStatus.DEGRADED);

        if (hasDegraded) {
            return HealthStatus.DEGRADED;
        }

        return HealthStatus.HEALTHY;
    }

    @Override
    public List<String> getUnhealthyWorkers() {
        return workers.entrySet().stream()
            .filter(e -> e.getValue().status == WorkerStatus.UNHEALTHY
                      || e.getValue().status == WorkerStatus.STALE)
            .map(Map.Entry::getKey)
            .toList();
    }

    @Override
    public void start() {
        running = true;
        long checkInterval = config.getSupervisor().getCheckIntervalMs();
        scheduler.scheduleAtFixedRate(
            this::checkWorkers,
            checkInterval,
            checkInterval,
            TimeUnit.MILLISECONDS
        );
        log.info("Supervisor started with check interval: {}ms", checkInterval);
    }

    @Override
    public void stop() {
        running = false;
        scheduler.shutdown();
        log.info("Supervisor stopped");
    }

    private void checkWorkers() {
        if (!running) return;

        try {
            long heartbeatInterval = config.getSupervisor().getHeartbeatIntervalMs();
            int unhealthyThreshold = config.getSupervisor().getUnhealthyThreshold();
            long staleTimeout = config.getSupervisor().getStaleTimeoutMs();

            Instant now = Instant.now();

            workers.replaceAll((name, state) -> {
                if (state.status == WorkerStatus.STOPPED) {
                    return state;
                }

                long msSinceLastHeartbeat = Duration.between(state.lastHeartbeat, now).toMillis();

                // Stale check: no heartbeat for longer than stale-timeout-ms
                if (msSinceLastHeartbeat > staleTimeout) {
                    if (state.status != WorkerStatus.STALE) {
                        log.warn("Worker STALE: {} (no heartbeat for {}ms)", name, msSinceLastHeartbeat);
                    }
                    return state.withStatus(WorkerStatus.STALE);
                }

                // Missed heartbeat check: derive the count from the elapsed time rather than
                // incrementing it on every check, because checks (5 s by default) run more
                // often than heartbeats are expected (10 s by default)
                int missed = (int) (msSinceLastHeartbeat / heartbeatInterval);
                if (missed == 0) {
                    return state;
                }
                if (missed >= unhealthyThreshold) {
                    if (state.status != WorkerStatus.UNHEALTHY) {
                        log.warn("Worker UNHEALTHY: {} (missed {} heartbeats)", name, missed);
                    }
                    return state.withStatus(WorkerStatus.UNHEALTHY).withMissedCount(missed);
                }
                if (state.status == WorkerStatus.RUNNING) {
                    log.info("Worker DEGRADED: {} (missed {} heartbeats)", name, missed);
                }
                return state.withStatus(WorkerStatus.DEGRADED).withMissedCount(missed);
            });
        } catch (RuntimeException e) {
            // An exception escaping this method would cancel all future scheduled checks
            log.error("Supervisor check failed", e);
        }
    }

    private record WorkerStateInternal(
        String workerName,
        WorkerStatus status,
        Instant lastHeartbeat,
        int missedHeartbeats,
        Map<String, Object> lastMetrics,
        Instant registeredAt
    ) {
        WorkerState toPublic() {
            return new WorkerState(workerName, status, lastHeartbeat, missedHeartbeats, lastMetrics, registeredAt);
        }

        WorkerStateInternal withStatus(WorkerStatus newStatus) {
            return new WorkerStateInternal(workerName, newStatus, lastHeartbeat, missedHeartbeats, lastMetrics, registeredAt);
        }

        WorkerStateInternal withMissedCount(int newCount) {
            return new WorkerStateInternal(workerName, status, lastHeartbeat, newCount, lastMetrics, registeredAt);
        }
    }
}
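checkWorkers() runs through scheduleAtFixedRate. The ScheduledExecutorService contract states that if any execution of such a task throws, subsequent executions are suppressed, so a single unexpected exception (for example an ArithmeticException caused by a heartbeat interval configured as 0) would silently end all supervision. The plain-JDK demonstration below, built around a hypothetical FixedRateFailureDemo class, runs a task that always fails, with and without a try/catch around its body.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class: counts how often a failing fixed-rate task actually runs. */
final class FixedRateFailureDemo {

    private FixedRateFailureDemo() {}

    static int countRuns(boolean guarded, long periodMs, long observeMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();

        // Stand-in for a checkWorkers() body that throws on every run
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("simulated failure");
        };

        // Guarded variant: the exception is caught, so the schedule survives it
        Runnable guardedTask = () -> {
            try {
                failing.run();
            } catch (RuntimeException e) {
                // DefaultSupervisor would log the exception here
            }
        };

        scheduler.scheduleAtFixedRate(guarded ? guardedTask : failing,
            0, periodMs, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(observeMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }
}
```

The unguarded task runs exactly once, while the guarded one keeps running at every period. Wrapping the body of checkWorkers() in a try/catch that logs the exception therefore keeps the supervisor checking after a failure.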

7. Sending heartbeats from Workers

7.1 Manual heartbeat

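import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

@Component
public class MyWorker implements Worker {

    @Autowired
    private Supervisor supervisor;

    // Illustrative state feeding the heartbeat metrics (hypothetical fields)
    private final AtomicLong processedCount = new AtomicLong();
    private final Queue<Object> queue = new ConcurrentLinkedQueue<>();

    @Override
    public void doWork() {
        // Send a heartbeat with metrics (Map.of rejects null values)
        supervisor.heartbeat(getName(), Map.of(
            "processed", processedCount.get(),
            "queueSize", queue.size()
        ));

        // Processing...
    }
}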

7.2 Automatic heartbeat via AbstractWorker

public abstract class AbstractWorker implements Worker {

    @Autowired
    private Supervisor supervisor;

    @Override
    public final void doWork() {
        // Automatic heartbeat before each iteration
        supervisor.heartbeat(getName(), getStats());

        // Delegate to the actual processing
        doProcess();
    }

    protected abstract void doProcess();
}
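Declaring doWork() as final applies the template method pattern: subclasses supply doProcess() and cannot skip the heartbeat. The Spring-free sketch below shows the same structure with a BiConsumer standing in for Supervisor.heartbeat(); HeartbeatingWorker, CountingWorker and the default getStats() are hypothetical names chosen for the illustration.

```java
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical, Spring-free stand-in for AbstractWorker. */
abstract class HeartbeatingWorker {

    private final String name;
    private final BiConsumer<String, Map<String, Object>> heartbeatSink;

    protected HeartbeatingWorker(String name, BiConsumer<String, Map<String, Object>> heartbeatSink) {
        this.name = name;
        this.heartbeatSink = heartbeatSink;
    }

    /** Final, so subclasses cannot bypass the heartbeat. */
    public final void doWork() {
        heartbeatSink.accept(name, getStats());
        doProcess();
    }

    /** Metrics attached to each heartbeat; empty by default. */
    protected Map<String, Object> getStats() {
        return Map.of();
    }

    protected abstract void doProcess();
}

/** Example subclass that only implements the business logic. */
final class CountingWorker extends HeartbeatingWorker {

    private long processed;

    CountingWorker(BiConsumer<String, Map<String, Object>> sink) {
        super("counting-worker", sink);
    }

    @Override
    protected Map<String, Object> getStats() {
        return Map.of("processed", processed);
    }

    @Override
    protected void doProcess() {
        processed++;
    }
}
```

Since the heartbeat is sent before doProcess(), the metrics it carries describe the state at the end of the previous iteration: after three calls to doWork(), the sink has received processed = 0, 1 and 2.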

7.3 Heartbeat from a dedicated thread

@Component
public class LongRunningWorker implements Worker {

    @Autowired
    private Supervisor supervisor;

    private ScheduledExecutorService heartbeatExecutor;

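    @Override
    public void start() {
        // Named daemon thread: easy to spot in thread dumps, never blocks JVM shutdown
        heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, getName() + "-heartbeat");
            t.setDaemon(true);
            return t;
        });
        // 10 s period, matching the default SUPERVISOR_HEARTBEAT_MS
        heartbeatExecutor.scheduleAtFixedRate(
            () -> supervisor.heartbeat(getName()),
            0, 10, TimeUnit.SECONDS
        );
    }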

    @Override
    public void stop() {
        if (heartbeatExecutor != null) {
            heartbeatExecutor.shutdown();
        }
    }

    @Override
    public void doWork() {
        // Long processing - heartbeat handled by separate thread
        processLongTask();
    }
}
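The point of a dedicated heartbeat thread is that heartbeats keep flowing while the worker thread is busy. The JDK-only sketch below starts a daemon scheduler and keeps the calling thread in a busy loop; BackgroundHeartbeat is a hypothetical class, and the Runnable stands in for supervisor.heartbeat(getName()).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch (not part of Socle): sends heartbeats from a dedicated daemon
 * thread at a fixed period until closed.
 */
final class BackgroundHeartbeat implements AutoCloseable {

    private final ScheduledExecutorService executor;

    BackgroundHeartbeat(Runnable sendHeartbeat, long periodMs) {
        this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "heartbeat-sender");
            t.setDaemon(true); // does not keep the JVM alive on its own
            return t;
        });
        executor.scheduleAtFixedRate(() -> {
            try {
                sendHeartbeat.run();
            } catch (RuntimeException e) {
                // An escaping exception would cancel every later heartbeat
            }
        }, 0, periodMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

Note the trade-off of this design: because the heartbeat no longer depends on the work loop, a task that is stuck (rather than merely slow) is still reported as RUNNING.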

8. REST API

8.1 Endpoints

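import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/admin")
public class SupervisorController {

    /** Body of GET /admin/health, with the fields shown in section 8.2 */
    public record HealthResponse(HealthStatus status, List<String> unhealthyWorkers) {}

    @Autowired
    private Supervisor supervisor;

    @GetMapping("/health")
    public ResponseEntity<HealthResponse> health() {
        HealthStatus status = supervisor.getGlobalHealth();
        // 200 only when HEALTHY: DEGRADED and UNHEALTHY both answer 503
        return ResponseEntity
            .status(status == HealthStatus.HEALTHY ? 200 : 503)
            .body(new HealthResponse(status, supervisor.getUnhealthyWorkers()));
    }

    @GetMapping("/workers")
    public Map<String, WorkerState> workers() {
        return supervisor.getAllWorkerStates();
    }

    @GetMapping("/workers/{name}")
    public ResponseEntity<WorkerState> worker(@PathVariable String name) {
        // getWorkerState() returns null for an unknown worker
        WorkerState state = supervisor.getWorkerState(name);
        return state != null
            ? ResponseEntity.ok(state)
            : ResponseEntity.notFound().build();
    }
}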

8.2 Responses

// GET /admin/health
{
  "status": "HEALTHY",
  "unhealthyWorkers": []
}

// GET /admin/workers
{
  "kafka-consumer": {
    "workerName": "kafka-consumer",
    "status": "RUNNING",
    "lastHeartbeat": "2025-12-09T10:30:00Z",
    "missedHeartbeats": 0,
    "lastMetrics": {
      "processed": 12345,
      "lag": 23
    },
    "registeredAt": "2025-12-09T10:00:00Z"
  },
  "order-processor": {
    "workerName": "order-processor",
    "status": "DEGRADED",
    "lastHeartbeat": "2025-12-09T10:29:45Z",
    "missedHeartbeats": 1,
    "lastMetrics": {},
    "registeredAt": "2025-12-09T10:00:00Z"
  }
}

9. Kubernetes integration

9.1 Liveness Probe

livenessProbe:
  httpGet:
    path: /admin/health
    port: 8080
  initialDelaySeconds: 30
  periodSeconds: 10
  failureThreshold: 3

9.2 Readiness Probe

readinessProbe:
  httpGet:
    path: /admin/health
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 5

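9.3 Adapted health controller

Because /admin/health answers 503 whenever the global status is not HEALTHY, DEGRADED included, using it for the liveness probe above means Kubernetes restarts the pod after failureThreshold consecutive failures (3 × 10 s), even if the workers would have recovered on their own. Splitting the checks avoids this: liveness only confirms that the application responds, while a failing readiness probe removes the pod from the Service endpoints without restarting it. Added to SupervisorController, these methods are served under /admin/health/live and /admin/health/ready, which is where the probe paths should then point.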

@GetMapping("/health/live")
public ResponseEntity<Void> live() {
    // Liveness: the application responds
    return ResponseEntity.ok().build();
}

@GetMapping("/health/ready")
public ResponseEntity<Void> ready() {
    // Readiness: global status is HEALTHY (no DEGRADED, UNHEALTHY or STALE worker)
    HealthStatus status = supervisor.getGlobalHealth();
    return status == HealthStatus.HEALTHY
        ? ResponseEntity.ok().build()
        : ResponseEntity.status(503).build();
}

10. Prometheus metrics

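import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.annotation.PostConstruct; // javax.annotation.PostConstruct before Spring Boot 3
import org.springframework.stereotype.Component;

@Component
public class SupervisorMetrics {

    private final Supervisor supervisor;
    private final MeterRegistry registry;

    // Constructor injection, required for the final fields to be initialized
    public SupervisorMetrics(Supervisor supervisor, MeterRegistry registry) {
        this.supervisor = supervisor;
        this.registry = registry;
    }

    @PostConstruct
    public void registerMetrics() {
        // All registered workers, whatever their status
        Gauge.builder("socle_workers_total", supervisor,
            s -> s.getAllWorkerStates().size())
            .register(registry);

        // Workers in RUNNING status (WorkerState.isHealthy())
        Gauge.builder("socle_workers_healthy", supervisor,
            s -> s.getAllWorkerStates().values().stream()
                .filter(WorkerState::isHealthy).count())
            .register(registry);

        // Workers in UNHEALTHY or STALE status
        Gauge.builder("socle_workers_unhealthy", supervisor,
            s -> s.getUnhealthyWorkers().size())
            .register(registry);
    }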
}
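socle_workers_healthy counts only RUNNING workers (WorkerState.isHealthy()), and socle_workers_unhealthy counts UNHEALTHY and STALE workers (getUnhealthyWorkers()), so the two do not add up to socle_workers_total: REGISTERED, DEGRADED and STOPPED workers fall in neither. The plain-Java sketch below reproduces that arithmetic; GaugeValues is a hypothetical record, and the enum is redeclared so the block compiles on its own.

```java
import java.util.List;

/** Redeclared from section 5.2 so that this block compiles on its own. */
enum WorkerStatus { REGISTERED, RUNNING, DEGRADED, UNHEALTHY, STALE, STOPPED }

/** Hypothetical record: the values the three gauges would report. */
record GaugeValues(long total, long healthy, long unhealthy) {

    /** Same rules as the gauges: healthy = RUNNING, unhealthy = UNHEALTHY or STALE. */
    static GaugeValues from(List<WorkerStatus> statuses) {
        long healthy = statuses.stream().filter(s -> s == WorkerStatus.RUNNING).count();
        long unhealthy = statuses.stream()
            .filter(s -> s == WorkerStatus.UNHEALTHY || s == WorkerStatus.STALE)
            .count();
        return new GaugeValues(statuses.size(), healthy, unhealthy);
    }

    /** Workers counted by neither gauge: REGISTERED, DEGRADED or STOPPED. */
    long inBetween() {
        return total - healthy - unhealthy;
    }
}
```

A dashboard that needs the DEGRADED and not-yet-started workers can derive them as total minus healthy minus unhealthy.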

11. Best practices

DO

  • Send regular heartbeats from every active worker
  • Include useful metrics in heartbeats
  • Tune the timeouts to the behaviour of your workers
  • Use Kubernetes probes for high availability

DON’T

  • Forget to send heartbeats from long-running workers
  • Ignore DEGRADED states
  • Set timeouts that are too short (false positives)
  • Let heavy processing block heartbeat sending

