forked from ixti/sidekiq-throttled
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy paththrottled.rb
116 lines (101 loc) · 3.11 KB
/
throttled.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# frozen_string_literal: true
require "sidekiq"
require_relative "./throttled/config"
require_relative "./throttled/cooldown"
require_relative "./throttled/job"
require_relative "./throttled/message"
require_relative "./throttled/middlewares/server"
require_relative "./throttled/patches/basic_fetch"
require_relative "./throttled/patches/super_fetch"
require_relative "./throttled/registry"
require_relative "./throttled/version"
require_relative "./throttled/worker"
# @see https://github.com/mperham/sidekiq/
module Sidekiq
# Concurrency and threshold throttling for Sidekiq.
#
# Just add somewhere in your bootstrap:
#
# require "sidekiq/throttled"
#
# Once you've done that you can include {Sidekiq::Throttled::Job} in your
# job classes and configure throttling:
#
# class MyJob
# include Sidekiq::Job
# include Sidekiq::Throttled::Job
#
# sidekiq_options :queue => :my_queue
#
# sidekiq_throttle({
# # Allow maximum 10 concurrent jobs of this class at a time.
# :concurrency => { :limit => 10 },
# # Allow maximum 1K jobs being processed within one hour window.
# :threshold => { :limit => 1_000, :period => 1.hour }
# })
#
# def perform
# # ...
# end
# end
module Throttled
# Lock held by {configure} for the whole copy / yield / swap sequence.
MUTEX = Mutex.new
private_constant :MUTEX
# Default configuration, frozen; {configure} swaps in a new frozen copy
# instead of mutating this object.
@config = Config.new.freeze
# Derived from the current config; rebuilt each time {configure} runs.
@cooldown = Cooldown[@config]
class << self
# @api internal
#
# Value of `Cooldown[config]` for the current configuration; reassigned
# whenever {configure} runs.
# NOTE(review): presumably nil when the cooldown is disabled — confirm
# against Cooldown.[].
#
# @return [Cooldown, nil]
attr_reader :cooldown
# @api internal
#
# Current frozen configuration. It is assigned when this file is loaded and
# only ever replaced by another frozen copy in {configure}, so it is always
# present.
#
# @return [Config]
attr_reader :config
# Replaces the active configuration with an updated copy.
#
# The block receives a duplicate of the current config. Once the block
# returns, the duplicate is frozen and becomes the new config, and the
# cooldown is rebuilt from it. If the block raises, the current config and
# cooldown are left as they were. The whole update runs while holding MUTEX.
#
# @example
#   Sidekiq::Throttled.configure do |config|
#     config.cooldown_period = nil # Disable queues cooldown manager
#   end
#
# @yieldparam config [Config]
def configure
  MUTEX.synchronize do
    # `yield` inside the `tap` block still targets the block given to #configure.
    updated = @config.dup.tap { |draft| yield draft }.freeze
    @config = updated
    @cooldown = Cooldown[updated]
  end
end
# Tells whether the job is throttled or not.
#
# Answers false when the payload has no job class or no job id, when
# Registry.get does not yield a strategy for the job class, or when any
# StandardError is raised along the way.
#
# @param [String] message Job's JSON payload
# @return [Boolean]
def throttled?(message)
  payload = Message.new(message)

  if payload.job_class && payload.job_id
    Registry.get(payload.job_class) do |strategy|
      # Non-local return: the strategy's answer is this method's result.
      return strategy.throttled?(payload.job_id, *payload.job_args)
    end
  end

  false
rescue StandardError
  false
end
# Return throttled job to be executed later, delegating the details of how to do that
# to the Strategy for that job.
#
# The job class name is read from the payload's "wrapped" key, falling back
# to "class", and is resolved with Object.const_get.
#
# @param work object whose `#job` returns the job's JSON payload
# @return [Object, false] whatever Registry.get returns, or false when the
#   payload has neither a "wrapped" nor a "class" key
def requeue_throttled(work)
  payload = JSON.parse(work.job)
  class_name = payload.fetch("wrapped") do
    payload.fetch("class") { return false }
  end
  job_class = Object.const_get(class_name)

  Registry.get(job_class) do |strategy|
    strategy.requeue_throttled(work, **job_class.sidekiq_throttled_requeue_options)
  end
end
end
end
# Add the throttling middleware to the server middleware chain.
configure_server do |config|
  config.server_middleware { |chain| chain.add(Sidekiq::Throttled::Middlewares::Server) }
end
end