diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..d6d9bd3 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,37 @@ +--- +name: Lint +on: + push: + branches: + - master + pull_request: + branches: + - master + +jobs: + rubocop: + name: Rubocop + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Ruby 2.7 + uses: ruby/setup-ruby@v1 + with: + bundler: 2 + ruby-version: 2.7 + + - uses: actions/cache@v3 + with: + path: vendor/bundle + key: ${{ runner.os }}-gems-2.7-${{ hashFiles('./*.gemspec') }} + restore-keys: | + ${{ runner.os }}-gems- + + - name: Bundle install + run: | + bundle config path vendor/bundle + bundle install --jobs 4 --retry 3 + + - name: Run Rubocop + run: bundle exec rubocop diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..2d9053a --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,49 @@ +--- +name: Test +on: + push: + branches: + - master + pull_request: + branches: + - master + +jobs: + rspec: + name: Rspec + runs-on: ubuntu-latest + services: + redis: + image: redis:6 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 6379:6379 + strategy: + matrix: + include: + - { ruby_version: 2.7 } + - { ruby_version: '3.0' } + - { ruby_version: 3.1 } + steps: + - uses: actions/checkout@v3 + + - name: Set up Ruby ${{ matrix.ruby_version }} + uses: ruby/setup-ruby@v1 + with: + bundler: 2 + ruby-version: ${{ matrix.ruby_version }} + + # Appraisal doesn't support vendored install + # See: https://github.com/thoughtbot/appraisal/issues/173 + # https://github.com/thoughtbot/appraisal/pull/174 + - name: Bundle install + run: | + bundle install --jobs 4 --retry 3 + bundle exec appraisal install + + - name: Run tests + run: bundle exec appraisal rspec diff --git a/.gitignore b/.gitignore index 80036d4..4633e88 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ spec/reports test/tmp test/version_tmp tmp +.lefthook-local.yml \ No newline at end of file diff --git a/.rubocop.yml b/.rubocop.yml index 980f5b5..548149c 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -1,8 +1,15 @@ +--- require: rubocop-rspec AllCops: + TargetRubyVersion: 2.7.0 + SuggestExtensions: false + NewCops: enable Include: - ./Gemfile + - ./Rakefile + - '*.gemspec' + - '**/*.rb' Documentation: Enabled: false @@ -10,8 +17,29 @@ Documentation: Style/StringLiterals: EnforcedStyle: double_quotes -Style/ClassAndModuleChildren: - EnforcedStyle: compact - RSpec/FilePath: Enabled: false + +RSpec/ExampleLength: + Enabled: false + +RSpec/AnyInstance: + Enabled: false + +Metrics/MethodLength: + Max: 15 + +Metrics/ClassLength: + Max: 150 + +Layout/LineLength: + Max: 80 + +Layout/FirstArgumentIndentation: + EnforcedStyle: consistent + +Layout/FirstMethodArgumentLineBreak: + Enabled: true + +Layout/MultilineMethodArgumentLineBreaks: + Enabled: true diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 41fdc5d..0000000 --- a/.travis.yml +++ /dev/null @@ -1,18 +0,0 @@ -language: ruby -rvm: - - 2.2 - - 2.3.1 - - 2.4 - - 2.5 - -cache: bundler - -services: - - redis-server - -gemfile: - - gemfiles/sidekiq_4.0.gemfile - - gemfiles/sidekiq_4.1.gemfile - - gemfiles/sidekiq_4.2.gemfile - - gemfiles/sidekiq_5.0.gemfile - - gemfiles/sidekiq_master.gemfile diff --git a/Appraisals b/Appraisals index d805dc9..0e21902 100644 --- a/Appraisals +++ b/Appraisals @@
-14,6 +14,18 @@ appraise 'sidekiq-5.0' do gem 'sidekiq', '~> 5.0.0' end +appraise 'sidekiq-6.0' do + gem 'sidekiq', '~> 6.0.0' +end + +appraise 'sidekiq-6.5' do + gem 'sidekiq', '~> 6.5.0' +end + +appraise 'sidekiq-7.0' do + gem 'sidekiq', '~> 7.0.0' +end + appraise 'sidekiq-master' do - gem 'sidekiq', github: 'mperham/sidekiq' + gem 'sidekiq', github: 'mperham/sidekiq', branch: 'main' end diff --git a/README.md b/README.md index a42810e..4f2d511 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +[![Gem Version](https://badge.fury.io/rb/sidekiq-grouping.svg)](https://rubygems.org/gems/sidekiq-grouping) + # Sidekiq::Grouping @@ -25,7 +27,7 @@ class ElasticBulkIndexWorker include Sidekiq::Worker sidekiq_options( - queue: :elasic_bulks, + queue: :elastic_bulks, batch_flush_size: 30, # Jobs will be combined when queue size exceeds 30 batch_flush_interval: 60, # Jobs will be combined every 60 seconds retry: 5 diff --git a/bin/console b/bin/console new file mode 100755 index 0000000..dde3a1d --- /dev/null +++ b/bin/console @@ -0,0 +1,8 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require "bundler/setup" +require "sidekiq/grouping" + +require "pry" +Pry.start diff --git a/gemfiles/sidekiq_6.0.gemfile b/gemfiles/sidekiq_6.0.gemfile new file mode 100644 index 0000000..3fc2547 --- /dev/null +++ b/gemfiles/sidekiq_6.0.gemfile @@ -0,0 +1,7 @@ +# This file was generated by Appraisal + +source "https://rubygems.org" + +gem "sidekiq", "~> 6.0.0" + +gemspec path: "../" diff --git a/gemfiles/sidekiq_6.5.gemfile b/gemfiles/sidekiq_6.5.gemfile new file mode 100644 index 0000000..bc33000 --- /dev/null +++ b/gemfiles/sidekiq_6.5.gemfile @@ -0,0 +1,7 @@ +# This file was generated by Appraisal + +source "https://rubygems.org" + +gem "sidekiq", "~> 6.5.0" + +gemspec path: "../" diff --git a/gemfiles/sidekiq_7.0.gemfile b/gemfiles/sidekiq_7.0.gemfile new file mode 100644 index 0000000..381b307 --- /dev/null +++ b/gemfiles/sidekiq_7.0.gemfile @@ -0,0 +1,7 @@ +# This file was generated by Appraisal + +source "https://rubygems.org" + +gem "sidekiq", "~> 7.0.0" + +gemspec path: "../" diff --git a/gemfiles/sidekiq_master.gemfile b/gemfiles/sidekiq_master.gemfile index 1f64b5f..0b63773 100644 --- a/gemfiles/sidekiq_master.gemfile +++ b/gemfiles/sidekiq_master.gemfile @@ -2,6 +2,6 @@ source "https://rubygems.org" -gem "sidekiq", github: "mperham/sidekiq" +gem "sidekiq", github: "mperham/sidekiq", branch: "main" gemspec path: "../" diff --git a/lefthook.yml b/lefthook.yml new file mode 100644 index 0000000..79bf09c --- /dev/null +++ b/lefthook.yml @@ -0,0 +1,20 @@ +--- +# Git hooks configuration +# +# See: github.com/evilmartians/lefthook + +pre-commit: + parallel: true + commands: + appraisal: + glob: "{Appraisals,*.gemfile}" + run: echo {staged_files} > /dev/null; bundle exec appraisal install && git add gemfiles/*.gemfile + rubocop: + glob: "{*.rb,*.gemspec,Gemfile,Rakefile}" + run: bundle exec rubocop -A {staged_files} && git add {staged_files} + +pre-push: + commands: + rspec: + glob: "*.rb" + run: echo {push_files} > /dev/null; bundle exec appraisal rspec diff --git a/lib/sidekiq/grouping.rb b/lib/sidekiq/grouping.rb index 1b94668..eaf49ce 100644 --- a/lib/sidekiq/grouping.rb +++ b/lib/sidekiq/grouping.rb @@ -1,43 +1,51 @@ +# frozen_string_literal: true + require "active_support" require "active_support/core_ext/string" require "active_support/configurable" require "active_support/core_ext/numeric/time" +require "sidekiq" require "sidekiq/grouping/version" require "concurrent" -module 
Sidekiq::Grouping - autoload :Config, "sidekiq/grouping/config" - autoload :Redis, "sidekiq/grouping/redis" - autoload :Batch, "sidekiq/grouping/batch" - autoload :ReliableBatch, "sidekiq/grouping/reliable_batch" - autoload :Middleware, "sidekiq/grouping/middleware" - autoload :Flusher, "sidekiq/grouping/flusher" - autoload :FlusherObserver, "sidekiq/grouping/flusher_observer" - autoload :Supervisor, "sidekiq/grouping/supervisor" - - class << self - attr_writer :logger - - def logger - @logger ||= Sidekiq.logger - end +module Sidekiq + module Grouping + autoload :Config, "sidekiq/grouping/config" + autoload :Redis, "sidekiq/grouping/redis" + autoload :Batch, "sidekiq/grouping/batch" + autoload :ReliableBatch, "sidekiq/grouping/reliable_batch" + autoload :Supervisor, "sidekiq/grouping/supervisor" + autoload :Middleware, "sidekiq/grouping/middleware" + autoload :Flusher, "sidekiq/grouping/flusher" + autoload :FlusherObserver, "sidekiq/grouping/flusher_observer" - def force_flush_for_test! - Sidekiq::Grouping::Flusher.new.force_flush_for_test! - end + class << self + attr_writer :logger + + def logger + @logger ||= Sidekiq.logger + end + + def force_flush_for_test! + Sidekiq::Grouping::Flusher.new.force_flush_for_test! + end - def start! - interval = Sidekiq::Grouping::Config.poll_interval - @observer = Sidekiq::Grouping::FlusherObserver.new - @task = Concurrent::TimerTask.new(execution_interval: interval) do - Sidekiq::Grouping::Flusher.new.flush - Sidekiq::Grouping::Supervisor.new.requeue_expired if Sidekiq::Grouping::Config.reliable + def start! + interval = Sidekiq::Grouping::Config.poll_interval + @observer = Sidekiq::Grouping::FlusherObserver.new + @task = Concurrent::TimerTask.new(execution_interval: interval) do + Sidekiq::Grouping::Flusher.new.flush + if Sidekiq::Grouping::Config.reliable + Sidekiq::Grouping::Supervisor.new.requeue_expired + end + end + @task.add_observer(@observer) + logger.info( + "[Sidekiq::Grouping] Started polling batches every " \ + "#{interval} seconds" + ) + @task.execute end - @task.add_observer(@observer) - logger.info( - "[Sidekiq::Grouping] Started polling batches every #{interval} seconds" - ) - @task.execute end end end diff --git a/lib/sidekiq/grouping/adapters/base_adapter.rb b/lib/sidekiq/grouping/adapters/base_adapter.rb new file mode 100644 index 0000000..4239e09 --- /dev/null +++ b/lib/sidekiq/grouping/adapters/base_adapter.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +require_relative "../redis_dispatcher" +require_relative "../redis_scripts" + +module Sidekiq + module Grouping + module Adapters + class BaseAdapter + private + + def requeue_script(unique) + if unique + RedisScripts.script_hash(:unique_requeue) + else + RedisScripts.script_hash(:requeue) + end + end + + def unique_messages_key(name) + ns("#{name}:unique_messages") + end + + def pending_jobs(name) + ns("#{name}:pending_jobs") + end + + def this_job_name(name) + ns("#{name}:#{SecureRandom.hex}") + end + + def ns(key = nil) + "batching:#{key}" + end + end + end + end +end diff --git a/lib/sidekiq/grouping/adapters/redis_adapter.rb b/lib/sidekiq/grouping/adapters/redis_adapter.rb new file mode 100644 index 0000000..ec1674a --- /dev/null +++ b/lib/sidekiq/grouping/adapters/redis_adapter.rb @@ -0,0 +1,67 @@ +# frozen_string_literal: true + +require_relative "base_adapter" +require_relative "../redis_dispatcher" +require_relative "../redis_scripts" + +module Sidekiq + module Grouping + module Adapters + class RedisAdapter < BaseAdapter + include RedisDispatcher + + def 
push_messages(name, messages, remember_unique: false) + keys = [ + ns("batches"), + name, + ns(name), + unique_messages_key(name), + remember_unique.to_s + ] + argv = [messages] + redis_call( + :evalsha, + RedisScripts.script_hash(:merge_array), + keys, + argv + ) + end + + def pluck(name, limit) + keys = [ns(name), unique_messages_key(name)] + args = [limit] + redis_call(:evalsha, RedisScripts.script_hash(:pluck), keys, args) + end + + def reliable_pluck(name, limit) + keys = [ + ns(name), + unique_messages_key(name), + pending_jobs(name), + Time.now.to_i, + this_job_name(name) + ] + argv = [limit] + redis_call( + :evalsha, + RedisScripts.script_hash(:reliable_pluck), + keys, + argv + ) + end + + def requeue_expired(conn, unique, expired, name) + keys = [ + expired, + ns(name), + pending_jobs(name), + unique_messages_key(name) + ] + redis_connection_call( + conn, :evalsha, requeue_script(unique), keys, [] + ) + end + end + end + end +end diff --git a/lib/sidekiq/grouping/adapters/redis_client_adapter.rb b/lib/sidekiq/grouping/adapters/redis_client_adapter.rb new file mode 100644 index 0000000..38e5f82 --- /dev/null +++ b/lib/sidekiq/grouping/adapters/redis_client_adapter.rb @@ -0,0 +1,67 @@ +# frozen_string_literal: true + +require_relative "base_adapter" +require_relative "../redis_dispatcher" +require_relative "../redis_scripts" + +module Sidekiq + module Grouping + module Adapters + class RedisClientAdapter < BaseAdapter + include RedisDispatcher + + def push_messages(name, messages, remember_unique: false) + redis_call( + :evalsha, + RedisScripts.script_hash(:merge_array), + 5, + ns("batches"), + name, + ns(name), + unique_messages_key(name), + remember_unique.to_s, + messages + ) + end + + def pluck(name, limit) + redis_call( + :evalsha, + RedisScripts.script_hash(:pluck), + 2, + ns(name), + unique_messages_key(name), + limit + ) + end + + def reliable_pluck(name, limit) + redis_call( + :evalsha, + RedisScripts.script_hash(:reliable_pluck), + 5, + ns(name), + unique_messages_key(name), + pending_jobs(name), + Time.now.to_i, + this_job_name(name), + limit + ) + end + + def requeue_expired(conn, unique, expired, name) + redis_connection_call( + conn, + :evalsha, + requeue_script(unique), + 4, + expired, + ns(name), + pending_jobs(name), + unique_messages_key(name) + ) + end + end + end + end +end diff --git a/lib/sidekiq/grouping/batch.rb b/lib/sidekiq/grouping/batch.rb index becac2f..490b6c4 100644 --- a/lib/sidekiq/grouping/batch.rb +++ b/lib/sidekiq/grouping/batch.rb @@ -1,7 +1,9 @@ +# frozen_string_literal: true + module Sidekiq module Grouping class Batch - def initialize(worker_class, queue, redis_pool = nil) + def initialize(worker_class, queue, _redis_pool = nil) @worker_class = worker_class @queue = queue @name = "#{worker_class.underscore}:#{queue}" @@ -12,44 +14,68 @@ def initialize(worker_class, queue, redis_pool = nil) def add(msg) msg = msg.to_json - @redis.push_msg(@name, msg, enqueue_similar_once?) if should_add? msg + return unless should_add? msg + + @redis.push_msg( + @name, + msg, + remember_unique: enqueue_similar_once? + ) end - def should_add? msg + def should_add?(msg) return true unless enqueue_similar_once? + !@redis.enqueued?(@name, msg) end def merge(messages) - # messages is expected to be an array with a single item which is an array of elements that would normally be added using Sidekiq::Grouping::Batch#add - raise "batch_merge_array worker received #{messages.size} arguments. Expected a single Array of elements." 
if messages.size > 1 + # messages is expected to be an array with a single item + # which is an array of elements that would normally be + # added using Sidekiq::Grouping::Batch#add + if messages.size > 1 + raise "batch_merge_array worker received #{messages.size} " \ + "arguments. Expected a single Array of elements." + end messages = messages.first - raise "batch_merge_array worker received type #{messages.class.name}. Expected Array." unless messages.is_a?(Array) + unless messages.is_a?(Array) + messages_type = messages.class.name + raise "batch_merge_array worker received type #{messages_type}. " \ + "Expected Array." + end messages.each_slice(1000) do |slice| - @redis.push_messages(@name, slice.map(&:to_json), enqueue_similar_once?) + push_messages(slice.map(&:to_json), enqueue_similar_once?) end end + def push_messages(slice_json, remember_unique) + @redis.push_messages( + @name, + slice_json, + remember_unique: remember_unique + ) + end + def size @redis.batch_size(@name) end def chunk_size - worker_class_options['batch_size'] || + worker_class_options["batch_size"] || Sidekiq::Grouping::Config.max_batch_size end def pluck_size - worker_class_options['batch_flush_size'] || + worker_class_options["batch_flush_size"] || chunk_size end def pluck - if @redis.lock(@name) - @redis.pluck(@name, pluck_size).map { |value| JSON.parse(value) } - end + return unless @redis.lock(@name) + + @redis.pluck(@name, pluck_size).map { |value| JSON.parse(value) } end def flush @@ -58,9 +84,9 @@ def flush chunk.each_slice(chunk_size) do |subchunk| Sidekiq::Client.push( - 'class' => @worker_class, - 'queue' => @queue, - 'args' => [true, subchunk] + "class" => @worker_class, + "queue" => @queue, + "args" => [true, subchunk] ) end set_current_time_as_last @@ -86,10 +112,11 @@ def last_execution_time end def next_execution_time - if interval = worker_class_options['batch_flush_interval'] - last_time = last_execution_time - last_time + interval.seconds if last_time - end + interval = worker_class_options["batch_flush_interval"] + return unless interval + + last_time = last_execution_time + last_time + interval.seconds if last_time end def delete @@ -111,13 +138,13 @@ def could_flush_on_time? if last_time.blank? set_current_time_as_last false - else - next_time < Time.now if next_time + elsif next_time + next_time < Time.now end end def enqueue_similar_once? 
- worker_class_options['batch_unique'] == true + worker_class_options["batch_unique"] == true end def set_current_time_as_last @@ -139,7 +166,7 @@ def all end def extract_worker_klass_and_queue(name) - klass, queue = name.split(':') + klass, queue = name.split(":") [klass.camelize, queue] end end diff --git a/lib/sidekiq/grouping/config.rb b/lib/sidekiq/grouping/config.rb index 87410b0..0866678 100644 --- a/lib/sidekiq/grouping/config.rb +++ b/lib/sidekiq/grouping/config.rb @@ -1,34 +1,46 @@ -module Sidekiq::Grouping::Config - include ActiveSupport::Configurable +# frozen_string_literal: true - def self.options - Sidekiq.options[:grouping] || Sidekiq.options["grouping"] || {} # sidekiq 5.x use symbol in keys - end +module Sidekiq + module Grouping + module Config + include ActiveSupport::Configurable - # Queue size overflow check polling interval - config_accessor :poll_interval do - options[:poll_interval] || 3 - end + def self.options + if Sidekiq.respond_to?(:[]) # Sidekiq 6.x + Sidekiq[:grouping] || {} + elsif Sidekiq.respond_to?(:options) # Sidekiq <= 5.x + Sidekiq.options[:grouping] || Sidekiq.options["grouping"] || {} + else # Sidekiq 7.x + Sidekiq.default_configuration[:grouping] || {} + end + end - # Maximum batch size - config_accessor :max_batch_size do - options[:max_batch_size] || 1000 - end + # Queue size overflow check polling interval + config_accessor :poll_interval do + options[:poll_interval] || 3 + end - # Batch queue flush lock timeout - config_accessor :lock_ttl do - options[:lock_ttl] || 1 - end + # Maximum batch size + config_accessor :max_batch_size do + options[:max_batch_size] || 1000 + end - # Use reliable queues - config_accessor :reliable do - options[:reliable] || false - end + # Batch queue flush lock timeout + config_accessor :lock_ttl do + options[:lock_ttl] || 1 + end + + # Use reliable queues + config_accessor :reliable do + options[:reliable] || false + end - # Option to override how Sidekiq::Grouping know about tests env - config_accessor :tests_env do - options[:tests_env] || ( - defined?(::Rails) && Rails.respond_to?(:env) && Rails.env.test? - ) + # Option to override how Sidekiq::Grouping knows about the tests env + config_accessor :tests_env do + options[:tests_env] || ( + defined?(::Rails) && Rails.respond_to?(:env) && Rails.env.test? + ) + end + end end end diff --git a/lib/sidekiq/grouping/flusher.rb b/lib/sidekiq/grouping/flusher.rb index 34954ac..f52e7d7 100644 --- a/lib/sidekiq/grouping/flusher.rb +++ b/lib/sidekiq/grouping/flusher.rb @@ -1,42 +1,54 @@ -class Sidekiq::Grouping::Flusher - def flush - batches = Sidekiq::Grouping::Batch.all.map do |batch| - batch if batch.could_flush? - end - flush_batches(batches) - end +# frozen_string_literal: true - def force_flush_for_test! - unless Sidekiq::Grouping::Config.tests_env - Sidekiq::Grouping.logger.warn( - "**************************************************" - ) - Sidekiq::Grouping.logger.warn([ - "⛔️ force_flush_for_test! for testing API, ", - "but this is not the test environment. ", - "Please check your environment or ", - "change 'tests_env' to cover this one" - ].join) - Sidekiq::Grouping.logger.warn( - "**************************************************" - ) - end - flush_batches(Sidekiq::Grouping::Batch.all) - end +module Sidekiq + module Grouping + class Flusher + def flush + batches = Sidekiq::Grouping::Batch.all.map do |batch| + batch if batch.could_flush? + end + flush_batches(batches) + end - private + def force_flush_for_test!
+ unless Sidekiq::Grouping::Config.tests_env + Sidekiq::Grouping.logger.warn( + "**************************************************" + ) + Sidekiq::Grouping.logger.warn( + "⛔️ force_flush_for_test! for testing API, " \ + "but this is not the test environment. " \ + "Please check your environment or " \ + "change 'tests_env' to cover this one" + ) + Sidekiq::Grouping.logger.warn( + "**************************************************" + ) + end + flush_batches(Sidekiq::Grouping::Batch.all) + end - def flush_batches(batches) - batches.compact! - flush_concrete(batches) - end + private + + def flush_batches(batches) + batches.compact! + flush_concrete(batches) + end - def flush_concrete(batches) - return if batches.empty? - names = batches.map { |batch| "#{batch.worker_class} in #{batch.queue}" } - Sidekiq::Grouping.logger.info( - "[Sidekiq::Grouping] Trying to flush batched queues: #{names.join(',')}" - ) unless Sidekiq::Grouping::Config.tests_env - batches.each(&:flush) + def flush_concrete(batches) + return if batches.empty? + + names = batches.map do |batch| + "#{batch.worker_class} in #{batch.queue}" + end + unless Sidekiq::Grouping::Config.tests_env + Sidekiq::Grouping.logger.info( + "[Sidekiq::Grouping] Trying to flush batched queues: " \ + "#{names.join(',')}" + ) + end + batches.each(&:flush) + end + end end end diff --git a/lib/sidekiq/grouping/flusher_observer.rb b/lib/sidekiq/grouping/flusher_observer.rb index 8ffa21a..5e8a48c 100644 --- a/lib/sidekiq/grouping/flusher_observer.rb +++ b/lib/sidekiq/grouping/flusher_observer.rb @@ -1,13 +1,19 @@ -class Sidekiq::Grouping::FlusherObserver - def update(time, _result, ex) - if ex.is_a?(Concurrent::TimeoutError) - Sidekiq::Grouping.logger.error( - "[Sidekiq::Grouping] (#{time}) Execution timed out\n" - ) - elsif ex.present? - Sidekiq::Grouping.logger.error( - "[Sidekiq::Grouping] Execution failed with error #{ex}\n" - ) +# frozen_string_literal: true + +module Sidekiq + module Grouping + class FlusherObserver + def update(time, _result, exception) + if exception.is_a?(Concurrent::TimeoutError) + Sidekiq::Grouping.logger.error( + "[Sidekiq::Grouping] (#{time}) Execution timed out\n" + ) + elsif exception.present? + Sidekiq::Grouping.logger.error( + "[Sidekiq::Grouping] Execution failed with error #{exception}\n" + ) + end + end end end end diff --git a/lib/sidekiq/grouping/middleware.rb b/lib/sidekiq/grouping/middleware.rb index 3c6c540..249dfa3 100644 --- a/lib/sidekiq/grouping/middleware.rb +++ b/lib/sidekiq/grouping/middleware.rb @@ -1,44 +1,63 @@ +# frozen_string_literal: true + module Sidekiq module Grouping class Middleware + # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity def call(worker_class, msg, queue, redis_pool = nil) - return yield if (defined?(Sidekiq::Testing) && Sidekiq::Testing.inline?) - - worker_class = worker_class.camelize.constantize if worker_class.is_a?(String) + if worker_class.is_a?(String) + worker_class = worker_class.camelize.constantize + end options = worker_class.get_sidekiq_options batch = - options.key?('batch_flush_size') || - options.key?('batch_flush_interval') || - options.key?('batch_size') + options.key?("batch_flush_size") || + options.key?("batch_flush_interval") || + options.key?("batch_size") passthrough = - msg['args'] && - msg['args'].is_a?(Array) && - msg['args'].try(:first) == true + msg["args"].is_a?(Array) && + msg["args"].try(:first) == true retrying = msg["failed_at"].present? 
return yield unless batch - if !(passthrough || retrying) - add_to_batch(worker_class, queue, msg, redis_pool) - else - msg['args'].shift if passthrough + if inline_mode? + wrapped_args = [[msg["args"]]] + msg["args"] = wrapped_args + return yield + end + + if passthrough || retrying + msg["args"].shift if passthrough yield + else + add_to_batch(worker_class, queue, msg, redis_pool) end end + # rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity private def add_to_batch(worker_class, queue, msg, redis_pool = nil) - add_method = worker_class.get_sidekiq_options['batch_merge_array'] ? :merge : :add - Sidekiq::Grouping::Batch .new(worker_class.name, queue, redis_pool) - .public_send(add_method, msg['args']) + .public_send(add_method(worker_class), msg["args"]) nil end + + def add_method(worker_class) + if worker_class.get_sidekiq_options["batch_merge_array"] + :merge + else + :add + end + end + + def inline_mode? + defined?(Sidekiq::Testing) && Sidekiq::Testing.inline? + end end end end diff --git a/lib/sidekiq/grouping/redis.rb b/lib/sidekiq/grouping/redis.rb index 7388ab5..5297c44 100644 --- a/lib/sidekiq/grouping/redis.rb +++ b/lib/sidekiq/grouping/redis.rb @@ -1,225 +1,128 @@ +# frozen_string_literal: true + +require_relative "redis_dispatcher" +require_relative "redis_scripts" +require_relative "adapters/redis_adapter" +require_relative "adapters/redis_client_adapter" + module Sidekiq module Grouping class Redis - SCRIPT_HASHES = Concurrent::Map.new - - PLUCK_SCRIPT = <<-SCRIPT - local pluck_values = redis.call('lrange', KEYS[1], 0, ARGV[1] - 1) - redis.call('ltrim', KEYS[1], ARGV[1], -1) - for k, v in pairs(pluck_values) do - redis.call('srem', KEYS[2], v) - end - return pluck_values - SCRIPT - - RELIABLE_PLUCK_SCRIPT = <<-LUA - local queue = KEYS[1] - local unique_messages = KEYS[2] - local pending_jobs = KEYS[3] - local current_time = KEYS[4] - local this_job = KEYS[5] - local limit = tonumber(ARGV[1]) - - redis.call('zadd', pending_jobs, current_time, this_job) - local values = {} - for i = 1, math.min(limit, redis.call('llen', queue)) do - table.insert(values, redis.call('lmove', queue, this_job, 'left', 'right')) - end - if #values > 0 then - redis.call('srem', unique_messages, unpack(values)) - end - - return {this_job, values} - LUA - - REQUEUE_SCRIPT = <<-LUA - local expired_queue = KEYS[1] - local queue = KEYS[2] - local pending_jobs = KEYS[3] - - local to_requeue = redis.call('llen', expired_queue) - for i = 1, to_requeue do - redis.call('lmove', expired_queue, queue, 'left', 'right') - end - redis.call('zrem', pending_jobs, expired_queue) - LUA - - UNIQUE_REQUEUE_SCRIPT = <<-LUA - local expired_queue = KEYS[1] - local queue = KEYS[2] - local pending_jobs = KEYS[3] - local unique_messages = KEYS[4] - - local to_requeue = redis.call('lrange', expired_queue, 0, -1) - for i = 1, #to_requeue do - local message = to_requeue[i] - if redis.call('sismember', unique_messages, message) == 0 then - redis.call('lmove', expired_queue, queue, 'left', 'right') - else - redis.call('lpop', expired_queue) - end - end - redis.call('zrem', pending_jobs, expired_queue) - LUA - - MERGE_ARRAY_SCRIPT = <<-LUA - local batches = KEYS[1] - local name = KEYS[2] - local namespaced_name = KEYS[3] - local unique_messages_key = KEYS[4] - local remember_unique = KEYS[5] - local messages = ARGV - - if remember_unique == 'true' then - local existing_messages = redis.call('smismember', unique_messages_key, unpack(messages)) - local result = {} - - for 
index, value in ipairs(messages) do - if existing_messages[index] == 0 then - result[#result + 1] = value - end - end - - messages = result - end - - redis.call('sadd', batches, name) - redis.call('rpush', namespaced_name, unpack(messages)) - if remember_unique == 'true' then - redis.call('sadd', unique_messages_key, unpack(messages)) - end - LUA - - SCRIPTS = { - pluck: PLUCK_SCRIPT, - reliable_pluck: RELIABLE_PLUCK_SCRIPT, - requeue: REQUEUE_SCRIPT, - unique_requeue: UNIQUE_REQUEUE_SCRIPT, - merge_array: MERGE_ARRAY_SCRIPT - }.freeze - - class << self - def redis(&block) - Sidekiq.redis(&block) - end + include RedisDispatcher - def script_hash(key) - SCRIPT_HASHES.compute_if_absent(key) do - redis { |conn| conn.script(:load, SCRIPTS[key]) } - end - end + def initialize + @adapter = if new_redis_client? + Adapters::RedisClientAdapter.new + else + Adapters::RedisAdapter.new + end end - def push_msg(name, msg, remember_unique = false) + def push_msg(name, msg, remember_unique: false) redis do |conn| conn.multi do |pipeline| - pipeline.sadd(ns("batches"), name) - pipeline.rpush(ns(name), msg) - pipeline.sadd(unique_messages_key(name), msg) if remember_unique + sadd = pipeline.respond_to?(:sadd?) ? :sadd? : :sadd + redis_connection_call(pipeline, sadd, ns("batches"), name) + redis_connection_call(pipeline, :rpush, ns(name), msg) + + if remember_unique + redis_connection_call( + pipeline, sadd, unique_messages_key(name), msg + ) + end end end end - def push_messages(name, messages, remember_unique = false) - keys = [ns('batches'), name, ns(name), unique_messages_key(name), remember_unique] - args = [messages] - redis { |conn| conn.evalsha script_hash(:merge_array), keys, args } + def push_messages(name, messages, remember_unique: false) + @adapter.push_messages(name, messages, remember_unique: remember_unique) end def enqueued?(name, msg) - redis do |conn| - conn.sismember(unique_messages_key(name), msg) - end + member = redis_call(:sismember, unique_messages_key(name), msg) + return member if member.is_a?(TrueClass) || member.is_a?(FalseClass) + + member != 0 end def batch_size(name) - redis { |conn| conn.llen(ns(name)) } + redis_call(:llen, ns(name)) end def batches - redis { |conn| conn.smembers(ns("batches")) } + redis_call(:smembers, ns("batches")) end def pluck(name, limit) - keys = [ns(name), unique_messages_key(name)] - args = [limit] - redis { |conn| conn.evalsha script_hash(:pluck), keys, args } + @adapter.pluck(name, limit) end def reliable_pluck(name, limit) - keys = [ns(name), unique_messages_key(name), pending_jobs(name), Time.now.to_i, this_job_name(name)] - args = [limit] - redis { |conn| conn.evalsha script_hash(:reliable_pluck), keys, args } + @adapter.reliable_pluck(name, limit) end def get_last_execution_time(name) - redis { |conn| conn.get(ns("last_execution_time:#{name}")) } + redis_call(:get, ns("last_execution_time:#{name}")) end def set_last_execution_time(name, time) - redis { |conn| conn.set(ns("last_execution_time:#{name}"), time.to_json) } + redis_call( + :set, ns("last_execution_time:#{name}"), time.to_json + ) end def lock(name) - redis do |conn| - id = ns("lock:#{name}") - conn.set(id, true, nx: true, ex: Sidekiq::Grouping::Config.lock_ttl) - end + redis_call( + :set, + ns("lock:#{name}"), + "true", + nx: true, + ex: Sidekiq::Grouping::Config.lock_ttl + ) end def delete(name) redis do |conn| - conn.del(ns("last_execution_time:#{name}")) - conn.del(ns(name)) - conn.srem(ns('batches'), name) + redis_connection_call(conn, :del, ns("last_execution_time:#{name}")) + 
redis_connection_call(conn, :del, ns(name)) + redis_connection_call(conn, :srem, ns("batches"), name) end end def remove_from_pending(name, batch_name) redis do |conn| conn.multi do |pipeline| - pipeline.del(batch_name) - pipeline.zrem(pending_jobs(name), batch_name) + redis_connection_call(pipeline, :del, batch_name) + redis_connection_call( + pipeline, :zrem, pending_jobs(name), batch_name + ) end end end - def requeue_expired(name, unique = false, ttl = 3600) + def requeue_expired(name, unique: false, ttl: 3600) redis do |conn| - conn.zrangebyscore(pending_jobs(name), '0', Time.now.to_i - ttl).each do |expired| - keys = [expired, ns(name), pending_jobs(name), unique_messages_key(name)] - args = [] - script = unique ? script_hash(:unique_requeue) : script_hash(:requeue) - conn.evalsha script, keys, args + redis_connection_call( + conn, :zrangebyscore, pending_jobs(name), "0", Time.now.to_i - ttl + ).each do |expired| + @adapter.requeue_expired(conn, unique, expired, name) end end end private - def unique_messages_key name + def unique_messages_key(name) ns("#{name}:unique_messages") end - def pending_jobs name + def pending_jobs(name) ns("#{name}:pending_jobs") end - def this_job_name name - ns("#{name}:#{SecureRandom.hex}") - end - def ns(key = nil) "batching:#{key}" end - - def script_hash(key) - self.class.script_hash(key) - end - - def redis(&block) - self.class.redis(&block) - end end end end diff --git a/lib/sidekiq/grouping/redis_dispatcher.rb b/lib/sidekiq/grouping/redis_dispatcher.rb new file mode 100644 index 0000000..bdf2d25 --- /dev/null +++ b/lib/sidekiq/grouping/redis_dispatcher.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module Sidekiq + module Grouping + module RedisDispatcher + def redis_call(command, *args, **kwargs) + redis do |connection| + redis_connection_call(connection, command, *args, **kwargs) + end + end + + def redis_connection_call(connection, command, *args, **kwargs) + if new_redis_client? # redis-client + connection.call(command.to_s.upcase, *args, **kwargs) + else # redis + connection.public_send(command, *args, **kwargs) + end + end + + def new_redis_client? 
+ Sidekiq::VERSION[0].to_i >= 7 + end + + def redis(&block) + Sidekiq.redis(&block) + end + end + end +end diff --git a/lib/sidekiq/grouping/redis_scripts.rb b/lib/sidekiq/grouping/redis_scripts.rb new file mode 100644 index 0000000..22a37eb --- /dev/null +++ b/lib/sidekiq/grouping/redis_scripts.rb @@ -0,0 +1,117 @@ +# frozen_string_literal: true + +module Sidekiq + module Grouping + class RedisScripts + SCRIPT_HASHES = Concurrent::Map.new + + PLUCK_SCRIPT = <<-SCRIPT + local pluck_values = redis.call('lpop', KEYS[1], ARGV[1]) or {} + if #pluck_values > 0 then + redis.call('srem', KEYS[2], unpack(pluck_values)) + end + return pluck_values + SCRIPT + + RELIABLE_PLUCK_SCRIPT = <<-LUA + local queue = KEYS[1] + local unique_messages = KEYS[2] + local pending_jobs = KEYS[3] + local current_time = KEYS[4] + local this_job = KEYS[5] + local limit = tonumber(ARGV[1]) + + redis.call('zadd', pending_jobs, current_time, this_job) + local values = {} + for i = 1, math.min(limit, redis.call('llen', queue)) do + table.insert(values, redis.call('lmove', queue, this_job, 'left', 'right')) + end + if #values > 0 then + redis.call('srem', unique_messages, unpack(values)) + end + + return {this_job, values} + LUA + + REQUEUE_SCRIPT = <<-LUA + local expired_queue = KEYS[1] + local queue = KEYS[2] + local pending_jobs = KEYS[3] + + local to_requeue = redis.call('llen', expired_queue) + for i = 1, to_requeue do + redis.call('lmove', expired_queue, queue, 'left', 'right') + end + redis.call('zrem', pending_jobs, expired_queue) + LUA + + UNIQUE_REQUEUE_SCRIPT = <<-LUA + local expired_queue = KEYS[1] + local queue = KEYS[2] + local pending_jobs = KEYS[3] + local unique_messages = KEYS[4] + + local to_requeue = redis.call('lrange', expired_queue, 0, -1) + for i = 1, #to_requeue do + local message = to_requeue[i] + if redis.call('sismember', unique_messages, message) == 0 then + redis.call('lmove', expired_queue, queue, 'left', 'right') + else + redis.call('lpop', expired_queue) + end + end + redis.call('zrem', pending_jobs, expired_queue) + LUA + + MERGE_ARRAY_SCRIPT = <<-LUA + local batches = KEYS[1] + local name = KEYS[2] + local namespaced_name = KEYS[3] + local unique_messages_key = KEYS[4] + local remember_unique = KEYS[5] + local messages = ARGV + + if remember_unique == 'true' then + local existing_messages = redis.call('smismember', unique_messages_key, unpack(messages)) + local result = {} + + for index, value in ipairs(messages) do + if existing_messages[index] == 0 then + result[#result + 1] = value + end + end + + messages = result + end + + redis.call('sadd', batches, name) + redis.call('rpush', namespaced_name, unpack(messages)) + if remember_unique == 'true' then + redis.call('sadd', unique_messages_key, unpack(messages)) + end + LUA + + SCRIPTS = { + pluck: PLUCK_SCRIPT, + reliable_pluck: RELIABLE_PLUCK_SCRIPT, + requeue: REQUEUE_SCRIPT, + unique_requeue: UNIQUE_REQUEUE_SCRIPT, + merge_array: MERGE_ARRAY_SCRIPT + }.freeze + + class << self + include RedisDispatcher + + def script_hash(key) + SCRIPT_HASHES.compute_if_absent(key) do + redis_call(:script, "LOAD", SCRIPTS[key]) + end + end + end + + def script_hash(key) + self.class.script_hash(key) + end + end + end +end diff --git a/lib/sidekiq/grouping/reliable_batch.rb b/lib/sidekiq/grouping/reliable_batch.rb index 8c93971..10e6c1b 100644 --- a/lib/sidekiq/grouping/reliable_batch.rb +++ b/lib/sidekiq/grouping/reliable_batch.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + module Sidekiq module Grouping class ReliableBatch < Batch @@ -6,9 +8,9 @@ 
def flush return unless chunk Sidekiq::Client.push( - 'class' => @worker_class, - 'queue' => @queue, - 'args' => [true, chunk] + "class" => @worker_class, + "queue" => @queue, + "args" => [true, chunk] ) @redis.remove_from_pending(@name, pending_name) set_current_time_as_last @@ -21,13 +23,17 @@ def pluck end def requeue_expired - @redis.requeue_expired(@name, worker_class_options['batch_unique'], reliable_ttl) + @redis.requeue_expired( + @name, + unique: worker_class_options["batch_unique"], + ttl: reliable_ttl + ) end private def reliable_ttl - worker_class_options['batch_reliable_ttl'] || 3600 + worker_class_options["batch_reliable_ttl"] || 3600 end end end diff --git a/lib/sidekiq/grouping/supervisor.rb b/lib/sidekiq/grouping/supervisor.rb index 58c8713..cfeff97 100644 --- a/lib/sidekiq/grouping/supervisor.rb +++ b/lib/sidekiq/grouping/supervisor.rb @@ -1,9 +1,15 @@ -class Sidekiq::Grouping::Supervisor - def requeue_expired - Sidekiq::Grouping::Batch.all.each do |batch| - next unless batch.is_a?(Sidekiq::Grouping::ReliableBatch) +# frozen_string_literal: true - batch.requeue_expired +module Sidekiq + module Grouping + class Supervisor + def requeue_expired + Sidekiq::Grouping::Batch.all.each do |batch| + next unless batch.is_a?(Sidekiq::Grouping::ReliableBatch) + + batch.requeue_expired + end + end end end end diff --git a/lib/sidekiq/grouping/version.rb b/lib/sidekiq/grouping/version.rb index 8a0e141..c9beea6 100644 --- a/lib/sidekiq/grouping/version.rb +++ b/lib/sidekiq/grouping/version.rb @@ -1,5 +1,7 @@ +# frozen_string_literal: true + module Sidekiq module Grouping - VERSION = "1.1.0.ps2" + VERSION = "1.3.0.ps1" end end diff --git a/lib/sidekiq/grouping/web.rb b/lib/sidekiq/grouping/web.rb index 12804e6..007e6f0 100644 --- a/lib/sidekiq/grouping/web.rb +++ b/lib/sidekiq/grouping/web.rb @@ -1,24 +1,29 @@ -require 'sidekiq/web' +# frozen_string_literal: true + +require "sidekiq/web" module Sidekiq module Grouping module Web - VIEWS = File.expand_path('views', File.dirname(__FILE__)) + VIEWS = File.expand_path("views", File.dirname(__FILE__)) def self.registered(app) app.get "/grouping" do @batches = Sidekiq::Grouping::Batch.all - erb File.read(File.join(VIEWS, 'index.erb')), locals: {view_path: VIEWS} + erb File.read(File.join(VIEWS, "index.erb")), + locals: { view_path: VIEWS } end app.post "/grouping/:name/delete" do - worker_class, queue = Sidekiq::Grouping::Batch.extract_worker_klass_and_queue(params['name']) + worker_class, queue = + Sidekiq::Grouping::Batch.extract_worker_klass_and_queue( + params["name"] + ) batch = Sidekiq::Grouping::Batch.new(worker_class, queue) batch.delete redirect "#{root_path}grouping" end end - end end end diff --git a/sidekiq-grouping.gemspec b/sidekiq-grouping.gemspec index ccfb840..bf3444d 100644 --- a/sidekiq-grouping.gemspec +++ b/sidekiq-grouping.gemspec @@ -1,4 +1,6 @@ -lib = File.expand_path("../lib", __FILE__) +# frozen_string_literal: true + +lib = File.expand_path("lib", __dir__) $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) require "sidekiq/grouping/version" @@ -7,26 +9,33 @@ Gem::Specification.new do |spec| spec.version = Sidekiq::Grouping::VERSION spec.authors = ["Victor Sokolov"] spec.email = ["gzigzigzeo@gmail.com"] - spec.summary = %q( + spec.summary = <<~SUMMARY Allows identical sidekiq jobs to be processed with a single background call - ) + SUMMARY spec.homepage = "http://github.com/gzigzigzeo/sidekiq-grouping" spec.license = "MIT" spec.files = `git ls-files -z`.split("\x0") spec.executables = spec.files.grep(%r{^bin/}) { 
|f| File.basename(f) } - spec.test_files = spec.files.grep(%r{^(test|spec|features)/}) spec.require_paths = ["lib"] + spec.required_ruby_version = ">= 2.7.0" + + # rubocop:disable Gemspec/DevelopmentDependencies + spec.add_development_dependency "appraisal" spec.add_development_dependency "bundler", "> 1.5" + spec.add_development_dependency "pry" spec.add_development_dependency "rake" spec.add_development_dependency "rspec" - spec.add_development_dependency "simplecov" spec.add_development_dependency "rspec-sidekiq" + spec.add_development_dependency "rubocop" + spec.add_development_dependency "rubocop-rspec" + spec.add_development_dependency "simplecov" spec.add_development_dependency "timecop" - spec.add_development_dependency "appraisal" + # rubocop:enable Gemspec/DevelopmentDependencies spec.add_dependency "activesupport" - spec.add_dependency "sidekiq", ">= 3.4.2" spec.add_dependency "concurrent-ruby" + spec.add_dependency "sidekiq", ">= 3.4.2" + spec.metadata["rubygems_mfa_required"] = "true" end diff --git a/spec/modules/batch_spec.rb b/spec/modules/batch_spec.rb index 0e29a1e..e86da62 100644 --- a/spec/modules/batch_spec.rb +++ b/spec/modules/batch_spec.rb @@ -1,67 +1,80 @@ -require 'spec_helper' +# frozen_string_literal: true -describe Sidekiq::Grouping::Batch do - subject { Sidekiq::Grouping::Batch } +require "spec_helper" - context 'adding' do - it 'must enqueue unbatched worker' do - RegularWorker.perform_async('bar') - expect(RegularWorker).to have_enqueued_sidekiq_job("bar") +describe Sidekiq::Grouping::Batch do # rubocop:disable RSpec/SpecFilePathFormat + subject(:batch_service) { described_class } + + context "when adding" do + it "must enqueue unbatched worker" do + RegularWorker.perform_async("bar") + expect(RegularWorker).to have_enqueued_sidekiq_job("bar") end - it 'must not enqueue batched worker' do - BatchedSizeWorker.perform_async('bar') - expect_batch(BatchedSizeWorker, 'batched_size') + it "must not enqueue batched worker based on batch size setting" do + BatchedSizeWorker.perform_async("bar") + expect_batch(BatchedSizeWorker, "batched_size") end - it 'must not enqueue batched worker' do - BatchedIntervalWorker.perform_async('bar') - expect_batch(BatchedIntervalWorker, 'batched_interval') + it "must not enqueue batched worker based on interval setting" do + BatchedIntervalWorker.perform_async("bar") + expect_batch(BatchedIntervalWorker, "batched_interval") end - it 'must not enqueue batched worker' do - BatchedBothWorker.perform_async('bar') - expect_batch(BatchedBothWorker, 'batched_both') + it "must not enqueue batched worker based on both settings" do + BatchedBothWorker.perform_async("bar") + expect_batch(BatchedBothWorker, "batched_both") end - it 'must not enqueue batched worker' do - ReliableBatchedSizeWorker.perform_async('bar') - expect_batch(ReliableBatchedSizeWorker, 'reliable_batched_size') + it "must not enqueue batched worker based on reliable size setting" do + ReliableBatchedSizeWorker.perform_async("bar") + expect_batch(ReliableBatchedSizeWorker, "reliable_batched_size") end - it 'must not enqueue batched worker' do - ReliableBatchedUniqueSizeWorker.perform_async('bar') - expect_batch(ReliableBatchedUniqueSizeWorker, 'reliable_batched_unique_size') + it "must not enqueue batched worker based on reliable unique size setting" do + ReliableBatchedUniqueSizeWorker.perform_async("bar") + expect_batch( + ReliableBatchedUniqueSizeWorker, + "reliable_batched_unique_size" + ) end - context 'in bulk' do - it 'inserts in batches' do + context "when adding in
bulk" do + it "inserts in batches", :aggregate_failures do messages = (0..1005).map(&:to_s) - mock_redis = Sidekiq::Grouping::Redis.new + mock_redis = instance_double( + Sidekiq::Grouping::Redis, push_messages: [] + ) allow(Sidekiq::Grouping::Redis).to receive(:new).and_return(mock_redis) - expect(mock_redis).to receive(:push_messages).with(anything, messages[0..999].map(&:to_json), anything).and_call_original - expect(mock_redis).to receive(:push_messages).with(anything, messages[1000..1005].map(&:to_json), anything).and_call_original BatchedBulkInsertWorker.perform_async(messages) - batch = subject.new(BatchedBulkInsertWorker.name, 'batched_bulk_insert') - expect(batch.size).to eq(1006) + expect(mock_redis).to have_received(:push_messages).with( + anything, + messages[0..999].map(&:to_json), + remember_unique: anything + ) + expect(mock_redis).to have_received(:push_messages).with( + anything, + messages[1000..1005].map(&:to_json), + remember_unique: anything + ) end - it 'raises an exception if argument is not an array' do + it "raises an exception if argument is not an array" do failed = false begin - BatchedBulkInsertWorker.perform_async('potato') - rescue StandardError => e + BatchedBulkInsertWorker.perform_async("potato") + rescue StandardError failed = true end expect(failed).to be_truthy end - it 'raises an exception if argument is not a single array' do + it "raises an exception if argument is not a single array" do failed = false begin - BatchedBulkInsertWorker.perform_async(['potato'], ['tomato']) - rescue StandardError => e + BatchedBulkInsertWorker.perform_async(["potato"], ["tomato"]) + rescue StandardError failed = true end expect(failed).to be_truthy @@ -69,46 +82,46 @@ end end - context 'checking if should flush' do - it 'must flush if limit exceeds for limit worker' do - batch = subject.new(BatchedSizeWorker.name, 'batched_size') + context "when checking if should flush" do + it "must flush if limit exceeds for limit worker", :aggregate_failures do + batch = batch_service.new(BatchedSizeWorker.name, "batched_size") - expect(batch.could_flush?).to be_falsy - BatchedSizeWorker.perform_async('bar') - expect(batch.could_flush?).to be_falsy - 4.times { BatchedSizeWorker.perform_async('bar') } - expect(batch.could_flush?).to be_truthy + expect(batch).not_to be_could_flush + BatchedSizeWorker.perform_async("bar") + expect(batch).not_to be_could_flush + 4.times { BatchedSizeWorker.perform_async("bar") } + expect(batch).to be_could_flush end - it 'must flush if limit exceeds for both worker' do - batch = subject.new(BatchedBothWorker.name, 'batched_both') + it "must flush if limit exceeds for both worker", :aggregate_failures do + batch = batch_service.new(BatchedBothWorker.name, "batched_both") - expect(batch.could_flush?).to be_falsy - BatchedBothWorker.perform_async('bar') - expect(batch.could_flush?).to be_falsy - 4.times { BatchedBothWorker.perform_async('bar') } - expect(batch.could_flush?).to be_truthy + expect(batch).not_to be_could_flush + BatchedBothWorker.perform_async("bar") + expect(batch).not_to be_could_flush + 4.times { BatchedBothWorker.perform_async("bar") } + expect(batch).to be_could_flush end - it 'must flush if limit okay but time came' do - batch = subject.new(BatchedIntervalWorker.name, 'batched_interval') + it "must flush if limit okay but time came", :aggregate_failures do + batch = batch_service.new(BatchedIntervalWorker.name, "batched_interval") - expect(batch.could_flush?).to be_falsy - BatchedIntervalWorker.perform_async('bar') - 
expect(batch.could_flush?).to be_falsy + expect(batch).not_to be_could_flush + BatchedIntervalWorker.perform_async("bar") + expect(batch).not_to be_could_flush expect(batch.size).to eq(1) Timecop.travel(2.hours.since) - expect(batch.could_flush?).to be_truthy + expect(batch).to be_could_flush end end - context 'flushing' do - it 'must put worker to queue on flush' do - batch = subject.new(BatchedSizeWorker.name, 'batched_size') + context "when flushing" do + it "must put worker to queue on flush", :aggregate_failures do + batch = batch_service.new(BatchedSizeWorker.name, "batched_size") - expect(batch.could_flush?).to be_falsy + expect(batch).not_to be_could_flush 10.times { |n| BatchedSizeWorker.perform_async("bar#{n}") } batch.flush expect(BatchedSizeWorker).to( @@ -118,65 +131,93 @@ end end - context 'with similar args' do - context 'option batch_unique = true' do - it 'enqueues once' do - batch = subject.new(BatchedUniqueArgsWorker.name, 'batched_unique_args') - 3.times { BatchedUniqueArgsWorker.perform_async('bar', 1) } + context "with similar args" do + context "when option batch_unique = true" do + it "enqueues once" do + batch = batch_service.new( + BatchedUniqueArgsWorker.name, + "batched_unique_args" + ) + 3.times { BatchedUniqueArgsWorker.perform_async("bar", 1) } expect(batch.size).to eq(1) end - it 'enqueues once each unique set of args' do - batch = subject.new(BatchedUniqueArgsWorker.name, 'batched_unique_args') - 3.times { BatchedUniqueArgsWorker.perform_async('bar', 1) } - 6.times { BatchedUniqueArgsWorker.perform_async('baz', 1) } - 3.times { BatchedUniqueArgsWorker.perform_async('bar', 1) } - 2.times { BatchedUniqueArgsWorker.perform_async('baz', 3) } - 7.times { BatchedUniqueArgsWorker.perform_async('bar', 1) } + it "enqueues once each unique set of args" do + batch = batch_service.new( + BatchedUniqueArgsWorker.name, + "batched_unique_args" + ) + 3.times { BatchedUniqueArgsWorker.perform_async("bar", 1) } + 6.times { BatchedUniqueArgsWorker.perform_async("baz", 1) } + 3.times { BatchedUniqueArgsWorker.perform_async("bar", 1) } + 2.times { BatchedUniqueArgsWorker.perform_async("baz", 3) } + 7.times { BatchedUniqueArgsWorker.perform_async("bar", 1) } expect(batch.size).to eq(3) end - context 'flushing' do + it "flushes the workers" do + batch = batch_service.new( + BatchedUniqueArgsWorker.name, + "batched_unique_args" + ) + 2.times { BatchedUniqueArgsWorker.perform_async("bar", 1) } + 2.times { BatchedUniqueArgsWorker.perform_async("baz", 1) } + batch.flush + expect(batch.size).to eq(0) + end - it 'works' do - batch = subject.new(BatchedUniqueArgsWorker.name, 'batched_unique_args') - 2.times { BatchedUniqueArgsWorker.perform_async('bar', 1) } - 2.times { BatchedUniqueArgsWorker.perform_async('baz', 1) } - batch.flush - expect(batch.size).to eq(0) - end + it "allows to enqueue again after flush" do + batch = batch_service.new( + BatchedUniqueArgsWorker.name, + "batched_unique_args" + ) + 2.times { BatchedUniqueArgsWorker.perform_async("bar", 1) } + 2.times { BatchedUniqueArgsWorker.perform_async("baz", 1) } + batch.flush + BatchedUniqueArgsWorker.perform_async("bar", 1) + BatchedUniqueArgsWorker.perform_async("baz", 1) + expect(batch.size).to eq(2) + end + end - it 'allows to enqueue again after flush' do - batch = subject.new(BatchedUniqueArgsWorker.name, 'batched_unique_args') - 2.times { BatchedUniqueArgsWorker.perform_async('bar', 1) } - 2.times { BatchedUniqueArgsWorker.perform_async('baz', 1) } - batch.flush - BatchedUniqueArgsWorker.perform_async('bar', 1) - 
BatchedUniqueArgsWorker.perform_async('baz', 1) - expect(batch.size).to eq(2) - end + context "when batch_unique is not specified" do + it "enqueues all" do + batch = batch_service.new(BatchedSizeWorker.name, "batched_size") + 3.times { BatchedSizeWorker.perform_async("bar", 1) } + expect(batch.size).to eq(3) end + end + end + context "when inline mode" do + it "must pass args to worker as array" do + Sidekiq::Testing.inline! do + expect_any_instance_of(BatchedSizeWorker) + .to receive(:perform).with([[1]]) + + BatchedSizeWorker.perform_async(1) + end end - context 'batch_unique is not specified' do - it 'enqueues all' do - batch = subject.new(BatchedSizeWorker.name, 'batched_size') - 3.times { BatchedSizeWorker.perform_async('bar', 1) } - expect(batch.size).to eq(3) + it "must not pass args to worker as array" do + Sidekiq::Testing.inline! do + expect_any_instance_of(RegularWorker).to receive(:perform).with(1) + + RegularWorker.perform_async(1) end end end private - def expect_batch(klass, queue) - expect(klass).to_not have_enqueued_sidekiq_job("bar") - batch = subject.new(klass.name, queue) - stats = subject.all + + def expect_batch(klass, queue) # rubocop:disable Metrics/AbcSize + expect(klass).not_to have_enqueued_sidekiq_job("bar") + batch = batch_service.new(klass.name, queue) + stats = batch_service.all expect(batch.size).to eq(1) expect(stats.size).to eq(1) expect(stats.first.worker_class).to eq(klass.name) expect(stats.first.queue).to eq(queue) - expect(batch.pluck).to eq [['bar']] + expect(batch.pluck).to eq [["bar"]] end end diff --git a/spec/modules/redis_spec.rb b/spec/modules/redis_spec.rb index 858c3d5..5b1ce01 100644 --- a/spec/modules/redis_spec.rb +++ b/spec/modules/redis_spec.rb @@ -1,7 +1,11 @@ -require 'spec_helper' +# frozen_string_literal: true -describe Sidekiq::Grouping::Redis do - subject { Sidekiq::Grouping::Redis.new } +require "spec_helper" + +describe Sidekiq::Grouping::Redis do # rubocop:disable RSpec/SpecFilePathFormat + include Sidekiq::Grouping::RedisDispatcher + + subject(:redis_service) { described_class.new } let(:queue_name) { "my_queue" } let(:key) { "batching:#{queue_name}" } @@ -9,162 +13,218 @@ let(:pending_jobs) { "batching:#{queue_name}:pending_jobs" } describe "#push_msg" do - it "adds message to queue" do - subject.push_msg(queue_name, 'My message') - expect(redis { |c| c.llen key }).to eq 1 - expect(redis { |c| c.lrange key, 0, 1 }).to eq ['My message'] - expect(redis { |c| c.smembers unique_key}).to eq [] + it "adds message to queue", :aggregate_failures do + redis_service.push_msg(queue_name, "My message") + expect(redis_call(:llen, key)).to eq 1 + expect(redis_call(:lrange, key, 0, 1)).to eq ["My message"] + expect(redis_call(:smembers, unique_key)).to eq [] end it "remembers unique message if specified" do - subject.push_msg(queue_name, 'My message', true) - expect(redis { |c| c.smembers unique_key}).to eq ['My message'] + redis_service.push_msg(queue_name, "My message", remember_unique: true) + expect(redis_call(:smembers, unique_key)).to eq ["My message"] end end describe "#pluck" do it "removes messages from queue" do - subject.push_msg(queue_name, "Message 1") - subject.push_msg(queue_name, "Message 2") - subject.pluck(queue_name, 2) - expect(redis { |c| c.llen key }).to eq 0 + redis_service.push_msg(queue_name, "Message 1") + redis_service.push_msg(queue_name, "Message 2") + redis_service.pluck(queue_name, 2) + expect(redis_call(:llen, key)).to eq 0 end - it "forgets unique messages" do - subject.push_msg(queue_name, "Message 1", 
true) - subject.push_msg(queue_name, "Message 2", true) - expect(redis { |c| c.scard unique_key }).to eq 2 - subject.pluck(queue_name, 2) - expect(redis { |c| c.smembers unique_key }).to eq [] + it "forgets unique messages", :aggregate_failures do + redis_service.push_msg(queue_name, "Message 1", remember_unique: true) + redis_service.push_msg(queue_name, "Message 2", remember_unique: true) + expect(redis_call(:scard, unique_key)).to eq 2 + redis_service.pluck(queue_name, 2) + expect(redis_call(:smembers, unique_key)).to eq [] end end describe "#reliable_pluck" do it "removes messages from queue" do - subject.push_msg(queue_name, "Message 1") - subject.push_msg(queue_name, "Message 2") - subject.reliable_pluck(queue_name, 1000) - expect(redis { |c| c.llen key }).to eq 0 + redis_service.push_msg(queue_name, "Message 1") + redis_service.push_msg(queue_name, "Message 2") + redis_service.reliable_pluck(queue_name, 1000) + expect(redis_call(:llen, key)).to eq 0 end - it "forgets unique messages" do - subject.push_msg(queue_name, "Message 1", true) - subject.push_msg(queue_name, "Message 2", true) - expect(redis { |c| c.scard unique_key }).to eq 2 - subject.reliable_pluck(queue_name, 2) - expect(redis { |c| c.smembers unique_key }).to eq [] + it "forgets unique messages", :aggregate_failures do + redis_service.push_msg(queue_name, "Message 1", remember_unique: true) + redis_service.push_msg(queue_name, "Message 2", remember_unique: true) + expect(redis_call(:scard, unique_key)).to eq 2 + redis_service.reliable_pluck(queue_name, 2) + expect(redis_call(:smembers, unique_key)).to eq [] end - it "tracks the pending jobs" do - subject.push_msg(queue_name, "Message 1", true) - subject.push_msg(queue_name, "Message 2", true) - subject.reliable_pluck(queue_name, 2) - expect(redis { |c| c.zcount(pending_jobs, 0, Time.now.utc.to_i) }).to eq 1 - pending_queue_name = redis { |c| c.zscan(pending_jobs, 0)[1][0][0] } - expect(redis { |c| c.llen(pending_queue_name) }).to eq 2 + it "tracks the pending jobs", :aggregate_failures do + redis_service.push_msg(queue_name, "Message 1", remember_unique: true) + redis_service.push_msg(queue_name, "Message 2", remember_unique: true) + redis_service.reliable_pluck(queue_name, 2) + expect(redis_call(:zcount, pending_jobs, 0, Time.now.utc.to_i)).to eq 1 + pending_queue_name = redis_call(:zscan, pending_jobs, 0)[1][0] + if pending_queue_name.is_a?(Array) + pending_queue_name = pending_queue_name.first + end + expect(redis_call(:llen, pending_queue_name)).to eq 2 end - it "keeps extra items in the queue" do - subject.push_msg(queue_name, "Message 1", true) - subject.push_msg(queue_name, "Message 2", true) - subject.reliable_pluck(queue_name, 1) - expect(redis { |c| c.zcount(pending_jobs, 0, Time.now.utc.to_i) }).to eq 1 - pending_queue_name = redis { |c| c.zscan(pending_jobs, 0)[1][0][0] } - expect(redis { |c| c.llen(pending_queue_name) }).to eq 1 - expect(redis { |c| c.llen key }).to eq 1 + it "keeps extra items in the queue", :aggregate_failures do + redis_service.push_msg(queue_name, "Message 1", remember_unique: true) + redis_service.push_msg(queue_name, "Message 2", remember_unique: true) + redis_service.reliable_pluck(queue_name, 1) + expect(redis_call(:zcount, pending_jobs, 0, Time.now.utc.to_i)).to eq 1 + pending_queue_name = redis_call(:zscan, pending_jobs, 0)[1][0] + if pending_queue_name.is_a?(Array) + pending_queue_name = pending_queue_name.first + end + expect(redis_call(:llen, pending_queue_name)).to eq 1 + expect(redis_call(:llen, key)).to eq 1 end end describe 
"#remove_from_pending" do - it "removes pending jobs by name" do - subject.push_msg(queue_name, "Message 1", true) - subject.push_msg(queue_name, "Message 2", true) - pending_queue_name, _ = subject.reliable_pluck(queue_name, 2) - expect(redis { |c| c.lrange(pending_queue_name, 0, -1) }).to eq(['Message 1', 'Message 2']) - subject.remove_from_pending(queue_name, pending_queue_name) - expect(redis { |c| c.zcount(pending_jobs, 0, Time.now.utc.to_i) }).to eq 0 - expect(redis { |c| c.lrange(pending_queue_name, 0, -1) }).to eq([]) - expect(redis { |c| c.keys('*') }).not_to include(pending_queue_name) + it "removes pending jobs by name", :aggregate_failures do + redis_service.push_msg(queue_name, "Message 1", remember_unique: true) + redis_service.push_msg(queue_name, "Message 2", remember_unique: true) + pending_queue_name, = redis_service.reliable_pluck(queue_name, 2) + expect(redis_call(:lrange, pending_queue_name, 0, -1)).to eq( + ["Message 1", "Message 2"] + ) + redis_service.remove_from_pending(queue_name, pending_queue_name) + expect(redis_call(:zcount, pending_jobs, 0, Time.now.utc.to_i)).to eq 0 + expect(redis_call(:lrange, pending_queue_name, 0, -1)).to eq([]) + expect(redis_call(:keys, "*")).not_to include(pending_queue_name) end end describe "#requeue_expired" do - it "requeues expired jobs" do - subject.push_msg(queue_name, "Message 1", false) - subject.push_msg(queue_name, "Message 2", false) - pending_queue_name, _ = subject.reliable_pluck(queue_name, 2) - expect(subject.requeue_expired(queue_name, false, 500).size).to eq 0 - redis { |c| c.zincrby pending_jobs, -1000, pending_queue_name } - subject.push_msg(queue_name, "Message 2", false) - expect(subject.requeue_expired(queue_name, false, 500).size).to eq 1 - expect(redis { |c| c.llen key }).to eq 3 - expect(redis { |c| c.lrange(key, 0, -1) }).to match_array(["Message 1", "Message 2", "Message 2"]) + it "requeues expired jobs", :aggregate_failures do + redis_service.push_msg(queue_name, "Message 1") + redis_service.push_msg(queue_name, "Message 2") + pending_queue_name, = redis_service.reliable_pluck(queue_name, 2) + expect( + redis_service.requeue_expired(queue_name, unique: false, ttl: 500).size + ).to eq 0 + redis_call(:zincrby, pending_jobs, -1000, pending_queue_name) + redis_service.push_msg(queue_name, "Message 2", remember_unique: false) + expect( + redis_service.requeue_expired(queue_name, unique: false, ttl: 500).size + ).to eq 1 + expect(redis_call(:llen, key)).to eq 3 + expect(redis_call(:lrange, key, 0, -1)).to contain_exactly( + "Message 1", "Message 2", "Message 2" + ) end - it "removes pending job once enqueued" do - subject.push_msg(queue_name, "Message 1", true) - subject.push_msg(queue_name, "Message 2", true) - pending_queue_name, _ = subject.reliable_pluck(queue_name, 2) - expect(subject.requeue_expired(queue_name, false, 500).size).to eq 0 - redis { |c| c.zincrby pending_jobs, -1000, pending_queue_name } - expect(subject.requeue_expired(queue_name, false, 500).size).to eq 1 - expect(redis { |c| c.zcount(pending_jobs, 0, Time.now.utc.to_i) }).to eq 0 + it "removes pending job once enqueued", :aggregate_failures do + redis_service.push_msg(queue_name, "Message 1", remember_unique: true) + redis_service.push_msg(queue_name, "Message 2", remember_unique: true) + pending_queue_name, = redis_service.reliable_pluck(queue_name, 2) + expect( + redis_service.requeue_expired(queue_name, unique: false, ttl: 500).size + ).to eq 0 + redis_call(:zincrby, pending_jobs, -1000, pending_queue_name) + expect( + 
+      redis_call(:zincrby, pending_jobs, -1000, pending_queue_name)
+      redis_service.push_msg(queue_name, "Message 2", remember_unique: false)
+      expect(
+        redis_service.requeue_expired(queue_name, unique: false, ttl: 500).size
+      ).to eq 1
+      expect(redis_call(:llen, key)).to eq 3
+      expect(redis_call(:lrange, key, 0, -1)).to contain_exactly(
+        "Message 1", "Message 2", "Message 2"
+      )
     end

-    it "removes pending job once enqueued" do
-      subject.push_msg(queue_name, "Message 1", true)
-      subject.push_msg(queue_name, "Message 2", true)
-      pending_queue_name, _ = subject.reliable_pluck(queue_name, 2)
-      expect(subject.requeue_expired(queue_name, false, 500).size).to eq 0
-      redis { |c| c.zincrby pending_jobs, -1000, pending_queue_name }
-      expect(subject.requeue_expired(queue_name, false, 500).size).to eq 1
-      expect(redis { |c| c.zcount(pending_jobs, 0, Time.now.utc.to_i) }).to eq 0
+    it "removes pending job once enqueued", :aggregate_failures do
+      redis_service.push_msg(queue_name, "Message 1", remember_unique: true)
+      redis_service.push_msg(queue_name, "Message 2", remember_unique: true)
+      pending_queue_name, = redis_service.reliable_pluck(queue_name, 2)
+      expect(
+        redis_service.requeue_expired(queue_name, unique: false, ttl: 500).size
+      ).to eq 0
+      redis_call(:zincrby, pending_jobs, -1000, pending_queue_name)
+      expect(
+        redis_service.requeue_expired(queue_name, unique: false, ttl: 500).size
+      ).to eq 1
+      expect(redis_call(:zcount, pending_jobs, 0, Time.now.utc.to_i)).to eq 0
     end

-    context "with batch_unique == true" do
+    context "with batch_unique == true", :aggregate_failures do
       it "requeues expired jobs that are not already present" do
-        subject.push_msg(queue_name, "Message 1", true)
-        subject.push_msg(queue_name, "Message 2", true)
-        subject.push_msg(queue_name, "Message 3", true)
-        pending_queue_name, _ = subject.reliable_pluck(queue_name, 3)
-        expect(subject.requeue_expired(queue_name, true, 500).size).to eq 0
-        redis { |c| c.zincrby pending_jobs, -1000, pending_queue_name }
-        subject.push_msg(queue_name, "Message 2", true)
-        expect(subject.requeue_expired(queue_name, true, 500).size).to eq 1
-        expect(redis { |c| c.llen key }).to eq 3
-        expect(redis { |c| c.lrange(key, 0, -1) }).to match_array(["Message 1", "Message 2", "Message 3"])
+        redis_service.push_msg(queue_name, "Message 1", remember_unique: true)
+        redis_service.push_msg(queue_name, "Message 2", remember_unique: true)
+        redis_service.push_msg(queue_name, "Message 3", remember_unique: true)
+        pending_queue_name, = redis_service.reliable_pluck(
+          queue_name,
+          3
+        )
+        expect(
+          redis_service.requeue_expired(queue_name, unique: true, ttl: 500).size
+        ).to eq 0
+        redis_call(:zincrby, pending_jobs, -1000, pending_queue_name)
+        redis_service.push_msg(queue_name, "Message 2", remember_unique: true)
+        expect(
+          redis_service.requeue_expired(queue_name, unique: true, ttl: 500).size
+        ).to eq 1
+        expect(redis_call(:llen, key)).to eq 3
+        expect(redis_call(:lrange, key, 0, -1)).to contain_exactly(
+          "Message 1", "Message 2", "Message 3"
+        )
       end

-      it "removes pending job once enqueued" do
-        subject.push_msg(queue_name, "Message 1", true)
-        subject.push_msg(queue_name, "Message 2", true)
-        pending_queue_name, _ = subject.reliable_pluck(queue_name, 2)
-        expect(subject.requeue_expired(queue_name, true, 500).size).to eq 0
-        redis { |c| c.zincrby pending_jobs, -1000, pending_queue_name }
-        subject.push_msg(queue_name, "Message 1", true)
-        expect(subject.requeue_expired(queue_name, true, 500).size).to eq 1
-        expect(redis { |c| c.zcount(pending_jobs, 0, Time.now.utc.to_i) }).to eq 0
+      it "removes pending job once enqueued", :aggregate_failures do
+        redis_service.push_msg(queue_name, "Message 1", remember_unique: true)
+        redis_service.push_msg(queue_name, "Message 2", remember_unique: true)
+        pending_queue_name, = redis_service.reliable_pluck(
+          queue_name,
+          2
+        )
+        expect(
+          redis_service.requeue_expired(queue_name, unique: true, ttl: 500).size
+        ).to eq 0
+        redis_call(:zincrby, pending_jobs, -1000, pending_queue_name)
+        redis_service.push_msg(queue_name, "Message 1", remember_unique: true)
+        expect(
+          redis_service.requeue_expired(queue_name, unique: true, ttl: 500).size
+        ).to eq 1
+        expect(
+          redis_call(:zcount, pending_jobs, 0, Time.now.utc.to_i)
+        ).to eq 0
       end
     end
   end

-  describe '#push_messages' do
-    it "adds messages to queue" do
-      subject.push_messages(queue_name, ['My message', 'My other message', 'My last message'])
-      expect(redis { |c| c.llen key }).to eq 3
-      expect(redis { |c| c.lrange key, 0, 3 }).to eq ['My message', 'My other message', 'My last message']
-      expect(redis { |c| c.smembers unique_key}).to eq []
+  describe "#push_messages" do
+    it "adds messages to queue", :aggregate_failures do
+      redis_service.push_messages(
+        queue_name,
+        ["My message", "My other message", "My last message"]
+      )
+      expect(redis_call(:llen, key)).to eq 3
+      expect(redis_call(:lrange, key, 0, 3)).to eq(
+        ["My message", "My other message", "My last message"]
+      )
+      expect(redis_call(:smembers, unique_key)).to eq []
     end

-    it "remembers unique messages if specified" do
-      subject.push_messages(queue_name, ['My message', 'My other message', 'My last message'], true)
-      expect(redis { |c| c.lrange key, 0, 3 }).to eq ['My message', 'My other message', 'My last message']
-      expect(redis { |c| c.smembers unique_key}).to match_array ['My message', 'My other message', 'My last message']
+    it "remembers unique messages if specified", :aggregate_failures do
+      redis_service.push_messages(
+        queue_name,
+        ["My message", "My other message", "My last message"],
+        remember_unique: true
+      )
+      expect(redis_call(:lrange, key, 0, 3)).to eq(
+        ["My message", "My other message", "My last message"]
+      )
+      expect(redis_call(:smembers, unique_key)).to contain_exactly(
+        "My message", "My other message", "My last message"
+      )
     end

-    it "adds new messages in order" do
-      subject.push_messages(queue_name, ['My message'], true)
-      expect(redis { |c| c.smembers unique_key}).to match_array ['My message']
-      subject.push_messages(queue_name, ['My other message', 'My message', 'My last message'], true)
-      expect(redis { |c| c.lrange key, 0, 3 }).to eq ['My message', 'My other message', 'My last message']
-      expect(redis { |c| c.smembers unique_key}).to match_array ['My message', 'My other message', 'My last message']
+    it "adds new messages in order", :aggregate_failures do
+      redis_service.push_messages(
+        queue_name,
+        ["My message"],
+        remember_unique: true
+      )
+      expect(redis_call(:smembers, unique_key)).to contain_exactly(
+        "My message"
+      )
+      redis_service.push_messages(
+        queue_name,
+        ["My other message", "My message", "My last message"],
+        remember_unique: true
+      )
+      expect(redis_call(:lrange, key, 0, 3)).to eq(
+        ["My message", "My other message", "My last message"]
+      )
+      expect(redis_call(:smembers, unique_key)).to contain_exactly(
+        "My message", "My other message", "My last message"
+      )
     end
   end
-
-  private
-
-  def redis(&block)
-    Sidekiq.redis(&block)
-  end
 end
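The rewritten examples above lean on `redis_service` and `redis_call` helpers whose definitions sit outside this hunk. As a rough sketch of what the assumed `redis_call` helper might look like (the name exists in the specs, but this body is an assumption, not part of the patch), it needs to bridge the two client APIs the suite supports:

```ruby
# Sketch of the redis_call spec helper assumed by the examples above:
# redis-client (bundled with Sidekiq 7+) only exposes a generic `call`
# interface, while redis-rb (Sidekiq < 7) responds to per-command methods.
def redis_call(command, *args)
  Sidekiq.redis do |conn|
    if Sidekiq::VERSION[0].to_i >= 7
      conn.call(command.to_s.upcase, *args)
    else
      conn.public_send(command, *args)
    end
  end
end
```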
diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb
index d5239fd..21339bc 100644
--- a/spec/spec_helper.rb
+++ b/spec/spec_helper.rb
@@ -1,3 +1,5 @@
+# frozen_string_literal: true
+
 $LOAD_PATH << "." unless $LOAD_PATH.include?(".")

 require "rubygems"
@@ -7,6 +9,7 @@
 require "sidekiq"
 require "rspec-sidekiq"
 require "support/test_workers"
+require "pry"

 SimpleCov.start do
   add_filter "spec"
@@ -15,8 +18,10 @@
 require "sidekiq/grouping"

 Sidekiq::Grouping.logger = nil
-Sidekiq.redis = { namespace: ENV["namespace"] }
-Sidekiq.logger = nil
+Sidekiq.configure_client do |config|
+  config.redis = { db: 1 }
+  config.logger = nil
+end

 RSpec::Sidekiq.configure do |config|
   config.clear_all_enqueued_jobs = true
@@ -28,14 +33,19 @@
   config.run_all_when_everything_filtered = true
   config.filter_run :focus

-  config.before :each do
+  config.before do
     Sidekiq.redis do |conn|
-      keys = conn.keys "*batching*"
-      keys.each { |key| conn.del key }
+      if Sidekiq::VERSION[0].to_i >= 7
+        keys = conn.call("KEYS", "*batching*")
+        keys.each { |key| conn.call("DEL", key) }
+      else
+        keys = conn.keys "*batching*"
+        keys.each { |key| conn.del key }
+      end
     end
   end

-  config.after :each do
+  config.after do
     Timecop.return
   end
 end
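Two notes on the spec_helper changes: the namespace-based Redis isolation is dropped because Sidekiq 7 removed redis-namespace support, so the suite now isolates test data by pointing the client at Redis database 1 instead; and the cleanup hook branches on the Sidekiq version because redis-client connections only speak the generic `call` interface. If that branch spreads further, it could be folded into one helper, along these lines (illustrative only, not part of this patch):

```ruby
# Illustrative cleanup helper hiding the redis-rb / redis-client difference.
def clear_batching_keys
  Sidekiq.redis do |conn|
    if Sidekiq::VERSION[0].to_i >= 7
      conn.call("KEYS", "*batching*").each { |key| conn.call("DEL", key) }
    else
      conn.keys("*batching*").each { |key| conn.del(key) }
    end
  end
end
```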
diff --git a/spec/support/test_workers.rb b/spec/support/test_workers.rb
index 1665dbb..aa8f575 100644
--- a/spec/support/test_workers.rb
+++ b/spec/support/test_workers.rb
@@ -1,8 +1,9 @@
+# frozen_string_literal: true
+
 class RegularWorker
   include Sidekiq::Worker

-  def perform(foo)
-  end
+  def perform(foo); end
 end

 class BatchedSizeWorker
@@ -10,8 +11,7 @@

   sidekiq_options queue: :batched_size, batch_flush_size: 3, batch_size: 2

-  def perform(foo)
-  end
+  def perform(foo); end
 end

 class BatchedIntervalWorker
@@ -19,8 +19,7 @@

   sidekiq_options queue: :batched_interval, batch_flush_interval: 3600

-  def perform(foo)
-  end
+  def perform(foo); end
 end

 class BatchedBothWorker
@@ -30,8 +29,7 @@
     queue: :batched_both, batch_flush_interval: 3600, batch_flush_size: 3
   )

-  def perform(foo)
-  end
+  def perform(foo); end
 end

 class BatchedUniqueArgsWorker
@@ -41,39 +39,46 @@
     queue: :batched_unique_args, batch_flush_size: 3, batch_unique: true
   )

-  def perform(foo)
-  end
+  def perform(foo); end
 end

 class ReliableBatchedSizeWorker
   include Sidekiq::Worker

   sidekiq_options(
-    queue: :reliable_batched_size, batch_flush_size: 3, batch_size: 2, batch_ttl: 10
+    queue: :reliable_batched_size,
+    batch_flush_size: 3,
+    batch_size: 2,
+    batch_ttl: 10
   )

-  def perform(foo)
-  end
+  def perform(foo); end
 end

 class ReliableBatchedUniqueSizeWorker
   include Sidekiq::Worker

   sidekiq_options(
-    queue: :reliable_batched_unique_size, batch_flush_size: 3, batch_size: 2, batch_ttl: 10, batch_unique: true
+    queue: :reliable_batched_unique_size,
+    batch_flush_size: 3,
+    batch_size: 2,
+    batch_ttl: 10,
+    batch_unique: true
   )

-  def perform(foo)
-  end
+  def perform(foo); end
 end

 class BatchedBulkInsertWorker
   include Sidekiq::Worker

   sidekiq_options(
-    queue: :batched_bulk_insert, batch_flush_size: 3, batch_size: 2, batch_ttl: 10, batch_merge_array: true
+    queue: :batched_bulk_insert,
+    batch_flush_size: 3,
+    batch_size: 2,
+    batch_ttl: 10,
+    batch_merge_array: true
   )

-  def perform(foo)
-  end
-end
\ No newline at end of file
+  def perform(foo); end
+end
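For context on how these test workers are exercised (a usage sketch, not part of the patch): they are enqueued like any Sidekiq worker, and the grouping middleware accumulates the pushed arguments according to the `batch_*` options before `perform` runs with grouped argument lists.

```ruby
# Hypothetical usage: with batch_flush_size: 3, these pushes are collected
# by sidekiq-grouping and flushed as grouped argument lists rather than
# being performed as three separate jobs.
3.times { |i| BatchedSizeWorker.perform_async("foo-#{i}") }
```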