diff --git a/config/config.exs b/config/config.exs index 192ce79..7a037f9 100644 --- a/config/config.exs +++ b/config/config.exs @@ -4,9 +4,11 @@ config :essig, ecto_repos: [Essig.Repo], generators: [timestamp_type: :utc_datetime] +config :essig, Essig.PubSub, adapter: Phoenix.PubSub.PG2 + if config_env() == :dev do # setup for ecto_dev_logger (https://github.com/fuelen/ecto_dev_logger) - config :essig, Essig.Repo, log: false + # config :essig, Essig.Repo, log: false config :essig, Essig.Repo, username: System.get_env("POSTGRES_USER") || "postgres", diff --git a/lib/cache.ex b/lib/cache.ex new file mode 100644 index 0000000..b3a5f10 --- /dev/null +++ b/lib/cache.ex @@ -0,0 +1,13 @@ +defmodule Essig.Cache do + use GenCache + + def child_spec(opts) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [opts]}, + type: :worker, + restart: :permanent, + shutdown: 500 + } + end +end diff --git a/lib/casts/cast_runner.ex b/lib/casts/cast_runner.ex index b54d1fc..98b911b 100644 --- a/lib/casts/cast_runner.ex +++ b/lib/casts/cast_runner.ex @@ -1,6 +1,8 @@ defmodule Essig.Casts.CastRunner do use GenServer + defstruct key: nil, seq: nil, max_id: nil, module: nil, row: nil + ##### PUBLIC API def send_events(module, events) do @@ -18,7 +20,11 @@ defmodule Essig.Casts.CastRunner do def init(args) do module = Keyword.fetch!(args, :module) apply(module, :bootstrap, []) - {:ok, %{module: module, seq: 0, max_id: 0}} + {:ok, row} = fetch_from_db_or_init(module) + meta_data = %__MODULE__{key: module, seq: row.seq, max_id: row.max_id, module: module} + Essig.Casts.MetaTable.set(module, meta_data) + state = Map.put(meta_data, :row, row) + {:ok, state} end defp via_tuple(module) do @@ -26,9 +32,10 @@ defmodule Essig.Casts.CastRunner do end def handle_call({:send_events, events}, _from, state) do - module = Map.fetch!(state, :module) + module = Map.fetch!(state, :key) {:ok, res, state} = apply(module, :handle_events, [state, events]) state = update_seq_and_max_id(state, events) + state = update_db(state) {:reply, {res, state}, state} end @@ -44,4 +51,31 @@ defmodule Essig.Casts.CastRunner do Essig.Casts.MetaTable.update(state.module, %{seq: new_seq, max_id: new_max_id}) %{state | seq: new_seq, max_id: new_max_id} end + + defp update_db(state) do + {:ok, row} = + Essig.Crud.CastsCrud.update_cast(state.row, %{seq: state.seq, max_id: state.max_id}) + + Map.put(state, :row, row) + end + + defp fetch_from_db_or_init(module) do + case Essig.Crud.CastsCrud.get_cast_by_module(module) do + nil -> + scope_uuid = Essig.Context.current_scope() + + payload = %{ + scope_uuid: scope_uuid, + module: Atom.to_string(module), + seq: 0, + max_id: 0, + setup_done: false + } + + {:ok, _row} = Essig.Crud.CastsCrud.create_cast(payload) + + row -> + {:ok, row} + end + end end diff --git a/lib/casts/cast_runner_test.exs b/lib/casts/cast_runner_test.exs index b7f7986..d5e7279 100644 --- a/lib/casts/cast_runner_test.exs +++ b/lib/casts/cast_runner_test.exs @@ -1,10 +1,11 @@ defmodule Essig.Casts.CastRunnerTest do - use ExUnit.Case, async: true + use Essig.DataCase alias Essig.Casts.CastRunner alias Essig.Casts.MetaTable - setup %{test: test_name} do - Essig.Server.start_scope(test_name) + setup do + scope_uuid = Essig.UUID7.generate() + Essig.Server.start_scope(scope_uuid) :ok end @@ -27,8 +28,19 @@ defmodule Essig.Casts.CastRunnerTest do CastRunner.send_events(SampleCast2, events) # Assert that the events were processed by the respective CastRunners - assert MetaTable.get(SampleCast1) == %{key: SampleCast1, max_id: 100, seq: 2} - 
assert MetaTable.get(SampleCast2) == %{key: SampleCast2, max_id: 100, seq: 2} + assert MetaTable.get(SampleCast1) == %CastRunner{ + key: SampleCast1, + module: SampleCast1, + max_id: 100, + seq: 2 + } + + assert MetaTable.get(SampleCast2) == %CastRunner{ + key: SampleCast2, + module: SampleCast2, + max_id: 100, + seq: 2 + } end end @@ -45,9 +57,20 @@ defmodule Essig.Casts.CastRunnerTest do CastRunner.send_events(SampleCast2, events) CastRunner.send_events(SampleCast2, events) - # # Assert the metadata for each CastRunner - assert MetaTable.get(SampleCast1) == %{key: SampleCast1, max_id: 100, seq: 2} - assert MetaTable.get(SampleCast2) == %{key: SampleCast2, max_id: 100, seq: 4} + # Assert the metadata for each CastRunner + assert MetaTable.get(SampleCast1) == %CastRunner{ + key: SampleCast1, + module: SampleCast1, + max_id: 100, + seq: 2 + } + + assert MetaTable.get(SampleCast2) == %CastRunner{ + key: SampleCast2, + module: SampleCast2, + max_id: 100, + seq: 4 + } end end end diff --git a/lib/casts/seq_checker_test.exs b/lib/casts/seq_checker_test.exs index 3ec779b..6c375df 100644 --- a/lib/casts/seq_checker_test.exs +++ b/lib/casts/seq_checker_test.exs @@ -1,5 +1,5 @@ defmodule Essig.Casts.SeqCheckerTest do - use ExUnit.Case + use ExUnit.Case, async: true alias Essig.Casts.SeqChecker describe "check_reached/2" do diff --git a/lib/checker.ex b/lib/checker.ex new file mode 100644 index 0000000..07bb409 --- /dev/null +++ b/lib/checker.ex @@ -0,0 +1,27 @@ +defmodule Essig.Checker do + @moduledoc """ + A small dev-only module to test the event store. + """ + def run do + scope_uuid = Essig.UUID7.generate() + stream_uuid = Essig.UUID7.generate() + run(scope_uuid, stream_uuid) + end + + def run(scope_uuid, stream_uuid) do + Essig.Server.start_scope(scope_uuid) + Essig.Server.start_casts([SampleCast1]) + + seq = Essig.EventStore.last_seq(stream_uuid) + + {:ok, %{events: _events}} = + Essig.EventStore.append_to_stream(stream_uuid, "trp", seq, [ + %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 1"}, + %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 2"}, + %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 3"} + ]) + + # this will be unnecessary soon + # Essig.Casts.CastRunner.send_events(SampleCast1, events) + end +end diff --git a/lib/crud/casts_crud_test.exs b/lib/crud/casts_crud_test.exs index 82715e9..41bc9ce 100644 --- a/lib/crud/casts_crud_test.exs +++ b/lib/crud/casts_crud_test.exs @@ -10,13 +10,13 @@ defmodule Essig.CastsCrudTest do end test "provides defaults for numeric values" do - uuid = Essig.Ecto.UUID7.generate() + uuid = Essig.UUID7.generate() {:ok, cast} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 1}) assert cast.status == :new end test "prevents duplicate casts with same module" do - uuid = Essig.Ecto.UUID7.generate() + uuid = Essig.UUID7.generate() {:ok, _cast} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 1}) {:error, changeset} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 2}) errors = errors_on(changeset) @@ -24,7 +24,7 @@ defmodule Essig.CastsCrudTest do end test "allows updating the `max_id` value via upsert on (module)" do - uuid = Essig.Ecto.UUID7.generate() + uuid = Essig.UUID7.generate() {:ok, cast} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 1}) {:ok, cast2} = diff --git a/lib/crud/events_crud_test.exs b/lib/crud/events_crud_test.exs index 2ffbbca..8ca9c18 100644 --- 
a/lib/crud/events_crud_test.exs +++ b/lib/crud/events_crud_test.exs @@ -24,8 +24,8 @@ defmodule Essig.Crud.EventsCrudTest do end test "creates proper events" do - stream_uuid = Essig.Ecto.UUID7.generate() - scope_uuid = Essig.Ecto.UUID7.generate() + stream_uuid = Essig.UUID7.generate() + scope_uuid = Essig.UUID7.generate() {:ok, _stream} = StreamsCrud.create_stream(%{ diff --git a/lib/crud/streams_crud_test.exs b/lib/crud/streams_crud_test.exs index 9d63030..ef0214d 100644 --- a/lib/crud/streams_crud_test.exs +++ b/lib/crud/streams_crud_test.exs @@ -10,7 +10,7 @@ defmodule Essig.Crud.StreamsCrudTest do end test "creates a minimal stream record" do - scope_uuid = Essig.Ecto.UUID7.generate() + scope_uuid = Essig.UUID7.generate() {:ok, stream} = StreamsCrud.create_stream(%{scope_uuid: scope_uuid, stream_type: "user", seq: 1}) @@ -19,8 +19,8 @@ defmodule Essig.Crud.StreamsCrudTest do end test "prevents duplicates" do - uuid = Essig.Ecto.UUID7.generate() - scope_uuid = Essig.Ecto.UUID7.generate() + uuid = Essig.UUID7.generate() + scope_uuid = Essig.UUID7.generate() {:ok, stream} = StreamsCrud.create_stream(%{ @@ -44,8 +44,8 @@ defmodule Essig.Crud.StreamsCrudTest do end test "updates the seq on equal streams (upsert_stream)" do - uuid = Essig.Ecto.UUID7.generate() - scope_uuid = Essig.Ecto.UUID7.generate() + uuid = Essig.UUID7.generate() + scope_uuid = Essig.UUID7.generate() {:ok, stream} = StreamsCrud.upsert_stream(%{ diff --git a/lib/essig/application.ex b/lib/essig/application.ex index 395171d..9bdf4c1 100644 --- a/lib/essig/application.ex +++ b/lib/essig/application.ex @@ -7,6 +7,10 @@ defmodule Essig.Application do def start(_type, _args) do children = [ Essig.Repo, + Essig.RepoSingleConn, + {Phoenix.PubSub, name: Essig.PubSub}, + Essig.PGNotifyListener, + Essig.Cache, {Registry, keys: :unique, name: Essig.Scopes.Registry}, Essig.Scopes.DynamicSupervisor ] diff --git a/lib/event_store/append_to_stream.ex b/lib/event_store/append_to_stream.ex index 78c7021..05f700a 100644 --- a/lib/event_store/append_to_stream.ex +++ b/lib/event_store/append_to_stream.ex @@ -2,6 +2,15 @@ defmodule Essig.EventStore.AppendToStream do use Essig.Repo def run(stream_uuid, stream_type, expected_seq, events) do + # To ensure sequential inserts only, we use locking. + # The likelihood of this triggering in production is low, but still possible. + # Locks are across all OS processes, since we use Postgres for this. 
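+    # Note: with_lock/2 does not queue behind the lock - if the advisory lock is
+    # already held, the append returns {:error, :locked} instead of waiting.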
+ Essig.PGLock.with_lock("es-insert", fn -> + run_unprotected(stream_uuid, stream_type, expected_seq, events) + end) + end + + defp run_unprotected(stream_uuid, stream_type, expected_seq, events) do multi(stream_uuid, stream_type, expected_seq, events) |> Repo.transaction() end @@ -24,6 +33,12 @@ defmodule Essig.EventStore.AppendToStream do last_event = Enum.at(insert_events, -1) Essig.Crud.StreamsCrud.update_stream(stream, %{seq: last_event.seq}) end) + |> Ecto.Multi.run(:signal_new_events, fn _repo, %{insert_events: insert_events} -> + last_event = Enum.at(insert_events, -1) + max_id = last_event.id + count = Enum.count(insert_events) + signal_new_events(stream_uuid, count, max_id) + end) end defp ensure_stream_exists(stream_uuid, stream_type) do @@ -85,4 +100,23 @@ defmodule Essig.EventStore.AppendToStream do events -> {:ok, Enum.reverse(events)} end end + + defp signal_new_events(stream_uuid, count, max_id) do + scope_uuid = Essig.Context.current_scope() + bin_uuid = Ecto.UUID.dump!(scope_uuid) + stream_uuid = Ecto.UUID.dump!(stream_uuid) + + {:ok, _} = + Repo.query( + "insert into essig_signals(scope_uuid, stream_uuid, count, max_id) values ($1, $2, $3, $4)", + [ + bin_uuid, + stream_uuid, + count, + max_id + ] + ) + + {:ok, true} + end end diff --git a/lib/event_store/append_to_stream_test.exs b/lib/event_store/append_to_stream_test.exs index cc88dd2..dcc706a 100644 --- a/lib/event_store/append_to_stream_test.exs +++ b/lib/event_store/append_to_stream_test.exs @@ -3,8 +3,8 @@ defmodule Essig.EventStore.AppendToStreamTest do describe "stream does not exist" do setup do - Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate()) - stream_uuid = Essig.Ecto.UUID7.generate() + Essig.Context.set_current_scope(Essig.UUID7.generate()) + stream_uuid = Essig.UUID7.generate() e1 = %CustomApp.TestReports.Events.TicketMatchAdded{ match_kind: "auto", @@ -48,8 +48,8 @@ defmodule Essig.EventStore.AppendToStreamTest do describe "stream exists + expected value matches" do setup do - Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate()) - stream_uuid = Essig.Ecto.UUID7.generate() + Essig.Context.set_current_scope(Essig.UUID7.generate()) + stream_uuid = Essig.UUID7.generate() e1 = %CustomApp.TestReports.Events.TicketMatchAdded{ match_kind: "auto", @@ -97,8 +97,8 @@ defmodule Essig.EventStore.AppendToStreamTest do describe "stream exists, yet expected seq does not match" do test "returns errors" do - Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate()) - uuid = Essig.Ecto.UUID7.generate() + Essig.Context.set_current_scope(Essig.UUID7.generate()) + uuid = Essig.UUID7.generate() e1 = %CustomApp.TestReports.Events.TicketMatchAdded{ match_kind: "auto", @@ -123,8 +123,8 @@ defmodule Essig.EventStore.AppendToStreamTest do describe "stream exists, seq matches, yet stream type does not match" do test "returns errors" do - Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate()) - uuid = Essig.Ecto.UUID7.generate() + Essig.Context.set_current_scope(Essig.UUID7.generate()) + uuid = Essig.UUID7.generate() e1 = %CustomApp.TestReports.Events.TicketMatchAdded{ match_kind: "auto", diff --git a/lib/event_store/base_query.ex b/lib/event_store/base_query.ex index b9bb5a6..7b85dd2 100644 --- a/lib/event_store/base_query.ex +++ b/lib/event_store/base_query.ex @@ -1,4 +1,4 @@ -defmodule EventStore.BaseQuery do +defmodule Essig.EventStore.BaseQuery do alias Essig.Schemas.Event use Essig.Repo diff --git a/lib/event_store/read_all_stream_backward.ex b/lib/event_store/read_all_stream_backward.ex index 
4b112a4..5ec988a 100644 --- a/lib/event_store/read_all_stream_backward.ex +++ b/lib/event_store/read_all_stream_backward.ex @@ -1,5 +1,4 @@ defmodule Essig.EventStore.ReadAllStreamBackward do - alias Essig.Schemas.Event use Essig.Repo def run(from_id, amount) do @@ -7,7 +6,7 @@ defmodule Essig.EventStore.ReadAllStreamBackward do end def query(from_id, amount) do - EventStore.BaseQuery.query() + Essig.EventStore.BaseQuery.query() |> where([event], event.id < ^from_id) |> order_by(desc: :id) |> limit(^amount) diff --git a/lib/event_store/read_all_stream_forward.ex b/lib/event_store/read_all_stream_forward.ex index 490c386..3b77673 100644 --- a/lib/event_store/read_all_stream_forward.ex +++ b/lib/event_store/read_all_stream_forward.ex @@ -6,7 +6,7 @@ defmodule Essig.EventStore.ReadAllStreamForward do end def query(from_id, amount) do - EventStore.BaseQuery.query() + Essig.EventStore.BaseQuery.query() |> where([event], event.id > ^from_id) |> order_by(asc: :id) |> limit(^amount) diff --git a/lib/event_store/read_stream_backward.ex b/lib/event_store/read_stream_backward.ex index fe59b02..d86be85 100644 --- a/lib/event_store/read_stream_backward.ex +++ b/lib/event_store/read_stream_backward.ex @@ -6,7 +6,7 @@ defmodule Essig.EventStore.ReadStreamBackward do end def query(stream_uuid, from_seq, amount) do - EventStore.BaseQuery.query() + Essig.EventStore.BaseQuery.query() |> where([event], event.stream_uuid == ^stream_uuid) |> where([event], event.seq < ^from_seq) |> order_by(desc: :seq) diff --git a/lib/event_store/read_stream_forward.ex b/lib/event_store/read_stream_forward.ex index e0dd73f..fd98659 100644 --- a/lib/event_store/read_stream_forward.ex +++ b/lib/event_store/read_stream_forward.ex @@ -6,7 +6,7 @@ defmodule Essig.EventStore.ReadStreamForward do end def query(stream_uuid, from_seq, amount) do - EventStore.BaseQuery.query() + Essig.EventStore.BaseQuery.query() |> where([event], event.stream_uuid == ^stream_uuid) |> where([event], event.seq > ^from_seq) |> order_by(asc: :id) diff --git a/lib/event_store_reads.ex b/lib/event_store_reads.ex new file mode 100644 index 0000000..872f1f6 --- /dev/null +++ b/lib/event_store_reads.ex @@ -0,0 +1,38 @@ +defmodule Essig.EventStoreReads do + @moduledoc """ + EventStoreReads sets current scope explicitly, for reads. 
+ This makes it possible to cache with [GenCache](https://github.com/maxohq/gen_cache/) + + Usage: + ``` + iex> Essig.Cache.request({Essig.EventStoreReads, :read_all_stream_forward, ["0191be93-c178-71a0-8bd8-bcde9f55b7d6", 5,10]}) + iex> Essig.Cache.request({Essig.EventStoreReads, :read_stream_forward, ["0191be93-c178-71a0-8bd8-bcde9f55b7d6", "0191be93-c178-7203-b137-c4e294ca925b", 5, 10]}) + ``` + """ + use Essig.Repo + + def read_stream_forward(scope_uuid, stream_uuid, from_seq, amount) do + Essig.Context.set_current_scope(scope_uuid) + Essig.EventStore.ReadStreamForward.run(stream_uuid, from_seq, amount) + end + + def read_all_stream_forward(scope_uuid, from_id, amount) do + Essig.Context.set_current_scope(scope_uuid) + Essig.EventStore.ReadAllStreamForward.run(from_id, amount) + end + + def read_stream_backward(scope_uuid, stream_uuid, from_seq, amount) do + Essig.Context.set_current_scope(scope_uuid) + Essig.EventStore.ReadStreamBackward.run(stream_uuid, from_seq, amount) + end + + def read_all_stream_backward(scope_uuid, from_id, amount) do + Essig.Context.set_current_scope(scope_uuid) + Essig.EventStore.ReadAllStreamBackward.run(from_id, amount) + end + + def last_seq(scope_uuid, stream_uuid) do + Essig.Context.set_current_scope(scope_uuid) + Essig.EventStore.LastSeq.run(stream_uuid) + end +end diff --git a/lib/event_store_test.exs b/lib/event_store_test.exs index f23fd5c..5039772 100644 --- a/lib/event_store_test.exs +++ b/lib/event_store_test.exs @@ -4,7 +4,7 @@ defmodule Essig.EventStoreTest do use MnemeDefaults setup do - Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate()) + Essig.Context.set_current_scope(Essig.UUID7.generate()) :ok end @@ -28,8 +28,8 @@ defmodule Essig.EventStoreTest do describe "read_all_stream_forward" do test "iterates over ALL global events from oldest to newest" do - uuid1 = Essig.Ecto.UUID7.generate() - uuid2 = Essig.Ecto.UUID7.generate() + uuid1 = Essig.UUID7.generate() + uuid2 = Essig.UUID7.generate() init_stream(uuid1, 0) init_stream(uuid2, 0) init_stream(uuid1, 10) @@ -48,8 +48,8 @@ defmodule Essig.EventStoreTest do describe "read_all_stream_backward" do test "iterates over ALL global events from newest to oldest" do - uuid1 = Essig.Ecto.UUID7.generate() - uuid2 = Essig.Ecto.UUID7.generate() + uuid1 = Essig.UUID7.generate() + uuid2 = Essig.UUID7.generate() # batch 1 init_stream(uuid1, 0) # batch 2 @@ -85,7 +85,7 @@ defmodule Essig.EventStoreTest do describe "read_stream_backward" do test "fetches events from newest to oldest with filters applied" do - uuid = Essig.Ecto.UUID7.generate() + uuid = Essig.UUID7.generate() init_stream(uuid, 0) @@ -119,7 +119,7 @@ defmodule Essig.EventStoreTest do describe "read_stream_forward" do test "fetches events from oldest to newest with filters applied" do - uuid = Essig.Ecto.UUID7.generate() + uuid = Essig.UUID7.generate() init_stream(uuid, 0) {:ok, _a} = @@ -191,12 +191,12 @@ defmodule Essig.EventStoreTest do scope1 = Essig.Context.current_scope() Essig.Server.start_scope(scope1) - uuid1 = Essig.Ecto.UUID7.generate() + uuid1 = Essig.UUID7.generate() init_stream(uuid1, 0) - scope2 = Essig.Ecto.UUID7.generate() + scope2 = Essig.UUID7.generate() Essig.Server.start_scope(scope2) - uuid2 = Essig.Ecto.UUID7.generate() + uuid2 = Essig.UUID7.generate() init_stream(uuid2, 0) # switch to scope1 diff --git a/lib/helpers/map.ex b/lib/helpers/map.ex new file mode 100644 index 0000000..06130b8 --- /dev/null +++ b/lib/helpers/map.ex @@ -0,0 +1,97 @@ +defmodule Essig.Helpers.Map do + @moduledoc """ + Functions to 
transform maps.
+  Based on: https://gist.github.com/kipcole9/0bd4c6fb6109bfec9955f785087f53fb
+  (Helpers for Elixir Maps: underscore, atomise and stringify map keys)
+  """
+
+  @doc """
+  Convert map string camelCase keys to underscore_keys
+  """
+  def underscore_keys(nil), do: nil
+
+  def underscore_keys(map = %{}) do
+    map
+    |> Enum.map(fn {k, v} -> {Macro.underscore(k), underscore_keys(v)} end)
+    |> Enum.map(fn {k, v} -> {String.replace(k, "-", "_"), v} end)
+    |> Enum.into(%{})
+  end
+
+  # Walk the list and underscore the keys
+  # of any map members
+  def underscore_keys([head | rest]) do
+    [underscore_keys(head) | underscore_keys(rest)]
+  end
+
+  def underscore_keys(not_a_map) do
+    not_a_map
+  end
+
+  @doc """
+  Convert map string keys to :atom keys
+  """
+  def atomize_keys(nil), do: nil
+
+  # Structs don't do enumerable and anyway the keys are already
+  # atoms
+  def atomize_keys(struct = %{__struct__: _}) do
+    struct
+  end
+
+  def atomize_keys(map = %{}) do
+    map
+    |> Enum.map(fn {k, v} -> {String.to_atom(k), atomize_keys(v)} end)
+    |> Enum.into(%{})
+  end
+
+  # Walk the list and atomize the keys
+  # of any map members
+  def atomize_keys([head | rest]) do
+    [atomize_keys(head) | atomize_keys(rest)]
+  end
+
+  def atomize_keys(not_a_map) do
+    not_a_map
+  end
+
+  @doc """
+  Convert map atom keys to strings
+  """
+  def stringify_keys(nil), do: nil
+
+  def stringify_keys(map = %{}) do
+    map
+    |> Enum.map(fn {k, v} -> {Atom.to_string(k), stringify_keys(v)} end)
+    |> Enum.into(%{})
+  end
+
+  # Walk the list and stringify the keys
+  # of any map members
+  def stringify_keys([head | rest]) do
+    [stringify_keys(head) | stringify_keys(rest)]
+  end
+
+  def stringify_keys(not_a_map) do
+    not_a_map
+  end
+
+  @doc """
+  Deep merge two maps
+  """
+  def deep_merge(left, right) do
+    Map.merge(left, right, &deep_resolve/3)
+  end
+
+  # Key exists in both maps, and both values are maps as well.
+  # These can be merged recursively.
+  defp deep_resolve(_key, left = %{}, right = %{}) do
+    deep_merge(left, right)
+  end
+
+  # Key exists in both maps, but at least one of the values is
+  # NOT a map. We fall back to standard merge behavior, preferring
+  # the value on the right.
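+  # Example: deep_merge(%{a: %{b: 1}, d: 1}, %{a: %{c: 2}, d: 2})
+  #          #=> %{a: %{b: 1, c: 2}, d: 2}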
+  defp deep_resolve(_key, _left, right) do
+    right
+  end
+end
diff --git a/lib/meta_table/handler_meta_test.exs b/lib/meta_table/handler_meta_test.exs
index 2dcce9b..41759a9 100644
--- a/lib/meta_table/handler_meta_test.exs
+++ b/lib/meta_table/handler_meta_test.exs
@@ -8,25 +8,25 @@ defmodule Essig.HandlerMetaTest do
   setup do
     # Set up a test app context
-    test_app = "test_app_#{:rand.uniform(1000)}"
-    Essig.Context.set_current_scope(test_app)
+    scope_uuid = Essig.UUID7.generate()
+    Essig.Context.set_current_scope(scope_uuid)

     on_exit(fn ->
       Essig.Context.set_current_scope(nil)
     end)

-    %{test_app: test_app}
+    %{scope_uuid: scope_uuid}
   end

-  test "init/0 creates an ETS table", %{test_app: test_app} do
+  test "init/0 creates an ETS table", %{scope_uuid: scope_uuid} do
     HandlerMeta.init()
-    assert :ets.info(String.to_atom("#{test_app}_handler_meta")) != :undefined
+    assert :ets.info(String.to_atom("#{scope_uuid}_handler_meta")) != :undefined
   end

-  test "repeated init/0 does not raise", %{test_app: test_app} do
+  test "repeated init/0 does not raise", %{scope_uuid: scope_uuid} do
     name = HandlerMeta.init()
     assert name == HandlerMeta.init()
-    assert :ets.info(String.to_atom("#{test_app}_handler_meta")) != :undefined
+    assert :ets.info(String.to_atom("#{scope_uuid}_handler_meta")) != :undefined
   end

   test "set/2 inserts data into the ETS table" do
@@ -139,7 +139,7 @@ defmodule Essig.HandlerMetaTest do
     assert Enum.at(result, 0) == {TestModule1, %{status: :new, key: TestModule1}}
   end

-  test "operations with different app contexts", %{test_app: test_app} do
+  test "operations with different app contexts", %{scope_uuid: scope_uuid} do
     HandlerMeta.init()
     HandlerMeta.set(TestModule, %{field: "value1"})
@@ -151,7 +151,7 @@ defmodule Essig.HandlerMetaTest do
     assert HandlerMeta.get(TestModule) == %{field: "value2", key: TestModule}

     # Switch back to the original app context
-    Essig.Context.set_current_scope(test_app)
+    Essig.Context.set_current_scope(scope_uuid)
     assert HandlerMeta.get(TestModule) == %{field: "value1", key: TestModule}
   end
 end
diff --git a/lib/migrations/all.ex b/lib/migrations/all.ex
index 6f0fc6f..4269584 100644
--- a/lib/migrations/all.ex
+++ b/lib/migrations/all.ex
@@ -4,7 +4,10 @@ defmodule Migrations.All do
   """
   def modules do
     [
-      {20_240_824_120_000, Migrations.Migration001}
+      {2024_0824_120000, Migrations.Migration001},
+      {2024_0904_112600, Migrations.Migration002},
+      {2024_0904_114100, Migrations.Migration003},
+      {2024_0907_232900, Migrations.Migration004}
     ]
   end
 end
diff --git a/lib/migrations/migration002.ex b/lib/migrations/migration002.ex
new file mode 100644
index 0000000..02957b7
--- /dev/null
+++ b/lib/migrations/migration002.ex
@@ -0,0 +1,33 @@
+defmodule Migrations.Migration002 do
+  use Ecto.Migration
+
+  def change do
+    alter table(:essig_events) do
+      add(:txid, :bigint)
+      add(:snapmin, :bigint)
+    end
+
+    execute "
+    CREATE OR REPLACE FUNCTION essig_add_txid_snapmin()
+    RETURNS TRIGGER AS $$
+    BEGIN
+      -- we add the current transaction id and the snapshot's minimal visible transaction id
+      -- based on suggestions here: https://github.com/josevalim/sync/blob/main/priv/repo/migrations/20240806131210_create_publication.exs
+
+      NEW.txid := pg_current_xact_id();
+      NEW.snapmin := pg_snapshot_xmin(pg_current_snapshot());
+      RETURN NEW;
+    END;
+    $$ LANGUAGE plpgsql;
+    ",
+            "DROP FUNCTION essig_add_txid_snapmin()"
+
+    execute "
+    CREATE OR REPLACE TRIGGER essig_add_txid_to_events
+    BEFORE INSERT OR UPDATE ON essig_events
+    FOR EACH ROW
+    EXECUTE FUNCTION essig_add_txid_snapmin();
+    ",
+            "DROP TRIGGER essig_add_txid_to_events ON essig_events;"
+  end
+end
diff --git a/lib/migrations/migration003.ex b/lib/migrations/migration003.ex
new file mode 100644
index 0000000..2ab3f4e
--- /dev/null
+++ b/lib/migrations/migration003.ex
@@ -0,0 +1,52 @@
+defmodule Migrations.Migration003 do
+  use Ecto.Migration
+
+  def change do
+    create table(:essig_signals, primary_key: false) do
+      add(:id, :bigserial, primary_key: true)
+      add(:scope_uuid, :uuid, null: false, default: fragment("gen_random_uuid()"))
+      add(:stream_uuid, :uuid, null: false)
+      add(:txid, :bigint)
+      add(:snapmin, :bigint)
+    end
+
+    execute "
+    -- Trigger on the signals table, to populate txid / snapmin on new rows
+    CREATE OR REPLACE TRIGGER essig_add_txid_to_signals
+    BEFORE INSERT OR UPDATE ON essig_signals
+    FOR EACH ROW
+    EXECUTE FUNCTION essig_add_txid_snapmin();
+    ",
+            "DROP TRIGGER essig_add_txid_to_signals ON essig_signals;"
+
+    execute """
+            CREATE OR REPLACE FUNCTION notify_new_events()
+            RETURNS TRIGGER AS $$
+            DECLARE
+              payload JSON;
+            BEGIN
+              -- Function to notify on new transactions (events) via pg_notify
+              payload := json_build_object(
+                'scope_uuid', NEW.scope_uuid,
+                'stream_uuid', NEW.stream_uuid,
+                'txid', NEW.txid,
+                'snapmin', NEW.snapmin
+              );
+
+              PERFORM pg_notify('new_events', payload::TEXT);
+              RETURN NEW;
+            END;
+            $$ LANGUAGE plpgsql;
+            """,
+            "DROP FUNCTION notify_new_events();"
+
+    execute "
+    -- Trigger to notify on new transactions (events) via pg_notify
+    CREATE TRIGGER signals_notify_new_events
+    BEFORE INSERT ON essig_signals
+    FOR EACH ROW
+    EXECUTE PROCEDURE notify_new_events();
+    ",
+            "DROP TRIGGER signals_notify_new_events ON essig_signals;"
+  end
+end
diff --git a/lib/migrations/migration004.ex b/lib/migrations/migration004.ex
new file mode 100644
index 0000000..6bd216c
--- /dev/null
+++ b/lib/migrations/migration004.ex
@@ -0,0 +1,32 @@
+defmodule Migrations.Migration004 do
+  use Ecto.Migration
+
+  def change do
+    alter table(:essig_signals) do
+      add(:count, :integer, null: false)
+      add(:max_id, :bigint, null: false)
+    end
+
+    execute """
+            CREATE OR REPLACE FUNCTION notify_new_events()
+            RETURNS TRIGGER AS $$
+            DECLARE
+              payload JSON;
+            BEGIN
+              -- Function to notify on new transactions (events) via pg_notify
+              payload := json_build_object(
+                'scope_uuid', NEW.scope_uuid,
+                'stream_uuid', NEW.stream_uuid,
+                'txid', NEW.txid,
+                'count', NEW.count,
+                'max_id', NEW.max_id
+              );
+
+              PERFORM pg_notify('new_events', payload::TEXT);
+              RETURN NEW;
+            END;
+            $$ LANGUAGE plpgsql;
+            """,
+            "DROP FUNCTION notify_new_events();"
+  end
+end
diff --git a/lib/pg_lock.ex b/lib/pg_lock.ex
new file mode 100644
index 0000000..8c87d41
--- /dev/null
+++ b/lib/pg_lock.ex
@@ -0,0 +1,41 @@
+defmodule Essig.PGLock do
+  @moduledoc """
+  A simple wrapper around pg_try_advisory_lock and pg_advisory_unlock.
+  To get a consistent PG connection, it uses a second Repo with pool_size=1.
+  This makes it possible to use the same connection for locking and releasing the lock!
+  Releasing a lock on a different connection than the one it was acquired on won't work.
+  This is the best workaround I could come up with.
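+
+  - Usage (a sketch - the anonymous function is whatever the caller needs to run while holding the lock):
+
+  ```elixir
+  Essig.PGLock.with_lock("es-insert", fn ->
+    :ok # critical section - runs only while the "es-insert" advisory lock is held,
+        # otherwise with_lock/2 returns {:error, :locked}
+  end)
+  ```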
+
+  - Check locks with:
+
+  ```sql
+  SELECT locktype, transactionid, virtualtransaction, mode FROM pg_locks;
+  ```
+  """
+  use Essig.RepoSingleConn
+
+  def with_lock(kind, fun) do
+    lock_key = :erlang.phash2([Essig.Context.current_scope(), kind])
+
+    case get_lock(lock_key) do
+      {:ok, %{rows: [[true]]}} ->
+        try do
+          fun.()
+        after
+          release_lock(lock_key)
+        end
+
+      _ ->
+        {:error, :locked}
+    end
+  end
+
+  def get_lock(key) do
+    Ecto.Adapters.SQL.query(RepoSingleConn, "SELECT pg_try_advisory_lock($1)", [key], [])
+  end
+
+  def release_lock(key) do
+    Ecto.Adapters.SQL.query(RepoSingleConn, "SELECT pg_advisory_unlock($1)", [key], [])
+  end
+end
diff --git a/lib/pg_notify_listener.ex b/lib/pg_notify_listener.ex
new file mode 100644
index 0000000..ab90d0e
--- /dev/null
+++ b/lib/pg_notify_listener.ex
@@ -0,0 +1,32 @@
+defmodule Essig.PGNotifyListener do
+  @moduledoc """
+  Receives notifications from the database (signals table / new_events channel)
+  and rebroadcasts them to the `Phoenix.PubSub` system.
+
+  The payload is small - one notification per transaction.
+  """
+  use GenServer
+
+  def start_link(_), do: GenServer.start_link(__MODULE__, [])
+
+  def init(_arg) do
+    config = Essig.Repo.config()
+    config = Keyword.put(config, :auto_reconnect, true)
+    {:ok, pid} = Postgrex.Notifications.start_link(config)
+    Postgrex.Notifications.listen(pid, "new_events")
+    {:ok, []}
+  end
+
+  def handle_info({:notification, _connection_pid, _ref, _channel, payload}, state) do
+    # `<-` (not `=`), so a malformed payload is skipped instead of crashing the listener
+    with {:ok, map} <- Jason.decode(payload) do
+      rebroadcast(map)
+    end
+
+    {:noreply, state}
+  end
+
+  def rebroadcast(map) do
+    map = Essig.Helpers.Map.atomize_keys(map)
+    Essig.Pubsub.broadcast("new_events", {:new_events, map})
+  end
+end
diff --git a/lib/pubsub.ex b/lib/pubsub.ex
new file mode 100644
index 0000000..673fb58
--- /dev/null
+++ b/lib/pubsub.ex
@@ -0,0 +1,13 @@
+defmodule Essig.Pubsub do
+  def broadcast(topic, message) do
+    Phoenix.PubSub.broadcast(Essig.PubSub, topic, message)
+  end
+
+  def subscribe(topic) do
+    Phoenix.PubSub.subscribe(Essig.PubSub, topic)
+  end
+
+  def unsubscribe(topic) do
+    Phoenix.PubSub.unsubscribe(Essig.PubSub, topic)
+  end
+end
diff --git a/lib/repo_single_conn.ex b/lib/repo_single_conn.ex
new file mode 100644
index 0000000..5e8fd30
--- /dev/null
+++ b/lib/repo_single_conn.ex
@@ -0,0 +1,39 @@
+defmodule Essig.RepoSingleConn do
+  @moduledoc """
+  This is a special Repo with a single connection.
+  We use this when getting a DB advisory lock and releasing it afterwards.
+  This is not possible with the standard Ecto.Repo outside of a transaction.
+
+  To keep the configuration overhead low, we use dynamic config (init-callback) and
+  copy the main config for the Essig.Repo with a few tweaks.
+  This means the DB config stays unchanged.
+ """ + + use Ecto.Repo, + otp_app: :essig, + adapter: Ecto.Adapters.Postgres + + use EctoCursorBasedStream + + @impl true + def init(_type, _config) do + special_config = [ + telemetry_prefix: [:essig, :sand_repo], + pool_size: 1 + ] + + main_config = Application.get_env(:essig, Essig.Repo) + config = Keyword.merge(main_config, special_config) + + {:ok, config} + end + + defmacro __using__(_) do + quote do + alias Essig.RepoSingleConn + require Ecto.Query + import Ecto.Query + import Ecto.Changeset + end + end +end diff --git a/lib/sample/events.ex b/lib/sample/events.ex new file mode 100644 index 0000000..762bbe7 --- /dev/null +++ b/lib/sample/events.ex @@ -0,0 +1,64 @@ +defmodule Sample.TestReports.Events.TicketMatchAdded do + @moduledoc """ + - a matching ticket was configured + - we must always have 2 (!!!) subtickets per Test Report process + - "bootloader" / "appsw" + - FIELDS + - [:match_kind, :id, :kind] + - EXAMPLE + - %{match_kind: "manual", id: 50, kind: "bootloader"} + - %{match_kind: "auto", id: 51, kind: "appsw"} + """ + use JsonSerde, alias: "trp.ticket_match_added" + defstruct [:match_kind, :id, :kind] +end + +defmodule Sample.TestReports.Events.ReqTestReportsUpdated do + @moduledoc """ + - there was a change in the required test reports config + - a value was set OR deleted + - removal is only allowed for admins! + - maybe also allow for everyone, we keep the full history anyways + - normal users can only add requirements + - LIST of operations + - `[[name, op, value], [name, op, value]]` + - EXAMPLE: + [ + {:fota, :set, true}, + {:fota, :freeze, true}, + {:fota, :set, false}, + {:fota, :freeze, false}, + ] + + """ + use JsonSerde, alias: "trp.req_test_reports_updated" + defstruct [:ops] +end + +defmodule Sample.TestReports.Events.MasterReportAdded do + @moduledoc """ + - an xml file with master report data was found in ZIP and could be parsed + - we store the XML file name, the content is kept in the BinStorage (BinStorageMasterReport) system (can be potentially multiple MBs) + - we also store the parsed information for the XML file + - and the event version (?) 
+ """ + use JsonSerde, alias: "trp.master_report_added" + defstruct [:path, :report] +end + +defmodule Sample.TestReports.MasterReport do + defstruct meta: %{}, + tool_versions: [], + test_components: [], + test_cases: [] +end + +defimpl Inspect, for: Sample.TestReports.MasterReport do + def inspect(mreport, _opts) do + ~s|Sample.TestReports.MasterReport| + end + + def show_meta(meta) do + ~s|{name: #{meta.name}, path: #{meta.path}, date: #{meta.date}, test_type: #{meta.test_type}, test_tool: #{meta.test_tool}}| + end +end diff --git a/lib/sample/my_genserver.ex b/lib/sample/my_genserver.ex new file mode 100644 index 0000000..dd73612 --- /dev/null +++ b/lib/sample/my_genserver.ex @@ -0,0 +1,24 @@ +defmodule MyGenServer do + use GenServer + + def start_link(_) do + GenServer.start_link(__MODULE__, %{}) + end + + def init(state) do + Phoenix.PubSub.subscribe(Essig.PubSub, pubsub_topic()) + {:ok, state} + end + + def handle_info({:new_event, event}, state) do + # Handle the event + IO.inspect(Essig.Context.current_scope(), label: "Current scope") + IO.inspect(event, label: "Received event") + {:noreply, state} + end + + defp pubsub_topic() do + scope_uuid = Essig.Context.current_scope() + "events:#{scope_uuid}" + end +end diff --git a/lib/schemas/cast.ex b/lib/schemas/cast.ex index 0bd23fd..df0b0e3 100644 --- a/lib/schemas/cast.ex +++ b/lib/schemas/cast.ex @@ -1,7 +1,7 @@ defmodule Essig.Schemas.Cast do use Ecto.Schema - @primary_key {:cast_uuid, Ecto.UUID, autogenerate: {Essig.Ecto.UUID7, :generate, []}} + @primary_key {:cast_uuid, Ecto.UUID, autogenerate: {Essig.UUID7, :generate, []}} schema "essig_casts" do # this is another "primary" key, used for global ordering (+ and when fetching all stream) field(:id, :integer, read_after_writes: true) diff --git a/lib/schemas/event.ex b/lib/schemas/event.ex index 743c5f0..49144a9 100644 --- a/lib/schemas/event.ex +++ b/lib/schemas/event.ex @@ -1,7 +1,7 @@ defmodule Essig.Schemas.Event do use Ecto.Schema - @primary_key {:event_uuid, Ecto.UUID, autogenerate: {Essig.Ecto.UUID7, :generate, []}} + @primary_key {:event_uuid, Ecto.UUID, autogenerate: {Essig.UUID7, :generate, []}} schema "essig_events" do # this is another "primary" key, used for global ordering (+ and when fetching all stream) field(:id, :integer, read_after_writes: true) @@ -15,6 +15,10 @@ defmodule Essig.Schemas.Event do field(:seq, :integer) + ## transaction metadata + field(:txid, :integer, read_after_writes: true) + field(:snapmin, :integer, read_after_writes: true) + # no updated_at! 
timestamps(type: :utc_datetime_usec, updated_at: false) end diff --git a/lib/schemas/scope.ex b/lib/schemas/scope.ex index 268d2a9..fdffb02 100644 --- a/lib/schemas/scope.ex +++ b/lib/schemas/scope.ex @@ -1,7 +1,7 @@ defmodule Essig.Schemas.Scope do use Ecto.Schema - @primary_key {:scope_uuid, Ecto.UUID, autogenerate: {Essig.Ecto.UUID7, :generate, []}} + @primary_key {:scope_uuid, Ecto.UUID, autogenerate: {Essig.UUID7, :generate, []}} schema "essig_scopes" do # this is another "primary" key, used for global ordering (+ and when fetching all stream) field(:id, :integer, read_after_writes: true) diff --git a/lib/schemas/stream.ex b/lib/schemas/stream.ex index e2eb240..771f9f7 100644 --- a/lib/schemas/stream.ex +++ b/lib/schemas/stream.ex @@ -1,7 +1,7 @@ defmodule Essig.Schemas.Stream do use Ecto.Schema - @primary_key {:stream_uuid, Ecto.UUID, autogenerate: {Essig.Ecto.UUID7, :generate, []}} + @primary_key {:stream_uuid, Ecto.UUID, autogenerate: {Essig.UUID7, :generate, []}} schema "essig_streams" do # this is another "primary" key, used for global ordering (+ and when fetching all stream) field(:id, :integer, read_after_writes: true) diff --git a/lib/server_test.exs b/lib/server_test.exs index 9109298..1920eed 100644 --- a/lib/server_test.exs +++ b/lib/server_test.exs @@ -1,50 +1,56 @@ defmodule Essig.ServerTest do - use ExUnit.Case, async: true + use Essig.DataCase import Liveness describe "full_run" do test "works with casts" do - Essig.Server.start_scope("app1") + scope_uuid = Essig.UUID7.generate() + Essig.Server.start_scope(scope_uuid) Essig.Server.start_casts([SampleCast1, SampleCast2]) - pid = Essig.Scopes.Registry.get("app1") + pid = Essig.Scopes.Registry.get(scope_uuid) assert is_pid(pid) - assert "app1" in Essig.Scopes.Registry.keys() + assert scope_uuid in Essig.Scopes.Registry.keys() assert is_pid(Essig.Server.get_cast(SampleCast1)) assert is_pid(Essig.Server.get_cast(SampleCast2)) Process.flag(:trap_exit, true) GenServer.stop(pid) - assert eventually(fn -> Essig.Scopes.Registry.get("app1") == nil end) + assert eventually(fn -> Essig.Scopes.Registry.get(scope_uuid) == nil end) - assert_raise ArgumentError, "unknown registry: Essig.Casts.Registry_app1", fn -> - is_pid(Essig.Server.get_cast(SampleCast1)) - end + assert_raise ArgumentError, + "unknown registry: :\"Elixir.Essig.Casts.Registry_#{scope_uuid}\"", + fn -> + is_pid(Essig.Server.get_cast(SampleCast1)) + end - refute "app1" in Essig.Scopes.Registry.keys() + refute scope_uuid in Essig.Scopes.Registry.keys() end test "works with entities" do - Essig.Server.start_scope("app1") + scope_uuid = Essig.UUID7.generate() + Essig.Server.start_scope(scope_uuid) {:ok, _} = Essig.Server.start_entity(Entities.Entity1, "1") # duplicate entities are prevented {:error, {:already_started, _}} = Essig.Server.start_entity(Entities.Entity1, "1") - pid = Essig.Scopes.Registry.get("app1") + pid = Essig.Scopes.Registry.get(scope_uuid) assert is_pid(pid) - assert "app1" in Essig.Scopes.Registry.keys() + assert scope_uuid in Essig.Scopes.Registry.keys() assert is_pid(Essig.Server.get_entity(Entities.Entity1, "1")) Process.flag(:trap_exit, true) GenServer.stop(pid) - assert eventually(fn -> Essig.Scopes.Registry.get("app1") == nil end) + assert eventually(fn -> Essig.Scopes.Registry.get(scope_uuid) == nil end) - assert_raise ArgumentError, "unknown registry: Essig.Entities.Registry_app1", fn -> - is_pid(Essig.Server.get_entity(Entities.Entity1, "1")) - end + assert_raise ArgumentError, + "unknown registry: 
:\"Elixir.Essig.Entities.Registry_#{scope_uuid}\"", + fn -> + is_pid(Essig.Server.get_entity(Entities.Entity1, "1")) + end end end end diff --git a/lib/ecto/uuid7.ex b/lib/uuid7.ex similarity index 77% rename from lib/ecto/uuid7.ex rename to lib/uuid7.ex index 59516ae..2028dc5 100644 --- a/lib/ecto/uuid7.ex +++ b/lib/uuid7.ex @@ -1,4 +1,4 @@ -defmodule Essig.Ecto.UUID7 do +defmodule Essig.UUID7 do @moduledoc """ wrapper around Uniq.UUID """ diff --git a/mix.exs b/mix.exs index f3ab7df..21697c8 100644 --- a/mix.exs +++ b/mix.exs @@ -43,9 +43,13 @@ defmodule Essig.MixProject do ## PUB-SUB {:phoenix_pubsub, "~> 2.1"}, + ## State machine handling + {:gen_state_machine, "~> 3.0"}, + ## UTIL {:json_serde, github: "maxohq/json_serde"}, {:liveness, "~> 1.0.0"}, + {:gen_cache, "~> 0.1"}, {:uniq, "~> 0.6"}, ## DEBUG diff --git a/mix.lock b/mix.lock index dbac717..df40ba0 100644 --- a/mix.lock +++ b/mix.lock @@ -10,6 +10,8 @@ "ecto_sql": {:hex, :ecto_sql, "3.12.0", "73cea17edfa54bde76ee8561b30d29ea08f630959685006d9c6e7d1e59113b7d", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dc9e4d206f274f3947e96142a8fdc5f69a2a6a9abb4649ef5c882323b6d512f0"}, "ets_select": {:hex, :ets_select, "0.1.3", "6ddb60480d8fadb1949d8ac9a06feb95750993adadceb19276bc6fd588326795", [:mix], [], "hexpm", "d2006673d24023a4c97baa56b0116d4dd1c73d43024c03b889aa1370634bb1ef"}, "file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"}, + "gen_cache": {:hex, :gen_cache, "0.1.0", "2ee13fc603a1f0503b6ea6c411568ef0f9cd12f71215266d420e5c8649eb90cc", [:mix], [], "hexpm", "7e1d71307fe32a4dc028c843ea9ba13973a9767302b38f17a705462763f6cad2"}, + "gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"}, "glob_ex": {:hex, :glob_ex, "0.1.8", "f7ef872877ca2ae7a792ab1f9ff73d9c16bf46ecb028603a8a3c5283016adc07", [:mix], [], "hexpm", "9e39d01729419a60a937c9260a43981440c43aa4cadd1fa6672fecd58241c464"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "json_serde": {:git, "https://github.com/maxohq/json_serde.git", "4cee6a51a8ff04be3c10c7924d9d0588187d56d9", []}, diff --git a/snippets/experiment.txt b/snippets/experiment.txt new file mode 100644 index 0000000..c8efbd3 --- /dev/null +++ b/snippets/experiment.txt @@ -0,0 +1,37 @@ +DROP TABLE IF EXISTS rows; + +CREATE TABLE rows ( + id SERIAL PRIMARY KEY, + name varchar, + txid bigserial NOT NULL, + snapmin bigserial NOT NULL, + timestamp timestamp default current_timestamp +); + +CREATE OR REPLACE FUNCTION pgx_add_txid() + RETURNS TRIGGER AS $$ + BEGIN + NEW.txid := pg_current_xact_id(); + NEW.snapmin := pg_snapshot_xmin(pg_current_snapshot()); + RETURN NEW; + 
END;
+  $$ LANGUAGE plpgsql;
+
+
+CREATE OR REPLACE TRIGGER pgx_add_txid_before_insert_update
+  BEFORE INSERT OR UPDATE ON rows
+  FOR EACH ROW
+  EXECUTE FUNCTION pgx_add_txid();
+
+
+-- run this in 2 psql shells, interleaving the inserts. this helps to see the txid vs snapmin relationship --
+
+begin;
+insert into rows (name) values ('1');
+insert into rows (name) values ('2');
+insert into rows (name) values ('3');
+
+commit;
+
+select * from rows order by id desc;
diff --git a/snippets/gen_statem.txt b/snippets/gen_statem.txt
new file mode 100644
index 0000000..d9bc789
--- /dev/null
+++ b/snippets/gen_statem.txt
@@ -0,0 +1,51 @@
+### Videos
+
+- [State (Machine) Of Enlightenment - 01.09.2024](https://www.youtube.com/watch?v=5ym6va__LW8)
+- [gen_statem Unveiled: A Theoretical Exploration of State Machines - FOSDEM 2024](https://fosdem.org/2024/schedule/event/fosdem-2024-2130-genstatem-unveiled-a-theoretical-exploration-of-state-machines/)
+  - [Death by Accidental Complexity by Ulf Wiger](https://www.infoq.com/presentations/Death-by-Accidental-Complexity/)
+  - https://dm3.github.io/2010/08/01/death-by-accidental-complexity.html
+  - https://github.com/uwiger/plain_fsm
+
+- [Pretty state machine - 2019](https://codesync.global/media/pretty-state-machine/)
+
+- [Lonestar ElixirConf 2018 - Managing state in distributed Elixir - Jerel Unruh](https://www.youtube.com/watch?v=V3iBgStaPmA&t=1131s)
+  - from 18:45
+- [Andrea Leopardi [GigCityElixir24] The World is a Network](https://youtu.be/9UFeQ11soQg?t=1641)
+- [Szymon Świerk - Building event-driven state machines with gen_statem | Elixir Community Krakow](https://www.youtube.com/watch?v=ehZoWwMjWBw&t=137s)
+- [Get more out of OTP with GenStateMachine | Erlang Solutions Webinar - 2020](https://www.youtube.com/watch?v=NW2b6lBuBas&t=3113s)
+- [Raimo Niskanen - gen_statem - The Tool You Never Knew You Always Wanted - Code BEAM SF 2018](https://www.youtube.com/watch?v=f_jl6MR3kXQ&t=1514s)
+
+Articles:
+
+- https://www.erlang.org/doc/system/statem.html (design docs)
+- [gen_statem in context - 2024](https://moosie.us/gen_statem_in_context)
+- https://andrealeopardi.com/posts/connection-managers-with-gen-statem/
+- https://github.com/antoinereyt/gen_statem_meetup?tab=readme-ov-file
+- https://2024-06-06.adoptingerlang.org/docs/cheat_sheets/
+  - gen_statem cheat sheet
+- https://meraj-gearhead.ca/state-machine-in-elixir-using-erlangs-genstatem-behaviour
+- https://dockyard.com/blog/2020/01/31/state-timeouts-with-gen_statem
+- https://slides.com/jprem/state-machines-in-elixir-with-gen_statem#/1/4
+- https://shortishly.com/blog/free-tracing-and-event-logging-with-sys/
+  - free debugging and logging with sys
+- https://erlangforums.com/t/pgmp-postgresql-client-with-logical-replication-to-ets/1707/17
+  - great description of how to use gen_statem to handle logical replication events
+  - https://github.com/pgmp/pgmp_client/blob/master/lib/pgmp_client/replication_connection.ex
+
+- Shortishly ARTICLES:
+  [[shortishly-articles.txt]]
+
+
+The gen_statem package, which is part of OTP, offers several special features:
+
+  1. State-based event handling: Events are handled differently based on the current state.
+  2. Flexible state representation: States can be atoms, tuples, or any Erlang term.
+  3. State data: Each state can have associated data.
+  4. Timeout events: You can set timeouts to automatically trigger state transitions.
+  5. State enter calls: Special callbacks can be defined for when a state is entered.
+  6.
Postponing events: Events can be postponed and handled later in a different state. + 7. State timeouts: Timeouts can be set specifically for each state. + 8. Generic time events: You can set events to occur at specific times. + + + diff --git a/snippets/init_casts.txt b/snippets/init_casts.txt new file mode 100644 index 0000000..8091954 --- /dev/null +++ b/snippets/init_casts.txt @@ -0,0 +1,22 @@ +scope_uuid = Essig.UUID7.generate() + +Essig.Server.start_scope(scope_uuid) +Essig.Server.start_casts([SampleCast1]) + +stream_uuid = Essig.UUID7.generate() + +{:ok, %{events: events}} = Essig.EventStore.append_to_stream(stream_uuid, "trp", 0, [ + %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 1"}, + %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 2"}, + %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 3"}, +]) + +Essig.Casts.CastRunner.send_events(SampleCast1, events) + + +{:ok, %{events: events}} = Essig.EventStore.append_to_stream(stream_uuid, "trp", 3, [ + %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 1"}, + %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 2"}, + %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 3"}, +]) +Essig.Casts.CastRunner.send_events(SampleCast1, events) diff --git a/snippets/postgres_logical_replication.txt b/snippets/postgres_logical_replication.txt new file mode 100644 index 0000000..6c072b9 --- /dev/null +++ b/snippets/postgres_logical_replication.txt @@ -0,0 +1,30 @@ + + +Practical Notes in Change Data Capture with Debezium and Postgres +- https://medium.com/cermati-tech/practical-notes-in-change-data-capture-with-debezium-and-postgres-fe31bb11ab78 +- - https://debezium.io/blog/2021/10/07/incremental-snapshots/ (!!!) + ### Watermark-based Snapshots + In late 2019, the Netflix engineering team announced that they had developed an in-house change data capture framework. They also came up with an innovative solution of executing concurrent snapshots using watermarking, described in the paper DBLog: A Watermark Based Change-Data-Capture Framework by Andreas Andreakis and Ioannis Papapanagiotou. + The main idea behind this approach is that change data streaming is executed continuously together with snapshotting. The framework inserts low and high watermarks into the transaction log (by writing to the source database) and between those two points, a part of the snapshotted table is read. The framework keeps a record of database changes in between the watermarks and reconciles them with the snapshotted values, if the same records are snapshotted and modified during the window. + This means that the data is snapshotted in chunks - no lengthy process at the connector start, and also in case of crashes or a controlled termination of the connector, the snapshotting can be resumed since the last completed chunk. + As per Netflix, the implementation is provided for MySQL and PostgreSQL databases. + + + ### Signalling Table + Before moving to Debezium’s implementation of the watermark-based snapshotting approach, a small detour is needed. + Sometimes it can be useful to control Debezium from the outside, so to force it to execute some requested action. + Let’s suppose it is necessary to re-snapshot an already snapshotted table - a so-called ad-hoc snapshot. + The user would need to send a command to Debezium to pause the current operation and do the snapshot. 
+ For that purpose, Debezium defines the concept signals, issued via a signalling table. + This is a special table, designated for communication between the user and Debezium. + Debezium captures the table and when the user requires a certain operation to be executed, + they simply write a record to the signalling table (sending a signal). + Debezium will receive the captured change and then execute the required action. + + + +- DBLog: A Watermark Based Change-Data-Capture Framework + - https://arxiv.org/pdf/2010.12597v1 + - https://github.com/abhishek-ch/around-dataengineering/blob/master/docs/dblog_netflix_cdc.md + - https://netflixtechblog.com/dblog-a-generic-change-data-capture-framework-69351fb9099b - DBLog: A Generic Change-Data-Capture Framework + - https://medium.com/brexeng/change-data-capture-at-brex-c71263616dd7 - Change Data Capture at Brex diff --git a/snippets/shortishly-articles.txt b/snippets/shortishly-articles.txt new file mode 100644 index 0000000..cfafa5f --- /dev/null +++ b/snippets/shortishly-articles.txt @@ -0,0 +1,25 @@ + + +- https://shortishly.com/blog/cache-consistency-with-streaming-replication/ + - Cache consistency with logical streaming replication + + - https://shortishly.com/blog/pgmp-log-rep-postgresql-fifteen/ + - PGMP Logical Replication in PostgreSQL 15 + + - https://shortishly.com/blog/pgec-read-write-notify/ + - pgec reads: memory replicated cache, writes: PostgreSQL with a Redis API + - https://github.com/shortishly/pgec + + - https://shortishly.com/blog/postgresql-edge-cache/ + - pgec is a real-time in memory database replication cache, with a memcached and REST API + + - https://shortishly.com/blog/mysql-replication-redis-api/ + msec is a disk cache with a Redis compatible API using MySQL/MariaDB replication to remain consistent. Inserts, updates or deletes in the database are replicated in real time and persisted to a local store. + + - https://shortishly.com/blog/property-testing-a-database-driver/ + + + + + - https://erlangforums.com/t/pgmp-postgresql-client-with-logical-replication-to-ets/1707/17 + - great description of how to use gen_statem to handle logical replication events \ No newline at end of file diff --git a/snippets/traffic-light.txt b/snippets/traffic-light.txt new file mode 100644 index 0000000..2476375 --- /dev/null +++ b/snippets/traffic-light.txt @@ -0,0 +1,36 @@ +defmodule TrafficLight do + use GenStateMachine + + def start_link do + GenStateMachine.start_link(__MODULE__, :red, name: __MODULE__) + end + + # Add this init function + def init(:red) do + {:ok, :red, nil} + end + + def change do + GenStateMachine.cast(__MODULE__, :change) + end + + def state do + GenStateMachine.call(__MODULE__, :get_state) + end + + def handle_event(:cast, :change, :red, _data) do + {:next_state, :green, nil} + end + + def handle_event(:cast, :change, :green, _data) do + {:next_state, :yellow, nil} + end + + def handle_event(:cast, :change, :yellow, _data) do + {:next_state, :red, nil} + end + + def handle_event({:call, from}, :get_state, state, _data) do + {:keep_state_and_data, [{:reply, from, state}]} + end +end
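+
+# Usage sketch (assumes the TrafficLight module above is loaded, e.g. in IEx).
+# change/0 is an async cast, but the follow-up call is ordered behind it in the
+# process mailbox, so the state reads below are deterministic:
+{:ok, _pid} = TrafficLight.start_link()
+:red = TrafficLight.state()
+TrafficLight.change()
+:green = TrafficLight.state()
+TrafficLight.change()
+:yellow = TrafficLight.state()
+TrafficLight.change()
+:red = TrafficLight.state()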