Draining v2 (#310)
* Processing of DrainerConfig in parallel

* Created dedicated drainNode function

* Adjusting import order

* Draining master nodes sequentially and worker nodes in parallel

* Added more labels and values to detect a master node

* Adjusting timeouts

* Waiting in case at least 1 master is not ready

* Waiting in case at least 1 master is not ready only if we are draining a master node

* Testing parallel master draining

* Testing parallel master draining - commented unused code

* Making sure the entry in the state is removed

* Code cleanup

* Adjusted Changelog

* Ignoring pkg:golang/golang.org/x/[email protected] in Nancy - indirect dependency

* Adjusted comments and changelog
silenteh authored Mar 2, 2023
1 parent c5a4678 commit 0efd1a4
Showing 5 changed files with 175 additions and 48 deletions.
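The heart of the change is in create.go below: worker nodes are drained asynchronously in a goroutine that reports its result over a per-node channel, while the reconcile loop only waits a bounded amount of time before returning and checking again on the next pass. The following is a minimal, self-contained sketch of that pattern under simplified assumptions: the `drainer` type, `drainNode`, and `reconcile` here are hypothetical stand-ins for the real `Resource`, `drain.RunNodeDrain`, and `EnsureCreated`, and all DrainerConfig status handling is omitted.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// drainer keeps one result channel per node that is currently being drained,
// guarded by a mutex because reconciliations for different nodes can run
// concurrently.
type drainer struct {
	lock     sync.RWMutex
	draining map[string]chan error
}

// drainNode is a stand-in for the real, blocking drain.RunNodeDrain call.
func drainNode(nodeName string) error {
	time.Sleep(2 * time.Second)
	if nodeName == "bad-node" {
		return errors.New("eviction failed")
	}
	return nil
}

// reconcile mirrors the shape of EnsureCreated for worker nodes: kick off the
// drain asynchronously on the first pass, then poll its result with a bounded
// wait on later passes so the control loop is never blocked for long.
func (d *drainer) reconcile(nodeName string) (bool, error) {
	d.lock.Lock()
	ch, started := d.draining[nodeName]
	if !started {
		ch = make(chan error, 2) // buffered so sends never block
		d.draining[nodeName] = ch
		d.lock.Unlock()
		go func() { ch <- drainNode(nodeName) }()
		return false, nil
	}
	d.lock.Unlock()

	select {
	case err := <-ch:
		// draining finished (successfully or not); clear the per-node state
		d.lock.Lock()
		delete(d.draining, nodeName)
		d.lock.Unlock()
		return true, err
	case <-time.After(7 * time.Second):
		// still draining; give up for now and check again on the next pass
		return false, nil
	}
}

func main() {
	d := &drainer{draining: map[string]chan error{}}
	for {
		done, err := d.reconcile("worker-1")
		if done {
			fmt.Println("worker-1 drained, error:", err)
			return
		}
		fmt.Println("worker-1 still draining")
	}
}
```

In the real resource, master nodes skip the asynchronous path and are drained sequentially with shorter timeouts, as the diff below shows.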
5 changes: 4 additions & 1 deletion .nancy-ignore
@@ -14,4 +14,7 @@ CVE-2022-42708
CVE-2022-32149

# Ignoring [email protected]
sonatype-2022-6522 until=2023-06-25
sonatype-2022-6522 until=2023-06-25

# Ignoring pkg:golang/golang.org/x/[email protected] (indirect dependency)
CVE-2022-41723 until=2023-06-25
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Use Kubernetes Events on `AWSCluster` CR for draining status updates.
- Added the use of the runtime/default seccomp profile.
- Concurrent execution of draining to make sure the operator can handle multiple nodes going down across multiple clusters running in the same MC.

### Changed

200 changes: 155 additions & 45 deletions service/controller/resource/drainer/create.go
@@ -19,6 +19,8 @@ import (
"k8s.io/client-go/rest"
"k8s.io/kubectl/pkg/drain"

v1alpha1 "github.com/giantswarm/node-operator/api"

"github.com/giantswarm/node-operator/service/controller/key"
)

@@ -148,67 +150,63 @@ func (r *Resource) EnsureCreated(ctx context.Context, obj interface{}) error {
}

typeOfNode := "worker"
// In case of master nodes, just delete the pods, and don't evict them
// In case of master nodes, adjust the timeouts and make them shorter
if nodeIsMaster(&node) {

// For the master node wait 30 seconds for a pod to be terminated
nodeShutdownHelper.GracePeriodSeconds = 30
// 45 second pod termination grace period
nodeShutdownHelper.GracePeriodSeconds = 45

// We want the master nodes to be terminated rather quickly
nodeShutdownHelper.Timeout = 1 * time.Minute
// 2 minute max timeout since we are blocking here
nodeShutdownHelper.Timeout = 2 * time.Minute

// Set type to master
typeOfNode = "master"
}

// Cordon the node
if err := drain.RunCordonOrUncordon(&nodeShutdownHelper, &node, true); err != nil {
r.logger.LogCtx(ctx, "level", "error", "message", fmt.Sprintf("failed to cordon %s node with error %s", typeOfNode, err))
r.event.Warn(ctx, awsCluster, "CordoningFailed", fmt.Sprintf("failed to cordon %s node %s with error %s", typeOfNode, node.GetName(), err))
} else {

// Log the node as cordoned
r.logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("cordoned %s node", typeOfNode))

// It means the cordoning was successful, proceed with the draining
// The draining function is going to block until the draining is successful
// or a timeout happens (whichever happens first)
if err := drain.RunNodeDrain(&nodeShutdownHelper, nodeName); err != nil {
}

// This means the draining failed
// Log it
r.logger.LogCtx(ctx, "level", "error", "message", fmt.Sprintf("failed to drain %s node with error %s", typeOfNode, err))
r.event.Warn(ctx, awsCluster, "DrainingFailed", fmt.Sprintf("failed to drain %s node %s with error %s", typeOfNode, node.GetName(), err))
// Check if:
// - the node was already being drained
// - we are done with the draining of the specific node
if draining, ok := r.draining[nodeName]; ok {
select {
case drainingError := <-draining:

// log all the pods that could not be evicted or deleted
r.logUnevictedPods(k8sClient, ctx, awsCluster, typeOfNode, &node)
// It means we successfully drained a node
if drainingError == nil {
// Remove the node from the state
r.removeNodeFromState(nodeName)

// now set the timeout condition, which means the aws-operator will proceed to delete the node
r.logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("setting drainer config status of tenant cluster %s node to timeout condition", typeOfNode))
// update the node status to drained and return
return r.updateDrainerStatus(ctx, drainerConfig.Status.NewDrainedCondition(), drainerConfig, k8sClient)
}

drainerConfig.Status.Conditions = append(drainerConfig.Status.Conditions, drainerConfig.Status.NewTimeoutCondition())
// Otherwise we had an error, so set the condition to a timeout
err := r.updateDrainerStatus(ctx, drainerConfig.Status.NewTimeoutCondition(), drainerConfig, k8sClient)

err := r.client.Status().Update(ctx, &drainerConfig)
if err != nil {
return microerror.Mask(err)
// If updating the status of the drainer config succeeded
// then we are done
if err == nil {
r.removeNodeFromState(nodeName)
return nil
}

} else {
// Otherwise try again to drain the node
draining <- drainingError

// if we got here it means we have got no errors and the node is successfully drained
// Set the drainer status in the node so that the aws-operator can proceed with the deletion
// of the node
drainerConfig.Status.Conditions = append(drainerConfig.Status.Conditions, drainerConfig.Status.NewDrainedCondition())
// We need to pick a number here.
// Unfortunately there is no right amount of time to wait for the
// operation to complete. In various tests a value
// between 5 and 10 seconds performed well, so we pick the average and floor it.
case <-time.After(7 * time.Second):
// we want to wait only for a max of N seconds, otherwise continue
}

// Now update the node status
err := r.client.Status().Update(ctx, &drainerConfig)
if err != nil {
return microerror.Mask(err)
}
r.logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("set drainer config status of tenant cluster %s node to drained condition", typeOfNode))
r.event.Info(ctx, awsCluster, "DrainingSucceeded", fmt.Sprintf("drained %s node %s successfully", typeOfNode, node.GetName()))
} else {

// drain async and add the status to the state
// Important: run in a different goroutine
go r.drainNodeAsync(nodeName, typeOfNode, ctx, *awsCluster, nodeShutdownHelper, node, k8sClient, drainerConfig)

}
}
break
}
@@ -217,20 +215,132 @@ func (r *Resource) EnsureCreated(ctx context.Context, obj interface{}) error {
return nil
}

// Removes the node from the shared state
func (r *Resource) removeNodeFromState(nodeName string) {
r.lock.Lock()
delete(r.draining, nodeName)
r.lock.Unlock()
}

// Update the drainer config status
func (r *Resource) updateDrainerStatus(ctx context.Context,
status v1alpha1.DrainerConfigStatusCondition,
drainerConfig v1alpha1.DrainerConfig, k8sClient kubernetes.Interface) error {

// Set the status
drainerConfig.Status.Conditions = append(drainerConfig.Status.Conditions, status)

// Update the CR
return r.client.Status().Update(ctx, &drainerConfig)

}

// Cordons a node in a blocking way
func (r *Resource) cordon(ctx context.Context,
awsCluster infrastructurev1alpha3.AWSCluster,
shutdownHelper drain.Helper,
node v1.Node, typeOfNode string) error {

// Cordon the node
if err := drain.RunCordonOrUncordon(&shutdownHelper, &node, true); err != nil {
r.logger.LogCtx(ctx, "level", "error", "message", fmt.Sprintf("failed to cordon %s node with error %s", typeOfNode, err))
r.event.Warn(ctx, &awsCluster, "CordoningFailed", fmt.Sprintf("failed to cordon %s node %s with error %s", typeOfNode, node.GetName(), err))
return err
}

// Log the node as cordoned
r.logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("cordoned %s node", typeOfNode))
return nil
}

// Shared method for draining a node
func (r *Resource) drainNode(nodeName string,
typeOfNode string,
ctx context.Context,
awsCluster infrastructurev1alpha3.AWSCluster,
shutdownHelper drain.Helper,
node v1.Node, k8sClient kubernetes.Interface,
drainerConfig v1alpha1.DrainerConfig) error {

// The draining function is going to block until the draining is successful
// or a timeout happens (whichever happens first)
if err := drain.RunNodeDrain(&shutdownHelper, nodeName); err != nil {

// This means the draining failed
// Log it
r.logger.LogCtx(ctx, "level", "error", "message", fmt.Sprintf("failed to drain %s node with error %s", typeOfNode, err))
r.event.Warn(ctx, &awsCluster, "DrainingFailed", fmt.Sprintf("failed to drain %s node %s with error %s", typeOfNode, node.GetName(), err))

// log all the pods that could not be evicted or deleted
r.logUnevictedPods(k8sClient, ctx, &awsCluster, typeOfNode, &node)

// Return the error
return microerror.Mask(err)

}

// Emit the events that the draining was successful
r.logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("set drainer config status of tenant cluster %s node to drained condition", typeOfNode))
r.event.Info(ctx, &awsCluster, "DrainingSucceeded", fmt.Sprintf("drained %s node %s successfully", typeOfNode, node.GetName()))

return nil

}

// Drain a node
func (r *Resource) drainNodeAsync(
nodeName string,
typeOfNode string,
ctx context.Context,
awsCluster infrastructurev1alpha3.AWSCluster,
shutdownHelper drain.Helper,
node v1.Node, k8sClient kubernetes.Interface,
drainerConfig v1alpha1.DrainerConfig) {

// Cordon the node
if err := r.cordon(ctx, awsCluster, shutdownHelper, node, typeOfNode); err != nil {
// return in case of failure so that we can retry
return
}

// Await channel
// Create a channel with a buffer, so that we don't block
await := make(chan error, 2)

// Add the node to the shared state because it's about to be drained
r.lock.Lock()
r.draining[nodeName] = await
r.lock.Unlock()

// Drain the node now
err := r.drainNode(nodeName, typeOfNode, ctx, awsCluster, shutdownHelper, node, k8sClient, drainerConfig)
await <- microerror.Mask(err)
}

// Checks whether a node is a master node
func nodeIsMaster(node *v1.Node) bool {

for key := range node.Labels {
for key, value := range node.Labels {

// New label
if key == "node-role.kubernetes.io/control-plane" {
return true
}

// Some master nodes seem to have this label
if key == "node.kubernetes.io/master" {
return true
}

// Deprecated label
if key == "node-role.kubernetes.io/master" {
return true
}

if key == "role" && value == "master" {
return true
}

}

return false
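The label checks in nodeIsMaster above can be exercised directly with corev1.Node objects. A hypothetical test-style sketch, not part of this commit, might look like this:

```go
package drainer

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TestNodeIsMaster checks that the current control-plane label, the
// deprecated master labels, and the role=master label are all detected,
// while a plain worker node is not.
func TestNodeIsMaster(t *testing.T) {
	cases := map[string]struct {
		labels   map[string]string
		expected bool
	}{
		"control-plane label":       {map[string]string{"node-role.kubernetes.io/control-plane": ""}, true},
		"node.kubernetes.io/master": {map[string]string{"node.kubernetes.io/master": ""}, true},
		"deprecated master label":   {map[string]string{"node-role.kubernetes.io/master": ""}, true},
		"role=master label":         {map[string]string{"role": "master"}, true},
		"plain worker":              {map[string]string{"node-role.kubernetes.io/worker": ""}, false},
	}

	for name, tc := range cases {
		node := v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: tc.labels}}
		if got := nodeIsMaster(&node); got != tc.expected {
			t.Errorf("%s: expected %v, got %v", name, tc.expected, got)
		}
	}
}
```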
8 changes: 6 additions & 2 deletions service/controller/resource/drainer/delete.go
@@ -59,8 +59,12 @@ func (r *Resource) EnsureDeleted(ctx context.Context, obj interface{}) error {
{
r.logger.LogCtx(ctx, "level", "debug", "message", "deleting tenant cluster node from Kubernetes API")

n := key.NodeNameFromDrainerConfig(drainerConfig)
err := k8sClient.CoreV1().Nodes().Delete(ctx, n, metav1.DeleteOptions{})
nodeName := key.NodeNameFromDrainerConfig(drainerConfig)

// make sure the entry in the state is removed
r.removeNodeFromState(nodeName)

err := k8sClient.CoreV1().Nodes().Delete(ctx, nodeName, metav1.DeleteOptions{})
if tenant.IsAPINotAvailable(err) {
r.logger.LogCtx(ctx, "level", "debug", "message", "did not delete tenant cluster node from Kubernetes API")
r.logger.LogCtx(ctx, "level", "debug", "message", "tenant cluster API is not available")
9 changes: 9 additions & 0 deletions service/controller/resource/drainer/resource.go
@@ -1,6 +1,8 @@
package drainer

import (
"sync"

"github.com/giantswarm/microerror"
"github.com/giantswarm/micrologger"
"github.com/giantswarm/tenantcluster/v5/pkg/tenantcluster"
@@ -24,11 +26,16 @@ type Config struct {
TenantCluster tenantcluster.Interface
}

type NodeName = string

type Resource struct {
client client.Client
event event.Interface
logger micrologger.Logger
tenantCluster tenantcluster.Interface

lock sync.RWMutex
draining map[NodeName]chan error
}

func New(c Config) (*Resource, error) {
@@ -47,6 +54,8 @@ func New(c Config) (*Resource, error) {
event: c.Event,
logger: c.Logger,
tenantCluster: c.TenantCluster,
lock: sync.RWMutex{},
draining: make(map[string]chan error),
}

return r, nil
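The draining map and lock fields added above back the per-node channels used in create.go. Each channel is created with a buffer of 2, which appears to be sized so that the drain goroutine's single send and the reconciler's optional send-back of a failed drain error can both complete without blocking, even if nothing is receiving at that moment. A tiny illustration of that property, not from the commit:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	// A buffer of 2 lets both writers complete immediately even when the
	// reconciler is not currently waiting on the channel.
	await := make(chan error, 2)

	await <- errors.New("drain failed")           // sent by the drain goroutine
	await <- errors.New("retry on the next pass") // pushed back by the reconciler

	fmt.Println(<-await) // drain failed
	fmt.Println(<-await) // retry on the next pass
}
```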
