Skip to content

Commit

Permalink
Tesseraify POSIX log using mixins (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter authored Jul 24, 2024
1 parent 35f9635 commit 13a1d5e
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 347 deletions.
5 changes: 0 additions & 5 deletions api/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"bytes"
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
)

Expand Down Expand Up @@ -67,10 +66,6 @@ type EntryBundle struct {
Entries [][]byte
}

func (t *EntryBundle) MarshalText() ([]byte, error) {
return nil, errors.New("unimplemented")
}

// UnmarshalText implements encoding/TextUnmarshaler and reads EntryBundles
// which are encoded using the tlog-tiles spec.
func (t *EntryBundle) UnmarshalText(raw []byte) error {
Expand Down
51 changes: 30 additions & 21 deletions cmd/example-posix/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"os"
"path/filepath"
"sync"
"time"

"golang.org/x/mod/sumdb/note"

"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/storage/posix"
"k8s.io/klog/v2"

Expand All @@ -49,6 +51,8 @@ func main() {
klog.InitFlags(nil)
flag.Parse()

ctx := context.Background()

// Read log public key from file or environment variable
var pubKey string
var err error
Expand Down Expand Up @@ -109,6 +113,7 @@ func main() {
if err != nil {
klog.Exitf("Failed to glob entries %q: %q", *entries, err)
}
klog.V(1).Infof("toAdd: %v", toAdd)
if len(toAdd) == 0 {
klog.Exit("Sequence must be run with at least one valid entry")
}
Expand All @@ -131,7 +136,7 @@ func main() {
}
return cp.Size, cp.Hash, nil
}
st := posix.New(*storageDir, readCP, writeCP)
st := posix.New(ctx, *storageDir, readCP, tessera.WithCheckpointSignerVerifier(s, v), tessera.WithBatching(256, time.Second))

// sequence entries

Expand All @@ -141,37 +146,41 @@ func main() {
// sequence numbers assigned to the data from the provided input files.
type entryInfo struct {
name string
b []byte
e *tessera.Entry
}
entries := make(chan entryInfo, 100)
entryChan := make(chan entryInfo, 100)
go func() {
for _, fp := range toAdd {
b, err := os.ReadFile(fp)
if err != nil {
klog.Exitf("Failed to read entry file %q: %q", fp, err)
}
entries <- entryInfo{name: fp, b: b}
entryChan <- entryInfo{name: fp, e: tessera.NewEntry(b)}
}
close(entries)
close(entryChan)
}()

for entry := range entries {
// ask storage to sequence
dupe := false
seq, err := st.Sequence(context.Background(), entry.b)
if err != nil {
if errors.Is(err, posix.ErrDupeLeaf) {
dupe = true
} else {
klog.Exitf("failed to sequence %q: %q", entry.name, err)
numWorkers := 256
if l := len(toAdd); l < numWorkers {
numWorkers = l
}

wg := sync.WaitGroup{}
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for entry := range entryChan {
// ask storage to sequence
seq, err := st.Add(context.Background(), entry.e)
if err != nil {
klog.Exitf("failed to sequence %q: %q", entry.name, err)
}
klog.Infof("%d: %v", seq, entry.name)
}
}
l := fmt.Sprintf("%d: %v", seq, entry.name)
if dupe {
l += " (dupe)"
}
klog.Info(l)
}()
}
wg.Wait()
}

func getKeyFile(path string) (string, error) {
Expand Down
127 changes: 89 additions & 38 deletions storage/posix/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package posix

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -23,9 +24,10 @@ import (
"sync"
"syscall"

"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
"github.com/transparency-dev/trillian-tessera/storage"
"k8s.io/klog/v2"
)

Expand All @@ -38,15 +40,15 @@ const (
// It leverages the POSIX atomic operations.
type Storage struct {
sync.Mutex
path string
pool *Pool
path string
queue *storage.Queue

cpFile *os.File

curTree CurrentTreeFunc
newTree NewTreeFunc

curSize uint64
newCP tessera.NewCPFunc
}

// NewTreeFunc is the signature of a function which receives information about newly integrated trees.
Expand All @@ -56,18 +58,20 @@ type NewTreeFunc func(size uint64, root []byte) error
type CurrentTreeFunc func() (uint64, []byte, error)

// New creates a new POSIX storage.
func New(path string, curTree CurrentTreeFunc, newTree NewTreeFunc) *Storage {
func New(ctx context.Context, path string, curTree func() (uint64, []byte, error), opts ...func(*tessera.StorageOptions)) *Storage {
curSize, _, err := curTree()
if err != nil {
panic(err)
}
opt := tessera.ResolveStorageOptions(nil, opts...)

r := &Storage{
path: path,
curSize: curSize,
curTree: curTree,
newTree: newTree,
newCP: opt.NewCP,
}
r.pool = NewPool(r.sequenceBatch)
r.queue = storage.NewQueue(ctx, opt.BatchMaxAge, opt.BatchMaxSize, r.sequenceBatch)

return r
}
Expand Down Expand Up @@ -112,10 +116,10 @@ func (s *Storage) unlockCP() error {
return nil
}

// Sequence commits to sequence numbers for an entry
// Add commits to sequence numbers for an entry
// Returns the sequence number assigned to the first entry in the batch, or an error.
func (s *Storage) Sequence(ctx context.Context, b []byte) (uint64, error) {
return s.pool.Add(b)
func (s *Storage) Add(ctx context.Context, e *tessera.Entry) (uint64, error) {
return s.queue.Add(ctx, e)()
}

// GetEntryBundle retrieves the Nth entries bundle for a log of the given size.
Expand All @@ -129,7 +133,7 @@ func (s *Storage) GetEntryBundle(ctx context.Context, index, logSize uint64) ([]
// sequenced entries are contiguous from the zeroth entry (i.e left-hand dense).
// We try to minimise the number of partially complete entry bundles by writing entries in chunks rather
// than one-by-one.
func (s *Storage) sequenceBatch(ctx context.Context, batch Batch) (uint64, error) {
func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) error {
// Double locking:
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
// - The POSIX `LockCP()` ensures that distinct tasks are serialised.
Expand All @@ -146,92 +150,139 @@ func (s *Storage) sequenceBatch(ctx context.Context, batch Batch) (uint64, error

size, _, err := s.curTree()
if err != nil {
return 0, err
return err
}
s.curSize = size
klog.V(1).Infof("Sequencing from %d", s.curSize)

if len(batch.Entries) == 0 {
return 0, nil
if len(entries) == 0 {
return nil
}
currTile := api.EntryBundle{}
newSize := s.curSize + uint64(len(batch.Entries))
currTile := &bytes.Buffer{}
newSize := s.curSize + uint64(len(entries))
seq := s.curSize
bundleIndex, entriesInBundle := seq/uint64(256), seq%uint64(256)
if entriesInBundle > 0 {
// If the latest bundle is partial, we need to read the data it contains in for our newer, larger, bundle.
part, err := s.GetEntryBundle(ctx, bundleIndex, s.curSize)
if err != nil {
return 0, err
return err
}
if err := currTile.UnmarshalText(part); err != nil {
return 0, fmt.Errorf("failed to parse existing partial bundle for %d: %v", bundleIndex, err)
if _, err := currTile.Write(part); err != nil {
return fmt.Errorf("failed to write partial bundle into buffer: %v", err)
}
}
writeBundle := func(bundleIndex uint64) error {
bf := filepath.Join(s.path, layout.EntriesPath(bundleIndex, newSize))
if err := os.MkdirAll(filepath.Dir(bf), dirPerm); err != nil {
return fmt.Errorf("failed to make entries directory structure: %w", err)
}
data, err := currTile.MarshalText()
if err != nil {
return fmt.Errorf("failed to serialize bundle: %v", err)
}
if err := createExclusive(bf, data); err != nil {
if err := createExclusive(bf, currTile.Bytes()); err != nil {
if !errors.Is(os.ErrExist, err) {
return err
}
}
return nil
}

seqEntries := make([]storage.SequencedEntry, 0, len(entries))
// Add new entries to the bundle
for _, e := range batch.Entries {
currTile.Entries = append(currTile.Entries, e)
for i, e := range entries {
bundleData := e.MarshalBundleData(seq + uint64(i))
if _, err := currTile.Write(bundleData); err != nil {
return fmt.Errorf("failed to write entry %d to currTile: %v", i, err)
}
seqEntries = append(seqEntries, storage.SequencedEntry{
BundleData: bundleData,
LeafHash: e.LeafHash(),
})

entriesInBundle++
if entriesInBundle == uint64(256) {
// This bundle is full, so we need to write it out...
// ... and prepare the next entry bundle for any remaining entries in the batch
if err := writeBundle(bundleIndex); err != nil {
return 0, err
return err
}
bundleIndex++
entriesInBundle = 0
currTile = api.EntryBundle{}
currTile = &bytes.Buffer{}
}
}
// If we have a partial bundle remaining once we've added all the entries from the batch,
// this needs writing out too.
if entriesInBundle > 0 {
if err := writeBundle(bundleIndex); err != nil {
return 0, err
return err
}
}

// For simplicitly, well in-line the integration of these new entries into the Merkle structure too.
return seq, s.doIntegrate(ctx, seq, batch.Entries)
if err := s.doIntegrate(ctx, seq, seqEntries); err != nil {
klog.Errorf("Integrate failed: %v", err)
return err
}
return nil

}

// doIntegrate handles integrating new entries into the log, and updating the checkpoint.
func (s *Storage) doIntegrate(ctx context.Context, from uint64, batch [][]byte) error {
newSize, newRoot, err := Integrate(ctx, from, batch, s, rfc6962.DefaultHasher)
func (s *Storage) doIntegrate(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) error {
tb := storage.NewTreeBuilder(func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) {
n, err := s.getTiles(ctx, tileIDs, treeSize)
if err != nil {
return nil, fmt.Errorf("getTiles: %w", err)
}
return n, nil
})

newSize, newRoot, tiles, err := tb.Integrate(ctx, fromSeq, entries)
if err != nil {
klog.Errorf("Failed to integrate: %v", err)
return err
klog.Errorf("Integrate: %v", err)
return fmt.Errorf("Integrate: %v", err)
}
if err := s.newTree(newSize, newRoot); err != nil {
return fmt.Errorf("newTree: %v", err)
for k, v := range tiles {
if err := s.StoreTile(ctx, uint64(k.Level), k.Index, newSize, v); err != nil {
return fmt.Errorf("failed to set tile(%v): %v", k, err)
}
}

klog.Infof("New CP: %d, %x", newSize, newRoot)
if s.newCP != nil {
cpRaw, err := s.newCP(newSize, newRoot)
if err != nil {
return fmt.Errorf("newCP: %v", err)
}
if err := WriteCheckpoint(s.path, cpRaw); err != nil {
return fmt.Errorf("failed to write new checkpoint: %v", err)
}
}

return nil
}

func (s *Storage) getTiles(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) {
r := make([]*api.HashTile, 0, len(tileIDs))
for _, id := range tileIDs {
t, err := s.GetTile(ctx, id.Level, id.Index, treeSize)
if err != nil {
return nil, err
}
r = append(r, t)
}
return r, nil
}

// GetTile returns the tile at the given tile-level and tile-index.
// If no complete tile exists at that location, it will attempt to find a
// partial tile for the given tree size at that location.
func (s *Storage) GetTile(_ context.Context, level, index, logSize uint64) (*api.HashTile, error) {
p := filepath.Join(s.path, layout.TilePath(level, index, logSize))
t, err := os.ReadFile(p)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("failed to read tile at %q: %w", p, err)
if errors.Is(err, os.ErrNotExist) {
// We'll signal to higher levels that it wasn't found by retuning a nil for this tile.
return nil, nil
}
return nil, err
}
Expand Down
Loading

0 comments on commit 13a1d5e

Please sign in to comment.