06 – KvBus (Key-Value Bus)
Version : 4.0.0 Date : 2025-12-09
1. Introduction
KvBus est une abstraction de stockage clé-valeur avec deux implémentations :
- in_memory : HashMap pour le développement local
- redis : Redis pour la production multi-instances
Caractéristiques
- Interface unifiée
- TTL (Time-To-Live) configurable
- Opérations atomiques
- Support JSON pour les objets complexes
- Patterns pub/sub (Redis uniquement)
2. Configuration
2.1 application.yml
socle:
kvbus:
mode: ${KVBUS_MODE:in_memory}
redis:
host: ${REDIS_HOST:localhost}
port: ${REDIS_PORT:6379}
password: ${REDIS_PASSWORD:}
database: ${REDIS_DATABASE:0}
prefix: ${REDIS_PREFIX:socle}
connect-timeout-ms: ${REDIS_CONNECT_TIMEOUT:5000}
read-timeout-ms: ${REDIS_READ_TIMEOUT:5000}
pool:
max-total: ${REDIS_POOL_MAX:16}
max-idle: ${REDIS_POOL_MAX_IDLE:8}
min-idle: ${REDIS_POOL_MIN_IDLE:2}
2.2 Variables d’environnement
| Variable | Description | Défaut |
|---|---|---|
KVBUS_MODE |
Mode (in_memory/redis) |
in_memory |
REDIS_HOST |
Hôte Redis | localhost |
REDIS_PORT |
Port Redis | 6379 |
REDIS_PASSWORD |
Mot de passe | – |
REDIS_DATABASE |
Database number | 0 |
REDIS_PREFIX |
Préfixe des clés | socle |
3. Interface KvBus
package eu.lmvi.socle.kv;
public interface KvBus {
// === CRUD basique ===
/**
* Stocke une valeur
*/
void put(String key, String value);
/**
* Stocke une valeur avec TTL
*/
void put(String key, String value, Duration ttl);
/**
* Récupère une valeur
*/
Optional<String> get(String key);
/**
* Supprime une clé
*/
void delete(String key);
/**
* Vérifie l'existence d'une clé
*/
boolean exists(String key);
// === TTL ===
/**
* Définit le TTL d'une clé existante
*/
void setTtl(String key, Duration ttl);
/**
* Récupère le TTL restant
*/
Optional<Duration> getTtl(String key);
// === Opérations atomiques ===
/**
* Incrémente une valeur numérique
*/
long increment(String key);
/**
* Incrémente avec delta
*/
long increment(String key, long delta);
/**
* Set if not exists
*/
boolean putIfAbsent(String key, String value);
/**
* Set if not exists avec TTL
*/
boolean putIfAbsent(String key, String value, Duration ttl);
// === Opérations en lot ===
/**
* Récupère plusieurs clés
*/
Map<String, String> getAll(Collection<String> keys);
/**
* Stocke plusieurs valeurs
*/
void putAll(Map<String, String> entries);
/**
* Supprime plusieurs clés
*/
void deleteAll(Collection<String> keys);
// === Pattern matching ===
/**
* Liste les clés correspondant à un pattern
*/
Set<String> keys(String pattern);
// === JSON helpers ===
/**
* Stocke un objet en JSON
*/
<T> void putJson(String key, T object);
/**
* Récupère un objet depuis JSON
*/
<T> Optional<T> getJson(String key, Class<T> type);
// === Lifecycle ===
/**
* Vérifie la santé de la connexion
*/
boolean isHealthy();
/**
* Ferme les connexions
*/
void close();
}
4. Implémentation InMemoryKvBus
package eu.lmvi.socle.kv;
@Component
@ConditionalOnProperty(name = "socle.kvbus.mode", havingValue = "in_memory", matchIfMissing = true)
public class InMemoryKvBus implements KvBus {
private final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();
private final ScheduledExecutorService cleaner;
public InMemoryKvBus() {
// Nettoyage des entrées expirées toutes les minutes
cleaner = Executors.newSingleThreadScheduledExecutor();
cleaner.scheduleAtFixedRate(this::cleanExpired, 1, 1, TimeUnit.MINUTES);
}
@Override
public void put(String key, String value) {
store.put(key, new Entry(value, null));
}
@Override
public void put(String key, String value, Duration ttl) {
Instant expiry = Instant.now().plus(ttl);
store.put(key, new Entry(value, expiry));
}
@Override
public Optional<String> get(String key) {
Entry entry = store.get(key);
if (entry == null) return Optional.empty();
if (entry.isExpired()) {
store.remove(key);
return Optional.empty();
}
return Optional.of(entry.value);
}
@Override
public void delete(String key) {
store.remove(key);
}
@Override
public boolean exists(String key) {
return get(key).isPresent();
}
@Override
public long increment(String key) {
return increment(key, 1);
}
@Override
public long increment(String key, long delta) {
Entry entry = store.compute(key, (k, v) -> {
long current = (v == null) ? 0 : Long.parseLong(v.value);
return new Entry(String.valueOf(current + delta), v != null ? v.expiry : null);
});
return Long.parseLong(entry.value);
}
@Override
public boolean putIfAbsent(String key, String value) {
return store.putIfAbsent(key, new Entry(value, null)) == null;
}
@Override
public Set<String> keys(String pattern) {
String regex = pattern.replace("*", ".*");
return store.keySet().stream()
.filter(k -> k.matches(regex))
.collect(Collectors.toSet());
}
@Override
public boolean isHealthy() {
return true;
}
@Override
public void close() {
cleaner.shutdown();
store.clear();
}
private void cleanExpired() {
store.entrySet().removeIf(e -> e.getValue().isExpired());
}
private record Entry(String value, Instant expiry) {
boolean isExpired() {
return expiry != null && Instant.now().isAfter(expiry);
}
}
}
5. Implémentation RedisKvBus
package eu.lmvi.socle.kv;
@Component
@ConditionalOnProperty(name = "socle.kvbus.mode", havingValue = "redis")
public class RedisKvBus implements KvBus {
private static final Logger log = LoggerFactory.getLogger(RedisKvBus.class);
private final JedisPool jedisPool;
private final String prefix;
private final ObjectMapper objectMapper;
public RedisKvBus(SocleConfiguration config) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(config.getKvbus().getRedis().getPool().getMaxTotal());
poolConfig.setMaxIdle(config.getKvbus().getRedis().getPool().getMaxIdle());
poolConfig.setMinIdle(config.getKvbus().getRedis().getPool().getMinIdle());
this.jedisPool = new JedisPool(
poolConfig,
config.getKvbus().getRedis().getHost(),
config.getKvbus().getRedis().getPort(),
config.getKvbus().getRedis().getConnectTimeoutMs(),
config.getKvbus().getRedis().getPassword(),
config.getKvbus().getRedis().getDatabase()
);
this.prefix = config.getKvbus().getRedis().getPrefix() + ":";
this.objectMapper = new ObjectMapper();
log.info("RedisKvBus initialized: {}:{}",
config.getKvbus().getRedis().getHost(),
config.getKvbus().getRedis().getPort());
}
private String prefixedKey(String key) {
return prefix + key;
}
@Override
public void put(String key, String value) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.set(prefixedKey(key), value);
}
}
@Override
public void put(String key, String value, Duration ttl) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.setex(prefixedKey(key), ttl.toSeconds(), value);
}
}
@Override
public Optional<String> get(String key) {
try (Jedis jedis = jedisPool.getResource()) {
return Optional.ofNullable(jedis.get(prefixedKey(key)));
}
}
@Override
public void delete(String key) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.del(prefixedKey(key));
}
}
@Override
public boolean exists(String key) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.exists(prefixedKey(key));
}
}
@Override
public long increment(String key) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.incr(prefixedKey(key));
}
}
@Override
public long increment(String key, long delta) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.incrBy(prefixedKey(key), delta);
}
}
@Override
public boolean putIfAbsent(String key, String value) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.setnx(prefixedKey(key), value) == 1;
}
}
@Override
public boolean putIfAbsent(String key, String value, Duration ttl) {
try (Jedis jedis = jedisPool.getResource()) {
String result = jedis.set(prefixedKey(key), value,
SetParams.setParams().nx().ex(ttl.toSeconds()));
return "OK".equals(result);
}
}
@Override
public Set<String> keys(String pattern) {
try (Jedis jedis = jedisPool.getResource()) {
Set<String> rawKeys = jedis.keys(prefixedKey(pattern));
return rawKeys.stream()
.map(k -> k.substring(prefix.length()))
.collect(Collectors.toSet());
}
}
@Override
public <T> void putJson(String key, T object) {
try {
String json = objectMapper.writeValueAsString(object);
put(key, json);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize object", e);
}
}
@Override
public <T> Optional<T> getJson(String key, Class<T> type) {
return get(key).map(json -> {
try {
return objectMapper.readValue(json, type);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to deserialize object", e);
}
});
}
@Override
public boolean isHealthy() {
try (Jedis jedis = jedisPool.getResource()) {
return "PONG".equals(jedis.ping());
} catch (Exception e) {
return false;
}
}
@Override
public void close() {
jedisPool.close();
}
}
6. Utilisation
6.1 Injection
@Service
public class MonService {
@Autowired
private KvBus kvBus;
public void process() {
// Utiliser kvBus...
}
}
6.2 CRUD basique
// Stocker
kvBus.put("user:123:name", "John");
kvBus.put("session:abc", "data", Duration.ofHours(1));
// Récupérer
Optional<String> name = kvBus.get("user:123:name");
name.ifPresent(n -> log.info("Name: {}", n));
// Vérifier
if (kvBus.exists("user:123:name")) {
// ...
}
// Supprimer
kvBus.delete("user:123:name");
6.3 JSON
// Stocker un objet
Order order = new Order("123", "PENDING", List.of("item1", "item2"));
kvBus.putJson("order:123", order);
// Récupérer un objet
Optional<Order> retrieved = kvBus.getJson("order:123", Order.class);
6.4 Compteurs atomiques
// Incrémenter
long newValue = kvBus.increment("stats:requests:total");
long newValue2 = kvBus.increment("stats:bytes:total", 1024);
// Compteur avec reset quotidien
String dailyKey = "stats:requests:" + LocalDate.now();
kvBus.increment(dailyKey);
kvBus.setTtl(dailyKey, Duration.ofDays(1));
6.5 Lock distribué (Redis)
public boolean tryLock(String resource, Duration timeout) {
String lockKey = "lock:" + resource;
return kvBus.putIfAbsent(lockKey, "locked", timeout);
}
public void unlock(String resource) {
kvBus.delete("lock:" + resource);
}
// Utilisation
if (tryLock("order-processing", Duration.ofMinutes(5))) {
try {
processOrders();
} finally {
unlock("order-processing");
}
}
6.6 Cache avec TTL
public Order getOrder(String orderId) {
String cacheKey = "cache:order:" + orderId;
// Check cache
Optional<Order> cached = kvBus.getJson(cacheKey, Order.class);
if (cached.isPresent()) {
return cached.get();
}
// Load from DB
Order order = orderRepository.findById(orderId);
// Cache for 5 minutes
kvBus.putJson(cacheKey, order);
kvBus.setTtl(cacheKey, Duration.ofMinutes(5));
return order;
}
7. Patterns avancés
7.1 Rate limiting
public boolean isRateLimited(String userId, int maxRequests, Duration window) {
String key = "ratelimit:" + userId + ":" + Instant.now().truncatedTo(ChronoUnit.MINUTES);
long count = kvBus.increment(key);
if (count == 1) {
kvBus.setTtl(key, window);
}
return count > maxRequests;
}
7.2 Session management
public void createSession(String sessionId, User user) {
kvBus.putJson("session:" + sessionId, user);
kvBus.setTtl("session:" + sessionId, Duration.ofHours(24));
}
public Optional<User> getSession(String sessionId) {
return kvBus.getJson("session:" + sessionId, User.class);
}
public void refreshSession(String sessionId) {
kvBus.setTtl("session:" + sessionId, Duration.ofHours(24));
}
public void destroySession(String sessionId) {
kvBus.delete("session:" + sessionId);
}
7.3 Feature flags
public boolean isFeatureEnabled(String feature) {
return kvBus.get("feature:" + feature)
.map(Boolean::parseBoolean)
.orElse(false);
}
public void setFeatureFlag(String feature, boolean enabled) {
kvBus.put("feature:" + feature, String.valueOf(enabled));
}
8. KvBus vs TechDB (V4)
| Aspect | KvBus | TechDB (H2) |
|---|---|---|
| Cas d’usage | Cache, sessions, locks | Offsets, état persistant |
| Survie restart | Non (in_memory) / Oui (Redis) | Oui (fichier) |
| Multi-instances | Non (in_memory) / Oui (Redis) | Non (par instance) |
| Performance | Ultra rapide | Rapide |
| Requêtes | Clé simple | SQL, JSON |
Règle de choix
- KvBus : Données temporaires, cache, sessions, compteurs temps réel
- TechDB : Offsets, checkpoints, état qui doit survivre au restart
9. Monitoring
9.1 Métriques
socle_kvbus_operations_total{operation="get"}
socle_kvbus_operations_total{operation="put"}
socle_kvbus_operations_total{operation="delete"}
socle_kvbus_latency_seconds{operation="get"}
socle_kvbus_keys_count
9.2 Health Check
@Component
public class KvBusHealthIndicator implements HealthIndicator {
@Autowired
private KvBus kvBus;
@Override
public Health health() {
if (kvBus.isHealthy()) {
return Health.up().build();
}
return Health.down().withDetail("error", "KvBus not responding").build();
}
}
10. Bonnes pratiques
DO
- Utiliser des préfixes de clés cohérents (
user:,session:,cache:) - Toujours définir un TTL pour les caches
- Utiliser
putIfAbsentpour les locks - Préférer Redis en production multi-instances
DON’T
- Ne pas stocker de données volumineuses (> 1MB)
- Ne pas utiliser
keys("*")en production (scan) - Ne pas oublier de fermer les connexions
- Ne pas utiliser in_memory pour les données critiques en prod
11. Références
- 07-SHARED-DATA – SharedDataRegistry
- 21-H2-TECHDB – TechDB
- Redis Documentation

Laisser un commentaire