Skip to content

Commit

Permalink
all
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter committed Jul 11, 2024
1 parent dfce60f commit 31d2efb
Show file tree
Hide file tree
Showing 3 changed files with 395 additions and 99 deletions.
13 changes: 10 additions & 3 deletions cmd/example-gcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"crypto/sha256"
"flag"
"fmt"
"io"
"net/http"
"os"
Expand All @@ -46,7 +47,7 @@ func main() {
Bucket: *bucket,
Spanner: *spanner,
}
_, err := gcp.New(ctx, gcpCfg)
gcpStorage, err := gcp.New(ctx, gcpCfg)
if err != nil {
klog.Exitf("Failed to create new GCP storage: %v", err)
}
Expand All @@ -60,9 +61,15 @@ func main() {
defer r.Body.Close()

id := sha256.Sum256(b)
_ = tessera.NewEntry(b, tessera.WithIdentity(id[:]))
entry := tessera.NewEntry(b, tessera.WithIdentity(id[:]))

// TODO: Add entry to log and return assigned index.
idx, err := gcpStorage.Add(ctx, entry)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.Write([]byte(fmt.Sprintf("%d\n", idx)))
})

if err := http.ListenAndServe(*listen, http.DefaultServeMux); err != nil {
Expand Down
251 changes: 240 additions & 11 deletions storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,26 @@ import (
"io"
"net/http"
"os"
"slices"
"time"

"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/apiv1/spannerpb"
gcs "cloud.google.com/go/storage"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
"github.com/transparency-dev/trillian-tessera/storage"
"golang.org/x/sync/errgroup"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
"google.golang.org/grpc/codes"
"k8s.io/klog/v2"
)

const (
// entryBundleSize is the number of entries held by a full entry bundle.
// updateEntryBundles divides sequence numbers by this value to find the bundle
// index, and writes out a bundle and starts a new one once it holds this many entries.
entryBundleSize = 256
)

// Storage is a GCP based storage implementation for Tessera.
type Storage struct {
gcsClient *gcs.Client
Expand All @@ -71,9 +77,12 @@ type objStore interface {
setObject(ctx context.Context, obj string, data []byte, cond *gcs.Conditions) error
}

// integrateFunc is the callback that consumeEntries invokes with a batch of
// sequenced entries. from is the sequence number of the first entry in entries.
// A non-nil error is returned to the caller of consumeEntries from inside its
// transaction function, before any coordination updates are buffered.
type integrateFunc func(ctx context.Context, from uint64, entries [][]byte) error

// sequencer describes a type which knows how to sequence entries.
type sequencer interface {
// assignEntries accepts a batch of entries to be sequenced and returns a uint64.
// NOTE(review): presumably the sequence number assigned to the first entry in the
// batch — confirm against the implementation.
assignEntries(ctx context.Context, entries [][]byte) (uint64, error)
// consumeEntries hands previously sequenced entries to f. In the Spanner
// implementation, limit bounds the span of sequence numbers read in a single
// call, and the returned bool reports whether any sequenced rows were consumed.
consumeEntries(ctx context.Context, limit uint64, f integrateFunc) (bool, error)
}

// Config holds GCP project and resource configuration for a storage instance.
Expand Down Expand Up @@ -110,25 +119,48 @@ func New(ctx context.Context, cfg Config) (*Storage, error) {
}
// TODO(al): make queue options configurable:
r.queue = storage.NewQueue(time.Second, 256, r.sequencer.assignEntries)
// Background integration loop: every 500ms (until ctx is cancelled) drain the
// sequencer by repeatedly calling consumeEntries until it reports no more work
// or returns an error.
go func() {
	t := time.NewTicker(500 * time.Millisecond)
	defer t.Stop()

	// consumeOnce runs a single consumeEntries attempt under its own 10s timeout.
	// It is a separate function so that the deferred cancel fires at the end of
	// every attempt; deferring inside the loop below would accumulate one pending
	// cancel (and its timer) per iteration for the lifetime of this goroutine.
	consumeOnce := func() (bool, error) {
		cctx, cancel := context.WithTimeout(ctx, 10*time.Second)
		defer cancel()
		return r.sequencer.consumeEntries(cctx, 2048 /*limit*/, r.integrate)
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			// case <-r.sequenceWork:
		}
		for {
			more, err := consumeOnce()
			if err != nil {
				klog.Errorf("integrate: %v", err)
				break
			}
			if !more {
				break
			}
			klog.Info("Quickloop")
		}
	}
}()

return r, nil
}

