Skip to content

Commit

Permalink
fix: DRY-up job class/args extraction
Browse files Browse the repository at this point in the history
Related-PR: ixti#184
  • Loading branch information
ixti committed Apr 7, 2024
1 parent 3c1c543 commit c151789
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 22 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Correctly unwrap `ActiveJob` arguments:
[#184](https://github.com/ixti/sidekiq-throttled/pull/184),
[#185](https://github.com/ixti/sidekiq-throttled/pull/185).


## [1.3.0] - 2024-01-18

Expand Down
13 changes: 5 additions & 8 deletions lib/sidekiq/throttled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require_relative "./throttled/config"
require_relative "./throttled/cooldown"
require_relative "./throttled/job"
require_relative "./throttled/message"
require_relative "./throttled/middlewares/server"
require_relative "./throttled/patches/basic_fetch"
require_relative "./throttled/patches/super_fetch"
Expand Down Expand Up @@ -75,15 +76,11 @@ def configure
# @param [String] message Job's JSON payload
# @return [Boolean]
def throttled?(message)
message = Sidekiq.load_json(message)
job = message.fetch("wrapped") { message["class"] }
args = message.key?("wrapped") ? message.dig("args", 0, "arguments") : message["args"]
jid = message["jid"]
message = Message.new(message)
return false unless message.job_class && message.job_id

return false unless job && jid

Registry.get(job) do |strategy|
return strategy.throttled?(jid, *args)
Registry.get(message.job_class) do |strategy|
return strategy.throttled?(message.job_id, *message.job_args)
end

false
Expand Down
32 changes: 32 additions & 0 deletions lib/sidekiq/throttled/message.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

module Sidekiq
module Throttled
class Message
def initialize(item)
@item = item.is_a?(Hash) ? item : parse(item)
end

def job_class
@item.fetch("wrapped") { @item["class"] }
end

def job_args
@item.key?("wrapped") ? @item.dig("args", 0, "arguments") : @item["args"]
end

def job_id
@item["jid"]
end

private

def parse(item)
item = Sidekiq.load_json(item)
item.is_a?(Hash) ? item : {}
rescue JSON::ParserError
{}
end
end
end
end
11 changes: 5 additions & 6 deletions lib/sidekiq/throttled/middlewares/server.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

# internal
require_relative "../message"
require_relative "../registry"

module Sidekiq
Expand All @@ -13,13 +14,11 @@ class Server
def call(_worker, msg, _queue)
yield
ensure
job = msg.fetch("wrapped") { msg["class"] }
args = msg.key?("wrapped") ? msg.dig("args", 0, "arguments") : msg["args"]
jid = msg["jid"]
message = Message.new(msg)

if job && jid
Registry.get job do |strategy|
strategy.finalize!(jid, *args)
if message.job_class && message.job_id
Registry.get(message.job_class) do |strategy|
strategy.finalize!(message.job_id, *message.job_args)
end
end
end
Expand Down
9 changes: 6 additions & 3 deletions rubocop/rspec.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
RSpec/BeNil:
Enabled: true
EnforcedStyle: be

RSpec/ExampleLength:
Enabled: true
Max: 10
Expand All @@ -9,6 +13,5 @@ RSpec/MultipleExpectations:
RSpec/NamedSubject:
Enabled: false

RSpec/BeNil:
Enabled: true
EnforcedStyle: be
RSpec/Rails:
Enabled: false
183 changes: 183 additions & 0 deletions spec/lib/sidekiq/throttled/message_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# frozen_string_literal: true

RSpec.describe Sidekiq::Throttled::Message do
subject(:message) do
described_class.new(item)
end

let(:item) do
{
"class" => "ExcitingJob",
"args" => [42],
"jid" => "deadbeef"
}
end

describe "#job_class" do
subject { message.job_class }

it { is_expected.to eq("ExcitingJob") }

context "with serialized payload" do
let(:item) do
JSON.dump({
"class" => "ExcitingJob",
"args" => [42],
"jid" => "deadbeef"
})
end

it { is_expected.to eq("ExcitingJob") }
end

context "with ActiveJob payload" do
let(:item) do
{
"class" => "ActiveJob",
"wrapped" => "ExcitingJob",
"args" => [{ "arguments" => [42] }],
"jid" => "deadbeef"
}
end

it { is_expected.to eq("ExcitingJob") }
end

context "with serialized ActiveJob payload" do
let(:item) do
JSON.dump({
"class" => "ActiveJob",
"wrapped" => "ExcitingJob",
"args" => [{ "arguments" => [42] }],
"jid" => "deadbeef"
})
end

it { is_expected.to eq("ExcitingJob") }
end

context "with invalid payload" do
let(:item) { "invalid" }

it { is_expected.to be nil }
end

context "with invalid serialized payload" do
let(:item) { JSON.dump("invalid") }

it { is_expected.to be nil }
end
end

describe "#job_args" do
subject { message.job_args }

it { is_expected.to eq([42]) }

context "with serialized payload" do
let(:item) do
JSON.dump({
"class" => "ExcitingJob",
"args" => [42],
"jid" => "deadbeef"
})
end

it { is_expected.to eq([42]) }
end

context "with ActiveJob payload" do
let(:item) do
{
"class" => "ActiveJob",
"wrapped" => "ExcitingJob",
"args" => [{ "arguments" => [42] }],
"jid" => "deadbeef"
}
end

it { is_expected.to eq([42]) }
end

context "with serialized ActiveJob payload" do
let(:item) do
JSON.dump({
"class" => "ActiveJob",
"wrapped" => "ExcitingJob",
"args" => [{ "arguments" => [42] }],
"jid" => "deadbeef"
})
end

it { is_expected.to eq([42]) }
end

context "with invalid payload" do
let(:item) { "invalid" }

it { is_expected.to be nil }
end

context "with invalid serialized payload" do
let(:item) { JSON.dump("invalid") }

it { is_expected.to be nil }
end
end

describe "#job_id" do
subject { message.job_id }

it { is_expected.to eq("deadbeef") }

context "with serialized payload" do
let(:item) do
JSON.dump({
"class" => "ExcitingJob",
"args" => [42],
"jid" => "deadbeef"
})
end

it { is_expected.to eq("deadbeef") }
end

context "with ActiveJob payload" do
let(:item) do
{
"class" => "ActiveJob",
"wrapped" => "ExcitingJob",
"args" => [{ "arguments" => [42] }],
"jid" => "deadbeef"
}
end

it { is_expected.to eq("deadbeef") }
end

context "with serialized ActiveJob payload" do
let(:item) do
JSON.dump({
"class" => "ActiveJob",
"wrapped" => "ExcitingJob",
"args" => [{ "arguments" => [42] }],
"jid" => "deadbeef"
})
end

it { is_expected.to eq("deadbeef") }
end

context "with invalid payload" do
let(:item) { "invalid" }

it { is_expected.to be nil }
end

context "with invalid serialized payload" do
let(:item) { JSON.dump("invalid") }

it { is_expected.to be nil }
end
end
end
27 changes: 26 additions & 1 deletion spec/lib/sidekiq/throttled/middlewares/server_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
middleware.call(double, payload_args, double) { |*| :foobar }
end


it "returns yields control to the given block" do
expect { |b| middleware.call(double, payload, double, &b) }
.to yield_control
Expand Down Expand Up @@ -114,5 +113,31 @@
.to be :foobar
end
end

context "when message contains no job class" do
before do
allow(Sidekiq::Throttled::Registry).to receive(:get).and_call_original
payload.delete("class")
end

it "does not attempt to retrieve any strategy" do
expect { |b| middleware.call(double, payload, double, &b) }.to yield_control

expect(Sidekiq::Throttled::Registry).not_to receive(:get)
end
end

context "when message contains no jid" do
before do
allow(Sidekiq::Throttled::Registry).to receive(:get).and_call_original
payload.delete("jid")
end

it "does not attempt to retrieve any strategy" do
expect { |b| middleware.call(double, payload, double, &b) }.to yield_control

expect(Sidekiq::Throttled::Registry).not_to receive(:get)
end
end
end
end
8 changes: 4 additions & 4 deletions spec/lib/sidekiq/throttled_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@

it "passes JID and arguments to registered strategy" do
strategy = Sidekiq::Throttled::Registry.add("foo",
threshold: { limit: 1, period: 1 },
concurrency: { limit: 1 })
threshold: { limit: 1, period: 1 },
concurrency: { limit: 1 })

payload_jid = jid
args = ["foo", 1]
Expand Down Expand Up @@ -102,8 +102,8 @@

it "unwraps ActiveJob-jobs job parameters" do
strategy = Sidekiq::Throttled::Registry.add("wrapped-foo",
threshold: { limit: 1, period: 1 },
concurrency: { limit: 1 })
threshold: { limit: 1, period: 1 },
concurrency: { limit: 1 })

payload_jid = jid
args = ["foo", 1]
Expand Down

0 comments on commit c151789

Please sign in to comment.