From c8604b6db53c11ab380c5b277aeb56bb6b9dc2a5 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Tue, 14 Jan 2025 14:02:12 +0000 Subject: [PATCH] Experimental migration support for POSIX (#441) --- api/layout/paths.go | 66 ++++++ api/layout/paths_test.go | 88 ++++++++ cmd/experimental/migrate/internal/migrate.go | 200 +++++++++++++++++++ cmd/experimental/migrate/posix/main.go | 76 +++++++ go.mod | 1 + go.sum | 2 + storage/posix/files.go | 127 +++++++++++- 7 files changed, 558 insertions(+), 2 deletions(-) create mode 100644 cmd/experimental/migrate/internal/migrate.go create mode 100644 cmd/experimental/migrate/posix/main.go diff --git a/api/layout/paths.go b/api/layout/paths.go index 07d2aa2b..c4948d48 100644 --- a/api/layout/paths.go +++ b/api/layout/paths.go @@ -21,6 +21,7 @@ package layout import ( "fmt" + "iter" "math" "strconv" "strings" @@ -40,6 +41,71 @@ func EntriesPathForLogIndex(seq, logSize uint64) string { return EntriesPath(seq/EntryBundleWidth, PartialTileSize(0, seq, logSize)) } +// Range returns an iterator over a list of RangeInfo structs which describe the bundles/tiles +// necessary to cover the specified range of individual entries/hashes `[from, min(from+N, treeSize) )`. +// +// If from >= treeSize or N == 0, the returned iterator will yield no elements. +func Range(from, N, treeSize uint64) iter.Seq[RangeInfo] { + return func(yield func(RangeInfo) bool) { + // Range is empty if we're entirely beyond the extent of the tree, or we've been asked for zero items. + if from >= treeSize || N == 0 { + return + } + // Truncate range at size of tree if necessary. + if from+N > treeSize { + N = treeSize - from + } + + endInc := from + N - 1 + sIndex := from / EntryBundleWidth + eIndex := endInc / EntryBundleWidth + + for idx := sIndex; idx <= eIndex; idx++ { + ri := RangeInfo{ + Index: idx, + N: EntryBundleWidth, + } + + switch ri.Index { + case sIndex: + ri.Partial = PartialTileSize(0, sIndex, treeSize) + ri.First = uint(from % EntryBundleWidth) + ri.N = uint(EntryBundleWidth) - ri.First + + // Handle corner-case where the range is entirely contained in first bundle, if applicable: + if ri.Index == eIndex { + ri.N = uint((endInc)%EntryBundleWidth) - ri.First + 1 + } + case eIndex: + ri.Partial = PartialTileSize(0, eIndex, treeSize) + ri.N = uint((endInc)%EntryBundleWidth) + 1 + } + + if !yield(ri) { + return + } + } + } +} + +// RangeInfo describes a specific range of elements within a particular bundle/tile. +// +// Usage: +// +// bundleRaw, ... := fetchBundle(..., ri.Index, ri.Partial") +// bundle, ... := parseBundle(bundleRaw) +// elements := bundle.Entries[ri.First : ri.First+ri.N] +type RangeInfo struct { + // Index is the index of the entry bundle/tile in the tree. + Index uint64 + // Partial is the partial size of the bundle/tile, or zero if a full bundle/tile is expected. + Partial uint8 + // First is the offset into the entries contained by the bundle/tile at which the range starts. + First uint + // N is the number of entries, starting at First, which are covered by the range. + N uint +} + // NWithSuffix returns a tiles-spec "N" path, with a partial suffix if p > 0. func NWithSuffix(l, n uint64, p uint8) string { suffix := "" diff --git a/api/layout/paths_test.go b/api/layout/paths_test.go index 093eab69..947e1db7 100644 --- a/api/layout/paths_test.go +++ b/api/layout/paths_test.go @@ -17,6 +17,8 @@ package layout import ( "fmt" "testing" + + "github.com/google/go-cmp/cmp" ) func TestEntriesPathForLogIndex(t *testing.T) { @@ -306,3 +308,89 @@ func TestParseTileLevelIndexPartial(t *testing.T) { }) } } + +func TestRange(t *testing.T) { + for _, test := range []struct { + from, N, treeSize uint64 + desc string + want []RangeInfo + }{ + { + desc: "from beyond extent", + from: 10, + N: 1, + treeSize: 5, + want: []RangeInfo{}, + }, { + desc: "range end beyond extent", + from: 3, + N: 100, + treeSize: 5, + want: []RangeInfo{{Index: 0, First: 3, N: 5 - 3, Partial: 5}}, + }, { + desc: "empty range", + from: 1, + N: 0, + treeSize: 2, + want: []RangeInfo{}, + }, { + desc: "ok: full first bundle", + from: 0, + N: 256, + treeSize: 257, + want: []RangeInfo{{N: 256}}, + }, { + desc: "ok: entire single (partial) bundle", + from: 20, + N: 90, + treeSize: 111, + want: []RangeInfo{{Index: 0, Partial: 111, First: 20, N: 90}}, + }, { + desc: "ok: slice from single bundle with initial offset", + from: 20, + N: 90, + treeSize: 1 << 20, + want: []RangeInfo{{Index: 0, Partial: 0, First: 20, N: 90}}, + }, { + desc: "ok: multiple bundles, first is full, last is truncated", + from: 0, + N: 4*256 + 42, + treeSize: 1 << 20, + want: []RangeInfo{ + {Index: 0, Partial: 0, First: 0, N: 256}, + {Index: 1, Partial: 0, First: 0, N: 256}, + {Index: 2, Partial: 0, First: 0, N: 256}, + {Index: 3, Partial: 0, First: 0, N: 256}, + {Index: 4, Partial: 0, First: 0, N: 42}, + }, + }, { + desc: "ok: multiple bundles, first is offset, last is truncated", + from: 2, + N: 4*256 + 4, + treeSize: 1 << 20, + want: []RangeInfo{ + {Index: 0, Partial: 0, First: 2, N: 256 - 2}, + {Index: 1, Partial: 0, First: 0, N: 256}, + {Index: 2, Partial: 0, First: 0, N: 256}, + {Index: 3, Partial: 0, First: 0, N: 256}, + {Index: 4, Partial: 0, First: 0, N: 6}, + }, + }, { + desc: "ok: offset and trucated from single bundle in middle of tree", + from: 8*256 + 66, + N: 4, + treeSize: 1 << 20, + want: []RangeInfo{{Index: 8, Partial: 0, First: 66, N: 4}}, + }, + } { + t.Run(test.desc, func(t *testing.T) { + i := 0 + for gotInfo := range Range(test.from, test.N, test.treeSize) { + if d := cmp.Diff(test.want[i], gotInfo); d != "" { + t.Fatalf("got results[%d] with diff:\n%s", i, d) + } + i++ + } + }) + } +} diff --git a/cmd/experimental/migrate/internal/migrate.go b/cmd/experimental/migrate/internal/migrate.go new file mode 100644 index 00000000..0bc29358 --- /dev/null +++ b/cmd/experimental/migrate/internal/migrate.go @@ -0,0 +1,200 @@ +// Copyright 2024 The Tessera authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "bytes" + "context" + "fmt" + "sync/atomic" + "time" + + "github.com/avast/retry-go/v4" + "github.com/transparency-dev/merkle/rfc6962" + "github.com/transparency-dev/trillian-tessera/api" + "github.com/transparency-dev/trillian-tessera/api/layout" + "github.com/transparency-dev/trillian-tessera/client" + "golang.org/x/sync/errgroup" + "k8s.io/klog/v2" +) + +// copier controls the migration work. +type copier struct { + // storage is the target we're migrating to. + storage MigrationStorage + getEntries client.EntryBundleFetcherFunc + + // sourceSize is the size of the source log. + sourceSize uint64 + // sourceRoot is the root hash of the source log at sourceSize. + sourceRoot []byte + + // todo contains work items to be completed. + todo chan bundle + + // bundlesMigrated is the number of entry bundles migrated so far. + bundlesMigrated atomic.Uint64 +} + +// bundle represents the address of an individual entry bundle. +type bundle struct { + Index uint64 + Partial uint8 +} + +// MigrationStorage describes the required functionality from the target storage driver. +// +// It's expected that the implementation of this interface will attempt to integrate the entry bundles +// being set as soon as is reasonably possible. It's up to the implementation whether that's done as a +// background task, or is in-line with the call to AwaitIntegration. +type MigrationStorage interface { + // SetEntryBundle is called to store the provided entry bundle bytes at the given coordinates. + // + // Implementations SHOULD treat calls to this function as being idempotent - i.e. attempts to set a previously + // set entry bundle should succeed if the bundle data are identical. + // + // This will be called as many times as necessary to set all entry bundles being migrated, quite likely in parallel. + SetEntryBundle(ctx context.Context, index uint64, partial uint8, bundle []byte) error + // AwaitIntegration should block until the storage driver has received and integrated all outstanding entry bundles implied by sourceSize, + // and return the locally calculated root hash. + // An error should be returned if there is a problem integrating. + AwaitIntegration(ctx context.Context, sourceSize uint64) ([]byte, error) + // State returns the current integrated size and root hash of the local tree. + State(ctx context.Context) (uint64, []byte, error) +} + +// Migrate starts the work of copying sourceSize entries from the source to the target log. +// +// Only the entry bundles are copied as the target storage is expected to integrate them and recalculate the root. +// This is done to ensure the correctness of both the source log as well as the copy process itself. +// +// A call to this function will block until either the copying is done, or an error has occurred. +// It is an error if the resource copying completes ok but the resulting root hash does not match the provided sourceRoot. +func Migrate(ctx context.Context, numWorkers int, sourceSize uint64, sourceRoot []byte, getEntries client.EntryBundleFetcherFunc, storage MigrationStorage) error { + klog.Infof("Starting migration; source size %d root %x", sourceSize, sourceRoot) + + m := &copier{ + storage: storage, + sourceSize: sourceSize, + sourceRoot: sourceRoot, + getEntries: getEntries, + todo: make(chan bundle, numWorkers), + } + + // init + targetSize, targetRoot, err := m.storage.State(ctx) + if err != nil { + return fmt.Errorf("Size: %v", err) + } + if targetSize > sourceSize { + return fmt.Errorf("Target size %d > source size %d", targetSize, sourceSize) + } + if targetSize == sourceSize { + if !bytes.Equal(targetRoot, sourceRoot) { + return fmt.Errorf("migration completed, but local root hash %x != source root hash %x", targetRoot, sourceRoot) + } + return nil + } + + bundlesToMigrate := (sourceSize / layout.EntryBundleWidth) - (targetSize / layout.EntryBundleWidth) + 1 + go m.populateWork(targetSize, sourceSize) + + // Print stats + go func() { + for { + time.Sleep(time.Second) + bn := m.bundlesMigrated.Load() + bnp := float64(bn*100) / float64(bundlesToMigrate) + s, _, err := m.storage.State(ctx) + if err != nil { + klog.Warningf("Size: %v", err) + } + intp := float64(s*100) / float64(sourceSize) + klog.Infof("integration: %d (%.2f%%) bundles: %d (%.2f%%)", s, intp, bn, bnp) + } + }() + + // Do the copying + eg := errgroup.Group{} + for i := 0; i < numWorkers; i++ { + eg.Go(func() error { + return m.migrateWorker(ctx) + }) + } + if err := eg.Wait(); err != nil { + return fmt.Errorf("migrate failed to copy resources: %v", err) + } + + root, err := m.storage.AwaitIntegration(ctx, sourceSize) + if err != nil { + return fmt.Errorf("Migration failed: %v", err) + } + if !bytes.Equal(root, sourceRoot) { + return fmt.Errorf("migration completed, but local root hash %x != source root hash %x", targetRoot, sourceRoot) + } + klog.Infof("Migration successful.") + return nil +} + +// populateWork sends entries to the `todo` work channel. +// Each entry corresponds to an individual entryBundle which needs to be copied. +func (m *copier) populateWork(from, treeSize uint64) { + klog.Infof("Spans for entry range [%d, %d)", from, treeSize) + defer close(m.todo) + + for ri := range layout.Range(from, treeSize-from, treeSize) { + m.todo <- bundle{Index: ri.Index, Partial: ri.Partial} + } +} + +// migrateWorker undertakes work items from the `todo` channel. +// +// It will attempt to retry failed operations several times before giving up, this should help +// deal with any transient errors which may occur. +func (m *copier) migrateWorker(ctx context.Context) error { + for b := range m.todo { + err := retry.Do(func() error { + d, err := m.getEntries(ctx, b.Index, uint8(b.Partial)) + if err != nil { + return fmt.Errorf("failed to fetch entrybundle %d (p=%d): %v", b.Index, b.Partial, err) + } + if err := m.storage.SetEntryBundle(ctx, b.Index, b.Partial, d); err != nil { + return fmt.Errorf("failed to store entrybundle %d (p=%d): %v", b.Index, b.Partial, err) + } + m.bundlesMigrated.Add(1) + return nil + }, + retry.Attempts(10), + retry.DelayType(retry.BackOffDelay)) + if err != nil { + return err + } + } + return nil +} + +// BundleHasher parses a C2SP tlog-tile bundle and returns the leaf hashes of each entry it contains. +func BundleHasher(bundle []byte) ([][]byte, error) { + eb := &api.EntryBundle{} + if err := eb.UnmarshalText(bundle); err != nil { + return nil, fmt.Errorf("unmarshal: %v", err) + } + r := make([][]byte, 0, len(eb.Entries)) + for _, e := range eb.Entries { + h := rfc6962.DefaultHasher.HashLeaf(e) + r = append(r, h[:]) + } + return r, nil +} diff --git a/cmd/experimental/migrate/posix/main.go b/cmd/experimental/migrate/posix/main.go new file mode 100644 index 00000000..a414cb49 --- /dev/null +++ b/cmd/experimental/migrate/posix/main.go @@ -0,0 +1,76 @@ +// Copyright 2024 The Tessera authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// posix-migrate is a command-line tool for migrating data from a tlog-tiles +// compliant log, into a Tessera log instance. +package main + +import ( + "context" + "encoding/base64" + "flag" + "net/url" + "strconv" + "strings" + + "github.com/transparency-dev/trillian-tessera/client" + "github.com/transparency-dev/trillian-tessera/cmd/experimental/migrate/internal" + "github.com/transparency-dev/trillian-tessera/storage/posix" + "k8s.io/klog/v2" +) + +var ( + storageDir = flag.String("storage_dir", "", "Root directory to store log data.") + initialise = flag.Bool("initialise", false, "Set when creating a new log to initialise the structure.") + sourceURL = flag.String("source_url", "", "Base URL for the source log.") + numWorkers = flag.Int("num_workers", 30, "Number of migration worker goroutines.") +) + +func main() { + klog.InitFlags(nil) + flag.Parse() + ctx := context.Background() + + srcURL, err := url.Parse(*sourceURL) + if err != nil { + klog.Exitf("Invalid --source_url %q: %v", *sourceURL, err) + } + src, err := client.NewHTTPFetcher(srcURL, nil) + if err != nil { + klog.Exitf("Failed to create HTTP fetcher: %v", err) + } + sourceCP, err := src.ReadCheckpoint(ctx) + if err != nil { + klog.Exitf("fetch initial source checkpoint: %v", err) + } + bits := strings.Split(string(sourceCP), "\n") + sourceSize, err := strconv.ParseUint(bits[1], 10, 64) + if err != nil { + klog.Exitf("invalid CP size %q: %v", bits[1], err) + } + sourceRoot, err := base64.StdEncoding.DecodeString(bits[2]) + if err != nil { + klog.Exitf("invalid checkpoint roothash %q: %v", bits[2], err) + } + + // Create our Tessera storage backend: + st, err := posix.NewMigrationTarget(ctx, *storageDir, *initialise, internal.BundleHasher) + if err != nil { + klog.Exitf("Failed to create new POSIX storage: %v", err) + } + + if err := internal.Migrate(context.Background(), *numWorkers, sourceSize, sourceRoot, src.ReadEntryBundle, st); err != nil { + klog.Exitf("Migrate failed: %v", err) + } +} diff --git a/go.mod b/go.mod index 2785265d..c116e322 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( cloud.google.com/go/spanner v1.73.0 cloud.google.com/go/storage v1.50.0 github.com/RobinUS2/golang-moving-average v1.0.0 + github.com/avast/retry-go/v4 v4.6.0 github.com/aws/aws-sdk-go-v2 v1.32.8 github.com/aws/aws-sdk-go-v2/config v1.28.10 github.com/aws/aws-sdk-go-v2/credentials v1.17.51 diff --git a/go.sum b/go.sum index 5e939f92..565079ab 100644 --- a/go.sum +++ b/go.sum @@ -646,6 +646,8 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0= github.com/apache/arrow/go/v11 v11.0.0/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4xei5aX110hRiI= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= +github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA= +github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE= github.com/aws/aws-sdk-go-v2 v1.32.8 h1:cZV+NUS/eGxKXMtmyhtYPJ7Z4YLoI/V8bkTdRZfYhGo= github.com/aws/aws-sdk-go-v2 v1.32.8/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 h1:lL7IfaFzngfx0ZwUGOZdsFFnQ5uLvR0hWqqhyE7Q9M8= diff --git a/storage/posix/files.go b/storage/posix/files.go index 79b1ac37..d09321a1 100644 --- a/storage/posix/files.go +++ b/storage/posix/files.go @@ -398,8 +398,10 @@ func (s *Storage) initialise(create bool) error { if err := s.writeTreeState(0, rfc6962.DefaultHasher.EmptyRoot()); err != nil { return fmt.Errorf("failed to write tree-state checkpoint: %v", err) } - if err := s.publishCheckpoint(0); err != nil { - return fmt.Errorf("failed to publish checkpoint: %v", err) + if s.newCP != nil { + if err := s.publishCheckpoint(0); err != nil { + return fmt.Errorf("failed to publish checkpoint: %v", err) + } } } curSize, _, err := s.readTreeState() @@ -506,3 +508,124 @@ func createExclusive(f string, d []byte) error { } return nil } + +// BundleHasherFunc is the signature of a function which knows how to parse an entry bundle and calculate leaf hashes for its entries. +type BundleHasherFunc func(entryBundle []byte) (LeafHashes [][]byte, err error) + +// NewMigrationTarget creates a new POSIX storage for the MigrationTarget lifecycle mode. +// - path is a directory in which the log should be stored +// - create must only be set when first creating the log, and will create the directory structure and an empty checkpoint +// - bundleHasher knows how to parse the provided entry bundle, and returns a slice of leaf hashes for the entries it contains. +func NewMigrationTarget(ctx context.Context, path string, create bool, bundleHasher BundleHasherFunc, opts ...func(*options.StorageOptions)) (*MigrationStorage, error) { + opt := storage.ResolveStorageOptions(opts...) + + r := &MigrationStorage{ + s: Storage{ + path: path, + entriesPath: opt.EntriesPath, + }, + bundleHasher: bundleHasher, + } + if err := r.s.initialise(create); err != nil { + return nil, err + } + return r, nil +} + +type MigrationStorage struct { + s Storage + bundleHasher BundleHasherFunc +} + +func (m *MigrationStorage) AwaitIntegration(ctx context.Context, sourceSize uint64) ([]byte, error) { + t := time.NewTicker(time.Second) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-t.C: + } + if err := m.buildTree(ctx, sourceSize); err != nil { + klog.Warningf("buildTree: %v", err) + } + s, r, err := m.s.readTreeState() + if err != nil { + klog.Warningf("readTreeState: %v", err) + } + if s == sourceSize { + return r, nil + } + } +} + +func (m *MigrationStorage) SetEntryBundle(ctx context.Context, index uint64, partial uint8, bundle []byte) error { + return m.s.writeBundle(ctx, index, partial, bundle) +} + +func (m *MigrationStorage) State(_ context.Context) (uint64, []byte, error) { + return m.s.readTreeState() +} + +func (m *MigrationStorage) buildTree(ctx context.Context, targetSize uint64) error { + // Double locking: + // - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised. + // - The POSIX `lockForTreeUpdate()` ensures that distinct tasks are serialised. + m.s.mu.Lock() + unlock, err := lockFile(filepath.Join(m.s.path, stateDir, "treeState.lock")) + if err != nil { + panic(err) + } + defer func() { + if err := unlock(); err != nil { + panic(err) + } + m.s.mu.Unlock() + }() + + size, _, err := m.s.readTreeState() + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return err + } + size = 0 + } + m.s.curSize = size + klog.V(1).Infof("Building from %d", m.s.curSize) + + lh, err := m.fetchLeafHashes(ctx, size, targetSize, targetSize) + if err != nil { + return fmt.Errorf("fetchLeafHashes(%d, %d): %v", size, targetSize, err) + } + + if err := m.s.doIntegrate(ctx, size, lh); err != nil { + return fmt.Errorf("doIntegrate(%d, ...): %v", size, err) + } + + return nil +} + +func (m *MigrationStorage) fetchLeafHashes(ctx context.Context, from, to, sourceSize uint64) ([][]byte, error) { + const maxBundles = 300 + + lh := make([][]byte, 0, maxBundles) + n := 0 + for ri := range layout.Range(from, to, sourceSize) { + b, err := m.s.ReadEntryBundle(ctx, ri.Index, ri.Partial) + if err != nil { + return nil, fmt.Errorf("ReadEntryBundle(%d.%d): %v", ri.Index, ri.Partial, err) + } + + bh, err := m.bundleHasher(b) + if err != nil { + return nil, fmt.Errorf("bundleHasherFunc for bundle index %d: %v", ri.Index, err) + } + lh = append(lh, bh[ri.First:ri.First+ri.N]...) + n++ + if n >= maxBundles { + break + } + } + return lh, nil + +}