Skip to content

Commit

Permalink
ensure correct configuration of workers count per namespace
Browse files Browse the repository at this point in the history
+ minor refactoring
  • Loading branch information
ast2023 committed Nov 17, 2023
1 parent 4f8f511 commit 3e26e79
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 49 deletions.
3 changes: 3 additions & 0 deletions common/dynamicconfig/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ const (
errCountLogThreshold = 1000
)

// ErrInvalidConfiguration indicates that the value provided by dynamic config is not legal
var ErrInvalidConfiguration = errors.New("invalid dynamic configuration")

var (
errKeyNotPresent = errors.New("key not present")
errNoMatchingConstraint = errors.New("no matching constraint in key")
Expand Down
8 changes: 6 additions & 2 deletions common/membership/ringpop/service_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,16 @@ func (r *serviceResolver) Lookup(key string) (membership.HostInfo, error) {
}

func (r *serviceResolver) LookupN(key string, n int) []membership.HostInfo {
if n <= 0 {
return nil
}
addresses := r.ring().LookupN(key, n)
if len(addresses) == 0 {
r.RequestRefresh()
return []membership.HostInfo{}
return nil
}
return util.MapSlice(addresses, membership.NewHostInfoFromAddress)
labels := r.getLabelsMap()
return util.MapSlice(addresses, func(address string) membership.HostInfo { return newHostInfo(address, labels) })
}

func (r *serviceResolver) AddListener(
Expand Down
22 changes: 19 additions & 3 deletions common/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,12 @@ func MapConcurrent[IN any, OUT any](input []IN, mapper func(IN) (OUT, error)) ([

// MapSlice given slice xs []T and f(T) S produces slice []S by applying f to every element of xs
func MapSlice[T, S any](xs []T, f func(T) S) []S {
var result []S
for _, s := range xs {
result = append(result, f(s))
if xs == nil {
return nil
}
result := make([]S, len(xs))
for i, s := range xs {
result[i] = f(s)
}
return result
}
Expand All @@ -146,6 +149,19 @@ func FoldSlice[T any, A any](in []T, initializer A, reducer func(A, T) A) A {
return acc
}

// RepeatSlice given slice and a number (n) produces a new slice containing original slice n times
// if n is non-positive will produce nil
func RepeatSlice[T any](xs []T, n int) []T {
if xs == nil || n <= 0 {
return nil
}
ys := make([]T, n*len(xs))
for i := 0; i < n; i++ {
copy(ys[i*len(xs):], xs)
}
return ys
}

// Coalesce returns the first non-zero value of its arguments, or the zero value for the type
// if all are zero.
func Coalesce[T comparable](vals ...T) T {
Expand Down
109 changes: 109 additions & 0 deletions common/util/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package util

import "testing"

func TestRepeatSlice(t *testing.T) {
t.Run("when input slice is nil should return nil", func(t *testing.T) {
got := RepeatSlice[int](nil, 5)
if got != nil {
t.Errorf("RepeatSlice produced non-nil slice from nil input")
}
})
t.Run("when input slice is empty should return empty", func(t *testing.T) {
empty := []int{}
got := RepeatSlice(empty, 5)
if len(got) != 0 {
t.Errorf("RepeatSlice filled empty slice")
}
})
t.Run("when requested repeat number equal 0 should return empty slice", func(t *testing.T) {
xs := []int{1, 2, 3, 4, 5}
got := RepeatSlice(xs, 0)
if len(got) != 0 {
t.Errorf("RepeatSlice with repeat count 0 returned non-empty slice")
}
})
t.Run("when requested repeat number is less than 0 should return empty slice", func(t *testing.T) {
xs := []int{1, 2, 3, 4, 5}
got := RepeatSlice(xs, -1)
if len(got) != 0 {
t.Errorf("RepeatSlice with repeat count -1 returned non-empty slice")
}
})
t.Run("when requested repeat number is 3 should return slice three times the input", func(t *testing.T) {
xs := []int{1, 2, 3, 4, 5}
got := RepeatSlice(xs, 3)
if len(got) != len(xs)*3 {
t.Errorf("RepeatSlice produced slice of wrong length: expected %d got %d", len(xs)*3, len(got))
}
for i, v := range got {
if v != xs[i%len(xs)] {
t.Errorf("RepeatSlice wrong value in result: expected %d at index %d but got %d", xs[i%len(xs)], i, v)
}
}
})
t.Run("should not change the input slice when truncating", func(t *testing.T) {
xs := []int{1, 2, 3, 4, 5}
_ = RepeatSlice(xs, 0)
if len(xs) != 5 {
t.Errorf("Repeat slice trancated the original slice: expected {1, 2, 3, 4, 5}, got %v", xs)
}
})
t.Run("should not change the input slice when replicating", func(t *testing.T) {
xs := []int{1, 2, 3, 4, 5}
_ = RepeatSlice(xs, 5)
if len(xs) != 5 {
t.Errorf("Repeat slice changed the original slice: expected {1, 2, 3, 4, 5}, got %v", xs)
}
})
}

func TestMapSlice(t *testing.T) {
t.Run("when given nil as slice should return nil", func(t *testing.T) {
ys := MapSlice(nil, func(x int) uint32 { return uint32(x) })
if ys != nil {
t.Errorf("mapping over nil produced non nil got %v", ys)
}
})
t.Run("when given an empty slice should return empty slice", func(t *testing.T) {
xs := []int{}
var ys []uint32
ys = MapSlice(xs, func(x int) uint32 { return uint32(x) })
if len(ys) != 0 {
t.Errorf("mapping over empty slice produced non empty slice got %v", ys)
}
})
t.Run("when given a slice and a function should apply function to every element of the original slice", func(t *testing.T) {
xs := []int{1, 2, 3, 4, 5}
ys := MapSlice(xs, func(x int) int { return x + 1 })
for i, y := range ys {
if y != (xs[i] + 1) {
t.Fatalf("mapping over slice did not apply function expected {2, 3, 4, 5} got %v", ys)
}
}
})
}
82 changes: 55 additions & 27 deletions service/worker/pernamespaceworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,17 @@ type (
StickyScheduleToStartTimeout string // parse into time.Duration
StickyScheduleToStartTimeoutDuration time.Duration
}

workerAllocation struct {
Total int
Local int
}
)

var (
errNoWorkerNeeded = errors.New("no worker needed") // sentinel value, not a real error
// errInvalidConfiguration indicates that the value provided by dynamic config is not legal
errInvalidConfiguration = errors.New("invalid dynamic configuration")
)

func NewPerNamespaceWorkerManager(params perNamespaceWorkerManagerInitParams) *perNamespaceWorkerManager {
Expand Down Expand Up @@ -260,21 +267,43 @@ func (wm *perNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) {
delete(wm.workers, ns.ID())
}

func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespace) (int, int, error) {
func (wm *perNamespaceWorkerManager) getWorkerAllocation(ns *namespace.Namespace) (*workerAllocation, error) {
desiredWorkersCount, err := wm.getConfiguredWorkersCountFor(ns)
if err != nil {
return nil, err
}
if desiredWorkersCount == 0 {
return &workerAllocation{0, 0}, nil
}
localCount, err := wm.getLocallyDesiredWorkersCount(ns, desiredWorkersCount)
if err != nil {
return nil, err
}
return &workerAllocation{desiredWorkersCount, localCount}, nil
}

func (wm *perNamespaceWorkerManager) getConfiguredWorkersCountFor(ns *namespace.Namespace) (int, error) {
totalWorkers := wm.config.PerNamespaceWorkerCount(ns.Name().String())
if totalWorkers < 0 {
err := fmt.Errorf("%w namespace %s, workers count %d", errInvalidConfiguration, ns.Name(), totalWorkers)
return 0, err
}
return totalWorkers, nil
}

func (wm *perNamespaceWorkerManager) getLocallyDesiredWorkersCount(ns *namespace.Namespace, desiredNumberOfWorkers int) (int, error) {
key := ns.ID().String()
targets := wm.serviceResolver.LookupN(key, totalWorkers)
if len(targets) == 0 {
return 0, 0, membership.ErrInsufficientHosts
}
IsLocal := func(info membership.HostInfo) bool { return info.Identity() == wm.self.Identity() }
multiplicity := util.FoldSlice(targets, 0, func(acc int, t membership.HostInfo) int {
if IsLocal(t) {
acc++
}
return acc
})
return multiplicity, totalWorkers, nil
availableHosts := wm.serviceResolver.LookupN(key, desiredNumberOfWorkers)
hostsCount := len(availableHosts)
if hostsCount == 0 {
return 0, membership.ErrInsufficientHosts
}
maxWorkersPerHost := desiredNumberOfWorkers/hostsCount + 1
desiredDistribution := util.RepeatSlice(availableHosts, int(maxWorkersPerHost))[:desiredNumberOfWorkers]

isLocal := func(info membership.HostInfo) bool { return info.Identity() == wm.self.Identity() }
result := len(util.FilterSlice(desiredDistribution, isLocal))
return result, nil
}

func (wm *perNamespaceWorkerManager) getWorkerOptions(ns *namespace.Namespace) sdkWorkerOptions {
Expand Down Expand Up @@ -390,18 +419,18 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
}

// check if we are responsible for this namespace at all
multiplicity, totalWorkers, err := w.wm.getWorkerMultiplicity(ns)
workerAllocation, err := w.wm.getWorkerAllocation(ns)
if err != nil {
w.logger.Error("Failed to look up hosts", tag.Error(err))
// TODO: add metric also
return err
}
if multiplicity == 0 {
if workerAllocation.Local == 0 {
// not ours, don't need a worker
return errNoWorkerNeeded
}
// ensure this changes if multiplicity changes
componentSet += fmt.Sprintf(",%d", multiplicity)
componentSet += fmt.Sprintf(",%d", workerAllocation.Local)

// get sdk worker options
dcOptions := w.wm.getWorkerOptions(ns)
Expand All @@ -421,7 +450,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
// create new one. note that even before startWorker returns, the worker may have started
// and already called the fatal error handler. we need to set w.client+worker+componentSet
// before releasing the lock to keep our state consistent.
client, worker, err := w.startWorker(ns, enabledComponents, multiplicity, totalWorkers, dcOptions)
client, worker, err := w.startWorker(ns, enabledComponents, workerAllocation, dcOptions)
if err != nil {
// TODO: add metric also
return err
Expand All @@ -436,8 +465,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
func (w *perNamespaceWorker) startWorker(
ns *namespace.Namespace,
components []workercommon.PerNSWorkerComponent,
multiplicity int,
totalWorkers int,
allocation *workerAllocation,
dcOptions sdkWorkerOptions,
) (sdkclient.Client, sdkworker.Worker, error) {
nsName := ns.Name().String()
Expand All @@ -462,19 +490,19 @@ func (w *perNamespaceWorker) startWorker(

sdkoptions.BackgroundActivityContext = headers.SetCallerInfo(context.Background(), headers.NewBackgroundCallerInfo(ns.Name().String()))
sdkoptions.Identity = fmt.Sprintf("server-worker@%d@%s@%s", os.Getpid(), w.wm.hostName, nsName)
// increase these if we're supposed to run with more multiplicity
sdkoptions.MaxConcurrentWorkflowTaskPollers *= multiplicity
sdkoptions.MaxConcurrentActivityTaskPollers *= multiplicity
sdkoptions.MaxConcurrentLocalActivityExecutionSize *= multiplicity
sdkoptions.MaxConcurrentWorkflowTaskExecutionSize *= multiplicity
sdkoptions.MaxConcurrentActivityExecutionSize *= multiplicity
// increase these if we're supposed to run with more allocation
sdkoptions.MaxConcurrentWorkflowTaskPollers *= int(allocation.Local)
sdkoptions.MaxConcurrentActivityTaskPollers *= int(allocation.Local)
sdkoptions.MaxConcurrentLocalActivityExecutionSize *= int(allocation.Local)
sdkoptions.MaxConcurrentWorkflowTaskExecutionSize *= int(allocation.Local)
sdkoptions.MaxConcurrentActivityExecutionSize *= int(allocation.Local)
sdkoptions.OnFatalError = w.onFatalError

// this should not block because the client already has server capabilities
worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
details := workercommon.RegistrationDetails{
TotalWorkers: totalWorkers,
Multiplicity: multiplicity,
TotalWorkers: int(allocation.Total),
Multiplicity: int(allocation.Local),
}
for _, cmp := range components {
cmp.Register(worker, ns, details)
Expand Down
Loading

0 comments on commit 3e26e79

Please sign in to comment.