Skip to content

Commit

Permalink
Merge pull request #6 from maxohq/feat/less-noisy-logs-for-projections
Browse files Browse the repository at this point in the history
Feat/less noisy logs for projections
  • Loading branch information
mindreframer authored Dec 27, 2024
2 parents e4987ee + d68a317 commit 4b24972
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 25 deletions.
17 changes: 17 additions & 0 deletions lib/projections/config.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Essig.Projections.Config do
@modules [Essig.Projections.Runner, Essig.Projections.Runner.ReadFromEventStore]

def set_log_debug() do
set_log_level(:debug)
end

def set_log_info() do
set_log_level(:info)
end

def set_log_level(level) do
for module <- @modules do
Logger.put_module_level(module, level)
end
end
end
24 changes: 12 additions & 12 deletions lib/projections/runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule Essig.Projections.Runner do
require Logger

alias Essig.Projections.Data
alias Projections.Runner.Common
alias Essig.Projections.Runner.Common

# Client API

Expand Down Expand Up @@ -72,7 +72,7 @@ defmodule Essig.Projections.Runner do
@impl true
def init(%{name: name, pause_ms: pause_ms, module: module} = data) do
scope_uuid = Essig.Context.current_scope()
info(data, "Init with pause_ms #{pause_ms}")
debug(data, "Init with pause_ms #{pause_ms}")
row = fetch_last_record(name)
store_max_id = Essig.EventStoreReads.last_id(scope_uuid)

Expand Down Expand Up @@ -104,14 +104,14 @@ defmodule Essig.Projections.Runner do
end

def handle_event({:call, from}, {:set_pause_ms, pause_ms}, state, data) do
info(data, "set_pause_ms - #{state} - #{pause_ms}")
debug(data, "set_pause_ms - #{state} - #{pause_ms}")

actions = [{:reply, from, :ok}, {:state_timeout, pause_ms, :paused}]
{:keep_state, %Data{data | pause_ms: pause_ms}, actions}
end

def handle_event({:call, from}, :pause, state, data) do
info(data, "pause - #{state}")
debug(data, "pause - #{state}")

