Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for existing asset in db before attempting to insert a new asset. #687

Merged
merged 4 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 190 additions & 0 deletions itest/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,196 @@ func testBasicSendUnidirectional(t *harnessTest) {
wg.Wait()
}

// testRestartReceiver tests that the receiver node's asset balance after a
// single asset transfer does not change if the receiver node restarts.
// Before the addition of this test, after restarting the receiver node
// the asset balance would be erroneously incremented. This is because the
// receiver node was not storing asset transfer in its database with the
// appropriate field uniqueness constraints.
func testRestartReceiverCheckBalance(t *harnessTest) {
var (
ctxb = context.Background()
wg sync.WaitGroup
)

const (
// Number of units to send.
numUnits = 10
)

// Subscribe to receive assent send events from primary tapd node.
eventNtfns, err := t.tapd.SubscribeSendAssetEventNtfns(
ctxb, &taprpc.SubscribeSendAssetEventNtfnsRequest{},
)
require.NoError(t.t, err)

// Test to ensure that we execute the transaction broadcast state.
// This test is executed in a goroutine to ensure that we can receive
// the event notification from the tapd node as the rest of the test
// proceeds.
wg.Add(1)
go func() {
defer wg.Done()

broadcastState := tapfreighter.SendStateBroadcast.String()
targetEventSelector := func(event *taprpc.SendAssetEvent) bool {
switch eventTyped := event.Event.(type) {
case *taprpc.SendAssetEvent_ExecuteSendStateEvent:
ev := eventTyped.ExecuteSendStateEvent

// Log send state execution.
timestamp := time.UnixMicro(ev.Timestamp)
t.Logf("Executing send state (%v): %v",
timestamp.Format(time.RFC3339Nano),
ev.SendState)

return ev.SendState == broadcastState
}

return false
}

timeout := 2 * defaultProofTransferReceiverAckTimeout
ctx, cancel := context.WithTimeout(ctxb, timeout)
defer cancel()
assertAssetSendNtfsEvent(
t, ctx, eventNtfns, targetEventSelector, 1,
)
}()

// First, we'll make a normal assets with enough units to allow us to
ffranr marked this conversation as resolved.
Show resolved Hide resolved
// send it around a few times.
rpcAssets := MintAssetsConfirmBatch(
t.t, t.lndHarness.Miner.Client, t.tapd,
[]*mintrpc.MintAssetRequest{issuableAssets[0]},
)

genInfo := rpcAssets[0].AssetGenesis

// Now that we have the asset created, we'll make a new node that'll
// serve as the node which'll receive the assets. The existing tapd
// node will be used to synchronize universe state.
//
// We will stipulate that the receiver node's custodian service should
// not delay commencing the proof retrieval procedure once a suitable
// on-chain asset transfer is detected. This will ensure that on restart
// the receiver node will attempt to immediately retrieve the asset
// proof even if the proof and asset are present.
custodianProofRetrievalDelay := 0 * time.Second

recvTapd := setupTapdHarness(
t.t, t, t.lndHarness.Bob, t.universeServer,
func(params *tapdHarnessParams) {
params.startupSyncNode = t.tapd
params.startupSyncNumAssets = len(rpcAssets)
params.custodianProofRetrievalDelay = &custodianProofRetrievalDelay
},
)
defer func() {
require.NoError(t.t, recvTapd.stop(!*noDelete))
}()

// Next, we'll attempt to complete two transfers with distinct
// addresses from our main node to Bob.
currentUnits := issuableAssets[0].Asset.Amount

// Issue a single address which will be reused for each send.
bobAddr, err := recvTapd.NewAddr(ctxb, &taprpc.NewAddrRequest{
AssetId: genInfo.AssetId,
Amt: numUnits,
AssetVersion: rpcAssets[0].Version,
})
require.NoError(t.t, err)

t.t.Logf("Performing send procedure")

// Deduct what we sent from the expected current number of
// units.
currentUnits -= numUnits

AssertAddrCreated(t.t, recvTapd, rpcAssets[0], bobAddr)

sendResp := sendAssetsToAddr(t, t.tapd, bobAddr)

ConfirmAndAssertOutboundTransfer(
t.t, t.lndHarness.Miner.Client, t.tapd, sendResp,
genInfo.AssetId,
[]uint64{currentUnits, numUnits}, 0, 1,
)
AssertNonInteractiveRecvComplete(t.t, recvTapd, 1)

// Close event stream.
err = eventNtfns.CloseSend()
require.NoError(t.t, err)

wg.Wait()

assertRecvBalance := func() {
// Get asset balance by group from the receiver node.
respGroup, err := recvTapd.ListBalances(
ctxb, &taprpc.ListBalancesRequest{
GroupBy: &taprpc.ListBalancesRequest_GroupKey{
GroupKey: true,
},
},
)
require.NoError(t.t, err)

// We expect to see a single asset group balance. The receiver
// node received one asset only.
require.Len(t.t, respGroup.AssetGroupBalances, 1)

var assetGroupBalance *taprpc.AssetGroupBalance

for _, value := range respGroup.AssetGroupBalances {
assetGroupBalance = value
break
}

require.Equal(t.t, int(10), int(assetGroupBalance.Balance))

// Get asset balance by asset ID from the receiver node.
respAsset, err := recvTapd.ListBalances(
ctxb, &taprpc.ListBalancesRequest{
GroupBy: &taprpc.ListBalancesRequest_AssetId{
AssetId: true,
},
},
)
require.NoError(t.t, err)

// We expect to see a single asset group balance. The receiver
// node received one asset only.
require.Len(t.t, respAsset.AssetBalances, 1)

var assetBalance *taprpc.AssetBalance

for _, value := range respAsset.AssetBalances {
assetBalance = value
break
}

require.Equal(t.t, assetBalance.Balance, uint64(10))
}

// Initial balance check.
assertRecvBalance()

// Restart the receiver node and then check the balance again.
require.NoError(t.t, recvTapd.stop(false))
require.NoError(t.t, recvTapd.start(false))

assertRecvBalance()

// Restart the receiver node, mine some blocks, and then check the
// balance again.
require.NoError(t.t, recvTapd.stop(false))
t.lndHarness.MineBlocks(7)
require.NoError(t.t, recvTapd.start(false))

assertRecvBalance()
}

