Skip to content

Commit

Permalink
FIX: Decimal insert from JSON (#70)
Browse files Browse the repository at this point in the history
* FIX: Decimal insert from JSON

* add version

* make it work

* missing test files
  • Loading branch information
Zarathustra2 authored Apr 12, 2023
1 parent fd2aa4a commit 8c01e73
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 32 deletions.
21 changes: 19 additions & 2 deletions lib/pillar.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Pillar do
alias Pillar.HttpClient
alias Pillar.QueryBuilder
alias Pillar.ResponseParser
alias Pillar.Util

@default_timeout_ms 5_000

Expand All @@ -13,9 +14,25 @@ defmodule Pillar do
execute_sql(connection, final_sql, options)
end

def insert_to_table(%Connection{} = connection, table_name, record_or_records, options \\ %{})
def insert_to_table(
%Connection{version: version} = connection,
table_name,
record_or_records,
options \\ %{}
)
when is_binary(table_name) do
final_sql = QueryBuilder.insert_to_table(table_name, record_or_records)
query_options = Map.get(options, :query_options, %{})

final_sql =
QueryBuilder.insert_to_table(table_name, record_or_records, version, query_options)

options =
if Util.has_input_format_json_read_numbers_as_strings?(version) do
Map.put(options, :input_format_json_read_numbers_as_strings, true)
else
options
end

execute_sql(connection, final_sql, options)
end

Expand Down
32 changes: 30 additions & 2 deletions lib/pillar/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ defmodule Pillar.Connection do
Structure with connection config, such as host, port, user, password and other
"""

require Logger

@boolean_to_clickhouse %{
true => 1,
false => 0
Expand All @@ -16,7 +18,8 @@ defmodule Pillar.Connection do
user: String.t(),
database: String.t(),
max_query_size: integer() | nil,
allow_suspicious_low_cardinality_types: boolean() | nil
allow_suspicious_low_cardinality_types: boolean() | nil,
version: Version.t()
}
defstruct host: nil,
port: nil,
Expand All @@ -25,7 +28,8 @@ defmodule Pillar.Connection do
user: nil,
database: nil,
max_query_size: nil,
allow_suspicious_low_cardinality_types: nil
allow_suspicious_low_cardinality_types: nil,
version: nil

@doc """
Generates Connection from typical connection string:
Expand Down Expand Up @@ -61,6 +65,7 @@ defmodule Pillar.Connection do
password: password,
max_query_size: nil_or_string_to_int(params["max_query_size"])
}
|> add_version()
end

def url_from_connection(%__MODULE__{} = connect_config, options \\ %{}) do
Expand All @@ -87,6 +92,24 @@ defmodule Pillar.Connection do
URI.to_string(uri_struct)
end

defp add_version(conn) do
case Pillar.query(conn, "select version();") do
{:ok, version} ->
version =
String.replace(version, "\n", "")
|> String.split(".")
|> Enum.take(3)
|> Enum.join(".")

%{conn | version: Version.parse!(version)}

{:error, msg} ->
Logger.error("Failed to get version of clickhouse database #{inspect(msg)}")

conn
end
end

defp parse_options(params, %{db_side_batch_insertions: true} = options) do
Map.put(params, "async_insert", 1)
|> parse_options(Map.delete(options, :db_side_batch_insertions))
Expand All @@ -97,6 +120,11 @@ defmodule Pillar.Connection do
|> parse_options(Map.delete(options, :allow_experimental_object_type))
end

defp parse_options(params, %{input_format_json_read_numbers_as_strings: true} = options) do
Map.put(params, "input_format_json_read_numbers_as_strings", 1)
|> parse_options(Map.delete(options, :input_format_json_read_numbers_as_strings))
end

defp parse_options(params, _options), do: params

defp nil_or_string_to_int(value) do
Expand Down
19 changes: 13 additions & 6 deletions lib/pillar/query_builder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@ defmodule Pillar.QueryBuilder do
end
end

def insert_to_table(table_name, record) when is_map(record) do
converted_value = convert_values_to_clickhouse_for_json_insert(record)
def insert_to_table(table_name, records, db_version \\ nil, query_options \\ %{})

def insert_to_table(table_name, record, db_version, query_options) when is_map(record) do
converted_value =
convert_values_to_clickhouse_for_json_insert(record, db_version, query_options)

generate_json_insert_query(table_name, List.wrap(converted_value))
end

def insert_to_table(table_name, records) when is_list(records) do
converted_values = Enum.map(records, &convert_values_to_clickhouse_for_json_insert/1)
def insert_to_table(table_name, records, db_version, query_options) when is_list(records) do
converted_values =
Enum.map(
records,
&convert_values_to_clickhouse_for_json_insert(&1, db_version, query_options)
)

generate_json_insert_query(table_name, converted_values)
end
Expand All @@ -41,11 +48,11 @@ defmodule Pillar.QueryBuilder do
Enum.join(sql_strings, "\n")
end

defp convert_values_to_clickhouse_for_json_insert(map) do
defp convert_values_to_clickhouse_for_json_insert(map, db_version, query_options) do
map
|> Enum.reject(fn {_key, value} -> is_nil(value) end)
|> Enum.map(fn {key, value} ->
{key, ToClickhouseJson.convert(value)}
{key, ToClickhouseJson.convert(value, db_version, query_options)}
end)
|> Map.new()
end
Expand Down
53 changes: 38 additions & 15 deletions lib/pillar/type_convert/to_clickhouse_json.ex
Original file line number Diff line number Diff line change
@@ -1,62 +1,85 @@
defmodule Pillar.TypeConvert.ToClickhouseJson do
require Decimal

