08 – Supervisor
Version: 4.0.0 Date: 2025-12-09
1. Introduction
The Supervisor is the monitoring component of Socle V4. It tracks the health of Workers through heartbeats and exposes health metrics.
Features
- Collects heartbeats from Workers
- Detects failing Workers
- Aggregates the global health status
- Exposes health through a REST API and metrics
- Integrates with SharedDataRegistry
2. Architecture
┌─────────────────────────────────────────────────────────────┐
│                         Supervisor                          │
│                                                             │
│  ┌────────────────────┐        ┌────────────────────┐       │
│  │ HeartbeatCollector │        │  HealthAggregator  │       │
│  └─────────┬──────────┘        └─────────┬──────────┘       │
│            │                             │                  │
│            ▼                             ▼                  │
│  ┌────────────────────┐        ┌────────────────────┐       │
│  │   Worker States    │        │   Global Health    │       │
│  │    (per worker)    │        │    (aggregated)    │       │
│  └────────────────────┘        └────────────────────┘       │
│                                                             │
└─────────────────────────────────────────────────────────────┘
             │                             │
             ▼                             ▼
    ┌─────────────────┐           ┌─────────────────┐
    │  /admin/health  │           │ /admin/workers  │
    └─────────────────┘           └─────────────────┘
3. Configuration
3.1 application.yml
socle:
  supervisor:
    heartbeat-interval-ms: ${SUPERVISOR_HEARTBEAT_MS:10000}
    unhealthy-threshold: ${SUPERVISOR_UNHEALTHY_THRESHOLD:3}
    check-interval-ms: ${SUPERVISOR_CHECK_INTERVAL_MS:5000}
    stale-timeout-ms: ${SUPERVISOR_STALE_TIMEOUT_MS:60000}
3.2 Environment variables
| Variable | Description | Default |
|---|---|---|
| SUPERVISOR_HEARTBEAT_MS | Expected heartbeat interval | 10000 (10 s) |
| SUPERVISOR_UNHEALTHY_THRESHOLD | Missed heartbeats before a worker is marked UNHEALTHY | 3 |
| SUPERVISOR_CHECK_INTERVAL_MS | Interval between supervision checks | 5000 (5 s) |
| SUPERVISOR_STALE_TIMEOUT_MS | Time without a heartbeat before a worker is marked STALE | 60000 (1 min) |
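To see how these settings interact, the sketch below derives approximate detection delays from the default values. It assumes that a worker becomes UNHEALTHY once `unhealthy-threshold` whole heartbeat intervals have elapsed without a heartbeat, and that the supervisor may need up to one extra check interval to notice. `SupervisorTimings` is a hypothetical helper for illustration, not part of Socle.

```java
// Hypothetical helper (not part of Socle): derives detection delays from the settings.
record SupervisorTimings(long heartbeatIntervalMs, int unhealthyThreshold,
                         long checkIntervalMs, long staleTimeoutMs) {

    /** Silence, in ms, after which a worker qualifies as UNHEALTHY (assumed rule). */
    long unhealthyAfterMs() {
        return heartbeatIntervalMs * unhealthyThreshold;
    }

    /** Approximate worst case: the periodic check may run up to one interval later. */
    long worstCaseUnhealthyDetectionMs() {
        return unhealthyAfterMs() + checkIntervalMs;
    }

    /** STALE is only a distinct stage if its timeout exceeds the UNHEALTHY delay. */
    boolean staleTimeoutIsMeaningful() {
        return staleTimeoutMs > unhealthyAfterMs();
    }

    /** Default values taken from the table above. */
    static SupervisorTimings defaults() {
        return new SupervisorTimings(10_000, 3, 5_000, 60_000);
    }

    public static void main(String[] args) {
        SupervisorTimings t = defaults();
        System.out.println("UNHEALTHY after " + t.unhealthyAfterMs() + " ms of silence");
        System.out.println("Detected within about " + t.worstCaseUnhealthyDetectionMs() + " ms");
        System.out.println("STALE is a distinct stage: " + t.staleTimeoutIsMeaningful());
    }
}
```

Under these assumptions the defaults give 30 s of silence before UNHEALTHY and 60 s before STALE, so a stale timeout at or below `heartbeat-interval-ms × unhealthy-threshold` would skip the UNHEALTHY stage entirely.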
4. Supervisor interface
package eu.lmvi.socle.supervisor;

import java.util.List;
import java.util.Map;

public interface Supervisor {

    /** Registers a worker with the supervisor. */
    void registerWorker(Worker worker);

    /** Unregisters a worker. */
    void unregisterWorker(String workerName);

    /** Receives a heartbeat from a worker. */
    void heartbeat(String workerName);

    /** Receives a heartbeat together with the worker's metrics. */
    void heartbeat(String workerName, Map<String, Object> metrics);

    /** Returns the state of a worker. */
    WorkerState getWorkerState(String workerName);

    /** Returns the state of every registered worker, keyed by worker name. */
    Map<String, WorkerState> getAllWorkerStates();

    /** Checks whether a worker is healthy. */
    boolean isWorkerHealthy(String workerName);

    /** Returns the aggregated global health status. */
    HealthStatus getGlobalHealth();

    /** Lists the names of the unhealthy workers. */
    List<String> getUnhealthyWorkers();

    /** Starts supervision. */
    void start();

    /** Stops supervision. */
    void stop();
}
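5. Worker states
5.1 WorkerState
package eu.lmvi.socle.supervisor;

import java.time.Instant;
import java.util.Map;

public record WorkerState(
        String workerName,
        WorkerStatus status,
        Instant lastHeartbeat,
        int missedHeartbeats,
        Map<String, Object> lastMetrics,
        Instant registeredAt
) {
    /** True only while the worker is RUNNING, i.e. heartbeats arrive on time. */
    public boolean isHealthy() {
        return status == WorkerStatus.RUNNING;
    }

    /** True once the worker has been silent for longer than the stale timeout. */
    public boolean isStale() {
        return status == WorkerStatus.STALE;
    }
}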
5.2 WorkerStatus
public enum WorkerStatus {

    /** Worker registered but not yet started. */
    REGISTERED,

    /** Worker running, heartbeats received. */
    RUNNING,

    /** Heartbeats missed, but the unhealthy threshold has not been reached yet. */
    DEGRADED,

    /** Too many heartbeats missed; the worker is considered unhealthy. */
    UNHEALTHY,

    /** No heartbeat for a long time; the worker may be dead. */
    STALE,

    /** Worker stopped cleanly. */
    STOPPED
}
5.3 State diagram
                ┌────────────────┐
                │   REGISTERED   │
                └───────┬────────┘
                        │ first heartbeat
                        ▼
                ┌────────────────┐
      ┌────────►│    RUNNING     │─────────┐
      │         └───────┬────────┘         │
      │                 │                  │
      │ heartbeat       │ missed           │ stop()
      │ received        │ heartbeat        │
      │                 ▼                  │
      │         ┌────────────────┐         │
      └─────────│    DEGRADED    │         │
                └───────┬────────┘         │
                        │ threshold        │
                        │ exceeded         │
                        ▼                  │
                ┌────────────────┐         │
                │   UNHEALTHY    │         │
                └───────┬────────┘         │
                        │ stale            │
                        │ timeout          │
                        ▼                  │
                ┌────────────────┐         │
                │     STALE      │         │
                └────────────────┘         │
                                           │
                ┌────────────────┐         │
                │    STOPPED     │◄────────┘
                └────────────────┘
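The time-driven transitions in the diagram can be expressed as a pure function of the time elapsed since the last heartbeat. The sketch below is a minimal illustration, assuming that missed heartbeats are counted as whole elapsed heartbeat intervals. `HeartbeatClassifier`, its local `Status` enum, and the parameter names are hypothetical. REGISTERED and STOPPED are left out because they depend on lifecycle events rather than on elapsed time.

```java
import java.time.Duration;

// Hypothetical sketch: maps the duration of silence to a status, mirroring the diagram.
class HeartbeatClassifier {

    enum Status { RUNNING, DEGRADED, UNHEALTHY, STALE }

    /** Assumes heartbeatInterval is strictly positive. */
    static Status classify(Duration sinceLastHeartbeat, Duration heartbeatInterval,
                           int unhealthyThreshold, Duration staleTimeout) {
        // Silent for longer than the stale timeout: potentially dead.
        if (sinceLastHeartbeat.compareTo(staleTimeout) > 0) {
            return Status.STALE;
        }
        // Number of whole heartbeat intervals that passed without a heartbeat.
        long missed = sinceLastHeartbeat.toMillis() / heartbeatInterval.toMillis();
        if (missed >= unhealthyThreshold) {
            return Status.UNHEALTHY;
        }
        return missed > 0 ? Status.DEGRADED : Status.RUNNING;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofSeconds(10);
        Duration stale = Duration.ofSeconds(60);
        for (long seconds : new long[] {5, 15, 35, 61}) {
            Status status = classify(Duration.ofSeconds(seconds), interval, 3, stale);
            System.out.println(seconds + " s of silence -> " + status);
        }
    }
}
```

With a 10 s interval, a threshold of 3 and a 60 s stale timeout, 5 s of silence maps to RUNNING, 15 s to DEGRADED, 35 s to UNHEALTHY and 61 s to STALE.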
6. Implementation
package eu.lmvi.socle.supervisor;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

// Worker, SocleConfiguration and HealthStatus are Socle types defined elsewhere;
// their imports are omitted here.
@Component
public class DefaultSupervisor implements Supervisor {

    private static final Logger log = LoggerFactory.getLogger(DefaultSupervisor.class);

    private final SocleConfiguration config;
    private final ConcurrentHashMap<String, WorkerStateInternal> workers = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler;
    private volatile boolean running = false;

    public DefaultSupervisor(SocleConfiguration config) {
        this.config = config;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(
                r -> new Thread(r, "supervisor-checker"));
    }

    @Override
    public void registerWorker(Worker worker) {
        String name = worker.getName();
        Instant now = Instant.now();
        // The registration time doubles as the initial "last heartbeat" timestamp.
        workers.put(name, new WorkerStateInternal(
                name, WorkerStatus.REGISTERED, now, 0, Map.of(), now));
        log.info("Worker registered: {}", name);
    }

    @Override
    public void unregisterWorker(String workerName) {
        WorkerStateInternal state = workers.remove(workerName);
        if (state != null) {
            log.info("Worker unregistered: {}", workerName);
        }
    }

    @Override
    public void heartbeat(String workerName) {
        heartbeat(workerName, Map.of());
    }

    @Override
    public void heartbeat(String workerName, Map<String, Object> metrics) {
        // Heartbeats from unknown workers are ignored; any heartbeat resets the
        // worker to RUNNING with a missed count of zero.
        workers.computeIfPresent(workerName, (name, state) -> {
            log.debug("Heartbeat received: {}", name);
            return new WorkerStateInternal(
                    name,
                    WorkerStatus.RUNNING,
                    Instant.now(),
                    0,
                    metrics,
                    state.registeredAt
            );
        });
    }

    @Override
    public WorkerState getWorkerState(String workerName) {
        WorkerStateInternal internal = workers.get(workerName);
        return internal != null ? internal.toPublic() : null;
    }

    @Override
    public Map<String, WorkerState> getAllWorkerStates() {
        return workers.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> e.getValue().toPublic()
                ));
    }

    @Override
    public boolean isWorkerHealthy(String workerName) {
        WorkerStateInternal state = workers.get(workerName);
        return state != null && state.status == WorkerStatus.RUNNING;
    }

    @Override
    public HealthStatus getGlobalHealth() {
        // With no registered worker there is nothing to report as failing.
        if (workers.isEmpty()) {
            return HealthStatus.HEALTHY;
        }
        // The worst worker status wins: UNHEALTHY/STALE first, then DEGRADED.
        boolean hasUnhealthy = workers.values().stream()
                .anyMatch(s -> s.status == WorkerStatus.UNHEALTHY || s.status == WorkerStatus.STALE);
        if (hasUnhealthy) {
            return HealthStatus.UNHEALTHY;
        }
        boolean hasDegraded = workers.values().stream()
                .anyMatch(s -> s.status == WorkerStatus.DEGRADED);
        if (hasDegraded) {
            return HealthStatus.DEGRADED;
        }
        return HealthStatus.HEALTHY;
    }

    @Override
    public List<String> getUnhealthyWorkers() {
        return workers.entrySet().stream()
                .filter(e -> e.getValue().status == WorkerStatus.UNHEALTHY
                        || e.getValue().status == WorkerStatus.STALE)
                .map(Map.Entry::getKey)
                .toList();
    }

    @Override
    public void start() {
        running = true;
        long checkInterval = config.getSupervisor().getCheckIntervalMs();
        scheduler.scheduleAtFixedRate(
                this::checkWorkers,
                checkInterval,
                checkInterval,
                TimeUnit.MILLISECONDS
        );
        log.info("Supervisor started with check interval: {}ms", checkInterval);
    }

    @Override
    public void stop() {
        running = false;
        scheduler.shutdown();
        log.info("Supervisor stopped");
    }

    private void checkWorkers() {
        if (!running) return;
        long heartbeatInterval = config.getSupervisor().getHeartbeatIntervalMs();
        int unhealthyThreshold = config.getSupervisor().getUnhealthyThreshold();
        long staleTimeout = config.getSupervisor().getStaleTimeoutMs();
        Instant now = Instant.now();
        workers.replaceAll((name, state) -> {
            if (state.status == WorkerStatus.STOPPED) {
                return state;
            }
            long msSinceLastHeartbeat = Duration.between(state.lastHeartbeat, now).toMillis();

            // Stale check
            if (msSinceLastHeartbeat > staleTimeout) {
                if (state.status != WorkerStatus.STALE) {
                    log.warn("Worker STALE: {} (no heartbeat for {}ms)", name, msSinceLastHeartbeat);
                }
                return state.withStatus(WorkerStatus.STALE);
            }

            // Missed heartbeat check: derive the count from the elapsed time rather than
            // incrementing it on every check, so that it does not depend on check-interval-ms.
            int missedCount = (int) (msSinceLastHeartbeat / heartbeatInterval);
            if (missedCount > 0) {
                if (missedCount >= unhealthyThreshold) {
                    if (state.status != WorkerStatus.UNHEALTHY) {
                        log.warn("Worker UNHEALTHY: {} (missed {} heartbeats)", name, missedCount);
                    }
                    return state.withStatus(WorkerStatus.UNHEALTHY).withMissedCount(missedCount);
                }
                if (state.status == WorkerStatus.RUNNING) {
                    log.info("Worker DEGRADED: {} (missed {} heartbeats)", name, missedCount);
                }
                return state.withStatus(WorkerStatus.DEGRADED).withMissedCount(missedCount);
            }
            return state;
        });
    }

    // Immutable internal snapshot; every update replaces the whole map entry.
    private record WorkerStateInternal(
            String workerName,
            WorkerStatus status,
            Instant lastHeartbeat,
            int missedHeartbeats,
            Map<String, Object> lastMetrics,
            Instant registeredAt
    ) {
        WorkerState toPublic() {
            return new WorkerState(workerName, status, lastHeartbeat, missedHeartbeats, lastMetrics, registeredAt);
        }

        WorkerStateInternal withStatus(WorkerStatus newStatus) {
            return new WorkerStateInternal(workerName, newStatus, lastHeartbeat, missedHeartbeats, lastMetrics, registeredAt);
        }

        WorkerStateInternal withMissedCount(int newCount) {
            return new WorkerStateInternal(workerName, status, lastHeartbeat, newCount, lastMetrics, registeredAt);
        }
    }
}
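7. Sending heartbeats from Workers
7.1 Manual heartbeat
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MyWorker implements Worker {

    @Autowired
    private Supervisor supervisor;

    // processedCount and queue are fields of this worker (declarations omitted).

    @Override
    public void doWork() {
        // Send a heartbeat with metrics
        supervisor.heartbeat(getName(), Map.of(
                "processed", processedCount,
                "queueSize", queue.size()
        ));
        // Processing...
    }
}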
7.2 Automatic heartbeat via AbstractWorker
public abstract class AbstractWorker implements Worker {

    @Autowired
    private Supervisor supervisor;

    @Override
    public final void doWork() {
        // Automatic heartbeat, sent before each processing cycle
        supervisor.heartbeat(getName(), getStats());
        // Delegate to the actual processing
        doProcess();
    }

    protected abstract void doProcess();
}
7.3 Heartbeat from a dedicated thread
@Component
public class LongRunningWorker implements Worker {

    @Autowired
    private Supervisor supervisor;

    private ScheduledExecutorService heartbeatExecutor;

    @Override
    public void start() {
        heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
        // Send a heartbeat every 10 s, matching the default heartbeat-interval-ms
        heartbeatExecutor.scheduleAtFixedRate(
                () -> supervisor.heartbeat(getName()),
                0, 10, TimeUnit.SECONDS
        );
    }

    @Override
    public void stop() {
        if (heartbeatExecutor != null) {
            heartbeatExecutor.shutdown();
        }
    }

    @Override
    public void doWork() {
        // Long processing: the heartbeat is handled by the separate thread
        processLongTask();
    }
}
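A dedicated heartbeat thread is easier to operate when it is a daemon thread, survives exceptions thrown by the heartbeat call, and is stopped promptly on shutdown. The sketch below illustrates those three points in plain Java. `HeartbeatPump` is a hypothetical helper, and the `Consumer<String>` stands in for a call such as `supervisor.heartbeat(workerName)`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper: sends heartbeats on a dedicated daemon thread.
class HeartbeatPump implements AutoCloseable {

    private final ScheduledExecutorService executor;

    HeartbeatPump(String workerName, Consumer<String> heartbeatSink, long periodMs) {
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "heartbeat-" + workerName);
            t.setDaemon(true); // the heartbeat thread never keeps the JVM alive on its own
            return t;
        });
        executor.scheduleAtFixedRate(() -> {
            try {
                heartbeatSink.accept(workerName);
            } catch (RuntimeException e) {
                // Swallowed on purpose: an exception escaping the task would
                // suppress all subsequent scheduled executions.
            }
        }, 0, periodMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }

    /** Demo helper: runs a pump for a while and returns how many heartbeats were sent. */
    static int countHeartbeats(long periodMs, long runForMs) {
        AtomicInteger count = new AtomicInteger();
        try (HeartbeatPump pump = new HeartbeatPump("demo", name -> count.incrementAndGet(), periodMs)) {
            Thread.sleep(runForMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println("Heartbeats sent: " + countHeartbeats(20, 200));
    }
}
```

Note that a heartbeat sent from a separate thread only proves that the process is alive. If the worker thread itself is stuck, the supervisor will still see the worker as RUNNING.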
8. REST API
8.1 Endpoints
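import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/admin")
public class SupervisorController {

    @Autowired
    private Supervisor supervisor;

    // Returns 200 when the global status is HEALTHY, 503 otherwise (including DEGRADED)
    @GetMapping("/health")
    public ResponseEntity<HealthResponse> health() {
        HealthStatus status = supervisor.getGlobalHealth();
        return ResponseEntity
                .status(status == HealthStatus.HEALTHY ? 200 : 503)
                .body(new HealthResponse(status, supervisor.getUnhealthyWorkers()));
    }

    @GetMapping("/workers")
    public Map<String, WorkerState> workers() {
        return supervisor.getAllWorkerStates();
    }

    // Returns 404 when no worker is registered under the given name
    @GetMapping("/workers/{name}")
    public ResponseEntity<WorkerState> worker(@PathVariable String name) {
        WorkerState state = supervisor.getWorkerState(name);
        return state != null
                ? ResponseEntity.ok(state)
                : ResponseEntity.notFound().build();
    }
}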
8.2 Responses
// GET /admin/health
{
  "status": "HEALTHY",
  "unhealthyWorkers": []
}
// GET /admin/workers
{
  "kafka-consumer": {
    "workerName": "kafka-consumer",
    "status": "RUNNING",
    "lastHeartbeat": "2025-12-09T10:30:00Z",
    "missedHeartbeats": 0,
    "lastMetrics": {
      "processed": 12345,
      "lag": 23
    },
    "registeredAt": "2025-12-09T10:00:00Z"
  },
  "order-processor": {
    "workerName": "order-processor",
    "status": "DEGRADED",
    "lastHeartbeat": "2025-12-09T10:29:45Z",
    "missedHeartbeats": 1,
    "lastMetrics": {},
    "registeredAt": "2025-12-09T10:00:00Z"
  }
}
9. Kubernetes integration
9.1 Liveness Probe
livenessProbe:
  httpGet:
    path: /admin/health
    port: 8080
  initialDelaySeconds: 30
  periodSeconds: 10
  failureThreshold: 3
9.2 Readiness Probe
readinessProbe:
  httpGet:
    path: /admin/health
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 5
9.3 Adapted health controller
@GetMapping("/health/live")
public ResponseEntity<Void> live() {
    // Liveness: the application responds
    return ResponseEntity.ok().build();
}

@GetMapping("/health/ready")
public ResponseEntity<Void> ready() {
    // Readiness: all workers are healthy
    HealthStatus status = supervisor.getGlobalHealth();
    return status == HealthStatus.HEALTHY
            ? ResponseEntity.ok().build()
            : ResponseEntity.status(503).build();
}
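10. Prometheus metrics
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.annotation.PostConstruct; // javax.annotation.PostConstruct before Spring Boot 3
import org.springframework.stereotype.Component;

@Component
public class SupervisorMetrics {

    private final Supervisor supervisor;
    private final MeterRegistry registry;

    // Constructor injection: the final fields must be assigned for the class to compile
    public SupervisorMetrics(Supervisor supervisor, MeterRegistry registry) {
        this.supervisor = supervisor;
        this.registry = registry;
    }

    @PostConstruct
    public void registerMetrics() {
        Gauge.builder("socle_workers_total", supervisor,
                        s -> s.getAllWorkerStates().size())
                .register(registry);

        Gauge.builder("socle_workers_healthy", supervisor,
                        s -> s.getAllWorkerStates().values().stream()
                                .filter(WorkerState::isHealthy).count())
                .register(registry);

        Gauge.builder("socle_workers_unhealthy", supervisor,
                        s -> s.getUnhealthyWorkers().size())
                .register(registry);
    }
}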
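The healthy and unhealthy gauges do not distinguish DEGRADED, REGISTERED or STOPPED workers, so a per-status breakdown can be a useful complement on dashboards. The sketch below shows only the counting step in plain Java. `StatusCounts` is a hypothetical name and its local `Status` enum stands in for `WorkerStatus`. Wiring the result into gauges is left out.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical sketch: counts workers per status, e.g. to feed one gauge per status.
class StatusCounts {

    // Local stand-in for WorkerStatus so that the example is self-contained.
    enum Status { REGISTERED, RUNNING, DEGRADED, UNHEALTHY, STALE, STOPPED }

    /** Counts workers per status; statuses without any worker are reported as 0. */
    static Map<Status, Long> countByStatus(Map<String, Status> workerStatuses) {
        Map<Status, Long> counts = new EnumMap<>(Status.class);
        for (Status status : Status.values()) {
            counts.put(status, 0L);
        }
        for (Status status : workerStatuses.values()) {
            counts.merge(status, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Status> workers = Map.of(
                "kafka-consumer", Status.RUNNING,
                "order-processor", Status.DEGRADED);
        System.out.println(countByStatus(workers));
    }
}
```

Reporting explicit zeros keeps every per-status series present, so a dashboard shows 0 rather than a gap when no worker is in a given state.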
11. Best practices
DO
- Send regular heartbeats from every active worker
- Include useful metrics in heartbeats
- Configure timeouts that suit your workers
- Use Kubernetes probes for high availability
DON'T
- Forget to send heartbeats from long-running workers
- Ignore DEGRADED states
- Configure timeouts that are too short (false positives)
- Block heartbeat sending behind heavy processing
12. References
- 05-WORKERS – Workers
- 07-SHARED-DATA – SharedDataRegistry
- 14-ADMIN-API – Admin API
- 24-WORKER-REGISTRY – Central registry (V4)
