Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: use node informer instead of raw watch #10043

Merged
merged 1 commit into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 6 additions & 17 deletions internal/integration/api/node-annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"

"github.com/siderolabs/talos/internal/integration/base"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
Expand Down Expand Up @@ -68,21 +66,15 @@ func (suite *NodeAnnotationsSuite) testUpdate(node string) {

suite.T().Logf("updating annotations on node %q (%q)", node, k8sNode.Name)

watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{
FieldSelector: metadataKeyName + k8sNode.Name,
Watch: true,
})
suite.Require().NoError(err)

defer watcher.Stop()
watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name)

// set two new annotation
suite.setNodeAnnotations(node, map[string]string{
"talos.dev/ann1": "value1",
"talos.dev/ann2": "value2",
})

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/ann1": "value1",
"talos.dev/ann2": "value2",
})
Expand All @@ -92,28 +84,25 @@ func (suite *NodeAnnotationsSuite) testUpdate(node string) {
"talos.dev/ann1": "foo",
})

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/ann1": "foo",
"talos.dev/ann2": "",
})

// remove all Talos annoations
suite.setNodeAnnotations(node, nil)

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/ann1": "",
"talos.dev/ann2": "",
})
}

func (suite *NodeAnnotationsSuite) waitUntil(watcher watch.Interface, expectedAnnotations map[string]string) {
func (suite *NodeAnnotationsSuite) waitUntil(watchCh <-chan *v1.Node, expectedAnnotations map[string]string) {
outer:
for {
select {
case ev := <-watcher.ResultChan():
k8sNode, ok := ev.Object.(*v1.Node)
suite.Require().Truef(ok, "watch event is not of type v1.Node but was %T", ev.Object)

case k8sNode := <-watchCh:
suite.T().Logf("annotations %#v", k8sNode.Annotations)

for k, v := range expectedAnnotations {
Expand Down
41 changes: 11 additions & 30 deletions internal/integration/api/node-labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (

"github.com/siderolabs/go-pointer"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"

"github.com/siderolabs/talos/internal/integration/base"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
Expand Down Expand Up @@ -63,22 +61,14 @@ func (suite *NodeLabelsSuite) TestUpdateWorker() {
suite.testUpdate(node, false)
}

const metadataKeyName = "metadata.name="

// testUpdate cycles through a set of node label updates reverting the change in the end.
func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
k8sNode, err := suite.GetK8sNodeByInternalIP(suite.ctx, node)
suite.Require().NoError(err)

suite.T().Logf("updating labels on node %q (%q)", node, k8sNode.Name)

watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{
FieldSelector: metadataKeyName + k8sNode.Name,
Watch: true,
})
suite.Require().NoError(err)

defer watcher.Stop()
watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name)

const stdLabelName = "kubernetes.io/hostname"

Expand All @@ -90,7 +80,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
"talos.dev/test2": "value2",
})

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/test1": "value1",
"talos.dev/test2": "value2",
}, isControlplane)
Expand All @@ -100,7 +90,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
"talos.dev/test1": "foo",
})

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/test1": "foo",
"talos.dev/test2": "",
}, isControlplane)
Expand All @@ -112,7 +102,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
stdLabelName: "bar",
})

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/test1": "foo2",
stdLabelName: stdLabelValue,
}, isControlplane)
Expand All @@ -121,7 +111,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
// remove all Talos Labels
suite.setNodeLabels(node, nil)

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/test1": "",
"talos.dev/test2": "",
}, isControlplane)
Expand All @@ -136,34 +126,25 @@ func (suite *NodeLabelsSuite) TestAllowScheduling() {

suite.T().Logf("updating taints on node %q (%q)", node, k8sNode.Name)

watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{
FieldSelector: metadataKeyName + k8sNode.Name,
Watch: true,
})
suite.Require().NoError(err)
watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name)

defer watcher.Stop()

suite.waitUntil(watcher, nil, true)
suite.waitUntil(watchCh, nil, true)

suite.setAllowScheduling(node, true)

suite.waitUntil(watcher, nil, false)
suite.waitUntil(watchCh, nil, false)

suite.setAllowScheduling(node, false)

suite.waitUntil(watcher, nil, true)
suite.waitUntil(watchCh, nil, true)
}

