Socle V004 – Scheduler

Socle V004 - Scheduler

12 – Scheduler

Version : 4.0.0 Date : 2025-12-09

1. Introduction

Le Scheduler permet d’exécuter des Workers selon des expressions cron ou à intervalles réguliers.

Caractéristiques

  • Support des expressions cron standard
  • Exécution à intervalle fixe
  • Gestion du chevauchement
  • Intégration avec le MOP

2. Configuration

2.1 application.yml

socle:
  scheduler:
    enabled: ${SCHEDULER_ENABLED:true}
    thread-pool-size: ${SCHEDULER_POOL_SIZE:4}
    default-timezone: ${SCHEDULER_TIMEZONE:Europe/Paris}

2.2 Variables d’environnement

Variable Description Défaut
SCHEDULER_ENABLED Activer le scheduler true
SCHEDULER_POOL_SIZE Taille du thread pool 4
SCHEDULER_TIMEZONE Timezone par défaut Europe/Paris

3. Types de scheduling

3.1 Cron

Expressions cron standard (6 champs) :

┌───────────── seconde (0-59)
│ ┌───────────── minute (0-59)
│ │ ┌───────────── heure (0-23)
│ │ │ ┌───────────── jour du mois (1-31)
│ │ │ │ ┌───────────── mois (1-12)
│ │ │ │ │ ┌───────────── jour de la semaine (0-6, 0=dimanche)
│ │ │ │ │ │
* * * * * *

Exemples :

  • 0 0 6 * * ? : Tous les jours à 6h00
  • 0 */15 * * * ? : Toutes les 15 minutes
  • 0 0 0 1 * ? : Premier jour de chaque mois à minuit
  • 0 30 8 ? * MON-FRI : 8h30 du lundi au vendredi

3.2 Intervalle fixe

Exécution périodique simple :

@Override
public long getCycleIntervalMs() {
    return 60000;  // Toutes les minutes
}

4. Worker schedulé

4.1 Avec expression cron

@Component
public class DailyReportWorker implements Worker {

    private static final Logger log = LoggerFactory.getLogger(DailyReportWorker.class);

    @Override
    public String getName() {
        return "daily-report-worker";
    }

    @Override
    public String getSchedule() {
        return "0 0 6 * * ?";  // Tous les jours à 6h
    }

    @Override
    public boolean isScheduled() {
        return true;
    }

    @Override
    public void doWork() {
        log.info("Generating daily report...");
        generateReport();
    }

    private void generateReport() {
        // Génération du rapport
    }

    // Autres méthodes Worker...
}

4.2 Avec intervalle

@Component
public class HealthCheckWorker implements Worker {

    @Override
    public String getName() {
        return "health-check-worker";
    }

    @Override
    public String getSchedule() {
        return null;  // Pas de cron
    }

    @Override
    public boolean isScheduled() {
        return false;  // Pas schedulé par cron
    }

    @Override
    public long getCycleIntervalMs() {
        return 30000;  // Toutes les 30 secondes
    }

    @Override
    public void doWork() {
        checkHealth();
    }
}

5. Interface Scheduler

package eu.lmvi.socle.scheduler;

public interface Scheduler {

    /**
     * Planifie un job cron
     */
    void scheduleCron(String jobId, String cronExpression, Runnable task);

    /**
     * Planifie un job à intervalle fixe
     */
    void scheduleInterval(String jobId, long intervalMs, Runnable task);

    /**
     * Planifie un job à intervalle fixe avec délai initial
     */
    void scheduleInterval(String jobId, long initialDelayMs, long intervalMs, Runnable task);

    /**
     * Planifie un job one-shot
     */
    void scheduleOnce(String jobId, long delayMs, Runnable task);

    /**
     * Annule un job
     */
    void cancel(String jobId);

    /**
     * Vérifie si un job est planifié
     */
    boolean isScheduled(String jobId);

    /**
     * Liste les jobs planifiés
     */
    List<ScheduledJob> getScheduledJobs();

    /**
     * Démarre le scheduler
     */
    void start();

