Fix mysql deadlocks while reserving jobs
- We frequently see deadlock errors when using mysql and a large number
  of workers (see the sketch after this list for the likely failure mode)
- This PR adds a fix for the deadlocks and modifies a test to expose the
  deadlock issue in the previous implementation
- This implementation is based on the `reserve` implementation from the
  default active_record backend:
  https://github.com/collectiveidea/delayed_job_active_record/blob/42a68ed79917e14244a25a0a77fef0924bef3e97/lib/delayed/backend/active_record.rb#L64
- You can see the original error by pulling in the new test and running:
  `DEBUG=true DB=mysql bundle exec rspec spec/delayed/backend/sequel_spec.rb:27`
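For background, an editorial sketch (not part of the commit) contrasting the two reservation strategies. It assumes a `delayed_jobs` table with `id`, `priority`, `locked_at`, and `locked_by` columns; the function names are hypothetical, and the simplified filters stand in for the backend's `ready_to_run` scope. The likely deadlock mechanism: under InnoDB, a `SELECT ... ORDER BY ... LIMIT ... FOR UPDATE` can lock every index row the scan touches, not just the row it returns, so many workers holding and waiting on overlapping locks can form cycles that MySQL resolves by aborting one transaction. The fixed version never holds a lock across statements.

```ruby
require "sequel"

DB = Sequel.connect(ENV.fetch("DATABASE_URL")) # e.g. "mysql2://user@host/db"

# Before (pessimistic): select a candidate with FOR UPDATE inside a
# transaction, then update it while still holding the row/index locks.
def reserve_pessimistic(worker_name)
  DB.transaction do
    if job = DB[:delayed_jobs].where(locked_at: nil).order(:priority).for_update.first
      DB[:delayed_jobs].where(id: job[:id])
        .update(locked_at: Time.now.utc, locked_by: worker_name)
      job
    end
  end
end

# After (optimistic): no lock is held across statements. The UPDATE itself
# is an atomic test-and-set; 0 affected rows means another worker won the
# race, so move on to the next candidate, trying up to `read_ahead` of them.
def reserve_optimistic(worker_name, read_ahead = 5)
  DB[:delayed_jobs].where(locked_at: nil).order(:priority).limit(read_ahead).detect do |job|
    DB[:delayed_jobs].where(id: job[:id], locked_at: nil)
      .update(locked_at: Time.now.utc, locked_by: worker_name) == 1
  end
end
```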
ljfranklin committed Jul 26, 2017
1 parent 9ce5c2f commit 4fa95cb
Showing 2 changed files with 27 additions and 13 deletions.
18 changes: 9 additions & 9 deletions lib/delayed/backend/sequel.rb
```diff
@@ -44,21 +44,21 @@ def self.clear_locks!(worker_name)
       filter(:locked_by => worker_name).update(:locked_by => nil, :locked_at => nil)
     end
 
+    # adapted from
+    # https://github.com/collectiveidea/delayed_job_active_record/blob/master/lib/delayed/backend/active_record.rb
     def self.reserve(worker, max_run_time = Worker.max_run_time)
       ds = ready_to_run(worker.name, max_run_time)
 
       ds = ds.filter(::Sequel.lit("priority >= ?", Worker.min_priority)) if Worker.min_priority
       ds = ds.filter(::Sequel.lit("priority <= ?", Worker.max_priority)) if Worker.max_priority
       ds = ds.filter(:queue => Worker.queues) if Worker.queues.any?
       ds = ds.by_priority
-      ds = ds.for_update
 
-      db.transaction do
-        if job = ds.first
-          job.locked_at = self.db_time_now
-          job.locked_by = worker.name
-          job.save(:raise_on_failure => true)
-          job
-        end
-      end
+      # some initially available jobs may have since been locked by another worker,
+      # so try up to `worker.read_ahead` jobs before giving up
+      ds.limit(worker.read_ahead).detect do |job|
+        count = ds.where(id: job.id).update(locked_at: self.db_time_now, locked_by: worker.name)
+        count == 1 && job.reload
+      end
     end
```
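As a usage note on the new `reserve` (editorial): the candidate set it walks is shaped by standard delayed_job worker settings, so the number of rows tried per reservation attempt is tunable. A minimal configuration sketch, with example values:

```ruby
require "delayed_job"

# Knobs consulted by `reserve` above (values here are only examples):
Delayed::Worker.read_ahead = 10       # candidates to try before giving up (default 5)
Delayed::Worker.queues = %w[default]  # restricts the `queue` filter
Delayed::Worker.min_priority = 0      # lower bound on `priority`
Delayed::Worker.max_priority = 10     # upper bound on `priority`
```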
22 changes: 18 additions & 4 deletions spec/delayed/backend/sequel_spec.rb
```diff
@@ -14,20 +14,34 @@
 
   it "does not allow more than 1 worker to grab the same job" do
     expect do
-      10.times do
+      jobs_to_run = 200
+      workers_to_run = 20
+      jobs_per_worker = jobs_to_run/workers_to_run
+
+      jobs_to_run.times do
         described_class.create(payload_object: SimpleJob.new)
       end
 
-      20.times.map do |i|
+      Delayed::Backend::Sequel::Job.before_fork
+      workers_to_run.times.map do |i|
         Thread.new do
           worker = Delayed::Worker.new
           worker.name = "worker_#{i}"
-          worker.work_off(4)
+
+          # Ensure each worker performs the expected number of jobs, as
+          # `work_off` will occasionally perform fewer than the requested number
+          # if it is unable to lock a job within the `worker.read_ahead` limit
+          jobs_completed_by_this_worker = 0
+          while jobs_completed_by_this_worker < jobs_per_worker do
+            successes, failures = worker.work_off(jobs_per_worker - jobs_completed_by_this_worker)
+            expect(failures).to eq(0), "Expected zero failures, got #{failures}"
+            jobs_completed_by_this_worker += successes
+          end
         end
       end.map(&:join)
     end.not_to raise_error
 
-    expect(Delayed::Job.count).to be < 10
+    expect(Delayed::Job.count).to eql 0
   end
 
   context ".count" do
```
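One clarification on the spec's retry loop (editorial): `Delayed::Worker#work_off(num)` attempts up to `num` jobs and returns a pair of success and failure counts, and it can legitimately process fewer than requested when it cannot lock a candidate, so the test keeps calling it until each worker has met its quota. Roughly:

```ruby
worker = Delayed::Worker.new
worker.name = "worker_0"

# Returns [successes, failures]; may run fewer than 10 jobs if it cannot
# lock a candidate among the rows it reads ahead.
successes, failures = worker.work_off(10)
puts "ran #{successes} jobs, #{failures} failures"
```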
