24 – Client Worker Registry (Nouveauté V4)
Version : 4.0.0 Date : 2025-12-09
1. Introduction
Le WorkerRegistryClient permet aux applications Socle V4 de s’auto-enregistrer auprès d’un Registry central pour la supervision.
Bénéfices
- Visibilité : Savoir quels workers sont actifs par région
- Supervision : Détecter les workers « LOST » (heartbeat manquant)
- Diagnostic : Informations version, capabilities, charge
2. Architecture
┌─────────────────┐                    ┌─────────────────┐
│   Application   │                    │    Registry     │
│    Socle V4     │                    │    (central)    │
└────────┬────────┘                    └────────┬────────┘
         │                                      │
         │  1. POST /workers/register           │
         │     (au démarrage)                   │
         │─────────────────────────────────────►│
         │                                      │
         │  2. POST /workers/heartbeat          │
         │     (toutes les 30s)                 │
         │─────────────────────────────────────►│
         │                                      │
         │  3. DELETE /workers/{id}             │
         │     (à l'arrêt)                      │
         │─────────────────────────────────────►│
         │                                      │
                                       ┌─────────────────┐
                                       │   Metabase /    │
                                       │     Grafana     │◄────── Consultation
                                       └─────────────────┘
3. Configuration
3.1 application.yml
# Worker Registry client settings. The nesting matters: the client bean is only
# created when the property socle.worker-registry.enabled resolves to "true".
socle:
  worker-registry:
    # Disabled unless WORKER_REGISTRY_ENABLED is set to true
    enabled: ${WORKER_REGISTRY_ENABLED:false}
    # Base URL of the central Registry
    server-url: ${WORKER_REGISTRY_URL:https://registry.lmvi.org}
    # Delay between two heartbeats, in milliseconds
    heartbeat-interval-ms: ${REGISTRY_HEARTBEAT_INTERVAL_MS:30000}
    # HTTP timeouts, in milliseconds
    connect-timeout-ms: 10000
    read-timeout-ms: 30000
3.2 Variables d’environnement
| Variable | Description | Défaut |
|---|---|---|
| `WORKER_REGISTRY_ENABLED` | Activer le registry | false |
| `WORKER_REGISTRY_URL` | URL du Registry | https://registry.lmvi.org |
| `REGISTRY_HEARTBEAT_INTERVAL_MS` | Intervalle heartbeat (ms) | 30000 |
4. Interface WorkerRegistryClient
package eu.lmvi.socle.client.registry;
/**
 * Registry client used by a worker to register itself with the central Registry.
 */
public interface WorkerRegistryClient {
    /**
     * Performs the initial registration at startup.
     *
     * @param registration information describing the worker
     * @throws RegistryException if the registration fails
     */
    void register(WorkerRegistration registration) throws RegistryException;

    /**
     * Sends a periodic heartbeat.
     *
     * @param heartbeat current state of the worker
     * @throws RegistryException if the heartbeat fails
     */
    void heartbeat(WorkerHeartbeat heartbeat) throws RegistryException;

    /**
     * Removes the worker from the Registry at shutdown.
     *
     * @param workerId identifier of the worker
     * @throws RegistryException if the removal fails
     */
    void unregister(String workerId) throws RegistryException;

    /**
     * Tells whether the worker is currently registered.
     */
    boolean isRegistered();
}
5. DTOs
5.1 WorkerRegistration
package eu.lmvi.socle.client.registry;
/**
 * Registration payload describing a worker to the Registry.
 *
 * <p>The canonical constructor normalises the two collection components: a {@code null}
 * list or map becomes an empty one, and non-null values are copied into unmodifiable
 * collections so that later changes made by the caller cannot alter the record.
 * As with {@link List#copyOf} and {@link Map#copyOf}, {@code null} elements, keys or
 * values are rejected with a {@link NullPointerException}.
 *
 * @param workerId     unique identifier (e.g. "AGENT-DB2-MTQ-001")
 * @param workerType   type of worker (e.g. "journal-reader")
 * @param region       region (e.g. "MTQ", "GUA", "REU")
 * @param host         hostname
 * @param version      application version
 * @param capabilities capabilities (e.g. ["db2-cdc", "nats-publisher"])
 * @param extra        additional metadata
 */
public record WorkerRegistration(
        String workerId,
        String workerType,
        String region,
        String host,
        String version,
        List<String> capabilities,
        Map<String, Object> extra
) {
    /** Replaces {@code null} collections by empty ones and takes defensive copies. */
    public WorkerRegistration {
        capabilities = capabilities == null ? List.of() : List.copyOf(capabilities);
        extra = extra == null ? Map.of() : Map.copyOf(extra);
    }

    /**
     * Builds a registration from the application configuration, with no capabilities
     * and no extra metadata.
     *
     * @param config configuration providing the execution id, application name, region and version
     * @return the registration for the local host
     */
    public static WorkerRegistration of(SocleConfiguration config) {
        return new WorkerRegistration(
                config.getExec_id(),
                config.getApp_name(),
                config.getRegion(),
                getHostname(),
                config.getVersion(),
                List.of(),
                Map.of()
        );
    }

    /** Returns the local hostname, or "unknown" when it cannot be resolved. */
    private static String getHostname() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown";
        }
    }
}
5.2 WorkerHeartbeat
package eu.lmvi.socle.client.registry;
/**
 * Periodic heartbeat sent by a worker.
 *
 * @param workerId identifier of the worker
 * @param status   one of RUNNING, STOPPING, ERROR
 * @param load     load metrics
 */
