Skip to content

Commit

Permalink
Add config options, beef up error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
reidnimz committed Nov 28, 2023
1 parent 4d7fe6b commit 80f86bc
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 41 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ gem "dotenv"
gem "jwt"
gem "oj"
gem "rake"
gem "retryable"

group :development do
gem "parallel"
Expand Down
2 changes: 2 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ GEM
coderay (~> 1.1)
method_source (~> 1.0)
rake (13.1.0)
retryable (3.0.5)
rspec (3.12.0)
rspec-core (~> 3.12.0)
rspec-expectations (~> 3.12.0)
Expand Down Expand Up @@ -48,6 +49,7 @@ DEPENDENCIES
pry
rake
rb_snowflake_client!
retryable
rspec

BUNDLED WITH
Expand Down
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,26 @@ result.each do |row|
end
```

# Configuration Options

The client supports the following configuration options, each with their own getter/setter

`logger` - takes any ruby logger (by default it's a std lib `Logger.new(STDOUT)`, set at INFO level)
`jwt_token_ttl` - The time to live set on JWT token in seconds, defaults to 3600 (60 minutes, the longest Snowflake supports)
`connection_timeout` - The amount of time in seconds that the client's connection pool will wait before erroring in handing out a valid connection, defaults to 60 seconds
`max_connections` - The maximum number of http connections to hold open in the connection pool. If you use the client in a threaded context, you may need to increase this to be threads * client.max_threads_per_query, defaults to 16
`max_threads_per_query` - The maximum number of threads the client should use to retrieve data, per query, defaults to 8. If you want the client to act in a single threaded way, set this to 1
`thread_scale_factor` - The factor used to decide when to spin up another thread. The default of 4 was determined experimentally; if you have high latency, decreasing this might improve overall speed for a large data set.
`http_retries` - By default the client will retry common typically transient errors (http responses) twice, you can change the number of retries with this.

Example configuration:
```ruby
client = RubySnowflake.connect
client.logger = Rails.logger
client.max_connections = 24
client.http_retries = 1
```

# Gotchas

1. Does not yet support multiple statements (work around is to wrap in `BEGIN ... END`)
Expand Down
95 changes: 72 additions & 23 deletions lib/ruby_snowflake/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
require "connection_pool"
require "json"
require "jwt"
require "logger"
require "net/http"
require "oj"
require "openssl"
require "retryable"
require "securerandom"
require "uri"

Expand All @@ -33,15 +35,28 @@ def initialize(details)
class BadResponseError < Error ; end
class ConnectionError < Error ; end
class ConnectionStarvedError < Error ; end
class RetryableBadResponseError < Error ; end
class RequestError < Error ; end


class Client
JWT_TOKEN_TTL = 3600 # seconds, this is the max supported by snowflake
CONNECTION_TIMEOUT = 60 # seconds, how long for a thread to wait for a connection b4 erroring
MAX_CONNECTIONS = 8
MAX_THREADS = 8
THREAD_SCALE_FACTOR = 4 # parition count factor for number of threads (i.e. 2 == once we have 4 partitions, spin up a second thread)
DEFAULT_LOGGER = Logger.new(STDOUT)
DEFAULT_LOG_LEVEL = Logger::INFO
# seconds, this is the max supported by snowflake
DEFAULT_JWT_TOKEN_TTL = 3600
# seconds, how long for a thread to wait for a connection before erroring
DEFAULT_CONNECTION_TIMEOUT = 60
# default maximum size of the http connection pool
DEFAULT_MAX_CONNECTIONS = 16
# default maximum size of the thread pool on a single query
DEFAULT_MAX_THREADS_PER_QUERY = 8
# partition count factor for number of threads
# (i.e. 2 == once we have 4 partitions, spin up a second thread)
DEFAULT_THREAD_SCALE_FACTOR = 4
# how many times to retry common retryable HTTP responses (i.e. 429, 504)
DEFAULT_HTTP_RETRIES = 2

attr_accessor :logger, :jwt_token_ttl, :connection_timeout, :max_connections,
:max_threads_per_query, :thread_scale_factor, :http_retries

def self.connect
private_key = ENV["SNOWFLAKE_PRIVATE_KEY"] || File.read(ENV["SNOWFLAKE_PRIVATE_KEY_PATH"])
Expand All @@ -65,6 +80,16 @@ def initialize(uri, private_key, organization, account, user, default_warehouse)
@default_warehouse = default_warehouse
@public_key_fingerprint = public_key_fingerprint(@private_key_pem)

# set defaults for config settings
@logger = DEFAULT_LOGGER
@logger.level = DEFAULT_LOG_LEVEL
@jwt_token_ttl = DEFAULT_JWT_TOKEN_TTL
@connection_timeout = DEFAULT_CONNECTION_TIMEOUT
@max_connections = DEFAULT_MAX_CONNECTIONS
@max_threads_per_query = DEFAULT_MAX_THREADS_PER_QUERY
@thread_scale_factor = DEFAULT_THREAD_SCALE_FACTOR
@http_retries = DEFAULT_HTTP_RETRIES

# start with an expired value to force creation
@token_expires_at = Time.now.to_i - 1
@token_semaphore = Concurrent::Semaphore.new(1)
Expand All @@ -84,13 +109,12 @@ def query(query, warehouse: nil, streaming: false)
Oj.dump(request_body)
)
end
handle_errors(response)
retreive_result_set(response, streaming)
end

private
def connection_pool
@connection_pool ||= ConnectionPool.new(size: MAX_CONNECTIONS, timeout: CONNECTION_TIMEOUT) do
@connection_pool ||= ConnectionPool.new(size: @max_connections, timeout: @connection_timeout) do
HttpConnectionWrapper.new(hostname, port).start
end
end
Expand All @@ -108,7 +132,7 @@ def jwt_token

@token_semaphore.acquire do
now = Time.now.to_i
@token_expires_at = now + JWT_TOKEN_TTL
@token_expires_at = now + @jwt_token_ttl

private_key = OpenSSL::PKey.read(@private_key_pem)

Expand All @@ -127,13 +151,6 @@ def jwt_token_expired?
Time.now.to_i > @token_expires_at
end

def handle_errors(response)
if response.code != "200"
raise BadResponseError.new({}),
"Bad response! Got code: #{response.code}, w/ message #{response.body}"
end
end

def request_with_auth_and_headers(connection, request_class, path, body=nil)
uri = URI.parse("#{@base_uri}#{path}")
request = request_class.new(uri)
Expand All @@ -143,11 +160,43 @@ def request_with_auth_and_headers(connection, request_class, path, body=nil)
request["X-Snowflake-Authorization-Token-Type"] = "KEYPAIR_JWT"
request.body = body unless body.nil?

response = nil
bm = Benchmark.measure { response = connection.request(request) }
puts "HTTP Request time: #{bm.real}"
handle_errors(response)
response
Retryable.retryable(tries: @http_retries + 1,
on: RetryableBadResponseError,
log_method: retryable_log_method) do
response = nil
bm = Benchmark.measure { response = connection.request(request) }
logger.debug { "HTTP Request time: #{bm.real}" }
raise_on_bad_response(response)
response
end
end

# Raises unless the HTTP response was a 200. Transient failure codes are
# surfaced as RetryableBadResponseError so the surrounding Retryable block
# can re-attempt them; everything else is a hard BadResponseError.
def raise_on_bad_response(response)
  return if response.code == "200"

  # Hard failure first: codes we should not bother retrying.
  unless retryable_http_response_code?(response.code)
    raise BadResponseError.new({}),
          "Bad response! Got code: #{response.code}, w/ message #{response.body}"
  end

  # Transient class of errors worth retrying rather than giving up on.
  raise RetryableBadResponseError.new({}),
        "Retryable bad response! Got code: #{response.code}, w/ message #{response.body}"
end

# Retryable HTTP codes shamelessly stolen from the battle tested python client
# https://github.com/snowflakedb/snowflake-connector-python/blob/eceed981f93e29d2f4663241253b48340389f4ef/src/snowflake/connector/network.py#L191
#
# Returns true when +code+ is a typically-transient HTTP status worth
# retrying: bad request, forbidden (token expired in flight), method not
# allowed, request timeout, too many requests, or anything in the 500 range
# (504 is fairly common).
#
# NOTE: +code+ arrives as a String (Net::HTTPResponse#code), so it is
# normalized to an Integer once up front. The previous implementation
# compared the raw String against the Integer range (500..599), which never
# matched — so 5xx responses other than the hard-coded 504 were not retried.
def retryable_http_response_code?(code)
  numeric_code = code.to_i
  [400, 403, 405, 408, 429].include?(numeric_code) || (500..599).cover?(numeric_code)
end

# Builds (once) the callback handed to Retryable's :log_method option.
# Retryable invokes it with the attempt count and the triggering error;
# we record each retry at INFO level on the configured logger.
def retryable_log_method
  return @retryable_log_method if @retryable_log_method

  @retryable_log_method = proc do |retries, error|
    logger.info("Retry attempt #{retries} because #{error.message}")
  end
end

def retreive_result_set(response, streaming)
Expand Down Expand Up @@ -177,14 +226,14 @@ def retreive_partition_data(statement_handle, partition_index)

partition_json = nil
bm = Benchmark.measure { partition_json = Oj.load(partition_response.body, oj_options) }
puts "JSON parsing took: #{bm.real}"
logger.debug { "JSON parsing took: #{bm.real}" }
partition_data = partition_json["data"]

partition_data
end

def number_of_threads_to_use(partition_count)
[[1, (partition_count / THREAD_SCALE_FACTOR.to_f).ceil].max, MAX_THREADS].min
[[1, (partition_count / @thread_scale_factor.to_f).ceil].max, @max_threads_per_query].min
end

def oj_options
Expand Down
19 changes: 12 additions & 7 deletions lib/ruby_snowflake/client/threaded_in_memory_strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ def self.result(statement_json_body, retreive_proc, num_threads)
end
futures.each do |future|
if future.rejected?
raise ConnectionStarvedError.new(
"A partition request timed out. This is usually do to using the client in" \
"multiple threads. The client uses a connection thread pool and if too many" \
"requests are all done in threads at the same time, threads can get starved" \
"of access to connections. The solution for this is to spin up a new client" \
"instance with it's own connection pool to snowflake."
)
if future.reason.is_a? RubySnowflake::Error
raise future.reason
else
raise ConnectionStarvedError.new(
"A partition request timed out. This is usually due to using the client in" \
"multiple threads. The client uses a connection thread pool and if too many" \
"requests are all done in threads at the same time, threads can get starved" \
"of access to connections. The solution for this is to either increase the " \
"max_connections parameter on the client or create a new client instance" \
"with its own connection pool to snowflake per thread. Rejection reason: #{future.reason.message}"
)
end
end
index, partition_data = future.value
result[index] = partition_data
Expand Down
35 changes: 29 additions & 6 deletions spec/ruby_snowflake/client_spec.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
require "spec_helper"

RSpec.describe RubySnowflake::Client do
describe "#fetch" do
let(:client) { described_class.connect }
let(:client) { described_class.connect }

describe "querying" do
let(:query) { "" }
let(:result) { client.query(query) }

Expand Down Expand Up @@ -159,10 +159,10 @@
end
end

context "fetching 150k rows x 100 times" do
context "fetching 150k rows x 20 times" do
let(:limit) { 150_000 }
it "should work" do
100.times do |idx|
20.times do |idx|
client = described_class.connect
result = client.query(query)
rows = result.get_all_rows
Expand All @@ -179,6 +179,8 @@
5.times do |idx|
t << Thread.new do
client = described_class.connect
client.max_threads_per_query = 12
client.max_connections = 12
result = client.query(query)
rows = result.get_all_rows
expect(rows.length).to eq 50_000
Expand All @@ -190,12 +192,14 @@
end
end

context "fetching 150k rows x 10 times - with threads & shared client" do
context "fetching 150k rows x 5 times - with threads & shared client" do
let(:limit) { 150_000 }
it "should work" do
t = []
client = described_class.connect
10.times do |idx|
client.max_threads_per_query = 8
client.max_connections = 40
5.times do |idx|
t << Thread.new do
result = client.query(query)
rows = result.get_all_rows
Expand Down Expand Up @@ -232,4 +236,23 @@
end
end
end

# Shared example: asserts that a client configuration attribute exposes a
# working writer/reader pair — setting +value+ then reading it back must not
# raise. (Does not assert the value round-trips; only that both accessors exist.)
shared_examples "a configuration setting" do |attribute, value|
it "supports configuring #{attribute}" do
expect do
# exercise the writer then the reader dynamically by attribute name
client.send("#{attribute}=", value)
client.send(attribute)
end.not_to raise_error
end
end

# Exercises the getter/setter for every public configuration attribute the
# client exposes via attr_accessor; sample values are arbitrary.
describe "configuration" do
it_behaves_like "a configuration setting", :logger, Logger.new(STDOUT)
it_behaves_like "a configuration setting", :jwt_token_ttl, 43
it_behaves_like "a configuration setting", :connection_timeout, 43
it_behaves_like "a configuration setting", :max_connections, 13
it_behaves_like "a configuration setting", :max_threads_per_query, 6
it_behaves_like "a configuration setting", :thread_scale_factor, 5
it_behaves_like "a configuration setting", :http_retries, 2
end
end
8 changes: 5 additions & 3 deletions spec/streaming_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
require "rb_snowflake_client"

def new_client
RubySnowflake::Client.new(
client = RubySnowflake::Client.new(
"https://oza47907.us-east-1.snowflakecomputing.com",
ENV["SNOWFLAKE_PRIVATE_KEY"], # set this in your .env file
"GBLARLO",
"OZA47907",
"SNOWFLAKE_CLIENT_TEST",
"WEB_TEST_WH"))
"SHA256:pbfmeTQ2+MestU2J9dXjGXTjtvZprYfHxzZzqqcIhFc=",
"WEB_TEST_WH")
client.logger.level = Logger::DEBUG
client
end

size = 1_000
Expand Down
6 changes: 4 additions & 2 deletions spec/test.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
require "benchmark"
require "logger"
require "rb_snowflake_client"


def new_client
RubySnowflake::Client.new(
client = RubySnowflake::Client.new(
"https://oza47907.us-east-1.snowflakecomputing.com",
ENV["SNOWFLAKE_PRIVATE_KEY"], # set this in your .env file
"GBLARLO",
"OZA47907",
"SNOWFLAKE_CLIENT_TEST",
"WEB_TEST_WH")
client.logger.level = Logger::DEBUG
client
end

size = 1_000
Expand Down

0 comments on commit 80f86bc

Please sign in to comment.