30 – EventBus and Push Workers
Version: 4.1.0 Date: 2026-01-23 Status: Specification
1. Introduction
This document introduces the new EventBus system and the Push Workers in Socle V004. The mechanism replaces polling with instant notification.
1.1 Motivation
The current event-driven worker mechanism (AbstractEventDrivenWorker) polls with a timeout:
// Old mechanism (DEPRECATED)
while (running) {
    event = queue.poll(100, TimeUnit.MILLISECONDS); // Waits up to 100 ms, then loops again
    if (event != null) processEvent(event);
}
Problems:
- Latency of 0 to 100 ms before processing
- Constant CPU consumption (even if low)
- No true push mechanism
1.2 New push mechanism
// New mechanism (RECOMMENDED)
while (running) {
    event = queue.take(); // Sleeps until an event arrives
    processEvent(event);  // Woken as soon as an event is queued
}
Advantages:
- Latency ~0 ms (immediate wake-up)
- Zero CPU when idle
- True event-driven push mechanism
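The wake-up behaviour of take() can be observed with the JDK alone. The sketch below is illustrative only: the class and method names (TakeLoopDemo, measureWakeUpMillis) are hypothetical and not part of the Socle. A consumer thread parks in take(), a timestamp is queued about 50 ms later, and the method reports how long the consumer needed to pick it up.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical demo class, not part of the Socle. */
final class TakeLoopDemo {

    /** Returns the delay in ms between queueing an element and the parked consumer receiving it. */
    static long measureWakeUpMillis() {
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
        CountDownLatch handled = new CountDownLatch(1);
        AtomicLong latencyNs = new AtomicLong(-1);

        Thread consumer = new Thread(() -> {
            try {
                long publishedAt = queue.take(); // parks until an element arrives
                latencyNs.set(System.nanoTime() - publishedAt);
                handled.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        try {
            Thread.sleep(50);                // give the consumer time to park in take()
            queue.put(System.nanoTime());    // "publish": the consumer is woken by this call
            if (!handled.await(2, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumer was not woken");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return TimeUnit.NANOSECONDS.toMillis(latencyNs.get());
    }
}
```

The measured value depends on the scheduler and the machine, so it is better read as an order of magnitude than as a guarantee.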
2. Deprecation of the old Workers
2.1 Deprecated classes
| Class | Status | Replacement |
|---|---|---|
| AbstractEventDrivenWorker<T> | DEPRECATED | AbstractPushWorker<T> |
| KafkaEventWorker<K,V> | DEPRECATED | KafkaPushWorker<K,V> |
2.2 Migration
The old workers keep working but must not be used for new development.
// DEPRECATED - do not use
@Deprecated(since = "4.1.0", forRemoval = true)
public abstract class AbstractEventDrivenWorker<T> implements Worker { ... }
// RECOMMENDED - use this instead
public abstract class AbstractPushWorker<T> implements Worker { ... }
2.3 Deprecation schedule
| Version | Action |
|---|---|
| 4.1.0 | EventBus and Push Workers introduced, old workers deprecated |
| 4.2.0 | Compile-time warnings for the old workers |
| 5.0.0 | Old workers removed (breaking change) |
3. EventBus Architecture
3.1 Overview
┌─────────────────────────────────────────────────────────────────────┐
│                                 MOP                                 │
│                                                                     │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                           EventBus                            │  │
│  │                                                               │  │
│  │  Channels:                                                    │  │
│  │    "orders.created" ──► [Worker1.queue, Worker2.queue]        │  │
│  │    "orders.updated" ──► [Worker1.queue]                       │  │
│  │    "alerts.*"       ──► [AlertWorker.queue]                   │  │
│  │                                                               │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                  │                                  │
│                  publish("orders.created", event)                   │
│                                  │                                  │
│                                  ▼                                  │
│   ┌────────────────┐    ┌────────────────┐    ┌────────────────┐    │
│   │ PushWorker 1   │    │ PushWorker 2   │    │ PushWorker 3   │    │
│   │                │    │                │    │                │    │
│   │  queue.take()  │    │  queue.take()  │    │  queue.take()  │    │
│   │    (asleep)    │    │    (asleep)    │    │    (asleep)    │    │
│   │       ↓        │    │       ↓        │    │                │    │
│   │    WAKE-UP!    │    │    WAKE-UP!    │    │(not subscribed)│    │
│   └────────────────┘    └────────────────┘    └────────────────┘    │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
3.2 Components
| Component | Role |
|---|---|
| EventBus | Central publish/subscribe bus |
| BusEvent<T> | Event envelope with metadata |
| AbstractPushWorker<T> | Worker woken by the EventBus |
| KafkaPushWorker<K,V> | Kafka worker with internal push |
| EventBusWorker | Worker that manages the EventBus lifecycle |
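The fan-out in the overview (one queue per subscriber, each publish copied to every queue registered on the channel) can be sketched in a few lines. MiniBus is a hypothetical, stripped-down stand-in used only for illustration: it handles exact channel names and String payloads, and omits patterns, the BusEvent envelope, lifecycle and statistics.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical minimal bus: one bounded queue per subscriber, exact channel names only. */
final class MiniBus {
    private final Map<String, List<BlockingQueue<String>>> channels = new ConcurrentHashMap<>();

    /** Registers a new subscriber and returns the queue it should take() from. */
    BlockingQueue<String> subscribe(String channel) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
        channels.computeIfAbsent(channel, k -> new CopyOnWriteArrayList<>()).add(queue);
        return queue;
    }

    /** Offers the payload to every subscriber queue of the channel; returns how many accepted it. */
    int publish(String channel, String payload) {
        int delivered = 0;
        for (BlockingQueue<String> queue : channels.getOrDefault(channel, List.of())) {
            if (queue.offer(payload)) {
                delivered++;
            }
        }
        return delivered;
    }
}
```

With two subscribers on "orders.created" and one on "orders.updated", a publish on "orders.created" lands in the first two queues only, which corresponds to PushWorker 3 staying asleep in the diagram.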
4. EventBus Interface
4.1 Definition
package eu.lmvi.socle.event;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
 * Central event bus with a push mechanism.
 *
 * <p>The EventBus lets workers communicate through a publish/subscribe
 * system with immediate wake-up.</p>
 */
public interface EventBus {
    // ==================== Publication ====================
    /**
     * Publishes an event on a channel.
     * Subscribers are woken immediately.
     *
     * @param channel Channel name (e.g. "orders.created")
     * @param payload Event content
     */
    void publish(String channel, Object payload);
    /**
     * Publishes an event with custom metadata.
     *
     * @param channel Channel name
     * @param payload Event content
     * @param metadata Additional metadata
     */
    void publish(String channel, Object payload, Map<String, String> metadata);
    /**
     * Asynchronous publication with confirmation.
     *
     * @return Future completed once all subscribers have received the event
     */
    CompletableFuture<PublishResult> publishAsync(String channel, Object payload);
    // ==================== Subscription ====================
    /**
     * Subscribes to a channel and returns a subscription holding the receive queue.
     * Use getQueue().take() to wait for events (push).
     *
     * @param channel Channel name
     * @param eventType Expected payload type
     * @return Subscription whose queue should be consumed with take()
     */
    <T> Subscription<T> subscribe(String channel, Class<T> eventType);
    /**
     * Subscribes with a glob pattern (e.g. "orders.*", "*.created").
     *
     * @param pattern Glob pattern
     * @param eventType Expected payload type
     * @return Subscription with its queue
     */
    <T> Subscription<T> subscribePattern(String pattern, Class<T> eventType);
    /**
     * Subscribes with a callback handler (immediate execution).
     *
     * @param channel Channel name
     * @param handler Handler called for each event
     * @return Subscription ID, to be passed to unsubscribe
     */
    String subscribe(String channel, EventHandler handler);
    /**
     * Unsubscribes.
     *
     * @param subscriptionId ID returned by subscribe
     */
    void unsubscribe(String subscriptionId);
    // ==================== Lifecycle ====================
    /**
     * Starts the EventBus.
     */
    void start();
    /**
     * Stops the EventBus cleanly.
     * Queues are drained and subscribers are notified.
     */
    void stop();
    /**
     * Checks the health of the EventBus.
     */
    boolean isHealthy();
    // ==================== Stats ====================
    /**
     * Global statistics.
     */
    EventBusStats getStats();
    /**
     * Per-channel statistics.
     */
    Map<String, ChannelStats> getChannelStats();
}
4.2 Supporting classes
package eu.lmvi.socle.event;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
/**
 * Event carried by the EventBus.
 */
public record BusEvent<T>(
    String id,                    // Unique identifier (a UUID when created through of())
    String channel,               // Publication channel
    Instant timestamp,            // Publication timestamp
    String source,                // Source (worker name)
    T payload,                    // Business content
    Map<String, String> metadata  // Additional metadata
) {
    /**
     * Creates a simple event.
     */
    public static <T> BusEvent<T> of(String channel, T payload) {
        return new BusEvent<>(
            UUID.randomUUID().toString(),
            channel,
            Instant.now(),
            null,
            payload,
            Map.of()
        );
    }
}
/**
 * Subscription to a channel.
 */
public interface Subscription<T> {
    /**
     * Unique ID of the subscription.
     */
    String getId();
    /**
     * Subscribed channel or pattern.
     */
    String getChannelOrPattern();
    /**
     * Queue on which events are received.
     * Use take() to wait (blocking, immediate wake-up).
     * Use poll(timeout) if a timeout is needed.
     */
    BlockingQueue<BusEvent<T>> getQueue();
    /**
     * Unsubscribes.
     */
    void unsubscribe();
    /**
     * Number of events waiting in the queue.
     */
    int getPendingCount();
}
/**
 * Handler for callback subscriptions.
 */
@FunctionalInterface
public interface EventHandler {
    void onEvent(BusEvent<?> event);
}
/**
 * Publication result.
 */
public record PublishResult(
    String eventId,
    int deliveredCount,
    int failedCount,
    Duration duration
) {}
/**
 * EventBus statistics.
 */
public record EventBusStats(
    long totalPublished,
    long totalDelivered,
    long totalFailed,
    int activeSubscriptions,
    int activeChannels,
    Instant startedAt
) {}
/**
 * Per-channel statistics.
 */
public record ChannelStats(
    String channel,
    long publishedCount,
    int subscriberCount,
    long lastPublishedAt
) {}
5. EventBus Implementations
5.1 InMemoryEventBus
Implementation for single-instance deployments (dev, tests, simple applications).
package eu.lmvi.socle.event.impl;
@Component
@ConditionalOnProperty(name = "socle.eventbus.mode", havingValue = "in_memory", matchIfMissing = true)
public class InMemoryEventBus implements EventBus {
private static final Logger log = LoggerFactory.getLogger(InMemoryEventBus.class);
// Subscriptions by exact channel
private final Map<String, Set<SubscriptionImpl<?>>> channelSubs = new ConcurrentHashMap<>();
// Subscriptions by pattern
private final Map<Pattern, Set<SubscriptionImpl<?>>> patternSubs = new ConcurrentHashMap<>();
// Index for fast lookup by subscription ID
private final Map<String, SubscriptionImpl<?>> subsIndex = new ConcurrentHashMap<>();
// Counters
private final AtomicLong idGen = new AtomicLong();
private final AtomicLong publishedCount = new AtomicLong();
private final AtomicLong deliveredCount = new AtomicLong();
private final AtomicLong failedCount = new AtomicLong();
// State
private volatile boolean running = false;
private volatile Instant startedAt; // volatile: read by getStats() from other threads
// ==================== Lifecycle ====================
@Override
public void start() {
running = true;
startedAt = Instant.now();
log.info("InMemoryEventBus started");
}
@Override
public void stop() {
running = false;
// Notify all subscribers (poison pill)
for (SubscriptionImpl<?> sub : subsIndex.values()) {
sub.notifyShutdown();
}
log.info("InMemoryEventBus stopped. Published={}, Delivered={}, Failed={}",
publishedCount.get(), deliveredCount.get(), failedCount.get());
}
@Override
public boolean isHealthy() {
return running;
}
// ==================== Publish ====================
@Override
public void publish(String channel, Object payload) {
publish(channel, payload, Map.of());
}
@Override
public void publish(String channel, Object payload, Map<String, String> metadata) {
if (!running) {
log.warn("EventBus not running, event dropped on channel '{}'", channel);
return;
}
BusEvent<?> event = new BusEvent<>(
"evt-" + idGen.incrementAndGet(),
channel,
Instant.now(),
resolveSource(),
payload,
metadata
);
int delivered = 0;
int failed = 0;
// Deliver to exact-match subscriptions
Set<SubscriptionImpl<?>> exact = channelSubs.get(channel);
if (exact != null) {
for (SubscriptionImpl<?> sub : exact) {
if (deliverToSubscription(sub, event)) {
delivered++;
} else {
failed++;
}
}
}
// Deliver to pattern subscriptions
for (Map.Entry<Pattern, Set<SubscriptionImpl<?>>> entry : patternSubs.entrySet()) {
if (entry.getKey().matcher(channel).matches()) {
for (SubscriptionImpl<?> sub : entry.getValue()) {
if (deliverToSubscription(sub, event)) {
delivered++;
} else {
failed++;
}
}
}
}
publishedCount.incrementAndGet();
deliveredCount.addAndGet(delivered);
failedCount.addAndGet(failed);
log.debug("Published event {} to channel '{}': delivered={}, failed={}",
event.id(), channel, delivered, failed);
}
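@Override
public CompletableFuture<PublishResult> publishAsync(String channel, Object payload) {
return CompletableFuture.supplyAsync(() -> {
Instant start = Instant.now();
publish(channel, payload);
// Simplification: publish() returns no per-event detail, so the ID is read back
// from the shared counter (which may already have advanced under concurrent
// publishes or subscriptions) and the delivered/failed counts are placeholders.
return new PublishResult(
"evt-" + idGen.get(),
1, 0,
Duration.between(start, Instant.now())
);
});
}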
@SuppressWarnings("unchecked")
private boolean deliverToSubscription(SubscriptionImpl<?> sub, BusEvent<?> event) {
try {
BlockingQueue<BusEvent<?>> queue = (BlockingQueue<BusEvent<?>>) sub.getQueue();
// offer() with a timeout, to avoid blocking indefinitely
boolean offered = queue.offer(event, 100, TimeUnit.MILLISECONDS);
if (!offered) {
log.warn("Queue full for subscription {}, event dropped", sub.getId());
return false;
}
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} catch (Exception e) {
log.error("Error delivering to subscription {}: {}", sub.getId(), e.getMessage());
return false;
}
}
private String resolveSource() {
// Use the current thread name as a stand-in for the name of the publishing worker
return Thread.currentThread().getName();
}
// ==================== Subscribe ====================
@Override
public <T> Subscription<T> subscribe(String channel, Class<T> eventType) {
String subId = "sub-" + idGen.incrementAndGet();
SubscriptionImpl<T> sub = new SubscriptionImpl<>(subId, channel, null, this);
channelSubs
.computeIfAbsent(channel, k -> ConcurrentHashMap.newKeySet())
.add(sub);
subsIndex.put(subId, sub);
log.info("Subscription {} created on channel '{}'", subId, channel);
return sub;
}
@Override
public <T> Subscription<T> subscribePattern(String pattern, Class<T> eventType) {
String subId = "sub-" + idGen.incrementAndGet();
// Convert the glob to a regex: '.' is literal, '*' matches one dot-free segment, '>' matches any remainder
String regex = pattern
.replace(".", "\\.")
.replace("*", "[^.]*")
.replace(">", ".*");
Pattern compiled = Pattern.compile("^" + regex + "$");
SubscriptionImpl<T> sub = new SubscriptionImpl<>(subId, pattern, compiled, this);
patternSubs
.computeIfAbsent(compiled, k -> ConcurrentHashMap.newKeySet())
.add(sub);
subsIndex.put(subId, sub);
log.info("Subscription {} created on pattern '{}'", subId, pattern);
return sub;
}
@Override
public String subscribe(String channel, EventHandler handler) {
// Simplified implementation: create a regular subscription
// and a thread that consumes it and calls the handler
Subscription<Object> sub = subscribe(channel, Object.class);
Thread.ofVirtual()
.name("handler-" + sub.getId())
.start(() -> {
while (running) {
try {
BusEvent<Object> event = sub.getQueue().take();
// Poison pill: identified by id == null, as in the push workers
// (a null payload alone is not a shutdown signal)
if (event.id() == null) break;
handler.onEvent(event);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("Handler error on {}: {}", channel, e.getMessage());
}
}
});
return sub.getId();
}
@Override
public void unsubscribe(String subscriptionId) {
SubscriptionImpl<?> sub = subsIndex.remove(subscriptionId);
if (sub == null) {
log.warn("Subscription {} not found", subscriptionId);
return;
}
// Remove from the maps
if (sub.pattern == null) {
Set<SubscriptionImpl<?>> subs = channelSubs.get(sub.channelOrPattern);
if (subs != null) subs.remove(sub);
} else {
Set<SubscriptionImpl<?>> subs = patternSubs.get(sub.pattern);
if (subs != null) subs.remove(sub);
}
sub.notifyShutdown();
log.info("Subscription {} removed", subscriptionId);
}
// ==================== Stats ====================
@Override
public EventBusStats getStats() {
return new EventBusStats(
publishedCount.get(),
deliveredCount.get(),
failedCount.get(),
subsIndex.size(),
channelSubs.size(),
startedAt
);
}
@Override
public Map<String, ChannelStats> getChannelStats() {
Map<String, ChannelStats> stats = new HashMap<>();
for (Map.Entry<String, Set<SubscriptionImpl<?>>> entry : channelSubs.entrySet()) {
stats.put(entry.getKey(), new ChannelStats(
entry.getKey(),
0, // TODO: per-channel counter
entry.getValue().size(),
0
));
}
return stats;
}
// ==================== Subscription Implementation ====================
private static class SubscriptionImpl<T> implements Subscription<T> {
private final String id;
private final String channelOrPattern;
private final Pattern pattern;
private final BlockingQueue<BusEvent<T>> queue;
private final InMemoryEventBus bus;
SubscriptionImpl(String id, String channelOrPattern, Pattern pattern, InMemoryEventBus bus) {
this.id = id;
this.channelOrPattern = channelOrPattern;
this.pattern = pattern;
this.queue = new LinkedBlockingQueue<>(10_000); // Default capacity
this.bus = bus;
}
@Override
public String getId() {
return id;
}
@Override
public String getChannelOrPattern() {
return channelOrPattern;
}
@Override
public BlockingQueue<BusEvent<T>> getQueue() {
return queue;
}
@Override
public void unsubscribe() {
bus.unsubscribe(id);
}
@Override
public int getPendingCount() {
return queue.size();
}
void notifyShutdown() {
// Inject a poison pill to unblock consumers parked in take().
// offer() does not block: if the queue is full, the pill is not enqueued.
try {
queue.offer(new BusEvent<>(null, null, null, null, null, null));
} catch (Exception ignored) {}
}
}
}
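The glob-to-regex conversion used by subscribePattern can be tried in isolation. The sketch below copies the same three replacements into a hypothetical helper class (GlobPatterns is not part of the Socle) so that the matching rules are easy to check: '*' covers exactly one dot-free segment and '>' covers everything that remains.

```java
import java.util.regex.Pattern;

/** Hypothetical helper reproducing the replacements done in subscribePattern. */
final class GlobPatterns {

    /** '.' becomes a literal dot, '*' one segment without dots, '>' any remainder. */
    static Pattern compile(String glob) {
        String regex = glob
                .replace(".", "\\.")      // escape dots first, before other rules insert new ones
                .replace("*", "[^.]*")    // single segment: any run of characters except '.'
                .replace(">", ".*");      // tail wildcard: anything, dots included
        return Pattern.compile("^" + regex + "$");
    }

    static boolean matches(String glob, String channel) {
        return compile(glob).matcher(channel).matches();
    }
}
```

Two consequences of this scheme are worth keeping in mind: "orders.*" does not match "orders.eu.created" (use "orders.>" for that), and other regex metacharacters in a pattern are passed through unescaped.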
5.2 RedisEventBus
Implementation for multi-instance deployments (production).
package eu.lmvi.socle.event.impl;
@Component
@ConditionalOnProperty(name = "socle.eventbus.mode", havingValue = "redis")
public class RedisEventBus implements EventBus {
    private static final Logger log = LoggerFactory.getLogger(RedisEventBus.class);
    private final JedisPool jedisPool;
    private final ObjectMapper mapper;
    private final String prefix;
    private final ExecutorService listenerExecutor;
    // Local subscriptions (one local queue per subscription)
    private final Map<String, LocalSubscription<?>> localSubs = new ConcurrentHashMap<>();
    // One JedisPubSub per channel (shared between local subscriptions)
    private final Map<String, RedisPubSubListener> redisListeners = new ConcurrentHashMap<>();
    // ... Similar implementation on top of Redis Pub/Sub ...
    @Override
    public void publish(String channel, Object payload) {
        try (Jedis jedis = jedisPool.getResource()) {
            BusEvent<?> event = BusEvent.of(channel, payload);
            String json = mapper.writeValueAsString(event);
            jedis.publish(prefix + channel, json);
        } catch (Exception e) {
            log.error("Redis publish error: {}", e.getMessage());
        }
    }
    // Redis SUBSCRIBE runs in a dedicated thread
    // and dispatches to the local queues
}
6. AbstractPushWorker
6.1 Definition
package eu.lmvi.socle.worker.push;
/**
 * Worker woken immediately by the EventBus.
 *
 * <p>Unlike {@link AbstractEventDrivenWorker} (DEPRECATED), which polls
 * with a timeout, this worker uses a push mechanism with immediate
 * wake-up through {@link BlockingQueue#take()}.</p>
 *
 * <h3>Advantages</h3>
 * <ul>
 * <li>Latency ~0 ms (immediate wake-up)</li>
 * <li>Zero CPU when idle</li>
 * <li>Native integration with the EventBus</li>
 * </ul>
 *
 * <h3>Usage example</h3>
 * <pre>{@code
 * @Component
 * public class OrderWorker extends AbstractPushWorker<OrderEvent> {
 *
 *     public OrderWorker() {
 *         super("orders.created", "orders.updated");
 *     }
 *
 *     @Override
 *     public String getName() {
 *         return "order_worker";
 *     }
 *
 *     @Override
 *     protected Class<OrderEvent> getEventType() {
 *         return OrderEvent.class;
 *     }
 *
 *     // The two-argument variant is the abstract one and must be implemented
 *     @Override
 *     protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
 *         // Processed as soon as the event arrives
 *     }
 * }
 * }</pre>
 *
 * @param <T> Type of the event payload
 */
public abstract class AbstractPushWorker<T> implements Worker {
protected final Logger logger = LoggerFactory.getLogger(getClass());
// ==================== Injection ====================
@Autowired
protected EventBus eventBus;
// ==================== Configuration ====================
private final String[] channels;
private final int concurrency;
private final int queueCapacity;
private final Duration shutdownTimeout;
// ==================== Etat ====================
protected final AtomicBoolean running = new AtomicBoolean(false);
protected ExecutorService executor;
protected List<Subscription<T>> subscriptions = new ArrayList<>();
protected BlockingQueue<BusEvent<T>> mergedQueue;
// ==================== Metriques ====================
protected final AtomicLong processedCount = new AtomicLong(0);
protected final AtomicLong errorCount = new AtomicLong(0);
protected final AtomicLong totalDurationNs = new AtomicLong(0);
protected volatile Instant lastExecution;
protected volatile Instant startedAt;
// ==================== Constructeurs ====================
/**
 * Creates a single-threaded push worker.
 *
 * @param channels Channels to listen to
 */
protected AbstractPushWorker(String... channels) {
this(1, channels);
}
/**
 * Creates a push worker with concurrency.
 *
 * @param concurrency Number of processing threads
 * @param channels Channels to listen to
 */
protected AbstractPushWorker(int concurrency, String... channels) {
this(concurrency, 10_000, Duration.ofSeconds(30), channels);
}
/**
 * Creates a push worker with full configuration.
 *
 * @param concurrency Number of processing threads
 * @param queueCapacity Capacity of the internal queue
 * @param shutdownTimeout Timeout for graceful shutdown
 * @param channels Channels to listen to
 */
protected AbstractPushWorker(int concurrency, int queueCapacity,
Duration shutdownTimeout, String... channels) {
this.concurrency = Math.max(1, concurrency);
this.queueCapacity = queueCapacity;
this.shutdownTimeout = shutdownTimeout;
this.channels = channels;
if (channels == null || channels.length == 0) {
throw new IllegalArgumentException("At least one channel is required");
}
}
// ==================== Worker Interface ====================
@Override
public final String getSchedule() {
return "PUSH";
}
@Override
public final long getCycleIntervalMs() {
return -1;
}
@Override
public final boolean isPassive() {
return true;
}
@Override
public int getStartPriority() {
return 200; // Apres l'EventBus (100)
}
@Override
public int getStopPriority() {
return 200; // Avant l'EventBus
}
@Override
public void initialize() {
logger.info("[{}] Initializing push worker", getName());
logger.info("[{}] Channels: {}", getName(), Arrays.toString(channels));
logger.info("[{}] Concurrency: {}", getName(), concurrency);
// Queue fusionnee pour tous les channels
mergedQueue = new LinkedBlockingQueue<>(queueCapacity);
onInitialize();
}
@Override
public void start() {
if (running.getAndSet(true)) {
logger.warn("[{}] Already running", getName());
return;
}
startedAt = Instant.now();
// S'abonner aux channels
for (String channel : channels) {
if (channel.contains("*") || channel.contains(">")) {
// Pattern subscription
Subscription<T> sub = eventBus.subscribePattern(channel, getEventType());
subscriptions.add(sub);
startForwarder(sub);
} else {
// Exact subscription
Subscription<T> sub = eventBus.subscribe(channel, getEventType());
subscriptions.add(sub);
startForwarder(sub);
}
}
// Demarrer les workers de traitement
executor = Executors.newThreadPerTaskExecutor(
Thread.ofVirtual().name(getName() + "-push-", 0).factory()
);
for (int i = 0; i < concurrency; i++) {
final int workerId = i;
executor.submit(() -> pushLoop(workerId));
}
onStarted();
logger.info("[{}] Started with {} workers on {} channels",
getName(), concurrency, channels.length);
}
/**
* Demarre un thread qui forward les events d'une subscription vers la queue fusionnee.
*/
private void startForwarder(Subscription<T> sub) {
Thread.ofVirtual()
.name(getName() + "-fwd-" + sub.getId())
.start(() -> {
while (running.get()) {
try {
BusEvent<T> event = sub.getQueue().take();
// Verifier poison pill
if (event.id() == null) break;
// Forward vers la queue fusionnee
if (!mergedQueue.offer(event, 1, TimeUnit.SECONDS)) {
logger.warn("[{}] Merged queue full, event dropped", getName());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
@Override
public void stop() {
if (!running.getAndSet(false)) {
logger.warn("[{}] Not running", getName());
return;
}
logger.info("[{}] Stopping...", getName());
onStopping();
// Se desabonner (injecte des poison pills)
for (Subscription<T> sub : subscriptions) {
try {
sub.unsubscribe();
} catch (Exception e) {
logger.warn("[{}] Error unsubscribing: {}", getName(), e.getMessage());
}
}
subscriptions.clear();
// Injecter poison pills dans la queue fusionnee
for (int i = 0; i < concurrency; i++) {
mergedQueue.offer(new BusEvent<>(null, null, null, null, null, null));
}
// Attendre la fin des workers
if (executor != null) {
executor.shutdown();
try {
if (!executor.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
logger.warn("[{}] Forcing shutdown", getName());
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
onStopped();
long uptime = Duration.between(startedAt, Instant.now()).toSeconds();
logger.info("[{}] Stopped after {}s. Processed={}, Errors={}",
getName(), uptime, processedCount.get(), errorCount.get());
}
@Override
public void doWork() {
// Non utilise en mode PUSH
}
@Override
public boolean isHealthy() {
return running.get() && executor != null && !executor.isShutdown();
}
@Override
public Map<String, Object> getStats() {
long uptime = startedAt != null
? Duration.between(startedAt, Instant.now()).toMillis()
: 0;
double avgDurationMs = processedCount.get() > 0
? (totalDurationNs.get() / 1_000_000.0) / processedCount.get()
: 0;
double throughput = uptime > 0
? processedCount.get() * 1000.0 / uptime
: 0;
Map<String, Object> stats = new HashMap<>();
// Cles standardisees
stats.put("state", running.get() ? "running" : "stopped");
stats.put("schedule", "PUSH");
stats.put("execution_count", processedCount.get());
stats.put("errors_count", errorCount.get());
stats.put("last_execution", lastExecution != null ? lastExecution.toString() : null);
// Cles specifiques
stats.put("channels", channels);
stats.put("concurrency", concurrency);
stats.put("queue_size", mergedQueue != null ? mergedQueue.size() : 0);
stats.put("queue_capacity", queueCapacity);
stats.put("uptime_ms", uptime);
stats.put("avg_duration_ms", Math.round(avgDurationMs * 100) / 100.0);
stats.put("throughput_per_sec", Math.round(throughput * 100) / 100.0);
// Stats des subscriptions
stats.put("subscriptions", subscriptions.stream()
.map(s -> Map.of(
"id", s.getId(),
"channel", s.getChannelOrPattern(),
"pending", s.getPendingCount()
))
.toList());
return stats;
}
// ==================== Push Loop ====================
private void pushLoop(int workerId) {
logger.debug("[{}] Push loop #{} started", getName(), workerId);
while (running.get()) {
try {
// BLOQUE jusqu'a reception d'un event (reveil instantane)
BusEvent<T> event = mergedQueue.take();
// Verifier poison pill
if (event.id() == null) {
logger.debug("[{}] Push loop #{} received shutdown signal", getName(), workerId);
break;
}
// Traiter l'event
processEventSafe(event, workerId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.debug("[{}] Push loop #{} interrupted", getName(), workerId);
break;
} catch (Exception e) {
logger.error("[{}] Unexpected error in push loop #{}: {}",
getName(), workerId, e.getMessage(), e);
}
}
logger.debug("[{}] Push loop #{} stopped", getName(), workerId);
}
private void processEventSafe(BusEvent<T> event, int workerId) {
long startNs = System.nanoTime();
try {
processEvent(event.payload(), event);
processedCount.incrementAndGet();
} catch (Exception e) {
errorCount.incrementAndGet();
handleError(event, e, workerId);
} finally {
long durationNs = System.nanoTime() - startNs;
totalDurationNs.addAndGet(durationNs);
lastExecution = Instant.now();
}
}
// ==================== Abstract methods ====================
/**
 * Type of the event payload.
 * Used to type the subscription.
 */
protected abstract Class<T> getEventType();
/**
 * Processes an event. This is the method called by the push loop.
 *
 * @param payload Event content
 * @param event Full event (with metadata)
 */
protected abstract void processEvent(T payload, BusEvent<T> event);
/**
 * Simplified variant without access to the envelope.
 * No-op by default: the push loop only calls processEvent(T, BusEvent).
 * SimplePushWorker redeclares this method abstract and delegates to it.
 */
protected void processEvent(T payload) {
// Intentionally empty
}
// ==================== Hooks ====================
/**
 * Initialization hook.
 */
protected void onInitialize() {}
/**
 * Post-start hook.
 */
protected void onStarted() {}
/**
 * Pre-stop hook.
 */
protected void onStopping() {}
/**
 * Post-stop hook.
 */
protected void onStopped() {}
/**
 * Error handling.
 * Logs the error by default. Override for DLQ, retry, etc.
 */
protected void handleError(BusEvent<T> event, Exception e, int workerId) {
logger.error("[{}] Worker #{} error processing event {}: {}",
getName(), workerId, event.id(), e.getMessage(), e);
}
// ==================== Utilities ====================
/**
 * Publishes an event on the EventBus.
 * Convenience shortcut for workers that also produce events.
 */
protected void publish(String channel, Object payload) {
eventBus.publish(channel, payload);
}
/**
 * Returns the number of pending events (merged queue plus subscription queues).
 */
public long getBacklog() {
int pending = mergedQueue != null ? mergedQueue.size() : 0;
for (Subscription<T> sub : subscriptions) {
pending += sub.getPendingCount();
}
return pending;
}
}
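The forwarder mechanism used by start() and startForwarder() (one thread per subscription copying events into a single bounded queue) can be exercised without the Socle. The following is a minimal sketch under stated assumptions: MergedQueueDemo and its merge method are hypothetical, plain String values stand in for BusEvent, platform threads replace virtual threads, and a sentinel String compared by identity plays the role of the poison pill.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of the "one forwarder per source, one merged queue" layout. */
final class MergedQueueDemo {
    // Sentinel compared by identity, so no real event can be mistaken for it
    private static final String POISON = new String("<poison>");

    /** Forwards every per-channel list into one merged queue and returns its content. */
    static List<String> merge(List<List<String>> perChannelEvents) {
        BlockingQueue<String> merged = new LinkedBlockingQueue<>(1_000);
        List<Thread> forwarders = new ArrayList<>();

        for (List<String> events : perChannelEvents) {
            BlockingQueue<String> source = new LinkedBlockingQueue<>(events);
            source.add(POISON); // marks the end of this source
            Thread forwarder = new Thread(() -> {
                try {
                    while (true) {
                        String event = source.take();
                        if (event == POISON) break;
                        // Bounded wait: drop rather than block forever when the merged queue is full
                        if (!merged.offer(event, 1, TimeUnit.SECONDS)) {
                            System.err.println("merged queue full, dropped " + event);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            forwarder.start();
            forwarders.add(forwarder);
        }

        try {
            for (Thread forwarder : forwarders) forwarder.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        List<String> result = new ArrayList<>();
        merged.drainTo(result);
        return result;
    }
}
```

Order is preserved within one source but not across sources, which is also the case for a worker subscribed to several channels.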
6.2 Simplified version (single channel)
/**
 * Simplified version for a single channel.
 */
public abstract class SimplePushWorker<T> extends AbstractPushWorker<T> {
    protected SimplePushWorker(String channel) {
        super(channel);
    }
    protected SimplePushWorker(int concurrency, String channel) {
        super(concurrency, channel);
    }
    @Override
    protected void processEvent(T payload, BusEvent<T> event) {
        processEvent(payload);
    }
    /**
     * Processes the payload directly.
     */
    protected abstract void processEvent(T payload);
}
7. Configuration
7.1 application.yml
socle:
  eventbus:
    # Enable the EventBus
    enabled: ${EVENTBUS_ENABLED:true}
    # Mode: in_memory | redis
    mode: ${EVENTBUS_MODE:in_memory}
    # Common settings
    common:
      # Default capacity of the subscription queues
      default_queue_capacity: ${EVENTBUS_QUEUE_CAPACITY:10000}
      # Timeout for offer() when the queue is full
      offer_timeout_ms: ${EVENTBUS_OFFER_TIMEOUT:100}
    # InMemory settings
    in_memory:
      # Nothing specific
    # Redis settings
    redis:
      host: ${REDIS_HOST:localhost}
      port: ${REDIS_PORT:6379}
      password: ${REDIS_PASSWORD:}
      database: ${REDIS_DATABASE:0}
      prefix: ${EVENTBUS_REDIS_PREFIX:socle:eventbus}
      # Connection pool
      pool:
        max_total: ${EVENTBUS_REDIS_POOL_MAX:16}
        max_idle: ${EVENTBUS_REDIS_POOL_MAX_IDLE:8}
        min_idle: ${EVENTBUS_REDIS_POOL_MIN_IDLE:2}
    # Metrics
    metrics:
      enabled: ${EVENTBUS_METRICS_ENABLED:true}
      prefix: socle_eventbus
7.2 Environment variables
| Variable | Description | Default |
|---|---|---|
| EVENTBUS_ENABLED | Enable the EventBus | true |
| EVENTBUS_MODE | Mode (in_memory / redis) | in_memory |
| EVENTBUS_QUEUE_CAPACITY | Queue capacity | 10000 |
| REDIS_HOST | Redis host | localhost |
| REDIS_PORT | Redis port | 6379 |
8. Usage examples
8.1 Simple worker
@Component
public class OrderNotificationWorker extends SimplePushWorker<OrderEvent> {
@Autowired
private NotificationService notificationService;
public OrderNotificationWorker() {
super(2, "orders.created"); // 2 concurrent workers
}
@Override
public String getName() {
return "order_notification_worker";
}
@Override
protected Class<OrderEvent> getEventType() {
return OrderEvent.class;
}
@Override
protected void processEvent(OrderEvent event) {
notificationService.notifyOrderCreated(event.orderId(), event.customerEmail());
}
}
8.2 Multi-channel worker
@Component
public class AuditWorker extends AbstractPushWorker<Object> {
@Autowired
private AuditRepository auditRepository;
public AuditWorker() {
super(4, "orders.*", "payments.*", "users.*"); // Pattern matching
}
@Override
public String getName() {
return "audit_worker";
}
@Override
protected Class<Object> getEventType() {
return Object.class; // Accepts any payload
}
@Override
protected void processEvent(Object payload, BusEvent<Object> event) {
// Access to the event metadata
AuditEntry entry = new AuditEntry(
event.id(),
event.channel(),
event.timestamp(),
event.source(),
payload
);
auditRepository.save(entry);
}
}
8.3 Publishing events
@Service
public class OrderService {
@Autowired
private EventBus eventBus;
@Autowired
private OrderRepository orderRepository;
public Order createOrder(CreateOrderRequest request) {
// Business logic
Order order = orderRepository.save(new Order(request));
// Publish on the EventBus
// Every worker subscribed to "orders.created" is woken immediately
eventBus.publish("orders.created", new OrderEvent(
order.getId(),
order.getCustomerEmail(),
order.getTotal()
));
return order;
}
public void updateOrderStatus(String orderId, OrderStatus newStatus) {
Order order = orderRepository.updateStatus(orderId, newStatus);
eventBus.publish("orders.updated", new OrderStatusEvent(
orderId,
newStatus
));
}
}
8.4 Worker with custom error handling
@Component
public class PaymentWorker extends AbstractPushWorker<PaymentEvent> {
@Autowired
private PaymentService paymentService;
@Autowired
private DeadLetterQueue dlq;
public PaymentWorker() {
super(2, "payments.process");
}
@Override
public String getName() {
return "payment_worker";
}
@Override
protected Class<PaymentEvent> getEventType() {
return PaymentEvent.class;
}
@Override
protected void processEvent(PaymentEvent payload, BusEvent<PaymentEvent> event) {
paymentService.processPayment(payload);
}
@Override
protected void handleError(BusEvent<PaymentEvent> event, Exception e, int workerId) {
logger.error("[{}] Payment processing failed for {}: {}",
getName(), event.payload().paymentId(), e.getMessage());
// Send to the DLQ for a later retry
dlq.send("payments.dlq", event, e);
}
}
9. Answers to open questions
9.1 Clean shutdown of workers blocked in take()
Problem: BlockingQueue.take() blocks indefinitely. How can the worker be stopped cleanly?
Solution: Poison Pill pattern
// A l'arret, injecter un evenement "poison" reconnaissable
@Override
public void stop() {
running.set(false);
// Injecter N poison pills (un par worker)
for (int i = 0; i < concurrency; i++) {
queue.offer(new BusEvent<>(null, null, null, null, null, null));
}
}
// Dans la boucle, verifier le poison
private void pushLoop(int workerId) {
while (running.get()) {
BusEvent<T> event = queue.take();
// Detecter le poison pill (id == null)
if (event.id() == null) {
break; // Sortir proprement
}
processEvent(event);
}
}
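The shutdown sequence above can be tried in isolation. The following is a minimal sketch, not the Socle implementation: `PoisonPillDemo` and its nested `Event` record are hypothetical names, and the pill is a dedicated sentinel instance compared by identity rather than an event with a null id.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-alone demo of the poison pill pattern; not part of the Socle API. */
final class PoisonPillDemo {
    record Event(String id, String payload) {}

    // Dedicated sentinel instance, recognized by identity
    private static final Event POISON = new Event(null, null);

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final List<Event> processed = new CopyOnWriteArrayList<>();
    private final Thread[] workers;

    PoisonPillDemo(int concurrency) {
        workers = new Thread[concurrency];
        for (int i = 0; i < concurrency; i++) {
            workers[i] = new Thread(this::pushLoop, "push-worker-" + i);
            workers[i].start();
        }
    }

    void publish(Event event) {
        queue.add(event);
    }

    private void pushLoop() {
        try {
            while (true) {
                Event event = queue.take(); // sleeps until an event arrives
                if (event == POISON) {
                    return; // one pill stops exactly one worker
                }
                processed.add(event);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Queues one pill per worker, then waits for every worker to exit. */
    void stop() {
        for (int i = 0; i < workers.length; i++) {
            queue.add(POISON);
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<Event> processed() {
        return processed;
    }
}
```

Because the pills are queued behind the pending events, everything published before `stop()` is still processed before the workers exit.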
9.2 Backpressure (bounded vs unbounded queue)
Problem: What happens if events arrive faster than they are processed?
Solution: A bounded queue with an overflow policy
// Bounded queue (recommended)
BlockingQueue<BusEvent<T>> queue = new LinkedBlockingQueue<>(10_000);
// On publish, handle overflow
private boolean deliver(Subscription<T> sub, BusEvent<T> event) throws InterruptedException {
    // offer() with a timeout: never blocks indefinitely
    boolean offered = queue.offer(event, 100, TimeUnit.MILLISECONDS);
    if (!offered) {
        // Overflow policy (the options below are alternatives; pick one or combine them):
        // Option 1: Drop (log + metric)
        log.warn("Queue full, event dropped");
        droppedCount.incrementAndGet();
        // Option 2: Send to the DLQ
        dlq.send("overflow", event);
        // Option 3: Raise an alert
        alertService.queueOverflow(sub.getId());
    }
    return offered;
}
Metrics to monitor:
- socle_eventbus_queue_size: current size
- socle_eventbus_queue_capacity: maximum capacity
- socle_eventbus_dropped_total: events lost
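The drop-and-count policy can be illustrated with a small self-contained class. This is a sketch under assumptions: `BoundedMailbox` and its accessors are hypothetical names chosen to mirror the three metrics listed above, and only Option 1 (drop) is implemented.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical bounded mailbox; names are illustrative, not the Socle API. */
final class BoundedMailbox<E> {
    private final BlockingQueue<E> queue;
    private final long offerTimeoutMs;
    private final AtomicLong dropped = new AtomicLong();

    BoundedMailbox(int capacity, long offerTimeoutMs) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.offerTimeoutMs = offerTimeoutMs;
    }

    /** Returns true if queued, false if the queue stayed full for the whole timeout. */
    boolean deliver(E event) {
        try {
            if (queue.offer(event, offerTimeoutMs, TimeUnit.MILLISECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // treated as a failed delivery
        }
        dropped.incrementAndGet(); // overflow policy: drop and count
        return false;
    }

    E poll() {
        return queue.poll();
    }

    // The three accessors below correspond to queue_size, queue_capacity and dropped_total
    int size() {
        return queue.size();
    }

    int capacity() {
        return queue.size() + queue.remainingCapacity();
    }

    long droppedTotal() {
        return dropped.get();
    }
}
```

With a capacity of 2, a third `deliver()` call waits for the timeout, returns false and increments the dropped counter; once a consumer polls an element, deliveries succeed again.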
9.3 Multi-channel with a single queue
Problem: A worker listens to several channels. How are they merged?
Solution: A merged queue fed by forwarders
// Merged queue
BlockingQueue<BusEvent<T>> mergedQueue = new LinkedBlockingQueue<>();
// One forwarder per subscription
for (String channel : channels) {
    Subscription<T> sub = eventBus.subscribe(channel, eventType);
    // Virtual thread that forwards events to the merged queue
    Thread.ofVirtual().start(() -> {
        try {
            while (running.get()) {
                BusEvent<T> event = sub.getQueue().take();
                mergedQueue.offer(event);
            }
        } catch (InterruptedException e) {
            // A Runnable cannot propagate InterruptedException: stop this forwarder
            Thread.currentThread().interrupt();
        }
    });
}
// Workers consume the merged queue
void pushLoop() throws InterruptedException {
    while (running.get()) {
        BusEvent<T> event = mergedQueue.take();
        // event.channel() tells which channel the event came from
        processEvent(event.payload(), event);
    }
}
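The forwarder idea can be exercised without the EventBus. The sketch below is an assumption-laden stand-in: `MergedQueueDemo`, `attach()` and `next()` are hypothetical names, plain queues replace subscriptions, and daemon platform threads are used instead of virtual threads so that it also runs on JDKs older than 21.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of merging several channel queues into one; not the Socle API. */
final class MergedQueueDemo {
    record Event(String channel, String payload) {}

    private final BlockingQueue<Event> merged = new LinkedBlockingQueue<>();
    private final List<Thread> forwarders = new ArrayList<>();

    /** Starts a daemon thread that moves events from one channel queue into the merged queue. */
    void attach(BlockingQueue<Event> channelQueue) {
        Thread forwarder = new Thread(() -> {
            try {
                while (true) {
                    merged.put(channelQueue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop() interrupts the blocked take()
            }
        });
        forwarder.setDaemon(true);
        forwarder.start();
        forwarders.add(forwarder);
    }

    /** Waits up to timeoutMs for the next event from any channel; returns null on timeout. */
    Event next(long timeoutMs) {
        try {
            return merged.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Interrupts the forwarders, which would otherwise stay blocked in take(). */
    void stop() {
        forwarders.forEach(Thread::interrupt);
    }
}
```

Each channel has exactly one forwarder, so the order of events within a channel is preserved in the merged queue; the relative order between channels is not defined.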
9.4 Event ordering
Problem: Do events have to be processed in order?
Solution: It depends on the use case
| Mode | Guarantee | Implementation |
|---|---|---|
| None | No ordering guarantee | Concurrency > 1 |
| Per channel | Ordered within each channel | Concurrency = 1 per channel |
| Per key | Ordered within each business key | Partitioning on the key |
// To guarantee ordering per key (e.g. per orderId)
public class OrderedPushWorker extends AbstractPushWorker<OrderEvent> {
    // One queue per partition (hash of the key); allocated in the constructor (omitted here)
    private final BlockingQueue<BusEvent<OrderEvent>>[] partitionQueues;
    private final int partitions = 8;
    @Override
    protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
        // Hashing guarantees that events with the same orderId land in the same partition.
        // floorMod keeps the index non-negative even when hashCode() is Integer.MIN_VALUE,
        // a case where Math.abs() would return a negative value.
        int partition = Math.floorMod(payload.orderId().hashCode(), partitions);
        partitionQueues[partition].offer(event);
    }
    // One worker per partition = ordering guaranteed within each partition
    // (the dispatching step itself must run single-threaded, otherwise it can
    // reorder events before they reach their partition)
}
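The partition selection can be checked on its own. This is a minimal sketch with hypothetical names (`KeyPartitioner`, `partitionFor`, `dispatch`); it only shows the key-to-queue routing, not the per-partition workers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical key-based router: same key, same partition queue. Not the Socle API. */
final class KeyPartitioner<E> {
    private final List<BlockingQueue<E>> partitions = new ArrayList<>();

    KeyPartitioner(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new LinkedBlockingQueue<>());
        }
    }

    /** floorMod never yields a negative index, even for a hash of Integer.MIN_VALUE. */
    int partitionFor(Object key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    /** Appends the event to the queue owned by the key's partition. */
    void dispatch(Object key, E event) {
        partitions.get(partitionFor(key)).add(event);
    }

    BlockingQueue<E> partition(int index) {
        return partitions.get(index);
    }
}
```

Two events dispatched with the same key end up in the same queue in dispatch order, so a single consumer per queue sees them in order. Keys that hash to different partitions have no ordering relation.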
9.5 Retry and DLQ
Problem: What happens if processing fails?
Solution: Retry with backoff, then DLQ
@Component
public class ResilientPushWorker extends AbstractPushWorker<MyEvent> {
    @Autowired
    private EventBus eventBus;
    private static final int MAX_RETRIES = 3;
    // Scheduler for delayed re-publication (one thread is enough for timers)
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    @Override
    protected void processEvent(MyEvent payload, BusEvent<MyEvent> event) {
        int retryCount = getRetryCount(event);
        try {
            doProcess(payload);
        } catch (RetryableException e) {
            if (retryCount < MAX_RETRIES) {
                // Re-publish with an incremented retry count
                scheduleRetry(event, retryCount + 1);
            } else {
                // Max retries reached -> DLQ
                sendToDlq(event, e);
            }
        }
    }
    private void scheduleRetry(BusEvent<MyEvent> event, int retryCount) {
        // Exponential backoff: 2s, 4s, 8s for retries 1 to 3
        long delayMs = (1L << retryCount) * 1000;
        // Re-publish after the delay
        scheduler.schedule(() -> {
            Map<String, String> metadata = new HashMap<>(event.metadata());
            metadata.put("retry_count", String.valueOf(retryCount));
            eventBus.publish(event.channel(), event.payload(), metadata);
        }, delayMs, TimeUnit.MILLISECONDS);
    }
    private int getRetryCount(BusEvent<MyEvent> event) {
        return Integer.parseInt(event.metadata().getOrDefault("retry_count", "0"));
    }
    private void sendToDlq(BusEvent<MyEvent> event, Exception e) {
        eventBus.publish("dlq." + event.channel(), new DlqEntry(event, e));
    }
}
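The retry bookkeeping can be isolated from the worker for unit testing. The sketch below is illustrative only: `RetryPolicy` is a hypothetical helper, and the 60-second cap (`MAX_DELAY_MS`) is an assumption that does not appear in the specification.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that isolates the retry arithmetic; not part of the Socle API. */
final class RetryPolicy {
    static final int MAX_RETRIES = 3;
    static final long BASE_DELAY_MS = 1_000;
    static final long MAX_DELAY_MS = 60_000; // assumed cap, not in the spec

    private RetryPolicy() {}

    /** Reads the retry count from the event metadata, defaulting to 0. */
    static int retryCount(Map<String, String> metadata) {
        return Integer.parseInt(metadata.getOrDefault("retry_count", "0"));
    }

    static boolean shouldRetry(Map<String, String> metadata) {
        return retryCount(metadata) < MAX_RETRIES;
    }

    /** Delay before retry number retryCount: base * 2^retryCount, capped at MAX_DELAY_MS. */
    static long delayMs(int retryCount) {
        int shift = Math.min(retryCount, 20); // keeps the shift far from overflow
        return Math.min(BASE_DELAY_MS << shift, MAX_DELAY_MS);
    }

    /** Returns a copy of the metadata with retry_count incremented by one. */
    static Map<String, String> nextMetadata(Map<String, String> metadata) {
        Map<String, String> next = new HashMap<>(metadata);
        next.put("retry_count", String.valueOf(retryCount(metadata) + 1));
        return next;
    }
}
```

Keeping the counter in the metadata means the retry state travels with the event, so any worker instance that receives the re-published event can decide whether to retry again or route it to the DLQ.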
10. Migrating existing workers
10.1 Before (AbstractEventDrivenWorker)
// DEPRECATED
@Component
public class OldOrderWorker extends AbstractEventDrivenWorker<OrderEvent> {
    private final BlockingQueue<OrderEvent> queue = new LinkedBlockingQueue<>();
    public OldOrderWorker() {
        super(4);
    }
    @Override
    public String getName() {
        return "order_worker";
    }
    @Override
    protected OrderEvent pollEvent() throws InterruptedException {
        return queue.poll(100, TimeUnit.MILLISECONDS); // Polling!
    }
    @Override
    protected void processEvent(OrderEvent event) {
        // Processing
    }
    // External method used to receive events
    public void onOrderReceived(OrderEvent event) {
        queue.offer(event);
    }
}
10.2 After (AbstractPushWorker)
// RECOMMENDED
@Component
public class NewOrderWorker extends AbstractPushWorker<OrderEvent> {
    public NewOrderWorker() {
        super(4, "orders.created"); // Specifies the channel
    }
    @Override
    public String getName() {
        return "order_worker";
    }
    @Override
    protected Class<OrderEvent> getEventType() {
        return OrderEvent.class;
    }
    @Override
    protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
        // Same processing
    }
    // No external method needed anymore: the EventBus handles delivery
}
10.3 Migration steps
- Add the EventBus dependency to the worker
- Change the parent class: AbstractEventDrivenWorker → AbstractPushWorker
- Specify the channels in the constructor
- Implement getEventType()
- Adapt processEvent() to accept a BusEvent
- Remove the internal queue and pollEvent()
- Update the producers to use eventBus.publish()
- Test the behavior
11. Prometheus metrics
# EventBus
socle_eventbus_published_total{channel="orders.created"}
socle_eventbus_delivered_total{channel="orders.created"}
socle_eventbus_failed_total{channel="orders.created"}
socle_eventbus_subscriptions_active{channel="orders.created"}
socle_eventbus_queue_size{subscription="sub-123"}
# Push Workers
socle_push_worker_processed_total{worker="order_worker"}
socle_push_worker_errors_total{worker="order_worker"}
socle_push_worker_queue_size{worker="order_worker"}
socle_push_worker_duration_seconds{worker="order_worker", quantile="0.99"}
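As an illustration of how a per-channel counter such as socle_eventbus_published_total could be maintained and rendered as sample lines, here is a minimal standard-library sketch. `ChannelCounter` is a hypothetical class; an actual deployment would more likely rely on a metrics library, which this specification does not name.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical per-channel counter; illustrative only, not the Socle metrics code. */
final class ChannelCounter {
    private final String metricName;
    private final Map<String, LongAdder> byChannel = new ConcurrentHashMap<>();

    ChannelCounter(String metricName) {
        this.metricName = metricName;
    }

    /** Thread-safe increment; the adder for a channel is created on first use. */
    void increment(String channel) {
        byChannel.computeIfAbsent(channel, c -> new LongAdder()).increment();
    }

    long value(String channel) {
        LongAdder adder = byChannel.get(channel);
        return adder == null ? 0 : adder.sum();
    }

    /** Renders one sample line per channel, sorted by channel name. */
    String render() {
        StringBuilder out = new StringBuilder();
        new TreeMap<String, LongAdder>(byChannel).forEach((channel, adder) ->
            out.append(metricName)
               .append("{channel=\"").append(channel).append("\"} ")
               .append(adder.sum())
               .append('\n'));
        return out.toString();
    }
}
```

LongAdder is used rather than AtomicLong because publish paths tend to be write-heavy and read rarely, which is the case LongAdder is designed for.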
12. Admin REST API
GET /admin/eventbus/stats → Global statistics
GET /admin/eventbus/channels → List of active channels
GET /admin/eventbus/subscriptions → List of subscriptions
POST /admin/eventbus/publish → Publish an event (debug)
Body: {"channel": "test", "payload": {...}}
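A debug publish call could be built with the JDK HTTP client as sketched below. The base URL, the Content-Type header and the `AdminPublishRequest` helper are assumptions for illustration; the specification only gives the path and the body shape, and says nothing about authentication.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds the debug publish request; not part of the Socle API. */
final class AdminPublishRequest {
    private AdminPublishRequest() {}

    /**
     * baseUrl is an assumption (for example "http://localhost:8080").
     * payloadJson must already be valid JSON; channel is inserted as-is,
     * so it must not contain quotes or backslashes.
     */
    static HttpRequest build(String baseUrl, String channel, String payloadJson) {
        String body = "{\"channel\": \"" + channel + "\", \"payload\": " + payloadJson + "}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/admin/eventbus/publish"))
                .header("Content-Type", "application/json") // assumed content type
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. The shape of the response is not described in this document.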
13. See also
Socle V004 – EventBus and Push Workers