From 7bb3d9ad99cf8553afbd8139c883579ff1da8ad6 Mon Sep 17 00:00:00 2001 From: Yash Mehrotra Date: Mon, 23 Oct 2023 09:37:35 +0530 Subject: [PATCH 1/5] feat: new job type for sync canary jobs --- api/context/context.go | 3 + api/v1/canary_types.go | 5 ++ cmd/operator.go | 16 ++++ go.mod | 2 + go.sum | 27 ++++++ pkg/controllers/canary_controller.go | 5 +- pkg/db/canary.go | 21 ++--- pkg/jobs/canary/canary_jobs.go | 109 ++++++++++++++----------- pkg/jobs/canary/sync_upstream.go | 4 +- pkg/jobs/jobs.go | 9 +- pkg/topology/checks/component_check.go | 3 +- 11 files changed, 139 insertions(+), 65 deletions(-) diff --git a/api/context/context.go b/api/context/context.go index 46700dce0..16afffbd2 100644 --- a/api/context/context.go +++ b/api/context/context.go @@ -11,6 +11,7 @@ import ( "github.com/flanksource/commons/logger" ctemplate "github.com/flanksource/commons/template" "github.com/flanksource/duty" + dutyCtx "github.com/flanksource/duty/context" "github.com/flanksource/duty/models" "github.com/flanksource/duty/types" "github.com/flanksource/kommons" @@ -19,6 +20,8 @@ import ( "k8s.io/client-go/kubernetes" ) +var DefaultContext dutyCtx.Context + type KubernetesContext struct { gocontext.Context Kommons *kommons.Client diff --git a/api/v1/canary_types.go b/api/v1/canary_types.go index 7528739ba..adca8d8d3 100644 --- a/api/v1/canary_types.go +++ b/api/v1/canary_types.go @@ -25,6 +25,7 @@ import ( "github.com/flanksource/canary-checker/api/external" "github.com/flanksource/commons/logger" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) type ResultMode string @@ -234,6 +235,10 @@ func (c Canary) GetDescription(check external.Check) string { return check.GetEndpoint() } +func (c Canary) GetNamespacedName() types.NamespacedName { + return types.NamespacedName{Name: c.Name, Namespace: c.Namespace} +} + func (c *Canary) SetRunnerName(name string) { c.Status.runnerName = name } diff --git a/cmd/operator.go b/cmd/operator.go index 6c9c473c8..3c47638b8 100644 --- a/cmd/operator.go +++ b/cmd/operator.go @@ -1,9 +1,11 @@ package cmd import ( + gocontext "context" "os" "time" + apicontext "github.com/flanksource/canary-checker/api/context" "github.com/flanksource/canary-checker/pkg/cache" "github.com/flanksource/canary-checker/pkg/db" "github.com/flanksource/canary-checker/pkg/jobs" @@ -16,9 +18,12 @@ import ( "github.com/flanksource/canary-checker/pkg" "github.com/flanksource/canary-checker/pkg/controllers" "github.com/flanksource/canary-checker/pkg/labels" + commonsCtx "github.com/flanksource/commons/context" "github.com/flanksource/commons/logger" + "github.com/flanksource/duty/context" "github.com/go-logr/zapr" "github.com/spf13/cobra" + "go.opentelemetry.io/otel" "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" @@ -73,6 +78,17 @@ func run(cmd *cobra.Command, args []string) { if err := db.Init(); err != nil { logger.Fatalf("error connecting with postgres: %v", err) } + kommonsClient, k8s, err := pkg.NewKommonsClient() + if err != nil { + logger.Warnf("failed to get kommons client, checks that read kubernetes configs will fail: %v", err) + } + + apicontext.DefaultContext = context.NewContext(gocontext.Background(), commonsCtx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker"))). + WithDB(db.Gorm, db.Pool). + WithKubernetes(k8s). + WithKommons(kommonsClient). + WithNamespace(runner.WatchNamespace) + cache.PostgresCache = cache.NewPostgresCache(db.Pool) if operatorExecutor { logger.Infof("Starting executors") diff --git a/go.mod b/go.mod index 2b24625a5..1c776a0bf 100644 --- a/go.mod +++ b/go.mod @@ -269,3 +269,5 @@ require ( sigs.k8s.io/kustomize/kyaml v0.14.3 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect ) + +replace github.com/flanksource/duty => ../duty diff --git a/go.sum b/go.sum index ff9a2e3aa..1d15c2748 100644 --- a/go.sum +++ b/go.sum @@ -817,6 +817,18 @@ github.com/flanksource/commons v1.17.1 h1:jd114sxRwe2VWcbG/PVVEAWsEkialL6eltbqFG github.com/flanksource/commons v1.17.1/go.mod h1:RDdQI0/QYC4GzicbDaXIvBPjWuQWKLzX8/rFBbFjG5U= github.com/flanksource/duty v1.0.205 h1:sQq+J4TMx69NnoM4XxBcJZ8P5HM5GjY/7zcuv/IQGTo= github.com/flanksource/duty v1.0.205/go.mod h1:V3fgZdrBgN47lloIz7MedwD/tq4ycHI8zFOApzUpFv4= +github.com/flanksource/commons v1.17.0 h1:rSahn6c4vyq3bPC5jsayET4y8TECRz6Q8NbooItZiGA= +github.com/flanksource/commons v1.17.0/go.mod h1:RDdQI0/QYC4GzicbDaXIvBPjWuQWKLzX8/rFBbFjG5U= +github.com/flanksource/duty v1.0.201 h1:c8r02bfuF47E2svK+qXCLHKaSqOCZZHKPj+v54eimqc= +github.com/flanksource/duty v1.0.201/go.mod h1:aO1uXnT1eVtiIcicriK4brqJLmeXgbrYWtNR0H5IkLE= +github.com/flanksource/commons v1.15.0 h1:p74hrKzIz0r3H8YN3CuB8ePJOjzPFO0BRLVmpXmeqvY= +github.com/flanksource/commons v1.15.0/go.mod h1:FMZFLcQr98JwBKuKLs44DvCQ2JNoHz5maRIzVufQ9Cs= +github.com/flanksource/commons v1.15.1 h1:cFvxQd5SBFe+q16ciz8Q2IeBMeQ7+atdACGanbW27hg= +github.com/flanksource/commons v1.15.1/go.mod h1:FMZFLcQr98JwBKuKLs44DvCQ2JNoHz5maRIzVufQ9Cs= +github.com/flanksource/duty v1.0.191 h1:acnvyTeQlfqmtyXxWprNFGK/vBTUlqkYwxEPLtXSPrk= +github.com/flanksource/duty v1.0.191/go.mod h1:ikyl/TcRy6Cc0R5b0wEHT7CecV7gyJvrDGq/4oIZHoc= +github.com/flanksource/duty v1.0.197 h1:KRw4EPAD2kcqNPkipnkHzlbf5wmLqg3JgtXqiPzCLhw= +github.com/flanksource/duty v1.0.197/go.mod h1:aO1uXnT1eVtiIcicriK4brqJLmeXgbrYWtNR0H5IkLE= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= github.com/flanksource/gomplate/v3 v3.20.19 h1:xl+XMYWXtlrO6FfU+VxwjNwX4/oBK3/soOtHRvUt2us= github.com/flanksource/gomplate/v3 v3.20.19/go.mod h1:2GgHZ2vWmtDspJMBfUIryOuzJSwc8jU7Kw9fDLr0TMA= @@ -1127,6 +1139,10 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl/v2 v2.19.1 h1://i05Jqznmb2EXqa39Nsvyan2o5XyMowW5fnCKW5RPI= github.com/hashicorp/hcl/v2 v2.19.1/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= +github.com/hashicorp/hcl/v2 v2.18.0 h1:wYnG7Lt31t2zYkcquwgKo6MWXzRUDIeIVU5naZwHLl8= +github.com/hashicorp/hcl/v2 v2.18.0/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= +github.com/hashicorp/hcl/v2 v2.18.1 h1:6nxnOJFku1EuSawSD81fuviYUV8DxFr3fp2dUi3ZYSo= +github.com/hashicorp/hcl/v2 v2.18.1/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= github.com/henvic/httpretty v0.1.2 h1:EQo556sO0xeXAjP10eB+BZARMuvkdGqtfeS4Ntjvkiw= github.com/henvic/httpretty v0.1.2/go.mod h1:ViEsly7wgdugYtymX54pYp6Vv2wqZmNHayJ6q8tlKCc= github.com/hirochachacha/go-smb2 v1.1.0 h1:b6hs9qKIql9eVXAiN0M2wSFY5xnhbHAQoCwRKbaRTZI= @@ -1507,6 +1523,8 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE= github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +github.com/zclconf/go-cty v1.14.0 h1:/Xrd39K7DXbHzlisFP9c4pHao4yyf+/Ug9LEz+Y/yhc= +github.com/zclconf/go-cty v1.14.0/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE= github.com/zclconf/go-cty v1.14.1 h1:t9fyA35fwjjUMcmL5hLER+e/rEPqrbCK1/OSE4SI9KA= github.com/zclconf/go-cty v1.14.1/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= @@ -2300,6 +2318,13 @@ gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/driver/postgres v1.5.3 h1:qKGY5CPHOuj47K/VxbCXJfFvIUeqMSXXadqdCY+MbBU= gorm.io/driver/postgres v1.5.3/go.mod h1:F+LtvlFhZT7UBiA81mC9W6Su3D4WUhSboc/36QZU0gk= gorm.io/gorm v1.25.0/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= +gorm.io/driver/postgres v1.5.2 h1:ytTDxxEv+MplXOfFe3Lzm7SjG09fcdb3Z/c056DTBx0= +gorm.io/driver/postgres v1.5.2/go.mod h1:fmpX0m2I1PKuR7mKZiEluwrP3hbs+ps7JIGMUBpCgl8= +gorm.io/driver/postgres v1.5.3 h1:qKGY5CPHOuj47K/VxbCXJfFvIUeqMSXXadqdCY+MbBU= +gorm.io/driver/postgres v1.5.3/go.mod h1:F+LtvlFhZT7UBiA81mC9W6Su3D4WUhSboc/36QZU0gk= +gorm.io/gorm v1.25.0/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= +gorm.io/gorm v1.25.4 h1:iyNd8fNAe8W9dvtlgeRI5zSVZPsq3OpcTu37cYcpCmw= +gorm.io/gorm v1.25.4/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls= gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= gorm.io/plugin/prometheus v0.0.0-20230504115745-1aec2356381b h1:uHPZdwwf4+AVvAEgZ/LQR1UTub8LJ2nh0wQDW3Dt4jE= @@ -2339,6 +2364,8 @@ k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdiUkI+v/ImEGAvu3WatcZl3lPMR4Rk= k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= +k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ= +k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= diff --git a/pkg/controllers/canary_controller.go b/pkg/controllers/canary_controller.go index 98686bb52..56ec2958c 100644 --- a/pkg/controllers/canary_controller.go +++ b/pkg/controllers/canary_controller.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/flanksource/canary-checker/api/context" v1 "github.com/flanksource/canary-checker/api/v1" "github.com/flanksource/canary-checker/pkg" canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary" @@ -105,7 +106,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c // Sync jobs if canary is created or updated if canary.Generation == 1 { - if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil { + if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil { logger.Error(err, "failed to sync canary job") return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err } @@ -143,7 +144,7 @@ func (r *CanaryReconciler) persistAndCacheCanary(canary *v1.Canary) (*pkg.Canary } r.CanaryCache.Set(dbCanary.ID.String(), dbCanary, cache.DefaultExpiration) - if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil { + if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil { return nil, err } return dbCanary, nil diff --git a/pkg/db/canary.go b/pkg/db/canary.go index fa21d4ddc..c98ce72c2 100644 --- a/pkg/db/canary.go +++ b/pkg/db/canary.go @@ -1,7 +1,7 @@ package db import ( - "context" + gocontext "context" "encoding/json" "errors" "fmt" @@ -15,6 +15,7 @@ import ( "github.com/flanksource/canary-checker/pkg/utils" "github.com/flanksource/commons/logger" "github.com/flanksource/duty" + "github.com/flanksource/duty/context" "github.com/flanksource/duty/models" dutyTypes "github.com/flanksource/duty/types" "github.com/google/uuid" @@ -23,7 +24,7 @@ import ( "gorm.io/gorm/clause" ) -func GetAllCanariesForSync(namespace string) ([]pkg.Canary, error) { +func GetAllCanariesForSync(ctx context.Context, namespace string) ([]pkg.Canary, error) { query := ` SELECT json_agg( jsonb_set_lax(to_jsonb(canaries),'{checks}', ( @@ -49,7 +50,7 @@ func GetAllCanariesForSync(namespace string) ([]pkg.Canary, error) { args["namespace"] = namespace } - rows, err := Pool.Query(context.Background(), query, args) + rows, err := ctx.Pool().Query(ctx, query, args) if err != nil { return nil, err } @@ -125,9 +126,9 @@ func PersistCheck(check pkg.Check, canaryID uuid.UUID) (uuid.UUID, error) { return check.ID, nil } -func GetTransformedCheckIDs(canaryID string) ([]string, error) { +func GetTransformedCheckIDs(ctx context.Context, canaryID string) ([]string, error) { var ids []string - err := Gorm.Table("checks"). + err := ctx.DB().Table("checks"). Select("id"). Where("canary_id = ? AND transformed = true AND deleted_at IS NULL", canaryID). Find(&ids). @@ -135,7 +136,7 @@ func GetTransformedCheckIDs(canaryID string) ([]string, error) { return ids, err } -func AddCheckStatuses(ids []string, status models.CheckHealthStatus) error { +func AddCheckStatuses(ctx context.Context, ids []string, status models.CheckHealthStatus) error { if len(ids) == 0 { return nil } @@ -158,12 +159,12 @@ func AddCheckStatuses(ids []string, status models.CheckHealthStatus) error { }) } } - return Gorm.Table("check_statuses"). + return ctx.DB().Table("check_statuses"). Create(objs). Error } -func RemoveTransformedChecks(ids []string) error { +func RemoveTransformedChecks(ctx context.Context, ids []string) error { if len(ids) == 0 { return nil } @@ -171,7 +172,7 @@ func RemoveTransformedChecks(ids []string) error { "deleted_at": gorm.Expr("NOW()"), } - return Gorm.Table("checks"). + return ctx.DB().Table("checks"). Where("id in (?)", ids). Where("transformed = true"). Updates(updates). @@ -278,7 +279,7 @@ func FindCheck(canary pkg.Canary, name string) (*pkg.Check, error) { return &model, nil } -func FindDeletedChecksSince(ctx context.Context, since time.Time) ([]string, error) { +func FindDeletedChecksSince(ctx gocontext.Context, since time.Time) ([]string, error) { var ids []string err := Gorm.Model(&models.Check{}).Where("deleted_at > ?", since).Pluck("id", &ids).Error return ids, err diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go index 5fb8fd60a..04767d649 100644 --- a/pkg/jobs/canary/canary_jobs.go +++ b/pkg/jobs/canary/canary_jobs.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "github.com/flanksource/canary-checker/api/context" + canarycontext "github.com/flanksource/canary-checker/api/context" v1 "github.com/flanksource/canary-checker/api/v1" "github.com/flanksource/canary-checker/checks" "github.com/flanksource/canary-checker/pkg" @@ -17,6 +17,8 @@ import ( "github.com/flanksource/canary-checker/pkg/runner" "github.com/flanksource/canary-checker/pkg/utils" "github.com/flanksource/commons/logger" + "github.com/flanksource/duty/context" + dutyjob "github.com/flanksource/duty/job" "github.com/flanksource/duty/models" "github.com/flanksource/kommons" "github.com/robfig/cron/v3" @@ -51,6 +53,7 @@ func StartScanCanaryConfigs(dataFile string, configFiles []string) { type CanaryJob struct { *kommons.Client + dutyjob.JobRuntime Kubernetes kubernetes.Interface Canary v1.Canary DBCanary pkg.Canary @@ -65,21 +68,35 @@ func (job CanaryJob) GetNamespacedName() types.NamespacedName { var minimumTimeBetweenCanaryRuns = 10 * time.Second var canaryLastRuntimes = sync.Map{} -func (job CanaryJob) Run() { - if runner.IsCanaryIgnored(&job.Canary.ObjectMeta) { - return +func SyncCanary(ctx dutyjob.JobRuntime, args ...any) error { + if len(args) != 2 { + return fmt.Errorf("wrong arg count for SyncCanary: %d", len(args)) + } + dbCanary, ok := args[0].(pkg.Canary) + if !ok { + return fmt.Errorf("wrong arg type for dbCanary: %T", args[0]) + } + + canary, ok := args[1].(v1.Canary) + if !ok { + return fmt.Errorf("wrong arg type for canary: %T", args[1]) + } + + if runner.IsCanaryIgnored(&canary.ObjectMeta) { + return nil } - canaryID := job.DBCanary.ID.String() + + canaryID := dbCanary.ID.String() val, _ := concurrentJobLocks.LoadOrStore(canaryID, &sync.Mutex{}) lock, ok := val.(*sync.Mutex) if !ok { logger.Warnf("expected mutex but got %T for canary(id=%s)", lock, canaryID) - return + return nil } if !lock.TryLock() { logger.Debugf("canary (id=%s) is already running. skipping this run ...", canaryID) - return + return nil } defer lock.Unlock() @@ -93,25 +110,28 @@ func (job CanaryJob) Run() { // Skip run if job ran too recently if lastRunDelta < minimumTimeBetweenCanaryRuns { - logger.Infof("Skipping Canary[%s]:%s since it last ran %.2f seconds ago", canaryID, job.GetNamespacedName(), lastRunDelta.Seconds()) - return + logger.Infof("Skipping Canary[%s]:%s since it last ran %.2f seconds ago", canaryID, canary.GetNamespacedName(), lastRunDelta.Seconds()) + return nil } // Get transformed checks before and after, and then delete the olds ones that are not in new set - existingTransformedChecks, _ := db.GetTransformedCheckIDs(canaryID) + existingTransformedChecks, _ := db.GetTransformedCheckIDs(ctx.Context, canaryID) var transformedChecksCreated []string // Transformed checks have a delete strategy // On deletion they can either be marked healthy, unhealthy or left as is checkIDDeleteStrategyMap := make(map[string]string) - results, err := checks.RunChecks(job.NewContext()) + canaryCtx := canarycontext.New(ctx.Kommons(), ctx.Kubernetes(), ctx.DB(), ctx.Pool(), canary) + results, err := checks.RunChecks(canaryCtx) if err != nil { - logger.Errorf("error running checks for canary %s: %v", job.Canary.GetPersistedID(), err) - job.Errorf("error running checks for canary %s: %v", job.Canary.GetPersistedID(), err) - return + logger.Errorf("error running checks for canary %s: %v", canaryID, err) + return nil } + // TODO: Use ctx with object here + logPass := canary.IsTrace() || canary.IsDebug() || LogPass + logFail := canary.IsTrace() || canary.IsDebug() || LogFail for _, result := range results { - if job.LogPass && result.Pass || job.LogFail && !result.Pass { + if logPass && result.Pass || logFail && !result.Pass { logger.Infof(result.String()) } transformedChecksAdded := cache.PostgresCache.Add(pkg.FromV1(result.Canary, result.Check), pkg.FromResult(*result)) @@ -120,7 +140,7 @@ func (job CanaryJob) Run() { checkIDDeleteStrategyMap[checkID] = result.Check.GetTransformDeleteStrategy() } } - job.updateStatusAndEvent(results) + updateCanaryStatusAndEvent(canary, results) checkDeleteStrategyGroup := make(map[string][]string) checksToRemove := utils.SetDifference(existingTransformedChecks, transformedChecksCreated) @@ -137,10 +157,10 @@ func (job CanaryJob) Run() { checkDeleteStrategyGroup[status] = append(checkDeleteStrategyGroup[status], checkID) } for status, checkIDs := range checkDeleteStrategyGroup { - if err := db.AddCheckStatuses(checkIDs, models.CheckHealthStatus(status)); err != nil { + if err := db.AddCheckStatuses(ctx.Context, checkIDs, models.CheckHealthStatus(status)); err != nil { logger.Errorf("error adding statuses for transformed checks: %v", err) } - if err := db.RemoveTransformedChecks(checkIDs); err != nil { + if err := db.RemoveTransformedChecks(ctx.Context, checkIDs); err != nil { logger.Errorf("error deleting transformed checks for canary %s: %v", canaryID, err) } } @@ -148,13 +168,14 @@ func (job CanaryJob) Run() { // Update last runtime map canaryLastRuntimes.Store(canaryID, time.Now()) + return nil } -func (job *CanaryJob) NewContext() *context.Context { - return context.New(job.Client, job.Kubernetes, db.Gorm, db.Pool, job.Canary) +func (job *CanaryJob) NewContext() *canarycontext.Context { + return canarycontext.New(job.Client, job.Kubernetes, db.Gorm, db.Pool, job.Canary) } -func (job CanaryJob) updateStatusAndEvent(results []*pkg.CheckResult) { +func updateCanaryStatusAndEvent(canary v1.Canary, results []*pkg.CheckResult) { if CanaryStatusChannel == nil { return } @@ -175,8 +196,8 @@ func (job CanaryJob) updateStatusAndEvent(results []*pkg.CheckResult) { duration += result.Duration // Set uptime and latency - uptime, latency := metrics.Record(job.Canary, result) - checkKey := job.Canary.GetKey(result.Check) + uptime, latency := metrics.Record(canary, result) + checkKey := canary.GetKey(result.Check) checkStatus[checkKey] = &v1.CheckStatus{} checkStatus[checkKey].Uptime1H = uptime.String() checkStatus[checkKey].Latency1H = latency.String() @@ -192,8 +213,8 @@ func (job CanaryJob) updateStatusAndEvent(results []*pkg.CheckResult) { // Transition q := cache.QueryParams{Check: checkKey, StatusCount: 1} - if job.Canary.Status.LastTransitionedTime != nil { - q.Start = job.Canary.Status.LastTransitionedTime.Format(time.RFC3339) + if canary.Status.LastTransitionedTime != nil { + q.Start = canary.Status.LastTransitionedTime.Format(time.RFC3339) } lastStatus, err := cache.PostgresCache.Query(q) if err != nil || len(lastStatus) == 0 || len(lastStatus[0].Statuses) == 0 { @@ -206,7 +227,7 @@ func (job CanaryJob) updateStatusAndEvent(results []*pkg.CheckResult) { lastTransitionedTime = &metav1.Time{Time: time.Now()} } - push.Queue(pkg.FromV1(job.Canary, result.Check), pkg.FromResult(*result)) + push.Queue(pkg.FromV1(canary, result.Check), pkg.FromResult(*result)) // Update status message if len(messages) == 1 { @@ -235,7 +256,7 @@ func (job CanaryJob) updateStatusAndEvent(results []*pkg.CheckResult) { ErrorMessage: errorMsg, Uptime: uptimeAgg.String(), Latency: utils.Age(time.Duration(highestLatency) * time.Millisecond), - NamespacedName: job.GetNamespacedName(), + NamespacedName: canary.GetNamespacedName(), } CanaryStatusChannel <- payload @@ -255,7 +276,7 @@ type CanaryStatusPayload struct { func findCronEntry(id string) *cron.Entry { for _, entry := range CanaryScheduler.Entries() { - if entry.Job.(CanaryJob).DBCanary.ID.String() == id { + if entry.Job.(*dutyjob.Job).ID == id { return &entry } } @@ -265,7 +286,7 @@ func findCronEntry(id string) *cron.Entry { func getAllCanaryIDsInCron() []string { var ids []string for _, entry := range CanaryScheduler.Entries() { - ids = append(ids, entry.Job.(CanaryJob).DBCanary.ID.String()) + ids = append(ids, entry.Job.(*dutyjob.Job).ID) } return ids } @@ -295,7 +316,7 @@ func ScanCanaryConfigs() { var canaryUpdateTimeCache = sync.Map{} // TODO: Refactor to use database object instead of kubernetes -func SyncCanaryJob(dbCanary pkg.Canary) error { +func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error { canary, err := dbCanary.ToV1() if err != nil { return err @@ -313,21 +334,14 @@ func SyncCanaryJob(dbCanary pkg.Canary) error { if Kommons == nil { var err error Kommons, Kubernetes, err = pkg.NewKommonsClient() + ctx = ctx.WithKommons(Kommons).WithKubernetes(Kubernetes) if err != nil { logger.Warnf("Failed to get kommons client, features that read kubernetes config will fail: %v", err) } } - job := CanaryJob{ - Client: Kommons, - Kubernetes: Kubernetes, - Canary: *canary, - DBCanary: dbCanary, - LogPass: canary.IsTrace() || canary.IsDebug() || LogPass, - LogFail: canary.IsTrace() || canary.IsDebug() || LogFail, - } - updateTime, exists := canaryUpdateTimeCache.Load(dbCanary.ID.String()) + newJob := dutyjob.NewJob(ctx, "SyncCanaryJob", canary.Spec.GetSchedule(), SyncCanary, dbCanary, canary).SetID(dbCanary.ID.String()) entry := findCronEntry(dbCanary.ID.String()) if !exists || dbCanary.UpdatedAt.After(updateTime.(time.Time)) || entry == nil { // Remove entry if it exists @@ -336,11 +350,11 @@ func SyncCanaryJob(dbCanary pkg.Canary) error { } // Schedule canary for the first time - entryID, err := CanaryScheduler.AddJob(canary.Spec.GetSchedule(), job) - if err != nil { + if err := newJob.AddToScheduler(CanaryScheduler); err != nil { return fmt.Errorf("failed to schedule canary %s/%s: %v", canary.Namespace, canary.Name, err) } - entry = utils.Ptr(CanaryScheduler.Entry(entryID)) + + entry = newJob.GetEntry(CanaryScheduler) logger.Infof("Scheduled %s: %s", canary, canary.Spec.GetSchedule()) canaryUpdateTimeCache.Store(dbCanary.ID.String(), dbCanary.UpdatedAt) @@ -354,17 +368,17 @@ func SyncCanaryJob(dbCanary pkg.Canary) error { return nil } -func SyncCanaryJobs() { - logger.Debugf("Syncing canary jobs") +func SyncCanaryJobs(ctx dutyjob.JobRuntime, _ ...any) error { + ctx.Debugf("Syncing canary jobs") - canaries, err := db.GetAllCanariesForSync(runner.WatchNamespace) + canaries, err := db.GetAllCanariesForSync(ctx.Context, runner.WatchNamespace) if err != nil { logger.Errorf("Failed to get canaries: %v", err) jobHistory := models.NewJobHistory("SyncCanaries", "canary", "").Start() logIfError(db.PersistJobHistory(jobHistory.AddError(err.Error()).End()), "failed to persist job history [SyncCanaries]") - return + return err } existingIDsInCron := getAllCanaryIDsInCron() @@ -373,7 +387,7 @@ func SyncCanaryJobs() { jobHistory := models.NewJobHistory("CanarySync", "canary", c.ID.String()).Start() idsInNewFetch = append(idsInNewFetch, c.ID.String()) - if err := SyncCanaryJob(c); err != nil { + if err := SyncCanaryJob(ctx.Context, c); err != nil { logger.Errorf("Error syncing canary[%s]: %v", c.ID, err.Error()) logIfError(db.PersistJobHistory(jobHistory.AddError(err.Error()).End()), "failed to persist job history [CanarySync]") continue @@ -386,6 +400,7 @@ func SyncCanaryJobs() { } logger.Infof("Synced canary jobs %d", len(CanaryScheduler.Entries())) + return nil } func DeleteCanaryJob(id string) { diff --git a/pkg/jobs/canary/sync_upstream.go b/pkg/jobs/canary/sync_upstream.go index 7a31b448a..e16d95f11 100644 --- a/pkg/jobs/canary/sync_upstream.go +++ b/pkg/jobs/canary/sync_upstream.go @@ -37,12 +37,12 @@ const ( // that are missing on the upstream. func ReconcileChecks() { jobHistory := models.NewJobHistory("PushChecksToUpstream", "Canary", "") + _ = db.PersistJobHistory(jobHistory.Start()) defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() - ctx := dutyContext.NewContext(gocontext.TODO()).WithDB(db.Gorm, db.Pool) reconciler := upstream.NewUpstreamReconciler(UpstreamConf, 5) - if err := reconciler.SyncAfter(ctx, "checks", ReconcileMaxAge); err != nil { + if err := reconciler.SyncAfter(context.DefaultContext, "checks", ReconcileMaxAge); err != nil { jobHistory.AddError(err.Error()) logger.Errorf("failed to sync table 'checks': %v", err) } else { diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index bcc06ebde..a4cd258e8 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -3,6 +3,7 @@ package jobs import ( "github.com/flanksource/canary-checker/api/context" v1 "github.com/flanksource/canary-checker/api/v1" + "github.com/flanksource/canary-checker/pkg/db" canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary" systemJobs "github.com/flanksource/canary-checker/pkg/jobs/system" @@ -10,6 +11,7 @@ import ( "github.com/flanksource/canary-checker/pkg/topology/checks" "github.com/flanksource/canary-checker/pkg/topology/configs" "github.com/flanksource/commons/logger" + dutyjob "github.com/flanksource/duty/job" "github.com/robfig/cron/v3" ) @@ -67,9 +69,11 @@ func Start() { } } - if _, err := ScheduleFunc(SyncCanaryJobsSchedule, canaryJobs.SyncCanaryJobs); err != nil { - logger.Errorf("Failed to schedule sync jobs for canary: %v", err) + if err := dutyjob.NewJob(context.DefaultContext, "SyncCanaryJobs", SyncCanaryJobsSchedule, canaryJobs.SyncCanaryJobs). + RunOnStart().AddToScheduler(FuncScheduler); err != nil { + logger.Fatalf("Failed to schedule job [canaryJobs.SyncCanaryJobs]: %v", err) } + if _, err := ScheduleFunc(SyncSystemsJobsSchedule, systemJobs.SyncTopologyJobs); err != nil { logger.Errorf("Failed to schedule sync jobs for systems: %v", err) } @@ -114,7 +118,6 @@ func Start() { } canaryJobs.CleanupMetricsGauges() - canaryJobs.SyncCanaryJobs() systemJobs.SyncTopologyJobs() } diff --git a/pkg/topology/checks/component_check.go b/pkg/topology/checks/component_check.go index 553e671b3..e1d8e8cb1 100644 --- a/pkg/topology/checks/component_check.go +++ b/pkg/topology/checks/component_check.go @@ -3,6 +3,7 @@ package checks import ( "time" + "github.com/flanksource/canary-checker/api/context" "github.com/flanksource/canary-checker/pkg" "github.com/flanksource/canary-checker/pkg/db" canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary" @@ -70,7 +71,7 @@ func GetCheckComponentRelationshipsForComponent(component *pkg.Component) (relat logger.Debugf("error creating canary from inline: %v", err) } - if err := canaryJobs.SyncCanaryJob(*canary); err != nil { + if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *canary); err != nil { logger.Debugf("error creating canary job: %v", err) } From 86992216e4c067e21609eb8530b8c35e8fb5a466 Mon Sep 17 00:00:00 2001 From: Yash Mehrotra Date: Wed, 25 Oct 2023 07:38:36 +0530 Subject: [PATCH 2/5] chore: embed dutyjob in canary job struct --- go.mod | 4 +-- go.sum | 27 -------------- pkg/jobs/canary/canary_jobs.go | 66 +++++++++++++++------------------- 3 files changed, 30 insertions(+), 67 deletions(-) diff --git a/go.mod b/go.mod index 1c776a0bf..d9251faa5 100644 --- a/go.mod +++ b/go.mod @@ -63,6 +63,7 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 go.mongodb.org/mongo-driver v1.12.1 + go.opentelemetry.io/otel v1.19.0 golang.org/x/crypto v0.14.0 golang.org/x/net v0.17.0 golang.org/x/sync v0.4.0 @@ -231,7 +232,6 @@ require ( github.com/yuin/gopher-lua v1.1.0 // indirect github.com/zclconf/go-cty v1.14.1 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/otel v1.19.0 // indirect go.opentelemetry.io/otel/metric v1.19.0 // indirect go.opentelemetry.io/otel/trace v1.19.0 // indirect go.starlark.net v0.0.0-20230925163745-10651d5192ab // indirect @@ -269,5 +269,3 @@ require ( sigs.k8s.io/kustomize/kyaml v0.14.3 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect ) - -replace github.com/flanksource/duty => ../duty diff --git a/go.sum b/go.sum index 1d15c2748..ff9a2e3aa 100644 --- a/go.sum +++ b/go.sum @@ -817,18 +817,6 @@ github.com/flanksource/commons v1.17.1 h1:jd114sxRwe2VWcbG/PVVEAWsEkialL6eltbqFG github.com/flanksource/commons v1.17.1/go.mod h1:RDdQI0/QYC4GzicbDaXIvBPjWuQWKLzX8/rFBbFjG5U= github.com/flanksource/duty v1.0.205 h1:sQq+J4TMx69NnoM4XxBcJZ8P5HM5GjY/7zcuv/IQGTo= github.com/flanksource/duty v1.0.205/go.mod h1:V3fgZdrBgN47lloIz7MedwD/tq4ycHI8zFOApzUpFv4= -github.com/flanksource/commons v1.17.0 h1:rSahn6c4vyq3bPC5jsayET4y8TECRz6Q8NbooItZiGA= -github.com/flanksource/commons v1.17.0/go.mod h1:RDdQI0/QYC4GzicbDaXIvBPjWuQWKLzX8/rFBbFjG5U= -github.com/flanksource/duty v1.0.201 h1:c8r02bfuF47E2svK+qXCLHKaSqOCZZHKPj+v54eimqc= -github.com/flanksource/duty v1.0.201/go.mod h1:aO1uXnT1eVtiIcicriK4brqJLmeXgbrYWtNR0H5IkLE= -github.com/flanksource/commons v1.15.0 h1:p74hrKzIz0r3H8YN3CuB8ePJOjzPFO0BRLVmpXmeqvY= -github.com/flanksource/commons v1.15.0/go.mod h1:FMZFLcQr98JwBKuKLs44DvCQ2JNoHz5maRIzVufQ9Cs= -github.com/flanksource/commons v1.15.1 h1:cFvxQd5SBFe+q16ciz8Q2IeBMeQ7+atdACGanbW27hg= -github.com/flanksource/commons v1.15.1/go.mod h1:FMZFLcQr98JwBKuKLs44DvCQ2JNoHz5maRIzVufQ9Cs= -github.com/flanksource/duty v1.0.191 h1:acnvyTeQlfqmtyXxWprNFGK/vBTUlqkYwxEPLtXSPrk= -github.com/flanksource/duty v1.0.191/go.mod h1:ikyl/TcRy6Cc0R5b0wEHT7CecV7gyJvrDGq/4oIZHoc= -github.com/flanksource/duty v1.0.197 h1:KRw4EPAD2kcqNPkipnkHzlbf5wmLqg3JgtXqiPzCLhw= -github.com/flanksource/duty v1.0.197/go.mod h1:aO1uXnT1eVtiIcicriK4brqJLmeXgbrYWtNR0H5IkLE= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= github.com/flanksource/gomplate/v3 v3.20.19 h1:xl+XMYWXtlrO6FfU+VxwjNwX4/oBK3/soOtHRvUt2us= github.com/flanksource/gomplate/v3 v3.20.19/go.mod h1:2GgHZ2vWmtDspJMBfUIryOuzJSwc8jU7Kw9fDLr0TMA= @@ -1139,10 +1127,6 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl/v2 v2.19.1 h1://i05Jqznmb2EXqa39Nsvyan2o5XyMowW5fnCKW5RPI= github.com/hashicorp/hcl/v2 v2.19.1/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= -github.com/hashicorp/hcl/v2 v2.18.0 h1:wYnG7Lt31t2zYkcquwgKo6MWXzRUDIeIVU5naZwHLl8= -github.com/hashicorp/hcl/v2 v2.18.0/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= -github.com/hashicorp/hcl/v2 v2.18.1 h1:6nxnOJFku1EuSawSD81fuviYUV8DxFr3fp2dUi3ZYSo= -github.com/hashicorp/hcl/v2 v2.18.1/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= github.com/henvic/httpretty v0.1.2 h1:EQo556sO0xeXAjP10eB+BZARMuvkdGqtfeS4Ntjvkiw= github.com/henvic/httpretty v0.1.2/go.mod h1:ViEsly7wgdugYtymX54pYp6Vv2wqZmNHayJ6q8tlKCc= github.com/hirochachacha/go-smb2 v1.1.0 h1:b6hs9qKIql9eVXAiN0M2wSFY5xnhbHAQoCwRKbaRTZI= @@ -1523,8 +1507,6 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE= github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= -github.com/zclconf/go-cty v1.14.0 h1:/Xrd39K7DXbHzlisFP9c4pHao4yyf+/Ug9LEz+Y/yhc= -github.com/zclconf/go-cty v1.14.0/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE= github.com/zclconf/go-cty v1.14.1 h1:t9fyA35fwjjUMcmL5hLER+e/rEPqrbCK1/OSE4SI9KA= github.com/zclconf/go-cty v1.14.1/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= @@ -2318,13 +2300,6 @@ gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/driver/postgres v1.5.3 h1:qKGY5CPHOuj47K/VxbCXJfFvIUeqMSXXadqdCY+MbBU= gorm.io/driver/postgres v1.5.3/go.mod h1:F+LtvlFhZT7UBiA81mC9W6Su3D4WUhSboc/36QZU0gk= gorm.io/gorm v1.25.0/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= -gorm.io/driver/postgres v1.5.2 h1:ytTDxxEv+MplXOfFe3Lzm7SjG09fcdb3Z/c056DTBx0= -gorm.io/driver/postgres v1.5.2/go.mod h1:fmpX0m2I1PKuR7mKZiEluwrP3hbs+ps7JIGMUBpCgl8= -gorm.io/driver/postgres v1.5.3 h1:qKGY5CPHOuj47K/VxbCXJfFvIUeqMSXXadqdCY+MbBU= -gorm.io/driver/postgres v1.5.3/go.mod h1:F+LtvlFhZT7UBiA81mC9W6Su3D4WUhSboc/36QZU0gk= -gorm.io/gorm v1.25.0/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= -gorm.io/gorm v1.25.4 h1:iyNd8fNAe8W9dvtlgeRI5zSVZPsq3OpcTu37cYcpCmw= -gorm.io/gorm v1.25.4/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls= gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= gorm.io/plugin/prometheus v0.0.0-20230504115745-1aec2356381b h1:uHPZdwwf4+AVvAEgZ/LQR1UTub8LJ2nh0wQDW3Dt4jE= @@ -2364,8 +2339,6 @@ k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdiUkI+v/ImEGAvu3WatcZl3lPMR4Rk= k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= -k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ= -k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go index 04767d649..0ac975abb 100644 --- a/pkg/jobs/canary/canary_jobs.go +++ b/pkg/jobs/canary/canary_jobs.go @@ -52,13 +52,11 @@ func StartScanCanaryConfigs(dataFile string, configFiles []string) { } type CanaryJob struct { - *kommons.Client - dutyjob.JobRuntime - Kubernetes kubernetes.Interface - Canary v1.Canary - DBCanary pkg.Canary - LogPass bool - LogFail bool + dutyjob.Job + Canary v1.Canary + DBCanary pkg.Canary + LogPass bool + LogFail bool } func (job CanaryJob) GetNamespacedName() types.NamespacedName { @@ -68,25 +66,12 @@ func (job CanaryJob) GetNamespacedName() types.NamespacedName { var minimumTimeBetweenCanaryRuns = 10 * time.Second var canaryLastRuntimes = sync.Map{} -func SyncCanary(ctx dutyjob.JobRuntime, args ...any) error { - if len(args) != 2 { - return fmt.Errorf("wrong arg count for SyncCanary: %d", len(args)) - } - dbCanary, ok := args[0].(pkg.Canary) - if !ok { - return fmt.Errorf("wrong arg type for dbCanary: %T", args[0]) - } - - canary, ok := args[1].(v1.Canary) - if !ok { - return fmt.Errorf("wrong arg type for canary: %T", args[1]) - } - - if runner.IsCanaryIgnored(&canary.ObjectMeta) { +func (j CanaryJob) Run(ctx dutyjob.JobRuntime) error { + if runner.IsCanaryIgnored(&j.Canary.ObjectMeta) { return nil } - canaryID := dbCanary.ID.String() + canaryID := j.DBCanary.ID.String() val, _ := concurrentJobLocks.LoadOrStore(canaryID, &sync.Mutex{}) lock, ok := val.(*sync.Mutex) if !ok { @@ -110,7 +95,7 @@ func SyncCanary(ctx dutyjob.JobRuntime, args ...any) error { // Skip run if job ran too recently if lastRunDelta < minimumTimeBetweenCanaryRuns { - logger.Infof("Skipping Canary[%s]:%s since it last ran %.2f seconds ago", canaryID, canary.GetNamespacedName(), lastRunDelta.Seconds()) + logger.Infof("Skipping Canary[%s]:%s since it last ran %.2f seconds ago", canaryID, j.Canary.GetNamespacedName(), lastRunDelta.Seconds()) return nil } @@ -120,7 +105,7 @@ func SyncCanary(ctx dutyjob.JobRuntime, args ...any) error { // Transformed checks have a delete strategy // On deletion they can either be marked healthy, unhealthy or left as is checkIDDeleteStrategyMap := make(map[string]string) - canaryCtx := canarycontext.New(ctx.Kommons(), ctx.Kubernetes(), ctx.DB(), ctx.Pool(), canary) + canaryCtx := canarycontext.New(ctx.Kommons(), ctx.Kubernetes(), ctx.DB(), ctx.Pool(), j.Canary) results, err := checks.RunChecks(canaryCtx) if err != nil { logger.Errorf("error running checks for canary %s: %v", canaryID, err) @@ -128,8 +113,8 @@ func SyncCanary(ctx dutyjob.JobRuntime, args ...any) error { } // TODO: Use ctx with object here - logPass := canary.IsTrace() || canary.IsDebug() || LogPass - logFail := canary.IsTrace() || canary.IsDebug() || LogFail + logPass := j.Canary.IsTrace() || j.Canary.IsDebug() || LogPass + logFail := j.Canary.IsTrace() || j.Canary.IsDebug() || LogFail for _, result := range results { if logPass && result.Pass || logFail && !result.Pass { logger.Infof(result.String()) @@ -140,7 +125,7 @@ func SyncCanary(ctx dutyjob.JobRuntime, args ...any) error { checkIDDeleteStrategyMap[checkID] = result.Check.GetTransformDeleteStrategy() } } - updateCanaryStatusAndEvent(canary, results) + updateCanaryStatusAndEvent(j.Canary, results) checkDeleteStrategyGroup := make(map[string][]string) checksToRemove := utils.SetDifference(existingTransformedChecks, transformedChecksCreated) @@ -171,10 +156,6 @@ func SyncCanary(ctx dutyjob.JobRuntime, args ...any) error { return nil } -func (job *CanaryJob) NewContext() *canarycontext.Context { - return canarycontext.New(job.Client, job.Kubernetes, db.Gorm, db.Pool, job.Canary) -} - func updateCanaryStatusAndEvent(canary v1.Canary, results []*pkg.CheckResult) { if CanaryStatusChannel == nil { return @@ -198,9 +179,10 @@ func updateCanaryStatusAndEvent(canary v1.Canary, results []*pkg.CheckResult) { // Set uptime and latency uptime, latency := metrics.Record(canary, result) checkKey := canary.GetKey(result.Check) - checkStatus[checkKey] = &v1.CheckStatus{} - checkStatus[checkKey].Uptime1H = uptime.String() - checkStatus[checkKey].Latency1H = latency.String() + checkStatus[checkKey] = &v1.CheckStatus{ + Uptime1H: uptime.String(), + Latency1H: latency.String(), + } // Increment aggregate uptime uptimeAgg.Passed += uptime.Passed @@ -227,6 +209,7 @@ func updateCanaryStatusAndEvent(canary v1.Canary, results []*pkg.CheckResult) { lastTransitionedTime = &metav1.Time{Time: time.Now()} } + // TODO Why is this here ? push.Queue(pkg.FromV1(canary, result.Check), pkg.FromResult(*result)) // Update status message @@ -341,7 +324,16 @@ func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error { } updateTime, exists := canaryUpdateTimeCache.Load(dbCanary.ID.String()) - newJob := dutyjob.NewJob(ctx, "SyncCanaryJob", canary.Spec.GetSchedule(), SyncCanary, dbCanary, canary).SetID(dbCanary.ID.String()) + newJob := CanaryJob{ + Job: dutyjob.Job{ + Context: ctx, + Name: "SyncCanaryJob", + Schedule: canary.Spec.GetSchedule(), + ID: dbCanary.ID.String(), + }, + Canary: *canary, + DBCanary: dbCanary, + } entry := findCronEntry(dbCanary.ID.String()) if !exists || dbCanary.UpdatedAt.After(updateTime.(time.Time)) || entry == nil { // Remove entry if it exists @@ -368,7 +360,7 @@ func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error { return nil } -func SyncCanaryJobs(ctx dutyjob.JobRuntime, _ ...any) error { +func SyncCanaryJobs(ctx dutyjob.JobRuntime) error { ctx.Debugf("Syncing canary jobs") canaries, err := db.GetAllCanariesForSync(ctx.Context, runner.WatchNamespace) From fb4d21e36c04bc97e163fb7109d3f89dfe4fdc6e Mon Sep 17 00:00:00 2001 From: Yash Mehrotra Date: Wed, 25 Oct 2023 11:48:06 +0530 Subject: [PATCH 3/5] chore: fix test --- pkg/jobs/canary/canary_jobs_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/jobs/canary/canary_jobs_test.go b/pkg/jobs/canary/canary_jobs_test.go index d32f1fbfd..6020d0f24 100644 --- a/pkg/jobs/canary/canary_jobs_test.go +++ b/pkg/jobs/canary/canary_jobs_test.go @@ -1,6 +1,7 @@ package canary import ( + gocontext "context" "encoding/json" "fmt" "net/http" @@ -8,6 +9,8 @@ import ( v1 "github.com/flanksource/canary-checker/api/v1" "github.com/flanksource/canary-checker/pkg/db" + "github.com/flanksource/duty/context" + "github.com/flanksource/duty/job" "github.com/flanksource/duty/models" "github.com/flanksource/duty/types" "github.com/google/uuid" @@ -26,6 +29,8 @@ var _ = ginkgo.Describe("Test Sync Canary Job", ginkgo.Ordered, func() { }, } + ctx := context.NewContext(gocontext.Background()).WithDB(db.Gorm, db.Pool) + ginkgo.It("should save a canary spec", func() { b, err := json.Marshal(canarySpec) Expect(err).To(BeNil()) @@ -42,7 +47,7 @@ var _ = ginkgo.Describe("Test Sync Canary Job", ginkgo.Ordered, func() { err = db.Gorm.Create(canaryM).Error Expect(err).To(BeNil()) - response, err := db.GetAllCanariesForSync("") + response, err := db.GetAllCanariesForSync(ctx, "") Expect(err).To(BeNil()) Expect(len(response)).To(Equal(1)) }) @@ -50,7 +55,10 @@ var _ = ginkgo.Describe("Test Sync Canary Job", ginkgo.Ordered, func() { ginkgo.It("schedule the canary job", func() { CanaryScheduler.Start() minimumTimeBetweenCanaryRuns = 0 // reset this for now so it doesn't hinder test with small schedules - SyncCanaryJobs() + jobCtx := job.JobRuntime{ + Context: ctx, + } + SyncCanaryJobs(jobCtx) }) ginkgo.It("should verify that the endpoint wasn't called more than once after 3 seconds", func() { From e7d59ff7afe8a045e47276bae72c8a5c39b6e58d Mon Sep 17 00:00:00 2001 From: Yash Mehrotra Date: Wed, 25 Oct 2023 16:24:00 +0530 Subject: [PATCH 4/5] chore: add telemetry package --- cmd/root.go | 27 +++++++++---- go.mod | 6 +++ go.sum | 12 ++++++ pkg/jobs/canary/canary_jobs.go | 38 +++++++++++-------- pkg/jobs/canary/canary_jobs_test.go | 7 ++-- pkg/telemetry/tracer.go | 59 +++++++++++++++++++++++++++++ 6 files changed, 123 insertions(+), 26 deletions(-) create mode 100644 pkg/telemetry/tracer.go diff --git a/cmd/root.go b/cmd/root.go index 4805b3b1a..93596971e 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -8,6 +8,7 @@ import ( "github.com/flanksource/canary-checker/pkg/db" "github.com/flanksource/canary-checker/pkg/jobs/canary" "github.com/flanksource/canary-checker/pkg/runner" + "github.com/flanksource/canary-checker/pkg/telemetry" "github.com/flanksource/commons/logger" gomplate "github.com/flanksource/gomplate/v3" "github.com/spf13/cobra" @@ -31,16 +32,25 @@ var Root = &cobra.Command{ if canary.UpstreamConf.Valid() { logger.Infof("Pushing checks to %s with name=%s user=%s", canary.UpstreamConf.Host, canary.UpstreamConf.AgentName, canary.UpstreamConf.Username) } + + if otelcollectorURL != "" { + telemetry.InitTracer(otelServiceName, otelcollectorURL, true) + } }, } -var httpPort = 8080 -var publicEndpoint = "http://localhost:8080" -var prometheusURL string -var pushServers, pullServers []string -var sharedLibrary []string -var exposeEnv bool -var logPass, logFail bool +var ( + httpPort = 8080 + publicEndpoint = "http://localhost:8080" + prometheusURL string + pushServers, pullServers []string + sharedLibrary []string + exposeEnv bool + logPass, logFail bool + + otelcollectorURL string + otelServiceName string +) func ServerFlags(flags *pflag.FlagSet) { flags.IntVar(&httpPort, "httpPort", httpPort, "Port to expose a health dashboard ") @@ -76,6 +86,9 @@ func ServerFlags(flags *pflag.FlagSet) { flags.StringVar(&canary.UpstreamConf.Password, "upstream-password", os.Getenv("UPSTREAM_PASSWORD"), "upstream password") flags.StringVar(&canary.UpstreamConf.AgentName, "agent-name", os.Getenv("UPSTREAM_NAME"), "name of this agent") flags.BoolVar(&canary.UpstreamConf.InsecureSkipVerify, "upstream-insecure-skip-verify", os.Getenv("UPSTREAM_INSECURE_SKIP_VERIFY") == "true", "Skip TLS verification on the upstream servers certificate") + + flags.StringVar(&otelcollectorURL, "otel-collector-url", "", "OpenTelemetry gRPC Collector URL in host:port format") + flags.StringVar(&otelServiceName, "otel-service-name", "canary-checker", "OpenTelemetry service name for the resource") } func readFromEnv(v string) string { diff --git a/go.mod b/go.mod index d9251faa5..9c8e74e1f 100644 --- a/go.mod +++ b/go.mod @@ -112,6 +112,7 @@ require ( github.com/aws/smithy-go v1.14.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect + github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect @@ -161,6 +162,7 @@ require ( github.com/gosimple/slug v1.13.1 // indirect github.com/gosimple/unidecode v1.0.1 // indirect github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/hairyhenderson/toml v0.4.2-0.20210923231440-40456b8e66cf // indirect github.com/hairyhenderson/yaml v0.0.0-20220618171115-2d35fca545ce // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect @@ -232,8 +234,12 @@ require ( github.com/yuin/gopher-lua v1.1.0 // indirect github.com/zclconf/go-cty v1.14.1 // indirect go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect go.opentelemetry.io/otel/metric v1.19.0 // indirect + go.opentelemetry.io/otel/sdk v1.19.0 // indirect go.opentelemetry.io/otel/trace v1.19.0 // indirect + go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.starlark.net v0.0.0-20230925163745-10651d5192ab // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect diff --git a/go.sum b/go.sum index ff9a2e3aa..3458e0866 100644 --- a/go.sum +++ b/go.sum @@ -737,6 +737,8 @@ github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b h1:6+ZFm0flnudZzdSE0JxlhR2hKnGPcNB35BjQf4RYQDY= github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= @@ -1108,9 +1110,12 @@ github.com/gosimple/unidecode v1.0.1 h1:hZzFTMMqSswvf0LBJZCZgThIZrpDHFXux9KeGmn6 github.com/gosimple/unidecode v1.0.1/go.mod h1:CP0Cr1Y1kogOtx0bJblKzsVWrqYaqfNOnHzpgWw4Awc= github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA= github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= +github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/hairyhenderson/toml v0.4.2-0.20210923231440-40456b8e66cf h1:I1sbT4ZbIt9i+hB1zfKw2mE8C12TuGxPiW7YmtLbPa4= github.com/hairyhenderson/toml v0.4.2-0.20210923231440-40456b8e66cf/go.mod h1:jDHmWDKZY6MIIYltYYfW4Rs7hQ50oS4qf/6spSiZAxY= github.com/hairyhenderson/yaml v0.0.0-20220618171115-2d35fca545ce h1:cVkYhlWAxwuS2/Yp6qPtcl0fGpcWxuZNonywHZ6/I+s= @@ -1527,14 +1532,21 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 h1:3d+S281UTjM+AbF31XSOYn1qXn3BgIdWl8HNEpx08Jk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0/go.mod h1:0+KuTDyKL4gjKCF75pHOX4wuzYDUZYfAQdSu43o+Z2I= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 h1:Nw7Dv4lwvGrI68+wULbcq7su9K2cebeCUrDjVrUJHxM= go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o= +go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A= go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= +go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= +go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.starlark.net v0.0.0-20230925163745-10651d5192ab h1:7QkXlIVjYdSsKKSGnM0jQdw/2w9W5qcFDGTc00zKqgI= go.starlark.net v0.0.0-20230925163745-10651d5192ab/go.mod h1:LcLNIzVOMp4oV+uusnpk+VU+SzXaJakUuBjoCSWH5dM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go index 0ac975abb..0adb8f37c 100644 --- a/pkg/jobs/canary/canary_jobs.go +++ b/pkg/jobs/canary/canary_jobs.go @@ -1,6 +1,7 @@ package canary import ( + gocontext "context" "fmt" "path" "sync" @@ -21,7 +22,10 @@ import ( dutyjob "github.com/flanksource/duty/job" "github.com/flanksource/duty/models" "github.com/flanksource/kommons" + "go.opentelemetry.io/otel/trace" + "github.com/robfig/cron/v3" + "go.opentelemetry.io/otel/attribute" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" @@ -52,21 +56,21 @@ func StartScanCanaryConfigs(dataFile string, configFiles []string) { } type CanaryJob struct { - dutyjob.Job Canary v1.Canary DBCanary pkg.Canary LogPass bool LogFail bool } -func (job CanaryJob) GetNamespacedName() types.NamespacedName { - return types.NamespacedName{Name: job.Canary.Name, Namespace: job.Canary.Namespace} +func (j CanaryJob) GetNamespacedName() types.NamespacedName { + return types.NamespacedName{Name: j.Canary.Name, Namespace: j.Canary.Namespace} } var minimumTimeBetweenCanaryRuns = 10 * time.Second var canaryLastRuntimes = sync.Map{} func (j CanaryJob) Run(ctx dutyjob.JobRuntime) error { + ctx.GetSpan().SetAttributes(attribute.String("canary-id", j.DBCanary.ID.String())) if runner.IsCanaryIgnored(&j.Canary.ObjectMeta) { return nil } @@ -99,18 +103,22 @@ func (j CanaryJob) Run(ctx dutyjob.JobRuntime) error { return nil } - // Get transformed checks before and after, and then delete the olds ones that are not in new set - existingTransformedChecks, _ := db.GetTransformedCheckIDs(ctx.Context, canaryID) - var transformedChecksCreated []string - // Transformed checks have a delete strategy - // On deletion they can either be marked healthy, unhealthy or left as is - checkIDDeleteStrategyMap := make(map[string]string) canaryCtx := canarycontext.New(ctx.Kommons(), ctx.Kubernetes(), ctx.DB(), ctx.Pool(), j.Canary) + var span trace.Span + ctx.Context, span = ctx.StartSpan("RunCanaryChecks") results, err := checks.RunChecks(canaryCtx) if err != nil { logger.Errorf("error running checks for canary %s: %v", canaryID, err) return nil } + span.End() + + // Get transformed checks before and after, and then delete the olds ones that are not in new set + existingTransformedChecks, _ := db.GetTransformedCheckIDs(ctx.Context, canaryID) + var transformedChecksCreated []string + // Transformed checks have a delete strategy + // On deletion they can either be marked healthy, unhealthy or left as is + checkIDDeleteStrategyMap := make(map[string]string) // TODO: Use ctx with object here logPass := j.Canary.IsTrace() || j.Canary.IsDebug() || LogPass @@ -324,16 +332,14 @@ func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error { } updateTime, exists := canaryUpdateTimeCache.Load(dbCanary.ID.String()) - newJob := CanaryJob{ - Job: dutyjob.Job{ - Context: ctx, - Name: "SyncCanaryJob", - Schedule: canary.Spec.GetSchedule(), - ID: dbCanary.ID.String(), - }, + cj := CanaryJob{ Canary: *canary, DBCanary: dbCanary, } + + // Create new job context from empty context to create root spans for jobs + jobCtx := ctx.Wrap(gocontext.Background()).WithObject(canary.ObjectMeta) + newJob := dutyjob.NewJob(jobCtx, "SyncCanaryJob", canary.Spec.GetSchedule(), cj.Run).SetID(dbCanary.ID.String()) entry := findCronEntry(dbCanary.ID.String()) if !exists || dbCanary.UpdatedAt.After(updateTime.(time.Time)) || entry == nil { // Remove entry if it exists diff --git a/pkg/jobs/canary/canary_jobs_test.go b/pkg/jobs/canary/canary_jobs_test.go index 6020d0f24..e90277c55 100644 --- a/pkg/jobs/canary/canary_jobs_test.go +++ b/pkg/jobs/canary/canary_jobs_test.go @@ -29,8 +29,6 @@ var _ = ginkgo.Describe("Test Sync Canary Job", ginkgo.Ordered, func() { }, } - ctx := context.NewContext(gocontext.Background()).WithDB(db.Gorm, db.Pool) - ginkgo.It("should save a canary spec", func() { b, err := json.Marshal(canarySpec) Expect(err).To(BeNil()) @@ -47,6 +45,7 @@ var _ = ginkgo.Describe("Test Sync Canary Job", ginkgo.Ordered, func() { err = db.Gorm.Create(canaryM).Error Expect(err).To(BeNil()) + ctx := context.NewContext(gocontext.Background()).WithDB(db.Gorm, db.Pool) response, err := db.GetAllCanariesForSync(ctx, "") Expect(err).To(BeNil()) Expect(len(response)).To(Equal(1)) @@ -55,10 +54,12 @@ var _ = ginkgo.Describe("Test Sync Canary Job", ginkgo.Ordered, func() { ginkgo.It("schedule the canary job", func() { CanaryScheduler.Start() minimumTimeBetweenCanaryRuns = 0 // reset this for now so it doesn't hinder test with small schedules + ctx := context.NewContext(gocontext.Background()).WithDB(db.Gorm, db.Pool) jobCtx := job.JobRuntime{ Context: ctx, } - SyncCanaryJobs(jobCtx) + err := SyncCanaryJobs(jobCtx) + Expect(err).To(BeNil()) }) ginkgo.It("should verify that the endpoint wasn't called more than once after 3 seconds", func() { diff --git a/pkg/telemetry/tracer.go b/pkg/telemetry/tracer.go new file mode 100644 index 000000000..962bda4e6 --- /dev/null +++ b/pkg/telemetry/tracer.go @@ -0,0 +1,59 @@ +package telemetry + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "google.golang.org/grpc/credentials" + + "github.com/flanksource/commons/logger" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +func InitTracer(serviceName, collectorURL string, insecure bool) func(context.Context) error { + var secureOption otlptracegrpc.Option + if !insecure { + secureOption = otlptracegrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(nil, "")) + } else { + secureOption = otlptracegrpc.WithInsecure() + } + + exporter, err := otlptrace.New( + context.Background(), + otlptracegrpc.NewClient( + secureOption, + otlptracegrpc.WithEndpoint(collectorURL), + ), + ) + + if err != nil { + logger.Fatalf("Failed to create exporter: %v", err) + } + resources, err := resource.New( + context.Background(), + resource.WithAttributes( + attribute.String("service.name", serviceName), + ), + ) + if err != nil { + logger.Fatalf("Could not set resources: %v", err) + } + + otel.SetTracerProvider( + sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(resources), + ), + ) + + // Register the TraceContext propagator globally. + otel.SetTextMapPropagator(propagation.TraceContext{}) + + return exporter.Shutdown +} From 374743a646f2924bcc5d2a181dc9a4cca8345845 Mon Sep 17 00:00:00 2001 From: Yash Mehrotra Date: Mon, 30 Oct 2023 10:58:18 +0530 Subject: [PATCH 5/5] chore: use duty context for find deleted checks --- pkg/db/canary.go | 5 ++--- pkg/jobs/canary/prometheus_cleanup.go | 6 ++---- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/db/canary.go b/pkg/db/canary.go index c98ce72c2..167291ac2 100644 --- a/pkg/db/canary.go +++ b/pkg/db/canary.go @@ -1,7 +1,6 @@ package db import ( - gocontext "context" "encoding/json" "errors" "fmt" @@ -279,9 +278,9 @@ func FindCheck(canary pkg.Canary, name string) (*pkg.Check, error) { return &model, nil } -func FindDeletedChecksSince(ctx gocontext.Context, since time.Time) ([]string, error) { +func FindDeletedChecksSince(ctx context.Context, since time.Time) ([]string, error) { var ids []string - err := Gorm.Model(&models.Check{}).Where("deleted_at > ?", since).Pluck("id", &ids).Error + err := ctx.DB().Model(&models.Check{}).Where("deleted_at > ?", since).Pluck("id", &ids).Error return ids, err } diff --git a/pkg/jobs/canary/prometheus_cleanup.go b/pkg/jobs/canary/prometheus_cleanup.go index b8ae80593..8c0db729b 100644 --- a/pkg/jobs/canary/prometheus_cleanup.go +++ b/pkg/jobs/canary/prometheus_cleanup.go @@ -1,9 +1,9 @@ package canary import ( - "context" "time" + "github.com/flanksource/canary-checker/api/context" "github.com/flanksource/canary-checker/pkg/db" "github.com/flanksource/canary-checker/pkg/metrics" "github.com/flanksource/commons/logger" @@ -12,10 +12,8 @@ import ( // CleanupMetricsGauges removes gauges for checks that no longer exist. func CleanupMetricsGauges() { - ctx := context.Background() - sevenDaysAgo := time.Now().Add(-time.Hour * 24 * 7) - deletedCheckIDs, err := db.FindDeletedChecksSince(ctx, sevenDaysAgo) + deletedCheckIDs, err := db.FindDeletedChecksSince(context.DefaultContext, sevenDaysAgo) if err != nil { logger.Errorf("Error finding deleted checks: %v", err) return