Skip to content

Commit

Permalink
Update usage scheduler according to FF (#19655)
Browse files Browse the repository at this point in the history
* Update usage scheduler according to FF

* Update dashboard

* Try validate duration first

* Empty fallback to installer config

* undefined fallback to default

* fixup

* Update go mod

* Fix test

* 1m

* Add test cases
  • Loading branch information
mustard-mh authored Apr 29, 2024
1 parent 078eccc commit 9382d33
Show file tree
Hide file tree
Showing 9 changed files with 429 additions and 43 deletions.
1 change: 1 addition & 0 deletions components/dashboard/src/data/featureflag-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const featureFlags = {
// Logging tracing for added for investigate hanging issue
dashboard_logging_tracing: false,
showBrowserExtensionPromotion: false,
usage_update_scheduler_duration: "15m",
};

type FeatureFlags = typeof featureFlags;
Expand Down
4 changes: 2 additions & 2 deletions components/dashboard/src/data/usage/usage-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ export function useListUsage(request: ListUsageRequest) {
return getGitpodService().server.listUsage(request);
},
{
cacheTime: 1000 * 60 * 10, // 10 minutes
staleTime: 1000 * 60 * 10, // 10 minutes
cacheTime: 1000 * 60 * 1, // 1 minutes
staleTime: 1000 * 60 * 1, // 1 minutes
retry: false,
},
);
Expand Down
27 changes: 26 additions & 1 deletion components/dashboard/src/usage/UsageView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,20 @@ import classNames from "classnames";
import { UsageDateFilters } from "./UsageDateFilters";
import { DownloadUsage } from "./download/DownloadUsage";
import { useQueryParams } from "../hooks/use-query-params";
import { useFeatureFlag } from "../data/featureflag-query";

const DATE_PARAM_FORMAT = "YYYY-MM-DD";

interface UsageViewProps {
attributionId: AttributionId;
}

const durationUnitMap: Record<string, string | undefined> = {
s: "seconds",
m: "minutes",
h: "hours",
};

