Skip to content

Commit

Permalink
fix: Fixes sharding placement algorithm and allows development of alt…
Browse files Browse the repository at this point in the history
…ernative algorithms (argoproj#13018)

* fix: Extraction of DistributionFunction to allow passing different type of functions to filter clusters by shard
- Adding unit tests for sharding
- Refresh clusters list on DistributionFunction call

Signed-off-by: Akram Ben Aissi <[email protected]>
Signed-off-by: ishitasequeira <[email protected]>

* fix: Incorrect conversion of an integer with architecture-dependent bit size from [strconv.Atoi](1) to a lower bit size type uint32 without an upper bound check.

Signed-off-by: Akram Ben Aissi <[email protected]>

* Added config to switch to round-robin sharding

Signed-off-by: Raghavi Shirur <[email protected]>
Signed-off-by: Akram Ben Aissi <[email protected]>

* Documenting sharding more, adding shuffling tests (skipped), re-enable sharding algo env var

Signed-off-by: Akram Ben Aissi <[email protected]>

* Allow configuration through argocd-cmd-params-cm configMap and key: controller.sharding.algorithm

Signed-off-by: Akram Ben Aissi <[email protected]>

* De-duplicate code, remove reflection for default case, shorten distribution methods name, ran codegen on manifests
Signed-off-by: Akram Ben Aissi <[email protected]>

Signed-off-by: Akram Ben Aissi <[email protected]>

---------

Signed-off-by: Akram Ben Aissi <[email protected]>
Signed-off-by: ishitasequeira <[email protected]>
Signed-off-by: Raghavi Shirur <[email protected]>
Co-authored-by: Raghavi Shirur <[email protected]>
  • Loading branch information
akram and raghavi101 authored Jun 5, 2023
1 parent 0018fcc commit ee983fe
Show file tree
Hide file tree
Showing 18 changed files with 637 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
cacheutil "github.com/argoproj/argo-cd/v2/util/cache"
appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/cli"
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/argo-cd/v2/util/errors"
kubeutil "github.com/argoproj/argo-cd/v2/util/kube"
Expand Down Expand Up @@ -62,6 +63,7 @@ func NewCommand() *cobra.Command {
otlpAddress string
applicationNamespaces []string
persistResourceHealth bool
shardingAlgorithm string
)
var command = cobra.Command{
Use: cliName,
Expand Down Expand Up @@ -134,7 +136,7 @@ func NewCommand() *cobra.Command {
appController.InvalidateProjectsCache()
}))
kubectl := kubeutil.NewKubectl()
clusterFilter := getClusterFilter()
clusterFilter := getClusterFilter(kubeClient, settingsMgr, shardingAlgorithm)
appController, err = controller.NewApplicationController(
namespace,
settingsMgr,
Expand All @@ -152,7 +154,8 @@ func NewCommand() *cobra.Command {
kubectlParallelismLimit,
persistResourceHealth,
clusterFilter,
applicationNamespaces)
applicationNamespaces,
)
errors.CheckError(err)
cacheutil.CollectMetrics(redisClient, appController.GetMetricsServer())

Expand Down Expand Up @@ -195,13 +198,14 @@ func NewCommand() *cobra.Command {
command.Flags().StringVar(&otlpAddress, "otlp-address", env.StringFromEnv("ARGOCD_APPLICATION_CONTROLLER_OTLP_ADDRESS", ""), "OpenTelemetry collector address to send traces to")
command.Flags().StringSliceVar(&applicationNamespaces, "application-namespaces", env.StringsFromEnv("ARGOCD_APPLICATION_NAMESPACES", []string{}, ","), "List of additional namespaces that applications are allowed to be reconciled from")
command.Flags().BoolVar(&persistResourceHealth, "persist-resource-health", env.ParseBoolFromEnv("ARGOCD_APPLICATION_CONTROLLER_PERSIST_RESOURCE_HEALTH", true), "Enables storing the managed resources health in the Application CRD")
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvControllerShardingAlgorithm, common.DefaultShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy, round-robin] ")
cacheSrc = appstatecache.AddCacheFlagsToCmd(&command, func(client *redis.Client) {
redisClient = client
})
return &command
}