{:keep_state_and_data,
[
Expand All @@ -123,12 +123,12 @@ defmodule Essig.Projections.Runner do
end

def handle_event({:call, from}, :resume, state, data) do
info(data, "resume - #{state}")
debug(data, "resume - #{state}")
{:keep_state_and_data, [{:reply, from, :ok}, {:next_event, :internal, :resume}]}
end

def handle_event({:call, from}, :reset, state, data) do
info(data, "reset - #{state}")
debug(data, "reset - #{state}")

# 1. call the projection-specific logic
data.module.handle_reset(data)
Expand All @@ -153,14 +153,14 @@ defmodule Essig.Projections.Runner do
########### `internal` EVENTS handlers

def handle_event(:internal, :init_storage, :bootstrap, data = %Data{}) do
info(data, "init_storage - #{:bootstrap}")
debug(data, "init_storage - #{:bootstrap}")
data.module.handle_init_storage(data)
:keep_state_and_data
end

def handle_event(:internal, :load_from_eventstore, state, data = %Data{}) do
info(data, "load_from_eventstore - #{state}")
Projections.Runner.ReadFromEventStore.run(data)
debug(data, "load_from_eventstore - #{state}")
Essig.Projections.Runner.ReadFromEventStore.run(data)
end

# resume reading, pause timeout triggered
Expand All @@ -184,7 +184,7 @@ defmodule Essig.Projections.Runner do

def handle_event(:info, {:new_events, notification}, state, data)
when state in [:bootstrap, :idle] do
info(data, "new_events - #{state}")
debug(data, "new_events - #{state}")
## we get a notification from the pubsub, that there are new events
%{max_id: max_id} = notification

Expand Down Expand Up @@ -213,7 +213,7 @@ defmodule Essig.Projections.Runner do
end
end

def info(data, msg) do
Logger.info("Projections.Runner-> #{inspect(data.name)}: #{msg}")
def debug(data, msg) do
Logger.debug("Projections.Runner-> #{inspect(data.name)}: #{msg}")
end
end
2 changes: 1 addition & 1 deletion lib/projections/runner/common.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Projections.Runner.Common do
defmodule Essig.Projections.Runner.Common do
alias Essig.Projections.Data

def fetch_events(scope_uuid, max_id, amount) do
Expand Down
19 changes: 9 additions & 10 deletions lib/projections/runner/read_from_event_store.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
defmodule Projections.Runner.ReadFromEventStore do
defmodule Essig.Projections.Runner.ReadFromEventStore do
alias Essig.Projections.Data
alias Projections.Runner.Common
alias Essig.Projections.Runner.Common
require Logger

def run(data = %Data{row: row, pause_ms: pause_ms, store_max_id: store_max_id} = data) do
IO.inspect(data, label: "ReadFromEventStore - data")
scope_uuid = Essig.Context.current_scope()
events = Common.fetch_events(scope_uuid, row.max_id, Essig.Config.events_per_batch())
multi_tuple = {Ecto.Multi.new(), data}
Expand All @@ -17,12 +16,12 @@ defmodule Projections.Runner.ReadFromEventStore do
if length(events) > 0 do
last_event = List.last(events)

info(data, "CURRENT MAX ID #{last_event.id}")
debug(data, "CURRENT MAX ID #{last_event.id}")
# not sure, what to do with response. BUT: projections MUST NEVER fail.
{:ok, _multi_results} = Essig.Repo.transaction(multi) |> IO.inspect()
{:ok, _multi_results} = Essig.Repo.transaction(multi)

if last_event.id != store_max_id do
info(data, "paused for #{pause_ms}ms...")
debug(data, "paused for #{pause_ms}ms...")

row =
Common.update_external_state(data, row, %{max_id: last_event.id, seq: last_event.seq})
Expand All @@ -36,7 +35,7 @@ defmodule Projections.Runner.ReadFromEventStore do
{:keep_state, %Data{data | row: row}, actions}
else
# finished...
info(data, "finished")
debug(data, "finished")

row =
Common.update_external_state(data, row, %{
Expand All @@ -48,13 +47,13 @@ defmodule Projections.Runner.ReadFromEventStore do
{:next_state, :idle, %Data{data | row: row}}
end
else
info(data, "EMPTY EVENTS")
debug(data, "EMPTY EVENTS")
row = Common.update_external_state(data, row, %{status: :idle})
{:next_state, :idle, %Data{data | row: row}}
end
end

def info(data, msg) do
Logger.info("Projection #{inspect(data.name)}: #{msg}")
def debug(data, msg) do
Logger.debug("Projection #{inspect(data.name)}: #{msg}")
end
end
2 changes: 1 addition & 1 deletion lib/sample/projections/proj1.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Sample.Projections.Proj1 do
def handle_event(multi, data = %Data{}, {event, index}) do
multi =
Ecto.Multi.run(multi, {:event, index}, fn _repo, _changes ->
IO.inspect(event.data, label: "index-#{index}")
# IO.inspect(event.data, label: "index-#{index}")
Repo.insert_all("projection_proj1", [%{id: index, data: "OK"}])
{:ok, 1}
end)
Expand Down
2 changes: 1 addition & 1 deletion lib/sample/projections/proj2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Sample.Projections.Proj2 do
def handle_event(multi, data = %Data{}, {event, index}) do
multi =
Ecto.Multi.run(multi, {:event, index}, fn _repo, _changes ->
IO.inspect(event.data, label: "index-#{index}")
# IO.inspect(event.data, label: "index-#{index}")
Repo.insert_all("projection_proj2", [%{id: index, data: "OK"}])
{:ok, 1}
end)
Expand Down
5 changes: 5 additions & 0 deletions lib/sample/start_sample.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ defmodule Sample.StartSample do
)
end

def reset do
Essig.Projections.Runner.reset(Sample.Projections.Proj1)
Essig.Projections.Runner.reset(Sample.Projections.Proj2)
end

def add_events do
## insert events
events = [
Expand Down

0 comments on commit 4b24972

Please sign in to comment.