Skip to content

Commit

Permalink
Enable multiple installations by not failing if all existing default …
Browse files Browse the repository at this point in the history
…resources exists (#89)

Enable multiple installations by not failing if some of the default blueprints already exists
  • Loading branch information
erikzaadi authored Jan 16, 2025
1 parent 36cdff4 commit 9f85629
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 128 deletions.
130 changes: 33 additions & 97 deletions pkg/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package defaults

import (
"encoding/json"
"fmt"
"os"
"sync"

"github.com/port-labs/port-k8s-exporter/pkg/port"
"github.com/port-labs/port-k8s-exporter/pkg/port/blueprint"
"github.com/port-labs/port-k8s-exporter/pkg/port/cli"
"github.com/port-labs/port-k8s-exporter/pkg/port/integration"
"github.com/port-labs/port-k8s-exporter/pkg/port/page"
"github.com/port-labs/port-k8s-exporter/pkg/port/scorecards"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -132,81 +130,48 @@ func (e *AbortDefaultCreationError) Error() string {
return "AbortDefaultCreationError"
}

func validateResourcesErrors(createdBlueprints []string, createdPages []string, resourceErrors []error) *AbortDefaultCreationError {
if len(resourceErrors) > 0 {
for _, err := range resourceErrors {
klog.Infof("Failed to create resources: %v.", err.Error())
}
return &AbortDefaultCreationError{BlueprintsToRollback: createdBlueprints, PagesToRollback: createdPages, Errors: resourceErrors}
}
return nil
}

func validateResourcesDoesNotExist(portClient *cli.PortClient, defaults *Defaults, config *port.Config) *AbortDefaultCreationError {
var errors []error
if _, err := integration.GetIntegration(portClient, config.StateKey); err == nil {
klog.Warningf("Integration with state key %s already exists", config.StateKey)
return &AbortDefaultCreationError{Errors: []error{
fmt.Errorf("integration with state key %s already exists", config.StateKey),
}}
}

func createResources(portClient *cli.PortClient, defaults *Defaults) error {
existingBlueprints := []string{}
for _, bp := range defaults.Blueprints {
if _, err := blueprint.GetBlueprint(portClient, bp.Identifier); err == nil {
klog.Warningf("Blueprint with identifier %s already exists", bp.Identifier)
errors = append(errors, fmt.Errorf("blueprint with identifier %s already exists", bp.Identifier))
existingBlueprints = append(existingBlueprints, bp.Identifier)
}
}

for _, p := range defaults.Pages {
if _, err := page.GetPage(portClient, p.Identifier); err == nil {
klog.Warningf("Page with identifier %s already exists", p.Identifier)
errors = append(errors, fmt.Errorf("page with identifier %s already exists", p.Identifier))
}
}

if errors != nil {
return &AbortDefaultCreationError{Errors: errors}
}
return nil
}

func createResources(portClient *cli.PortClient, defaults *Defaults, config *port.Config) *AbortDefaultCreationError {
if err := validateResourcesDoesNotExist(portClient, defaults, config); err != nil {
klog.Warningf("Failed to create resources: %v.", err.Errors)
return err
if len(existingBlueprints) > 0 {
klog.Infof("Found existing blueprints: %v, skipping default resources creation", existingBlueprints)
return nil
}

bareBlueprints, patchStages := deconstructBlueprintsToCreationSteps(defaults.Blueprints)

waitGroup := sync.WaitGroup{}

var resourceErrors []error
var createdBlueprints []string
var createdPages []string
mutex := sync.Mutex{}
createdBlueprints := []string{}

for _, bp := range bareBlueprints {
waitGroup.Add(1)
go func(bp port.Blueprint) {
defer waitGroup.Done()
result, err := blueprint.NewBlueprint(portClient, bp)

mutex.Lock()
if err != nil {
klog.Warningf("Failed to create blueprint %s: %v", bp.Identifier, err.Error())
resourceErrors = append(resourceErrors, err)
} else {
klog.Infof("Created blueprint %s", result.Identifier)
createdBlueprints = append(createdBlueprints, result.Identifier)
createdBlueprints = append(createdBlueprints, bp.Identifier)
}
mutex.Unlock()
}(bp)
}
waitGroup.Wait()

if err := validateResourcesErrors(createdBlueprints, createdPages, resourceErrors); err != nil {
return err
if len(resourceErrors) > 0 {
return &AbortDefaultCreationError{
BlueprintsToRollback: createdBlueprints,
Errors: resourceErrors,
}
}

for _, patchStage := range patchStages {
Expand All @@ -215,16 +180,21 @@ func createResources(portClient *cli.PortClient, defaults *Defaults, config *por
go func(bp port.Blueprint) {
defer waitGroup.Done()
if _, err := blueprint.PatchBlueprint(portClient, bp); err != nil {
mutex.Lock()
klog.Warningf("Failed to patch blueprint %s: %v", bp.Identifier, err.Error())
resourceErrors = append(resourceErrors, err)
mutex.Unlock()
}
}(bp)
}
waitGroup.Wait()
}

if err := validateResourcesErrors(createdBlueprints, createdPages, resourceErrors); err != nil {
return err
if len(resourceErrors) > 0 {
return &AbortDefaultCreationError{
BlueprintsToRollback: createdBlueprints,
Errors: resourceErrors,
}
}
}

for _, blueprintScorecards := range defaults.Scorecards {
Expand All @@ -234,76 +204,42 @@ func createResources(portClient *cli.PortClient, defaults *Defaults, config *por
defer waitGroup.Done()
if err := scorecards.CreateScorecard(portClient, blueprintIdentifier, scorecard); err != nil {
klog.Warningf("Failed to create scorecard for blueprint %s: %v", blueprintIdentifier, err.Error())
resourceErrors = append(resourceErrors, err)
}
}(blueprintScorecards.Blueprint, scorecard)
}
}
waitGroup.Wait()

if err := validateResourcesErrors(createdBlueprints, createdPages, resourceErrors); err != nil {
return err
}

for _, pageToCreate := range defaults.Pages {
waitGroup.Add(1)
go func(p port.Page) {
defer waitGroup.Done()
if err := page.CreatePage(portClient, p); err != nil {
klog.Warningf("Failed to create page %s: %v", p.Identifier, err.Error())
resourceErrors = append(resourceErrors, err)
} else {
klog.Infof("Created page %s", p.Identifier)
createdPages = append(createdPages, p.Identifier)
}
}(pageToCreate)
}
waitGroup.Wait()

if err := validateResourcesErrors(createdBlueprints, createdPages, resourceErrors); err != nil {
return err
}

if err := integration.CreateIntegration(portClient, config.StateKey, config.EventListenerType, defaults.AppConfig); err != nil {
klog.Warningf("Failed to create integration with default configuration. state key %s: %v", config.StateKey, err.Error())
return &AbortDefaultCreationError{BlueprintsToRollback: createdBlueprints, PagesToRollback: createdPages, Errors: []error{err}}
}

return nil
}

func initializeDefaults(portClient *cli.PortClient, config *port.Config) error {
defaults, err := getDefaults()
if err != nil {
return err
}

if err := createResources(portClient, defaults, config); err != nil {
klog.Infof("Failed to create resources. Rolling back blueprints: %v", err.BlueprintsToRollback)
var rollbackWg sync.WaitGroup
for _, identifier := range err.BlueprintsToRollback {
rollbackWg.Add(1)
go func(identifier string) {
defer rollbackWg.Done()
if err := blueprint.DeleteBlueprint(portClient, identifier); err != nil {
klog.Warningf("Failed to rollback blueprint %s creation: %v", identifier, err)
func initializeDefaults(portClient *cli.PortClient, defaults *Defaults) error {
if err := createResources(portClient, defaults); err != nil {
if abortErr, ok := err.(*AbortDefaultCreationError); ok {
klog.Warningf("Rolling back blueprints due to creation error")
for _, blueprintID := range abortErr.BlueprintsToRollback {
if err := blueprint.DeleteBlueprint(portClient, blueprintID); err != nil {
klog.Warningf("Failed to rollback blueprint %s: %v", blueprintID, err)
} else {
klog.Infof("Successfully rolled back blueprint %s", blueprintID)
}
}(identifier)
}
rollbackWg.Wait()

for _, identifier := range err.PagesToRollback {
rollbackWg.Add(1)
go func(identifier string) {
defer rollbackWg.Done()
if err := page.DeletePage(portClient, identifier); err != nil {
klog.Warningf("Failed to rollback page %s creation: %v", identifier, err)
}
}(identifier)
}
}
rollbackWg.Wait()

return &ExceptionGroup{Message: err.Error(), Errors: err.Errors}
klog.Warningf("Error creating default resources: %v", err)
return err
}

return nil
Expand Down
17 changes: 8 additions & 9 deletions pkg/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func Test_InitIntegration_BlueprintExists(t *testing.T) {
assert.Nil(t, e)

i, err := integration.GetIntegration(f.portClient, f.stateKey)
assert.Nil(t, i.Config.Resources)
assert.NotNil(t, i.Config.Resources)
assert.Nil(t, err)

_, err = blueprint.GetBlueprint(f.portClient, "workload")
Expand All @@ -157,13 +157,13 @@ func Test_InitIntegration_PageExists(t *testing.T) {
assert.Nil(t, e)

i, err := integration.GetIntegration(f.portClient, f.stateKey)
assert.Nil(t, i.Config.Resources)
assert.NotNil(t, i.Config.Resources)
assert.Nil(t, err)

_, err = page.GetPage(f.portClient, "workload_overview_dashboard")
assert.Nil(t, err)

testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"availability_scorecard_dashboard"}, []string{})
testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"availability_scorecard_dashboard"}, []string{})
}

func Test_InitIntegration_ExistingIntegration(t *testing.T) {
Expand All @@ -183,7 +183,7 @@ func Test_InitIntegration_ExistingIntegration(t *testing.T) {
_, err = integration.GetIntegration(f.portClient, f.stateKey)
assert.Nil(t, err)

testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
}

func Test_InitIntegration_LocalResourcesConfiguration(t *testing.T) {
Expand Down Expand Up @@ -225,7 +225,7 @@ func Test_InitIntegration_LocalResourcesConfiguration(t *testing.T) {
assert.Equal(t, expectedResources, i.Config.Resources)
assert.Nil(t, err)

testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
}

func Test_InitIntegration_LocalResourcesConfiguration_ExistingIntegration_EmptyConfiguration(t *testing.T) {
Expand All @@ -247,11 +247,12 @@ func Test_InitIntegration_LocalResourcesConfiguration_ExistingIntegration_EmptyC
assert.Nil(t, err)
assert.Equal(t, "KAFKA", i.EventListener.Type)

testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
}

func Test_InitIntegration_LocalResourcesConfiguration_ExistingIntegration_WithConfiguration_WithOverwriteConfigurationOnRestartFlag(t *testing.T) {
f := NewFixture(t)
defer tearDownFixture(t, f)

expectedConfig := &port.IntegrationAppConfig{
Resources: []port.Resource{
Expand Down Expand Up @@ -294,7 +295,5 @@ func Test_InitIntegration_LocalResourcesConfiguration_ExistingIntegration_WithCo
assert.Nil(t, err)
assert.Equal(t, expectedConfig.Resources, i.Config.Resources)

testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
defer tearDownFixture(t, f)

testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
}
46 changes: 24 additions & 22 deletions pkg/defaults/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ func getEventListenerConfig(eventListenerType string) *port.EventListenerSetting

func InitIntegration(portClient *cli.PortClient, applicationConfig *port.Config) error {
klog.Infof("Initializing Port integration")
defaults, err := getDefaults()
if err != nil {
return err
}

existingIntegration, err := integration.GetIntegration(portClient, applicationConfig.StateKey)
defaultIntegrationConfig := &port.IntegrationAppConfig{
Resources: applicationConfig.Resources,
Expand All @@ -28,39 +33,36 @@ func InitIntegration(portClient *cli.PortClient, applicationConfig *port.Config)
}

if err != nil {
klog.Infof("Could not get integration with state key %s, error: %s", applicationConfig.StateKey, err.Error())
klog.Infof("Creating integration")
// The exporter supports a deprecated case where resources are provided in config file and integration does not
// exist. If this is not the case, we support the new way of creating the integration with the default resources.
// Only one of the two cases can be true.
if defaultIntegrationConfig.Resources == nil && applicationConfig.CreateDefaultResources {
klog.Infof("Creating default resources")
if err := initializeDefaults(portClient, applicationConfig); err != nil {
klog.Warningf("Error initializing defaults: %s", err.Error())
klog.Warningf("The integration will start without default integration mapping and other default resources. Please create them manually in Port. ")
} else {
klog.Infof("Default resources created successfully")
return nil
}
if applicationConfig.CreateDefaultResources {
defaultIntegrationConfig = defaults.AppConfig
}

klog.Infof("Could not create default resources, creating integration with no resources")
klog.Infof("Creating integration with config: %v", defaultIntegrationConfig)
// Handle a deprecated case where resources are provided in config file
return integration.CreateIntegration(portClient, applicationConfig.StateKey, applicationConfig.EventListenerType, defaultIntegrationConfig)
klog.Warningf("Could not get integration with state key %s, error: %s", applicationConfig.StateKey, err.Error())
if err := integration.CreateIntegration(portClient, applicationConfig.StateKey, applicationConfig.EventListenerType, defaultIntegrationConfig); err != nil {
return err
}
} else {
klog.Infof("Integration with state key %s already exists, patching it", applicationConfig.StateKey)
integrationPatch := &port.Integration{
EventListener: getEventListenerConfig(applicationConfig.EventListenerType),
}

// Handle a deprecated case where resources are provided in config file and integration exists from previous
//versions without a config
if existingIntegration.Config == nil || applicationConfig.OverwriteConfigurationOnRestart {
klog.Infof("Integration exists without a config, patching it with default config: %v", defaultIntegrationConfig)
integrationPatch.Config = defaultIntegrationConfig
}

return integration.PatchIntegration(portClient, applicationConfig.StateKey, integrationPatch)
if err := integration.PatchIntegration(portClient, applicationConfig.StateKey, integrationPatch); err != nil {
return err
}
}

if applicationConfig.CreateDefaultResources {
klog.Infof("Creating default resources (blueprints, pages, etc..)")
if err := initializeDefaults(portClient, defaults); err != nil {
klog.Warningf("Error initializing defaults: %s", err.Error())
klog.Warningf("Some default resources may not have been created. The integration will continue running.")
}
}

return nil
}

0 comments on commit 9f85629

Please sign in to comment.