From 04994533c5d817b56eae15eac312da66301a27a2 Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Fri, 9 Aug 2024 14:11:38 +0100 Subject: [PATCH] Cluster peer discovery improvements and fixes (#1387) * Refactor cluster peer discovery * Fix inconsistent behaviour * Add A record discovery and tests * Changelog and docs * PR feedback, ty --- CHANGELOG.md | 20 ++ docs/sources/reference/cli/run.md | 6 +- internal/service/cluster/discovery/common.go | 15 - internal/service/cluster/discovery/dynamic.go | 41 --- .../service/cluster/discovery/go_discovery.go | 72 ++++ .../service/cluster/discovery/join_peers.go | 164 +++++++++ .../cluster/discovery/peer_discovery.go | 25 +- .../cluster/discovery/peer_discovery_test.go | 316 ++++++++++++++---- internal/service/cluster/discovery/static.go | 73 ---- 9 files changed, 529 insertions(+), 203 deletions(-) delete mode 100644 internal/service/cluster/discovery/common.go delete mode 100644 internal/service/cluster/discovery/dynamic.go create mode 100644 internal/service/cluster/discovery/go_discovery.go create mode 100644 internal/service/cluster/discovery/join_peers.go delete mode 100644 internal/service/cluster/discovery/static.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b50578640..4c0bf0f284 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,10 +10,30 @@ internal API changes are not present. Main (unreleased) ----------------- +### Enhancements + +- Clustering peer resolution through `--cluster.join-addresses` flag has been + improved with more consistent behaviour, better error handling and added + support for A/AAAA DNS records. If necessary, users can temporarily opt out of + this new behaviour with the `--cluster.use-discovery-v1`, but this can only be + used as a temporary measure, since this flag will be disabled in future + releases. (@thampiotr) + ### Bugfixes - Fixed an issue which caused loss of context data in Faro exception. (@codecapitano) +- Fixed an issue where providing multiple hostnames or IP addresses + via `--cluster.join-addresses` would only use the first provided value. + (@thampiotr) + +- Fixed an issue where providing `:` + in `--cluster.join-addresses` would only resolve with DNS to a single address, + instead of using all the available records. (@thampiotr) + +- Fixed an issue where clustering peers resolution via hostname in `--cluster.join-addresses` + resolves to duplicated IP addresses when using SRV records. (@thampiotr) + v1.3.0 ----------------- diff --git a/docs/sources/reference/cli/run.md b/docs/sources/reference/cli/run.md index c3c645f353..6c36739280 100644 --- a/docs/sources/reference/cli/run.md +++ b/docs/sources/reference/cli/run.md @@ -108,9 +108,9 @@ If `--cluster.advertise-interfaces` isn't explicitly set, {{< param "PRODUCT_NAM {{< param "PRODUCT_NAME" >}} will fail to start if it can't determine the advertised address. Since Windows doesn't use the interface names `eth0` or `en0`, Windows users must explicitly pass at least one valid network interface for `--cluster.advertise-interfaces` or a value for `--cluster.advertise-address`. -The comma-separated list of addresses provided in `--cluster.join-addresses` can either be IP addresses with an optional port, or DNS SRV records to lookup. -The ports on the list of addresses default to the port used for the HTTP listener if not explicitly provided. -We recommend that you align the port numbers on as many nodes as possible to simplify the deployment process. +The comma-separated list of addresses provided in `--cluster.join-addresses` can either be IP addresses or DNS names to lookup (supports SRV and A/AAAA records). +In both cases, the port number can be specified with a `:` suffix. If ports are not provided, default of the port used for the HTTP listener is used. +If you do not provide the port number explicitly, you must ensure that all instances use the same port for the HTTP listener. The `--cluster.discover-peers` command-line flag expects a list of tuples in the form of `provider=XXX key=val key=val ...`. Clustering uses the [go-discover] package to discover peers and fetch their IP addresses, based on the chosen provider and the filtering key-values it supports. diff --git a/internal/service/cluster/discovery/common.go b/internal/service/cluster/discovery/common.go deleted file mode 100644 index 5b81a4921f..0000000000 --- a/internal/service/cluster/discovery/common.go +++ /dev/null @@ -1,15 +0,0 @@ -package discovery - -import ( - "net" - "strconv" -) - -func appendDefaultPort(addr string, port int) string { - _, _, err := net.SplitHostPort(addr) - if err == nil { - // No error means there was a port in the string - return addr - } - return net.JoinHostPort(addr, strconv.Itoa(port)) -} diff --git a/internal/service/cluster/discovery/dynamic.go b/internal/service/cluster/discovery/dynamic.go deleted file mode 100644 index 095e8ac71b..0000000000 --- a/internal/service/cluster/discovery/dynamic.go +++ /dev/null @@ -1,41 +0,0 @@ -package discovery - -import ( - "fmt" - stdlog "log" - - "github.com/go-kit/log" - "github.com/hashicorp/go-discover" - "github.com/hashicorp/go-discover/provider/k8s" -) - -func newDynamicDiscovery(l log.Logger, config string, defaultPort int, factory goDiscoverFactory) (DiscoverFn, error) { - providers := make(map[string]discover.Provider, len(discover.Providers)+1) - for k, v := range discover.Providers { - providers[k] = v - } - - // Custom providers that aren't enabled by default - providers["k8s"] = &k8s.Provider{} - - discoverer, err := factory(discover.WithProviders(providers)) - if err != nil { - return nil, fmt.Errorf("bootstrapping peer discovery: %w", err) - } - - return func() ([]string, error) { - addrs, err := discoverer.Addrs(config, stdlog.New(log.NewStdlibAdapter(l), "", 0)) - if err != nil { - return nil, fmt.Errorf("discovering peers: %w", err) - } - - for i := range addrs { - // Default to using the same advertise port as the local node. This may - // break in some cases, so the user should make sure the port numbers - // align on as many nodes as possible. - addrs[i] = appendDefaultPort(addrs[i], defaultPort) - } - - return addrs, nil - }, nil -} diff --git a/internal/service/cluster/discovery/go_discovery.go b/internal/service/cluster/discovery/go_discovery.go new file mode 100644 index 0000000000..370d07287d --- /dev/null +++ b/internal/service/cluster/discovery/go_discovery.go @@ -0,0 +1,72 @@ +package discovery + +import ( + "context" + "fmt" + stdlog "log" + "net" + "strconv" + + "github.com/go-kit/log" + "github.com/hashicorp/go-discover" + "github.com/hashicorp/go-discover/provider/k8s" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +// newWithGoDiscovery creates a new peer discovery function that uses the github.com/hashicorp/go-discover library to +// discover peer addresses that can be used for clustering. +func newWithGoDiscovery(opt Options) (DiscoverFn, error) { + // Default to discover.New if no factory is provided. + factory := opt.goDiscoverFactory + if factory == nil { + factory = discover.New + } + + providers := make(map[string]discover.Provider, len(discover.Providers)+1) + for k, v := range discover.Providers { + providers[k] = v + } + + // Custom providers that aren't enabled by default + providers["k8s"] = &k8s.Provider{} + + discoverer, err := factory(discover.WithProviders(providers)) + if err != nil { + return nil, fmt.Errorf("bootstrapping peer discovery: %w", err) + } + + return func() ([]string, error) { + _, span := opt.Tracer.Tracer("").Start( + context.Background(), + "DiscoverClusterPeers", + trace.WithSpanKind(trace.SpanKindInternal), + ) + defer span.End() + + addrs, err := discoverer.Addrs(opt.DiscoverPeers, stdlog.New(log.NewStdlibAdapter(opt.Logger), "", 0)) + if err != nil { + span.SetStatus(codes.Error, err.Error()) + return nil, fmt.Errorf("discovering peers: %w", err) + } + + for i := range addrs { + // Default to using the same advertise port as the local node. + addrs[i] = appendPortIfAbsent(addrs[i], strconv.Itoa(opt.DefaultPort)) + } + + span.SetAttributes(attribute.Int("discovered_addresses_count", len(addrs))) + span.SetStatus(codes.Ok, "discovered peers") + return addrs, nil + }, nil +} + +func appendPortIfAbsent(addr string, port string) string { + _, _, err := net.SplitHostPort(addr) + if err == nil { + // No error means there was a port in the string + return addr + } + return net.JoinHostPort(addr, port) +} diff --git a/internal/service/cluster/discovery/join_peers.go b/internal/service/cluster/discovery/join_peers.go new file mode 100644 index 0000000000..bde3354f99 --- /dev/null +++ b/internal/service/cluster/discovery/join_peers.go @@ -0,0 +1,164 @@ +package discovery + +import ( + "context" + "errors" + "fmt" + "net" + "strconv" + + "github.com/go-kit/log" + "github.com/samber/lo" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/grafana/alloy/internal/runtime/logging/level" +) + +// newWithJoinPeers creates a DiscoverFn that resolves the provided list of peers to a list of addresses that can be +// used for clustering. See docs/sources/reference/cli/run.md and the tests for more information. +func newWithJoinPeers(opts Options) DiscoverFn { + return func() ([]string, error) { + ctx, span := opts.Tracer.Tracer("").Start( + context.Background(), + "ResolveClusterJoinAddresses", + trace.WithSpanKind(trace.SpanKindInternal), + trace.WithAttributes(attribute.Int("join_peers_count", len(opts.JoinPeers))), + ) + defer span.End() + + // Use these resolvers in order to resolve the provided addresses into a form that can be used by clustering. + resolvers := []addressResolver{ + ipResolver(opts.Logger), + dnsAResolver(opts, ctx), + dnsSRVResolver(opts, ctx), + } + + // Get the addresses. + addresses, err := buildJoinAddresses(opts, resolvers) + if err != nil { + span.SetStatus(codes.Error, err.Error()) + return nil, fmt.Errorf("static peer discovery: %w", err) + } + + // Return unique addresses. + result := lo.Uniq(addresses) + span.SetAttributes(attribute.Int("resolved_addresses_count", len(result))) + span.SetStatus(codes.Ok, "resolved addresses") + return result, nil + } +} + +func buildJoinAddresses(opts Options, resolvers []addressResolver) ([]string, error) { + var ( + result []string + deferredErr error + ) + + for _, addr := range opts.JoinPeers { + // See if we have a port override, if not use the default port. + host, port, err := net.SplitHostPort(addr) + if err != nil { + host = addr + port = strconv.Itoa(opts.DefaultPort) + } + + atLeastOneSuccess := false + for _, resolver := range resolvers { + resolved, err := resolver(host) + deferredErr = errors.Join(deferredErr, err) + for _, foundAddr := range resolved { + result = append(result, net.JoinHostPort(foundAddr, port)) + } + // we stop once we find a resolver that succeeded for given address + if len(resolved) > 0 { + atLeastOneSuccess = true + break + } + } + + if !atLeastOneSuccess { + // It is still useful to know if user provided an address that we could not resolve, even + // if another addresses resolve successfully, and we don't return an error. To keep things simple, we're + // not including more detail as it's available through debug level. + level.Warn(opts.Logger).Log("msg", "failed to resolve provided join address", "addr", addr) + } + } + + if len(result) == 0 { + return nil, fmt.Errorf("failed to find any valid join addresses: %w", deferredErr) + } + return result, nil +} + +type addressResolver func(addr string) ([]string, error) + +func ipResolver(log log.Logger) addressResolver { + return func(addr string) ([]string, error) { + // Check if it's IP and use it if so. + ip := net.ParseIP(addr) + if ip == nil { + return nil, fmt.Errorf("could not parse as an IP or IP:port address: %q", addr) + } + level.Debug(log).Log("msg", "found an IP cluster join address", "addr", addr) + return []string{ip.String()}, nil + } +} + +func dnsAResolver(opts Options, ctx context.Context) addressResolver { + // Default to net.LookupIP if not provided. By default, this will look up A/AAAA records. + ipLookup := opts.lookupIPFn + if ipLookup == nil { + ipLookup = net.LookupIP + } + return dnsResolver(opts, ctx, "A/AAAA", func(addr string) ([]string, error) { + ips, err := ipLookup(addr) + result := make([]string, 0, len(ips)) + for _, ip := range ips { + result = append(result, ip.String()) + } + return result, err + }) +} + +func dnsSRVResolver(opts Options, ctx context.Context) addressResolver { + // Default to net.LookupSRV if not provided. + srvLookup := opts.lookupSRVFn + if srvLookup == nil { + srvLookup = net.LookupSRV + } + return dnsResolver(opts, ctx, "SRV", func(addr string) ([]string, error) { + _, addresses, err := srvLookup("", "", addr) + result := make([]string, 0, len(addresses)) + for _, a := range addresses { + result = append(result, a.Target) + } + return result, err + }) +} + +func dnsResolver(opts Options, ctx context.Context, recordType string, dnsLookupFn func(string) ([]string, error)) addressResolver { + return func(addr string) ([]string, error) { + _, span := opts.Tracer.Tracer("").Start( + ctx, + "ClusterPeersDNSLookup", + trace.WithSpanKind(trace.SpanKindInternal), + trace.WithAttributes(attribute.String("addr", addr)), + trace.WithAttributes(attribute.String("record_type", recordType)), + ) + defer span.End() + + result, err := dnsLookupFn(addr) + if err != nil { + level.Debug(opts.Logger).Log("msg", "failed to resolve DNS records", "addr", addr, "record_type", recordType, "err", err) + span.SetStatus(codes.Error, err.Error()) + return nil, fmt.Errorf("failed to resolve %q records: %w", recordType, err) + } + + level.Debug(opts.Logger).Log("msg", "received DNS query response", "addr", addr, "record_type", recordType, "records_count", len(result)) + span.SetAttributes(attribute.Int("resolved_addresses_count", len(result))) + span.SetStatus(codes.Ok, "resolved addresses") + return result, nil + } +} diff --git a/internal/service/cluster/discovery/peer_discovery.go b/internal/service/cluster/discovery/peer_discovery.go index a6ab3bb18c..36ef34d990 100644 --- a/internal/service/cluster/discovery/peer_discovery.go +++ b/internal/service/cluster/discovery/peer_discovery.go @@ -3,10 +3,13 @@ package discovery import ( "fmt" "net" + "strings" "github.com/go-kit/log" godiscover "github.com/hashicorp/go-discover" "go.opentelemetry.io/otel/trace" + + "github.com/grafana/alloy/internal/runtime/logging/level" ) type DiscoverFn func() ([]string, error) @@ -21,6 +24,9 @@ type Options struct { Tracer trace.TracerProvider // lookupSRVFn is a function that can be used to lookup SRV records. If nil, net.LookupSRV is used. Used for testing. lookupSRVFn lookupSRVFn + // lookupIPFn is a function that can be used to lookup addresses using A/AAAA DNS records. If nil, net.LookupIP is used. Used for testing. + lookupIPFn lookupIPFn + // goDiscoverFactory is a function that can be used to create a new discover.Discover instance. // If nil, godiscover.New is used. Used for testing. goDiscoverFactory goDiscoverFactory @@ -29,6 +35,9 @@ type Options struct { // lookupSRVFn is a function that can be used to lookup SRV records. Matches net.LookupSRV signature. type lookupSRVFn func(service, proto, name string) (string, []*net.SRV, error) +// lookupIPFn is a function that can be used to lookup IP addresses using A/AAAA DNS records. Matches net.LookupIP signature. +type lookupIPFn func(host string) ([]net.IP, error) + // goDiscoverFactory is a function that can be used to create a new discover.Discover instance. // Matches discover.New signature. type goDiscoverFactory func(opts ...godiscover.Option) (*godiscover.Discover, error) @@ -44,24 +53,20 @@ func NewPeerDiscoveryFn(opts Options) (DiscoverFn, error) { return nil, fmt.Errorf("at most one of join peers and discover peers may be set, "+ "got join peers %q and discover peers %q", opts.JoinPeers, opts.DiscoverPeers) } - srvLookupFn := net.LookupSRV - if opts.lookupSRVFn != nil { - srvLookupFn = opts.lookupSRVFn - } - discoverFactory := godiscover.New - if opts.goDiscoverFactory != nil { - discoverFactory = opts.goDiscoverFactory - } switch { case len(opts.JoinPeers) > 0: - return newStaticDiscovery(opts.JoinPeers, opts.DefaultPort, opts.Logger, srvLookupFn), nil + level.Info(opts.Logger).Log("msg", "using provided peers for discovery", "join_peers", strings.Join(opts.JoinPeers, ", ")) + return newWithJoinPeers(opts), nil case opts.DiscoverPeers != "": - return newDynamicDiscovery(opts.Logger, opts.DiscoverPeers, opts.DefaultPort, discoverFactory) + // opts.DiscoverPeers is not logged to avoid leaking sensitive information. + level.Info(opts.Logger).Log("msg", "using go-discovery to discover peers") + return newWithGoDiscovery(opts) default: // Here, both JoinPeers and DiscoverPeers are empty. This is desirable when // starting a seed node that other nodes connect to, so we don't require // one of the fields to be set. + level.Info(opts.Logger).Log("msg", "no peer discovery configured: both join and discover peers are empty") return nil, nil } } diff --git a/internal/service/cluster/discovery/peer_discovery_test.go b/internal/service/cluster/discovery/peer_discovery_test.go index 8172d3da01..d8663230cb 100644 --- a/internal/service/cluster/discovery/peer_discovery_test.go +++ b/internal/service/cluster/discovery/peer_discovery_test.go @@ -53,27 +53,46 @@ func TestPeerDiscovery(t *testing.T) { expectedCreateErrContain: "at most one of join peers and discover peers may be set", }, { - //TODO(thampiotr): there is an inconsistency here: when given host:port, we resolve to it without looking - // up the IP addresses. But when given a host only without the port, we look up the IP addresses with the DNS. - name: "static host:port", + name: "static host:port resolves to IP addresses with the specified port", args: Options{ JoinPeers: []string{"host:1234"}, DefaultPort: 8888, Logger: logger, Tracer: tracer, + lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { + return "", []*net.SRV{ + {Target: "10.10.10.10", Port: 7777}, + {Target: "10.10.10.12", Port: 9999}, + }, nil + }, }, - expected: []string{"host:1234"}, + expected: []string{"10.10.10.10:1234", "10.10.10.12:1234"}, }, { - //TODO(thampiotr): this returns only one right now, but I think it should return multiple - name: "multiple static host:ports given", + name: "mixed host:port and host given", args: Options{ - JoinPeers: []string{"host1:1234", "host2:1234"}, + JoinPeers: []string{"host1:1234", "host2"}, DefaultPort: 8888, Logger: logger, Tracer: tracer, + lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { + switch name { + case "host1": + return "", []*net.SRV{ + {Target: "10.10.10.10", Port: 7777}, + {Target: "10.10.10.12", Port: 9999}, + }, nil + case "host2": + return "", []*net.SRV{ + {Target: "10.10.10.20", Port: 7777}, + {Target: "10.10.10.21", Port: 9999}, + }, nil + default: + return "", nil, fmt.Errorf("unexpected name %q", name) + } + }, }, - expected: []string{"host1:1234"}, + expected: []string{"10.10.10.10:1234", "10.10.10.12:1234", "10.10.10.20:8888", "10.10.10.21:8888"}, }, { name: "static ip address with port", @@ -82,6 +101,10 @@ func TestPeerDiscovery(t *testing.T) { DefaultPort: 12345, Logger: logger, Tracer: tracer, + lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { + t.Fatalf("unexpected call with %q", name) + return "", nil, nil + }, }, expected: []string{"10.10.10.10:8888"}, }, @@ -96,7 +119,6 @@ func TestPeerDiscovery(t *testing.T) { expected: []string{"10.10.10.10:12345"}, }, { - //TODO(thampiotr): the error message is not very informative in this case name: "invalid ip address", args: Options{ JoinPeers: []string{"10.301.10.10"}, @@ -104,10 +126,9 @@ func TestPeerDiscovery(t *testing.T) { Logger: logger, Tracer: tracer, }, - expectedErrContain: "lookup 10.301.10.10: no such host", + expectedErrContain: "could not parse as an IP or IP:port address: \"10.301.10.10\"", }, { - //TODO(thampiotr): should we support multiple? name: "multiple ip addresses", args: Options{ JoinPeers: []string{"10.10.10.10", "11.11.11.11"}, @@ -115,10 +136,9 @@ func TestPeerDiscovery(t *testing.T) { Logger: logger, Tracer: tracer, }, - expected: []string{"10.10.10.10:12345"}, + expected: []string{"10.10.10.10:12345", "11.11.11.11:12345"}, }, { - //TODO(thampiotr): should we drop the invalid ones only or error? name: "multiple ip addresses with some invalid", args: Options{ JoinPeers: []string{"10.10.10.10", "11.311.11.11", "22.22.22.22"}, @@ -126,7 +146,17 @@ func TestPeerDiscovery(t *testing.T) { Logger: logger, Tracer: tracer, }, - expected: []string{"10.10.10.10:12345"}, + expected: []string{"10.10.10.10:12345", "22.22.22.22:12345"}, + }, + { + name: "multiple ip addresses with some having a port", + args: Options{ + JoinPeers: []string{"10.10.10.10", "11.211.11.11:7777", "22.22.22.22"}, + DefaultPort: 12345, + Logger: logger, + Tracer: tracer, + }, + expected: []string{"10.10.10.10:12345", "11.211.11.11:7777", "22.22.22.22:12345"}, }, { name: "no DNS records found", @@ -136,13 +166,16 @@ func TestPeerDiscovery(t *testing.T) { Logger: logger, Tracer: tracer, lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { - return "", []*net.SRV{}, nil + return "", nil, nil + }, + lookupIPFn: func(host string) ([]net.IP, error) { + return nil, nil }, }, expectedErrContain: "failed to find any valid join addresses", }, { - name: "SRV DNS record lookup error", + name: "SRV record lookup error", args: Options{ JoinPeers: []string{"host1"}, DefaultPort: 12345, @@ -163,10 +196,7 @@ func TestPeerDiscovery(t *testing.T) { Tracer: tracer, lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { return "", []*net.SRV{ - { - Target: "10.10.10.10", - Port: 12345, - }, + {Target: "10.10.10.10", Port: 12345}, }, nil }, }, @@ -181,18 +211,9 @@ func TestPeerDiscovery(t *testing.T) { Tracer: tracer, lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { return "", []*net.SRV{ - { - Target: "10.10.10.10", - Port: 12345, - }, - { - Target: "10.10.10.11", - Port: 12346, - }, - { - Target: "10.10.10.12", - Port: 12347, - }, + {Target: "10.10.10.10", Port: 12345}, + {Target: "10.10.10.11", Port: 12346}, + {Target: "10.10.10.12", Port: 12347}, }, nil }, }, @@ -201,34 +222,27 @@ func TestPeerDiscovery(t *testing.T) { { name: "multiple hosts and multiple SRV records found", args: Options{ - JoinPeers: []string{"host1", "host2"}, + JoinPeers: []string{"host1", "host2:7777"}, DefaultPort: 8888, // NOTE: this is the port that will be used, not the one from SRV records. Logger: logger, Tracer: tracer, lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { - if name == "host1" { + switch name { + case "host1": return "", []*net.SRV{ - { - Target: "10.10.10.10", - Port: 12345, - }, - { - Target: "10.10.10.10", - Port: 12346, - }, + {Target: "10.10.10.10", Port: 12345}, + {Target: "10.10.10.10", Port: 12346}, }, nil - } else { + case "host2": return "", []*net.SRV{ - { - Target: "10.10.10.11", - Port: 12346, - }, + {Target: "10.10.10.11", Port: 12346}, }, nil + default: + return "", nil, fmt.Errorf("unexpected name %q", name) } }, }, - //TODO(thampiotr): This is likely wrong, we should not have duplicate results. - expected: []string{"10.10.10.10:8888", "10.10.10.10:8888", "10.10.10.11:8888"}, + expected: []string{"10.10.10.10:8888", "10.10.10.11:7777"}, }, { name: "one SRV record lookup fails, another succeeds", @@ -238,23 +252,177 @@ func TestPeerDiscovery(t *testing.T) { Logger: logger, Tracer: tracer, lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { - if name == "host1" { + switch name { + case "host2": + return "", []*net.SRV{ + {Target: "10.10.10.10", Port: 12345}, + {Target: "10.10.10.10", Port: 12346}, + }, nil + default: return "", []*net.SRV{}, fmt.Errorf("DNS lookup test error") - } else { + } + }, + }, + // NOTE: due to deduplication, only one result is returned here. + expected: []string{"10.10.10.10:8888"}, + }, + { + name: "A record lookup error", + args: Options{ + JoinPeers: []string{"host1"}, + DefaultPort: 12345, + Logger: logger, + Tracer: tracer, + lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { + return "", nil, fmt.Errorf("DNS SRV record lookup test error") + }, + lookupIPFn: func(host string) ([]net.IP, error) { + return nil, fmt.Errorf("DNS A record lookup test error") + }, + }, + expectedErrContain: "DNS A record lookup test error", + }, + { + name: "single A record found", + args: Options{ + JoinPeers: []string{"host1"}, + DefaultPort: 12345, + Logger: logger, + Tracer: tracer, + lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { + return "", nil, fmt.Errorf("DNS SRV record lookup test error") + }, + lookupIPFn: func(host string) ([]net.IP, error) { + return []net.IP{ + net.ParseIP("10.10.10.10"), + }, nil + }, + }, + expected: []string{"10.10.10.10:12345"}, + }, + { + name: "multiple A records found", + args: Options{ + JoinPeers: []string{"host1"}, + DefaultPort: 8888, + Logger: logger, + Tracer: tracer, + lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { + return "", nil, fmt.Errorf("DNS SRV record lookup test error") + }, + lookupIPFn: func(host string) ([]net.IP, error) { + return []net.IP{ + net.ParseIP("10.10.10.10"), + net.ParseIP("10.10.10.11"), + net.ParseIP("10.10.10.12"), + }, nil + }, + }, + expected: []string{"10.10.10.10:8888", "10.10.10.11:8888", "10.10.10.12:8888"}, + }, + { + name: "multiple hosts and multiple A records found", + args: Options{ + JoinPeers: []string{"host1:7777", "host2"}, + DefaultPort: 8888, + Logger: logger, + Tracer: tracer, + lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { + return "", nil, fmt.Errorf("DNS SRV record lookup test error") + }, + lookupIPFn: func(host string) ([]net.IP, error) { + switch host { + case "host1": + return []net.IP{ + net.ParseIP("10.10.10.10"), + net.ParseIP("10.10.10.11"), + }, nil + case "host2": + return []net.IP{ + net.ParseIP("10.10.10.11"), + }, nil + default: + return nil, fmt.Errorf("unexpected name %q", host) + } + }, + }, + expected: []string{"10.10.10.10:7777", "10.10.10.11:7777", "10.10.10.11:8888"}, + }, + { + name: "one A record lookup fails, another succeeds", + args: Options{ + JoinPeers: []string{"host1", "host2"}, + DefaultPort: 8888, // NOTE: this is the port that will be used, not the one from SRV records. + Logger: logger, + Tracer: tracer, + lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { + return "", nil, fmt.Errorf("DNS SRV record lookup test error") + }, + lookupIPFn: func(host string) ([]net.IP, error) { + switch host { + case "host2": + return []net.IP{ + net.ParseIP("10.10.10.10"), + net.ParseIP("10.10.10.11"), + }, nil + default: + return nil, fmt.Errorf("unexpected name %q", host) + } + }, + }, + expected: []string{"10.10.10.10:8888", "10.10.10.11:8888"}, + }, + { + name: "one host has A record and another has SRV record", + args: Options{ + JoinPeers: []string{"host1", "host2"}, + DefaultPort: 8888, // NOTE: this is the port that will be used, not the one from SRV records. + Logger: logger, + Tracer: tracer, + lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { + switch name { + case "host1": return "", []*net.SRV{ - { - Target: "10.10.10.10", - Port: 12345, - }, - { - Target: "10.10.10.10", - Port: 12346, - }, + {Target: "10.10.10.10", Port: 12345}, + {Target: "10.10.10.10", Port: 12346}, }, nil + default: + return "", []*net.SRV{}, fmt.Errorf("DNS lookup test error") } }, + lookupIPFn: func(host string) ([]net.IP, error) { + switch host { + case "host2": + return []net.IP{ + net.ParseIP("10.10.10.11"), + net.ParseIP("10.10.10.12"), + }, nil + default: + return nil, fmt.Errorf("unknown name %q", host) + } + }, + }, + expected: []string{"10.10.10.10:8888", "10.10.10.11:8888", "10.10.10.12:8888"}, + }, + { + name: "A records take precedence over SRV records", + args: Options{ + JoinPeers: []string{"host1"}, + DefaultPort: 8888, + Logger: logger, + Tracer: tracer, + lookupSRVFn: func(service, proto, name string) (string, []*net.SRV, error) { + return "", []*net.SRV{ + {Target: "10.10.10.10", Port: 12345}, + }, nil + }, + lookupIPFn: func(host string) ([]net.IP, error) { + return []net.IP{ + net.ParseIP("10.10.10.11"), + }, nil + }, }, - expected: []string{"10.10.10.10:8888", "10.10.10.10:8888"}, + expected: []string{"10.10.10.11:8888"}, }, { name: "go discovery factory error", @@ -309,6 +477,28 @@ func TestPeerDiscovery(t *testing.T) { }, expectedErrContain: "unknown provider gce", }, + { + name: "go discovery k8s provider added by default", + args: Options{ + DiscoverPeers: "provider=k8s", + Logger: logger, + Tracer: tracer, + goDiscoverFactory: func(opts ...godiscover.Option) (*godiscover.Discover, error) { + d, err := godiscover.New(opts...) + if err != nil { + return nil, err + } + if _, ok := d.Providers["k8s"]; !ok { + return nil, fmt.Errorf("k8s provider not found") + } + return &godiscover.Discover{ + Providers: map[string]godiscover.Provider{ + "k8s": &testProvider{}, + }, + }, nil + }, + }, + }, } for _, tt := range tests { @@ -323,6 +513,7 @@ func TestPeerDiscovery(t *testing.T) { actual, err := fn() if tt.expectedErrContain != "" { + logger.Log("actual_err", err) require.ErrorContains(t, err, tt.expectedErrContain) return } else { @@ -346,7 +537,10 @@ type testProvider struct { fn func() ([]string, error) } -func (t testProvider) Addrs(args map[string]string, l *stdlog.Logger) ([]string, error) { +func (t testProvider) Addrs(_ map[string]string, _ *stdlog.Logger) ([]string, error) { + if t.fn == nil { + return nil, nil + } return t.fn() } diff --git a/internal/service/cluster/discovery/static.go b/internal/service/cluster/discovery/static.go deleted file mode 100644 index 08ed97a9a4..0000000000 --- a/internal/service/cluster/discovery/static.go +++ /dev/null @@ -1,73 +0,0 @@ -package discovery - -import ( - "errors" - "fmt" - "net" - - "github.com/go-kit/log" - - "github.com/grafana/alloy/internal/runtime/logging/level" -) - -func newStaticDiscovery(providedAddr []string, defaultPort int, log log.Logger, srvLookup lookupSRVFn) DiscoverFn { - return func() ([]string, error) { - addresses, err := buildJoinAddresses(providedAddr, log, srvLookup) - if err != nil { - return nil, fmt.Errorf("static peer discovery: %w", err) - } - for i := range addresses { - // Default to using the same advertise port as the local node. This may - // break in some cases, so the user should make sure the port numbers - // align on as many nodes as possible. - addresses[i] = appendDefaultPort(addresses[i], defaultPort) - } - return addresses, nil - } -} - -func buildJoinAddresses(providedAddr []string, log log.Logger, srvLookup lookupSRVFn) ([]string, error) { - // Currently we don't consider it an error to not have any join addresses. - if len(providedAddr) == 0 { - return nil, nil - } - var ( - result []string - deferredErr error - ) - for _, addr := range providedAddr { - // If it's a host:port, use it as is. - _, _, err := net.SplitHostPort(addr) - if err != nil { - deferredErr = errors.Join(deferredErr, fmt.Errorf("failed to extract host and port: %w", err)) - } else { - level.Debug(log).Log("msg", "found a host:port cluster join address", "addr", addr) - result = append(result, addr) - break - } - - // If it's an IP address, use it. - ip := net.ParseIP(addr) - if ip != nil { - level.Debug(log).Log("msg", "found an IP cluster join address", "addr", addr) - result = append(result, ip.String()) - break - } - - // Otherwise, do a DNS lookup and return all the records found. - _, srvs, err := srvLookup("", "", addr) - if err != nil { - level.Warn(log).Log("msg", "failed to resolve SRV records", "addr", addr, "err", err) - deferredErr = errors.Join(deferredErr, fmt.Errorf("failed to resolve SRV records: %w", err)) - } else { - level.Debug(log).Log("msg", "found cluster join addresses via SRV records", "addr", addr, "count", len(srvs)) - for _, srv := range srvs { - result = append(result, srv.Target) - } - } - } - if len(result) == 0 { - return nil, fmt.Errorf("failed to find any valid join addresses: %w", deferredErr) - } - return result, nil -}