Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Pubsub system to update Casts #1

Merged
merged 40 commits into from
Sep 8, 2024
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
d6a828f
Chore: make test case async
mindreframer Sep 2, 2024
90de59b
Feat: configure Essig.PubSub
mindreframer Sep 2, 2024
a951781
Chore: remove unused alias
mindreframer Sep 2, 2024
1e9c877
Chore: sample event structs for local testing
mindreframer Sep 2, 2024
da72f0a
Chore: broadcast events after insertion + simple subscriber
mindreframer Sep 2, 2024
b79439d
Feat: also sets metadata when starting casts
mindreframer Sep 2, 2024
eb0bb36
Chore: Drop Ecto from UUID generator module name
mindreframer Sep 2, 2024
25efe3a
Chore: add gen_state_machine (wrapper for :gen_statem)
mindreframer Sep 2, 2024
fd9e9b9
Feat: fetch DB row for casts on init
mindreframer Sep 2, 2024
71cbc57
Feat: update db row for casts for each update
mindreframer Sep 2, 2024
4741e8b
Chore: remove unused test_name from setup callback
mindreframer Sep 3, 2024
e4227ec
Feat: ensure sequential ES inserts only by applying locks.
mindreframer Sep 3, 2024
1182bc7
Chore: add some useful snippets
mindreframer Sep 3, 2024
214ffda
Feat: just use a Repo with pool=1 for PG locks
mindreframer Sep 3, 2024
88f08f4
Chore: wording
mindreframer Sep 4, 2024
9e97a87
Feat: wrapper for pubsub
mindreframer Sep 4, 2024
9a5da73
Feat: add xid (transaction id) and snapmin (min logical replication s…
mindreframer Sep 4, 2024
745eeb9
Feat: implementation for DB based notification system for new events
mindreframer Sep 4, 2024
f6bc054
Chore: A small checker module
mindreframer Sep 4, 2024
8c05558
Chore: move sql comments to be within function bodies, so they can be…
mindreframer Sep 4, 2024
40c486a
Chore: move uuid7 file to lib/ folder
mindreframer Sep 4, 2024
bf5d375
Chore: adjust naming in tests
mindreframer Sep 4, 2024
d228e6d
Chore: rename `_xid` to `txid`
mindreframer Sep 4, 2024
7370ce3
Chore: remove debug statement from migration
mindreframer Sep 4, 2024
0ace7e6
Chore: Checker allows appending to same stream
mindreframer Sep 5, 2024
b45fd28
Chore: fix compilation warning
mindreframer Sep 5, 2024
c5fb31a
Feat: also store stream_uuid on essig_signals table
mindreframer Sep 5, 2024
d85687a
Chore: fix module name for EventStore.BaseQuery
mindreframer Sep 5, 2024
5a96ac3
Feat: EventStore.Cache module, that supports concurrent requests with…
mindreframer Sep 5, 2024
71d303b
Feat: EventStore.Cache now supports using MFA (module / function / ar…
mindreframer Sep 5, 2024
cd79d41
Feat: Essig.EventStore.Cache -> Essig.Cache
mindreframer Sep 5, 2024
bffd565
Feat: add option to remove entry from cache in Essig.Cache
mindreframer Sep 5, 2024
91226cb
Feat: also update last_used timestamps for each accessed key in cache
mindreframer Sep 5, 2024
d9d608a
Feat: prepare support for expiration of Essig.Cache values
mindreframer Sep 6, 2024
6db80f8
Feat: function to handle expired entries in Essig.Cache
mindreframer Sep 6, 2024
ba15379
Feat: use GenCache for Essig.Cache
mindreframer Sep 6, 2024
6197ec4
Chore: keep ecto logs in dev
mindreframer Sep 6, 2024
0efcd0e
Feat: add EventStoreRead with explicit current_scope handling, to mak…
mindreframer Sep 6, 2024
489d09f
Chore: usage examples
mindreframer Sep 6, 2024
9568358
Feat: add max_id and count columns to the essig_signals table
mindreframer Sep 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ config :essig,
ecto_repos: [Essig.Repo],
generators: [timestamp_type: :utc_datetime]

config :essig, Essig.PubSub, adapter: Phoenix.PubSub.PG2

if config_env() == :dev do
# setup for ecto_dev_logger (https://github.com/fuelen/ecto_dev_logger)
config :essig, Essig.Repo, log: false
# config :essig, Essig.Repo, log: false

config :essig, Essig.Repo,
username: System.get_env("POSTGRES_USER") || "postgres",
Expand Down
13 changes: 13 additions & 0 deletions lib/cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule Essig.Cache do
use GenCache

def child_spec(opts) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts]},
type: :worker,
restart: :permanent,
shutdown: 500
}
end
end
38 changes: 36 additions & 2 deletions lib/casts/cast_runner.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Essig.Casts.CastRunner do
use GenServer