public record WorkerHeartbeat(
        String workerId,
        String status,
        Map<String, Object> load
) {
    /**
     * Creates a heartbeat with status RUNNING.
     *
     * @param workerId identifier of the worker
     * @param load     load metrics to report
     */
    public static WorkerHeartbeat running(String workerId, Map<String, Object> load) {
        return new WorkerHeartbeat(workerId, "RUNNING", load);
    }

    /**
     * Creates a heartbeat with status STOPPING and no load metrics.
     *
     * @param workerId identifier of the worker
     */
    public static WorkerHeartbeat stopping(String workerId) {
        return new WorkerHeartbeat(workerId, "STOPPING", Map.of());
    }

    /**
     * Creates a heartbeat with status ERROR carrying the message under the "error" key.
     *
     * @param workerId     identifier of the worker
     * @param errorMessage description of the error; may be {@code null}
     */
    public static WorkerHeartbeat error(String workerId, String errorMessage) {
        // Map.of rejects null values, and Throwable.getMessage() is frequently null:
        // substitute a placeholder so that reporting an error never fails itself.
        String message = errorMessage == null ? "unknown error" : errorMessage;
        return new WorkerHeartbeat(workerId, "ERROR", Map.of("error", message));
    }
}
6. Implémentation
package eu.lmvi.socle.client.registry;
/**
 * HTTP implementation of {@link WorkerRegistryClient}.
 *
 * <p>Only a failed registration is reported to the caller. Heartbeat and unregister
 * failures are logged and never propagated. When an authenticated auth client is
 * available, its access token is sent as a Bearer token.
 */
@Component
@ConditionalOnProperty(name = "socle.worker-registry.enabled", havingValue = "true")
public class HttpWorkerRegistryClient implements WorkerRegistryClient {

    private static final Logger log = LoggerFactory.getLogger(HttpWorkerRegistryClient.class);
    private static final MediaType JSON = MediaType.parse("application/json");
    private static final String WORKERS_PATH = "/api/v1/workers";

    private final SocleConfiguration config;
    private final SocleAuthClient authClient;
    private final OkHttpClient httpClient;
    private final ObjectMapper objectMapper;

    private volatile boolean registered = false;
    private volatile String currentWorkerId;

    /**
     * @param config     configuration providing the Registry URL and HTTP timeouts
     * @param authClient optional auth client; may be {@code null} when authentication is not configured
     */
    public HttpWorkerRegistryClient(
            SocleConfiguration config,
            @Autowired(required = false) SocleAuthClient authClient) {
        this.config = config;
        this.authClient = authClient;
        this.objectMapper = new ObjectMapper();
        this.httpClient = new OkHttpClient.Builder()
                .connectTimeout(config.getRegistryConnectTimeoutMs(), TimeUnit.MILLISECONDS)
                .readTimeout(config.getRegistryReadTimeoutMs(), TimeUnit.MILLISECONDS)
                .build();
    }

    /**
     * Registers the worker and marks this client as registered on success.
     *
     * @param registration information describing the worker
     * @throws RegistryException on a non-2xx answer or an I/O failure
     */
    @Override
    public void register(WorkerRegistration registration) throws RegistryException {
        log.info("Registering worker: {} ({})", registration.workerId(), registration.workerType());
        try {
            Request request = jsonPost("/register", registration);
            try (Response response = httpClient.newCall(request).execute()) {
                if (!response.isSuccessful()) {
                    throw new RegistryException("Registration failed: " + response.code());
                }
                // Publish the id before the flag so a reader that sees registered == true
                // never observes a null worker id.
                currentWorkerId = registration.workerId();
                registered = true;
                log.info("Worker registered successfully: {}", registration.workerId());
            }
        } catch (IOException e) {
            throw new RegistryException("Registration failed", e);
        }
    }

