Skip to content

Commit

Permalink
Always consider retry policy when executing query and use
Browse files Browse the repository at this point in the history
QueryError to signal potential execution
  • Loading branch information
sylwiaszunejko committed Dec 31, 2024
1 parent 46095d0 commit 4039926
Showing 1 changed file with 74 additions and 42 deletions.
116 changes: 74 additions & 42 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gocql

import (
"context"
"errors"
"sync"
"time"
)
Expand Down Expand Up @@ -107,74 +108,107 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
}

func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter NextHost) *Iter {
selectedHost := hostIter()
rt := qry.retryPolicy()
lwt_rt, use_lwt_rt := rt.(LWTRetryPolicy)
// We only want to apply LWT policy to LWT queries
use_lwt_rt = use_lwt_rt && qry.IsLWT()
if rt == nil {
rt = &SimpleRetryPolicy{3}
}

var lastErr error
var iter *Iter
for selectedHost != nil {
lwtRT, isRTSupportsLWT := rt.(LWTRetryPolicy)

var getShouldRetry func(qry RetryableQuery) bool
var getRetryType func(error) RetryType

if isRTSupportsLWT && qry.IsLWT() {
getShouldRetry = lwtRT.AttemptLWT
getRetryType = lwtRT.GetRetryTypeLWT
} else {
getShouldRetry = rt.Attempt
getRetryType = rt.GetRetryType
}

var potentiallyExecuted bool

execute := func(qry ExecutableQuery, selectedHost SelectedHost) (iter *Iter, retry RetryType) {
host := selectedHost.Info()
if host == nil || !host.IsUp() {
selectedHost = hostIter()
continue
return &Iter{
err: &QueryError{
err: ErrHostDown,
potentiallyExecuted: potentiallyExecuted,
},
}, RetryNextHost
}

pool, ok := q.pool.getPool(host)
if !ok {
selectedHost = hostIter()
continue
return &Iter{
err: &QueryError{
err: ErrNoPool,
potentiallyExecuted: potentiallyExecuted,
},
}, RetryNextHost
}

conn := pool.Pick(selectedHost.Token(), qry)
if conn == nil {
selectedHost = hostIter()
continue
return &Iter{
err: &QueryError{
err: ErrNoConnectionsInPool,
potentiallyExecuted: potentiallyExecuted,
},
}, RetryNextHost
}

iter = q.attemptQuery(ctx, qry, conn)
iter.host = selectedHost.Info()
// Update host
switch iter.err {
case context.Canceled, context.DeadlineExceeded, ErrNotFound:
// those errors represents logical errors, they should not count
// toward removing a node from the pool
if iter.err == nil {
return iter, RetryType(255)
}

switch {
case errors.Is(iter.err, context.Canceled),
errors.Is(iter.err, context.DeadlineExceeded):
selectedHost.Mark(nil)
return iter
potentiallyExecuted = true
retry = Rethrow
default:
selectedHost.Mark(iter.err)
retry = RetryType(255) // Don't enforce retry and get it from retry policy
}

// Exit if the query was successful
// or no retry policy defined
if iter.err == nil || rt == nil {
return iter
}

// or retry policy decides to not retry anymore
if use_lwt_rt {
if !lwt_rt.AttemptLWT(qry) {
return iter
}
var qErr *QueryError
if errors.As(iter.err, &qErr) {
potentiallyExecuted = potentiallyExecuted && qErr.PotentiallyExecuted()
qErr.potentiallyExecuted = potentiallyExecuted
qErr.isIdempotent = qry.IsIdempotent()
iter.err = qErr
} else {
if !rt.Attempt(qry) {
return iter
iter.err = &QueryError{
err: iter.err,
potentiallyExecuted: potentiallyExecuted,
isIdempotent: qry.IsIdempotent(),
}
}
return iter, retry
}

var lastErr error
selectedHost := hostIter()
for selectedHost != nil {
iter, retryType := execute(qry, selectedHost)
if iter.err == nil {
return iter
}
lastErr = iter.err

var retry_type RetryType
if use_lwt_rt {
retry_type = lwt_rt.GetRetryTypeLWT(iter.err)
} else {
retry_type = rt.GetRetryType(iter.err)
// Exit if retry policy decides to not retry anymore
if retryType == RetryType(255) {
if !getShouldRetry(qry) {
return iter
}
retryType = getRetryType(iter.err)
}

// If query is unsuccessful, check the error with RetryPolicy to retry
switch retry_type {
switch retryType {
case Retry:
// retry on the same host
continue
Expand All @@ -189,11 +223,9 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne
return &Iter{err: ErrUnknownRetryType}
}
}

if lastErr != nil {
return &Iter{err: lastErr}
}

return &Iter{err: ErrNoConnections}
}

Expand Down

0 comments on commit 4039926

Please sign in to comment.