defstruct key: nil, seq: nil, max_id: nil, module: nil, row: nil

##### PUBLIC API

def send_events(module, events) do
Expand All @@ -18,17 +20,22 @@ defmodule Essig.Casts.CastRunner do
def init(args) do
module = Keyword.fetch!(args, :module)
apply(module, :bootstrap, [])
{:ok, %{module: module, seq: 0, max_id: 0}}
{:ok, row} = fetch_from_db_or_init(module)
meta_data = %__MODULE__{key: module, seq: row.seq, max_id: row.max_id, module: module}
Essig.Casts.MetaTable.set(module, meta_data)
state = Map.put(meta_data, :row, row)
{:ok, state}
end

defp via_tuple(module) do
Essig.Casts.Registry.via(module)
end

def handle_call({:send_events, events}, _from, state) do
module = Map.fetch!(state, :module)
module = Map.fetch!(state, :key)
{:ok, res, state} = apply(module, :handle_events, [state, events])
state = update_seq_and_max_id(state, events)
state = update_db(state)
{:reply, {res, state}, state}
end

Expand All @@ -44,4 +51,31 @@ defmodule Essig.Casts.CastRunner do
Essig.Casts.MetaTable.update(state.module, %{seq: new_seq, max_id: new_max_id})
%{state | seq: new_seq, max_id: new_max_id}
end

defp update_db(state) do
{:ok, row} =
Essig.Crud.CastsCrud.update_cast(state.row, %{seq: state.seq, max_id: state.max_id})

Map.put(state, :row, row)
end

defp fetch_from_db_or_init(module) do
case Essig.Crud.CastsCrud.get_cast_by_module(module) do
nil ->
scope_uuid = Essig.Context.current_scope()

payload = %{
scope_uuid: scope_uuid,
module: Atom.to_string(module),
seq: 0,
max_id: 0,
setup_done: false
}

{:ok, _row} = Essig.Crud.CastsCrud.create_cast(payload)

row ->
{:ok, row}
end
end
end
39 changes: 31 additions & 8 deletions lib/casts/cast_runner_test.exs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
defmodule Essig.Casts.CastRunnerTest do
use ExUnit.Case, async: true
use Essig.DataCase
alias Essig.Casts.CastRunner
alias Essig.Casts.MetaTable

setup %{test: test_name} do
Essig.Server.start_scope(test_name)
setup do
scope_uuid = Essig.UUID7.generate()
Essig.Server.start_scope(scope_uuid)
:ok
end

Expand All @@ -27,8 +28,19 @@ defmodule Essig.Casts.CastRunnerTest do
CastRunner.send_events(SampleCast2, events)

# Assert that the events were processed by the respective CastRunners
assert MetaTable.get(SampleCast1) == %{key: SampleCast1, max_id: 100, seq: 2}
assert MetaTable.get(SampleCast2) == %{key: SampleCast2, max_id: 100, seq: 2}
assert MetaTable.get(SampleCast1) == %CastRunner{
key: SampleCast1,
module: SampleCast1,
max_id: 100,
seq: 2
}

assert MetaTable.get(SampleCast2) == %CastRunner{
key: SampleCast2,
module: SampleCast2,
max_id: 100,
seq: 2
}
end
end

Expand All @@ -45,9 +57,20 @@ defmodule Essig.Casts.CastRunnerTest do
CastRunner.send_events(SampleCast2, events)
CastRunner.send_events(SampleCast2, events)

# # Assert the metadata for each CastRunner
assert MetaTable.get(SampleCast1) == %{key: SampleCast1, max_id: 100, seq: 2}
assert MetaTable.get(SampleCast2) == %{key: SampleCast2, max_id: 100, seq: 4}
# Assert the metadata for each CastRunner
assert MetaTable.get(SampleCast1) == %CastRunner{
key: SampleCast1,
module: SampleCast1,
max_id: 100,
seq: 2
}

assert MetaTable.get(SampleCast2) == %CastRunner{
key: SampleCast2,
module: SampleCast2,
max_id: 100,
seq: 4
}
end
end
end
2 changes: 1 addition & 1 deletion lib/casts/seq_checker_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Essig.Casts.SeqCheckerTest do
use ExUnit.Case
use ExUnit.Case, async: true
alias Essig.Casts.SeqChecker

