Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental migration support for POSIX #441

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions api/layout/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,54 @@ func EntriesPathForLogIndex(seq, logSize uint64) string {
return EntriesPath(seq/EntryBundleWidth, PartialTileSize(0, seq, logSize))
}

// RangeInfo describes where a contiguous run of entries sits within the entry bundles
// (or hashes within tiles) which hold it.
//
// Only the first and last bundles of the run are described explicitly; any bundles whose
// index lies strictly between StartIndex and EndIndex are implied.
type RangeInfo struct {
	// StartIndex is the bundle index at which the range begins.
	StartIndex uint64
	// StartPartial is the partial size expected for the bundle at StartIndex, or zero if
	// that bundle is expected to be full.
	StartPartial uint8
	// StartFirst is the offset, within the StartIndex bundle, of the first entry in the range.
	StartFirst uint8

	// EndIndex is the bundle index at which the range finishes.
	EndIndex uint64
	// EndPartial is the partial size expected for the bundle at EndIndex, or zero if
	// that bundle is expected to be full.
	EndPartial uint8
	// EndN is how many entries of the EndIndex bundle belong to the range.
	// A value of zero means "all".
	EndN uint8
}

// Range calculates which bundles, and which elements within them, are necessary to cover the
// provided [from, from+N) range of entries in a tree of size treeSize.
//
// An error is returned if N is zero, or if any part of the range lies beyond treeSize.
//
// On success the returned RangeInfo identifies the first and last bundles touched by the range.
// If the whole range is covered by a single bundle then StartIndex and EndIndex are equal, and
// EndN is the number of entries to use counting from StartFirst.
func Range(from, N, treeSize uint64) (RangeInfo, error) {
	switch {
	case N == 0:
		return RangeInfo{}, fmt.Errorf("empty range")
	case N > treeSize || from > treeSize-N:
		// This is equivalent to from+N > treeSize, but is phrased so that no intermediate
		// value can wrap around for very large from/N.
		return RangeInfo{}, fmt.Errorf("range of %d entries starting at %d is beyond treeSize (%d)", N, from, treeSize)
	}
	// Index of the last entry in the range; this cannot underflow or overflow given the checks above.
	endInc := from + N - 1

	r := RangeInfo{
		StartIndex: from / EntryBundleWidth,
		StartFirst: uint8(from % EntryBundleWidth),
		EndIndex:   endInc / EntryBundleWidth,
		// The uint8 addition intentionally wraps: assuming EntryBundleWidth is 256, a range which
		// ends on the final entry of a bundle yields zero, which RangeInfo defines as "all".
		EndN: uint8((endInc)%EntryBundleWidth) + 1,
	}
	if r.StartIndex == r.EndIndex {
		// Within a single bundle EndN counts from StartFirst rather than from the start of the
		// bundle. The uint8 subtraction also relies on wrap-around when EndN is zero ("all").
		r.EndN = r.EndN - r.StartFirst
	}
	r.StartPartial = PartialTileSize(0, r.StartIndex, treeSize)
	r.EndPartial = PartialTileSize(0, r.EndIndex, treeSize)

	return r, nil
}

// NWithSuffix returns a tiles-spec "N" path, with a partial suffix if p > 0.
func NWithSuffix(l, n uint64, p uint8) string {
suffix := ""
Expand Down
77 changes: 77 additions & 0 deletions api/layout/paths_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package layout
import (
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
)

func TestEntriesPathForLogIndex(t *testing.T) {
Expand Down Expand Up @@ -306,3 +308,78 @@ func TestParseTileLevelIndexPartial(t *testing.T) {
})
}
}

