Auteur/autrice : jmh

  • Socle V004 – Kubernetes

    Socle V004 – Kubernetes

    16 – Kubernetes

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Guide de déploiement du Socle V4 sur Kubernetes.

    2. Image Docker

    2.1 Dockerfile

    FROM eclipse-temurin:21-jre-alpine
    
    LABEL maintainer="your-team@company.com"
    LABEL version="4.0.0"
    
    WORKDIR /app
    
    # Non-root user with a fixed UID/GID (1000), matching the Kubernetes
    # securityContext (runAsUser / runAsGroup / fsGroup: 1000).
    # data/ and logs/ are created up front so the unprivileged user can write
    # to them even without mounted volumes (plain "docker run").
    RUN addgroup -S -g 1000 socle \
     && adduser -S -u 1000 -G socle socle \
     && mkdir -p /app/data /app/logs \
     && chown -R socle:socle /app
    
    # Copy application
    COPY --chown=socle:socle target/socle-v004-4.0.0.jar app.jar
    
    # Numeric form so that Kubernetes runAsNonRoot can verify the user
    # without resolving a user name.
    USER 1000:1000
    
    # Health check (used by plain Docker; Kubernetes ignores it and uses probes)
    HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
      CMD wget -qO- http://localhost:8080/admin/health/live || exit 1
    
    # Default environment
    ENV JAVA_OPTS="-XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0"
    
    EXPOSE 8080
    
    # "exec" replaces the shell with the JVM (PID 1), so SIGTERM reaches Java
    # directly and the application can shut down gracefully instead of being
    # SIGKILLed at the end of the termination grace period.
    ENTRYPOINT ["sh", "-c", "exec java $JAVA_OPTS -jar app.jar"]
    

    2.2 Build et Push

    # Build (the tag must match the image referenced by the Deployment / Helm values)
    docker build -t gcr.io/my-project/socle-v4:4.0.0 .
    
    # Push to the registry the cluster pulls from
    docker push gcr.io/my-project/socle-v4:4.0.0
    

    3. Manifests Kubernetes

    3.1 Namespace

    # Namespace holding every socle-v4 resource.
    apiVersion: v1
    kind: Namespace
    metadata:
      name: socle
      # Explicit "name" label, usable by namespace selectors
      labels:
        name: socle
    

    3.2 ConfigMap

    # Non-sensitive settings, injected into the pods as environment variables
    # (envFrom configMapRef in the Deployment). ConfigMap data only holds
    # strings, hence every value is quoted.
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: socle-config
      namespace: socle
    data:
      # Application identity
      APP_NAME: "socle-v4"
      ENV_NAME: "PROD"
      REGION: "europe-west1"
      # HTTP server
      HTTP_PORT: "8080"
      # KvBus backend
      KVBUS_MODE: "redis"
      REDIS_HOST: "redis-master.redis.svc.cluster.local"
      # Technical database, log forwarding, scheduler
      TECHDB_ENABLED: "true"
      LOG_FORWARDER_ENABLED: "true"
      LOG_TRANSPORT_MODE: "http"
      SCHEDULER_ENABLED: "true"
      # Admin API
      ADMIN_ENABLED: "true"
      ADMIN_AUTH_ENABLED: "true"
    

    3.3 Secret

    # EXAMPLE ONLY - never commit real credentials to version control.
    # Create this Secret out-of-band (kubectl create secret generic ..., CI
    # pipeline) or through a secret-manager integration instead.
    # Its keys are injected as environment variables (envFrom secretRef).
    apiVersion: v1
    kind: Secret
    metadata:
      name: socle-secrets
      namespace: socle
    type: Opaque
    # stringData takes plain-text values; the API server stores them
    # base64-encoded under "data".
    stringData:
      REDIS_PASSWORD: "your-redis-password"
      ADMIN_PASSWORD: "your-admin-password"
      API_KEY: "your-api-key"
      TECHDB_PASSWORD: "your-techdb-password"
    

    3.4 Deployment

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: socle-v4
      namespace: socle
      labels:
        app: socle-v4
        version: "4.0.0"
    spec:
      # Initial scale only. Once the HPA (3.7) manages this Deployment, drop
      # this field from re-applied manifests: otherwise every "kubectl apply"
      # resets the replica count chosen by the HPA.
      replicas: 2
      selector:
        matchLabels:
          app: socle-v4
      strategy:
        type: RollingUpdate
        rollingUpdate:
          maxSurge: 1
          maxUnavailable: 0
      template:
        metadata:
          labels:
            app: socle-v4
            version: "4.0.0"
          annotations:
            prometheus.io/scrape: "true"
            prometheus.io/path: "/actuator/prometheus"
            prometheus.io/port: "8080"
        spec:
          serviceAccountName: socle-sa
          # Must cover the preStop pause plus the application shutdown (the
          # scheduler alone waits up to 30 s for running jobs); with the 30 s
          # default the JVM would be SIGKILLed mid-shutdown.
          terminationGracePeriodSeconds: 60
          securityContext:
            runAsNonRoot: true
            runAsUser: 1000
            runAsGroup: 1000
            fsGroup: 1000
            seccompProfile:
              type: RuntimeDefault
          containers:
            - name: socle
              image: gcr.io/my-project/socle-v4:4.0.0
              imagePullPolicy: Always
              securityContext:
                allowPrivilegeEscalation: false
                capabilities:
                  drop:
                    - ALL
              ports:
                - name: http
                  containerPort: 8080
                  protocol: TCP
              envFrom:
                - configMapRef:
                    name: socle-config
                - secretRef:
                    name: socle-secrets
              env:
                - name: POD_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.name
                - name: POD_NAMESPACE
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.namespace
                # $(POD_NAME) is expanded because POD_NAME is declared earlier in this list
                - name: EXEC_ID
                  value: "$(POD_NAME)"
              resources:
                requests:
                  cpu: "250m"
                  memory: "512Mi"
                limits:
                  cpu: "1000m"
                  memory: "1Gi"
              # Liveness and readiness checks only start once this probe has
              # succeeded: the JVM gets up to 30 x 5 s = 150 s to boot instead
              # of a fixed initial delay (slow starts are not killed, fast
              # starts are not held back).
              startupProbe:
                httpGet:
                  path: /admin/health/live
                  port: 8080
                periodSeconds: 5
                timeoutSeconds: 3
                failureThreshold: 30
              livenessProbe:
                httpGet:
                  path: /admin/health/live
                  port: 8080
                periodSeconds: 10
                timeoutSeconds: 5
                failureThreshold: 3
              readinessProbe:
                httpGet:
                  path: /admin/health/ready
                  port: 8080
                periodSeconds: 5
                timeoutSeconds: 3
                failureThreshold: 3
              lifecycle:
                # Keep serving for a moment after termination starts, while the
                # pod is being removed from Service endpoints / ingress upstreams.
                preStop:
                  exec:
                    command: ["sh", "-c", "sleep 10"]
              volumeMounts:
                - name: data
                  mountPath: /app/data
                - name: logs
                  mountPath: /app/logs
          volumes:
            - name: data
              emptyDir: {}
            - name: logs
              emptyDir: {}
          affinity:
            podAntiAffinity:
              preferredDuringSchedulingIgnoredDuringExecution:
                - weight: 100
                  podAffinityTerm:
                    labelSelector:
                      matchLabels:
                        app: socle-v4
                    topologyKey: kubernetes.io/hostname
    

    3.5 Service

    apiVersion: v1
    kind: Service
    metadata:
      name: socle-v4
      namespace: socle
      labels:
        app: socle-v4  # matched by the ServiceMonitor selector
    spec:
      type: ClusterIP
      # Sends traffic to the pods of the socle-v4 Deployment
      selector:
        app: socle-v4
      ports:
        # Cluster port 80 -> container port 8080. The port name "http" is the
        # one referenced by the Ingress backend and the ServiceMonitor.
        - name: http
          protocol: TCP
          port: 80
          targetPort: 8080
    

    3.6 Ingress

    apiVersion: networking.k8s.io/v1
    kind: Ingress
    metadata:
      name: socle-v4
      namespace: socle
      annotations:
        nginx.ingress.kubernetes.io/ssl-redirect: "true"
        # cert-manager issues the certificate stored in the socle-tls Secret
        cert-manager.io/cluster-issuer: letsencrypt-prod
    spec:
      # Replaces the deprecated kubernetes.io/ingress.class annotation
      # (same value as ingress.className in the Helm values).
      ingressClassName: nginx
      tls:
        - hosts:
            - socle.example.com
          secretName: socle-tls
      rules:
        - host: socle.example.com
          http:
            paths:
              - path: /
                pathType: Prefix
                backend:
                  service:
                    name: socle-v4
                    port:
                      number: 80
    

    3.7 HorizontalPodAutoscaler

    apiVersion: autoscaling/v2
    kind: HorizontalPodAutoscaler
    metadata:
      name: socle-v4
      namespace: socle
    spec:
      scaleTargetRef:
        apiVersion: apps/v1
        kind: Deployment
        name: socle-v4
      minReplicas: 2
      maxReplicas: 10
      # CPU only, like autoscaling.targetCPUUtilizationPercentage in the Helm
      # values. No memory target on purpose: a JVM heap may grow up to
      # MaxRAMPercentage=75% of the 1Gi limit (~768Mi) and is typically not
      # given back, which is far above 80% of the 512Mi request - memory-based
      # scaling would scale up and never scale back down.
      metrics:
        - type: Resource
          resource:
            name: cpu
            target:
              type: Utilization
              averageUtilization: 70
    

    3.8 PodDisruptionBudget

    apiVersion: policy/v1
    kind: PodDisruptionBudget
    metadata:
      name: socle-v4
      namespace: socle
    spec:
      # At most one pod voluntarily disrupted (node drain, upgrade) at a time,
      # whatever the current scale. minAvailable: 1 would let a drain evict all
      # pods but one at once when the HPA has scaled up (up to 10 replicas).
      maxUnavailable: 1
      selector:
        matchLabels:
          app: socle-v4
    

    3.9 ServiceAccount

    # Dedicated identity for the socle pods (serviceAccountName: socle-sa).
    # NOTE(review): if the application never calls the Kubernetes API, consider
    # adding "automountServiceAccountToken: false" to avoid mounting an unused token.
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: socle-sa
      namespace: socle
    

    4. Persistance avec PVC

    4.1 PersistentVolumeClaim

    # ReadWriteOnce: the volume can only be attached to one node at a time.
    # Use it with a single-replica Deployment; for several replicas, use a
    # StatefulSet with volumeClaimTemplates (one volume per pod).
    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: socle-data
      namespace: socle
    spec:
      accessModes:
        - ReadWriteOnce
      storageClassName: standard
      resources:
        requests:
          storage: 10Gi
    

    4.2 Deployment avec PVC

    # In the Deployment.
    # A ReadWriteOnce volume attaches to a single node, and the H2 file
    # database (TechDB under /app/data) can only be opened by one process.
    # So: exactly one replica, the "Recreate" strategy (the old pod releases the
    # volume before the new one starts - remove the rollingUpdate settings),
    # and no HPA. For several instances, use a StatefulSet with
    # volumeClaimTemplates instead.
    spec:
      replicas: 1
      strategy:
        type: Recreate
      template:
        spec:
          containers:
            - name: socle
              volumeMounts:
                - name: data
                  mountPath: /app/data
          volumes:
            - name: data
              persistentVolumeClaim:
                claimName: socle-data
    

    5. Network Policies

    # Namespaces are selected through kubernetes.io/metadata.name, a label the
    # API server sets automatically on every namespace (Kubernetes >= 1.21);
    # a custom "name" label is usually absent on ingress-nginx/monitoring/redis.
    apiVersion: networking.k8s.io/v1
    kind: NetworkPolicy
    metadata:
      name: socle-network-policy
      namespace: socle
    spec:
      podSelector:
        matchLabels:
          app: socle-v4
      policyTypes:
        - Ingress
        - Egress
      ingress:
        # Allow from ingress controller
        - from:
            - namespaceSelector:
                matchLabels:
                  kubernetes.io/metadata.name: ingress-nginx
          ports:
            - port: 8080
              protocol: TCP
        # Allow from Prometheus
        - from:
            - namespaceSelector:
                matchLabels:
                  kubernetes.io/metadata.name: monitoring
          ports:
            - port: 8080
              protocol: TCP
      egress:
        # Allow to Redis
        - to:
            - namespaceSelector:
                matchLabels:
                  kubernetes.io/metadata.name: redis
          ports:
            - port: 6379
              protocol: TCP
        # Allow to DNS (TCP is required for truncated / large responses)
        - to:
            - namespaceSelector: {}
              podSelector:
                matchLabels:
                  k8s-app: kube-dns
          ports:
            - port: 53
              protocol: UDP
            - port: 53
              protocol: TCP
        # NOTE: every other egress is denied. Add rules for the remaining
        # outbound dependencies in use (log hub when LOG_FORWARDER_ENABLED=true,
        # auth server, worker registry, ...).
    

    6. Helm Chart

    6.1 Chart.yaml

    # Helm chart metadata (chart API v2, i.e. Helm 3)
    apiVersion: v2
    name: socle-v4
    description: Socle V4 Framework
    # Default chart type, stated explicitly
    type: application
    # Chart version (SemVer) / version of the packaged application
    version: 4.0.0
    appVersion: "4.0.0"
    

    6.2 values.yaml

    # Replica count; in the usual chart layout it is ignored while
    # autoscaling.enabled is true (the HPA owns the scale) - check the templates.
    replicaCount: 2
    
    image:
      repository: gcr.io/my-project/socle-v4
      # Quoted so the tag always stays a string
      tag: "4.0.0"
      pullPolicy: Always
    
    service:
      type: ClusterIP
      port: 80
    
    ingress:
      enabled: true
      # IngressClass name (spec.ingressClassName)
      className: nginx
      hosts:
        - host: socle.example.com
          paths:
            - path: /
              pathType: Prefix
      tls:
        - secretName: socle-tls
          hosts:
            - socle.example.com
    
    resources:
      requests:
        cpu: 250m
        memory: 512Mi
      limits:
        cpu: 1000m
        memory: 1Gi
    
    # CPU-based autoscaling only
    autoscaling:
      enabled: true
      minReplicas: 2
      maxReplicas: 10
      targetCPUUtilizationPercentage: 70
    
    # Non-sensitive settings (ConfigMap)
    config:
      APP_NAME: socle-v4
      ENV_NAME: PROD
      KVBUS_MODE: redis
    
    # Sensitive settings: leave empty here and supply them at install time
    # (values-prod.yaml kept out of version control, --set, or a secret manager).
    secrets:
      REDIS_PASSWORD: ""
      ADMIN_PASSWORD: ""
      API_KEY: ""
    

    6.3 Installation

    # Install (creates the namespace if it does not exist)
    helm install socle-v4 ./socle-v4-chart -n socle --create-namespace -f values-prod.yaml
    
    # Upgrade ("helm upgrade --install" can replace both commands in CI)
    helm upgrade socle-v4 ./socle-v4-chart -n socle -f values-prod.yaml
    
    # Uninstall
    helm uninstall socle-v4 -n socle
    

    7. Observabilité

    7.1 ServiceMonitor (Prometheus Operator)

    apiVersion: monitoring.coreos.com/v1
    kind: ServiceMonitor
    metadata:
      name: socle-v4
      namespace: socle
      labels:
        # Presumably matched by the Prometheus serviceMonitorSelector - confirm
        # against the Prometheus installation.
        release: prometheus
    spec:
      # Targets the socle-v4 Service (label app: socle-v4)
      selector:
        matchLabels:
          app: socle-v4
      endpoints:
        # Scrapes the Service port named "http" every 15 seconds
        - interval: 15s
          path: /actuator/prometheus
          port: http
    

    7.2 PrometheusRule

    apiVersion: monitoring.coreos.com/v1
    kind: PrometheusRule
    metadata:
      name: socle-v4-alerts
      namespace: socle
      labels:
        # Same selector label as the ServiceMonitor: the Prometheus Operator
        # only loads rules whose labels match the Prometheus ruleSelector.
        release: prometheus
    spec:
      groups:
        - name: socle-v4
          rules:
            # Fires when a series of socle_errors_total grows faster than
            # 0.1 error/s (6 errors/min) for 5 consecutive minutes.
            - alert: SocleHighErrorRate
              expr: rate(socle_errors_total[5m]) > 0.1
              for: 5m
              labels:
                severity: warning
              annotations:
                summary: High error rate
                description: "Error rate is {{ $value | humanize }}/s on {{ $labels.pod }} (threshold: 0.1/s)."
    

    8. Déploiement Multi-région

    8.1 Structure

    clusters/
    ├── europe-west1/
    │   ├── kustomization.yaml
    │   └── config-patch.yaml
    ├── us-central1/
    │   ├── kustomization.yaml
    │   └── config-patch.yaml
    └── base/
        ├── kustomization.yaml
        ├── deployment.yaml
        ├── service.yaml
        └── configmap.yaml
    

    8.2 Kustomize overlay

    # clusters/europe-west1/kustomization.yaml
    apiVersion: kustomize.config.k8s.io/v1beta1
    kind: Kustomization
    # base/ is a sibling of europe-west1/ inside clusters/ (see 8.1), hence
    # "../base". "resources" replaces the deprecated "bases" field.
    resources:
      - ../base
    # "patches" replaces the deprecated "patchesStrategicMerge" field.
    patches:
      - path: config-patch.yaml
    # Adds the region-specific key to the socle-config ConfigMap of the base
    configMapGenerator:
      - name: socle-config
        behavior: merge
        literals:
          - REGION=europe-west1
    

    9. Troubleshooting

    Commandes utiles

    # Logs (follows a single pod of the Deployment; for all pods use
    # "kubectl logs -f -l app=socle-v4 --prefix -n socle")
    kubectl logs -f deployment/socle-v4 -n socle
    
    # Describe pod (every pod carrying the app label)
    kubectl describe pod -l app=socle-v4 -n socle
    
    # Port forward (local 8080 -> Service port 80)
    kubectl port-forward svc/socle-v4 8080:80 -n socle
    
    # Exec into pod (alpine image: sh, not bash)
    kubectl exec -it deployment/socle-v4 -n socle -- sh
    
    # Check events (oldest first)
    kubectl get events -n socle --sort-by='.lastTimestamp'
    

    10. Références

  • Socle V004 – Scheduler

    Socle V004 – Scheduler

    12 – Scheduler

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Scheduler permet d’exécuter des Workers selon des expressions cron ou à intervalles réguliers.

    Caractéristiques

    • Support des expressions cron standard
    • Exécution à intervalle fixe
    • Gestion du chevauchement
    • Intégration avec le MOP

    2. Configuration

    2.1 application.yml

    # Each value is resolved from the environment variable, with the default after ":"
    socle:
      scheduler:
        enabled: ${SCHEDULER_ENABLED:true}
        # Number of scheduler threads shared by all jobs
        thread-pool-size: ${SCHEDULER_POOL_SIZE:4}
        # Timezone used to evaluate cron expressions
        default-timezone: ${SCHEDULER_TIMEZONE:Europe/Paris}
    

    2.2 Variables d’environnement

    | Variable | Description | Défaut |
    |---|---|---|
    | SCHEDULER_ENABLED | Activer le scheduler | true |
    | SCHEDULER_POOL_SIZE | Taille du thread pool | 4 |
    | SCHEDULER_TIMEZONE | Timezone par défaut | Europe/Paris |

    3. Types de scheduling

    3.1 Cron

    Expressions cron au format Spring (6 champs, en commençant par les secondes ; le cron Unix standard n’en a que 5) :

    ┌───────────── seconde (0-59)
    │ ┌───────────── minute (0-59)
    │ │ ┌───────────── heure (0-23)
    │ │ │ ┌───────────── jour du mois (1-31)
    │ │ │ │ ┌───────────── mois (1-12)
    │ │ │ │ │ ┌───────────── jour de la semaine (0-7 ou MON-SUN, 0 et 7 = dimanche)
    │ │ │ │ │ │
    * * * * * *
    

    Exemples :

    • 0 0 6 * * ? : Tous les jours à 6h00
    • 0 */15 * * * ? : Toutes les 15 minutes
    • 0 0 0 1 * ? : Premier jour de chaque mois à minuit
    • 0 30 8 ? * MON-FRI : 8h30 du lundi au vendredi

    3.2 Intervalle fixe

    Exécution périodique simple :

    /** Delay between two executions, in milliseconds: one minute here. */
    @Override
    public long getCycleIntervalMs() {
        final long oneMinuteMs = 60_000L;
        return oneMinuteMs;
    }
    

    4. Worker schedulé

    4.1 Avec expression cron

    @Component
    public class DailyReportWorker implements Worker {
    
        /** Cron schedule: every day at 06:00 (in the scheduler's timezone). */
        private static final String EVERY_DAY_AT_6 = "0 0 6 * * ?";
    
        private static final Logger log = LoggerFactory.getLogger(DailyReportWorker.class);
    
        @Override
        public String getName() { return "daily-report-worker"; }
    
        @Override
        public String getSchedule() { return EVERY_DAY_AT_6; }
    
        /** Driven by the cron expression above. */
        @Override
        public boolean isScheduled() { return true; }
    
        @Override
        public void doWork() {
            log.info("Generating daily report...");
            generateReport();
        }
    
        /** Report generation goes here. */
        private void generateReport() {
        }
    
        // Other Worker methods...
    }
    

    4.2 Avec intervalle

    @Component
    public class HealthCheckWorker implements Worker {
    
        @Override
        public String getName() {
            return "health-check-worker";
        }
    
        /** No cron expression: this worker runs at a fixed interval. */
        @Override
        public String getSchedule() {
            return null;
        }
    
        /** Not scheduled by cron. */
        @Override
        public boolean isScheduled() {
            return false;
        }
    
        /** Runs every 30 seconds. */
        @Override
        public long getCycleIntervalMs() {
            return 30000;
        }
    
        @Override
        public void doWork() {
            checkHealth();
        }
    
        /** Health check logic goes here (referenced by doWork, so it must exist). */
        private void checkHealth() {
        }
    
        // Other Worker methods...
    }
    

    5. Interface Scheduler

    package eu.lmvi.socle.scheduler;
    
    public interface Scheduler {
    
        /**
         * Planifie un job cron
         */
        void scheduleCron(String jobId, String cronExpression, Runnable task);
    
        /**
         * Planifie un job à intervalle fixe
         */
        void scheduleInterval(String jobId, long intervalMs, Runnable task);
    
        /**
         * Planifie un job à intervalle fixe avec délai initial
         */
        void scheduleInterval(String jobId, long initialDelayMs, long intervalMs, Runnable task);
    
        /**
         * Planifie un job one-shot
         */
        void scheduleOnce(String jobId, long delayMs, Runnable task);
    
        /**
         * Annule un job
         */
        void cancel(String jobId);
    
        /**
         * Vérifie si un job est planifié
         */
        boolean isScheduled(String jobId);
    
        /**
         * Liste les jobs planifiés
         */
        List<ScheduledJob> getScheduledJobs();
    
        /**
         * Démarre le scheduler
         */
        void start();
    
        /**
         * Arrête le scheduler
         */
        void stop();
    }
    

    6. Implémentation

    package eu.lmvi.socle.scheduler;
    
    /**
     * Default {@link Scheduler} backed by a {@link ScheduledExecutorService}.
     *
     * <p>Each job id has one {@link ScheduledJob} descriptor in {@code jobInfo}. The
     * descriptor instance also acts as the job's identity: a cron job only re-arms
     * itself while its own descriptor is still registered, i.e. while it has not been
     * cancelled, replaced by a job with the same id, or cleared by {@link #stop()}.
     * Scheduling under an id that is already in use cancels the previous job.
     * Exceptions thrown by a task are logged and never stop later executions.
     */
    @Component
    public class DefaultScheduler implements Scheduler {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultScheduler.class);
    
        private final ScheduledExecutorService executor;
        /** Pending future per job id (for cron jobs, replaced after every execution). */
        private final ConcurrentHashMap<String, ScheduledFuture<?>> jobs = new ConcurrentHashMap<>();
        /** Active descriptor per job id: the source of truth for "is this job scheduled". */
        private final ConcurrentHashMap<String, ScheduledJob> jobInfo = new ConcurrentHashMap<>();
        private final ZoneId timezone;
    
        /**
         * @param config supplies the thread pool size and the timezone used to
         *               evaluate cron expressions
         */
        public DefaultScheduler(SocleConfiguration config) {
            int poolSize = config.getScheduler().getThreadPoolSize();
            // Sequential index: a currentTimeMillis() suffix can give several threads the same name.
            AtomicInteger threadIndex = new AtomicInteger();
            this.executor = Executors.newScheduledThreadPool(poolSize,
                r -> new Thread(r, "scheduler-" + threadIndex.getAndIncrement()));
            this.timezone = ZoneId.of(config.getScheduler().getDefaultTimezone());
        }
    
        @Override
        public void scheduleCron(String jobId, String cronExpression, Runnable task) {
            // Parsed first, so an invalid expression fails before anything is registered.
            CronExpression cron = CronExpression.parse(cronExpression);
            ScheduledJob info = new ScheduledJob(jobId, "cron", cronExpression, null, Instant.now());
            register(jobId, info);
            scheduleNextCronExecution(jobId, info, cron, task, ZonedDateTime.now(timezone));
            log.info("Scheduled cron job: {} with expression: {}", jobId, cronExpression);
        }
    
        /**
         * Arms the next execution of a cron job, strictly after {@code after}. That
         * execution runs the task (errors logged, not propagated) and then re-arms the
         * job, as long as {@code info} is still its registered descriptor.
         */
        private void scheduleNextCronExecution(String jobId, ScheduledJob info, CronExpression cron,
                                               Runnable task, ZonedDateTime after) {
            ZonedDateTime next = cron.next(after);
            if (next == null) {
                // No future occurrence: the job is over.
                if (jobInfo.remove(jobId, info)) {
                    jobs.remove(jobId);
                }
                return;
            }
    
            long delayMs = Math.max(0L, Duration.between(ZonedDateTime.now(timezone), next).toMillis());
            ScheduledFuture<?> future = executor.schedule(() -> {
                if (!isCurrent(jobId, info)) {
                    return;
                }
                runSafely(jobId, "cron", task);
                // The following occurrence is computed from this run's planned time at
                // the earliest, so a timer firing slightly early cannot run the same
                // occurrence twice.
                ZonedDateTime now = ZonedDateTime.now(timezone);
                ZonedDateTime base = now.isBefore(next) ? next : now;
                if (isCurrent(jobId, info) && !executor.isShutdown()) {
                    scheduleNextCronExecution(jobId, info, cron, task, base);
                }
            }, delayMs, TimeUnit.MILLISECONDS);
    
            track(jobId, info, future);
        }
    
        @Override
        public void scheduleInterval(String jobId, long intervalMs, Runnable task) {
            scheduleInterval(jobId, 0, intervalMs, task);
        }
    
        @Override
        public void scheduleInterval(String jobId, long initialDelayMs, long intervalMs, Runnable task) {
            if (intervalMs <= 0) {
                // Same exception scheduleAtFixedRate would throw, but raised before a
                // descriptor is registered for a job that can never run.
                throw new IllegalArgumentException("intervalMs must be > 0: " + intervalMs);
            }
            ScheduledJob info = new ScheduledJob(jobId, "interval", null, intervalMs, Instant.now());
            register(jobId, info);
    
            ScheduledFuture<?> future = executor.scheduleAtFixedRate(
                () -> runSafely(jobId, "interval", task), initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
            track(jobId, info, future);
    
            log.info("Scheduled interval job: {} every {}ms", jobId, intervalMs);
        }
    
        @Override
        public void scheduleOnce(String jobId, long delayMs, Runnable task) {
            ScheduledJob info = new ScheduledJob(jobId, "once", null, delayMs, Instant.now());
            register(jobId, info);
    
            ScheduledFuture<?> future = executor.schedule(() -> {
                try {
                    runSafely(jobId, "one-shot", task);
                } finally {
                    // Unregister only if this run still owns the id (not cancelled or replaced).
                    if (jobInfo.remove(jobId, info)) {
                        jobs.remove(jobId);
                    }
                }
            }, delayMs, TimeUnit.MILLISECONDS);
            // With a 0 ms delay the task may already be finished at this point;
            // track() then discards the entry instead of leaving it registered forever.
            track(jobId, info, future);
    
            log.info("Scheduled one-shot job: {} in {}ms", jobId, delayMs);
        }
    
        @Override
        public void cancel(String jobId) {
            // Removing the descriptor first stops a running cron job from re-arming itself.
            ScheduledJob info = jobInfo.remove(jobId);
            ScheduledFuture<?> future = jobs.remove(jobId);
            if (future != null) {
                future.cancel(false);
            }
            if (info != null || future != null) {
                log.info("Cancelled job: {}", jobId);
            }
        }
    
        @Override
        public boolean isScheduled(String jobId) {
            return jobInfo.containsKey(jobId);
        }
    
        @Override
        public List<ScheduledJob> getScheduledJobs() {
            return new ArrayList<>(jobInfo.values());
        }
    
        @Override
        public void start() {
            log.info("Scheduler started");
        }
    
        @Override
        public void stop() {
            log.info("Stopping scheduler...");
            // Clearing the descriptors first prevents cron jobs from re-arming themselves.
            jobInfo.clear();
            jobs.values().forEach(f -> f.cancel(false));
            jobs.clear();
            executor.shutdown();
            try {
                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
            log.info("Scheduler stopped");
        }
    
        /**
         * Makes {@code info} the active descriptor of {@code jobId}, cancelling any job
         * already scheduled under that id (its future would otherwise keep running untracked).
         */
        private void register(String jobId, ScheduledJob info) {
            jobInfo.put(jobId, info);
            ScheduledFuture<?> previous = jobs.remove(jobId);
            if (previous != null) {
                previous.cancel(false);
                log.warn("Replacing already scheduled job: {}", jobId);
            }
        }
    
        /**
         * Records the pending future of a job. If the job was cancelled, replaced or
         * completed while being scheduled, the future is cancelled and dropped instead.
         */
        private void track(String jobId, ScheduledJob info, ScheduledFuture<?> future) {
            jobs.put(jobId, future);
            if (!isCurrent(jobId, info)) {
                future.cancel(false);
                jobs.remove(jobId, future);
            }
        }
    
        /** True while {@code info} (by identity) is the registered descriptor of {@code jobId}. */
        private boolean isCurrent(String jobId, ScheduledJob info) {
            return jobInfo.get(jobId) == info;
        }
    
        /** Runs one execution of a job, logging any exception instead of propagating it. */
        private void runSafely(String jobId, String kind, Runnable task) {
            log.debug("Executing {} job: {}", kind, jobId);
            try {
                task.run();
            } catch (Exception e) {
                log.error("Error executing job {}: {}", jobId, e.getMessage(), e);
            }
        }
    }
    

    7. Gestion du chevauchement

    7.1 Éviter le chevauchement

    @Component
    public class LongRunningWorker implements Worker {
    
        private static final Logger log = LoggerFactory.getLogger(LongRunningWorker.class);
    
        /** True while an execution is in progress in this JVM. */
        private final AtomicBoolean running = new AtomicBoolean(false);
    
        /**
         * Skips the run when the previous one (in this instance) is still in progress.
         * Only protects a single JVM; see 7.2 for multi-instance deployments.
         */
        @Override
        public void doWork() {
            // Atomically take the flag; fails if another execution holds it
            if (!running.compareAndSet(false, true)) {
                log.warn("Previous execution still running, skipping");
                return;
            }
    
            try {
                doLongWork();
            } finally {
                // Always released, even when the work throws
                running.set(false);
            }
        }
    
        private void doLongWork() {
            // Long-running processing
        }
    
        // Other Worker methods...
    }
    

    7.2 Avec verrou distribué (multi-instances)

    @Component
    public class DistributedScheduledWorker implements Worker {
    
        private static final Logger log = LoggerFactory.getLogger(DistributedScheduledWorker.class);
    
        /** Lock expiry: must exceed the longest expected run, or two instances may overlap. */
        private static final Duration LOCK_TTL = Duration.ofMinutes(10);
    
        @Autowired
        private KvBus kvBus;
    
        @Override
        public void doWork() {
            String lockKey = "lock:job:" + getName();
            // Unique owner token, so that only the lock actually held is released.
            String owner = java.util.UUID.randomUUID().toString();
    
            // Try to acquire the lock (set only if the key is absent, with an expiry)
            if (!kvBus.putIfAbsent(lockKey, owner, LOCK_TTL)) {
                log.debug("Job already running on another instance");
                return;
            }
    
            try {
                executeJob();
            } finally {
                // If the run outlived LOCK_TTL, the lock may have expired and been taken
                // by another instance: delete it only if this instance still owns it.
                // NOTE(review): assumes KvBus exposes get(String) returning the stored
                // value. get-then-delete is not atomic; prefer an atomic
                // compare-and-delete if KvBus offers one.
                if (owner.equals(kvBus.get(lockKey))) {
                    kvBus.delete(lockKey);
                }
            }
        }
    }
    

    8. Intégration MOP

    Le MOP intègre automatiquement les workers schedulés :

    // In MainOrchestratorProcess
    /**
     * Registers every cron-scheduled worker with the scheduler. Each run only calls
     * doWork() while the worker reports itself healthy. One invalid cron expression
     * is reported and skipped without preventing the other workers from being scheduled.
     */
    private void startScheduledWorkers() {
        for (Worker worker : workers) {
            String schedule = worker.getSchedule();
            if (!worker.isScheduled() || schedule == null || schedule.isBlank()) {
                continue;
            }
            try {
                scheduler.scheduleCron(
                    "worker:" + worker.getName(),
                    schedule,
                    () -> {
                        if (worker.isHealthy()) {
                            worker.doWork();
                        }
                    }
                );
            } catch (IllegalArgumentException e) {
                // NOTE(review): assumes the MOP declares an SLF4J logger named "log".
                log.error("Invalid cron expression '{}' for worker {}: {}",
                    schedule, worker.getName(), e.getMessage());
            }
        }
    }
    

    9. API Admin

    /**
     * Admin endpoints to list, cancel and manually trigger scheduled jobs.
     */
    @RestController
    @RequestMapping("/admin/scheduler")
    public class SchedulerController {
    
        @Autowired
        private Scheduler scheduler;
    
        @GetMapping("/jobs")
        public List<ScheduledJob> listJobs() {
            return scheduler.getScheduledJobs();
        }
    
        /** 200 when the job was cancelled, 404 when no job has this id. */
        @PostMapping("/jobs/{jobId}/cancel")
        public ResponseEntity<Void> cancelJob(@PathVariable String jobId) {
            if (scheduler.isScheduled(jobId)) {
                scheduler.cancel(jobId);
                return ResponseEntity.ok().build();
            }
            return ResponseEntity.notFound().build();
        }
    
        /** 202 when an immediate run was queued, 404 when no job has this id. */
        @PostMapping("/jobs/{jobId}/trigger")
        public ResponseEntity<Void> triggerJob(@PathVariable String jobId) {
            if (!scheduler.isScheduled(jobId)) {
                return ResponseEntity.notFound().build();
            }
            // Immediate one-shot run under a distinct id, so the regular schedule
            // of jobId is left untouched.
            scheduler.scheduleOnce(jobId + "-manual-" + System.currentTimeMillis(), 0, () -> {
                // Find and run the matching worker
            });
            return ResponseEntity.accepted().build();
        }
    }
    

    10. Expressions Cron communes

    | Expression | Description |
    |---|---|
    | `0 0 * * * ?` | Toutes les heures |
    | `0 */15 * * * ?` | Toutes les 15 minutes |
    | `0 0 6 * * ?` | Tous les jours à 6h |
    | `0 0 0 * * ?` | Tous les jours à minuit |
    | `0 0 0 * * SUN` | Tous les dimanches à minuit |
    | `0 0 0 1 * ?` | Premier jour du mois |
    | `0 0 8 ? * MON-FRI` | 8h en semaine |
    | `0 0 */2 * * ?` | Toutes les 2 heures |

    11. Bonnes pratiques

    DO

    • Utiliser des noms de jobs uniques et descriptifs
    • Gérer le chevauchement pour les jobs longs
    • Utiliser des locks distribués en multi-instances
    • Logger le début et la fin des jobs
    • Monitorer l’exécution des jobs

    DON’T

    • Ne pas planifier des jobs trop fréquents sans nécessité
    • Ne pas ignorer les erreurs dans les jobs
    • Ne pas créer trop de threads
    • Ne pas bloquer indéfiniment dans un job

    12. Références

  • Socle V004 – Migration V3 vers V4

    Socle V004 – Migration V3 vers V4

    25 – Guide de Migration V3 → V4

    Version : 4.0.0 Date : 2025-12-09

    1. Résumé des changements

    1.1 Ce qui change

    | Aspect | V3 | V4 |
    |---|---|---|
    | Logging | Logback | Log4j2 + LogForwarder |
    | Persistance technique | In-memory/Redis | + H2 TechDB |
    | Auth | AdminAuthFilter local | + SocleAuthClient JWT |
    | Registry | Supervisor local | + WorkerRegistryClient |

    1.2 Ce qui ne change PAS

    • Architecture MOP
    • Interface Worker
    • KvBus (in_memory / Redis)
    • SharedDataRegistry
    • Supervisor
    • HttpWorker
    • AdminRestApi
    • PipelineEngine
    • CircuitBreaker / Retry
    • Scheduler

    2. Checklist de migration

    □ Phase 1: Préparation
      □ Lire ce guide en entier
      □ Backup du projet V3
      □ Créer branche migration-v4
    
    □ Phase 2: Dépendances Maven
      □ Mettre à jour pom.xml
      □ Exclure Logback
      □ Ajouter Log4j2
      □ Ajouter H2
    
    □ Phase 3: Configuration
      □ Créer log4j2.xml
      □ Créer log4j2.component.properties
      □ Supprimer logback-spring.xml
      □ Mettre à jour application.yml
    
    □ Phase 4: Code (optionnel)
      □ Intégrer TechDbManager
      □ Intégrer SocleAuthClient
      □ Intégrer WorkerRegistryClient
    
    □ Phase 5: Tests
      □ Compiler
      □ Exécuter les tests
      □ Vérifier les logs
      □ Valider H2 Console
    
    □ Phase 6: Déploiement
      □ Variables d'environnement
      □ Test en staging
      □ Déploiement production
    

    3. Phase 1 : Préparation

    3.1 Backup

    # Back up the V3 project
    cp -r socle-v003 socle-v003-backup
    
    # Create the migration branch (from inside the V3 project)
    cd socle-v003
    git checkout -b migration-v4
    

    3.2 Version cible

    <version>4.0.0</version> <!-- target Socle version in pom.xml -->
    

    4. Phase 2 : Dépendances Maven

    4.1 Modifications pom.xml

    <!-- BEFORE (V3): remove these two dependencies -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
    </dependency>
    <dependency>
        <groupId>net.logstash.logback</groupId>
        <artifactId>logstash-logback-encoder</artifactId>
    </dependency>
    
    <!-- AFTER (V4) -->
    <!-- Exclude Logback from Spring Boot. Every other starter (actuator, ...)
         also pulls spring-boot-starter-logging transitively: repeat this
         exclusion on each of them (check with mvn dependency:tree, see 4.2). -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    
    <!-- Log4j2 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    
    <!-- LMAX Disruptor (required by Log4j2 async loggers).
         NOTE(review): Disruptor 4.x needs a Log4j version that supports it
         (2.23.0 or later, as far as the Log4j release notes go); check the
         Log4j version managed by your Spring Boot version. -->
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>4.0.0</version>
    </dependency>
    
    <!-- H2 Database. The explicit version overrides the one managed by
         Spring Boot's dependency management. -->
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <version>2.2.224</version>
    </dependency>
    

    4.2 Vérifier les exclusions

    S’assurer que Logback est exclu de TOUTES les dépendances Spring Boot :

    mvn dependency:tree | grep logback
    # Must print nothing: no Logback artifact may remain in the dependency tree
    

    5. Phase 3 : Configuration

    5.1 Créer log4j2.xml

    Créer src/main/resources/log4j2.xml :

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- status: level of Log4j's own internal logging; monitorInterval: reload
         this file when it changes (checked every 30 s). -->
    <Configuration status="WARN" monitorInterval="30">
        <Properties>
            <!-- Overridable through the LOG_DIR / APP_NAME environment variables -->
            <Property name="LOG_DIR">${env:LOG_DIR:-./logs}</Property>
            <Property name="APP_NAME">${env:APP_NAME:-socle-v4}</Property>
        </Properties>
    
        <Appenders>
            <Console name="Console" target="SYSTEM_OUT">
                <PatternLayout pattern="%d{ISO8601} %-5level [%thread] %logger{36} - %msg%n"/>
            </Console>
    
            <!-- Rolls daily and at 100MB; archives are gzipped. -->
            <RollingFile name="File"
                         fileName="${LOG_DIR}/${APP_NAME}.log"
                         filePattern="${LOG_DIR}/${APP_NAME}-%d{yyyy-MM-dd}-%i.log.gz">
                <PatternLayout pattern="%d{ISO8601} %-5level [%thread] %logger{36} - %msg%n"/>
                <Policies>
                    <TimeBasedTriggeringPolicy interval="1"/>
                    <SizeBasedTriggeringPolicy size="100MB"/>
                </Policies>
                <!-- max="30" only caps the %i counter within one day; it does not
                     delete older days. The Delete action below is what bounds the
                     retention (archives older than 30 days are removed on rollover). -->
                <DefaultRolloverStrategy max="30">
                    <Delete basePath="${LOG_DIR}" maxDepth="1">
                        <IfFileName glob="${APP_NAME}-*.log.gz"/>
                        <IfLastModified age="30d"/>
                    </Delete>
                </DefaultRolloverStrategy>
            </RollingFile>
        </Appenders>
    
        <Loggers>
            <!-- Application loggers; additivity="false" avoids duplicate lines via Root -->
            <Logger name="eu.lmvi.socle" level="${env:LOG_LEVEL:-INFO}" additivity="false">
                <AppenderRef ref="Console"/>
                <AppenderRef ref="File"/>
            </Logger>
            <Logger name="org.springframework" level="WARN"/>
            <Root level="INFO">
                <AppenderRef ref="Console"/>
                <AppenderRef ref="File"/>
            </Root>
        </Loggers>
    </Configuration>
    

    5.2 Créer log4j2.component.properties

    Créer src/main/resources/log4j2.component.properties :

    # Make all loggers asynchronous (requires the LMAX Disruptor dependency)
    Log4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector
    # Size of the async logger ring buffer
    AsyncLogger.RingBufferSize=262144
    # Wait strategy of the async logger background thread
    AsyncLogger.WaitStrategy=Sleep
    # Disable lookups in log messages (Log4Shell hardening; redundant on recent Log4j, kept for safety)
    log4j2.formatMsgNoLookups=true
    

    5.3 Supprimer logback-spring.xml

    # Logback configuration, no longer used once Logback is removed
    rm src/main/resources/logback-spring.xml
    

    5.4 Mettre à jour application.yml

    Ajouter les nouvelles configurations V4 :

    # Add to the existing application.yml
    
    socle:
      # ... existing V3 config ...
    
      # NEW in V4: H2 TechDB (file database under TECHDB_PATH, PostgreSQL compatibility mode)
      techdb:
        enabled: ${TECHDB_ENABLED:true}
        url: jdbc:h2:file:${TECHDB_PATH:./data/socle-techdb};MODE=PostgreSQL;DB_CLOSE_DELAY=-1
        username: socle
        # Default "socle" is for local use only: set TECHDB_PASSWORD elsewhere
        password: ${TECHDB_PASSWORD:socle}
        # Web console: keep disabled outside development
        console:
          enabled: ${H2_CONSOLE_ENABLED:false}
          path: /h2-console
    
      # NEW in V4: LogForwarder (optional)
      logging:
        forwarder:
          enabled: ${LOG_FORWARDER_ENABLED:false}
          transport-mode: ${LOG_TRANSPORT_MODE:http}
          log-hub-url: ${LOG_HUB_URL:}
    
      # NEW in V4: Auth Client (optional)
      auth:
        enabled: ${AUTH_ENABLED:false}
        server-url: ${AUTH_SERVER_URL:}
        api-key: ${API_KEY:}
    
      # NEW in V4: Worker Registry (optional)
      worker-registry:
        enabled: ${WORKER_REGISTRY_ENABLED:false}
        server-url: ${WORKER_REGISTRY_URL:}
    
    # Logging (Spring Boot): points to the Log4j2 configuration
    logging:
      config: classpath:log4j2.xml
    

    6. Phase 4 : Code (optionnel)

    Les composants V4 sont optionnels et activés via configuration. Aucune modification de code n’est obligatoire.

    6.1 Si vous voulez utiliser TechDB

    @Autowired(required = false)
    private TechDbManager techDbManager;
    
    // Utilisation
    if (techDbManager != null) {
        techDbManager.saveOffset("kafka", "topic-0", offset, null);
    }
    

    6.2 Si vous voulez utiliser AuthClient

    @Autowired(required = false)
    private SocleAuthClient authClient;
    
    // Utilisation
    if (authClient != null && authClient.isAuthenticated()) {
        String token = authClient.getValidAccessToken();
    }
    

    6.3 Si vous voulez utiliser WorkerRegistry

    @Autowired(required = false)
    private WorkerRegistryClient registryClient;
    
    // L'intégration MOP est automatique si enabled
    

    7. Phase 5 : Tests

    7.1 Compilation

    mvn clean compile
    

    Erreurs possibles :

    • package ch.qos.logback does not exist → du code référence encore directement des classes Logback (ch.qos.logback.*) ; les remplacer par l’API SLF4J
    • cannot find symbol: class Logger → Import SLF4J correct ?

    7.2 Tests unitaires

    mvn test
    

    7.3 Vérification logs

    mvn spring-boot:run
    

    Vérifier :

    • Logs apparaissent en console
    • Format correct
    • Pas d’erreur Log4j2

    7.4 H2 Console

    Si H2_CONSOLE_ENABLED=true :

    1. Ouvrir http://localhost:8080/h2-console
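    2. JDBC URL : jdbc:h2:file:./data/socle-techdb (valeur par défaut ; si TECHDB_PATH est défini, utiliser jdbc:h2:file:<TECHDB_PATH>, par exemple jdbc:h2:file:./data/techdb)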
    3. Vérifier les tables créées

    8. Phase 6 : Déploiement

    8.1 Variables d’environnement (minimum)

    # Existantes V3 (inchangées)
    APP_NAME=my-app
    ENV_NAME=PROD
    HTTP_PORT=8080
    
    # Nouvelles V4 (optionnelles)
    TECHDB_ENABLED=true
    TECHDB_PATH=./data/techdb
    H2_CONSOLE_ENABLED=false
    LOG_FORWARDER_ENABLED=false
    AUTH_ENABLED=false
    WORKER_REGISTRY_ENABLED=false
    

    8.2 Docker

    Mettre à jour le Dockerfile si nécessaire :

    # Pas de changement requis si vous utilisez le JAR
    FROM eclipse-temurin:21-jre
    COPY target/socle-v004-4.0.0.jar app.jar
    ENTRYPOINT ["java", "-jar", "app.jar"]
    

    8.3 Kubernetes

    Mettre à jour les ConfigMaps/Secrets avec les nouvelles variables.

    9. Mapping des configurations

    9.1 Logback → Log4j2

    Logback Log4j2
    <appender class="ConsoleAppender"> <Console name="...">
    <appender class="RollingFileAppender"> <RollingFile name="...">
    <encoder><pattern> <PatternLayout pattern="...">
    <root level="INFO"> <Root level="INFO">
    <logger name="..." level="..."> <Logger name="..." level="...">

    9.2 Pattern identique

    Le pattern de log reste identique :

    %d{ISO8601} %-5level [%thread] %logger{36} - %msg%n
    

    10. Rollback

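    En cas de problème, pour revenir à V3 (les commandes ci-dessous supposent que toute la migration tient dans le dernier commit ; sinon, remplacer HEAD~1 par le commit V3 de référence) :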

    # Restaurer le pom.xml V3
    git checkout HEAD~1 -- pom.xml
    
    # Restaurer logback-spring.xml
    git checkout HEAD~1 -- src/main/resources/logback-spring.xml
    
    # Supprimer fichiers V4
    rm src/main/resources/log4j2.xml
    rm src/main/resources/log4j2.component.properties
    
    # Rebuild
    mvn clean package
    

    11. FAQ

    Q: Dois-je modifier mon code de logging ?

    Non. Le code reste identique :

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    private static final Logger log = LoggerFactory.getLogger(MyClass.class);
    log.info("Message");
    

    Q: Les MDC fonctionnent-ils ?

    Oui. MDC fonctionne de manière identique avec Log4j2.

    Q: Puis-je activer les fonctionnalités V4 progressivement ?

    Oui. Toutes les fonctionnalités V4 sont optionnelles :

    • TECHDB_ENABLED=false → Pas de H2
    • LOG_FORWARDER_ENABLED=false → Pas de LogForwarder
    • AUTH_ENABLED=false → Pas d’auth JWT
    • WORKER_REGISTRY_ENABLED=false → Pas de registry

    Q: Performance : V4 est-il plus rapide ?

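    Oui pour le logging. D’après la documentation Log4j2, les AsyncLoggers (LMAX Disruptor) atteignent un débit de pointe 6 à 68 fois supérieur à celui d’un logger synchrone ; le gain réel dépend de la charge et doit être mesuré. Les AsyncLoggers nécessitent la dépendance com.lmax:disruptor sur le classpath.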

    12. Références

  • Socle V004 – GraalVM JavaScript

    Socle V004 – GraalVM JavaScript

    GraalVM et JavaScript (GraalJS)

    Version : 4.0.0 Date : 2025-12-13

    1. Vue d’ensemble

    Le Socle V004 supporte l’exécution de scripts JavaScript pour l’enrichissement et la transformation de données via GraalJS, le moteur JavaScript de GraalVM.

    Pourquoi GraalVM ?

    • Performance : Compilation JIT pour JavaScript, bien plus rapide que Nashorn
    • Compatibilité : Support ECMAScript moderne (ES2023+)
    • Interopérabilité : Accès bidirectionnel entre Java et JavaScript
    • Sécurité : Sandbox configurable pour isoler l’exécution

    2. Prérequis

    2.1 GraalVM CE 21.0.2

    Important : Pour les applications utilisant GraalJS, GraalVM CE 21.0.2 est requis au lieu d’OpenJDK standard.

    OpenJDK ne contient pas le runtime Truffle nécessaire à GraalJS. Utiliser OpenJDK avec GraalJS provoquera l’erreur :

    org.graalvm.polyglot.PolyglotException:
    The Truffle API cannot be used without GraalVM runtime.
    

    2.2 Installation de GraalVM

    # Télécharger GraalVM CE 21.0.2
    wget https://github.com/graalvm/graalvm-ce-builds/releases/download/jdk-21.0.2/graalvm-community-jdk-21.0.2_linux-x64_bin.tar.gz
    
    # Extraire dans /opt
    sudo tar -xzf graalvm-community-jdk-21.0.2_linux-x64_bin.tar.gz -C /opt/
    
    # Créer un lien symbolique
    sudo ln -s /opt/graalvm-community-openjdk-21.0.2+13.1 /opt/graalvm
    
    # Vérifier l'installation
    /opt/graalvm/bin/java -version
    

    Sortie attendue :

    openjdk version "21.0.2" 2024-01-16
    OpenJDK Runtime Environment GraalVM CE 21.0.2+13.1 (build 21.0.2+13-jvmci-23.1-b30)
    OpenJDK 64-Bit Server VM GraalVM CE 21.0.2+13.1 (build 21.0.2+13-jvmci-23.1-b30, mixed mode, sharing)
    

    3. Configuration Maven

    3.1 Version GraalJS

    La version de GraalJS doit correspondre à la version du runtime GraalVM :

    GraalVM CE GraalJS Version
    21.0.2 (23.1) 23.1.0
    22.0.0 (24.0) 24.0.0

    3.2 Dépendances pom.xml

    <properties>
        <graaljs.version>23.1.0</graaljs.version>
    </properties>
    
    <dependencies>
        <!-- GraalVM Polyglot API -->
        <dependency>
            <groupId>org.graalvm.polyglot</groupId>
            <artifactId>polyglot</artifactId>
            <version>${graaljs.version}</version>
        </dependency>
    
        <!-- GraalJS Community (runtime) -->
        <dependency>
            <groupId>org.graalvm.polyglot</groupId>
            <artifactId>js-community</artifactId>
            <version>${graaljs.version}</version>
            <type>pom</type>
            <scope>runtime</scope>
        </dependency>
    
        <!-- Script Engine (optionnel, pour JSR-223) -->
        <dependency>
            <groupId>org.graalvm.js</groupId>
            <artifactId>js-scriptengine</artifactId>
            <version>${graaljs.version}</version>
        </dependency>
    </dependencies>
    

    4. Options JVM

    4.1 Option obligatoire pour uber-jars

    Pour les fat JARs (Spring Boot repackaged), cette option est obligatoire :

    -Dpolyglotimpl.DisableClassPathIsolation=true
    

    Sans cette option, vous obtiendrez :

    java.lang.NullPointerException: Cannot invoke "java.io.File.toPath()"
    because the return value of "com.oracle.truffle.polyglot.FileSystems$InternalFileSystemContext.collectClassPathJars()"
    

    4.2 Supprimer les avertissements

    Pour supprimer les avertissements « interpreter only » :

    -Dpolyglot.engine.WarnInterpreterOnly=false
    

    4.3 Configuration systemd complète

    [Service]
    Environment="JAVA_OPTS=-Xms512m -Xmx1024m -XX:+UseG1GC -Dpolyglotimpl.DisableClassPathIsolation=true"
    ExecStart=/opt/graalvm/bin/java $JAVA_OPTS -jar /opt/myapp/myapp.jar
    

    5. Utilisation de GraalJS

    5.1 Exemple basique

    import org.graalvm.polyglot.Context;
    import org.graalvm.polyglot.Value;
    
    public class JavaScriptExecutor {
    
        public String execute(String script, Map<String, Object> bindings) {
            try (Context context = Context.newBuilder("js")
                    .allowAllAccess(false)
                    .allowHostAccess(HostAccess.ALL)
                    .allowHostClassLookup(className -> false)
                    .option("engine.WarnInterpreterOnly", "false")
                    .build()) {
    
                Value jsBindings = context.getBindings("js");
                bindings.forEach(jsBindings::putMember);
    
                Value result = context.eval("js", script);
                return result.asString();
            }
        }
    }
    

    5.2 Options de sécurité

    Option Description
    allowAllAccess(false) Désactive tous les accès par défaut
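    allowHostAccess(HostAccess.ALL) Permet l’accès à tous les membres publics des objets Java passés au script (pour un sandbox strict, préférer HostAccess.EXPLICIT avec @HostAccess.Export)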
    allowHostClassLookup(className -> false) Empêche Java.type()
    allowIO(false) Désactive les accès fichiers
    allowNativeAccess(false) Désactive les appels natifs

    6. Performance et initialisation

    6.1 Temps d’initialisation

    La première exécution de GraalJS prend environ 10-15 secondes car :

    • Chargement du runtime Truffle
    • Compilation JIT du moteur JavaScript
    • Initialisation des structures internes

    Les exécutions suivantes sont très rapides (<100ms).

    6.2 Pré-initialisation au démarrage

    Pour éviter la latence sur le premier événement, vous pouvez pré-initialiser GraalVM :

    /**
     * Builds one shared GraalJS {@link Engine} at startup and runs a trivial script on it, so the
     * one-off initialisation cost is paid during boot rather than on the first real event.
     *
     * <p>Only contexts created with {@code Context.newBuilder("js").engine(getSharedEngine())}
     * benefit from the warm-up. Such contexts must not set {@code engine.*} options themselves:
     * those are fixed when the engine below is built.
     */
    @Component
    public class GraalVMWarmup {
    
        private static final Logger log = LoggerFactory.getLogger(GraalVMWarmup.class);
    
        private Engine sharedEngine;
    
        /**
         * Creates the shared engine and evaluates a warm-up script in a throw-away context.
         * If the warm-up fails, the engine is closed before the exception propagates: Spring does
         * not run {@code @PreDestroy} for a bean whose initialisation threw.
         */
        @PostConstruct
        public void init() {
            log.info("Pre-initializing GraalVM JavaScript engine...");
            long start = System.currentTimeMillis();
    
            Engine engine = Engine.newBuilder()
                .option("engine.WarnInterpreterOnly", "false")
                .build();
    
            // Warm-up with a simple script
            try (Context warmup = Context.newBuilder("js")
                    .engine(engine)
                    .allowAllAccess(false)
                    .allowHostAccess(HostAccess.ALL)
                    .build()) {
                warmup.eval("js", "var x = 1 + 1;");
            } catch (RuntimeException e) {
                try {
                    engine.close();
                } catch (RuntimeException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
    
            this.sharedEngine = engine;
            log.info("GraalVM initialized in {} ms", System.currentTimeMillis() - start);
        }
    
        /** Releases the shared engine on shutdown. */
        @PreDestroy
        public void cleanup() {
            if (sharedEngine != null) {
                sharedEngine.close();
            }
        }
    
        /**
         * @return the warmed-up engine to bind JavaScript contexts to
         */
        public Engine getSharedEngine() {
            return sharedEngine;
        }
    }
    

    6.3 Impact sur le démarrage

    Configuration Démarrage app Premier événement
    Sans pré-init ~40s ~12s
    Avec pré-init ~52s (+12s) <100ms

    7. Troubleshooting

    7.1 « Truffle API cannot be used without GraalVM runtime »

    Cause : Utilisation d’OpenJDK au lieu de GraalVM. Solution : Installer et utiliser GraalVM CE 21.0.2.

    7.2 « NullPointerException in collectClassPathJars »

    Cause : Classpath isolation dans un uber-jar. Solution : Ajouter -Dpolyglotimpl.DisableClassPathIsolation=true.

    7.3 « Version mismatch: truffle-api-24.0.0 vs GraalVM 23.1 »

    Cause : Version GraalJS incompatible avec le runtime GraalVM. Solution : Aligner graaljs.version avec la version GraalVM installée.

    7.4 Script timeout

    Cause : L’initialisation GraalVM dépasse le timeout. Solution : Augmenter le timeout à 30 secondes pour le premier appel, ou pré-initialiser le moteur.

    8. Références

  • Socle V004 – API Administration

    Socle V004 – API Administration

    14 – Admin API

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    L’Admin API expose des endpoints REST pour l’administration et le monitoring du Socle V4.

    Endpoints principaux

    Endpoint Description
    /admin/health État de santé
    /admin/workers État des workers
    /admin/config Configuration
    /admin/registry SharedDataRegistry
    /admin/metrics Métriques

    2. Configuration

    2.1 application.yml

    socle:
      admin:
        enabled: ${ADMIN_ENABLED:true}
        path-prefix: ${ADMIN_PATH_PREFIX:/admin}
        auth:
          enabled: ${ADMIN_AUTH_ENABLED:false}
          username: ${ADMIN_USERNAME:admin}
          password: ${ADMIN_PASSWORD:}
    

    2.2 Variables d’environnement

    Variable Description Défaut
    ADMIN_ENABLED Activer l’API admin true
    ADMIN_PATH_PREFIX Préfixe des endpoints /admin
    ADMIN_AUTH_ENABLED Activer l’authentification false
    ADMIN_USERNAME Utilisateur admin admin
    ADMIN_PASSWORD Mot de passe admin

    3. Endpoints Health

    3.1 GET /admin/health

    État de santé global de l’application.

    Réponse :

    {
      "status": "HEALTHY",
      "timestamp": "2025-12-09T10:30:00Z",
      "uptime": "2h 30m 15s",
      "unhealthyWorkers": [],
      "components": {
        "database": "UP",
        "redis": "UP",
        "techdb": "UP"
      }
    }
    

    Codes HTTP :

    • 200 : HEALTHY
    • 503 : UNHEALTHY ou DEGRADED

    3.2 GET /admin/health/live

    Liveness probe pour Kubernetes.

    {
      "status": "UP"
    }
    

    3.3 GET /admin/health/ready

    Readiness probe pour Kubernetes.

    {
      "status": "READY",
      "checks": {
        "workers": "OK",
        "database": "OK"
      }
    }
    

    4. Endpoints Workers

    4.1 GET /admin/workers

    Liste tous les workers et leur état.

    {
      "workers": [
        {
          "name": "kafka-consumer",
          "status": "RUNNING",
          "lastHeartbeat": "2025-12-09T10:29:55Z",
          "healthy": true,
          "stats": {
            "processed": 12345,
            "errors": 2
          }
        },
        {
          "name": "order-processor",
          "status": "RUNNING",
          "lastHeartbeat": "2025-12-09T10:29:58Z",
          "healthy": true,
          "stats": {
            "ordersProcessed": 567
          }
        }
      ],
      "total": 2,
      "healthy": 2,
      "unhealthy": 0
    }
    

    4.2 GET /admin/workers/{name}

    Détails d’un worker spécifique.

    {
      "name": "kafka-consumer",
      "status": "RUNNING",
      "healthy": true,
      "startPriority": 10,
      "stopPriority": 90,
      "scheduled": false,
      "passive": false,
      "cycleIntervalMs": 1000,
      "lastHeartbeat": "2025-12-09T10:29:55Z",
      "stats": {
        "processed": 12345,
        "errors": 2,
        "lastOffset": 98765
      }
    }
    

    4.3 POST /admin/workers/{name}/stop

    Arrête un worker spécifique.

    curl -X POST http://localhost:8080/admin/workers/kafka-consumer/stop
    

    4.4 POST /admin/workers/{name}/start

    Redémarre un worker arrêté.

    curl -X POST http://localhost:8080/admin/workers/kafka-consumer/start
    

    5. Endpoints Configuration

    5.1 GET /admin/config

    Configuration actuelle (sans secrets).

    {
      "appName": "socle-v4",
      "envName": "PROD",
      "region": "MTQ",
      "version": "4.0.0",
      "execId": "socle-v4-abc123",
      "kvbus": {
        "mode": "redis"
      },
      "techdb": {
        "enabled": true
      },
      "logging": {
        "forwarder": {
          "enabled": true,
          "transportMode": "http"
        }
      }
    }
    

    5.2 GET /admin/config/env

    Variables d’environnement (filtrées).

    {
      "APP_NAME": "socle-v4",
      "ENV_NAME": "PROD",
      "REGION": "MTQ",
      "HTTP_PORT": "8080",
      "KVBUS_MODE": "redis"
    }
    

    6. Endpoints Registry

    6.1 GET /admin/registry

    Contenu du SharedDataRegistry.

    {
      "database.connected": true,
      "metrics.requests.total": 12345,
      "metrics.errors.total": 23,
      "batch.current.id": "batch-001",
      "worker.kafka.offset": 98765
    }
    

    6.2 GET /admin/registry/{key}

    Valeur d’une clé spécifique.

    {
      "key": "metrics.requests.total",
      "value": 12345,
      "healthLevel": "NORMAL"
    }
    

    6.3 GET /admin/registry/health

    Clés avec leur niveau de santé.

    {
      "database.connected": "CRITICAL",
      "cache.available": "IMPORTANT",
      "metrics.requests.total": "INFO"
    }
    

    6.4 GET /admin/registry/unhealthy

    Clés en état unhealthy.

    [
      {
        "key": "external.api.available",
        "value": false,
        "healthLevel": "CRITICAL"
      }
    ]
    

    7. Endpoints TechDB (V4)

    7.1 GET /admin/techdb/offsets

    Tous les offsets stockés.

    {
      "offsets": [
        {
          "sourceName": "kafka",
          "partitionKey": "orders-topic-0",
          "lastSequence": 123456,
          "lastUpdated": "2025-12-09T10:30:00Z"
        },
        {
          "sourceName": "nats",
          "partitionKey": "events.orders",
          "lastSequence": 789012,
          "lastUpdated": "2025-12-09T10:29:55Z"
        }
      ]
    }
    

    7.2 GET /admin/techdb/workers

    État des workers persisté.

    {
      "workers": [
        {
          "workerId": "kafka-consumer-001",
          "status": "RUNNING",
          "lastHeartbeat": "2025-12-09T10:30:00Z",
          "metadata": {
            "messagesPerMinute": 523
          }
        }
      ]
    }
    

    7.3 GET /admin/techdb/events

    Événements techniques récents.

    {
      "events": [
        {
          "id": 123,
          "createdAt": "2025-12-09T10:25:00Z",
          "type": "PIPELINE_ERROR",
          "payload": {
            "pipeline": "order-processing",
            "error": "Connection timeout"
          }
        }
      ]
    }
    

    8. Endpoints Resilience

    8.1 GET /admin/resilience/circuits

    État des circuit breakers.

    {
      "circuits": {
        "payment-gateway": "CLOSED",
        "inventory-api": "HALF_OPEN",
        "notification-service": "OPEN"
      }
    }
    

    8.2 POST /admin/resilience/circuits/{name}/reset

    Reset un circuit breaker.

    curl -X POST http://localhost:8080/admin/resilience/circuits/notification-service/reset
    

    9. Endpoints Scheduler

    9.1 GET /admin/scheduler/jobs

    Jobs schedulés.

    {
      "jobs": [
        {
          "jobId": "worker:daily-report",
          "type": "cron",
          "schedule": "0 0 6 * * ?",
          "scheduledAt": "2025-12-09T06:00:00Z"
        },
        {
          "jobId": "worker:health-check",
          "type": "interval",
          "intervalMs": 30000,
          "scheduledAt": "2025-12-09T10:00:00Z"
        }
      ]
    }
    

    9.2 POST /admin/scheduler/jobs/{jobId}/trigger

    Déclenche un job immédiatement.

    curl -X POST http://localhost:8080/admin/scheduler/jobs/worker:daily-report/trigger
    

    10. Endpoints LogForwarder (V4)

    10.1 GET /admin/logforwarder/status

    État du LogForwarder.

    {
      "enabled": true,
      "transportMode": "http",
      "queueSize": 23,
      "queueCapacity": 10000,
      "fallbackCount": 0,
      "lastFlush": "2025-12-09T10:29:55Z"
    }
    

    10.2 POST /admin/logforwarder/flush

    Force le flush des logs.

    curl -X POST http://localhost:8080/admin/logforwarder/flush
    

    10.3 POST /admin/logforwarder/replay

    Rejoue les logs en fallback.

    curl -X POST http://localhost:8080/admin/logforwarder/replay
    

    11. Implémentation

    package eu.lmvi.socle.admin;
    
    /**
     * REST controller for the Socle administration API, mounted under
     * {@code socle.admin.path-prefix} (default {@code /admin}).
     */
    @RestController
    @RequestMapping("${socle.admin.path-prefix:/admin}")
    public class AdminRestApi {
    
        @Autowired private Supervisor supervisor;
        @Autowired private SharedDataRegistry registry;
        @Autowired private SocleConfiguration config;
        // Optional collaborators, presumably used by the elided endpoints below.
        @Autowired(required = false) private TechDbManager techDb;
        @Autowired(required = false) private Scheduler scheduler;
    
        // === Health ===
    
        /** Global health: HTTP 200 when the supervisor reports HEALTHY, 503 otherwise. */
        @GetMapping("/health")
        public ResponseEntity<HealthResponse> health() {
            HealthStatus status = supervisor.getGlobalHealth();
            HttpStatus httpStatus = status == HealthStatus.HEALTHY
                ? HttpStatus.OK
                : HttpStatus.SERVICE_UNAVAILABLE;
    
            return ResponseEntity.status(httpStatus).body(new HealthResponse(
                status,
                Instant.now(),
                getUptime(),
                supervisor.getUnhealthyWorkers()
            ));
        }
    
        /** Liveness probe: always answers 200 {"status": "UP"}. */
        @GetMapping("/health/live")
        public ResponseEntity<Map<String, String>> live() {
            return ResponseEntity.ok(Map.of("status", "UP"));
        }
    
        /** Readiness probe: 503 listing the unhealthy workers unless global health is HEALTHY. */
        @GetMapping("/health/ready")
        public ResponseEntity<Map<String, Object>> ready() {
            HealthStatus status = supervisor.getGlobalHealth();
            if (status != HealthStatus.HEALTHY) {
                return ResponseEntity.status(503).body(Map.of(
                    "status", "NOT_READY",
                    "unhealthy", supervisor.getUnhealthyWorkers()
                ));
            }
            return ResponseEntity.ok(Map.of("status", "READY"));
        }
    
        // === Workers ===
    
        /** All worker states plus total / healthy / unhealthy counts. */
        @GetMapping("/workers")
        public Map<String, Object> workers() {
            Map<String, WorkerState> states = supervisor.getAllWorkerStates();
            // Count healthy workers once; every other worker is unhealthy.
            long healthy = states.values().stream().filter(WorkerState::isHealthy).count();
            return Map.of(
                "workers", states.values(),
                "total", states.size(),
                "healthy", healthy,
                "unhealthy", states.size() - healthy
            );
        }
    
        /** State of a single worker, or 404 when no worker has that name. */
        @GetMapping("/workers/{name}")
        public ResponseEntity<WorkerState> worker(@PathVariable String name) {
            WorkerState state = supervisor.getWorkerState(name);
            return state != null
                ? ResponseEntity.ok(state)
                : ResponseEntity.notFound().build();
        }
    
        // === Config ===
    
        /**
         * Current non-secret configuration. Built with a LinkedHashMap instead of Map.of so that
         * a getter returning null is serialised as null rather than throwing NullPointerException
         * (HTTP 500), and so the JSON keys keep a stable order.
         */
        @GetMapping("/config")
        public Map<String, Object> config() {
            Map<String, Object> body = new java.util.LinkedHashMap<>();
            body.put("appName", config.getApp_name());
            body.put("envName", config.getEnv_name());
            body.put("region", config.getRegion());
            body.put("version", config.getVersion());
            body.put("execId", config.getExec_id());
            return body;
        }
    
        // === Registry ===
    
        /** Full content of the SharedDataRegistry. */
        @GetMapping("/registry")
        public Map<String, Object> registry() {
            return registry.getAll();
        }
    
        /**
         * Value and health level of one registry key, or 404 when the key is absent.
         * The health level may be null for some keys, which Map.of would reject.
         */
        @GetMapping("/registry/{key}")
        public ResponseEntity<Map<String, Object>> registryKey(@PathVariable String key) {
            return registry.get(key)
                .map(value -> {
                    Map<String, Object> body = new java.util.LinkedHashMap<>();
                    body.put("key", key);
                    body.put("value", value);
                    body.put("healthLevel", registry.getHealthLevel(key));
                    return ResponseEntity.ok(body);
                })
                .orElseGet(() -> ResponseEntity.notFound().build());
        }
    
        // ... other endpoints
    }
    

    12. Sécurité

    12.1 Authentification Basic

    # Avec authentification
    curl -u admin:secret http://localhost:8080/admin/workers
    
    # En-tête Authorization
    curl -H "Authorization: Basic YWRtaW46c2VjcmV0" http://localhost:8080/admin/workers
    

    12.2 Endpoints publics

    Les endpoints suivants sont accessibles sans authentification :

    • /admin/health
    • /admin/health/live
    • /admin/health/ready

    13. Bonnes pratiques

    DO

    • Activer l’authentification en production
    • Utiliser HTTPS pour l’API admin
    • Limiter l’accès réseau à l’API admin
    • Monitorer les accès à l’API admin

    DON’T

    • Ne pas exposer l’API admin publiquement
    • Ne pas désactiver l’authentification en production
    • Ne pas logger les credentials

    14. Références

  • Socle V004 – Worker Registry

    Socle V004 – Worker Registry

    24 – Client Worker Registry (Nouveauté V4)

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le WorkerRegistryClient permet aux applications Socle V4 de s’auto-enregistrer auprès d’un Registry central pour la supervision.

    Bénéfices

    • Visibilité : Savoir quels workers sont actifs par région
    • Supervision : Détecter les workers « LOST » (heartbeat manquant)
    • Diagnostic : informations de version, capacités (capabilities) et charge

    2. Architecture

    ┌─────────────────┐                    ┌─────────────────┐
    │   Application   │                    │  Registry       │
    │   Socle V4      │                    │  (central)      │
    └────────┬────────┘                    └────────┬────────┘
             │                                      │
             │  1. POST /workers/register           │
             │     (au démarrage)                   │
             │─────────────────────────────────────►│
             │                                      │
             │  2. POST /workers/heartbeat          │
             │     (toutes les 30s)                 │
             │─────────────────────────────────────►│
             │                                      │
             │  3. DELETE /workers/{id}             │
             │     (à l'arrêt)                      │
             │─────────────────────────────────────►│
             │                                      │
    
                        ┌─────────────────┐
                        │  Metabase /     │
                        │  Grafana        │◄────── Consultation
                        └─────────────────┘
    

    3. Configuration

    3.1 application.yml

    socle:
      worker-registry:
        enabled: ${WORKER_REGISTRY_ENABLED:false}
        server-url: ${WORKER_REGISTRY_URL:https://registry.lmvi.org}
        heartbeat-interval-ms: ${REGISTRY_HEARTBEAT_INTERVAL_MS:30000}
        connect-timeout-ms: 10000
        read-timeout-ms: 30000
    

    3.2 Variables d’environnement

    Variable Description Défaut
    WORKER_REGISTRY_ENABLED Activer le registry false
    WORKER_REGISTRY_URL URL du Registry https://registry.lmvi.org
    REGISTRY_HEARTBEAT_INTERVAL_MS Intervalle heartbeat (ms) 30000

    4. Interface WorkerRegistryClient

    package eu.lmvi.socle.client.registry;
    
    /**
     * Client through which a Socle worker registers itself with the central Registry,
     * reports its state while running, and removes its entry on shutdown.
     */
    public interface WorkerRegistryClient {
    
        /**
         * Announces this worker to the Registry; meant to be called once at startup.
         *
         * @param registration identity and metadata of the worker
         * @throws RegistryException if registration fails
         */
        void register(WorkerRegistration registration) throws RegistryException;
    
        /**
         * Reports the worker's current state; meant to be called periodically.
         *
         * @param heartbeat current status and load of the worker
         * @throws RegistryException if the heartbeat fails
         */
        void heartbeat(WorkerHeartbeat heartbeat) throws RegistryException;
    
        /**
         * Removes the worker from the Registry; meant to be called at shutdown.
         *
         * @param workerId ID of the worker to remove
         * @throws RegistryException if unregistration fails
         */
        void unregister(String workerId) throws RegistryException;
    
        /**
         * Tells whether the worker is currently registered.
         *
         * @return {@code true} when registered
         */
        boolean isRegistered();
    }
    

    5. DTOs

    5.1 WorkerRegistration

    package eu.lmvi.socle.client.registry;
    
    /**
     * Registration payload a worker sends to the Registry.
     *
     * @param workerId     unique identifier (e.g. "AGENT-DB2-MTQ-001")
     * @param workerType   kind of worker (e.g. "journal-reader")
     * @param region       region code (e.g. "MTQ", "GUA", "REU")
     * @param host         host name of the machine running the worker
     * @param version      application version
     * @param capabilities advertised capabilities (e.g. ["db2-cdc", "nats-publisher"])
     * @param extra        additional free-form metadata
     */
    public record WorkerRegistration(
        String workerId,
        String workerType,
        String region,
        String host,
        String version,
        List<String> capabilities,
        Map<String, Object> extra
    ) {
        /**
         * Derives a registration from the Socle configuration: the exec id becomes the worker id,
         * the application name the worker type; capabilities and extra metadata start empty.
         */
        public static WorkerRegistration of(SocleConfiguration config) {
            String workerId = config.getExec_id();
            String workerType = config.getApp_name();
            String region = config.getRegion();
            String host = getHostname();
            String version = config.getVersion();
            return new WorkerRegistration(workerId, workerType, region, host, version, List.of(), Map.of());
        }
    
        /** Local host name from InetAddress, or "unknown" when it cannot be resolved. */
        private static String getHostname() {
            String hostname;
            try {
                hostname = InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                hostname = "unknown";
            }
            return hostname;
        }
    }
    

    5.2 WorkerHeartbeat

    package eu.lmvi.socle.client.registry;
    
    /**
     * Heartbeat périodique d'un worker
     */
    public record WorkerHeartbeat(
        String workerId,           // ID du worker
        String status,             // RUNNING, STOPPING, ERROR
        Map<String, Object> load   // Métriques de charge
    ) {
        public static WorkerHeartbeat running(String workerId, Map<String, Object> load) {
            return new WorkerHeartbeat(workerId, "RUNNING", load);
        }
    
        public static WorkerHeartbeat stopping(String workerId) {
            return new WorkerHeartbeat(workerId, "STOPPING", Map.of());
        }
    
        public static WorkerHeartbeat error(String workerId, String errorMessage) {
            return new WorkerHeartbeat(workerId, "ERROR", Map.of("error", errorMessage));
        }
    }
    

    6. Implémentation

    package eu.lmvi.socle.client.registry;
    
    @Component
    @ConditionalOnProperty(name = "socle.worker-registry.enabled", havingValue = "true")
    public class HttpWorkerRegistryClient implements WorkerRegistryClient {
    
        private static final Logger log = LoggerFactory.getLogger(HttpWorkerRegistryClient.class);
    
        private final SocleConfiguration config;
        private final SocleAuthClient authClient;
        private final OkHttpClient httpClient;
        private final ObjectMapper objectMapper;
    
        private volatile boolean registered = false;
        private volatile String currentWorkerId;
    
        public HttpWorkerRegistryClient(
                SocleConfiguration config,
                @Autowired(required = false) SocleAuthClient authClient) {
            this.config = config;
            this.authClient = authClient;
            this.objectMapper = new ObjectMapper();
    
            this.httpClient = new OkHttpClient.Builder()
                .connectTimeout(config.getRegistryConnectTimeoutMs(), TimeUnit.MILLISECONDS)
                .readTimeout(config.getRegistryReadTimeoutMs(), TimeUnit.MILLISECONDS)
                .build();
        }
    
        /**
         * Registers this worker with the central registry (POST /api/v1/workers/register).
         *
         * <p>On a 2xx answer the local registration flag and worker id are recorded. Any other
         * status, as well as a serialization or transport error, surfaces as a
         * {@link RegistryException}.
         */
        @Override
        public void register(WorkerRegistration registration) throws RegistryException {
            String workerId = registration.workerId();
            log.info("Registering worker: {} ({})", workerId, registration.workerType());

            try {
                String payload = objectMapper.writeValueAsString(registration);
                String endpoint = config.getRegistryServerUrl() + "/api/v1/workers/register";

                Request.Builder builder = new Request.Builder()
                    .url(endpoint)
                    .post(RequestBody.create(payload, MediaType.parse("application/json")));

                // The bearer token is attached only when an authenticated auth client exists.
                boolean withAuth = authClient != null && authClient.isAuthenticated();
                if (withAuth) {
                    builder.header("Authorization", "Bearer " + authClient.getValidAccessToken());
                }

                try (Response response = httpClient.newCall(builder.build()).execute()) {
                    if (response.isSuccessful()) {
                        registered = true;
                        currentWorkerId = workerId;
                        log.info("Worker registered successfully: {}", workerId);
                    } else {
                        throw new RegistryException("Registration failed: " + response.code());
                    }
                }
            } catch (IOException e) {
                throw new RegistryException("Registration failed", e);
            }
        }
    
        /**
         * Sends one liveness report to the registry (POST /api/v1/workers/heartbeat).
         *
         * <p>Skipped with a warning while the worker is not registered. Non-2xx answers and
         * I/O errors are only logged: a failed heartbeat is deliberately not fatal.
         */
        @Override
        public void heartbeat(WorkerHeartbeat heartbeat) throws RegistryException {
            if (!registered) {
                log.warn("Cannot send heartbeat, worker not registered");
                return;
            }

            log.debug("Sending heartbeat: {} - {}", heartbeat.workerId(), heartbeat.status());

            try {
                String payload = objectMapper.writeValueAsString(heartbeat);
                String endpoint = config.getRegistryServerUrl() + "/api/v1/workers/heartbeat";

                Request.Builder builder = new Request.Builder()
                    .url(endpoint)
                    .post(RequestBody.create(payload, MediaType.parse("application/json")));
                if (authClient != null && authClient.isAuthenticated()) {
                    builder.header("Authorization", "Bearer " + authClient.getValidAccessToken());
                }

                try (Response response = httpClient.newCall(builder.build()).execute()) {
                    boolean delivered = response.isSuccessful();
                    if (!delivered) {
                        // Logged only: heartbeat failure is not critical.
                        log.warn("Heartbeat failed: {}", response.code());
                    }
                }
            } catch (IOException e) {
                // Logged only: heartbeat failure is not critical.
                log.warn("Heartbeat failed: {}", e.getMessage());
            }
        }
    
        /**
         * Removes this worker from the central registry (DELETE /api/v1/workers/{id}), best effort.
         *
         * <p>Failures are logged, never thrown, because unregistering runs during shutdown. Local
         * state is cleared whatever the outcome, so the client stops reporting itself as
         * registered (and stops sending heartbeats) once it has been asked to leave.
         *
         * @param workerId id of the worker to remove; when {@code null}, the id recorded by a
         *                 successful {@code register} call is used instead
         */
        @Override
        public void unregister(String workerId) throws RegistryException {
            if (!registered) {
                return;
            }

            String targetId = workerId != null ? workerId : currentWorkerId;
            log.info("Unregistering worker: {}", targetId);

            try {
                if (targetId == null) {
                    log.warn("Unregister skipped: no worker id available");
                    return;
                }

                // The id becomes one URL path segment: percent-encode it so ids containing '/',
                // '?', '#' or spaces cannot change the target URL. URLEncoder produces form
                // encoding, hence the '+' -> "%20" rewrite for spaces.
                String encodedId = java.net.URLEncoder
                    .encode(targetId, java.nio.charset.StandardCharsets.UTF_8)
                    .replace("+", "%20");

                Request.Builder requestBuilder = new Request.Builder()
                    .url(config.getRegistryServerUrl() + "/api/v1/workers/" + encodedId)
                    .delete();

                if (authClient != null && authClient.isAuthenticated()) {
                    requestBuilder.header("Authorization", "Bearer " + authClient.getValidAccessToken());
                }

                try (Response response = httpClient.newCall(requestBuilder.build()).execute()) {
                    if (response.isSuccessful()) {
                        log.info("Worker unregistered: {}", targetId);
                    } else {
                        // Best effort: do not throw, but do not report success either.
                        log.warn("Unregister failed: {}", response.code());
                    }
                }
            } catch (IOException e) {
                // Don't throw - unregister failure is not critical
                log.warn("Unregister failed: {}", e.getMessage());
            } finally {
                registered = false;
                currentWorkerId = null;
            }
        }
    
        /**
         * Reports the client's local view of its registration.
         *
         * @return the volatile registration flag maintained by {@code register}/{@code unregister}
         */
        @Override
        public boolean isRegistered() {
            return this.registered;
        }
    }
    

    7. Intégration avec MOP

    7.1 Enregistrement au démarrage

    // Dans MainOrchestratorProcess.start()
    if (registryClient != null && config.isWorkerRegistryEnabled()) {
        log.info("[step:registry_register] Enregistrement au Worker Registry");
        try {
            WorkerRegistration registration = new WorkerRegistration(
                config.getExec_id(),
                config.getApp_name(),
                config.getRegion(),
                InetAddress.getLocalHost().getHostName(),
                config.getVersion(),
                getWorkerCapabilities(),
                Map.of(
                    "startTime", Instant.now(),
                    "javaVersion", System.getProperty("java.version")
                )
            );
            registryClient.register(registration);
        } catch (RegistryException e) {
            log.warn("Registry registration failed, continuing", e);
        }
    }
    

    7.2 Heartbeat périodique

    // Called from the main loop or from a ScheduledExecutorService.
    /**
     * Sends one heartbeat with current load metrics. Does nothing when there is no registry
     * client or the worker is not registered. Never throws: failures are only logged.
     */
    private void sendRegistryHeartbeat() {
        if (registryClient == null || !registryClient.isRegistered()) {
            return;
        }

        try {
            WorkerHeartbeat heartbeat = new WorkerHeartbeat(
                config.getExec_id(),
                "RUNNING",
                Map.of(
                    "uptime", getUptime(),
                    "workersCount", workers.size(),
                    "healthyWorkers", countHealthyWorkers(),
                    "memoryUsedMb", getMemoryUsedMb()
                )
            );
            registryClient.heartbeat(heartbeat);
        } catch (RegistryException e) {
            log.debug("Heartbeat failed: {}", e.getMessage());
        } catch (RuntimeException e) {
            // A periodic task run by a ScheduledExecutorService is silently cancelled as soon as
            // one execution throws; swallow and log so a single bad tick (e.g. a null metric
            // rejected by Map.of) does not stop every future heartbeat.
            log.warn("Heartbeat failed unexpectedly", e);
        }
    }
    

    7.3 Désenregistrement à l’arrêt

    // Dans MainOrchestratorProcess.gracefulShutdown()
    if (registryClient != null && registryClient.isRegistered()) {
        log.info("[step:registry_unregister] Désenregistrement du Registry");
        try {
            registryClient.unregister(config.getExec_id());
        } catch (RegistryException e) {
            log.warn("Unregister failed", e);
        }
    }
    

    8. Exemple de données

    8.1 Registration

    {
      "workerId": "AGENT-DB2-MTQ-001",
      "workerType": "journal-reader",
      "region": "MTQ",
      "host": "mtq-nuc-01",
      "version": "4.0.0",
      "capabilities": ["db2-cdc", "nats-publisher"],
      "extra": {
        "journal": "QUSR0023",
        "library": "IMA001FDMQ",
        "startTime": "2025-12-09T10:00:00Z",
        "javaVersion": "21.0.1"
      }
    }
    

    8.2 Heartbeat

    {
      "workerId": "AGENT-DB2-MTQ-001",
      "status": "RUNNING",
      "load": {
        "uptime": 3600,
        "messagesPerMinute": 523,
        "lastSequence": 123456789,
        "memoryUsedMb": 256,
        "cpuPercent": 15
      }
    }
    

    9. Côté serveur (Registry central)

    9.1 Table worker_registry

    -- Central registry (PostgreSQL): one row per worker.
    -- worker_id is the natural key; its UNIQUE constraint already creates an index.
    CREATE TABLE worker_registry (
        id BIGSERIAL PRIMARY KEY,
        worker_id VARCHAR(200) NOT NULL UNIQUE,
        worker_type VARCHAR(100) NOT NULL,
        region VARCHAR(50),
        host VARCHAR(200),
        version VARCHAR(50),
        -- 'UNKNOWN' until a status is written; NOTE(review): presumably the server copies the
        -- heartbeat "status" here and the LOST job below sets 'LOST' — confirm server-side.
        status VARCHAR(20) DEFAULT 'UNKNOWN',
        registered_at TIMESTAMPTZ DEFAULT NOW(),
        -- NULL until the first heartbeat is received
        last_heartbeat TIMESTAMPTZ,
        capabilities JSONB,
        -- free-form registration metadata: never store sensitive data here
        extra JSONB,
        -- presumably the "load" object of the latest heartbeat — confirm server-side
        load JSONB
    );

    -- Filters used by the dashboards and the LOST detection job
    CREATE INDEX idx_worker_registry_region ON worker_registry(region);
    CREATE INDEX idx_worker_registry_type ON worker_registry(worker_type);
    CREATE INDEX idx_worker_registry_status ON worker_registry(status);
    

    9.2 Détection des workers LOST

    -- Mark as LOST every live worker whose last sign of life is older than 2 minutes
    -- (4 missed heartbeats at the recommended 30 s interval).
    -- COALESCE covers workers that registered but never sent a heartbeat: with
    -- last_heartbeat IS NULL, a plain "<" comparison is never true and they would stay live forever.
    -- 'UNKNOWN' is the column default, i.e. registered but not yet reporting RUNNING.
    UPDATE worker_registry
    SET status = 'LOST'
    WHERE status IN ('RUNNING', 'UNKNOWN')
      AND COALESCE(last_heartbeat, registered_at) < NOW() - INTERVAL '2 minutes';
    

    9.3 Dashboard Metabase/Grafana

    -- Active (RUNNING) workers per region.
    -- worker_id is NOT NULL, so COUNT(worker_id) equals COUNT(*).
    SELECT region,
           COUNT(worker_id) AS count
    FROM worker_registry
    WHERE status = 'RUNNING'
    GROUP BY region;

    -- Workers currently flagged LOST, with the time of their last heartbeat
    SELECT worker_id,
           region,
           last_heartbeat
    FROM worker_registry
    WHERE status = 'LOST';
    

    10. Bonnes pratiques

    DO

    • ✅ Enregistrer au démarrage, désenregistrer à l’arrêt
    • ✅ Heartbeat régulier (30s recommandé)
    • ✅ Inclure des métriques utiles dans le heartbeat
    • ✅ Gérer gracieusement les échecs (non bloquant)

    DON’T

    • ❌ Heartbeat trop fréquent (< 10s)
    • ❌ Bloquer sur les erreurs registry
    • ❌ Stocker des données sensibles dans extra

    11. Troubleshooting

    Worker non visible dans le dashboard

    1. Vérifier WORKER_REGISTRY_ENABLED=true
    2. Vérifier WORKER_REGISTRY_URL
    3. Vérifier les logs : « Worker registered successfully »

    Worker marqué LOST

    1. Vérifier que l’application tourne
    2. Vérifier la connectivité réseau
    3. Vérifier les logs heartbeat

    Erreur 401

    1. Vérifier que AUTH_ENABLED=true
    2. Vérifier API_KEY
    3. Vérifier que l’auth fonctionne

    12. Références

  • Socle V004 – Standard TechDB

    Socle V004 – Standard TechDB

    28 – Standard de Creation de Table H2

    Version : 4.0.1 Date : 2026-01-13 Package : eu.lmvi.socle.techdb

    Introduction

    Ce document définit le standard de création des tables dans la base technique H2 du Socle V004. Ce standard garantit la cohérence, la traçabilité et la compatibilité PostgreSQL/H2 des tables.

    Structure de Base

    Toutes les tables TechDB doivent suivre cette structure:

    Champ Type Description
    x_id BIGINT GENERATED BY DEFAULT AS IDENTITY Identifiant technique auto-genere
    x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP Date creation
    x_dateChanged TIMESTAMP WITH TIME ZONE Date modification (NULL a l’insert, MAJ par appli)
    x_sub VARCHAR(255) Sujet/categorie
    x_partition VARCHAR(30) Partition logique
    x_comment CLOB Commentaires/historiques JSON texte
    [champs metier] Champs specifiques a la table
    datas CLOB Donnees metier JSON texte (toujours en fin)

    Regles

    Identite

    • Utiliser GENERATED BY DEFAULT AS IDENTITY, pas AUTO_INCREMENT (MySQL)
    • Pas de sequence explicite
    • La colonne x_id est toujours la cle primaire
    -- Correct (SQL standard / H2 / PostgreSQL)
    x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY
    
    -- Incorrect (MySQL uniquement)
    id BIGINT AUTO_INCREMENT PRIMARY KEY
    

    Timestamps

    • x_dateCreated: Toujours NOT NULL DEFAULT CURRENT_TIMESTAMP
    • x_dateChanged: NULL a l’insertion, mis a jour par l’application
    • Utiliser TIMESTAMP WITH TIME ZONE pour la compatibilite
    x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    x_dateChanged TIMESTAMP WITH TIME ZONE,
    

    Triggers

    • Aucun trigger dans H2
    • L’audit, l’historisation et la mise a jour de x_dateChanged sont geres par l’application

    Cle Primaire

    • Si une cle existante est presente (ex: worker_name), la conserver comme UNIQUE
    • x_id reste la PK technique
    x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    worker_name VARCHAR(255) NOT NULL UNIQUE,
    

    Conventions de Nommage

    Element Convention Exemple
    Champs techniques Prefixe x_ x_id, x_dateCreated
    Cles etrangeres id_<table_cible> id_user, id_order
    Champ id existant Renommer en x_id
    Donnees JSON datas en derniere position

    Permissions

    • Droits geres au niveau utilisateur H2
    • Proprietaire = utilisateur createur (socle par defaut)

    Contraintes H2 vs PostgreSQL

    PostgreSQL H2
    JSONB CLOB
    Triggers natifs Logique applicative
    Validation JSON DB Validation applicative
    SERIAL GENERATED BY DEFAULT AS IDENTITY
    Index GIN sur JSON Non supporte

    Exemple DDL Complet

    CREATE TABLE IF NOT EXISTS techdb_example (
        -- Technical columns (standard)
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,

        -- Table-specific business columns
        example_key VARCHAR(255) NOT NULL UNIQUE,
        status VARCHAR(50) NOT NULL,
        counter INT DEFAULT 0,
        last_activity TIMESTAMP WITH TIME ZONE,

        -- JSON data (always last)
        datas CLOB
    );

    -- Recommended indexes.
    -- No explicit index on example_key: its UNIQUE constraint already creates one in both H2 and
    -- PostgreSQL, so a separate CREATE INDEX would duplicate it and double the write cost.
    CREATE INDEX IF NOT EXISTS idx_example_status ON techdb_example(status);
    CREATE INDEX IF NOT EXISTS idx_example_created ON techdb_example(x_dateCreated);
    

    Tables TechDB du Socle

    Le Socle V004 definit 5 tables techniques:

    techdb_offsets

    Stockage des offsets de consommation (Kafka, NATS, etc.)

    -- Consumption offsets (Kafka, NATS, ...): one row per offset_key.
    -- offset_key is UNIQUE, which already indexes it.
    -- NOTE(review): offset_key presumably combines topic/partition/consumer group — confirm in the writer.
    CREATE TABLE IF NOT EXISTS techdb_offsets (
        -- standard technical columns
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
        -- business columns
        offset_key VARCHAR(255) NOT NULL UNIQUE,
        topic VARCHAR(255) NOT NULL,
        partition_id INT DEFAULT 0,
        offset_value BIGINT NOT NULL,
        consumer_group VARCHAR(255),
        -- JSON data, always last
        datas CLOB
    );
    

    techdb_worker_state

    Etat persistant des Workers

    -- Persistent worker state: one row per worker_name (UNIQUE, hence indexed).
    -- x_id stays the technical PK; worker_name is the preserved business key.
    CREATE TABLE IF NOT EXISTS techdb_worker_state (
        -- standard technical columns
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
        -- business columns
        worker_name VARCHAR(255) NOT NULL UNIQUE,
        state VARCHAR(50) NOT NULL,
        last_run_at TIMESTAMP WITH TIME ZONE,
        next_run_at TIMESTAMP WITH TIME ZONE,
        error_count INT DEFAULT 0,
        last_error CLOB,
        -- JSON data, always last
        datas CLOB
    );
    

    techdb_events

    Evenements techniques

    -- Technical events. processed/processed_at are set by the application (no triggers in H2).
    -- NOTE(review): if consumers poll "WHERE processed = FALSE", an index on processed may help.
    CREATE TABLE IF NOT EXISTS techdb_events (
        -- standard technical columns
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
        -- business columns
        event_type VARCHAR(100) NOT NULL,
        source VARCHAR(255) NOT NULL,
        processed BOOLEAN DEFAULT FALSE,
        processed_at TIMESTAMP WITH TIME ZONE,
        -- JSON event payload, always last
        datas CLOB
    );
    

    techdb_log_buffer

    Buffer de logs pour LogForwarder

    -- Local log buffer for the LogForwarder. forwarded/forwarded_at are set by the application.
    -- log_timestamp is the time of the log event; x_dateCreated is the time the row was buffered.
    -- NOTE(review): presumably drained with "WHERE forwarded = FALSE" — consider an index if it grows.
    CREATE TABLE IF NOT EXISTS techdb_log_buffer (
        -- standard technical columns
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
        -- business columns
        log_level VARCHAR(20) NOT NULL,
        logger_name VARCHAR(255),
        message CLOB NOT NULL,
        thread_name VARCHAR(255),
        log_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
        forwarded BOOLEAN DEFAULT FALSE,
        forwarded_at TIMESTAMP WITH TIME ZONE,
        -- JSON data, always last
        datas CLOB
    );
    

    techdb_kv

    Stockage cle-valeur generique

    -- Generic key/value store: one row per kv_key (UNIQUE, hence indexed).
    -- The value itself lives in datas; value_type describes how to read it ('string' by default).
    -- NOTE(review): expires_at is presumably enforced by application code (no triggers in H2).
    CREATE TABLE IF NOT EXISTS techdb_kv (
        -- standard technical columns
        x_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        x_dateCreated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
        x_dateChanged TIMESTAMP WITH TIME ZONE,
        x_sub VARCHAR(255),
        x_partition VARCHAR(30),
        x_comment CLOB,
        -- business columns
        kv_key VARCHAR(512) NOT NULL UNIQUE,
        value_type VARCHAR(50) DEFAULT 'string',
        expires_at TIMESTAMP WITH TIME ZONE,
        -- JSON data, always last
        datas CLOB
    );
    

    Bonnes Pratiques

    DO

    • Toujours utiliser le format standard x_ pour les champs techniques
    • Mettre datas en derniere colonne
    • Creer des index sur les colonnes frequemment requetees
    • Utiliser TIMESTAMP WITH TIME ZONE pour tous les timestamps
    • Documenter le contenu JSON attendu dans datas

    DON’T

    • Ne pas utiliser AUTO_INCREMENT (syntaxe MySQL)
    • Ne pas creer de triggers (gestion applicative)
    • Ne pas stocker de BLOBs volumineux (utiliser stockage externe)
    • Ne pas utiliser JSONB (non supporte par H2)
    • Ne pas omettre les index sur les colonnes de recherche

    Migration depuis l’ancien format

    Si vous avez des tables existantes avec l’ancien format:

    -- 1. Back up the data
    CREATE TABLE techdb_events_backup AS SELECT * FROM techdb_events;
    -- NOTE(review): compare row counts of techdb_events and the backup before dropping anything.

    -- 2. Drop the old table
    DROP TABLE techdb_events;

    -- 3. Recreate it with the new format
    -- (see the DDL above)

    -- 4. Migrate the data
    -- x_id and x_dateCreated are not listed: rows get new identities and a creation date equal to
    -- the migration time. If the old table had a creation-timestamp column, map it to
    -- x_dateCreated to keep history. Assumption: the old JSON column is named "payload" — adapt.
    INSERT INTO techdb_events (event_type, source, processed, processed_at, datas)
    SELECT event_type, source, processed, processed_at, payload
    FROM techdb_events_backup;

    -- 5. Drop the backup
    DROP TABLE techdb_events_backup;
    

    Voir aussi

    H2 est réservé au développement, aux tests et à l’outillage ; la structure des tables reste compatible PostgreSQL/H2.

    Socle V004 – Standard TechDB

  • Socle V004 – Configuration

    Socle V004 – Configuration

    04 – Configuration

    Version : 4.0.0 Date : 2025-01-25

    1. Introduction

    Le Socle V4 utilise une configuration centralisée via SocleConfiguration qui charge les paramètres depuis application.yml et les variables d’environnement.

    Priorité de configuration

    1. Variables d'environnement (prioritaires sur application.yml ; seuls les arguments de ligne de commande --clé=valeur les surchargent)
    2. application.yml
    3. Valeurs par défaut dans le code
    

    2. Fichier application.yml

    2.1 Configuration minimale

    # Each ${VAR:default} placeholder reads the environment variable VAR and falls back to the
    # text after ':' when it is not set.
    socle:
      app_name: ${APP_NAME:my-app}
      env_name: ${ENV_NAME:DEV}
      # Without EXEC_ID: "<app_name>-<random uuid>"
      exec_id: ${EXEC_ID:${socle.app_name}-${random.uuid}}
      region: ${REGION:local}
      version: ${APP_VERSION:4.0.0}

    spring:
      application:
        name: ${socle.app_name}

    server:
      port: ${HTTP_PORT:8080}

    logging:
      config: classpath:log4j2.xml
    

    2.2 Configuration complète

    # Each ${VAR:default} placeholder reads the environment variable VAR and falls back to the
    # text after ':'. An empty fallback (e.g. ${API_KEY:}) resolves to "" — not to null.
    socle:
      # === Identification ===
      app_name: ${APP_NAME:socle-v4}
      env_name: ${ENV_NAME:DEV}
      exec_id: ${EXEC_ID:${socle.app_name}-${random.uuid}}
      region: ${REGION:local}
      version: ${APP_VERSION:4.0.0}

      # === HTTP Server ===
      http:
        enabled: ${HTTP_ENABLED:true}
        port: ${HTTP_PORT:8080}
        context-path: ${CONTEXT_PATH:/}

      # === KvBus ===
      kvbus:
        mode: ${KVBUS_MODE:in_memory}
        redis:
          host: ${REDIS_HOST:localhost}
          port: ${REDIS_PORT:6379}
          password: ${REDIS_PASSWORD:}
          database: ${REDIS_DATABASE:0}
          prefix: ${REDIS_PREFIX:socle}

      # === Supervisor ===
      supervisor:
        heartbeat-interval-ms: ${SUPERVISOR_HEARTBEAT_MS:10000}
        unhealthy-threshold: ${SUPERVISOR_UNHEALTHY_THRESHOLD:3}
        check-interval-ms: ${SUPERVISOR_CHECK_INTERVAL_MS:5000}
        stale-timeout-ms: ${SUPERVISOR_STALE_TIMEOUT_MS:60000}

      # === StatusDashboard (V4) ===
      status_dashboard:
        enabled: ${STATUS_DASHBOARD_ENABLED:true}
        port: ${STATUS_DASHBOARD_PORT:9374}
        refresh_interval: ${STATUS_DASHBOARD_REFRESH:5}

      # === Scheduler ===
      scheduler:
        enabled: ${SCHEDULER_ENABLED:true}
        thread-pool-size: ${SCHEDULER_POOL_SIZE:4}

      # === Admin API ===
      # DEV defaults only: the Admin API is enabled WITHOUT authentication and the fallback
      # password is "admin". Outside DEV, set ADMIN_AUTH_ENABLED=true and provide
      # ADMIN_PASSWORD from a secret store.
      admin:
        enabled: ${ADMIN_ENABLED:true}
        auth:
          enabled: ${ADMIN_AUTH_ENABLED:false}
          username: ${ADMIN_USERNAME:admin}
          password: ${ADMIN_PASSWORD:admin}

      # === TechDB (V4) ===
      techdb:
        enabled: ${TECHDB_ENABLED:true}
        url: jdbc:h2:file:${TECHDB_PATH:./data/socle-techdb};MODE=PostgreSQL;DB_CLOSE_DELAY=-1
        username: socle
        # DEV fallback; provide TECHDB_PASSWORD outside DEV
        password: ${TECHDB_PASSWORD:socle}
        console:
          enabled: ${H2_CONSOLE_ENABLED:false}
          path: /h2-console

      # === Logging (V4) ===
      logging:
        forwarder:
          enabled: ${LOG_FORWARDER_ENABLED:false}
          transport-mode: ${LOG_TRANSPORT_MODE:http}
          log-hub-url: ${LOG_HUB_URL:}
          nats-url: ${NATS_URL:}
          batch-size: ${LOG_BATCH_SIZE:100}
          flush-interval-ms: ${LOG_FLUSH_INTERVAL_MS:5000}

      # === Auth Client (V4) ===
      auth:
        enabled: ${AUTH_ENABLED:false}
        server-url: ${AUTH_SERVER_URL:}
        source-name: ${SOURCE_NAME:${socle.app_name}}
        # Must be set when AUTH_ENABLED=true (an unset key binds as an empty string)
        api-key: ${API_KEY:}

      # === Worker Registry (V4) ===
      worker-registry:
        enabled: ${WORKER_REGISTRY_ENABLED:false}
        # Must be set when WORKER_REGISTRY_ENABLED=true
        server-url: ${WORKER_REGISTRY_URL:}
        heartbeat-interval-ms: ${REGISTRY_HEARTBEAT_MS:30000}

    spring:
      application:
        name: ${socle.app_name}

    server:
      # Reuses socle.http.port so the two settings cannot diverge
      port: ${socle.http.port}

    logging:
      config: classpath:log4j2.xml
    

    3. Variables d’environnement

    3.1 Variables essentielles

    Variable Description Défaut Obligatoire
    APP_NAME Nom de l’application socle-v4 Recommandé
    ENV_NAME Environnement (DEV/STAGING/PROD) DEV Recommandé
    REGION Région géographique local Recommandé
    HTTP_PORT Port HTTP 8080 Non

    3.2 Variables KvBus

    Variable Description Défaut
    KVBUS_MODE Mode (in_memory/redis) in_memory
    REDIS_HOST Hôte Redis localhost
    REDIS_PORT Port Redis 6379
    REDIS_PASSWORD Mot de passe Redis
    REDIS_DATABASE Database Redis 0
    REDIS_PREFIX Préfixe des clés socle

    3.3 Variables Supervisor et Dashboard

    Variable Description Défaut
    SUPERVISOR_HEARTBEAT_MS Intervalle heartbeat attendu 10000 (10s)
    SUPERVISOR_UNHEALTHY_THRESHOLD Heartbeats manqués avant UNHEALTHY 3
    SUPERVISOR_CHECK_INTERVAL_MS Fréquence de vérification 5000 (5s)
    SUPERVISOR_STALE_TIMEOUT_MS Timeout avant STALE 60000 (1min)
    STATUS_DASHBOARD_ENABLED Activer le dashboard true
    STATUS_DASHBOARD_PORT Port du dashboard 9374
    STATUS_DASHBOARD_REFRESH Rafraîchissement (secondes) 5

    3.4 Variables V4

    Variable Description Défaut
    TECHDB_ENABLED Activer H2 TechDB true
    TECHDB_PATH Chemin fichier H2 ./data/socle-techdb
    H2_CONSOLE_ENABLED Console H2 web false
    LOG_FORWARDER_ENABLED Activer LogForwarder false
    LOG_TRANSPORT_MODE Mode transport logs http
    AUTH_ENABLED Activer auth JWT false
    WORKER_REGISTRY_ENABLED Activer registry false

    4. Classe SocleConfiguration

    package eu.lmvi.socle.config;

    /**
     * Root of the {@code socle.*} configuration tree, bound from application.yml and
     * environment variables. Each nested object maps one YAML sub-section.
     */
    @Configuration
    @ConfigurationProperties(prefix = "socle")
    public class SocleConfiguration {

        // --- Identification ---------------------------------------------------
        private String app_name = "socle-v4";
        private String env_name = "DEV";
        private String exec_id;
        private String region = "local";
        private String version = "4.0.0";

        // --- Nested sections --------------------------------------------------
        private HttpConfig http = new HttpConfig();
        private KvBusConfig kvbus = new KvBusConfig();
        private SupervisorConfig supervisor = new SupervisorConfig();
        private TechDbConfig techdb = new TechDbConfig();                          // V4
        private LoggingConfig logging = new LoggingConfig();                       // V4
        private AuthConfig auth = new AuthConfig();                                // V4
        private WorkerRegistryConfig workerRegistry = new WorkerRegistryConfig();  // V4

        // Getters / Setters...

        /**
         * Ensures an execution id exists after binding. When none was configured, derives one
         * from the application name and the first 8 characters of a random UUID.
         */
        @PostConstruct
        public void init() {
            boolean execIdMissing = exec_id == null || exec_id.isEmpty();
            if (!execIdMissing) {
                return;
            }
            String randomSuffix = UUID.randomUUID().toString().substring(0, 8);
            exec_id = app_name + "-" + randomSuffix;
        }
    }
    

    4.1 Sous-configurations

    /** socle.http.* — embedded HTTP server. */
    public static class HttpConfig {
        private boolean enabled = true;
        private int port = 8080;
        private String contextPath = "/";
    }

    /** socle.kvbus.* — key/value bus backend: "in_memory" or "redis". */
    public static class KvBusConfig {
        private String mode = "in_memory";
        private RedisConfig redis = new RedisConfig();
    }

    /** socle.techdb.* — local H2 technical database. */
    public static class TechDbConfig {
        private boolean enabled = true;
        // Same default as application.yml. MODE=PostgreSQL keeps H2 aligned with the
        // PostgreSQL-compatible TechDB standard, and DB_CLOSE_DELAY=-1 keeps the database open
        // after the last connection closes instead of closing it.
        private String url = "jdbc:h2:file:./data/socle-techdb;MODE=PostgreSQL;DB_CLOSE_DELAY=-1";
        private String username = "socle";
        // Development default only; override with TECHDB_PASSWORD outside DEV.
        private String password = "socle";
        private ConsoleConfig console = new ConsoleConfig();
    }

    /** socle.logging.* — log forwarding. */
    public static class LoggingConfig {
        private ForwarderConfig forwarder = new ForwarderConfig();
    }

    /** socle.auth.* — JWT auth client; disabled by default. */
    public static class AuthConfig {
        private boolean enabled = false;
        private String serverUrl;
        private String sourceName;
        private String apiKey;
    }

    /** socle.worker-registry.* — central Worker Registry client; disabled by default. */
    public static class WorkerRegistryConfig {
        private boolean enabled = false;
        private String serverUrl;
        private long heartbeatIntervalMs = 30000;
    }
    

    5. Accès à la configuration

    5.1 Injection

    /** Example service reading configuration values through the injected SocleConfiguration. */
    @Service
    public class MonService {

        @Autowired
        private SocleConfiguration config;

        public void doSomething() {
            // Plain getters on the bound configuration; nested sections expose their own getters.
            boolean techDbEnabled = config.getTechdb().isEnabled();
            String region = config.getRegion();
            String appName = config.getApp_name();
        }
    }
    

    5.2 Dans un Worker

    /** Example worker that receives the configuration through its constructor. */
    public class MonWorker implements Worker {

        private final SocleConfiguration config;

        public MonWorker(SocleConfiguration config) {
            this.config = config;
        }

        @Override
        public void doWork() {
            String region = config.getRegion();
            log.info("Running in region: {}", region);
        }
    }
    

    6. Profils Spring

    6.1 Activation

    # Via variable d'environnement
    export SPRING_PROFILES_ACTIVE=prod
    
    # Via ligne de commande
    java -jar app.jar --spring.profiles.active=prod
    

    6.2 Fichiers par profil

    src/main/resources/
    ├── application.yml           # Configuration de base
    ├── application-dev.yml       # Overrides DEV
    ├── application-staging.yml   # Overrides STAGING
    └── application-prod.yml      # Overrides PROD
    

    6.3 Exemple application-prod.yml

    socle:
      env_name: PROD
      techdb:
        console:
          enabled: false
      # The Admin API defaults to no authentication (DEV setting): always protect it in PROD.
      # ADMIN_PASSWORD must then be provided (e.g. from a Kubernetes Secret); the
      # application.yml fallback "admin" is for DEV only.
      admin:
        auth:
          enabled: true
      logging:
        forwarder:
          enabled: true
      auth:
        enabled: true
      worker-registry:
        enabled: true

    logging:
      config: classpath:log4j2-prod.xml
    

    7. Configuration Docker

    7.1 Dockerfile

    FROM eclipse-temurin:21-jre

    WORKDIR /app

    # Run as an unprivileged user. /app/data (TECHDB_PATH) must stay writable for the H2 file;
    # when bind-mounting it, make the host directory writable by this user.
    RUN groupadd --system socle \
        && useradd --system --gid socle --home-dir /app --no-create-home socle \
        && mkdir -p /app/data \
        && chown -R socle:socle /app

    COPY --chown=socle:socle target/socle-v004-4.0.0.jar app.jar

    # Default values (can be overridden at run time)
    ENV APP_NAME=socle-v4
    ENV ENV_NAME=PROD
    ENV HTTP_PORT=8080
    ENV TECHDB_PATH=/app/data/techdb

    USER socle

    EXPOSE 8080

    ENTRYPOINT ["java", "-jar", "app.jar"]
    

    7.2 docker-compose.yml

    # No top-level "version": it is obsolete in the Compose Specification (Compose v2 ignores it
    # and prints a warning).
    services:
      socle-app:
        image: socle-v4:latest
        environment:
          - APP_NAME=my-service
          - ENV_NAME=PROD
          - REGION=MTQ
          - KVBUS_MODE=redis
          - REDIS_HOST=redis
          - LOG_FORWARDER_ENABLED=true
          - LOG_HUB_URL=http://loghub:8080/api/ingest-logs
        ports:
          - "8080:8080"
        volumes:
          - ./data:/app/data
        depends_on:
          - redis

      redis:
        image: redis:7-alpine
        # socle-app reaches Redis over the Compose network (REDIS_HOST=redis). This instance has
        # no password, so it is published on the loopback interface only.
        ports:
          - "127.0.0.1:6379:6379"
    

    8. Configuration Kubernetes

    8.1 ConfigMap

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: socle-config
    data:
      APP_NAME: "my-service"
      ENV_NAME: "PROD"
      REGION: "MTQ"
      KVBUS_MODE: "redis"
      LOG_FORWARDER_ENABLED: "true"
    

    8.2 Secret

    # Example values only: never commit real secrets. Create this Secret outside Git
    # (kubectl create secret generic ..., or an external secret manager).
    apiVersion: v1
    kind: Secret
    metadata:
      name: socle-secrets
    type: Opaque
    # stringData accepts plain-text values; they are exposed to the pod via envFrom/secretRef.
    stringData:
      REDIS_PASSWORD: "secret-password"
      API_KEY: "my-api-key"
      TECHDB_PASSWORD: "techdb-password"
    

    8.3 Deployment

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: socle-app
    spec:
      replicas: 2
      # apps/v1 requires spec.selector, and it must match the pod template labels;
      # without them the API server rejects the manifest.
      selector:
        matchLabels:
          app: socle-app
      template:
        metadata:
          labels:
            app: socle-app
        spec:
          containers:
            - name: socle
              image: socle-v4:latest
              # Every ConfigMap/Secret key becomes an environment variable
              envFrom:
                - configMapRef:
                    name: socle-config
                - secretRef:
                    name: socle-secrets
              ports:
                - containerPort: 8080
    

    9. Validation de configuration

    9.1 Validation au démarrage

    /**
     * Validates the bound configuration once the application is ready.
     * Throws {@link IllegalStateException} when a required or consistent value is missing.
     */
    @Component
    public class ConfigurationValidator implements ApplicationListener<ApplicationReadyEvent> {

        @Autowired
        private SocleConfiguration config;

        @Override
        public void onApplicationEvent(ApplicationReadyEvent event) {
            validateRequired();
            validateConsistency();
        }

        /** Values that must always be present. */
        private void validateRequired() {
            if (isBlank(config.getApp_name())) {
                throw new IllegalStateException("APP_NAME is required");
            }
        }

        /** Values required by enabled features. */
        private void validateConsistency() {
            // application.yml binds the key as ${API_KEY:}, so an unset API_KEY arrives as ""
            // rather than null: a null-only check would let AUTH_ENABLED=true start without a key.
            if (config.getAuth().isEnabled() && isBlank(config.getAuth().getApiKey())) {
                throw new IllegalStateException("API_KEY required when AUTH_ENABLED=true");
            }
        }

        /** Returns true when {@code value} is null, empty or whitespace only. */
        private static boolean isBlank(String value) {
            return value == null || value.isBlank();
        }
    }
    

    9.2 Endpoint de configuration

    GET /admin/config
    

    Retourne la configuration actuelle (sans les secrets).

    10. Bonnes pratiques

    DO

    • Utiliser les variables d’environnement pour les valeurs spécifiques à l’environnement
    • Définir des valeurs par défaut sensées
    • Utiliser des profils Spring pour les environnements
    • Valider la configuration au démarrage
    • Ne jamais committer de secrets

    DON’T

    • Ne pas hardcoder de valeurs dans le code
    • Ne pas mettre de mots de passe dans application.yml
    • Ne pas utiliser de valeurs par défaut dangereuses en prod

    11. Références

  • Socle V004 – Métriques

    Socle V004 – Métriques

    15 – Metrics

    Version : 4.0.0 Date : 2025-12-09

    1. Introduction

    Le Socle V4 expose des métriques au format Prometheus pour le monitoring et l’alerting.

    Types de métriques

    • Counter : Valeur qui ne fait qu’augmenter (requêtes, erreurs)
    • Gauge : Valeur qui peut monter et descendre (connexions actives)
    • Histogram : Distribution de valeurs (latences)
    • Summary : Similaire à un histogram, mais avec des quantiles calculés côté application (non agrégeables entre instances)

    2. Configuration

    2.1 application.yml

    management:
      endpoints:
        web:
          exposure:
            include: prometheus,health,info,metrics
          base-path: /actuator
      endpoint:
        prometheus:
          enabled: true
      # Spring Boot 3 renamed management.metrics.export.prometheus.* to
      # management.prometheus.metrics.export.*; the old key is no longer honoured.
      prometheus:
        metrics:
          export:
            enabled: true
      metrics:
        # Common tags added to every meter
        tags:
          application: ${socle.app_name}
          environment: ${socle.env_name}
          region: ${socle.region}
    

    2.2 Dépendances Maven

    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>
    

    3. Métriques Socle

    3.1 Métriques Workers

    # Nombre de workers
    socle_workers_total{application="socle-v4"} 5
    
    # Workers healthy
    socle_workers_healthy{application="socle-v4"} 5
    
    # Workers unhealthy
    socle_workers_unhealthy{application="socle-v4"} 0
    
    # État par worker
    socle_worker_status{worker="kafka-consumer",status="RUNNING"} 1
    socle_worker_status{worker="order-processor",status="RUNNING"} 1
    
    # Heartbeats par worker
    socle_worker_heartbeats_total{worker="kafka-consumer"} 1234
    socle_worker_missed_heartbeats{worker="kafka-consumer"} 0
    

    3.2 Métriques KvBus

    # Opérations
    socle_kvbus_operations_total{operation="get"} 12345
    socle_kvbus_operations_total{operation="put"} 6789
    socle_kvbus_operations_total{operation="delete"} 234
    
    # Latence
    socle_kvbus_operation_duration_seconds{operation="get",quantile="0.5"} 0.001
    socle_kvbus_operation_duration_seconds{operation="get",quantile="0.95"} 0.005
    socle_kvbus_operation_duration_seconds{operation="get",quantile="0.99"} 0.01
    
    # Nombre de clés
    socle_kvbus_keys_count 456
    

    3.3 Métriques Pipeline

    # Exécutions
    socle_pipeline_executions_total{pipeline="order-processing",status="SUCCESS"} 1234
    socle_pipeline_executions_total{pipeline="order-processing",status="FAILURE"} 12
    
    # Durée
    socle_pipeline_duration_seconds{pipeline="order-processing",quantile="0.5"} 0.5
    socle_pipeline_duration_seconds{pipeline="order-processing",quantile="0.95"} 2.0
    socle_pipeline_duration_seconds{pipeline="order-processing",quantile="0.99"} 5.0
    
    # Steps
    socle_pipeline_step_duration_seconds{step="validation",quantile="0.5"} 0.01
    socle_pipeline_step_duration_seconds{step="processing",quantile="0.5"} 0.3
    

    3.4 Métriques Resilience

    # Circuit breaker état (0=CLOSED, 1=HALF_OPEN, 2=OPEN)
    socle_circuit_breaker_state{name="payment-gateway"} 0
    
    # Tentatives de retry
    socle_retry_attempts_total{operation="external-api",attempt="1",success="true"} 1000
    socle_retry_attempts_total{operation="external-api",attempt="2",success="true"} 50
    socle_retry_attempts_total{operation="external-api",attempt="3",success="false"} 5
    

    3.5 Métriques TechDB (V4)

    # Opérations
    socle_techdb_operations_total{operation="saveOffset"} 5678
    socle_techdb_operations_total{operation="getOffset"} 12345
    
    # Taille des tables
    socle_techdb_rows_count{table="socle_offsets"} 23
    socle_techdb_rows_count{table="socle_events"} 456
    socle_techdb_rows_count{table="socle_log_fallback"} 0
    

    3.6 Métriques LogForwarder (V4)

    # Queue
    socle_logforwarder_queue_size 45
    socle_logforwarder_queue_capacity 10000
    
    # Logs envoyés
    socle_logforwarder_logs_sent_total 123456
    socle_logforwarder_logs_failed_total 23
    socle_logforwarder_logs_fallback_total 0
    
    # Batches
    socle_logforwarder_batches_sent_total 1234
    socle_logforwarder_batch_size{quantile="0.5"} 100
    

    4. Implémentation

    4.1 Enregistrement des métriques

    package eu.lmvi.socle.metrics;
    
    /**
     * Generic Socle application metrics: request count, error count,
     * active connections and request latency.
     */
    @Component
    public class SocleMetrics {
    
        private final MeterRegistry registry;
    
        // Counters
        private final Counter requestsTotal;
        private final Counter errorsTotal;
    
        // Gauge backing value, read by the registry on every scrape.
        private final AtomicInteger activeConnections = new AtomicInteger(0);
    
        // Timers
        private final Timer requestDuration;
    
        public SocleMetrics(MeterRegistry registry) {
            this.registry = registry;
    
            this.requestsTotal = Counter.builder("socle_requests_total")
                .description("Total number of requests")
                .register(registry);
    
            this.errorsTotal = Counter.builder("socle_errors_total")
                .description("Total number of errors")
                .register(registry);
    
            Gauge.builder("socle_active_connections", activeConnections, AtomicInteger::get)
                .description("Number of active connections")
                .register(registry);
    
            // publishPercentileHistogram() exports the *_bucket series required by
            // histogram_quantile() queries; publishPercentiles() alone only produces
            // client-side quantiles, which cannot be aggregated across pods.
            this.requestDuration = Timer.builder("socle_request_duration_seconds")
                .description("Request duration in seconds")
                .publishPercentiles(0.5, 0.95, 0.99)
                .publishPercentileHistogram()
                .register(registry);
        }
    
        /** Increments socle_requests_total. */
        public void recordRequest() {
            requestsTotal.increment();
        }
    
        /** Increments socle_errors_total. */
        public void recordError() {
            errorsTotal.increment();
        }
    
        /** Increments the socle_active_connections gauge. */
        public void connectionOpened() {
            activeConnections.incrementAndGet();
        }
    
        /** Decrements the socle_active_connections gauge. */
        public void connectionClosed() {
            activeConnections.decrementAndGet();
        }
    
        /** Starts a latency sample; pass it to {@link #stopTimer} when the request ends. */
        public Timer.Sample startTimer() {
            return Timer.start(registry);
        }
    
        /** Records the elapsed time of {@code sample} into socle_request_duration_seconds. */
        public void stopTimer(Timer.Sample sample) {
            sample.stop(requestDuration);
        }
    }
    

    4.2 Utilisation dans le code

    @Service
    public class OrderService {
    
        @Autowired
        private SocleMetrics metrics;
    
        /**
         * Processes an order while recording one request, its latency and,
         * when processing throws, one error. The exception is rethrown unchanged.
         */
        public Order processOrder(Order order) {
            Timer.Sample timing = metrics.startTimer();
            metrics.recordRequest();
            try {
                return doProcess(order);
            } catch (Exception failure) {
                metrics.recordError();
                throw failure;
            } finally {
                // Runs on both paths, so every request produces one latency sample.
                metrics.stopTimer(timing);
            }
        }
    }
    

    4.3 Métriques avec tags

    @Component
    public class WorkerMetrics {
    
        private final MeterRegistry registry;
    
        // Last status reported per worker; read by the socle_worker_status gauges.
        private final Map<String, String> currentStatus = new ConcurrentHashMap<>();
    
        public WorkerMetrics(MeterRegistry registry) {
            this.registry = registry;
        }
    
        /**
         * Records the current status of a worker.
         * socle_worker_status{worker, status} reports 1 for the latest status and 0
         * for every status previously reported for that worker, so an old status
         * never stays at 1 after a transition.
         */
        public void recordWorkerStatus(String workerName, String status) {
            currentStatus.put(workerName, status);
            // Registering the same (name, tags) again returns the existing gauge; its
            // supplier re-reads currentStatus on every scrape.
            Gauge.builder("socle_worker_status",
                    () -> status.equals(currentStatus.get(workerName)) ? 1 : 0)
                .tag("worker", workerName)
                .tag("status", status)
                .register(registry);
        }
    
        /** Increments socle_worker_processed_total for the given worker and type. */
        public void recordProcessed(String workerName, String type) {
            Counter.builder("socle_worker_processed_total")
                .tag("worker", workerName)
                .tag("type", type)
                .register(registry)
                .increment();
        }
    }
    

    5. Endpoint Prometheus

    5.1 Accès

    curl http://localhost:8080/actuator/prometheus
    

    5.2 Sortie

    # HELP socle_workers_total Number of workers
    # TYPE socle_workers_total gauge
    socle_workers_total{application="socle-v4",environment="PROD",region="MTQ"} 5
    
    # HELP socle_workers_healthy Number of healthy workers
    # TYPE socle_workers_healthy gauge
    socle_workers_healthy{application="socle-v4",environment="PROD",region="MTQ"} 5
    
    # HELP socle_requests_total Total number of requests
    # TYPE socle_requests_total counter
    socle_requests_total{application="socle-v4",environment="PROD",region="MTQ"} 12345
    
    # HELP socle_request_duration_seconds Request duration in seconds
    # TYPE socle_request_duration_seconds summary
    socle_request_duration_seconds{application="socle-v4",quantile="0.5"} 0.05
    socle_request_duration_seconds{application="socle-v4",quantile="0.95"} 0.2
    socle_request_duration_seconds{application="socle-v4",quantile="0.99"} 0.5
    socle_request_duration_seconds_count{application="socle-v4"} 12345
    socle_request_duration_seconds_sum{application="socle-v4"} 617.25
    

    6. Prometheus Configuration

    6.1 prometheus.yml

    global:
      scrape_interval: 15s
    
    scrape_configs:
      - job_name: 'socle-v4'
        metrics_path: '/actuator/prometheus'
        static_configs:
          - targets: ['socle-app:8080']
            labels:
              app: 'socle-v4'
              env: 'prod'
    
      # Scrapes only pods annotated prometheus.io/scrape: "true", honouring the
      # prometheus.io/path and prometheus.io/port annotations.
      - job_name: 'socle-v4-kubernetes'
        kubernetes_sd_configs:
          - role: pod
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
            action: keep
            regex: 'true'
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
            action: replace
            target_label: __metrics_path__
            regex: (.+)
          # Without this rule the prometheus.io/port annotation is ignored and the
          # target address comes from the pod's declared container ports.
          - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
            action: replace
            regex: '([^:]+)(?::\d+)?;(\d+)'
            replacement: '$1:$2'
            target_label: __address__
          # Keep the pod identity on every scraped series.
          - source_labels: [__meta_kubernetes_namespace]
            target_label: namespace
          - source_labels: [__meta_kubernetes_pod_name]
            target_label: pod
    

    6.2 Kubernetes annotations

    # Fragment: in a Deployment/StatefulSet these annotations go under
    # spec.template.metadata.annotations (annotations on the Deployment object
    # itself are not copied to its pods).
    apiVersion: v1
    kind: Pod
    metadata:
      annotations:
        prometheus.io/scrape: 'true'
        prometheus.io/path: '/actuator/prometheus'
        prometheus.io/port: '8080'
    

    7. Grafana Dashboards

    7.1 Exemple de requêtes

    # Taux de requêtes par seconde
    rate(socle_requests_total[5m])
    
    # Taux d'erreurs
    rate(socle_errors_total[5m]) / rate(socle_requests_total[5m]) * 100
    
    # Latence P95
    histogram_quantile(0.95, rate(socle_request_duration_seconds_bucket[5m]))
    
    # Workers unhealthy
    socle_workers_unhealthy
    
    # Circuit breakers ouverts
    socle_circuit_breaker_state == 2
    
    # Queue LogForwarder
    socle_logforwarder_queue_size / socle_logforwarder_queue_capacity * 100
    

    7.2 Dashboard JSON

    {
      "title": "Socle V4 Dashboard",
      "panels": [
        {
          "title": "Request Rate",
          "type": "graph",
          "targets": [
            {
              "expr": "rate(socle_requests_total[5m])",
              "legendFormat": "{{application}}"
            }
          ]
        },
        {
          "title": "Error Rate",
          "type": "graph",
          "targets": [
            {
              "expr": "rate(socle_errors_total[5m]) / rate(socle_requests_total[5m]) * 100",
              "legendFormat": "Error %"
            }
          ]
        },
        {
          "title": "P95 Latency",
          "type": "graph",
          "targets": [
            {
              "expr": "histogram_quantile(0.95, rate(socle_request_duration_seconds_bucket[5m]))",
              "legendFormat": "P95"
            }
          ]
        },
        {
          "title": "Workers Status",
          "type": "stat",
          "targets": [
            {
              "expr": "socle_workers_healthy",
              "legendFormat": "Healthy"
            }
          ]
        }
      ]
    }
    

    8. Alerting

    8.1 Règles d'alerte Prometheus (évaluées par Prometheus, notifiées via Alertmanager)

    groups:
      - name: socle-alerts
        rules:
          # Error ratio aggregated over all replicas of an application, so a single
          # low-traffic pod cannot fire the alert on its own.
          - alert: SocleHighErrorRate
            expr: |-
              sum by (application, environment, region) (rate(socle_errors_total[5m]))
                / sum by (application, environment, region) (rate(socle_requests_total[5m]))
                > 0.05
            for: 5m
            labels:
              severity: warning
            annotations:
              summary: "High error rate on {{ $labels.application }}"
              description: "Error rate is {{ $value | humanizePercentage }}"
    
          - alert: SocleWorkerUnhealthy
            expr: socle_workers_unhealthy > 0
            for: 2m
            labels:
              severity: critical
            annotations:
              summary: "Unhealthy workers on {{ $labels.application }}"
              description: "{{ $value }} workers are unhealthy"
    
          - alert: SocleCircuitBreakerOpen
            expr: socle_circuit_breaker_state == 2
            for: 5m
            labels:
              severity: warning
            annotations:
              summary: "Circuit breaker {{ $labels.name }} is OPEN"
              description: "Circuit breaker {{ $labels.name }} on {{ $labels.application }} has been OPEN for at least 5 minutes"
    
          - alert: SocleLogForwarderQueueHigh
            expr: socle_logforwarder_queue_size / socle_logforwarder_queue_capacity > 0.8
            for: 5m
            labels:
              severity: warning
            annotations:
              summary: "LogForwarder queue is {{ $value | humanizePercentage }} full"
              description: "LogForwarder queue of {{ $labels.application }} has been above 80% of its capacity for 5 minutes"
    

    9. Bonnes pratiques

    DO

    • Utiliser des noms de métriques cohérents (socle_*)
    • Ajouter des tags pertinents (application, environment, region)
    • Utiliser des histogrammes pour les latences
    • Définir des alertes sur les métriques critiques
    • Documenter les métriques

    DON’T

    • Ne pas créer trop de métriques (cardinalité)
    • Ne pas utiliser de valeurs à haute cardinalité dans les tags
    • Ne pas oublier les métriques d’erreur
    • Ne pas ignorer les métriques de queue/buffer

    10. Références

  • Socle V004 – Pipeline

    Socle V004 – Pipeline

    09 – Pipeline Engine

    Version : 4.1.0 Date : 2025-12-28

    1. Introduction

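    Le PipelineEngine permet de définir des chaînes de traitement composées d’étapes (steps) exécutées séquentiellement, dans l’ordre de déclaration. Le parallélisme s’obtient à l’intérieur d’une étape (voir § 9 « Pipelines parallèles »).

    Caractéristiques

    • Définition déclarative des pipelines
    • Exécution séquentielle des étapes (parallélisme possible au sein d’une étape)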
    • Gestion des erreurs et retry
    • Context partagé entre étapes
    • Métriques et logging intégrés

    2. Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                      PipelineEngine                          │
    └─────────────────────────────────────────────────────────────┘
                                  │
                                  ▼
    ┌─────────────────────────────────────────────────────────────┐
    │                      Pipeline                                │
    │  ┌─────────────────────────────────────────────────────┐    │
    │  │              PipelineContext                         │    │
    │  │  - input data                                        │    │
    │  │  - shared state                                      │    │
    │  │  - results                                           │    │
    │  └─────────────────────────────────────────────────────┘    │
    │                                                              │
    │  ┌──────────┐   ┌──────────┐   ┌──────────┐                │
    │  │  Step 1  │──►│  Step 2  │──►│  Step 3  │                │
    │  └──────────┘   └──────────┘   └──────────┘                │
    │                                                              │
    └─────────────────────────────────────────────────────────────┘
    

    3. Interface Pipeline

    package eu.lmvi.socle.pipeline;
    
    /**
     * A named, ordered chain of steps turning an input of type {@code I}
     * into an output of type {@code O}.
     *
     * @param <I> type accepted by the first step
     * @param <O> type produced by the last step
     */
    public interface Pipeline<I, O> {
    
        /** @return the pipeline name (used in engine logs and results) */
        String getName();
    
        /** Runs the pipeline on {@code input} with a fresh context. */
        PipelineResult<O> execute(I input);
    
        /** Runs the pipeline on {@code input}, sharing {@code context} with the steps. */
        PipelineResult<O> execute(I input, PipelineContext context);
    
        /** @return the steps, in the order the engine executes them */
        List<PipelineStep<?, ?>> getSteps();
    }
    

    4. Interface PipelineStep

    package eu.lmvi.socle.pipeline;
    
    /**
     * A single unit of work inside a pipeline.
     *
     * @param <I> value received from the previous step (or the pipeline input)
     * @param <O> value handed to the next step
     */
    public interface PipelineStep<I, O> {
    
        /** @return the step name, used in logs, step results and metrics */
        String getName();
    
        /**
         * Runs the step once. It may return a failure result or throw; the engine
         * decides whether another attempt is made.
         */
        StepResult<O> execute(I input, PipelineContext context);
    
        /** @return whether a failed attempt may be attempted again (default: true) */
        default boolean isRetryable() { return true; }
    
        /**
         * Maximum number of attempts. DefaultPipelineEngine uses this value as the
         * total attempt count, first try included: the default 3 means at most
         * 3 executions.
         */
        default int getMaxRetries() { return 3; }
    
        /** @return whether the pipeline continues when this step fails (default: false) */
        default boolean isOptional() { return false; }
    }
    

    5. Context et Result

    5.1 PipelineContext

    package eu.lmvi.socle.pipeline;
    
    /**
     * State shared by the steps of one pipeline execution: free-form attributes,
     * the results of executed steps, a random correlation id and the start time.
     */
    public class PipelineContext {
        // ConcurrentHashMap rejects null keys and values; put() handles null values.
        private final Map<String, Object> attributes = new ConcurrentHashMap<>();
        // Plain ArrayList: not thread-safe, results should be added from one thread.
        private final List<StepResult<?>> stepResults = new ArrayList<>();
        private final String correlationId;
        private final Instant startTime;
    
        public PipelineContext() {
            this.correlationId = UUID.randomUUID().toString();
            this.startTime = Instant.now();
        }
    
        /**
         * Stores an attribute. A {@code null} value removes the key instead of
         * throwing, so {@link #get} then returns an empty Optional, exactly as for
         * a key that was never set.
         *
         * @throws NullPointerException if {@code key} is null
         */
        public void put(String key, Object value) {
            Objects.requireNonNull(key, "key");
            if (value == null) {
                attributes.remove(key);
            } else {
                attributes.put(key, value);
            }
        }
    
        /** @return the attribute if present and an instance of {@code type}, empty otherwise */
        public <T> Optional<T> get(String key, Class<T> type) {
            return Optional.ofNullable(attributes.get(key))
                .filter(type::isInstance)
                .map(type::cast);
        }
    
        /** Appends the result of an executed step. */
        public void addStepResult(StepResult<?> result) {
            stepResults.add(result);
        }
    
        /** @return a read-only (live) view of the step results, in insertion order */
        public List<StepResult<?>> getStepResults() {
            return Collections.unmodifiableList(stepResults);
        }
    
        public String getCorrelationId() {
            return correlationId;
        }
    
        /** @return time elapsed since this context was created */
        public Duration getElapsedTime() {
            return Duration.between(startTime, Instant.now());
        }
    }
    

    5.2 StepResult

    package eu.lmvi.socle.pipeline;
    
    /**
     * Outcome of one step: status, output (set on success), error (set on
     * failure), elapsed time and number of attempts.
     */
    public record StepResult<T>(
        String stepName,
        StepStatus status,
        T output,
        Exception error,
        Duration duration,
        int attempts
    ) {
        /** Successful outcome, recorded as a single attempt. */
        public static <T> StepResult<T> success(String name, T output, Duration duration) {
            return new StepResult<>(name, StepStatus.SUCCESS, output, null, duration, 1);
        }
    
        /** Failed outcome after {@code attempts} tries; no output. */
        public static <T> StepResult<T> failure(String name, Exception error, Duration duration, int attempts) {
            return new StepResult<>(name, StepStatus.FAILURE, null, error, duration, attempts);
        }
    
        /** Step not executed: no output, no error, zero duration, zero attempts. */
        public static <T> StepResult<T> skipped(String name) {
            return new StepResult<>(name, StepStatus.SKIPPED, null, null, Duration.ZERO, 0);
        }
    
        public boolean isSuccess() { return hasStatus(StepStatus.SUCCESS); }
    
        public boolean isFailure() { return hasStatus(StepStatus.FAILURE); }
    
        public boolean isSkipped() { return hasStatus(StepStatus.SKIPPED); }
    
        private boolean hasStatus(StepStatus expected) {
            return status == expected;
        }
    }
    
    /** Possible outcomes of a step. */
    public enum StepStatus {
        SUCCESS,
        FAILURE,
        SKIPPED
    }
    

    5.3 PipelineResult

    package eu.lmvi.socle.pipeline;
    
    /**
     * Outcome of a whole pipeline run: final status and output, the context
     * holding every step result, and the total duration.
     */
    public record PipelineResult<T>(
        String pipelineName,
        PipelineStatus status,
        T output,
        PipelineContext context,
        Duration totalDuration
    ) {
        /** @return true only for SUCCESS; PARTIAL_SUCCESS yields false */
        public boolean isSuccess() {
            return PipelineStatus.SUCCESS == status;
        }
    
        /** @return the failed steps (mandatory and optional), in execution order */
        public List<StepResult<?>> getFailedSteps() {
            List<StepResult<?>> recorded = context.getStepResults();
            return recorded.stream()
                .filter(step -> step.isFailure())
                .toList();
        }
    }
    
    /** Overall outcome of a pipeline run. */
    public enum PipelineStatus {
        SUCCESS,
        PARTIAL_SUCCESS,
        FAILURE
    }
    

    6. Implémentation

    6.1 DefaultPipelineEngine

    package eu.lmvi.socle.pipeline;
    
    /**
     * Default engine: runs the steps of a pipeline one after another, feeding
     * each step's output to the next, with per-step retries and linear backoff.
     */
    @Component
    public class DefaultPipelineEngine implements PipelineEngine {
    
        private static final Logger log = LoggerFactory.getLogger(DefaultPipelineEngine.class);
    
        @Override
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input) {
            return execute(pipeline, input, new PipelineContext());
        }
    
        /**
         * Runs the steps in order. A failed mandatory step stops the pipeline
         * (FAILURE); a failed optional step is recorded and the pipeline goes on
         * (PARTIAL_SUCCESS). A skipped step leaves the current value unchanged.
         */
        @Override
        @SuppressWarnings("unchecked")
        public <I, O> PipelineResult<O> execute(Pipeline<I, O> pipeline, I input, PipelineContext context) {
            log.info("[{}] Starting pipeline execution", pipeline.getName());
            Instant start = Instant.now();
    
            Object currentInput = input;
            O finalOutput = null;
            boolean hasFailure = false;
    
            for (PipelineStep<?, ?> step : pipeline.getSteps()) {
                // Step types are checked by the builder; at runtime values flow as Object.
                PipelineStep<Object, Object> typedStep = (PipelineStep<Object, Object>) step;
    
                StepResult<Object> result = executeStep(typedStep, currentInput, context);
                context.addStepResult(result);
    
                if (result.isSuccess()) {
                    currentInput = result.output();
                    finalOutput = (O) result.output();
                } else if (result.isFailure()) {
                    if (!step.isOptional()) {
                        hasFailure = true;
                        log.error("[{}] Pipeline failed at step: {}", pipeline.getName(), step.getName());
                        break;
                    }
                    // NOTE(review): finalOutput keeps the previous step's value; it only matches
                    // O when this optional step does not change the type.
                    log.warn("[{}] Optional step failed, continuing: {}", pipeline.getName(), step.getName());
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            PipelineStatus status = hasFailure
                ? PipelineStatus.FAILURE
                : (context.getStepResults().stream().anyMatch(StepResult::isFailure)
                    ? PipelineStatus.PARTIAL_SUCCESS
                    : PipelineStatus.SUCCESS);
    
            log.info("[{}] Pipeline completed with status: {} in {}ms",
                pipeline.getName(), status, duration.toMillis());
    
            return new PipelineResult<>(pipeline.getName(), status, finalOutput, context, duration);
        }
    
        /**
         * Runs one step with retries. {@code getMaxRetries()} is the total number of
         * attempts, and at least one attempt is always made. A step that throws or
         * returns a FAILURE result is tried again only while it is retryable and
         * attempts remain, waiting {@code attempt} seconds between tries. A SKIPPED
         * result is passed through as-is.
         *
         * @return SUCCESS carrying the real attempt count, SKIPPED, or FAILURE carrying
         *         the last error (null if the step returned a failure without one)
         */
        private <I, O> StepResult<O> executeStep(PipelineStep<I, O> step, I input, PipelineContext context) {
            log.debug("[{}] Executing step", step.getName());
            Instant start = Instant.now();
            int maxAttempts = Math.max(1, step.getMaxRetries());
            int attempts = 0;
            Exception lastError = null;
    
            while (attempts < maxAttempts) {
                attempts++;
                try {
                    StepResult<O> result = step.execute(input, context);
                    if (result.isSuccess()) {
                        Duration duration = Duration.between(start, Instant.now());
                        log.debug("[{}] Step completed in {}ms", step.getName(), duration.toMillis());
                        return new StepResult<>(step.getName(), StepStatus.SUCCESS, result.output(), null, duration, attempts);
                    }
                    if (result.isSkipped()) {
                        log.debug("[{}] Step skipped", step.getName());
                        return StepResult.skipped(step.getName());
                    }
                    lastError = result.error();
                    log.warn("[{}] Step attempt {} returned a failure: {}", step.getName(), attempts,
                        lastError != null ? lastError.getMessage() : "no error details");
                } catch (Exception e) {
                    lastError = e;
                    log.warn("[{}] Step attempt {} failed: {}", step.getName(), attempts, e.getMessage());
                }
    
                // Same retry policy whether the step threw or returned a failure.
                if (!step.isRetryable() || attempts >= maxAttempts || !backoff(attempts)) {
                    break;
                }
            }
    
            Duration duration = Duration.between(start, Instant.now());
            return StepResult.failure(step.getName(), lastError, duration, attempts);
        }
    
        /**
         * Linear backoff: waits {@code attempt} seconds.
         *
         * @return false if the thread was interrupted (interrupt flag restored)
         */
        private static boolean backoff(int attempt) {
            try {
                Thread.sleep(attempt * 1000L);
                return true;
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
    

    6.2 Pipeline Builder

    package eu.lmvi.socle.pipeline;
    
    /**
     * Fluent builder for sequential pipelines.
     *
     * <p>{@code O} is the output type of the chain built so far: each
     * {@code addStep} must accept {@code O} and moves it to the new step's output
     * type. A new builder must therefore start with {@code O == I}; use
     * {@link #start(String)} or {@code PipelineBuilder.<Order, Order>create(...)}.
     *
     * @param <I> pipeline input type
     * @param <O> output type of the last step added so far
     */
    public class PipelineBuilder<I, O> {
    
        private final String name;
        private final List<PipelineStep<?, ?>> steps = new ArrayList<>();
    
        public PipelineBuilder(String name) {
            this.name = Objects.requireNonNull(name, "name");
        }
    
        /** Creates a builder with explicit type arguments (the first step must accept {@code O}). */
        public static <I, O> PipelineBuilder<I, O> create(String name) {
            return new PipelineBuilder<>(name);
        }
    
        /** Creates a builder whose first step receives the pipeline input directly. */
        public static <T> PipelineBuilder<T, T> start(String name) {
            return new PipelineBuilder<>(name);
        }
    
        /** Appends {@code step}; returns this same builder retyped to the step's output. */
        @SuppressWarnings("unchecked") // same instance, only the phantom type O changes
        public <NO> PipelineBuilder<I, NO> addStep(PipelineStep<O, NO> step) {
            steps.add(Objects.requireNonNull(step, "step"));
            return (PipelineBuilder<I, NO>) this;
        }
    
        /** Appends a step backed by a plain function. */
        public <NO> PipelineBuilder<I, NO> addStep(String stepName, Function<O, NO> processor) {
            Objects.requireNonNull(processor, "processor");
            return addStep(new FunctionalStep<>(stepName, processor));
        }
    
        /** Builds an immutable pipeline from the steps added so far. */
        public Pipeline<I, O> build() {
            return new DefaultPipeline<>(name, List.copyOf(steps));
        }
    
        // Adapts a Function to PipelineStep; the engine measures the real duration.
        private record FunctionalStep<I, O>(String name, Function<I, O> processor) implements PipelineStep<I, O> {
    
            @Override
            public String getName() {
                return name;
            }
    
            @Override
            public StepResult<O> execute(I input, PipelineContext context) {
                O output = processor.apply(input);
                return StepResult.success(name, output, Duration.ZERO);
            }
        }
    }
    

    7. Utilisation

    7.1 Pipeline simple

    @Component
    public class OrderPipeline {
    
        @Autowired
        private PipelineEngine engine;
    
        /**
         * Runs validate -> enrich -> process -> notify on the order.
         */
        public PipelineResult<OrderResult> processOrder(Order order) {
            // The builder starts as <Order, Order>: its second type argument is the input
            // of the next step, and it becomes OrderResult once the last step is added.
            Pipeline<Order, OrderResult> pipeline = PipelineBuilder
                .<Order, Order>create("order-processing")
                .addStep("validate", this::validateOrder)
                .addStep("enrich", this::enrichOrder)
                .addStep("process", this::executeOrder)
                .addStep("notify", this::notifyCustomer)
                .build();
    
            return engine.execute(pipeline, order);
        }
    
        private ValidatedOrder validateOrder(Order order) {
            // Validation...
            return new ValidatedOrder(order);
        }
    
        private EnrichedOrder enrichOrder(ValidatedOrder order) {
            // Enrichment...
            return new EnrichedOrder(order);
        }
    
        // Distinct name: an overload of the public processOrder(Order) made the
        // method reference ambiguous to read.
        private ProcessedOrder executeOrder(EnrichedOrder order) {
            // Processing...
            return new ProcessedOrder(order);
        }
    
        private OrderResult notifyCustomer(ProcessedOrder order) {
            // Notification...
            return new OrderResult(order, "SUCCESS");
        }
    }
    

    7.2 Étapes personnalisées

    public class ValidationStep implements PipelineStep<Order, ValidatedOrder> {
    
        @Override
        public String getName() {
            return "validation";
        }
    
        /**
         * Checks the mandatory fields of the order. On success, stores "customerId"
         * in the context for later steps. On failure, returns a FAILURE result
         * listing every problem found (nothing is thrown).
         */
        @Override
        public StepResult<ValidatedOrder> execute(Order input, PipelineContext context) {
            List<String> errors = new ArrayList<>();
    
            if (input == null) {
                errors.add("Missing order");
            } else {
                if (input.getCustomerId() == null) {
                    errors.add("Missing customer ID");
                }
                // A null item list is reported like an empty one instead of throwing an NPE.
                if (input.getItems() == null || input.getItems().isEmpty()) {
                    errors.add("No items in order");
                }
            }
    
            if (!errors.isEmpty()) {
                return StepResult.failure(getName(),
                    new ValidationException(errors),
                    Duration.ZERO, 1);
            }
    
            // Share data with the following steps through the context
            context.put("customerId", input.getCustomerId());
    
            return StepResult.success(getName(), new ValidatedOrder(input), Duration.ZERO);
        }
    
        /** Validation is deterministic: retrying the same input cannot succeed. */
        @Override
        public boolean isRetryable() {
            return false;
        }
    }
    

    7.3 Étape optionnelle

    public class NotificationStep implements PipelineStep<ProcessedOrder, ProcessedOrder> {
    
        /** Non-blocking step: a failed notification does not fail the pipeline. */
        @Override
        public boolean isOptional() {
            return true;
        }
    
        @Override
        public String getName() {
            return "notification";
        }
    
        /**
         * Sends the notification and passes the order through unchanged.
         * Any exception becomes a FAILURE result instead of propagating.
         */
        @Override
        public StepResult<ProcessedOrder> execute(ProcessedOrder input, PipelineContext context) {
            StepResult<ProcessedOrder> outcome;
            try {
                sendNotification(input);
                outcome = StepResult.success(getName(), input, Duration.ZERO);
            } catch (Exception e) {
                outcome = StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
            return outcome;
        }
    }
    

    7.4 Utilisation du context

    // NOTE(review): customerService is field-injected, so this step must be obtained as a
    // Spring bean (@Component, @Bean...); an instance created with `new` keeps it null.
    public class EnrichmentStep implements PipelineStep<ValidatedOrder, EnrichedOrder> {
    
        @Autowired
        private CustomerService customerService;
    
        @Override
        public String getName() {
            return "enrichment";
        }
    
        /**
         * Loads the customer whose id a previous step stored in the context under
         * "customerId", builds the EnrichedOrder, and publishes the customer e-mail
         * under "customerEmail" for the following steps.
         */
        @Override
        public StepResult<EnrichedOrder> execute(ValidatedOrder input, PipelineContext context) {
            // Read from the context; fail with an explicit message if the id was never stored.
            String customerId = context.get("customerId", String.class)
                .orElseThrow(() -> new IllegalStateException(
                    "customerId missing from pipeline context (validation step must run first)"));
    
            Customer customer = customerService.getCustomer(customerId);
            if (customer == null) {
                return StepResult.failure(getName(),
                    new IllegalStateException("Unknown customer: " + customerId),
                    Duration.ZERO, 1);
            }
            EnrichedOrder enriched = new EnrichedOrder(input, customer);
    
            // Only publish a real e-mail: the context's ConcurrentHashMap does not accept nulls.
            String email = customer.getEmail();
            if (email != null) {
                context.put("customerEmail", email);
            }
    
            return StepResult.success(getName(), enriched, Duration.ZERO);
        }
    }
    

    8. Gestion des erreurs

    8.1 Retry automatique

    public class ExternalApiStep implements PipelineStep<Data, ApiResponse> {
    
        // Upper bound on executions of this step, first attempt included.
        private static final int MAX_ATTEMPTS = 5;
    
        @Override
        public String getName() {
            return "external-api-call";
        }
    
        /**
         * Calls the external API. Exceptions are deliberately not caught here:
         * the engine catches them and decides whether to try again.
         */
        @Override
        public StepResult<ApiResponse> execute(Data input, PipelineContext context) {
            return StepResult.success(getName(), callApi(input), Duration.ZERO);
        }
    
        /** Remote calls can fail transiently, so further attempts are allowed. */
        @Override
        public boolean isRetryable() {
            return true;
        }
    
        @Override
        public int getMaxRetries() {
            return MAX_ATTEMPTS;
        }
    }
    

    8.2 Traitement des résultats

    PipelineResult<OrderResult> result = engine.execute(pipeline, order);
    
    if (result.isSuccess()) {
        log.info("Order processed successfully: {}", result.output());
    } else {
        // Analyser les étapes en échec
        for (StepResult<?> stepResult : result.getFailedSteps()) {
            log.error("Step {} failed after {} attempts: {}",
                stepResult.stepName(),
                stepResult.attempts(),
                stepResult.error().getMessage());
        }
    
        // Décider quoi faire
        if (result.status() == PipelineStatus.PARTIAL_SUCCESS) {
            // Certaines étapes optionnelles ont échoué
            handlePartialSuccess(result);
        } else {
            // Échec complet
            handleFailure(result);
        }
    }
    

    9. Pipelines parallèles

    public class ParallelEnrichmentStep implements PipelineStep<Order, EnrichedOrder> {
    
        @Autowired
        private ExecutorService executor;
    
        @Autowired
        private CustomerService customerService;
    
        @Autowired
        private InventoryService inventoryService;
    
        @Autowired
        private PricingService pricingService;
    
        // Required by PipelineStep: without it the class does not compile.
        @Override
        public String getName() {
            return "parallel-enrichment";
        }
    
        /**
         * Fetches customer, inventory and pricing concurrently on {@code executor},
         * then combines them. Any failure becomes a FAILURE result carrying the
         * original exception rather than the CompletionException wrapper.
         */
        @Override
        public StepResult<EnrichedOrder> execute(Order input, PipelineContext context) {
            // Run several enrichments in parallel
            CompletableFuture<Customer> customerFuture = CompletableFuture.supplyAsync(
                () -> customerService.getCustomer(input.getCustomerId()), executor);
    
            CompletableFuture<Inventory> inventoryFuture = CompletableFuture.supplyAsync(
                () -> inventoryService.checkInventory(input.getItems()), executor);
    
            CompletableFuture<Pricing> pricingFuture = CompletableFuture.supplyAsync(
                () -> pricingService.calculatePrice(input), executor);
    
            try {
                CompletableFuture.allOf(customerFuture, inventoryFuture, pricingFuture).join();
    
                // All futures are complete here, so join() returns immediately.
                EnrichedOrder enriched = new EnrichedOrder(
                    input,
                    customerFuture.join(),
                    inventoryFuture.join(),
                    pricingFuture.join()
                );
    
                return StepResult.success(getName(), enriched, Duration.ZERO);
            } catch (CompletionException e) {
                Exception cause = e.getCause() instanceof Exception ex ? ex : e;
                return StepResult.failure(getName(), cause, Duration.ZERO, 1);
            } catch (Exception e) {
                return StepResult.failure(getName(), e, Duration.ZERO, 1);
            }
        }
    }
    

    10. Métriques

    @Component
    public class PipelineMetrics {
    
        private final MeterRegistry registry;
    
        // The final field must be assigned: without this constructor the class does not compile.
        public PipelineMetrics(MeterRegistry registry) {
            this.registry = registry;
        }
    
        /**
         * Records one pipeline run: its duration and one execution count, both tagged
         * by pipeline and status. The Prometheus registry exports them as
         * socle_pipeline_duration_seconds and socle_pipeline_executions_total.
         */
        public void recordPipelineExecution(PipelineResult<?> result) {
            Timer.builder("socle_pipeline_duration")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .publishPercentiles(0.5, 0.95, 0.99)
                .register(registry)
                .record(result.totalDuration());
    
            Counter.builder("socle_pipeline_executions")
                .tag("pipeline", result.pipelineName())
                .tag("status", result.status().name())
                .register(registry)
                .increment();
        }
    
        /** Records the duration of one step, tagged by step name and status. */
        public void recordStepExecution(StepResult<?> result) {
            Timer.builder("socle_pipeline_step_duration")
                .tag("step", result.stepName())
                .tag("status", result.status().name())
                .publishPercentiles(0.5, 0.95, 0.99)
                .register(registry)
                .record(result.duration());
        }
    }
    

    11. Bonnes pratiques

    DO

    • Garder les étapes petites et focalisées
    • Utiliser le context pour partager des données entre étapes
    • Marquer les étapes non-critiques comme optionnelles
    • Logger au niveau des étapes pour le debugging
    • Configurer des retries appropriés pour les appels externes

    DON’T

    • Ne pas faire de pipelines avec trop d’étapes (> 10)
    • Ne pas mélanger logique métier et infrastructure dans une étape
    • Ne pas ignorer les erreurs des étapes obligatoires
    • Ne pas utiliser de pipelines pour du traitement simple

    12. Références

    Pipeline Engine V2 (Nouveau)

    Ajouté en version 4.1.0

    13. Introduction Pipeline V2

    Le Pipeline V2 est une refonte complète du système de pipelines, conçue pour les applications à haute disponibilité nécessitant :

    • Garantie at-least-once : Aucune perte de message grâce au pattern Queue/Claim/Ack
    • Virtual Threads (Java 21) : Scalabilité maximale avec les threads virtuels
    • Reprise après crash : État persisté permettant la reprise au dernier stage OK
    • DLQ (Dead Letter Queue) : Gestion des échecs définitifs avec replay

    Quand utiliser V2 vs V1 ?

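    | Critère               | Pipeline V1            | Pipeline V2                         |
    |-----------------------|------------------------|-------------------------------------|
    | Exécution             | Synchrone, mono-thread | Asynchrone, multi-thread            |
    | Garantie de livraison | At-most-once           | At-least-once                       |
    | Persistance           | Non                    | Oui (TechDB / H2)                   |
    | DLQ                   | Non                    | Oui                                 |
    | Cas d’usage           | Traitement simple      | Flux critiques, haute disponibilité |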

    14. Architecture V2

    ┌─────────────────────────────────────────────────────────────────┐
    │                            MOP                                   │
    │               (supervision, restart, health)                     │
    └───────────────────────────┬─────────────────────────────────────┘
                                │
    ┌───────────────────────────▼─────────────────────────────────────┐
    │                     PipelineV2                                   │
    │                                                                  │
    │  ┌────────────┐     ┌────────────┐     ┌────────────┐          │
    │  │  Stage A   │────►│  Stage B   │────►│  Stage C   │          │
    │  │ ┌────────┐ │     │ ┌────────┐ │     │ ┌────────┐ │          │
    │  │ │ Queue  │ │     │ │ Queue  │ │     │ │ Queue  │ │          │
    │  │ └────────┘ │     │ └────────┘ │     │ └────────┘ │          │
    │  │ VThreads  │     │ VThreads  │     │ VThreads  │          │
    │  └────────────┘     └────────────┘     └────────────┘          │
    │                                                                  │
    │         ┌─────────────────┴─────────────────┐                   │
    │    ┌────▼────┐                        ┌────▼────┐              │
    │    │ Context │                        │   DLQ   │              │
    │    │ (state) │                        │ (errors)│              │
    │    └─────────┘                        └─────────┘              │
    └─────────────────────────────────────────────────────────────────┘
    

    Composants

    • StageQueue : Queue avec sémantique Claim/Ack entre chaque stage
    • VThreadStageExecutor : Workers Virtual Threads consommant les queues
    • PersistentPipelineContext : État persisté pour reprise après crash
    • DLQ : Messages en échec après max retries

    15. Création d’un Pipeline V2

    15.1 Builder Fluent

    import eu.lmvi.socle.pipeline.v2.PipelineBuilderV2;
    import eu.lmvi.socle.pipeline.v2.PipelineV2;
    
    // Build the pipeline. Each configuration call (concurrency, timeout, queueSize,
    // maxRetries, backoff) applies to the stage declared by the preceding addStage().
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .description("Pipeline de traitement des commandes")
    
        // Stage 1: validation (2 workers, 30 s timeout per message)
        .addStage("validate", this::validateOrder)
            .concurrency(2)
            .timeout(Duration.ofSeconds(30))
    
        // Stage 2: enrichment (4 workers, queue bounded to 1000 messages)
        .addStage("enrich", this::enrichOrder)
            .concurrency(4)
            .queueSize(1000)
    
        // Stage 3: publication (2 workers, up to 5 attempts before DLQ,
        // exponential backoff with a 200 ms base and a x2 multiplier)
        .addStage("publish", this::publishOrder)
            .concurrency(2)
            .maxRetries(5)
            .backoff(Duration.ofMillis(200), 2.0)
    
        .build();
    

    15.2 Configuration par stage

    .addStage("nom", processor)
        .concurrency(4)           // Number of parallel workers
        .queueSize(500)           // Maximum queue size
        .timeout(Duration.ofMinutes(5))  // Timeout per message
        .maxRetries(3)            // Attempts before the message goes to the DLQ
        .backoff(base, multiplier) // Exponential backoff
        .optional()               // Optional stage (a failure does not block the pipeline)
    

    16. Cycle de vie

    16.1 Démarrage/Arrêt

    // Start the pipeline (launches all the workers)
    pipeline.start();
    
    // Check the state
    boolean running = pipeline.isRunning();
    
    // Graceful stop (waits for in-flight processing to finish)
    pipeline.stop();
    
    // Immediate stop (interrupts everything) - an alternative to the graceful stop above
    pipeline.stopNow();
    

    16.2 Soumission de messages

    // Submit a message; the returned String is the message id.
    String messageId = pipeline.submit(order, "exec-001");
    
    // Alternative: submit with metadata attached to the message.
    // A distinct variable name keeps the snippet compilable when pasted as a whole.
    String messageIdWithMetadata = pipeline.submit(order, "exec-001", Map.of(
        "source", "api",
        "priority", "high"
    ));
    

    16.3 Suivi d’exécution

    // State of one execution (an Optional: it may be empty)
    Optional<PipelineState> state = pipeline.getExecutionState("exec-001");
    state.ifPresent(s -> {
        System.out.println("Status: " + s.status());
        System.out.println("Current stage: " + s.currentStage());
        System.out.println("Duration: " + s.duration());
    });
    
    // List of the active executions
    List<PipelineState> active = pipeline.listActiveExecutions();
    
    // Global statistics (submitted / completed / failed counters)
    PipelineV2.PipelineStats stats = pipeline.getStats();
    System.out.println("Submitted: " + stats.totalSubmitted());
    System.out.println("Completed: " + stats.totalCompleted());
    System.out.println("Failed: " + stats.totalFailed());
    

    17. Queue/Claim/Ack

    17.1 Pattern de consommation

    Le Pipeline V2 utilise un pattern de consommation fiable :

    1. CLAIM   : Le worker réserve un message (lease)
    2. PROCESS : Le worker traite le message
    3. ACK     : Succès → message supprimé
       ou
       NACK    : Échec → retry ou DLQ
    

    17.2 Lease et récupération

    Si un worker plante (crash) pendant le traitement :

    • Le lease expire après le timeout configuré
    • Le message est automatiquement remis en queue
    • Un autre worker le reprend
    // Timeout de lease par défaut: 5 minutes
    // Configurable par stage via timeout()
    

    17.3 Implémentations disponibles

    Implémentation Persistance Use case
    InMemoryStageQueue Non Développement, tests
    TechDbStageQueue Oui (H2) Production

    18. Dead Letter Queue (DLQ)

    18.1 Envoi en DLQ

    Un message est envoyé en DLQ après :

    • maxRetries tentatives échouées
    • Une erreur non-retryable

    18.2 Consultation

    // Via the REST API
    GET /admin/pipelines/v2/{name}/dlq
    
    // Programmatically (via StageQueue).
    // NOTE(review): peekDlq(100) presumably returns at most 100 DLQ messages without removing them - confirm.
    List<DlqMessage<Order>> messages = queue.peekDlq(100);
    for (DlqMessage<Order> msg : messages) {
        System.out.println("ID: " + msg.messageId());
        System.out.println("Error: " + msg.errorMessage());
        System.out.println("Attempts: " + msg.attempts());
        System.out.println("Failed at: " + msg.failedAt());
    }
    

    18.3 Replay

    // Via the REST API
    POST /admin/pipelines/v2/{name}/dlq/{messageId}/replay
    
    // Programmatically: replay a single message
    // (presumably false when messageId is not in the DLQ - confirm)
    boolean replayed = queue.replayFromDlq(messageId);
    
    // Replay all the DLQ messages of one execution (the argument is an executionId);
    // count is presumably the number of replayed messages
    int count = queue.replayAllFromDlq("exec-001");
    

    18.4 Suppression

    // Via the REST API
    DELETE /admin/pipelines/v2/{name}/dlq/{messageId}
    
    // Programmatically: delete a single message from the DLQ
    queue.deleteFromDlq(messageId);
    
    // Full purge of the DLQ
    queue.purgeDlq();
    

    19. Persistance et reprise

    19.1 Context persistant

    Le Pipeline V2 peut persister son état pour survivre aux redémarrages :

    import eu.lmvi.socle.pipeline.context.TechDbPipelineContext;
    
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.function.Supplier;
    
    // Create a persistent context.
    // DataSource.getConnection() throws the checked SQLException, which a Supplier
    // lambda cannot propagate: it is wrapped in an unchecked exception so the code compiles.
    // (Assumes dataSource is a javax.sql.DataSource.)
    Supplier<Connection> connectionSupplier = () -> {
        try {
            return dataSource.getConnection();
        } catch (SQLException e) {
            throw new IllegalStateException("Unable to obtain a TechDB connection", e);
        }
    };
    PersistentPipelineContext context = new TechDbPipelineContext(connectionSupplier);
    
    // Use it with the builder
    PipelineV2<Order, Result> pipeline = PipelineBuilderV2
        .<Order>create("order-processing")
        .context(context)  // Uses the persistent context
        .addStage(...)
        .build();
    

    19.2 Reprise après crash

    Au redémarrage, les messages non-ackés sont automatiquement re-traités :

    1. Les messages en status CLAIMED dont le lease a expiré → remis en PENDING
    2. Les stages reprennent leur consommation normalement
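    3. Un message déjà traité mais pas encore acquitté (ACK) peut être re-traité : c’est l’idempotence des stages (voir section 20) qui garantit que ce re-traitement est sans effet de bord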

    20. Idempotence (OBLIGATOIRE)

    20.1 Pourquoi ?

    Le modèle at-least-once signifie qu’un message peut être traité plusieurs fois :

    • Crash après traitement mais avant ACK
    • Timeout de lease pendant un traitement long
    • Replay depuis la DLQ

    Chaque stage DOIT être idempotent.

    20.2 Patterns recommandés

    // Pattern 1: check "already processed" before doing the work.
    // Caveat: exists() followed by save() is not atomic. A lease that expires during a long
    // processing lets a second worker handle the same message concurrently, so back this
    // check with a uniqueness guarantee in the storage when duplicates must be impossible.
    public Order processOrder(Order order) {
        boolean alreadyProcessed = orderRepository.exists(order.getId());
        if (!alreadyProcessed) {
            // Process...
            return orderRepository.save(order);
        }
        return orderRepository.get(order.getId()); // already processed: return the stored result
    }
    
    // Pattern 2: UPSERT instead of INSERT
    public void saveOrder(final Order order) {
        // Writing the same order again leaves the same final state: idempotent by nature.
        orderRepository.upsert(order);
    }
    
    // Pattern 3: use the executionId as part of a deduplication key.
    // The marker is written only once processing has completed, so a crash in between
    // leads to re-processing (at-least-once), never to a lost message. exists()/set() is
    // not atomic: two concurrent workers can both pass the check.
    public void processWithDedup(Order order, String executionId) {
        String dedupKey = "order:" + order.getId() + ":" + executionId;
        if (!kvBus.exists(dedupKey)) {
            // Process...
    
            // Remember that this order was handled for this execution (kept for 7 days).
            kvBus.set(dedupKey, true, Duration.ofDays(7));
        }
        // Otherwise: already processed for this execution, nothing to do.
    }
    

    20.3 Anti-patterns

    // BAD: INSERT without checking
    db.insert(order); // Fails if the row was already inserted
    
    // BAD: incremental counter
    counter.increment(); // Counted several times on replay
    
    // BAD: email without deduplication
    emailService.send(email); // Sent several times
    

    21. API REST d’administration

    21.1 Endpoints disponibles

    GET  /admin/pipelines/v2                    → Liste des pipelines
    GET  /admin/pipelines/v2/{name}             → État d'un pipeline
    GET  /admin/pipelines/v2/{name}/stages      → État des stages
    GET  /admin/pipelines/v2/{name}/executions  → Exécutions en cours
    
    GET  /admin/pipelines/v2/{name}/dlq         → Messages en DLQ
    POST /admin/pipelines/v2/{name}/dlq/{id}/replay  → Rejouer un message
    DELETE /admin/pipelines/v2/{name}/dlq/{id}  → Supprimer de la DLQ
    POST /admin/pipelines/v2/{name}/dlq/replay-all   → Rejouer tous
    
    POST /admin/pipelines/v2/{name}/start       → Démarrer le pipeline
    POST /admin/pipelines/v2/{name}/stop        → Arrêter gracieusement
    POST /admin/pipelines/v2/{name}/stop-now    → Arrêter immédiatement
    

    21.2 Exemple de réponse

    GET /admin/pipelines/v2/order-processing
    
    {
      "timestamp": 1735387200000,
      "name": "order-processing",
      "description": "Pipeline de traitement des commandes",
      "running": true,
      "stats": {
        "stage_count": 3,
        "active_executions": 5,
        "total_submitted": 1500,
        "total_completed": 1480,
        "total_failed": 12
      }
    }
    

    22. Métriques Prometheus

    22.1 Métriques exposées

    # Counters
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="success"}
    socle_pipeline_v2_messages_total{pipeline="order-processing", stage="validate", status="failed"}
    socle_pipeline_v2_dlq_total{pipeline="order-processing", stage="validate"}
    
    # Gauges
    socle_pipeline_v2_queue_pending{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_queue_processing{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_dlq_size{pipeline="order-processing", stage="validate"}
    socle_pipeline_v2_active_executions{pipeline="order-processing"}
    socle_pipeline_v2_running{pipeline="order-processing"}
    
    # Processing duration per stage. Series carrying a "quantile" label are the shape of a
    # Prometheus summary (client-side percentiles), not a histogram (which exposes
    # _bucket{le="..."} series). NOTE(review): confirm the type against /metrics; summary
    # quantiles cannot be aggregated across instances.
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.5"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.95"}
    socle_pipeline_v2_stage_duration_seconds{pipeline="order-processing", stage="validate", quantile="0.99"}
    

    22.2 Alertes recommandées

    # Growing DLQ: more than 10 messages sent to the DLQ within one hour.
    # These entries belong under the "rules:" list of a Prometheus rule group.
    - alert: PipelineDLQGrowing
      expr: increase(socle_pipeline_v2_dlq_total[1h]) > 10
      labels:
        severity: warning
      annotations:
        summary: "DLQ growing for pipeline {{ $labels.pipeline }} (stage {{ $labels.stage }})"
    
    # Queue backlog (backpressure): more than 1000 pending messages for 5 minutes.
    - alert: PipelineQueueBacklog
      expr: socle_pipeline_v2_queue_pending > 1000
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "Backlog on pipeline {{ $labels.pipeline }} (stage {{ $labels.stage }})"
    
    # Pipeline stopped. This only fires while the gauge is still exported: if the
    # application itself is down the series disappears and the comparison matches
    # nothing, so pair it with an instance-availability alert (e.g. on "up").
    - alert: PipelineStopped
      expr: socle_pipeline_v2_running == 0
      for: 1m
      labels:
        severity: critical
      annotations:
        summary: "Pipeline {{ $labels.pipeline }} is not running"
    

    23. Bonnes pratiques V2

    DO

    • Toujours implémenter l’idempotence dans chaque stage
    • Utiliser TechDbPipelineContext en production pour la persistance
    • Configurer des maxRetries appropriés (3-5 pour API externes)
    • Monitorer les métriques DLQ et queue_pending
    • Traiter régulièrement les messages en DLQ (replay ou suppression)

    DON’T

    • Ne jamais supposer qu’un message ne sera traité qu’une fois
    • Ne pas ignorer les messages en DLQ
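    • Ne pas mettre de timeouts trop courts (risque de faux positifs : le lease expire alors que le traitement est toujours en cours, et le message est repris par un autre worker)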
    • Ne pas créer trop de stages (overhead de queues)
    • Ne pas oublier d’appeler pipeline.stop() à l’arrêt de l’application

    24. Migration V1 → V2

    24.1 Changements d’API

    V1 V2
    PipelineEngine.execute(pipeline, input) pipeline.submit(input, executionId)
    Pipeline<I,O> PipelineV2<I,O>
    PipelineBuilder PipelineBuilderV2
    Exécution synchrone Exécution asynchrone

    24.2 Étapes de migration

    1. Rendre les steps idempotents (obligatoire avant migration)
    2. Créer le pipeline avec PipelineBuilderV2
    3. Remplacer execute() par submit()
    4. Ajouter le suivi asynchrone des résultats si nécessaire
    5. Configurer la persistance (TechDbPipelineContext)
    6. Activer les métriques et monitoring DLQ

    25. Configuration Dynamique (YAML)

    Ajouté en version 4.1.0

    25.1 Introduction

    Le Pipeline V2 peut maintenant être configuré entièrement via YAML, permettant :

    • Activation/désactivation des stages sans recompiler
    • Configuration dynamique de la concurrence, timeouts, retries
    • Ajout de nouveaux stages sans modifier le code de la factory
    • Modularité des workers (un worker = un composant Spring)

    25.2 Architecture

    ┌─────────────────────────────────────────────────────────────────┐
    │                        application.yml                           │
    │  cdc:                                                            │
    │    pipeline:                                                     │
    │      stages:                                                     │
    │        - name: filter                                            │
    │          worker: filterWorker                                    │
    │          enabled: true                                           │
    └───────────────────────────────┬─────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │            PipelineYamlConfig (@ConfigurationProperties)          │
    │                                                                    │
    │  - name: String                                                    │
    │  - stages: List<StageYamlConfig>                                   │
    │  - getEnabledStages(): List<StageYamlConfig>                       │
    └───────────────────────────────┬───────────────────────────────────┘
                                    │
                                    ▼
    ┌───────────────────────────────────────────────────────────────────┐
    │                    DynamicPipelineBuilder                          │
    │                                                                    │
    │  .config(pipelineConfig)                                           │
    │  .workers(Map<String, PipelineStageWorker>)                        │
    │  .build() → PipelineV2<I, O>                                       │
    └───────────────────────────────────────────────────────────────────┘
    

    25.3 Interface PipelineStageWorker

    Chaque stage est implémenté comme un composant Spring implémentant PipelineStageWorker :

    package eu.lmvi.socle.pipeline.v2.worker;
    
    public interface PipelineStageWorker<I, O> {
    
        /**
         * Unique name of this worker.
         * <p>NOTE(review): in the examples it matches the stage {@code name} of the YAML
         * configuration, while the YAML {@code worker} key holds the Spring bean name -
         * confirm which one DynamicPipelineBuilder relies on.
         */
        String getName();
    
        /** Human-readable description used for monitoring; defaults to {@link #getName()}. */
        default String getDescription() { return getName(); }
    
        /** Priority order: lower values run earlier. Defaults to 100. */
        default int getOrder() { return 100; }
    
        /** Whether this worker is enabled. Defaults to {@code true}. */
        default boolean isEnabled() { return true; }
    
        /**
         * One-time initialization hook (loading resources, compiling scripts, ...).
         * Does nothing by default.
         */
        default void initialize() throws Exception {}
    
        /**
         * Processes one input element. Implementations MUST be thread-safe: a stage can be
         * configured to run several workers concurrently.
         */
        O process(I input) throws Exception;
    
        /** Releases the worker's resources. Does nothing by default. */
        default void shutdown() {}
    }
    

    25.4 Exemple d’implémentation

    @Component("filterWorker")
    public class FilterStageWorker implements PipelineStageWorker<WalEvent, PipelineMessage> {
    
        @Autowired
        private TableFilter tableFilter;
    
        /** Logical stage name; matches {@code name: filter} in the YAML configuration. */
        @Override
        public String getName() {
            return "filter";
        }
    
        /** Order 10: the first stage of the CDC pipeline. */
        @Override
        public int getOrder() {
            return 10;
        }
    
        /**
         * Wraps the WAL event in a {@link PipelineMessage}. When the event's table is not
         * allowed by {@link TableFilter}, the message is flagged as filtered. Either way the
         * message is returned to the next stage.
         */
        @Override
        public PipelineMessage process(WalEvent event) throws Exception {
            PipelineMessage wrapped = new PipelineMessage(event);
            boolean tableAllowed = tableFilter.isAllowed(event.getTable());
            if (tableAllowed) {
                return wrapped;
            }
            wrapped.markAsFiltered("table_not_in_whitelist");
            return wrapped;
        }
    }
    

    25.5 Configuration YAML

    # Bound to CdcPipelineConfig (prefix cdc.pipeline). ${VAR:default} placeholders are
    # resolved by Spring from the environment, falling back to the default after the colon.
    # NOTE(review): stages omitting timeout-seconds / max-retries presumably use built-in
    # defaults; the YAML "order" also duplicates PipelineStageWorker.getOrder() - confirm
    # which one wins.
    cdc:
      pipeline:
        name: cdc-pipeline-v2
        description: Pipeline CDC PostgreSQL vers Kafka
        enabled: true
        stages:
          # Stage 1: filtering
          - name: filter
            worker: filterWorker        # Spring bean name
            enabled: true
            concurrency: 1
            timeout-seconds: 5
            max-retries: 1
            order: 10
    
          # Stage 2: transformation
          - name: transform
            worker: transformWorker
            enabled: true
            concurrency: 2
            timeout-seconds: 30
            order: 20
    
          # Stage 3a: rules enrichment
          - name: enrich-rules
            worker: rulesEnrichmentWorker
            enabled: true
            concurrency: 2
            order: 30
    
          # Stage 3b: BeanShell enrichment
          - name: enrich-beanshell
            worker: beanshellEnrichmentWorker
            enabled: ${CDC_ENRICH_BSH_ENABLED:true}  # Environment variable (default: true)
            concurrency: 2
            order: 31
    
          # Stage 3c: JavaScript enrichment
          - name: enrich-javascript
            worker: javascriptEnrichmentWorker
            enabled: ${CDC_ENRICH_JS_ENABLED:true}
            concurrency: 4
            timeout-seconds: 60
            order: 32
    
          # Stage 3d: Janino enrichment (new)
          - name: enrich-janino
            worker: janinoEnrichmentWorker
            enabled: ${CDC_ENRICH_JANINO_ENABLED:false}  # Disabled by default
            concurrency: 2
            order: 33
    
          # Stage 4: Kafka publishing
          - name: publish
            worker: publishWorker
            enabled: true
            concurrency: 2
            max-retries: 5
            order: 40
    
          # Stage 5: LSN commit
          - name: commit
            worker: commitWorker
            enabled: true
            concurrency: 1
            order: 50
    

    25.6 Classe de configuration Spring

    // Spring configuration bean bound to the cdc.pipeline.* properties of application.yml.
    @Configuration
    @ConfigurationProperties(prefix = "cdc.pipeline")
    public class CdcPipelineConfig extends PipelineYamlConfig {
        // Inherits every bound property (name, stages, ...) from PipelineYamlConfig;
        // declares nothing of its own.
    }
    

    25.7 Construction du pipeline

    /**
     * Builds the CDC pipeline from the cdc.pipeline YAML configuration and the
     * PipelineStageWorker beans available in the Spring context, and stops it gracefully
     * when the context shuts down.
     */
    @Component
    public class CdcPipelineFactory {
    
        private static final Logger log = LoggerFactory.getLogger(CdcPipelineFactory.class);
    
        @Autowired
        private CdcPipelineConfig pipelineConfig;
    
        // Spring injects every PipelineStageWorker bean, keyed by bean name
        // (the value referenced by the YAML "worker" key).
        @Autowired
        private Map<String, PipelineStageWorker<?, ?>> availableWorkers;
    
        /** Pipeline built in init(); null until then. */
        private PipelineV2<?, ?> cdcPipeline;
    
        @PostConstruct
        public void init() {
            // Log the available workers
            log.info("Available workers: {}", availableWorkers.keySet());
    
            // Build the pipeline dynamically from the YAML configuration
            cdcPipeline = DynamicPipelineBuilder.create()
                .config(pipelineConfig)
                .workers(availableWorkers)
                .build();
    
            log.info("Pipeline created with {} stages",
                pipelineConfig.countEnabledStages());
        }
    
        /** Returns the pipeline built in {@link #init()} (null before initialization). */
        public PipelineV2<?, ?> getPipeline() {
            return cdcPipeline;
        }
    
        /**
         * Graceful stop on application shutdown: stop() waits for in-flight processing,
         * so no claimed message is abandoned mid-stage.
         */
        @PreDestroy
        public void shutdown() {
            if (cdcPipeline != null && cdcPipeline.isRunning()) {
                cdcPipeline.stop();
            }
        }
    }
    

    25.8 Avantages

    Aspect Avant (hardcodé) Après (YAML)
    Modification Recompilation Redémarrage
    Activation stage Code Java Variable d’env
    Ajout stage Modifier factory Ajouter worker + config
    Configuration Dans le code Centralisée
    Environnements Branches différentes Même code, config différente

    25.9 Activation/Désactivation sans recompilation

    Via les variables d’environnement du fichier .env (un redémarrage du service reste nécessaire pour les prendre en compte) :

    # Disable BeanShell
    CDC_ENRICH_BSH_ENABLED=false
    
    # Enable Janino
    CDC_ENRICH_JANINO_ENABLED=true
    
    # Restart the service (required for the new values to be taken into account)
    sudo systemctl restart cdc-reflet-kafka-v02
    

    25.10 Ajout d’un nouveau type d’enrichissement

    1. Créer le worker :
    @Component("monNouveauWorker")
    public class MonNouveauEnrichmentWorker
        implements PipelineStageWorker<PipelineMessage, PipelineMessage> {
        // Implement at least getName() and process() (process() must be thread-safe).
        // The bean name "monNouveauWorker" is what the YAML "worker" key must reference.
        // ...
    }
    
    2. Ajouter dans application.yml (dans la liste cdc.pipeline.stages) :
    # Entry to add to the existing cdc.pipeline.stages list in application.yml.
    stages:
      - name: enrich-nouveau
        worker: monNouveauWorker  # Spring bean name declared by @Component("monNouveauWorker")
        enabled: ${CDC_ENRICH_NOUVEAU_ENABLED:false}  # Disabled unless the variable is set to true
        order: 34
    
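    3. Redémarrer le service – le pipeline inclura automatiquement le nouveau stage, à condition qu’il soit activé (CDC_ENRICH_NOUVEAU_ENABLED=true, la valeur par défaut étant false).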

    26. Références V2