Skip to content

Commit

Permalink
Merge pull request #19 from kiwicom/ms/token-aware-cross-dc-policy
Browse files Browse the repository at this point in the history
Add option to fallback to other DC for given token
  • Loading branch information
Henrik Johansson authored Jul 2, 2019
2 parents 0546b9b + f3de140 commit d2cd4a0
Show file tree
Hide file tree
Showing 4 changed files with 328 additions and 22 deletions.
7 changes: 7 additions & 0 deletions common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"net"
"reflect"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -221,6 +222,12 @@ func assertEqual(t *testing.T, description string, expected, actual interface{})
}
}

func assertDeepEqual(t *testing.T, description string, expected, actual interface{}) {
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %s to be (%+v) but was (%+v) instead", description, expected, actual)
}
}

func assertNil(t *testing.T, description string, actual interface{}) {
if actual != nil {
t.Errorf("expected %s to be (nil) but was (%+v) instead", description, actual)
Expand Down
44 changes: 34 additions & 10 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,18 @@ func ShuffleReplicas() func(*tokenAwareHostPolicy) {
}
}

// NonLocalReplicasFallback enables fallback to replicas that are not considered local.
//
// TokenAwareHostPolicy used with DCAwareHostPolicy fallback first selects replicas by partition key in local DC, then
// falls back to other nodes in the local DC. Enabling NonLocalReplicasFallback causes TokenAwareHostPolicy
// to first select replicas by partition key in local DC, then replicas by partition key in remote DCs and fall back
// to other nodes in local DC.
func NonLocalReplicasFallback() func(policy *tokenAwareHostPolicy) {
return func(t *tokenAwareHostPolicy) {
t.nonLocalReplicasFallback = true
}
}

// TokenAwareHostPolicy is a token aware host selection policy, where hosts are
// selected based on the partition key, so queries are sent to the host which
// owns the partition. Fallback is used when routing information is not available.
Expand All @@ -415,16 +427,19 @@ type tokenAwareHostPolicy struct {
mu sync.RWMutex
partitioner string
fallback HostSelectionPolicy
session *Session
getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error)
getKeyspaceName func() string

tokenRing atomic.Value // *tokenRing
keyspaces atomic.Value // *keyspaceMeta

shuffleReplicas bool
shuffleReplicas bool
nonLocalReplicasFallback bool
}

func (t *tokenAwareHostPolicy) Init(s *Session) {
t.session = s
t.getKeyspaceMetadata = s.KeyspaceMetadata
t.getKeyspaceName = func() string {return s.cfg.Keyspace}
}

func (t *tokenAwareHostPolicy) IsLocal(host *HostInfo) bool {
Expand All @@ -446,7 +461,7 @@ func (t *tokenAwareHostPolicy) updateKeyspaceMetadata(keyspace string) {
replicas: make(map[string]map[token][]*HostInfo, size),
}

ks, err := t.session.KeyspaceMetadata(keyspace)
ks, err := t.getKeyspaceMetadata(keyspace)
if err == nil {
strat := getStrategy(ks)
if strat != nil {
Expand Down Expand Up @@ -482,16 +497,12 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) {

func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
t.HostUp(host)
if t.session != nil { // disable for unit tests
t.updateKeyspaceMetadata(t.session.cfg.Keyspace)
}
t.updateKeyspaceMetadata(t.getKeyspaceName())
}

func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) {
t.HostDown(host)
if t.session != nil { // disable for unit tests
t.updateKeyspaceMetadata(t.session.cfg.Keyspace)
}
t.updateKeyspaceMetadata(t.getKeyspaceName())
}

func (t *tokenAwareHostPolicy) HostUp(host *HostInfo) {
Expand Down Expand Up @@ -573,6 +584,7 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
var (
fallbackIter NextHost
i int
j int
)

used := make(map[*HostInfo]bool, len(replicas))
Expand All @@ -587,6 +599,18 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
}
}

if t.nonLocalReplicasFallback {
for j < len(replicas) {
h := replicas[j]
j++

if h.IsUp() && !t.fallback.IsLocal(h) {
used[h] = true
return selectedHost{info: h, token: token}
}
}
}

if fallbackIter == nil {
// fallback
fallbackIter = t.fallback.Pick(qry)
Expand Down
Loading

0 comments on commit d2cd4a0

Please sign in to comment.