Skip to content

Commit

Permalink
Use Tessera formats (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter authored Jul 12, 2024
1 parent b573ace commit c644ced
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 67 deletions.
103 changes: 67 additions & 36 deletions storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,18 @@ package gcp
import (
"bytes"
"context"
"crypto/sha256"
"encoding/gob"
"errors"
"fmt"
"io"
"net/http"
"os"
"slices"
"time"

"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/apiv1/spannerpb"
gcs "cloud.google.com/go/storage"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
"github.com/transparency-dev/trillian-tessera/storage"
"google.golang.org/api/googleapi"
Expand Down Expand Up @@ -114,47 +113,73 @@ func New(ctx context.Context, cfg Config) (*Storage, error) {
return r, nil
}

// tileSuffix returns either the empty string or a tiles API suffix based on the passed-in tile size.
func tileSuffix(s uint64) string {
if p := s % 256; p != 0 {
return fmt.Sprintf(".p/%d", p)
// setTile idempotently stores the provided tile at the location implied by the given level, index, and logSize.
func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, tile *api.HashTile) error {
data, err := tile.MarshalText()
if err != nil {
return err
}
return ""
}

// The location to which the tile is written is defined by the tile layout spec.
func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, tile [][]byte) error {
t := slices.Concat(tile...)
tPath := layout.TilePath(level, index, logSize)
klog.V(2).Infof("StoreTile: %s", tPath)
klog.V(2).Infof("StoreTile: %s (%d entries)", tPath, len(tile.Nodes))

return s.objStore.setObject(ctx, tPath, t, &gcs.Conditions{DoesNotExist: true})
return s.objStore.setObject(ctx, tPath, data, &gcs.Conditions{DoesNotExist: true})
}

// getTile returns the tile at the given tile-level and tile-index for the specified log size, or
// an error if it doesn't exist.
func (s *Storage) getTile(ctx context.Context, level, index, logSize uint64) ([][]byte, error) {
// getTile returns the tile at the given tile-level and tile-index for the specified log size.
//
// Returns a wrapped os.ErrNotExist if the tile does not exist.
func (s *Storage) getTile(ctx context.Context, level, index, logSize uint64) (*api.HashTile, error) {
objName := layout.TilePath(level, index, logSize)
data, _, err := s.objStore.getObject(ctx, objName)
if err != nil {
if errors.Is(err, gcs.ErrObjectNotExist) {
// Return the generic NotExist error so that higher levels can differentiate
// between this and other errors.
return nil, os.ErrNotExist
return nil, fmt.Errorf("%v: %w", objName, os.ErrNotExist)
}
return nil, err
}
t := &api.HashTile{}
return t, t.UnmarshalText(data)
}

// Recreate the tile slices
l := len(data)
if m := l % sha256.Size; m != 0 {
return nil, fmt.Errorf("invalid tile data size %d", l)
// getEntryBundle returns the entry bundle at the location implied by the given bundleIndex and logSize.
//
// Returns a wrapped os.ErrNotExist if the bundle does not exist.
func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64) (*api.EntryBundle, error) {
objName := layout.EntriesPath(bundleIndex, logSize)
data, _, err := s.objStore.getObject(ctx, objName)
if err != nil {
if errors.Is(err, gcs.ErrObjectNotExist) {
// Return the generic NotExist error so that higher levels can differentiate
// between this and other errors.
return nil, fmt.Errorf("%v: %w", objName, os.ErrNotExist)
}
return nil, err
}
t := make([][]byte, l/sha256.Size)
for i := range t {
t[i] = data[i*sha256.Size : (i+1)*sha256.Size]

r := &api.EntryBundle{}
if err := r.UnmarshalText(data); err != nil {
return nil, fmt.Errorf("failed to unmarshal %q: %v", objName, err)
}
return t, nil
return r, nil
}

// setEntryBundle idempotently stores the entry bundle at the location implied by bundleIndex and logSize.
//
// The bundle is serialised with MarshalText and written to the object named by
// layout.EntriesPath(bundleIndex, logSize) under a DoesNotExist precondition.
//
// Any marshalling or storage error is returned wrapped with %w so that callers
// can still inspect the underlying cause via errors.Is / errors.As.
func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64, bundle *api.EntryBundle) error {
	objName := layout.EntriesPath(bundleIndex, logSize)
	data, err := bundle.MarshalText()
	if err != nil {
		return fmt.Errorf("failed to marshal entry bundle %q: %w", objName, err)
	}
	// Note that setObject does an idempotent interpretation of DoesNotExist - it only
	// returns an error if the named object exists _and_ contains different data to what's
	// passed in here.
	if err := s.objStore.setObject(ctx, objName, data, &gcs.Conditions{DoesNotExist: true}); err != nil {
		return fmt.Errorf("setObject(%q): %w", objName, err)
	}
	return nil
}

// spannerSequencer uses Cloud Spanner to provide
Expand All @@ -173,7 +198,10 @@ func newSpannerSequencer(ctx context.Context, spannerDB string) (*spannerSequenc
r := &spannerSequencer{
dbPool: dbPool,
}
return r, r.initDB(ctx)
if err := r.initDB(ctx); err != nil {
return nil, fmt.Errorf("failed to initDB: %v", err)
}
return r, nil
}

// initDB ensures that the coordination DB is initialised correctly.
Expand Down Expand Up @@ -245,11 +273,11 @@ func (s *spannerSequencer) assignEntries(ctx context.Context, entries [][]byte)
// First we need to grab the next available sequence number from the SeqCoord table.
row, err := txn.ReadRowWithOptions(ctx, "SeqCoord", spanner.Key{0}, []string{"id", "next"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE})
if err != nil {
return err
return fmt.Errorf("failed to read SeqCoord: %v", err)
}
var id int64
if err := row.Columns(&id, &next); err != nil {
return err
return fmt.Errorf("failed to parse id column: %v", err)
}

next := uint64(next) // Shadow next with a uint64 version of the same value to save on casts.
Expand All @@ -261,7 +289,7 @@ func (s *spannerSequencer) assignEntries(ctx context.Context, entries [][]byte)
spanner.Update("SeqCoord", []string{"id", "next"}, []interface{}{0, int64(next) + int64(num)}),
}
if err := txn.BufferWrite(m); err != nil {
return err
return fmt.Errorf("failed to apply TX: %v", err)
}

return nil
Expand Down Expand Up @@ -291,7 +319,7 @@ func newGCSStorage(ctx context.Context, c *gcs.Client, projectID string, bucket
return nil, fmt.Errorf("bucket %q does not exist, please create it", bucket)
}
if err != nil {
return nil, err
return nil, fmt.Errorf("error scanning buckets: %v", err)
}
if bAttrs.Name == bucket {
break
Expand All @@ -314,17 +342,20 @@ func (s *gcsStorage) getObject(ctx context.Context, obj string) ([]byte, int64,
defer r.Close()

d, err := io.ReadAll(r)
return d, r.Attrs.Generation, err
if err != nil {
return nil, -1, fmt.Errorf("failed to read %q: %v", obj, err)
}
return d, r.Attrs.Generation, nil
}

// setObject stores the provided data in the specified object, optionally gated by a condition.
//
// cond can be used to specify preconditions for the write (e.g. write iff not exists, write iff
// current generation is X, etc.), or nil can be passed if no preconditions are desired.
//
// When preconditions are specified and are not met, an error will be returned unless the currently
// stored data is bit-for-bit identical to the data to-be-written. This is intended to provide
// idempotency for writes.
// Note that when preconditions are specified and are not met, an error will be returned *unless*
// the currently stored data is bit-for-bit identical to the data to-be-written.
// This is intended to provide idempotency for writes.
func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte, cond *gcs.Conditions) error {
bkt := s.gcsClient.Bucket(s.bucket)
obj := bkt.Object(objName)
Expand Down Expand Up @@ -357,7 +388,7 @@ func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte,
return nil
}

return err
return fmt.Errorf("failed to close write on %q: %v", objName, err)
}
return nil
}
87 changes: 56 additions & 31 deletions storage/gcp/gcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"cloud.google.com/go/spanner/spansql"
gcs "cloud.google.com/go/storage"
"github.com/google/go-cmp/cmp"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
)