// TestRange checks that Range rejects empty or out-of-bounds requests, and that it reports the
// expected bundle coordinates for ranges which lie within the tree.
func TestRange(t *testing.T) {
	for _, test := range []struct {
		from, N, treeSize uint64
		desc              string
		want              RangeInfo
		wantErr           bool
	}{
		{
			desc:     "from beyond extent",
			from:     10,
			N:        1,
			treeSize: 5,
			wantErr:  true,
		}, {
			desc:     "range end beyond extent",
			from:     3,
			N:        100,
			treeSize: 5,
			wantErr:  true,
		}, {
			desc:     "empty range",
			from:     1,
			N:        0,
			treeSize: 2,
			wantErr:  true,
		}, {
			desc:     "ok: full first bundle",
			from:     0,
			N:        256,
			treeSize: 257,
			want:     RangeInfo{},
		}, {
			desc:     "ok: entire single (partial) bundle",
			from:     20,
			N:        90,
			treeSize: 111,
			want:     RangeInfo{StartIndex: 0, StartFirst: 20, StartPartial: 111, EndN: 90, EndIndex: 0, EndPartial: 111},
		}, {
			desc:     "ok: slice from single bundle with initial offset",
			from:     20,
			N:        90,
			treeSize: 1 << 20,
			want:     RangeInfo{StartIndex: 0, StartFirst: 20, EndN: 90, EndIndex: 0},
		}, {
			desc:     "ok: multiple bundles, first is full, last is truncated",
			from:     0,
			N:        4*256 + 42,
			treeSize: 1 << 20,
			want:     RangeInfo{StartIndex: 0, StartFirst: 0, EndN: 42, EndIndex: 4},
		}, {
			desc:     "ok: multiple bundles, first is offset, last is truncated",
			from:     2,
			N:        4*256 + 4,
			treeSize: 1 << 20,
			want:     RangeInfo{StartIndex: 0, StartFirst: 2, EndN: 4 + 2, EndIndex: 4},
		}, {
			desc:     "ok: offset and trucated from single bundle in middle of tree",
			from:     8*256 + 66,
			N:        4,
			treeSize: 1 << 20,
			want:     RangeInfo{StartIndex: 8, StartFirst: 66, EndN: 4, EndIndex: 8},
		},
	} {
		// Name each subtest after its description: the from/N pair is not unique across cases,
		// and the description says what is actually being exercised.
		t.Run(test.desc, func(t *testing.T) {
			got, err := Range(test.from, test.N, test.treeSize)
			if gotErr := err != nil; gotErr != test.wantErr {
				// %v rather than %q so that a nil error is rendered sensibly.
				t.Fatalf("got error: %v, want error: %t", err, test.wantErr)
			}
			if d := cmp.Diff(test.want, got); d != "" {
				t.Fatalf("got results with diff (-want +got):\n%s", d)
			}
		})
	}
}
197 changes: 197 additions & 0 deletions cmd/experimental/migrate/internal/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright 2024 The Tessera authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/avast/retry-go/v4"
"github.com/transparency-dev/merkle/rfc6962"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
"github.com/transparency-dev/trillian-tessera/client"
"golang.org/x/sync/errgroup"
"k8s.io/klog/v2"
)

// copier holds the state shared between the goroutines which carry out a migration.
type copier struct {
	// storage is the destination which entry bundles are written to.
	storage MigrationStorage
	// getEntries is used to read entry bundles from the source log.
	getEntries client.EntryBundleFetcherFunc

	// sourceSize is the number of entries in the source log.
	sourceSize uint64
	// sourceRoot is the source log's root hash at sourceSize.
	sourceRoot []byte

	// todo carries outstanding work items, one per entry bundle, from populateWork to the workers.
	todo chan bundle

	// bundlesToMigrate is the total number of entry bundles which need to be copied.
	// It is written by populateWork and, unlike bundlesMigrated, is not an atomic value.
	bundlesToMigrate uint64
	// bundlesMigrated counts the entry bundles which have been copied so far.
	bundlesMigrated atomic.Uint64
}

// bundle identifies a single entry bundle to be copied.
type bundle struct {
	// Index is the index of the entry bundle.
	Index uint64
	// Partial is the partial size of the entry bundle; it is passed as-is to both the
	// fetcher and the target storage.
	Partial uint8
}

