Skip to content

Commit

Permalink
tapgarden: fix proof not found bug
Browse files Browse the repository at this point in the history
This fixes a bug reported by a user running v0.3.3-rc1. Although the
situation can only happen if the daemon is shut down in exactly the
wrong moment (or, more likely, due to an otherwise inconsistent database
state), it can happen that the multiverse reports a proof is available
but it wasn't yet imported into the local archive.
To make sure we can definitely rely on the proof being in the asset DB
when trying to complete a receive event, we double check and re-import
the proof if necessary.
  • Loading branch information
guggero committed Feb 5, 2024
1 parent 171535d commit e532f2c
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 29 deletions.
75 changes: 51 additions & 24 deletions tapgarden/custodian.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,12 +647,13 @@ func (c *Custodian) checkProofAvailable(event *address.Event) (bool, error) {
// be a multi archiver that includes file based storage) to make sure
// the proof is available in the relational database. If the proof is
// not in the DB, we can't update the event.
blob, err := c.cfg.ProofNotifier.FetchProof(ctxt, proof.Locator{
locator := proof.Locator{
AssetID: fn.Ptr(event.Addr.AssetID),
GroupKey: event.Addr.GroupKey,
ScriptKey: event.Addr.ScriptKey,
OutPoint: &event.Outpoint,
})
}
blob, err := c.cfg.ProofNotifier.FetchProof(ctxt, locator)
switch {
case errors.Is(err, proof.ErrProofNotFound):
return false, nil
Expand All @@ -671,6 +672,19 @@ func (c *Custodian) checkProofAvailable(event *address.Event) (bool, error) {
"but got something else")
}

// In case we missed a notification from the local universe and didn't
// previously import the proof (for example because we were shutting
// down), we could be in a situation where the local database doesn't
// have the proof yet. So we make sure to import it now.
err = c.assertProofInLocalArchive(&proof.AnnotatedProof{
Locator: locator,
Blob: blob,
})
if err != nil {
return false, fmt.Errorf("error asserting proof in local "+
"archive: %w", err)
}

file, err := blob.AsFile()
if err != nil {
return false, fmt.Errorf("error extracting proof file: %w", err)
Expand Down Expand Up @@ -755,29 +769,13 @@ func (c *Custodian) mapProofToEvent(p proof.Blob) error {
// local universe instead of the local proof archive (which the
// couriers use). This is mainly an optimization to make sure we
// don't unnecessarily overwrite the proofs in our main archive.
haveProof, err := c.cfg.ProofArchive.HasProof(ctxt, loc)
err := c.assertProofInLocalArchive(&proof.AnnotatedProof{
Locator: loc,
Blob: proofBlob,
})
if err != nil {
return fmt.Errorf("error checking if proof is "+
"available: %w", err)
}

// We don't have the proof yet, or not in all backends, so we
// need to import it now.
if !haveProof {
headerVerifier := GenHeaderVerifier(
ctxt, c.cfg.ChainBridge,
)
err = c.cfg.ProofArchive.ImportProofs(
ctxt, headerVerifier, c.cfg.GroupVerifier,
false, &proof.AnnotatedProof{
Locator: loc,
Blob: proofBlob,
},
)
if err != nil {
return fmt.Errorf("error importing proof "+
"file into main archive: %w", err)
}
return fmt.Errorf("error asserting proof in local "+
"archive: %w", err)
}
}

Expand Down Expand Up @@ -828,6 +826,35 @@ func (c *Custodian) mapProofToEvent(p proof.Blob) error {
return nil
}

// assertProofInLocalArchive checks if the proof is already in the local proof
// archive. If it isn't, it is imported now.
func (c *Custodian) assertProofInLocalArchive(p *proof.AnnotatedProof) error {
ctxt, cancel := c.WithCtxQuit()
defer cancel()

haveProof, err := c.cfg.ProofArchive.HasProof(ctxt, p.Locator)
if err != nil {
return fmt.Errorf("error checking if proof is available: %w",
err)
}

// We don't have the proof yet, or not in all backends, so we
// need to import it now.
if !haveProof {
headerVerifier := GenHeaderVerifier(ctxt, c.cfg.ChainBridge)
err = c.cfg.ProofArchive.ImportProofs(
ctxt, headerVerifier, c.cfg.GroupVerifier, false, p,
)
if err != nil {
log.Errorf("ERROOOORRR: %v", err)
return fmt.Errorf("error importing proof file into "+
"main archive: %w", err)
}
}

return nil
}

// setReceiveCompleted updates the address event in the database to mark it as
// completed successfully and to link it to the proof we received.
func (c *Custodian) setReceiveCompleted(event *address.Event,
Expand Down
155 changes: 150 additions & 5 deletions tapgarden/custodian_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"database/sql"
"fmt"
"io"
"math/rand"
"net/url"
"testing"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/lightninglabs/taproot-assets/tapdb"
"github.com/lightninglabs/taproot-assets/tapgarden"
"github.com/lightninglabs/taproot-assets/tapscript"
"github.com/lightninglabs/taproot-assets/universe"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lntest/wait"
Expand Down Expand Up @@ -59,9 +61,46 @@ func newAddrBookForDB(db *tapdb.BaseDB, keyRing *tapgarden.MockKeyRing,
return book, tapdbBook
}

type mockVerifier struct {
t *testing.T
}

func newMockVerifier(t *testing.T) *mockVerifier {
return &mockVerifier{
t: t,
}
}

func (m *mockVerifier) Verify(_ context.Context, r io.Reader,
headerVerifier proof.HeaderVerifier,
groupVerifier proof.GroupVerifier) (*proof.AssetSnapshot, error) {

f := &proof.File{}
err := f.Decode(r)
require.NoError(m.t, err)

lastProof, err := f.LastProof()
require.NoError(m.t, err)

ac, err := commitment.NewAssetCommitment(&lastProof.Asset)
require.NoError(m.t, err)
tc, err := commitment.NewTapCommitment(ac)
require.NoError(m.t, err)

return &proof.AssetSnapshot{
Asset: &lastProof.Asset,
OutPoint: lastProof.OutPoint(),
OutputIndex: lastProof.InclusionProof.OutputIndex,
AnchorBlockHash: lastProof.BlockHeader.BlockHash(),
AnchorTx: &lastProof.AnchorTx,
InternalKey: lastProof.InclusionProof.InternalKey,
ScriptRoot: tc,
}, nil
}

// newProofArchive creates a new instance of the MultiArchiver.
func newProofArchiveForDB(t *testing.T, db *tapdb.BaseDB) (*proof.MultiArchiver,
*tapdb.AssetStore) {
*tapdb.AssetStore, *tapdb.MultiverseStore) {

txCreator := func(tx *sql.Tx) tapdb.ActiveAssetsStore {
return db.WithTx(tx)
Expand All @@ -78,7 +117,14 @@ func newProofArchiveForDB(t *testing.T, db *tapdb.BaseDB) (*proof.MultiArchiver,
assetStore,
)

return proofArchive, assetStore
multiverseDB := tapdb.NewTransactionExecutor(
db, func(tx *sql.Tx) tapdb.BaseMultiverseStore {
return db.WithTx(tx)
},
)
multiverse := tapdb.NewMultiverseStore(multiverseDB)

return proofArchive, assetStore, multiverse
}

type custodianHarness struct {
Expand All @@ -93,6 +139,7 @@ type custodianHarness struct {
addrBook *address.Book
syncer *tapgarden.MockAssetSyncer
assetDB *tapdb.AssetStore
multiverse *tapdb.MultiverseStore
courier *proof.MockProofCourier
}

Expand Down Expand Up @@ -192,6 +239,48 @@ func (h *custodianHarness) assertAddrsRegistered(
}
}

// addProofFileToMultiverse adds the given proof to the multiverse store.
func (h *custodianHarness) addProofFileToMultiverse(p *proof.AnnotatedProof) {
f := &proof.File{}
err := f.Decode(bytes.NewReader(p.Blob))
require.NoError(h.t, err)

ctx := context.Background()
ctxt, cancel := context.WithTimeout(ctx, testTimeout)
defer cancel()

for i := uint32(0); i < uint32(f.NumProofs()); i++ {
transition, err := f.ProofAt(i)
require.NoError(h.t, err)

rawTransition, err := f.RawProofAt(i)
require.NoError(h.t, err)

id := universe.NewUniIDFromAsset(transition.Asset)
key := universe.LeafKey{
OutPoint: transition.OutPoint(),
ScriptKey: fn.Ptr(asset.NewScriptKey(
transition.Asset.ScriptKey.PubKey,
)),
}
leaf := &universe.Leaf{
GenesisWithGroup: universe.GenesisWithGroup{
Genesis: transition.Asset.Genesis,
GroupKey: transition.Asset.GroupKey,
},
RawProof: rawTransition,
Asset: &transition.Asset,
Amt: transition.Asset.Amount,
}
h.t.Logf("Importing proof with script key %x and outpoint %v "+
"into multiverse",
key.ScriptKey.PubKey.SerializeCompressed(),
key.OutPoint)
_, err = h.multiverse.UpsertProofLeaf(ctxt, id, key, leaf, nil)
require.NoError(h.t, err)
}
}

func newHarness(t *testing.T,
initialAddrs []*address.AddrWithKeyInfo) *custodianHarness {

Expand All @@ -201,7 +290,10 @@ func newHarness(t *testing.T,
syncer := tapgarden.NewMockAssetSyncer()
db := tapdb.NewTestDB(t)
addrBook, tapdbBook := newAddrBookForDB(db.BaseDB, keyRing, syncer)
_, assetDB := newProofArchiveForDB(t, db.BaseDB)

_, assetDB, multiverse := newProofArchiveForDB(t, db.BaseDB)
notifier := proof.NewMultiArchiveNotifier(assetDB, multiverse)

courier := proof.NewMockProofCourier()
courierDispatch := &proof.MockProofCourierDispatcher{
Courier: courier,
Expand All @@ -214,14 +306,18 @@ func newHarness(t *testing.T,
require.NoError(t, err)
}

archive := proof.NewMultiArchiver(
newMockVerifier(t), testTimeout, assetDB,
)

errChan := make(chan error, 1)
cfg := &tapgarden.CustodianConfig{
ChainParams: chainParams,
ChainBridge: chainBridge,
WalletAnchor: walletAnchor,
AddrBook: addrBook,
ProofArchive: assetDB,
ProofNotifier: assetDB,
ProofArchive: archive,
ProofNotifier: notifier,
ProofCourierDispatcher: courierDispatch,
ProofWatcher: proofWatcher,
ErrChan: errChan,
Expand All @@ -238,6 +334,7 @@ func newHarness(t *testing.T,
addrBook: addrBook,
syncer: syncer,
assetDB: assetDB,
multiverse: multiverse,
courier: courier,
}
}
Expand Down Expand Up @@ -315,6 +412,11 @@ func randProof(t *testing.T, outputIndex int, tx *wire.MsgTx,
Genesis: *genesis,
Amount: addr.Amount,
ScriptKey: asset.NewScriptKey(&addr.ScriptKey),
PrevWitnesses: []asset.Witness{
{
PrevID: &asset.PrevID{},
},
},
}
if addr.GroupKey != nil {
a.GroupKey = &asset.GroupKey{
Expand Down Expand Up @@ -653,6 +755,49 @@ func mustMakeAddr(t *testing.T,
return addr
}

// TestProofInMultiverseOnly tests that the custodian imports a proof correctly
// into the local archive if it's only present in the multiverse.
func TestProofInMultiverseOnly(t *testing.T) {
h := newHarness(t, nil)

// Before we start the custodian, we create a random address and a
// corresponding wallet transaction.
ctx := context.Background()

addr, genesis := randAddr(h)
err := h.tapdbBook.InsertAddrs(ctx, *addr)
require.NoError(t, err)

// We now start the custodian and make sure it's started up correctly
// and the pending event is registered.
require.NoError(t, h.c.Start())
h.assertStartup()
h.assertAddrsRegistered(addr)

// Receiving a TX for it should create a pending event and cause the
// proof courier to attempt to fetch it. But the courier won't find it.
outputIdx, tx := randWalletTx(addr)
h.walletAnchor.SubscribeTx <- *tx
h.assertEventsPresent(1, address.StatusTransactionDetected)

// We now stop the custodian again.
require.NoError(t, h.c.Stop())

// The proof is only in the multiverse, not in the local archive. And we
// add the proof to the multiverse before starting the custodian, so the
// notification for it doesn't trigger.
mockProof := randProof(t, outputIdx, tx.Tx, genesis, addr)
h.addProofFileToMultiverse(mockProof)

// And a new start should import the proof into the local archive.
h.c = tapgarden.NewCustodian(h.cfg)
require.NoError(t, h.c.Start())
t.Cleanup(func() {
require.NoError(t, h.c.Stop())
})
h.assertStartup()
}

// TestAddrMatchesAsset tests that the AddrMatchesAsset function works
// correctly.
func TestAddrMatchesAsset(t *testing.T) {
Expand Down

0 comments on commit e532f2c

Please sign in to comment.