// testResumePendingPackageSend tests that we can properly resume a pending
// package send after a restart.
func testResumePendingPackageSend(t *harnessTest) {
Expand Down
14 changes: 10 additions & 4 deletions itest/tapd_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ type tapdConfig struct {
}

type harnessOpts struct {
proofSendBackoffCfg *proof.BackoffCfg
proofReceiverAckTimeout *time.Duration
proofCourier proof.CourierHarness
addrAssetSyncerDisable bool
proofSendBackoffCfg *proof.BackoffCfg
proofReceiverAckTimeout *time.Duration
proofCourier proof.CourierHarness
custodianProofRetrievalDelay *time.Duration
addrAssetSyncerDisable bool
}

type harnessOption func(*harnessOpts)
Expand Down Expand Up @@ -223,6 +224,11 @@ func newTapdHarness(t *testing.T, ht *harnessTest, cfg tapdConfig,
finalCfg.HashMailCourier = nil
}

// Set the custodian proof retrieval delay if it was specified.
if opts.custodianProofRetrievalDelay != nil {
finalCfg.CustodianProofRetrievalDelay = *opts.custodianProofRetrievalDelay
ffranr marked this conversation as resolved.
Show resolved Hide resolved
}

return &tapdHarness{
cfg: &cfg,
clientCfg: finalCfg,
Expand Down
6 changes: 6 additions & 0 deletions itest/test_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ type tapdHarnessParams struct {
// an ack from the proof receiver.
proofReceiverAckTimeout *time.Duration

// custodianProofRetrievalDelay is the time duration the custodian waits
// having identified an asset transfer on-chain and before retrieving
// the corresponding proof via the proof courier service.
custodianProofRetrievalDelay *time.Duration

// addrAssetSyncerDisable is a flag that determines if the address book
// will try and bootstrap unknown assets on address creation.
addrAssetSyncerDisable bool
Expand Down Expand Up @@ -371,6 +376,7 @@ func setupTapdHarness(t *testing.T, ht *harnessTest,
ho.proofSendBackoffCfg = params.proofSendBackoffCfg
ho.proofReceiverAckTimeout = params.proofReceiverAckTimeout
ho.proofCourier = selectedProofCourier
ho.custodianProofRetrievalDelay = params.custodianProofRetrievalDelay
ho.addrAssetSyncerDisable = params.addrAssetSyncerDisable
}

Expand Down
5 changes: 5 additions & 0 deletions itest/test_list_on_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ var testCases = []*testCase{
test: testBasicSendUnidirectional,
proofCourierType: proof.UniverseRpcCourierType,
},
{
name: "restart receiver check balance",
test: testRestartReceiverCheckBalance,
proofCourierType: proof.UniverseRpcCourierType,
},
{
name: "resume pending package send",
test: testResumePendingPackageSend,
Expand Down
8 changes: 8 additions & 0 deletions tapcfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ const (
// universe queries. By default we'll allow 100 qps, with a max burst
// of 10 queries.
defaultUniverseQueriesBurst = 10

// defaultProofRetrievalDelay is the default time duration the custodian
// waits having identified an asset transfer on-chain and before
// retrieving the corresponding proof via the proof courier service.
defaultProofRetrievalDelay = 5 * time.Second
)

var (
Expand Down Expand Up @@ -301,6 +306,8 @@ type Config struct {
DefaultProofCourierAddr string `long:"proofcourieraddr" description:"Default proof courier service address."`
HashMailCourier *proof.HashMailCourierCfg `group:"proofcourier" namespace:"hashmailcourier"`

CustodianProofRetrievalDelay time.Duration `long:"custodianproofretrievaldelay" description:"The number of seconds the custodian waits after identifying an asset transfer on-chain and before retrieving the corresponding proof."`

ChainConf *ChainConfig
RpcConf *RpcConfig

Expand Down Expand Up @@ -384,6 +391,7 @@ func DefaultConfig() Config {
MaxBackoff: defaultProofTransferMaxBackoff,
},
},
CustodianProofRetrievalDelay: defaultProofRetrievalDelay,
Universe: &UniverseConfig{
SyncInterval: defaultUniverseSyncInterval,
UniverseQueriesPerSecond: rate.Limit(
Expand Down
13 changes: 7 additions & 6 deletions tapcfg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,13 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
GroupVerifier: tapgarden.GenGroupVerifier(
context.Background(), assetMintingStore,
),
AddrBook: addrBook,
ProofArchive: proofArchive,
ProofNotifier: assetStore,
ErrChan: mainErrChan,
ProofCourierCfg: proofCourierCfg,
ProofWatcher: reOrgWatcher,
AddrBook: addrBook,
ProofArchive: proofArchive,
ProofNotifier: assetStore,
ErrChan: mainErrChan,
ProofCourierCfg: proofCourierCfg,
ProofRetrievalDelay: cfg.CustodianProofRetrievalDelay,
ProofWatcher: reOrgWatcher,
},
),
ChainBridge: chainBridge,
Expand Down
22 changes: 22 additions & 0 deletions tapdb/assets_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ type UpsertAssetStore interface {
UpsertAssetGroupKey(ctx context.Context, arg AssetGroupKey) (int64,
error)

// QueryAssets fetches a filtered set of fully confirmed assets.
QueryAssets(context.Context, QueryAssetFilters) ([]ConfirmedAsset,
error)

// InsertNewAsset inserts a new asset on disk.
InsertNewAsset(ctx context.Context,
arg sqlc.InsertNewAssetParams) (int64, error)
Expand Down Expand Up @@ -200,6 +204,24 @@ func upsertAssetsWithGenesis(ctx context.Context, q UpsertAssetStore,
anchorUtxoID = anchorUtxoIDs[idx]
}

// Check for matching assets in the database. If we find one,
// we'll just use its primary key ID, and we won't attempt to
// insert the asset again.
existingAssets, err := q.QueryAssets(ctx, QueryAssetFilters{
ffranr marked this conversation as resolved.
Show resolved Hide resolved
AnchorUtxoID: anchorUtxoID,
GenesisID: sqlInt64(genAssetID),
ScriptKeyID: sqlInt64(scriptKeyID),
})
if err != nil {
return 0, nil, fmt.Errorf("unable to query assets: %w",
err)
}

if len(existingAssets) > 0 {
ffranr marked this conversation as resolved.
Show resolved Hide resolved
assetIDs[idx] = existingAssets[0].AssetPrimaryKey
continue
}

// With all the dependent data inserted, we can now insert the
// base asset information itself.
assetIDs[idx], err = q.InsertNewAsset(
Expand Down
11 changes: 10 additions & 1 deletion tapdb/sqlc/assets.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion tapdb/sqlc/queries/assets.sql
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,10 @@ WHERE (
assets.amount >= COALESCE(sqlc.narg('min_amt'), assets.amount) AND
assets.spent = COALESCE(sqlc.narg('spent'), assets.spent) AND
(key_group_info_view.tweaked_group_key = sqlc.narg('key_group_filter') OR
sqlc.narg('key_group_filter') IS NULL)
sqlc.narg('key_group_filter') IS NULL) AND
assets.anchor_utxo_id = COALESCE(sqlc.narg('anchor_utxo_id'), assets.anchor_utxo_id) AND
assets.genesis_id = COALESCE(sqlc.narg('genesis_id'), assets.genesis_id) AND
assets.script_key_id = COALESCE(sqlc.narg('script_key_id'), assets.script_key_id)
ffranr marked this conversation as resolved.
Show resolved Hide resolved
);

-- name: AllAssets :many
Expand Down
Loading
Loading