Skip to content

Commit

Permalink
Merge pull request #103 from gocardless/add-lock-databse
Browse files Browse the repository at this point in the history
Add a new adapter to work the job with multiple database
  • Loading branch information
ankithads authored Aug 14, 2024
2 parents 8519451 + 99061e9 commit 851b629
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 10 deletions.
61 changes: 56 additions & 5 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: tests
on:
push:

jobs:
jobs:
rubocop:
runs-on: ubuntu-latest
env:
Expand All @@ -24,7 +24,6 @@ jobs:
fail-fast: false
matrix:
ruby_version: ["3.0", "3.1", "3.2", "3.3"]

runs-on: ubuntu-latest
services:
postgres:
Expand All @@ -40,7 +39,6 @@ jobs:
--health-interval 10s
--health-timeout 5s
--health-retries 10
env:
PGDATABASE: que-test
PGUSER: ubuntu
Expand All @@ -63,7 +61,6 @@ jobs:
fail-fast: false
matrix:
ruby_version: ["3.0", "3.1", "3.2", "3.3"]

runs-on: ubuntu-latest
services:
postgres:
Expand All @@ -79,7 +76,6 @@ jobs:
--health-interval 10s
--health-timeout 5s
--health-retries 10
env:
PGDATABASE: que-test
PGUSER: ubuntu
Expand All @@ -96,3 +92,58 @@ jobs:
- name: Run specs
run: |
bundle exec rspec
active_record_with_lock_adapter_rspec:
strategy:
fail-fast: false
matrix:
ruby_version: ["3.0", "3.1", "3.2", "3.3"]
runs-on: ubuntu-latest
services:
postgres:
image: postgres:14.2
env:
POSTGRES_DB: que-test
POSTGRES_USER: ubuntu
POSTGRES_PASSWORD: password
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 10
lock_database:
image: postgres:14.2
env:
POSTGRES_DB: lock-test
POSTGRES_USER: ubuntu
POSTGRES_PASSWORD: password
ports:
- 5434:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 10
env:
PGDATABASE: que-test
PGUSER: ubuntu
PGPASSWORD: password
PGHOST: localhost
BUNDLE_RUBYGEMS__PKG__GITHUB__COM: gocardless-robot-readonly:${{ secrets.GITHUB_TOKEN }}
LOCK_PGDATABASE: lock-test
LOCK_PGUSER: ubuntu
LOCK_PGPASSWORD: password
LOCK_PGHOST: localhost
ADAPTER: ActiveRecordWithLock
steps:
- uses: actions/checkout@v4
- name: Set up Ruby
uses: ruby/setup-ruby@v1
with:
bundler-cache: true
ruby-version: "${{ matrix.ruby-version }}"
- name: Run Specs With ActiveRecordWithLock Adapter
run: bundle exec rspec

1 change: 1 addition & 0 deletions lib/que.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def connection=(connection)
Adapters::ActiveRecord.new
else
case connection.class.to_s
when "Que::Adapters::ActiveRecordWithLock" then connection
when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection)
when "ConnectionPool" then Adapters::ConnectionPool.new(connection)
when "PG::Connection" then Adapters::PG.new(connection)
Expand Down
68 changes: 68 additions & 0 deletions lib/que/adapters/active_record_with_lock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# frozen_string_literal: true

module Que
module Adapters
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
def initialize(job_connection_pool:, lock_connection_pool:)
@job_connection_pool = job_connection_pool
@lock_connection_pool = lock_connection_pool
super
end

def checkout_activerecord_adapter(&block)
checkout_lock_database_connection do
@job_connection_pool.with_connection(&block)
end
end

def checkout_lock_database_connection(&block)
@lock_connection_pool.with_connection(&block)
end

def execute(command, params = [])
case command
when :lock_job
queue, cursor = params
lock_job_with_lock_database(queue, cursor)
when :unlock_job
job_id = params[0]
unlock_job(job_id)
else
super
end
end

def lock_job_with_lock_database(queue, cursor)
result = []
loop do
result = Que.execute(:find_job_to_lock, [queue, cursor])

break if result.empty?

cursor = result.first["job_id"]
break if pg_try_advisory_lock?(cursor)
end
result
end

def pg_try_advisory_lock?(job_id)
checkout_lock_database_connection do |conn|
conn.execute(
"SELECT pg_try_advisory_lock(#{job_id})",
).try(:first)&.fetch("pg_try_advisory_lock")
end
end

def unlock_job(job_id)
# If for any reason the connection that is used to get this advisory lock
# is corrupted, the lock on this job_id would already be released when the
# connection holding the lock goes bad.
# Now, if a new connection tries to release the non existing lock this would just no op
# by returning false and return a warning "WARNING: you don't own a lock of type ExclusiveLock"
checkout_lock_database_connection do |conn|
conn.execute("SELECT pg_advisory_unlock(#{job_id})")
end
end
end
end
end
1 change: 1 addition & 0 deletions lib/que/adapters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Adapters
autoload :PG, "que/adapters/pg"
autoload :Pond, "que/adapters/pond"
autoload :Sequel, "que/adapters/sequel"
autoload :ActiveRecordWithLock, "que/adapters/active_record_with_lock"

class UnavailableConnection < StandardError; end

Expand Down
2 changes: 1 addition & 1 deletion lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def with_locked_job
ensure
if job
observe(UnlockTotal, UnlockSecondsTotal, worked_queue: job[:queue]) do
Que.execute("SELECT pg_advisory_unlock($1)", [job[:job_id]])
Que.execute(:unlock_job, [job[:job_id]])
end
end
end
Expand Down
24 changes: 24 additions & 0 deletions lib/que/sql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,30 @@ module Que
WHERE locktype = 'advisory'
) pg USING (job_id)
},

unlock_job: %{
SELECT pg_advisory_unlock($1)
},

find_job_to_lock: %{
SELECT
queue,
priority,
run_at,
job_id,
job_class,
retryable,
args,
error_count,
extract(epoch from (now() - run_at)) as latency
FROM que_jobs
WHERE queue = $1::text
AND run_at <= now()
AND retryable = true
AND job_id >= $2
ORDER BY priority, run_at, job_id
LIMIT 1
},
}
# rubocop:enable Style/MutableConstant
end
30 changes: 30 additions & 0 deletions spec/active_record_with_lock_spec_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

class LockDatabaseRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("LOCK_PGHOST", "localhost"),
user: ENV.fetch("LOCK_PGUSER", "postgres"),
password: ENV.fetch("LOCK_PGPASSWORD", "password"),
database: ENV.fetch("LOCK_PGDATABASE", "lock-test"),
port: ENV.fetch("LOCK_PGPORT", 5434),
pool: 5,
)
end

class JobRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "ubuntu"),
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end

def active_record_with_lock_adapter_connection
Que::Adapters::ActiveRecordWithLock.new(
job_connection_pool: JobRecord.connection_pool,
lock_connection_pool: LockDatabaseRecord.connection_pool,
)
end
2 changes: 1 addition & 1 deletion spec/lib/que/locker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def expect_to_work(job)
with_locked_job do |actual_job|
expect(actual_job[:job_id]).to eql(job[:job_id])
expect(Que).to receive(:execute).
with("SELECT pg_advisory_unlock($1)", [job[:job_id]])
with(:unlock_job, [job[:job_id]])

# Destroy the job to simulate the behaviour of the queue, and allow our lock query
# to discover new jobs.
Expand Down
12 changes: 9 additions & 3 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
require_relative "helpers/sleep_job"
require_relative "helpers/interruptible_sleep_job"
require_relative "helpers/user"
require_relative "active_record_with_lock_spec_helper"

def postgres_now
ActiveRecord::Base.connection.execute("SELECT NOW();")[0]["now"]
Expand All @@ -22,16 +23,21 @@ def establish_database_connection
ActiveRecord::Base.establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "postgres"),
password: ENV.fetch("PGPASSWORD", ""),
user: ENV.fetch("PGUSER", "ubuntu"),
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end

establish_database_connection

# Make sure our test database is prepared to run Que
Que.connection = ActiveRecord
Que.connection =
case ENV["ADAPTER"]
when "ActiveRecordWithLock" then active_record_with_lock_adapter_connection
else ActiveRecord
end

Que.migrate!

# Ensure we have a logger, so that we can test the code paths that log
Expand Down

0 comments on commit 851b629

Please sign in to comment.