Skip to content

Commit

Permalink
[MySQL] Separate integration from publishing checkpoints (#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter authored Dec 2, 2024
1 parent ef536ca commit b513925
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 67 deletions.
6 changes: 5 additions & 1 deletion cmd/conformance/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (
initSchemaPath = flag.String("init_schema_path", "", "Location of the schema file if database initialization is needed")
listen = flag.String("listen", ":2024", "Address:port to listen on")
privateKeyPath = flag.String("private_key_path", "", "Location of private key file")
publishInterval = flag.Duration("publish_interval", 3*time.Second, "How frequently to publish updated checkpoints")
additionalPrivateKeyPaths = []string{}
)

Expand All @@ -59,7 +60,10 @@ func main() {
noteSigner, additionalSigners := createSignersOrDie()

// Initialise the Tessera MySQL storage
storage, err := mysql.New(ctx, db, tessera.WithCheckpointSigner(noteSigner, additionalSigners...))
storage, err := mysql.New(ctx, db,
tessera.WithCheckpointSigner(noteSigner, additionalSigners...),
tessera.WithCheckpointInterval(*publishInterval),
)
if err != nil {
klog.Exitf("Failed to create new MySQL storage: %v", err)
}
Expand Down
217 changes: 158 additions & 59 deletions storage/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,31 @@ import (
"errors"
"fmt"
"strings"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
options "github.com/transparency-dev/trillian-tessera/internal/options"
"github.com/transparency-dev/trillian-tessera/internal/parse"
"github.com/transparency-dev/trillian-tessera/storage/internal"
storage "github.com/transparency-dev/trillian-tessera/storage/internal"
"k8s.io/klog/v2"
)

const (
selectCheckpointByIDSQL = "SELECT `note` FROM `Checkpoint` WHERE `id` = ?"
selectCheckpointByIDSQL = "SELECT `note`, `published_at` FROM `Checkpoint` WHERE `id` = ?"
selectCheckpointByIDForUpdateSQL = selectCheckpointByIDSQL + " FOR UPDATE"
replaceCheckpointSQL = "REPLACE INTO `Checkpoint` (`id`, `note`) VALUES (?, ?)"
replaceCheckpointSQL = "REPLACE INTO `Checkpoint` (`id`, `note`, `published_at`) VALUES (?, ?, ?)"
selectTreeStateByIDSQL = "SELECT `size`, `root` FROM `TreeState` WHERE `id` = ?"
selectTreeStateByIDForUpdateSQL = selectTreeStateByIDSQL + " FOR UPDATE"
replaceTreeStateSQL = "REPLACE INTO `TreeState` (`id`, `size`, `root`) VALUES (?, ?, ?)"
selectSubtreeByLevelAndIndexSQL = "SELECT `nodes` FROM `Subtree` WHERE `level` = ? AND `index` = ?"
replaceSubtreeSQL = "REPLACE INTO `Subtree` (`level`, `index`, `nodes`) VALUES (?, ?, ?)"
selectTiledLeavesSQL = "SELECT `data` FROM `TiledLeaves` WHERE `tile_index` = ?"
replaceTiledLeavesSQL = "REPLACE INTO `TiledLeaves` (`tile_index`, `data`) VALUES (?, ?)"

checkpointID = 0
treeStateID = 0
entryBundleSize = 256
)

Expand All @@ -52,6 +56,8 @@ type Storage struct {
queue *storage.Queue

newCheckpoint options.NewCPFunc

cpUpdated chan struct{}
}

// New creates a new instance of the MySQL-based Storage.
Expand All @@ -61,6 +67,7 @@ func New(ctx context.Context, db *sql.DB, opts ...func(*options.StorageOptions))
s := &Storage{
db: db,
newCheckpoint: opt.NewCP,
cpUpdated: make(chan struct{}, 1),
}
if err := s.db.Ping(); err != nil {
klog.Errorf("Failed to ping database: %v", err)
Expand All @@ -72,36 +79,66 @@ func New(ctx context.Context, db *sql.DB, opts ...func(*options.StorageOptions))

s.queue = storage.NewQueue(ctx, opt.BatchMaxAge, opt.BatchMaxSize, s.sequenceBatch)

// Initialize checkpoint if there is no row in the Checkpoint table.
checkpoint, err := s.ReadCheckpoint(ctx)
if err := s.maybeInitTree(ctx); err != nil {
return nil, fmt.Errorf("maybeInitTree: %v", err)
}

go func(ctx context.Context, i time.Duration) {
t := time.NewTicker(i)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-s.cpUpdated:
case <-t.C:
}
if err := s.publishCheckpoint(ctx, i); err != nil {
klog.Warningf("publishCheckpoint: %v", err)
}
}
}(ctx, opt.CheckpointInterval)
return s, nil
}

