From 4f659976301a480ffecb701cbb43da5bffd1586a Mon Sep 17 00:00:00 2001 From: erikzaadi Date: Sun, 5 Jan 2025 17:43:35 +0200 Subject: [PATCH] Add opt-in integration resource provision service --- port_ocean/clients/port/client.py | 3 ++ .../clients/port/mixins/integrations.py | 26 ++++++++++++++-- .../clients/port/mixins/organization.py | 31 +++++++++++++++++++ port_ocean/config/settings.py | 1 + port_ocean/core/defaults/initialize.py | 12 +++++++ 5 files changed, 71 insertions(+), 2 deletions(-) create mode 100644 port_ocean/clients/port/mixins/organization.py diff --git a/port_ocean/clients/port/client.py b/port_ocean/clients/port/client.py index 2b323f139a..4ed0714947 100644 --- a/port_ocean/clients/port/client.py +++ b/port_ocean/clients/port/client.py @@ -5,6 +5,7 @@ from port_ocean.clients.port.mixins.entities import EntityClientMixin from port_ocean.clients.port.mixins.integrations import IntegrationClientMixin from port_ocean.clients.port.mixins.migrations import MigrationClientMixin +from port_ocean.clients.port.mixins.organization import OrganizationClientMixin from port_ocean.clients.port.types import ( KafkaCreds, ) @@ -21,6 +22,7 @@ class PortClient( IntegrationClientMixin, BlueprintClientMixin, MigrationClientMixin, + OrganizationClientMixin, ): def __init__( self, @@ -48,6 +50,7 @@ def __init__( ) BlueprintClientMixin.__init__(self, self.auth, self.client) MigrationClientMixin.__init__(self, self.auth, self.client) + OrganizationClientMixin.__init__(self, self.auth, self.client) async def get_kafka_creds(self) -> KafkaCreds: logger.info("Fetching organization kafka credentials") diff --git a/port_ocean/clients/port/mixins/integrations.py b/port_ocean/clients/port/mixins/integrations.py index 9148dec519..b4dd318f15 100644 --- a/port_ocean/clients/port/mixins/integrations.py +++ b/port_ocean/clients/port/mixins/integrations.py @@ -1,4 +1,5 @@ -from typing import Any, TYPE_CHECKING, Optional, TypedDict +import asyncio +from typing import Any, Dict, TYPE_CHECKING, Optional, TypedDict from urllib.parse import quote_plus import httpx @@ -50,27 +51,48 @@ async def get_log_attributes(self) -> LogAttributes: self._log_attributes = response["logAttributes"] return self._log_attributes + async def _poll_integration_until_default_provisioning_is_complete( + self, + interval=15, + ) -> Dict[str, Any]: + response = await self._get_current_integration() + response_json = response.json() + config = response_json.get("config", {}) + if config != {}: + return response_json + + await asyncio.sleep(interval) + + # TODO: Ensure that get_integration isn't cached + return self._poll_integration_until_default_provisioning_is_complete() + async def create_integration( self, _type: str, changelog_destination: dict[str, Any], port_app_config: Optional["PortAppConfig"] = None, + use_provisioned_defaults: Optional[bool] = False, ) -> dict: logger.info(f"Creating integration with id: {self.integration_identifier}") + if use_provisioned_defaults: + logger.info("Creating integration with `use_provisioned_defaults`") headers = await self.auth.headers() json = { "installationId": self.integration_identifier, "installationAppType": _type, "version": self.integration_version, "changelogDestination": changelog_destination, + "provisionEnabled": use_provisioned_defaults, "config": {}, } - if port_app_config: + if port_app_config and not use_provisioned_defaults: json["config"] = port_app_config.to_request() response = await self.client.post( f"{self.auth.api_url}/integration", headers=headers, json=json ) handle_status_code(response) + if use_provisioned_defaults: + return self._poll_integration_until_default_provisioning_is_complete() return response.json()["integration"] async def patch_integration( diff --git a/port_ocean/clients/port/mixins/organization.py b/port_ocean/clients/port/mixins/organization.py new file mode 100644 index 0000000000..b5cb8eb5fe --- /dev/null +++ b/port_ocean/clients/port/mixins/organization.py @@ -0,0 +1,31 @@ +from typing import List +import httpx +from loguru import logger +from port_ocean.clients.port.authentication import PortAuthentication +from port_ocean.clients.port.utils import handle_status_code + + +class OrganizationClientMixin: + def __init__( + self, + auth: PortAuthentication, + client: httpx.AsyncClient, + ): + self.auth = auth + self.client = client + + async def _get_organization_feature_toggles(self) -> httpx.Response: + logger.info("Fetching organization feature toggles") + + response = await self.client.get( + f"{self.auth.api_url}/organization", + headers=await self.auth.headers(), + ) + return response + + async def get_organization_feature_toggles( + self, should_raise: bool = True, should_log: bool = True + ) -> List[str]: + response = await self._get_organization_feature_toggles() + handle_status_code(response, should_raise, should_log) + return response.json().get("organization", {}).get("featureFlags", []) diff --git a/port_ocean/config/settings.py b/port_ocean/config/settings.py index 5c948af17d..d1ef40820a 100644 --- a/port_ocean/config/settings.py +++ b/port_ocean/config/settings.py @@ -68,6 +68,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow): initialize_port_resources: bool = True scheduled_resync_interval: int | None = None client_timeout: int = 60 + use_provisioned_defaults: bool = False send_raw_data_examples: bool = True port: PortSettings event_listener: EventListenerSettingsType = Field( diff --git a/port_ocean/core/defaults/initialize.py b/port_ocean/core/defaults/initialize.py index 1ae1a49cee..932eebad30 100644 --- a/port_ocean/core/defaults/initialize.py +++ b/port_ocean/core/defaults/initialize.py @@ -19,6 +19,8 @@ AbortDefaultCreationError, ) +ORG_USE_PROVISIONED_DEFAULTS_FEATURE_TOGGLE = "USE_PROVISIONED_DEFAULTS" + def deconstruct_blueprints_to_creation_steps( raw_blueprints: list[dict[str, Any]], @@ -70,6 +72,7 @@ async def _initialize_required_integration_settings( integration_config.integration.type, integration_config.event_listener.to_request(), port_app_config=default_mapping, + use_provisioned_defaults=integration_config.use_provisioned_defaults, ) elif not integration.get("config"): logger.info( @@ -205,6 +208,12 @@ async def _initialize_defaults( logger.warning("No defaults found. Skipping initialization...") return None + if integration_config.use_provisioned_defaults: + logger.info("`use_provisioned_defaults` set, verifying org feature toggle") + org_feature_toggles = await port_client.get_organization_feature_toggles() + if ORG_USE_PROVISIONED_DEFAULTS_FEATURE_TOGGLE not in org_feature_toggles: + integration_config.use_provisioned_defaults = False + if defaults.port_app_config: await _initialize_required_integration_settings( port_client, defaults.port_app_config, integration_config @@ -213,6 +222,9 @@ async def _initialize_defaults( if not integration_config.initialize_port_resources: return + if integration_config.use_provisioned_defaults: + logger.info("Skipping creating default due to `use_provisioned_defaults`") + return try: logger.info("Found default resources, starting creation process") await _create_resources(port_client, defaults)