// tileSuffix returns either the empty string or a tiles API suffix based on the passed-in tile size.
func tileSuffix(s uint64) string {
if p := s % 256; p != 0 {
return fmt.Sprintf(".p/%d", p)
}
return ""
// Add passes the entry's data to the storage queue and then invokes the function
// the queue returns, yielding that function's uint64 result and error.
// NOTE(review): callers treat the uint64 as the index assigned to the entry — confirm
// against the queue implementation.
func (s *Storage) Add(ctx context.Context, e tessera.Entry) (uint64, error) {
	return s.queue.Add(ctx, e.Data())()
}

// The location to which the tile is written is defined by the tile layout spec.
func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, tile [][]byte) error {
t := slices.Concat(tile...)
func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, tile *api.HashTile) error {
data, err := tile.MarshalText()
if err != nil {
return err
}
tPath := layout.TilePath(level, index, logSize)
klog.V(2).Infof("StoreTile: %s", tPath)

return s.objStore.setObject(ctx, tPath, t, &gcs.Conditions{DoesNotExist: true})
return s.objStore.setObject(ctx, tPath, data, &gcs.Conditions{DoesNotExist: true})
}

// getTile returns the tile at the given tile-level and tile-index for the specified log size, or
Expand All @@ -140,7 +172,7 @@ func (s *Storage) getTile(ctx context.Context, level, index, logSize uint64) ([]
if errors.Is(err, gcs.ErrObjectNotExist) {
// Return the generic NotExist error so that higher levels can differentiate
// between this and other errors.
return nil, os.ErrNotExist
return nil, fmt.Errorf("%v: %w", objName, os.ErrNotExist)
}
return nil, err
}
Expand All @@ -157,6 +189,31 @@ func (s *Storage) getTile(ctx context.Context, level, index, logSize uint64) ([]
return t, nil
}

// getEntryBundle fetches and parses the entry bundle stored for bundleIndex.
//
// The object name is computed by layout.EntriesPath from bundleIndex and logSize.
// If the object does not exist, the returned error wraps os.ErrNotExist (and
// names the missing object) so callers can detect it with errors.Is. On any
// error the returned bundle is nil.
func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64) (*api.EntryBundle, error) {
	objName := layout.EntriesPath(bundleIndex, logSize)
	data, _, err := s.objStore.getObject(ctx, objName)
	if err != nil {
		if errors.Is(err, gcs.ErrObjectNotExist) {
			// Return the generic NotExist error so that higher levels can differentiate
			// between this and other errors; include the object name as getTile does.
			return nil, fmt.Errorf("%v: %w", objName, os.ErrNotExist)
		}
		return nil, err
	}

	bundle := &api.EntryBundle{}
	if err := bundle.UnmarshalText(data); err != nil {
		// Don't hand back a partially populated bundle alongside an error.
		return nil, fmt.Errorf("unmarshalling entry bundle %v: %w", objName, err)
	}
	return bundle, nil
}

// setEntryBundle serialises bundle with MarshalText and stores it under the
// object name that layout.EntriesPath derives from bundleIndex and logSize.
// The write carries a DoesNotExist precondition. A marshalling failure is
// returned before any write is attempted.
func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64, bundle *api.EntryBundle) error {
	target := layout.EntriesPath(bundleIndex, logSize)

	payload, marshalErr := bundle.MarshalText()
	if marshalErr != nil {
		return marshalErr
	}

	onlyIfAbsent := &gcs.Conditions{DoesNotExist: true}
	return s.objStore.setObject(ctx, target, payload, onlyIfAbsent)
}

// spannerSequencer uses Cloud Spanner to provide
// a durable and thread/multi-process safe sequencer.
type spannerSequencer struct {
Expand Down Expand Up @@ -274,6 +331,178 @@ func (s *spannerSequencer) assignEntries(ctx context.Context, entries [][]byte)
return uint64(next), nil
}

// errFinish is a sentinel error value.
// NOTE(review): nothing in the visible code references it — confirm whether it is
// still needed.
var errFinish = errors.New("finish")