// MigrationStorage is the set of operations the migration needs from the target storage driver.
type MigrationStorage interface {
	// SetEntryBundle stores the provided entry bundle bytes at the location identified by
	// index and partial.
	SetEntryBundle(ctx context.Context, index uint64, partial uint8, bundle []byte) error
	// AwaitIntegration should block until the storage driver has received and integrated all
	// outstanding entry bundles implied by sourceSize.
	//
	// An error should be returned if integration fails, or if the root hash of the local tree
	// at size sourceSize is not equal to sourceRoot.
	AwaitIntegration(ctx context.Context, sourceSize uint64, sourceRoot []byte) error
	// Size reports the current integrated size of the local tree.
	Size(ctx context.Context) (uint64, error)
}

// Migrate starts the work of copying sourceSize entries from the source to the target log.
//
// Only the entry bundles are copied as the target storage is expected to integrate them and recalculate the root.
// This is done to ensure the correctness of both the source log as well as the copy process itself.
//
// numWorkers is the number of concurrent copy workers to run, and must be at least 1.
// getEntries is used to fetch entry bundles from the source, and storage is the target being migrated to.
//
// A call to this function will block until either the copying is done, or an error has occurred.
// It is an error if the resource copying completes ok but the resulting root hash does not match the provided sourceRoot.
// If the target is already at sourceSize, nil is returned without any further work being done.
func Migrate(ctx context.Context, numWorkers int, sourceSize uint64, sourceRoot []byte, getEntries client.EntryBundleFetcherFunc, storage MigrationStorage) error {
	if numWorkers < 1 {
		// Zero workers would copy nothing, and a negative count would panic when sizing the channel below.
		return fmt.Errorf("numWorkers must be at least 1, got %d", numWorkers)
	}
	klog.Infof("Starting migration; source size %d root %x", sourceSize, sourceRoot)

	// TODO store state & resume
	m := &copier{
		storage:    storage,
		sourceSize: sourceSize,
		sourceRoot: sourceRoot,
		getEntries: getEntries,
		todo:       make(chan bundle, numWorkers*2),
	}

	// init
	targetSize, err := m.storage.Size(ctx)
	if err != nil {
		return fmt.Errorf("fetching target size: %w", err)
	}
	if targetSize > sourceSize {
		return fmt.Errorf("target size %d > source size %d", targetSize, sourceSize)
	}
	if targetSize == sourceSize {
		return nil
	}
	go m.populateWork(targetSize, sourceSize)

	// The number of bundles to be copied is the span of bundle indices covering
	// [targetSize, sourceSize), which is what populateWork enqueues. It is calculated here so
	// that the stats goroutine does not need to read state which populateWork is concurrently
	// updating. sourceSize is known to be > 0 at this point since targetSize < sourceSize.
	totalBundles := (sourceSize-1)/layout.EntryBundleWidth - targetSize/layout.EntryBundleWidth + 1

	// Print stats until this function returns or the context is done.
	done := make(chan struct{})
	defer close(done)
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
			bn := m.bundlesMigrated.Load()
			bnp := float64(bn*100) / float64(totalBundles)
			s, err := m.storage.Size(ctx)
			if err != nil {
				klog.Warningf("Size: %v", err)
				// Don't report integration progress based on a size we failed to read.
				continue
			}
			intp := float64(s*100) / float64(sourceSize)
			klog.Infof("integration: %d (%.2f%%) bundles: %d (%.2f%%)", s, intp, bn, bnp)
		}
	}()

	// Do the copying
	var eg errgroup.Group
	for i := 0; i < numWorkers; i++ {
		eg.Go(func() error {
			return m.migrateWorker(ctx)
		})
	}
	if err := eg.Wait(); err != nil {
		// All workers have stopped, so nothing is reading from todo any more. Drain it so that
		// populateWork cannot be left blocked on a send forever; the drain ends when populateWork
		// closes the channel.
		go func() {
			for range m.todo {
			}
		}()
		return fmt.Errorf("migrate failed to copy resources: %w", err)
	}

	// Return the failure to the caller rather than exiting the process from library code.
	if err := m.storage.AwaitIntegration(ctx, sourceSize, sourceRoot); err != nil {
		return fmt.Errorf("migration failed: %w", err)
	}
	klog.Infof("Migration successful.")
	return nil
}

