Skip to content

Commit

Permalink
Add spanner sequencer impl
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter committed Jul 9, 2024
1 parent 6f86000 commit ed14b2e
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 75 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ require (
k8s.io/klog/v2 v2.130.1
)

require cloud.google.com/go/longrunning v0.5.7 // indirect

require (
cloud.google.com/go v0.115.0 // indirect
cloud.google.com/go/auth v0.6.0 // indirect
Expand Down
18 changes: 0 additions & 18 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ cloud.google.com/go v0.104.0/go.mod h1:OO6xxXdJyvuJPcEPBLN9BJPD+jep5G1+2U5B5gkRY
cloud.google.com/go v0.105.0/go.mod h1:PrLgOJNe5nfE9UMxKxgXj4mD3voiP+YQ6gdt6KMFOKM=
cloud.google.com/go v0.107.0/go.mod h1:wpc2eNrD7hXUTy8EKS10jkxpZBjASrORK7goS+3YX2I=
cloud.google.com/go v0.110.0/go.mod h1:SJnCLqQ0FCFGSZMUNUf84MV3Aia54kn7pi8st7tMzaY=
cloud.google.com/go v0.114.0 h1:OIPFAdfrFDFO2ve2U7r/H5SwSbBzEdrBdE7xkgwc+kY=
cloud.google.com/go v0.114.0/go.mod h1:ZV9La5YYxctro1HTPug5lXH/GefROyW8PPD4T8n9J8E=
cloud.google.com/go v0.115.0 h1:CnFSK6Xo3lDYRoBKEcAtia6VSC837/ZkJuRduSFnr14=
cloud.google.com/go v0.115.0/go.mod h1:8jIM5vVgoAEoiVxQ/O4BFTfHqulPZgs/ufEzMcFMdWU=
cloud.google.com/go/accessapproval v1.4.0/go.mod h1:zybIuC3KpDOvotz59lFe5qxRZx6C75OtwbisN56xYB4=
Expand Down Expand Up @@ -101,8 +99,6 @@ cloud.google.com/go/assuredworkloads v1.7.0/go.mod h1:z/736/oNmtGAyU47reJgGN+KVo
cloud.google.com/go/assuredworkloads v1.8.0/go.mod h1:AsX2cqyNCOvEQC8RMPnoc0yEarXQk6WEKkxYfL6kGIo=
cloud.google.com/go/assuredworkloads v1.9.0/go.mod h1:kFuI1P78bplYtT77Tb1hi0FMxM0vVpRC7VVoJC3ZoT0=
cloud.google.com/go/assuredworkloads v1.10.0/go.mod h1:kwdUQuXcedVdsIaKgKTp9t0UJkE5+PAVNhdQm4ZVq2E=
cloud.google.com/go/auth v0.5.1 h1:0QNO7VThG54LUzKiQxv8C6x1YX7lUrzlAa1nVLF8CIw=
cloud.google.com/go/auth v0.5.1/go.mod h1:vbZT8GjzDf3AVqCcQmqeeM32U9HBFc32vVVAbwDsa6s=
cloud.google.com/go/auth v0.6.0 h1:5x+d6b5zdezZ7gmLWD1m/xNjnaQ2YDhmIz/HH3doy1g=
cloud.google.com/go/auth v0.6.0/go.mod h1:b4acV+jLQDyjwm4OXHYjNvRi4jvGBzHWJRtJcy+2P4g=
cloud.google.com/go/auth/oauth2adapt v0.2.2 h1:+TTV8aXpjeChS9M+aTtN/TjdQnzJvmzKFt//oWu7HX4=
Expand Down Expand Up @@ -526,8 +522,6 @@ cloud.google.com/go/shell v1.6.0/go.mod h1:oHO8QACS90luWgxP3N9iZVuEiSF84zNyLytb+
cloud.google.com/go/spanner v1.41.0/go.mod h1:MLYDBJR/dY4Wt7ZaMIQ7rXOTLjYrmxLE/5ve9vFfWos=
cloud.google.com/go/spanner v1.44.0/go.mod h1:G8XIgYdOK+Fbcpbs7p2fiprDw4CaZX63whnSMLVBxjk=
cloud.google.com/go/spanner v1.45.0/go.mod h1:FIws5LowYz8YAE1J8fOS7DJup8ff7xJeetWEo5REA2M=
cloud.google.com/go/spanner v1.63.0 h1:P6+BY70Wtol4MtryBgnXZVTZfsdySEvWfz0EpyLwHi4=
cloud.google.com/go/spanner v1.63.0/go.mod h1:iqDx7urZpgD7RekZ+CFvBRH6kVTW1ZSEb2HMDKOp5Cc=
cloud.google.com/go/spanner v1.64.0 h1:ltyPbHA/nRAtAhU/o742dXBCI1eNHPeaRY09Ja8B+hM=
cloud.google.com/go/spanner v1.64.0/go.mod h1:TOFx3pb2UwPsDGlE1gTehW+y6YlU4IFk+VdDHSGQS/M=
cloud.google.com/go/speech v1.6.0/go.mod h1:79tcr4FHCimOp56lwC01xnt/WPJZc4v3gzyT7FoBkCM=
Expand Down Expand Up @@ -827,8 +821,6 @@ github.com/googleapis/gax-go/v2 v2.5.1/go.mod h1:h6B0KMMFNtI2ddbGJn3T3ZbwkeT6yqE
github.com/googleapis/gax-go/v2 v2.6.0/go.mod h1:1mjbznJAPHFpesgE5ucqfYEscaz5kMdcIDwU/6+DDoY=
github.com/googleapis/gax-go/v2 v2.7.0/go.mod h1:TEop28CZZQ2y+c0VxMUmu1lV+fQx57QpBWsYpwqHJx8=
github.com/googleapis/gax-go/v2 v2.7.1/go.mod h1:4orTrqY6hXxxaUL4LHIPl6lGo8vAE38/qKbhSAKP6QI=
github.com/googleapis/gax-go/v2 v2.12.4 h1:9gWcmF85Wvq4ryPFvGFaOgPIs1AQX0d0bcbGw4Z96qg=
github.com/googleapis/gax-go/v2 v2.12.4/go.mod h1:KYEYLorsnIGDi/rPC8b5TdlB9kbKoFubselGIoBMCwI=
github.com/googleapis/gax-go/v2 v2.12.5 h1:8gw9KZK8TiVKB6q3zHY3SBzLnrGp6HQjyfYBYGmXdxA=
github.com/googleapis/gax-go/v2 v2.12.5/go.mod h1:BUDKcWo+RaKq5SC9vVYL0wLADa3VcfswbOMMRmB9H3E=
github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4=
Expand Down Expand Up @@ -1382,8 +1374,6 @@ google.golang.org/api v0.108.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/
google.golang.org/api v0.110.0/go.mod h1:7FC4Vvx1Mooxh8C5HWjzZHcavuS2f6pmJpZx60ca7iI=
google.golang.org/api v0.111.0/go.mod h1:qtFHvU9mhgTJegR31csQ+rwxyUTHOKFqCKWp1J0fdw0=
google.golang.org/api v0.114.0/go.mod h1:ifYI2ZsFK6/uGddGfAD5BMxlnkBqCmqHSDUVi45N5Yg=
google.golang.org/api v0.183.0 h1:PNMeRDwo1pJdgNcFQ9GstuLe/noWKIc89pRWRLMvLwE=
google.golang.org/api v0.183.0/go.mod h1:q43adC5/pHoSZTx5h2mSmdF7NcyfW9JuDyIOJAgS9ZQ=
google.golang.org/api v0.186.0 h1:n2OPp+PPXX0Axh4GuSsL5QL8xQCTb2oDwyzPnQvqUug=
google.golang.org/api v0.186.0/go.mod h1:hvRbBmgoje49RV3xqVXrmP6w93n6ehGgIVPYrGtBFFc=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
Expand Down Expand Up @@ -1525,16 +1515,10 @@ google.golang.org/genproto v0.0.0-20230323212658-478b75c54725/go.mod h1:UUQDJDOl
google.golang.org/genproto v0.0.0-20230330154414-c0448cd141ea/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak=
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
google.golang.org/genproto v0.0.0-20240528184218-531527333157 h1:u7WMYrIrVvs0TF5yaKwKNbcJyySYf+HAIFXxWltJOXE=
google.golang.org/genproto v0.0.0-20240528184218-531527333157/go.mod h1:ubQlAQnzejB8uZzszhrTCU2Fyp6Vi7ZE5nn0c3W8+qQ=
google.golang.org/genproto v0.0.0-20240617180043-68d350f18fd4 h1:CUiCqkPw1nNrNQzCCG4WA65m0nAmQiwXHpub3dNyruU=
google.golang.org/genproto v0.0.0-20240617180043-68d350f18fd4/go.mod h1:EvuUDCulqGgV80RvP1BHuom+smhX4qtlhnNatHuroGQ=
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 h1:+rdxYoE3E5htTEWIe15GlN6IfvbURM//Jt0mmkmm6ZU=
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117/go.mod h1:OimBR/bc1wPO9iV4NC2bpyjy3VnAwZh5EBPQdtaE5oo=
google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 h1:MuYw1wJzT+ZkybKfaOXKp5hJiZDn2iHaXRw0mRYdHSc=
google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4/go.mod h1:px9SlOOZBg1wM1zdnr8jEL4CNGUBZ+ZKYtNPApNQc4c=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240610135401-a8a62080eff3 h1:9Xyg6I9IWQZhRVfCWjKK+l6kI0jHcPesVlMnT//aHNo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240610135401-a8a62080eff3/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 h1:Di6ANFilr+S60a4S61ZM00vLdw0IrQOSMS2/6mrnOU0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
Expand Down Expand Up @@ -1598,8 +1582,6 @@ google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
112 changes: 55 additions & 57 deletions storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ type Storage struct {
projectID string
bucket string

dbPool *spanner.Client
sequencer sequencer
objStore objStore

Expand Down Expand Up @@ -112,61 +111,9 @@ func New(ctx context.Context, cfg Config) (*Storage, error) {
// TODO(al): make queue options configurable:
r.queue = storage.NewQueue(time.Second, 256, r.sequenceEntries)

if err := r.initDB(ctx); err != nil {
return nil, fmt.Errorf("failed to init DB: %v", err)
}

return r, nil
}

// initDB ensures that the coordination DB is initialised correctly.
//
// The database schema consists of 3 tables:
// - SeqCoord
// This table only ever contains a single row which tracks the next available
// sequence number.
// - Seq
// This table holds sequenced "batches" of entries. The batches are keyed
// by the sequence number assigned to the first entry in the batch, and
// each subsequent entry in the batch takes the numerically next sequence number.
// - IntCoord
// This table coordinates integration of the batches of entries stored in
// Seq into the committed tree state.
//
// The database and schema should be created externally, e.g. by terraform.
func (s *Storage) initDB(ctx context.Context) error {

/* Schema for reference:
CREATE TABLE SeqCoord (
id INT64 NOT NULL,
next INT64 NOT NULL,
) PRIMARY KEY (id);
CREATE TABLE Seq (
id INT64 NOT NULL,
seq INT64 NOT NULL,
v BYTES(MAX),
) PRIMARY KEY (id, seq);
CREATE TABLE IntCoord (
id INT64 NOT NULL,
seq INT64 NOT NULL,
) PRIMARY KEY (id);
*/

// Set default values for a newly inisialised schema - these rows being present are a precondition for
// sequencing and integration to occur.
// Note that this will only succeed if no row exists, so there's no danger
// of "resetting" an existing log.
if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("SeqCoord", []string{"id", "next"}, []interface{}{0, 0})}); spanner.ErrCode(err) != codes.AlreadyExists {
return err
}
if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("IntCoord", []string{"id", "seq"}, []interface{}{0, 0})}); spanner.ErrCode(err) != codes.AlreadyExists {
return err
}
return nil
}

