diff --git a/pkg/distributor/ha_tracker.go b/pkg/distributor/ha_tracker.go index 2303bd08ac..85c0a346d7 100644 --- a/pkg/distributor/ha_tracker.go +++ b/pkg/distributor/ha_tracker.go @@ -21,6 +21,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/grafana/dskit/kv" "github.com/grafana/dskit/kv/codec" + "github.com/grafana/dskit/kv/memberlist" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -60,6 +61,76 @@ func NewReplicaDesc() *ReplicaDesc { return &ReplicaDesc{} } +// Merge merges other ReplicaDesc into this one. +// The decision is made based on the ReceivedAt timestamp, if the Replica name is the same and at the ElectedAt if the +// Replica name is different +func (r *ReplicaDesc) Merge(other memberlist.Mergeable, _ bool) (change memberlist.Mergeable, error error) { + return r.mergeWithTime(other) +} + +func (r *ReplicaDesc) mergeWithTime(mergeable memberlist.Mergeable) (memberlist.Mergeable, error) { + if mergeable == nil { + return nil, nil + } + + other, ok := mergeable.(*ReplicaDesc) + if !ok { + return nil, fmt.Errorf("expected *distributor.ReplicaDesc, got %T", mergeable) + } + + if other == nil { + return nil, nil + } + + if other.Replica == r.Replica { + // Keeping the one with the most recent receivedAt timestamp + if other.ReceivedAt > r.ReceivedAt { + *r = *other + } else if r.ReceivedAt == other.ReceivedAt && r.DeletedAt == 0 && other.DeletedAt != 0 { + *r = *other + } + } else { + // keep the most recent Elected to reach consistency + if other.ElectedAt > r.ElectedAt { + *r = *other + } else if other.ElectedAt == r.ElectedAt { + // if the timestamps are equal we compare receivedAt + if other.ReceivedAt > r.ReceivedAt { + *r = *other + } + } + } + + // No changes + if *r != *other { + return nil, nil + } + + out := NewReplicaDesc() + *out = *r + return out, nil +} + +// MergeContent describes content of this Mergeable. +// Given that ReplicaDesc can have only one instance at a time, it returns the ReplicaDesc it contains. +func (r *ReplicaDesc) MergeContent() []string { + result := []string(nil) + if len(r.Replica) != 0 { + result = append(result, r.String()) + } + return result +} + +// RemoveTombstones noOp. +func (r *ReplicaDesc) RemoveTombstones(_ time.Time) (total, removed int) { + return +} + +// Clone returns a deep copy of the ReplicaDesc. +func (r *ReplicaDesc) Clone() memberlist.Mergeable { + return proto.Clone(r).(*ReplicaDesc) +} + // HATrackerConfig contains the configuration required to // create an HA Tracker. type HATrackerConfig struct { diff --git a/pkg/distributor/ha_tracker_test.go b/pkg/distributor/ha_tracker_test.go index c950f92f80..9d4c41a9eb 100644 --- a/pkg/distributor/ha_tracker_test.go +++ b/pkg/distributor/ha_tracker_test.go @@ -8,15 +8,19 @@ package distributor import ( "context" "fmt" + "net" "os" "strings" + "sync" "testing" "time" "github.com/go-kit/log" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/kv" + "github.com/grafana/dskit/kv/codec" "github.com/grafana/dskit/kv/consul" + "github.com/grafana/dskit/kv/memberlist" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" "github.com/grafana/dskit/test" @@ -33,6 +37,33 @@ import ( utiltest "github.com/grafana/mimir/pkg/util/test" ) +var addrsOnce sync.Once +var localhostIP string + +func getLocalhostAddrs() []string { + addrsOnce.Do(func() { + ip, err := net.ResolveIPAddr("ip4", "localhost") + if err != nil { + localhostIP = "127.0.0.1" // this is the most common answer, try it + } + localhostIP = ip.String() + }) + return []string{localhostIP} +} + +type dnsProviderMock struct { + resolved []string +} + +func (p *dnsProviderMock) Resolve(_ context.Context, addrs []string) error { + p.resolved = addrs + return nil +} + +func (p dnsProviderMock) Addresses() []string { + return p.resolved +} + func checkReplicaTimestamp(t *testing.T, duration time.Duration, c *defaultHaTracker, user, cluster, replica string, expected time.Time, elected time.Time) { t.Helper() @@ -70,6 +101,236 @@ func checkReplicaTimestamp(t *testing.T, duration time.Duration, c *defaultHaTra }) } +func merge(r1, r2 *ReplicaDesc) (*ReplicaDesc, *ReplicaDesc) { + change, err := r1.Merge(r2, false) + if err != nil { + panic(err) + } + + if change == nil { + return r1, nil + } + + changeRDesc := change.(*ReplicaDesc) + return r1, changeRDesc +} +func TestReplicaDescMerge(t *testing.T) { + now := time.Now().Unix() + + const ( + replica1 = "r1" + replica2 = "r2" + replica3 = "r3" + ) + + firstReplica := func() *ReplicaDesc { + return &ReplicaDesc{ + Replica: replica1, + ReceivedAt: now, + DeletedAt: 0, + ElectedAt: now, + ElectedChanges: 1, + } + } + + firstReplicaWithHigherReceivedAt := func() *ReplicaDesc { + return &ReplicaDesc{ + Replica: replica1, + ReceivedAt: now + 5, + DeletedAt: 0, + ElectedAt: now, + ElectedChanges: 1, + } + } + + secondReplica := func() *ReplicaDesc { + return &ReplicaDesc{ + Replica: replica2, + ReceivedAt: now, + DeletedAt: 0, + ElectedAt: now + 5, + ElectedChanges: 2, + } + } + + thirdReplica := func() *ReplicaDesc { + return &ReplicaDesc{ + Replica: replica3, + ReceivedAt: now, + DeletedAt: 0, + ElectedAt: now + 10, + ElectedChanges: 3, + } + } + + expectedFirstAndFirstHigherReceivedAtMerge := func() *ReplicaDesc { + return &ReplicaDesc{ + Replica: replica1, + ReceivedAt: now + 5, + DeletedAt: 0, + ElectedAt: now, + ElectedChanges: 1, + } + } + + expectedFirstAndSecondMerge := func() *ReplicaDesc { + return &ReplicaDesc{ + Replica: replica2, + ReceivedAt: now, + DeletedAt: 0, + ElectedAt: now + 5, + ElectedChanges: 2, + } + } + + testsMerge := []struct { + name string + rDesc1 *ReplicaDesc + rDesc2 *ReplicaDesc + expectedRDesc *ReplicaDesc + expectedChange *ReplicaDesc + }{ + { + name: "Merge ReplicaDesc: Same replica name, different receivedAt should return ReplicaDesc with most recent receivedAt timestamp", + rDesc1: firstReplica(), + rDesc2: firstReplicaWithHigherReceivedAt(), + expectedRDesc: expectedFirstAndFirstHigherReceivedAtMerge(), + expectedChange: expectedFirstAndFirstHigherReceivedAtMerge(), + }, + { + name: "Merge ReplicaDesc: Different replica name, different electedAt should return ReplicaDesc with most recent electedAt timestamp", + rDesc1: firstReplica(), + rDesc2: secondReplica(), + expectedRDesc: expectedFirstAndSecondMerge(), + expectedChange: expectedFirstAndSecondMerge(), + }, + { + name: "idempotency: no change after applying same ReplicaDesc again.", + rDesc1: func() *ReplicaDesc { + out, _ := merge(firstReplica(), secondReplica()) + return out + }(), + rDesc2: firstReplica(), + expectedRDesc: expectedFirstAndSecondMerge(), + expectedChange: nil, + }, + { + name: "commutativity: Merge(first, second) == Merge(second, first)", + rDesc1: firstReplica(), + rDesc2: secondReplica(), + expectedRDesc: func() *ReplicaDesc { + expected, _ := merge(secondReplica(), firstReplica()) + return expected + }(), + expectedChange: expectedFirstAndSecondMerge(), + }, + { + name: "associativity: Merge(Merge(first, second), third) == Merge(first, Merge(second, third))", + rDesc1: func() *ReplicaDesc { + ours1, _ := merge(firstReplica(), secondReplica()) + ours1, _ = merge(ours1, thirdReplica()) + return ours1 + }(), + rDesc2: nil, + expectedRDesc: func() *ReplicaDesc { + ours2, _ := merge(secondReplica(), thirdReplica()) + ours2, _ = merge(firstReplica(), ours2) + return ours2 + }(), + expectedChange: nil, + }, + } + + for _, tt := range testsMerge { + t.Run(tt.name, func(t *testing.T) { + rDesc, ch := merge(tt.rDesc1, tt.rDesc2) + assert.Equal(t, tt.expectedRDesc, rDesc) + assert.Equal(t, tt.expectedChange, ch) + }) + } +} + +func TestHaTrackerWithMemberList(t *testing.T) { + var config memberlist.KVConfig + + const ( + cluster = "cluster" + replica1 = "r1" + replica2 = "r2" + updateTimeout = time.Millisecond * 100 + failoverTimeout = 2 * time.Millisecond + failoverTimeoutPlus100ms = failoverTimeout + 100*time.Millisecond + ) + + flagext.DefaultValues(&config) + ctx := context.Background() + + config.TCPTransport = memberlist.TCPTransportConfig{ + BindAddrs: getLocalhostAddrs(), + BindPort: 0, + } + + config.Codecs = []codec.Codec{ + GetReplicaDescCodec(), + } + + memberListSvc := memberlist.NewKVInitService( + &config, + log.NewNopLogger(), + &dnsProviderMock{}, + prometheus.NewPedanticRegistry(), + ) + + t.Cleanup(func() { + assert.NoError(t, services.StopAndAwaitTerminated(ctx, memberListSvc)) + }) + + c, err := newHaTracker(HATrackerConfig{ + EnableHATracker: true, + KVStore: kv.Config{Store: "memberlist", StoreConfig: kv.StoreConfig{ + MemberlistKV: memberListSvc.GetMemberlistKV, + }}, + UpdateTimeout: updateTimeout, + UpdateTimeoutJitterMax: 0, + FailoverTimeout: failoverTimeout, + }, trackerLimits{maxClusters: 100}, nil, log.NewNopLogger()) + require.NoError(t, services.StartAndAwaitRunning(ctx, c)) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, services.StopAndAwaitTerminated(ctx, c)) + }) + + now := time.Now() + + // Write the first time. + err = c.checkReplica(context.Background(), "user", cluster, replica1, now) + assert.NoError(t, err) + + // Throw away a sample from replica2. + err = c.checkReplica(context.Background(), "user", cluster, replica2, now) + assert.Error(t, err) + + // Wait more than the overwrite timeout. + now = now.Add(failoverTimeoutPlus100ms) + + // Another sample from replica2 to update its timestamp. + err = c.checkReplica(context.Background(), "user", cluster, replica2, now) + assert.Error(t, err) + + // Update KVStore - this should elect replica 2. + c.updateKVStoreAll(context.Background(), now) + + checkReplicaTimestamp(t, time.Second, c, "user", cluster, replica2, now, now) + + // Now we should accept from replica 2. + err = c.checkReplica(context.Background(), "user", cluster, replica2, now) + assert.NoError(t, err) + + // We timed out accepting samples from replica 1 and should now reject them. + err = c.checkReplica(context.Background(), "user", cluster, replica1, now) + assert.Error(t, err) +} + func TestHATrackerCacheSyncOnStart(t *testing.T) { const cluster = "c1" const replicaOne = "r1"