From 60fcbc5a2a646e17efd1b63669debd8c2c9ac091 Mon Sep 17 00:00:00 2001 From: zoetrope Date: Thu, 7 Sep 2023 18:49:14 +0900 Subject: [PATCH] Fix blocking by kubelet-restart op --- server/strategy.go | 89 +++++++++++++++++++++++------------------ server/strategy_test.go | 6 +-- 2 files changed, 50 insertions(+), 45 deletions(-) diff --git a/server/strategy.go b/server/strategy.go index b7d2f107..471cecd4 100644 --- a/server/strategy.go +++ b/server/strategy.go @@ -105,35 +105,32 @@ func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain func riversOps(c *cke.Cluster, nf *NodeFilter, maxConcurrentUpdates int) (ops []cke.Operator) { if nodes := nf.SSHConnectedNodes(nf.RiversStoppedNodes(), true, true); len(nodes) > 0 { - ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator { - return op.RiversBootOp(ns, nf.ControlPlane(), c.Options.Rivers, op.RiversContainerName, op.RiversUpstreamPort, op.RiversListenPort) - }, maxConcurrentUpdates)...) + max := maxConcurrentUpdates + if len(nodes) < max { + max = len(nodes) + } + ops = append(ops, op.RiversBootOp(nodes[:max], nf.ControlPlane(), c.Options.Rivers, op.RiversContainerName, op.RiversUpstreamPort, op.RiversListenPort)) } if nodes := nf.SSHConnectedNodes(nf.RiversOutdatedNodes(), true, true); len(nodes) > 0 { - ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator { - return op.RiversRestartOp(ns, nf.ControlPlane(), c.Options.Rivers, op.RiversContainerName, op.RiversUpstreamPort, op.RiversListenPort) - }, maxConcurrentUpdates)...) + max := maxConcurrentUpdates + if len(nodes) < max { + max = len(nodes) + } + ops = append(ops, op.RiversRestartOp(nodes[:max], nf.ControlPlane(), c.Options.Rivers, op.RiversContainerName, op.RiversUpstreamPort, op.RiversListenPort)) } if nodes := nf.SSHConnectedNodes(nf.EtcdRiversStoppedNodes(), true, false); len(nodes) > 0 { - ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator { - return op.RiversBootOp(ns, nf.ControlPlane(), c.Options.EtcdRivers, op.EtcdRiversContainerName, op.EtcdRiversUpstreamPort, op.EtcdRiversListenPort) - }, maxConcurrentUpdates)...) + max := maxConcurrentUpdates + if len(nodes) < max { + max = len(nodes) + } + ops = append(ops, op.RiversBootOp(nodes[:max], nf.ControlPlane(), c.Options.EtcdRivers, op.EtcdRiversContainerName, op.EtcdRiversUpstreamPort, op.EtcdRiversListenPort)) } if nodes := nf.SSHConnectedNodes(nf.EtcdRiversOutdatedNodes(), true, false); len(nodes) > 0 { - ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator { - return op.RiversRestartOp(ns, nf.ControlPlane(), c.Options.EtcdRivers, op.EtcdRiversContainerName, op.EtcdRiversUpstreamPort, op.EtcdRiversListenPort) - }, maxConcurrentUpdates)...) - } - return ops -} - -func splitOperators(nodes []*cke.Node, createOps func(ns []*cke.Node) cke.Operator, maxConcurrentUpdates int) (ops []cke.Operator) { - for i := 0; i < len(nodes); i += maxConcurrentUpdates { - end := i + maxConcurrentUpdates - if end > len(nodes) { - end = len(nodes) + max := maxConcurrentUpdates + if len(nodes) < max { + max = len(nodes) } - ops = append(ops, createOps(nodes[i:end])) + ops = append(ops, op.RiversRestartOp(nodes[:max], nf.ControlPlane(), c.Options.EtcdRivers, op.EtcdRiversContainerName, op.EtcdRiversUpstreamPort, op.EtcdRiversListenPort)) } return ops } @@ -164,34 +161,46 @@ func k8sOps(c *cke.Cluster, nf *NodeFilter, cs *cke.ClusterStatus, maxConcurrent // For all nodes apiServer := nf.HealthyAPIServer() if nodes := nf.SSHConnectedNodes(nf.KubeletUnrecognizedNodes(), true, true); len(nodes) > 0 { - ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator { - return k8s.KubeletRestartOp(ns, c.Name, c.Options.Kubelet, cs.NodeStatuses) - }, maxConcurrentUpdates)...) + max := maxConcurrentUpdates + if len(nodes) < max { + max = len(nodes) + } + ops = append(ops, k8s.KubeletRestartOp(nodes[:max], c.Name, c.Options.Kubelet, cs.NodeStatuses)) } if nodes := nf.SSHConnectedNodes(nf.KubeletStoppedNodes(), true, true); len(nodes) > 0 { - ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator { - return k8s.KubeletBootOp(ns, nf.KubeletStoppedRegisteredNodes(), apiServer, c.Name, c.Options.Kubelet, cs.NodeStatuses) - }, maxConcurrentUpdates)...) + max := maxConcurrentUpdates + if len(nodes) < max { + max = len(nodes) + } + ops = append(ops, k8s.KubeletBootOp(nodes[:max], nf.KubeletStoppedRegisteredNodes(), apiServer, c.Name, c.Options.Kubelet, cs.NodeStatuses)) } if nodes := nf.SSHConnectedNodes(nf.KubeletOutdatedNodes(), true, true); len(nodes) > 0 { - ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator { - return k8s.KubeletRestartOp(ns, c.Name, c.Options.Kubelet, cs.NodeStatuses) - }, maxConcurrentUpdates)...) + max := maxConcurrentUpdates + if len(nodes) < max { + max = len(nodes) + } + ops = append(ops, k8s.KubeletRestartOp(nodes[:max], c.Name, c.Options.Kubelet, cs.NodeStatuses)) } if nodes := nf.SSHConnectedNodes(nf.ProxyStoppedNodes(), true, true); len(nodes) > 0 { - ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator { - return k8s.KubeProxyBootOp(ns, c.Name, "", c.Options.Proxy) - }, maxConcurrentUpdates)...) + max := maxConcurrentUpdates + if len(nodes) < max { + max = len(nodes) + } + ops = append(ops, k8s.KubeProxyBootOp(nodes[:max], c.Name, "", c.Options.Proxy)) } if nodes := nf.SSHConnectedNodes(nf.ProxyOutdatedNodes(c.Options.Proxy), true, true); len(nodes) > 0 { - ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator { - return k8s.KubeProxyRestartOp(ns, c.Name, "", c.Options.Proxy) - }, maxConcurrentUpdates)...) + max := maxConcurrentUpdates + if len(nodes) < max { + max = len(nodes) + } + ops = append(ops, k8s.KubeProxyRestartOp(nodes[:max], c.Name, "", c.Options.Proxy)) } if nodes := nf.SSHConnectedNodes(nf.ProxyRunningUnexpectedlyNodes(), true, true); len(nodes) > 0 { - ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator { - return op.ProxyStopOp(ns) - }, maxConcurrentUpdates)...) + max := maxConcurrentUpdates + if len(nodes) < max { + max = len(nodes) + } + ops = append(ops, op.ProxyStopOp(nodes[:max])) } return ops } diff --git a/server/strategy_test.go b/server/strategy_test.go index e10dfe09..2ef6f049 100644 --- a/server/strategy_test.go +++ b/server/strategy_test.go @@ -608,7 +608,7 @@ func TestDecideOps(t *testing.T) { { Name: "BootRivers", Input: newData(), - ExpectedOps: []opData{{"rivers-bootstrap", 5}, {"rivers-bootstrap", 1}, {"etcd-rivers-bootstrap", 3}}, + ExpectedOps: []opData{{"rivers-bootstrap", 5}, {"etcd-rivers-bootstrap", 3}}, }, { Name: "BootRivers2", @@ -749,9 +749,7 @@ func TestDecideOps(t *testing.T) { {"kube-controller-manager-bootstrap", 3}, {"kube-scheduler-bootstrap", 3}, {"kubelet-bootstrap", 5}, - {"kubelet-bootstrap", 1}, {"kube-proxy-bootstrap", 5}, - {"kube-proxy-bootstrap", 1}, }, }, { @@ -983,7 +981,6 @@ func TestDecideOps(t *testing.T) { Input: newData().withAllServices().withDisableProxy(), ExpectedOps: []opData{ {"stop-kube-proxy", 5}, - {"stop-kube-proxy", 1}, }, }, { @@ -1032,7 +1029,6 @@ func TestDecideOps(t *testing.T) { ExpectedOps: []opData{ {"kube-apiserver-restart", 3}, {"kubelet-restart", 5}, - {"kubelet-restart", 1}, }, }, {