From b5b8e776c702fd593560188df3aebc8b97ee116b Mon Sep 17 00:00:00 2001 From: "gabriel.wang" Date: Wed, 24 Jan 2024 23:36:36 -0500 Subject: [PATCH] feat(config): Add oauth cb provider --- metadata-ingestion/setup.py | 121 ++++++++++-------- .../src/datahub/configuration/kafka.py | 11 ++ .../aws_msk_iam_sasl_signer_cb_provider.py | 28 ++++ .../base_oauth_cb_provider.py | 21 +++ 4 files changed, 125 insertions(+), 56 deletions(-) create mode 100644 metadata-ingestion/src/datahub/utilities/oauth_cb_providers/aws_msk_iam_sasl_signer_cb_provider.py diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index f8d51997330a9d..5af712be8a4e9a 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -98,24 +98,24 @@ } sql_common = ( - { - # Required for all SQL sources. - # This is temporary lower bound that we're open to loosening/tightening as requirements show up - "sqlalchemy>=1.4.39, <2", - # Required for SQL profiling. - "great-expectations>=0.15.12, <=0.15.50", - *pydantic_no_v2, # because of great-expectations - # scipy version restricted to reduce backtracking, used by great-expectations, - "scipy>=1.7.2", - # GE added handling for higher version of jinja2 - # https://github.com/great-expectations/great_expectations/pull/5382/files - # datahub does not depend on traitlets directly but great expectations does. - # https://github.com/ipython/traitlets/issues/741 - "traitlets<5.2.2", - "greenlet", - } - | usage_common - | sqlglot_lib + { + # Required for all SQL sources. + # This is temporary lower bound that we're open to loosening/tightening as requirements show up + "sqlalchemy>=1.4.39, <2", + # Required for SQL profiling. + "great-expectations>=0.15.12, <=0.15.50", + *pydantic_no_v2, # because of great-expectations + # scipy version restricted to reduce backtracking, used by great-expectations, + "scipy>=1.7.2", + # GE added handling for higher version of jinja2 + # https://github.com/great-expectations/great_expectations/pull/5382/files + # datahub does not depend on traitlets directly but great expectations does. + # https://github.com/ipython/traitlets/issues/741 + "traitlets<5.2.2", + "greenlet", + } + | usage_common + | sqlglot_lib ) sqllineage_lib = { @@ -134,6 +134,14 @@ "botocore!=1.23.0", } +aws_msk_iam_sasl_signer = { + # AWS MSK IAM SASL Singer + "aws-msk-iam-sasl-signer-python", + # Deal with a version incompatibility between botocore (used by boto3) and urllib3. + # See https://github.com/boto/botocore/pull/2563. + "botocore!=1.23.0", +} + path_spec_common = { "parse>=1.19.0", "wcmatch", @@ -286,14 +294,14 @@ # sqlalchemy-bigquery is included here since it provides an implementation of # a SQLalchemy-conform STRUCT type definition "athena": sql_common - | {"PyAthena[SQLAlchemy]>=2.6.0,<3.0.0", "sqlalchemy-bigquery>=1.4.1"}, + | {"PyAthena[SQLAlchemy]>=2.6.0,<3.0.0", "sqlalchemy-bigquery>=1.4.1"}, "azure-ad": set(), "bigquery": sql_common - | bigquery_common - | { - *sqlglot_lib, - "google-cloud-datacatalog-lineage==0.2.2", - }, + | bigquery_common + | { + *sqlglot_lib, + "google-cloud-datacatalog-lineage==0.2.2", + }, "clickhouse": sql_common | clickhouse_common, "clickhouse-usage": sql_common | usage_common | clickhouse_common, "datahub-lineage-file": set(), @@ -317,19 +325,19 @@ "glue": aws_common, # hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported "hana": sql_common - | { - "sqlalchemy-hana>=0.5.0; platform_machine != 'aarch64' and platform_machine != 'arm64'", - "hdbcli>=2.11.20; platform_machine != 'aarch64' and platform_machine != 'arm64'", - }, + | { + "sqlalchemy-hana>=0.5.0; platform_machine != 'aarch64' and platform_machine != 'arm64'", + "hdbcli>=2.11.20; platform_machine != 'aarch64' and platform_machine != 'arm64'", + }, "hive": sql_common - | pyhive_common - | { - "databricks-dbapi", - # Due to https://github.com/great-expectations/great_expectations/issues/6146, - # we cannot allow 0.15.{23-26}. This was fixed in 0.15.27 by - # https://github.com/great-expectations/great_expectations/pull/6149. - "great-expectations != 0.15.23, != 0.15.24, != 0.15.25, != 0.15.26", - }, + | pyhive_common + | { + "databricks-dbapi", + # Due to https://github.com/great-expectations/great_expectations/issues/6146, + # we cannot allow 0.15.{23-26}. This was fixed in 0.15.27 by + # https://github.com/great-expectations/great_expectations/pull/6149. + "great-expectations != 0.15.23, != 0.15.24, != 0.15.25, != 0.15.26", + }, "iceberg": iceberg_common, "json-schema": set(), "kafka": kafka_common | kafka_protobuf, @@ -342,10 +350,10 @@ "mode": {"requests", "tenacity>=8.0.1"} | sqllineage_lib, "mongodb": {"pymongo[srv]>=3.11", "packaging"}, "mssql": sql_common - | { - "sqlalchemy-pytds>=0.3", - "pyOpenSSL", - }, + | { + "sqlalchemy-pytds>=0.3", + "pyOpenSSL", + }, "mssql-odbc": sql_common | {"pyodbc"}, "mysql": mysql, # mariadb should have same dependency as mysql @@ -355,15 +363,15 @@ "postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"}, "presto": sql_common | pyhive_common | trino, "presto-on-hive": sql_common - | pyhive_common - | {"psycopg2-binary", "pymysql>=1.0.2"}, + | pyhive_common + | {"psycopg2-binary", "pymysql>=1.0.2"}, "pulsar": {"requests"}, "redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib, "redshift": sql_common - | redshift_common - | usage_common - | sqlglot_lib - | {"cachetools"}, + | redshift_common + | usage_common + | sqlglot_lib + | {"cachetools"}, "s3": {*s3_base, *data_lake_profiling}, "gcs": {*s3_base, *data_lake_profiling}, "sagemaker": aws_common, @@ -381,9 +389,9 @@ # to remove that dependency. "tableau": {"tableauserverclient>=0.17.0"} | sqllineage_lib | sqlglot_lib, "teradata": sql_common - | usage_common - | sqlglot_lib - | {"teradatasqlalchemy>=17.20.0.0"}, + | usage_common + | sqlglot_lib + | {"teradatasqlalchemy>=17.20.0.0"}, "trino": sql_common | trino, "starburst-trino-usage": sql_common | usage_common | trino, "nifi": {"requests", "packaging", "requests-gssapi"}, @@ -731,13 +739,13 @@ | ( plugin_common if plugin - not in { - "airflow", - "datahub-rest", - "datahub-kafka", - "sync-file-emitter", - "sql-parser", - } + not in { + "airflow", + "datahub-rest", + "datahub-kafka", + "sync-file-emitter", + "sql-parser", + } else set() ) | dependencies @@ -754,6 +762,7 @@ ) ), "cloud": ["acryl-datahub-cloud"], + "aws-msk-iam-sasl-singer": list(aws_msk_iam_sasl_signer), "dev": list(dev_requirements), "testing-utils": list(test_api_requirements), # To import `datahub.testing` "integration-tests": list(full_test_dev_requirements), diff --git a/metadata-ingestion/src/datahub/configuration/kafka.py b/metadata-ingestion/src/datahub/configuration/kafka.py index 07e2f759bb3ff6..2900943577343b 100644 --- a/metadata-ingestion/src/datahub/configuration/kafka.py +++ b/metadata-ingestion/src/datahub/configuration/kafka.py @@ -2,6 +2,8 @@ from datahub.configuration.common import ConfigModel from datahub.configuration.validate_host_port import validate_host_port +from datahub.ingestion.api.registry import import_path +from datahub.utilities.oauth_cb_providers.base_oauth_cb_provider import BaseOAuthCbProvider class _KafkaConnectionConfig(ConfigModel): @@ -36,6 +38,10 @@ class KafkaConsumerConnectionConfig(_KafkaConnectionConfig): description="Extra consumer config serialized as JSON. These options will be passed into Kafka's DeserializingConsumer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#deserializingconsumer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md .", ) + if "oauth_cb_provider_class" in consumer_config: + oauth_cb_provider_class: BaseOAuthCbProvider = import_path(consumer_config["oauth_cb_provider_class"]) + consumer_config["oauth_cb"] = oauth_cb_provider_class.oauth_cb + class KafkaProducerConnectionConfig(_KafkaConnectionConfig): """Configuration class for holding connectivity information for Kafka producers""" @@ -44,3 +50,8 @@ class KafkaProducerConnectionConfig(_KafkaConnectionConfig): default_factory=dict, description="Extra producer config serialized as JSON. These options will be passed into Kafka's SerializingProducer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#serializingproducer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md .", ) + + if "oauth_cb_provider_class" in producer_config: + oauth_cb_provider_class: BaseOAuthCbProvider = import_path(producer_config["oauth_cb_provider_class"]) + producer_config["oauth_cb"] = oauth_cb_provider_class.oauth_cb + diff --git a/metadata-ingestion/src/datahub/utilities/oauth_cb_providers/aws_msk_iam_sasl_signer_cb_provider.py b/metadata-ingestion/src/datahub/utilities/oauth_cb_providers/aws_msk_iam_sasl_signer_cb_provider.py new file mode 100644 index 00000000000000..f424f85e330b6d --- /dev/null +++ b/metadata-ingestion/src/datahub/utilities/oauth_cb_providers/aws_msk_iam_sasl_signer_cb_provider.py @@ -0,0 +1,28 @@ +# Copyright 2021 Acryl Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +from aws_msk_iam_sasl_signer import MSKAuthTokenProvider +from base_oauth_cb_provider import BaseOAuthCbProvider + + +class AwsMskIamSaslSignerCbProvider(BaseOAuthCbProvider): + @staticmethod + def oauth_cb(oauth_config): + aws_region = os.environ.get("AWS_REGION") + aws_debug_creds = os.environ.get("AWS_DEBUG_CREDS").lower() == "true" + auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(aws_region, aws_debug_creds=aws_debug_creds) + # Note that this library expects oauth_cb to return expiry time in seconds since epoch, while the token + # generator returns expiry in ms + return auth_token, expiry_ms / 1000 diff --git a/metadata-ingestion/src/datahub/utilities/oauth_cb_providers/base_oauth_cb_provider.py b/metadata-ingestion/src/datahub/utilities/oauth_cb_providers/base_oauth_cb_provider.py index e69de29bb2d1d6..b804190a47d936 100644 --- a/metadata-ingestion/src/datahub/utilities/oauth_cb_providers/base_oauth_cb_provider.py +++ b/metadata-ingestion/src/datahub/utilities/oauth_cb_providers/base_oauth_cb_provider.py @@ -0,0 +1,21 @@ +# Copyright 2021 Acryl Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC + + +class BaseOAuthCbProvider(ABC): + @staticmethod + def oauth_cb(oauth_config): + pass