Skip to content

Commit

Permalink
Send logs to Port integration (#371)
Browse files Browse the repository at this point in the history
  • Loading branch information
yairsimantov20 authored Feb 18, 2024
1 parent 6d534ec commit e7d1440
Show file tree
Hide file tree
Showing 38 changed files with 735 additions and 405 deletions.
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,25 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.5.0 (2024-02-18)


### Features

- Added a method for ocean integration to redact sensitive information from the logs and automatically apply it to sensitive configurations and known sensitive patterns. (#1)
- Added an HTTP handler for Ocean logs to facilitate sending the logs to the Port. (#2)

### Improvements

- Seperated the `port_ocean.utils` file into multiple files within the `utils` folder to improve code organization. (#1)
- Changed the Ocean context to be a global variable instead of using Localstack, preventing the framework from re-initiating the context for each thread. (#2)

### Bug Fixes

- Fixed an issue where the event listener was causing the application to continue running even after receiving a termination signal. (#1)
- Fixed a bug that caused some termination signal handlers to not work by consolidating the signal listeners in a single class, as signals can only have one listener. (#2)


## 0.4.17 (2024-01-23)


Expand Down
1 change: 1 addition & 0 deletions integrations/jira/jira/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from httpx import Timeout, BasicAuth
from jira.overrides import JiraResourceConfig
from loguru import logger

from port_ocean.context.event import event
from port_ocean.context.ocean import ocean
from port_ocean.utils import http_async_client
Expand Down
311 changes: 151 additions & 160 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion port_ocean/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from port_ocean.core.integrations.base import BaseIntegration
from port_ocean.ocean import Ocean
from port_ocean.utils import load_module
from port_ocean.utils.misc import load_module


def _get_base_integration_class_from_module(
Expand Down
12 changes: 6 additions & 6 deletions port_ocean/cli/commands/defaults/clean.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# -*- coding: utf-8 -*-
import click

from inspect import getmembers

from port_ocean.utils import load_module
from .group import defaults
import click

from port_ocean.bootstrap import create_default_app
from port_ocean.cli.commands.main import print_logo, console
from port_ocean.ocean import Ocean
from port_ocean.core.defaults import clean_defaults
from port_ocean.bootstrap import create_default_app
from port_ocean.ocean import Ocean
from port_ocean.utils.misc import load_module
from .group import defaults


@defaults.command()
Expand Down
12 changes: 6 additions & 6 deletions port_ocean/cli/commands/defaults/dock.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# -*- coding: utf-8 -*-
import click

from inspect import getmembers

from port_ocean.utils import load_module
from .group import defaults
import click

from port_ocean.bootstrap import create_default_app
from port_ocean.cli.commands.main import print_logo, console
from port_ocean.ocean import Ocean
from port_ocean.core.defaults.initialize import initialize_defaults
from port_ocean.bootstrap import create_default_app
from port_ocean.ocean import Ocean
from port_ocean.utils.misc import load_module
from .group import defaults


@defaults.command()
Expand Down
3 changes: 1 addition & 2 deletions port_ocean/clients/port/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from port_ocean.clients.port.types import UserAgentType
from port_ocean.clients.port.utils import handle_status_code
from port_ocean.utils import get_time
from port_ocean.utils.misc import get_time


class TokenResponse(BaseModel):
Expand Down Expand Up @@ -79,5 +79,4 @@ async def token(self) -> str:
self.last_token_object = await self._get_token(
self.client_id, self.client_secret
)

return self.last_token_object.full_token
27 changes: 26 additions & 1 deletion port_ocean/clients/port/mixins/integrations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, TYPE_CHECKING, Optional
from typing import Any, TYPE_CHECKING, Optional, TypedDict

import httpx
from loguru import logger
Expand All @@ -11,6 +11,10 @@
from port_ocean.core.handlers.port_app_config.models import PortAppConfig


class LogAttributes(TypedDict):
ingestUrl: str


class IntegrationClientMixin:
def __init__(
self,
Expand All @@ -23,6 +27,7 @@ def __init__(
self.integration_version = integration_version
self.auth = auth
self.client = client
self._log_attributes: LogAttributes | None = None

async def _get_current_integration(self) -> httpx.Response:
logger.info(f"Fetching integration with id: {self.integration_identifier}")
Expand All @@ -39,6 +44,12 @@ async def get_current_integration(
handle_status_code(response, should_raise, should_log)
return response.json()["integration"]

async def get_log_attributes(self) -> LogAttributes:
if self._log_attributes is None:
response = await self.get_current_integration()
self._log_attributes = response["logAttributes"]
return self._log_attributes

async def create_integration(
self,
_type: str,
Expand Down Expand Up @@ -112,3 +123,17 @@ async def initialize_integration(
logger.info(
f"Integration with id: {self.integration_identifier} successfully registered"
)

async def ingest_integration_logs(self, logs: list[dict[str, Any]]) -> None:
logger.debug("Ingesting logs")
log_attributes = await self.get_log_attributes()
headers = await self.auth.headers()
response = await self.client.post(
log_attributes["ingestUrl"],
headers=headers,
json={
"logs": logs,
},
)
handle_status_code(response)
logger.debug("Logs successfully ingested")
29 changes: 29 additions & 0 deletions port_ocean/config/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ def load_providers(
class BaseOceanSettings(BaseSettings):
base_path: str

def get_sensitive_fields_data(self) -> set[str]:
return _get_sensitive_information(self)

class Config:
yaml_file = "./config.yaml"
env_prefix = "OCEAN__"
Expand All @@ -153,3 +156,29 @@ def customise_sources( # type: ignore
s, env_settings(s), init_settings.init_kwargs["base_path"]
),
)


class BaseOceanModel(BaseModel):
def get_sensitive_fields_data(self) -> set[str]:
return _get_sensitive_information(self)


def _get_sensitive_information(
model: BaseOceanModel | BaseSettings,
) -> set[str]:
sensitive_fields = [
field_name
for field_name, field in model.__fields__.items()
if field.field_info.extra.get("sensitive", False)
]
sensitive_set = {str(getattr(model, field_name)) for field_name in sensitive_fields}

recursive_sensitive_data = [
getattr(model, field_name).get_sensitive_fields_data()
for field_name, field in model.__fields__.items()
if isinstance(getattr(model, field_name), BaseOceanModel)
]
for sensitive_data in recursive_sensitive_data:
sensitive_set.update(sensitive_data)

return sensitive_set
8 changes: 6 additions & 2 deletions port_ocean/config/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@

from humps import decamelize
from pydantic import BaseModel, AnyUrl, create_model, Extra, parse_obj_as, validator
from pydantic.fields import ModelField
from pydantic.fields import ModelField, Field

from port_ocean.config.base import BaseOceanModel


class Configuration(BaseModel, extra=Extra.allow):
name: str
type: str
required: bool = False
default: Optional[Any]
sensitive: bool = False


def dynamic_parse(value: Any, field: ModelField) -> Any:
Expand Down Expand Up @@ -51,11 +54,12 @@ def default_config_factory(configurations: Any) -> Type[BaseModel]:
default = parse_obj_as(field_type, config.default)
fields[decamelize(config.name)] = (
field_type,
default,
Field(default, sensitive=config.sensitive),
)

dynamic_model = create_model( # type: ignore
__model_name="Config",
__base__=BaseOceanModel,
**fields,
__validators__={"dynamic_parse": validator("*", pre=True)(dynamic_parse)},
)
Expand Down
19 changes: 11 additions & 8 deletions port_ocean/config/settings.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from typing import Any, Literal

from pydantic import BaseSettings, BaseModel, Extra, AnyHttpUrl, parse_obj_as, validator
from pydantic import Extra, AnyHttpUrl, parse_obj_as, validator
from pydantic.fields import Field
from pydantic.main import BaseModel

from port_ocean.config.base import BaseOceanSettings
from port_ocean.config.base import BaseOceanSettings, BaseOceanModel
from port_ocean.core.event_listener import EventListenerSettingsType

LogLevelType = Literal["ERROR", "WARNING", "INFO", "DEBUG", "CRITICAL"]


class ApplicationSettings(BaseSettings):
class ApplicationSettings(BaseOceanModel):
log_level: LogLevelType = "INFO"
enable_http_logging: bool = True
port: int = 8000

class Config:
Expand All @@ -22,16 +25,16 @@ def customise_sources(cls, init_settings, env_settings, *_, **__): # type: igno
return env_settings, init_settings


class PortSettings(BaseModel, extra=Extra.allow):
client_id: str
client_secret: str
class PortSettings(BaseOceanModel, extra=Extra.allow):
client_id: str = Field(..., sensitive=True)
client_secret: str = Field(..., sensitive=True)
base_url: AnyHttpUrl = parse_obj_as(AnyHttpUrl, "https://api.getport.io")


class IntegrationSettings(BaseModel, extra=Extra.allow):
class IntegrationSettings(BaseOceanModel, extra=Extra.allow):
identifier: str
type: str
config: dict[str, Any]
config: dict[str, Any] | BaseModel

@validator("identifier", "type")
def validate_lower(cls, v: str) -> str:
Expand Down
7 changes: 0 additions & 7 deletions port_ocean/consumers/base_consumer.py

This file was deleted.

16 changes: 6 additions & 10 deletions port_ocean/consumers/kafka_consumer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import signal
import threading
from typing import Any, Callable

from confluent_kafka import Consumer, KafkaException, Message # type: ignore
from loguru import logger
from pydantic import BaseModel

from port_ocean.consumers.base_consumer import BaseConsumer


class KafkaConsumerConfig(BaseModel):
brokers: str
Expand All @@ -19,7 +17,7 @@ class KafkaConsumerConfig(BaseModel):
consumer_poll_timeout: int


class KafkaConsumer(BaseConsumer):
class KafkaConsumer:
def __init__(
self,
msg_process: Callable[[Message], None],
Expand All @@ -30,9 +28,6 @@ def __init__(
self.org_id = org_id
self.config = config

signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)

self.msg_process = msg_process
if config.kafka_security_enabled:
kafka_config = {
Expand All @@ -53,7 +48,8 @@ def __init__(

self.consumer = Consumer(kafka_config)

def start(self) -> None:
def start(self, event: threading.Event) -> None:
self.running = True
try:
logger.info("Start consumer...")

Expand All @@ -64,8 +60,7 @@ def start(self) -> None:
),
)
logger.info("Subscribed to topics")
self.running = True
while self.running:
while self.running and not event.is_set():
try:
msg = self.consumer.poll(timeout=self.config.consumer_poll_timeout)
if msg is None:
Expand All @@ -90,6 +85,7 @@ def start(self) -> None:
except Exception as message_error:
logger.error(str(message_error))
finally:
logger.info("Closing consumer...")
self.exit_gracefully()

def exit_gracefully(self, *_: Any) -> None:
Expand Down
2 changes: 1 addition & 1 deletion port_ocean/context/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
EventContextNotFoundError,
ResourceContextNotFoundError,
)
from port_ocean.utils import get_time
from port_ocean.utils.misc import get_time

if TYPE_CHECKING:
from port_ocean.core.handlers.port_app_config.models import (
Expand Down
Loading

0 comments on commit e7d1440

Please sign in to comment.