[AWS] Separate integration from publishing checkpoints (#372)
AlCutter authored Dec 5, 2024
1 parent 18fa545 commit 997c417
Showing 5 changed files with 226 additions and 53 deletions.
2 changes: 2 additions & 0 deletions cmd/conformance/aws/main.go
@@ -43,6 +43,7 @@ var (
dbMaxConns = flag.Int("db_max_conns", 0, "Maximum connections to the database, defaults to 0, i.e. unlimited")
dbMaxIdle = flag.Int("db_max_idle_conns", 2, "Maximum idle database connections in the connection pool, defaults to 2")
signer = flag.String("signer", "", "Note signer to use to sign checkpoints")
publishInterval = flag.Duration("publish_interval", 3*time.Second, "How frequently to publish updated checkpoints")
additionalSigners = []string{}
)

@@ -64,6 +65,7 @@ func main() {
awsCfg := storageConfigFromFlags()
storage, err := aws.New(ctx, awsCfg,
tessera.WithCheckpointSigner(s, a...),
tessera.WithCheckpointInterval(*publishInterval),
tessera.WithBatching(1024, time.Second),
tessera.WithPushback(10*4096),
)
167 changes: 130 additions & 37 deletions storage/aws/aws.go
@@ -46,6 +46,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go"
"github.com/google/go-cmp/cmp"
"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
@@ -75,13 +76,16 @@ type Storage struct {
objStore objStore

queue *storage.Queue

treeUpdated chan struct{}
}

// objStore describes a type which can store and retrieve objects.
type objStore interface {
getObject(ctx context.Context, obj string) ([]byte, error)
setObject(ctx context.Context, obj string, data []byte, contType string) error
setObjectIfNoneMatch(ctx context.Context, obj string, data []byte, contType string) error
lastModified(ctx context.Context, obj string) (time.Time, error)
}
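
For reference, this interface is all a test double needs to satisfy; a minimal in-memory sketch (hypothetical, not part of this change; assumes context, fmt, sync, and time are imported):

type memObjStore struct {
	mu   sync.Mutex
	data map[string][]byte
	mod  map[string]time.Time
}

func (m *memObjStore) getObject(_ context.Context, obj string) ([]byte, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	d, ok := m.data[obj]
	if !ok {
		// Note: publishCheckpoint below matches missing objects via
		// *types.NoSuchKey, so a faithful double would return that type.
		return nil, fmt.Errorf("getObject: %q not found", obj)
	}
	return d, nil
}

func (m *memObjStore) setObject(_ context.Context, obj string, data []byte, _ string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[obj] = data
	m.mod[obj] = time.Now()
	return nil
}

func (m *memObjStore) setObjectIfNoneMatch(_ context.Context, obj string, data []byte, _ string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.data[obj]; ok {
		return fmt.Errorf("setObjectIfNoneMatch: %q already exists", obj)
	}
	m.data[obj] = data
	m.mod[obj] = time.Now()
	return nil
}

func (m *memObjStore) lastModified(_ context.Context, obj string) (time.Time, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	t, ok := m.mod[obj]
	if !ok {
		return time.Time{}, fmt.Errorf("lastModified: %q not found", obj)
	}
	return t, nil
}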

// sequencer describes a type which knows how to sequence entries.
@@ -95,10 +99,14 @@ type sequencer interface {
// If forceUpdate is true, then the consumeFunc should be called, with an empty slice of entries if
// necessary. This allows the log to self-initialise in a transactionally safe manner.
consumeEntries(ctx context.Context, limit uint64, f consumeFunc, forceUpdate bool) (bool, error)

// currentTree returns the sequencer's view of the current tree state.
currentTree(ctx context.Context) (uint64, []byte, error)
}

// consumeFunc is the signature of a function which can consume entries from the sequencer.
// Returns the updated root hash of the tree with the consumed entries integrated.
type consumeFunc func(ctx context.Context, from uint64, entries []storage.SequencedEntry) ([]byte, error)
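
A hypothetical consumeFunc for illustration: it just records the entries and returns a placeholder root, whereas a real consumer such as Storage.integrate below computes the new Merkle root for the extended tree:

var consumed []storage.SequencedEntry
record := func(ctx context.Context, from uint64, entries []storage.SequencedEntry) ([]byte, error) {
	// Entries occupy indices [from, from+len(entries)).
	consumed = append(consumed, entries...)
	// Placeholder only: a real implementation returns the updated root hash.
	return rfc6962.DefaultHasher.EmptyRoot(), nil
}
var _ consumeFunc = record // compile-time check that the signature matches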

// Config holds AWS project and resource configuration for a storage instance.
type Config struct {
@@ -113,6 +121,9 @@ type Config struct {
}

// New creates a new instance of the AWS based Storage.
//
// Storage instances created via this c'tor will participate in integrating newly sequenced entries into the log
// and periodically publishing a new checkpoint which commits to the state of the tree.
func New(ctx context.Context, cfg Config, opts ...func(*options.StorageOptions)) (*Storage, error) {
opt := storage.ResolveStorageOptions(opts...)
if opt.PushbackMaxOutstanding == 0 {
@@ -138,36 +149,71 @@ func New(ctx context.Context, cfg Config, opts ...func(*options.StorageOptions))
sequencer: seq,
newCP: opt.NewCP,
entriesPath: opt.EntriesPath,
treeUpdated: make(chan struct{}),
}
r.queue = storage.NewQueue(ctx, opt.BatchMaxAge, opt.BatchMaxSize, r.sequencer.assignEntries)

if err := r.init(ctx); err != nil {
return nil, fmt.Errorf("failed to initialise log storage: %v", err)
}

// Kick off go-routine which handles the integration of entries.
go r.consumeEntriesTask(ctx)

// Kick off go-routine which handles the publication of checkpoints.
go r.publishCheckpointTask(ctx, opt.CheckpointInterval)

return r, nil
}

// consumeEntriesTask periodically integrates newly sequenced entries.
//
// This function does not return until the passed context is done.
func (s *Storage) consumeEntriesTask(ctx context.Context) {
t := time.NewTicker(1 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
func() {
// Don't quickloop for now, it causes issues updating checkpoint too frequently.
cctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.integrate, false); err != nil {
klog.Errorf("integrate: %v", err)
return
}
select {
case s.treeUpdated <- struct{}{}:
default:
}
}()
}
}
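
The non-blocking send on s.treeUpdated above is a small coalescing pattern: because the channel is unbuffered and the send has a default case, the integration loop only delivers a wake-up if the publisher is actually waiting, and otherwise drops the signal instead of blocking or queueing. A standalone sketch of the pattern, with hypothetical names:

// Consumer: wakes early on a signal, or at the latest on the next tick.
func coalesce(ctx context.Context, notify <-chan struct{}, every time.Duration, work func()) {
	t := time.NewTicker(every)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-notify:
		case <-t.C:
		}
		work()
	}
}

// Producer: never blocks; if the consumer is busy the signal is dropped.
func signal(notify chan<- struct{}) {
	select {
	case notify <- struct{}{}:
	default:
	}
}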

// publishCheckpointTask periodically attempts to publish a new checkpoint representing the current state
// of the tree, once per interval.
//
// This function does not return until the passed context is done.
func (s *Storage) publishCheckpointTask(ctx context.Context, interval time.Duration) {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-s.treeUpdated:
case <-t.C:
}
if err := s.publishCheckpoint(ctx, interval); err != nil {
klog.Warningf("publishCheckpoint: %v", err)
}
}
}

// Add is the entrypoint for adding entries to a sequencing log.
@@ -210,6 +256,10 @@ func (s *Storage) init(ctx context.Context) error {
if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.integrate, true); err != nil {
return fmt.Errorf("forced integrate: %v", err)
}
select {
case s.treeUpdated <- struct{}{}:
default:
}
return nil
}
return fmt.Errorf("failed to read checkpoint: %v", err)
Expand All @@ -218,11 +268,26 @@ func (s *Storage) init(ctx context.Context) error {
return nil
}

func (s *Storage) publishCheckpoint(ctx context.Context, minStaleness time.Duration) error {
m, err := s.objStore.lastModified(ctx, layout.CheckpointPath)
// Do not use errors.Is. Keep errors.As to compare by type and not by value.
var nske *types.NoSuchKey
if err != nil && !errors.As(err, &nske) {
return fmt.Errorf("lastModified(%q): %v", layout.CheckpointPath, err)
}
if time.Since(m) < minStaleness {
return nil
}

size, root, err := s.sequencer.currentTree(ctx)
if err != nil {
return fmt.Errorf("currentTree: %v", err)
}
cpRaw, err := s.newCP(size, root)
if err != nil {
return fmt.Errorf("newCP: %v", err)
}

if err := s.objStore.setObject(ctx, layout.CheckpointPath, cpRaw, ckptContType); err != nil {
return fmt.Errorf("writeCheckpoint: %v", err)
}
@@ -315,7 +380,11 @@ func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSiz
}

// integrate incorporates the provided entries into the log starting at fromSeq.
//
// Returns the new root hash of the log with the entries added.
func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) ([]byte, error) {
var newRoot []byte

getTiles := func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) {
n, err := s.getTiles(ctx, tileIDs, treeSize)
if err != nil {
@@ -334,29 +403,25 @@ func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []stora
})

errG.Go(func() error {
newSize, root, tiles, err := storage.Integrate(ctx, getTiles, fromSeq, entries)
if err != nil {
return fmt.Errorf("Integrate: %v", err)
}
newRoot = root
for k, v := range tiles {
func(ctx context.Context, k storage.TileID, v *api.HashTile) {
errG.Go(func() error {
return s.setTile(ctx, uint64(k.Level), k.Index, newSize, v)
})
}(ctx, k, v)
}
klog.Infof("New tree: %d, %x", newSize, newRoot)

return nil
})

err := errG.Wait()
return newRoot, err
}

// updateEntryBundles adds the entries being integrated into the entry bundles.
@@ -500,6 +565,7 @@ func (s *mySQLSequencer) initDB(ctx context.Context) error {
`CREATE TABLE IF NOT EXISTS IntCoord(
id INT UNSIGNED NOT NULL,
seq BIGINT UNSIGNED NOT NULL,
rootHash TINYBLOB NOT NULL,
PRIMARY KEY (id)
)`); err != nil {
return err
Expand All @@ -514,7 +580,7 @@ func (s *mySQLSequencer) initDB(ctx context.Context) error {
return err
}
if _, err := s.dbPool.ExecContext(ctx,
`INSERT IGNORE INTO IntCoord (id, seq, rootHash) VALUES (0, 0, ?)`, rfc6962.DefaultHasher.EmptyRoot()); err != nil {
return err
}
return nil
@@ -618,9 +684,10 @@ func (s *mySQLSequencer) consumeEntries(ctx context.Context, limit uint64, f con
}()

// Figure out which is the starting index of sequenced entries to start consuming from.
row := tx.QueryRowContext(ctx, "SELECT seq FROM IntCoord WHERE id = ? FOR UPDATE", 0)
row := tx.QueryRowContext(ctx, "SELECT seq, rootHash FROM IntCoord WHERE id = ? FOR UPDATE", 0)
var fromSeq uint64
if err := row.Scan(&fromSeq); err == sql.ErrNoRows {
var rootHash []byte
if err := row.Scan(&fromSeq, &rootHash); err == sql.ErrNoRows {
return false, nil
} else if err != nil {
return false, fmt.Errorf("failed to read IntCoord: %v", err)
@@ -665,13 +732,14 @@ func (s *mySQLSequencer) consumeEntries(ctx context.Context, limit uint64, f con
}

// Call consumeFunc with the entries we've found
newRoot, err := f(ctx, uint64(fromSeq), entries)
if err != nil {
return false, err
}

// consumeFunc was successful, so we can update our coordination row, and delete the row(s) for
// the then consumed entries.
if _, err := tx.ExecContext(ctx, "UPDATE IntCoord SET seq=? WHERE id=?", orderCheck, 0); err != nil {
if _, err := tx.ExecContext(ctx, "UPDATE IntCoord SET seq=?, rootHash=? WHERE id=?", orderCheck, newRoot, 0); err != nil {
return false, fmt.Errorf("update intcoord: %v", err)
}

@@ -691,6 +759,18 @@ func (s *mySQLSequencer) consumeEntries(ctx context.Context, limit uint64, f con
return true, nil
}

// currentTree returns the size and root hash of the currently integrated tree.
func (s *mySQLSequencer) currentTree(ctx context.Context) (uint64, []byte, error) {
row := s.dbPool.QueryRowContext(ctx, "SELECT seq, rootHash FROM IntCoord WHERE id = ?", 0)
var fromSeq uint64
var rootHash []byte
if err := row.Scan(&fromSeq, &rootHash); err != nil {
return 0, nil, fmt.Errorf("failed to read IntCoord: %v", err)
}

return fromSeq, rootHash, nil
}

func placeholder(n int) string {
places := make([]string, n)
for i := 0; i < n; i++ {
@@ -777,3 +857,16 @@ func (s *s3Storage) setObjectIfNoneMatch(ctx context.Context, objName string, da
}
return nil
}

// lastModified returns the time the specified object was last modified, or an error if the object could not be read.
func (s *s3Storage) lastModified(ctx context.Context, obj string) (time.Time, error) {
r, err := s.s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(obj),
})
if err != nil {
return time.Time{}, fmt.Errorf("getObject: failed to create reader for object %q in bucket %q: %w", obj, s.bucket, err)
}

return *r.LastModified, r.Body.Close()
}
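
Since only the timestamp is used, an alternative would be HeadObject, which stats the object without streaming its body. A possible sketch, not part of this change (lastModifiedViaHead is a hypothetical name; note that HeadObject reports a missing key as types.NotFound rather than types.NoSuchKey, so the caller's errors.As check would need adjusting):

func (s *s3Storage) lastModifiedViaHead(ctx context.Context, obj string) (time.Time, error) {
	r, err := s.s3Client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(obj),
	})
	if err != nil {
		return time.Time{}, fmt.Errorf("headObject: failed to stat object %q in bucket %q: %w", obj, s.bucket, err)
	}
	return *r.LastModified, nil
}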