From c92c1e27b05055bdc6961c692f752cb072ed427c Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Tue, 9 Jul 2024 15:21:44 +0100 Subject: [PATCH] Add spanner sequencer impl --- go.mod | 2 + storage/gcp/gcp.go | 112 ++++++++++++++++++++-------------------- storage/gcp/gcp_test.go | 53 +++++++++++++++++++ 3 files changed, 110 insertions(+), 57 deletions(-) diff --git a/go.mod b/go.mod index 00b338b70..9b5a3feb1 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,8 @@ require ( k8s.io/klog/v2 v2.130.1 ) +require cloud.google.com/go/longrunning v0.5.7 // indirect + require ( cel.dev/expr v0.15.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 // indirect diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 5258ad636..2e24c5dd8 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -59,7 +59,6 @@ type Storage struct { projectID string bucket string - dbPool *spanner.Client sequencer sequencer objStore objStore @@ -112,61 +111,9 @@ func New(ctx context.Context, cfg Config) (*Storage, error) { // TODO(al): make queue options configurable: r.queue = storage.NewQueue(time.Second, 256, r.sequenceEntries) - if err := r.initDB(ctx); err != nil { - return nil, fmt.Errorf("failed to init DB: %v", err) - } - return r, nil } -// initDB ensures that the coordination DB is initialised correctly. -// -// The database schema consists of 3 tables: -// - SeqCoord -// This table only ever contains a single row which tracks the next available -// sequence number. -// - Seq -// This table holds sequenced "batches" of entries. The batches are keyed -// by the sequence number assigned to the first entry in the batch, and -// each subsequent entry in the batch takes the numerically next sequence number. -// - IntCoord -// This table coordinates integration of the batches of entries stored in -// Seq into the committed tree state. -// -// The database and schema should be created externally, e.g. by terraform. -func (s *Storage) initDB(ctx context.Context) error { - - /* Schema for reference: - CREATE TABLE SeqCoord ( - id INT64 NOT NULL, - next INT64 NOT NULL, - ) PRIMARY KEY (id); - - CREATE TABLE Seq ( - id INT64 NOT NULL, - seq INT64 NOT NULL, - v BYTES(MAX), - ) PRIMARY KEY (id, seq); - - CREATE TABLE IntCoord ( - id INT64 NOT NULL, - seq INT64 NOT NULL, - ) PRIMARY KEY (id); - */ - - // Set default values for a newly inisialised schema - these rows being present are a precondition for - // sequencing and integration to occur. - // Note that this will only succeed if no row exists, so there's no danger - // of "resetting" an existing log. - if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("SeqCoord", []string{"id", "next"}, []interface{}{0, 0})}); spanner.ErrCode(err) != codes.AlreadyExists { - return err - } - if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("IntCoord", []string{"id", "seq"}, []interface{}{0, 0})}); spanner.ErrCode(err) != codes.AlreadyExists { - return err - } - return nil -} - // tileSuffix returns either the empty string or a tiles API suffix based on the passed-in tile size. func tileSuffix(s uint64) string { if p := s % 256; p != 0 { @@ -244,9 +191,58 @@ func newSpannerSequencer(ctx context.Context, spannerDB string) (*spannerSequenc if err != nil { return nil, fmt.Errorf("failed to connect to Spanner: %v", err) } - return &spannerSequencer{ + r := &spannerSequencer{ dbPool: dbPool, - }, nil + } + return r, r.initDB(ctx) +} + +// initDB ensures that the coordination DB is initialised correctly. +// +// The database schema consists of 3 tables: +// - SeqCoord +// This table only ever contains a single row which tracks the next available +// sequence number. +// - Seq +// This table holds sequenced "batches" of entries. The batches are keyed +// by the sequence number assigned to the first entry in the batch, and +// each subsequent entry in the batch takes the numerically next sequence number. +// - IntCoord +// This table coordinates integration of the batches of entries stored in +// Seq into the committed tree state. +// +// The database and schema should be created externally, e.g. by terraform. +func (s *spannerSequencer) initDB(ctx context.Context) error { + + /* Schema for reference: + CREATE TABLE SeqCoord ( + id INT64 NOT NULL, + next INT64 NOT NULL, + ) PRIMARY KEY (id); + + CREATE TABLE Seq ( + id INT64 NOT NULL, + seq INT64 NOT NULL, + v BYTES(MAX), + ) PRIMARY KEY (id, seq); + + CREATE TABLE IntCoord ( + id INT64 NOT NULL, + seq INT64 NOT NULL, + ) PRIMARY KEY (id); + */ + + // Set default values for a newly inisialised schema - these rows being present are a precondition for + // sequencing and integration to occur. + // Note that this will only succeed if no row exists, so there's no danger + // of "resetting" an existing log. + if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("SeqCoord", []string{"id", "next"}, []interface{}{0, 0})}); spanner.ErrCode(err) != codes.AlreadyExists { + return err + } + if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("IntCoord", []string{"id", "seq"}, []interface{}{0, 0})}); spanner.ErrCode(err) != codes.AlreadyExists { + return err + } + return nil } // assignEntries durably assigns each of the passed-in entries an index in the log. @@ -321,10 +317,12 @@ func newGCSStorage(ctx context.Context, c *gcs.Client, projectID string, bucket break } } - return &gcsStorage{ + r := &gcsStorage{ gcsClient: c, bucket: bucket, - }, nil + } + + return r, nil } // getObject returns the data and generation of the specified object, or an error. diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index e09bcbe6b..565fe3538 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -20,14 +20,67 @@ import ( "crypto/sha256" "errors" "fmt" + "os" "sync" "testing" + "cloud.google.com/go/spanner/spannertest" + "cloud.google.com/go/spanner/spansql" gcs "cloud.google.com/go/storage" "github.com/google/go-cmp/cmp" "github.com/transparency-dev/trillian-tessera/api/layout" ) +func newSpannerDB(t *testing.T) func() { + t.Helper() + srv, err := spannertest.NewServer("localhost:0") + if err != nil { + t.Fatalf("Failed to set up test spanner: %v", err) + } + os.Setenv("SPANNER_EMULATOR_HOST", srv.Addr) + dml, err := spansql.ParseDDL("", ` + CREATE TABLE SeqCoord (id INT64 NOT NULL, next INT64 NOT NULL,) PRIMARY KEY (id); + CREATE TABLE Seq (id INT64 NOT NULL, seq INT64 NOT NULL, v BYTES(MAX),) PRIMARY KEY (id, seq); + CREATE TABLE IntCoord (id INT64 NOT NULL, seq INT64 NOT NULL,) PRIMARY KEY (id); + `) + if err != nil { + t.Fatalf("Invalid DDL: %v", err) + } + if err := srv.UpdateDDL(dml); err != nil { + t.Fatalf("Failed to create schema in test spanner: %v", err) + } + + return srv.Close + +} + +func TestSpannerSequencer(t *testing.T) { + ctx := context.Background() + close := newSpannerDB(t) + defer close() + + seq, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d") + if err != nil { + t.Fatalf("newSpannerSequencer: %v", err) + } + + want := uint64(0) + for chunks := 0; chunks < 10; chunks++ { + entries := [][]byte{} + for i := 0; i < 10+chunks; i++ { + entries = append(entries, []byte(fmt.Sprintf("item %d/%d", chunks, i))) + } + got, err := seq.assignEntries(ctx, entries) + if err != nil { + t.Fatalf("assignEntries: %v", err) + } + if got != want { + t.Errorf("Chunk %d got seq %d, want %d", chunks, got, want) + } + want += uint64(len(entries)) + } +} + func TestTileSuffix(t *testing.T) { for _, test := range []struct { name string