Socle V004 – Worker Registry

24 – Worker Registry Client (New in V4)

Version: 4.0.0, Date: 2025-12-09

1. Introduction

The WorkerRegistryClient lets Socle V4 applications register themselves with a central Registry so that they can be monitored.

Benefits

  • Visibility: see which workers are active in each region
  • Monitoring: detect "LOST" workers (missed heartbeats)
  • Diagnostics: version, capabilities, and load information

2. Architecture

┌─────────────────┐                    ┌─────────────────┐
│   Application   │                    │  Registry       │
│   Socle V4      │                    │  (central)      │
└────────┬────────┘                    └────────┬────────┘
         │                                      │
         │  1. POST /api/v1/workers/register    │
         │     (at startup)                     │
         │─────────────────────────────────────►│
         │                                      │
         │  2. POST /api/v1/workers/heartbeat   │
         │     (every 30 s)                     │
         │─────────────────────────────────────►│
         │                                      │
         │  3. DELETE /api/v1/workers/{id}      │
         │     (at shutdown)                    │
         │─────────────────────────────────────►│
         │                                      │

                    ┌─────────────────┐
                    │  Metabase /     │
                    │  Grafana        │◄────── Viewing
                    └─────────────────┘
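
The three numbered calls in the diagram form a simple lifecycle: register once, send heartbeats at a fixed interval, unregister on shutdown. The following is a minimal sketch of that sequence using only the JDK. `RegistryCalls` and `WorkerLifecycle` are hypothetical names introduced for illustration and are not part of the Socle API; the real HTTP calls are described in section 6.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the three HTTP calls shown in the diagram. */
interface RegistryCalls {
    void register(String workerId);
    void heartbeat(String workerId);
    void unregister(String workerId);
}

/** Drives the register -> heartbeat -> unregister sequence for one worker. */
final class WorkerLifecycle implements AutoCloseable {
    private final RegistryCalls registry;
    private final String workerId;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    WorkerLifecycle(RegistryCalls registry, String workerId) {
        this.registry = registry;
        this.workerId = workerId;
    }

    /** Step 1 once, then step 2 repeated every heartbeatIntervalMs. */
    void start(long heartbeatIntervalMs) {
        registry.register(workerId);
        scheduler.scheduleAtFixedRate(() -> {
            try {
                registry.heartbeat(workerId);
            } catch (RuntimeException e) {
                // Swallow the error: an uncaught exception would cancel
                // every future run of this periodic task.
            }
        }, heartbeatIntervalMs, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
    }

    /** Step 3: stop the heartbeat task, then unregister. */
    @Override
    public void close() {
        scheduler.shutdownNow();
        registry.unregister(workerId);
    }
}
```

Because `WorkerLifecycle` implements `AutoCloseable`, a caller can wrap it in a try-with-resources block so that the unregister call runs even when the application body throws.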

3. Configuration

3.1 application.yml

socle:
  worker-registry:
    enabled: ${WORKER_REGISTRY_ENABLED:false}
    server-url: ${WORKER_REGISTRY_URL:https://registry.lmvi.org}
    heartbeat-interval-ms: ${REGISTRY_HEARTBEAT_INTERVAL_MS:30000}
    connect-timeout-ms: 10000
    read-timeout-ms: 30000

3.2 Environment Variables

Variable                         Description                Default
WORKER_REGISTRY_ENABLED          Enable the registry        false
WORKER_REGISTRY_URL              Registry URL               https://registry.lmvi.org
REGISTRY_HEARTBEAT_INTERVAL_MS   Heartbeat interval (ms)    30000
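
The `${NAME:default}` placeholders in application.yml fall back to the default only when the variable is not defined at all. The sketch below reproduces that rule in plain Java for the three variables above, which can help when checking what a given environment will resolve to. `RegistryEnv` is a hypothetical helper written for this example; in the Socle itself the resolution is done by the configuration layer.

```java
import java.util.Map;

/** Mirrors the ${NAME:default} fallback used in application.yml (illustrative only). */
final class RegistryEnv {
    private RegistryEnv() {}

    /** Returns the variable's value, or the default when the variable is absent. */
    static String resolve(Map<String, String> env, String name, String defaultValue) {
        return env.getOrDefault(name, defaultValue);
    }

    static boolean enabled(Map<String, String> env) {
        return Boolean.parseBoolean(resolve(env, "WORKER_REGISTRY_ENABLED", "false"));
    }

    static String serverUrl(Map<String, String> env) {
        return resolve(env, "WORKER_REGISTRY_URL", "https://registry.lmvi.org");
    }

    static long heartbeatIntervalMs(Map<String, String> env) {
        return Long.parseLong(resolve(env, "REGISTRY_HEARTBEAT_INTERVAL_MS", "30000"));
    }
}
```

Calling `RegistryEnv.enabled(System.getenv())` on a host where nothing is set returns `false`, which matches the default in the table: the registry client stays disabled until it is switched on explicitly.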

4. WorkerRegistryClient Interface

package eu.lmvi.socle.client.registry;

/**
 * Registry client used by workers to register themselves
 */
public interface WorkerRegistryClient {

    /**
     * Initial registration at startup
     * @param registration Worker details
     * @throws RegistryException on failure
     */
    void register(WorkerRegistration registration) throws RegistryException;

    /**
     * Periodic heartbeat
     * @param heartbeat Current state of the worker
     * @throws RegistryException on failure
     */
    void heartbeat(WorkerHeartbeat heartbeat) throws RegistryException;

    /**
     * Unregistration at shutdown
     * @param workerId Worker ID
     * @throws RegistryException on failure
     */
    void unregister(String workerId) throws RegistryException;

    /**
     * Checks whether the worker is registered
     */
    boolean isRegistered();
}
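
For unit tests it can be handy to exercise this contract without a running Registry. The sketch below is one possible in-memory test double. To keep the block compilable on its own, it nests reduced stand-ins for `WorkerRegistration`, `WorkerHeartbeat`, `RegistryException` and the interface inside a hypothetical `InMemoryRegistryDemo` class; in a real project the types from `eu.lmvi.socle.client.registry` would be used instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class InMemoryRegistryDemo {
    private InMemoryRegistryDemo() {}

    // Reduced stand-ins so the sketch compiles alone (assumption: the real
    // records carry more fields, as shown in section 5).
    record WorkerRegistration(String workerId, String workerType) {}
    record WorkerHeartbeat(String workerId, String status) {}

    static class RegistryException extends Exception {
        RegistryException(String message) { super(message); }
    }

    interface WorkerRegistryClient {
        void register(WorkerRegistration registration) throws RegistryException;
        void heartbeat(WorkerHeartbeat heartbeat) throws RegistryException;
        void unregister(String workerId) throws RegistryException;
        boolean isRegistered();
    }

    /** Keeps the last known status per worker in memory instead of calling HTTP. */
    static final class InMemoryWorkerRegistryClient implements WorkerRegistryClient {
        private final Map<String, String> statusById = new ConcurrentHashMap<>();
        private volatile boolean registered;

        @Override
        public void register(WorkerRegistration registration) throws RegistryException {
            if (registration.workerId() == null || registration.workerId().isBlank()) {
                throw new RegistryException("workerId is required");
            }
            statusById.put(registration.workerId(), "REGISTERED");
            registered = true;
        }

        @Override
        public void heartbeat(WorkerHeartbeat heartbeat) {
            if (!registered) {
                return; // same behavior as the HTTP client: ignored until registered
            }
            statusById.put(heartbeat.workerId(), heartbeat.status());
        }

        @Override
        public void unregister(String workerId) {
            statusById.remove(workerId);
            registered = false;
        }

        @Override
        public boolean isRegistered() {
            return registered;
        }

        /** Test helper: last status recorded for the worker, or null. */
        String statusOf(String workerId) {
            return statusById.get(workerId);
        }
    }
}
```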

5. DTOs

5.1 WorkerRegistration

package eu.lmvi.socle.client.registry;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;

/**
 * Registration details for a worker
 */
public record WorkerRegistration(
    String workerId,          // Unique identifier (e.g. "AGENT-DB2-MTQ-001")
    String workerType,        // Worker type (e.g. "journal-reader")
    String region,            // Region (e.g. "MTQ", "GUA", "REU")
    String host,              // Hostname
    String version,           // Application version
    List<String> capabilities,// Capabilities (e.g. ["db2-cdc", "nats-publisher"])
    Map<String, Object> extra // Additional metadata
) {
    // Builds a registration from the Socle configuration, with no capabilities or extra metadata
    public static WorkerRegistration of(SocleConfiguration config) {
        return new WorkerRegistration(
            config.getExec_id(),
            config.getApp_name(),
            config.getRegion(),
            getHostname(),
            config.getVersion(),
            List.of(),
            Map.of()
        );
    }

    // Falls back to "unknown" when the local hostname cannot be resolved
    private static String getHostname() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown";
        }
    }
}

5.2 WorkerHeartbeat

package eu.lmvi.socle.client.registry;

import java.util.Map;

/**
 * Periodic heartbeat sent by a worker
 */
public record WorkerHeartbeat(
    String workerId,           // Worker ID
    String status,             // RUNNING, STOPPING, ERROR
    Map<String, Object> load   // Load metrics
) {
    public static WorkerHeartbeat running(String workerId, Map<String, Object> load) {
        return new WorkerHeartbeat(workerId, "RUNNING", load);
    }

    public static WorkerHeartbeat stopping(String workerId) {
        return new WorkerHeartbeat(workerId, "STOPPING", Map.of());
    }

    public static WorkerHeartbeat error(String workerId, String errorMessage) {
        return new WorkerHeartbeat(workerId, "ERROR", Map.of("error", errorMessage));
    }
}

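6. Implementation

package eu.lmvi.socle.client.registry;

import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
// SocleConfiguration and SocleAuthClient come from their own Socle packages (imports not shown)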

@Component
@ConditionalOnProperty(name = "socle.worker-registry.enabled", havingValue = "true")
public class HttpWorkerRegistryClient implements WorkerRegistryClient {

    private static final Logger log = LoggerFactory.getLogger(HttpWorkerRegistryClient.class);

    private final SocleConfiguration config;
    private final SocleAuthClient authClient;
    private final OkHttpClient httpClient;
    private final ObjectMapper objectMapper;

    private volatile boolean registered = false;
    private volatile String currentWorkerId;

    public HttpWorkerRegistryClient(
            SocleConfiguration config,
            @Autowired(required = false) SocleAuthClient authClient) {
        this.config = config;
        this.authClient = authClient;
        this.objectMapper = new ObjectMapper();

        this.httpClient = new OkHttpClient.Builder()
            .connectTimeout(config.getRegistryConnectTimeoutMs(), TimeUnit.MILLISECONDS)
            .readTimeout(config.getRegistryReadTimeoutMs(), TimeUnit.MILLISECONDS)
            .build();
    }

    @Override
    public void register(WorkerRegistration registration) throws RegistryException {
        log.info("Registering worker: {} ({})", registration.workerId(), registration.workerType());

        try {
            String json = objectMapper.writeValueAsString(registration);

            Request.Builder requestBuilder = new Request.Builder()
                .url(config.getRegistryServerUrl() + "/api/v1/workers/register")
                .post(RequestBody.create(json, MediaType.parse("application/json")));

            // Add auth if available
            if (authClient != null && authClient.isAuthenticated()) {
                requestBuilder.header("Authorization", "Bearer " + authClient.getValidAccessToken());
            }

            try (Response response = httpClient.newCall(requestBuilder.build()).execute()) {
                if (!response.isSuccessful()) {
                    throw new RegistryException("Registration failed: " + response.code());
                }

                registered = true;
                currentWorkerId = registration.workerId();
                log.info("Worker registered successfully: {}", registration.workerId());
            }
        } catch (IOException e) {
            throw new RegistryException("Registration failed", e);
        }
    }

    @Override
    public void heartbeat(WorkerHeartbeat heartbeat) throws RegistryException {
        if (!registered) {
            log.warn("Cannot send heartbeat, worker not registered");
            return;
        }

        log.debug("Sending heartbeat: {} - {}", heartbeat.workerId(), heartbeat.status());

        try {
            String json = objectMapper.writeValueAsString(heartbeat);

            Request.Builder requestBuilder = new Request.Builder()
                .url(config.getRegistryServerUrl() + "/api/v1/workers/heartbeat")
                .post(RequestBody.create(json, MediaType.parse("application/json")));

            if (authClient != null && authClient.isAuthenticated()) {
                requestBuilder.header("Authorization", "Bearer " + authClient.getValidAccessToken());
            }

            try (Response response = httpClient.newCall(requestBuilder.build()).execute()) {
                if (!response.isSuccessful()) {
                    log.warn("Heartbeat failed: {}", response.code());
                    // Don't throw - heartbeat failure is not critical
                }
            }
        } catch (IOException e) {
            log.warn("Heartbeat failed: {}", e.getMessage());
            // Don't throw - heartbeat failure is not critical
        }
    }

    @Override
    public void unregister(String workerId) throws RegistryException {
        if (!registered) {
            return;
        }

        log.info("Unregistering worker: {}", workerId);

        try {
            Request.Builder requestBuilder = new Request.Builder()
                .url(config.getRegistryServerUrl() + "/api/v1/workers/" + workerId)
                .delete();

            if (authClient != null && authClient.isAuthenticated()) {
                requestBuilder.header("Authorization", "Bearer " + authClient.getValidAccessToken());
            }

            try (Response response = httpClient.newCall(requestBuilder.build()).execute()) {
                // Ignore response - best effort
                registered = false;
                currentWorkerId = null;
                log.info("Worker unregistered: {}", workerId);
            }
        } catch (IOException e) {
            log.warn("Unregister failed: {}", e.getMessage());
            // Don't throw - unregister failure is not critical
        }
    }

    @Override
    public boolean isRegistered() {
        return registered;
    }
}
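
Note that `unregister` concatenates `workerId` directly into the URL path. IDs such as "AGENT-DB2-MTQ-001" are safe, but an ID containing a space or a slash would produce a different path. The sketch below shows one way to encode the ID as a single path segment using only the JDK. `RegistryUrls` is a hypothetical helper, not part of the Socle; with OkHttp, `HttpUrl.Builder.addPathSegment` is an alternative that performs this encoding itself.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Illustrative helper for building the DELETE URL of a worker. */
final class RegistryUrls {
    private RegistryUrls() {}

    static String workerUrl(String baseUrl, String workerId) {
        // Avoid a double slash when the configured base URL ends with "/"
        String base = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        // URLEncoder targets form encoding, where a space becomes "+";
        // inside a path the space must be "%20" instead. A literal "+" in
        // the input is already emitted as "%2B", so this replace is safe.
        String segment = URLEncoder.encode(workerId, StandardCharsets.UTF_8)
                .replace("+", "%20");
        return base + "/api/v1/workers/" + segment;
    }
}
```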

7. Integration with the MOP

7.1 Registration at Startup

// In MainOrchestratorProcess.start()
if (registryClient != null && config.isWorkerRegistryEnabled()) {
    log.info("[step:registry_register] Registering with the Worker Registry");
    try {
        WorkerRegistration registration = new WorkerRegistration(
            config.getExec_id(),
            config.getApp_name(),
            config.getRegion(),
            InetAddress.getLocalHost().getHostName(),
            config.getVersion(),
            getWorkerCapabilities(),
            Map.of(
                // ISO-8601 string: the client's plain ObjectMapper has no java.time module registered
                "startTime", Instant.now().toString(),
                "javaVersion", System.getProperty("java.version")
            )
        );
        registryClient.register(registration);
    } catch (RegistryException | UnknownHostException e) {
        // getLocalHost() can throw UnknownHostException; neither failure should block startup
        log.warn("Registry registration failed, continuing", e);
    }
}

7.2 Periodic Heartbeat

// In the main loop or via a ScheduledExecutorService
private void sendRegistryHeartbeat() {
    if (registryClient == null || !registryClient.isRegistered()) {
        return;
    }

    try {
        WorkerHeartbeat heartbeat = new WorkerHeartbeat(
            config.getExec_id(),
            "RUNNING",
            Map.of(
                "uptime", getUptime(),
                "workersCount", workers.size(),
                "healthyWorkers", countHealthyWorkers(),
                "memoryUsedMb", getMemoryUsedMb()
            )
        );
        registryClient.heartbeat(heartbeat);
    } catch (RegistryException e) {
        log.debug("Heartbeat failed: {}", e.getMessage());
    }
}
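
The snippet above calls `getUptime()` and `getMemoryUsedMb()` without showing them. The sketch below is one possible way to compute those values with the JDK only. `LoadMetrics` is a hypothetical helper, and the units are assumptions: uptime in seconds (consistent with the sample value 3600 in section 8.2) and memory as used heap in MiB.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;

/** Illustrative helpers for the heartbeat load map. */
final class LoadMetrics {
    private LoadMetrics() {}

    /** JVM uptime in seconds (assumed unit). */
    static long uptimeSeconds() {
        return ManagementFactory.getRuntimeMXBean().getUptime() / 1000;
    }

    /** Heap currently in use, in MiB (assumed unit). */
    static long memoryUsedMb() {
        Runtime runtime = Runtime.getRuntime();
        return (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024);
    }

    /** Builds the load map using the same keys as the heartbeat above. */
    static Map<String, Object> snapshot(int workersCount, int healthyWorkers) {
        return Map.of(
            "uptime", uptimeSeconds(),
            "workersCount", workersCount,
            "healthyWorkers", healthyWorkers,
            "memoryUsedMb", memoryUsedMb()
        );
    }
}
```

`Map.of` rejects null values, so every metric here is a primitive that gets boxed; a metric that may be missing would need a mutable map instead.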

7.3 Unregistration at Shutdown

// In MainOrchestratorProcess.gracefulShutdown()
if (registryClient != null && registryClient.isRegistered()) {
    log.info("[step:registry_unregister] Unregistering from the Registry");
    try {
        registryClient.unregister(config.getExec_id());
    } catch (RegistryException e) {
        log.warn("Unregister failed", e);
    }
}

8. Sample Data

8.1 Registration

{
  "workerId": "AGENT-DB2-MTQ-001",
  "workerType": "journal-reader",
  "region": "MTQ",
  "host": "mtq-nuc-01",
  "version": "4.0.0",
  "capabilities": ["db2-cdc", "nats-publisher"],
  "extra": {
    "journal": "QUSR0023",
    "library": "IMA001FDMQ",
    "startTime": "2025-12-09T10:00:00Z",
    "javaVersion": "21.0.1"
  }
}

8.2 Heartbeat

{
  "workerId": "AGENT-DB2-MTQ-001",
  "status": "RUNNING",
  "load": {
    "uptime": 3600,
    "messagesPerMinute": 523,
    "lastSequence": 123456789,
    "memoryUsedMb": 256,
    "cpuPercent": 15
  }
}

9. Server Side (Central Registry)

9.1 worker_registry Table

CREATE TABLE worker_registry (
    id BIGSERIAL PRIMARY KEY,
    worker_id VARCHAR(200) NOT NULL UNIQUE,
    worker_type VARCHAR(100) NOT NULL,
    region VARCHAR(50),
    host VARCHAR(200),
    version VARCHAR(50),
    status VARCHAR(20) DEFAULT 'UNKNOWN',
    registered_at TIMESTAMPTZ DEFAULT NOW(),
    last_heartbeat TIMESTAMPTZ,
    capabilities JSONB,
    extra JSONB,
    load JSONB
);

CREATE INDEX idx_worker_registry_region ON worker_registry(region);
CREATE INDEX idx_worker_registry_type ON worker_registry(worker_type);
CREATE INDEX idx_worker_registry_status ON worker_registry(status);

9.2 Detecting LOST Workers

-- Workers with no heartbeat for more than 2 minutes
UPDATE worker_registry
SET status = 'LOST'
WHERE status = 'RUNNING'
  AND last_heartbeat < NOW() - INTERVAL '2 minutes';
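
With the default 30 s interval, the 2-minute threshold corresponds to roughly four consecutive missed heartbeats. The same rule can be expressed in Java, for example to unit-test the threshold or to apply it in a service layer. `LostDetector` is a hypothetical class written for this sketch; it mirrors the UPDATE statement above, including the fact that a NULL `last_heartbeat` is never marked LOST.

```java
import java.time.Duration;
import java.time.Instant;

/** Java equivalent of the LOST-detection UPDATE (illustrative only). */
final class LostDetector {
    static final Duration LOST_AFTER = Duration.ofMinutes(2);

    private LostDetector() {}

    /**
     * Returns "LOST" for a RUNNING worker whose last heartbeat is older than
     * the threshold; any other worker keeps its current status.
     */
    static String effectiveStatus(String status, Instant lastHeartbeat, Instant now) {
        boolean stale = lastHeartbeat != null
                && lastHeartbeat.isBefore(now.minus(LOST_AFTER));
        return ("RUNNING".equals(status) && stale) ? "LOST" : status;
    }
}
```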

9.3 Metabase/Grafana Dashboard

-- Active workers per region
SELECT region, COUNT(*) as count
FROM worker_registry
WHERE status = 'RUNNING'
GROUP BY region;

-- LOST workers
SELECT worker_id, region, last_heartbeat
FROM worker_registry
WHERE status = 'LOST';

10. Best Practices

DO

  • ✅ Register at startup, unregister at shutdown
  • ✅ Send heartbeats regularly (30 s recommended)
  • ✅ Include useful metrics in the heartbeat
  • ✅ Handle failures gracefully (never block the application)

DON’T

  • ❌ Send heartbeats too often (< 10 s)
  • ❌ Block on registry errors
  • ❌ Store sensitive data in extra
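
Two of these rules lend themselves to a small guard in code: a floor on the heartbeat interval and a filter on the `extra` map. The sketch below is only an illustration. `RegistryGuards` is a hypothetical class, and the list of key fragments treated as sensitive is an assumption to adapt to each project.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Illustrative guards for the best practices listed above. */
final class RegistryGuards {
    /** Intervals below 10 s are discouraged by the list above. */
    static final long MIN_HEARTBEAT_INTERVAL_MS = 10_000;

    // Assumed deny-list of key fragments; not defined by the Socle.
    private static final Set<String> SENSITIVE_HINTS =
            Set.of("password", "secret", "token", "api_key", "apikey");

    private RegistryGuards() {}

    /** Raises a too-small configured interval to the 10 s floor. */
    static long safeInterval(long configuredMs) {
        return Math.max(configuredMs, MIN_HEARTBEAT_INTERVAL_MS);
    }

    /** Drops entries whose key looks sensitive (values must be non-null). */
    static Map<String, Object> withoutSensitiveKeys(Map<String, Object> extra) {
        return extra.entrySet().stream()
                .filter(e -> SENSITIVE_HINTS.stream().noneMatch(
                        hint -> e.getKey().toLowerCase(Locale.ROOT).contains(hint)))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```

A key-name filter is a safety net only: it cannot detect a secret stored under an innocuous key, so the rule of not putting sensitive data in `extra` still applies at the call site.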

11. Troubleshooting

Worker not visible in the dashboard

  1. Check that WORKER_REGISTRY_ENABLED=true
  2. Check WORKER_REGISTRY_URL
  3. Check the logs for "Worker registered successfully"

Worker marked LOST

  1. Check that the application is running
  2. Check network connectivity
  3. Check the heartbeat logs

401 error

  1. Check that AUTH_ENABLED=true
  2. Check API_KEY
  3. Check that authentication itself works