// tileSuffix returns either the empty string or a tiles API suffix based on the passed-in tile size.
func tileSuffix(s uint64) string {
if p := s % 256; p != 0 {
Expand Down Expand Up @@ -244,9 +191,58 @@ func newSpannerSequencer(ctx context.Context, spannerDB string) (*spannerSequenc
if err != nil {
return nil, fmt.Errorf("failed to connect to Spanner: %v", err)
}
return &spannerSequencer{
r := &spannerSequencer{
dbPool: dbPool,
}, nil
}
return r, r.initDB(ctx)
}

// initDB ensures that the coordination DB is initialised correctly.
//
// The database schema consists of 3 tables:
// - SeqCoord
// This table only ever contains a single row which tracks the next available
// sequence number.
// - Seq
// This table holds sequenced "batches" of entries. The batches are keyed
// by the sequence number assigned to the first entry in the batch, and
// each subsequent entry in the batch takes the numerically next sequence number.
// - IntCoord
// This table coordinates integration of the batches of entries stored in
// Seq into the committed tree state.
//
// The database and schema should be created externally, e.g. by terraform.
func (s *spannerSequencer) initDB(ctx context.Context) error {

/* Schema for reference:
CREATE TABLE SeqCoord (
id INT64 NOT NULL,
next INT64 NOT NULL,
) PRIMARY KEY (id);
CREATE TABLE Seq (
id INT64 NOT NULL,
seq INT64 NOT NULL,
v BYTES(MAX),
) PRIMARY KEY (id, seq);
CREATE TABLE IntCoord (
id INT64 NOT NULL,
seq INT64 NOT NULL,
) PRIMARY KEY (id);
*/

// Set default values for a newly inisialised schema - these rows being present are a precondition for
// sequencing and integration to occur.
// Note that this will only succeed if no row exists, so there's no danger
// of "resetting" an existing log.
if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("SeqCoord", []string{"id", "next"}, []interface{}{0, 0})}); spanner.ErrCode(err) != codes.AlreadyExists {
return err
}
if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("IntCoord", []string{"id", "seq"}, []interface{}{0, 0})}); spanner.ErrCode(err) != codes.AlreadyExists {
return err
}
return nil
}

