diff --git a/lib/sidekiq/throttled.rb b/lib/sidekiq/throttled.rb index 38915cae..222c15ff 100644 --- a/lib/sidekiq/throttled.rb +++ b/lib/sidekiq/throttled.rb @@ -77,12 +77,13 @@ def configure def throttled?(message) message = Sidekiq.load_json(message) job = message.fetch("wrapped") { message["class"] } + args = message.key?("wrapped") ? message.dig("args", 0, "arguments") : message["args"] jid = message["jid"] return false unless job && jid Registry.get(job) do |strategy| - return strategy.throttled?(jid, *message["args"]) + return strategy.throttled?(jid, *args) end false diff --git a/lib/sidekiq/throttled/middlewares/server.rb b/lib/sidekiq/throttled/middlewares/server.rb index 7ded3172..d45f10b1 100644 --- a/lib/sidekiq/throttled/middlewares/server.rb +++ b/lib/sidekiq/throttled/middlewares/server.rb @@ -13,12 +13,13 @@ class Server def call(_worker, msg, _queue) yield ensure - job = msg.fetch("wrapped") { msg["class"] } - jid = msg["jid"] + job = msg.fetch("wrapped") { msg["class"] } + args = msg.key?("wrapped") ? msg.dig("args", 0, "arguments") : msg["args"] + jid = msg["jid"] if job && jid Registry.get job do |strategy| - strategy.finalize!(jid, *msg["args"]) + strategy.finalize!(jid, *args) end end end diff --git a/spec/lib/sidekiq/throttled/middlewares/server_spec.rb b/spec/lib/sidekiq/throttled/middlewares/server_spec.rb index cc6170fc..460fa991 100644 --- a/spec/lib/sidekiq/throttled/middlewares/server_spec.rb +++ b/spec/lib/sidekiq/throttled/middlewares/server_spec.rb @@ -6,7 +6,9 @@ subject(:middleware) { described_class.new } describe "#call" do + let(:args) { ["bar", 1] } let(:payload) { { "class" => "foo", "jid" => "bar" } } + let(:payload_args) { { "class" => "foo", "jid" => "bar", "args" => args } } context "when job class has strategy with concurrency constraint" do let! :strategy do @@ -19,6 +21,11 @@ middleware.call(double, payload, double) { |*| :foobar } end + it "calls #finalize! of queue with jid and args of job being processed" do + expect(strategy).to receive(:finalize!).with "bar", *args + middleware.call(double, payload_args, double) { |*| :foobar } + end + it "returns yields control to the given block" do expect { |b| middleware.call(double, payload, double, &b) } .to yield_control @@ -38,6 +45,14 @@ "jid" => "bar" } end + let(:payload_args) do + { + "class" => "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper", + "wrapped" => "wrapped-foo", + "args" => [{ "job_class" => "foo", "arguments" => args }], + "jid" => "bar" + } + end let! :strategy do Sidekiq::Throttled::Registry.add payload["wrapped"], @@ -49,6 +64,12 @@ middleware.call(double, payload, double) { |*| :foobar } end + it "calls #finalize! of queue with jid and args of job being processed" do + expect(strategy).to receive(:finalize!).with "bar", *args + middleware.call(double, payload_args, double) { |*| :foobar } + end + + it "returns yields control to the given block" do expect { |b| middleware.call(double, payload, double, &b) } .to yield_control diff --git a/spec/lib/sidekiq/throttled_spec.rb b/spec/lib/sidekiq/throttled_spec.rb index 7d0341d9..b06ee326 100644 --- a/spec/lib/sidekiq/throttled_spec.rb +++ b/spec/lib/sidekiq/throttled_spec.rb @@ -52,6 +52,20 @@ described_class.throttled? message end + it "passes JID and arguments to registered strategy" do + strategy = Sidekiq::Throttled::Registry.add("foo", + threshold: { limit: 1, period: 1 }, + concurrency: { limit: 1 }) + + payload_jid = jid + args = ["foo", 1] + message = %({"class":"foo","jid":#{payload_jid.inspect},"args":#{args.inspect}}) + + expect(strategy).to receive(:throttled?).with payload_jid, *args + + described_class.throttled? message + end + it "unwraps ActiveJob-jobs default sidekiq adapter" do strategy = Sidekiq::Throttled::Registry.add("wrapped-foo", threshold: { limit: 1, period: 1 }, @@ -85,5 +99,24 @@ described_class.throttled? message end + + it "unwraps ActiveJob-jobs job parameters" do + strategy = Sidekiq::Throttled::Registry.add("wrapped-foo", + threshold: { limit: 1, period: 1 }, + concurrency: { limit: 1 }) + + payload_jid = jid + args = ["foo", 1] + message = JSON.dump({ + "class" => "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper", + "wrapped" => "wrapped-foo", + "args" => [{ "job_class" => "TestJob", "arguments" => args }], + "jid" => payload_jid + }) + + expect(strategy).to receive(:throttled?).with payload_jid, *args + + described_class.throttled? message + end end end