diff --git a/lib/ruby_snowflake/client/threaded_in_memory_strategy.rb b/lib/ruby_snowflake/client/threaded_in_memory_strategy.rb index 88c0e7c..e3e9934 100644 --- a/lib/ruby_snowflake/client/threaded_in_memory_strategy.rb +++ b/lib/ruby_snowflake/client/threaded_in_memory_strategy.rb @@ -9,31 +9,31 @@ def self.result(statement_json_body, retreive_proc, num_threads) result[0] = statement_json_body["data"] thread_pool = Concurrent::FixedThreadPool.new(num_threads) - futures = [] - partitions.each_with_index do |partition, index| - next if index == 0 # already have the first partition - futures << Concurrent::Future.execute(executor: thread_pool) do - [index, retreive_proc.call(index)] + partitions + .each_with_index.map do |partition, index| + next if index == 0 # already have the first partition + [index, Concurrent::Future.execute(executor: thread_pool) { retreive_proc.call(index) }] end - end - futures.each do |future| - if future.rejected? - if future.reason.is_a? RubySnowflake::Error - raise future.reason - else - raise ConnectionStarvedError.new( - "A partition request timed out. This is usually do to using the client in" \ - "multiple threads. The client uses a connection thread pool and if too many" \ - "requests are all done in threads at the same time, threads can get starved" \ - "of access to connections. The solution for this is to either increase the " \ - "max_connections parameter on the client or create a new client instance" \ - "with it's own connection pool to snowflake per thread. Rejection reason: #{future.reason.message}" - ) + .each do |entry| + next if entry.nil? # 0th index + + index, future = entry + if future.rejected? + if future.reason.is_a? RubySnowflake::Error + raise future.reason + else + raise ConnectionStarvedError.new( + "A partition request timed out. This is usually do to using the client in" \ + "multiple threads. The client uses a connection thread pool and if too many" \ + "requests are all done in threads at the same time, threads can get starved" \ + "of access to connections. The solution for this is to either increase the " \ + "max_connections parameter on the client or create a new client instance" \ + "with it's own connection pool to snowflake per thread. Rejection reason: #{future.reason.message}" + ) + end end + result[index] = future.value end - index, partition_data = future.value - result[index] = partition_data - end result end end