From 9f6bb244422d977566a6ab32c75d291995401eb2 Mon Sep 17 00:00:00 2001 From: isubasinghe Date: Mon, 30 Dec 2024 19:35:43 +1100 Subject: [PATCH] fix: ensure tests pass Signed-off-by: isubasinghe --- workflow/controller/controller_test.go | 4 +++- workflow/sync/multi_throttler.go | 28 +++----------------------- workflow/sync/multi_throttler_test.go | 2 +- 3 files changed, 7 insertions(+), 27 deletions(-) diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 627c451edfa5..d9756765b858 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -688,7 +688,9 @@ func TestParallelism(t *testing.T) { for tt, f := range map[string]func(controller *WorkflowController){ "Parallelism": func(x *WorkflowController) { x.Config.Parallelism = 1 - x.Config.NamespaceParallelism = 0 + }, + "NamespaceParallelism": func(x *WorkflowController) { + x.Config.NamespaceParallelism = 1 }, } { t.Run(tt, func(t *testing.T) { diff --git a/workflow/sync/multi_throttler.go b/workflow/sync/multi_throttler.go index f39bf3818ef6..0e8ee56fa034 100644 --- a/workflow/sync/multi_throttler.go +++ b/workflow/sync/multi_throttler.go @@ -27,7 +27,7 @@ type Throttler interface { type Key = string type QueueFunc func(Key) -// NewMultiThrottler creates a new multi throttler for throttling both namespace and global parallelism +// NewMultiThrottler creates a new multi throttler for throttling both namespace and global parallelism, a parallelism value of zero disables throttling func NewMultiThrottler(parallelism int, namespaceParallelism map[string]int, namespaceParallelismDefault int, queue QueueFunc) Throttler { return &multiThrottler{ queue: queue, @@ -55,9 +55,6 @@ type multiThrottler struct { func (m *multiThrottler) Init(wfs []wfv1.Workflow) error { m.lock.Lock() defer m.lock.Unlock() - if m.totalParallelism == 0 { - return nil - } type keyNamespacePair struct { key string @@ -76,16 +73,11 @@ func (m *multiThrottler) Init(wfs []wfv1.Workflow) error { if err != nil { return err } - _, namespaceLimit := m.namespaceCount(namespace) - if namespaceLimit == 0 { - continue - } pairs = append(pairs, keyNamespacePair{key: key, namespace: namespace}) } for _, pair := range pairs { m.running[pair.key] = true - m.totalParallelism++ m.namespaceCounts[pair.namespace] = m.namespaceCounts[pair.namespace] + 1 } return nil @@ -103,24 +95,16 @@ func (m *multiThrottler) namespaceCount(namespace string) (int, int) { func (m *multiThrottler) namespaceAllows(namespace string) bool { count, limit := m.namespaceCount(namespace) - return count < limit + return count < limit || limit == 0 } func (m *multiThrottler) Add(key Key, priority int32, creationTime time.Time) { m.lock.Lock() defer m.lock.Unlock() - if m.totalParallelism == 0 { - return - } namespace, _, err := cache.SplitMetaNamespaceKey(key) if err != nil { return } - _, namespaceLimit := m.namespaceCount(namespace) - if namespaceLimit == 0 { - return - } - _, ok := m.pending[namespace] if !ok { m.pending[namespace] = &priorityQueue{itemByKey: make(map[string]*item)} @@ -134,12 +118,6 @@ func (m *multiThrottler) Admit(key Key) bool { m.lock.Lock() defer m.lock.Unlock() - namespace, _, _ := cache.SplitMetaNamespaceKey(key) - _, namespaceLimit := m.namespaceCount(namespace) - if m.totalParallelism == 0 || namespaceLimit == 0 { - return true - } - _, ok := m.running[key] if ok { return true @@ -161,7 +139,7 @@ func (m *multiThrottler) Remove(key Key) { } func (m *multiThrottler) queueThrottled() { - if m.totalParallelism != -1 && len(m.running) >= m.totalParallelism { + if m.totalParallelism != 0 && len(m.running) >= m.totalParallelism { return } var bestItem *item diff --git a/workflow/sync/multi_throttler_test.go b/workflow/sync/multi_throttler_test.go index 9ba16c403035..bf04393f0e6a 100644 --- a/workflow/sync/multi_throttler_test.go +++ b/workflow/sync/multi_throttler_test.go @@ -14,7 +14,7 @@ import ( ) func TestMultiNoParallelismSamePriority(t *testing.T) { - throttler := NewMultiThrottler(0, make(map[string]int), 0, nil) + throttler := NewMultiThrottler(0, make(map[string]int), 0, func(Key) {}) throttler.Add("default/c", 0, time.Now().Add(2*time.Hour)) throttler.Add("default/b", 0, time.Now().Add(1*time.Hour))