Skip to content

Commit

Permalink
Add sequence and integrate foundation to MySQL storage implementation (#87)
Browse files Browse the repository at this point in the history

* Add sequence and integrate foundation to MySQL storage implementation

* Remove the no-op `tessera.WithIdentity` in example-mysql

* Add issue ID to TODOs in MySQL storage implementation

* Add default values for max age and size for entry bundle queue

* Add TODO for getting the tree size without parsing and verifying the checkpoints every time

* Add a TODO to refactor how we fetch tiles during integrate

* Use `SELECT ... FOR UPDATE` row-level locking on checkpoint during sequencing
  • Loading branch information
roger2hk authored Jul 25, 2024
1 parent cf69091 commit 8c5a7ee
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 17 deletions.
21 changes: 15 additions & 6 deletions cmd/example-mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package main

import (
"context"
"crypto/sha256"
"database/sql"
"flag"
"fmt"
Expand Down Expand Up @@ -160,12 +159,22 @@ func main() {
w.WriteHeader(http.StatusInternalServerError)
return
}
defer r.Body.Close()

id := sha256.Sum256(b)
_ = tessera.NewEntry(b, tessera.WithIdentity(id[:]))
defer func() {
if err := r.Body.Close(); err != nil {
klog.Warningf("/add: %v", err)
}
}()

// TODO: Add entry to log and return assigned index.
idx, err := storage.Add(r.Context(), tessera.NewEntry(b))
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if _, err = w.Write([]byte(fmt.Sprintf("%d", idx))); err != nil {
klog.Errorf("/add: %v", err)
return
}
})

