Skip to content

Commit

Permalink
Add opt-in integration resource provision service
Browse files Browse the repository at this point in the history
  • Loading branch information
erikzaadi committed Jan 5, 2025
1 parent ae1abcd commit 4f65997
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 2 deletions.
3 changes: 3 additions & 0 deletions port_ocean/clients/port/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from port_ocean.clients.port.mixins.entities import EntityClientMixin
from port_ocean.clients.port.mixins.integrations import IntegrationClientMixin
from port_ocean.clients.port.mixins.migrations import MigrationClientMixin
from port_ocean.clients.port.mixins.organization import OrganizationClientMixin
from port_ocean.clients.port.types import (
KafkaCreds,
)
Expand All @@ -21,6 +22,7 @@ class PortClient(
IntegrationClientMixin,
BlueprintClientMixin,
MigrationClientMixin,
OrganizationClientMixin,
):
def __init__(
self,
Expand Down Expand Up @@ -48,6 +50,7 @@ def __init__(
)
BlueprintClientMixin.__init__(self, self.auth, self.client)
MigrationClientMixin.__init__(self, self.auth, self.client)
OrganizationClientMixin.__init__(self, self.auth, self.client)

async def get_kafka_creds(self) -> KafkaCreds:
logger.info("Fetching organization kafka credentials")
Expand Down
26 changes: 24 additions & 2 deletions port_ocean/clients/port/mixins/integrations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Any, TYPE_CHECKING, Optional, TypedDict
import asyncio
from typing import Any, Dict, TYPE_CHECKING, Optional, TypedDict
from urllib.parse import quote_plus

import httpx
Expand Down Expand Up @@ -50,27 +51,48 @@ async def get_log_attributes(self) -> LogAttributes:
self._log_attributes = response["logAttributes"]
return self._log_attributes

async def _poll_integration_until_default_provisioning_is_complete(
self,
interval=15,
) -> Dict[str, Any]:
response = await self._get_current_integration()
response_json = response.json()
config = response_json.get("config", {})
if config != {}:
return response_json

await asyncio.sleep(interval)

# TODO: Ensure that get_integration isn't cached
return self._poll_integration_until_default_provisioning_is_complete()

async def create_integration(
self,
_type: str,
changelog_destination: dict[str, Any],
port_app_config: Optional["PortAppConfig"] = None,
use_provisioned_defaults: Optional[bool] = False,
) -> dict:
logger.info(f"Creating integration with id: {self.integration_identifier}")
if use_provisioned_defaults:
logger.info("Creating integration with `use_provisioned_defaults`")
headers = await self.auth.headers()
json = {
"installationId": self.integration_identifier,
"installationAppType": _type,
"version": self.integration_version,
"changelogDestination": changelog_destination,
"provisionEnabled": use_provisioned_defaults,
"config": {},
}
if port_app_config:
if port_app_config and not use_provisioned_defaults:
json["config"] = port_app_config.to_request()
response = await self.client.post(
f"{self.auth.api_url}/integration", headers=headers, json=json
)
handle_status_code(response)
if use_provisioned_defaults:
return self._poll_integration_until_default_provisioning_is_complete()
return response.json()["integration"]

async def patch_integration(
Expand Down
31 changes: 31 additions & 0 deletions port_ocean/clients/port/mixins/organization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import List
import httpx
from loguru import logger
from port_ocean.clients.port.authentication import PortAuthentication
from port_ocean.clients.port.utils import handle_status_code


class OrganizationClientMixin:
def __init__(
self,
auth: PortAuthentication,
client: httpx.AsyncClient,
):
self.auth = auth
self.client = client

async def _get_organization_feature_toggles(self) -> httpx.Response:
logger.info("Fetching organization feature toggles")

response = await self.client.get(
f"{self.auth.api_url}/organization",
headers=await self.auth.headers(),
)
return response

async def get_organization_feature_toggles(
self, should_raise: bool = True, should_log: bool = True
) -> List[str]:
response = await self._get_organization_feature_toggles()
handle_status_code(response, should_raise, should_log)
return response.json().get("organization", {}).get("featureFlags", [])
1 change: 1 addition & 0 deletions port_ocean/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow):
initialize_port_resources: bool = True
scheduled_resync_interval: int | None = None
client_timeout: int = 60
use_provisioned_defaults: bool = False
send_raw_data_examples: bool = True
port: PortSettings
event_listener: EventListenerSettingsType = Field(
Expand Down
12 changes: 12 additions & 0 deletions port_ocean/core/defaults/initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
AbortDefaultCreationError,
)

ORG_USE_PROVISIONED_DEFAULTS_FEATURE_TOGGLE = "USE_PROVISIONED_DEFAULTS"


def deconstruct_blueprints_to_creation_steps(
raw_blueprints: list[dict[str, Any]],
Expand Down Expand Up @@ -70,6 +72,7 @@ async def _initialize_required_integration_settings(
integration_config.integration.type,
integration_config.event_listener.to_request(),
port_app_config=default_mapping,
use_provisioned_defaults=integration_config.use_provisioned_defaults,
)
elif not integration.get("config"):
logger.info(
Expand Down Expand Up @@ -205,6 +208,12 @@ async def _initialize_defaults(
logger.warning("No defaults found. Skipping initialization...")
return None

if integration_config.use_provisioned_defaults:
logger.info("`use_provisioned_defaults` set, verifying org feature toggle")
org_feature_toggles = await port_client.get_organization_feature_toggles()
if ORG_USE_PROVISIONED_DEFAULTS_FEATURE_TOGGLE not in org_feature_toggles:
integration_config.use_provisioned_defaults = False

if defaults.port_app_config:
await _initialize_required_integration_settings(
port_client, defaults.port_app_config, integration_config
Expand All @@ -213,6 +222,9 @@ async def _initialize_defaults(
if not integration_config.initialize_port_resources:
return

if integration_config.use_provisioned_defaults:
logger.info("Skipping creating default due to `use_provisioned_defaults`")
return
try:
logger.info("Found default resources, starting creation process")
await _create_resources(port_client, defaults)
Expand Down

0 comments on commit 4f65997

Please sign in to comment.