diff --git a/cmd/example-gcp/main.go b/cmd/example-gcp/main.go index 7e24ec16..9d40c846 100644 --- a/cmd/example-gcp/main.go +++ b/cmd/example-gcp/main.go @@ -55,6 +55,7 @@ func main() { storage, err := gcp.New(ctx, gcpCfg, tessera.WithCheckpointSignerVerifier(signerFromFlags(), nil), tessera.WithBatching(1024, time.Second), + tessera.WithPushback(10*4096), ) if err != nil { klog.Exitf("Failed to create new GCP storage: %v", err) diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index fd321214..515aaea7 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -58,7 +58,7 @@ const ( entryBundleSize = 256 DefaultPushbackMaxOutstanding = 4096 - DefaultIntegrationSizeLimit = 2048 + DefaultIntegrationSizeLimit = 5 * 4096 ) // Storage is a GCP based storage implementation for Tessera. @@ -145,16 +145,12 @@ func New(ctx context.Context, cfg Config, opts ...func(*tessera.StorageOptions)) return case <-t.C: } - for { - cctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - if more, err := r.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, r.integrate); err != nil { - klog.Errorf("integrate: %v", err) - break - } else if !more { - break - } - klog.V(1).Info("Quickloop") + // Don't quicklook for now, it causes issues updating checkpoint too frequently. + cctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if _, err := r.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, r.integrate); err != nil { + klog.Errorf("integrate: %v", err) + break } } }()