From 1319361582f41ffa20e5c31cea8f1743c130d312 Mon Sep 17 00:00:00 2001 From: Daniel Sinai Date: Thu, 27 Jun 2024 17:07:56 +0300 Subject: [PATCH] fix fatal both in client and in unstructured (#73) * fix fatal both in client and in unstructured * enabling injecting portClient * fix: remove mutex * drill down application config * fix usage of fixtures in tests * using a more performent deep copy * using centric config * chore: cr fixes --- main.go | 13 +---- pkg/crd/crd_test.go | 20 ++----- pkg/defaults/defaults_test.go | 7 +-- pkg/event_handler/event_listener_factory.go | 1 + pkg/event_handler/polling/polling_test.go | 14 ++--- pkg/handlers/controllers.go | 6 +- pkg/jq/parser.go | 6 -- pkg/k8s/controller.go | 15 +++-- pkg/k8s/controller_test.go | 65 ++++++++++++--------- pkg/port/cli/client.go | 13 ++++- 10 files changed, 75 insertions(+), 85 deletions(-) diff --git a/main.go b/main.go index c98aec0..82cf83c 100644 --- a/main.go +++ b/main.go @@ -25,9 +25,6 @@ func initiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portCli } - cli.WithDeleteDependents(i.Config.DeleteDependents)(portClient) - cli.WithCreateMissingRelatedEntities(i.Config.CreateMissingRelatedEntities)(portClient) - newHandler := handlers.NewControllersHandler(exporterConfig, i.Config, k8sClient, portClient) newHandler.Handle() @@ -50,15 +47,7 @@ func main() { if err != nil { klog.Fatalf("Error building K8s client: %s", err.Error()) } - - portClient, err := cli.New(config.ApplicationConfig.PortBaseURL, - cli.WithClientID(config.ApplicationConfig.PortClientId), cli.WithClientSecret(config.ApplicationConfig.PortClientSecret), - cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/^0.3.4 (statekey/%s)", applicationConfig.StateKey)), - ) - - if err != nil { - klog.Fatalf("Error building Port client: %s", err.Error()) - } + portClient := cli.New(config.ApplicationConfig) if err := defaults.InitIntegration(portClient, applicationConfig); err != nil { klog.Fatalf("Error initializing Port integration: %s", err.Error()) diff --git a/pkg/crd/crd_test.go b/pkg/crd/crd_test.go index 93c1c42..7c63150 100644 --- a/pkg/crd/crd_test.go +++ b/pkg/crd/crd_test.go @@ -26,7 +26,7 @@ func deleteDefaultResources(portClient *cli.PortClient) { _ = blueprint.DeleteBlueprint(portClient, "testkind") } -func newFixture(t *testing.T, portClientId string, portClientSecret string, userAgent string, namespaced bool, crdsDiscoveryPattern string) *Fixture { +func newFixture(t *testing.T, userAgent string, namespaced bool, crdsDiscoveryPattern string) *Fixture { apiExtensionsFakeClient := fakeapiextensionsv1.FakeApiextensionsV1{Fake: &clienttesting.Fake{}} apiExtensionsFakeClient.AddReactor("list", "customresourcedefinitions", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) { @@ -100,22 +100,12 @@ func newFixture(t *testing.T, portClientId string, portClientSecret string, user return true, fakeCrd, nil }) - if portClientId == "" { - portClientId = config.ApplicationConfig.PortClientId - } - if portClientSecret == "" { - portClientSecret = config.ApplicationConfig.PortClientSecret - } if userAgent == "" { userAgent = "port-k8s-exporter/0.1" } - portClient, err := cli.New(config.ApplicationConfig.PortBaseURL, cli.WithHeader("User-Agent", userAgent), - cli.WithClientID(portClientId), cli.WithClientSecret(portClientSecret)) + portClient := cli.New(config.ApplicationConfig) deleteDefaultResources(portClient) - if err != nil { - t.Errorf("Error building Port client: %s", err.Error()) - } return &Fixture{ t: t, @@ -271,7 +261,7 @@ func checkBlueprintAndActionsProperties(t *testing.T, f *Fixture, namespaced boo } func TestCRD_crd_autoDiscoverCRDsToActionsClusterScoped(t *testing.T) { - f := newFixture(t, "", "", "", false, "true") + f := newFixture(t, "", false, "true") AutodiscoverCRDsToActions(f.portConfig, f.apiextensionClient, f.portClient) @@ -281,7 +271,7 @@ func TestCRD_crd_autoDiscoverCRDsToActionsClusterScoped(t *testing.T) { } func TestCRD_crd_autoDiscoverCRDsToActionsNamespaced(t *testing.T) { - f := newFixture(t, "", "", "", true, "true") + f := newFixture(t, "", true, "true") AutodiscoverCRDsToActions(f.portConfig, f.apiextensionClient, f.portClient) @@ -291,7 +281,7 @@ func TestCRD_crd_autoDiscoverCRDsToActionsNamespaced(t *testing.T) { } func TestCRD_crd_autoDiscoverCRDsToActionsNoCRDs(t *testing.T) { - f := newFixture(t, "", "", "", false, "false") + f := newFixture(t, "", false, "false") AutodiscoverCRDsToActions(f.portConfig, f.apiextensionClient, f.portClient) diff --git a/pkg/defaults/defaults_test.go b/pkg/defaults/defaults_test.go index 2987212..94cfb92 100644 --- a/pkg/defaults/defaults_test.go +++ b/pkg/defaults/defaults_test.go @@ -1,7 +1,6 @@ package defaults import ( - "fmt" "testing" guuid "github.com/google/uuid" @@ -23,11 +22,7 @@ type Fixture struct { func NewFixture(t *testing.T) *Fixture { stateKey := guuid.NewString() - portClient, err := cli.New(config.ApplicationConfig.PortBaseURL, cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", stateKey)), - cli.WithClientID(config.ApplicationConfig.PortClientId), cli.WithClientSecret(config.ApplicationConfig.PortClientSecret)) - if err != nil { - t.Errorf("Error building Port client: %s", err.Error()) - } + portClient := cli.New(config.ApplicationConfig) deleteDefaultResources(portClient, stateKey) return &Fixture{ diff --git a/pkg/event_handler/event_listener_factory.go b/pkg/event_handler/event_listener_factory.go index e84fdb1..05a3a7e 100644 --- a/pkg/event_handler/event_listener_factory.go +++ b/pkg/event_handler/event_listener_factory.go @@ -2,6 +2,7 @@ package event_handler import ( "fmt" + "github.com/port-labs/port-k8s-exporter/pkg/event_handler/consumer" "github.com/port-labs/port-k8s-exporter/pkg/event_handler/polling" "github.com/port-labs/port-k8s-exporter/pkg/port/cli" diff --git a/pkg/event_handler/polling/polling_test.go b/pkg/event_handler/polling/polling_test.go index 468bfd8..7f0fdf6 100644 --- a/pkg/event_handler/polling/polling_test.go +++ b/pkg/event_handler/polling/polling_test.go @@ -1,17 +1,17 @@ package polling import ( - "fmt" _ "github.com/port-labs/port-k8s-exporter/test_utils" + "testing" + "time" + guuid "github.com/google/uuid" "github.com/port-labs/port-k8s-exporter/pkg/config" "github.com/port-labs/port-k8s-exporter/pkg/port" "github.com/port-labs/port-k8s-exporter/pkg/port/cli" "github.com/port-labs/port-k8s-exporter/pkg/port/integration" "github.com/stretchr/testify/assert" - "testing" - "time" ) type Fixture struct { @@ -31,14 +31,10 @@ func (m *MockTicker) GetC() <-chan time.Time { func NewFixture(t *testing.T, c chan time.Time) *Fixture { stateKey := guuid.NewString() - portClient, err := cli.New(config.ApplicationConfig.PortBaseURL, cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", stateKey)), - cli.WithClientID(config.ApplicationConfig.PortClientId), cli.WithClientSecret(config.ApplicationConfig.PortClientSecret)) - if err != nil { - t.Errorf("Error building Port client: %s", err.Error()) - } + portClient := cli.New(config.ApplicationConfig) _ = integration.DeleteIntegration(portClient, stateKey) - err = integration.CreateIntegration(portClient, stateKey, "", &port.IntegrationAppConfig{ + err := integration.CreateIntegration(portClient, stateKey, "", &port.IntegrationAppConfig{ Resources: []port.Resource{}, }) if err != nil { diff --git a/pkg/handlers/controllers.go b/pkg/handlers/controllers.go index a816888..f7ef18a 100644 --- a/pkg/handlers/controllers.go +++ b/pkg/handlers/controllers.go @@ -2,9 +2,11 @@ package handlers import ( "context" - "github.com/port-labs/port-k8s-exporter/pkg/port/integration" "time" + "github.com/port-labs/port-k8s-exporter/pkg/config" + "github.com/port-labs/port-k8s-exporter/pkg/port/integration" + "github.com/port-labs/port-k8s-exporter/pkg/crd" "github.com/port-labs/port-k8s-exporter/pkg/goutils" "github.com/port-labs/port-k8s-exporter/pkg/k8s" @@ -51,7 +53,7 @@ func NewControllersHandler(exporterConfig *port.Config, portConfig *port.Integra } informer := informersFactory.ForResource(gvr) - controller := k8s.NewController(port.AggregatedResource{Kind: kind, KindConfigs: kindConfigs}, portClient, informer, portConfig) + controller := k8s.NewController(port.AggregatedResource{Kind: kind, KindConfigs: kindConfigs}, informer, portConfig, config.ApplicationConfig) controllers = append(controllers, controller) } diff --git a/pkg/jq/parser.go b/pkg/jq/parser.go index c329dfb..3edfa51 100644 --- a/pkg/jq/parser.go +++ b/pkg/jq/parser.go @@ -5,15 +5,12 @@ import ( "os" "reflect" "strings" - "sync" "github.com/itchyny/gojq" "github.com/port-labs/port-k8s-exporter/pkg/goutils" "k8s.io/klog/v2" ) -var mutex = &sync.Mutex{} - func runJQQuery(jqQuery string, obj interface{}) (interface{}, error) { query, err := gojq.Parse(jqQuery) if err != nil { @@ -30,10 +27,7 @@ func runJQQuery(jqQuery string, obj interface{}) (interface{}, error) { klog.Warningf("failed to compile jq query: %s", jqQuery) return nil, err } - - mutex.Lock() queryRes, ok := code.Run(obj).Next() - mutex.Unlock() if !ok { return nil, fmt.Errorf("query should return at least one value") diff --git a/pkg/k8s/controller.go b/pkg/k8s/controller.go index 83dc022..92d8b63 100644 --- a/pkg/k8s/controller.go +++ b/pkg/k8s/controller.go @@ -5,19 +5,21 @@ import ( "fmt" "time" + "hash/fnv" + "strconv" + "github.com/port-labs/port-k8s-exporter/pkg/config" "github.com/port-labs/port-k8s-exporter/pkg/jq" "github.com/port-labs/port-k8s-exporter/pkg/port" "github.com/port-labs/port-k8s-exporter/pkg/port/cli" "github.com/port-labs/port-k8s-exporter/pkg/port/mapping" - "hash/fnv" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" - "strconv" "encoding/json" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" @@ -49,7 +51,12 @@ type Controller struct { workqueue workqueue.RateLimitingInterface } -func NewController(resource port.AggregatedResource, portClient *cli.PortClient, informer informers.GenericInformer, integrationConfig *port.IntegrationAppConfig) *Controller { +func NewController(resource port.AggregatedResource, informer informers.GenericInformer, integrationConfig *port.IntegrationAppConfig, applicationConfig *config.ApplicationConfiguration) *Controller { + // We create a new Port client for each controller because the Resty client is not thread-safe. + portClient := cli.New(applicationConfig) + + cli.WithDeleteDependents(integrationConfig.DeleteDependents)(portClient) + cli.WithCreateMissingRelatedEntities(integrationConfig.CreateMissingRelatedEntities)(portClient) controller := &Controller{ Resource: resource, portClient: portClient, @@ -247,7 +254,7 @@ func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector, return nil, nil, fmt.Errorf("error casting to unstructured") } var structuredObj interface{} - err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.Object, &structuredObj) + err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.DeepCopy().Object, &structuredObj) if err != nil { return nil, nil, fmt.Errorf("error converting from unstructured: %v", err) } diff --git a/pkg/k8s/controller_test.go b/pkg/k8s/controller_test.go index 4f56af9..a4cdf74 100644 --- a/pkg/k8s/controller_test.go +++ b/pkg/k8s/controller_test.go @@ -3,16 +3,16 @@ package k8s import ( "context" "fmt" - "github.com/port-labs/port-k8s-exporter/pkg/jq" - "github.com/stretchr/testify/assert" "reflect" "strings" "testing" "time" + "github.com/port-labs/port-k8s-exporter/pkg/jq" + "github.com/stretchr/testify/assert" + "github.com/port-labs/port-k8s-exporter/pkg/config" "github.com/port-labs/port-k8s-exporter/pkg/port" - "github.com/port-labs/port-k8s-exporter/pkg/port/cli" _ "github.com/port-labs/port-k8s-exporter/test_utils" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -21,8 +21,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/tools/cache" ) @@ -39,7 +38,7 @@ type fixture struct { type fixtureConfig struct { portClientId string portClientSecret string - userAgent string + stateKey string sendRawDataExamples *bool resource port.Resource objects []runtime.Object @@ -60,25 +59,35 @@ func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { } kubeclient := k8sfake.NewSimpleDynamicClient(runtime.NewScheme()) - if fixtureConfig.portClientId == "" { - fixtureConfig.portClientId = config.ApplicationConfig.PortClientId + newConfig := &config.ApplicationConfiguration{ + ConfigFilePath: config.ApplicationConfig.ConfigFilePath, + ResyncInterval: config.ApplicationConfig.ResyncInterval, + PortBaseURL: config.ApplicationConfig.PortBaseURL, + EventListenerType: config.ApplicationConfig.EventListenerType, + CreateDefaultResources: config.ApplicationConfig.CreateDefaultResources, + OverwriteConfigurationOnRestart: config.ApplicationConfig.OverwriteConfigurationOnRestart, + Resources: config.ApplicationConfig.Resources, + DeleteDependents: config.ApplicationConfig.DeleteDependents, + CreateMissingRelatedEntities: config.ApplicationConfig.CreateMissingRelatedEntities, + UpdateEntityOnlyOnDiff: config.ApplicationConfig.UpdateEntityOnlyOnDiff, + PortClientId: config.ApplicationConfig.PortClientId, + PortClientSecret: config.ApplicationConfig.PortClientSecret, + StateKey: config.ApplicationConfig.StateKey, } - if fixtureConfig.portClientSecret == "" { - fixtureConfig.portClientSecret = config.ApplicationConfig.PortClientSecret + + if fixtureConfig.portClientId != "" { + newConfig.PortClientId = fixtureConfig.portClientId } - if fixtureConfig.userAgent == "" { - fixtureConfig.userAgent = "port-k8s-exporter/0.1" + if fixtureConfig.portClientSecret != "" { + newConfig.PortClientSecret = fixtureConfig.portClientSecret } - - portClient, err := cli.New(config.ApplicationConfig.PortBaseURL, cli.WithHeader("User-Agent", fixtureConfig.userAgent), - cli.WithClientID(fixtureConfig.portClientId), cli.WithClientSecret(fixtureConfig.portClientSecret)) - if err != nil { - t.Errorf("Error building Port client: %s", err.Error()) + if fixtureConfig.stateKey != "" { + newConfig.StateKey = fixtureConfig.stateKey } return &fixture{ t: t, - controller: newController(fixtureConfig.resource, fixtureConfig.objects, portClient, kubeclient, interationConfig), + controller: newController(fixtureConfig.resource, fixtureConfig.objects, kubeclient, interationConfig, newConfig), } } @@ -101,16 +110,16 @@ func newDeployment() *appsv1.Deployment { "app": "port-k8s-exporter", } return &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ + ObjectMeta: v1.ObjectMeta{ Name: "port-k8s-exporter", Namespace: "port-k8s-exporter", }, Spec: appsv1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ + Selector: &v1.LabelSelector{ MatchLabels: labels, }, Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ + ObjectMeta: v1.ObjectMeta{ Labels: labels, }, Spec: corev1.PodSpec{ @@ -132,7 +141,7 @@ func newDeploymentWithCustomLabels(generation int64, labels map[string]string, ) *appsv1.Deployment { return &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ + ObjectMeta: v1.ObjectMeta{ Name: "port-k8s-exporter", Namespace: "port-k8s-exporter", GenerateName: generateName, @@ -140,11 +149,11 @@ func newDeploymentWithCustomLabels(generation int64, CreationTimestamp: creationTimestamp, }, Spec: appsv1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ + Selector: &v1.LabelSelector{ MatchLabels: labels, }, Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ + ObjectMeta: v1.ObjectMeta{ Labels: labels, }, Spec: corev1.PodSpec{ @@ -168,13 +177,13 @@ func newUnstructured(obj interface{}) *unstructured.Unstructured { return &unstructured.Unstructured{Object: res} } -func newController(resource port.Resource, objects []runtime.Object, portClient *cli.PortClient, kubeclient *k8sfake.FakeDynamicClient, integrationConfig *port.IntegrationAppConfig) *Controller { +func newController(resource port.Resource, objects []runtime.Object, kubeclient *k8sfake.FakeDynamicClient, integrationConfig *port.IntegrationAppConfig, applicationConfig *config.ApplicationConfiguration) *Controller { k8sI := dynamicinformer.NewDynamicSharedInformerFactory(kubeclient, noResyncPeriodFunc()) s := strings.SplitN(resource.Kind, "/", 3) gvr := schema.GroupVersionResource{Group: s[0], Version: s[1], Resource: s[2]} informer := k8sI.ForResource(gvr) kindConfig := port.KindConfig{Selector: resource.Selector, Port: resource.Port} - c := NewController(port.AggregatedResource{Kind: resource.Kind, KindConfigs: []port.KindConfig{kindConfig}}, portClient, informer, integrationConfig) + c := NewController(port.AggregatedResource{Kind: resource.Kind, KindConfigs: []port.KindConfig{kindConfig}}, informer, integrationConfig, applicationConfig) for _, d := range objects { informer.Informer().GetIndexer().Add(d) @@ -366,7 +375,7 @@ func TestDeleteDeploymentSameOwner(t *testing.T) { createItem := EventItem{Key: getKey(d, t), ActionType: CreateAction} item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} - f := newFixture(t, &fixtureConfig{userAgent: fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", config.ApplicationConfig.StateKey), resource: resource, objects: objects}) + f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, objects: objects}) f.runControllerSyncHandler(createItem, false) f.runControllerSyncHandler(item, false) @@ -388,7 +397,7 @@ func TestDeleteDeploymentDifferentOwner(t *testing.T) { createItem := EventItem{Key: getKey(d, t), ActionType: CreateAction} item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} - f := newFixture(t, &fixtureConfig{userAgent: fmt.Sprintf("statekey/%s", "non_exist_statekey") + "port-k8s-exporter", resource: resource, objects: objects}) + f := newFixture(t, &fixtureConfig{stateKey: "non_exist_statekey", resource: resource, objects: objects}) f.runControllerSyncHandler(createItem, false) f.runControllerSyncHandler(item, false) diff --git a/pkg/port/cli/client.go b/pkg/port/cli/client.go index ea98e70..2e2e578 100644 --- a/pkg/port/cli/client.go +++ b/pkg/port/cli/client.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/go-resty/resty/v2" + "github.com/port-labs/port-k8s-exporter/pkg/config" "github.com/port-labs/port-k8s-exporter/pkg/port" ) @@ -21,10 +22,10 @@ type ( } ) -func New(baseURL string, opts ...Option) (*PortClient, error) { +func New(applicationConfig *config.ApplicationConfiguration, opts ...Option) *PortClient { c := &PortClient{ Client: resty.New(). - SetBaseURL(baseURL). + SetBaseURL(applicationConfig.PortBaseURL). SetRetryCount(5). SetRetryWaitTime(300). // retry when create permission fails because scopes are created async-ly and sometimes (mainly in tests) the scope doesn't exist yet. @@ -40,10 +41,16 @@ func New(baseURL string, opts ...Option) (*PortClient, error) { return err != nil || b["ok"] != true }), } + + WithClientID(applicationConfig.PortClientId)(c) + WithClientSecret(applicationConfig.PortClientSecret)(c) + WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/^0.3.4 (statekey/%s)", applicationConfig.StateKey))(c) + for _, opt := range opts { opt(c) } - return c, nil + + return c } func (c *PortClient) Authenticate(ctx context.Context, clientID, clientSecret string) (string, error) {