Skip to content

Commit

Permalink
Merge pull request #213 from Lorak-mmk/lwt-retry-policy
Browse files Browse the repository at this point in the history
Add LWTRetryPolicy interface
  • Loading branch information
sylwiaszunejko authored Jul 8, 2024
2 parents c2e98cb + 44c91c7 commit a00403c
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 3 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,4 @@ Dmitry Kropachev <[email protected]>
Oliver Boyle <[email protected]>
Jackson Fleming <[email protected]>
Sylwia Szunejko <[email protected]>
Karol Baryła <[email protected]>
2 changes: 2 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@
// execution.
//
// Idempotent queries are retried in case of errors based on the configured RetryPolicy.
// If the query is LWT and the configured RetryPolicy additionally implements LWTRetryPolicy
// interface, then the policy will be cast to LWTRetryPolicy and used this way.
//
// Queries can be retried even before they fail by setting a SpeculativeExecutionPolicy. The policy can
// cause the driver to retry on a different node if the query is taking longer than a specified delay even before the
Expand Down
32 changes: 32 additions & 0 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ type RetryPolicy interface {
GetRetryType(error) RetryType
}

// LWTRetryPolicy is a similar interface to RetryPolicy
// If a query is recognized as an LWT query and its RetryPolicy satisfies this
// interface, then this interface will be used instead of RetryPolicy.
type LWTRetryPolicy interface {
AttemptLWT(RetryableQuery) bool
GetRetryTypeLWT(error) RetryType
}

// SimpleRetryPolicy has simple logic for attempting a query a fixed number of times.
//
// See below for examples of usage:
Expand All @@ -175,10 +183,22 @@ func (s *SimpleRetryPolicy) Attempt(q RetryableQuery) bool {
return q.Attempts() <= s.NumRetries
}

func (s *SimpleRetryPolicy) AttemptLWT(q RetryableQuery) bool {
return s.Attempt(q)
}

func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType {
return RetryNextHost
}

// Retrying on a different host is fine for normal (non-LWT) queries,
// but in case of LWTs it will cause Paxos contention and possibly
// even timeouts if other clients send statements touching the same
// partition to the original node at the same time.
func (s *SimpleRetryPolicy) GetRetryTypeLWT(err error) RetryType {
return Retry
}

// ExponentialBackoffRetryPolicy sleeps between attempts
type ExponentialBackoffRetryPolicy struct {
NumRetries int
Expand All @@ -193,6 +213,10 @@ func (e *ExponentialBackoffRetryPolicy) Attempt(q RetryableQuery) bool {
return true
}

func (e *ExponentialBackoffRetryPolicy) AttemptLWT(q RetryableQuery) bool {
return e.Attempt(q)
}

// used to calculate exponentially growing time
func getExponentialTime(min time.Duration, max time.Duration, attempts int) time.Duration {
if min <= 0 {
Expand All @@ -215,6 +239,14 @@ func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType {
return RetryNextHost
}

// Retrying on a different host is fine for normal (non-LWT) queries,
// but in case of LWTs it will cause Paxos contention and possibly
// even timeouts if other clients send statements touching the same
// partition to the original node at the same time.
func (e *ExponentialBackoffRetryPolicy) GetRetryTypeLWT(err error) RetryType {
return Retry
}

// DowngradingConsistencyRetryPolicy: Next retry will be with the next consistency level
// provided in the slice
//
Expand Down
16 changes: 16 additions & 0 deletions policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,14 @@ func TestSimpleRetryPolicy(t *testing.T) {
}
}

func TestLWTSimpleRetryPolicy(t *testing.T) {
ebrp := &SimpleRetryPolicy{NumRetries: 2}
// Verify that SimpleRetryPolicy implements both interfaces
var _ RetryPolicy = ebrp
var lwt_rt LWTRetryPolicy = ebrp
assertEqual(t, "retry type of LWT policy", lwt_rt.GetRetryTypeLWT(nil), Retry)
}

func TestExponentialBackoffPolicy(t *testing.T) {
// test with defaults
sut := &ExponentialBackoffRetryPolicy{NumRetries: 2}
Expand Down Expand Up @@ -450,6 +458,14 @@ func TestExponentialBackoffPolicy(t *testing.T) {
}
}

func TestLWTExponentialBackoffPolicy(t *testing.T) {
ebrp := &ExponentialBackoffRetryPolicy{NumRetries: 2}
// Verify that ExponentialBackoffRetryPolicy implements both interfaces
var _ RetryPolicy = ebrp
var lwt_rt LWTRetryPolicy = ebrp
assertEqual(t, "retry type of LWT policy", lwt_rt.GetRetryTypeLWT(nil), Retry)
}

func TestDowngradingConsistencyRetryPolicy(t *testing.T) {

q := &Query{cons: LocalQuorum, routingInfo: &queryRoutingInfo{}}
Expand Down
28 changes: 25 additions & 3 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter NextHost) *Iter {
selectedHost := hostIter()
rt := qry.retryPolicy()
lwt_rt, use_lwt_rt := rt.(LWTRetryPolicy)
// We only want to apply LWT policy to LWT queries
use_lwt_rt = use_lwt_rt && qry.IsLWT()

var lastErr error
var iter *Iter
Expand Down Expand Up @@ -145,14 +148,33 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne
}

// Exit if the query was successful
// or no retry policy defined or retry attempts were reached
if iter.err == nil || rt == nil || !rt.Attempt(qry) {
// or no retry policy defined
if iter.err == nil || rt == nil {
return iter
}

// or retry policy decides to not retry anymore
if use_lwt_rt {
if !lwt_rt.AttemptLWT(qry) {
return iter
}
} else {
if !rt.Attempt(qry) {
return iter
}
}

lastErr = iter.err

var retry_type RetryType
if use_lwt_rt {
retry_type = lwt_rt.GetRetryTypeLWT(iter.err)
} else {
retry_type = rt.GetRetryType(iter.err)
}

// If query is unsuccessful, check the error with RetryPolicy to retry
switch rt.GetRetryType(iter.err) {
switch retry_type {
case Retry:
// retry on the same host
continue
Expand Down

0 comments on commit a00403c

Please sign in to comment.