From af2765aad9537800ee7b34e987f001e3be1683dd Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Thu, 11 Jul 2024 18:01:40 +0100 Subject: [PATCH] all --- cmd/example-gcp/main.go | 13 ++- storage/gcp/gcp.go | 236 +++++++++++++++++++++++++++++++++++++++- storage/integrate.go | 225 +++++++++++++++++++++++--------------- 3 files changed, 380 insertions(+), 94 deletions(-) diff --git a/cmd/example-gcp/main.go b/cmd/example-gcp/main.go index fb458178..037added 100644 --- a/cmd/example-gcp/main.go +++ b/cmd/example-gcp/main.go @@ -20,6 +20,7 @@ import ( "context" "crypto/sha256" "flag" + "fmt" "io" "net/http" "os" @@ -46,7 +47,7 @@ func main() { Bucket: *bucket, Spanner: *spanner, } - _, err := gcp.New(ctx, gcpCfg) + gcpStorage, err := gcp.New(ctx, gcpCfg) if err != nil { klog.Exitf("Failed to create new GCP storage: %v", err) } @@ -60,9 +61,15 @@ func main() { defer r.Body.Close() id := sha256.Sum256(b) - _ = tessera.NewEntry(b, tessera.WithIdentity(id[:])) + entry := tessera.NewEntry(b, tessera.WithIdentity(id[:])) - // TODO: Add entry to log and return assigned index. + idx, err := gcpStorage.Add(ctx, entry) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + w.Write([]byte(fmt.Sprintf("%d\n", idx))) }) if err := http.ListenAndServe(*listen, http.DefaultServeMux); err != nil { diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 3969376e..f90bb4e3 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -44,14 +44,21 @@ import ( "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/apiv1/spannerpb" gcs "cloud.google.com/go/storage" + tessera "github.com/transparency-dev/trillian-tessera" + "github.com/transparency-dev/trillian-tessera/api" "github.com/transparency-dev/trillian-tessera/api/layout" "github.com/transparency-dev/trillian-tessera/storage" + "golang.org/x/sync/errgroup" "google.golang.org/api/googleapi" "google.golang.org/api/iterator" "google.golang.org/grpc/codes" "k8s.io/klog/v2" ) +const ( + entryBundleSize = 256 +) + // Storage is a GCP based storage implementation for Tessera. type Storage struct { gcsClient *gcs.Client @@ -71,9 +78,12 @@ type objStore interface { setObject(ctx context.Context, obj string, data []byte, cond *gcs.Conditions) error } +type integrateFunc func(ctx context.Context, from uint64, entries [][]byte) error + // coord describes a type which knows how to sequence entries. type sequencer interface { assignEntries(ctx context.Context, entries [][]byte) (uint64, error) + consumeEntries(ctx context.Context, limit uint64, f integrateFunc) (bool, error) } // Config holds GCP project and resource configuration for a storage instance. @@ -110,16 +120,36 @@ func New(ctx context.Context, cfg Config) (*Storage, error) { } // TODO(al): make queue options configurable: r.queue = storage.NewQueue(time.Second, 256, r.sequencer.assignEntries) + go func() { + t := time.NewTicker(500 * time.Millisecond) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + // case <-r.sequenceWork: + } + for { + cctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if more, err := r.sequencer.consumeEntries(cctx, 2048 /*limit*/, r.integrate); err != nil { + klog.Errorf("integrate: %v", err) + break + } else if !more { + break + } + klog.Info("Quickloop") + } + } + }() return r, nil } -// tileSuffix returns either the empty string or a tiles API suffix based on the passed-in tile size. -func tileSuffix(s uint64) string { - if p := s % 256; p != 0 { - return fmt.Sprintf(".p/%d", p) - } - return "" +func (s *Storage) Add(ctx context.Context, e tessera.Entry) (uint64, error) { + f := s.queue.Add(ctx, e.Data()) + return f() } // The location to which the tile is written is defined by the tile layout spec. @@ -157,6 +187,31 @@ func (s *Storage) getTile(ctx context.Context, level, index, logSize uint64) ([] return t, nil } +func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64) (*api.EntryBundle, error) { + objName := layout.EntriesPath(bundleIndex, logSize) + data, _, err := s.objStore.getObject(ctx, objName) + if err != nil { + if errors.Is(err, gcs.ErrObjectNotExist) { + // Return the generic NotExist error so that higher levels can differentiate + // between this and other errors. + return nil, os.ErrNotExist + } + return nil, err + } + + r := &api.EntryBundle{} + return r, r.UnmarshalText(data) +} + +func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64, bundle *api.EntryBundle) error { + objName := layout.EntriesPath(bundleIndex, logSize) + data, err := bundle.MarshalText() + if err != nil { + return err + } + return s.objStore.setObject(ctx, objName, data, &gcs.Conditions{DoesNotExist: true}) +} + // spannerSequencer uses Cloud Spanner to provide // a durable and thread/multi-process safe sequencer. type spannerSequencer struct { @@ -274,6 +329,175 @@ func (s *spannerSequencer) assignEntries(ctx context.Context, entries [][]byte) return uint64(next), nil } +var errFinish = errors.New("finish") + +func (s *spannerSequencer) consumeEntries(ctx context.Context, limit uint64, f integrateFunc) (bool, error) { + didWork := false + _, err := s.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { + row, err := txn.ReadRowWithOptions(ctx, "IntCoord", spanner.Key{0}, []string{"seq"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE}) + if err != nil { + return err + } + // TODO: handle no such row? + var fromSeq int64 // Spanner doesn't support uint64 + if err := row.Column(0, &fromSeq); err != nil { + return fmt.Errorf("failed to read coord info: %v", err) + } + klog.Infof("Fromseq: %d", fromSeq) + + //s.curSize = uint64(fromSeq) + //numAdded := 0 + + rows := txn.ReadWithOptions(ctx, "Seq", spanner.KeyRange{Start: spanner.Key{0, fromSeq}, End: spanner.Key{0, fromSeq + int64(limit)}}, []string{"seq", "v"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE}) + defer rows.Stop() + + seqsConsumed := []int64{} + entries := make([][]byte, 0, limit) + + orderCheck := fromSeq + for { + row, err := rows.Next() + if row == nil || err == iterator.Done { + break + } + var vGob []byte + var seq int64 // spanner doesn't have uint64 + if err := row.Columns(&seq, &vGob); err != nil { + return fmt.Errorf("failed to scan seq row: %v", err) + } + seqsConsumed = append(seqsConsumed, seq) + if orderCheck != seq { + return fmt.Errorf("integrity fail - expected seq %d, but started at %d", orderCheck, seq) + } + + g := gob.NewDecoder(bytes.NewReader(vGob)) + b := [][]byte{} + if err := g.Decode(&b); err != nil { + return fmt.Errorf("failed to deserialise v: %v", err) + } + entries = append(entries, b...) + orderCheck = seq + int64(len(b)) + } + if len(seqsConsumed) == 0 { + klog.Info("Found no rows to sequence") + return nil + } + //readDone = time.Now() + + if err := f(ctx, uint64(fromSeq), entries); err != nil { + return err + } + + m := make([]*spanner.Mutation, 0) + m = append(m, spanner.Update("IntCoord", []string{"id", "seq"}, []interface{}{0, int64(orderCheck)})) + + for _, c := range seqsConsumed { + m = append(m, spanner.Delete("Seq", spanner.Key{0, c})) + } + if err := txn.BufferWrite(m); err != nil { + return err + } + didWork = len(seqsConsumed) > 0 + return nil + }) + if err != nil { + return false, err + } + //sqlDone = time.Now() + return didWork, nil +} + +func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries [][]byte) error { + tb := storage.NewTreeBuilder(func(ctx context.Context, tileLevel uint64, tileIndex uint64, treeSize uint64) (*api.HashTile, error) { + n, err := s.getTile(ctx, tileLevel, tileIndex, treeSize) + if err != nil { + return nil, fmt.Errorf("getTile: %v", err) + } + return &api.HashTile{Nodes: n}, nil + }) + + errG := errgroup.Group{} + errG.Go(func() error { + if err := s.updateEntryBundles(ctx, fromSeq, entries); err != nil { + return fmt.Errorf("updateEntryBundles: %v", err) + } + return nil + }) + + errG.Go(func() error { + + newSize, newRoot, tiles, err := tb.Integrate(ctx, fromSeq, entries) + if err != nil { + return fmt.Errorf("Integrate: %v", err) + } + for k, v := range tiles { + errG.Go(func() error { + return s.setTile(ctx, uint64(k.Level), k.Index, newSize, v.Nodes) + }) + } + //TODO: write out checkpoint + klog.Infof("New CP: %d, %x", newSize, newRoot) + return nil + }) + + return errG.Wait() +} + +func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entries [][]byte) error { + numAdded := uint64(0) + bundleIndex, entriesInBundle := fromSeq/entryBundleSize, fromSeq%entryBundleSize + bundle := &api.EntryBundle{} + if entriesInBundle > 0 { + // If the latest bundle is partial, we need to read the data it contains in for our newer, larger, bundle. + part, err := s.getEntryBundle(ctx, uint64(bundleIndex), uint64(entriesInBundle)) + if err != nil { + return err + } + bundle = part + } + + seqErr := errgroup.Group{} + // Add new entries to the bundle + for _, e := range entries { + bundle.Entries = append(bundle.Entries, e) + entriesInBundle++ + fromSeq++ + numAdded++ + if entriesInBundle == entryBundleSize { + // This bundle is full, so we need to write it out... + klog.V(1).Infof("Bundle idx %x is full", bundleIndex) + bundle := bundle + seqErr.Go(func() error { + if err := s.setEntryBundle(ctx, bundleIndex, fromSeq, bundle); err != nil { + if !errors.Is(os.ErrExist, err) { + return err + } + } + return nil + }) + // ... and prepare the next entry bundle for any remaining entries in the batch + bundleIndex++ + entriesInBundle = 0 + bundle = &api.EntryBundle{} + klog.V(1).Infof("Starting bundle idx %d", bundleIndex) + } + } + // If we have a partial bundle remaining once we've added all the entries from the batch, + // this needs writing out too. + if entriesInBundle > 0 { + klog.V(1).Infof("Writing partial bundle idx %d.%d", bundleIndex, entriesInBundle) + seqErr.Go(func() error { + if err := s.setEntryBundle(ctx, bundleIndex, fromSeq, bundle); err != nil { + if !errors.Is(os.ErrExist, err) { + return err + } + } + return nil + }) + } + return seqErr.Wait() +} + // gcsStorage knows how to store and retrieve objects from GCS. type gcsStorage struct { bucket string diff --git a/storage/integrate.go b/storage/integrate.go index b0aece0f..70ce5087 100644 --- a/storage/integrate.go +++ b/storage/integrate.go @@ -19,14 +19,14 @@ import ( "errors" "fmt" "os" + "sync" "github.com/transparency-dev/merkle/compact" "github.com/transparency-dev/merkle/rfc6962" - "github.com/transparency-dev/serverless-log/client" "github.com/transparency-dev/trillian-tessera/api" "github.com/transparency-dev/trillian-tessera/api/layout" "golang.org/x/sync/errgroup" - "k8s.io/klog" + "k8s.io/klog/v2" ) // fullTile represents a "fully populated" tile, i.e. it has all non-ephemeral internaly nodes @@ -65,117 +65,159 @@ func (f fullTile) set(id compact.NodeID, hash []byte) { } } +// get allows access to individual leaf/inner nodes. +func (f fullTile) get(id compact.NodeID) []byte { + if id.Level == 0 { + if l := uint64(len(f.leaves)); id.Index >= l { + return nil + } + return f.leaves[id.Index] + } + return f.inner[id] +} + type GetTileFunc func(ctx context.Context, tileLevel uint64, tileIndex uint64, treeSize uint64) (*api.HashTile, error) +type getFullTileFunc func(ctx context.Context, tileLevel uint64, tileIndex uint64, treeSize uint64) (*fullTile, error) type TreeBuilder struct { - tileWriteCache *tileWriteCache - getTile func(ctx context.Context, tileLevel uint64, tileIndex uint64) (*api.HashTile, error) -} - -func NewTreeBuilder(getTile func(ctx context.Context, tileLevel uint64, tileIndex uint64, treeSize uint64) (*api.HashTile, error)) *TreeBuilder { - getFullTile := func(ctx context.Context, tileLevel uint64, tileIndex uint64) (*fullTile, error) { - t, err := getTile(ctx, tileLevel, tileIndex) - if err != nil { - return nil, fmt.Errorf("getTile: %v", err) - } - return newFullTile(t), nil - } - return &TreeBuilder{ - tileWriteCache: newTileWriteCache(getFullTile), - getTile: getTile, - } + getTile getFullTileFunc + rf *compact.RangeFactory } -func (t *TreeBuilder) Integrate(ctx context.Context, fromSize uint64, entries [][]byte) (newSize uint64, rootHash []byte, err error) { - rc := readCache{entries: make(map[string]fullTile)} - defer func() { - klog.Infof("read cache hits: %d", rc.hits) - }() - getTile := func(l, i uint64) (*api.HashTile, error) { - r, ok := rc.get(l, i) +func NewTreeBuilder(getTile GetTileFunc) *TreeBuilder { + readCache := tileReadCache{entries: make(map[string]*fullTile)} + getFullTile := func(ctx context.Context, tileLevel uint64, tileIndex uint64, treeSize uint64) (*fullTile, error) { + r, ok := readCache.get(tileLevel, tileIndex, treeSize) if ok { return r, nil } - t, err := t.getTile(ctx, l, i, fromSize) + t, err := getTile(ctx, tileLevel, tileIndex, treeSize) if err != nil { return nil, err } - rc.set(l, i, t) - return t, nil + ft := newFullTile(t) + readCache.set(tileLevel, tileIndex, treeSize, ft) + return ft, nil + } + return &TreeBuilder{ + getTile: getFullTile, + rf: &compact.RangeFactory{Hash: rfc6962.DefaultHasher.HashChildren}, } - prewarmCache(compact.RangeNodes(0, fromSize, nil), getTile) +} - hashes, err := client.FetchRangeNodes(ctx, fromSize, func(_ context.Context, l, i uint64) (*api.Tile, error) { - return getTile(l, i) - }) - if err != nil { - return 0, nil, fmt.Errorf("failed to fetch compact range nodes: %w", err) +func (t *TreeBuilder) newRange(ctx context.Context, treeSize uint64) (*compact.Range, error) { + rangeNodes := compact.RangeNodes(0, treeSize, nil) + errG := errgroup.Group{} + hashes := make([][]byte, len(rangeNodes)) + for i, id := range rangeNodes { + i := i + id := id + errG.Go(func() error { + tLevel, tIndex, nLevel, nIndex := layout.NodeCoordsToTileAddress(uint64(id.Level), id.Index) + ft, err := t.getTile(ctx, uint64(tLevel), tIndex, treeSize) + if err != nil { + return err + } + h := ft.get(compact.NodeID{Level: nLevel, Index: nIndex}) + if h == nil { + return fmt.Errorf("missing node: [%d/%d@%d]", id.Level, id.Index, treeSize) + } + hashes[i] = h + return nil + }) + } + if err := errG.Wait(); err != nil { + return nil, err } + return t.rf.NewRange(0, treeSize, hashes) +} - rf := compact.RangeFactory{Hash: h.HashChildren} - baseRange, err := rf.NewRange(0, fromSize, hashes) +func (t *TreeBuilder) Integrate(ctx context.Context, fromSize uint64, entries [][]byte) (newSize uint64, rootHash []byte, tiles map[compact.NodeID]*api.HashTile, err error) { + baseRange, err := t.newRange(ctx, fromSize) if err != nil { - return 0, nil, fmt.Errorf("failed to create range covering existing log: %w", err) + return 0, nil, nil, fmt.Errorf("failed to create range covering existing log: %w", err) } // Initialise a compact range representation, and verify the stored state. r, err := baseRange.GetRootHash(nil) if err != nil { - return 0, nil, fmt.Errorf("invalid log state, unable to recalculate root: %w", err) + return 0, nil, nil, fmt.Errorf("invalid log state, unable to recalculate root: %w", err) } klog.V(1).Infof("Loaded state with roothash %x", r) - // Create a new compact range which represents the update to the tree - newRange := rf.NewEmptyRange(fromSize) - tc := tileCache{m: make(map[tileKey]*api.Tile), getTile: getTile} - if len(batch) == 0 { + newRange := t.rf.NewEmptyRange(fromSize) + if len(entries) == 0 { klog.V(1).Infof("Nothing to do.") // Nothing to do, nothing done. - return fromSize, r, nil + return fromSize, r, nil, nil } - for _, e := range batch { - lh := h.HashLeaf(e) + tc := newTileWriteCache(fromSize, t.getTile) + visitor := tc.Visitor(ctx) + for _, e := range entries { + lh := rfc6962.DefaultHasher.HashLeaf(e) // Update range and set nodes - if err := newRange.Append(lh, tc.Visit); err != nil { - return 0, nil, fmt.Errorf("newRange.Append(): %v", err) + if err := newRange.Append(lh, visitor); err != nil { + return 0, nil, nil, fmt.Errorf("newRange.Append(): %v", err) } } // Merge the update range into the old tree - if err := baseRange.AppendRange(newRange, tc.Visit); err != nil { - return 0, nil, fmt.Errorf("failed to merge new range onto existing log: %w", err) + if err := baseRange.AppendRange(newRange, visitor); err != nil { + return 0, nil, nil, fmt.Errorf("failed to merge new range onto existing log: %w", err) } // Calculate the new root hash - don't pass in the tileCache visitor here since // this will construct any ephemeral nodes and we do not want to store those. newRoot, err := baseRange.GetRootHash(nil) if err != nil { - return 0, nil, fmt.Errorf("failed to calculate new root hash: %w", err) + return 0, nil, nil, fmt.Errorf("failed to calculate new root hash: %w", err) } // All calculation is now complete, all that remains is to store the new // tiles and updated log state. klog.V(1).Infof("New log state: size 0x%x hash: %x", baseRange.End(), newRoot) - eg := errgroup.Group{} - for k, t := range tc.m { - k := k - t := t - eg.Go(func() error { - if err := st.StoreTile(ctx, k.level, k.index, t); err != nil { - return fmt.Errorf("failed to store tile at level %d index %d: %w", k.level, k.index, err) - } - return nil - }) - } - if err := eg.Wait(); err != nil { - return 0, nil, err + return baseRange.End(), newRoot, tc.Tiles(), nil + +} + +// tileReadCache is a structure which provides a very simple thread-safe map of tiles. +type tileReadCache struct { + sync.RWMutex + + hits int + entries map[string]*fullTile +} + +func newTileReadCache() tileReadCache { + return tileReadCache{ + entries: make(map[string]*fullTile), } +} - return baseRange.End(), newRoot, nil +// get returns a previously set tile and true, or, if no such tile is in the cache, returns nil and false. +func (r *tileReadCache) get(tileLevel, tileIndex, treeSize uint64) (*fullTile, bool) { + r.RLock() + defer r.RUnlock() + k := layout.TilePath(tileLevel, tileIndex, treeSize) + e, ok := r.entries[k] + if ok { + r.hits++ + } + return e, ok +} +// set associates the given tile coords with a tile. +func (r *tileReadCache) set(tileLevel, tileIndex, treeSize uint64, t *fullTile) { + r.Lock() + defer r.Unlock() + k := layout.TilePath(tileLevel, tileIndex, treeSize) + if _, ok := r.entries[k]; ok { + panic(fmt.Errorf("Attempting to overwrite %v", k)) + } + r.entries[k] = t } // tileWriteCache is a simple cache for storing the newly created tiles produced by @@ -190,13 +232,15 @@ type tileWriteCache struct { m map[compact.NodeID]*fullTile err []error - getTile func(ctx context.Context, level, index uint64) (*fullTile, error) + treeSize uint64 + getTile getFullTileFunc } -func newTileWriteCache(getTile func(ctx context.Context, tileLevel uint64, tileIndex uint64) (*fullTile, error)) *tileWriteCache { +func newTileWriteCache(treeSize uint64, getTile getFullTileFunc) *tileWriteCache { return &tileWriteCache{ - m: make(map[compact.NodeID]*fullTile), - getTile: getTile, + m: make(map[compact.NodeID]*fullTile), + treeSize: treeSize, + getTile: getTile, } } @@ -210,24 +254,35 @@ func (tc *tileWriteCache) Err() error { // If the tile containing id has not been seen before, this method will fetch // it from disk (or create a new empty in-memory tile if it doesn't exist), and // update it by setting the node corresponding to id to the value hash. -func (tc tileWriteCache) Visit(ctx context.Context, id compact.NodeID, hash []byte) { - tileLevel, tileIndex, nodeLevel, nodeIndex := layout.NodeCoordsToTileAddress(uint64(id.Level), uint64(id.Index)) - tileKey := compact.NodeID{Level: uint(tileLevel), Index: tileIndex} - tile := tc.m[tileKey] - if tile == nil { - var err error - tile, err = tc.getTile(ctx, tileLevel, tileIndex) - if err != nil { - if !os.IsNotExist(err) { - tc.err = append(tc.err, err) - return +func (tc *tileWriteCache) Visitor(ctx context.Context) compact.VisitFn { + return func(id compact.NodeID, hash []byte) { + tileLevel, tileIndex, nodeLevel, nodeIndex := layout.NodeCoordsToTileAddress(uint64(id.Level), uint64(id.Index)) + tileKey := compact.NodeID{Level: uint(tileLevel), Index: tileIndex} + tile := tc.m[tileKey] + if tile == nil { + var err error + tile, err = tc.getTile(ctx, tileLevel, tileIndex, tc.treeSize) + if err != nil { + if !os.IsNotExist(err) { + tc.err = append(tc.err, err) + return + } + // This is a brand new tile. + tile = newFullTile(nil) } - // This is a brand new tile. - tile = newFullTile(nil) + tc.m[tileKey] = tile } - tc.m[tileKey] = tile + // Update the tile with the new node hash. + idx := compact.NodeID{Level: nodeLevel, Index: nodeIndex} + tile.set(idx, hash) + } +} + +// Tiles returns all visited tiles. +func (tc *tileWriteCache) Tiles() map[compact.NodeID]*api.HashTile { + newTiles := make(map[compact.NodeID]*api.HashTile) + for k, t := range tc.m { + newTiles[k] = &api.HashTile{Nodes: t.leaves} } - // Update the tile with the new node hash. - idx := compact.NodeID{Level: nodeLevel, Index: nodeIndex} - tile.set(idx, hash) + return newTiles }