Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ha_tracker): Replicadesc implement mergeable interface #10020

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions pkg/distributor/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -60,6 +61,76 @@ func NewReplicaDesc() *ReplicaDesc {
return &ReplicaDesc{}
}

// Merge merges other ReplicaDesc into this one.
// The decision is made based on the ReceivedAt timestamp, if the Replica name is the same and at the ElectedAt if the
// Replica name is different
func (r *ReplicaDesc) Merge(other memberlist.Mergeable, _ bool) (change memberlist.Mergeable, error error) {
return r.mergeWithTime(other)
}

func (r *ReplicaDesc) mergeWithTime(mergeable memberlist.Mergeable) (memberlist.Mergeable, error) {
if mergeable == nil {
return nil, nil
}

other, ok := mergeable.(*ReplicaDesc)
if !ok {
return nil, fmt.Errorf("expected *distributor.ReplicaDesc, got %T", mergeable)
}

if other == nil {
return nil, nil
}

if other.Replica == r.Replica {
// Keeping the one with the most recent receivedAt timestamp
if other.ReceivedAt > r.ReceivedAt {
*r = *other
} else if r.ReceivedAt == other.ReceivedAt && r.DeletedAt == 0 && other.DeletedAt != 0 {
*r = *other
}
} else {
// keep the most recent Elected to reach consistency
if other.ElectedAt > r.ElectedAt {
*r = *other
} else if other.ElectedAt == r.ElectedAt {
// if the timestamps are equal we compare receivedAt
if other.ReceivedAt > r.ReceivedAt {
*r = *other
}
}
}

// No changes
if *r != *other {
return nil, nil
}

out := NewReplicaDesc()
*out = *r
return out, nil
}

// MergeContent describes content of this Mergeable.
// Given that ReplicaDesc can have only one instance at a time, it returns the ReplicaDesc it contains.
func (r *ReplicaDesc) MergeContent() []string {
result := []string(nil)
if len(r.Replica) != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT about using r.IsEmpty() here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot find this function as part of the ReplicaDesc, do you want me to implement it ?

result = append(result, r.String())
}
return result
}

// RemoveTombstones noOp.
func (r *ReplicaDesc) RemoveTombstones(_ time.Time) (total, removed int) {
return
}

// Clone returns a deep copy of the ReplicaDesc.
func (r *ReplicaDesc) Clone() memberlist.Mergeable {
return proto.Clone(r).(*ReplicaDesc)
}

