Skip to content

Commit

Permalink
fix(airflow): fix provider loading exception (datahub-project#8861)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Sep 20, 2023
1 parent b466359 commit 6c6216a
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 6 deletions.
3 changes: 2 additions & 1 deletion metadata-ingestion-modules/airflow-plugin/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def get_long_description():


entry_points = {
"airflow.plugins": "acryl-datahub-airflow-plugin = datahub_airflow_plugin.datahub_plugin:DatahubPlugin"
"airflow.plugins": "acryl-datahub-airflow-plugin = datahub_airflow_plugin.datahub_plugin:DatahubPlugin",
"apache_airflow_provider": ["provider_info=datahub_provider:get_provider_info"],
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,20 @@ def get_provider_info():
"package-name": f"{__package_name__}",
"name": f"{__package_name__}",
"description": "Datahub metadata collector plugin",
"connection-types": [
{
"hook-class-name": "datahub_airflow_plugin.hooks.datahub.DatahubRestHook",
"connection-type": "datahub-rest",
},
{
"hook-class-name": "datahub_airflow_plugin.hooks.datahub.DatahubKafkaHook",
"connection-type": "datahub-kafka",
},
],
# Deprecated method of providing connection types, kept for backwards compatibility.
# We can remove with Airflow 3.
"hook-class-names": [
"datahub_airflow_plugin.hooks.datahub.DatahubRestHook",
"datahub_airflow_plugin.hooks.datahub.DatahubKafkaHook",
],
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DatahubRestHook(BaseHook):

conn_name_attr = "datahub_rest_conn_id"
default_conn_name = "datahub_rest_default"
conn_type = "datahub_rest"
conn_type = "datahub-rest"
hook_name = "DataHub REST Server"

def __init__(self, datahub_rest_conn_id: str = default_conn_name) -> None:
Expand All @@ -50,6 +50,15 @@ def get_ui_field_behaviour() -> Dict:
},
}

def test_connection(self) -> Tuple[bool, str]:
    """Check that this hook can reach the configured DataHub REST server.

    Returns:
        A ``(success, message)`` tuple: ``(True, ...)`` when the emitter's
        connection test succeeds, ``(False, <error text>)`` otherwise.
        NOTE(review): this matches Airflow's hook ``test_connection``
        convention (used by the UI "Test" button) — confirm against the
        installed Airflow version.
    """
    try:
        # Build the REST emitter from this hook's connection config and
        # probe the server; emitter.test_connection() raises on failure.
        emitter = self.make_emitter()
        emitter.test_connection()
    except Exception as e:
        # Broad catch is deliberate: any failure is reported as a
        # (False, reason) result instead of propagating an exception.
        return False, str(e)

    return True, "Successfully connected to DataHub."

def _get_config(self) -> Tuple[str, Optional[str], Optional[int]]:
conn: "Connection" = self.get_connection(self.datahub_rest_conn_id)

Expand Down Expand Up @@ -99,7 +108,7 @@ class DatahubKafkaHook(BaseHook):

conn_name_attr = "datahub_kafka_conn_id"
default_conn_name = "datahub_kafka_default"
conn_type = "datahub_kafka"
conn_type = "datahub-kafka"
hook_name = "DataHub Kafka Sink"

def __init__(self, datahub_kafka_conn_id: str = default_conn_name) -> None:
Expand Down Expand Up @@ -194,9 +203,15 @@ def get_underlying_hook(self) -> Union[DatahubRestHook, DatahubKafkaHook]:

# We need to figure out the underlying hook type. First check the
# conn_type. If that fails, attempt to guess using the conn id name.
if conn.conn_type == DatahubRestHook.conn_type:
if (
conn.conn_type == DatahubRestHook.conn_type
or conn.conn_type == DatahubRestHook.conn_type.replace("-", "_")
):
return DatahubRestHook(self.datahub_conn_id)
elif conn.conn_type == DatahubKafkaHook.conn_type:
elif (
conn.conn_type == DatahubKafkaHook.conn_type
or conn.conn_type == DatahubKafkaHook.conn_type.replace("-", "_")
):
return DatahubKafkaHook(self.datahub_conn_id)
elif "rest" in self.datahub_conn_id:
return DatahubRestHook(self.datahub_conn_id)
Expand Down
1 change: 0 additions & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,6 @@ def get_long_description():
"datahub = datahub.ingestion.reporting.datahub_ingestion_run_summary_provider:DatahubIngestionRunSummaryProvider",
"file = datahub.ingestion.reporting.file_reporter:FileReporter",
],
"apache_airflow_provider": ["provider_info=datahub_provider:get_provider_info"],
}


Expand Down

0 comments on commit 6c6216a

Please sign in to comment.