Skip to content

Commit

Permalink
Support Nexus operation complete before start (#6821)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Added support for completing Nexus operations before the StartOperation
response is received. Operation ID, StartTime, and StartLinks were added
to the completion request so that a NexusOperationStarted event can be
fabricated.
Depends on nexus-rpc/sdk-go#26 and
temporalio/sdk-go#1710

Bumps Temporal SDK version to `v1.30.1`
Bumps Nexus SDK version to `v0.0.12`

## Why?
<!-- Tell your future self why you made these changes -->
Better user experience.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
New functional test
  • Loading branch information
pdoerner authored Nov 27, 2024
1 parent 1045b78 commit 17f99c0
Show file tree
Hide file tree
Showing 9 changed files with 2,411 additions and 2,047 deletions.
4,097 changes: 2,068 additions & 2,029 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

57 changes: 56 additions & 1 deletion components/nexusoperations/completion.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.temporal.io/api/serviceerror"
commonnexus "go.temporal.io/server/common/nexus"
"go.temporal.io/server/service/history/hsm"
"google.golang.org/protobuf/types/known/timestamppb"
)

func handleSuccessfulOperationResult(
Expand Down Expand Up @@ -127,11 +128,62 @@ func handleUnsuccessfulOperationError(
}
}

// fabricateStartedEventIfMissing handles a completion that is received before the start response.
//
// When the started transition is still possible for the operation stored on node, it:
//  1. decodes the scheduled event ID from the operation's scheduled event token,
//  2. records operationID on the operation,
//  3. adds a NEXUS_OPERATION_STARTED history event carrying the scheduled event ID, operationID,
//     requestID and links (a non-nil startTime is assigned as the event time in the event callback),
//  4. applies the started transition using the added event's time and attributes, moving the
//     operation state machine to NEXUS_OPERATION_STATE_STARTED.
//
// When the started transition is not possible the node is left untouched and nil is returned.
// Any error from loading the machine data, decoding the token, or applying the transition is
// returned as-is.
func fabricateStartedEventIfMissing(
	node *hsm.Node,
	requestID string,
	operationID string,
	startTime *timestamppb.Timestamp,
	links []*commonpb.Link,
) error {
	operation, err := hsm.MachineData[Operation](node)
	if err != nil {
		return err
	}

	// Nothing to fabricate unless the started transition can still be applied.
	if !TransitionStarted.Possible(operation) {
		return nil
	}

	scheduledEventID, err := hsm.EventIDFromToken(operation.ScheduledEventToken)
	if err != nil {
		return err
	}

	operation.OperationId = operationID

	// Callback that fills in the fabricated started event.
	populateStarted := func(e *historypb.HistoryEvent) {
		e.Attributes = &historypb.HistoryEvent_NexusOperationStartedEventAttributes{
			NexusOperationStartedEventAttributes: &historypb.NexusOperationStartedEventAttributes{
				ScheduledEventId: scheduledEventID,
				OperationId:      operationID,
				RequestId:        requestID,
			},
		}
		e.Links = links
		// Only assign the event time when the completion supplied a start time.
		if startTime != nil {
			e.EventTime = startTime
		}
	}
	startedEvent := node.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_STARTED, populateStarted)

	_, err = TransitionStarted.Apply(operation, EventStarted{
		Time:       startedEvent.EventTime.AsTime(),
		Node:       node,
		Attributes: startedEvent.GetNexusOperationStartedEventAttributes(),
	})
	return err
}