export const UsageView: FC<UsageViewProps> = ({ attributionId }) => {
const location = useLocation();
const history = useHistory();
Expand Down Expand Up @@ -88,9 +96,26 @@ export const UsageView: FC<UsageViewProps> = ({ attributionId }) => {

const usageEntries = usagePage.data?.usageEntriesList || [];

const schedulerDuration = useFeatureFlag("usage_update_scheduler_duration");

const readableSchedulerDuration = useMemo(() => {
const duration = schedulerDuration.toString().toLowerCase();
if (duration === "undefined") {
return "15 minutes";
}
const unit = duration.slice(-1);
const unitStr = durationUnitMap[unit];
if (!unitStr) {
console.error("failed to parse duration", duration);
return "15 minutes";
}
const value = parseInt(duration.slice(0, -1), 10);
return `${value} ${unitStr}`;
}, [schedulerDuration]);

return (
<>
<Header title="Usage" subtitle="Organization usage, updated every 15 minutes." />
<Header title="Usage" subtitle={"Organization usage, updated every " + readableSchedulerDuration + "."} />
<div className="app-container pt-5">
<div
className={classNames(
Expand Down
2 changes: 2 additions & 0 deletions components/usage/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ require (
require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/configcat/go-sdk/v7 v7.6.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/gitpod-io/gitpod/components/scrubber v0.0.0-00010101000000-000000000000 // indirect
Expand Down
8 changes: 8 additions & 0 deletions components/usage/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

173 changes: 134 additions & 39 deletions components/usage/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
package server

import (
"context"
"fmt"
"net/http"
"sync"
"time"

"github.com/gitpod-io/gitpod/usage/pkg/scheduler"
Expand All @@ -17,6 +19,7 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/gitpod-io/gitpod/common-go/baseserver"
"github.com/gitpod-io/gitpod/common-go/experiments"
"github.com/gitpod-io/gitpod/common-go/log"
db "github.com/gitpod-io/gitpod/components/gitpod-db/go"
"github.com/gitpod-io/gitpod/components/public-api/go/experimental/v1/v1connect"
Expand Down Expand Up @@ -55,6 +58,8 @@ type Config struct {

// Where to find the gRPC/Connect APIs on the server component
ServerAddress string `json:"serverAddress"`

GitpodHost string `json:"gitpodHost"`
}

type RedisConfiguration struct {
Expand Down Expand Up @@ -136,66 +141,156 @@ func Start(cfg Config, version string) error {
return v1.NewUsageServiceClient(selfConnection), v1.NewBillingServiceClient(selfConnection), nil
}

var schedulerJobSpecs []scheduler.JobSpec
if cfg.LedgerSchedule != "" {
// we do not run the controller if there is no schedule defined.
schedule, err := time.ParseDuration(cfg.LedgerSchedule)
if err != nil {
return fmt.Errorf("failed to parse schedule duration: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

jobSpec, err := scheduler.NewLedgerTriggerJob(schedule,
scheduler.NewLedgerTrigger(jobClientsConstructor),
)
if err != nil {
return fmt.Errorf("failed to setup ledger trigger job: %w", err)
}
exps := experiments.NewClient(experiments.WithPollInterval(1 * time.Minute))
startScheduler(ctx, exps, cfg, redsyncPool, jobClientsConstructor)

schedulerJobSpecs = append(schedulerJobSpecs, jobSpec)
err = registerGRPCServices(srv, conn, stripeClient, pricer, cfg)
if err != nil {
return fmt.Errorf("failed to register gRPC services: %w", err)
}

} else {
log.Info("No controller schedule specified, controller will be disabled.")
scheduler.RegisterMetrics(srv.MetricsRegistry())

err = stripe.RegisterMetrics(srv.MetricsRegistry())
if err != nil {
return fmt.Errorf("failed to register stripe metrics: %w", err)
}

if cfg.ResetUsageSchedule != "" {
schedule, err := time.ParseDuration(cfg.ResetUsageSchedule)
if err != nil {
return fmt.Errorf("failed to parse reset usage schedule as duration: %w", err)
err = srv.ListenAndServe()
if err != nil {
return fmt.Errorf("failed to listen and serve: %w", err)
}

return nil
}

func startScheduler(ctx context.Context, exps experiments.Client, cfg Config, redsyncPool *redsync.Redsync, jobClientsConstructor scheduler.ClientsConstructor) {
getLedgerSchedule := func() string {
schedule := exps.GetStringValue(ctx, "usage_update_scheduler_duration", cfg.LedgerSchedule, experiments.Attributes{
GitpodHost: cfg.GitpodHost,
})
if schedule == "undefined" {
schedule = cfg.LedgerSchedule
}
return schedule
}
ledgerSchedule := getLedgerSchedule()

var (
sch *scheduler.Scheduler
lock sync.Mutex // Lock sch and ledgerSchedule update
)

spec, err := scheduler.NewResetUsageJob(schedule, jobClientsConstructor)
start := func(ledgerDuration string) {
scheduler, err := createScheduler(redsyncPool, jobClientsConstructor, ledgerDuration, cfg.ResetUsageSchedule)
if err != nil {
return fmt.Errorf("failed to setup reset usage job: %w", err)
log.WithError(err).Error("failed to create schedulers: %w", err)
return
}

schedulerJobSpecs = append(schedulerJobSpecs, spec)
lock.Lock()
defer lock.Unlock()

if sch != nil {
sch.Stop()
}
ledgerSchedule = ledgerDuration
sch = scheduler
scheduler.Start()
}

sched := scheduler.New(redsyncPool, schedulerJobSpecs...)
sched.Start()
defer sched.Stop()
start(ledgerSchedule)

// periodically check if the schedule FF has changed
go func() {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
lock.Lock()
if sch != nil {
sch.Stop()
}
lock.Unlock()
return
case <-ticker.C:
newLedgerSchedule := getLedgerSchedule()
if ledgerSchedule == newLedgerSchedule {
continue
}
if _, err := time.ParseDuration(newLedgerSchedule); err != nil {
log.WithError(err).Warn("invalid duration")
} else {
log.WithField("before", ledgerSchedule).WithField("now", newLedgerSchedule).Info("restarting scheduler")
start(newLedgerSchedule)
}
}
}
}()
}

err = registerGRPCServices(srv, conn, stripeClient, pricer, cfg)
if err != nil {
return fmt.Errorf("failed to register gRPC services: %w", err)
func createScheduler(redsyncPool *redsync.Redsync, jobClientsConstructor scheduler.ClientsConstructor, ledgerSchedule, resetUsageSchedule string) (*scheduler.Scheduler, error) {
var schedulerJobSpecs []scheduler.JobSpec
appendLedgerJob := func() error {
if ledgerSchedule != "" {
// we do not run the controller if there is no schedule defined.
schedule, err := time.ParseDuration(ledgerSchedule)
if err != nil {
return fmt.Errorf("failed to parse schedule duration: %w", err)
}

jobSpec, err := scheduler.NewLedgerTriggerJob(schedule,
scheduler.NewLedgerTrigger(jobClientsConstructor),
)
if err != nil {
return fmt.Errorf("failed to setup ledger trigger job: %w", err)
}

schedulerJobSpecs = append(schedulerJobSpecs, jobSpec)
} else {
log.Info("No controller schedule specified, controller will be disabled.")
}
return nil
}

err = scheduler.RegisterMetrics(srv.MetricsRegistry())
if err != nil {
return fmt.Errorf("failed to register controller metrics: %w", err)
appendResetUsageJob := func() error {
if resetUsageSchedule != "" {
schedule, err := time.ParseDuration(resetUsageSchedule)
if err != nil {
return fmt.Errorf("failed to parse reset usage schedule as duration: %w", err)
}

spec, err := scheduler.NewResetUsageJob(schedule, jobClientsConstructor)
if err != nil {
return fmt.Errorf("failed to setup reset usage job: %w", err)
}

schedulerJobSpecs = append(schedulerJobSpecs, spec)
} else {
log.Info("No resetUsage schedule specified, controller will be disabled.")
}
return nil
}

err = stripe.RegisterMetrics(srv.MetricsRegistry())
if err != nil {
return fmt.Errorf("failed to register stripe metrics: %w", err)
if err := appendLedgerJob(); err != nil {
log.WithError(err).Error("failed to append ledger job")
}
if err := appendResetUsageJob(); err != nil {
log.WithError(err).Error("failed to append reset usage job")
}

err = srv.ListenAndServe()
if err != nil {
return fmt.Errorf("failed to listen and serve: %w", err)
if len(schedulerJobSpecs) == 0 {
return nil, fmt.Errorf("no jobs to schedule")
}

return nil
sched := scheduler.New(redsyncPool, schedulerJobSpecs...)
sched.Start()
return sched, nil
}

func registerGRPCServices(srv *baseserver.Server, conn *gorm.DB, stripeClient *stripe.Client, pricer *apiv1.WorkspacePricer, cfg Config) error {
Expand Down
Loading

0 comments on commit 9382d33

Please sign in to comment.