From b51392576cf4c5452751cb175010d2472c27a49c Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Mon, 2 Dec 2024 17:34:46 +0000 Subject: [PATCH] [MySQL] Separate integration from publishing checkpoints (#344) --- cmd/conformance/mysql/main.go | 6 +- storage/mysql/mysql.go | 217 +++++++++++++++++++++++++--------- storage/mysql/mysql_test.go | 16 ++- storage/mysql/schema.sql | 18 ++- 4 files changed, 190 insertions(+), 67 deletions(-) diff --git a/cmd/conformance/mysql/main.go b/cmd/conformance/mysql/main.go index ad084075..7751597b 100644 --- a/cmd/conformance/mysql/main.go +++ b/cmd/conformance/mysql/main.go @@ -40,6 +40,7 @@ var ( initSchemaPath = flag.String("init_schema_path", "", "Location of the schema file if database initialization is needed") listen = flag.String("listen", ":2024", "Address:port to listen on") privateKeyPath = flag.String("private_key_path", "", "Location of private key file") + publishInterval = flag.Duration("publish_interval", 3*time.Second, "How frequently to publish updated checkpoints") additionalPrivateKeyPaths = []string{} ) @@ -59,7 +60,10 @@ func main() { noteSigner, additionalSigners := createSignersOrDie() // Initialise the Tessera MySQL storage - storage, err := mysql.New(ctx, db, tessera.WithCheckpointSigner(noteSigner, additionalSigners...)) + storage, err := mysql.New(ctx, db, + tessera.WithCheckpointSigner(noteSigner, additionalSigners...), + tessera.WithCheckpointInterval(*publishInterval), + ) if err != nil { klog.Exitf("Failed to create new MySQL storage: %v", err) } diff --git a/storage/mysql/mysql.go b/storage/mysql/mysql.go index 99de03bb..15e236af 100644 --- a/storage/mysql/mysql.go +++ b/storage/mysql/mysql.go @@ -22,27 +22,31 @@ import ( "errors" "fmt" "strings" + "time" _ "github.com/go-sql-driver/mysql" "github.com/transparency-dev/merkle/rfc6962" tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/api" options 
"github.com/transparency-dev/trillian-tessera/internal/options" - "github.com/transparency-dev/trillian-tessera/internal/parse" - "github.com/transparency-dev/trillian-tessera/storage/internal" + storage "github.com/transparency-dev/trillian-tessera/storage/internal" "k8s.io/klog/v2" ) const ( - selectCheckpointByIDSQL = "SELECT `note` FROM `Checkpoint` WHERE `id` = ?" + selectCheckpointByIDSQL = "SELECT `note`, `published_at` FROM `Checkpoint` WHERE `id` = ?" selectCheckpointByIDForUpdateSQL = selectCheckpointByIDSQL + " FOR UPDATE" - replaceCheckpointSQL = "REPLACE INTO `Checkpoint` (`id`, `note`) VALUES (?, ?)" + replaceCheckpointSQL = "REPLACE INTO `Checkpoint` (`id`, `note`, `published_at`) VALUES (?, ?, ?)" + selectTreeStateByIDSQL = "SELECT `size`, `root` FROM `TreeState` WHERE `id` = ?" + selectTreeStateByIDForUpdateSQL = selectTreeStateByIDSQL + " FOR UPDATE" + replaceTreeStateSQL = "REPLACE INTO `TreeState` (`id`, `size`, `root`) VALUES (?, ?, ?)" selectSubtreeByLevelAndIndexSQL = "SELECT `nodes` FROM `Subtree` WHERE `level` = ? AND `index` = ?" replaceSubtreeSQL = "REPLACE INTO `Subtree` (`level`, `index`, `nodes`) VALUES (?, ?, ?)" selectTiledLeavesSQL = "SELECT `data` FROM `TiledLeaves` WHERE `tile_index` = ?" replaceTiledLeavesSQL = "REPLACE INTO `TiledLeaves` (`tile_index`, `data`) VALUES (?, ?)" checkpointID = 0 + treeStateID = 0 entryBundleSize = 256 ) @@ -52,6 +56,8 @@ type Storage struct { queue *storage.Queue newCheckpoint options.NewCPFunc + + cpUpdated chan struct{} } // New creates a new instance of the MySQL-based Storage. 
@@ -61,6 +67,7 @@ func New(ctx context.Context, db *sql.DB, opts ...func(*options.StorageOptions)) s := &Storage{ db: db, newCheckpoint: opt.NewCP, + cpUpdated: make(chan struct{}, 1), } if err := s.db.Ping(); err != nil { klog.Errorf("Failed to ping database: %v", err) @@ -72,36 +79,66 @@ func New(ctx context.Context, db *sql.DB, opts ...func(*options.StorageOptions)) s.queue = storage.NewQueue(ctx, opt.BatchMaxAge, opt.BatchMaxSize, s.sequenceBatch) - // Initialize checkpoint if there is no row in the Checkpoint table. - checkpoint, err := s.ReadCheckpoint(ctx) + if err := s.maybeInitTree(ctx); err != nil { + return nil, fmt.Errorf("maybeInitTree: %v", err) + } + + go func(ctx context.Context, i time.Duration) { + t := time.NewTicker(i) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-s.cpUpdated: + case <-t.C: + } + if err := s.publishCheckpoint(ctx, i); err != nil { + klog.Warningf("publishCheckpoint: %v", err) + } + } + }(ctx, opt.CheckpointInterval) + return s, nil +} + +// maybeInitTree will insert an initial "empty tree" row into the +// TreeState table iff no row already exists. +// +// This method doesn't also publish this new empty tree as a Checkpoint, +// rather, such a checkpoint will be published asynchronously by the +// same mechanism used to publish future checkpoints. Although in _this_ +// case it would be expected to happen in very short order given that it's +// likely that no row currently exists in the Checkpoints table either. +func (s *Storage) maybeInitTree(ctx context.Context) error { + tx, err := s.db.BeginTx(ctx, nil) if err != nil { - klog.Errorf("Failed to read checkpoint: %v", err) - return nil, err + return fmt.Errorf("being tx init tree state: %v", err) } - if checkpoint == nil { - klog.Infof("Initializing checkpoint") - // Get a Tx for making transaction requests. 
- tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return nil, err + defer func() { + if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { + klog.Errorf("Failed to rollback in write initial tree state: %v", err) } - // Defer a rollback in case anything fails. - defer func() { - if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { - klog.Errorf("Failed to rollback in write initial checkpoint: %v", err) - } - }() - if err := s.writeCheckpoint(ctx, tx, 0, rfc6962.DefaultHasher.EmptyRoot()); err != nil { - klog.Errorf("Failed to write initial checkpoint: %v", err) - return nil, err + }() + + treeState, err := s.readTreeState(ctx, tx) + if err != nil { + klog.Errorf("Failed to read tree state: %v", err) + return err + } + if treeState == nil { + klog.Infof("Initializing tree state") + if err := s.writeTreeState(ctx, tx, 0, rfc6962.DefaultHasher.EmptyRoot()); err != nil { + klog.Errorf("Failed to write initial tree state: %v", err) + return err } - // Commit the transaction. + // Only need to commit if we've actually initialised the tree state, otherwise we'll + // rely on the defer'd rollback to tidy up. if err := tx.Commit(); err != nil { - return nil, err + return fmt.Errorf("commit init tree state: %v", err) } + s.cpUpdated <- struct{}{} } - - return s, nil + return nil } // ReadCheckpoint returns the latest stored checkpoint. @@ -113,24 +150,84 @@ func (s *Storage) ReadCheckpoint(ctx context.Context) ([]byte, error) { } var checkpoint []byte - if err := row.Scan(&checkpoint); err != nil { + var at int64 + if err := row.Scan(&checkpoint, &at); err != nil { if err == sql.ErrNoRows { return nil, nil } - return nil, err + return nil, fmt.Errorf("scan checkpoint: %v", err) } return checkpoint, nil } -// writeCheckpoint stores the log signed checkpoint. 
-func (s *Storage) writeCheckpoint(ctx context.Context, tx *sql.Tx, size uint64, rootHash []byte) error { - rawCheckpoint, err := s.newCheckpoint(size, rootHash) +// publishCheckpoint creates a new checkpoint for the given size and root hash, and stores it in the +// Checkpoint table. +func (s *Storage) publishCheckpoint(ctx context.Context, interval time.Duration) error { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("begin tx: %v", err) + } + defer func() { + if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { + klog.Warningf("publishCheckpoint rollback failed: %v", err) + } + }() + + var note string + var at int64 + if err := tx.QueryRowContext(ctx, selectCheckpointByIDForUpdateSQL, checkpointID).Scan(&note, &at); err != nil && !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("scan checkpoint: %v", err) + } + if time.Since(time.UnixMilli(at)) < interval { + // Too soon, try again later. + klog.V(1).Info("skipping publish - too soon") + return nil + } + + treeState, err := s.readTreeState(ctx, tx) if err != nil { + return fmt.Errorf("readTreeState: %v", err) + } + + rawCheckpoint, err := s.newCheckpoint(treeState.size, treeState.root) + if err != nil { + return err + } + + if _, err := tx.ExecContext(ctx, replaceCheckpointSQL, checkpointID, rawCheckpoint, time.Now().UnixMilli()); err != nil { return err } - if _, err := tx.ExecContext(ctx, replaceCheckpointSQL, checkpointID, rawCheckpoint); err != nil { - klog.Errorf("Failed to execute replaceCheckpointSQL: %v", err) + return tx.Commit() +} + +type treeState struct { + size uint64 + root []byte +} + +// readTreeState returns the currently stored tree state information. +// If there is no stored tree state, nil is returned with no error. 
+func (s *Storage) readTreeState(ctx context.Context, tx *sql.Tx) (*treeState, error) { + row := tx.QueryRowContext(ctx, selectTreeStateByIDForUpdateSQL, treeStateID) + if err := row.Err(); err != nil { + return nil, err + } + + r := &treeState{} + if err := row.Scan(&r.size, &r.root); err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, fmt.Errorf("scan tree state: %v", err) + } + return r, nil +} + +// writeTreeState updates the TreeState table with the new tree state information. +func (s *Storage) writeTreeState(ctx context.Context, tx *sql.Tx, size uint64, rootHash []byte) error { + if _, err := tx.ExecContext(ctx, replaceTreeStateSQL, treeStateID, size, rootHash); err != nil { + klog.Errorf("Failed to execute replaceTreeStateSQL: %v", err) return err } @@ -158,7 +255,7 @@ func (s *Storage) ReadTile(ctx context.Context, level, index, width uint64) ([]b return nil, nil } - return nil, err + return nil, fmt.Errorf("scan tile: %v", err) } // Return nil when returning a partial tile on a full tile request. @@ -200,7 +297,7 @@ func (s *Storage) ReadEntryBundle(ctx context.Context, index, treeSize uint64) ( return nil, nil } - return nil, err + return nil, fmt.Errorf("scan entry bundle: %v", err) } return entryBundle, nil @@ -237,7 +334,7 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e // Get a Tx for making transaction requests. tx, err := s.db.BeginTx(ctx, nil) if err != nil { - return err + return fmt.Errorf("begin tx: %v", err) } // Defer a rollback in case anything fails. defer func() { @@ -246,28 +343,30 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e } }() - // Get tree size from checkpoint. Note that "SELECT ... FOR UPDATE" is used for row-level locking. - // TODO(#21): Optimize how we get the tree size without parsing and verifying the checkpoints every time. - row := tx.QueryRowContext(ctx, selectCheckpointByIDForUpdateSQL, checkpointID) + // Get tree size. 
Note that "SELECT ... FOR UPDATE" is used for row-level locking. + row := tx.QueryRowContext(ctx, selectTreeStateByIDForUpdateSQL, treeStateID) if err := row.Err(); err != nil { - return err + return fmt.Errorf("select tree state: %v", err) } - var rawCheckpoint []byte - if err := row.Scan(&rawCheckpoint); err != nil { - return fmt.Errorf("failed to read checkpoint: %w", err) - } - _, size, err := parse.CheckpointUnsafe(rawCheckpoint) - if err != nil { - return fmt.Errorf("failed to parse checkpoint: %w", err) + state := treeState{} + if err := row.Scan(&state.size, &state.root); err != nil { + return fmt.Errorf("failed to read tree state: %w", err) } // Integrate the new entries into the entry bundle (TiledLeaves table) and tile (Subtree table). - if err := s.integrate(ctx, tx, size, entries); err != nil { + if err := s.integrate(ctx, tx, state.size, entries); err != nil { return fmt.Errorf("failed to integrate: %w", err) } // Commit the transaction. - return tx.Commit() + err = tx.Commit() + + select { + case s.cpUpdated <- struct{}{}: + default: + } + + return err } // integrate incorporates the provided entries into the log starting at fromSeq. 
@@ -306,17 +405,17 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent for rows.Next() { var tile []byte if err := rows.Scan(&tile); err != nil { - return nil, fmt.Errorf("rows.Scan: %w", err) + return nil, fmt.Errorf("scan subtree tile: %w", err) } t := &api.HashTile{} if err := t.UnmarshalText(tile); err != nil { - return nil, fmt.Errorf("api.HashTile.unmarshalText: %w", err) + return nil, fmt.Errorf("unmarshal tile: %w", err) } hashTiles[i] = t i++ } if err = rows.Err(); err != nil { - return nil, fmt.Errorf("rows.Err: %w", err) + return nil, fmt.Errorf("rows error while fetching subtrees: %w", err) } return hashTiles, nil @@ -340,23 +439,23 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent if entriesInBundle > 0 { row := tx.QueryRowContext(ctx, selectTiledLeavesSQL, bundleIndex) if err := row.Err(); err != nil { - return err + return fmt.Errorf("query tiled leaves: %v", err) } var partialEntryBundle []byte if err := row.Scan(&partialEntryBundle); err != nil { - return fmt.Errorf("row.Scan: %w", err) + return fmt.Errorf("scan partial entry bundle: %w", err) } if _, err := bundleWriter.Write(partialEntryBundle); err != nil { - return fmt.Errorf("bundleWriter: %w", err) + return fmt.Errorf("write partial entry bundle: %w", err) } } // Add new entries to the bundle. for _, e := range sequencedEntries { if _, err := bundleWriter.Write(e.BundleData); err != nil { - return fmt.Errorf("bundleWriter.Write: %w", err) + return fmt.Errorf("write bundle data: %w", err) } entriesInBundle++ @@ -396,8 +495,8 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent } } - // Write new checkpoint. - if err := s.writeCheckpoint(ctx, tx, newSize, newRoot); err != nil { + // Write new tree state. 
+ if err := s.writeTreeState(ctx, tx, newSize, newRoot); err != nil { return fmt.Errorf("writeCheckpoint: %w", err) } return nil diff --git a/storage/mysql/mysql_test.go b/storage/mysql/mysql_test.go index dc2b669e..6d4d98ef 100644 --- a/storage/mysql/mysql_test.go +++ b/storage/mysql/mysql_test.go @@ -35,6 +35,7 @@ import ( options "github.com/transparency-dev/trillian-tessera/internal/options" "github.com/transparency-dev/trillian-tessera/storage/mysql" "golang.org/x/mod/sumdb/note" + "golang.org/x/sync/errgroup" "k8s.io/klog/v2" ) @@ -98,7 +99,7 @@ func TestMain(m *testing.M) { // `multiStatements=true` in the data source name allows multiple statements in one query. // This is not being used in the actual MySQL storage implementation. func initDatabaseSchema(ctx context.Context) { - dropTablesSQL := "DROP TABLE IF EXISTS `Checkpoint`, `Subtree`, `TiledLeaves`" + dropTablesSQL := "DROP TABLE IF EXISTS `Checkpoint`, `Subtree`, `TiledLeaves`, `TreeState`" rawSchema, err := os.ReadFile("schema.sql") if err != nil { @@ -243,12 +244,15 @@ func TestParallelAdd(t *testing.T) { }, } { t.Run(test.name, func(t *testing.T) { + eG := errgroup.Group{} for i := 0; i < 1024; i++ { - go func() { - if _, err := s.Add(ctx, tessera.NewEntry(test.entry))(); err != nil { - t.Errorf("got err: %v", err) - } - }() + eG.Go(func() error { + _, err := s.Add(ctx, tessera.NewEntry(test.entry))() + return err + }) + } + if err := eG.Wait(); err != nil { + t.Errorf("got err: %v", err) } }) } diff --git a/storage/mysql/schema.sql b/storage/mysql/schema.sql index 485c4fb4..955c5422 100644 --- a/storage/mysql/schema.sql +++ b/storage/mysql/schema.sql @@ -14,12 +14,28 @@ -- MySQL version of the Trillian Tessera database schema. --- "Checkpoint" table stores a single row that records the current state of the log. It is updated after every sequence and integration. +-- "Checkpoint" table stores a single row that records the latest _published_ checkpoint for the log. 
+-- This is stored separately from the TreeState in order to enable publishing of commitments to updated tree states to happen +-- on an independent timeframe to the internal updating of state. CREATE TABLE IF NOT EXISTS `Checkpoint` ( -- id is expected to be always 0 to maintain a maximum of a single row. `id` INT UNSIGNED NOT NULL, -- note is the text signed by one or more keys in the checkpoint format. See https://c2sp.org/tlog-checkpoint and https://c2sp.org/signed-note. `note` MEDIUMBLOB NOT NULL, + -- published_at is the millisecond UNIX timestamp of when this row was written. + `published_at` BIGINT NOT NULL, + PRIMARY KEY(`id`) +); + +-- "TreeState" table stores the current state of the integrated tree. +-- This is not the same thing as a Checkpoint, which is a signed commitment to such a state. +CREATE TABLE IF NOT EXISTS `TreeState` ( + -- id is expected to be always 0 to maintain a maximum of a single row. + `id` INT UNSIGNED NOT NULL, + -- size is the extent of the currently integrated tree. + `size` BIGINT UNSIGNED NOT NULL, + -- root is the root hash of the tree at the size stored in `size`. + `root` TINYBLOB NOT NULL, PRIMARY KEY(`id`) );