Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/less noisy logs for projections #6

Merged
merged 5 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions lib/projections/config.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Essig.Projections.Config do
@modules [Essig.Projections.Runner, Essig.Projections.Runner.ReadFromEventStore]

def set_log_debug() do
set_log_level(:debug)
end

def set_log_info() do
set_log_level(:info)
end

def set_log_level(level) do
for module <- @modules do
Logger.put_module_level(module, level)
end
end
end
24 changes: 12 additions & 12 deletions lib/projections/runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule Essig.Projections.Runner do
require Logger

alias Essig.Projections.Data
alias Projections.Runner.Common
alias Essig.Projections.Runner.Common

# Client API

Expand Down Expand Up @@ -72,7 +72,7 @@ defmodule Essig.Projections.Runner do
@impl true
def init(%{name: name, pause_ms: pause_ms, module: module} = data) do
scope_uuid = Essig.Context.current_scope()
info(data, "Init with pause_ms #{pause_ms}")
debug(data, "Init with pause_ms #{pause_ms}")
row = fetch_last_record(name)
store_max_id = Essig.EventStoreReads.last_id(scope_uuid)

Expand Down Expand Up @@ -104,14 +104,14 @@ defmodule Essig.Projections.Runner do
end

def handle_event({:call, from}, {:set_pause_ms, pause_ms}, state, data) do
info(data, "set_pause_ms - #{state} - #{pause_ms}")
debug(data, "set_pause_ms - #{state} - #{pause_ms}")

actions = [{:reply, from, :ok}, {:state_timeout, pause_ms, :paused}]
{:keep_state, %Data{data | pause_ms: pause_ms}, actions}
end

def handle_event({:call, from}, :pause, state, data) do
info(data, "pause - #{state}")
debug(data, "pause - #{state}")