// consumeEntries reads a contiguous run of sequenced entry batches from Spanner
// and hands them to f, all inside a single read-write transaction.
//
// Within the transaction it:
//  1. reads the current position from the IntCoord table (row key 0, column "seq")
//     with an exclusive lock hint;
//  2. reads rows from the Seq table in the key range starting at that position and
//     ending at position+limit, verifying that each row's seq follows on exactly
//     from the entries decoded so far;
//  3. calls f with the starting position and the gob-decoded entries;
//  4. buffers an update moving IntCoord.seq past the consumed entries, plus a
//     delete for each consumed Seq row.
//
// It returns true if at least one Seq row was consumed, and false if there was
// nothing to do. Any error (from Spanner, decoding, the ordering check, or f)
// is returned from the transaction function, so nothing is buffered for that attempt.
func (s *spannerSequencer) consumeEntries(ctx context.Context, limit uint64, f integrateFunc) (bool, error) {
	didWork := false
	_, err := s.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		// The Spanner client may invoke this function more than once, so make sure
		// the result of an earlier attempt cannot leak into this one.
		didWork = false

		row, err := txn.ReadRowWithOptions(ctx, "IntCoord", spanner.Key{0}, []string{"seq"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE})
		if err != nil {
			return err
		}
		// TODO: handle no such row?
		var fromSeq int64 // Spanner doesn't support uint64
		if err := row.Column(0, &fromSeq); err != nil {
			// Wrap with %w throughout so the underlying Spanner error (and its
			// status code) stays visible to the transaction runner and callers.
			return fmt.Errorf("failed to read coord info: %w", err)
		}
		klog.Infof("Fromseq: %d", fromSeq)

		rows := txn.ReadWithOptions(ctx, "Seq", spanner.KeyRange{Start: spanner.Key{0, fromSeq}, End: spanner.Key{0, fromSeq + int64(limit)}}, []string{"seq", "v"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE})
		defer rows.Stop()

		seqsConsumed := []int64{}
		entries := make([][]byte, 0, limit)

		// orderCheck tracks the sequence number the next row is expected to start at.
		orderCheck := fromSeq
		for {
			row, err := rows.Next()
			if err == iterator.Done {
				break
			}
			if err != nil {
				// A read failure must abort the attempt; treating it as end-of-data
				// would silently integrate a truncated batch.
				return fmt.Errorf("failed to read seq row: %w", err)
			}
			var vGob []byte
			var seq int64 // spanner doesn't have uint64
			if err := row.Columns(&seq, &vGob); err != nil {
				return fmt.Errorf("failed to scan seq row: %w", err)
			}
			seqsConsumed = append(seqsConsumed, seq)
			if orderCheck != seq {
				return fmt.Errorf("integrity fail - expected seq %d, but started at %d", orderCheck, seq)
			}

			// Each row's value is a gob-encoded [][]byte holding a batch of entries.
			var batch [][]byte
			if err := gob.NewDecoder(bytes.NewReader(vGob)).Decode(&batch); err != nil {
				return fmt.Errorf("failed to deserialise v: %w", err)
			}
			entries = append(entries, batch...)
			orderCheck += int64(len(batch))
		}
		if len(seqsConsumed) == 0 {
			klog.Info("Found no rows to sequence")
			return nil
		}

		if err := f(ctx, uint64(fromSeq), entries); err != nil {
			return err
		}

		// One update for the coordination row plus one delete per consumed Seq row.
		m := make([]*spanner.Mutation, 0, len(seqsConsumed)+1)
		m = append(m, spanner.Update("IntCoord", []string{"id", "seq"}, []any{0, orderCheck}))
		for _, c := range seqsConsumed {
			m = append(m, spanner.Delete("Seq", spanner.Key{0, c}))
		}

		klog.Infof("%v", m)
		if err := txn.BufferWrite(m); err != nil {
			return err
		}
		didWork = true
		return nil
	})
	if err != nil {
		return false, err
	}
	return didWork, nil
}

