Add support for pushback on GCP #92

Merged · 5 commits · Jul 29, 2024
Changes from 3 commits
20 changes: 20 additions & 0 deletions log.go
@@ -15,6 +15,7 @@
package tessera

import (
"errors"
"fmt"
"time"

@@ -24,6 +25,13 @@ import (

const DefaultBatchMaxSize = 1

// ErrPushback is returned by underlying storage implementations when there are too many
// entries with indices assigned but which have not yet been integrated into the tree.
//
// Personalities encountering this error should apply back-pressure to the source of new entries
// in an appropriate manner (e.g. for HTTP services, return a 429 with a Retry-After header).
var ErrPushback = errors.New("too many unintegrated entries")

// NewCPFunc is the signature of a function which knows how to format and sign checkpoints.
type NewCPFunc func(size uint64, hash []byte) ([]byte, error)

@@ -37,6 +45,8 @@ type StorageOptions struct {

BatchMaxAge time.Duration
BatchMaxSize uint

PushbackMaxOutstanding uint
}

// ResolveStorageOptions turns a variadic array of storage options into a StorageOptions instance.
@@ -89,3 +99,13 @@ func WithBatching(maxSize uint, maxAge time.Duration) func(*StorageOptions) {
o.BatchMaxSize = maxSize
}
}

// WithPushback allows configuration of when the storage should start pushing back on add requests.
//
// maxOutstanding is the maximum number of "in-flight" add requests - i.e. entries with sequence
// numbers assigned, but which are not yet integrated into the log - permitted before the storage
// starts returning ErrPushback.
func WithPushback(maxOutstanding uint) func(*StorageOptions) {
return func(o *StorageOptions) {
o.PushbackMaxOutstanding = maxOutstanding
}
}
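
For illustration only - a minimal sketch of how an HTTP personality might honour ErrPushback, following the guidance in the comment above. The addFn signature and the tessera import path are assumptions for this example, not part of the change:

package main

import (
	"context"
	"errors"
	"io"
	"net/http"

	tessera "github.com/transparency-dev/trillian-tessera" // assumed module path
)

// addHandler wraps a hypothetical add entrypoint and translates
// tessera.ErrPushback into a 429 response with a Retry-After header.
func addHandler(addFn func(ctx context.Context, data []byte) error) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "bad request", http.StatusBadRequest)
			return
		}
		if err := addFn(r.Context(), body); err != nil {
			if errors.Is(err, tessera.ErrPushback) {
				// Apply back-pressure to the source of new entries.
				w.Header().Set("Retry-After", "1")
				http.Error(w, "too many unintegrated entries", http.StatusTooManyRequests)
				return
			}
			http.Error(w, "internal error", http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}

errors.Is is used rather than a direct equality check so the handler still matches when the error is wrapped; the %v to %w change to the "failed to flush batch" error in gcp.go below is what lets ErrPushback survive that wrapping.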
46 changes: 37 additions & 9 deletions storage/gcp/gcp.go
@@ -54,7 +54,12 @@ import (
"k8s.io/klog/v2"
)

const entryBundleSize = 256
const (
entryBundleSize = 256

DefaultPushbackMaxOutstanding = 4096
DefaultIntegrationSizeLimit = 2048
)

// Storage is a GCP based storage implementation for Tessera.
type Storage struct {
@@ -103,6 +108,11 @@ type Config struct {

// New creates a new instance of the GCP based Storage.
func New(ctx context.Context, cfg Config, opts ...func(*tessera.StorageOptions)) (*Storage, error) {
opt := tessera.ResolveStorageOptions(nil, opts...)
if opt.PushbackMaxOutstanding == 0 {
opt.PushbackMaxOutstanding = DefaultPushbackMaxOutstanding
}

c, err := gcs.NewClient(ctx, gcs.WithJSONReads())
if err != nil {
return nil, fmt.Errorf("failed to create GCS client: %v", err)
@@ -111,12 +121,11 @@ func New(ctx context.Context, cfg Config, opts ...func(*tessera.StorageOptions))
if err != nil {
return nil, fmt.Errorf("failed to create GCS storage: %v", err)
}
seq, err := newSpannerSequencer(ctx, cfg.Spanner)
seq, err := newSpannerSequencer(ctx, cfg.Spanner, uint64(opt.PushbackMaxOutstanding))
if err != nil {
return nil, fmt.Errorf("failed to create Spanner sequencer: %v", err)
}

opt := tessera.ResolveStorageOptions(nil, opts...)
r := &Storage{
gcsClient: c,
projectID: cfg.ProjectID,
@@ -139,7 +148,7 @@ func New(ctx context.Context, cfg Config, opts ...func(*tessera.StorageOptions))
for {
cctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if more, err := r.sequencer.consumeEntries(cctx, 2048 /*limit*/, r.integrate); err != nil {
if more, err := r.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, r.integrate); err != nil {
klog.Errorf("integrate: %v", err)
break
} else if !more {
@@ -369,18 +378,20 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie
// spannerSequencer uses Cloud Spanner to provide
// a durable and thread/multi-process safe sequencer.
type spannerSequencer struct {
dbPool *spanner.Client
dbPool *spanner.Client
maxOutstanding uint64
}

// newSpannerSequencer returns a new spannerSequencer struct which uses the provided
// spanner resource name for its spanner connection.
func newSpannerSequencer(ctx context.Context, spannerDB string) (*spannerSequencer, error) {
func newSpannerSequencer(ctx context.Context, spannerDB string, maxOutstanding uint64) (*spannerSequencer, error) {
dbPool, err := spanner.NewClient(ctx, spannerDB)
if err != nil {
return nil, fmt.Errorf("failed to connect to Spanner: %v", err)
}
r := &spannerSequencer{
dbPool: dbPool,
dbPool: dbPool,
maxOutstanding: maxOutstanding,
}
if err := r.initDB(ctx); err != nil {
return nil, fmt.Errorf("failed to initDB: %v", err)
@@ -442,6 +453,17 @@ func (s *spannerSequencer) initDB(ctx context.Context) error {
// This is achieved by storing the passed-in entries in the Seq table in Spanner, keyed by the
// index assigned to the first entry in the batch.
func (s *spannerSequencer) assignEntries(ctx context.Context, entries []*tessera.Entry) error {
// First grab the treeSize in a non-locking read-only fashion (we don't want to block/collide with integration).
// We'll use this value to determine whether we need to apply back-pressure.
var treeSize int64
if row, err := s.dbPool.Single().ReadRow(ctx, "IntCoord", spanner.Key{0}, []string{"seq"}); err != nil {
return err
} else {
if err := row.Column(0, &treeSize); err != nil {
return fmt.Errorf("failed to read integration coordination info: %v", err)
}
}

var next int64 // Unfortunately, Spanner doesn't support uint64 so we'll have to cast around a bit.

_, err := s.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
@@ -454,8 +476,14 @@ func (s *spannerSequencer) assignEntries(ctx context.Context, entries []*tessera
if err := row.Columns(&id, &next); err != nil {
return fmt.Errorf("failed to parse id column: %v", err)
}
next := uint64(next) // Shadow next with a uint64 version of the same value to save on casts.

// Check whether there are too many outstanding entries and we should apply
// back-pressure.
if outstanding := next - treeSize; outstanding > int64(s.maxOutstanding) {
return tessera.ErrPushback
}

next := uint64(next) // Shadow next with a uint64 version of the same value to save on casts.
sequencedEntries := make([]storage.SequencedEntry, len(entries))
// Assign provisional sequence numbers to entries.
// We need to do this here in order to support serialisations which include the log position.
Expand Down Expand Up @@ -490,7 +518,7 @@ func (s *spannerSequencer) assignEntries(ctx context.Context, entries []*tessera
})

if err != nil {
return fmt.Errorf("failed to flush batch: %v", err)
return fmt.Errorf("failed to flush batch: %w", err)
}

return nil
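
Putting the pieces together - a sketch of constructing the GCP storage with an explicit pushback threshold. Only the Config fields visible in this diff are shown; a real configuration will need the remaining fields:

// Sketch only: tessera.WithPushback feeds PushbackMaxOutstanding through
// ResolveStorageOptions and into newSpannerSequencer's maxOutstanding.
st, err := gcp.New(ctx, gcp.Config{
	ProjectID: "my-project",
	Spanner:   "projects/my-project/instances/i/databases/d",
},
	tessera.WithPushback(8192), // if omitted, DefaultPushbackMaxOutstanding (4096) applies
)
if err != nil {
	klog.Exitf("gcp.New: %v", err)
}

As a worked example of the check in assignEntries: with the default threshold of 4096, if the IntCoord row reports a treeSize of 10,000 while SeqCoord's next assignable index is 14,200, then outstanding = 14,200 - 10,000 = 4,200, which exceeds 4,096, so the call returns tessera.ErrPushback (an outstanding count exactly equal to the threshold does not push back, as the tests below confirm). Since treeSize is read outside the read-write transaction, the decision can be marginally stale - a reasonable trade-off to avoid contending with the integration path.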
59 changes: 57 additions & 2 deletions storage/gcp/gcp_test.go
@@ -63,7 +63,7 @@ func TestSpannerSequencerAssignEntries(t *testing.T) {
close := newSpannerDB(t)
defer close()

seq, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d")
seq, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d", 1000)
if err != nil {
t.Fatalf("newSpannerSequencer: %v", err)
}
@@ -86,12 +86,67 @@
}
}

func TestSpannerSequencerPushback(t *testing.T) {
ctx := context.Background()

for _, test := range []struct {
name string
threshold uint64
initialEntries int
wantPushback bool
}{
{
name: "no pushback: num < threshold",
threshold: 10,
initialEntries: 5,
},
{
name: "no pushback: num = threshold",
threshold: 10,
initialEntries: 10,
},
{
name: "pushback: initial > threshold",
threshold: 10,
initialEntries: 15,
wantPushback: true,
},
} {
t.Run(test.name, func(t *testing.T) {
close := newSpannerDB(t)
defer close()

seq, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d", test.threshold)
if err != nil {
t.Fatalf("newSpannerSequencer: %v", err)
}
// Set up the test scenario with the configured number of initial outstanding entries
entries := []*tessera.Entry{}
for i := 0; i < test.initialEntries; i++ {
entries = append(entries, tessera.NewEntry([]byte(fmt.Sprintf("initial item %d", i))))
}
if err := seq.assignEntries(ctx, entries); err != nil {
t.Fatalf("initial assignEntries: %v", err)
}

// Now perform the test with a single additional entry to check for pushback
entries = []*tessera.Entry{tessera.NewEntry([]byte("additional"))}
err = seq.assignEntries(ctx, entries)
if gotPushback := errors.Is(err, tessera.ErrPushback); gotPushback != test.wantPushback {
t.Fatalf("assignEntries: got pushback %t (%v), want pushback: %t", gotPushback, err, test.wantPushback)
} else if !gotPushback && err != nil {
t.Fatalf("assignEntries: %v", err)
}
})
}
}

func TestSpannerSequencerRoundTrip(t *testing.T) {
ctx := context.Background()
close := newSpannerDB(t)
defer close()

s, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d")
s, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d", 1000)
if err != nil {
t.Fatalf("newSpannerSequencer: %v", err)
}