// maybeInitTree will insert an initial "empty tree" row into the
// TreeState table iff no row already exists.
//
// This method doesn't also publish this new empty tree as a Checkpoint,
// rather, such a checkpoint will be published asynchronously by the
// same mechanism used to publish future checkpoints. Although in _this_
// case it would be expected to happen in very short order given that it's
// likely that no row currently exists in the Checkpoints table either.
func (s *Storage) maybeInitTree(ctx context.Context) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
klog.Errorf("Failed to read checkpoint: %v", err)
return nil, err
return fmt.Errorf("being tx init tree state: %v", err)
}
if checkpoint == nil {
klog.Infof("Initializing checkpoint")
// Get a Tx for making transaction requests.
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
defer func() {
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
klog.Errorf("Failed to rollback in write initial tree state: %v", err)
}
// Defer a rollback in case anything fails.
defer func() {
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
klog.Errorf("Failed to rollback in write initial checkpoint: %v", err)
}
}()
if err := s.writeCheckpoint(ctx, tx, 0, rfc6962.DefaultHasher.EmptyRoot()); err != nil {
klog.Errorf("Failed to write initial checkpoint: %v", err)
return nil, err
}()

treeState, err := s.readTreeState(ctx, tx)
if err != nil {
klog.Errorf("Failed to read tree state: %v", err)
return err
}
if treeState == nil {
klog.Infof("Initializing tree state")
if err := s.writeTreeState(ctx, tx, 0, rfc6962.DefaultHasher.EmptyRoot()); err != nil {
klog.Errorf("Failed to write initial tree state: %v", err)
return err
}
// Commit the transaction.
// Only need to commit if we've actually initialised the tree state, otherwise we'll
// rely on the defer'd rollback to tidy up.
if err := tx.Commit(); err != nil {
return nil, err
return fmt.Errorf("commit init tree state: %v", err)
}
s.cpUpdated <- struct{}{}
}

return s, nil
return nil
}

// ReadCheckpoint returns the latest stored checkpoint.
Expand All @@ -113,24 +150,84 @@ func (s *Storage) ReadCheckpoint(ctx context.Context) ([]byte, error) {
}

var checkpoint []byte
if err := row.Scan(&checkpoint); err != nil {
var at int64
if err := row.Scan(&checkpoint, &at); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
return nil, fmt.Errorf("scan checkpoint: %v", err)
}
return checkpoint, nil
}

// writeCheckpoint stores the log signed checkpoint.
func (s *Storage) writeCheckpoint(ctx context.Context, tx *sql.Tx, size uint64, rootHash []byte) error {
rawCheckpoint, err := s.newCheckpoint(size, rootHash)
// publishCheckpoint creates a new checkpoint for the given size and root hash, and stores it in the
// Checkpoint table.
func (s *Storage) publishCheckpoint(ctx context.Context, interval time.Duration) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %v", err)
}
defer func() {
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
klog.Warningf("publishCheckpoint rollback failed: %v", err)
}
}()

var note string
var at int64
if err := tx.QueryRowContext(ctx, selectCheckpointByIDForUpdateSQL, checkpointID).Scan(&note, &at); err != nil && !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("scan checkpoint: %v", err)
}
if time.Since(time.UnixMilli(at)) < interval {
// Too soon, try again later.
klog.V(1).Info("skipping publish - too soon")
return nil
}

treeState, err := s.readTreeState(ctx, tx)
if err != nil {
return fmt.Errorf("readTreeState: %v", err)
}

rawCheckpoint, err := s.newCheckpoint(treeState.size, treeState.root)
if err != nil {
return err
}

if _, err := tx.ExecContext(ctx, replaceCheckpointSQL, checkpointID, rawCheckpoint, time.Now().UnixMilli()); err != nil {
return err
}

if _, err := tx.ExecContext(ctx, replaceCheckpointSQL, checkpointID, rawCheckpoint); err != nil {
klog.Errorf("Failed to execute replaceCheckpointSQL: %v", err)
return tx.Commit()
}

type treeState struct {
size uint64
root []byte
}

// readTreeState returns the currently stored tree state information.
// If there is no stored tree state, nil is returned with no error.
func (s *Storage) readTreeState(ctx context.Context, tx *sql.Tx) (*treeState, error) {
row := tx.QueryRowContext(ctx, selectTreeStateByIDForUpdateSQL, treeStateID)
if err := row.Err(); err != nil {
return nil, err
}

r := &treeState{}
if err := row.Scan(&r.size, &r.root); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, fmt.Errorf("scan tree state: %v", err)
}
return r, nil
}

