Skip to content

Commit

Permalink
FEATURE: Decentral subscription store
Browse files Browse the repository at this point in the history
Allows to run projections in a different database WITH their store to ensure exactly once delivery and that on critical error (memory error) the transaction is not committet and that the projection in the other db is not already ahead.

TODO, allow to use actually wire it together via cr registry, That one projection runs in one database WITH its subscription store.
  • Loading branch information
mhsdesign committed Nov 24, 2024
1 parent 6726d73 commit 385e78d
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public function __construct(
Serializer $propertySerializer,
private readonly AuthProviderFactoryInterface $authProviderFactory,
private readonly ClockInterface $clock,
SubscriptionStoreInterface $subscriptionStore,
SubscriptionStoreInterface $contentGraphSubscriptionStore,
ContentGraphProjectionFactoryInterface $contentGraphProjectionFactory,
private readonly CatchUpHookFactoryInterface|null $contentGraphCatchUpHookFactory,
private readonly CommandHooksFactory $commandHooksFactory,
Expand Down Expand Up @@ -106,14 +106,15 @@ public function __construct(
}
$this->additionalProjectionStates = ProjectionStates::fromArray($additionalProjectionStates);
$this->contentGraphProjection = $contentGraphProjectionFactory->build($this->subscriberFactoryDependencies);
$subscribers[] = $this->buildContentGraphSubscriber();
$this->subscriptionEngine = new SubscriptionEngine($this->eventStore, $subscriptionStore, Subscribers::fromArray($subscribers), $eventNormalizer, $logger);
$subscribers[] = $this->buildContentGraphSubscriber($contentGraphSubscriptionStore);
$this->subscriptionEngine = new SubscriptionEngine($this->eventStore, Subscribers::fromArray($subscribers), $eventNormalizer, $logger);
}

private function buildContentGraphSubscriber(): ProjectionSubscriber
private function buildContentGraphSubscriber(SubscriptionStoreInterface $subscriptionStore): ProjectionSubscriber
{
return new ProjectionSubscriber(
SubscriptionId::fromString('contentGraph'),
$subscriptionStore,
$this->contentGraphProjection,
$this->contentGraphCatchUpHookFactory?->build(CatchUpHookFactoryDependencies::create(
$this->contentRepositoryId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Neos\ContentRepository\Core\Projection\ProjectionFactoryInterface;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Subscription\Store\SubscriptionStoreInterface;
use Neos\ContentRepository\Core\Subscription\Subscriber\ProjectionSubscriber;
use Neos\ContentRepository\Core\Subscription\SubscriptionId;

Expand All @@ -34,6 +35,7 @@
*/
public function __construct(
private SubscriptionId $subscriptionId,
private SubscriptionStoreInterface $subscriptionStore,
private ProjectionFactoryInterface $projectionFactory,
private ?CatchUpHookFactoryInterface $catchUpHookFactory,
private array $projectionFactoryOptions,
Expand All @@ -53,6 +55,7 @@ public function build(SubscriberFactoryDependencies $dependencies): ProjectionSu

return new ProjectionSubscriber(
$this->subscriptionId,
$this->subscriptionStore,
$projection,
$catchUpHook,
);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ public function createSavepoint(): void;
public function releaseSavepoint(): void;

public function rollbackSavepoint(): void;

public function getId(): string;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Neos\ContentRepository\Core\Projection\CatchUpHook\CatchUpHookInterface;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Subscription\Store\SubscriptionStoreInterface;
use Neos\ContentRepository\Core\Subscription\SubscriptionId;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatus;
use Neos\EventStore\Model\EventEnvelope;
Expand All @@ -22,8 +23,9 @@ final class ProjectionSubscriber
*/
public function __construct(
public readonly SubscriptionId $id,
public readonly SubscriptionStoreInterface $store,
public readonly ProjectionInterface $projection,
private readonly ?CatchUpHookInterface $catchUpHook
private readonly ?CatchUpHookInterface $catchUpHook,
) {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Neos\ContentRepository\Core\Subscription\Subscriber;

use Neos\ContentRepository\Core\Subscription\Store\SubscriptionStoreInterface;
use Neos\ContentRepository\Core\Subscription\SubscriptionId;

/**
Expand Down Expand Up @@ -71,6 +72,23 @@ public function count(): int
return count($this->subscribersById);
}

/**
* @return iterable<array{SubscriptionStoreInterface, self}>
*/
public function groupByStore(): iterable
{
$byStore = [];

foreach ($this->subscribersById as $subscriber) {
$byStore[$subscriber->store->getId()][] = $subscriber;
}

foreach ($byStore as $subscribers) {
$store = reset($subscribers)->store;
yield [$store, self::fromArray($subscribers)];
}
}

/**
* @return iterable<mixed>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,12 @@ private function buildFactory(ContentRepositoryId $contentRepositoryId): Content
$this->buildPropertySerializer($contentRepositoryId, $contentRepositorySettings),
$this->buildAuthProviderFactory($contentRepositoryId, $contentRepositorySettings),
$clock,
$this->buildSubscriptionStore($contentRepositoryId, $clock, $contentRepositorySettings),
// todo if the contentGraphProjection factory uses different database, we need to use another $subscriptionStore here! Configure it per projection???
$subscriptionStore = $this->buildSubscriptionStore($contentRepositoryId, $clock, $contentRepositorySettings),
$this->buildContentGraphProjectionFactory($contentRepositoryId, $contentRepositorySettings),
$contentGraphCatchUpHookFactory,
$this->buildCommandHooksFactory($contentRepositoryId, $contentRepositorySettings),
$this->buildAdditionalSubscribersFactories($contentRepositoryId, $contentRepositorySettings),
$this->buildAdditionalSubscribersFactories($contentRepositoryId, $contentRepositorySettings, $subscriptionStore),
$this->logger,
);
} catch (\Exception $exception) {
Expand Down Expand Up @@ -330,7 +331,7 @@ private function buildCommandHooksFactory(ContentRepositoryId $contentRepository
}

/** @param array<string, mixed> $contentRepositorySettings */
private function buildAdditionalSubscribersFactories(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): ContentRepositorySubscriberFactories
private function buildAdditionalSubscribersFactories(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings, SubscriptionStoreInterface $subscriptionStore): ContentRepositorySubscriberFactories
{
if (!is_array($contentRepositorySettings['projections'] ?? [])) {
throw InvalidConfigurationException::fromMessage('Content repository "%s" expects projections configured as array.', $contentRepositoryId->value);
Expand All @@ -351,6 +352,7 @@ private function buildAdditionalSubscribersFactories(ContentRepositoryId $conten
}
$projectionSubscriberFactories[$projectionName] = new ProjectionSubscriberFactory(
SubscriptionId::fromString($projectionName),
$subscriptionStore, // todo if projection factory uses different database, we need to use another $subscriptionStore here!
$projectionFactory,
$this->buildCatchUpHookFactory($contentRepositoryId, $projectionName, $projectionOptions),
$projectionOptions['options'] ?? [],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,9 @@ public function rollbackSavepoint(): void
{
$this->dbal->rollbackSavepoint('SUBSCRIBER');
}

public function getId(): string
{
return spl_object_hash($this->dbal);
}
}

0 comments on commit 385e78d

Please sign in to comment.