fix: BoundedQueue, and restart thread / keep alive on errors #38

Merged Oct 30, 2024 (24 commits)
6 changes: 6 additions & 0 deletions .gitignore
@@ -11,3 +11,9 @@
.rspec_status
*.gem
k6/node_modules/

# IDEs
.idea

# k6
node_modules
9 changes: 4 additions & 5 deletions README.md
@@ -154,18 +154,17 @@ class MySchema < GraphQL::Schema
debug: false, # verbose logs
logger: MyLogger.new,
endpoint: 'app.graphql-hive.com',
port: 80,
buffer_size: 50, # forward the operations data to Hive every 50 requests

port: 80,
buffer_size: 50, # how many operations can be sent to hive in a single batch (AFTER sampling)
collect_usage: true, # report usage to Hive
collect_usage_sampling: {
# optional members of `collect_usage_sampling`
sample_rate: 0.5, # % of operations reported
sampler: proc { |context| context.operation_name.include?('someQuery') ? 1 : 0.5 }, # assign custom sampling rates (overrides `sample_rate`)
at_least_once: true, # sample every distinct operation at least once
key_generator: proc { |context| context.operation_name } # assign custom keys to distinguish between distinct operations
}

},
report_schema: true, # publish schema to Hive
# mandatory if `report_schema: true`
reporting: {
3 changes: 2 additions & 1 deletion lib/graphql-hive.rb
@@ -134,7 +134,8 @@ def initialize_options!(options)
options[:logger] = Logger.new($stderr)
original_formatter = Logger::Formatter.new
options[:logger].formatter = proc { |severity, datetime, progname, msg|
original_formatter.call(severity, datetime, progname, "[hive] #{msg.dump}")
msg = msg.respond_to?(:dump) ? msg.dump : msg
original_formatter.call(severity, datetime, progname, "[hive] #{msg}")
}
options[:logger].level = options[:debug] ? Logger::DEBUG : Logger::INFO
end
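The guard added above can be exercised standalone. This sketch (stdlib `Logger` with a `StringIO` sink, both just for the demo) shows why calling `msg.dump` unconditionally breaks on non-String messages:

```ruby
require "logger"
require "stringio"

out = StringIO.new
logger = Logger.new(out)
original_formatter = Logger::Formatter.new
logger.formatter = proc { |severity, datetime, progname, msg|
  # Only String has #dump; exceptions (and other objects) pass through as-is.
  # Before this PR, msg.dump raised NoMethodError for non-String messages.
  msg = msg.respond_to?(:dump) ? msg.dump : msg
  original_formatter.call(severity, datetime, progname, "[hive] #{msg}")
}

logger.info("plain string")
logger.error(StandardError.new("boom")) # a non-String message
puts out.string
```

`Logger#error` accepts any object, so formatters have to tolerate more than Strings; the `respond_to?(:dump)` check keeps the old dumped output for Strings while letting everything else through.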
23 changes: 23 additions & 0 deletions lib/graphql-hive/bounded_queue.rb
@@ -0,0 +1,23 @@
module GraphQL
class Hive < GraphQL::Tracing::PlatformTracing
class BoundedQueue < Thread::Queue
def initialize(bound:, logger:)
@bound = bound
@logger = logger
@lock = Mutex.new

super()
end

def push(item)
@lock.synchronize do
Review thread:

Collaborator (author): Should we also synchronize the pop here? That would mean that we sync between the Hive reporting thread (pop) and the main Puma thread (push).

Collaborator: Yes, it would make size accurate.

Collaborator (author): Hmm. I thought I'd comment here where I'm at the moment. I'm having some deadlock issues with the way it is currently, because @lock.synchronize on pop makes it so that pop happens first, the thread's while (operation) loop instantly ends, and there are no more threads available to process. In tests, this means the test instantly fails since the processing thread dies. And in the server, we seem to be running into the Puma main thread never being able to get the lock 🤔

Collaborator (author): The Puma main thread isn't able to get the lock because we're constantly popping in our thread. We discussed that this refactor can be part of, or come after, #33 so we don't make too many changes here.

if size >= @bound
@logger.error("BoundedQueue is full, discarding operation")
return
end
super
end
end
end
end
end
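The class above runs standalone against the Ruby standard library. A quick sketch of the discard behavior (the `IO::NULL` logger is just to keep the demo quiet):

```ruby
require "logger"

# The BoundedQueue from this PR: pushes beyond `bound` are logged and
# dropped, so a slow or dead consumer cannot grow the queue without limit.
class BoundedQueue < Thread::Queue
  def initialize(bound:, logger:)
    @bound = bound
    @logger = logger
    @lock = Mutex.new
    super()
  end

  def push(item)
    @lock.synchronize do
      if size >= @bound
        @logger.error("BoundedQueue is full, discarding operation")
        return
      end
      super
    end
  end
end

queue = BoundedQueue.new(bound: 2, logger: Logger.new(IO::NULL))
4.times { |i| queue.push(i) }
puts queue.size # => 2, the two extra pushes were discarded
```

Note that only `push` takes the lock here; as the thread above discusses, synchronizing `pop` as well led to deadlocks with the consumer thread.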
27 changes: 15 additions & 12 deletions lib/graphql-hive/usage_reporter.rb
@@ -3,6 +3,7 @@
require "digest"
require "graphql-hive/analyzer"
require "graphql-hive/printer"
require "graphql-hive/bounded_queue"

module GraphQL
class Hive < GraphQL::Tracing::PlatformTracing
@@ -16,14 +17,11 @@ def self.instance

def initialize(options, client)
@@instance = self

@options = options
@client = client

@options_mutex = Mutex.new
@queue = Queue.new

@sampler = Sampler.new(options[:collect_usage_sampling], options[:logger]) # NOTE: logs for deprecated field
@queue = BoundedQueue.new(bound: options[:buffer_size], logger: options[:logger])
Review thread:

Contributor: I know that the bounded_queue_multiple config was removed, but should we still have the bound as the buffer size * 5?

Suggested change:
-        @queue = BoundedQueue.new(bound: options[:buffer_size], logger: options[:logger])
+        @queue = BoundedQueue.new(bound: options[:buffer_size].to_i * 5, logger: options[:logger])

Collaborator (author): We had a discussion here about this! #38 (comment) We don't need this to be considerably larger than the buffer size because the thread wakes up for every single operation and calls pop. So ideally, this won't ever reach a size larger than ~1. But to be safe, we can provide a larger queue (buffer size). If we reach this limit, we have other things that are going wrong.

Contributor: That makes sense! I thought we still wanted it to be considerably larger. Thanks for clearing that up!

start_thread
end
@@ -52,15 +50,20 @@ def start_thread
@thread = Thread.new do
buffer = []
while (operation = @queue.pop(false))
@options[:logger].debug("processing operation from queue: #{operation}")
buffer << operation if @sampler.sample?(operation)

@options_mutex.synchronize do
if buffer.size >= @options[:buffer_size]
@options[:logger].debug("buffer is full, sending!")
process_operations(buffer)
buffer = []
begin
@options[:logger].debug("processing operation from queue: #{operation}")
buffer << operation if @sampler.sample?(operation)

@options_mutex.synchronize do
Review thread:

Collaborator: Perhaps out of scope because you are fixing the OOM issue, but I don't think this mutex does anything. There isn't another thread accessing this code.

if buffer.size >= @options[:buffer_size]
@options[:logger].debug("buffer is full, sending!")
process_operations(buffer)
buffer = []
end
end
rescue => e
buffer = []
@options[:logger].error(e)
end
end

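The begin/rescue added to the worker loop above can be sketched in isolation. In this sketch the `:bad` operation and the `nil` shutdown sentinel are illustrative, not part of the gem:

```ruby
require "logger"

# Keep-alive pattern from start_thread: an error while processing one
# operation is logged and the worker keeps consuming, instead of the
# exception killing the reporting thread.
logger = Logger.new(IO::NULL) # null sink just for the demo
queue = Thread::Queue.new
processed = []

worker = Thread.new do
  while (operation = queue.pop)
    begin
      raise "simulated failure" if operation == :bad
      processed << operation
    rescue => e
      logger.error(e) # log and stay alive, as in the PR's begin/rescue
    end
  end
end

queue.push(:good)
queue.push(:bad)       # would have killed the thread without the rescue
queue.push(:also_good) # still processed after the error
queue.push(nil)        # sentinel: pop returns nil, loop exits
worker.join
puts processed.inspect # => [:good, :also_good]
```

The PR also resets the buffer in the rescue branch, so a poisoned batch is dropped rather than retried forever.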
61 changes: 61 additions & 0 deletions spec/graphql/graphql-hive/bounded_queue_spec.rb
@@ -0,0 +1,61 @@
# frozen_string_literal: true

require "spec_helper"
require "graphql-hive"

RSpec.describe GraphQL::Hive::BoundedQueue do
subject(:queue) { GraphQL::Hive::BoundedQueue.new(bound: 2, logger: logger) }

let(:logger) { instance_double("Logger") }

before do
allow(logger).to receive(:error)
end

it "should be a subclass of Thread::Queue" do
expect(GraphQL::Hive::BoundedQueue.superclass).to eq(Thread::Queue)
end

it "should be able to push items up to size" do
queue.push("one")
queue.push("two")

expect(queue.size).to eq(2)
end

it "should discard items and log when full" do
queue.push("one")
queue.push("two")
queue.push("three")
queue.push("four")

expect(queue.size).to eq(2)
expect(logger).to have_received(:error).with("BoundedQueue is full, discarding operation").twice
end

it "allows pushes after pops" do
queue.push("one")
queue.push("two")

queue.push("invalid")
expect(queue.size).to eq(2)

queue.pop
queue.push("three")
expect(queue.size).to eq(2)
end

it "should be thread-safe and discard items when full" do
threads = []
20.times do |i|
threads << Thread.new do
queue.push(i)
end
end

threads.each(&:join)

expect(queue.size).to eq(2)
expect(logger).to have_received(:error).with("BoundedQueue is full, discarding operation").exactly(18).times
end
Review thread:

Collaborator (author): I'd love to get some opinions on this test here!

Collaborator: I think this should test cases where the queue is popped after hitting the bound. I know you test that in a single thread on line 36, but we should demonstrate the lock and bound work as expected in a multi-threaded scenario too.

Collaborator (author): Good point! I added a new test case for this.
end
77 changes: 73 additions & 4 deletions spec/graphql/graphql-hive/usage_reporter_spec.rb
@@ -4,9 +4,10 @@

RSpec.describe GraphQL::Hive::UsageReporter do
let(:usage_reporter_instance) { described_class.new(options, client) }
let(:options) { {logger: logger} }
let(:options) { {logger: logger, buffer_size: buffer_size} }
let(:logger) { instance_double("Logger") }
let(:client) { instance_double("Hive::Client") }
let(:buffer_size) { 1 }

let(:timestamp) { 1_720_705_946_333 }
let(:queries) { [] }
@@ -32,7 +33,7 @@
expect(usage_reporter_instance.instance_variable_get(:@client)).to eq(client)

expect(usage_reporter_instance.instance_variable_get(:@options_mutex)).to be_an_instance_of(Mutex)
expect(usage_reporter_instance.instance_variable_get(:@queue)).to be_an_instance_of(Queue)
expect(usage_reporter_instance.instance_variable_get(:@queue)).to be_an_instance_of(GraphQL::Hive::BoundedQueue)
expect(usage_reporter_instance.instance_variable_get(:@sampler)).to be_an_instance_of(GraphQL::Hive::Sampler)
end
end
@@ -65,11 +66,13 @@

describe "#start_thread" do
it "logs a warning if the thread is already alive" do
usage_reporter_instance.instance_variable_set(:@thread, Thread.new do
thread = Thread.new do
# do nothing
end)
end
usage_reporter_instance.instance_variable_set(:@thread, thread)
expect(logger).to receive(:warn)
usage_reporter_instance.on_start
thread.join
end

context "when configured with sampling" do
@@ -106,6 +109,72 @@
usage_reporter_instance.add_operation(operation)
end
end

context "with erroneous operations" do
let(:sampler_class) { class_double(GraphQL::Hive::Sampler).as_stubbed_const }
let(:sampler_instance) { instance_double("GraphQL::Hive::Sampler") }
let(:schema) { GraphQL::Schema.from_definition("type Query { test: String }") }
let(:queries_valid) { [GraphQL::Query.new(schema, "query TestingHiveValid { test }", variables: {})] }
let(:queries_invalid) { [GraphQL::Query.new(schema, "query TestingHiveInvalid { test }", variables: {})] }
let(:results) { [GraphQL::Query::Result.new(query: queries_valid[0], values: {"data" => {"test" => "test"}})] }
let(:buffer_size) { 1 }

before do
allow(sampler_class).to receive(:new).and_return(sampler_instance)
end

it "can still process the operations after erroneous operation" do
raise_exception = true
operation_error = StandardError.new("First operation")
allow(sampler_instance).to receive(:sample?) do |_operation|
if raise_exception
raise_exception = false
raise operation_error
else
true
end
end

mutex = Mutex.new
logger_condition = ConditionVariable.new
client_condition = ConditionVariable.new
allow(logger).to receive(:error) do |_e|
mutex.synchronize { logger_condition.signal }
end

allow(client).to receive(:send) do |_endpoint|
mutex.synchronize { client_condition.signal }
end

mutex.synchronize do
usage_reporter_instance.add_operation([timestamp, queries_invalid, results, duration])
logger_condition.wait(mutex)

usage_reporter_instance.add_operation([timestamp, queries_valid, results, duration])
client_condition.wait(mutex)
end

expect(client).to have_received(:send).once.with(
:"/usage",
{
map: {"a69918853baf60d89b871e1fbe13915b" =>
{
fields: ["Query", "Query.test"],
operation: "query TestingHiveValid {\n test\n}",
operationName: "TestingHiveValid"
}},
operations: [{
execution: {duration: 100000, errorsTotal: 0, ok: true},
operationMapKey: "a69918853baf60d89b871e1fbe13915b",
timestamp: 1720705946333
}],
size: 1
},
:usage
)
expect(logger).to have_received(:error).with(operation_error).once
end
end
end

describe "#process_operation" do