From 9e23a60353a0f4c50bc60a6139794fbc479db523 Mon Sep 17 00:00:00 2001 From: Harley McKee Date: Thu, 22 Aug 2019 09:46:53 -0500 Subject: [PATCH] timescale db --- lib/limit_order/coinbase_update.ex | 19 +++++- lib/limit_order/limit_order.ex | 10 ++- .../20190225003708_create_updates.exs | 6 +- .../20190305232946_create_trigger.exs | 62 +++++++++---------- .../20190822005050_add_timescale.exs | 10 +++ 5 files changed, 69 insertions(+), 38 deletions(-) create mode 100644 priv/repo/migrations/20190822005050_add_timescale.exs diff --git a/lib/limit_order/coinbase_update.ex b/lib/limit_order/coinbase_update.ex index 21c5d71..7ea1891 100644 --- a/lib/limit_order/coinbase_update.ex +++ b/lib/limit_order/coinbase_update.ex @@ -5,10 +5,17 @@ defmodule LimitOrder.CoinbaseUpdate do use Ecto.Schema import Ecto.{Query, Changeset}, warn: false + @primary_key {:time, :naive_datetime, []} + + defimpl Phoenix.Param do + def to_param(%{time: time}) do + NaiveDateTime.to_iso8601(time) + end + end + @type t :: %__MODULE__{} schema "coinbase_updates" do field(:type, :string) - field(:time, :string) field(:sequence, :string) field(:trade_id, :integer) field(:product_id, :string) @@ -20,7 +27,7 @@ defmodule LimitOrder.CoinbaseUpdate do field(:size, :string) field(:remaining_size, :string) field(:reason, :string) - field(:price, :string) + field(:price, :float) field(:side, :string) field(:order_type, :string) field(:funds, :string) @@ -60,4 +67,12 @@ defmodule LimitOrder.CoinbaseUpdate do # |> validate_required() end + + defmodule Query do + import Ecto.Query + + def latest(query \\ Trade, count) do + from query, order_by: [desc: :time], limit: ^count + end + end end diff --git a/lib/limit_order/limit_order.ex b/lib/limit_order/limit_order.ex index 8b42042..9d0e390 100644 --- a/lib/limit_order/limit_order.ex +++ b/lib/limit_order/limit_order.ex @@ -69,7 +69,8 @@ defmodule LimitOrder.Coinbase do payload = Jason.decode!(payload) changeset = LimitOrder.CoinbaseUpdate.changeset(%LimitOrder.CoinbaseUpdate{}, payload) - LimitOrder.Repo.insert!(changeset) + LimitOrder.Repo.insert(changeset) + |> IO.inspect() product_id = payload["product_id"] @@ -104,8 +105,13 @@ defmodule LimitOrder.Coinbase do %LimitOrder.CoinbaseUpdate{}, Map.merge(payload, %{"sequence" => sequence |> Integer.to_string()}) ) + |> IO.inspect() - LimitOrder.Repo.insert!(changeset) + IO.puts("dingus") + + LimitOrder.Repo.insert(changeset) + + IO.puts("dingus ok") book_agent = Agent.get(books_agent, &Map.get(&1, product_id)) sequences = Agent.get(books_agent, &Map.get(&1, :sequences)) diff --git a/priv/repo/migrations/20190225003708_create_updates.exs b/priv/repo/migrations/20190225003708_create_updates.exs index de1b9a7..a9744ea 100644 --- a/priv/repo/migrations/20190225003708_create_updates.exs +++ b/priv/repo/migrations/20190225003708_create_updates.exs @@ -2,9 +2,9 @@ defmodule LimitOrder.Repo.Migrations.CreateCoinbaseUpdates do use Ecto.Migration def change do - create table(:coinbase_updates) do + create table(:coinbase_updates, primary_key: false) do add :type, :string - add :time, :string + add :time, :naive_datetime, null: false add :sequence, :string add :trade_id, :integer add :product_id, :string @@ -17,7 +17,7 @@ defmodule LimitOrder.Repo.Migrations.CreateCoinbaseUpdates do add :size, :string add :remaining_size, :string add :reason, :string - add :price, :string + add :price, :float add :side, :string add :order_type, :string add :funds, :string diff --git a/priv/repo/migrations/20190305232946_create_trigger.exs b/priv/repo/migrations/20190305232946_create_trigger.exs index e043e27..f7d230b 100644 --- a/priv/repo/migrations/20190305232946_create_trigger.exs +++ b/priv/repo/migrations/20190305232946_create_trigger.exs @@ -2,37 +2,37 @@ defmodule LimitOrder.Repo.Migrations.CreateTrigger do use Ecto.Migration def change do - execute """ - CREATE OR REPLACE FUNCTION notify_coinbase_updates_changes() - RETURNS trigger AS $$ - DECLARE - current_row RECORD; - BEGIN - IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN - current_row := NEW; - ELSE - current_row := OLD; - END IF; - PERFORM pg_notify( - 'coinbase_updates_changes', - json_build_object( - 'table', TG_TABLE_NAME, - 'type', TG_OP, - 'id', current_row.id, - 'data', row_to_json(current_row) - )::text - ); - RETURN current_row; - END; - $$ LANGUAGE plpgsql; - """ + # execute """ + # CREATE OR REPLACE FUNCTION notify_coinbase_updates_changes() + # RETURNS trigger AS $$ + # DECLARE + # current_row RECORD; + # BEGIN + # IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN + # current_row := NEW; + # ELSE + # current_row := OLD; + # END IF; + # PERFORM pg_notify( + # 'coinbase_updates_changes', + # json_build_object( + # 'table', TG_TABLE_NAME, + # 'type', TG_OP, + # 'id', current_row.id, + # 'data', row_to_json(current_row) + # )::text + # ); + # RETURN current_row; + # END; + # $$ LANGUAGE plpgsql; + # """ - execute """ - CREATE TRIGGER notify_coinbase_updates_changes_trigger - AFTER INSERT OR UPDATE OR DELETE - ON coinbase_updates - FOR EACH ROW - EXECUTE PROCEDURE notify_coinbase_updates_changes(); - """ + # execute """ + # CREATE TRIGGER notify_coinbase_updates_changes_trigger + # AFTER INSERT OR UPDATE OR DELETE + # ON coinbase_updates + # FOR EACH ROW + # EXECUTE PROCEDURE notify_coinbase_updates_changes(); + # """ end end diff --git a/priv/repo/migrations/20190822005050_add_timescale.exs b/priv/repo/migrations/20190822005050_add_timescale.exs new file mode 100644 index 0000000..882b3b1 --- /dev/null +++ b/priv/repo/migrations/20190822005050_add_timescale.exs @@ -0,0 +1,10 @@ +defmodule LimitOrder.Repo.Migrations.AddTimescale do + use Ecto.Migration + + def change do + execute "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE" + execute "SELECT create_hypertable('coinbase_updates', 'time')" + execute "CREATE USER grafana" + execute "GRANT SELECT ON coinbase_updates to grafana" + end +end