Saga Orchestrator is responsible for managing the stateful workflows across Saga participants. Its design has the following premises:
- Should be completely decoupled from Saga participants, where the communication should occur through asynchronous messaging with a message streaming platform.
- Should contain only the coordination logic. Business logic should occur only on Saga participants.
- In the coordination logic, should only produce commands and wait for events from Saga participants to coordinate next steps decisions on the workflow.
To simplify the orchestrator implementation, it is recommended to have a well defined mapping of:
- All commands and payloads the orchestrator should be able to produce
- All events the orchestrator should wait from Saga participants
- Successful and failed workflows
The orchestrator function starts by getting the input object provided by the Saga Client
function.
TransactionItem item = context.GetInput<TransactionItem>();
Then maps:
- All commands and its respective command producer methods the orchestrator is allowed to execute, where each method is responsible for producing the command to a message streaming platform (e.g. Event Hubs)
- All possible saga states and its respective persister methods the orchestrator is allowed to execute, where each method is responsible for persisting the saga state on the database (e.g. Cosmos DB).
var commandProducers = new Dictionary<string, Func<Task<ActivityResult<ProducerResult>>>>
{
[nameof(ValidateTransferCommand)] = () => ActivityFactory.ProduceValidateTransferCommandAsync(item, context, log),
[nameof(TransferCommand)] = () => ActivityFactory.ProduceTransferCommandAsync(item, context, log),
[nameof(CancelTransferCommand)] = () => ActivityFactory.ProduceCancelTransferCommandAsync(item, context, log),
[nameof(IssueReceiptCommand)] = () => ActivityFactory.ProduceIssueReceiptCommandAsync(item, context, log)
};
var sagaStatePersisters = new Dictionary<string, Func<Task<bool>>>
{
[nameof(SagaState.Pending)] = () => SagaFactory.PersistSagaStateAsync(item, SagaState.Pending, context, log),
[nameof(SagaState.Success)] = () => SagaFactory.PersistSagaStateAsync(item, SagaState.Success, context, log),
[nameof(SagaState.Cancelled)] = () => SagaFactory.PersistSagaStateAsync(item, SagaState.Cancelled, context, log),
[nameof(SagaState.Fail)] = () => SagaFactory.PersistSagaStateAsync(item, SagaState.Fail, context, log),
};
Then starts a new instance of the DurableOrchestrator
and invoke the OrchestrateAsync
method.
try
{
var orchestrator = new DurableOrchestrator(commandProducers, sagaStatePersisters);
SagaState sagaState = await orchestrator.OrchestrateAsync(item, context, log);
log.LogInformation($@"Saga state = {nameof(sagaState)} [{context.InstanceId}]");
}
catch (ArgumentException ex)
{
log.LogError(ex.Message);
}
The OrchestrateAsync
method starts by persisting the Saga state as PENDING
on the database. If an error occurs on the persistence, the error is logged and the orchestrator will not start the coordination logic.
bool sagaSatePersisted = await SagaStatePersisters[nameof(SagaState.Pending)]();
if (!sagaSatePersisted) {
return SagaState.Fail;
}
The ValidateTransferCommand
is produced to Validator
event hub. If an error occurs while producing the ValidateTransferCommand
to Event Hubs, the orchestrator persists the Saga state as FAILED
on the database.
ActivityResult<ProducerResult> validateTransferCommandResult = await CommandProducers[nameof(ValidateTransferCommand)]();
if (!validateTransferCommandResult.Valid)
{
await SagaStatePersisters[nameof(SagaState.Fail)]();
return SagaState.Fail;
}
Otherwise, the orchestrator waits for the event from the Validator
saga participant up to a specified response time (defined on ValidatorTimeout
constant).
string validatorEventName = await DurableOrchestrationContextExtensions
.WaitForExternalEventWithTimeout<string>(context, Sources.Validator, ValidatorTimeout);
In a successful scenario, the Validator
participant should return a TransferValidatedEvent
. Otherwise, two potential failures are covered:
- An error occurred on the transfer validation operation and a failed event was produced.
- The orchestrator didn't receive the event from the
Validator
participant in a specified response time.
If one of the potential failures occur, the orchestrator persists the Saga state as FAILED
on the database.
if (validatorEventName != nameof(TransferValidatedEvent))
{
log.LogError($@"{validatorEventName} returned from {Sources.Validator.ToString()} (transaction id: {context.InstanceId})");
await SagaStatePersisters[nameof(SagaState.Fail)]();
return SagaState.Fail;
}
Then the workflow continues with similar steps:
- Produce new commands to Event Hubs and validate if these commands were produced successfully
- Wait for events from Saga participants and takes 3 possible actions:
- Move to the next step of the workflow if received a successful event
- Produce a new compensation command if received a failed event
- Persist the Saga state as
FAILED
on the database and finish the workflow
After completing a successful workflow, persist the Saga state as SUCCESS
on the database and finish the workflow.
await SagaStatePersisters[nameof(SagaState.Success)]();
log.LogInformation($@"Saga '{context.InstanceId}' finished successfully.");
return SagaState.Success;
The complete orchestration source code can be found on
src/Sga.Orchestration/Services/Orchestrator.cs
.
The Saga.Orchestrator.Tests
solution leverages xUnit.net and Moq for the creation of unit tests.
Let's consider you want to test a successful workflow given the following scenario:
- The orchestrator should produce
ValidateTransferCommand
,TransferCommand
andIssueReceiptCommand
. - The orchestrator should receive
TransferValidatedEvent
fromValidator
participant,TransferSucceededEvent
fromTransfer
participant andReceiptIssuedEvent
fromReceipt
participant. - The orchestrator should persist the saga state as
Pending
andSuccess
.
First, create a new instance of the TransactionItem
, which is the orchestrator input:
var item = new TransactionItem
{
Id = Guid.NewGuid().ToString(),
AccountFromId = Guid.NewGuid().ToString(),
AccountToId = Guid.NewGuid().ToString(),
Amount = 100.00M,
State = SagaState.Pending.ToString()
};
Then define all commands and its respective command producer methods, as well as the expected saga state persisters:
var commandProducers = new Dictionary<string, Func<Task<ActivityResult<ProducerResult>>>>
{
[nameof(ValidateTransferCommand)] = () => Task.Run(() => new ActivityResult<ProducerResult>
{
Item = new ProducerResult()
}),
[nameof(TransferCommand)] = () => Task.Run(() => new ActivityResult<ProducerResult>
{
Item = new ProducerResult()
}),
[nameof(IssueReceiptCommand)] = () => Task.Run(() => new ActivityResult<ProducerResult>
{
Item = new ProducerResult()
})
};
var sagaStatePersisters = new Dictionary<string, Func<Task<bool>>>
{
[nameof(SagaState.Pending)] = () => Task.Run(() => true),
[nameof(SagaState.Success)] = () => Task.Run(() => true),
};
Then two mocks are required:
- External events response from
IDurableOrchestrationContext
- Logs from
ILogger
var loggerMock = new Mock<ILogger>();
var mockContext = new Mock<IDurableOrchestrationContext>();
mockContext
.Setup(x => x.WaitForExternalEvent<string>(nameof(Sources.Validator)))
.ReturnsAsync(nameof(TransferValidatedEvent));
mockContext
.Setup(x => x.WaitForExternalEvent<string>(nameof(Sources.Transfer)))
.ReturnsAsync(nameof(TransferSucceededEvent));
mockContext
.Setup(x => x.WaitForExternalEvent<string>(nameof(Sources.Receipt)))
.ReturnsAsync(nameof(ReceiptIssuedEvent));
Then starts the orchestrator with SagaState.Success
as the expected response:
var orchestrator = new DurableOrchestrator(commandProducers, sagaStatePersisters);
SagaState sagaState = await orchestrator.OrchestrateAsync(item, mockContext.Object, loggerMock.Object);
Assert.Equal(SagaState.Success, sagaState);
Let's consider you want to test a workflow that requires compensating transactions given the following scenario:
- The orchestrator should produce
ValidateTransferCommand
,TransferCommand
,IssueReceiptCommand
andCancelTransferCommand
. - The orchestrator should receive
TransferValidatedEvent
fromValidator
participant,TransferSucceededEvent
fromTransfer
participant,OtherReasonReceiptFailedEvent
fromReceipt
participant andTransferCanceledEvent
from theTransfer
participant. - The orchestrator should persist the saga state as
Pending
andCancelled
.
First, create a new instance of the TransactionItem
, which is the orchestrator input:
var item = new TransactionItem
{
Id = Guid.NewGuid().ToString(),
AccountFromId = Guid.NewGuid().ToString(),
AccountToId = Guid.NewGuid().ToString(),
Amount = 100.00M,
State = SagaState.Pending.ToString()
};
Then define all commands and its respective command producer methods, as well as the expected saga state persisters:
var commandProducers = new Dictionary<string, Func<Task<ActivityResult<ProducerResult>>>>
{
[nameof(ValidateTransferCommand)] = () => Task.Run(() => new ActivityResult<ProducerResult>
{
Item = new ProducerResult()
}),
[nameof(TransferCommand)] = () => Task.Run(() => new ActivityResult<ProducerResult>
{
Item = new ProducerResult()
}),
[nameof(CancelTransferCommand)] = () => Task.Run(() => new ActivityResult<ProducerResult>
{
Item = new ProducerResult()
}),
[nameof(IssueReceiptCommand)] = () => Task.Run(() => new ActivityResult<ProducerResult>
{
Item = new ProducerResult()
})
};
var sagaStatePersisters = new Dictionary<string, Func<Task<bool>>>
{
[nameof(SagaState.Pending)] = () => Task.Run(() => true),
[nameof(SagaState.Cancelled)] = () => Task.Run(() => true)
};
Then create the mocks:
var loggerMock = new Mock<ILogger>();
var mockContext = new Mock<IDurableOrchestrationContext>();
mockContext
.Setup(x => x.WaitForExternalEvent<string>(nameof(Sources.Validator)))
.ReturnsAsync(nameof(TransferValidatedEvent));
mockContext
.SetupSequence(x => x.WaitForExternalEvent<string>(nameof(Sources.Transfer)))
.ReturnsAsync(nameof(TransferSucceededEvent))
.ReturnsAsync(nameof(TransferCanceledEvent));
mockContext
.Setup(x => x.WaitForExternalEvent<string>(nameof(Sources.Receipt)))
.ReturnsAsync(nameof(OtherReasonReceiptFailedEvent));
Then starts the orchestrator with SagaState.Cancelled
as the expected response:
var orchestrator = new DurableOrchestrator(commandProducers, sagaStatePersisters);
SagaState sagaState = await orchestrator.OrchestrateAsync(item, mockContext.Object, loggerMock.Object);
Assert.Equal(SagaState.Cancelled, sagaState);