Add slog structured logging to main controllers
    Integrated slog logging into the MirrorPeerSecretReconciler, MirrorPeerReconciler, and DRPolicyReconciler controllers.
    Improved error handling and logging for better observability and debugging.
    Replaced klog with slog for consistency across the codebase.

Changes include:

    Added a Logger field to the controller structs so a shared slog.Logger can be injected and used (the pattern is sketched after the change summary below).
    Enhanced logging messages to provide more context, including function names, object names, namespaces, and error details.

Signed-off-by: vbadrina <[email protected]>
vbnrh committed Jun 19, 2024
1 parent acfbd5a commit 33c042b
Showing 13 changed files with 478 additions and 290 deletions.
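The migration follows one pattern throughout: each reconciler gains a Logger *slog.Logger field, and printf-style klog calls become structured key/value records on a scoped child logger. A minimal, self-contained sketch of that pattern follows; the ExampleReconciler type, its fields, and the reconcile arguments are illustrative, not code from this commit.

package main

import (
	"context"
	"log/slog"
	"os"
)

// ExampleReconciler mirrors the commit's approach: the logger is injected
// as a struct field instead of relying on the global klog functions.
type ExampleReconciler struct {
	Logger *slog.Logger
}

func (r *ExampleReconciler) Reconcile(ctx context.Context, name, namespace string) error {
	// Derive a scoped child logger so every record emitted in this method
	// carries the same contextual attributes.
	logger := r.Logger.With("method", "Reconcile", "name", name, "namespace", namespace)

	logger.Info("Starting reconcile")
	// ... reconcile work would go here ...
	logger.Info("Finished reconcile")
	return nil
}

func main() {
	// TextHandler writes human-readable key=value records to stdout,
	// matching the handler constructed in manager.go in this commit.
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	r := &ExampleReconciler{Logger: logger}
	_ = r.Reconcile(context.Background(), "example", "openshift-storage")
}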
275 changes: 151 additions & 124 deletions controllers/common_controller_utils.go

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions controllers/common_controller_utils_test.go
@@ -23,6 +23,7 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
"reflect"
"testing"
@@ -262,12 +263,13 @@ func TestMirrorPeerSecretReconcile(t *testing.T) {
}

fakeClient := getFakeClient(t, mgrScheme)
+fakeLogger := slog.New(slog.NewTextHandler(os.Stdout, nil))
for _, c := range cases {
os.Setenv("POD_NAMESPACE", c.ramenNamespace)
ctx := context.TODO()
-err := createOrUpdateSecretsFromInternalSecret(ctx, fakeClient, fakeS3InternalSecret(t, TestSourceManagedClusterEast), fakeMirrorPeers(c.manageS3))
+err := createOrUpdateSecretsFromInternalSecret(ctx, fakeClient, fakeS3InternalSecret(t, TestSourceManagedClusterEast), fakeMirrorPeers(c.manageS3), fakeLogger)
assert.NoError(t, err)
-err = createOrUpdateSecretsFromInternalSecret(ctx, fakeClient, fakeS3InternalSecret(t, TestDestinationManagedClusterWest), fakeMirrorPeers(c.manageS3))
+err = createOrUpdateSecretsFromInternalSecret(ctx, fakeClient, fakeS3InternalSecret(t, TestDestinationManagedClusterWest), fakeMirrorPeers(c.manageS3), fakeLogger)
assert.NoError(t, err)

if c.ignoreS3Profile {
83 changes: 55 additions & 28 deletions controllers/drpolicy_controller.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"

"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -12,13 +13,11 @@ import (
ramenv1alpha1 "github.com/ramendr/ramen/api/v1alpha1"
multiclusterv1alpha1 "github.com/red-hat-storage/odf-multicluster-orchestrator/api/v1alpha1"
"github.com/red-hat-storage/odf-multicluster-orchestrator/controllers/utils"
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
workv1 "open-cluster-management.io/api/work/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -46,87 +45,103 @@ const (
type DRPolicyReconciler struct {
HubClient client.Client
Scheme *runtime.Scheme
+Logger *slog.Logger
}

// SetupWithManager sets up the controller with the Manager.
func (r *DRPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error {
+r.Logger.Info("Setting up DRPolicyReconciler with manager")
+
dpPredicate := utils.ComposePredicates(predicate.GenerationChangedPredicate{})

return ctrl.NewControllerManagedBy(mgr).
For(&ramenv1alpha1.DRPolicy{}, builder.WithPredicates(dpPredicate)).
Complete(r)
}

func (r *DRPolicyReconciler) getMirrorPeerForClusterSet(ctx context.Context, clusterSet []string) (*multiclusterv1alpha1.MirrorPeer, error) {
+logger := r.Logger.With("method", "getMirrorPeerForClusterSet", "ClusterSet", clusterSet)
+
var mpList multiclusterv1alpha1.MirrorPeerList
err := r.HubClient.List(ctx, &mpList)
if err != nil {
-klog.Error("could not list mirrorpeers on hub")
+logger.Error("Could not list MirrorPeers on hub", "error", err)
return nil, err
}

if len(mpList.Items) == 0 {
-klog.Info("no mirrorpeers found on hub yet")
+logger.Info("No MirrorPeers found on hub yet")
return nil, k8serrors.NewNotFound(schema.GroupResource{Group: multiclusterv1alpha1.GroupVersion.Group, Resource: "MirrorPeer"}, "MirrorPeerList")
}

for _, mp := range mpList.Items {
if (mp.Spec.Items[0].ClusterName == clusterSet[0] && mp.Spec.Items[1].ClusterName == clusterSet[1]) ||
(mp.Spec.Items[1].ClusterName == clusterSet[0] && mp.Spec.Items[0].ClusterName == clusterSet[1]) {
-klog.Infof("found mirrorpeer %q for drpolicy", mp.Name)
+logger.Info("Found MirrorPeer for DRPolicy", "MirrorPeerName", mp.Name)
return &mp, nil
}
}

-klog.Info("could not find any mirrorpeer for drpolicy")
+logger.Info("Could not find any MirrorPeer for DRPolicy")
return nil, k8serrors.NewNotFound(schema.GroupResource{Group: multiclusterv1alpha1.GroupVersion.Group, Resource: "MirrorPeer"}, fmt.Sprintf("ClusterSet-%s-%s", clusterSet[0], clusterSet[1]))
}

func (r *DRPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-klog.Infof("running DRPolicy reconciler on hub cluster")
-// Fetch DRPolicy for given Request
+r.Logger.Info("Running DRPolicy reconciler on hub cluster", "RequestNamespace", req.Namespace, "RequestName", req.Name)
+
+// Fetch DRPolicy for the given request
var drpolicy ramenv1alpha1.DRPolicy
err := r.HubClient.Get(ctx, req.NamespacedName, &drpolicy)
if err != nil {
-if errors.IsNotFound(err) {
-klog.Info("Could not find DRPolicy. Ignoring since object must have been deleted")
+if k8serrors.IsNotFound(err) {
+r.Logger.Info("DRPolicy not found. Ignoring since the object must have been deleted", "RequestNamespace", req.Namespace, "RequestName", req.Name)
return ctrl.Result{}, nil
}
-klog.Error(err, "Failed to get DRPolicy")
+r.Logger.Error("Failed to get DRPolicy", "error", err, "RequestNamespace", req.Namespace, "RequestName", req.Name)
return ctrl.Result{}, err
}

-// find mirrorpeer for clusterset for the storagecluster namespaces
+// Find MirrorPeer for clusterset for the storagecluster namespaces
mirrorPeer, err := r.getMirrorPeerForClusterSet(ctx, drpolicy.Spec.DRClusters)
if err != nil {
+if k8serrors.IsNotFound(err) {
+r.Logger.Info("MirrorPeer not found. Requeuing", "DRClusters", drpolicy.Spec.DRClusters)
+return ctrl.Result{RequeueAfter: time.Second * 10}, nil
+}
-klog.Error("error occurred while trying to fetch MirrorPeer for given DRPolicy")
+r.Logger.Error("Error occurred while trying to fetch MirrorPeer for given DRPolicy", "error", err)
return ctrl.Result{}, err
}

if mirrorPeer.Spec.Type == multiclusterv1alpha1.Async {
clusterFSIDs := make(map[string]string)
-klog.Infof("Fetching clusterFSIDs")
+r.Logger.Info("Fetching cluster FSIDs")
err = r.fetchClusterFSIDs(ctx, mirrorPeer, clusterFSIDs)
if err != nil {
-if errors.IsNotFound(err) {
+if k8serrors.IsNotFound(err) {
+r.Logger.Info("Cluster FSIDs not found, requeuing")
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
-return ctrl.Result{}, fmt.Errorf("an unknown error occured while fetching the cluster fsids, retrying again: %v", err)
+r.Logger.Error("An unknown error occurred while fetching the cluster FSIDs, retrying", "error", err)
+return ctrl.Result{}, fmt.Errorf("an unknown error occurred while fetching the cluster FSIDs, retrying: %v", err)
}

err = r.createOrUpdateManifestWorkForVRC(ctx, mirrorPeer, &drpolicy, clusterFSIDs)
if err != nil {
+r.Logger.Error("Failed to create VolumeReplicationClass via ManifestWork", "error", err)
return ctrl.Result{}, fmt.Errorf("failed to create VolumeReplicationClass via ManifestWork: %v", err)
}
}

+r.Logger.Info("Successfully reconciled DRPolicy", "RequestNamespace", req.Namespace, "RequestName", req.Name)
return ctrl.Result{}, nil
}

func (r *DRPolicyReconciler) createOrUpdateManifestWorkForVRC(ctx context.Context, mp *multiclusterv1alpha1.MirrorPeer, dp *ramenv1alpha1.DRPolicy, clusterFSIDs map[string]string) error {
+logger := r.Logger.With("method", "createOrUpdateManifestWorkForVRC", "DRPolicy", dp.Name, "MirrorPeer", mp.Name)
+
replicationId, err := utils.CreateUniqueReplicationId(clusterFSIDs)
if err != nil {
+logger.Error("Failed to create unique replication ID", "error", err)
return err
}

@@ -144,9 +159,9 @@ func (r *DRPolicyReconciler) createOrUpdateManifestWorkForVRC(ctx context.Contex

switch {
case err == nil:
-klog.Infof("%s already exists. updating...", manifestWorkName)
-case !errors.IsNotFound(err):
-klog.Error(err, "failed to get ManifestWork: %s", manifestWorkName)
+logger.Info("ManifestWork already exists, updating", "ManifestWorkName", manifestWorkName)
+case !k8serrors.IsNotFound(err):
+logger.Error("Failed to get ManifestWork", "ManifestWorkName", manifestWorkName, "error", err)
return err
}

@@ -162,7 +177,8 @@ func (r *DRPolicyReconciler) createOrUpdateManifestWorkForVRC(ctx context.Contex
labels[fmt.Sprintf(RamenLabelTemplate, "maintenancemodes")] = "Failover"
vrc := replicationv1alpha1.VolumeReplicationClass{
TypeMeta: metav1.TypeMeta{
Kind: "VolumeReplicationClass", APIVersion: "replication.storage.openshift.io/v1alpha1",
Kind: "VolumeReplicationClass",
APIVersion: "replication.storage.openshift.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: vrcName,
@@ -179,6 +195,7 @@ func (r *DRPolicyReconciler) createOrUpdateManifestWorkForVRC(ctx context.Contex

objJson, err := json.Marshal(vrc)
if err != nil {
+logger.Error("Failed to marshal VolumeReplicationClass to JSON", "VolumeReplicationClass", vrcName, "error", err)
return fmt.Errorf("failed to marshal %v to JSON, error %w", vrc, err)
}

@@ -210,7 +227,7 @@ func (r *DRPolicyReconciler) createOrUpdateManifestWorkForVRC(ctx context.Contex
mw := workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Name: manifestWorkName,
-Namespace: pr.ClusterName, //target cluster
+Namespace: pr.ClusterName,
OwnerReferences: []metav1.OwnerReference{
{
Kind: dp.Kind,
@@ -221,6 +238,7 @@ func (r *DRPolicyReconciler) createOrUpdateManifestWorkForVRC(ctx context.Contex
},
},
}

_, err = controllerutil.CreateOrUpdate(ctx, r.HubClient, &mw, func() error {
mw.Spec = workv1.ManifestWorkSpec{
Workload: workv1.ManifestsTemplate{
@@ -231,35 +249,44 @@ func (r *DRPolicyReconciler) createOrUpdateManifestWorkForVRC(ctx context.Contex
})

if err != nil {
klog.Error(err, "failed to create/update ManifestWork: %s", manifestWorkName)
logger.Error("Failed to create/update ManifestWork", "ManifestWorkName", manifestWorkName, "error", err)
return err
}

klog.Infof("ManifestWork created for %s", vrcName)
logger.Info("ManifestWork created/updated successfully", "ManifestWorkName", manifestWorkName, "VolumeReplicationClassName", vrcName)
}

return nil
}

func (r *DRPolicyReconciler) fetchClusterFSIDs(ctx context.Context, peer *multiclusterv1alpha1.MirrorPeer, clusterFSIDs map[string]string) error {
logger := r.Logger.With("method", "fetchClusterFSIDs", "MirrorPeer", peer.Name)

for _, pr := range peer.Spec.Items {
rookSecretName := utils.GetSecretNameByPeerRef(pr)
klog.Info("Fetching rook secret ", "Secret Name:", rookSecretName)
logger.Info("Fetching rook secret", "SecretName", rookSecretName, "ClusterName", pr.ClusterName)

hs, err := utils.FetchSecretWithName(ctx, r.HubClient, types.NamespacedName{Name: rookSecretName, Namespace: pr.ClusterName})
if err != nil {
-if errors.IsNotFound(err) {
-klog.Infof("could not find secret %q. will attempt to fetch it again after a delay", rookSecretName)
+if k8serrors.IsNotFound(err) {
+logger.Info("Secret not found, will attempt to fetch again after a delay", "SecretName", rookSecretName, "ClusterName", pr.ClusterName)
return err
}
logger.Error("Failed to fetch rook secret", "SecretName", rookSecretName, "ClusterName", pr.ClusterName, "error", err)
return err
}
klog.Info("Unmarshalling rook secret ", "Secret Name:", rookSecretName)

logger.Info("Unmarshalling rook secret", "SecretName", rookSecretName, "ClusterName", pr.ClusterName)
rt, err := utils.UnmarshalHubSecret(hs)
if err != nil {
klog.Error(err, "Failed to unmarshal rook secret", "Secret", rookSecretName)
logger.Error("Failed to unmarshal rook secret", "SecretName", rookSecretName, "ClusterName", pr.ClusterName, "error", err)
return err
}

clusterFSIDs[pr.ClusterName] = rt.FSID
logger.Info("Successfully fetched FSID for cluster", "ClusterName", pr.ClusterName, "FSID", rt.FSID)
}

logger.Info("Successfully fetched all cluster FSIDs", "MirrorPeer", peer.Name)
return nil
}
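A side note on the r.Logger.With(...) calls above: With returns a child logger whose attributes are prepended to every record it emits, which is what gives each method's log lines a consistent prefix. A short illustrative sketch — the attribute values are made up, not taken from this commit:

package main

import (
	"log/slog"
	"os"
)

func main() {
	base := slog.New(slog.NewTextHandler(os.Stdout, nil))

	// Child logger: "method" and "MirrorPeer" ride along on every record.
	logger := base.With("method", "fetchClusterFSIDs", "MirrorPeer", "example-peer")

	logger.Info("Fetching rook secret", "SecretName", "cluster-east-rook-secret", "ClusterName", "east")
	// Output resembles:
	// time=... level=INFO msg="Fetching rook secret" method=fetchClusterFSIDs MirrorPeer=example-peer SecretName=cluster-east-rook-secret ClusterName=east
}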
3 changes: 3 additions & 0 deletions controllers/drpolicy_controller_test.go
@@ -3,6 +3,8 @@ package controllers
import (
"context"
"fmt"
"log/slog"
"os"
"testing"

ramenv1alpha1 "github.com/ramendr/ramen/api/v1alpha1"
@@ -146,6 +148,7 @@ func getFakeDRPolicyReconciler(drpolicy *ramenv1alpha1.DRPolicy, mp *multicluste
r := DRPolicyReconciler{
HubClient: fakeClient,
Scheme: scheme,
+Logger: slog.New(slog.NewTextHandler(os.Stdout, nil)),
}

return r
5 changes: 5 additions & 0 deletions controllers/manager.go
@@ -3,6 +3,7 @@ package controllers
import (
"context"
"flag"
"log/slog"
"os"

consolev1alpha1 "github.com/openshift/api/console/v1alpha1"
@@ -120,9 +121,11 @@ func (o *ManagerOptions) runManager() {

namespace := os.Getenv("POD_NAMESPACE")

+logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
if err = (&MirrorPeerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
+Logger: logger,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MirrorPeer")
os.Exit(1)
@@ -132,6 +135,7 @@ func (o *ManagerOptions) runManager() {
if err = (&MirrorPeerSecretReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
+Logger: logger,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MirrorPeer")
os.Exit(1)
@@ -224,6 +228,7 @@ func (o *ManagerOptions) runManager() {
if err = (&DRPolicyReconciler{
HubClient: mgr.GetClient(),
Scheme: mgr.GetScheme(),
+Logger: logger,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DRPolicy")
os.Exit(1)
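One design note on the manager.go changes above: a single slog.Logger is built once in runManager and handed to all three reconcilers, so every controller shares one handler and one output stream. If per-controller attribution were ever wanted, the same handler could be kept while each reconciler receives a child logger — a hypothetical variation, not what this commit does:

package main

import (
	"log/slog"
	"os"
)

func main() {
	// One handler for the whole process, as in runManager.
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

	// Hypothetical per-controller child loggers (not part of this commit):
	// each reconciler's records would then carry a "controller" attribute.
	mirrorPeerLogger := logger.With("controller", "MirrorPeer")
	drPolicyLogger := logger.With("controller", "DRPolicy")

	mirrorPeerLogger.Info("example record")
	drPolicyLogger.Info("example record")
}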