// HATrackerConfig contains the configuration required to
// create an HA Tracker.
type HATrackerConfig struct {
Expand Down
261 changes: 261 additions & 0 deletions pkg/distributor/ha_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ package distributor
import (
"context"
"fmt"
"net"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/test"
Expand All @@ -33,6 +37,33 @@ import (
utiltest "github.com/grafana/mimir/pkg/util/test"
)

var addrsOnce sync.Once
var localhostIP string

func getLocalhostAddrs() []string {
addrsOnce.Do(func() {
ip, err := net.ResolveIPAddr("ip4", "localhost")
if err != nil {
localhostIP = "127.0.0.1" // this is the most common answer, try it
}
localhostIP = ip.String()
})
return []string{localhostIP}
}

type dnsProviderMock struct {
resolved []string
}

func (p *dnsProviderMock) Resolve(_ context.Context, addrs []string) error {
p.resolved = addrs
return nil
}

func (p dnsProviderMock) Addresses() []string {
return p.resolved
}

func checkReplicaTimestamp(t *testing.T, duration time.Duration, c *defaultHaTracker, user, cluster, replica string, expected time.Time, elected time.Time) {
t.Helper()

Expand Down Expand Up @@ -70,6 +101,236 @@ func checkReplicaTimestamp(t *testing.T, duration time.Duration, c *defaultHaTra
})
}

func merge(r1, r2 *ReplicaDesc) (*ReplicaDesc, *ReplicaDesc) {
change, err := r1.Merge(r2, false)
if err != nil {
panic(err)
}

if change == nil {
return r1, nil
}

changeRDesc := change.(*ReplicaDesc)
return r1, changeRDesc
}
func TestReplicaDescMerge(t *testing.T) {
now := time.Now().Unix()

const (
replica1 = "r1"
replica2 = "r2"
replica3 = "r3"
)

firstReplica := func() *ReplicaDesc {
return &ReplicaDesc{
Replica: replica1,
ReceivedAt: now,
DeletedAt: 0,
ElectedAt: now,
ElectedChanges: 1,
}
}

firstReplicaWithHigherReceivedAt := func() *ReplicaDesc {
return &ReplicaDesc{
Replica: replica1,
ReceivedAt: now + 5,
DeletedAt: 0,
ElectedAt: now,
ElectedChanges: 1,
}
}

secondReplica := func() *ReplicaDesc {
return &ReplicaDesc{
Replica: replica2,
ReceivedAt: now,
DeletedAt: 0,
ElectedAt: now + 5,
ElectedChanges: 2,
}
}

thirdReplica := func() *ReplicaDesc {
return &ReplicaDesc{
Replica: replica3,
ReceivedAt: now,
DeletedAt: 0,
ElectedAt: now + 10,
ElectedChanges: 3,
}
}

expectedFirstAndFirstHigherReceivedAtMerge := func() *ReplicaDesc {
return &ReplicaDesc{
Replica: replica1,
ReceivedAt: now + 5,
DeletedAt: 0,
ElectedAt: now,
ElectedChanges: 1,
}
}

expectedFirstAndSecondMerge := func() *ReplicaDesc {
return &ReplicaDesc{
Replica: replica2,
ReceivedAt: now,
DeletedAt: 0,
ElectedAt: now + 5,
ElectedChanges: 2,
}
}

testsMerge := []struct {
name string
rDesc1 *ReplicaDesc
rDesc2 *ReplicaDesc
expectedRDesc *ReplicaDesc
expectedChange *ReplicaDesc
}{
{
name: "Merge ReplicaDesc: Same replica name, different receivedAt should return ReplicaDesc with most recent receivedAt timestamp",
rDesc1: firstReplica(),
rDesc2: firstReplicaWithHigherReceivedAt(),
expectedRDesc: expectedFirstAndFirstHigherReceivedAtMerge(),
expectedChange: expectedFirstAndFirstHigherReceivedAtMerge(),
},
{
name: "Merge ReplicaDesc: Different replica name, different electedAt should return ReplicaDesc with most recent electedAt timestamp",
rDesc1: firstReplica(),
rDesc2: secondReplica(),
expectedRDesc: expectedFirstAndSecondMerge(),
expectedChange: expectedFirstAndSecondMerge(),
},
{
name: "idempotency: no change after applying same ReplicaDesc again.",
rDesc1: func() *ReplicaDesc {
out, _ := merge(firstReplica(), secondReplica())
return out
}(),
rDesc2: firstReplica(),
expectedRDesc: expectedFirstAndSecondMerge(),
expectedChange: nil,
},
{
name: "commutativity: Merge(first, second) == Merge(second, first)",
rDesc1: firstReplica(),
rDesc2: secondReplica(),
expectedRDesc: func() *ReplicaDesc {
expected, _ := merge(secondReplica(), firstReplica())
return expected
}(),
expectedChange: expectedFirstAndSecondMerge(),
},
{
name: "associativity: Merge(Merge(first, second), third) == Merge(first, Merge(second, third))",
rDesc1: func() *ReplicaDesc {
ours1, _ := merge(firstReplica(), secondReplica())
ours1, _ = merge(ours1, thirdReplica())
return ours1
}(),
rDesc2: nil,
expectedRDesc: func() *ReplicaDesc {
ours2, _ := merge(secondReplica(), thirdReplica())
ours2, _ = merge(firstReplica(), ours2)
return ours2
}(),
expectedChange: nil,
},
}

for _, tt := range testsMerge {
t.Run(tt.name, func(t *testing.T) {
rDesc, ch := merge(tt.rDesc1, tt.rDesc2)
assert.Equal(t, tt.expectedRDesc, rDesc)
assert.Equal(t, tt.expectedChange, ch)
})
}
}

func TestHaTrackerWithMemberList(t *testing.T) {
var config memberlist.KVConfig

const (
cluster = "cluster"
replica1 = "r1"
replica2 = "r2"
updateTimeout = time.Millisecond * 100
failoverTimeout = 2 * time.Millisecond
failoverTimeoutPlus100ms = failoverTimeout + 100*time.Millisecond
)

flagext.DefaultValues(&config)
ctx := context.Background()

config.TCPTransport = memberlist.TCPTransportConfig{
BindAddrs: getLocalhostAddrs(),
BindPort: 0,
}

config.Codecs = []codec.Codec{
GetReplicaDescCodec(),
}

memberListSvc := memberlist.NewKVInitService(
&config,
log.NewNopLogger(),
&dnsProviderMock{},
prometheus.NewPedanticRegistry(),
)

t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, memberListSvc))
})

c, err := newHaTracker(HATrackerConfig{
EnableHATracker: true,
KVStore: kv.Config{Store: "memberlist", StoreConfig: kv.StoreConfig{
MemberlistKV: memberListSvc.GetMemberlistKV,
}},
UpdateTimeout: updateTimeout,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: failoverTimeout,
}, trackerLimits{maxClusters: 100}, nil, log.NewNopLogger())
require.NoError(t, services.StartAndAwaitRunning(ctx, c))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, c))
})

now := time.Now()

// Write the first time.
err = c.checkReplica(context.Background(), "user", cluster, replica1, now)
assert.NoError(t, err)

// Throw away a sample from replica2.
err = c.checkReplica(context.Background(), "user", cluster, replica2, now)
assert.Error(t, err)

// Wait more than the overwrite timeout.
now = now.Add(failoverTimeoutPlus100ms)

// Another sample from replica2 to update its timestamp.
err = c.checkReplica(context.Background(), "user", cluster, replica2, now)
assert.Error(t, err)

// Update KVStore - this should elect replica 2.
c.updateKVStoreAll(context.Background(), now)

checkReplicaTimestamp(t, time.Second, c, "user", cluster, replica2, now, now)

// Now we should accept from replica 2.
err = c.checkReplica(context.Background(), "user", cluster, replica2, now)
assert.NoError(t, err)

// We timed out accepting samples from replica 1 and should now reject them.
err = c.checkReplica(context.Background(), "user", cluster, replica1, now)
assert.Error(t, err)
}

func TestHATrackerCacheSyncOnStart(t *testing.T) {
const cluster = "c1"
const replicaOne = "r1"
Expand Down