From ff5760063b504cd061e7ee0fd20312a7c3e31950 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Fri, 27 Dec 2024 20:26:33 +0100 Subject: [PATCH 1/5] Feat: adjust module namespace + reduce log level to debug for Projections.Runner --- lib/projections/runner.ex | 24 +++++++++---------- lib/projections/runner/common.ex | 2 +- .../runner/read_from_event_store.ex | 17 +++++++------ 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/lib/projections/runner.ex b/lib/projections/runner.ex index 1b3465e..811c590 100644 --- a/lib/projections/runner.ex +++ b/lib/projections/runner.ex @@ -35,7 +35,7 @@ defmodule Essig.Projections.Runner do require Logger alias Essig.Projections.Data - alias Projections.Runner.Common + alias Essig.Projections.Runner.Common # Client API @@ -72,7 +72,7 @@ defmodule Essig.Projections.Runner do @impl true def init(%{name: name, pause_ms: pause_ms, module: module} = data) do scope_uuid = Essig.Context.current_scope() - info(data, "Init with pause_ms #{pause_ms}") + debug(data, "Init with pause_ms #{pause_ms}") row = fetch_last_record(name) store_max_id = Essig.EventStoreReads.last_id(scope_uuid) @@ -104,14 +104,14 @@ defmodule Essig.Projections.Runner do end def handle_event({:call, from}, {:set_pause_ms, pause_ms}, state, data) do - info(data, "set_pause_ms - #{state} - #{pause_ms}") + debug(data, "set_pause_ms - #{state} - #{pause_ms}") actions = [{:reply, from, :ok}, {:state_timeout, pause_ms, :paused}] {:keep_state, %Data{data | pause_ms: pause_ms}, actions} end def handle_event({:call, from}, :pause, state, data) do - info(data, "pause - #{state}") + debug(data, "pause - #{state}") {:keep_state_and_data, [ @@ -123,12 +123,12 @@ defmodule Essig.Projections.Runner do end def handle_event({:call, from}, :resume, state, data) do - info(data, "resume - #{state}") + debug(data, "resume - #{state}") {:keep_state_and_data, [{:reply, from, :ok}, {:next_event, :internal, :resume}]} end def handle_event({:call, from}, :reset, state, data) do - info(data, "reset - #{state}") + debug(data, "reset - #{state}") # 1. call the projection-specific logic data.module.handle_reset(data) @@ -153,14 +153,14 @@ defmodule Essig.Projections.Runner do ########### `internal` EVENTS handlers def handle_event(:internal, :init_storage, :bootstrap, data = %Data{}) do - info(data, "init_storage - #{:bootstrap}") + debug(data, "init_storage - #{:bootstrap}") data.module.handle_init_storage(data) :keep_state_and_data end def handle_event(:internal, :load_from_eventstore, state, data = %Data{}) do - info(data, "load_from_eventstore - #{state}") - Projections.Runner.ReadFromEventStore.run(data) + debug(data, "load_from_eventstore - #{state}") + Essig.Projections.Runner.ReadFromEventStore.run(data) end # resume reading, pause timeout triggered @@ -184,7 +184,7 @@ defmodule Essig.Projections.Runner do def handle_event(:info, {:new_events, notification}, state, data) when state in [:bootstrap, :idle] do - info(data, "new_events - #{state}") + debug(data, "new_events - #{state}") ## we get a notification from the pubsub, that there are new events %{max_id: max_id} = notification @@ -213,7 +213,7 @@ defmodule Essig.Projections.Runner do end end - def info(data, msg) do - Logger.info("Projections.Runner-> #{inspect(data.name)}: #{msg}") + def debug(data, msg) do + Logger.debug("Projections.Runner-> #{inspect(data.name)}: #{msg}") end end diff --git a/lib/projections/runner/common.ex b/lib/projections/runner/common.ex index 1d86843..085a288 100644 --- a/lib/projections/runner/common.ex +++ b/lib/projections/runner/common.ex @@ -1,4 +1,4 @@ -defmodule Projections.Runner.Common do +defmodule Essig.Projections.Runner.Common do alias Essig.Projections.Data def fetch_events(scope_uuid, max_id, amount) do diff --git a/lib/projections/runner/read_from_event_store.ex b/lib/projections/runner/read_from_event_store.ex index 25c3643..29751b8 100644 --- a/lib/projections/runner/read_from_event_store.ex +++ b/lib/projections/runner/read_from_event_store.ex @@ -1,10 +1,9 @@ -defmodule Projections.Runner.ReadFromEventStore do +defmodule Essig.Projections.Runner.ReadFromEventStore do alias Essig.Projections.Data - alias Projections.Runner.Common + alias Essig.Projections.Runner.Common require Logger def run(data = %Data{row: row, pause_ms: pause_ms, store_max_id: store_max_id} = data) do - IO.inspect(data, label: "ReadFromEventStore - data") scope_uuid = Essig.Context.current_scope() events = Common.fetch_events(scope_uuid, row.max_id, Essig.Config.events_per_batch()) multi_tuple = {Ecto.Multi.new(), data} @@ -17,12 +16,12 @@ defmodule Projections.Runner.ReadFromEventStore do if length(events) > 0 do last_event = List.last(events) - info(data, "CURRENT MAX ID #{last_event.id}") + debug(data, "CURRENT MAX ID #{last_event.id}") # not sure, what to do with response. BUT: projections MUST NEVER fail. {:ok, _multi_results} = Essig.Repo.transaction(multi) |> IO.inspect() if last_event.id != store_max_id do - info(data, "paused for #{pause_ms}ms...") + debug(data, "paused for #{pause_ms}ms...") row = Common.update_external_state(data, row, %{max_id: last_event.id, seq: last_event.seq}) @@ -36,7 +35,7 @@ defmodule Projections.Runner.ReadFromEventStore do {:keep_state, %Data{data | row: row}, actions} else # finished... - info(data, "finished") + debug(data, "finished") row = Common.update_external_state(data, row, %{ @@ -48,13 +47,13 @@ defmodule Projections.Runner.ReadFromEventStore do {:next_state, :idle, %Data{data | row: row}} end else - info(data, "EMPTY EVENTS") + debug(data, "EMPTY EVENTS") row = Common.update_external_state(data, row, %{status: :idle}) {:next_state, :idle, %Data{data | row: row}} end end - def info(data, msg) do - Logger.info("Projection #{inspect(data.name)}: #{msg}") + def debug(data, msg) do + Logger.debug("Projection #{inspect(data.name)}: #{msg}") end end From 660c0d2884ef9db9dc654b381d8aeb0ad509e898 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Fri, 27 Dec 2024 20:26:51 +0100 Subject: [PATCH 2/5] Chore: add function to reset sample projections --- lib/sample/start_sample.ex | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/sample/start_sample.ex b/lib/sample/start_sample.ex index 86cb481..e8422cb 100644 --- a/lib/sample/start_sample.ex +++ b/lib/sample/start_sample.ex @@ -8,6 +8,11 @@ defmodule Sample.StartSample do ) end + def reset do + Essig.Projections.Runner.reset(Sample.Projections.Proj1) + Essig.Projections.Runner.reset(Sample.Projections.Proj2) + end + def add_events do ## insert events events = [ From 4efb13be28d7e7c05b0236d4424439aaacf4c6a7 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Fri, 27 Dec 2024 20:27:33 +0100 Subject: [PATCH 3/5] Feat: provide module to control the log level for projections --- lib/projections/config.ex | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 lib/projections/config.ex diff --git a/lib/projections/config.ex b/lib/projections/config.ex new file mode 100644 index 0000000..b60e3f9 --- /dev/null +++ b/lib/projections/config.ex @@ -0,0 +1,17 @@ +defmodule Essig.Projections.Config do + @modules [Essig.Projections.Runner, Essig.Projections.Runner.ReadFromEventStore] + + def set_log_debug() do + set_log_level(:debug) + end + + def set_log_info() do + set_log_level(:info) + end + + def set_log_level(level) do + for module <- @modules do + Logger.put_module_level(module, level) + end + end +end From 6e3ee2bd0967a693f83dcbd4fb999ffc2f3514f7 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Fri, 27 Dec 2024 20:33:34 +0100 Subject: [PATCH 4/5] Chore: remove inspect on repo.transaction --- lib/projections/runner/read_from_event_store.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/projections/runner/read_from_event_store.ex b/lib/projections/runner/read_from_event_store.ex index 29751b8..4ca6f41 100644 --- a/lib/projections/runner/read_from_event_store.ex +++ b/lib/projections/runner/read_from_event_store.ex @@ -18,7 +18,7 @@ defmodule Essig.Projections.Runner.ReadFromEventStore do debug(data, "CURRENT MAX ID #{last_event.id}") # not sure, what to do with response. BUT: projections MUST NEVER fail. - {:ok, _multi_results} = Essig.Repo.transaction(multi) |> IO.inspect() + {:ok, _multi_results} = Essig.Repo.transaction(multi) if last_event.id != store_max_id do debug(data, "paused for #{pause_ms}ms...") From d68a317c70479c8933b50f948946bcea27f457cd Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Fri, 27 Dec 2024 20:33:50 +0100 Subject: [PATCH 5/5] Chore: less noise in custom projection logic --- lib/sample/projections/proj1.ex | 2 +- lib/sample/projections/proj2.ex | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/sample/projections/proj1.ex b/lib/sample/projections/proj1.ex index 37242e0..be1ae0e 100644 --- a/lib/sample/projections/proj1.ex +++ b/lib/sample/projections/proj1.ex @@ -18,7 +18,7 @@ defmodule Sample.Projections.Proj1 do def handle_event(multi, data = %Data{}, {event, index}) do multi = Ecto.Multi.run(multi, {:event, index}, fn _repo, _changes -> - IO.inspect(event.data, label: "index-#{index}") + # IO.inspect(event.data, label: "index-#{index}") Repo.insert_all("projection_proj1", [%{id: index, data: "OK"}]) {:ok, 1} end) diff --git a/lib/sample/projections/proj2.ex b/lib/sample/projections/proj2.ex index 89e5d78..83a87e4 100644 --- a/lib/sample/projections/proj2.ex +++ b/lib/sample/projections/proj2.ex @@ -7,7 +7,7 @@ defmodule Sample.Projections.Proj2 do def handle_event(multi, data = %Data{}, {event, index}) do multi = Ecto.Multi.run(multi, {:event, index}, fn _repo, _changes -> - IO.inspect(event.data, label: "index-#{index}") + # IO.inspect(event.data, label: "index-#{index}") Repo.insert_all("projection_proj2", [%{id: index, data: "OK"}]) {:ok, 1} end)