Skip to content

Commit

Permalink
Merge pull request #72 from port-labs/PORT-8710-git-git-hub-bitbucket…
Browse files Browse the repository at this point in the history
…-data-k-8-s-source-add-live-examples

Port 8710 git git hub bitbucket data k 8 s source add live examples
  • Loading branch information
talsabagport authored Jun 24, 2024
2 parents 5f61510 + 0318c82 commit 2b4c0bf
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 80 deletions.
30 changes: 19 additions & 11 deletions pkg/handlers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handlers

import (
"context"
"github.com/port-labs/port-k8s-exporter/pkg/port/integration"
"time"

"github.com/port-labs/port-k8s-exporter/pkg/crd"
Expand Down Expand Up @@ -74,8 +75,24 @@ func (c *ControllersHandler) Handle() {
klog.Fatalf("Error while waiting for informer cache sync: %s", err.Error())
}
}

currentEntitiesSet := make([]map[string]interface{}, 0)
for _, controller := range c.controllers {
controllerEntitiesSet, rawDataExamples, err := controller.GetEntitiesSet()
if err != nil {
klog.Errorf("error getting controller entities set: %s", err.Error())
}
currentEntitiesSet = append(currentEntitiesSet, controllerEntitiesSet)
if len(rawDataExamples) > 0 {
err = integration.PostIntegrationKindExample(c.portClient, c.stateKey, controller.Resource.Kind, rawDataExamples)
if err != nil {
klog.Warningf("failed to post integration kind example: %s", err.Error())
}
}
}

klog.Info("Deleting stale entities")
c.RunDeleteStaleEntities()
c.RunDeleteStaleEntities(currentEntitiesSet)
klog.Info("Starting controllers")
for _, controller := range c.controllers {
controller.Run(1, c.stopCh)
Expand All @@ -91,16 +108,7 @@ func (c *ControllersHandler) Handle() {
}()
}

