Skip to content

Commit

Permalink
Merge pull request #7 from rinsed-org/better-error-handling
Browse files Browse the repository at this point in the history
implement some better error handling
  • Loading branch information
reidnimz authored Nov 22, 2023
2 parents 3b2661d + 5bda05b commit 8ef9bdd
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 22 deletions.
7 changes: 5 additions & 2 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ gem "concurrent-ruby"
gem "connection_pool"
gem "jwt"
gem "oj"
gem "rspec"
gem "pry"
gem "dotenv"

group :development do
gem "parallel"
gem "pry"
end

group :test do
gem "rspec"
end
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
rb_snowflake_client (0.0.3)
rb_snowflake_client (0.0.4)

GEM
remote: https://rubygems.org/
Expand Down
26 changes: 16 additions & 10 deletions lib/ruby_snowflake/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@

require_relative "result"
require_relative "streaming_result"
require_relative "client/http_connection_wrapper"
require_relative "client/single_thread_in_memory_strategy"
require_relative "client/streaming_result_strategy"
require_relative "client/threaded_in_memory_strategy"
require_relative "client/single_thread_in_memory_strategy"

module RubySnowflake
class Error < StandardError
Expand All @@ -26,8 +27,12 @@ def initialize(details)
@sentry_context = details
end
end
class BadResponseError < Error ; end
class ConnectionError < Error ; end
class ConnectionStarvedError < Error ; end
class RequestError < Error ; end


# TODO: double check that net/http is actually using compression like it should be
class Client
JWT_TOKEN_TTL = 3600 # seconds, this is the max supported by snowflake
CONNECTION_TIMEOUT = 30 # seconds
Expand All @@ -43,17 +48,19 @@ def self.connect
ENV["SNOWFLAKE_ACCOUNT"],
ENV["SNOWFLAKE_USER"],
ENV["SNOWFLAKE_PUBLIC_KEY_FINGERPRINT"],
ENV["SNOWFLAKE_DEFAULT_WAREHOUSE"],
)
end

# TODO: parameterize warehouse
def initialize(uri, private_key_path, organization, account, user, public_key_fingerprint)
def initialize(uri, private_key_path, organization, account, user, public_key_fingerprint, default_warehouse)
@base_uri = uri
@private_key_path = private_key_path
@organization = organization
@account = account
@user = user
@public_key_fingerprint = public_key_fingerprint # should be able to generate this from key pair, but haven't figured out right openssl options yet
@default_warehouse = default_warehouse
# should be able to generate this from key pair, but haven't figured out right openssl options yet
@public_key_fingerprint = public_key_fingerprint

