Socle V004 – KV-Bus

Socle V004 - KV-Bus

06 – KvBus (Key-Value Bus)

Version : 4.0.0 Date : 2025-12-09

1. Introduction

KvBus est une abstraction de stockage clé-valeur avec deux implémentations :

  • in_memory : HashMap pour le développement local
  • redis : Redis pour la production multi-instances

Caractéristiques

  • Interface unifiée
  • TTL (Time-To-Live) configurable
  • Opérations atomiques
  • Support JSON pour les objets complexes
  • Patterns pub/sub (Redis uniquement)

2. Configuration

2.1 application.yml

socle:
  kvbus:
    mode: ${KVBUS_MODE:in_memory}
    redis:
      host: ${REDIS_HOST:localhost}
      port: ${REDIS_PORT:6379}
      password: ${REDIS_PASSWORD:}
      database: ${REDIS_DATABASE:0}
      prefix: ${REDIS_PREFIX:socle}
      connect-timeout-ms: ${REDIS_CONNECT_TIMEOUT:5000}
      read-timeout-ms: ${REDIS_READ_TIMEOUT:5000}
      pool:
        max-total: ${REDIS_POOL_MAX:16}
        max-idle: ${REDIS_POOL_MAX_IDLE:8}
        min-idle: ${REDIS_POOL_MIN_IDLE:2}

2.2 Variables d’environnement

Variable Description Défaut
KVBUS_MODE Mode (in_memory/redis) in_memory
REDIS_HOST Hôte Redis localhost
REDIS_PORT Port Redis 6379
REDIS_PASSWORD Mot de passe
REDIS_DATABASE Database number 0
REDIS_PREFIX Préfixe des clés socle

3. Interface KvBus

package eu.lmvi.socle.kv;

public interface KvBus {

    // === CRUD basique ===

    /**
     * Stocke une valeur
     */
    void put(String key, String value);

    /**
     * Stocke une valeur avec TTL
     */
    void put(String key, String value, Duration ttl);

    /**
     * Récupère une valeur
     */
    Optional<String> get(String key);

    /**
     * Supprime une clé
     */
    void delete(String key);

    /**
     * Vérifie l'existence d'une clé
     */
    boolean exists(String key);

    // === TTL ===

    /**
     * Définit le TTL d'une clé existante
     */
    void setTtl(String key, Duration ttl);

    /**
     * Récupère le TTL restant
     */
    Optional<Duration> getTtl(String key);

    // === Opérations atomiques ===

    /**
     * Incrémente une valeur numérique
     */
    long increment(String key);

    /**
     * Incrémente avec delta
     */
    long increment(String key, long delta);

    /**
     * Set if not exists
     */
    boolean putIfAbsent(String key, String value);

    /**
     * Set if not exists avec TTL
     */
    boolean putIfAbsent(String key, String value, Duration ttl);

    // === Opérations en lot ===

    /**
     * Récupère plusieurs clés
     */
    Map<String, String> getAll(Collection<String> keys);

    /**
     * Stocke plusieurs valeurs
     */
    void putAll(Map<String, String> entries);

    /**
     * Supprime plusieurs clés
     */
    void deleteAll(Collection<String> keys);

    // === Pattern matching ===

    /**
     * Liste les clés correspondant à un pattern
     */
    Set<String> keys(String pattern);

    // === JSON helpers ===

    /**
     * Stocke un objet en JSON
     */
    <T> void putJson(String key, T object);

    /**
     * Récupère un objet depuis JSON
     */
    <T> Optional<T> getJson(String key, Class<T> type);

    // === Lifecycle ===

    /**
     * Vérifie la santé de la connexion
     */
    boolean isHealthy();

    /**
     * Ferme les connexions
     */
    void close();
}

4. Implémentation InMemoryKvBus

package eu.lmvi.socle.kv;

@Component
@ConditionalOnProperty(name = "socle.kvbus.mode", havingValue = "in_memory", matchIfMissing = true)
public class InMemoryKvBus implements KvBus {

    private final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();
    private final ScheduledExecutorService cleaner;

    public InMemoryKvBus() {
        // Nettoyage des entrées expirées toutes les minutes
        cleaner = Executors.newSingleThreadScheduledExecutor();
        cleaner.scheduleAtFixedRate(this::cleanExpired, 1, 1, TimeUnit.MINUTES);
    }

    @Override
    public void put(String key, String value) {
        store.put(key, new Entry(value, null));
    }

    @Override
    public void put(String key, String value, Duration ttl) {
        Instant expiry = Instant.now().plus(ttl);
        store.put(key, new Entry(value, expiry));
    }

    @Override
    public Optional<String> get(String key) {
        Entry entry = store.get(key);
        if (entry == null) return Optional.empty();
        if (entry.isExpired()) {
            store.remove(key);
            return Optional.empty();
        }
        return Optional.of(entry.value);
    }

    @Override
    public void delete(String key) {
        store.remove(key);
    }

    @Override
    public boolean exists(String key) {
        return get(key).isPresent();
    }

