Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tapgarden: minter perf fixes #357

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 125 additions & 15 deletions tapgarden/caretaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"bytes"
"context"
"fmt"
"runtime"
"strings"
"sync"
"time"

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/btcutil/psbt"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/lightninglabs/taproot-assets/asset"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/lightninglabs/taproot-assets/proof"
"github.com/lightninglabs/taproot-assets/universe"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/keychain"
"golang.org/x/exp/maps"
)

Expand Down Expand Up @@ -381,15 +384,81 @@ func (b *BatchCaretaker) seedlingsToAssetSprouts(ctx context.Context,
"with genesis_point=%v", b.batchKey[:],
len(b.cfg.Batch.Seedlings), genesisPoint)

newAssets := make([]*asset.Asset, 0, len(b.cfg.Batch.Seedlings))
newAssets := make([]*asset.Asset, len(b.cfg.Batch.Seedlings))
batchSize := len(b.cfg.Batch.Seedlings)
threadCount := runtime.NumCPU()
scriptKeyDescs := make(chan keychain.KeyDescriptor, 10*threadCount)
scriptKeys := make(chan asset.ScriptKey, 10*threadCount)
errChan := make(chan error, threadCount)
metaChan := make(chan *proof.MetaReveal)
metaHashChan := make(chan [32]byte)
makeKeyChan := make(chan struct{}, threadCount)
var wg sync.WaitGroup

// Derive a new script key if a signal is sent on a channel. If key
// derivation fails, send out the error. The main sprout creation loop
// will propogate that error and exit.
newScriptKey := func(errChan chan<- error) {
for range makeKeyChan {
scriptKeyDesc, err := b.cfg.KeyRing.DeriveNextKey(
ctx, asset.TaprootAssetsKeyFamily,
)
if err != nil {
errChan <- fmt.Errorf("unable to obtain"+
"script key: %w", err)
return
}

select {
case scriptKeyDescs <- scriptKeyDesc:
default:
}
}
}

// Create script keys from new script key descriptors.
newBip86ScriptKey := func() {
for desc := range scriptKeyDescs {
scriptKey := asset.NewScriptKeyBip86(desc)
scriptKeys <- scriptKey
}
}

// Hash metadata for seedlings and forward the metadata hash.
newMetaHash := func() {
for meta := range metaChan {
metaHash := meta.MetaHash()
metaHashChan <- metaHash
}
}

// Start a pipeline to derive new script keys and metadata hashes in
// parallel. The signal to derive new keys is sent from this goroutine,
// and that signal can be closed before the main sprout creation loop
// is done reading keys. The channels for the other goroutines are
// closed after the main sprout creation loop.
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < threadCount; i++ {
go newScriptKey(errChan)
go newBip86ScriptKey()
go newMetaHash()
}
for batchSize > 0 {
makeKeyChan <- struct{}{}
batchSize -= 1
}
close(makeKeyChan)
}()

// Seedlings that anchor a group may be referenced by other seedlings,
// and therefore need to be mapped to sprouts first so that we derive
// the initial tweaked group key early.
orederedSeedlings := SortSeedlings(maps.Values(b.cfg.Batch.Seedlings))
newGroups := make(map[string]*asset.AssetGroup, len(orederedSeedlings))

