diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e0dc4fcbb..1367c87b6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm +## 0.5.21 (2024-05-26) + +### Features + +- Added `send_raw_data_examples` integration config to allow sending raw data examples from the third-party API to Port (on resync), for testing and managing the integration mapping + + ## 0.5.20 (2024-05-26) diff --git a/port_ocean/clients/port/mixins/integrations.py b/port_ocean/clients/port/mixins/integrations.py index 3f357d0975..5fc4575748 100644 --- a/port_ocean/clients/port/mixins/integrations.py +++ b/port_ocean/clients/port/mixins/integrations.py @@ -6,6 +6,7 @@ from port_ocean.clients.port.authentication import PortAuthentication from port_ocean.clients.port.utils import handle_status_code +from port_ocean.log.sensetive import sensitive_log_filter if TYPE_CHECKING: from port_ocean.core.handlers.port_app_config.models import PortAppConfig @@ -137,3 +138,18 @@ async def ingest_integration_logs(self, logs: list[dict[str, Any]]) -> None: ) handle_status_code(response) logger.debug("Logs successfully ingested") + + async def ingest_integration_kind_examples( + self, kind: str, data: list[dict[str, Any]], should_log: bool = True + ): + logger.debug(f"Ingesting examples for kind: {kind}") + headers = await self.auth.headers() + response = await self.client.post( + f"{self.auth.api_url}/integration/{self.integration_identifier}/kinds/{kind}/examples", + headers=headers, + json={ + "examples": sensitive_log_filter.mask_object(data, full_hide=True), + }, + ) + handle_status_code(response, should_log=should_log) + logger.debug(f"Examples for kind {kind} successfully ingested") diff --git a/port_ocean/config/settings.py b/port_ocean/config/settings.py index f9da9ebd46..21d3e5836a 100644 --- a/port_ocean/config/settings.py +++ b/port_ocean/config/settings.py @@ -65,6 +65,7 @@ class 
IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow): initialize_port_resources: bool = True scheduled_resync_interval: int | None = None client_timeout: int = 30 + send_raw_data_examples: bool = True port: PortSettings event_listener: EventListenerSettingsType integration: IntegrationSettings diff --git a/port_ocean/core/handlers/entity_processor/base.py b/port_ocean/core/handlers/entity_processor/base.py index 95164d60e3..dffde29fa1 100644 --- a/port_ocean/core/handlers/entity_processor/base.py +++ b/port_ocean/core/handlers/entity_processor/base.py @@ -40,6 +40,7 @@ async def _parse_items( mapping: ResourceConfig, raw_data: list[RAW_ITEM], parse_all: bool = False, + send_raw_data_examples_amount: int = 0, ) -> EntitySelectorDiff: pass @@ -48,6 +49,7 @@ async def parse_items( mapping: ResourceConfig, raw_data: list[RAW_ITEM], parse_all: bool = False, + send_raw_data_examples_amount: int = 0, ) -> EntitySelectorDiff: """Public method to parse raw entity data and map it to an EntityDiff. @@ -55,9 +57,12 @@ async def parse_items( mapping (ResourceConfig): The configuration for entity mapping. raw_data (list[RawEntity]): The raw data to be parsed. parse_all (bool): Whether to parse all data or just data that passed the selector. + send_raw_data_examples_amount (int): The maximum number of raw data examples to send to the integration service (0 sends none). Returns: EntityDiff: The parsed entity differences. 
""" with logger.contextualize(kind=mapping.kind): - return await self._parse_items(mapping, raw_data, parse_all) + return await self._parse_items( + mapping, raw_data, parse_all, send_raw_data_examples_amount + ) diff --git a/port_ocean/core/handlers/entity_processor/jq_entity_processor.py b/port_ocean/core/handlers/entity_processor/jq_entity_processor.py index 6404bda3a6..e68ff49190 100644 --- a/port_ocean/core/handlers/entity_processor/jq_entity_processor.py +++ b/port_ocean/core/handlers/entity_processor/jq_entity_processor.py @@ -1,11 +1,13 @@ import asyncio import functools +from dataclasses import dataclass, field from functools import lru_cache -from typing import Any -from loguru import logger +from typing import Any, Optional import pyjq as jq # type: ignore +from loguru import logger +from port_ocean.context.ocean import ocean from port_ocean.core.handlers.entity_processor.base import BaseEntityProcessor from port_ocean.core.handlers.port_app_config.models import ResourceConfig from port_ocean.core.models import Entity @@ -17,6 +19,18 @@ from port_ocean.utils.queue_utils import process_in_queue +@dataclass +class MappedEntity: + """Represents the entity after applying the mapping + + This class holds the mapped entity along with whether it passed the selector and, optionally, the raw data. + """ + + entity: dict[str, Any] = field(default_factory=dict) + did_entity_pass_selector: bool = False + raw_data: Optional[dict[str, Any]] = None + + class JQEntityProcessor(BaseEntityProcessor): """Processes and parses entities using JQ expressions. 
@@ -78,13 +92,17 @@ async def _get_mapped_entity( raw_entity_mappings: dict[str, Any], selector_query: str, parse_all: bool = False, - ) -> tuple[dict[str, Any], bool]: + ) -> MappedEntity: should_run = await self._search_as_bool(data, selector_query) if parse_all or should_run: mapped_entity = await self._search_as_object(data, raw_entity_mappings) - return mapped_entity, should_run + return MappedEntity( + mapped_entity, + did_entity_pass_selector=should_run, + raw_data=data if should_run else None, + ) - return {}, False + return MappedEntity() async def _calculate_entity( self, @@ -93,7 +111,7 @@ async def _calculate_entity( items_to_parse: str | None, selector_query: str, parse_all: bool = False, - ) -> list[tuple[dict[str, Any], bool]]: + ) -> list[MappedEntity]: if items_to_parse: items = await self._search(data, items_to_parse) if isinstance(items, list): @@ -118,13 +136,27 @@ async def _calculate_entity( data, raw_entity_mappings, selector_query, parse_all ) ] - return [({}, False)] + return [MappedEntity()] + + @staticmethod + async def _send_examples(data: list[dict[str, Any]], kind: str) -> None: + try: + if data: + await ocean.port_client.ingest_integration_kind_examples( + kind, data, should_log=False + ) + except Exception as ex: + logger.warning( + f"Failed to send raw data example {ex}", + exc_info=True, + ) async def _parse_items( self, mapping: ResourceConfig, raw_results: list[RAW_ITEM], parse_all: bool = False, + send_raw_data_examples_amount: int = 0, ) -> EntitySelectorDiff: raw_entity_mappings: dict[str, Any] = mapping.port.entity.mappings.dict( exclude_unset=True @@ -141,13 +173,21 @@ async def _parse_items( passed_entities = [] failed_entities = [] + examples_to_send: list[dict[str, Any]] = [] for entities_results in calculated_entities_results: - for entity, did_entity_pass_selector in entities_results: - if entity.get("identifier") and entity.get("blueprint"): - parsed_entity = Entity.parse_obj(entity) - if did_entity_pass_selector: + 
for result in entities_results: + if result.entity.get("identifier") and result.entity.get("blueprint"): + parsed_entity = Entity.parse_obj(result.entity) + if result.did_entity_pass_selector: passed_entities.append(parsed_entity) + if ( + len(examples_to_send) < send_raw_data_examples_amount + and result.raw_data is not None + ): + examples_to_send.append(result.raw_data) else: failed_entities.append(parsed_entity) + await self._send_examples(examples_to_send, mapping.kind) + return EntitySelectorDiff(passed=passed_entities, failed=failed_entities) diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index d47a57eabe..6997c19a22 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -30,6 +30,9 @@ from port_ocean.exceptions.core import OceanAbortException +SEND_RAW_DATA_EXAMPLES_AMOUNT = 5 + + class SyncRawMixin(HandlerMixin, EventsMixin): """Mixin class for synchronization of raw constructed entities. 
@@ -124,10 +127,13 @@ async def _calculate_raw( self, raw_diff: list[tuple[ResourceConfig, list[RAW_ITEM]]], parse_all: bool = False, + send_raw_data_examples_amount: int = 0, ) -> list[EntitySelectorDiff]: return await asyncio.gather( *( - self.entity_processor.parse_items(mapping, results, parse_all) + self.entity_processor.parse_items( + mapping, results, parse_all, send_raw_data_examples_amount + ) for mapping, results in raw_diff ) ) @@ -138,8 +144,11 @@ async def _register_resource_raw( results: list[dict[Any, Any]], user_agent_type: UserAgentType, parse_all: bool = False, + send_raw_data_examples_amount: int = 0, ) -> EntitySelectorDiff: - objects_diff = await self._calculate_raw([(resource, results)], parse_all) + objects_diff = await self._calculate_raw( + [(resource, results)], parse_all, send_raw_data_examples_amount + ) await self.entities_state_applier.upsert( objects_diff[0].passed, user_agent_type ) @@ -171,19 +180,32 @@ async def _register_in_batches( else: async_generators.append(result) + send_raw_data_examples_amount = ( + SEND_RAW_DATA_EXAMPLES_AMOUNT if ocean.config.send_raw_data_examples else 0 + ) entities = ( await self._register_resource_raw( - resource_config, raw_results, user_agent_type + resource_config, + raw_results, + user_agent_type, + send_raw_data_examples_amount=send_raw_data_examples_amount, ) ).passed for generator in async_generators: try: async for items in generator: + if send_raw_data_examples_amount > 0: + send_raw_data_examples_amount = max( + 0, send_raw_data_examples_amount - len(entities) + ) entities.extend( ( await self._register_resource_raw( - resource_config, items, user_agent_type + resource_config, + items, + user_agent_type, + send_raw_data_examples_amount=send_raw_data_examples_amount, ) ).passed ) diff --git a/port_ocean/log/sensetive.py b/port_ocean/log/sensetive.py index 07f5b01ba7..e4779cfcc4 100644 --- a/port_ocean/log/sensetive.py +++ b/port_ocean/log/sensetive.py @@ -1,5 +1,5 @@ import re -from typing 
import Callable, TYPE_CHECKING +from typing import Any, Callable, TYPE_CHECKING if TYPE_CHECKING: from loguru import Record @@ -35,16 +35,31 @@ def hide_sensitive_strings(self, *tokens: str) -> None: [re.compile(re.escape(token.strip())) for token in tokens if token.strip()] ) + def mask_string(self, string: str, full_hide: bool = False) -> str: + masked_string = string + for pattern in self.compiled_patterns: + replace: Callable[[re.Match[str]], str] | str = ( + "[REDACTED]" + if full_hide + else lambda match: match.group()[:6] + "[REDACTED]" + ) + masked_string = pattern.sub(replace, masked_string) + return masked_string + + def mask_object(self, obj: Any, full_hide: bool = False) -> Any: + if isinstance(obj, str): + return self.mask_string(obj, full_hide) + if isinstance(obj, list): + return [self.mask_object(o, full_hide) for o in obj] + if isinstance(obj, dict): + for k, v in obj.items(): + obj[k] = self.mask_object(v, full_hide) + + return obj + def create_filter(self, full_hide: bool = False) -> Callable[["Record"], bool]: def _filter(record: "Record") -> bool: - for pattern in self.compiled_patterns: - replace: Callable[[re.Match[str]], str] | str = ( - "[REDACTED]" - if full_hide - else lambda match: match.group()[:6] + "[REDACTED]" - ) - record["message"] = pattern.sub(replace, record["message"]) - + record["message"] = self.mask_string(record["message"], full_hide) return True return _filter