func CompletionHandler(
ctx context.Context,
env hsm.Environment,
ref hsm.Ref,
requestID string,
operationID string,
startTime *timestamppb.Timestamp,
links []*commonpb.Link,
result *commonpb.Payload,
opFailedError *nexus.UnsuccessfulOperationError,
) error {
Expand All @@ -142,6 +194,9 @@ func CompletionHandler(
if err := node.CheckRunning(); err != nil {
return serviceerror.NewNotFound("operation not found")
}
if err := fabricateStartedEventIfMissing(node, requestID, operationID, startTime, links); err != nil {
return err
}
err := hsm.MachineTransition(node, func(operation Operation) (hsm.TransitionOutput, error) {
if requestID != "" && operation.RequestId != requestID {
isRetryableNotFoundErr = false
Expand All @@ -162,7 +217,7 @@ func CompletionHandler(
if errors.As(err, new(*serviceerror.NotFound)) && isRetryableNotFoundErr && ref.WorkflowKey.RunID != "" {
// Try again without a run ID in case the original run was reset.
ref.WorkflowKey.RunID = ""
return CompletionHandler(ctx, env, ref, requestID, result, opFailedError)
return CompletionHandler(ctx, env, ref, requestID, operationID, startTime, links, result, opFailedError)
}
return err
}
33 changes: 31 additions & 2 deletions components/nexusoperations/frontend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/nexus-rpc/sdk-go/nexus"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/sdk/temporalnexus"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/authorization"
"go.temporal.io/server/common/cluster"
Expand All @@ -59,6 +60,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

var apiName = configs.CompleteNexusOperation
Expand Down Expand Up @@ -176,9 +178,36 @@ func (h *completionHandler) CompleteOperation(ctx context.Context, r *nexus.Comp
)
return nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid callback token")
}
var links []*commonpb.Link
for _, nexusLink := range r.StartLinks {
switch nexusLink.Type {
case string((&commonpb.Link_WorkflowEvent{}).ProtoReflect().Descriptor().FullName()):
link, err := temporalnexus.ConvertNexusLinkToLinkWorkflowEvent(nexusLink)
if err != nil {
// TODO(rodrigozhou): links are non-essential for the execution of the workflow,
// so ignoring the error for now; we will revisit how to handle these errors later.
h.Logger.Warn(
fmt.Sprintf("failed to parse link to %q: %s", nexusLink.Type, nexusLink.URL),
tag.Error(err),
)
continue
}
links = append(links, &commonpb.Link{
Variant: &commonpb.Link_WorkflowEvent_{
WorkflowEvent: link,
},
})
default:
// If the link data type is unsupported, just ignore it for now.
h.Logger.Warn(fmt.Sprintf("invalid link data type: %q", nexusLink.Type))
}
}
hr := &historyservice.CompleteNexusOperationRequest{
Completion: completion,
State: string(r.State),
Completion: completion,
State: string(r.State),
OperationId: r.OperationID,
StartTime: timestamppb.New(r.StartTime),
Links: links,
}
switch r.State { // nolint:exhaustive
case nexus.OperationStateFailed, nexus.OperationStateCanceled:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/jstemmer/go-junit-report/v2 v2.1.0
github.com/lib/pq v1.10.9
github.com/mitchellh/mapstructure v1.5.0
github.com/nexus-rpc/sdk-go v0.0.11
github.com/nexus-rpc/sdk-go v0.0.12
github.com/olekukonko/tablewriter v0.0.5
github.com/olivere/elastic/v7 v7.0.32
github.com/pborman/uuid v1.2.1
Expand Down Expand Up @@ -57,7 +57,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.31.0
go.opentelemetry.io/otel/trace v1.31.0
go.temporal.io/api v1.41.1-0.20241106230151-fd920b39031f
go.temporal.io/sdk v1.30.0
go.temporal.io/sdk v1.30.1
go.temporal.io/version v0.3.0
go.uber.org/automaxprocs v1.5.3
go.uber.org/fx v1.22.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/nexus-rpc/sdk-go v0.0.11 h1:qH3Us3spfp50t5ca775V1va2eE6z1zMQDZY4mvbw0CI=
github.com/nexus-rpc/sdk-go v0.0.11/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ=
github.com/nexus-rpc/sdk-go v0.0.12 h1:Bsjo3aKIaApgi/eohhzufwrAeK/sEphcbeZM1Z7S/nI=
github.com/nexus-rpc/sdk-go v0.0.12/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
Expand Down Expand Up @@ -332,8 +332,8 @@ go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeX
go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
go.temporal.io/api v1.41.1-0.20241106230151-fd920b39031f h1:dJMVbAKIhAOYNixV9PgnVevYnFyTngW/uG7gY/SZYLA=
go.temporal.io/api v1.41.1-0.20241106230151-fd920b39031f/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.30.0 h1:7jzSFZYk+tQ2kIYEP+dvrM7AW9EsCEP52JHCjVGuwbI=
go.temporal.io/sdk v1.30.0/go.mod h1:Pv45F/fVDgWKx+jhix5t/dGgqROVaI+VjPLd3CHWqq0=
go.temporal.io/sdk v1.30.1 h1:4wgfSjwuaayQl9Q0mUzpNV6w55TPAESSroR6Z5lE49o=
go.temporal.io/sdk v1.30.1/go.mod h1:hNCZzd6dt7bxD9B4AECQgjHTd2NrzjdmGDbbv4xHuFU=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,12 @@ message CompleteNexusOperationRequest {
// Operation failure, only set if state != successful.
temporal.api.nexus.v1.Failure failure = 4;
}
// Operation ID - used when the completion is received before the started response.
string operation_id = 5;
// Time the operation was started. Used when completion is received before the started response.
google.protobuf.Timestamp start_time = 6;
// Links to be attached to a fabricated start event if completion is received before started response.
repeated temporal.api.common.v1.Link links = 7;
}

message CompleteNexusOperationResponse {
Expand Down
3 changes: 3 additions & 0 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2340,6 +2340,9 @@ func (h *Handler) CompleteNexusOperation(ctx context.Context, request *historyse
engine.StateMachineEnvironment(metrics.OperationTag(metrics.HistoryCompleteNexusOperationScope)),
ref,
request.Completion.RequestId,
request.OperationId,
request.StartTime,
request.Links,
request.GetSuccess(),
opErr,
)
Expand Down
39 changes: 31 additions & 8 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
updatepb "go.temporal.io/api/update/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/temporalnexus"
clockspb "go.temporal.io/server/api/clock/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
historyspb "go.temporal.io/server/api/history/v1"
Expand Down Expand Up @@ -584,6 +585,18 @@ func (ms *MutableStateImpl) GetNexusCompletion(ctx context.Context) (nexus.Opera
if err != nil {
return nil, err
}
// Create the link information about the workflow to be attached to fabricated started event if completion is
// received before start response.
startLink := temporalnexus.ConvertLinkWorkflowEventToNexusLink(&commonpb.Link_WorkflowEvent{
Namespace: ms.namespaceEntry.Name().String(),
WorkflowId: ms.executionInfo.WorkflowId,
RunId: ms.executionState.RunId,
Reference: &commonpb.Link_WorkflowEvent_EventRef{
EventRef: &commonpb.Link_WorkflowEvent_EventReference{
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
},
},
})
switch ce.GetEventType() {
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
payloads := ce.GetWorkflowExecutionCompletedEventAttributes().GetResult().GetPayloads()
Expand All @@ -596,6 +609,8 @@ func (ms *MutableStateImpl) GetNexusCompletion(ctx context.Context) (nexus.Opera
}
completion, err := nexus.NewOperationCompletionSuccessful(p, nexus.OperationCompletionSuccessfulOptions{
Serializer: commonnexus.PayloadSerializer,
StartTime: ms.executionState.GetStartTime().AsTime(),
StartLinks: []nexus.Link{startLink},
})
if err != nil {
return nil, serviceerror.NewInternal(fmt.Sprintf("failed to construct Nexus completion: %v", err))
Expand All @@ -604,23 +619,31 @@ func (ms *MutableStateImpl) GetNexusCompletion(ctx context.Context) (nexus.Opera
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED:
f := commonnexus.APIFailureToNexusFailure(ce.GetWorkflowExecutionFailedEventAttributes().GetFailure())
return &nexus.OperationCompletionUnsuccessful{
State: nexus.OperationStateFailed,
Failure: f,
State: nexus.OperationStateFailed,
StartTime: ms.executionState.GetStartTime().AsTime(),
StartLinks: []nexus.Link{startLink},
Failure: f,
}, nil
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
return &nexus.OperationCompletionUnsuccessful{
State: nexus.OperationStateCanceled,
Failure: &nexus.Failure{Message: "operation canceled"},
State: nexus.OperationStateCanceled,
StartTime: ms.executionState.GetStartTime().AsTime(),
StartLinks: []nexus.Link{startLink},
Failure: &nexus.Failure{Message: "operation canceled"},
}, nil
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED:
return &nexus.OperationCompletionUnsuccessful{
State: nexus.OperationStateFailed,
Failure: &nexus.Failure{Message: "operation terminated"},
State: nexus.OperationStateFailed,
StartTime: ms.executionState.GetStartTime().AsTime(),
StartLinks: []nexus.Link{startLink},
Failure: &nexus.Failure{Message: "operation terminated"},
}, nil
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
return &nexus.OperationCompletionUnsuccessful{
State: nexus.OperationStateFailed,
Failure: &nexus.Failure{Message: "operation exceeded internal timeout"},
State: nexus.OperationStateFailed,
StartTime: ms.executionState.GetStartTime().AsTime(),
StartLinks: []nexus.Link{startLink},
Failure: &nexus.Failure{Message: "operation exceeded internal timeout"},
}, nil
}
return nil, serviceerror.NewInternal(fmt.Sprintf("invalid workflow execution status: %v", ce.GetEventType()))
Expand Down
Loading

0 comments on commit 17f99c0

Please sign in to comment.