// writeTreeState updates the TreeState table with the new tree state information.
func (s *Storage) writeTreeState(ctx context.Context, tx *sql.Tx, size uint64, rootHash []byte) error {
if _, err := tx.ExecContext(ctx, replaceTreeStateSQL, treeStateID, size, rootHash); err != nil {
klog.Errorf("Failed to execute replaceTreeStateSQL: %v", err)
return err
}

Expand Down Expand Up @@ -158,7 +255,7 @@ func (s *Storage) ReadTile(ctx context.Context, level, index, width uint64) ([]b
return nil, nil
}

return nil, err
return nil, fmt.Errorf("scan tile: %v", err)
}

// Return nil when returning a partial tile on a full tile request.
Expand Down Expand Up @@ -200,7 +297,7 @@ func (s *Storage) ReadEntryBundle(ctx context.Context, index, treeSize uint64) (
return nil, nil
}

return nil, err
return nil, fmt.Errorf("scan entry bundle: %v", err)
}

return entryBundle, nil
Expand Down Expand Up @@ -237,7 +334,7 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e
// Get a Tx for making transaction requests.
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
return fmt.Errorf("begin tx: %v", err)
}
// Defer a rollback in case anything fails.
defer func() {
Expand All @@ -246,28 +343,30 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e
}
}()

// Get tree size from checkpoint. Note that "SELECT ... FOR UPDATE" is used for row-level locking.
// TODO(#21): Optimize how we get the tree size without parsing and verifying the checkpoints every time.
row := tx.QueryRowContext(ctx, selectCheckpointByIDForUpdateSQL, checkpointID)
// Get tree size. Note that "SELECT ... FOR UPDATE" is used for row-level locking.
row := tx.QueryRowContext(ctx, selectTreeStateByIDForUpdateSQL, treeStateID)
if err := row.Err(); err != nil {
return err
return fmt.Errorf("select tree state: %v", err)
}
var rawCheckpoint []byte
if err := row.Scan(&rawCheckpoint); err != nil {
return fmt.Errorf("failed to read checkpoint: %w", err)
}
_, size, err := parse.CheckpointUnsafe(rawCheckpoint)
if err != nil {
return fmt.Errorf("failed to parse checkpoint: %w", err)
state := treeState{}
if err := row.Scan(&state.size, &state.root); err != nil {
return fmt.Errorf("failed to read tree state: %w", err)
}

// Integrate the new entries into the entry bundle (TiledLeaves table) and tile (Subtree table).
if err := s.integrate(ctx, tx, size, entries); err != nil {
if err := s.integrate(ctx, tx, state.size, entries); err != nil {
return fmt.Errorf("failed to integrate: %w", err)
}

// Commit the transaction.
return tx.Commit()
err = tx.Commit()

select {
case s.cpUpdated <- struct{}{}:
default:
}

return err
}

// integrate incorporates the provided entries into the log starting at fromSeq.
Expand Down Expand Up @@ -306,17 +405,17 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent
for rows.Next() {
var tile []byte
if err := rows.Scan(&tile); err != nil {
return nil, fmt.Errorf("rows.Scan: %w", err)
return nil, fmt.Errorf("scan subtree tile: %w", err)
}
t := &api.HashTile{}
if err := t.UnmarshalText(tile); err != nil {
return nil, fmt.Errorf("api.HashTile.unmarshalText: %w", err)
return nil, fmt.Errorf("unmarshal tile: %w", err)
}
hashTiles[i] = t
i++
}
if err = rows.Err(); err != nil {
return nil, fmt.Errorf("rows.Err: %w", err)
return nil, fmt.Errorf("rows error while fetching subtrees: %w", err)
}

return hashTiles, nil
Expand All @@ -340,23 +439,23 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent
if entriesInBundle > 0 {
row := tx.QueryRowContext(ctx, selectTiledLeavesSQL, bundleIndex)
if err := row.Err(); err != nil {
return err
return fmt.Errorf("query tiled leaves: %v", err)
}

var partialEntryBundle []byte
if err := row.Scan(&partialEntryBundle); err != nil {
return fmt.Errorf("row.Scan: %w", err)
return fmt.Errorf("scan partial entry bundle: %w", err)
}

if _, err := bundleWriter.Write(partialEntryBundle); err != nil {
return fmt.Errorf("bundleWriter: %w", err)
return fmt.Errorf("write partial entry bundle: %w", err)
}
}

// Add new entries to the bundle.
for _, e := range sequencedEntries {
if _, err := bundleWriter.Write(e.BundleData); err != nil {
return fmt.Errorf("bundleWriter.Write: %w", err)
return fmt.Errorf("write bundle data: %w", err)
}
entriesInBundle++

Expand Down Expand Up @@ -396,8 +495,8 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent
}
}

// Write new checkpoint.
if err := s.writeCheckpoint(ctx, tx, newSize, newRoot); err != nil {
// Write new tree state.
if err := s.writeTreeState(ctx, tx, newSize, newRoot); err != nil {
return fmt.Errorf("writeCheckpoint: %w", err)
}
return nil
Expand Down
Loading

0 comments on commit b513925

Please sign in to comment.