07 – SharedDataRegistry
Version : 4.0.0 Date : 2025-12-09
1. Introduction
SharedDataRegistry est un registre centralisé pour partager des données entre Workers au sein d’une même instance. Il fournit des opérations atomiques et un système de niveaux de santé.
Différence avec KvBus
| Aspect | SharedDataRegistry | KvBus |
|---|---|---|
| Scope | Intra-instance | Optionnel inter-instances (Redis) |
| Performance | Ultra rapide (mémoire) | Variable (réseau si Redis) |
| Types | Fortement typés | Strings/JSON |
| Health levels | Oui | Non |
| Callbacks | Oui | Non |
2. Interface SharedDataRegistry
package eu.lmvi.socle.shared;
public interface SharedDataRegistry {
// === Key-Value basique ===
void put(String key, Object value);
void put(String key, Object value, HealthLevel level);
Optional<Object> get(String key);
<T> Optional<T> get(String key, Class<T> type);
void delete(String key);
boolean exists(String key);
// === Typed getters ===
Optional<String> getString(String key);
Optional<Integer> getInt(String key);
Optional<Long> getLong(String key);
Optional<Double> getDouble(String key);
Optional<Boolean> getBoolean(String key);
// === Sequences (compteurs atomiques) ===
void createSequence(String key, long initialValue, HealthLevel level);
long incrementSequence(String key);
long incrementSequence(String key, long delta);
long getSequence(String key);
void setSequence(String key, long value);
// === Lists ===
<T> void addToList(String key, T item);
<T> List<T> getList(String key, Class<T> type);
void clearList(String key);
// === Maps ===
<V> void putInMap(String key, String mapKey, V value);
<V> Optional<V> getFromMap(String key, String mapKey, Class<V> type);
<V> Map<String, V> getMap(String key, Class<V> type);
void removeFromMap(String key, String mapKey);
// === Health ===
HealthLevel getHealthLevel(String key);
Map<String, HealthLevel> getAllHealthLevels();
List<String> getUnhealthyKeys();
// === Callbacks ===
void registerCallback(String key, Consumer<Object> callback);
void unregisterCallback(String key);
// === Introspection ===
Set<String> keys();
Set<String> keys(String pattern);
Map<String, Object> getAll();
int size();
void clear();
}
3. Health Levels
package eu.lmvi.socle.shared;
public enum HealthLevel {
/**
* Informatif - pas d'impact sur la santé
*/
INFO,
/**
* Normal - contribue à la santé normale
*/
NORMAL,
/**
* Important - dégradation si problème
*/
IMPORTANT,
/**
* Critique - unhealthy si problème
*/
CRITICAL
}
Utilisation dans le Supervisor
Le Supervisor consulte les HealthLevel pour déterminer l’état de santé global :
- CRITICAL absent ou invalide → Instance UNHEALTHY
- IMPORTANT absent ou invalide → Instance DEGRADED
- NORMAL/INFO → Pas d’impact
4. Implémentation
package eu.lmvi.socle.shared;
@Component
public class InMemorySharedDataRegistry implements SharedDataRegistry {
private final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Consumer<Object>> callbacks = new ConcurrentHashMap<>();
@Override
public void put(String key, Object value) {
put(key, value, HealthLevel.NORMAL);
}
@Override
public void put(String key, Object value, HealthLevel level) {
Entry previous = store.put(key, new Entry(value, level));
notifyCallback(key, value);
}
@Override
public Optional<Object> get(String key) {
Entry entry = store.get(key);
return entry != null ? Optional.of(entry.value) : Optional.empty();
}
@Override
public <T> Optional<T> get(String key, Class<T> type) {
return get(key).filter(type::isInstance).map(type::cast);
}
@Override
public Optional<String> getString(String key) {
return get(key).map(Object::toString);
}
@Override
public Optional<Integer> getInt(String key) {
return get(key, Number.class).map(Number::intValue);
}
@Override
public Optional<Long> getLong(String key) {
return get(key, Number.class).map(Number::longValue);
}
@Override
public void createSequence(String key, long initialValue, HealthLevel level) {
store.put(key, new Entry(new AtomicLong(initialValue), level));
}
@Override
public long incrementSequence(String key) {
return incrementSequence(key, 1);
}
@Override
public long incrementSequence(String key, long delta) {
Entry entry = store.get(key);
if (entry == null || !(entry.value instanceof AtomicLong)) {
throw new IllegalStateException("Sequence not found: " + key);
}
long newValue = ((AtomicLong) entry.value).addAndGet(delta);
notifyCallback(key, newValue);
return newValue;
}
@Override
public long getSequence(String key) {
Entry entry = store.get(key);
if (entry == null || !(entry.value instanceof AtomicLong)) {
throw new IllegalStateException("Sequence not found: " + key);
}
return ((AtomicLong) entry.value).get();
}
@Override
public HealthLevel getHealthLevel(String key) {
Entry entry = store.get(key);
return entry != null ? entry.level : null;
}
@Override
public List<String> getUnhealthyKeys() {
return store.entrySet().stream()
.filter(e -> e.getValue().level == HealthLevel.CRITICAL)
.filter(e -> !isValueHealthy(e.getValue().value))
.map(Map.Entry::getKey)
.toList();
}
@Override
public void registerCallback(String key, Consumer<Object> callback) {
callbacks.put(key, callback);
}
private void notifyCallback(String key, Object value) {
Consumer<Object> callback = callbacks.get(key);
if (callback != null) {
try {
callback.accept(value);
} catch (Exception e) {
// Log but don't propagate
}
}
}
private boolean isValueHealthy(Object value) {
if (value == null) return false;
if (value instanceof Boolean b) return b;
if (value instanceof Number n) return n.doubleValue() >= 0;
return true;
}
private record Entry(Object value, HealthLevel level) {}
}
5. Utilisation
5.1 Injection
@Service
public class MonService {
@Autowired
private SharedDataRegistry registry;
public void process() {
// ...
}
}
5.2 Key-Value simple
// Stocker
registry.put("config.maxRetries", 3);
registry.put("status.lastSync", Instant.now().toString());
// Récupérer
int maxRetries = registry.getInt("config.maxRetries").orElse(5);
String lastSync = registry.getString("status.lastSync").orElse("never");
5.3 Avec Health Level
// Donnée critique - instance unhealthy si absente
registry.put("database.connected", true, HealthLevel.CRITICAL);
// Donnée importante - instance degraded si absente
registry.put("cache.available", true, HealthLevel.IMPORTANT);
// Donnée normale
registry.put("stats.requestsTotal", 0, HealthLevel.NORMAL);
// Donnée informative
registry.put("info.startTime", Instant.now(), HealthLevel.INFO);
5.4 Sequences (Compteurs)
// Créer une séquence
registry.createSequence("orders.processed", 0, HealthLevel.NORMAL);
// Incrémenter
long count = registry.incrementSequence("orders.processed");
log.info("Processed order #{}", count);
// Incrémenter avec delta
long bytes = registry.incrementSequence("bytes.transferred", 1024);
// Lire
long total = registry.getSequence("orders.processed");
5.5 Listes
// Ajouter à une liste
registry.addToList("errors.recent", new ErrorRecord("timeout", Instant.now()));
registry.addToList("errors.recent", new ErrorRecord("connection", Instant.now()));
// Lire la liste
List<ErrorRecord> errors = registry.getList("errors.recent", ErrorRecord.class);
// Vider
registry.clearList("errors.recent");
5.6 Maps
// Stocker dans une map
registry.putInMap("workers.status", "worker-1", "RUNNING");
registry.putInMap("workers.status", "worker-2", "STOPPED");
// Lire une entrée
Optional<String> status = registry.getFromMap("workers.status", "worker-1", String.class);
// Lire toute la map
Map<String, String> allStatus = registry.getMap("workers.status", String.class);
5.7 Callbacks
// Enregistrer un callback
registry.registerCallback("config.maxRetries", newValue -> {
log.info("maxRetries changed to: {}", newValue);
reconfigure((Integer) newValue);
});
// La modification déclenche le callback
registry.put("config.maxRetries", 5); // Callback appelé
6. Patterns courants
6.1 État de connexion
@Component
public class DatabaseWorker implements Worker {
@Autowired
private SharedDataRegistry registry;
@Override
public void initialize() {
registry.put("database.connected", false, HealthLevel.CRITICAL);
}
@Override
public void start() {
try {
connect();
registry.put("database.connected", true, HealthLevel.CRITICAL);
} catch (Exception e) {
registry.put("database.connected", false, HealthLevel.CRITICAL);
throw e;
}
}
@Override
public void stop() {
disconnect();
registry.put("database.connected", false, HealthLevel.CRITICAL);
}
}
6.2 Métriques temps réel
@Component
public class MetricsCollector {
@Autowired
private SharedDataRegistry registry;
@PostConstruct
public void init() {
registry.createSequence("metrics.requests.total", 0, HealthLevel.INFO);
registry.createSequence("metrics.requests.errors", 0, HealthLevel.NORMAL);
registry.createSequence("metrics.bytes.in", 0, HealthLevel.INFO);
registry.createSequence("metrics.bytes.out", 0, HealthLevel.INFO);
}
public void recordRequest(long bytesIn, long bytesOut, boolean success) {
registry.incrementSequence("metrics.requests.total");
registry.incrementSequence("metrics.bytes.in", bytesIn);
registry.incrementSequence("metrics.bytes.out", bytesOut);
if (!success) {
registry.incrementSequence("metrics.requests.errors");
}
}
public Map<String, Object> getMetrics() {
return Map.of(
"requests.total", registry.getSequence("metrics.requests.total"),
"requests.errors", registry.getSequence("metrics.requests.errors"),
"bytes.in", registry.getSequence("metrics.bytes.in"),
"bytes.out", registry.getSequence("metrics.bytes.out")
);
}
}
6.3 Circuit Breaker state
@Component
public class CircuitBreakerStateManager {
@Autowired
private SharedDataRegistry registry;
public void updateState(String circuitName, CircuitState state) {
String key = "circuit." + circuitName + ".state";
HealthLevel level = state == CircuitState.OPEN
? HealthLevel.IMPORTANT
: HealthLevel.NORMAL;
registry.put(key, state.name(), level);
}
public CircuitState getState(String circuitName) {
return registry.getString("circuit." + circuitName + ".state")
.map(CircuitState::valueOf)
.orElse(CircuitState.CLOSED);
}
}
6.4 Progress tracking
@Component
public class BatchProcessor {
@Autowired
private SharedDataRegistry registry;
public void processBatch(String batchId, List<Item> items) {
registry.put("batch." + batchId + ".total", items.size());
registry.createSequence("batch." + batchId + ".processed", 0, HealthLevel.NORMAL);
for (Item item : items) {
processItem(item);
registry.incrementSequence("batch." + batchId + ".processed");
}
registry.put("batch." + batchId + ".status", "COMPLETED");
}
public double getProgress(String batchId) {
int total = registry.getInt("batch." + batchId + ".total").orElse(0);
if (total == 0) return 0;
long processed = registry.getSequence("batch." + batchId + ".processed");
return (double) processed / total * 100;
}
}
7. Intégration avec Supervisor
@Component
public class HealthAggregator {
@Autowired
private SharedDataRegistry registry;
@Autowired
private List<Worker> workers;
public HealthStatus aggregateHealth() {
// Check workers
boolean allWorkersHealthy = workers.stream().allMatch(Worker::isHealthy);
// Check critical registry entries
List<String> unhealthyKeys = registry.getUnhealthyKeys();
boolean hasCriticalFailure = !unhealthyKeys.isEmpty();
if (!allWorkersHealthy || hasCriticalFailure) {
return HealthStatus.UNHEALTHY;
}
// Check important entries
Map<String, HealthLevel> levels = registry.getAllHealthLevels();
boolean hasImportantFailure = levels.entrySet().stream()
.filter(e -> e.getValue() == HealthLevel.IMPORTANT)
.anyMatch(e -> !isHealthy(registry.get(e.getKey())));
if (hasImportantFailure) {
return HealthStatus.DEGRADED;
}
return HealthStatus.HEALTHY;
}
}
8. Exposition API
@RestController
@RequestMapping("/admin/registry")
public class SharedDataController {
@Autowired
private SharedDataRegistry registry;
@GetMapping
public Map<String, Object> getAll() {
return registry.getAll();
}
@GetMapping("/{key}")
public ResponseEntity<?> get(@PathVariable String key) {
return registry.get(key)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build());
}
@GetMapping("/health")
public Map<String, HealthLevel> getHealthLevels() {
return registry.getAllHealthLevels();
}
@GetMapping("/unhealthy")
public List<String> getUnhealthyKeys() {
return registry.getUnhealthyKeys();
}
}
9. Bonnes pratiques
Conventions de nommage
<category>.<subcategory>.<name>
Exemples:
- database.connected
- worker.kafka.status
- metrics.requests.total
- batch.order-123.progress
- circuit.external-api.state
DO
- Utiliser des noms de clés cohérents et hiérarchiques
- Définir le HealthLevel approprié pour chaque donnée
- Utiliser les sequences pour les compteurs (thread-safe)
- Nettoyer les données obsolètes
DON’T
- Ne pas stocker de données volumineuses (logs, payloads)
- Ne pas utiliser pour le stockage persistant (utiliser TechDB)
- Ne pas créer de nouvelles clés dynamiquement sans contrôle
- Ne pas oublier que c’est per-instance (pas de sync multi-instances)
10. Références
- 06-KV-BUS – KvBus
- 08-SUPERVISOR – Supervision
- 21-H2-TECHDB – Persistance









