Skip to content

Commit

Permalink
Merge pull request #1088 from lightninglabs/share-proof-courier-con
Browse files Browse the repository at this point in the history
multi: specify proof courier proof recipient at delivery or receive
  • Loading branch information
ffranr authored Aug 16, 2024
2 parents e3c8d02 + 764ef38 commit 0a64116
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 58 deletions.
45 changes: 18 additions & 27 deletions proof/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/btcsuite/btcd/btcec/v2"
"github.com/davecgh/go-spew/spew"
"github.com/lightninglabs/lightning-node-connect/hashmailrpc"
"github.com/lightninglabs/taproot-assets/asset"
"github.com/lightninglabs/taproot-assets/fn"
Expand Down Expand Up @@ -58,11 +57,12 @@ type CourierHarness interface {
type Courier interface {
// DeliverProof attempts to delivery a proof to the receiver, using the
// information in the Addr type.
DeliverProof(context.Context, *AnnotatedProof) error
DeliverProof(context.Context, Recipient, *AnnotatedProof) error

// ReceiveProof attempts to obtain a proof as identified by the passed
// locator from the source encapsulated within the specified address.
ReceiveProof(context.Context, Locator) (*AnnotatedProof, error)
ReceiveProof(context.Context, Recipient,
Locator) (*AnnotatedProof, error)

// SetSubscribers sets the set of subscribers that will be notified
// of proof courier related events.
Expand Down Expand Up @@ -96,7 +96,7 @@ type CourierCfg struct {
type CourierDispatch interface {
// NewCourier instantiates a new courier service handle given a service
// URL address.
NewCourier(addr *url.URL, recipient Recipient) (Courier, error)
NewCourier(addr *url.URL) (Courier, error)
}

// URLDispatch is a proof courier dispatch that uses the courier address URL
Expand All @@ -114,9 +114,7 @@ func NewCourierDispatch(cfg *CourierCfg) *URLDispatch {

// NewCourier instantiates a new courier service handle given a service URL
// address.
func (u *URLDispatch) NewCourier(addr *url.URL,
recipient Recipient) (Courier, error) {

func (u *URLDispatch) NewCourier(addr *url.URL) (Courier, error) {
subscribers := make(map[uint64]*fn.EventReceiver[fn.Event])

// Create new courier addr based on URL scheme.
Expand All @@ -136,7 +134,6 @@ func (u *URLDispatch) NewCourier(addr *url.URL,
return &HashMailCourier{
cfg: u.cfg,
backoffHandle: backoffHandler,
recipient: recipient,
mailbox: hashMailBox,
subscribers: subscribers,
}, nil
Expand All @@ -162,7 +159,6 @@ func (u *URLDispatch) NewCourier(addr *url.URL,
client := unirpc.NewUniverseClient(conn)

return &UniverseRpcCourier{
recipient: recipient,
client: client,
backoffHandle: backoffHandler,
cfg: u.cfg,
Expand Down Expand Up @@ -728,9 +724,8 @@ type HashMailCourier struct {
// delivery.
backoffHandle *BackoffHandler

// recipient describes the recipient of the proof.
recipient Recipient

// mailbox is the mailbox service that the courier will use to interact
// with the hashmail server.
mailbox ProofMailbox

// subscribers is a map of components that want to be notified on new
Expand All @@ -747,14 +742,14 @@ type HashMailCourier struct {
//
// TODO(roasbeef): other delivery context as type param?
func (h *HashMailCourier) DeliverProof(ctx context.Context,
proof *AnnotatedProof) error {
recipient Recipient, proof *AnnotatedProof) error {

log.Infof("Attempting to deliver receiver proof for send of "+
"asset_id=%v, amt=%v", h.recipient.AssetID, h.recipient.Amount)
"asset_id=%v, amt=%v", recipient.AssetID, recipient.Amount)

// Compute the stream IDs for the sender and receiver.
senderStreamID := deriveSenderStreamID(h.recipient)
receiverStreamID := deriveReceiverStreamID(h.recipient)
senderStreamID := deriveSenderStreamID(recipient)
receiverStreamID := deriveReceiverStreamID(recipient)

// Interact with the hashmail service using a backoff procedure to
// ensure that we don't overwhelm the service with delivery attempts.
Expand Down Expand Up @@ -891,8 +886,7 @@ func (h *HashMailCourier) publishSubscriberEvent(event fn.Event) {
// Close closes the underlying connection to the hashmail server.
func (h *HashMailCourier) Close() error {
if err := h.mailbox.Close(); err != nil {
log.Warnf("unable to close mailbox session, "+
"recipient=%v: %v", err, spew.Sdump(h.recipient))
log.Warnf("Unable to close mailbox session: %v", err)
return err
}

Expand Down Expand Up @@ -941,10 +935,10 @@ func NewBackoffWaitEvent(

// ReceiveProof attempts to obtain a proof as identified by the passed locator
// from the source encapsulated within the specified address.
func (h *HashMailCourier) ReceiveProof(ctx context.Context,
func (h *HashMailCourier) ReceiveProof(ctx context.Context, recipient Recipient,
loc Locator) (*AnnotatedProof, error) {

senderStreamID := deriveSenderStreamID(h.recipient)
senderStreamID := deriveSenderStreamID(recipient)
if err := h.mailbox.Init(ctx, senderStreamID); err != nil {
return nil, err
}
Expand All @@ -960,7 +954,7 @@ func (h *HashMailCourier) ReceiveProof(ctx context.Context,

// Now that we've read the proof, we'll create our mailbox (which might
// already exist) to send an ACK back to the sender.
receiverStreamID := deriveReceiverStreamID(h.recipient)
receiverStreamID := deriveReceiverStreamID(recipient)
log.Infof("Sending ACK to sender via sid=%x", receiverStreamID)
if err := h.mailbox.Init(ctx, receiverStreamID); err != nil {
return nil, err
Expand Down Expand Up @@ -1001,9 +995,6 @@ type UniverseRpcCourierCfg struct {
// UniverseRpcCourier is a universe RPC proof courier service handle. It
// implements the Courier interface.
type UniverseRpcCourier struct {
// recipient describes the recipient of the proof.
recipient Recipient

// client is the RPC client that the courier will use to interact with
// the universe RPC server.
client unirpc.UniverseClient
Expand All @@ -1030,7 +1021,7 @@ type UniverseRpcCourier struct {

// DeliverProof attempts to delivery a proof file to the receiver.
func (c *UniverseRpcCourier) DeliverProof(ctx context.Context,
annotatedProof *AnnotatedProof) error {
recipient Recipient, annotatedProof *AnnotatedProof) error {

// Decode annotated proof into proof file.
proofFile := &File{}
Expand All @@ -1041,7 +1032,7 @@ func (c *UniverseRpcCourier) DeliverProof(ctx context.Context,

log.Infof("Universe RPC proof courier attempting to deliver proof "+
"file (num_proofs=%d) for send event (asset_id=%v, amt=%v)",
proofFile.NumProofs(), c.recipient.AssetID, c.recipient.Amount)
proofFile.NumProofs(), recipient.AssetID, recipient.Amount)

// Iterate over each proof in the proof file and submit to the courier
// service.
Expand Down Expand Up @@ -1136,7 +1127,7 @@ func (c *UniverseRpcCourier) DeliverProof(ctx context.Context,
// ReceiveProof attempts to obtain a proof file from the courier service. The
// final proof in the target proof file is identified by the given locator.
func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context,
originLocator Locator) (*AnnotatedProof, error) {
_ Recipient, originLocator Locator) (*AnnotatedProof, error) {

fetchProof := func(ctx context.Context, loc Locator) (Blob, error) {
var groupKeyBytes []byte
Expand Down
8 changes: 4 additions & 4 deletions proof/courier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func TestUniverseRpcCourierLocalArchiveShortCut(t *testing.T) {

localArchive.proofs.Store(locHash, proofBlob)

recipient := Recipient{}
courier := &UniverseRpcCourier{
recipient: Recipient{},
client: nil,
client: nil,
cfg: &CourierCfg{
LocalArchive: localArchive,
},
Expand All @@ -58,7 +58,7 @@ func TestUniverseRpcCourierLocalArchiveShortCut(t *testing.T) {

// If we attempt to receive a proof that the local archive has, we
// expect to get it back.
annotatedProof, err := courier.ReceiveProof(ctxt, locator)
annotatedProof, err := courier.ReceiveProof(ctxt, recipient, locator)
require.NoError(t, err)

require.Equal(t, proofBlob, annotatedProof.Blob)
Expand All @@ -67,7 +67,7 @@ func TestUniverseRpcCourierLocalArchiveShortCut(t *testing.T) {
// should end up in the code path that attempts to fetch the proof from
// the universe. Since we don't want to set up a full universe server
// in the test, we just make sure we get an error from that code path.
_, err = courier.ReceiveProof(ctxt, Locator{
_, err = courier.ReceiveProof(ctxt, recipient, Locator{
AssetID: fn.Ptr(genesis.ID()),
ScriptKey: *proof.Asset.ScriptKey.PubKey,
})
Expand Down
6 changes: 3 additions & 3 deletions proof/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ type MockProofCourierDispatcher struct {

// NewCourier instantiates a new courier service handle given a service
// URL address.
func (m *MockProofCourierDispatcher) NewCourier(*url.URL, Recipient) (Courier,
func (m *MockProofCourierDispatcher) NewCourier(*url.URL) (Courier,
error) {

return m.Courier, nil
Expand Down Expand Up @@ -479,7 +479,7 @@ func (m *MockProofCourier) Stop() error {
// DeliverProof attempts to delivery a proof to the receiver, using the
// information in the Addr type.
func (m *MockProofCourier) DeliverProof(_ context.Context,
proof *AnnotatedProof) error {
_ Recipient, proof *AnnotatedProof) error {

m.Lock()
defer m.Unlock()
Expand All @@ -492,7 +492,7 @@ func (m *MockProofCourier) DeliverProof(_ context.Context,
// ReceiveProof attempts to obtain a proof as identified by the passed
// locator from the source encapsulated within the specified address.
func (m *MockProofCourier) ReceiveProof(_ context.Context,
loc Locator) (*AnnotatedProof, error) {
_ Recipient, loc Locator) (*AnnotatedProof, error) {

m.Lock()
defer m.Unlock()
Expand Down
15 changes: 7 additions & 8 deletions tapchannel/aux_sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,21 +801,20 @@ func importOutputProofs(scid lnwire.ShortChannelID,

// First, we'll make a courier to use in fetching the proofs we
// need.
proofFetcher, err := proofDispatch.NewCourier(
courierAddr, proof.Recipient{
ScriptKey: scriptKey,
AssetID: proofPrevID.ID,
Amount: proofToImport.Asset.Amount,
},
)
proofFetcher, err := proofDispatch.NewCourier(courierAddr)
if err != nil {
return fmt.Errorf("unable to create proof courier: %w",
err)
}

ctxb := context.Background()
recipient := proof.Recipient{
ScriptKey: scriptKey,
AssetID: proofPrevID.ID,
Amount: proofToImport.Asset.Amount,
}
prefixProof, err := proofFetcher.ReceiveProof(
ctxb, inputProofLocator,
ctxb, recipient, inputProofLocator,
)

// Always attempt to close the courier, even if we encounter an
Expand Down
14 changes: 7 additions & 7 deletions tapfreighter/chain_porter.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,13 +819,8 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {

// Initiate proof courier service handle from the proof
// courier address found in the Tap address.
recipient := proof.Recipient{
ScriptKey: key,
AssetID: *receiverProof.AssetID,
Amount: out.Amount,
}
courier, err := p.cfg.ProofCourierDispatcher.NewCourier(
proofCourierAddr, recipient,
proofCourierAddr,
)
if err != nil {
return fmt.Errorf("unable to initiate proof courier "+
Expand All @@ -841,7 +836,12 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
p.subscriberMtx.Unlock()

// Deliver proof to proof courier service.
err = courier.DeliverProof(ctx, receiverProof)
recipient := proof.Recipient{
ScriptKey: key,
AssetID: *receiverProof.AssetID,
Amount: out.Amount,
}
err = courier.DeliverProof(ctx, recipient, receiverProof)

// If the proof courier returned a backoff error, then
// we'll just return nil here so that we can retry
Expand Down
14 changes: 7 additions & 7 deletions tapgarden/custodian.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,13 +563,8 @@ func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint,

// Initiate proof courier service handle from the proof courier address
// found in the Tap address.
recipient := proof.Recipient{
ScriptKey: &addr.ScriptKey,
AssetID: assetID,
Amount: addr.Amount,
}
courier, err := c.cfg.ProofCourierDispatcher.NewCourier(
&addr.ProofCourierAddr, recipient,
&addr.ProofCourierAddr,
)
if err != nil {
return fmt.Errorf("unable to initiate proof courier service "+
Expand All @@ -593,13 +588,18 @@ func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint,
}

// Attempt to receive proof via proof courier service.
recipient := proof.Recipient{
ScriptKey: &addr.ScriptKey,
AssetID: assetID,
Amount: addr.Amount,
}
loc := proof.Locator{
AssetID: &assetID,
GroupKey: addr.GroupKey,
ScriptKey: addr.ScriptKey,
OutPoint: &op,
}
addrProof, err := courier.ReceiveProof(ctx, loc)
addrProof, err := courier.ReceiveProof(ctx, recipient, loc)
if err != nil {
return fmt.Errorf("unable to receive proof using courier: %w",
err)
Expand Down
6 changes: 4 additions & 2 deletions tapgarden/custodian_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,8 @@ func TestTransactionHandling(t *testing.T) {
h.walletAnchor.Transactions = append(h.walletAnchor.Transactions, *tx)

mockProof := randProof(t, outputIdx, tx.Tx, genesis[0], addrs[0])
err := h.courier.DeliverProof(nil, mockProof)
recipient := proof.Recipient{}
err := h.courier.DeliverProof(nil, recipient, mockProof)
require.NoError(t, err)

require.NoError(t, h.c.Start())
Expand Down Expand Up @@ -708,6 +709,7 @@ func runTransactionConfirmedOnlyTest(t *testing.T, withRestart bool) {
// need to signal an unconfirmed transaction for each of them now.
outputIndexes := make([]int, numAddrs)
transactions := make([]*lndclient.Transaction, numAddrs)
recipient := proof.Recipient{}
for idx := range addrs {
outputIndex, tx := randWalletTx(addrs[idx])
outputIndexes[idx] = outputIndex
Expand All @@ -719,7 +721,7 @@ func runTransactionConfirmedOnlyTest(t *testing.T, withRestart bool) {
mockProof := randProof(
t, outputIndexes[idx], tx.Tx, genesis[idx], addrs[idx],
)
_ = h.courier.DeliverProof(nil, mockProof)
_ = h.courier.DeliverProof(nil, recipient, mockProof)
}

// We want events to be created for each address, they should be in the
Expand Down

0 comments on commit 0a64116

Please sign in to comment.