# start with an expired value to force creation
@token_expires_at = Time.now.to_i - 1
Expand All @@ -62,10 +69,11 @@ def initialize(uri, private_key_path, organization, account, user, public_key_fi

def query(query, options = {})
statement_count = count_statements(query) if options[:statement_count] == nil
warehouse = options[:warehouse] || @default_warehouse

response = nil
connection_pool.with do |connection|
request_body = { "statement" => query, "warehouse" => "WEB_TEST_WH" }
request_body = { "statement" => query, "warehouse" => warehouse }
request_body["MULTI_STATEMENT_COUNT"] = statement_count if statement_count > 1

response = request_with_auth_and_headers(
Expand All @@ -82,9 +90,7 @@ def query(query, options = {})
private
def connection_pool
@connection_pool ||= ConnectionPool.new(size: MAX_CONNECTIONS, timeout: CONNECTION_TIMEOUT) do
# TODO: the connection pool lib expects these connections to be "Self healing", they're obviously not
# so we'll need to write a wrapper that is
Net::HTTP.start(hostname, port, :use_ssl => true)
HttpConnectionWrapper.new(hostname, port).start
end
end

Expand Down Expand Up @@ -126,7 +132,7 @@ def count_statements(query)

def handle_errors(response)
if response.code != "200"
raise Error.new({}),
raise BadResponseError.new({}),
"Bad response! Got code: #{response.code}, w/ message #{response.body}"
end
end
Expand Down
32 changes: 32 additions & 0 deletions lib/ruby_snowflake/client/http_connection_wrapper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
module RubySnowflake
class Client
class HttpConnectionWrapper
def initialize(hostname, port)
@hostname = hostname
@port = port
end

def start
@connection = Net::HTTP.start(@hostname, @port, use_ssl: true)
self
rescue StandardError
raise ConnectionError.new "Error connecting to server."
end

def request(request)
# connections can timeout and close, re-open them
# which is what the connection pool expects
start unless connection.active?

begin
connection.request(request)
rescue StandardError => error
raise RequestError.new "HTTP error requesting data", cause: error
end
end

private
attr_accessor :connection
end
end
end
10 changes: 9 additions & 1 deletion lib/ruby_snowflake/client/threaded_in_memory_strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@ def self.result(statement_json_body, retreive_proc, num_threads)
end
end
futures.each do |future|
# TODO: futures can get rejected, handle this error case
if future.rejected?
raise ConnectionStarvedError.new(
"A partition request timed out. This is usually do to using the client in" \
"multiple threads. The client uses a connection thread pool and if too many" \
"requests are all done in threads at the same time, threads can get starved" \
"of access to connections. The solution for this is to spin up a new client" \
"instance with it's own connection pool to snowflake."
)
end
index, partition_data = future.value
result[index] = partition_data
end
Expand Down
23 changes: 17 additions & 6 deletions spec/ruby_snowflake/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@
let(:query) { "" }
let(:result) { client.query(query) }

context "when we can't connect" do
before do
allow(Net::HTTP).to receive(:start).and_raise("Some connection error")
end

it "raises a ConnectionError" do
expect { result }.to raise_error do |error|
expect(error).to be_a RubySnowflake::ConnectionError
expect(error.cause.message).to eq "Some connection error"
end
end
end

context "when the query errors" do
let(:query) { "INVALID QUERY;" }
it "should raise an exception" do
Expand All @@ -24,7 +37,6 @@
it "should raise an exception" do
expect { result }.to raise_error do |error|
expect(error).to be_a RubySnowflake::Error
#binding.pry
expect(error.message).to include "'TEST_DATABASE' does not exist or not authorized"
# TODO: make sure to include query in context
#expect(error.sentry_context).to include(
Expand Down Expand Up @@ -68,7 +80,6 @@
)
end

# TODO: if we want this semantics. W/o streaming doesn't feel too great.
it "should respond to each with a block" do
expect { |b| result.each(&b) }.to yield_with_args(an_instance_of(RubySnowflake::Row))
end
Expand Down Expand Up @@ -161,16 +172,16 @@
end
end

context "fetching 150k rows x 10 times - with threads" do
let(:limit) { 150_000 }
context "fetching 50k rows x 5 times - with threads" do
let(:limit) { 50_000 }
it "should work" do
t = []
10.times do |idx|
5.times do |idx|
t << Thread.new do
client = described_class.connect
result = client.query(query)
rows = result.get_all_rows
expect(rows.length).to eq 150000
expect(rows.length).to eq 50_000
expect((-50000...50000)).to include(rows[0]["id"].to_i)
end
end
Expand Down
3 changes: 2 additions & 1 deletion spec/streaming_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ def new_client
"GBLARLO",
"OZA47907",
"SNOWFLAKE_CLIENT_TEST",
"SHA256:pbfmeTQ2+MestU2J9dXjGXTjtvZprYfHxzZzqqcIhFc=")
"SHA256:pbfmeTQ2+MestU2J9dXjGXTjtvZprYfHxzZzqqcIhFc=",
"WEB_TEST_WH"))
end

size = 1_000
Expand Down
3 changes: 2 additions & 1 deletion spec/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ def new_client
"GBLARLO",
"OZA47907",
"SNOWFLAKE_CLIENT_TEST",
"SHA256:pbfmeTQ2+MestU2J9dXjGXTjtvZprYfHxzZzqqcIhFc=")
"SHA256:pbfmeTQ2+MestU2J9dXjGXTjtvZprYfHxzZzqqcIhFc=",
"WEB_TEST_WH")
end

size = 1_000
Expand Down

0 comments on commit 8ef9bdd

Please sign in to comment.