Socle V004 – EventBus and Workers

30 – EventBus and Push Workers

Version: 4.1.0
Date: 2026-01-23
Status: Specification

1. Introduction

This document introduces the new EventBus system and the Push Workers in Socle V004. This mechanism replaces polling with an instant notification system.

1.1 Motivation

The current event-driven worker mechanism (AbstractEventDrivenWorker) polls its queue with a timeout:

// Old mechanism (DEPRECATED)
while (running) {
    event = queue.poll(100, TimeUnit.MILLISECONDS);  // Times out and loops every 100 ms when idle
    if (event != null) processEvent(event);
}

Problems:

  • Latency of 0 to 100 ms before processing when the queue is only checked once per cycle
  • Constant CPU consumption (even if low), because the loop wakes up while idle
  • No true push mechanism

1.2 New Push mechanism

// New mechanism (RECOMMENDED)
while (running) {
    event = queue.take();  // Sleeps until an event arrives
    processEvent(event);   // Woken up immediately
}

Advantages:

  • Latency close to 0 ms (immediate wake-up)
  • No CPU use while idle
  • True event-driven push mechanism
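
The loop above blocks in take() indefinitely, so it needs a way to exit on shutdown. The following is a minimal sketch of a blocking consumer, assuming a hypothetical class PushLoopSketch and a hypothetical STOP sentinel (neither is part of the Socle); it only illustrates the standard BlockingQueue behaviour the Push Workers rely on.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class PushLoopSketch {

    /** Hypothetical sentinel that tells the loop to exit. */
    static final String STOP = "__stop__";

    /** Blocks in take() until events arrive; returns how many were processed before STOP. */
    static int consumeUntilStop(BlockingQueue<String> queue) {
        int processed = 0;
        try {
            while (true) {
                String event = queue.take();    // parks the thread; no periodic wake-up
                if (STOP.equals(event)) {
                    return processed;
                }
                processed++;                    // stand-in for processEvent(event)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and exit
            return processed;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            queue.add("order-1");
            queue.add("order-2");
            queue.add(STOP);
        });
        producer.start();
        System.out.println("processed=" + consumeUntilStop(queue));
        producer.join();
    }
}
```

The consumer thread does no work between events: it is only scheduled again when the producer enqueues something.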

2. Deprecation of the Old Workers

2.1 Deprecated classes

Class                          Status       Replacement
AbstractEventDrivenWorker<T>   DEPRECATED   AbstractPushWorker<T>
KafkaEventWorker<K,V>          DEPRECATED   KafkaPushWorker<K,V>

2.2 Migration

The old workers still work but must no longer be used for new development.

// DEPRECATED - do not use
@Deprecated(since = "4.1.0", forRemoval = true)
public abstract class AbstractEventDrivenWorker<T> implements Worker { ... }

// RECOMMENDED - use this instead
public abstract class AbstractPushWorker<T> implements Worker { ... }

2.3 Deprecation schedule

Version   Action
4.1.0     EventBus and Push Workers introduced; old workers deprecated
4.2.0     Compile-time warnings for the old workers
5.0.0     Old workers removed (breaking change)

3. EventBus Architecture

3.1 Overview

┌─────────────────────────────────────────────────────────────────────┐
│                              MOP                                     │
│                                                                      │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │                        EventBus                               │   │
│  │                                                               │   │
│  │   Channels:                                                   │   │
│  │     "orders.created"   ──► [Worker1.queue, Worker2.queue]     │   │
│  │     "orders.updated"   ──► [Worker1.queue]                    │   │
│  │     "alerts.*"         ──► [AlertWorker.queue]                │   │
│  │                                                               │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                              │                                       │
│         publish("orders.created", event)                            │
│                              │                                       │
│                              ▼                                       │
│  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐        │
│  │ PushWorker 1   │  │ PushWorker 2   │  │ PushWorker 3   │        │
│  │                │  │                │  │                │        │
│  │ queue.take()   │  │ queue.take()   │  │ queue.take()   │        │
│  │ (asleep)       │  │ (asleep)       │  │ (asleep)       │        │
│  │      ↓         │  │      ↓         │  │                │        │
│  │ WAKE-UP!       │  │ WAKE-UP!       │  │ not subscribed │        │
│  └────────────────┘  └────────────────┘  └────────────────┘        │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘
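
The fan-out shown in the diagram can be sketched in a few lines. This is an illustrative reduction, assuming a hypothetical class FanOutSketch with String events and exact channel names only; it is not the Socle implementation, which is specified in section 5.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical minimal bus: one queue per subscriber, exact channel names only. */
final class FanOutSketch {

    private final Map<String, List<BlockingQueue<String>>> queuesByChannel = new ConcurrentHashMap<>();

    /** Registers a new subscriber on the channel and returns its private queue. */
    BlockingQueue<String> subscribe(String channel) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queuesByChannel.computeIfAbsent(channel, k -> new CopyOnWriteArrayList<>()).add(queue);
        return queue;
    }

    /** Offers the event to every subscriber queue of the channel; returns the delivery count. */
    int publish(String channel, String event) {
        int delivered = 0;
        for (BlockingQueue<String> queue : queuesByChannel.getOrDefault(channel, List.of())) {
            if (queue.offer(event)) {   // wakes a consumer parked in take()
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        FanOutSketch bus = new FanOutSketch();
        BlockingQueue<String> worker1 = bus.subscribe("orders.created");
        BlockingQueue<String> worker2 = bus.subscribe("orders.created");
        BlockingQueue<String> worker3 = bus.subscribe("orders.updated");

        int delivered = bus.publish("orders.created", "order-42");
        System.out.println(delivered + " deliveries; worker3 pending=" + worker3.size()
            + ", worker1 pending=" + worker1.size() + ", worker2 pending=" + worker2.size());
    }
}
```

Each subscriber owns its queue, so a slow worker delays only itself; the publisher never waits on a consumer in this sketch because the queues are unbounded.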

3.2 Components

Component               Role
EventBus                Central publish/subscribe bus
BusEvent<T>             Event envelope with metadata
AbstractPushWorker<T>   Worker woken by the EventBus
KafkaPushWorker<K,V>    Kafka worker with internal push
EventBusWorker          Worker that manages the EventBus lifecycle

4. EventBus Interface

4.1 Definition

package eu.lmvi.socle.event;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Central event bus with a push mechanism.
 *
 * <p>The EventBus lets workers communicate through a publish/subscribe
 * system with immediate wake-up.</p>
 */
public interface EventBus {

    // ==================== Publication ====================

    /**
     * Publishes an event on a channel.
     * Subscribers are woken up immediately.
     *
     * @param channel Channel name (e.g. "orders.created")
     * @param payload Event content
     */
    void publish(String channel, Object payload);

    /**
     * Publishes an event with custom metadata.
     *
     * @param channel  Channel name
     * @param payload  Event content
     * @param metadata Additional metadata
     */
    void publish(String channel, Object payload, Map<String, String> metadata);

    /**
     * Asynchronous publication with confirmation.
     *
     * @return Future completed once all subscribers have received the event
     */
    CompletableFuture<PublishResult> publishAsync(String channel, Object payload);

    // ==================== Subscription ====================

    /**
     * Subscribes to a channel and returns a subscription holding the reception queue.
     * Call take() on the subscription's queue to wait for events (push).
     *
     * @param channel   Channel name
     * @param eventType Expected payload type
     * @return Subscription whose queue is consumed with take()
     */
    <T> Subscription<T> subscribe(String channel, Class<T> eventType);

    /**
     * Subscribes with a glob pattern (e.g. "orders.*", "*.created").
     *
     * @param pattern   Glob pattern
     * @param eventType Expected payload type
     * @return Subscription with its queue
     */
    <T> Subscription<T> subscribePattern(String pattern, Class<T> eventType);

    /**
     * Subscribes with a callback handler (invoked as soon as an event arrives).
     *
     * @param channel Channel name
     * @param handler Handler called for each event
     * @return Subscription ID, to be passed to unsubscribe
     */
    String subscribe(String channel, EventHandler handler);

    /**
     * Unsubscribes.
     *
     * @param subscriptionId ID returned by subscribe
     */
    void unsubscribe(String subscriptionId);

    // ==================== Lifecycle ====================

    /**
     * Starts the EventBus.
     */
    void start();

    /**
     * Stops the EventBus cleanly.
     * Queues are drained and subscribers are notified.
     */
    void stop();

    /**
     * Checks the health of the EventBus.
     */
    boolean isHealthy();

    // ==================== Stats ====================

    /**
     * Global statistics.
     */
    EventBusStats getStats();

    /**
     * Per-channel statistics.
     */
    Map<String, ChannelStats> getChannelStats();
}

4.2 Supporting classes

package eu.lmvi.socle.event;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;

/**
 * Event carried by the EventBus.
 */
public record BusEvent<T>(
    String id,                      // Unique UUID
    String channel,                 // Publication channel
    Instant timestamp,              // Publication timestamp
    String source,                  // Source (worker name)
    T payload,                      // Business content
    Map<String, String> metadata    // Additional metadata
) {
    /**
     * Creates a simple event.
     */
    public static <T> BusEvent<T> of(String channel, T payload) {
        return new BusEvent<>(
            UUID.randomUUID().toString(),
            channel,
            Instant.now(),
            null,
            payload,
            Map.of()
        );
    }
}

/**
 * Subscription to a channel.
 */
public interface Subscription<T> {

    /**
     * Unique ID of the subscription.
     */
    String getId();

    /**
     * Subscribed channel or pattern.
     */
    String getChannelOrPattern();

    /**
     * Queue on which events are received.
     * Use take() to wait (blocking, immediate wake-up).
     * Use poll(timeout) if a timeout is needed.
     */
    BlockingQueue<BusEvent<T>> getQueue();

    /**
     * Unsubscribes.
     */
    void unsubscribe();

    /**
     * Number of events pending in the queue.
     */
    int getPendingCount();
}

/**
 * Handler for callback subscriptions.
 */
@FunctionalInterface
public interface EventHandler {
    void onEvent(BusEvent<?> event);
}

/**
 * Publication result.
 */
public record PublishResult(
    String eventId,
    int deliveredCount,
    int failedCount,
    Duration duration
) {}

/**
 * EventBus statistics.
 */
public record EventBusStats(
    long totalPublished,
    long totalDelivered,
    long totalFailed,
    int activeSubscriptions,
    int activeChannels,
    Instant startedAt
) {}

/**
 * Per-channel statistics.
 */
public record ChannelStats(
    String channel,
    long publishedCount,
    int subscriberCount,
    long lastPublishedAt
) {}
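
The implementations in section 5 signal shutdown by enqueuing an event whose fields are all null (a "poison pill"). Since a regular event may legitimately carry a null payload, the id is the safer field to test. The following sketch makes that convention explicit; EventEnvelope, shutdownSignal() and isShutdownSignal() are hypothetical names used for illustration and are not part of the specified BusEvent record.

```java
import java.time.Instant;
import java.util.Map;
import java.util.UUID;

/** Hypothetical stand-in for BusEvent<T> with an explicit shutdown signal. */
record EventEnvelope<T>(String id, String channel, Instant timestamp,
                        T payload, Map<String, String> metadata) {

    /** Regular event: always gets a non-null id, even when the payload is null. */
    static <T> EventEnvelope<T> of(String channel, T payload) {
        return new EventEnvelope<>(UUID.randomUUID().toString(), channel,
            Instant.now(), payload, Map.of());
    }

    /** Poison pill: the only envelope whose id is null. */
    static <T> EventEnvelope<T> shutdownSignal() {
        return new EventEnvelope<>(null, null, null, null, Map.of());
    }

    /** True only for the envelope created by shutdownSignal(). */
    boolean isShutdownSignal() {
        return id == null;
    }
}

final class EventEnvelopeDemo {
    public static void main(String[] args) {
        EventEnvelope<String> regular = EventEnvelope.of("orders.created", null);
        EventEnvelope<String> pill = EventEnvelope.shutdownSignal();
        System.out.println("regular is signal: " + regular.isShutdownSignal());
        System.out.println("pill is signal: " + pill.isShutdownSignal());
    }
}
```

Testing the id rather than the payload keeps consumers from mistaking a null-payload event for a shutdown request.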

5. EventBus Implementations

5.1 InMemoryEventBus

Implementation for single-instance deployments (dev, tests, simple applications).

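package eu.lmvi.socle.event.impl;

import eu.lmvi.socle.event.*;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnProperty(name = "socle.eventbus.mode", havingValue = "in_memory", matchIfMissing = true)
public class InMemoryEventBus implements EventBus {

    private static final Logger log = LoggerFactory.getLogger(InMemoryEventBus.class);

    // Subscriptions by exact channel name
    private final Map<String, Set<SubscriptionImpl<?>>> channelSubs = new ConcurrentHashMap<>();

    // Subscriptions by pattern
    private final Map<Pattern, Set<SubscriptionImpl<?>>> patternSubs = new ConcurrentHashMap<>();

    // Index for fast lookup by subscription ID
    private final Map<String, SubscriptionImpl<?>> subsIndex = new ConcurrentHashMap<>();

    // Counters (idGen is shared by event IDs and subscription IDs)
    private final AtomicLong idGen = new AtomicLong();
    private final AtomicLong publishedCount = new AtomicLong();
    private final AtomicLong deliveredCount = new AtomicLong();
    private final AtomicLong failedCount = new AtomicLong();

    // State (volatile: read by getStats() from other threads)
    private volatile boolean running = false;
    private volatile Instant startedAt;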
    // ==================== Lifecycle ====================

    @Override
    public void start() {
        running = true;
        startedAt = Instant.now();
        log.info("InMemoryEventBus started");
    }

    @Override
    public void stop() {
        running = false;

        // Notify all subscribers (poison pill)
        for (SubscriptionImpl<?> sub : subsIndex.values()) {
            sub.notifyShutdown();
        }

        log.info("InMemoryEventBus stopped. Published={}, Delivered={}, Failed={}",
            publishedCount.get(), deliveredCount.get(), failedCount.get());
    }

    @Override
    public boolean isHealthy() {
        return running;
    }

    // ==================== Publish ====================

    @Override
    public void publish(String channel, Object payload) {
        publish(channel, payload, Map.of());
    }

    @Override
    public void publish(String channel, Object payload, Map<String, String> metadata) {
        doPublish(channel, payload, metadata);
    }

    @Override
    public CompletableFuture<PublishResult> publishAsync(String channel, Object payload) {
        // The result carries the actual event ID and delivery counts of this publication
        return CompletableFuture.supplyAsync(() -> doPublish(channel, payload, Map.of()));
    }

    /**
     * Builds the event, delivers it to every matching subscription and reports the outcome.
     */
    private PublishResult doPublish(String channel, Object payload, Map<String, String> metadata) {
        Instant start = Instant.now();

        if (!running) {
            log.warn("EventBus not running, event dropped on channel '{}'", channel);
            return new PublishResult(null, 0, 0, Duration.ZERO);
        }

        BusEvent<?> event = new BusEvent<>(
            "evt-" + idGen.incrementAndGet(),
            channel,
            Instant.now(),
            resolveSource(),
            payload,
            metadata
        );

        int delivered = 0;
        int failed = 0;

        // Deliver to exact-channel subscriptions
        Set<SubscriptionImpl<?>> exact = channelSubs.get(channel);
        if (exact != null) {
            for (SubscriptionImpl<?> sub : exact) {
                if (deliverToSubscription(sub, event)) {
                    delivered++;
                } else {
                    failed++;
                }
            }
        }

        // Deliver to pattern subscriptions
        for (Map.Entry<Pattern, Set<SubscriptionImpl<?>>> entry : patternSubs.entrySet()) {
            if (entry.getKey().matcher(channel).matches()) {
                for (SubscriptionImpl<?> sub : entry.getValue()) {
                    if (deliverToSubscription(sub, event)) {
                        delivered++;
                    } else {
                        failed++;
                    }
                }
            }
        }

        publishedCount.incrementAndGet();
        deliveredCount.addAndGet(delivered);
        failedCount.addAndGet(failed);

        log.debug("Published event {} to channel '{}': delivered={}, failed={}",
            event.id(), channel, delivered, failed);

        return new PublishResult(event.id(), delivered, failed,
            Duration.between(start, Instant.now()));
    }

    @SuppressWarnings("unchecked")
    private boolean deliverToSubscription(SubscriptionImpl<?> sub, BusEvent<?> event) {
        try {
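            // Cast through a wildcard type: the payload type parameter is erased at runtime
            BlockingQueue<BusEvent<?>> queue =
                (BlockingQueue<BusEvent<?>>) (BlockingQueue<?>) sub.getQueue();

            // offer with a timeout so that a full queue cannot block the publisher indefinitely
            boolean offered = queue.offer(event, 100, TimeUnit.MILLISECONDS);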

            if (!offered) {
                log.warn("Queue full for subscription {}, event dropped", sub.getId());
                return false;
            }

            return true;

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            log.error("Error delivering to subscription {}: {}", sub.getId(), e.getMessage());
            return false;
        }
    }

    private String resolveSource() {
        // The current thread name stands in for the worker name
        return Thread.currentThread().getName();
    }

    // ==================== Subscribe ====================

    @Override
    public <T> Subscription<T> subscribe(String channel, Class<T> eventType) {
        String subId = "sub-" + idGen.incrementAndGet();
        SubscriptionImpl<T> sub = new SubscriptionImpl<>(subId, channel, null, this);

        channelSubs
            .computeIfAbsent(channel, k -> ConcurrentHashMap.newKeySet())
            .add(sub);

        subsIndex.put(subId, sub);

        log.info("Subscription {} created on channel '{}'", subId, channel);
        return sub;
    }

    @Override
    public <T> Subscription<T> subscribePattern(String pattern, Class<T> eventType) {
        String subId = "sub-" + idGen.incrementAndGet();

        // Convert the glob to a regex: '*' stays within one dot-separated segment,
        // '>' matches any remaining characters
        String regex = pattern
            .replace(".", "\\.")
            .replace("*", "[^.]*")
            .replace(">", ".*");
        Pattern compiled = Pattern.compile("^" + regex + "$");

        SubscriptionImpl<T> sub = new SubscriptionImpl<>(subId, pattern, compiled, this);

        patternSubs
            .computeIfAbsent(compiled, k -> ConcurrentHashMap.newKeySet())
            .add(sub);

        subsIndex.put(subId, sub);

        log.info("Subscription {} created on pattern '{}'", subId, pattern);
        return sub;
    }

    @Override
    public String subscribe(String channel, EventHandler handler) {
        // Simplified implementation: create a regular subscription
        // and a thread that consumes it and calls the handler
        Subscription<Object> sub = subscribe(channel, Object.class);

        Thread.ofVirtual()
            .name("handler-" + sub.getId())
            .start(() -> {
                while (running) {
                    try {
                        BusEvent<Object> event = sub.getQueue().take();
                        if (event.id() == null) break; // Poison pill: only the shutdown signal has a null id
                        handler.onEvent(event);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    } catch (Exception e) {
                        log.error("Handler error on {}: {}", channel, e.getMessage());
                    }
                }
            });

        return sub.getId();
    }

    @Override
    public void unsubscribe(String subscriptionId) {
        SubscriptionImpl<?> sub = subsIndex.remove(subscriptionId);
        if (sub == null) {
            log.warn("Subscription {} not found", subscriptionId);
            return;
        }

        // Remove from the lookup maps
        if (sub.pattern == null) {
            Set<SubscriptionImpl<?>> subs = channelSubs.get(sub.channelOrPattern);
            if (subs != null) subs.remove(sub);
        } else {
            Set<SubscriptionImpl<?>> subs = patternSubs.get(sub.pattern);
            if (subs != null) subs.remove(sub);
        }

        sub.notifyShutdown();
        log.info("Subscription {} removed", subscriptionId);
    }

    // ==================== Stats ====================

    @Override
    public EventBusStats getStats() {
        return new EventBusStats(
            publishedCount.get(),
            deliveredCount.get(),
            failedCount.get(),
            subsIndex.size(),
            channelSubs.size(),
            startedAt
        );
    }

    @Override
    public Map<String, ChannelStats> getChannelStats() {
        Map<String, ChannelStats> stats = new HashMap<>();

        for (Map.Entry<String, Set<SubscriptionImpl<?>>> entry : channelSubs.entrySet()) {
            stats.put(entry.getKey(), new ChannelStats(
                entry.getKey(),
                0, // TODO: per-channel counter
                entry.getValue().size(),
                0
            ));
        }

        return stats;
    }

    // ==================== Subscription Implementation ====================

    private static class SubscriptionImpl<T> implements Subscription<T> {

        private final String id;
        private final String channelOrPattern;
        private final Pattern pattern;
        private final BlockingQueue<BusEvent<T>> queue;
        private final InMemoryEventBus bus;

        SubscriptionImpl(String id, String channelOrPattern, Pattern pattern, InMemoryEventBus bus) {
            this.id = id;
            this.channelOrPattern = channelOrPattern;
            this.pattern = pattern;
            this.queue = new LinkedBlockingQueue<>(10_000); // Default capacity
            this.bus = bus;
        }

        @Override
        public String getId() {
            return id;
        }

        @Override
        public String getChannelOrPattern() {
            return channelOrPattern;
        }

        @Override
        public BlockingQueue<BusEvent<T>> getQueue() {
            return queue;
        }

        @Override
        public void unsubscribe() {
            bus.unsubscribe(id);
        }

        @Override
        public int getPendingCount() {
            return queue.size();
        }

        void notifyShutdown() {
            // Inject a poison pill to unblock consumers waiting in take()
            try {
                queue.offer(new BusEvent<>(null, null, null, null, null, null));
            } catch (Exception ignored) {}
        }
    }
}
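
The glob conversion in subscribePattern escapes only the dot, so a pattern containing other regex metacharacters (for example '+' or '(') would be read as regex syntax. The following is a minimal sketch of a stricter conversion, assuming a hypothetical helper class GlobSketch that is not part of the Socle: it quotes every literal run with Pattern.quote and keeps the same meaning for '*' and '>'.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: converts a channel glob to a regex, quoting all literal text. */
final class GlobSketch {

    /** '*' matches within one dot-separated segment, '>' matches any remaining characters. */
    static Pattern compile(String glob) {
        StringBuilder regex = new StringBuilder();
        StringBuilder literal = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*' || c == '>') {
                flushLiteral(literal, regex);
                regex.append(c == '*' ? "[^.]*" : ".*");
            } else {
                literal.append(c);
            }
        }
        flushLiteral(literal, regex);
        return Pattern.compile(regex.toString());
    }

    /** Appends the pending literal text as a quoted regex fragment, then clears it. */
    private static void flushLiteral(StringBuilder literal, StringBuilder regex) {
        if (literal.length() > 0) {
            regex.append(Pattern.quote(literal.toString()));
            literal.setLength(0);
        }
    }

    /** matches() requires the whole channel name to match, so no anchors are needed. */
    static boolean matches(String glob, String channel) {
        return compile(glob).matcher(channel).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("orders.*", "orders.created"));
        System.out.println(matches("orders.*", "orders.eu.created"));
        System.out.println(matches("orders.>", "orders.eu.created"));
    }
}
```

With this conversion, "orders.*" accepts "orders.created" but rejects "orders.eu.created", while "orders.>" accepts both.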

5.2 RedisEventBus

Implementation for multi-instance deployments (production).

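package eu.lmvi.socle.event.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.lmvi.socle.event.BusEvent;
import eu.lmvi.socle.event.EventBus;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

@Component
@ConditionalOnProperty(name = "socle.eventbus.mode", havingValue = "redis")
public class RedisEventBus implements EventBus {

    private static final Logger log = LoggerFactory.getLogger(RedisEventBus.class);

    private final JedisPool jedisPool;
    private final ObjectMapper mapper;
    private final String prefix;
    private final ExecutorService listenerExecutor;

    // Local subscriptions (one local queue per subscription)
    private final Map<String, LocalSubscription<?>> localSubs = new ConcurrentHashMap<>();

    // One JedisPubSub per channel (shared by the local subscriptions)
    private final Map<String, RedisPubSubListener> redisListeners = new ConcurrentHashMap<>();

    // ... Similar implementation based on Redis Pub/Sub ...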

    @Override
    public void publish(String channel, Object payload) {
        try (Jedis jedis = jedisPool.getResource()) {
            BusEvent<?> event = BusEvent.of(channel, payload);
            String json = mapper.writeValueAsString(event);
            jedis.publish(prefix + channel, json);
        } catch (Exception e) {
            log.error("Redis publish error: {}", e.getMessage());
        }
    }

    // Redis SUBSCRIBE runs in a dedicated thread
    // and dispatches incoming messages to the local queues
}

6. AbstractPushWorker

6.1 Definition

package eu.lmvi.socle.worker.push;

import eu.lmvi.socle.event.BusEvent;
import eu.lmvi.socle.event.EventBus;
import eu.lmvi.socle.event.Subscription;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Worker woken up immediately by the EventBus.
 *
 * <p>Unlike {@link AbstractEventDrivenWorker} (DEPRECATED), which polls
 * with a timeout, this worker uses a push mechanism with immediate
 * wake-up through {@link BlockingQueue#take()}.</p>
 *
 * <h3>Advantages</h3>
 * <ul>
 *   <li>Latency close to 0 ms (immediate wake-up)</li>
 *   <li>No CPU use while idle</li>
 *   <li>Native integration with the EventBus</li>
 * </ul>
 *
 * <h3>Usage example</h3>
 * <pre>{@code
 * @Component
 * public class OrderWorker extends AbstractPushWorker<OrderEvent> {
 *
 *     public OrderWorker() {
 *         super("orders.created", "orders.updated");
 *     }
 *
 *     @Override
 *     public String getName() {
 *         return "order_worker";
 *     }
 *
 *     @Override
 *     protected Class<OrderEvent> getEventType() {
 *         return OrderEvent.class;
 *     }
 *
 *     @Override
 *     protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
 *         // Processed as soon as the event arrives
 *     }
 * }
 * }</pre>
 *
 * @param <T> Payload type of the events
 */
public abstract class AbstractPushWorker<T> implements Worker {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    // ==================== Injection ====================

    @Autowired
    protected EventBus eventBus;

    // ==================== Configuration ====================

    private final String[] channels;
    private final int concurrency;
    private final int queueCapacity;
    private final Duration shutdownTimeout;

    // ==================== State ====================

    protected final AtomicBoolean running = new AtomicBoolean(false);
    protected ExecutorService executor;
    protected List<Subscription<T>> subscriptions = new ArrayList<>();
    protected BlockingQueue<BusEvent<T>> mergedQueue;

    // ==================== Metrics ====================

    protected final AtomicLong processedCount = new AtomicLong(0);
    protected final AtomicLong errorCount = new AtomicLong(0);
    protected final AtomicLong totalDurationNs = new AtomicLong(0);
    protected volatile Instant lastExecution;
    protected volatile Instant startedAt;

    // ==================== Constructors ====================

    /**
     * Creates a single-threaded push worker.
     *
     * @param channels Channels to listen to
     */
    protected AbstractPushWorker(String... channels) {
        this(1, channels);
    }

    /**
     * Creates a push worker with concurrency.
     *
     * @param concurrency Number of processing threads
     * @param channels    Channels to listen to
     */
    protected AbstractPushWorker(int concurrency, String... channels) {
        this(concurrency, 10_000, Duration.ofSeconds(30), channels);
    }

    /**
     * Creates a push worker with full configuration.
     *
     * @param concurrency     Number of processing threads
     * @param queueCapacity   Capacity of the internal queue
     * @param shutdownTimeout Graceful shutdown timeout
     * @param channels        Channels to listen to
     */
    protected AbstractPushWorker(int concurrency, int queueCapacity,
                                  Duration shutdownTimeout, String... channels) {
        this.concurrency = Math.max(1, concurrency);
        this.queueCapacity = queueCapacity;
        this.shutdownTimeout = shutdownTimeout;
        this.channels = channels;

        if (channels == null || channels.length == 0) {
            throw new IllegalArgumentException("At least one channel is required");
        }
    }

    // ==================== Worker Interface ====================

    @Override
    public final String getSchedule() {
        return "PUSH";
    }

    @Override
    public final long getCycleIntervalMs() {
        return -1;
    }

    @Override
    public final boolean isPassive() {
        return true;
    }

    @Override
    public int getStartPriority() {
        return 200;  // After the EventBus (100)
    }

    @Override
    public int getStopPriority() {
        return 200;  // Before the EventBus
    }

    @Override
    public void initialize() {
        logger.info("[{}] Initializing push worker", getName());
        logger.info("[{}] Channels: {}", getName(), Arrays.toString(channels));
        logger.info("[{}] Concurrency: {}", getName(), concurrency);

        // Merged queue shared by all channels
        mergedQueue = new LinkedBlockingQueue<>(queueCapacity);

        onInitialize();
    }

    @Override
    public void start() {
        if (running.getAndSet(true)) {
            logger.warn("[{}] Already running", getName());
            return;
        }

        startedAt = Instant.now();

        // Subscribe to each channel; names containing a wildcard are treated as patterns
        for (String channel : channels) {
            boolean isPattern = channel.contains("*") || channel.contains(">");
            Subscription<T> sub = isPattern
                ? eventBus.subscribePattern(channel, getEventType())
                : eventBus.subscribe(channel, getEventType());
            subscriptions.add(sub);
            startForwarder(sub);
        }

        // Start the processing loops
        executor = Executors.newThreadPerTaskExecutor(
            Thread.ofVirtual().name(getName() + "-push-", 0).factory()
        );

        for (int i = 0; i < concurrency; i++) {
            final int workerId = i;
            executor.submit(() -> pushLoop(workerId));
        }

        onStarted();
        logger.info("[{}] Started with {} workers on {} channels",
            getName(), concurrency, channels.length);
    }

    /**
     * Starts a thread that forwards events from a subscription to the merged queue.
     */
    private void startForwarder(Subscription<T> sub) {
        Thread.ofVirtual()
            .name(getName() + "-fwd-" + sub.getId())
            .start(() -> {
                while (running.get()) {
                    try {
                        BusEvent<T> event = sub.getQueue().take();

                        // Check for the poison pill
                        if (event.id() == null) break;

                        // Forward to the merged queue
                        if (!mergedQueue.offer(event, 1, TimeUnit.SECONDS)) {
                            logger.warn("[{}] Merged queue full, event dropped", getName());
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
    }

    @Override
    public void stop() {
        if (!running.getAndSet(false)) {
            logger.warn("[{}] Not running", getName());
            return;
        }

        logger.info("[{}] Stopping...", getName());
        onStopping();

        // Unsubscribe (this injects poison pills into the subscription queues)
        for (Subscription<T> sub : subscriptions) {
            try {
                sub.unsubscribe();
            } catch (Exception e) {
                logger.warn("[{}] Error unsubscribing: {}", getName(), e.getMessage());
            }
        }
        subscriptions.clear();

        // Inject one poison pill per processing loop into the merged queue
        for (int i = 0; i < concurrency; i++) {
            mergedQueue.offer(new BusEvent<>(null, null, null, null, null, null));
        }

        // Wait for the processing loops to finish
        if (executor != null) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                    logger.warn("[{}] Forcing shutdown", getName());
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }

        onStopped();

        long uptime = Duration.between(startedAt, Instant.now()).toSeconds();
        logger.info("[{}] Stopped after {}s. Processed={}, Errors={}",
            getName(), uptime, processedCount.get(), errorCount.get());
    }

    @Override
    public void doWork() {
        // Not used in PUSH mode
    }

    @Override
    public boolean isHealthy() {
        return running.get() && executor != null && !executor.isShutdown();
    }

    @Override
    public Map<String, Object> getStats() {
        long uptime = startedAt != null
            ? Duration.between(startedAt, Instant.now()).toMillis()
            : 0;

        double avgDurationMs = processedCount.get() > 0
            ? (totalDurationNs.get() / 1_000_000.0) / processedCount.get()
            : 0;

        double throughput = uptime > 0
            ? processedCount.get() * 1000.0 / uptime
            : 0;

        Map<String, Object> stats = new HashMap<>();

        // Standardized keys
        stats.put("state", running.get() ? "running" : "stopped");
        stats.put("schedule", "PUSH");
        stats.put("execution_count", processedCount.get());
        stats.put("errors_count", errorCount.get());
        stats.put("last_execution", lastExecution != null ? lastExecution.toString() : null);

        // Worker-specific keys
        stats.put("channels", channels);
        stats.put("concurrency", concurrency);
        stats.put("queue_size", mergedQueue != null ? mergedQueue.size() : 0);
        stats.put("queue_capacity", queueCapacity);
        stats.put("uptime_ms", uptime);
        stats.put("avg_duration_ms", Math.round(avgDurationMs * 100) / 100.0);
        stats.put("throughput_per_sec", Math.round(throughput * 100) / 100.0);

        // Subscription stats
        stats.put("subscriptions", subscriptions.stream()
            .map(s -> Map.of(
                "id", s.getId(),
                "channel", s.getChannelOrPattern(),
                "pending", s.getPendingCount()
            ))
            .toList());

        return stats;
    }

    // ==================== Push Loop ====================

    private void pushLoop(int workerId) {
        logger.debug("[{}] Push loop #{} started", getName(), workerId);

        while (running.get()) {
            try {
                // BLOCKS until an event is received (immediate wake-up)
                BusEvent<T> event = mergedQueue.take();

                // Check for the poison pill
                if (event.id() == null) {
                    logger.debug("[{}] Push loop #{} received shutdown signal", getName(), workerId);
                    break;
                }

                // Process the event
                processEventSafe(event, workerId);

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.debug("[{}] Push loop #{} interrupted", getName(), workerId);
                break;
            } catch (Exception e) {
                logger.error("[{}] Unexpected error in push loop #{}: {}",
                    getName(), workerId, e.getMessage(), e);
            }
        }

        logger.debug("[{}] Push loop #{} stopped", getName(), workerId);
    }

    private void processEventSafe(BusEvent<T> event, int workerId) {
        long startNs = System.nanoTime();

        try {
            processEvent(event.payload(), event);
            processedCount.incrementAndGet();
        } catch (Exception e) {
            errorCount.incrementAndGet();
            handleError(event, e, workerId);
        } finally {
            long durationNs = System.nanoTime() - startNs;
            totalDurationNs.addAndGet(durationNs);
            lastExecution = Instant.now();
        }
    }

    // ==================== Abstract methods ====================

    /**
     * Payload type of the events.
     * Used to type the subscription.
     */
    protected abstract Class<T> getEventType();

    /**
     * Processes an event.
     *
     * @param payload Event content
     * @param event   Full event (with metadata)
     */
    protected abstract void processEvent(T payload, BusEvent<T> event);

    /**
     * Simplified variant without access to the envelope.
     * No-op by default: it is only invoked when a subclass such as
     * SimplePushWorker routes processEvent(T, BusEvent) to it.
     * Override processEvent(T, BusEvent) to access the metadata.
     */
    protected void processEvent(T payload) {
        // Intentionally empty by default
    }

    // ==================== Hooks ====================

    /**
     * Initialization hook.
     */
    protected void onInitialize() {}

    /**
     * Post-start hook.
     */
    protected void onStarted() {}

    /**
     * Pre-stop hook.
     */
    protected void onStopping() {}

    /**
     * Post-stop hook.
     */
    protected void onStopped() {}

    /**
     * Error handling.
     * Logs the error by default. Override to add a DLQ, retries, etc.
     */
    protected void handleError(BusEvent<T> event, Exception e, int workerId) {
        logger.error("[{}] Worker #{} error processing event {}: {}",
            getName(), workerId, event.id(), e.getMessage(), e);
    }

    // ==================== Utilities ====================

    /**
     * Publishes an event on the EventBus.
     * Convenience shortcut for workers that also produce events.
     */
    protected void publish(String channel, Object payload) {
        eventBus.publish(channel, payload);
    }

    /**
     * Returns the number of pending events
     * (merged queue plus every subscription queue).
     */
    public long getBacklog() {
        // long accumulator: avoids a silent narrowing if getPendingCount() returns a long
        long pending = mergedQueue != null ? mergedQueue.size() : 0;
        for (Subscription<T> sub : subscriptions) {
            pending += sub.getPendingCount();
        }
        return pending;
    }
}

6.2 Simplified version (single channel)

/**
 * Simplified version for a single channel.
 */
public abstract class SimplePushWorker<T> extends AbstractPushWorker<T> {

    protected SimplePushWorker(String channel) {
        super(channel);
    }

    protected SimplePushWorker(int concurrency, String channel) {
        super(concurrency, channel);
    }

    @Override
    protected void processEvent(T payload, BusEvent<T> event) {
        processEvent(payload);
    }

    /**
     * Processes the payload directly.
     */
    protected abstract void processEvent(T payload);
}

7. Configuration

7.1 application.yml

socle:
  eventbus:
    # Enable the EventBus
    enabled: ${EVENTBUS_ENABLED:true}

    # Mode: in_memory | redis
    mode: ${EVENTBUS_MODE:in_memory}

    # Common configuration
    common:
      # Default capacity of the subscription queues
      default_queue_capacity: ${EVENTBUS_QUEUE_CAPACITY:10000}

      # Timeout for offer() when the queue is full
      offer_timeout_ms: ${EVENTBUS_OFFER_TIMEOUT:100}

    # InMemory configuration
    in_memory:
      # No specific settings

    # Redis configuration
    redis:
      host: ${REDIS_HOST:localhost}
      port: ${REDIS_PORT:6379}
      password: ${REDIS_PASSWORD:}
      database: ${REDIS_DATABASE:0}
      prefix: ${EVENTBUS_REDIS_PREFIX:socle:eventbus}

      # Connection pool
      pool:
        max_total: ${EVENTBUS_REDIS_POOL_MAX:16}
        max_idle: ${EVENTBUS_REDIS_POOL_MAX_IDLE:8}
        min_idle: ${EVENTBUS_REDIS_POOL_MIN_IDLE:2}

    # Metrics
    metrics:
      enabled: ${EVENTBUS_METRICS_ENABLED:true}
      prefix: socle_eventbus

7.2 Environment variables

Variable | Description | Default
EVENTBUS_ENABLED | Enable the EventBus | true
EVENTBUS_MODE | Mode (in_memory / redis) | in_memory
EVENTBUS_QUEUE_CAPACITY | Queue capacity | 10000
REDIS_HOST | Redis host | localhost
REDIS_PORT | Redis port | 6379
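
The `${NAME:default}` placeholders in application.yml take the value of the environment variable when it is set and fall back to the text after the first colon otherwise. The sketch below reproduces that rule in plain Java to make the fallback behavior concrete. `PlaceholderResolver` is a hypothetical helper written for this illustration, not the resolver the framework actually uses, and it simplifies one point: an unset variable without a default resolves to an empty string here.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (assumption): illustrates the ${NAME:default} fallback rule. */
final class PlaceholderResolver {

    // Group 1 = variable name, group 2 = default value (null when no ':' is present)
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{([A-Za-z0-9_]+)(?::([^}]*))?\\}");

    static String resolve(String raw, Map<String, String> env) {
        Matcher m = PLACEHOLDER.matcher(raw);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = env.get(m.group(1));
            if (value == null) {
                // Variable not set: use the default, or an empty string if none was given
                value = m.group(2) != null ? m.group(2) : "";
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> env = Map.of("EVENTBUS_MODE", "redis");
        System.out.println(resolve("${EVENTBUS_MODE:in_memory}", env));
        System.out.println(resolve("${EVENTBUS_QUEUE_CAPACITY:10000}", env));
    }
}
```

Note that only the first colon separates the name from the default, so `${EVENTBUS_REDIS_PREFIX:socle:eventbus}` keeps `socle:eventbus` as its default.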

8. Usage examples

8.1 Simple worker

@Component
public class OrderNotificationWorker extends SimplePushWorker<OrderEvent> {

    @Autowired
    private NotificationService notificationService;

    public OrderNotificationWorker() {
        super(2, "orders.created");  // 2 concurrent workers
    }

    @Override
    public String getName() {
        return "order_notification_worker";
    }

    @Override
    protected Class<OrderEvent> getEventType() {
        return OrderEvent.class;
    }

    @Override
    protected void processEvent(OrderEvent event) {
        notificationService.notifyOrderCreated(event.orderId(), event.customerEmail());
    }
}

8.2 Multi-channel worker

@Component
public class AuditWorker extends AbstractPushWorker<Object> {

    @Autowired
    private AuditRepository auditRepository;

    public AuditWorker() {
        super(4, "orders.*", "payments.*", "users.*");  // Pattern matching
    }

    @Override
    public String getName() {
        return "audit_worker";
    }

    @Override
    protected Class<Object> getEventType() {
        return Object.class;  // Accepts any payload type
    }

    @Override
    protected void processEvent(Object payload, BusEvent<Object> event) {
        // Access to the event metadata
        AuditEntry entry = new AuditEntry(
            event.id(),
            event.channel(),
            event.timestamp(),
            event.source(),
            payload
        );
        auditRepository.save(entry);
    }
}
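
The AuditWorker above subscribes with patterns such as "orders.*". As a rough illustration of how such a pattern could be matched against a concrete channel name, here is a minimal sketch. It assumes Redis-style glob semantics, where `*` matches any run of characters; the matching rules actually applied by the EventBus may differ. `ChannelPattern` is a hypothetical name introduced for this example.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (assumption): glob-style channel matching where '*' matches any characters. */
final class ChannelPattern {

    private final Pattern regex;

    ChannelPattern(String glob) {
        StringBuilder sb = new StringBuilder();
        int start = 0;
        for (int i = 0; i < glob.length(); i++) {
            if (glob.charAt(i) == '*') {
                // Quote the literal part so that '.' stays a literal dot, then expand '*'
                sb.append(Pattern.quote(glob.substring(start, i))).append(".*");
                start = i + 1;
            }
        }
        sb.append(Pattern.quote(glob.substring(start)));
        this.regex = Pattern.compile(sb.toString());
    }

    /** Returns true when the whole channel name matches the pattern. */
    boolean matches(String channel) {
        return regex.matcher(channel).matches();
    }

    public static void main(String[] args) {
        ChannelPattern orders = new ChannelPattern("orders.*");
        System.out.println(orders.matches("orders.created"));
        System.out.println(orders.matches("payments.captured"));
    }
}
```

Under these assumed semantics, "orders.*" matches "orders.created" and "orders.updated" but not "orders" alone, because the dot is treated as a literal character.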

8.3 Publishing events

@Service
public class OrderService {

    @Autowired
    private EventBus eventBus;

    @Autowired
    private OrderRepository orderRepository;

    public Order createOrder(CreateOrderRequest request) {
        // Business logic
        Order order = orderRepository.save(new Order(request));

        // Publish on the EventBus
        // Every worker subscribed to "orders.created" is woken up immediately
        eventBus.publish("orders.created", new OrderEvent(
            order.getId(),
            order.getCustomerEmail(),
            order.getTotal()
        ));

        return order;
    }

    public void updateOrderStatus(String orderId, OrderStatus newStatus) {
        Order order = orderRepository.updateStatus(orderId, newStatus);

        eventBus.publish("orders.updated", new OrderStatusEvent(
            orderId,
            newStatus
        ));
    }
}

8.4 Worker with custom error handling

@Component
public class PaymentWorker extends AbstractPushWorker<PaymentEvent> {

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private DeadLetterQueue dlq;

    public PaymentWorker() {
        super(2, "payments.process");
    }

    @Override
    public String getName() {
        return "payment_worker";
    }

    @Override
    protected Class<PaymentEvent> getEventType() {
        return PaymentEvent.class;
    }

    @Override
    protected void processEvent(PaymentEvent payload, BusEvent<PaymentEvent> event) {
        paymentService.processPayment(payload);
    }

    @Override
    protected void handleError(BusEvent<PaymentEvent> event, Exception e, int workerId) {
        logger.error("[{}] Payment processing failed for {}: {}",
            getName(), event.payload().paymentId(), e.getMessage());

        // Send to the DLQ for a later retry
        dlq.send("payments.dlq", event, e);
    }
}

9. Solutions to open questions

9.1 Clean shutdown of workers blocked in take()

Problem: BlockingQueue.take() blocks indefinitely. How can the workers be stopped cleanly?

Solution: Poison Pill pattern

// On shutdown, inject a recognizable "poison" event
@Override
public void stop() {
    running.set(false);

    // Inject N poison pills (one per worker).
    // Note: offer() returns false when a bounded queue is full, so the pill
    // can be lost; interrupting the worker threads is the fallback in that case.
    for (int i = 0; i < concurrency; i++) {
        queue.offer(new BusEvent<>(null, null, null, null, null, null));
    }
}

// In the loop, check for the poison pill
private void pushLoop(int workerId) {
    while (running.get()) {
        try {
            BusEvent<T> event = queue.take();

            // Detect the poison pill (id == null)
            if (event.id() == null) {
                break;  // Exit cleanly
            }

            processEventSafe(event, workerId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // Restore the flag and exit
            break;
        }
    }
}
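
To see the pattern run end to end outside the Socle classes, the following self-contained sketch starts a few consumer threads on a plain BlockingQueue and stops them with one pill per consumer. `PoisonPillDemo` and its sentinel object are illustrative assumptions: the sentinel stands in for the BusEvent with a null id used above, and put() is used instead of offer() because the demo queue is unbounded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo (assumption): poison pill shutdown on a plain BlockingQueue. */
final class PoisonPillDemo {

    // Sentinel recognized by identity; plays the role of the BusEvent with a null id
    private static final Object POISON = new Object();

    /** Starts the consumers, feeds the events, stops everything; returns the processed count. */
    static int run(int workers, int events) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        Object event = queue.take();   // sleeps until something arrives
                        if (event == POISON) {
                            break;                     // clean exit
                        }
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            threads.add(t);
        }

        try {
            for (int i = 0; i < events; i++) {
                queue.put("event-" + i);
            }
            // One pill per consumer: each consumer swallows exactly one and stops
            for (int i = 0; i < workers; i++) {
                queue.put(POISON);
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the demo", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 100));
    }
}
```

Because the queue is FIFO and the pills are enqueued last, every real event is consumed before any consumer exits.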

9.2 Backpressure (bounded vs unbounded queue)

Problem: What should happen when events arrive faster than they are processed?

Solution: Bounded queue with an overflow policy

// Bounded queue (recommended)
BlockingQueue<BusEvent<T>> queue = new LinkedBlockingQueue<>(10_000);

// On publish, handle overflow
private boolean deliver(Subscription<T> sub, BusEvent<T> event) throws InterruptedException {
    // offer() with a timeout: never blocks indefinitely
    boolean offered = queue.offer(event, 100, TimeUnit.MILLISECONDS);

    if (!offered) {
        // Overflow policy (pick one or combine them):
        // Option 1: drop (log + metric)
        log.warn("Queue full, event dropped");
        droppedCount.incrementAndGet();

        // Option 2: send to the DLQ
        dlq.send("overflow", event);

        // Option 3: raise an alert
        alertService.queueOverflow(sub.getId());
    }

    return offered;
}

Metrics to monitor:

  • socle_eventbus_queue_size: current size
  • socle_eventbus_queue_capacity: maximum capacity
  • socle_eventbus_dropped_total: dropped events
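
The drop policy can be exercised in isolation with a small bounded queue. The sketch below is a minimal illustration of option 1 (drop and count); `BoundedDelivery` is a hypothetical class, and the capacity and timeout values used in main are arbitrary.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical demo (assumption): bounded queue with a "drop and count" overflow policy. */
final class BoundedDelivery<E> {

    private final BlockingQueue<E> queue;
    private final long offerTimeoutMs;
    private final AtomicLong dropped = new AtomicLong();

    BoundedDelivery(int capacity, long offerTimeoutMs) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.offerTimeoutMs = offerTimeoutMs;
    }

    /** Returns true if the event was queued, false if it was dropped. */
    boolean deliver(E event) {
        try {
            // Waits at most offerTimeoutMs for free space, then gives up
            boolean offered = queue.offer(event, offerTimeoutMs, TimeUnit.MILLISECONDS);
            if (!offered) {
                dropped.incrementAndGet();
            }
            return offered;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            dropped.incrementAndGet();
            return false;
        }
    }

    int size() {
        return queue.size();
    }

    long droppedCount() {
        return dropped.get();
    }

    public static void main(String[] args) {
        BoundedDelivery<String> delivery = new BoundedDelivery<>(2, 10);
        delivery.deliver("a");
        delivery.deliver("b");
        System.out.println(delivery.deliver("c"));   // no consumer, queue already full
        System.out.println(delivery.droppedCount());
    }
}
```

The size and dropped counters in this sketch correspond to what the queue_size and dropped_total metrics would expose.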

9.3 Multi-channel with a single queue

Problem: A worker listens to several channels. How are they merged?

Solution: Merged queue with forwarders

// Merged queue
BlockingQueue<BusEvent<T>> mergedQueue = new LinkedBlockingQueue<>();

// One forwarder per subscription
for (String channel : channels) {
    Subscription<T> sub = eventBus.subscribe(channel, eventType);

    // Virtual thread that forwards events to the merged queue
    Thread.ofVirtual().start(() -> {
        try {
            while (running.get()) {
                BusEvent<T> event = sub.getQueue().take();
                mergedQueue.offer(event);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // Stop forwarding
        }
    });
}

// Workers consume the merged queue
void pushLoop() throws InterruptedException {
    while (running.get()) {
        BusEvent<T> event = mergedQueue.take();
        // event.channel() tells which channel the event came from
        processEvent(event.payload(), event);
    }
}
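
The forwarder idea can be tried on plain queues, without an EventBus. In this hedged sketch each source queue gets its own forwarder thread that copies into a shared queue; `MergedQueueDemo` is a hypothetical name, and platform daemon threads are used instead of virtual threads so that the example also runs on JDKs older than 21.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo (assumption): several source queues merged into one by forwarder threads. */
final class MergedQueueDemo {

    /** Forwards every source into one merged queue and returns up to `expected` events. */
    static List<String> drain(List<BlockingQueue<String>> sources, int expected) {
        BlockingQueue<String> merged = new LinkedBlockingQueue<>();
        List<Thread> forwarders = new ArrayList<>();

        for (BlockingQueue<String> source : sources) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        merged.put(source.take());   // blocks until the source has an event
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();   // normal shutdown path
                }
            });
            t.setDaemon(true);
            t.start();
            forwarders.add(t);
        }

        List<String> received = new ArrayList<>();
        try {
            while (received.size() < expected) {
                String event = merged.poll(2, TimeUnit.SECONDS);
                if (event == null) {
                    break;   // nothing more arrived in time
                }
                received.add(event);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            forwarders.forEach(Thread::interrupt);   // stop the forwarders
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> orders = new LinkedBlockingQueue<>(List.of("orders:1", "orders:2"));
        BlockingQueue<String> payments = new LinkedBlockingQueue<>(List.of("payments:1"));
        System.out.println(drain(List.of(orders, payments), 3));
    }
}
```

Events from one source keep their relative order in the merged queue because a single forwarder serves each source. The interleaving between sources is not deterministic.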

9.4 Event ordering

Problem: Do events have to be processed in order?

Solution: It depends on the use case

Mode | Guarantee | Implementation
None | No ordering guarantee | Concurrency > 1
Per channel | Ordered within a channel | Concurrency = 1 per channel
Per key | Ordered per business key | Partitioning on the key

// To guarantee ordering per key (e.g. per orderId)
public class OrderedPushWorker extends AbstractPushWorker<OrderEvent> {

    // One queue per partition (hash of the key)
    private final BlockingQueue<BusEvent<OrderEvent>>[] partitionQueues;
    private final int partitions = 8;

    @Override
    protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
        // Hashing sends all events of the same orderId to the same partition.
        // floorMod keeps the index non-negative even when hashCode() is Integer.MIN_VALUE.
        int partition = Math.floorMod(payload.orderId().hashCode(), partitions);
        partitionQueues[partition].offer(event);
    }

    // One worker per partition = ordering guaranteed within each partition
}
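
The "one worker per partition" part is left implicit in the class above. A minimal way to sketch it is to give each partition its own single-threaded executor, so that tasks sharing a key always run sequentially on the same thread. `PartitionedDispatcher` is a hypothetical class introduced for this illustration and is not part of the Socle API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo (assumption): per-key ordering through single-threaded partitions. */
final class PartitionedDispatcher {

    private final ExecutorService[] lanes;

    PartitionedDispatcher(int partitions) {
        lanes = new ExecutorService[partitions];
        for (int i = 0; i < partitions; i++) {
            // One thread per partition: tasks of a partition run strictly one after another
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Always returns an index in [0, partitions), even for negative hash codes. */
    static int partitionFor(String key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    void dispatch(String key, Runnable task) {
        lanes[partitionFor(key, lanes.length)].execute(task);
    }

    /** Stops accepting tasks and waits for the queued ones to finish. */
    void shutdownAndWait() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        PartitionedDispatcher dispatcher = new PartitionedDispatcher(8);
        for (int i = 0; i < 3; i++) {
            final int step = i;
            dispatcher.dispatch("order-42", () -> System.out.println("order-42 step " + step));
        }
        dispatcher.shutdownAndWait();
    }
}
```

Ordering holds only among events that share a key. Two different keys may land on different partitions and run concurrently.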

9.5 Retry and DLQ

Problem: What should happen when processing fails?

Solution: Retry with backoff, then DLQ

@Component
public class ResilientPushWorker extends AbstractPushWorker<MyEvent> {

    @Autowired
    private EventBus eventBus;

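    private static final int MAX_RETRIES = 3;

    // Scheduler used to delay the retries (one thread is enough for re-publishing)
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();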

    @Override
    protected void processEvent(MyEvent payload, BusEvent<MyEvent> event) {
        int retryCount = getRetryCount(event);

        try {
            doProcess(payload);
        } catch (RetryableException e) {
            if (retryCount < MAX_RETRIES) {
                // Re-publish with the retry count incremented
                scheduleRetry(event, retryCount + 1);
            } else {
                // Max retries reached -> DLQ
                sendToDlq(event, e);
            }
        }
    }

    private void scheduleRetry(BusEvent<MyEvent> event, int retryCount) {
        // Exponential backoff: 2s, 4s, 8s for retries 1, 2, 3
        long delayMs = (long) Math.pow(2, retryCount) * 1000;

        // Re-publish after the delay
        scheduler.schedule(() -> {
            Map<String, String> metadata = new HashMap<>(event.metadata());
            metadata.put("retry_count", String.valueOf(retryCount));
            eventBus.publish(event.channel(), event.payload(), metadata);
        }, delayMs, TimeUnit.MILLISECONDS);
    }

    private int getRetryCount(BusEvent<MyEvent> event) {
        return Integer.parseInt(event.metadata().getOrDefault("retry_count", "0"));
    }

    private void sendToDlq(BusEvent<MyEvent> event, Exception e) {
        eventBus.publish("dlq." + event.channel(), new DlqEntry(event, e));
    }
}
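
The retry flow can be checked without an EventBus. In the sketch below a failed attempt is rescheduled with a doubling delay and the event lands in a dead-letter list once the retry budget is exhausted. `RetryingProcessor` is a hypothetical class: it retries on any RuntimeException, whereas the worker above only retries RetryableException, and it uses an in-memory list in place of a real DLQ channel.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical demo (assumption): retry with exponential backoff, then dead-letter list. */
final class RetryingProcessor<E> {

    // Daemon thread so that the scheduler never keeps the JVM alive
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "retry-scheduler");
            t.setDaemon(true);
            return t;
        });
    private final Consumer<E> handler;
    private final int maxRetries;
    private final long baseDelayMs;
    private final List<E> deadLetters = new CopyOnWriteArrayList<>();
    private final AtomicInteger attempts = new AtomicInteger();

    RetryingProcessor(Consumer<E> handler, int maxRetries, long baseDelayMs) {
        this.handler = handler;
        this.maxRetries = maxRetries;
        this.baseDelayMs = baseDelayMs;
    }

    /** Completes with true on success, false once the event has been dead-lettered. */
    CompletableFuture<Boolean> submit(E event) {
        CompletableFuture<Boolean> done = new CompletableFuture<>();
        attempt(event, 0, done);
        return done;
    }

    private void attempt(E event, int retryCount, CompletableFuture<Boolean> done) {
        attempts.incrementAndGet();
        try {
            handler.accept(event);
            done.complete(true);
        } catch (RuntimeException e) {
            if (retryCount < maxRetries) {
                int next = retryCount + 1;
                scheduler.schedule(() -> attempt(event, next, done),
                    backoffDelayMs(next), TimeUnit.MILLISECONDS);
            } else {
                deadLetters.add(event);   // retry budget exhausted
                done.complete(false);
            }
        }
    }

    /** baseDelayMs * 2^retryCount, matching the doubling rule used above. */
    long backoffDelayMs(int retryCount) {
        return baseDelayMs << retryCount;
    }

    int attempts() {
        return attempts.get();
    }

    List<E> deadLetters() {
        return deadLetters;
    }
}
```

With a base delay of 1000 ms and three retries, the delays are 2 s, 4 s and 8 s, so an event that keeps failing reaches the dead-letter list after four attempts in total.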

10. Migrating existing workers

10.1 Before (AbstractEventDrivenWorker)

// DEPRECATED
@Component
public class OldOrderWorker extends AbstractEventDrivenWorker<OrderEvent> {

    private final BlockingQueue<OrderEvent> queue = new LinkedBlockingQueue<>();

    public OldOrderWorker() {
        super(4);
    }

    @Override
    public String getName() {
        return "order_worker";
    }

    @Override
    protected OrderEvent pollEvent() throws InterruptedException {
        return queue.poll(100, TimeUnit.MILLISECONDS);  // Polling!
    }

    @Override
    protected void processEvent(OrderEvent event) {
        // Processing
    }

    // External method used to receive events
    public void onOrderReceived(OrderEvent event) {
        queue.offer(event);
    }
}

10.2 After (AbstractPushWorker)

// RECOMMENDED
@Component
public class NewOrderWorker extends AbstractPushWorker<OrderEvent> {

    public NewOrderWorker() {
        super(4, "orders.created");  // Specifies the channel
    }

    @Override
    public String getName() {
        return "order_worker";
    }

    @Override
    protected Class<OrderEvent> getEventType() {
        return OrderEvent.class;
    }

    @Override
    protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
        // Same processing
    }

    // No external method needed anymore: the EventBus handles delivery
}

10.3 Migration steps

  1. Add the EventBus dependency to the worker
  2. Change the parent class: AbstractEventDrivenWorker → AbstractPushWorker
  3. Specify the channels in the constructor
  4. Implement getEventType()
  5. Adapt processEvent() to accept a BusEvent
  6. Remove the internal queue and pollEvent()
  7. Update the producers to use eventBus.publish()
  8. Test the behavior

11. Prometheus metrics

# EventBus
socle_eventbus_published_total{channel="orders.created"}
socle_eventbus_delivered_total{channel="orders.created"}
socle_eventbus_failed_total{channel="orders.created"}
socle_eventbus_subscriptions_active{channel="orders.created"}
socle_eventbus_queue_size{subscription="sub-123"}

# Push Workers
socle_push_worker_processed_total{worker="order_worker"}
socle_push_worker_errors_total{worker="order_worker"}
socle_push_worker_queue_size{worker="order_worker"}
socle_push_worker_duration_seconds{worker="order_worker", quantile="0.99"}

12. Admin REST API

GET  /admin/eventbus/stats              → Global statistics
GET  /admin/eventbus/channels           → List of active channels
GET  /admin/eventbus/subscriptions      → List of subscriptions

POST /admin/eventbus/publish            → Publish an event (debug)
     Body: {"channel": "test", "payload": {...}}

13. See also

Socle V004 – EventBus et Workers Push
