Skip to content

Commit

Permalink
feat: support conditional start of IPv6 dns servers
Browse files Browse the repository at this point in the history
This PR does these things:
- [x] Refactored `DNSResolveCacheController`. Moved most of the logic into the `dns` package types, simplifying and streamlining the logic.
- [x] Replaced most of the goroutine orchestration with the suture package.
- [x] Added per-item reaction to the dns listeners/servers failing to start. This allows us to ignore IPv6 errors if IPv6 is disabled.
- [x] Added per-item reaction to the dns listeners/servers failing to stop.
- [ ] Raise IPv6 listener on link-local address for dns (both TCP and UDP).
- [ ] Update kubelet's `resolv.conf` IPv4/IPv6 endpoints.

Closes siderolabs#9384

Signed-off-by: Dmitriy Matrenichev <[email protected]>
  • Loading branch information
DmitriyMV committed Oct 29, 2024
1 parent 423b1e5 commit 96c6144
Show file tree
Hide file tree
Showing 9 changed files with 599 additions and 331 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
github.com/thejerf/suture/v4 v4.0.5
github.com/u-root/u-root v0.14.0
github.com/ulikunitz/xz v0.5.12
github.com/vmware/vmw-guestinfo v0.0.0-20220317130741-510905f0efa3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 h1:kdXcSzyDtseVEc4yCz2qF8ZrQvIDBJLl4S1c3GCXmoI=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/thejerf/suture/v4 v4.0.5 h1:F1E/4FZwXWqvlWDKEUo6/ndLtxGAUzMmNqkrMknZbAA=
github.com/thejerf/suture/v4 v4.0.5/go.mod h1:gu9Y4dXNUWFrByqRt30Rm9/UZ0wzRSt9AJS6xu/ZGxU=
github.com/u-root/u-root v0.14.0 h1:Ka4T10EEML7dQ5XDvO9c3MBN8z4nuSnGjcd1jmU2ivg=
github.com/u-root/u-root v0.14.0/go.mod h1:hAyZorapJe4qzbLWlAkmSVCJGbfoU9Pu4jpJ1WMluqE=
github.com/u-root/uio v0.0.0-20240224005618-d2acac8f3701 h1:pyC9PaHYZFgEKFdlp3G8RaCKgVpHZnecvArXvPXcFkM=
Expand Down
279 changes: 76 additions & 203 deletions internal/app/machined/pkg/controllers/network/dns_resolve_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,19 @@ package network

