Skip to content

Commit

Permalink
Port 6993 ocean allow async calls in the jq entity processor (#403)
Browse files Browse the repository at this point in the history
# Description

What - Right now we can't run async logic from within the jq entity
processor
Why - The functions are sync
How - Changed the functions to be async and applied an async wrapper around
the jq calls

## Type of change

- [X] New feature (non-breaking change which adds functionality)
  • Loading branch information
yairsimantov20 authored Mar 3, 2024
1 parent 411cd5b commit 2676ad5
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 22 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.5.3 (2024-03-03)


### Improvements

- Changed the JQ Entity processor to work with async calls to allow better parallelism and async work (#1)


## 0.5.2 (2024-02-21)


Expand Down
59 changes: 38 additions & 21 deletions port_ocean/core/handlers/entity_processor/jq_entity_processor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import asyncio
import functools
from asyncio import TaskGroup
from functools import lru_cache
from typing import Any

Expand All @@ -22,14 +25,20 @@ class JQEntityProcessor(BaseEntityProcessor):
# Compile the jq program text `pattern` with jq.compile and return the
# resulting compiled-program object.
# NOTE(review): presumably memoized (lru_cache is imported in this file), but
# no decorator is visible in this view — verify against the repository.
def _compile(self, pattern: str) -> Any:
return jq.compile(pattern)

# Return the first result of running the jq `pattern` against `data`; any
# Exception raised while compiling or evaluating yields None instead.
# NOTE(review): this span appears to hold both the earlier synchronous lines
# and their async replacements (a diff rendered without +/- markers, with
# indentation stripped) — confirm against the repository before treating it
# as runnable code.
def _search(self, data: dict[str, Any], pattern: str) -> Any:
async def _search(self, data: dict[str, Any], pattern: str) -> Any:
try:
return self._compile(pattern).first(data)
# NOTE(review): asyncio.get_running_loop() is the documented preferred call
# inside a coroutine — consider using it here.
loop = asyncio.get_event_loop()
compiled_pattern = self._compile(pattern)
# Bind `data` now so the executor can invoke the lookup with no arguments.
first_value_callable = functools.partial(compiled_pattern.first, data)
# Passing None as the executor selects the event loop's default executor.
return await loop.run_in_executor(None, first_value_callable)
except Exception:
return None

def _search_as_bool(self, data: dict[str, Any], pattern: str) -> bool:
value = self._compile(pattern).first(data)
async def _search_as_bool(self, data: dict[str, Any], pattern: str) -> bool:
loop = asyncio.get_event_loop()
compiled_pattern = self._compile(pattern)
first_value_callable = functools.partial(compiled_pattern.first, data)
value = await loop.run_in_executor(None, first_value_callable)

if isinstance(value, bool):
return value
Expand All @@ -38,33 +47,41 @@ def _search_as_bool(self, data: dict[str, Any], pattern: str) -> bool:
f"Expected boolean value, got {type(value)} instead"
)

# Resolve `obj`, a mapping of key -> jq pattern (or nested mapping of the same
# shape), against `data` and return a dict with the same keys holding the
# looked-up values; a key whose lookup raises an Exception maps to None.
# NOTE(review): this span appears to hold both the earlier synchronous lines
# and their async replacements (a diff rendered without +/- markers, with
# indentation stripped) — confirm against the repository before treating it
# as runnable code.
def _search_as_object(
async def _search_as_object(
self, data: dict[str, Any], obj: dict[str, Any]
) -> dict[str, Any | None]:
result: dict[str, Any | None] = {}
for key, value in obj.items():
try:
# Async variant: one task per key, created through a TaskGroup.
search_tasks = {}
async with TaskGroup() as tg:
for key, value in obj.items():
# Nested mappings are resolved by recursing into this method.
if isinstance(value, dict):
result[key] = self._search_as_object(data, value)
search_tasks[key] = tg.create_task(
self._search_as_object(data, value)
)
else:
result[key] = self._search(data, value)
search_tasks[key] = tg.create_task(self._search(data, value))

# Collect each task's result under its original key.
# NOTE(review): indentation is lost here, so it is unclear whether this loop
# runs inside or after the `async with TaskGroup()` block — verify.
result: dict[str, Any | None] = {}
for key, task in search_tasks.items():
try:
result[key] = await task
except Exception:
result[key] = None

return result

def _calculate_entities(
async def _calculate_entities(
self, mapping: ResourceConfig, raw_data: list[dict[str, Any]]
) -> list[Entity]:
entities = []
for data in raw_data:
should_run = self._search_as_bool(data, mapping.selector.query)

async def calculate_raw(data: dict[str, Any]) -> dict[str, Any]:
should_run = await self._search_as_bool(data, mapping.selector.query)
if should_run and mapping.port.entity:
entities.append(
self._search_as_object(
data, mapping.port.entity.mappings.dict(exclude_unset=True)
)
return await self._search_as_object(
data, mapping.port.entity.mappings.dict(exclude_unset=True)
)
return {}

entities_tasks = [asyncio.create_task(calculate_raw(data)) for data in raw_data]
entities = await asyncio.gather(*entities_tasks)

return [
Entity.parse_obj(entity_data)
Expand All @@ -77,10 +94,10 @@ def _calculate_entities(
async def _parse_items(
self, mapping: ResourceConfig, raw_results: RawEntityDiff
) -> EntityDiff:
entities_before: list[Entity] = self._calculate_entities(
entities_before: list[Entity] = await self._calculate_entities(
mapping, raw_results["before"]
)
entities_after: list[Entity] = self._calculate_entities(
entities_after: list[Entity] = await self._calculate_entities(
mapping, raw_results["after"]
)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.5.2"
version = "0.5.3"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down

0 comments on commit 2676ad5

Please sign in to comment.