From 9b87cb386625eeb8942580c800986ec5c69115e8 Mon Sep 17 00:00:00 2001 From: Tomasz Gieniusz Date: Fri, 20 Sep 2024 14:16:06 +1000 Subject: [PATCH 1/2] Introduce poll-interval-variance option --- lib/que/command_line_interface.rb | 29 +++++++++---- lib/que/locker.rb | 56 ++++++++++++++----------- lib/que/poller.rb | 50 +++++++++++++--------- spec/que/command_line_interface_spec.rb | 24 +++++++---- spec/que/poller_spec.rb | 6 ++- 5 files changed, 103 insertions(+), 62 deletions(-) diff --git a/lib/que/command_line_interface.rb b/lib/que/command_line_interface.rb index 7e9f2e67..0ae4627f 100644 --- a/lib/que/command_line_interface.rb +++ b/lib/que/command_line_interface.rb @@ -18,14 +18,15 @@ def parse( default_require_file: RAILS_ENVIRONMENT_FILE ) - options = {} - queues = [] - log_level = 'info' - log_internals = false - poll_interval = 5 - connection_url = nil - worker_count = nil - worker_priorities = nil + options = {} + queues = [] + log_level = 'info' + log_internals = false + poll_interval = 5 + poll_interval_variance = 0 + connection_url = nil + worker_count = nil + worker_priorities = nil parser = OptionParser.new do |opts| @@ -50,6 +51,15 @@ def parse( poll_interval = i end + opts.on( + '-j', + '--poll-interval-variance [INTERVAL]', + Float, + "Set maximum variance in poll interval, in seconds (default: 0)", + ) do |j| + poll_interval_variance = j.to_f + end + opts.on( '--listen [LISTEN]', String, @@ -232,7 +242,8 @@ def parse( options[:queues] = queues_hash end - options[:poll_interval] = poll_interval + options[:poll_interval] = poll_interval + options[:poll_interval_variance] = poll_interval_variance locker = begin diff --git a/lib/que/locker.rb b/lib/que/locker.rb index cb7ec40a..7f742ab0 100644 --- a/lib/que/locker.rb +++ b/lib/que/locker.rb @@ -33,7 +33,7 @@ class << self } class Locker - attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval + attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval, :poll_interval_variance MESSAGE_RESOLVERS = {} RESULT_RESOLVERS = {} @@ -47,22 +47,24 @@ class Locker RESULT_RESOLVERS[:job_finished] = -> (messages) { finish_jobs(messages.map{|m| m.fetch(:metajob)}) } - DEFAULT_POLL_INTERVAL = 5.0 - DEFAULT_WAIT_PERIOD = 50 - DEFAULT_MAXIMUM_BUFFER_SIZE = 8 - DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze + DEFAULT_POLL_INTERVAL = 5.0 + DEFAULT_POLL_INTERVAL_VARIANCE = 0.0 + DEFAULT_WAIT_PERIOD = 50 + DEFAULT_MAXIMUM_BUFFER_SIZE = 8 + DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze def initialize( - queues: [Que.default_queue], - connection_url: nil, - listen: true, - poll: true, - poll_interval: DEFAULT_POLL_INTERVAL, - wait_period: DEFAULT_WAIT_PERIOD, - maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE, - worker_priorities: DEFAULT_WORKER_PRIORITIES, - on_worker_start: nil, - pidfile: nil + queues: [Que.default_queue], + connection_url: nil, + listen: true, + poll: true, + poll_interval: DEFAULT_POLL_INTERVAL, + poll_interval_variance: DEFAULT_POLL_INTERVAL_VARIANCE, + wait_period: DEFAULT_WAIT_PERIOD, + maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE, + worker_priorities: DEFAULT_WORKER_PRIORITIES, + on_worker_start: nil, + pidfile: nil ) # Sanity-check all our arguments, since some users may instantiate Locker @@ -71,6 +73,7 @@ def initialize( Que.assert [TrueClass, FalseClass], poll Que.assert Numeric, poll_interval + Que.assert Numeric, poll_interval_variance Que.assert Numeric, wait_period Que.assert Array, worker_priorities @@ -94,13 +97,14 @@ def initialize( Que.internal_log :locker_instantiate, self do { - queues: queues, - listen: listen, - poll: poll, - poll_interval: poll_interval, - wait_period: wait_period, - maximum_buffer_size: maximum_buffer_size, - worker_priorities: worker_priorities, + queues: queues, + listen: listen, + poll: poll, + poll_interval: poll_interval, + poll_interval_variance: poll_interval_variance, + wait_period: wait_period, + maximum_buffer_size: maximum_buffer_size, + worker_priorities: worker_priorities, } end @@ -108,6 +112,7 @@ def initialize( @locks = Set.new @poll_interval = poll_interval + @poll_interval_variance = poll_interval_variance if queues.is_a?(Hash) @queue_names = queues.keys @@ -204,9 +209,10 @@ def initialize( if poll @queues.map do |queue_name, interval| Poller.new( - connection: @connection, - queue: queue_name, - poll_interval: interval, + connection: @connection, + queue: queue_name, + poll_interval: interval, + poll_interval_variance: poll_interval_variance, ) end end diff --git a/lib/que/poller.rb b/lib/que/poller.rb index 520879fd..5d3f2f57 100644 --- a/lib/que/poller.rb +++ b/lib/que/poller.rb @@ -116,25 +116,32 @@ class Poller :connection, :queue, :poll_interval, + :poll_interval_variance, :last_polled_at, - :last_poll_satisfied + :last_poll_satisfied, + :next_poll_at def initialize( connection:, queue:, - poll_interval: + poll_interval:, + poll_interval_variance: ) - @connection = connection - @queue = queue - @poll_interval = poll_interval + @connection = connection + @queue = queue + @poll_interval = poll_interval + @poll_interval_variance = poll_interval_variance + @last_polled_at = nil @last_poll_satisfied = nil + @next_poll_at = Time.now Que.internal_log :poller_instantiate, self do { - backend_pid: connection.backend_pid, - queue: queue, - poll_interval: poll_interval, + backend_pid: connection.backend_pid, + queue: queue, + poll_interval: poll_interval, + poll_interval_variance: poll_interval_variance, } end end @@ -158,14 +165,20 @@ def poll( @last_polled_at = Time.now @last_poll_satisfied = poll_satisfied?(priorities, jobs) + @next_poll_at = last_polled_at + + poll_interval + + rand(-poll_interval_variance..poll_interval_variance) Que.internal_log :poller_polled, self do { - queue: @queue, - locked: jobs.count, - priorities: priorities, - held_locks: held_locks.to_a, - newly_locked: jobs.map { |key| key.fetch(:id) }, + queue: @queue, + locked: jobs.count, + priorities: priorities, + held_locks: held_locks.to_a, + newly_locked: jobs.map { |key| key.fetch(:id) }, + last_polled_at: last_polled_at, + last_poll_satisfied: last_poll_satisfied, + next_poll_at: next_poll_at, } end @@ -173,16 +186,15 @@ def poll( end def should_poll? + # polling is disabled for this queue + return false if poll_interval.nil? + # Never polled before? last_poll_satisfied.nil? || # Plenty of jobs were available last time? last_poll_satisfied == true || - poll_interval_elapsed? - end - - def poll_interval_elapsed? - return unless interval = poll_interval - (Time.now - last_polled_at) > interval + # It's due time to poll again regardless of the last poll's results? + next_poll_at < Time.now end class << self diff --git a/spec/que/command_line_interface_spec.rb b/spec/que/command_line_interface_spec.rb index 7ee69c31..2fc01c0e 100644 --- a/spec/que/command_line_interface_spec.rb +++ b/spec/que/command_line_interface_spec.rb @@ -188,6 +188,7 @@ def write_file def assert_locker_instantiated( worker_priorities: [10, 30, 50, nil, nil, nil], poll_interval: 5, + poll_interval_variance: 0.0, listen: true, wait_period: 50, queues: ['default'], @@ -199,13 +200,14 @@ def assert_locker_instantiated( locker_instantiate = locker_instantiates.first - assert_equal listen, locker_instantiate[:listen] - assert_equal true, locker_instantiate[:poll] - assert_equal queues, locker_instantiate[:queues] - assert_equal poll_interval, locker_instantiate[:poll_interval] - assert_equal wait_period, locker_instantiate[:wait_period] - assert_equal maximum_buffer_size, locker_instantiate[:maximum_buffer_size] - assert_equal worker_priorities, locker_instantiate[:worker_priorities] + assert_equal listen, locker_instantiate[:listen] + assert_equal true, locker_instantiate[:poll] + assert_equal queues, locker_instantiate[:queues] + assert_equal poll_interval, locker_instantiate[:poll_interval] + assert_equal poll_interval_variance, locker_instantiate[:poll_interval_variance] + assert_equal wait_period, locker_instantiate[:wait_period] + assert_equal maximum_buffer_size, locker_instantiate[:maximum_buffer_size] + assert_equal worker_priorities, locker_instantiate[:worker_priorities] end def assert_locker_started( @@ -258,6 +260,14 @@ def assert_locker_started( end end + ["-j", "--poll-interval-variance"].each do |command| + it "with #{command} to configure the poll interval variance" do + assert_successful_invocation "./#{filename} #{command} 5" + assert_locker_instantiated(poll_interval_variance: 5) + assert_locker_started + end + end + it "with --listen false to disable listen mode" do assert_successful_invocation "./#{filename} --listen false" assert_locker_instantiated(listen: false) diff --git a/spec/que/poller_spec.rb b/spec/que/poller_spec.rb index 57d39bd8..763776ed 100644 --- a/spec/que/poller_spec.rb +++ b/spec/que/poller_spec.rb @@ -44,6 +44,7 @@ def poll( connection: override_connection || connection, queue: queue_name, poll_interval: 5, + poll_interval_variance: 0, ) Que::Poller.setup(override_connection || connection) @@ -245,6 +246,7 @@ def assert_poll(priorities:, locked:) connection: connection, queue: 'default', poll_interval: 5, + poll_interval_variance: 0, ) end @@ -275,13 +277,13 @@ def assert_poll(priorities:, locked:) assert_equal false, poller.should_poll? end - it "should be false if the number of jobs returned from the last poll was less than the lowest priority request, but the poll_interval has elapsed" do + it "should be true if the number of jobs returned from the last poll was less than the lowest priority request, but the poll_interval has elapsed" do job_ids = 5.times.map { Que::Job.enqueue.que_attrs[:id] } result = poller.poll(priorities: { 500 => 7 }, held_locks: Set.new) assert_equal job_ids, result.map(&:id) - poller.instance_variable_set(:@last_polled_at, Time.now - 30) + poller.instance_variable_set(:@next_poll_at, Time.now) assert_equal true, poller.should_poll? end end From 6ccd8bcb38fcfdd0b4f70fe4eefe53873ac49eff Mon Sep 17 00:00:00 2001 From: Tomasz Gieniusz Date: Tue, 24 Sep 2024 19:07:04 +1000 Subject: [PATCH 2/2] Make specs less coupled with implementation details --- Gemfile | 2 ++ spec/gemfiles/Gemfile-rails-6.0 | 1 + spec/gemfiles/Gemfile-rails-6.1 | 1 + spec/gemfiles/Gemfile-rails-7.0 | 1 + spec/gemfiles/Gemfile-rails-7.1 | 1 + spec/gemfiles/Gemfile-rails-7.2 | 1 + spec/que/poller_spec.rb | 5 +++-- spec/spec_helper.rb | 4 ++++ 8 files changed, 14 insertions(+), 2 deletions(-) diff --git a/Gemfile b/Gemfile index eeec1ae1..344bd16a 100644 --- a/Gemfile +++ b/Gemfile @@ -26,6 +26,8 @@ group :test do gem 'pry' gem 'pg_examiner', '~> 0.5.2' + + gem 'timecop', '~> 0.9.10' end gemspec diff --git a/spec/gemfiles/Gemfile-rails-6.0 b/spec/gemfiles/Gemfile-rails-6.0 index 9bcb0dbb..523c53ca 100644 --- a/spec/gemfiles/Gemfile-rails-6.0 +++ b/spec/gemfiles/Gemfile-rails-6.0 @@ -20,4 +20,5 @@ group :test do gem 'minitest-hooks', '1.4.0' gem 'pry' gem 'pg_examiner', '~> 0.5.2' + gem 'timecop', '~> 0.9.10' end diff --git a/spec/gemfiles/Gemfile-rails-6.1 b/spec/gemfiles/Gemfile-rails-6.1 index 7a3d2828..fa72d011 100644 --- a/spec/gemfiles/Gemfile-rails-6.1 +++ b/spec/gemfiles/Gemfile-rails-6.1 @@ -20,4 +20,5 @@ group :test do gem 'minitest-hooks', '1.4.0' gem 'pry' gem 'pg_examiner', '~> 0.5.2' + gem 'timecop', '~> 0.9.10' end diff --git a/spec/gemfiles/Gemfile-rails-7.0 b/spec/gemfiles/Gemfile-rails-7.0 index b40936de..d1909983 100644 --- a/spec/gemfiles/Gemfile-rails-7.0 +++ b/spec/gemfiles/Gemfile-rails-7.0 @@ -20,4 +20,5 @@ group :test do gem 'minitest-hooks', '1.4.0' gem 'pry' gem 'pg_examiner', '~> 0.5.2' + gem 'timecop', '~> 0.9.10' end diff --git a/spec/gemfiles/Gemfile-rails-7.1 b/spec/gemfiles/Gemfile-rails-7.1 index 2da7475c..d9d3545d 100644 --- a/spec/gemfiles/Gemfile-rails-7.1 +++ b/spec/gemfiles/Gemfile-rails-7.1 @@ -20,4 +20,5 @@ group :test do gem 'minitest-hooks', '1.4.0' gem 'pry' gem 'pg_examiner', '~> 0.5.2' + gem 'timecop', '~> 0.9.10' end diff --git a/spec/gemfiles/Gemfile-rails-7.2 b/spec/gemfiles/Gemfile-rails-7.2 index 1caa4180..aabf73c1 100644 --- a/spec/gemfiles/Gemfile-rails-7.2 +++ b/spec/gemfiles/Gemfile-rails-7.2 @@ -20,4 +20,5 @@ group :test do gem 'minitest-hooks', '1.4.0' gem 'pry' gem 'pg_examiner', '~> 0.5.2' + gem 'timecop', '~> 0.9.10' end diff --git a/spec/que/poller_spec.rb b/spec/que/poller_spec.rb index 763776ed..9b748783 100644 --- a/spec/que/poller_spec.rb +++ b/spec/que/poller_spec.rb @@ -283,8 +283,9 @@ def assert_poll(priorities:, locked:) result = poller.poll(priorities: { 500 => 7 }, held_locks: Set.new) assert_equal job_ids, result.map(&:id) - poller.instance_variable_set(:@next_poll_at, Time.now) - assert_equal true, poller.should_poll? + Timecop.freeze(Time.now + 30) do + assert_equal true, poller.should_poll? + end end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index ac34bb5c..53094515 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -35,6 +35,10 @@ require 'minitest/hooks' require 'minitest/profile' +# "time travel" capabilities. +require 'timecop' +Timecop.safe_mode = true + # Other support stuff. Dir['./spec/support/**/*.rb'].sort.each(&method(:require))