diff --git a/CHANGELOG.md b/CHANGELOG.md index 98b93cfe74..08fc21177b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm +## 0.5.3 (2024-03-03) + + +### Improvements + +- Changed the JQ Entity processor to work with async calls to allow better parallelism and async work (#1) + + ## 0.5.2 (2024-02-21) diff --git a/port_ocean/core/handlers/entity_processor/jq_entity_processor.py b/port_ocean/core/handlers/entity_processor/jq_entity_processor.py index 7a7338cdbe..e4315af007 100644 --- a/port_ocean/core/handlers/entity_processor/jq_entity_processor.py +++ b/port_ocean/core/handlers/entity_processor/jq_entity_processor.py @@ -1,3 +1,6 @@ +import asyncio +import functools +from asyncio import TaskGroup from functools import lru_cache from typing import Any @@ -22,14 +25,20 @@ class JQEntityProcessor(BaseEntityProcessor): def _compile(self, pattern: str) -> Any: return jq.compile(pattern) - def _search(self, data: dict[str, Any], pattern: str) -> Any: + async def _search(self, data: dict[str, Any], pattern: str) -> Any: try: - return self._compile(pattern).first(data) + loop = asyncio.get_event_loop() + compiled_pattern = self._compile(pattern) + first_value_callable = functools.partial(compiled_pattern.first, data) + return await loop.run_in_executor(None, first_value_callable) except Exception: return None - def _search_as_bool(self, data: dict[str, Any], pattern: str) -> bool: - value = self._compile(pattern).first(data) + async def _search_as_bool(self, data: dict[str, Any], pattern: str) -> bool: + loop = asyncio.get_event_loop() + compiled_pattern = self._compile(pattern) + first_value_callable = functools.partial(compiled_pattern.first, data) + value = await loop.run_in_executor(None, first_value_callable) if isinstance(value, bool): return value @@ -38,33 +47,41 @@ def _search_as_bool(self, data: dict[str, Any], pattern: str) -> bool: f"Expected boolean value, got 
{type(value)} instead" ) - def _search_as_object( + async def _search_as_object( self, data: dict[str, Any], obj: dict[str, Any] ) -> dict[str, Any | None]: - result: dict[str, Any | None] = {} - for key, value in obj.items(): - try: + search_tasks = {} + async with TaskGroup() as tg: + for key, value in obj.items(): if isinstance(value, dict): - result[key] = self._search_as_object(data, value) + search_tasks[key] = tg.create_task( + self._search_as_object(data, value) + ) else: - result[key] = self._search(data, value) + search_tasks[key] = tg.create_task(self._search(data, value)) + + result: dict[str, Any | None] = {} + for key, task in search_tasks.items(): + try: + result[key] = await task except Exception: result[key] = None + return result - def _calculate_entities( + async def _calculate_entities( self, mapping: ResourceConfig, raw_data: list[dict[str, Any]] ) -> list[Entity]: - entities = [] - for data in raw_data: - should_run = self._search_as_bool(data, mapping.selector.query) - + async def calculate_raw(data: dict[str, Any]) -> dict[str, Any]: + should_run = await self._search_as_bool(data, mapping.selector.query) if should_run and mapping.port.entity: - entities.append( - self._search_as_object( - data, mapping.port.entity.mappings.dict(exclude_unset=True) - ) + return await self._search_as_object( + data, mapping.port.entity.mappings.dict(exclude_unset=True) ) + return {} + + entities_tasks = [asyncio.create_task(calculate_raw(data)) for data in raw_data] + entities = await asyncio.gather(*entities_tasks) return [ Entity.parse_obj(entity_data) @@ -77,10 +94,10 @@ def _calculate_entities( async def _parse_items( self, mapping: ResourceConfig, raw_results: RawEntityDiff ) -> EntityDiff: - entities_before: list[Entity] = self._calculate_entities( + entities_before: list[Entity] = await self._calculate_entities( mapping, raw_results["before"] ) - entities_after: list[Entity] = self._calculate_entities( + entities_after: list[Entity] = await 
self._calculate_entities( mapping, raw_results["after"] ) diff --git a/pyproject.toml b/pyproject.toml index 1e78a79cda..05909289b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.5.2" +version = "0.5.3" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"