Skip to content

Commit

Permalink
feat(ingest/kafka): additional validation for oauth_db signature (dat…
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate authored Dec 5, 2024
1 parent 65f44ef commit 8d15df0
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import inspect
import logging
from typing import Any, Dict, Optional

Expand Down Expand Up @@ -34,5 +35,34 @@ def _resolve_oauth_callback(self) -> None:
"oauth_cb must be a string representing python function reference "
"in the format <python-module>:<function-name>."
)

call_back_fn = import_path(call_back)
self._validate_call_back_fn_signature(call_back_fn)

# Set the callback
self._config[CallableConsumerConfig.CALLBACK_ATTRIBUTE] = import_path(call_back)
self._config[CallableConsumerConfig.CALLBACK_ATTRIBUTE] = call_back_fn

def _validate_call_back_fn_signature(self, call_back_fn: Any) -> None:
sig = inspect.signature(call_back_fn)

num_positional_args = len(
[
param
for param in sig.parameters.values()
if param.kind
in (
inspect.Parameter.POSITIONAL_ONLY,
inspect.Parameter.POSITIONAL_OR_KEYWORD,
)
and param.default == inspect.Parameter.empty
]
)

has_variadic_args = any(
param.kind == inspect.Parameter.VAR_POSITIONAL
for param in sig.parameters.values()
)

assert num_positional_args == 1 or (
has_variadic_args and num_positional_args <= 1
), "oauth_cb function must accept single positional argument."
16 changes: 16 additions & 0 deletions metadata-ingestion/tests/integration/kafka/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,19 @@ def create_token(*args: Any, **kwargs: Any) -> Tuple[str, int]:
"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjbGllbnRfaWQiOiJrYWZrYV9jbGllbnQiLCJleHAiOjE2OTg3NjYwMDB9.dummy_sig_abcdef123456",
3600,
)


def create_token_no_args() -> Tuple[str, int]:
logger.warning(MESSAGE)
return (
"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjbGllbnRfaWQiOiJrYWZrYV9jbGllbnQiLCJleHAiOjE2OTg3NjYwMDB9.dummy_sig_abcdef123456",
3600,
)


def create_token_only_kwargs(**kwargs: Any) -> Tuple[str, int]:
logger.warning(MESSAGE)
return (
"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjbGllbnRfaWQiOiJrYWZrYV9jbGllbnQiLCJleHAiOjE2OTg3NjYwMDB9.dummy_sig_abcdef123456",
3600,
)
31 changes: 30 additions & 1 deletion metadata-ingestion/tests/integration/kafka/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import yaml
from freezegun import freeze_time

from datahub.configuration.common import ConfigurationError
from datahub.ingestion.api.source import SourceCapability
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.kafka.kafka import KafkaSource
from datahub.ingestion.source.kafka.kafka import KafkaSource, KafkaSourceConfig
from tests.integration.kafka import oauth # type: ignore
from tests.test_helpers import mce_helpers, test_connection_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
Expand Down Expand Up @@ -157,3 +158,31 @@ def test_kafka_oauth_callback(
assert checks["consumer_oauth_callback"], "Consumer oauth callback not found"
assert checks["admin_polling"], "Admin polling was not initiated"
assert checks["admin_oauth_callback"], "Admin oauth callback not found"


def test_kafka_source_oauth_cb_signature():
with pytest.raises(
ConfigurationError,
match=("oauth_cb function must accept single positional argument."),
):
KafkaSourceConfig.parse_obj(
{
"connection": {
"bootstrap": "foobar:9092",
"consumer_config": {"oauth_cb": "oauth:create_token_no_args"},
}
}
)

with pytest.raises(
ConfigurationError,
match=("oauth_cb function must accept single positional argument."),
):
KafkaSourceConfig.parse_obj(
{
"connection": {
"bootstrap": "foobar:9092",
"consumer_config": {"oauth_cb": "oauth:create_token_only_kwargs"},
}
}
)

0 comments on commit 8d15df0

Please sign in to comment.