fix: BoundedQueue, and restart thread / keep alive on errors #38
@@ -11,3 +11,9 @@
 .rspec_status
 *.gem
 k6/node_modules/
+
+# IDEs
+.idea
+
+# k6
+node_modules
@@ -0,0 +1,23 @@
module GraphQL
  class Hive < GraphQL::Tracing::PlatformTracing
    class BoundedQueue < Thread::Queue
      def initialize(bound:, logger:)
        @bound = bound
        @logger = logger
        @lock = Mutex.new

        super()
      end

      def push(item)
        @lock.synchronize do
          if size >= @bound
            @logger.error("BoundedQueue is full, discarding operation")
            return
          end
          super
        end
      end
    end
  end
end
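
For context on how the new class behaves: it acts like a regular Thread::Queue until it holds bound items, and any further push is logged and discarded instead of growing the queue without limit. A minimal usage sketch, assuming the gem is loaded with require "graphql-hive" and substituting Ruby's built-in Logger for the gem's configured logger (the bound of 2 is an arbitrary choice for illustration):

require "logger"
require "graphql-hive"

logger = Logger.new($stdout)
queue = GraphQL::Hive::BoundedQueue.new(bound: 2, logger: logger)

queue.push("op-1")
queue.push("op-2")
queue.push("op-3") # logs "BoundedQueue is full, discarding operation" and is dropped

puts queue.size # => 2
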
@@ -3,6 +3,7 @@
 require "digest"
 require "graphql-hive/analyzer"
 require "graphql-hive/printer"
+require "graphql-hive/bounded_queue"

 module GraphQL
   class Hive < GraphQL::Tracing::PlatformTracing

@@ -16,14 +17,11 @@ def self.instance
     def initialize(options, client)
       @@instance = self

       @options = options
       @client = client

       @options_mutex = Mutex.new
-      @queue = Queue.new

       @sampler = Sampler.new(options[:collect_usage_sampling], options[:logger]) # NOTE: logs for deprecated field
+      @queue = BoundedQueue.new(bound: options[:buffer_size], logger: options[:logger])

Review comment: i know that the …

Suggested change: …

Reply: We had a discussion here about this! #38 (comment) We don't need this to be considerably larger than the buffer size, because the thread wakes up for every single operation and calls process_operations once the buffer reaches buffer size. So ideally, this won't ever reach larger than the size of the buffer.

Reply: that makes sense! i thought we still wanted it to be considerably larger. thanks for clearing that up!

       start_thread
     end

@@ -52,15 +50,20 @@ def start_thread
       @thread = Thread.new do
         buffer = []
         while (operation = @queue.pop(false))
-          @options[:logger].debug("processing operation from queue: #{operation}")
-          buffer << operation if @sampler.sample?(operation)
-
-          @options_mutex.synchronize do
-            if buffer.size >= @options[:buffer_size]
-              @options[:logger].debug("buffer is full, sending!")
-              process_operations(buffer)
-              buffer = []
+          begin
+            @options[:logger].debug("processing operation from queue: #{operation}")
+            buffer << operation if @sampler.sample?(operation)
+
+            @options_mutex.synchronize do

Review comment: Perhaps out of scope because you are fixing the OOM issue, but I don't think this mutex does anything. There isn't another thread accessing this code.

+              if buffer.size >= @options[:buffer_size]
+                @options[:logger].debug("buffer is full, sending!")
+                process_operations(buffer)
+                buffer = []
+              end
+            end
+          rescue => e
+            buffer = []
+            @options[:logger].error(e)
+          end
         end
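
The shape of the change above is a common keep-alive pattern for a long-lived consumer thread: the rescue sits inside the loop, so an exception raised while handling one operation is logged, the partially filled buffer is dropped, and the loop moves on to the next queue item instead of letting the reporting thread die. A stripped-down sketch of the pattern, using placeholder names (work_queue, a flush threshold of 3) rather than the gem's actual API:

require "logger"

logger = Logger.new($stdout)
work_queue = Thread::Queue.new

consumer = Thread.new do
  buffer = []
  while (item = work_queue.pop) # a nil item ends the loop
    begin
      raise "failed to process #{item}" if item == :bad
      buffer << item
      if buffer.size >= 3
        logger.info("flushing #{buffer.size} items")
        buffer = []
      end
    rescue => e
      buffer = []      # discard the batch that failed
      logger.error(e)  # log it and keep the thread alive
    end
  end
end

3.times { work_queue.push(:ok) } # flushed as one batch of 3
work_queue.push(:bad)            # error is logged, thread survives
3.times { work_queue.push(:ok) } # still flushed afterwards
work_queue.push(nil)             # sentinel to end the loop
consumer.join
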
@@ -0,0 +1,78 @@
# frozen_string_literal: true

require "spec_helper"
require "graphql-hive"

RSpec.describe GraphQL::Hive::BoundedQueue do
  subject(:queue) { GraphQL::Hive::BoundedQueue.new(bound: 2, logger: logger) }

  let(:logger) { instance_double("Logger") }

  before do
    allow(logger).to receive(:error)
  end

  it "should be a subclass of Thread::Queue" do
    expect(GraphQL::Hive::BoundedQueue.superclass).to eq(Thread::Queue)
  end

  it "should be able to push items up to size" do
    queue.push("one")
    queue.push("two")

    expect(queue.size).to eq(2)
  end

  it "should discard items and log when full" do
    queue.push("one")
    queue.push("two")
    queue.push("three")
    queue.push("four")

    expect(queue.size).to eq(2)
    expect(logger).to have_received(:error).with("BoundedQueue is full, discarding operation").twice
  end

  it "allows pushes after pops" do
    queue.push("one")
    queue.push("two")

    queue.push("invalid")
    expect(queue.size).to eq(2)

    queue.pop
    queue.push("three")
    expect(queue.size).to eq(2)
  end

  it "should be thread-safe and discard items when full" do
    threads = []
    10.times do |i|
      threads << Thread.new do
        queue.push(i)
      end
    end

    threads.each(&:join)

    expect(queue.size).to eq(2)
    expect(logger).to have_received(:error).with("BoundedQueue is full, discarding operation").exactly(8).times
  end

  it "should be able to push after pop in multi-threaded environment" do
    threads = []
    perform_operations = [:push, :push, :pop, :push, :push, :push, :push]

    perform_operations.each do |operation|
      threads << Thread.new do
        queue.push("operation") if operation == :push
        queue.pop if operation == :pop
      end
    end

    threads.each(&:join)

    expect(queue.size).to eq(2)
    expect(logger).to have_received(:error).with("BoundedQueue is full, discarding operation").exactly(3).times
  end
end

Review comment: Should we also synchronize the pop here? That would mean that we sync between the Hive Reporting Thread (pop) and the main Thread for puma (push).

Reply: Yes, it would make size accurate.

Reply: Hmm, I thought I'd comment here where I'm at the moment. I'm having some deadlock issues with the way it is currently, because @lock.synchronize on pop makes it so that pop happens first, the thread's while (operation) loop instantly ends, and there are no more threads available to process. In tests, this means that the test instantly fails since the processing thread dies. And in the server, we seem to be running into the puma main thread never being able to get the lock 🤔

Reply: The puma main thread isn't able to get the lock because we're constantly pop'ing in our thread. We discussed that this refactor can be part of or after #33, so we don't make too many changes here.
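
The deadlock described in this thread can be reproduced outside the gem. Below is a minimal, hypothetical sketch (generic names, not code from this PR): if pop takes the same mutex as push, the consumer thread ends up blocked inside pop on an empty queue while still holding the lock, so a producer thread (for example puma's request thread) can never acquire the lock to push.

lock  = Mutex.new
queue = Thread::Queue.new

consumer = Thread.new do
  lock.synchronize do
    queue.pop # blocks: the queue is empty, and the lock is still held here
  end
end

producer = Thread.new do
  sleep 0.1
  lock.synchronize { queue.push(:operation) } # never acquires the lock
end

producer.join(1) # gives up after a second; the push never happened
puts "queue size after 1s: #{queue.size}" # => 0
consumer.kill
producer.kill

This is consistent with keeping the lock on push only for now and deferring the pop refactor to #33, as discussed above.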