Skip to content

Commit

Permalink
fix: ensure tests pass
Browse files Browse the repository at this point in the history
Signed-off-by: isubasinghe <[email protected]>
  • Loading branch information
isubasinghe committed Dec 30, 2024
1 parent 76b839c commit 9f6bb24
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 27 deletions.
4 changes: 3 additions & 1 deletion workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,9 @@ func TestParallelism(t *testing.T) {
for tt, f := range map[string]func(controller *WorkflowController){
"Parallelism": func(x *WorkflowController) {
x.Config.Parallelism = 1
x.Config.NamespaceParallelism = 0
},
"NamespaceParallelism": func(x *WorkflowController) {
x.Config.NamespaceParallelism = 1
},
} {
t.Run(tt, func(t *testing.T) {
Expand Down
28 changes: 3 additions & 25 deletions workflow/sync/multi_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Throttler interface {
type Key = string
type QueueFunc func(Key)

// NewMultiThrottler creates a new multi throttler for throttling both namespace and global parallelism
// NewMultiThrottler creates a new multi throttler for throttling both namespace and global parallelism, a parallelism value of zero disables throttling
func NewMultiThrottler(parallelism int, namespaceParallelism map[string]int, namespaceParallelismDefault int, queue QueueFunc) Throttler {
return &multiThrottler{
queue: queue,
Expand Down Expand Up @@ -55,9 +55,6 @@ type multiThrottler struct {
func (m *multiThrottler) Init(wfs []wfv1.Workflow) error {
m.lock.Lock()
defer m.lock.Unlock()
if m.totalParallelism == 0 {
return nil
}

type keyNamespacePair struct {
key string
Expand All @@ -76,16 +73,11 @@ func (m *multiThrottler) Init(wfs []wfv1.Workflow) error {
if err != nil {
return err
}
_, namespaceLimit := m.namespaceCount(namespace)
if namespaceLimit == 0 {
continue
}
pairs = append(pairs, keyNamespacePair{key: key, namespace: namespace})
}

for _, pair := range pairs {
m.running[pair.key] = true
m.totalParallelism++
m.namespaceCounts[pair.namespace] = m.namespaceCounts[pair.namespace] + 1
}
return nil
Expand All @@ -103,24 +95,16 @@ func (m *multiThrottler) namespaceCount(namespace string) (int, int) {

func (m *multiThrottler) namespaceAllows(namespace string) bool {
count, limit := m.namespaceCount(namespace)
return count < limit
return count < limit || limit == 0
}

func (m *multiThrottler) Add(key Key, priority int32, creationTime time.Time) {
m.lock.Lock()
defer m.lock.Unlock()
if m.totalParallelism == 0 {
return
}
namespace, _, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return
}
_, namespaceLimit := m.namespaceCount(namespace)
if namespaceLimit == 0 {
return
}

_, ok := m.pending[namespace]
if !ok {
m.pending[namespace] = &priorityQueue{itemByKey: make(map[string]*item)}
Expand All @@ -134,12 +118,6 @@ func (m *multiThrottler) Admit(key Key) bool {
m.lock.Lock()
defer m.lock.Unlock()

namespace, _, _ := cache.SplitMetaNamespaceKey(key)
_, namespaceLimit := m.namespaceCount(namespace)
if m.totalParallelism == 0 || namespaceLimit == 0 {
return true
}

_, ok := m.running[key]
if ok {
return true
Expand All @@ -161,7 +139,7 @@ func (m *multiThrottler) Remove(key Key) {
}

func (m *multiThrottler) queueThrottled() {
if m.totalParallelism != -1 && len(m.running) >= m.totalParallelism {
if m.totalParallelism != 0 && len(m.running) >= m.totalParallelism {
return
}
var bestItem *item
Expand Down
2 changes: 1 addition & 1 deletion workflow/sync/multi_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func TestMultiNoParallelismSamePriority(t *testing.T) {
throttler := NewMultiThrottler(0, make(map[string]int), 0, nil)
throttler := NewMultiThrottler(0, make(map[string]int), 0, func(Key) {})

throttler.Add("default/c", 0, time.Now().Add(2*time.Hour))
throttler.Add("default/b", 0, time.Now().Add(1*time.Hour))
Expand Down

0 comments on commit 9f6bb24

Please sign in to comment.