diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index dcd0ad6d7444..33e72f563864 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -648,13 +648,23 @@ func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versi if !ok { return fmt.Errorf("object %+v is not an unstructured", workflows[0]) } + key := un.GetNamespace() + "/" + un.GetName() + wfc.workflowKeyLock.Lock(key) + defer wfc.workflowKeyLock.Unlock(key) + + obj, ok := wfc.getWorkflowByKey(key) + if !ok { + return fmt.Errorf("failed to get workflow by key after locking") + } + un, ok = obj.(*unstructured.Unstructured) + if !ok { + return fmt.Errorf("object %+v is not an unstructured", obj) + } wf, err = util.FromUnstructured(un) if err != nil { return err } - key := wf.ObjectMeta.Namespace + "/" + wf.ObjectMeta.Name - wfc.workflowKeyLock.Lock(key) - defer wfc.workflowKeyLock.Unlock(key) + // workflow might still be hydrated if wfc.hydrator.IsHydrated(wf) { log.WithField("uid", wf.UID).Info("Hydrated workflow encountered") @@ -728,20 +738,14 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { } defer wfc.wfQueue.Done(key) - obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key.(string)) - if err != nil { - log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer") - return true - } - if !exists { - // This happens after a workflow was labeled with completed=true - // or was deleted, but the work queue still had an entry for it. - return true - } - wfc.workflowKeyLock.Lock(key.(string)) defer wfc.workflowKeyLock.Unlock(key.(string)) + obj, ok := wfc.getWorkflowByKey(key.(string)) + if !ok { + return true + } + // The workflow informer receives unstructured objects to deal with the possibility of invalid // workflow manifests that are unable to unmarshal to workflow objects un, ok := obj.(*unstructured.Unstructured) @@ -815,6 +819,20 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { return true } +func (wfc *WorkflowController) getWorkflowByKey(key string) (interface{}, bool) { + obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key) + if err != nil { + log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer") + return nil, false + } + if !exists { + // This happens after a workflow was labeled with completed=true + // or was deleted, but the work queue still had an entry for it. + return nil, false + } + return obj, true +} + func reconciliationNeeded(wf metav1.Object) bool { return wf.GetLabels()[common.LabelKeyCompleted] != "true" || slices.Contains(wf.GetFinalizers(), common.FinalizerArtifactGC) } @@ -1018,6 +1036,11 @@ func (wfc *WorkflowController) archiveWorkflow(ctx context.Context, obj interfac } wfc.workflowKeyLock.Lock(key) defer wfc.workflowKeyLock.Unlock(key) + key, err = cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Error("failed to get key for object after locking") + return + } err = wfc.archiveWorkflowAux(ctx, obj) if err != nil { log.WithField("key", key).WithError(err).Error("failed to archive workflow")