Skip to content

Commit

Permalink
Fixes 13543: Added Duckdb Ingestion Connector (open-metadata#14468)
Browse files Browse the repository at this point in the history
* changes

* Added duck db ingestion connector
  • Loading branch information
saurabhyadav1985 authored Dec 26, 2023
1 parent 4b7f4f4 commit 0691c96
Show file tree
Hide file tree
Showing 11 changed files with 951 additions and 2 deletions.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Source connection handler
"""

from typing import Optional

from sqlalchemy.engine import Engine

from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.database.duckdbConnection import (
DuckDbConnection,
SslMode,
)
from metadata.ingestion.connections.builders import (
create_generic_db_connection,
get_connection_args_common,
get_connection_url_common,
init_empty_connection_arguments,
)
from metadata.ingestion.connections.test_connections import test_connection_db_common
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.duckdb.queries import DUCKDB_GET_DATABASE


def get_connection(connection: DuckDbConnection) -> Engine:
    """
    Build the SQLAlchemy engine for a DuckDb service connection.

    When ``sslMode`` is set, its value is written to the ``sslmode``
    connection argument (the connection arguments are initialised first if
    they are empty). For the ``verify_ca`` and ``verify_full`` modes the
    certificate path from ``sslConfig`` is also written to ``sslrootcert``.

    Note: ``connection`` is mutated in place, and ``sslConfig`` is
    dereferenced without a None check for the verifying modes.
    """
    ssl_mode = connection.sslMode
    if ssl_mode:
        if not connection.connectionArguments:
            connection.connectionArguments = init_empty_connection_arguments()
        extra_args = connection.connectionArguments.__root__
        extra_args["sslmode"] = ssl_mode.value
        if ssl_mode in (SslMode.verify_ca, SslMode.verify_full):
            extra_args["sslrootcert"] = connection.sslConfig.__root__.certificatePath

    return create_generic_db_connection(
        connection=connection,
        get_connection_url_fn=get_connection_url_common,
        get_connection_args_fn=get_connection_args_common,
    )


def test_connection(
    metadata: OpenMetadata,
    engine: Engine,
    service_connection: DuckDbConnection,
    automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
    """
    Run the common database connection test for a DuckDb service.

    Usable both from a metadata workflow and from an Automation Workflow.
    The only connector-specific step registered here is "GetDatabases",
    which is backed by the DUCKDB_GET_DATABASE query.
    """
    test_connection_db_common(
        metadata=metadata,
        engine=engine,
        service_connection=service_connection,
        automation_workflow=automation_workflow,
        queries={"GetDatabases": DUCKDB_GET_DATABASE},
    )
204 changes: 204 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/duckdb/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
DuckDb source module
"""
import traceback
from collections import namedtuple
from typing import Iterable, Optional, Tuple

from sqlalchemy import sql
from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names
from sqlalchemy.engine.reflection import Inspector

from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import (
IntervalType,
TablePartition,
TableType,
)
from metadata.generated.schema.entity.services.connections.database.duckdbConnection import (
DuckDbConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
from metadata.ingestion.source.database.common_db_source import (
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.duckdb.queries import (
DUCKDB_GET_DB_NAMES,
DUCKDB_GET_TABLE_NAMES,
DUCKDB_PARTITION_DETAILS,
)
from metadata.ingestion.source.database.duckdb.utils import (
get_column_info,
get_columns,
get_table_comment,
get_view_definition,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import (
get_all_table_comments,
get_all_view_definitions,
)

logger = ingestion_logger()

# Lightweight (schema, table_name) key.
TableKey = namedtuple("TableKey", ["schema", "table_name"])

# Partition strategy name -> interval type stored on the TablePartition.
INTERVAL_TYPE_MAP = {
    "list": IntervalType.COLUMN_VALUE.value,
    "hash": IntervalType.COLUMN_VALUE.value,
    "range": IntervalType.TIME_UNIT.value,
}

# Single-letter relkind code -> table type.
RELKIND_MAP = {
    "r": TableType.Regular,
    "p": TableType.Partitioned,
    "f": TableType.Foreign,
}

# Types kept as module-level names in addition to being registered below.
GEOMETRY = create_sqlalchemy_type("GEOMETRY")
POINT = create_sqlalchemy_type("POINT")
POLYGON = create_sqlalchemy_type("POLYGON")

# Register the named types first, then the remaining ones, whose generated
# type name is simply the upper-cased lookup key.
ischema_names.update({"geometry": GEOMETRY, "point": POINT, "polygon": POLYGON})
ischema_names.update(
    {
        pg_type: create_sqlalchemy_type(pg_type.upper())
        for pg_type in (
            "box",
            "circle",
            "line",
            "lseg",
            "path",
            "pg_lsn",
            "pg_snapshot",
            "tsquery",
            "txid_snapshot",
            "xml",
        )
    }
)


# Override reflection hooks on SQLAlchemy's PGDialect with the implementations
# imported by this module (DuckDb utils and the shared sqlalchemy utils).
# NOTE: these assignments mutate the PGDialect class itself, so they apply to
# every user of that class in the running process, not only to this source.
PGDialect.get_all_table_comments = get_all_table_comments
PGDialect.get_table_comment = get_table_comment
PGDialect._get_column_info = get_column_info  # pylint: disable=protected-access
PGDialect.get_view_definition = get_view_definition
PGDialect.get_columns = get_columns
PGDialect.get_all_view_definitions = get_all_view_definitions

# Point the dialect at the ischema_names mapping extended earlier in this
# module with the additional type names.
PGDialect.ischema_names = ischema_names


class DuckDbSource(CommonDbSourceService, MultiDBSource):
    """
    Metadata source for DuckDb services.

    Extends the common SQL source and the multi-database source with
    DuckDb-specific queries for table listing, database discovery and
    partition details.
    """

    @classmethod
    def create(cls, config_dict, metadata: OpenMetadataConnection):
        """
        Build the source from a raw workflow configuration dictionary.

        Raises InvalidSourceException when the configured service connection
        is not a DuckDbConnection.
        """
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        connection: DuckDbConnection = config.serviceConnection.__root__.config
        if isinstance(connection, DuckDbConnection):
            return cls(config, metadata)
        raise InvalidSourceException(
            f"Expected DuckDbConnection, but got {connection}"
        )

    def query_table_names_and_types(
        self, schema_name: str
    ) -> Iterable[TableNameAndType]:
        """
        List the tables of a schema together with their table type.

        Replaces the inspector implementation so partitioned and foreign
        tables are reported as such; any relkind missing from RELKIND_MAP
        is treated as a regular table.
        """
        rows = self.connection.execute(
            sql.text(DUCKDB_GET_TABLE_NAMES),
            {"schema": schema_name},
        )

        tables = []
        for name, relkind in rows:
            table_type = RELKIND_MAP.get(relkind, TableType.Regular)
            tables.append(TableNameAndType(name=name, type_=table_type))
        return tables

    def get_configured_database(self) -> Optional[str]:
        """
        Return the configured database, or None when all databases
        are being ingested.
        """
        if self.service_connection.ingestAllDatabases:
            return None
        return self.service_connection.database

    def get_database_names_raw(self) -> Iterable[str]:
        """
        Yield every database name returned by DUCKDB_GET_DB_NAMES,
        without applying any filter.
        """
        yield from self._execute_database_query(DUCKDB_GET_DB_NAMES)

    def get_database_names(self) -> Iterable[str]:
        """
        Yield the database names to ingest, pointing the inspector at each
        database before it is yielded.

        With ingestAllDatabases disabled only the configured database is
        produced. Otherwise every raw database name is checked against the
        database filter pattern (by FQN or by plain name, depending on
        useFqnForFiltering); failures while handling a database are logged
        and the loop moves on to the next one.
        """
        service_config = self.config.serviceConnection.__root__.config
        if not service_config.ingestAllDatabases:
            configured_db = service_config.database
            self.set_inspector(database_name=configured_db)
            yield configured_db
            return

        for new_database in self.get_database_names_raw():
            database_fqn = fqn.build(
                self.metadata,
                entity_type=Database,
                service_name=self.context.database_service,
                database_name=new_database,
            )

            filter_pattern = self.source_config.databaseFilterPattern
            if self.source_config.useFqnForFiltering:
                filter_key = database_fqn
            else:
                filter_key = new_database

            if filter_by_database(filter_pattern, filter_key):
                self.status.filter(database_fqn, "Database Filtered Out")
                continue

            try:
                self.set_inspector(database_name=new_database)
                yield new_database
            except Exception as exc:
                logger.debug(traceback.format_exc())
                logger.error(
                    f"Error trying to connect to database {new_database}: {exc}"
                )

    def get_table_partition_details(
        self, table_name: str, schema_name: str, inspector: Inspector
    ) -> Tuple[bool, TablePartition]:
        """
        Look up partition information for a table.

        Returns (True, TablePartition) when the partition query yields rows,
        and (False, None) otherwise. The interval type comes from the first
        row's partition strategy, defaulting to COLUMN_VALUE for strategies
        missing from INTERVAL_TYPE_MAP. The ``inspector`` argument is not
        used here.
        """
        # The query text is produced with str.format, so the table and
        # schema names are interpolated directly into the SQL string.
        partition_query = DUCKDB_PARTITION_DETAILS.format(
            table_name=table_name, schema_name=schema_name
        )
        rows = self.engine.execute(partition_query).all()
        if not rows:
            return False, None

        interval_type = INTERVAL_TYPE_MAP.get(
            rows[0].partition_strategy, IntervalType.COLUMN_VALUE.value
        )
        partition_columns = [row.column_name for row in rows if row.column_name]
        return True, TablePartition(
            intervalType=interval_type,
            columns=partition_columns,
        )
Loading

0 comments on commit 0691c96

Please sign in to comment.