From bfb8d8338d45b992c9d5d1376bca2ac7b82a8cbe Mon Sep 17 00:00:00 2001 From: Ankitha Damodara Date: Fri, 5 Jul 2024 10:35:21 +0100 Subject: [PATCH] fix formatting --- lib/que/adapters/active_record_with_lock.rb | 116 ++++++++++---------- lib/que/locker.rb | 2 +- spec/lib/que/locker_spec.rb | 7 +- spec/lib/que/worker_spec.rb | 4 +- 4 files changed, 62 insertions(+), 67 deletions(-) diff --git a/lib/que/adapters/active_record_with_lock.rb b/lib/que/adapters/active_record_with_lock.rb index 0f43887..dbd6293 100644 --- a/lib/que/adapters/active_record_with_lock.rb +++ b/lib/que/adapters/active_record_with_lock.rb @@ -2,73 +2,73 @@ # https://github.com/que-rb/que/blob/80d6067861a41766c3adb7e29b230ce93d94c8a4/lib/que/active_job/extensions.rb module Que - module Adapters - class ActiveRecordWithLock < Que::Adapters::ActiveRecord - attr_accessor :job_connection_pool, :lock_record - def initialize(job_connection_pool:, lock_record:) - @job_connection_pool = job_connection_pool - @lock_record = lock_record - super - end + module Adapters + class ActiveRecordWithLock < Que::Adapters::ActiveRecord + def initialize(job_connection_pool:, lock_record:) + @job_connection_pool = job_connection_pool + @lock_record = lock_record + super + end - def checkout_activerecord_adapter(&block) - @job_connection_pool.with_connection(&block) - end + def checkout_activerecord_adapter(&block) + @job_connection_pool.with_connection(&block) + end - def lock_database_connection - if Thread.current[:db_connection] - return Thread.current[:db_connection] if Thread.current[:db_connection].active? - end - # We are storing this in thread variable here to make sure - # same connection is used to acquire and release the advisory locks. - # Advisory lock will not be released if any other connection from the - # pool tries to release the lock - Thread.current[:db_connection] = @lock_record.connection + def lock_database_connection + if Thread.current[:db_connection] + return Thread.current[:db_connection] if Thread.current[:db_connection].active? end + # We are storing this in thread variable here to make sure + # same connection is used to acquire and release the advisory locks. + # Advisory lock will not be released if any other connection from the + # pool tries to release the lock + Thread.current[:db_connection] = @lock_record.connection + end - def execute(command, params=[]) - case command - when :lock_job then - queue, cursor = params - lock_job_with_lock_database(queue, cursor) - when :unlock_job then - job_id = params[0] - unlock_job(job_id) - else - super(command, params) - end + def execute(command, params = []) + case command + when :lock_job + queue, cursor = params + lock_job_with_lock_database(queue, cursor) + when :unlock_job + job_id = params[0] + unlock_job(job_id) + else + super(command, params) end + end - def lock_job_with_lock_database(queue, cursor) - result = [] - loop do - result = Que.execute(:find_job_to_lock, [queue, cursor]) - break if result.empty? - cursor = result.first['job_id'] - if pg_try_advisory_lock?(cursor) - break - end - end - return result - end + def lock_job_with_lock_database(queue, cursor) + result = [] + loop do + result = Que.execute(:find_job_to_lock, [queue, cursor]) + break if result.empty? - def cleanup! - @job_connection_pool.release_connection - @lock_record.remove_connection + cursor = result.first["job_id"] + break if pg_try_advisory_lock?(cursor) end + result + end - def pg_try_advisory_lock?(job_id) - lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").try(:first)&.fetch('pg_try_advisory_lock') - end + def cleanup! + @job_connection_pool.release_connection + @lock_record.remove_connection + end - def unlock_job(job_id) - # If for any reason the connection that is used to get this advisory lock - # is corrupted, the lock on this job_id would already be released when the - # connection holding the lock goes bad. - # Now, if a new connection tries to release the non existing lock this would just no op - # by returning false and return a warning "WARNING: you don't own a lock of type ExclusiveLock" - lock_database_connection.execute("SELECT pg_advisory_unlock(#{job_id})") - end + def pg_try_advisory_lock?(job_id) + lock_database_connection.execute( + "SELECT pg_try_advisory_lock(#{job_id})", + ).try(:first)&.fetch("pg_try_advisory_lock") + end + + def unlock_job(job_id) + # If for any reason the connection that is used to get this advisory lock + # is corrupted, the lock on this job_id would already be released when the + # connection holding the lock goes bad. + # Now, if a new connection tries to release the non existing lock this would just no op + # by returning false and return a warning "WARNING: you don't own a lock of type ExclusiveLock" + lock_database_connection.execute("SELECT pg_advisory_unlock(#{job_id})") end end -end \ No newline at end of file + end +end diff --git a/lib/que/locker.rb b/lib/que/locker.rb index 27e2f90..442a0b7 100644 --- a/lib/que/locker.rb +++ b/lib/que/locker.rb @@ -153,7 +153,7 @@ def lock_job_query(queue, cursor) def handle_expired_cursors! @consolidated_queues.each do |queue| - queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now) + queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now) reset_cursor_for!(queue) if queue_cursor_expires_at < monotonic_now end end diff --git a/spec/lib/que/locker_spec.rb b/spec/lib/que/locker_spec.rb index afaa665..5e70a8c 100644 --- a/spec/lib/que/locker_spec.rb +++ b/spec/lib/que/locker_spec.rb @@ -13,8 +13,6 @@ let(:queue) { "default" } let(:cursor_expiry) { 0 } - - describe ".with_locked_job" do before { allow(Que).to receive(:execute).and_call_original } @@ -60,9 +58,6 @@ def expect_to_lock_with(cursor:) end context "with just one job to lock" do - before do - described_class.instance_variable_set(:@queue_cursors, [0]) - end let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1).attrs } let(:cursor_expiry) { 60 } @@ -124,7 +119,7 @@ def expect_to_lock_with(cursor:) expect_to_lock_with(cursor: job_1[:job_id]) expect_to_work(job_2) - @epoch += (cursor_expiry) # our cursor should now expire + @epoch += cursor_expiry # our cursor should now expire expect_to_lock_with(cursor: 0) expect_to_work(job_3) end diff --git a/spec/lib/que/worker_spec.rb b/spec/lib/que/worker_spec.rb index da2ce1b..400ec0f 100644 --- a/spec/lib/que/worker_spec.rb +++ b/spec/lib/que/worker_spec.rb @@ -221,7 +221,7 @@ FakeJob.enqueue(1) expect(Que). - to receive(:execute).with(:lock_job, include("default", 0)). + to receive(:execute).with(:lock_job, ["default", 0]). and_raise(ActiveRecord::ConnectionTimeoutError) expect(work).to eq(:postgres_error) end @@ -232,7 +232,7 @@ FakeJob.enqueue(1) expect(Que). - to receive(:execute).with(:lock_job, include("default", 0)). + to receive(:execute).with(:lock_job, ["default", 0]). and_raise(ActiveRecord::ConnectionNotEstablished) expect(work).to eq(:postgres_error) end