Auteur/autrice : jmh

  • Socle V004 – Données Partagées

    Socle V004 – Données Partagées

    07 – SharedDataRegistry

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    SharedDataRegistry est un registre centralisé pour partager des données entre Workers au sein d’une même instance. Il fournit des opérations atomiques et un système de niveaux de santé.

    Différence avec KvBus

    Aspect SharedDataRegistry KvBus
    Scope Intra-instance Optionnel inter-instances (Redis)
    Performance Ultra rapide (mémoire) Variable (réseau si Redis)
    Types Fortement typés Strings/JSON
    Health levels Oui Non
    Callbacks Oui Non

    2. Interface SharedDataRegistry

    package eu.lmvi.socle.shared;
    
    public interface SharedDataRegistry {
    
        // === Key-Value basique ===
    
        void put(String key, Object value);
        void put(String key, Object value, HealthLevel level);
        Optional<Object> get(String key);
        <T> Optional<T> get(String key, Class<T> type);
        void delete(String key);
        boolean exists(String key);
    
        // === Typed getters ===
    
        Optional<String> getString(String key);
        Optional<Integer> getInt(String key);
        Optional<Long> getLong(String key);
        Optional<Double> getDouble(String key);
        Optional<Boolean> getBoolean(String key);
    
        // === Sequences (compteurs atomiques) ===
    
        void createSequence(String key, long initialValue, HealthLevel level);
        long incrementSequence(String key);
        long incrementSequence(String key, long delta);
        long getSequence(String key);
        void setSequence(String key, long value);
    
        // === Lists ===
    
        <T> void addToList(String key, T item);
        <T> List<T> getList(String key, Class<T> type);
        void clearList(String key);
    
        // === Maps ===
    
        <V> void putInMap(String key, String mapKey, V value);
        <V> Optional<V> getFromMap(String key, String mapKey, Class<V> type);
        <V> Map<String, V> getMap(String key, Class<V> type);
        void removeFromMap(String key, String mapKey);
    
        // === Health ===
    
        HealthLevel getHealthLevel(String key);
        Map<String, HealthLevel> getAllHealthLevels();
        List<String> getUnhealthyKeys();
    
        // === Callbacks ===
    
        void registerCallback(String key, Consumer<Object> callback);
        void unregisterCallback(String key);
    
        // === Introspection ===
    
        Set<String> keys();
        Set<String> keys(String pattern);
        Map<String, Object> getAll();
        int size();
        void clear();
    }
    

    3. Health Levels

    package eu.lmvi.socle.shared;
    
    public enum HealthLevel {
        /**
         * Informatif - pas d'impact sur la santé
         */
        INFO,
    
        /**
         * Normal - contribue à la santé normale
         */
        NORMAL,
    
        /**
         * Important - dégradation si problème
         */
        IMPORTANT,
    
        /**
         * Critique - unhealthy si problème
         */
        CRITICAL
    }
    

    Utilisation dans le Supervisor

    Le Supervisor consulte les HealthLevel pour déterminer l’état de santé global :

    • CRITICAL absent ou invalide → Instance UNHEALTHY
    • IMPORTANT absent ou invalide → Instance DEGRADED
    • NORMAL/INFO → Pas d’impact

    4. Implémentation

    package eu.lmvi.socle.shared;
    
    @Component
    public class InMemorySharedDataRegistry implements SharedDataRegistry {
    
        private final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();
        private final ConcurrentHashMap<String, Consumer<Object>> callbacks = new ConcurrentHashMap<>();
    
        @Override
        public void put(String key, Object value) {
            put(key, value, HealthLevel.NORMAL);
        }
    
        @Override
        public void put(String key, Object value, HealthLevel level) {
            Entry previous = store.put(key, new Entry(value, level));
            notifyCallback(key, value);
        }
    
        @Override
        public Optional<Object> get(String key) {
            Entry entry = store.get(key);
            return entry != null ? Optional.of(entry.value) : Optional.empty();
        }
    
        @Override
        public <T> Optional<T> get(String key, Class<T> type) {
            return get(key).filter(type::isInstance).map(type::cast);
        }
    
        @Override
        public Optional<String> getString(String key) {
            return get(key).map(Object::toString);
        }
    
        @Override
        public Optional<Integer> getInt(String key) {
            return get(key, Number.class).map(Number::intValue);
        }
    
        @Override
        public Optional<Long> getLong(String key) {
            return get(key, Number.class).map(Number::longValue);
        }
    
        @Override
        public void createSequence(String key, long initialValue, HealthLevel level) {
            store.put(key, new Entry(new AtomicLong(initialValue), level));
        }
    
        @Override
        public long incrementSequence(String key) {
            return incrementSequence(key, 1);
        }
    
        @Override
        public long incrementSequence(String key, long delta) {
            Entry entry = store.get(key);
            if (entry == null || !(entry.value instanceof AtomicLong)) {
                throw new IllegalStateException("Sequence not found: " + key);
            }
            long newValue = ((AtomicLong) entry.value).addAndGet(delta);
            notifyCallback(key, newValue);
            return newValue;
        }
    
        @Override
        public long getSequence(String key) {
            Entry entry = store.get(key);
            if (entry == null || !(entry.value instanceof AtomicLong)) {
                throw new IllegalStateException("Sequence not found: " + key);
            }
            return ((AtomicLong) entry.value).get();
        }
    
        @Override
        public HealthLevel getHealthLevel(String key) {
            Entry entry = store.get(key);
            return entry != null ? entry.level : null;
        }
    
        @Override
        public List<String> getUnhealthyKeys() {
            return store.entrySet().stream()
                .filter(e -> e.getValue().level == HealthLevel.CRITICAL)
                .filter(e -> !isValueHealthy(e.getValue().value))
                .map(Map.Entry::getKey)
                .toList();
        }
    
        @Override
        public void registerCallback(String key, Consumer<Object> callback) {
            callbacks.put(key, callback);
        }
    
        private void notifyCallback(String key, Object value) {
            Consumer<Object> callback = callbacks.get(key);
            if (callback != null) {
                try {
                    callback.accept(value);
                } catch (Exception e) {
                    // Log but don't propagate
                }
            }
        }
    
        private boolean isValueHealthy(Object value) {
            if (value == null) return false;
            if (value instanceof Boolean b) return b;
            if (value instanceof Number n) return n.doubleValue() >= 0;
            return true;
        }
    
        private record Entry(Object value, HealthLevel level) {}
    }
    

    5. Utilisation

    5.1 Injection

    @Service
    public class MonService {
    
        @Autowired
        private SharedDataRegistry registry;
    
        public void process() {
            // ...
        }
    }
    

    5.2 Key-Value simple

    // Stocker
    registry.put("config.maxRetries", 3);
    registry.put("status.lastSync", Instant.now().toString());
    
    // Récupérer
    int maxRetries = registry.getInt("config.maxRetries").orElse(5);
    String lastSync = registry.getString("status.lastSync").orElse("never");
    

    5.3 Avec Health Level

    // Donnée critique - instance unhealthy si absente
    registry.put("database.connected", true, HealthLevel.CRITICAL);
    
    // Donnée importante - instance degraded si absente
    registry.put("cache.available", true, HealthLevel.IMPORTANT);
    
    // Donnée normale
    registry.put("stats.requestsTotal", 0, HealthLevel.NORMAL);
    
    // Donnée informative
    registry.put("info.startTime", Instant.now(), HealthLevel.INFO);
    

    5.4 Sequences (Compteurs)

    // Créer une séquence
    registry.createSequence("orders.processed", 0, HealthLevel.NORMAL);
    
    // Incrémenter
    long count = registry.incrementSequence("orders.processed");
    log.info("Processed order #{}", count);
    
    // Incrémenter avec delta
    long bytes = registry.incrementSequence("bytes.transferred", 1024);
    
    // Lire
    long total = registry.getSequence("orders.processed");
    

    5.5 Listes

    // Ajouter à une liste
    registry.addToList("errors.recent", new ErrorRecord("timeout", Instant.now()));
    registry.addToList("errors.recent", new ErrorRecord("connection", Instant.now()));
    
    // Lire la liste
    List<ErrorRecord> errors = registry.getList("errors.recent", ErrorRecord.class);
    
    // Vider
    registry.clearList("errors.recent");
    

    5.6 Maps

    // Stocker dans une map
    registry.putInMap("workers.status", "worker-1", "RUNNING");
    registry.putInMap("workers.status", "worker-2", "STOPPED");
    
    // Lire une entrée
    Optional<String> status = registry.getFromMap("workers.status", "worker-1", String.class);
    
    // Lire toute la map
    Map<String, String> allStatus = registry.getMap("workers.status", String.class);
    

    5.7 Callbacks

    // Enregistrer un callback
    registry.registerCallback("config.maxRetries", newValue -> {
        log.info("maxRetries changed to: {}", newValue);
        reconfigure((Integer) newValue);
    });
    
    // La modification déclenche le callback
    registry.put("config.maxRetries", 5);  // Callback appelé
    

    6. Patterns courants

    6.1 État de connexion

    @Component
    public class DatabaseWorker implements Worker {
    
        @Autowired
        private SharedDataRegistry registry;
    
        @Override
        public void initialize() {
            registry.put("database.connected", false, HealthLevel.CRITICAL);
        }
    
        @Override
        public void start() {
            try {
                connect();
                registry.put("database.connected", true, HealthLevel.CRITICAL);
            } catch (Exception e) {
                registry.put("database.connected", false, HealthLevel.CRITICAL);
                throw e;
            }
        }
    
        @Override
        public void stop() {
            disconnect();
            registry.put("database.connected", false, HealthLevel.CRITICAL);
        }
    }
    

    6.2 Métriques temps réel

    @Component
    public class MetricsCollector {
    
        @Autowired
        private SharedDataRegistry registry;
    
        @PostConstruct
        public void init() {
            registry.createSequence("metrics.requests.total", 0, HealthLevel.INFO);
            registry.createSequence("metrics.requests.errors", 0, HealthLevel.NORMAL);
            registry.createSequence("metrics.bytes.in", 0, HealthLevel.INFO);
            registry.createSequence("metrics.bytes.out", 0, HealthLevel.INFO);
        }
    
        public void recordRequest(long bytesIn, long bytesOut, boolean success) {
            registry.incrementSequence("metrics.requests.total");
            registry.incrementSequence("metrics.bytes.in", bytesIn);
            registry.incrementSequence("metrics.bytes.out", bytesOut);
    
            if (!success) {
                registry.incrementSequence("metrics.requests.errors");
            }
        }
    
        public Map<String, Object> getMetrics() {
            return Map.of(
                "requests.total", registry.getSequence("metrics.requests.total"),
                "requests.errors", registry.getSequence("metrics.requests.errors"),
                "bytes.in", registry.getSequence("metrics.bytes.in"),
                "bytes.out", registry.getSequence("metrics.bytes.out")
            );
        }
    }
    

    6.3 Circuit Breaker state

    @Component
    public class CircuitBreakerStateManager {
    
        @Autowired
        private SharedDataRegistry registry;
    
        public void updateState(String circuitName, CircuitState state) {
            String key = "circuit." + circuitName + ".state";
            HealthLevel level = state == CircuitState.OPEN
                ? HealthLevel.IMPORTANT
                : HealthLevel.NORMAL;
            registry.put(key, state.name(), level);
        }
    
        public CircuitState getState(String circuitName) {
            return registry.getString("circuit." + circuitName + ".state")
                .map(CircuitState::valueOf)
                .orElse(CircuitState.CLOSED);
        }
    }
    

    6.4 Progress tracking

    @Component
    public class BatchProcessor {
    
        @Autowired
        private SharedDataRegistry registry;
    
        public void processBatch(String batchId, List<Item> items) {
            registry.put("batch." + batchId + ".total", items.size());
            registry.createSequence("batch." + batchId + ".processed", 0, HealthLevel.NORMAL);
    
            for (Item item : items) {
                processItem(item);
                registry.incrementSequence("batch." + batchId + ".processed");
            }
    
            registry.put("batch." + batchId + ".status", "COMPLETED");
        }
    
        public double getProgress(String batchId) {
            int total = registry.getInt("batch." + batchId + ".total").orElse(0);
            if (total == 0) return 0;
    
            long processed = registry.getSequence("batch." + batchId + ".processed");
            return (double) processed / total * 100;
        }
    }
    

    7. Intégration avec Supervisor

    @Component
    public class HealthAggregator {
    
        @Autowired
        private SharedDataRegistry registry;
    
        @Autowired
        private List<Worker> workers;
    
        public HealthStatus aggregateHealth() {
            // Check workers
            boolean allWorkersHealthy = workers.stream().allMatch(Worker::isHealthy);
    
            // Check critical registry entries
            List<String> unhealthyKeys = registry.getUnhealthyKeys();
            boolean hasCriticalFailure = !unhealthyKeys.isEmpty();
    
            if (!allWorkersHealthy || hasCriticalFailure) {
                return HealthStatus.UNHEALTHY;
            }
    
            // Check important entries
            Map<String, HealthLevel> levels = registry.getAllHealthLevels();
            boolean hasImportantFailure = levels.entrySet().stream()
                .filter(e -> e.getValue() == HealthLevel.IMPORTANT)
                .anyMatch(e -> !isHealthy(registry.get(e.getKey())));
    
            if (hasImportantFailure) {
                return HealthStatus.DEGRADED;
            }
    
            return HealthStatus.HEALTHY;
        }
    }
    

    8. Exposition API

    @RestController
    @RequestMapping("/admin/registry")
    public class SharedDataController {
    
        @Autowired
        private SharedDataRegistry registry;
    
        @GetMapping
        public Map<String, Object> getAll() {
            return registry.getAll();
        }
    
        @GetMapping("/{key}")
        public ResponseEntity<?> get(@PathVariable String key) {
            return registry.get(key)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
        }
    
        @GetMapping("/health")
        public Map<String, HealthLevel> getHealthLevels() {
            return registry.getAllHealthLevels();
        }
    
        @GetMapping("/unhealthy")
        public List<String> getUnhealthyKeys() {
            return registry.getUnhealthyKeys();
        }
    }
    

    9. Bonnes pratiques

    Conventions de nommage

    <category>.<subcategory>.<name>
    
    Exemples:
    - database.connected
    - worker.kafka.status
    - metrics.requests.total
    - batch.order-123.progress
    - circuit.external-api.state
    

    DO

    • Utiliser des noms de clés cohérents et hiérarchiques
    • Définir le HealthLevel approprié pour chaque donnée
    • Utiliser les sequences pour les compteurs (thread-safe)
    • Nettoyer les données obsolètes

    DON’T

    • Ne pas stocker de données volumineuses (logs, payloads)
    • Ne pas utiliser pour le stockage persistant (utiliser TechDB)
    • Ne pas créer de nouvelles clés dynamiquement sans contrôle
    • Ne pas oublier que c’est per-instance (pas de sync multi-instances)

    10. Références

  • Socle V004 – Log4j2 et LogForwarder

    Socle V004 – Log4j2 et LogForwarder

    22 – Log4j2 et LogForwarder (Nouveauté V4)

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Socle V4 remplace Logback par Log4j2 pour le logging, avec un LogForwarder intégré pour la centralisation des logs.

    Pourquoi Log4j2 ?

    Critère Logback (V3) Log4j2 (V4)
    Async natif AsyncAppender (wrapper) AsyncLoggers (LMAX Disruptor)
    Performance Bon 6-68x plus rapide
    Garbage-free Non Oui
    Custom Appender Complexe Plugin system simple
    JSON natif Via encoder externe JsonTemplateLayout intégré

    2. Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                      Application                             │
    │                                                              │
    │  Logger.info("message")                                      │
    │         │                                                    │
    │         ▼                                                    │
    │  ┌─────────────────────────────────────────────────────┐    │
    │  │              Log4j2 AsyncLoggers                     │    │
    │  │              (LMAX Disruptor)                        │    │
    │  └────────────────────┬────────────────────────────────┘    │
    │                       │                                      │
    │         ┌─────────────┼─────────────┐                       │
    │         ▼             ▼             ▼                       │
    │  ┌───────────┐ ┌───────────┐ ┌─────────────────────┐       │
    │  │  Console  │ │  File     │ │ SocleLogForwarder   │       │
    │  │  Appender │ │  Appender │ │ Appender            │       │
    │  └───────────┘ └───────────┘ └──────────┬──────────┘       │
    │                                         │                   │
    └─────────────────────────────────────────┼───────────────────┘
                                              │
                        ┌─────────────────────┴─────────────────────┐
                        │                                           │
                        ▼                                           ▼
                ┌──────────────┐                           ┌──────────────┐
                │ HTTP Transport│                           │ NATS Transport│
                │ → LogHub     │                           │ → JetStream  │
                └──────┬───────┘                           └──────┬───────┘
                       │                                          │
                       │  (si échec)                              │
                       ▼                                          │
                ┌──────────────┐                                  │
                │ H2 Fallback  │◄─────────────────────────────────┘
                │ Storage      │
                └──────────────┘
    

    3. Configuration

    3.1 Dépendances Maven

    <!-- Exclure Logback de Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    
    <!-- Log4j2 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    
    <!-- LMAX Disruptor (AsyncLoggers) -->
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>4.0.0</version>
    </dependency>
    
    <!-- JSON Template Layout -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-layout-template-json</artifactId>
        <version>2.22.1</version>
    </dependency>
    

    3.2 log4j2.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="WARN" monitorInterval="30">
    
        <Properties>
            <Property name="LOG_DIR">${env:LOG_DIR:-./logs}</Property>
            <Property name="APP_NAME">${env:APP_NAME:-socle-v4}</Property>
            <Property name="REGION">${env:REGION:-local}</Property>
        </Properties>
    
        <Appenders>
            <!-- Console (dev) -->
            <Console name="Console" target="SYSTEM_OUT">
                <PatternLayout pattern="%d{ISO8601} %highlight{%-5level} [%thread] %logger{36} - %msg%n"/>
            </Console>
    
            <!-- Fichier rotatif -->
            <RollingFile name="File"
                         fileName="${LOG_DIR}/${APP_NAME}.log"
                         filePattern="${LOG_DIR}/${APP_NAME}-%d{yyyy-MM-dd}-%i.log.gz">
                <PatternLayout pattern="%d{ISO8601} %-5level [%thread] %logger{36} - %msg%n"/>
                <Policies>
                    <TimeBasedTriggeringPolicy interval="1"/>
                    <SizeBasedTriggeringPolicy size="100MB"/>
                </Policies>
                <DefaultRolloverStrategy max="30"/>
            </RollingFile>
    
            <!-- LogForwarder (centralisation) -->
            <SocleLogForwarder name="LogForwarder"
                               transportMode="${env:LOG_TRANSPORT_MODE:-http}"
                               logHubUrl="${env:LOG_HUB_URL:-http://localhost:8080/api/ingest-logs}"
                               natsUrl="${env:NATS_URL:-nats://localhost:4222}"
                               batchSize="100"
                               flushIntervalMs="5000"
                               queueCapacity="10000"
                               serviceName="${APP_NAME}"
                               region="${REGION}">
                <ThresholdFilter level="INFO"/>
            </SocleLogForwarder>
        </Appenders>
    
        <Loggers>
            <!-- Socle -->
            <Logger name="eu.lmvi.socle" level="${env:LOG_LEVEL:-INFO}" additivity="false">
                <AppenderRef ref="Console"/>
                <AppenderRef ref="File"/>
                <AppenderRef ref="LogForwarder"/>
            </Logger>
    
            <!-- Frameworks (moins verbeux) -->
            <Logger name="org.springframework" level="WARN"/>
            <Logger name="org.apache.kafka" level="WARN"/>
            <Logger name="io.nats" level="WARN"/>
    
            <!-- Root -->
            <Root level="INFO">
                <AppenderRef ref="Console"/>
                <AppenderRef ref="File"/>
                <AppenderRef ref="LogForwarder"/>
            </Root>
        </Loggers>
    
    </Configuration>
    

    3.3 log4j2.component.properties

    # Activer AsyncLoggers globalement (LMAX Disruptor)
    Log4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector
    
    # Ring buffer (puissance de 2)
    AsyncLogger.RingBufferSize=262144
    
    # Politique d'attente
    AsyncLogger.WaitStrategy=Sleep
    
    # Sécurité (Log4Shell)
    log4j2.formatMsgNoLookups=true
    

    3.4 application.yml

    socle:
      logging:
        forwarder:
          enabled: ${LOG_FORWARDER_ENABLED:false}
          transport-mode: ${LOG_TRANSPORT_MODE:http}
          log-hub-url: ${LOG_HUB_URL:http://localhost:8080/api/ingest-logs}
          nats-url: ${NATS_URL:nats://localhost:4222}
          nats-subject-prefix: ${LOG_NATS_SUBJECT:logs}
          batch-size: ${LOG_BATCH_SIZE:100}
          flush-interval-ms: ${LOG_FLUSH_INTERVAL_MS:5000}
          queue-capacity: ${LOG_QUEUE_CAPACITY:10000}
    
    logging:
      config: classpath:log4j2.xml
    

    4. Variables d’environnement

    Variable Description Défaut
    LOG_LEVEL Niveau de log INFO
    LOG_DIR Répertoire des logs ./logs
    LOG_FORWARDER_ENABLED Activer LogForwarder false
    LOG_TRANSPORT_MODE Mode transport (http/nats) http
    LOG_HUB_URL URL du LogHub
    NATS_URL URL NATS
    LOG_BATCH_SIZE Taille des batches 100
    LOG_FLUSH_INTERVAL_MS Intervalle flush (ms) 5000

    5. Utilisation

    5.1 Logging standard

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MonService {
        private static final Logger log = LoggerFactory.getLogger(MonService.class);
    
        public void process() {
            log.debug("Processing started");
            log.info("Processing item: {}", itemId);
            log.warn("Slow processing detected");
            log.error("Processing failed", exception);
        }
    }
    

    5.2 MDC (Mapped Diagnostic Context)

    import org.slf4j.MDC;
    
    public class MonService {
        public void process(String correlationId) {
            MDC.put("correlationId", correlationId);
            MDC.put("worker", "order-processor");
            try {
                log.info("Processing order");
                // Les logs incluront correlationId et worker
            } finally {
                MDC.clear();
            }
        }
    }
    

    5.3 Structured logging

    // Les logs JSON incluent automatiquement :
    // - timestamp
    // - level
    // - logger
    // - thread
    // - message
    // - MDC (correlationId, execId, etc.)
    // - exception (si présente)
    
    log.info("Order processed: orderId={}, amount={}", orderId, amount);
    

    6. LogForwarder

    6.1 Principe

    Le LogForwarder :

    1. Collecte les logs dans une queue interne (non-bloquant)
    2. Envoie les logs en batch vers le LogHub (HTTP ou NATS)
    3. Stocke en H2 si le réseau est indisponible
    4. Rejoue automatiquement à la reconnexion

    6.2 Mode HTTP

    socle:
      logging:
        forwarder:
          enabled: true
          transport-mode: http
          log-hub-url: https://logs.mycompany.com/api/ingest-logs
    

    Les logs sont envoyés en POST avec JWT :

    POST /api/ingest-logs
    Authorization: Bearer <jwt>
    Content-Type: application/json
    
    [
      {"timestamp": "...", "level": "INFO", "message": "...", ...},
      {"timestamp": "...", "level": "ERROR", "message": "...", ...}
    ]
    

    6.3 Mode NATS

    socle:
      logging:
        forwarder:
          enabled: true
          transport-mode: nats
          nats-url: nats://nats.mycompany.com:4222
          nats-subject-prefix: logs
    

    Les logs sont publiés sur logs.<region>.<service> :

    logs.mtq.order-service
    logs.gua.sync-agent
    

    6.4 Fallback H2

    Si le transport échoue, les logs sont stockés dans socle_log_fallback :

    SELECT COUNT(*) FROM socle_log_fallback;  -- Logs en attente
    

    Ils sont automatiquement rejoués quand le transport redevient disponible.

    7. Format JSON des logs

    {
      "timestamp": "2025-12-09T10:30:00.123Z",
      "level": "INFO",
      "logger": "eu.lmvi.socle.mop.MainOrchestratorProcess",
      "thread": "main",
      "message": "MOP démarré avec succès",
      "service": "socle-v4",
      "region": "MTQ",
      "instanceId": "nuc-mtq-001",
      "execId": "20251209-1030-abc123",
      "correlationId": "MTQ-2025-12-09-000001",
      "mdc": {
        "worker": "http_worker",
        "phase": "startup"
      },
      "exception": null
    }
    

    8. Profils de configuration

    8.1 Développement

    <!-- log4j2-dev.xml -->
    <Configuration status="WARN">
        <Appenders>
            <Console name="Console" target="SYSTEM_OUT">
                <PatternLayout pattern="%d{HH:mm:ss.SSS} %highlight{%-5level} [%thread] %logger{36} - %msg%n"/>
            </Console>
        </Appenders>
        <Loggers>
            <Root level="DEBUG">
                <AppenderRef ref="Console"/>
            </Root>
        </Loggers>
    </Configuration>
    

    8.2 Production

    <!-- log4j2-prod.xml -->
    <Configuration status="ERROR">
        <Appenders>
            <Console name="ConsoleJson" target="SYSTEM_OUT">
                <JsonTemplateLayout eventTemplateUri="classpath:socle-log-template.json"/>
            </Console>
            <SocleLogForwarder name="LogForwarder" .../>
        </Appenders>
        <Loggers>
            <Root level="INFO">
                <AppenderRef ref="ConsoleJson"/>
                <AppenderRef ref="LogForwarder"/>
            </Root>
        </Loggers>
    </Configuration>
    

    8.3 Sélection du profil

    logging:
      config: classpath:log4j2-${spring.profiles.active}.xml
    

    9. Performances

    AsyncLoggers vs Sync

    Mode Throughput Latence
    Sync ~1M logs/sec Variable
    Async (LMAX) ~18M logs/sec Stable

    Bonnes pratiques

    // BON - Lazy evaluation
    log.debug("Processing: {}", () -> expensiveToString());
    
    // MAUVAIS - Toujours évalué
    log.debug("Processing: " + expensiveToString());
    

    10. Troubleshooting

    Logs non visibles

    1. Vérifier log4j2.xml dans src/main/resources/
    2. Vérifier les niveaux de log
    3. Vérifier logging.config dans application.yml

    AsyncLoggers non activés

    Vérifier que log4j2.component.properties existe et contient :

    Log4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector
    

    LogForwarder queue pleine

    WARN - Log queue full, storing to fallback
    

    Solutions :

    • Augmenter queue-capacity
    • Réduire batch-size
    • Vérifier la connectivité réseau

    Logs en fallback non rejoués

    -- Vérifier les logs en attente
    SELECT COUNT(*) FROM socle_log_fallback;
    
    -- Forcer le replay (via API admin)
    POST /admin/logforwarder/replay
    

    11. Sécurité

    Log4Shell (CVE-2021-44228)

    Log4j2 2.22.1 est protégé contre Log4Shell. De plus :

    # Désactiver les lookups JNDI
    log4j2.formatMsgNoLookups=true
    

    Données sensibles

    // NE PAS logger de données sensibles
    log.info("User logged in: {}", user.getEmail());  // OK
    log.info("Password: {}", password);  // INTERDIT
    

    12. Références

  • Socle V004 – Plugins

    Socle V004 – Plugins

    20 – Plugins

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Socle V4 supporte une architecture de plugins pour étendre les fonctionnalités de base. Les plugins sont des modules Spring Boot qui s’intègrent automatiquement.

    2. Architecture des plugins

    ┌──────────────────────────────────────────────────────────┐
    │                    Application                            │
    │                                                           │
    │  ┌─────────────────────────────────────────────────────┐ │
    │  │                   Socle V4 Core                      │ │
    │  │  MOP | Workers | KvBus | TechDB | Logging | etc.    │ │
    │  └─────────────────────────────────────────────────────┘ │
    │                          │                                │
    │         ┌────────────────┼────────────────┐              │
    │         ▼                ▼                ▼              │
    │  ┌────────────┐   ┌────────────┐   ┌────────────┐       │
    │  │   Plugin   │   │   Plugin   │   │   Plugin   │       │
    │  │   Kafka    │   │   NATS     │   │   Custom   │       │
    │  └────────────┘   └────────────┘   └────────────┘       │
    │                                                           │
    └──────────────────────────────────────────────────────────┘
    

    3. Créer un plugin

    3.1 Structure Maven

    <?xml version="1.0" encoding="UTF-8"?>
    <project>
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.mycompany</groupId>
        <artifactId>socle-plugin-myplugin</artifactId>
        <version>1.0.0</version>
    
        <dependencies>
            <!-- Dépendance Socle -->
            <dependency>
                <groupId>eu.lmvi</groupId>
                <artifactId>socle-v004</artifactId>
                <version>4.0.0</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    </project>
    

    3.2 Auto-configuration

    package com.mycompany.plugin;
    
    import org.springframework.boot.autoconfigure.AutoConfiguration;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.ComponentScan;
    
    @AutoConfiguration
    @ConditionalOnProperty(name = "socle.plugins.myplugin.enabled", havingValue = "true")
    @ComponentScan(basePackages = "com.mycompany.plugin")
    public class MyPluginAutoConfiguration {
        // Configuration automatique
    }
    

    3.3 Fichier spring.factories

    # src/main/resources/META-INF/spring.factories
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    com.mycompany.plugin.MyPluginAutoConfiguration
    

    Ou pour Spring Boot 3.x :

    # src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
    com.mycompany.plugin.MyPluginAutoConfiguration
    

    4. Types de plugins

    4.1 Plugin Worker

    package com.mycompany.plugin.worker;
    
    import eu.lmvi.socle.worker.Worker;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.stereotype.Component;
    
    @Component
    @ConditionalOnProperty(name = "socle.plugins.myplugin.enabled", havingValue = "true")
    public class MyPluginWorker implements Worker {
    
        @Override
        public String getName() {
            return "my-plugin-worker";
        }
    
        @Override
        public void initialize() {
            // Initialisation
        }
    
        @Override
        public void start() {
            // Démarrage
        }
    
        @Override
        public void doWork() {
            // Traitement
        }
    
        @Override
        public void stop() {
            // Arrêt
        }
    
        @Override
        public boolean isHealthy() {
            return true;
        }
    
        @Override
        public Map<String, Object> getStats() {
            return Map.of();
        }
    }
    

    4.2 Plugin KvBus

    package com.mycompany.plugin.kv;
    
    import eu.lmvi.socle.kv.KvBus;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.stereotype.Component;
    
    @Component
    @ConditionalOnProperty(name = "socle.kvbus.mode", havingValue = "custom")
    public class CustomKvBus implements KvBus {
    
        @Override
        public void put(String key, String value) {
            // Implémentation custom
        }
    
        @Override
        public Optional<String> get(String key) {
            // Implémentation custom
            return Optional.empty();
        }
    
        // ... autres méthodes
    }
    

    4.3 Plugin Transport (LogForwarder)

    package com.mycompany.plugin.logging;
    
    import eu.lmvi.socle.logging.LogTransport;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.stereotype.Component;
    
    @Component
    @ConditionalOnProperty(name = "socle.logging.forwarder.transport-mode", havingValue = "custom")
    public class CustomLogTransport implements LogTransport {
    
        @Override
        public void send(List<LogEntry> entries) throws Exception {
            // Envoyer les logs vers votre système
        }
    
        @Override
        public boolean isAvailable() {
            return true;
        }
    
        @Override
        public void close() {
            // Cleanup
        }
    }
    

    5. Plugin Kafka (exemple complet)

    5.1 Structure

    socle-plugin-kafka/
    ├── pom.xml
    ├── src/main/java/eu/lmvi/socle/plugin/kafka/
    │   ├── KafkaPluginAutoConfiguration.java
    │   ├── KafkaPluginConfiguration.java
    │   ├── KafkaConsumerWorker.java
    │   ├── KafkaProducerService.java
    │   └── KafkaHealthIndicator.java
    └── src/main/resources/
        └── META-INF/spring/
            └── org.springframework.boot.autoconfigure.AutoConfiguration.imports
    

    5.2 Configuration

    @ConfigurationProperties(prefix = "socle.plugins.kafka")
    public class KafkaPluginConfiguration {
        private boolean enabled = false;
        private String bootstrapServers = "localhost:9092";
        private String groupId = "socle-group";
        private List<String> topics = new ArrayList<>();
        private Map<String, String> consumerProperties = new HashMap<>();
        private Map<String, String> producerProperties = new HashMap<>();
    
        // Getters/Setters
    }
    

    5.3 Auto-configuration

    @AutoConfiguration
    @ConditionalOnProperty(name = "socle.plugins.kafka.enabled", havingValue = "true")
    @EnableConfigurationProperties(KafkaPluginConfiguration.class)
    @ComponentScan(basePackages = "eu.lmvi.socle.plugin.kafka")
    public class KafkaPluginAutoConfiguration {
    
        @Bean
        public KafkaConsumer<String, String> kafkaConsumer(KafkaPluginConfiguration config) {
            Properties props = new Properties();
            props.put("bootstrap.servers", config.getBootstrapServers());
            props.put("group.id", config.getGroupId());
            props.putAll(config.getConsumerProperties());
            return new KafkaConsumer<>(props);
        }
    
        @Bean
        public KafkaProducer<String, String> kafkaProducer(KafkaPluginConfiguration config) {
            Properties props = new Properties();
            props.put("bootstrap.servers", config.getBootstrapServers());
            props.putAll(config.getProducerProperties());
            return new KafkaProducer<>(props);
        }
    }
    

    5.4 Worker

    @Component
    @ConditionalOnProperty(name = "socle.plugins.kafka.enabled", havingValue = "true")
    public class KafkaConsumerWorker extends AbstractWorker {
    
        private final KafkaConsumer<String, String> consumer;
        private final KafkaPluginConfiguration config;
        private final TechDbManager techDb;
    
        @Override
        public String getName() {
            return "kafka-consumer-plugin";
        }
    
        @Override
        protected void doInitialize() {
            consumer.subscribe(config.getTopics());
        }
    
        @Override
        protected void doProcess() {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                processRecord(record);
            }
        }
    
        @Override
        protected void doStop() {
            consumer.close();
        }
    }
    

    5.5 Utilisation

    # application.yml
    socle:
      plugins:
        kafka:
          enabled: true
          bootstrap-servers: kafka:9092
          group-id: my-app
          topics:
            - orders
            - events
    

    6. Plugin NATS (exemple)

    6.1 Configuration

    @ConfigurationProperties(prefix = "socle.plugins.nats")
    public class NatsPluginConfiguration {
        private boolean enabled = false;
        private String url = "nats://localhost:4222";
        private List<String> subjects = new ArrayList<>();
        private String streamName;
        private String consumerName;
    }
    

    6.2 Worker

    @Component
    @ConditionalOnProperty(name = "socle.plugins.nats.enabled", havingValue = "true")
    public class NatsConsumerWorker extends AbstractWorker {
    
        private final NatsPluginConfiguration config;
        private Connection natsConnection;
        private JetStream jetStream;
    
        @Override
        protected void doInitialize() {
            natsConnection = Nats.connect(config.getUrl());
            jetStream = natsConnection.jetStream();
        }
    
        @Override
        protected void doProcess() {
            for (String subject : config.getSubjects()) {
                Message msg = jetStream.pullSubscribe(subject, config.getConsumerName())
                    .fetch(100, Duration.ofSeconds(1))
                    .stream()
                    .findFirst()
                    .orElse(null);
    
                if (msg != null) {
                    processMessage(msg);
                    msg.ack();
                }
            }
        }
    }
    

    7. Extension des APIs Admin

    7.1 Controller additionnel

    @RestController
    @RequestMapping("/admin/plugins/kafka")
    @ConditionalOnProperty(name = "socle.plugins.kafka.enabled", havingValue = "true")
    public class KafkaAdminController {
    
        @Autowired
        private KafkaConsumerWorker worker;
    
        @GetMapping("/status")
        public Map<String, Object> status() {
            return Map.of(
                "connected", worker.isHealthy(),
                "stats", worker.getStats()
            );
        }
    
        @GetMapping("/offsets")
        public Map<String, Long> offsets() {
            return worker.getCurrentOffsets();
        }
    
        @PostMapping("/seek/{topic}/{partition}/{offset}")
        public void seek(
                @PathVariable String topic,
                @PathVariable int partition,
                @PathVariable long offset) {
            worker.seekTo(topic, partition, offset);
        }
    }
    

    8. Métriques du plugin

    @Component
    @ConditionalOnProperty(name = "socle.plugins.kafka.enabled", havingValue = "true")
    public class KafkaPluginMetrics {
    
        private final Counter messagesReceived;
        private final Counter messagesProcessed;
        private final Timer processingTime;
    
        public KafkaPluginMetrics(MeterRegistry registry) {
            this.messagesReceived = Counter.builder("socle_kafka_messages_received_total")
                .description("Total Kafka messages received")
                .register(registry);
    
            this.messagesProcessed = Counter.builder("socle_kafka_messages_processed_total")
                .description("Total Kafka messages processed")
                .register(registry);
    
            this.processingTime = Timer.builder("socle_kafka_processing_duration_seconds")
                .description("Kafka message processing duration")
                .register(registry);
        }
    
        public void recordReceived() {
            messagesReceived.increment();
        }
    
        public void recordProcessed(Duration duration) {
            messagesProcessed.increment();
            processingTime.record(duration);
        }
    }
    

    9. Test du plugin

    @SpringBootTest
    @TestPropertySource(properties = {
        "socle.plugins.kafka.enabled=true",
        "socle.plugins.kafka.bootstrap-servers=localhost:9092"
    })
    class KafkaPluginTest {
    
        @Autowired
        private KafkaConsumerWorker worker;
    
        @Test
        void workerShouldBeRegistered() {
            assertNotNull(worker);
            assertEquals("kafka-consumer-plugin", worker.getName());
        }
    
        @Test
        void workerShouldStart() {
            worker.initialize();
            worker.start();
            assertTrue(worker.isHealthy());
        }
    }
    

    10. Publication du plugin

    10.1 Maven deploy

    <distributionManagement>
        <repository>
            <id>releases</id>
            <url>https://nexus.mycompany.com/repository/maven-releases/</url>
        </repository>
    </distributionManagement>
    
    mvn clean deploy
    

    10.2 Utilisation dans une application

    <dependency>
        <groupId>eu.lmvi</groupId>
        <artifactId>socle-plugin-kafka</artifactId>
        <version>1.0.0</version>
    </dependency>
    

    11. Bonnes pratiques

    DO

    • Utiliser @ConditionalOnProperty pour activer/désactiver
    • Exposer la configuration via @ConfigurationProperties
    • Implémenter des health indicators
    • Exposer des métriques
    • Documenter les options de configuration

    DON’T

    • Ne pas forcer l’activation par défaut
    • Ne pas dupliquer les fonctionnalités du core
    • Ne pas utiliser de dépendances en conflit avec le Socle
    • Ne pas bloquer le démarrage de l’application si le plugin échoue

    12. Références

  • Socle V004 – EventBus et Workers

    Socle V004 – EventBus et Workers

    30 – EventBus et Workers Push

    Version : 4.1.0 Date : 2026-01-23 Statut : Specification

    1. Introduction

    Ce document introduit le nouveau systeme EventBus et les Workers Push dans le Socle V004. Ce mecanisme remplace avantageusement le polling par un systeme de notification instantanee.

    1.1 Motivation

    Le mecanisme actuel des workers event-driven (AbstractEventDrivenWorker) utilise un polling avec timeout :

    // Ancien mecanisme (DEPRECIE)
    while (running) {
        event = queue.poll(100, TimeUnit.MILLISECONDS);  // Verifie toutes les 100ms
        if (event != null) processEvent(event);
    }
    

    Problemes :

    • Latence de 0 a 100ms avant traitement
    • Consommation CPU constante (meme si faible)
    • Pas de vrai mecanisme push

    1.2 Nouveau mecanisme Push

    // Nouveau mecanisme (RECOMMANDE)
    while (running) {
        event = queue.take();  // Dort jusqu'a ce qu'un event arrive
        processEvent(event);   // Reveil instantane
    }
    

    Avantages :

    • Latence ~0ms (reveil instantane)
    • Zero CPU quand idle
    • Vrai mecanisme push event-driven

    2. Depreciation des anciens Workers

    2.1 Classes depreciees

    Classe Statut Remplacement
    AbstractEventDrivenWorker<T> DEPRECIE AbstractPushWorker<T>
    KafkaEventWorker<K,V> DEPRECIE KafkaPushWorker<K,V>

    2.2 Migration

    Les anciens workers continuent de fonctionner mais ne doivent plus etre utilises pour les nouveaux developpements.

    // DEPRECIE - Ne plus utiliser
    @Deprecated(since = "4.1.0", forRemoval = true)
    public abstract class AbstractEventDrivenWorker<T> implements Worker { ... }
    
    // RECOMMANDE - Utiliser ceci
    public abstract class AbstractPushWorker<T> implements Worker { ... }
    

    2.3 Calendrier de depreciation

    Version Action
    4.1.0 Introduction EventBus + Workers Push, depreciation des anciens
    4.2.0 Warnings de compilation pour les anciens workers
    5.0.0 Suppression des anciens workers (breaking change)

    3. Architecture EventBus

    3.1 Vue d’ensemble

    ┌─────────────────────────────────────────────────────────────────────┐
    │                              MOP                                     │
    │                                                                      │
    │  ┌──────────────────────────────────────────────────────────────┐   │
    │  │                        EventBus                               │   │
    │  │                                                               │   │
    │  │   Channels:                                                   │   │
    │  │     "orders.created"   ──► [Worker1.queue, Worker2.queue]     │   │
    │  │     "orders.updated"   ──► [Worker1.queue]                    │   │
    │  │     "alerts.*"         ──► [AlertWorker.queue]                │   │
    │  │                                                               │   │
    │  └──────────────────────────────────────────────────────────────┘   │
    │                              │                                       │
    │         publish("orders.created", event)                            │
    │                              │                                       │
    │                              ▼                                       │
    │  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐        │
    │  │ PushWorker 1   │  │ PushWorker 2   │  │ PushWorker 3   │        │
    │  │                │  │                │  │                │        │
    │  │ queue.take()   │  │ queue.take()   │  │ queue.take()   │        │
    │  │ (dort)         │  │ (dort)         │  │ (dort)         │        │
    │  │      ↓         │  │      ↓         │  │                │        │
    │  │ REVEIL!        │  │ REVEIL!        │  │ (pas abonne)   │        │
    │  └────────────────┘  └────────────────┘  └────────────────┘        │
    │                                                                      │
    └─────────────────────────────────────────────────────────────────────┘
    

    3.2 Composants

    Composant Role
    EventBus Bus central de publication/souscription
    BusEvent<T> Enveloppe d’evenement avec metadata
    AbstractPushWorker<T> Worker reveille par EventBus
    KafkaPushWorker<K,V> Worker Kafka avec push interne
    EventBusWorker Worker qui gere le lifecycle de l’EventBus

    4. Interface EventBus

    4.1 Definition

    package eu.lmvi.socle.event;
    
    /**
     * Bus d'evenements central avec mecanisme push.
     *
     * <p>L'EventBus permet la communication entre workers via un systeme
     * de publication/souscription avec reveil instantane.</p>
     */
    public interface EventBus {
    
        // ==================== Publication ====================
    
        /**
         * Publie un evenement sur un channel.
         * Les subscribers sont reveilles instantanement.
         *
         * @param channel Nom du channel (ex: "orders.created")
         * @param payload Contenu de l'evenement
         */
        void publish(String channel, Object payload);
    
        /**
         * Publie un evenement avec metadata personnalisees.
         *
         * @param channel  Nom du channel
         * @param payload  Contenu de l'evenement
         * @param metadata Metadata additionnelles
         */
        void publish(String channel, Object payload, Map<String, String> metadata);
    
        /**
         * Publication asynchrone avec confirmation.
         *
         * @return Future completee quand tous les subscribers ont recu l'event
         */
        CompletableFuture<PublishResult> publishAsync(String channel, Object payload);
    
        // ==================== Souscription ====================
    
        /**
         * S'abonne a un channel et retourne une queue pour reception.
         * Utiliser queue.take() pour attendre les evenements (push).
         *
         * @param channel   Nom du channel
         * @param eventType Type attendu du payload
         * @return Queue sur laquelle faire take()
         */
        <T> Subscription<T> subscribe(String channel, Class<T> eventType);
    
        /**
         * S'abonne avec un pattern glob (ex: "orders.*", "*.created").
         *
         * @param pattern   Pattern glob
         * @param eventType Type attendu du payload
         * @return Subscription avec queue
         */
        <T> Subscription<T> subscribePattern(String pattern, Class<T> eventType);
    
        /**
         * S'abonne avec un handler callback (execution immediate).
         *
         * @param channel Nom du channel
         * @param handler Handler appele pour chaque evenement
         * @return ID de souscription pour unsubscribe
         */
        String subscribe(String channel, EventHandler handler);
    
        /**
         * Se desabonne.
         *
         * @param subscriptionId ID retourne par subscribe
         */
        void unsubscribe(String subscriptionId);
    
        // ==================== Lifecycle ====================
    
        /**
         * Demarre l'EventBus.
         */
        void start();
    
        /**
         * Arrete l'EventBus proprement.
         * Les queues sont videes, les subscribers notifies.
         */
        void stop();
    
        /**
         * Verifie la sante de l'EventBus.
         */
        boolean isHealthy();
    
        // ==================== Stats ====================
    
        /**
         * Statistiques globales.
         */
        EventBusStats getStats();
    
        /**
         * Statistiques par channel.
         */
        Map<String, ChannelStats> getChannelStats();
    }
    

    4.2 Classes de support

    package eu.lmvi.socle.event;
    
    /**
     * Evenement transporte par l'EventBus.
     */
    public record BusEvent<T>(
        String id,                      // UUID unique
        String channel,                 // Channel de publication
        Instant timestamp,              // Timestamp de publication
        String source,                  // Source (worker name)
        T payload,                      // Contenu metier
        Map<String, String> metadata    // Metadata additionnelles
    ) {
        /**
         * Cree un evenement simple.
         */
        public static <T> BusEvent<T> of(String channel, T payload) {
            return new BusEvent<>(
                UUID.randomUUID().toString(),
                channel,
                Instant.now(),
                null,
                payload,
                Map.of()
            );
        }
    }
    
    /**
     * Souscription a un channel.
     */
    public interface Subscription<T> {
    
        /**
         * ID unique de la souscription.
         */
        String getId();
    
        /**
         * Channel ou pattern souscrit.
         */
        String getChannelOrPattern();
    
        /**
         * Queue de reception des evenements.
         * Utiliser take() pour attendre (bloquant, reveil instantane).
         * Utiliser poll(timeout) si besoin d'un timeout.
         */
        BlockingQueue<BusEvent<T>> getQueue();
    
        /**
         * Se desabonner.
         */
        void unsubscribe();
    
        /**
         * Nombre d'evenements en attente dans la queue.
         */
        int getPendingCount();
    }
    
    /**
     * Handler pour souscription callback.
     */
    @FunctionalInterface
    public interface EventHandler {
        void onEvent(BusEvent<?> event);
    }
    
    /**
     * Resultat de publication.
     */
    public record PublishResult(
        String eventId,
        int deliveredCount,
        int failedCount,
        Duration duration
    ) {}
    
    /**
     * Statistiques EventBus.
     */
    public record EventBusStats(
        long totalPublished,
        long totalDelivered,
        long totalFailed,
        int activeSubscriptions,
        int activeChannels,
        Instant startedAt
    ) {}
    
    /**
     * Statistiques par channel.
     */
    public record ChannelStats(
        String channel,
        long publishedCount,
        int subscriberCount,
        long lastPublishedAt
    ) {}
    

    5. Implementations EventBus

    5.1 InMemoryEventBus

    Implementation pour mono-instance (dev, tests, applications simples).

    package eu.lmvi.socle.event.impl;
    
    @Component
    @ConditionalOnProperty(name = "socle.eventbus.mode", havingValue = "in_memory", matchIfMissing = true)
    public class InMemoryEventBus implements EventBus {
    
        private static final Logger log = LoggerFactory.getLogger(InMemoryEventBus.class);
    
        // Subscriptions par channel exact
        private final Map<String, Set<SubscriptionImpl<?>>> channelSubs = new ConcurrentHashMap<>();
    
        // Subscriptions par pattern
        private final Map<Pattern, Set<SubscriptionImpl<?>>> patternSubs = new ConcurrentHashMap<>();
    
        // Index pour lookup rapide
        private final Map<String, SubscriptionImpl<?>> subsIndex = new ConcurrentHashMap<>();
    
        // Compteurs
        private final AtomicLong idGen = new AtomicLong();
        private final AtomicLong publishedCount = new AtomicLong();
        private final AtomicLong deliveredCount = new AtomicLong();
        private final AtomicLong failedCount = new AtomicLong();
    
        // Etat
        private volatile boolean running = false;
        private Instant startedAt;
    
        // ==================== Lifecycle ====================
    
        @Override
        public void start() {
            running = true;
            startedAt = Instant.now();
            log.info("InMemoryEventBus started");
        }
    
        @Override
        public void stop() {
            running = false;
    
            // Notifier tous les subscribers (poison pill)
            for (SubscriptionImpl<?> sub : subsIndex.values()) {
                sub.notifyShutdown();
            }
    
            log.info("InMemoryEventBus stopped. Published={}, Delivered={}, Failed={}",
                publishedCount.get(), deliveredCount.get(), failedCount.get());
        }
    
        @Override
        public boolean isHealthy() {
            return running;
        }
    
        // ==================== Publish ====================
    
        @Override
        public void publish(String channel, Object payload) {
            publish(channel, payload, Map.of());
        }
    
        @Override
        public void publish(String channel, Object payload, Map<String, String> metadata) {
            if (!running) {
                log.warn("EventBus not running, event dropped on channel '{}'", channel);
                return;
            }
    
            BusEvent<?> event = new BusEvent<>(
                "evt-" + idGen.incrementAndGet(),
                channel,
                Instant.now(),
                resolveSource(),
                payload,
                metadata
            );
    
            int delivered = 0;
            int failed = 0;
    
            // Livraison aux subscriptions exactes
            Set<SubscriptionImpl<?>> exact = channelSubs.get(channel);
            if (exact != null) {
                for (SubscriptionImpl<?> sub : exact) {
                    if (deliverToSubscription(sub, event)) {
                        delivered++;
                    } else {
                        failed++;
                    }
                }
            }
    
            // Livraison aux subscriptions par pattern
            for (Map.Entry<Pattern, Set<SubscriptionImpl<?>>> entry : patternSubs.entrySet()) {
                if (entry.getKey().matcher(channel).matches()) {
                    for (SubscriptionImpl<?> sub : entry.getValue()) {
                        if (deliverToSubscription(sub, event)) {
                            delivered++;
                        } else {
                            failed++;
                        }
                    }
                }
            }
    
            publishedCount.incrementAndGet();
            deliveredCount.addAndGet(delivered);
            failedCount.addAndGet(failed);
    
            log.debug("Published event {} to channel '{}': delivered={}, failed={}",
                event.id(), channel, delivered, failed);
        }
    
        @Override
        public CompletableFuture<PublishResult> publishAsync(String channel, Object payload) {
            return CompletableFuture.supplyAsync(() -> {
                Instant start = Instant.now();
                publish(channel, payload);
                // Simplification: on ne trace pas le detail ici
                return new PublishResult(
                    "evt-" + idGen.get(),
                    1, 0,
                    Duration.between(start, Instant.now())
                );
            });
        }
    
        @SuppressWarnings("unchecked")
        private boolean deliverToSubscription(SubscriptionImpl<?> sub, BusEvent<?> event) {
            try {
                BlockingQueue<BusEvent<?>> queue = (BlockingQueue<BusEvent<?>>) sub.getQueue();
    
                // offer avec timeout pour eviter blocage infini
                boolean offered = queue.offer(event, 100, TimeUnit.MILLISECONDS);
    
                if (!offered) {
                    log.warn("Queue full for subscription {}, event dropped", sub.getId());
                    return false;
                }
    
                return true;
    
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (Exception e) {
                log.error("Error delivering to subscription {}: {}", sub.getId(), e.getMessage());
                return false;
            }
        }
    
        private String resolveSource() {
            // Recuperer le nom du worker courant si possible
            String threadName = Thread.currentThread().getName();
            return threadName;
        }
    
        // ==================== Subscribe ====================
    
        @Override
        public <T> Subscription<T> subscribe(String channel, Class<T> eventType) {
            String subId = "sub-" + idGen.incrementAndGet();
            SubscriptionImpl<T> sub = new SubscriptionImpl<>(subId, channel, null, this);
    
            channelSubs
                .computeIfAbsent(channel, k -> ConcurrentHashMap.newKeySet())
                .add(sub);
    
            subsIndex.put(subId, sub);
    
            log.info("Subscription {} created on channel '{}'", subId, channel);
            return sub;
        }
    
        @Override
        public <T> Subscription<T> subscribePattern(String pattern, Class<T> eventType) {
            String subId = "sub-" + idGen.incrementAndGet();
    
            // Convertir glob en regex
            String regex = pattern
                .replace(".", "\\.")
                .replace("*", "[^.]*")
                .replace(">", ".*");
            Pattern compiled = Pattern.compile("^" + regex + "$");
    
            SubscriptionImpl<T> sub = new SubscriptionImpl<>(subId, pattern, compiled, this);
    
            patternSubs
                .computeIfAbsent(compiled, k -> ConcurrentHashMap.newKeySet())
                .add(sub);
    
            subsIndex.put(subId, sub);
    
            log.info("Subscription {} created on pattern '{}'", subId, pattern);
            return sub;
        }
    
        @Override
        public String subscribe(String channel, EventHandler handler) {
            // Implementation simplifiee: on cree une subscription normale
            // et un thread qui consomme et appelle le handler
            Subscription<Object> sub = subscribe(channel, Object.class);
    
            Thread.ofVirtual()
                .name("handler-" + sub.getId())
                .start(() -> {
                    while (running) {
                        try {
                            BusEvent<Object> event = sub.getQueue().take();
                            if (event.payload() == null) break; // Poison pill
                            handler.onEvent(event);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        } catch (Exception e) {
                            log.error("Handler error on {}: {}", channel, e.getMessage());
                        }
                    }
                });
    
            return sub.getId();
        }
    
        @Override
        public void unsubscribe(String subscriptionId) {
            SubscriptionImpl<?> sub = subsIndex.remove(subscriptionId);
            if (sub == null) {
                log.warn("Subscription {} not found", subscriptionId);
                return;
            }
    
            // Retirer des maps
            if (sub.pattern == null) {
                Set<SubscriptionImpl<?>> subs = channelSubs.get(sub.channelOrPattern);
                if (subs != null) subs.remove(sub);
            } else {
                Set<SubscriptionImpl<?>> subs = patternSubs.get(sub.pattern);
                if (subs != null) subs.remove(sub);
            }
    
            sub.notifyShutdown();
            log.info("Subscription {} removed", subscriptionId);
        }
    
        // ==================== Stats ====================
    
        @Override
        public EventBusStats getStats() {
            return new EventBusStats(
                publishedCount.get(),
                deliveredCount.get(),
                failedCount.get(),
                subsIndex.size(),
                channelSubs.size(),
                startedAt
            );
        }
    
        @Override
        public Map<String, ChannelStats> getChannelStats() {
            Map<String, ChannelStats> stats = new HashMap<>();
    
            for (Map.Entry<String, Set<SubscriptionImpl<?>>> entry : channelSubs.entrySet()) {
                stats.put(entry.getKey(), new ChannelStats(
                    entry.getKey(),
                    0, // TODO: compteur par channel
                    entry.getValue().size(),
                    0
                ));
            }
    
            return stats;
        }
    
        // ==================== Subscription Implementation ====================
    
        private static class SubscriptionImpl<T> implements Subscription<T> {
    
            private final String id;
            private final String channelOrPattern;
            private final Pattern pattern;
            private final BlockingQueue<BusEvent<T>> queue;
            private final InMemoryEventBus bus;
    
            SubscriptionImpl(String id, String channelOrPattern, Pattern pattern, InMemoryEventBus bus) {
                this.id = id;
                this.channelOrPattern = channelOrPattern;
                this.pattern = pattern;
                this.queue = new LinkedBlockingQueue<>(10_000); // Capacite par defaut
                this.bus = bus;
            }
    
            @Override
            public String getId() {
                return id;
            }
    
            @Override
            public String getChannelOrPattern() {
                return channelOrPattern;
            }
    
            @Override
            public BlockingQueue<BusEvent<T>> getQueue() {
                return queue;
            }
    
            @Override
            public void unsubscribe() {
                bus.unsubscribe(id);
            }
    
            @Override
            public int getPendingCount() {
                return queue.size();
            }
    
            void notifyShutdown() {
                // Injecter un poison pill pour debloquer les take()
                try {
                    queue.offer(new BusEvent<>(null, null, null, null, null, null));
                } catch (Exception ignored) {}
            }
        }
    }
    

    5.2 RedisEventBus

    Implementation pour multi-instances (production).

    package eu.lmvi.socle.event.impl;
    
    @Component
    @ConditionalOnProperty(name = "socle.eventbus.mode", havingValue = "redis")
    public class RedisEventBus implements EventBus {
    
        private final JedisPool jedisPool;
        private final ObjectMapper mapper;
        private final String prefix;
        private final ExecutorService listenerExecutor;
    
        // Subscriptions locales (une queue locale par subscription)
        private final Map<String, LocalSubscription<?>> localSubs = new ConcurrentHashMap<>();
    
        // JedisPubSub par channel (partage entre subscriptions locales)
        private final Map<String, RedisPubSubListener> redisListeners = new ConcurrentHashMap<>();
    
        // ... Implementation similaire avec Redis Pub/Sub ...
    
        @Override
        public void publish(String channel, Object payload) {
            try (Jedis jedis = jedisPool.getResource()) {
                BusEvent<?> event = BusEvent.of(channel, payload);
                String json = mapper.writeValueAsString(event);
                jedis.publish(prefix + channel, json);
            } catch (Exception e) {
                log.error("Redis publish error: {}", e.getMessage());
            }
        }
    
        // Redis SUBSCRIBE dans un thread dedie
        // Dispatch vers les queues locales
    }
    

    6. AbstractPushWorker

    6.1 Definition

    package eu.lmvi.socle.worker.push;
    
    /**
     * Worker reveille instantanement par l'EventBus.
     *
     * <p>Contrairement a {@link AbstractEventDrivenWorker} (DEPRECIE) qui utilise
     * un polling avec timeout, ce worker utilise un mecanisme push avec reveil
     * instantane via {@link BlockingQueue#take()}.</p>
     *
     * <h3>Avantages</h3>
     * <ul>
     *   <li>Latence ~0ms (reveil instantane)</li>
     *   <li>Zero CPU quand idle</li>
     *   <li>Integration native avec EventBus</li>
     * </ul>
     *
     * <h3>Exemple d'utilisation</h3>
     * <pre>{@code
     * @Component
     * public class OrderWorker extends AbstractPushWorker<OrderEvent> {
     *
     *     public OrderWorker() {
     *         super("orders.created", "orders.updated");
     *     }
     *
     *     @Override
     *     public String getName() {
     *         return "order_worker";
     *     }
     *
     *     @Override
     *     protected Class<OrderEvent> getEventType() {
     *         return OrderEvent.class;
     *     }
     *
     *     @Override
     *     protected void processEvent(OrderEvent event) {
     *         // Traitement instantane des que l'event arrive
     *     }
     * }
     * }</pre>
     *
     * @param <T> Type du payload des evenements
     */
    public abstract class AbstractPushWorker<T> implements Worker {
    
        protected final Logger logger = LoggerFactory.getLogger(getClass());
    
        // ==================== Injection ====================
    
        @Autowired
        protected EventBus eventBus;
    
        // ==================== Configuration ====================
    
        private final String[] channels;
        private final int concurrency;
        private final int queueCapacity;
        private final Duration shutdownTimeout;
    
        // ==================== Etat ====================
    
        protected final AtomicBoolean running = new AtomicBoolean(false);
        protected ExecutorService executor;
        protected List<Subscription<T>> subscriptions = new ArrayList<>();
        protected BlockingQueue<BusEvent<T>> mergedQueue;
    
        // ==================== Metriques ====================
    
        protected final AtomicLong processedCount = new AtomicLong(0);
        protected final AtomicLong errorCount = new AtomicLong(0);
        protected final AtomicLong totalDurationNs = new AtomicLong(0);
        protected volatile Instant lastExecution;
        protected volatile Instant startedAt;
    
        // ==================== Constructeurs ====================
    
        /**
         * Cree un worker push mono-thread.
         *
         * @param channels Channels a ecouter
         */
        protected AbstractPushWorker(String... channels) {
            this(1, channels);
        }
    
        /**
         * Cree un worker push avec concurrence.
         *
         * @param concurrency Nombre de threads de traitement
         * @param channels    Channels a ecouter
         */
        protected AbstractPushWorker(int concurrency, String... channels) {
            this(concurrency, 10_000, Duration.ofSeconds(30), channels);
        }
    
        /**
         * Cree un worker push avec configuration complete.
         *
         * @param concurrency     Nombre de threads de traitement
         * @param queueCapacity   Capacite de la queue interne
         * @param shutdownTimeout Timeout d'arret gracieux
         * @param channels        Channels a ecouter
         */
        protected AbstractPushWorker(int concurrency, int queueCapacity,
                                      Duration shutdownTimeout, String... channels) {
            this.concurrency = Math.max(1, concurrency);
            this.queueCapacity = queueCapacity;
            this.shutdownTimeout = shutdownTimeout;
            this.channels = channels;
    
            if (channels == null || channels.length == 0) {
                throw new IllegalArgumentException("At least one channel is required");
            }
        }
    
        // ==================== Worker Interface ====================
    
        @Override
        public final String getSchedule() {
            return "PUSH";
        }
    
        @Override
        public final long getCycleIntervalMs() {
            return -1;
        }
    
        @Override
        public final boolean isPassive() {
            return true;
        }
    
        @Override
        public int getStartPriority() {
            return 200;  // Apres l'EventBus (100)
        }
    
        @Override
        public int getStopPriority() {
            return 200;  // Avant l'EventBus
        }
    
        @Override
        public void initialize() {
            logger.info("[{}] Initializing push worker", getName());
            logger.info("[{}] Channels: {}", getName(), Arrays.toString(channels));
            logger.info("[{}] Concurrency: {}", getName(), concurrency);
    
            // Queue fusionnee pour tous les channels
            mergedQueue = new LinkedBlockingQueue<>(queueCapacity);
    
            onInitialize();
        }
    
        @Override
        public void start() {
            if (running.getAndSet(true)) {
                logger.warn("[{}] Already running", getName());
                return;
            }
    
            startedAt = Instant.now();
    
            // S'abonner aux channels
            for (String channel : channels) {
                if (channel.contains("*") || channel.contains(">")) {
                    // Pattern subscription
                    Subscription<T> sub = eventBus.subscribePattern(channel, getEventType());
                    subscriptions.add(sub);
                    startForwarder(sub);
                } else {
                    // Exact subscription
                    Subscription<T> sub = eventBus.subscribe(channel, getEventType());
                    subscriptions.add(sub);
                    startForwarder(sub);
                }
            }
    
            // Demarrer les workers de traitement
            executor = Executors.newThreadPerTaskExecutor(
                Thread.ofVirtual().name(getName() + "-push-", 0).factory()
            );
    
            for (int i = 0; i < concurrency; i++) {
                final int workerId = i;
                executor.submit(() -> pushLoop(workerId));
            }
    
            onStarted();
            logger.info("[{}] Started with {} workers on {} channels",
                getName(), concurrency, channels.length);
        }
    
        /**
         * Demarre un thread qui forward les events d'une subscription vers la queue fusionnee.
         */
        private void startForwarder(Subscription<T> sub) {
            Thread.ofVirtual()
                .name(getName() + "-fwd-" + sub.getId())
                .start(() -> {
                    while (running.get()) {
                        try {
                            BusEvent<T> event = sub.getQueue().take();
    
                            // Verifier poison pill
                            if (event.id() == null) break;
    
                            // Forward vers la queue fusionnee
                            if (!mergedQueue.offer(event, 1, TimeUnit.SECONDS)) {
                                logger.warn("[{}] Merged queue full, event dropped", getName());
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                });
        }
    
        @Override
        public void stop() {
            if (!running.getAndSet(false)) {
                logger.warn("[{}] Not running", getName());
                return;
            }
    
            logger.info("[{}] Stopping...", getName());
            onStopping();
    
            // Se desabonner (injecte des poison pills)
            for (Subscription<T> sub : subscriptions) {
                try {
                    sub.unsubscribe();
                } catch (Exception e) {
                    logger.warn("[{}] Error unsubscribing: {}", getName(), e.getMessage());
                }
            }
            subscriptions.clear();
    
            // Injecter poison pills dans la queue fusionnee
            for (int i = 0; i < concurrency; i++) {
                mergedQueue.offer(new BusEvent<>(null, null, null, null, null, null));
            }
    
            // Attendre la fin des workers
            if (executor != null) {
                executor.shutdown();
                try {
                    if (!executor.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                        logger.warn("[{}] Forcing shutdown", getName());
                        executor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    executor.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }
    
            onStopped();
    
            long uptime = Duration.between(startedAt, Instant.now()).toSeconds();
            logger.info("[{}] Stopped after {}s. Processed={}, Errors={}",
                getName(), uptime, processedCount.get(), errorCount.get());
        }
    
        @Override
        public void doWork() {
            // Non utilise en mode PUSH
        }
    
        @Override
        public boolean isHealthy() {
            return running.get() && executor != null && !executor.isShutdown();
        }
    
        @Override
        public Map<String, Object> getStats() {
            long uptime = startedAt != null
                ? Duration.between(startedAt, Instant.now()).toMillis()
                : 0;
    
            double avgDurationMs = processedCount.get() > 0
                ? (totalDurationNs.get() / 1_000_000.0) / processedCount.get()
                : 0;
    
            double throughput = uptime > 0
                ? processedCount.get() * 1000.0 / uptime
                : 0;
    
            Map<String, Object> stats = new HashMap<>();
    
            // Cles standardisees
            stats.put("state", running.get() ? "running" : "stopped");
            stats.put("schedule", "PUSH");
            stats.put("execution_count", processedCount.get());
            stats.put("errors_count", errorCount.get());
            stats.put("last_execution", lastExecution != null ? lastExecution.toString() : null);
    
            // Cles specifiques
            stats.put("channels", channels);
            stats.put("concurrency", concurrency);
            stats.put("queue_size", mergedQueue != null ? mergedQueue.size() : 0);
            stats.put("queue_capacity", queueCapacity);
            stats.put("uptime_ms", uptime);
            stats.put("avg_duration_ms", Math.round(avgDurationMs * 100) / 100.0);
            stats.put("throughput_per_sec", Math.round(throughput * 100) / 100.0);
    
            // Stats des subscriptions
            stats.put("subscriptions", subscriptions.stream()
                .map(s -> Map.of(
                    "id", s.getId(),
                    "channel", s.getChannelOrPattern(),
                    "pending", s.getPendingCount()
                ))
                .toList());
    
            return stats;
        }
    
        // ==================== Push Loop ====================
    
        private void pushLoop(int workerId) {
            logger.debug("[{}] Push loop #{} started", getName(), workerId);
    
            while (running.get()) {
                try {
                    // BLOQUE jusqu'a reception d'un event (reveil instantane)
                    BusEvent<T> event = mergedQueue.take();
    
                    // Verifier poison pill
                    if (event.id() == null) {
                        logger.debug("[{}] Push loop #{} received shutdown signal", getName(), workerId);
                        break;
                    }
    
                    // Traiter l'event
                    processEventSafe(event, workerId);
    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.debug("[{}] Push loop #{} interrupted", getName(), workerId);
                    break;
                } catch (Exception e) {
                    logger.error("[{}] Unexpected error in push loop #{}: {}",
                        getName(), workerId, e.getMessage(), e);
                }
            }
    
            logger.debug("[{}] Push loop #{} stopped", getName(), workerId);
        }
    
        private void processEventSafe(BusEvent<T> event, int workerId) {
            long startNs = System.nanoTime();
    
            try {
                processEvent(event.payload(), event);
                processedCount.incrementAndGet();
            } catch (Exception e) {
                errorCount.incrementAndGet();
                handleError(event, e, workerId);
            } finally {
                long durationNs = System.nanoTime() - startNs;
                totalDurationNs.addAndGet(durationNs);
                lastExecution = Instant.now();
            }
        }
    
        // ==================== Methodes abstraites ====================
    
        /**
         * Type du payload des evenements.
         * Utilise pour le typage de la souscription.
         */
        protected abstract Class<T> getEventType();
    
        /**
         * Traite un evenement.
         *
         * @param payload Contenu de l'evenement
         * @param event   Evenement complet (avec metadata)
         */
        protected abstract void processEvent(T payload, BusEvent<T> event);
    
        /**
         * Version simplifiee sans acces a l'enveloppe.
         * Surcharger processEvent(T, BusEvent) pour avoir acces aux metadata.
         */
        protected void processEvent(T payload) {
            // Par defaut, appelle la version complete
        }
    
        // ==================== Hooks ====================
    
        /**
         * Hook d'initialisation.
         */
        protected void onInitialize() {}
    
        /**
         * Hook post-demarrage.
         */
        protected void onStarted() {}
    
        /**
         * Hook pre-arret.
         */
        protected void onStopping() {}
    
        /**
         * Hook post-arret.
         */
        protected void onStopped() {}
    
        /**
         * Gestion des erreurs.
         * Par defaut, log l'erreur. Surcharger pour DLQ, retry, etc.
         */
        protected void handleError(BusEvent<T> event, Exception e, int workerId) {
            logger.error("[{}] Worker #{} error processing event {}: {}",
                getName(), workerId, event.id(), e.getMessage(), e);
        }
    
        // ==================== Utilitaires ====================
    
        /**
         * Publie un evenement sur l'EventBus.
         * Raccourci pratique pour les workers qui produisent aussi des events.
         */
        protected void publish(String channel, Object payload) {
            eventBus.publish(channel, payload);
        }
    
        /**
         * Retourne le nombre d'evenements en attente.
         */
        public long getBacklog() {
            int pending = mergedQueue != null ? mergedQueue.size() : 0;
            for (Subscription<T> sub : subscriptions) {
                pending += sub.getPendingCount();
            }
            return pending;
        }
    }
    

    6.2 Version simplifiee (single channel)

    /**
     * Version simplifiee pour un seul channel.
     */
    public abstract class SimplePushWorker<T> extends AbstractPushWorker<T> {
    
        protected SimplePushWorker(String channel) {
            super(channel);
        }
    
        protected SimplePushWorker(int concurrency, String channel) {
            super(concurrency, channel);
        }
    
        @Override
        protected void processEvent(T payload, BusEvent<T> event) {
            processEvent(payload);
        }
    
        /**
         * Traite le payload directement.
         */
        protected abstract void processEvent(T payload);
    }
    

    7. Configuration

    7.1 application.yml

    socle:
      eventbus:
        # Activer l'EventBus
        enabled: ${EVENTBUS_ENABLED:true}
    
        # Mode: in_memory | redis
        mode: ${EVENTBUS_MODE:in_memory}
    
        # Configuration commune
        common:
          # Capacite par defaut des queues de subscription
          default_queue_capacity: ${EVENTBUS_QUEUE_CAPACITY:10000}
    
          # Timeout pour offer() quand queue pleine
          offer_timeout_ms: ${EVENTBUS_OFFER_TIMEOUT:100}
    
        # Configuration InMemory
        in_memory:
          # Rien de special
    
        # Configuration Redis
        redis:
          host: ${REDIS_HOST:localhost}
          port: ${REDIS_PORT:6379}
          password: ${REDIS_PASSWORD:}
          database: ${REDIS_DATABASE:0}
          prefix: ${EVENTBUS_REDIS_PREFIX:socle:eventbus}
    
          # Pool de connexions
          pool:
            max_total: ${EVENTBUS_REDIS_POOL_MAX:16}
            max_idle: ${EVENTBUS_REDIS_POOL_MAX_IDLE:8}
            min_idle: ${EVENTBUS_REDIS_POOL_MIN_IDLE:2}
    
        # Metriques
        metrics:
          enabled: ${EVENTBUS_METRICS_ENABLED:true}
          prefix: socle_eventbus
    

    7.2 Variables d’environnement

    Variable Description Defaut
    EVENTBUS_ENABLED Activer l’EventBus true
    EVENTBUS_MODE Mode (in_memory / redis) in_memory
    EVENTBUS_QUEUE_CAPACITY Capacite des queues 10000
    REDIS_HOST Host Redis localhost
    REDIS_PORT Port Redis 6379

    8. Exemples d’utilisation

    8.1 Worker simple

    @Component
    public class OrderNotificationWorker extends SimplePushWorker<OrderEvent> {
    
        @Autowired
        private NotificationService notificationService;
    
        public OrderNotificationWorker() {
            super(2, "orders.created");  // 2 workers concurrents
        }
    
        @Override
        public String getName() {
            return "order_notification_worker";
        }
    
        @Override
        protected Class<OrderEvent> getEventType() {
            return OrderEvent.class;
        }
    
        @Override
        protected void processEvent(OrderEvent event) {
            notificationService.notifyOrderCreated(event.orderId(), event.customerEmail());
        }
    }
    

    8.2 Worker multi-channels

    @Component
    public class AuditWorker extends AbstractPushWorker<Object> {
    
        @Autowired
        private AuditRepository auditRepository;
    
        public AuditWorker() {
            super(4, "orders.*", "payments.*", "users.*");  // Pattern matching
        }
    
        @Override
        public String getName() {
            return "audit_worker";
        }
    
        @Override
        protected Class<Object> getEventType() {
            return Object.class;  // Accepte tout
        }
    
        @Override
        protected void processEvent(Object payload, BusEvent<Object> event) {
            // Acces aux metadata
            AuditEntry entry = new AuditEntry(
                event.id(),
                event.channel(),
                event.timestamp(),
                event.source(),
                payload
            );
            auditRepository.save(entry);
        }
    }
    

    8.3 Publication d’evenements

    @Service
    public class OrderService {
    
        @Autowired
        private EventBus eventBus;
    
        @Autowired
        private OrderRepository orderRepository;
    
        public Order createOrder(CreateOrderRequest request) {
            // Logique metier
            Order order = orderRepository.save(new Order(request));
    
            // Publication sur l'EventBus
            // Tous les workers abonnes a "orders.created" sont reveilles instantanement
            eventBus.publish("orders.created", new OrderEvent(
                order.getId(),
                order.getCustomerEmail(),
                order.getTotal()
            ));
    
            return order;
        }
    
        public void updateOrderStatus(String orderId, OrderStatus newStatus) {
            Order order = orderRepository.updateStatus(orderId, newStatus);
    
            eventBus.publish("orders.updated", new OrderStatusEvent(
                orderId,
                newStatus
            ));
        }
    }
    

    8.4 Worker avec gestion d’erreur personnalisee

    @Component
    public class PaymentWorker extends AbstractPushWorker<PaymentEvent> {
    
        @Autowired
        private PaymentService paymentService;
    
        @Autowired
        private DeadLetterQueue dlq;
    
        public PaymentWorker() {
            super(2, "payments.process");
        }
    
        @Override
        public String getName() {
            return "payment_worker";
        }
    
        @Override
        protected Class<PaymentEvent> getEventType() {
            return PaymentEvent.class;
        }
    
        @Override
        protected void processEvent(PaymentEvent payload, BusEvent<PaymentEvent> event) {
            paymentService.processPayment(payload);
        }
    
        @Override
        protected void handleError(BusEvent<PaymentEvent> event, Exception e, int workerId) {
            logger.error("[{}] Payment processing failed for {}: {}",
                getName(), event.payload().paymentId(), e.getMessage());
    
            // Envoyer en DLQ pour retry ulterieur
            dlq.send("payments.dlq", event, e);
        }
    }
    

    9. Solutions aux questions ouvertes

    9.1 Arret propre des workers en take()

    Probleme : BlockingQueue.take() bloque indefiniment. Comment arreter proprement ?

    Solution : Poison Pill Pattern

    // A l'arret, injecter un evenement "poison" reconnaissable
    @Override
    public void stop() {
        running.set(false);
    
        // Injecter N poison pills (un par worker)
        for (int i = 0; i < concurrency; i++) {
            queue.offer(new BusEvent<>(null, null, null, null, null, null));
        }
    }
    
    // Dans la boucle, verifier le poison
    private void pushLoop(int workerId) {
        while (running.get()) {
            BusEvent<T> event = queue.take();
    
            // Detecter le poison pill (id == null)
            if (event.id() == null) {
                break;  // Sortir proprement
            }
    
            processEvent(event);
        }
    }
    

    9.2 Backpressure (queue bornee vs illimitee)

    Probleme : Que faire si les events arrivent plus vite qu’ils ne sont traites ?

    Solution : Queue bornee avec politique de debordement

    // Queue bornee (recommande)
    BlockingQueue<BusEvent<T>> queue = new LinkedBlockingQueue<>(10_000);
    
    // Lors du publish, gerer le debordement
    private boolean deliver(Subscription sub, BusEvent event) {
        // offer() avec timeout - ne bloque pas indefiniment
        boolean offered = queue.offer(event, 100, TimeUnit.MILLISECONDS);
    
        if (!offered) {
            // Politique de debordement:
            // Option 1: Drop (log + metrique)
            log.warn("Queue full, event dropped");
            droppedCount.incrementAndGet();
    
            // Option 2: Envoyer en DLQ
            dlq.send("overflow", event);
    
            // Option 3: Alerter
            alertService.queueOverflow(sub.getId());
        }
    
        return offered;
    }
    

    Metriques a surveiller :

    • socle_eventbus_queue_size : Taille actuelle
    • socle_eventbus_queue_capacity : Capacite max
    • socle_eventbus_dropped_total : Events perdus

    9.3 Multi-channel avec une seule queue

    Probleme : Un worker ecoute plusieurs channels, comment fusionner ?

    Solution : Queue fusionnee avec forwarders

    // Queue fusionnee
    BlockingQueue<BusEvent<T>> mergedQueue = new LinkedBlockingQueue<>();
    
    // Un forwarder par subscription
    for (String channel : channels) {
        Subscription<T> sub = eventBus.subscribe(channel, eventType);
    
        // Thread qui forward vers la queue fusionnee
        Thread.ofVirtual().start(() -> {
            while (running.get()) {
                BusEvent<T> event = sub.getQueue().take();
                mergedQueue.offer(event);
            }
        });
    }
    
    // Les workers consomment la queue fusionnee
    void pushLoop() {
        while (running.get()) {
            BusEvent<T> event = mergedQueue.take();
            // event.channel() permet de savoir d'ou il vient
            processEvent(event);
        }
    }
    

    9.4 Ordre des evenements

    Probleme : Les evenements doivent-ils etre traites dans l’ordre ?

    Solution : Depends du cas d’usage

    Mode Garantie Implementation
    Aucune Pas d’ordre garanti Concurrency > 1
    Par channel Ordre par channel Concurrency = 1 par channel
    Par cle Ordre par cle metier Partitioning sur la cle
    // Pour garantir l'ordre par cle (ex: par orderId)
    public class OrderedPushWorker extends AbstractPushWorker<OrderEvent> {
    
        // Une queue par partition (hash de la cle)
        private final BlockingQueue<BusEvent<OrderEvent>>[] partitionQueues;
        private final int partitions = 8;
    
        @Override
        protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
            // Le hash garantit que les events du meme orderId vont dans la meme partition
            int partition = Math.abs(payload.orderId().hashCode()) % partitions;
            partitionQueues[partition].offer(event);
        }
    
        // Un worker par partition = ordre garanti par partition
    }
    

    9.5 Retry et DLQ

    Probleme : Que faire si le traitement echoue ?

    Solution : Retry avec backoff puis DLQ

    @Component
    public class ResilientPushWorker extends AbstractPushWorker<MyEvent> {
    
        @Autowired
        private EventBus eventBus;
    
        private static final int MAX_RETRIES = 3;
    
        @Override
        protected void processEvent(MyEvent payload, BusEvent<MyEvent> event) {
            int retryCount = getRetryCount(event);
    
            try {
                doProcess(payload);
            } catch (RetryableException e) {
                if (retryCount < MAX_RETRIES) {
                    // Re-publier avec retry count incremente
                    scheduleRetry(event, retryCount + 1);
                } else {
                    // Max retries atteint -> DLQ
                    sendToDlq(event, e);
                }
            }
        }
    
        private void scheduleRetry(BusEvent<MyEvent> event, int retryCount) {
            // Backoff exponentiel
            long delayMs = (long) Math.pow(2, retryCount) * 1000;
    
            // Re-publier apres le delai
            scheduler.schedule(() -> {
                Map<String, String> metadata = new HashMap<>(event.metadata());
                metadata.put("retry_count", String.valueOf(retryCount));
                eventBus.publish(event.channel(), event.payload(), metadata);
            }, delayMs, TimeUnit.MILLISECONDS);
        }
    
        private int getRetryCount(BusEvent<MyEvent> event) {
            return Integer.parseInt(event.metadata().getOrDefault("retry_count", "0"));
        }
    
        private void sendToDlq(BusEvent<MyEvent> event, Exception e) {
            eventBus.publish("dlq." + event.channel(), new DlqEntry(event, e));
        }
    }
    

    10. Migration des workers existants

    10.1 Avant (AbstractEventDrivenWorker)

    // DEPRECIE
    @Component
    public class OldOrderWorker extends AbstractEventDrivenWorker<OrderEvent> {
    
        private final BlockingQueue<OrderEvent> queue = new LinkedBlockingQueue<>();
    
        public OldOrderWorker() {
            super(4);
        }
    
        @Override
        public String getName() {
            return "order_worker";
        }
    
        @Override
        protected OrderEvent pollEvent() throws InterruptedException {
            return queue.poll(100, TimeUnit.MILLISECONDS);  // Polling!
        }
    
        @Override
        protected void processEvent(OrderEvent event) {
            // Traitement
        }
    
        // Methode externe pour recevoir les events
        public void onOrderReceived(OrderEvent event) {
            queue.offer(event);
        }
    }
    

    10.2 Apres (AbstractPushWorker)

    // RECOMMANDE
    @Component
    public class NewOrderWorker extends AbstractPushWorker<OrderEvent> {
    
        public NewOrderWorker() {
            super(4, "orders.created");  // Specifie le channel
        }
    
        @Override
        public String getName() {
            return "order_worker";
        }
    
        @Override
        protected Class<OrderEvent> getEventType() {
            return OrderEvent.class;
        }
    
        @Override
        protected void processEvent(OrderEvent payload, BusEvent<OrderEvent> event) {
            // Meme traitement
        }
    
        // Plus besoin de methode externe - l'EventBus gere tout
    }
    

    10.3 Etapes de migration

    1. Ajouter la dependance EventBus au worker
    2. Changer la classe parente : AbstractEventDrivenWorkerAbstractPushWorker
    3. Specifier les channels dans le constructeur
    4. Implementer getEventType()
    5. Adapter processEvent() pour accepter BusEvent
    6. Supprimer la queue interne et pollEvent()
    7. Modifier les producteurs pour utiliser eventBus.publish()
    8. Tester le comportement

    11. Metriques Prometheus

    # EventBus
    socle_eventbus_published_total{channel="orders.created"}
    socle_eventbus_delivered_total{channel="orders.created"}
    socle_eventbus_failed_total{channel="orders.created"}
    socle_eventbus_subscriptions_active{channel="orders.created"}
    socle_eventbus_queue_size{subscription="sub-123"}
    
    # Push Workers
    socle_push_worker_processed_total{worker="order_worker"}
    socle_push_worker_errors_total{worker="order_worker"}
    socle_push_worker_queue_size{worker="order_worker"}
    socle_push_worker_duration_seconds{worker="order_worker", quantile="0.99"}
    

    12. API REST Admin

    GET  /admin/eventbus/stats              → Statistiques globales
    GET  /admin/eventbus/channels           → Liste des channels actifs
    GET  /admin/eventbus/subscriptions      → Liste des subscriptions
    
    POST /admin/eventbus/publish            → Publier un event (debug)
         Body: {"channel": "test", "payload": {...}}
    

    13. Voir aussi

    Socle V004 – EventBus et Workers Push

  • Socle V004 – Architecture

    Socle V004 – Architecture

    02 – Architecture du Socle V4

    Version : 4.0.0 Date : 2025-01-25

    1. Vue d’ensemble

    ┌─────────────────────────────────────────────────────────────────┐
    │                         SOCLE V4                                │
    │                                                                 │
    │  ┌─────────────────────────────────────────────────────────┐   │
    │  │                    MOP (inchangé)                        │   │
    │  │  - Orchestration Workers                                 │   │
    │  │  - Lifecycle management                                  │   │
    │  │  - Scheduling doWork()                                   │   │
    │  └─────────────────────────────────────────────────────────┘   │
    │                              │                                  │
    │  ┌───────────┬───────────┬───┴───────┬─────────────┐           │
    │  │           │           │           │             │           │
    │  ▼           ▼           ▼           ▼             ▼           │
    │ ┌─────┐  ┌───────┐  ┌────────┐  ┌─────────┐  ┌──────────┐     │
    │ │KvBus│  │Shared │  │Supervi-│  │ HTTP    │  │ Workers  │     │
    │ │     │  │Data   │  │sor     │  │ Worker  │  │ métier   │     │
    │ └─────┘  └───────┘  └────────┘  └─────────┘  └──────────┘     │
    │                                                                 │
    │  ════════════════════ NOUVEAUTÉS V4 ════════════════════════   │
    │                                                                 │
    │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐     │
    │  │ H2 TechDB   │  │ Log4j2 +    │  │ Clients centraux    │     │
    │  │ (embarqué)  │  │ LogForwarder│  │ - SocleAuthClient   │     │
    │  │             │  │             │  │ - WorkerRegistry    │     │
    │  └─────────────┘  └─────────────┘  └─────────────────────┘     │
    │                                                                 │
    │  ┌─────────────┐  ┌─────────────────────────────────────┐      │
    │  │ Status      │  │ Pipeline V2                          │      │
    │  │ Dashboard   │  │ (Queue/Claim/Ack, DLQ, at-least-once)│      │
    │  │ (port 9374) │  │                                      │      │
    │  └─────────────┘  └─────────────────────────────────────┘      │
    │                                                                 │
    └─────────────────────────────────────────────────────────────────┘
    

    2. Architecture en couches

    ┌─────────────────────────────────────────────────────────────────┐
    │                    COUCHE APPLICATION                           │
    │  Workers métier + Contrôleurs REST                              │
    ├─────────────────────────────────────────────────────────────────┤
    │                    COUCHE FRAMEWORK (SOCLE)                     │
    │  MOP + Core Components + Nouveautés V4                          │
    ├─────────────────────────────────────────────────────────────────┤
    │                    COUCHE INFRASTRUCTURE                        │
    │  Tomcat (HTTP), H2 (TechDB), Redis (KV), Kafka/NATS (Msg)      │
    └─────────────────────────────────────────────────────────────────┘
    

    3. Structure des packages

    eu.lmvi.socle/
    │
    │  ══════ COMPOSANTS V3 (conservés) ══════
    ├── mop/
    │   └── MainOrchestratorProcess.java    # Orchestrateur central
    ├── worker/
    │   └── Worker.java                     # Interface de base
    ├── config/
    │   └── SocleConfiguration.java         # Configuration centralisée
    ├── kv/
    │   ├── KvBus.java                      # Abstraction KV
    │   ├── KvImplementation.java           # Interface implémentation
    │   ├── InMemoryKvImplementation.java   # Implémentation mémoire
    │   └── RedisKvImplementation.java      # Implémentation Redis
    ├── shared/
    │   └── SharedDataRegistry.java         # Registre données partagées
    ├── supervisor/
    │   └── Supervisor.java                 # Monitoring heartbeats
    ├── http/
    │   ├── HttpWorker.java                 # Worker HTTP
    │   ├── TomcatManager.java              # Gestion Tomcat
    │   └── GracefulShutdownFilter.java     # Filtre drain
    ├── admin/
    │   └── AdminRestApi.java               # API REST admin
    ├── metrics/
    │   └── SocleMetrics.java               # Métriques
    ├── pipeline/
    │   └── PipelineEngine.java             # Traitement asynchrone
    ├── resilience/
    │   ├── CircuitBreaker.java             # Circuit Breaker
    │   └── RetryExecutor.java              # Retry avec backoff
    ├── scheduler/
    │   ├── WorkerScheduler.java            # Scheduler
    │   └── CronExpression.java             # Parser cron
    ├── security/
    │   ├── AdminAuthFilter.java            # Auth admin
    │   └── RateLimitFilter.java            # Rate limiting
    │
    │  ══════ NOUVEAUX COMPOSANTS V4 ══════
    ├── techdb/
    │   ├── TechDbManager.java              # Gestionnaire H2
    │   ├── TechDbConfig.java               # Configuration Spring
    │   └── TechDbRepository.java           # Repository
    ├── logging/
    │   ├── SocleLogForwarderAppender.java  # Appender Log4j2
    │   ├── LogTransport.java               # Interface transport
    │   ├── HttpLogTransport.java           # Transport HTTP
    │   ├── NatsLogTransport.java           # Transport NATS
    │   └── H2FallbackStorage.java          # Fallback H2
    └── client/
        ├── auth/
        │   ├── SocleAuthClient.java        # Interface auth
        │   ├── AuthTokenManager.java       # Gestion tokens
        │   └── AuthTokens.java             # DTO tokens
        └── registry/
            ├── WorkerRegistryClient.java   # Client registry
            ├── WorkerRegistration.java     # DTO registration
            └── WorkerHeartbeat.java        # DTO heartbeat
    

    4. Flux de démarrage

    1. Spring Boot initialise
    2. SocleConfiguration charge la config (.env + YAML)
    3. Spring crée les beans @Component
    4. MOP.start() appelé (ApplicationReadyEvent)
       │
       ├── 4.1 [V4] TechDbManager.initialize()
       │         └── Création tables H2
       │         └── Restauration offsets
       │
       ├── 4.2 [V4] SocleAuthClient.login()
       │         └── Obtention JWT
       │
       ├── 4.3 [V4] WorkerRegistryClient.register()
       │         └── Enregistrement au Registry
       │
       ├── 4.4 SharedDataRegistry.initialize()
       ├── 4.5 KvBus.initialize()
       ├── 4.6 Supervisor.start()
       ├── 4.7 Metrics.start()
       │
       ├── 4.8 Workers triés par START_PRIORITY
       │   └── Pour chaque worker:
       │       ├── worker.initialize()
       │       ├── worker.start()
       │       ├── Register avec Supervisor
       │       └── Schedule si isScheduled()
       │
       ├── 4.9 HttpWorker.start() [priorité 1000 = dernier]
       └── 4.10 Boucle principale doWork()
    

    5. Flux de shutdown

    1. Signal SIGTERM (ou /admin/shutdown)
    2. MOP.stop() appelé
    3. État → DRAINING
       │
       ├── 3.1 HttpWorker.startDraining()
       │         └── Refuse nouvelles connexions
       ├── 3.2 HttpWorker.awaitDrain(timeout)
       │         └── Attente requêtes en cours
       │
       ├── 3.3 Workers par STOP_PRIORITY (petit = premier)
       │   ├── HttpWorker.stop() [priorité 0]
       │   └── Autres workers [priorités 1-999]
       │
       ├── 3.4 [V4] WorkerRegistryClient.unregister()
       ├── 3.5 [V4] TechDbManager.close()
       │
       ├── 3.6 Supervisor.shutdown()
       ├── 3.7 SharedData.close()
       └── 3.8 État → STOPPED
    

    6. Dépendances entre composants

    ┌──────────────────────────────────────────────────────────────┐
    │                        MOP (orchestrateur)                    │
    │                              │                                │
    │         ┌────────────────────┼────────────────────┐          │
    │         │                    │                    │          │
    │         ▼                    ▼                    ▼          │
    │   ┌───────────┐       ┌───────────┐       ┌───────────┐     │
    │   │ TechDB    │◄──────│ Supervisor│───────►│ Workers   │     │
    │   │ Manager   │       │           │        │           │     │
    │   └─────┬─────┘       └─────┬─────┘        └─────┬─────┘     │
    │         │                   │                    │           │
    │         │                   ▼                    │           │
    │         │            ┌───────────┐               │           │
    │         │            │ Worker    │◄──────────────┘           │
    │         │            │ Registry  │                           │
    │         │            │ Client    │                           │
    │         │            └─────┬─────┘                           │
    │         │                  │                                 │
    │         ▼                  ▼                                 │
    │   ┌───────────┐     ┌───────────┐                           │
    │   │ H2        │     │ SocleAuth │                           │
    │   │ Database  │     │ Client    │                           │
    │   └───────────┘     └─────┬─────┘                           │
    │                           │                                  │
    │                           ▼                                  │
    │                    ┌─────────────┐                           │
    │                    │ LogForwarder│──► HTTP/NATS (sortant)   │
    │                    │ Appender    │                           │
    │                    └─────────────┘                           │
    └──────────────────────────────────────────────────────────────┘
    

    7. Architecture du Logging V4

    ┌─────────────────────────────────────────────────────────────┐
    │                      Application                             │
    │                                                              │
    │  Logger.info("message")                                      │
    │         │                                                    │
    │         ▼                                                    │
    │  ┌─────────────────────────────────────────────────────┐    │
    │  │              Log4j2 AsyncLoggers                     │    │
    │  │              (LMAX Disruptor)                        │    │
    │  └────────────────────┬────────────────────────────────┘    │
    │                       │                                      │
    │         ┌─────────────┼─────────────┐                       │
    │         ▼             ▼             ▼                       │
    │  ┌───────────┐ ┌───────────┐ ┌─────────────────────┐       │
    │  │  Console  │ │  File     │ │ SocleLogForwarder   │       │
    │  │  Appender │ │  Appender │ │ Appender            │       │
    │  └───────────┘ └───────────┘ └──────────┬──────────┘       │
    │                                         │                   │
    └─────────────────────────────────────────┼───────────────────┘
                                              │
                        ┌─────────────────────┴─────────────────────┐
                        │                                           │
                        ▼                                           ▼
                ┌──────────────┐                           ┌──────────────┐
                │ HTTP Transport│                           │ NATS Transport│
                │ → LogHub     │                           │ → JetStream  │
                └──────┬───────┘                           └──────┬───────┘
                       │                                          │
                       │  (si échec)                              │
                       ▼                                          │
                ┌──────────────┐                                  │
                │ H2 Fallback  │◄─────────────────────────────────┘
                │ Storage      │
                └──────────────┘
    

    8. Patterns de conception

    Pattern Composant Usage
    Orchestrator MOP Orchestre tout le lifecycle
    Observer KvBus pub/sub Communication inter-workers
    Strategy KvImplementation In-memory ou Redis
    Registry SharedDataRegistry État partagé
    Circuit Breaker CircuitBreaker Résilience pannes
    Factory Spring DI Création composants
    Builder Configuration Construction config

    9. Thread Safety

    Mécanisme Usage
    ConcurrentHashMap Maps partagées
    AtomicLong/Boolean Compteurs atomiques
    ReentrantReadWriteLock Opérations complexes
    BlockingQueue Queues thread-safe
    CompletableFuture Opérations async
    ScheduledExecutorService Scheduling
    LMAX Disruptor Ring buffer logging

    10. Points d’extension

    Créer un nouveau Worker

    @Component
    public class MonWorker implements Worker {
        // Implémenter l'interface Worker
    }
    

    Ajouter une implémentation KvBus

    public class MonKvImplementation implements KvImplementation {
        // Implémenter l'interface
    }
    

    Créer un Pipeline Stage

    public class MonStage implements PipelineStage<MonType> {
        // Implémenter le traitement
    }
    

    11. Références

  • Socle V004 – Janino

    Socle V004 – Janino

    29 – Janino (Compilateur Java Dynamique)

    Version : 4.0.0 Date : 2026-01-17

    1. Introduction

    Janino est un compilateur Java embarqué qui permet de compiler du code source Java en bytecode JVM à la volée. Contrairement au ScriptEngine existant (interprété), Janino offre des performances natives car le code est réellement compilé.

    Positionnement

    ┌─────────────────────────────────────────────────────────────────┐
    │                      Socle V004 Scripts                          │
    ├─────────────────────────────────────────────────────────────────┤
    │                                                                  │
    │  ┌─────────────────────┐       ┌─────────────────────┐          │
    │  │    ScriptEngine     │       │   JaninoEngine      │          │
    │  │    (existant)       │       │   (NOUVEAU)         │          │
    │  ├─────────────────────┤       ├─────────────────────┤          │
    │  │ - JavaScript        │       │ - Java pur          │          │
    │  │ - BeanShell         │       │ - Bytecode natif    │          │
    │  │ - Interprété        │       │ - Haute performance │          │
    │  │ - Typage dynamique  │       │ - Typage statique   │          │
    │  └─────────────────────┘       └─────────────────────┘          │
    │                                                                  │
    └─────────────────────────────────────────────────────────────────┘
    

    Cas d’usage

    Situation Recommandation
    Calculs financiers (frais, taxes) Janino
    Validations métier complexes Janino
    Transformations haute performance Janino
    Scripts simples, prototypage ScriptEngine
    Formules configurables Janino

    2. Architecture

    2.1 Composants

    eu.lmvi.socle.janino/
    ├── JaninoEngine.java              # Moteur principal
    ├── JaninoWorker.java              # Worker de gestion
    ├── JaninoScript.java              # Script compilé
    ├── JaninoClassLoader.java         # ClassLoader sécurisé
    ├── JaninoConfiguration.java       # Configuration Spring
    ├── JaninoCompilationException.java # Exception
    └── interfaces/
        ├── Calculator.java            # Interface calculateur
        ├── Executable.java            # Interface exécutable
        ├── Validator.java             # Interface validateur
        └── ValidationResult.java      # Résultat validation
    

    2.2 Diagramme de classes

    ┌───────────────────────┐
    │   JaninoWorker        │  (implements Worker)
    │   @Component          │
    ├───────────────────────┤
    │ - janinoEngine        │
    │ - config              │
    │ - techDb              │
    │ - supervisor          │
    ├───────────────────────┤
    │ + execute()           │
    │ + compileScript()     │
    │ + forceReload()       │
    │ + getEngine()         │
    └───────────┬───────────┘
                │
                ▼
    ┌───────────────────────┐
    │   JaninoEngine        │
    │   @Component          │
    ├───────────────────────┤
    │ - scriptCache         │
    │ - classLoader         │
    ├───────────────────────┤
    │ + compile()           │
    │ + execute()           │
    │ + reload()            │
    │ + getStats()          │
    └───────────┬───────────┘
                │
                ▼
    ┌───────────────────────┐
    │   JaninoScript        │
    ├───────────────────────┤
    │ - name                │
    │ - compiledClass       │
    │ - executionCount      │
    │ - avgExecutionTimeNs  │
    └───────────────────────┘
    

    3. Configuration

    3.1 application.yml

    socle:
      janino:
        # Activer Janino (défaut: false)
        enabled: ${JANINO_ENABLED:false}
    
        # Répertoire des scripts Java
        scripts-path: ${JANINO_SCRIPTS_PATH:./repository/scripts/java}
    
        # Intervalle de rechargement (défaut: 5 minutes)
        reload-interval-ms: ${JANINO_RELOAD_INTERVAL:300000}
    
        # Nombre max de classes en cache
        max-cached-classes: ${JANINO_MAX_CACHED:100}
    
        # Sécurité
        security:
          # Packages bloqués dans les scripts
          blocked-packages:
            - java.io
            - java.net
            - java.lang.reflect
            - java.lang.invoke
            - sun.
            - com.sun.
    
          # Timeout d'exécution max
          max-execution-time-ms: ${JANINO_MAX_EXEC_TIME:5000}
    

    3.2 Variables d’environnement

    Variable Description Défaut
    JANINO_ENABLED Activer Janino false
    JANINO_SCRIPTS_PATH Répertoire scripts ./repository/scripts/java
    JANINO_RELOAD_INTERVAL Intervalle reload (ms) 300000
    JANINO_MAX_CACHED Max classes en cache 100
    JANINO_MAX_EXEC_TIME Timeout exécution (ms) 5000

    4. Interfaces de Scripts

    Les scripts Java doivent implémenter une des interfaces suivantes :

    4.1 Calculator

    Pour les calculs (frais, taxes, conversions).

    package eu.lmvi.socle.janino.interfaces;
    
    public interface Calculator<T> {
        T calculate(Map<String, Object> context);
    }
    

    Exemple :

    import eu.lmvi.socle.janino.interfaces.Calculator;
    import java.math.BigDecimal;
    import java.math.RoundingMode;
    import java.util.Map;
    
    public class FeeCalculator implements Calculator<BigDecimal> {
    
        private static final BigDecimal FEE_RATE = new BigDecimal("0.0026");
    
        @Override
        public BigDecimal calculate(Map<String, Object> context) {
            BigDecimal amount = (BigDecimal) context.get("amount");
            String side = (String) context.get("side");
    
            BigDecimal rate = "MAKER".equals(side)
                ? new BigDecimal("0.0016")
                : FEE_RATE;
    
            return amount.multiply(rate).setScale(8, RoundingMode.HALF_UP);
        }
    }
    

    4.2 Executable

    Pour les exécutions génériques.

    package eu.lmvi.socle.janino.interfaces;
    
    public interface Executable {
        Object execute(Map<String, Object> context);
    }
    

    Exemple :

    import eu.lmvi.socle.janino.interfaces.Executable;
    import java.util.Map;
    
    public class OrderProcessor implements Executable {
    
        @Override
        public Object execute(Map<String, Object> context) {
            String orderId = (String) context.get("orderId");
            Double amount = (Double) context.get("amount");
    
            // Logique de traitement...
    
            return Map.of(
                "status", "PROCESSED",
                "orderId", orderId,
                "processedAmount", amount * 0.99
            );
        }
    }
    

    4.3 Validator

    Pour les validations métier.

    package eu.lmvi.socle.janino.interfaces;
    
    public interface Validator {
        ValidationResult validate(Object input);
    }
    

    Exemple :

    import eu.lmvi.socle.janino.interfaces.Validator;
    import eu.lmvi.socle.janino.interfaces.ValidationResult;
    import java.math.BigDecimal;
    
    public class OrderValidator implements Validator {
    
        private static final BigDecimal MIN_AMOUNT = new BigDecimal("10.00");
        private static final BigDecimal MAX_AMOUNT = new BigDecimal("100000.00");
    
        @Override
        public ValidationResult validate(Object input) {
            Order order = (Order) input;
            ValidationResult result = new ValidationResult();
    
            if (order.getAmount().compareTo(MIN_AMOUNT) < 0) {
                result.addError("AMOUNT_TOO_LOW",
                    "Amount must be >= " + MIN_AMOUNT);
            }
    
            if (order.getAmount().compareTo(MAX_AMOUNT) > 0) {
                result.addError("AMOUNT_TOO_HIGH",
                    "Amount must be <= " + MAX_AMOUNT);
            }
    
            if (order.getPair() == null || order.getPair().isEmpty()) {
                result.addError("INVALID_PAIR", "Trading pair is required");
            }
    
            return result;
        }
    }
    

    5. Utilisation

    5.1 Structure des scripts

    repository/scripts/java/
    ├── fees/
    │   ├── KrakenFeeCalculator.java
    │   ├── BinanceFeeCalculator.java
    │   └── CryptoComFeeCalculator.java
    ├── validators/
    │   ├── OrderValidator.java
    │   └── AmountValidator.java
    └── processors/
        ├── OrderProcessor.java
        └── TradeProcessor.java
    

    5.2 Injection dans un Worker

    @Component
    public class TradingWorker implements Worker {
    
        @Autowired
        private JaninoWorker janinoWorker;
    
        @Override
        public void doWork() {
            // Exécuter un calculateur
            Map<String, Object> context = Map.of(
                "amount", new BigDecimal("1000.00"),
                "side", "TAKER"
            );
    
            BigDecimal fee = janinoWorker.execute(
                "KrakenFeeCalculator",
                context,
                BigDecimal.class
            );
    
            log.info("Fee calculated: {}", fee);
        }
    }
    

    5.3 Compilation à la volée

    @Autowired
    private JaninoWorker janinoWorker;
    
    public void compileCustomScript() {
        String source = """
            import eu.lmvi.socle.janino.interfaces.Calculator;
            import java.math.BigDecimal;
            import java.util.Map;
    
            public class CustomCalculator implements Calculator<BigDecimal> {
                @Override
                public BigDecimal calculate(Map<String, Object> context) {
                    BigDecimal value = (BigDecimal) context.get("value");
                    return value.multiply(new BigDecimal("1.05"));
                }
            }
            """;
    
        janinoWorker.compileScript("CustomCalculator", source);
    
        // Exécuter
        BigDecimal result = janinoWorker.execute(
            "CustomCalculator",
            Map.of("value", new BigDecimal("100")),
            BigDecimal.class
        );
    }
    

    5.4 Accès direct au moteur

    @Autowired
    private JaninoWorker janinoWorker;
    
    public void advancedUsage() {
        JaninoEngine engine = janinoWorker.getEngine();
    
        // Vérifier si un script est compilé
        boolean ready = engine.isCompiled("FeeCalculator");
    
        // Liste des scripts compilés
        Set<String> scripts = engine.getCompiledScripts();
    
        // Statistiques détaillées
        Map<String, Map<String, Object>> stats = engine.getScriptStats();
    
        // Forcer le rechargement
        janinoWorker.forceReload();
    }
    

    6. Hot-Reload

    Le JaninoWorker surveille automatiquement les modifications des fichiers .java dans le répertoire configuré.

    Fonctionnement

    1. À chaque cycle (reload-interval-ms), le worker scanne le répertoire
    2. Pour chaque fichier modifié (timestamp changé), le script est recompilé
    3. Le nouveau bytecode remplace l’ancien dans le cache
    4. Les prochaines exécutions utilisent la nouvelle version

    Logs

    [exec:xxx][step:janino_reloaded] Reloaded script: FeeCalculator
    [exec:xxx][step:janino_reload_cycle] Reload cycle completed (5 scripts)
    

    Forcer le rechargement

    // Recharger tous les scripts
    janinoWorker.forceReload();
    

    7. Securite

    7.1 Validation des Imports (Source-Level)

    Le JaninoEngine valide les imports avant la compilation pour bloquer l’acces aux packages dangereux. Cette approche (validation au niveau du code source) est plus compatible avec les fat-jars Spring Boot que la precedente approche ClassLoader.

    Package bloque Raison
    java.io Acces fichiers
    java.net Acces reseau
    java.lang.reflect Reflection
    java.lang.invoke MethodHandles
    sun.* Classes internes
    com.sun.* Classes internes

    7.2 Fonctionnement

    Source Java → validateSourceSecurity() → Compilation Janino → Execution
                         ↓
                  Analyse des imports
                         ↓
                Blocage si package interdit
    

    Le moteur analyse les declarations import dans le code source et rejette le script si un package bloque est detecte.

    7.3 Tentative d’acces bloque

    // Ce script sera bloque AVANT compilation
    import java.io.File;  // BLOQUE!
    
    public class MaliciousScript implements Executable {
        @Override
        public Object execute(Map<String, Object> context) {
            File file = new File("/etc/passwd");
            return null;
        }
    }
    

    Erreur :

    JaninoCompilationException: Security violation in script 'MaliciousScript':
    Import of blocked package 'java.io' is not allowed. Blocked import: java.io.File
    

    7.4 Configuration personnalisee

    socle:
      janino:
        security:
          blocked-packages:
            - java.io
            - java.net
            - java.lang.reflect
            - java.lang.invoke
            - sun.
            - com.sun.
            - com.mycompany.internal  # Packages internes custom
    

    8. Métriques et Monitoring

    8.1 Stats du Worker

    Map<String, Object> stats = janinoWorker.getStats();
    
    // Résultat:
    {
        "name": "janino_worker",
        "running": true,
        "healthy": true,
        "scripts_path": "./repository/scripts/java",
        "reload_interval_ms": 300000,
        "reload_count": 15,
        "last_reload_at": "2026-01-17T10:30:00Z",
        "scripts_compiled": 5,
        "compilation_count": 8,
        "compilation_errors": 0,
        "execution_count": 1234,
        "execution_errors": 2
    }
    

    8.2 Stats par script

    Map<String, Map<String, Object>> scriptStats = janinoWorker.getEngine().getScriptStats();
    
    // Résultat:
    {
        "KrakenFeeCalculator": {
            "class": "KrakenFeeCalculator",
            "compiled_at": 1705487400000,
            "execution_count": 500,
            "avg_execution_us": 12
        },
        "BinanceFeeCalculator": {
            "class": "BinanceFeeCalculator",
            "compiled_at": 1705487400000,
            "execution_count": 300,
            "avg_execution_us": 8
        }
    }
    

    8.3 TechDB

    Le worker persiste ses stats dans la table janino_stats :

    SELECT * FROM janino_stats;
    
    -- worker_name | scripts_count | compilation_count | execution_count | reload_count | last_reload
    -- janino_worker | 5 | 8 | 1234 | 15 | 2026-01-17 10:30:00
    

    9. Comparaison avec ScriptEngine

    Critère ScriptEngine JaninoEngine
    Langages JavaScript, BeanShell Java pur
    Exécution Interprétée Compilée (bytecode)
    Performance ~1000x plus lent Native JVM
    Typage Dynamique Statique
    IDE Support Limité Complet (Java)
    Debug Difficile Standard Java
    Hot-reload Non Oui
    Sécurité Sandbox complexe ClassLoader isolation
    Courbe apprentissage Nouveau langage Java existant

    Quand utiliser quoi ?

    Situation Recommandation
    Calculs critiques (frais, taxes) JaninoEngine
    Validations complexes JaninoEngine
    Scripts simples, one-liners ScriptEngine
    Prototypage rapide ScriptEngine
    Logique métier complexe JaninoEngine
    Transformations JSON basiques ScriptEngine

    10. Dépannage

    10.1 Script non compilé

    Erreur :

    IllegalStateException: Script not compiled: MyScript
    

    Solution :

    • Vérifier que le fichier existe dans scripts-path
    • Vérifier l’extension .java
    • Consulter les logs pour les erreurs de compilation

    10.2 Erreur de compilation

    Erreur :

    JaninoCompilationException: Failed to compile script: MyScript
    

    Solution :

    • Vérifier la syntaxe Java
    • Vérifier les imports
    • Vérifier que la classe implémente une interface valide

    10.3 Classe non trouvée

    Erreur :

    Cannot find class name in source
    

    Solution :

    • Le source doit contenir public class NomDeLaClasse
    • Le nom de la classe doit correspondre au nom du fichier

    10.4 Package bloque

    Erreur :

    JaninoCompilationException: Security violation in script 'MyScript':
    Import of blocked package 'java.io' is not allowed
    

    Solution :

    • Utiliser uniquement les packages autorises
    • Si necessaire, modifier la config blocked-packages

    10.5 Cannot load simple types (Fat-Jar Spring Boot)

    Erreur :

    org.codehaus.commons.compiler.CompileException: Cannot load simple types
    

    Cause : Ce probleme survient dans les fat-jars Spring Boot car le classloader personnalise casse la resolution des modules Java 9+.

    Solution : Cette erreur a ete corrigee dans le socle V4. Le JaninoEngine utilise maintenant le context classloader directement :

    // JaninoEngine.java - ligne 117-121
    ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
    if (contextLoader == null) {
        contextLoader = getClass().getClassLoader();
    }
    compiler.setParentClassLoader(contextLoader);
    

    Si vous rencontrez encore cette erreur, verifiez que vous utilisez la derniere version du socle.

    10.6 Assignment conversion not possible from Object

    Erreur :

    Line 35, Column 41: Assignment conversion not possible from type "java.lang.Object" to type "java.math.BigDecimal"
    

    Cause : Janino ne supporte pas l’inference de type pour Map.get().

    Solution : Ajouter un cast explicite :

    // AVANT (erreur)
    BigDecimal rate = myMap.get(key);
    
    // APRES (correct)
    BigDecimal rate = (BigDecimal) myMap.get(key);
    

    10.7 Invalid escape sequence

    Erreur :

    Line 27, Column 35: Invalid escape sequence
    

    Cause : Les sequences d’echappement regex (\s, \d, \{) doivent etre double-echappees.

    Solution :

    // AVANT (erreur)
    Pattern p = Pattern.compile("\s+");
    
    // APRES (correct)
    Pattern p = Pattern.compile("\\s+");
    

    10.8 Invocation of static interface methods

    Erreur :

    Invocation of static interface methods only available for target version 8+
    

    Cause : Janino ne supporte pas Map.of(), Set.of(), List.of() (Java 9+).

    Solution : Utiliser des blocs static :

    // AVANT (erreur)
    private static final Map<String, String> DATA = Map.of("a", "b");
    
    // APRES (correct)
    private static final Map<String, String> DATA = new HashMap<>();
    static {
        DATA.put("a", "b");
    }
    

    11. Bonnes Pratiques

    DO

    • Implémenter une des interfaces (Calculator, Executable, Validator)
    • Utiliser des types explicites (pas de var)
    • Gérer les exceptions dans le script
    • Tester les scripts avant déploiement
    • Utiliser des noms de classes descriptifs

    DON’T

    • Ne pas utiliser java.io, java.net, java.lang.reflect
    • Ne pas stocker d’état entre les exécutions (stateless)
    • Ne pas faire d’opérations bloquantes longues
    • Ne pas utiliser de dépendances externes non disponibles

    12. Limitations Janino et Compatibilite

    Janino est un compilateur Java simplifie qui ne supporte pas toutes les fonctionnalites du langage Java moderne. Cette section documente les limitations et les solutions.

    12.1 Fonctionnalites Non Supportees

    Fonctionnalite Version Java Statut Janino
    Methodes generiques <T> Java 5+ NON SUPPORTE
    Map.of(), Set.of(), List.of() Java 9+ NON SUPPORTE
    switch expressions Java 14+ NON SUPPORTE
    var (inference de type) Java 10+ NON SUPPORTE
    Records Java 16+ NON SUPPORTE
    Pattern matching Java 16+ NON SUPPORTE
    Text blocks """ Java 15+ NON SUPPORTE

    12.2 Solutions et Contournements

    A. Pas de methodes generiques

    // INTERDIT - Ne compile pas
    private <T extends Number> T extractValue(String json, String key, Class<T> type) {
        // ...
    }
    
    // CORRECT - Methodes specifiques par type
    private Integer extractIntValue(String json, String key) {
        // implementation pour Integer
    }
    
    private Double extractDoubleValue(String json, String key) {
        // implementation pour Double
    }
    

    B. Pas de Map.of() / Set.of() / List.of()

    // INTERDIT - Ne compile pas
    private static final Map<String, BigDecimal> RATES = Map.of(
        "FR", new BigDecimal("0.015"),
        "DE", new BigDecimal("0.018")
    );
    
    // CORRECT - Bloc static avec HashMap
    private static final Map<String, BigDecimal> RATES = new HashMap<>();
    static {
        RATES.put("FR", new BigDecimal("0.015"));
        RATES.put("DE", new BigDecimal("0.018"));
    }
    
    // CORRECT - Pour Set
    private static final Set<String> COUNTRIES = new HashSet<>();
    static {
        COUNTRIES.add("FR");
        COUNTRIES.add("DE");
    }
    

    C. Cast explicite pour Map.get()

    // INTERDIT - "Assignment conversion not possible from Object to BigDecimal"
    BigDecimal rate = RATES.get(country);
    
    // CORRECT - Cast explicite
    BigDecimal rate = (BigDecimal) RATES.get(country);
    if (rate == null) rate = BigDecimal.ZERO;
    

    D. Pas de switch expressions

    // INTERDIT - Ne compile pas
    String result = switch (status) {
        case "A" -> "Active";
        case "I" -> "Inactive";
        default -> "Unknown";
    };
    
    // CORRECT - Switch statement classique
    String result;
    switch (status) {
        case "A": result = "Active"; break;
        case "I": result = "Inactive"; break;
        default: result = "Unknown";
    }
    

    E. Double echappement des regex

    // INTERDIT - "Invalid escape sequence"
    Pattern p = Pattern.compile("\s+");
    Pattern p2 = Pattern.compile("\{.*\}");
    
    // CORRECT - Double echappement
    Pattern p = Pattern.compile("\\s+");
    Pattern p2 = Pattern.compile("\\{.*\\}");
    

    12.3 Template de Script Compatible

    Voici un template de script qui fonctionne avec Janino :

    import eu.lmvi.socle.janino.interfaces.Calculator;
    import java.math.BigDecimal;
    import java.math.RoundingMode;
    import java.util.Map;
    import java.util.HashMap;
    import java.util.Set;
    import java.util.HashSet;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class MyCalculator implements Calculator<BigDecimal> {
    
        // Collections statiques avec bloc static
        private static final Map<String, BigDecimal> RATES = new HashMap<>();
        private static final Set<String> VALID_CODES = new HashSet<>();
    
        static {
            RATES.put("A", new BigDecimal("0.10"));
            RATES.put("B", new BigDecimal("0.20"));
    
            VALID_CODES.add("X");
            VALID_CODES.add("Y");
        }
    
        @Override
        public BigDecimal calculate(Map<String, Object> context) {
            // Extraction avec cast explicite
            BigDecimal amount = extractAmount(context.get("amount"));
            String code = (String) context.get("code");
            if (code == null) code = "A";
    
            // Acces Map avec cast
            BigDecimal rate = (BigDecimal) RATES.get(code);
            if (rate == null) rate = new BigDecimal("0.15");
    
            return amount.multiply(rate).setScale(2, RoundingMode.HALF_UP);
        }
    
        // Methode d'extraction type-safe (pas de generiques)
        private BigDecimal extractAmount(Object value) {
            if (value == null) return BigDecimal.ZERO;
            if (value instanceof BigDecimal) return (BigDecimal) value;
            if (value instanceof Number) {
                return BigDecimal.valueOf(((Number) value).doubleValue());
            }
            try {
                return new BigDecimal(value.toString());
            } catch (NumberFormatException e) {
                return BigDecimal.ZERO;
            }
        }
    }
    

    12.4 Checklist de Compatibilite

    Avant de deployer un script Janino, verifiez :

    • [ ] Pas de <T> dans les signatures de methodes
    • [ ] Pas de Map.of(), Set.of(), List.of()
    • [ ] Pas de var pour les declarations
    • [ ] Pas de switch expressions (fleches ->)
    • [ ] Cast explicite (Type) pour tous les Map.get()
    • [ ] Double echappement \\ dans les regex
    • [ ] Pas de text blocks """
    • [ ] Imports explicites (pas de wildcards import java.util.*)

    13. Exemple Complet

    13.1 Script : TradingFeeCalculator.java

    import eu.lmvi.socle.janino.interfaces.Calculator;
    import java.math.BigDecimal;
    import java.math.RoundingMode;
    import java.util.Map;
    import java.util.HashMap;
    
    /**
     * Calculateur de frais de trading multi-exchange.
     * Compatible Janino (pas de switch expressions, pas de Map.of)
     */
    public class TradingFeeCalculator implements Calculator<BigDecimal> {
    
        // Taux MAKER par exchange
        private static final Map<String, BigDecimal> MAKER_RATES = new HashMap<>();
        // Taux TAKER par exchange
        private static final Map<String, BigDecimal> TAKER_RATES = new HashMap<>();
    
        static {
            MAKER_RATES.put("KRAKEN", new BigDecimal("0.0016"));
            MAKER_RATES.put("BINANCE", new BigDecimal("0.0010"));
            MAKER_RATES.put("COINBASE", new BigDecimal("0.0040"));
    
            TAKER_RATES.put("KRAKEN", new BigDecimal("0.0026"));
            TAKER_RATES.put("BINANCE", new BigDecimal("0.0010"));
            TAKER_RATES.put("COINBASE", new BigDecimal("0.0060"));
        }
    
        private static final BigDecimal DEFAULT_RATE = new BigDecimal("0.0025");
    
        @Override
        public BigDecimal calculate(Map<String, Object> context) {
            String exchange = (String) context.get("exchange");
            BigDecimal amount = (BigDecimal) context.get("amount");
            String side = (String) context.get("side");
    
            if (exchange == null) exchange = "DEFAULT";
            if (side == null) side = "TAKER";
    
            BigDecimal feeRate = getFeeRate(exchange.toUpperCase(), side.toUpperCase());
    
            return amount.multiply(feeRate).setScale(8, RoundingMode.HALF_UP);
        }
    
        private BigDecimal getFeeRate(String exchange, String side) {
            boolean isMaker = "MAKER".equals(side);
    
            // Cast explicite requis par Janino
            BigDecimal rate;
            if (isMaker) {
                rate = (BigDecimal) MAKER_RATES.get(exchange);
            } else {
                rate = (BigDecimal) TAKER_RATES.get(exchange);
            }
    
            if (rate == null) {
                rate = DEFAULT_RATE;
            }
            return rate;
        }
    }
    

    13.2 Worker utilisant le script

    Note : Ce Worker est du code Java standard compile par javac/Maven, pas un script Janino. Il peut donc utiliser List.of(), Map.of() et les fonctionnalites Java modernes.

    @Component
    public class FeeWorker implements Worker {
    
        private static final Logger log = LoggerFactory.getLogger(FeeWorker.class);
    
        @Autowired
        private JaninoWorker janinoWorker;
    
        @Override
        public String getName() {
            return "fee-worker";
        }
    
        @Override
        public void doWork() {
            // Calculer les frais pour différents exchanges
            List<String> exchanges = List.of("KRAKEN", "BINANCE", "COINBASE");
            BigDecimal amount = new BigDecimal("10000.00");
    
            for (String exchange : exchanges) {
                Map<String, Object> context = Map.of(
                    "exchange", exchange,
                    "amount", amount,
                    "side", "TAKER"
                );
    
                BigDecimal fee = janinoWorker.execute(
                    "TradingFeeCalculator",
                    context,
                    BigDecimal.class
                );
    
                log.info("Fee for {} on amount {}: {}",
                    exchange, amount, fee);
            }
        }
    
        // ... autres méthodes Worker
    }
    

    14. References

  • Socle V004 – Résilience

    Socle V004 – Résilience

    11 – Resilience (Circuit Breaker & Retry)

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Socle V4 intègre des patterns de résilience pour gérer les défaillances des systèmes externes :

    • Retry : Réessayer les opérations échouées
    • Circuit Breaker : Protéger contre les cascades de défaillances

    2. Retry Pattern

    2.1 Configuration

    socle:
      resilience:
        retry:
          max-attempts: ${RETRY_MAX_ATTEMPTS:3}
          initial-delay-ms: ${RETRY_INITIAL_DELAY_MS:1000}
          max-delay-ms: ${RETRY_MAX_DELAY_MS:30000}
          multiplier: ${RETRY_MULTIPLIER:2.0}
    

    2.2 Interface RetryTemplate

    package eu.lmvi.socle.resilience;
    
    public interface RetryTemplate {
    
        /**
         * Exécute une opération avec retry
         */
        <T> T execute(Supplier<T> operation) throws RetryExhaustedException;
    
        /**
         * Exécute une opération avec retry et fallback
         */
        <T> T executeWithFallback(Supplier<T> operation, Supplier<T> fallback);
    
        /**
         * Exécute une opération void avec retry
         */
        void executeVoid(Runnable operation) throws RetryExhaustedException;
    }
    

    2.3 Implémentation

    package eu.lmvi.socle.resilience;
    
    @Component
    public class ExponentialBackoffRetryTemplate implements RetryTemplate {
    
        private static final Logger log = LoggerFactory.getLogger(ExponentialBackoffRetryTemplate.class);
    
        private final int maxAttempts;
        private final long initialDelayMs;
        private final long maxDelayMs;
        private final double multiplier;
    
        public ExponentialBackoffRetryTemplate(SocleConfiguration config) {
            this.maxAttempts = config.getResilience().getRetry().getMaxAttempts();
            this.initialDelayMs = config.getResilience().getRetry().getInitialDelayMs();
            this.maxDelayMs = config.getResilience().getRetry().getMaxDelayMs();
            this.multiplier = config.getResilience().getRetry().getMultiplier();
        }
    
        @Override
        public <T> T execute(Supplier<T> operation) throws RetryExhaustedException {
            Exception lastException = null;
            long delay = initialDelayMs;
    
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return operation.get();
                } catch (Exception e) {
                    lastException = e;
                    log.warn("Attempt {}/{} failed: {}", attempt, maxAttempts, e.getMessage());
    
                    if (attempt < maxAttempts) {
                        sleep(delay);
                        delay = Math.min((long) (delay * multiplier), maxDelayMs);
                    }
                }
            }
    
            throw new RetryExhaustedException(
                "Operation failed after " + maxAttempts + " attempts",
                lastException
            );
        }
    
        @Override
        public <T> T executeWithFallback(Supplier<T> operation, Supplier<T> fallback) {
            try {
                return execute(operation);
            } catch (RetryExhaustedException e) {
                log.warn("All retries exhausted, using fallback");
                return fallback.get();
            }
        }
    
        @Override
        public void executeVoid(Runnable operation) throws RetryExhaustedException {
            execute(() -> {
                operation.run();
                return null;
            });
        }
    
        private void sleep(long ms) {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Retry interrupted", e);
            }
        }
    }
    

    2.4 Utilisation

    @Service
    public class ExternalApiService {
    
        @Autowired
        private RetryTemplate retryTemplate;
    
        public Data fetchData(String id) {
            return retryTemplate.execute(() -> {
                // Appel qui peut échouer
                return httpClient.get("/data/" + id);
            });
        }
    
        public Data fetchDataWithFallback(String id) {
            return retryTemplate.executeWithFallback(
                () -> httpClient.get("/data/" + id),
                () -> getCachedData(id)  // Fallback vers le cache
            );
        }
    }
    

    3. Circuit Breaker Pattern

    3.1 États

            succès
         ┌─────────────────┐
         │                 │
         ▼                 │
    ┌─────────┐      ┌─────┴─────┐      ┌─────────┐
    │  CLOSED │─────►│   OPEN    │─────►│HALF_OPEN│
    └────┬────┘      └───────────┘      └────┬────┘
         │  échecs        │ timeout          │
         │  threshold     │                  │
         │                │                  │
         └────────────────┴──────────────────┘
                  retour succès
    
    • CLOSED : Fonctionnement normal, les requêtes passent
    • OPEN : Circuit ouvert, les requêtes échouent immédiatement
    • HALF_OPEN : Test de reprise, quelques requêtes passent

    3.2 Configuration

    socle:
      resilience:
        circuit-breaker:
          failure-threshold: ${CB_FAILURE_THRESHOLD:5}
          success-threshold: ${CB_SUCCESS_THRESHOLD:3}
          timeout-ms: ${CB_TIMEOUT_MS:60000}
          half-open-requests: ${CB_HALF_OPEN_REQUESTS:3}
    

    3.3 Interface CircuitBreaker

    package eu.lmvi.socle.resilience;
    
    public interface CircuitBreaker {
    
        /**
         * Nom du circuit
         */
        String getName();
    
        /**
         * État actuel
         */
        CircuitState getState();
    
        /**
         * Exécute une opération protégée
         */
        <T> T execute(Supplier<T> operation) throws CircuitBreakerOpenException;
    
        /**
         * Exécute avec fallback
         */
        <T> T executeWithFallback(Supplier<T> operation, Supplier<T> fallback);
    
        /**
         * Force l'ouverture
         */
        void forceOpen();
    
        /**
         * Force la fermeture
         */
        void forceClose();
    
        /**
         * Reset les compteurs
         */
        void reset();
    
        /**
         * Métriques
         */
        CircuitBreakerMetrics getMetrics();
    }
    
    public enum CircuitState {
        CLOSED,
        OPEN,
        HALF_OPEN
    }
    

    3.4 Implémentation

    package eu.lmvi.socle.resilience;
    
    public class DefaultCircuitBreaker implements CircuitBreaker {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultCircuitBreaker.class);
    
        private final String name;
        private final int failureThreshold;
        private final int successThreshold;
        private final long timeoutMs;
        private final int halfOpenRequests;
    
        private volatile CircuitState state = CircuitState.CLOSED;
        private final AtomicInteger failureCount = new AtomicInteger(0);
        private final AtomicInteger successCount = new AtomicInteger(0);
        private final AtomicInteger halfOpenCount = new AtomicInteger(0);
        private volatile Instant lastFailureTime;
        private final ReentrantLock lock = new ReentrantLock();
    
        @Override
        public <T> T execute(Supplier<T> operation) throws CircuitBreakerOpenException {
            if (!allowRequest()) {
                throw new CircuitBreakerOpenException(name);
            }
    
            try {
                T result = operation.get();
                onSuccess();
                return result;
            } catch (Exception e) {
                onFailure();
                throw e;
            }
        }
    
        @Override
        public <T> T executeWithFallback(Supplier<T> operation, Supplier<T> fallback) {
            try {
                return execute(operation);
            } catch (CircuitBreakerOpenException e) {
                log.debug("[{}] Circuit open, using fallback", name);
                return fallback.get();
            } catch (Exception e) {
                log.warn("[{}] Operation failed, using fallback", name, e);
                return fallback.get();
            }
        }
    
        private boolean allowRequest() {
            switch (state) {
                case CLOSED:
                    return true;
    
                case OPEN:
                    // Vérifier si le timeout est passé
                    if (lastFailureTime != null &&
                        Duration.between(lastFailureTime, Instant.now()).toMillis() > timeoutMs) {
                        transitionTo(CircuitState.HALF_OPEN);
                        return true;
                    }
                    return false;
    
                case HALF_OPEN:
                    // Limiter les requêtes en half-open
                    return halfOpenCount.incrementAndGet() <= halfOpenRequests;
    
                default:
                    return false;
            }
        }
    
        private void onSuccess() {
            lock.lock();
            try {
                switch (state) {
                    case CLOSED:
                        failureCount.set(0);
                        break;
    
                    case HALF_OPEN:
                        if (successCount.incrementAndGet() >= successThreshold) {
                            transitionTo(CircuitState.CLOSED);
                        }
                        break;
                }
            } finally {
                lock.unlock();
            }
        }
    
        private void onFailure() {
            lock.lock();
            try {
                lastFailureTime = Instant.now();
    
                switch (state) {
                    case CLOSED:
                        if (failureCount.incrementAndGet() >= failureThreshold) {
                            transitionTo(CircuitState.OPEN);
                        }
                        break;
    
                    case HALF_OPEN:
                        transitionTo(CircuitState.OPEN);
                        break;
                }
            } finally {
                lock.unlock();
            }
        }
    
        private void transitionTo(CircuitState newState) {
            if (state != newState) {
                log.info("[{}] Circuit state: {} -> {}", name, state, newState);
                state = newState;
                failureCount.set(0);
                successCount.set(0);
                halfOpenCount.set(0);
            }
        }
    
        @Override
        public CircuitState getState() {
            return state;
        }
    
        @Override
        public String getName() {
            return name;
        }
    
        @Override
        public void forceOpen() {
            transitionTo(CircuitState.OPEN);
        }
    
        @Override
        public void forceClose() {
            transitionTo(CircuitState.CLOSED);
        }
    
        @Override
        public void reset() {
            lock.lock();
            try {
                state = CircuitState.CLOSED;
                failureCount.set(0);
                successCount.set(0);
                halfOpenCount.set(0);
                lastFailureTime = null;
            } finally {
                lock.unlock();
            }
        }
    }
    

    3.5 CircuitBreakerRegistry

    @Component
    public class CircuitBreakerRegistry {
    
        private final ConcurrentHashMap<String, CircuitBreaker> circuits = new ConcurrentHashMap<>();
        private final SocleConfiguration config;
    
        public CircuitBreaker getOrCreate(String name) {
            return circuits.computeIfAbsent(name, this::createCircuitBreaker);
        }
    
        public CircuitBreaker get(String name) {
            return circuits.get(name);
        }
    
        public Map<String, CircuitState> getAllStates() {
            return circuits.entrySet().stream()
                .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    e -> e.getValue().getState()
                ));
        }
    
        private CircuitBreaker createCircuitBreaker(String name) {
            return new DefaultCircuitBreaker(
                name,
                config.getResilience().getCircuitBreaker().getFailureThreshold(),
                config.getResilience().getCircuitBreaker().getSuccessThreshold(),
                config.getResilience().getCircuitBreaker().getTimeoutMs(),
                config.getResilience().getCircuitBreaker().getHalfOpenRequests()
            );
        }
    }
    

    3.6 Utilisation

    @Service
    public class PaymentService {
    
        @Autowired
        private CircuitBreakerRegistry cbRegistry;
    
        public PaymentResult processPayment(Payment payment) {
            CircuitBreaker cb = cbRegistry.getOrCreate("payment-gateway");
    
            return cb.executeWithFallback(
                () -> paymentGateway.process(payment),
                () -> {
                    // Fallback: mettre en queue pour traitement ultérieur
                    paymentQueue.enqueue(payment);
                    return PaymentResult.pending("Queued for later processing");
                }
            );
        }
    }
    

    4. Combinaison Retry + Circuit Breaker

    @Service
    public class ResilientApiClient {
    
        @Autowired
        private RetryTemplate retryTemplate;
    
        @Autowired
        private CircuitBreakerRegistry cbRegistry;
    
        public Data fetchData(String endpoint) {
            CircuitBreaker cb = cbRegistry.getOrCreate("api-" + endpoint);
    
            return cb.executeWithFallback(
                () -> retryTemplate.execute(() -> httpClient.get(endpoint)),
                () -> getCachedData(endpoint)
            );
        }
    }
    

    Ordre d’exécution

    1. CircuitBreaker vérifie si le circuit est ouvert
       → Si OPEN: fallback immédiat
       → Si CLOSED/HALF_OPEN: continue
    
    2. RetryTemplate essaie l'opération
       → Retry avec backoff exponentiel
       → Si tous les retries échouent: exception
    
    3. CircuitBreaker compte l'échec
       → Si seuil atteint: passe en OPEN
    
    4. Fallback si échec
    

    5. Annotations (optionnel)

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface Resilient {
        String circuitBreaker() default "";
        int maxRetries() default 3;
        long retryDelay() default 1000;
        Class<? extends Throwable>[] retryOn() default {Exception.class};
    }
    
    @Aspect
    @Component
    public class ResilienceAspect {
    
        @Around("@annotation(resilient)")
        public Object handleResilience(ProceedingJoinPoint pjp, Resilient resilient) throws Throwable {
            String cbName = resilient.circuitBreaker();
            if (cbName.isEmpty()) {
                cbName = pjp.getSignature().getDeclaringTypeName() + "." + pjp.getSignature().getName();
            }
    
            CircuitBreaker cb = cbRegistry.getOrCreate(cbName);
    
            return cb.execute(() -> {
                return retryTemplate.execute(() -> {
                    try {
                        return pjp.proceed();
                    } catch (Throwable t) {
                        throw new RuntimeException(t);
                    }
                });
            });
        }
    }
    

    Utilisation

    @Service
    public class UserService {
    
        @Resilient(circuitBreaker = "user-api", maxRetries = 5)
        public User getUser(String id) {
            return userApiClient.fetchUser(id);
        }
    }
    

    6. Bulkhead Pattern

    Le pattern Bulkhead limite le nombre d’appels concurrents pour éviter l’épuisement des ressources.

    @Component
    public class BulkheadRegistry {
    
        private final ConcurrentHashMap<String, Semaphore> bulkheads = new ConcurrentHashMap<>();
    
        public <T> T execute(String name, int maxConcurrent, Supplier<T> operation)
                throws BulkheadFullException {
            Semaphore semaphore = bulkheads.computeIfAbsent(name,
                k -> new Semaphore(maxConcurrent));
    
            if (!semaphore.tryAcquire()) {
                throw new BulkheadFullException(name);
            }
    
            try {
                return operation.get();
            } finally {
                semaphore.release();
            }
        }
    }
    
    // Utilisation
    @Service
    public class ApiService {
    
        @Autowired
        private BulkheadRegistry bulkheadRegistry;
    
        public Data callExternalApi() {
            return bulkheadRegistry.execute("external-api", 10, () -> {
                // Max 10 appels concurrents
                return httpClient.get("/api/data");
            });
        }
    }
    

    7. Timeout Pattern

    @Component
    public class TimeoutTemplate {
    
        private final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(4);
    
        public <T> T executeWithTimeout(Supplier<T> operation, Duration timeout)
                throws TimeoutException {
    
            CompletableFuture<T> future = CompletableFuture.supplyAsync(operation);
    
            try {
                return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            } catch (java.util.concurrent.TimeoutException e) {
                future.cancel(true);
                throw new TimeoutException("Operation timed out after " + timeout);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
    

    8. API Admin

    @RestController
    @RequestMapping("/admin/resilience")
    public class ResilienceController {
    
        @Autowired
        private CircuitBreakerRegistry cbRegistry;
    
        @GetMapping("/circuits")
        public Map<String, CircuitState> getCircuits() {
            return cbRegistry.getAllStates();
        }
    
        @PostMapping("/circuits/{name}/reset")
        public void resetCircuit(@PathVariable String name) {
            CircuitBreaker cb = cbRegistry.get(name);
            if (cb != null) {
                cb.reset();
            }
        }
    
        @PostMapping("/circuits/{name}/open")
        public void openCircuit(@PathVariable String name) {
            CircuitBreaker cb = cbRegistry.get(name);
            if (cb != null) {
                cb.forceOpen();
            }
        }
    
        @PostMapping("/circuits/{name}/close")
        public void closeCircuit(@PathVariable String name) {
            CircuitBreaker cb = cbRegistry.get(name);
            if (cb != null) {
                cb.forceClose();
            }
        }
    }
    

    9. Métriques

    @Component
    public class ResilienceMetrics {
    
        private final MeterRegistry registry;
    
        public void recordRetry(String operation, int attempt, boolean success) {
            Counter.builder("socle_retry_attempts")
                .tag("operation", operation)
                .tag("attempt", String.valueOf(attempt))
                .tag("success", String.valueOf(success))
                .register(registry)
                .increment();
        }
    
        public void recordCircuitBreakerState(String name, CircuitState state) {
            Gauge.builder("socle_circuit_breaker_state", () ->
                state == CircuitState.CLOSED ? 0 :
                state == CircuitState.HALF_OPEN ? 1 : 2)
                .tag("name", name)
                .register(registry);
        }
    }
    

    10. Bonnes pratiques

    DO

    • Utiliser le circuit breaker pour les appels réseau externes
    • Configurer des timeouts appropriés
    • Toujours prévoir un fallback
    • Monitorer l’état des circuits
    • Logger les transitions d’état

    DON’T

    • Ne pas utiliser le retry pour les erreurs non récupérables (400, 401)
    • Ne pas configurer des seuils trop bas (faux positifs)
    • Ne pas oublier les timeouts (risque de blocage)
    • Ne pas ignorer les métriques

    11. Références

  • Socle V004 – Plan de Documentation

    Socle V004 – Plan de Documentation

    Plan de Documentation – Socle V4

    Version : 4.0.0 Date : 2025-01-25

    Structure de la documentation

    # Document Description Statut
    00 PLAN-DOCUMENTATION Ce document – Index de la documentation Done
    01 INTRODUCTION Présentation du Socle V4 et philosophie Done
    02 ARCHITECTURE Architecture technique et composants Done
    03 QUICKSTART Guide de démarrage rapide (5 min) Done
    04 CONFIGURATION Référence complète de configuration Done
    05 WORKERS Guide des Workers et lifecycle Done
    06 KV-BUS Guide du Key-Value Bus Done
    07 SHARED-DATA Guide du SharedDataRegistry Done
    08 SUPERVISOR Supervision et heartbeats Done
    09 PIPELINE Pipeline Engine V1 et V2 (Queue/Claim/Ack, DLQ) Done
    10 SECURITY Sécurité, Auth, Rate Limiting Done
    11 RESILIENCE Circuit Breaker et Retry Done
    12 SCHEDULER Scheduling cron et interval Done
    13 TLS-HTTPS Configuration TLS/HTTPS Done
    14 ADMIN-API API REST d’administration Done
    15 METRICS Métriques et Prometheus Done
    16 KUBERNETES Déploiement Kubernetes Done
    17 HOWTO Guides pratiques Done
    18 TROUBLESHOOTING Résolution de problèmes Done
    19 EXEMPLES Exemples de code Done
    20 PLUGINS Système de plugins Done
    21 H2-TECHDB Base technique H2 (V4) Done
    22 LOG4J2-LOGFORWARDER Log4j2 et LogForwarder (V4) Done
    23 AUTH-CLIENT Client authentification JWT (V4) Done
    24 WORKER-REGISTRY Client Worker Registry (V4) Done
    25 MIGRATION-V3-V4 Guide de migration V3 → V4 Done
    26 GRAALVM-JAVASCRIPT GraalVM CE et GraalJS pour scripts JS Done
    27 STATUS-DASHBOARD Dashboard HTML de supervision (port 9374) Done
    29 JANINO Scripts Java compiles dynamiquement Done
    30 EVENTBUS-WORKERS Workers event-driven Done
    31 GRPC-INTER-SOCLES Communication gRPC entre Socles Done

    Nouveautés V4

    Les documents 21 à 30 sont spécifiques au Socle V4 :

    • 21-H2-TECHDB : Base embarquée H2 pour état technique
    • 22-LOG4J2-LOGFORWARDER : Migration Logback → Log4j2 + centralisation logs
    • 23-AUTH-CLIENT : Client JWT pour services centraux
    • 24-WORKER-REGISTRY : Auto-enregistrement des workers
    • 25-MIGRATION-V3-V4 : Guide de migration depuis V3
    • 26-GRAALVM-JAVASCRIPT : GraalVM CE 21 et GraalJS pour exécution JavaScript
    • 27-STATUS-DASHBOARD : Dashboard HTML de supervision temps réel sur port 9374
    • 29-JANINO : Compilation dynamique de scripts Java
    • 30-EVENTBUS-WORKERS : Workers orientés événements
    • 31-GRPC-INTER-SOCLES : Communication gRPC bidirectionnelle entre Socles

    Guide Méthodologique

    Un guide méthodologique complet est disponible pour aider les développeurs à implémenter leurs solutions :

    Ce document répond à la question : « Je dois implémenter X, comment je fais ? »

    Conventions

    • Tous les fichiers sont en Markdown
    • Les exemples de code sont en Java 21
    • Les configurations sont en YAML
    • Les commandes sont pour Linux/macOS (adaptables Windows)
  • Socle V004 – Sécurité

    Socle V004 – Sécurité

    10 – Security

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Socle V4 intègre plusieurs mécanismes de sécurité :

    • Authentification Admin API (Basic Auth)
    • Client JWT pour services externes (nouveauté V4)
    • Filtrage des endpoints sensibles
    • Gestion sécurisée des secrets

    2. Authentification Admin API

    2.1 Configuration

    socle:
      admin:
        enabled: true
        auth:
          enabled: ${ADMIN_AUTH_ENABLED:false}
          username: ${ADMIN_USERNAME:admin}
          password: ${ADMIN_PASSWORD:}
    

    2.2 AdminAuthFilter

    package eu.lmvi.socle.security;
    
    @Component
    @Order(1)
    public class AdminAuthFilter implements Filter {
    
        private final SocleConfiguration config;
    
        @Override
        public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
                throws IOException, ServletException {
    
            HttpServletRequest httpRequest = (HttpServletRequest) request;
            HttpServletResponse httpResponse = (HttpServletResponse) response;
    
            // Skip si auth désactivée
            if (!config.getAdmin().getAuth().isEnabled()) {
                chain.doFilter(request, response);
                return;
            }
    
            // Skip les endpoints publics
            String path = httpRequest.getRequestURI();
            if (isPublicEndpoint(path)) {
                chain.doFilter(request, response);
                return;
            }
    
            // Vérifier l'authentification pour /admin/*
            if (path.startsWith("/admin")) {
                String authHeader = httpRequest.getHeader("Authorization");
    
                if (!isValidAuth(authHeader)) {
                    httpResponse.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
                    httpResponse.setHeader("WWW-Authenticate", "Basic realm=\"Admin API\"");
                    return;
                }
            }
    
            chain.doFilter(request, response);
        }
    
        private boolean isPublicEndpoint(String path) {
            return path.equals("/admin/health") || path.equals("/admin/health/live");
        }
    
        private boolean isValidAuth(String authHeader) {
            if (authHeader == null || !authHeader.startsWith("Basic ")) {
                return false;
            }
    
            String base64Credentials = authHeader.substring("Basic ".length());
            String credentials = new String(Base64.getDecoder().decode(base64Credentials));
            String[] parts = credentials.split(":", 2);
    
            if (parts.length != 2) {
                return false;
            }
    
            return parts[0].equals(config.getAdmin().getAuth().getUsername())
                && parts[1].equals(config.getAdmin().getAuth().getPassword());
        }
    }
    

    2.3 Utilisation

    # Sans auth
    curl http://localhost:8080/admin/health
    
    # Avec auth
    curl -u admin:secret http://localhost:8080/admin/workers
    

    3. JWT Auth Client (Nouveauté V4)

    3.1 Principe

    Le SocleAuthClient permet aux applications Socle de s’authentifier auprès de services centraux (LogHub, Registry, etc.).

    Application Socle  ──────►  Auth Server  ──────►  Services sécurisés
          │                          │                      │
          │  1. Login (API Key)      │                      │
          │─────────────────────────►│                      │
          │                          │                      │
          │  2. JWT tokens           │                      │
          │◄─────────────────────────│                      │
          │                          │                      │
          │  3. Requêtes avec Bearer │                      │
          │──────────────────────────────────────────────────►
    

    3.2 Configuration

    socle:
      auth:
        enabled: ${AUTH_ENABLED:false}
        server-url: ${AUTH_SERVER_URL:https://auth.mycompany.com}
        source-name: ${SOURCE_NAME:${socle.app_name}}
        api-key: ${API_KEY:}
        access-token-buffer-seconds: 60
    

    3.3 Utilisation

    @Service
    public class SecuredApiClient {
    
        @Autowired(required = false)
        private SocleAuthClient authClient;
    
        public void callSecuredApi() {
            if (authClient == null || !authClient.isAuthenticated()) {
                throw new SecurityException("Authentication required");
            }
    
            String token = authClient.getValidAccessToken();
    
            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://api.mycompany.com/data"))
                .header("Authorization", "Bearer " + token)
                .build();
    
            // ...
        }
    }
    

    Voir 23-AUTH-CLIENT pour la documentation complète.

    4. Gestion des secrets

    4.1 Variables d’environnement

    # JAMAIS dans le code ou les fichiers committes
    export ADMIN_PASSWORD="super-secret-password"
    export API_KEY="my-api-key"
    export REDIS_PASSWORD="redis-password"
    export TECHDB_PASSWORD="techdb-password"
    

    4.2 Docker Secrets

    # docker-compose.yml
    services:
      app:
        image: socle-v4:latest
        secrets:
          - admin_password
          - api_key
        environment:
          - ADMIN_PASSWORD_FILE=/run/secrets/admin_password
          - API_KEY_FILE=/run/secrets/api_key
    
    secrets:
      admin_password:
        file: ./secrets/admin_password.txt
      api_key:
        file: ./secrets/api_key.txt
    

    4.3 Kubernetes Secrets

    apiVersion: v1
    kind: Secret
    metadata:
      name: socle-secrets
    type: Opaque
    stringData:
      ADMIN_PASSWORD: "super-secret"
      API_KEY: "my-api-key"
    ---
    apiVersion: apps/v1
    kind: Deployment
    spec:
      template:
        spec:
          containers:
            - name: app
              envFrom:
                - secretRef:
                    name: socle-secrets
    

    4.4 Configuration sécurisée

    @Configuration
    public class SecretsConfiguration {
    
        @Value("${ADMIN_PASSWORD:#{null}}")
        private String adminPassword;
    
        @Value("${ADMIN_PASSWORD_FILE:#{null}}")
        private String adminPasswordFile;
    
        @PostConstruct
        public void loadSecrets() {
            // Charger depuis fichier si spécifié
            if (adminPasswordFile != null) {
                try {
                    adminPassword = Files.readString(Path.of(adminPasswordFile)).trim();
                } catch (IOException e) {
                    throw new RuntimeException("Failed to load secret from file", e);
                }
            }
        }
    
        public String getAdminPassword() {
            return adminPassword;
        }
    }
    

    5. CORS Configuration

    @Configuration
    public class CorsConfiguration implements WebMvcConfigurer {
    
        @Value("${socle.security.cors.allowed-origins:*}")
        private String allowedOrigins;
    
        @Override
        public void addCorsMappings(CorsRegistry registry) {
            registry.addMapping("/api/**")
                .allowedOrigins(allowedOrigins.split(","))
                .allowedMethods("GET", "POST", "PUT", "DELETE")
                .allowedHeaders("*")
                .exposedHeaders("X-Total-Count")
                .allowCredentials(true)
                .maxAge(3600);
        }
    }
    

    6. Rate Limiting

    6.1 Implémentation simple

    @Component
    public class RateLimitFilter implements Filter {
    
        private final KvBus kvBus;
        private final int maxRequests = 100;
        private final Duration window = Duration.ofMinutes(1);
    
        @Override
        public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
                throws IOException, ServletException {
    
            HttpServletRequest httpRequest = (HttpServletRequest) request;
            HttpServletResponse httpResponse = (HttpServletResponse) response;
    
            String clientId = getClientId(httpRequest);
            String key = "ratelimit:" + clientId + ":" + Instant.now().truncatedTo(ChronoUnit.MINUTES);
    
            long count = kvBus.increment(key);
            if (count == 1) {
                kvBus.setTtl(key, window);
            }
    
            if (count > maxRequests) {
                httpResponse.setStatus(429);
                httpResponse.setHeader("Retry-After", "60");
                httpResponse.getWriter().write("Rate limit exceeded");
                return;
            }
    
            httpResponse.setHeader("X-RateLimit-Limit", String.valueOf(maxRequests));
            httpResponse.setHeader("X-RateLimit-Remaining", String.valueOf(maxRequests - count));
    
            chain.doFilter(request, response);
        }
    
        private String getClientId(HttpServletRequest request) {
            String apiKey = request.getHeader("X-API-Key");
            if (apiKey != null) {
                return apiKey;
            }
            return request.getRemoteAddr();
        }
    }
    

    7. Input Validation

    7.1 Validation des DTOs

    public record CreateOrderRequest(
        @NotNull @Size(min = 1, max = 100)
        String customerId,
    
        @NotEmpty
        List<@Valid OrderItem> items,
    
        @Email
        String notificationEmail
    ) {}
    
    public record OrderItem(
        @NotNull @Size(min = 1, max = 50)
        String productId,
    
        @Min(1) @Max(1000)
        int quantity
    ) {}
    

    7.2 Controller avec validation

    @RestController
    @RequestMapping("/api/orders")
    public class OrderController {
    
        @PostMapping
        public ResponseEntity<Order> createOrder(@Valid @RequestBody CreateOrderRequest request) {
            // request est validé automatiquement
            return ResponseEntity.ok(orderService.create(request));
        }
    }
    

    7.3 Sanitization

    public class InputSanitizer {
    
        private static final Pattern SAFE_STRING = Pattern.compile("^[a-zA-Z0-9-_]+$");
    
        public static String sanitizeId(String input) {
            if (input == null) return null;
            if (!SAFE_STRING.matcher(input).matches()) {
                throw new IllegalArgumentException("Invalid ID format");
            }
            return input;
        }
    
        public static String sanitizeForLog(String input) {
            if (input == null) return null;
            return input.replaceAll("[\n\r\t]", "_");
        }
    }
    

    8. Logging sécurisé

    8.1 Ne pas logger les secrets

    // MAUVAIS
    log.info("Connecting with password: {}", password);
    log.info("API Key: {}", apiKey);
    log.info("Request: {}", requestWithSensitiveData);
    
    // BON
    log.info("Connecting to database");
    log.info("Using API Key: {}...", apiKey.substring(0, 4));
    log.info("Request received for user: {}", sanitizeForLog(userId));
    

    8.2 Pattern pour masquer les données sensibles

    public class SecureLogger {
    
        private static final Set<String> SENSITIVE_FIELDS = Set.of(
            "password", "apiKey", "token", "secret", "credential"
        );
    
        public static String maskSensitiveData(Map<String, Object> data) {
            Map<String, Object> masked = new HashMap<>();
            for (Map.Entry<String, Object> entry : data.entrySet()) {
                if (SENSITIVE_FIELDS.contains(entry.getKey().toLowerCase())) {
                    masked.put(entry.getKey(), "***MASKED***");
                } else {
                    masked.put(entry.getKey(), entry.getValue());
                }
            }
            return masked.toString();
        }
    }
    

    9. Headers de sécurité

    @Component
    public class SecurityHeadersFilter implements Filter {
    
        @Override
        public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
                throws IOException, ServletException {
    
            HttpServletResponse httpResponse = (HttpServletResponse) response;
    
            // Prevent clickjacking
            httpResponse.setHeader("X-Frame-Options", "DENY");
    
            // XSS protection
            httpResponse.setHeader("X-Content-Type-Options", "nosniff");
            httpResponse.setHeader("X-XSS-Protection", "1; mode=block");
    
            // HSTS (si HTTPS)
            httpResponse.setHeader("Strict-Transport-Security", "max-age=31536000; includeSubDomains");
    
            // Content Security Policy
            httpResponse.setHeader("Content-Security-Policy", "default-src 'self'");
    
            chain.doFilter(request, response);
        }
    }
    

    10. Audit logging

    @Aspect
    @Component
    public class AuditAspect {
    
        private static final Logger auditLog = LoggerFactory.getLogger("AUDIT");
    
        @Around("@annotation(Audited)")
        public Object audit(ProceedingJoinPoint joinPoint) throws Throwable {
            String method = joinPoint.getSignature().getName();
            String user = getCurrentUser();
            Instant start = Instant.now();
    
            try {
                Object result = joinPoint.proceed();
                auditLog.info("SUCCESS | user={} | method={} | duration={}ms",
                    user, method, Duration.between(start, Instant.now()).toMillis());
                return result;
            } catch (Exception e) {
                auditLog.warn("FAILURE | user={} | method={} | error={}",
                    user, method, e.getMessage());
                throw e;
            }
        }
    
        private String getCurrentUser() {
            // Récupérer l'utilisateur du contexte
            return "system";
        }
    }
    
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Audited {}
    

    11. Checklist de sécurité

    Configuration

    • [ ] ADMIN_AUTH_ENABLED=true en production
    • [ ] Mots de passe forts et uniques
    • [ ] Secrets via variables d’environnement ou secret manager
    • [ ] HTTPS activé
    • [ ] CORS configuré correctement

    Code

    • [ ] Validation de tous les inputs
    • [ ] Pas de secrets dans les logs
    • [ ] Headers de sécurité activés
    • [ ] Rate limiting en place
    • [ ] Audit logging activé

    Infrastructure

    • [ ] Firewall configuré
    • [ ] Ports non nécessaires fermés
    • [ ] H2 Console désactivée en production
    • [ ] Accès admin restreint

    12. Références

  • Socle V004 – Dépannage

    Socle V004 – Dépannage

    18 – Troubleshooting

    Version : 4.0.0 Date : 2025-12-09

    1. Problèmes de démarrage

    1.1 Application ne démarre pas

    Symptôme : L’application ne démarre pas ou crashe immédiatement.

    Causes possibles :

    1. Port déjà utilisé

      Error: Address already in use: bind
      

      Solution :

      # Trouver le processus
      lsof -i :8080
      # Changer le port
      export HTTP_PORT=8081
      
    2. Configuration manquante

      Failed to bind properties under 'socle.xxx'
      

      Solution : Vérifier les variables d’environnement et application.yml

    3. Erreur de logging

      ERROR StatusLogger Log4j2 could not find a logging implementation
      

      Solution : Vérifier que log4j2.xml existe dans src/main/resources/

    1.2 Workers ne démarrent pas

    Symptôme : Les workers sont enregistrés mais restent en état REGISTERED.

    Solutions :

    1. Vérifier les logs d’initialisation
    2. Vérifier les dépendances (base de données, Redis, etc.)
    3. Vérifier les priorités de démarrage
    curl http://localhost:8080/admin/workers
    

    2. Problèmes de connectivité

    2.1 Redis non accessible

    Symptôme :

    Cannot get Jedis connection
    

    Solutions :

    1. Vérifier l’hôte et le port Redis
      redis-cli -h localhost -p 6379 ping
      
    2. Vérifier le mot de passe
    3. Vérifier les firewalls

    2.2 Database non accessible

    Symptôme :

    Database may be already in use: "locked by another process"
    

    Solutions :

    1. Arrêter les autres instances
    2. Utiliser AUTO_SERVER=TRUE dans l’URL H2
    3. Supprimer les fichiers de lock
    rm ./data/socle-techdb.lock.db
    

    3. Problèmes de logging (V4)

    3.1 Logs non visibles

    Symptôme : Aucun log n’apparaît.

    Solutions :

    1. Vérifier que log4j2.xml existe
    2. Vérifier le niveau de log
    3. Vérifier logging.config dans application.yml
    logging:
      config: classpath:log4j2.xml
    

    3.2 Conflit Logback/Log4j2

    Symptôme :

    SLF4J: Class path contains multiple SLF4J bindings
    

    Solution : Exclure Logback de toutes les dépendances

    mvn dependency:tree | grep logback
    
    <exclusion>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </exclusion>
    

    3.3 AsyncLoggers non actifs

    Symptôme : Performance de logging dégradée.

    Solution : Vérifier log4j2.component.properties

    Log4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector
    

    3.4 LogForwarder queue pleine

    Symptôme :

    WARN - Log queue full, storing to fallback
    

    Solutions :

    1. Augmenter queue-capacity
    2. Vérifier la connectivité réseau vers le LogHub
    3. Réduire le volume de logs
    # Vérifier les logs en fallback
    curl http://localhost:8080/admin/logforwarder/status
    

    4. Problèmes de performance

    4.1 Haute consommation CPU

    Solutions :

    1. Vérifier les workers en boucle infinie
    2. Vérifier les logs en DEBUG
    3. Profiler avec JFR
    jcmd <pid> JFR.start duration=60s filename=recording.jfr
    

    4.2 Haute consommation mémoire

    Solutions :

    1. Analyser le heap dump
      jmap -dump:format=b,file=heap.hprof <pid>
      
    2. Vérifier les fuites dans KvBus/SharedDataRegistry
    3. Ajuster la configuration JVM

    4.3 Latence élevée

    Solutions :

    1. Vérifier les circuit breakers
      curl http://localhost:8080/admin/resilience/circuits
      
    2. Vérifier les connexions réseau
    3. Vérifier les métriques
      curl http://localhost:8080/actuator/prometheus | grep latency
      

    5. Problèmes de résilience

    5.1 Circuit Breaker bloqué en OPEN

    Symptôme : Un circuit reste ouvert malgré la récupération du service.

    Solutions :

    1. Reset manuel
      curl -X POST http://localhost:8080/admin/resilience/circuits/my-circuit/reset
      
    2. Vérifier le timeout configuré
    3. Vérifier que le service cible répond

    5.2 Retry infini

    Symptôme : L’application retry sans fin.

    Solutions :

    1. Vérifier max-attempts configuré
    2. Vérifier les exceptions retryables
    3. Ajouter un circuit breaker

    6. Problèmes H2 TechDB (V4)

    6.1 Base corrompue

    Symptôme :

    File corrupted
    

    Solution :

    rm -rf ./data/socle-techdb.*
    # Redémarrer l'application
    

    6.2 H2 Console inaccessible

    Solutions :

    1. Vérifier socle.techdb.console.enabled=true
    2. Vérifier l’URL : http://localhost:8080/h2-console
    3. Utiliser l’URL JDBC correcte : jdbc:h2:file:./data/socle-techdb

    6.3 Offsets perdus

    Solutions :

    1. Vérifier que TechDB est enabled
    2. Vérifier les logs de saveOffset
    3. Requêter directement H2
      SELECT * FROM socle_offsets;
      

    7. Problèmes d’authentification (V4)

    7.1 Login échoue

    Symptôme :

    AuthenticationException: Login failed: 401
    

    Solutions :

    1. Vérifier API_KEY
    2. Vérifier SOURCE_NAME
    3. Vérifier AUTH_SERVER_URL

    7.2 Token expiré

    Symptôme :

    Token expired
    

    Solutions :

    1. Vérifier l’horloge système (NTP)
    2. Augmenter access-token-buffer-seconds

    7.3 Admin API 401

    Solutions :

    1. Vérifier si l’auth est activée
      socle.admin.auth.enabled: true
      
    2. Utiliser les credentials corrects
      curl -u admin:password http://localhost:8080/admin/workers
      

    8. Problèmes Kubernetes

    8.1 Pod en CrashLoopBackOff

    Solutions :

    1. Vérifier les logs
      kubectl logs <pod-name> --previous
      
    2. Vérifier les ressources
    3. Vérifier les probes

    8.2 Probes échouent

    Solutions :

    1. Augmenter initialDelaySeconds
    2. Vérifier que l’endpoint /admin/health répond
    3. Vérifier le port

    8.3 OOMKilled

    Solutions :

    1. Augmenter les limites mémoire
    2. Ajuster les options JVM
      -XX:MaxRAMPercentage=75.0
      

    9. Commandes de diagnostic

    9.1 API Admin

    # Santé globale
    curl http://localhost:8080/admin/health
    
    # État des workers
    curl http://localhost:8080/admin/workers
    
    # Registry
    curl http://localhost:8080/admin/registry
    
    # Circuits breakers
    curl http://localhost:8080/admin/resilience/circuits
    
    # Configuration
    curl http://localhost:8080/admin/config
    
    # Métriques
    curl http://localhost:8080/actuator/prometheus
    

    9.2 JVM

    # Thread dump
    jstack <pid>
    
    # Heap info
    jmap -heap <pid>
    
    # GC stats
    jstat -gcutil <pid> 1000
    
    # JFR recording
    jcmd <pid> JFR.start duration=60s filename=recording.jfr
    

    9.3 Réseau

    # Test Redis
    redis-cli -h localhost ping
    
    # Test HTTP
    curl -v http://localhost:8080/admin/health
    
    # DNS
    nslookup myservice.namespace.svc.cluster.local
    

    10. Logs utiles à activer

    # application.yml ou variables d'environnement
    
    logging:
      level:
        eu.lmvi.socle: DEBUG
        eu.lmvi.socle.mop: DEBUG
        eu.lmvi.socle.supervisor: DEBUG
        eu.lmvi.socle.techdb: DEBUG
        eu.lmvi.socle.resilience: DEBUG
        org.springframework.web: DEBUG
        io.lettuce: DEBUG  # Redis
    

    11. Checklist de diagnostic

    □ L'application démarre-t-elle ?
      □ Logs de démarrage présents ?
      □ Port disponible ?
      □ Configuration valide ?
    
    □ Les workers sont-ils healthy ?
      □ GET /admin/workers
      □ Heartbeats reçus ?
      □ Erreurs dans les logs ?
    
    □ Les connexions externes fonctionnent-elles ?
      □ Redis accessible ?
      □ Base de données accessible ?
      □ APIs externes accessibles ?
    
    □ Les métriques sont-elles normales ?
      □ CPU < 80% ?
      □ Mémoire < 80% ?
      □ Latence acceptable ?
      □ Taux d'erreur bas ?
    
    □ Les logs sont-ils corrects ?
      □ Log4j2 configuré ?
      □ LogForwarder fonctionne ?
      □ Pas de logs en fallback ?
    

    12. Références