Category: Blog

  • Socle V004 – Scheduler

    Socle V004 – Scheduler

    12 – Scheduler

    Version: 4.0.0 Date: 2025-12-09

    1. Introduction

    The Scheduler runs Workers either on cron expressions or at regular intervals.

    Features

    • Six-field cron expressions (seconds included)
    • Fixed-interval execution
    • Overlap handling
    • Integration with the MOP

    2. Configuration

    2.1 application.yml

    socle:
      scheduler:
        enabled: ${SCHEDULER_ENABLED:true}
        thread-pool-size: ${SCHEDULER_POOL_SIZE:4}
        default-timezone: ${SCHEDULER_TIMEZONE:Europe/Paris}
    

    2.2 Environment variables

    Variable             Description            Default
    SCHEDULER_ENABLED    Enables the scheduler  true
    SCHEDULER_POOL_SIZE  Thread pool size       4
    SCHEDULER_TIMEZONE   Default time zone      Europe/Paris
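
    The `${NAME:default}` placeholders above fall back to the value after the colon when the variable is not set. The sketch below reproduces that rule in plain Java for the three scheduler settings. `SchedulerSettings` and `fromEnv` are hypothetical names chosen for this illustration; the Socle's own loading goes through SocleConfiguration, which is not shown on this page.

```java
import java.time.ZoneId;
import java.util.Map;

/** Hypothetical holder for the three scheduler settings; not part of the Socle API. */
final class SchedulerSettings {
    final boolean enabled;
    final int threadPoolSize;
    final ZoneId defaultTimezone;

    private SchedulerSettings(boolean enabled, int threadPoolSize, ZoneId defaultTimezone) {
        this.enabled = enabled;
        this.threadPoolSize = threadPoolSize;
        this.defaultTimezone = defaultTimezone;
    }

    /** Reads each variable from 'env', falling back to the documented default when it is absent. */
    static SchedulerSettings fromEnv(Map<String, String> env) {
        boolean enabled = Boolean.parseBoolean(env.getOrDefault("SCHEDULER_ENABLED", "true"));
        int poolSize = Integer.parseInt(env.getOrDefault("SCHEDULER_POOL_SIZE", "4"));
        if (poolSize < 1) {
            throw new IllegalArgumentException("SCHEDULER_POOL_SIZE must be >= 1, got " + poolSize);
        }
        // ZoneId.of throws for an unknown region ID, so a typo surfaces immediately.
        ZoneId zone = ZoneId.of(env.getOrDefault("SCHEDULER_TIMEZONE", "Europe/Paris"));
        return new SchedulerSettings(enabled, poolSize, zone);
    }

    public static void main(String[] args) {
        SchedulerSettings s = fromEnv(System.getenv());
        System.out.println(s.enabled + " / " + s.threadPoolSize + " / " + s.defaultTimezone);
    }
}
```

    Validating the pool size and the time zone while loading means a bad value fails at startup rather than at the first scheduled run.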

    3. Scheduling types

    3.1 Cron

    Cron expressions use six fields: the five standard cron fields preceded by a seconds field.

    ┌───────────── second (0-59)
    │ ┌───────────── minute (0-59)
    │ │ ┌───────────── hour (0-23)
    │ │ │ ┌───────────── day of month (1-31)
    │ │ │ │ ┌───────────── month (1-12)
    │ │ │ │ │ ┌───────────── day of week (0-6, 0 = Sunday)
    │ │ │ │ │ │
    * * * * * *
    

    Examples:

    • 0 0 6 * * ? : Every day at 6:00
    • 0 */15 * * * ? : Every 15 minutes
    • 0 0 0 1 * ? : First day of each month at midnight
    • 0 30 8 ? * MON-FRI : 8:30, Monday to Friday
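
    To make the field semantics concrete, here is a minimal sketch of a six-field matcher that computes the next firing time. `MiniCron` is a hypothetical class written for this page, not the Socle's CronExpression. It is assumed to handle only `*`, `?`, single values, ranges, comma lists, `/step` and three-letter day names, and it treats `?` like `*`. It favors clarity over speed by scanning second by second within a matching day.

```java
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.BitSet;
import java.util.List;

/** Minimal six-field cron matcher for illustration; not the Socle's CronExpression. */
final class MiniCron {
    private static final List<String> DAYS = List.of("SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT");
    // Bounds per field: second, minute, hour, day of month, month, day of week
    private static final int[] MIN = {0, 0, 0, 1, 1, 0};
    private static final int[] MAX = {59, 59, 23, 31, 12, 6};

    private final BitSet[] allowed = new BitSet[6];

    MiniCron(String expression) {
        String[] fields = expression.trim().split("\\s+");
        if (fields.length != 6) {
            throw new IllegalArgumentException("Expected 6 fields, got " + fields.length);
        }
        for (int i = 0; i < 6; i++) {
            allowed[i] = parseField(fields[i], MIN[i], MAX[i]);
        }
    }

    /** Expands one field ("*", "?", "5", "1-5", "*/15", "MON-FRI", "1,15") into its allowed values. */
    private static BitSet parseField(String field, int min, int max) {
        BitSet bits = new BitSet(max + 1);
        for (String part : field.split(",")) {
            int slash = part.indexOf('/');
            String range = slash >= 0 ? part.substring(0, slash) : part;
            int step = slash >= 0 ? Integer.parseInt(part.substring(slash + 1)) : 1;
            int from = min;
            int to = max;
            if (!range.equals("*") && !range.equals("?")) {
                int dash = range.indexOf('-');
                from = value(dash >= 0 ? range.substring(0, dash) : range);
                // "5" means just 5, but "5/15" means 5, 20, 35, ... up to the field maximum
                to = dash >= 0 ? value(range.substring(dash + 1)) : (slash >= 0 ? max : from);
            }
            if (step < 1 || from < min || to > max || from > to) {
                throw new IllegalArgumentException("Invalid cron field: " + field);
            }
            for (int v = from; v <= to; v += step) {
                bits.set(v);
            }
        }
        return bits;
    }

    private static int value(String token) {
        int day = DAYS.indexOf(token.toUpperCase());
        return day >= 0 ? day : Integer.parseInt(token);
    }

    private boolean dayMatches(ZonedDateTime t) {
        return allowed[3].get(t.getDayOfMonth())
            && allowed[4].get(t.getMonthValue())
            && allowed[5].get(t.getDayOfWeek().getValue() % 7); // Sunday: 7 % 7 = 0
    }

    private boolean timeMatches(ZonedDateTime t) {
        return allowed[0].get(t.getSecond())
            && allowed[1].get(t.getMinute())
            && allowed[2].get(t.getHour());
    }

    /** First matching second strictly after 'from', or null if there is none within 366 days. */
    ZonedDateTime next(ZonedDateTime from) {
        ZonedDateTime t = from.truncatedTo(ChronoUnit.SECONDS).plusSeconds(1);
        ZonedDateTime limit = t.plusDays(366);
        while (t.isBefore(limit)) {
            if (!dayMatches(t)) {
                t = t.truncatedTo(ChronoUnit.DAYS).plusDays(1); // jump to the next midnight
            } else if (timeMatches(t)) {
                return t;
            } else {
                t = t.plusSeconds(1);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        ZonedDateTime now = ZonedDateTime.now();
        System.out.println(new MiniCron("0 30 8 ? * MON-FRI").next(now));
    }
}
```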

    3.2 Fixed interval

    For simple periodic execution, a Worker returns its period in milliseconds:

    @Override
    public long getCycleIntervalMs() {
        return 60_000;  // Every minute
    }
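
    The implementation in section 6 backs interval jobs with `ScheduledExecutorService.scheduleAtFixedRate`. The following standalone sketch shows that call on its own. `FixedIntervalDemo` and `runPeriodically` are hypothetical names, and the latch is only there to make the demo terminate. With a fixed rate, the period is measured from start to start; a run that takes longer than the period delays the next one but never runs concurrently with it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class FixedIntervalDemo {

    /**
     * Runs 'task' every intervalMs and waits until it has completed 'runs' executions.
     * Returns false if that does not happen within timeoutMs.
     */
    static boolean runPeriodically(Runnable task, long intervalMs, int runs, long timeoutMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(runs);
        try {
            executor.scheduleAtFixedRate(() -> {
                task.run();
                remaining.countDown();
            }, 0, intervalMs, TimeUnit.MILLISECONDS);
            return remaining.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow(); // also cancels the periodic task
        }
    }

    public static void main(String[] args) {
        boolean done = runPeriodically(() -> System.out.println("tick"), 100, 3, 2_000);
        System.out.println("completed: " + done);
    }
}
```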
    

    4. Scheduled Worker

    4.1 With a cron expression

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;

    import eu.lmvi.socle.worker.Worker;

    @Component
    public class DailyReportWorker implements Worker {
    
        private static final Logger log = LoggerFactory.getLogger(DailyReportWorker.class);
    
        @Override
        public String getName() {
            return "daily-report-worker";
        }
    
        @Override
        public String getSchedule() {
            return "0 0 6 * * ?";  // Every day at 6:00
        }
    
        @Override
        public boolean isScheduled() {
            return true;
        }
    
        @Override
        public void doWork() {
            log.info("Generating daily report...");
            generateReport();
        }
    
        private void generateReport() {
            // Report generation
        }
    
        // Other Worker methods...
    }
    

    4.2 With an interval

    @Component
    public class HealthCheckWorker implements Worker {
    
        @Override
        public String getName() {
            return "health-check-worker";
        }
    
        @Override
        public String getSchedule() {
            return null;  // No cron expression
        }
    
        @Override
        public boolean isScheduled() {
            return false;  // Not scheduled by cron
        }
    
        @Override
        public long getCycleIntervalMs() {
            return 30_000;  // Every 30 seconds
        }
    
        @Override
        public void doWork() {
            checkHealth();
        }
    
        private void checkHealth() {
            // Health check logic
        }
    
        // Other Worker methods...
    }
    

    5. Scheduler interface

    package eu.lmvi.socle.scheduler;
    
    import java.util.List;
    
    public interface Scheduler {
    
        /**
         * Schedules a cron job
         */
        void scheduleCron(String jobId, String cronExpression, Runnable task);
    
        /**
         * Schedules a fixed-interval job
         */
        void scheduleInterval(String jobId, long intervalMs, Runnable task);
    
        /**
         * Schedules a fixed-interval job with an initial delay
         */
        void scheduleInterval(String jobId, long initialDelayMs, long intervalMs, Runnable task);
    
        /**
         * Schedules a one-shot job
         */
        void scheduleOnce(String jobId, long delayMs, Runnable task);
    
        /**
         * Cancels a job
         */
        void cancel(String jobId);
    
        /**
         * Checks whether a job is scheduled
         */
        boolean isScheduled(String jobId);
    
        /**
         * Lists the scheduled jobs
         */
        List<ScheduledJob> getScheduledJobs();
    
        /**
         * Starts the scheduler
         */
        void start();
    
        /**
         * Stops the scheduler
         */
        void stop();
    }
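
    The interface returns `ScheduledJob`, a type this page never defines. Section 6 constructs it with five arguments (id, type, cron expression, a delay or interval in milliseconds, and a timestamp). The class below is a guess at a shape compatible with those calls; the field and accessor names are assumptions, not the Socle's actual definition.

```java
import java.time.Instant;
import java.util.Objects;

/** Hypothetical shape of ScheduledJob, inferred only from the constructor calls in section 6. */
final class ScheduledJob {
    private final String id;
    private final String type;            // "cron", "interval" or "once"
    private final String cronExpression;  // null unless the type is "cron"
    private final Long delayOrIntervalMs; // null for cron jobs
    private final Instant scheduledAt;

    ScheduledJob(String id, String type, String cronExpression, Long delayOrIntervalMs, Instant scheduledAt) {
        this.id = Objects.requireNonNull(id, "id");
        this.type = Objects.requireNonNull(type, "type");
        this.cronExpression = cronExpression;
        this.delayOrIntervalMs = delayOrIntervalMs;
        this.scheduledAt = Objects.requireNonNull(scheduledAt, "scheduledAt");
    }

    String getId() { return id; }
    String getType() { return type; }
    String getCronExpression() { return cronExpression; }
    Long getDelayOrIntervalMs() { return delayOrIntervalMs; }
    Instant getScheduledAt() { return scheduledAt; }

    @Override
    public String toString() {
        return "ScheduledJob[" + id + ", " + type + "]";
    }

    public static void main(String[] args) {
        System.out.println(new ScheduledJob("worker:daily-report-worker", "cron", "0 0 6 * * ?", null, Instant.now()));
    }
}
```

    Using the boxed `Long` for the fourth argument is what lets cron jobs pass `null` there, as the section 6 code does.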
    

    6. Implementation

    package eu.lmvi.socle.scheduler;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.time.ZoneId;
    import java.time.ZonedDateTime;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    import eu.lmvi.socle.config.SocleConfiguration;
    
    @Component
    public class DefaultScheduler implements Scheduler {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultScheduler.class);
    
        private final ScheduledExecutorService executor;
        private final ConcurrentHashMap<String, ScheduledFuture<?>> jobs = new ConcurrentHashMap<>();
        private final ConcurrentHashMap<String, ScheduledJob> jobInfo = new ConcurrentHashMap<>();
        private final ZoneId timezone;
    
        public DefaultScheduler(SocleConfiguration config) {
            int poolSize = config.getScheduler().getThreadPoolSize();
            // Number threads with a counter: a timestamp can repeat within the same millisecond
            AtomicInteger threadCounter = new AtomicInteger();
            this.executor = Executors.newScheduledThreadPool(poolSize,
                r -> new Thread(r, "scheduler-" + threadCounter.incrementAndGet()));
            this.timezone = ZoneId.of(config.getScheduler().getDefaultTimezone());
        }
    
        @Override
        public void scheduleCron(String jobId, String cronExpression, Runnable task) {
            CronExpression cron = CronExpression.parse(cronExpression);
    
            // Catch failures here so that a failed run never prevents the next one from being scheduled
            Runnable guardedTask = () -> {
                log.debug("Executing cron job: {}", jobId);
                try {
                    task.run();
                } catch (Exception e) {
                    log.error("Error executing job {}: {}", jobId, e.getMessage(), e);
                }
            };
    
            // Register the job first: scheduleNextCronExecution only reschedules registered jobs
            jobInfo.put(jobId, new ScheduledJob(jobId, "cron", cronExpression, null, Instant.now()));
            scheduleNextCronExecution(jobId, cron, guardedTask);
    
            log.info("Scheduled cron job: {} with expression: {}", jobId, cronExpression);
        }
    
        private void scheduleNextCronExecution(String jobId, CronExpression cron, Runnable task) {
            // Stop the chain once the job has been cancelled or the scheduler stopped
            if (!jobInfo.containsKey(jobId)) {
                return;
            }
    
            ZonedDateTime now = ZonedDateTime.now(timezone);
            ZonedDateTime next = cron.next(now);
    
            if (next != null) {
                long delayMs = Duration.between(now, next).toMillis();
    
                ScheduledFuture<?> future = executor.schedule(() -> {
                    task.run();
                    // Schedule the following execution
                    scheduleNextCronExecution(jobId, cron, task);
                }, delayMs, TimeUnit.MILLISECONDS);
    
                jobs.put(jobId, future);
            }
        }
    
        @Override
        public void scheduleInterval(String jobId, long intervalMs, Runnable task) {
            scheduleInterval(jobId, 0, intervalMs, task);
        }
    
        @Override
        public void scheduleInterval(String jobId, long initialDelayMs, long intervalMs, Runnable task) {
            // Without this try/catch, the first exception would suppress all later executions
            Runnable wrappedTask = () -> {
                log.debug("Executing interval job: {}", jobId);
                try {
                    task.run();
                } catch (Exception e) {
                    log.error("Error executing job {}: {}", jobId, e.getMessage(), e);
                }
            };
    
            ScheduledFuture<?> future = executor.scheduleAtFixedRate(
                wrappedTask, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
    
            jobs.put(jobId, future);
            jobInfo.put(jobId, new ScheduledJob(jobId, "interval", null, intervalMs, Instant.now()));
    
            log.info("Scheduled interval job: {} every {}ms", jobId, intervalMs);
        }
    
        @Override
        public void scheduleOnce(String jobId, long delayMs, Runnable task) {
            ScheduledFuture<?> future = executor.schedule(() -> {
                log.debug("Executing one-shot job: {}", jobId);
                try {
                    task.run();
                } catch (Exception e) {
                    log.error("Error executing job {}: {}", jobId, e.getMessage(), e);
                } finally {
                    jobs.remove(jobId);
                    jobInfo.remove(jobId);
                }
            }, delayMs, TimeUnit.MILLISECONDS);
    
            jobs.put(jobId, future);
            jobInfo.put(jobId, new ScheduledJob(jobId, "once", null, delayMs, Instant.now()));
    
            log.info("Scheduled one-shot job: {} in {}ms", jobId, delayMs);
        }
    
        @Override
        public void cancel(String jobId) {
            // Remove the metadata first so that a cron job in progress does not reschedule itself
            ScheduledJob info = jobInfo.remove(jobId);
            ScheduledFuture<?> future = jobs.remove(jobId);
            if (future != null) {
                future.cancel(false);
            }
            if (info != null || future != null) {
                log.info("Cancelled job: {}", jobId);
            }
        }
    
        @Override
        public boolean isScheduled(String jobId) {
            return jobs.containsKey(jobId);
        }
    
        @Override
        public List<ScheduledJob> getScheduledJobs() {
            return new ArrayList<>(jobInfo.values());
        }
    
        @Override
        public void start() {
            log.info("Scheduler started");
        }
    
        @Override
        public void stop() {
            log.info("Stopping scheduler...");
            jobInfo.clear();
            jobs.values().forEach(f -> f.cancel(false));
            jobs.clear();
            executor.shutdown();
            try {
                // Give running jobs 30 seconds to finish before interrupting them
                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
            log.info("Scheduler stopped");
        }
    }
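
    The try/catch wrapper in scheduleInterval matters because of a documented property of `scheduleAtFixedRate`: if any execution throws, all subsequent executions are suppressed. The standalone sketch below (hypothetical class `SuppressionDemo`) counts how often a task that always fails gets to run, with and without such a wrapper.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SuppressionDemo {

    /** Schedules a task that always throws, every 10 ms, and returns how often it ran within windowMs. */
    static int countRuns(boolean guarded, long windowMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("boom");
        };
        Runnable guardedTask = () -> {
            try {
                failing.run();
            } catch (RuntimeException e) {
                // A real job would log here; catching keeps the schedule alive.
            }
        };
        try {
            executor.scheduleAtFixedRate(guarded ? guardedTask : failing, 0, 10, TimeUnit.MILLISECONDS);
            Thread.sleep(windowMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded runs: " + countRuns(false, 300)); // 1: suppressed after the first failure
        System.out.println("guarded runs:   " + countRuns(true, 300));  // many: the schedule survives
    }
}
```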
    

    7. Overlap handling

    7.1 Avoiding overlap

    @Component
    public class LongRunningWorker implements Worker {
    
        private static final Logger log = LoggerFactory.getLogger(LongRunningWorker.class);
    
        private final AtomicBoolean running = new AtomicBoolean(false);
    
        @Override
        public void doWork() {
            // Skip this run if the previous one has not finished
            if (!running.compareAndSet(false, true)) {
                log.warn("Previous execution still running, skipping");
                return;
            }
    
            try {
                doLongWork();
            } finally {
                running.set(false);
            }
        }
    
        private void doLongWork() {
            // Long-running processing
        }
    
        // Other Worker methods...
    }
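
    The guard above can be exercised in isolation. In this sketch (hypothetical class `OverlapGuard`), one thread holds the guard while a second call is attempted; latches make the interleaving deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

final class OverlapGuard {
    private final AtomicBoolean running = new AtomicBoolean(false);

    /** Runs 'work' unless a previous call is still in progress; returns false when skipped. */
    boolean tryRun(Runnable work) {
        if (!running.compareAndSet(false, true)) {
            return false;
        }
        try {
            work.run();
            return true;
        } finally {
            running.set(false);
        }
    }

    /** Returns {result of a call made while another is in progress, result of a call made afterwards}. */
    static boolean[] demo() {
        OverlapGuard guard = new OverlapGuard();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread slow = new Thread(() -> guard.tryRun(() -> {
            started.countDown();
            awaitQuietly(release); // hold the guard until the main thread has tried to enter
        }));
        slow.start();
        awaitQuietly(started);
        boolean during = guard.tryRun(() -> { });
        release.countDown();
        try {
            slow.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean after = guard.tryRun(() -> { });
        return new boolean[] {during, after};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("while busy: " + r[0] + ", afterwards: " + r[1]); // while busy: false, afterwards: true
    }
}
```

    Resetting the flag in `finally` is the important part: without it, a single exception would block every later run.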
    

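    7.2 With a distributed lock (multiple instances)

    @Component
    public class DistributedScheduledWorker implements Worker {
    
        private static final Logger log = LoggerFactory.getLogger(DistributedScheduledWorker.class);
    
        @Autowired
        private KvBus kvBus;
    
        @Override
        public void doWork() {
            String lockKey = "lock:job:" + getName();
    
            // Try to acquire the lock (10-minute expiry)
            if (!kvBus.putIfAbsent(lockKey, "locked", Duration.ofMinutes(10))) {
                log.debug("Job already running on another instance");
                return;
            }
    
            try {
                executeJob();
            } finally {
                // Caveat: if the job outlived the expiry, this may delete a lock
                // that another instance has acquired in the meantime
                kvBus.delete(lockKey);
            }
        }
    
        private void executeJob() {
            // Job logic
        }
    
        // Other Worker methods (getName(), ...)
    }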
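
    One weakness of the pattern above: if a job runs longer than the 10-minute expiry, the unconditional delete can remove a lock that another instance now holds. A common remedy is to store a unique owner token and delete only when the stored value still matches. Whether KvBus offers such a compare-and-delete is not stated on this page, so the sketch below uses an in-memory `ConcurrentMap` as a stand-in for the shared store; `OwnedLock` is a hypothetical class.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class OwnedLock {
    private final ConcurrentMap<String, String> store; // in-memory stand-in for a shared KV store
    private final String key;
    private final String token = UUID.randomUUID().toString(); // identifies this owner

    OwnedLock(ConcurrentMap<String, String> store, String key) {
        this.store = store;
        this.key = key;
    }

    /** Takes the lock if nobody holds it. */
    boolean tryAcquire() {
        return store.putIfAbsent(key, token) == null;
    }

    /** Removes the key only if it still carries this owner's token. */
    boolean release() {
        return store.remove(key, token);
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> store = new ConcurrentHashMap<>();
        OwnedLock a = new OwnedLock(store, "lock:job:report");
        OwnedLock b = new OwnedLock(store, "lock:job:report");
        System.out.println(a.tryAcquire()); // true
        System.out.println(b.tryAcquire()); // false: a holds the lock
        store.remove("lock:job:report");    // simulate the expiry of a's lock
        System.out.println(b.tryAcquire()); // true
        System.out.println(a.release());    // false: b's lock is left untouched
        System.out.println(b.release());    // true
    }
}
```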
    

    8. MOP integration

    The MOP automatically registers scheduled workers with the Scheduler:

    // In MainOrchestratorProcess
    private void startScheduledWorkers() {
        for (Worker worker : workers) {
            if (worker.isScheduled() && worker.getSchedule() != null) {
                scheduler.scheduleCron(
                    "worker:" + worker.getName(),
                    worker.getSchedule(),
                    () -> {
                        if (worker.isHealthy()) {
                            worker.doWork();
                        }
                    }
                );
            }
        }
    }
    

    9. Admin API

    @RestController
    @RequestMapping("/admin/scheduler")
    public class SchedulerController {
    
        @Autowired
        private Scheduler scheduler;
    
        @GetMapping("/jobs")
        public List<ScheduledJob> listJobs() {
            return scheduler.getScheduledJobs();
        }
    
        @PostMapping("/jobs/{jobId}/cancel")
        public ResponseEntity<Void> cancelJob(@PathVariable String jobId) {
            if (scheduler.isScheduled(jobId)) {
                scheduler.cancel(jobId);
                return ResponseEntity.ok().build();
            }
            return ResponseEntity.notFound().build();
        }
    
        @PostMapping("/jobs/{jobId}/trigger")
        public ResponseEntity<Void> triggerJob(@PathVariable String jobId) {
            // Immediate one-shot execution
            scheduler.scheduleOnce(jobId + "-manual-" + System.currentTimeMillis(), 0, () -> {
                // Look up and run the matching worker
            });
            return ResponseEntity.accepted().build();
        }
    }
    

    10. Common cron expressions

    Expression          Description
    0 0 * * * ?         Every hour
    0 */15 * * * ?      Every 15 minutes
    0 0 6 * * ?         Every day at 6:00
    0 0 0 * * ?         Every day at midnight
    0 0 0 ? * SUN       Every Sunday at midnight
    0 0 0 1 * ?         First day of each month at midnight
    0 0 8 ? * MON-FRI   Weekdays at 8:00
    0 0 */2 * * ?       Every 2 hours

    11. Best practices

    DO

    • Use unique, descriptive job names
    • Handle overlap for long-running jobs
    • Use distributed locks when running multiple instances
    • Log the start and end of each job
    • Monitor job execution

    DON’T

    • Schedule jobs more frequently than necessary
    • Ignore errors raised inside jobs
    • Create too many threads
    • Block indefinitely inside a job
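
    One way to honor the last rule is to bound each run with a timeout. The sketch below is an assumption about how that could be done with the JDK alone, not something the Socle is stated to provide; `BoundedJob`, `runWithTimeout` and `sleeper` are hypothetical names. Cancellation works by interruption, so the job itself has to react to being interrupted.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedJob {

    /** Runs 'job' on its own thread; returns true only if it finished normally within timeoutMs. */
    static boolean runWithTimeout(Runnable job, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> future = executor.submit(job);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false; // too slow, or the job threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow(); // interrupts the job if it is still running
        }
    }

    /** A job that sleeps for 'ms' milliseconds and stops early when interrupted. */
    static Runnable sleeper(long ms) {
        return () -> {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(sleeper(10), 1_000));  // true
        System.out.println(runWithTimeout(sleeper(5_000), 100)); // false: gave up after 100 ms
    }
}
```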


  • Socle V004 – Architecture

    Socle V004 – Architecture

    02 – Socle V4 Architecture

    Version: 4.0.0 Date: 2025-01-25

    1. Overview

    ┌─────────────────────────────────────────────────────────────────┐
    │                         SOCLE V4                                │
    │                                                                 │
    │  ┌─────────────────────────────────────────────────────────┐   │
    │  │                    MOP (unchanged)                       │   │
    │  │  - Orchestration Workers                                 │   │
    │  │  - Lifecycle management                                  │   │
    │  │  - Scheduling doWork()                                   │   │
    │  └─────────────────────────────────────────────────────────┘   │
    │                              │                                  │
    │  ┌───────────┬───────────┬───┴───────┬─────────────┐           │
    │  │           │           │           │             │           │
    │  ▼           ▼           ▼           ▼             ▼           │
    │ ┌─────┐  ┌───────┐  ┌────────┐  ┌─────────┐  ┌──────────┐     │
    │ │KvBus│  │Shared │  │Supervi-│  │ HTTP    │  │ Business │     │
    │ │     │  │Data   │  │sor     │  │ Worker  │  │ workers  │     │
    │ └─────┘  └───────┘  └────────┘  └─────────┘  └──────────┘     │
    │                                                                 │
    │  ════════════════════   NEW IN V4   ════════════════════════   │
    │                                                                 │
    │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐     │
    │  │ H2 TechDB   │  │ Log4j2 +    │  │ Central clients     │     │
    │  │ (embedded)  │  │ LogForwarder│  │ - SocleAuthClient   │     │
    │  │             │  │             │  │ - WorkerRegistry    │     │
    │  └─────────────┘  └─────────────┘  └─────────────────────┘     │
    │                                                                 │
    │  ┌─────────────┐  ┌─────────────────────────────────────┐      │
    │  │ Status      │  │ Pipeline V2                          │      │
    │  │ Dashboard   │  │ (Queue/Claim/Ack, DLQ, at-least-once)│      │
    │  │ (port 9374) │  │                                      │      │
    │  └─────────────┘  └─────────────────────────────────────┘      │
    │                                                                 │
    └─────────────────────────────────────────────────────────────────┘
    

    2. Layered architecture

    ┌─────────────────────────────────────────────────────────────────┐
    │                    APPLICATION LAYER                            │
    │  Business workers + REST controllers                            │
    ├─────────────────────────────────────────────────────────────────┤
    │                    FRAMEWORK LAYER (SOCLE)                      │
    │  MOP + Core Components + V4 additions                           │
    ├─────────────────────────────────────────────────────────────────┤
    │                    INFRASTRUCTURE LAYER                         │
    │  Tomcat (HTTP), H2 (TechDB), Redis (KV), Kafka/NATS (Msg)      │
    └─────────────────────────────────────────────────────────────────┘
    

    3. Package structure

    eu.lmvi.socle/
    │
    │  ══════ V3 COMPONENTS (retained) ══════
    ├── mop/
    │   └── MainOrchestratorProcess.java    # Central orchestrator
    ├── worker/
    │   └── Worker.java                     # Base interface
    ├── config/
    │   └── SocleConfiguration.java         # Centralized configuration
    ├── kv/
    │   ├── KvBus.java                      # KV abstraction
    │   ├── KvImplementation.java           # Implementation interface
    │   ├── InMemoryKvImplementation.java   # In-memory implementation
    │   └── RedisKvImplementation.java      # Redis implementation
    ├── shared/
    │   └── SharedDataRegistry.java         # Shared data registry
    ├── supervisor/
    │   └── Supervisor.java                 # Heartbeat monitoring
    ├── http/
    │   ├── HttpWorker.java                 # HTTP worker
    │   ├── TomcatManager.java              # Tomcat management
    │   └── GracefulShutdownFilter.java     # Drain filter
    ├── admin/
    │   └── AdminRestApi.java               # Admin REST API
    ├── metrics/
    │   └── SocleMetrics.java               # Metrics
    ├── pipeline/
    │   └── PipelineEngine.java             # Asynchronous processing
    ├── resilience/
    │   ├── CircuitBreaker.java             # Circuit breaker
    │   └── RetryExecutor.java              # Retry with backoff
    ├── scheduler/
    │   ├── WorkerScheduler.java            # Scheduler
    │   └── CronExpression.java             # Cron parser
    ├── security/
    │   ├── AdminAuthFilter.java            # Admin authentication
    │   └── RateLimitFilter.java            # Rate limiting
    │
    │  ══════ NEW V4 COMPONENTS ══════
    ├── techdb/
    │   ├── TechDbManager.java              # H2 manager
    │   ├── TechDbConfig.java               # Spring configuration
    │   └── TechDbRepository.java           # Repository
    ├── logging/
    │   ├── SocleLogForwarderAppender.java  # Log4j2 appender
    │   ├── LogTransport.java               # Transport interface
    │   ├── HttpLogTransport.java           # HTTP transport
    │   ├── NatsLogTransport.java           # NATS transport
    │   └── H2FallbackStorage.java          # H2 fallback
    └── client/
        ├── auth/
        │   ├── SocleAuthClient.java        # Auth interface
        │   ├── AuthTokenManager.java       # Token management
        │   └── AuthTokens.java             # Token DTO
        └── registry/
            ├── WorkerRegistryClient.java   # Registry client
            ├── WorkerRegistration.java     # Registration DTO
            └── WorkerHeartbeat.java        # Heartbeat DTO
    

    4. Startup flow

    1. Spring Boot initializes
    2. SocleConfiguration loads the configuration (.env + YAML)
    3. Spring creates the @Component beans
    4. MOP.start() is called (ApplicationReadyEvent)
       │
       ├── 4.1 [V4] TechDbManager.initialize()
       │         └── Creates the H2 tables
       │         └── Restores offsets
       │
       ├── 4.2 [V4] SocleAuthClient.login()
       │         └── Obtains a JWT
       │
       ├── 4.3 [V4] WorkerRegistryClient.register()
       │         └── Registers with the Registry
       │
       ├── 4.4 SharedDataRegistry.initialize()
       ├── 4.5 KvBus.initialize()
       ├── 4.6 Supervisor.start()
       ├── 4.7 Metrics.start()
       │
       ├── 4.8 Workers sorted by START_PRIORITY
       │   └── For each worker:
       │       ├── worker.initialize()
       │       ├── worker.start()
       │       ├── Register with Supervisor
       │       └── Schedule if isScheduled()
       │
       ├── 4.9 HttpWorker.start() [priority 1000 = last]
       └── 4.10 Main doWork() loop
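
    Step 4.8 amounts to an ascending sort on the start priority, which is why HttpWorker, at priority 1000, starts last. A minimal sketch follows; `WorkerInfo` is a hypothetical descriptor (the real Worker interface is not shown here), and the priorities 50 and 100 are made-up values for the illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class StartOrder {

    /** Hypothetical descriptor; the real Worker interface is not shown on this page. */
    static final class WorkerInfo {
        final String name;
        final int startPriority;

        WorkerInfo(String name, int startPriority) {
            this.name = name;
            this.startPriority = startPriority;
        }
    }

    /** Returns worker names in start order: lowest priority value first. */
    static List<String> startSequence(List<WorkerInfo> workers) {
        List<WorkerInfo> sorted = new ArrayList<>(workers);
        sorted.sort(Comparator.comparingInt((WorkerInfo w) -> w.startPriority));
        List<String> names = new ArrayList<>();
        for (WorkerInfo w : sorted) {
            names.add(w.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<WorkerInfo> workers = List.of(
            new WorkerInfo("http-worker", 1000),
            new WorkerInfo("daily-report-worker", 100),
            new WorkerInfo("health-check-worker", 50));
        System.out.println(startSequence(workers));
        // prints [health-check-worker, daily-report-worker, http-worker]
    }
}
```

    Starting the HTTP worker last means no request is accepted before the workers that serve it are running.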
    

    5. Shutdown flow

    1. SIGTERM signal (or /admin/shutdown)
    2. MOP.stop() is called
    3. State → DRAINING
       │
       ├── 3.1 HttpWorker.startDraining()
       │         └── Refuses new connections
       ├── 3.2 HttpWorker.awaitDrain(timeout)
       │         └── Waits for in-flight requests
       │
       ├── 3.3 Workers by STOP_PRIORITY (lowest first)
       │   ├── HttpWorker.stop() [priority 0]
       │   └── Other workers [priorities 1-999]
       │
       ├── 3.4 [V4] WorkerRegistryClient.unregister()
       ├── 3.5 [V4] TechDbManager.close()
       │
       ├── 3.6 Supervisor.shutdown()
       ├── 3.7 SharedData.close()
       └── 3.8 State → STOPPED
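
    Steps 3.1 and 3.2 describe a drain: stop admitting new requests, then wait for the ones in flight. The sketch below shows one way such a gate could work. `DrainGate` is hypothetical; only the names `startDraining` and `awaitDrain` come from the flow above, and how HttpWorker actually implements them is not shown on this page.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class DrainGate {
    private final AtomicBoolean draining = new AtomicBoolean(false);
    private final AtomicInteger inFlight = new AtomicInteger();

    /** Called when a request arrives; returns false once draining has started. */
    boolean tryEnter() {
        // Count first, then check: a request admitted just before draining starts is still waited for.
        inFlight.incrementAndGet();
        if (draining.get()) {
            inFlight.decrementAndGet();
            return false;
        }
        return true;
    }

    /** Called when an admitted request completes. */
    void exit() {
        inFlight.decrementAndGet();
    }

    void startDraining() {
        draining.set(true);
    }

    /** Waits until no request is in flight; returns false if timeoutMs elapses first. */
    boolean awaitDrain(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (inFlight.get() > 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        DrainGate gate = new DrainGate();
        System.out.println(gate.tryEnter());     // true: request admitted
        gate.startDraining();
        System.out.println(gate.tryEnter());     // false: new requests are refused
        System.out.println(gate.awaitDrain(50)); // false: one request still in flight
        gate.exit();
        System.out.println(gate.awaitDrain(50)); // true: drained
    }
}
```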
    

    6. Component dependencies

    ┌──────────────────────────────────────────────────────────────┐
    │                        MOP (orchestrator)                     │
    │                              │                                │
    │         ┌────────────────────┼────────────────────┐          │
    │         │                    │                    │          │
    │         ▼                    ▼                    ▼          │
    │   ┌───────────┐       ┌───────────┐       ┌───────────┐     │
    │   │ TechDB    │◄──────│ Supervisor│───────►│ Workers   │     │
    │   │ Manager   │       │           │        │           │     │
    │   └─────┬─────┘       └─────┬─────┘        └─────┬─────┘     │
    │         │                   │                    │           │
    │         │                   ▼                    │           │
    │         │            ┌───────────┐               │           │
    │         │            │ Worker    │◄──────────────┘           │
    │         │            │ Registry  │                           │
    │         │            │ Client    │                           │
    │         │            └─────┬─────┘                           │
    │         │                  │                                 │
    │         ▼                  ▼                                 │
    │   ┌───────────┐     ┌───────────┐                           │
    │   │ H2        │     │ SocleAuth │                           │
    │   │ Database  │     │ Client    │                           │
    │   └───────────┘     └─────┬─────┘                           │
    │                           │                                  │
    │                           ▼                                  │
    │                    ┌─────────────┐                           │
    │                    │ LogForwarder│──► HTTP/NATS (outbound)  │
    │                    │ Appender    │                           │
    │                    └─────────────┘                           │
    └──────────────────────────────────────────────────────────────┘
    

    7. V4 logging architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                      Application                             │
    │                                                              │
    │  Logger.info("message")                                      │
    │         │                                                    │
    │         ▼                                                    │
    │  ┌─────────────────────────────────────────────────────┐    │
    │  │              Log4j2 AsyncLoggers                     │    │
    │  │              (LMAX Disruptor)                        │    │
    │  └────────────────────┬────────────────────────────────┘    │
    │                       │                                      │
    │         ┌─────────────┼─────────────┐                       │
    │         ▼             ▼             ▼                       │
    │  ┌───────────┐ ┌───────────┐ ┌─────────────────────┐       │
    │  │  Console  │ │  File     │ │ SocleLogForwarder   │       │
    │  │  Appender │ │  Appender │ │ Appender            │       │
    │  └───────────┘ └───────────┘ └──────────┬──────────┘       │
    │                                         │                   │
    └─────────────────────────────────────────┼───────────────────┘
                                              │
                        ┌─────────────────────┴─────────────────────┐
                        │                                           │
                        ▼                                           ▼
                ┌──────────────┐                           ┌──────────────┐
                │ HTTP Transport│                           │ NATS Transport│
                │ → LogHub     │                           │ → JetStream  │
                └──────┬───────┘                           └──────┬───────┘
                       │                                          │
                       │  (on failure)                            │
                       ▼                                          │
                ┌──────────────┐                                  │
                │ H2 Fallback  │◄─────────────────────────────────┘
                │ Storage      │
                └──────────────┘
    

    8. Design patterns

    Pattern          Component           Usage
    Orchestrator     MOP                 Orchestrates the whole lifecycle
    Observer         KvBus pub/sub       Communication between workers
    Strategy         KvImplementation    In-memory or Redis
    Registry         SharedDataRegistry  Shared state
    Circuit Breaker  CircuitBreaker      Resilience to failures
    Factory          Spring DI           Component creation
    Builder          Configuration       Configuration construction

    9. Thread Safety

    Mechanism                 Usage
    ConcurrentHashMap         Shared maps
    AtomicLong/AtomicBoolean  Atomic counters and flags
    ReentrantReadWriteLock    Complex operations
    BlockingQueue             Thread-safe queues
    CompletableFuture         Asynchronous operations
    ScheduledExecutorService  Scheduling
    LMAX Disruptor            Logging ring buffer

    10. Extension points

    Creating a new Worker

    @Component
    public class MonWorker implements Worker {
        // Implement the Worker interface
    }
    

    Adding a KvBus implementation

    public class MonKvImplementation implements KvImplementation {
        // Implement the interface
    }
    

    Creating a Pipeline Stage

    public class MonStage implements PipelineStage<MonType> {
        // Implement the processing
    }
    


  • Socle V004 – Scheduler

    Socle V004 – Scheduler

    12 – Scheduler

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Scheduler permet d’exécuter des Workers selon des expressions cron ou à intervalles réguliers.

    Caractéristiques

    • Support des expressions cron standard
    • Exécution à intervalle fixe
    • Gestion du chevauchement
    • Intégration avec le MOP

    2. Configuration

    2.1 application.yml

    socle:
      scheduler:
        enabled: ${SCHEDULER_ENABLED:true}
        thread-pool-size: ${SCHEDULER_POOL_SIZE:4}
        default-timezone: ${SCHEDULER_TIMEZONE:Europe/Paris}
    

    2.2 Variables d’environnement

    Variable Description Défaut
    SCHEDULER_ENABLED Activer le scheduler true
    SCHEDULER_POOL_SIZE Taille du thread pool 4
    SCHEDULER_TIMEZONE Timezone par défaut Europe/Paris

    3. Types de scheduling

    3.1 Cron

    Expressions cron standard (6 champs) :

    ┌───────────── seconde (0-59)
    │ ┌───────────── minute (0-59)
    │ │ ┌───────────── heure (0-23)
    │ │ │ ┌───────────── jour du mois (1-31)
    │ │ │ │ ┌───────────── mois (1-12)
    │ │ │ │ │ ┌───────────── jour de la semaine (0-6, 0=dimanche)
    │ │ │ │ │ │
    * * * * * *
    

    Exemples :

    • 0 0 6 * * ? : Tous les jours à 6h00
    • 0 */15 * * * ? : Toutes les 15 minutes
    • 0 0 0 1 * ? : Premier jour de chaque mois à minuit
    • 0 30 8 ? * MON-FRI : 8h30 du lundi au vendredi

    3.2 Intervalle fixe

    Exécution périodique simple :

    @Override
    public long getCycleIntervalMs() {
        return 60000;  // Toutes les minutes
    }
    

    4. Worker schedulé

    4.1 Avec expression cron

    @Component
    public class DailyReportWorker implements Worker {
    
        private static final Logger log = LoggerFactory.getLogger(DailyReportWorker.class);
    
        @Override
        public String getName() {
            return "daily-report-worker";
        }
    
        @Override
        public String getSchedule() {
            return "0 0 6 * * ?";  // Tous les jours à 6h
        }
    
        @Override
        public boolean isScheduled() {
            return true;
        }
    
        @Override
        public void doWork() {
            log.info("Generating daily report...");
            generateReport();
        }
    
        private void generateReport() {
            // Génération du rapport
        }
    
        // Autres méthodes Worker...
    }
    

    4.2 Avec intervalle

    @Component
    public class HealthCheckWorker implements Worker {
    
        @Override
        public String getName() {
            return "health-check-worker";
        }
    
        @Override
        public String getSchedule() {
            return null;  // Pas de cron
        }
    
        @Override
        public boolean isScheduled() {
            return false;  // Pas schedulé par cron
        }
    
        @Override
        public long getCycleIntervalMs() {
            return 30000;  // Toutes les 30 secondes
        }
    
        @Override
        public void doWork() {
            checkHealth();
        }
    }
    

    5. Interface Scheduler

    package eu.lmvi.socle.scheduler;
    
    public interface Scheduler {
    
        /**
         * Planifie un job cron
         */
        void scheduleCron(String jobId, String cronExpression, Runnable task);
    
        /**
         * Planifie un job à intervalle fixe
         */
        void scheduleInterval(String jobId, long intervalMs, Runnable task);
    
        /**
         * Planifie un job à intervalle fixe avec délai initial
         */
        void scheduleInterval(String jobId, long initialDelayMs, long intervalMs, Runnable task);
    
        /**
         * Planifie un job one-shot
         */
        void scheduleOnce(String jobId, long delayMs, Runnable task);
    
        /**
         * Annule un job
         */
        void cancel(String jobId);
    
        /**
         * Vérifie si un job est planifié
         */
        boolean isScheduled(String jobId);
    
        /**
         * Liste les jobs planifiés
         */
        List<ScheduledJob> getScheduledJobs();
    
        /**
         * Démarre le scheduler
         */
        void start();
    
        /**
         * Arrête le scheduler
         */
        void stop();
    }
    

    6. Implémentation

    package eu.lmvi.socle.scheduler;
    
    @Component
    public class DefaultScheduler implements Scheduler {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultScheduler.class);
    
        private final ScheduledExecutorService executor;
        private final ConcurrentHashMap<String, ScheduledFuture<?>> jobs = new ConcurrentHashMap<>();
        private final ConcurrentHashMap<String, ScheduledJob> jobInfo = new ConcurrentHashMap<>();
        private final ZoneId timezone;
    
        public DefaultScheduler(SocleConfiguration config) {
            int poolSize = config.getScheduler().getThreadPoolSize();
            this.executor = Executors.newScheduledThreadPool(poolSize,
                r -> new Thread(r, "scheduler-" + System.currentTimeMillis()));
            this.timezone = ZoneId.of(config.getScheduler().getDefaultTimezone());
        }
    
        @Override
        public void scheduleCron(String jobId, String cronExpression, Runnable task) {
            CronExpression cron = CronExpression.parse(cronExpression);
    
            // Register the job before scheduling it: a run only reschedules itself
            // while the job is still present in jobInfo (cancel() removes it)
            jobInfo.put(jobId, new ScheduledJob(jobId, "cron", cronExpression, null, Instant.now()));
            scheduleNextCronExecution(jobId, cron, task);
    
            log.info("Scheduled cron job: {} with expression: {}", jobId, cronExpression);
        }
    
        private void scheduleNextCronExecution(String jobId, CronExpression cron, Runnable task) {
            ZonedDateTime now = ZonedDateTime.now(timezone);
            ZonedDateTime next = cron.next(now);
    
            if (next != null) {
                long delayMs = Duration.between(now, next).toMillis();
    
                ScheduledFuture<?> future = executor.schedule(() -> {
                    log.debug("Executing cron job: {}", jobId);
                    try {
                        task.run();
                    } catch (Exception e) {
                        log.error("Error executing job {}: {}", jobId, e.getMessage(), e);
                    } finally {
                        // Schedule the next run even if this one failed,
                        // unless the job was cancelled in the meantime
                        if (jobInfo.containsKey(jobId)) {
                            scheduleNextCronExecution(jobId, cron, task);
                        }
                    }
                }, delayMs, TimeUnit.MILLISECONDS);
    
                jobs.put(jobId, future);
            }
        }
    
        @Override
        public void scheduleInterval(String jobId, long intervalMs, Runnable task) {
            scheduleInterval(jobId, 0, intervalMs, task);
        }
    
        @Override
        public void scheduleInterval(String jobId, long initialDelayMs, long intervalMs, Runnable task) {
            Runnable wrappedTask = () -> {
                log.debug("Executing interval job: {}", jobId);
                try {
                    task.run();
                } catch (Exception e) {
                    log.error("Error executing job {}: {}", jobId, e.getMessage(), e);
                }
            };
    
            ScheduledFuture<?> future = executor.scheduleAtFixedRate(
                wrappedTask, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
    
            jobs.put(jobId, future);
            jobInfo.put(jobId, new ScheduledJob(jobId, "interval", null, intervalMs, Instant.now()));
    
            log.info("Scheduled interval job: {} every {}ms", jobId, intervalMs);
        }
    
        @Override
        public void scheduleOnce(String jobId, long delayMs, Runnable task) {
            ScheduledFuture<?> future = executor.schedule(() -> {
                log.debug("Executing one-shot job: {}", jobId);
                try {
                    task.run();
                } finally {
                    jobs.remove(jobId);
                    jobInfo.remove(jobId);
                }
            }, delayMs, TimeUnit.MILLISECONDS);
    
            jobs.put(jobId, future);
            jobInfo.put(jobId, new ScheduledJob(jobId, "once", null, delayMs, Instant.now()));
    
            log.info("Scheduled one-shot job: {} in {}ms", jobId, delayMs);
        }
    
        @Override
        public void cancel(String jobId) {
            ScheduledFuture<?> future = jobs.remove(jobId);
            if (future != null) {
                future.cancel(false);
                jobInfo.remove(jobId);
                log.info("Cancelled job: {}", jobId);
            }
        }
    
        @Override
        public boolean isScheduled(String jobId) {
            return jobs.containsKey(jobId);
        }
    
        @Override
        public List<ScheduledJob> getScheduledJobs() {
            return new ArrayList<>(jobInfo.values());
        }
    
        @Override
        public void start() {
            log.info("Scheduler started");
        }
    
        @Override
        public void stop() {
            log.info("Stopping scheduler...");
            // Drop job metadata too, so getScheduledJobs() is empty after stop()
            jobInfo.clear();
            jobs.values().forEach(f -> f.cancel(false));
            jobs.clear();
            executor.shutdown();
            try {
                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
            log.info("Scheduler stopped");
        }
    }
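
    The delay computed in scheduleNextCronExecution depends on the scheduler's timezone. The sketch below illustrates this with plain java.time, assuming a simple "every day at a fixed time" rule in place of a cron expression. NextRunCalculator and its methods are hypothetical names used for illustration only, not part of the Socle API.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.ZonedDateTime;

// Hypothetical helper (not part of Socle): computes the delay until the next
// daily run, the same way the scheduler derives a delay from "now" and "next".
final class NextRunCalculator {

    private NextRunCalculator() {}

    /** Returns the next occurrence of timeOfDay strictly after now, in now's zone. */
    static ZonedDateTime nextDailyRun(ZonedDateTime now, LocalTime timeOfDay) {
        ZonedDateTime candidate = now.with(timeOfDay);
        // If today's slot is already past (or is exactly now), move to tomorrow.
        return candidate.isAfter(now) ? candidate : candidate.plusDays(1);
    }

    /** Real elapsed time between the two instants, in milliseconds. */
    static long delayMs(ZonedDateTime now, ZonedDateTime next) {
        return Duration.between(now, next).toMillis();
    }
}
```

    Because the computation uses zoned date-times, daylight-saving changes are taken into account: in Europe/Paris, the delay from 07:00 on 29 March 2025 to 06:00 the next morning is 22 hours, not 23.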
    

    7. Overlap handling

    7.1 Avoiding overlap

    @Component
    public class LongRunningWorker implements Worker {
    
        private static final Logger log = LoggerFactory.getLogger(LongRunningWorker.class);
    
        private final AtomicBoolean running = new AtomicBoolean(false);
    
        @Override
        public void doWork() {
            // Skip this run if the previous one is still in progress
            if (!running.compareAndSet(false, true)) {
                log.warn("Previous execution still running, skipping");
                return;
            }
    
            try {
                doLongWork();
            } finally {
                running.set(false);
            }
        }
    
        private void doLongWork() {
            // Long-running processing
        }
    }
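
    The same guard can be factored into a reusable wrapper, so that any Runnable handed to the scheduler skips a run while the previous one is still in progress. This is a minimal sketch; NonOverlappingTask is a hypothetical class, not part of Socle.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper (not part of Socle): runs the delegate unless a
// previous invocation is still running, in which case the call is skipped.
final class NonOverlappingTask implements Runnable {

    private final Runnable delegate;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger skipped = new AtomicInteger();

    NonOverlappingTask(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        // Only one caller can flip the flag from false to true.
        if (!running.compareAndSet(false, true)) {
            skipped.incrementAndGet();
            return;
        }
        try {
            delegate.run();
        } finally {
            // Always release the flag, even if the delegate throws.
            running.set(false);
        }
    }

    /** Number of invocations skipped because another one was in progress. */
    int skippedCount() {
        return skipped.get();
    }
}
```

    Counting skipped runs gives a cheap signal for monitoring: a steadily growing count suggests the interval is shorter than the job's usual duration.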
    

    7.2 With a distributed lock (multi-instance)

    @Component
    public class DistributedScheduledWorker implements Worker {
    
        @Autowired
        private KvBus kvBus;
    
        @Override
        public void doWork() {
            String lockKey = "lock:job:" + getName();
    
            // Try to acquire the lock
            if (!kvBus.putIfAbsent(lockKey, "locked", Duration.ofMinutes(10))) {
                log.debug("Job already running on another instance");
                return;
            }
    
            try {
                executeJob();
            } finally {
                kvBus.delete(lockKey);
            }
        }
    }
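
    One caveat with the pattern above: if the 10-minute duration passed to putIfAbsent is a time-to-live and it elapses while the job is still running, another instance can take the lock, and the unconditional delete in the finally block would then remove that other instance's lock. A common remedy is to store a per-run owner token and release only if the token still matches. The sketch below shows the idea with an in-memory map; InMemoryLockStore and LockedJobRunner are hypothetical stand-ins, and the real KvBus API may not offer a compare-and-delete operation.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a key-value store (not the KvBus API).
final class InMemoryLockStore {

    private final ConcurrentHashMap<String, String> entries = new ConcurrentHashMap<>();

    /** Stores the token only if the key is free; true means the lock was acquired. */
    boolean tryAcquire(String key, String ownerToken) {
        return entries.putIfAbsent(key, ownerToken) == null;
    }

    /** Removes the key only if it still holds this owner's token. */
    boolean release(String key, String ownerToken) {
        return entries.remove(key, ownerToken);
    }

    /** Simulates a time-to-live expiry by dropping the key unconditionally. */
    void expire(String key) {
        entries.remove(key);
    }
}

// Hypothetical runner: executes the job only when the lock could be acquired.
final class LockedJobRunner {

    private LockedJobRunner() {}

    static boolean runExclusively(InMemoryLockStore store, String jobName, Runnable job) {
        String key = "lock:job:" + jobName;
        String token = UUID.randomUUID().toString(); // identifies this run only
        if (!store.tryAcquire(key, token)) {
            return false; // another run holds the lock
        }
        try {
            job.run();
        } finally {
            store.release(key, token); // no-op if the lock now belongs to someone else
        }
        return true;
    }
}
```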
    

    8. MOP integration

    The MOP automatically registers scheduled workers with the scheduler:

    // In MainOrchestratorProcess
    private void startScheduledWorkers() {
        for (Worker worker : workers) {
            if (worker.isScheduled() && worker.getSchedule() != null) {
                scheduler.scheduleCron(
                    "worker:" + worker.getName(),
                    worker.getSchedule(),
                    () -> {
                        if (worker.isHealthy()) {
                            worker.doWork();
                        }
                    }
                );
            }
        }
    }
    

    9. Admin API

    @RestController
    @RequestMapping("/admin/scheduler")
    public class SchedulerController {
    
        @Autowired
        private Scheduler scheduler;
    
        @GetMapping("/jobs")
        public List<ScheduledJob> listJobs() {
            return scheduler.getScheduledJobs();
        }
    
        @PostMapping("/jobs/{jobId}/cancel")
        public ResponseEntity<Void> cancelJob(@PathVariable String jobId) {
            if (scheduler.isScheduled(jobId)) {
                scheduler.cancel(jobId);
                return ResponseEntity.ok().build();
            }
            return ResponseEntity.notFound().build();
        }
    
        @PostMapping("/jobs/{jobId}/trigger")
        public ResponseEntity<Void> triggerJob(@PathVariable String jobId) {
            // Immediate one-shot execution
            scheduler.scheduleOnce(jobId + "-manual-" + System.currentTimeMillis(), 0, () -> {
                // Look up and run the matching worker
            });
            return ResponseEntity.accepted().build();
        }
    }
    

    10. Common cron expressions

    Expression          Description
    0 0 * * * ?         Every hour
    0 */15 * * * ?      Every 15 minutes
    0 0 6 * * ?         Every day at 6:00
    0 0 0 * * ?         Every day at midnight
    0 0 0 * * SUN       Every Sunday at midnight
    0 0 0 1 * ?         First day of the month at midnight
    0 0 8 ? * MON-FRI   Weekdays at 8:00
    0 0 */2 * * ?       Every 2 hours

    11. Best practices

    DO

    • Use unique, descriptive job names
    • Handle overlap for long-running jobs
    • Use distributed locks when running multiple instances
    • Log the start and end of each job
    • Monitor job execution

    DON’T

    • Do not schedule jobs more often than necessary
    • Do not ignore errors raised inside jobs
    • Do not create too many threads
    • Do not block indefinitely inside a job
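
    One way to honor the last rule is to run the job body on a separate executor and bound the wait. The sketch below is only an illustration built on java.util.concurrent; BoundedJobRunner and Outcome are hypothetical names, not part of Socle.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not part of Socle): waits for a job at most timeoutMs.
final class BoundedJobRunner {

    enum Outcome { COMPLETED, TIMED_OUT, FAILED }

    private BoundedJobRunner() {}

    static Outcome run(ExecutorService executor, Runnable job, long timeoutMs) {
        Future<?> future = executor.submit(job);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return Outcome.COMPLETED;
        } catch (TimeoutException e) {
            // Interrupts the job thread; the job must react to interruption to stop.
            future.cancel(true);
            return Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            // The job itself threw an exception.
            return Outcome.FAILED;
        } catch (InterruptedException e) {
            // The caller was interrupted while waiting: cancel and restore the flag.
            future.cancel(true);
            Thread.currentThread().interrupt();
            return Outcome.FAILED;
        }
    }
}
```

    Cancellation relies on interruption, so a job that blocks on non-interruptible I/O would keep running after the timeout is reported.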

    12. References

  • Socle V004 – EventBus and Workers

    30 – EventBus and Push Workers

    Version: 4.1.0 Date: 2026-01-23 Status: Specification

    1. Introduction

    This document introduces the EventBus and the Push Workers added in Socle V004. They replace polling with a notification mechanism that wakes a worker as soon as an event arrives.

    1.1 Motivation

    The existing event-driven workers (AbstractEventDrivenWorker) poll their queue with a timeout:

    // Old mechanism (DEPRECATED)
    while (running) {
        event = queue.poll(100, TimeUnit.MILLISECONDS);  // Checks every 100 ms
        if (event != null) processEvent(event);
    }
    

    Problems:

    • Latency of 0 to 100 ms before an event is processed
    • Constant CPU usage, even if low
    • No true push mechanism

    1.2 New push mechanism

    // New mechanism (RECOMMENDED)
    while (running) {
        event = queue.take();  // Sleeps until an event arrives
        processEvent(event);   // Wakes up immediately
    }
    

    Advantages:

    • Latency close to 0 ms (immediate wake-up)
    • No CPU usage while idle
    • True event-driven push mechanism
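
    A thread blocked in take() cannot observe a change to a running flag, so a push loop needs another way to stop. The sketch below shows a blocking consumer stopped by a sentinel element, in the spirit of the poison pill the EventBus uses; PushLoopDemo and STOP are hypothetical names, and the String payload is an assumption made to keep the example short.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo (not part of Socle): a consumer that blocks in take()
// and stops when it receives a sentinel element.
final class PushLoopDemo {

    // Sentinel compared by identity, so a regular "stop" payload is not mistaken for it.
    static final String STOP = new String("stop");

    private PushLoopDemo() {}

    static Thread startConsumer(BlockingQueue<String> queue, List<String> processed) {
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String event = queue.take(); // blocks until an element is available
                    if (event == STOP) {
                        return; // sentinel received: leave the loop
                    }
                    processed.add(event);
                }
            } catch (InterruptedException e) {
                // Interruption is the other way out of a blocking take().
                Thread.currentThread().interrupt();
            }
        }, "push-loop-demo");
        consumer.start();
        return consumer;
    }
}
```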

    2. Deprecation of the legacy Workers

    2.1 Deprecated classes

    Class                          Status       Replacement
    AbstractEventDrivenWorker<T>   DEPRECATED   AbstractPushWorker<T>
    KafkaEventWorker<K,V>          DEPRECATED   KafkaPushWorker<K,V>

    2.2 Migration

    The legacy workers still work but must not be used for new development.

    // DEPRECATED - do not use
    @Deprecated(since = "4.1.0", forRemoval = true)
    public abstract class AbstractEventDrivenWorker<T> implements Worker { ... }
    
    // RECOMMENDED - use this instead
    public abstract class AbstractPushWorker<T> implements Worker { ... }
    

    2.3 Deprecation timeline

    Version   Action
    4.1.0     EventBus and Push Workers introduced; legacy workers deprecated
    4.2.0     Compilation warnings for the legacy workers
    5.0.0     Legacy workers removed (breaking change)

    3. EventBus architecture

    3.1 Overview

    ┌─────────────────────────────────────────────────────────────────────┐
    │                              MOP                                     │
    │                                                                      │
    │  ┌──────────────────────────────────────────────────────────────┐   │
    │  │                        EventBus                               │   │
    │  │                                                               │   │
    │  │   Channels:                                                   │   │
    │  │     "orders.created"   ──► [Worker1.queue, Worker2.queue]     │   │
    │  │     "orders.updated"   ──► [Worker1.queue]                    │   │
    │  │     "alerts.*"         ──► [AlertWorker.queue]                │   │
    │  │                                                               │   │
    │  └──────────────────────────────────────────────────────────────┘   │
    │                              │                                       │
    │         publish("orders.created", event)                            │
    │                              │                                       │
    │                              ▼                                       │
    │  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐        │
    │  │ PushWorker 1   │  │ PushWorker 2   │  │ PushWorker 3   │        │
    │  │                │  │                │  │                │        │
    │  │ queue.take()   │  │ queue.take()   │  │ queue.take()   │        │
    │  │ (asleep)       │  │ (asleep)       │  │ (asleep)       │        │
    │  │      ↓         │  │      ↓         │  │                │        │
    │  │ WAKES UP       │  │ WAKES UP       │  │ (not subscr.)  │        │
    │  └────────────────┘  └────────────────┘  └────────────────┘        │
    │                                                                      │
    └─────────────────────────────────────────────────────────────────────┘
    

    3.2 Components

    Component               Role
    EventBus                Central publish/subscribe bus
    BusEvent<T>             Event envelope with metadata
    AbstractPushWorker<T>   Worker woken up by the EventBus
    KafkaPushWorker<K,V>    Kafka worker with internal push
    EventBusWorker          Worker that manages the EventBus lifecycle
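
    The routing at the heart of this architecture can be sketched in a few lines: each subscriber owns a queue, and publishing to a channel offers the event to every queue registered on it. This is a minimal illustration limited to exact channel names, assuming one bounded queue per subscriber; MiniFanOutBus is a hypothetical class, not the Socle EventBus.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical fan-out bus (not the Socle EventBus): exact channel names only.
final class MiniFanOutBus {

    private final Map<String, Set<BlockingQueue<Object>>> channels = new ConcurrentHashMap<>();

    /** Registers a new bounded queue on the channel and returns it to the subscriber. */
    BlockingQueue<Object> subscribe(String channel) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>(1_000);
        channels.computeIfAbsent(channel, k -> ConcurrentHashMap.newKeySet()).add(queue);
        return queue;
    }

    /** Offers the payload to every subscriber queue; returns how many accepted it. */
    int publish(String channel, Object payload) {
        int delivered = 0;
        for (BlockingQueue<Object> queue : channels.getOrDefault(channel, Set.of())) {
            if (queue.offer(payload)) { // non-blocking: a full queue rejects the event
                delivered++;
            }
        }
        return delivered;
    }
}
```

    A worker blocked in take() on its queue wakes up as soon as publish() offers an element, which is the behavior the diagram labels as the wake-up.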

    4. EventBus interface

    4.1 Definition

    package eu.lmvi.socle.event;
    
    /**
     * Central event bus with a push mechanism.
     *
     * <p>The EventBus lets workers communicate through a publish/subscribe
     * system with immediate wake-up.</p>
     */
    public interface EventBus {
    
        // ==================== Publishing ====================
    
        /**
         * Publishes an event on a channel.
         * Subscribers are woken up immediately.
         *
         * @param channel Channel name (e.g. "orders.created")
         * @param payload Event content
         */
        void publish(String channel, Object payload);
    
        /**
         * Publishes an event with custom metadata.
         *
         * @param channel  Channel name
         * @param payload  Event content
         * @param metadata Additional metadata
         */
        void publish(String channel, Object payload, Map<String, String> metadata);
    
        /**
         * Asynchronous publication with confirmation.
         *
         * @return Future completed once all subscribers have received the event
         */
        CompletableFuture<PublishResult> publishAsync(String channel, Object payload);
    
        // ==================== Subscription ====================
    
        /**
         * Subscribes to a channel and returns a subscription whose queue receives the events.
         * Call take() on that queue to wait for events (push).
         *
         * @param channel   Channel name
         * @param eventType Expected payload type
         * @return Subscription whose queue to call take() on
         */
        <T> Subscription<T> subscribe(String channel, Class<T> eventType);
    
        /**
         * Subscribes with a glob pattern (e.g. "orders.*", "*.created").
         *
         * @param pattern   Glob pattern
         * @param eventType Expected payload type
         * @return Subscription with its queue
         */
        <T> Subscription<T> subscribePattern(String pattern, Class<T> eventType);
    
        /**
         * Subscribes with a callback handler (invoked for each event as it arrives).
         *
         * @param channel Channel name
         * @param handler Handler called for each event
         * @return Subscription ID, to pass to unsubscribe
         */
        String subscribe(String channel, EventHandler handler);
    
        /**
         * Unsubscribes.
         *
         * @param subscriptionId ID returned by subscribe
         */
        void unsubscribe(String subscriptionId);
    
        // ==================== Lifecycle ====================
    
        /**
         * Starts the EventBus.
         */
        void start();
    
        /**
         * Stops the EventBus cleanly.
         * Queues are drained and subscribers are notified.
         */
        void stop();
    
        /**
         * Checks the health of the EventBus.
         */
        boolean isHealthy();
    
        // ==================== Stats ====================
    
        /**
         * Global statistics.
         */
        EventBusStats getStats();
    
        /**
         * Per-channel statistics.
         */
        Map<String, ChannelStats> getChannelStats();
    }
    

    4.2 Supporting types

    package eu.lmvi.socle.event;
    
    /**
     * Event carried by the EventBus.
     */
    public record BusEvent<T>(
        String id,                      // Unique identifier
        String channel,                 // Publication channel
        Instant timestamp,              // Publication timestamp
        String source,                  // Source (worker name)
        T payload,                      // Business payload
        Map<String, String> metadata    // Additional metadata
    ) {
        /**
         * Creates a simple event.
         */
        public static <T> BusEvent<T> of(String channel, T payload) {
            return new BusEvent<>(
                UUID.randomUUID().toString(),
                channel,
                Instant.now(),
                null,
                payload,
                Map.of()
            );
        }
    }
    
    /**
     * Subscription to a channel.
     */
    public interface Subscription<T> {
    
        /**
         * Unique ID of the subscription.
         */
        String getId();
    
        /**
         * Subscribed channel or pattern.
         */
        String getChannelOrPattern();
    
        /**
         * Queue on which events are received.
         * Use take() to wait (blocking, immediate wake-up).
         * Use poll(timeout) if a timeout is needed.
         */
        BlockingQueue<BusEvent<T>> getQueue();
    
        /**
         * Unsubscribes.
         */
        void unsubscribe();
    
        /**
         * Number of events waiting in the queue.
         */
        int getPendingCount();
    }
    
    /**
     * Handler for callback subscriptions.
     */
    @FunctionalInterface
    public interface EventHandler {
        void onEvent(BusEvent<?> event);
    }
    
    /**
     * Result of a publication.
     */
    public record PublishResult(
        String eventId,
        int deliveredCount,
        int failedCount,
        Duration duration
    ) {}
    
    /**
     * EventBus statistics.
     */
    public record EventBusStats(
        long totalPublished,
        long totalDelivered,
        long totalFailed,
        int activeSubscriptions,
        int activeChannels,
        Instant startedAt
    ) {}
    
    /**
     * Per-channel statistics.
     */
    public record ChannelStats(
        String channel,
        long publishedCount,
        int subscriberCount,
        long lastPublishedAt
    ) {}
    

    5. EventBus implementations

    5.1 InMemoryEventBus

    Implementation for a single instance (development, tests, simple applications).

    package eu.lmvi.socle.event.impl;
    
    @Component
    @ConditionalOnProperty(name = "socle.eventbus.mode", havingValue = "in_memory", matchIfMissing = true)
    public class InMemoryEventBus implements EventBus {
    
        private static final Logger log = LoggerFactory.getLogger(InMemoryEventBus.class);
    
        // Subscriptions by exact channel
        private final Map<String, Set<SubscriptionImpl<?>>> channelSubs = new ConcurrentHashMap<>();
    
        // Subscriptions by pattern
        private final Map<Pattern, Set<SubscriptionImpl<?>>> patternSubs = new ConcurrentHashMap<>();
    
        // Index for fast lookup by subscription ID
        private final Map<String, SubscriptionImpl<?>> subsIndex = new ConcurrentHashMap<>();
    
        // Counters
        private final AtomicLong idGen = new AtomicLong();
        private final AtomicLong publishedCount = new AtomicLong();
        private final AtomicLong deliveredCount = new AtomicLong();
        private final AtomicLong failedCount = new AtomicLong();
    
        // State
        private volatile boolean running = false;
        private Instant startedAt;
    
        // ==================== Lifecycle ====================
    
        @Override
        public void start() {
            running = true;
            startedAt = Instant.now();
            log.info("InMemoryEventBus started");
        }
    
        @Override
        public void stop() {
            running = false;
    
            // Notify all subscribers (poison pill)
            for (SubscriptionImpl<?> sub : subsIndex.values()) {
                sub.notifyShutdown();
            }
    
            log.info("InMemoryEventBus stopped. Published={}, Delivered={}, Failed={}",
                publishedCount.get(), deliveredCount.get(), failedCount.get());
        }
    
        @Override
        public boolean isHealthy() {
            return running;
        }
    
        // ==================== Publish ====================
    
        @Override
        public void publish(String channel, Object payload) {
            publish(channel, payload, Map.of());
        }
    
        @Override
        public void publish(String channel, Object payload, Map<String, String> metadata) {
            if (!running) {
                log.warn("EventBus not running, event dropped on channel '{}'", channel);
                return;
            }
    
            BusEvent<?> event = new BusEvent<>(
                "evt-" + idGen.incrementAndGet(),
                channel,
                Instant.now(),
                resolveSource(),
                payload,
                metadata
            );
    
            int delivered = 0;
            int failed = 0;
    
            // Deliver to exact-channel subscriptions
            Set<SubscriptionImpl<?>> exact = channelSubs.get(channel);
            if (exact != null) {
                for (SubscriptionImpl<?> sub : exact) {
                    if (deliverToSubscription(sub, event)) {
                        delivered++;
                    } else {
                        failed++;
                    }
                }
            }
    
            // Deliver to pattern subscriptions
            for (Map.Entry<Pattern, Set<SubscriptionImpl<?>>> entry : patternSubs.entrySet()) {
                if (entry.getKey().matcher(channel).matches()) {
                    for (SubscriptionImpl<?> sub : entry.getValue()) {
                        if (deliverToSubscription(sub, event)) {
                            delivered++;
                        } else {
                            failed++;
                        }
                    }
                }
            }
    
            publishedCount.incrementAndGet();
            deliveredCount.addAndGet(delivered);
            failedCount.addAndGet(failed);
    
            log.debug("Published event {} to channel '{}': delivered={}, failed={}",
                event.id(), channel, delivered, failed);
        }
    
        @Override
        public CompletableFuture<PublishResult> publishAsync(String channel, Object payload) {
            return CompletableFuture.supplyAsync(() -> {
                Instant start = Instant.now();
                publish(channel, payload);
                // Simplification: per-subscriber results are not tracked here,
                // so the event ID and counts below are placeholders
                return new PublishResult(
                    "evt-" + idGen.get(),
                    1, 0,
                    Duration.between(start, Instant.now())
                );
            });
        }
    
        @SuppressWarnings("unchecked")
        private boolean deliverToSubscription(SubscriptionImpl<?> sub, BusEvent<?> event) {
            try {
                BlockingQueue<BusEvent<?>> queue = (BlockingQueue<BusEvent<?>>) sub.getQueue();
    
                // offer with a timeout to avoid blocking forever
                boolean offered = queue.offer(event, 100, TimeUnit.MILLISECONDS);
    
                if (!offered) {
                    log.warn("Queue full for subscription {}, event dropped", sub.getId());
                    return false;
                }
    
                return true;
    
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (Exception e) {
                log.error("Error delivering to subscription {}: {}", sub.getId(), e.getMessage());
                return false;
            }
        }
    
        private String resolveSource() {
            // Use the current thread name as a stand-in for the worker name
            String threadName = Thread.currentThread().getName();
            return threadName;
        }
    
        // ==================== Subscribe ====================
    
        @Override
        public <T> Subscription<T> subscribe(String channel, Class<T> eventType) {
            String subId = "sub-" + idGen.incrementAndGet();
            SubscriptionImpl<T> sub = new SubscriptionImpl<>(subId, channel, null, this);
    
            channelSubs
                .computeIfAbsent(channel, k -> ConcurrentHashMap.newKeySet())
                .add(sub);
    
            subsIndex.put(subId, sub);
    
            log.info("Subscription {} created on channel '{}'", subId, channel);
            return sub;
        }
    
        @Override
        public <T> Subscription<T> subscribePattern(String pattern, Class<T> eventType) {
            String subId = "sub-" + idGen.incrementAndGet();
    
            // Convert the glob to a regex: "*" matches one segment, ">" matches the rest
            String regex = pattern
                .replace(".", "\\.")
                .replace("*", "[^.]*")
                .replace(">", ".*");
            Pattern compiled = Pattern.compile("^" + regex + "$");
    
            SubscriptionImpl<T> sub = new SubscriptionImpl<>(subId, pattern, compiled, this);
    
            patternSubs
                .computeIfAbsent(compiled, k -> ConcurrentHashMap.newKeySet())
                .add(sub);
    
            subsIndex.put(subId, sub);
    
            log.info("Subscription {} created on pattern '{}'", subId, pattern);
            return sub;
        }
    
        @Override
        public String subscribe(String channel, EventHandler handler) {
            // Simplified implementation: create a regular subscription
            // plus a thread that consumes it and calls the handler
            Subscription<Object> sub = subscribe(channel, Object.class);
    
            Thread.ofVirtual()
                .name("handler-" + sub.getId())
                .start(() -> {
                    // Loop until the poison pill or an interrupt; testing 'running' here
                    // would end the thread at once if subscribe() ran before start()
                    while (true) {
                        try {
                            BusEvent<Object> event = sub.getQueue().take();
                            if (event.id() == null) break; // Poison pill (published events always have an ID)
                            handler.onEvent(event);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        } catch (Exception e) {
                            log.error("Handler error on {}: {}", channel, e.getMessage());
                        }
                    }
                });
    
            return sub.getId();
        }
    
        @Override
        public void unsubscribe(String subscriptionId) {
            SubscriptionImpl<?> sub = subsIndex.remove(subscriptionId);
            if (sub == null) {
                log.warn("Subscription {} not found", subscriptionId);
                return;
            }
    
            // Remove from the maps
            if (sub.pattern == null) {
                Set<SubscriptionImpl<?>> subs = channelSubs.get(sub.channelOrPattern);
                if (subs != null) subs.remove(sub);
            } else {
                Set<SubscriptionImpl<?>> subs = patternSubs.get(sub.pattern);
                if (subs != null) subs.remove(sub);
            }
    
            sub.notifyShutdown();
            log.info("Subscription {} removed", subscriptionId);
        }
    
        // ==================== Stats ====================
    
        @Override
        public EventBusStats getStats() {
            return new EventBusStats(
                publishedCount.get(),
                deliveredCount.get(),
                failedCount.get(),
                subsIndex.size(),
                channelSubs.size(),
                startedAt
            );
        }
    
        @Override
        public Map<String, ChannelStats> getChannelStats() {
            Map<String, ChannelStats> stats = new HashMap<>();
    
            for (Map.Entry<String, Set<SubscriptionImpl<?>>> entry : channelSubs.entrySet()) {
                stats.put(entry.getKey(), new ChannelStats(
                    entry.getKey(),
                    0, // TODO: per-channel counter
                    entry.getValue().size(),
                    0
                ));
            }
    
            return stats;
        }
    
        // ==================== Subscription Implementation ====================
    
        private static class SubscriptionImpl<T> implements Subscription<T> {
    
            private final String id;
            private final String channelOrPattern;
            private final Pattern pattern;
            private final BlockingQueue<BusEvent<T>> queue;
            private final InMemoryEventBus bus;
    
            SubscriptionImpl(String id, String channelOrPattern, Pattern pattern, InMemoryEventBus bus) {
                this.id = id;
                this.channelOrPattern = channelOrPattern;
                this.pattern = pattern;
                this.queue = new LinkedBlockingQueue<>(10_000); // Default capacity
                this.bus = bus;
            }
    
            @Override
            public String getId() {
                return id;
            }
    
            @Override
            public String getChannelOrPattern() {
                return channelOrPattern;
            }
    
            @Override
            public BlockingQueue<BusEvent<T>> getQueue() {
                return queue;
            }
    
            @Override
            public void unsubscribe() {
                bus.unsubscribe(id);
            }
    
            @Override
            public int getPendingCount() {
                return queue.size();
            }
    
            void notifyShutdown() {
                // Inject a poison pill to unblock pending take() calls
                try {
                    queue.offer(new BusEvent<>(null, null, null, null, null, null));
                } catch (Exception ignored) {}
            }
        }
    }
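
    The glob conversion in subscribePattern escapes only the dot, so a channel pattern containing other regex metacharacters (for example "+" or "(") would be interpreted as regex syntax. The sketch below keeps the same wildcard semantics ("*" for a single segment, ">" for the remainder) but quotes every literal run; ChannelGlob is a hypothetical helper, not part of Socle.

```java
import java.util.regex.Pattern;

// Hypothetical helper (not part of Socle): converts a channel glob to a regex,
// treating everything except '*' and '>' as literal text.
final class ChannelGlob {

    private ChannelGlob() {}

    static Pattern compile(String glob) {
        StringBuilder regex = new StringBuilder();
        StringBuilder literal = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*' || c == '>') {
                flushLiteral(literal, regex);
                // '*' stays within one dot-separated segment; '>' matches anything.
                regex.append(c == '*' ? "[^.]*" : ".*");
            } else {
                literal.append(c);
            }
        }
        flushLiteral(literal, regex);
        return Pattern.compile(regex.toString());
    }

    /** Appends the pending literal text, quoted so regex metacharacters lose their meaning. */
    private static void flushLiteral(StringBuilder literal, StringBuilder regex) {
        if (literal.length() > 0) {
            regex.append(Pattern.quote(literal.toString()));
            literal.setLength(0);
        }
    }

    /** True when the whole channel name matches the glob. */
    static boolean matches(String glob, String channel) {
        return compile(glob).matcher(channel).matches();
    }
}
```

    For example, ChannelGlob.matches("orders.*", "orders.created") is true, while "orders.*" does not match "orders.created.eu" because "*" stops at the next dot.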
    

    5.2 RedisEventBus

    Implementation for multiple instances (production).

    package eu.lmvi.socle.event.impl;
    
    @Component
    @ConditionalOnProperty(name = "socle.eventbus.mode", havingValue = "redis")
    public class RedisEventBus implements EventBus {
    
        private static final Logger log = LoggerFactory.getLogger(RedisEventBus.class);
    
        private final JedisPool jedisPool;
        private final ObjectMapper mapper;
        private final String prefix;
        private final ExecutorService listenerExecutor;
    
        // Local subscriptions (one local queue per subscription)
        private final Map<String, LocalSubscription<?>> localSubs = new ConcurrentHashMap<>();
    
        // JedisPubSub per channel (shared between local subscriptions)
        private final Map<String, RedisPubSubListener> redisListeners = new ConcurrentHashMap<>();
    
        // ... Similar implementation on top of Redis Pub/Sub ...
    
        @Override
        public void publish(String channel, Object payload) {
            try (Jedis jedis = jedisPool.getResource()) {
                BusEvent<?> event = BusEvent.of(channel, payload);
                String json = mapper.writeValueAsString(event);
                jedis.publish(prefix + channel, json);
            } catch (Exception e) {
                // Pass the exception itself so the stack trace is logged
                log.error("Redis publish error on channel {}", channel, e);
            }
        }
    
        // Redis SUBSCRIBE runs in a dedicated thread
        // and dispatches to the local queues
    }
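
    The dispatch step mentioned in the comments above can be sketched without any Redis dependency. This is only an illustration under assumptions: LocalDispatcher, its method names, the String payloads and the trailing-colon prefix are hypothetical and not part of the Socle API. In a real implementation, onMessage would be called from the Redis listener thread.

    ```java
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical helper: fans one received message out to every local queue of a channel.
    class LocalDispatcher {

        private final String prefix;
        private final Map<String, List<BlockingQueue<String>>> queuesByChannel = new ConcurrentHashMap<>();
        private final AtomicLong dropped = new AtomicLong();

        LocalDispatcher(String prefix) {
            this.prefix = prefix; // assumption: the prefix already ends with its separator
        }

        /** Registers a bounded local queue for a channel and returns it. */
        BlockingQueue<String> subscribe(String channel, int capacity) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
            queuesByChannel.computeIfAbsent(channel, k -> new CopyOnWriteArrayList<>()).add(queue);
            return queue;
        }

        /** Handles one message; returns how many local queues accepted it. */
        int onMessage(String redisChannel, String json) {
            if (!redisChannel.startsWith(prefix)) {
                return 0; // not one of our channels
            }
            String channel = redisChannel.substring(prefix.length());
            int delivered = 0;
            for (BlockingQueue<String> queue : queuesByChannel.getOrDefault(channel, List.of())) {
                if (queue.offer(json)) {
                    delivered++;
                } else {
                    dropped.incrementAndGet(); // queue full: the event is dropped for this subscriber
                }
            }
            return delivered;
        }

        long droppedTotal() {
            return dropped.get();
        }
    }
    ```

    A slow subscriber only loses its own events here: the listener thread never blocks, so the other local queues keep receiving messages.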
    

    6. AbstractPushWorker

    6.1 Definition

    package eu.lmvi.socle.worker.push;
    
    /**
     * Worker woken up instantly by the EventBus.
     *
     * <p>Unlike {@link AbstractEventDrivenWorker} (DEPRECATED), which polls with a
     * timeout, this worker is push-based: it blocks on
     * {@link BlockingQueue#take()} and wakes up as soon as an event arrives.</p>
     *
     * <h3>Advantages</h3>
     * <ul>
     *   <li>Near-zero wake-up latency (no polling interval)</li>
     *   <li>No CPU usage while idle</li>
     *   <li>Native integration with the EventBus</li>
     * </ul>
     *
     * <h3>Usage example</h3>
     * <pre>{@code
     * @Component
     * public class OrderWorker extends AbstractPushWorker<OrderEvent> {
     *
     *     public OrderWorker() {
     *         super("orders.created", "orders.updated");
     *     }
     *
     *     @Override
     *     public String getName() {
     *         return "order_worker";
     *     }
     *
     *     @Override
     *     protected Class<OrderEvent> getEventType() {
     *         return OrderEvent.class;
     *     }
     *
     *     @Override
     *     protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
     *         // Runs as soon as the event arrives
     *     }
     * }
     * }</pre>
     *
     * @param <T> Payload type of the events
     */
    public abstract class AbstractPushWorker<T> implements Worker {
    
        protected final Logger logger = LoggerFactory.getLogger(getClass());
    
        // ==================== Injection ====================
    
        @Autowired
        protected EventBus eventBus;
    
        // ==================== Configuration ====================
    
        private final String[] channels;
        private final int concurrency;
        private final int queueCapacity;
        private final Duration shutdownTimeout;
    
        // ==================== Etat ====================
    
        protected final AtomicBoolean running = new AtomicBoolean(false);
        protected ExecutorService executor;
        // Thread-safe: getStats() and getBacklog() read this list from other threads
        protected final List<Subscription<T>> subscriptions = new CopyOnWriteArrayList<>();
        protected BlockingQueue<BusEvent<T>> mergedQueue;
    
        // ==================== Metriques ====================
    
        protected final AtomicLong processedCount = new AtomicLong(0);
        protected final AtomicLong errorCount = new AtomicLong(0);
        protected final AtomicLong totalDurationNs = new AtomicLong(0);
        protected volatile Instant lastExecution;
        protected volatile Instant startedAt;
    
        // ==================== Constructors ====================
    
        /**
         * Creates a single-threaded push worker.
         *
         * @param channels Channels to listen to
         */
        protected AbstractPushWorker(String... channels) {
            this(1, channels);
        }
    
        /**
         * Creates a push worker with concurrency.
         *
         * @param concurrency Number of processing threads
         * @param channels    Channels to listen to
         */
        protected AbstractPushWorker(int concurrency, String... channels) {
            this(concurrency, 10_000, Duration.ofSeconds(30), channels);
        }
    
        /**
         * Creates a push worker with full configuration.
         *
         * @param concurrency     Number of processing threads (values below 1 are raised to 1)
         * @param queueCapacity   Capacity of the internal queue
         * @param shutdownTimeout Graceful shutdown timeout
         * @param channels        Channels to listen to
         */
        protected AbstractPushWorker(int concurrency, int queueCapacity,
                                      Duration shutdownTimeout, String... channels) {
            // Validate before assigning any field
            if (channels == null || channels.length == 0) {
                throw new IllegalArgumentException("At least one channel is required");
            }
    
            this.concurrency = Math.max(1, concurrency);
            this.queueCapacity = queueCapacity;
            this.shutdownTimeout = shutdownTimeout;
            this.channels = channels.clone();
        }
    
        // ==================== Worker Interface ====================
    
        @Override
        public final String getSchedule() {
            return "PUSH";
        }
    
        @Override
        public final long getCycleIntervalMs() {
            return -1;
        }
    
        @Override
        public final boolean isPassive() {
            return true;
        }
    
        @Override
        public int getStartPriority() {
            return 200;  // Apres l'EventBus (100)
        }
    
        @Override
        public int getStopPriority() {
            return 200;  // Avant l'EventBus
        }
    
        @Override
        public void initialize() {
            logger.info("[{}] Initializing push worker", getName());
            logger.info("[{}] Channels: {}", getName(), Arrays.toString(channels));
            logger.info("[{}] Concurrency: {}", getName(), concurrency);
    
            // Queue fusionnee pour tous les channels
            mergedQueue = new LinkedBlockingQueue<>(queueCapacity);
    
            onInitialize();
        }
    
        @Override
        public void start() {
            if (running.getAndSet(true)) {
                logger.warn("[{}] Already running", getName());
                return;
            }
    
            startedAt = Instant.now();
    
            // S'abonner aux channels
            for (String channel : channels) {
                if (channel.contains("*") || channel.contains(">")) {
                    // Pattern subscription
                    Subscription<T> sub = eventBus.subscribePattern(channel, getEventType());
                    subscriptions.add(sub);
                    startForwarder(sub);
                } else {
                    // Exact subscription
                    Subscription<T> sub = eventBus.subscribe(channel, getEventType());
                    subscriptions.add(sub);
                    startForwarder(sub);
                }
            }
    
            // Demarrer les workers de traitement
            executor = Executors.newThreadPerTaskExecutor(
                Thread.ofVirtual().name(getName() + "-push-", 0).factory()
            );
    
            for (int i = 0; i < concurrency; i++) {
                final int workerId = i;
                executor.submit(() -> pushLoop(workerId));
            }
    
            onStarted();
            logger.info("[{}] Started with {} workers on {} channels",
                getName(), concurrency, channels.length);
        }
    
        /**
         * Demarre un thread qui forward les events d'une subscription vers la queue fusionnee.
         */
        private void startForwarder(Subscription<T> sub) {
            Thread.ofVirtual()
                .name(getName() + "-fwd-" + sub.getId())
                .start(() -> {
                    while (running.get()) {
                        try {
                            BusEvent<T> event = sub.getQueue().take();
    
                            // Verifier poison pill
                            if (event.id() == null) break;
    
                            // Forward vers la queue fusionnee
                            if (!mergedQueue.offer(event, 1, TimeUnit.SECONDS)) {
                                logger.warn("[{}] Merged queue full, event dropped", getName());
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                });
        }
    
        @Override
        public void stop() {
            if (!running.getAndSet(false)) {
                logger.warn("[{}] Not running", getName());
                return;
            }
    
            logger.info("[{}] Stopping...", getName());
            onStopping();
    
            // Se desabonner (injecte des poison pills)
            for (Subscription<T> sub : subscriptions) {
                try {
                    sub.unsubscribe();
                } catch (Exception e) {
                    logger.warn("[{}] Error unsubscribing: {}", getName(), e.getMessage());
                }
            }
            subscriptions.clear();
    
            // Injecter poison pills dans la queue fusionnee
            for (int i = 0; i < concurrency; i++) {
                mergedQueue.offer(new BusEvent<>(null, null, null, null, null, null));
            }
    
            // Attendre la fin des workers
            if (executor != null) {
                executor.shutdown();
                try {
                    if (!executor.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                        logger.warn("[{}] Forcing shutdown", getName());
                        executor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    executor.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }
    
            onStopped();
    
            long uptime = Duration.between(startedAt, Instant.now()).toSeconds();
            logger.info("[{}] Stopped after {}s. Processed={}, Errors={}",
                getName(), uptime, processedCount.get(), errorCount.get());
        }
    
        @Override
        public void doWork() {
            // Non utilise en mode PUSH
        }
    
        @Override
        public boolean isHealthy() {
            return running.get() && executor != null && !executor.isShutdown();
        }
    
        @Override
        public Map<String, Object> getStats() {
            long uptime = startedAt != null
                ? Duration.between(startedAt, Instant.now()).toMillis()
                : 0;
    
            double avgDurationMs = processedCount.get() > 0
                ? (totalDurationNs.get() / 1_000_000.0) / processedCount.get()
                : 0;
    
            double throughput = uptime > 0
                ? processedCount.get() * 1000.0 / uptime
                : 0;
    
            Map<String, Object> stats = new HashMap<>();
    
            // Cles standardisees
            stats.put("state", running.get() ? "running" : "stopped");
            stats.put("schedule", "PUSH");
            stats.put("execution_count", processedCount.get());
            stats.put("errors_count", errorCount.get());
            stats.put("last_execution", lastExecution != null ? lastExecution.toString() : null);
    
            // Cles specifiques
            stats.put("channels", channels);
            stats.put("concurrency", concurrency);
            stats.put("queue_size", mergedQueue != null ? mergedQueue.size() : 0);
            stats.put("queue_capacity", queueCapacity);
            stats.put("uptime_ms", uptime);
            stats.put("avg_duration_ms", Math.round(avgDurationMs * 100) / 100.0);
            stats.put("throughput_per_sec", Math.round(throughput * 100) / 100.0);
    
            // Stats des subscriptions
            stats.put("subscriptions", subscriptions.stream()
                .map(s -> Map.of(
                    "id", s.getId(),
                    "channel", s.getChannelOrPattern(),
                    "pending", s.getPendingCount()
                ))
                .toList());
    
            return stats;
        }
    
        // ==================== Push Loop ====================
    
        private void pushLoop(int workerId) {
            logger.debug("[{}] Push loop #{} started", getName(), workerId);
    
            while (running.get()) {
                try {
                    // BLOQUE jusqu'a reception d'un event (reveil instantane)
                    BusEvent<T> event = mergedQueue.take();
    
                    // Verifier poison pill
                    if (event.id() == null) {
                        logger.debug("[{}] Push loop #{} received shutdown signal", getName(), workerId);
                        break;
                    }
    
                    // Traiter l'event
                    processEventSafe(event, workerId);
    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.debug("[{}] Push loop #{} interrupted", getName(), workerId);
                    break;
                } catch (Exception e) {
                    logger.error("[{}] Unexpected error in push loop #{}: {}",
                        getName(), workerId, e.getMessage(), e);
                }
            }
    
            logger.debug("[{}] Push loop #{} stopped", getName(), workerId);
        }
    
        private void processEventSafe(BusEvent<T> event, int workerId) {
            long startNs = System.nanoTime();
    
            try {
                processEvent(event.payload(), event);
                processedCount.incrementAndGet();
            } catch (Exception e) {
                errorCount.incrementAndGet();
                handleError(event, e, workerId);
            } finally {
                long durationNs = System.nanoTime() - startNs;
                totalDurationNs.addAndGet(durationNs);
                lastExecution = Instant.now();
            }
        }
    
        // ==================== Methodes abstraites ====================
    
        /**
         * Type du payload des evenements.
         * Utilise pour le typage de la souscription.
         */
        protected abstract Class<T> getEventType();
    
        /**
         * Traite un evenement.
         *
         * @param payload Contenu de l'evenement
         * @param event   Evenement complet (avec metadata)
         */
        protected abstract void processEvent(T payload, BusEvent<T> event);
    
        /**
         * Simplified variant without access to the envelope.
         * No-op by default and never called by the push loop itself: a subclass
         * such as SimplePushWorker must delegate processEvent(T, BusEvent) to it.
         */
        protected void processEvent(T payload) {
            // Intentionally empty; see SimplePushWorker
        }
    
        // ==================== Hooks ====================
    
        /**
         * Hook d'initialisation.
         */
        protected void onInitialize() {}
    
        /**
         * Hook post-demarrage.
         */
        protected void onStarted() {}
    
        /**
         * Hook pre-arret.
         */
        protected void onStopping() {}
    
        /**
         * Hook post-arret.
         */
        protected void onStopped() {}
    
        /**
         * Gestion des erreurs.
         * Par defaut, log l'erreur. Surcharger pour DLQ, retry, etc.
         */
        protected void handleError(BusEvent<T> event, Exception e, int workerId) {
            logger.error("[{}] Worker #{} error processing event {}: {}",
                getName(), workerId, event.id(), e.getMessage(), e);
        }
    
        // ==================== Utilitaires ====================
    
        /**
         * Publie un evenement sur l'EventBus.
         * Raccourci pratique pour les workers qui produisent aussi des events.
         */
        protected void publish(String channel, Object payload) {
            eventBus.publish(channel, payload);
        }
    
        /**
         * Returns the number of pending events (merged queue plus subscription queues).
         */
        public long getBacklog() {
            long pending = mergedQueue != null ? mergedQueue.size() : 0;
            for (Subscription<T> sub : subscriptions) {
                pending += sub.getPendingCount();
            }
            return pending;
        }
    }
    

    6.2 Simplified version (single channel)

    /**
     * Simplified version for a single channel.
     */
    public abstract class SimplePushWorker<T> extends AbstractPushWorker<T> {
    
        protected SimplePushWorker(String channel) {
            super(channel);
        }
    
        protected SimplePushWorker(int concurrency, String channel) {
            super(concurrency, channel);
        }
    
        @Override
        protected void processEvent(T payload, BusEvent<T> event) {
            processEvent(payload);
        }
    
        /**
         * Traite le payload directement.
         */
        protected abstract void processEvent(T payload);
    }
    

    7. Configuration

    7.1 application.yml

    socle:
      eventbus:
        # Enable the EventBus
        enabled: ${EVENTBUS_ENABLED:true}
    
        # Mode: in_memory | redis
        mode: ${EVENTBUS_MODE:in_memory}
    
        # Common settings
        common:
          # Default capacity of the subscription queues
          default_queue_capacity: ${EVENTBUS_QUEUE_CAPACITY:10000}
    
          # Timeout for offer() when the queue is full
          offer_timeout_ms: ${EVENTBUS_OFFER_TIMEOUT:100}
    
        # InMemory settings
        in_memory:
          # Nothing specific
    
        # Redis settings
        redis:
          host: ${REDIS_HOST:localhost}
          port: ${REDIS_PORT:6379}
          password: ${REDIS_PASSWORD:}
          database: ${REDIS_DATABASE:0}
          prefix: ${EVENTBUS_REDIS_PREFIX:socle:eventbus}
    
          # Connection pool
          pool:
            max_total: ${EVENTBUS_REDIS_POOL_MAX:16}
            max_idle: ${EVENTBUS_REDIS_POOL_MAX_IDLE:8}
            min_idle: ${EVENTBUS_REDIS_POOL_MIN_IDLE:2}
    
        # Metrics
        metrics:
          enabled: ${EVENTBUS_METRICS_ENABLED:true}
          prefix: socle_eventbus
    

    7.2 Environment variables

    | Variable | Description | Default |
    |---|---|---|
    | EVENTBUS_ENABLED | Enable the EventBus | true |
    | EVENTBUS_MODE | Mode (in_memory / redis) | in_memory |
    | EVENTBUS_QUEUE_CAPACITY | Queue capacity | 10000 |
    | REDIS_HOST | Redis host | localhost |
    | REDIS_PORT | Redis port | 6379 |

    8. Usage examples

    8.1 Simple worker

    @Component
    public class OrderNotificationWorker extends SimplePushWorker<OrderEvent> {
    
        @Autowired
        private NotificationService notificationService;
    
        public OrderNotificationWorker() {
            super(2, "orders.created");  // 2 concurrent workers
        }
    
        @Override
        public String getName() {
            return "order_notification_worker";
        }
    
        @Override
        protected Class<OrderEvent> getEventType() {
            return OrderEvent.class;
        }
    
        @Override
        protected void processEvent(OrderEvent event) {
            notificationService.notifyOrderCreated(event.orderId(), event.customerEmail());
        }
    }
    

    8.2 Multi-channel worker

    @Component
    public class AuditWorker extends AbstractPushWorker<Object> {
    
        @Autowired
        private AuditRepository auditRepository;
    
        public AuditWorker() {
            super(4, "orders.*", "payments.*", "users.*");  // Pattern matching
        }
    
        @Override
        public String getName() {
            return "audit_worker";
        }
    
        @Override
        protected Class<Object> getEventType() {
            return Object.class;  // Accepte tout
        }
    
        @Override
        protected void processEvent(Object payload, BusEvent<Object> event) {
            // Acces aux metadata
            AuditEntry entry = new AuditEntry(
                event.id(),
                event.channel(),
                event.timestamp(),
                event.source(),
                payload
            );
            auditRepository.save(entry);
        }
    }
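
    The text does not spell out how patterns such as orders.* are matched against channel names. Here is a minimal sketch, assuming NATS-style semantics where * matches exactly one dot-separated token and > matches one or more trailing tokens. ChannelPattern is a hypothetical helper, not part of the Socle API, and the actual subscribePattern rules may differ.

    ```java
    // Hypothetical matcher, assuming NATS-style wildcards ("*" = one token, ">" = the rest).
    class ChannelPattern {

        static boolean matches(String pattern, String channel) {
            String[] p = pattern.split("\\.");
            String[] c = channel.split("\\.");

            for (int i = 0; i < p.length; i++) {
                if (p[i].equals(">")) {
                    // ">" must be the last token and needs at least one token to consume
                    return i == p.length - 1 && c.length > i;
                }
                if (i >= c.length) {
                    return false; // channel is shorter than the pattern
                }
                if (!p[i].equals("*") && !p[i].equals(c[i])) {
                    return false; // literal token mismatch
                }
            }
            // Without ">", both must have the same number of tokens
            return p.length == c.length;
        }
    }
    ```

    Under these assumptions, orders.* matches orders.created but not orders.created.eu, while orders.> matches both.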
    

    8.3 Publishing events

    @Service
    public class OrderService {
    
        @Autowired
        private EventBus eventBus;
    
        @Autowired
        private OrderRepository orderRepository;
    
        public Order createOrder(CreateOrderRequest request) {
            // Logique metier
            Order order = orderRepository.save(new Order(request));
    
            // Publication sur l'EventBus
            // Tous les workers abonnes a "orders.created" sont reveilles instantanement
            eventBus.publish("orders.created", new OrderEvent(
                order.getId(),
                order.getCustomerEmail(),
                order.getTotal()
            ));
    
            return order;
        }
    
        public void updateOrderStatus(String orderId, OrderStatus newStatus) {
            Order order = orderRepository.updateStatus(orderId, newStatus);
    
            eventBus.publish("orders.updated", new OrderStatusEvent(
                orderId,
                newStatus
            ));
        }
    }
    

    8.4 Worker with custom error handling

    @Component
    public class PaymentWorker extends AbstractPushWorker<PaymentEvent> {
    
        @Autowired
        private PaymentService paymentService;
    
        @Autowired
        private DeadLetterQueue dlq;
    
        public PaymentWorker() {
            super(2, "payments.process");
        }
    
        @Override
        public String getName() {
            return "payment_worker";
        }
    
        @Override
        protected Class<PaymentEvent> getEventType() {
            return PaymentEvent.class;
        }
    
        @Override
        protected void processEvent(PaymentEvent payload, BusEvent<PaymentEvent> event) {
            paymentService.processPayment(payload);
        }
    
        @Override
        protected void handleError(BusEvent<PaymentEvent> event, Exception e, int workerId) {
            logger.error("[{}] Payment processing failed for {}: {}",
                getName(), event.payload().paymentId(), e.getMessage());
    
            // Envoyer en DLQ pour retry ulterieur
            dlq.send("payments.dlq", event, e);
        }
    }
    

    9. Solutions to open questions

    9.1 Clean shutdown of workers blocked in take()

    Problem: BlockingQueue.take() blocks indefinitely, so a worker waiting on an empty queue never sees the running flag change. How can it be stopped cleanly?

    Solution: the poison pill pattern

    // On shutdown, inject a recognizable "poison" event
    @Override
    public void stop() {
        running.set(false);
    
        // Inject N poison pills (one per worker)
        for (int i = 0; i < concurrency; i++) {
            queue.offer(new BusEvent<>(null, null, null, null, null, null));
        }
    }
    
    // In the loop, check for the poison pill
    private void pushLoop(int workerId) {
        while (running.get()) {
            try {
                BusEvent<T> event = queue.take();
    
                // Detect the poison pill (id == null)
                if (event.id() == null) {
                    break;  // Exit cleanly
                }
    
                processEvent(event.payload(), event);
            } catch (InterruptedException e) {
                // take() is interruptible: restore the flag and exit
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
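
    The pattern can be tried in isolation with plain JDK classes. The following is a sketch under assumptions: PoisonPillDemo and its Event record are hypothetical stand-ins that keep only the id and payload of BusEvent, and platform threads are used instead of the worker's executor.

    ```java
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.LinkedBlockingQueue;

    // Hypothetical stand-alone demo: Event keeps only the id/payload part of BusEvent.
    class PoisonPillDemo {

        record Event(String id, String payload) {}

        // A null id marks the poison pill, as in the snippet above.
        static final Event POISON = new Event(null, null);

        /** Starts the consumers, feeds them the payloads, then stops them with poison pills. */
        static List<String> run(int consumers, List<String> payloads) {
            BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
            List<String> processed = new CopyOnWriteArrayList<>();
            Thread[] threads = new Thread[consumers];

            for (int i = 0; i < consumers; i++) {
                threads[i] = new Thread(() -> {
                    try {
                        while (true) {
                            Event event = queue.take();     // blocks until an event arrives
                            if (event.id() == null) break;  // poison pill: leave the loop
                            processed.add(event.payload());
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                threads[i].start();
            }

            for (int i = 0; i < payloads.size(); i++) {
                queue.add(new Event("evt-" + i, payloads.get(i)));
            }
            // One pill per consumer: each consumer takes exactly one pill and exits.
            for (int i = 0; i < consumers; i++) {
                queue.add(POISON);
            }
            try {
                for (Thread t : threads) {
                    t.join();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return processed;
        }
    }
    ```

    Because the pills are enqueued after the real events, every event already in the queue is processed before the consumers exit.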
    

    9.2 Backpressure (bounded vs unbounded queue)

    Problem: What happens if events arrive faster than they are processed?

    Solution: a bounded queue with an overflow policy

    // Bounded queue (recommended)
    BlockingQueue<BusEvent<T>> queue = new LinkedBlockingQueue<>(10_000);
    
    // On publish, handle overflow
    private boolean deliver(Subscription<T> sub, BusEvent<T> event) throws InterruptedException {
        // offer() with a timeout: never blocks indefinitely
        boolean offered = queue.offer(event, 100, TimeUnit.MILLISECONDS);
    
        if (!offered) {
            // Overflow policy (pick one or combine them):
            // Option 1: drop (log + metric)
            log.warn("Queue full, event dropped");
            droppedCount.incrementAndGet();
    
            // Option 2: send to a DLQ
            dlq.send("overflow", event);
    
            // Option 3: raise an alert
            alertService.queueOverflow(sub.getId());
        }
    
        return offered;
    }
    

    Metrics to monitor:

    • socle_eventbus_queue_size: current size
    • socle_eventbus_queue_capacity: maximum capacity
    • socle_eventbus_dropped_total: dropped events
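
    The drop policy and the three values listed above can be sketched with a small wrapper around a bounded queue. BoundedDelivery and its method names are hypothetical, and the counters are plain fields rather than real Prometheus metrics.

    ```java
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical wrapper: bounded queue with a "drop and count" overflow policy.
    class BoundedDelivery<E> {

        private final BlockingQueue<E> queue;
        private final long offerTimeoutMs;
        private final AtomicLong dropped = new AtomicLong();

        BoundedDelivery(int capacity, long offerTimeoutMs) {
            this.queue = new LinkedBlockingQueue<>(capacity);
            this.offerTimeoutMs = offerTimeoutMs;
        }

        /** Tries to enqueue; waits at most offerTimeoutMs, then drops and counts the event. */
        boolean deliver(E event) {
            try {
                if (queue.offer(event, offerTimeoutMs, TimeUnit.MILLISECONDS)) {
                    return true;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // treated as a failed delivery
            }
            dropped.incrementAndGet();
            return false;
        }

        int size() {                 // would feed socle_eventbus_queue_size
            return queue.size();
        }

        int capacity() {             // would feed socle_eventbus_queue_capacity
            return queue.size() + queue.remainingCapacity();
        }

        long droppedTotal() {        // would feed socle_eventbus_dropped_total
            return dropped.get();
        }
    }
    ```

    A steadily growing dropped counter with size close to capacity suggests that consumers are too slow, not that the queue is too small.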

    9.3 Multi-channel with a single queue

    Problem: A worker listens to several channels. How are they merged?

    Solution: a merged queue fed by forwarders

    // Merged queue (bounded, see 9.2)
    BlockingQueue<BusEvent<T>> mergedQueue = new LinkedBlockingQueue<>(10_000);
    
    // One forwarder per subscription
    for (String channel : channels) {
        Subscription<T> sub = eventBus.subscribe(channel, eventType);
    
        // Thread that forwards to the merged queue
        Thread.ofVirtual().start(() -> {
            try {
                while (running.get()) {
                    BusEvent<T> event = sub.getQueue().take();
                    if (event.id() == null) break;  // poison pill (see 9.1)
                    if (!mergedQueue.offer(event, 1, TimeUnit.SECONDS)) {
                        log.warn("Merged queue full, event dropped");
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
    
    // Workers consume the merged queue
    void pushLoop() throws InterruptedException {
        while (running.get()) {
            BusEvent<T> event = mergedQueue.take();
            // event.channel() tells which channel the event came from
            processEvent(event.payload(), event);
        }
    }
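
    A runnable sketch of the same idea with plain JDK queues. The names are hypothetical: MergedQueueDemo and its Event record stand in for the worker and BusEvent, each source queue plays the role of a subscription queue, and platform threads replace the virtual threads.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Hypothetical demo: one forwarder thread per channel feeding a single merged queue.
    class MergedQueueDemo {

        record Event(String id, String channel, String payload) {}

        static final Event POISON = new Event(null, null, null); // null id = stop the forwarder

        /** Forwards every payload of every channel into one queue and returns its content. */
        static List<Event> merge(Map<String, List<String>> payloadsByChannel) {
            BlockingQueue<Event> merged = new LinkedBlockingQueue<>(1_000);
            List<Thread> forwarders = new ArrayList<>();

            for (Map.Entry<String, List<String>> entry : payloadsByChannel.entrySet()) {
                String channel = entry.getKey();
                BlockingQueue<Event> source = new LinkedBlockingQueue<>();
                int n = 0;
                for (String payload : entry.getValue()) {
                    source.add(new Event(channel + "-" + n++, channel, payload));
                }
                source.add(POISON);

                Thread forwarder = new Thread(() -> {
                    try {
                        while (true) {
                            Event event = source.take();
                            if (event.id() == null) break; // poison pill
                            merged.put(event);             // blocks while the merged queue is full
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, "fwd-" + channel);
                forwarder.start();
                forwarders.add(forwarder);
            }

            try {
                for (Thread t : forwarders) {
                    t.join();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            List<Event> out = new ArrayList<>();
            merged.drainTo(out);
            return out;
        }
    }
    ```

    Events from different channels interleave in an unspecified order, but each forwarder is sequential, so the order within one channel is preserved in the merged queue.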
    

    9.4 Event ordering

    Problem: Must events be processed in order?

    Solution: it depends on the use case

    | Mode | Guarantee | Implementation |
    |---|---|---|
    | None | No ordering guarantee | Concurrency > 1 |
    | Per channel | Ordering within a channel | Concurrency = 1 per channel |
    | Per key | Ordering per business key | Partitioning on the key |

    // To guarantee ordering per key (e.g. per orderId)
    public class OrderedPushWorker extends AbstractPushWorker<OrderEvent> {
    
        // One queue per partition (hash of the key)
        private static final int PARTITIONS = 8;
        private final List<BlockingQueue<BusEvent<OrderEvent>>> partitionQueues = new ArrayList<>();
    
        public OrderedPushWorker() {
            super("orders.created");  // example channel
            for (int i = 0; i < PARTITIONS; i++) {
                partitionQueues.add(new LinkedBlockingQueue<>());
            }
        }
    
        @Override
        protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
            // Same orderId -> same hash -> same partition.
            // floorMod never yields a negative index, unlike Math.abs(hash) % n
            // (Math.abs(Integer.MIN_VALUE) is still negative).
            int partition = Math.floorMod(payload.orderId().hashCode(), PARTITIONS);
            partitionQueues.get(partition).offer(event);
        }
    
        // One worker per partition = ordering guaranteed within each partition
        // (getName() and getEventType() omitted for brevity)
    }
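
    The partitioning rule itself is easy to check in isolation. A minimal sketch, with KeyPartitioner as a hypothetical helper that is not part of the Socle API:

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical helper: maps a business key to a stable partition index.
    class KeyPartitioner {

        private final int partitions;

        KeyPartitioner(int partitions) {
            if (partitions <= 0) {
                throw new IllegalArgumentException("partitions must be positive");
            }
            this.partitions = partitions;
        }

        /** floorMod keeps the result in [0, partitions) even for negative hashes. */
        int partitionForHash(int hash) {
            return Math.floorMod(hash, partitions);
        }

        int partitionFor(String key) {
            return partitionForHash(key.hashCode());
        }

        /** Routes keys to per-partition lists, preserving arrival order inside each list. */
        List<List<String>> route(List<String> keysInArrivalOrder) {
            List<List<String>> buckets = new ArrayList<>();
            for (int i = 0; i < partitions; i++) {
                buckets.add(new ArrayList<>());
            }
            for (String key : keysInArrivalOrder) {
                buckets.get(partitionFor(key)).add(key);
            }
            return buckets;
        }
    }
    ```

    Ordering holds only per key: two different keys may share a partition, and a hot key can overload a single partition.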
    

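    9.5 Retry and DLQ

    Problem: What happens if processing fails?

    Solution: retry with backoff, then DLQ

    @Component
    public class ResilientPushWorker extends AbstractPushWorker<MyEvent> {
    
        // eventBus is inherited from AbstractPushWorker (protected field).
        // Constructor, getName() and getEventType() are omitted for brevity.
    
        private static final int MAX_RETRIES = 3;
    
        // Runs the delayed re-publications
        private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();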
    
        @Override
        protected void processEvent(MyEvent payload, BusEvent<MyEvent> event) {
            int retryCount = getRetryCount(event);
    
            try {
                doProcess(payload);
            } catch (RetryableException e) {
                if (retryCount < MAX_RETRIES) {
                    // Re-publish with the retry count incremented
                    scheduleRetry(event, retryCount + 1);
                } else {
                    // Max retries reached -> DLQ
                    sendToDlq(event, e);
                }
            }
        }
    
        private void scheduleRetry(BusEvent<MyEvent> event, int retryCount) {
            // Exponential backoff: 2s, 4s, 8s for retries 1, 2, 3
            long delayMs = (1L << retryCount) * 1000;
    
            // Re-publish after the delay
            scheduler.schedule(() -> {
                Map<String, String> metadata = new HashMap<>(event.metadata());
                metadata.put("retry_count", String.valueOf(retryCount));
                eventBus.publish(event.channel(), event.payload(), metadata);
            }, delayMs, TimeUnit.MILLISECONDS);
        }
    
        private int getRetryCount(BusEvent<MyEvent> event) {
            return Integer.parseInt(event.metadata().getOrDefault("retry_count", "0"));
        }
    
        private void sendToDlq(BusEvent<MyEvent> event, Exception e) {
            eventBus.publish("dlq." + event.channel(), new DlqEntry(event, e));
        }
    }
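
    The backoff computation can be isolated and given an upper bound, so that a large retry count neither overflows nor produces an unreasonable delay. A minimal sketch: RetryBackoff, the base delay and the cap are hypothetical and do not come from the original worker.

    ```java
    // Hypothetical helper: exponential backoff (base * 2^retryCount) with an upper bound.
    class RetryBackoff {

        static long delayMs(int retryCount, long baseMs, long maxMs) {
            if (retryCount < 0 || baseMs <= 0 || maxMs < baseMs) {
                throw new IllegalArgumentException("invalid backoff parameters");
            }
            int shift = Math.min(retryCount, 62);
            // Overflow-safe check: compare before shifting instead of after
            if (baseMs > (maxMs >> shift)) {
                return maxMs;
            }
            return baseMs << shift;
        }

        /** Delays for retries 1..maxRetries, in milliseconds. */
        static long[] schedule(int maxRetries, long baseMs, long maxMs) {
            long[] delays = new long[maxRetries];
            for (int i = 0; i < maxRetries; i++) {
                delays[i] = delayMs(i + 1, baseMs, maxMs);
            }
            return delays;
        }
    }
    ```

    With a base of 1000 ms and a cap of 60000 ms, retries 1 to 3 wait 2000, 4000 and 8000 ms, matching the formula used above, and retry 6 and beyond stay at the cap.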
    

    10. Migrating existing workers

    10.1 Before (AbstractEventDrivenWorker)

    // DEPRECATED
    @Component
    public class OldOrderWorker extends AbstractEventDrivenWorker<OrderEvent> {
    
        private final BlockingQueue<OrderEvent> queue = new LinkedBlockingQueue<>();
    
        public OldOrderWorker() {
            super(4);
        }
    
        @Override
        public String getName() {
            return "order_worker";
        }
    
        @Override
        protected OrderEvent pollEvent() throws InterruptedException {
            return queue.poll(100, TimeUnit.MILLISECONDS);  // Polling!
        }
    
        @Override
        protected void processEvent(OrderEvent event) {
            // Traitement
        }
    
        // Methode externe pour recevoir les events
        public void onOrderReceived(OrderEvent event) {
            queue.offer(event);
        }
    }
    

    10.2 After (AbstractPushWorker)

    // RECOMMENDED
    @Component
    public class NewOrderWorker extends AbstractPushWorker<OrderEvent> {
    
        public NewOrderWorker() {
            super(4, "orders.created");  // Specifie le channel
        }
    
        @Override
        public String getName() {
            return "order_worker";
        }
    
        @Override
        protected Class<OrderEvent> getEventType() {
            return OrderEvent.class;
        }
    
        @Override
        protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
            // Meme traitement
        }
    
        // Plus besoin de methode externe - l'EventBus gere tout
    }
    

    10.3 Migration steps

    1. Add the EventBus dependency to the worker
    2. Change the parent class: AbstractEventDrivenWorker → AbstractPushWorker
    3. Specify the channels in the constructor
    4. Implement getEventType()
    5. Adapt processEvent() to accept a BusEvent
    6. Remove the internal queue and pollEvent()
    7. Update the producers to use eventBus.publish()
    8. Test the behavior

    11. Prometheus metrics

    # EventBus
    socle_eventbus_published_total{channel="orders.created"}
    socle_eventbus_delivered_total{channel="orders.created"}
    socle_eventbus_failed_total{channel="orders.created"}
    socle_eventbus_subscriptions_active{channel="orders.created"}
    socle_eventbus_queue_size{subscription="sub-123"}
    
    # Push Workers
    socle_push_worker_processed_total{worker="order_worker"}
    socle_push_worker_errors_total{worker="order_worker"}
    socle_push_worker_queue_size{worker="order_worker"}
    socle_push_worker_duration_seconds{worker="order_worker", quantile="0.99"}
    

    12. Admin REST API

    GET  /admin/eventbus/stats              → Global statistics
    GET  /admin/eventbus/channels           → List of active channels
    GET  /admin/eventbus/subscriptions      → List of subscriptions
    
    POST /admin/eventbus/publish            → Publish an event (debug)
         Body: {"channel": "test", "payload": {...}}
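
    These endpoints can be called from Java with the JDK's java.net.http client. The sketch below only builds the requests. AdminEventBusRequests is a hypothetical helper, the base URL is an assumption, and authentication is left out because its mechanism is not described here.

    ```java
    import java.net.URI;
    import java.net.http.HttpRequest;

    // Hypothetical helper: builds requests for the admin endpoints listed above.
    class AdminEventBusRequests {

        static HttpRequest stats(String baseUrl) {
            return HttpRequest.newBuilder(URI.create(baseUrl + "/admin/eventbus/stats"))
                    .header("Accept", "application/json")
                    .GET()
                    .build();
        }

        /** payloadJson must already be valid JSON; channel is assumed to need no escaping. */
        static HttpRequest publish(String baseUrl, String channel, String payloadJson) {
            String body = "{\"channel\": \"" + channel + "\", \"payload\": " + payloadJson + "}";
            return HttpRequest.newBuilder(URI.create(baseUrl + "/admin/eventbus/publish"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(body))
                    .build();
        }
    }
    ```

    A request is then sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), for example against http://localhost:8080 if the application listens on that port.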
    

    13. See also

    Socle V004 – EventBus et Workers Push

  • Socle V004 – Kubernetes

    16 – Kubernetes

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Deployment guide for Socle V4 on Kubernetes.

    2. Docker image

    2.1 Dockerfile

    FROM eclipse-temurin:21-jre-alpine
    
    LABEL maintainer="your-team@company.com"
    LABEL version="4.0.0"
    
    WORKDIR /app
    
    # Non-root user
    RUN addgroup -S socle && adduser -S socle -G socle
    USER socle
    
    # Copy application
    COPY --chown=socle:socle target/socle-v004-4.0.0.jar app.jar
    
    # Health check
    HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
      CMD wget -qO- http://localhost:8080/admin/health/live || exit 1
    
    # Default environment
    ENV JAVA_OPTS="-XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0"
    
    EXPOSE 8080
    
    # exec replaces the shell so that java runs as PID 1 and receives SIGTERM
    ENTRYPOINT ["sh", "-c", "exec java $JAVA_OPTS -jar app.jar"]
    

    2.2 Build and Push

    # Build
    docker build -t gcr.io/my-project/socle-v4:4.0.0 .
    
    # Push
    docker push gcr.io/my-project/socle-v4:4.0.0
    

    3. Kubernetes Manifests

    3.1 Namespace

    apiVersion: v1
    kind: Namespace
    metadata:
      name: socle
      labels:
        name: socle
    

    3.2 ConfigMap

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: socle-config
      namespace: socle
    data:
      APP_NAME: "socle-v4"
      ENV_NAME: "PROD"
      REGION: "europe-west1"
      HTTP_PORT: "8080"
      KVBUS_MODE: "redis"
      REDIS_HOST: "redis-master.redis.svc.cluster.local"
      TECHDB_ENABLED: "true"
      LOG_FORWARDER_ENABLED: "true"
      LOG_TRANSPORT_MODE: "http"
      SCHEDULER_ENABLED: "true"
      ADMIN_ENABLED: "true"
      ADMIN_AUTH_ENABLED: "true"
    

    3.3 Secret

    apiVersion: v1
    kind: Secret
    metadata:
      name: socle-secrets
      namespace: socle
    type: Opaque
    stringData:
      REDIS_PASSWORD: "your-redis-password"
      ADMIN_PASSWORD: "your-admin-password"
      API_KEY: "your-api-key"
      TECHDB_PASSWORD: "your-techdb-password"
    

    3.4 Deployment

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: socle-v4
      namespace: socle
      labels:
        app: socle-v4
        version: "4.0.0"
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: socle-v4
      strategy:
        type: RollingUpdate
        rollingUpdate:
          maxSurge: 1
          maxUnavailable: 0
      template:
        metadata:
          labels:
            app: socle-v4
            version: "4.0.0"
          annotations:
            prometheus.io/scrape: "true"
            prometheus.io/path: "/actuator/prometheus"
            prometheus.io/port: "8080"
        spec:
          serviceAccountName: socle-sa
          securityContext:
            runAsNonRoot: true
            runAsUser: 1000
            fsGroup: 1000
          containers:
            - name: socle
              image: gcr.io/my-project/socle-v4:4.0.0
              imagePullPolicy: Always
              ports:
                - name: http
                  containerPort: 8080
                  protocol: TCP
              envFrom:
                - configMapRef:
                    name: socle-config
                - secretRef:
                    name: socle-secrets
              env:
                - name: POD_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.name
                - name: POD_NAMESPACE
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.namespace
                - name: EXEC_ID
                  value: "$(POD_NAME)"
              resources:
                requests:
                  cpu: "250m"
                  memory: "512Mi"
                limits:
                  cpu: "1000m"
                  memory: "1Gi"
              livenessProbe:
                httpGet:
                  path: /admin/health/live
                  port: 8080
                initialDelaySeconds: 30
                periodSeconds: 10
                timeoutSeconds: 5
                failureThreshold: 3
              readinessProbe:
                httpGet:
                  path: /admin/health/ready
                  port: 8080
                initialDelaySeconds: 10
                periodSeconds: 5
                timeoutSeconds: 3
                failureThreshold: 3
              volumeMounts:
                - name: data
                  mountPath: /app/data
                - name: logs
                  mountPath: /app/logs
          volumes:
            - name: data
              emptyDir: {}
            - name: logs
              emptyDir: {}
          affinity:
            podAntiAffinity:
              preferredDuringSchedulingIgnoredDuringExecution:
                - weight: 100
                  podAffinityTerm:
                    labelSelector:
                      matchLabels:
                        app: socle-v4
                    topologyKey: kubernetes.io/hostname
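
    The memory limit above interacts with the JAVA_OPTS set in the Dockerfile: with -XX:+UseContainerSupport, the JVM sizes its default maximum heap as a percentage of the container limit. The sketch below works through that arithmetic for a 1Gi limit and MaxRAMPercentage=75.0. The class and method names are hypothetical, and the value the JVM actually picks can differ slightly depending on the garbage collector.

    ```java
    public class HeapBudget {

        /** Expected default max heap for a container memory limit and a MaxRAMPercentage. */
        static long maxHeapBytes(long containerLimitBytes, double maxRamPercentage) {
            return (long) (containerLimitBytes * maxRamPercentage / 100.0);
        }

        public static void main(String[] args) {
            long limit = 1024L * 1024 * 1024;          // limits.memory: "1Gi"
            long heap = maxHeapBytes(limit, 75.0);     // -XX:MaxRAMPercentage=75.0
            System.out.printf("expected max heap: %d MiB%n", heap / (1024 * 1024));

            // Value chosen by the running JVM, useful for comparison inside the pod
            System.out.printf("actual max heap:   %d MiB%n",
                Runtime.getRuntime().maxMemory() / (1024 * 1024));
        }
    }
    ```

    With these settings about 768 MiB go to the heap, which leaves roughly 256 MiB of the limit for metaspace, thread stacks and other off-heap memory.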
    

    3.5 Service

    apiVersion: v1
    kind: Service
    metadata:
      name: socle-v4
      namespace: socle
      labels:
        app: socle-v4
    spec:
      type: ClusterIP
      ports:
        - name: http
          port: 80
          targetPort: 8080
          protocol: TCP
      selector:
        app: socle-v4
    

    3.6 Ingress

    apiVersion: networking.k8s.io/v1
    kind: Ingress
    metadata:
      name: socle-v4
      namespace: socle
      annotations:
        nginx.ingress.kubernetes.io/ssl-redirect: "true"
        cert-manager.io/cluster-issuer: letsencrypt-prod
    spec:
      ingressClassName: nginx
      tls:
        - hosts:
            - socle.example.com
          secretName: socle-tls
      rules:
        - host: socle.example.com
          http:
            paths:
              - path: /
                pathType: Prefix
                backend:
                  service:
                    name: socle-v4
                    port:
                      number: 80
    

    3.7 HorizontalPodAutoscaler

    apiVersion: autoscaling/v2
    kind: HorizontalPodAutoscaler
    metadata:
      name: socle-v4
      namespace: socle
    spec:
      scaleTargetRef:
        apiVersion: apps/v1
        kind: Deployment
        name: socle-v4
      minReplicas: 2
      maxReplicas: 10
      metrics:
        - type: Resource
          resource:
            name: cpu
            target:
              type: Utilization
              averageUtilization: 70
        - type: Resource
          resource:
            name: memory
            target:
              type: Utilization
              averageUtilization: 80
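
    To make the two targets above concrete: the HPA controller computes, for each metric, desiredReplicas = ceil(currentReplicas × currentValue / targetValue), keeps the largest proposal, and clamps it to minReplicas/maxReplicas. Utilization is measured relative to the container requests (250m CPU and 512Mi here). The sketch below applies that rule; the class name is hypothetical and the controller's tolerance band and stabilization windows are deliberately left out.

    ```java
    public class HpaEstimate {

        /** Replica proposal for a single metric, clamped to [min, max]. */
        static int desiredReplicas(int current, double currentUtil, double targetUtil,
                                   int min, int max) {
            int desired = (int) Math.ceil(current * currentUtil / targetUtil);
            return Math.max(min, Math.min(max, desired));
        }

        /** Combines the CPU (70 %) and memory (80 %) targets of the manifest above. */
        static int desiredReplicas(int current, double cpuUtil, double memUtil) {
            int byCpu = desiredReplicas(current, cpuUtil, 70, 2, 10);
            int byMem = desiredReplicas(current, memUtil, 80, 2, 10);
            return Math.max(byCpu, byMem); // the largest proposal wins
        }

        public static void main(String[] args) {
            // 2 pods running at 140 % of their CPU request and 40 % of their memory request
            System.out.println(desiredReplicas(2, 140, 40));
        }
    }
    ```

    Because memory usage of a JVM rarely drops once the heap has grown, the memory target tends to prevent scale-down; it may be worth checking this behavior under load before relying on it.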
    

    3.8 PodDisruptionBudget

    apiVersion: policy/v1
    kind: PodDisruptionBudget
    metadata:
      name: socle-v4
      namespace: socle
    spec:
      minAvailable: 1
      selector:
        matchLabels:
          app: socle-v4
    

    3.9 ServiceAccount

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: socle-sa
      namespace: socle
    

    4. Persistence with PVC

    4.1 PersistentVolumeClaim

    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: socle-data
      namespace: socle
    spec:
      accessModes:
        - ReadWriteOnce
      storageClassName: standard
      resources:
        requests:
          storage: 10Gi
    

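    4.2 Deployment with PVC

    # In the Deployment: the "data" volume uses the claim instead of emptyDir.
    # A ReadWriteOnce claim can only be mounted from a single node, so replicas
    # scheduled on other nodes will not start unless the storage class
    # supports ReadWriteMany.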
    spec:
      template:
        spec:
          containers:
            - name: socle
              volumeMounts:
                - name: data
                  mountPath: /app/data
          volumes:
            - name: data
              persistentVolumeClaim:
                claimName: socle-data
    

    5. Network Policies

    apiVersion: networking.k8s.io/v1
    kind: NetworkPolicy
    metadata:
      name: socle-network-policy
      namespace: socle
    spec:
      podSelector:
        matchLabels:
          app: socle-v4
      policyTypes:
        - Ingress
        - Egress
      ingress:
        # Allow from ingress controller
        - from:
            - namespaceSelector:
                matchLabels:
                  name: ingress-nginx
          ports:
            - port: 8080
        # Allow from Prometheus
        - from:
            - namespaceSelector:
                matchLabels:
                  name: monitoring
          ports:
            - port: 8080
      egress:
        # Allow to Redis
        - to:
            - namespaceSelector:
                matchLabels:
                  name: redis
          ports:
            - port: 6379
        # Allow to DNS
        - to:
            - namespaceSelector: {}
              podSelector:
                matchLabels:
                  k8s-app: kube-dns
          ports:
            - port: 53
              protocol: UDP
            - port: 53
              protocol: TCP
    

    6. Helm Chart

    6.1 Chart.yaml

    apiVersion: v2
    name: socle-v4
    description: Socle V4 Framework
    version: 4.0.0
    appVersion: "4.0.0"
    

    6.2 values.yaml

    replicaCount: 2
    
    image:
      repository: gcr.io/my-project/socle-v4
      tag: "4.0.0"
      pullPolicy: Always
    
    service:
      type: ClusterIP
      port: 80
    
    ingress:
      enabled: true
      className: nginx
      hosts:
        - host: socle.example.com
          paths:
            - path: /
              pathType: Prefix
      tls:
        - secretName: socle-tls
          hosts:
            - socle.example.com
    
    resources:
      requests:
        cpu: 250m
        memory: 512Mi
      limits:
        cpu: 1000m
        memory: 1Gi
    
    autoscaling:
      enabled: true
      minReplicas: 2
      maxReplicas: 10
      targetCPUUtilizationPercentage: 70
    
    config:
      APP_NAME: socle-v4
      ENV_NAME: PROD
      KVBUS_MODE: redis
    
    secrets:
      REDIS_PASSWORD: ""
      ADMIN_PASSWORD: ""
      API_KEY: ""
    

    6.3 Installation

    # Install
    helm install socle-v4 ./socle-v4-chart -n socle --create-namespace -f values-prod.yaml
    
    # Upgrade
    helm upgrade socle-v4 ./socle-v4-chart -n socle -f values-prod.yaml
    
    # Uninstall
    helm uninstall socle-v4 -n socle
    

    7. Observability

    7.1 ServiceMonitor (Prometheus Operator)

    apiVersion: monitoring.coreos.com/v1
    kind: ServiceMonitor
    metadata:
      name: socle-v4
      namespace: socle
      labels:
        release: prometheus
    spec:
      selector:
        matchLabels:
          app: socle-v4
      endpoints:
        - port: http
          path: /actuator/prometheus
          interval: 15s
    

    7.2 PrometheusRule

    apiVersion: monitoring.coreos.com/v1
    kind: PrometheusRule
    metadata:
      name: socle-v4-alerts
      namespace: socle
    spec:
      groups:
        - name: socle-v4
          rules:
            - alert: SocleHighErrorRate
              expr: rate(socle_errors_total[5m]) > 0.1
              for: 5m
              labels:
                severity: warning
              annotations:
                summary: High error rate
    

    8. Multi-region Deployment

    8.1 Structure

    clusters/
    ├── europe-west1/
    │   ├── kustomization.yaml
    │   └── config-patch.yaml
    ├── us-central1/
    │   ├── kustomization.yaml
    │   └── config-patch.yaml
    └── base/
        ├── kustomization.yaml
        ├── deployment.yaml
        ├── service.yaml
        └── configmap.yaml
    

    8.2 Kustomize overlay

    # clusters/europe-west1/kustomization.yaml
    apiVersion: kustomize.config.k8s.io/v1beta1
    kind: Kustomization
    resources:
      - ../base
    patches:
      - path: config-patch.yaml
    configMapGenerator:
      - name: socle-config
        behavior: merge
        literals:
          - REGION=europe-west1
    

    9. Troubleshooting

    Useful commands

    # Logs
    kubectl logs -f deployment/socle-v4 -n socle
    
    # Describe pod
    kubectl describe pod -l app=socle-v4 -n socle
    
    # Port forward
    kubectl port-forward svc/socle-v4 8080:80 -n socle
    
    # Exec into pod
    kubectl exec -it deployment/socle-v4 -n socle -- sh
    
    # Check events
    kubectl get events -n socle --sort-by='.lastTimestamp'
    

    10. References

  • Socle V004 – EventBus and Workers

    30 – EventBus and Push Workers

    Version: 4.1.0 Date: 2026-01-23 Status: Specification

    1. Introduction

    This document introduces the new EventBus system and the Push Workers in Socle V004. The mechanism replaces timeout-based polling with blocking queues, so a worker is notified as soon as an event is published.

    1.1 Motivation

    The current event-driven worker mechanism (AbstractEventDrivenWorker) polls with a timeout:

    // Old mechanism (DEPRECATED)
    while (running) {
        event = queue.poll(100, TimeUnit.MILLISECONDS);  // Checks every 100 ms
        if (event != null) processEvent(event);
    }
    

    Problems:

    • Latency of 0 to 100 ms before processing
    • Constant CPU consumption (even if low)
    • No true push mechanism

    1.2 New Push mechanism

    // New mechanism (RECOMMENDED)
    while (running) {
        event = queue.take();  // Sleeps until an event arrives
        processEvent(event);   // Woken immediately
    }
    

    Advantages:

    • Near-zero latency (the worker wakes as soon as an event is enqueued)
    • No CPU usage while idle
    • True event-driven push mechanism
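
    The loop in 1.2 is pseudocode: take() throws InterruptedException, which a real worker has to handle. A minimal runnable sketch using only java.util.concurrent could look like this. The class name, the String events and the CountDownLatch standing in for processEvent() are assumptions made for the demonstration.

    ```java
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class TakeLoopDemo {

        /** Starts a consumer that blocks on take() and counts down once per event. */
        static Thread startConsumer(BlockingQueue<String> queue, CountDownLatch processed) {
            Thread t = new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        queue.take();              // parked until an event is enqueued
                        processed.countDown();     // stand-in for processEvent(event)
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // interruption is the stop signal here
                }
            }, "take-loop-demo");
            t.setDaemon(true);
            t.start();
            return t;
        }

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            CountDownLatch processed = new CountDownLatch(2);
            Thread consumer = startConsumer(queue, processed);

            queue.put("orders.created#1");
            queue.put("orders.created#2");

            boolean done = processed.await(1, TimeUnit.SECONDS);
            System.out.println("both events processed: " + done);
            consumer.interrupt();
        }
    }
    ```

    While the queue is empty the consumer thread is parked by the JVM and does not wake up periodically, which is the difference from the 100 ms poll loop.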

    2. Deprecation of the old Workers

    2.1 Deprecated classes

    Class                          Status       Replacement
    AbstractEventDrivenWorker<T>   DEPRECATED   AbstractPushWorker<T>
    KafkaEventWorker<K,V>          DEPRECATED   KafkaPushWorker<K,V>

    2.2 Migration

    The old workers keep working but must no longer be used for new development.

    // DEPRECATED - Do not use anymore
    @Deprecated(since = "4.1.0", forRemoval = true)
    public abstract class AbstractEventDrivenWorker<T> implements Worker { ... }
    
    // RECOMMENDED - Use this instead
    public abstract class AbstractPushWorker<T> implements Worker { ... }
    

    2.3 Deprecation schedule

    Version   Action
    4.1.0     EventBus and Push Workers introduced, old workers deprecated
    4.2.0     Compilation warnings for the old workers
    5.0.0     Old workers removed (breaking change)

    3. EventBus Architecture

    3.1 Overview

    ┌─────────────────────────────────────────────────────────────────────┐
    │                              MOP                                     │
    │                                                                      │
    │  ┌──────────────────────────────────────────────────────────────┐   │
    │  │                        EventBus                               │   │
    │  │                                                               │   │
    │  │   Channels:                                                   │   │
    │  │     "orders.created"   ──► [Worker1.queue, Worker2.queue]     │   │
    │  │     "orders.updated"   ──► [Worker1.queue]                    │   │
    │  │     "alerts.*"         ──► [AlertWorker.queue]                │   │
    │  │                                                               │   │
    │  └──────────────────────────────────────────────────────────────┘   │
    │                              │                                       │
    │         publish("orders.created", event)                            │
    │                              │                                       │
    │                              ▼                                       │
    │  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐        │
    │  │ PushWorker 1   │  │ PushWorker 2   │  │ PushWorker 3   │        │
    │  │                │  │                │  │                │        │
    │  │ queue.take()   │  │ queue.take()   │  │ queue.take()   │        │
    │  │ (asleep)       │  │ (asleep)       │  │ (asleep)       │        │
    │  │      ↓         │  │      ↓         │  │                │        │
    │  │ WAKE-UP!       │  │ WAKE-UP!       │  │ (unsubscribed) │        │
    │  └────────────────┘  └────────────────┘  └────────────────┘        │
    │                                                                      │
    └─────────────────────────────────────────────────────────────────────┘
    

    3.2 Components

    Component               Role
    EventBus                Central publish/subscribe bus
    BusEvent<T>             Event envelope with metadata
    AbstractPushWorker<T>   Worker woken by the EventBus
    KafkaPushWorker<K,V>    Kafka worker with internal push
    EventBusWorker          Worker that manages the EventBus lifecycle

    4. EventBus Interface

    4.1 Definition

    package eu.lmvi.socle.event;
    
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    
    /**
     * Central event bus with a push mechanism.
     *
     * <p>The EventBus lets workers communicate through a publish/subscribe
     * system in which subscribers are woken as soon as an event is published.</p>
     */
    public interface EventBus {
    
        // ==================== Publication ====================
    
        /**
         * Publishes an event on a channel.
         * Subscribers are woken immediately.
         *
         * @param channel Channel name (e.g. "orders.created")
         * @param payload Event content
         */
        void publish(String channel, Object payload);
    
        /**
         * Publishes an event with custom metadata.
         *
         * @param channel  Channel name
         * @param payload  Event content
         * @param metadata Additional metadata
         */
        void publish(String channel, Object payload, Map<String, String> metadata);
    
        /**
         * Asynchronous publication with confirmation.
         *
         * @return Future completed once all subscribers have received the event
         */
        CompletableFuture<PublishResult> publishAsync(String channel, Object payload);
    
        // ==================== Subscription ====================
    
        /**
         * Subscribes to a channel and returns a subscription holding the reception queue.
         * Use getQueue().take() to wait for events (push).
         *
         * @param channel   Channel name
         * @param eventType Expected payload type
         * @return Subscription whose queue can be consumed with take()
         */
        <T> Subscription<T> subscribe(String channel, Class<T> eventType);
    
        /**
         * Subscribes with a glob pattern (e.g. "orders.*", "*.created").
         *
         * @param pattern   Glob pattern
         * @param eventType Expected payload type
         * @return Subscription with its queue
         */
        <T> Subscription<T> subscribePattern(String pattern, Class<T> eventType);
    
        /**
         * Subscribes with a callback handler (executed as events arrive).
         *
         * @param channel Channel name
         * @param handler Handler called for each event
         * @return Subscription ID, to be passed to unsubscribe
         */
        String subscribe(String channel, EventHandler handler);
    
        /**
         * Unsubscribes.
         *
         * @param subscriptionId ID returned by subscribe
         */
        void unsubscribe(String subscriptionId);
    
        // ==================== Lifecycle ====================
    
        /**
         * Starts the EventBus.
         */
        void start();
    
        /**
         * Stops the EventBus cleanly.
         * Queues are drained and subscribers are notified.
         */
        void stop();
    
        /**
         * Checks the health of the EventBus.
         */
        boolean isHealthy();
    
        // ==================== Stats ====================
    
        /**
         * Global statistics.
         */
        EventBusStats getStats();
    
        /**
         * Per-channel statistics.
         */
        Map<String, ChannelStats> getChannelStats();
    }
    

    4.2 Support classes

    package eu.lmvi.socle.event;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.BlockingQueue;
    
    /**
     * Event carried by the EventBus.
     */
    public record BusEvent<T>(
        String id,                      // Unique UUID
        String channel,                 // Publication channel
        Instant timestamp,              // Publication timestamp
        String source,                  // Source (worker name)
        T payload,                      // Business content
        Map<String, String> metadata    // Additional metadata
    ) {
        /**
         * Creates a simple event.
         */
        public static <T> BusEvent<T> of(String channel, T payload) {
            return new BusEvent<>(
                UUID.randomUUID().toString(),
                channel,
                Instant.now(),
                null,
                payload,
                Map.of()
            );
        }
    }
    
    /**
     * Subscription to a channel.
     */
    public interface Subscription<T> {
    
        /**
         * Unique ID of the subscription.
         */
        String getId();
    
        /**
         * Subscribed channel or pattern.
         */
        String getChannelOrPattern();
    
        /**
         * Reception queue for events.
         * Use take() to wait (blocking, woken as soon as an event arrives).
         * Use poll(timeout) if a timeout is needed.
         */
        BlockingQueue<BusEvent<T>> getQueue();
    
        /**
         * Unsubscribes.
         */
        void unsubscribe();
    
        /**
         * Number of events waiting in the queue.
         */
        int getPendingCount();
    }
    
    /**
     * Handler for callback subscriptions.
     */
    @FunctionalInterface
    public interface EventHandler {
        void onEvent(BusEvent<?> event);
    }
    
    /**
     * Publication result.
     */
    public record PublishResult(
        String eventId,
        int deliveredCount,
        int failedCount,
        Duration duration
    ) {}
    
    /**
     * EventBus statistics.
     */
    public record EventBusStats(
        long totalPublished,
        long totalDelivered,
        long totalFailed,
        int activeSubscriptions,
        int activeChannels,
        Instant startedAt
    ) {}
    
    /**
     * Per-channel statistics.
     */
    public record ChannelStats(
        String channel,
        long publishedCount,
        int subscriberCount,
        long lastPublishedAt
    ) {}
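
    The Subscription contract asks consumers to block on take(), and EventBus.stop() states that subscribers are notified on shutdown. The sketch below shows one way a consumer loop could honor both, assuming the notification arrives as a sentinel event placed on the queue. The Event record is a reduced stand-in for BusEvent<T>, and the SHUTDOWN marker is hypothetical: it is recognized by identity, so that a legitimate event with a null payload is not mistaken for a stop signal.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class SubscriptionLoopSketch {

        /** Reduced stand-in for BusEvent<T>, with only the fields used here. */
        record Event<T>(String channel, T payload) {}

        /** Hypothetical shutdown marker, compared by identity. */
        static final Event<String> SHUTDOWN = new Event<>("__shutdown__", null);

        /** Consumes with take() until the shutdown marker is received. */
        static List<String> consume(BlockingQueue<Event<String>> queue)
                throws InterruptedException {
            List<String> handled = new ArrayList<>();
            while (true) {
                Event<String> event = queue.take();   // blocks while the queue is empty
                if (event == SHUTDOWN) {
                    break;                            // bus stopped or subscription removed
                }
                handled.add(event.channel() + ":" + event.payload());
            }
            return handled;
        }

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Event<String>> queue = new LinkedBlockingQueue<>();
            queue.put(new Event<>("orders.created", "order-1"));
            queue.put(new Event<>("orders.created", null)); // null payload is still handled
            queue.put(SHUTDOWN);
            System.out.println(consume(queue));
        }
    }
    ```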
    

    5. EventBus Implementations

    5.1 InMemoryEventBus

    Single-instance implementation (dev, tests, simple applications).

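    package eu.lmvi.socle.event.impl;
    
    import eu.lmvi.socle.event.*;
    import java.time.Duration;
    import java.time.Instant;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.regex.Pattern;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.stereotype.Component;
    
    @Component
    @ConditionalOnProperty(name = "socle.eventbus.mode", havingValue = "in_memory", matchIfMissing = true)
    public class InMemoryEventBus implements EventBus {
    
        private static final Logger log = LoggerFactory.getLogger(InMemoryEventBus.class);
    
        // Subscriptions by exact channel
        private final Map<String, Set<SubscriptionImpl<?>>> channelSubs = new ConcurrentHashMap<>();
    
        // Subscriptions by pattern
        private final Map<Pattern, Set<SubscriptionImpl<?>>> patternSubs = new ConcurrentHashMap<>();
    
        // Index for fast lookup by subscription ID
        private final Map<String, SubscriptionImpl<?>> subsIndex = new ConcurrentHashMap<>();
    
        // Counters
        private final AtomicLong idGen = new AtomicLong();
        private final AtomicLong publishedCount = new AtomicLong();
        private final AtomicLong deliveredCount = new AtomicLong();
        private final AtomicLong failedCount = new AtomicLong();
    
        // State
        private volatile boolean running = false;
        private Instant startedAt;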
    
        // ==================== Lifecycle ====================
    
        @Override
        public void start() {
            running = true;
            startedAt = Instant.now();
            log.info("InMemoryEventBus started");
        }
    
        @Override
        public void stop() {
            running = false;
    
            // Notify all subscribers (poison pill)
            for (SubscriptionImpl<?> sub : subsIndex.values()) {
                sub.notifyShutdown();
            }
    
            log.info("InMemoryEventBus stopped. Published={}, Delivered={}, Failed={}",
                publishedCount.get(), deliveredCount.get(), failedCount.get());
        }
    
        @Override
        public boolean isHealthy() {
            return running;
        }
    
        // ==================== Publish ====================
    
        @Override
        public void publish(String channel, Object payload) {
            publish(channel, payload, Map.of());
        }
    
        @Override
        public void publish(String channel, Object payload, Map<String, String> metadata) {
            doPublish(channel, payload, metadata);
        }
    
        /** Builds the event, delivers it, and reports the actual delivery counts. */
        private PublishResult doPublish(String channel, Object payload, Map<String, String> metadata) {
            Instant start = Instant.now();
    
            if (!running) {
                log.warn("EventBus not running, event dropped on channel '{}'", channel);
                return new PublishResult(null, 0, 0, Duration.ZERO);
            }
    
            BusEvent<?> event = new BusEvent<>(
                "evt-" + idGen.incrementAndGet(),
                channel,
                Instant.now(),
                resolveSource(),
                payload,
                metadata
            );
    
            int delivered = 0;
            int failed = 0;
    
            // Deliver to exact-channel subscriptions
            Set<SubscriptionImpl<?>> exact = channelSubs.get(channel);
            if (exact != null) {
                for (SubscriptionImpl<?> sub : exact) {
                    if (deliverToSubscription(sub, event)) {
                        delivered++;
                    } else {
                        failed++;
                    }
                }
            }
    
            // Deliver to pattern subscriptions
            for (Map.Entry<Pattern, Set<SubscriptionImpl<?>>> entry : patternSubs.entrySet()) {
                if (entry.getKey().matcher(channel).matches()) {
                    for (SubscriptionImpl<?> sub : entry.getValue()) {
                        if (deliverToSubscription(sub, event)) {
                            delivered++;
                        } else {
                            failed++;
                        }
                    }
                }
            }
    
            publishedCount.incrementAndGet();
            deliveredCount.addAndGet(delivered);
            failedCount.addAndGet(failed);
    
            log.debug("Published event {} to channel '{}': delivered={}, failed={}",
                event.id(), channel, delivered, failed);
    
            return new PublishResult(event.id(), delivered, failed,
                Duration.between(start, Instant.now()));
        }
    
        @Override
        public CompletableFuture<PublishResult> publishAsync(String channel, Object payload) {
            // Runs on the common pool; the result carries the real event ID and counts
            return CompletableFuture.supplyAsync(() -> doPublish(channel, payload, Map.of()));
        }
    
        @SuppressWarnings("unchecked")
        private boolean deliverToSubscription(SubscriptionImpl<?> sub, BusEvent<?> event) {
            try {
                BlockingQueue<BusEvent<?>> queue = (BlockingQueue<BusEvent<?>>) sub.getQueue();
    
                // offer() with a timeout so a full queue cannot block the publisher indefinitely
                boolean offered = queue.offer(event, 100, TimeUnit.MILLISECONDS);
    
                if (!offered) {
                    log.warn("Queue full for subscription {}, event dropped", sub.getId());
                    return false;
                }
    
                return true;
    
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (Exception e) {
                log.error("Error delivering to subscription {}: {}", sub.getId(), e.getMessage());
                return false;
            }
        }
    
        private String resolveSource() {
            // Best effort: use the calling thread's name as the worker name
            return Thread.currentThread().getName();
        }
    
        // ==================== Subscribe ====================
    
        @Override
        public <T> Subscription<T> subscribe(String channel, Class<T> eventType) {
            String subId = "sub-" + idGen.incrementAndGet();
            SubscriptionImpl<T> sub = new SubscriptionImpl<>(subId, channel, null, this);
    
            channelSubs
                .computeIfAbsent(channel, k -> ConcurrentHashMap.newKeySet())
                .add(sub);
    
            subsIndex.put(subId, sub);
    
            log.info("Subscription {} created on channel '{}'", subId, channel);
            return sub;
        }
    
        @Override
        public <T> Subscription<T> subscribePattern(String pattern, Class<T> eventType) {
            String subId = "sub-" + idGen.incrementAndGet();
    
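            // Convert the glob to a regex: "*" matches within one dot-separated segment,
            // ">" matches everything that follows. Other regex metacharacters are not escaped.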
            String regex = pattern
                .replace(".", "\\.")
                .replace("*", "[^.]*")
                .replace(">", ".*");
            Pattern compiled = Pattern.compile("^" + regex + "$");
    
            SubscriptionImpl<T> sub = new SubscriptionImpl<>(subId, pattern, compiled, this);
    
            patternSubs
                .computeIfAbsent(compiled, k -> ConcurrentHashMap.newKeySet())
                .add(sub);
    
            subsIndex.put(subId, sub);
    
            log.info("Subscription {} created on pattern '{}'", subId, pattern);
            return sub;
        }
    
        @Override
        public String subscribe(String channel, EventHandler handler) {
            // Simplified implementation: create a regular subscription
            // and a thread that consumes it and calls the handler
            Subscription<Object> sub = subscribe(channel, Object.class);
    
            Thread.ofVirtual()
                .name("handler-" + sub.getId())
                .start(() -> {
                    while (running) {
                        try {
                            BusEvent<Object> event = sub.getQueue().take();
                            if (event.payload() == null) break; // Poison pill
                            handler.onEvent(event);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        } catch (Exception e) {
                            log.error("Handler error on {}: {}", channel, e.getMessage());
                        }
                    }
                });
    
            return sub.getId();
        }
    
        @Override
        public void unsubscribe(String subscriptionId) {
            SubscriptionImpl<?> sub = subsIndex.remove(subscriptionId);
            if (sub == null) {
                log.warn("Subscription {} not found", subscriptionId);
                return;
            }
    
            // Remove from the maps
            if (sub.pattern == null) {
                Set<SubscriptionImpl<?>> subs = channelSubs.get(sub.channelOrPattern);
                if (subs != null) subs.remove(sub);
            } else {
                // Pattern uses identity-based equals(), so each pattern subscription owns
                // its own map entry; drop the whole entry to avoid leaking empty sets.
                patternSubs.remove(sub.pattern);
            }
    
            sub.notifyShutdown();
            log.info("Subscription {} removed", subscriptionId);
        }
    
        // ==================== Stats ====================
    
        @Override
        public EventBusStats getStats() {
            return new EventBusStats(
                publishedCount.get(),
                deliveredCount.get(),
                failedCount.get(),
                subsIndex.size(),
                channelSubs.size(),
                startedAt
            );
        }
    
        @Override
        public Map<String, ChannelStats> getChannelStats() {
            Map<String, ChannelStats> stats = new HashMap<>();
    
            for (Map.Entry<String, Set<SubscriptionImpl<?>>> entry : channelSubs.entrySet()) {
                stats.put(entry.getKey(), new ChannelStats(
                    entry.getKey(),
                    0, // TODO: per-channel counter
                    entry.getValue().size(),
                    0
                ));
            }
    
            return stats;
        }
    
        // ==================== Subscription Implementation ====================
    
        private static class SubscriptionImpl<T> implements Subscription<T> {
    
            private final String id;
            private final String channelOrPattern;
            private final Pattern pattern;
            private final BlockingQueue<BusEvent<T>> queue;
            private final InMemoryEventBus bus;
    
            SubscriptionImpl(String id, String channelOrPattern, Pattern pattern, InMemoryEventBus bus) {
                this.id = id;
                this.channelOrPattern = channelOrPattern;
                this.pattern = pattern;
                this.queue = new LinkedBlockingQueue<>(10_000); // Default capacity
                this.bus = bus;
            }
    
            @Override
            public String getId() {
                return id;
            }
    
            @Override
            public String getChannelOrPattern() {
                return channelOrPattern;
            }
    
            @Override
            public BlockingQueue<BusEvent<T>> getQueue() {
                return queue;
            }
    
            @Override
            public void unsubscribe() {
                bus.unsubscribe(id);
            }
    
            @Override
            public int getPendingCount() {
                return queue.size();
            }
    
            void notifyShutdown() {
                // Inject a poison pill to unblock consumers waiting in take().
                // offer() never blocks: on a full queue it returns false and the pill is dropped.
                queue.offer(new BusEvent<>(null, null, null, null, null, null));
            }
        }
    }
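
    The TODO in getChannelStats() leaves the per-channel counters at 0. A minimal sketch of one way to fill them, assuming a hypothetical ChannelCounters helper (not part of the Socle API) that the bus would call from its publish and delivery paths. Which ChannelStats argument each counter maps to is also an assumption.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical helper: thread-safe published/delivered counters keyed by channel. */
final class ChannelCounters {

    private final ConcurrentHashMap<String, LongAdder> published = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, LongAdder> delivered = new ConcurrentHashMap<>();

    /** Call once per publish() on the channel. */
    void recordPublished(String channel) {
        published.computeIfAbsent(channel, c -> new LongAdder()).increment();
    }

    /** Call once per event accepted by a subscription queue. */
    void recordDelivered(String channel) {
        delivered.computeIfAbsent(channel, c -> new LongAdder()).increment();
    }

    long publishedCount(String channel) {
        LongAdder adder = published.get(channel);
        return adder == null ? 0 : adder.sum();
    }

    long deliveredCount(String channel) {
        LongAdder adder = delivered.get(channel);
        return adder == null ? 0 : adder.sum();
    }
}
```

    LongAdder is used rather than AtomicLong because several publisher threads may increment the same channel counter concurrently.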
    

    5.2 RedisEventBus

    Implementation for multi-instance deployments (production).

    package eu.lmvi.socle.event.impl;
    
    @Component
    @ConditionalOnProperty(name = "socle.eventbus.mode", havingValue = "redis")
    public class RedisEventBus implements EventBus {
    
        private final JedisPool jedisPool;
        private final ObjectMapper mapper;
        private final String prefix;
        private final ExecutorService listenerExecutor;
    
        // Local subscriptions (one local queue per subscription)
        private final Map<String, LocalSubscription<?>> localSubs = new ConcurrentHashMap<>();
    
        // One JedisPubSub listener per channel (shared by the local subscriptions)
        private final Map<String, RedisPubSubListener> redisListeners = new ConcurrentHashMap<>();
    
        // ... Similar implementation using Redis Pub/Sub ...
    
        @Override
        public void publish(String channel, Object payload) {
            try (Jedis jedis = jedisPool.getResource()) {
                BusEvent<?> event = BusEvent.of(channel, payload);
                String json = mapper.writeValueAsString(event);
                jedis.publish(prefix + channel, json);
            } catch (Exception e) {
                // Pass the exception as the last argument so the stack trace is logged
                log.error("Redis publish error on channel {}", channel, e);
            }
        }
    
        // Redis SUBSCRIBE runs in a dedicated thread
        // and dispatches incoming messages to the local queues
    }
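
    The class above only describes the receive side in comments. The dispatch step can be sketched without Jedis so that it runs standalone. LocalDispatcher and its method names are hypothetical. In a real implementation the JedisPubSub.onMessage(String channel, String message) callback would presumably call dispatch(), and the JSON would be deserialized into a BusEvent. Here the payload stays a raw string, and the prefix is assumed to end with a separator.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: routes messages received from Redis to local subscription queues. */
final class LocalDispatcher {

    private final String prefix;

    // Logical channel -> queues of the local subscriptions on that channel
    private final ConcurrentHashMap<String, List<BlockingQueue<String>>> queuesByChannel =
        new ConcurrentHashMap<>();

    LocalDispatcher(String prefix) {
        this.prefix = prefix;
    }

    /** Registers a bounded local queue for the given logical channel. */
    BlockingQueue<String> subscribe(String channel, int capacity) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        queuesByChannel.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(queue);
        return queue;
    }

    /** Strips the prefix and offers the message to every local queue; returns the number of queues that accepted it. */
    int dispatch(String redisChannel, String json) {
        if (!redisChannel.startsWith(prefix)) {
            return 0;
        }
        String channel = redisChannel.substring(prefix.length());
        int delivered = 0;
        for (BlockingQueue<String> queue : queuesByChannel.getOrDefault(channel, List.of())) {
            if (queue.offer(json)) {
                delivered++;
            }
        }
        return delivered;
    }
}
```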
    

    6. AbstractPushWorker

    6.1 Definition

    package eu.lmvi.socle.worker.push;
    
    /**
     * Worker woken up by the EventBus as soon as an event arrives.
     *
     * <p>Unlike {@link AbstractEventDrivenWorker} (DEPRECATED), which polls
     * with a timeout, this worker uses a push mechanism: it blocks on
     * {@link BlockingQueue#take()} and resumes as soon as an event is queued.</p>
     *
     * <h3>Benefits</h3>
     * <ul>
     *   <li>No polling delay (the thread wakes as soon as an event is queued)</li>
     *   <li>No CPU usage while idle</li>
     *   <li>Native integration with the EventBus</li>
     * </ul>
     *
     * <h3>Usage example</h3>
     * <pre>{@code
     * @Component
     * public class OrderWorker extends AbstractPushWorker<OrderEvent> {
     *
     *     public OrderWorker() {
     *         super("orders.created", "orders.updated");
     *     }
     *
     *     @Override
     *     public String getName() {
     *         return "order_worker";
     *     }
     *
     *     @Override
     *     protected Class<OrderEvent> getEventType() {
     *         return OrderEvent.class;
     *     }
     *
     *     @Override
     *     protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
     *         // Runs as soon as the event arrives
     *     }
     * }
     * }</pre>
     *
     * @param <T> Event payload type
     */
    public abstract class AbstractPushWorker<T> implements Worker {
    
        protected final Logger logger = LoggerFactory.getLogger(getClass());
    
        // ==================== Injection ====================
    
        @Autowired
        protected EventBus eventBus;
    
        // ==================== Configuration ====================
    
        private final String[] channels;
        private final int concurrency;
        private final int queueCapacity;
        private final Duration shutdownTimeout;
    
        // ==================== State ====================
    
        protected final AtomicBoolean running = new AtomicBoolean(false);
        protected ExecutorService executor;
        protected List<Subscription<T>> subscriptions = new ArrayList<>();
        protected BlockingQueue<BusEvent<T>> mergedQueue;
    
        // ==================== Metrics ====================
    
        protected final AtomicLong processedCount = new AtomicLong(0);
        protected final AtomicLong errorCount = new AtomicLong(0);
        protected final AtomicLong totalDurationNs = new AtomicLong(0);
        protected volatile Instant lastExecution;
        protected volatile Instant startedAt;
    
        // ==================== Constructors ====================
    
        /**
         * Creates a single-threaded push worker.
         *
         * @param channels Channels to listen to
         */
        protected AbstractPushWorker(String... channels) {
            this(1, channels);
        }
    
        /**
         * Creates a push worker with concurrency.
         *
         * @param concurrency Number of processing threads
         * @param channels    Channels to listen to
         */
        protected AbstractPushWorker(int concurrency, String... channels) {
            this(concurrency, 10_000, Duration.ofSeconds(30), channels);
        }
    
        /**
         * Creates a push worker with full configuration.
         *
         * @param concurrency     Number of processing threads
         * @param queueCapacity   Capacity of the internal queue
         * @param shutdownTimeout Graceful shutdown timeout
         * @param channels        Channels to listen to
         */
        protected AbstractPushWorker(int concurrency, int queueCapacity,
                                      Duration shutdownTimeout, String... channels) {
            this.concurrency = Math.max(1, concurrency);
            this.queueCapacity = queueCapacity;
            this.shutdownTimeout = shutdownTimeout;
            this.channels = channels;
    
            if (channels == null || channels.length == 0) {
                throw new IllegalArgumentException("At least one channel is required");
            }
        }
    
        // ==================== Worker Interface ====================
    
        @Override
        public final String getSchedule() {
            return "PUSH";
        }
    
        @Override
        public final long getCycleIntervalMs() {
            return -1;
        }
    
        @Override
        public final boolean isPassive() {
            return true;
        }
    
        @Override
        public int getStartPriority() {
            return 200;  // After the EventBus (100)
        }
    
        @Override
        public int getStopPriority() {
            return 200;  // Before the EventBus
        }
    
        @Override
        public void initialize() {
            logger.info("[{}] Initializing push worker", getName());
            logger.info("[{}] Channels: {}", getName(), Arrays.toString(channels));
            logger.info("[{}] Concurrency: {}", getName(), concurrency);
    
            // Merged queue shared by all channels
            mergedQueue = new LinkedBlockingQueue<>(queueCapacity);
    
            onInitialize();
        }
    
        @Override
        public void start() {
            if (running.getAndSet(true)) {
                logger.warn("[{}] Already running", getName());
                return;
            }
    
            startedAt = Instant.now();
    
            // Subscribe to the channels (a wildcard selects a pattern subscription)
            for (String channel : channels) {
                boolean isPattern = channel.contains("*") || channel.contains(">");
                Subscription<T> sub = isPattern
                    ? eventBus.subscribePattern(channel, getEventType())
                    : eventBus.subscribe(channel, getEventType());
                subscriptions.add(sub);
                startForwarder(sub);
            }
    
            // Start the processing workers
            executor = Executors.newThreadPerTaskExecutor(
                Thread.ofVirtual().name(getName() + "-push-", 0).factory()
            );
    
            for (int i = 0; i < concurrency; i++) {
                final int workerId = i;
                executor.submit(() -> pushLoop(workerId));
            }
    
            onStarted();
            logger.info("[{}] Started with {} workers on {} channels",
                getName(), concurrency, channels.length);
        }
    
        /**
         * Starts a thread that forwards events from a subscription to the merged queue.
         */
        private void startForwarder(Subscription<T> sub) {
            Thread.ofVirtual()
                .name(getName() + "-fwd-" + sub.getId())
                .start(() -> {
                    while (running.get()) {
                        try {
                            BusEvent<T> event = sub.getQueue().take();
    
                            // Stop on poison pill
                            if (event.id() == null) break;
    
                            // Forward to the merged queue; drop the event if it stays full for 1 s
                            if (!mergedQueue.offer(event, 1, TimeUnit.SECONDS)) {
                                logger.warn("[{}] Merged queue full, event dropped", getName());
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                });
        }
    
        @Override
        public void stop() {
            if (!running.getAndSet(false)) {
                logger.warn("[{}] Not running", getName());
                return;
            }
    
            logger.info("[{}] Stopping...", getName());
            onStopping();
    
            // Unsubscribe (injects poison pills into the subscription queues)
            for (Subscription<T> sub : subscriptions) {
                try {
                    sub.unsubscribe();
                } catch (Exception e) {
                    logger.warn("[{}] Error unsubscribing: {}", getName(), e.getMessage());
                }
            }
            subscriptions.clear();
    
            // Inject one poison pill per worker into the merged queue
            for (int i = 0; i < concurrency; i++) {
                mergedQueue.offer(new BusEvent<>(null, null, null, null, null, null));
            }
    
            // Wait for the workers to finish
            if (executor != null) {
                executor.shutdown();
                try {
                    if (!executor.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                        logger.warn("[{}] Forcing shutdown", getName());
                        executor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    executor.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }
    
            onStopped();
    
            long uptime = Duration.between(startedAt, Instant.now()).toSeconds();
            logger.info("[{}] Stopped after {}s. Processed={}, Errors={}",
                getName(), uptime, processedCount.get(), errorCount.get());
        }
    
        @Override
        public void doWork() {
            // Not used in PUSH mode
        }
    
        @Override
        public boolean isHealthy() {
            return running.get() && executor != null && !executor.isShutdown();
        }
    
        @Override
        public Map<String, Object> getStats() {
            long uptime = startedAt != null
                ? Duration.between(startedAt, Instant.now()).toMillis()
                : 0;
    
            double avgDurationMs = processedCount.get() > 0
                ? (totalDurationNs.get() / 1_000_000.0) / processedCount.get()
                : 0;
    
            double throughput = uptime > 0
                ? processedCount.get() * 1000.0 / uptime
                : 0;
    
            Map<String, Object> stats = new HashMap<>();
    
            // Standardized keys
            stats.put("state", running.get() ? "running" : "stopped");
            stats.put("schedule", "PUSH");
            stats.put("execution_count", processedCount.get());
            stats.put("errors_count", errorCount.get());
            stats.put("last_execution", lastExecution != null ? lastExecution.toString() : null);
    
            // Worker-specific keys
            stats.put("channels", List.of(channels));
            stats.put("concurrency", concurrency);
            stats.put("queue_size", mergedQueue != null ? mergedQueue.size() : 0);
            stats.put("queue_capacity", queueCapacity);
            stats.put("uptime_ms", uptime);
            stats.put("avg_duration_ms", Math.round(avgDurationMs * 100) / 100.0);
            stats.put("throughput_per_sec", Math.round(throughput * 100) / 100.0);
    
            // Subscription stats
            stats.put("subscriptions", subscriptions.stream()
                .map(s -> Map.of(
                    "id", s.getId(),
                    "channel", s.getChannelOrPattern(),
                    "pending", s.getPendingCount()
                ))
                .toList());
    
            return stats;
        }
    
        // ==================== Push Loop ====================
    
        private void pushLoop(int workerId) {
            logger.debug("[{}] Push loop #{} started", getName(), workerId);
    
            while (running.get()) {
                try {
                    // BLOCKS until an event is received (no polling delay)
                    BusEvent<T> event = mergedQueue.take();
    
                    // Stop on poison pill
                    if (event.id() == null) {
                        logger.debug("[{}] Push loop #{} received shutdown signal", getName(), workerId);
                        break;
                    }
    
                    // Process the event
                    processEventSafe(event, workerId);
    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.debug("[{}] Push loop #{} interrupted", getName(), workerId);
                    break;
                } catch (Exception e) {
                    logger.error("[{}] Unexpected error in push loop #{}: {}",
                        getName(), workerId, e.getMessage(), e);
                }
            }
    
            logger.debug("[{}] Push loop #{} stopped", getName(), workerId);
        }
    
        private void processEventSafe(BusEvent<T> event, int workerId) {
            long startNs = System.nanoTime();
    
            try {
                processEvent(event.payload(), event);
                processedCount.incrementAndGet();
            } catch (Exception e) {
                errorCount.incrementAndGet();
                handleError(event, e, workerId);
            } finally {
                long durationNs = System.nanoTime() - startNs;
                totalDurationNs.addAndGet(durationNs);
                lastExecution = Instant.now();
            }
        }
    
        // ==================== Abstract methods ====================
    
        /**
         * Event payload type.
         * Used to type the subscription.
         */
        protected abstract Class<T> getEventType();
    
        /**
         * Processes an event.
         *
         * @param payload Event content
         * @param event   Full event (with metadata)
         */
        protected abstract void processEvent(T payload, BusEvent<T> event);
    
        /**
         * Simplified variant without access to the envelope.
         * The push loop only calls processEvent(T, BusEvent); this overload runs
         * only when a subclass such as SimplePushWorker delegates to it.
         */
        protected void processEvent(T payload) {
            // No-op by default
        }
    
        // ==================== Hooks ====================
    
        /**
         * Initialization hook.
         */
        protected void onInitialize() {}
    
        /**
         * Post-start hook.
         */
        protected void onStarted() {}
    
        /**
         * Pre-stop hook.
         */
        protected void onStopping() {}
    
        /**
         * Post-stop hook.
         */
        protected void onStopped() {}
    
        /**
         * Error handling.
         * Logs the error by default. Override to add a DLQ, retries, etc.
         */
        protected void handleError(BusEvent<T> event, Exception e, int workerId) {
            logger.error("[{}] Worker #{} error processing event {}: {}",
                getName(), workerId, event.id(), e.getMessage(), e);
        }
    
        // ==================== Utilities ====================
    
        /**
         * Publishes an event on the EventBus.
         * Convenience shortcut for workers that also produce events.
         */
        protected void publish(String channel, Object payload) {
            eventBus.publish(channel, payload);
        }
    
        /**
         * Returns the number of pending events.
         */
        public long getBacklog() {
            int pending = mergedQueue != null ? mergedQueue.size() : 0;
            for (Subscription<T> sub : subscriptions) {
                pending += sub.getPendingCount();
            }
            return pending;
        }
    }
    

    6.2 Simplified version (single channel)

    /**
     * Simplified version for a single channel.
     */
    public abstract class SimplePushWorker<T> extends AbstractPushWorker<T> {
    
        protected SimplePushWorker(String channel) {
            super(channel);
        }
    
        protected SimplePushWorker(int concurrency, String channel) {
            super(concurrency, channel);
        }
    
        @Override
        protected void processEvent(T payload, BusEvent<T> event) {
            processEvent(payload);
        }
    
        /**
         * Processes the payload directly.
         */
        protected abstract void processEvent(T payload);
    }
    

    7. Configuration

    7.1 application.yml

    socle:
      eventbus:
        # Enable the EventBus
        enabled: ${EVENTBUS_ENABLED:true}
    
        # Mode: in_memory | redis
        mode: ${EVENTBUS_MODE:in_memory}
    
        # Common settings
        common:
          # Default capacity of the subscription queues
          default_queue_capacity: ${EVENTBUS_QUEUE_CAPACITY:10000}
    
          # Timeout for offer() when the queue is full
          offer_timeout_ms: ${EVENTBUS_OFFER_TIMEOUT:100}
    
        # InMemory settings
        in_memory:
          # Nothing specific
    
        # Redis settings
        redis:
          host: ${REDIS_HOST:localhost}
          port: ${REDIS_PORT:6379}
          password: ${REDIS_PASSWORD:}
          database: ${REDIS_DATABASE:0}
          prefix: ${EVENTBUS_REDIS_PREFIX:socle:eventbus}
    
          # Connection pool
          pool:
            max_total: ${EVENTBUS_REDIS_POOL_MAX:16}
            max_idle: ${EVENTBUS_REDIS_POOL_MAX_IDLE:8}
            min_idle: ${EVENTBUS_REDIS_POOL_MIN_IDLE:2}
    
        # Metrics
        metrics:
          enabled: ${EVENTBUS_METRICS_ENABLED:true}
          prefix: socle_eventbus
    

    7.2 Environment variables

    Variable                | Description              | Default
    EVENTBUS_ENABLED        | Enable the EventBus      | true
    EVENTBUS_MODE           | Mode (in_memory / redis) | in_memory
    EVENTBUS_QUEUE_CAPACITY | Capacity of the queues   | 10000
    REDIS_HOST              | Redis host               | localhost
    REDIS_PORT              | Redis port               | 6379

    8. Usage examples

    8.1 Simple worker

    @Component
    public class OrderNotificationWorker extends SimplePushWorker<OrderEvent> {
    
        @Autowired
        private NotificationService notificationService;
    
        public OrderNotificationWorker() {
            super(2, "orders.created");  // 2 concurrent workers
        }
    
        @Override
        public String getName() {
            return "order_notification_worker";
        }
    
        @Override
        protected Class<OrderEvent> getEventType() {
            return OrderEvent.class;
        }
    
        @Override
        protected void processEvent(OrderEvent event) {
            notificationService.notifyOrderCreated(event.orderId(), event.customerEmail());
        }
    }
    

    8.2 Multi-channel worker

    @Component
    public class AuditWorker extends AbstractPushWorker<Object> {
    
        @Autowired
        private AuditRepository auditRepository;
    
        public AuditWorker() {
            super(4, "orders.*", "payments.*", "users.*");  // Pattern matching
        }
    
        @Override
        public String getName() {
            return "audit_worker";
        }
    
        @Override
        protected Class<Object> getEventType() {
            return Object.class;  // Accepts any payload type
        }
    
        @Override
        protected void processEvent(Object payload, BusEvent<Object> event) {
            // The envelope gives access to the metadata
            AuditEntry entry = new AuditEntry(
                event.id(),
                event.channel(),
                event.timestamp(),
                event.source(),
                payload
            );
            auditRepository.save(entry);
        }
    }
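
    The patterns above ("orders.*") are resolved by the EventBus, whose matching rules are not shown in this document. A minimal sketch of one possible conversion to a regular expression, assuming NATS-style semantics: "*" matches exactly one dot-separated token and ">" matches one or more trailing tokens. ChannelPatterns is a hypothetical helper, not the Socle implementation.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: converts a channel pattern into a java.util.regex.Pattern. */
final class ChannelPatterns {

    private ChannelPatterns() {}

    /** Assumed semantics: '*' = exactly one token, '>' = one or more tokens. */
    static Pattern compile(String channelPattern) {
        String[] tokens = channelPattern.split("\\.", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < tokens.length; i++) {
            if (i > 0) {
                regex.append("\\.");              // literal dot between tokens
            }
            if (tokens[i].equals("*")) {
                regex.append("[^.]+");            // one token, no dot inside
            } else if (tokens[i].equals(">")) {
                regex.append(".+");               // everything that remains
            } else {
                regex.append(Pattern.quote(tokens[i]));
            }
        }
        return Pattern.compile(regex.toString());
    }

    static boolean matches(String channelPattern, String channel) {
        return compile(channelPattern).matcher(channel).matches();
    }
}
```

    In a real bus the compiled Pattern would be built once per subscription rather than on every publish.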
    

    8.3 Publishing events

    @Service
    public class OrderService {
    
        @Autowired
        private EventBus eventBus;
    
        @Autowired
        private OrderRepository orderRepository;
    
        public Order createOrder(CreateOrderRequest request) {
            // Business logic
            Order order = orderRepository.save(new Order(request));
    
            // Publish on the EventBus
            // All workers subscribed to "orders.created" are woken up immediately
            eventBus.publish("orders.created", new OrderEvent(
                order.getId(),
                order.getCustomerEmail(),
                order.getTotal()
            ));
    
            return order;
        }
    
        public void updateOrderStatus(String orderId, OrderStatus newStatus) {
            Order order = orderRepository.updateStatus(orderId, newStatus);
    
            eventBus.publish("orders.updated", new OrderStatusEvent(
                orderId,
                newStatus
            ));
        }
    }
    

    8.4 Worker with custom error handling

    @Component
    public class PaymentWorker extends AbstractPushWorker<PaymentEvent> {
    
        @Autowired
        private PaymentService paymentService;
    
        @Autowired
        private DeadLetterQueue dlq;
    
        public PaymentWorker() {
            super(2, "payments.process");
        }
    
        @Override
        public String getName() {
            return "payment_worker";
        }
    
        @Override
        protected Class<PaymentEvent> getEventType() {
            return PaymentEvent.class;
        }
    
        @Override
        protected void processEvent(PaymentEvent payload, BusEvent<PaymentEvent> event) {
            paymentService.processPayment(payload);
        }
    
        @Override
        protected void handleError(BusEvent<PaymentEvent> event, Exception e, int workerId) {
            logger.error("[{}] Payment processing failed for {}: {}",
                getName(), event.payload().paymentId(), e.getMessage());
    
            // Send to the DLQ for a later retry
            dlq.send("payments.dlq", event, e);
        }
    }
    

    9. Solutions to open questions

    9.1 Clean shutdown of workers blocked in take()

    Problem: BlockingQueue.take() blocks indefinitely. How can the workers be stopped cleanly?

    Solution: the Poison Pill pattern

    // On shutdown, inject a recognizable "poison" event
    @Override
    public void stop() {
        running.set(false);
    
        // Inject N poison pills (one per worker)
        for (int i = 0; i < concurrency; i++) {
            queue.offer(new BusEvent<>(null, null, null, null, null, null));
        }
    }
    
    // In the loop, check for the poison pill
    private void pushLoop(int workerId) {
        while (running.get()) {
            try {
                BusEvent<T> event = queue.take();
    
                // Poison pill detected (id == null): exit cleanly
                if (event.id() == null) {
                    break;
                }
    
                processEvent(event.payload(), event);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // Preserve the interrupt flag
                break;
            }
        }
    }
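
    The pattern above can be tried in isolation. The sketch below is a standalone demonstration under simplified assumptions: plain Object events instead of BusEvent, a dedicated sentinel object instead of a null id, and platform threads. PoisonPillDemo is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Standalone demonstration of the poison pill pattern with N consumers. */
final class PoisonPillDemo {

    // Sentinel compared by identity; real events are never this instance
    private static final Object POISON = new Object();

    /** Starts the workers, feeds the events, then stops everyone; returns the processed count. */
    static int run(int workers, int events) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        Thread[] threads = new Thread[workers];

        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    while (true) {
                        Object event = queue.take();   // blocks until something arrives
                        if (event == POISON) {
                            break;                     // shutdown signal
                        }
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < events; i++) {
            queue.add("event-" + i);
        }
        // One pill per worker: each worker consumes exactly one and exits
        for (int i = 0; i < workers; i++) {
            queue.add(POISON);
        }

        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }
}
```

    Because the queue is FIFO and the pills are added last, every real event is dequeued before the first pill, so no event is lost at shutdown.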
    

    9.2 Backpressure (bounded vs unbounded queue)

    Problem: What happens when events arrive faster than they are processed?

    Solution: a bounded queue with an overflow policy

    // Bounded queue (recommended)
    BlockingQueue<BusEvent<T>> queue = new LinkedBlockingQueue<>(10_000);
    
    // On publish, handle overflow
    private boolean deliver(Subscription<T> sub, BusEvent<T> event) throws InterruptedException {
        // offer() with a timeout: never blocks indefinitely
        boolean offered = sub.getQueue().offer(event, 100, TimeUnit.MILLISECONDS);
    
        if (!offered) {
            // Overflow policy (the options can be combined):
            // Option 1: drop (log + metric)
            log.warn("Queue full, event dropped");
            droppedCount.incrementAndGet();
    
            // Option 2: send to a DLQ
            dlq.send("overflow", event);
    
            // Option 3: raise an alert
            alertService.queueOverflow(sub.getId());
        }
    
        return offered;
    }
    

    Metrics to monitor:

    • socle_eventbus_queue_size: current size
    • socle_eventbus_queue_capacity: maximum capacity
    • socle_eventbus_dropped_total: dropped events
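
    The drop policy and the values behind these metrics can be sketched in a few lines. BoundedDelivery is a hypothetical, standalone class (String events, no DLQ, no alerting); it only shows the offer-with-timeout and drop-counter mechanics.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: bounded queue with a "drop and count" overflow policy. */
final class BoundedDelivery {

    private final BlockingQueue<String> queue;
    private final long offerTimeoutMs;
    private final AtomicLong dropped = new AtomicLong();

    BoundedDelivery(int capacity, long offerTimeoutMs) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.offerTimeoutMs = offerTimeoutMs;
    }

    /** Returns true if the event was queued, false if it was dropped. */
    boolean deliver(String event) {
        try {
            if (queue.offer(event, offerTimeoutMs, TimeUnit.MILLISECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // treat an interrupt as a failed delivery
        }
        dropped.incrementAndGet();
        return false;
    }

    int queueSize() {
        return queue.size();        // would back the queue_size gauge
    }

    long droppedTotal() {
        return dropped.get();       // would back the dropped_total counter
    }
}
```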

    9.3 Multiple channels with a single queue

    Problem: A worker listens to several channels. How are they merged?

    Solution: a merged queue fed by forwarders

    // Merged queue (bounded, see 9.2)
    BlockingQueue<BusEvent<T>> mergedQueue = new LinkedBlockingQueue<>(10_000);
    
    // One forwarder per subscription
    for (String channel : channels) {
        Subscription<T> sub = eventBus.subscribe(channel, eventType);
    
        // Virtual thread that forwards events to the merged queue
        Thread.ofVirtual().start(() -> {
            try {
                while (running.get()) {
                    BusEvent<T> event = sub.getQueue().take();
                    // Same policy as startForwarder() in 6.1: wait up to 1 s, then drop
                    mergedQueue.offer(event, 1, TimeUnit.SECONDS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
    
    // The workers consume the merged queue
    void pushLoop() throws InterruptedException {
        while (running.get()) {
            BusEvent<T> event = mergedQueue.take();
            // event.channel() identifies the source channel
            processEvent(event.payload(), event);
        }
    }
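
    A standalone sketch of the forwarder idea, under simplified assumptions: String events, platform threads instead of virtual threads so that it also runs on older JDKs, and a sentinel string to stop each forwarder. MergedQueueDemo is a hypothetical name. It shows that events from one source keep their relative order in the merged queue, while the interleaving between sources is not defined.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Standalone demonstration: two source queues forwarded into one merged queue. */
final class MergedQueueDemo {

    // Sentinel compared by identity, so no real event can collide with it
    private static final String POISON = new String("poison");

    /** Starts a thread that moves events from source to merged until the sentinel arrives. */
    static Thread startForwarder(BlockingQueue<String> source, BlockingQueue<String> merged) {
        Thread thread = new Thread(() -> {
            try {
                while (true) {
                    String event = source.take();
                    if (event == POISON) {
                        break;
                    }
                    merged.put(event);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        thread.start();
        return thread;
    }

    /** Publishes three events on two sources and returns the content of the merged queue. */
    static List<String> run() {
        BlockingQueue<String> orders = new LinkedBlockingQueue<>();
        BlockingQueue<String> payments = new LinkedBlockingQueue<>();
        BlockingQueue<String> merged = new LinkedBlockingQueue<>(100);

        Thread ordersForwarder = startForwarder(orders, merged);
        Thread paymentsForwarder = startForwarder(payments, merged);

        orders.add("orders:1");
        orders.add("orders:2");
        payments.add("payments:1");
        orders.add(POISON);
        payments.add(POISON);

        try {
            ordersForwarder.join();
            paymentsForwarder.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        List<String> result = new ArrayList<>();
        merged.drainTo(result);
        return result;
    }
}
```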
    

    9.4 Event ordering

    Problem: Must events be processed in order?

    Solution: It depends on the use case

    Mode        | Guarantee                | Implementation
    None        | No ordering guarantee    | Concurrency > 1
    Per channel | Ordered within a channel | Concurrency = 1 per channel
    Per key     | Ordered per business key | Partitioning on the key

    // To guarantee ordering per key (e.g. per orderId)
    public class OrderedPushWorker extends AbstractPushWorker<OrderEvent> {
    
        // One queue per partition (hash of the key)
        private final BlockingQueue<BusEvent<OrderEvent>>[] partitionQueues;
        private final int partitions = 8;
    
        @Override
        protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
            // Events with the same orderId always hash to the same partition.
            // floorMod keeps the index non-negative even when hashCode() is Integer.MIN_VALUE,
            // a case where Math.abs() would return a negative value.
            int partition = Math.floorMod(payload.orderId().hashCode(), partitions);
            partitionQueues[partition].offer(event);
        }
    
        // One worker per partition = ordering guaranteed within each partition
    }
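
    The "one worker per partition" part is left as a comment above. One way to sketch it, assuming a single-thread executor per partition rather than explicit queues: PartitionedProcessor and its methods are hypothetical names, and the tasks are plain Runnables instead of BusEvent handlers.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: per-key ordering through one single-thread executor per partition. */
final class PartitionedProcessor {

    private final ExecutorService[] partitions;

    PartitionedProcessor(int partitionCount) {
        partitions = new ExecutorService[partitionCount];
        for (int i = 0; i < partitionCount; i++) {
            partitions[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Same key, same partition; floorMod never returns a negative index. */
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    /** Tasks submitted for the same key run sequentially, in submission order. */
    void submit(String key, Runnable task) {
        partitions[partitionFor(key, partitions.length)].execute(task);
    }

    /** Stops accepting tasks and waits for the queued ones; returns false on timeout or interrupt. */
    boolean shutdownAndWait(long timeoutMs) {
        for (ExecutorService partition : partitions) {
            partition.shutdown();
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            for (ExecutorService partition : partitions) {
                long remaining = Math.max(0, deadline - System.nanoTime());
                if (!partition.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

    Different keys may share a partition, so throughput depends on how evenly the keys hash; ordering is only guaranteed among tasks of the same partition.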
    

    9.5 Retry and DLQ

    Problem: What happens when processing fails?

    Solution: retry with backoff, then DLQ

    @Component
    public class ResilientPushWorker extends AbstractPushWorker<MyEvent> {
    
        private static final int MAX_RETRIES = 3;
    
        // eventBus is inherited from AbstractPushWorker, so it is not re-declared here.
        // Single-thread scheduler used to re-publish after the backoff delay.
        private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    
        // Constructor, getName() and getEventType() omitted for brevity
    
        @Override
        protected void processEvent(MyEvent payload, BusEvent<MyEvent> event) {
            int retryCount = getRetryCount(event);
    
            try {
                doProcess(payload);
            } catch (RetryableException e) {
                if (retryCount < MAX_RETRIES) {
                    // Re-publish with an incremented retry count
                    scheduleRetry(event, retryCount + 1);
                } else {
                    // Max retries reached -> DLQ
                    sendToDlq(event, e);
                }
            }
        }
    
        private void scheduleRetry(BusEvent<MyEvent> event, int retryCount) {
            // Exponential backoff: 2 s, 4 s, 8 s for retries 1, 2, 3
            long delayMs = (1L << retryCount) * 1000;
    
            // Re-publish after the delay
            scheduler.schedule(() -> {
                Map<String, String> metadata = new HashMap<>(event.metadata());
                metadata.put("retry_count", String.valueOf(retryCount));
                eventBus.publish(event.channel(), event.payload(), metadata);
            }, delayMs, TimeUnit.MILLISECONDS);
        }
    
        private int getRetryCount(BusEvent<MyEvent> event) {
            return Integer.parseInt(event.metadata().getOrDefault("retry_count", "0"));
        }
    
        private void sendToDlq(BusEvent<MyEvent> event, Exception e) {
            eventBus.publish("dlq." + event.channel(), new DlqEntry(event, e));
        }
    }
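
    The backoff computation can be isolated and tested on its own. The sketch below reproduces the 2^retryCount × base formula used above and adds an upper bound, which is an assumption and not part of the original worker. Backoff and delayMs are hypothetical names.

```java
/** Hypothetical helper: exponential backoff with an upper bound. */
final class Backoff {

    private Backoff() {}

    /**
     * Returns baseMs * 2^retryCount, capped at maxMs.
     * The exponent is clamped to 30 to keep the shift well inside the long range
     * for base delays of a few seconds.
     */
    static long delayMs(int retryCount, long baseMs, long maxMs) {
        if (retryCount < 0) {
            throw new IllegalArgumentException("retryCount must be >= 0");
        }
        int exponent = Math.min(retryCount, 30);
        long delay = baseMs << exponent;
        return Math.min(delay, maxMs);
    }
}
```

    With a 1000 ms base and a 60000 ms cap, retries 1, 2 and 3 wait 2000, 4000 and 8000 ms, and any retry from the sixth onward waits 60000 ms.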
    

    10. Migrating existing workers

    10.1 Before (AbstractEventDrivenWorker)

    // DEPRECATED
    @Component
    public class OldOrderWorker extends AbstractEventDrivenWorker<OrderEvent> {
    
        private final BlockingQueue<OrderEvent> queue = new LinkedBlockingQueue<>();
    
        public OldOrderWorker() {
            super(4);
        }
    
        @Override
        public String getName() {
            return "order_worker";
        }
    
        @Override
        protected OrderEvent pollEvent() throws InterruptedException {
            return queue.poll(100, TimeUnit.MILLISECONDS);  // Polling!
        }
    
        @Override
        protected void processEvent(OrderEvent event) {
            // Processing
        }
    
        // External method used to receive events
        public void onOrderReceived(OrderEvent event) {
            queue.offer(event);
        }
    }
    

    10.2 After (AbstractPushWorker)

    // RECOMMENDED
    @Component
    public class NewOrderWorker extends AbstractPushWorker<OrderEvent> {
    
        public NewOrderWorker() {
            super(4, "orders.created");  // Specifies the channel
        }
    
        @Override
        public String getName() {
            return "order_worker";
        }
    
        @Override
        protected Class<OrderEvent> getEventType() {
            return OrderEvent.class;
        }
    
        @Override
        protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
            // Same processing
        }
    
        // No external method needed anymore: the EventBus handles delivery
    }
    

    10.3 Migration steps

    1. Add the EventBus dependency to the worker
    2. Change the parent class: AbstractEventDrivenWorker → AbstractPushWorker
    3. Specify the channels in the constructor
    4. Implement getEventType()
    5. Adapt processEvent() to accept a BusEvent
    6. Remove the internal queue and pollEvent()
    7. Update the producers to call eventBus.publish()
    8. Test the behavior
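
    Step 7 is the only one that touches code outside the worker. Below is a minimal, self-contained sketch of what it could look like, assuming the producer previously called the worker's onOrderReceived() directly. Publisher is a hypothetical stand-in for the publish side of EventBus, and OrderProducer and the OrderEvent field are illustrative names, not part of the Socle.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    public class ProducerMigrationSketch {

        /** Hypothetical stand-in for EventBus.publish(channel, payload). */
        interface Publisher {
            void publish(String channel, Object payload);
        }

        /** Hypothetical event type, for illustration only. */
        record OrderEvent(String orderId) {}

        /** After migration the producer only knows the channel, not the worker. */
        static class OrderProducer {
            private final Publisher eventBus;

            OrderProducer(Publisher eventBus) {
                this.eventBus = eventBus;
            }

            void orderCreated(String orderId) {
                // Before: oldOrderWorker.onOrderReceived(new OrderEvent(orderId));
                eventBus.publish("orders.created", new OrderEvent(orderId));
            }
        }

        /** Publishes one order through a recording publisher and returns what was published. */
        static List<String> demo() {
            List<String> published = new ArrayList<>();
            Publisher recordingBus = (channel, payload) -> published.add(channel + " <- " + payload);
            new OrderProducer(recordingBus).orderCreated("A-42");
            return published;
        }

        public static void main(String[] args) {
            System.out.println(demo()); // [orders.created <- OrderEvent[orderId=A-42]]
        }
    }
    ```

    Because the producer depends only on a channel name, several workers can subscribe to the same events without the producer changing.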

    11. Prometheus metrics

    # EventBus
    socle_eventbus_published_total{channel="orders.created"}
    socle_eventbus_delivered_total{channel="orders.created"}
    socle_eventbus_failed_total{channel="orders.created"}
    socle_eventbus_subscriptions_active{channel="orders.created"}
    socle_eventbus_queue_size{subscription="sub-123"}
    
    # Push Workers
    socle_push_worker_processed_total{worker="order_worker"}
    socle_push_worker_errors_total{worker="order_worker"}
    socle_push_worker_queue_size{worker="order_worker"}
    socle_push_worker_duration_seconds{worker="order_worker", quantile="0.99"}
    

    12. Admin REST API

    GET  /admin/eventbus/stats              → Global statistics
    GET  /admin/eventbus/channels           → List of active channels
    GET  /admin/eventbus/subscriptions      → List of subscriptions
    
    POST /admin/eventbus/publish            → Publish an event (debug)
         Body: {"channel": "test", "payload": {...}}
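
    The publish endpoint can be called from any HTTP client. Below is a sketch using the JDK's java.net.http client. The base URL http://localhost:8080 and the absence of an authentication header are assumptions (adapt them if admin authentication is enabled), and the class and method names are illustrative.

    ```java
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class AdminPublishExample {

        /** Builds the debug publish request; payloadJson must already be valid JSON. */
        static HttpRequest buildPublishRequest(String baseUrl, String channel, String payloadJson) {
            String body = "{\"channel\": \"" + channel + "\", \"payload\": " + payloadJson + "}";
            return HttpRequest.newBuilder(URI.create(baseUrl + "/admin/eventbus/publish"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        }

        public static void main(String[] args) throws Exception {
            String baseUrl = args.length > 0 ? args[0] : "http://localhost:8080"; // assumed address
            HttpRequest request = buildPublishRequest(baseUrl, "test", "{\"id\": 1}");
            System.out.println(request.method() + " " + request.uri());

            if (args.length > 0) { // only hit the network when a target is given explicitly
                HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
                System.out.println(response.statusCode() + " " + response.body());
            }
        }
    }
    ```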
    

    13. See also

    Socle V004 – EventBus et Workers Push

  • Socle V004 – Kubernetes

    Socle V004 – Kubernetes

    16 – Kubernetes

    Version: 4.0.0 Date: 2025-12-09

    1. Introduction

    Guide to deploying Socle V4 on Kubernetes.

    2. Docker image

    2.1 Dockerfile

    FROM eclipse-temurin:21-jre-alpine
    
    LABEL maintainer="your-team@company.com"
    LABEL version="4.0.0"
    
    WORKDIR /app
    
    # Non-root user (UID/GID 1000, matching runAsUser/fsGroup in the Deployment)
    RUN addgroup -S -g 1000 socle && adduser -S -u 1000 -G socle socle
    USER socle
    
    # Copy application
    COPY --chown=socle:socle target/socle-v004-4.0.0.jar app.jar
    
    # Health check
    HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
      CMD wget -qO- http://localhost:8080/admin/health/live || exit 1
    
    # Default environment
    ENV JAVA_OPTS="-XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0"
    
    EXPOSE 8080
    
    ENTRYPOINT ["sh", "-c", "exec java $JAVA_OPTS -jar app.jar"]
    

    2.2 Build and push

    # Build
    docker build -t gcr.io/my-project/socle-v4:4.0.0 .
    
    # Push
    docker push gcr.io/my-project/socle-v4:4.0.0
    

    3. Kubernetes manifests

    3.1 Namespace

    apiVersion: v1
    kind: Namespace
    metadata:
      name: socle
      labels:
        name: socle
    

    3.2 ConfigMap

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: socle-config
      namespace: socle
    data:
      APP_NAME: "socle-v4"
      ENV_NAME: "PROD"
      REGION: "europe-west1"
      HTTP_PORT: "8080"
      KVBUS_MODE: "redis"
      REDIS_HOST: "redis-master.redis.svc.cluster.local"
      TECHDB_ENABLED: "true"
      LOG_FORWARDER_ENABLED: "true"
      LOG_TRANSPORT_MODE: "http"
      SCHEDULER_ENABLED: "true"
      ADMIN_ENABLED: "true"
      ADMIN_AUTH_ENABLED: "true"
    

    3.3 Secret

    apiVersion: v1
    kind: Secret
    metadata:
      name: socle-secrets
      namespace: socle
    type: Opaque
    stringData:
      REDIS_PASSWORD: "your-redis-password"
      ADMIN_PASSWORD: "your-admin-password"
      API_KEY: "your-api-key"
      TECHDB_PASSWORD: "your-techdb-password"
    

    3.4 Deployment

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: socle-v4
      namespace: socle
      labels:
        app: socle-v4
        version: "4.0.0"
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: socle-v4
      strategy:
        type: RollingUpdate
        rollingUpdate:
          maxSurge: 1
          maxUnavailable: 0
      template:
        metadata:
          labels:
            app: socle-v4
            version: "4.0.0"
          annotations:
            prometheus.io/scrape: "true"
            prometheus.io/path: "/actuator/prometheus"
            prometheus.io/port: "8080"
        spec:
          serviceAccountName: socle-sa
          securityContext:
            runAsNonRoot: true
            runAsUser: 1000
            fsGroup: 1000
          containers:
            - name: socle
              image: gcr.io/my-project/socle-v4:4.0.0
              imagePullPolicy: Always
              ports:
                - name: http
                  containerPort: 8080
                  protocol: TCP
              envFrom:
                - configMapRef:
                    name: socle-config
                - secretRef:
                    name: socle-secrets
              env:
                - name: POD_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.name
                - name: POD_NAMESPACE
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.namespace
                - name: EXEC_ID
                  value: "$(POD_NAME)"
              resources:
                requests:
                  cpu: "250m"
                  memory: "512Mi"
                limits:
                  cpu: "1000m"
                  memory: "1Gi"
              livenessProbe:
                httpGet:
                  path: /admin/health/live
                  port: 8080
                initialDelaySeconds: 30
                periodSeconds: 10
                timeoutSeconds: 5
                failureThreshold: 3
              readinessProbe:
                httpGet:
                  path: /admin/health/ready
                  port: 8080
                initialDelaySeconds: 10
                periodSeconds: 5
                timeoutSeconds: 3
                failureThreshold: 3
              volumeMounts:
                - name: data
                  mountPath: /app/data
                - name: logs
                  mountPath: /app/logs
          volumes:
            - name: data
              emptyDir: {}
            - name: logs
              emptyDir: {}
          affinity:
            podAntiAffinity:
              preferredDuringSchedulingIgnoredDuringExecution:
                - weight: 100
                  podAffinityTerm:
                    labelSelector:
                      matchLabels:
                        app: socle-v4
                    topologyKey: kubernetes.io/hostname
    

    3.5 Service

    apiVersion: v1
    kind: Service
    metadata:
      name: socle-v4
      namespace: socle
      labels:
        app: socle-v4
    spec:
      type: ClusterIP
      ports:
        - name: http
          port: 80
          targetPort: 8080
          protocol: TCP
      selector:
        app: socle-v4
    

    3.6 Ingress

    apiVersion: networking.k8s.io/v1
    kind: Ingress
    metadata:
      name: socle-v4
      namespace: socle
      annotations:
        nginx.ingress.kubernetes.io/ssl-redirect: "true"
        cert-manager.io/cluster-issuer: letsencrypt-prod
    spec:
      ingressClassName: nginx
      tls:
        - hosts:
            - socle.example.com
          secretName: socle-tls
      rules:
        - host: socle.example.com
          http:
            paths:
              - path: /
                pathType: Prefix
                backend:
                  service:
                    name: socle-v4
                    port:
                      number: 80
    

    3.7 HorizontalPodAutoscaler

    apiVersion: autoscaling/v2
    kind: HorizontalPodAutoscaler
    metadata:
      name: socle-v4
      namespace: socle
    spec:
      scaleTargetRef:
        apiVersion: apps/v1
        kind: Deployment
        name: socle-v4
      minReplicas: 2
      maxReplicas: 10
      metrics:
        - type: Resource
          resource:
            name: cpu
            target:
              type: Utilization
              averageUtilization: 70
        - type: Resource
          resource:
            name: memory
            target:
              type: Utilization
              averageUtilization: 80
    

    3.8 PodDisruptionBudget

    apiVersion: policy/v1
    kind: PodDisruptionBudget
    metadata:
      name: socle-v4
      namespace: socle
    spec:
      minAvailable: 1
      selector:
        matchLabels:
          app: socle-v4
    

    3.9 ServiceAccount

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: socle-sa
      namespace: socle
    

    4. Persistence with a PVC

    4.1 PersistentVolumeClaim

    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: socle-data
      namespace: socle
    spec:
      accessModes:
        - ReadWriteOnce
      storageClassName: standard
      resources:
        requests:
          storage: 10Gi
    

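    4.2 Deployment with a PVC

    # In the Deployment
    # Note: a ReadWriteOnce volume can be mounted from a single node only,
    # so replicas scheduled on other nodes cannot start with this claim.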
    spec:
      template:
        spec:
          containers:
            - name: socle
              volumeMounts:
                - name: data
                  mountPath: /app/data
          volumes:
            - name: data
              persistentVolumeClaim:
                claimName: socle-data
    

    5. Network Policies

    apiVersion: networking.k8s.io/v1
    kind: NetworkPolicy
    metadata:
      name: socle-network-policy
      namespace: socle
    spec:
      podSelector:
        matchLabels:
          app: socle-v4
      policyTypes:
        - Ingress
        - Egress
      ingress:
        # Allow from ingress controller
        - from:
            - namespaceSelector:
                matchLabels:
                  name: ingress-nginx
          ports:
            - port: 8080
        # Allow from Prometheus
        - from:
            - namespaceSelector:
                matchLabels:
                  name: monitoring
          ports:
            - port: 8080
      egress:
        # Allow to Redis
        - to:
            - namespaceSelector:
                matchLabels:
                  name: redis
          ports:
            - port: 6379
        # Allow to DNS (UDP and TCP)
        - to:
            - namespaceSelector: {}
              podSelector:
                matchLabels:
                  k8s-app: kube-dns
          ports:
            - port: 53
              protocol: UDP
            - port: 53
              protocol: TCP
    

    6. Helm Chart

    6.1 Chart.yaml

    apiVersion: v2
    name: socle-v4
    description: Socle V4 Framework
    version: 4.0.0
    appVersion: "4.0.0"
    

    6.2 values.yaml

    replicaCount: 2
    
    image:
      repository: gcr.io/my-project/socle-v4
      tag: "4.0.0"
      pullPolicy: Always
    
    service:
      type: ClusterIP
      port: 80
    
    ingress:
      enabled: true
      className: nginx
      hosts:
        - host: socle.example.com
          paths:
            - path: /
              pathType: Prefix
      tls:
        - secretName: socle-tls
          hosts:
            - socle.example.com
    
    resources:
      requests:
        cpu: 250m
        memory: 512Mi
      limits:
        cpu: 1000m
        memory: 1Gi
    
    autoscaling:
      enabled: true
      minReplicas: 2
      maxReplicas: 10
      targetCPUUtilizationPercentage: 70
    
    config:
      APP_NAME: socle-v4
      ENV_NAME: PROD
      KVBUS_MODE: redis
    
    secrets:
      REDIS_PASSWORD: ""
      ADMIN_PASSWORD: ""
      API_KEY: ""
    

    6.3 Installation

    # Install
    helm install socle-v4 ./socle-v4-chart -n socle --create-namespace -f values-prod.yaml
    
    # Upgrade
    helm upgrade socle-v4 ./socle-v4-chart -n socle -f values-prod.yaml
    
    # Uninstall
    helm uninstall socle-v4 -n socle
    

    7. Observability

    7.1 ServiceMonitor (Prometheus Operator)

    apiVersion: monitoring.coreos.com/v1
    kind: ServiceMonitor
    metadata:
      name: socle-v4
      namespace: socle
      labels:
        release: prometheus
    spec:
      selector:
        matchLabels:
          app: socle-v4
      endpoints:
        - port: http
          path: /actuator/prometheus
          interval: 15s
    

    7.2 PrometheusRule

    apiVersion: monitoring.coreos.com/v1
    kind: PrometheusRule
    metadata:
      name: socle-v4-alerts
      namespace: socle
    spec:
      groups:
        - name: socle-v4
          rules:
            - alert: SocleHighErrorRate
              expr: rate(socle_errors_total[5m]) > 0.1
              for: 5m
              labels:
                severity: warning
              annotations:
                summary: High error rate
    

    8. Multi-region deployment

    8.1 Structure

    clusters/
    ├── europe-west1/
    │   ├── kustomization.yaml
    │   └── config-patch.yaml
    ├── us-central1/
    │   ├── kustomization.yaml
    │   └── config-patch.yaml
    └── base/
        ├── kustomization.yaml
        ├── deployment.yaml
        ├── service.yaml
        └── configmap.yaml
    

    8.2 Kustomize overlay

    # clusters/europe-west1/kustomization.yaml
    apiVersion: kustomize.config.k8s.io/v1beta1
    kind: Kustomization
    resources:
      - ../base
    patches:
      - path: config-patch.yaml
    configMapGenerator:
      - name: socle-config
        behavior: merge
        literals:
          - REGION=europe-west1
    

    9. Troubleshooting

    Useful commands

    # Logs
    kubectl logs -f deployment/socle-v4 -n socle
    
    # Describe pod
    kubectl describe pod -l app=socle-v4 -n socle
    
    # Port forward
    kubectl port-forward svc/socle-v4 8080:80 -n socle
    
    # Exec into pod
    kubectl exec -it deployment/socle-v4 -n socle -- sh
    
    # Check events
    kubectl get events -n socle --sort-by='.lastTimestamp'
    

    10. References

  • Socle V004 – EventBus et Workers

    Socle V004 – EventBus et Workers

    30 – EventBus et Workers Push

    Version: 4.1.0 Date: 2026-01-23 Status: Specification

    1. Introduction

    This document introduces the new EventBus system and the Push Workers in Socle V004. The mechanism replaces timeout-based polling with blocking queues that wake a worker as soon as an event is published.

    1.1 Motivation

    The current event-driven worker mechanism (AbstractEventDrivenWorker) relies on polling with a timeout:

    // Old mechanism (DEPRECATED)
    while (running) {
        event = queue.poll(100, TimeUnit.MILLISECONDS);  // Checks every 100 ms
        if (event != null) processEvent(event);
    }
    

    Problems:

    • Latency of 0 to 100 ms before processing
    • Constant CPU consumption (even if low)
    • No true push mechanism

    1.2 New push mechanism

    // New mechanism (RECOMMENDED)
    while (running) {
        event = queue.take();  // Sleeps until an event arrives
        processEvent(event);   // Woken up immediately
    }
    

    Advantages:

    • Latency ~0 ms (immediate wake-up)
    • Zero CPU when idle
    • True event-driven push mechanism
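
    The blocking behavior of take() can be observed with the JDK alone. The sketch below is illustrative and not Socle code: a consumer blocks in take() on a LinkedBlockingQueue while a producer thread publishes a timestamp after a delay, and the method returns the time elapsed between publication and wake-up. The class and method names are made up for the example.

    ```java
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class PushWakeupDemo {

        /** Blocks in take() until an event arrives, then returns the wake-up delay in nanoseconds. */
        static long measureWakeupNanos(long producerDelayMs) throws InterruptedException {
            BlockingQueue<Long> queue = new LinkedBlockingQueue<>();

            Thread producer = new Thread(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(producerDelayMs);
                    queue.put(System.nanoTime()); // payload = publication time
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();

            long publishedAt = queue.take(); // the consumer is parked here, with no periodic wake-up
            long latency = System.nanoTime() - publishedAt;
            producer.join();
            return latency;
        }

        public static void main(String[] args) throws InterruptedException {
            long nanos = measureWakeupNanos(200);
            System.out.printf("Woken up %d microseconds after publication%n", nanos / 1_000);
        }
    }
    ```

    The measured delay depends on the machine and the scheduler, so the number is only indicative.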

    2. Deprecation of the old Workers

    2.1 Deprecated classes

    Class | Status | Replacement
    AbstractEventDrivenWorker<T> | DEPRECATED | AbstractPushWorker<T>
    KafkaEventWorker<K,V> | DEPRECATED | KafkaPushWorker<K,V>

    2.2 Migration

    The old workers keep working but must no longer be used for new development.

    // DEPRECATED - do not use anymore
    @Deprecated(since = "4.1.0", forRemoval = true)
    public abstract class AbstractEventDrivenWorker<T> implements Worker { ... }
    
    // RECOMMENDED - use this instead
    public abstract class AbstractPushWorker<T> implements Worker { ... }
    

    2.3 Deprecation schedule

    Version | Action
    4.1.0 | EventBus and Push Workers introduced, old workers deprecated
    4.2.0 | Compilation warnings for the old workers
    5.0.0 | Old workers removed (breaking change)

    3. Architecture EventBus

    3.1 Overview

    ┌─────────────────────────────────────────────────────────────────────┐
    │                              MOP                                     │
    │                                                                      │
    │  ┌──────────────────────────────────────────────────────────────┐   │
    │  │                        EventBus                               │   │
    │  │                                                               │   │
    │  │   Channels:                                                   │   │
    │  │     "orders.created"   ──► [Worker1.queue, Worker2.queue]     │   │
    │  │     "orders.updated"   ──► [Worker1.queue]                    │   │
    │  │     "alerts.*"         ──► [AlertWorker.queue]                │   │
    │  │                                                               │   │
    │  └──────────────────────────────────────────────────────────────┘   │
    │                              │                                       │
    │         publish("orders.created", event)                            │
    │                              │                                       │
    │                              ▼                                       │
    │  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐        │
    │  │ PushWorker 1   │  │ PushWorker 2   │  │ PushWorker 3   │        │
    │  │                │  │                │  │                │        │
    │  │ queue.take()   │  │ queue.take()   │  │ queue.take()   │        │
    │  │ (sleeps)       │  │ (sleeps)       │  │ (sleeps)       │        │
    │  │      ↓         │  │      ↓         │  │                │        │
    │  │ WAKE-UP!       │  │ WAKE-UP!       │  │ (not subscr.)  │        │
    │  └────────────────┘  └────────────────┘  └────────────────┘        │
    │                                                                      │
    └─────────────────────────────────────────────────────────────────────┘
    

    3.2 Components

    Component | Role
    EventBus | Central publish/subscribe bus
    BusEvent<T> | Event envelope with metadata
    AbstractPushWorker<T> | Worker woken up by the EventBus
    KafkaPushWorker<K,V> | Kafka worker with internal push
    EventBusWorker | Worker that manages the EventBus lifecycle

    4. EventBus interface

    4.1 Definition

    package eu.lmvi.socle.event;
    
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    
    /**
     * Central event bus with a push mechanism.
     *
     * <p>The EventBus lets workers communicate through a publish/subscribe
     * system with immediate wake-up.</p>
     */
    public interface EventBus {
    
        // ==================== Publication ====================
    
        /**
         * Publishes an event on a channel.
         * Subscribers are woken up immediately.
         *
         * @param channel Channel name (e.g. "orders.created")
         * @param payload Event content
         */
        void publish(String channel, Object payload);
    
        /**
         * Publishes an event with custom metadata.
         *
         * @param channel  Channel name
         * @param payload  Event content
         * @param metadata Additional metadata
         */
        void publish(String channel, Object payload, Map<String, String> metadata);
    
        /**
         * Asynchronous publication with confirmation.
         *
         * @return Future completed once all subscribers have received the event
         */
        CompletableFuture<PublishResult> publishAsync(String channel, Object payload);
    
        // ==================== Subscription ====================
    
        /**
         * Subscribes to a channel and returns a subscription whose queue receives the events.
         * Use getQueue().take() to wait for events (push).
         *
         * @param channel   Channel name
         * @param eventType Expected payload type
         * @return Subscription whose queue to call take() on
         */
        <T> Subscription<T> subscribe(String channel, Class<T> eventType);
    
        /**
         * Subscribes with a glob pattern (e.g. "orders.*", "*.created").
         *
         * @param pattern   Glob pattern
         * @param eventType Expected payload type
         * @return Subscription with its queue
         */
        <T> Subscription<T> subscribePattern(String pattern, Class<T> eventType);
    
        /**
         * Subscribes with a callback handler (immediate execution).
         *
         * @param channel Channel name
         * @param handler Handler called for each event
         * @return Subscription ID, to pass to unsubscribe
         */
        String subscribe(String channel, EventHandler handler);
    
        /**
         * Unsubscribes.
         *
         * @param subscriptionId ID returned by subscribe
         */
        void unsubscribe(String subscriptionId);
    
        // ==================== Lifecycle ====================
    
        /**
         * Starts the EventBus.
         */
        void start();
    
        /**
         * Stops the EventBus cleanly.
         * Queues are drained and subscribers are notified.
         */
        void stop();
    
        /**
         * Checks the health of the EventBus.
         */
        boolean isHealthy();
    
        // ==================== Stats ====================
    
        /**
         * Global statistics.
         */
        EventBusStats getStats();
    
        /**
         * Per-channel statistics.
         */
        Map<String, ChannelStats> getChannelStats();
    }
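
    The glob patterns accepted by subscribePattern can be illustrated with a small matcher. The sketch below assumes that '*' matches exactly one dot-separated segment, which is how the InMemoryEventBus in section 5.1 translates it. It only handles '*', and GlobChannelMatcher is a hypothetical helper, not a Socle class. Unlike a plain string replacement, it quotes the literal parts so that other regex metacharacters in a channel name are matched literally.

    ```java
    import java.util.regex.Pattern;

    public class GlobChannelMatcher {

        /** Converts a channel glob into a regex; '*' matches one segment (no dots). */
        static Pattern toPattern(String glob) {
            StringBuilder regex = new StringBuilder();
            StringBuilder literal = new StringBuilder();
            for (char c : glob.toCharArray()) {
                if (c == '*') {
                    if (literal.length() > 0) {
                        regex.append(Pattern.quote(literal.toString()));
                        literal.setLength(0);
                    }
                    regex.append("[^.]*");
                } else {
                    literal.append(c);
                }
            }
            if (literal.length() > 0) {
                regex.append(Pattern.quote(literal.toString()));
            }
            return Pattern.compile(regex.toString());
        }

        /** Returns true when the whole channel name matches the glob. */
        static boolean matches(String glob, String channel) {
            return toPattern(glob).matcher(channel).matches();
        }

        public static void main(String[] args) {
            System.out.println(matches("orders.*", "orders.created"));    // true
            System.out.println(matches("*.created", "orders.created"));   // true
            System.out.println(matches("orders.*", "orders.eu.created")); // false
            System.out.println(matches("orders.*", "ordersXcreated"));    // false
        }
    }
    ```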
    

    4.2 Supporting classes

    package eu.lmvi.socle.event;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.BlockingQueue;
    
    /**
     * Event carried by the EventBus.
     */
    public record BusEvent<T>(
        String id,                      // Unique UUID
        String channel,                 // Publication channel
        Instant timestamp,              // Publication timestamp
        String source,                  // Source (worker name)
        T payload,                      // Business content
        Map<String, String> metadata    // Additional metadata
    ) {
        /**
         * Creates a simple event.
         */
        public static <T> BusEvent<T> of(String channel, T payload) {
            return new BusEvent<>(
                UUID.randomUUID().toString(),
                channel,
                Instant.now(),
                null,
                payload,
                Map.of()
            );
        }
    }
    
    /**
     * Subscription to a channel.
     */
    public interface Subscription<T> {
    
        /**
         * Unique ID of the subscription.
         */
        String getId();
    
        /**
         * Subscribed channel or pattern.
         */
        String getChannelOrPattern();
    
        /**
         * Queue on which events are received.
         * Use take() to wait (blocking, immediate wake-up).
         * Use poll(timeout) if a timeout is needed.
         */
        BlockingQueue<BusEvent<T>> getQueue();
    
        /**
         * Unsubscribes.
         */
        void unsubscribe();
    
        /**
         * Number of events waiting in the queue.
         */
        int getPendingCount();
    }
    
    /**
     * Handler for callback subscriptions.
     */
    @FunctionalInterface
    public interface EventHandler {
        void onEvent(BusEvent<?> event);
    }
    
    /**
     * Result of a publication.
     */
    public record PublishResult(
        String eventId,
        int deliveredCount,
        int failedCount,
        Duration duration
    ) {}
    
    /**
     * EventBus statistics.
     */
    public record EventBusStats(
        long totalPublished,
        long totalDelivered,
        long totalFailed,
        int activeSubscriptions,
        int activeChannels,
        Instant startedAt
    ) {}
    
    /**
     * Per-channel statistics.
     */
    public record ChannelStats(
        String channel,
        long publishedCount,
        int subscriberCount,
        long lastPublishedAt
    ) {}
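
    A consumer blocked in take() needs some signal to stop. One common approach is a "poison pill": a marker event that tells the loop to exit. The sketch below is self-contained and uses a simplified Event record as a stand-in for BusEvent<T>. Treating a null payload as the marker mirrors the handler loop of the InMemoryEventBus in section 5.1, but whether every implementation follows that convention is an assumption. All names are illustrative.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class PoisonPillConsumerDemo {

        /** Simplified stand-in for BusEvent<T>; only the fields needed here. */
        record Event<T>(String channel, T payload) {}

        /** Consumes with take() until an event with a null payload (the poison pill) arrives. */
        static <T> List<T> consumeUntilShutdown(BlockingQueue<Event<T>> queue) throws InterruptedException {
            List<T> processed = new ArrayList<>();
            while (true) {
                Event<T> event = queue.take();       // blocks until something arrives
                if (event.payload() == null) break;  // shutdown marker
                processed.add(event.payload());
            }
            return processed;
        }

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Event<String>> queue = new LinkedBlockingQueue<>();
            queue.put(new Event<>("orders.created", "order-1"));
            queue.put(new Event<>("orders.created", "order-2"));
            queue.put(new Event<>("orders.created", null)); // poison pill
            System.out.println(consumeUntilShutdown(queue)); // [order-1, order-2]
        }
    }
    ```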
    

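    5. EventBus implementations

    5.1 InMemoryEventBus

    Implementation for a single instance (dev, tests, simple applications).

    package eu.lmvi.socle.event.impl;
    
    import eu.lmvi.socle.event.*;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.regex.Pattern;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.stereotype.Component;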
    
    @Component
    @ConditionalOnProperty(name = "socle.eventbus.mode", havingValue = "in_memory", matchIfMissing = true)
    public class InMemoryEventBus implements EventBus {
    
        private static final Logger log = LoggerFactory.getLogger(InMemoryEventBus.class);
    
        // Subscriptions by exact channel
        private final Map<String, Set<SubscriptionImpl<?>>> channelSubs = new ConcurrentHashMap<>();
    
        // Subscriptions by pattern
        private final Map<Pattern, Set<SubscriptionImpl<?>>> patternSubs = new ConcurrentHashMap<>();
    
        // Index for fast lookup by subscription ID
        private final Map<String, SubscriptionImpl<?>> subsIndex = new ConcurrentHashMap<>();
    
        // Counters
        private final AtomicLong idGen = new AtomicLong();
        private final AtomicLong publishedCount = new AtomicLong();
        private final AtomicLong deliveredCount = new AtomicLong();
        private final AtomicLong failedCount = new AtomicLong();
    
        // State
        private volatile boolean running = false;
        private Instant startedAt;
    
        // ==================== Lifecycle ====================
    
        @Override
        public void start() {
            running = true;
            startedAt = Instant.now();
            log.info("InMemoryEventBus started");
        }
    
        @Override
        public void stop() {
            running = false;
    
            // Notify all subscribers (poison pill)
            for (SubscriptionImpl<?> sub : subsIndex.values()) {
                sub.notifyShutdown();
            }
    
            log.info("InMemoryEventBus stopped. Published={}, Delivered={}, Failed={}",
                publishedCount.get(), deliveredCount.get(), failedCount.get());
        }
    
        @Override
        public boolean isHealthy() {
            return running;
        }
    
        // ==================== Publish ====================
    
        @Override
        public void publish(String channel, Object payload) {
            publish(channel, payload, Map.of());
        }
    
        @Override
        public void publish(String channel, Object payload, Map<String, String> metadata) {
            doPublish(channel, payload, metadata);
        }
    
        @Override
        public CompletableFuture<PublishResult> publishAsync(String channel, Object payload) {
            // Same delivery path on another thread; the result carries the real id and counts
            return CompletableFuture.supplyAsync(() -> doPublish(channel, payload, Map.of()));
        }
    
        // Shared by publish() and publishAsync()
        private PublishResult doPublish(String channel, Object payload, Map<String, String> metadata) {
            Instant start = Instant.now();
    
            if (!running) {
                log.warn("EventBus not running, event dropped on channel '{}'", channel);
                return new PublishResult(null, 0, 0, Duration.ZERO);
            }
    
            BusEvent<?> event = new BusEvent<>(
                "evt-" + idGen.incrementAndGet(),
                channel,
                Instant.now(),
                resolveSource(),
                payload,
                metadata
            );
    
            int delivered = 0;
            int failed = 0;
    
            // Deliver to exact-channel subscriptions
            Set<SubscriptionImpl<?>> exact = channelSubs.get(channel);
            if (exact != null) {
                for (SubscriptionImpl<?> sub : exact) {
                    if (deliverToSubscription(sub, event)) {
                        delivered++;
                    } else {
                        failed++;
                    }
                }
            }
    
            // Deliver to pattern subscriptions
            for (Map.Entry<Pattern, Set<SubscriptionImpl<?>>> entry : patternSubs.entrySet()) {
                if (entry.getKey().matcher(channel).matches()) {
                    for (SubscriptionImpl<?> sub : entry.getValue()) {
                        if (deliverToSubscription(sub, event)) {
                            delivered++;
                        } else {
                            failed++;
                        }
                    }
                }
            }
    
            publishedCount.incrementAndGet();
            deliveredCount.addAndGet(delivered);
            failedCount.addAndGet(failed);
    
            log.debug("Published event {} to channel '{}': delivered={}, failed={}",
                event.id(), channel, delivered, failed);
    
            return new PublishResult(event.id(), delivered, failed,
                Duration.between(start, Instant.now()));
        }
    
        @SuppressWarnings("unchecked")
        private boolean deliverToSubscription(SubscriptionImpl<?> sub, BusEvent<?> event) {
            try {
                BlockingQueue<BusEvent<?>> queue = (BlockingQueue<BusEvent<?>>) sub.getQueue();
    
                // offer with a timeout to avoid blocking forever
                boolean offered = queue.offer(event, 100, TimeUnit.MILLISECONDS);
    
                if (!offered) {
                    log.warn("Queue full for subscription {}, event dropped", sub.getId());
                    return false;
                }
    
                return true;
    
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (Exception e) {
                log.error("Error delivering to subscription {}: {}", sub.getId(), e.getMessage());
                return false;
            }
        }
    
        private String resolveSource() {
            // Use the current thread name as the source (the worker name when available)
            String threadName = Thread.currentThread().getName();
            return threadName;
        }
    
        // ==================== Subscribe ====================
    
        @Override
        public <T> Subscription<T> subscribe(String channel, Class<T> eventType) {
            String subId = "sub-" + idGen.incrementAndGet();
            SubscriptionImpl<T> sub = new SubscriptionImpl<>(subId, channel, null, this);
    
            channelSubs
                .computeIfAbsent(channel, k -> ConcurrentHashMap.newKeySet())
                .add(sub);
    
            subsIndex.put(subId, sub);
    
            log.info("Subscription {} created on channel '{}'", subId, channel);
            return sub;
        }
    
        @Override
        public <T> Subscription<T> subscribePattern(String pattern, Class<T> eventType) {
            String subId = "sub-" + idGen.incrementAndGet();
    
            // Convert the glob to a regex: '*' matches one segment, '>' matches the rest
            String regex = pattern
                .replace(".", "\\.")
                .replace("*", "[^.]*")
                .replace(">", ".*");
            Pattern compiled = Pattern.compile("^" + regex + "$");
    
            SubscriptionImpl<T> sub = new SubscriptionImpl<>(subId, pattern, compiled, this);
    
            patternSubs
                .computeIfAbsent(compiled, k -> ConcurrentHashMap.newKeySet())
                .add(sub);
    
            subsIndex.put(subId, sub);
    
            log.info("Subscription {} created on pattern '{}'", subId, pattern);
            return sub;
        }
    
        @Override
        public String subscribe(String channel, EventHandler handler) {
            // Simplified implementation: create a regular subscription
            // and a thread that consumes it and calls the handler
            Subscription<Object> sub = subscribe(channel, Object.class);
    
            Thread.ofVirtual()
                .name("handler-" + sub.getId())
                .start(() -> {
                    while (running) {
                        try {
                            BusEvent<Object> event = sub.getQueue().take();
                            if (event.id() == null) break; // Poison pill (same id check as the push loop)
                            handler.onEvent(event);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        } catch (Exception e) {
                            log.error("Handler error on {}: {}", channel, e.getMessage());
                        }
                    }
                });
    
            return sub.getId();
        }
    
        @Override
        public void unsubscribe(String subscriptionId) {
            SubscriptionImpl<?> sub = subsIndex.remove(subscriptionId);
            if (sub == null) {
                log.warn("Subscription {} not found", subscriptionId);
                return;
            }
    
            // Remove from the lookup maps
            if (sub.pattern == null) {
                Set<SubscriptionImpl<?>> subs = channelSubs.get(sub.channelOrPattern);
                if (subs != null) subs.remove(sub);
            } else {
                Set<SubscriptionImpl<?>> subs = patternSubs.get(sub.pattern);
                if (subs != null) subs.remove(sub);
            }
    
            sub.notifyShutdown();
            log.info("Subscription {} removed", subscriptionId);
        }
    
        // ==================== Stats ====================
    
        @Override
        public EventBusStats getStats() {
            return new EventBusStats(
                publishedCount.get(),
                deliveredCount.get(),
                failedCount.get(),
                subsIndex.size(),
                channelSubs.size(),
                startedAt
            );
        }
    
        @Override
        public Map<String, ChannelStats> getChannelStats() {
            Map<String, ChannelStats> stats = new HashMap<>();
    
            for (Map.Entry<String, Set<SubscriptionImpl<?>>> entry : channelSubs.entrySet()) {
                stats.put(entry.getKey(), new ChannelStats(
                    entry.getKey(),
                    0, // TODO: per-channel counter
                    entry.getValue().size(),
                    0
                ));
            }
    
            return stats;
        }
    
        // ==================== Subscription Implementation ====================
    
        private static class SubscriptionImpl<T> implements Subscription<T> {
    
            private final String id;
            private final String channelOrPattern;
            private final Pattern pattern;
            private final BlockingQueue<BusEvent<T>> queue;
            private final InMemoryEventBus bus;
    
            SubscriptionImpl(String id, String channelOrPattern, Pattern pattern, InMemoryEventBus bus) {
                this.id = id;
                this.channelOrPattern = channelOrPattern;
                this.pattern = pattern;
                this.queue = new LinkedBlockingQueue<>(10_000); // Default capacity
                this.bus = bus;
            }
    
            @Override
            public String getId() {
                return id;
            }
    
            @Override
            public String getChannelOrPattern() {
                return channelOrPattern;
            }
    
            @Override
            public BlockingQueue<BusEvent<T>> getQueue() {
                return queue;
            }
    
            @Override
            public void unsubscribe() {
                bus.unsubscribe(id);
            }
    
            @Override
            public int getPendingCount() {
                return queue.size();
            }
    
            void notifyShutdown() {
                // Inject a poison pill to unblock pending take() calls
                try {
                    queue.offer(new BusEvent<>(null, null, null, null, null, null));
                } catch (Exception ignored) {}
            }
        }
    }
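
    The glob-to-regex conversion used by subscribePattern can be tried in isolation. The sketch below repeats the same three replacements; the class and method names (GlobChannelMatcher, toRegex, matches) are illustrative and not part of the Socle API. As in the code above, other regex metacharacters in a pattern are not escaped.

```java
import java.util.regex.Pattern;

/** Illustrative helper, not part of the Socle API. */
class GlobChannelMatcher {

    /** Same replacements as subscribePattern: '.' is literal, '*' is one segment, '>' is the rest. */
    static Pattern toRegex(String glob) {
        String regex = glob
            .replace(".", "\\.")
            .replace("*", "[^.]*")
            .replace(">", ".*");
        return Pattern.compile("^" + regex + "$");
    }

    /** Returns true when the channel name matches the glob pattern. */
    static boolean matches(String glob, String channel) {
        return toRegex(glob).matcher(channel).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("orders.*", "orders.created"));    // true
        System.out.println(matches("orders.*", "orders.eu.created")); // false: '*' stops at the dot
        System.out.println(matches("orders.>", "orders.eu.created")); // true: '>' spans segments
    }
}
```

    The order of the replacements matters: the dot is escaped first, so the dots introduced by "[^.]*" and ".*" are not escaped afterwards.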
    

    5.2 RedisEventBus

    Implementation for multi-instance deployments (production).

    package eu.lmvi.socle.event.impl;
    
    @Component
    @ConditionalOnProperty(name = "socle.eventbus.mode", havingValue = "redis")
    public class RedisEventBus implements EventBus {
    
        private final JedisPool jedisPool;
        private final ObjectMapper mapper;
        private final String prefix;
        private final ExecutorService listenerExecutor;
    
        // Local subscriptions (one local queue per subscription)
        private final Map<String, LocalSubscription<?>> localSubs = new ConcurrentHashMap<>();
    
        // One JedisPubSub per channel (shared across local subscriptions)
        private final Map<String, RedisPubSubListener> redisListeners = new ConcurrentHashMap<>();
    
        // ... Similar implementation built on Redis Pub/Sub ...
    
        @Override
        public void publish(String channel, Object payload) {
            try (Jedis jedis = jedisPool.getResource()) {
                BusEvent<?> event = BusEvent.of(channel, payload);
                String json = mapper.writeValueAsString(event);
                jedis.publish(prefix + channel, json);
            } catch (Exception e) {
                log.error("Redis publish error: {}", e.getMessage());
            }
        }
    
        // Redis SUBSCRIBE runs in a dedicated thread
        // and dispatches to the local queues
    }
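
    The dispatch step left as a comment above can be sketched without Redis. Everything in the block below is hypothetical (LocalDispatcher, register and dispatch are illustrative names, and the trailing ':' in the prefix is an assumption): it assumes the listener thread receives the prefixed channel name plus the JSON string, strips the prefix, and offers the message to every local queue registered for that channel. In the real bus the call would come from the Redis Pub/Sub callback.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical fan-out from one remote channel to several local queues. */
class LocalDispatcher {

    private final String prefix;
    private final Map<String, List<BlockingQueue<String>>> queuesByChannel = new ConcurrentHashMap<>();

    LocalDispatcher(String prefix) {
        this.prefix = prefix;
    }

    /** Registers a bounded local queue for a channel and returns it. */
    BlockingQueue<String> register(String channel, int capacity) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        queuesByChannel.computeIfAbsent(channel, k -> new CopyOnWriteArrayList<>()).add(queue);
        return queue;
    }

    /** Offers the message to each local queue; returns how many queues accepted it. */
    int dispatch(String prefixedChannel, String json) {
        if (!prefixedChannel.startsWith(prefix)) {
            return 0; // Not one of our channels
        }
        String channel = prefixedChannel.substring(prefix.length());
        int delivered = 0;
        for (BlockingQueue<String> queue : queuesByChannel.getOrDefault(channel, List.of())) {
            if (queue.offer(json)) { // Non-blocking: a full queue drops the message
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        LocalDispatcher dispatcher = new LocalDispatcher("socle:eventbus:");
        dispatcher.register("orders.created", 10);
        dispatcher.register("orders.created", 10);
        System.out.println(dispatcher.dispatch("socle:eventbus:orders.created", "{\"id\":1}")); // 2
    }
}
```

    Keeping offer() non-blocking here means a slow local subscriber cannot stall the single listener thread; the price is that its messages are dropped once its queue is full.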
    

    6. AbstractPushWorker

    6.1 Definition

    package eu.lmvi.socle.worker.push;
    
    /**
     * Worker woken instantly by the EventBus.
     *
     * <p>Unlike {@link AbstractEventDrivenWorker} (DEPRECATED), which polls
     * with a timeout, this worker uses a push mechanism: it blocks on
     * {@link BlockingQueue#take()} and wakes as soon as an event arrives.</p>
     *
     * <h3>Advantages</h3>
     * <ul>
     *   <li>Latency ~0 ms (instant wake-up)</li>
     *   <li>Zero CPU when idle</li>
     *   <li>Native integration with the EventBus</li>
     * </ul>
     *
     * <h3>Usage example</h3>
     * <pre>{@code
     * @Component
     * public class OrderWorker extends AbstractPushWorker<OrderEvent> {
     *
     *     public OrderWorker() {
     *         super("orders.created", "orders.updated");
     *     }
     *
     *     @Override
     *     public String getName() {
     *         return "order_worker";
     *     }
     *
     *     @Override
     *     protected Class<OrderEvent> getEventType() {
     *         return OrderEvent.class;
     *     }
     *
     *     @Override
     *     protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
     *         // Runs as soon as the event arrives
     *     }
     * }
     * }</pre>
     *
     * @param <T> Payload type of the events
     */
    public abstract class AbstractPushWorker<T> implements Worker {
    
        protected final Logger logger = LoggerFactory.getLogger(getClass());
    
        // ==================== Injection ====================
    
        @Autowired
        protected EventBus eventBus;
    
        // ==================== Configuration ====================
    
        private final String[] channels;
        private final int concurrency;
        private final int queueCapacity;
        private final Duration shutdownTimeout;
    
        // ==================== State ====================
    
        protected final AtomicBoolean running = new AtomicBoolean(false);
        protected ExecutorService executor;
        protected List<Subscription<T>> subscriptions = new ArrayList<>();
        protected BlockingQueue<BusEvent<T>> mergedQueue;
    
        // ==================== Metrics ====================
    
        protected final AtomicLong processedCount = new AtomicLong(0);
        protected final AtomicLong errorCount = new AtomicLong(0);
        protected final AtomicLong totalDurationNs = new AtomicLong(0);
        protected volatile Instant lastExecution;
        protected volatile Instant startedAt;
    
        // ==================== Constructors ====================
    
        /**
         * Creates a single-threaded push worker.
         *
         * @param channels Channels to listen on
         */
        protected AbstractPushWorker(String... channels) {
            this(1, channels);
        }
    
        /**
         * Creates a push worker with concurrency.
         *
         * @param concurrency Number of processing threads
         * @param channels    Channels to listen on
         */
        protected AbstractPushWorker(int concurrency, String... channels) {
            this(concurrency, 10_000, Duration.ofSeconds(30), channels);
        }
    
        /**
         * Creates a push worker with full configuration.
         *
         * @param concurrency     Number of processing threads
         * @param queueCapacity   Capacity of the internal queue
         * @param shutdownTimeout Graceful shutdown timeout
         * @param channels        Channels to listen on
         */
        protected AbstractPushWorker(int concurrency, int queueCapacity,
                                      Duration shutdownTimeout, String... channels) {
            this.concurrency = Math.max(1, concurrency);
            this.queueCapacity = queueCapacity;
            this.shutdownTimeout = shutdownTimeout;
            this.channels = channels;
    
            if (channels == null || channels.length == 0) {
                throw new IllegalArgumentException("At least one channel is required");
            }
        }
    
        // ==================== Worker Interface ====================
    
        @Override
        public final String getSchedule() {
            return "PUSH";
        }
    
        @Override
        public final long getCycleIntervalMs() {
            return -1;
        }
    
        @Override
        public final boolean isPassive() {
            return true;
        }
    
        @Override
        public int getStartPriority() {
            return 200;  // After the EventBus (100)
        }
    
        @Override
        public int getStopPriority() {
            return 200;  // Before the EventBus
        }
    
        @Override
        public void initialize() {
            logger.info("[{}] Initializing push worker", getName());
            logger.info("[{}] Channels: {}", getName(), Arrays.toString(channels));
            logger.info("[{}] Concurrency: {}", getName(), concurrency);
    
            // Merged queue shared by all channels
            mergedQueue = new LinkedBlockingQueue<>(queueCapacity);
    
            onInitialize();
        }
    
        @Override
        public void start() {
            if (running.getAndSet(true)) {
                logger.warn("[{}] Already running", getName());
                return;
            }
    
            startedAt = Instant.now();
    
            // Subscribe to the channels
            for (String channel : channels) {
                // Channels containing '*' or '>' are glob patterns; the others are exact names
                boolean isPattern = channel.contains("*") || channel.contains(">");
                Subscription<T> sub = isPattern
                    ? eventBus.subscribePattern(channel, getEventType())
                    : eventBus.subscribe(channel, getEventType());
                subscriptions.add(sub);
                startForwarder(sub);
            }
    
            // Start the processing workers
            executor = Executors.newThreadPerTaskExecutor(
                Thread.ofVirtual().name(getName() + "-push-", 0).factory()
            );
    
            for (int i = 0; i < concurrency; i++) {
                final int workerId = i;
                executor.submit(() -> pushLoop(workerId));
            }
    
            onStarted();
            logger.info("[{}] Started with {} workers on {} channels",
                getName(), concurrency, channels.length);
        }
    
        /**
         * Starts a thread that forwards events from a subscription to the merged queue.
         */
        private void startForwarder(Subscription<T> sub) {
            Thread.ofVirtual()
                .name(getName() + "-fwd-" + sub.getId())
                .start(() -> {
                    while (running.get()) {
                        try {
                            BusEvent<T> event = sub.getQueue().take();
    
                            // Check for the poison pill
                            if (event.id() == null) break;
    
                            // Forward to the merged queue
                            if (!mergedQueue.offer(event, 1, TimeUnit.SECONDS)) {
                                logger.warn("[{}] Merged queue full, event dropped", getName());
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                });
        }
    
        @Override
        public void stop() {
            if (!running.getAndSet(false)) {
                logger.warn("[{}] Not running", getName());
                return;
            }
    
            logger.info("[{}] Stopping...", getName());
            onStopping();
    
            // Unsubscribe (this injects poison pills)
            for (Subscription<T> sub : subscriptions) {
                try {
                    sub.unsubscribe();
                } catch (Exception e) {
                    logger.warn("[{}] Error unsubscribing: {}", getName(), e.getMessage());
                }
            }
            subscriptions.clear();
    
            // Inject poison pills into the merged queue
            for (int i = 0; i < concurrency; i++) {
                mergedQueue.offer(new BusEvent<>(null, null, null, null, null, null));
            }
    
            // Wait for the workers to finish
            if (executor != null) {
                executor.shutdown();
                try {
                    if (!executor.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                        logger.warn("[{}] Forcing shutdown", getName());
                        executor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    executor.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }
    
            onStopped();
    
            long uptime = Duration.between(startedAt, Instant.now()).toSeconds();
            logger.info("[{}] Stopped after {}s. Processed={}, Errors={}",
                getName(), uptime, processedCount.get(), errorCount.get());
        }
    
        @Override
        public void doWork() {
            // Not used in PUSH mode
        }
    
        @Override
        public boolean isHealthy() {
            return running.get() && executor != null && !executor.isShutdown();
        }
    
        @Override
        public Map<String, Object> getStats() {
            long uptime = startedAt != null
                ? Duration.between(startedAt, Instant.now()).toMillis()
                : 0;
    
            double avgDurationMs = processedCount.get() > 0
                ? (totalDurationNs.get() / 1_000_000.0) / processedCount.get()
                : 0;
    
            double throughput = uptime > 0
                ? processedCount.get() * 1000.0 / uptime
                : 0;
    
            Map<String, Object> stats = new HashMap<>();
    
            // Standardized keys
            stats.put("state", running.get() ? "running" : "stopped");
            stats.put("schedule", "PUSH");
            stats.put("execution_count", processedCount.get());
            stats.put("errors_count", errorCount.get());
            stats.put("last_execution", lastExecution != null ? lastExecution.toString() : null);
    
            // Worker-specific keys
            stats.put("channels", channels);
            stats.put("concurrency", concurrency);
            stats.put("queue_size", mergedQueue != null ? mergedQueue.size() : 0);
            stats.put("queue_capacity", queueCapacity);
            stats.put("uptime_ms", uptime);
            stats.put("avg_duration_ms", Math.round(avgDurationMs * 100) / 100.0);
            stats.put("throughput_per_sec", Math.round(throughput * 100) / 100.0);
    
            // Subscription stats
            stats.put("subscriptions", subscriptions.stream()
                .map(s -> Map.of(
                    "id", s.getId(),
                    "channel", s.getChannelOrPattern(),
                    "pending", s.getPendingCount()
                ))
                .toList());
    
            return stats;
        }
    
        // ==================== Push Loop ====================
    
        private void pushLoop(int workerId) {
            logger.debug("[{}] Push loop #{} started", getName(), workerId);
    
            while (running.get()) {
                try {
                    // BLOCKS until an event arrives (instant wake-up)
                    BusEvent<T> event = mergedQueue.take();
    
                    // Check for the poison pill
                    if (event.id() == null) {
                        logger.debug("[{}] Push loop #{} received shutdown signal", getName(), workerId);
                        break;
                    }
    
                    // Process the event
                    processEventSafe(event, workerId);
    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.debug("[{}] Push loop #{} interrupted", getName(), workerId);
                    break;
                } catch (Exception e) {
                    logger.error("[{}] Unexpected error in push loop #{}: {}",
                        getName(), workerId, e.getMessage(), e);
                }
            }
    
            logger.debug("[{}] Push loop #{} stopped", getName(), workerId);
        }
    
        private void processEventSafe(BusEvent<T> event, int workerId) {
            long startNs = System.nanoTime();
    
            try {
                processEvent(event.payload(), event);
                processedCount.incrementAndGet();
            } catch (Exception e) {
                errorCount.incrementAndGet();
                handleError(event, e, workerId);
            } finally {
                long durationNs = System.nanoTime() - startNs;
                totalDurationNs.addAndGet(durationNs);
                lastExecution = Instant.now();
            }
        }
    
        // ==================== Abstract methods ====================
    
        /**
         * Payload type of the events.
         * Used to type the subscription.
         */
        protected abstract Class<T> getEventType();
    
        /**
         * Processes an event.
         *
         * @param payload Event content
         * @param event   Full event (with metadata)
         */
        protected abstract void processEvent(T payload, BusEvent<T> event);
    
        /**
         * Simplified variant without access to the envelope.
         * The push loop never calls it directly and the default does nothing;
         * SimplePushWorker routes processEvent(T, BusEvent) to it.
         * Override processEvent(T, BusEvent) when the metadata is needed.
         */
        protected void processEvent(T payload) {
            // No-op by default
        }
    
        // ==================== Hooks ====================
    
        /**
         * Initialization hook.
         */
        protected void onInitialize() {}
    
        /**
         * Post-start hook.
         */
        protected void onStarted() {}
    
        /**
         * Pre-stop hook.
         */
        protected void onStopping() {}
    
        /**
         * Post-stop hook.
         */
        protected void onStopped() {}
    
        /**
         * Error handling.
         * Logs the error by default. Override for DLQ, retry, etc.
         */
        protected void handleError(BusEvent<T> event, Exception e, int workerId) {
            logger.error("[{}] Worker #{} error processing event {}: {}",
                getName(), workerId, event.id(), e.getMessage(), e);
        }
    
        // ==================== Utilities ====================
    
        /**
         * Publishes an event on the EventBus.
         * Convenience shortcut for workers that also produce events.
         */
        protected void publish(String channel, Object payload) {
            eventBus.publish(channel, payload);
        }
    
        /**
         * Returns the number of pending events.
         */
        public long getBacklog() {
            int pending = mergedQueue != null ? mergedQueue.size() : 0;
            for (Subscription<T> sub : subscriptions) {
                pending += sub.getPendingCount();
            }
            return pending;
        }
    }
    

    6.2 Simplified version (single channel)

    /**
     * Simplified version for a single channel.
     */
    public abstract class SimplePushWorker<T> extends AbstractPushWorker<T> {
    
        protected SimplePushWorker(String channel) {
            super(channel);
        }
    
        protected SimplePushWorker(int concurrency, String channel) {
            super(concurrency, channel);
        }
    
        @Override
        protected void processEvent(T payload, BusEvent<T> event) {
            processEvent(payload);
        }
    
        /**
         * Processes the payload directly.
         */
        protected abstract void processEvent(T payload);
    }
    

    7. Configuration

    7.1 application.yml

    socle:
      eventbus:
        # Enable the EventBus
        enabled: ${EVENTBUS_ENABLED:true}
    
        # Mode: in_memory | redis
        mode: ${EVENTBUS_MODE:in_memory}
    
        # Common configuration
        common:
          # Default capacity of subscription queues
          default_queue_capacity: ${EVENTBUS_QUEUE_CAPACITY:10000}
    
          # Timeout for offer() when the queue is full
          offer_timeout_ms: ${EVENTBUS_OFFER_TIMEOUT:100}
    
        # InMemory configuration
        in_memory:
          # Nothing specific
    
        # Redis configuration
        redis:
          host: ${REDIS_HOST:localhost}
          port: ${REDIS_PORT:6379}
          password: ${REDIS_PASSWORD:}
          database: ${REDIS_DATABASE:0}
          prefix: ${EVENTBUS_REDIS_PREFIX:socle:eventbus}
    
          # Connection pool
          pool:
            max_total: ${EVENTBUS_REDIS_POOL_MAX:16}
            max_idle: ${EVENTBUS_REDIS_POOL_MAX_IDLE:8}
            min_idle: ${EVENTBUS_REDIS_POOL_MIN_IDLE:2}
    
        # Metrics
        metrics:
          enabled: ${EVENTBUS_METRICS_ENABLED:true}
          prefix: socle_eventbus
    

    7.2 Environment variables

    Variable                | Description              | Default
    EVENTBUS_ENABLED        | Enable the EventBus      | true
    EVENTBUS_MODE           | Mode (in_memory / redis) | in_memory
    EVENTBUS_QUEUE_CAPACITY | Queue capacity           | 10000
    REDIS_HOST              | Redis host               | localhost
    REDIS_PORT              | Redis port               | 6379

    8. Usage examples

    8.1 Simple worker

    @Component
    public class OrderNotificationWorker extends SimplePushWorker<OrderEvent> {
    
        @Autowired
        private NotificationService notificationService;
    
        public OrderNotificationWorker() {
            super(2, "orders.created");  // 2 concurrent workers
        }
    
        @Override
        public String getName() {
            return "order_notification_worker";
        }
    
        @Override
        protected Class<OrderEvent> getEventType() {
            return OrderEvent.class;
        }
    
        @Override
        protected void processEvent(OrderEvent event) {
            notificationService.notifyOrderCreated(event.orderId(), event.customerEmail());
        }
    }
    

    8.2 Multi-channel worker

    @Component
    public class AuditWorker extends AbstractPushWorker<Object> {
    
        @Autowired
        private AuditRepository auditRepository;
    
        public AuditWorker() {
            super(4, "orders.*", "payments.*", "users.*");  // Pattern matching
        }
    
        @Override
        public String getName() {
            return "audit_worker";
        }
    
        @Override
        protected Class<Object> getEventType() {
            return Object.class;  // Accepts anything
        }
    
        @Override
        protected void processEvent(Object payload, BusEvent<Object> event) {
            // Access to the metadata
            AuditEntry entry = new AuditEntry(
                event.id(),
                event.channel(),
                event.timestamp(),
                event.source(),
                payload
            );
            auditRepository.save(entry);
        }
    }
    

    8.3 Publishing events

    @Service
    public class OrderService {
    
        @Autowired
        private EventBus eventBus;
    
        @Autowired
        private OrderRepository orderRepository;
    
        public Order createOrder(CreateOrderRequest request) {
            // Business logic
            Order order = orderRepository.save(new Order(request));
    
            // Publish on the EventBus
            // Every worker subscribed to "orders.created" is woken instantly
            eventBus.publish("orders.created", new OrderEvent(
                order.getId(),
                order.getCustomerEmail(),
                order.getTotal()
            ));
    
            return order;
        }
    
        public void updateOrderStatus(String orderId, OrderStatus newStatus) {
            Order order = orderRepository.updateStatus(orderId, newStatus);
    
            eventBus.publish("orders.updated", new OrderStatusEvent(
                orderId,
                newStatus
            ));
        }
    }
    

    8.4 Worker with custom error handling

    @Component
    public class PaymentWorker extends AbstractPushWorker<PaymentEvent> {
    
        @Autowired
        private PaymentService paymentService;
    
        @Autowired
        private DeadLetterQueue dlq;
    
        public PaymentWorker() {
            super(2, "payments.process");
        }
    
        @Override
        public String getName() {
            return "payment_worker";
        }
    
        @Override
        protected Class<PaymentEvent> getEventType() {
            return PaymentEvent.class;
        }
    
        @Override
        protected void processEvent(PaymentEvent payload, BusEvent<PaymentEvent> event) {
            paymentService.processPayment(payload);
        }
    
        @Override
        protected void handleError(BusEvent<PaymentEvent> event, Exception e, int workerId) {
            logger.error("[{}] Payment processing failed for {}: {}",
                getName(), event.payload().paymentId(), e.getMessage());
    
            // Send to the DLQ for a later retry
            dlq.send("payments.dlq", event, e);
        }
    }
    

    9. Solutions to open questions

    9.1 Clean shutdown of workers blocked in take()

    Problem: BlockingQueue.take() blocks indefinitely. How can a worker be stopped cleanly?

    Solution: Poison Pill Pattern

    // On shutdown, inject a recognizable "poison" event
    @Override
    public void stop() {
        running.set(false);
    
        // Inject N poison pills (one per worker)
        for (int i = 0; i < concurrency; i++) {
            queue.offer(new BusEvent<>(null, null, null, null, null, null));
        }
    }
    
    // In the loop, check for the poison pill
    private void pushLoop(int workerId) {
        while (running.get()) {
            try {
                BusEvent<T> event = queue.take();
    
                // Detect the poison pill (id == null)
                if (event.id() == null) {
                    break;  // Exit cleanly
                }
    
                processEvent(event.payload(), event);
            } catch (InterruptedException e) {
                // take() was interrupted: restore the flag and exit
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
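
    To see the pattern end to end, here is a minimal, self-contained sketch. It is not the Socle implementation: the Event record is a stand-in for BusEvent, PoisonPillDemo and run are illustrative names, and a fixed platform-thread pool replaces the virtual threads used above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoisonPillDemo {

    /** Stand-in for BusEvent: a null id marks the poison pill. */
    record Event(String id, String payload) {
        static Event poison() {
            return new Event(null, null);
        }
    }

    /** Starts the workers, feeds the events, then one pill per worker; returns the processed count. */
    static int run(int workers, int events) {
        BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int w = 0; w < workers; w++) {
            pool.submit(() -> {
                try {
                    while (true) {
                        Event event = queue.take();   // Blocks until something arrives
                        if (event.id() == null) break; // Poison pill: leave the loop
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            for (int i = 0; i < events; i++) queue.put(new Event("evt-" + i, "payload"));
            for (int w = 0; w < workers; w++) queue.put(Event.poison());
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not stop");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 100)); // 100
    }
}
```

    Because the queue is FIFO and the pills are enqueued after the real events, every event is taken before any worker sees its pill, so nothing is lost at shutdown.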
    

    9.2 Backpressure (bounded vs unbounded queue)

    Problem: What happens if events arrive faster than they are processed?

    Solution: Bounded queue with an overflow policy

    // Bounded queue (recommended)
    BlockingQueue<BusEvent<T>> queue = new LinkedBlockingQueue<>(10_000);
    
    // On publish, handle overflow
    private <T> boolean deliver(Subscription<T> sub, BusEvent<T> event) throws InterruptedException {
        // offer() with a timeout: never blocks indefinitely
        boolean offered = sub.getQueue().offer(event, 100, TimeUnit.MILLISECONDS);
    
        if (!offered) {
            // Overflow policy (pick one option or combine them):
            // Option 1: Drop (log + metric)
            log.warn("Queue full, event dropped");
            droppedCount.incrementAndGet();
    
            // Option 2: Send to the DLQ
            dlq.send("overflow", event);
    
            // Option 3: Raise an alert
            alertService.queueOverflow(sub.getId());
        }
    
        return offered;
    }
    

    Metrics to monitor:

    • socle_eventbus_queue_size: current size
    • socle_eventbus_queue_capacity: maximum capacity
    • socle_eventbus_dropped_total: dropped events
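
    The drop policy (option 1) and the counters behind these metrics can be sketched as follows. BoundedDelivery and its methods are illustrative names, not Socle classes; the sketch only shows how a timed offer() on a bounded queue turns overflow into a countable drop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative bounded delivery with a drop counter. */
class BoundedDelivery<E> {

    private final BlockingQueue<E> queue;
    private final int capacity;
    private final long offerTimeoutMs;
    private final AtomicLong dropped = new AtomicLong();

    BoundedDelivery(int capacity, long offerTimeoutMs) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.capacity = capacity;
        this.offerTimeoutMs = offerTimeoutMs;
    }

    /** Tries to enqueue within the timeout; counts a drop when the queue stays full. */
    boolean deliver(E event) {
        try {
            boolean offered = queue.offer(event, offerTimeoutMs, TimeUnit.MILLISECONDS);
            if (!offered) {
                dropped.incrementAndGet();
            }
            return offered;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            dropped.incrementAndGet();
            return false;
        }
    }

    int queueSize()     { return queue.size(); }   // Would feed a queue_size gauge
    int queueCapacity() { return capacity; }       // Would feed a queue_capacity gauge
    long droppedTotal() { return dropped.get(); }  // Would feed a dropped_total counter

    public static void main(String[] args) {
        BoundedDelivery<String> delivery = new BoundedDelivery<>(2, 10);
        System.out.println(delivery.deliver("a")); // true
        System.out.println(delivery.deliver("b")); // true
        System.out.println(delivery.deliver("c")); // false: queue full, nobody consumes
        System.out.println(delivery.droppedTotal()); // 1
    }
}
```

    A queue size that stays close to capacity is the early warning; a rising dropped counter means the consumer has already fallen behind.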

    9.3 Multi-channel with a single queue

    Problem: A worker listens to several channels. How are they merged?

    Solution: Merged queue with forwarders

    // Merged queue (bounded, see 9.2)
    BlockingQueue<BusEvent<T>> mergedQueue = new LinkedBlockingQueue<>(queueCapacity);
    
    // One forwarder per subscription
    for (String channel : channels) {
        Subscription<T> sub = eventBus.subscribe(channel, eventType);
    
        // Thread that forwards to the merged queue
        Thread.ofVirtual().start(() -> {
            try {
                while (running.get()) {
                    BusEvent<T> event = sub.getQueue().take();
                    if (event.id() == null) break;  // Poison pill
                    // Dropped if the merged queue is still full after 1 s
                    mergedQueue.offer(event, 1, TimeUnit.SECONDS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
    
    // The workers consume the merged queue
    void pushLoop() throws InterruptedException {
        while (running.get()) {
            BusEvent<T> event = mergedQueue.take();
            // event.channel() tells which channel it came from
            processEvent(event.payload(), event);
        }
    }
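
    A runnable sketch of the same idea, under simplifying assumptions: plain queues stand in for the subscriptions, the Event record stands in for BusEvent, and all names (MergedQueueDemo, mergeAndCount, source) are illustrative. Each source ends with a poison pill so that its forwarder stops on its own.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class MergedQueueDemo {

    /** Stand-in for BusEvent: a null id marks the poison pill. */
    record Event(String id, String channel) {}

    /** Forwards every source into one bounded queue and counts the events seen per channel. */
    static Map<String, Integer> mergeAndCount(List<BlockingQueue<Event>> sources) {
        BlockingQueue<Event> merged = new LinkedBlockingQueue<>(16);
        CountDownLatch forwardersDone = new CountDownLatch(sources.size());

        for (BlockingQueue<Event> source : sources) {
            Thread forwarder = new Thread(() -> {
                try {
                    while (true) {
                        Event event = source.take();
                        if (event.id() == null) break; // Poison pill: this source is finished
                        merged.put(event);             // Blocks while the merged queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    forwardersDone.countDown();
                }
            });
            forwarder.setDaemon(true);
            forwarder.start();
        }

        // Single consumer: drain until all forwarders are done and nothing is left
        Map<String, Integer> perChannel = new TreeMap<>();
        try {
            while (forwardersDone.getCount() > 0 || !merged.isEmpty()) {
                Event event = merged.poll(50, TimeUnit.MILLISECONDS);
                if (event != null) {
                    perChannel.merge(event.channel(), 1, Integer::sum);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return perChannel;
    }

    /** Builds a source queue holding `count` events for a channel, followed by a poison pill. */
    static BlockingQueue<Event> source(String channel, int count) {
        BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < count; i++) {
            queue.add(new Event(channel + "-" + i, channel));
        }
        queue.add(new Event(null, null));
        return queue;
    }

    public static void main(String[] args) {
        System.out.println(mergeAndCount(List.of(source("orders.created", 3), source("orders.updated", 2))));
        // {orders.created=3, orders.updated=2}
    }
}
```

    The merged queue preserves the order within each source, but events from different channels interleave in whatever order the forwarders run.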
    

    9.4 Event ordering

    Problem: Do events have to be processed in order?

    Solution: It depends on the use case

    Mode        | Guarantee                 | Implementation
    None        | No ordering guarantee     | Concurrency > 1
    Per channel | Ordering per channel      | Concurrency = 1 per channel
    Per key     | Ordering per business key | Partitioning on the key

    // To guarantee ordering per key (e.g. per orderId)
    public class OrderedPushWorker extends AbstractPushWorker<OrderEvent> {
    
        // One queue per partition (hash of the key)
        private final BlockingQueue<BusEvent<OrderEvent>>[] partitionQueues;
        private final int partitions = 8;
    
        @Override
        protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
            // Hashing sends every event with the same orderId to the same partition.
            // floorMod keeps the index non-negative even when hashCode() is
            // Integer.MIN_VALUE, for which Math.abs() still returns a negative value.
            int partition = Math.floorMod(payload.orderId().hashCode(), partitions);
            partitionQueues[partition].offer(event);
        }
    
        // One worker per partition = ordering guaranteed within a partition
    }
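
    The partition index is the only subtle part of this scheme, so here it is on its own. KeyPartitioner is an illustrative helper, not a Socle class; it shows why Math.floorMod is a safer choice than Math.abs(...) % n when the hash can be Integer.MIN_VALUE.

```java
/** Illustrative helper: maps a business key to a stable partition index. */
class KeyPartitioner {

    /** Always returns a value in [0, partitions), whatever the sign of the hash. */
    static int partitionOfHash(int hash, int partitions) {
        return Math.floorMod(hash, partitions);
    }

    /** Same key, same partition: ordering per key follows from one worker per partition. */
    static int partitionOf(Object key, int partitions) {
        return partitionOfHash(key.hashCode(), partitions);
    }

    public static void main(String[] args) {
        // Stable mapping for a given key
        System.out.println(partitionOf("order-42", 8) == partitionOf("order-42", 8)); // true

        // Math.abs(Integer.MIN_VALUE) overflows and stays negative
        System.out.println(Math.abs(Integer.MIN_VALUE) % 3);       // -2
        System.out.println(partitionOfHash(Integer.MIN_VALUE, 3)); // 1
    }
}
```

    With a power-of-two partition count such as 8 the overflow happens to yield 0, which hides the problem until the partition count changes.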
    

    9.5 Retry and DLQ

    Problem: What happens if processing fails?

    Solution: Retry with backoff, then DLQ

    @Component
    public class ResilientPushWorker extends AbstractPushWorker<MyEvent> {
    
        @Autowired
        private EventBus eventBus;
    
        private static final int MAX_RETRIES = 3;
    
        // Assumption: a plain JDK scheduler is enough to delay the re-publication
        private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    
        @Override
        protected void processEvent(MyEvent payload, BusEvent<MyEvent> event) {
            int retryCount = getRetryCount(event);
    
            try {
                doProcess(payload);
            } catch (RetryableException e) {
                if (retryCount < MAX_RETRIES) {
                    // Re-publish with an incremented retry count
                    scheduleRetry(event, retryCount + 1);
                } else {
                    // Max retries reached -> DLQ
                    sendToDlq(event, e);
                }
            }
        }
    
        private void scheduleRetry(BusEvent<MyEvent> event, int retryCount) {
            // Exponential backoff: 2 s, 4 s, 8 s
            long delayMs = (long) Math.pow(2, retryCount) * 1000;
    
            // Re-publish once the delay has elapsed
            scheduler.schedule(() -> {
                Map<String, String> metadata = new HashMap<>(event.metadata());
                metadata.put("retry_count", String.valueOf(retryCount));
                eventBus.publish(event.channel(), event.payload(), metadata);
            }, delayMs, TimeUnit.MILLISECONDS);
        }
        }
    
        private int getRetryCount(BusEvent<MyEvent> event) {
            return Integer.parseInt(event.metadata().getOrDefault("retry_count", "0"));
        }
    
        private void sendToDlq(BusEvent<MyEvent> event, Exception e) {
            eventBus.publish("dlq." + event.channel(), new DlqEntry(event, e));
        }
    }
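
    The backoff arithmetic used above can be isolated and capped. This is a sketch under assumptions: BackoffPolicy is a hypothetical class, and the upper bound on the delay is an addition that the worker above does not have. With a base of one second it reproduces the 2 s, 4 s, 8 s sequence.

```java
import java.time.Duration;

// Hypothetical helper: decides whether to retry and how long to wait.
class BackoffPolicy {

    private final long baseMillis;
    private final long maxMillis;
    private final int maxRetries;

    BackoffPolicy(Duration baseDelay, Duration maxDelay, int maxRetries) {
        if (baseDelay.isNegative() || maxDelay.isNegative() || maxRetries < 0) {
            throw new IllegalArgumentException("delays and maxRetries must be >= 0");
        }
        this.baseMillis = baseDelay.toMillis();
        this.maxMillis = maxDelay.toMillis();
        this.maxRetries = maxRetries;
    }

    // true while another retry is allowed; false means "send to the DLQ".
    boolean shouldRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    // Delay before retry number `attempt` (1-based): base * 2^attempt,
    // saturated on overflow and capped at maxDelay.
    Duration delayFor(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        long factor = 1L << Math.min(attempt, 62);
        long millis = baseMillis > Long.MAX_VALUE / factor
                ? Long.MAX_VALUE
                : baseMillis * factor;
        return Duration.ofMillis(Math.min(millis, maxMillis));
    }
}
```

    Capping the delay keeps a misconfigured MAX_RETRIES from scheduling a retry hours away; whether a cap is appropriate depends on the use case.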
    

    10. Migrating existing workers

    10.1 Before (AbstractEventDrivenWorker)

    // DEPRECATED
    @Component
    public class OldOrderWorker extends AbstractEventDrivenWorker<OrderEvent> {
    
        private final BlockingQueue<OrderEvent> queue = new LinkedBlockingQueue<>();
    
        public OldOrderWorker() {
            super(4);
        }
    
        @Override
        public String getName() {
            return "order_worker";
        }
    
        @Override
        protected OrderEvent pollEvent() throws InterruptedException {
            return queue.poll(100, TimeUnit.MILLISECONDS);  // Polling!
        }
    
        @Override
        protected void processEvent(OrderEvent event) {
            // Processing
        }
    
        // External method used to receive events
        public void onOrderReceived(OrderEvent event) {
            queue.offer(event);
        }
    }
    

    10.2 After (AbstractPushWorker)

    // RECOMMENDED
    @Component
    public class NewOrderWorker extends AbstractPushWorker<OrderEvent> {
    
        public NewOrderWorker() {
            super(4, "orders.created");  // Specifies the channel
        }
    
        @Override
        public String getName() {
            return "order_worker";
        }
    
        @Override
        protected Class<OrderEvent> getEventType() {
            return OrderEvent.class;
        }
    
        @Override
        protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
            // Same processing
        }
    
        // No external method needed anymore: the EventBus handles delivery
    }
    

    10.3 Migration steps

    1. Add the EventBus dependency to the worker
    2. Change the parent class: AbstractEventDrivenWorker → AbstractPushWorker
    3. Specify the channels in the constructor
    4. Implement getEventType()
    5. Adapt processEvent() to accept a BusEvent
    6. Remove the internal queue and pollEvent()
    7. Update producers to use eventBus.publish()
    8. Test the behavior

    11. Prometheus metrics

    # EventBus
    socle_eventbus_published_total{channel="orders.created"}
    socle_eventbus_delivered_total{channel="orders.created"}
    socle_eventbus_failed_total{channel="orders.created"}
    socle_eventbus_subscriptions_active{channel="orders.created"}
    socle_eventbus_queue_size{subscription="sub-123"}
    
    # Push Workers
    socle_push_worker_processed_total{worker="order_worker"}
    socle_push_worker_errors_total{worker="order_worker"}
    socle_push_worker_queue_size{worker="order_worker"}
    socle_push_worker_duration_seconds{worker="order_worker", quantile="0.99"}
    

    12. Admin REST API

    GET  /admin/eventbus/stats              → Global statistics
    GET  /admin/eventbus/channels           → List of active channels
    GET  /admin/eventbus/subscriptions      → List of subscriptions
    
    POST /admin/eventbus/publish            → Publish an event (debug)
         Body: {"channel": "test", "payload": {...}}
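
    As an illustration of the debug endpoint above, the request can be built with the JDK HTTP client. This is a sketch under assumptions: the base URL (host and port) is hypothetical, AdminPublishRequest is a name introduced here, and any authentication the admin API may require is not covered by this section. The request is only built, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds the POST request for /admin/eventbus/publish.
class AdminPublishRequest {

    // Escapes backslashes and double quotes so the channel is a valid JSON string.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // payloadJson must already be valid JSON; it is embedded as-is.
    static String body(String channel, String payloadJson) {
        return "{\"channel\": \"" + escape(channel) + "\", \"payload\": " + payloadJson + "}";
    }

    static HttpRequest build(String baseUrl, String channel, String payloadJson) {
        return HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/admin/eventbus/publish"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(
                body(channel, payloadJson), StandardCharsets.UTF_8))
            .build();
    }
}
```

    The resulting request can be passed to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) against a running instance.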
    

    13. See also

    Socle V004 – EventBus et Workers Push

  • Test Article – WordPress Publisher System

    title: "Test Article – WordPress Publisher System"
    date: 2026-01-31T14:30:00
    status: publish
    categories: [2, 3]
    tags: [5, 6, 11]

    Test Article – WordPress Publisher System

    This is a test article to demonstrate the WordPress Publisher system integration with the Socle V004 framework.

    Introduction

    The WordPress Publisher is an automated content publishing system that:

    • Monitors a local Publications folder for Markdown files
    • Automatically parses and converts Markdown to HTML
    • Publishes content to WordPress via REST API
    • Manages featured images and metadata
    • Tracks changes and updates existing content

    Features

    Automated Scanning

    The system continuously scans the Publications directory and detects new or modified Markdown files.

    Format Support

    • Markdown Parsing: Full Markdown syntax support with tables
    • HTML Sanitization: WordPress-safe HTML output
    • Frontmatter: YAML metadata for article configuration
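
    To make the frontmatter step concrete, here is a minimal sketch of separating metadata from the Markdown body. It is not the publisher's actual parser: the Frontmatter class is hypothetical, the "---" delimiters are assumed (the usual convention), and only flat "key: value" lines are read. Lists or nested YAML would need a real YAML library.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: splits a document into frontmatter fields and body.
class Frontmatter {

    final Map<String, String> fields;
    final String body;

    private Frontmatter(Map<String, String> fields, String body) {
        this.fields = fields;
        this.body = body;
    }

    static Frontmatter parse(String document) {
        String[] lines = document.split("\n", -1);
        // No opening delimiter: the whole document is the body.
        if (!lines[0].trim().equals("---")) {
            return new Frontmatter(new LinkedHashMap<>(), document);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        int i = 1;
        while (i < lines.length && !lines[i].trim().equals("---")) {
            // Split on the first colon only, so values may contain colons.
            int colon = lines[i].indexOf(':');
            if (colon > 0) {
                fields.put(lines[i].substring(0, colon).trim(),
                           lines[i].substring(colon + 1).trim());
            }
            i++;
        }
        // No closing delimiter: treat the document as plain Markdown.
        if (i == lines.length) {
            return new Frontmatter(new LinkedHashMap<>(), document);
        }
        String body = String.join("\n", Arrays.copyOfRange(lines, i + 1, lines.length));
        return new Frontmatter(fields, body);
    }
}
```

    Values are kept as raw strings; converting "date" to a timestamp or "categories" to a list of IDs is left to the caller.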

    WordPress Integration

    • Direct REST API connection
    • Automatic slug generation
    • Category and tag assignment
    • Featured image management
    • Author configuration

    Implementation Details

    The system is built on:

    • Spring Boot 3.2.1 with Java 21
    • Socle V004 framework
    • Docker for containerization
    • Redis for caching (KvBus)
    • H2 Database for state management

    Testing

    This article demonstrates:

    1. ✅ Markdown file scanning
    2. ✅ YAML frontmatter parsing
    3. ✅ HTML conversion
    4. ✅ WordPress API publishing

    Conclusion

    The WordPress Publisher provides a seamless integration between local Markdown content and WordPress, enabling efficient content management workflows.

  • Socle V004 – Plugins

    Socle V004 – Plugins

    20 – Plugins

    Version: 4.0.0 Date: 2025-12-09

    1. Introduction

    Socle V4 supports a plugin architecture for extending its core features. Plugins are Spring Boot modules that integrate automatically through auto-configuration.

    2. Plugin architecture

    ┌──────────────────────────────────────────────────────────┐
    │                    Application                            │
    │                                                           │
    │  ┌─────────────────────────────────────────────────────┐ │
    │  │                   Socle V4 Core                      │ │
    │  │  MOP | Workers | KvBus | TechDB | Logging | etc.    │ │
    │  └─────────────────────────────────────────────────────┘ │
    │                          │                                │
    │         ┌────────────────┼────────────────┐              │
    │         ▼                ▼                ▼              │
    │  ┌────────────┐   ┌────────────┐   ┌────────────┐       │
    │  │   Plugin   │   │   Plugin   │   │   Plugin   │       │
    │  │   Kafka    │   │   NATS     │   │   Custom   │       │
    │  └────────────┘   └────────────┘   └────────────┘       │
    │                                                           │
    └──────────────────────────────────────────────────────────┘
    

    3. Creating a plugin

    3.1 Maven structure

    <?xml version="1.0" encoding="UTF-8"?>
    <project>
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.mycompany</groupId>
        <artifactId>socle-plugin-myplugin</artifactId>
        <version>1.0.0</version>
    
        <dependencies>
            <!-- Socle dependency -->
            <dependency>
                <groupId>eu.lmvi</groupId>
                <artifactId>socle-v004</artifactId>
                <version>4.0.0</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    </project>
    

    3.2 Auto-configuration

    package com.mycompany.plugin;
    
    import org.springframework.boot.autoconfigure.AutoConfiguration;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.ComponentScan;
    
    @AutoConfiguration
    @ConditionalOnProperty(name = "socle.plugins.myplugin.enabled", havingValue = "true")
    @ComponentScan(basePackages = "com.mycompany.plugin")
    public class MyPluginAutoConfiguration {
        // Automatic configuration
    }
    

    3.3 spring.factories file

    # src/main/resources/META-INF/spring.factories
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    com.mycompany.plugin.MyPluginAutoConfiguration
    

    For Spring Boot 2.7 and later, including 3.x (which no longer reads auto-configurations from spring.factories):

    # src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
    com.mycompany.plugin.MyPluginAutoConfiguration
    

    4. Plugin types

    4.1 Worker plugin

    package com.mycompany.plugin.worker;
    
    import eu.lmvi.socle.worker.Worker;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    @Component
    @ConditionalOnProperty(name = "socle.plugins.myplugin.enabled", havingValue = "true")
    public class MyPluginWorker implements Worker {
    
        @Override
        public String getName() {
            return "my-plugin-worker";
        }
    
        @Override
        public void initialize() {
            // Initialization
        }
    
        @Override
        public void start() {
            // Startup
        }
    
        @Override
        public void doWork() {
            // Processing
        }
    
        @Override
        public void stop() {
            // Shutdown
        }
    
        @Override
        public boolean isHealthy() {
            return true;
        }
    
        @Override
        public Map<String, Object> getStats() {
            return Map.of();
        }
    }
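
    To show how these lifecycle methods fit together, the sketch below drives a worker through one plausible call order. It is an assumption, not the Socle's orchestration: SketchWorker is a local stand-in for the methods shown above (it is not eu.lmvi.socle.worker.Worker), and LifecycleDriver and RecordingWorker are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for the lifecycle methods shown above.
interface SketchWorker {
    void initialize();
    void start();
    void doWork();
    void stop();
}

// Hypothetical driver: the real orchestration is done by the Socle and may differ.
class LifecycleDriver {

    // Runs initialize -> start -> doWork x cycles -> stop. Once start() has
    // returned, stop() is always called, even if doWork() throws.
    static void run(SketchWorker worker, int cycles) {
        worker.initialize();
        worker.start();
        try {
            for (int i = 0; i < cycles; i++) {
                worker.doWork();
            }
        } finally {
            worker.stop();
        }
    }
}

// Records each lifecycle call so the order can be inspected.
class RecordingWorker implements SketchWorker {
    final List<String> calls = new ArrayList<>();

    @Override public void initialize() { calls.add("initialize"); }
    @Override public void start()      { calls.add("start"); }
    @Override public void doWork()     { calls.add("doWork"); }
    @Override public void stop()       { calls.add("stop"); }
}
```

    A plugin worker should therefore acquire resources in initialize() or start() and release them in stop(), so that a failing doWork() cycle does not leak them.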
    

    4.2 KvBus plugin

    package com.mycompany.plugin.kv;
    
    import eu.lmvi.socle.kv.KvBus;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.stereotype.Component;
    
    import java.util.Optional;
    
    @Component
    @ConditionalOnProperty(name = "socle.kvbus.mode", havingValue = "custom")
    public class CustomKvBus implements KvBus {
    
        @Override
        public void put(String key, String value) {
            // Custom implementation
        }
    
        @Override
        public Optional<String> get(String key) {
            // Custom implementation
            return Optional.empty();
        }
    
        // ... other methods
    }
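
    As a starting point for a custom backend, the two methods shown above can be backed by an in-memory map. This is a sketch under assumptions: SketchKvBus is a local stand-in limited to put and get (the real eu.lmvi.socle.kv.KvBus interface has further methods not listed in this section), and InMemoryKvBus is a hypothetical name.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-in limited to the two methods shown above.
interface SketchKvBus {
    void put(String key, String value);
    Optional<String> get(String key);
}

// Thread-safe in-memory store; ConcurrentHashMap rejects null keys and values.
class InMemoryKvBus implements SketchKvBus {

    private final Map<String, String> store = new ConcurrentHashMap<>();

    @Override
    public void put(String key, String value) {
        store.put(key, value);
    }

    @Override
    public Optional<String> get(String key) {
        // A missing key yields Optional.empty() rather than null.
        return Optional.ofNullable(store.get(key));
    }
}
```

    Such an implementation is handy in tests, but the data lives in a single JVM and is lost on restart.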
    

    4.3 Transport plugin (LogForwarder)

    package com.mycompany.plugin.logging;
    
    import eu.lmvi.socle.logging.LogTransport;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    
    @Component
    @ConditionalOnProperty(name = "socle.logging.forwarder.transport-mode", havingValue = "custom")
    public class CustomLogTransport implements LogTransport {
    
        @Override
        public void send(List<LogEntry> entries) throws Exception {
            // Send the logs to your own system
        }
    
        @Override
        public boolean isAvailable() {
            return true;
        }
    
        @Override
        public void close() {
            // Cleanup
        }
    }
    

    5. Kafka plugin (complete example)

    5.1 Structure

    socle-plugin-kafka/
    ├── pom.xml
    ├── src/main/java/eu/lmvi/socle/plugin/kafka/
    │   ├── KafkaPluginAutoConfiguration.java
    │   ├── KafkaPluginConfiguration.java
    │   ├── KafkaConsumerWorker.java
    │   ├── KafkaProducerService.java
    │   └── KafkaHealthIndicator.java
    └── src/main/resources/
        └── META-INF/spring/
            └── org.springframework.boot.autoconfigure.AutoConfiguration.imports
    

    5.2 Configuration

    @ConfigurationProperties(prefix = "socle.plugins.kafka")
    public class KafkaPluginConfiguration {
        private boolean enabled = false;
        private String bootstrapServers = "localhost:9092";
        private String groupId = "socle-group";
        private List<String> topics = new ArrayList<>();
        private Map<String, String> consumerProperties = new HashMap<>();
        private Map<String, String> producerProperties = new HashMap<>();
    
        // Getters/Setters
    }
    

    5.3 Auto-configuration

    @AutoConfiguration
    @ConditionalOnProperty(name = "socle.plugins.kafka.enabled", havingValue = "true")
    @EnableConfigurationProperties(KafkaPluginConfiguration.class)
    @ComponentScan(basePackages = "eu.lmvi.socle.plugin.kafka")
    public class KafkaPluginAutoConfiguration {
    
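        @Bean
        public KafkaConsumer<String, String> kafkaConsumer(KafkaPluginConfiguration config) {
            Properties props = new Properties();
            props.put("bootstrap.servers", config.getBootstrapServers());
            props.put("group.id", config.getGroupId());
            // KafkaConsumer requires deserializers; String defaults match the <String, String> types
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Applied last so that user-supplied properties override the defaults
            props.putAll(config.getConsumerProperties());
            return new KafkaConsumer<>(props);
        }
    
        @Bean
        public KafkaProducer<String, String> kafkaProducer(KafkaPluginConfiguration config) {
            Properties props = new Properties();
            props.put("bootstrap.servers", config.getBootstrapServers());
            // KafkaProducer requires serializers; String defaults match the <String, String> types
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.putAll(config.getProducerProperties());
            return new KafkaProducer<>(props);
        }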
    }
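
    A KafkaConsumer built from a Properties object needs key.deserializer and value.deserializer entries, so it helps to set defaults before merging the user-supplied map. The sketch below shows that merge order with the JDK only; KafkaProps is a hypothetical helper, not part of the plugin.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: builds consumer properties with overridable defaults.
class KafkaProps {

    static final String STRING_DESERIALIZER =
        "org.apache.kafka.common.serialization.StringDeserializer";

    static Properties consumer(String bootstrapServers, String groupId,
                               Map<String, String> overrides) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        // Applied last: any key present in overrides replaces the default.
        props.putAll(overrides);
        return props;
    }
}
```

    Because the overrides are applied last, an entry such as consumer-properties.group.id in the configuration would silently replace the dedicated group-id setting; whether that precedence is desirable is a design choice.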
    

    5.4 Worker

    @Component
    @ConditionalOnProperty(name = "socle.plugins.kafka.enabled", havingValue = "true")
    public class KafkaConsumerWorker extends AbstractWorker {
    
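        private final KafkaConsumer<String, String> consumer;
        private final KafkaPluginConfiguration config;
        private final TechDbManager techDb;
    
        // Constructor injection: the final fields must be assigned
        public KafkaConsumerWorker(KafkaConsumer<String, String> consumer,
                                   KafkaPluginConfiguration config,
                                   TechDbManager techDb) {
            this.consumer = consumer;
            this.config = config;
            this.techDb = techDb;
        }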
    
        @Override
        public String getName() {
            return "kafka-consumer-plugin";
        }
    
        @Override
        protected void doInitialize() {
            consumer.subscribe(config.getTopics());
        }
    
        @Override
        protected void doProcess() {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                processRecord(record);
            }
        }
    
        @Override
        protected void doStop() {
            consumer.close();
        }
    }
    

    5.5 Usage

    # application.yml
    socle:
      plugins:
        kafka:
          enabled: true
          bootstrap-servers: kafka:9092
          group-id: my-app
          topics:
            - orders
            - events
    

    6. NATS plugin (example)

    6.1 Configuration

    @ConfigurationProperties(prefix = "socle.plugins.nats")
    public class NatsPluginConfiguration {
        private boolean enabled = false;
        private String url = "nats://localhost:4222";
        private List<String> subjects = new ArrayList<>();
        private String streamName;
        private String consumerName;
    
        // Getters/Setters
    }
    

    6.2 Worker

    @Component
    @ConditionalOnProperty(name = "socle.plugins.nats.enabled", havingValue = "true")
    public class NatsConsumerWorker extends AbstractWorker {
    
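        private final NatsPluginConfiguration config;
        private Connection natsConnection;
        private JetStream jetStream;
        // One pull subscription per subject, created once at initialization
        private final List<JetStreamSubscription> subscriptions = new ArrayList<>();
    
        public NatsConsumerWorker(NatsPluginConfiguration config) {
            this.config = config;
        }
    
        @Override
        protected void doInitialize() {
            try {
                natsConnection = Nats.connect(config.getUrl());
                jetStream = natsConnection.jetStream();
                // Assumption: the durable consumer's filter covers every configured subject
                PullSubscribeOptions options = PullSubscribeOptions.builder()
                    .durable(config.getConsumerName())
                    .build();
                for (String subject : config.getSubjects()) {
                    subscriptions.add(jetStream.subscribe(subject, options));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while connecting to NATS", e);
            } catch (IOException | JetStreamApiException e) {
                throw new IllegalStateException("NATS initialization failed", e);
            }
        }
    
        @Override
        protected void doProcess() {
            for (JetStreamSubscription subscription : subscriptions) {
                // Process and acknowledge every fetched message, not only the first one
                for (Message msg : subscription.fetch(100, Duration.ofSeconds(1))) {
                    processMessage(msg);
                    msg.ack();
                }
            }
        }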
    }
    

    7. Extending the Admin APIs

    7.1 Additional controller

    @RestController
    @RequestMapping("/admin/plugins/kafka")
    @ConditionalOnProperty(name = "socle.plugins.kafka.enabled", havingValue = "true")
    public class KafkaAdminController {
    
        @Autowired
        private KafkaConsumerWorker worker;
    
        @GetMapping("/status")
        public Map<String, Object> status() {
            return Map.of(
                "connected", worker.isHealthy(),
                "stats", worker.getStats()
            );
        }
    
        @GetMapping("/offsets")
        public Map<String, Long> offsets() {
            return worker.getCurrentOffsets();
        }
    
        @PostMapping("/seek/{topic}/{partition}/{offset}")
        public void seek(
                @PathVariable String topic,
                @PathVariable int partition,
                @PathVariable long offset) {
            worker.seekTo(topic, partition, offset);
        }
    }
    

    8. Plugin metrics

    @Component
    @ConditionalOnProperty(name = "socle.plugins.kafka.enabled", havingValue = "true")
    public class KafkaPluginMetrics {
    
        private final Counter messagesReceived;
        private final Counter messagesProcessed;
        private final Timer processingTime;
    
        public KafkaPluginMetrics(MeterRegistry registry) {
            this.messagesReceived = Counter.builder("socle_kafka_messages_received_total")
                .description("Total Kafka messages received")
                .register(registry);
    
            this.messagesProcessed = Counter.builder("socle_kafka_messages_processed_total")
                .description("Total Kafka messages processed")
                .register(registry);
    
            this.processingTime = Timer.builder("socle_kafka_processing_duration_seconds")
                .description("Kafka message processing duration")
                .register(registry);
        }
    
        public void recordReceived() {
            messagesReceived.increment();
        }
    
        public void recordProcessed(Duration duration) {
            messagesProcessed.increment();
            processingTime.record(duration);
        }
    }
    

    9. Testing the plugin

    @SpringBootTest
    @TestPropertySource(properties = {
        "socle.plugins.kafka.enabled=true",
        "socle.plugins.kafka.bootstrap-servers=localhost:9092"
    })
    class KafkaPluginTest {
    
        @Autowired
        private KafkaConsumerWorker worker;
    
        @Test
        void workerShouldBeRegistered() {
            assertNotNull(worker);
            assertEquals("kafka-consumer-plugin", worker.getName());
        }
    
        @Test
        void workerShouldStart() {
            worker.initialize();
            worker.start();
            assertTrue(worker.isHealthy());
        }
    }
    

    10. Publishing the plugin

    10.1 Maven deploy

    <distributionManagement>
        <repository>
            <id>releases</id>
            <url>https://nexus.mycompany.com/repository/maven-releases/</url>
        </repository>
    </distributionManagement>
    
    mvn clean deploy
    

    10.2 Usage in an application

    <dependency>
        <groupId>eu.lmvi</groupId>
        <artifactId>socle-plugin-kafka</artifactId>
        <version>1.0.0</version>
    </dependency>
    

    11. Best practices

    DO

    • Use @ConditionalOnProperty to enable or disable the plugin
    • Expose the configuration via @ConfigurationProperties
    • Implement health indicators
    • Expose metrics
    • Document the configuration options

    DON’T

    • Do not force activation by default
    • Do not duplicate core features
    • Do not use dependencies that conflict with the Socle
    • Do not block application startup if the plugin fails
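
    The last point can be illustrated with a guard around plugin initialization. This is a minimal sketch, assuming the plugin reports its state through a health flag; GuardedPluginInit is a hypothetical class and not a Socle API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper: runs a plugin's initialization and records a failure
// instead of propagating it, so application startup can continue.
class GuardedPluginInit {

    private final AtomicBoolean healthy = new AtomicBoolean(false);
    private volatile String lastError;

    // Returns true if the initialization completed without an exception.
    boolean initialize(Runnable pluginInit) {
        try {
            pluginInit.run();
            healthy.set(true);
            lastError = null;
        } catch (RuntimeException e) {
            // Swallow the failure: the plugin is marked unhealthy instead.
            healthy.set(false);
            lastError = e.getMessage();
        }
        return healthy.get();
    }

    boolean isHealthy() {
        return healthy.get();
    }

    String lastError() {
        return lastError;
    }
}
```

    The unhealthy state can then be surfaced by a health indicator, which keeps the failure visible without preventing the rest of the application from starting.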

    12. References