Skip to content

Commit

Permalink
TASK: Catchups will only be able to access the state of the projectio…
Browse files Browse the repository at this point in the history
…n they are registered for

A catchup doesn't have access to the full content repository, as it would allow full recursion via handle and accessing other projections
state is not safe as the other projection might not be behind - the order is undefined.

This will make it possible to catchup projections from outside of the cr instance as proposed here: neos#5321
  • Loading branch information
mhsdesign committed Oct 27, 2024
1 parent 39f19d2 commit 971a253
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,21 @@

namespace Neos\ContentRepository\BehavioralTests\ProjectionRaceConditionTester;

use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHookInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;

/**
* For full docs and context, see {@see RaceTrackerCatchUpHook}
*
* @implements CatchUpHookFactoryInterface<ContentGraphReadModelInterface>
* @internal
*/
final class RaceTrackerCatchUpHookFactory implements CatchUpHookFactoryInterface
{
public function build(ContentRepository $contentRepository): CatchUpHookInterface
public function build(ContentRepositoryId $contentRepositoryId, ProjectionStateInterface $projectionState): CatchUpHookInterface
{
return new RaceTrackerCatchUpHook();
}
Expand Down
4 changes: 2 additions & 2 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
use Neos\ContentRepository\Core\Projection\CatchUp;
use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
Expand Down Expand Up @@ -171,7 +171,7 @@ public function catchUpProjection(string $projectionClassName, CatchUpOptions $o
$projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName);

$catchUpHookFactory = $this->projectionsAndCatchUpHooks->getCatchUpHookFactoryForProjection($projection);
$catchUpHook = $catchUpHookFactory?->build($this);
$catchUpHook = $catchUpHookFactory?->build($this->id, $projection->getState());

// TODO allow custom stream name per projection
$streamName = VirtualStreamName::all();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@

use Neos\ContentRepository\Core\Projection\CatchUpHookFactories;
use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionFactoryInterface;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\Projections;
use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionInterface;

/**
* @api for custom framework integrations, not for users of the CR
*/
final class ProjectionsAndCatchUpHooksFactory
{
/**
* @var array<string, array{factory: ProjectionFactoryInterface<ProjectionInterface<ProjectionStateInterface>>, options: array<string, mixed>, catchUpHooksFactories: array<CatchUpHookFactoryInterface>}>
* @var array<string, array{factory: ProjectionFactoryInterface<ProjectionInterface<ProjectionStateInterface>>, options: array<string, mixed>, catchUpHooksFactories: array<CatchUpHookFactoryInterface<ProjectionStateInterface>>}>
*/
private array $factories = [];

Expand All @@ -40,7 +40,7 @@ public function registerFactory(ProjectionFactoryInterface $factory, array $opti

/**
* @param ProjectionFactoryInterface<ProjectionInterface<ProjectionStateInterface>> $factory
* @param CatchUpHookFactoryInterface $catchUpHookFactory
* @param CatchUpHookFactoryInterface<ProjectionStateInterface> $catchUpHookFactory
* @return void
* @api
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@

namespace Neos\ContentRepository\Core\Projection;

use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;

/**
* @implements CatchUpHookFactoryInterface<ProjectionStateInterface>
* @internal
*/
final class CatchUpHookFactories implements CatchUpHookFactoryInterface
{
/**
* @var array<mixed,CatchUpHookFactoryInterface>
* @var array<mixed,CatchUpHookFactoryInterface<ProjectionStateInterface>>
*/
private array $catchUpHookFactories;

/**
* @param CatchUpHookFactoryInterface<ProjectionStateInterface> ...$catchUpHookFactories
*/
private function __construct(CatchUpHookFactoryInterface ...$catchUpHookFactories)
{
$this->catchUpHookFactories = $catchUpHookFactories;
Expand All @@ -26,6 +30,10 @@ public static function create(): self
return new self();
}

/**
* @param CatchUpHookFactoryInterface<ProjectionStateInterface> $catchUpHookFactory
* @return self
*/
public function with(CatchUpHookFactoryInterface $catchUpHookFactory): self
{
if ($this->has($catchUpHookFactory::class)) {
Expand All @@ -44,9 +52,9 @@ private function has(string $catchUpHookFactoryClassName): bool
return array_key_exists($catchUpHookFactoryClassName, $this->catchUpHookFactories);
}

public function build(ContentRepository $contentRepository): CatchUpHookInterface
public function build(ContentRepositoryId $contentRepositoryId, ProjectionStateInterface $projectionState): CatchUpHookInterface
{
$catchUpHooks = array_map(static fn(CatchUpHookFactoryInterface $catchUpHookFactory) => $catchUpHookFactory->build($contentRepository), $this->catchUpHookFactories);
$catchUpHooks = array_map(static fn(CatchUpHookFactoryInterface $catchUpHookFactory) => $catchUpHookFactory->build($contentRepositoryId, $projectionState), $this->catchUpHookFactories);
return new DelegatingCatchUpHook(...$catchUpHooks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,21 @@

namespace Neos\ContentRepository\Core\Projection;

use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;

/**
* @template T of ProjectionStateInterface
* @api
*/
interface CatchUpHookFactoryInterface
{
public function build(ContentRepository $contentRepository): CatchUpHookInterface;
/**
* Note that a catchup doesn't have access to the full content repository, as it would allow full recursion via handle and accessing other projections
* state is not safe as the other projection might not be behind - the order is undefined.
*
* @param ContentRepositoryId $contentRepositoryId the content repository the catchup was registered in
* @param ProjectionStateInterface&T $projectionState the state of the projection the catchup was registered to (Its only safe to access this projections state)
* @return CatchUpHookInterface
*/
public function build(ContentRepositoryId $contentRepositoryId, ProjectionStateInterface $projectionState): CatchUpHookInterface;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
use Neos\ContentRepository\Core\SharedModel\Workspace\Workspaces;

/**
* @api for creating a custom content repository graph projection implementation, **not for users of the CR**
* This low level interface gives access to the content graph and workspaces
*
* Generally this is not accessible for users of the CR, except for registering a catchup-hook on the content graph
*
* @api as dependency in catchup hooks and for creating a custom content repository graph projection implementation
*/
interface ContentGraphReadModelInterface extends ProjectionStateInterface
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public function __construct(

/**
* @param ProjectionInterface<ProjectionStateInterface> $projection
* @return ?CatchUpHookFactoryInterface<ProjectionStateInterface>
*/
public function getCatchUpHookFactoryForProjection(ProjectionInterface $projection): ?CatchUpHookFactoryInterface
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

namespace Neos\ContentRepositoryRegistry\SubgraphCachingInMemory;

use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHookInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;

/**
* Factory for {@see FlushSubgraphCachePoolCatchUpHook}, auto-registered in Settings.yaml for GraphProjection
*
* @implements CatchUpHookFactoryInterface<ContentGraphReadModelInterface>
* @internal
*/
class FlushSubgraphCachePoolCatchUpHookFactory implements CatchUpHookFactoryInterface
Expand All @@ -20,7 +23,8 @@ public function __construct(
private readonly SubgraphCachePool $subgraphCachePool
) {
}
public function build(ContentRepository $contentRepository): CatchUpHookInterface

public function build(ContentRepositoryId $contentRepositoryId, ProjectionStateInterface $projectionState): CatchUpHookInterface
{
return new FlushSubgraphCachePoolCatchUpHook($this->subgraphCachePool);
}
Expand Down
24 changes: 13 additions & 11 deletions Neos.Neos/Classes/AssetUsage/CatchUpHook/AssetUsageCatchUpHook.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Neos\Neos\AssetUsage\CatchUpHook;

use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePoint;
use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePointSet;
use Neos\ContentRepository\Core\EventStore\EventInterface;
Expand All @@ -21,9 +20,11 @@
use Neos\ContentRepository\Core\Feature\WorkspacePublication\Event\WorkspaceWasDiscarded;
use Neos\ContentRepository\Core\Feature\WorkspacePublication\Event\WorkspaceWasPartiallyDiscarded;
use Neos\ContentRepository\Core\Projection\CatchUpHookInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\Filter\FindDescendantNodesFilter;
use Neos\ContentRepository\Core\Projection\ContentGraph\Node;
use Neos\ContentRepository\Core\Projection\ContentGraph\VisibilityConstraints;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;
use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceDoesNotExist;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
Expand All @@ -36,7 +37,8 @@
class AssetUsageCatchUpHook implements CatchUpHookInterface
{
public function __construct(
private readonly ContentRepository $contentRepository,
private readonly ContentRepositoryId $contentRepositoryId,
private readonly ContentGraphReadModelInterface $contentGraphReadModel,
private readonly AssetUsageIndexingService $assetUsageIndexingService
) {
}
Expand All @@ -50,7 +52,7 @@ public function onBeforeEvent(EventInterface $eventInstance, EventEnvelope $even
if ($eventInstance instanceof EmbedsWorkspaceName && $eventInstance instanceof EmbedsContentStreamId) {
// Safeguard for temporary content streams created during partial publish -> We want to skip these events, because their workspace doesn't match current content stream.
try {
$contentGraph = $this->contentRepository->getContentGraph($eventInstance->getWorkspaceName());
$contentGraph = $this->contentGraphReadModel->getContentGraph($eventInstance->getWorkspaceName());
} catch (WorkspaceDoesNotExist) {
return;
}
Expand All @@ -71,7 +73,7 @@ public function onAfterEvent(EventInterface $eventInstance, EventEnvelope $event
if ($eventInstance instanceof EmbedsWorkspaceName && $eventInstance instanceof EmbedsContentStreamId) {
// Safeguard for temporary content streams created during partial publish -> We want to skip these events, because their workspace doesn't match current content stream.
try {
$contentGraph = $this->contentRepository->getContentGraph($eventInstance->getWorkspaceName());
$contentGraph = $this->contentGraphReadModel->getContentGraph($eventInstance->getWorkspaceName());
} catch (WorkspaceDoesNotExist) {
return;
}
Expand Down Expand Up @@ -103,7 +105,7 @@ public function onAfterCatchUp(): void

private function updateNode(WorkspaceName $workspaceName, NodeAggregateId $nodeAggregateId, DimensionSpacePoint $dimensionSpacePoint): void
{
$contentGraph = $this->contentRepository->getContentGraph($workspaceName);
$contentGraph = $this->contentGraphReadModel->getContentGraph($workspaceName);
$node = $contentGraph->getSubgraph($dimensionSpacePoint, VisibilityConstraints::withoutRestrictions())->findNodeById($nodeAggregateId);

if ($node === null) {
Expand All @@ -112,14 +114,14 @@ private function updateNode(WorkspaceName $workspaceName, NodeAggregateId $nodeA
}

$this->assetUsageIndexingService->updateIndex(
$this->contentRepository->id,
$this->contentRepositoryId,
$node
);
}

private function removeNodes(WorkspaceName $workspaceName, NodeAggregateId $nodeAggregateId, DimensionSpacePointSet $dimensionSpacePoints): void
{
$contentGraph = $this->contentRepository->getContentGraph($workspaceName);
$contentGraph = $this->contentGraphReadModel->getContentGraph($workspaceName);

foreach ($dimensionSpacePoints as $dimensionSpacePoint) {
$subgraph = $contentGraph->getSubgraph($dimensionSpacePoint, VisibilityConstraints::withoutRestrictions());
Expand All @@ -131,7 +133,7 @@ private function removeNodes(WorkspaceName $workspaceName, NodeAggregateId $node
/** @var Node $node */
foreach ($nodes as $node) {
$this->assetUsageIndexingService->removeIndexForNode(
$this->contentRepository->id,
$this->contentRepositoryId,
$node
);
}
Expand All @@ -140,7 +142,7 @@ private function removeNodes(WorkspaceName $workspaceName, NodeAggregateId $node

private function discardWorkspace(WorkspaceName $workspaceName): void
{
$this->assetUsageIndexingService->removeIndexForWorkspace($this->contentRepository->id, $workspaceName);
$this->assetUsageIndexingService->removeIndexForWorkspace($this->contentRepositoryId, $workspaceName);
}

private function discardNodes(WorkspaceName $workspaceName, NodeIdsToPublishOrDiscard $nodeIds): void
Expand All @@ -151,7 +153,7 @@ private function discardNodes(WorkspaceName $workspaceName, NodeIdsToPublishOrDi
continue;
}
$this->assetUsageIndexingService->removeIndexForWorkspaceNameNodeAggregateIdAndDimensionSpacePoint(
$this->contentRepository->id,
$this->contentRepositoryId,
$workspaceName,
$nodeId->nodeAggregateId,
$nodeId->dimensionSpacePoint
Expand All @@ -161,6 +163,6 @@ private function discardNodes(WorkspaceName $workspaceName, NodeIdsToPublishOrDi

private function updateDimensionSpacePoint(WorkspaceName $workspaceName, DimensionSpacePoint $source, DimensionSpacePoint $target): void
{
$this->assetUsageIndexingService->updateDimensionSpacePointInIndex($this->contentRepository->id, $workspaceName, $source, $target);
$this->assetUsageIndexingService->updateDimensionSpacePointInIndex($this->contentRepositoryId, $workspaceName, $source, $target);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,27 @@
* source code.
*/

use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;
use Neos\Neos\AssetUsage\Service\AssetUsageIndexingService;

/**
* @implements CatchUpHookFactoryInterface<ContentGraphReadModelInterface>
*/
class AssetUsageCatchUpHookFactory implements CatchUpHookFactoryInterface
{
public function __construct(
private AssetUsageIndexingService $assetUsageIndexingService
) {
}

public function build(ContentRepository $contentRepository): AssetUsageCatchUpHook
public function build(ContentRepositoryId $contentRepositoryId, ProjectionStateInterface $projectionState): AssetUsageCatchUpHook
{
return new AssetUsageCatchUpHook(
$contentRepository,
$contentRepositoryId,
$projectionState,
$this->assetUsageIndexingService
);
}
Expand Down
Loading

0 comments on commit 971a253

Please sign in to comment.