diff --git a/CHANGES.md b/CHANGES.md index ff9ba29c..dbe9ff35 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Correctly unwrap `ActiveJob` arguments: + [#184](https://github.com/ixti/sidekiq-throttled/pull/184), + [#185](https://github.com/ixti/sidekiq-throttled/pull/185). + ## [1.3.0] - 2024-01-18 diff --git a/lib/sidekiq/throttled.rb b/lib/sidekiq/throttled.rb index 222c15ff..51cc6fd0 100644 --- a/lib/sidekiq/throttled.rb +++ b/lib/sidekiq/throttled.rb @@ -5,6 +5,7 @@ require_relative "./throttled/config" require_relative "./throttled/cooldown" require_relative "./throttled/job" +require_relative "./throttled/message" require_relative "./throttled/middlewares/server" require_relative "./throttled/patches/basic_fetch" require_relative "./throttled/patches/super_fetch" @@ -75,15 +76,11 @@ def configure # @param [String] message Job's JSON payload # @return [Boolean] def throttled?(message) - message = Sidekiq.load_json(message) - job = message.fetch("wrapped") { message["class"] } - args = message.key?("wrapped") ? message.dig("args", 0, "arguments") : message["args"] - jid = message["jid"] + message = Message.new(message) + return false unless message.job_class && message.job_id - return false unless job && jid - - Registry.get(job) do |strategy| - return strategy.throttled?(jid, *args) + Registry.get(message.job_class) do |strategy| + return strategy.throttled?(message.job_id, *message.job_args) end false diff --git a/lib/sidekiq/throttled/message.rb b/lib/sidekiq/throttled/message.rb new file mode 100644 index 00000000..c5e77344 --- /dev/null +++ b/lib/sidekiq/throttled/message.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +module Sidekiq + module Throttled + class Message + def initialize(item) + @item = item.is_a?(Hash) ? item : parse(item) + end + + def job_class + @item.fetch("wrapped") { @item["class"] } + end + + def job_args + @item.key?("wrapped") ? @item.dig("args", 0, "arguments") : @item["args"] + end + + def job_id + @item["jid"] + end + + private + + def parse(item) + item = Sidekiq.load_json(item) + item.is_a?(Hash) ? item : {} + rescue JSON::ParserError + {} + end + end + end +end diff --git a/lib/sidekiq/throttled/middlewares/server.rb b/lib/sidekiq/throttled/middlewares/server.rb index d45f10b1..e9bba5a0 100644 --- a/lib/sidekiq/throttled/middlewares/server.rb +++ b/lib/sidekiq/throttled/middlewares/server.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true # internal +require_relative "../message" require_relative "../registry" module Sidekiq @@ -13,13 +14,11 @@ class Server def call(_worker, msg, _queue) yield ensure - job = msg.fetch("wrapped") { msg["class"] } - args = msg.key?("wrapped") ? msg.dig("args", 0, "arguments") : msg["args"] - jid = msg["jid"] + message = Message.new(msg) - if job && jid - Registry.get job do |strategy| - strategy.finalize!(jid, *args) + if message.job_class && message.job_id + Registry.get(message.job_class) do |strategy| + strategy.finalize!(message.job_id, *message.job_args) end end end diff --git a/rubocop/rspec.yml b/rubocop/rspec.yml index e0155c94..7821bda8 100644 --- a/rubocop/rspec.yml +++ b/rubocop/rspec.yml @@ -1,3 +1,7 @@ +RSpec/BeNil: + Enabled: true + EnforcedStyle: be + RSpec/ExampleLength: Enabled: true Max: 10 @@ -9,6 +13,5 @@ RSpec/MultipleExpectations: RSpec/NamedSubject: Enabled: false -RSpec/BeNil: - Enabled: true - EnforcedStyle: be +RSpec/Rails: + Enabled: false diff --git a/spec/lib/sidekiq/throttled/message_spec.rb b/spec/lib/sidekiq/throttled/message_spec.rb new file mode 100644 index 00000000..2f7c849f --- /dev/null +++ b/spec/lib/sidekiq/throttled/message_spec.rb @@ -0,0 +1,183 @@ +# frozen_string_literal: true + +RSpec.describe Sidekiq::Throttled::Message do + subject(:message) do + described_class.new(item) + end + + let(:item) do + { + "class" => "ExcitingJob", + "args" => [42], + "jid" => "deadbeef" + } + end + + describe "#job_class" do + subject { message.job_class } + + it { is_expected.to eq("ExcitingJob") } + + context "with serialized payload" do + let(:item) do + JSON.dump({ + "class" => "ExcitingJob", + "args" => [42], + "jid" => "deadbeef" + }) + end + + it { is_expected.to eq("ExcitingJob") } + end + + context "with ActiveJob payload" do + let(:item) do + { + "class" => "ActiveJob", + "wrapped" => "ExcitingJob", + "args" => [{ "arguments" => [42] }], + "jid" => "deadbeef" + } + end + + it { is_expected.to eq("ExcitingJob") } + end + + context "with serialized ActiveJob payload" do + let(:item) do + JSON.dump({ + "class" => "ActiveJob", + "wrapped" => "ExcitingJob", + "args" => [{ "arguments" => [42] }], + "jid" => "deadbeef" + }) + end + + it { is_expected.to eq("ExcitingJob") } + end + + context "with invalid payload" do + let(:item) { "invalid" } + + it { is_expected.to be nil } + end + + context "with invalid serialized payload" do + let(:item) { JSON.dump("invalid") } + + it { is_expected.to be nil } + end + end + + describe "#job_args" do + subject { message.job_args } + + it { is_expected.to eq([42]) } + + context "with serialized payload" do + let(:item) do + JSON.dump({ + "class" => "ExcitingJob", + "args" => [42], + "jid" => "deadbeef" + }) + end + + it { is_expected.to eq([42]) } + end + + context "with ActiveJob payload" do + let(:item) do + { + "class" => "ActiveJob", + "wrapped" => "ExcitingJob", + "args" => [{ "arguments" => [42] }], + "jid" => "deadbeef" + } + end + + it { is_expected.to eq([42]) } + end + + context "with serialized ActiveJob payload" do + let(:item) do + JSON.dump({ + "class" => "ActiveJob", + "wrapped" => "ExcitingJob", + "args" => [{ "arguments" => [42] }], + "jid" => "deadbeef" + }) + end + + it { is_expected.to eq([42]) } + end + + context "with invalid payload" do + let(:item) { "invalid" } + + it { is_expected.to be nil } + end + + context "with invalid serialized payload" do + let(:item) { JSON.dump("invalid") } + + it { is_expected.to be nil } + end + end + + describe "#job_id" do + subject { message.job_id } + + it { is_expected.to eq("deadbeef") } + + context "with serialized payload" do + let(:item) do + JSON.dump({ + "class" => "ExcitingJob", + "args" => [42], + "jid" => "deadbeef" + }) + end + + it { is_expected.to eq("deadbeef") } + end + + context "with ActiveJob payload" do + let(:item) do + { + "class" => "ActiveJob", + "wrapped" => "ExcitingJob", + "args" => [{ "arguments" => [42] }], + "jid" => "deadbeef" + } + end + + it { is_expected.to eq("deadbeef") } + end + + context "with serialized ActiveJob payload" do + let(:item) do + JSON.dump({ + "class" => "ActiveJob", + "wrapped" => "ExcitingJob", + "args" => [{ "arguments" => [42] }], + "jid" => "deadbeef" + }) + end + + it { is_expected.to eq("deadbeef") } + end + + context "with invalid payload" do + let(:item) { "invalid" } + + it { is_expected.to be nil } + end + + context "with invalid serialized payload" do + let(:item) { JSON.dump("invalid") } + + it { is_expected.to be nil } + end + end +end diff --git a/spec/lib/sidekiq/throttled/middlewares/server_spec.rb b/spec/lib/sidekiq/throttled/middlewares/server_spec.rb index 460fa991..23ff91b4 100644 --- a/spec/lib/sidekiq/throttled/middlewares/server_spec.rb +++ b/spec/lib/sidekiq/throttled/middlewares/server_spec.rb @@ -69,7 +69,6 @@ middleware.call(double, payload_args, double) { |*| :foobar } end - it "returns yields control to the given block" do expect { |b| middleware.call(double, payload, double, &b) } .to yield_control @@ -114,5 +113,31 @@ .to be :foobar end end + + context "when message contains no job class" do + before do + allow(Sidekiq::Throttled::Registry).to receive(:get).and_call_original + payload.delete("class") + end + + it "does not attempt to retrieve any strategy" do + expect { |b| middleware.call(double, payload, double, &b) }.to yield_control + + expect(Sidekiq::Throttled::Registry).not_to receive(:get) + end + end + + context "when message contains no jid" do + before do + allow(Sidekiq::Throttled::Registry).to receive(:get).and_call_original + payload.delete("jid") + end + + it "does not attempt to retrieve any strategy" do + expect { |b| middleware.call(double, payload, double, &b) }.to yield_control + + expect(Sidekiq::Throttled::Registry).not_to receive(:get) + end + end end end diff --git a/spec/lib/sidekiq/throttled_spec.rb b/spec/lib/sidekiq/throttled_spec.rb index b06ee326..a3413514 100644 --- a/spec/lib/sidekiq/throttled_spec.rb +++ b/spec/lib/sidekiq/throttled_spec.rb @@ -54,8 +54,8 @@ it "passes JID and arguments to registered strategy" do strategy = Sidekiq::Throttled::Registry.add("foo", - threshold: { limit: 1, period: 1 }, - concurrency: { limit: 1 }) + threshold: { limit: 1, period: 1 }, + concurrency: { limit: 1 }) payload_jid = jid args = ["foo", 1] @@ -102,8 +102,8 @@ it "unwraps ActiveJob-jobs job parameters" do strategy = Sidekiq::Throttled::Registry.add("wrapped-foo", - threshold: { limit: 1, period: 1 }, - concurrency: { limit: 1 }) + threshold: { limit: 1, period: 1 }, + concurrency: { limit: 1 }) payload_jid = jid args = ["foo", 1]