// populateWork sends entries to the `todo` work channel.
// Each entry corresponds to an individual entryBundle which needs to be copied.
//
// The bundles sent are those which cover the [from, treeSize) range of entries. The `todo`
// channel is closed once all work items have been sent. The process is terminated via
// klog.Exitf if the range cannot be calculated.
func (m *copier) populateWork(from, treeSize uint64) {
	klog.Infof("Spans for entry range [%d, %d)", from, treeSize)
	defer close(m.todo)

	r, err := layout.Range(from, treeSize-from, treeSize)
	if err != nil {
		klog.Exitf("Range(%d, %d): %v", from, treeSize-from, err)
	}

	// Record the total up front, rather than counting as items are queued, so that the value is
	// stable for the whole of the copy. Capturing it in a local also ensures that the deferred
	// log line below reports the real total: arguments to a deferred call are evaluated when the
	// defer statement executes, not when the call eventually runs.
	total := r.EndIndex - r.StartIndex + 1
	m.bundlesToMigrate = total
	defer klog.Infof("total bundles to fetch %d", total)

	for idx := r.StartIndex; idx <= r.EndIndex; idx++ {
		// Bundles strictly between the first and last of the range are requested as full.
		var p uint8
		switch idx {
		case r.StartIndex:
			p = r.StartPartial
		case r.EndIndex:
			p = r.EndPartial
		}
		m.todo <- bundle{Index: idx, Partial: p}
	}
}

// migrateWorker undertakes work items from the `todo` channel.
//
// For each item it fetches the entry bundle from the source and stores it in the target storage.
// It will attempt to retry failed operations several times before giving up, this should help
// deal with any transient errors which may occur. Retrying stops early if ctx is done.
//
// It returns nil once the `todo` channel has been closed and drained, or the error from the
// first work item which could not be completed.
func (m *copier) migrateWorker(ctx context.Context) error {
	for b := range m.todo {
		err := retry.Do(func() error {
			d, err := m.getEntries(ctx, b.Index, b.Partial)
			if err != nil {
				return fmt.Errorf("failed to fetch entrybundle %d (p=%d): %w", b.Index, b.Partial, err)
			}
			if err := m.storage.SetEntryBundle(ctx, b.Index, b.Partial, d); err != nil {
				return fmt.Errorf("failed to store entrybundle %d (p=%d): %w", b.Index, b.Partial, err)
			}
			// Only count the bundle once it has been both fetched and stored.
			m.bundlesMigrated.Add(1)
			return nil
		},
			retry.Attempts(10),
			retry.DelayType(retry.BackOffDelay),
			// Without this, a cancelled migration would keep sleeping through the back-off delays.
			retry.Context(ctx))
		if err != nil {
			return err
		}
	}
	return nil
}

// BundleHasher decodes a serialised C2SP tlog-tile entry bundle and computes the RFC 6962
// leaf hash of every entry within it.
//
// The returned hashes are in the same order as the entries appear in the bundle.
// An error is returned if the bundle cannot be unmarshalled.
func BundleHasher(bundle []byte) ([][]byte, error) {
	var parsed api.EntryBundle
	if err := parsed.UnmarshalText(bundle); err != nil {
		return nil, fmt.Errorf("unmarshal: %v", err)
	}

	leafHashes := make([][]byte, len(parsed.Entries))
	for i := range parsed.Entries {
		h := rfc6962.DefaultHasher.HashLeaf(parsed.Entries[i])
		leafHashes[i] = h[:]
	}
	return leafHashes, nil
}
Loading
Loading