From 0691c96eaf19be3e6dc7cd05bee0997cabb2a737 Mon Sep 17 00:00:00 2001
From: Saurabh Yadav <49230235+saurabhyadav1985@users.noreply.github.com>
Date: Tue, 26 Dec 2023 07:37:09 +0530
Subject: [PATCH] Fixes 13543: Added DuckDb Ingestion Connector (#14468)

* changes

* Added duck db ingestion connector
---
 .../source/database/duckdb/__init__.py        |   0
 .../source/database/duckdb/connection.py      |  76 ++++
 .../source/database/duckdb/metadata.py        | 204 ++++++++++
 .../source/database/duckdb/queries.py         | 141 +++++++
 .../ingestion/source/database/duckdb/utils.py | 349 ++++++++++++++++++
 .../data/testConnections/database/duckdb.json |  39 ++
 .../database/duckdbConnection.json            | 126 +++++++
 .../entity/services/databaseService.json      |   9 +-
 .../ui/src/assets/img/service-icon-duckdb.png | Bin 0 -> 2293 bytes
 .../ui/src/constants/Services.constant.ts     |   3 +-
 .../ui/src/utils/DatabaseServiceUtils.ts      |   6 +
 11 files changed, 951 insertions(+), 2 deletions(-)
 create mode 100644 ingestion/src/metadata/ingestion/source/database/duckdb/__init__.py
 create mode 100644 ingestion/src/metadata/ingestion/source/database/duckdb/connection.py
 create mode 100644 ingestion/src/metadata/ingestion/source/database/duckdb/metadata.py
 create mode 100644 ingestion/src/metadata/ingestion/source/database/duckdb/queries.py
 create mode 100644 ingestion/src/metadata/ingestion/source/database/duckdb/utils.py
 create mode 100644 openmetadata-service/src/main/resources/json/data/testConnections/database/duckdb.json
 create mode 100644 openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/duckdbConnection.json
 create mode 100644 openmetadata-ui/src/main/resources/ui/src/assets/img/service-icon-duckdb.png

diff --git a/ingestion/src/metadata/ingestion/source/database/duckdb/__init__.py b/ingestion/src/metadata/ingestion/source/database/duckdb/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/ingestion/src/metadata/ingestion/source/database/duckdb/connection.py b/ingestion/src/metadata/ingestion/source/database/duckdb/connection.py
new file mode 100644
index 000000000000..724e06c56714
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/database/duckdb/connection.py
@@ -0,0 +1,76 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Source connection handler
+"""
+
+from typing import Optional
+
+from sqlalchemy.engine import Engine
+
+from metadata.generated.schema.entity.automations.workflow import (
+    Workflow as AutomationWorkflow,
+)
+from metadata.generated.schema.entity.services.connections.database.duckdbConnection import (
+    DuckDbConnection,
+    SslMode,
+)
+from metadata.ingestion.connections.builders import (
+    create_generic_db_connection,
+    get_connection_args_common,
+    get_connection_url_common,
+    init_empty_connection_arguments,
+)
+from metadata.ingestion.connections.test_connections import test_connection_db_common
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.database.duckdb.queries import DUCKDB_GET_DATABASE
+
+
+def get_connection(connection: DuckDbConnection) -> Engine:
+    """
+    Create connection
+    """
+    if connection.sslMode:
+        if not connection.connectionArguments:
+            connection.connectionArguments = init_empty_connection_arguments()
+        connection.connectionArguments.__root__["sslmode"] = connection.sslMode.value
+        if connection.sslMode in (SslMode.verify_ca, SslMode.verify_full):
+            connection.connectionArguments.__root__[
+                "sslrootcert"
+            ] = connection.sslConfig.__root__.certificatePath
+    return create_generic_db_connection(
+        connection=connection,
+        get_connection_url_fn=get_connection_url_common,
+        get_connection_args_fn=get_connection_args_common,
+    )
+
+
+def test_connection(
+    metadata: OpenMetadata,
+    engine: Engine,
+    service_connection: DuckDbConnection,
+    automation_workflow: Optional[AutomationWorkflow] = None,
+) -> None:
+    """
+    Test connection. This can be executed either as part
+    of a metadata workflow or during an Automation Workflow
+    """
+    queries = {
+        "GetDatabases": DUCKDB_GET_DATABASE,
+    }
+    test_connection_db_common(
+        metadata=metadata,
+        engine=engine,
+        service_connection=service_connection,
+        automation_workflow=automation_workflow,
+        queries=queries,
+    )
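
A note for reviewers: the sslMode branch in get_connection above only folds SSL settings into the SQLAlchemy connect_args dictionary. Below is a minimal standalone sketch of that behaviour; the SslMode enum values come from the schema in this patch, while build_connect_args and the certificate path are illustrative stand-ins, not part of the connector:

from enum import Enum


class SslMode(Enum):
    allow = "allow"
    disable = "disable"
    prefer = "prefer"
    require = "require"
    verify_ca = "verify-ca"
    verify_full = "verify-full"


def build_connect_args(ssl_mode: SslMode, cert_path: str = "") -> dict:
    # Mirror of the sslMode handling above: always pass the mode through;
    # the verify-* modes additionally need the CA certificate to validate
    # the server.
    connect_args = {"sslmode": ssl_mode.value}
    if ssl_mode in (SslMode.verify_ca, SslMode.verify_full):
        connect_args["sslrootcert"] = cert_path
    return connect_args


print(build_connect_args(SslMode.verify_full, "/path/to/ca.pem"))
# -> {'sslmode': 'verify-full', 'sslrootcert': '/path/to/ca.pem'}
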
diff --git a/ingestion/src/metadata/ingestion/source/database/duckdb/metadata.py b/ingestion/src/metadata/ingestion/source/database/duckdb/metadata.py
new file mode 100644
index 000000000000..5a809716a6c7
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/database/duckdb/metadata.py
@@ -0,0 +1,204 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+DuckDb source module
+"""
+import traceback
+from collections import namedtuple
+from typing import Iterable, Optional, Tuple
+
+from sqlalchemy import sql
+from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names
+from sqlalchemy.engine.reflection import Inspector
+
+from metadata.generated.schema.entity.data.database import Database
+from metadata.generated.schema.entity.data.table import (
+    IntervalType,
+    TablePartition,
+    TableType,
+)
+from metadata.generated.schema.entity.services.connections.database.duckdbConnection import (
+    DuckDbConnection,
+)
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
+from metadata.ingestion.api.steps import InvalidSourceException
+from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
+from metadata.ingestion.source.database.common_db_source import (
+    CommonDbSourceService,
+    TableNameAndType,
+)
+from metadata.ingestion.source.database.duckdb.queries import (
+    DUCKDB_GET_DB_NAMES,
+    DUCKDB_GET_TABLE_NAMES,
+    DUCKDB_PARTITION_DETAILS,
+)
+from metadata.ingestion.source.database.duckdb.utils import (
+    get_column_info,
+    get_columns,
+    get_table_comment,
+    get_view_definition,
+)
+from metadata.ingestion.source.database.multi_db_source import MultiDBSource
+from metadata.utils import fqn
+from metadata.utils.filters import filter_by_database
+from metadata.utils.logger import ingestion_logger
+from metadata.utils.sqlalchemy_utils import (
+    get_all_table_comments,
+    get_all_view_definitions,
+)
+
+TableKey = namedtuple("TableKey", ["schema", "table_name"])
+
+logger = ingestion_logger()
+
+
+INTERVAL_TYPE_MAP = {
+    "list": IntervalType.COLUMN_VALUE.value,
+    "hash": IntervalType.COLUMN_VALUE.value,
+    "range": IntervalType.TIME_UNIT.value,
+}
+
+RELKIND_MAP = {
+    "r": TableType.Regular,
+    "p": TableType.Partitioned,
+    "f": TableType.Foreign,
+}
+
+GEOMETRY = create_sqlalchemy_type("GEOMETRY")
+POINT = create_sqlalchemy_type("POINT")
+POLYGON = create_sqlalchemy_type("POLYGON")
+
+ischema_names.update(
+    {
+        "geometry": GEOMETRY,
+        "point": POINT,
+        "polygon": POLYGON,
+        "box": create_sqlalchemy_type("BOX"),
+        "circle": create_sqlalchemy_type("CIRCLE"),
+        "line": create_sqlalchemy_type("LINE"),
+        "lseg": create_sqlalchemy_type("LSEG"),
+        "path": create_sqlalchemy_type("PATH"),
+        "pg_lsn": create_sqlalchemy_type("PG_LSN"),
+        "pg_snapshot": create_sqlalchemy_type("PG_SNAPSHOT"),
+        "tsquery": create_sqlalchemy_type("TSQUERY"),
+        "txid_snapshot": create_sqlalchemy_type("TXID_SNAPSHOT"),
+        "xml": create_sqlalchemy_type("XML"),
+    }
+)
+
+
+PGDialect.get_all_table_comments = get_all_table_comments
+PGDialect.get_table_comment = get_table_comment
+PGDialect._get_column_info = get_column_info  # pylint: disable=protected-access
+PGDialect.get_view_definition = get_view_definition
+PGDialect.get_columns = get_columns
+PGDialect.get_all_view_definitions = get_all_view_definitions
+
+PGDialect.ischema_names = ischema_names
+
+
+class DuckDbSource(CommonDbSourceService, MultiDBSource):
+    """
+    Implements the necessary methods to extract
+    Database metadata from DuckDb Source
+    """
+
+    @classmethod
+    def create(cls, config_dict, metadata: OpenMetadataConnection):
+        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
+        connection: DuckDbConnection = config.serviceConnection.__root__.config
+        if not isinstance(connection, DuckDbConnection):
+            raise InvalidSourceException(
+                f"Expected DuckDbConnection, but got {connection}"
+            )
+        return cls(config, metadata)
+
+    def query_table_names_and_types(
+        self, schema_name: str
+    ) -> Iterable[TableNameAndType]:
+        """
+        Override the inspector implementation to handle partitioned
+        and foreign table types
+        """
+        result = self.connection.execute(
+            sql.text(DUCKDB_GET_TABLE_NAMES),
+            {"schema": schema_name},
+        )
+
+        return [
+            TableNameAndType(
+                name=name, type_=RELKIND_MAP.get(relkind, TableType.Regular)
+            )
+            for name, relkind in result
+        ]
+
+    def get_configured_database(self) -> Optional[str]:
+        if not self.service_connection.ingestAllDatabases:
+            return self.service_connection.database
+        return None
+
+    def get_database_names_raw(self) -> Iterable[str]:
+        yield from self._execute_database_query(DUCKDB_GET_DB_NAMES)
+
+    def get_database_names(self) -> Iterable[str]:
+        if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
+            configured_db = self.config.serviceConnection.__root__.config.database
+            self.set_inspector(database_name=configured_db)
+            yield configured_db
+        else:
+            for new_database in self.get_database_names_raw():
+                database_fqn = fqn.build(
+                    self.metadata,
+                    entity_type=Database,
+                    service_name=self.context.database_service,
+                    database_name=new_database,
+                )
+
+                if filter_by_database(
+                    self.source_config.databaseFilterPattern,
+                    database_fqn
+                    if self.source_config.useFqnForFiltering
+                    else new_database,
+                ):
+                    self.status.filter(database_fqn, "Database Filtered Out")
+                    continue
+
+                try:
+                    self.set_inspector(database_name=new_database)
+                    yield new_database
+                except Exception as exc:
+                    logger.debug(traceback.format_exc())
+                    logger.error(
+                        f"Error trying to connect to database {new_database}: {exc}"
+                    )
+
+    def get_table_partition_details(
+        self, table_name: str, schema_name: str, inspector: Inspector
+    ) -> Tuple[bool, TablePartition]:
+        result = self.engine.execute(
+            DUCKDB_PARTITION_DETAILS.format(
+                table_name=table_name, schema_name=schema_name
+            )
+        ).all()
+        if result:
+            partition_details = TablePartition(
+                intervalType=INTERVAL_TYPE_MAP.get(
+                    result[0].partition_strategy, IntervalType.COLUMN_VALUE.value
+                ),
+                columns=[row.column_name for row in result if row.column_name],
+            )
+            return True, partition_details
+        return False, None
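
For reviewers, this is how the relkind mapping in query_table_names_and_types above plays out on sample rows. A standalone sketch: the namedtuple and plain-string table types stand in for the real TableNameAndType model and TableType enum, and the row values are invented for illustration:

from collections import namedtuple

# Stand-in for the TableNameAndType model used by the connector
TableNameAndType = namedtuple("TableNameAndType", ["name", "type_"])

# Same keys as RELKIND_MAP above, with plain strings for the sketch
RELKIND_MAP = {"r": "Regular", "p": "Partitioned", "f": "Foreign"}

# Rows shaped like the output of DUCKDB_GET_TABLE_NAMES: (relname, relkind)
rows = [("orders", "r"), ("events_by_day", "p"), ("remote_orders", "f")]

tables = [
    # Unknown relkind codes fall back to a regular table, as in the source
    TableNameAndType(name=name, type_=RELKIND_MAP.get(relkind, "Regular"))
    for name, relkind in rows
]
print(tables)
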
diff --git a/ingestion/src/metadata/ingestion/source/database/duckdb/queries.py b/ingestion/src/metadata/ingestion/source/database/duckdb/queries.py
new file mode 100644
index 000000000000..37d3cde05a47
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/database/duckdb/queries.py
@@ -0,0 +1,141 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+SQL Queries used during ingestion
+"""
+
+import textwrap
+
+# https://www.postgresql.org/docs/current/catalog-pg-class.html
+# r = ordinary table, v = view, m = materialized view, c = composite type,
+# f = foreign table, p = partitioned table
+DUCKDB_GET_TABLE_NAMES = """
+    select c.relname, c.relkind
+    from pg_catalog.pg_class c
+    left outer join pg_catalog.pg_partition_rule pr on c.oid = pr.parchildrelid
+    JOIN pg_namespace n ON n.oid = c.relnamespace
+    where c.relkind in ('r', 'p', 'f')
+    and pr.oid is null
+    and n.nspname = :schema
+"""
+
+DUCKDB_PARTITION_DETAILS = textwrap.dedent(
+    """
+    select
+        ns.nspname as schema,
+        par.relname as table_name,
+        partition_strategy,
+        col.column_name
+    from
+        (select
+             parrelid,
+             parnatts,
+             case parkind
+                  when 'l' then 'list'
+                  when 'h' then 'hash'
+                  when 'r' then 'range' end as partition_strategy,
+             unnest(paratts) column_index
+         from
+             pg_catalog.pg_partition) pt
+    join
+        pg_class par
+    on
+        par.oid = pt.parrelid
+    left join
+        pg_catalog.pg_namespace ns on par.relnamespace = ns.oid
+    left join
+        information_schema.columns col
+    on
+        col.table_schema = ns.nspname
+        and col.table_name = par.relname
+        and ordinal_position = pt.column_index
+    where par.relname='{table_name}' and ns.nspname='{schema_name}'
+    """
+)
+
+DUCKDB_TABLE_COMMENTS = """
+    SELECT n.nspname as schema,
+            c.relname as table_name,
+            pgd.description as table_comment
+    FROM pg_catalog.pg_class c
+        LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
+        LEFT JOIN pg_catalog.pg_description pgd ON pgd.objsubid = 0 AND pgd.objoid = c.oid
+    WHERE c.relkind in ('r', 'v', 'm', 'f', 'p')
+      AND pgd.description IS NOT NULL
+      AND n.nspname <> 'pg_catalog'
+    ORDER BY "schema", "table_name"
+"""
+
+# Postgres/DuckDb view definitions only contain the select query,
+# hence we append "create view <schema>.<view> as " to the select query
+# to generate the column-level lineage
+DUCKDB_VIEW_DEFINITIONS = """
+SELECT
+    n.nspname "schema",
+    c.relname view_name,
+    'create view ' || n.nspname || '.' || c.relname || ' as ' || pg_get_viewdef(c.oid,true) view_def
+FROM pg_class c
+JOIN pg_namespace n ON n.oid = c.relnamespace
+WHERE c.relkind IN ('v', 'm')
+AND n.nspname not in ('pg_catalog','information_schema')
+"""
+
+DUCKDB_GET_DATABASE = """
+select datname from pg_catalog.pg_database
+"""
+
+DUCKDB_GET_DB_NAMES = """
+select datname from pg_catalog.pg_database
+"""
+
+DUCKDB_COL_IDENTITY = """\
+  (SELECT json_build_object(
+      'always', a.attidentity = 'a',
+      'start', s.seqstart,
+      'increment', s.seqincrement,
+      'minvalue', s.seqmin,
+      'maxvalue', s.seqmax,
+      'cache', s.seqcache,
+      'cycle', s.seqcycle)
+   FROM pg_catalog.pg_sequence s
+   JOIN pg_catalog.pg_class c on s.seqrelid = c."oid"
+   WHERE c.relkind = 'S'
+   AND a.attidentity != ''
+   AND s.seqrelid = pg_catalog.pg_get_serial_sequence(
+      a.attrelid::regclass::text, a.attname
+   )::regclass::oid
+   ) as identity_options\
+"""
+
+DUCKDB_SQL_COLUMNS = """
+    SELECT a.attname,
+        pg_catalog.format_type(a.atttypid, a.atttypmod),
+        (
+            SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
+            FROM pg_catalog.pg_attrdef d
+            WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum
+            AND a.atthasdef
+        ) AS DEFAULT,
+        a.attnotnull,
+        a.attrelid as table_oid,
+        pgd.description as comment,
+        {generated},
+        {identity}
+    FROM pg_catalog.pg_attribute a
+    LEFT JOIN pg_catalog.pg_description pgd ON (
+        pgd.objoid = a.attrelid AND pgd.objsubid = a.attnum)
+    WHERE a.attrelid = :table_oid
+    AND a.attnum > 0 AND NOT a.attisdropped
+    ORDER BY a.attnum
+"""
+
+DUCKDB_GET_SERVER_VERSION = """
+show server_version
+"""
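
A note on the templates above: DUCKDB_SQL_COLUMNS is parameterised in two stages — str.format fills the version-dependent SQL fragments, while :table_oid stays a bind parameter that SQLAlchemy substitutes safely at execution time. A small self-contained sketch, with a trimmed-down template standing in for the real one:

import textwrap

# Simplified stand-in for DUCKDB_SQL_COLUMNS; only the shape matters here.
COLUMNS_TEMPLATE = textwrap.dedent(
    """\
    SELECT a.attname,
           {generated},
           {identity}
    FROM pg_catalog.pg_attribute a
    WHERE a.attrelid = :table_oid
    """
)

# Stage 1: fill the fragments chosen from the server version...
query = COLUMNS_TEMPLATE.format(
    generated="a.attgenerated as generated",  # server >= 12
    identity="NULL as identity_options",      # pre-10 fallback
)
# Stage 2: :table_oid is left in place for the driver to bind at runtime.
print(query)
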
diff --git a/ingestion/src/metadata/ingestion/source/database/duckdb/utils.py b/ingestion/src/metadata/ingestion/source/database/duckdb/utils.py
new file mode 100644
index 000000000000..6edf416e6869
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/database/duckdb/utils.py
@@ -0,0 +1,349 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: disable=protected-access
+
+"""
+DuckDb SQLAlchemy util methods
+"""
+import re
+from typing import Dict, Tuple
+
+from sqlalchemy import sql, util
+from sqlalchemy.dialects.postgresql.base import ENUM
+from sqlalchemy.engine import reflection
+from sqlalchemy.sql import sqltypes
+
+from metadata.ingestion.source.database.duckdb.queries import (
+    DUCKDB_COL_IDENTITY,
+    DUCKDB_SQL_COLUMNS,
+    DUCKDB_TABLE_COMMENTS,
+    DUCKDB_VIEW_DEFINITIONS,
+)
+from metadata.utils.sqlalchemy_utils import (
+    get_table_comment_wrapper,
+    get_view_definition_wrapper,
+)
+
+
+@reflection.cache
+def get_table_comment(
+    self, connection, table_name, schema=None, **kw
+):  # pylint: disable=unused-argument
+    return get_table_comment_wrapper(
+        self,
+        connection,
+        table_name=table_name,
+        schema=schema,
+        query=DUCKDB_TABLE_COMMENTS,
+    )
+
+
+@reflection.cache
+def get_columns(  # pylint: disable=too-many-locals
+    self, connection, table_name, schema=None, **kw
+):
+    """
+    Overriding the dialect method to add raw_data_type in response
+    """
+
+    table_oid = self.get_table_oid(
+        connection, table_name, schema, info_cache=kw.get("info_cache")
+    )
+
+    generated = (
+        "a.attgenerated as generated"
+        if self.server_version_info >= (12,)
+        else "NULL as generated"
+    )
+    if self.server_version_info >= (10,):
+        # a.attidentity != '' is required or it will reflect also
+        # serial columns as identity.
+        identity = DUCKDB_COL_IDENTITY
+    else:
+        identity = "NULL as identity_options"
+
+    sql_col_query = DUCKDB_SQL_COLUMNS.format(
+        generated=generated,
+        identity=identity,
+    )
+    sql_col_query = (
+        sql.text(sql_col_query)
+        .bindparams(sql.bindparam("table_oid", type_=sqltypes.Integer))
+        .columns(attname=sqltypes.Unicode, default=sqltypes.Unicode)
+    )
+    conn = connection.execute(sql_col_query, {"table_oid": table_oid})
+    rows = conn.fetchall()
+
+    # dictionary with (name, ) if default search path or (schema, name)
+    # as keys
+    domains = self._load_domains(connection)
+
+    # dictionary with (name, ) if default search path or (schema, name)
+    # as keys
+    enums = dict(
+        ((rec["name"],), rec)
+        if rec["visible"]
+        else ((rec["schema"], rec["name"]), rec)
+        for rec in self._load_enums(connection, schema="*")
+    )
+
+    # format columns
+    columns = []
+
+    for (
+        name,
+        format_type,
+        default_,
+        notnull,
+        table_oid,
+        comment,
+        generated,
+        identity,
+    ) in rows:
+        column_info = self._get_column_info(
+            name,
+            format_type,
+            default_,
+            notnull,
+            domains,
+            enums,
+            schema,
+            comment,
+            generated,
+            identity,
+        )
+        column_info["system_data_type"] = format_type
+        columns.append(column_info)
+    return columns
+
+
+def _get_numeric_args(charlen):
+    if charlen:
+        prec, scale = charlen.split(",")
+        return (int(prec), int(scale))
+    return ()
+
+
+def _get_interval_args(charlen, attype, kwargs: Dict):
+    field_match = re.match(r"interval (.+)", attype, re.I)
+    if charlen:
+        kwargs["precision"] = int(charlen)
+    if field_match:
+        kwargs["fields"] = field_match.group(1)
+    attype = "interval"
+    return (), attype, kwargs
+
+
+def _get_bit_var_args(charlen, kwargs):
+    kwargs["varying"] = True
+    if charlen:
+        return (int(charlen),), kwargs
+
+    return (), kwargs
+
+
+def get_column_args(
+    charlen: str, args: Tuple, kwargs: Dict, attype: str
+) -> Tuple[Tuple, Dict]:
+    """
+    Method to determine the args and kwargs
+    """
+    if attype == "numeric":
+        args = _get_numeric_args(charlen)
+    elif attype == "double precision":
+        args = (53,)
+    elif attype == "integer":
+        args = ()
+    elif attype in ("timestamp with time zone", "time with time zone"):
+        kwargs["timezone"] = True
+        if charlen:
+            kwargs["precision"] = int(charlen)
+        args = ()
+    elif attype in (
+        "timestamp without time zone",
+        "time without time zone",
+        "time",
+    ):
+        kwargs["timezone"] = False
+        if charlen:
+            kwargs["precision"] = int(charlen)
+        args = ()
+    elif attype == "bit varying":
+        args, kwargs = _get_bit_var_args(charlen, kwargs)
+    elif attype == "geometry":
+        args = ()
+    elif attype.startswith("interval"):
+        args, attype, kwargs = _get_interval_args(charlen, attype, kwargs)
+    elif charlen:
+        args = (int(charlen),)
+
+    return args, kwargs, attype
+
+
+def get_column_default(coltype, schema, default, generated):
+    """
+    Method to determine the default of column
+    """
+    autoincrement = False
+    # If a zero byte or blank string depending on driver (is also absent
+    # for older PG versions), then not a generated column. Otherwise, s =
+    # stored. (Other values might be added in the future.)
+    if generated not in (None, "", b"\x00"):
+        computed = {"sqltext": default, "persisted": generated in ("s", b"s")}
+        default = None
+    else:
+        computed = None
+    if default is not None:
+        match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
+        if match is not None:
+            if issubclass(coltype._type_affinity, sqltypes.Integer):
+                autoincrement = True
+            # the default is related to a Sequence
+            sch = schema
+            if "." not in match.group(2) and sch is not None:
+                # unconditionally quote the schema name. this could
+                # later be enhanced to obey quoting rules /
+                # "quote schema"
+                default = (
+                    match.group(1)
+                    + (f'"{sch}"')
+                    + "."
+                    + match.group(2)
+                    + match.group(3)
+                )
+    return default, autoincrement, computed
+
+
+def _handle_array_type(attype):
+    return (
+        # strip '[]' from integer[], etc.
+        re.sub(r"\[\]$", "", attype),
+        attype.endswith("[]"),
+    )
+
+
+# pylint: disable=too-many-statements,too-many-branches,too-many-locals,too-many-arguments
+def get_column_info(
+    self,
+    name,
+    format_type,
+    default,
+    notnull,
+    domains,
+    enums,
+    schema,
+    comment,
+    generated,
+    identity,
+):
+    """
+    Method to return column info
+    """
+
+    if format_type is None:
+        no_format_type = True
+        attype = format_type = "no format_type()"
+        is_array = False
+    else:
+        no_format_type = False
+
+        # strip (*) from character varying(5), timestamp(5)
+        # with time zone, geometry(POLYGON), etc.
+        attype = re.sub(r"\(.*\)", "", format_type)
+
+        # strip '[]' from integer[], etc. and check if an array
+        attype, is_array = _handle_array_type(attype)
+
+    # strip quotes from case sensitive enum or domain names
+    enum_or_domain_key = tuple(util.quoted_token_parser(attype))
+
+    nullable = not notnull
+
+    charlen = re.search(r"\(([\d,]+)\)", format_type)
+    if charlen:
+        charlen = charlen.group(1)
+    args = re.search(r"\((.*)\)", format_type)
+    if args and args.group(1):
+        args = tuple(re.split(r"\s*,\s*", args.group(1)))
+    else:
+        args = ()
+    kwargs = {}
+
+    args, kwargs, attype = get_column_args(charlen, args, kwargs, attype)
+
+    while True:
+        # looping here to suit nested domains
+        if attype in self.ischema_names:
+            coltype = self.ischema_names[attype]
+            break
+        if enum_or_domain_key in enums:
+            enum = enums[enum_or_domain_key]
+            coltype = ENUM
+            kwargs["name"] = enum["name"]
+            if not enum["visible"]:
+                kwargs["schema"] = enum["schema"]
+            args = tuple(enum["labels"])
+            break
+        if enum_or_domain_key in domains:
+            domain = domains[enum_or_domain_key]
+            attype = domain["attype"]
+            attype, is_array = _handle_array_type(attype)
+            # strip quotes from case sensitive enum or domain names
+            enum_or_domain_key = tuple(util.quoted_token_parser(attype))
+            # A table can't override a not null on the domain,
+            # but can override nullable
+            nullable = nullable and domain["nullable"]
+            if domain["default"] and not default:
+                # It can, however, override the default
+                # value, but can't set it to null.
+                default = domain["default"]
+            continue
+        coltype = None
+        break
+
+    if coltype:
+        coltype = coltype(*args, **kwargs)
+        if is_array:
+            coltype = self.ischema_names["_array"](coltype)
+    elif no_format_type:
+        util.warn(f"PostgreSQL format_type() returned NULL for column '{name}'")
+        coltype = sqltypes.NULLTYPE
+    else:
+        util.warn(f"Did not recognize type '{attype}' of column '{name}'")
+        coltype = sqltypes.NULLTYPE
+
+    default, autoincrement, computed = get_column_default(
+        coltype=coltype, schema=schema, default=default, generated=generated
+    )
+    column_info = {
+        "name": name,
+        "type": coltype,
+        "nullable": nullable,
+        "default": default,
+        "autoincrement": autoincrement or identity is not None,
+        "comment": comment,
+    }
+    if computed is not None:
+        column_info["computed"] = computed
+    if identity is not None:
+        column_info["identity"] = identity
+    return column_info
+
+
+@reflection.cache
+def get_view_definition(
+    self, connection, table_name, schema=None, **kw
+):  # pylint: disable=unused-argument
+    return get_view_definition_wrapper(
+        self,
+        connection,
+        table_name=table_name,
+        schema=schema,
+        query=DUCKDB_VIEW_DEFINITIONS,
+    )
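
The type-string handling in get_column_info above boils down to two regex passes before the ischema_names lookup. A runnable sketch of just that normalisation, reusing the same expressions as the code (the sample format_type strings are invented for illustration):

import re


def handle_array_type(attype: str):
    # Same logic as _handle_array_type above
    return re.sub(r"\[\]$", "", attype), attype.endswith("[]")


for format_type in ("character varying(5)", "integer[]", "timestamp(3) with time zone"):
    attype = re.sub(r"\(.*\)", "", format_type)  # drop length/precision qualifiers
    attype, is_array = handle_array_type(attype)  # strip '[]' and flag arrays
    print(f"{format_type!r} -> base type {attype!r}, array={is_array}")
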
diff --git a/openmetadata-service/src/main/resources/json/data/testConnections/database/duckdb.json b/openmetadata-service/src/main/resources/json/data/testConnections/database/duckdb.json
new file mode 100644
index 000000000000..db7c6ea2497f
--- /dev/null
+++ b/openmetadata-service/src/main/resources/json/data/testConnections/database/duckdb.json
@@ -0,0 +1,39 @@
+{
+  "name": "DuckDb",
+  "displayName": "DuckDb Test Connection",
+  "description": "This Test Connection validates the access against the database and basic metadata extraction of schemas and tables.",
+  "steps": [
+    {
+      "name": "CheckAccess",
+      "description": "Validate that we can properly reach the database and authenticate with the given credentials.",
+      "errorMessage": "Failed to connect to DuckDb, please validate the credentials",
+      "mandatory": true
+    },
+    {
+      "name": "GetDatabases",
+      "description": "List all the databases available to the user.",
+      "errorMessage": "Failed to fetch databases, please validate if the user has enough privilege to fetch databases.",
+      "mandatory": true
+    },
+    {
+      "name": "GetSchemas",
+      "description": "List all the schemas available to the user.",
+      "errorMessage": "Failed to fetch schemas, please validate if the user has enough privilege to fetch schemas.",
+      "mandatory": true
+    },
+    {
+      "name": "GetTables",
+      "description": "From a given schema, list the tables belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
+      "errorMessage": "Failed to fetch tables, please validate if the user has enough privilege to fetch tables.",
+      "mandatory": true
+    },
+    {
+      "name": "GetViews",
+      "description": "From a given schema, list the views belonging to that schema. If no schema is specified, we'll list the views of a random schema.",
+      "errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.",
+      "mandatory": false
+    }
+  ]
+}
+
+
\ No newline at end of file
diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/duckdbConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/duckdbConnection.json
new file mode 100644
index 000000000000..260e5273fbd0
--- /dev/null
+++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/duckdbConnection.json
@@ -0,0 +1,126 @@
+{
+  "$id": "https://open-metadata.org/schema/entity/services/connections/database/duckdbConnection.json",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "DuckDbConnection",
+  "description": "DuckDb Database Connection Config",
+  "type": "object",
+  "javaType": "org.openmetadata.schema.services.connections.database.DuckDbConnection",
+  "definitions": {
+    "duckdbType": {
+      "description": "Service type.",
+      "type": "string",
+      "enum": [
+        "DuckDb"
+      ],
+      "default": "DuckDb"
+    },
+    "duckdbScheme": {
+      "description": "SQLAlchemy driver scheme options.",
+      "type": "string",
+      "enum": [
+        "postgresql+psycopg2"
+      ],
+      "default": "postgresql+psycopg2"
+    }
+  },
+  "properties": {
+    "type": {
+      "title": "Service Type",
+      "description": "Service Type",
+      "$ref": "#/definitions/duckdbType",
+      "default": "DuckDb"
+    },
+    "scheme": {
+      "title": "Connection Scheme",
+      "description": "SQLAlchemy driver scheme options.",
+      "$ref": "#/definitions/duckdbScheme",
+      "default": "postgresql+psycopg2"
+    },
+    "username": {
+      "title": "Username",
+      "description": "Username to connect to DuckDb. This user should have privileges to read all the metadata in DuckDb.",
+      "type": "string"
+    },
+    "authType": {
+      "title": "Auth Configuration Type",
+      "description": "Choose Auth Config Type.",
+      "oneOf": [
+        {
+          "$ref": "./common/basicAuth.json"
+        },
+        {
+          "$ref": "./common/iamAuthConfig.json"
+        }
+      ]
+    },
+    "hostPort": {
+      "title": "Host and Port",
+      "description": "Host and port of the source service.",
+      "type": "string"
+    },
+    "database": {
+      "title": "Database",
+      "description": "Database of the data source. This is an optional parameter, if you would like to restrict the metadata reading to a single database. When left blank, OpenMetadata Ingestion attempts to scan all the databases.",
+      "type": "string"
+    },
+    "sslMode": {
+      "title": "SSL Mode",
+      "description": "SSL Mode to connect to DuckDb database.",
+      "enum": [
+        "disable",
+        "allow",
+        "prefer",
+        "require",
+        "verify-ca",
+        "verify-full"
+      ],
+      "default": "disable"
+    },
+    "sslConfig": {
+      "$ref": "../../../../security/ssl/verifySSLConfig.json#/definitions/sslConfig"
+    },
+    "ingestAllDatabases": {
+      "title": "Ingest All Databases",
+      "description": "Ingest data from all databases in DuckDb. You can use databaseFilterPattern on top of this.",
+      "type": "boolean",
+      "default": false
+    },
+    "connectionOptions": {
+      "title": "Connection Options",
+      "$ref": "../connectionBasicType.json#/definitions/connectionOptions"
+    },
+    "connectionArguments": {
+      "title": "Connection Arguments",
+      "$ref": "../connectionBasicType.json#/definitions/connectionArguments"
+    },
+    "supportsMetadataExtraction": {
+      "title": "Supports Metadata Extraction",
+      "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
+    },
+    "supportsDBTExtraction": {
+      "$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
+    },
+    "supportsProfiler": {
+      "title": "Supports Profiler",
+      "$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
+    },
+    "supportsDatabase": {
+      "title": "Supports Database",
+      "$ref": "../connectionBasicType.json#/definitions/supportsDatabase"
+    },
+    "supportsQueryComment": {
+      "title": "Supports Query Comment",
+      "$ref": "../connectionBasicType.json#/definitions/supportsQueryComment"
+    },
+    "sampleDataStorageConfig": {
+      "title": "Storage Config for Sample Data",
+      "$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
+    }
+  },
+  "additionalProperties": false,
+  "required": [
+    "hostPort",
+    "username",
+    "database"
+  ]
+}
\ No newline at end of file
diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/databaseService.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/databaseService.json
index 178c7f6e854b..81e4ecbd026a 100644
--- a/openmetadata-spec/src/main/resources/json/schema/entity/services/databaseService.json
+++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/databaseService.json
@@ -51,7 +51,8 @@
       "Couchbase",
       "Greenplum",
       "Doris",
-      "UnityCatalog"
+      "UnityCatalog",
+      "DuckDb"
     ],
     "javaEnums": [
       {
@@ -164,6 +165,9 @@
       },
       {
         "name": "UnityCatalog"
+      },
+      {
+        "name": "DuckDb"
       }
     ]
@@ -282,6 +286,9 @@
       },
       {
         "$ref": "./connections/database/unityCatalogConnection.json"
+      },
+      {
+        "$ref": "./connections/database/duckdbConnection.json"
       }
     ]
   }
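
For completeness, here is a minimal service connection payload that satisfies the duckdbConnection.json schema above — hostPort, username, and database are the required properties. All values shown are placeholders, not defaults from the patch:

import json

connection = {
    "type": "DuckDb",
    "scheme": "postgresql+psycopg2",
    "username": "openmetadata_user",   # needs read access to the metadata
    "hostPort": "localhost:5432",
    "database": "analytics",           # omit ingestAllDatabases to scan just this one
    "sslMode": "disable",
    "ingestAllDatabases": False,
}
print(json.dumps(connection, indent=2))
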
diff --git a/openmetadata-ui/src/main/resources/ui/src/assets/img/service-icon-duckdb.png b/openmetadata-ui/src/main/resources/ui/src/assets/img/service-icon-duckdb.png
new file mode 100644
index 0000000000000000000000000000000000000000..424c9daa618cb0198efd8ce5dcf23b380e9f3492
GIT binary patch
literal 2293
[2293 bytes of base85-encoded PNG data omitted]

diff --git a/openmetadata-ui/src/main/resources/ui/src/constants/Services.constant.ts b/openmetadata-ui/src/main/resources/ui/src/constants/Services.constant.ts
[hunk not recoverable from the corrupted patch text]

diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/DatabaseServiceUtils.ts b/openmetadata-ui/src/main/resources/ui/src/utils/DatabaseServiceUtils.ts
       break;
     }
+    case DatabaseServiceType.DuckDb: {
+      schema = duckdbConnection;
+
+      break;
+    }
     default: {
       schema = {};