-
Notifications
You must be signed in to change notification settings - Fork 238
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Cluster peer discovery improvements and fixes (#1387)
* Refactor cluster peer discovery * Fix inconsistent behaviour * Add A record discovery and tests * Changelog and docs * PR feedback, ty
- Loading branch information
Showing
9 changed files
with
529 additions
and
203 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package discovery | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
stdlog "log" | ||
"net" | ||
"strconv" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/hashicorp/go-discover" | ||
"github.com/hashicorp/go-discover/provider/k8s" | ||
"go.opentelemetry.io/otel/attribute" | ||
"go.opentelemetry.io/otel/codes" | ||
"go.opentelemetry.io/otel/trace" | ||
) | ||
|
||
// newWithGoDiscovery creates a new peer discovery function that uses the github.com/hashicorp/go-discover library to | ||
// discover peer addresses that can be used for clustering. | ||
func newWithGoDiscovery(opt Options) (DiscoverFn, error) { | ||
// Default to discover.New if no factory is provided. | ||
factory := opt.goDiscoverFactory | ||
if factory == nil { | ||
factory = discover.New | ||
} | ||
|
||
providers := make(map[string]discover.Provider, len(discover.Providers)+1) | ||
for k, v := range discover.Providers { | ||
providers[k] = v | ||
} | ||
|
||
// Custom providers that aren't enabled by default | ||
providers["k8s"] = &k8s.Provider{} | ||
|
||
discoverer, err := factory(discover.WithProviders(providers)) | ||
if err != nil { | ||
return nil, fmt.Errorf("bootstrapping peer discovery: %w", err) | ||
} | ||
|
||
return func() ([]string, error) { | ||
_, span := opt.Tracer.Tracer("").Start( | ||
context.Background(), | ||
"DiscoverClusterPeers", | ||
trace.WithSpanKind(trace.SpanKindInternal), | ||
) | ||
defer span.End() | ||
|
||
addrs, err := discoverer.Addrs(opt.DiscoverPeers, stdlog.New(log.NewStdlibAdapter(opt.Logger), "", 0)) | ||
if err != nil { | ||
span.SetStatus(codes.Error, err.Error()) | ||
return nil, fmt.Errorf("discovering peers: %w", err) | ||
} | ||
|
||
for i := range addrs { | ||
// Default to using the same advertise port as the local node. | ||
addrs[i] = appendPortIfAbsent(addrs[i], strconv.Itoa(opt.DefaultPort)) | ||
} | ||
|
||
span.SetAttributes(attribute.Int("discovered_addresses_count", len(addrs))) | ||
span.SetStatus(codes.Ok, "discovered peers") | ||
return addrs, nil | ||
}, nil | ||
} | ||
|
||
func appendPortIfAbsent(addr string, port string) string { | ||
_, _, err := net.SplitHostPort(addr) | ||
if err == nil { | ||
// No error means there was a port in the string | ||
return addr | ||
} | ||
return net.JoinHostPort(addr, port) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
package discovery | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"net" | ||
"strconv" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/samber/lo" | ||
"go.opentelemetry.io/otel/attribute" | ||
"go.opentelemetry.io/otel/codes" | ||
"go.opentelemetry.io/otel/trace" | ||
|
||
"github.com/grafana/alloy/internal/runtime/logging/level" | ||
) | ||
|
||
// newWithJoinPeers creates a DiscoverFn that resolves the provided list of peers to a list of addresses that can be | ||
// used for clustering. See docs/sources/reference/cli/run.md and the tests for more information. | ||
func newWithJoinPeers(opts Options) DiscoverFn { | ||
return func() ([]string, error) { | ||
ctx, span := opts.Tracer.Tracer("").Start( | ||
context.Background(), | ||
"ResolveClusterJoinAddresses", | ||
trace.WithSpanKind(trace.SpanKindInternal), | ||
trace.WithAttributes(attribute.Int("join_peers_count", len(opts.JoinPeers))), | ||
) | ||
defer span.End() | ||
|
||
// Use these resolvers in order to resolve the provided addresses into a form that can be used by clustering. | ||
resolvers := []addressResolver{ | ||
ipResolver(opts.Logger), | ||
dnsAResolver(opts, ctx), | ||
dnsSRVResolver(opts, ctx), | ||
} | ||
|
||
// Get the addresses. | ||
addresses, err := buildJoinAddresses(opts, resolvers) | ||
if err != nil { | ||
span.SetStatus(codes.Error, err.Error()) | ||
return nil, fmt.Errorf("static peer discovery: %w", err) | ||
} | ||
|
||
// Return unique addresses. | ||
result := lo.Uniq(addresses) | ||
span.SetAttributes(attribute.Int("resolved_addresses_count", len(result))) | ||
span.SetStatus(codes.Ok, "resolved addresses") | ||
return result, nil | ||
} | ||
} | ||
|
||
func buildJoinAddresses(opts Options, resolvers []addressResolver) ([]string, error) { | ||
var ( | ||
result []string | ||
deferredErr error | ||
) | ||
|
||
for _, addr := range opts.JoinPeers { | ||
// See if we have a port override, if not use the default port. | ||
host, port, err := net.SplitHostPort(addr) | ||
if err != nil { | ||
host = addr | ||
port = strconv.Itoa(opts.DefaultPort) | ||
} | ||
|
||
atLeastOneSuccess := false | ||
for _, resolver := range resolvers { | ||
resolved, err := resolver(host) | ||
deferredErr = errors.Join(deferredErr, err) | ||
for _, foundAddr := range resolved { | ||
result = append(result, net.JoinHostPort(foundAddr, port)) | ||
} | ||
// we stop once we find a resolver that succeeded for given address | ||
if len(resolved) > 0 { | ||
atLeastOneSuccess = true | ||
break | ||
} | ||
} | ||
|
||
if !atLeastOneSuccess { | ||
// It is still useful to know if user provided an address that we could not resolve, even | ||
// if another addresses resolve successfully, and we don't return an error. To keep things simple, we're | ||
// not including more detail as it's available through debug level. | ||
level.Warn(opts.Logger).Log("msg", "failed to resolve provided join address", "addr", addr) | ||
} | ||
} | ||
|
||
if len(result) == 0 { | ||
return nil, fmt.Errorf("failed to find any valid join addresses: %w", deferredErr) | ||
} | ||
return result, nil | ||
} | ||
|
||
type addressResolver func(addr string) ([]string, error) | ||
|
||
func ipResolver(log log.Logger) addressResolver { | ||
return func(addr string) ([]string, error) { | ||
// Check if it's IP and use it if so. | ||
ip := net.ParseIP(addr) | ||
if ip == nil { | ||
return nil, fmt.Errorf("could not parse as an IP or IP:port address: %q", addr) | ||
} | ||
level.Debug(log).Log("msg", "found an IP cluster join address", "addr", addr) | ||
return []string{ip.String()}, nil | ||
} | ||
} | ||
|
||
func dnsAResolver(opts Options, ctx context.Context) addressResolver { | ||
// Default to net.LookupIP if not provided. By default, this will look up A/AAAA records. | ||
ipLookup := opts.lookupIPFn | ||
if ipLookup == nil { | ||
ipLookup = net.LookupIP | ||
} | ||
return dnsResolver(opts, ctx, "A/AAAA", func(addr string) ([]string, error) { | ||
ips, err := ipLookup(addr) | ||
result := make([]string, 0, len(ips)) | ||
for _, ip := range ips { | ||
result = append(result, ip.String()) | ||
} | ||
return result, err | ||
}) | ||
} | ||
|
||
func dnsSRVResolver(opts Options, ctx context.Context) addressResolver { | ||
// Default to net.LookupSRV if not provided. | ||
srvLookup := opts.lookupSRVFn | ||
if srvLookup == nil { | ||
srvLookup = net.LookupSRV | ||
} | ||
return dnsResolver(opts, ctx, "SRV", func(addr string) ([]string, error) { | ||
_, addresses, err := srvLookup("", "", addr) | ||
result := make([]string, 0, len(addresses)) | ||
for _, a := range addresses { | ||
result = append(result, a.Target) | ||
} | ||
return result, err | ||
}) | ||
} | ||
|
||
func dnsResolver(opts Options, ctx context.Context, recordType string, dnsLookupFn func(string) ([]string, error)) addressResolver { | ||
return func(addr string) ([]string, error) { | ||
_, span := opts.Tracer.Tracer("").Start( | ||
ctx, | ||
"ClusterPeersDNSLookup", | ||
trace.WithSpanKind(trace.SpanKindInternal), | ||
trace.WithAttributes(attribute.String("addr", addr)), | ||
trace.WithAttributes(attribute.String("record_type", recordType)), | ||
) | ||
defer span.End() | ||
|
||
result, err := dnsLookupFn(addr) | ||
if err != nil { | ||
level.Debug(opts.Logger).Log("msg", "failed to resolve DNS records", "addr", addr, "record_type", recordType, "err", err) | ||
span.SetStatus(codes.Error, err.Error()) | ||
return nil, fmt.Errorf("failed to resolve %q records: %w", recordType, err) | ||
} | ||
|
||
level.Debug(opts.Logger).Log("msg", "received DNS query response", "addr", addr, "record_type", recordType, "records_count", len(result)) | ||
span.SetAttributes(attribute.Int("resolved_addresses_count", len(result))) | ||
span.SetStatus(codes.Ok, "resolved addresses") | ||
return result, nil | ||
} | ||
} |
Oops, something went wrong.