Skip to content

Commit

Permalink
Unwrap ActiveJob arguments (ixti#184)
Browse files Browse the repository at this point in the history
When using the `concurrency` strategy with dynamic suffix, I'm getting
this error from `sidekiq-throttle`:

`ArgumentError: wrong number of arguments (given 1, expected 2+)`

This happens because the `key_suffix` I'm using is not receiving the
proper arguments. Consider this dummy job:

```ruby
class TestJob < ActiveJob::Base
  include Sidekiq::Throttled::Job

  queue_as :default

  sidekiq_throttle(
    concurrency: {
      limit: 1,
      key_suffix: ->(job_class, index, *) { "job_class:#{job_class}/index:#{index}" },
      ttl: 1.hour.to_i
    }
  )

  def perform(job_class, index)
    puts "Job Performed. job_class: #{job_class}, index: #{index}"
  end
end
```

This follows the documentation, which states that `key_suffix` must
accept the same parameters as the job. However, `sidekiq-throttle` fails
to unwrap the `ActiveJob` arguments and instead it just calls the
`:throttled?` and `:finalize!` methods passing them the arguments in
`msg["args"]`, which in `ActiveJob` is an array of JSON objects, like
this:

```json
{
  "args": [
    {
      "job_class": "TestJob",
      "job_id": "96f6c127-1e0c-4d8c-bdaa-c934b5aaded7",
      "provider_job_id": null,
      "queue_name": "default",
      "priority": null,
      "arguments": ["TestJob", 9],
      "executions": 0,
      "exception_executions": {},
      "locale": "en",
      "timezone": null,
      "enqueued_at": "2024-03-15T11:31:11.464949297Z",
      "scheduled_at": null
    }
  ]
}
```

Some other methods in the `Concurrency` class like `:count` or `:reset!`
also call the `:key` method, so they are vulnerable to this error too.
However, I haven't found any place in the code where these methods are
called with any parameter.

I wrote a very simple workaround that seems to work fine for my use
case, and the test suite passes.
  • Loading branch information
jlledom authored Mar 16, 2024
1 parent 1b71da2 commit 3c1c543
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 4 deletions.
3 changes: 2 additions & 1 deletion lib/sidekiq/throttled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ def configure
def throttled?(message)
message = Sidekiq.load_json(message)
job = message.fetch("wrapped") { message["class"] }
args = message.key?("wrapped") ? message.dig("args", 0, "arguments") : message["args"]
jid = message["jid"]

return false unless job && jid

Registry.get(job) do |strategy|
return strategy.throttled?(jid, *message["args"])
return strategy.throttled?(jid, *args)
end

false
Expand Down
7 changes: 4 additions & 3 deletions lib/sidekiq/throttled/middlewares/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ class Server
def call(_worker, msg, _queue)
yield
ensure
job = msg.fetch("wrapped") { msg["class"] }
jid = msg["jid"]
job = msg.fetch("wrapped") { msg["class"] }
args = msg.key?("wrapped") ? msg.dig("args", 0, "arguments") : msg["args"]
jid = msg["jid"]

if job && jid
Registry.get job do |strategy|
strategy.finalize!(jid, *msg["args"])
strategy.finalize!(jid, *args)
end
end
end
Expand Down
21 changes: 21 additions & 0 deletions spec/lib/sidekiq/throttled/middlewares/server_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
subject(:middleware) { described_class.new }

describe "#call" do
let(:args) { ["bar", 1] }
let(:payload) { { "class" => "foo", "jid" => "bar" } }
let(:payload_args) { { "class" => "foo", "jid" => "bar", "args" => args } }

context "when job class has strategy with concurrency constraint" do
let! :strategy do
Expand All @@ -19,6 +21,11 @@
middleware.call(double, payload, double) { |*| :foobar }
end

it "calls #finalize! of queue with jid and args of job being processed" do
expect(strategy).to receive(:finalize!).with "bar", *args
middleware.call(double, payload_args, double) { |*| :foobar }
end

it "returns yields control to the given block" do
expect { |b| middleware.call(double, payload, double, &b) }
.to yield_control
Expand All @@ -38,6 +45,14 @@
"jid" => "bar"
}
end
let(:payload_args) do
{
"class" => "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",
"wrapped" => "wrapped-foo",
"args" => [{ "job_class" => "foo", "arguments" => args }],
"jid" => "bar"
}
end

let! :strategy do
Sidekiq::Throttled::Registry.add payload["wrapped"],
Expand All @@ -49,6 +64,12 @@
middleware.call(double, payload, double) { |*| :foobar }
end

it "calls #finalize! of queue with jid and args of job being processed" do
expect(strategy).to receive(:finalize!).with "bar", *args
middleware.call(double, payload_args, double) { |*| :foobar }
end


it "returns yields control to the given block" do
expect { |b| middleware.call(double, payload, double, &b) }
.to yield_control
Expand Down
33 changes: 33 additions & 0 deletions spec/lib/sidekiq/throttled_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@
described_class.throttled? message
end

it "passes JID and arguments to registered strategy" do
strategy = Sidekiq::Throttled::Registry.add("foo",
threshold: { limit: 1, period: 1 },
concurrency: { limit: 1 })

payload_jid = jid
args = ["foo", 1]
message = %({"class":"foo","jid":#{payload_jid.inspect},"args":#{args.inspect}})

expect(strategy).to receive(:throttled?).with payload_jid, *args

described_class.throttled? message
end

it "unwraps ActiveJob-jobs default sidekiq adapter" do
strategy = Sidekiq::Throttled::Registry.add("wrapped-foo",
threshold: { limit: 1, period: 1 },
Expand Down Expand Up @@ -85,5 +99,24 @@

described_class.throttled? message
end

it "unwraps ActiveJob-jobs job parameters" do
strategy = Sidekiq::Throttled::Registry.add("wrapped-foo",
threshold: { limit: 1, period: 1 },
concurrency: { limit: 1 })

payload_jid = jid
args = ["foo", 1]
message = JSON.dump({
"class" => "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",
"wrapped" => "wrapped-foo",
"args" => [{ "job_class" => "TestJob", "arguments" => args }],
"jid" => payload_jid
})

expect(strategy).to receive(:throttled?).with payload_jid, *args

described_class.throttled? message
end
end
end

0 comments on commit 3c1c543

Please sign in to comment.