Skip to content

Commit

Permalink
Thread Entry through
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter committed Jul 16, 2024
1 parent 7360149 commit f8b354b
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 43 deletions.
13 changes: 9 additions & 4 deletions cmd/example-gcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"crypto/sha256"
"flag"
"fmt"
"io"
"net/http"
"os"
Expand All @@ -46,7 +47,7 @@ func main() {
Bucket: *bucket,
Spanner: *spanner,
}
_, err := gcp.New(ctx, gcpCfg)
seqStorage, err := gcp.NewSequencingStorage(ctx, gcpCfg)
if err != nil {
klog.Exitf("Failed to create new GCP storage: %v", err)
}
Expand All @@ -60,9 +61,13 @@ func main() {
defer r.Body.Close()

id := sha256.Sum256(b)
_ = tessera.NewEntry(b, tessera.WithIdentity(id[:]))

// TODO: Add entry to log and return assigned index.
idx, err := seqStorage(r.Context(), tessera.NewEntry(b, tessera.WithIdentity(id[:])))
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
_, _ = w.Write([]byte(fmt.Sprintf("%d", idx)))
})

if err := http.ListenAndServe(*listen, http.DefaultServeMux); err != nil {
Expand Down
9 changes: 9 additions & 0 deletions entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package tessera

import (
"crypto/sha256"
"encoding/binary"
"fmt"

"github.com/transparency-dev/merkle/rfc6962"
)
Expand All @@ -40,6 +42,10 @@ func NewEntry(data []byte, opts ...EntryOpt) Entry {
for _, opt := range opts {
opt(&e)
}
if e.identity == nil {
h := sha256.Sum256(e.data)
e.identity = h[:]
}
if e.leafHash == nil {
e.leafHash = rfc6962.DefaultHasher.HashLeaf(e.data)

Expand Down Expand Up @@ -68,6 +74,9 @@ func (e *Entry) UnmarshalBinary(buf []byte) error {
l, n = binary.Varint(buf)
buf = buf[n:]
e.leafHash, buf = buf[:l], buf[l:]
if l := len(buf); l != 0 {
return fmt.Errorf("%d trailing bytes", l)
}
return nil
}

Expand Down
31 changes: 21 additions & 10 deletions storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/apiv1/spannerpb"
gcs "cloud.google.com/go/storage"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
"github.com/transparency-dev/trillian-tessera/storage"
Expand Down Expand Up @@ -77,7 +78,7 @@ type objStore interface {
type sequencer interface {
// assignEntries should durably allocate contiguous index numbers to the provided entries,
// and return the lowest assigned index.
assignEntries(ctx context.Context, entries [][]byte) (uint64, error)
assignEntries(ctx context.Context, entries []tessera.Entry) (uint64, error)
// consumeEntries should call the provided function with up to limit previously sequenced entries.
// If the call to consumeFunc returns no error, the entries should be considered to have been consumed.
// If any entries were successfully consumed, the implementation should also return true; this
Expand All @@ -86,7 +87,7 @@ type sequencer interface {
}

// consumeFunc is the signature of a function which can consume entries from the sequencer.
type consumeFunc func(ctx context.Context, from uint64, entries [][]byte) error
type consumeFunc func(ctx context.Context, from uint64, entries []tessera.Entry) error

// Config holds GCP project and resource configuration for a storage instance.
type Config struct {
Expand All @@ -98,8 +99,13 @@ type Config struct {
Spanner string
}

// New creates a new instance of the GCP based Storage.
func New(ctx context.Context, cfg Config) (*Storage, error) {
func NewSequencingStorage(ctx context.Context, cfg Config) (func(context.Context, tessera.Entry) (uint64, error), error) {
s, err := newStorage(ctx, cfg)
return s.sequenceEntry, err
}

// newStorage creates a new instance of the GCP based Storage.
func newStorage(ctx context.Context, cfg Config) (*Storage, error) {
c, err := gcs.NewClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create GCS client: %v", err)
Expand Down Expand Up @@ -149,6 +155,11 @@ func New(ctx context.Context, cfg Config) (*Storage, error) {
return r, nil
}

// sequenceEntry is the entrypoint for adding entries to a sequencing log.
func (s *Storage) sequenceEntry(ctx context.Context, e tessera.Entry) (uint64, error) {
return s.queue.Add(ctx, e)()
}

// setTile idempotently stores the provided tile at the location implied by the given level, index, and treeSize.
func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, tile *api.HashTile) error {
data, err := tile.MarshalText()
Expand Down Expand Up @@ -236,7 +247,7 @@ func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSiz
}

// integrate incorporates the provided entries into the log starting at fromSeq.
func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries [][]byte) error {
func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []tessera.Entry) error {
tb := storage.NewTreeBuilder(func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) {
n, err := s.getTiles(ctx, tileIDs, treeSize)
if err != nil {
Expand Down Expand Up @@ -281,7 +292,7 @@ func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries [][]byt
// updateEntryBundles adds the entries being integrated into the entry bundles.
//
// The right-most bundle will be grown, if it's partial, and/or new bundles will be created as required.
func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entries [][]byte) error {
func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entries []tessera.Entry) error {
numAdded := uint64(0)
bundleIndex, entriesInBundle := fromSeq/entryBundleSize, fromSeq%entryBundleSize
bundle := &api.EntryBundle{}
Expand Down Expand Up @@ -309,7 +320,7 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie

// Add new entries to the bundle
for _, e := range entries {
bundle.Entries = append(bundle.Entries, e)
bundle.Entries = append(bundle.Entries, e.Data())
entriesInBundle++
fromSeq++
numAdded++
Expand Down Expand Up @@ -408,7 +419,7 @@ func (s *spannerSequencer) initDB(ctx context.Context) error {
// Entries are allocated contiguous indices, in the order in which they appear in the entries parameter.
// This is achieved by storing the passed-in entries in the Seq table in Spanner, keyed by the
// index assigned to the first entry in the batch.
func (s *spannerSequencer) assignEntries(ctx context.Context, entries [][]byte) (uint64, error) {
func (s *spannerSequencer) assignEntries(ctx context.Context, entries []tessera.Entry) (uint64, error) {
// Flatted the entries into a single slice of bytes which we can store in the Seq.v column.
b := &bytes.Buffer{}
e := gob.NewEncoder(b)
Expand Down Expand Up @@ -481,7 +492,7 @@ func (s *spannerSequencer) consumeEntries(ctx context.Context, limit uint64, f c
defer rows.Stop()

seqsConsumed := []int64{}
entries := make([][]byte, 0, limit)
entries := make([]tessera.Entry, 0, limit)
orderCheck := fromSeq
for {
row, err := rows.Next()
Expand All @@ -500,7 +511,7 @@ func (s *spannerSequencer) consumeEntries(ctx context.Context, limit uint64, f c
}

g := gob.NewDecoder(bytes.NewReader(vGob))
b := [][]byte{}
b := []tessera.Entry{}
if err := g.Decode(&b); err != nil {
return fmt.Errorf("failed to deserialise v: %v", err)
}
Expand Down
14 changes: 8 additions & 6 deletions storage/gcp/gcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"errors"
"fmt"
"os"
"reflect"
"sync"
"testing"

"cloud.google.com/go/spanner/spannertest"
"cloud.google.com/go/spanner/spansql"
gcs "cloud.google.com/go/storage"
"github.com/google/go-cmp/cmp"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
"github.com/transparency-dev/trillian-tessera/storage"
Expand Down Expand Up @@ -68,9 +70,9 @@ func TestSpannerSequencerAssignEntries(t *testing.T) {

want := uint64(0)
for chunks := 0; chunks < 10; chunks++ {
entries := [][]byte{}
entries := []tessera.Entry{}
for i := 0; i < 10+chunks; i++ {
entries = append(entries, []byte(fmt.Sprintf("item %d/%d", chunks, i)))
entries = append(entries, tessera.NewEntry([]byte(fmt.Sprintf("item %d/%d", chunks, i))))
}
got, err := seq.assignEntries(ctx, entries)
if err != nil {
Expand All @@ -95,9 +97,9 @@ func TestSpannerSequencerRoundTrip(t *testing.T) {

seq := 0
for chunks := 0; chunks < 10; chunks++ {
entries := [][]byte{}
entries := []tessera.Entry{}
for i := 0; i < 10+chunks; i++ {
entries = append(entries, []byte(fmt.Sprintf("item %d", seq)))
entries = append(entries, tessera.NewEntry([]byte(fmt.Sprintf("item %d", seq))))
seq++
}
if _, err := s.assignEntries(ctx, entries); err != nil {
Expand All @@ -106,12 +108,12 @@ func TestSpannerSequencerRoundTrip(t *testing.T) {
}

seenIdx := uint64(0)
f := func(_ context.Context, fromSeq uint64, entries [][]byte) error {
f := func(_ context.Context, fromSeq uint64, entries []tessera.Entry) error {
if fromSeq != seenIdx {
return fmt.Errorf("f called with fromSeq %d, want %d", fromSeq, seenIdx)
}
for i, e := range entries {
if !bytes.Equal(e, []byte(fmt.Sprintf("item %d", i))) {
if !reflect.DeepEqual(e, tessera.NewEntry([]byte(fmt.Sprintf("item %d", i)))) {
return fmt.Errorf("entry %d+%d != %d", fromSeq, i, seenIdx)
}
seenIdx++
Expand Down
6 changes: 3 additions & 3 deletions storage/integrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/transparency-dev/merkle/compact"
"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -86,7 +87,7 @@ func (t *TreeBuilder) newRange(ctx context.Context, treeSize uint64) (*compact.R
return t.rf.NewRange(0, treeSize, hashes)
}

func (t *TreeBuilder) Integrate(ctx context.Context, fromSize uint64, entries [][]byte) (newSize uint64, rootHash []byte, tiles map[TileID]*api.HashTile, err error) {
func (t *TreeBuilder) Integrate(ctx context.Context, fromSize uint64, entries []tessera.Entry) (newSize uint64, rootHash []byte, tiles map[TileID]*api.HashTile, err error) {
baseRange, err := t.newRange(ctx, fromSize)
if err != nil {
return 0, nil, nil, fmt.Errorf("failed to create range covering existing log: %w", err)
Expand All @@ -109,9 +110,8 @@ func (t *TreeBuilder) Integrate(ctx context.Context, fromSize uint64, entries []
tc := newTileWriteCache(fromSize, t.readCache.Get)
visitor := tc.Visitor(ctx)
for _, e := range entries {
lh := rfc6962.DefaultHasher.HashLeaf(e)
// Update range and set nodes
if err := newRange.Append(lh, visitor); err != nil {
if err := newRange.Append(e.LeafHash(), visitor); err != nil {
return 0, nil, nil, fmt.Errorf("newRange.Append(): %v", err)
}

Expand Down
5 changes: 3 additions & 2 deletions storage/integrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/transparency-dev/merkle/compact"
"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -130,10 +131,10 @@ func TestIntegrate(t *testing.T) {
seq := uint64(0)
for chunk := 0; chunk < numChunks; chunk++ {
oldSeq := seq
c := make([][]byte, chunkSize)
c := make([]tessera.Entry, chunkSize)
for i := range c {
leaf := []byte{byte(seq)}
c[i] = leaf
c[i] = tessera.NewEntry(leaf)
if err := cr.Append(rfc6962.DefaultHasher.HashLeaf(leaf), nil); err != nil {
t.Fatalf("compact Append: %v", err)
}
Expand Down
17 changes: 9 additions & 8 deletions storage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/globocom/go-buffer"
tessera "github.com/transparency-dev/trillian-tessera"
)

// Queue knows how to queue up a number of entries in order, taking care of deduplication as they're added.
Expand All @@ -47,7 +48,7 @@ type IndexFunc func() (idx uint64, err error)

// FlushFunc is the signature of a function which will receive the slice of queued entries.
// It should return the index assigned to the first entry in the provided slice.
type FlushFunc func(ctx context.Context, entries [][]byte) (index uint64, err error)
type FlushFunc func(ctx context.Context, entries []tessera.Entry) (index uint64, err error)

// NewQueue creates a new queue with the specified maximum age and size.
//
Expand All @@ -69,11 +70,11 @@ func NewQueue(maxAge time.Duration, maxSize uint, f FlushFunc) *Queue {

// squashDupes keeps track of all in-flight requests, enabling dupe squashing for entries currently in the queue.
// Returns an entry struct, and a bool which is true if the provided entry is a dupe and should NOT be added to the queue.
func (q *Queue) squashDupes(e []byte) (*entry, bool) {
func (q *Queue) squashDupes(e tessera.Entry) (*entry, bool) {
q.inFlightMu.Lock()
defer q.inFlightMu.Unlock()

k := string(e)
k := string(e.Identity())
entry, isKnown := q.inFlight[k]
if !isKnown {
entry = newEntry(e)
Expand All @@ -83,7 +84,7 @@ func (q *Queue) squashDupes(e []byte) (*entry, bool) {
}

// Add places e into the queue, and returns a func which may be called to retrieve the assigned index.
func (q *Queue) Add(ctx context.Context, e []byte) IndexFunc {
func (q *Queue) Add(ctx context.Context, e tessera.Entry) IndexFunc {
entry, isDupe := q.squashDupes(e)
if isDupe {
// This entry is already in the queue, so no need to add it again.
Expand All @@ -102,7 +103,7 @@ func (q *Queue) Add(ctx context.Context, e []byte) IndexFunc {
// separate goroutine.
func (q *Queue) doFlush(items []interface{}) {
entries := make([]*entry, len(items))
entriesData := make([][]byte, len(items))
entriesData := make([]tessera.Entry, len(items))
for i, t := range items {
entries[i] = t.(*entry)
entriesData[i] = entries[i].data
Expand All @@ -117,7 +118,7 @@ func (q *Queue) doFlush(items []interface{}) {

for i, e := range entries {
e.assign(s+uint64(i), err)
k := string(e.data)
k := string(e.data.Identity())
delete(q.inFlight, k)
}
}()
Expand All @@ -128,13 +129,13 @@ func (q *Queue) doFlush(items []interface{}) {
// The index field acts as a Future for the entry's assigned index/error, and will
// hang until assign is called.
type entry struct {
data []byte
data tessera.Entry
c chan IndexFunc
index IndexFunc
}

// newEntry creates a new entry for the provided data.
func newEntry(data []byte) *entry {
func newEntry(data tessera.Entry) *entry {
e := &entry{
data: data,
c: make(chan IndexFunc, 1),
Expand Down
Loading

0 comments on commit f8b354b

Please sign in to comment.