Skip to content

Commit

Permalink
Improve CCT error logging
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Büringer [email protected]
  • Loading branch information
sbueringer committed Jul 3, 2024
1 parent fd94039 commit beeb2f4
Showing 1 changed file with 33 additions and 32 deletions.
65 changes: 33 additions & 32 deletions controllers/remote/cluster_cache_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) {
accessor, err := t.getClusterAccessor(ctx, cluster)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to get client")
}

return accessor.client, nil
Expand All @@ -198,7 +198,7 @@ func (t *ClusterCacheTracker) GetReader(ctx context.Context, cluster client.Obje
func (t *ClusterCacheTracker) GetRESTConfig(ctc context.Context, cluster client.ObjectKey) (*rest.Config, error) {
accessor, err := t.getClusterAccessor(ctc, cluster)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to get REST config")
}

return accessor.config, nil
Expand All @@ -208,7 +208,7 @@ func (t *ClusterCacheTracker) GetRESTConfig(ctc context.Context, cluster client.
func (t *ClusterCacheTracker) GetEtcdClientCertificateKey(ctx context.Context, cluster client.ObjectKey) (*rsa.PrivateKey, error) {
accessor, err := t.getClusterAccessor(ctx, cluster)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to get etcd client certificate key")
}

return accessor.etcdClientCertificateKey, nil
Expand Down Expand Up @@ -267,7 +267,7 @@ func (t *ClusterCacheTracker) getClusterAccessor(ctx context.Context, cluster cl
// for the cluster at the same time.
// Return an error if another go routine already tries to create a clusterAccessor.
if ok := t.clusterLock.TryLock(cluster); !ok {
return nil, errors.Wrapf(ErrClusterLocked, "failed to create cluster accessor: failed to get lock for cluster")
return nil, errors.Wrapf(ErrClusterLocked, "failed to create cluster accessor: failed to get lock for cluster (probably because another worker is trying to create the client at the moment)")
}
defer t.clusterLock.Unlock(cluster)

Expand Down Expand Up @@ -305,7 +305,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
}

// Create a http client and a mapper for the cluster.
httpClient, mapper, err := t.createHTTPClientAndMapper(config, cluster)
httpClient, mapper, restClient, err := t.createHTTPClientAndMapper(ctx, config, cluster)
if err != nil {
return nil, errors.Wrapf(err, "error creating http client and mapper for remote cluster %q", cluster.String())
}
Expand Down Expand Up @@ -337,7 +337,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
config.Host = inClusterConfig.Host

// Update the http client and the mapper to use in-cluster config.
httpClient, mapper, err = t.createHTTPClientAndMapper(config, cluster)
httpClient, mapper, restClient, err = t.createHTTPClientAndMapper(ctx, config, cluster)
if err != nil {
return nil, errors.Wrapf(err, "error creating http client and mapper (using in-cluster config) for remote cluster %q", cluster.String())
}
Expand All @@ -348,7 +348,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
}

// Create a client and a cache for the cluster.
cachedClient, err := t.createCachedClient(ctx, config, cluster, httpClient, mapper)
cachedClient, err := t.createCachedClient(ctx, config, cluster, httpClient, restClient, mapper)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -397,28 +397,40 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl
}

// createHTTPClientAndMapper creates a http client and a dynamic rest mapper for the given cluster, based on the rest.Config.
func (t *ClusterCacheTracker) createHTTPClientAndMapper(config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, error) {
func (t *ClusterCacheTracker) createHTTPClientAndMapper(ctx context.Context, config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, *rest.RESTClient, error) {
// Create a http client for the cluster.
httpClient, err := rest.HTTPClientFor(config)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
}

// Create a mapper for it
mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
}

// Create a REST client for the cluster (this is later used for health checking as well).
codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
restClientConfig := rest.CopyConfig(config)
restClientConfig.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
restClient, err := rest.UnversionedRESTClientForConfigAndClient(restClientConfig, httpClient)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating REST client", cluster.String())
}

