Skip to content

Commit

Permalink
Fix mysql deadlocks while reserving jobs
Browse files Browse the repository at this point in the history
- We frequently see deadlock errors when using mysql and a large number
  of workers
- This PR adds a fix for the deadlocks as well as modifying a test to
  exposes the deadlock issue in the previous implementation
- This implementation is based on the `reserve` implementation from the
  default active_record backend:
  https://github.com/collectiveidea/delayed_job_active_record/blob/42a68ed79917e14244a25a0a77fef0924bef3e97/lib/delayed/backend/active_record.rb#L64
- You can see the original error by pulling in the new test and running:
  `DEBUG=true DB=mysql bundle exec rspec spec/delayed/backend/sequel_spec.rb:27`
  • Loading branch information
ljfranklin committed Aug 2, 2017
1 parent 9ce5c2f commit 3a90c72
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
25 changes: 24 additions & 1 deletion lib/delayed/backend/sequel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,28 @@ def self.clear_locks!(worker_name)
filter(:locked_by => worker_name).update(:locked_by => nil, :locked_at => nil)
end

# adapted from
# https://github.com/collectiveidea/delayed_job_active_record/blob/master/lib/delayed/backend/active_record.rb
def self.reserve(worker, max_run_time = Worker.max_run_time)
ds = ready_to_run(worker.name, max_run_time)

ds = ds.filter(::Sequel.lit("priority >= ?", Worker.min_priority)) if Worker.min_priority
ds = ds.filter(::Sequel.lit("priority <= ?", Worker.max_priority)) if Worker.max_priority
ds = ds.filter(:queue => Worker.queues) if Worker.queues.any?
ds = ds.by_priority
ds = ds.for_update

case ::Sequel::Model.db.database_type
when :mysql
lock_with_read_ahead(ds, worker)
else
lock_with_for_update(ds, worker)
end
end

# Lock a single job using SELECT ... FOR UPDATE.
# More performant but may cause deadlocks in some databases.
def self.lock_with_for_update(ds, worker)
ds = ds.for_update
db.transaction do
if job = ds.first
job.locked_at = self.db_time_now
Expand All @@ -62,6 +76,15 @@ def self.reserve(worker, max_run_time = Worker.max_run_time)
end
end

# Fetch up-to `worker.read_ahead` jobs, try to lock one at a time.
# This query is more conservative as it does not acquire any DB read locks.
def self.lock_with_read_ahead(ds, worker)
ds.limit(worker.read_ahead).detect do |job|
count = ds.where(id: job.id).update(locked_at: self.db_time_now, locked_by: worker.name)
count == 1 && job.reload
end
end

# Get the current time (GMT or local depending on DB)
# Note: This does not ping the DB to get the time, so all your clients
# must have syncronized clocks.
Expand Down
21 changes: 17 additions & 4 deletions spec/delayed/backend/sequel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,33 @@

it "does not allow more than 1 worker to grab the same job" do
expect do
10.times do
jobs_to_run = 200
workers_to_run = 20
jobs_per_worker = jobs_to_run/workers_to_run

jobs_to_run.times do
described_class.create(payload_object: SimpleJob.new)
end

20.times.map do |i|
workers_to_run.times.map do |i|
Thread.new do
worker = Delayed::Worker.new
worker.name = "worker_#{i}"
worker.work_off(4)

# Ensure each worker performs the expected number of jobs as
# `work_off` will ocassionally perform less than the requested number
# if it is unable to lock a job within the `worker.read_ahead` limit
jobs_completed_by_this_worker = 0
while jobs_completed_by_this_worker < jobs_per_worker do
successes, failures = worker.work_off(jobs_per_worker - jobs_completed_by_this_worker)
expect(failures).to eq(0), "Expected zero failures, got #{failures}"
jobs_completed_by_this_worker += successes
end
end
end.map(&:join)
end.not_to raise_error

expect(Delayed::Job.count).to be < 10
expect(Delayed::Job.count).to eql 0
end

context ".count" do
Expand Down

0 comments on commit 3a90c72

Please sign in to comment.