From 71bfa3432a4d645ad3991a0a9b3dd2dbfc6e6008 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Mon, 11 Dec 2023 17:09:07 +0100 Subject: [PATCH] Allow intervals to be configured (#132) --- omniwitness/omniwitness.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/omniwitness/omniwitness.go b/omniwitness/omniwitness.go index 4ee51bc..30b1212 100644 --- a/omniwitness/omniwitness.go +++ b/omniwitness/omniwitness.go @@ -62,8 +62,8 @@ type LogStateWriteOps = persistence.LogStateWriteOps const ( // Interval between attempts to feed checkpoints // TODO(mhutchinson): Make this configurable - feedInterval = 5 * time.Minute - distributeInterval = 5 * time.Minute + defaultFeedInterval = 1 * time.Minute + defaultDistributeInterval = 1 * time.Minute ) // OperatorConfig allows the bare minimum operator-specific configuration. @@ -75,6 +75,9 @@ type OperatorConfig struct { // to a distributor that takes witnessed checkpoints via a PUT request. // TODO(mhutchinson): This should be baked into the code when there is a public distributor. RestDistributorBaseURL string + + FeedInterval time.Duration + DistributeInterval time.Duration } // logFeeder is the de-facto interface that feeders implement. @@ -124,6 +127,13 @@ func Main(ctx context.Context, operatorConfig OperatorConfig, p LogStatePersiste return fmt.Errorf("failed to create witness: %v", err) } + if operatorConfig.FeedInterval == 0 { + operatorConfig.FeedInterval = defaultFeedInterval + } + if operatorConfig.DistributeInterval == 0 { + operatorConfig.DistributeInterval = defaultDistributeInterval + } + bw := witnessAdapter{ w: witness, } @@ -133,7 +143,7 @@ func Main(ctx context.Context, operatorConfig OperatorConfig, p LogStatePersiste g.Go(func() error { glog.Infof("Feeder %q goroutine started", c.Origin) defer glog.Infof("Feeder %q goroutine done", c.Origin) - return f(ctx, c, bw, httpClient, feedInterval) + return f(ctx, c, bw, httpClient, operatorConfig.FeedInterval) }) } @@ -143,7 +153,7 @@ func Main(ctx context.Context, operatorConfig OperatorConfig, p LogStatePersiste for l := range feeders { logs = append(logs, l) } - runRestDistributors(ctx, g, httpClient, logs, operatorConfig.RestDistributorBaseURL, bw, signerCosigV1.Verifier()) + runRestDistributors(ctx, g, httpClient, operatorConfig.DistributeInterval, logs, operatorConfig.RestDistributorBaseURL, bw, signerCosigV1.Verifier()) } r := mux.NewRouter() @@ -168,7 +178,7 @@ func Main(ctx context.Context, operatorConfig OperatorConfig, p LogStatePersiste return g.Wait() } -func runRestDistributors(ctx context.Context, g *errgroup.Group, httpClient *http.Client, logs []config.Log, distributorBaseURL string, bw witnessAdapter, witnessV note.Verifier) { +func runRestDistributors(ctx context.Context, g *errgroup.Group, httpClient *http.Client, interval time.Duration, logs []config.Log, distributorBaseURL string, bw witnessAdapter, witnessV note.Verifier) { g.Go(func() error { d, err := rest.NewDistributor(distributorBaseURL, httpClient, logs, witnessV, bw) if err != nil { @@ -179,7 +189,7 @@ func runRestDistributors(ctx context.Context, g *errgroup.Group, httpClient *htt } for { select { - case <-time.After(distributeInterval): + case <-time.After(interval): case <-ctx.Done(): return ctx.Err() }