diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 9d91128d3..8a3e09f69 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -209,7 +209,7 @@ jobs: ######################## integration-test-postgres: name: run itests postgres - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 steps: - name: git checkout uses: actions/checkout@v3 diff --git a/Makefile b/Makefile index 0783b8a0a..409d819ef 100644 --- a/Makefile +++ b/Makefile @@ -179,20 +179,6 @@ itest-only-trace: aperture-dir rm -rf itest/regtest; date $(GOTEST) ./itest -v -tags="$(ITEST_TAGS)" $(TEST_FLAGS) $(ITEST_FLAGS) -loglevel=trace -btcdexec=./btcd-itest -logdir=regtest -optional-itest: build-itest optional-itest-only - -optional-itest-trace: build-itest optional-itest-only-trace - -optional-itest-only: aperture-dir - @$(call print, "Running integration tests with ${backend} backend.") - rm -rf itest/regtest; date - $(GOTEST) ./itest -v -tags="$(ITEST_TAGS)" $(TEST_FLAGS) $(ITEST_FLAGS) -optional -btcdexec=./btcd-itest -logdir=regtest - -optional-itest-only-trace: aperture-dir - @$(call print, "Running integration tests with ${backend} backend.") - rm -rf itest/regtest; date - $(GOTEST) ./itest -v -tags="$(ITEST_TAGS)" $(TEST_FLAGS) $(ITEST_FLAGS) -optional -loglevel=trace -btcdexec=./btcd-itest -logdir=regtest - aperture-dir: ifeq ($(UNAME_S),Linux) mkdir -p $$HOME/.aperture diff --git a/fn/recv.go b/fn/recv.go index e4633e9e4..419dc882a 100644 --- a/fn/recv.go +++ b/fn/recv.go @@ -1,6 +1,7 @@ package fn import ( + "context" "fmt" "time" ) @@ -40,8 +41,6 @@ func RecvResp[T any](r <-chan T, e <-chan error, q <-chan struct{}) (T, error) { // // NOTE: This function closes the channel to be able to collect all items at // once. -// -// TODO(roasbeef): instead could take a number of items to recv? func Collect[T any](c chan T) []T { close(c) @@ -52,3 +51,35 @@ func Collect[T any](c chan T) []T { return out } + +// CollectBatch reads from the given channel and returns batchSize items at a +// time and a boolean that indicates whether we expect more items to be sent +// on the channel. If the context is canceled, the function returns the items +// that have been read so far and the context's error. +// +// NOTE: The channel MUST be closed for this function to return. +func CollectBatch[V any](ctx context.Context, values <-chan V, + batchSize int, cb func(ctx context.Context, batch []V) error) error { + + batch := make([]V, 0, batchSize) + for { + select { + case v, ok := <-values: + if !ok { + return cb(ctx, batch) + } + batch = append(batch, v) + + if len(batch) == batchSize { + err := cb(ctx, batch) + if err != nil { + return err + } + batch = make([]V, 0, batchSize) + } + + case <-ctx.Done(): + return ctx.Err() + } + } +} diff --git a/fn/recv_test.go b/fn/recv_test.go new file mode 100644 index 000000000..39dfcd899 --- /dev/null +++ b/fn/recv_test.go @@ -0,0 +1,61 @@ +package fn + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +var ( + testTimeout = 100 * time.Millisecond +) + +func TestCollectBatch(t *testing.T) { + t.Parallel() + + ctxb := context.Background() + ctxt, cancel := context.WithTimeout(ctxb, testTimeout) + defer cancel() + + // First, test the expected normal case where we receive all the items + // and the channel is closed. 
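	// With batchSize=3 and ten buffered items, the callback below is
	// expected to fire with batches of 3, 3 and 3, plus a final partial
	// batch of 1 once the channel close is observed, leaving numReceived
	// at 10.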
+ var ( + c = make(chan int, 10) + numReceived = 0 + ) + + for i := 0; i < 10; i++ { + c <- i + } + close(c) + + err := CollectBatch( + ctxt, c, 3, func(ctx context.Context, batch []int) error { + numReceived += len(batch) + + return nil + }, + ) + require.NoError(t, err) + require.Equal(t, 10, numReceived) + + // If we don't close the channel, then we expect to run into the + // timeout and only receive 9 out of 10 items (the last batch is never + // completed). + c = make(chan int, 10) + numReceived = 0 + for i := 0; i < 10; i++ { + c <- i + } + err = CollectBatch( + ctxt, c, 3, func(ctx context.Context, batch []int) error { + numReceived += len(batch) + + return nil + }, + ) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.Equal(t, 9, numReceived) +} diff --git a/itest/mint_batch_stress_test.go b/itest/mint_batch_stress_test.go index 494af467a..9ff63a7a1 100644 --- a/itest/mint_batch_stress_test.go +++ b/itest/mint_batch_stress_test.go @@ -39,14 +39,14 @@ func testMintBatch100StressTest(t *harnessTest) { func testMintBatch1kStressTest(t *harnessTest) { batchSize := 1_000 - timeout := defaultWaitTimeout * 10 + timeout := defaultWaitTimeout * 20 testMintBatchNStressTest(t, batchSize, timeout) } func testMintBatch10kStressTest(t *harnessTest) { batchSize := 10_000 - timeout := defaultWaitTimeout * 100 + timeout := defaultWaitTimeout * 200 testMintBatchNStressTest(t, batchSize, timeout) } diff --git a/itest/tapd_harness.go b/itest/tapd_harness.go index 9066932ba..feb533955 100644 --- a/itest/tapd_harness.go +++ b/itest/tapd_harness.go @@ -34,6 +34,14 @@ var ( // to use when starting a tap daemon. dbbackend = flag.String("dbbackend", "sqlite", "Set the database "+ "backend to use when starting a tap daemon.") + + // postgresTimeout is a command line flag for specifying the amount of + // time to allow the postgres fixture to run in total. Needs to be + // increased for long-running tests. + postgresTimeout = flag.Duration("postgrestimeout", + tapdb.DefaultPostgresFixtureLifetime, "The amount of time to "+ + "allow the postgres fixture to run in total. Needs "+ + "to be increased for long-running tests.") ) const ( @@ -109,7 +117,7 @@ func newTapdHarness(ht *harnessTest, cfg tapdConfig, case tapcfg.DatabaseBackendPostgres: fixture := tapdb.NewTestPgFixture( - ht.t, tapdb.DefaultPostgresFixtureLifetime, !*noDelete, + ht.t, *postgresTimeout, !*noDelete, ) ht.t.Cleanup(func() { if !*noDelete { diff --git a/make/testing_flags.mk b/make/testing_flags.mk index a070814d8..1032c10a5 100644 --- a/make/testing_flags.mk +++ b/make/testing_flags.mk @@ -36,6 +36,11 @@ ifneq ($(nodelete),) ITEST_FLAGS += -nodelete endif +# Run the optional tests. +ifneq ($(optional),) +ITEST_FLAGS += -optional -postgrestimeout=240m +endif + # Run itests with specified db backend. ifneq ($(dbbackend),) ITEST_FLAGS += -dbbackend=$(dbbackend) @@ -69,6 +74,8 @@ endif # test command. If not, we set 60m (up from the default 10m). ifneq ($(timeout),) TEST_FLAGS += -test.timeout=$(timeout) +else ifneq ($(optional),) +TEST_FLAGS += -test.timeout=240m else TEST_FLAGS += -test.timeout=60m endif diff --git a/rpcserver.go b/rpcserver.go index 377baf64f..f3e88dd16 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2591,17 +2591,15 @@ func (r *rpcServer) AssetLeafKeys(ctx context.Context, func marshalAssetLeaf(ctx context.Context, keys KeyLookup, assetLeaf *universe.MintingLeaf) (*unirpc.AssetLeaf, error) { - // In order to display the full asset, we'll parse the genesis - // proof so we can map that to the asset being proved. 
- var assetProof proof.Proof - if err := assetProof.Decode( - bytes.NewReader(assetLeaf.GenesisProof), - ); err != nil { + // In order to display the full asset, we'll also encode the genesis + // proof. + var buf bytes.Buffer + if err := assetLeaf.GenesisProof.Encode(&buf); err != nil { return nil, err } rpcAsset, err := MarshalAsset( - ctx, &assetProof.Asset, false, true, keys, + ctx, &assetLeaf.GenesisProof.Asset, false, true, keys, ) if err != nil { return nil, err @@ -2609,7 +2607,7 @@ func marshalAssetLeaf(ctx context.Context, keys KeyLookup, return &unirpc.AssetLeaf{ Asset: rpcAsset, - IssuanceProof: assetLeaf.GenesisProof[:], + IssuanceProof: buf.Bytes(), }, nil } @@ -2859,7 +2857,7 @@ func unmarshalAssetLeaf(leaf *unirpc.AssetLeaf) (*universe.MintingLeaf, error) { Genesis: assetProof.Asset.Genesis, GroupKey: assetProof.Asset.GroupKey, }, - GenesisProof: leaf.IssuanceProof, + GenesisProof: &assetProof, Amt: assetProof.Asset.Amount, }, nil } diff --git a/tapcfg/config.go b/tapcfg/config.go index babfaf36d..bf1db1787 100644 --- a/tapcfg/config.go +++ b/tapcfg/config.go @@ -101,6 +101,10 @@ const ( // to sync Universe state with the federation. defaultUniverseSyncInterval = time.Minute * 10 + // defaultUniverseSyncBatchSize is the default number of proofs we'll + // sync in a single batch. + defaultUniverseSyncBatchSize = 200 + // defaultReOrgSafeDepth is the default number of confirmations we'll // wait for before considering a transaction safely buried in the chain. defaultReOrgSafeDepth = 6 diff --git a/tapcfg/server.go b/tapcfg/server.go index 9aed1f92b..acba44652 100644 --- a/tapcfg/server.go +++ b/tapcfg/server.go @@ -223,6 +223,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, LocalDiffEngine: baseUni, NewRemoteDiffEngine: tap.NewRpcUniverseDiff, LocalRegistrar: baseUni, + SyncBatchSize: defaultUniverseSyncBatchSize, }) federationMembers := cfg.Universe.FederationServers @@ -284,9 +285,10 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, GenSigner: tap.NewLndRpcGenSigner( lndServices, ), - ProofFiles: proofFileStore, - Universe: universeFederation, - ProofWatcher: reOrgWatcher, + ProofFiles: proofFileStore, + Universe: universeFederation, + ProofWatcher: reOrgWatcher, + UniversePushBatchSize: defaultUniverseSyncBatchSize, }, BatchTicker: ticker.NewForce(cfg.BatchMintingInterval), ProofUpdates: proofArchive, diff --git a/tapdb/interfaces.go b/tapdb/interfaces.go index 8906bc10b..3ded6b875 100644 --- a/tapdb/interfaces.go +++ b/tapdb/interfaces.go @@ -3,6 +3,7 @@ package tapdb import ( "context" "database/sql" + "math" prand "math/rand" "time" @@ -21,9 +22,16 @@ const ( // repetition. DefaultNumTxRetries = 10 - // DefaultRetryDelay is the default delay between retries. This will be - // used to generate a random delay between 0 and this value. - DefaultRetryDelay = time.Millisecond * 50 + // DefaultInitialRetryDelay is the default initial delay between + // retries. This will be used to generate a random delay between -50% + // and +50% of this value, so 20 to 60 milliseconds. The retry will be + // doubled after each attempt until we reach DefaultMaxRetryDelay. We + // start with a random value to avoid multiple goroutines that are + // created at the same time to effectively retry at the same time. + DefaultInitialRetryDelay = time.Millisecond * 40 + + // DefaultMaxRetryDelay is the default maximum delay between retries. 
+ DefaultMaxRetryDelay = time.Second * 3 ) // TxOptions represents a set of options one can use to control what type of @@ -91,23 +99,50 @@ type BatchedQuerier interface { // executor. This can be used to do things like retry a transaction due to an // error a certain amount of times. type txExecutorOptions struct { - numRetries int - retryDelay time.Duration + numRetries int + initialRetryDelay time.Duration + maxRetryDelay time.Duration } // defaultTxExecutorOptions returns the default options for the transaction // executor. func defaultTxExecutorOptions() *txExecutorOptions { return &txExecutorOptions{ - numRetries: DefaultNumTxRetries, - retryDelay: DefaultRetryDelay, + numRetries: DefaultNumTxRetries, + initialRetryDelay: DefaultInitialRetryDelay, + maxRetryDelay: DefaultMaxRetryDelay, } } -// randRetryDelay returns a random retry delay between 0 and the configured max -// delay. -func (t *txExecutorOptions) randRetryDelay() time.Duration { - return time.Duration(prand.Int63n(int64(t.retryDelay))) //nolint:gosec +// randRetryDelay returns a random retry delay between -50% and +50% +// of the configured delay that is doubled for each attempt and capped at a max +// value. +func (t *txExecutorOptions) randRetryDelay(attempt int) time.Duration { + halfDelay := t.initialRetryDelay / 2 + randDelay := prand.Int63n(int64(t.initialRetryDelay)) //nolint:gosec + + // 50% plus 0%-100% gives us the range of 50%-150%. + initialDelay := halfDelay + time.Duration(randDelay) + + // If this is the first attempt, we just return the initial delay. + if attempt == 0 { + return initialDelay + } + + // For each subsequent delay, we double the initial delay. This still + // gives us a somewhat random delay, but it still increases with each + // attempt. If we double something n times, that's the same as + // multiplying the value with 2^n. We limit the power to 32 to avoid + // overflows. + factor := time.Duration(math.Pow(2, math.Min(float64(attempt), 32))) + actualDelay := initialDelay * factor + + // Cap the delay at the maximum configured value. + if actualDelay > t.maxRetryDelay { + return t.maxRetryDelay + } + + return actualDelay } // TxExecutorOption is a functional option that allows us to pass in optional @@ -126,7 +161,7 @@ func WithTxRetries(numRetries int) TxExecutorOption { // to wait before a transaction is retried. func WithTxRetryDelay(delay time.Duration) TxExecutorOption { return func(o *txExecutorOptions) { - o.retryDelay = delay + o.initialRetryDelay = delay } } @@ -171,7 +206,7 @@ func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context, txOptions TxOptions, txBody func(Q) error) error { waitBeforeRetry := func(attemptNumber int) { - retryDelay := t.opts.randRetryDelay() + retryDelay := t.opts.randRetryDelay(attemptNumber) log.Tracef("Retrying transaction due to tx serialization "+ "error, attempt_number=%v, delay=%v", attemptNumber, diff --git a/tapdb/interfaces_test.go b/tapdb/interfaces_test.go new file mode 100644 index 000000000..1cf7b6962 --- /dev/null +++ b/tapdb/interfaces_test.go @@ -0,0 +1,30 @@ +package tapdb + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestExecutorOptionRetryDelay(t *testing.T) { + t.Parallel() + + opts := defaultTxExecutorOptions() + + halfDelay := opts.initialRetryDelay / 2 + + // Expect a random delay between -0.5 and +0.5 of the initial delay. + require.InDelta( + t, opts.initialRetryDelay, opts.randRetryDelay(0), + float64(halfDelay), + ) + + // Expect the second attempt to be double the initial delay. 
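	// With the default 40ms initial delay, attempt 1 should land in the
	// 40ms-120ms range: the jittered 20-60ms base delay doubled once.
	// Attempt 100 doubles far past the cap, which is why the final
	// assertion below expects exactly maxRetryDelay.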
+ require.InDelta( + t, opts.initialRetryDelay*2, opts.randRetryDelay(1), + float64(halfDelay*2), + ) + + // Expect the value to be capped at the maximum delay. + require.Equal(t, opts.maxRetryDelay, opts.randRetryDelay(100)) +} diff --git a/tapdb/multiverse.go b/tapdb/multiverse.go index d5f1ca050..bc10df3ec 100644 --- a/tapdb/multiverse.go +++ b/tapdb/multiverse.go @@ -273,3 +273,70 @@ func (b *BaseMultiverse) RegisterIssuance(ctx context.Context, return issuanceProof, nil } + +// RegisterBatchIssuance inserts a new minting leaf batch within the multiverse +// tree and the universe tree that corresponds to the given base key(s). +func (b *BaseMultiverse) RegisterBatchIssuance(ctx context.Context, + items []*universe.IssuanceItem) error { + + insertProof := func(item *universe.IssuanceItem, + dbTx BaseMultiverseStore) error { + + // Register issuance in the asset (group) specific universe + // tree. + _, universeRoot, err := universeRegisterIssuance( + ctx, dbTx, item.ID, item.Key, item.Leaf, + item.MetaReveal, + ) + if err != nil { + return err + } + + // Retrieve a handle to the multiverse tree so that we can + // update the tree by inserting a new issuance. + multiverseTree := mssmt.NewCompactedTree( + newTreeStoreWrapperTx(dbTx, multiverseNS), + ) + + // Construct a leaf node for insertion into the multiverse tree. + // The leaf node includes a reference to the lower tree via the + // lower tree root hash. + universeRootHash := universeRoot.NodeHash() + assetGroupSum := universeRoot.NodeSum() + + leafNode := mssmt.NewLeafNode( + universeRootHash[:], assetGroupSum, + ) + + // Use asset ID (or asset group hash) as the upper tree leaf + // node key. This is the same as the asset specific universe ID. + leafNodeKey := item.ID.Bytes() + + _, err = multiverseTree.Insert(ctx, leafNodeKey, leafNode) + if err != nil { + return err + } + + return nil + } + + var writeTx BaseMultiverseOptions + dbErr := b.db.ExecTx( + ctx, &writeTx, func(store BaseMultiverseStore) error { + for idx := range items { + item := items[idx] + err := insertProof(item, store) + if err != nil { + return err + } + } + + return nil + }, + ) + if dbErr != nil { + return dbErr + } + + return nil +} diff --git a/tapdb/postgres.go b/tapdb/postgres.go index f673f49fa..167a5a6e0 100644 --- a/tapdb/postgres.go +++ b/tapdb/postgres.go @@ -22,7 +22,7 @@ var ( // container will be terminated forcefully, even if the tests aren't // fully executed yet. So this time needs to be chosen correctly to be // longer than the longest expected individual test run time. - DefaultPostgresFixtureLifetime = 10 * time.Minute + DefaultPostgresFixtureLifetime = 60 * time.Minute ) // PostgresConfig holds the postgres database configuration. diff --git a/tapdb/universe.go b/tapdb/universe.go index 49f33307f..1d081fb15 100644 --- a/tapdb/universe.go +++ b/tapdb/universe.go @@ -6,7 +6,6 @@ import ( "database/sql" "errors" "fmt" - "sync" "github.com/btcsuite/btcd/btcec/v2/schnorr" "github.com/btcsuite/btcd/wire" @@ -122,8 +121,6 @@ type BaseUniverseTree struct { id universe.Identifier - registrationMtx sync.Mutex - smtNamespace string } @@ -217,7 +214,7 @@ func (t *treeStoreWrapperTx) View(ctx context.Context, // exist. Otherwise, the primary key of the existing asset ID is returned. 
func upsertAssetGen(ctx context.Context, db UpsertAssetStore, assetGen asset.Genesis, groupKey *asset.GroupKey, - genesisProof proof.Blob) (int32, error) { + genesisProof *proof.Proof) (int32, error) { // First, given the genesis point in the passed genesis, we'll insert a // new genesis point in the DB. @@ -244,23 +241,17 @@ func upsertAssetGen(ctx context.Context, db UpsertAssetStore, } } - genProof := &proof.Proof{} - err = genProof.Decode(bytes.NewReader(genesisProof)) - if err != nil { - return 0, fmt.Errorf("unable to decode genesis proof: %w", err) - } - var txBuf bytes.Buffer - if err := genProof.AnchorTx.Serialize(&txBuf); err != nil { + if err := genesisProof.AnchorTx.Serialize(&txBuf); err != nil { return 0, fmt.Errorf("unable to serialize anchor tx: %w", err) } - genTXID := genProof.AnchorTx.TxHash() - genBlockHash := genProof.BlockHeader.BlockHash() + genTXID := genesisProof.AnchorTx.TxHash() + genBlockHash := genesisProof.BlockHeader.BlockHash() chainTXID, err := db.UpsertChainTx(ctx, ChainTxParams{ Txid: genTXID[:], RawTx: txBuf.Bytes(), - BlockHeight: sqlInt32(genProof.BlockHeight), + BlockHeight: sqlInt32(genesisProof.BlockHeight), BlockHash: genBlockHash[:], }) if err != nil { @@ -297,11 +288,6 @@ func (b *BaseUniverseTree) RegisterIssuance(ctx context.Context, err error issuanceProof *universe.IssuanceProof ) - - // Limit to a single writer at a time. - b.registrationMtx.Lock() - defer b.registrationMtx.Unlock() - dbErr := b.db.ExecTx(ctx, &writeTx, func(dbTx BaseUniverseStore) error { issuanceProof, _, err = universeRegisterIssuance( ctx, dbTx, b.id, key, leaf, metaReveal, @@ -337,7 +323,10 @@ func universeRegisterIssuance(ctx context.Context, dbTx BaseUniverseStore, // The value stored in the MS-SMT will be the serialized MintingLeaf, // so we'll convert that into raw bytes now. - leafNode := leaf.SmtLeafNode() + leafNode, err := leaf.SmtLeafNode() + if err != nil { + return nil, nil, err + } var groupKeyBytes []byte if id.GroupKey != nil { @@ -551,6 +540,12 @@ func universeFetchIssuanceProof(ctx context.Context, return err } + var genProof proof.Proof + err = genProof.Decode(bytes.NewReader(leaf.GenesisProof)) + if err != nil { + return fmt.Errorf("unable to decode proof: %w", err) + } + issuanceProof := &universe.IssuanceProof{ MintingKey: universeKey, UniverseRoot: rootNode, @@ -559,7 +554,7 @@ func universeFetchIssuanceProof(ctx context.Context, GenesisWithGroup: universe.GenesisWithGroup{ Genesis: leafAssetGen, }, - GenesisProof: leaf.GenesisProof, + GenesisProof: &genProof, Amt: uint64(leaf.SumAmt), }, } @@ -635,8 +630,8 @@ func (b *BaseUniverseTree) MintingKeys(ctx context.Context, } // MintingLeaves returns all the minting leaves inserted into the universe. -func (b *BaseUniverseTree) MintingLeaves(ctx context.Context, -) ([]universe.MintingLeaf, error) { +func (b *BaseUniverseTree) MintingLeaves( + ctx context.Context) ([]universe.MintingLeaf, error) { var leaves []universe.MintingLeaf @@ -665,13 +660,22 @@ func (b *BaseUniverseTree) MintingLeaves(ctx context.Context, return err } + var genProof proof.Proof + err = genProof.Decode(bytes.NewReader( + dbLeaf.GenesisProof, + )) + if err != nil { + return fmt.Errorf("unable to decode proof: %w", + err) + } + // Now that we have the leaves, we'll encode them all // into the set of minting leaves. 
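			// The genesis proof was decoded just above, so the
			// leaf now carries the parsed *proof.Proof rather
			// than the raw blob stored in the database.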
leaf := universe.MintingLeaf{ GenesisWithGroup: universe.GenesisWithGroup{ Genesis: leafAssetGen, }, - GenesisProof: dbLeaf.GenesisProof, + GenesisProof: &genProof, Amt: uint64(dbLeaf.SumAmt), } if b.id.GroupKey != nil { diff --git a/tapdb/universe_stats.go b/tapdb/universe_stats.go index c9862e51d..dc0025a72 100644 --- a/tapdb/universe_stats.go +++ b/tapdb/universe_stats.go @@ -145,6 +145,37 @@ func (u *UniverseStats) LogSyncEvent(ctx context.Context, }) } +// LogSyncEvents logs sync events for the target universe. +func (u *UniverseStats) LogSyncEvents(ctx context.Context, + uniIDs ...universe.Identifier) error { + + var writeOpts UniverseStatsOptions + return u.db.ExecTx(ctx, &writeOpts, func(db UniverseStatsStore) error { + for idx := range uniIDs { + uniID := uniIDs[idx] + + var groupKeyXOnly []byte + if uniID.GroupKey != nil { + groupKeyXOnly = schnorr.SerializePubKey( + uniID.GroupKey, + ) + } + + err := db.InsertNewSyncEvent(ctx, NewSyncEvent{ + EventTime: u.clock.Now(), + EventTimestamp: u.clock.Now().UTC().Unix(), + AssetID: uniID.AssetID[:], + GroupKeyXOnly: groupKeyXOnly, + }) + if err != nil { + return err + } + } + + return nil + }) +} + // LogNewProofEvent logs a new proof insertion event for the target universe. func (u *UniverseStats) LogNewProofEvent(ctx context.Context, uniID universe.Identifier, key universe.BaseKey) error { @@ -165,6 +196,36 @@ func (u *UniverseStats) LogNewProofEvent(ctx context.Context, }) } +// LogNewProofEvents logs new proof insertion events for the target universe. +func (u *UniverseStats) LogNewProofEvents(ctx context.Context, + uniIDs ...universe.Identifier) error { + + var writeTxOpts UniverseStatsOptions + return u.db.ExecTx(ctx, &writeTxOpts, func(db UniverseStatsStore) error { + for idx := range uniIDs { + uniID := uniIDs[idx] + var groupKeyXOnly []byte + if uniID.GroupKey != nil { + groupKeyXOnly = schnorr.SerializePubKey( + uniID.GroupKey, + ) + } + + err := db.InsertNewProofEvent(ctx, NewProofEvent{ + EventTime: u.clock.Now(), + EventTimestamp: u.clock.Now().UTC().Unix(), + AssetID: uniID.AssetID[:], + GroupKeyXOnly: groupKeyXOnly, + }) + if err != nil { + return err + } + } + + return nil + }) +} + // AggregateSyncStats returns stats aggregated over all assets within the // Universe. func (u *UniverseStats) AggregateSyncStats( diff --git a/tapdb/universe_test.go b/tapdb/universe_test.go index 1d3a8cc30..e9e484d51 100644 --- a/tapdb/universe_test.go +++ b/tapdb/universe_test.go @@ -1,7 +1,6 @@ package tapdb import ( - "bytes" "context" "database/sql" "math/rand" @@ -87,9 +86,8 @@ func randBaseKey(t *testing.T) universe.BaseKey { } } -func randProof(t *testing.T) proof.Blob { - var buf bytes.Buffer - p := &proof.Proof{ +func randProof(t *testing.T) *proof.Proof { + return &proof.Proof{ PrevOut: wire.OutPoint{}, BlockHeader: wire.BlockHeader{ Timestamp: time.Unix(rand.Int63(), 0), @@ -106,9 +104,6 @@ func randProof(t *testing.T) proof.Blob { InternalKey: test.RandPubKey(t), }, } - require.NoError(t, p.Encode(&buf)) - - return buf.Bytes() } func randMintingLeaf(t *testing.T, assetGen asset.Genesis, @@ -194,8 +189,10 @@ func TestUniverseIssuanceProofs(t *testing.T) { // We should be able to verify the issuance proof given the // root of the SMT. 
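		// SmtLeafNode now re-encodes the genesis proof on the fly and
		// can therefore fail, hence the extra error check before
		// recomputing the proof root.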
+ node, err := leaf.SmtLeafNode() + require.NoError(t, err) proofRoot := issuanceProof.InclusionProof.Root( - targetKey.UniverseKey(), leaf.SmtLeafNode(), + targetKey.UniverseKey(), node, ) require.True(t, mssmt.IsEqualNode(rootNode, proofRoot)) @@ -214,9 +211,10 @@ func TestUniverseIssuanceProofs(t *testing.T) { // The issuance proof we obtained should have a valid inclusion // proof. + node, err = uniProof.Leaf.SmtLeafNode() + require.NoError(t, err) dbProofRoot := uniProof.InclusionProof.Root( - uniProof.MintingKey.UniverseKey(), - uniProof.Leaf.SmtLeafNode(), + uniProof.MintingKey.UniverseKey(), node, ) require.True( t, mssmt.IsEqualNode(uniProof.UniverseRoot, dbProofRoot), @@ -495,6 +493,19 @@ func TestUniverseLeafQuery(t *testing.T) { require.NoError(t, err) require.Len(t, p, 1) - require.Equal(t, leaf, *p[0].Leaf) + // We can't compare the raw leaves as the proofs looks slightly + // differently after an encode->decode cycle (nil vs. empty + // slices and so on). + require.Equal( + t, leaf.GenesisWithGroup, p[0].Leaf.GenesisWithGroup, + ) + + expectedNode, err := leaf.SmtLeafNode() + require.NoError(t, err) + + actualNode, err := p[0].Leaf.SmtLeafNode() + require.NoError(t, err) + + require.True(t, mssmt.IsEqualNode(expectedNode, actualNode)) } } diff --git a/tapgarden/caretaker.go b/tapgarden/caretaker.go index 0ee6ea8bf..ab093ac18 100644 --- a/tapgarden/caretaker.go +++ b/tapgarden/caretaker.go @@ -1,7 +1,6 @@ package tapgarden import ( - "bytes" "context" "fmt" "strings" @@ -10,6 +9,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/btcutil/psbt" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" "github.com/lightninglabs/taproot-assets/asset" @@ -19,6 +19,7 @@ import ( "github.com/lightninglabs/taproot-assets/universe" "github.com/lightningnetwork/lnd/chainntnfs" "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" ) var ( @@ -853,106 +854,81 @@ func (b *BatchCaretaker) stateStep(currentState BatchState) (BatchState, error) "proofs: %w", err) } + var ( + committedAssets = batchCommitment.CommittedAssets() + numAssets = len(committedAssets) + mintingProofBlobs = make(proof.AssetBlobs, numAssets) + universeItems chan *universe.IssuanceItem + mintTxHash = confInfo.Tx.TxHash() + proofMutex sync.Mutex + batchSyncEG errgroup.Group + ) + + // If we have a universe configured, we'll batch stream the + // issuance items to it. We start this as a goroutine/err group + // now, so we can already start streaming while the proofs are + // still being stored to the local proof store. + if b.cfg.Universe != nil { + universeItems = make( + chan *universe.IssuanceItem, numAssets, + ) + + // We use an error group to simply the error handling of + // a goroutine. + batchSyncEG.Go(func() error { + return b.batchStreamUniverseItems( + ctx, universeItems, numAssets, + ) + }) + } + // Before we confirm the batch, we'll also update the on disk // file system as well. 
// // TODO(roasbeef): rely on the upsert here instead - mintingProofBlobs := make(proof.AssetBlobs) - for _, newAsset := range batchCommitment.CommittedAssets() { - assetID := newAsset.ID() - scriptPubKey := newAsset.ScriptKey.PubKey - scriptKey := asset.ToSerialized(scriptPubKey) + err = fn.ParSlice( + ctx, committedAssets, + func(ctx context.Context, newAsset *asset.Asset) error { + scriptPubKey := newAsset.ScriptKey.PubKey + scriptKey := asset.ToSerialized(scriptPubKey) - mintingProof := mintingProofs[scriptKey] + mintingProof := mintingProofs[scriptKey] - blob, err := proof.EncodeAsProofFile(mintingProof) - if err != nil { - return 0, fmt.Errorf("unable to encode proof "+ - "file: %w", err) - } - mintingProofBlobs[scriptKey] = blob - - err = b.cfg.ProofFiles.ImportProofs( - ctx, headerVerifier, false, - &proof.AnnotatedProof{ - Locator: proof.Locator{ - AssetID: &assetID, - ScriptKey: *scriptPubKey, - }, - Blob: blob, - }, - ) - if err != nil { - return 0, fmt.Errorf("unable to insert "+ - "proofs: %w", err) - } - - // Before we mark the batch as confirmed below, we'll - // also register the issuance of the new asset with our - // local base universe. - // - // TODO(roasbeef): can combine with minting proof - // creation above? - if b.cfg.Universe != nil { - // The universe ID serves to identifier the - // universe root we want to add this asset to. - // This is either the assetID or the group key. - uniID := universe.Identifier{ - AssetID: assetID, + proofBlob, uniProof, err := b.storeMintingProof( + ctx, newAsset, mintingProof, mintTxHash, + headerVerifier, + ) + if err != nil { + return fmt.Errorf("unable to store "+ + "proof: %w", err) } - groupKey := newAsset.GroupKey - if groupKey != nil { - uniID.GroupKey = &groupKey.GroupPubKey - } + proofMutex.Lock() + mintingProofBlobs[scriptKey] = proofBlob + proofMutex.Unlock() - log.Debugf("Registering asset with "+ - "universe, key=%v", spew.Sdump(uniID)) - - // The base key is the set of bytes that keys - // into the universe, this'll be the outpoint. - // where it was created at and the script key - // for that asset. - baseKey := universe.BaseKey{ - MintingOutpoint: wire.OutPoint{ - Hash: confInfo.Tx.TxHash(), - Index: b.anchorOutputIndex, - }, - ScriptKey: &newAsset.ScriptKey, + if uniProof != nil { + universeItems <- uniProof } - // The universe tree store only the asset state - // transition and not also the proof file - // checksum (as the root is effectively a - // checksum), so we'll use just the state - // transition. - var proofBuf bytes.Buffer - err = mintingProof.Encode(&proofBuf) - if err != nil { - return 0, err - } + return nil + }, + ) + if err != nil { + return 0, fmt.Errorf("unable to update asset proofs: "+ + "%w", err) + } - // With both of those assembled, we can now - // register issuance which takes the amount and - // proof of the minting event. - uniGen := universe.GenesisWithGroup{ - Genesis: newAsset.Genesis, - } - if groupKey != nil { - uniGen.GroupKey = groupKey - } - mintingLeaf := &universe.MintingLeaf{ - GenesisWithGroup: uniGen, - GenesisProof: proofBuf.Bytes(), - Amt: newAsset.Amount, - } - _, err = b.cfg.Universe.RegisterIssuance( - ctx, uniID, baseKey, mintingLeaf, - ) - if err != nil { - return 0, fmt.Errorf("unable to "+ - "register issuance: %v", err) - } + // The local proof store inserts are now completed, but we also + // need to wait for the batch sync to complete before we can + // confirm the batch. 
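		// Closing universeItems lets fn.CollectBatch inside
		// batchStreamUniverseItems flush its final partial batch and
		// return, so the Wait call below completes once the last
		// batch has been registered.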
+ if b.cfg.Universe != nil { + close(universeItems) + + err = batchSyncEG.Wait() + if err != nil { + return 0, fmt.Errorf("unable to batch sync "+ + "universe: %w", err) } } @@ -997,6 +973,130 @@ func (b *BatchCaretaker) stateStep(currentState BatchState) (BatchState, error) } } +// storeMintingProof stores the minting proof for a new asset in the proof +// store. If a universe is configured, it also returns the issuance item that +// can be used to register the asset with the universe. +func (b *BatchCaretaker) storeMintingProof(ctx context.Context, + a *asset.Asset, mintingProof *proof.Proof, mintTxHash chainhash.Hash, + headerVerifier proof.HeaderVerifier) (proof.Blob, + *universe.IssuanceItem, error) { + + assetID := a.ID() + blob, err := proof.EncodeAsProofFile(mintingProof) + if err != nil { + return nil, nil, fmt.Errorf("unable to encode proof file: %w", + err) + } + + err = b.cfg.ProofFiles.ImportProofs( + ctx, headerVerifier, false, &proof.AnnotatedProof{ + Locator: proof.Locator{ + AssetID: &assetID, + ScriptKey: *a.ScriptKey.PubKey, + }, + Blob: blob, + }, + ) + if err != nil { + return nil, nil, fmt.Errorf("unable to insert proofs: %w", err) + } + + // Before we continue with the next item, we'll also register the + // issuance of the new asset with our local base universe. We skip this + // step if there is no universe configured. + if b.cfg.Universe == nil { + return blob, nil, nil + } + + // The universe ID serves to identifier the universe root we want to add + // this asset to. This is either the assetID or the group key. + uniID := universe.Identifier{ + AssetID: assetID, + } + + groupKey := a.GroupKey + if groupKey != nil { + uniID.GroupKey = &groupKey.GroupPubKey + } + + log.Debugf("Preparing asset for registration with universe, key=%v", + spew.Sdump(uniID)) + + // The base key is the set of bytes that keys into the universe, this'll + // be the outpoint where it was created at and the script key for that + // asset. + baseKey := universe.BaseKey{ + MintingOutpoint: wire.OutPoint{ + Hash: mintTxHash, + Index: b.anchorOutputIndex, + }, + ScriptKey: &a.ScriptKey, + } + + // With both of those assembled, we can now register issuance which + // takes the amount and proof of the minting event. + uniGen := universe.GenesisWithGroup{ + Genesis: a.Genesis, + } + if groupKey != nil { + uniGen.GroupKey = groupKey + } + mintingLeaf := &universe.MintingLeaf{ + GenesisWithGroup: uniGen, + + // The universe tree store only the asset state transition and + // not also the proof file checksum (as the root is effectively + // a checksum), so we'll use just the state transition. + GenesisProof: mintingProof, + Amt: a.Amount, + } + + return blob, &universe.IssuanceItem{ + ID: uniID, + Key: baseKey, + Leaf: mintingLeaf, + }, nil +} + +// batchStreamUniverseItems streams the issuance items for a batch to the +// universe. 
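// The items channel must be closed by the caller once all assets have been
// handed off; otherwise this function only returns when the context is
// canceled. A rough usage sketch of the wiring done in stateStep above, with
// eg being an errgroup.Group:
//
//	items := make(chan *universe.IssuanceItem, numAssets)
//	eg.Go(func() error {
//		return b.batchStreamUniverseItems(ctx, items, numAssets)
//	})
//	// ... send each item produced by storeMintingProof ...
//	close(items)
//	err := eg.Wait()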
+func (b *BatchCaretaker) batchStreamUniverseItems(ctx context.Context, + universeItems chan *universe.IssuanceItem, numTotal int) error { + + var ( + numItems int + uni = b.cfg.Universe + ) + err := fn.CollectBatch( + ctx, universeItems, b.cfg.UniversePushBatchSize, + func(ctx context.Context, + batch []*universe.IssuanceItem) error { + + numItems += len(batch) + log.Infof("Inserting %d new leaves (%d of %d) into "+ + "local universe", len(batch), numItems, + numTotal) + + err := uni.RegisterNewIssuanceBatch(ctx, batch) + if err != nil { + return fmt.Errorf("unable to register "+ + "issuance batch: %w", err) + } + + log.Infof("Inserted %d new leaves (%d of %d) into "+ + "local universe", len(batch), numItems, + numTotal) + + return nil + }, + ) + if err != nil { + return fmt.Errorf("unable to register issuance proofs: %w", err) + } + + return nil +} + // SortSeedlings sorts the seedling names such that all seedlings that will be // a group anchor are first. func SortSeedlings(seedlings []*Seedling) []string { diff --git a/tapgarden/planter.go b/tapgarden/planter.go index 655691089..8407034d3 100644 --- a/tapgarden/planter.go +++ b/tapgarden/planter.go @@ -1,7 +1,6 @@ package tapgarden import ( - "bytes" "context" "fmt" "sync" @@ -47,11 +46,15 @@ type GardenKit struct { // Universe is used to register new asset issuance with a local/remote // base universe instance. - Universe universe.Registrar + Universe universe.BatchRegistrar // ProofWatcher is used to watch new proofs for their anchor transaction // to be confirmed safely with a minimum number of confirmations. ProofWatcher proof.Watcher + + // UniversePushBatchSize is the number of minted items to push to the + // local universe in a single batch. + UniversePushBatchSize int } // PlanterConfig is the main config for the ChainPlanter. @@ -852,15 +855,6 @@ func (c *ChainPlanter) updateMintingProofs(proofs []*proof.Proof) error { ScriptKey: &p.Asset.ScriptKey, } - // The universe tree stores only the asset state transition and - // not also the proof file checksum (as the root is effectively - // a checksum), so we'll use just the state transition. - var proofBuf bytes.Buffer - err = p.Encode(&proofBuf) - if err != nil { - return err - } - // With both of those assembled, we can now update issuance // which takes the amount and proof of the minting event. uniGen := universe.GenesisWithGroup{ @@ -871,7 +865,7 @@ func (c *ChainPlanter) updateMintingProofs(proofs []*proof.Proof) error { } mintingLeaf := &universe.MintingLeaf{ GenesisWithGroup: uniGen, - GenesisProof: proofBuf.Bytes(), + GenesisProof: p, Amt: p.Asset.Amount, } _, err = c.cfg.Universe.RegisterIssuance( diff --git a/universe/auto_syncer.go b/universe/auto_syncer.go index d74c86b2c..1e4125c9e 100644 --- a/universe/auto_syncer.go +++ b/universe/auto_syncer.go @@ -36,7 +36,7 @@ type FederationConfig struct { // LocalRegistrar is the local register. This'll be used to add new // leaves (minting events) to our local server before pushing them out // to the federation. - LocalRegistrar Registrar + LocalRegistrar BatchRegistrar // SyncInterval is the period that we'll use to synchronize with the // set of Universe servers. @@ -73,6 +73,15 @@ type FederationPushReq struct { err chan error } +// FederationIssuanceBatchPushReq is used to push out a batch of new issuance +// events to all or some members of the federation. 
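// The resp channel is signalled once the batch has been committed to the
// local universe; pushing the individual leaves out to remote federation
// members then continues asynchronously in the background. A request is
// built by RegisterNewIssuanceBatch below, roughly:
//
//	req := &FederationIssuanceBatchPushReq{
//		IssuanceBatch: items,
//		resp:          make(chan struct{}, 1),
//		err:           make(chan error, 1),
//	}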
+type FederationIssuanceBatchPushReq struct { + IssuanceBatch []*IssuanceItem + + resp chan struct{} + err chan error +} + // FederationEnvoy is used to manage synchronization between the set of // federated Universe servers. It handles the periodic sync between universe // servers, and can also be used to push out new locally created proofs to the @@ -87,13 +96,16 @@ type FederationEnvoy struct { stopOnce sync.Once pushRequests chan *FederationPushReq + + batchPushRequests chan *FederationIssuanceBatchPushReq } // NewFederationEnvoy creates a new federation envoy from the passed config. func NewFederationEnvoy(cfg FederationConfig) *FederationEnvoy { return &FederationEnvoy{ - cfg: cfg, - pushRequests: make(chan *FederationPushReq), + cfg: cfg, + pushRequests: make(chan *FederationPushReq), + batchPushRequests: make(chan *FederationIssuanceBatchPushReq), ContextGuard: &fn.ContextGuard{ DefaultTimeout: DefaultTimeout, Quit: make(chan struct{}), @@ -213,8 +225,8 @@ func (f *FederationEnvoy) syncUniverseState(ctx context.Context, // pushProofToFederation attempts to push out a new proof to the current // federation in parallel. -func (f *FederationEnvoy) pushProofToFederation(uniID Identifier, - newProof *IssuanceProof) { +func (f *FederationEnvoy) pushProofToFederation(uniID Identifier, key BaseKey, + leaf *MintingLeaf) { ctx, cancel := f.WithCtxQuit() defer cancel() @@ -234,7 +246,7 @@ func (f *FederationEnvoy) pushProofToFederation(uniID Identifier, } log.Infof("Pushing new proof to %v federation members, proof_key=%v", - len(fedServers), spew.Sdump(newProof.MintingKey)) + len(fedServers), spew.Sdump(key)) ctx, cancel = f.WithCtxQuitNoTimeout() defer cancel() @@ -250,7 +262,7 @@ func (f *FederationEnvoy) pushProofToFederation(uniID Identifier, } _, err = remoteUniverseServer.RegisterIssuance( - ctx, uniID, newProof.MintingKey, newProof.Leaf, + ctx, uniID, key, leaf, ) return err } @@ -326,13 +338,13 @@ func (f *FederationEnvoy) syncer() { // members. case pushReq := <-f.pushRequests: ctx, cancel := f.WithCtxQuit() - defer cancel() // First, we'll attempt to registrar the issuance with // the local registrar server. newProof, err := f.cfg.LocalRegistrar.RegisterIssuance( ctx, pushReq.ID, pushReq.Key, pushReq.Leaf, ) + cancel() if err != nil { err := fmt.Errorf("unable to insert proof "+ "into local universe: %w", err) @@ -350,7 +362,43 @@ func (f *FederationEnvoy) syncer() { // With the response sent above, we'll push this out to // all the Universe servers in the background. - go f.pushProofToFederation(pushReq.ID, newProof) + go f.pushProofToFederation( + pushReq.ID, pushReq.Key, pushReq.Leaf, + ) + + case pushReq := <-f.batchPushRequests: + ctx, cancel := f.WithCtxQuitNoTimeout() + + // First, we'll attempt to registrar the issuance with + // the local registrar server. + err := f.cfg.LocalRegistrar.RegisterNewIssuanceBatch( + ctx, pushReq.IssuanceBatch, + ) + cancel() + if err != nil { + err := fmt.Errorf("unable to insert proof "+ + "batch into local universe: %w", err) + + log.Warnf(err.Error()) + + pushReq.err <- err + continue + } + + // Now that we know we were able to register the proof, + // we'll return back to the caller. + pushReq.resp <- struct{}{} + + // With the response sent above, we'll push this out to + // all the Universe servers in the background. 
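			// The goroutine below re-uses the single-proof push
			// helper for each item, so the remote pushes happen
			// sequentially and strictly after the local commit
			// has been acknowledged above.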
+ go func() { + for idx := range pushReq.IssuanceBatch { + item := pushReq.IssuanceBatch[idx] + f.pushProofToFederation( + item.ID, item.Key, item.Leaf, + ) + } + }() case <-f.Quit: return @@ -364,7 +412,7 @@ func (f *FederationEnvoy) syncer() { // sent to the set of active universe servers. // // NOTE: This is part of the universe.Registrar interface. -func (f *FederationEnvoy) RegisterIssuance(ctx context.Context, id Identifier, +func (f *FederationEnvoy) RegisterIssuance(_ context.Context, id Identifier, key BaseKey, leaf *MintingLeaf) (*IssuanceProof, error) { pushReq := &FederationPushReq{ @@ -382,6 +430,29 @@ func (f *FederationEnvoy) RegisterIssuance(ctx context.Context, id Identifier, return fn.RecvResp(pushReq.resp, pushReq.err, f.Quit) } +// RegisterNewIssuanceBatch inserts a batch of new minting leaves within the +// target universe tree (based on the ID), stored at the base key(s). We assume +// the proofs within the batch have already been checked that they don't yet +// exist in the local database. +// +// NOTE: This is part of the universe.BatchRegistrar interface. +func (f *FederationEnvoy) RegisterNewIssuanceBatch(_ context.Context, + items []*IssuanceItem) error { + + pushReq := &FederationIssuanceBatchPushReq{ + IssuanceBatch: items, + resp: make(chan struct{}, 1), + err: make(chan error, 1), + } + + if !fn.SendOrQuit(f.batchPushRequests, pushReq, f.Quit) { + return fmt.Errorf("unable to push new proof event batch") + } + + _, err := fn.RecvResp(pushReq.resp, pushReq.err, f.Quit) + return err +} + // AddServer adds a new set of servers to the federation, then immediately // performs a new background sync. func (f *FederationEnvoy) AddServer(addrs ...ServerAddr) error { diff --git a/universe/base.go b/universe/base.go index e6a20d853..8d8e0879d 100644 --- a/universe/base.go +++ b/universe/base.go @@ -9,6 +9,7 @@ import ( "github.com/btcsuite/btcd/btcec/v2/schnorr" "github.com/davecgh/go-spew/spew" + "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/proof" ) @@ -138,35 +139,20 @@ func (a *MintingArchive) RegisterIssuance(ctx context.Context, id Identifier, log.Debugf("Inserting new proof into Universe: id=%v, base_key=%v", id.StringForLog(), spew.Sdump(key)) - // We first decode the proof to make sure it's at least well-formed. - var newProof proof.Proof - err := newProof.Decode(bytes.NewReader(leaf.GenesisProof)) - if err != nil { - return nil, fmt.Errorf("unable to decode proof: %v", err) - } + newProof := leaf.GenesisProof // We'll first check to see if we already know of this leaf within the // multiverse. If so, then we'll return the existing issuance proof. - issuanceProofs, err := a.cfg.Multiverse.FetchIssuanceProof( - ctx, id, key, - ) + issuanceProofs, err := a.cfg.Multiverse.FetchIssuanceProof(ctx, id, key) switch { case err == nil && len(issuanceProofs) > 0: issuanceProof := issuanceProofs[0] - var existingProof proof.Proof - err := existingProof.Decode( - bytes.NewReader(issuanceProof.Leaf.GenesisProof), - ) - if err != nil { - return nil, fmt.Errorf("unable to decode existing "+ - "proof: %w", err) - } - // The only valid case for an update of a proof is if the mint // TX was re-organized out of the chain. If the block hash is // still the same, we don't see this as an update and just // return the existing proof. 
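		// The stored leaf already carries a decoded proof, so we can
		// compare block hashes directly without decoding a raw blob
		// first.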
+ existingProof := issuanceProof.Leaf.GenesisProof if existingProof.BlockHeader.BlockHash() == newProof.BlockHeader.BlockHash() { @@ -188,7 +174,44 @@ func (a *MintingArchive) RegisterIssuance(ctx context.Context, id Identifier, // it as a file first as that's what the expected wants. // // TODO(roasbeef): add option to skip proof verification? - assetSnapshot, err := newProof.Verify(ctx, nil, a.cfg.HeaderVerifier) + assetSnapshot, err := a.verifyIssuanceProof(ctx, id, key, leaf) + if err != nil { + return nil, err + } + + // Now that we know the proof is valid, we'll insert it into the base + // multiverse backend, and return the new issuance proof. + issuanceProof, err := a.cfg.Multiverse.RegisterIssuance( + ctx, id, key, leaf, assetSnapshot.MetaReveal, + ) + if err != nil { + return nil, fmt.Errorf("unable to register new "+ + "issuance: %v", err) + } + + // Log a sync event for the newly inserted leaf in the background as an + // async goroutine. + go func() { + err := a.cfg.UniverseStats.LogNewProofEvent( + context.Background(), id, key, + ) + if err != nil { + log.Warnf("unable to log new proof event (id=%v): %v", + id.StringForLog(), err) + } + }() + + return issuanceProof, nil +} + +// verifyIssuanceProof verifies the passed minting leaf is a valid issuance +// proof, returning the asset snapshot if so. +func (a *MintingArchive) verifyIssuanceProof(ctx context.Context, id Identifier, + key BaseKey, leaf *MintingLeaf) (*proof.AssetSnapshot, error) { + + assetSnapshot, err := leaf.GenesisProof.Verify( + ctx, nil, a.cfg.HeaderVerifier, + ) if err != nil { return nil, fmt.Errorf("unable to verify proof: %v", err) } @@ -223,34 +246,64 @@ func (a *MintingArchive) RegisterIssuance(ctx context.Context, id Identifier, // The script key should also match exactly. case !newAsset.ScriptKey.PubKey.IsEqual(key.ScriptKey.PubKey): - return nil, fmt.Errorf("script key mismatch: expected %v, "+ - "got %v", key.ScriptKey.PubKey.SerializeCompressed(), + return nil, fmt.Errorf("script key mismatch: expected %v, got "+ + "%v", key.ScriptKey.PubKey.SerializeCompressed(), newAsset.ScriptKey.PubKey.SerializeCompressed()) } - // Now that we know the proof is valid, we'll insert it into the base - // multiverse backend, and return the new issuance proof. - issuanceProof, err := a.cfg.Multiverse.RegisterIssuance( - ctx, id, key, leaf, assetSnapshot.MetaReveal, + return assetSnapshot, nil +} + +// RegisterNewIssuanceBatch inserts a batch of new minting leaves within the +// target universe tree (based on the ID), stored at the base key(s). We assume +// the proofs within the batch have already been checked that they don't yet +// exist in the local database. 
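// Proof verification runs in parallel across the batch, while the actual
// insertion is handed to the multiverse store, which, in the tapdb backend,
// commits the whole batch in a single database transaction.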
+func (a *MintingArchive) RegisterNewIssuanceBatch(ctx context.Context, + items []*IssuanceItem) error { + + log.Infof("Verifying %d new proofs for insertion into Universe", + len(items)) + + err := fn.ParSlice( + ctx, items, func(ctx context.Context, i *IssuanceItem) error { + assetSnapshot, err := a.verifyIssuanceProof( + ctx, i.ID, i.Key, i.Leaf, + ) + if err != nil { + return err + } + + i.MetaReveal = assetSnapshot.MetaReveal + + return nil + }, ) if err != nil { - return nil, fmt.Errorf("unable to register new "+ - "issuance: %v", err) + return fmt.Errorf("unable to verify issuance proofs: %w", err) + } + + log.Infof("Inserting %d verified proofs into Universe", len(items)) + err = a.cfg.Multiverse.RegisterBatchIssuance(ctx, items) + if err != nil { + return fmt.Errorf("unable to register new issuance proofs: %w", + err) } // Log a sync event for the newly inserted leaf in the background as an // async goroutine. + ids := fn.Map(items, func(item *IssuanceItem) Identifier { + return item.ID + }) go func() { - err := a.cfg.UniverseStats.LogNewProofEvent( - context.Background(), id, key, + err := a.cfg.UniverseStats.LogNewProofEvents( + context.Background(), ids..., ) if err != nil { - log.Warnf("unable to log new proof event (id=%v): %v", - id.StringForLog(), err) + log.Warnf("unable to log new proof events: %v", err) } }() - return issuanceProof, nil + return nil } // FetchIssuanceProof attempts to fetch an issuance proof for the target base diff --git a/universe/interface.go b/universe/interface.go index b1674e30a..e568d4048 100644 --- a/universe/interface.go +++ b/universe/interface.go @@ -1,6 +1,7 @@ package universe import ( + "bytes" "context" "crypto/sha256" "encoding/hex" @@ -88,18 +89,20 @@ type MintingLeaf struct { GenesisWithGroup // GenesisProof is the proof of the newly created asset. - // - // TODO(roasbeef): have instead be a reader? easier to mmap in the - // future - GenesisProof proof.Blob + GenesisProof *proof.Proof // Amt is the amount of units created. Amt uint64 } // SmtLeafNode returns the SMT leaf node for the given minting leaf. -func (m *MintingLeaf) SmtLeafNode() *mssmt.LeafNode { - return mssmt.NewLeafNode(m.GenesisProof[:], m.Amt) +func (m *MintingLeaf) SmtLeafNode() (*mssmt.LeafNode, error) { + var buf bytes.Buffer + if err := m.GenesisProof.Encode(&buf); err != nil { + return nil, err + } + + return mssmt.NewLeafNode(buf.Bytes(), m.Amt), nil } // BaseKey is the top level key for a Base/Root universe. This will be used to @@ -163,14 +166,18 @@ type IssuanceProof struct { // VerifyRoot verifies that the inclusion proof for the root node matches the // specified root. This is useful for sanity checking an issuance proof against // the purported root, and the included leaf. -func (i *IssuanceProof) VerifyRoot(expectedRoot mssmt.Node) bool { +func (i *IssuanceProof) VerifyRoot(expectedRoot mssmt.Node) (bool, error) { + leafNode, err := i.Leaf.SmtLeafNode() + if err != nil { + return false, err + } + reconstructedRoot := i.InclusionProof.Root( - i.MintingKey.UniverseKey(), - i.Leaf.SmtLeafNode(), + i.MintingKey.UniverseKey(), leafNode, ) return mssmt.IsEqualNode(i.UniverseRoot, expectedRoot) && - mssmt.IsEqualNode(reconstructedRoot, expectedRoot) + mssmt.IsEqualNode(reconstructedRoot, expectedRoot), nil } // BaseBackend is the backend storage interface for a base universe. 
The @@ -238,6 +245,11 @@ type BaseMultiverse interface { leaf *MintingLeaf, metaReveal *proof.MetaReveal) (*IssuanceProof, error) + // RegisterBatchIssuance inserts a new minting leaf batch within the + // multiverse tree and the universe tree that corresponds to the given + // base key(s). + RegisterBatchIssuance(ctx context.Context, items []*IssuanceItem) error + // FetchIssuanceProof returns an issuance proof for the target key. If // the key doesn't have a script key specified, then all the proofs for // the minting outpoint will be returned. If neither are specified, then @@ -258,6 +270,36 @@ type Registrar interface { leaf *MintingLeaf) (*IssuanceProof, error) } +// IssuanceItem is an item that can be used to register a new issuance within a +// base universe. +type IssuanceItem struct { + // ID is the identifier of the base universe that the item should be + // registered within. + ID Identifier + + // Key is the base key that the leaf is or will be stored at. + Key BaseKey + + // Leaf is the minting leaf that was created. + Leaf *MintingLeaf + + // MetaReveal is the meta reveal that was created. + MetaReveal *proof.MetaReveal +} + +// BatchRegistrar is an interface that allows a caller to register a batch of +// issuance items within a base universe. +type BatchRegistrar interface { + Registrar + + // RegisterNewIssuanceBatch inserts a batch of new minting leaves within + // the target universe tree (based on the ID), stored at the base + // key(s). We assume the proofs within the batch have already been + // checked that they don't yet exist in the local database. + RegisterNewIssuanceBatch(ctx context.Context, + items []*IssuanceItem) error +} + const ( // DefaultUniverseRPCPort is the default port that the universe RPC is // hosted on. @@ -658,11 +700,18 @@ type Telemetry interface { LogSyncEvent(ctx context.Context, uniID Identifier, key BaseKey) error + // LogSyncEvents logs sync events for the target universe. + LogSyncEvents(ctx context.Context, uniIDs ...Identifier) error + // LogNewProofEvent logs a new proof insertion event for the target // universe. LogNewProofEvent(ctx context.Context, uniID Identifier, key BaseKey) error + // LogNewProofEvents logs new proof insertion events for the target + // universe. + LogNewProofEvents(ctx context.Context, uniIDs ...Identifier) error + // QuerySyncStats attempts to query the stats for the target universe. // For a given asset ID, tag, or type, the set of universe stats is // returned which lists information such as the total number of syncs diff --git a/universe/syncer.go b/universe/syncer.go index d3b2780bb..f931472ff 100644 --- a/universe/syncer.go +++ b/universe/syncer.go @@ -4,10 +4,12 @@ import ( "context" "errors" "fmt" + "sync/atomic" "github.com/davecgh/go-spew/spew" "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/mssmt" + "golang.org/x/sync/errgroup" ) var ( @@ -30,13 +32,21 @@ type SimpleSyncCfg struct { // LocalRegistrar is the registrar tied to a local Universe instance. // This is used to insert new proof into the local DB as a result of // the diff operation. - LocalRegistrar Registrar + LocalRegistrar BatchRegistrar + + // SyncBatchSize is the number of items to sync in a single batch. + SyncBatchSize int } // SimpleSyncer is a simple implementation of the Syncer interface. It's based // on a set difference operation between the local and remote Universe. 
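// Only one sync may run at a time: a call to executeSync while another sync
// is still in flight fails immediately rather than queueing (tracked by the
// isSyncing flag below).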
type SimpleSyncer struct { cfg SimpleSyncCfg + + // isSyncing keeps track of whether we're currently syncing the local + // Universe with a remote Universe. This is used to prevent concurrent + // syncs. + isSyncing atomic.Bool } // NewSimpleSyncer creates a new SimpleSyncer instance. @@ -52,8 +62,18 @@ func NewSimpleSyncer(cfg SimpleSyncCfg) *SimpleSyncer { func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, syncType SyncType, idsToSync []Identifier) ([]AssetSyncDiff, error) { + // Prevent the syncer from running twice. + if !s.isSyncing.CompareAndSwap(false, true) { + return nil, fmt.Errorf("sync is already in progress, please " + + "wait for it to finish") + } + + defer func() { + s.isSyncing.Store(false) + }() + var ( - rootsToSync = make(chan BaseRoot, len(idsToSync)) + targetRoots []BaseRoot err error ) switch { @@ -66,9 +86,9 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, // We'll use an error group to fetch each Universe root we need // as a series of parallel requests backed by a worker pool. - // - // TODO(roasbeef): can actually make non-blocking.. - err = fn.ParSlice(ctx, idsToSync, + rootsToSync := make(chan BaseRoot, len(idsToSync)) + err = fn.ParSlice( + ctx, idsToSync, func(ctx context.Context, id Identifier) error { root, err := diffEngine.RootNode(ctx, id) if err != nil { @@ -79,26 +99,21 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, return nil }, ) + if err != nil { + return nil, fmt.Errorf("unable to fetch roots for "+ + "universe sync: %w", err) + } + + targetRoots = fn.Collect(rootsToSync) // Otherwise, we'll just fetch all the roots from the remote universe. default: log.Infof("Fetching all roots for remote Universe server...") - roots, err := diffEngine.RootNodes(ctx) + targetRoots, err = diffEngine.RootNodes(ctx) if err != nil { return nil, err } - - rootsToSync = make(chan BaseRoot, len(roots)) - - // TODO(roasbeef): can make non-blocking w/ goroutine - fn.SendAll(rootsToSync, roots...) } - if err != nil { - return nil, fmt.Errorf("unable to fetch roots for "+ - "universe sync: %w", err) - } - - targetRoots := fn.Collect(rootsToSync) log.Infof("Obtained %v roots from remote Universe server", len(targetRoots)) @@ -108,60 +123,92 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, // Now that we know the set of Universes we need to sync, we'll execute // the diff operation for each of them. syncDiffs := make(chan AssetSyncDiff, len(targetRoots)) - err = fn.ParSlice(ctx, targetRoots, func(ctx context.Context, remoteRoot BaseRoot) error { - // First, we'll compare the remote root against the local root. - uniID := remoteRoot.ID - localRoot, err := s.cfg.LocalDiffEngine.RootNode(ctx, uniID) - switch { - // If we don't have this root, then we don't have anything to - // compare to so we'll proceed as normal. - case errors.Is(err, ErrNoUniverseRoot): - // TODO(roasbeef): abstraction leak, error should be in - // universe package - - // If the local root matches the remote root, then we're done - // here. - case err == nil && mssmt.IsEqualNode(localRoot, remoteRoot): - log.Infof("Root for %v matches, no sync needed", - uniID.String()) + err = fn.ParSlice( + ctx, targetRoots, func(ctx context.Context, r BaseRoot) error { + return s.syncRoot(ctx, r, diffEngine, syncDiffs) + }, + ) + if err != nil { + return nil, err + } - return nil + // Finally, we'll collect all the diffs and return them to the caller. 
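	// Note that syncDiffs is buffered to len(targetRoots) and each
	// syncRoot call pushes at most one entry, so the parallel workers
	// above never block when reporting their result.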
+ return fn.Collect(syncDiffs), nil +} - case err != nil: - return fmt.Errorf("unable to fetch local root: %v", err) - } +// syncRoot attempts to sync the local Universe with the remote diff engine for +// a specific base root. +func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot BaseRoot, + diffEngine DiffEngine, result chan<- AssetSyncDiff) error { - log.Infof("UniverseRoot(%v) diverges, performing leaf diff...", + // First, we'll compare the remote root against the local root. + uniID := remoteRoot.ID + localRoot, err := s.cfg.LocalDiffEngine.RootNode(ctx, uniID) + switch { + // If we don't have this root, then we don't have anything to compare + // to, so we'll proceed as normal. + case errors.Is(err, ErrNoUniverseRoot): + // TODO(roasbeef): abstraction leak, error should be in + // universe package + + // If the local root matches the remote root, then we're done here. + case err == nil && mssmt.IsEqualNode(localRoot, remoteRoot): + log.Infof("Root for %v matches, no sync needed", uniID.String()) - // Otherwise, we'll need to perform a diff operation to find - // the set of keys we need to fetch. - remoteUnikeys, err := diffEngine.MintingKeys(ctx, uniID) - if err != nil { - return err - } - localUnikeys, err := s.cfg.LocalDiffEngine.MintingKeys( - ctx, uniID, + return nil + + case err != nil: + return fmt.Errorf("unable to fetch local root: %v", err) + } + + log.Infof("UniverseRoot(%v) diverges, performing leaf diff...", + uniID.String()) + + // Otherwise, we'll need to perform a diff operation to find the set of + // keys we need to fetch. + remoteUniKeys, err := diffEngine.MintingKeys(ctx, uniID) + if err != nil { + return err + } + localUniKeys, err := s.cfg.LocalDiffEngine.MintingKeys(ctx, uniID) + if err != nil { + return err + } + + // With the set of keys fetched, we can now find the set of keys that + // need to be synced. + keysToFetch := fn.SetDiff(remoteUniKeys, localUniKeys) + + log.Infof("UniverseRoot(%v): diff_size=%v", uniID.String(), + len(keysToFetch)) + log.Tracef("UniverseRoot(%v): diff_size=%v, diff=%v", uniID.String(), + len(keysToFetch), spew.Sdump(keysToFetch)) + + // Before we start fetching leaves, we already start our batch stream + // for the new leaves. This allows us to stream the new leaves to the + // local registrar as they're fetched. + var ( + fetchedLeaves = make(chan *IssuanceItem, len(keysToFetch)) + newLeafProofs []*MintingLeaf + batchSyncEG errgroup.Group + ) + + // We use an error group to simply the error handling of a goroutine. + batchSyncEG.Go(func() error { + newLeafProofs, err = s.batchStreamNewItems( + ctx, uniID, fetchedLeaves, len(keysToFetch), ) - if err != nil { - return err - } + return err + }) - // With the set of keys fetched, we can now find the set of - // keys that need to be synced. - keysToFetch := fn.SetDiff(remoteUnikeys, localUnikeys) - - log.Infof("UniverseRoot(%v): diff_size=%v", uniID.String(), - len(keysToFetch)) - log.Tracef("UniverseRoot(%v): diff_size=%v, diff=%v", - uniID.String(), len(keysToFetch), - spew.Sdump(keysToFetch)) - - // Now that we know where the divergence is, we can fetch the - // issuance proofs from the remote party. - newLeaves := make(chan *MintingLeaf, len(keysToFetch)) - err = fn.ParSlice(ctx, keysToFetch, func(ctx context.Context, key BaseKey) error { - newProof, err := diffEngine.FetchIssuanceProof(ctx, uniID, key) + // Now that we know where the divergence is, we can fetch the issuance + // proofs from the remote party. 
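	// Each fetched proof is verified against the advertised remote root
	// before it is queued on fetchedLeaves. The channel is buffered to
	// len(keysToFetch), so the fetchers never have to wait on the batch
	// inserter.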
+ err = fn.ParSlice( + ctx, keysToFetch, func(ctx context.Context, key BaseKey) error { + newProof, err := diffEngine.FetchIssuanceProof( + ctx, uniID, key, + ) if err != nil { return err } @@ -171,7 +218,12 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, // Now that we have this leaf proof, we want to ensure // that it's actually part of the remote root we were // given. - if !leafProof.VerifyRoot(remoteRoot) { + validRoot, err := leafProof.VerifyRoot(remoteRoot) + if err != nil { + return fmt.Errorf("unable to verify root: %w", + err) + } + if !validRoot { return fmt.Errorf("proof for key=%v is "+ "invalid", spew.Sdump(key)) } @@ -179,55 +231,92 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, // TODO(roasbeef): inclusion w/ root here, also that // it's the expected asset ID - log.Infof("UniverseRoot(%v): inserting new leaf", - uniID.String()) - log.Tracef("UniverseRoot(%v): inserting new leaf for "+ - "key=%v", uniID.String(), spew.Sdump(key)) - - // TODO(roasbeef): this is actually giving a lagging - // proof for each of them - _, err = s.cfg.LocalRegistrar.RegisterIssuance( - ctx, uniID, key, leafProof.Leaf, - ) - if err != nil { - return fmt.Errorf("unable to register "+ - "issuance proof: %w", err) + fetchedLeaves <- &IssuanceItem{ + ID: uniID, + Key: key, + Leaf: leafProof.Leaf, } - newLeaves <- leafProof.Leaf return nil - }) - if err != nil { - return err - } + }, + ) + if err != nil { + return err + } - log.Infof("Universe sync for UniverseRoot(%v) complete, %d "+ - "new leaves inserted", uniID.String(), len(keysToFetch)) + // And now we wait for the batch streamer to finish as well. + close(fetchedLeaves) + err = batchSyncEG.Wait() + if err != nil { + return err + } - // TODO(roabseef): sanity check local and remote roots match - // now? + log.Infof("Universe sync for UniverseRoot(%v) complete, %d "+ + "new leaves inserted", uniID.String(), len(keysToFetch)) - // To wrap up, we'll collect the set of leaves then convert - // them into a final sync diff. - syncDiffs <- AssetSyncDiff{ - OldUniverseRoot: localRoot, - NewUniverseRoot: remoteRoot, - NewLeafProofs: fn.Collect(newLeaves), - } + // TODO(roabseef): sanity check local and remote roots match now? - log.Infof("Sync for UniverseRoot(%v) complete!", uniID.String()) - log.Tracef("Sync for UniverseRoot(%v) complete! New "+ - "universe_root=%v", uniID.String(), - spew.Sdump(remoteRoot)) + // To wrap up, we'll collect the set of leaves then convert them into a + // final sync diff. + result <- AssetSyncDiff{ + OldUniverseRoot: localRoot, + NewUniverseRoot: remoteRoot, + NewLeafProofs: newLeafProofs, + } - return nil - }) + log.Infof("Sync for UniverseRoot(%v) complete!", uniID.String()) + log.Tracef("Sync for UniverseRoot(%v) complete! New "+ + "universe_root=%v", uniID.String(), spew.Sdump(remoteRoot)) + + return nil +} + +// batchStreamNewItems streams the set of new items to the local registrar in +// batches and returns the new leaf proofs. 
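// The fetchedLeaves channel must be closed by the caller before this function
// can return; leaves are handed to the local registrar in batches of at most
// SyncBatchSize via RegisterNewIssuanceBatch.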
+func (s *SimpleSyncer) batchStreamNewItems(ctx context.Context, + uniID Identifier, fetchedLeaves chan *IssuanceItem, + numTotal int) ([]*MintingLeaf, error) { + + var ( + numItems int + newLeafProofs []*MintingLeaf + ) + err := fn.CollectBatch( + ctx, fetchedLeaves, s.cfg.SyncBatchSize, + func(ctx context.Context, batch []*IssuanceItem) error { + numItems += len(batch) + log.Infof("UniverseRoot(%v): Inserting %d new leaves "+ + "(%d of %d)", uniID.String(), len(batch), + numItems, numTotal) + + err := s.cfg.LocalRegistrar.RegisterNewIssuanceBatch( + ctx, batch, + ) + if err != nil { + return fmt.Errorf("unable to register "+ + "issuance proofs: %w", err) + } + + log.Infof("UniverseRoot(%v): Inserted %d new leaves "+ + "(%d of %d)", uniID.String(), len(batch), + numItems, numTotal) + + newLeaves := fn.Map( + batch, func(i *IssuanceItem) *MintingLeaf { + return i.Leaf + }, + ) + newLeafProofs = append(newLeafProofs, newLeaves...) + + return nil + }, + ) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to register issuance proofs: %w", + err) } - // Finally, we'll collect all the diffs and return them to the caller. - return fn.Collect(syncDiffs), nil + return newLeafProofs, nil } // SyncUniverse attempts to synchronize the local universe with the remote