    /**
     * Arrête le scheduler
     */
    void stop();
}

6. Implémentation

package eu.lmvi.socle.scheduler;

@Component
public class DefaultScheduler implements Scheduler {

    private static final Logger log = LoggerFactory.getLogger(DefaultScheduler.class);

    private final ScheduledExecutorService executor;
    private final ConcurrentHashMap<String, ScheduledFuture<?>> jobs = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, ScheduledJob> jobInfo = new ConcurrentHashMap<>();
    private final ZoneId timezone;

    public DefaultScheduler(SocleConfiguration config) {
        int poolSize = config.getScheduler().getThreadPoolSize();
        this.executor = Executors.newScheduledThreadPool(poolSize,
            r -> new Thread(r, "scheduler-" + System.currentTimeMillis()));
        this.timezone = ZoneId.of(config.getScheduler().getDefaultTimezone());
    }

    @Override
    public void scheduleCron(String jobId, String cronExpression, Runnable task) {
        CronExpression cron = CronExpression.parse(cronExpression);

        Runnable scheduledTask = () -> {
            log.debug("Executing cron job: {}", jobId);
            try {
                task.run();
            } catch (Exception e) {
                log.error("Error executing job {}: {}", jobId, e.getMessage(), e);
            }
            // Replanifier la prochaine exécution
            scheduleNextCronExecution(jobId, cron, task);
        };

        scheduleNextCronExecution(jobId, cron, task);

        jobInfo.put(jobId, new ScheduledJob(jobId, "cron", cronExpression, null, Instant.now()));
        log.info("Scheduled cron job: {} with expression: {}", jobId, cronExpression);
    }

    private void scheduleNextCronExecution(String jobId, CronExpression cron, Runnable task) {
        ZonedDateTime now = ZonedDateTime.now(timezone);
        ZonedDateTime next = cron.next(now);

        if (next != null) {
            long delayMs = Duration.between(now, next).toMillis();

            ScheduledFuture<?> future = executor.schedule(() -> {
                task.run();
                scheduleNextCronExecution(jobId, cron, task);
            }, delayMs, TimeUnit.MILLISECONDS);

            jobs.put(jobId, future);
        }
    }

    @Override
    public void scheduleInterval(String jobId, long intervalMs, Runnable task) {
        scheduleInterval(jobId, 0, intervalMs, task);
    }

    @Override
    public void scheduleInterval(String jobId, long initialDelayMs, long intervalMs, Runnable task) {
        Runnable wrappedTask = () -> {
            log.debug("Executing interval job: {}", jobId);
            try {
                task.run();
            } catch (Exception e) {
                log.error("Error executing job {}: {}", jobId, e.getMessage(), e);
            }
        };

        ScheduledFuture<?> future = executor.scheduleAtFixedRate(
            wrappedTask, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);

        jobs.put(jobId, future);
        jobInfo.put(jobId, new ScheduledJob(jobId, "interval", null, intervalMs, Instant.now()));

        log.info("Scheduled interval job: {} every {}ms", jobId, intervalMs);
    }

    @Override
    public void scheduleOnce(String jobId, long delayMs, Runnable task) {
        ScheduledFuture<?> future = executor.schedule(() -> {
            log.debug("Executing one-shot job: {}", jobId);
            try {
                task.run();
            } finally {
                jobs.remove(jobId);
                jobInfo.remove(jobId);
            }
        }, delayMs, TimeUnit.MILLISECONDS);

        jobs.put(jobId, future);
        jobInfo.put(jobId, new ScheduledJob(jobId, "once", null, delayMs, Instant.now()));

        log.info("Scheduled one-shot job: {} in {}ms", jobId, delayMs);
    }

    @Override
    public void cancel(String jobId) {
        ScheduledFuture<?> future = jobs.remove(jobId);
        if (future != null) {
            future.cancel(false);
            jobInfo.remove(jobId);
            log.info("Cancelled job: {}", jobId);
        }
    }

    @Override
    public boolean isScheduled(String jobId) {
        return jobs.containsKey(jobId);
    }