//nolint:gocyclo
func (suite *NodeLabelsSuite) waitUntil(watcher watch.Interface, expectedLabels map[string]string, taintNoSchedule bool) {
func (suite *NodeLabelsSuite) waitUntil(watchCh <-chan *v1.Node, expectedLabels map[string]string, taintNoSchedule bool) {
outer:
for {
select {
case ev := <-watcher.ResultChan():
k8sNode, ok := ev.Object.(*v1.Node)
suite.Require().Truef(ok, "watch event is not of type v1.Node but was %T", ev.Object)

case k8sNode := <-watchCh:
suite.T().Logf("labels %#v, taints %#v", k8sNode.Labels, k8sNode.Spec.Taints)

for k, v := range expectedLabels {
Expand Down
23 changes: 6 additions & 17 deletions internal/integration/api/node-taints.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/gen/xtesting/must"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"

"github.com/siderolabs/talos/internal/integration/base"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
Expand Down Expand Up @@ -67,21 +65,15 @@ func (suite *NodeTaintsSuite) testUpdate(node string) {

suite.T().Logf("updating taints on node %q (%q)", node, k8sNode.Name)

watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{
FieldSelector: metadataKeyName + k8sNode.Name,
Watch: true,
})
suite.Require().NoError(err)

defer watcher.Stop()
watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name)

// set two new taints
suite.setNodeTaints(node, map[string]string{
"talos.dev/test1": "value1:NoSchedule",
"talos.dev/test2": "NoSchedule",
})

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
constants.LabelNodeRoleControlPlane: "NoSchedule",
"talos.dev/test1": "value1:NoSchedule",
"talos.dev/test2": "NoSchedule",
Expand All @@ -92,27 +84,24 @@ func (suite *NodeTaintsSuite) testUpdate(node string) {
"talos.dev/test1": "value1:NoSchedule",
})

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
constants.LabelNodeRoleControlPlane: "NoSchedule",
"talos.dev/test1": "value1:NoSchedule",
})

// remove all taints
suite.setNodeTaints(node, nil)

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
constants.LabelNodeRoleControlPlane: "NoSchedule",
})
}

func (suite *NodeTaintsSuite) waitUntil(watcher watch.Interface, expectedTaints map[string]string) {
func (suite *NodeTaintsSuite) waitUntil(watchCh <-chan *v1.Node, expectedTaints map[string]string) {
outer:
for {
select {
case ev := <-watcher.ResultChan():
k8sNode, ok := ev.Object.(*v1.Node)
suite.Require().Truef(ok, "watch event is not of type v1.Node but was %T", ev.Object)

case k8sNode := <-watchCh:
suite.T().Logf("labels %#v, taints %#v", k8sNode.Labels, k8sNode.Spec.Taints)

taints := xslices.ToMap(k8sNode.Spec.Taints, func(t v1.Taint) (string, string) {
Expand Down
45 changes: 45 additions & 0 deletions internal/integration/base/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"time"

"github.com/siderolabs/gen/channel"
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/go-pointer"
"github.com/siderolabs/go-retry/retry"
Expand All @@ -37,6 +38,7 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
Expand Down Expand Up @@ -804,3 +806,46 @@ func (k8sSuite *K8sSuite) ToUnstructured(obj runtime.Object) unstructured.Unstru

return u
}

// SetupNodeInformer sets up a node informer for the given node name.
func (k8sSuite *K8sSuite) SetupNodeInformer(ctx context.Context, nodeName string) <-chan *corev1.Node {
const metadataKeyName = "metadata.name="

watchCh := make(chan *corev1.Node)

informerFactory := informers.NewSharedInformerFactoryWithOptions(k8sSuite.Clientset, 30*time.Second, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = metadataKeyName + nodeName
}))

nodeInformer := informerFactory.Core().V1().Nodes().Informer()
_, err := nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
node, ok := obj.(*corev1.Node)
if !ok {
return
}

channel.SendWithContext(ctx, watchCh, node)
},
UpdateFunc: func(_, obj any) {
node, ok := obj.(*corev1.Node)
if !ok {
return
}

channel.SendWithContext(ctx, watchCh, node)
},
})
k8sSuite.Require().NoError(err)

informerFactory.Start(ctx.Done())
k8sSuite.T().Cleanup(informerFactory.Shutdown)

result := informerFactory.WaitForCacheSync(ctx.Done())

for k, v := range result {
k8sSuite.Assert().True(v, "informer %q failed to sync", k.String())
}

return watchCh
}