Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Integration][Octopus] Updated integration to ingest data from all spaces #958

Merged
merged 22 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b8c1da4
Update-octopus-integration-to-ingest-all-spaces
oiadebayo Aug 28, 2024
1c07a9f
Merge branch 'main' into Update-octopus-integration-to-ingest-all-spaces
oiadebayo Aug 28, 2024
f206873
Merge branch 'main' into Update-octopus-integration-to-ingest-all-spaces
PeyGis Aug 28, 2024
693473f
Merge branch 'main' into Update-octopus-integration-to-ingest-all-spaces
oiadebayo Aug 28, 2024
646f959
Attended to revision comments
oiadebayo Aug 28, 2024
21f5dce
Removed unnecessary return
oiadebayo Aug 29, 2024
cf8a03b
Merge branch 'main' into Update-octopus-integration-to-ingest-all-spaces
oiadebayo Aug 29, 2024
e63929e
Merge branch 'main' into Update-octopus-integration-to-ingest-all-spaces
oiadebayo Sep 3, 2024
be01392
Merge branch 'main' into Update-octopus-integration-to-ingest-all-spaces
oiadebayo Sep 4, 2024
9f4317b
Attended to reviewer's suggestion
oiadebayo Sep 4, 2024
bdcad60
Merge branch 'main' into Update-octopus-integration-to-ingest-all-spaces
oiadebayo Sep 10, 2024
632b026
Update pyproject.toml
oiadebayo Sep 10, 2024
be29169
Merge branch 'main' into Update-octopus-integration-to-ingest-all-spaces
PeyGis Sep 11, 2024
0d6f0ce
Update main.py
oiadebayo Sep 12, 2024
db72954
Merge branch 'main' into Update-octopus-integration-to-ingest-all-spaces
oiadebayo Sep 17, 2024
1b6c722
Merge branch 'main' into Update-octopus-integration-to-ingest-all-spaces
oiadebayo Sep 18, 2024
1768226
Update pyproject.toml
oiadebayo Sep 18, 2024
34d5377
Merge branch 'main' into Update-octopus-integration-to-ingest-all-spaces
oiadebayo Sep 26, 2024
4e3f7e9
Merge branch 'main' into Update-octopus-integration-to-ingest-all-spaces
oiadebayo Sep 26, 2024
4fb5b32
Merge branch 'main' into Update-octopus-integration-to-ingest-all-spaces
oiadebayo Oct 8, 2024
2a64298
Merge branch 'main' into Update-octopus-integration-to-ingest-all-spaces
oiadebayo Oct 10, 2024
81d19a5
Updated changelog
oiadebayo Oct 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integrations/octopus/.port/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ configurations:
- name: serverUrl
required: true
type: url
description: The base URL of your Octopus Deploy instance. It should include the protocol (e.g., https://).
description: The base URL of your Octopus Deploy instance. It should include the protocol (e.g. <a href="https://demo.octopus.com" target="_blank">https://demo.octopus.com</a>).
- name: appHost
required: false
type: url
Expand Down
7 changes: 7 additions & 0 deletions integrations/octopus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

# Port_Ocean 0.1.16-beta (2024-10-10)

### Improvements

- Updated the integration to ingest resources from all spaces instead of the default space


# Port_Ocean 0.1.15-beta (2024-10-09)

### Improvements
Expand Down
71 changes: 46 additions & 25 deletions integrations/octopus/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from enum import StrEnum
from typing import Any, AsyncGenerator, Optional
from loguru import logger
from port_ocean.utils.cache import cache_iterator_result
from port_ocean.utils import http_async_client
from httpx import HTTPStatusError, Timeout

Expand All @@ -10,6 +12,14 @@
MAX_ITEMS_LIMITATION = 100


class ObjectKind(StrEnum):
SPACE = "space"
PROJECT = "project"
DEPLOYMENT = "deployment"
RELEASE = "release"
MACHINE = "machine"


class OctopusClient:
def __init__(self, server_url: str, octopus_api_key: str) -> None:
self.octopus_url = f"{server_url.rstrip('/')}/api/"
Expand Down Expand Up @@ -47,15 +57,17 @@ async def get_paginated_resources(
self,
kind: str,
params: Optional[dict[str, Any]] = None,
path_parameter: Optional[str] = None,
) -> AsyncGenerator[list[dict[str, Any]], None]:
"""Fetch paginated data from the Octopus Deploy API."""
endpoint = f"{path_parameter}/{kind}s" if path_parameter else f"{kind}s"
if params is None:
params = {}
params["skip"] = 0
params["take"] = PAGE_SIZE
page = 0
while True:
response = await self._send_api_request(f"{kind}s", params=params)
response = await self._send_api_request(endpoint, params=params)
items = response.get("Items", [])
last_page = response.get("LastPageNumber", 0)
yield items
Expand All @@ -70,14 +82,20 @@ async def get_paginated_resources(
page += 1

async def get_single_resource(
self, resource_kind: str, resource_id: str
self, resource_kind: str, resource_id: str, space_id: str
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if you send it an empty string? as you do when you get event from a real time webhook

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will return 404, but the code has been slightly modified so it will always have a space_id when making this call. All events has the space_id present in the payload

) -> dict[str, Any]:
"""Get a single resource by kind and ID."""
return await self._send_api_request(f"{resource_kind}/{resource_id}")
return await self._send_api_request(f"{space_id}/{resource_kind}/{resource_id}")

async def _get_all_spaces(self) -> list[dict[str, Any]]:
async def get_single_space(self, space_id: str) -> dict[str, Any]:
"""Get a single space by ID."""
return await self._send_api_request(f"{ObjectKind.SPACE}s/{space_id}")

@cache_iterator_result()
async def get_all_spaces(self) -> AsyncGenerator[list[dict[str, Any]], None]:
"""Get all spaces in the Octopus instance."""
return await self._send_api_request("spaces/all")
async for spaces in self.get_paginated_resources(ObjectKind.SPACE):
yield spaces

async def _create_subscription(
self, space_id: str, app_host: str
Expand All @@ -90,7 +108,7 @@ async def _create_subscription(
"WebhookTimeout": WEBHOOK_TIMEOUT,
},
"IsDisabled": False,
"Name": f"Port Subscription - {space_id}",
"Name": f"Port Subscription - {app_host}",
"SpaceId": space_id,
}
logger.info(
Expand All @@ -100,25 +118,28 @@ async def _create_subscription(
endpoint, json_data=subscription_data, method="POST"
)

async def create_webhook_subscription(self, app_host: str) -> dict[str, Any]:
async def create_webhook_subscription(self, app_host: str, space_id: str) -> None:
"""Create a new subscription for all spaces."""
for space in await self._get_all_spaces():
try:
response = await self._create_subscription(space["Id"], app_host)
if response.get("Id"):
logger.info(
f"Subscription created for space '{space['Id']}' with ID {response['Id']}"
)
else:
logger.error(
f"Failed to create subscription for space '{space['Id']}'"
)
except Exception as e:
logger.error(f"Unexpected error for space '{space['Id']}': {str(e)}")
return {"ok": True}
try:
response = await self._create_subscription(space_id, app_host)
if response.get("Id"):
logger.info(
f"Subscription created for space '{space_id}' with ID {response['Id']}"
)
else:
logger.error(f"Failed to create subscription for space '{space_id}'")
except Exception as e:
logger.error(f"Unexpected error for space '{space_id}': {str(e)}")

async def get_webhook_subscriptions(self) -> list[dict[str, Any]]:
async def get_webhook_subscriptions(
self,
space_id: str,
) -> AsyncGenerator[list[dict[str, Any]], None]:
"""Get existing subscriptions."""
response = await self._send_api_request("subscriptions/all")
logger.info(f"Retrieved {len(response)} subscriptions.")
return response
async for subscriptions in self.get_paginated_resources(
"subscription", path_parameter=space_id
):
logger.info(
f"Retrieved {len(subscriptions)} subscriptions for space {space_id}."
)
yield subscriptions
103 changes: 67 additions & 36 deletions integrations/octopus/main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from enum import StrEnum
from typing import Any, Dict
from loguru import logger
from port_ocean.context.ocean import ocean
from port_ocean.utils.async_iterators import stream_async_iterators_tasks
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
from client import OctopusClient
from client import OctopusClient, ObjectKind

TRACKED_EVENTS = [
"spaces",
Expand All @@ -14,14 +14,6 @@
]


class ObjectKind(StrEnum):
SPACE = "space"
PROJECT = "project"
DEPLOYMENT = "deployment"
RELEASE = "release"
MACHINE = "machine"


@ocean.on_start()
async def on_start() -> None:
logger.info("Starting Port Ocean Octopus integration")
Expand All @@ -48,50 +40,89 @@ async def setup_application() -> None:
)
return
octopus_client = await init_client()
existing_subscriptions = await octopus_client.get_webhook_subscriptions()
existing_webhook_uris = {
subscription.get("EventNotificationSubscription", {}).get("WebhookURI")
for subscription in existing_subscriptions
}
webhook_uri = f"{app_host}/integration/webhook"
if webhook_uri in existing_webhook_uris:
logger.info(f"Webhook already exists with URI: {webhook_uri}")
else:
await octopus_client.create_webhook_subscription(app_host)
logger.info(f"Webhook created with URI: {webhook_uri}")
async for spaces in octopus_client.get_all_spaces():
space_tasks = [
(space.get("Id"), octopus_client.get_webhook_subscriptions(space.get("Id")))
for space in spaces
if space.get("Id")
]

for space_id, task in space_tasks:
async for subscriptions in task:
existing_webhook_uris = {
subscription.get("EventNotificationSubscription", {}).get(
"WebhookURI"
)
for subscription in subscriptions
}
webhook_uri = f"{app_host}/integration/webhook"
if webhook_uri in existing_webhook_uris:
logger.info(
f"Webhook already exists with URI: {webhook_uri} for space {space_id}"
)
else:
await octopus_client.create_webhook_subscription(app_host, space_id)
logger.info(
f"Webhook created with URI: {webhook_uri} for space {space_id}"
)


@ocean.on_resync()
async def resync_resources(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
if kind == ObjectKind.SPACE:
return
octopus_client = await init_client()
async for spaces in octopus_client.get_all_spaces():
tasks = [
octopus_client.get_paginated_resources(kind, path_parameter=space["Id"])
for space in spaces
if space["Id"]
]
async for batch in stream_async_iterators_tasks(*tasks):
yield batch


@ocean.on_resync(ObjectKind.SPACE)
async def resync_spaces(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
octopus_client = await init_client()
async for resource_batch in octopus_client.get_paginated_resources(kind):
logger.info(f"Received length {len(resource_batch)} of {kind} ")
yield resource_batch
async for spaces in octopus_client.get_all_spaces():
logger.info(f"Received batch {len(spaces)} spaces")
yield spaces


@ocean.router.post("/webhook")
async def handle_webhook_request(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle the webhook request from Octopus Deploy.
"""
logger.debug(f"Received webhook event: {data}")
payload = data.get("Payload", {}).get("Event", {})
related_document_ids = payload.get("RelatedDocumentIds", [])
event_category = payload.get("Category", "")
space_id = payload["SpaceId"]
client = await init_client()
for resource_id in related_document_ids:
logger.info(f"Received webhook event with ID: {resource_id}")
resource_prefix = resource_id.split("-")[0].lower()
if resource_prefix in TRACKED_EVENTS:
kind = ObjectKind(resource_prefix.rstrip("s"))
try:
if event_category == "Deleted":
await ocean.unregister_raw(kind, [{"Id": resource_id}])
else:
if event_category == "Deleted":
resource_id = (
payload.get("ChangeDetails", {}).get("DocumentContext", {}).get("Id")
)
if resource_id and resource_id.split("-")[0].lower() in TRACKED_EVENTS:
kind = ObjectKind(resource_id.split("-")[0].lower().rstrip("s"))
await ocean.unregister_raw(kind, [{"Id": resource_id}])
else:
for resource_id in related_document_ids:
logger.info(f"Received webhook event with ID: {resource_id}")
resource_prefix = resource_id.split("-")[0].lower()
if resource_prefix in TRACKED_EVENTS:
if resource_prefix == ObjectKind.SPACE:
await client.get_single_space(space_id)
return {"ok": True}
kind = ObjectKind(resource_prefix.rstrip("s"))
try:
resource_data = await client.get_single_resource(
resource_prefix, resource_id
resource_prefix, resource_id, space_id
)
await ocean.register_raw(kind, [resource_data])
except Exception as e:
logger.error(f"Failed to process resource {resource_id}: {e}")
except Exception as e:
logger.error(f"Failed to process resource {resource_id}: {e}")
logger.info("Webhook event processed")
return {"ok": True}
2 changes: 1 addition & 1 deletion integrations/octopus/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "octopus"
version = "0.1.15-beta"
version = "0.1.16-beta"
description = "This integration ingest data from octopus deploy"
authors = ["Adebayo Iyanuoluwa <[email protected]>"]

Expand Down