Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sequence and integrate foundation to MySQL storage implementation #87

Merged
merged 7 commits into from
Jul 25, 2024
21 changes: 15 additions & 6 deletions cmd/example-mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package main

import (
"context"
"crypto/sha256"
"database/sql"
"flag"
"fmt"
Expand Down Expand Up @@ -160,12 +159,22 @@ func main() {
w.WriteHeader(http.StatusInternalServerError)
return
}
defer r.Body.Close()

id := sha256.Sum256(b)
_ = tessera.NewEntry(b, tessera.WithIdentity(id[:]))
defer func() {
if err := r.Body.Close(); err != nil {
klog.Warningf("/add: %v", err)
}
}()

// TODO: Add entry to log and return assigned index.
idx, err := storage.Add(r.Context(), tessera.NewEntry(b))
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if _, err = w.Write([]byte(fmt.Sprintf("%d", idx))); err != nil {
klog.Errorf("/add: %v", err)
return
}
})

if err := http.ListenAndServe(*listen, http.DefaultServeMux); err != nil {
Expand Down
164 changes: 157 additions & 7 deletions storage/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,58 @@ package mysql
import (
"context"
"database/sql"
"fmt"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/storage"
"golang.org/x/sync/errgroup"
"k8s.io/klog/v2"
)

const (
	// SQL statements used against the Checkpoint, Subtree and TiledLeaves tables.
	selectCheckpointByIDSQL         = "SELECT `note` FROM `Checkpoint` WHERE `id` = ?"
	replaceCheckpointSQL            = "REPLACE INTO `Checkpoint` (`id`, `note`) VALUES (?, ?)"
	selectSubtreeByLevelAndIndexSQL = "SELECT `nodes` FROM `Subtree` WHERE `level` = ? AND `index` = ?"
	replaceSubtreeSQL               = "REPLACE INTO `Subtree` (`level`, `index`, `nodes`) VALUES (?, ?, ?)"
	selectTiledLeavesSQL            = "SELECT `data` FROM `TiledLeaves` WHERE `tile_index` = ?"
	replaceTiledLeavesSQL           = "REPLACE INTO `TiledLeaves` (`tile_index`, `data`) VALUES (?, ?)"

	// checkpointID is the `id` value passed to selectCheckpointByIDSQL when
	// reading the checkpoint row.
	checkpointID = 0
	// defaultEntryBundleSize is the BatchMaxSize used by New unless overridden by options.
	defaultEntryBundleSize = 256
	// defaultQueueMaxAge is the BatchMaxAge used by New unless overridden by options.
	defaultQueueMaxAge = time.Second
)

// Storage is a MySQL-based storage implementation for Tessera.
type Storage struct {
db *sql.DB
db *sql.DB
queue *storage.Queue

newCheckpoint tessera.NewCPFunc
newCheckpoint tessera.NewCPFunc
parseCheckpoint tessera.ParseCPFunc
}

// New creates a new instance of the MySQL-based Storage.
func New(ctx context.Context, db *sql.DB, opts ...func(*tessera.StorageOptions)) (*Storage, error) {
opt := tessera.ResolveStorageOptions(nil, opts...)
opt := tessera.ResolveStorageOptions(&tessera.StorageOptions{
BatchMaxAge: defaultQueueMaxAge,
BatchMaxSize: defaultEntryBundleSize,
}, opts...)
s := &Storage{
db: db,
newCheckpoint: opt.NewCP,
db: db,
newCheckpoint: opt.NewCP,
parseCheckpoint: opt.ParseCP,
}
if err := s.db.Ping(); err != nil {
klog.Errorf("Failed to ping database: %v", err)
return nil, err
}

s.queue = storage.NewQueue(ctx, opt.BatchMaxAge, opt.BatchMaxSize, s.sequenceBatch)

// Initialize checkpoint if there is no row in the Checkpoint table.
if _, err := s.ReadCheckpoint(ctx); err != nil {
if err != sql.ErrNoRows {
Expand Down Expand Up @@ -111,7 +128,7 @@ func (s *Storage) writeCheckpoint(ctx context.Context, tx *sql.Tx, size uint64,
return nil
}

// ReadTile returns a full tile or a partial tile at the given level and index.
// ReadTile returns a full tile or a partial tile at the given level, index and width.
//
// TODO: Handle the following scenarios:
// 1. Full tile request with full tile output: Return full tile.
Expand Down Expand Up @@ -142,6 +159,16 @@ func (s *Storage) ReadTile(ctx context.Context, level, index, width uint64) ([]b
return tile, nil
}

// writeTile stores nodes as the tile for the given level and index by running
// replaceSubtreeSQL inside the supplied transaction. Any execution error is
// logged and then returned to the caller unchanged.
func (s *Storage) writeTile(ctx context.Context, tx *sql.Tx, level, index uint64, nodes []byte) error {
	_, execErr := tx.ExecContext(ctx, replaceSubtreeSQL, level, index, nodes)
	if execErr != nil {
		klog.Errorf("Failed to execute replaceSubtreeSQL: %v", execErr)
	}
	return execErr
}

// ReadEntryBundle returns the log entries at the given index.
//
// TODO: Handle the following scenarios:
Expand All @@ -159,3 +186,126 @@ func (s *Storage) ReadEntryBundle(ctx context.Context, index uint64) ([]byte, er
var entryBundle []byte
return entryBundle, row.Scan(&entryBundle)
}

// Add submits entry to the storage queue and returns the result of invoking
// the function handed back by the queue: the index assigned to the entry, or
// an error.
//
// TODO(#21): Return index if the value is already stored.
func (s *Storage) Add(ctx context.Context, entry *tessera.Entry) (uint64, error) {
	result := s.queue.Add(ctx, entry)
	return result()
}

// sequenceBatch writes the entries from the provided batch into the entry bundle files of the log.
//
// This func starts filling entries bundles at the next available slot in the log, ensuring that the
// sequenced entries are contiguous from the zeroth entry (i.e left-hand dense).
// We try to minimise the number of partially complete entry bundles by writing entries in chunks rather
// than one-by-one.
//
// TODO(#21): Separate sequencing and integration for better performance.
func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) error {
// Return when there is no entry to sequence.
if len(entries) == 0 {
return nil
}

// Get a Tx for making transaction requests.
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
// Defer a rollback in case anything fails.
defer func() {
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
klog.Errorf("Failed to rollback in sequenceBatch: %v", err)
}
}()

// Get tree size from checkpoint.
row := tx.QueryRowContext(ctx, selectCheckpointByIDSQL, checkpointID)
roger2hk marked this conversation as resolved.
Show resolved Hide resolved
if err := row.Err(); err != nil {
return err
}
var rawCheckpoint []byte
if err := row.Scan(&rawCheckpoint); err != nil {
return fmt.Errorf("failed to read checkpoint: %w", err)
}
checkpoint, err := s.parseCheckpoint(rawCheckpoint)
if err != nil {
return fmt.Errorf("failed to verify checkpoint: %w", err)
}
roger2hk marked this conversation as resolved.
Show resolved Hide resolved

// Integrate the new entries into the entry bundle (TiledLeaves table) and tile (Subtree table).
if err := s.integrate(ctx, tx, checkpoint.Size, entries); err != nil {
return fmt.Errorf("failed to integrate: %w", err)
}

// Commit the transaction.
return tx.Commit()
}

// integrate incorporates the provided entries into the log starting at fromSeq.
func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, entries []*tessera.Entry) error {
tb := storage.NewTreeBuilder(func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) {
r := make([]*api.HashTile, len(tileIDs))
errG := errgroup.Group{}
for i, id := range tileIDs {
i := i
id := id
errG.Go(func() error {
// TODO: Refactor the query into a function.
roger2hk marked this conversation as resolved.
Show resolved Hide resolved
row := tx.QueryRowContext(ctx, selectSubtreeByLevelAndIndexSQL, id.Level, id.Index)
if err := row.Err(); err != nil {
return err
}

var tile []byte
if err := row.Scan(&tile); err != nil {
return err
}
t := &api.HashTile{}
if err := t.UnmarshalText(tile); err != nil {
return fmt.Errorf("api.HashTile.unmarshalText(level: %d, index: %d): %w", id.Level, id.Index, err)
}
r[i] = t
return nil
})
}
if err := errG.Wait(); err != nil {
return nil, err
}
return r, nil
})

// TODO(#21): Add sequenced entries to entry bundles.

sequencedEntries := make([]storage.SequencedEntry, len(entries))
// Assign provisional sequence numbers to entries.
// We need to do this here in order to support serialisations which include the log position.
for i, e := range entries {
sequencedEntries[i] = storage.SequencedEntry{
BundleData: e.MarshalBundleData(fromSeq + uint64(i)),
LeafHash: e.LeafHash(),
}
}

newSize, newRoot, tiles, err := tb.Integrate(ctx, fromSeq, sequencedEntries)
if err != nil {
return fmt.Errorf("tb.Integrate: %v", err)
}
for k, v := range tiles {
nodes, err := v.MarshalText()
if err != nil {
return err
}

if err := s.writeTile(ctx, tx, uint64(k.Level), k.Index, nodes); err != nil {
return fmt.Errorf("failed to set tile(%v): %w", k, err)
}
}

// Write new checkpoint.
if err := s.writeCheckpoint(ctx, tx, newSize, newRoot); err != nil {
return fmt.Errorf("writeCheckpoint: %w", err)
}
return nil
}
Loading