diff --git a/internal/integration/api/node-annotations.go b/internal/integration/api/node-annotations.go index 784c422072..29a34bc77f 100644 --- a/internal/integration/api/node-annotations.go +++ b/internal/integration/api/node-annotations.go @@ -11,8 +11,6 @@ import ( "time" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" "github.com/siderolabs/talos/internal/integration/base" machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine" @@ -68,13 +66,7 @@ func (suite *NodeAnnotationsSuite) testUpdate(node string) { suite.T().Logf("updating annotations on node %q (%q)", node, k8sNode.Name) - watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{ - FieldSelector: metadataKeyName + k8sNode.Name, - Watch: true, - }) - suite.Require().NoError(err) - - defer watcher.Stop() + watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name) // set two new annotation suite.setNodeAnnotations(node, map[string]string{ @@ -82,7 +74,7 @@ func (suite *NodeAnnotationsSuite) testUpdate(node string) { "talos.dev/ann2": "value2", }) - suite.waitUntil(watcher, map[string]string{ + suite.waitUntil(watchCh, map[string]string{ "talos.dev/ann1": "value1", "talos.dev/ann2": "value2", }) @@ -92,7 +84,7 @@ func (suite *NodeAnnotationsSuite) testUpdate(node string) { "talos.dev/ann1": "foo", }) - suite.waitUntil(watcher, map[string]string{ + suite.waitUntil(watchCh, map[string]string{ "talos.dev/ann1": "foo", "talos.dev/ann2": "", }) @@ -100,20 +92,17 @@ func (suite *NodeAnnotationsSuite) testUpdate(node string) { // remove all Talos annoations suite.setNodeAnnotations(node, nil) - suite.waitUntil(watcher, map[string]string{ + suite.waitUntil(watchCh, map[string]string{ "talos.dev/ann1": "", "talos.dev/ann2": "", }) } -func (suite *NodeAnnotationsSuite) waitUntil(watcher watch.Interface, expectedAnnotations map[string]string) { +func (suite *NodeAnnotationsSuite) waitUntil(watchCh <-chan *v1.Node, expectedAnnotations map[string]string) { outer: for { select { - case ev := <-watcher.ResultChan(): - k8sNode, ok := ev.Object.(*v1.Node) - suite.Require().Truef(ok, "watch event is not of type v1.Node but was %T", ev.Object) - + case k8sNode := <-watchCh: suite.T().Logf("annotations %#v", k8sNode.Annotations) for k, v := range expectedAnnotations { diff --git a/internal/integration/api/node-labels.go b/internal/integration/api/node-labels.go index a9f94e185f..2365a12ac5 100644 --- a/internal/integration/api/node-labels.go +++ b/internal/integration/api/node-labels.go @@ -12,8 +12,6 @@ import ( "github.com/siderolabs/go-pointer" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" "github.com/siderolabs/talos/internal/integration/base" machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine" @@ -63,8 +61,6 @@ func (suite *NodeLabelsSuite) TestUpdateWorker() { suite.testUpdate(node, false) } -const metadataKeyName = "metadata.name=" - // testUpdate cycles through a set of node label updates reverting the change in the end. func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) { k8sNode, err := suite.GetK8sNodeByInternalIP(suite.ctx, node) @@ -72,13 +68,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) { suite.T().Logf("updating labels on node %q (%q)", node, k8sNode.Name) - watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{ - FieldSelector: metadataKeyName + k8sNode.Name, - Watch: true, - }) - suite.Require().NoError(err) - - defer watcher.Stop() + watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name) const stdLabelName = "kubernetes.io/hostname" @@ -90,7 +80,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) { "talos.dev/test2": "value2", }) - suite.waitUntil(watcher, map[string]string{ + suite.waitUntil(watchCh, map[string]string{ "talos.dev/test1": "value1", "talos.dev/test2": "value2", }, isControlplane) @@ -100,7 +90,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) { "talos.dev/test1": "foo", }) - suite.waitUntil(watcher, map[string]string{ + suite.waitUntil(watchCh, map[string]string{ "talos.dev/test1": "foo", "talos.dev/test2": "", }, isControlplane) @@ -112,7 +102,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) { stdLabelName: "bar", }) - suite.waitUntil(watcher, map[string]string{ + suite.waitUntil(watchCh, map[string]string{ "talos.dev/test1": "foo2", stdLabelName: stdLabelValue, }, isControlplane) @@ -121,7 +111,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) { // remove all Talos Labels suite.setNodeLabels(node, nil) - suite.waitUntil(watcher, map[string]string{ + suite.waitUntil(watchCh, map[string]string{ "talos.dev/test1": "", "talos.dev/test2": "", }, isControlplane) @@ -136,34 +126,25 @@ func (suite *NodeLabelsSuite) TestAllowScheduling() { suite.T().Logf("updating taints on node %q (%q)", node, k8sNode.Name) - watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{ - FieldSelector: metadataKeyName + k8sNode.Name, - Watch: true, - }) - suite.Require().NoError(err) + watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name) - defer watcher.Stop() - - suite.waitUntil(watcher, nil, true) + suite.waitUntil(watchCh, nil, true) suite.setAllowScheduling(node, true) - suite.waitUntil(watcher, nil, false) + suite.waitUntil(watchCh, nil, false) suite.setAllowScheduling(node, false) - suite.waitUntil(watcher, nil, true) + suite.waitUntil(watchCh, nil, true) } //nolint:gocyclo -func (suite *NodeLabelsSuite) waitUntil(watcher watch.Interface, expectedLabels map[string]string, taintNoSchedule bool) { +func (suite *NodeLabelsSuite) waitUntil(watchCh <-chan *v1.Node, expectedLabels map[string]string, taintNoSchedule bool) { outer: for { select { - case ev := <-watcher.ResultChan(): - k8sNode, ok := ev.Object.(*v1.Node) - suite.Require().Truef(ok, "watch event is not of type v1.Node but was %T", ev.Object) - + case k8sNode := <-watchCh: suite.T().Logf("labels %#v, taints %#v", k8sNode.Labels, k8sNode.Spec.Taints) for k, v := range expectedLabels { diff --git a/internal/integration/api/node-taints.go b/internal/integration/api/node-taints.go index dc0220263c..5142397320 100644 --- a/internal/integration/api/node-taints.go +++ b/internal/integration/api/node-taints.go @@ -16,8 +16,6 @@ import ( "github.com/siderolabs/gen/xslices" "github.com/siderolabs/gen/xtesting/must" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" "github.com/siderolabs/talos/internal/integration/base" machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine" @@ -67,13 +65,7 @@ func (suite *NodeTaintsSuite) testUpdate(node string) { suite.T().Logf("updating taints on node %q (%q)", node, k8sNode.Name) - watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{ - FieldSelector: metadataKeyName + k8sNode.Name, - Watch: true, - }) - suite.Require().NoError(err) - - defer watcher.Stop() + watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name) // set two new taints suite.setNodeTaints(node, map[string]string{ @@ -81,7 +73,7 @@ func (suite *NodeTaintsSuite) testUpdate(node string) { "talos.dev/test2": "NoSchedule", }) - suite.waitUntil(watcher, map[string]string{ + suite.waitUntil(watchCh, map[string]string{ constants.LabelNodeRoleControlPlane: "NoSchedule", "talos.dev/test1": "value1:NoSchedule", "talos.dev/test2": "NoSchedule", @@ -92,7 +84,7 @@ func (suite *NodeTaintsSuite) testUpdate(node string) { "talos.dev/test1": "value1:NoSchedule", }) - suite.waitUntil(watcher, map[string]string{ + suite.waitUntil(watchCh, map[string]string{ constants.LabelNodeRoleControlPlane: "NoSchedule", "talos.dev/test1": "value1:NoSchedule", }) @@ -100,19 +92,16 @@ func (suite *NodeTaintsSuite) testUpdate(node string) { // remove all taints suite.setNodeTaints(node, nil) - suite.waitUntil(watcher, map[string]string{ + suite.waitUntil(watchCh, map[string]string{ constants.LabelNodeRoleControlPlane: "NoSchedule", }) } -func (suite *NodeTaintsSuite) waitUntil(watcher watch.Interface, expectedTaints map[string]string) { +func (suite *NodeTaintsSuite) waitUntil(watchCh <-chan *v1.Node, expectedTaints map[string]string) { outer: for { select { - case ev := <-watcher.ResultChan(): - k8sNode, ok := ev.Object.(*v1.Node) - suite.Require().Truef(ok, "watch event is not of type v1.Node but was %T", ev.Object) - + case k8sNode := <-watchCh: suite.T().Logf("labels %#v, taints %#v", k8sNode.Labels, k8sNode.Spec.Taints) taints := xslices.ToMap(k8sNode.Spec.Taints, func(t v1.Taint) (string, string) { diff --git a/internal/integration/base/k8s.go b/internal/integration/base/k8s.go index 9fa70b9497..a984d86db4 100644 --- a/internal/integration/base/k8s.go +++ b/internal/integration/base/k8s.go @@ -21,6 +21,7 @@ import ( "strings" "time" + "github.com/siderolabs/gen/channel" "github.com/siderolabs/gen/xslices" "github.com/siderolabs/go-pointer" "github.com/siderolabs/go-retry/retry" @@ -37,6 +38,7 @@ import ( "k8s.io/client-go/discovery" "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/dynamic" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" @@ -804,3 +806,46 @@ func (k8sSuite *K8sSuite) ToUnstructured(obj runtime.Object) unstructured.Unstru return u } + +// SetupNodeInformer sets up a node informer for the given node name. +func (k8sSuite *K8sSuite) SetupNodeInformer(ctx context.Context, nodeName string) <-chan *corev1.Node { + const metadataKeyName = "metadata.name=" + + watchCh := make(chan *corev1.Node) + + informerFactory := informers.NewSharedInformerFactoryWithOptions(k8sSuite.Clientset, 30*time.Second, informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = metadataKeyName + nodeName + })) + + nodeInformer := informerFactory.Core().V1().Nodes().Informer() + _, err := nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + node, ok := obj.(*corev1.Node) + if !ok { + return + } + + channel.SendWithContext(ctx, watchCh, node) + }, + UpdateFunc: func(_, obj any) { + node, ok := obj.(*corev1.Node) + if !ok { + return + } + + channel.SendWithContext(ctx, watchCh, node) + }, + }) + k8sSuite.Require().NoError(err) + + informerFactory.Start(ctx.Done()) + k8sSuite.T().Cleanup(informerFactory.Shutdown) + + result := informerFactory.WaitForCacheSync(ctx.Done()) + + for k, v := range result { + k8sSuite.Assert().True(v, "informer %q failed to sync", k.String()) + } + + return watchCh +}