Fix reconciliation triggers. (#148)
Gerrit91 authored Apr 3, 2023
1 parent 9498ce5 commit 17d8025
Showing 19 changed files with 427 additions and 375 deletions.
1 change: 0 additions & 1 deletion Makefile
@@ -76,4 +76,3 @@ $(CONTROLLER_GEN): $(LOCALBIN)
setup-envtest: $(ENVTEST)
$(ENVTEST): $(LOCALBIN)
test -s $(LOCALBIN)/setup-envtest || GOBIN=$(LOCALBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest
echo "after env installation"
5 changes: 2 additions & 3 deletions config/rbac/role.yaml
@@ -1,4 +1,3 @@

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
@@ -9,7 +8,7 @@ rules:
- apiGroups:
- metal-stack.io
resources:
- Droptailers
- clusterwidenetworkpolicies
verbs:
- create
- delete
@@ -21,7 +20,7 @@ rules:
- apiGroups:
- metal-stack.io
resources:
- Droptailers/status
- clusterwidenetworkpolicies/status
verbs:
- get
- patch
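Note: this ClusterRole is typically generated by controller-gen from kubebuilder RBAC markers in the controller sources, so the resource change above mirrors the markers that appear further down in this commit. As a sketch (assuming the usual controller-gen workflow), markers of this shape produce the rules shown:

// +kubebuilder:rbac:groups=metal-stack.io,resources=clusterwidenetworkpolicies,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=metal-stack.io,resources=clusterwidenetworkpolicies/status,verbs=get;update;patch

Re-running controller-gen (usually wired up as a make target) keeps config/rbac/role.yaml in sync with these markers.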
80 changes: 34 additions & 46 deletions controllers/clusterwidenetworkpolicy_controller.go
@@ -11,9 +11,8 @@ import (
"github.com/go-logr/logr"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -36,8 +35,6 @@ type ClusterwideNetworkPolicyReconciler struct {

Log logr.Logger

ExternalFirewallTrigger chan event.GenericEvent

Interval time.Duration
DnsProxy *dns.DNSProxy
SkipDNS bool
@@ -49,89 +46,76 @@ func (r *ClusterwideNetworkPolicyReconciler) SetupWithManager(mgr ctrl.Manager)
r.Interval = reconcilationInterval
}

if err := mgr.Add(r.getReconciliationTicker(r.ExternalFirewallTrigger)); err != nil {
scheduleChan := make(chan event.GenericEvent)
if err := mgr.Add(r.getReconciliationTicker(scheduleChan)); err != nil {
return fmt.Errorf("failed to add runnable to manager: %w", err)
}

return ctrl.NewControllerManagedBy(mgr).
For(&firewallv1.ClusterwideNetworkPolicy{}).
Watches(&source.Kind{Type: &corev1.Service{}}, handler.Funcs{
CreateFunc: func(ce event.CreateEvent, rli workqueue.RateLimitingInterface) {
r.Log.Info("requesting firewall reconcile due to service creation in shoot cluster")
r.ExternalFirewallTrigger <- event.GenericEvent{}
},
UpdateFunc: func(ue event.UpdateEvent, rli workqueue.RateLimitingInterface) {
r.Log.Info("requesting firewall reconcile due to service update in shoot cluster")
r.ExternalFirewallTrigger <- event.GenericEvent{}
},
DeleteFunc: func(de event.DeleteEvent, rli workqueue.RateLimitingInterface) {
r.Log.Info("requesting firewall reconcile due to service deletion in shoot cluster")
r.ExternalFirewallTrigger <- event.GenericEvent{}
},
}).
Watches(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForObject{}).
Watches(&source.Channel{Source: scheduleChan}, &handler.EnqueueRequestForObject{}).
Complete(r)
}

// Reconcile ClusterwideNetworkPolicy and creates nftables rules accordingly
// Reconcile ClusterwideNetworkPolicy and creates nftables rules accordingly.
// - services of type load balancer
//
// +kubebuilder:rbac:groups=metal-stack.io,resources=clusterwidenetworkpolicies,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=metal-stack.io,resources=clusterwidenetworkpolicies/status,verbs=get;update;patch
func (r *ClusterwideNetworkPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Log.WithValues("cwnp", req.Name)

func (r *ClusterwideNetworkPolicyReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
var cwnps firewallv1.ClusterwideNetworkPolicyList
if err := r.ShootClient.List(ctx, &cwnps, client.InNamespace(firewallv1.ClusterwideNetworkPolicyNamespace)); err != nil {
return done, err
return ctrl.Result{}, err
}

return r.reconcileRules(ctx, log, cwnps)
}

func (r *ClusterwideNetworkPolicyReconciler) reconcileRules(ctx context.Context, log logr.Logger, cwnps firewallv1.ClusterwideNetworkPolicyList) (ctrl.Result, error) {
var f firewallv2.Firewall
nn := types.NamespacedName{
Name: r.FirewallName,
Namespace: r.SeedNamespace,
f := &firewallv2.Firewall{
ObjectMeta: metav1.ObjectMeta{
Name: r.FirewallName,
Namespace: r.SeedNamespace,
},
}
if err := r.SeedClient.Get(ctx, nn, &f); err != nil {
return done, client.IgnoreNotFound(err)
if err := r.SeedClient.Get(ctx, client.ObjectKeyFromObject(f), f); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

// Set CWNP requeue interval
if interval, err := time.ParseDuration(f.Spec.Interval); err == nil {
r.Interval = interval
} else {
return done, fmt.Errorf("failed to parse Interval field: %w", err)
return ctrl.Result{}, fmt.Errorf("failed to parse Interval field: %w", err)
}

var services corev1.ServiceList
if err := r.ShootClient.List(ctx, &services); err != nil {
return done, err
return ctrl.Result{}, err
}
nftablesFirewall := nftables.NewFirewall(f, &cwnps, &services, r.DnsProxy, log)
if err := r.manageDNSProxy(ctx, log, f, cwnps, nftablesFirewall); err != nil {
return done, err
nftablesFirewall := nftables.NewFirewall(f, &cwnps, &services, r.DnsProxy, r.Log)
if err := r.manageDNSProxy(ctx, f, cwnps, nftablesFirewall); err != nil {
return ctrl.Result{}, err
}
updated, err := nftablesFirewall.Reconcile()
if err != nil {
return done, err
return ctrl.Result{}, err
}

if updated {
for _, i := range cwnps.Items {
o := i
if err := r.ShootClient.Status().Update(ctx, &o); err != nil {
return done, fmt.Errorf("failed to updated CWNP status: %w", err)
return ctrl.Result{}, fmt.Errorf("failed to updated CWNP status: %w", err)
}
}
}

return done, nil
return ctrl.Result{}, nil
}

// manageDNSProxy start DNS proxy if toFQDN rules are present
// if rules were deleted it will stop running DNS proxy
func (r *ClusterwideNetworkPolicyReconciler) manageDNSProxy(
ctx context.Context, log logr.Logger, f firewallv2.Firewall, cwnps firewallv1.ClusterwideNetworkPolicyList, nftablesFirewall *nftables.Firewall,
ctx context.Context, f *firewallv2.Firewall, cwnps firewallv1.ClusterwideNetworkPolicyList, nftablesFirewall *nftables.Firewall,
) (err error) {
// Skipping is needed for testing
if r.SkipDNS {
@@ -145,13 +129,13 @@ func (r *ClusterwideNetworkPolicyReconciler) manageDNSProxy(
}

if enableDNS && r.DnsProxy == nil {
log.Info("DNS Proxy is initialized")
r.Log.Info("DNS Proxy is initialized")
if r.DnsProxy, err = dns.NewDNSProxy(f.Spec.DNSPort, ctrl.Log.WithName("DNS proxy")); err != nil {
return fmt.Errorf("failed to init DNS proxy: %w", err)
}
go r.DnsProxy.Run(ctx)
} else if !enableDNS && r.DnsProxy != nil {
log.Info("DNS Proxy is stopped")
r.Log.Info("DNS Proxy is stopped")
r.DnsProxy.Stop()
r.DnsProxy = nil
}
@@ -166,6 +150,10 @@ func (r *ClusterwideNetworkPolicyReconciler) manageDNSProxy(
return nil
}

// TODO: the interval can change over the lifetime of a firewall resource
// in case the interval has changed nothing happens at the moment
// we need to implement the recreation of the ticker
//
// IMPORTANT!
// We shouldn't implement reconciliation loop by assigning RequeueAfter in result like it's done in Firewall controller.
// Here's the case when it would go bad:
@@ -178,14 +166,14 @@ func (r *ClusterwideNetworkPolicyReconciler) manageDNSProxy(
// 2. DNS Proxy is started by CWNP controller, and it will not be started until some CWNP resource is created/updated/deleted.
func (r *ClusterwideNetworkPolicyReconciler) getReconciliationTicker(scheduleChan chan<- event.GenericEvent) manager.RunnableFunc {
return func(ctx context.Context) error {
e := event.GenericEvent{}
e := event.GenericEvent{Object: &firewallv1.ClusterwideNetworkPolicy{}}
ticker := time.NewTicker(r.Interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
r.Log.Info("requesting firewall reconcile due to reconciliation ticker event")
r.Log.Info("requesting cwnp reconcile due to reconciliation ticker event")
scheduleChan <- e
case <-ctx.Done():
return nil
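Note: the core of this fix is that the reconciliation trigger events are now delivered through a channel source and carry a non-nil Object; handler.EnqueueRequestForObject builds the reconcile request from that object and does not enqueue anything for an empty GenericEvent. A minimal, self-contained sketch of the pattern, assuming the older controller-runtime API used in this diff (Watches taking a source.Source); ExampleReconciler and the ConfigMap placeholder are illustrative, not taken from this repository:

package controllers

import (
	"context"
	"time"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

// ExampleReconciler stands in for a reconciler that should also run
// periodically, not only when a watched object changes.
type ExampleReconciler struct {
	Interval time.Duration
}

// Reconcile ignores the request and rebuilds the desired state from scratch,
// so it does not matter which object the ticker event points at.
func (r *ExampleReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
	return ctrl.Result{}, nil
}

func (r *ExampleReconciler) SetupWithManager(mgr ctrl.Manager) error {
	scheduleChan := make(chan event.GenericEvent)

	// The runnable owns the ticker and pushes an event on every tick; it stops
	// when the manager's context is cancelled.
	ticker := manager.RunnableFunc(func(ctx context.Context) error {
		t := time.NewTicker(r.Interval)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				// The event must carry a non-nil Object: EnqueueRequestForObject
				// derives the request's name/namespace from it and skips events
				// without one.
				scheduleChan <- event.GenericEvent{Object: &corev1.ConfigMap{}}
			case <-ctx.Done():
				return nil
			}
		}
	})
	if err := mgr.Add(ticker); err != nil {
		return err
	}

	return ctrl.NewControllerManagedBy(mgr).
		For(&corev1.ConfigMap{}).
		Watches(&source.Channel{Source: scheduleChan}, &handler.EnqueueRequestForObject{}).
		Complete(r)
}

Because the ticker lives in a manager runnable, periodic reconciles keep firing even when no object events arrive at all, which is exactly the situation the comment about RequeueAfter in the diff above warns about.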
8 changes: 4 additions & 4 deletions controllers/clusterwidenetworkpolicy_validation_controller.go
@@ -28,7 +28,7 @@ type ClusterwideNetworkPolicyValidationReconciler struct {
func (r *ClusterwideNetworkPolicyValidationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var clusterNP firewallv1.ClusterwideNetworkPolicy
if err := r.ShootClient.Get(ctx, req.NamespacedName, &clusterNP); err != nil {
return done, client.IgnoreNotFound(err)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

// if network policy does not belong to the namespace where clusterwide network policies are stored:
@@ -40,7 +40,7 @@ func (r *ClusterwideNetworkPolicyValidationReconciler) Reconcile(ctx context.Con
"Unapplicable",
fmt.Sprintf("cluster wide network policies must be defined in namespace %s otherwise they won't take effect", firewallv1.ClusterwideNetworkPolicyNamespace),
)
return done, nil
return ctrl.Result{}, nil
}

err := clusterNP.Spec.Validate()
@@ -51,10 +51,10 @@ func (r *ClusterwideNetworkPolicyValidationReconciler) Reconcile(ctx context.Con
"Unapplicable",
fmt.Sprintf("cluster wide network policy is not valid: %v", err),
)
return done, nil
return ctrl.Result{}, nil
}

return done, nil
return ctrl.Result{}, nil
}

// SetupWithManager configures this controller to watch for ClusterwideNetworkPolicy CRD