// integrate stores a batch of sequenced entries, starting at sequence number
// fromSeq, by concurrently:
//   - writing the entries into entry bundles via updateEntryBundles, and
//   - running the tree builder's Integrate over the entries and writing out
//     each tile it returns via setTile (one goroutine per tile).
//
// It returns the first error reported by any of these goroutines, or nil once
// they have all finished.
func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries [][]byte) error {
	tb := storage.NewTreeBuilder(func(ctx context.Context, tileLevel uint64, tileIndex uint64, treeSize uint64) (*api.HashTile, error) {
		n, err := s.getTile(ctx, tileLevel, tileIndex, treeSize)
		if err != nil {
			// Wrap with %w: getTile deliberately wraps os.ErrNotExist so that
			// higher levels can tell a missing tile from other failures, and %v
			// would discard that.
			return nil, fmt.Errorf("getTile: %w", err)
		}
		return &api.HashTile{Nodes: n}, nil
	})

	errG := errgroup.Group{}
	errG.Go(func() error {
		if err := s.updateEntryBundles(ctx, fromSeq, entries); err != nil {
			return fmt.Errorf("updateEntryBundles: %w", err)
		}
		return nil
	})

	errG.Go(func() error {
		newSize, newRoot, tiles, err := tb.Integrate(ctx, fromSeq, entries)
		if err != nil {
			return fmt.Errorf("Integrate: %w", err)
		}
		klog.V(1).Infof("tiles to write: %v", tiles)
		for k, v := range tiles {
			// Per-iteration copies so each goroutine writes its own tile even when
			// built with pre-1.22 loop variable semantics.
			k, v := k, v
			// Adding to the group from inside one of its goroutines is safe here:
			// this goroutine is still running, so Wait below cannot have returned.
			errG.Go(func() error {
				return s.setTile(ctx, uint64(k.Level), k.Index, newSize, v)
			})
		}
		//TODO: write out checkpoint
		klog.Infof("New CP: %d, %x", newSize, newRoot)
		return nil
	})

	return errG.Wait()
}

// updateEntryBundles appends entries, the first of which has sequence number
// fromSeq, to the log's entry bundles.
//
// If fromSeq falls part-way through a bundle, the existing partial bundle is
// read first and extended. Each time a bundle reaches entryBundleSize entries it
// is written out and a fresh bundle is started; any partial bundle left over at
// the end of the batch is written too. Writes run concurrently, and the first
// write error is returned once all of them have finished.
func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entries [][]byte) error {
	bundleIndex, entriesInBundle := fromSeq/entryBundleSize, fromSeq%entryBundleSize
	bundle := &api.EntryBundle{}
	if entriesInBundle > 0 {
		// If the latest bundle is partial, we need to read the data it contains in for our newer, larger, bundle.
		// NOTE(review): the partial bundle's entry count is passed as the logSize
		// argument — confirm layout.EntriesPath resolves this to the same object
		// name as the full log size would.
		part, err := s.getEntryBundle(ctx, bundleIndex, entriesInBundle)
		if err != nil {
			return err
		}
		bundle = part
	}

	seqErr := errgroup.Group{}
	// writeBundle schedules an asynchronous write of b. The index and log size are
	// taken as arguments (i.e. by value) because the loop below keeps mutating
	// bundleIndex and fromSeq while earlier writes may still be starting up.
	writeBundle := func(idx, logSize uint64, b *api.EntryBundle) {
		seqErr.Go(func() error {
			// An already-existing object is tolerated.
			// NOTE(review): assumes setObject reports a failed DoesNotExist
			// precondition as an error matching os.ErrExist — confirm.
			if err := s.setEntryBundle(ctx, idx, logSize, b); err != nil && !errors.Is(err, os.ErrExist) {
				return err
			}
			return nil
		})
	}

	// Add new entries to the bundle
	for _, e := range entries {
		bundle.Entries = append(bundle.Entries, e)
		entriesInBundle++
		fromSeq++
		if entriesInBundle == entryBundleSize {
			// This bundle is full, so we need to write it out...
			klog.V(1).Infof("Bundle idx %x is full", bundleIndex)
			writeBundle(bundleIndex, fromSeq, bundle)
			// ... and prepare the next entry bundle for any remaining entries in the batch
			bundleIndex++
			entriesInBundle = 0
			bundle = &api.EntryBundle{}
			klog.V(1).Infof("Starting bundle idx %d", bundleIndex)
		}
	}
	// If we have a partial bundle remaining once we've added all the entries from the batch,
	// this needs writing out too.
	if entriesInBundle > 0 {
		klog.V(1).Infof("Writing partial bundle idx %d.%d", bundleIndex, entriesInBundle)
		writeBundle(bundleIndex, fromSeq, bundle)
	}
	return seqErr.Wait()
}

// gcsStorage knows how to store and retrieve objects from GCS.
type gcsStorage struct {
bucket string
Expand Down
Loading

0 comments on commit 31d2efb

Please sign in to comment.