From ee983fe8ace6efbfbd888a4d985e6f5a767a2db6 Mon Sep 17 00:00:00 2001 From: Akram Ben Aissi Date: Mon, 5 Jun 2023 15:19:14 +0200 Subject: [PATCH] fix: Fixes sharding placement algorithm and allows development of alternative algorithms (#13018) * fix: Extraction of DistributionFunction to allow passing different type of functions to filter clusters by shard - Adding unit tests for sharding - Refresh clusters list on DistributionFunction call Signed-off-by: Akram Ben Aissi Signed-off-by: ishitasequeira * fix: Incorrect conversion of an integer with architecture-dependent bit size from [strconv.Atoi](1) to a lower bit size type uint32 without an upper bound check. Signed-off-by: Akram Ben Aissi * Added config to switch to round-robin sharding Signed-off-by: Raghavi Shirur Signed-off-by: Akram Ben Aissi * Documenting sharding more, adding shuffling tests (skipped), re-enable sharding algo env var Signed-off-by: Akram Ben Aissi * Allow configuration through argocd-cmd-params-cm configMap and key: controller.sharding.algorithm Signed-off-by: Akram Ben Aissi * De-duplicate code, remove reflection for default case, shorten distribution methods name, ran codegen on manifests Signed-off-by: Akram Ben Aissi Signed-off-by: Akram Ben Aissi --------- Signed-off-by: Akram Ben Aissi Signed-off-by: ishitasequeira Signed-off-by: Raghavi Shirur Co-authored-by: Raghavi Shirur --- .../commands/argocd_application_controller.go | 15 +- cmd/argocd/commands/admin/cluster.go | 8 +- common/common.go | 8 + controller/appcontroller.go | 15 +- controller/appcontroller_test.go | 1 - controller/sharding/sharding.go | 155 +++++++-- controller/sharding/sharding_test.go | 317 +++++++++++++++++- controller/sharding/shuffle_test.go | 83 +++++ .../operator-manual/argocd-cmd-params-cm.yaml | 2 + docs/operator-manual/high_availability.md | 28 ++ .../argocd-application-controller.md | 1 + ...cd-application-controller-statefulset.yaml | 6 + manifests/core-install.yaml | 6 + manifests/ha/install.yaml | 6 + 
manifests/ha/namespace-install.yaml | 6 + manifests/install.yaml | 6 + manifests/namespace-install.yaml | 6 + util/env/env.go | 13 +- 18 files changed, 637 insertions(+), 45 deletions(-) create mode 100644 controller/sharding/shuffle_test.go diff --git a/cmd/argocd-application-controller/commands/argocd_application_controller.go b/cmd/argocd-application-controller/commands/argocd_application_controller.go index 597cfd0e33495..ab88d4ccbdf93 100644 --- a/cmd/argocd-application-controller/commands/argocd_application_controller.go +++ b/cmd/argocd-application-controller/commands/argocd_application_controller.go @@ -23,6 +23,7 @@ import ( cacheutil "github.com/argoproj/argo-cd/v2/util/cache" appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate" "github.com/argoproj/argo-cd/v2/util/cli" + "github.com/argoproj/argo-cd/v2/util/db" "github.com/argoproj/argo-cd/v2/util/env" "github.com/argoproj/argo-cd/v2/util/errors" kubeutil "github.com/argoproj/argo-cd/v2/util/kube" @@ -62,6 +63,7 @@ func NewCommand() *cobra.Command { otlpAddress string applicationNamespaces []string persistResourceHealth bool + shardingAlgorithm string ) var command = cobra.Command{ Use: cliName, @@ -134,7 +136,7 @@ func NewCommand() *cobra.Command { appController.InvalidateProjectsCache() })) kubectl := kubeutil.NewKubectl() - clusterFilter := getClusterFilter() + clusterFilter := getClusterFilter(kubeClient, settingsMgr, shardingAlgorithm) appController, err = controller.NewApplicationController( namespace, settingsMgr, @@ -152,7 +154,8 @@ func NewCommand() *cobra.Command { kubectlParallelismLimit, persistResourceHealth, clusterFilter, - applicationNamespaces) + applicationNamespaces, + ) errors.CheckError(err) cacheutil.CollectMetrics(redisClient, appController.GetMetricsServer()) @@ -195,13 +198,14 @@ func NewCommand() *cobra.Command { command.Flags().StringVar(&otlpAddress, "otlp-address", env.StringFromEnv("ARGOCD_APPLICATION_CONTROLLER_OTLP_ADDRESS", ""), "OpenTelemetry collector 
address to send traces to") command.Flags().StringSliceVar(&applicationNamespaces, "application-namespaces", env.StringsFromEnv("ARGOCD_APPLICATION_NAMESPACES", []string{}, ","), "List of additional namespaces that applications are allowed to be reconciled from") command.Flags().BoolVar(&persistResourceHealth, "persist-resource-health", env.ParseBoolFromEnv("ARGOCD_APPLICATION_CONTROLLER_PERSIST_RESOURCE_HEALTH", true), "Enables storing the managed resources health in the Application CRD") + command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvControllerShardingAlgorithm, common.DefaultShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy, round-robin] ") cacheSrc = appstatecache.AddCacheFlagsToCmd(&command, func(client *redis.Client) { redisClient = client }) return &command } -func getClusterFilter() func(cluster *v1alpha1.Cluster) bool { +func getClusterFilter(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string) sharding.ClusterFilterFunction { replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32) var clusterFilter func(cluster *v1alpha1.Cluster) bool @@ -212,7 +216,10 @@ func getClusterFilter() func(cluster *v1alpha1.Cluster) bool { errors.CheckError(err) } log.Infof("Processing clusters from shard %d", shard) - clusterFilter = sharding.GetClusterFilter(replicas, shard) + db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient) + log.Infof("Using filter function: %s", shardingAlgorithm) + distributionFunction := sharding.GetDistributionFunction(db, shardingAlgorithm) + clusterFilter = sharding.GetClusterFilter(distributionFunction, shard) } else { log.Info("Processing all cluster shards") } diff --git a/cmd/argocd/commands/admin/cluster.go b/cmd/argocd/commands/admin/cluster.go index 
dd5833a21b048..52b33fcc8d5ae 100644 --- a/cmd/argocd/commands/admin/cluster.go +++ b/cmd/argocd/commands/admin/cluster.go @@ -19,6 +19,7 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/utils/pointer" cmdutil "github.com/argoproj/argo-cd/v2/cmd/util" "github.com/argoproj/argo-cd/v2/common" @@ -115,10 +116,13 @@ func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClie } batch := clustersList.Items[batchStart:batchEnd] _ = kube.RunAllAsync(len(batch), func(i int) error { - cluster := batch[i] clusterShard := 0 + cluster := batch[i] if replicas > 0 { - clusterShard = sharding.GetShardByID(cluster.ID, replicas) + distributionFunction := sharding.GetDistributionFunction(argoDB, common.DefaultShardingAlgorithm) + distributionFunction(&cluster) + cluster.Shard = pointer.Int64Ptr(int64(clusterShard)) + log.Infof("Cluster with uid: %s will be processed by shard %d", cluster.ID, clusterShard) } if shard != -1 && clusterShard != shard { diff --git a/common/common.go b/common/common.go index b00d436b1ea50..c44b4e7364e00 100644 --- a/common/common.go +++ b/common/common.go @@ -103,6 +103,12 @@ const ( // PasswordPatten is the default password patten PasswordPatten = `^.{8,32}$` + + // LegacyShardingAlgorithm is the default value for the sharding algorithm; it uses a `uid`-based distribution (non-uniform) + LegacyShardingAlgorithm = "legacy" + // RoundRobinShardingAlgorithm is a flag value that can be opted into for the sharding algorithm; it uses an equal distribution across all shards + RoundRobinShardingAlgorithm = "round-robin" + DefaultShardingAlgorithm = LegacyShardingAlgorithm ) // Dex related constants @@ -203,6 +209,8 @@ const ( EnvControllerReplicas = "ARGOCD_CONTROLLER_REPLICAS" // EnvControllerShard is the shard number that should be handled by controller EnvControllerShard = "ARGOCD_CONTROLLER_SHARD" + // EnvControllerShardingAlgorithm is the distribution sharding algorithm to be used: 
legacy or round-robin + EnvControllerShardingAlgorithm = "ARGOCD_CONTROLLER_SHARDING_ALGORITHM" // EnvEnableGRPCTimeHistogramEnv enables gRPC metrics collection EnvEnableGRPCTimeHistogramEnv = "ARGOCD_ENABLE_GRPC_TIME_HISTOGRAM" // EnvGithubAppCredsExpirationDuration controls the caching of Github app credentials. This value is in minutes (default: 60) diff --git a/controller/appcontroller.go b/controller/appcontroller.go index 585c95c35efa6..cda4939b042e3 100644 --- a/controller/appcontroller.go +++ b/controller/appcontroller.go @@ -41,14 +41,17 @@ import ( "github.com/argoproj/argo-cd/v2/common" statecache "github.com/argoproj/argo-cd/v2/controller/cache" "github.com/argoproj/argo-cd/v2/controller/metrics" + "github.com/argoproj/argo-cd/v2/controller/sharding" "github.com/argoproj/argo-cd/v2/pkg/apis/application" appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + argov1alpha "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" "github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions/application/v1alpha1" applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" "github.com/argoproj/argo-cd/v2/reposerver/apiclient" "github.com/argoproj/argo-cd/v2/util/argo" argodiff "github.com/argoproj/argo-cd/v2/util/argo/diff" + appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate" "github.com/argoproj/argo-cd/v2/util/db" "github.com/argoproj/argo-cd/v2/util/errors" @@ -229,10 +232,12 @@ func (ctrl *ApplicationController) InvalidateProjectsCache(names ...string) { ctrl.projByNameCache.Delete(name) } } else { - ctrl.projByNameCache.Range(func(key, _ interface{}) bool { - ctrl.projByNameCache.Delete(key) - return true - }) + if ctrl != nil { + ctrl.projByNameCache.Range(func(key, _ interface{}) bool { + ctrl.projByNameCache.Delete(key) + return true + }) + } } } @@ -2010,3 +2015,5 @@ func (ctrl *ApplicationController) 
toAppKey(appName string) string { func (ctrl *ApplicationController) toAppQualifiedName(appName, appNamespace string) string { return fmt.Sprintf("%s/%s", appNamespace, appName) } + +type ClusterFilterFunction func(c *argov1alpha.Cluster, distributionFunction sharding.DistributionFunction) bool diff --git a/controller/appcontroller_test.go b/controller/appcontroller_test.go index 179ab84e2f879..d49aa6e8d5499 100644 --- a/controller/appcontroller_test.go +++ b/controller/appcontroller_test.go @@ -167,7 +167,6 @@ metadata: namespace: ` + test.FakeArgoCDNamespace + ` type: Opaque ` - var fakeApp = ` apiVersion: argoproj.io/v1alpha1 kind: Application diff --git a/controller/sharding/sharding.go b/controller/sharding/sharding.go index 1c0615196bd06..8529171f9fae7 100644 --- a/controller/sharding/sharding.go +++ b/controller/sharding/sharding.go @@ -1,17 +1,127 @@ package sharding import ( + "context" "fmt" "hash/fnv" + "math" "os" + "sort" "strconv" "strings" + "github.com/argoproj/argo-cd/v2/common" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + + "github.com/argoproj/argo-cd/v2/util/db" + "github.com/argoproj/argo-cd/v2/util/env" + log "github.com/sirupsen/logrus" ) +// Make it overridable for testing +var osHostnameFunction = os.Hostname + +type DistributionFunction func(c *v1alpha1.Cluster) int +type ClusterFilterFunction func(c *v1alpha1.Cluster) bool + +// GetClusterFilter returns a ClusterFilterFunction which is a function taking a cluster as a parameter +// and returns whether or not the cluster should be processed by a given shard. It calls the distributionFunction +// to determine which shard will process the cluster, and if the given shard is equal to the calculated shard +// the function will return true. 
+func GetClusterFilter(distributionFunction DistributionFunction, shard int) ClusterFilterFunction { + replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) + return func(c *v1alpha1.Cluster) bool { + clusterShard := 0 + if c != nil && c.Shard != nil { + requestedShard := int(*c.Shard) + if requestedShard < replicas { + clusterShard = requestedShard + } else { + log.Warnf("Specified cluster shard (%d) for cluster: %s is greater than the number of available shard. Assigning automatically.", requestedShard, c.Name) + } + } else { + clusterShard = distributionFunction(c) + } + return clusterShard == shard + } +} + +// GetDistributionFunction returns which DistributionFunction should be used based on the passed algorithm and +// the current data. +func GetDistributionFunction(db db.ArgoDB, shardingAlgorithm string) DistributionFunction { + log.Infof("Using filter function: %s", shardingAlgorithm) + distributionFunction := LegacyDistributionFunction() + switch shardingAlgorithm { + case common.RoundRobinShardingAlgorithm: + distributionFunction = RoundRobinDistributionFunction(db) + case common.LegacyShardingAlgorithm: + distributionFunction = LegacyDistributionFunction() + default: + log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm) + } + return distributionFunction +} + +// LegacyDistributionFunction returns a DistributionFunction using a stable distribution algorithm: +// for a given cluster the function will return the shard number based on the cluster id. This function +// is lightweight and can be distributed easily, however, it does not ensure a homogeneous distribution as +// some shards may get assigned more clusters than others. 
It is the legacy function distribution that is +// kept for compatibility reasons +func LegacyDistributionFunction() DistributionFunction { + replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) + return func(c *v1alpha1.Cluster) int { + if replicas == 0 { + return -1 + } + if c == nil { + return 0 + } + id := c.ID + log.Debugf("Calculating cluster shard for cluster id: %s", id) + if id == "" { + return 0 + } else { + h := fnv.New32a() + _, _ = h.Write([]byte(id)) + shard := int32(h.Sum32() % uint32(replicas)) + log.Infof("Cluster with id=%s will be processed by shard %d", id, shard) + return int(shard) + } + } +} + +// RoundRobinDistributionFunction returns a DistributionFunction using a homogeneous distribution algorithm: +// for a given cluster the function will return the shard number based on the modulo of the cluster rank in +// the cluster's list sorted by uid on the shard number. +// This function ensures a homogeneous distribution: each shard gets assigned the same number of +// clusters (+/-1), but with the drawback of a reshuffling of clusters across shards in case of some changes +// in the cluster list +func RoundRobinDistributionFunction(db db.ArgoDB) DistributionFunction { + replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) + return func(c *v1alpha1.Cluster) int { + if replicas > 0 { + if c == nil { // in-cluster does not necessarily have a secret assigned. So we are receiving a nil cluster here. 
+ return 0 + } else { + clusterIndexdByClusterIdMap := createClusterIndexByClusterIdMap(db) + clusterIndex, ok := clusterIndexdByClusterIdMap[c.ID] + if !ok { + log.Warnf("Cluster with id=%s not found in cluster map.", c.ID) + return -1 + } + shard := int(clusterIndex % replicas) + log.Infof("Cluster with id=%s will be processed by shard %d", c.ID, shard) + return shard + } + } + log.Warnf("The number of replicas (%d) is lower than 1", replicas) + return -1 + } +} + +// InferShard extracts the shard index based on its hostname. func InferShard() (int, error) { - hostname, err := os.Hostname() + hostname, err := osHostnameFunction() if err != nil { return 0, err } @@ -23,31 +133,32 @@ func InferShard() (int, error) { if err != nil { return 0, fmt.Errorf("hostname should ends with shard number separated by '-' but got: %s", hostname) } - return shard, nil + return int(shard), nil } -// GetShardByID calculates cluster shard as `clusterSecret.UID % replicas count` -func GetShardByID(id string, replicas int) int { - if id == "" { - return 0 - } else { - h := fnv.New32a() - _, _ = h.Write([]byte(id)) - return int(h.Sum32() % uint32(replicas)) +func getSortedClustersList(db db.ArgoDB) []v1alpha1.Cluster { + ctx := context.Background() + clustersList, dbErr := db.ListClusters(ctx) + if dbErr != nil { + log.Warnf("Error while querying clusters list from database: %v", dbErr) + return []v1alpha1.Cluster{} } + clusters := clustersList.Items + sort.Slice(clusters, func(i, j int) bool { + return clusters[i].ID < clusters[j].ID + }) + return clusters } -func GetClusterFilter(replicas int, shard int) func(c *v1alpha1.Cluster) bool { - return func(c *v1alpha1.Cluster) bool { - clusterShard := 0 - // cluster might be nil if app is using invalid cluster URL, assume shard 0 in this case. 
- if c != nil { - if c.Shard != nil { - clusterShard = int(*c.Shard) - } else { - clusterShard = GetShardByID(c.ID, replicas) - } - } - return clusterShard == shard +func createClusterIndexByClusterIdMap(db db.ArgoDB) map[string]int { + clusters := getSortedClustersList(db) + log.Debugf("ClustersList has %d items", len(clusters)) + clusterById := make(map[string]v1alpha1.Cluster) + clusterIndexedByClusterId := make(map[string]int) + for i, cluster := range clusters { + log.Debugf("Adding cluster with id=%s and name=%s to cluster's map", cluster.ID, cluster.Name) + clusterById[cluster.ID] = cluster + clusterIndexedByClusterId[cluster.ID] = i } + return clusterIndexedByClusterId } diff --git a/controller/sharding/sharding_test.go b/controller/sharding/sharding_test.go index dc27726f8a6fa..ca44bf32e2d6b 100644 --- a/controller/sharding/sharding_test.go +++ b/controller/sharding/sharding_test.go @@ -1,29 +1,330 @@ package sharding import ( + "errors" + "fmt" + "os" "testing" + "github.com/argoproj/argo-cd/v2/common" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - + dbmocks "github.com/argoproj/argo-cd/v2/util/db/mocks" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func TestGetShardByID_NotEmptyID(t *testing.T) { - assert.Equal(t, 0, GetShardByID("1", 2)) - assert.Equal(t, 1, GetShardByID("2", 2)) - assert.Equal(t, 0, GetShardByID("3", 2)) - assert.Equal(t, 1, GetShardByID("4", 2)) + os.Setenv(common.EnvControllerReplicas, "1") + assert.Equal(t, 0, LegacyDistributionFunction()(&v1alpha1.Cluster{ID: "1"})) + assert.Equal(t, 0, LegacyDistributionFunction()(&v1alpha1.Cluster{ID: "2"})) + assert.Equal(t, 0, LegacyDistributionFunction()(&v1alpha1.Cluster{ID: "3"})) + assert.Equal(t, 0, LegacyDistributionFunction()(&v1alpha1.Cluster{ID: "4"})) } func TestGetShardByID_EmptyID(t *testing.T) { - shard := GetShardByID("", 10) + os.Setenv(common.EnvControllerReplicas, "1") + distributionFunction := LegacyDistributionFunction + shard 
:= distributionFunction()(&v1alpha1.Cluster{}) assert.Equal(t, 0, shard) } -func TestGetClusterFilter(t *testing.T) { - filter := GetClusterFilter(2, 1) +func TestGetShardByID_NoReplicas(t *testing.T) { + os.Setenv(common.EnvControllerReplicas, "0") + distributionFunction := LegacyDistributionFunction + shard := distributionFunction()(&v1alpha1.Cluster{}) + assert.Equal(t, -1, shard) +} + +func TestGetShardByID_NoReplicasUsingHashDistributionFunction(t *testing.T) { + os.Setenv(common.EnvControllerReplicas, "0") + distributionFunction := LegacyDistributionFunction + shard := distributionFunction()(&v1alpha1.Cluster{}) + assert.Equal(t, -1, shard) +} + +func TestGetShardByID_NoReplicasUsingHashDistributionFunctionWithClusters(t *testing.T) { + db, cluster1, cluster2, cluster3, cluster4, cluster5 := createTestClusters() + // Test with replicas set to 0 + os.Setenv(common.EnvControllerReplicas, "0") + os.Setenv(common.EnvControllerShardingAlgorithm, common.RoundRobinShardingAlgorithm) + distributionFunction := RoundRobinDistributionFunction(db) + assert.Equal(t, -1, distributionFunction(nil)) + assert.Equal(t, -1, distributionFunction(&cluster1)) + assert.Equal(t, -1, distributionFunction(&cluster2)) + assert.Equal(t, -1, distributionFunction(&cluster3)) + assert.Equal(t, -1, distributionFunction(&cluster4)) + assert.Equal(t, -1, distributionFunction(&cluster5)) + +} + +func TestGetClusterFilterDefault(t *testing.T) { + shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...) 
+ os.Unsetenv(common.EnvControllerShardingAlgorithm) + os.Setenv(common.EnvControllerReplicas, "2") + filter := GetClusterFilter(GetDistributionFunction(nil, common.DefaultShardingAlgorithm), shardIndex) + assert.False(t, filter(&v1alpha1.Cluster{ID: "1"})) + assert.True(t, filter(&v1alpha1.Cluster{ID: "2"})) + assert.False(t, filter(&v1alpha1.Cluster{ID: "3"})) + assert.True(t, filter(&v1alpha1.Cluster{ID: "4"})) +} + +func TestGetClusterFilterLegacy(t *testing.T) { + shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...) + os.Setenv(common.EnvControllerReplicas, "2") + os.Setenv(common.EnvControllerShardingAlgorithm, common.LegacyShardingAlgorithm) + filter := GetClusterFilter(GetDistributionFunction(nil, common.LegacyShardingAlgorithm), shardIndex) assert.False(t, filter(&v1alpha1.Cluster{ID: "1"})) assert.True(t, filter(&v1alpha1.Cluster{ID: "2"})) assert.False(t, filter(&v1alpha1.Cluster{ID: "3"})) assert.True(t, filter(&v1alpha1.Cluster{ID: "4"})) } + +func TestGetClusterFilterUnknown(t *testing.T) { + shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...) + os.Setenv(common.EnvControllerReplicas, "2") + os.Setenv(common.EnvControllerShardingAlgorithm, "unknown") + filter := GetClusterFilter(GetDistributionFunction(nil, "unknown"), shardIndex) + assert.False(t, filter(&v1alpha1.Cluster{ID: "1"})) + assert.True(t, filter(&v1alpha1.Cluster{ID: "2"})) + assert.False(t, filter(&v1alpha1.Cluster{ID: "3"})) + assert.True(t, filter(&v1alpha1.Cluster{ID: "4"})) +} + +func TestLegacyGetClusterFilterWithFixedShard(t *testing.T) { + shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...) 
+ os.Setenv(common.EnvControllerReplicas, "2") + filter := GetClusterFilter(GetDistributionFunction(nil, common.DefaultShardingAlgorithm), shardIndex) + assert.False(t, filter(nil)) + assert.False(t, filter(&v1alpha1.Cluster{ID: "1"})) + assert.True(t, filter(&v1alpha1.Cluster{ID: "2"})) + assert.False(t, filter(&v1alpha1.Cluster{ID: "3"})) + assert.True(t, filter(&v1alpha1.Cluster{ID: "4"})) + + var fixedShard int64 = 4 + filter = GetClusterFilter(GetDistributionFunction(nil, common.DefaultShardingAlgorithm), int(fixedShard)) + assert.False(t, filter(&v1alpha1.Cluster{ID: "4", Shard: &fixedShard})) + + fixedShard = 1 + filter = GetClusterFilter(GetDistributionFunction(nil, common.DefaultShardingAlgorithm), int(fixedShard)) + assert.True(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard})) + +} + +func TestRoundRobinGetClusterFilterWithFixedShard(t *testing.T) { + shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...) + os.Setenv(common.EnvControllerReplicas, "2") + db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters() + + filter := GetClusterFilter(GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), shardIndex) + assert.False(t, filter(nil)) + assert.False(t, filter(&cluster1)) + assert.True(t, filter(&cluster2)) + assert.False(t, filter(&cluster3)) + assert.True(t, filter(&cluster4)) + + // a cluster with a fixed shard should be processed by the specified exact + // same shard unless the specified shard index is greater than the number of replicas. 
+ var fixedShard int64 = 4 + filter = GetClusterFilter(GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), int(fixedShard)) + assert.False(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard})) + + fixedShard = 1 + filter = GetClusterFilter(GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), int(fixedShard)) + assert.True(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard})) +} + +func TestGetClusterFilterLegacyHash(t *testing.T) { + shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...) + os.Setenv(common.EnvControllerReplicas, "2") + os.Setenv(common.EnvControllerShardingAlgorithm, "hash") + db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters() + filter := GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex) + assert.False(t, filter(&cluster1)) + assert.True(t, filter(&cluster2)) + assert.False(t, filter(&cluster3)) + assert.True(t, filter(&cluster4)) + + // a cluster with a fixed shard should be processed by the specified exact + // same shard unless the specified shard index is greater than the number of replicas. 
+ var fixedShard int64 = 4 + filter = GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), int(fixedShard)) + assert.False(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard})) + + fixedShard = 1 + filter = GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), int(fixedShard)) + assert.True(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard})) +} + +func TestGetClusterFilterWithEnvControllerShardingAlgorithms(t *testing.T) { + db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters() + shardIndex := 1 + os.Setenv(common.EnvControllerReplicas, "2") + os.Setenv(common.EnvControllerShardingAlgorithm, common.LegacyShardingAlgorithm) + shardShouldProcessCluster := GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex) + assert.False(t, shardShouldProcessCluster(&cluster1)) + assert.True(t, shardShouldProcessCluster(&cluster2)) + assert.False(t, shardShouldProcessCluster(&cluster3)) + assert.True(t, shardShouldProcessCluster(&cluster4)) + assert.False(t, shardShouldProcessCluster(nil)) + + os.Setenv(common.EnvControllerShardingAlgorithm, common.RoundRobinShardingAlgorithm) + shardShouldProcessCluster = GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex) + assert.False(t, shardShouldProcessCluster(&cluster1)) + assert.True(t, shardShouldProcessCluster(&cluster2)) + assert.False(t, shardShouldProcessCluster(&cluster3)) + assert.True(t, shardShouldProcessCluster(&cluster4)) + assert.False(t, shardShouldProcessCluster(nil)) +} + +func TestGetShardByIndexModuloReplicasCountDistributionFunction2(t *testing.T) { + db, cluster1, cluster2, cluster3, cluster4, cluster5 := createTestClusters() + // Test with replicas set to 1 + os.Setenv(common.EnvControllerReplicas, "1") + distributionFunction := RoundRobinDistributionFunction(db) + assert.Equal(t, 0, distributionFunction(nil)) + assert.Equal(t, 0, 
distributionFunction(&cluster1)) + assert.Equal(t, 0, distributionFunction(&cluster2)) + assert.Equal(t, 0, distributionFunction(&cluster3)) + assert.Equal(t, 0, distributionFunction(&cluster4)) + assert.Equal(t, 0, distributionFunction(&cluster5)) + + // Test with replicas set to 2 + os.Setenv(common.EnvControllerReplicas, "2") + distributionFunction = RoundRobinDistributionFunction(db) + assert.Equal(t, 0, distributionFunction(nil)) + assert.Equal(t, 0, distributionFunction(&cluster1)) + assert.Equal(t, 1, distributionFunction(&cluster2)) + assert.Equal(t, 0, distributionFunction(&cluster3)) + assert.Equal(t, 1, distributionFunction(&cluster4)) + assert.Equal(t, 0, distributionFunction(&cluster5)) + + // // Test with replicas set to 3 + os.Setenv(common.EnvControllerReplicas, "3") + distributionFunction = RoundRobinDistributionFunction(db) + assert.Equal(t, 0, distributionFunction(nil)) + assert.Equal(t, 0, distributionFunction(&cluster1)) + assert.Equal(t, 1, distributionFunction(&cluster2)) + assert.Equal(t, 2, distributionFunction(&cluster3)) + assert.Equal(t, 0, distributionFunction(&cluster4)) + assert.Equal(t, 1, distributionFunction(&cluster5)) +} + +func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterNumberIsHigh(t *testing.T) { + // Unit test written to evaluate the cost of calling db.ListCluster on every call of distributionFunction + // Doing that allows to accept added and removed clusters on the fly. 
+ // Initial tests where showing that under 1024 clusters, execution time was around 400ms + // and for 4096 clusters, execution time was under 9s + // The other implementation was giving almost linear time of 400ms up to 10'000 clusters + db := dbmocks.ArgoDB{} + clusterList := &v1alpha1.ClusterList{Items: []v1alpha1.Cluster{}} + for i := 0; i < 2048; i++ { + cluster := createCluster(fmt.Sprintf("cluster-%d", i), fmt.Sprintf("%d", i)) + clusterList.Items = append(clusterList.Items, cluster) + } + db.On("ListClusters", mock.Anything).Return(clusterList, nil) + os.Setenv(common.EnvControllerReplicas, "2") + distributionFunction := RoundRobinDistributionFunction(&db) + for i, c := range clusterList.Items { + assert.Equal(t, i%2, distributionFunction(&c)) + } +} + +func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterIsAddedAndRemoved(t *testing.T) { + db := dbmocks.ArgoDB{} + cluster1 := createCluster("cluster1", "1") + cluster2 := createCluster("cluster2", "2") + cluster3 := createCluster("cluster3", "3") + cluster4 := createCluster("cluster4", "4") + cluster5 := createCluster("cluster5", "5") + cluster6 := createCluster("cluster6", "6") + + clusterList := &v1alpha1.ClusterList{Items: []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5}} + db.On("ListClusters", mock.Anything).Return(clusterList, nil) + + // Test with replicas set to 2 + os.Setenv(common.EnvControllerReplicas, "2") + distributionFunction := RoundRobinDistributionFunction(&db) + assert.Equal(t, 0, distributionFunction(nil)) + assert.Equal(t, 0, distributionFunction(&cluster1)) + assert.Equal(t, 1, distributionFunction(&cluster2)) + assert.Equal(t, 0, distributionFunction(&cluster3)) + assert.Equal(t, 1, distributionFunction(&cluster4)) + assert.Equal(t, 0, distributionFunction(&cluster5)) + assert.Equal(t, -1, distributionFunction(&cluster6)) // as cluster6 is not in the DB, this one should not have a shard assigned + + // Now, the database knows cluster6. 
Shard should be assigned a proper shard + clusterList.Items = append(clusterList.Items, cluster6) + assert.Equal(t, 1, distributionFunction(&cluster6)) + + // Now, we remove the last added cluster, it should be unassigned as well + clusterList.Items = clusterList.Items[:len(clusterList.Items)-1] + assert.Equal(t, -1, distributionFunction(&cluster6)) + +} + +func TestGetShardByIndexModuloReplicasCountDistributionFunction(t *testing.T) { + db, cluster1, cluster2, _, _, _ := createTestClusters() + os.Setenv(common.EnvControllerReplicas, "2") + distributionFunction := RoundRobinDistributionFunction(db) + + // Test that the function returns the correct shard for cluster1 and cluster2 + expectedShardForCluster1 := 0 + expectedShardForCluster2 := 1 + shardForCluster1 := distributionFunction(&cluster1) + shardForCluster2 := distributionFunction(&cluster2) + + if shardForCluster1 != expectedShardForCluster1 { + t.Errorf("Expected shard for cluster1 to be %d but got %d", expectedShardForCluster1, shardForCluster1) + } + if shardForCluster2 != expectedShardForCluster2 { + t.Errorf("Expected shard for cluster2 to be %d but got %d", expectedShardForCluster2, shardForCluster2) + } +} + +func TestInferShard(t *testing.T) { + // Override the os.Hostname function to return a specific hostname for testing + defer func() { osHostnameFunction = os.Hostname }() + + osHostnameFunction = func() (string, error) { return "example-shard-3", nil } + expectedShard := 3 + actualShard, _ := InferShard() + assert.Equal(t, expectedShard, actualShard) + + osHostnameError := errors.New("cannot resolve hostname") + osHostnameFunction = func() (string, error) { return "exampleshard", osHostnameError } + _, err := InferShard() + assert.NotNil(t, err) + assert.Equal(t, err, osHostnameError) + + osHostnameFunction = func() (string, error) { return "exampleshard", nil } + _, err = InferShard() + assert.NotNil(t, err) + + osHostnameFunction = func() (string, error) { return "example-shard", nil } + _, err 
= InferShard() + assert.NotNil(t, err) + +} + +func createTestClusters() (*dbmocks.ArgoDB, v1alpha1.Cluster, v1alpha1.Cluster, v1alpha1.Cluster, v1alpha1.Cluster, v1alpha1.Cluster) { + db := dbmocks.ArgoDB{} + cluster1 := createCluster("cluster1", "1") + cluster2 := createCluster("cluster2", "2") + cluster3 := createCluster("cluster3", "3") + cluster4 := createCluster("cluster4", "4") + cluster5 := createCluster("cluster5", "5") + + db.On("ListClusters", mock.Anything).Return(&v1alpha1.ClusterList{Items: []v1alpha1.Cluster{ + cluster1, cluster2, cluster3, cluster4, cluster5, + }}, nil) + return &db, cluster1, cluster2, cluster3, cluster4, cluster5 +} + +func createCluster(name string, id string) v1alpha1.Cluster { + cluster := v1alpha1.Cluster{ + Name: name, + ID: id, + Server: "https://kubernetes.default.svc?" + id, + } + return cluster +} diff --git a/controller/sharding/shuffle_test.go b/controller/sharding/shuffle_test.go new file mode 100644 index 0000000000000..2baaa6a758ca9 --- /dev/null +++ b/controller/sharding/shuffle_test.go @@ -0,0 +1,83 @@ +package sharding + +import ( + "fmt" + "math" + "os" + "testing" + + "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + dbmocks "github.com/argoproj/argo-cd/v2/util/db/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestLargeShuffle(t *testing.T) { + t.Skip() + db := dbmocks.ArgoDB{} + clusterList := &v1alpha1.ClusterList{Items: []v1alpha1.Cluster{}} + for i := 0; i < math.MaxInt/4096; i += 256 { + //fmt.Fprintf(os.Stdout, "%d", i) + cluster := createCluster(fmt.Sprintf("cluster-%d", i), fmt.Sprintf("%d", i)) + clusterList.Items = append(clusterList.Items, cluster) + } + db.On("ListClusters", mock.Anything).Return(clusterList, nil) + // Test with replicas set to 256 + os.Setenv(common.EnvControllerReplicas, "256") + distributionFunction := RoundRobinDistributionFunction(&db) + for i, c := range clusterList.Items { + 
assert.Equal(t, i%2567, distributionFunction(&c)) + } + +} + +func TestShuffle(t *testing.T) { + t.Skip() + db := dbmocks.ArgoDB{} + cluster1 := createCluster("cluster1", "10") + cluster2 := createCluster("cluster2", "20") + cluster3 := createCluster("cluster3", "30") + cluster4 := createCluster("cluster4", "40") + cluster5 := createCluster("cluster5", "50") + cluster6 := createCluster("cluster6", "60") + cluster25 := createCluster("cluster6", "25") + + clusterList := &v1alpha1.ClusterList{Items: []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5, cluster6}} + db.On("ListClusters", mock.Anything).Return(clusterList, nil) + + // Test with replicas set to 3 + os.Setenv(common.EnvControllerReplicas, "3") + distributionFunction := RoundRobinDistributionFunction(&db) + assert.Equal(t, 0, distributionFunction(nil)) + assert.Equal(t, 0, distributionFunction(&cluster1)) + assert.Equal(t, 1, distributionFunction(&cluster2)) + assert.Equal(t, 2, distributionFunction(&cluster3)) + assert.Equal(t, 0, distributionFunction(&cluster4)) + assert.Equal(t, 1, distributionFunction(&cluster5)) + assert.Equal(t, 2, distributionFunction(&cluster6)) + + // Now, we remove cluster1, it should be unassigned, and all the other should be resuffled + clusterList.Items = Remove(clusterList.Items, 0) + assert.Equal(t, -1, distributionFunction(&cluster1)) + assert.Equal(t, 0, distributionFunction(&cluster2)) + assert.Equal(t, 1, distributionFunction(&cluster3)) + assert.Equal(t, 2, distributionFunction(&cluster4)) + assert.Equal(t, 0, distributionFunction(&cluster5)) + assert.Equal(t, 1, distributionFunction(&cluster6)) + + // Now, we add a cluster with an id=25 so it will be placed right after cluster2 + clusterList.Items = append(clusterList.Items, cluster25) + assert.Equal(t, -1, distributionFunction(&cluster1)) + assert.Equal(t, 0, distributionFunction(&cluster2)) + assert.Equal(t, 1, distributionFunction(&cluster25)) + assert.Equal(t, 2, distributionFunction(&cluster3)) + 
assert.Equal(t, 0, distributionFunction(&cluster4)) + assert.Equal(t, 1, distributionFunction(&cluster5)) + assert.Equal(t, 2, distributionFunction(&cluster6)) + +} + +func Remove(slice []v1alpha1.Cluster, s int) []v1alpha1.Cluster { + return append(slice[:s], slice[s+1:]...) +} diff --git a/docs/operator-manual/argocd-cmd-params-cm.yaml b/docs/operator-manual/argocd-cmd-params-cm.yaml index 4aeb9c3bbbe34..47ec108c6512e 100644 --- a/docs/operator-manual/argocd-cmd-params-cm.yaml +++ b/docs/operator-manual/argocd-cmd-params-cm.yaml @@ -57,6 +57,8 @@ data: controller.resource.health.persist: "true" # Cache expiration default (default 24h0m0s) controller.default.cache.expiration: "24h0m0s" + # Sharding algorithm used to balance clusters across application controller shards (default "legacy") + controller.sharding.algorithm: legacy # Number of allowed concurrent kubectl fork/execs. Any value less than 1 means no limit. controller.kubectl.parallelism.limit: "20" diff --git a/docs/operator-manual/high_availability.md b/docs/operator-manual/high_availability.md index 8e69c0cfb01ef..56030fa2e9a57 100644 --- a/docs/operator-manual/high_availability.md +++ b/docs/operator-manual/high_availability.md @@ -80,6 +80,34 @@ spec: value: "2" ``` +* The shard distribution algorithm of the `argocd-application-controller` can be set by using the `--sharding-method` parameter. Supported sharding methods are: [legacy (default), round-robin]. `legacy` mode uses a `uid`-based distribution (non-uniform). `round-robin` uses an equal distribution across all shards. The `--sharding-method` parameter can also be overridden by setting the key `controller.sharding.algorithm` in the `argocd-cmd-params-cm` `configMap` (preferably) or by setting the `ARGOCD_CONTROLLER_SHARDING_ALGORITHM` environment variable and by specifying the same possible values. + +!!! warning "Alpha Feature" + The `round-robin` shard distribution algorithm is an experimental feature.
Reshuffling is known to occur in certain scenarios with cluster removal. If the cluster at rank-0 is removed, reshuffling all clusters across shards will occur and may temporarily have negative performance impacts. + +* A cluster can be manually assigned and forced to a `shard` by patching the `shard` field in the cluster secret to contain the shard number, e.g. +```yaml +apiVersion: v1 +kind: Secret +metadata: + name: mycluster-secret + labels: + argocd.argoproj.io/secret-type: cluster +type: Opaque +stringData: + shard: "1" + name: mycluster.com + server: https://mycluster.com + config: | + { + "bearerToken": "", + "tlsClientConfig": { + "insecure": false, + "caData": "" + } + } +``` + * `ARGOCD_ENABLE_GRPC_TIME_HISTOGRAM` - environment variable that enables collecting RPC performance metrics. Enable it if you need to troubleshoot performance issues. Note: This metric is expensive to both query and store! **metrics** diff --git a/docs/operator-manual/server-commands/argocd-application-controller.md b/docs/operator-manual/server-commands/argocd-application-controller.md index fb27a3c176e71..d21763afa7404 100644 --- a/docs/operator-manual/server-commands/argocd-application-controller.md +++ b/docs/operator-manual/server-commands/argocd-application-controller.md @@ -59,6 +59,7 @@ argocd-application-controller [flags] --sentinel stringArray Redis sentinel hostname and port (e.g. argocd-redis-ha-announce-0:6379). --sentinelmaster string Redis sentinel master group name. (default "master") --server string The address and port of the Kubernetes API server + --sharding-method string Enables choice of sharding method. Supported sharding methods are : [legacy, round-robin] (default "legacy") --status-processors int Number of application status processors (default 20) --tls-server-name string If provided, this name will be used to validate server certificate. If this is not provided, hostname used to contact the server is used.
--token string Bearer token for authentication to the API server diff --git a/manifests/base/application-controller/argocd-application-controller-statefulset.yaml b/manifests/base/application-controller/argocd-application-controller-statefulset.yaml index d1a7991bd036b..270fa05bcc62e 100644 --- a/manifests/base/application-controller/argocd-application-controller-statefulset.yaml +++ b/manifests/base/application-controller/argocd-application-controller-statefulset.yaml @@ -143,6 +143,12 @@ spec: name: argocd-cmd-params-cm key: application.namespaces optional: true + - name: ARGOCD_CONTROLLER_SHARDING_ALGORITHM + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.sharding.algorithm + optional: true - name: ARGOCD_APPLICATION_CONTROLLER_KUBECTL_PARALLELISM_LIMIT valueFrom: configMapKeyRef: diff --git a/manifests/core-install.yaml b/manifests/core-install.yaml index 1668aef487445..0352d6b068cd5 100644 --- a/manifests/core-install.yaml +++ b/manifests/core-install.yaml @@ -18899,6 +18899,12 @@ spec: key: application.namespaces name: argocd-cmd-params-cm optional: true + - name: ARGOCD_CONTROLLER_SHARDING_ALGORITHM + valueFrom: + configMapKeyRef: + key: controller.sharding.algorithm + name: argocd-cmd-params-cm + optional: true - name: ARGOCD_APPLICATION_CONTROLLER_KUBECTL_PARALLELISM_LIMIT valueFrom: configMapKeyRef: diff --git a/manifests/ha/install.yaml b/manifests/ha/install.yaml index 80ad0036798ef..e8afa2dd2b17a 100644 --- a/manifests/ha/install.yaml +++ b/manifests/ha/install.yaml @@ -20686,6 +20686,12 @@ spec: key: application.namespaces name: argocd-cmd-params-cm optional: true + - name: ARGOCD_CONTROLLER_SHARDING_ALGORITHM + valueFrom: + configMapKeyRef: + key: controller.sharding.algorithm + name: argocd-cmd-params-cm + optional: true - name: ARGOCD_APPLICATION_CONTROLLER_KUBECTL_PARALLELISM_LIMIT valueFrom: configMapKeyRef: diff --git a/manifests/ha/namespace-install.yaml b/manifests/ha/namespace-install.yaml index 
a23472c4901a1..b193db90768c5 100644 --- a/manifests/ha/namespace-install.yaml +++ b/manifests/ha/namespace-install.yaml @@ -2714,6 +2714,12 @@ spec: key: application.namespaces name: argocd-cmd-params-cm optional: true + - name: ARGOCD_CONTROLLER_SHARDING_ALGORITHM + valueFrom: + configMapKeyRef: + key: controller.sharding.algorithm + name: argocd-cmd-params-cm + optional: true - name: ARGOCD_APPLICATION_CONTROLLER_KUBECTL_PARALLELISM_LIMIT valueFrom: configMapKeyRef: diff --git a/manifests/install.yaml b/manifests/install.yaml index f90c3b62ef916..abee52d790b54 100644 --- a/manifests/install.yaml +++ b/manifests/install.yaml @@ -19726,6 +19726,12 @@ spec: key: application.namespaces name: argocd-cmd-params-cm optional: true + - name: ARGOCD_CONTROLLER_SHARDING_ALGORITHM + valueFrom: + configMapKeyRef: + key: controller.sharding.algorithm + name: argocd-cmd-params-cm + optional: true - name: ARGOCD_APPLICATION_CONTROLLER_KUBECTL_PARALLELISM_LIMIT valueFrom: configMapKeyRef: diff --git a/manifests/namespace-install.yaml b/manifests/namespace-install.yaml index e06c931218b21..a12501e6e89ec 100644 --- a/manifests/namespace-install.yaml +++ b/manifests/namespace-install.yaml @@ -1754,6 +1754,12 @@ spec: key: application.namespaces name: argocd-cmd-params-cm optional: true + - name: ARGOCD_CONTROLLER_SHARDING_ALGORITHM + valueFrom: + configMapKeyRef: + key: controller.sharding.algorithm + name: argocd-cmd-params-cm + optional: true - name: ARGOCD_APPLICATION_CONTROLLER_KUBECTL_PARALLELISM_LIMIT valueFrom: configMapKeyRef: diff --git a/util/env/env.go b/util/env/env.go index dc1549082db10..1b49a0c322065 100644 --- a/util/env/env.go +++ b/util/env/env.go @@ -1,6 +1,7 @@ package env import ( + "math" "os" "strconv" "strings" @@ -21,20 +22,24 @@ func ParseNumFromEnv(env string, defaultValue, min, max int) int { if str == "" { return defaultValue } - num, err := strconv.Atoi(str) + num, err := strconv.ParseInt(str, 10, 0) if err != nil { log.Warnf("Could not parse '%s' as a 
number from environment %s", str, env) return defaultValue } - if num < min { + if num > math.MaxInt || num < math.MinInt { + log.Warnf("Value in %s is %d is outside of the min and max %d allowed values. Using default %d", env, num, min, defaultValue) + return defaultValue + } + if int(num) < min { log.Warnf("Value in %s is %d, which is less than minimum %d allowed", env, num, min) return defaultValue } - if num > max { + if int(num) > max { log.Warnf("Value in %s is %d, which is greater than maximum %d allowed", env, num, max) return defaultValue } - return num + return int(num) } // Helper function to parse a int64 from an environment variable. Returns a