    @Override
    public long increment(String key) {
        return increment(key, 1);
    }

    @Override
    public long increment(String key, long delta) {
        Entry entry = store.compute(key, (k, v) -> {
            long current = (v == null) ? 0 : Long.parseLong(v.value);
            return new Entry(String.valueOf(current + delta), v != null ? v.expiry : null);
        });
        return Long.parseLong(entry.value);
    }

    @Override
    public boolean putIfAbsent(String key, String value) {
        return store.putIfAbsent(key, new Entry(value, null)) == null;
    }

    @Override
    public Set<String> keys(String pattern) {
        String regex = pattern.replace("*", ".*");
        return store.keySet().stream()
            .filter(k -> k.matches(regex))
            .collect(Collectors.toSet());
    }

    @Override
    public boolean isHealthy() {
        return true;
    }

    @Override
    public void close() {
        cleaner.shutdown();
        store.clear();
    }

    private void cleanExpired() {
        store.entrySet().removeIf(e -> e.getValue().isExpired());
    }

    private record Entry(String value, Instant expiry) {
        boolean isExpired() {
            return expiry != null && Instant.now().isAfter(expiry);
        }
    }
}

5. Implémentation RedisKvBus

package eu.lmvi.socle.kv;

@Component
@ConditionalOnProperty(name = "socle.kvbus.mode", havingValue = "redis")
public class RedisKvBus implements KvBus {

    private static final Logger log = LoggerFactory.getLogger(RedisKvBus.class);

    private final JedisPool jedisPool;
    private final String prefix;
    private final ObjectMapper objectMapper;

    public RedisKvBus(SocleConfiguration config) {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(config.getKvbus().getRedis().getPool().getMaxTotal());
        poolConfig.setMaxIdle(config.getKvbus().getRedis().getPool().getMaxIdle());
        poolConfig.setMinIdle(config.getKvbus().getRedis().getPool().getMinIdle());

        this.jedisPool = new JedisPool(
            poolConfig,
            config.getKvbus().getRedis().getHost(),
            config.getKvbus().getRedis().getPort(),
            config.getKvbus().getRedis().getConnectTimeoutMs(),
            config.getKvbus().getRedis().getPassword(),
            config.getKvbus().getRedis().getDatabase()
        );

        this.prefix = config.getKvbus().getRedis().getPrefix() + ":";
        this.objectMapper = new ObjectMapper();

        log.info("RedisKvBus initialized: {}:{}",
            config.getKvbus().getRedis().getHost(),
            config.getKvbus().getRedis().getPort());
    }

    private String prefixedKey(String key) {
        return prefix + key;
    }