    /**
     * Sends a heartbeat. Does nothing but log a warning when the worker is not
     * registered; failures are logged and not propagated.
     *
     * @param heartbeat current state of the worker
     */
    @Override
    public void heartbeat(WorkerHeartbeat heartbeat) throws RegistryException {
        if (!registered) {
            log.warn("Cannot send heartbeat, worker not registered");
            return;
        }
        log.debug("Sending heartbeat: {} - {}", heartbeat.workerId(), heartbeat.status());
        try {
            Request request = jsonPost("/heartbeat", heartbeat);
            try (Response response = httpClient.newCall(request).execute()) {
                if (!response.isSuccessful()) {
                    // Don't throw - heartbeat failure is not critical
                    log.warn("Heartbeat failed: {}", response.code());
                }
            }
        } catch (IOException e) {
            // Don't throw - heartbeat failure is not critical
            log.warn("Heartbeat failed: {}", e.getMessage());
        }
    }

    /**
     * Removes the worker from the Registry on a best-effort basis. As soon as the
     * Registry answers, the local state is cleared whatever the HTTP status.
     *
     * @param workerId identifier of the worker
     */
    @Override
    public void unregister(String workerId) throws RegistryException {
        if (!registered) {
            return;
        }
        log.info("Unregistering worker: {}", workerId);
        try {
            Request request = authorized(
                    new Request.Builder().url(endpoint("/" + workerId)).delete()).build();
            try (Response response = httpClient.newCall(request).execute()) {
                registered = false;
                currentWorkerId = null;
                if (response.isSuccessful()) {
                    log.info("Worker unregistered: {}", workerId);
                } else {
                    // Best effort: report the refusal instead of logging a misleading success
                    log.warn("Unregister of worker {} answered HTTP {}", workerId, response.code());
                }
            }
        } catch (IOException e) {
            // Don't throw - unregister failure is not critical
            log.warn("Unregister failed: {}", e.getMessage());
        }
    }

    @Override
    public boolean isRegistered() {
        return registered;
    }

    /** Builds the URL of a workers endpoint, tolerating trailing slashes in the configured base URL. */
    private String endpoint(String suffix) {
        String base = config.getRegistryServerUrl();
        while (base.endsWith("/")) {
            base = base.substring(0, base.length() - 1);
        }
        return base + WORKERS_PATH + suffix;
    }

    /** Adds the Bearer token to the request when an authenticated auth client is available. */
    private Request.Builder authorized(Request.Builder builder) throws IOException {
        if (authClient != null && authClient.isAuthenticated()) {
            builder.header("Authorization", "Bearer " + authClient.getValidAccessToken());
        }
        return builder;
    }

