diff --git a/fn/func.go b/fn/func.go index 5f0fc3c7b..3f330e9b3 100644 --- a/fn/func.go +++ b/fn/func.go @@ -130,6 +130,18 @@ func All[T any](xs []T, pred func(T) bool) bool { return true } +// AllMapItems returns true if the passed predicate returns true for all items +// in the map. +func AllMapItems[T any, K comparable](xs map[K]T, pred func(T) bool) bool { + for i := range xs { + if !pred(xs[i]) { + return false + } + } + + return true +} + // Any returns true if the passed predicate returns true for any item in the // slice. func Any[T any](xs []T, pred func(T) bool) bool { @@ -142,12 +154,30 @@ func Any[T any](xs []T, pred func(T) bool) bool { return false } +// AnyMapItem returns true if the passed predicate returns true for any item in +// the map. +func AnyMapItem[T any, K comparable](xs map[K]T, pred func(T) bool) bool { + for i := range xs { + if pred(xs[i]) { + return true + } + } + + return false +} + // NotAny returns true if the passed predicate returns false for all items in // the slice. func NotAny[T any](xs []T, pred func(T) bool) bool { return !Any(xs, pred) } +// NotAnyMapItem returns true if the passed predicate returns false for all +// items in the map. +func NotAnyMapItem[T any, K comparable](xs map[K]T, pred func(T) bool) bool { + return !AnyMapItem(xs, pred) +} + // Count returns the number of items in the slice that match the predicate. func Count[T any](xs []T, pred func(T) bool) int { var count int @@ -161,6 +191,20 @@ func Count[T any](xs []T, pred func(T) bool) int { return count } +// CountMapItems returns the number of items in the map that match the +// predicate. +func CountMapItems[T any, K comparable](xs map[K]T, pred func(T) bool) int { + var count int + + for i := range xs { + if pred(xs[i]) { + count++ + } + } + + return count +} + // First returns the first item in the slice that matches the predicate, or an // error if none matches. func First[T any](xs []*T, pred func(*T) bool) (*T, error) { diff --git a/fn/iter.go b/fn/iter.go index 9e7c8dc23..c82d41639 100644 --- a/fn/iter.go +++ b/fn/iter.go @@ -7,10 +7,8 @@ package fn // This function can be used instead of the normal range loop to ensure that a // loop scoping bug isn't introduced. func ForEachErr[T any](s []T, f func(T) error) error { - for _, item := range s { - item := item - - if err := f(item); err != nil { + for i := range s { + if err := f(s[i]); err != nil { return err } } @@ -22,9 +20,17 @@ func ForEachErr[T any](s []T, f func(T) error) error { // This can be used to ensure that any normal for-loop don't run into bugs due // to loop variable scoping. func ForEach[T any](items []T, f func(T)) { - for _, item := range items { - item := item - f(item) + for i := range items { + f(items[i]) + } +} + +// ForEachMapItem is a generic implementation of a for-each (map with side +// effects). This can be used to ensure that any normal for-loop don't run into +// bugs due to loop variable scoping. +func ForEachMapItem[T any, K comparable](items map[K]T, f func(T)) { + for i := range items { + f(items[i]) } } @@ -38,6 +44,14 @@ func Enumerate[T any](items []T, f func(int, T)) { } } +// EnumerateMap is a generic enumeration function. The closure will be called +// for each key and item in the passed-in map. +func EnumerateMap[T any, K comparable](items map[K]T, f func(K, T)) { + for key := range items { + f(key, items[key]) + } +} + // MakeSlice is a generic function shorthand for making a slice out of a set // of elements. This can be used to avoid having to specify the type of the // slice as well as the types of the elements. diff --git a/itest/addrs_test.go b/itest/addrs_test.go index 657cafc64..b03ddfbc9 100644 --- a/itest/addrs_test.go +++ b/itest/addrs_test.go @@ -8,6 +8,7 @@ import ( tap "github.com/lightninglabs/taproot-assets" "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/internal/test" + "github.com/lightninglabs/taproot-assets/proof" "github.com/lightninglabs/taproot-assets/tappsbt" "github.com/lightninglabs/taproot-assets/taprpc" wrpc "github.com/lightninglabs/taproot-assets/taprpc/assetwalletrpc" @@ -512,6 +513,8 @@ func runMultiSendTest(ctxt context.Context, t *harnessTest, alice, require.NoError(t.t, err) } +// sendProof manually exports a proof from the given source node and imports it +// using the development only ImportProof RPC on the destination node. func sendProof(t *harnessTest, src, dst *tapdHarness, scriptKey []byte, genInfo *taprpc.GenesisInfo) *tapdevrpc.ImportProofResponse { @@ -543,6 +546,85 @@ func sendProof(t *harnessTest, src, dst *tapdHarness, scriptKey []byte, return importResp } +// sendProofUniRPC manually exports a proof from the given source node and +// imports it using the universe related InsertProof RPC on the destination +// node. +func sendProofUniRPC(t *harnessTest, src, dst *tapdHarness, scriptKey []byte, + genInfo *taprpc.GenesisInfo) *unirpc.AssetProofResponse { + + ctxb := context.Background() + + var proofResp *taprpc.ProofFile + waitErr := wait.NoError(func() error { + resp, err := src.ExportProof(ctxb, &taprpc.ExportProofRequest{ + AssetId: genInfo.AssetId, + ScriptKey: scriptKey, + }) + if err != nil { + return err + } + + proofResp = resp + return nil + }, defaultWaitTimeout) + require.NoError(t.t, waitErr) + + t.Logf("Importing proof %x using InsertProof", proofResp.RawProofFile) + + f := proof.File{} + err := f.Decode(bytes.NewReader(proofResp.RawProofFile)) + require.NoError(t.t, err) + + lastProof, err := f.LastProof() + require.NoError(t.t, err) + + var lastProofBytes bytes.Buffer + err = lastProof.Encode(&lastProofBytes) + require.NoError(t.t, err) + asset := lastProof.Asset + + proofType := universe.ProofTypeTransfer + if asset.IsGenesisAsset() { + proofType = universe.ProofTypeIssuance + } + + uniID := universe.Identifier{ + AssetID: asset.ID(), + ProofType: proofType, + } + if asset.GroupKey != nil { + uniID.GroupKey = &asset.GroupKey.GroupPubKey + } + + rpcUniID, err := tap.MarshalUniID(uniID) + require.NoError(t.t, err) + + outpoint := &unirpc.Outpoint{ + HashStr: lastProof.AnchorTx.TxHash().String(), + Index: int32(lastProof.InclusionProof.OutputIndex), + } + + importResp, err := dst.InsertProof(ctxb, &unirpc.AssetProof{ + Key: &unirpc.UniverseKey{ + Id: rpcUniID, + LeafKey: &unirpc.AssetKey{ + Outpoint: &unirpc.AssetKey_Op{ + Op: outpoint, + }, + ScriptKey: &unirpc.AssetKey_ScriptKeyBytes{ + ScriptKeyBytes: scriptKey, + }, + }, + }, + AssetLeaf: &unirpc.AssetLeaf{ + Proof: lastProofBytes.Bytes(), + }, + }) + require.NoError(t.t, err) + + return importResp +} + // sendAssetsToAddr spends the given input asset and sends the amount specified // in the address to the Taproot output derived from the address. func sendAssetsToAddr(t *harnessTest, sender *tapdHarness, diff --git a/itest/send_test.go b/itest/send_test.go index fea93bbd3..a0fa47b8e 100644 --- a/itest/send_test.go +++ b/itest/send_test.go @@ -1374,6 +1374,67 @@ func testSendMultipleCoins(t *harnessTest) { AssertNonInteractiveRecvComplete(t.t, secondTapd, 5) } +// testSendNoCourierUniverseImport tests that we can send assets to a node that +// has no courier, and then manually transfer the proof to the receiving using +// the universe proof import RPC method. +func testSendNoCourierUniverseImport(t *harnessTest) { + ctxb := context.Background() + + // First, we'll make a normal assets with enough units. + rpcAssets := MintAssetsConfirmBatch( + t.t, t.lndHarness.Miner.Client, t.tapd, + []*mintrpc.MintAssetRequest{simpleAssets[0]}, + ) + + firstAsset := rpcAssets[0] + genInfo := firstAsset.AssetGenesis + + // Now that we have the asset created, we'll make a new node that'll + // serve as the node which'll receive the assets. We turn off the proof + // courier by supplying a dummy implementation. + secondTapd := setupTapdHarness( + t.t, t, t.lndHarness.Bob, t.universeServer, + func(params *tapdHarnessParams) { + params.proofCourier = &proof.MockProofCourier{} + }, + ) + defer func() { + require.NoError(t.t, secondTapd.stop(!*noDelete)) + }() + + // Next, we'll attempt to transfer some amount of assets[0] to the + // receiving node. + numUnitsSend := uint64(1200) + + // Get a new address (which accepts the first asset) from the + // receiving node. + receiveAddr, err := secondTapd.NewAddr(ctxb, &taprpc.NewAddrRequest{ + AssetId: genInfo.AssetId, + Amt: numUnitsSend, + }) + require.NoError(t.t, err) + AssertAddrCreated(t.t, secondTapd, firstAsset, receiveAddr) + + // Send the assets to the receiving node. + sendResp := sendAssetsToAddr(t, t.tapd, receiveAddr) + + // Assert that the outbound transfer was confirmed. + expectedAmtAfterSend := firstAsset.Amount - numUnitsSend + ConfirmAndAssertOutboundTransfer( + t.t, t.lndHarness.Miner.Client, t.tapd, sendResp, + genInfo.AssetId, + []uint64{expectedAmtAfterSend, numUnitsSend}, 0, 1, + ) + + // Since we disabled proof couriers, we need to manually transfer the + // proof from the sender to the receiver now. We use the universe RPC + // InsertProof method to do this. + sendProofUniRPC(t, t.tapd, secondTapd, receiveAddr.ScriptKey, genInfo) + + // And now, the transfer should be completed on the receiver side too. + AssertNonInteractiveRecvComplete(t.t, secondTapd, 1) +} + // addProofTestVectorFromFile adds a proof test vector by extracting it from the // proof file found at the given asset ID and script key. func addProofTestVectorFromFile(t *testing.T, testName string, diff --git a/itest/test_list_on_test.go b/itest/test_list_on_test.go index 316623a7b..7ff7ca92f 100644 --- a/itest/test_list_on_test.go +++ b/itest/test_list_on_test.go @@ -76,6 +76,10 @@ var testCases = []*testCase{ test: testOfflineReceiverEventuallyReceives, proofCourierType: proof.HashmailCourierType, }, + { + name: "addr send no proof courier with local universe import", + test: testSendNoCourierUniverseImport, + }, { name: "basic send passive asset", test: testBasicSendPassiveAsset, @@ -179,6 +183,10 @@ var testCases = []*testCase{ name: "universe sync", test: testUniverseSync, }, + { + name: "universe sync manual insert", + test: testUniverseManualSync, + }, { name: "universe federation", test: testUniverseFederation, diff --git a/itest/universe_test.go b/itest/universe_test.go index 57a7b509e..4fbae3a34 100644 --- a/itest/universe_test.go +++ b/itest/universe_test.go @@ -292,6 +292,88 @@ func testUniverseSync(t *harnessTest) { ) } +// testUniverseManualSync tests that we're able to insert proofs manually into +// a universe instead of using a full sync. +func testUniverseManualSync(t *harnessTest) { + miner := t.lndHarness.Miner.Client + + // First, we'll create out usual set of issuable assets. + rpcIssuableAssets := MintAssetsConfirmBatch( + t.t, miner, t.tapd, issuableAssets, + ) + + // With those assets created, we'll now create a new node that we'll + // use to exercise the manual Universe sync. + bob := setupTapdHarness( + t.t, t, t.lndHarness.Bob, t.universeServer, + func(params *tapdHarnessParams) { + params.noDefaultUniverseSync = true + }, + ) + defer func() { + require.NoError(t.t, bob.stop(!*noDelete)) + }() + + ctxb := context.Background() + ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout) + defer cancel() + + // We now side load the issuance proof of our first asset into Bob's + // universe. + firstAsset := rpcIssuableAssets[0] + firstAssetGen := firstAsset.AssetGenesis + sendProofUniRPC(t, t.tapd, bob, firstAsset.ScriptKey, firstAssetGen) + + // We should also be able to fetch an asset from Bob's Universe, and + // query for that asset with the compressed script key. + firstOutpoint, err := tap.UnmarshalOutpoint( + firstAsset.ChainAnchor.AnchorOutpoint, + ) + require.NoError(t.t, err) + + firstAssetProofQuery := unirpc.UniverseKey{ + Id: &unirpc.ID{ + Id: &unirpc.ID_GroupKey{ + GroupKey: firstAsset.AssetGroup.TweakedGroupKey, + }, + ProofType: unirpc.ProofType_PROOF_TYPE_ISSUANCE, + }, + LeafKey: &unirpc.AssetKey{ + Outpoint: &unirpc.AssetKey_Op{ + Op: &unirpc.Outpoint{ + HashStr: firstOutpoint.Hash.String(), + Index: int32(firstOutpoint.Index), + }, + }, + ScriptKey: &unirpc.AssetKey_ScriptKeyBytes{ + ScriptKeyBytes: firstAsset.ScriptKey, + }, + }, + } + + // We should now be able to query for the asset proof. + _, err = bob.QueryProof(ctxt, &firstAssetProofQuery) + require.NoError(t.t, err) + + // We should now also be able to fetch the meta data and group key for + // the asset. + metaData, err := bob.FetchAssetMeta(ctxt, &taprpc.FetchAssetMetaRequest{ + Asset: &taprpc.FetchAssetMetaRequest_MetaHash{ + MetaHash: firstAssetGen.MetaHash, + }, + }) + require.NoError(t.t, err) + require.Equal(t.t, firstAssetGen.MetaHash, metaData.MetaHash) + + // We should be able to create a new address for the asset, since that + // requires us to know the full genesis and group key. + _, err = bob.NewAddr(ctxt, &taprpc.NewAddrRequest{ + AssetId: firstAssetGen.AssetId, + Amt: 500, + }) + require.NoError(t.t, err) +} + // unmarshalMerkleSumNode un-marshals a protobuf MerkleSumNode. func unmarshalMerkleSumNode(root *unirpc.MerkleSumNode) mssmt.Node { var nodeHash mssmt.NodeHash diff --git a/proof/archive.go b/proof/archive.go index 9dd38f472..9530a616f 100644 --- a/proof/archive.go +++ b/proof/archive.go @@ -107,6 +107,11 @@ type Archiver interface { // returned. FetchProof(ctx context.Context, id Locator) (Blob, error) + // HasProof returns true if the proof for the given locator exists. This + // is intended to be a performance optimized lookup compared to fetching + // a proof and checking for ErrProofNotFound. + HasProof(ctx context.Context, id Locator) (bool, error) + // FetchProofs fetches all proofs for assets uniquely identified by the // passed asset ID. FetchProofs(ctx context.Context, id asset.ID) ([]*AnnotatedProof, error) @@ -125,11 +130,103 @@ type Archiver interface { // NotifyArchiver is an Archiver that also allows callers to subscribe to // notifications about new proofs being added to the archiver. type NotifyArchiver interface { - Archiver + // FetchProof fetches a proof for an asset uniquely identified by the + // passed Identifier. The returned blob is expected to be the encoded + // full proof file, containing the complete provenance of the asset. + // + // If a proof cannot be found, then ErrProofNotFound should be returned. + FetchProof(ctx context.Context, id Locator) (Blob, error) fn.EventPublisher[Blob, []*Locator] } +// MultiArchiveNotifier is a NotifyArchiver that wraps several other archives +// and notifies subscribers about new proofs that are added to any of the +// archives. +type MultiArchiveNotifier struct { + archives []NotifyArchiver +} + +// NewMultiArchiveNotifier creates a new MultiArchiveNotifier based on the set +// of specified backends. +func NewMultiArchiveNotifier(archives ...NotifyArchiver) *MultiArchiveNotifier { + return &MultiArchiveNotifier{ + archives: archives, + } +} + +// FetchProof fetches a proof for an asset uniquely identified by the passed +// Identifier. The returned proof can either be a full proof file or just a +// single proof. +// +// If a proof cannot be found, then ErrProofNotFound should be returned. +// +// NOTE: This is part of the NotifyArchiver interface. +func (m *MultiArchiveNotifier) FetchProof(ctx context.Context, + id Locator) (Blob, error) { + + for idx := range m.archives { + a := m.archives[idx] + + proofBlob, err := a.FetchProof(ctx, id) + if errors.Is(err, ErrProofNotFound) { + // Try the next archive. + continue + } else if err != nil { + return nil, fmt.Errorf("error fetching proof "+ + "from archive: %w", err) + } + + return proofBlob, nil + } + + return nil, ErrProofNotFound +} + +// RegisterSubscriber adds a new subscriber for receiving events. The +// registration request is forwarded to all registered archives. +func (m *MultiArchiveNotifier) RegisterSubscriber( + receiver *fn.EventReceiver[Blob], deliverExisting bool, + deliverFrom []*Locator) error { + + for idx := range m.archives { + a := m.archives[idx] + + err := a.RegisterSubscriber( + receiver, deliverExisting, deliverFrom, + ) + if err != nil { + return fmt.Errorf("error registering subscriber: %w", + err) + } + } + + return nil +} + +// RemoveSubscriber removes the given subscriber and also stops it from +// processing events. The removal request is forwarded to all registered +// archives. +func (m *MultiArchiveNotifier) RemoveSubscriber( + subscriber *fn.EventReceiver[Blob]) error { + + for idx := range m.archives { + a := m.archives[idx] + + err := a.RemoveSubscriber(subscriber) + if err != nil { + return fmt.Errorf("error removing subscriber: "+ + "%w", err) + } + } + + return nil +} + +// A compile-time interface to ensure MultiArchiveNotifier meets the +// NotifyArchiver interface. +var _ NotifyArchiver = (*MultiArchiveNotifier)(nil) + // FileArchiver implements proof Archiver backed by an on-disk file system. The // archiver takes a single root directory then creates the following overlap // mapping: @@ -216,6 +313,22 @@ func (f *FileArchiver) FetchProof(_ context.Context, id Locator) (Blob, error) { return proofFile, nil } +// HasProof returns true if the proof for the given locator exists. This is +// intended to be a performance optimized lookup compared to fetching a proof +// and checking for ErrProofNotFound. +func (f *FileArchiver) HasProof(_ context.Context, id Locator) (bool, error) { + // All our on-disk storage is based on asset IDs, so to look up a path, + // we just need to compute the full file path and see if it exists on + // disk. + proofPath, err := genProofFilePath(f.proofPath, id) + if err != nil { + return false, fmt.Errorf("unable to make proof file path: %w", + err) + } + + return lnrpc.FileExists(proofPath), nil +} + // FetchProofs fetches all proofs for assets uniquely identified by the passed // asset ID. func (f *FileArchiver) FetchProofs(_ context.Context, @@ -407,6 +520,27 @@ func (m *MultiArchiver) FetchProof(ctx context.Context, return nil, ErrProofNotFound } +// HasProof returns true if the proof for the given locator exists. This is +// intended to be a performance optimized lookup compared to fetching a proof +// and checking for ErrProofNotFound. The multi archiver only considers a proof +// to be present if all backends have it. +func (m *MultiArchiver) HasProof(ctx context.Context, id Locator) (bool, error) { + for _, archive := range m.backends { + ok, err := archive.HasProof(ctx, id) + if err != nil { + return false, err + } + + // We are expecting all backends to have the proof, otherwise we + // consider the proof not to be found. + if !ok { + return false, nil + } + } + + return true, nil +} + // FetchProofs fetches all proofs for assets uniquely identified by the passed // asset ID. func (m *MultiArchiver) FetchProofs(ctx context.Context, diff --git a/proof/courier.go b/proof/courier.go index 36a9f6a03..b31517c7d 100644 --- a/proof/courier.go +++ b/proof/courier.go @@ -1137,36 +1137,24 @@ func (c *UniverseRpcCourier) DeliverProof(ctx context.Context, func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context, originLocator Locator) (*AnnotatedProof, error) { - // In order to reconstruct the proof file we must collect all the - // transition proofs that make up the main chain of proofs. That is - // accomplished by iterating backwards through the main chain of proofs - // until we reach the genesis point (minting proof). - - // We will update the locator at each iteration. - loc := originLocator - - // revProofs is a slice of transition proofs ordered from latest to - // earliest (the issuance proof comes last in the slice). This ordering - // is a reversal of that found in the proof file. - var revProofs []Proof - - for { - assetID := *loc.AssetID - + fetchProof := func(ctx context.Context, loc Locator) (Blob, error) { var groupKeyBytes []byte if loc.GroupKey != nil { groupKeyBytes = loc.GroupKey.SerializeCompressed() } - universeID := unirpc.MarshalUniverseID( - assetID[:], groupKeyBytes, - ) - assetKey := unirpc.MarshalAssetKey( - *loc.OutPoint, &loc.ScriptKey, - ) + if loc.OutPoint == nil { + return nil, fmt.Errorf("proof locator for asset %x "+ + "is missing outpoint", loc.AssetID[:]) + } + universeKey := unirpc.UniverseKey{ - Id: universeID, - LeafKey: assetKey, + Id: unirpc.MarshalUniverseID( + loc.AssetID[:], groupKeyBytes, + ), + LeafKey: unirpc.MarshalAssetKey( + *loc.OutPoint, &loc.ScriptKey, + ), } // Setup proof receive/query routine and start backoff @@ -1197,50 +1185,13 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context, "attempt has failed: %w", err) } - // Decode transition proof from query response. - var transitionProof Proof - if err := transitionProof.Decode( - bytes.NewReader(proofBlob), - ); err != nil { - return nil, err - } - - revProofs = append(revProofs, transitionProof) - - // Break if we've reached the genesis point (the asset is the - // genesis asset). - proofAsset := transitionProof.Asset - if proofAsset.IsGenesisAsset() { - break - } - - // Update locator with principal input to the current outpoint. - prevID, err := transitionProof.Asset.PrimaryPrevID() - if err != nil { - return nil, err - } - - // Parse script key public key. - scriptKeyPubKey, err := btcec.ParsePubKey(prevID.ScriptKey[:]) - if err != nil { - return nil, fmt.Errorf("failed to parse script key "+ - "public key from Proof.PrevID: %w", err) - } - loc.ScriptKey = *scriptKeyPubKey - - loc.AssetID = &prevID.ID - loc.OutPoint = &prevID.OutPoint + return proofBlob, nil } - // Append proofs to proof file in reverse order to their collected - // order. - proofFile := &File{} - for i := len(revProofs) - 1; i >= 0; i-- { - err := proofFile.AppendProof(revProofs[i]) - if err != nil { - return nil, fmt.Errorf("error appending proof to "+ - "proof file: %w", err) - } + proofFile, err := FetchProofProvenance(ctx, originLocator, fetchProof) + if err != nil { + return nil, fmt.Errorf("error fetching proof provenance: %w", + err) } // Encode the full proof file. @@ -1315,3 +1266,83 @@ type TransferLog interface { QueryProofTransferLog(context.Context, Locator, TransferType) ([]time.Time, error) } + +// FetchProofProvenance iterates backwards through the main chain of proofs +// until it reaches the genesis point (the asset is the genesis asset) and then +// returns the full proof file with the full provenance for the asset. +func FetchProofProvenance(ctx context.Context, originLocator Locator, + fetchSingleProof func(context.Context, Locator) (Blob, error)) (*File, + error) { + + // In order to reconstruct the proof file we must collect all the + // transition proofs that make up the main chain of proofs. That is + // accomplished by iterating backwards through the main chain of proofs + // until we reach the genesis point (minting proof). + + // We will update the locator at each iteration. + currentLocator := originLocator + + // reversedProofs is a slice of transition proofs ordered from latest to + // earliest (the issuance proof comes last in the slice). This ordering + // is a reversal of that found in the proof file. + var reversedProofs []Blob + for { + // Setup proof receive/query routine and start backoff + // procedure. + proofBlob, err := fetchSingleProof(ctx, currentLocator) + if err != nil { + return nil, fmt.Errorf("fetching single proof "+ + "failed: %w", err) + } + + // Decode just the asset leaf record from the proof. + var proofAsset asset.Asset + assetRecord := AssetLeafRecord(&proofAsset) + err = SparseDecode(bytes.NewReader(proofBlob), assetRecord) + if err != nil { + return nil, fmt.Errorf("unable to decode proof: %w", + err) + } + + reversedProofs = append(reversedProofs, proofBlob) + + // Break if we've reached the genesis point (the asset is the + // genesis asset). + if proofAsset.IsGenesisAsset() { + break + } + + // Update locator with principal input to the current outpoint. + prevID, err := proofAsset.PrimaryPrevID() + if err != nil { + return nil, err + } + + // Parse script key public key. + scriptKeyPubKey, err := btcec.ParsePubKey(prevID.ScriptKey[:]) + if err != nil { + return nil, fmt.Errorf("failed to parse script key "+ + "public key from Proof.PrevID: %w", err) + } + + currentLocator = Locator{ + AssetID: &prevID.ID, + GroupKey: originLocator.GroupKey, + ScriptKey: *scriptKeyPubKey, + OutPoint: &prevID.OutPoint, + } + } + + // Append proofs to proof file in reverse order to their collected + // order. + proofFile := &File{} + for i := len(reversedProofs) - 1; i >= 0; i-- { + err := proofFile.AppendProofRaw(reversedProofs[i]) + if err != nil { + return nil, fmt.Errorf("error appending proof to "+ + "proof file: %w", err) + } + } + + return proofFile, nil +} diff --git a/proof/file.go b/proof/file.go index 3e53e9316..e3c0998d3 100644 --- a/proof/file.go +++ b/proof/file.go @@ -393,6 +393,26 @@ func (f *File) AppendProof(proof Proof) error { return nil } +// AppendProofRaw appends a raw proof to the file and calculates its chained +// hash. +func (f *File) AppendProofRaw(proof []byte) error { + if f.IsUnknownVersion() { + return ErrUnknownVersion + } + + var prevHash [sha256.Size]byte + if !f.IsEmpty() { + prevHash = f.proofs[len(f.proofs)-1].hash + } + + f.proofs = append(f.proofs, &hashedProof{ + proofBytes: proof, + hash: hashProof(proof, prevHash), + }) + + return nil +} + // ReplaceLastProof attempts to replace the last proof in the file with another // one, updating its chained hash in the process. func (f *File) ReplaceLastProof(proof Proof) error { diff --git a/proof/mint.go b/proof/mint.go index 8c73372e2..737ff5014 100644 --- a/proof/mint.go +++ b/proof/mint.go @@ -11,9 +11,86 @@ import ( "github.com/lightninglabs/taproot-assets/commitment" ) -// Blob represents a serialized proof file, including the checksum. +// Blob either represents a serialized proof file, including the checksum or a +// single serialized issuance/transition proof. Which one it is can be found out +// from the leading magic bytes (or the helper methods that inspect those). type Blob []byte +// IsFile returns true if the blob is a serialized proof file. +func (b Blob) IsFile() bool { + return IsProofFile(b) +} + +// IsSingleProof returns true if the blob is a serialized single proof. +func (b Blob) IsSingleProof() bool { + return IsSingleProof(b) +} + +// AsFile returns the blob as a parsed file. If the blob is a single proof, it +// will be parsed as a file with a single proof. +func (b Blob) AsFile() (*File, error) { + switch { + // We have a full file, we can just parse it and return it. + case b.IsFile(): + file := NewEmptyFile(V0) + if err := file.Decode(bytes.NewReader(b)); err != nil { + return nil, fmt.Errorf("error decoding proof file: %w", + err) + } + + return file, nil + + // We have a single proof, so let's parse it and return it directly, + // assuming it is the most recent proof the caller is interested in. + case b.IsSingleProof(): + p := Proof{} + if err := p.Decode(bytes.NewReader(b)); err != nil { + return nil, fmt.Errorf("error decoding single proof: "+ + "%w", err) + } + + file, err := NewFile(V0, p) + if err != nil { + return nil, err + } + + return file, nil + + default: + return nil, fmt.Errorf("unknown proof blob type") + } +} + +// AsSingleProof returns the blob as a parsed single proof. If the blob is a +// full proof file, the parsed last proof of that file will be returned. +func (b Blob) AsSingleProof() (*Proof, error) { + switch { + // We have a full file, we can just parse it and return it. + case b.IsFile(): + file := NewEmptyFile(V0) + if err := file.Decode(bytes.NewReader(b)); err != nil { + return nil, fmt.Errorf("error decoding proof file: %w", + err) + } + + return file.LastProof() + + // We have a single proof, so let's parse it and return it directly, + // assuming it is the most recent proof the caller is interested in. + case b.IsSingleProof(): + p := Proof{} + if err := p.Decode(bytes.NewReader(b)); err != nil { + return nil, fmt.Errorf("error decoding single proof: "+ + "%w", err) + } + + return &p, nil + + default: + return nil, fmt.Errorf("unknown proof blob type") + } +} + // AssetBlobs is a data structure used to pass around the proof files for a // set of assets which may have been created in the same batched transaction. // This maps the script key of the asset to the serialized proof file blob. diff --git a/tapcfg/server.go b/tapcfg/server.go index 1020d2e24..862de4653 100644 --- a/tapcfg/server.go +++ b/tapcfg/server.go @@ -327,6 +327,8 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, TransferLog: assetStore, }) + multiNotifier := proof.NewMultiArchiveNotifier(assetStore, multiverse) + return &tap.Config{ DebugLevel: cfg.DebugLevel, RuntimeID: runtimeID, @@ -361,7 +363,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, ), AddrBook: addrBook, ProofArchive: proofArchive, - ProofNotifier: assetStore, + ProofNotifier: multiNotifier, ErrChan: mainErrChan, ProofCourierDispatcher: proofCourierDispatcher, ProofRetrievalDelay: cfg.CustodianProofRetrievalDelay, ProofWatcher: reOrgWatcher, diff --git a/tapdb/assets_store.go b/tapdb/assets_store.go index 8621f992e..857347349 100644 --- a/tapdb/assets_store.go +++ b/tapdb/assets_store.go @@ -175,6 +175,10 @@ type ActiveAssetsStore interface { FetchAssetProof(ctx context.Context, scriptKey []byte) (AssetProofI, error) + // HasAssetProof returns true if we have proof for a given asset + // identified by its script key. + HasAssetProof(ctx context.Context, scriptKey []byte) (bool, error) + // FetchAssetProofsByAssetID fetches all asset proofs for a given asset // ID. FetchAssetProofsByAssetID(ctx context.Context, @@ -1238,6 +1242,38 @@ func (a *AssetStore) FetchProof(ctx context.Context, return diskProof, nil } +// HasProof returns true if the proof for the given locator exists. This is +// intended to be a performance optimized lookup compared to fetching a proof +// and checking for ErrProofNotFound. +func (a *AssetStore) HasProof(ctx context.Context, locator proof.Locator) (bool, + error) { + + // We don't need anything else but the script key since we have an + // on-disk index for all proofs we store. + var ( + scriptKey = locator.ScriptKey + readOpts = NewAssetStoreReadTx() + haveProof bool + ) + dbErr := a.db.ExecTx(ctx, &readOpts, func(q ActiveAssetsStore) error { + proofAvailable, err := q.HasAssetProof( + ctx, scriptKey.SerializeCompressed(), + ) + if err != nil { + return fmt.Errorf("unable to find out if we have "+ + "asset proof: %w", err) + } + + haveProof = proofAvailable + return nil + }) + if dbErr != nil { + return false, dbErr + } + + return haveProof, nil +} + // FetchProofs fetches all proofs for assets uniquely identified by the passed // asset ID. // diff --git a/tapdb/multiverse.go b/tapdb/multiverse.go index e519183f6..39af6b364 100644 --- a/tapdb/multiverse.go +++ b/tapdb/multiverse.go @@ -1,6 +1,7 @@ package tapdb import ( + "bytes" "context" "crypto/sha256" "database/sql" @@ -506,15 +507,23 @@ type MultiverseStore struct { proofCache *proofCache leafKeysCache *universeLeafCache + + // transferProofDistributor is an event distributor that will be used to + // notify subscribers about new proof leaves that are added to the + // multiverse. This is used to notify the custodian about new incoming + // proofs. And since the custodian is only interested in transfer + // proofs, we only signal on transfer proofs. + transferProofDistributor *fn.EventDistributor[proof.Blob] } // NewMultiverseStore creates a new multiverse DB store handle. func NewMultiverseStore(db BatchedMultiverse) *MultiverseStore { return &MultiverseStore{ - db: db, - rootNodeCache: newRootNodeCache(), - proofCache: newProofCache(), - leafKeysCache: newUniverseLeafCache(), + db: db, + rootNodeCache: newRootNodeCache(), + proofCache: newProofCache(), + leafKeysCache: newUniverseLeafCache(), + transferProofDistributor: fn.NewEventDistributor[proof.Blob](), } } @@ -893,6 +902,78 @@ func (b *MultiverseStore) FetchProofLeaf(ctx context.Context, return proofs, nil } +// FetchProof fetches a proof for an asset uniquely identified by the passed +// Locator. The returned blob contains the encoded full proof file, representing +// the complete provenance of the asset. +// +// If a proof cannot be found, then ErrProofNotFound is returned. +// +// NOTE: This is part of the proof.NotifyArchiver interface. +func (b *MultiverseStore) FetchProof(ctx context.Context, + originLocator proof.Locator) (proof.Blob, error) { + + // The universe only delivers a single proof at a time, so we need a + // callback that we can feed into proof.FetchProofProvenance to assemble + // the full proof file. + fetchProof := func(ctx context.Context, loc proof.Locator) (proof.Blob, + error) { + + uniID := universe.Identifier{ + AssetID: *loc.AssetID, + GroupKey: loc.GroupKey, + ProofType: universe.ProofTypeTransfer, + } + scriptKey := asset.NewScriptKey(&loc.ScriptKey) + leafKey := universe.LeafKey{ + ScriptKey: &scriptKey, + } + if loc.OutPoint != nil { + leafKey.OutPoint = *loc.OutPoint + } + + proofs, err := b.FetchProofLeaf(ctx, uniID, leafKey) + if errors.Is(err, universe.ErrNoUniverseProofFound) { + // If we didn't find a proof, maybe we arrived at the + // issuance proof, in which case we need to adjust the + // proof type. + uniID.ProofType = universe.ProofTypeIssuance + proofs, err = b.FetchProofLeaf(ctx, uniID, leafKey) + + // If we still didn't find a proof, then we'll return + // the proof not found error, but the one from the proof + // package, not the universe package, as the Godoc for + // this method in the proof.NotifyArchiver states. + if errors.Is(err, universe.ErrNoUniverseProofFound) { + return nil, proof.ErrProofNotFound + } + } + if err != nil { + return nil, fmt.Errorf("error fetching proof from "+ + "archive: %w", err) + } + + if len(proofs) > 1 { + return nil, fmt.Errorf("expected only one proof, "+ + "got %d", len(proofs)) + } + + return proofs[0].Leaf.RawProof, nil + } + + file, err := proof.FetchProofProvenance(ctx, originLocator, fetchProof) + if err != nil { + return nil, fmt.Errorf("error fetching proof from archive: %w", + err) + } + + var buf bytes.Buffer + if err := file.Encode(&buf); err != nil { + return nil, fmt.Errorf("error encoding proof file: %w", err) + } + + return buf.Bytes(), nil +} + // UpsertProofLeaf upserts a proof leaf within the multiverse tree and the // universe tree that corresponds to the given key. func (b *MultiverseStore) UpsertProofLeaf(ctx context.Context, @@ -930,6 +1011,14 @@ func (b *MultiverseStore) UpsertProofLeaf(ctx context.Context, b.proofCache.delProofsForAsset(id) b.leafKeysCache.wipeCache(idStr) + // Notify subscribers about the new proof leaf, now that we're sure we + // have written it to the database. But we only care about transfer + // proofs, as the events are received by the custodian to finalize + // inbound transfers. + if id.ProofType == universe.ProofTypeTransfer { + b.transferProofDistributor.NotifySubscribers(leaf.RawProof) + } + return issuanceProof, nil } @@ -976,6 +1065,18 @@ func (b *MultiverseStore) UpsertProofLeafBatch(ctx context.Context, b.rootNodeCache.wipeCache() + // Notify subscribers about the new proof leaves, now that we're sure we + // have written them to the database. But we only care about transfer + // proofs, as the events are received by the custodian to finalize + // inbound transfers. + for idx := range items { + if items[idx].ID.ProofType == universe.ProofTypeTransfer { + b.transferProofDistributor.NotifySubscribers( + items[idx].Leaf.RawProof, + ) + } + } + // Invalidate the root node cache for all the assets we just inserted. idsToDelete := fn.NewSet(fn.Map(items, func(item *universe.Item) treeID { return treeID(item.ID.String()) @@ -1109,3 +1210,68 @@ func (b *MultiverseStore) FetchLeaves(ctx context.Context, return leaves, nil } + +// RegisterSubscriber adds a new subscriber for receiving events. The +// deliverExisting boolean indicates whether already existing items should be +// sent to the NewItemCreated channel when the subscription is started. An +// optional deliverFrom can be specified to indicate from which timestamp/index/ +// marker onward existing items should be delivered on startup. If deliverFrom +// is nil/zero/empty then all existing items will be delivered. +func (b *MultiverseStore) RegisterSubscriber( + receiver *fn.EventReceiver[proof.Blob], deliverExisting bool, + deliverFrom []*proof.Locator) error { + + b.transferProofDistributor.RegisterSubscriber(receiver) + + // No delivery of existing items requested, we're done here. + if !deliverExisting { + return nil + } + + ctx := context.Background() + for _, loc := range deliverFrom { + if loc.AssetID == nil { + return fmt.Errorf("missing asset ID") + } + + id := universe.Identifier{ + AssetID: *loc.AssetID, + GroupKey: loc.GroupKey, + ProofType: universe.ProofTypeTransfer, + } + scriptKey := asset.NewScriptKey(&loc.ScriptKey) + key := universe.LeafKey{ + ScriptKey: &scriptKey, + } + + if loc.OutPoint != nil { + key.OutPoint = *loc.OutPoint + } + + leaves, err := b.FetchProofLeaf(ctx, id, key) + if err != nil { + return err + } + + // Deliver the found leaves to the new item queue of the + // subscriber. + for idx := range leaves { + rawProof := leaves[idx].Leaf.RawProof + receiver.NewItemCreated.ChanIn() <- rawProof + } + } + + return nil +} + +// RemoveSubscriber removes the given subscriber and also stops it from +// processing events. +func (b *MultiverseStore) RemoveSubscriber( + subscriber *fn.EventReceiver[proof.Blob]) error { + + return b.transferProofDistributor.RemoveSubscriber(subscriber) +} + +// A compile-time interface to ensure MultiverseStore meets the +// proof.NotifyArchiver interface. +var _ proof.NotifyArchiver = (*MultiverseStore)(nil) diff --git a/tapdb/sqlc/assets.sql.go b/tapdb/sqlc/assets.sql.go index 5a30c91ea..a89066174 100644 --- a/tapdb/sqlc/assets.sql.go +++ b/tapdb/sqlc/assets.sql.go @@ -1598,6 +1598,27 @@ func (q *Queries) GenesisPoints(ctx context.Context) ([]GenesisPoint, error) { return items, nil } +const hasAssetProof = `-- name: HasAssetProof :one +WITH asset_info AS ( + SELECT assets.asset_id + FROM assets + JOIN script_keys + ON assets.script_key_id = script_keys.script_key_id + WHERE script_keys.tweaked_script_key = $1 +) +SELECT COUNT(asset_info.asset_id) > 0 as has_proof +FROM asset_proofs +JOIN asset_info + ON asset_info.asset_id = asset_proofs.asset_id +` + +func (q *Queries) HasAssetProof(ctx context.Context, tweakedScriptKey []byte) (bool, error) { + row := q.db.QueryRowContext(ctx, hasAssetProof, tweakedScriptKey) + var has_proof bool + err := row.Scan(&has_proof) + return has_proof, err +} + const insertAssetSeedling = `-- name: InsertAssetSeedling :exec INSERT INTO asset_seedlings ( asset_name, asset_type, asset_version, asset_supply, asset_meta_id, diff --git a/tapdb/sqlc/querier.go b/tapdb/sqlc/querier.go index 928dceda2..8730f19c4 100644 --- a/tapdb/sqlc/querier.go +++ b/tapdb/sqlc/querier.go @@ -81,6 +81,7 @@ type Querier interface { GenesisAssets(ctx context.Context) ([]GenesisAsset, error) GenesisPoints(ctx context.Context) ([]GenesisPoint, error) GetRootKey(ctx context.Context, id []byte) (Macaroon, error) + HasAssetProof(ctx context.Context, tweakedScriptKey []byte) (bool, error) InsertAddr(ctx context.Context, arg InsertAddrParams) (int64, error) InsertAssetSeedling(ctx context.Context, arg InsertAssetSeedlingParams) error InsertAssetSeedlingIntoBatch(ctx context.Context, arg InsertAssetSeedlingIntoBatchParams) error diff --git a/tapdb/sqlc/queries/assets.sql b/tapdb/sqlc/queries/assets.sql index cc33881e7..84a68ab5d 100644 --- a/tapdb/sqlc/queries/assets.sql +++ b/tapdb/sqlc/queries/assets.sql @@ -690,6 +690,19 @@ FROM asset_proofs JOIN asset_info ON asset_info.asset_id = asset_proofs.asset_id; +-- name: HasAssetProof :one +WITH asset_info AS ( + SELECT assets.asset_id + FROM assets + JOIN script_keys + ON assets.script_key_id = script_keys.script_key_id + WHERE script_keys.tweaked_script_key = $1 +) +SELECT COUNT(asset_info.asset_id) > 0 as has_proof +FROM asset_proofs +JOIN asset_info + ON asset_info.asset_id = asset_proofs.asset_id; + -- name: InsertAssetWitness :exec INSERT INTO asset_witnesses ( asset_id, prev_out_point, prev_asset_id, prev_script_key, witness_stack, diff --git a/tapdb/sqlutils_test.go b/tapdb/sqlutils_test.go index 34ec78765..a4b51fcb5 100644 --- a/tapdb/sqlutils_test.go +++ b/tapdb/sqlutils_test.go @@ -150,6 +150,15 @@ func (d *DbHandler) AddRandomAssetProof(t *testing.T) (*asset.Asset, }) require.NoError(t, err, "unable to insert chain tx: %w", err) + // Before we insert the proof, we expect our backend to report it as not + // found. + proofLocator := proof.Locator{ + ScriptKey: *testAsset.ScriptKey.PubKey, + } + found, err := assetStore.HasProof(ctx, proofLocator) + require.NoError(t, err) + require.False(t, found) + // With all our test data constructed, we'll now attempt to import the // asset into the database. require.NoError(t, assetStore.ImportProofs( @@ -157,6 +166,11 @@ func (d *DbHandler) AddRandomAssetProof(t *testing.T) (*asset.Asset, annotatedProof, )) + // Now the HasProof should return true. + found, err = assetStore.HasProof(ctx, proofLocator) + require.NoError(t, err) + require.True(t, found) + return testAsset, annotatedProof } diff --git a/tapgarden/custodian.go b/tapgarden/custodian.go index 4184d164e..0b87d24a6 100644 --- a/tapgarden/custodian.go +++ b/tapgarden/custodian.go @@ -1,7 +1,6 @@ package tapgarden import ( - "bytes" "errors" "fmt" "strings" @@ -652,6 +651,7 @@ func (c *Custodian) checkProofAvailable(event *address.Event) (bool, error) { AssetID: fn.Ptr(event.Addr.AssetID), GroupKey: event.Addr.GroupKey, ScriptKey: event.Addr.ScriptKey, + OutPoint: &event.Outpoint, }) switch { case errors.Is(err, proof.ErrProofNotFound): @@ -662,15 +662,23 @@ func (c *Custodian) checkProofAvailable(event *address.Event) (bool, error) { err) } - file := proof.NewEmptyFile(proof.V0) - if err := file.Decode(bytes.NewReader(blob)); err != nil { - return false, fmt.Errorf("error decoding proof file: %w", err) + // At this point, we expect the proof to be a full file, containing the + // whole provenance chain (as required by implementers of the + // proof.NotifyArchiver.FetchProof() method). So if we don't we can't + // continue. + if !blob.IsFile() { + return false, fmt.Errorf("expected proof to be a full file, " + + "but got something else") + } + + file, err := blob.AsFile() + if err != nil { + return false, fmt.Errorf("error extracting proof file: %w", err) } // Exit early on empty proof (shouldn't happen outside of test cases). if file.IsEmpty() { - return false, fmt.Errorf("archive contained empty proof file: "+ - "%w", err) + return false, fmt.Errorf("archive contained empty proof file") } lastProof, err := file.LastProof() @@ -691,9 +699,92 @@ func (c *Custodian) checkProofAvailable(event *address.Event) (bool, error) { // and pending address event. If a proof successfully matches the desired state // of the address, that completes the inbound transfer of an asset. func (c *Custodian) mapProofToEvent(p proof.Blob) error { - file := proof.NewEmptyFile(proof.V0) - if err := file.Decode(bytes.NewReader(p)); err != nil { - return fmt.Errorf("error decoding proof file: %w", err) + // We arrive here if we are notified about a new proof. The notification + // interface allows that proof to be a single transition proof. So if + // we don't have a full file yet, we need to fetch it now. The + // proof.NotifyArchiver.FetchProof() method will return the full file as + // per its Godoc. + var ( + proofBlob = p + lastProof *proof.Proof + err error + ) + if !p.IsFile() { + log.Debugf("Received single proof, inspecting if matches event") + lastProof, err = p.AsSingleProof() + if err != nil { + return fmt.Errorf("error decoding proof: %w", err) + } + + // Before we go ahead and fetch the full file, let's make sure + // we are actually interested in this proof. We need to do this + // because we receive all transfer proofs inserted into the + // local universe here. So they could just be from a proof sync + // run and not actually be for an address we are interested in. + haveMatchingEvents := fn.AnyMapItem( + c.events, func(e *address.Event) bool { + return EventMatchesProof(e, lastProof) + }, + ) + if !haveMatchingEvents { + log.Debugf("Proof doesn't match any events, skipping.") + return nil + } + + ctxt, cancel := c.WithCtxQuit() + defer cancel() + + loc := proof.Locator{ + AssetID: fn.Ptr(lastProof.Asset.ID()), + ScriptKey: *lastProof.Asset.ScriptKey.PubKey, + OutPoint: fn.Ptr(lastProof.OutPoint()), + } + if lastProof.Asset.GroupKey != nil { + loc.GroupKey = &lastProof.Asset.GroupKey.GroupPubKey + } + + log.Debugf("Received single proof, fetching full file") + proofBlob, err = c.cfg.ProofNotifier.FetchProof(ctxt, loc) + if err != nil { + return fmt.Errorf("error fetching full proof file for "+ + "event: %w", err) + } + + // Do we already have this proof in our main archive? This + // should only be false if we got the notification from our + // local universe instead of the local proof archive (which the + // couriers use). This is mainly an optimization to make sure we + // don't unnecessarily overwrite the proofs in our main archive. + haveProof, err := c.cfg.ProofArchive.HasProof(ctxt, loc) + if err != nil { + return fmt.Errorf("error checking if proof is "+ + "available: %w", err) + } + + // We don't have the proof yet, or not in all backends, so we + // need to import it now. + if !haveProof { + headerVerifier := GenHeaderVerifier( + ctxt, c.cfg.ChainBridge, + ) + err = c.cfg.ProofArchive.ImportProofs( + ctxt, headerVerifier, c.cfg.GroupVerifier, + false, &proof.AnnotatedProof{ + Locator: loc, + Blob: proofBlob, + }, + ) + if err != nil { + return fmt.Errorf("error importing proof "+ + "file into main archive: %w", err) + } + } + } + + // Now we can be sure we have a file. + file, err := proofBlob.AsFile() + if err != nil { + return fmt.Errorf("error extracting proof file: %w", err) } // Exit early on empty proof (shouldn't happen outside of test cases). @@ -704,19 +795,22 @@ func (c *Custodian) mapProofToEvent(p proof.Blob) error { // We got the proof from the multi archiver, which verifies it before // giving it to us. So we don't have to verify them again and can - // directly look at the last state. - lastProof, err := file.LastProof() - if err != nil { - return fmt.Errorf("error fetching last proof: %w", err) + // directly look at the last state. We can skip extracting the last + // proof if we started out with a single proof in the first place, which + // we already parsed above. + if lastProof == nil { + lastProof, err = file.LastProof() + if err != nil { + return fmt.Errorf("error fetching last proof: %w", err) + } } - log.Infof("Received new proof file, version=%d, num_proofs=%d", - file.Version, file.NumProofs()) + log.Infof("Received new proof file for asset ID %s, version=%d,"+ + "num_proofs=%d", lastProof.Asset.ID().String(), file.Version, + file.NumProofs()) // Check if any of our in-flight events match the last proof's state. for _, event := range c.events { - if AddrMatchesAsset(event.Addr, &lastProof.Asset) && - event.Outpoint == lastProof.OutPoint() { - + if EventMatchesProof(event, lastProof) { // Importing a proof already creates the asset in the // database. Therefore, all we need to do is update the // state of the address event to mark it as completed @@ -870,3 +964,9 @@ func AddrMatchesAsset(addr *address.AddrWithKeyInfo, a *asset.Asset) bool { return addr.AssetID == a.ID() && groupKeyEqual && addr.ScriptKey.IsEqual(a.ScriptKey.PubKey) } + +// EventMatchesProof returns true if the given event matches the given proof. +func EventMatchesProof(event *address.Event, p *proof.Proof) bool { + return AddrMatchesAsset(event.Addr, &p.Asset) && + event.Outpoint == p.OutPoint() +} diff --git a/tapgarden/mock.go b/tapgarden/mock.go index 5f01c7f94..cdbe3540e 100644 --- a/tapgarden/mock.go +++ b/tapgarden/mock.go @@ -592,6 +592,12 @@ func (m *MockProofArchive) FetchProof(ctx context.Context, return nil, nil } +func (m *MockProofArchive) HasProof(ctx context.Context, + id proof.Locator) (bool, error) { + + return false, nil +} + func (m *MockProofArchive) FetchProofs(ctx context.Context, id asset.ID) ([]*proof.AnnotatedProof, error) { diff --git a/tapgarden/re-org_watcher.go b/tapgarden/re-org_watcher.go index 4b650d60c..3bb20201e 100644 --- a/tapgarden/re-org_watcher.go +++ b/tapgarden/re-org_watcher.go @@ -68,7 +68,7 @@ type ReOrgWatcherConfig struct { // ProofArchive is the storage backend for proofs to which we store // updated proofs. - ProofArchive proof.NotifyArchiver + ProofArchive proof.Archiver // NonBuriedAssetFetcher is a function that returns all assets that are // not yet sufficiently deep buried.