diff --git a/README.md b/README.md index 692d5efe..63e4209b 100644 --- a/README.md +++ b/README.md @@ -541,6 +541,9 @@ Some things to note about the above code: * A shortcut extension `ExecuteWorkflowAsync` is available that is just `StartWorkflowAsync` + `GetResultAsync`. * `SignalWithStart` method is present on the workflow options to make the workflow call a signal-with-start call which means it will only start the workflow if it's not running, but send a signal to it regardless. +* Separate `StartUpdateWithStartWorkflowAsync` and `ExecuteUpdateWithStartWorkflowAsync` methods are present on the + client to make the workflow call an update-with-start call which means it may start the workflow if it's not running, + but perform an update on it regardless. #### Invoking Activities diff --git a/src/Temporalio.Extensions.OpenTelemetry/TracingInterceptor.cs b/src/Temporalio.Extensions.OpenTelemetry/TracingInterceptor.cs index 72eeb8c2..06d55198 100644 --- a/src/Temporalio.Extensions.OpenTelemetry/TracingInterceptor.cs +++ b/src/Temporalio.Extensions.OpenTelemetry/TracingInterceptor.cs @@ -124,6 +124,27 @@ protected virtual IDictionary HeadersFromContext( return Enumerable.Empty>(); } + /// + /// Create tag collection for the given workflow and update ID. + /// + /// Workflow ID. + /// Update ID. + /// Tags. + protected virtual IEnumerable> CreateUpdateTags( + string workflowId, string? updateId) + { + var ret = new List>(2); + if (Options.TagNameWorkflowId is string wfName) + { + ret.Add(new(wfName, workflowId)); + } + if (Options.TagNameUpdateId is string updateName && updateId is { } nonNullUpdateId) + { + ret.Add(new(updateName, nonNullUpdateId)); + } + return ret; + } + /// /// Create tag collection from the current workflow environment. Must be called within a /// workflow. @@ -208,6 +229,48 @@ public override async Task> StartWorkflowAsyn } } + public override async Task> StartUpdateWithStartWorkflowAsync( + StartUpdateWithStartWorkflowInput input) + { + // Ignore if for some reason the start operation is not set by this interceptor + if (input.Options.StartWorkflowOperation == null) + { + return await base.StartUpdateWithStartWorkflowAsync(input).ConfigureAwait(false); + } + + using (var activity = ClientSource.StartActivity( + $"UpdateWithStartWorkflow:{input.Options.StartWorkflowOperation.Workflow}", + kind: ActivityKind.Client, + parentContext: default, + tags: root.CreateUpdateTags( + workflowId: input.Options.StartWorkflowOperation.Options.Id!, + updateId: input.Options.Id))) + { + // We want the header on _both_ start and update + if (HeadersFromContext(input.Headers) is Dictionary updateHeaders) + { + input = input with { Headers = updateHeaders }; + } + if (HeadersFromContext(input.Options.StartWorkflowOperation.Headers) is Dictionary startHeaders) + { + // We copy the operation but still mutate the existing headers. This is + // similar to what is done by other interceptors (they copy the input + // object but still mutate the original header dictionary if there). + input.Options.StartWorkflowOperation = (WithStartWorkflowOperation)input.Options.StartWorkflowOperation.Clone(); + input.Options.StartWorkflowOperation.Headers = startHeaders; + } + try + { + return await base.StartUpdateWithStartWorkflowAsync(input).ConfigureAwait(false); + } + catch (Exception e) + { + RecordExceptionWithStatus(activity, e); + throw; + } + } + } + public override async Task SignalWorkflowAsync(SignalWorkflowInput input) { using (var activity = ClientSource.StartActivity( @@ -263,7 +326,7 @@ public override async Task> StartWorkflowUpdateAsy $"UpdateWorkflow:{input.Update}", kind: ActivityKind.Client, parentContext: default, - tags: root.CreateWorkflowTags(input.Id))) + tags: root.CreateUpdateTags(workflowId: input.Id, updateId: input.Options.Id))) { if (HeadersFromContext(input.Headers) is Dictionary headers) { diff --git a/src/Temporalio.Extensions.OpenTelemetry/TracingInterceptorOptions.cs b/src/Temporalio.Extensions.OpenTelemetry/TracingInterceptorOptions.cs index d8abc466..efbc4f5b 100644 --- a/src/Temporalio.Extensions.OpenTelemetry/TracingInterceptorOptions.cs +++ b/src/Temporalio.Extensions.OpenTelemetry/TracingInterceptorOptions.cs @@ -33,6 +33,11 @@ public class TracingInterceptorOptions : ICloneable /// public string? TagNameActivityId { get; set; } = "temporalActivityID"; + /// + /// Gets or sets the tag name for update IDs. If null, no tag is created. + /// + public string? TagNameUpdateId { get; set; } = "temporalUpdateID"; + /// /// Create a shallow copy of these options. /// diff --git a/src/Temporalio/Client/ITemporalClient.Workflow.cs b/src/Temporalio/Client/ITemporalClient.Workflow.cs index 78024daa..80d44c55 100644 --- a/src/Temporalio/Client/ITemporalClient.Workflow.cs +++ b/src/Temporalio/Client/ITemporalClient.Workflow.cs @@ -92,6 +92,84 @@ WorkflowHandle GetWorkflowHandle( WorkflowHandle GetWorkflowHandle( string id, string? runId = null, string? firstExecutionRunId = null); + /// + /// Start an update via a call to a WorkflowUpdate attributed method, possibly starting the + /// workflow at the same time. Note that in some cases this call may fail but the workflow + /// will still be started. + /// + /// Workflow class type. + /// Invocation of workflow update method. + /// Update options. Currently WaitForStage is required. + /// Workflow update handle. + /// Invalid run call or options. + /// + /// Workflow was already started according to ID reuse and conflict policy. + /// + /// Server-side error. + /// WARNING: Workflow update with start is experimental and APIs may change. + /// + Task StartUpdateWithStartWorkflowAsync( + Expression> updateCall, + WorkflowStartUpdateWithStartOptions options); + + /// + /// Start an update via a call to a WorkflowUpdate attributed method, possibly starting the + /// workflow at the same time. Note that in some cases this call may fail but the workflow + /// will still be started. + /// + /// Workflow class type. + /// Update result type. + /// Invocation of workflow update method. + /// Update options. Currently WaitForStage is required. + /// Workflow update handle. + /// Invalid run call or options. + /// + /// Workflow was already started according to ID reuse and conflict policy. + /// + /// Server-side error. + /// WARNING: Workflow update with start is experimental and APIs may change. + /// + Task> StartUpdateWithStartWorkflowAsync( + Expression>> updateCall, + WorkflowStartUpdateWithStartOptions options); + + /// + /// Start an update using its name, possibly starting the workflow at the same time. Note + /// that in some cases this call may fail but the workflow will still be started. + /// + /// Name of the update. + /// Arguments for the update. + /// Update options. Currently WaitForStage is required. + /// Workflow update handle. + /// Invalid run call or options. + /// + /// Workflow was already started according to ID reuse and conflict policy. + /// + /// Server-side error. + /// WARNING: Workflow update with start is experimental and APIs may change. + /// + Task StartUpdateWithStartWorkflowAsync( + string update, IReadOnlyCollection args, WorkflowStartUpdateWithStartOptions options); + + /// + /// Start an update using its name, possibly starting the workflow at the same time. Note + /// that in some cases this call may fail but the workflow will still be started. + /// + /// Update result type. + /// Name of the update. + /// Arguments for the update. + /// Update options. Currently WaitForStage is required. + /// Workflow update handle. + /// Invalid run call or options. + /// + /// Workflow was already started according to ID reuse and conflict policy. + /// + /// Server-side error. + /// WARNING: Workflow update with start is experimental and APIs may change. + /// + Task> StartUpdateWithStartWorkflowAsync( + string update, IReadOnlyCollection args, WorkflowStartUpdateWithStartOptions options); + #if NETCOREAPP3_0_OR_GREATER /// /// List workflows with the given query. diff --git a/src/Temporalio/Client/ITemporalClientExtensions.cs b/src/Temporalio/Client/ITemporalClientExtensions.cs index d980de3a..94b743c2 100644 --- a/src/Temporalio/Client/ITemporalClientExtensions.cs +++ b/src/Temporalio/Client/ITemporalClientExtensions.cs @@ -133,6 +133,142 @@ public static async Task ExecuteWorkflowAsync( return await handle.GetResultAsync().ConfigureAwait(false); } + /// + /// Start an update via a call to a WorkflowUpdate attributed method, possibly starting the + /// workflow at the same time. Note that in some cases this call may fail but the workflow + /// will still be started. This is a shortcut for + /// + /// + + /// . + /// + /// Workflow class type. + /// Client to use. + /// Invocation of workflow update method. + /// Update options. + /// Completed update task. + /// Invalid run call or options. + /// + /// Workflow was already started according to ID reuse and conflict policy. + /// + /// + /// Workflow update failed, but the with-start operation still got a workflow handle. + /// + /// Server-side error. + /// WARNING: Workflow update with start is experimental and APIs may change. + /// + public static async Task ExecuteUpdateWithStartWorkflowAsync( + this ITemporalClient client, + Expression> updateCall, + WorkflowUpdateWithStartOptions options) + { + var handle = await client.StartUpdateWithStartWorkflowAsync( + updateCall, UpdateWithStartOptionsWithDefaultsForExecute(options)).ConfigureAwait(false); + await handle.GetResultAsync(rpcOptions: options.Rpc).ConfigureAwait(false); + } + + /// + /// Start an update via a call to a WorkflowUpdate attributed method, possibly starting the + /// workflow at the same time. Note that in some cases this call may fail but the workflow + /// will still be started. This is a shortcut for + /// + /// + + /// . + /// + /// Workflow class type. + /// Update result type. + /// Client to use. + /// Invocation of workflow update method. + /// Update options. + /// Completed update task. + /// Invalid run call or options. + /// + /// Workflow was already started according to ID reuse and conflict policy. + /// + /// + /// Workflow update failed, but the with-start operation still got a workflow handle. + /// + /// Server-side error. + /// WARNING: Workflow update with start is experimental and APIs may change. + /// + public static async Task ExecuteUpdateWithStartWorkflowAsync( + this ITemporalClient client, + Expression>> updateCall, + WorkflowUpdateWithStartOptions options) + { + var handle = await client.StartUpdateWithStartWorkflowAsync( + updateCall, UpdateWithStartOptionsWithDefaultsForExecute(options)).ConfigureAwait(false); + return await handle.GetResultAsync(rpcOptions: options.Rpc).ConfigureAwait(false); + } + + /// + /// Start an update using its name, possibly starting the workflow at the same time. Note + /// that in some cases this call may fail but the workflow will still be started. This is a + /// shortcut for + /// + /// + + /// . + /// + /// Client to use. + /// Name of the update. + /// Arguments for the update. + /// Update options. + /// Completed update task. + /// Invalid run call or options. + /// + /// Workflow was already started according to ID reuse and conflict policy. + /// + /// + /// Workflow update failed, but the with-start operation still got a workflow handle. + /// + /// Server-side error. + /// WARNING: Workflow update with start is experimental and APIs may change. + /// + public static async Task ExecuteUpdateWithStartWorkflowAsync( + this ITemporalClient client, + string update, + IReadOnlyCollection args, + WorkflowUpdateWithStartOptions options) + { + var handle = await client.StartUpdateWithStartWorkflowAsync( + update, args, UpdateWithStartOptionsWithDefaultsForExecute(options)).ConfigureAwait(false); + await handle.GetResultAsync(rpcOptions: options.Rpc).ConfigureAwait(false); + } + + /// + /// Start an update using its name, possibly starting the workflow at the same time. Note + /// that in some cases this call may fail but the workflow will still be started. This is a + /// shortcut for + /// + /// + + /// . + /// + /// Update result type. + /// Client to use. + /// Name of the update. + /// Arguments for the update. + /// Update options. + /// Completed update task. + /// Invalid run call or options. + /// + /// Workflow was already started according to ID reuse and conflict policy. + /// + /// + /// Workflow update failed, but the with-start operation still got a workflow handle. + /// + /// Server-side error. + /// WARNING: Workflow update with start is experimental and APIs may change. + /// + public static async Task ExecuteUpdateWithStartWorkflowAsync( + this ITemporalClient client, + string update, + IReadOnlyCollection args, + WorkflowUpdateWithStartOptions options) + { + var handle = await client.StartUpdateWithStartWorkflowAsync( + update, args, UpdateWithStartOptionsWithDefaultsForExecute(options)).ConfigureAwait(false); + return await handle.GetResultAsync(rpcOptions: options.Rpc).ConfigureAwait(false); + } + #if NETCOREAPP3_0_OR_GREATER /// /// List workflow histories. This is just a helper combining @@ -157,5 +293,15 @@ public static async IAsyncEnumerable ListWorkflowHistoriesAsync } } #endif + + private static WorkflowStartUpdateWithStartOptions UpdateWithStartOptionsWithDefaultsForExecute( + WorkflowUpdateWithStartOptions options) => + (WorkflowStartUpdateWithStartOptions)new WorkflowStartUpdateWithStartOptions() + { + Id = options.Id, + Rpc = options.Rpc, + StartWorkflowOperation = options.StartWorkflowOperation, + WaitForStage = WorkflowUpdateStage.Completed, + }.Clone(); } } \ No newline at end of file diff --git a/src/Temporalio/Client/Interceptors/ClientOutboundInterceptor.Workflow.cs b/src/Temporalio/Client/Interceptors/ClientOutboundInterceptor.Workflow.cs index 7f0b3310..4b191f37 100644 --- a/src/Temporalio/Client/Interceptors/ClientOutboundInterceptor.Workflow.cs +++ b/src/Temporalio/Client/Interceptors/ClientOutboundInterceptor.Workflow.cs @@ -19,6 +19,16 @@ public virtual Task> StartWorkflowAsync Next.StartWorkflowAsync(input); + /// + /// Intercept start update with start workflow calls. + /// + /// Update result type. + /// Input details of the call. + /// Handle for the update. + public virtual Task> StartUpdateWithStartWorkflowAsync( + StartUpdateWithStartWorkflowInput input) => + Next.StartUpdateWithStartWorkflowAsync(input); + /// /// Intercept signal workflow calls. /// diff --git a/src/Temporalio/Client/Interceptors/StartUpdateWithStartWorkflowInput.cs b/src/Temporalio/Client/Interceptors/StartUpdateWithStartWorkflowInput.cs new file mode 100644 index 00000000..9878343a --- /dev/null +++ b/src/Temporalio/Client/Interceptors/StartUpdateWithStartWorkflowInput.cs @@ -0,0 +1,25 @@ +using System.Collections.Generic; +using Temporalio.Api.Common.V1; + +namespace Temporalio.Client.Interceptors +{ + /// + /// Input for . + /// + /// Update name. + /// Update arguments. + /// Options. + /// Headers if any for the update. These will be encoded using the codec + /// before sent to the server. Note these are the update headers, start headers are in the start + /// operation. + /// + /// WARNING: This constructor may have required properties added. Do not rely on the exact + /// constructor, only use "with" clauses. + /// + /// WARNING: Workflow update with start is experimental and APIs may change. + public record StartUpdateWithStartWorkflowInput( + string Update, + IReadOnlyCollection Args, + WorkflowStartUpdateWithStartOptions Options, + IDictionary? Headers); +} \ No newline at end of file diff --git a/src/Temporalio/Client/TemporalClient.Workflow.cs b/src/Temporalio/Client/TemporalClient.Workflow.cs index c710e1cd..2cd6daac 100644 --- a/src/Temporalio/Client/TemporalClient.Workflow.cs +++ b/src/Temporalio/Client/TemporalClient.Workflow.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Linq.Expressions; using System.Threading.Tasks; using Google.Protobuf; @@ -10,6 +11,7 @@ using Temporalio.Api.TaskQueue.V1; using Temporalio.Api.WorkflowService.V1; using Temporalio.Client.Interceptors; +using Temporalio.Common; using Temporalio.Converters; using Temporalio.Exceptions; @@ -70,6 +72,44 @@ public WorkflowHandle GetWorkflowHandle( string id, string? runId = null, string? firstExecutionRunId = null) => new(Client: this, Id: id, RunId: runId, FirstExecutionRunId: firstExecutionRunId); + /// + public Task StartUpdateWithStartWorkflowAsync( + Expression> updateCall, + WorkflowStartUpdateWithStartOptions options) + { + var (method, args) = ExpressionUtil.ExtractCall(updateCall); + return StartUpdateWithStartWorkflowAsync( + Workflows.WorkflowUpdateDefinition.NameFromMethodForCall(method), + args, + options); + } + + /// + public Task> StartUpdateWithStartWorkflowAsync( + Expression>> updateCall, + WorkflowStartUpdateWithStartOptions options) + { + var (method, args) = ExpressionUtil.ExtractCall(updateCall); + return StartUpdateWithStartWorkflowAsync( + Workflows.WorkflowUpdateDefinition.NameFromMethodForCall(method), + args, + options); + } + + /// + public async Task StartUpdateWithStartWorkflowAsync( + string update, IReadOnlyCollection args, WorkflowStartUpdateWithStartOptions options) => + await StartUpdateWithStartWorkflowAsync(update, args, options).ConfigureAwait(false); + + /// + public Task> StartUpdateWithStartWorkflowAsync( + string update, IReadOnlyCollection args, WorkflowStartUpdateWithStartOptions options) => + OutboundInterceptor.StartUpdateWithStartWorkflowAsync(new( + Update: update, + Args: args, + Options: options, + Headers: null)); + #if NETCOREAPP3_0_OR_GREATER /// public IAsyncEnumerable ListWorkflowsAsync( @@ -115,6 +155,156 @@ public override async Task> StartWorkflowAsyn } } + /// + public override async Task> StartUpdateWithStartWorkflowAsync( + StartUpdateWithStartWorkflowInput input) + { + // Try to mark used before using + if (input.Options.StartWorkflowOperation == null) + { + throw new ArgumentException("Start workflow operation is required in options"); + } + if (!input.Options.StartWorkflowOperation.TryMarkUsed()) + { + throw new ArgumentException("Start operation already used"); + } + + // We choose to put everything in one large try statement because we want to make + // sure that any failures are also propagated to the waiter of the handle too, not + // just thrown out of here. + try + { + // Disallow some options in start that don't work here, and require others + if (input.Options.StartWorkflowOperation.Options.StartSignal != null || + input.Options.StartWorkflowOperation.Options.StartSignalArgs != null) + { + throw new ArgumentException("Cannot have start signal on update with start"); + } + if (input.Options.StartWorkflowOperation.Options.RequestEagerStart) + { + throw new ArgumentException("Cannot request eager start on update with start"); + } + if (input.Options.StartWorkflowOperation.Options.IdConflictPolicy == WorkflowIdConflictPolicy.Unspecified) + { + throw new ArgumentException("Workflow ID conflict policy required for update with start"); + } + if (input.Options.StartWorkflowOperation.Options.Rpc != null) + { + throw new ArgumentException("Cannot set RPC options on start options, set them on the update options"); + } + + // Build request + var startReq = await CreateStartWorkflowRequestAsync( + input.Options.StartWorkflowOperation.Workflow, + input.Options.StartWorkflowOperation.Args, + input.Options.StartWorkflowOperation.Options, + input.Options.StartWorkflowOperation.Headers).ConfigureAwait(false); + var updateReq = await CreateUpdateWorkflowRequestAsync( + input.Update, + input.Args, + input.Options, + input.Options.WaitForStage, + input.Headers).ConfigureAwait(false); + updateReq.WorkflowExecution = new() { WorkflowId = startReq.WorkflowId }; + var req = new ExecuteMultiOperationRequest() { Namespace = Client.Options.Namespace }; + req.Operations.Add( + new ExecuteMultiOperationRequest.Types.Operation() { StartWorkflow = startReq }); + req.Operations.Add( + new ExecuteMultiOperationRequest.Types.Operation() { UpdateWorkflow = updateReq }); + + // Continually try to start until an exception occurs, the user-asked stage is + // reached, or the stage is accepted. But we will set the workflow handle as soon as + // we can. + UpdateWorkflowExecutionResponse? updateResp = null; + string? runId = null; + do + { + var resp = await Client.Connection.WorkflowService.ExecuteMultiOperationAsync( + req, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false); + // Set start result if not already set + runId = resp.Responses[0].StartWorkflow.RunId; + if (!input.Options.StartWorkflowOperation.IsCompleted) + { + input.Options.StartWorkflowOperation.SetResult((WorkflowHandle)Activator.CreateInstance( + input.Options.StartWorkflowOperation.HandleType, + // Parameters on workflow handles are always: client, ID, run ID, + // result run ID, and first execution run ID. This code is tested to + // confirm. + Client, + input.Options.StartWorkflowOperation.Options.Id!, + null, + runId, + runId)!); + } + updateResp = resp.Responses[1].UpdateWorkflow; + } + while (updateResp == null || updateResp.Stage < UpdateWorkflowExecutionLifecycleStage.Accepted); + + // If the requested stage is completed, wait for result, but discard the update + // exception, that will come when _they_ call get result + var handle = new WorkflowUpdateHandle( + Client, updateReq.Request.Meta.UpdateId, input.Options.StartWorkflowOperation.Options.Id!, runId) + { KnownOutcome = updateResp.Outcome }; + if (input.Options.WaitForStage == WorkflowUpdateStage.Completed) + { + await handle.PollUntilOutcomeAsync(input.Options.Rpc).ConfigureAwait(false); + } + return handle; + } + catch (Exception e) + { + // If this is a multi-operation failure, set exception to the first non-aborted + if (e is RpcException rpcErr) + { + var status = rpcErr.GrpcStatus.Value; + if (status != null && status.Details.Count == 1) + { + if (status.Details[0].TryUnpack(out Api.ErrorDetails.V1.MultiOperationExecutionFailure failure)) + { + var nonAborted = failure.Statuses.FirstOrDefault(s => s.Details.Count == 0 || + !s.Details[0].Is(Api.Failure.V1.MultiOperationExecutionAborted.Descriptor)); + var grpcStatus = new GrpcStatus() { Code = nonAborted.Code, Message = nonAborted.Message }; + grpcStatus.Details.AddRange(nonAborted.Details); + e = new RpcException(grpcStatus); + } + } + } + + // If this is a cancellation, use the update cancel exception + if (e is OperationCanceledException || (e is RpcException rpcErr2 && ( + rpcErr2.Code == RpcException.StatusCode.DeadlineExceeded || + rpcErr2.Code == RpcException.StatusCode.Cancelled))) + { + e = new WorkflowUpdateRpcTimeoutOrCanceledException(e); + } + + // Create workflow-already-started failure if it is that + if (e is RpcException rpcErr3 && rpcErr3.Code == RpcException.StatusCode.AlreadyExists) + { + var status = rpcErr3.GrpcStatus.Value; + if (status != null && + status.Details.Count == 1 && + status.Details[0].TryUnpack(out Api.ErrorDetails.V1.WorkflowExecutionAlreadyStartedFailure failure)) + { + e = new WorkflowAlreadyStartedException( + e.Message, + // "" should never happen if it got an RPC exception + workflowId: input.Options.StartWorkflowOperation?.Options?.Id ?? "", + workflowType: input.Options.StartWorkflowOperation?.Workflow ?? "", + runId: failure.RunId); + } + } + + // Before we throw here, we want to try to set the start operation exception + // if it has not already been completed + if (input.Options.StartWorkflowOperation?.IsCompleted == false) + { + input.Options.StartWorkflowOperation.SetException(e); + } + throw e; + } + } + /// public override async Task SignalWorkflowAsync(SignalWorkflowInput input) { @@ -240,50 +430,14 @@ public async override Task> StartWorkflowUpdateAsy "Admitted is not an allowed wait stage to start workflow update"); } // Build request - var req = new UpdateWorkflowExecutionRequest() + var req = await CreateUpdateWorkflowRequestAsync( + input.Update, input.Args, input.Options, input.Options.WaitForStage, input.Headers).ConfigureAwait(false); + req.WorkflowExecution = new() { - Namespace = Client.Options.Namespace, - WorkflowExecution = new() - { - WorkflowId = input.Id, - RunId = input.RunId ?? string.Empty, - }, - Request = new() - { - Meta = new() - { - UpdateId = input.Options.Id ?? Guid.NewGuid().ToString(), - Identity = Client.Connection.Options.Identity, - }, - Input = new() { Name = input.Update }, - }, - WaitPolicy = new() - { - LifecycleStage = (UpdateWorkflowExecutionLifecycleStage)input.Options.WaitForStage, - }, - FirstExecutionRunId = input.FirstExecutionRunId ?? string.Empty, + WorkflowId = input.Id, + RunId = input.RunId ?? string.Empty, }; - if (input.Args.Count > 0) - { - req.Request.Input.Args = new Payloads(); - req.Request.Input.Args.Payloads_.AddRange( - await Client.Options.DataConverter.ToPayloadsAsync( - input.Args).ConfigureAwait(false)); - } - if (input.Headers != null) - { - req.Request.Input.Header = new(); - req.Request.Input.Header.Fields.Add(input.Headers); - // If there is a payload codec, use it to encode the headers - if (Client.Options.DataConverter.PayloadCodec is IPayloadCodec codec) - { - foreach (var kvp in req.Request.Input.Header.Fields) - { - req.Request.Input.Header.Fields[kvp.Key] = - await codec.EncodeSingleAsync(kvp.Value).ConfigureAwait(false); - } - } - } + req.FirstExecutionRunId = input.FirstExecutionRunId ?? string.Empty; // Continually try to start until the user-asked stage is reached or the stage is // accepted @@ -303,8 +457,7 @@ await Client.Options.DataConverter.ToPayloadsAsync( throw new WorkflowUpdateRpcTimeoutOrCanceledException(e); } } - while (resp.Stage < req.WaitPolicy.LifecycleStage && - resp.Stage < UpdateWorkflowExecutionLifecycleStage.Accepted); + while (resp.Stage < UpdateWorkflowExecutionLifecycleStage.Accepted); // If the requested stage is completed, wait for result, but discard the update // exception, that will come when _they_ call get result @@ -498,56 +651,121 @@ private async Task> StartWorkflowInternalAsyn { // We will build the non-signal-with-start request and convert to signal with start // later if needed + var req = await CreateStartWorkflowRequestAsync( + input.Workflow, input.Args, input.Options, input.Headers).ConfigureAwait(false); + + // If not signal with start, just run and return + if (input.Options.StartSignal == null) + { + if (input.Options.StartSignalArgs != null) + { + throw new ArgumentException("Cannot have start signal args without start signal"); + } + var resp = await Client.Connection.WorkflowService.StartWorkflowExecutionAsync( + req, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false); + return new WorkflowHandle( + Client: Client, + Id: req.WorkflowId, + ResultRunId: resp.RunId, + FirstExecutionRunId: resp.RunId); + } + + // Since it's signal with start, convert and run + var signalReq = new SignalWithStartWorkflowExecutionRequest() + { + Namespace = req.Namespace, + WorkflowId = req.WorkflowId, + WorkflowType = req.WorkflowType, + TaskQueue = req.TaskQueue, + Input = req.Input, + WorkflowExecutionTimeout = req.WorkflowExecutionTimeout, + WorkflowRunTimeout = req.WorkflowRunTimeout, + WorkflowTaskTimeout = req.WorkflowTaskTimeout, + Identity = req.Identity, + RequestId = req.RequestId, + WorkflowIdReusePolicy = req.WorkflowIdReusePolicy, + RetryPolicy = req.RetryPolicy, + CronSchedule = req.CronSchedule, + Memo = req.Memo, + SearchAttributes = req.SearchAttributes, + Header = req.Header, + WorkflowStartDelay = req.WorkflowStartDelay, + SignalName = input.Options.StartSignal, + WorkflowIdConflictPolicy = input.Options.IdConflictPolicy, + UserMetadata = req.UserMetadata, + }; + if (input.Options.StartSignalArgs != null && input.Options.StartSignalArgs.Count > 0) + { + signalReq.SignalInput = new Payloads(); + signalReq.SignalInput.Payloads_.AddRange( + await Client.Options.DataConverter.ToPayloadsAsync( + input.Options.StartSignalArgs).ConfigureAwait(false)); + } + var signalResp = await Client.Connection.WorkflowService.SignalWithStartWorkflowExecutionAsync( + signalReq, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false); + // Notice we do _not_ set first execution run ID for signal with start + return new WorkflowHandle( + Client: Client, + Id: req.WorkflowId, + ResultRunId: signalResp.RunId); + } + + private async Task CreateStartWorkflowRequestAsync( + string workflow, + IReadOnlyCollection args, + WorkflowOptions options, + IDictionary? headers) + { var req = new StartWorkflowExecutionRequest() { Namespace = Client.Options.Namespace, - WorkflowId = input.Options.Id ?? + WorkflowId = options.Id ?? throw new ArgumentException("ID required to start workflow"), - WorkflowType = new WorkflowType() { Name = input.Workflow }, + WorkflowType = new WorkflowType() { Name = workflow }, TaskQueue = new TaskQueue() { - Name = input.Options.TaskQueue ?? + Name = options.TaskQueue ?? throw new ArgumentException("Task queue required to start workflow"), }, Identity = Client.Connection.Options.Identity, RequestId = Guid.NewGuid().ToString(), - WorkflowIdReusePolicy = input.Options.IdReusePolicy, - WorkflowIdConflictPolicy = input.Options.IdConflictPolicy, - RetryPolicy = input.Options.RetryPolicy?.ToProto(), - RequestEagerExecution = input.Options.RequestEagerStart, + WorkflowIdReusePolicy = options.IdReusePolicy, + WorkflowIdConflictPolicy = options.IdConflictPolicy, + RetryPolicy = options.RetryPolicy?.ToProto(), + RequestEagerExecution = options.RequestEagerStart, UserMetadata = await Client.Options.DataConverter.ToUserMetadataAsync( - input.Options.StaticSummary, input.Options.StaticDetails). + options.StaticSummary, options.StaticDetails). ConfigureAwait(false), }; - if (input.Args.Count > 0) + if (args.Count > 0) { req.Input = new Payloads(); req.Input.Payloads_.AddRange(await Client.Options.DataConverter.ToPayloadsAsync( - input.Args).ConfigureAwait(false)); + args).ConfigureAwait(false)); } - if (input.Options.ExecutionTimeout != null) + if (options.ExecutionTimeout != null) { req.WorkflowExecutionTimeout = Duration.FromTimeSpan( - (TimeSpan)input.Options.ExecutionTimeout); + (TimeSpan)options.ExecutionTimeout); } - if (input.Options.RunTimeout != null) + if (options.RunTimeout != null) { req.WorkflowRunTimeout = Duration.FromTimeSpan( - (TimeSpan)input.Options.RunTimeout); + (TimeSpan)options.RunTimeout); } - if (input.Options.TaskTimeout != null) + if (options.TaskTimeout != null) { req.WorkflowTaskTimeout = Duration.FromTimeSpan( - (TimeSpan)input.Options.TaskTimeout); + (TimeSpan)options.TaskTimeout); } - if (input.Options.CronSchedule != null) + if (options.CronSchedule != null) { - req.CronSchedule = input.Options.CronSchedule; + req.CronSchedule = options.CronSchedule; } - if (input.Options.Memo != null && input.Options.Memo.Count > 0) + if (options.Memo != null && options.Memo.Count > 0) { req.Memo = new(); - foreach (var field in input.Options.Memo) + foreach (var field in options.Memo) { if (field.Value == null) { @@ -558,18 +776,18 @@ private async Task> StartWorkflowInternalAsyn await Client.Options.DataConverter.ToPayloadAsync(field.Value).ConfigureAwait(false)); } } - if (input.Options.TypedSearchAttributes != null && input.Options.TypedSearchAttributes.Count > 0) + if (options.TypedSearchAttributes != null && options.TypedSearchAttributes.Count > 0) { - req.SearchAttributes = input.Options.TypedSearchAttributes.ToProto(); + req.SearchAttributes = options.TypedSearchAttributes.ToProto(); } - if (input.Options.StartDelay is { } startDelay) + if (options.StartDelay is { } startDelay) { req.WorkflowStartDelay = Duration.FromTimeSpan(startDelay); } - if (input.Headers != null) + if (headers != null) { req.Header = new(); - req.Header.Fields.Add(input.Headers); + req.Header.Fields.Add(headers); // If there is a payload codec, use it to encode the headers if (Client.Options.DataConverter.PayloadCodec is IPayloadCodec codec) { @@ -580,61 +798,64 @@ private async Task> StartWorkflowInternalAsyn } } } + return req; + } - // If not signal with start, just run and return - if (input.Options.StartSignal == null) + private async Task CreateUpdateWorkflowRequestAsync( + string update, + IReadOnlyCollection args, + WorkflowUpdateOptions options, + WorkflowUpdateStage waitForStage, + IDictionary? headers) + { + if (waitForStage == WorkflowUpdateStage.None) { - if (input.Options.StartSignalArgs != null) - { - throw new ArgumentException("Cannot have start signal args without start signal"); - } - var resp = await Client.Connection.WorkflowService.StartWorkflowExecutionAsync( - req, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false); - return new WorkflowHandle( - Client: Client, - Id: req.WorkflowId, - ResultRunId: resp.RunId, - FirstExecutionRunId: resp.RunId); + throw new ArgumentException("WaitForStage is required to start workflow update"); } - - // Since it's signal with start, convert and run - var signalReq = new SignalWithStartWorkflowExecutionRequest() + else if (waitForStage == WorkflowUpdateStage.Admitted) { - Namespace = req.Namespace, - WorkflowId = req.WorkflowId, - WorkflowType = req.WorkflowType, - TaskQueue = req.TaskQueue, - Input = req.Input, - WorkflowExecutionTimeout = req.WorkflowExecutionTimeout, - WorkflowRunTimeout = req.WorkflowRunTimeout, - WorkflowTaskTimeout = req.WorkflowTaskTimeout, - Identity = req.Identity, - RequestId = req.RequestId, - WorkflowIdReusePolicy = req.WorkflowIdReusePolicy, - RetryPolicy = req.RetryPolicy, - CronSchedule = req.CronSchedule, - Memo = req.Memo, - SearchAttributes = req.SearchAttributes, - Header = req.Header, - WorkflowStartDelay = req.WorkflowStartDelay, - SignalName = input.Options.StartSignal, - WorkflowIdConflictPolicy = input.Options.IdConflictPolicy, - UserMetadata = req.UserMetadata, + throw new ArgumentException( + "Admitted is not an allowed wait stage to start workflow update"); + } + // Build request + var req = new UpdateWorkflowExecutionRequest() + { + Namespace = Client.Options.Namespace, + Request = new() + { + Meta = new() + { + UpdateId = options.Id ?? Guid.NewGuid().ToString(), + Identity = Client.Connection.Options.Identity, + }, + Input = new() { Name = update }, + }, + WaitPolicy = new() + { + LifecycleStage = (UpdateWorkflowExecutionLifecycleStage)waitForStage, + }, }; - if (input.Options.StartSignalArgs != null && input.Options.StartSignalArgs.Count > 0) + if (args.Count > 0) { - signalReq.SignalInput = new Payloads(); - signalReq.SignalInput.Payloads_.AddRange( - await Client.Options.DataConverter.ToPayloadsAsync( - input.Options.StartSignalArgs).ConfigureAwait(false)); + req.Request.Input.Args = new Payloads(); + req.Request.Input.Args.Payloads_.AddRange( + await Client.Options.DataConverter.ToPayloadsAsync(args).ConfigureAwait(false)); } - var signalResp = await Client.Connection.WorkflowService.SignalWithStartWorkflowExecutionAsync( - signalReq, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false); - // Notice we do _not_ set first execution run ID for signal with start - return new WorkflowHandle( - Client: Client, - Id: req.WorkflowId, - ResultRunId: signalResp.RunId); + if (headers != null) + { + req.Request.Input.Header = new(); + req.Request.Input.Header.Fields.Add(headers); + // If there is a payload codec, use it to encode the headers + if (Client.Options.DataConverter.PayloadCodec is IPayloadCodec codec) + { + foreach (var kvp in req.Request.Input.Header.Fields) + { + req.Request.Input.Header.Fields[kvp.Key] = + await codec.EncodeSingleAsync(kvp.Value).ConfigureAwait(false); + } + } + } + return req; } } } diff --git a/src/Temporalio/Client/WithStartWorkflowOperation.cs b/src/Temporalio/Client/WithStartWorkflowOperation.cs new file mode 100644 index 00000000..91034169 --- /dev/null +++ b/src/Temporalio/Client/WithStartWorkflowOperation.cs @@ -0,0 +1,234 @@ +#pragma warning disable SA1402 // We are ok with two types of the same name in the same file + +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Threading; +using System.Threading.Tasks; +using Temporalio.Api.Common.V1; + +namespace Temporalio.Client +{ + /// + /// Representation of a workflow start operation to be used in + /// and + /// for update with start calls. + /// + /// WARNING: Workflow update with start is experimental and APIs may change. + public abstract class WithStartWorkflowOperation : ICloneable + { + // Atomic integer used to check if used yet (0 if unused, 1 if used) + private int used; + + /// + /// Initializes a new instance of the class. + /// + /// Workflow type name. + /// Arguments for the workflow. + /// Workflow options. + internal WithStartWorkflowOperation( + string workflow, IReadOnlyCollection args, WorkflowOptions options) + { + Workflow = workflow; + Args = args; + Options = options; + } + + /// + /// Gets or sets the workflow type to start. + /// + public string Workflow { get; set; } + + /// + /// Gets or sets the workflow arguments. + /// + public IReadOnlyCollection Args { get; set; } + + /// + /// Gets or sets the workflow options. + /// + public WorkflowOptions Options { get; set; } + + /// + /// Gets or sets the workflow headers. + /// + /// + /// NOTE: Some interceptors may mutate the dictionary. + /// +#pragma warning disable CA2227 // We are ok allowing this field to be mutated as a whole + public IDictionary? Headers { get; set; } +#pragma warning restore CA2227 + + /// + /// Gets the workflow handle type. + /// + internal abstract Type HandleType { get; } + + /// + /// Gets a value indicating whether or not the result or exception has been set. + /// + internal abstract bool IsCompleted { get; } + + /// + /// Create a start workflow operation via lambda invoking the run method. + /// + /// Workflow class type. + /// Workflow result type. + /// Invocation of workflow run method with a result. + /// Start workflow options. Id, TaskQueue, and + /// IdConflictPolicy are required. StartSignal, StartSignalArgs, + /// RequestEagerStart, and Rpc are disallowed. + /// Start workflow operation. + /// WARNING: Workflow update with start is experimental and APIs may change. + /// + public static WithStartWorkflowOperation> Create( + Expression>> workflowRunCall, WorkflowOptions options) + { + var (runMethod, args) = Common.ExpressionUtil.ExtractCall(workflowRunCall); + return new( + Workflows.WorkflowDefinition.NameFromRunMethodForCall(runMethod), + args, + options); + } + + /// + /// Create a start workflow operation via lambda invoking the run method. + /// + /// Workflow class type. + /// Invocation of workflow run method. + /// Start workflow options. Id, TaskQueue, and + /// IdConflictPolicy are required. StartSignal, StartSignalArgs, + /// RequestEagerStart, and Rpc are disallowed. + /// Start workflow operation. + /// WARNING: Workflow update with start is experimental and APIs may change. + /// + public static WithStartWorkflowOperation> Create( + Expression> workflowRunCall, WorkflowOptions options) + { + var (runMethod, args) = Common.ExpressionUtil.ExtractCall(workflowRunCall); + return new( + Workflows.WorkflowDefinition.NameFromRunMethodForCall(runMethod), + args, + options); + } + + /// + /// Create a start workflow operation by name. + /// + /// Workflow type name. + /// Arguments for the workflow. + /// Start workflow options. Id, TaskQueue, and + /// IdConflictPolicy are required. StartSignal, StartSignalArgs, + /// RequestEagerStart, and Rpc are disallowed. + /// Start workflow operation. + /// WARNING: Workflow update with start is experimental and APIs may change. + /// + public static WithStartWorkflowOperation Create( + string workflow, IReadOnlyCollection args, WorkflowOptions options) => + new(workflow, args, options); + + /// + /// Create a shallow copy of this operation. + /// + /// A shallow copy of these options and any transitive options fields. But it will + /// not clone the arguments, headers, nor will it clone the underlying promise that is + /// fulfilled when the workflow has started. + /// + public abstract object Clone(); + + /// + /// If not used, mark as such and return true. Otherwise, return false. + /// + /// True if marked used, false if already done before. + internal bool TryMarkUsed() => Interlocked.CompareExchange(ref used, 1, 0) == 0; + + /// + /// Set a successful workflow handle result. + /// + /// Workflow handle. + internal abstract void SetResult(WorkflowHandle result); + + /// + /// Set a failure to start workflow. + /// + /// Failure to start workflow. + internal abstract void SetException(Exception exception); + } + + /// + /// Representation of a workflow start operation to be used in + /// and + /// for update with start calls. + /// + /// Workflow handle type. + /// WARNING: Workflow update with start is experimental and APIs may change. + public sealed class WithStartWorkflowOperation : WithStartWorkflowOperation + where THandle : WorkflowHandle + { + private readonly TaskCompletionSource handleCompletionSource = new(); + + /// + /// Initializes a new instance of the class. + /// + /// Workflow type name. + /// Arguments for the workflow. + /// Workflow options. + internal WithStartWorkflowOperation( + string workflow, IReadOnlyCollection args, WorkflowOptions options) + : this(workflow, args, options, new()) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Workflow type name. + /// Arguments for the workflow. + /// Workflow options. + /// Task completion source for the handle. + internal WithStartWorkflowOperation( + string workflow, + IReadOnlyCollection args, + WorkflowOptions options, + TaskCompletionSource handleCompletionSource) + : base(workflow, args, options) + { + this.handleCompletionSource = handleCompletionSource; + } + + /// + internal override Type HandleType => typeof(THandle); + + /// + internal override bool IsCompleted => handleCompletionSource.Task.IsCompleted; + + /// + public override object Clone() => new WithStartWorkflowOperation( + workflow: Workflow, + args: Args, + options: (WorkflowOptions)Options.Clone(), + handleCompletionSource: handleCompletionSource); + + /// + /// Get the started workflow handle, waiting if needed. The method call directly will never + /// fail, but the task may with exceptions documented below. + /// + /// Task for the successfully completed handle. + /// Invalid run call or options. + /// + /// Workflow was already started according to ID reuse and conflict policy. + /// + /// Server-side error. + /// WARNING: Workflow update with start is experimental and APIs may change. + /// + public Task GetHandleAsync() => handleCompletionSource.Task; + + /// + internal override void SetResult(WorkflowHandle result) => + handleCompletionSource.SetResult((THandle)result); + + /// + internal override void SetException(Exception exception) => + handleCompletionSource.SetException(exception); + } +} \ No newline at end of file diff --git a/src/Temporalio/Client/WorkflowStartUpdateWithStartOptions.cs b/src/Temporalio/Client/WorkflowStartUpdateWithStartOptions.cs new file mode 100644 index 00000000..06e9f96e --- /dev/null +++ b/src/Temporalio/Client/WorkflowStartUpdateWithStartOptions.cs @@ -0,0 +1,44 @@ +namespace Temporalio.Client +{ + /// + /// Options for executing an update with start. + /// + /// NOTE: and + /// are both required. + public class WorkflowStartUpdateWithStartOptions : WorkflowUpdateWithStartOptions + { + /// + /// Initializes a new instance of the class. + /// + /// NOTE: and + /// are both required. + public WorkflowStartUpdateWithStartOptions() + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Workflow start operation. + /// Stage to wait for. + public WorkflowStartUpdateWithStartOptions( + WithStartWorkflowOperation startWorkflowOperation, WorkflowUpdateStage waitForStage) + : base(startWorkflowOperation) => WaitForStage = waitForStage; + + /// + /// Initializes a new instance of the class. + /// + /// Update ID. + /// Workflow start operation. + /// Stage to wait for. + public WorkflowStartUpdateWithStartOptions( + string id, WithStartWorkflowOperation startWorkflowOperation, WorkflowUpdateStage waitForStage) + : base(id, startWorkflowOperation) => WaitForStage = waitForStage; + + /// + /// Gets or sets the stage to wait for on start. This is required and cannot be set to + /// None or Admitted at this time. + /// + public WorkflowUpdateStage WaitForStage { get; set; } + } +} \ No newline at end of file diff --git a/src/Temporalio/Client/WorkflowUpdateWithStartOptions.cs b/src/Temporalio/Client/WorkflowUpdateWithStartOptions.cs new file mode 100644 index 00000000..43e7687c --- /dev/null +++ b/src/Temporalio/Client/WorkflowUpdateWithStartOptions.cs @@ -0,0 +1,52 @@ +namespace Temporalio.Client +{ + /// + /// Options for executing an update with start. + /// + /// NOTE: is required. + public class WorkflowUpdateWithStartOptions : WorkflowUpdateOptions + { + /// + /// Initializes a new instance of the class. + /// + /// NOTE: is required. + public WorkflowUpdateWithStartOptions() + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Workflow start operation. + public WorkflowUpdateWithStartOptions(WithStartWorkflowOperation startWorkflowOperation) => + StartWorkflowOperation = startWorkflowOperation; + + /// + /// Initializes a new instance of the class. + /// + /// Update ID. + /// Workflow start operation. + public WorkflowUpdateWithStartOptions(string id, WithStartWorkflowOperation startWorkflowOperation) + : base(id) => StartWorkflowOperation = startWorkflowOperation; + + /// + /// Gets or sets the workflow start operation. + /// + /// NOTE: This is required. + public WithStartWorkflowOperation? StartWorkflowOperation { get; set; } + + /// + /// Create a shallow copy of these options including the start operation. + /// + /// A shallow copy of these options and any transitive options fields. + public override object Clone() + { + var copy = (WorkflowUpdateWithStartOptions)base.Clone(); + if (StartWorkflowOperation is { } startOperation) + { + copy.StartWorkflowOperation = (WithStartWorkflowOperation)startOperation.Clone(); + } + return copy; + } + } +} \ No newline at end of file diff --git a/src/Temporalio/Exceptions/RpcException.cs b/src/Temporalio/Exceptions/RpcException.cs index c406ce44..3dda307f 100644 --- a/src/Temporalio/Exceptions/RpcException.cs +++ b/src/Temporalio/Exceptions/RpcException.cs @@ -29,6 +29,18 @@ public RpcException(StatusCode code, string message, byte[]? rawStatus) }); } + /// + /// Initializes a new instance of the class. + /// + /// gRPC status to derive values from. + internal RpcException(GrpcStatus grpcStatus) + : base(grpcStatus.Message) + { + Code = (StatusCode)grpcStatus.Code; + RawStatus = null; + GrpcStatus = new(() => grpcStatus); + } + /// /// gRPC status code taken from /// https://github.com/grpc/grpc-dotnet/blob/master/src/Grpc.Core.Api/StatusCode.cs. @@ -140,7 +152,7 @@ public enum StatusCode public StatusCode Code { get; private init; } /// - /// Gets the gRPC status message as a protobuf. + /// Gets the gRPC status message as a protobuf. This may not be set even if GrpcStatus is. /// public byte[]? RawStatus { get; private init; } diff --git a/tests/Temporalio.Tests/Extensions/OpenTelemetry/TracingInterceptorTests.cs b/tests/Temporalio.Tests/Extensions/OpenTelemetry/TracingInterceptorTests.cs index 1dc4d2b0..6caa7f39 100644 --- a/tests/Temporalio.Tests/Extensions/OpenTelemetry/TracingInterceptorTests.cs +++ b/tests/Temporalio.Tests/Extensions/OpenTelemetry/TracingInterceptorTests.cs @@ -8,6 +8,7 @@ namespace Temporalio.Tests.Extensions.OpenTelemetry; using global::OpenTelemetry.Trace; using Microsoft.Extensions.Logging; using Temporalio.Activities; +using Temporalio.Api.Enums.V1; using Temporalio.Client; using Temporalio.Common; using Temporalio.Exceptions; @@ -644,6 +645,89 @@ await Assert.ThrowsAsync(() => Tags: workflowRunTags)); } + [Fact] + public async Task TracingInterceptor_SignalWithStart_HaveProperSpans() + { + var activities = await WithTracingWorkerAsync(async (client, worker) => + { + var twoSignalWait = new TracingWorkflowParam( + new TracingWorkflowAction[] { new(WaitUntilSignalCount: 2) }); + + // Signal with start new + var options = new WorkflowOptions(id: $"wf-{Guid.NewGuid()}", worker.Options.TaskQueue!); + options.SignalWithStart((TracingWorkflow wf) => wf.Signal1Async()); + var handle = await client.StartWorkflowAsync( + (TracingWorkflow wf) => wf.RunAsync(twoSignalWait), + options); + + // Wait first task done + await AssertMore.HasEventEventuallyAsync( + handle, e => e.WorkflowTaskCompletedEventAttributes != null); + + // Go again + options = new WorkflowOptions(id: handle.Id, worker.Options.TaskQueue!); + options.SignalWithStart((TracingWorkflow wf) => wf.Signal2Async()); + await client.ExecuteWorkflowAsync( + (TracingWorkflow wf) => wf.RunAsync(twoSignalWait), + options); + }); + AssertActivities( + activities, + ActivityAssertion.NameAndParent( + "SignalWithStartWorkflow:TracingWorkflow", null), + ActivityAssertion.NameAndParent( + "HandleSignal:Signal1", "SignalWithStartWorkflow:TracingWorkflow"), + ActivityAssertion.NameAndParent( + "HandleSignal:Signal2", "SignalWithStartWorkflow:TracingWorkflow"), + ActivityAssertion.NameAndParent( + "RunWorkflow:TracingWorkflow", "SignalWithStartWorkflow:TracingWorkflow"), + ActivityAssertion.NameAndParent( + "CompleteWorkflow:TracingWorkflow", "RunWorkflow:TracingWorkflow"), + // There's actually a second one of these + ActivityAssertion.NameAndParent( + "SignalWithStartWorkflow:TracingWorkflow", null)); + } + + [Fact] + public async Task TracingInterceptor_UpdateWithStart_HaveProperSpans() + { + var activities = await WithTracingWorkerAsync(async (client, worker) => + { + var twoUpdateWait = new TracingWorkflowParam( + new TracingWorkflowAction[] { new(WaitUntilUpdateCount: 2) }); + + // Update with start new + var id = $"wf-{Guid.NewGuid()}"; + await client.ExecuteUpdateWithStartWorkflowAsync( + (TracingWorkflow wf) => wf.Update1Async(), + new(WithStartWorkflowOperation.Create( + (TracingWorkflow wf) => wf.RunAsync(twoUpdateWait), + new(id, worker.Options.TaskQueue!) { IdConflictPolicy = WorkflowIdConflictPolicy.Fail }))); + + // And again + await client.ExecuteUpdateWithStartWorkflowAsync( + (TracingWorkflow wf) => wf.Update2Async(), + new(WithStartWorkflowOperation.Create( + (TracingWorkflow wf) => wf.RunAsync(twoUpdateWait), + new(id, worker.Options.TaskQueue!) { IdConflictPolicy = WorkflowIdConflictPolicy.UseExisting }))); + }); + AssertActivities( + activities, + ActivityAssertion.NameAndParent( + "UpdateWithStartWorkflow:TracingWorkflow", null), + ActivityAssertion.NameAndParent( + "HandleUpdate:Update1", "UpdateWithStartWorkflow:TracingWorkflow"), + ActivityAssertion.NameAndParent( + "HandleUpdate:Update2", "UpdateWithStartWorkflow:TracingWorkflow"), + ActivityAssertion.NameAndParent( + "RunWorkflow:TracingWorkflow", "UpdateWithStartWorkflow:TracingWorkflow"), + ActivityAssertion.NameAndParent( + "CompleteWorkflow:TracingWorkflow", "RunWorkflow:TracingWorkflow"), + // There's actually a second one of these + ActivityAssertion.NameAndParent( + "UpdateWithStartWorkflow:TracingWorkflow", null)); + } + private static void AssertActivities( IReadOnlyCollection activities, params ActivityAssertion[] assertions) { @@ -694,6 +778,37 @@ private static string DumpActivity(Activity activity, int IndentDepth = 0) Func, Task>? afterStart = null, bool expectFail = false, bool terminate = false) + { + WorkflowHandle? handle = null; + var activities = await WithTracingWorkerAsync(async (client, worker) => + { + // Start + var options = new WorkflowOptions(id: $"wf-{Guid.NewGuid()}", worker.Options.TaskQueue!); + handle = await client.StartWorkflowAsync((TracingWorkflow wf) => wf.RunAsync(param), options); + + // Run after-start, then wait for complete + if (afterStart != null) + { + await afterStart.Invoke(handle); + } + if (terminate) + { + await handle.TerminateAsync(); + } + if (expectFail) + { + await Assert.ThrowsAsync(() => handle.GetResultAsync()); + } + else + { + await handle.GetResultAsync(); + } + }); + return (handle!, activities); + } + + private async Task> WithTracingWorkerAsync( + Func run) { var activities = new List(); @@ -717,35 +832,11 @@ private static string DumpActivity(Activity activity, int IndentDepth = 0) AddAllActivities(null). AddWorkflow(); using var worker = new TemporalWorker(client, workerOptions); - return await worker.ExecuteAsync(async () => - { - // Start - var handle = await client.StartWorkflowAsync( - (TracingWorkflow wf) => wf.RunAsync(param), - new(id: $"wf-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); - - // Run after-start, then wait for complete - if (afterStart != null) - { - await afterStart.Invoke(handle); - } - if (terminate) - { - await handle.TerminateAsync(); - } - if (expectFail) - { - await Assert.ThrowsAsync(() => handle.GetResultAsync()); - } - else - { - await handle.GetResultAsync(); - } - logger.LogDebug( - "Activities:\n{Activities}", - string.Join("\n", DumpActivities(activities))); - return (handle, activities); - }); + await worker.ExecuteAsync(() => run(client, worker)); + logger.LogDebug( + "Activities:\n{Activities}", + string.Join("\n", DumpActivities(activities))); + return activities; } public record ActivityAssertion( @@ -754,15 +845,26 @@ public record ActivityAssertion( Action>[]? Tags = null, Action[]? Events = null, Action[]? Links = null, - bool IgnoreLinks = false) + bool IgnoreLinks = false, + bool IgnoreTags = false, + bool IgnoreEvents = false) { + public static ActivityAssertion NameAndParent(string name, string? parent) => new( + Name: name, Parent: parent, IgnoreLinks: true, IgnoreTags: true, IgnoreEvents: true); + public void AssertActivity(IReadOnlyCollection activities, Activity activity) { Assert.Equal(Name, activity.OperationName); Activity? parent = activities.SingleOrDefault(a => a.SpanId == activity.ParentSpanId); Assert.Equal(Parent, parent?.OperationName); - AssertMore.Every(activity.Tags, Tags ?? Array.Empty>>()); - AssertMore.Every(activity.Events, Events ?? Array.Empty>()); + if (!IgnoreTags) + { + AssertMore.Every(activity.Tags, Tags ?? Array.Empty>>()); + } + if (!IgnoreEvents) + { + AssertMore.Every(activity.Events, Events ?? Array.Empty>()); + } if (!IgnoreLinks) { AssertMore.Every(activity.Links, Links ?? Array.Empty>()); @@ -794,6 +896,7 @@ public class TracingWorkflow public static readonly ActivitySource CustomSource = new("MyCustomSource"); private int signalCount; + private int updateCount; [WorkflowRun] public async Task RunAsync(TracingWorkflowParam param) @@ -872,6 +975,10 @@ public async Task RunAsync(TracingWorkflowParam param) { CustomSource.TrackWorkflowDiagnosticActivity(action.CreateCustomActivity).Dispose(); } + if (action.WaitUntilUpdateCount > 0) + { + await Workflow.WaitConditionAsync(() => updateCount >= action.WaitUntilUpdateCount); + } } } @@ -932,6 +1039,12 @@ public async Task UpdateValidatorFailureAsync(string msg) => public void ValidateUpdateValidatorFailure(string msg) => throw new ApplicationFailureException(msg); + [WorkflowUpdate] + public async Task Update1Async() => updateCount++; + + [WorkflowUpdate] + public async Task Update2Async() => updateCount++; + private static async Task RaiseOnNonReplayAsync(string msg) { var replaying = Workflow.Unsafe.IsReplaying; @@ -970,7 +1083,8 @@ public record TracingWorkflowAction( TracingWorkflowActionActivity? Activity = null, TracingWorkflowActionContinueAsNew? ContinueAsNew = null, int WaitUntilSignalCount = 0, - string? CreateCustomActivity = null); + string? CreateCustomActivity = null, + int WaitUntilUpdateCount = 0); public record TracingWorkflowActionChildWorkflow( TracingWorkflowParam Param, diff --git a/tests/Temporalio.Tests/GeneralTests.cs b/tests/Temporalio.Tests/GeneralTests.cs index b0dc3058..596342be 100644 --- a/tests/Temporalio.Tests/GeneralTests.cs +++ b/tests/Temporalio.Tests/GeneralTests.cs @@ -26,6 +26,13 @@ t.Namespace is { } ns && Assert.Contains(typeof(Temporalio.Client.Schedules.ScheduleListOptions), types); foreach (var type in types) { + // Exclude with start workflow operation which, while cloneable, is not considered an + // "options" object that should be instantiated with a parameterless constructor + if (typeof(Temporalio.Client.WithStartWorkflowOperation).IsAssignableFrom(type)) + { + continue; + } + // Instantiate and attempt clone var noParamConstructor = type.GetConstructors(). First(c => c.GetParameters().All(p => p.IsOptional)); diff --git a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs index 2b18ff76..a087e48d 100644 --- a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs +++ b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs @@ -6213,6 +6213,151 @@ await ExecuteWorkerAsync(async worker => }); } + [Workflow] + public class UpdateWithStartWorkflow + { + private bool finishWaiting; + + [WorkflowRun] + public async Task RunAsync(int initialIncrement) + { + Counter += initialIncrement; + await Workflow.WaitConditionAsync(() => false); + } + + [WorkflowUpdate] + public async Task IncrementCounter(int value) => Counter += value; + + [WorkflowQuery] + public int Counter { get; set; } + + [WorkflowUpdate] + public async Task FailAsync() => throw new ApplicationFailureException("Intentional failure"); + + [WorkflowUpdate] + public Task StartWaitingAsync() => Workflow.WaitConditionAsync(() => finishWaiting); + + [WorkflowUpdate] + public async Task FinishWaitingAsync() => finishWaiting = true; + } + + [Fact] + public async Task ExecuteWorkflowAsync_UpdateWithStart_Simple() + { + await ExecuteWorkerAsync(async worker => + { + // Newly started + var id = $"workflow-{Guid.NewGuid()}"; + var startOp = WithStartWorkflowOperation.Create( + (UpdateWithStartWorkflow wf) => wf.RunAsync(123), + new(id: id, taskQueue: worker.Options.TaskQueue!) + { + IdConflictPolicy = WorkflowIdConflictPolicy.Fail, + }); + await Client.ExecuteUpdateWithStartWorkflowAsync( + (UpdateWithStartWorkflow wf) => wf.IncrementCounter(456), + new(startOp)); + + // Confirm counter + var handle = await startOp.GetHandleAsync(); + Assert.Equal(579, await handle.QueryAsync(wf => wf.Counter)); + + // Update with start 5 more times + foreach (var ignore in Enumerable.Range(0, 5)) + { + await Client.ExecuteUpdateWithStartWorkflowAsync( + (UpdateWithStartWorkflow wf) => wf.IncrementCounter(2), + new(WithStartWorkflowOperation.Create( + (UpdateWithStartWorkflow wf) => wf.RunAsync(10000), + new(id: id, taskQueue: worker.Options.TaskQueue!) + { + IdConflictPolicy = WorkflowIdConflictPolicy.UseExisting, + }))); + } + // Confirm 10 (i.e. 5 * 2) was added + Assert.Equal(589, await handle.QueryAsync(wf => wf.Counter)); + + // Confirm we get an already exists on both the start op and call if we set fail + // existing + startOp = WithStartWorkflowOperation.Create( + (UpdateWithStartWorkflow wf) => wf.RunAsync(123), + new(id: id, taskQueue: worker.Options.TaskQueue!) + { + IdConflictPolicy = WorkflowIdConflictPolicy.Fail, + }); + await Assert.ThrowsAsync(() => + Client.ExecuteUpdateWithStartWorkflowAsync( + (UpdateWithStartWorkflow wf) => wf.IncrementCounter(456), new(startOp))); + await Assert.ThrowsAsync(() => startOp.GetHandleAsync()); + }); + } + + [Fact] + public async Task ExecuteWorkflowAsync_UpdateWithStart_UpdateFailure() + { + await ExecuteWorkerAsync(async worker => + { + // Update failed but workflow started + var id = $"workflow-{Guid.NewGuid()}"; + var startOp = WithStartWorkflowOperation.Create( + (UpdateWithStartWorkflow wf) => wf.RunAsync(123), + new(id: id, taskQueue: worker.Options.TaskQueue!) + { + IdConflictPolicy = WorkflowIdConflictPolicy.Fail, + }); + var err = await Assert.ThrowsAsync(() => + Client.ExecuteUpdateWithStartWorkflowAsync( + (UpdateWithStartWorkflow wf) => wf.FailAsync(), + new(startOp))); + var appErr = Assert.IsType(err.InnerException); + Assert.Equal("Intentional failure", appErr.Message); + var handle = await startOp.GetHandleAsync(); + Assert.Equal(123, await handle.QueryAsync(wf => wf.Counter)); + }); + } + + [Fact] + public async Task ExecuteWorkflowAsync_UpdateWithStart_Cancellation() + { + await ExecuteWorkerAsync(async worker => + { + // Start in background with cancel token + var id = $"workflow-{Guid.NewGuid()}"; + var startOp = WithStartWorkflowOperation.Create( + (UpdateWithStartWorkflow wf) => wf.RunAsync(123), + new(id: id, taskQueue: worker.Options.TaskQueue!) + { + IdConflictPolicy = WorkflowIdConflictPolicy.Fail, + }); + using var cancelSource = new CancellationTokenSource(); + var updateTask = Task.Run(() => Client.ExecuteUpdateWithStartWorkflowAsync( + (UpdateWithStartWorkflow wf) => wf.StartWaitingAsync(), + new(startOp) { Rpc = new() { CancellationToken = cancelSource.Token } })); + + // Wait until workflow ID exists + await AssertMore.EventuallyAsync(async () => + { + try + { + await Client.GetWorkflowHandle(id).DescribeAsync(); + } + catch (RpcException e) when (e.Code == RpcException.StatusCode.NotFound) + { + Assert.Fail("Not found"); + } + }); + + // Now cancel token and confirm update cancellation results in proper cancellation + cancelSource.Cancel(); + await Assert.ThrowsAsync( + () => updateTask); + // Note, currently in this use case the handle isn't set either, the same exception + // appears for the start call + await Assert.ThrowsAsync( + () => startOp.GetHandleAsync()); + }); + } + internal static Task AssertTaskFailureContainsEventuallyAsync( WorkflowHandle handle, string messageContains) { diff --git a/tests/Temporalio.Tests/WorkflowEnvironment.cs b/tests/Temporalio.Tests/WorkflowEnvironment.cs index ec0363f0..959316e6 100644 --- a/tests/Temporalio.Tests/WorkflowEnvironment.cs +++ b/tests/Temporalio.Tests/WorkflowEnvironment.cs @@ -72,6 +72,9 @@ public async Task InitializeAsync() "worker.buildIdScavengerEnabled=true", "--dynamic-config-value", $"limit.historyCount.suggestContinueAsNew={ContinueAsNewSuggestedHistoryCount}", + // Enable multi-op + "--dynamic-config-value", + "frontend.enableExecuteMultiOperation=true", }, }, });