Skip to content

Commit

Permalink
fix: test
Browse files Browse the repository at this point in the history
Signed-off-by: isubasinghe <[email protected]>
  • Loading branch information
isubasinghe committed Dec 31, 2024
1 parent 7e4e245 commit 14599e0
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 1 deletion.
6 changes: 5 additions & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,10 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
}

func (wfc *WorkflowController) newThrottler() sync.Throttler {
f := func(key string) { wfc.wfQueue.AddRateLimited(key) }
f := func(key string) {
log.Infof("[DEBUG] queued string %s", key)
wfc.wfQueue.Add(key)
}
return sync.NewMultiThrottler(wfc.Config.Parallelism, make(map[string]int), wfc.Config.NamespaceParallelism, f)
}

Expand Down Expand Up @@ -479,6 +482,7 @@ func (wfc *WorkflowController) notifySemaphoreConfigUpdate(cm *apiv1.ConfigMap)
log.Warnf("received object from indexer %s is not an unstructured", indexes.SemaphoreConfigIndexName)
continue
}
log.Infof("Adding workflow %s/%s", un.GetNamespace(), un.GetName())
wfc.wfQueue.AddRateLimited(fmt.Sprintf("%s/%s", un.GetNamespace(), un.GetName()))
}
}
Expand Down
1 change: 1 addition & 0 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,7 @@ func TestNotifySemaphoreConfigUpdate(t *testing.T) {
for i := 0; i < 3; i++ {
key, _ := controller.wfQueue.Get()
controller.wfQueue.Done(key)
controller.wfQueue.Forget(key)
}
assert.Equal(0, controller.wfQueue.Len())

Expand Down
8 changes: 8 additions & 0 deletions workflow/sync/multi_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package sync

import (
"container/heap"
"math/rand"
"sync"
"time"

"k8s.io/client-go/tools/cache"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
log "github.com/sirupsen/logrus"
)

//go:generate mockery --name=Throttler
Expand Down Expand Up @@ -100,8 +102,11 @@ func (m *multiThrottler) namespaceAllows(namespace string) bool {
}

func (m *multiThrottler) Add(key Key, priority int32, creationTime time.Time) {
id := rand.Int63()
log.Infof("[DEBUG][ADD] on instance %d", id)
m.lock.Lock()
defer m.lock.Unlock()
defer log.Infof("[DEBUG][ADD]left instance %d", id)
namespace, _, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return
Expand All @@ -116,8 +121,11 @@ func (m *multiThrottler) Add(key Key, priority int32, creationTime time.Time) {
}

func (m *multiThrottler) Admit(key Key) bool {
id := rand.Int63()
log.Infof("[DEBUG][ADMIT] on instance %d", id)
m.lock.Lock()
defer m.lock.Unlock()
defer log.Infof("[DEBUG][ADMIT]left instance %d", id)

_, ok := m.running[key]
if ok {
Expand Down

0 comments on commit 14599e0

Please sign in to comment.