Socle V004 – Shared Data

07 – SharedDataRegistry

Version: 4.0.0
Date: 2025-12-09

1. Introduction

SharedDataRegistry is a centralized registry for sharing data between Workers within a single instance. It provides atomic operations and a health-level system.

Difference from KvBus

Aspect        | SharedDataRegistry    | KvBus
Scope         | Intra-instance        | Optionally inter-instance (Redis)
Performance   | Very fast (in-memory) | Variable (network-bound with Redis)
Types         | Strongly typed        | Strings/JSON
Health levels | Yes                   | No
Callbacks     | Yes                   | No

2. SharedDataRegistry interface

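package eu.lmvi.socle.shared;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

public interface SharedDataRegistry {

    // === Basic key-value ===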

    void put(String key, Object value);
    void put(String key, Object value, HealthLevel level);
    Optional<Object> get(String key);
    <T> Optional<T> get(String key, Class<T> type);
    void delete(String key);
    boolean exists(String key);

    // === Typed getters ===

    Optional<String> getString(String key);
    Optional<Integer> getInt(String key);
    Optional<Long> getLong(String key);
    Optional<Double> getDouble(String key);
    Optional<Boolean> getBoolean(String key);

    // === Sequences (atomic counters) ===

    void createSequence(String key, long initialValue, HealthLevel level);
    long incrementSequence(String key);
    long incrementSequence(String key, long delta);
    long getSequence(String key);
    void setSequence(String key, long value);

    // === Lists ===

    <T> void addToList(String key, T item);
    <T> List<T> getList(String key, Class<T> type);
    void clearList(String key);

    // === Maps ===

    <V> void putInMap(String key, String mapKey, V value);
    <V> Optional<V> getFromMap(String key, String mapKey, Class<V> type);
    <V> Map<String, V> getMap(String key, Class<V> type);
    void removeFromMap(String key, String mapKey);

    // === Health ===

    HealthLevel getHealthLevel(String key);
    Map<String, HealthLevel> getAllHealthLevels();
    List<String> getUnhealthyKeys();

    // === Callbacks ===

    void registerCallback(String key, Consumer<Object> callback);
    void unregisterCallback(String key);

    // === Introspection ===

    Set<String> keys();
    Set<String> keys(String pattern);
    Map<String, Object> getAll();
    int size();
    void clear();
}

3. Health Levels

package eu.lmvi.socle.shared;

public enum HealthLevel {
    /**
     * Informational - no impact on health
     */
    INFO,

    /**
     * Normal - contributes to normal health
     */
    NORMAL,

    /**
     * Important - instance is degraded if there is a problem
     */
    IMPORTANT,

    /**
     * Critical - instance is unhealthy if there is a problem
     */
    CRITICAL
}

Use by the Supervisor

The Supervisor reads the HealthLevel of each entry to determine the overall health of the instance:

  • CRITICAL entry missing or invalid → instance is UNHEALTHY
  • IMPORTANT entry missing or invalid → instance is DEGRADED
  • NORMAL/INFO → no impact
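
The rules above can be sketched as a small pure function. This is only an illustration, not the Supervisor's actual code: the `HealthRollupSketch` class, the `Status` enum, and the `rollup` method are hypothetical names. Two assumptions are made: the caller knows which keys are expected (so that a missing key can be detected), and "invalid" means a null value, `false`, or a negative number, mirroring the check in the in-memory implementation of section 4.

```java
import java.util.HashMap;
import java.util.Map;

class HealthRollupSketch {

    enum HealthLevel { INFO, NORMAL, IMPORTANT, CRITICAL }

    // Hypothetical status type; the real Supervisor may use a different one.
    enum Status { HEALTHY, DEGRADED, UNHEALTHY }

    // Assumption: a value is invalid when it is null, false, or a negative number.
    static boolean isValid(Object value) {
        if (value == null) return false;
        if (value instanceof Boolean) return (Boolean) value;
        if (value instanceof Number) return ((Number) value).doubleValue() >= 0;
        return true;
    }

    // expected: keys the instance is supposed to publish, with their level.
    // values: what the registry currently holds (a missing key counts as absent).
    static Status rollup(Map<String, HealthLevel> expected, Map<String, Object> values) {
        Status result = Status.HEALTHY;
        for (Map.Entry<String, HealthLevel> e : expected.entrySet()) {
            if (isValid(values.get(e.getKey()))) continue;
            if (e.getValue() == HealthLevel.CRITICAL) return Status.UNHEALTHY;
            if (e.getValue() == HealthLevel.IMPORTANT) result = Status.DEGRADED;
            // NORMAL and INFO entries never change the result.
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, HealthLevel> expected = new HashMap<>();
        expected.put("database.connected", HealthLevel.CRITICAL);
        expected.put("cache.available", HealthLevel.IMPORTANT);

        Map<String, Object> values = new HashMap<>();
        values.put("database.connected", true);

        // cache.available is absent, so this prints DEGRADED
        System.out.println(rollup(expected, values));
    }
}
```

A CRITICAL failure short-circuits the loop because no later entry can produce a worse status.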

4. Implementation

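package eu.lmvi.socle.shared;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import org.springframework.stereotype.Component;

// Excerpt: only a subset of the SharedDataRegistry methods is shown here.
@Component
public class InMemorySharedDataRegistry implements SharedDataRegistry {

    private final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Consumer<Object>> callbacks = new ConcurrentHashMap<>();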

    @Override
    public void put(String key, Object value) {
        put(key, value, HealthLevel.NORMAL);
    }

    @Override
    public void put(String key, Object value, HealthLevel level) {
        store.put(key, new Entry(value, level));
        notifyCallback(key, value);
    }

    @Override
    public Optional<Object> get(String key) {
        Entry entry = store.get(key);
        // ofNullable: put(key, null) stores an entry whose value is null
        return entry != null ? Optional.ofNullable(entry.value) : Optional.empty();
    }

    @Override
    public <T> Optional<T> get(String key, Class<T> type) {
        return get(key).filter(type::isInstance).map(type::cast);
    }

    @Override
    public Optional<String> getString(String key) {
        return get(key).map(Object::toString);
    }

    @Override
    public Optional<Integer> getInt(String key) {
        return get(key, Number.class).map(Number::intValue);
    }

    @Override
    public Optional<Long> getLong(String key) {
        return get(key, Number.class).map(Number::longValue);
    }

    @Override
    public void createSequence(String key, long initialValue, HealthLevel level) {
        store.put(key, new Entry(new AtomicLong(initialValue), level));
    }

    @Override
    public long incrementSequence(String key) {
        return incrementSequence(key, 1);
    }

    @Override
    public long incrementSequence(String key, long delta) {
        Entry entry = store.get(key);
        if (entry == null || !(entry.value instanceof AtomicLong)) {
            throw new IllegalStateException("Sequence not found: " + key);
        }
        long newValue = ((AtomicLong) entry.value).addAndGet(delta);
        notifyCallback(key, newValue);
        return newValue;
    }

    @Override
    public long getSequence(String key) {
        Entry entry = store.get(key);
        if (entry == null || !(entry.value instanceof AtomicLong)) {
            throw new IllegalStateException("Sequence not found: " + key);
        }
        return ((AtomicLong) entry.value).get();
    }

    @Override
    public HealthLevel getHealthLevel(String key) {
        Entry entry = store.get(key);
        return entry != null ? entry.level : null;
    }

    @Override
    public List<String> getUnhealthyKeys() {
        return store.entrySet().stream()
            .filter(e -> e.getValue().level == HealthLevel.CRITICAL)
            .filter(e -> !isValueHealthy(e.getValue().value))
            .map(Map.Entry::getKey)
            .toList();
    }

    @Override
    public void registerCallback(String key, Consumer<Object> callback) {
        callbacks.put(key, callback);
    }

    private void notifyCallback(String key, Object value) {
        Consumer<Object> callback = callbacks.get(key);
        if (callback != null) {
            try {
                callback.accept(value);
            } catch (Exception e) {
                // Log but don't propagate
            }
        }
    }

    private boolean isValueHealthy(Object value) {
        if (value == null) return false;
        if (value instanceof Boolean b) return b;
        if (value instanceof Number n) return n.doubleValue() >= 0;
        return true;
    }

    private record Entry(Object value, HealthLevel level) {}
}
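
In the implementation above, `incrementSequence` delegates to `AtomicLong.addAndGet`, which is why concurrent increments from several Workers are not lost. The following stand-alone sketch illustrates that property; `SequenceConcurrencySketch` and `runDemo` are hypothetical names, and the class reproduces only the sequence part of the registry, without health levels or callbacks.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class SequenceConcurrencySketch {

    private final ConcurrentHashMap<String, AtomicLong> sequences = new ConcurrentHashMap<>();

    void createSequence(String key, long initialValue) {
        sequences.put(key, new AtomicLong(initialValue));
    }

    long incrementSequence(String key, long delta) {
        AtomicLong seq = sequences.get(key);
        if (seq == null) throw new IllegalStateException("Sequence not found: " + key);
        return seq.addAndGet(delta); // atomic read-modify-write
    }

    long getSequence(String key) {
        AtomicLong seq = sequences.get(key);
        if (seq == null) throw new IllegalStateException("Sequence not found: " + key);
        return seq.get();
    }

    // Runs `threads` workers that each increment the same sequence and returns the final value.
    static long runDemo(int threads, int incrementsPerThread) {
        SequenceConcurrencySketch registry = new SequenceConcurrencySketch();
        registry.createSequence("orders.processed", 0);

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    registry.incrementSequence("orders.processed", 1);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return registry.getSequence("orders.processed");
    }

    public static void main(String[] args) {
        System.out.println(runDemo(8, 10_000)); // prints 80000
    }
}
```

A plain `long` updated with a read followed by `put` would not give this guarantee, because two threads could read the same value before either writes it back.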

5. Usage

5.1 Injection

@Service
public class MonService {

    @Autowired
    private SharedDataRegistry registry;

    public void process() {
        // ...
    }
}

5.2 Simple key-value

// Store
registry.put("config.maxRetries", 3);
registry.put("status.lastSync", Instant.now().toString());

// Retrieve
int maxRetries = registry.getInt("config.maxRetries").orElse(5);
String lastSync = registry.getString("status.lastSync").orElse("never");

5.3 With a health level

// Critical data: the instance is unhealthy if it is absent
registry.put("database.connected", true, HealthLevel.CRITICAL);

// Important data: the instance is degraded if it is absent
registry.put("cache.available", true, HealthLevel.IMPORTANT);

// Normal data
registry.put("stats.requestsTotal", 0, HealthLevel.NORMAL);

// Informational data
registry.put("info.startTime", Instant.now(), HealthLevel.INFO);

5.4 Sequences (counters)

// Create the sequences
registry.createSequence("orders.processed", 0, HealthLevel.NORMAL);
registry.createSequence("bytes.transferred", 0, HealthLevel.INFO);

// Increment by one
long count = registry.incrementSequence("orders.processed");
log.info("Processed order #{}", count);

// Increment by a delta (the sequence must already exist, otherwise IllegalStateException is thrown)
long bytes = registry.incrementSequence("bytes.transferred", 1024);

// Read
long total = registry.getSequence("orders.processed");

5.5 Lists

// Add to a list
registry.addToList("errors.recent", new ErrorRecord("timeout", Instant.now()));
registry.addToList("errors.recent", new ErrorRecord("connection", Instant.now()));

// Read the list
List<ErrorRecord> errors = registry.getList("errors.recent", ErrorRecord.class);

// Clear
registry.clearList("errors.recent");
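
The implementation excerpt in section 4 does not show the list operations. One possible way to back them, given here purely as an assumption, is a `ConcurrentHashMap` of `CopyOnWriteArrayList`s. `ListStoreSketch` is a hypothetical class; only the three method names come from the interface.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

class ListStoreSketch {

    private final ConcurrentHashMap<String, CopyOnWriteArrayList<Object>> lists =
        new ConcurrentHashMap<>();

    <T> void addToList(String key, T item) {
        // computeIfAbsent creates the list atomically on first use
        lists.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(item);
    }

    <T> List<T> getList(String key, Class<T> type) {
        List<Object> items = lists.get(key);
        if (items == null) return List.of();
        // Return a snapshot containing only the items of the requested type
        return items.stream()
            .filter(type::isInstance)
            .map(type::cast)
            .collect(Collectors.toList());
    }

    void clearList(String key) {
        lists.remove(key);
    }
}
```

The list in this sketch is unbounded, so a caller that keeps appending (for example to `errors.recent`) would need to clear or cap it periodically.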

5.6 Maps

// Store in a map
registry.putInMap("workers.status", "worker-1", "RUNNING");
registry.putInMap("workers.status", "worker-2", "STOPPED");

// Read one entry
Optional<String> status = registry.getFromMap("workers.status", "worker-1", String.class);

// Read the whole map
Map<String, String> allStatus = registry.getMap("workers.status", String.class);
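
As with lists, the map operations are not part of the implementation excerpt in section 4. A minimal sketch, assuming a nested `ConcurrentHashMap` as backing store, could look like this. `MapStoreSketch` is a hypothetical class; only the method names come from the interface.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class MapStoreSketch {

    private final ConcurrentHashMap<String, ConcurrentHashMap<String, Object>> maps =
        new ConcurrentHashMap<>();

    <V> void putInMap(String key, String mapKey, V value) {
        // ConcurrentHashMap rejects null values, so value must be non-null here
        maps.computeIfAbsent(key, k -> new ConcurrentHashMap<>()).put(mapKey, value);
    }

    <V> Optional<V> getFromMap(String key, String mapKey, Class<V> type) {
        Map<String, Object> inner = maps.get(key);
        if (inner == null) return Optional.empty();
        return Optional.ofNullable(inner.get(mapKey))
            .filter(type::isInstance)
            .map(type::cast);
    }

    <V> Map<String, V> getMap(String key, Class<V> type) {
        Map<String, Object> inner = maps.get(key);
        if (inner == null) return Map.of();
        // Snapshot restricted to the values of the requested type
        return inner.entrySet().stream()
            .filter(e -> type.isInstance(e.getValue()))
            .collect(Collectors.toMap(Map.Entry::getKey, e -> type.cast(e.getValue())));
    }

    void removeFromMap(String key, String mapKey) {
        Map<String, Object> inner = maps.get(key);
        if (inner != null) inner.remove(mapKey);
    }
}
```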

5.7 Callbacks

// Register a callback
registry.registerCallback("config.maxRetries", newValue -> {
    log.info("maxRetries changed to: {}", newValue);
    reconfigure((Integer) newValue);
});

// Modifying the value triggers the callback
registry.put("config.maxRetries", 5);  // Callback is invoked
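
The in-memory implementation of section 4 catches any exception thrown by a callback, so a failed cast such as `(Integer) newValue` would go unnoticed if another Worker stored a value of a different type under the same key. One way to make the type expectation explicit is a small adapter. The `typed` helper below is hypothetical and not part of the registry API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class TypedCallbackSketch {

    // Hypothetical helper: adapts a typed consumer to the Consumer<Object>
    // expected by registerCallback.
    static <T> Consumer<Object> typed(Class<T> type, Consumer<T> delegate) {
        return value -> {
            if (type.isInstance(value)) {
                delegate.accept(type.cast(value));
            }
            // Values of another type are ignored instead of raising ClassCastException.
        };
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        Consumer<Object> callback = typed(Integer.class, seen::add);

        callback.accept(5);      // forwarded to the delegate
        callback.accept("five"); // ignored: not an Integer

        System.out.println(seen); // prints [5]
    }
}
```

With the registry, this would be used as `registry.registerCallback("config.maxRetries", typed(Integer.class, this::reconfigure))`, assuming `reconfigure` accepts an `Integer`.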

6. Common patterns

6.1 Connection state

@Component
public class DatabaseWorker implements Worker {

    @Autowired
    private SharedDataRegistry registry;

    @Override
    public void initialize() {
        registry.put("database.connected", false, HealthLevel.CRITICAL);
    }

    @Override
    public void start() {
        try {
            connect();
            registry.put("database.connected", true, HealthLevel.CRITICAL);
        } catch (Exception e) {
            registry.put("database.connected", false, HealthLevel.CRITICAL);
            throw e;
        }
    }

    @Override
    public void stop() {
        disconnect();
        registry.put("database.connected", false, HealthLevel.CRITICAL);
    }
}

6.2 Real-time metrics

@Component
public class MetricsCollector {

    @Autowired
    private SharedDataRegistry registry;

    @PostConstruct
    public void init() {
        registry.createSequence("metrics.requests.total", 0, HealthLevel.INFO);
        registry.createSequence("metrics.requests.errors", 0, HealthLevel.NORMAL);
        registry.createSequence("metrics.bytes.in", 0, HealthLevel.INFO);
        registry.createSequence("metrics.bytes.out", 0, HealthLevel.INFO);
    }

    public void recordRequest(long bytesIn, long bytesOut, boolean success) {
        registry.incrementSequence("metrics.requests.total");
        registry.incrementSequence("metrics.bytes.in", bytesIn);
        registry.incrementSequence("metrics.bytes.out", bytesOut);

        if (!success) {
            registry.incrementSequence("metrics.requests.errors");
        }
    }

    public Map<String, Object> getMetrics() {
        return Map.of(
            "requests.total", registry.getSequence("metrics.requests.total"),
            "requests.errors", registry.getSequence("metrics.requests.errors"),
            "bytes.in", registry.getSequence("metrics.bytes.in"),
            "bytes.out", registry.getSequence("metrics.bytes.out")
        );
    }
}

6.3 Circuit Breaker state

@Component
public class CircuitBreakerStateManager {

    @Autowired
    private SharedDataRegistry registry;

    public void updateState(String circuitName, CircuitState state) {
        String key = "circuit." + circuitName + ".state";
        HealthLevel level = state == CircuitState.OPEN
            ? HealthLevel.IMPORTANT
            : HealthLevel.NORMAL;
        registry.put(key, state.name(), level);
    }

    public CircuitState getState(String circuitName) {
        return registry.getString("circuit." + circuitName + ".state")
            .map(CircuitState::valueOf)
            .orElse(CircuitState.CLOSED);
    }
}

6.4 Progress tracking

@Component
public class BatchProcessor {

    @Autowired
    private SharedDataRegistry registry;

    public void processBatch(String batchId, List<Item> items) {
        registry.put("batch." + batchId + ".total", items.size());
        registry.createSequence("batch." + batchId + ".processed", 0, HealthLevel.NORMAL);

        for (Item item : items) {
            processItem(item);
            registry.incrementSequence("batch." + batchId + ".processed");
        }

        registry.put("batch." + batchId + ".status", "COMPLETED");
    }

    public double getProgress(String batchId) {
        int total = registry.getInt("batch." + batchId + ".total").orElse(0);
        if (total == 0) return 0;

        long processed = registry.getSequence("batch." + batchId + ".processed");
        return (double) processed / total * 100;
    }
}

7. Integration with the Supervisor

@Component
public class HealthAggregator {

    @Autowired
    private SharedDataRegistry registry;

    @Autowired
    private List<Worker> workers;

    public HealthStatus aggregateHealth() {
        // Check workers
        boolean allWorkersHealthy = workers.stream().allMatch(Worker::isHealthy);

        // Check critical registry entries
        List<String> unhealthyKeys = registry.getUnhealthyKeys();
        boolean hasCriticalFailure = !unhealthyKeys.isEmpty();

        if (!allWorkersHealthy || hasCriticalFailure) {
            return HealthStatus.UNHEALTHY;
        }

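        // Check important entries
        Map<String, HealthLevel> levels = registry.getAllHealthLevels();
        boolean hasImportantFailure = levels.entrySet().stream()
            .filter(e -> e.getValue() == HealthLevel.IMPORTANT)
            .anyMatch(e -> !isHealthy(registry.get(e.getKey())));

        if (hasImportantFailure) {
            return HealthStatus.DEGRADED;
        }

        return HealthStatus.HEALTHY;
    }

    // Same rule as the registry's own check: absent, false, or negative values are unhealthy
    private boolean isHealthy(Optional<Object> value) {
        if (value.isEmpty()) return false;
        Object v = value.get();
        if (v instanceof Boolean b) return b;
        if (v instanceof Number n) return n.doubleValue() >= 0;
        return true;
    }
}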

8. API exposure

@RestController
@RequestMapping("/admin/registry")
public class SharedDataController {

    @Autowired
    private SharedDataRegistry registry;

    @GetMapping
    public Map<String, Object> getAll() {
        return registry.getAll();
    }

    @GetMapping("/{key}")
    public ResponseEntity<?> get(@PathVariable String key) {
        return registry.get(key)
            .map(ResponseEntity::ok)
            .orElse(ResponseEntity.notFound().build());
    }

    @GetMapping("/health")
    public Map<String, HealthLevel> getHealthLevels() {
        return registry.getAllHealthLevels();
    }

    @GetMapping("/unhealthy")
    public List<String> getUnhealthyKeys() {
        return registry.getUnhealthyKeys();
    }
}

9. Best practices

Naming conventions

<category>.<subcategory>.<name>

Examples:
- database.connected
- worker.kafka.status
- metrics.requests.total
- batch.order-123.progress
- circuit.external-api.state
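
The convention can be checked mechanically before a key is written. The sketch below is an assumption derived from the examples above, not a rule stated by Socle: it accepts two or more dot-separated segments made of letters, digits, and hyphens. `KeyNamingSketch` and `isValidKey` are hypothetical names.

```java
import java.util.regex.Pattern;

class KeyNamingSketch {

    // Assumed rule: at least two dot-separated segments; the first starts with a letter,
    // the others with a letter or digit; segments contain letters, digits, or hyphens.
    private static final Pattern KEY =
        Pattern.compile("[A-Za-z][A-Za-z0-9-]*(\\.[A-Za-z0-9][A-Za-z0-9-]*)+");

    static boolean isValidKey(String key) {
        return key != null && KEY.matcher(key).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidKey("batch.order-123.progress")); // prints true
        System.out.println(isValidKey("connected"));                // prints false
    }
}
```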

DO

  • Use consistent, hierarchical key names
  • Set the appropriate HealthLevel for each piece of data
  • Use sequences for counters (thread-safe)
  • Clean up obsolete data
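
Hierarchical key names make cleanup straightforward, because all keys belonging to a finished unit of work share a prefix. The following sketch works on a plain `ConcurrentHashMap` standing in for the registry's store; `ObsoleteKeyCleanupSketch` and `removeByPrefix` are hypothetical names. With the real registry, the same idea would rely on `keys(pattern)` and `delete(key)`, whose pattern syntax is not specified in this document.

```java
import java.util.concurrent.ConcurrentHashMap;

class ObsoleteKeyCleanupSketch {

    // Removes every key starting with the given prefix and returns how many were removed.
    static int removeByPrefix(ConcurrentHashMap<String, Object> store, String prefix) {
        int removed = 0;
        // ConcurrentHashMap iterators are weakly consistent, so removing while iterating is safe
        for (String key : store.keySet()) {
            if (key.startsWith(prefix) && store.remove(key) != null) {
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Object> store = new ConcurrentHashMap<>();
        store.put("batch.1.total", 100);
        store.put("batch.1.status", "COMPLETED");
        store.put("batch.10.total", 50);

        // The trailing dot keeps batch.10.* from matching the batch.1 prefix
        System.out.println(removeByPrefix(store, "batch.1.")); // prints 2
    }
}
```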

DON’T

  • Do not store large data (logs, payloads)
  • Do not use it for persistent storage (use TechDB)
  • Do not create new keys dynamically without control
  • Do not forget that it is per-instance (no multi-instance sync)
