Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

timescale db #3

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions lib/limit_order/coinbase_update.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,17 @@ defmodule LimitOrder.CoinbaseUpdate do
use Ecto.Schema
import Ecto.{Query, Changeset}, warn: false

@primary_key {:time, :naive_datetime, []}

defimpl Phoenix.Param do
def to_param(%{time: time}) do
NaiveDateTime.to_iso8601(time)
end
end

@type t :: %__MODULE__{}
schema "coinbase_updates" do
field(:type, :string)
field(:time, :string)
field(:sequence, :string)
field(:trade_id, :integer)
field(:product_id, :string)
Expand All @@ -20,7 +27,7 @@ defmodule LimitOrder.CoinbaseUpdate do
field(:size, :string)
field(:remaining_size, :string)
field(:reason, :string)
field(:price, :string)
field(:price, :float)
field(:side, :string)
field(:order_type, :string)
field(:funds, :string)
Expand Down Expand Up @@ -60,4 +67,12 @@ defmodule LimitOrder.CoinbaseUpdate do

# |> validate_required()
end

defmodule Query do
import Ecto.Query

def latest(query \\ Trade, count) do
from query, order_by: [desc: :time], limit: ^count
end
end
end
10 changes: 8 additions & 2 deletions lib/limit_order/limit_order.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ defmodule LimitOrder.Coinbase do
payload = Jason.decode!(payload)
changeset = LimitOrder.CoinbaseUpdate.changeset(%LimitOrder.CoinbaseUpdate{}, payload)

LimitOrder.Repo.insert!(changeset)
LimitOrder.Repo.insert(changeset)
|> IO.inspect()

product_id = payload["product_id"]

Expand Down Expand Up @@ -104,8 +105,13 @@ defmodule LimitOrder.Coinbase do
%LimitOrder.CoinbaseUpdate{},
Map.merge(payload, %{"sequence" => sequence |> Integer.to_string()})
)
|> IO.inspect()

LimitOrder.Repo.insert!(changeset)
IO.puts("dingus")

LimitOrder.Repo.insert(changeset)

IO.puts("dingus ok")

book_agent = Agent.get(books_agent, &Map.get(&1, product_id))
sequences = Agent.get(books_agent, &Map.get(&1, :sequences))
Expand Down
6 changes: 3 additions & 3 deletions priv/repo/migrations/20190225003708_create_updates.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ defmodule LimitOrder.Repo.Migrations.CreateCoinbaseUpdates do
use Ecto.Migration

def change do
create table(:coinbase_updates) do
create table(:coinbase_updates, primary_key: false) do
add :type, :string
add :time, :string
add :time, :naive_datetime, null: false
add :sequence, :string
add :trade_id, :integer
add :product_id, :string
Expand All @@ -17,7 +17,7 @@ defmodule LimitOrder.Repo.Migrations.CreateCoinbaseUpdates do
add :size, :string
add :remaining_size, :string
add :reason, :string
add :price, :string
add :price, :float
add :side, :string
add :order_type, :string
add :funds, :string
Expand Down
62 changes: 31 additions & 31 deletions priv/repo/migrations/20190305232946_create_trigger.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,37 @@ defmodule LimitOrder.Repo.Migrations.CreateTrigger do
use Ecto.Migration

def change do
execute """
CREATE OR REPLACE FUNCTION notify_coinbase_updates_changes()
RETURNS trigger AS $$
DECLARE
current_row RECORD;
BEGIN
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
current_row := NEW;
ELSE
current_row := OLD;
END IF;
PERFORM pg_notify(
'coinbase_updates_changes',
json_build_object(
'table', TG_TABLE_NAME,
'type', TG_OP,
'id', current_row.id,
'data', row_to_json(current_row)
)::text
);
RETURN current_row;
END;
$$ LANGUAGE plpgsql;
"""
# execute """
# CREATE OR REPLACE FUNCTION notify_coinbase_updates_changes()
# RETURNS trigger AS $$
# DECLARE
# current_row RECORD;
# BEGIN
# IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
# current_row := NEW;
# ELSE
# current_row := OLD;
# END IF;
# PERFORM pg_notify(
# 'coinbase_updates_changes',
# json_build_object(
# 'table', TG_TABLE_NAME,
# 'type', TG_OP,
# 'id', current_row.id,
# 'data', row_to_json(current_row)
# )::text
# );
# RETURN current_row;
# END;
# $$ LANGUAGE plpgsql;
# """

execute """
CREATE TRIGGER notify_coinbase_updates_changes_trigger
AFTER INSERT OR UPDATE OR DELETE
ON coinbase_updates
FOR EACH ROW
EXECUTE PROCEDURE notify_coinbase_updates_changes();
"""
# execute """
# CREATE TRIGGER notify_coinbase_updates_changes_trigger
# AFTER INSERT OR UPDATE OR DELETE
# ON coinbase_updates
# FOR EACH ROW
# EXECUTE PROCEDURE notify_coinbase_updates_changes();
# """
end
end
10 changes: 10 additions & 0 deletions priv/repo/migrations/20190822005050_add_timescale.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
defmodule LimitOrder.Repo.Migrations.AddTimescale do
use Ecto.Migration

def change do
execute "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE"
execute "SELECT create_hypertable('coinbase_updates', 'time')"
execute "CREATE USER grafana"
execute "GRANT SELECT ON coinbase_updates to grafana"
end
end