From 385e78d267de29cfcbb7d16f0f43edfd3e849a5a Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sun, 24 Nov 2024 12:09:07 +0100 Subject: [PATCH] FEATURE: Decentral subscription store Allows to run projections in a different database WITH their store to ensure exactly once delivery and that on critical error (memory error) the transaction is not committet and that the projection in the other db is not already ahead. TODO, allow to use actually wire it together via cr registry, That one projection runs in one database WITH its subscription store. --- .../Factory/ContentRepositoryFactory.php | 9 +- .../Factory/ProjectionSubscriberFactory.php | 3 + .../Engine/SubscriptionEngine.php | 280 ++++++++++-------- .../Store/SubscriptionStoreInterface.php | 2 + .../Subscriber/ProjectionSubscriber.php | 4 +- .../Subscription/Subscriber/Subscribers.php | 18 ++ .../Classes/ContentRepositoryRegistry.php | 8 +- .../DoctrineSubscriptionStore.php | 5 + 8 files changed, 193 insertions(+), 136 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php b/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php index ea4593a6a7..b3ed5f8909 100644 --- a/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php +++ b/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php @@ -75,7 +75,7 @@ public function __construct( Serializer $propertySerializer, private readonly AuthProviderFactoryInterface $authProviderFactory, private readonly ClockInterface $clock, - SubscriptionStoreInterface $subscriptionStore, + SubscriptionStoreInterface $contentGraphSubscriptionStore, ContentGraphProjectionFactoryInterface $contentGraphProjectionFactory, private readonly CatchUpHookFactoryInterface|null $contentGraphCatchUpHookFactory, private readonly CommandHooksFactory $commandHooksFactory, @@ -106,14 +106,15 @@ public function __construct( } $this->additionalProjectionStates = ProjectionStates::fromArray($additionalProjectionStates); $this->contentGraphProjection = $contentGraphProjectionFactory->build($this->subscriberFactoryDependencies); - $subscribers[] = $this->buildContentGraphSubscriber(); - $this->subscriptionEngine = new SubscriptionEngine($this->eventStore, $subscriptionStore, Subscribers::fromArray($subscribers), $eventNormalizer, $logger); + $subscribers[] = $this->buildContentGraphSubscriber($contentGraphSubscriptionStore); + $this->subscriptionEngine = new SubscriptionEngine($this->eventStore, Subscribers::fromArray($subscribers), $eventNormalizer, $logger); } - private function buildContentGraphSubscriber(): ProjectionSubscriber + private function buildContentGraphSubscriber(SubscriptionStoreInterface $subscriptionStore): ProjectionSubscriber { return new ProjectionSubscriber( SubscriptionId::fromString('contentGraph'), + $subscriptionStore, $this->contentGraphProjection, $this->contentGraphCatchUpHookFactory?->build(CatchUpHookFactoryDependencies::create( $this->contentRepositoryId, diff --git a/Neos.ContentRepository.Core/Classes/Factory/ProjectionSubscriberFactory.php b/Neos.ContentRepository.Core/Classes/Factory/ProjectionSubscriberFactory.php index 203936237e..8350a3e309 100644 --- a/Neos.ContentRepository.Core/Classes/Factory/ProjectionSubscriberFactory.php +++ b/Neos.ContentRepository.Core/Classes/Factory/ProjectionSubscriberFactory.php @@ -19,6 +19,7 @@ use Neos\ContentRepository\Core\Projection\ProjectionFactoryInterface; use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; +use Neos\ContentRepository\Core\Subscription\Store\SubscriptionStoreInterface; use Neos\ContentRepository\Core\Subscription\Subscriber\ProjectionSubscriber; use Neos\ContentRepository\Core\Subscription\SubscriptionId; @@ -34,6 +35,7 @@ */ public function __construct( private SubscriptionId $subscriptionId, + private SubscriptionStoreInterface $subscriptionStore, private ProjectionFactoryInterface $projectionFactory, private ?CatchUpHookFactoryInterface $catchUpHookFactory, private array $projectionFactoryOptions, @@ -53,6 +55,7 @@ public function build(SubscriberFactoryDependencies $dependencies): ProjectionSu return new ProjectionSubscriber( $this->subscriptionId, + $this->subscriptionStore, $projection, $catchUpHook, ); diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php index 0ad1265502..3012b2112e 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php @@ -19,7 +19,6 @@ use Psr\Log\LoggerInterface; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; use Neos\ContentRepository\Core\Subscription\Store\SubscriptionCriteria; -use Neos\ContentRepository\Core\Subscription\Store\SubscriptionStoreInterface; use Neos\ContentRepository\Core\Subscription\Subscriber\Subscribers; use Neos\ContentRepository\Core\Subscription\Subscription; use Neos\ContentRepository\Core\Subscription\Subscriptions; @@ -30,16 +29,14 @@ final class SubscriptionEngine { private bool $processing = false; - private readonly SubscriptionManager $subscriptionManager; + private SubscriptionManager $subscriptionManager; // todo inline!! public function __construct( private readonly EventStoreInterface $eventStore, - private readonly SubscriptionStoreInterface $subscriptionStore, private readonly Subscribers $subscribers, private readonly EventNormalizer $eventNormalizer, private readonly LoggerInterface|null $logger = null, ) { - $this->subscriptionManager = new SubscriptionManager($this->subscriptionStore); } public function setup(SubscriptionEngineCriteria|null $criteria = null): Result @@ -48,27 +45,34 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null): Result $this->logger?->info('Subscription Engine: Start to setup.'); - $this->subscriptionStore->setup(); - $this->discoverNewSubscriptions(); - $subscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::fromArray([ - SubscriptionStatus::NEW, - SubscriptionStatus::BOOTING, - SubscriptionStatus::ACTIVE, - SubscriptionStatus::DETACHED, - SubscriptionStatus::ERROR, - ]))); - if ($subscriptions->isEmpty()) { - $this->logger?->info('Subscription Engine: No subscriptions found.'); // todo not happy? Because there must be at least the content graph?!! - return Result::success(); - } + $subscriberGroups = $this->subscribers->groupByStore(); $errors = []; - foreach ($subscriptions as $subscription) { - $error = $this->setupSubscription($subscription); - if ($error !== null) { - $errors[] = $error; + foreach ($subscriberGroups as [$store]) { + $store->setup(); + + $this->subscriptionManager = new SubscriptionManager($store); // todo hack + $this->discoverNewSubscriptions(); + + $subscriptions = $store->findByCriteria(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::fromArray([ + SubscriptionStatus::NEW, + SubscriptionStatus::BOOTING, + SubscriptionStatus::ACTIVE, + SubscriptionStatus::DETACHED, + SubscriptionStatus::ERROR, + ]))); + if ($subscriptions->isEmpty()) { + $this->logger?->info('Subscription Engine: No subscriptions found.'); // todo not happy? Because there must be at least the content graph?!! + return Result::success(); } + foreach ($subscriptions as $subscription) { + $error = $this->setupSubscription($subscription); + if ($error !== null) { + $errors[] = $error; + } + } + $this->subscriptionManager->flush(); } - $this->subscriptionManager->flush(); + return $errors === [] ? Result::success() : Result::failed(Errors::fromArray($errors)); } @@ -87,41 +91,50 @@ public function reset(SubscriptionEngineCriteria|null $criteria = null): Result $criteria ??= SubscriptionEngineCriteria::noConstraints(); $this->logger?->info('Subscription Engine: Start to reset.'); - $subscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::any())); - if ($subscriptions->isEmpty()) { - $this->logger?->info('Subscription Engine: No subscriptions to reset.'); - return Result::success(); - } + + $subscriberGroups = $this->subscribers->groupByStore(); $errors = []; - foreach ($subscriptions as $subscription) { - $error = $this->resetSubscription($subscription); - if ($error !== null) { - $errors[] = $error; + foreach ($subscriberGroups as [$store]) { + $subscriptions = $store->findByCriteria(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::any())); + $this->subscriptionManager = new SubscriptionManager($store); + if ($subscriptions->isEmpty()) { + $this->logger?->info('Subscription Engine: No subscriptions to reset.'); + return Result::success(); + } + foreach ($subscriptions as $subscription) { + $error = $this->resetSubscription($subscription); + if ($error !== null) { + $errors[] = $error; + } } + $this->subscriptionManager->flush(); } - $this->subscriptionManager->flush(); return $errors === [] ? Result::success() : Result::failed(Errors::fromArray($errors)); } public function subscriptionStatuses(SubscriptionCriteria|null $criteria = null): SubscriptionAndProjectionStatuses { + $subscriberGroups = $this->subscribers->groupByStore(); $statuses = []; - try { - $subscriptions = $this->subscriptionStore->findByCriteria($criteria ?? SubscriptionCriteria::noConstraints()); - } catch (TableNotFoundException) { - // the schema is not setup - thus there are no subscribers - return SubscriptionAndProjectionStatuses::createEmpty(); - } - foreach ($subscriptions as $subscription) { - $subscriber = $this->subscribers->contain($subscription->id) ? $this->subscribers->get($subscription->id) : null; - $statuses[] = SubscriptionAndProjectionStatus::create( - subscriptionId: $subscription->id, - subscriptionStatus: $subscription->status, - subscriptionPosition: $subscription->position, - subscriptionError: $subscription->error, - projectionStatus: $subscriber?->projection->status(), - ); + foreach ($subscriberGroups as [$store]) { + try { + $subscriptions = $store->findByCriteria($criteria ?? SubscriptionCriteria::noConstraints()); + } catch (TableNotFoundException) { + // the schema is not setup - thus there are no subscribers + continue; + } + foreach ($subscriptions as $subscription) { + $subscriber = $this->subscribers->contain($subscription->id) ? $this->subscribers->get($subscription->id) : null; + $statuses[] = SubscriptionAndProjectionStatus::create( + subscriptionId: $subscription->id, + subscriptionStatus: $subscription->status, + subscriptionPosition: $subscription->position, + subscriptionError: $subscription->error, + projectionStatus: $subscriber?->projection->status(), + ); + } } + return SubscriptionAndProjectionStatuses::fromArray($statuses); } @@ -240,101 +253,112 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs { $this->logger?->info(sprintf('Subscription Engine: Start catching up subscriptions in state "%s".', $subscriptionStatus->value)); - return $this->subscriptionManager->findForAndUpdate( + $subscriberGroups = $this->subscribers->groupByStore(); + + // todo merge results + $returnResult = null; + + foreach ($subscriberGroups as [$store, $subscribers]) { + // todo do not global override manager! + $this->subscriptionManager = new SubscriptionManager($store); + $returnResult = $this->subscriptionManager->findForAndUpdate( SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, $subscriptionStatus), - function (Subscriptions $subscriptions) use ($subscriptionStatus, $progressClosure) { - foreach ($subscriptions as $subscription) { - if (!$this->subscribers->contain($subscription->id)) { - // mark detached subscriptions as we cannot handle them and exclude them from catchup - $subscription->set( - status: SubscriptionStatus::DETACHED, - ); - $this->subscriptionManager->update($subscription); - $this->logger?->info(sprintf('Subscription Engine: Subscriber for "%s" not found and has been marked as detached.', $subscription->id->value)); - $subscriptions = $subscriptions->without($subscription->id); + function (Subscriptions $subscriptions) use ($subscriptionStatus, $progressClosure, $store, $subscribers) { + foreach ($subscriptions as $subscription) { + // TODO we cannot mark a subscriber as detached, if it belongs to another store and no one else is using that store anymore. Then it will just be ACTIVE and never found, not even by status + if (!$subscribers->contain($subscription->id)) { + // mark detached subscriptions as we cannot handle them and exclude them from catchup + $subscription->set( + status: SubscriptionStatus::DETACHED, + ); + $this->subscriptionManager->update($subscription); + $this->logger?->info(sprintf('Subscription Engine: Subscriber for "%s" not found and has been marked as detached.', $subscription->id->value)); + $subscriptions = $subscriptions->without($subscription->id); + } } - } - if ($subscriptions->isEmpty()) { - $this->logger?->info(sprintf('Subscription Engine: No subscriptions in state "%s". Finishing catch up', $subscriptionStatus->value)); + if ($subscriptions->isEmpty()) { + $this->logger?->info(sprintf('Subscription Engine: No subscriptions in state "%s". Finishing catch up', $subscriptionStatus->value)); - return ProcessedResult::success(0); - } - foreach ($subscriptions as $subscription) { - try { - $this->subscribers->get($subscription->id)->onBeforeCatchUp($subscription->status); - } catch (\Throwable $e) { - // analog to onAfterCatchUp, we tolerate no exceptions here and consider it a critical developer error. - $message = sprintf('Subscriber "%s" failed onBeforeCatchUp: %s', $subscription->id->value, $e->getMessage()); - $this->logger?->critical($message); - throw new CatchUpFailed($message, 1732374000, $e); + return ProcessedResult::success(0); } - } - $startSequenceNumber = $subscriptions->lowestPosition()?->next() ?? SequenceNumber::none(); - $this->logger?->debug(sprintf('Subscription Engine: Event stream is processed from position %s.', $startSequenceNumber->value)); - - /** @var list $errors */ - $errors = []; - $numberOfProcessedEvents = 0; - try { - $eventStream = $this->eventStore->load(VirtualStreamName::all())->withMinimumSequenceNumber($startSequenceNumber); - foreach ($eventStream as $eventEnvelope) { - $sequenceNumber = $eventEnvelope->sequenceNumber; - if ($numberOfProcessedEvents > 0) { - $this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $sequenceNumber->value)); - } - if ($progressClosure !== null) { - $progressClosure($eventEnvelope); + foreach ($subscriptions as $subscription) { + try { + $subscribers->get($subscription->id)->onBeforeCatchUp($subscription->status); + } catch (\Throwable $e) { + // analog to onAfterCatchUp, we tolerate no exceptions here and consider it a critical developer error. + $message = sprintf('Subscriber "%s" failed onBeforeCatchUp: %s', $subscription->id->value, $e->getMessage()); + $this->logger?->critical($message); + throw new CatchUpFailed($message, 1732374000, $e); } - $domainEvent = $this->eventNormalizer->denormalize($eventEnvelope->event); - foreach ($subscriptions as $subscription) { - if ($subscription->status !== $subscriptionStatus) { - continue; + } + $startSequenceNumber = $subscriptions->lowestPosition()?->next() ?? SequenceNumber::none(); + $this->logger?->debug(sprintf('Subscription Engine: Event stream is processed from position %s.', $startSequenceNumber->value)); + + /** @var list $errors */ + $errors = []; + $numberOfProcessedEvents = 0; + try { + $eventStream = $this->eventStore->load(VirtualStreamName::all())->withMinimumSequenceNumber($startSequenceNumber); + foreach ($eventStream as $eventEnvelope) { + $sequenceNumber = $eventEnvelope->sequenceNumber; + if ($numberOfProcessedEvents > 0) { + $this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $sequenceNumber->value)); } - if ($subscription->position->value >= $sequenceNumber->value) { - $this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value)); - continue; + if ($progressClosure !== null) { + $progressClosure($eventEnvelope); } - $this->subscriptionStore->createSavepoint(); - $error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription); - if (!$error) { - $this->subscriptionStore->releaseSavepoint(); - continue; + $domainEvent = $this->eventNormalizer->denormalize($eventEnvelope->event); + foreach ($subscriptions as $subscription) { + if ($subscription->status !== $subscriptionStatus) { + continue; + } + if ($subscription->position->value >= $sequenceNumber->value) { + $this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value)); + continue; + } + $store->createSavepoint(); + $error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription); + if (!$error) { + $store->releaseSavepoint(); + continue; + } + $store->rollbackSavepoint(); + $errors[] = $error; } - $this->subscriptionStore->rollbackSavepoint(); - $errors[] = $error; + $numberOfProcessedEvents++; + } + } finally { + foreach ($subscriptions as $subscription) { + $this->subscriptionManager->update($subscription); } - $numberOfProcessedEvents++; } - } finally { foreach ($subscriptions as $subscription) { - $this->subscriptionManager->update($subscription); - } - } - foreach ($subscriptions as $subscription) { - try { - $this->subscribers->get($subscription->id)->onAfterCatchUp(); - } catch (\Throwable $e) { - // analog to onBeforeCatchUp, we tolerate no exceptions here and consider it a critical developer error. - $message = sprintf('Subscriber "%s" failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage()); - $this->logger?->critical($message); - throw new CatchUpFailed($message, 1732374000, $e); - } - if ($subscription->status !== $subscriptionStatus) { - continue; - } + try { + $subscribers->get($subscription->id)->onAfterCatchUp(); + } catch (\Throwable $e) { + // analog to onBeforeCatchUp, we tolerate no exceptions here and consider it a critical developer error. + $message = sprintf('Subscriber "%s" failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage()); + $this->logger?->critical($message); + throw new CatchUpFailed($message, 1732374000, $e); + } + if ($subscription->status !== $subscriptionStatus) { + continue; + } - if ($subscription->status !== SubscriptionStatus::ACTIVE) { - $subscription->set( - status: SubscriptionStatus::ACTIVE, - ); - $this->subscriptionManager->update($subscription); - $this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active after booting.', $subscription->id->value)); + if ($subscription->status !== SubscriptionStatus::ACTIVE) { + $subscription->set( + status: SubscriptionStatus::ACTIVE, + ); + $this->subscriptionManager->update($subscription); + $this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active after booting.', $subscription->id->value)); + } } + $this->logger?->info(sprintf('Subscription Engine: Finish catch up. %d processed events %d errors.', $numberOfProcessedEvents, count($errors))); + return $errors === [] ? ProcessedResult::success($numberOfProcessedEvents) : ProcessedResult::failed($numberOfProcessedEvents, Errors::fromArray($errors)); } - $this->logger?->info(sprintf('Subscription Engine: Finish catch up. %d processed events %d errors.', $numberOfProcessedEvents, count($errors))); - return $errors === [] ? ProcessedResult::success($numberOfProcessedEvents) : ProcessedResult::failed($numberOfProcessedEvents, Errors::fromArray($errors)); - } - ); + ); + } + return $returnResult ?? ProcessedResult::success(0); } /** diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php b/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php index 4b6a827f8e..b9b50f0ce4 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php @@ -32,4 +32,6 @@ public function createSavepoint(): void; public function releaseSavepoint(): void; public function rollbackSavepoint(): void; + + public function getId(): string; } diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/ProjectionSubscriber.php b/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/ProjectionSubscriber.php index 117b52265b..631b90978e 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/ProjectionSubscriber.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/ProjectionSubscriber.php @@ -8,6 +8,7 @@ use Neos\ContentRepository\Core\Projection\CatchUpHook\CatchUpHookInterface; use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; +use Neos\ContentRepository\Core\Subscription\Store\SubscriptionStoreInterface; use Neos\ContentRepository\Core\Subscription\SubscriptionId; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; use Neos\EventStore\Model\EventEnvelope; @@ -22,8 +23,9 @@ final class ProjectionSubscriber */ public function __construct( public readonly SubscriptionId $id, + public readonly SubscriptionStoreInterface $store, public readonly ProjectionInterface $projection, - private readonly ?CatchUpHookInterface $catchUpHook + private readonly ?CatchUpHookInterface $catchUpHook, ) { } diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/Subscribers.php b/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/Subscribers.php index ba40fbddb1..a3f0f03aa7 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/Subscribers.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/Subscribers.php @@ -4,6 +4,7 @@ namespace Neos\ContentRepository\Core\Subscription\Subscriber; +use Neos\ContentRepository\Core\Subscription\Store\SubscriptionStoreInterface; use Neos\ContentRepository\Core\Subscription\SubscriptionId; /** @@ -71,6 +72,23 @@ public function count(): int return count($this->subscribersById); } + /** + * @return iterable + */ + public function groupByStore(): iterable + { + $byStore = []; + + foreach ($this->subscribersById as $subscriber) { + $byStore[$subscriber->store->getId()][] = $subscriber; + } + + foreach ($byStore as $subscribers) { + $store = reset($subscribers)->store; + yield [$store, self::fromArray($subscribers)]; + } + } + /** * @return iterable */ diff --git a/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php b/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php index 4fdec55cd7..1aa4d7bd9b 100644 --- a/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php +++ b/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php @@ -198,11 +198,12 @@ private function buildFactory(ContentRepositoryId $contentRepositoryId): Content $this->buildPropertySerializer($contentRepositoryId, $contentRepositorySettings), $this->buildAuthProviderFactory($contentRepositoryId, $contentRepositorySettings), $clock, - $this->buildSubscriptionStore($contentRepositoryId, $clock, $contentRepositorySettings), + // todo if the contentGraphProjection factory uses different database, we need to use another $subscriptionStore here! Configure it per projection??? + $subscriptionStore = $this->buildSubscriptionStore($contentRepositoryId, $clock, $contentRepositorySettings), $this->buildContentGraphProjectionFactory($contentRepositoryId, $contentRepositorySettings), $contentGraphCatchUpHookFactory, $this->buildCommandHooksFactory($contentRepositoryId, $contentRepositorySettings), - $this->buildAdditionalSubscribersFactories($contentRepositoryId, $contentRepositorySettings), + $this->buildAdditionalSubscribersFactories($contentRepositoryId, $contentRepositorySettings, $subscriptionStore), $this->logger, ); } catch (\Exception $exception) { @@ -330,7 +331,7 @@ private function buildCommandHooksFactory(ContentRepositoryId $contentRepository } /** @param array $contentRepositorySettings */ - private function buildAdditionalSubscribersFactories(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): ContentRepositorySubscriberFactories + private function buildAdditionalSubscribersFactories(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings, SubscriptionStoreInterface $subscriptionStore): ContentRepositorySubscriberFactories { if (!is_array($contentRepositorySettings['projections'] ?? [])) { throw InvalidConfigurationException::fromMessage('Content repository "%s" expects projections configured as array.', $contentRepositoryId->value); @@ -351,6 +352,7 @@ private function buildAdditionalSubscribersFactories(ContentRepositoryId $conten } $projectionSubscriberFactories[$projectionName] = new ProjectionSubscriberFactory( SubscriptionId::fromString($projectionName), + $subscriptionStore, // todo if projection factory uses different database, we need to use another $subscriptionStore here! $projectionFactory, $this->buildCatchUpHookFactory($contentRepositoryId, $projectionName, $projectionOptions), $projectionOptions['options'] ?? [], diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php b/Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php index 234f1b4483..76d190f75c 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php +++ b/Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php @@ -175,4 +175,9 @@ public function rollbackSavepoint(): void { $this->dbal->rollbackSavepoint('SUBSCRIBER'); } + + public function getId(): string + { + return spl_object_hash($this->dbal); + } }