diff --git a/metadata-ingestion/src/datahub/configuration/kafka_consumer_config.py b/metadata-ingestion/src/datahub/configuration/kafka_consumer_config.py index cac6bb4996391..f08c78cadc0b2 100644 --- a/metadata-ingestion/src/datahub/configuration/kafka_consumer_config.py +++ b/metadata-ingestion/src/datahub/configuration/kafka_consumer_config.py @@ -1,3 +1,4 @@ +import inspect import logging from typing import Any, Dict, Optional @@ -34,5 +35,34 @@ def _resolve_oauth_callback(self) -> None: "oauth_cb must be a string representing python function reference " "in the format :." ) + + call_back_fn = import_path(call_back) + self._validate_call_back_fn_signature(call_back_fn) + # Set the callback - self._config[CallableConsumerConfig.CALLBACK_ATTRIBUTE] = import_path(call_back) + self._config[CallableConsumerConfig.CALLBACK_ATTRIBUTE] = call_back_fn + + def _validate_call_back_fn_signature(self, call_back_fn: Any) -> None: + sig = inspect.signature(call_back_fn) + + num_positional_args = len( + [ + param + for param in sig.parameters.values() + if param.kind + in ( + inspect.Parameter.POSITIONAL_ONLY, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + ) + and param.default == inspect.Parameter.empty + ] + ) + + has_variadic_args = any( + param.kind == inspect.Parameter.VAR_POSITIONAL + for param in sig.parameters.values() + ) + + assert num_positional_args == 1 or ( + has_variadic_args and num_positional_args <= 1 + ), "oauth_cb function must accept single positional argument." diff --git a/metadata-ingestion/tests/integration/kafka/oauth.py b/metadata-ingestion/tests/integration/kafka/oauth.py index 28cfee521d6c0..81a91fcd5e406 100644 --- a/metadata-ingestion/tests/integration/kafka/oauth.py +++ b/metadata-ingestion/tests/integration/kafka/oauth.py @@ -12,3 +12,19 @@ def create_token(*args: Any, **kwargs: Any) -> Tuple[str, int]: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjbGllbnRfaWQiOiJrYWZrYV9jbGllbnQiLCJleHAiOjE2OTg3NjYwMDB9.dummy_sig_abcdef123456", 3600, ) + + +def create_token_no_args() -> Tuple[str, int]: + logger.warning(MESSAGE) + return ( + "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjbGllbnRfaWQiOiJrYWZrYV9jbGllbnQiLCJleHAiOjE2OTg3NjYwMDB9.dummy_sig_abcdef123456", + 3600, + ) + + +def create_token_only_kwargs(**kwargs: Any) -> Tuple[str, int]: + logger.warning(MESSAGE) + return ( + "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjbGllbnRfaWQiOiJrYWZrYV9jbGllbnQiLCJleHAiOjE2OTg3NjYwMDB9.dummy_sig_abcdef123456", + 3600, + ) diff --git a/metadata-ingestion/tests/integration/kafka/test_kafka.py b/metadata-ingestion/tests/integration/kafka/test_kafka.py index 7462f177684b7..bf0ec1845a66c 100644 --- a/metadata-ingestion/tests/integration/kafka/test_kafka.py +++ b/metadata-ingestion/tests/integration/kafka/test_kafka.py @@ -5,9 +5,10 @@ import yaml from freezegun import freeze_time +from datahub.configuration.common import ConfigurationError from datahub.ingestion.api.source import SourceCapability from datahub.ingestion.run.pipeline import Pipeline -from datahub.ingestion.source.kafka.kafka import KafkaSource +from datahub.ingestion.source.kafka.kafka import KafkaSource, KafkaSourceConfig from tests.integration.kafka import oauth # type: ignore from tests.test_helpers import mce_helpers, test_connection_helpers from tests.test_helpers.click_helpers import run_datahub_cmd @@ -157,3 +158,31 @@ def test_kafka_oauth_callback( assert checks["consumer_oauth_callback"], "Consumer oauth callback not found" assert checks["admin_polling"], "Admin polling was not initiated" assert checks["admin_oauth_callback"], "Admin oauth callback not found" + + +def test_kafka_source_oauth_cb_signature(): + with pytest.raises( + ConfigurationError, + match=("oauth_cb function must accept single positional argument."), + ): + KafkaSourceConfig.parse_obj( + { + "connection": { + "bootstrap": "foobar:9092", + "consumer_config": {"oauth_cb": "oauth:create_token_no_args"}, + } + } + ) + + with pytest.raises( + ConfigurationError, + match=("oauth_cb function must accept single positional argument."), + ): + KafkaSourceConfig.parse_obj( + { + "connection": { + "bootstrap": "foobar:9092", + "consumer_config": {"oauth_cb": "oauth:create_token_only_kwargs"}, + } + } + )