Skip to content

Commit

Permalink
Merge pull request lightninglabs#756 from lightninglabs/custodian-eve…
Browse files Browse the repository at this point in the history
…nts-cleanup

Refactor and cleanup custodian events cache handling
  • Loading branch information
guggero authored Jan 10, 2024
2 parents 5a25a46 + 58d4baa commit 76cef1b
Showing 1 changed file with 42 additions and 16 deletions.
58 changes: 42 additions & 16 deletions tapgarden/custodian.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,14 @@ func (c *Custodian) watchInboundAssets() {
// If we didn't find a proof, we'll launch a goroutine to use
// the ProofCourier to import the proof into our local DB.
c.Wg.Add(1)
go c.receiveProof(event.Addr.Tap, event.Outpoint)
go func() {
defer c.Wg.Done()

recErr := c.receiveProof(event.Addr.Tap, event.Outpoint)
if recErr != nil {
reportErr(recErr)
}
}()
}

// Read all on-chain transactions and make sure they are mapped to an
Expand Down Expand Up @@ -402,7 +409,17 @@ func (c *Custodian) inspectWalletTx(walletTx *lndclient.Transaction) error {
// ProofCourier to import the proof into our
// local DB.
c.Wg.Add(1)
go c.receiveProof(event.Addr.Tap, op)
go func() {
defer c.Wg.Done()

recErr := c.receiveProof(
event.Addr.Tap, op,
)
if recErr != nil {
log.Errorf("Unable to receive "+
"proof: %v", recErr)
}
}()
}

continue
Expand Down Expand Up @@ -434,19 +451,23 @@ func (c *Custodian) inspectWalletTx(walletTx *lndclient.Transaction) error {
// launch a goroutine to use the ProofCourier to import the
// proof into our local DB.
c.Wg.Add(1)
go c.receiveProof(addr, op)
go func() {
defer c.Wg.Done()

recErr := c.receiveProof(addr, op)
if recErr != nil {
log.Errorf("Unable to receive proof: %v",
recErr)
}
}()
}

return nil
}

// receiveProof attempts to receive a proof for the given address and outpoint
// via the proof courier service.
//
// NOTE: This must be called as a goroutine.
func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint) {
defer c.Wg.Done()

func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint) error {
ctx, cancel := c.WithCtxQuitNoTimeout()
defer cancel()

Expand All @@ -466,9 +487,8 @@ func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint) {
&addr.ProofCourierAddr, recipient,
)
if err != nil {
log.Errorf("Unable to initiate proof courier service handle: "+
"%v", err)
return
return fmt.Errorf("unable to initiate proof courier service "+
"handle: %w", err)
}

// Update courier handle events subscribers before attempting to
Expand All @@ -484,7 +504,7 @@ func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint) {
select {
case <-time.After(c.cfg.ProofRetrievalDelay):
case <-ctx.Done():
return
return nil
}

// Attempt to receive proof via proof courier service.
Expand All @@ -496,8 +516,8 @@ func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint) {
}
addrProof, err := courier.ReceiveProof(ctx, loc)
if err != nil {
log.Errorf("Unable to receive proof using courier: %v", err)
return
return fmt.Errorf("unable to receive proof using courier: %w",
err)
}

log.Debugf("Received proof for: script_key=%x, asset_id=%x",
Expand All @@ -511,9 +531,15 @@ func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint) {
ctx, headerVerifier, c.cfg.GroupVerifier, false, addrProof,
)
if err != nil {
log.Errorf("Unable to import proofs: %v", err)
return
return fmt.Errorf("unable to import proofs: %w", err)
}

// The proof is now verified and in our local archive. We will now
// finalize handling the proof like we would with any other newly
// received proof.
c.proofSubscription.NewItemCreated.ChanIn() <- addrProof.Blob

return nil
}

// mapToTapAddr attempts to match a transaction output to a Taproot Asset
Expand Down

0 comments on commit 76cef1b

Please sign in to comment.