FEATURE: Subscription Engine #5321

Draft
wants to merge 85 commits into
base: 9.0

Conversation

bwaidelich
Member

@bwaidelich bwaidelich commented Oct 24, 2024

Related: #4746

Fixes:

  • Attempting to write to the content repository from two threads would previously likely cause an exception. This is most likely to happen when two importers run concurrently. A test (see ParallelWritingInWorkspaces) was added which fails in 9.0 with: > Failed to acquire checkpoint lock for subscriber "Neos\ContentGraph\DoctrineDbalAdapter\DoctrineDbalContentGraphProjection" because it is acquired already

TODO

  • SubscriptionEngine::reset() (i.e. projection replay)
  • Tests
    • spying tests, for example to verify that catchup is applied correctly
    • parallel tests to ensure subscription locking works
  • Fine tuning, inline docs
  • Credit patchlevel (are inline doc comments for all classes underneath Neos\ContentRepository\Core\Subscription enough?)
  • Optional: Allow to target specific groups/subscriptions via CLI commands (the SubscriptionEngine already supports this)

@github-actions github-actions bot added the 9.0 label Oct 24, 2024
@bwaidelich bwaidelich changed the title WIP CatchUp Subscription Engine Oct 24, 2024
@bwaidelich bwaidelich changed the title CatchUp Subscription Engine WIP: CatchUp Subscription Engine Oct 24, 2024
@mhsdesign
Member

I had a quick look and I'd love to help with the wiring and sticking it all together :) We should probably first get a simple draft running where the catchups just get their projection state :) That alone will already require some work.

mhsdesign added a commit to mhsdesign/neos-development-collection that referenced this pull request Oct 27, 2024
…n they are registered for

A catchup doesn't have access to the full content repository, as it would allow full recursion via handle and accessing other projections
state is not safe as the other projection might not be behind - the order is undefined.

This will make it possible to catchup projections from outside of the cr instance as proposed here: neos#5321
mhsdesign added a commit to mhsdesign/neos-development-collection that referenced this pull request Nov 2, 2024
…n they are registered for

A catchup doesn't have access to the full content repository, as it would allow full recursion via handle and accessing other projections
state is not safe as the other projection might not be behind - the order is undefined.

This will make it possible to catchup projections from outside of the cr instance as proposed here: neos#5321
@mhsdesign
Member

Okay, I thought further about our little content graph projection vs projection states vs event handlers dilemma, and I think the solution is not necessarily exposing just the $projection on ProjectionEventHandler (as that would also undo the explicit graph projection wiring from #5272), but instead passing a "bigger" object than "just" the Subscribers into the content repository factory.

In my eyes this object, being built by the content repository registry in some way, must reflect

  • what is guaranteed to be the ContentGraphProjectionInterface and its state ContentGraphReadModelInterface, not part of some collection but distinct
  • what are the additional projection states
  • what are all the subscribers (with their catchup-hooks), which are passed directly to the subscription engine without doing anything further with them (e.g. no public accessors on the ProjectionEventHandler for anything)

that could look like:

final readonly class ContentRepositoryGraphProjectionAndSubscribers
{
    public function __construct(
        public ContentGraphProjectionInterface $contentGraphProjection,
        public Subscribers $subscribers, // must contain a subscriber for the $contentGraphProjection
        public ProjectionStates $additionalProjectionStates, // must not contain the $contentGraphProjection state
    ) {
    }
}

or maybe a little more explicit, so the factories don't have to deal with all the logic and we have control over the subscription ids:

final readonly class ProjectionsAndCatchupHooksBetterVersionZwo
{
    public function __construct(
        public ContentGraphProjectionInterface $contentGraphProjection,
        private Projections $additionalProjections,
        private Subscribers $additionalSubscriber,
        private array $catchUpHooksByProjectionClass
    ) {
    }

    public function getSubscribers(): Subscribers
    {
        $subscribers = iterator_to_array($this->additionalSubscriber);

        $subscribers[] = new Subscriber(
            SubscriptionId::fromString('contentGraphProjection'),
            SubscriptionGroup::fromString('default'),
            RunMode::FROM_BEGINNING,
            new ProjectionEventHandler(
                $this->contentGraphProjection,
                $this->getCatchUpHooksForProjectionClass(ContentGraphProjectionInterface::class),
            ),
        );
        
        foreach ($this->additionalProjections as $projection) {
            $subscribers[] = new Subscriber(
                SubscriptionId::fromString(substr(strrchr($projection::class, '\\'), 1)),
                SubscriptionGroup::fromString('default'),
                RunMode::FROM_BEGINNING,
                new ProjectionEventHandler(
                    $projection,
                    $this->getCatchUpHooksForProjectionClass($projection::class),
                ),
            );
        }
        
        return Subscribers::fromArray($subscribers);
    }
    
    public function getAdditionalProjectionStates(): ProjectionStates
    {
        return ProjectionStates::fromArray(array_map(
            fn ($projection) => $projection->getState(),
            iterator_to_array($this->additionalProjections)
        ));
    }

    private function getCatchUpHooksForProjectionClass(string $projectionClass): ?CatchUpHookInterface
    {
        return $this->catchUpHooksByProjectionClass[$projectionClass] ?? null;
    }
}
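
One detail of the draft above that may be worth a look: the subscription id of each additional projection is derived from the short class name. A minimal sketch of what that expression yields, using hypothetical class names (passed as plain strings) and a hypothetical helper `shortClassName()`:

```php
<?php
declare(strict_types=1);

// Hypothetical helper wrapping the expression from the draft: everything after the last backslash.
function shortClassName(string $className): string
{
    $tail = strrchr($className, '\\');
    // strrchr() returns false when there is no backslash (class in the global namespace).
    return $tail === false ? $className : substr($tail, 1);
}

// Hypothetical projection class names from two different packages.
$a = shortClassName('Vendor\\Package\\Projection\\DocumentUriPathProjection');
$b = shortClassName('Other\\Package\\Projection\\DocumentUriPathProjection');

echo $a, PHP_EOL;
// Both projections would end up with the same subscription id.
var_dump($a === $b);
```

If that is a concern, the full class name or an explicitly configured id would avoid such collisions; which one fits better is a design decision for the PR, not something this sketch settles.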

but for things that will belong to the future ProjectionService, like replayProjection, replayAllProjections and resetAllProjections, we might still need to expose all projections here, unless the subscription engine learns to do that itself: $this->subscriptionEngine->reset()

@bwaidelich
Member Author

@mhsdesign thanks for your input!
Ands in a class name always make me suspicious..

That's my current draft of the ContentRepositoryFactory constructor:

public function __construct(
    private readonly ContentRepositoryId $contentRepositoryId,
    EventStoreInterface $eventStore,
    NodeTypeManager $nodeTypeManager,
    ContentDimensionSourceInterface $contentDimensionSource,
    Serializer $propertySerializer,
    private readonly UserIdProviderInterface $userIdProvider,
    private readonly ClockInterface $clock,
    SubscriptionStoreInterface $subscriptionStore,
    ContentGraphProjectionFactoryInterface $contentGraphProjectionFactory,
    CatchUpHookFactoryInterface $contentGraphCatchUpHookFactory,
    private readonly ContentRepositorySubscribersFactoryInterface $additionalSubscribersFactory,
) {
// ...
}

@mhsdesign
Member

As discussed, that looks good ❤️ My idea had a flaw because I assumed the projection instance could be built by the registry, which it CANNOT because we need factory dependencies... And the thing with iterating over the event handlers to fetch their state via getState or something is weird, but okay, as that projectionState is now a little tunnel through space as well :) So it's definitely okay to do that little quirk.

neos-bot pushed a commit to neos/contentrepository-core that referenced this pull request Nov 4, 2024
…n they are registered for

A catchup doesn't have access to the full content repository, as it would allow full recursion via handle and accessing other projections
state is not safe as the other projection might not be behind - the order is undefined.

This will make it possible to catchup projections from outside of the cr instance as proposed here: neos/neos-development-collection#5321
neos-bot pushed a commit to neos/contentrepositoryregistry that referenced this pull request Nov 4, 2024
…n they are registered for

A catchup doesn't have access to the full content repository, as it would allow full recursion via handle and accessing other projections
state is not safe as the other projection might not be behind - the order is undefined.

This will make it possible to catchup projections from outside of the cr instance as proposed here: neos/neos-development-collection#5321
neos-bot pushed a commit to neos/neos that referenced this pull request Nov 4, 2024
…n they are registered for

A catchup doesn't have access to the full content repository, as it would allow full recursion via handle and accessing other projections
state is not safe as the other projection might not be behind - the order is undefined.

This will make it possible to catchup projections from outside of the cr instance as proposed here: neos/neos-development-collection#5321
# Conflicts:
#	Neos.ContentRepository.Core/Classes/ContentRepository.php
#	Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php
#	Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php
#	Neos.ContentRepositoryRegistry/Classes/Service/ProjectionService.php
#	Neos.ContentRepositoryRegistry/Classes/Service/ProjectionServiceFactory.php
#	phpstan-baseline.neon
No retry is simpler at first and its unlikely that a projection will fix itself.
#5321 (comment)

> Anyways, I think with the removed retry strategy we should just get rid of the automatic retry altogether right now.
  It's quite unlikely that a retry suddenly works without other changes. So I'd be fully OK if it was only possible to manually retry failed subscriptions for 9.0
> there's a mismatch between variable name "additionalProjectionsFactory" and type "ContentRepositorySubsciberFactory"
> the distinction between projection and subscription makes sense even if the only supported type of subscription target projections (for now)

#5375 (review)
which led to it being invoked on active

>  Subscriber "Vendor.Package:FakeProjection" could not invoke onBeforeCatchUp: Subscriber with the subscription id "Vendor.Package:FakeProjection" not found.
…cription-pr

TASK: Radical cleanup for subscription pr to simplify and get into 9.0
Also allows the `Vendor.Package:FakeCatchupHook` to be picked up for testing
Readded assertions as retry was removed for now ... and should probably NOT do anything if setup is called!
…rationIsRemoved

> InvalidArgumentException: Subscriber with the subscription id "Vendor.Package:FakeProjection" not found.

The subscriber is detached, so the state is not calculate-able at this point!
…terCatchUp failes

as we consider it a critical developer error

For `onBeforeCatchUp` we could probably wrap it in a savepoint, roll that back and also skip the projection, but errors in `onAfterCatchUp` would then analogously need to roll back only the one projection where the hook was registered. This is not possible and too complex.

see \Neos\ContentRepository\BehavioralTests\Tests\Functional\Subscription\CatchUpHookErrorTest::error_onBeforeCatchUp_abortsCatchup
…ailure

https://neos-project.slack.com/archives/C04PYL8H3/p1732318989845619

We don't want to roll back the main transaction, as other projections still need to be processed, the previously working events need to be applied, and we want to set the ERROR state of the projection
To reduce additional sql query and lock, and do it in the main transaction
We do not expect any changes during runtime. Setup and status should handle this case.
@mhsdesign
Member

mhsdesign commented Nov 23, 2024

a little update from my part:

I spent a lot of time first getting a little bit of test coverage before I trusted myself to do any core changes :D
So right now I think I have everything I want, including a spying fake projection to check which events were applied and to assert that events are only applied once, as well as the capability to check how catchups work and especially how everything behaves in case of a projection or catchup error.

  • Test for CatchUpHookError
  • Test for CatchUpHook
  • Test for ProjectionError
  • Test for SubscriptionActiveStatus
  • Test for SubscriptionBootingStatus
  • Test for SubscriptionDetachedStatus
  • Test for SubscriptionGetStatus
  • Test for SubscriptionNewStatus
  • Test for SubscriptionReset
  • Test for SubscriptionSetup

bugs i discovered and fixed for the tests:


Deliberate behaviour changes / simplifications:


introduction of savepoints

The previous checkpoint implementation guaranteed exactly-once delivery (implemented by using a transaction for each event for each projection, in a FOR UPDATE lock).

We still need this rollback behaviour: without it, if a catchup fails on the content graph projection, the projection would be in error state but would already have the event applied, and it would receive the same event again after the catchup is repaired. That will not work for the projection.
The rollback also makes it much simpler to reason about the catchup hooks and whether they have to deal with at-least-once delivery or not.
Also, in the case where a projection is really broken, it will not be trivial to fix if its state is possibly already partially messed up.

Currently the subscription engine uses a big transaction for all projections and to update the subscriptions at the end. We cannot roll that back. So we want the following things:

  • a for update lock in ONE transaction (allows to terminate the cli process via CTRL+C without any locking artefacts)
  • fetching all subscribers at once instead of one by one
  • setting an error state if the projection failed and rolling back the failed event, while keeping the successfully applied events on that projection

i implemented this with savepoints which would look like:

BEGIN
SELECT subscriptions FOR UPDATE ...

SAVEPOINT CATCHUP
INSERT INTO projection_A (event1)
RELEASE SAVEPOINT CATCHUP

UPDATE subscriptions ...
COMMIT

the exactly once delivery is well tested with my above tests.

and looks like this on error:

BEGIN
SELECT subscriptions FOR UPDATE ...

SAVEPOINT CATCHUP
INSERT INTO projection_A (event1)
ROLLBACK TO SAVEPOINT CATCHUP

UPDATE subscriptions ... -- update the error state
COMMIT
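
The two flows above can be tried out end to end with a small script. This is only a sketch under assumptions: it uses PDO with an in-memory SQLite database (so the pdo_sqlite extension must be available), it leaves out the `SELECT ... FOR UPDATE` step because SQLite has no such clause, and the table layout as well as the failing event named "broken" are made up for illustration.

```php
<?php
declare(strict_types=1);

// Assumption: pdo_sqlite is installed. Table names and columns are hypothetical.
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec('CREATE TABLE projection_a (event TEXT NOT NULL)');
$pdo->exec('CREATE TABLE subscriptions (id TEXT PRIMARY KEY, position INTEGER NOT NULL, status TEXT NOT NULL)');
$pdo->exec("INSERT INTO subscriptions (id, position, status) VALUES ('projection_a', 0, 'ACTIVE')");

// Hypothetical projection logic: writes the event first, then fails for the event named "broken".
$applyEvent = static function (PDO $pdo, string $event): void {
    $pdo->prepare('INSERT INTO projection_a (event) VALUES (?)')->execute([$event]);
    if ($event === 'broken') {
        throw new RuntimeException('projection failed after writing');
    }
};

$position = 0;
$status = 'ACTIVE';

$pdo->beginTransaction();
foreach (['event1', 'event2', 'broken', 'event4'] as $event) {
    $pdo->exec('SAVEPOINT CATCHUP');
    try {
        $applyEvent($pdo, $event);
    } catch (Throwable $e) {
        // Undo only the write of the failed event; event1 and event2 stay applied.
        $pdo->exec('ROLLBACK TO SAVEPOINT CATCHUP');
        $status = 'ERROR';
        break;
    }
    $pdo->exec('RELEASE SAVEPOINT CATCHUP');
    $position++;
}
// The subscription state is persisted once, inside the same transaction.
$pdo->prepare("UPDATE subscriptions SET position = ?, status = ? WHERE id = 'projection_a'")
    ->execute([$position, $status]);
$pdo->commit();

$appliedEvents = $pdo->query('SELECT event FROM projection_a ORDER BY rowid')->fetchAll(PDO::FETCH_COLUMN);
$subscription = $pdo->query('SELECT position, status FROM subscriptions')->fetch(PDO::FETCH_ASSOC);
```

After the commit, the projection table contains only the two events that were applied successfully, while the subscription row carries position 2 and the ERROR status.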

Regarding onBeforeCatchUp and onAfterCatchUp, we cannot wrap these into a savepoint, but will just throw hard and consider this an unforgivable developer error. The whole transaction will be rolled back. See: TASK: Throw CatchUpFailed exception in case onBeforeCatchUp or onAfterCatchUp failes

followup: inconsistency of projection transaction vs subscription transaction.

In the commit "TASK: Use save points to rollback projections during transaction on failure" I added methods on the subscription store to set the savepoints. This is not entirely correct, as there is no rule that a projection and its store have to share the same dbal instance, or even the same connection or database.

In the current state of 9.0 this is solved by using the pattern of a checkpoint table per projection, which would live in the same database and transaction.
One central store cannot offer that flexibility. In case we want to support each projection living in its own database, we probably need a solution like #5377 to attach the subscription store to the subscriber, and on catchup iterate over all stores (deduplicated) and their subscribers. That works, but brings some new complexity for the wiring and also for the detached state.

Instead, for now it might be sensible to add a transactional function to the projection where we use savepoints to emulate nested transactions. In case no transaction is open, we can throw an exception stating that it's not yet possible to decouple the projection from the store:

public function transactional(\Closure $fn): mixed
{
    // A savepoint only exists inside a transaction, so the engine must already have opened one on this connection.
    if ($this->dbal->isTransactionActive() === false) {
        throw new \RuntimeException('Cannot set savepoint for projection, because no transaction is active. SubscriptionEngine and Projection must use the same database connection.');
    }
    $this->dbal->createSavepoint('PROJECTION');
    try {
        $result = $fn();
    } catch (\Throwable $e) {
        // Undo only this projection's writes; the outer transaction stays open.
        $this->dbal->rollbackSavepoint('PROJECTION');
        throw $e;
    }
    $this->dbal->releaseSavepoint('PROJECTION');
    return $result;
}

mutable SubscriptionManager

Regarding the mutable vs immutable subscription updating, I have a local WIP which simplifies this.

I'm happy if you can simplify this ofc but it's important that the subscription states only get persisted once just before the transaction is committed.

As far as I can see, that is only important to reduce SQL statements; for the result it should not matter. In my case, marking the projection as failed happens directly, while the other active and position updates happen at the end.

@mhsdesign
Member

@kitsunet and me agreed on the following:

In order to get the basis of the change in, we should keep the locking behaviour as it is now (meaning one transaction for each event for each projection). This will allow us to discuss behaviour changes in a separate pull request, and also to find out whether a simplification to reduce transactions really brings any performance gain. So this PR would only introduce the central registration (which allows fetching the lowest position in one query) and leave us with the performance gain of only having to load the events once, and not n times, on replay.
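
To illustrate the "load the events once and not n times" point, here is a minimal sketch in plain PHP. The subscription ids, positions and the event stream are hypothetical stand-ins, not the engine's real data structures; the idea is only that the lowest stored position decides where the single pass over the stream starts, and each subscription skips what it has already seen.

```php
<?php
declare(strict_types=1);

// Hypothetical state: subscription id => last applied sequence number.
$positions = ['contentGraph' => 3, 'documentUriPath' => 1, 'changes' => 5];

// Hypothetical event stream: sequence number => event payload.
$eventStream = [1 => 'A', 2 => 'B', 3 => 'C', 4 => 'D', 5 => 'E', 6 => 'F'];

// With a central store this would be one query; here it is a simple min().
$lowest = min($positions);

$loaded = 0;
$applied = array_fill_keys(array_keys($positions), []);

foreach ($eventStream as $sequenceNumber => $event) {
    if ($sequenceNumber <= $lowest) {
        continue; // every subscription has already seen this event
    }
    $loaded++; // each event is loaded once, regardless of the number of subscriptions
    foreach ($positions as $id => $position) {
        if ($sequenceNumber > $position) {
            $applied[$id][] = $event;
            $positions[$id] = $sequenceNumber;
        }
    }
}
```

With per-projection checkpoints, each of the three projections would read its own slice of the stream (3 + 5 + 1 = 9 loads in this example), whereas the single pass loads 5 events.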

That means each projection will receive a function which either uses savepoints to emulate nested transactions or, if it's on another connection or database, uses its own transaction:

public function transactional(\Closure $fn): mixed
{
    if ($this->dbal->isTransactionActive() === false) {
        return $this->dbal->transactional($fn);
    }
    $this->dbal->createSavepoint('PROJECTION');
    try {
        $result = $fn();
    } catch (\Throwable $e) {
        $this->dbal->rollbackSavepoint('PROJECTION');
        throw $e;
    }
    $this->dbal->releaseSavepoint('PROJECTION');
    return $result;
}
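
To see which statements each branch of the function above would issue, here is a rough sketch with a hypothetical `RecordingConnection` that only logs calls. It mimics the method names used above but is not Doctrine DBAL itself, and `FakeProjection` is likewise made up for illustration.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in for the dbal connection; it only records which calls were made. */
final class RecordingConnection
{
    /** @var list<string> */
    public array $log = [];
    public bool $transactionActive = false;

    public function isTransactionActive(): bool
    {
        return $this->transactionActive;
    }

    public function transactional(\Closure $fn): mixed
    {
        $this->log[] = 'BEGIN';
        try {
            $result = $fn();
        } catch (\Throwable $e) {
            $this->log[] = 'ROLLBACK';
            throw $e;
        }
        $this->log[] = 'COMMIT';
        return $result;
    }

    public function createSavepoint(string $name): void { $this->log[] = "SAVEPOINT $name"; }
    public function rollbackSavepoint(string $name): void { $this->log[] = "ROLLBACK TO SAVEPOINT $name"; }
    public function releaseSavepoint(string $name): void { $this->log[] = "RELEASE SAVEPOINT $name"; }
}

/** Hypothetical projection carrying the transactional() function from above. */
final class FakeProjection
{
    public function __construct(private RecordingConnection $dbal) {}

    public function transactional(\Closure $fn): mixed
    {
        if ($this->dbal->isTransactionActive() === false) {
            return $this->dbal->transactional($fn);
        }
        $this->dbal->createSavepoint('PROJECTION');
        try {
            $result = $fn();
        } catch (\Throwable $e) {
            $this->dbal->rollbackSavepoint('PROJECTION');
            throw $e;
        }
        $this->dbal->releaseSavepoint('PROJECTION');
        return $result;
    }
}

// Case 1: projection on its own connection, no outer transaction: a real transaction is used.
$standalone = new RecordingConnection();
$result = (new FakeProjection($standalone))->transactional(fn () => 'applied');

// Case 2: projection shares the engine's connection and the event handler fails.
$shared = new RecordingConnection();
$shared->transactionActive = true;
try {
    (new FakeProjection($shared))->transactional(fn () => throw new \RuntimeException('projection failed'));
} catch (\RuntimeException) {
    // the engine would mark the subscription as ERROR here and continue
}
```

In the first case the log reads BEGIN, COMMIT; in the second it reads SAVEPOINT PROJECTION, ROLLBACK TO SAVEPOINT PROJECTION, and the outer transaction is left untouched.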

Our reason for still relying so hard on exactly-once delivery is that our projections are simply not built to cope with events being applied twice. Until we fully agree that at-least-once delivery is a good idea, find a solution to do the same for catchup hooks, and have tests that prove that at-least-once delivery is okay in most cases, it would not be a good idea to introduce a change here.
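
A tiny sketch of why a redelivered event is a problem for a projection that is not idempotent. `NodeCountProjection` and the event shape are hypothetical and not taken from the Neos code base; they only stand in for any handler that increments or inserts without checking whether the event was seen before.

```php
<?php
declare(strict_types=1);

/** Hypothetical, deliberately non-idempotent projection: counts created nodes per workspace. */
final class NodeCountProjection
{
    /** @var array<string, int> */
    public array $countByWorkspace = [];

    /** @param array{type: string, workspace: string} $event */
    public function apply(array $event): void
    {
        if ($event['type'] === 'NodeWasCreated') {
            $workspace = $event['workspace'];
            $this->countByWorkspace[$workspace] = ($this->countByWorkspace[$workspace] ?? 0) + 1;
        }
    }
}

$event = ['type' => 'NodeWasCreated', 'workspace' => 'live'];

// Exactly-once delivery: the event is applied a single time.
$exactlyOnce = new NodeCountProjection();
$exactlyOnce->apply($event);

// At-least-once delivery: the same event arrives again, e.g. after a crash
// between applying the event and storing the new position.
$atLeastOnce = new NodeCountProjection();
$atLeastOnce->apply($event);
$atLeastOnce->apply($event);
```

The second projection ends up counting two nodes for a single creation, which is the kind of drift the exactly-once guarantee currently prevents.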

future followups

  • experiment with a write table lock (which is on connection basis, meaning it should be unlocked if the process crashes) that would allow us to: Lock, fetch lowest position, open transaction (without for update) and commit.
  • rediscuss and test if only using one transaction is dramatically faster which could lead us to decide to not support projections in other databases
  • rethink FOR UPDATE NOWAIT locking, as two importers would shoot each other down: will be part of WIP: Remove NOWAIT? #5376
