diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index fa56e230..cfe79d90 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -63,7 +63,7 @@ func TestSpannerSequencerAssignEntries(t *testing.T) { close := newSpannerDB(t) defer close() - seq, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d") + seq, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d", 1000) if err != nil { t.Fatalf("newSpannerSequencer: %v", err) } @@ -86,12 +86,67 @@ func TestSpannerSequencerAssignEntries(t *testing.T) { } } +func TestSpannerSequencerPushback(t *testing.T) { + ctx := context.Background() + + for _, test := range []struct { + name string + threshold uint64 + initialEntries int + wantPushback bool + }{ + { + name: "no pushback: num < threshold", + threshold: 10, + initialEntries: 5, + }, + { + name: "no pushback: num = threshold", + threshold: 10, + initialEntries: 10, + }, + { + name: "pushback: initial > threshold", + threshold: 10, + initialEntries: 15, + wantPushback: true, + }, + } { + t.Run(test.name, func(t *testing.T) { + close := newSpannerDB(t) + defer close() + + seq, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d", test.threshold) + if err != nil { + t.Fatalf("newSpannerSequencer: %v", err) + } + // Set up the test scenario with the configured number of initial outstanding entries + entries := []*tessera.Entry{} + for i := 0; i < test.initialEntries; i++ { + entries = append(entries, tessera.NewEntry([]byte(fmt.Sprintf("initial item %d", i)))) + } + if err := seq.assignEntries(ctx, entries); err != nil { + t.Fatalf("initial assignEntries: %v", err) + } + + // Now perform the test with a single additional entry to check for pushback + entries = []*tessera.Entry{tessera.NewEntry([]byte("additional"))} + err = seq.assignEntries(ctx, entries) + if gotPushback := errors.Is(err, tessera.ErrPushback); gotPushback != test.wantPushback { + t.Fatalf("assignEntries: got pushback %t (%v), want pushback: %t", gotPushback, err, test.wantPushback) + } else if !gotPushback && err != nil { + t.Fatalf("assignEntries: %v", err) + } + }) + } +} + func TestSpannerSequencerRoundTrip(t *testing.T) { ctx := context.Background() close := newSpannerDB(t) defer close() - s, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d") + s, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d", 1000) if err != nil { t.Fatalf("newSpannerSequencer: %v", err) }