Merge pull request #449 from lightninglabs/remove-mint-lock
tapdb: allow batch inserting proofs when syncing universe, remove write lock for RegisterIssuance
Roasbeef authored Sep 7, 2023
2 parents f87d176 + beb76bc commit c20fdd5
Showing 23 changed files with 998 additions and 337 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
@@ -209,7 +209,7 @@ jobs:
########################
integration-test-postgres:
name: run itests postgres
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- name: git checkout
uses: actions/checkout@v3
14 changes: 0 additions & 14 deletions Makefile
@@ -179,20 +179,6 @@ itest-only-trace: aperture-dir
rm -rf itest/regtest; date
$(GOTEST) ./itest -v -tags="$(ITEST_TAGS)" $(TEST_FLAGS) $(ITEST_FLAGS) -loglevel=trace -btcdexec=./btcd-itest -logdir=regtest

optional-itest: build-itest optional-itest-only

optional-itest-trace: build-itest optional-itest-only-trace

optional-itest-only: aperture-dir
@$(call print, "Running integration tests with ${backend} backend.")
rm -rf itest/regtest; date
$(GOTEST) ./itest -v -tags="$(ITEST_TAGS)" $(TEST_FLAGS) $(ITEST_FLAGS) -optional -btcdexec=./btcd-itest -logdir=regtest

optional-itest-only-trace: aperture-dir
@$(call print, "Running integration tests with ${backend} backend.")
rm -rf itest/regtest; date
$(GOTEST) ./itest -v -tags="$(ITEST_TAGS)" $(TEST_FLAGS) $(ITEST_FLAGS) -optional -loglevel=trace -btcdexec=./btcd-itest -logdir=regtest

aperture-dir:
ifeq ($(UNAME_S),Linux)
mkdir -p $$HOME/.aperture
35 changes: 33 additions & 2 deletions fn/recv.go
@@ -1,6 +1,7 @@
package fn

import (
"context"
"fmt"
"time"
)
@@ -40,8 +41,6 @@ func RecvResp[T any](r <-chan T, e <-chan error, q <-chan struct{}) (T, error) {
//
// NOTE: This function closes the channel to be able to collect all items at
// once.
//
// TODO(roasbeef): instead could take a number of items to recv?
func Collect[T any](c chan T) []T {
close(c)

@@ -52,3 +51,35 @@

return out
}

// CollectBatch reads from the given channel and invokes the callback with
// batches of up to batchSize items. Once the channel is closed, the callback
// is invoked one final time with the remaining (possibly partial) batch. If
// the context is canceled, the function returns the context's error and any
// items collected but not yet passed to the callback are discarded.
//
// NOTE: The channel MUST be closed (or the context canceled) for this
// function to return.
func CollectBatch[V any](ctx context.Context, values <-chan V,
batchSize int, cb func(ctx context.Context, batch []V) error) error {

batch := make([]V, 0, batchSize)
for {
select {
case v, ok := <-values:
if !ok {
return cb(ctx, batch)
}
batch = append(batch, v)

if len(batch) == batchSize {
err := cb(ctx, batch)
if err != nil {
return err
}
batch = make([]V, 0, batchSize)
}

case <-ctx.Done():
return ctx.Err()
}
}
}
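For readers skimming the diff, here is a minimal usage sketch of the new CollectBatch helper. The channel contents and batch size are illustrative only, and the import path assumes the repository's fn package (github.com/lightninglabs/taproot-assets/fn):

package main

import (
	"context"
	"fmt"

	"github.com/lightninglabs/taproot-assets/fn"
)

func main() {
	ctx := context.Background()

	// Fill a buffered channel and close it so CollectBatch can drain it
	// completely and deliver the final, partial batch.
	items := make(chan string, 5)
	for i := 0; i < 5; i++ {
		items <- fmt.Sprintf("proof-%d", i)
	}
	close(items)

	// Process the items two at a time; the callback is invoked with
	// batches of 2, 2 and 1 items.
	err := fn.CollectBatch(
		ctx, items, 2, func(_ context.Context, batch []string) error {
			fmt.Printf("inserting %d proofs\n", len(batch))
			return nil
		},
	)
	if err != nil {
		fmt.Println("batch collection failed:", err)
	}
}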
61 changes: 61 additions & 0 deletions fn/recv_test.go
@@ -0,0 +1,61 @@
package fn

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

var (
testTimeout = 100 * time.Millisecond
)

func TestCollectBatch(t *testing.T) {
t.Parallel()

ctxb := context.Background()
ctxt, cancel := context.WithTimeout(ctxb, testTimeout)
defer cancel()

// First, test the expected normal case where we receive all the items
// and the channel is closed.
var (
c = make(chan int, 10)
numReceived = 0
)

for i := 0; i < 10; i++ {
c <- i
}
close(c)

err := CollectBatch(
ctxt, c, 3, func(ctx context.Context, batch []int) error {
numReceived += len(batch)

return nil
},
)
require.NoError(t, err)
require.Equal(t, 10, numReceived)

// If we don't close the channel, then we expect to run into the
// timeout and only receive 9 out of 10 items (the last batch is never
// completed).
c = make(chan int, 10)
numReceived = 0
for i := 0; i < 10; i++ {
c <- i
}
err = CollectBatch(
ctxt, c, 3, func(ctx context.Context, batch []int) error {
numReceived += len(batch)

return nil
},
)
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Equal(t, 9, numReceived)
}
4 changes: 2 additions & 2 deletions itest/mint_batch_stress_test.go
@@ -39,14 +39,14 @@ func testMintBatch100StressTest(t *harnessTest) {

func testMintBatch1kStressTest(t *harnessTest) {
batchSize := 1_000
timeout := defaultWaitTimeout * 10
timeout := defaultWaitTimeout * 20

testMintBatchNStressTest(t, batchSize, timeout)
}

func testMintBatch10kStressTest(t *harnessTest) {
batchSize := 10_000
timeout := defaultWaitTimeout * 100
timeout := defaultWaitTimeout * 200

testMintBatchNStressTest(t, batchSize, timeout)
}
10 changes: 9 additions & 1 deletion itest/tapd_harness.go
@@ -34,6 +34,14 @@ var (
// to use when starting a tap daemon.
dbbackend = flag.String("dbbackend", "sqlite", "Set the database "+
"backend to use when starting a tap daemon.")

// postgresTimeout is a command line flag for specifying the amount of
// time to allow the postgres fixture to run in total. Needs to be
// increased for long-running tests.
postgresTimeout = flag.Duration("postgrestimeout",
tapdb.DefaultPostgresFixtureLifetime, "The amount of time to "+
"allow the postgres fixture to run in total. Needs "+
"to be increased for long-running tests.")
)

const (
@@ -109,7 +117,7 @@ func newTapdHarness(ht *harnessTest, cfg tapdConfig,

case tapcfg.DatabaseBackendPostgres:
fixture := tapdb.NewTestPgFixture(
ht.t, tapdb.DefaultPostgresFixtureLifetime, !*noDelete,
ht.t, *postgresTimeout, !*noDelete,
)
ht.t.Cleanup(func() {
if !*noDelete {
7 changes: 7 additions & 0 deletions make/testing_flags.mk
@@ -36,6 +36,11 @@ ifneq ($(nodelete),)
ITEST_FLAGS += -nodelete
endif

# Run the optional tests.
ifneq ($(optional),)
ITEST_FLAGS += -optional -postgrestimeout=240m
endif

# Run itests with specified db backend.
ifneq ($(dbbackend),)
ITEST_FLAGS += -dbbackend=$(dbbackend)
@@ -69,6 +74,8 @@ endif
# test command. If not, we set 60m (up from the default 10m).
ifneq ($(timeout),)
TEST_FLAGS += -test.timeout=$(timeout)
else ifneq ($(optional),)
TEST_FLAGS += -test.timeout=240m
else
TEST_FLAGS += -test.timeout=60m
endif
16 changes: 7 additions & 9 deletions rpcserver.go
@@ -2591,25 +2591,23 @@ func (r *rpcServer) AssetLeafKeys(ctx context.Context,
func marshalAssetLeaf(ctx context.Context, keys KeyLookup,
assetLeaf *universe.MintingLeaf) (*unirpc.AssetLeaf, error) {

// In order to display the full asset, we'll parse the genesis
// proof so we can map that to the asset being proved.
var assetProof proof.Proof
if err := assetProof.Decode(
bytes.NewReader(assetLeaf.GenesisProof),
); err != nil {
// In order to display the full asset, we'll also encode the genesis
// proof.
var buf bytes.Buffer
if err := assetLeaf.GenesisProof.Encode(&buf); err != nil {
return nil, err
}

rpcAsset, err := MarshalAsset(
ctx, &assetProof.Asset, false, true, keys,
ctx, &assetLeaf.GenesisProof.Asset, false, true, keys,
)
if err != nil {
return nil, err
}

return &unirpc.AssetLeaf{
Asset: rpcAsset,
IssuanceProof: assetLeaf.GenesisProof[:],
IssuanceProof: buf.Bytes(),
}, nil
}

@@ -2859,7 +2857,7 @@ func unmarshalAssetLeaf(leaf *unirpc.AssetLeaf) (*universe.MintingLeaf, error) {
Genesis: assetProof.Asset.Genesis,
GroupKey: assetProof.Asset.GroupKey,
},
GenesisProof: leaf.IssuanceProof,
GenesisProof: &assetProof,
Amt: assetProof.Asset.Amount,
}, nil
}
4 changes: 4 additions & 0 deletions tapcfg/config.go
@@ -101,6 +101,10 @@ const (
// to sync Universe state with the federation.
defaultUniverseSyncInterval = time.Minute * 10

// defaultUniverseSyncBatchSize is the default number of proofs we'll
// sync in a single batch.
defaultUniverseSyncBatchSize = 200

// defaultReOrgSafeDepth is the default number of confirmations we'll
// wait for before considering a transaction safely buried in the chain.
defaultReOrgSafeDepth = 6
8 changes: 5 additions & 3 deletions tapcfg/server.go
@@ -223,6 +223,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
LocalDiffEngine: baseUni,
NewRemoteDiffEngine: tap.NewRpcUniverseDiff,
LocalRegistrar: baseUni,
SyncBatchSize: defaultUniverseSyncBatchSize,
})

federationMembers := cfg.Universe.FederationServers
@@ -284,9 +285,10 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
GenSigner: tap.NewLndRpcGenSigner(
lndServices,
),
ProofFiles: proofFileStore,
Universe: universeFederation,
ProofWatcher: reOrgWatcher,
ProofFiles: proofFileStore,
Universe: universeFederation,
ProofWatcher: reOrgWatcher,
UniversePushBatchSize: defaultUniverseSyncBatchSize,
},
BatchTicker: ticker.NewForce(cfg.BatchMintingInterval),
ProofUpdates: proofArchive,
61 changes: 48 additions & 13 deletions tapdb/interfaces.go
@@ -3,6 +3,7 @@ package tapdb
import (
"context"
"database/sql"
"math"
prand "math/rand"
"time"

@@ -21,9 +22,16 @@ const (
// repetition.
DefaultNumTxRetries = 10

// DefaultRetryDelay is the default delay between retries. This will be
// used to generate a random delay between 0 and this value.
DefaultRetryDelay = time.Millisecond * 50
// DefaultInitialRetryDelay is the default initial delay between
// retries. This will be used to generate a random delay between -50%
// and +50% of this value, so 20 to 60 milliseconds. The delay is then
// doubled after each attempt until we reach DefaultMaxRetryDelay. We
// start with a random value so that goroutines created at the same
// time don't all retry at effectively the same time.
DefaultInitialRetryDelay = time.Millisecond * 40

// DefaultMaxRetryDelay is the default maximum delay between retries.
DefaultMaxRetryDelay = time.Second * 3
)

// TxOptions represents a set of options one can use to control what type of
@@ -91,23 +99,50 @@ type BatchedQuerier interface {
// executor. This can be used to do things like retry a transaction due to an
// error a certain amount of times.
type txExecutorOptions struct {
numRetries int
retryDelay time.Duration
numRetries int
initialRetryDelay time.Duration
maxRetryDelay time.Duration
}

// defaultTxExecutorOptions returns the default options for the transaction
// executor.
func defaultTxExecutorOptions() *txExecutorOptions {
return &txExecutorOptions{
numRetries: DefaultNumTxRetries,
retryDelay: DefaultRetryDelay,
numRetries: DefaultNumTxRetries,
initialRetryDelay: DefaultInitialRetryDelay,
maxRetryDelay: DefaultMaxRetryDelay,
}
}

// randRetryDelay returns a random retry delay between 0 and the configured max
// delay.
func (t *txExecutorOptions) randRetryDelay() time.Duration {
return time.Duration(prand.Int63n(int64(t.retryDelay))) //nolint:gosec
// randRetryDelay returns a random retry delay between -50% and +50% of the
// configured initial delay, which is doubled for each attempt and capped at
// the configured maximum delay.
func (t *txExecutorOptions) randRetryDelay(attempt int) time.Duration {
halfDelay := t.initialRetryDelay / 2
randDelay := prand.Int63n(int64(t.initialRetryDelay)) //nolint:gosec

// 50% plus 0%-100% gives us the range of 50%-150%.
initialDelay := halfDelay + time.Duration(randDelay)

// If this is the first attempt, we just return the initial delay.
if attempt == 0 {
return initialDelay
}

// For each subsequent attempt, we double the initial delay. This keeps
// the delay somewhat random while still increasing with each attempt.
// Doubling a value n times is the same as multiplying it by 2^n. We
// limit the exponent to 32 to avoid overflows.
factor := time.Duration(math.Pow(2, math.Min(float64(attempt), 32)))
actualDelay := initialDelay * factor

// Cap the delay at the maximum configured value.
if actualDelay > t.maxRetryDelay {
return t.maxRetryDelay
}

return actualDelay
}

// TxExecutorOption is a functional option that allows us to pass in optional
@@ -126,7 +161,7 @@ func WithTxRetries(numRetries int) TxExecutorOption {
// to wait before a transaction is retried.
func WithTxRetryDelay(delay time.Duration) TxExecutorOption {
return func(o *txExecutorOptions) {
o.retryDelay = delay
o.initialRetryDelay = delay
}
}

@@ -171,7 +206,7 @@ func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context,
txOptions TxOptions, txBody func(Q) error) error {

waitBeforeRetry := func(attemptNumber int) {
retryDelay := t.opts.randRetryDelay()
retryDelay := t.opts.randRetryDelay(attemptNumber)

log.Tracef("Retrying transaction due to tx serialization "+
"error, attempt_number=%v, delay=%v", attemptNumber,
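To illustrate the retry behavior implemented by randRetryDelay above, here is a small, self-contained sketch of the same jittered exponential backoff. It is not part of the diff; the constants simply mirror the defaults introduced in tapdb/interfaces.go:

package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

const (
	initialRetryDelay = 40 * time.Millisecond
	maxRetryDelay     = 3 * time.Second
)

// retryDelay picks a random delay in [50%, 150%] of the initial delay,
// doubles it for every attempt after the first, and caps it at the maximum.
func retryDelay(attempt int) time.Duration {
	half := initialRetryDelay / 2
	jitter := time.Duration(rand.Int63n(int64(initialRetryDelay)))
	delay := half + jitter

	if attempt == 0 {
		return delay
	}

	// Doubling n times equals multiplying by 2^n; cap the exponent to
	// keep the multiplication well within time.Duration's range.
	factor := time.Duration(math.Pow(2, math.Min(float64(attempt), 32)))
	if delay*factor > maxRetryDelay {
		return maxRetryDelay
	}

	return delay * factor
}

func main() {
	// Example schedule: roughly 20-60ms, 40-120ms, 80-240ms, ... capped
	// at 3s after a few attempts.
	for attempt := 0; attempt < 8; attempt++ {
		fmt.Printf("attempt %d: %v\n", attempt, retryDelay(attempt))
	}
}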
(Diffs for the remaining 12 changed files are not shown in this view.)