import (
"context"
"errors"
"fmt"
"net"
"iter"
"net/netip"
"slices"
"strings"
"sync"
"time"

"github.com/coredns/coredns/plugin/pkg/proxy"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
dnssrv "github.com/miekg/dns"
"github.com/hashicorp/go-multierror"
"github.com/siderolabs/gen/optional"
"github.com/siderolabs/gen/pair"
"github.com/siderolabs/gen/xiter"
"github.com/thejerf/suture/v4"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

Expand All @@ -36,13 +32,9 @@ type DNSResolveCacheController struct {
State state.State
Logger *zap.Logger

mx sync.Mutex
handler *dns.Handler
nodeHandler *dns.NodeHandler
rootHandler dnssrv.Handler
runners map[runnerConfig]pair.Pair[func(), <-chan struct{}]
reconcile chan struct{}
originalCtx context.Context //nolint:containedctx
mx sync.Mutex
manager *dns.Manager
reconcile chan struct{}
}

// Name implements controller.Controller interface.
Expand Down Expand Up @@ -75,31 +67,25 @@ func (ctrl *DNSResolveCacheController) Outputs() []controller.Output {

// Run implements controller.Controller interface.
//
//nolint:gocyclo,cyclop
func (ctrl *DNSResolveCacheController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
//nolint:gocyclo
func (ctrl *DNSResolveCacheController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
ctrl.init(ctx)

ctrl.mx.Lock()
defer ctrl.mx.Unlock()

defer ctrl.stopRunners(ctx, false)
defer func() {
for err := range ctrl.manager.ClearAll(ctx.Err() == nil) {
ctrl.Logger.Error("error stopping dns runner", zap.Error(err))
}
}()

for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
case <-ctrl.reconcile:
for cfg, stop := range ctrl.runners {
select {
default:
continue
case <-stop.F2:
}

stop.F1()
delete(ctrl.runners, cfg)
}
}

cfg, err := safe.ReaderGetByID[*network.HostDNSConfig](ctx, r, network.HostDNSConfigID)
Expand All @@ -112,9 +98,12 @@ func (ctrl *DNSResolveCacheController) Run(ctx context.Context, r controller.Run
}

r.StartTrackingOutputs()
ctrl.manager.AllowNodeResolving(cfg.TypedSpec().ResolveMemberNames)

if !cfg.TypedSpec().Enabled {
ctrl.stopRunners(ctx, true)
if err = foldErrors(ctrl.manager.ClearAll(false)); err != nil {
return err
}

if err = safe.CleanupOutputs[*network.DNSResolveCache](ctx, r); err != nil {
return fmt.Errorf("error cleaning up dns status on disable: %w", err)
Expand All @@ -123,37 +112,16 @@ func (ctrl *DNSResolveCacheController) Run(ctx context.Context, r controller.Run
continue
}

ctrl.nodeHandler.SetEnabled(cfg.TypedSpec().ResolveMemberNames)

touchedRunners := make(map[runnerConfig]struct{}, len(ctrl.runners))

for _, addr := range cfg.TypedSpec().ListenAddresses {
for _, netwk := range []string{"udp", "tcp"} {
runnerCfg := runnerConfig{net: netwk, addr: addr}

if _, ok := ctrl.runners[runnerCfg]; !ok {
runner, rErr := newDNSRunner(runnerCfg, ctrl.rootHandler, ctrl.Logger, cfg.TypedSpec().ServiceHostDNSAddress.IsValid())
if rErr != nil {
return fmt.Errorf("error creating dns runner: %w", rErr)
}

ctrl.runners[runnerCfg] = pair.MakePair(runner.Start(ctrl.handleDone(ctx, logger)))
}
pairs := allAddressPairs(cfg.TypedSpec().ListenAddresses)
forwardKubeDNSToHost := cfg.TypedSpec().ServiceHostDNSAddress.IsValid()

if err = ctrl.writeDNSStatus(ctx, r, runnerCfg); err != nil {
return fmt.Errorf("error writing dns status: %w", err)
}

touchedRunners[runnerCfg] = struct{}{}
for runCfg, runErr := range ctrl.manager.RunAll(pairs, forwardKubeDNSToHost) {
if runErr != nil {
return fmt.Errorf("error updating dns runner %v: %w", runCfg, runErr)
}
}

for runnerCfg, stop := range ctrl.runners {
if _, ok := touchedRunners[runnerCfg]; !ok {
stop.F1()
delete(ctrl.runners, runnerCfg)

continue
if err = ctrl.writeDNSStatus(ctx, r, runCfg); err != nil {
return fmt.Errorf("error writing dns status: %w", err)
}
}

Expand All @@ -169,7 +137,7 @@ func (ctrl *DNSResolveCacheController) Run(ctx context.Context, r controller.Run
return upstream.TypedSpec().Value.Conn.Proxy().(*proxy.Proxy)
})

if ctrl.handler.SetProxy(prxs) {
if ctrl.manager.SetUpstreams(prxs) {
ctrl.Logger.Info("updated dns server nameservers", zap.Array("addrs", addrsArr(upstreams)))
}

Expand All @@ -179,174 +147,64 @@ func (ctrl *DNSResolveCacheController) Run(ctx context.Context, r controller.Run
}
}

func (ctrl *DNSResolveCacheController) writeDNSStatus(ctx context.Context, r controller.Runtime, config runnerConfig) error {
return safe.WriterModify(ctx, r, network.NewDNSResolveCache(fmt.Sprintf("%s-%s", config.net, config.addr)), func(drc *network.DNSResolveCache) error {
drc.TypedSpec().Status = "running"

return nil
})
}
func foldErrors(it iter.Seq[error]) error {
var multiErr *multierror.Error

func (ctrl *DNSResolveCacheController) init(ctx context.Context) {
if ctrl.runners != nil {
if ctrl.originalCtx != ctx {
// This should not happen, but if it does, it's a bug.
panic("DNSResolveCacheController is called with a different context")
}

return
for err := range it {
multiErr = multierror.Append(multiErr, err)
}

ctrl.originalCtx = ctx
ctrl.handler = dns.NewHandler(ctrl.Logger)
ctrl.nodeHandler = dns.NewNodeHandler(ctrl.handler, &stateMapper{state: ctrl.State}, ctrl.Logger)
ctrl.rootHandler = dns.NewCache(ctrl.nodeHandler, ctrl.Logger)
ctrl.runners = map[runnerConfig]pair.Pair[func(), <-chan struct{}]{}
ctrl.reconcile = make(chan struct{}, 1)

// Ensure we stop all runners when the context is canceled, no matter where we are currently.
// For example if we are in Controller runtime sleeping after error and ctx is canceled, we should stop all runners
// but, we will never call Run method again, so we need to ensure this happens regardless of the current state.
context.AfterFunc(ctx, func() {
ctrl.mx.Lock()
defer ctrl.mx.Unlock()

ctrl.stopRunners(ctx, true)
})
return multiErr.ErrorOrNil()
}

func (ctrl *DNSResolveCacheController) stopRunners(ctx context.Context, ignoreCtx bool) {
if !ignoreCtx && ctx.Err() == nil {
// context not yet canceled, preserve runners, cache and handler
return
}

for _, stop := range ctrl.runners {
stop.F1()
}
func (ctrl *DNSResolveCacheController) writeDNSStatus(ctx context.Context, r controller.Runtime, config dns.AddressPair) error {
res := network.NewDNSResolveCache(fmt.Sprintf("%s-%s", config.Network, config.Addr))

clear(ctrl.runners)
return safe.WriterModify(ctx, r, res, func(drc *network.DNSResolveCache) error {
drc.TypedSpec().Status = "running"

ctrl.handler.Stop()
return nil
})
}

func (ctrl *DNSResolveCacheController) handleDone(ctx context.Context, logger *zap.Logger) func(err error) {
return func(err error) {
if ctx.Err() != nil {
if err != nil && !errors.Is(err, net.ErrClosed) {
logger.Error("controller is closing, but error running dns server", zap.Error(err))
func (ctrl *DNSResolveCacheController) init(ctx context.Context) {
if ctrl.manager == nil {
ctrl.manager = dns.NewManager(&memberReader{st: ctrl.State}, ctrl.eventHook, ctrl.Logger)

// Ensure we stop all runners when the context is canceled, no matter where we are currently.
// For example if we are in Controller runtime sleeping after error and ctx is canceled, we should stop all runners
// but, we will never call Run method again, so we need to ensure this happens regardless of the current state.
context.AfterFunc(ctx, func() {
ctrl.mx.Lock()
defer ctrl.mx.Unlock()

for err := range ctrl.manager.ClearAll(false) {
ctrl.Logger.Error("error ctx stopping dns runner", zap.Error(err))
}

return
}

if err != nil {
logger.Error("error running dns server", zap.Error(err))
}

select {
case ctrl.reconcile <- struct{}{}:
default:
}
})
}
}

type runnerConfig struct {
net string
addr netip.AddrPort
ctrl.manager.ServeBackground(ctx)
}

func newDNSRunner(cfg runnerConfig, rootHandler dnssrv.Handler, logger *zap.Logger, forwardEnabled bool) (*dns.Server, error) {
if cfg.addr.Addr().Is6() {
cfg.net += "6"
}

logger = logger.With(zap.String("net", cfg.net), zap.Stringer("addr", cfg.addr))

var serverOpts dns.ServerOptions
func (ctrl *DNSResolveCacheController) eventHook(event suture.Event) {
ctrl.Logger.Info("dns-resolve-cache-runners event", zap.String("event", event.String()))

controlFn, ctrlErr := dns.MakeControl(cfg.net, forwardEnabled)
if ctrlErr != nil {
return nil, fmt.Errorf("error creating %q control function: %w", cfg.net, ctrlErr)
select {
case ctrl.reconcile <- struct{}{}:
default:
}

switch cfg.net {
case "udp", "udp6":
packetConn, err := dns.NewUDPPacketConn(cfg.net, cfg.addr.String(), controlFn)
if err != nil {
return nil, fmt.Errorf("error creating %q packet conn: %w", cfg.net, err)
}

serverOpts = dns.ServerOptions{
PacketConn: packetConn,
Handler: rootHandler,
Logger: logger,
}

case "tcp", "tcp6":
listener, err := dns.NewTCPListener(cfg.net, cfg.addr.String(), controlFn)
if err != nil {
return nil, fmt.Errorf("error creating %q listener: %w", cfg.net, err)
}

serverOpts = dns.ServerOptions{
Listener: listener,
Handler: rootHandler,
ReadTimeout: 3 * time.Second,
WriteTimeout: 5 * time.Second,
IdleTimeout: func() time.Duration { return 10 * time.Second },
MaxTCPQueries: -1,
Logger: logger,
}
}

return dns.NewServer(serverOpts), nil
}

type stateMapper struct {
state state.State
}

func (s *stateMapper) ResolveAddr(ctx context.Context, qType uint16, name string) []netip.Addr {
name = strings.TrimRight(name, ".")
type memberReader struct{ st state.State }

list, err := safe.ReaderListAll[*cluster.Member](ctx, s.state)
func (m *memberReader) ReadMembers(ctx context.Context) (iter.Seq[*cluster.Member], error) {
list, err := safe.ReaderListAll[*cluster.Member](ctx, m.st)
if err != nil {
return nil
return nil, err
}

elem, ok := list.Find(func(res *cluster.Member) bool {
return fqdnMatch(name, res.TypedSpec().Hostname) || fqdnMatch(name, res.Metadata().ID())
})
if !ok {
return nil
}

result := slices.DeleteFunc(slices.Clone(elem.TypedSpec().Addresses), func(addr netip.Addr) bool {
return !((qType == dnssrv.TypeA && addr.Is4()) || (qType == dnssrv.TypeAAAA && addr.Is6()))
})

if len(result) == 0 {
return nil
}

return result
}

func fqdnMatch(what, where string) bool {
what = strings.TrimRight(what, ".")
where = strings.TrimRight(where, ".")

if what == where {
return true
}

first, _, found := strings.Cut(where, ".")
if !found {
return false
}

return what == first
return list.All(), nil
}

type addrsArr safe.List[*network.DNSUpstream]
Expand All @@ -360,3 +218,18 @@ func (a addrsArr) MarshalLogArray(encoder zapcore.ArrayEncoder) error {

return nil
}

// allAddressPairs returns a sequence of dns.AddressPair values covering every
// listen address combined with both the "udp" and "tcp" networks. Pairs are
// yielded in address order, with "udp" before "tcp" for each address.
func allAddressPairs(addresses []netip.AddrPort) iter.Seq[dns.AddressPair] {
	protocols := []string{"udp", "tcp"}

	return func(yield func(dns.AddressPair) bool) {
		for _, address := range addresses {
			for _, proto := range protocols {
				pair := dns.AddressPair{
					Network: proto,
					Addr:    address,
				}

				if !yield(pair) {
					return
				}
			}
		}
	}
}
Loading

0 comments on commit 96c6144

Please sign in to comment.