From b9c94f40c850b4c54d4f29f4f440b11b3efeedf5 Mon Sep 17 00:00:00 2001 From: Robert Lemke Date: Thu, 20 Aug 2020 08:57:50 +0200 Subject: [PATCH] FEATURE: Replay / catch up until sequence number This change introduces a feature which allows for applying events (catchup / replay) up to a specified event sequence number. Besides supporting this in the `EventListenerInvoker`, the feature is also provided by the `projection:replay` and `projection:replayall` commands which introduce a new flag for this purpose. Resolves #268 --- .../Command/ProjectionCommandController.php | 48 +++++--- .../EventListener/EventListenerInvoker.php | 52 +++++++- .../InMemory/InMemoryStreamIterator.php | 111 ++++++++++++++++++ Classes/Projection/ProjectionManager.php | 41 +++++-- .../EventListenerInvokerTest.php | 74 +++++++++--- .../InMemory/InMemoryStreamIteratorTest.php | 107 +++++++++++++++++ 6 files changed, 390 insertions(+), 43 deletions(-) create mode 100644 Classes/EventStore/Storage/InMemory/InMemoryStreamIterator.php create mode 100644 Tests/Unit/EventStore/Storage/InMemory/InMemoryStreamIteratorTest.php diff --git a/Classes/Command/ProjectionCommandController.php b/Classes/Command/ProjectionCommandController.php index d3875038..58f0cb19 100644 --- a/Classes/Command/ProjectionCommandController.php +++ b/Classes/Command/ProjectionCommandController.php @@ -89,9 +89,11 @@ public function describeCommand(string $projection): void /** * Replay a projection * - * This command allows you to replay all relevant events for one specific projection. + * This command allows you to replay all relevant events for one specific projection. By passing a maximum + * sequence number you can limit replay up to the specified event. * * @param string $projection The projection identifier; see projection:list for possible options + * @param int|null $maximumSequenceNumber If specified, only replay events until this event sequence number * @param bool $quiet If specified, this command won't produce any output apart from errors (useful for automation) * @return void * @throws EventCouldNotBeAppliedException @@ -99,21 +101,27 @@ public function describeCommand(string $projection): void * @see neos.eventsourcing:projection:list * @see neos.eventsourcing:projection:replayall */ - public function replayCommand(string $projection, $quiet = false): void + public function replayCommand(string $projection, int $maximumSequenceNumber = null, $quiet = false): void { $projectionDto = $this->resolveProjectionOrQuit($projection); if (!$quiet) { - $this->outputLine('Replaying events for projection "%s" ...', [$projectionDto->getIdentifier()]); + $this->outputLine('Replaying events for projection "%s"%s ...', [$projectionDto->getIdentifier(), ($maximumSequenceNumber ? ' until sequence number ' . $maximumSequenceNumber : '')]); $this->output->progressStart(); } $eventsCount = 0; - $this->projectionManager->replay($projectionDto->getIdentifier(), function () use (&$eventsCount, $quiet) { - $eventsCount ++; + $progressCallback = function () use (&$eventsCount, $quiet) { + $eventsCount++; if (!$quiet) { $this->output->progressAdvance(); } - }); + }; + if ($maximumSequenceNumber !== null) { + $this->projectionManager->replayUntilSequenceNumber($projectionDto->getIdentifier(), $maximumSequenceNumber, $progressCallback); + } else { + $this->projectionManager->replay($projectionDto->getIdentifier(), $progressCallback); + } + if (!$quiet) { $this->output->progressFinish(); $this->outputLine('Replayed %s events.', [$eventsCount]); @@ -123,31 +131,37 @@ public function replayCommand(string $projection, $quiet = false): void /** * Replay all projections * - * This command allows you to replay all relevant events for all known projections. + * This command allows you to replay all relevant events for all known projections. By passing a maximum + * sequence number you can limit replay up to the specified event. * + * @param int|null $maximumSequenceNumber If specified, only replay events until this event sequence number * @param bool $quiet If specified, this command won't produce any output apart from errors (useful for automation) * @return void - * @see neos.eventsourcing:projection:replay - * @see neos.eventsourcing:projection:list * @throws EventCouldNotBeAppliedException + * @see neos.eventsourcing:projection:list + * @see neos.eventsourcing:projection:replay + * @noinspection DisconnectedForeachInstructionInspection */ - public function replayAllCommand($quiet = false): void + public function replayAllCommand(int $maximumSequenceNumber = null, $quiet = false): void { if (!$quiet) { - $this->outputLine('Replaying all projections'); + $this->outputLine('Replaying all projections%s', [$maximumSequenceNumber ? ' until sequence number ' . $maximumSequenceNumber : '']); } $eventsCount = 0; + $progressCallback = function () use (&$eventsCount, $quiet) { + $eventsCount++; + $quiet || $this->output->progressAdvance(); + }; foreach ($this->projectionManager->getProjections() as $projection) { if (!$quiet) { $this->outputLine('Replaying events for projection "%s" ...', [$projection->getIdentifier()]); $this->output->progressStart(); } - $this->projectionManager->replay($projection->getIdentifier(), function () use (&$eventsCount, $quiet) { - $eventsCount++; - if (!$quiet) { - $this->output->progressAdvance(); - } - }); + if ($maximumSequenceNumber !== null) { + $this->projectionManager->replayUntilSequenceNumber($projection->getIdentifier(), $maximumSequenceNumber, $progressCallback); + } else { + $this->projectionManager->replay($projection->getIdentifier(), $progressCallback); + } if (!$quiet) { $this->output->progressFinish(); $this->outputLine(); diff --git a/Classes/EventListener/EventListenerInvoker.php b/Classes/EventListener/EventListenerInvoker.php index a1a118ba..1ea9f4b8 100644 --- a/Classes/EventListener/EventListenerInvoker.php +++ b/Classes/EventListener/EventListenerInvoker.php @@ -12,6 +12,7 @@ * source code. */ +use Closure; use Doctrine\DBAL\Connection; use Neos\EventSourcing\EventListener\AppliedEventsStorage\AppliedEventsStorageInterface; use Neos\EventSourcing\EventListener\AppliedEventsStorage\DoctrineAppliedEventsStorage; @@ -59,7 +60,7 @@ final class EventListenerInvoker private $connection; /** - * @var \Closure[] + * @var Closure[] */ private $progressCallbacks = []; @@ -72,6 +73,20 @@ final class EventListenerInvoker */ private $transactionBatchSize = 1; + /** + * Until which sequence number should events be caught up? + * By default, events are caught up (for example during replay) until the latest + * available event. + * + * @var int + */ + private $maximumSequenceNumber; + + /** + * @param EventStore $eventStore + * @param EventListenerInterface $eventListener + * @param Connection $connection + */ public function __construct(EventStore $eventStore, EventListenerInterface $eventListener, Connection $connection) { $this->eventStore = $eventStore; @@ -82,9 +97,9 @@ public function __construct(EventStore $eventStore, EventListenerInterface $even /** * Register a callback that is invoked for every event that is applied during replay/catchup * - * @param \Closure $callback + * @param Closure $callback */ - public function onProgress(\Closure $callback): void + public function onProgress(Closure $callback): void { $this->progressCallbacks[] = $callback; } @@ -94,10 +109,10 @@ public function onProgress(\Closure $callback): void * This allows for faster replays/catchups at the cost of an "at least once delivery" if an error occurs. * * Usage: - * $eventListenerInvoker = (new EventListenerInvoker($eventStore, $listener, $connection))->withTransactionBatchSize(100)->catchUp($listener); + * $eventListenerInvoker = (new EventListenerInvoker($eventStore, $listener, $connection))->withTransactionBatchSize(100)->catchUp(); * * @param int $batchSize - * @return $this + * @return self */ public function withTransactionBatchSize(int $batchSize): self { @@ -110,6 +125,26 @@ public function withTransactionBatchSize(int $batchSize): self return $instance; } + /** + * Configures the maximum sequence number for catch up / replay and returns this instance + * This allows for catching up or replaying events only until a certain point. + * + * Usage: + * $eventListenerInvoker = (new EventListenerInvoker($eventStore, $listener, $connection))->withMaximumSequenceNumber(268)->replay(); + * + * @param int $maximumSequenceNumber + * @return self + */ + public function withMaximumSequenceNumber(int $maximumSequenceNumber): self + { + if ($maximumSequenceNumber < 1) { + throw new \InvalidArgumentException('The maximum sequence number must be greater than 0', 1597821711); + } + $instance = clone $this; + $instance->maximumSequenceNumber = $maximumSequenceNumber; + return $instance; + } + /** * @throws EventCouldNotBeAppliedException */ @@ -131,11 +166,15 @@ public function catchUp(): void $streamName = $this->eventListener instanceof StreamAwareEventListenerInterface ? $this->eventListener::listensToStream() : StreamName::all(); $eventStream = $this->eventStore->load($streamName, $highestAppliedSequenceNumber + 1); $appliedEventsCounter = 0; + foreach ($eventStream as $eventEnvelope) { $sequenceNumber = $eventEnvelope->getRawEvent()->getSequenceNumber(); if ($sequenceNumber <= $highestAppliedSequenceNumber) { continue; } + if ($this->maximumSequenceNumber !== null && $sequenceNumber > $this->maximumSequenceNumber) { + break; + } try { $this->applyEvent($eventEnvelope); } catch (EventCouldNotBeAppliedException $exception) { @@ -186,6 +225,9 @@ private function applyEvent(EventEnvelope $eventEnvelope): void } } + /** + * @return AppliedEventsStorageInterface + */ private function getAppliedEventsStorage(): AppliedEventsStorageInterface { if ($this->eventListener instanceof ProvidesAppliedEventsStorageInterface) { diff --git a/Classes/EventStore/Storage/InMemory/InMemoryStreamIterator.php b/Classes/EventStore/Storage/InMemory/InMemoryStreamIterator.php new file mode 100644 index 00000000..365973b0 --- /dev/null +++ b/Classes/EventStore/Storage/InMemory/InMemoryStreamIterator.php @@ -0,0 +1,111 @@ +eventRecords = $eventRecords; + $this->innerIterator = new ArrayIterator($this->eventRecords); + } + + /** + * @return RawEvent + * @throws \JsonException + */ + public function current(): RawEvent + { + $currentEventData = $this->innerIterator->current(); + $payload = json_decode($currentEventData['payload'], true, 512, JSON_THROW_ON_ERROR); + $metadata = json_decode($currentEventData['metadata'], true, 512, JSON_THROW_ON_ERROR); + try { + $recordedAt = new \DateTimeImmutable($currentEventData['recordedat']); + } catch (\Exception $exception) { + throw new \RuntimeException(sprintf('Could not parse recordedat timestamp "%s" as date.', $currentEventData['recordedat']), 1597843669, $exception); + } + return new RawEvent( + (int)$currentEventData['sequencenumber'], + $currentEventData['type'], + $payload, + $metadata, + StreamName::fromString($currentEventData['stream']), + (int)$currentEventData['version'], + $currentEventData['id'], + $recordedAt + ); + } + + /** + * @return void + */ + public function next(): void + { + $this->currentOffset = $this->innerIterator->current()['sequencenumber']; + $this->innerIterator->next(); + } + + /** + * @return bool|int + */ + public function key() + { + return $this->innerIterator->valid() ? $this->innerIterator->current()['sequencenumber'] : null; + } + + /** + * @return bool + */ + public function valid(): bool + { + return $this->innerIterator->valid(); + } + + /** + * @return void + */ + public function rewind(): void + { + if ($this->currentOffset === 0) { + return; + } + $this->innerIterator->rewind(); + $this->currentOffset = 0; + } +} diff --git a/Classes/Projection/ProjectionManager.php b/Classes/Projection/ProjectionManager.php index 66912f1c..9d1b5b69 100644 --- a/Classes/Projection/ProjectionManager.php +++ b/Classes/Projection/ProjectionManager.php @@ -12,6 +12,7 @@ * source code. */ +use Closure; use Doctrine\ORM\EntityManagerInterface; use Neos\EventSourcing\EventListener\EventListenerInvoker; use Neos\EventSourcing\EventListener\Exception\EventCouldNotBeAppliedException; @@ -96,12 +97,42 @@ public function getProjection(string $projectionIdentifier): Projection * Replay events of the specified projection * * @param string $projectionIdentifier unambiguous identifier of the projection to replay - * @param \Closure $progressCallback If set, this callback is invoked for every applied event during replay with the arguments $sequenceNumber and $eventStreamVersion + * @param Closure|null $progressCallback If set, this callback is invoked for every applied event during replay with the arguments $sequenceNumber and $eventStreamVersion * @return void + * @throws EventCouldNotBeAppliedException * @api + */ + public function replay(string $projectionIdentifier, Closure $progressCallback = null): void + { + $eventListenerInvoker = $this->createEventListenerInvokerForProjection($projectionIdentifier); + if ($progressCallback !== null) { + $eventListenerInvoker->onProgress($progressCallback); + } + $eventListenerInvoker->replay(); + } + + /** + * Replay events of the specified projection until the specified event sequence number + * + * @param string $projectionIdentifier unambiguous identifier of the projection to replay + * @param int $maximumSequenceNumber The sequence number of the event until which events should be replayed. The specified event will be included in the replay. + * @param Closure|null $progressCallback If set, this callback is invoked for every applied event during replay with the arguments $sequenceNumber and $eventStreamVersion * @throws EventCouldNotBeAppliedException */ - public function replay(string $projectionIdentifier, \Closure $progressCallback = null): void + public function replayUntilSequenceNumber(string $projectionIdentifier, int $maximumSequenceNumber, Closure $progressCallback = null): void + { + $eventListenerInvoker = $this->createEventListenerInvokerForProjection($projectionIdentifier)->withMaximumSequenceNumber($maximumSequenceNumber); + if ($progressCallback !== null) { + $eventListenerInvoker->onProgress($progressCallback); + } + $eventListenerInvoker->replay(); + } + + /** + * @param string $projectionIdentifier + * @return EventListenerInvoker + */ + private function createEventListenerInvokerForProjection(string $projectionIdentifier): EventListenerInvoker { $projection = $this->getProjection($projectionIdentifier); @@ -113,11 +144,7 @@ public function replay(string $projectionIdentifier, \Closure $progressCallback $projector->reset(); $connection = $this->objectManager->get(EntityManagerInterface::class)->getConnection(); - $eventListenerInvoker = new EventListenerInvoker($eventStore, $projector, $connection); - if ($progressCallback !== null) { - $eventListenerInvoker->onProgress($progressCallback); - } - $eventListenerInvoker->replay(); + return new EventListenerInvoker($eventStore, $projector, $connection); } /** diff --git a/Tests/Unit/EventListener/EventListenerInvokerTest.php b/Tests/Unit/EventListener/EventListenerInvokerTest.php index a129b0ad..c213ec9f 100644 --- a/Tests/Unit/EventListener/EventListenerInvokerTest.php +++ b/Tests/Unit/EventListener/EventListenerInvokerTest.php @@ -9,13 +9,18 @@ use Neos\EventSourcing\EventListener\EventListenerInterface; use Neos\EventSourcing\EventListener\EventListenerInvoker; use Neos\EventSourcing\EventListener\Exception\EventCouldNotBeAppliedException; +use Neos\EventSourcing\EventStore\EventNormalizer; use Neos\EventSourcing\EventStore\EventStore; use Neos\EventSourcing\EventStore\EventStream; +use Neos\EventSourcing\EventStore\EventStreamIteratorInterface; +use Neos\EventSourcing\EventStore\RawEvent; +use Neos\EventSourcing\EventStore\Storage\InMemory\InMemoryStreamIterator; use Neos\EventSourcing\EventStore\StreamAwareEventListenerInterface; use Neos\EventSourcing\EventStore\StreamName; use Neos\EventSourcing\Tests\Unit\EventListener\Fixture\AppliedEventsStorageEventListener; use Neos\Flow\Tests\UnitTestCase; use PHPUnit\Framework\MockObject\MockObject; +use Ramsey\Uuid\Uuid; class EventListenerInvokerTest extends UnitTestCase { @@ -39,6 +44,16 @@ class EventListenerInvokerTest extends UnitTestCase */ private $mockEventStream; + /** + * @var EventListenerInterface|MockObject + */ + private $mockEventListener; + + /** + * @var EventNormalizer|MockObject + */ + private $mockEventNormalizer; + /** * @var AppliedEventsStorageInterface|MockObject */ @@ -55,37 +70,64 @@ public function setUp(): void $mockPlatform = $this->getMockBuilder(AbstractPlatform::class)->getMock(); $this->mockConnection->method('getDatabasePlatform')->willReturn($mockPlatform); - /** @var EventListenerInterface|MockObject $mockEventListener */ - $mockEventListener = $this->getMockBuilder(EventListenerInterface::class)->getMock(); - $this->eventListenerInvoker = new EventListenerInvoker($this->mockEventStore, $mockEventListener, $this->mockConnection); - $this->mockAppliedEventsStorage = $this->getMockBuilder(AppliedEventsStorageInterface::class)->getMock(); + $this->mockEventListener = $this->getMockBuilder(AppliedEventsStorageEventListener::class)->disableOriginalConstructor()->getMock(); + $this->mockEventListener->method('getAppliedEventsStorage')->willReturn($this->mockAppliedEventsStorage); + + $this->eventListenerInvoker = new EventListenerInvoker($this->mockEventStore, $this->mockEventListener, $this->mockConnection); + $this->mockEventStream = $this->getMockBuilder(EventStream::class)->disableOriginalConstructor()->getMock(); + $this->mockEventNormalizer = $this->getMockBuilder(EventNormalizer::class)->disableOriginalConstructor()->getMock(); } /** * @test - * @throws EventCouldNotBeAppliedException + * @throws */ - public function catchUpPassesRespectsReservedSequenceNumber(): void + public function catchUpAppliesEventsUpToTheDefinedMaximumSequenceNumber(): void { - $this->mockConnection->method('fetchColumn')->willReturn(123); - $this->mockEventStore->expects($this->once())->method('load')->with(StreamName::all(), 124)->willReturn($this->mockEventStream); + $eventRecords = []; + for ($sequenceNumber = 1; $sequenceNumber < 123; $sequenceNumber++) { + $eventRecords[] = [ + 'sequencenumber' => $sequenceNumber, + 'type' => 'FooEventType', + 'payload' => json_encode(['foo' => 'bar'], JSON_THROW_ON_ERROR, 512), + 'metadata' => json_encode([], JSON_THROW_ON_ERROR, 512), + 'recordedat' => '2020-08-17', + 'stream' => 'FooStreamName', + 'version' => $sequenceNumber, + 'id' => Uuid::uuid4()->toString() + ]; + } + + $streamIterator = new InMemoryStreamIterator(); + $streamIterator->setEventRecords($eventRecords); + $eventStream = new EventStream(StreamName::fromString('FooStreamName'), $streamIterator, $this->mockEventNormalizer); + + // Simulate that the first 10 events have already been applied: + $this->mockAppliedEventsStorage->expects($this->atLeastOnce())->method('reserveHighestAppliedEventSequenceNumber')->willReturn(10); + $this->mockEventStore->expects($this->once())->method('load')->with(StreamName::all(), 11)->willReturn($eventStream); + + $this->eventListenerInvoker = new EventListenerInvoker($this->mockEventStore, $this->mockEventListener, $this->mockConnection); + + $appliedEventsCounter = 0; + $this->eventListenerInvoker->onProgress(static function() use(&$appliedEventsCounter){ + $appliedEventsCounter ++; + }); + + $this->eventListenerInvoker = $this->eventListenerInvoker->withMaximumSequenceNumber(50); $this->eventListenerInvoker->catchUp(); - } + $this->assertSame(40, $appliedEventsCounter); + } /** * @test * @throws EventCouldNotBeAppliedException */ - public function catchUpPassesRespectsProvidesAppliedEventsStorageInterface(): void + public function catchUpPassesRespectsReservedSequenceNumber(): void { - $eventListener = new AppliedEventsStorageEventListener($this->mockAppliedEventsStorage); - - $this->eventListenerInvoker = new EventListenerInvoker($this->mockEventStore, $eventListener, $this->mockConnection); - $this->mockAppliedEventsStorage->expects($this->atLeastOnce())->method('reserveHighestAppliedEventSequenceNumber')->willReturn(123); $this->mockEventStore->expects($this->once())->method('load')->with(StreamName::all(), 124)->willReturn($this->mockEventStream); $this->eventListenerInvoker->catchUp(); @@ -104,6 +146,10 @@ public function catchUpPassesRespectsStreamAwareEventListenerInterface(): void $this->eventListenerInvoker->catchUp(); } + /** + * @param StreamName|null $streamName + * @return EventListenerInterface + */ private function buildMockEventListener(StreamName $streamName = null): EventListenerInterface { $listenerClassName = 'Mock_EventListener_' . md5(uniqid('', true)); diff --git a/Tests/Unit/EventStore/Storage/InMemory/InMemoryStreamIteratorTest.php b/Tests/Unit/EventStore/Storage/InMemory/InMemoryStreamIteratorTest.php new file mode 100644 index 00000000..91a8b8a7 --- /dev/null +++ b/Tests/Unit/EventStore/Storage/InMemory/InMemoryStreamIteratorTest.php @@ -0,0 +1,107 @@ +iterator = new InMemoryStreamIterator(); + $this->iterator->setEventRecords([ + [ + 'sequencenumber' => 1, + 'type' => 'FooEventType', + 'payload' => json_encode(['foo' => 'bar'], JSON_THROW_ON_ERROR, 512), + 'metadata' => json_encode([], JSON_THROW_ON_ERROR, 512), + 'recordedat' => '2020-08-17', + 'stream' => 'FooStreamName', + 'version' => 1, + 'id' => Uuid::uuid4()->toString() + ], + [ + 'sequencenumber' => 2, + 'type' => 'FooEventType', + 'payload' => json_encode(['foo' => 'bar'], JSON_THROW_ON_ERROR, 512), + 'metadata' => json_encode([], JSON_THROW_ON_ERROR, 512), + 'recordedat' => '2020-08-17', + 'stream' => 'FooStreamName', + 'version' => 2, + 'id' => Uuid::uuid4()->toString() + ] + ]); + } + + /** + * @test + * @throws + */ + public function setEventRecordsRejectsInvalidDate(): void + { + $iterator = new InMemoryStreamIterator(); + $iterator->setEventRecords([ + [ + 'sequencenumber' => 1, + 'type' => 'FooEventType', + 'payload' => json_encode(['foo' => 'bar'], JSON_THROW_ON_ERROR, 512), + 'metadata' => json_encode([], JSON_THROW_ON_ERROR, 512), + 'recordedat' => 'invalid-date', + 'stream' => 'FooStreamName', + 'version' => 1, + 'id' => Uuid::uuid4()->toString() + ] + ]); + + $this->expectExceptionCode(1597843669); + $iterator->current(); + } + + /** + * @test + * @throws + */ + public function canSetEventRecordsAndGetRawEvents(): void + { + $rawEvent = $this->iterator->current(); + $this->assertSame($rawEvent->getSequenceNumber(), 1); + $this->assertSame($rawEvent->getType(), 'FooEventType'); + $this->assertSame($rawEvent->getRecordedAt()->format('Y-m-d'), '2020-08-17'); + $this->assertSame((string)$rawEvent->getStreamName(), 'FooStreamName'); + } + + /** + * @test + * @throws + */ + public function providesIteratorFunctions(): void + { + $this->assertSame($this->iterator->key(), 1); + + $this->iterator->next(); + $this->assertSame($this->iterator->key(), 2); + $this->assertSame($this->iterator->current()->getSequenceNumber(), 2); + + $this->assertTrue($this->iterator->valid()); + $this->iterator->next(); + $this->assertFalse($this->iterator->valid()); + + $this->iterator->rewind(); + $this->assertTrue($this->iterator->valid()); + $this->assertSame($this->iterator->key(), 1); + + $this->iterator->rewind(); + $this->assertTrue($this->iterator->valid()); + $this->assertSame($this->iterator->key(), 1); + } +}