// Note: This checks if the apiserver is up. We do this already here to produce a clearer error message if the cluster is unreachable.
if _, err := restClient.Get().AbsPath("/").Timeout(healthCheckRequestTimeout).DoRaw(ctx); err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: cluster is not reachable", cluster.String())
}

// Verify if we can get a rest mapping from the workload cluster apiserver.
// Note: This also checks if the apiserver is up in general. We do this already here
// to avoid further effort creating a cache and a client and to produce a clearer error message.
_, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
}

return httpClient, mapper, nil
return httpClient, mapper, restClient, nil
}

// createUncachedClient creates an uncached client for the given cluster, based on the rest.Config.
Expand All @@ -442,7 +454,7 @@ type cachedClientOutput struct {
}

// createCachedClient creates a cached client for the given cluster, based on a rest.Config.
func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper) (*cachedClientOutput, error) {
func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, restClient *rest.RESTClient, mapper meta.RESTMapper) (*cachedClientOutput, error) {
// Create the cache for the remote cluster
cacheOptions := cache.Options{
HTTPClient: httpClient,
Expand Down Expand Up @@ -505,7 +517,7 @@ func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *re
go t.healthCheckCluster(cacheCtx, &healthCheckInput{
cluster: cluster,
cfg: config,
httpClient: httpClient,
restClient: restClient,
})

return &cachedClientOutput{
Expand Down Expand Up @@ -568,13 +580,13 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error

accessor, err := t.getClusterAccessor(ctx, input.Cluster)
if err != nil {
return errors.Wrapf(err, "failed to add %s watch on cluster %s", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
return errors.Wrapf(err, "failed to add %T watch on cluster %s", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
}

// We have to lock the cluster, so that the watch is not created multiple times in parallel.
ok := t.clusterLock.TryLock(input.Cluster)
if !ok {
return errors.Wrapf(ErrClusterLocked, "failed to add %T watch on cluster %s: failed to get lock for cluster", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
return errors.Wrapf(ErrClusterLocked, "failed to add %T watch on cluster %s: failed to get lock for cluster (probably because another worker is trying to create the client at the moment)", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
}
defer t.clusterLock.Unlock(input.Cluster)

Expand All @@ -586,7 +598,7 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error

// Need to create the watch
if err := input.Watcher.Watch(source.Kind(accessor.cache, input.Kind, input.EventHandler, input.Predicates...)); err != nil {
return errors.Wrapf(err, "failed to add %s watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
return errors.Wrapf(err, "failed to add %T watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
}

accessor.watches.Insert(input.Name)
Expand All @@ -597,7 +609,7 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
// healthCheckInput provides the input for the healthCheckCluster method.
type healthCheckInput struct {
cluster client.ObjectKey
httpClient *http.Client
restClient *rest.RESTClient
cfg *rest.Config
interval time.Duration
requestTimeout time.Duration
Expand Down Expand Up @@ -630,18 +642,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health

unhealthyCount := 0

// This gets us a client that can make raw http(s) calls to the remote apiserver. We only need to create it once
// and we can reuse it inside the polling loop.
codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
cfg := rest.CopyConfig(in.cfg)
cfg.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
restClient, restClientErr := rest.UnversionedRESTClientForConfigAndClient(cfg, in.httpClient)

runHealthCheckWithThreshold := func(ctx context.Context) (bool, error) {
if restClientErr != nil {
return false, restClientErr
}

cluster := &clusterv1.Cluster{}
if err := t.client.Get(ctx, in.cluster, cluster); err != nil {
if apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -672,7 +673,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health

// An error here means there was either an issue connecting or the API returned an error.
// If no error occurs, reset the unhealthy counter.
_, err := restClient.Get().AbsPath(in.path).Timeout(in.requestTimeout).DoRaw(ctx)
_, err := in.restClient.Get().AbsPath(in.path).Timeout(in.requestTimeout).DoRaw(ctx)
if err != nil {
if apierrors.IsUnauthorized(err) {
// Unauthorized means that the underlying kubeconfig is not authorizing properly anymore, which
Expand Down

0 comments on commit beeb2f4

Please sign in to comment.