Skip to content

Commit

Permalink
Add spanner sequencer impl
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter committed Jul 10, 2024
1 parent d0a627e commit c92c1e2
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 57 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ require (
k8s.io/klog/v2 v2.130.1
)

require cloud.google.com/go/longrunning v0.5.7 // indirect

require (
cel.dev/expr v0.15.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 // indirect
Expand Down
112 changes: 55 additions & 57 deletions storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ type Storage struct {
projectID string
bucket string

dbPool *spanner.Client
sequencer sequencer
objStore objStore

Expand Down Expand Up @@ -112,61 +111,9 @@ func New(ctx context.Context, cfg Config) (*Storage, error) {
// TODO(al): make queue options configurable:
r.queue = storage.NewQueue(time.Second, 256, r.sequenceEntries)

if err := r.initDB(ctx); err != nil {
return nil, fmt.Errorf("failed to init DB: %v", err)
}

return r, nil
}

// initDB ensures that the coordination DB is initialised correctly.
//
// The database schema consists of 3 tables:
//   - SeqCoord
//     This table only ever contains a single row which tracks the next available
//     sequence number.
//   - Seq
//     This table holds sequenced "batches" of entries. The batches are keyed
//     by the sequence number assigned to the first entry in the batch, and
//     each subsequent entry in the batch takes the numerically next sequence number.
//   - IntCoord
//     This table coordinates integration of the batches of entries stored in
//     Seq into the committed tree state.
//
// The database and schema should be created externally, e.g. by terraform.
//
// initDB returns nil once both the SeqCoord and IntCoord rows have either been
// inserted or were found to already exist; any other error from Spanner is
// returned unchanged.
func (s *Storage) initDB(ctx context.Context) error {
	/* Schema for reference:
	CREATE TABLE SeqCoord (
	id INT64 NOT NULL,
	next INT64 NOT NULL,
	) PRIMARY KEY (id);
	CREATE TABLE Seq (
	id INT64 NOT NULL,
	seq INT64 NOT NULL,
	v BYTES(MAX),
	) PRIMARY KEY (id, seq);
	CREATE TABLE IntCoord (
	id INT64 NOT NULL,
	seq INT64 NOT NULL,
	) PRIMARY KEY (id);
	*/

	// Set default values for a newly initialised schema - these rows being present are a precondition for
	// sequencing and integration to occur.
	// Note that each insert will only succeed if no row exists, so there's no danger
	// of "resetting" an existing log.
	//
	// A nil error must NOT cause an early return: on a fresh database the
	// SeqCoord insert succeeds and we still need to go on to create the
	// IntCoord row. Only errors other than AlreadyExists are fatal.
	if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("SeqCoord", []string{"id", "next"}, []interface{}{0, 0})}); err != nil && spanner.ErrCode(err) != codes.AlreadyExists {
		return err
	}
	if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("IntCoord", []string{"id", "seq"}, []interface{}{0, 0})}); err != nil && spanner.ErrCode(err) != codes.AlreadyExists {
		return err
	}
	return nil
}

// tileSuffix returns either the empty string or a tiles API suffix based on the passed-in tile size.
func tileSuffix(s uint64) string {
if p := s % 256; p != 0 {
Expand Down Expand Up @@ -244,9 +191,58 @@ func newSpannerSequencer(ctx context.Context, spannerDB string) (*spannerSequenc
if err != nil {
return nil, fmt.Errorf("failed to connect to Spanner: %v", err)
}
return &spannerSequencer{
r := &spannerSequencer{
dbPool: dbPool,
}, nil
}
return r, r.initDB(ctx)
}

// initDB ensures that the coordination DB is initialised correctly.
//
// The database schema consists of 3 tables:
//   - SeqCoord
//     This table only ever contains a single row which tracks the next available
//     sequence number.
//   - Seq
//     This table holds sequenced "batches" of entries. The batches are keyed
//     by the sequence number assigned to the first entry in the batch, and
//     each subsequent entry in the batch takes the numerically next sequence number.
//   - IntCoord
//     This table coordinates integration of the batches of entries stored in
//     Seq into the committed tree state.
//
// The database and schema should be created externally, e.g. by terraform.
//
// initDB returns nil once both the SeqCoord and IntCoord rows have either been
// inserted or were found to already exist; any other error from Spanner is
// returned unchanged.
func (s *spannerSequencer) initDB(ctx context.Context) error {
	/* Schema for reference:
	CREATE TABLE SeqCoord (
	id INT64 NOT NULL,
	next INT64 NOT NULL,
	) PRIMARY KEY (id);
	CREATE TABLE Seq (
	id INT64 NOT NULL,
	seq INT64 NOT NULL,
	v BYTES(MAX),
	) PRIMARY KEY (id, seq);
	CREATE TABLE IntCoord (
	id INT64 NOT NULL,
	seq INT64 NOT NULL,
	) PRIMARY KEY (id);
	*/

	// Set default values for a newly initialised schema - these rows being present are a precondition for
	// sequencing and integration to occur.
	// Note that each insert will only succeed if no row exists, so there's no danger
	// of "resetting" an existing log.
	//
	// A nil error must NOT cause an early return: on a fresh database the
	// SeqCoord insert succeeds and we still need to go on to create the
	// IntCoord row. Only errors other than AlreadyExists are fatal.
	if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("SeqCoord", []string{"id", "next"}, []interface{}{0, 0})}); err != nil && spanner.ErrCode(err) != codes.AlreadyExists {
		return err
	}
	if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("IntCoord", []string{"id", "seq"}, []interface{}{0, 0})}); err != nil && spanner.ErrCode(err) != codes.AlreadyExists {
		return err
	}
	return nil
}

