From f8b354b830c18d2e69f182f38250c663ee2f0912 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Tue, 16 Jul 2024 17:50:48 +0100 Subject: [PATCH] Thread Entry through --- cmd/example-gcp/main.go | 13 +++++++++---- entry.go | 9 +++++++++ storage/gcp/gcp.go | 31 +++++++++++++++++++++---------- storage/gcp/gcp_test.go | 14 ++++++++------ storage/integrate.go | 6 +++--- storage/integrate_test.go | 5 +++-- storage/queue.go | 17 +++++++++-------- storage/queue_test.go | 19 +++++++++---------- 8 files changed, 71 insertions(+), 43 deletions(-) diff --git a/cmd/example-gcp/main.go b/cmd/example-gcp/main.go index fb458178d..d673192e7 100644 --- a/cmd/example-gcp/main.go +++ b/cmd/example-gcp/main.go @@ -20,6 +20,7 @@ import ( "context" "crypto/sha256" "flag" + "fmt" "io" "net/http" "os" @@ -46,7 +47,7 @@ func main() { Bucket: *bucket, Spanner: *spanner, } - _, err := gcp.New(ctx, gcpCfg) + seqStorage, err := gcp.NewSequencingStorage(ctx, gcpCfg) if err != nil { klog.Exitf("Failed to create new GCP storage: %v", err) } @@ -60,9 +61,13 @@ func main() { defer r.Body.Close() id := sha256.Sum256(b) - _ = tessera.NewEntry(b, tessera.WithIdentity(id[:])) - - // TODO: Add entry to log and return assigned index. + idx, err := seqStorage(r.Context(), tessera.NewEntry(b, tessera.WithIdentity(id[:]))) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) + return + } + _, _ = w.Write([]byte(fmt.Sprintf("%d", idx))) }) if err := http.ListenAndServe(*listen, http.DefaultServeMux); err != nil { diff --git a/entry.go b/entry.go index 0b5ece742..d61b54019 100644 --- a/entry.go +++ b/entry.go @@ -16,7 +16,9 @@ package tessera import ( + "crypto/sha256" "encoding/binary" + "fmt" "github.com/transparency-dev/merkle/rfc6962" ) @@ -40,6 +42,10 @@ func NewEntry(data []byte, opts ...EntryOpt) Entry { for _, opt := range opts { opt(&e) } + if e.identity == nil { + h := sha256.Sum256(e.data) + e.identity = h[:] + } if e.leafHash == nil { e.leafHash = rfc6962.DefaultHasher.HashLeaf(e.data) @@ -68,6 +74,9 @@ func (e *Entry) UnmarshalBinary(buf []byte) error { l, n = binary.Varint(buf) buf = buf[n:] e.leafHash, buf = buf[:l], buf[l:] + if l := len(buf); l != 0 { + return fmt.Errorf("%d trailing bytes", l) + } return nil } diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index e6a49866b..1b0758f6f 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -42,6 +42,7 @@ import ( "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/apiv1/spannerpb" gcs "cloud.google.com/go/storage" + tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/api" "github.com/transparency-dev/trillian-tessera/api/layout" "github.com/transparency-dev/trillian-tessera/storage" @@ -77,7 +78,7 @@ type objStore interface { type sequencer interface { // assignEntries should durably allocate contiguous index numbers to the provided entries, // and return the lowest assigned index. - assignEntries(ctx context.Context, entries [][]byte) (uint64, error) + assignEntries(ctx context.Context, entries []tessera.Entry) (uint64, error) // consumeEntries should call the provided function with up to limit previously sequenced entries. // If the call to consumeFunc returns no error, the entries should be considered to have been consumed. // If any entries were successfully consumed, the implementation should also return true; this @@ -86,7 +87,7 @@ type sequencer interface { } // consumeFunc is the signature of a function which can consume entries from the sequencer. -type consumeFunc func(ctx context.Context, from uint64, entries [][]byte) error +type consumeFunc func(ctx context.Context, from uint64, entries []tessera.Entry) error // Config holds GCP project and resource configuration for a storage instance. type Config struct { @@ -98,8 +99,13 @@ type Config struct { Spanner string } -// New creates a new instance of the GCP based Storage. -func New(ctx context.Context, cfg Config) (*Storage, error) { +func NewSequencingStorage(ctx context.Context, cfg Config) (func(context.Context, tessera.Entry) (uint64, error), error) { + s, err := newStorage(ctx, cfg) + return s.sequenceEntry, err +} + +// newStorage creates a new instance of the GCP based Storage. +func newStorage(ctx context.Context, cfg Config) (*Storage, error) { c, err := gcs.NewClient(ctx) if err != nil { return nil, fmt.Errorf("failed to create GCS client: %v", err) @@ -149,6 +155,11 @@ func New(ctx context.Context, cfg Config) (*Storage, error) { return r, nil } +// sequenceEntry is the entrypoint for adding entries to a sequencing log. +func (s *Storage) sequenceEntry(ctx context.Context, e tessera.Entry) (uint64, error) { + return s.queue.Add(ctx, e)() +} + // setTile idempotently stores the provided tile at the location implied by the given level, index, and treeSize. func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, tile *api.HashTile) error { data, err := tile.MarshalText() @@ -236,7 +247,7 @@ func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSiz } // integrate incorporates the provided entries into the log starting at fromSeq. -func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries [][]byte) error { +func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []tessera.Entry) error { tb := storage.NewTreeBuilder(func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) { n, err := s.getTiles(ctx, tileIDs, treeSize) if err != nil { @@ -281,7 +292,7 @@ func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries [][]byt // updateEntryBundles adds the entries being integrated into the entry bundles. // // The right-most bundle will be grown, if it's partial, and/or new bundles will be created as required. -func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entries [][]byte) error { +func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entries []tessera.Entry) error { numAdded := uint64(0) bundleIndex, entriesInBundle := fromSeq/entryBundleSize, fromSeq%entryBundleSize bundle := &api.EntryBundle{} @@ -309,7 +320,7 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie // Add new entries to the bundle for _, e := range entries { - bundle.Entries = append(bundle.Entries, e) + bundle.Entries = append(bundle.Entries, e.Data()) entriesInBundle++ fromSeq++ numAdded++ @@ -408,7 +419,7 @@ func (s *spannerSequencer) initDB(ctx context.Context) error { // Entries are allocated contiguous indices, in the order in which they appear in the entries parameter. // This is achieved by storing the passed-in entries in the Seq table in Spanner, keyed by the // index assigned to the first entry in the batch. -func (s *spannerSequencer) assignEntries(ctx context.Context, entries [][]byte) (uint64, error) { +func (s *spannerSequencer) assignEntries(ctx context.Context, entries []tessera.Entry) (uint64, error) { // Flatted the entries into a single slice of bytes which we can store in the Seq.v column. b := &bytes.Buffer{} e := gob.NewEncoder(b) @@ -481,7 +492,7 @@ func (s *spannerSequencer) consumeEntries(ctx context.Context, limit uint64, f c defer rows.Stop() seqsConsumed := []int64{} - entries := make([][]byte, 0, limit) + entries := make([]tessera.Entry, 0, limit) orderCheck := fromSeq for { row, err := rows.Next() @@ -500,7 +511,7 @@ func (s *spannerSequencer) consumeEntries(ctx context.Context, limit uint64, f c } g := gob.NewDecoder(bytes.NewReader(vGob)) - b := [][]byte{} + b := []tessera.Entry{} if err := g.Decode(&b); err != nil { return fmt.Errorf("failed to deserialise v: %v", err) } diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index 069db97ae..81082c7ce 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "os" + "reflect" "sync" "testing" @@ -28,6 +29,7 @@ import ( "cloud.google.com/go/spanner/spansql" gcs "cloud.google.com/go/storage" "github.com/google/go-cmp/cmp" + tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/api" "github.com/transparency-dev/trillian-tessera/api/layout" "github.com/transparency-dev/trillian-tessera/storage" @@ -68,9 +70,9 @@ func TestSpannerSequencerAssignEntries(t *testing.T) { want := uint64(0) for chunks := 0; chunks < 10; chunks++ { - entries := [][]byte{} + entries := []tessera.Entry{} for i := 0; i < 10+chunks; i++ { - entries = append(entries, []byte(fmt.Sprintf("item %d/%d", chunks, i))) + entries = append(entries, tessera.NewEntry([]byte(fmt.Sprintf("item %d/%d", chunks, i)))) } got, err := seq.assignEntries(ctx, entries) if err != nil { @@ -95,9 +97,9 @@ func TestSpannerSequencerRoundTrip(t *testing.T) { seq := 0 for chunks := 0; chunks < 10; chunks++ { - entries := [][]byte{} + entries := []tessera.Entry{} for i := 0; i < 10+chunks; i++ { - entries = append(entries, []byte(fmt.Sprintf("item %d", seq))) + entries = append(entries, tessera.NewEntry([]byte(fmt.Sprintf("item %d", seq)))) seq++ } if _, err := s.assignEntries(ctx, entries); err != nil { @@ -106,12 +108,12 @@ func TestSpannerSequencerRoundTrip(t *testing.T) { } seenIdx := uint64(0) - f := func(_ context.Context, fromSeq uint64, entries [][]byte) error { + f := func(_ context.Context, fromSeq uint64, entries []tessera.Entry) error { if fromSeq != seenIdx { return fmt.Errorf("f called with fromSeq %d, want %d", fromSeq, seenIdx) } for i, e := range entries { - if !bytes.Equal(e, []byte(fmt.Sprintf("item %d", i))) { + if !reflect.DeepEqual(e, tessera.NewEntry([]byte(fmt.Sprintf("item %d", i)))) { return fmt.Errorf("entry %d+%d != %d", fromSeq, i, seenIdx) } seenIdx++ diff --git a/storage/integrate.go b/storage/integrate.go index a10a4aaa7..e53d1e1f4 100644 --- a/storage/integrate.go +++ b/storage/integrate.go @@ -23,6 +23,7 @@ import ( "github.com/transparency-dev/merkle/compact" "github.com/transparency-dev/merkle/rfc6962" + tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/api" "github.com/transparency-dev/trillian-tessera/api/layout" "golang.org/x/exp/maps" @@ -86,7 +87,7 @@ func (t *TreeBuilder) newRange(ctx context.Context, treeSize uint64) (*compact.R return t.rf.NewRange(0, treeSize, hashes) } -func (t *TreeBuilder) Integrate(ctx context.Context, fromSize uint64, entries [][]byte) (newSize uint64, rootHash []byte, tiles map[TileID]*api.HashTile, err error) { +func (t *TreeBuilder) Integrate(ctx context.Context, fromSize uint64, entries []tessera.Entry) (newSize uint64, rootHash []byte, tiles map[TileID]*api.HashTile, err error) { baseRange, err := t.newRange(ctx, fromSize) if err != nil { return 0, nil, nil, fmt.Errorf("failed to create range covering existing log: %w", err) @@ -109,9 +110,8 @@ func (t *TreeBuilder) Integrate(ctx context.Context, fromSize uint64, entries [] tc := newTileWriteCache(fromSize, t.readCache.Get) visitor := tc.Visitor(ctx) for _, e := range entries { - lh := rfc6962.DefaultHasher.HashLeaf(e) // Update range and set nodes - if err := newRange.Append(lh, visitor); err != nil { + if err := newRange.Append(e.LeafHash(), visitor); err != nil { return 0, nil, nil, fmt.Errorf("newRange.Append(): %v", err) } diff --git a/storage/integrate_test.go b/storage/integrate_test.go index 8b98e09d1..8c6285914 100644 --- a/storage/integrate_test.go +++ b/storage/integrate_test.go @@ -24,6 +24,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/transparency-dev/merkle/compact" "github.com/transparency-dev/merkle/rfc6962" + tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/api" "github.com/transparency-dev/trillian-tessera/api/layout" "k8s.io/klog/v2" @@ -130,10 +131,10 @@ func TestIntegrate(t *testing.T) { seq := uint64(0) for chunk := 0; chunk < numChunks; chunk++ { oldSeq := seq - c := make([][]byte, chunkSize) + c := make([]tessera.Entry, chunkSize) for i := range c { leaf := []byte{byte(seq)} - c[i] = leaf + c[i] = tessera.NewEntry(leaf) if err := cr.Append(rfc6962.DefaultHasher.HashLeaf(leaf), nil); err != nil { t.Fatalf("compact Append: %v", err) } diff --git a/storage/queue.go b/storage/queue.go index 1c9b730ec..22285033c 100644 --- a/storage/queue.go +++ b/storage/queue.go @@ -21,6 +21,7 @@ import ( "time" "github.com/globocom/go-buffer" + tessera "github.com/transparency-dev/trillian-tessera" ) // Queue knows how to queue up a number of entries in order, taking care of deduplication as they're added. @@ -47,7 +48,7 @@ type IndexFunc func() (idx uint64, err error) // FlushFunc is the signature of a function which will receive the slice of queued entries. // It should return the index assigned to the first entry in the provided slice. -type FlushFunc func(ctx context.Context, entries [][]byte) (index uint64, err error) +type FlushFunc func(ctx context.Context, entries []tessera.Entry) (index uint64, err error) // NewQueue creates a new queue with the specified maximum age and size. // @@ -69,11 +70,11 @@ func NewQueue(maxAge time.Duration, maxSize uint, f FlushFunc) *Queue { // squashDupes keeps track of all in-flight requests, enabling dupe squashing for entries currently in the queue. // Returns an entry struct, and a bool which is true if the provided entry is a dupe and should NOT be added to the queue. -func (q *Queue) squashDupes(e []byte) (*entry, bool) { +func (q *Queue) squashDupes(e tessera.Entry) (*entry, bool) { q.inFlightMu.Lock() defer q.inFlightMu.Unlock() - k := string(e) + k := string(e.Identity()) entry, isKnown := q.inFlight[k] if !isKnown { entry = newEntry(e) @@ -83,7 +84,7 @@ func (q *Queue) squashDupes(e []byte) (*entry, bool) { } // Add places e into the queue, and returns a func which may be called to retrieve the assigned index. -func (q *Queue) Add(ctx context.Context, e []byte) IndexFunc { +func (q *Queue) Add(ctx context.Context, e tessera.Entry) IndexFunc { entry, isDupe := q.squashDupes(e) if isDupe { // This entry is already in the queue, so no need to add it again. @@ -102,7 +103,7 @@ func (q *Queue) Add(ctx context.Context, e []byte) IndexFunc { // separate goroutine. func (q *Queue) doFlush(items []interface{}) { entries := make([]*entry, len(items)) - entriesData := make([][]byte, len(items)) + entriesData := make([]tessera.Entry, len(items)) for i, t := range items { entries[i] = t.(*entry) entriesData[i] = entries[i].data @@ -117,7 +118,7 @@ func (q *Queue) doFlush(items []interface{}) { for i, e := range entries { e.assign(s+uint64(i), err) - k := string(e.data) + k := string(e.data.Identity()) delete(q.inFlight, k) } }() @@ -128,13 +129,13 @@ func (q *Queue) doFlush(items []interface{}) { // The index field acts as a Future for the entry's assigned index/error, and will // hang until assign is called. type entry struct { - data []byte + data tessera.Entry c chan IndexFunc index IndexFunc } // newEntry creates a new entry for the provided data. -func newEntry(data []byte) *entry { +func newEntry(data tessera.Entry) *entry { e := &entry{ data: data, c: make(chan IndexFunc, 1), diff --git a/storage/queue_test.go b/storage/queue_test.go index c9f1cd91c..65736872f 100644 --- a/storage/queue_test.go +++ b/storage/queue_test.go @@ -15,14 +15,15 @@ package storage_test import ( - "bytes" "context" "fmt" + "reflect" "sync" "sync/atomic" "testing" "time" + tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/storage" ) @@ -52,12 +53,12 @@ func TestQueue(t *testing.T) { } { t.Run(test.name, func(t *testing.T) { assignMu := sync.Mutex{} - assignedItems := make([][]byte, test.numItems) + assignedItems := make([]tessera.Entry, test.numItems) assignedIndex := uint64(0) // flushFunc mimics sequencing storage - it takes entries, assigns them to // positions in assignedItems and returns the first assigned positition for each batch. - flushFunc := func(_ context.Context, entries [][]byte) (uint64, error) { + flushFunc := func(_ context.Context, entries []tessera.Entry) (uint64, error) { assignMu.Lock() defer assignMu.Unlock() @@ -74,9 +75,9 @@ func TestQueue(t *testing.T) { // Now submit a bunch of entries adds := make([]storage.IndexFunc, test.numItems) - wantEntries := make([][]byte, test.numItems) + wantEntries := make([]tessera.Entry, test.numItems) for i := uint64(0); i < test.numItems; i++ { - wantEntries[i] = []byte(fmt.Sprintf("item %d", i)) + wantEntries[i] = tessera.NewEntry([]byte(fmt.Sprintf("item %d", i))) adds[i] = q.Add(context.TODO(), wantEntries[i]) } @@ -86,9 +87,7 @@ func TestQueue(t *testing.T) { t.Errorf("Add: %v", err) return } - if item := assignedItems[N]; item == nil { - t.Errorf("Item %d was not present", N) - } else if got, want := item, wantEntries[i]; !bytes.Equal(got, want) { + if got, want := assignedItems[N], wantEntries[i]; !reflect.DeepEqual(got, want) { t.Errorf("Got item@%d %q, want %q", N, got, want) } } @@ -99,7 +98,7 @@ func TestQueue(t *testing.T) { func TestDedup(t *testing.T) { idx := atomic.Uint64{} - q := storage.NewQueue(time.Second, 10 /*maxSize*/, func(ctx context.Context, entries [][]byte) (index uint64, err error) { + q := storage.NewQueue(time.Second, 10 /*maxSize*/, func(ctx context.Context, entries []tessera.Entry) (index uint64, err error) { r := idx.Load() idx.Add(1) return r, nil @@ -108,7 +107,7 @@ func TestDedup(t *testing.T) { numEntries := 10 adds := []storage.IndexFunc{} for i := 0; i < numEntries; i++ { - adds = append(adds, q.Add(context.TODO(), []byte("Have I seen this before?"))) + adds = append(adds, q.Add(context.TODO(), tessera.NewEntry([]byte("Have I seen this before?")))) } firstN, err := adds[0]()