diff --git a/.composer.json b/.composer.json index 1bf4b28f90..e7f37b9966 100644 --- a/.composer.json +++ b/.composer.json @@ -25,8 +25,9 @@ "../../bin/phpunit --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/UnitTests.xml Neos.ContentRepository.Core/Tests/Unit", "../../bin/phpunit --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/UnitTests.xml Neos.ContentRepositoryRegistry/Tests/Unit" ], + "test:paratest-cli": "../../bin/paratest --debug -v --functional --processes 2 --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/FunctionalTests.xml", "test:parallel": [ - "FLOW_CONTEXT=Testing/Behat ../../bin/paratest --debug -v --functional --group parallel --processes 2 --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/FunctionalTests.xml Neos.ContentRepository.BehavioralTests/Tests/Functional/Feature/WorkspacePublication/WorkspaceWritingDuringPublication.php" + "for f in Neos.ContentRepository.BehavioralTests/Tests/Parallel/**/*Test.php; do composer test:paratest-cli $f; done" ], "test:behat-cli": "../../bin/behat -f progress --strict --no-interaction", "test:behavioral": [ diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php index 67afbdc91a..569609ee5c 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php @@ -52,14 +52,15 @@ final protected function awaitFile(string $filename): void } } - final protected function awaitSharedLock($resource, int $maximumCycles = 2000): void + final protected function awaitFileRemoval(string $filename): void { $waiting = 0; - while (!flock($resource, LOCK_SH)) { - usleep(10000); + while (!is_file($filename)) { + usleep(1000); $waiting++; - if ($waiting > $maximumCycles) { - throw new \Exception('timeout while waiting on shared lock'); + clearstatcache(true, $filename); + if ($waiting > 60000) { + throw new \Exception('timeout while waiting on file ' . $filename); } } } @@ -82,6 +83,11 @@ final protected function setUpContentRepository( final protected function log(string $message): void { - file_put_contents(self::LOGGING_PATH, substr($this::class, strrpos($this::class, '\\') + 1) . ': ' . getmypid() . ': ' . $message . PHP_EOL, FILE_APPEND); + file_put_contents(self::LOGGING_PATH, self::shortClassName($this::class) . ': ' . getmypid() . ': ' . $message . PHP_EOL, FILE_APPEND); + } + + final protected static function shortClassName(string $className): string + { + return substr($className, strrpos($className, '\\') + 1); } } diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php new file mode 100644 index 0000000000..d96a9adddf --- /dev/null +++ b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspacePublicationDuringWriting/WorkspacePublicationDuringWritingTest.php @@ -0,0 +1,261 @@ +log('------ process started ------'); + // todo refrain from Gherkin naming here and make fakes easier to use: https://github.com/neos/neos-development-collection/pull/5346 + GherkinTableNodeBasedContentDimensionSourceFactory::$contentDimensionsToUse = new class implements ContentDimensionSourceInterface + { + public function getDimension(ContentDimensionId $dimensionId): ?ContentDimension + { + return null; + } + public function getContentDimensionsOrderedByPriority(): array + { + return []; + } + }; + // todo refrain from Gherkin naming here and make fakes easier to use: https://github.com/neos/neos-development-collection/pull/5346 + GherkinPyStringNodeBasedNodeTypeManagerFactory::$nodeTypesToUse = new NodeTypeManager( + fn (): array => [ + 'Neos.ContentRepository:Root' => [], + 'Neos.ContentRepository.Testing:Document' => [ + 'properties' => [ + 'title' => [ + 'type' => 'string' + ] + ] + ] + ] + ); + + $setupLockResource = fopen(self::SETUP_LOCK_PATH, 'w+'); + + $exclusiveNonBlockingLockResult = flock($setupLockResource, LOCK_EX | LOCK_NB); + if ($exclusiveNonBlockingLockResult === false) { + $this->log('waiting for setup'); + if (!flock($setupLockResource, LOCK_SH)) { + throw new \RuntimeException('failed to acquire blocking shared lock'); + } + $this->contentRepository = $this->contentRepositoryRegistry + ->get(ContentRepositoryId::fromString('test_parallel')); + $this->log('wait for setup finished'); + return; + } + + $this->log('setup started'); + $contentRepository = $this->setUpContentRepository(ContentRepositoryId::fromString('test_parallel')); + + $origin = OriginDimensionSpacePoint::createWithoutDimensions(); + $contentRepository->handle(CreateRootWorkspace::create( + WorkspaceName::forLive(), + ContentStreamId::fromString('live-cs-id') + )); + $contentRepository->handle(CreateRootNodeAggregateWithNode::create( + WorkspaceName::forLive(), + NodeAggregateId::fromString('lady-eleonode-rootford'), + NodeTypeName::fromString(NodeTypeName::ROOT_NODE_TYPE_NAME) + )); + $contentRepository->handle(CreateNodeAggregateWithNode::create( + WorkspaceName::forLive(), + NodeAggregateId::fromString('nody-mc-nodeface'), + NodeTypeName::fromString('Neos.ContentRepository.Testing:Document'), + $origin, + NodeAggregateId::fromString('lady-eleonode-rootford'), + initialPropertyValues: PropertyValuesToWrite::fromArray([ + 'title' => 'title-original' + ]) + )); + $contentRepository->handle(CreateWorkspace::create( + WorkspaceName::fromString('user-test'), + WorkspaceName::forLive(), + ContentStreamId::fromString('user-cs-id') + )); + for ($i = 0; $i <= 5000; $i++) { + $contentRepository->handle(CreateNodeAggregateWithNode::create( + WorkspaceName::fromString('user-test'), + NodeAggregateId::fromString('nody-mc-nodeface-' . $i), + NodeTypeName::fromString('Neos.ContentRepository.Testing:Document'), + $origin, + NodeAggregateId::fromString('lady-eleonode-rootford'), + initialPropertyValues: PropertyValuesToWrite::fromArray([ + 'title' => 'title' + ]) + )); + } + $this->contentRepository = $contentRepository; + + if (!flock($setupLockResource, LOCK_UN)) { + throw new \RuntimeException('failed to release setup lock'); + } + + $this->log('setup finished'); + } + + /** + * @test + * @group parallel + */ + public function whileANodesArWrittenOnLive(): void + { + $this->log('writing started'); + + touch(self::WRITING_IS_RUNNING_FLAG_PATH); + + try { + for ($i = 0; $i <= 50; $i++) { + $this->contentRepository->handle( + SetNodeProperties::create( + WorkspaceName::forLive(), + NodeAggregateId::fromString('nody-mc-nodeface'), + OriginDimensionSpacePoint::createWithoutDimensions(), + PropertyValuesToWrite::fromArray([ + 'title' => 'changed-title-' . $i + ]) + ) + ); + } + } finally { + unlink(self::WRITING_IS_RUNNING_FLAG_PATH); + } + + $this->log('writing finished'); + Assert::assertTrue(true, 'No exception was thrown ;)'); + } + + /** + * @test + * @group parallel + */ + public function thenConcurrentPublishLeadsToException(): void + { + if (!is_file(self::WRITING_IS_RUNNING_FLAG_PATH)) { + $this->log('waiting to publish'); + + $this->awaitFile(self::WRITING_IS_RUNNING_FLAG_PATH); + // If write is the process that does the (slowish) setup, and then waits for the rebase to start, + // We give the CR some time to close the content stream + // TODO find another way than to randomly wait!!! + // The problem is, if we dont sleep it happens often that the modification works only then the rebase is startet _really_ + // Doing the modification several times in hope that the second one fails will likely just stop the rebase thread as it cannot close + usleep(10000); + } + + $this->log('publish started'); + + + /* + // NOTE, can also be tested with PartialPublish, or PartialPublish leading to a full publish, but this test only allows one at time :) + + $nodesForAFullPublish = 5000; + $nodesForAPartialPublish = $nodesForAFullPublish - 1; + + $nodeIdToPublish = []; + for ($i = 0; $i <= $nodesForAPartialPublish; $i++) { + $nodeIdToPublish[] = new NodeIdToPublishOrDiscard( + NodeAggregateId::fromString('nody-mc-nodeface-' . $i), // see nodes created above + DimensionSpacePoint::createWithoutDimensions() + ); + } + + $this->contentRepository->handle(PublishIndividualNodesFromWorkspace::create( + WorkspaceName::fromString('user-test'), + NodeIdsToPublishOrDiscard::create(...$nodeIdToPublish) + )); + */ + + $actualException = null; + try { + $this->contentRepository->handle(PublishWorkspace::create( + WorkspaceName::fromString('user-test') + )); + } catch (\Exception $thrownException) { + $actualException = $thrownException; + $this->log(sprintf('Got exception %s: %s', self::shortClassName($actualException::class), $actualException->getMessage())); + } + + $this->log('publish finished'); + + if ($actualException === null) { + Assert::fail(sprintf('No exception was thrown')); + } + + Assert::assertInstanceOf(ConcurrencyException::class, $actualException); + + $this->awaitFileRemoval(self::WRITING_IS_RUNNING_FLAG_PATH); + + // writing to user works!!! + try { + $this->contentRepository->handle( + SetNodeProperties::create( + WorkspaceName::fromString('user-test'), + NodeAggregateId::fromString('nody-mc-nodeface'), + OriginDimensionSpacePoint::createWithoutDimensions(), + PropertyValuesToWrite::fromArray([ + 'title' => 'written-after-failed-publish' + ]) + ) + ); + } catch (ContentStreamIsClosed $exception) { + Assert::fail(sprintf('Workspace that failed to be publish cannot be written: %s', $exception->getMessage())); + } + + $node = $this->contentRepository->getContentGraph(WorkspaceName::fromString('user-test')) + ->getSubgraph(DimensionSpacePoint::createWithoutDimensions(), VisibilityConstraints::withoutRestrictions()) + ->findNodeById(NodeAggregateId::fromString('nody-mc-nodeface')); + + Assert::assertSame('written-after-failed-publish', $node?->getProperty('title')); + } +} diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebase.php b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php similarity index 92% rename from Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebase.php rename to Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php index dedd5d918d..802205f2e1 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebase.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebaseTest.php @@ -43,7 +43,7 @@ use Neos\Flow\ObjectManagement\ObjectManagerInterface; use PHPUnit\Framework\Assert; -class WorkspaceWritingDuringRebase extends AbstractParallelTestCase +class WorkspaceWritingDuringRebaseTest extends AbstractParallelTestCase { private const SETUP_LOCK_PATH = __DIR__ . '/setup-lock'; @@ -88,7 +88,9 @@ public function getContentDimensionsOrderedByPriority(): array $exclusiveNonBlockingLockResult = flock($setupLockResource, LOCK_EX | LOCK_NB); if ($exclusiveNonBlockingLockResult === false) { $this->log('waiting for setup'); - $this->awaitSharedLock($setupLockResource); + if (!flock($setupLockResource, LOCK_SH)) { + throw new \RuntimeException('failed to acquire blocking shared lock'); + } $this->contentRepository = $this->contentRepositoryRegistry ->get(ContentRepositoryId::fromString('test_parallel')); $this->log('wait for setup finished'); @@ -158,7 +160,7 @@ public function whileAWorkspaceIsBeingRebased(): void try { $this->contentRepository->handle( RebaseWorkspace::create($workspaceName) - ->withRebasedContentStreamId(ContentStreamId::create()) + ->withRebasedContentStreamId(ContentStreamId::fromString('user-cs-rebased')) ->withErrorHandlingStrategy(RebaseErrorHandlingStrategy::STRATEGY_FORCE)); } finally { unlink(self::REBASE_IS_RUNNING_FLAG_PATH); @@ -188,6 +190,11 @@ public function thenConcurrentCommandsLeadToAnException(): void $this->log('write started'); + $workspaceDuringRebase = $this->contentRepository->getContentGraph(WorkspaceName::fromString('user-test')); + Assert::assertSame('user-cs-id', $workspaceDuringRebase->getContentStreamId()->value, + 'The parallel tests expects the workspace to still point to the original cs.' + ); + $origin = OriginDimensionSpacePoint::createWithoutDimensions(); $actualException = null; try { @@ -201,6 +208,7 @@ public function thenConcurrentCommandsLeadToAnException(): void )); } catch (\Exception $thrownException) { $actualException = $thrownException; + $this->log(sprintf('Got exception %s: %s', self::shortClassName($actualException::class), $actualException->getMessage())); } $this->log('write finished'); @@ -215,7 +223,7 @@ public function thenConcurrentCommandsLeadToAnException(): void Assert::assertThat($actualException, self::logicalOr( self::isInstanceOf(ContentStreamIsClosed::class), - self::isInstanceOf(ConcurrencyException::class), + self::isInstanceOf(ConcurrencyException::class), // todo is only thrown theoretical? but not during tests here ... )); Assert::assertSame('title-original', $node?->getProperty('title')); diff --git a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandBus.php b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandBus.php index 5621e17fd2..5c9a2e6fa7 100644 --- a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandBus.php +++ b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandBus.php @@ -30,6 +30,9 @@ public function __construct( } /** + * The handler only calculate which events they want to have published, + * but do not do the publishing themselves + * * @return EventsToPublish|\Generator */ public function handle(CommandInterface|RebasableToOtherWorkspaceInterface $command): EventsToPublish|\Generator diff --git a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlingDependencies.php b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlingDependencies.php index 9e726cacea..629f5c01c1 100644 --- a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlingDependencies.php +++ b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandHandlingDependencies.php @@ -16,6 +16,7 @@ use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface; use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface; +use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamDoesNotExistYet; use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceDoesNotExist; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; use Neos\ContentRepository\Core\SharedModel\Workspace\Workspace; @@ -48,11 +49,17 @@ public function contentStreamExists(ContentStreamId $contentStreamId): bool return $this->contentGraphReadModel->findContentStreamById($contentStreamId) !== null; } + /** + * @throws ContentStreamDoesNotExistYet if there is no matching content stream + */ public function isContentStreamClosed(ContentStreamId $contentStreamId): bool { $contentStream = $this->contentGraphReadModel->findContentStreamById($contentStreamId); if ($contentStream === null) { - throw new \InvalidArgumentException(sprintf('Failed to find content stream with id "%s"', $contentStreamId->value), 1729863973); + throw new ContentStreamDoesNotExistYet( + 'Content stream "' . $contentStreamId->value . '" does not exist.', + 1521386692 + ); } return $contentStream->isClosed; } diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index c885d16851..efc240075e 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -47,6 +47,7 @@ use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; use Neos\ContentRepository\Core\SharedModel\Workspace\Workspaces; use Neos\EventStore\EventStoreInterface; +use Neos\EventStore\Exception\ConcurrencyException; use Neos\EventStore\Model\EventEnvelope; use Neos\EventStore\Model\EventStream\VirtualStreamName; use Psr\Clock\ClockInterface; @@ -97,19 +98,43 @@ public function __construct( public function handle(CommandInterface $command): void { $command = $this->commandHook->onBeforeHandle($command); - // the commands only calculate which events they want to have published, but do not do the - // publishing themselves - $eventsToPublishOrGenerator = $this->commandBus->handle($command); - - if ($eventsToPublishOrGenerator instanceof EventsToPublish) { - $eventsToPublish = $this->enrichEventsToPublishWithMetadata($eventsToPublishOrGenerator); - $this->eventPersister->publishEvents($this, $eventsToPublish); - } else { - foreach ($eventsToPublishOrGenerator as $eventsToPublish) { - assert($eventsToPublish instanceof EventsToPublish); // just for the ide - $eventsToPublish = $this->enrichEventsToPublishWithMetadata($eventsToPublish); - $this->eventPersister->publishEvents($this, $eventsToPublish); + + $toPublish = $this->commandBus->handle($command); + + // simple case + if ($toPublish instanceof EventsToPublish) { + $eventsToPublish = $this->enrichEventsToPublishWithMetadata($toPublish); + $this->eventPersister->publishWithoutCatchup($eventsToPublish); + $this->catchupProjections(); + return; + } + + // control-flow aware command handling via generator + try { + foreach ($toPublish as $yieldedEventsToPublish) { + $eventsToPublish = $this->enrichEventsToPublishWithMetadata($yieldedEventsToPublish); + try { + $this->eventPersister->publishWithoutCatchup($eventsToPublish); + } catch (ConcurrencyException $concurrencyException) { + // we pass the exception into the generator (->throw), so it could be try-caught and reacted upon: + // + // try { + // yield EventsToPublish(...); + // } catch (ConcurrencyException $e) { + // yield $this->reopenContentStream(); + // throw $e; + // } + $yieldedErrorStrategy = $toPublish->throw($concurrencyException); + if ($yieldedErrorStrategy instanceof EventsToPublish) { + $this->eventPersister->publishWithoutCatchup($yieldedErrorStrategy); + } + throw $concurrencyException; + } } + } finally { + // We always NEED to catchup even if there was an unexpected ConcurrencyException to make sure previous commits are handled. + // Technically it would be acceptable for the catchup to fail here (due to hook errors) because all the events are already persisted. + $this->catchupProjections(); } } diff --git a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php index f6c38f803d..1af59ff3ce 100644 --- a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php +++ b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php @@ -8,6 +8,7 @@ use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Exception\ConcurrencyException; use Neos\EventStore\Model\Events; +use Neos\EventStore\Model\EventStore\CommitResult; /** * Internal service to persist {@see EventInterface} with the proper normalization, and triggering the @@ -24,19 +25,28 @@ public function __construct( } /** + * TODO Will be refactored via https://github.com/neos/neos-development-collection/pull/5321 * @throws ConcurrencyException in case the expectedVersion does not match */ public function publishEvents(ContentRepository $contentRepository, EventsToPublish $eventsToPublish): void + { + $this->publishWithoutCatchup($eventsToPublish); + $contentRepository->catchUpProjections(); + } + + /** + * TODO Will be refactored via https://github.com/neos/neos-development-collection/pull/5321 + * @throws ConcurrencyException in case the expectedVersion does not match + */ + public function publishWithoutCatchup(EventsToPublish $eventsToPublish): CommitResult { $normalizedEvents = Events::fromArray( $eventsToPublish->events->map($this->eventNormalizer->normalize(...)) ); - $this->eventStore->commit( + return $this->eventStore->commit( $eventsToPublish->streamName, $normalizedEvents, $eventsToPublish->expectedVersion ); - - $contentRepository->catchUpProjections(); } } diff --git a/Neos.ContentRepository.Core/Classes/Feature/Common/ConstraintChecks.php b/Neos.ContentRepository.Core/Classes/Feature/Common/ConstraintChecks.php index 2be9688f06..6e4ec8a739 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/Common/ConstraintChecks.php +++ b/Neos.ContentRepository.Core/Classes/Feature/Common/ConstraintChecks.php @@ -81,14 +81,9 @@ protected function requireContentStream( CommandHandlingDependencies $commandHandlingDependencies ): ContentStreamId { $contentStreamId = $commandHandlingDependencies->getContentGraph($workspaceName)->getContentStreamId(); - if (!$commandHandlingDependencies->contentStreamExists($contentStreamId)) { - throw new ContentStreamDoesNotExistYet( - 'Content stream for "' . $workspaceName->value . '" does not exist yet.', - 1521386692 - ); - } + $isContentStreamClosed = $commandHandlingDependencies->isContentStreamClosed($contentStreamId); - if ($commandHandlingDependencies->isContentStreamClosed($contentStreamId)) { + if ($isContentStreamClosed) { throw new ContentStreamIsClosed( 'Content stream "' . $contentStreamId->value . '" is closed.', 1710260081 diff --git a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php index c3027d7114..a228ca7c86 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php +++ b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php @@ -9,55 +9,25 @@ use Neos\ContentRepository\Core\EventStore\EventsToPublish; use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasClosed; use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasReopened; -use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated; use Neos\ContentRepository\Core\Feature\ContentStreamForking\Event\ContentStreamWasForked; use Neos\ContentRepository\Core\Feature\ContentStreamRemoval\Event\ContentStreamWasRemoved; use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamAlreadyExists; use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamDoesNotExistYet; use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamIsClosed; -use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamIsNotClosed; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; +use Neos\EventStore\Model\Event\Version; use Neos\EventStore\Model\EventStream\ExpectedVersion; trait ContentStreamHandling { - /** - * @param ContentStreamId $contentStreamId The id of the content stream to create - * @throws ContentStreamAlreadyExists - * @phpstan-pure this method is pure, to persist the events they must be handled outside - */ - private function createContentStream( - ContentStreamId $contentStreamId, - CommandHandlingDependencies $commandHandlingDependencies, - ): EventsToPublish { - $this->requireContentStreamToNotExistYet($contentStreamId, $commandHandlingDependencies); - $streamName = ContentStreamEventStreamName::fromContentStreamId($contentStreamId) - ->getEventStreamName(); - - return new EventsToPublish( - $streamName, - Events::with( - new ContentStreamWasCreated( - $contentStreamId, - ) - ), - ExpectedVersion::NO_STREAM() - ); - } - /** * @param ContentStreamId $contentStreamId The id of the content stream to close - * @param CommandHandlingDependencies $commandHandlingDependencies - * @return EventsToPublish * @phpstan-pure this method is pure, to persist the events they must be handled outside */ private function closeContentStream( ContentStreamId $contentStreamId, - CommandHandlingDependencies $commandHandlingDependencies, + Version $contentStreamVersion, ): EventsToPublish { - $this->requireContentStreamToExist($contentStreamId, $commandHandlingDependencies); - $expectedVersion = $this->getExpectedVersionOfContentStream($contentStreamId, $commandHandlingDependencies); - $this->requireContentStreamToNotBeClosed($contentStreamId, $commandHandlingDependencies); $streamName = ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(); return new EventsToPublish( @@ -67,7 +37,7 @@ private function closeContentStream( $contentStreamId, ), ), - $expectedVersion + ExpectedVersion::fromVersion($contentStreamVersion) ); } @@ -75,21 +45,18 @@ private function closeContentStream( * @param ContentStreamId $contentStreamId The id of the content stream to reopen * @phpstan-pure this method is pure, to persist the events they must be handled outside */ - private function reopenContentStream( + private function reopenContentStreamWithoutConstraintChecks( ContentStreamId $contentStreamId, - CommandHandlingDependencies $commandHandlingDependencies, ): EventsToPublish { - $this->requireContentStreamToExist($contentStreamId, $commandHandlingDependencies); - $this->requireContentStreamToBeClosed($contentStreamId, $commandHandlingDependencies); - $streamName = ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(); - return new EventsToPublish( - $streamName, + ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(), Events::with( new ContentStreamWasReopened( $contentStreamId ), ), + // We operate here without constraints on purpose to ensure this can be commited. + //Constraints have been checked beforehand and its expected that the content stream is closed. ExpectedVersion::ANY() ); } @@ -104,19 +71,10 @@ private function reopenContentStream( private function forkContentStream( ContentStreamId $newContentStreamId, ContentStreamId $sourceContentStreamId, - CommandHandlingDependencies $commandHandlingDependencies + Version $sourceContentStreamVersion ): EventsToPublish { - $this->requireContentStreamToExist($sourceContentStreamId, $commandHandlingDependencies); - $this->requireContentStreamToNotBeClosed($sourceContentStreamId, $commandHandlingDependencies); - $this->requireContentStreamToNotExistYet($newContentStreamId, $commandHandlingDependencies); - - $sourceContentStreamVersion = $commandHandlingDependencies->getContentStreamVersion($sourceContentStreamId); - - $streamName = ContentStreamEventStreamName::fromContentStreamId($newContentStreamId) - ->getEventStreamName(); - return new EventsToPublish( - $streamName, + ContentStreamEventStreamName::fromContentStreamId($newContentStreamId)->getEventStreamName(), Events::with( new ContentStreamWasForked( $newContentStreamId, @@ -133,25 +91,19 @@ private function forkContentStream( * @param ContentStreamId $contentStreamId The id of the content stream to remove * @phpstan-pure this method is pure, to persist the events they must be handled outside */ - private function removeContentStream( + private function removeContentStreamWithoutConstraintChecks( ContentStreamId $contentStreamId, - CommandHandlingDependencies $commandHandlingDependencies ): EventsToPublish { - $this->requireContentStreamToExist($contentStreamId, $commandHandlingDependencies); - $expectedVersion = $this->getExpectedVersionOfContentStream($contentStreamId, $commandHandlingDependencies); - - $streamName = ContentStreamEventStreamName::fromContentStreamId( - $contentStreamId - )->getEventStreamName(); - return new EventsToPublish( - $streamName, + ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(), Events::with( new ContentStreamWasRemoved( $contentStreamId, ), ), - $expectedVersion + // We operate here without constraints on purpose to ensure this can be commited. + // Constraints have been checked beforehand and its expected that the content stream is closed. + ExpectedVersion::ANY() ); } @@ -172,23 +124,6 @@ private function requireContentStreamToNotExistYet( } } - /** - * @param ContentStreamId $contentStreamId - * @param CommandHandlingDependencies $commandHandlingDependencies - * @throws ContentStreamDoesNotExistYet - */ - private function requireContentStreamToExist( - ContentStreamId $contentStreamId, - CommandHandlingDependencies $commandHandlingDependencies - ): void { - if (!$commandHandlingDependencies->contentStreamExists($contentStreamId)) { - throw new ContentStreamDoesNotExistYet( - 'Content stream "' . $contentStreamId->value . '" does not exist yet.', - 1521386692 - ); - } - } - private function requireContentStreamToNotBeClosed( ContentStreamId $contentStreamId, CommandHandlingDependencies $commandHandlingDependencies @@ -200,24 +135,4 @@ private function requireContentStreamToNotBeClosed( ); } } - - private function requireContentStreamToBeClosed( - ContentStreamId $contentStreamId, - CommandHandlingDependencies $commandHandlingDependencies - ): void { - if (!$commandHandlingDependencies->isContentStreamClosed($contentStreamId)) { - throw new ContentStreamIsNotClosed( - 'Content stream "' . $contentStreamId->value . '" is not closed.', - 1710405911 - ); - } - } - - private function getExpectedVersionOfContentStream( - ContentStreamId $contentStreamId, - CommandHandlingDependencies $commandHandlingDependencies - ): ExpectedVersion { - $version = $commandHandlingDependencies->getContentStreamVersion($contentStreamId); - return ExpectedVersion::fromVersion($version); - } } diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index b1be71e7ce..a29b777754 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -27,6 +27,8 @@ use Neos\ContentRepository\Core\Feature\Common\RebasableToOtherWorkspaceInterface; use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasClosed; use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasReopened; +use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated; +use Neos\ContentRepository\Core\Feature\ContentStreamRemoval\Event\ContentStreamWasRemoved; use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Command\CreateRootWorkspace; use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Command\CreateWorkspace; use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Event\RootWorkspaceWasCreated; @@ -54,6 +56,7 @@ use Neos\ContentRepository\Core\Feature\WorkspaceRebase\Exception\WorkspaceRebaseFailed; use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamAlreadyExists; use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamDoesNotExistYet; +use Neos\ContentRepository\Core\SharedModel\Exception\ContentStreamIsClosed; use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceDoesNotExist; use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceHasNoBaseWorkspaceName; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; @@ -61,6 +64,7 @@ use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceStatus; use Neos\EventStore\EventStoreInterface; +use Neos\EventStore\Exception\ConcurrencyException; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\Event\Version; use Neos\EventStore\Model\EventStream\EventStreamInterface; @@ -85,6 +89,9 @@ public function canHandle(CommandInterface|RebasableToOtherWorkspaceInterface $c return method_exists($this, 'handle' . (new \ReflectionClass($command))->getShortName()); } + /** + * @return \Generator + */ public function handle(CommandInterface|RebasableToOtherWorkspaceInterface $command, CommandHandlingDependencies $commandHandlingDependencies): \Generator { /** @phpstan-ignore-next-line */ @@ -113,7 +120,6 @@ private function handleCreateWorkspace( ): \Generator { $this->requireWorkspaceToNotExist($command->workspaceName, $commandHandlingDependencies); $baseWorkspace = $commandHandlingDependencies->findWorkspaceByName($command->baseWorkspaceName); - if ($baseWorkspace === null) { throw new BaseWorkspaceDoesNotExist(sprintf( 'The workspace %s (base workspace of %s) does not exist', @@ -121,12 +127,15 @@ private function handleCreateWorkspace( $command->workspaceName->value ), 1513890708); } + $sourceContentStreamVersion = $commandHandlingDependencies->getContentStreamVersion($baseWorkspace->currentContentStreamId); + $this->requireContentStreamToNotBeClosed($baseWorkspace->currentContentStreamId, $commandHandlingDependencies); + $this->requireContentStreamToNotExistYet($command->newContentStreamId, $commandHandlingDependencies); // When the workspace is created, we first have to fork the content stream yield $this->forkContentStream( $command->newContentStreamId, $baseWorkspace->currentContentStreamId, - $commandHandlingDependencies + $sourceContentStreamVersion ); yield new EventsToPublish( @@ -152,11 +161,16 @@ private function handleCreateRootWorkspace( CommandHandlingDependencies $commandHandlingDependencies, ): \Generator { $this->requireWorkspaceToNotExist($command->workspaceName, $commandHandlingDependencies); + $this->requireContentStreamToNotExistYet($command->newContentStreamId, $commandHandlingDependencies); - $newContentStreamId = $command->newContentStreamId; - yield $this->createContentStream( - $newContentStreamId, - $commandHandlingDependencies + yield new EventsToPublish( + ContentStreamEventStreamName::fromContentStreamId($command->newContentStreamId)->getEventStreamName(), + Events::with( + new ContentStreamWasCreated( + $command->newContentStreamId, + ) + ), + ExpectedVersion::NO_STREAM() ); yield new EventsToPublish( @@ -164,7 +178,7 @@ private function handleCreateRootWorkspace( Events::with( new RootWorkspaceWasCreated( $command->workspaceName, - $newContentStreamId + $command->newContentStreamId ) ), ExpectedVersion::ANY() @@ -181,17 +195,8 @@ private function handlePublishWorkspace( // no-op return; } - - if (!$commandHandlingDependencies->contentStreamExists($workspace->currentContentStreamId)) { - throw new \RuntimeException('Cannot publish nodes on a workspace with a stateless content stream', 1729711258); - } - $this->requireContentStreamToNotBeClosed($baseWorkspace->currentContentStreamId, $commandHandlingDependencies); - $baseContentStreamVersion = $commandHandlingDependencies->getContentStreamVersion($baseWorkspace->currentContentStreamId); - - yield $this->closeContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies - ); + $workspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($baseWorkspace, $commandHandlingDependencies); $rebaseableCommands = RebaseableCommands::extractFromEventStream( $this->eventStore->load( @@ -200,31 +205,30 @@ private function handlePublishWorkspace( ) ); - try { - yield from $this->publishWorkspace( - $workspace, - $baseWorkspace, - $command->newContentStreamId, - $baseContentStreamVersion, - $rebaseableCommands, - $commandHandlingDependencies - ); - } catch (WorkspaceRebaseFailed $workspaceRebaseFailed) { - yield $this->reopenContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies - ); - throw $workspaceRebaseFailed; - } + yield $this->closeContentStream( + $workspace->currentContentStreamId, + $workspaceContentStreamVersion + ); + + yield from $this->publishWorkspace( + $workspace, + $baseWorkspace, + $baseWorkspaceContentStreamVersion, + $command->newContentStreamId, + $rebaseableCommands + ); } + /** + * Note that the workspaces content stream must be closed beforehand. + * It will be reopened here in case of error. + */ private function publishWorkspace( Workspace $workspace, Workspace $baseWorkspace, + Version $baseWorkspaceContentStreamVersion, ContentStreamId $newContentStreamId, - Version $baseContentStreamVersion, - RebaseableCommands $rebaseableCommands, - CommandHandlingDependencies $commandHandlingDependencies, + RebaseableCommands $rebaseableCommands ): \Generator { $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); @@ -237,24 +241,36 @@ static function ($handle) use ($rebaseableCommands): void { ); if ($commandSimulator->hasConflicts()) { + yield $this->reopenContentStreamWithoutConstraintChecks( + $workspace->currentContentStreamId + ); throw WorkspaceRebaseFailed::duringPublish($commandSimulator->getConflictingEvents()); } - yield new EventsToPublish( - ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId) - ->getEventStreamName(), - $this->getCopiedEventsOfEventStream( - $baseWorkspace->workspaceName, - $baseWorkspace->currentContentStreamId, - $commandSimulator->eventStream(), - ), - ExpectedVersion::fromVersion($baseContentStreamVersion) + $eventsOfWorkspaceToPublish = $this->getCopiedEventsOfEventStream( + $baseWorkspace->workspaceName, + $baseWorkspace->currentContentStreamId, + $commandSimulator->eventStream(), ); + try { + yield new EventsToPublish( + ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId) + ->getEventStreamName(), + $eventsOfWorkspaceToPublish, + ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion) + ); + } catch (ConcurrencyException $concurrencyException) { + yield $this->reopenContentStreamWithoutConstraintChecks( + $workspace->currentContentStreamId + ); + throw $concurrencyException; + } + yield $this->forkContentStream( $newContentStreamId, $baseWorkspace->currentContentStreamId, - $commandHandlingDependencies + Version::fromInteger($baseWorkspaceContentStreamVersion->value + $eventsOfWorkspaceToPublish->count()) ); yield new EventsToPublish( @@ -270,19 +286,19 @@ static function ($handle) use ($rebaseableCommands): void { ExpectedVersion::ANY() ); - yield $this->removeContentStream($workspace->currentContentStreamId, $commandHandlingDependencies); + yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); } private function rebaseWorkspaceWithoutChanges( Workspace $workspace, Workspace $baseWorkspace, - ContentStreamId $newContentStreamId, - CommandHandlingDependencies $commandHandlingDependencies, + Version $baseWorkspaceContentStreamVersion, + ContentStreamId $newContentStreamId ): \Generator { yield $this->forkContentStream( $newContentStreamId, $baseWorkspace->currentContentStreamId, - $commandHandlingDependencies + $baseWorkspaceContentStreamVersion ); yield new EventsToPublish( @@ -297,11 +313,11 @@ private function rebaseWorkspaceWithoutChanges( ExpectedVersion::ANY() ); - yield $this->removeContentStream($workspace->currentContentStreamId, $commandHandlingDependencies); + yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); } /** - * Copy all events from the passed event stream which implement the {@see PublishableToOtherContentStreamsInterface} + * Copy all events from the passed event stream which implement the {@see PublishableToWorkspaceInterface} */ private function getCopiedEventsOfEventStream( WorkspaceName $targetWorkspaceName, @@ -334,9 +350,9 @@ private function handleRebaseWorkspace( ): \Generator { $workspace = $this->requireWorkspace($command->workspaceName, $commandHandlingDependencies); $baseWorkspace = $this->requireBaseWorkspace($workspace, $commandHandlingDependencies); - if (!$commandHandlingDependencies->contentStreamExists($workspace->currentContentStreamId)) { - throw new \RuntimeException('Cannot rebase a workspace with a stateless content stream', 1711718314); - } + + $workspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($baseWorkspace, $commandHandlingDependencies); if ( $workspace->status === WorkspaceStatus::UP_TO_DATE @@ -346,18 +362,18 @@ private function handleRebaseWorkspace( return; } - yield $this->closeContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies - ); - if (!$workspace->hasPublishableChanges()) { // if we have no changes in the workspace we can fork from the base directly + yield $this->closeContentStream( + $workspace->currentContentStreamId, + $workspaceContentStreamVersion + ); + yield from $this->rebaseWorkspaceWithoutChanges( $workspace, $baseWorkspace, - $command->rebasedContentStreamId, - $commandHandlingDependencies + $baseWorkspaceContentStreamVersion, + $command->rebasedContentStreamId ); return; } @@ -369,6 +385,11 @@ private function handleRebaseWorkspace( ) ); + yield $this->closeContentStream( + $workspace->currentContentStreamId, + $workspaceContentStreamVersion + ); + $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); $commandSimulator->run( @@ -383,9 +404,8 @@ static function ($handle) use ($rebaseableCommands): void { $command->rebaseErrorHandlingStrategy === RebaseErrorHandlingStrategy::STRATEGY_FAIL && $commandSimulator->hasConflicts() ) { - yield $this->reopenContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies + yield $this->reopenContentStreamWithoutConstraintChecks( + $workspace->currentContentStreamId ); // throw an exception that contains all the information about what exactly failed @@ -396,6 +416,7 @@ static function ($handle) use ($rebaseableCommands): void { yield from $this->forkNewContentStreamAndApplyEvents( $command->rebasedContentStreamId, $baseWorkspace->currentContentStreamId, + $baseWorkspaceContentStreamVersion, new EventsToPublish( WorkspaceEventStreamName::fromWorkspaceName($command->workspaceName)->getEventStreamName(), Events::with( @@ -411,21 +432,16 @@ static function ($handle) use ($rebaseableCommands): void { $command->workspaceName, $command->rebasedContentStreamId, $commandSimulator->eventStream(), - ), - $commandHandlingDependencies + ) ); - yield $this->removeContentStream($workspace->currentContentStreamId, $commandHandlingDependencies); + yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); } /** * This method is like a combined Rebase and Publish! * - * @throws BaseWorkspaceDoesNotExist - * @throws ContentStreamAlreadyExists - * @throws ContentStreamDoesNotExistYet - * @throws WorkspaceDoesNotExist - * @throws \Exception + * @return \Generator */ private function handlePublishIndividualNodesFromWorkspace( PublishIndividualNodesFromWorkspace $command, @@ -438,17 +454,8 @@ private function handlePublishIndividualNodesFromWorkspace( return; } - // todo check that fetching workspace throws if there is no content stream id for it - if (!$commandHandlingDependencies->contentStreamExists($workspace->currentContentStreamId)) { - throw new \RuntimeException('Cannot publish nodes on a workspace with a stateless content stream', 1710410114); - } - $this->requireContentStreamToNotBeClosed($baseWorkspace->currentContentStreamId, $commandHandlingDependencies); - $baseContentStreamVersion = $commandHandlingDependencies->getContentStreamVersion($baseWorkspace->currentContentStreamId); - - yield $this->closeContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies - ); + $workspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($baseWorkspace, $commandHandlingDependencies); $rebaseableCommands = RebaseableCommands::extractFromEventStream( $this->eventStore->load( @@ -461,32 +468,24 @@ private function handlePublishIndividualNodesFromWorkspace( if ($matchingCommands->isEmpty()) { // almost a noop (e.g. random node ids were specified) ;) - yield $this->reopenContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies - ); return; } + yield $this->closeContentStream( + $workspace->currentContentStreamId, + $workspaceContentStreamVersion + ); + if ($remainingCommands->isEmpty()) { - try { - // do a full publish, this is simpler for the projections to handle - yield from $this->publishWorkspace( - $workspace, - $baseWorkspace, - $command->contentStreamIdForRemainingPart, - $baseContentStreamVersion, - $matchingCommands, - $commandHandlingDependencies - ); - return; - } catch (WorkspaceRebaseFailed $workspaceRebaseFailed) { - yield $this->reopenContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies - ); - throw $workspaceRebaseFailed; - } + // do a full publish, this is simpler for the projections to handle + yield from $this->publishWorkspace( + $workspace, + $baseWorkspace, + $baseWorkspaceContentStreamVersion, + $command->contentStreamIdForRemainingPart, + $matchingCommands + ); + return; } $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); @@ -505,29 +504,38 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC ); if ($commandSimulator->hasConflicts()) { - yield $this->reopenContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies + yield $this->reopenContentStreamWithoutConstraintChecks( + $workspace->currentContentStreamId ); throw WorkspaceRebaseFailed::duringPublish($commandSimulator->getConflictingEvents()); } - // this could be a no-op for the rare case when a command returns empty events e.g. the node was already tagged with this subtree tag, meaning we actually just rebase - yield new EventsToPublish( - ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId) - ->getEventStreamName(), - $this->getCopiedEventsOfEventStream( - $baseWorkspace->workspaceName, - $baseWorkspace->currentContentStreamId, - $commandSimulator->eventStream()->withMaximumSequenceNumber($highestSequenceNumberForMatching), - ), - ExpectedVersion::fromVersion($baseContentStreamVersion) + // this could empty and a no-op for the rare case when a command returns empty events e.g. the node was already tagged with this subtree tag + $selectedEventsOfWorkspaceToPublish = $this->getCopiedEventsOfEventStream( + $baseWorkspace->workspaceName, + $baseWorkspace->currentContentStreamId, + $commandSimulator->eventStream()->withMaximumSequenceNumber($highestSequenceNumberForMatching), ); + try { + yield new EventsToPublish( + ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId) + ->getEventStreamName(), + $selectedEventsOfWorkspaceToPublish, + ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion) + ); + } catch (ConcurrencyException $concurrencyException) { + yield $this->reopenContentStreamWithoutConstraintChecks( + $workspace->currentContentStreamId + ); + throw $concurrencyException; + } + yield from $this->forkNewContentStreamAndApplyEvents( $command->contentStreamIdForRemainingPart, $baseWorkspace->currentContentStreamId, + Version::fromInteger($baseWorkspaceContentStreamVersion->value + $selectedEventsOfWorkspaceToPublish->count()), new EventsToPublish( WorkspaceEventStreamName::fromWorkspaceName($command->workspaceName)->getEventStreamName(), Events::fromArray([ @@ -545,11 +553,10 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC $command->workspaceName, $command->contentStreamIdForRemainingPart, $commandSimulator->eventStream()->withMinimumSequenceNumber($highestSequenceNumberForMatching->next()) - ), - $commandHandlingDependencies + ) ); - yield $this->removeContentStream($workspace->currentContentStreamId, $commandHandlingDependencies); + yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); } /** @@ -574,40 +581,37 @@ private function handleDiscardIndividualNodesFromWorkspace( return; } - if (!$commandHandlingDependencies->contentStreamExists($workspace->currentContentStreamId)) { - throw new \RuntimeException('Cannot discard nodes on a workspace with a stateless content stream', 1710408112); - } - - yield $this->closeContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies - ); + $workspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($baseWorkspace, $commandHandlingDependencies); - // filter commands, only keeping the ones NOT MATCHING the nodes from the command (i.e. the modifications we want to keep) $rebaseableCommands = RebaseableCommands::extractFromEventStream( $this->eventStore->load( ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId) ->getEventStreamName() ) ); + + // filter commands, only keeping the ones NOT MATCHING the nodes from the command (i.e. the modifications we want to keep) [$commandsToDiscard, $commandsToKeep] = $rebaseableCommands->separateMatchingAndRemainingCommands($command->nodesToDiscard); if ($commandsToDiscard->isEmpty()) { // if we have nothing to discard, we can just keep all. (e.g. random node ids were specified) It's almost a noop ;) - yield $this->reopenContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies - ); return; } + yield $this->closeContentStream( + $workspace->currentContentStreamId, + $workspaceContentStreamVersion + ); + if ($commandsToKeep->isEmpty()) { // quick path everything was discarded yield from $this->discardWorkspace( $workspace, + $workspaceContentStreamVersion, $baseWorkspace, - $command->newContentStreamId, - $commandHandlingDependencies + $baseWorkspaceContentStreamVersion, + $command->newContentStreamId ); return; } @@ -623,9 +627,8 @@ static function ($handle) use ($commandsToKeep): void { ); if ($commandSimulator->hasConflicts()) { - yield $this->reopenContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies + yield $this->reopenContentStreamWithoutConstraintChecks( + $workspace->currentContentStreamId ); throw WorkspaceRebaseFailed::duringDiscard($commandSimulator->getConflictingEvents()); } @@ -633,6 +636,7 @@ static function ($handle) use ($commandsToKeep): void { yield from $this->forkNewContentStreamAndApplyEvents( $command->newContentStreamId, $baseWorkspace->currentContentStreamId, + $baseWorkspaceContentStreamVersion, new EventsToPublish( WorkspaceEventStreamName::fromWorkspaceName($command->workspaceName)->getEventStreamName(), Events::with( @@ -649,11 +653,10 @@ static function ($handle) use ($commandsToKeep): void { $command->workspaceName, $command->newContentStreamId, $commandSimulator->eventStream(), - ), - $commandHandlingDependencies + ) ); - yield $this->removeContentStream($workspace->currentContentStreamId, $commandHandlingDependencies); + yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); } /** @@ -672,31 +675,32 @@ private function handleDiscardWorkspace( return; } + $workspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($workspace, $commandHandlingDependencies); + $baseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($baseWorkspace, $commandHandlingDependencies); + yield from $this->discardWorkspace( $workspace, + $workspaceContentStreamVersion, $baseWorkspace, - $command->newContentStreamId, - $commandHandlingDependencies + $baseWorkspaceContentStreamVersion, + $command->newContentStreamId ); } /** - * @param Workspace $workspace - * @param Workspace $baseWorkspace - * @param ContentStreamId $newContentStream - * @param CommandHandlingDependencies $commandHandlingDependencies * @phpstan-pure this method is pure, to persist the events they must be handled outside */ private function discardWorkspace( Workspace $workspace, + Version $workspaceContentStreamVersion, Workspace $baseWorkspace, - ContentStreamId $newContentStream, - CommandHandlingDependencies $commandHandlingDependencies + Version $baseWorkspaceContentStreamVersion, + ContentStreamId $newContentStream ): \Generator { yield $this->forkContentStream( $newContentStream, $baseWorkspace->currentContentStreamId, - $commandHandlingDependencies + $baseWorkspaceContentStreamVersion ); yield new EventsToPublish( @@ -711,7 +715,7 @@ private function discardWorkspace( ExpectedVersion::ANY() ); - yield $this->removeContentStream($workspace->currentContentStreamId, $commandHandlingDependencies); + yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); } /** @@ -729,6 +733,8 @@ private function handleChangeBaseWorkspace( $workspace = $this->requireWorkspace($command->workspaceName, $commandHandlingDependencies); $currentBaseWorkspace = $this->requireBaseWorkspace($workspace, $commandHandlingDependencies); + $this->requireContentStreamToNotBeClosed($workspace->currentContentStreamId, $commandHandlingDependencies); + if ($currentBaseWorkspace->workspaceName->equals($command->baseWorkspaceName)) { // no-op return; @@ -736,13 +742,14 @@ private function handleChangeBaseWorkspace( $this->requireEmptyWorkspace($workspace); $newBaseWorkspace = $this->requireWorkspace($command->baseWorkspaceName, $commandHandlingDependencies); - $this->requireNonCircularRelationBetweenWorkspaces($workspace, $newBaseWorkspace, $commandHandlingDependencies); + $newBaseWorkspaceContentStreamVersion = $this->requireOpenContentStreamAndVersion($newBaseWorkspace, $commandHandlingDependencies); + yield $this->forkContentStream( $command->newContentStreamId, $newBaseWorkspace->currentContentStreamId, - $commandHandlingDependencies + $newBaseWorkspaceContentStreamVersion ); yield new EventsToPublish( @@ -766,10 +773,16 @@ private function handleDeleteWorkspace( CommandHandlingDependencies $commandHandlingDependencies, ): \Generator { $workspace = $this->requireWorkspace($command->workspaceName, $commandHandlingDependencies); + $contentStreamVersion = $commandHandlingDependencies->getContentStreamVersion($workspace->currentContentStreamId); - yield $this->removeContentStream( - $workspace->currentContentStreamId, - $commandHandlingDependencies + yield new EventsToPublish( + ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId)->getEventStreamName(), + Events::with( + new ContentStreamWasRemoved( + $workspace->currentContentStreamId, + ), + ), + ExpectedVersion::fromVersion($contentStreamVersion) ); yield new EventsToPublish( @@ -786,14 +799,14 @@ private function handleDeleteWorkspace( private function forkNewContentStreamAndApplyEvents( ContentStreamId $newContentStreamId, ContentStreamId $sourceContentStreamId, + Version $sourceContentStreamVersion, EventsToPublish $pointWorkspaceToNewContentStream, Events $eventsToApplyOnNewContentStream, - CommandHandlingDependencies $commandHandlingDependencies, ): \Generator { yield $this->forkContentStream( $newContentStreamId, $sourceContentStreamId, - $commandHandlingDependencies + $sourceContentStreamVersion )->withAppendedEvents(Events::with( new ContentStreamWasClosed( $newContentStreamId @@ -828,6 +841,17 @@ private function requireWorkspaceToNotExist(WorkspaceName $workspaceName, Comman ), 1715341085); } + private function requireOpenContentStreamAndVersion(Workspace $workspace, CommandHandlingDependencies $commandHandlingDependencies): Version + { + if ($commandHandlingDependencies->isContentStreamClosed($workspace->currentContentStreamId)) { + throw new ContentStreamIsClosed( + 'Content stream "' . $workspace->currentContentStreamId . '" is closed.', + 1730730516 + ); + } + return $commandHandlingDependencies->getContentStreamVersion($workspace->currentContentStreamId); + } + /** * @throws WorkspaceDoesNotExist */ diff --git a/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/GenericCommandExecutionAndEventPublication.php b/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/GenericCommandExecutionAndEventPublication.php index 7e150e296e..67c91d5187 100644 --- a/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/GenericCommandExecutionAndEventPublication.php +++ b/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/GenericCommandExecutionAndEventPublication.php @@ -171,6 +171,9 @@ protected function addDefaultCommandArgumentValues(string $commandClassName, arr if (is_string($commandArguments['parentNodeAggregateId'] ?? null) && str_starts_with($commandArguments['parentNodeAggregateId'], '$')) { $commandArguments['parentNodeAggregateId'] = $this->rememberedNodeAggregateIds[substr($commandArguments['parentNodeAggregateId'], 1)]?->value; } + if (empty($commandArguments['nodeName'])) { + unset($commandArguments['nodeName']); + } } if ($commandClassName === SetNodeProperties::class) { if (is_string($commandArguments['propertyValues'] ?? null)) { diff --git a/Neos.Neos/Classes/Domain/Model/UserId.php b/Neos.Neos/Classes/Domain/Model/UserId.php index 2011ebb9a5..cf73375dde 100644 --- a/Neos.Neos/Classes/Domain/Model/UserId.php +++ b/Neos.Neos/Classes/Domain/Model/UserId.php @@ -15,7 +15,7 @@ public function __construct( public string $value ) { if (!preg_match('/^([a-z0-9\-]{1,40})$/', $value)) { - throw new \InvalidArgumentException(sprintf('Invalid user id "%s" (a user id must only contain lowercase characters, numbers and the "-" sign).', 1718293224)); + throw new \InvalidArgumentException(sprintf('Invalid user id "%s" (a user id must only contain lowercase characters, numbers and the "-" sign).', $this->value), 1718293224); } } diff --git a/composer.json b/composer.json index 7ed3a2599a..668fb6e791 100644 --- a/composer.json +++ b/composer.json @@ -108,8 +108,9 @@ "../../bin/phpunit --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/UnitTests.xml Neos.ContentRepository.Core/Tests/Unit", "../../bin/phpunit --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/UnitTests.xml Neos.ContentRepositoryRegistry/Tests/Unit" ], + "test:paratest-cli": "../../bin/paratest --debug -v --functional --processes 2 --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/FunctionalTests.xml", "test:parallel": [ - "FLOW_CONTEXT=Testing/Behat ../../bin/paratest --debug -v --functional --group parallel --processes 2 --colors --stop-on-failure -c ../../Build/BuildEssentials/PhpUnit/FunctionalTests.xml Neos.ContentRepository.BehavioralTests/Tests/Parallel/WorkspaceWritingDuringRebase/WorkspaceWritingDuringRebase.php" + "for f in Neos.ContentRepository.BehavioralTests/Tests/Parallel/**/*Test.php; do composer test:paratest-cli $f; done" ], "test:behat-cli": "../../bin/behat -f progress --strict --no-interaction", "test:behavioral": [ @@ -296,5 +297,14 @@ "phpunit/phpunit": "^9.0", "neos/behat": "*", "league/flysystem-memory": "^3" + }, + + "config": { + "_comment": "We need to insert a vendor dir (even though composer install MUST NOT be run here) but so autoloading works for composer scripts", + "vendor-dir": "../Libraries", + "allow-plugins": { + "neos/composer-plugin": false, + "cweagans/composer-patches": false + } } }