    /** Serializes the payload to JSON and builds the authorized POST request for the given endpoint. */
    private Request jsonPost(String suffix, Object payload) throws IOException {
        String json = objectMapper.writeValueAsString(payload);
        Request.Builder builder = new Request.Builder()
                .url(endpoint(suffix))
                .post(RequestBody.create(json, JSON));
        return authorized(builder).build();
    }
}
7. Intégration avec MOP
7.1 Enregistrement au démarrage
// Dans MainOrchestratorProcess.start()
if (registryClient != null && config.isWorkerRegistryEnabled()) {
log.info("[step:registry_register] Enregistrement au Worker Registry");
try {
WorkerRegistration registration = new WorkerRegistration(
config.getExec_id(),
config.getApp_name(),
config.getRegion(),
InetAddress.getLocalHost().getHostName(),
config.getVersion(),
getWorkerCapabilities(),
Map.of(
"startTime", Instant.now(),
"javaVersion", System.getProperty("java.version")
)
);
registryClient.register(registration);
} catch (RegistryException e) {
log.warn("Registry registration failed, continuing", e);
}
}
7.2 Heartbeat périodique
// Called from the main loop or scheduled through a ScheduledExecutorService
private void sendRegistryHeartbeat() {
    boolean canReport = registryClient != null && registryClient.isRegistered();
    if (!canReport) {
        return;
    }
    try {
        String workerId = config.getExec_id();
        // Load metrics reported with every heartbeat
        Map<String, Object> load = Map.of(
                "uptime", getUptime(),
                "workersCount", workers.size(),
                "healthyWorkers", countHealthyWorkers(),
                "memoryUsedMb", getMemoryUsedMb()
        );
        registryClient.heartbeat(new WorkerHeartbeat(workerId, "RUNNING", load));
    } catch (RegistryException e) {
        // A missed heartbeat is not worth more than a debug trace
        log.debug("Heartbeat failed: {}", e.getMessage());
    }
}
7.3 Désenregistrement à l’arrêt
// In MainOrchestratorProcess.gracefulShutdown()
if (registryClient != null && registryClient.isRegistered()) {
    log.info("[step:registry_unregister] Désenregistrement du Registry");
    try {
        String workerId = config.getExec_id();
        registryClient.unregister(workerId);
    } catch (RegistryException e) {
        // Shutdown continues even when the Registry cannot be reached
        log.warn("Unregister failed", e);
    }
}
8. Exemple de données
8.1 Registration
{
"workerId": "AGENT-DB2-MTQ-001",
"workerType": "journal-reader",
"region": "MTQ",
"host": "mtq-nuc-01",
"version": "4.0.0",
"capabilities": ["db2-cdc", "nats-publisher"],
"extra": {
"journal": "QUSR0023",
"library": "IMA001FDMQ",
"startTime": "2025-12-09T10:00:00Z",
"javaVersion": "21.0.1"
}
}
8.2 Heartbeat
{
"workerId": "AGENT-DB2-MTQ-001",
"status": "RUNNING",
"load": {
"uptime": 3600,
"messagesPerMinute": 523,
"lastSequence": 123456789,
"memoryUsedMb": 256,
"cpuPercent": 15
}
}
9. Côté serveur (Registry central)
9.1 Table worker_registry
-- One row per worker; worker_id is unique.
CREATE TABLE worker_registry (
id BIGSERIAL PRIMARY KEY,
worker_id VARCHAR(200) NOT NULL UNIQUE,
worker_type VARCHAR(100) NOT NULL,
region VARCHAR(50),
host VARCHAR(200),
version VARCHAR(50),
-- 'UNKNOWN' until set; the queries in this document also use 'RUNNING' and 'LOST'
status VARCHAR(20) DEFAULT 'UNKNOWN',
registered_at TIMESTAMPTZ DEFAULT NOW(),
-- Compared against NOW() by the LOST detection query
last_heartbeat TIMESTAMPTZ,
capabilities JSONB,
extra JSONB,
load JSONB
);
-- Indexes on the columns used for filtering and grouping
CREATE INDEX idx_worker_registry_region ON worker_registry(region);
CREATE INDEX idx_worker_registry_type ON worker_registry(worker_type);
CREATE INDEX idx_worker_registry_status ON worker_registry(status);
9.2 Détection des workers LOST
-- Mark as LOST every RUNNING worker without a heartbeat for more than 2 minutes.
-- COALESCE falls back to registered_at so that a worker that registered but never
-- sent a heartbeat (last_heartbeat IS NULL) is detected as well; a comparison
-- against NULL is never true and would otherwise leave it RUNNING forever.
UPDATE worker_registry
SET status = 'LOST'
WHERE status = 'RUNNING'
  AND COALESCE(last_heartbeat, registered_at) < NOW() - INTERVAL '2 minutes';
9.3 Dashboard Metabase/Grafana
-- Number of RUNNING workers per region
SELECT region, COUNT(*) as count
FROM worker_registry
WHERE status = 'RUNNING'
GROUP BY region;
-- Workers currently flagged LOST, with their last heartbeat time
SELECT worker_id, region, last_heartbeat
FROM worker_registry
WHERE status = 'LOST';
10. Bonnes pratiques
DO
- ✅ Enregistrer au démarrage, désenregistrer à l’arrêt
- ✅ Heartbeat régulier (30s recommandé)
- ✅ Inclure des métriques utiles dans le heartbeat
- ✅ Gérer gracieusement les échecs (non bloquant)
DON’T
- ❌ Heartbeat trop fréquent (< 10s)
- ❌ Bloquer sur les erreurs registry
- ❌ Stocker des données sensibles dans extra
11. Troubleshooting
Worker non visible dans le dashboard
- Vérifier `WORKER_REGISTRY_ENABLED=true`
- Vérifier `WORKER_REGISTRY_URL`
- Vérifier les logs : « Worker registered successfully »
Worker marqué LOST
- Vérifier que l’application tourne
- Vérifier la connectivité réseau
- Vérifier les logs heartbeat
Erreur 401
- Vérifier que `AUTH_ENABLED=true`
- Vérifier `API_KEY`
- Vérifier que l’auth fonctionne
12. Références
- 08-SUPERVISOR – Supervision locale
- 23-AUTH-CLIENT – Authentification JWT
- 02-ARCHITECTURE – Architecture









