From 99061e97be4f950a531bacf33e54e62d9cfdab58 Mon Sep 17 00:00:00 2001 From: Ankitha Damodara Date: Fri, 14 Jun 2024 12:07:44 +0100 Subject: [PATCH] Add a new que adapter to support the que functionality on databse that does not support advisory locking What? This is an attempt to use que with the database that does not support advisory locking. It will use 2 databases. The primary one will be processing the jobs and uses a 2nd database to acquire the advisory lock on the job. --- .github/workflows/tests.yml | 61 ++++++++++++++++-- lib/que.rb | 1 + lib/que/adapters/active_record_with_lock.rb | 68 +++++++++++++++++++++ lib/que/adapters/base.rb | 1 + lib/que/locker.rb | 2 +- lib/que/sql.rb | 24 ++++++++ spec/active_record_with_lock_spec_helper.rb | 30 +++++++++ spec/lib/que/locker_spec.rb | 2 +- spec/spec_helper.rb | 12 +++- 9 files changed, 191 insertions(+), 10 deletions(-) create mode 100644 lib/que/adapters/active_record_with_lock.rb create mode 100644 spec/active_record_with_lock_spec_helper.rb diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0006b60f..5a21a947 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -3,7 +3,7 @@ name: tests on: push: -jobs: +jobs: rubocop: runs-on: ubuntu-latest env: @@ -24,7 +24,6 @@ jobs: fail-fast: false matrix: ruby_version: ["3.0", "3.1", "3.2", "3.3"] - runs-on: ubuntu-latest services: postgres: @@ -40,7 +39,6 @@ jobs: --health-interval 10s --health-timeout 5s --health-retries 10 - env: PGDATABASE: que-test PGUSER: ubuntu @@ -63,7 +61,6 @@ jobs: fail-fast: false matrix: ruby_version: ["3.0", "3.1", "3.2", "3.3"] - runs-on: ubuntu-latest services: postgres: @@ -79,7 +76,6 @@ jobs: --health-interval 10s --health-timeout 5s --health-retries 10 - env: PGDATABASE: que-test PGUSER: ubuntu @@ -96,3 +92,58 @@ jobs: - name: Run specs run: | bundle exec rspec + + active_record_with_lock_adapter_rspec: + strategy: + fail-fast: false + matrix: + ruby_version: ["3.0", "3.1", "3.2", "3.3"] + runs-on: ubuntu-latest + services: + postgres: + image: postgres:14.2 + env: + POSTGRES_DB: que-test + POSTGRES_USER: ubuntu + POSTGRES_PASSWORD: password + ports: + - 5432:5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 10 + lock_database: + image: postgres:14.2 + env: + POSTGRES_DB: lock-test + POSTGRES_USER: ubuntu + POSTGRES_PASSWORD: password + ports: + - 5434:5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 10 + env: + PGDATABASE: que-test + PGUSER: ubuntu + PGPASSWORD: password + PGHOST: localhost + BUNDLE_RUBYGEMS__PKG__GITHUB__COM: gocardless-robot-readonly:${{ secrets.GITHUB_TOKEN }} + LOCK_PGDATABASE: lock-test + LOCK_PGUSER: ubuntu + LOCK_PGPASSWORD: password + LOCK_PGHOST: localhost + ADAPTER: ActiveRecordWithLock + steps: + - uses: actions/checkout@v4 + - name: Set up Ruby + uses: ruby/setup-ruby@v1 + with: + bundler-cache: true + ruby-version: "${{ matrix.ruby-version }}" + - name: Run Specs With ActiveRecordWithLock Adapter + run: bundle exec rspec + diff --git a/lib/que.rb b/lib/que.rb index fbc935e3..78405ce1 100644 --- a/lib/que.rb +++ b/lib/que.rb @@ -63,6 +63,7 @@ def connection=(connection) Adapters::ActiveRecord.new else case connection.class.to_s + when "Que::Adapters::ActiveRecordWithLock" then connection when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection) when "ConnectionPool" then Adapters::ConnectionPool.new(connection) when "PG::Connection" then Adapters::PG.new(connection) diff --git a/lib/que/adapters/active_record_with_lock.rb b/lib/que/adapters/active_record_with_lock.rb new file mode 100644 index 00000000..2e43644d --- /dev/null +++ b/lib/que/adapters/active_record_with_lock.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true + +module Que + module Adapters + class ActiveRecordWithLock < Que::Adapters::ActiveRecord + def initialize(job_connection_pool:, lock_connection_pool:) + @job_connection_pool = job_connection_pool + @lock_connection_pool = lock_connection_pool + super + end + + def checkout_activerecord_adapter(&block) + checkout_lock_database_connection do + @job_connection_pool.with_connection(&block) + end + end + + def checkout_lock_database_connection(&block) + @lock_connection_pool.with_connection(&block) + end + + def execute(command, params = []) + case command + when :lock_job + queue, cursor = params + lock_job_with_lock_database(queue, cursor) + when :unlock_job + job_id = params[0] + unlock_job(job_id) + else + super + end + end + + def lock_job_with_lock_database(queue, cursor) + result = [] + loop do + result = Que.execute(:find_job_to_lock, [queue, cursor]) + + break if result.empty? + + cursor = result.first["job_id"] + break if pg_try_advisory_lock?(cursor) + end + result + end + + def pg_try_advisory_lock?(job_id) + checkout_lock_database_connection do |conn| + conn.execute( + "SELECT pg_try_advisory_lock(#{job_id})", + ).try(:first)&.fetch("pg_try_advisory_lock") + end + end + + def unlock_job(job_id) + # If for any reason the connection that is used to get this advisory lock + # is corrupted, the lock on this job_id would already be released when the + # connection holding the lock goes bad. + # Now, if a new connection tries to release the non existing lock this would just no op + # by returning false and return a warning "WARNING: you don't own a lock of type ExclusiveLock" + checkout_lock_database_connection do |conn| + conn.execute("SELECT pg_advisory_unlock(#{job_id})") + end + end + end + end +end diff --git a/lib/que/adapters/base.rb b/lib/que/adapters/base.rb index 1295e011..2a06412f 100644 --- a/lib/que/adapters/base.rb +++ b/lib/que/adapters/base.rb @@ -9,6 +9,7 @@ module Adapters autoload :PG, "que/adapters/pg" autoload :Pond, "que/adapters/pond" autoload :Sequel, "que/adapters/sequel" + autoload :ActiveRecordWithLock, "que/adapters/active_record_with_lock" class UnavailableConnection < StandardError; end diff --git a/lib/que/locker.rb b/lib/que/locker.rb index 43f3cc33..f93968ac 100644 --- a/lib/que/locker.rb +++ b/lib/que/locker.rb @@ -121,7 +121,7 @@ def with_locked_job ensure if job observe(UnlockTotal, UnlockSecondsTotal, worked_queue: job[:queue]) do - Que.execute("SELECT pg_advisory_unlock($1)", [job[:job_id]]) + Que.execute(:unlock_job, [job[:job_id]]) end end end diff --git a/lib/que/sql.rb b/lib/que/sql.rb index 9e4c0c92..20496d5a 100644 --- a/lib/que/sql.rb +++ b/lib/que/sql.rb @@ -162,6 +162,30 @@ module Que WHERE locktype = 'advisory' ) pg USING (job_id) }, + + unlock_job: %{ + SELECT pg_advisory_unlock($1) + }, + + find_job_to_lock: %{ + SELECT + queue, + priority, + run_at, + job_id, + job_class, + retryable, + args, + error_count, + extract(epoch from (now() - run_at)) as latency + FROM que_jobs + WHERE queue = $1::text + AND run_at <= now() + AND retryable = true + AND job_id >= $2 + ORDER BY priority, run_at, job_id + LIMIT 1 + }, } # rubocop:enable Style/MutableConstant end diff --git a/spec/active_record_with_lock_spec_helper.rb b/spec/active_record_with_lock_spec_helper.rb new file mode 100644 index 00000000..3c10fd0c --- /dev/null +++ b/spec/active_record_with_lock_spec_helper.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +class LockDatabaseRecord < ActiveRecord::Base + establish_connection( + adapter: "postgresql", + host: ENV.fetch("LOCK_PGHOST", "localhost"), + user: ENV.fetch("LOCK_PGUSER", "postgres"), + password: ENV.fetch("LOCK_PGPASSWORD", "password"), + database: ENV.fetch("LOCK_PGDATABASE", "lock-test"), + port: ENV.fetch("LOCK_PGPORT", 5434), + pool: 5, + ) +end + +class JobRecord < ActiveRecord::Base + establish_connection( + adapter: "postgresql", + host: ENV.fetch("PGHOST", "localhost"), + user: ENV.fetch("PGUSER", "ubuntu"), + password: ENV.fetch("PGPASSWORD", "password"), + database: ENV.fetch("PGDATABASE", "que-test"), + ) +end + +def active_record_with_lock_adapter_connection + Que::Adapters::ActiveRecordWithLock.new( + job_connection_pool: JobRecord.connection_pool, + lock_connection_pool: LockDatabaseRecord.connection_pool, + ) +end diff --git a/spec/lib/que/locker_spec.rb b/spec/lib/que/locker_spec.rb index 610c25db..5e70a8cd 100644 --- a/spec/lib/que/locker_spec.rb +++ b/spec/lib/que/locker_spec.rb @@ -34,7 +34,7 @@ def expect_to_work(job) with_locked_job do |actual_job| expect(actual_job[:job_id]).to eql(job[:job_id]) expect(Que).to receive(:execute). - with("SELECT pg_advisory_unlock($1)", [job[:job_id]]) + with(:unlock_job, [job[:job_id]]) # Destroy the job to simulate the behaviour of the queue, and allow our lock query # to discover new jobs. diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 3e45ff11..1a096153 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -13,6 +13,7 @@ require_relative "helpers/sleep_job" require_relative "helpers/interruptible_sleep_job" require_relative "helpers/user" +require_relative "active_record_with_lock_spec_helper" def postgres_now ActiveRecord::Base.connection.execute("SELECT NOW();")[0]["now"] @@ -22,8 +23,8 @@ def establish_database_connection ActiveRecord::Base.establish_connection( adapter: "postgresql", host: ENV.fetch("PGHOST", "localhost"), - user: ENV.fetch("PGUSER", "postgres"), - password: ENV.fetch("PGPASSWORD", ""), + user: ENV.fetch("PGUSER", "ubuntu"), + password: ENV.fetch("PGPASSWORD", "password"), database: ENV.fetch("PGDATABASE", "que-test"), ) end @@ -31,7 +32,12 @@ def establish_database_connection establish_database_connection # Make sure our test database is prepared to run Que -Que.connection = ActiveRecord +Que.connection = + case ENV["ADAPTER"] + when "ActiveRecordWithLock" then active_record_with_lock_adapter_connection + else ActiveRecord + end + Que.migrate! # Ensure we have a logger, so that we can test the code paths that log