diff --git a/cmd/example-gcp/main.go b/cmd/example-gcp/main.go
index 4bf75f55..7e24ec16 100644
--- a/cmd/example-gcp/main.go
+++ b/cmd/example-gcp/main.go
@@ -19,6 +19,7 @@ package main
 import (
 	"context"
 	"crypto/sha256"
+	"errors"
 	"flag"
 	"fmt"
 	"io"
@@ -70,6 +71,11 @@ func main() {
 			id := sha256.Sum256(b)
 			idx, err := storage.Add(r.Context(), tessera.NewEntry(b, tessera.WithIdentity(id[:])))
 			if err != nil {
+				if errors.Is(err, tessera.ErrPushback) {
+					w.Header().Add("Retry-After", "1")
+					w.WriteHeader(http.StatusServiceUnavailable)
+					return
+				}
 				w.WriteHeader(http.StatusInternalServerError)
 				_, _ = w.Write([]byte(err.Error()))
 				return
diff --git a/log.go b/log.go
index 76a57bdc..9512af70 100644
--- a/log.go
+++ b/log.go
@@ -15,6 +15,7 @@
 package tessera
 
 import (
+	"errors"
 	"fmt"
 	"time"
 
@@ -24,6 +25,13 @@ import (
 )
 
 const DefaultBatchMaxSize = 1
 
+// ErrPushback is returned by underlying storage implementations when there are too many
+// entries with indices assigned but which have not yet been integrated into the tree.
+//
+// Personalities encountering this error should apply back-pressure to the source of new entries
+// in an appropriate manner (e.g. for HTTP services, return a 503 with a Retry-After header).
+var ErrPushback = errors.New("too many unintegrated entries")
+
 // NewCPFunc is the signature of a function which knows how to format and sign checkpoints.
 type NewCPFunc func(size uint64, hash []byte) ([]byte, error)
@@ -37,6 +45,8 @@ type StorageOptions struct {
 
 	BatchMaxAge  time.Duration
 	BatchMaxSize uint
+
+	PushbackMaxOutstanding uint
 }
 
 // ResolveStorageOptions turns a variadic array of storage options into a StorageOptions instance.
@@ -89,3 +99,13 @@ func WithBatching(maxSize uint, maxAge time.Duration) func(*StorageOptions) {
 		o.BatchMaxSize = maxSize
 	}
 }
+
+// WithPushback allows configuration of when the storage should start pushing back on add requests.
+//
+// maxOutstanding is the maximum number of "in-flight" add requests - i.e. the number of entries
+// with sequence numbers assigned, but which are not yet integrated into the log.
+func WithPushback(maxOutstanding uint) func(*StorageOptions) {
+	return func(o *StorageOptions) {
+		o.PushbackMaxOutstanding = maxOutstanding
+	}
+}
diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go
index 4fd3e222..fd321214 100644
--- a/storage/gcp/gcp.go
+++ b/storage/gcp/gcp.go
@@ -54,7 +54,14 @@ import (
 	"k8s.io/klog/v2"
 )
 
-const entryBundleSize = 256
+const (
+	entryBundleSize = 256
+
+	// DefaultPushbackMaxOutstanding is the default limit on entries with indices assigned but not yet integrated.
+	DefaultPushbackMaxOutstanding = 4096
+	// DefaultIntegrationSizeLimit is the default maximum number of entries to integrate into the tree in one pass.
+	DefaultIntegrationSizeLimit = 2048
+)
 
 // Storage is a GCP based storage implementation for Tessera.
 type Storage struct {
@@ -103,6 +108,11 @@ type Config struct {
 // New creates a new instance of the GCP based Storage.
 func New(ctx context.Context, cfg Config, opts ...func(*tessera.StorageOptions)) (*Storage, error) {
+	opt := tessera.ResolveStorageOptions(nil, opts...)
+	if opt.PushbackMaxOutstanding == 0 {
+		opt.PushbackMaxOutstanding = DefaultPushbackMaxOutstanding
+	}
+
 	c, err := gcs.NewClient(ctx, gcs.WithJSONReads())
 	if err != nil {
 		return nil, fmt.Errorf("failed to create GCS client: %v", err)
 	}
@@ -111,12 +121,11 @@ func New(ctx context.Context, cfg Config, opts ...func(*tessera.StorageOptions))
 	if err != nil {
 		return nil, fmt.Errorf("failed to create GCS storage: %v", err)
 	}
-	seq, err := newSpannerSequencer(ctx, cfg.Spanner)
+	seq, err := newSpannerSequencer(ctx, cfg.Spanner, uint64(opt.PushbackMaxOutstanding))
 	if err != nil {
 		return nil, fmt.Errorf("failed to create Spanner sequencer: %v", err)
 	}
 
-	opt := tessera.ResolveStorageOptions(nil, opts...)
 	r := &Storage{
 		gcsClient: c,
 		projectID: cfg.ProjectID,
@@ -139,7 +148,7 @@ func New(ctx context.Context, cfg Config, opts ...func(*tessera.StorageOptions))
 		for {
 			cctx, cancel := context.WithTimeout(ctx, 10*time.Second)
 			defer cancel()
-			if more, err := r.sequencer.consumeEntries(cctx, 2048 /*limit*/, r.integrate); err != nil {
+			if more, err := r.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, r.integrate); err != nil {
 				klog.Errorf("integrate: %v", err)
 				break
 			} else if !more {
@@ -369,18 +378,20 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entries
 // spannerSequencer uses Cloud Spanner to provide
 // a durable and thread/multi-process safe sequencer.
 type spannerSequencer struct {
-	dbPool *spanner.Client
+	dbPool         *spanner.Client
+	maxOutstanding uint64
 }
 
 // newSpannerSequencer returns a new spannerSequencer struct which uses the provided
 // spanner resource name for its spanner connection.
-func newSpannerSequencer(ctx context.Context, spannerDB string) (*spannerSequencer, error) {
+func newSpannerSequencer(ctx context.Context, spannerDB string, maxOutstanding uint64) (*spannerSequencer, error) {
 	dbPool, err := spanner.NewClient(ctx, spannerDB)
 	if err != nil {
 		return nil, fmt.Errorf("failed to connect to Spanner: %v", err)
 	}
 	r := &spannerSequencer{
-		dbPool: dbPool,
+		dbPool:         dbPool,
+		maxOutstanding: maxOutstanding,
 	}
 	if err := r.initDB(ctx); err != nil {
 		return nil, fmt.Errorf("failed to initDB: %v", err)
 	}
@@ -442,6 +453,17 @@ func (s *spannerSequencer) initDB(ctx context.Context) error {
 // This is achieved by storing the passed-in entries in the Seq table in Spanner, keyed by the
 // index assigned to the first entry in the batch.
 func (s *spannerSequencer) assignEntries(ctx context.Context, entries []*tessera.Entry) error {
+	// First grab the treeSize in a non-locking read-only fashion (we don't want to block/collide with integration).
+	// We'll use this value to determine whether we need to apply back-pressure.
+	var treeSize int64
+	if row, err := s.dbPool.Single().ReadRow(ctx, "IntCoord", spanner.Key{0}, []string{"seq"}); err != nil {
+		return fmt.Errorf("failed to read integration coordination info: %v", err)
+	} else {
+		if err := row.Column(0, &treeSize); err != nil {
+			return fmt.Errorf("failed to parse integration coordination info: %v", err)
+		}
+	}
+
 	var next int64 // Unfortunately, Spanner doesn't support uint64 so we'll have to cast around a bit.
 	_, err := s.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
@@ -454,8 +476,14 @@ func (s *spannerSequencer) assignEntries(ctx context.Context, entries []*tessera
 		if err := row.Columns(&id, &next); err != nil {
 			return fmt.Errorf("failed to parse id column: %v", err)
 		}
 
-		next := uint64(next) // Shadow next with a uint64 version of the same value to save on casts.
+		// Check whether there are already too many outstanding entries (i.e. entries with
+		// indices assigned which have not yet been integrated), and push back if so.
+		if outstanding := next - treeSize; outstanding > int64(s.maxOutstanding) {
+			return tessera.ErrPushback
+		}
+
+		next := uint64(next) // Shadow next with a uint64 version of the same value to save on casts.
 		sequencedEntries := make([]storage.SequencedEntry, len(entries))
 		// Assign provisional sequence numbers to entries.
 		// We need to do this here in order to support serialisations which include the log position.
@@ -490,7 +518,7 @@ func (s *spannerSequencer) assignEntries(ctx context.Context, entries []*tessera
 	})
 
 	if err != nil {
-		return fmt.Errorf("failed to flush batch: %v", err)
+		return fmt.Errorf("failed to flush batch: %w", err)
 	}
 
 	return nil
diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go
index fa56e230..cfe79d90 100644
--- a/storage/gcp/gcp_test.go
+++ b/storage/gcp/gcp_test.go
@@ -63,7 +63,7 @@ func TestSpannerSequencerAssignEntries(t *testing.T) {
 	close := newSpannerDB(t)
 	defer close()
 
-	seq, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d")
+	seq, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d", 1000)
 	if err != nil {
 		t.Fatalf("newSpannerSequencer: %v", err)
 	}
@@ -86,12 +86,67 @@ func TestSpannerSequencerAssignEntries(t *testing.T) {
 	}
 }
 
+func TestSpannerSequencerPushback(t *testing.T) {
+	ctx := context.Background()
+
+	for _, test := range []struct {
+		name           string
+		threshold      uint64
+		initialEntries int
+		wantPushback   bool
+	}{
+		{
+			name:           "no pushback: num < threshold",
+			threshold:      10,
+			initialEntries: 5,
+		},
+		{
+			name:           "no pushback: num = threshold",
+			threshold:      10,
+			initialEntries: 10,
+		},
+		{
+			name:           "pushback: num > threshold",
+			threshold:      10,
+			initialEntries: 15,
+			wantPushback:   true,
+		},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			close := newSpannerDB(t)
+			defer close()
+
+			seq, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d", test.threshold)
+			if err != nil {
+				t.Fatalf("newSpannerSequencer: %v", err)
+			}
+			// Set up the test scenario with the configured number of initial outstanding entries.
+			entries := []*tessera.Entry{}
+			for i := 0; i < test.initialEntries; i++ {
+				entries = append(entries, tessera.NewEntry([]byte(fmt.Sprintf("initial item %d", i))))
+			}
+			if err := seq.assignEntries(ctx, entries); err != nil {
+				t.Fatalf("initial assignEntries: %v", err)
+			}
+
+			// Now perform the test with a single additional entry to check for pushback.
+			entries = []*tessera.Entry{tessera.NewEntry([]byte("additional"))}
+			err = seq.assignEntries(ctx, entries)
+			if gotPushback := errors.Is(err, tessera.ErrPushback); gotPushback != test.wantPushback {
+				t.Fatalf("assignEntries: got pushback %t (%v), want pushback: %t", gotPushback, err, test.wantPushback)
+			} else if !gotPushback && err != nil {
+				t.Fatalf("assignEntries: %v", err)
+			}
+		})
+	}
+}
+
 func TestSpannerSequencerRoundTrip(t *testing.T) {
 	ctx := context.Background()
 	close := newSpannerDB(t)
 	defer close()
 
-	s, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d")
+	s, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d", 1000)
 	if err != nil {
 		t.Fatalf("newSpannerSequencer: %v", err)
 	}
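
Note on usage (not part of the patch itself): personalities opt in to a non-default pushback threshold via the new tessera.WithPushback option when constructing the storage; if the option is omitted, the GCP implementation falls back to DefaultPushbackMaxOutstanding (4096). A minimal sketch, with placeholder Config values:

	// Construct GCP storage which starts returning tessera.ErrPushback from Add
	// once more than 1024 entries are sequenced but not yet integrated.
	st, err := gcp.New(ctx, gcp.Config{
		ProjectID: "my-project",                                  // placeholder
		Spanner:   "projects/my-project/instances/i/databases/d", // placeholder
	}, tessera.WithPushback(1024))
	if err != nil {
		log.Fatalf("Failed to create GCP storage: %v", err)
	}
	// Errors returned by st.Add should be checked with errors.Is(err, tessera.ErrPushback),
	// as cmd/example-gcp does above, since storage implementations may wrap the sentinel.

On the other side of the HTTP boundary, clients of a personality like cmd/example-gcp should treat the resulting 503 as transient and honour Retry-After. A sketch of such a submitter, assuming a hypothetical /add endpoint (none of these identifiers come from the diff):

	package submitter

	import (
		"bytes"
		"errors"
		"fmt"
		"net/http"
		"strconv"
		"time"
	)

	// SubmitWithBackoff POSTs entry to url, sleeping for the server-advertised
	// Retry-After interval whenever the log signals pushback via HTTP 503.
	func SubmitWithBackoff(url string, entry []byte) error {
		for attempt := 0; attempt < 5; attempt++ {
			resp, err := http.Post(url, "application/octet-stream", bytes.NewReader(entry))
			if err != nil {
				return err
			}
			resp.Body.Close()
			if resp.StatusCode != http.StatusServiceUnavailable {
				if resp.StatusCode != http.StatusOK {
					return fmt.Errorf("add failed: %s", resp.Status)
				}
				return nil
			}
			// Pushback: wait for the advertised delay (defaulting to 1s) before retrying.
			delay := time.Second
			if s, err := strconv.Atoi(resp.Header.Get("Retry-After")); err == nil {
				delay = time.Duration(s) * time.Second
			}
			time.Sleep(delay)
		}
		return errors.New("gave up after repeated pushback")
	}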