func (c *ControllersHandler) RunDeleteStaleEntities() {
currentEntitiesSet := make([]map[string]interface{}, 0)
for _, controller := range c.controllers {
controllerEntitiesSet, err := controller.GetEntitiesSet()
if err != nil {
klog.Errorf("error getting controller entities set: %s", err.Error())
}
currentEntitiesSet = append(currentEntitiesSet, controllerEntitiesSet)
}

func (c *ControllersHandler) RunDeleteStaleEntities(currentEntitiesSet []map[string]interface{}) {
_, err := c.portClient.Authenticate(context.Background(), c.portClient.ClientID, c.portClient.ClientSecret)
if err != nil {
klog.Errorf("error authenticating with Port: %v", err)
Expand Down
91 changes: 50 additions & 41 deletions pkg/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
type EventActionType string

const (
CreateAction EventActionType = "create"
UpdateAction EventActionType = "update"
DeleteAction EventActionType = "delete"
MaxNumRequeues int = 4
CreateAction EventActionType = "create"
UpdateAction EventActionType = "update"
DeleteAction EventActionType = "delete"
MaxNumRequeues int = 4
MaxRawDataExamplesToSend = 5
)

type EventItem struct {
Expand All @@ -40,20 +41,22 @@ type EventItem struct {
}

type Controller struct {
resource port.AggregatedResource
portClient *cli.PortClient
informer cache.SharedIndexInformer
lister cache.GenericLister
workqueue workqueue.RateLimitingInterface
Resource port.AggregatedResource
portClient *cli.PortClient
integrationConfig *port.IntegrationAppConfig
informer cache.SharedIndexInformer
lister cache.GenericLister
workqueue workqueue.RateLimitingInterface
}

func NewController(resource port.AggregatedResource, portClient *cli.PortClient, informer informers.GenericInformer, integrationConfig *port.IntegrationAppConfig) *Controller {
controller := &Controller{
resource: resource,
portClient: portClient,
informer: informer.Informer(),
lister: informer.Lister(),
workqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
Resource: resource,
portClient: portClient,
integrationConfig: integrationConfig,
informer: informer.Informer(),
lister: informer.Lister(),
workqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}

controller.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -96,13 +99,13 @@ func NewController(resource port.AggregatedResource, portClient *cli.PortClient,
}

func (c *Controller) Shutdown() {
klog.Infof("Shutting down controller for resource '%s'", c.resource.Kind)
klog.Infof("Shutting down controller for resource '%s'", c.Resource.Kind)
c.workqueue.ShutDown()
klog.Infof("Closed controller for resource '%s'", c.resource.Kind)
klog.Infof("Closed controller for resource '%s'", c.Resource.Kind)
}

func (c *Controller) WaitForCacheSync(stopCh <-chan struct{}) error {
klog.Infof("Waiting for informer cache to sync for resource '%s'", c.resource.Kind)
klog.Infof("Waiting for informer cache to sync for resource '%s'", c.Resource.Kind)
if ok := cache.WaitForCacheSync(stopCh, c.informer.HasSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
Expand All @@ -113,11 +116,11 @@ func (c *Controller) WaitForCacheSync(stopCh <-chan struct{}) error {
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

klog.Infof("Starting workers for resource '%s'", c.resource.Kind)
klog.Infof("Starting workers for resource '%s'", c.Resource.Kind)
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
klog.Infof("Started workers for resource '%s'", c.resource.Kind)
klog.Infof("Started workers for resource '%s'", c.Resource.Kind)
}

func (c *Controller) runWorker() {
Expand All @@ -139,18 +142,18 @@ func (c *Controller) processNextWorkItem() bool {

if !ok {
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected event item of resource '%s' in workqueue but got %#v", c.resource.Kind, obj))
utilruntime.HandleError(fmt.Errorf("expected event item of resource '%s' in workqueue but got %#v", c.Resource.Kind, obj))
return nil
}

if err := c.syncHandler(item); err != nil {
if c.workqueue.NumRequeues(obj) >= MaxNumRequeues {
utilruntime.HandleError(fmt.Errorf("error syncing '%s' of resource '%s': %s, give up after %d requeues", item.Key, c.resource.Kind, err.Error(), MaxNumRequeues))
utilruntime.HandleError(fmt.Errorf("error syncing '%s' of resource '%s': %s, give up after %d requeues", item.Key, c.Resource.Kind, err.Error(), MaxNumRequeues))
return nil
}

c.workqueue.AddRateLimited(obj)
return fmt.Errorf("error syncing '%s' of resource '%s': %s, requeuing", item.Key, c.resource.Kind, err.Error())
return fmt.Errorf("error syncing '%s' of resource '%s': %s, requeuing", item.Key, c.Resource.Kind, err.Error())
}

c.workqueue.Forget(obj)
Expand Down Expand Up @@ -190,8 +193,8 @@ func (c *Controller) objectHandler(obj interface{}, item EventItem) error {
}

errors := make([]error, 0)
for _, kindConfig := range c.resource.KindConfigs {
portEntities, err := c.getObjectEntities(obj, kindConfig.Selector, kindConfig.Port.Entity.Mappings, kindConfig.Port.ItemsToParse)
for _, kindConfig := range c.Resource.KindConfigs {
portEntities, _, err := c.getObjectEntities(obj, kindConfig.Selector, kindConfig.Port.Entity.Mappings, kindConfig.Port.ItemsToParse)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error getting entities for object key '%s': %v", item.Key, err))
continue
Expand Down Expand Up @@ -238,15 +241,15 @@ func mapEntities(obj interface{}, mappings []port.EntityMapping) ([]port.Entity,
return entities, nil
}

func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector, mappings []port.EntityMapping, itemsToParse string) ([]port.Entity, error) {
func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector, mappings []port.EntityMapping, itemsToParse string) ([]port.Entity, []interface{}, error) {
unstructuredObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return nil, fmt.Errorf("error casting to unstructured")
return nil, nil, fmt.Errorf("error casting to unstructured")
}
var structuredObj interface{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.Object, &structuredObj)
if err != nil {
return nil, fmt.Errorf("error converting from unstructured: %v", err)
return nil, nil, fmt.Errorf("error converting from unstructured: %v", err)
}

entities := make([]port.Entity, 0, len(mappings))
Expand All @@ -257,12 +260,12 @@ func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector,
} else {
items, parseItemsError := jq.ParseArray(itemsToParse, structuredObj)
if parseItemsError != nil {
return nil, parseItemsError
return nil, nil, parseItemsError
}

mappedObject, ok := structuredObj.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("error parsing object '%#v'", structuredObj)
return nil, nil, fmt.Errorf("error parsing object '%#v'", structuredObj)
}

for _, item := range items {
Expand All @@ -275,24 +278,28 @@ func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector,
}
}

rawDataExamples := make([]interface{}, 0)
for _, objectToMap := range objectsToMap {
selectorResult, err := isPassSelector(objectToMap, selector)

if err != nil {
return nil, err
return nil, nil, err
}

if selectorResult {
if *c.integrationConfig.SendRawDataExamples && len(rawDataExamples) < MaxRawDataExamplesToSend {
rawDataExamples = append(rawDataExamples, objectToMap)
}
currentEntities, err := mapEntities(objectToMap, mappings)
if err != nil {
return nil, err
return nil, nil, err
}

entities = append(entities, currentEntities...)
}
}

return entities, nil
return entities, rawDataExamples, nil
}

func checkIfOwnEntity(entity port.Entity, portClient *cli.PortClient) (*bool, error) {
Expand Down Expand Up @@ -362,33 +369,35 @@ func (c *Controller) entityHandler(portEntity port.Entity, action EventActionTyp
return nil
}

func (c *Controller) GetEntitiesSet() (map[string]interface{}, error) {
func (c *Controller) GetEntitiesSet() (map[string]interface{}, []interface{}, error) {
k8sEntitiesSet := map[string]interface{}{}
objects, err := c.lister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("error listing K8s objects of resource '%s': %v", c.resource.Kind, err)
return nil, nil, fmt.Errorf("error listing K8s objects of resource '%s': %v", c.Resource.Kind, err)
}

rawDataExamples := make([]interface{}, 0)
for _, obj := range objects {
for _, kindConfig := range c.resource.KindConfigs {
for _, kindConfig := range c.Resource.KindConfigs {
mappings := make([]port.EntityMapping, 0, len(kindConfig.Port.Entity.Mappings))
for _, m := range kindConfig.Port.Entity.Mappings {
mappings = append(mappings, port.EntityMapping{
Identifier: m.Identifier,
Blueprint: m.Blueprint,
})
}
entities, err := c.getObjectEntities(obj, kindConfig.Selector, mappings, kindConfig.Port.ItemsToParse)
entities, examples, err := c.getObjectEntities(obj, kindConfig.Selector, mappings, kindConfig.Port.ItemsToParse)
if err != nil {
return nil, fmt.Errorf("error getting entities of object: %v", err)
return nil, nil, fmt.Errorf("error getting entities of object: %v", err)
}
for _, entity := range entities {
k8sEntitiesSet[c.portClient.GetEntityIdentifierKey(&entity)] = nil
}
rawDataExamples = append(rawDataExamples, examples[:min(len(examples), MaxRawDataExamplesToSend-len(rawDataExamples))]...)
}
}

return k8sEntitiesSet, nil
return k8sEntitiesSet, rawDataExamples, nil
}

func hashAllEntities(entities []port.Entity) (string, error) {
Expand All @@ -411,13 +420,13 @@ func (c *Controller) shouldSendUpdateEvent(old interface{}, new interface{}, upd
if updateEntityOnlyOnDiff == false {
return true
}
for _, kindConfig := range c.resource.KindConfigs {
oldEntities, err := c.getObjectEntities(old, kindConfig.Selector, kindConfig.Port.Entity.Mappings, kindConfig.Port.ItemsToParse)
for _, kindConfig := range c.Resource.KindConfigs {
oldEntities, _, err := c.getObjectEntities(old, kindConfig.Selector, kindConfig.Port.Entity.Mappings, kindConfig.Port.ItemsToParse)
if err != nil {
klog.Errorf("Error getting old entities: %v", err)
return true
}
newEntities, err := c.getObjectEntities(new, kindConfig.Selector, kindConfig.Port.Entity.Mappings, kindConfig.Port.ItemsToParse)
newEntities, _, err := c.getObjectEntities(new, kindConfig.Selector, kindConfig.Port.Entity.Mappings, kindConfig.Port.ItemsToParse)
if err != nil {
klog.Errorf("Error getting new entities: %v", err)
return true
Expand Down
Loading

0 comments on commit 2b4c0bf

Please sign in to comment.