yoomoney/db-queue-scheduler

License: MIT

Database Scheduler

The library provides periodic task execution on top of the db-queue library.

The project uses Semantic Versioning.

The library is available on Maven Central.

implementation 'ru.yoomoney.tech:db-queue-scheduler-core:3.0.0',
               'ru.yoomoney.tech:db-queue-scheduler-spring:3.0.0'

Features

  • Persisted periodic tasks;
  • At most one execution of a given task at any time;
  • Various schedule configurations: cron expressions, fixed rates, fixed delays, dynamic calculations;
  • Tracing support;
  • Task event listeners to build up monitoring;
  • Many other features.

The library provides only recurring (periodic, scheduled) task functionality, that is, it executes tasks periodically. If you need one-time tasks that are executed exactly once, see the db-queue library.

Database configuration

The project uses db-queue to work with a database.

The library requires a single database table where periodic tasks are stored.

PostgreSQL DDL

CREATE TABLE scheduled_tasks (
  id                BIGSERIAL PRIMARY KEY,
  queue_name        TEXT NOT NULL,
  payload           TEXT,
  created_at        TIMESTAMP WITH TIME ZONE DEFAULT now(),
  next_process_at   TIMESTAMP WITH TIME ZONE DEFAULT now(),
  attempt           INTEGER                  DEFAULT 0,
  reenqueue_attempt INTEGER                  DEFAULT 0,
  total_attempt     INTEGER                  DEFAULT 0
);
CREATE UNIQUE INDEX scheduled_tasks_uq ON scheduled_tasks (queue_name);

MSSQL DDL

CREATE TABLE scheduled_tasks (
  id                INT IDENTITY(1,1) NOT NULL,
  queue_name        VARCHAR(100) NOT NULL,
  payload           TEXT,
  created_at        DATETIMEOFFSET NOT NULL  DEFAULT SYSDATETIMEOFFSET(),
  next_process_at   DATETIMEOFFSET NOT NULL  DEFAULT SYSDATETIMEOFFSET(),
  attempt           INTEGER NOT NULL         DEFAULT 0,
  reenqueue_attempt INTEGER NOT NULL         DEFAULT 0,
  total_attempt     INTEGER NOT NULL         DEFAULT 0,
  PRIMARY KEY (id)
);
CREATE UNIQUE INDEX scheduled_tasks_uq ON scheduled_tasks (queue_name);

Oracle DDL

CREATE TABLE scheduled_tasks (
  id                NUMBER(38) NOT NULL PRIMARY KEY,
  queue_name        VARCHAR2(128) NOT NULL,
  payload           CLOB,
  created_at        TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
  next_process_at   TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
  attempt           NUMBER(38)                  DEFAULT 0,
  reenqueue_attempt NUMBER(38)                  DEFAULT 0,
  total_attempt     NUMBER(38)                  DEFAULT 0
);
CREATE UNIQUE INDEX scheduled_tasks_uq ON scheduled_tasks (queue_name);

-- Create sequence and specify its name through scheduler configurator.
CREATE SEQUENCE scheduled_tasks_seq;

H2 Database DDL

CREATE TABLE scheduled_tasks (
  id                BIGSERIAL PRIMARY KEY,
  queue_name        VARCHAR(100) NOT NULL,
  payload           VARCHAR(100),
  created_at        TIMESTAMP WITH TIME ZONE DEFAULT now(),
  next_process_at   TIMESTAMP WITH TIME ZONE DEFAULT now(),
  attempt           INTEGER                  DEFAULT 0,
  reenqueue_attempt INTEGER                  DEFAULT 0,
  total_attempt     INTEGER                  DEFAULT 0
);
CREATE UNIQUE INDEX scheduled_tasks_uq ON scheduled_tasks (queue_name);

Example

// Configure the scheduler on top of Spring's JdbcOperations and TransactionOperations.
Scheduler scheduler = new SpringSchedulerConfigurator()
        .withDatabaseDialect(DatabaseDialect.POSTGRESQL)
        .withTableName("scheduled_tasks")
        .withJdbcOperations(jdbcTemplate)
        .withTransactionOperations(transactionTemplate)
        .configure();

// Define a task identified by "scheduled-task-id".
ScheduledTask task = SimpleScheduledTask.create(
        "scheduled-task-id",
        context -> {
            System.out.println("Hello World!");
            return ScheduledTaskExecutionResult.success();
        });

// Schedule it with a fixed delay of one hour and no failure settings.
ScheduledTaskSettings settings = ScheduledTaskSettings.builder()
        .withScheduleSettings(ScheduleSettings.fixedDelay(Duration.ofHours(1L)))
        .withFailureSettings(FailureSettings.none())
        .build();

// Register the task, then start the scheduler.
scheduler.schedule(task, settings);

scheduler.start();

See also our runnable example.
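
The example above uses a fixed delay, and the feature list also mentions fixed rates. The sketch below shows how the two usually differ. It assumes the conventional semantics known from java.util.concurrent.ScheduledExecutorService: a fixed delay is counted from the end of the previous run, a fixed rate from its scheduled start. That semantics is an assumption here, not taken from the library's documentation, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative sketch only: these helpers are not part of db-queue-scheduler.
class ScheduleKindsSketch {

    /** Fixed delay (assumed semantics): next run = end of the previous run + delay. */
    static Instant nextFixedDelay(Instant previousFinishedAt, Duration delay) {
        return previousFinishedAt.plus(delay);
    }

    /** Fixed rate (assumed semantics): next run = scheduled start of the previous run + rate. */
    static Instant nextFixedRate(Instant previousScheduledAt, Duration rate) {
        return previousScheduledAt.plus(rate);
    }

    public static void main(String[] args) {
        Instant scheduledAt = Instant.parse("2024-01-01T10:00:00Z");
        // The run took ten minutes.
        Instant finishedAt = scheduledAt.plus(Duration.ofMinutes(10));

        System.out.println("fixed delay: " + nextFixedDelay(finishedAt, Duration.ofHours(1)));
        System.out.println("fixed rate:  " + nextFixedRate(scheduledAt, Duration.ofHours(1)));
    }
}
```

With a long-running task, a fixed delay drifts later by the duration of each run, while a fixed rate keeps the start times aligned.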

How it works

Overview

  1. When a new scheduled task is registered, the library creates a new db-queue queue that is linked exclusively to that scheduled task;
  2. If the db-queue queue does not contain a task, the library creates one and postpones it according to the linked schedule settings;
  3. Each db-queue queue has a consumer that executes its linked scheduled task;
  4. When the consumer receives a db-queue task, it performs the following steps:
    1. Postpones the next execution time of the db-queue task according to the linked schedule settings and failure settings;
    2. Starts a HeartbeatAgent to prevent concurrent execution of the same task by different application nodes in case of a time-consuming execution;
    3. Executes the linked scheduled task;
    4. Postpones the db-queue task according to the result of the last execution of the linked scheduled task.
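
The four consumer steps above can be sketched in plain Java. This is a simplified in-memory model, not the library's implementation: QueueTask, Result and consume are hypothetical names, and the time arithmetic is deliberately reduced to "now plus a duration". How HeartbeatAgent actually keeps the task reserved is not described here, so the sketch only records that it starts and stops.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Illustrative sketch only: none of these types belong to db-queue-scheduler.
class ConsumerStepsSketch {

    enum Result { SUCCESS, ERROR }

    /** Hypothetical in-memory stand-in for the db-queue task row of one scheduled task. */
    static final class QueueTask {
        Instant nextProcessAt;
        final List<String> steps = new ArrayList<>();

        QueueTask(Instant nextProcessAt) {
            this.nextProcessAt = nextProcessAt;
        }
    }

    static Result consume(QueueTask row, Instant now, Duration onSuccess, Duration onError,
                          Supplier<Result> scheduledTask) {
        // Step 1: postpone the row before doing any work.
        row.nextProcessAt = now.plus(onSuccess);
        row.steps.add("postpone-before-execution");

        // Step 2: the real HeartbeatAgent is not modelled; its start and stop are only recorded.
        row.steps.add("heartbeat-start");
        Result result;
        try {
            // Step 3: execute the linked scheduled task.
            result = scheduledTask.get();
            row.steps.add("execute");
        } finally {
            row.steps.add("heartbeat-stop");
        }

        // Step 4: postpone again according to the execution result.
        row.nextProcessAt = now.plus(result == Result.SUCCESS ? onSuccess : onError);
        row.steps.add("postpone-after-execution");
        return result;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T10:00:00Z");
        QueueTask row = new QueueTask(now);
        Result result = consume(row, now, Duration.ofHours(1), Duration.ofMinutes(5), () -> Result.ERROR);
        System.out.println(result + " " + row.steps + " next=" + row.nextProcessAt);
    }
}
```

Because the row is postponed before the task runs, it is not immediately due again even if the node stops mid-execution; that reading is an inference from the step order, not a statement from the library.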

Schedule and failure settings

The library lets you configure ScheduleSettings and FailureSettings.

FailureSettings is applied whenever an execution result is ERROR and both of the following conditions hold:

  1. The number of execution attempts since the last successful one is less than FailureSettings.maxAttempts;
  2. ScheduleSettings.CronSettings is not configured, or the next execution time computed via FailureSettings is earlier than the one computed via ScheduleSettings.CronSettings.
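
The two conditions can be written as a small decision function. This is a minimal sketch under stated assumptions: the method and parameter names are hypothetical, and falling back to the regular schedule when FailureSettings does not apply is implied by the text rather than stated in it.

```java
import java.time.Instant;

// Illustrative sketch only: not the library's code.
class FailureSettingsSketch {

    /**
     * Picks the next execution time after an ERROR result.
     *
     * @param scheduleNext         next time computed via ScheduleSettings
     * @param cronConfigured       whether the schedule is a cron schedule
     * @param failureNext          next time computed via FailureSettings
     * @param attemptsSinceSuccess execution attempts since the last successful one
     * @param maxAttempts          value of FailureSettings.maxAttempts
     */
    static Instant nextAfterError(Instant scheduleNext, boolean cronConfigured, Instant failureNext,
                                  int attemptsSinceSuccess, int maxAttempts) {
        // Condition 1: there are retry attempts left.
        boolean attemptsLeft = attemptsSinceSuccess < maxAttempts;
        // Condition 2: no cron schedule, or the retry would come before the next cron run.
        boolean retryBeforeSchedule = !cronConfigured || failureNext.isBefore(scheduleNext);
        // Assumption: otherwise the regular schedule decides.
        return (attemptsLeft && retryBeforeSchedule) ? failureNext : scheduleNext;
    }

    public static void main(String[] args) {
        Instant cronNext = Instant.parse("2024-01-01T11:00:00Z");
        Instant retryNext = Instant.parse("2024-01-01T10:05:00Z");
        System.out.println(nextAfterError(cronNext, true, retryNext, 1, 3));
        System.out.println(nextAfterError(cronNext, true, retryNext, 3, 3));
    }
}
```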

How to contribute?

Just fork the repo and send us a pull request.

Make sure your branch builds without any warnings/issues.