-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: BoundedQueue, and restart thread / keep alive on errors #38
Conversation
lib/graphql-hive/bounded_queue.rb
Outdated
if size >= @bound | ||
@logger.error("BoundedQueue is full, discarding operation") | ||
return | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's maybe a problem with the way it's being done here because this is not safe from concurrency. There could be many puma threads adding to this queue, and potentially they both get the same number for size
, and add to the queue. Though this may be safe because we always break the cycle whenever the size is >= bound
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can write a test that uses threads to push to the queue.
subject(:queue) { GraphQL::Hive::BoundedQueue.new(bound: 4, logger: logger) }
it "should discard items and log when full" do
2.times do |i|
Thread.new do
3.times do |ii|
queue.push("Thread #{i} operation {ii}")
end
end
end
expect(queue.size).to eq(4)
expect(logger).to have_received(:error).with("BoundedQueue is full, discarding operation").twice
end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TY! I Just added a test very similar to this one
#38 (comment)
lib/graphql-hive/bounded_queue.rb
Outdated
if size >= @bound | ||
@logger.error("BoundedQueue is full, discarding operation") | ||
return | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can write a test that uses threads to push to the queue.
subject(:queue) { GraphQL::Hive::BoundedQueue.new(bound: 4, logger: logger) }
it "should discard items and log when full" do
2.times do |i|
Thread.new do
3.times do |ii|
queue.push("Thread #{i} operation {ii}")
end
end
end
expect(queue.size).to eq(4)
expect(logger).to have_received(:error).with("BoundedQueue is full, discarding operation").twice
end
lib/graphql-hive.rb
Outdated
@@ -36,6 +36,7 @@ class Hive < GraphQL::Tracing::PlatformTracing | |||
read_operations: true, | |||
report_schema: true, | |||
buffer_size: 50, | |||
bounded_queue_multiple: 5, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm guessing that you added this so that we don't drop operations when the queue is being flushed. I don't think we need a new configuration value here, though. We want to drop the buffer array anyway in the future.
bounded_queue_multiple: 5, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is for how big is the size of the @queue
in UsageReporter, and it's different from the buffer size. Maybe it needs a better name!
If we have a buffer
size of 5, we would have @queue
of size 25 by default. And the puma thread could add max of 25 before it starts dropping operations.
While puma is adding operations, we would be flushing the @queue
continuously in the reporting thread, and making room for puma to add more. But if things get too fast (higher rps), we would start dropping since there's no more room in the queue. I've tested this locally with the k6 code.
Some services could update the bounded_queue_multiple
to get a higher queue size if they're OK with the memory usage of the queue. I just thought 5 was a good number to start with, but I'm open to discussion!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I understand that. My comment here is that I am doubtful that we want to expose this as a configuration value. What benefit do users get by configuring the bounded queue size? Why is it a multiplier? For example.
I would recommend that we do not expose the bounded_queue size and, for now, make it a slightly larger number than the buffer size. After all, we have discussed dropping the buffer and just using the queue.
I would prefer to keep configuration values consistent so we don't have to deprecate and remove them in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see what you mean, ty for clarifying! I might be giving too much control here to the consumer of this gem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated this now, to remove the bounded multiple config value, and just using the buffer_size. We discussed that this should never practically reach anyways since we pop after every operation. But if it is reached, then we probably have a larger problem to solve (like this OOM we're trying to fix)
lib/graphql-hive/usage_reporter.rb
Outdated
queue_bound = (options[:buffer_size] * options[:bounded_queue_multiple]).to_int | ||
@queue = BoundedQueue.new(bound: queue_bound, logger: options[:logger]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
queue_bound = (options[:buffer_size] * options[:bounded_queue_multiple]).to_int | |
@queue = BoundedQueue.new(bound: queue_bound, logger: options[:logger]) | |
queue_bound = (options[:buffer_size].to_i * 5) | |
@queue = BoundedQueue.new(bound: queue_bound, logger: options[:logger]) |
because
irb(main):003> ("5" * 5).to_i
=> 55555
irb(main):004> ("5".to_i * 5)
=> 25
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, thank you!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just leaving some comments for reviewers, and also parts that I want some feedback on.
end | ||
|
||
def push(item) | ||
@lock.synchronize do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also synchronize the pop
here? That would mean that we sync between the Hive Reporting Thread (pop) and the main Thread for puma (push).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it would make size
accurate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. I thought I'd comment here where I'm at the moment. I'm having some deadlock issues with the way it is currently, because @lock.synchronize
on pop
makes it so that pop happens first, and the thread while(operation)
instantly ends, and there are no more threads available to process.
In tests, this means that the test instantly fails since the processing thread dies. And in the server, we seem to be running into the puma main thread never being able to get the lock 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The puma main thread isn't able to get the lock because we're constantly pop
'ing in our thread. We discussed that this refactor can be part of or after #33 so we don't make too many changes here.
it "should be thsead-safe and discard items when full" do | ||
threads = [] | ||
20.times do |i| | ||
threads << Thread.new do | ||
queue.push(i) | ||
end | ||
end | ||
|
||
threads.each(&:join) | ||
|
||
expect(queue.size).to eq(2) | ||
expect(logger).to have_received(:error).with("BoundedQueue is full, discarding operation").exactly(18).times | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd love to get some opinions on this test here!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should test cases where the queue is popped after hitting the bound. I know you test that in a single thread on line 36 but we should demonstrate the lock and bound work as expected in a multi-threaded scenario too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! I added a new test case for this
Co-authored-by: Cassia Scheffer <[email protected]>
…yng/graphql-ruby-hive into APLT-606-oom-on-errors-hive-buffer
@sampler = Sampler.new(options[:collect_usage_sampling], options[:logger]) # NOTE: logs for deprecated field | ||
@queue = BoundedQueue.new(bound: options[:buffer_size], logger: options[:logger]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i know that the bounded_queue_multiple
config was removed, but should we still have the bound as the buffer size * 5?
@queue = BoundedQueue.new(bound: options[:buffer_size], logger: options[:logger]) | |
@queue = BoundedQueue.new(bound: options[:buffer_size].to_i * 5, logger: options[:logger]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We had a discussion here about this! #38 (comment)
We don't need this to be considerably larger than the buffer size because the thread wakes up for every single operation, and calls pop
.
So ideally, this won't every reach larger than size of ~1
. But to be safe, we can provide a larger queue (buffer size). If we reach this limit, we have other things that are going wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that makes sense! i thought we still wanted it to be considerably larger. thanks for clearing that up!
end | ||
|
||
def push(item) | ||
@lock.synchronize do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it would make size
accurate.
@options[:logger].debug("processing operation from queue: #{operation}") | ||
buffer << operation if @sampler.sample?(operation) | ||
|
||
@options_mutex.synchronize do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps out of scope because you are fixing the OOM issue. But I don't think this mutex does anything. There isn't another thread accessing this code.
Why
OOMs for GraphQL Hive Reporter due to
@queue
never flushing after an error occurs in the thread.What is happening:
add_operation
can still be called, even though nothing is processing any operations@queue
is unbounded@queue
size keeps increasing, causing a memory leakWhat