describe "check_reached/2" do
Expand Down
27 changes: 27 additions & 0 deletions lib/checker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule Essig.Checker do
@moduledoc """
A small dev-only module to test the event store.
"""
def run do
scope_uuid = Essig.UUID7.generate()
stream_uuid = Essig.UUID7.generate()
run(scope_uuid, stream_uuid)
end

def run(scope_uuid, stream_uuid) do
Essig.Server.start_scope(scope_uuid)
Essig.Server.start_casts([SampleCast1])

seq = Essig.EventStore.last_seq(stream_uuid)

{:ok, %{events: _events}} =
Essig.EventStore.append_to_stream(stream_uuid, "trp", seq, [
%Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 1"},
%Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 2"},
%Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 3"}
])

# this will be unnecessary soon
# Essig.Casts.CastRunner.send_events(SampleCast1, events)
end
end
6 changes: 3 additions & 3 deletions lib/crud/casts_crud_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ defmodule Essig.CastsCrudTest do
end

test "provides defaults for numeric values" do
uuid = Essig.Ecto.UUID7.generate()
uuid = Essig.UUID7.generate()
{:ok, cast} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 1})
assert cast.status == :new
end

test "prevents duplicate casts with same module" do
uuid = Essig.Ecto.UUID7.generate()
uuid = Essig.UUID7.generate()
{:ok, _cast} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 1})
{:error, changeset} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 2})
errors = errors_on(changeset)
assert errors == %{scope_uuid: ["has already been taken"]}
end

test "allows updating the `max_id` value via upsert on (module)" do
uuid = Essig.Ecto.UUID7.generate()
uuid = Essig.UUID7.generate()
{:ok, cast} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 1})

{:ok, cast2} =
Expand Down
4 changes: 2 additions & 2 deletions lib/crud/events_crud_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ defmodule Essig.Crud.EventsCrudTest do
end

test "creates proper events" do
stream_uuid = Essig.Ecto.UUID7.generate()
scope_uuid = Essig.Ecto.UUID7.generate()
stream_uuid = Essig.UUID7.generate()
scope_uuid = Essig.UUID7.generate()