for _, seedlingName := range orederedSeedlings {
for ind, seedlingName := range orederedSeedlings {
seedling := b.cfg.Batch.Seedlings[seedlingName]

assetGen := asset.Genesis{
Expand All @@ -403,22 +472,22 @@ func (b *BatchCaretaker) seedlingsToAssetSprouts(ctx context.Context,
// that by including the hash of the meta data in the asset
// genesis.
if seedling.Meta != nil {
assetGen.MetaHash = seedling.Meta.MetaHash()
}

scriptKey, err := b.cfg.KeyRing.DeriveNextKey(
ctx, asset.TaprootAssetsKeyFamily,
)
if err != nil {
return nil, fmt.Errorf("unable to obtain script "+
"key: %w", err)
metaChan <- seedling.Meta
}

var (
err error
scriptKey asset.ScriptKey
groupInfo *asset.AssetGroup
sproutGroupKey *asset.GroupKey
)

select {
case err = <-errChan:
return nil, err
case scriptKey = <-scriptKeys:
}

// If the seedling has a group key specified,
// that group key was validated earlier. We need to
// sign the new genesis with that group key.
Expand All @@ -434,6 +503,11 @@ func (b *BatchCaretaker) seedlingsToAssetSprouts(ctx context.Context,
groupInfo = newGroups[*seedling.GroupAnchor]
}

// Set the metahash field before possibly deriving a group key.
if seedling.Meta != nil {
assetGen.MetaHash = <-metaHashChan
}

if groupInfo != nil {
sproutGroupKey, err = asset.DeriveGroupKey(
b.cfg.GenSigner, groupInfo.GroupKey.RawKey,
Expand Down Expand Up @@ -483,17 +557,21 @@ func (b *BatchCaretaker) seedlingsToAssetSprouts(ctx context.Context,
}

newAsset, err := asset.New(
assetGen, amount, 0, 0,
asset.NewScriptKeyBip86(scriptKey), sproutGroupKey,
assetGen, amount, 0, 0, scriptKey, sproutGroupKey,
)
if err != nil {
return nil, fmt.Errorf("unable to create new asset: %w",
err)
}

newAssets = append(newAssets, newAsset)
newAssets[ind] = newAsset
}

close(scriptKeyDescs)
close(scriptKeys)
close(metaChan)
wg.Wait()

// Now that we have all our assets created, we'll make a new
// Taproot asset commitment, which commits to all the assets we
// created above in a new root.
Expand Down Expand Up @@ -805,7 +883,9 @@ func (b *BatchCaretaker) stateStep(currentState BatchState) (BatchState, error)
ctx, cancel := b.WithCtxQuitNoTimeout()
defer cancel()

headerVerifier := GenHeaderVerifier(ctx, b.cfg.ChainBridge)
headerVerifier := GenCachingHeaderVerifier(
ctx, b.cfg.ChainBridge,
)

// Now that the minting transaction has been confirmed, we'll
// need to create the series of proof file blobs for each of
Expand Down Expand Up @@ -1033,3 +1113,33 @@ func GenHeaderVerifier(ctx context.Context,
return err
}
}

// GenCachingHeaderVerifier generate a block header on-chain verification
// callback that caches the first block hash verified with the chain bridge,
// and checks for equality with the cached block hash on future calls.
// This is only safe to use for proof generation after asset minting where all
// proofs share an anchor transaction.
func GenCachingHeaderVerifier(ctx context.Context,
chainBridge ChainBridge) func(header wire.BlockHeader) error {

return func(blockHeader wire.BlockHeader) error {
var staticBlockHash chainhash.Hash
if staticBlockHash.IsEqual(&chainhash.Hash{}) {
staticBlockHash = blockHeader.BlockHash()
_, err := chainBridge.GetBlock(
ctx, staticBlockHash,
)
return err
}

// Skip the RPC call if the block header is the same as the
// first used block header.
if blockHeader.BlockHash() == staticBlockHash {
return nil
}

return fmt.Errorf("passed block hash %x does not match cached"+
"%x", blockHeader.BlockHash().String(),
staticBlockHash.String())
}
}
7 changes: 5 additions & 2 deletions universe/auto_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,11 @@ func (f *FederationEnvoy) syncUniverseState(ctx context.Context,
}

// If we synced anything from the server, then we'll log that here.
log.Infof("Synced new Universe leaves from server=%v, diff=%v",
spew.Sdump(addr), spew.Sdump(diff))
log.Infof("Synced new Universe leaves from server=%v", spew.Sdump(addr))
for _, d := range diff {
log.Infof("diff:\n%v", d.StringForInfoLog())
log.Debugf(d.StringForDebugLog())
}

// Log a new sync event in the background now that we know we were able
// to contract the remote server.
Expand Down
3 changes: 2 additions & 1 deletion universe/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func withBaseUni[T any](fetcher uniFetcher, id Identifier,
func (a *MintingArchive) RootNode(ctx context.Context,
id Identifier) (BaseRoot, error) {

log.Debugf("Looking up root node for base Universe %v", spew.Sdump(id))
log.Debugf("Looking up root node for base Universe %v",
id.StringForLog())

return withBaseUni(a, id, func(baseUni BaseBackend) (BaseRoot, error) {
smtNode, assetName, err := baseUni.RootNode(ctx)
Expand Down
71 changes: 71 additions & 0 deletions universe/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net"
"strconv"
"strings"

"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcec/v2/schnorr"
Expand Down Expand Up @@ -71,6 +72,35 @@ type GenesisWithGroup struct {
*asset.GroupKey
}

// StringForLog returns a string representation of a GenesisWithGroup for
// logging.
func (g GenesisWithGroup) StringForLog() string {
groupKey := "<nil>"
groupSig := "<nil>"
if g.GroupKey != nil {
groupKey = hex.EncodeToString(
schnorr.SerializePubKey(&g.GroupPubKey),
)
groupSig = hex.EncodeToString(g.Sig.Serialize())
}

var genesisGroupStr strings.Builder
genesisGroupStr.WriteString(
fmt.Sprintln("FirstPrevOut:", g.Genesis.FirstPrevOut.String()),
)
genesisGroupStr.WriteString(fmt.Sprintln("Tag:", g.Genesis.Tag))
genesisGroupStr.WriteString(
fmt.Sprintf("Metahash: %x\n", g.Genesis.MetaHash[:]),
)
genesisGroupStr.WriteString(
fmt.Sprintln("OutputIndex:", g.Genesis.OutputIndex),
)
genesisGroupStr.WriteString(fmt.Sprintf("Type: %v\n", g.Genesis.Type))
genesisGroupStr.WriteString(fmt.Sprintln("Group Key:", groupKey))
genesisGroupStr.WriteString(fmt.Sprintln("Group Sig:", groupSig))
return genesisGroupStr.String()
}

// MintingLeaf is a leaf node in the SMT that represents a minting output. For
// each new asset created for a given asset/universe, a new minting leaf is
// created.
Expand All @@ -87,6 +117,13 @@ type MintingLeaf struct {
Amt uint64
}

// StringForLog returns a string representation of a MintingLeaf for
// logging, omitting the genesis proof.
func (m *MintingLeaf) StringForLog() string {
return fmt.Sprintf("MintingLeaf:\n%vAmt: %d",
m.GenesisWithGroup.StringForLog(), m.Amt)
}

// SmtLeafNode returns the SMT leaf node for the given minting leaf.
func (m *MintingLeaf) SmtLeafNode() *mssmt.LeafNode {
return mssmt.NewLeafNode(m.GenesisProof[:], m.Amt)
Expand Down Expand Up @@ -206,6 +243,21 @@ type BaseRoot struct {
AssetName string
}

// StringForLog returns a string representation of a BaseRoot for logging.
func (b *BaseRoot) StringForLog() string {
var baseRootStr strings.Builder
baseRootStr.WriteString(fmt.Sprintln("ID:", b.ID.StringForLog()))
nodeStr := "Node: <nil>"
if b.Node != nil {
nodeStr = fmt.Sprintf("NodeHash: %v Sum: %d\n",
b.Node.NodeHash(), b.NodeSum())
}
baseRootStr.WriteString(nodeStr)
baseRootStr.WriteString(fmt.Sprintln(b.AssetName))

return baseRootStr.String()
}

// BaseForest is an interface used to keep track of the set of base universe
// roots that we know of. The BaseBackend interface is used to interact with a
// particular base universe, while this is used to obtain aggregate information
Expand Down Expand Up @@ -365,6 +417,25 @@ type AssetSyncDiff struct {
// * can used a sealed interface to return the error
}

// StringForInfoLog returns a string representation of an AssetSyncDiff for
// logging at the info level, omitting all leaf proofs.
func (d *AssetSyncDiff) StringForInfoLog() string {
oldRoot := d.OldUniverseRoot.StringForLog()
newRoot := d.NewUniverseRoot.StringForLog()
return fmt.Sprintf("old:\n%vnew:\n%v", oldRoot, newRoot)
}

// StringForDebugLog returns a string representation of an AssetSyncDiff for
// logging at the debug level, intended to be printed after StringForInfoLog.
func (d *AssetSyncDiff) StringForDebugLog() string {
var syncDiffStr strings.Builder
for _, leaf := range d.NewLeafProofs {
syncDiffStr.WriteString(fmt.Sprintln(leaf.StringForLog()))
}

return syncDiffStr.String()
}

// Syncer is used to synchronize the state of two Universe instances: a local
// instance and a remote instance. As a Universe is a tree based structure,
// tree based bisection can be used to find the point of divergence with
Expand Down
14 changes: 8 additions & 6 deletions universe/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine,
// TODO(roasbeef): inclusion w/ root here, also that
// it's the expected asset ID

log.Infof("UniverseRoot(%v): inserting new leaf",
uniID.String())
log.Debugf("UniverseRoot(%v): inserting new leaf",
uniID.StringForLog())
log.Tracef("UniverseRoot(%v): inserting new leaf for "+
"key=%v", uniID.String(), spew.Sdump(key))
"key=%v", uniID.StringForLog(), spew.Sdump(key))

// TODO(roasbeef): this is actually giving a lagging
// proof for each of them
Expand All @@ -202,7 +202,8 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine,
}

log.Infof("Universe sync for UniverseRoot(%v) complete, %d "+
"new leaves inserted", uniID.String(), len(keysToFetch))
"new leaves inserted", uniID.StringForLog(),
len(keysToFetch))

// TODO(roabseef): sanity check local and remote roots match
// now?
Expand All @@ -215,9 +216,10 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine,
NewLeafProofs: fn.Collect(newLeaves),
}

log.Infof("Sync for UniverseRoot(%v) complete!", uniID.String())
log.Infof("Sync for UniverseRoot(%v) complete!",
uniID.StringForLog())
log.Tracef("Sync for UniverseRoot(%v) complete! New "+
"universe_root=%v", uniID.String(),
"universe_root=%v", uniID.StringForLog(),
spew.Sdump(remoteRoot))

return nil
Expand Down