    @Override
    public List<ScheduledJob> getScheduledJobs() {
        return new ArrayList<>(jobInfo.values());
    }

    @Override
    public void start() {
        log.info("Scheduler started");
    }

    @Override
    public void stop() {
        log.info("Stopping scheduler...");
        jobs.values().forEach(f -> f.cancel(false));
        jobs.clear();
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        log.info("Scheduler stopped");
    }
}

7. Gestion du chevauchement

7.1 Éviter le chevauchement

@Component
public class LongRunningWorker implements Worker {

    private final AtomicBoolean running = new AtomicBoolean(false);

    @Override
    public void doWork() {
        // Éviter l'exécution concurrente
        if (!running.compareAndSet(false, true)) {
            log.warn("Previous execution still running, skipping");
            return;
        }

        try {
            doLongWork();
        } finally {
            running.set(false);
        }
    }

    private void doLongWork() {
        // Traitement long
    }
}

7.2 Avec verrou distribué (multi-instances)

@Component
public class DistributedScheduledWorker implements Worker {

    @Autowired
    private KvBus kvBus;

    @Override
    public void doWork() {
        String lockKey = "lock:job:" + getName();

        // Tenter d'acquérir le lock
        if (!kvBus.putIfAbsent(lockKey, "locked", Duration.ofMinutes(10))) {
            log.debug("Job already running on another instance");
            return;
        }

        try {
            executeJob();
        } finally {
            kvBus.delete(lockKey);
        }
    }
}

8. Intégration MOP

Le MOP intègre automatiquement les workers schedulés :

// Dans MainOrchestratorProcess
private void startScheduledWorkers() {
    for (Worker worker : workers) {
        if (worker.isScheduled() && worker.getSchedule() != null) {
            scheduler.scheduleCron(
                "worker:" + worker.getName(),
                worker.getSchedule(),
                () -> {
                    if (worker.isHealthy()) {
                        worker.doWork();
                    }
                }
            );
        }
    }
}

9. API Admin

@RestController
@RequestMapping("/admin/scheduler")
public class SchedulerController {

    @Autowired
    private Scheduler scheduler;

    @GetMapping("/jobs")
    public List<ScheduledJob> listJobs() {
        return scheduler.getScheduledJobs();
    }

    @PostMapping("/jobs/{jobId}/cancel")
    public ResponseEntity<Void> cancelJob(@PathVariable String jobId) {
        if (scheduler.isScheduled(jobId)) {
            scheduler.cancel(jobId);
            return ResponseEntity.ok().build();
        }
        return ResponseEntity.notFound().build();
    }

    @PostMapping("/jobs/{jobId}/trigger")
    public ResponseEntity<Void> triggerJob(@PathVariable String jobId) {
        // Exécution immédiate one-shot
        scheduler.scheduleOnce(jobId + "-manual-" + System.currentTimeMillis(), 0, () -> {
            // Trouver et exécuter le worker correspondant
        });
        return ResponseEntity.accepted().build();
    }
}

10. Expressions Cron communes

Expression Description
0 0 * * * ? Toutes les heures
0 */15 * * * ? Toutes les 15 minutes
0 0 6 * * ? Tous les jours à 6h
0 0 0 * * ? Tous les jours à minuit
0 0 0 * * SUN Tous les dimanches à minuit
0 0 0 1 * ? Premier jour du mois
0 0 8 ? * MON-FRI 8h en semaine
0 0 */2 * * ? Toutes les 2 heures

11. Bonnes pratiques

DO

  • Utiliser des noms de jobs uniques et descriptifs
  • Gérer le chevauchement pour les jobs longs
  • Utiliser des locks distribués en multi-instances
  • Logger le début et la fin des jobs
  • Monitorer l’exécution des jobs

DON’T

  • Ne pas planifier des jobs trop fréquents sans nécessité
  • Ne pas ignorer les erreurs dans les jobs
  • Ne pas créer trop de threads
  • Ne pas bloquer indéfiniment dans un job

12. Références

Commentaires

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *