diff --git a/lib/que/adapters/active_record_with_lock.rb b/lib/que/adapters/active_record_with_lock.rb index 7dbd41b..f23ad21 100644 --- a/lib/que/adapters/active_record_with_lock.rb +++ b/lib/que/adapters/active_record_with_lock.rb @@ -3,18 +3,19 @@ module Que module Adapters class ActiveRecordWithLock < Que::Adapters::ActiveRecord - class NoLockableJobs < StandardError; end - FindJobSecondsTotal = Prometheus::Client::Counter.new( - :que_find_job_seconds_total, - docstring: "Seconds spent finding a job", - labels: %i[queue], - ) + METRICS = [ + FindJobSecondsTotal = Prometheus::Client::Counter.new( + :que_find_job_seconds_total, + docstring: "Seconds spent finding a job", + labels: %i[queue], + ), - FindJobHitTotal = Prometheus::Client::Counter.new( - :que_find_job_total, - docstring: "total number of job hit and misses when acquiring a lock", - labels: %i[queue job_hit], - ) + FindJobHitTotal = Prometheus::Client::Counter.new( + :que_find_job_hit_total, + docstring: "total number of job hit and misses when acquiring a lock", + labels: %i[queue job_hit], + ), + ].freeze def initialize(job_connection_pool:, lock_connection_pool:) @job_connection_pool = job_connection_pool @@ -50,9 +51,8 @@ def execute(command, params = []) def lock_job_with_lock_database(queue, cursor) loop do observe(duration_metric: FindJobSecondsTotal, labels: { queue: queue }) do - locked_job = Que.transaction do + Que.transaction do job_to_lock = Que.execute(:find_job_to_lock, [queue, cursor]) - return job_to_lock if job_to_lock.empty? cursor = job_to_lock.first["job_id"] @@ -61,7 +61,6 @@ def lock_job_with_lock_database(queue, cursor) observe(count_metric: FindJobHitTotal, labels: { queue: queue, job_hit: job_locked }) return job_to_lock if job_locked end - return locked_job if locked_job end end end diff --git a/lib/que/middleware/worker_collector.rb b/lib/que/middleware/worker_collector.rb index 9d498b9..c90fb70 100644 --- a/lib/que/middleware/worker_collector.rb +++ b/lib/que/middleware/worker_collector.rb @@ -18,6 +18,7 @@ def initialize(app, options = {}) register(*WorkerGroup::METRICS) register(*Worker::METRICS) register(*Locker::METRICS) + register(*Adapters::ActiveRecordWithLock::METRICS) end def call(env) diff --git a/spec/active_record_with_lock_spec_helper.rb b/spec/active_record_with_lock_spec_helper.rb index 3c10fd0..66fdcd4 100644 --- a/spec/active_record_with_lock_spec_helper.rb +++ b/spec/active_record_with_lock_spec_helper.rb @@ -28,3 +28,4 @@ def active_record_with_lock_adapter_connection lock_connection_pool: LockDatabaseRecord.connection_pool, ) end + diff --git a/spec/lib/que/adapters/active_record_with_lock_spec.rb b/spec/lib/que/adapters/active_record_with_lock_spec.rb index 73607e9..92a169a 100644 --- a/spec/lib/que/adapters/active_record_with_lock_spec.rb +++ b/spec/lib/que/adapters/active_record_with_lock_spec.rb @@ -2,10 +2,10 @@ require "spec_helper" -RSpec.describe Que::Adapters::ActiveRecordWithLock do +RSpec.describe Que::Adapters::ActiveRecordWithLock, :active_record_with_lock do subject(:adapter) do described_class.new(job_connection_pool: JobRecord.connection_pool, - lock_connection_pool: LockDatabaseRecord.connection_pool) + lock_connection_pool: lock_connection_pool) end around do |example| @@ -21,6 +21,15 @@ end end + let(:lock_connection_pool) do + return LockDatabaseRecord.connection_pool if ENV["ADAPTER"] == "ActiveRecordWithLock" + + # For CI run default adapter have only 1 databse defined. + # Since we are not using any filters to run this spec + # We need to point job and lock connections to the same database + JobRecord.connection_pool + end + before do described_class::FindJobHitTotal.values.each { |labels, _| labels.clear } end @@ -33,7 +42,9 @@ end it "sets correct metric values" do + expect(QueJob.count).to eq(10) with_workers(5) { wait_for_jobs_to_be_worked } + expect(QueJob.count).to eq(0) expect(described_class::FindJobHitTotal.values[{ :queue => "default", :job_hit => "true" }]).to eq(10.0) end end