func getClusterFilter() func(cluster *v1alpha1.Cluster) bool {
func getClusterFilter(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string) sharding.ClusterFilterFunction {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
var clusterFilter func(cluster *v1alpha1.Cluster) bool
Expand All @@ -212,7 +216,10 @@ func getClusterFilter() func(cluster *v1alpha1.Cluster) bool {
errors.CheckError(err)
}
log.Infof("Processing clusters from shard %d", shard)
clusterFilter = sharding.GetClusterFilter(replicas, shard)
db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
log.Infof("Using filter function: %s", shardingAlgorithm)
distributionFunction := sharding.GetDistributionFunction(db, shardingAlgorithm)
clusterFilter = sharding.GetClusterFilter(distributionFunction, shard)
} else {
log.Info("Processing all cluster shards")
}
Expand Down
8 changes: 6 additions & 2 deletions cmd/argocd/commands/admin/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/utils/pointer"

cmdutil "github.com/argoproj/argo-cd/v2/cmd/util"
"github.com/argoproj/argo-cd/v2/common"
Expand Down Expand Up @@ -115,10 +116,13 @@ func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClie
}
batch := clustersList.Items[batchStart:batchEnd]
_ = kube.RunAllAsync(len(batch), func(i int) error {
cluster := batch[i]
clusterShard := 0
cluster := batch[i]
if replicas > 0 {
clusterShard = sharding.GetShardByID(cluster.ID, replicas)
distributionFunction := sharding.GetDistributionFunction(argoDB, common.DefaultShardingAlgorithm)
distributionFunction(&cluster)
cluster.Shard = pointer.Int64Ptr(int64(clusterShard))
log.Infof("Cluster with uid: %s will be processed by shard %d", cluster.ID, clusterShard)
}

if shard != -1 && clusterShard != shard {
Expand Down
8 changes: 8 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ const (

// PasswordPatten is the default password patten
PasswordPatten = `^.{8,32}$`

//LegacyShardingAlgorithm is the default value for Sharding Algorithm it uses an `uid` based distribution (non-uniform)
LegacyShardingAlgorithm = "legacy"
//RoundRobinShardingAlgorithm is a flag value that can be opted for Sharding Algorithm it uses an equal distribution accross all shards
RoundRobinShardingAlgorithm = "round-robin"
DefaultShardingAlgorithm = LegacyShardingAlgorithm
)

// Dex related constants
Expand Down Expand Up @@ -203,6 +209,8 @@ const (
EnvControllerReplicas = "ARGOCD_CONTROLLER_REPLICAS"
// EnvControllerShard is the shard number that should be handled by controller
EnvControllerShard = "ARGOCD_CONTROLLER_SHARD"
// EnvControllerShardingAlgorithm is the distribution sharding algorithm to be used: legacy or round-robin
EnvControllerShardingAlgorithm = "ARGOCD_CONTROLLER_SHARDING_ALGORITHM"
// EnvEnableGRPCTimeHistogramEnv enables gRPC metrics collection
EnvEnableGRPCTimeHistogramEnv = "ARGOCD_ENABLE_GRPC_TIME_HISTOGRAM"
// EnvGithubAppCredsExpirationDuration controls the caching of Github app credentials. This value is in minutes (default: 60)
Expand Down
15 changes: 11 additions & 4 deletions controller/appcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,17 @@ import (
"github.com/argoproj/argo-cd/v2/common"
statecache "github.com/argoproj/argo-cd/v2/controller/cache"
"github.com/argoproj/argo-cd/v2/controller/metrics"
"github.com/argoproj/argo-cd/v2/controller/sharding"
"github.com/argoproj/argo-cd/v2/pkg/apis/application"
appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
argov1alpha "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
"github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions/application/v1alpha1"
applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
"github.com/argoproj/argo-cd/v2/util/argo"
argodiff "github.com/argoproj/argo-cd/v2/util/argo/diff"

appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/errors"
Expand Down Expand Up @@ -229,10 +232,12 @@ func (ctrl *ApplicationController) InvalidateProjectsCache(names ...string) {
ctrl.projByNameCache.Delete(name)
}
} else {
ctrl.projByNameCache.Range(func(key, _ interface{}) bool {
ctrl.projByNameCache.Delete(key)
return true
})
if ctrl != nil {
ctrl.projByNameCache.Range(func(key, _ interface{}) bool {
ctrl.projByNameCache.Delete(key)
return true
})
}
}
}

Expand Down Expand Up @@ -2010,3 +2015,5 @@ func (ctrl *ApplicationController) toAppKey(appName string) string {
func (ctrl *ApplicationController) toAppQualifiedName(appName, appNamespace string) string {
return fmt.Sprintf("%s/%s", appNamespace, appName)
}

type ClusterFilterFunction func(c *argov1alpha.Cluster, distributionFunction sharding.DistributionFunction) bool
1 change: 0 additions & 1 deletion controller/appcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ metadata:
namespace: ` + test.FakeArgoCDNamespace + `
type: Opaque
`

var fakeApp = `
apiVersion: argoproj.io/v1alpha1
kind: Application
Expand Down
155 changes: 133 additions & 22 deletions controller/sharding/sharding.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,127 @@
package sharding

import (
"context"
"fmt"
"hash/fnv"
"math"
"os"
"sort"
"strconv"
"strings"

"github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"

"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/env"
log "github.com/sirupsen/logrus"
)

// Make it overridable for testing
var osHostnameFunction = os.Hostname

type DistributionFunction func(c *v1alpha1.Cluster) int
type ClusterFilterFunction func(c *v1alpha1.Cluster) bool

// GetClusterFilter returns a ClusterFilterFunction which is a function taking a cluster as a parameter
// and returns wheter or not the cluster should be processed by a given shard. It calls the distributionFunction
// to determine which shard will process the cluster, and if the given shard is equal to the calculated shard
// the function will return true.
func GetClusterFilter(distributionFunction DistributionFunction, shard int) ClusterFilterFunction {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
return func(c *v1alpha1.Cluster) bool {
clusterShard := 0
if c != nil && c.Shard != nil {
requestedShard := int(*c.Shard)
if requestedShard < replicas {
clusterShard = requestedShard
} else {
log.Warnf("Specified cluster shard (%d) for cluster: %s is greater than the number of available shard. Assigning automatically.", requestedShard, c.Name)
}
} else {
clusterShard = distributionFunction(c)
}
return clusterShard == shard
}
}

// GetDistributionFunction returns which DistributionFunction should be used based on the passed algorithm and
// the current datas.
func GetDistributionFunction(db db.ArgoDB, shardingAlgorithm string) DistributionFunction {
log.Infof("Using filter function: %s", shardingAlgorithm)
distributionFunction := LegacyDistributionFunction()
switch shardingAlgorithm {
case common.RoundRobinShardingAlgorithm:
distributionFunction = RoundRobinDistributionFunction(db)
case common.LegacyShardingAlgorithm:
distributionFunction = LegacyDistributionFunction()
default:
log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm)
}
return distributionFunction
}

// LegacyDistributionFunction returns a DistributionFunction using a stable distribution algorithm:
// for a given cluster the function will return the shard number based on the cluster id. This function
// is lightweight and can be distributed easily, however, it does not ensure an homogenous distribution as
// some shards may get assigned more clusters than others. It is the legacy function distribution that is
// kept for compatibility reasons
func LegacyDistributionFunction() DistributionFunction {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
return func(c *v1alpha1.Cluster) int {
if replicas == 0 {
return -1
}
if c == nil {
return 0
}
id := c.ID
log.Debugf("Calculating cluster shard for cluster id: %s", id)
if id == "" {
return 0
} else {
h := fnv.New32a()
_, _ = h.Write([]byte(id))
shard := int32(h.Sum32() % uint32(replicas))
log.Infof("Cluster with id=%s will be processed by shard %d", id, shard)
return int(shard)
}
}
}

// RoundRobinDistributionFunction returns a DistributionFunction using an homogeneous distribution algorithm:
// for a given cluster the function will return the shard number based on the modulo of the cluster rank in
// the cluster's list sorted by uid on the shard number.
// This function ensures an homogenous distribution: each shards got assigned the same number of
// clusters +/-1 , but with the drawback of a reshuffling of clusters accross shards in case of some changes
// in the cluster list
func RoundRobinDistributionFunction(db db.ArgoDB) DistributionFunction {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
return func(c *v1alpha1.Cluster) int {
if replicas > 0 {
if c == nil { // in-cluster does not necessarly have a secret assigned. So we are receiving a nil cluster here.
return 0
} else {
clusterIndexdByClusterIdMap := createClusterIndexByClusterIdMap(db)
clusterIndex, ok := clusterIndexdByClusterIdMap[c.ID]
if !ok {
log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
return -1
}
shard := int(clusterIndex % replicas)
log.Infof("Cluster with id=%s will be processed by shard %d", c.ID, shard)
return shard
}
}
log.Warnf("The number of replicas (%d) is lower than 1", replicas)
return -1
}
}

// InferShard extracts the shard index based on its hostname.
func InferShard() (int, error) {
hostname, err := os.Hostname()
hostname, err := osHostnameFunction()
if err != nil {
return 0, err
}
Expand All @@ -23,31 +133,32 @@ func InferShard() (int, error) {
if err != nil {
return 0, fmt.Errorf("hostname should ends with shard number separated by '-' but got: %s", hostname)
}
return shard, nil
return int(shard), nil
}

// GetShardByID calculates cluster shard as `clusterSecret.UID % replicas count`
func GetShardByID(id string, replicas int) int {
if id == "" {
return 0
} else {
h := fnv.New32a()
_, _ = h.Write([]byte(id))
return int(h.Sum32() % uint32(replicas))
func getSortedClustersList(db db.ArgoDB) []v1alpha1.Cluster {
ctx := context.Background()
clustersList, dbErr := db.ListClusters(ctx)
if dbErr != nil {
log.Warnf("Error while querying clusters list from database: %v", dbErr)
return []v1alpha1.Cluster{}
}
clusters := clustersList.Items
sort.Slice(clusters, func(i, j int) bool {
return clusters[i].ID < clusters[j].ID
})
return clusters
}

func GetClusterFilter(replicas int, shard int) func(c *v1alpha1.Cluster) bool {
return func(c *v1alpha1.Cluster) bool {
clusterShard := 0
// cluster might be nil if app is using invalid cluster URL, assume shard 0 in this case.
if c != nil {
if c.Shard != nil {
clusterShard = int(*c.Shard)
} else {
clusterShard = GetShardByID(c.ID, replicas)
}
}
return clusterShard == shard
func createClusterIndexByClusterIdMap(db db.ArgoDB) map[string]int {
clusters := getSortedClustersList(db)
log.Debugf("ClustersList has %d items", len(clusters))
clusterById := make(map[string]v1alpha1.Cluster)
clusterIndexedByClusterId := make(map[string]int)
for i, cluster := range clusters {
log.Debugf("Adding cluster with id=%s and name=%s to cluster's map", cluster.ID, cluster.Name)
clusterById[cluster.ID] = cluster
clusterIndexedByClusterId[cluster.ID] = i
}
return clusterIndexedByClusterId
}
Loading

0 comments on commit ee983fe

Please sign in to comment.