Expand Down Expand Up @@ -81,40 +82,12 @@ func TestSpannerSequencer(t *testing.T) {
}
}

// TestTileSuffix exercises tileSuffix with sizes that are exact multiples of
// 256 (including zero), for which no suffix is expected, and with a size that
// leaves a remainder, for which a ".p/<remainder>" suffix is expected.
func TestTileSuffix(t *testing.T) {
	type testCase struct {
		name string
		size uint64
		want string
	}
	cases := []testCase{
		{name: "no suffix", size: 256 * 23, want: ""},
		{name: "no suffix on zero", size: 0, want: ""},
		{name: "has suffix", size: 256*23 + 3, want: ".p/3"},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := tileSuffix(tc.size)
			if got != tc.want {
				t.Fatalf("got %s want %s", got, tc.want)
			}
		})
	}
}

func makeTile(t *testing.T, size uint64) [][]byte {
func makeTile(t *testing.T, size uint64) *api.HashTile {
t.Helper()
r := make([][]byte, size)
r := &api.HashTile{Nodes: make([][]byte, size)}
for i := uint64(0); i < size; i++ {
h := sha256.Sum256([]byte(fmt.Sprintf("%d", i)))
r[i] = h[:]
r.Nodes[i] = h[:]
}
return r
}
Expand Down Expand Up @@ -164,6 +137,58 @@ func TestTileRoundtrip(t *testing.T) {
}
}

// makeBundle is a test helper which builds an EntryBundle containing size
// entries, where entry i holds the decimal string representation of i.
func makeBundle(t *testing.T, size uint64) *api.EntryBundle {
	t.Helper()
	entries := make([][]byte, size)
	for i := range entries {
		entries[i] = []byte(fmt.Sprintf("%d", i))
	}
	return &api.EntryBundle{Entries: entries}
}

// TestBundleRoundtrip verifies that an entry bundle written via setEntryBundle
// is stored under the path given by layout.EntriesPath, and that getEntryBundle
// reads back a bundle equal to the one which was written.
func TestBundleRoundtrip(t *testing.T) {
	ctx := context.Background()
	store := newMemObjStore()
	s := &Storage{objStore: store}

	type testCase struct {
		name       string
		index      uint64
		logSize    uint64
		bundleSize uint64
	}
	cases := []testCase{
		{name: "ok", index: 3 * 256, logSize: 3*256 + 20, bundleSize: 20},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			want := makeBundle(t, tc.bundleSize)
			if err := s.setEntryBundle(ctx, tc.index, tc.logSize, want); err != nil {
				t.Fatalf("setEntryBundle: %v", err)
			}

			// The bundle must have been stored under the layout-defined path.
			expPath := layout.EntriesPath(tc.index, tc.logSize)
			if _, found := store.mem[expPath]; !found {
				t.Fatalf("want bundle at %v but found none", expPath)
			}

			got, err := s.getEntryBundle(ctx, tc.index, tc.logSize)
			if err != nil {
				t.Fatalf("getEntryBundle: %v", err)
			}
			if !cmp.Equal(got, want) {
				t.Fatal("roundtrip returned different data")
			}
		})
	}
}

type memObjStore struct {
sync.RWMutex
mem map[string][]byte
Expand Down

0 comments on commit c644ced

Please sign in to comment.