Skip to content

Commit

Permalink
Improve writeCheckpoint in MySQL storage implementation (#82)
Browse files Browse the repository at this point in the history
* Improve `writeCheckpoint` in MySQL storage implementation

* Remove unused `WriteCheckpointFunc`

* Use `rfc6962.DefaultHasher.EmptyRoot()` as the initial checkpoint root hash
  • Loading branch information
roger2hk authored Jul 24, 2024
1 parent 6d9b335 commit 999c97d
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 17 deletions.
14 changes: 13 additions & 1 deletion cmd/example-mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"fmt"
"io"
"net/http"
"os"
"strconv"
"strings"
"time"

tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/storage/mysql"
"golang.org/x/mod/sumdb/note"
"k8s.io/klog/v2"
)

Expand All @@ -38,6 +40,7 @@ var (
dbMaxOpenConns = flag.Int("db_max_open_conns", 64, "")
dbMaxIdleConns = flag.Int("db_max_idle_conns", 64, "")
listen = flag.String("listen", ":2024", "Address:port to listen on")
privateKeyPath = flag.String("private_key_path", "", "Location of private key file")
)

func main() {
Expand All @@ -53,7 +56,16 @@ func main() {
db.SetMaxOpenConns(*dbMaxOpenConns)
db.SetMaxIdleConns(*dbMaxIdleConns)

storage, err := mysql.New(ctx, db)
rawPrivateKey, err := os.ReadFile(*privateKeyPath)
if err != nil {
klog.Exitf("Failed to read private key file %q: %v", *privateKeyPath, err)
}
noteSigner, err := note.NewSigner(string(rawPrivateKey))
if err != nil {
klog.Exitf("Failed to create new signer: %v", err)
}

storage, err := mysql.New(ctx, db, tessera.WithCheckpointSigner(noteSigner))
if err != nil {
klog.Exitf("Failed to create new MySQL storage: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ require (
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/crypto v0.25.0
golang.org/x/net v0.27.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.7.0
Expand Down
43 changes: 28 additions & 15 deletions storage/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"database/sql"

_ "github.com/go-sql-driver/mysql"
"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
"k8s.io/klog/v2"
)

Expand All @@ -35,12 +37,16 @@ const (
// Storage is a MySQL-based storage implementation for Tessera.
type Storage struct {
db *sql.DB

newCheckpoint tessera.NewCPFunc
}

// New creates a new instance of the MySQL-based Storage.
func New(ctx context.Context, db *sql.DB) (*Storage, error) {
func New(ctx context.Context, db *sql.DB, opts ...func(*tessera.StorageOptions)) (*Storage, error) {
opt := tessera.ResolveStorageOptions(nil, opts...)
s := &Storage{
db: db,
db: db,
newCheckpoint: opt.NewCP,
}
if err := s.db.Ping(); err != nil {
klog.Errorf("Failed to ping database: %v", err)
Expand All @@ -55,10 +61,25 @@ func New(ctx context.Context, db *sql.DB) (*Storage, error) {
}

klog.Infof("Initializing checkpoint")
if err := s.writeCheckpoint(ctx, []byte("")); err != nil {
// Get a Tx for making transaction requests.
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
// Defer a rollback in case anything fails.
defer func() {
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
klog.Errorf("Failed to rollback in write initial checkpoint: %v", err)
}
}()
if err := s.writeCheckpoint(ctx, tx, 0, rfc6962.DefaultHasher.EmptyRoot()); err != nil {
klog.Errorf("Failed to write initial checkpoint: %v", err)
return nil, err
}
// Commit the transaction.
if err := tx.Commit(); err != nil {
return nil, err
}
}

return s, nil
Expand All @@ -75,27 +96,19 @@ func (s *Storage) ReadCheckpoint(ctx context.Context) ([]byte, error) {
return checkpoint, row.Scan(&checkpoint)
}

// writeCheckpoint stores a raw log checkpoint.
func (s *Storage) writeCheckpoint(ctx context.Context, rawCheckpoint []byte) error {
// Get a Tx for making transaction requests.
tx, err := s.db.BeginTx(ctx, nil)
// writeCheckpoint stores the log signed checkpoint.
func (s *Storage) writeCheckpoint(ctx context.Context, tx *sql.Tx, size uint64, rootHash []byte) error {
rawCheckpoint, err := s.newCheckpoint(size, rootHash)
if err != nil {
return err
}
// Defer a rollback in case anything fails.
defer func() {
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
klog.Errorf("Failed to rollback in writeCheckpoint: %v", err)
}
}()

if _, err := tx.ExecContext(ctx, replaceCheckpointSQL, checkpointID, rawCheckpoint); err != nil {
klog.Errorf("Failed to execute replaceCheckpointSQL: %v", err)
return err
}

// Commit the transaction.
return tx.Commit()
return nil
}

// ReadTile returns a full tile or a partial tile at the given level and index.
Expand Down

0 comments on commit 999c97d

Please sign in to comment.