From c44e5dd0a4897a503a4c0e8f23696e0018faa131 Mon Sep 17 00:00:00 2001 From: Nikos Angelopoulos Date: Mon, 25 Nov 2024 14:54:39 +0100 Subject: [PATCH 1/5] feat(ha_tracker): make replicadesc mergeable --- pkg/distributor/ha_tracker.go | 77 +++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/pkg/distributor/ha_tracker.go b/pkg/distributor/ha_tracker.go index 2303bd08ac..4b987420ca 100644 --- a/pkg/distributor/ha_tracker.go +++ b/pkg/distributor/ha_tracker.go @@ -21,6 +21,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/grafana/dskit/kv" "github.com/grafana/dskit/kv/codec" + "github.com/grafana/dskit/kv/memberlist" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -60,6 +61,82 @@ func NewReplicaDesc() *ReplicaDesc { return &ReplicaDesc{} } +func (r *ReplicaDesc) Merge(other memberlist.Mergeable, localCAS bool) (change memberlist.Mergeable, error error) { + return r.mergeWithTime(other, localCAS) +} + +func (r *ReplicaDesc) mergeWithTime(mergeable memberlist.Mergeable, _ bool) (memberlist.Mergeable, error) { + if mergeable == nil { + return nil, nil + } + + other, ok := mergeable.(*ReplicaDesc) + if !ok { + return nil, fmt.Errorf("expected *distributor.ReplicaDesc, got %T", mergeable) + } + + if other == nil { + return nil, nil + } + + thisRDesc := r + otherRDesc := other + // Track changes + changed := false + + // Keeping the one with the most recent Received At. + // Even if the Replica is different + if otherRDesc.ReceivedAt > thisRDesc.ReceivedAt { + *r = *other + changed = true + } else if thisRDesc.ReceivedAt == otherRDesc.ReceivedAt && thisRDesc.DeletedAt == 0 && otherRDesc.DeletedAt != 0 { + if thisRDesc.Replica == otherRDesc.Replica { + *r = *other + changed = true + } + } + + // No changes - return + if !changed { + return nil, nil + } + + out := NewReplicaDesc() + *out = *thisRDesc + return out, nil +} + +// MergeContent with the current approach will return always 1, with the current value +// Questions to be answered: +// 1. What if r.ReplicaDesc is empty string: answer its gonna return an empty list of strings +func (r *ReplicaDesc) MergeContent() []string { + result := []string(nil) + if len(r.Replica) != 0 { + result = append(result, r.Replica) + } + return result +} + +// RemoveTombstones noOp for now +// Questions +// 1. How should we handle deletion for one entry (ReplicaDesc)? +func (r *ReplicaDesc) RemoveTombstones(limit time.Time) (total, removed int) { + if r.DeletedAt > 0 { + if limit.IsZero() || time.Unix(r.DeletedAt, 0).Before(limit) { + // need to implement the remove logic + removed = 1 + } else { + total = 1 + } + } + return +} + +// Clone returns a deep copy of the ring state. +func (r *ReplicaDesc) Clone() memberlist.Mergeable { + return proto.Clone(r).(*ReplicaDesc) +} + // HATrackerConfig contains the configuration required to // create an HA Tracker. type HATrackerConfig struct { From 2789864f7312df1cb2ecd9433a2880bcbf201426 Mon Sep 17 00:00:00 2001 From: Nikos Angelopoulos Date: Mon, 25 Nov 2024 14:55:07 +0100 Subject: [PATCH 2/5] test: add test with Memberlist as a KVStore --- pkg/distributor/ha_tracker_test.go | 107 +++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/pkg/distributor/ha_tracker_test.go b/pkg/distributor/ha_tracker_test.go index c950f92f80..28e40ba84a 100644 --- a/pkg/distributor/ha_tracker_test.go +++ b/pkg/distributor/ha_tracker_test.go @@ -8,15 +8,19 @@ package distributor import ( "context" "fmt" + "net" "os" "strings" + "sync" "testing" "time" "github.com/go-kit/log" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/kv" + "github.com/grafana/dskit/kv/codec" "github.com/grafana/dskit/kv/consul" + "github.com/grafana/dskit/kv/memberlist" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" "github.com/grafana/dskit/test" @@ -33,6 +37,33 @@ import ( utiltest "github.com/grafana/mimir/pkg/util/test" ) +var addrsOnce sync.Once +var localhostIP string + +func getLocalhostAddrs() []string { + addrsOnce.Do(func() { + ip, err := net.ResolveIPAddr("ip4", "localhost") + if err != nil { + localhostIP = "127.0.0.1" // this is the most common answer, try it + } + localhostIP = ip.String() + }) + return []string{localhostIP} +} + +type dnsProviderMock struct { + resolved []string +} + +func (p *dnsProviderMock) Resolve(_ context.Context, addrs []string) error { + p.resolved = addrs + return nil +} + +func (p dnsProviderMock) Addresses() []string { + return p.resolved +} + func checkReplicaTimestamp(t *testing.T, duration time.Duration, c *defaultHaTracker, user, cluster, replica string, expected time.Time, elected time.Time) { t.Helper() @@ -70,6 +101,82 @@ func checkReplicaTimestamp(t *testing.T, duration time.Duration, c *defaultHaTra }) } +func TestHaTrackerWithMemberList(t *testing.T) { + var config memberlist.KVConfig + + const cluster = "cluster" + const replica1 = "r1" + const replica2 = "r2" + + flagext.DefaultValues(&config) + ctx := context.Background() + + config.TCPTransport = memberlist.TCPTransportConfig{ + BindAddrs: getLocalhostAddrs(), + BindPort: 0, + } + + config.Codecs = []codec.Codec{ + GetReplicaDescCodec(), + } + + memberListSvc := memberlist.NewKVInitService( + &config, + log.NewNopLogger(), + &dnsProviderMock{}, + prometheus.NewPedanticRegistry(), + ) + + t.Cleanup(func() { + assert.NoError(t, services.StopAndAwaitTerminated(ctx, memberListSvc)) + }) + + c, err := newHaTracker(HATrackerConfig{ + EnableHATracker: true, + KVStore: kv.Config{Store: "memberlist", StoreConfig: kv.StoreConfig{ + MemberlistKV: memberListSvc.GetMemberlistKV, + }}, + UpdateTimeout: time.Millisecond * 100, + UpdateTimeoutJitterMax: 0, + FailoverTimeout: time.Millisecond * 2, + }, trackerLimits{maxClusters: 100}, nil, log.NewNopLogger()) + require.NoError(t, services.StartAndAwaitRunning(ctx, c)) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, services.StopAndAwaitTerminated(ctx, c)) + }) + + now := time.Now() + + // Write the first time. + err = c.checkReplica(context.Background(), "user", cluster, replica1, now) + assert.NoError(t, err) + + // Throw away a sample from replica2. + err = c.checkReplica(context.Background(), "user", cluster, replica2, now) + assert.Error(t, err) + + // Wait more than the overwrite timeout. + now = now.Add(1100 * time.Millisecond) + + // Another sample from replica2 to update its timestamp. + err = c.checkReplica(context.Background(), "user", cluster, replica2, now) + assert.Error(t, err) + + // Update KVStore - this should elect replica 2. + c.updateKVStoreAll(context.Background(), now) + + checkReplicaTimestamp(t, time.Second, c, "user", cluster, replica2, now, now) + + // Now we should accept from replica 2. + err = c.checkReplica(context.Background(), "user", cluster, replica2, now) + assert.NoError(t, err) + + // We timed out accepting samples from replica 1 and should now reject them. + err = c.checkReplica(context.Background(), "user", cluster, replica1, now) + assert.Error(t, err) +} + func TestHATrackerCacheSyncOnStart(t *testing.T) { const cluster = "c1" const replicaOne = "r1" From 116b7aaab8b30f69321466fd165859ee6898c097 Mon Sep 17 00:00:00 2001 From: Nikos Angelopoulos Date: Wed, 27 Nov 2024 15:28:42 +0100 Subject: [PATCH 3/5] refactor: merge logic is taking into account ElectedAt for different replicas --- pkg/distributor/ha_tracker.go | 38 ++++++---- pkg/distributor/ha_tracker_test.go | 114 +++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 15 deletions(-) diff --git a/pkg/distributor/ha_tracker.go b/pkg/distributor/ha_tracker.go index 4b987420ca..58aac1b0a9 100644 --- a/pkg/distributor/ha_tracker.go +++ b/pkg/distributor/ha_tracker.go @@ -61,11 +61,11 @@ func NewReplicaDesc() *ReplicaDesc { return &ReplicaDesc{} } -func (r *ReplicaDesc) Merge(other memberlist.Mergeable, localCAS bool) (change memberlist.Mergeable, error error) { - return r.mergeWithTime(other, localCAS) +func (r *ReplicaDesc) Merge(other memberlist.Mergeable, _ bool) (change memberlist.Mergeable, error error) { + return r.mergeWithTime(other) } -func (r *ReplicaDesc) mergeWithTime(mergeable memberlist.Mergeable, _ bool) (memberlist.Mergeable, error) { +func (r *ReplicaDesc) mergeWithTime(mergeable memberlist.Mergeable) (memberlist.Mergeable, error) { if mergeable == nil { return nil, nil } @@ -81,19 +81,31 @@ func (r *ReplicaDesc) mergeWithTime(mergeable memberlist.Mergeable, _ bool) (mem thisRDesc := r otherRDesc := other + // Track changes changed := false - // Keeping the one with the most recent Received At. - // Even if the Replica is different - if otherRDesc.ReceivedAt > thisRDesc.ReceivedAt { - *r = *other - changed = true - } else if thisRDesc.ReceivedAt == otherRDesc.ReceivedAt && thisRDesc.DeletedAt == 0 && otherRDesc.DeletedAt != 0 { - if thisRDesc.Replica == otherRDesc.Replica { + if otherRDesc.Replica == thisRDesc.Replica { + // Keeping the one with the most recent receivedAt timestamp + if otherRDesc.ReceivedAt > thisRDesc.ReceivedAt { + *r = *other + changed = true + } else if thisRDesc.ReceivedAt == otherRDesc.ReceivedAt && thisRDesc.DeletedAt == 0 && otherRDesc.DeletedAt != 0 { *r = *other changed = true } + } else { + // keep the most recent Elected to reach consistency + if otherRDesc.ElectedAt > thisRDesc.ElectedAt { + *r = *other + changed = true + } else if otherRDesc.ElectedAt == thisRDesc.ElectedAt { + // if the timestamps are equal we compare receivedAt + if otherRDesc.ReceivedAt > thisRDesc.ReceivedAt { + *r = *other + changed = true + } + } } // No changes - return @@ -106,9 +118,7 @@ func (r *ReplicaDesc) mergeWithTime(mergeable memberlist.Mergeable, _ bool) (mem return out, nil } -// MergeContent with the current approach will return always 1, with the current value -// Questions to be answered: -// 1. What if r.ReplicaDesc is empty string: answer its gonna return an empty list of strings +// MergeContent noOp currently func (r *ReplicaDesc) MergeContent() []string { result := []string(nil) if len(r.Replica) != 0 { @@ -118,8 +128,6 @@ func (r *ReplicaDesc) MergeContent() []string { } // RemoveTombstones noOp for now -// Questions -// 1. How should we handle deletion for one entry (ReplicaDesc)? func (r *ReplicaDesc) RemoveTombstones(limit time.Time) (total, removed int) { if r.DeletedAt > 0 { if limit.IsZero() || time.Unix(r.DeletedAt, 0).Before(limit) { diff --git a/pkg/distributor/ha_tracker_test.go b/pkg/distributor/ha_tracker_test.go index 28e40ba84a..85995a12ea 100644 --- a/pkg/distributor/ha_tracker_test.go +++ b/pkg/distributor/ha_tracker_test.go @@ -101,6 +101,120 @@ func checkReplicaTimestamp(t *testing.T, duration time.Duration, c *defaultHaTra }) } +func merge(r1, r2 *ReplicaDesc) (*ReplicaDesc, *ReplicaDesc) { + change, err := r1.Merge(r2, false) + if err != nil { + panic(err) + } + + if change == nil { + return r1, nil + } + + changeRDesc := change.(*ReplicaDesc) + return r1, changeRDesc +} + +func TestReplicaDescMerge(t *testing.T) { + now := time.Now().Unix() + + const ( + replica1 = "r1" + replica2 = "r2" + replica3 = "r3" + ) + + firstReplica := func() *ReplicaDesc { + return &ReplicaDesc{ + Replica: replica1, + ReceivedAt: now, + DeletedAt: 0, + ElectedAt: now, + ElectedChanges: 1, + } + } + + firstReplicaWithHigherReceivedAt := func() *ReplicaDesc { + return &ReplicaDesc{ + Replica: replica1, + ReceivedAt: now + 5, + DeletedAt: 0, + ElectedAt: now, + ElectedChanges: 1, + } + } + + secondReplica := func() *ReplicaDesc { + return &ReplicaDesc{ + Replica: replica2, + ReceivedAt: now, + DeletedAt: 0, + ElectedAt: now + 5, + ElectedChanges: 2, + } + } + + thirdReplica := func() *ReplicaDesc { + return &ReplicaDesc{ + Replica: replica3, + ReceivedAt: now, + DeletedAt: 0, + ElectedAt: now + 10, + ElectedChanges: 3, + } + } + + expectedFirstAndFirstHigherReceivedAtMerge := func() *ReplicaDesc { + return &ReplicaDesc{ + Replica: replica1, + ReceivedAt: now + 5, + DeletedAt: 0, + ElectedAt: now, + ElectedChanges: 1, + } + } + + expectedFirstSecondThirdAtMerge := func() *ReplicaDesc { + return &ReplicaDesc{ + Replica: replica3, + ReceivedAt: now, + DeletedAt: 0, + ElectedAt: now + 10, + ElectedChanges: 3, + } + } + + { + ours, ch := merge(firstReplica(), firstReplicaWithHigherReceivedAt()) + assert.Equal(t, expectedFirstAndFirstHigherReceivedAtMerge(), ours) + assert.Equal(t, expectedFirstAndFirstHigherReceivedAtMerge(), ch) + } + + { // idempotency: (no change after applying same Replica again) + ours, ch := merge(expectedFirstAndFirstHigherReceivedAtMerge(), firstReplica()) + assert.Equal(t, expectedFirstAndFirstHigherReceivedAtMerge(), ours) + assert.Equal(t, (*ReplicaDesc)(nil), ch) + } + + { // commutativity: Merge(firstReplicaWithHigherReceivedAt, first) == Merge(first, firstReplicaWithHigherReceivedAt) + our, ch := merge(firstReplicaWithHigherReceivedAt(), firstReplica()) + assert.Equal(t, expectedFirstAndFirstHigherReceivedAtMerge(), our) + // change is nil in this case, since the incoming ReplicaDesc has lower receivedAt timestamp + assert.Equal(t, (*ReplicaDesc)(nil), ch) + } + + { // associativity: Merge(Merge(first, second), third) == Merge(first, Merge(second, third)) + ours1, _ := merge(firstReplica(), secondReplica()) + ours1, _ = merge(ours1, thirdReplica()) + assert.Equal(t, expectedFirstSecondThirdAtMerge(), ours1) + + ours2, _ := merge(secondReplica(), thirdReplica()) + ours2, _ = merge(ours2, firstReplica()) + assert.Equal(t, expectedFirstSecondThirdAtMerge(), ours2) + } + +} + func TestHaTrackerWithMemberList(t *testing.T) { var config memberlist.KVConfig From 6cfcd6362e5ac3911cc1a279d7d6c4fba246ead4 Mon Sep 17 00:00:00 2001 From: Nikos Angelopoulos Date: Wed, 27 Nov 2024 17:04:21 +0100 Subject: [PATCH 4/5] tests: utilize t.Run for tests and add failover constant --- pkg/distributor/ha_tracker.go | 17 ++--- pkg/distributor/ha_tracker_test.go | 106 +++++++++++++++++------------ 2 files changed, 68 insertions(+), 55 deletions(-) diff --git a/pkg/distributor/ha_tracker.go b/pkg/distributor/ha_tracker.go index 58aac1b0a9..54578c31c2 100644 --- a/pkg/distributor/ha_tracker.go +++ b/pkg/distributor/ha_tracker.go @@ -82,34 +82,27 @@ func (r *ReplicaDesc) mergeWithTime(mergeable memberlist.Mergeable) (memberlist. thisRDesc := r otherRDesc := other - // Track changes - changed := false - if otherRDesc.Replica == thisRDesc.Replica { // Keeping the one with the most recent receivedAt timestamp if otherRDesc.ReceivedAt > thisRDesc.ReceivedAt { - *r = *other - changed = true + *thisRDesc = *other } else if thisRDesc.ReceivedAt == otherRDesc.ReceivedAt && thisRDesc.DeletedAt == 0 && otherRDesc.DeletedAt != 0 { - *r = *other - changed = true + *thisRDesc = *other } } else { // keep the most recent Elected to reach consistency if otherRDesc.ElectedAt > thisRDesc.ElectedAt { - *r = *other - changed = true + *thisRDesc = *other } else if otherRDesc.ElectedAt == thisRDesc.ElectedAt { // if the timestamps are equal we compare receivedAt if otherRDesc.ReceivedAt > thisRDesc.ReceivedAt { - *r = *other - changed = true + *thisRDesc = *other } } } // No changes - return - if !changed { + if *thisRDesc != *otherRDesc { return nil, nil } diff --git a/pkg/distributor/ha_tracker_test.go b/pkg/distributor/ha_tracker_test.go index 85995a12ea..ea42f0081a 100644 --- a/pkg/distributor/ha_tracker_test.go +++ b/pkg/distributor/ha_tracker_test.go @@ -114,7 +114,6 @@ func merge(r1, r2 *ReplicaDesc) (*ReplicaDesc, *ReplicaDesc) { changeRDesc := change.(*ReplicaDesc) return r1, changeRDesc } - func TestReplicaDescMerge(t *testing.T) { now := time.Now().Unix() @@ -174,53 +173,74 @@ func TestReplicaDescMerge(t *testing.T) { } } - expectedFirstSecondThirdAtMerge := func() *ReplicaDesc { - return &ReplicaDesc{ - Replica: replica3, - ReceivedAt: now, - DeletedAt: 0, - ElectedAt: now + 10, - ElectedChanges: 3, - } - } - - { - ours, ch := merge(firstReplica(), firstReplicaWithHigherReceivedAt()) - assert.Equal(t, expectedFirstAndFirstHigherReceivedAtMerge(), ours) - assert.Equal(t, expectedFirstAndFirstHigherReceivedAtMerge(), ch) - } - - { // idempotency: (no change after applying same Replica again) - ours, ch := merge(expectedFirstAndFirstHigherReceivedAtMerge(), firstReplica()) - assert.Equal(t, expectedFirstAndFirstHigherReceivedAtMerge(), ours) - assert.Equal(t, (*ReplicaDesc)(nil), ch) - } - - { // commutativity: Merge(firstReplicaWithHigherReceivedAt, first) == Merge(first, firstReplicaWithHigherReceivedAt) - our, ch := merge(firstReplicaWithHigherReceivedAt(), firstReplica()) - assert.Equal(t, expectedFirstAndFirstHigherReceivedAtMerge(), our) - // change is nil in this case, since the incoming ReplicaDesc has lower receivedAt timestamp - assert.Equal(t, (*ReplicaDesc)(nil), ch) + tests := []struct { + name string + rDesc1 *ReplicaDesc + rDesc2 *ReplicaDesc + expectedOurs *ReplicaDesc + expectedChange *ReplicaDesc + }{ + { + name: "simple merge: firstReplica and firstReplicaWithHigherReceivedAt", + rDesc1: firstReplica(), + rDesc2: firstReplicaWithHigherReceivedAt(), + expectedOurs: expectedFirstAndFirstHigherReceivedAtMerge(), + expectedChange: expectedFirstAndFirstHigherReceivedAtMerge(), + }, + { + name: "idempotency: no change after applying same Replica again", + rDesc1: expectedFirstAndFirstHigherReceivedAtMerge(), + rDesc2: firstReplica(), + expectedOurs: expectedFirstAndFirstHigherReceivedAtMerge(), + expectedChange: nil, + }, + { + name: "commutativity: Merge(firstReplicaWithHigherReceivedAt, first) == Merge(first, firstReplicaWithHigherReceivedAt)", + rDesc1: firstReplicaWithHigherReceivedAt(), + rDesc2: firstReplica(), + expectedOurs: func() *ReplicaDesc { + expected, _ := merge(firstReplica(), firstReplicaWithHigherReceivedAt()) + return expected + }(), + expectedChange: nil, + }, + { + name: "associativity: Merge(Merge(first, second), third) == Merge(first, Merge(second, third))", + rDesc1: func() *ReplicaDesc { + ours1, _ := merge(firstReplica(), secondReplica()) + ours1, _ = merge(ours1, thirdReplica()) + return ours1 + }(), + rDesc2: nil, + expectedOurs: func() *ReplicaDesc { + ours2, _ := merge(secondReplica(), thirdReplica()) + ours2, _ = merge(ours2, firstReplica()) + return ours2 + }(), + expectedChange: nil, + }, } - { // associativity: Merge(Merge(first, second), third) == Merge(first, Merge(second, third)) - ours1, _ := merge(firstReplica(), secondReplica()) - ours1, _ = merge(ours1, thirdReplica()) - assert.Equal(t, expectedFirstSecondThirdAtMerge(), ours1) - - ours2, _ := merge(secondReplica(), thirdReplica()) - ours2, _ = merge(ours2, firstReplica()) - assert.Equal(t, expectedFirstSecondThirdAtMerge(), ours2) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ours, ch := merge(tt.rDesc1, tt.rDesc2) + assert.Equal(t, tt.expectedOurs, ours) + assert.Equal(t, tt.expectedChange, ch) + }) } - } func TestHaTrackerWithMemberList(t *testing.T) { var config memberlist.KVConfig - const cluster = "cluster" - const replica1 = "r1" - const replica2 = "r2" + const ( + cluster = "cluster" + replica1 = "r1" + replica2 = "r2" + updateTimeout = time.Millisecond * 100 + failoverTimeout = 2 * time.Millisecond + failoverTimeoutPlus100ms = failoverTimeout + 100*time.Millisecond + ) flagext.DefaultValues(&config) ctx := context.Background() @@ -250,9 +270,9 @@ func TestHaTrackerWithMemberList(t *testing.T) { KVStore: kv.Config{Store: "memberlist", StoreConfig: kv.StoreConfig{ MemberlistKV: memberListSvc.GetMemberlistKV, }}, - UpdateTimeout: time.Millisecond * 100, + UpdateTimeout: updateTimeout, UpdateTimeoutJitterMax: 0, - FailoverTimeout: time.Millisecond * 2, + FailoverTimeout: failoverTimeout, }, trackerLimits{maxClusters: 100}, nil, log.NewNopLogger()) require.NoError(t, services.StartAndAwaitRunning(ctx, c)) require.NoError(t, err) @@ -271,7 +291,7 @@ func TestHaTrackerWithMemberList(t *testing.T) { assert.Error(t, err) // Wait more than the overwrite timeout. - now = now.Add(1100 * time.Millisecond) + now = now.Add(time.Millisecond * failoverTimeoutPlus100ms) // Another sample from replica2 to update its timestamp. err = c.checkReplica(context.Background(), "user", cluster, replica2, now) From 2ceeadc8c9dcb16ebd9540b074022255be21c72f Mon Sep 17 00:00:00 2001 From: Nikos Angelopoulos Date: Thu, 28 Nov 2024 16:24:40 +0100 Subject: [PATCH 5/5] tests: improve testcases for TestReplicaDescMerge --- pkg/distributor/ha_tracker.go | 51 ++++++++++++-------------- pkg/distributor/ha_tracker_test.go | 58 ++++++++++++++++++++---------- 2 files changed, 61 insertions(+), 48 deletions(-) diff --git a/pkg/distributor/ha_tracker.go b/pkg/distributor/ha_tracker.go index 54578c31c2..85c0a346d7 100644 --- a/pkg/distributor/ha_tracker.go +++ b/pkg/distributor/ha_tracker.go @@ -61,6 +61,9 @@ func NewReplicaDesc() *ReplicaDesc { return &ReplicaDesc{} } +// Merge merges other ReplicaDesc into this one. +// The decision is made based on the ReceivedAt timestamp, if the Replica name is the same and at the ElectedAt if the +// Replica name is different func (r *ReplicaDesc) Merge(other memberlist.Mergeable, _ bool) (change memberlist.Mergeable, error error) { return r.mergeWithTime(other) } @@ -79,61 +82,51 @@ func (r *ReplicaDesc) mergeWithTime(mergeable memberlist.Mergeable) (memberlist. return nil, nil } - thisRDesc := r - otherRDesc := other - - if otherRDesc.Replica == thisRDesc.Replica { + if other.Replica == r.Replica { // Keeping the one with the most recent receivedAt timestamp - if otherRDesc.ReceivedAt > thisRDesc.ReceivedAt { - *thisRDesc = *other - } else if thisRDesc.ReceivedAt == otherRDesc.ReceivedAt && thisRDesc.DeletedAt == 0 && otherRDesc.DeletedAt != 0 { - *thisRDesc = *other + if other.ReceivedAt > r.ReceivedAt { + *r = *other + } else if r.ReceivedAt == other.ReceivedAt && r.DeletedAt == 0 && other.DeletedAt != 0 { + *r = *other } } else { // keep the most recent Elected to reach consistency - if otherRDesc.ElectedAt > thisRDesc.ElectedAt { - *thisRDesc = *other - } else if otherRDesc.ElectedAt == thisRDesc.ElectedAt { + if other.ElectedAt > r.ElectedAt { + *r = *other + } else if other.ElectedAt == r.ElectedAt { // if the timestamps are equal we compare receivedAt - if otherRDesc.ReceivedAt > thisRDesc.ReceivedAt { - *thisRDesc = *other + if other.ReceivedAt > r.ReceivedAt { + *r = *other } } } - // No changes - return - if *thisRDesc != *otherRDesc { + // No changes + if *r != *other { return nil, nil } out := NewReplicaDesc() - *out = *thisRDesc + *out = *r return out, nil } -// MergeContent noOp currently +// MergeContent describes content of this Mergeable. +// Given that ReplicaDesc can have only one instance at a time, it returns the ReplicaDesc it contains. func (r *ReplicaDesc) MergeContent() []string { result := []string(nil) if len(r.Replica) != 0 { - result = append(result, r.Replica) + result = append(result, r.String()) } return result } -// RemoveTombstones noOp for now -func (r *ReplicaDesc) RemoveTombstones(limit time.Time) (total, removed int) { - if r.DeletedAt > 0 { - if limit.IsZero() || time.Unix(r.DeletedAt, 0).Before(limit) { - // need to implement the remove logic - removed = 1 - } else { - total = 1 - } - } +// RemoveTombstones noOp. +func (r *ReplicaDesc) RemoveTombstones(_ time.Time) (total, removed int) { return } -// Clone returns a deep copy of the ring state. +// Clone returns a deep copy of the ReplicaDesc. func (r *ReplicaDesc) Clone() memberlist.Mergeable { return proto.Clone(r).(*ReplicaDesc) } diff --git a/pkg/distributor/ha_tracker_test.go b/pkg/distributor/ha_tracker_test.go index ea42f0081a..9d4c41a9eb 100644 --- a/pkg/distributor/ha_tracker_test.go +++ b/pkg/distributor/ha_tracker_test.go @@ -173,36 +173,56 @@ func TestReplicaDescMerge(t *testing.T) { } } - tests := []struct { + expectedFirstAndSecondMerge := func() *ReplicaDesc { + return &ReplicaDesc{ + Replica: replica2, + ReceivedAt: now, + DeletedAt: 0, + ElectedAt: now + 5, + ElectedChanges: 2, + } + } + + testsMerge := []struct { name string rDesc1 *ReplicaDesc rDesc2 *ReplicaDesc - expectedOurs *ReplicaDesc + expectedRDesc *ReplicaDesc expectedChange *ReplicaDesc }{ { - name: "simple merge: firstReplica and firstReplicaWithHigherReceivedAt", + name: "Merge ReplicaDesc: Same replica name, different receivedAt should return ReplicaDesc with most recent receivedAt timestamp", rDesc1: firstReplica(), rDesc2: firstReplicaWithHigherReceivedAt(), - expectedOurs: expectedFirstAndFirstHigherReceivedAtMerge(), + expectedRDesc: expectedFirstAndFirstHigherReceivedAtMerge(), expectedChange: expectedFirstAndFirstHigherReceivedAtMerge(), }, { - name: "idempotency: no change after applying same Replica again", - rDesc1: expectedFirstAndFirstHigherReceivedAtMerge(), + name: "Merge ReplicaDesc: Different replica name, different electedAt should return ReplicaDesc with most recent electedAt timestamp", + rDesc1: firstReplica(), + rDesc2: secondReplica(), + expectedRDesc: expectedFirstAndSecondMerge(), + expectedChange: expectedFirstAndSecondMerge(), + }, + { + name: "idempotency: no change after applying same ReplicaDesc again.", + rDesc1: func() *ReplicaDesc { + out, _ := merge(firstReplica(), secondReplica()) + return out + }(), rDesc2: firstReplica(), - expectedOurs: expectedFirstAndFirstHigherReceivedAtMerge(), + expectedRDesc: expectedFirstAndSecondMerge(), expectedChange: nil, }, { - name: "commutativity: Merge(firstReplicaWithHigherReceivedAt, first) == Merge(first, firstReplicaWithHigherReceivedAt)", - rDesc1: firstReplicaWithHigherReceivedAt(), - rDesc2: firstReplica(), - expectedOurs: func() *ReplicaDesc { - expected, _ := merge(firstReplica(), firstReplicaWithHigherReceivedAt()) + name: "commutativity: Merge(first, second) == Merge(second, first)", + rDesc1: firstReplica(), + rDesc2: secondReplica(), + expectedRDesc: func() *ReplicaDesc { + expected, _ := merge(secondReplica(), firstReplica()) return expected }(), - expectedChange: nil, + expectedChange: expectedFirstAndSecondMerge(), }, { name: "associativity: Merge(Merge(first, second), third) == Merge(first, Merge(second, third))", @@ -212,19 +232,19 @@ func TestReplicaDescMerge(t *testing.T) { return ours1 }(), rDesc2: nil, - expectedOurs: func() *ReplicaDesc { + expectedRDesc: func() *ReplicaDesc { ours2, _ := merge(secondReplica(), thirdReplica()) - ours2, _ = merge(ours2, firstReplica()) + ours2, _ = merge(firstReplica(), ours2) return ours2 }(), expectedChange: nil, }, } - for _, tt := range tests { + for _, tt := range testsMerge { t.Run(tt.name, func(t *testing.T) { - ours, ch := merge(tt.rDesc1, tt.rDesc2) - assert.Equal(t, tt.expectedOurs, ours) + rDesc, ch := merge(tt.rDesc1, tt.rDesc2) + assert.Equal(t, tt.expectedRDesc, rDesc) assert.Equal(t, tt.expectedChange, ch) }) } @@ -291,7 +311,7 @@ func TestHaTrackerWithMemberList(t *testing.T) { assert.Error(t, err) // Wait more than the overwrite timeout. - now = now.Add(time.Millisecond * failoverTimeoutPlus100ms) + now = now.Add(failoverTimeoutPlus100ms) // Another sample from replica2 to update its timestamp. err = c.checkReplica(context.Background(), "user", cluster, replica2, now)