alias Pillar.Util

def convert(param, db_version \\ nil, query_options \\ %{})

@moduledoc false
def convert(param) when is_list(param) do
def convert(param, db_version, query_options) when is_list(param) do
if !Enum.empty?(param) && Keyword.keyword?(param) do
param
|> Enum.map(fn {k, v} -> {to_string(k), convert(v)} end)
|> Enum.map(fn
{k, d} -> {to_string(k), convert(d, db_version, query_options)}
end)
|> Enum.into(%{})
else
Enum.map(param, &convert/1)
Enum.map(param, fn d ->
convert(d, db_version, query_options)
end)
end
end

def convert(param, db_version, query_options) when Decimal.is_decimal(param) do
cond do
Util.has_input_format_json_read_numbers_as_strings?(db_version) ->
Decimal.to_string(param)

Map.get(query_options, :decimal_as_float) == true ->
Decimal.to_float(param)

:else ->
raise "Your clickhouse version #{db_version} does not support inserting decimals as strings. You can allow converting decimals to floats by passing the option `query_options: %{decimal_as_float: true}` to insert_to_table"
end
end

def convert(param) when is_integer(param) do
def convert(param, _, _) when is_integer(param) do
Integer.to_string(param)
end

def convert(param) when is_boolean(param) do
def convert(param, _, _) when is_boolean(param) do
case param do
true -> 1
false -> 0
end
end

def convert(nil), do: nil
def convert(nil, _, _), do: nil

def convert(param) when is_atom(param) do
def convert(param, _, _) when is_atom(param) do
Atom.to_string(param)
end

def convert(param) when is_float(param) do
def convert(param, _, _) when is_float(param) do
Float.to_string(param)
end

def convert(%DateTime{} = datetime) do
def convert(%DateTime{} = datetime, _, _) do
datetime
|> DateTime.truncate(:second)
|> DateTime.to_iso8601()
|> String.replace("Z", "")
end

def convert(%Date{} = date) do
def convert(%Date{} = date, _, _) do
date
|> Date.to_iso8601()
end

def convert(param) when is_map(param) do
def convert(param, db_version, query_options) when is_map(param) do
json = Jason.encode!(param)
convert(json)
convert(json, db_version, query_options)
end

def convert(param) when is_binary(param) do
def convert(param, _, _) when is_binary(param) do
param
end

def convert({:json, param}) do
def convert({:json, param}, _, _) do
param
end

def convert(param) do
def convert(param, _, _) do
param
end
end
9 changes: 9 additions & 0 deletions lib/pillar/util.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Pillar.Util do
def has_input_format_json_read_numbers_as_strings?(%Version{} = version) do
cond do
is_nil(version) -> false
Version.compare(version, "23.0.0") != :lt -> true
:else -> false
end
end
end
15 changes: 9 additions & 6 deletions test/pillar/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ defmodule Pillar.ConnectionTest do
user: "user",
password: "password",
port: 8123,
max_query_size: 1024
} ==
max_query_size: 1024,
version: _
} =
Connection.new(
"https://user:password@localhost:8123/some_database?max_query_size=1024"
)
Expand All @@ -24,8 +25,9 @@ defmodule Pillar.ConnectionTest do
scheme: "http",
user: "alice",
password: nil,
port: 8123
} == Connection.new("http://alice@localhost:8123")
port: 8123,
version: _
} = Connection.new("http://alice@localhost:8123")
end

test "#new - minimum required params" do
Expand All @@ -35,8 +37,9 @@ defmodule Pillar.ConnectionTest do
scheme: "http",
user: nil,
password: nil,
port: 8123
} == Connection.new("http://localhost:8123")
port: 8123,
version: _
} = Connection.new("http://localhost:8123")
end

describe "#url_from_connection" do
Expand Down
2 changes: 1 addition & 1 deletion test/pillar/query_builder_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ defmodule Pillar.QueryBuilderTest do
"FORMAT JSONEachRow",
"{\"field_1\":\"1\",\"field_2\":\"2\",\"field_3\":\"3\"} {\"field_2\":\"2\",\"field_3\":\"4\"} {\"field_1\":\"1\"} {\"field_2\":\"2\"} {\"field_3\":\"4\"} {}"
] ==
String.split(QueryBuilder.insert_to_table(table_name, values), "\n")
String.split(QueryBuilder.insert_to_table(table_name, values, nil), "\n")
end
end
end
33 changes: 33 additions & 0 deletions test/pillar_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,39 @@ defmodule PillarTest do
Pillar.select(conn, "select * from #{table_name}")
end

test "insert keyword as decimal", %{conn: conn} do
table_name = "to_table_inserts_decimal_#{@timestamp}"

create_table_sql = """
CREATE TABLE IF NOT EXISTS #{table_name} (
field1 Decimal32(4)
) ENGINE = Memory
"""

%{"major" => major} = version(conn)

assert {:ok, ""} = Pillar.query(conn, create_table_sql)

record = %{
"field1" => Decimal.new(1)
}

if major >= 23 do
assert {:ok, ""} = Pillar.insert_to_table(conn, table_name, [record])
assert {:ok, [^record]} = Pillar.select(conn, "select * from #{table_name}")
else
assert_raise RuntimeError, fn ->
Pillar.insert_to_table(conn, table_name, [record])
end

Pillar.insert_to_table(conn, table_name, [record], %{
query_options: %{decimal_as_float: true}
})

assert {:ok, [^record]} = Pillar.select(conn, "select * from #{table_name}")
end
end

test "insert keyword as map", %{conn: conn} do
%{"major" => major} = version(conn)

Expand Down

0 comments on commit 8c01e73

Please sign in to comment.