feat(config): Add oauth cb provider
gabrielwry committed Jan 31, 2024
1 parent 7eee6fe commit b5b8e77
Showing 4 changed files with 125 additions and 56 deletions.
121 changes: 65 additions & 56 deletions metadata-ingestion/setup.py
@@ -98,24 +98,24 @@
}

sql_common = (
{
# Required for all SQL sources.
# This is a temporary lower bound that we're open to loosening/tightening as requirements show up
"sqlalchemy>=1.4.39, <2",
# Required for SQL profiling.
"great-expectations>=0.15.12, <=0.15.50",
*pydantic_no_v2, # because of great-expectations
# scipy version restricted to reduce backtracking; used by great-expectations
"scipy>=1.7.2",
# GE added handling for higher version of jinja2
# https://github.com/great-expectations/great_expectations/pull/5382/files
# datahub does not depend on traitlets directly but great expectations does.
# https://github.com/ipython/traitlets/issues/741
"traitlets<5.2.2",
"greenlet",
}
| usage_common
| sqlglot_lib
)

sqllineage_lib = {
@@ -134,6 +134,14 @@
"botocore!=1.23.0",
}

aws_msk_iam_sasl_signer = {
# AWS MSK IAM SASL Signer
"aws-msk-iam-sasl-signer-python",
# Deal with a version incompatibility between botocore (used by boto3) and urllib3.
# See https://github.com/boto/botocore/pull/2563.
"botocore!=1.23.0",
}

path_spec_common = {
"parse>=1.19.0",
"wcmatch",
@@ -286,14 +294,14 @@
# sqlalchemy-bigquery is included here since it provides an implementation of
# a SQLalchemy-conform STRUCT type definition
"athena": sql_common
| {"PyAthena[SQLAlchemy]>=2.6.0,<3.0.0", "sqlalchemy-bigquery>=1.4.1"},
| {"PyAthena[SQLAlchemy]>=2.6.0,<3.0.0", "sqlalchemy-bigquery>=1.4.1"},
"azure-ad": set(),
"bigquery": sql_common
| bigquery_common
| {
*sqlglot_lib,
"google-cloud-datacatalog-lineage==0.2.2",
},
"clickhouse": sql_common | clickhouse_common,
"clickhouse-usage": sql_common | usage_common | clickhouse_common,
"datahub-lineage-file": set(),
@@ -317,19 +325,19 @@
"glue": aws_common,
# hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported
"hana": sql_common
| {
"sqlalchemy-hana>=0.5.0; platform_machine != 'aarch64' and platform_machine != 'arm64'",
"hdbcli>=2.11.20; platform_machine != 'aarch64' and platform_machine != 'arm64'",
},
"hive": sql_common
| pyhive_common
| {
"databricks-dbapi",
# Due to https://github.com/great-expectations/great_expectations/issues/6146,
# we cannot allow 0.15.{23-26}. This was fixed in 0.15.27 by
# https://github.com/great-expectations/great_expectations/pull/6149.
"great-expectations != 0.15.23, != 0.15.24, != 0.15.25, != 0.15.26",
},
"iceberg": iceberg_common,
"json-schema": set(),
"kafka": kafka_common | kafka_protobuf,
@@ -342,10 +350,10 @@
"mode": {"requests", "tenacity>=8.0.1"} | sqllineage_lib,
"mongodb": {"pymongo[srv]>=3.11", "packaging"},
"mssql": sql_common
| {
"sqlalchemy-pytds>=0.3",
"pyOpenSSL",
},
"mssql-odbc": sql_common | {"pyodbc"},
"mysql": mysql,
# mariadb should have same dependency as mysql
@@ -355,15 +363,15 @@
"postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
"presto": sql_common | pyhive_common | trino,
"presto-on-hive": sql_common
| pyhive_common
| {"psycopg2-binary", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
"redshift": sql_common
| redshift_common
| usage_common
| sqlglot_lib
| {"cachetools"},
"s3": {*s3_base, *data_lake_profiling},
"gcs": {*s3_base, *data_lake_profiling},
"sagemaker": aws_common,
@@ -381,9 +389,9 @@
# to remove that dependency.
"tableau": {"tableauserverclient>=0.17.0"} | sqllineage_lib | sqlglot_lib,
"teradata": sql_common
| usage_common
| sqlglot_lib
| {"teradatasqlalchemy>=17.20.0.0"},
"trino": sql_common | trino,
"starburst-trino-usage": sql_common | usage_common | trino,
"nifi": {"requests", "packaging", "requests-gssapi"},
@@ -731,13 +739,13 @@
| (
plugin_common
if plugin
not in {
"airflow",
"datahub-rest",
"datahub-kafka",
"sync-file-emitter",
"sql-parser",
}
else set()
)
| dependencies
@@ -754,6 +762,7 @@
)
),
"cloud": ["acryl-datahub-cloud"],
"aws-msk-iam-sasl-singer": list(aws_msk_iam_sasl_signer),
"dev": list(dev_requirements),
"testing-utils": list(test_api_requirements), # To import `datahub.testing`
"integration-tests": list(full_test_dev_requirements),
11 changes: 11 additions & 0 deletions metadata-ingestion/src/datahub/configuration/kafka.py
@@ -2,6 +2,8 @@

from datahub.configuration.common import ConfigModel
from datahub.configuration.validate_host_port import validate_host_port
from pydantic import validator
from datahub.ingestion.api.registry import import_path
from datahub.utilities.oauth_cb_providers.base_oauth_cb_provider import BaseOAuthCbProvider


class _KafkaConnectionConfig(ConfigModel):
@@ -36,6 +38,10 @@ class KafkaConsumerConnectionConfig(_KafkaConnectionConfig):
description="Extra consumer config serialized as JSON. These options will be passed into Kafka's DeserializingConsumer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#deserializingconsumer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md .",
)

if "oauth_cb_provider_class" in consumer_config:
oauth_cb_provider_class: BaseOAuthCbProvider = import_path(consumer_config["oauth_cb_provider_class"])
consumer_config["oauth_cb"] = oauth_cb_provider_class.oauth_cb


class KafkaProducerConnectionConfig(_KafkaConnectionConfig):
"""Configuration class for holding connectivity information for Kafka producers"""
@@ -44,3 +50,8 @@ class KafkaProducerConnectionConfig(_KafkaConnectionConfig):
default_factory=dict,
description="Extra producer config serialized as JSON. These options will be passed into Kafka's SerializingProducer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#serializingproducer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md .",
)

if "oauth_cb_provider_class" in producer_config:
oauth_cb_provider_class: BaseOAuthCbProvider = import_path(producer_config["oauth_cb_provider_class"])
producer_config["oauth_cb"] = oauth_cb_provider_class.oauth_cb

28 changes: 28 additions & 0 deletions metadata-ingestion/src/datahub/utilities/oauth_cb_providers/aws_msk_iam_sasl_signer_cb_provider.py
@@ -0,0 +1,28 @@
# Copyright 2021 Acryl Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from datahub.utilities.oauth_cb_providers.base_oauth_cb_provider import BaseOAuthCbProvider


class AwsMskIamSaslSignerCbProvider(BaseOAuthCbProvider):
    @staticmethod
    def oauth_cb(oauth_config):
        aws_region = os.environ.get("AWS_REGION")
        # Default to "false": os.environ.get() returns None when the variable is
        # unset, and calling .lower() on None would raise an AttributeError.
        aws_debug_creds = os.environ.get("AWS_DEBUG_CREDS", "false").lower() == "true"
        auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(
            aws_region, aws_debug_creds=aws_debug_creds
        )
        # Note that this library expects oauth_cb to return expiry time in seconds
        # since epoch, while the token generator returns expiry in ms.
        return auth_token, expiry_ms / 1000
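As a quick usage sketch (not part of the commit), the provider's oauth_cb can be handed straight to a confluent-kafka consumer. The broker address, group id, and topic below are placeholders, and the module path assumes the layout above:

    from confluent_kafka import Consumer

    from datahub.utilities.oauth_cb_providers.aws_msk_iam_sasl_signer_cb_provider import (
        AwsMskIamSaslSignerCbProvider,
    )

    consumer = Consumer(
        {
            "bootstrap.servers": "b-1.example.kafka.us-east-1.amazonaws.com:9098",  # placeholder
            "group.id": "datahub-ingestion",  # placeholder
            "security.protocol": "SASL_SSL",
            "sasl.mechanism": "OAUTHBEARER",
            # confluent-kafka invokes oauth_cb whenever it needs a fresh token.
            "oauth_cb": AwsMskIamSaslSignerCbProvider.oauth_cb,
        }
    )
    consumer.subscribe(["my-topic"])  # placeholder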
21 changes: 21 additions & 0 deletions metadata-ingestion/src/datahub/utilities/oauth_cb_providers/base_oauth_cb_provider.py
@@ -0,0 +1,21 @@
# Copyright 2021 Acryl Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod


class BaseOAuthCbProvider(ABC):
    @staticmethod
    @abstractmethod
    def oauth_cb(oauth_config):
        """Return an (auth_token, expiry_in_seconds_since_epoch) tuple."""
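To show the extension point (again, illustration only), a deployment-specific provider just implements oauth_cb; the class name, token value, and lifetime below are placeholders:

    import time

    from datahub.utilities.oauth_cb_providers.base_oauth_cb_provider import (
        BaseOAuthCbProvider,
    )


    class StaticTokenCbProvider(BaseOAuthCbProvider):
        """Toy provider that serves a fixed token, e.g. for local testing."""

        @staticmethod
        def oauth_cb(oauth_config):
            token = "local-test-token"  # placeholder token
            expiry = time.time() + 300  # pretend the token lives five minutes
            return token, expiry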
