Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce poll-interval-variance option #431

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ group :test do

gem 'pry'
gem 'pg_examiner', '~> 0.5.2'

gem 'timecop', '~> 0.9.10'
end

gemspec
29 changes: 20 additions & 9 deletions lib/que/command_line_interface.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ def parse(
default_require_file: RAILS_ENVIRONMENT_FILE
)

options = {}
queues = []
log_level = 'info'
log_internals = false
poll_interval = 5
connection_url = nil
worker_count = nil
worker_priorities = nil
options = {}
queues = []
log_level = 'info'
log_internals = false
poll_interval = 5
poll_interval_variance = 0
connection_url = nil
worker_count = nil
worker_priorities = nil

parser =
OptionParser.new do |opts|
Expand All @@ -50,6 +51,15 @@ def parse(
poll_interval = i
end

opts.on(
'-j',
'--poll-interval-variance [INTERVAL]',
Float,
"Set maximum variance in poll interval, in seconds (default: 0)",
) do |j|
poll_interval_variance = j.to_f
end

opts.on(
'--listen [LISTEN]',
String,
Expand Down Expand Up @@ -232,7 +242,8 @@ def parse(
options[:queues] = queues_hash
end

options[:poll_interval] = poll_interval
options[:poll_interval] = poll_interval
options[:poll_interval_variance] = poll_interval_variance

locker =
begin
Expand Down
56 changes: 31 additions & 25 deletions lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class << self
}

class Locker
attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval
attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval, :poll_interval_variance

MESSAGE_RESOLVERS = {}
RESULT_RESOLVERS = {}
Expand All @@ -47,22 +47,24 @@ class Locker
RESULT_RESOLVERS[:job_finished] =
-> (messages) { finish_jobs(messages.map{|m| m.fetch(:metajob)}) }

DEFAULT_POLL_INTERVAL = 5.0
DEFAULT_WAIT_PERIOD = 50
DEFAULT_MAXIMUM_BUFFER_SIZE = 8
DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze
DEFAULT_POLL_INTERVAL = 5.0
DEFAULT_POLL_INTERVAL_VARIANCE = 0.0
DEFAULT_WAIT_PERIOD = 50
DEFAULT_MAXIMUM_BUFFER_SIZE = 8
DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze

def initialize(
queues: [Que.default_queue],
connection_url: nil,
listen: true,
poll: true,
poll_interval: DEFAULT_POLL_INTERVAL,
wait_period: DEFAULT_WAIT_PERIOD,
maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
worker_priorities: DEFAULT_WORKER_PRIORITIES,
on_worker_start: nil,
pidfile: nil
queues: [Que.default_queue],
connection_url: nil,
listen: true,
poll: true,
poll_interval: DEFAULT_POLL_INTERVAL,
poll_interval_variance: DEFAULT_POLL_INTERVAL_VARIANCE,
wait_period: DEFAULT_WAIT_PERIOD,
maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
worker_priorities: DEFAULT_WORKER_PRIORITIES,
on_worker_start: nil,
pidfile: nil
)

# Sanity-check all our arguments, since some users may instantiate Locker
Expand All @@ -71,6 +73,7 @@ def initialize(
Que.assert [TrueClass, FalseClass], poll

Que.assert Numeric, poll_interval
Que.assert Numeric, poll_interval_variance
Que.assert Numeric, wait_period

Que.assert Array, worker_priorities
Expand All @@ -94,20 +97,22 @@ def initialize(

Que.internal_log :locker_instantiate, self do
{
queues: queues,
listen: listen,
poll: poll,
poll_interval: poll_interval,
wait_period: wait_period,
maximum_buffer_size: maximum_buffer_size,
worker_priorities: worker_priorities,
queues: queues,
listen: listen,
poll: poll,
poll_interval: poll_interval,
poll_interval_variance: poll_interval_variance,
wait_period: wait_period,
maximum_buffer_size: maximum_buffer_size,
worker_priorities: worker_priorities,
}
end

# Local cache of which advisory locks are held by this connection.
@locks = Set.new

@poll_interval = poll_interval
@poll_interval_variance = poll_interval_variance

if queues.is_a?(Hash)
@queue_names = queues.keys
Expand Down Expand Up @@ -204,9 +209,10 @@ def initialize(
if poll
@queues.map do |queue_name, interval|
Poller.new(
connection: @connection,
queue: queue_name,
poll_interval: interval,
connection: @connection,
queue: queue_name,
poll_interval: interval,
poll_interval_variance: poll_interval_variance,
)
end
end
Expand Down
50 changes: 31 additions & 19 deletions lib/que/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,25 +116,32 @@ class Poller
:connection,
:queue,
:poll_interval,
:poll_interval_variance,
:last_polled_at,
:last_poll_satisfied
:last_poll_satisfied,
:next_poll_at

def initialize(
connection:,
queue:,
poll_interval:
poll_interval:,
poll_interval_variance:
)
@connection = connection
@queue = queue
@poll_interval = poll_interval
@connection = connection
@queue = queue
@poll_interval = poll_interval
@poll_interval_variance = poll_interval_variance

@last_polled_at = nil
@last_poll_satisfied = nil
@next_poll_at = Time.now

Que.internal_log :poller_instantiate, self do
{
backend_pid: connection.backend_pid,
queue: queue,
poll_interval: poll_interval,
backend_pid: connection.backend_pid,
queue: queue,
poll_interval: poll_interval,
poll_interval_variance: poll_interval_variance,
}
end
end
Expand All @@ -158,31 +165,36 @@ def poll(

@last_polled_at = Time.now
@last_poll_satisfied = poll_satisfied?(priorities, jobs)
@next_poll_at = last_polled_at +
poll_interval +
rand(-poll_interval_variance..poll_interval_variance)

Que.internal_log :poller_polled, self do
{
queue: @queue,
locked: jobs.count,
priorities: priorities,
held_locks: held_locks.to_a,
newly_locked: jobs.map { |key| key.fetch(:id) },
queue: @queue,
locked: jobs.count,
priorities: priorities,
held_locks: held_locks.to_a,
newly_locked: jobs.map { |key| key.fetch(:id) },
last_polled_at: last_polled_at,
last_poll_satisfied: last_poll_satisfied,
next_poll_at: next_poll_at,
}
end

jobs.map! { |job| Metajob.new(job) }
end

def should_poll?
# polling is disabled for this queue
return false if poll_interval.nil?

# Never polled before?
last_poll_satisfied.nil? ||
# Plenty of jobs were available last time?
last_poll_satisfied == true ||
poll_interval_elapsed?
end

def poll_interval_elapsed?
return unless interval = poll_interval
(Time.now - last_polled_at) > interval
# It's due time to poll again regardless of the last poll's results?
next_poll_at < Time.now
end

class << self
Expand Down
1 change: 1 addition & 0 deletions spec/gemfiles/Gemfile-rails-6.0
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ group :test do
gem 'minitest-hooks', '1.4.0'
gem 'pry'
gem 'pg_examiner', '~> 0.5.2'
gem 'timecop', '~> 0.9.10'
end
1 change: 1 addition & 0 deletions spec/gemfiles/Gemfile-rails-6.1
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ group :test do
gem 'minitest-hooks', '1.4.0'
gem 'pry'
gem 'pg_examiner', '~> 0.5.2'
gem 'timecop', '~> 0.9.10'
end
1 change: 1 addition & 0 deletions spec/gemfiles/Gemfile-rails-7.0
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ group :test do
gem 'minitest-hooks', '1.4.0'
gem 'pry'
gem 'pg_examiner', '~> 0.5.2'
gem 'timecop', '~> 0.9.10'
end
1 change: 1 addition & 0 deletions spec/gemfiles/Gemfile-rails-7.1
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ group :test do
gem 'minitest-hooks', '1.4.0'
gem 'pry'
gem 'pg_examiner', '~> 0.5.2'
gem 'timecop', '~> 0.9.10'
end
1 change: 1 addition & 0 deletions spec/gemfiles/Gemfile-rails-7.2
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ group :test do
gem 'minitest-hooks', '1.4.0'
gem 'pry'
gem 'pg_examiner', '~> 0.5.2'
gem 'timecop', '~> 0.9.10'
end
24 changes: 17 additions & 7 deletions spec/que/command_line_interface_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def write_file
def assert_locker_instantiated(
worker_priorities: [10, 30, 50, nil, nil, nil],
poll_interval: 5,
poll_interval_variance: 0.0,
listen: true,
wait_period: 50,
queues: ['default'],
Expand All @@ -199,13 +200,14 @@ def assert_locker_instantiated(

locker_instantiate = locker_instantiates.first

assert_equal listen, locker_instantiate[:listen]
assert_equal true, locker_instantiate[:poll]
assert_equal queues, locker_instantiate[:queues]
assert_equal poll_interval, locker_instantiate[:poll_interval]
assert_equal wait_period, locker_instantiate[:wait_period]
assert_equal maximum_buffer_size, locker_instantiate[:maximum_buffer_size]
assert_equal worker_priorities, locker_instantiate[:worker_priorities]
assert_equal listen, locker_instantiate[:listen]
assert_equal true, locker_instantiate[:poll]
assert_equal queues, locker_instantiate[:queues]
assert_equal poll_interval, locker_instantiate[:poll_interval]
assert_equal poll_interval_variance, locker_instantiate[:poll_interval_variance]
assert_equal wait_period, locker_instantiate[:wait_period]
assert_equal maximum_buffer_size, locker_instantiate[:maximum_buffer_size]
assert_equal worker_priorities, locker_instantiate[:worker_priorities]
end

def assert_locker_started(
Expand Down Expand Up @@ -258,6 +260,14 @@ def assert_locker_started(
end
end

["-j", "--poll-interval-variance"].each do |command|
it "with #{command} to configure the poll interval variance" do
assert_successful_invocation "./#{filename} #{command} 5"
assert_locker_instantiated(poll_interval_variance: 5)
assert_locker_started
end
end

it "with --listen false to disable listen mode" do
assert_successful_invocation "./#{filename} --listen false"
assert_locker_instantiated(listen: false)
Expand Down
9 changes: 6 additions & 3 deletions spec/que/poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def poll(
connection: override_connection || connection,
queue: queue_name,
poll_interval: 5,
poll_interval_variance: 0,
)

Que::Poller.setup(override_connection || connection)
Expand Down Expand Up @@ -245,6 +246,7 @@ def assert_poll(priorities:, locked:)
connection: connection,
queue: 'default',
poll_interval: 5,
poll_interval_variance: 0,
)
end

Expand Down Expand Up @@ -275,14 +277,15 @@ def assert_poll(priorities:, locked:)
assert_equal false, poller.should_poll?
end

it "should be false if the number of jobs returned from the last poll was less than the lowest priority request, but the poll_interval has elapsed" do
it "should be true if the number of jobs returned from the last poll was less than the lowest priority request, but the poll_interval has elapsed" do
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was a typo, the test is actually checking assert_equal true, which is the expected behavior.

job_ids = 5.times.map { Que::Job.enqueue.que_attrs[:id] }

result = poller.poll(priorities: { 500 => 7 }, held_locks: Set.new)
assert_equal job_ids, result.map(&:id)

poller.instance_variable_set(:@last_polled_at, Time.now - 30)
assert_equal true, poller.should_poll?
Timecop.freeze(Time.now + 30) do
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change makes the spec less coupled with implementation details.

Instead of setting private instance variables, it 'time travels' 30 seconds into the future, after the poll_interval has already elapsed.

assert_equal true, poller.should_poll?
end
end
end
end
4 changes: 4 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
require 'minitest/hooks'
require 'minitest/profile'

# "time travel" capabilities.
require 'timecop'
Timecop.safe_mode = true

# Other support stuff.
Dir['./spec/support/**/*.rb'].sort.each(&method(:require))

Expand Down
Loading