Commit 368f43c

Merge branch 'datahub-project:master' into master

hsheth2 authored Dec 17, 2024
2 parents: ab613cc + 42cad3d

Showing 9 changed files with 19 additions and 13 deletions.
datahub-frontend/conf/logback.xml (2 changes: 1 addition & 1 deletion)

@@ -61,7 +61,7 @@
   <logger name="com.linkedin" level="DEBUG">
     <appender-ref ref="DEBUG_FILE"/>
   </logger>
-  <logger name="controller" level="DEBUG">
+  <logger name="controllers" level="DEBUG">
     <appender-ref ref="DEBUG_FILE"/>
   </logger>
   <logger name="auth" level="DEBUG">
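A note on the fix: datahub-frontend is a Play application, and Play controllers conventionally live in the `controllers` package, so a logback logger named `controller` presumably never matched any class and the DEBUG_FILE appender missed those logs. (Inference from Play conventions; the commit itself carries no explanation.)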
metadata-ingestion/setup.py (2 changes: 1 addition & 1 deletion)

@@ -101,7 +101,7 @@
     # We heavily monkeypatch sqlglot.
     # Prior to the patching, we originally maintained an acryl-sqlglot fork:
     # https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:main?expand=1
-    "sqlglot[rs]==25.26.0",
+    "sqlglot[rs]==25.32.1",
     "patchy==2.8.0",
 }
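The only change here is the sqlglot pin, 25.26.0 → 25.32.1. A minimal sketch of the kind of dialect-aware round-trip DataHub leans on sqlglot for; the query text is made up, and the point is that the rendered output (notably identifier quoting) can shift between sqlglot releases, which is what ripples into the golden-file updates below:

import sqlglot

# Parse a BigQuery statement and render it back out. The rendered text can
# change across sqlglot releases, hence the exact version pin in setup.py.
sql = "SELECT c.table_name FROM project.dataset.INFORMATION_SCHEMA.COLUMNS AS c"
print(sqlglot.parse_one(sql, dialect="bigquery").sql(dialect="bigquery"))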
@@ -1,5 +1,5 @@
 import logging
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
 from functools import partial
 from typing import Dict, Iterable, List, Optional
 
@@ -26,6 +26,7 @@
     StatefulIngestionSourceBase,
 )
 from datahub.metadata.schema_classes import ChangeTypeClass
+from datahub.utilities.progress_timer import ProgressTimer
 
 logger = logging.getLogger(__name__)
 
@@ -105,11 +106,17 @@ def _get_database_workunits(
         self, from_createdon: datetime, reader: DataHubDatabaseReader
     ) -> Iterable[MetadataWorkUnit]:
         logger.info(f"Fetching database aspects starting from {from_createdon}")
+        progress = ProgressTimer(report_every=timedelta(seconds=60))
         mcps = reader.get_aspects(from_createdon, self.report.stop_time)
         for i, (mcp, createdon) in enumerate(mcps):
             if not self.urn_pattern.allowed(str(mcp.entityUrn)):
                 continue
 
+            if progress.should_report():
+                logger.info(
+                    f"Ingested {i} database aspects so far, currently at {createdon}"
+                )
+
             yield mcp.as_workunit()
             self.report.num_database_aspects_ingested += 1
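The loop previously gave no feedback between the initial log line and completion; it now reports progress at most once every 60 seconds. The ProgressTimer implementation isn't part of this diff; a minimal sketch of a timer matching the constructor and should_report() behavior the call sites imply, assuming a simple monotonic-clock throttle:

import time
from datetime import timedelta


class ProgressTimer:
    """Hypothetical stand-in for datahub.utilities.progress_timer.ProgressTimer;
    only the signature and should_report() are taken from the call sites above."""

    def __init__(self, report_every: timedelta) -> None:
        self._report_every = report_every.total_seconds()
        self._last_report = time.monotonic()

    def should_report(self) -> bool:
        # Return True at most once per report_every interval.
        now = time.monotonic()
        if now - self._last_report >= self._report_every:
            self._last_report = now
            return True
        return False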
metadata-ingestion/src/datahub/ingestion/source/s3/source.py (5 changes: 2 additions & 3 deletions)

@@ -9,6 +9,7 @@
 from itertools import groupby
 from pathlib import PurePath
 from typing import Any, Dict, Iterable, List, Optional, Tuple
+from urllib.parse import urlparse
 
 import smart_open.compression as so_compression
 from more_itertools import peekable
@@ -993,9 +994,7 @@ def s3_browser(self, path_spec: PathSpec, sample_size: int) -> Iterable[BrowsePath]:
         folders = []
         for dir in dirs_to_process:
             logger.info(f"Getting files from folder: {dir}")
-            prefix_to_process = dir.rstrip("\\").lstrip(
-                self.create_s3_path(bucket_name, "/")
-            )
+            prefix_to_process = urlparse(dir).path.lstrip("/")
 
             folders.extend(
                 self.get_folder_info(
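The replaced code misused str.lstrip, which strips any leading characters found in its argument rather than a literal prefix, so keys could be silently truncated whenever their first characters overlapped the bucket path. Parsing the URI sidesteps that. A quick illustration with a made-up bucket and key:

from urllib.parse import urlparse

uri = "s3://datahub-demo/table/2024/file.csv"

# str.lstrip removes *characters*, not a prefix: every leading character that
# appears anywhere in "s3://datahub-demo/" gets stripped, eating into the key.
print(uri.lstrip("s3://datahub-demo/"))  # -> "le/2024/file.csv"

# urlparse isolates the key cleanly, whatever the bucket is named.
print(urlparse(uri).path.lstrip("/"))  # -> "table/2024/file.csv"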
@@ -117,7 +117,7 @@ def diff_metadata_json(
     ignore_paths: Sequence[str] = (),
     ignore_order: bool = True,
 ) -> Union[DeepDiff, MCPDiff]:
-    ignore_paths = (*ignore_paths, *default_exclude_paths, r"root\[\d+].delta_info")
+    ignore_paths = [*ignore_paths, *default_exclude_paths, r"root\[\d+].delta_info"]
     try:
         if ignore_order:
             golden_map = get_aspects_by_urn(golden)
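The tuple becomes a list, presumably because whatever consumes ignore_paths downstream is particular about the container type. For context, a sketch of how regex paths like root\[\d+].delta_info suppress volatile fields in a DeepDiff comparison (exclude_regex_paths is where such paths typically land in DeepDiff; the path syntax below is adjusted for plain dicts):

from deepdiff import DeepDiff

golden = [{"urn": "urn:li:dataset:one", "delta_info": {"ts": 1}}]
output = [{"urn": "urn:li:dataset:one", "delta_info": {"ts": 2}}]

# Without exclusions, the volatile delta_info field registers as a change.
print(DeepDiff(golden, output))
# -> {'values_changed': {"root[0]['delta_info']['ts']": {'new_value': 2, 'old_value': 1}}}

# Excluding it by regex path leaves an empty (clean) diff.
print(DeepDiff(golden, output, exclude_regex_paths=[r"root\[\d+\]\['delta_info'\]"]))
# -> {}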
@@ -185,7 +185,7 @@
     "aspect": {
       "json": {
         "statement": {
-          "value": "ALTER TABLE dev.public.foo_staging RENAME TO foo",
+          "value": "ALTER TABLE dev.public.foo_staging RENAME TO foo /* Datahub generated query text-- */",
           "language": "SQL"
         },
         "source": "SYSTEM",
@@ -185,7 +185,7 @@
     "aspect": {
       "json": {
         "statement": {
-          "value": "ALTER TABLE dev.public.person_info_swap SWAP WITH dev.public.person_info",
+          "value": "ALTER TABLE dev.public.person_info_swap SWAP WITH dev.public.person_info /* Datahub generated query text-- */",
           "language": "SQL"
         },
         "source": "SYSTEM",
@@ -438,7 +438,7 @@
     "aspect": {
       "json": {
         "statement": {
-          "value": "ALTER TABLE dev.public.person_info SWAP WITH dev.public.person_info_swap",
+          "value": "ALTER TABLE dev.public.person_info SWAP WITH dev.public.person_info_swap /* Datahub generated query text-- */",
           "language": "SQL"
         },
         "source": "SYSTEM",
@@ -175,7 +175,7 @@
     "aspect": {
       "json": {
         "statement": {
-          "value": "CREATE TABLE person_info_swap CLONE person_info;\n\nCREATE TABLE person_info_incremental AS\nSELECT\n  *\nFROM person_info_dep;\n\nINSERT INTO person_info_swap\nSELECT\n  *\nFROM person_info_incremental;\n\nALTER TABLE dev.public.person_info_swap SWAP WITH dev.public.person_info",
+          "value": "CREATE TABLE person_info_swap CLONE person_info;\n\nCREATE TABLE person_info_incremental AS\nSELECT\n  *\nFROM person_info_dep;\n\nINSERT INTO person_info_swap\nSELECT\n  *\nFROM person_info_incremental;\n\nALTER TABLE dev.public.person_info_swap SWAP WITH dev.public.person_info /* Datahub generated query text-- */",
           "language": "SQL"
         },
         "source": "SYSTEM",
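All three golden files above gain the same trailing marker on statements DataHub synthesizes (the reconstructed RENAME, SWAP, and CLONE queries). A plausible sketch of the tagging step; the helper and constant names are hypothetical:

QUERY_TEXT_MARKER = " /* Datahub generated query text-- */"


def tag_generated_query(sql: str) -> str:
    # Hypothetical helper: append a marker so downstream consumers can tell
    # synthesized SQL apart from statements captured verbatim from query logs.
    return sql + QUERY_TEXT_MARKER


print(tag_generated_query("ALTER TABLE dev.public.foo_staging RENAME TO foo"))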
@@ -1,7 +1,7 @@
 {
   "query_type": "SELECT",
   "query_type_props": {},
-  "query_fingerprint": "c721ce16410601b36e5f32bd9c5c28488500a93e617363739faebfe71496f163",
+  "query_fingerprint": "a204522c98a01568d8575a98a715de98985aeef0e822feb8450153f71891d6c6",
   "in_tables": [
     "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMNS,PROD)",
     "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-staging-2.smoke_test_db_4.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS,PROD)"
@@ -178,6 +178,6 @@
   ],
   "debug_info": {
     "confidence": 0.2,
-    "generalized_statement": "SELECT c.table_catalog AS table_catalog, c.table_schema AS table_schema, c.table_name AS table_name, c.column_name AS column_name, c.ordinal_position AS ordinal_position, cfp.field_path AS field_path, c.is_nullable AS is_nullable, CASE WHEN CONTAINS_SUBSTR(cfp.field_path, ?) THEN NULL ELSE c.data_type END AS data_type, description AS comment, c.is_hidden AS is_hidden, c.is_partitioning_column AS is_partitioning_column, c.clustering_ordinal_position AS clustering_ordinal_position FROM `acryl-staging-2`.`smoke_test_db_4`.INFORMATION_SCHEMA.COLUMNS AS c JOIN `acryl-staging-2`.`smoke_test_db_4`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS AS cfp ON cfp.table_name = c.table_name AND cfp.column_name = c.column_name ORDER BY table_catalog, table_schema, table_name, ordinal_position ASC, data_type DESC"
+    "generalized_statement": "SELECT c.table_catalog AS table_catalog, c.table_schema AS table_schema, c.table_name AS table_name, c.column_name AS column_name, c.ordinal_position AS ordinal_position, cfp.field_path AS field_path, c.is_nullable AS is_nullable, CASE WHEN CONTAINS_SUBSTR(cfp.field_path, ?) THEN NULL ELSE c.data_type END AS data_type, description AS comment, c.is_hidden AS is_hidden, c.is_partitioning_column AS is_partitioning_column, c.clustering_ordinal_position AS clustering_ordinal_position FROM `acryl-staging-2`.`smoke_test_db_4`.`INFORMATION_SCHEMA.COLUMNS` AS c JOIN `acryl-staging-2`.`smoke_test_db_4`.`INFORMATION_SCHEMA.COLUMN_FIELD_PATHS` AS cfp ON cfp.table_name = c.table_name AND cfp.column_name = c.column_name ORDER BY table_catalog, table_schema, table_name, ordinal_position ASC, data_type DESC"
   }
 }
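The new fingerprint follows mechanically from the new statement text: sqlglot 25.32.1 renders the two-part INFORMATION_SCHEMA view name as a single backticked identifier, and a fingerprint is a hash over the normalized statement, so new text means a new hash. A sketch of that relationship (SHA-256 and the normalization step are illustrative assumptions, not DataHub's exact fingerprinting code):

import hashlib

import sqlglot


def fingerprint(sql: str, dialect: str = "bigquery") -> str:
    # Normalize by parsing and re-rendering, then hash the canonical text.
    # Any change in how sqlglot quotes identifiers changes the hash.
    canonical = sqlglot.parse_one(sql, dialect=dialect).sql(dialect=dialect)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()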
