From 86e2d1ca96419ea5e3774b66ab71b108d819e4a7 Mon Sep 17 00:00:00 2001 From: Dmitry Russ Date: Wed, 8 Jun 2016 10:47:15 +0200 Subject: [PATCH] Add json support for MySQL => 5.7.9 --- lib/mariaex/messages.ex | 4 +- lib/mariaex/protocol.ex | 16 +++-- lib/mariaex/query.ex | 125 ++++++++++++++++++++++++---------------- mix.exs | 3 +- mix.lock | 20 ++++--- test/query_test.exs | 14 ++++- 6 files changed, 116 insertions(+), 66 deletions(-) diff --git a/lib/mariaex/messages.ex b/lib/mariaex/messages.ex index c9cd063..fddf76e 100644 --- a/lib/mariaex/messages.ex +++ b/lib/mariaex/messages.ex @@ -66,7 +66,9 @@ defmodule Mariaex.Messages do field_type_var_string: 0xfd, field_type_string: 0xfe], null: - [field_type_null: 0x06] + [field_type_null: 0x06], + json: + [field_type_json: 0xf5] ] def __type__(:decode, _type, nil), do: nil diff --git a/lib/mariaex/protocol.ex b/lib/mariaex/protocol.ex index 925ae9b..fc15881 100644 --- a/lib/mariaex/protocol.ex +++ b/lib/mariaex/protocol.ex @@ -40,7 +40,7 @@ defmodule Mariaex.Protocol do defstruct [sock: nil, connection_ref: nil, state: nil, state_data: nil, protocol57: false, rows: [], connection_id: nil, opts: [], catch_eof: false, buffer: "", timeout: 0, - lru_cache: nil, cache: nil, seqnum: 0] + lru_cache: nil, cache: nil, seqnum: 0, json_library: nil] @doc """ DBConnection callback @@ -50,6 +50,7 @@ defmodule Mariaex.Protocol do sock_type = opts[:sock_type] |> Atom.to_string |> String.capitalize() sock_mod = Module.concat(Mariaex.Connection, sock_type) host = opts[:hostname] + json_library = opts[:json_library] host = if is_binary(host), do: String.to_char_list(host), else: host connect_opts = [host, opts[:port], opts[:socket_options], opts[:timeout]] @@ -62,7 +63,8 @@ defmodule Mariaex.Protocol do cache: Cache.new(), lru_cache: LruCache.new(opts[:cache_size]), timeout: opts[:timeout], - opts: opts} + opts: opts, + json_library: json_library} handshake_recv(s, %{opts: opts}) {:error, reason} -> {:error, %Mariaex.Error{message: "tcp connect: #{reason}"}} @@ -220,13 +222,17 @@ defmodule Mariaex.Protocol do def handle_prepare(%Query{type: :text} = query, _, s) do {:ok, query, s} end - def handle_prepare(%Query{type: :binary, statement: statement} = query, _, %{connection_ref: ref} = s) do + def handle_prepare(%Query{type: :binary, statement: statement} = query, _, %{connection_ref: ref, json_library: json_library} = s) do case cache_lookup(query, s) do {id, types, parameter_types} -> - {:ok, %{query | statement_id: id, types: types, parameter_types: parameter_types, connection_ref: ref}, s} + {:ok, %{query | statement_id: id, + types: types, + parameter_types: parameter_types, + connection_ref: ref, + json_library: json_library}, s} nil -> msg_send(text_cmd(command: com_stmt_prepare, statement: statement), s, 0) - prepare_recv(%{s | state: :prepare_send}, query) + prepare_recv(%{s | state: :prepare_send}, %{query | json_library: json_library}) end end diff --git a/lib/mariaex/query.ex b/lib/mariaex/query.ex index 0e232ee..c44bb50 100644 --- a/lib/mariaex/query.ex +++ b/lib/mariaex/query.ex @@ -19,7 +19,8 @@ defmodule Mariaex.Query do statement_id: nil, parameter_types: [], types: [], - connection_ref: nil + connection_ref: nil, + json_library: nil end defimpl DBConnection.Query, for: Mariaex.Query do @@ -57,9 +58,9 @@ defimpl DBConnection.Query, for: Mariaex.Query do def encode(%Mariaex.Query{types: nil} = query, _params, _opts) do raise ArgumentError, "query #{inspect query} has not been prepared" end - def encode(%Mariaex.Query{type: :binary, parameter_types: parameter_types} = query, params, _opts) do + def encode(%Mariaex.Query{type: :binary, parameter_types: parameter_types, json_library: json_library} = query, params, _opts) do if length(params) == length(parameter_types) do - parameter_types |> Enum.zip(params) |> parameters_to_binary() + parameter_types |> Enum.zip(params) |> parameters_to_binary(json_library) else raise ArgumentError, "parameters must be of length #{length params} for query #{inspect query}" end @@ -68,15 +69,15 @@ defimpl DBConnection.Query, for: Mariaex.Query do params end - defp parameters_to_binary([]), do: <<>> - defp parameters_to_binary(params) do + defp parameters_to_binary([], _json_library), do: <<>> + defp parameters_to_binary(params, json_library) do set = {<<>>, <<>>, <<>>} - {nullbits, typesbin, valuesbin} = Enum.reduce(params, set, fn(p, acc) -> encode_params(p, acc) end) + {nullbits, typesbin, valuesbin} = Enum.reduce(params, set, fn(p, acc) -> encode_params(p, json_library, acc) end) << null_map_to_mysql(nullbits, <<>>) :: binary, 1 :: 8, typesbin :: binary, valuesbin :: binary >> end - defp encode_params({_, param}, {nullbits, typesbin, valuesbin}) do - {nullbit, type, value} = encode_param(param) + defp encode_params({_, param}, json_library, {nullbits, typesbin, valuesbin}) do + {nullbit, type, value} = encode_param(param, json_library) types_part = case type do :field_type_longlong -> @@ -97,33 +98,37 @@ defimpl DBConnection.Query, for: Mariaex.Query do } end - defp encode_param(nil), + defp encode_param(nil, _), do: {1, :field_type_null, ""} - defp encode_param(bin) when is_binary(bin), + defp encode_param(json, json_library) when is_map(json) do + bin = json_library.encode!(json) + {0, :field_type_var_string, << to_length_encoded_integer(byte_size(bin)) :: binary, bin :: binary >>} + end + defp encode_param(bin, _) when is_binary(bin), do: {0, :field_type_var_string, << to_length_encoded_integer(byte_size(bin)) :: binary, bin :: binary >>} - defp encode_param(int) when is_integer(int), + defp encode_param(int, _) when is_integer(int), do: {0, :field_type_longlong, << int :: 64-little >>} - defp encode_param(float) when is_float(float), + defp encode_param(float, _) when is_float(float), do: {0, :field_type_double, << float :: 64-little-float >>} - defp encode_param(true), + defp encode_param(true, _), do: {0, :field_type_tiny, << 01 >>} - defp encode_param(false), + defp encode_param(false, _), do: {0, :field_type_tiny, << 00 >>} - defp encode_param(%Decimal{} = value) do + defp encode_param(%Decimal{} = value, _) do bin = Decimal.to_string(value, :normal) {0, :field_type_newdecimal, << to_length_encoded_integer(byte_size(bin)) :: binary, bin :: binary >>} end - defp encode_param({year, month, day}), + defp encode_param({year, month, day}, _), do: {0, :field_type_date, << 4::8-little, year::16-little, month::8-little, day::8-little>>} - defp encode_param({hour, min, sec, 0}), + defp encode_param({hour, min, sec, 0}, _), do: {0, :field_type_time, << 8 :: 8-little, 0 :: 8-little, 0 :: 32-little, hour :: 8-little, min :: 8-little, sec :: 8-little >>} - defp encode_param({hour, min, sec, msec}), + defp encode_param({hour, min, sec, msec}, _), do: {0, :field_type_time, << 12 :: 8-little, 0 :: 8-little, 0 :: 32-little, hour :: 8-little, min :: 8-little, sec :: 8-little, msec :: 32-little>>} - defp encode_param({{year, month, day}, {hour, min, sec, 0}}), + defp encode_param({{year, month, day}, {hour, min, sec, 0}}, _), do: {0, :field_type_datetime, << 7::8-little, year::16-little, month::8-little, day::8-little, hour::8-little, min::8-little, sec::8-little>>} - defp encode_param({{year, month, day}, {hour, min, sec, msec}}), + defp encode_param({{year, month, day}, {hour, min, sec, msec}}, _), do: {0, :field_type_datetime, <<11::8-little, year::16-little, month::8-little, day::8-little, hour::8-little, min::8-little, sec::8-little, msec::32-little>>} - defp encode_param(other), + defp encode_param(other, _), do: raise ArgumentError, "query has invalid parameter #{inspect other}" defp null_map_to_mysql(<>, acc) do @@ -146,7 +151,7 @@ defimpl DBConnection.Query, for: Mariaex.Query do @unsigned_flag 0x20 def decode(_, %{rows: nil} = res, _), do: res - def decode(%Mariaex.Query{statement: statement}, {res, types}, opts) do + def decode(%Mariaex.Query{statement: statement, json_library: json_library}, {res, types}, opts) do command = Mariaex.Protocol.get_command(statement) if command in @commands_without_rows do %Mariaex.Result{res | command: command, rows: nil} @@ -154,7 +159,7 @@ defimpl DBConnection.Query, for: Mariaex.Query do mapper = opts[:decode_mapper] || fn x -> x end %Mariaex.Result{rows: rows} = res types = Enum.reverse(types) - decoded = do_decode(rows, types, mapper) + decoded = do_decode(rows, json_library, types, mapper) include_table_name = opts[:include_table_name] columns = for %Column{} = column <- types, do: column_name(column, include_table_name) %Mariaex.Result{res | command: command, @@ -169,43 +174,60 @@ defimpl DBConnection.Query, for: Mariaex.Query do defp column_name(%Column{name: name, table: table}, true), do: "#{table}.#{name}" defp column_name(%Column{name: name}, _), do: name - def do_decode(_, types, mapper \\ fn x -> x end) - def do_decode(rows, types, mapper) do - rows |> Enum.reduce([], &([(decode_bin_rows(&1, types) |> mapper.()) | &2])) + def do_decode(_, json_library, types, mapper \\ fn x -> x end) + def do_decode(rows, json_library, types, mapper) do + rows |> Enum.reduce([], &([(decode_bin_rows(&1, types, json_library) |> mapper.()) | &2])) end - def decode_bin_rows(packet, fields) do + def decode_bin_rows(packet, fields, json_library) do nullbin_size = div(length(fields) + 7 + 2, 8) << 0 :: 8, nullbin :: size(nullbin_size)-binary, rest :: binary >> = packet nullbin = null_map_from_mysql(nullbin) - decode_bin_rows(rest, fields, nullbin, []) + decode_bin_rows(rest, fields, nullbin, [], json_library) end - def decode_bin_rows(<<>>, [], _, acc) do + def decode_bin_rows(<<>>, [], _, acc, _) do Enum.reverse(acc) end - def decode_bin_rows(packet, [_ | fields], << 1 :: 1, nullrest :: bits >>, acc) do - decode_bin_rows(packet, fields, nullrest, [nil | acc]) + def decode_bin_rows(packet, [_ | fields], << 1 :: 1, nullrest :: bits >>, acc, json_library) do + decode_bin_rows(packet, fields, nullrest, [nil | acc], json_library) end - def decode_bin_rows(packet, [%Column{type: type, flags: flags} | fields], << 0 :: 1, nullrest :: bits >>, acc) do - {value, next} = handle_decode_bin_rows(Messages.__type__(:type, type), packet, flags) - decode_bin_rows(next, fields, nullrest, [value | acc]) + def decode_bin_rows(packet, [%Column{type: type, flags: flags} | fields], << 0 :: 1, nullrest :: bits >>, acc, json_library) do + {value, next} = handle_decode_bin_rows(Messages.__type__(:type, type), packet, flags, json_library) + decode_bin_rows(next, fields, nullrest, [value | acc], json_library) end - defp handle_decode_bin_rows({:string, _mysql_type}, packet, _), do: length_encoded_string(packet) - defp handle_decode_bin_rows({:integer, :field_type_tiny}, packet, flags), do: parse_int_packet(packet, 8, flags) - defp handle_decode_bin_rows({:integer, :field_type_short}, packet, flags), do: parse_int_packet(packet, 16, flags) - defp handle_decode_bin_rows({:integer, :field_type_int24}, packet, flags), do: parse_int_packet(packet, 32, flags) - defp handle_decode_bin_rows({:integer, :field_type_long}, packet, flags), do: parse_int_packet(packet, 32, flags) - defp handle_decode_bin_rows({:integer, :field_type_longlong}, packet, flags), do: parse_int_packet(packet, 64, flags) - defp handle_decode_bin_rows({:integer, :field_type_year}, packet, flags), do: parse_int_packet(packet, 16, flags) - defp handle_decode_bin_rows({:time, :field_type_time}, packet, _), do: parse_time_packet(packet) - defp handle_decode_bin_rows({:date, :field_type_date}, packet, _), do: parse_date_packet(packet) - defp handle_decode_bin_rows({:timestamp, :field_type_datetime}, packet, _), do: parse_datetime_packet(packet) - defp handle_decode_bin_rows({:timestamp, :field_type_timestamp}, packet, _), do: parse_datetime_packet(packet) - defp handle_decode_bin_rows({:decimal, :field_type_newdecimal}, packet, _), do: parse_decimal_packet(packet) - defp handle_decode_bin_rows({:float, :field_type_float}, packet, _), do: parse_float_packet(packet, 32) - defp handle_decode_bin_rows({:float, :field_type_double}, packet, _), do: parse_float_packet(packet, 64) - defp handle_decode_bin_rows({:bit, :field_type_bit}, packet, _), do: parse_bit_packet(packet) + defp handle_decode_bin_rows({:string, _mysql_type}, packet, _, _), + do: length_encoded_string(packet) + defp handle_decode_bin_rows({:integer, :field_type_tiny}, packet, flags, _), + do: parse_int_packet(packet, 8, flags) + defp handle_decode_bin_rows({:integer, :field_type_short}, packet, flags, _), + do: parse_int_packet(packet, 16, flags) + defp handle_decode_bin_rows({:integer, :field_type_int24}, packet, flags, _), + do: parse_int_packet(packet, 32, flags) + defp handle_decode_bin_rows({:integer, :field_type_long}, packet, flags, _), + do: parse_int_packet(packet, 32, flags) + defp handle_decode_bin_rows({:integer, :field_type_longlong}, packet, flags, _), + do: parse_int_packet(packet, 64, flags) + defp handle_decode_bin_rows({:integer, :field_type_year}, packet, flags, _), + do: parse_int_packet(packet, 16, flags) + defp handle_decode_bin_rows({:time, :field_type_time}, packet, _, _), + do: parse_time_packet(packet) + defp handle_decode_bin_rows({:date, :field_type_date}, packet, _, _), + do: parse_date_packet(packet) + defp handle_decode_bin_rows({:timestamp, :field_type_datetime}, packet, _, _), + do: parse_datetime_packet(packet) + defp handle_decode_bin_rows({:timestamp, :field_type_timestamp}, packet, _, _), + do: parse_datetime_packet(packet) + defp handle_decode_bin_rows({:decimal, :field_type_newdecimal}, packet, _, _), + do: parse_decimal_packet(packet) + defp handle_decode_bin_rows({:float, :field_type_float}, packet, _, _), + do: parse_float_packet(packet, 32) + defp handle_decode_bin_rows({:float, :field_type_double}, packet, _, _), + do: parse_float_packet(packet, 64) + defp handle_decode_bin_rows({:bit, :field_type_bit}, packet, _, _), + do: parse_bit_packet(packet) + defp handle_decode_bin_rows({:json, :field_type_json}, packet, _, json_library), + do: parse_json_packet(packet, json_library) defp parse_float_packet(packet, size) do << value :: size(size)-float-little, rest :: binary >> = packet @@ -267,6 +289,11 @@ defimpl DBConnection.Query, for: Mariaex.Query do {bitstring, rest} end + defp parse_json_packet(packet, json_library) do + {string, rest} = length_encoded_string(packet) + {json_library.decode!(string), rest} + end + defp null_map_from_mysql(nullbin) do << f :: 1, e :: 1, d :: 1, c :: 1, b :: 1, a ::1, _ :: 2, rest :: binary >> = nullbin reversebin = for << x :: 8-bits <- rest >>, into: <<>> do diff --git a/mix.exs b/mix.exs index 50c4e64..52f83a3 100644 --- a/mix.exs +++ b/mix.exs @@ -21,7 +21,8 @@ defmodule Mariaex.Mixfile do defp deps do [{:decimal, "~> 1.0"}, {:db_connection, "~> 0.2"}, - {:coverex, "~> 1.4.3", only: :test}] + {:coverex, "~> 1.4.3", only: :test}, + {:poison, "~> 2.1", optional: true}] end defp description do diff --git a/mix.lock b/mix.lock index 4368d50..7d53e3b 100644 --- a/mix.lock +++ b/mix.lock @@ -1,10 +1,12 @@ -%{"backoff": {:hex, :backoff, "1.1.1"}, +%{"certifi": {:hex, :certifi, "0.4.0"}, "connection": {:hex, :connection, "1.0.2"}, - "coverex": {:hex, :coverex, "1.4.3"}, - "db_connection": {:hex, :db_connection, "0.2.0"}, - "decimal": {:hex, :decimal, "1.1.0"}, - "hackney": {:hex, :hackney, "1.3.2"}, - "httpoison": {:hex, :httpoison, "0.7.2"}, - "idna": {:hex, :idna, "1.0.2"}, - "poison": {:hex, :poison, "1.4.0"}, - "ssl_verify_hostname": {:hex, :ssl_verify_hostname, "1.0.5"}} + "coverex": {:hex, :coverex, "1.4.9"}, + "db_connection": {:hex, :db_connection, "0.2.5"}, + "decimal": {:hex, :decimal, "1.1.2"}, + "hackney": {:hex, :hackney, "1.6.0"}, + "httpoison": {:hex, :httpoison, "0.8.3"}, + "idna": {:hex, :idna, "1.2.0"}, + "metrics": {:hex, :metrics, "1.0.1"}, + "mimerl": {:hex, :mimerl, "1.0.2"}, + "poison": {:hex, :poison, "2.1.0"}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.0"}} diff --git a/test/query_test.exs b/test/query_test.exs index 51a0873..91b5b89 100644 --- a/test/query_test.exs +++ b/test/query_test.exs @@ -3,7 +3,11 @@ defmodule QueryTest do import Mariaex.TestHelper setup do - opts = [database: "mariaex_test", username: "mariaex_user", password: "mariaex_pass", backoff_type: :stop] + opts = [database: "mariaex_test", + username: "mariaex_user", + password: "mariaex_pass", + backoff_type: :stop, + json_library: Poison] {:ok, pid} = Mariaex.Connection.start_link(opts) {:ok, [pid: pid]} end @@ -558,4 +562,12 @@ defmodule QueryTest do assert :ok = query("REPLACE INTO test_replace VALUES (1, 'Old', '2014-08-20 18:47:00');", []) assert :ok = query("REPLACE INTO test_replace VALUES (1, 'New', ?);", [timestamp]) end + + if System.get_env "MYSQL_JSON" do + test "support json for MySQL > 5.1.9", context do + assert :ok == query("CREATE TABLE json_test (jdoc JSON)", []) + assert :ok == query("INSERT INTO json_test VALUES(?)", [%{"test" => "test"}]) + assert [[%{"test" => "test"}]] == query("SELECT * FROM json_test", []) + end + end end