Skip to content

Commit

Permalink
stress,client: nonce chaos and failed execution options (#604)
Browse files Browse the repository at this point in the history
* stress: randomly fail execution in different ways

* stress: expose nonceChaos option

* client/core: fix ignored WithFee option
  • Loading branch information
jchappelow authored Mar 15, 2024
1 parent 9d4dac6 commit 89f2b36
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 18 deletions.
9 changes: 6 additions & 3 deletions core/client/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ func (c *Client) newTx(ctx context.Context, data transactions.Payload, txOpts *c
}

// estimate price
price, err := c.txClient.EstimateCost(ctx, tx)
if err != nil {
return nil, fmt.Errorf("failed to estimate price: %w", err)
price := txOpts.Fee
if price == nil {
price, err = c.txClient.EstimateCost(ctx, tx)
if err != nil {
return nil, fmt.Errorf("failed to estimate price: %w", err)
}
}

// set fee
Expand Down
16 changes: 15 additions & 1 deletion test/stress/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"time"

clientType "github.com/kwilteam/kwil-db/core/types/client"
Expand Down Expand Up @@ -178,7 +179,20 @@ func (h *harness) createPost(ctx context.Context, dbid string, postID int, title
*/

func (h *harness) createPostAsync(ctx context.Context, dbid string, postID int, title, content string) (<-chan asyncResp, error) {
txHash, err := h.executeActionAsync(ctx, dbid, actCreatePost, [][]any{{postID, title, content}})
args := [][]any{{postID, title, content}}
// Randomly fail execution. TODO: make frequency flag, like execFailRate,
// but we really don't need it to succeed, only be mined. The failures
// ensure expected nonce and balance updates regardless.
if rand.Intn(6) == 0 {
if rand.Intn(2) == 0 {
// kwild.abci: "msg":"failed to execute transaction","error":"ERROR: invalid input syntax for type bigint: \"not integer\" (SQLSTATE 22P02)"
args = [][]any{{"not integer", title, content}} // id not integer (SQL exec error)
} else {
// kwild.abci: "msg":"failed to execute transaction","error":"incorrect number of arguments: procedure \"create_post\" requires 3 arguments, but 2 were provided"
args = [][]any{{postID, title}} // too few args (engine procedure call error)
}
}
txHash, err := h.executeActionAsync(ctx, dbid, actCreatePost, args)
if err != nil {
return nil, err
}
Expand Down
14 changes: 8 additions & 6 deletions test/stress/hammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ func hammer(ctx context.Context) error {
return err
}

userID, userName, err := h.getOrCreateUser(ctx, dbid)
if err != nil {
return fmt.Errorf("getOrCreateUser: %w", err)
}
h.printf("user ID = %d / user name = %v", userID, userName)

h.nonceChaos = nonceChaos // after successfully deploying the test db and creating a user in it

// ## badgering read-only requests to various systems

// bother the account store
Expand Down Expand Up @@ -209,12 +217,6 @@ func hammer(ctx context.Context) error {

var pid atomic.Int64 // post ID accessed by separate goroutines

userID, userName, err := h.getOrCreateUser(ctx, dbid)
if err != nil {
return fmt.Errorf("getOrCreateUser: %w", err)
}
h.printf("user ID = %d / user name = %v", userID, userName)

nextPostID, err := h.nextPostID(ctx, dbid, userID)
if err != nil {
return fmt.Errorf("nextPostID: %w", err)
Expand Down
29 changes: 23 additions & 6 deletions test/stress/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"math/big"
"math/rand"
"sync"

"github.com/kwilteam/kwil-db/core/crypto/auth"
Expand All @@ -28,25 +28,42 @@ type harness struct {
nonceMtx sync.Mutex
nonce int64 // atomic.Int64

concurrentBroadcast bool // be wreckless with nonces
concurrentBroadcast bool // broadcast many before confirm, still coordinate nonces
nonceChaos int // apply random nonce-jitter every 1/n times

nestedLogger *log.Logger
}

// for about 1 in every f times, produce a non-zero nonce jitter:
// {-2, 1, 1, 2, 3, 4}
func randNonceJitter(f int) int64 {
if f == 0 {
return 0
}
if rand.Intn(f) > 0 {
return 0
}
n := rand.Int63n(6) - 2
if n >= 0 { // 0-3 => 1-4
return n + 1
}
return n
}

func (h *harness) underNonceLock(ctx context.Context, fn func(int64) error) error {
if h.concurrentBroadcast {
// Grab the next nonce in a thread-safe manner, but do not wait for
// broadcast to complete to release the lock. If there is a nonce error,
// there will be more chaos with concurrent broadcasting.
h.nonceMtx.Lock()
h.nonce++
nonce := h.nonce // chaos: + int64(rand.Intn(2))
nonce := h.nonce + randNonceJitter(h.nonceChaos)
h.nonceMtx.Unlock()
if err := fn(nonce); err != nil {
if errors.Is(err, transactions.ErrInvalidNonce) {
// Note: several goroutines may all try to do this if they all hit the nonce error
h.recoverNonce(ctx)
h.printf("error, nonce reverting to %d\n", h.nonce)
h.printf("error, nonce %d was wrong, reverting to %d\n", nonce, h.nonce)
}
return err
}
Expand Down Expand Up @@ -100,13 +117,13 @@ func (h *harness) printRecs(ctx context.Context, recs *clientType.Records) {
}
}

func (h *harness) executeActionAsync(ctx context.Context, dbid string, action string,
func (h *harness) executeActionAsync(ctx context.Context, dbid, action string,
inputs [][]any) (transactions.TxHash, error) {
var txHash transactions.TxHash
err := h.underNonceLock(ctx, func(nonce int64) error {
var err error
txHash, err = h.ExecuteAction(ctx, dbid, action, inputs,
clientType.WithNonce(nonce), clientType.WithFee(&big.Int{}))
clientType.WithNonce(nonce) /*, clientType.WithFee(&big.Int{})*/) // TODO: badFee mode
return err
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions test/stress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@ var (
txPollInterval time.Duration

sequentialBroadcast bool
nonceChaos int
rpcTiming bool

// badNonces bool

wg sync.WaitGroup
)

Expand All @@ -69,6 +68,7 @@ func main() {
flag.IntVar(&maxContentLen, "el", 50_000, "maximum content length in an executed post action")

flag.BoolVar(&sequentialBroadcast, "sb", false, "sequential broadcast (disallow concurrent broadcast, waiting for broadcast result before releasing nonce lock)")
flag.IntVar(&nonceChaos, "nc", 0, "nonce chaos rate (apply nonce jitter every 1/nc times)")
flag.BoolVar(&rpcTiming, "v", false, "print RPC durations")

flag.DurationVar(&txPollInterval, "pollint", 200*time.Millisecond, "polling interval when waiting for tx confirmation")
Expand Down

0 comments on commit 89f2b36

Please sign in to comment.