// assignEntries durably assigns each of the passed-in entries an index in the log.
Expand Down Expand Up @@ -321,10 +317,12 @@ func newGCSStorage(ctx context.Context, c *gcs.Client, projectID string, bucket
break
}
}
return &gcsStorage{
r := &gcsStorage{
gcsClient: c,
bucket: bucket,
}, nil
}

return r, nil
}

// getObject returns the data and generation of the specified object, or an error.
Expand Down
53 changes: 53 additions & 0 deletions storage/gcp/gcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,67 @@ import (
"crypto/sha256"
"errors"
"fmt"
"os"
"sync"
"testing"

"cloud.google.com/go/spanner/spannertest"
"cloud.google.com/go/spanner/spansql"
gcs "cloud.google.com/go/storage"
"github.com/google/go-cmp/cmp"
"github.com/transparency-dev/trillian-tessera/api/layout"
)

func newSpannerDB(t *testing.T) func() {
t.Helper()
srv, err := spannertest.NewServer("localhost:0")
if err != nil {
t.Fatalf("Failed to set up test spanner: %v", err)
}
os.Setenv("SPANNER_EMULATOR_HOST", srv.Addr)
dml, err := spansql.ParseDDL("", `
CREATE TABLE SeqCoord (id INT64 NOT NULL, next INT64 NOT NULL,) PRIMARY KEY (id);
CREATE TABLE Seq (id INT64 NOT NULL, seq INT64 NOT NULL, v BYTES(MAX),) PRIMARY KEY (id, seq);
CREATE TABLE IntCoord (id INT64 NOT NULL, seq INT64 NOT NULL,) PRIMARY KEY (id);
`)
if err != nil {
t.Fatalf("Invalid DDL: %v", err)
}
if err := srv.UpdateDDL(dml); err != nil {
t.Fatalf("Failed to create schema in test spanner: %v", err)
}

return srv.Close

}

func TestSpannerSequencer(t *testing.T) {
ctx := context.Background()
close := newSpannerDB(t)
defer close()

seq, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d")
if err != nil {
t.Fatalf("newSpannerSequencer: %v", err)
}

want := uint64(0)
for chunks := 0; chunks < 10; chunks++ {
entries := [][]byte{}
for i := 0; i < 10+chunks; i++ {
entries = append(entries, []byte(fmt.Sprintf("item %d/%d", chunks, i)))
}
got, err := seq.assignEntries(ctx, entries)
if err != nil {
t.Fatalf("assignEntries: %v", err)
}
if got != want {
t.Errorf("Chunk %d got seq %d, want %d", chunks, got, want)
}
want += uint64(len(entries))
}
}

func TestTileSuffix(t *testing.T) {
for _, test := range []struct {
name string
Expand Down

0 comments on commit ed14b2e

Please sign in to comment.