if err := http.ListenAndServe(*listen, http.DefaultServeMux); err != nil {
Expand Down
175 changes: 164 additions & 11 deletions storage/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,59 @@ package mysql
import (
"context"
"database/sql"
"fmt"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/storage"
"golang.org/x/sync/errgroup"
"k8s.io/klog/v2"
)

const (
selectCheckpointByIDSQL = "SELECT `note` FROM `Checkpoint` WHERE `id` = ?"
replaceCheckpointSQL = "REPLACE INTO `Checkpoint` (`id`, `note`) VALUES (?, ?)"
selectSubtreeByLevelAndIndexSQL = "SELECT `nodes` FROM `Subtree` WHERE `level` = ? AND `index` = ?"
selectTiledLeavesSQL = "SELECT `data` FROM `TiledLeaves` WHERE `tile_index` = ?"
selectCheckpointByIDSQL = "SELECT `note` FROM `Checkpoint` WHERE `id` = ?"
selectCheckpointByIDForUpdateSQL = selectCheckpointByIDSQL + " FOR UPDATE"
replaceCheckpointSQL = "REPLACE INTO `Checkpoint` (`id`, `note`) VALUES (?, ?)"
selectSubtreeByLevelAndIndexSQL = "SELECT `nodes` FROM `Subtree` WHERE `level` = ? AND `index` = ?"
replaceSubtreeSQL = "REPLACE INTO `Subtree` (`level`, `index`, `nodes`) VALUES (?, ?, ?)"
selectTiledLeavesSQL = "SELECT `data` FROM `TiledLeaves` WHERE `tile_index` = ?"
replaceTiledLeavesSQL = "REPLACE INTO `TiledLeaves` (`tile_index`, `data`) VALUES (?, ?)"

checkpointID = 0
checkpointID = 0
defaultEntryBundleSize = 256
defaultQueueMaxAge = time.Second
)

// Storage is a MySQL-based storage implementation for Tessera.
type Storage struct {
db *sql.DB
db *sql.DB
queue *storage.Queue

newCheckpoint tessera.NewCPFunc
newCheckpoint tessera.NewCPFunc
parseCheckpoint tessera.ParseCPFunc
}

// New creates a new instance of the MySQL-based Storage.
func New(ctx context.Context, db *sql.DB, opts ...func(*tessera.StorageOptions)) (*Storage, error) {
opt := tessera.ResolveStorageOptions(nil, opts...)
opt := tessera.ResolveStorageOptions(&tessera.StorageOptions{
BatchMaxAge: defaultQueueMaxAge,
BatchMaxSize: defaultEntryBundleSize,
}, opts...)
s := &Storage{
db: db,
newCheckpoint: opt.NewCP,
db: db,
newCheckpoint: opt.NewCP,
parseCheckpoint: opt.ParseCP,
}
if err := s.db.Ping(); err != nil {
klog.Errorf("Failed to ping database: %v", err)
return nil, err
}

s.queue = storage.NewQueue(ctx, opt.BatchMaxAge, opt.BatchMaxSize, s.sequenceBatch)

// Initialize checkpoint if there is no row in the Checkpoint table.
if _, err := s.ReadCheckpoint(ctx); err != nil {
if err != sql.ErrNoRows {
Expand Down Expand Up @@ -111,7 +129,7 @@ func (s *Storage) writeCheckpoint(ctx context.Context, tx *sql.Tx, size uint64,
return nil
}

// ReadTile returns a full tile or a partial tile at the given level and index.
// ReadTile returns a full tile or a partial tile at the given level, index and width.
//
// TODO: Handle the following scenarios:
// 1. Full tile request with full tile output: Return full tile.
Expand Down Expand Up @@ -142,6 +160,16 @@ func (s *Storage) ReadTile(ctx context.Context, level, index, width uint64) ([]b
return tile, nil
}

// writeTile stores the serialized nodes for the tile identified by level and
// index by executing replaceSubtreeSQL on the supplied transaction.
//
// The transaction is owned by the caller: writeTile neither commits nor rolls
// it back. If the statement fails, the error is logged and returned unchanged.
func (s *Storage) writeTile(ctx context.Context, tx *sql.Tx, level, index uint64, nodes []byte) error {
	_, err := tx.ExecContext(ctx, replaceSubtreeSQL, level, index, nodes)
	if err == nil {
		return nil
	}

	klog.Errorf("Failed to execute replaceSubtreeSQL: %v", err)
	return err
}

// ReadEntryBundle returns the log entries at the given index.
//
// TODO: Handle the following scenarios:
Expand All @@ -159,3 +187,128 @@ func (s *Storage) ReadEntryBundle(ctx context.Context, index uint64) ([]byte, er
var entryBundle []byte
return entryBundle, row.Scan(&entryBundle)
}

// Add hands entry to the storage queue and immediately invokes the function
// the queue returns, passing that function's (index, error) result straight
// back to the caller.
func (s *Storage) Add(ctx context.Context, entry *tessera.Entry) (uint64, error) {
	// TODO(#21): Return index if the value is already stored.

	result := s.queue.Add(ctx, entry)
	return result()
}

// sequenceBatch sequences and integrates a batch of entries inside a single
// database transaction.
//
// The checkpoint row is read with "SELECT ... FOR UPDATE" and parsed to obtain
// the current tree size; the entries are then integrated starting at that size,
// so they follow on directly from what the checkpoint already covers. The
// transaction is committed only when every step succeeds; on any earlier return
// the deferred rollback discards the work done so far. An empty batch is a
// no-op and does not open a transaction.
//
// TODO(#21): Separate sequencing and integration for better performance.
func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) error {
	if len(entries) == 0 {
		// Nothing to sequence.
		return nil
	}

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Always attempt a rollback on the way out. Once Commit has completed the
	// rollback reports sql.ErrTxDone, which is expected and so not logged.
	defer func() {
		rbErr := tx.Rollback()
		if rbErr == nil || rbErr == sql.ErrTxDone {
			return
		}
		klog.Errorf("Failed to rollback in sequenceBatch: %v", rbErr)
	}()

	// Read the checkpoint under a row-level lock ("SELECT ... FOR UPDATE") to get the tree size.
	// TODO(#21): Optimize how we get the tree size without parsing and verifying the checkpoints every time.
	cpRow := tx.QueryRowContext(ctx, selectCheckpointByIDForUpdateSQL, checkpointID)
	if err := cpRow.Err(); err != nil {
		return err
	}
	var note []byte
	if err := cpRow.Scan(&note); err != nil {
		return fmt.Errorf("failed to read checkpoint: %w", err)
	}
	cp, err := s.parseCheckpoint(note)
	if err != nil {
		return fmt.Errorf("failed to verify checkpoint: %w", err)
	}

	// Fold the new entries into the log, beginning at the current tree size.
	if err := s.integrate(ctx, tx, cp.Size, entries); err != nil {
		return fmt.Errorf("failed to integrate: %w", err)
	}

	return tx.Commit()
}

// integrate incorporates the provided entries into the log starting at fromSeq.
//
// All reads and writes go through tx, which the caller owns; integrate neither
// commits nor rolls it back. The steps are:
//  1. Build a TreeBuilder whose tile fetcher loads each requested tile from
//     the Subtree table (one query per tile, run concurrently).
//  2. Give entry i the provisional sequence number fromSeq+i and collect its
//     bundle data and leaf hash.
//  3. Run the TreeBuilder, then write every tile it returns via writeTile.
//  4. Write a new checkpoint for the resulting size and root.
//
// Any failure is returned wrapped with %w so callers can inspect the
// underlying error with errors.Is / errors.As.
func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, entries []*tessera.Entry) error {
	tb := storage.NewTreeBuilder(func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) {
		// Results are stored by position so the output order matches tileIDs.
		r := make([]*api.HashTile, len(tileIDs))

		// TODO(#21): Refactor the following to fully utilise the MySQL for fetching multiple tiles in one query with the same ordering.
		errG := errgroup.Group{}
		for i, id := range tileIDs {
			// Per-iteration copies so each goroutine captures its own values.
			i := i
			id := id
			errG.Go(func() error {
				row := tx.QueryRowContext(ctx, selectSubtreeByLevelAndIndexSQL, id.Level, id.Index)
				if err := row.Err(); err != nil {
					return err
				}

				var tile []byte
				// NOTE(review): a tile with no row makes Scan return sql.ErrNoRows,
				// which fails the whole fetch — confirm the TreeBuilder never asks
				// for tiles that have not been written yet.
				if err := row.Scan(&tile); err != nil {
					return err
				}
				t := &api.HashTile{}
				if err := t.UnmarshalText(tile); err != nil {
					return fmt.Errorf("api.HashTile.unmarshalText(level: %d, index: %d): %w", id.Level, id.Index, err)
				}
				r[i] = t
				return nil
			})
		}
		if err := errG.Wait(); err != nil {
			return nil, err
		}
		return r, nil
	})

	// TODO(#21): Add sequenced entries to entry bundles.

	sequencedEntries := make([]storage.SequencedEntry, len(entries))
	// Assign provisional sequence numbers to entries.
	// We need to do this here in order to support serialisations which include the log position.
	for i, e := range entries {
		sequencedEntries[i] = storage.SequencedEntry{
			BundleData: e.MarshalBundleData(fromSeq + uint64(i)),
			LeafHash:   e.LeafHash(),
		}
	}

	newSize, newRoot, tiles, err := tb.Integrate(ctx, fromSeq, sequencedEntries)
	if err != nil {
		// Wrap with %w (not %v) so the cause stays reachable via errors.Is/As,
		// consistent with the other wraps in this file.
		return fmt.Errorf("tb.Integrate: %w", err)
	}
	for k, v := range tiles {
		nodes, err := v.MarshalText()
		if err != nil {
			return err
		}

		if err := s.writeTile(ctx, tx, uint64(k.Level), k.Index, nodes); err != nil {
			return fmt.Errorf("failed to set tile(%v): %w", k, err)
		}
	}

	// Write new checkpoint.
	if err := s.writeCheckpoint(ctx, tx, newSize, newRoot); err != nil {
		return fmt.Errorf("writeCheckpoint: %w", err)
	}
	return nil
}

0 comments on commit 8c5a7ee

Please sign in to comment.