{:keep_state_and_data,
[
Expand All @@ -123,12 +123,12 @@ defmodule Essig.Projections.Runner do
end

def handle_event({:call, from}, :resume, state, data) do
info(data, "resume - #{state}")
debug(data, "resume - #{state}")
{:keep_state_and_data, [{:reply, from, :ok}, {:next_event, :internal, :resume}]}
end

def handle_event({:call, from}, :reset, state, data) do
info(data, "reset - #{state}")
debug(data, "reset - #{state}")

# 1. call the projection-specific logic
data.module.handle_reset(data)
Expand All @@ -153,14 +153,14 @@ defmodule Essig.Projections.Runner do
########### `internal` EVENTS handlers

def handle_event(:internal, :init_storage, :bootstrap, data = %Data{}) do
info(data, "init_storage - #{:bootstrap}")
debug(data, "init_storage - #{:bootstrap}")
data.module.handle_init_storage(data)
:keep_state_and_data
end

def handle_event(:internal, :load_from_eventstore, state, data = %Data{}) do
info(data, "load_from_eventstore - #{state}")
Projections.Runner.ReadFromEventStore.run(data)
debug(data, "load_from_eventstore - #{state}")
Essig.Projections.Runner.ReadFromEventStore.run(data)
end

# resume reading, pause timeout triggered
Expand All @@ -184,7 +184,7 @@ defmodule Essig.Projections.Runner do

def handle_event(:info, {:new_events, notification}, state, data)
when state in [:bootstrap, :idle] do
info(data, "new_events - #{state}")
debug(data, "new_events - #{state}")
## we get a notification from the pubsub, that there are new events
%{max_id: max_id} = notification

Expand Down Expand Up @@ -213,7 +213,7 @@ defmodule Essig.Projections.Runner do
end
end

def info(data, msg) do
Logger.info("Projections.Runner-> #{inspect(data.name)}: #{msg}")
def debug(data, msg) do
Logger.debug("Projections.Runner-> #{inspect(data.name)}: #{msg}")
end
end
2 changes: 1 addition & 1 deletion lib/projections/runner/common.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Projections.Runner.Common do
defmodule Essig.Projections.Runner.Common do
alias Essig.Projections.Data

def fetch_events(scope_uuid, max_id, amount) do
Expand Down
19 changes: 9 additions & 10 deletions lib/projections/runner/read_from_event_store.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
defmodule Projections.Runner.ReadFromEventStore do
defmodule Essig.Projections.Runner.ReadFromEventStore do
alias Essig.Projections.Data
alias Projections.Runner.Common
alias Essig.Projections.Runner.Common
require Logger

def run(data = %Data{row: row, pause_ms: pause_ms, store_max_id: store_max_id} = data) do
IO.inspect(data, label: "ReadFromEventStore - data")
scope_uuid = Essig.Context.current_scope()
events = Common.fetch_events(scope_uuid, row.max_id, Essig.Config.events_per_batch())
multi_tuple = {Ecto.Multi.new(), data}
Expand All @@ -17,12 +16,12 @@ defmodule Projections.Runner.ReadFromEventStore do
if length(events) > 0 do
last_event = List.last(events)

info(data, "CURRENT MAX ID #{last_event.id}")
debug(data, "CURRENT MAX ID #{last_event.id}")
# not sure, what to do with response. BUT: projections MUST NEVER fail.
{:ok, _multi_results} = Essig.Repo.transaction(multi) |> IO.inspect()
{:ok, _multi_results} = Essig.Repo.transaction(multi)

if last_event.id != store_max_id do
info(data, "paused for #{pause_ms}ms...")
debug(data, "paused for #{pause_ms}ms...")

row =
Common.update_external_state(data, row, %{max_id: last_event.id, seq: last_event.seq})
Expand All @@ -36,7 +35,7 @@ defmodule Projections.Runner.ReadFromEventStore do
{:keep_state, %Data{data | row: row}, actions}
else
# finished...
info(data, "finished")
debug(data, "finished")

row =
Common.update_external_state(data, row, %{
Expand All @@ -48,13 +47,13 @@ defmodule Projections.Runner.ReadFromEventStore do
{:next_state, :idle, %Data{data | row: row}}
end
else
info(data, "EMPTY EVENTS")
debug(data, "EMPTY EVENTS")
row = Common.update_external_state(data, row, %{status: :idle})
{:next_state, :idle, %Data{data | row: row}}
end
end

def info(data, msg) do
Logger.info("Projection #{inspect(data.name)}: #{msg}")
def debug(data, msg) do
Logger.debug("Projection #{inspect(data.name)}: #{msg}")
end
end
2 changes: 1 addition & 1 deletion lib/sample/projections/proj1.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Sample.Projections.Proj1 do
def handle_event(multi, data = %Data{}, {event, index}) do
multi =
Ecto.Multi.run(multi, {:event, index}, fn _repo, _changes ->
IO.inspect(event.data, label: "index-#{index}")
# IO.inspect(event.data, label: "index-#{index}")
Repo.insert_all("projection_proj1", [%{id: index, data: "OK"}])
{:ok, 1}
end)
Expand Down
2 changes: 1 addition & 1 deletion lib/sample/projections/proj2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Sample.Projections.Proj2 do
def handle_event(multi, data = %Data{}, {event, index}) do
multi =
Ecto.Multi.run(multi, {:event, index}, fn _repo, _changes ->
IO.inspect(event.data, label: "index-#{index}")
# IO.inspect(event.data, label: "index-#{index}")
Repo.insert_all("projection_proj2", [%{id: index, data: "OK"}])
{:ok, 1}
end)
Expand Down
5 changes: 5 additions & 0 deletions lib/sample/start_sample.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ defmodule Sample.StartSample do
)
end

def reset do
Essig.Projections.Runner.reset(Sample.Projections.Proj1)
Essig.Projections.Runner.reset(Sample.Projections.Proj2)
end

def add_events do
## insert events
events = [
Expand Down
Loading