diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index 486ac35a16..c2022dea25 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -290,6 +290,7 @@ type ExecutableArrayNodeStatus interface { GetSubNodeTaskPhases() bitarray.CompactArray GetSubNodeRetryAttempts() bitarray.CompactArray GetSubNodeSystemFailures() bitarray.CompactArray + GetSubNodeDeltaTimestamps() bitarray.CompactArray GetTaskPhaseVersion() uint32 } @@ -302,6 +303,7 @@ type MutableArrayNodeStatus interface { SetSubNodeTaskPhases(subNodeTaskPhases bitarray.CompactArray) SetSubNodeRetryAttempts(subNodeRetryAttempts bitarray.CompactArray) SetSubNodeSystemFailures(subNodeSystemFailures bitarray.CompactArray) + SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray) SetTaskPhaseVersion(taskPhaseVersion uint32) } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 218b045588..2e6fde25d3 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -230,13 +230,14 @@ const ( type ArrayNodeStatus struct { MutableStruct - Phase ArrayNodePhase `json:"phase,omitempty"` - ExecutionError *core.ExecutionError `json:"executionError,omitempty"` - SubNodePhases bitarray.CompactArray `json:"subphase,omitempty"` - SubNodeTaskPhases bitarray.CompactArray `json:"subtphase,omitempty"` - SubNodeRetryAttempts bitarray.CompactArray `json:"subattempts,omitempty"` - SubNodeSystemFailures bitarray.CompactArray `json:"subsysfailures,omitempty"` - TaskPhaseVersion uint32 `json:"taskPhaseVersion,omitempty"` + Phase ArrayNodePhase `json:"phase,omitempty"` + ExecutionError *core.ExecutionError `json:"executionError,omitempty"` + SubNodePhases bitarray.CompactArray 
`json:"subphase,omitempty"` + SubNodeTaskPhases bitarray.CompactArray `json:"subtphase,omitempty"` + SubNodeRetryAttempts bitarray.CompactArray `json:"subattempts,omitempty"` + SubNodeSystemFailures bitarray.CompactArray `json:"subsysfailures,omitempty"` + SubNodeDeltaTimestamps bitarray.CompactArray `json:"subtimestamps,omitempty"` + TaskPhaseVersion uint32 `json:"taskPhaseVersion,omitempty"` } func (in *ArrayNodeStatus) GetArrayNodePhase() ArrayNodePhase { @@ -305,6 +306,17 @@ func (in *ArrayNodeStatus) SetSubNodeSystemFailures(subNodeSystemFailures bitarr } } +func (in *ArrayNodeStatus) GetSubNodeDeltaTimestamps() bitarray.CompactArray { + return in.SubNodeDeltaTimestamps +} + +func (in *ArrayNodeStatus) SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray) { + if in.SubNodeDeltaTimestamps != subNodeDeltaTimestamps { + in.SetDirty() + in.SubNodeDeltaTimestamps = subNodeDeltaTimestamps + } +} + func (in *ArrayNodeStatus) GetTaskPhaseVersion() uint32 { return in.TaskPhaseVersion } diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 17f49adcd3..f2021006a5 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -5,6 +5,9 @@ import ( "fmt" "math" "strconv" + "time" + + "k8s.io/apimachinery/pkg/apis/meta/v1" idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" @@ -28,6 +31,11 @@ import ( "github.com/flyteorg/flyte/flytestdlib/storage" ) +const ( + // value is 3 days of seconds which is covered by 18 bits (262144) + MAX_DELTA_TIMESTAMP = 259200 +) + var ( nilLiteral = &idlcore.Literal{ Value: &idlcore.Literal_Scalar{ @@ -254,6 +262,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference:
&arrayNodeState.SubNodeRetryAttempts, maxValue: maxAttemptsValue}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: maxSystemFailuresValue}, + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: MAX_DELTA_TIMESTAMP}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115 @@ -380,6 +389,18 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu arrayNodeState.SubNodeRetryAttempts.SetItem(index, uint64(subNodeStatus.GetAttempts())) arrayNodeState.SubNodeSystemFailures.SetItem(index, uint64(subNodeStatus.GetSystemFailures())) + startedAt := nCtx.NodeStatus().GetLastAttemptStartedAt() + subNodeStartedAt := subNodeStatus.GetLastAttemptStartedAt() + if subNodeStartedAt == nil { + // subNodeStartedAt == nil indicates either (1) node has not started or (2) node status has + // been reset (ex. retryable failure). in both cases we set the delta timestamp to 0 + arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, 0) + } else if startedAt != nil && arrayNodeState.SubNodeDeltaTimestamps.GetItem(index) == 0 { + // otherwise if `SubNodeDeltaTimestamps` is unset, we compute the delta and set it + deltaDuration := uint64(subNodeStartedAt.Time.Sub(startedAt.Time).Seconds()) + arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, deltaDuration) + } + // increment task phase version if subNode phase or task phase changed if subNodeStatus.GetPhase() != nodeExecutionRequest.nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != nodeExecutionRequest.taskPhase { incrementTaskPhaseVersion = true @@ -767,6 +788,12 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter return nil, nil, nil, nil, nil, nil, err } + // compute start time for subNode using delta timestamp from ArrayNode NodeStatus + var startedAt *v1.Time + if deltaSeconds := arrayNodeState.SubNodeDeltaTimestamps.GetItem(subNodeIndex); deltaSeconds != 0 { + startedAt = 
&v1.Time{Time: nCtx.NodeStatus().GetLastAttemptStartedAt().Add(time.Duration(deltaSeconds) * time.Second)} + } + subNodeStatus := &v1alpha1.NodeStatus{ Phase: nodePhase, DataDir: subDataDir, @@ -777,6 +804,7 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter Phase: taskPhase, PluginState: pluginStateBytes, }, + LastAttemptStartedAt: startedAt, } // initialize mocks diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index a759327423..268a4a6fdd 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -252,6 +252,7 @@ func TestAbort(t *testing.T) { {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1}, + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115 @@ -348,6 +349,7 @@ func TestFinalize(t *testing.T) { {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1}, + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115 assert.NoError(t, err) @@ -858,6 +860,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1}, + {arrayReference: 
&arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115 assert.NoError(t, err) @@ -1301,6 +1304,7 @@ func TestHandleArrayNodePhaseFailing(t *testing.T) { {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1}, + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(len(test.subNodePhases)), bitarray.Item(item.maxValue)) // #nosec G115 assert.NoError(t, err) diff --git a/flytepropeller/pkg/controller/nodes/handler/state.go b/flytepropeller/pkg/controller/nodes/handler/state.go index a7fa7bdf87..c3e35e67d7 100644 --- a/flytepropeller/pkg/controller/nodes/handler/state.go +++ b/flytepropeller/pkg/controller/nodes/handler/state.go @@ -48,11 +48,12 @@ type GateNodeState struct { } type ArrayNodeState struct { - Phase v1alpha1.ArrayNodePhase - TaskPhaseVersion uint32 - Error *core.ExecutionError - SubNodePhases bitarray.CompactArray - SubNodeTaskPhases bitarray.CompactArray - SubNodeRetryAttempts bitarray.CompactArray - SubNodeSystemFailures bitarray.CompactArray + Phase v1alpha1.ArrayNodePhase + TaskPhaseVersion uint32 + Error *core.ExecutionError + SubNodePhases bitarray.CompactArray + SubNodeTaskPhases bitarray.CompactArray + SubNodeRetryAttempts bitarray.CompactArray + SubNodeSystemFailures bitarray.CompactArray + SubNodeDeltaTimestamps bitarray.CompactArray } diff --git a/flytepropeller/pkg/controller/nodes/node_state_manager.go b/flytepropeller/pkg/controller/nodes/node_state_manager.go index fd74d107a0..6e58fe2783 100644 --- a/flytepropeller/pkg/controller/nodes/node_state_manager.go +++ b/flytepropeller/pkg/controller/nodes/node_state_manager.go @@ -65,7 +65,7 @@ func (n *nodeStateManager) 
HasWorkflowNodeState() bool { } func (n *nodeStateManager) HasGateNodeState() bool { - return n.g != nil + return n.g != nil } func (n *nodeStateManager) HasArrayNodeState() bool { @@ -181,6 +181,11 @@ func (n nodeStateManager) GetArrayNodeState() handler.ArrayNodeState { if subNodeSystemFailuresCopy := subNodeSystemFailures.DeepCopy(); subNodeSystemFailuresCopy != nil { as.SubNodeSystemFailures = *subNodeSystemFailuresCopy } + + subNodeDeltaTimestamps := an.GetSubNodeDeltaTimestamps() + if subNodeDeltaTimestampsCopy := subNodeDeltaTimestamps.DeepCopy(); subNodeDeltaTimestampsCopy != nil { + as.SubNodeDeltaTimestamps = *subNodeDeltaTimestampsCopy + } } return as } diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index 9d19081cc4..ceeaf5aaec 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -314,6 +314,7 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n interfaces.N t.SetSubNodeTaskPhases(na.SubNodeTaskPhases) t.SetSubNodeRetryAttempts(na.SubNodeRetryAttempts) t.SetSubNodeSystemFailures(na.SubNodeSystemFailures) + t.SetSubNodeDeltaTimestamps(na.SubNodeDeltaTimestamps) t.SetTaskPhaseVersion(na.TaskPhaseVersion) } }