diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php index 93dd501e90..a4b66dd767 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php @@ -58,7 +58,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null): Result $this->subscriptionStore->setup(); $this->discoverNewSubscriptions(); - $subscriptions = $this->subscriptionStore->findByCriteriaForUpdate(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::fromArray([ + $subscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::fromArray([ SubscriptionStatus::NEW, SubscriptionStatus::BOOTING, SubscriptionStatus::ACTIVE @@ -80,45 +80,45 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null): Result public function boot(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult { $criteria ??= SubscriptionEngineCriteria::noConstraints(); - return $this->processExclusively(fn () => $this->subscriptionStore->transactional( + return $this->processExclusively( function () use ($criteria, $progressCallback) { $this->logger?->info('Subscription Engine: Start catching up subscriptions in state "BOOTING".'); - $subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate( + $subscriptionsToCatchup = $this->subscriptionStore->findByCriteria( SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatus::BOOTING) ); return $this->catchUpSubscriptions($subscriptionsToCatchup, $progressCallback); - }) + } ); } public function catchUpActive(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult { $criteria ??= SubscriptionEngineCriteria::noConstraints(); - return $this->processExclusively(fn () => $this->subscriptionStore->transactional( + return $this->processExclusively( function () use ($criteria, $progressCallback) { $this->logger?->info('Subscription Engine: Start catching up subscriptions in state "ACTIVE".'); - $subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate( + $subscriptionsToCatchup = $this->subscriptionStore->findByCriteria( SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatus::ACTIVE) ); return $this->catchUpSubscriptions($subscriptionsToCatchup, $progressCallback); - }) + } ); } public function reactivate(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult { $criteria ??= SubscriptionEngineCriteria::noConstraints(); - return $this->processExclusively(fn () => $this->subscriptionStore->transactional( + return $this->processExclusively( function () use ($criteria, $progressCallback) { $this->logger?->info('Subscription Engine: Start catching up subscriptions in state "ACTIVE".'); - $subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate( + $subscriptionsToCatchup = $this->subscriptionStore->findByCriteria( SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::fromArray([ SubscriptionStatus::ERROR, SubscriptionStatus::DETACHED, ])) ); return $this->catchUpSubscriptions($subscriptionsToCatchup, $progressCallback); - }) + } ); } @@ -127,7 +127,7 @@ public function reset(SubscriptionEngineCriteria|null $criteria = null): Result $criteria ??= SubscriptionEngineCriteria::noConstraints(); $this->logger?->info('Subscription Engine: Start to reset.'); - $subscriptions = $this->subscriptionStore->findByCriteriaForUpdate(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::any())); + $subscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::any())); if ($subscriptions->isEmpty()) { $this->logger?->info('Subscription Engine: No subscriptions to reset.'); return Result::success(); @@ -146,7 +146,7 @@ public function subscriptionStatus(SubscriptionEngineCriteria|null $criteria = n { $statuses = []; try { - $subscriptions = $this->subscriptionStore->findByCriteriaForUpdate(SubscriptionCriteria::create(ids: $criteria?->ids)); + $subscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::create(ids: $criteria?->ids)); } catch (TableNotFoundException) { // the schema is not setup - thus there are no subscribers return SubscriptionStatusCollection::createEmpty(); @@ -210,7 +210,7 @@ private function handleEvent(EventEnvelope $eventEnvelope, EventInterface $domai */ private function discoverNewSubscriptions(): void { - $subscriptions = $this->subscriptionStore->findByCriteriaForUpdate(SubscriptionCriteria::noConstraints()); + $subscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::noConstraints()); foreach ($this->subscribers as $subscriber) { if ($subscriptions->contain($subscriber->id)) { continue; @@ -296,6 +296,16 @@ private function resetSubscription(Subscription $subscription): ?Error private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Closure $progressClosure = null): ProcessedResult { + + // problems: + // one the onBeforeCatchUp / onAfterCatchUp hooks will be more difficult and less obvious to invoke + // we cannot detach old subscriptions as we loop over the subscribers + // we have to fetch the subscriber and then skip its posisiont if its ahead + // we cannot abort easily if there ar no matching subscribers? + // if abortsCatchupAndRollBack fails we cannot abort transaction!!! -> but this is also a problem if we 'd start batching again.... + // for batches before and after???? + + foreach ($subscriptionsToCatchup as $subscription) { if (!$this->subscribers->contain($subscription->id)) { // mark detached subscriptions as we cannot handle them and exclude them from catchup @@ -349,39 +359,67 @@ private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Cl $this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value)); continue; } - $error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription->id); - if ($error !== null) { - // ERROR Case: - // 1.) for the leftover events we are not including this failed subscription for catchup - $subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id); - // 2.) update the subscription error state on either its unchanged or new position (if some events worked) + $this->subscriptionStore->transactional(function () use ($eventEnvelope, $domainEvent, $subscription, $numberOfProcessedEvents, &$subscriptionsToCatchup, &$errors, &$highestSequenceNumberForSubscriber) { + $error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription->id); + if ($error !== null) { + // ERROR Case: + // 1.) for the leftover events we are not including this failed subscription for catchup + $subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id); + // 2.) update the subscription error state on either its unchanged or new position (if some events worked) + $this->subscriptionStore->update( + $subscription->id, + status: SubscriptionStatus::ERROR, + // todo rather use previous position: + position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position, + subscriptionError: SubscriptionError::fromPreviousStatusAndException( + $subscription->status, + $error->throwable + ), + ); + // 3.) invoke onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent we want to "shutdown" this catchup iteration event though we know it failed + // todo put the ERROR $subscriptionStatus into the after hook, so it can properly be reacted upon + try { + $this->subscribers->get($subscription->id)->onAfterCatchUp(); + } catch (\Throwable $e) { + // analog to onBeforeCatchUp, we tolerate no exceptions here and consider it a critical developer error. + $message = sprintf('Subscriber "%s" had an error and also failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage()); + $this->logger?->critical($message); + throw new CatchUpFailed($message, 1732733740, $e); + } + $errors[] = $error; + return; + } + + $highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber; + + // after catchup mark all subscriptions as active, so they are triggered automatically now. + // The position will be set to the one the subscriber handled last, or if no events were in the stream, and we booted we keep the persisted position $this->subscriptionStore->update( $subscription->id, - status: SubscriptionStatus::ERROR, - position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position, - subscriptionError: SubscriptionError::fromPreviousStatusAndException( - $subscription->status, - $error->throwable - ), + status: SubscriptionStatus::ACTIVE, + position: $eventEnvelope->sequenceNumber, + subscriptionError: null, ); - // 3.) invoke onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent we want to "shutdown" this catchup iteration event though we know it failed - // todo put the ERROR $subscriptionStatus into the after hook, so it can properly be reacted upon - try { - $this->subscribers->get($subscription->id)->onAfterCatchUp(); - } catch (\Throwable $e) { - // analog to onBeforeCatchUp, we tolerate no exceptions here and consider it a critical developer error. - $message = sprintf('Subscriber "%s" had an error and also failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage()); - $this->logger?->critical($message); - throw new CatchUpFailed($message, 1732733740, $e); + if ($numberOfProcessedEvents === 0 && $subscription->status !== SubscriptionStatus::ACTIVE) { + $this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active from %s.', $subscription->id->value, $subscription->status->name)); } - $errors[] = $error; - continue; - } - // HAPPY Case: - $highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber; + }); + } $numberOfProcessedEvents++; } + foreach ($subscriptionsToCatchup as $subscription) { + if ($numberOfProcessedEvents === 0 && $subscription->status !== SubscriptionStatus::ACTIVE) { + $this->subscriptionStore->update( + $subscription->id, + status: SubscriptionStatus::ACTIVE, + position: $subscription->position, + subscriptionError: null, + ); + $this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active from %s.', $subscription->id->value, $subscription->status->name)); + } + } + foreach ($subscriptionsToCatchup as $subscription) { try { $this->subscribers->get($subscription->id)->onAfterCatchUp(); @@ -391,17 +429,6 @@ private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Cl $this->logger?->critical($message); throw new CatchUpFailed($message, 1732374000, $e); } - // after catchup mark all subscriptions as active, so they are triggered automatically now. - // The position will be set to the one the subscriber handled last, or if no events were in the stream, and we booted we keep the persisted position - $this->subscriptionStore->update( - $subscription->id, - status: SubscriptionStatus::ACTIVE, - position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position, - subscriptionError: null, - ); - if ($subscription->status !== SubscriptionStatus::ACTIVE) { - $this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active after booting.', $subscription->id->value)); - } } $this->logger?->info(sprintf('Subscription Engine: Finish catch up. %d processed events %d errors.', $numberOfProcessedEvents, count($errors))); return $errors === [] ? ProcessedResult::success($numberOfProcessedEvents) : ProcessedResult::failed($numberOfProcessedEvents, Errors::fromArray($errors)); @@ -418,9 +445,11 @@ private function processExclusively(\Closure $closure): mixed throw new SubscriptionEngineAlreadyProcessingException('Subscription engine is already processing', 1732714075); } $this->processing = true; + $this->subscriptionStore->acquireLock(); try { return $closure(); } finally { + $this->subscriptionStore->releaseLock(); $this->processing = false; } } diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php b/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php index b7b0540415..4f2911303f 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php @@ -18,7 +18,7 @@ interface SubscriptionStoreInterface { public function setup(): void; - public function findByCriteriaForUpdate(SubscriptionCriteria $criteria): Subscriptions; + public function findByCriteria(SubscriptionCriteria $criteria): Subscriptions; public function add(Subscription $subscription): void; @@ -29,6 +29,10 @@ public function update( SubscriptionError|null $subscriptionError, ): void; + public function acquireLock(): void; + + public function releaseLock(): void; + /** * @template T * @param \Closure():T $closure diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php b/Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php index fe5f038fda..24347dcd74 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php +++ b/Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php @@ -64,13 +64,12 @@ public function setup(): void } } - public function findByCriteriaForUpdate(SubscriptionCriteria $criteria): Subscriptions + public function findByCriteria(SubscriptionCriteria $criteria): Subscriptions { $queryBuilder = $this->dbal->createQueryBuilder() ->select('*') ->from($this->tableName) ->orderBy('id'); - $queryBuilder->forUpdate(); if ($criteria->ids !== null) { $queryBuilder->andWhere('id IN (:ids)') ->setParameter( @@ -165,6 +164,23 @@ private static function fromDatabase(array $row): Subscription ); } + public function acquireLock(): void + { + // todo fully implement https://github.com/patchlevel/event-sourcing/blob/caaf54fcf32c0e42b1036a5c7ff77c1a37af0105/src/Store/DoctrineDbalStore.php#L456 + $result = $this->dbal->fetchOne(sprintf('SELECT GET_LOCK("%s", %d)', 'default', 0)); + if ($result !== 1) { + throw new \RuntimeException('failed to acquire lock'); + } + } + + public function releaseLock(): void + { + $result = $this->dbal->fetchOne(sprintf('SELECT RELEASE_LOCK("%s")', 'default')); + if ($result !== 1) { + throw new \RuntimeException('failed to release lock'); + } + } + public function transactional(\Closure $closure): mixed { return $this->dbal->transactional($closure);