// assignEntries durably assigns each of the passed-in entries an index in the log.
Expand Down Expand Up @@ -321,10 +317,12 @@ func newGCSStorage(ctx context.Context, c *gcs.Client, projectID string, bucket
break
}
}
return &gcsStorage{
r := &gcsStorage{
gcsClient: c,
bucket: bucket,
}, nil
}

return r, nil
}

// getObject returns the data and generation of the specified object, or an error.
Expand Down
53 changes: 53 additions & 0 deletions storage/gcp/gcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,67 @@ import (
"crypto/sha256"
"errors"
"fmt"
"os"
"sync"
"testing"

"cloud.google.com/go/spanner/spannertest"
"cloud.google.com/go/spanner/spansql"
gcs "cloud.google.com/go/storage"
"github.com/google/go-cmp/cmp"
"github.com/transparency-dev/trillian-tessera/api/layout"
)

// newSpannerDB starts an in-memory Spanner test server, points the
// SPANNER_EMULATOR_HOST environment variable at it, and creates the SeqCoord,
// Seq and IntCoord tables on it.
//
// It returns a function which closes the test server; the caller is
// responsible for invoking it. Any failure during setup aborts the test via
// t.Fatalf, closing the server first if it had already been started.
func newSpannerDB(t *testing.T) func() {
	t.Helper()
	srv, err := spannertest.NewServer("localhost:0")
	if err != nil {
		t.Fatalf("Failed to set up test spanner: %v", err)
	}
	// From here on the server is running, so every failure path must close it
	// before bailing out: the caller never receives the close func when
	// t.Fatalf fires.
	if err := os.Setenv("SPANNER_EMULATOR_HOST", srv.Addr); err != nil {
		srv.Close()
		t.Fatalf("Failed to set SPANNER_EMULATOR_HOST: %v", err)
	}
	ddl, err := spansql.ParseDDL("", `
			CREATE TABLE SeqCoord (id INT64 NOT NULL, next INT64 NOT NULL,) PRIMARY KEY (id);
			CREATE TABLE Seq (id INT64 NOT NULL, seq INT64 NOT NULL, v BYTES(MAX),) PRIMARY KEY (id, seq);
			CREATE TABLE IntCoord (id INT64 NOT NULL, seq INT64 NOT NULL,) PRIMARY KEY (id);
	`)
	if err != nil {
		srv.Close()
		t.Fatalf("Invalid DDL: %v", err)
	}
	if err := srv.UpdateDDL(ddl); err != nil {
		srv.Close()
		t.Fatalf("Failed to create schema in test spanner: %v", err)
	}

	return srv.Close
}

// TestSpannerSequencer feeds ten batches of growing size (10, 11, ... 19
// entries) to a spannerSequencer backed by the in-memory test server, and
// checks that each batch is assigned the index immediately following the
// last entry of the previous batch, starting from zero.
func TestSpannerSequencer(t *testing.T) {
	ctx := context.Background()
	shutdown := newSpannerDB(t)
	defer shutdown()

	seq, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d")
	if err != nil {
		t.Fatalf("newSpannerSequencer: %v", err)
	}

	var wantSeq uint64
	for batch := 0; batch < 10; batch++ {
		// Each batch is one entry larger than the one before it.
		size := 10 + batch
		entries := make([][]byte, 0, size)
		for i := 0; i < size; i++ {
			entries = append(entries, []byte(fmt.Sprintf("item %d/%d", batch, i)))
		}

		gotSeq, err := seq.assignEntries(ctx, entries)
		if err != nil {
			t.Fatalf("assignEntries: %v", err)
		}
		if gotSeq != wantSeq {
			t.Errorf("Chunk %d got seq %d, want %d", batch, gotSeq, wantSeq)
		}
		// The next batch must start right after this batch's final entry.
		wantSeq += uint64(len(entries))
	}
}

func TestTileSuffix(t *testing.T) {
for _, test := range []struct {
name string
Expand Down

0 comments on commit c92c1e2

Please sign in to comment.