diff --git a/itest/send_test.go b/itest/send_test.go index c7ab48f26..ca709666f 100644 --- a/itest/send_test.go +++ b/itest/send_test.go @@ -21,6 +21,10 @@ import ( "github.com/stretchr/testify/require" ) +var ( + transferTypeSend = taprpc.ProofTransferType_PROOF_TRANSFER_TYPE_SEND +) + // testBasicSendUnidirectional tests that we can properly send assets back and // forth between nodes. func testBasicSendUnidirectional(t *harnessTest) { @@ -619,7 +623,7 @@ func testReattemptFailedSendHashmailCourier(t *harnessTest) { switch eventTyped := event.Event.(type) { case *taprpc.SendAssetEvent_ProofTransferBackoffWaitEvent: ev := eventTyped.ProofTransferBackoffWaitEvent - if ev.TransferType != taprpc.ProofTransferType_PROOF_TRANSFER_TYPE_SEND { + if ev.TransferType != transferTypeSend { return false } @@ -726,7 +730,7 @@ func testReattemptFailedSendUniCourier(t *harnessTest) { switch eventTyped := event.Event.(type) { case *taprpc.SendAssetEvent_ProofTransferBackoffWaitEvent: ev := eventTyped.ProofTransferBackoffWaitEvent - if ev.TransferType != taprpc.ProofTransferType_PROOF_TRANSFER_TYPE_SEND { + if ev.TransferType != transferTypeSend { return false } @@ -1000,7 +1004,7 @@ func testOfflineReceiverEventuallyReceives(t *harnessTest) { // node. We therefore expect to receive // deliver transfer type backoff wait events // for sending transfers. - if ev.TransferType != taprpc.ProofTransferType_PROOF_TRANSFER_TYPE_SEND { + if ev.TransferType != transferTypeSend { return false } diff --git a/itest/tapd_harness.go b/itest/tapd_harness.go index 0e6bf5c18..5afcc8714 100644 --- a/itest/tapd_harness.go +++ b/itest/tapd_harness.go @@ -47,9 +47,19 @@ var ( "allow the postgres fixture to run in total. Needs "+ "to be increased for long-running tests.") - // defaultBackoffConfig is the default backoff config we'll use for - // sending proofs. - defaultBackoffConfig = proof.BackoffCfg{ + // defaultHashmailBackoffConfig is the default backoff config we'll use + // for sending proofs with the hashmail courier. + defaultHashmailBackoffConfig = proof.BackoffCfg{ + BackoffResetWait: time.Second, + NumTries: 5, + InitialBackoff: 300 * time.Millisecond, + MaxBackoff: 600 * time.Millisecond, + } + + // defaultUniverseRpcBackoffConfig is the default backoff config we'll + // use for sending proofs with the universe RPC courier. + defaultUniverseRpcBackoffConfig = proof.BackoffCfg{ + SkipInitDelay: true, BackoffResetWait: time.Second, NumTries: 5, InitialBackoff: 300 * time.Millisecond, @@ -66,7 +76,7 @@ const ( // defaultProofTransferReceiverAckTimeout is the default itest specific // timeout we'll use for waiting for a receiver to acknowledge a proof // transfer. - defaultProofTransferReceiverAckTimeout = 15 * time.Second + defaultProofTransferReceiverAckTimeout = 5 * time.Second ) // tapdHarness is a test harness that holds everything that is needed to @@ -209,9 +219,11 @@ func newTapdHarness(t *testing.T, ht *harnessTest, cfg tapdConfig, // Populate proof courier specific config fields. // // Use passed in backoff config or default config. - backoffCfg := defaultBackoffConfig + hashmailBackoffCfg := defaultHashmailBackoffConfig + universeRpcBackoffCfg := defaultUniverseRpcBackoffConfig if opts.proofSendBackoffCfg != nil { - backoffCfg = *opts.proofSendBackoffCfg + hashmailBackoffCfg = *opts.proofSendBackoffCfg + universeRpcBackoffCfg = *opts.proofSendBackoffCfg } // Used passed in proof receiver ack timeout or default. @@ -220,12 +232,12 @@ func newTapdHarness(t *testing.T, ht *harnessTest, cfg tapdConfig, receiverAckTimeout = *opts.proofReceiverAckTimeout } - // TODO(ffranr): Disentangle the hashmail config from the universe RPC - // courier config. Right now, the universe courier takes the backoff - // config from the hashmail courier config. finalCfg.HashMailCourier = &proof.HashMailCourierCfg{ ReceiverAckTimeout: receiverAckTimeout, - BackoffCfg: &backoffCfg, + BackoffCfg: &hashmailBackoffCfg, + } + finalCfg.UniverseRpcCourier = &proof.UniverseRpcCourierCfg{ + BackoffCfg: &universeRpcBackoffCfg, } switch typedProofCourier := (opts.proofCourier).(type) { @@ -243,7 +255,6 @@ func newTapdHarness(t *testing.T, ht *harnessTest, cfg tapdConfig, default: finalCfg.DefaultProofCourierAddr = "" - finalCfg.HashMailCourier = nil } ht.t.Logf("Using proof courier address: %v", diff --git a/proof/courier.go b/proof/courier.go index f161b8190..d250a098b 100644 --- a/proof/courier.go +++ b/proof/courier.go @@ -72,206 +72,148 @@ type Courier interface { Close() error } -// CourierAddr is a fully validated courier address (including protocol specific -// validation). -type CourierAddr interface { - // Url returns the url.URL representation of the courier address. - Url() *url.URL - - // NewCourier generates a new courier service handle. - NewCourier(ctx context.Context, cfg *CourierCfg, - recipient Recipient) (Courier, error) -} +// CourierCfg contains general config parameters applicable to all proof +// couriers. +type CourierCfg struct { + // HashMailCfg contains hashmail protocol specific config parameters. + HashMailCfg *HashMailCourierCfg -// ParseCourierAddrString parses a proof courier address string and returns a -// protocol specific courier address instance. -func ParseCourierAddrString(addr string) (CourierAddr, error) { - // Parse URI. - urlAddr, err := url.ParseRequestURI(addr) - if err != nil { - return nil, fmt.Errorf("invalid proof courier URI address: %w", - err) - } + // UniverseRpcCfg contains universe RPC protocol specific config + // parameters. + UniverseRpcCfg *UniverseRpcCourierCfg - return ParseCourierAddrUrl(*urlAddr) + // TransferLog is a log for recording proof delivery and retrieval + // attempts. + TransferLog TransferLog } -// ParseCourierAddrUrl parses a proof courier address url.URL and returns a -// protocol specific courier address instance. -func ParseCourierAddrUrl(addr url.URL) (CourierAddr, error) { - // Create new courier addr based on URL scheme. - switch addr.Scheme { - case HashmailCourierType: - return NewHashMailCourierAddr(addr) - case UniverseRpcCourierType: - return NewUniverseRpcCourierAddr(addr) - } - - return nil, fmt.Errorf("unknown courier address protocol "+ - "(consider updating tapd): %v", addr.Scheme) +// CourierDispatch is an interface that abstracts away the different proof +// courier services that are supported. +type CourierDispatch interface { + // NewCourier instantiates a new courier service handle given a service + // URL address. + NewCourier(addr *url.URL, recipient Recipient) (Courier, error) } -// HashMailCourierAddr is a hashmail protocol specific implementation of the -// CourierAddr interface. -type HashMailCourierAddr struct { - addr url.URL +// URLDispatch is a proof courier dispatch that uses the courier address URL +// scheme to determine which courier service to use. +type URLDispatch struct { + cfg *CourierCfg } -// Url returns the url.URL representation of the hashmail courier address. -func (h *HashMailCourierAddr) Url() *url.URL { - return &h.addr +// NewCourierDispatch creates a new proof courier dispatch. +func NewCourierDispatch(cfg *CourierCfg) *URLDispatch { + return &URLDispatch{ + cfg: cfg, + } } -// NewCourier generates a new courier service handle. -func (h *HashMailCourierAddr) NewCourier(_ context.Context, cfg *CourierCfg, +// NewCourier instantiates a new courier service handle given a service URL +// address. +func (u *URLDispatch) NewCourier(addr *url.URL, recipient Recipient) (Courier, error) { - backoffHandle := NewBackoffHandler(cfg.BackoffCfg, cfg.TransferLog) + subscribers := make(map[uint64]*fn.EventReceiver[fn.Event]) - hashMailCfg := HashMailCourierCfg{ - ReceiverAckTimeout: cfg.ReceiverAckTimeout, - } + // Create new courier addr based on URL scheme. + switch addr.Scheme { + case HashmailCourierType: + cfg := u.cfg.HashMailCfg + backoffHandler := NewBackoffHandler( + cfg.BackoffCfg, u.cfg.TransferLog, + ) - hashMailBox, err := NewHashMailBox(&h.addr) - if err != nil { - return nil, fmt.Errorf("unable to make mailbox: %v", - err) - } + hashMailCfg := HashMailCourierCfg{ + ReceiverAckTimeout: cfg.ReceiverAckTimeout, + } - subscribers := make( - map[uint64]*fn.EventReceiver[fn.Event], - ) - return &HashMailCourier{ - cfg: &hashMailCfg, - backoffHandle: backoffHandle, - recipient: recipient, - mailbox: hashMailBox, - subscribers: subscribers, - }, nil -} + hashMailBox, err := NewHashMailBox(addr) + if err != nil { + return nil, fmt.Errorf("unable to make mailbox: %v", + err) + } -// NewHashMailCourierAddr generates a new hashmail courier address from a given -// URL. This function also performs hashmail protocol specific address -// validation. -func NewHashMailCourierAddr(addr url.URL) (*HashMailCourierAddr, error) { - if addr.Scheme != HashmailCourierType { - return nil, fmt.Errorf("expected hashmail courier protocol: %v", - addr.Scheme) - } + return &HashMailCourier{ + cfg: &hashMailCfg, + backoffHandle: backoffHandler, + recipient: recipient, + mailbox: hashMailBox, + subscribers: subscribers, + }, nil - // We expect the port number to be specified for a hashmail service. - if addr.Port() == "" { - return nil, fmt.Errorf("hashmail proof courier URI address " + - "port unspecified") - } + case UniverseRpcCourierType: + cfg := u.cfg.UniverseRpcCfg + backoffHandler := NewBackoffHandler( + cfg.BackoffCfg, u.cfg.TransferLog, + ) - return &HashMailCourierAddr{ - addr, - }, nil -} + // Connect to the universe RPC server. + dialOpts, err := serverDialOpts() + if err != nil { + return nil, err + } -// UniverseRpcCourierAddr is a universe RPC protocol specific implementation of -// the CourierAddr interface. -type UniverseRpcCourierAddr struct { - addr url.URL -} + serverAddr := fmt.Sprintf("%s:%s", addr.Hostname(), addr.Port()) + conn, err := grpc.Dial(serverAddr, dialOpts...) + if err != nil { + return nil, err + } -// Url returns the url.URL representation of the courier address. -func (h *UniverseRpcCourierAddr) Url() *url.URL { - return &h.addr -} + client := unirpc.NewUniverseClient(conn) -// NewCourier generates a new courier service handle. -func (h *UniverseRpcCourierAddr) NewCourier(_ context.Context, - cfg *CourierCfg, recipient Recipient) (Courier, error) { - - // Skip the initial delivery delay for the universe RPC courier. - // This courier skips the initial delay because it uses the backoff - // procedure for each proof within a proof file separately. - // Consequently, if we attempt to perform two consecutive send events - // which share the same proof lineage (matching ancestral proofs), the - // second send event will be delayed by the initial delay. - cfg.BackoffCfg.SkipInitDelay = true - backoffHandle := NewBackoffHandler(cfg.BackoffCfg, cfg.TransferLog) - - // Ensure that the courier address is a universe RPC address. - if h.addr.Scheme != UniverseRpcCourierType { - return nil, fmt.Errorf("unsupported courier protocol: %v", - h.addr.Scheme) + return &UniverseRpcCourier{ + recipient: recipient, + client: client, + backoffHandle: backoffHandler, + transfer: u.cfg.TransferLog, + subscribers: subscribers, + rawConn: conn, + }, nil + + default: + return nil, fmt.Errorf("unknown courier address protocol "+ + "(consider updating tapd): %v", addr.Scheme) } +} - // Connect to the universe RPC server. - dialOpts, err := serverDialOpts() +// A compile-time assertion to ensure that the URLDispatch meets the +// CourierDispatch interface. +var _ CourierDispatch = (*URLDispatch)(nil) + +// ParseCourierAddress attempts to parse the given string as a proof courier +// address, validates that all required fields are present and ensures the +// protocol is one of the supported protocols. +func ParseCourierAddress(addr string) (*url.URL, error) { + urlAddr, err := url.ParseRequestURI(addr) if err != nil { - return nil, err + return nil, fmt.Errorf("invalid proof courier URI address: %w", + err) } - serverAddr := fmt.Sprintf( - "%s:%s", h.addr.Hostname(), h.addr.Port(), - ) - conn, err := grpc.Dial(serverAddr, dialOpts...) - if err != nil { + if err := ValidateCourierAddress(urlAddr); err != nil { return nil, err } - client := unirpc.NewUniverseClient(conn) - - // Instantiate the events subscribers map. - subscribers := make( - map[uint64]*fn.EventReceiver[fn.Event], - ) - - return &UniverseRpcCourier{ - recipient: recipient, - client: client, - backoffHandle: backoffHandle, - transfer: cfg.TransferLog, - subscribers: subscribers, - rawConn: conn, - }, nil + return urlAddr, nil } -// NewUniverseRpcCourierAddr generates a new universe RPC courier address from a -// given URL. This function also performs protocol specific address validation. -func NewUniverseRpcCourierAddr(addr url.URL) (*UniverseRpcCourierAddr, error) { +// ValidateCourierAddress validates that all required fields are present and +// ensures the protocol is one of the supported protocols. +func ValidateCourierAddress(addr *url.URL) error { // We expect the port number to be specified. if addr.Port() == "" { - return nil, fmt.Errorf("proof courier URI address port " + - "unspecified") + return fmt.Errorf("proof courier URI address port unspecified") } - return &UniverseRpcCourierAddr{ - addr, - }, nil -} - -// NewCourier instantiates a new courier service handle given a service URL -// address. -func NewCourier(ctx context.Context, addr url.URL, cfg *CourierCfg, - recipient Recipient) (Courier, error) { + switch addr.Scheme { + case HashmailCourierType, UniverseRpcCourierType: + // Valid and known courier address protocol. + return nil - courierAddr, err := ParseCourierAddrUrl(addr) - if err != nil { - return nil, err + default: + return fmt.Errorf("unknown courier address protocol "+ + "(consider updating tapd): %v", addr.Scheme) } - - return courierAddr.NewCourier(ctx, cfg, recipient) -} - -// CourierCfg contains general config parameters applicable to all proof -// couriers. -type CourierCfg struct { - // ReceiverAckTimeout is the maximum time we'll wait for the receiver to - // acknowledge the proof. - ReceiverAckTimeout time.Duration - - // BackoffCfg configures the behaviour of the proof delivery - // functionality. - BackoffCfg *BackoffCfg - - // TransferLog is a log for recording proof delivery and retrieval - // attempts. - TransferLog TransferLog } // ProofMailbox represents an abstract store-and-forward mailbox that can be @@ -476,7 +418,7 @@ func (h *HashMailBox) RecvAck(ctx context.Context, sid streamID) error { return fmt.Errorf("expected ack, got %x", msg.Msg) } -// CleanUp atempts to tear down the mailbox as specified by the passed sid. +// CleanUp attempts to tear down the mailbox as specified by the passed sid. func (h *HashMailBox) CleanUp(ctx context.Context, sid streamID) error { streamAuth := &hashmailrpc.CipherBoxAuth{ Desc: &hashmailrpc.CipherBoxDesc{ @@ -557,10 +499,10 @@ func (e *BackoffExecError) Error() string { // BackoffCfg configures the behaviour of the proof delivery backoff procedure. type BackoffCfg struct { - // SkipInitDelay is a flag that indicates whether we should skip - // the initial delay before attempting to deliver the proof to the - // receiver. - SkipInitDelay bool + // SkipInitDelay is a flag that indicates whether we should skip the + // initial delay before attempting to deliver the proof to the receiver + // or receiving from the sender. + SkipInitDelay bool `long:"skipinitdelay" description:"Skip the initial delay before attempting to deliver the proof to the receiver or receiving from the sender."` // BackoffResetWait is the amount of time we'll wait before // resetting the backoff counter to its initial state. @@ -1049,6 +991,13 @@ func (h *HashMailCourier) SetSubscribers( // proof.Courier interface. var _ Courier = (*HashMailCourier)(nil) +// UniverseRpcCourierCfg is the config for the universe RPC proof courier. +type UniverseRpcCourierCfg struct { + // BackoffCfg configures the behaviour of the proof delivery + // functionality. + BackoffCfg *BackoffCfg +} + // UniverseRpcCourier is a universe RPC proof courier service handle. It // implements the Courier interface. type UniverseRpcCourier struct { diff --git a/proof/mock.go b/proof/mock.go index 55c5d1ea6..c4810e8be 100644 --- a/proof/mock.go +++ b/proof/mock.go @@ -5,6 +5,8 @@ import ( "context" "encoding/hex" "io" + "net/url" + "sync" "testing" "time" @@ -13,6 +15,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/lightninglabs/taproot-assets/asset" "github.com/lightninglabs/taproot-assets/commitment" + "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/internal/test" "github.com/stretchr/testify/require" ) @@ -74,6 +77,94 @@ func MockGroupAnchorVerifier(gen *asset.Genesis, return nil } +// MockProofCourierDispatcher is a mock proof courier dispatcher which returns +// the same courier for all requests. +type MockProofCourierDispatcher struct { + Courier Courier +} + +// NewCourier instantiates a new courier service handle given a service +// URL address. +func (m *MockProofCourierDispatcher) NewCourier(*url.URL, Recipient) (Courier, + error) { + + return m.Courier, nil +} + +// MockProofCourier is a mock proof courier which stores the last proof it +// received. +type MockProofCourier struct { + sync.Mutex + + currentProofs map[asset.SerializedKey]*AnnotatedProof + + subscribers map[uint64]*fn.EventReceiver[fn.Event] +} + +// NewMockProofCourier returns a new mock proof courier. +func NewMockProofCourier() *MockProofCourier { + return &MockProofCourier{ + currentProofs: make(map[asset.SerializedKey]*AnnotatedProof), + } +} + +// Start starts the proof courier service. +func (m *MockProofCourier) Start(chan error) error { + return nil +} + +// Stop stops the proof courier service. +func (m *MockProofCourier) Stop() error { + return nil +} + +// DeliverProof attempts to delivery a proof to the receiver, using the +// information in the Addr type. +func (m *MockProofCourier) DeliverProof(_ context.Context, + proof *AnnotatedProof) error { + + m.Lock() + defer m.Unlock() + + m.currentProofs[asset.ToSerialized(&proof.ScriptKey)] = proof + + return nil +} + +// ReceiveProof attempts to obtain a proof as identified by the passed +// locator from the source encapsulated within the specified address. +func (m *MockProofCourier) ReceiveProof(_ context.Context, + loc Locator) (*AnnotatedProof, error) { + + m.Lock() + defer m.Unlock() + + proof, ok := m.currentProofs[asset.ToSerialized(&loc.ScriptKey)] + if !ok { + return nil, ErrProofNotFound + } + + return proof, nil +} + +// SetSubscribers sets the set of subscribers that will be notified +// of proof courier related events. +func (m *MockProofCourier) SetSubscribers( + subscribers map[uint64]*fn.EventReceiver[fn.Event]) { + + m.Lock() + defer m.Unlock() + + m.subscribers = subscribers +} + +// Close stops the courier instance. +func (m *MockProofCourier) Close() error { + return nil +} + +var _ Courier = (*MockProofCourier)(nil) + type ValidTestCase struct { Proof *TestProof `json:"proof"` Expected string `json:"expected"` diff --git a/rpcserver.go b/rpcserver.go index a89b674d0..1fa021f64 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1054,19 +1054,14 @@ func (r *rpcServer) NewAddr(ctx context.Context, // the default specified in the config. courierAddr := r.cfg.DefaultProofCourierAddr if req.ProofCourierAddr != "" { - addr, err := proof.ParseCourierAddrString( + var err error + courierAddr, err = proof.ParseCourierAddress( req.ProofCourierAddr, ) if err != nil { return nil, fmt.Errorf("invalid proof courier "+ "address: %w", err) } - - // At this point, we do not intend on creating a proof courier - // service instance. We are only interested in parsing and - // validating the address. We therefore convert the address into - // an url.URL type for storage in the address book. - courierAddr = addr.Url() } // Check that the proof courier address is set. This should never @@ -1075,7 +1070,6 @@ func (r *rpcServer) NewAddr(ctx context.Context, if courierAddr == nil { return nil, fmt.Errorf("no proof courier address provided") } - proofCourierAddr := *courierAddr if len(req.AssetId) != 32 { return nil, fmt.Errorf("invalid asset id length") @@ -1112,8 +1106,7 @@ func (r *rpcServer) NewAddr(ctx context.Context, // Now that we have all the params, we'll try to add a new // address to the addr book. addr, err = r.cfg.AddrBook.NewAddress( - ctx, assetID, req.Amt, tapscriptSibling, - proofCourierAddr, + ctx, assetID, req.Amt, tapscriptSibling, *courierAddr, address.WithAssetVersion(assetVersion), ) if err != nil { @@ -1154,7 +1147,7 @@ func (r *rpcServer) NewAddr(ctx context.Context, // address to the addr book. addr, err = r.cfg.AddrBook.NewAddressWithKeys( ctx, assetID, req.Amt, *scriptKey, internalKey, - tapscriptSibling, proofCourierAddr, + tapscriptSibling, *courierAddr, address.WithAssetVersion(assetVersion), ) if err != nil { diff --git a/tapcfg/config.go b/tapcfg/config.go index d5f6c5f5c..ebccde50e 100644 --- a/tapcfg/config.go +++ b/tapcfg/config.go @@ -303,8 +303,9 @@ type Config struct { ReOrgSafeDepth int32 `long:"reorgsafedepth" description:"The number of confirmations we'll wait for before considering a transaction safely buried in the chain."` // The following options are used to configure the proof courier. - DefaultProofCourierAddr string `long:"proofcourieraddr" description:"Default proof courier service address."` - HashMailCourier *proof.HashMailCourierCfg `group:"proofcourier" namespace:"hashmailcourier"` + DefaultProofCourierAddr string `long:"proofcourieraddr" description:"Default proof courier service address."` + HashMailCourier *proof.HashMailCourierCfg `group:"hashmailcourier" namespace:"hashmailcourier"` + UniverseRpcCourier *proof.UniverseRpcCourierCfg `group:"universerpccourier" namespace:"universerpccourier"` CustodianProofRetrievalDelay time.Duration `long:"custodianproofretrievaldelay" description:"The number of seconds the custodian waits after identifying an asset transfer on-chain and before retrieving the corresponding proof."` @@ -391,6 +392,15 @@ func DefaultConfig() Config { MaxBackoff: defaultProofTransferMaxBackoff, }, }, + UniverseRpcCourier: &proof.UniverseRpcCourierCfg{ + BackoffCfg: &proof.BackoffCfg{ + SkipInitDelay: true, + BackoffResetWait: defaultProofTransferBackoffResetWait, + NumTries: defaultProofTransferNumTries, + InitialBackoff: defaultProofTransferInitialBackoff, + MaxBackoff: defaultProofTransferMaxBackoff, + }, + }, CustodianProofRetrievalDelay: defaultProofRetrievalDelay, Universe: &UniverseConfig{ SyncInterval: defaultUniverseSyncInterval, diff --git a/tapcfg/server.go b/tapcfg/server.go index 37786b96b..1020d2e24 100644 --- a/tapcfg/server.go +++ b/tapcfg/server.go @@ -212,7 +212,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, fallbackHashmailCourierAddr := fmt.Sprintf( "%s://%s", proof.HashmailCourierType, fallbackHashMailAddr, ) - proofCourierAddr, err := proof.ParseCourierAddrString( + proofCourierAddr, err := proof.ParseCourierAddress( fallbackHashmailCourierAddr, ) if err != nil { @@ -222,7 +222,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, // If default proof courier address is set, use it as the default. if cfg.DefaultProofCourierAddr != "" { - proofCourierAddr, err = proof.ParseCourierAddrString( + proofCourierAddr, err = proof.ParseCourierAddress( cfg.DefaultProofCourierAddr, ) if err != nil { @@ -231,18 +231,6 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, } } - // TODO(ffranr): This logic is leftover for integration tests which - // do not yet enable a proof courier. Remove once all integration tests - // support a proof courier. - var proofCourierCfg *proof.CourierCfg - if cfg.HashMailCourier != nil { - proofCourierCfg = &proof.CourierCfg{ - ReceiverAckTimeout: cfg.HashMailCourier.ReceiverAckTimeout, - BackoffCfg: cfg.HashMailCourier.BackoffCfg, - TransferLog: assetStore, - } - } - reOrgWatcher := tapgarden.NewReOrgWatcher(&tapgarden.ReOrgWatcherConfig{ ChainBridge: chainBridge, GroupVerifier: tapgarden.GenGroupVerifier( @@ -330,6 +318,15 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, ChainParams: &tapChainParams, }) + // Addresses can have different proof couriers configured, but both + // types of couriers that currently exist will receive this config upon + // initialization. + proofCourierDispatcher := proof.NewCourierDispatch(&proof.CourierCfg{ + HashMailCfg: cfg.HashMailCourier, + UniverseRpcCfg: cfg.UniverseRpcCourier, + TransferLog: assetStore, + }) + return &tap.Config{ DebugLevel: cfg.DebugLevel, RuntimeID: runtimeID, @@ -362,19 +359,18 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, GroupVerifier: tapgarden.GenGroupVerifier( context.Background(), assetMintingStore, ), - AddrBook: addrBook, - ProofArchive: proofArchive, - ProofNotifier: assetStore, - ErrChan: mainErrChan, - ProofCourierCfg: proofCourierCfg, - ProofRetrievalDelay: cfg.CustodianProofRetrievalDelay, - ProofWatcher: reOrgWatcher, + AddrBook: addrBook, + ProofArchive: proofArchive, + ProofNotifier: assetStore, + ErrChan: mainErrChan, + ProofCourierDispatcher: proofCourierDispatcher, + ProofRetrievalDelay: cfg.CustodianProofRetrievalDelay, ProofWatcher: reOrgWatcher, }, ), ChainBridge: chainBridge, AddrBook: addrBook, AddrBookDisableSyncer: cfg.AddrBook.DisableSyncer, - DefaultProofCourierAddr: proofCourierAddr.Url(), + DefaultProofCourierAddr: proofCourierAddr, ProofArchive: proofArchive, AssetWallet: assetWallet, CoinSelect: coinSelect, @@ -387,13 +383,13 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, GroupVerifier: tapgarden.GenGroupVerifier( context.Background(), assetMintingStore, ), - Wallet: walletAnchor, - KeyRing: keyRing, - AssetWallet: assetWallet, - AssetProofs: proofFileStore, - ProofCourierCfg: proofCourierCfg, - ProofWatcher: reOrgWatcher, - ErrChan: mainErrChan, + Wallet: walletAnchor, + KeyRing: keyRing, + AssetWallet: assetWallet, + AssetProofs: proofFileStore, + ProofCourierDispatcher: proofCourierDispatcher, + ProofWatcher: reOrgWatcher, + ErrChan: mainErrChan, }, ), UniverseArchive: baseUni, diff --git a/tapfreighter/chain_porter.go b/tapfreighter/chain_porter.go index 8295567b0..d345cf2c1 100644 --- a/tapfreighter/chain_porter.go +++ b/tapfreighter/chain_porter.go @@ -60,9 +60,10 @@ type ChainPorterConfig struct { // TODO(roasbeef): replace with proof.Courier in the future/ AssetProofs proof.Archiver - // ProofCourierCfg is a general config applicable to all proof courier - // service handles. - ProofCourierCfg *proof.CourierCfg + // ProofCourierDispatcher is the dispatcher that is used to create new + // proof courier handles for sending proofs based on the protocol of + // a proof courier address. + ProofCourierDispatcher proof.CourierDispatch // ProofWatcher is used to watch new proofs for their anchor transaction // to be confirmed safely with a minimum number of confirmations. @@ -648,7 +649,7 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error { log.Debugf("Attempting to deliver proof for script key %x", key.SerializeCompressed()) - proofCourierAddr, err := proof.ParseCourierAddrString( + proofCourierAddr, err := proof.ParseCourierAddress( string(out.ProofCourierAddr), ) if err != nil { @@ -663,8 +664,8 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error { AssetID: *receiverProof.AssetID, Amount: out.Amount, } - courier, err := proofCourierAddr.NewCourier( - ctx, p.cfg.ProofCourierCfg, recipient, + courier, err := p.cfg.ProofCourierDispatcher.NewCourier( + proofCourierAddr, recipient, ) if err != nil { return fmt.Errorf("unable to initiate proof courier "+ @@ -700,7 +701,7 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error { return nil } - // If we have a proof courier instance active, then we'll launch several + // If we have a non-interactive proof, then we'll launch several // goroutines to deliver the proof(s) to the receiver(s). Since a // pre-signed parcel (a parcel that uses the RPC driven vPSBT flow) // doesn't have proof courier URLs (they aren't part of the vPSBT), the @@ -708,7 +709,7 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error { // to receiver, and we don't even need to attempt to use a proof // courier. _, isPreSigned := pkg.Parcel.(*PreSignedParcel) - if p.cfg.ProofCourierCfg != nil && !isPreSigned { + if !isPreSigned { ctx, cancel := p.WithCtxQuitNoTimeout() defer cancel() diff --git a/tapfreighter/parcel.go b/tapfreighter/parcel.go index fe6018730..53066864b 100644 --- a/tapfreighter/parcel.go +++ b/tapfreighter/parcel.go @@ -183,7 +183,7 @@ func (p *AddressParcel) Validate() error { tapAddr := p.destAddrs[idx] // Validate proof courier addresses. - _, err := proof.ParseCourierAddrUrl(tapAddr.ProofCourierAddr) + err := proof.ValidateCourierAddress(&tapAddr.ProofCourierAddr) if err != nil { return fmt.Errorf("invalid proof courier address: %w", err) diff --git a/tapgarden/custodian.go b/tapgarden/custodian.go index 024c974f0..8d5b6b352 100644 --- a/tapgarden/custodian.go +++ b/tapgarden/custodian.go @@ -71,7 +71,7 @@ type CustodianConfig struct { // ProofArchive is the storage backend for proofs to which we store new // incoming proofs. - ProofArchive proof.NotifyArchiver + ProofArchive proof.Archiver // ProofNotifier is the storage backend for proofs from which we are // notified about new proofs. This can be the same as the ProofArchive @@ -80,9 +80,10 @@ type CustodianConfig struct { // being available in the relational database). ProofNotifier proof.NotifyArchiver - // ProofCourierCfg is a general config applicable to all proof courier - // service handles. - ProofCourierCfg *proof.CourierCfg + // ProofCourierDispatcher is the dispatcher that is used to create new + // proof courier handles for receiving proofs based on the protocol of + // a proof courier address. + ProofCourierDispatcher proof.CourierDispatch // ProofRetrievalDelay is the time duration the custodian waits having // identified an asset transfer on-chain and before retrieving the @@ -270,11 +271,24 @@ func (c *Custodian) watchInboundAssets() { // Maybe a proof was delivered while we were shutting down or // starting up, let's check now. - err = c.checkProofAvailable(event) + available, err := c.checkProofAvailable(event) if err != nil { reportErr(err) return } + + // If we did find a proof, we did import it now and can remove + // the event from our cache. + if available { + delete(c.events, event.Outpoint) + + continue + } + + // If we didn't find a proof, we'll launch a goroutine to use + // the ProofCourier to import the proof into our local DB. + c.Wg.Add(1) + go c.receiveProof(event.Addr.Tap, event.Outpoint) } // Read all on-chain transactions and make sure they are mapped to an @@ -382,13 +396,21 @@ func (c *Custodian) inspectWalletTx(walletTx *lndclient.Transaction) error { } c.events[op] = event + + // Now that we've seen this output confirm on + // chain, we'll launch a goroutine to use the + // ProofCourier to import the proof into our + // local DB. + c.Wg.Add(1) + go c.receiveProof(event.Addr.Tap, op) } continue } // This is a new output, let's find out if it's for an address - // of ours. + // of ours. This step also creates a new event for the address + // if it doesn't exist yet. addr, err := c.mapToTapAddr(walletTx, uint32(idx), op) if err != nil { return err @@ -400,110 +422,98 @@ func (c *Custodian) inspectWalletTx(walletTx *lndclient.Transaction) error { continue } - // TODO(ffranr): This proof courier disabled check should be - // removed. It was implemented because some integration test do - // not setup and use a proof courier. - if c.cfg.ProofCourierCfg == nil { + // We now need to wait for a confirmation, since proofs will + // be delivered once the anchor transaction is confirmed. If + // we skip it now, we'll receive another notification once the + // transaction is confirmed. + if walletTx.Confirmations == 0 { continue } - // Now that we've seen this output on chain, we'll launch a - // goroutine to use the ProofCourier to import the proof into - // our local DB. + // Now that we've seen this output confirm on chain, we'll + // launch a goroutine to use the ProofCourier to import the + // proof into our local DB. c.Wg.Add(1) - go func() { - defer c.Wg.Done() + go c.receiveProof(addr, op) + } - ctx, cancel := c.WithCtxQuitNoTimeout() - defer cancel() + return nil +} - assetID := addr.AssetID +// receiveProof attempts to receive a proof for the given address and outpoint +// via the proof courier service. +// +// NOTE: This must be called as a goroutine. +func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint) { + defer c.Wg.Done() - log.Debugf("Waiting to receive proof for script key %x", - addr.ScriptKey.SerializeCompressed()) + ctx, cancel := c.WithCtxQuitNoTimeout() + defer cancel() - // Initiate proof courier service handle from the proof - // courier address found in the Tap address. - recipient := proof.Recipient{ - ScriptKey: &addr.ScriptKey, - AssetID: assetID, - Amount: addr.Amount, - } - courier, err := proof.NewCourier( - ctx, addr.ProofCourierAddr, - c.cfg.ProofCourierCfg, recipient, - ) - if err != nil { - log.Errorf("unable to initiate proof courier "+ - "service handle: %v", err) - return - } + assetID := addr.AssetID - // Update courier handle events subscribers before - // attempting to retrieve proof. - c.statusEventsSubsMtx.Lock() - courier.SetSubscribers(c.statusEventsSubs) - c.statusEventsSubsMtx.Unlock() - - // Sleep to give the sender an opportunity to transfer - // the proof to the proof courier service. - // Without this delay our first attempt at retrieving - // the proof will very likely fail. We should expect - // retrieval success before this delay. - select { - case <-time.After(c.cfg.ProofRetrievalDelay): - case <-ctx.Done(): - return - } + scriptKeyBytes := addr.ScriptKey.SerializeCompressed() + log.Debugf("Waiting to receive proof for script key %x", scriptKeyBytes) - // Attempt to receive proof via proof courier service. - loc := proof.Locator{ - AssetID: &assetID, - GroupKey: addr.GroupKey, - ScriptKey: addr.ScriptKey, - OutPoint: &op, - } - addrProof, err := courier.ReceiveProof(ctx, loc) - if err != nil { - log.Errorf("unable to recv proof: %v", err) - return - } + // Initiate proof courier service handle from the proof courier address + // found in the Tap address. + recipient := proof.Recipient{ + ScriptKey: &addr.ScriptKey, + AssetID: assetID, + Amount: addr.Amount, + } + courier, err := c.cfg.ProofCourierDispatcher.NewCourier( + &addr.ProofCourierAddr, recipient, + ) + if err != nil { + log.Errorf("Unable to initiate proof courier service handle: "+ + "%v", err) + return + } - log.Debugf("Received proof for: script_key=%x, "+ - "asset_id=%x", - addr.ScriptKey.SerializeCompressed(), - assetID[:]) - - ctx, cancel = c.CtxBlocking() - defer cancel() - - headerVerifier := GenHeaderVerifier( - ctx, c.cfg.ChainBridge, - ) - err = c.cfg.ProofArchive.ImportProofs( - ctx, headerVerifier, c.cfg.GroupVerifier, false, - addrProof, - ) - if err != nil { - log.Errorf("unable to import proofs: %v", err) - return - } + // Update courier handle events subscribers before attempting to + // retrieve proof. + c.statusEventsSubsMtx.Lock() + courier.SetSubscribers(c.statusEventsSubs) + c.statusEventsSubsMtx.Unlock() + + // Sleep to give the sender an opportunity to transfer the proof to the + // proof courier service. Without this delay our first attempt at + // retrieving the proof will very likely fail. We should expect + // retrieval success before this delay. + select { + case <-time.After(c.cfg.ProofRetrievalDelay): + case <-ctx.Done(): + return + } - // At this point the "receive" process is complete. We - // will now notify all status event subscribers. - recvCompleteEvent := NewAssetRecvCompleteEvent( - *addr, op, - ) - err = c.publishSubscriberStatusEvent(recvCompleteEvent) - if err != nil { - log.Errorf("unable publish status event: %v", - err) - return - } - }() + // Attempt to receive proof via proof courier service. + loc := proof.Locator{ + AssetID: &assetID, + GroupKey: addr.GroupKey, + ScriptKey: addr.ScriptKey, + OutPoint: &op, + } + addrProof, err := courier.ReceiveProof(ctx, loc) + if err != nil { + log.Errorf("Unable to receive proof using courier: %v", err) + return } - return nil + log.Debugf("Received proof for: script_key=%x, asset_id=%x", + scriptKeyBytes, assetID[:]) + + ctx, cancel = c.CtxBlocking() + defer cancel() + + headerVerifier := GenHeaderVerifier(ctx, c.cfg.ChainBridge) + err = c.cfg.ProofArchive.ImportProofs( + ctx, headerVerifier, c.cfg.GroupVerifier, false, addrProof, + ) + if err != nil { + log.Errorf("Unable to import proofs: %v", err) + return + } } // mapToTapAddr attempts to match a transaction output to a Taproot Asset @@ -595,8 +605,7 @@ func (c *Custodian) importAddrToWallet(addr *address.AddrWithKeyInfo) error { log.Infof("Imported Taproot Asset address %v into wallet", addrStr) if p2trAddr != nil { - log.Infof("watching p2tr address %v on chain", - p2trAddr.String()) + log.Infof("Watching p2tr address %v on chain", p2trAddr) } return c.cfg.AddrBook.SetAddrManaged(ctxt, addr, time.Now()) @@ -604,7 +613,7 @@ func (c *Custodian) importAddrToWallet(addr *address.AddrWithKeyInfo) error { // checkProofAvailable checks the proof storage if a proof for the given event // is already available. If it is, and it checks out, the event is updated. -func (c *Custodian) checkProofAvailable(event *address.Event) error { +func (c *Custodian) checkProofAvailable(event *address.Event) (bool, error) { ctxt, cancel := c.WithCtxQuit() defer cancel() @@ -620,34 +629,36 @@ func (c *Custodian) checkProofAvailable(event *address.Event) error { }) switch { case errors.Is(err, proof.ErrProofNotFound): - return nil + return false, nil case err != nil: - return fmt.Errorf("error fetching proof for event: %w", err) + return false, fmt.Errorf("error fetching proof for event: %w", + err) } file := proof.NewEmptyFile(proof.V0) if err := file.Decode(bytes.NewReader(blob)); err != nil { - return fmt.Errorf("error decoding proof file: %w", err) + return false, fmt.Errorf("error decoding proof file: %w", err) } // Exit early on empty proof (shouldn't happen outside of test cases). if file.IsEmpty() { - return fmt.Errorf("archive contained empty proof file: %w", err) + return false, fmt.Errorf("archive contained empty proof file: "+ + "%w", err) } lastProof, err := file.LastProof() if err != nil { - return fmt.Errorf("error fetching last proof: %w", err) + return false, fmt.Errorf("error fetching last proof: %w", err) } // The proof might be an old state, let's make sure it matches our event // before marking the inbound asset transfer as complete. if AddrMatchesAsset(event.Addr, &lastProof.Asset) { - return c.setReceiveCompleted(event, lastProof, file) + return true, c.setReceiveCompleted(event, lastProof, file) } - return nil + return false, nil } // mapProofToEvent inspects a new proof and attempts to match it to an existing @@ -702,13 +713,23 @@ func (c *Custodian) mapProofToEvent(p proof.Blob) error { func (c *Custodian) setReceiveCompleted(event *address.Event, lastProof *proof.Proof, proofFile *proof.File) error { + // At this point the "receive" process is complete. We will now notify + // all status event subscribers. + receiveCompleteEvent := NewAssetRecvCompleteEvent( + *event.Addr.Tap, event.Outpoint, + ) + err := c.publishSubscriberStatusEvent(receiveCompleteEvent) + if err != nil { + log.Errorf("Unable publish status event: %v", err) + } + // The proof is created after a single confirmation. To make sure we // notice if the anchor transaction is re-organized out of the chain, we // give all the not-yet-sufficiently-buried proofs in the received proof // file to the re-org watcher and replace the updated proof in the local // proof archive if a re-org happens. The sender will do the same, so no // re-send of the proof is necessary. - err := c.cfg.ProofWatcher.MaybeWatch( + err = c.cfg.ProofWatcher.MaybeWatch( proofFile, c.cfg.ProofWatcher.DefaultUpdateCallback(), ) if err != nil { diff --git a/tapgarden/custodian_test.go b/tapgarden/custodian_test.go index 21f9276c9..9f2fcf860 100644 --- a/tapgarden/custodian_test.go +++ b/tapgarden/custodian_test.go @@ -1,9 +1,12 @@ package tapgarden_test import ( + "bytes" "context" "database/sql" + "fmt" "math/rand" + "net/url" "testing" "time" @@ -13,15 +16,16 @@ import ( "github.com/lightninglabs/lndclient" "github.com/lightninglabs/taproot-assets/address" "github.com/lightninglabs/taproot-assets/asset" + "github.com/lightninglabs/taproot-assets/commitment" "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/internal/test" "github.com/lightninglabs/taproot-assets/proof" "github.com/lightninglabs/taproot-assets/tapdb" - "github.com/lightninglabs/taproot-assets/tapdb/sqlc" "github.com/lightninglabs/taproot-assets/tapgarden" "github.com/lightninglabs/taproot-assets/tapscript" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lntest/wait" "github.com/stretchr/testify/require" ) @@ -34,11 +38,9 @@ var ( ) // newAddrBook creates a new instance of the TapAddressBook book. -func newAddrBook(t *testing.T, keyRing *tapgarden.MockKeyRing, +func newAddrBookForDB(db *tapdb.BaseDB, keyRing *tapgarden.MockKeyRing, syncer *tapgarden.MockAssetSyncer) (*address.Book, - *tapdb.TapAddressBook, sqlc.Querier) { - - db := tapdb.NewTestDB(t) + *tapdb.TapAddressBook) { txCreator := func(tx *sql.Tx) tapdb.AddrBook { return db.WithTx(tx) @@ -54,12 +56,12 @@ func newAddrBook(t *testing.T, keyRing *tapgarden.MockKeyRing, Chain: *chainParams, KeyRing: keyRing, }) - return book, tapdbBook, db + return book, tapdbBook } // newProofArchive creates a new instance of the MultiArchiver. -func newProofArchive(t *testing.T) (*proof.MultiArchiver, *tapdb.AssetStore) { - db := tapdb.NewTestDB(t) +func newProofArchiveForDB(t *testing.T, db *tapdb.BaseDB) (*proof.MultiArchiver, + *tapdb.AssetStore) { txCreator := func(tx *sql.Tx) tapdb.ActiveAssetsStore { return db.WithTx(tx) @@ -90,7 +92,7 @@ type custodianHarness struct { addrBook *address.Book syncer *tapgarden.MockAssetSyncer assetDB *tapdb.AssetStore - proofArchive *proof.MultiArchiver + courier *proof.MockProofCourier } // assertStartup makes sure the custodian was started correctly. @@ -114,6 +116,48 @@ func (h *custodianHarness) eventually(fn func() bool) { require.Eventually(h.t, fn, testTimeout, testPollInterval) } +// assertEventsPresent makes sure that the given number of events is present in +// the address book, then returns those events. +func (h *custodianHarness) assertEventsPresent(numEvents int, + status address.Status) []*address.Event { + + ctx := context.Background() + ctxt, cancel := context.WithTimeout(ctx, testTimeout) + defer cancel() + + // Only one event should be registered though, as we've only created one + // transaction. + var finalEvents []*address.Event + err := wait.NoError(func() error { + events, err := h.tapdbBook.QueryAddrEvents( + ctxt, address.EventQueryParams{}, + ) + if err != nil { + return err + } + + if len(events) != numEvents { + return fmt.Errorf("wanted %d events but got %d", + numEvents, len(events)) + } + + for idx, event := range events { + if event.Status != status { + return fmt.Errorf("event %d has status %v "+ + "but wanted %v", idx, event.Status, + status) + } + } + + finalEvents = events + + return nil + }, testTimeout) + require.NoError(h.t, err) + + return finalEvents +} + // assertAddrsRegistered makes sure that for each of the given addresses a // pubkey was imported into the wallet. func (h *custodianHarness) assertAddrsRegistered( @@ -138,8 +182,14 @@ func newHarness(t *testing.T, walletAnchor := tapgarden.NewMockWalletAnchor() keyRing := tapgarden.NewMockKeyRing() syncer := tapgarden.NewMockAssetSyncer() - addrBook, tapdbBook, _ := newAddrBook(t, keyRing, syncer) - proofArchive, assetDB := newProofArchive(t) + db := tapdb.NewTestDB(t) + addrBook, tapdbBook := newAddrBookForDB(db.BaseDB, keyRing, syncer) + _, assetDB := newProofArchiveForDB(t, db.BaseDB) + courier := proof.NewMockProofCourier() + courierDispatch := &proof.MockProofCourierDispatcher{ + Courier: courier, + } + proofWatcher := &tapgarden.MockProofWatcher{} ctxb := context.Background() for _, initialAddr := range initialAddrs { @@ -148,13 +198,15 @@ func newHarness(t *testing.T, } cfg := &tapgarden.CustodianConfig{ - ChainParams: chainParams, - ChainBridge: chainBridge, - WalletAnchor: walletAnchor, - AddrBook: addrBook, - ProofArchive: proofArchive, - ProofNotifier: assetDB, - ErrChan: make(chan error, 1), + ChainParams: chainParams, + ChainBridge: chainBridge, + WalletAnchor: walletAnchor, + AddrBook: addrBook, + ProofArchive: assetDB, + ProofNotifier: assetDB, + ProofCourierDispatcher: courierDispatch, + ProofWatcher: proofWatcher, + ErrChan: make(chan error, 1), } return &custodianHarness{ t: t, @@ -167,20 +219,21 @@ func newHarness(t *testing.T, addrBook: addrBook, syncer: syncer, assetDB: assetDB, - proofArchive: proofArchive, + courier: courier, } } -func randAddr(h *custodianHarness) *address.AddrWithKeyInfo { - proofCourierAddr := address.RandProofCourierAddr(h.t) +func randAddr(h *custodianHarness) (*address.AddrWithKeyInfo, *asset.Genesis) { addr, genesis, group := address.RandAddr( - h.t, &address.RegressionNetTap, proofCourierAddr, + h.t, &address.RegressionNetTap, url.URL{ + Scheme: "mock", + }, ) err := h.tapdbBook.InsertAssetGen(context.Background(), genesis, group) require.NoError(h.t, err) - return addr + return addr, genesis } func randWalletTx(addr *address.AddrWithKeyInfo) (int, *lndclient.Transaction) { @@ -193,9 +246,12 @@ func randWalletTx(addr *address.AddrWithKeyInfo) (int, *lndclient.Transaction) { taprootOutput := rand.Intn(numOutputs) for idx := 0; idx < numInputs; idx++ { - in := &wire.TxIn{} - _, _ = rand.Read(in.PreviousOutPoint.Hash[:]) - in.PreviousOutPoint.Index = rand.Uint32() + in := &wire.TxIn{ + PreviousOutPoint: wire.OutPoint{ + Hash: test.RandHash(), + Index: rand.Uint32(), + }, + } tx.Tx.AddTxIn(in) tx.PreviousOutpoints = append( tx.PreviousOutpoints, &lnrpc.PreviousOutPoint{ @@ -231,6 +287,71 @@ func randWalletTx(addr *address.AddrWithKeyInfo) (int, *lndclient.Transaction) { return taprootOutput, tx } +func randProof(t *testing.T, outputIndex int, tx *wire.MsgTx, + genesis *asset.Genesis, + addr *address.AddrWithKeyInfo) *proof.AnnotatedProof { + + a := asset.Asset{ + Version: asset.V0, + Genesis: *genesis, + Amount: addr.Amount, + ScriptKey: asset.NewScriptKey(&addr.ScriptKey), + } + if addr.GroupKey != nil { + a.GroupKey = &asset.GroupKey{ + GroupPubKey: *addr.GroupKey, + } + } + + p := &proof.Proof{ + PrevOut: wire.OutPoint{}, + BlockHeader: wire.BlockHeader{ + Timestamp: time.Unix(rand.Int63(), 0), + }, + AnchorTx: *tx, + TxMerkleProof: proof.TxMerkleProof{}, + Asset: a, + InclusionProof: proof.TaprootProof{ + InternalKey: test.RandPubKey(t), + OutputIndex: uint32(outputIndex), + }, + } + + f, err := proof.NewFile(proof.V0, *p) + require.NoError(t, err) + + var buf bytes.Buffer + require.NoError(t, f.Encode(&buf)) + + ac, err := commitment.NewAssetCommitment(&a) + require.NoError(t, err) + tc, err := commitment.NewTapCommitment(ac) + require.NoError(t, err) + + op := wire.OutPoint{ + Hash: tx.TxHash(), + Index: uint32(outputIndex), + } + + return &proof.AnnotatedProof{ + Locator: proof.Locator{ + AssetID: fn.Ptr(genesis.ID()), + GroupKey: addr.GroupKey, + ScriptKey: addr.ScriptKey, + OutPoint: &op, + }, + Blob: buf.Bytes(), + AssetSnapshot: &proof.AssetSnapshot{ + Asset: &a, + OutPoint: op, + AnchorTx: tx, + OutputIndex: uint32(outputIndex), + InternalKey: test.RandPubKey(t), + ScriptRoot: tc, + }, + } +} + // insertAssetInfo starts a background goroutine that receives asset info that // was fetched from the asset syncer, and stores it in the address book. This // simulates asset bootstrapping that would occur during universe sync. @@ -279,7 +400,7 @@ func TestCustodianNewAddr(t *testing.T) { <-h.keyRing.ReqKeys }() ctx := context.Background() - addr := randAddr(h) + addr, _ := randAddr(h) proofCourierAddr := address.RandProofCourierAddr(t) dbAddr, err := h.addrBook.NewAddress( ctx, addr.AssetID, addr.Amount, nil, proofCourierAddr, @@ -373,6 +494,8 @@ func TestBookAssetSyncer(t *testing.T) { close(quitAssetWatcher) } +// TestTransactionHandling tests that the custodian correctly handles incoming +// transactions. func TestTransactionHandling(t *testing.T) { h := newHarness(t, nil) @@ -382,15 +505,21 @@ func TestTransactionHandling(t *testing.T) { const numAddrs = 5 addrs := make([]*address.AddrWithKeyInfo, numAddrs) + genesis := make([]*asset.Genesis, numAddrs) for i := 0; i < numAddrs; i++ { - addrs[i] = randAddr(h) + addrs[i], genesis[i] = randAddr(h) err := h.tapdbBook.InsertAddrs(ctx, *addrs[i]) require.NoError(t, err) } outputIdx, tx := randWalletTx(addrs[0]) + tx.Confirmations = 1 h.walletAnchor.Transactions = append(h.walletAnchor.Transactions, *tx) + mockProof := randProof(t, outputIdx, tx.Tx, genesis[0], addrs[0]) + err := h.courier.DeliverProof(nil, mockProof) + require.NoError(t, err) + require.NoError(t, h.c.Start()) t.Cleanup(func() { require.NoError(t, h.c.Stop()) @@ -402,21 +531,92 @@ func TestTransactionHandling(t *testing.T) { // Only one event should be registered though, as we've only created one // transaction. - h.eventually(func() bool { - events, err := h.tapdbBook.QueryAddrEvents( - ctx, address.EventQueryParams{}, - ) - require.NoError(t, err) + events := h.assertEventsPresent(1, address.StatusCompleted) + require.EqualValues(t, outputIdx, events[0].Outpoint.Index) - if len(events) != 1 { - t.Logf("Got %d events", len(events)) - return false - } + dbProof, err := h.assetDB.FetchProof(ctx, mockProof.Locator) + require.NoError(t, err) + require.EqualValues(t, mockProof.Blob, dbProof) +} + +// TestTransactionConfirmedOnly tests that the custodian only starts the proof +// courier once a transaction has been confirmed. We also test that it correctly +// re-tries fetching proofs using a proof courier after it has been restarted. +func TestTransactionConfirmedOnly(t *testing.T) { + t.Parallel() - require.EqualValues(t, outputIdx, events[0].Outpoint.Index) + runTransactionConfirmedOnlyTest(t, false) + runTransactionConfirmedOnlyTest(t, true) +} + +// runTransactionConfirmedOnlyTest runs the transaction confirmed only test, +// optionally restarting the custodian in the middle. +func runTransactionConfirmedOnlyTest(t *testing.T, withRestart bool) { + h := newHarness(t, nil) - return true + // Before we start the custodian, we create a few random addresses. + ctx := context.Background() + + const numAddrs = 5 + addrs := make([]*address.AddrWithKeyInfo, numAddrs) + genesis := make([]*asset.Genesis, numAddrs) + for i := 0; i < numAddrs; i++ { + addrs[i], genesis[i] = randAddr(h) + err := h.tapdbBook.InsertAddrs(ctx, *addrs[i]) + require.NoError(t, err) + } + + // We start the custodian and make sure it's started up correctly. This + // should add pending events for each of the addresses. + require.NoError(t, h.c.Start()) + t.Cleanup(func() { + require.NoError(t, h.c.Stop()) }) + h.assertStartup() + + // We expect all addresses to be watched by the wallet now. + h.assertAddrsRegistered(addrs...) + + // To make sure the custodian adds address events for each address, we + // need to signal an unconfirmed transaction for each of them now. + outputIndexes := make([]int, numAddrs) + transactions := make([]*lndclient.Transaction, numAddrs) + for idx := range addrs { + outputIndex, tx := randWalletTx(addrs[idx]) + outputIndexes[idx] = outputIndex + transactions[idx] = tx + h.walletAnchor.SubscribeTx <- *tx + + // We also simulate that the proof courier has all the proofs + // it needs. + mockProof := randProof( + t, outputIndexes[idx], tx.Tx, genesis[idx], addrs[idx], + ) + _ = h.courier.DeliverProof(nil, mockProof) + } + + // We want events to be created for each address, they should be in the + // state where they detected a transaction. + h.assertEventsPresent(numAddrs, address.StatusTransactionDetected) + + // In case we're testing with a restart, we now restart the custodian. + if withRestart { + require.NoError(t, h.c.Stop()) + + h.c = tapgarden.NewCustodian(h.cfg) + require.NoError(t, h.c.Start()) + h.assertStartup() + } + + // Now we confirm the transactions. This should trigger the custodian to + // fetch the proof for each of the addresses. + for idx := range transactions { + tx := transactions[idx] + tx.Confirmations = 1 + h.walletAnchor.SubscribeTx <- *tx + } + + h.assertEventsPresent(numAddrs, address.StatusCompleted) } func mustMakeAddr(t *testing.T,