-
Notifications
You must be signed in to change notification settings - Fork 239
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Revert "Revert "Tests and foundations for new cluster peer discovery (#…
- Loading branch information
Showing
9 changed files
with
600 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package discovery | ||
|
||
import ( | ||
"net" | ||
"strconv" | ||
) | ||
|
||
func appendDefaultPort(addr string, port int) string { | ||
_, _, err := net.SplitHostPort(addr) | ||
if err == nil { | ||
// No error means there was a port in the string | ||
return addr | ||
} | ||
return net.JoinHostPort(addr, strconv.Itoa(port)) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package discovery | ||
|
||
import ( | ||
"fmt" | ||
stdlog "log" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/hashicorp/go-discover" | ||
"github.com/hashicorp/go-discover/provider/k8s" | ||
) | ||
|
||
func newDynamicDiscovery(l log.Logger, config string, defaultPort int, factory goDiscoverFactory) (DiscoverFn, error) { | ||
providers := make(map[string]discover.Provider, len(discover.Providers)+1) | ||
for k, v := range discover.Providers { | ||
providers[k] = v | ||
} | ||
|
||
// Custom providers that aren't enabled by default | ||
providers["k8s"] = &k8s.Provider{} | ||
|
||
discoverer, err := factory(discover.WithProviders(providers)) | ||
if err != nil { | ||
return nil, fmt.Errorf("bootstrapping peer discovery: %w", err) | ||
} | ||
|
||
return func() ([]string, error) { | ||
addrs, err := discoverer.Addrs(config, stdlog.New(log.NewStdlibAdapter(l), "", 0)) | ||
if err != nil { | ||
return nil, fmt.Errorf("discovering peers: %w", err) | ||
} | ||
|
||
for i := range addrs { | ||
// Default to using the same advertise port as the local node. This may | ||
// break in some cases, so the user should make sure the port numbers | ||
// align on as many nodes as possible. | ||
addrs[i] = appendDefaultPort(addrs[i], defaultPort) | ||
} | ||
|
||
return addrs, nil | ||
}, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
package discovery | ||
|
||
import ( | ||
"fmt" | ||
"net" | ||
|
||
"github.com/go-kit/log" | ||
godiscover "github.com/hashicorp/go-discover" | ||
"go.opentelemetry.io/otel/trace" | ||
) | ||
|
||
type DiscoverFn func() ([]string, error) | ||
|
||
type Options struct { | ||
JoinPeers []string | ||
DiscoverPeers string | ||
DefaultPort int | ||
// Logger to surface extra information to the user. Required. | ||
Logger log.Logger | ||
// Tracer to emit spans. Required. | ||
Tracer trace.TracerProvider | ||
// lookupSRVFn is a function that can be used to lookup SRV records. If nil, net.LookupSRV is used. Used for testing. | ||
lookupSRVFn lookupSRVFn | ||
// goDiscoverFactory is a function that can be used to create a new discover.Discover instance. | ||
// If nil, godiscover.New is used. Used for testing. | ||
goDiscoverFactory goDiscoverFactory | ||
} | ||
|
||
// lookupSRVFn is a function that can be used to lookup SRV records. Matches net.LookupSRV signature. | ||
type lookupSRVFn func(service, proto, name string) (string, []*net.SRV, error) | ||
|
||
// goDiscoverFactory is a function that can be used to create a new discover.Discover instance. | ||
// Matches discover.New signature. | ||
type goDiscoverFactory func(opts ...godiscover.Option) (*godiscover.Discover, error) | ||
|
||
func NewPeerDiscoveryFn(opts Options) (DiscoverFn, error) { | ||
if opts.Logger == nil { | ||
return nil, fmt.Errorf("logger is required, got nil") | ||
} | ||
if opts.Tracer == nil { | ||
return nil, fmt.Errorf("tracer is required, got nil") | ||
} | ||
if len(opts.JoinPeers) > 0 && opts.DiscoverPeers != "" { | ||
return nil, fmt.Errorf("at most one of join peers and discover peers may be set, "+ | ||
"got join peers %q and discover peers %q", opts.JoinPeers, opts.DiscoverPeers) | ||
} | ||
srvLookupFn := net.LookupSRV | ||
if opts.lookupSRVFn != nil { | ||
srvLookupFn = opts.lookupSRVFn | ||
} | ||
discoverFactory := godiscover.New | ||
if opts.goDiscoverFactory != nil { | ||
discoverFactory = opts.goDiscoverFactory | ||
} | ||
|
||
switch { | ||
case len(opts.JoinPeers) > 0: | ||
return newStaticDiscovery(opts.JoinPeers, opts.DefaultPort, opts.Logger, srvLookupFn), nil | ||
case opts.DiscoverPeers != "": | ||
return newDynamicDiscovery(opts.Logger, opts.DiscoverPeers, opts.DefaultPort, discoverFactory) | ||
default: | ||
// Here, both JoinPeers and DiscoverPeers are empty. This is desirable when | ||
// starting a seed node that other nodes connect to, so we don't require | ||
// one of the fields to be set. | ||
return nil, nil | ||
} | ||
} |
Oops, something went wrong.