Skip to content

Commit

Permalink
Merge pull request #269 from neos/feature/replay-until-sequence-number
Browse files Browse the repository at this point in the history
FEATURE: Replay / catch up until sequence number
  • Loading branch information
skurfuerst authored Aug 20, 2020
2 parents 74e620b + 0703e97 commit 09c480a
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 43 deletions.
48 changes: 31 additions & 17 deletions Classes/Command/ProjectionCommandController.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,31 +89,39 @@ public function describeCommand(string $projection): void
/**
* Replay a projection
*
* This command allows you to replay all relevant events for one specific projection.
* This command allows you to replay all relevant events for one specific projection. By passing a maximum
* sequence number you can limit replay up to the specified event.
*
* @param string $projection The projection identifier; see projection:list for possible options
* @param int|null $maximumSequenceNumber If specified, only replay events until this event sequence number
* @param bool $quiet If specified, this command won't produce any output apart from errors (useful for automation)
* @return void
* @throws EventCouldNotBeAppliedException
* @throws StopCommandException
* @see neos.eventsourcing:projection:list
* @see neos.eventsourcing:projection:replayall
*/
public function replayCommand(string $projection, $quiet = false): void
public function replayCommand(string $projection, int $maximumSequenceNumber = null, $quiet = false): void
{
$projectionDto = $this->resolveProjectionOrQuit($projection);

if (!$quiet) {
$this->outputLine('Replaying events for projection "%s" ...', [$projectionDto->getIdentifier()]);
$this->outputLine('Replaying events for projection "%s"%s ...', [$projectionDto->getIdentifier(), ($maximumSequenceNumber ? ' until sequence number ' . $maximumSequenceNumber : '')]);
$this->output->progressStart();
}
$eventsCount = 0;
$this->projectionManager->replay($projectionDto->getIdentifier(), function () use (&$eventsCount, $quiet) {
$eventsCount ++;
$progressCallback = function () use (&$eventsCount, $quiet) {
$eventsCount++;
if (!$quiet) {
$this->output->progressAdvance();
}
});
};
if ($maximumSequenceNumber !== null) {
$this->projectionManager->replayUntilSequenceNumber($projectionDto->getIdentifier(), $maximumSequenceNumber, $progressCallback);
} else {
$this->projectionManager->replay($projectionDto->getIdentifier(), $progressCallback);
}

if (!$quiet) {
$this->output->progressFinish();
$this->outputLine('Replayed %s events.', [$eventsCount]);
Expand All @@ -123,31 +131,37 @@ public function replayCommand(string $projection, $quiet = false): void
/**
* Replay all projections
*
* This command allows you to replay all relevant events for all known projections.
* This command allows you to replay all relevant events for all known projections. By passing a maximum
* sequence number you can limit replay up to the specified event.
*
* @param int|null $maximumSequenceNumber If specified, only replay events until this event sequence number
* @param bool $quiet If specified, this command won't produce any output apart from errors (useful for automation)
* @return void
* @see neos.eventsourcing:projection:replay
* @see neos.eventsourcing:projection:list
* @throws EventCouldNotBeAppliedException
* @see neos.eventsourcing:projection:list
* @see neos.eventsourcing:projection:replay
* @noinspection DisconnectedForeachInstructionInspection
*/
public function replayAllCommand($quiet = false): void
public function replayAllCommand(int $maximumSequenceNumber = null, $quiet = false): void
{
if (!$quiet) {
$this->outputLine('Replaying all projections');
$this->outputLine('Replaying all projections%s', [$maximumSequenceNumber ? ' until sequence number ' . $maximumSequenceNumber : '']);
}
$eventsCount = 0;
$progressCallback = function () use (&$eventsCount, $quiet) {
$eventsCount++;
$quiet || $this->output->progressAdvance();
};
foreach ($this->projectionManager->getProjections() as $projection) {
if (!$quiet) {
$this->outputLine('Replaying events for projection "%s" ...', [$projection->getIdentifier()]);
$this->output->progressStart();
}
$this->projectionManager->replay($projection->getIdentifier(), function () use (&$eventsCount, $quiet) {
$eventsCount++;
if (!$quiet) {
$this->output->progressAdvance();
}
});
if ($maximumSequenceNumber !== null) {
$this->projectionManager->replayUntilSequenceNumber($projection->getIdentifier(), $maximumSequenceNumber, $progressCallback);
} else {
$this->projectionManager->replay($projection->getIdentifier(), $progressCallback);
}
if (!$quiet) {
$this->output->progressFinish();
$this->outputLine();
Expand Down
52 changes: 47 additions & 5 deletions Classes/EventListener/EventListenerInvoker.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* source code.
*/

use Closure;
use Doctrine\DBAL\Connection;
use Neos\EventSourcing\EventListener\AppliedEventsStorage\AppliedEventsStorageInterface;
use Neos\EventSourcing\EventListener\AppliedEventsStorage\DoctrineAppliedEventsStorage;
Expand Down Expand Up @@ -59,7 +60,7 @@ final class EventListenerInvoker
private $connection;

/**
* @var \Closure[]
* @var Closure[]
*/
private $progressCallbacks = [];

Expand All @@ -72,6 +73,20 @@ final class EventListenerInvoker
*/
private $transactionBatchSize = 1;

/**
* Until which sequence number should events be caught up?
* By default, events are caught up (for example during replay) until the latest
* available event.
*
* @var int
*/
private $maximumSequenceNumber;

/**
* @param EventStore $eventStore
* @param EventListenerInterface $eventListener
* @param Connection $connection
*/
public function __construct(EventStore $eventStore, EventListenerInterface $eventListener, Connection $connection)
{
$this->eventStore = $eventStore;
Expand All @@ -82,9 +97,9 @@ public function __construct(EventStore $eventStore, EventListenerInterface $even
/**
* Register a callback that is invoked for every event that is applied during replay/catchup
*
* @param \Closure $callback
* @param Closure $callback
*/
public function onProgress(\Closure $callback): void
public function onProgress(Closure $callback): void
{
$this->progressCallbacks[] = $callback;
}
Expand All @@ -94,10 +109,10 @@ public function onProgress(\Closure $callback): void
* This allows for faster replays/catchups at the cost of an "at least once delivery" if an error occurs.
*
* Usage:
* $eventListenerInvoker = (new EventListenerInvoker($eventStore, $listener, $connection))->withTransactionBatchSize(100)->catchUp($listener);
* $eventListenerInvoker = (new EventListenerInvoker($eventStore, $listener, $connection))->withTransactionBatchSize(100)->catchUp();
*
* @param int $batchSize
* @return $this
* @return self
*/
public function withTransactionBatchSize(int $batchSize): self
{
Expand All @@ -110,6 +125,26 @@ public function withTransactionBatchSize(int $batchSize): self
return $instance;
}

/**
* Configures the maximum sequence number for catch up / replay and returns this instance
* This allows for catching up or replaying events only until a certain point.
*
* Usage:
* $eventListenerInvoker = (new EventListenerInvoker($eventStore, $listener, $connection))->withMaximumSequenceNumber(268)->replay();
*
* @param int $maximumSequenceNumber
* @return self
*/
public function withMaximumSequenceNumber(int $maximumSequenceNumber): self
{
if ($maximumSequenceNumber < 1) {
throw new \InvalidArgumentException('The maximum sequence number must be greater than 0', 1597821711);
}
$instance = clone $this;
$instance->maximumSequenceNumber = $maximumSequenceNumber;
return $instance;
}

/**
* @throws EventCouldNotBeAppliedException
*/
Expand All @@ -131,11 +166,15 @@ public function catchUp(): void
$streamName = $this->eventListener instanceof StreamAwareEventListenerInterface ? $this->eventListener::listensToStream() : StreamName::all();
$eventStream = $this->eventStore->load($streamName, $highestAppliedSequenceNumber + 1);
$appliedEventsCounter = 0;

foreach ($eventStream as $eventEnvelope) {
$sequenceNumber = $eventEnvelope->getRawEvent()->getSequenceNumber();
if ($sequenceNumber <= $highestAppliedSequenceNumber) {
continue;
}
if ($this->maximumSequenceNumber !== null && $sequenceNumber > $this->maximumSequenceNumber) {
break;
}
try {
$this->applyEvent($eventEnvelope);
} catch (EventCouldNotBeAppliedException $exception) {
Expand Down Expand Up @@ -186,6 +225,9 @@ private function applyEvent(EventEnvelope $eventEnvelope): void
}
}

/**
* @return AppliedEventsStorageInterface
*/
private function getAppliedEventsStorage(): AppliedEventsStorageInterface
{
if ($this->eventListener instanceof ProvidesAppliedEventsStorageInterface) {
Expand Down
111 changes: 111 additions & 0 deletions Classes/EventStore/Storage/InMemory/InMemoryStreamIterator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
<?php
declare(strict_types=1);
namespace Neos\EventSourcing\EventStore\Storage\InMemory;

/*
* This file is part of the Neos.EventSourcing package.
*
* (c) Contributors of the Neos Project - www.neos.io
*
* This package is Open Source Software. For the full copyright and license
* information, please view the LICENSE file which was distributed with this
* source code.
*/

use ArrayIterator;
use Neos\EventSourcing\EventStore\EventStreamIteratorInterface;
use Neos\EventSourcing\EventStore\RawEvent;
use Neos\EventSourcing\EventStore\StreamName;

/**
* Stream Iterator for an in-memory based EventStore – intended for testing
*/
final class InMemoryStreamIterator implements EventStreamIteratorInterface
{
/**
* @var int
*/
private $currentOffset = 0;

/**
* @var ArrayIterator
*/
private $innerIterator;

/**
* @var array
*/
private $eventRecords = [];

/**
* @param array $eventRecords
*/
public function setEventRecords(array $eventRecords): void
{
$this->eventRecords = $eventRecords;
$this->innerIterator = new ArrayIterator($this->eventRecords);
}

/**
* @return RawEvent
* @throws \JsonException
*/
public function current(): RawEvent
{
$currentEventData = $this->innerIterator->current();
$payload = json_decode($currentEventData['payload'], true, 512, JSON_THROW_ON_ERROR);
$metadata = json_decode($currentEventData['metadata'], true, 512, JSON_THROW_ON_ERROR);
try {
$recordedAt = new \DateTimeImmutable($currentEventData['recordedat']);
} catch (\Exception $exception) {
throw new \RuntimeException(sprintf('Could not parse recordedat timestamp "%s" as date.', $currentEventData['recordedat']), 1597843669, $exception);
}
return new RawEvent(
(int)$currentEventData['sequencenumber'],
$currentEventData['type'],
$payload,
$metadata,
StreamName::fromString($currentEventData['stream']),
(int)$currentEventData['version'],
$currentEventData['id'],
$recordedAt
);
}

/**
* @return void
*/
public function next(): void
{
$this->currentOffset = $this->innerIterator->current()['sequencenumber'];
$this->innerIterator->next();
}

/**
* @return bool|int
*/
public function key()
{
return $this->innerIterator->valid() ? $this->innerIterator->current()['sequencenumber'] : null;
}

/**
* @return bool
*/
public function valid(): bool
{
return $this->innerIterator->valid();
}

/**
* @return void
*/
public function rewind(): void
{
if ($this->currentOffset === 0) {
return;
}
$this->innerIterator->rewind();
$this->currentOffset = 0;
}
}
41 changes: 34 additions & 7 deletions Classes/Projection/ProjectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* source code.
*/

use Closure;
use Doctrine\ORM\EntityManagerInterface;
use Neos\EventSourcing\EventListener\EventListenerInvoker;
use Neos\EventSourcing\EventListener\Exception\EventCouldNotBeAppliedException;
Expand Down Expand Up @@ -96,12 +97,42 @@ public function getProjection(string $projectionIdentifier): Projection
* Replay events of the specified projection
*
* @param string $projectionIdentifier unambiguous identifier of the projection to replay
* @param \Closure $progressCallback If set, this callback is invoked for every applied event during replay with the arguments $sequenceNumber and $eventStreamVersion
* @param Closure|null $progressCallback If set, this callback is invoked for every applied event during replay with the arguments $sequenceNumber and $eventStreamVersion
* @return void
* @throws EventCouldNotBeAppliedException
* @api
*/
public function replay(string $projectionIdentifier, Closure $progressCallback = null): void
{
$eventListenerInvoker = $this->createEventListenerInvokerForProjection($projectionIdentifier);
if ($progressCallback !== null) {
$eventListenerInvoker->onProgress($progressCallback);
}
$eventListenerInvoker->replay();
}

/**
* Replay events of the specified projection until the specified event sequence number
*
* @param string $projectionIdentifier unambiguous identifier of the projection to replay
* @param int $maximumSequenceNumber The sequence number of the event until which events should be replayed. The specified event will be included in the replay.
* @param Closure|null $progressCallback If set, this callback is invoked for every applied event during replay with the arguments $sequenceNumber and $eventStreamVersion
* @throws EventCouldNotBeAppliedException
*/
public function replay(string $projectionIdentifier, \Closure $progressCallback = null): void
public function replayUntilSequenceNumber(string $projectionIdentifier, int $maximumSequenceNumber, Closure $progressCallback = null): void
{
$eventListenerInvoker = $this->createEventListenerInvokerForProjection($projectionIdentifier)->withMaximumSequenceNumber($maximumSequenceNumber);
if ($progressCallback !== null) {
$eventListenerInvoker->onProgress($progressCallback);
}
$eventListenerInvoker->replay();
}

/**
* @param string $projectionIdentifier
* @return EventListenerInvoker
*/
private function createEventListenerInvokerForProjection(string $projectionIdentifier): EventListenerInvoker
{
$projection = $this->getProjection($projectionIdentifier);

Expand All @@ -113,11 +144,7 @@ public function replay(string $projectionIdentifier, \Closure $progressCallback
$projector->reset();

$connection = $this->objectManager->get(EntityManagerInterface::class)->getConnection();
$eventListenerInvoker = new EventListenerInvoker($eventStore, $projector, $connection);
if ($progressCallback !== null) {
$eventListenerInvoker->onProgress($progressCallback);
}
$eventListenerInvoker->replay();
return new EventListenerInvoker($eventStore, $projector, $connection);
}

/**
Expand Down
Loading

0 comments on commit 09c480a

Please sign in to comment.