Skip to content

Commit

Permalink
Allow intervals to be configured (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter authored Dec 11, 2023
1 parent c8d8e58 commit 71bfa34
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions omniwitness/omniwitness.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ type LogStateWriteOps = persistence.LogStateWriteOps
const (
// Interval between attempts to feed checkpoints
// TODO(mhutchinson): Make this configurable
feedInterval = 5 * time.Minute
distributeInterval = 5 * time.Minute
defaultFeedInterval = 1 * time.Minute
defaultDistributeInterval = 1 * time.Minute
)

// OperatorConfig allows the bare minimum operator-specific configuration.
Expand All @@ -75,6 +75,9 @@ type OperatorConfig struct {
// to a distributor that takes witnessed checkpoints via a PUT request.
// TODO(mhutchinson): This should be baked into the code when there is a public distributor.
RestDistributorBaseURL string

FeedInterval time.Duration
DistributeInterval time.Duration
}

// logFeeder is the de-facto interface that feeders implement.
Expand Down Expand Up @@ -124,6 +127,13 @@ func Main(ctx context.Context, operatorConfig OperatorConfig, p LogStatePersiste
return fmt.Errorf("failed to create witness: %v", err)
}

if operatorConfig.FeedInterval == 0 {
operatorConfig.FeedInterval = defaultFeedInterval
}
if operatorConfig.DistributeInterval == 0 {
operatorConfig.DistributeInterval = defaultDistributeInterval
}

bw := witnessAdapter{
w: witness,
}
Expand All @@ -133,7 +143,7 @@ func Main(ctx context.Context, operatorConfig OperatorConfig, p LogStatePersiste
g.Go(func() error {
glog.Infof("Feeder %q goroutine started", c.Origin)
defer glog.Infof("Feeder %q goroutine done", c.Origin)
return f(ctx, c, bw, httpClient, feedInterval)
return f(ctx, c, bw, httpClient, operatorConfig.FeedInterval)
})
}

Expand All @@ -143,7 +153,7 @@ func Main(ctx context.Context, operatorConfig OperatorConfig, p LogStatePersiste
for l := range feeders {
logs = append(logs, l)
}
runRestDistributors(ctx, g, httpClient, logs, operatorConfig.RestDistributorBaseURL, bw, signerCosigV1.Verifier())
runRestDistributors(ctx, g, httpClient, operatorConfig.DistributeInterval, logs, operatorConfig.RestDistributorBaseURL, bw, signerCosigV1.Verifier())
}

r := mux.NewRouter()
Expand All @@ -168,7 +178,7 @@ func Main(ctx context.Context, operatorConfig OperatorConfig, p LogStatePersiste
return g.Wait()
}

func runRestDistributors(ctx context.Context, g *errgroup.Group, httpClient *http.Client, logs []config.Log, distributorBaseURL string, bw witnessAdapter, witnessV note.Verifier) {
func runRestDistributors(ctx context.Context, g *errgroup.Group, httpClient *http.Client, interval time.Duration, logs []config.Log, distributorBaseURL string, bw witnessAdapter, witnessV note.Verifier) {
g.Go(func() error {
d, err := rest.NewDistributor(distributorBaseURL, httpClient, logs, witnessV, bw)
if err != nil {
Expand All @@ -179,7 +189,7 @@ func runRestDistributors(ctx context.Context, g *errgroup.Group, httpClient *htt
}
for {
select {
case <-time.After(distributeInterval):
case <-time.After(interval):
case <-ctx.Done():
return ctx.Err()
}
Expand Down

0 comments on commit 71bfa34

Please sign in to comment.