From 14599e0d4360e29f17bff9cf1fa5710e94dc6614 Mon Sep 17 00:00:00 2001 From: isubasinghe Date: Tue, 31 Dec 2024 13:25:51 +1100 Subject: [PATCH] fix: test Signed-off-by: isubasinghe --- workflow/controller/controller.go | 6 +++++- workflow/controller/controller_test.go | 1 + workflow/sync/multi_throttler.go | 8 ++++++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index a58eba5a4c5f..b314ef4c57f4 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -242,7 +242,10 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli } func (wfc *WorkflowController) newThrottler() sync.Throttler { - f := func(key string) { wfc.wfQueue.AddRateLimited(key) } + f := func(key string) { + log.Infof("[DEBUG] queued string %s", key) + wfc.wfQueue.Add(key) + } return sync.NewMultiThrottler(wfc.Config.Parallelism, make(map[string]int), wfc.Config.NamespaceParallelism, f) } @@ -479,6 +482,7 @@ func (wfc *WorkflowController) notifySemaphoreConfigUpdate(cm *apiv1.ConfigMap) log.Warnf("received object from indexer %s is not an unstructured", indexes.SemaphoreConfigIndexName) continue } + log.Infof("Adding workflow %s/%s", un.GetNamespace(), un.GetName()) wfc.wfQueue.AddRateLimited(fmt.Sprintf("%s/%s", un.GetNamespace(), un.GetName())) } } diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index d9756765b858..61073963b3c0 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -953,6 +953,7 @@ func TestNotifySemaphoreConfigUpdate(t *testing.T) { for i := 0; i < 3; i++ { key, _ := controller.wfQueue.Get() controller.wfQueue.Done(key) + controller.wfQueue.Forget(key) } assert.Equal(0, controller.wfQueue.Len()) diff --git a/workflow/sync/multi_throttler.go b/workflow/sync/multi_throttler.go index bd18a672f38a..a6ab0d1a4ae0 100644 --- a/workflow/sync/multi_throttler.go +++ b/workflow/sync/multi_throttler.go @@ -2,12 +2,14 @@ package sync import ( "container/heap" + "math/rand" "sync" "time" "k8s.io/client-go/tools/cache" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + log "github.com/sirupsen/logrus" ) //go:generate mockery --name=Throttler @@ -100,8 +102,11 @@ func (m *multiThrottler) namespaceAllows(namespace string) bool { } func (m *multiThrottler) Add(key Key, priority int32, creationTime time.Time) { + id := rand.Int63() + log.Infof("[DEBUG][ADD] on instance %d", id) m.lock.Lock() defer m.lock.Unlock() + defer log.Infof("[DEBUG][ADD]left instance %d", id) namespace, _, err := cache.SplitMetaNamespaceKey(key) if err != nil { return @@ -116,8 +121,11 @@ func (m *multiThrottler) Add(key Key, priority int32, creationTime time.Time) { } func (m *multiThrottler) Admit(key Key) bool { + id := rand.Int63() + log.Infof("[DEBUG][ADMIT] on instance %d", id) m.lock.Lock() defer m.lock.Unlock() + defer log.Infof("[DEBUG][ADMIT]left instance %d", id) _, ok := m.running[key] if ok {