diff --git a/cmd/example-mysql/main.go b/cmd/example-mysql/main.go index f157dbb3..4ba5461f 100644 --- a/cmd/example-mysql/main.go +++ b/cmd/example-mysql/main.go @@ -17,7 +17,6 @@ package main import ( "context" - "crypto/sha256" "database/sql" "flag" "fmt" @@ -160,12 +159,22 @@ func main() { w.WriteHeader(http.StatusInternalServerError) return } - defer r.Body.Close() - - id := sha256.Sum256(b) - _ = tessera.NewEntry(b, tessera.WithIdentity(id[:])) + defer func() { + if err := r.Body.Close(); err != nil { + klog.Warningf("/add: %v", err) + } + }() - // TODO: Add entry to log and return assigned index. + idx, err := storage.Add(r.Context(), tessera.NewEntry(b)) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) + return + } + if _, err = w.Write([]byte(fmt.Sprintf("%d", idx))); err != nil { + klog.Errorf("/add: %v", err) + return + } }) if err := http.ListenAndServe(*listen, http.DefaultServeMux); err != nil { diff --git a/storage/mysql/mysql.go b/storage/mysql/mysql.go index 97ac6c48..80d1692e 100644 --- a/storage/mysql/mysql.go +++ b/storage/mysql/mysql.go @@ -18,41 +18,59 @@ package mysql import ( "context" "database/sql" + "fmt" + "time" _ "github.com/go-sql-driver/mysql" "github.com/transparency-dev/merkle/rfc6962" tessera "github.com/transparency-dev/trillian-tessera" + "github.com/transparency-dev/trillian-tessera/api" + "github.com/transparency-dev/trillian-tessera/storage" + "golang.org/x/sync/errgroup" "k8s.io/klog/v2" ) const ( - selectCheckpointByIDSQL = "SELECT `note` FROM `Checkpoint` WHERE `id` = ?" - replaceCheckpointSQL = "REPLACE INTO `Checkpoint` (`id`, `note`) VALUES (?, ?)" - selectSubtreeByLevelAndIndexSQL = "SELECT `nodes` FROM `Subtree` WHERE `level` = ? AND `index` = ?" - selectTiledLeavesSQL = "SELECT `data` FROM `TiledLeaves` WHERE `tile_index` = ?" + selectCheckpointByIDSQL = "SELECT `note` FROM `Checkpoint` WHERE `id` = ?" + selectCheckpointByIDForUpdateSQL = selectCheckpointByIDSQL + " FOR UPDATE" + replaceCheckpointSQL = "REPLACE INTO `Checkpoint` (`id`, `note`) VALUES (?, ?)" + selectSubtreeByLevelAndIndexSQL = "SELECT `nodes` FROM `Subtree` WHERE `level` = ? AND `index` = ?" + replaceSubtreeSQL = "REPLACE INTO `Subtree` (`level`, `index`, `nodes`) VALUES (?, ?, ?)" + selectTiledLeavesSQL = "SELECT `data` FROM `TiledLeaves` WHERE `tile_index` = ?" + replaceTiledLeavesSQL = "REPLACE INTO `TiledLeaves` (`tile_index`, `data`) VALUES (?, ?)" - checkpointID = 0 + checkpointID = 0 + defaultEntryBundleSize = 256 + defaultQueueMaxAge = time.Second ) // Storage is a MySQL-based storage implementation for Tessera. type Storage struct { - db *sql.DB + db *sql.DB + queue *storage.Queue - newCheckpoint tessera.NewCPFunc + newCheckpoint tessera.NewCPFunc + parseCheckpoint tessera.ParseCPFunc } // New creates a new instance of the MySQL-based Storage. func New(ctx context.Context, db *sql.DB, opts ...func(*tessera.StorageOptions)) (*Storage, error) { - opt := tessera.ResolveStorageOptions(nil, opts...) + opt := tessera.ResolveStorageOptions(&tessera.StorageOptions{ + BatchMaxAge: defaultQueueMaxAge, + BatchMaxSize: defaultEntryBundleSize, + }, opts...) s := &Storage{ - db: db, - newCheckpoint: opt.NewCP, + db: db, + newCheckpoint: opt.NewCP, + parseCheckpoint: opt.ParseCP, } if err := s.db.Ping(); err != nil { klog.Errorf("Failed to ping database: %v", err) return nil, err } + s.queue = storage.NewQueue(ctx, opt.BatchMaxAge, opt.BatchMaxSize, s.sequenceBatch) + // Initialize checkpoint if there is no row in the Checkpoint table. if _, err := s.ReadCheckpoint(ctx); err != nil { if err != sql.ErrNoRows { @@ -111,7 +129,7 @@ func (s *Storage) writeCheckpoint(ctx context.Context, tx *sql.Tx, size uint64, return nil } -// ReadTile returns a full tile or a partial tile at the given level and index. +// ReadTile returns a full tile or a partial tile at the given level, index and width. // // TODO: Handle the following scenarios: // 1. Full tile request with full tile output: Return full tile. @@ -142,6 +160,16 @@ func (s *Storage) ReadTile(ctx context.Context, level, index, width uint64) ([]b return tile, nil } +// writeTile replaces the tile nodes at the given level and index. +func (s *Storage) writeTile(ctx context.Context, tx *sql.Tx, level, index uint64, nodes []byte) error { + if _, err := tx.ExecContext(ctx, replaceSubtreeSQL, level, index, nodes); err != nil { + klog.Errorf("Failed to execute replaceSubtreeSQL: %v", err) + return err + } + + return nil +} + // ReadEntryBundle returns the log entries at the given index. // // TODO: Handle the following scenarios: @@ -159,3 +187,128 @@ func (s *Storage) ReadEntryBundle(ctx context.Context, index uint64) ([]byte, er var entryBundle []byte return entryBundle, row.Scan(&entryBundle) } + +// Add is the entrypoint for adding entries to a sequencing log. +func (s *Storage) Add(ctx context.Context, entry *tessera.Entry) (uint64, error) { + // TODO(#21): Return index if the value is already stored. + + return s.queue.Add(ctx, entry)() +} + +// sequenceBatch writes the entries from the provided batch into the entry bundle files of the log. +// +// This func starts filling entries bundles at the next available slot in the log, ensuring that the +// sequenced entries are contiguous from the zeroth entry (i.e left-hand dense). +// We try to minimise the number of partially complete entry bundles by writing entries in chunks rather +// than one-by-one. +// +// TODO(#21): Separate sequencing and integration for better performance. +func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) error { + // Return when there is no entry to sequence. + if len(entries) == 0 { + return nil + } + + // Get a Tx for making transaction requests. + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + // Defer a rollback in case anything fails. + defer func() { + if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { + klog.Errorf("Failed to rollback in sequenceBatch: %v", err) + } + }() + + // Get tree size from checkpoint. Note that "SELECT ... FOR UPDATE" is used for row-level locking. + // TODO(#21): Optimize how we get the tree size without parsing and verifying the checkpoints every time. + row := tx.QueryRowContext(ctx, selectCheckpointByIDForUpdateSQL, checkpointID) + if err := row.Err(); err != nil { + return err + } + var rawCheckpoint []byte + if err := row.Scan(&rawCheckpoint); err != nil { + return fmt.Errorf("failed to read checkpoint: %w", err) + } + checkpoint, err := s.parseCheckpoint(rawCheckpoint) + if err != nil { + return fmt.Errorf("failed to verify checkpoint: %w", err) + } + + // Integrate the new entries into the entry bundle (TiledLeaves table) and tile (Subtree table). + if err := s.integrate(ctx, tx, checkpoint.Size, entries); err != nil { + return fmt.Errorf("failed to integrate: %w", err) + } + + // Commit the transaction. + return tx.Commit() +} + +// integrate incorporates the provided entries into the log starting at fromSeq. +func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, entries []*tessera.Entry) error { + tb := storage.NewTreeBuilder(func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) { + r := make([]*api.HashTile, len(tileIDs)) + + // TODO(#21): Refactor the following to fully utilise the MySQL for fetching multiple tiles in one query with the same ordering. + errG := errgroup.Group{} + for i, id := range tileIDs { + i := i + id := id + errG.Go(func() error { + row := tx.QueryRowContext(ctx, selectSubtreeByLevelAndIndexSQL, id.Level, id.Index) + if err := row.Err(); err != nil { + return err + } + + var tile []byte + if err := row.Scan(&tile); err != nil { + return err + } + t := &api.HashTile{} + if err := t.UnmarshalText(tile); err != nil { + return fmt.Errorf("api.HashTile.unmarshalText(level: %d, index: %d): %w", id.Level, id.Index, err) + } + r[i] = t + return nil + }) + } + if err := errG.Wait(); err != nil { + return nil, err + } + return r, nil + }) + + // TODO(#21): Add sequenced entries to entry bundles. + + sequencedEntries := make([]storage.SequencedEntry, len(entries)) + // Assign provisional sequence numbers to entries. + // We need to do this here in order to support serialisations which include the log position. + for i, e := range entries { + sequencedEntries[i] = storage.SequencedEntry{ + BundleData: e.MarshalBundleData(fromSeq + uint64(i)), + LeafHash: e.LeafHash(), + } + } + + newSize, newRoot, tiles, err := tb.Integrate(ctx, fromSeq, sequencedEntries) + if err != nil { + return fmt.Errorf("tb.Integrate: %v", err) + } + for k, v := range tiles { + nodes, err := v.MarshalText() + if err != nil { + return err + } + + if err := s.writeTile(ctx, tx, uint64(k.Level), k.Index, nodes); err != nil { + return fmt.Errorf("failed to set tile(%v): %w", k, err) + } + } + + // Write new checkpoint. + if err := s.writeCheckpoint(ctx, tx, newSize, newRoot); err != nil { + return fmt.Errorf("writeCheckpoint: %w", err) + } + return nil +}