{:ok, _stream} =
StreamsCrud.create_stream(%{
Expand Down
10 changes: 5 additions & 5 deletions lib/crud/streams_crud_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Essig.Crud.StreamsCrudTest do
end

test "creates a minimal stream record" do
scope_uuid = Essig.Ecto.UUID7.generate()
scope_uuid = Essig.UUID7.generate()

{:ok, stream} =
StreamsCrud.create_stream(%{scope_uuid: scope_uuid, stream_type: "user", seq: 1})
Expand All @@ -19,8 +19,8 @@ defmodule Essig.Crud.StreamsCrudTest do
end

test "prevents duplicates" do
uuid = Essig.Ecto.UUID7.generate()
scope_uuid = Essig.Ecto.UUID7.generate()
uuid = Essig.UUID7.generate()
scope_uuid = Essig.UUID7.generate()

{:ok, stream} =
StreamsCrud.create_stream(%{
Expand All @@ -44,8 +44,8 @@ defmodule Essig.Crud.StreamsCrudTest do
end

test "updates the seq on equal streams (upsert_stream)" do
uuid = Essig.Ecto.UUID7.generate()
scope_uuid = Essig.Ecto.UUID7.generate()
uuid = Essig.UUID7.generate()
scope_uuid = Essig.UUID7.generate()

{:ok, stream} =
StreamsCrud.upsert_stream(%{
Expand Down
4 changes: 4 additions & 0 deletions lib/essig/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ defmodule Essig.Application do
def start(_type, _args) do
children = [
Essig.Repo,
Essig.RepoSingleConn,
{Phoenix.PubSub, name: Essig.PubSub},
Essig.PGNotifyListener,
Essig.Cache,
{Registry, keys: :unique, name: Essig.Scopes.Registry},
Essig.Scopes.DynamicSupervisor
]
Expand Down
26 changes: 26 additions & 0 deletions lib/event_store/append_to_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ defmodule Essig.EventStore.AppendToStream do
use Essig.Repo

def run(stream_uuid, stream_type, expected_seq, events) do
# To ensure sequential inserts only, we use locking.
# The likelihood of this triggering in production is low, but still possible.
# Locks are across all OS processes, since we use Postgres for this.
Essig.PGLock.with_lock("es-insert", fn ->
run_unprotected(stream_uuid, stream_type, expected_seq, events)
end)
end

defp run_unprotected(stream_uuid, stream_type, expected_seq, events) do
multi(stream_uuid, stream_type, expected_seq, events)
|> Repo.transaction()
end
Expand All @@ -24,6 +33,9 @@ defmodule Essig.EventStore.AppendToStream do
last_event = Enum.at(insert_events, -1)
Essig.Crud.StreamsCrud.update_stream(stream, %{seq: last_event.seq})
end)
|> Ecto.Multi.run(:signal_new_events, fn _repo, _ ->
signal_new_events(stream_uuid)
end)
end

defp ensure_stream_exists(stream_uuid, stream_type) do
Expand Down Expand Up @@ -85,4 +97,18 @@ defmodule Essig.EventStore.AppendToStream do
events -> {:ok, Enum.reverse(events)}
end
end

defp signal_new_events(stream_uuid) do
scope_uuid = Essig.Context.current_scope()
bin_uuid = Ecto.UUID.dump!(scope_uuid)
stream_uuid = Ecto.UUID.dump!(stream_uuid)
mindreframer marked this conversation as resolved.
Show resolved Hide resolved

{:ok, _} =
Repo.query("insert into essig_signals(scope_uuid, stream_uuid) values ($1, $2)", [
bin_uuid,
stream_uuid
])

{:ok, true}
end
end
16 changes: 8 additions & 8 deletions lib/event_store/append_to_stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ defmodule Essig.EventStore.AppendToStreamTest do

describe "stream does not exist" do
setup do
Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate())
stream_uuid = Essig.Ecto.UUID7.generate()
Essig.Context.set_current_scope(Essig.UUID7.generate())
stream_uuid = Essig.UUID7.generate()

e1 = %CustomApp.TestReports.Events.TicketMatchAdded{
match_kind: "auto",
Expand Down Expand Up @@ -48,8 +48,8 @@ defmodule Essig.EventStore.AppendToStreamTest do

describe "stream exists + expected value matches" do
setup do
Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate())
stream_uuid = Essig.Ecto.UUID7.generate()
Essig.Context.set_current_scope(Essig.UUID7.generate())
stream_uuid = Essig.UUID7.generate()

e1 = %CustomApp.TestReports.Events.TicketMatchAdded{
match_kind: "auto",
Expand Down Expand Up @@ -97,8 +97,8 @@ defmodule Essig.EventStore.AppendToStreamTest do

describe "stream exists, yet expected seq does not match" do
test "returns errors" do
Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate())
uuid = Essig.Ecto.UUID7.generate()
Essig.Context.set_current_scope(Essig.UUID7.generate())
uuid = Essig.UUID7.generate()

e1 = %CustomApp.TestReports.Events.TicketMatchAdded{
match_kind: "auto",
Expand All @@ -123,8 +123,8 @@ defmodule Essig.EventStore.AppendToStreamTest do

describe "stream exists, seq matches, yet stream type does not match" do
test "returns errors" do
Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate())
uuid = Essig.Ecto.UUID7.generate()
Essig.Context.set_current_scope(Essig.UUID7.generate())
uuid = Essig.UUID7.generate()

e1 = %CustomApp.TestReports.Events.TicketMatchAdded{
match_kind: "auto",
Expand Down
2 changes: 1 addition & 1 deletion lib/event_store/base_query.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule EventStore.BaseQuery do
defmodule Essig.EventStore.BaseQuery do
alias Essig.Schemas.Event
use Essig.Repo

Expand Down
3 changes: 1 addition & 2 deletions lib/event_store/read_all_stream_backward.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
defmodule Essig.EventStore.ReadAllStreamBackward do
alias Essig.Schemas.Event
use Essig.Repo

def run(from_id, amount) do
query(from_id, amount) |> Repo.all()
end

def query(from_id, amount) do
EventStore.BaseQuery.query()
Essig.EventStore.BaseQuery.query()
|> where([event], event.id < ^from_id)
|> order_by(desc: :id)
|> limit(^amount)
Expand Down
2 changes: 1 addition & 1 deletion lib/event_store/read_all_stream_forward.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Essig.EventStore.ReadAllStreamForward do
end

def query(from_id, amount) do
EventStore.BaseQuery.query()
Essig.EventStore.BaseQuery.query()
|> where([event], event.id > ^from_id)
|> order_by(asc: :id)
|> limit(^amount)
Expand Down
2 changes: 1 addition & 1 deletion lib/event_store/read_stream_backward.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Essig.EventStore.ReadStreamBackward do
end

def query(stream_uuid, from_seq, amount) do
EventStore.BaseQuery.query()
Essig.EventStore.BaseQuery.query()
|> where([event], event.stream_uuid == ^stream_uuid)
|> where([event], event.seq < ^from_seq)
|> order_by(desc: :seq)
Expand Down
Loading
Loading