Feat: Pubsub system to update Casts #1
Merged
Conversation
+ adjust tests
Problem:
- It is quite non-trivial to process events that are causally dependent.
- Sequential IDs do not help: records with lower IDs can become visible later than records with higher IDs.
- Transaction IDs have the same problem: lower values can become visible later.
- Working around these issues would require tricky code that we would then have to maintain and write tests for.

Solution:
- After some contemplation I decided to go for a simple workaround: we only allow sequential inserts into the EventStore (events) across all Elixir processes.
- That way the IDs are strictly increasing and there is no interleaving.
- The logic for handling new events then becomes trivial, since we can never see newer events with lower IDs.

Technical realization:
- We use Postgres `pg_try_advisory_lock` and `pg_advisory_unlock`.
- This allows coordination across all Elixir OS processes of our app that use the same DB at the same time.
- The locks respect the current Context scope, so different scopes do not conflict with each other.
- For consistent unlocking, the unlock must run on the same Postgres DB connection as the lock; a normal Ecto Repo does not guarantee this.
- As a workaround we configure SandRepo, whose sole purpose is acquiring and releasing PG locks.
- Appending events to our event store is only possible through a locked function.
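A minimal sketch of the locking wrapper described above. `SandRepo` is the dedicated repo from this PR; the helper names (`with_advisory_lock/2`, `lock_key/1`) and the `MyApp` namespace are illustrative assumptions, not the actual API.

```elixir
defmodule EventStore.Lock do
  @moduledoc """
  Serializes event inserts across all OS processes by taking a
  Postgres advisory lock on a per-scope key via SandRepo.
  """

  # SandRepo has a pool of exactly one connection, so the lock and the
  # unlock are guaranteed to run on the same DB connection.
  alias MyApp.SandRepo

  def with_advisory_lock(scope_uuid, fun) do
    key = lock_key(scope_uuid)

    case SandRepo.query!("SELECT pg_try_advisory_lock($1)", [key]) do
      %{rows: [[true]]} ->
        try do
          fun.()
        after
          SandRepo.query!("SELECT pg_advisory_unlock($1)", [key])
        end

      %{rows: [[false]]} ->
        {:error, :locked}
    end
  end

  # Derive a stable signed 64-bit lock key from the scope UUID, since
  # pg_try_advisory_lock takes a bigint.
  defp lock_key(scope_uuid) do
    <<key::signed-64, _::binary>> = :crypto.hash(:sha256, scope_uuid)
    key
  end
end
```

The scope-derived key is what makes different scopes not conflict with each other: each scope hashes to its own advisory lock.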
Problem:
- The sandbox config has test-specific params (timeouts, cleaning up the connection, etc.), which prevents proper usage.

Solution:
- We stick to a normal repo with a pool size of 1, so we always get the same connection.
- This repo is only used for locking, so there should be no performance issues.
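A hedged sketch of what that repo config could look like; all names besides `pool_size` are assumptions:

```elixir
# config/runtime.exs (illustrative): a normal Ecto repo restricted to a
# single connection, so advisory lock and unlock always share one conn.
config :my_app, MyApp.SandRepo,
  url: System.get_env("DATABASE_URL"),
  # a pool of exactly one connection pins every query to the same conn
  pool_size: 1
```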
mindreframer force-pushed the feat/event_broadcaster branch from f94c52c to 214ffda on September 3, 2024 22:36
…napshot) to events

Problem:
- We might need to extend the features of our event store.
- To reduce chattiness (one notification per event), we need to store the current transaction id.

Solution:
- We add the current PG transaction id + min current LSN to every document via triggers.
- Based on the example here: https://github.com/josevalim/sync/blob/main/priv/repo/migrations/20240806131210_create_publication.exs
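A sketch of a migration in the spirit of the linked example, adding txid/LSN columns filled by a trigger. Column, function, and trigger names are illustrative assumptions:

```elixir
defmodule MyApp.Repo.Migrations.AddTxMetadataToEvents do
  use Ecto.Migration

  def up do
    alter table(:events) do
      add :_txid, :bigint
      add :_lsn, :text
    end

    # Record the current transaction id and WAL insert LSN on every insert.
    execute """
    CREATE OR REPLACE FUNCTION set_tx_metadata() RETURNS trigger AS $$
    BEGIN
      NEW._txid := pg_current_xact_id()::text::bigint;
      NEW._lsn  := pg_current_wal_insert_lsn()::text;
      RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;
    """

    execute """
    CREATE TRIGGER events_tx_metadata
    BEFORE INSERT ON events
    FOR EACH ROW EXECUTE FUNCTION set_tx_metadata();
    """
  end

  def down do
    execute "DROP TRIGGER IF EXISTS events_tx_metadata ON events;"
    execute "DROP FUNCTION IF EXISTS set_tx_metadata();"

    alter table(:events) do
      remove :_txid
      remove :_lsn
    end
  end
end
```

Doing this in a trigger keeps the application code unchanged: every insert gets the metadata for free, and all rows from one transaction share the same `_txid`.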
Problem:
- We need to make sure our app notices newly inserted events.
- A local PubSub would miss inserts from other app instances (iex shells, etc.).
- We also do not want to overload our system by sending large messages (many events in a single message).

Solution:
- We add a signals table (somewhat inspired by Debezium: https://debezium.io/blog/2023/06/27/Debezium-signaling-and-notifications/).
- After each transaction on the events table we also insert a small row into the signals table with the current scope_uuid.
- The signals table has triggers that execute a `pg_notify` call with a small payload.
- In our app we start a PGNotifyListener, whose purpose is to re-broadcast those pg_notify messages to the local PubSub system.
- That way our app still deals with Phoenix PubSub; the messages just originate from the DB and work across OS process boundaries without a distributed Erlang cluster.
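A minimal sketch of such a listener, assuming `Postgrex.Notifications` and `Phoenix.PubSub`; the channel and topic names are illustrative:

```elixir
defmodule MyApp.PGNotifyListener do
  use GenServer

  @channel "signals"

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts) do
    # Dedicated notification connection, separate from the Ecto pool.
    {:ok, notif} = Postgrex.Notifications.start_link(opts)
    {:ok, ref} = Postgrex.Notifications.listen(notif, @channel)
    {:ok, %{notif: notif, ref: ref}}
  end

  @impl true
  def handle_info({:notification, _pid, _ref, @channel, payload}, state) do
    # Re-broadcast the small DB-originated message on the local PubSub.
    Phoenix.PubSub.broadcast(MyApp.PubSub, "signals", {:signal, payload})
    {:noreply, state}
  end
end
```

Subscribers in any node of the app then handle `{:signal, payload}` exactly as they would any other PubSub message.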
…out work duplication

Problem:
- All the cast runner processes need to fetch the same data from the DB at the same time.
- This could easily overload the DB due to the "thundering herd" effect.

Solution:
- Implement a caching layer for the event store.
- The caching layer holds all in-flight requests except the first one (which generates the value) and hands them the cached value once it has been generated.
- The current module is a draft of the concept, with fake work.
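A sketch of this request-coalescing idea ("single flight"): the first caller for a key computes the value, later callers for the same key queue up and receive the result once it exists. Module and function names are assumptions:

```elixir
defmodule MyApp.CoalescingCache do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__)

  # Blocks until the value for `key` is available.
  def fetch(key, fun), do: GenServer.call(__MODULE__, {:fetch, key, fun}, :infinity)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:fetch, key, fun}, from, state) do
    case state do
      %{^key => {:value, value}} ->
        {:reply, value, state}

      %{^key => {:pending, waiters}} ->
        # A computation is already in flight: just enqueue this caller.
        {:noreply, Map.put(state, key, {:pending, [from | waiters]})}

      _ ->
        # First caller: run the work in a task and remember who is waiting.
        me = self()
        Task.start(fn -> send(me, {:done, key, fun.()}) end)
        {:noreply, Map.put(state, key, {:pending, [from]})}
    end
  end

  @impl true
  def handle_info({:done, key, value}, state) do
    {:pending, waiters} = Map.fetch!(state, key)
    Enum.each(waiters, &GenServer.reply(&1, value))
    {:noreply, Map.put(state, key, {:value, value})}
  end
end
```

With N cast runners asking for the same key, the DB sees exactly one query instead of N.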
…guments) tuple

Problem:
- We need a flexible way to tell the cache how to fill cache values.
- Hardcoding the logic feels inflexible.

Solution:
- By using an MFA (module / function / arguments) tuple, this cache becomes generic.
- This can be used for any kind of caching and is extremely versatile.
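The MFA idea in a nutshell: the caller hands the cache a plain `{module, function, args}` tuple, and the cache invokes it with `apply/3` only on a miss. The cache API shown here is an assumption:

```elixir
defmodule MFAExample do
  # On a miss, the cache would call
  # apply(EventStore, :fetch_events, [scope, min_id]).
  def warm(scope, min_id) do
    MyApp.Cache.fetch({:events, scope}, {EventStore, :fetch_events, [scope, min_id]})
  end
end

# apply/3 turns the stored tuple back into a call:
{m, f, a} = {Enum, :sum, [[1, 2, 3]]}
apply(m, f, a)
#=> 6
```

Since an MFA tuple is just data, it can be stored, passed between processes, and logged, which a closure cannot do as cleanly.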
- now it became a generic caching layer, without other dependencies, useful for ANYTHING. ;)
Problem:
- We would like to prevent our cache from growing indefinitely.
- For this we need to remember when a key was last used.

Solution:
- Store a timestamp for each key.
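A small sketch of timestamp-based eviction, assuming entries are stored as `{value, last_used_at}`; the max-age cutoff and module name are illustrative choices:

```elixir
defmodule MyApp.Cache.Eviction do
  @max_age_ms :timer.minutes(10)

  # Refresh a key's last-used timestamp on every read.
  def touch(entries, key) do
    Map.update!(entries, key, fn {value, _ts} ->
      {value, System.monotonic_time(:millisecond)}
    end)
  end

  # Drop every entry that has not been used within @max_age_ms.
  def sweep(entries) do
    now = System.monotonic_time(:millisecond)

    entries
    |> Enum.reject(fn {_key, {_value, ts}} -> now - ts > @max_age_ms end)
    |> Map.new()
  end
end
```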
mindreframer commented on Sep 6, 2024
Problem:
- After getting the "poke" from the signals table, we know the transaction id and scope / stream uuids, but we still have to issue an extra query to figure out how many new events we need to fetch.

Solution:
- The count of new events and the max_id can be easily derived when writing the signal, so we add new columns and send those values in the append_to_stream function.
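A sketch of the idea: when appending, write the batch size and the highest event id into the signal row itself, so listeners need no follow-up query. Schema and function names are assumptions:

```elixir
defmodule MyApp.EventStore.Signals do
  alias MyApp.Repo

  # Called at the end of append_to_stream, inside the same transaction
  # as the event inserts.
  def write_signal(scope_uuid, inserted_events) do
    max_id = inserted_events |> Enum.map(& &1.id) |> Enum.max()

    Repo.insert_all("signals", [
      %{
        scope_uuid: scope_uuid,
        events_count: length(inserted_events),
        max_event_id: max_id,
        inserted_at: DateTime.utc_now()
      }
    ])
  end
end
```

A listener receiving `{events_count, max_event_id}` can then decide immediately whether it is up to date (its last seen id equals `max_event_id`) or how many events to pull.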
There are already quite a few changes in this PR, and there is enough noise from renames and similar. The foundation is good; the next changes will happen in a different PR.