
Feat: Pubsub system to update Casts #1

Merged
merged 40 commits into main from feat/event_broadcaster on Sep 8, 2024

Conversation

mindreframer
Member

No description provided.

Problem:
- It seems to be quite non-trivial to process events that are causally dependent.
- With sequential IDs, records with lower IDs can become visible later than records with higher IDs.
- With transaction IDs, lower values can likewise become visible later than higher ones.
- To work around these issues we would need to write some tricky code, maintain it, and write tests for it.


Solution:
- after some contemplation I decided to go for a stupid workaround
- we only allow sequential inserts into the EventStore (events) across all Elixir processes
- that way the IDs are strictly increasing and there is no interleaving
- that way the logic to deal with new events becomes trivial, since we should never see newer events with lower IDs

Technical realization:
- we use Postgres `pg_try_advisory_lock` and `pg_advisory_unlock`
- this allows coordination across all OS processes of our Elixir app that use the same DB at the same time
- the locks also respect the current Context scope, so different scopes do not conflict with each other
- to unlock reliably, the unlock must run on the same Postgres DB connection that acquired the lock
- a normal Ecto Repo does not guarantee this, since queries run on whatever connection the pool hands out
- as a workaround we configure SandRepo, whose sole purpose is acquiring and releasing PG locks
- appending events to our Event store is only possible through a locked function
Problem:
- the sandbox config has some test-specific params, like timing out, cleaning up the conn, etc.
- this prevents proper usage outside of tests

Solution:
- we just stick to a normal repo with a pool of size 1, so we always get the same connection (see the sketch below)
- this repo is only used for locking, so there should be no performance issues
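Roughly, the locking flow could look like this (a sketch under assumptions: `MyApp.LockRepo`, `MyApp.EventStore.Locking` and the integer `scope_key` are illustrative names, not the code in this PR):

```elixir
# Rough sketch only -- module and function names are illustrative.
defmodule MyApp.LockRepo do
  # A second repo used exclusively for advisory locks; configured with
  # `pool_size: 1`, so lock and unlock always run on the same connection:
  #
  #   config :my_app, MyApp.LockRepo, pool_size: 1
  use Ecto.Repo, otp_app: :my_app, adapter: Ecto.Adapters.Postgres
end

defmodule MyApp.EventStore.Locking do
  alias MyApp.LockRepo

  # Runs `fun` while holding an advisory lock for the given scope key,
  # so event inserts stay strictly sequential per scope.
  def with_lock(scope_key, fun) when is_integer(scope_key) do
    %{rows: [[acquired]]} =
      Ecto.Adapters.SQL.query!(LockRepo, "SELECT pg_try_advisory_lock($1)", [scope_key])

    if acquired do
      try do
        {:ok, fun.()}
      after
        Ecto.Adapters.SQL.query!(LockRepo, "SELECT pg_advisory_unlock($1)", [scope_key])
      end
    else
      {:error, :locked}
    end
  end
end
```

Appending would then be wrapped as `with_lock(scope_key, fn -> append_to_stream(...) end)`.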
@mindreframer mindreframer force-pushed the feat/event_broadcaster branch from f94c52c to 214ffda on September 3, 2024 22:36
…napshot) to events

Problem:
- we might need to extend features for our event store
- to reduce chattiness (one notification per event), we need to store the current transaction id

Solution:
- we add the current PG transaction id + min current LSN to every document via triggers
- based on the example here: https://github.com/josevalim/sync/blob/main/priv/repo/migrations/20240806131210_create_publication.exs
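A sketch of what such a migration could look like, loosely following the linked example (column names, `set_event_tx_info` and the exact LSN expression are assumptions; the PR's real migration, including how it derives the "min current LSN", may differ; PostgreSQL 13+ is assumed):

```elixir
# Illustrative sketch, not the exact migration from this PR.
defmodule MyApp.Repo.Migrations.AddTxInfoToEvents do
  use Ecto.Migration

  def up do
    alter table(:events) do
      add :xact_id, :bigint
      add :lsn, :text
    end

    # Stamp every inserted event with the current transaction id and the
    # current WAL insert LSN.
    execute """
    CREATE OR REPLACE FUNCTION set_event_tx_info() RETURNS trigger AS $$
    BEGIN
      NEW.xact_id := pg_current_xact_id()::text::bigint;
      NEW.lsn := pg_current_wal_insert_lsn()::text;
      RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;
    """

    execute """
    CREATE TRIGGER events_set_tx_info
    BEFORE INSERT ON events
    FOR EACH ROW EXECUTE FUNCTION set_event_tx_info();
    """
  end

  def down do
    execute "DROP TRIGGER IF EXISTS events_set_tx_info ON events"
    execute "DROP FUNCTION IF EXISTS set_event_tx_info()"

    alter table(:events) do
      remove :xact_id
      remove :lsn
    end
  end
end
```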
Problem:
- we somehow need to make sure that our app notices newly inserted events
- a local PubSub would miss inserts from other app instances (IEx shells, etc.)
- also we do not want to overload our system by sending large messages (many events in a single message)

Solution:
- we add a signals table (somewhat inspired by Debezium - https://debezium.io/blog/2023/06/27/Debezium-signaling-and-notifications/)
- after each transaction on the events table we also insert a small row into the signals table with the current scope_uuid
- the signals table has triggers that execute `pg_notify` with a small message
- in our app we start PGNotifyListener, whose purpose is to re-broadcast those pg_notify messages to the local PubSub system (see the sketch below)
- that way our app still deals only with Phx PubSub; the messages just originate from the DB and cross OS process boundaries without a distributed Erlang cluster
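A rough sketch of such a listener (the module name, the `signals` channel and the PubSub topic are assumptions, not the PR's exact implementation):

```elixir
# Illustrative sketch of the notify listener.
defmodule MyApp.PGNotifyListener do
  use GenServer

  @channel "signals"

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    # Reuse the main repo's connection settings for the LISTEN connection.
    {:ok, pid} = Postgrex.Notifications.start_link(MyApp.Repo.config())
    {:ok, ref} = Postgrex.Notifications.listen(pid, @channel)
    {:ok, %{pid: pid, ref: ref}}
  end

  @impl true
  def handle_info({:notification, _pid, _ref, @channel, payload}, state) do
    # Re-broadcast the (small) pg_notify payload to the local Phoenix PubSub,
    # so the rest of the app keeps using plain PubSub subscriptions.
    Phoenix.PubSub.broadcast(MyApp.PubSub, "events:signals", {:signal, payload})
    {:noreply, state}
  end
end
```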
…out work duplication

Problem:
- we need to fetch the same data from the DB at the same time from all the cast runner processes
- this could easily overload the DB due to the "thundering herd" effect

Solution:
- implement a caching layer for the event store (sketched below)
- this caching layer holds back all in-flight requests except the first one (which generates the value) and hands them the cached value once it has been generated
- the current module is a draft of the concept, with fake work
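Conceptually, the coalescing can be sketched as a GenServer that keeps one in-flight computation per key and replies to all waiters once the first caller's work finishes (hypothetical names, no eviction yet):

```elixir
# Conceptual "single flight" sketch -- not the draft module from this PR.
defmodule MyApp.CoalescingCache do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # All concurrent callers for the same key share one computation of `fun`.
  def fetch(key, fun), do: GenServer.call(__MODULE__, {:fetch, key, fun}, :infinity)

  @impl true
  def init(_opts), do: {:ok, %{values: %{}, waiting: %{}}}

  @impl true
  def handle_call({:fetch, key, fun}, from, state) do
    cond do
      Map.has_key?(state.values, key) ->
        {:reply, Map.fetch!(state.values, key), state}

      Map.has_key?(state.waiting, key) ->
        # A computation for this key is already running: queue the caller.
        {:noreply, update_in(state.waiting[key], &[from | &1])}

      true ->
        # The first caller triggers the work in a separate task.
        server = self()
        Task.start(fn -> send(server, {:computed, key, fun.()}) end)
        {:noreply, put_in(state.waiting[key], [from])}
    end
  end

  @impl true
  def handle_info({:computed, key, value}, state) do
    {waiters, waiting} = Map.pop(state.waiting, key, [])
    Enum.each(waiters, &GenServer.reply(&1, value))
    {:noreply, %{state | values: Map.put(state.values, key, value), waiting: waiting}}
  end
end
```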
…guments) tuple

Problem:
- we need a flexible way to tell the cache how to fill cache values
- hardcoding the logic feels inflexible

Solution:
- by using an MFA (module / function / arguments) tuple the cache becomes generic
- this can be used for any kind of caching and is extremely versatile
- it is now a generic caching layer, without other dependencies, useful for ANYTHING ;)
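The miss path then boils down to a single `apply/3` (hypothetical names; `EventStore.list_events/1` in the example is an assumption):

```elixir
# Hypothetical sketch: with an MFA tuple the cache needs no knowledge of what it caches.
defmodule MyApp.Cache.MFA do
  # On a cache miss, the value is produced by applying the stored MFA.
  def resolve({module, function, arguments}), do: apply(module, function, arguments)
end

# Example call site (scope_uuid and EventStore.list_events/1 are assumptions):
#   MyApp.Cache.fetch({:events, scope_uuid}, {MyApp.EventStore, :list_events, [scope_uuid]})
```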
Problem:
- we would like to prevent our cache from growing indefinitely
- for this we need to remember when a key was last used

Solution:
- store a last-used timestamp for each key (see the sketch below)
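A minimal sketch of the bookkeeping this enables (the 60-second idle limit and the module name are assumptions):

```elixir
# Hypothetical sketch: track a last-used timestamp per key and sweep stale entries.
defmodule MyApp.Cache.Expiry do
  @max_idle_ms 60_000

  # Records when a key was last used.
  def touch(timestamps, key), do: Map.put(timestamps, key, System.monotonic_time(:millisecond))

  # Returns the keys that have not been used within @max_idle_ms.
  def stale_keys(timestamps) do
    now = System.monotonic_time(:millisecond)

    for {key, last_used} <- timestamps, now - last_used > @max_idle_ms, do: key
  end
end
```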
Problem:
- after getting the "poke" from the signals table, we currently know the transaction id and the scope / stream uuids, but we still have to issue an extra query to figure out how many new events we need to fetch

Solution:
- the count of new events and the max_id can easily be added to the signals table, so we add new columns and write those values from the append_to_stream function (sketched below)
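A sketch of how the signal row could be written as part of the append (column names, `insert_signal/3` and the schemaless `insert_all` are assumptions):

```elixir
# Hypothetical sketch: after appending a batch of events, write one signal row
# that already carries everything listeners need to size their fetch.
defmodule MyApp.EventStore.Signals do
  def insert_signal(repo, scope_uuid, appended_events) do
    # Assumes a non-empty batch of freshly inserted event structs with :id set.
    max_id = appended_events |> Enum.map(& &1.id) |> Enum.max()

    repo.insert_all("signals", [
      %{
        scope_uuid: scope_uuid,
        events_count: length(appended_events),
        max_id: max_id,
        inserted_at: DateTime.utc_now()
      }
    ])
  end
end
```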
@mindreframer
Member Author

There are already quite a few changes in this PR, and there is enough noise from renames and the like. The foundation is good; the next changes will happen in a different PR.

@mindreframer mindreframer merged commit be97aeb into main Sep 8, 2024
1 check passed
@mindreframer mindreframer deleted the feat/event_broadcaster branch September 8, 2024 20:17