Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: Introduce ContentRepositoryMaintainer and restore projection cli commands #5378

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f46077e
FEATURE: Introduce `ContentRepositoryMaintainer` and restore cr:proje…
mhsdesign Nov 24, 2024
bace8ff
TASK: Rename `ProjectionStatus` and introduce `ProjectionSubscription…
mhsdesign Nov 24, 2024
8ff0f61
TASK: Introduce `DetachedSubscriptionStatus` as the projection setup …
mhsdesign Nov 24, 2024
9675572
TASK: Inline `pruneAllWorkspacesAndContentStreamsFromEventStream` int…
mhsdesign Nov 24, 2024
655ac3c
TASK: Reimplement 40e8d35e09ee690406c6a9cfc823c775d4ee3b51
mhsdesign Nov 24, 2024
e235e69
TASK: Document new `ContentRepositoryMaintainer`
mhsdesign Nov 24, 2024
611ca37
TASK: Rename `ProjectionSetupStatus` back to `ProjectionStatus`
mhsdesign Nov 25, 2024
0b8a3b5
TASK: Rename `SubscriptionStatuses` to `SubscriptionStatusCollection`
mhsdesign Nov 25, 2024
51f0cf6
TASK: Leave warning hint for why we do a replay
mhsdesign Nov 25, 2024
a2a2411
TASK: Warn in `catchupProjection` if projection is not ready to be ca…
mhsdesign Nov 25, 2024
63d1589
TASK: Document `catchupProjection` correctly
mhsdesign Nov 25, 2024
2297c14
TASK: Reintroduce `ContentRepositoryStatus` object and expose current…
mhsdesign Nov 25, 2024
1220f82
TASK: Swap Projection and Setup in output so that Setup comes first
mhsdesign Nov 25, 2024
800fd53
WIP: Introduce `cr:reactivateSubscription`
mhsdesign Nov 26, 2024
a0c9f90
TASK: Dont crash on status when the event store is not setup
mhsdesign Nov 27, 2024
8c079d9
TASK: Split projection replay into separate SubscriptionCommandContro…
mhsdesign Nov 27, 2024
4c65d81
TASK: Status also shows new subscriptions even if they are not persis…
mhsdesign Nov 27, 2024
8c9c0e8
TASK: Refine todos
mhsdesign Nov 27, 2024
4424483
TASK: Declare SubscriptionEngine and friends as internal
mhsdesign Nov 27, 2024
baa5e4a
TASK: Add error code to `SubscriptionEngineAlreadyProcessingException`
mhsdesign Nov 27, 2024
66e54bc
TASK: Allow cr registry to implement internal subscription store beca…
mhsdesign Nov 27, 2024
51d39e5
TASK: Rename to `SubscriptionReplayProcessor`
mhsdesign Nov 27, 2024
b2c1a29
TASK: Improve legacy projectionReplayCommand stub
mhsdesign Nov 27, 2024
d84c2a4
TASK: Add test for Subscription & Cr Commands (and thus CRMaintainer)
mhsdesign Nov 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\NodeTags;
use Neos\ContentRepository\Core\Projection\ContentGraph\Timestamps;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
use Neos\ContentRepository\Core\Projection\ProjectionSetupStatus;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateClassification;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId;
use Neos\ContentRepository\Core\SharedModel\Node\NodeName;
Expand Down Expand Up @@ -112,23 +112,23 @@ public function setUp(): void
}
}

public function status(): ProjectionStatus
public function setUpStatus(): ProjectionSetupStatus
{
try {
$this->dbal->connect();
} catch (\Throwable $e) {
return ProjectionStatus::error(sprintf('Failed to connect to database: %s', $e->getMessage()));
return ProjectionSetupStatus::error(sprintf('Failed to connect to database: %s', $e->getMessage()));
}
try {
$requiredSqlStatements = $this->determineRequiredSqlStatements();
} catch (\Throwable $e) {
return ProjectionStatus::error(sprintf('Failed to determine required SQL statements: %s', $e->getMessage()));
return ProjectionSetupStatus::error(sprintf('Failed to determine required SQL statements: %s', $e->getMessage()));
}
if ($requiredSqlStatements !== []) {
return ProjectionStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements)));
return ProjectionSetupStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements)));
}

return ProjectionStatus::ok();
return ProjectionSetupStatus::ok();
}

public function resetState(): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
use Neos\ContentRepository\Core\Projection\ProjectionSetupStatus;
use Neos\EventStore\Model\EventEnvelope;

/**
Expand Down Expand Up @@ -89,22 +89,22 @@ public function setUp(): void
');
}

public function status(): ProjectionStatus
public function setUpStatus(): ProjectionSetupStatus
{
try {
$this->getDatabaseConnection()->connect();
} catch (\Throwable $e) {
return ProjectionStatus::error(sprintf('Failed to connect to database: %s', $e->getMessage()));
return ProjectionSetupStatus::error(sprintf('Failed to connect to database: %s', $e->getMessage()));
}
try {
$requiredSqlStatements = $this->determineRequiredSqlStatements();
} catch (\Throwable $e) {
return ProjectionStatus::error(sprintf('Failed to determine required SQL statements: %s', $e->getMessage()));
return ProjectionSetupStatus::error(sprintf('Failed to determine required SQL statements: %s', $e->getMessage()));
}
if ($requiredSqlStatements !== []) {
return ProjectionStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements)));
return ProjectionSetupStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements)));
}
return ProjectionStatus::ok();
return ProjectionSetupStatus::ok();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
use Behat\Gherkin\Node\TableNode;
use Doctrine\DBAL\Connection;
use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\Service\SubscriptionServiceFactory;
use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainerFactory;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;
use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine;
use Neos\ContentRepository\TestSuite\Behavior\Features\Bootstrap\Helpers\GherkinTableNodeBasedContentDimensionSource;
use Neos\ContentRepository\TestSuite\Fakes\FakeContentDimensionSourceFactory;
use Neos\ContentRepository\TestSuite\Fakes\FakeNodeTypeManagerFactory;
use Neos\EventStore\EventStoreInterface;
use PHPUnit\Framework\Assert;
use Symfony\Component\Yaml\Yaml;

/**
Expand Down Expand Up @@ -179,20 +181,26 @@ protected function setUpContentRepository(ContentRepositoryId $contentRepository
* Catch Up process and the testcase reset.
*/
$contentRepository = $this->createContentRepository($contentRepositoryId);
$subscriptionService = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new SubscriptionServiceFactory());
$contentRepositoryMaintainer = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new ContentRepositoryMaintainerFactory());
if (!in_array($contentRepository->id, self::$alreadySetUpContentRepositories)) {
$subscriptionService->setupEventStore();
$subscriptionService->subscriptionEngine->setup();
$result = $contentRepositoryMaintainer->setUp();
Assert::assertNull($result);
self::$alreadySetUpContentRepositories[] = $contentRepository->id;
}
// todo we TRUNCATE here and do not want to use $contentRepositoryMaintainer->prune(); here as it would not reset the autoincrement sequence number making some assertions impossible
/** @var EventStoreInterface $eventStore */
$eventStore = (new \ReflectionClass($contentRepository))->getProperty('eventStore')->getValue($contentRepository);
/** @var Connection $databaseConnection */
$databaseConnection = (new \ReflectionClass($eventStore))->getProperty('connection')->getValue($eventStore);
$eventTableName = sprintf('cr_%s_events', $contentRepositoryId->value);
$databaseConnection->executeStatement('TRUNCATE ' . $eventTableName);
$subscriptionService->subscriptionEngine->reset();
$subscriptionService->subscriptionEngine->boot();

/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine = (new \ReflectionClass($contentRepositoryMaintainer))->getProperty('subscriptionEngine')->getValue($contentRepositoryMaintainer);
$result = $subscriptionEngine->reset();
Assert::assertNull($result->errors);
$result = $subscriptionEngine->boot();
Assert::assertNull($result->errors);

return $contentRepository;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaFactory;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
use Neos\ContentRepository\Core\Projection\ProjectionSetupStatus;
use Neos\EventStore\Model\EventEnvelope;
use Neos\Flow\Annotations as Flow;

Expand Down Expand Up @@ -52,13 +52,13 @@ public function setUp(): void
}
}

public function status(): ProjectionStatus
public function setUpStatus(): ProjectionSetupStatus
{
$requiredSqlStatements = $this->determineRequiredSqlStatements();
if ($requiredSqlStatements !== []) {
return ProjectionStatus::setupRequired(sprintf('Requires %d SQL statements', count($requiredSqlStatements)));
return ProjectionSetupStatus::setupRequired(sprintf('Requires %d SQL statements', count($requiredSqlStatements)));
}
return ProjectionStatus::ok();
return ProjectionSetupStatus::ok();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@
use Neos\ContentRepository\Core\Projection\CatchUpHook\CatchUpHookInterface;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
use Neos\ContentRepository\Core\Service\SubscriptionService;
use Neos\ContentRepository\Core\Service\SubscriptionServiceFactory;
use Neos\ContentRepository\Core\Projection\ProjectionSetupStatus;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\ContentRepository\Core\Subscription\DetachedSubscriptionStatus;
use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine;
use Neos\ContentRepository\Core\Subscription\Store\SubscriptionCriteria;
use Neos\ContentRepository\Core\Subscription\SubscriptionAndProjectionStatus;
use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus;
use Neos\ContentRepository\Core\Subscription\SubscriptionId;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatus;
use Neos\ContentRepository\TestSuite\Fakes\FakeCatchUpHookFactory;
Expand All @@ -41,8 +40,6 @@ abstract class AbstractSubscriptionEngineTestCase extends TestCase // we don't u
{
protected ContentRepository $contentRepository;

protected SubscriptionService $subscriptionService;

protected SubscriptionEngine $subscriptionEngine;

protected EventStoreInterface $eventStore;
Expand Down Expand Up @@ -102,8 +99,6 @@ final protected function setupContentRepositoryDependencies(ContentRepositoryId
$contentRepositoryId
);

$this->subscriptionService = $this->getObject(ContentRepositoryRegistry::class)->buildService($contentRepositoryId, new SubscriptionServiceFactory());

$subscriptionEngineAndEventStoreAccessor = new class implements ContentRepositoryServiceFactoryInterface {
public EventStoreInterface|null $eventStore;
public SubscriptionEngine|null $subscriptionEngine;
Expand Down Expand Up @@ -140,9 +135,9 @@ final protected function resetDatabase(Connection $connection, ContentRepository
$connection->prepare('SET FOREIGN_KEY_CHECKS = 1;')->executeStatement();
}

final protected function subscriptionStatus(string $subscriptionId): ?SubscriptionAndProjectionStatus
final protected function subscriptionStatus(string $subscriptionId): ProjectionSubscriptionStatus|DetachedSubscriptionStatus|null
{
return $this->subscriptionService->subscriptionEngine->subscriptionStatuses(SubscriptionCriteria::create(ids: [SubscriptionId::fromString($subscriptionId)]))->first();
return $this->subscriptionEngine->subscriptionStatuses(SubscriptionCriteria::create(ids: [SubscriptionId::fromString($subscriptionId)]))->first();
}

final protected function commitExampleContentStreamEvent(): void
Expand All @@ -162,12 +157,12 @@ final protected function expectOkayStatus($subscriptionId, SubscriptionStatus $s
{
$actual = $this->subscriptionStatus($subscriptionId);
self::assertEquals(
SubscriptionAndProjectionStatus::create(
ProjectionSubscriptionStatus::create(
subscriptionId: SubscriptionId::fromString($subscriptionId),
subscriptionStatus: $status,
subscriptionPosition: $sequenceNumber,
subscriptionError: null,
projectionStatus: ProjectionStatus::ok(),
setupStatus: ProjectionSetupStatus::ok(),
),
$actual
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
namespace Neos\ContentRepository\BehavioralTests\Tests\Functional\Subscription;

use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
use Neos\ContentRepository\Core\Projection\ProjectionSetupStatus;
use Neos\ContentRepository\Core\Subscription\Exception\CatchUpFailed;
use Neos\ContentRepository\Core\Subscription\SubscriptionAndProjectionStatus;
use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus;
use Neos\ContentRepository\Core\Subscription\SubscriptionError;
use Neos\ContentRepository\Core\Subscription\SubscriptionId;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatus;
Expand All @@ -18,11 +18,11 @@ final class CatchUpHookErrorTest extends AbstractSubscriptionEngineTestCase
/** @test */
public function error_onBeforeEvent_projectionIsNotRun()
{
$this->subscriptionService->setupEventStore();
$this->eventStore->setup();
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::once())->method('apply');
$this->subscriptionService->subscriptionEngine->setup();
$this->subscriptionService->subscriptionEngine->boot();
$this->subscriptionEngine->setup();
$this->subscriptionEngine->boot();

// commit an event
$this->commitExampleContentStreamEvent();
Expand All @@ -37,12 +37,12 @@ public function error_onBeforeEvent_projectionIsNotRun()

$this->secondFakeProjection->injectSaboteur(fn () => self::fail('Projection apply is not expected to be called!'));

$expectedFailure = SubscriptionAndProjectionStatus::create(
$expectedFailure = ProjectionSubscriptionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'),
subscriptionStatus: SubscriptionStatus::ERROR,
subscriptionPosition: SequenceNumber::none(),
subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception),
projectionStatus: ProjectionStatus::ok(),
setupStatus: ProjectionSetupStatus::ok(),
);

self::assertEmpty(
Expand All @@ -66,11 +66,11 @@ public function error_onBeforeEvent_projectionIsNotRun()
/** @test */
public function error_onAfterEvent_projectionIsRolledBack()
{
$this->subscriptionService->setupEventStore();
$this->eventStore->setup();
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::once())->method('apply');
$this->subscriptionService->subscriptionEngine->setup();
$this->subscriptionService->subscriptionEngine->boot();
$this->subscriptionEngine->setup();
$this->subscriptionEngine->boot();

// commit an event
$this->commitExampleContentStreamEvent();
Expand All @@ -83,12 +83,12 @@ public function error_onAfterEvent_projectionIsRolledBack()
// TODO pass the error subscription status to onAfterCatchUp, so that in case of an error it can be prevented that mails f.x. will be sent?
$this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp');

$expectedFailure = SubscriptionAndProjectionStatus::create(
$expectedFailure = ProjectionSubscriptionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'),
subscriptionStatus: SubscriptionStatus::ERROR,
subscriptionPosition: SequenceNumber::none(),
subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception),
projectionStatus: ProjectionStatus::ok(),
setupStatus: ProjectionSetupStatus::ok(),
);

self::assertEmpty(
Expand All @@ -112,11 +112,11 @@ public function error_onAfterEvent_projectionIsRolledBack()
/** @test */
public function error_onBeforeCatchUp_abortsCatchup()
{
$this->subscriptionService->setupEventStore();
$this->eventStore->setup();
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::never())->method('apply');
$this->subscriptionService->subscriptionEngine->setup();
$this->subscriptionService->subscriptionEngine->boot();
$this->subscriptionEngine->setup();
$this->subscriptionEngine->boot();

$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none());

Expand Down Expand Up @@ -158,11 +158,11 @@ public function error_onBeforeCatchUp_abortsCatchup()
/** @test */
public function error_onAfterCatchUp_abortsCatchupAndRollBack()
{
$this->subscriptionService->setupEventStore();
$this->eventStore->setup();
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::once())->method('apply');
$this->subscriptionService->subscriptionEngine->setup();
$this->subscriptionService->subscriptionEngine->boot();
$this->subscriptionEngine->setup();
$this->subscriptionEngine->boot();

$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ final class CatchUpHookTest extends AbstractSubscriptionEngineTestCase
/** @test */
public function catchUpHooksAreExecutedAndCanAccessTheCorrectProjectionsState()
{
$this->subscriptionService->setupEventStore();
$this->eventStore->setup();
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::once())->method('apply');
$this->subscriptionService->subscriptionEngine->setup();
$this->subscriptionService->subscriptionEngine->boot();
$this->subscriptionEngine->setup();
$this->subscriptionEngine->boot();

// commit an event
$this->commitExampleContentStreamEvent();
Expand Down
Loading
Loading