Added support for ClickHouse #377

Open · wants to merge 6 commits into master (changes from 5 commits shown)
14 changes: 14 additions & 0 deletions README.md
@@ -566,6 +566,7 @@ data_sources:
- [Apache Ignite](#apache-ignite)
- [Apache Spark](#apache-spark)
- [Cassandra](#cassandra)
- [ClickHouse](#clickhouse)
- [Druid](#druid)
- [Elasticsearch](#elasticsearch)
- [Google BigQuery](#google-bigquery)
@@ -713,6 +714,19 @@ data_sources:
url: cassandra://user:password@hostname:9042/keyspace
```

### ClickHouse

Add [ClickHouse Ruby driver](https://github.com/shlima/click_house) to your Gemfile and set:
```yml
data_sources:
my_source:
adapter: clickhouse
url: http://user:password@hostname:8123/database

# optional settings
ssl_verify: true # false by default
```
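
For reference, the Gemfile entry this refers to would typically be a single line (gem name assumed from the linked repository; pin a version if you need one):

```rb
# Gemfile entry for the ClickHouse driver the adapter relies on (name assumed from the repo above)
gem "click_house"
```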

### Druid

Enable [SQL support](http://druid.io/docs/latest/querying/sql.html#configuration) on the broker and set:
2 changes: 2 additions & 0 deletions lib/blazer.rb
@@ -15,6 +15,7 @@
require "blazer/adapters/athena_adapter"
require "blazer/adapters/bigquery_adapter"
require "blazer/adapters/cassandra_adapter"
require "blazer/adapters/clickhouse_adapter"
require "blazer/adapters/drill_adapter"
require "blazer/adapters/druid_adapter"
require "blazer/adapters/elasticsearch_adapter"
@@ -254,6 +255,7 @@ def self.archive_queries
Blazer.register_adapter "athena", Blazer::Adapters::AthenaAdapter
Blazer.register_adapter "bigquery", Blazer::Adapters::BigQueryAdapter
Blazer.register_adapter "cassandra", Blazer::Adapters::CassandraAdapter
Blazer.register_adapter "clickhouse", Blazer::Adapters::ClickhouseAdapter
Blazer.register_adapter "drill", Blazer::Adapters::DrillAdapter
Blazer.register_adapter "druid", Blazer::Adapters::DruidAdapter
Blazer.register_adapter "elasticsearch", Blazer::Adapters::ElasticsearchAdapter
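
The registration above is what connects `adapter: clickhouse` in `config/blazer.yml` to the new class. A minimal sketch of the resulting mapping, assuming `Blazer.adapters` is the plain name-to-class hash that these `register_adapter` calls populate:

```rb
# Assumed lookup, for illustration only (not part of this diff).
Blazer.adapters["clickhouse"] # => Blazer::Adapters::ClickhouseAdapter
```
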
84 changes: 84 additions & 0 deletions lib/blazer/adapters/clickhouse_adapter.rb
@@ -0,0 +1,84 @@
module Blazer
  module Adapters
    class ClickhouseAdapter < BaseAdapter
      DATE_TIME_TYPES = ["DateTime", "DateTime(%s)", "DateTime64(%d, %s)"].freeze

      # Returns the [columns, rows, error] triple; columns whose ClickHouse type
      # is listed in DATE_TIME_TYPES are converted to Time values.
      def run_statement(statement, _comment)
        columns = []
        rows = []
        error = nil

        begin
          result = connection.select_all(statement)
          unless result.data.blank?
            date_time_columns = result.meta
              .select { |column| column["type"].in?(DATE_TIME_TYPES) }
              .map { |column| column["name"] }
            columns = result.data.first.keys
            rows = result.data.map { |row| convert_time_columns(row, date_time_columns).values }
          end
        rescue => e
          error = e.message
        end

        [columns, rows, error]
      end

      def tables
        connection.tables
      end

      # ClickHouse has no separate schema namespace, so every table is reported under "public".
      def schema
        statement = <<-SQL
          SELECT table, name, type
          FROM system.columns
          WHERE database = currentDatabase()
          ORDER BY table, position
        SQL

        response = connection.post(query: { query: statement, default_format: "CSV" })
        response.body
          .group_by { |row| row[0] }
          .transform_values { |columns| columns.map { |c| { name: c[1], data_type: c[2] } } }
          .map { |table, columns| { schema: "public", table: table, columns: columns } }
      end

      def preview_statement
        "SELECT * FROM {table} LIMIT 10"
      end

      def explain(statement)
        connection.explain(statement)
      end

      protected

      def connection
        @connection ||= ClickHouse::Connection.new(config)
      end

      # Builds the driver config from the data source URL and optional settings.
      def config
        @config ||= begin
          uri = URI.parse(settings["url"])
          options = {
            scheme: uri.scheme,
            host: uri.host,
            port: uri.port,
            username: uri.user,
            password: uri.password,
            database: uri.path.sub(/\A\//, ""),
            ssl_verify: settings.fetch("ssl_verify", false)
          }.compact
          ClickHouse::Config.new(**options)
        end
      end

      def convert_time_columns(row, date_time_columns)
        time_values = row.slice(*date_time_columns).transform_values!(&:to_time)
        row.merge(time_values)
      rescue NoMethodError
        # We cannot be absolutely sure we are handling DateTime objects here, because
        # built-in types can be overridden. If the conversion to Time fails, return
        # the original row unchanged.
        row
      end
    end
  end
end
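
To make the data flow above concrete, here is a rough usage sketch. The table, values, and the `adapter` variable are invented for illustration; the `[columns, rows, error]` shape and the DateTime-to-Time conversion follow the code above:

```rb
# Hypothetical example: `adapter` stands for an instance of ClickhouseAdapter
# wired to a data source; the query and values are made up.
columns, rows, error = adapter.run_statement("SELECT id, created_at FROM events LIMIT 2", nil)
# columns => ["id", "created_at"]
# rows    => [[1, 2021-01-01 00:00:00 +0000], [2, 2021-01-02 00:00:00 +0000]]
# error   => nil
```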