Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
mhsdesign committed Nov 28, 2024
1 parent dc5ff10 commit 7da6d1f
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null): Result

$this->subscriptionStore->setup();
$this->discoverNewSubscriptions();
$subscriptions = $this->subscriptionStore->findByCriteriaForUpdate(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::fromArray([
$subscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::fromArray([
SubscriptionStatus::NEW,
SubscriptionStatus::BOOTING,
SubscriptionStatus::ACTIVE
Expand All @@ -80,45 +80,45 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null): Result
public function boot(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult
{
$criteria ??= SubscriptionEngineCriteria::noConstraints();
return $this->processExclusively(fn () => $this->subscriptionStore->transactional(
return $this->processExclusively(
function () use ($criteria, $progressCallback) {
$this->logger?->info('Subscription Engine: Start catching up subscriptions in state "BOOTING".');
$subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate(
$subscriptionsToCatchup = $this->subscriptionStore->findByCriteria(
SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatus::BOOTING)
);
return $this->catchUpSubscriptions($subscriptionsToCatchup, $progressCallback);
})
}
);
}

public function catchUpActive(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult
{
$criteria ??= SubscriptionEngineCriteria::noConstraints();
return $this->processExclusively(fn () => $this->subscriptionStore->transactional(
return $this->processExclusively(
function () use ($criteria, $progressCallback) {
$this->logger?->info('Subscription Engine: Start catching up subscriptions in state "ACTIVE".');
$subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate(
$subscriptionsToCatchup = $this->subscriptionStore->findByCriteria(
SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatus::ACTIVE)
);
return $this->catchUpSubscriptions($subscriptionsToCatchup, $progressCallback);
})
}
);
}

public function reactivate(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult
{
$criteria ??= SubscriptionEngineCriteria::noConstraints();
return $this->processExclusively(fn () => $this->subscriptionStore->transactional(
return $this->processExclusively(
function () use ($criteria, $progressCallback) {
$this->logger?->info('Subscription Engine: Start catching up subscriptions in state "ACTIVE".');
$subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate(
$subscriptionsToCatchup = $this->subscriptionStore->findByCriteria(
SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::fromArray([
SubscriptionStatus::ERROR,
SubscriptionStatus::DETACHED,
]))
);
return $this->catchUpSubscriptions($subscriptionsToCatchup, $progressCallback);
})
}
);
}

Expand All @@ -127,7 +127,7 @@ public function reset(SubscriptionEngineCriteria|null $criteria = null): Result
$criteria ??= SubscriptionEngineCriteria::noConstraints();

$this->logger?->info('Subscription Engine: Start to reset.');
$subscriptions = $this->subscriptionStore->findByCriteriaForUpdate(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::any()));
$subscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::any()));
if ($subscriptions->isEmpty()) {
$this->logger?->info('Subscription Engine: No subscriptions to reset.');
return Result::success();
Expand All @@ -146,7 +146,7 @@ public function subscriptionStatus(SubscriptionEngineCriteria|null $criteria = n
{
$statuses = [];
try {
$subscriptions = $this->subscriptionStore->findByCriteriaForUpdate(SubscriptionCriteria::create(ids: $criteria?->ids));
$subscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::create(ids: $criteria?->ids));
} catch (TableNotFoundException) {
// the schema is not setup - thus there are no subscribers
return SubscriptionStatusCollection::createEmpty();
Expand Down Expand Up @@ -210,7 +210,7 @@ private function handleEvent(EventEnvelope $eventEnvelope, EventInterface $domai
*/
private function discoverNewSubscriptions(): void
{
$subscriptions = $this->subscriptionStore->findByCriteriaForUpdate(SubscriptionCriteria::noConstraints());
$subscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::noConstraints());
foreach ($this->subscribers as $subscriber) {
if ($subscriptions->contain($subscriber->id)) {
continue;
Expand Down Expand Up @@ -296,6 +296,16 @@ private function resetSubscription(Subscription $subscription): ?Error

private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Closure $progressClosure = null): ProcessedResult
{

// problems:
// one the onBeforeCatchUp / onAfterCatchUp hooks will be more difficult and less obvious to invoke
// we cannot detach old subscriptions as we loop over the subscribers
// we have to fetch the subscriber and then skip its posisiont if its ahead
// we cannot abort easily if there ar no matching subscribers?
// if abortsCatchupAndRollBack fails we cannot abort transaction!!! -> but this is also a problem if we 'd start batching again....
// for batches before and after????


foreach ($subscriptionsToCatchup as $subscription) {
if (!$this->subscribers->contain($subscription->id)) {
// mark detached subscriptions as we cannot handle them and exclude them from catchup
Expand Down Expand Up @@ -349,39 +359,67 @@ private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Cl
$this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value));
continue;
}
$error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription->id);
if ($error !== null) {
// ERROR Case:
// 1.) for the leftover events we are not including this failed subscription for catchup
$subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id);
// 2.) update the subscription error state on either its unchanged or new position (if some events worked)
$this->subscriptionStore->transactional(function () use ($eventEnvelope, $domainEvent, $subscription, $numberOfProcessedEvents, &$subscriptionsToCatchup, &$errors, &$highestSequenceNumberForSubscriber) {
$error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription->id);
if ($error !== null) {
// ERROR Case:
// 1.) for the leftover events we are not including this failed subscription for catchup
$subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id);
// 2.) update the subscription error state on either its unchanged or new position (if some events worked)
$this->subscriptionStore->update(
$subscription->id,
status: SubscriptionStatus::ERROR,
// todo rather use previous position:
position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position,
subscriptionError: SubscriptionError::fromPreviousStatusAndException(
$subscription->status,
$error->throwable
),
);
// 3.) invoke onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent we want to "shutdown" this catchup iteration event though we know it failed
// todo put the ERROR $subscriptionStatus into the after hook, so it can properly be reacted upon
try {
$this->subscribers->get($subscription->id)->onAfterCatchUp();
} catch (\Throwable $e) {
// analog to onBeforeCatchUp, we tolerate no exceptions here and consider it a critical developer error.
$message = sprintf('Subscriber "%s" had an error and also failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage());
$this->logger?->critical($message);
throw new CatchUpFailed($message, 1732733740, $e);
}
$errors[] = $error;
return;
}

$highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber;

// after catchup mark all subscriptions as active, so they are triggered automatically now.
// The position will be set to the one the subscriber handled last, or if no events were in the stream, and we booted we keep the persisted position
$this->subscriptionStore->update(
$subscription->id,
status: SubscriptionStatus::ERROR,
position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position,
subscriptionError: SubscriptionError::fromPreviousStatusAndException(
$subscription->status,
$error->throwable
),
status: SubscriptionStatus::ACTIVE,
position: $eventEnvelope->sequenceNumber,
subscriptionError: null,
);
// 3.) invoke onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent we want to "shutdown" this catchup iteration event though we know it failed
// todo put the ERROR $subscriptionStatus into the after hook, so it can properly be reacted upon
try {
$this->subscribers->get($subscription->id)->onAfterCatchUp();
} catch (\Throwable $e) {
// analog to onBeforeCatchUp, we tolerate no exceptions here and consider it a critical developer error.
$message = sprintf('Subscriber "%s" had an error and also failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage());
$this->logger?->critical($message);
throw new CatchUpFailed($message, 1732733740, $e);
if ($numberOfProcessedEvents === 0 && $subscription->status !== SubscriptionStatus::ACTIVE) {
$this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active from %s.', $subscription->id->value, $subscription->status->name));
}
$errors[] = $error;
continue;
}
// HAPPY Case:
$highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber;
});

}
$numberOfProcessedEvents++;
}
foreach ($subscriptionsToCatchup as $subscription) {
if ($numberOfProcessedEvents === 0 && $subscription->status !== SubscriptionStatus::ACTIVE) {
$this->subscriptionStore->update(
$subscription->id,
status: SubscriptionStatus::ACTIVE,
position: $subscription->position,
subscriptionError: null,
);
$this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active from %s.', $subscription->id->value, $subscription->status->name));
}
}

foreach ($subscriptionsToCatchup as $subscription) {
try {
$this->subscribers->get($subscription->id)->onAfterCatchUp();
Expand All @@ -391,17 +429,6 @@ private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Cl
$this->logger?->critical($message);
throw new CatchUpFailed($message, 1732374000, $e);
}
// after catchup mark all subscriptions as active, so they are triggered automatically now.
// The position will be set to the one the subscriber handled last, or if no events were in the stream, and we booted we keep the persisted position
$this->subscriptionStore->update(
$subscription->id,
status: SubscriptionStatus::ACTIVE,
position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position,
subscriptionError: null,
);
if ($subscription->status !== SubscriptionStatus::ACTIVE) {
$this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active after booting.', $subscription->id->value));
}
}
$this->logger?->info(sprintf('Subscription Engine: Finish catch up. %d processed events %d errors.', $numberOfProcessedEvents, count($errors)));
return $errors === [] ? ProcessedResult::success($numberOfProcessedEvents) : ProcessedResult::failed($numberOfProcessedEvents, Errors::fromArray($errors));
Expand All @@ -418,9 +445,11 @@ private function processExclusively(\Closure $closure): mixed
throw new SubscriptionEngineAlreadyProcessingException('Subscription engine is already processing', 1732714075);
}
$this->processing = true;
$this->subscriptionStore->acquireLock();
try {
return $closure();
} finally {
$this->subscriptionStore->releaseLock();
$this->processing = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ interface SubscriptionStoreInterface
{
public function setup(): void;

public function findByCriteriaForUpdate(SubscriptionCriteria $criteria): Subscriptions;
public function findByCriteria(SubscriptionCriteria $criteria): Subscriptions;

public function add(Subscription $subscription): void;

Expand All @@ -29,6 +29,10 @@ public function update(
SubscriptionError|null $subscriptionError,
): void;

public function acquireLock(): void;

public function releaseLock(): void;

/**
* @template T
* @param \Closure():T $closure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,12 @@ public function setup(): void
}
}

public function findByCriteriaForUpdate(SubscriptionCriteria $criteria): Subscriptions
public function findByCriteria(SubscriptionCriteria $criteria): Subscriptions
{
$queryBuilder = $this->dbal->createQueryBuilder()
->select('*')
->from($this->tableName)
->orderBy('id');
$queryBuilder->forUpdate();
if ($criteria->ids !== null) {
$queryBuilder->andWhere('id IN (:ids)')
->setParameter(
Expand Down Expand Up @@ -165,6 +164,23 @@ private static function fromDatabase(array $row): Subscription
);
}

public function acquireLock(): void
{
// todo fully implement https://github.com/patchlevel/event-sourcing/blob/caaf54fcf32c0e42b1036a5c7ff77c1a37af0105/src/Store/DoctrineDbalStore.php#L456
$result = $this->dbal->fetchOne(sprintf('SELECT GET_LOCK("%s", %d)', 'default', 0));
if ($result !== 1) {
throw new \RuntimeException('failed to acquire lock');
}
}

public function releaseLock(): void
{
$result = $this->dbal->fetchOne(sprintf('SELECT RELEASE_LOCK("%s")', 'default'));
if ($result !== 1) {
throw new \RuntimeException('failed to release lock');
}
}

public function transactional(\Closure $closure): mixed
{
return $this->dbal->transactional($closure);
Expand Down

0 comments on commit 7da6d1f

Please sign in to comment.