    @Override
    public void put(String key, String value) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.set(prefixedKey(key), value);
        }
    }

    @Override
    public void put(String key, String value, Duration ttl) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.setex(prefixedKey(key), ttl.toSeconds(), value);
        }
    }

    @Override
    public Optional<String> get(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            return Optional.ofNullable(jedis.get(prefixedKey(key)));
        }
    }

    @Override
    public void delete(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.del(prefixedKey(key));
        }
    }

    @Override
    public boolean exists(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.exists(prefixedKey(key));
        }
    }

    @Override
    public long increment(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.incr(prefixedKey(key));
        }
    }

    @Override
    public long increment(String key, long delta) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.incrBy(prefixedKey(key), delta);
        }
    }

    @Override
    public boolean putIfAbsent(String key, String value) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.setnx(prefixedKey(key), value) == 1;
        }
    }

    @Override
    public boolean putIfAbsent(String key, String value, Duration ttl) {
        try (Jedis jedis = jedisPool.getResource()) {
            String result = jedis.set(prefixedKey(key), value,
                SetParams.setParams().nx().ex(ttl.toSeconds()));
            return "OK".equals(result);
        }
    }

    @Override
    public Set<String> keys(String pattern) {
        try (Jedis jedis = jedisPool.getResource()) {
            Set<String> rawKeys = jedis.keys(prefixedKey(pattern));
            return rawKeys.stream()
                .map(k -> k.substring(prefix.length()))
                .collect(Collectors.toSet());
        }
    }

    @Override
    public <T> void putJson(String key, T object) {
        try {
            String json = objectMapper.writeValueAsString(object);
            put(key, json);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize object", e);
        }
    }

    @Override
    public <T> Optional<T> getJson(String key, Class<T> type) {
        return get(key).map(json -> {
            try {
                return objectMapper.readValue(json, type);
            } catch (JsonProcessingException e) {
                throw new RuntimeException("Failed to deserialize object", e);
            }
        });
    }

    @Override
    public boolean isHealthy() {
        try (Jedis jedis = jedisPool.getResource()) {
            return "PONG".equals(jedis.ping());
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public void close() {
        jedisPool.close();
    }
}

6. Utilisation

6.1 Injection

@Service
public class MonService {

    @Autowired
    private KvBus kvBus;

    public void process() {
        // Utiliser kvBus...
    }
}

6.2 CRUD basique

// Stocker
kvBus.put("user:123:name", "John");
kvBus.put("session:abc", "data", Duration.ofHours(1));

// Récupérer
Optional<String> name = kvBus.get("user:123:name");
name.ifPresent(n -> log.info("Name: {}", n));

// Vérifier
if (kvBus.exists("user:123:name")) {
    // ...
}

// Supprimer
kvBus.delete("user:123:name");

6.3 JSON

// Stocker un objet
Order order = new Order("123", "PENDING", List.of("item1", "item2"));
kvBus.putJson("order:123", order);

// Récupérer un objet
Optional<Order> retrieved = kvBus.getJson("order:123", Order.class);

6.4 Compteurs atomiques

// Incrémenter
long newValue = kvBus.increment("stats:requests:total");
long newValue2 = kvBus.increment("stats:bytes:total", 1024);

// Compteur avec reset quotidien
String dailyKey = "stats:requests:" + LocalDate.now();
kvBus.increment(dailyKey);
kvBus.setTtl(dailyKey, Duration.ofDays(1));

6.5 Lock distribué (Redis)

public boolean tryLock(String resource, Duration timeout) {
    String lockKey = "lock:" + resource;
    return kvBus.putIfAbsent(lockKey, "locked", timeout);
}

public void unlock(String resource) {
    kvBus.delete("lock:" + resource);
}

// Utilisation
if (tryLock("order-processing", Duration.ofMinutes(5))) {
    try {
        processOrders();
    } finally {
        unlock("order-processing");
    }
}

6.6 Cache avec TTL

public Order getOrder(String orderId) {
    String cacheKey = "cache:order:" + orderId;

    // Check cache
    Optional<Order> cached = kvBus.getJson(cacheKey, Order.class);
    if (cached.isPresent()) {
        return cached.get();
    }

    // Load from DB
    Order order = orderRepository.findById(orderId);

    // Cache for 5 minutes
    kvBus.putJson(cacheKey, order);
    kvBus.setTtl(cacheKey, Duration.ofMinutes(5));

    return order;
}

7. Patterns avancés

7.1 Rate limiting

public boolean isRateLimited(String userId, int maxRequests, Duration window) {
    String key = "ratelimit:" + userId + ":" + Instant.now().truncatedTo(ChronoUnit.MINUTES);

    long count = kvBus.increment(key);
    if (count == 1) {
        kvBus.setTtl(key, window);
    }

    return count > maxRequests;
}

7.2 Session management

public void createSession(String sessionId, User user) {
    kvBus.putJson("session:" + sessionId, user);
    kvBus.setTtl("session:" + sessionId, Duration.ofHours(24));
}

public Optional<User> getSession(String sessionId) {
    return kvBus.getJson("session:" + sessionId, User.class);
}

public void refreshSession(String sessionId) {
    kvBus.setTtl("session:" + sessionId, Duration.ofHours(24));
}

public void destroySession(String sessionId) {
    kvBus.delete("session:" + sessionId);
}

7.3 Feature flags

public boolean isFeatureEnabled(String feature) {
    return kvBus.get("feature:" + feature)
        .map(Boolean::parseBoolean)
        .orElse(false);
}

public void setFeatureFlag(String feature, boolean enabled) {
    kvBus.put("feature:" + feature, String.valueOf(enabled));
}

8. KvBus vs TechDB (V4)

Aspect KvBus TechDB (H2)
Cas d’usage Cache, sessions, locks Offsets, état persistant
Survie restart Non (in_memory) / Oui (Redis) Oui (fichier)
Multi-instances Non (in_memory) / Oui (Redis) Non (par instance)
Performance Ultra rapide Rapide
Requêtes Clé simple SQL, JSON

Règle de choix

  • KvBus : Données temporaires, cache, sessions, compteurs temps réel
  • TechDB : Offsets, checkpoints, état qui doit survivre au restart

9. Monitoring

9.1 Métriques

socle_kvbus_operations_total{operation="get"}
socle_kvbus_operations_total{operation="put"}
socle_kvbus_operations_total{operation="delete"}
socle_kvbus_latency_seconds{operation="get"}
socle_kvbus_keys_count

9.2 Health Check

@Component
public class KvBusHealthIndicator implements HealthIndicator {

    @Autowired
    private KvBus kvBus;

    @Override
    public Health health() {
        if (kvBus.isHealthy()) {
            return Health.up().build();
        }
        return Health.down().withDetail("error", "KvBus not responding").build();
    }
}

10. Bonnes pratiques

DO

  • Utiliser des préfixes de clés cohérents (user:, session:, cache:)
  • Toujours définir un TTL pour les caches
  • Utiliser putIfAbsent pour les locks
  • Préférer Redis en production multi-instances

DON’T

  • Ne pas stocker de données volumineuses (> 1MB)
  • Ne pas utiliser keys("*") en production (scan)
  • Ne pas oublier de fermer les connexions
  • Ne pas utiliser in_memory pour les données critiques en prod

11. Références

Commentaires

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *