From c44a52d506572fb978c43122d00663c8f70726e1 Mon Sep 17 00:00:00 2001
From: Martin Sucha
Date: Thu, 27 Jun 2019 17:36:08 +0200
Subject: [PATCH 1/3] Add a test for TokenAware and DCAwareRR host policies
 used together

Token aware policy with DC aware RR as fallback behaves in a way that
might surprise some users. This combination first queries the host
selected by token in the local DC and, because the remaining replicas
for that token live in other DCs, then falls back to the other hosts in
the local DC. Moreover, when the retry count is greater than the
replication factor, the host selected by the token aware policy is
queried only once.
---
 policies_test.go | 101 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 101 insertions(+)

diff --git a/policies_test.go b/policies_test.go
index 3feab1f27..9c3af025d 100644
--- a/policies_test.go
+++ b/policies_test.go
@@ -437,3 +437,104 @@ func TestHostPolicy_DCAwareRR(t *testing.T) {
 	}
 }
+
+
+// Tests of the token-aware host selection policy implementation with a
+// DC aware round-robin host selection policy fallback.
+func TestHostPolicy_TokenAware_DCAwareRR(t *testing.T) {
+	policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"))
+	policyInternal := policy.(*tokenAwareHostPolicy)
+
+	query := &Query{}
+
+	iter := policy.Pick(nil)
+	if iter == nil {
+		t.Fatal("host iterator was nil")
+	}
+	actual := iter()
+	if actual != nil {
+		t.Fatalf("expected nil from iterator, but was %v", actual)
+	}
+
+	// set the hosts
+	hosts := [...]*HostInfo{
+		{hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"05"}, dataCenter: "remote1"},
+		{hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"10"}, dataCenter: "local"},
+		{hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"15"}, dataCenter: "remote2"},
+		{hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"20"}, dataCenter: "remote1"},
+		{hostId: "4", connectAddress: net.IPv4(10, 0, 0, 5), tokens: []string{"25"}, dataCenter: "local"},
+		{hostId: "5", connectAddress: net.IPv4(10, 0, 0, 6), tokens: []string{"30"}, dataCenter: "remote2"},
+		{hostId: "6", connectAddress: net.IPv4(10, 0, 0, 7), tokens: []string{"35"}, dataCenter: "remote1"},
+		{hostId: "7", connectAddress: net.IPv4(10, 0, 0, 8), tokens: []string{"40"}, dataCenter: "local"},
+		{hostId: "8", connectAddress: net.IPv4(10, 0, 0, 9), tokens: []string{"45"}, dataCenter: "remote2"},
+		{hostId: "9", connectAddress: net.IPv4(10, 0, 0, 10), tokens: []string{"50"}, dataCenter: "remote1"},
+		{hostId: "10", connectAddress: net.IPv4(10, 0, 0, 11), tokens: []string{"55"}, dataCenter: "local"},
+		{hostId: "11", connectAddress: net.IPv4(10, 0, 0, 12), tokens: []string{"60"}, dataCenter: "remote2"},
+	}
+	for _, host := range hosts {
+		policy.AddHost(host)
+	}
+
+	// the token ring is not set up without the partitioner, but the fallback
+	// should work
+	if actual := policy.Pick(nil)(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected host 1 but was %s", actual.Info().HostID())
+	}
+
+	query.RoutingKey([]byte("30"))
+	if actual := policy.Pick(query)(); actual.Info().HostID() != "4" {
+		t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
+	}
+
+	policy.SetPartitioner("OrderedPartitioner")
+
+	// we need to simulate updateKeyspaceMetadata for replicas() to work.
+	// this should correspond to what networkTopologyStrategy with rf=1 for each DC would generate
+	newMeta := &keyspaceMeta{
+		replicas: map[string]map[token][]*HostInfo{
+			"": {
+				orderedToken("05"): {hosts[0], hosts[1], hosts[2]},
+				orderedToken("10"): {hosts[1], hosts[2], hosts[3]},
+				orderedToken("15"): {hosts[2], hosts[3], hosts[4]},
+				orderedToken("20"): {hosts[3], hosts[4], hosts[5]},
+				orderedToken("25"): {hosts[4], hosts[5], hosts[6]},
+				orderedToken("30"): {hosts[5], hosts[6], hosts[7]},
+				orderedToken("35"): {hosts[6], hosts[7], hosts[8]},
+				orderedToken("40"): {hosts[7], hosts[8], hosts[9]},
+				orderedToken("45"): {hosts[8], hosts[9], hosts[10]},
+				orderedToken("50"): {hosts[9], hosts[10], hosts[11]},
+				orderedToken("55"): {hosts[10], hosts[11], hosts[0]},
+				orderedToken("60"): {hosts[11], hosts[0], hosts[1]},
+			},
+		},
+	}
+	policyInternal.keyspaces.Store(newMeta)
+
+	// now the token ring is configured
+	query.RoutingKey([]byte("23"))
+	iter = policy.Pick(query)
+	// first should be host with matching token from the local DC
+	if actual := iter(); actual.Info().HostID() != "4" {
+		t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
+	}
+	// rest are according to DCAwareRR from the local DC only, starting with 7 as the fallback was used twice above
+	if actual := iter(); actual.Info().HostID() != "7" {
+		t.Errorf("Expected peer 7 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "10" {
+		t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
+	}
+	// and it starts to repeat now without host 4...
+	if actual := iter(); actual.Info().HostID() != "7" {
+		t.Errorf("Expected peer 7 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "10" {
+		t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
+	}
+}

From fd1a38394a94608ace8508ff437bebd38192e1a3 Mon Sep 17 00:00:00 2001
From: Martin Sucha
Date: Fri, 28 Jun 2019 13:34:09 +0200
Subject: [PATCH 2/3] Add NonLocalReplicasFallback option to
 TokenAwareHostPolicy

This option allows falling back to replicas selected by token in remote
DCs before falling back to the other nodes in the local DC. This is
particularly useful when used with {'class': 'NetworkTopologyStrategy',
'a': '1', 'b': '1', 'c': '1'}
---
 policies.go      |  28 ++++++++++++-
 policies_test.go | 107 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 134 insertions(+), 1 deletion(-)

diff --git a/policies.go b/policies.go
index 93ff742fb..350ac3478 100644
--- a/policies.go
+++ b/policies.go
@@ -395,6 +395,18 @@ func ShuffleReplicas() func(*tokenAwareHostPolicy) {
 	}
 }
 
+// NonLocalReplicasFallback enables fallback to replicas that are not considered local.
+//
+// TokenAwareHostPolicy used with a DC-aware fallback such as DCAwareRoundRobinPolicy first selects
+// replicas by partition key in the local DC, then falls back to other nodes in the local DC.
+// Enabling NonLocalReplicasFallback causes TokenAwareHostPolicy to first select replicas by partition
+// key in the local DC, then replicas by partition key in remote DCs, and only then other local nodes.
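+//
+// For example (illustrative usage only; "local" names the local DC):
+//
+//	policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"), NonLocalReplicasFallback())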
+func NonLocalReplicasFallback() func(policy *tokenAwareHostPolicy) {
+	return func(t *tokenAwareHostPolicy) {
+		t.nonLocalReplicasFallback = true
+	}
+}
+
 // TokenAwareHostPolicy is a token aware host selection policy, where hosts are
 // selected based on the partition key, so queries are sent to the host which
 // owns the partition. Fallback is used when routing information is not available.
@@ -420,7 +432,8 @@ type tokenAwareHostPolicy struct {
 	tokenRing atomic.Value // *tokenRing
 	keyspaces atomic.Value // *keyspaceMeta
 
-	shuffleReplicas bool
+	shuffleReplicas          bool
+	nonLocalReplicasFallback bool
 }
 
 func (t *tokenAwareHostPolicy) Init(s *Session) {
@@ -573,6 +586,7 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
 	var (
 		fallbackIter NextHost
 		i            int
+		j            int
 	)
 
 	used := make(map[*HostInfo]bool, len(replicas))
@@ -587,6 +601,18 @@
 			}
 		}
 
+		if t.nonLocalReplicasFallback {
+			for j < len(replicas) {
+				h := replicas[j]
+				j++
+
+				if h.IsUp() && !t.fallback.IsLocal(h) {
+					used[h] = true
+					return selectedHost{info: h, token: token}
+				}
+			}
+		}
+
 		if fallbackIter == nil {
 			// fallback
 			fallbackIter = t.fallback.Pick(qry)
diff --git a/policies_test.go b/policies_test.go
index 9c3af025d..9814bd78f 100644
--- a/policies_test.go
+++ b/policies_test.go
@@ -538,3 +538,110 @@ func TestHostPolicy_TokenAware_DCAwareRR(t *testing.T) {
 		t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
 	}
 }
+
+// Tests of the token-aware host selection policy implementation with a DC aware
+// round-robin host selection policy fallback and NonLocalReplicasFallback enabled.
+func TestHostPolicy_TokenAware_DCAwareRR_NonLocalFallback(t *testing.T) {
+	policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"), NonLocalReplicasFallback())
+	policyInternal := policy.(*tokenAwareHostPolicy)
+
+	query := &Query{}
+
+	iter := policy.Pick(nil)
+	if iter == nil {
+		t.Fatal("host iterator was nil")
+	}
+	actual := iter()
+	if actual != nil {
+		t.Fatalf("expected nil from iterator, but was %v", actual)
+	}
+
+	// set the hosts
+	hosts := [...]*HostInfo{
+		{hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"05"}, dataCenter: "remote1"},
+		{hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"10"}, dataCenter: "local"},
+		{hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"15"}, dataCenter: "remote2"},
+		{hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"20"}, dataCenter: "remote1"},
+		{hostId: "4", connectAddress: net.IPv4(10, 0, 0, 5), tokens: []string{"25"}, dataCenter: "local"},
+		{hostId: "5", connectAddress: net.IPv4(10, 0, 0, 6), tokens: []string{"30"}, dataCenter: "remote2"},
+		{hostId: "6", connectAddress: net.IPv4(10, 0, 0, 7), tokens: []string{"35"}, dataCenter: "remote1"},
+		{hostId: "7", connectAddress: net.IPv4(10, 0, 0, 8), tokens: []string{"40"}, dataCenter: "local"},
+		{hostId: "8", connectAddress: net.IPv4(10, 0, 0, 9), tokens: []string{"45"}, dataCenter: "remote2"},
+		{hostId: "9", connectAddress: net.IPv4(10, 0, 0, 10), tokens: []string{"50"}, dataCenter: "remote1"},
+		{hostId: "10", connectAddress: net.IPv4(10, 0, 0, 11), tokens: []string{"55"}, dataCenter: "local"},
+		{hostId: "11", connectAddress: net.IPv4(10, 0, 0, 12), tokens: []string{"60"}, dataCenter: "remote2"},
+	}
+	for _, host := range hosts {
+		policy.AddHost(host)
+	}
+
+	// the token ring is not set up without the partitioner, but the fallback
+	// should work
+	if actual := policy.Pick(nil)(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected host 1 but was %s", actual.Info().HostID())
+	}
+
+	query.RoutingKey([]byte("30"))
+	if actual := policy.Pick(query)(); actual.Info().HostID() != "4" {
+		t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
+	}
+
+	policy.SetPartitioner("OrderedPartitioner")
+
+	// we need to simulate updateKeyspaceMetadata for replicas() to work.
+	// this should correspond to what networkTopologyStrategy with rf=1 for each DC would generate
+	newMeta := &keyspaceMeta{
+		replicas: map[string]map[token][]*HostInfo{
+			"": {
+				orderedToken("05"): {hosts[0], hosts[1], hosts[2]},
+				orderedToken("10"): {hosts[1], hosts[2], hosts[3]},
+				orderedToken("15"): {hosts[2], hosts[3], hosts[4]},
+				orderedToken("20"): {hosts[3], hosts[4], hosts[5]},
+				orderedToken("25"): {hosts[4], hosts[5], hosts[6]},
+				orderedToken("30"): {hosts[5], hosts[6], hosts[7]},
+				orderedToken("35"): {hosts[6], hosts[7], hosts[8]},
+				orderedToken("40"): {hosts[7], hosts[8], hosts[9]},
+				orderedToken("45"): {hosts[8], hosts[9], hosts[10]},
+				orderedToken("50"): {hosts[9], hosts[10], hosts[11]},
+				orderedToken("55"): {hosts[10], hosts[11], hosts[0]},
+				orderedToken("60"): {hosts[11], hosts[0], hosts[1]},
+			},
+		},
+	}
+	policyInternal.keyspaces.Store(newMeta)
+
+	// now the token ring is configured
+	query.RoutingKey([]byte("18"))
+	iter = policy.Pick(query)
+	// first should be host with matching token from the local DC
+	if actual := iter(); actual.Info().HostID() != "4" {
+		t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
+	}
+	// rest should be hosts with matching token from remote DCs
+	if actual := iter(); actual.Info().HostID() != "3" {
+		t.Errorf("Expected peer 3 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "5" {
+		t.Errorf("Expected peer 5 but was %s", actual.Info().HostID())
+	}
+	// rest are according to DCAwareRR from the local DC only, starting with 7 as the fallback was used twice above
+	if actual := iter(); actual.Info().HostID() != "7" {
+		t.Errorf("Expected peer 7 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "10" {
+		t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
+	}
+	// and it starts to repeat now without host 4...
+	if actual := iter(); actual.Info().HostID() != "7" {
+		t.Errorf("Expected peer 7 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "10" {
+		t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
+	}
+}

From f3de1406e7467b634cd956757dea425532dfae2f Mon Sep 17 00:00:00 2001
From: Martin Sucha
Date: Fri, 28 Jun 2019 15:17:18 +0200
Subject: [PATCH 3/3] Call updateKeyspaceMetadata in tests

This way more code is covered by tests. The tests now use
stretchr/testify as a dependency, which should be OK as it was an
indirect dependency before.
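The policy now obtains keyspace metadata through injectable functions
instead of a *Session, so tests can stub them out. A minimal sketch of
the pattern, mirroring the diff below ("myKeyspace" is just the
placeholder name the tests use):

    policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
    policyInternal := policy.(*tokenAwareHostPolicy)
    policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
    policyInternal.getKeyspaceMetadata = func(keyspace string) (*KeyspaceMetadata, error) {
        // Return canned metadata instead of querying a live cluster.
        return &KeyspaceMetadata{
            Name:          "myKeyspace",
            StrategyClass: "SimpleStrategy",
            StrategyOptions: map[string]interface{}{
                "class":              "SimpleStrategy",
                "replication_factor": 2,
            },
        }, nil
    }
    policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})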
---
 common_test.go   |   7 +++
 policies.go      |  16 +++--
 policies_test.go | 161 ++++++++++++++++++++++++++++++++---------------
 session.go       |   6 ++
 4 files changed, 131 insertions(+), 59 deletions(-)

diff --git a/common_test.go b/common_test.go
index a269101ed..348412047 100644
--- a/common_test.go
+++ b/common_test.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"log"
 	"net"
+	"reflect"
 	"strings"
 	"sync"
 	"testing"
@@ -221,6 +222,12 @@ func assertEqual(t *testing.T, description string, expected, actual interface{}) {
 	}
 }
 
+func assertDeepEqual(t *testing.T, description string, expected, actual interface{}) {
+	if !reflect.DeepEqual(expected, actual) {
+		t.Errorf("expected %s to be (%+v) but was (%+v) instead", description, expected, actual)
+	}
+}
+
 func assertNil(t *testing.T, description string, actual interface{}) {
 	if actual != nil {
 		t.Errorf("expected %s to be (nil) but was (%+v) instead", description, actual)
diff --git a/policies.go b/policies.go
index 350ac3478..e5693850b 100644
--- a/policies.go
+++ b/policies.go
@@ -427,7 +427,8 @@ type tokenAwareHostPolicy struct {
 	mu          sync.RWMutex
 	partitioner string
 	fallback    HostSelectionPolicy
-	session     *Session
+	getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error)
+	getKeyspaceName     func() string
 
 	tokenRing atomic.Value // *tokenRing
 	keyspaces atomic.Value // *keyspaceMeta
@@ -437,7 +438,8 @@ type tokenAwareHostPolicy struct {
 }
 
 func (t *tokenAwareHostPolicy) Init(s *Session) {
-	t.session = s
+	t.getKeyspaceMetadata = s.KeyspaceMetadata
+	t.getKeyspaceName = func() string { return s.cfg.Keyspace }
 }
@@ -459,7 +461,7 @@ func (t *tokenAwareHostPolicy) updateKeyspaceMetadata(keyspace string) {
 		replicas: make(map[string]map[token][]*HostInfo, size),
 	}
 
-	ks, err := t.session.KeyspaceMetadata(keyspace)
+	ks, err := t.getKeyspaceMetadata(keyspace)
 	if err == nil {
 		strat := getStrategy(ks)
 		if strat != nil {
@@ -495,16 +497,12 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) {
 
 func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
 	t.HostUp(host)
-	if t.session != nil { // disable for unit tests
-		t.updateKeyspaceMetadata(t.session.cfg.Keyspace)
-	}
+	t.updateKeyspaceMetadata(t.getKeyspaceName())
 }
 
 func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) {
 	t.HostDown(host)
-	if t.session != nil { // disable for unit tests
-		t.updateKeyspaceMetadata(t.session.cfg.Keyspace)
-	}
+	t.updateKeyspaceMetadata(t.getKeyspaceName())
 }
 
 func (t *tokenAwareHostPolicy) HostUp(host *HostInfo) {
diff --git a/policies_test.go b/policies_test.go
index 9814bd78f..b2a70ae03 100644
--- a/policies_test.go
+++ b/policies_test.go
@@ -5,6 +5,7 @@
 package gocql
 
 import (
+	"errors"
 	"fmt"
 	"net"
 	"testing"
@@ -56,8 +57,12 @@ func TestHostPolicy_RoundRobin(t *testing.T) {
 func TestHostPolicy_TokenAware(t *testing.T) {
 	policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
 	policyInternal := policy.(*tokenAwareHostPolicy)
-
+	policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
+	policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
+		return nil, errors.New("not initialized")
+	}
 	query := &Query{}
+	query.getKeyspace = func() string { return "myKeyspace" }
 
 	iter := policy.Pick(nil)
 	if iter == nil {
@@ -100,19 +105,31 @@ func TestHostPolicy_TokenAware(t *testing.T) {
 
 	policy.SetPartitioner("OrderedPartitioner")
 
-	// We need to simulate updateKeyspaceMetadata for replicas() to work.
-	// This corresponds to what simpleStrategy with rf=2 would generate.
-	newMeta := &keyspaceMeta{
-		replicas: map[string]map[token][]*HostInfo{
-			"": {
-				orderedToken("00"): {hosts[0], hosts[1]},
-				orderedToken("25"): {hosts[1], hosts[2]},
-				orderedToken("50"): {hosts[2], hosts[3]},
-				orderedToken("75"): {hosts[3], hosts[0]},
-			},
-		},
-	}
-	policyInternal.keyspaces.Store(newMeta)
+	policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
+		if keyspaceName != "myKeyspace" {
+			return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
+		}
+		return &KeyspaceMetadata{
+			Name:          "myKeyspace",
+			StrategyClass: "SimpleStrategy",
+			StrategyOptions: map[string]interface{}{
+				"class":              "SimpleStrategy",
+				"replication_factor": 2,
+			},
+		}, nil
+	}
+	policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
+
+	// The SimpleStrategy above should generate the following replicas.
+	// It's handy to have as a reference here.
+	assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
+		"myKeyspace": {
+			orderedToken("00"): {hosts[0], hosts[1]},
+			orderedToken("25"): {hosts[1], hosts[2]},
+			orderedToken("50"): {hosts[2], hosts[3]},
+			orderedToken("75"): {hosts[3], hosts[0]},
+		},
+	}, policyInternal.keyspaces.Load().(*keyspaceMeta).replicas)
 
 	// now the token ring is configured
 	query.RoutingKey([]byte("20"))
@@ -204,6 +221,11 @@ func TestHostPolicy_RoundRobin_NilHostInfo(t *testing.T) {
 
 func TestHostPolicy_TokenAware_NilHostInfo(t *testing.T) {
 	policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
+	policyInternal := policy.(*tokenAwareHostPolicy)
+	policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
+	policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
+		return nil, errors.New("not initialized")
+	}
 
 	hosts := [...]*HostInfo{
 		{connectAddress: net.IPv4(10, 0, 0, 0), tokens: []string{"00"}},
@@ -217,6 +239,7 @@ func TestHostPolicy_TokenAware_NilHostInfo(t *testing.T) {
 	policy.SetPartitioner("OrderedPartitioner")
 
 	query := &Query{}
+	query.getKeyspace = func() string { return "myKeyspace" }
 	query.RoutingKey([]byte("20"))
 
 	iter := policy.Pick(query)
@@ -444,8 +467,13 @@
 func TestHostPolicy_TokenAware_DCAwareRR(t *testing.T) {
 	policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"))
 	policyInternal := policy.(*tokenAwareHostPolicy)
+	policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
+	policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
+		return nil, errors.New("not initialized")
+	}
 
 	query := &Query{}
+	query.getKeyspace = func() string { return "myKeyspace" }
 
 	iter := policy.Pick(nil)
 	if iter == nil {
@@ -488,27 +516,41 @@ func TestHostPolicy_TokenAware_DCAwareRR(t *testing.T) {
 
 	policy.SetPartitioner("OrderedPartitioner")
 
-	// we need to simulate updateKeyspaceMetadata for replicas() to work.
-	// this should correspond to what networkTopologyStrategy with rf=1 for each DC would generate
-	newMeta := &keyspaceMeta{
-		replicas: map[string]map[token][]*HostInfo{
-			"": {
-				orderedToken("05"): {hosts[0], hosts[1], hosts[2]},
-				orderedToken("10"): {hosts[1], hosts[2], hosts[3]},
-				orderedToken("15"): {hosts[2], hosts[3], hosts[4]},
-				orderedToken("20"): {hosts[3], hosts[4], hosts[5]},
-				orderedToken("25"): {hosts[4], hosts[5], hosts[6]},
-				orderedToken("30"): {hosts[5], hosts[6], hosts[7]},
-				orderedToken("35"): {hosts[6], hosts[7], hosts[8]},
-				orderedToken("40"): {hosts[7], hosts[8], hosts[9]},
-				orderedToken("45"): {hosts[8], hosts[9], hosts[10]},
-				orderedToken("50"): {hosts[9], hosts[10], hosts[11]},
-				orderedToken("55"): {hosts[10], hosts[11], hosts[0]},
-				orderedToken("60"): {hosts[11], hosts[0], hosts[1]},
-			},
-		},
-	}
-	policyInternal.keyspaces.Store(newMeta)
+	policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
+		if keyspaceName != "myKeyspace" {
+			return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
+		}
+		return &KeyspaceMetadata{
+			Name:          "myKeyspace",
+			StrategyClass: "NetworkTopologyStrategy",
+			StrategyOptions: map[string]interface{}{
+				"class":   "NetworkTopologyStrategy",
+				"local":   1,
+				"remote1": 1,
+				"remote2": 1,
+			},
+		}, nil
+	}
+	policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
+
+	// The NetworkTopologyStrategy above should generate the following replicas.
+	// It's handy to have as a reference here.
+	assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
+		"myKeyspace": {
+			orderedToken("05"): {hosts[0], hosts[1], hosts[2]},
+			orderedToken("10"): {hosts[1], hosts[2], hosts[3]},
+			orderedToken("15"): {hosts[2], hosts[3], hosts[4]},
+			orderedToken("20"): {hosts[3], hosts[4], hosts[5]},
+			orderedToken("25"): {hosts[4], hosts[5], hosts[6]},
+			orderedToken("30"): {hosts[5], hosts[6], hosts[7]},
+			orderedToken("35"): {hosts[6], hosts[7], hosts[8]},
+			orderedToken("40"): {hosts[7], hosts[8], hosts[9]},
+			orderedToken("45"): {hosts[8], hosts[9], hosts[10]},
+			orderedToken("50"): {hosts[9], hosts[10], hosts[11]},
+			orderedToken("55"): {hosts[10], hosts[11], hosts[0]},
+			orderedToken("60"): {hosts[11], hosts[0], hosts[1]},
+		},
+	}, policyInternal.keyspaces.Load().(*keyspaceMeta).replicas)
 
 	// now the token ring is configured
 	query.RoutingKey([]byte("23"))
@@ -544,8 +586,13 @@
 func TestHostPolicy_TokenAware_DCAwareRR_NonLocalFallback(t *testing.T) {
 	policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"), NonLocalReplicasFallback())
 	policyInternal := policy.(*tokenAwareHostPolicy)
+	policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
+	policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
+		return nil, errors.New("not initialized")
+	}
 
 	query := &Query{}
+	query.getKeyspace = func() string { return "myKeyspace" }
 
 	iter := policy.Pick(nil)
 	if iter == nil {
@@ -588,27 +635,41 @@ func TestHostPolicy_TokenAware_DCAwareRR_NonLocalFallback(t *testing.T) {
 
 	policy.SetPartitioner("OrderedPartitioner")
 
-	// we need to simulate updateKeyspaceMetadata for replicas() to work.
-	// this should correspond to what networkTopologyStrategy with rf=1 for each DC would generate
-	newMeta := &keyspaceMeta{
-		replicas: map[string]map[token][]*HostInfo{
-			"": {
-				orderedToken("05"): {hosts[0], hosts[1], hosts[2]},
-				orderedToken("10"): {hosts[1], hosts[2], hosts[3]},
-				orderedToken("15"): {hosts[2], hosts[3], hosts[4]},
-				orderedToken("20"): {hosts[3], hosts[4], hosts[5]},
-				orderedToken("25"): {hosts[4], hosts[5], hosts[6]},
-				orderedToken("30"): {hosts[5], hosts[6], hosts[7]},
-				orderedToken("35"): {hosts[6], hosts[7], hosts[8]},
-				orderedToken("40"): {hosts[7], hosts[8], hosts[9]},
-				orderedToken("45"): {hosts[8], hosts[9], hosts[10]},
-				orderedToken("50"): {hosts[9], hosts[10], hosts[11]},
-				orderedToken("55"): {hosts[10], hosts[11], hosts[0]},
-				orderedToken("60"): {hosts[11], hosts[0], hosts[1]},
-			},
-		},
-	}
-	policyInternal.keyspaces.Store(newMeta)
+	policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
+		if keyspaceName != "myKeyspace" {
+			return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
+		}
+		return &KeyspaceMetadata{
+			Name:          "myKeyspace",
+			StrategyClass: "NetworkTopologyStrategy",
+			StrategyOptions: map[string]interface{}{
+				"class":   "NetworkTopologyStrategy",
+				"local":   1,
+				"remote1": 1,
+				"remote2": 1,
+			},
+		}, nil
+	}
+	policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
+
+	// The NetworkTopologyStrategy above should generate the following replicas.
+	// It's handy to have as a reference here.
+	assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
+		"myKeyspace": {
+			orderedToken("05"): {hosts[0], hosts[1], hosts[2]},
+			orderedToken("10"): {hosts[1], hosts[2], hosts[3]},
+			orderedToken("15"): {hosts[2], hosts[3], hosts[4]},
+			orderedToken("20"): {hosts[3], hosts[4], hosts[5]},
+			orderedToken("25"): {hosts[4], hosts[5], hosts[6]},
+			orderedToken("30"): {hosts[5], hosts[6], hosts[7]},
+			orderedToken("35"): {hosts[6], hosts[7], hosts[8]},
+			orderedToken("40"): {hosts[7], hosts[8], hosts[9]},
+			orderedToken("45"): {hosts[8], hosts[9], hosts[10]},
+			orderedToken("50"): {hosts[9], hosts[10], hosts[11]},
+			orderedToken("55"): {hosts[10], hosts[11], hosts[0]},
+			orderedToken("60"): {hosts[11], hosts[0], hosts[1]},
+		},
+	}, policyInternal.keyspaces.Load().(*keyspaceMeta).replicas)
 
 	// now the token ring is configured
 	query.RoutingKey([]byte("18"))
diff --git a/session.go b/session.go
index 225bcab86..488f54dff 100644
--- a/session.go
+++ b/session.go
@@ -689,6 +689,9 @@ type Query struct {
 	metrics *queryMetrics
 
 	disableAutoPage bool
+
+	// getKeyspace is a field so that it can be overridden in tests
+	getKeyspace func() string
 }
 
 func (q *Query) defaultsFromSession() {
@@ -909,6 +912,9 @@ func (q *Query) retryPolicy() RetryPolicy {
 
 // Keyspace returns the keyspace the query will be executed against.
 func (q *Query) Keyspace() string {
+	if q.getKeyspace != nil {
+		return q.getKeyspace()
+	}
 	if q.session == nil {
 		return ""
 	}
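For reference, an application opts into the behavior added in patch 2
when building its cluster configuration. A minimal sketch (the contact
point, DC name, and keyspace are placeholders):

    package main

    import "github.com/gocql/gocql"

    func main() {
        cluster := gocql.NewCluster("10.0.0.1")
        cluster.Keyspace = "myKeyspace"
        // Try token replicas in the local DC, then token replicas in
        // remote DCs, and only then the remaining local-DC hosts.
        cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(
            gocql.DCAwareRoundRobinPolicy("local"),
            gocql.NonLocalReplicasFallback(),
        )
        session, err := cluster.CreateSession()
        if err != nil {
            panic(err)
        }
        defer session.Close()
    }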