Commit
Merge branch 'datahub-project:master' into master
treff7es authored Dec 22, 2023
2 parents 6d79251 + ed5bdfc commit d460f82
Showing 10 changed files with 87 additions and 46 deletions.
@@ -1,4 +1,4 @@
-import { Button, Form, Switch, Typography } from 'antd';
+import { Button, Checkbox, Form, Input, Switch, Typography } from 'antd';
 import React, { useMemo, useState } from 'react';
 import { Cron } from 'react-js-cron';
 import 'react-js-cron/dist/styles.css';
@@ -31,6 +31,10 @@ const CronText = styled(Typography.Paragraph)`
     color: ${ANTD_GRAY[7]};
 `;
 
+const AdvancedCheckBox = styled(Typography.Text)`
+    margin-right: 10px;
+    margin-bottom: 8px;
+`;
 const CronSuccessCheck = styled(CheckCircleOutlined)`
     color: ${REDESIGN_COLORS.BLUE};
     margin-right: 4px;
@@ -68,8 +72,8 @@ export const CreateScheduleStep = ({ state, updateState, goTo, prev }: StepProps
     const { schedule } = state;
     const interval = schedule?.interval?.replaceAll(', ', ' ') || DAILY_MIDNIGHT_CRON_INTERVAL;
     const timezone = schedule?.timezone || Intl.DateTimeFormat().resolvedOptions().timeZone;
-
     const [scheduleEnabled, setScheduleEnabled] = useState(!!schedule);
+    const [advancedCronCheck, setAdvancedCronCheck] = useState(false);
     const [scheduleCronInterval, setScheduleCronInterval] = useState(interval);
     const [scheduleTimezone, setScheduleTimezone] = useState(timezone);
 
@@ -137,13 +141,29 @@ export const CreateScheduleStep = ({ state, updateState, goTo, prev }: StepProps
                 )}
             </Form.Item>
             <StyledFormItem required label={<Typography.Text strong>Schedule</Typography.Text>}>
-                <Cron
-                    value={scheduleCronInterval}
-                    setValue={setScheduleCronInterval}
-                    clearButton={false}
-                    className="cron-builder"
-                    leadingZero
-                />
+                <div style={{ paddingBottom: 10, paddingLeft: 10 }}>
+                    <AdvancedCheckBox type="secondary">Advanced</AdvancedCheckBox>
+                    <Checkbox
+                        checked={advancedCronCheck}
+                        onChange={(event) => setAdvancedCronCheck(event.target.checked)}
+                    />
+                </div>
+                {advancedCronCheck ? (
+                    <Input
+                        placeholder={DAILY_MIDNIGHT_CRON_INTERVAL}
+                        autoFocus
+                        value={scheduleCronInterval}
+                        onChange={(e) => setScheduleCronInterval(e.target.value)}
+                    />
+                ) : (
+                    <Cron
+                        value={scheduleCronInterval}
+                        setValue={setScheduleCronInterval}
+                        clearButton={false}
+                        className="cron-builder"
+                        leadingZero
+                    />
+                )}
                 <CronText>
                     {cronAsText.error && <>Invalid cron schedule. Cron must be of UNIX form:</>}
                     {!cronAsText.text && (
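The new Advanced checkbox swaps the react-js-cron builder for a plain Input, letting users type a raw UNIX cron string; invalid values surface through the existing cronAsText error path ("Cron must be of UNIX form"). As a rough illustration of what a five-field shape check involves (a sketch only, not code from this change or from DataHub), consider:

```python
import re

# One pattern per cron field: "*" or a number, an optional range, an optional step.
# This only checks the coarse five-field shape, not per-field numeric bounds.
_FIELD = r"(?:\*|\d+)(?:-\d+)?(?:/\d+)?"
_FIELD_LIST = rf"{_FIELD}(?:,{_FIELD})*"
_CRON_RE = re.compile(rf"^\s*{_FIELD_LIST}(?:\s+{_FIELD_LIST}){{4}}\s*$")

def looks_like_unix_cron(expression: str) -> bool:
    """Rough check that a string has the five-field UNIX cron shape."""
    return bool(_CRON_RE.match(expression))

assert looks_like_unix_cron("0 0 * * *")          # daily at midnight
assert looks_like_unix_cron("*/15 9-17 * * 1-5")  # every 15 min, weekday work hours
assert not looks_like_unix_cron("0 0 * *")        # only four fields
```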
@@ -70,7 +70,9 @@ export const SelectTemplateStep = ({ state, updateState, goTo, cancel, ingestion
     };
 
     const filteredSources = ingestionSources.filter(
-        (source) => source.displayName.includes(searchFilter) || source.name.includes(searchFilter),
+        (source) =>
+            source.displayName.toLocaleLowerCase().includes(searchFilter.toLocaleLowerCase()) ||
+            source.name.toLocaleLowerCase().includes(searchFilter.toLocaleLowerCase()),
     );
 
     return (
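The search fix lowercases both the source names and the filter text, so typing "bigquery" now matches the "BigQuery" display name. The same pattern, sketched in Python purely for illustration (the actual change is the TypeScript above):

```python
from typing import TypedDict

class Source(TypedDict):
    displayName: str
    name: str

def filter_sources(sources: list[Source], search: str) -> list[Source]:
    """Case-insensitive substring match on display name or internal name."""
    needle = search.casefold()  # casefold() handles more edge cases than lower()
    return [
        s for s in sources
        if needle in s["displayName"].casefold() or needle in s["name"].casefold()
    ]

sources: list[Source] = [
    {"displayName": "BigQuery", "name": "bigquery"},
    {"displayName": "Redshift", "name": "redshift"},
]
assert filter_sources(sources, "BIG") == [sources[0]]
```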
74 changes: 46 additions & 28 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py
@@ -41,6 +41,7 @@
     UpstreamLineageClass,
 )
 from datahub.utilities import memory_footprint
+from datahub.utilities.dedup_list import deduplicate_list
 from datahub.utilities.urns import dataset_urn
 
 logger: logging.Logger = logging.getLogger(__name__)
@@ -85,6 +86,30 @@ def __post_init__(self):
         else:
             self.dataset_lineage_type = DatasetLineageTypeClass.TRANSFORMED
 
+    def merge_lineage(
+        self,
+        upstreams: Set[LineageDataset],
+        cll: Optional[List[sqlglot_l.ColumnLineageInfo]],
+    ) -> None:
+        self.upstreams = self.upstreams.union(upstreams)
+
+        # Merge CLL using the output column name as the merge key.
+        self.cll = self.cll or []
+        existing_cll: Dict[str, sqlglot_l.ColumnLineageInfo] = {
+            c.downstream.column: c for c in self.cll
+        }
+        for c in cll or []:
+            if c.downstream.column in existing_cll:
+                # Merge using upstream + column name as the merge key.
+                existing_cll[c.downstream.column].upstreams = deduplicate_list(
+                    [*existing_cll[c.downstream.column].upstreams, *c.upstreams]
+                )
+            else:
+                # New output column, just add it as is.
+                self.cll.append(c)
+
+        self.cll = self.cll or None
+
 
 class RedshiftLineageExtractor:
     def __init__(
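The new merge_lineage unions table-level upstreams and merges column-level lineage (CLL) keyed on the downstream column: upstream lists for an already-seen output column are concatenated and de-duplicated in first-seen order, new output columns are appended, and an empty result is normalized back to None. A self-contained sketch of the same merge with stand-in classes (the real code uses sqlglot_lineage's ColumnLineageInfo and datahub.utilities.dedup_list.deduplicate_list):

```python
from dataclasses import dataclass, field
from typing import List, Optional

def deduplicate_list(items: List[str]) -> List[str]:
    """Drop duplicates while preserving first-seen order (mirrors dedup_list)."""
    seen = set()
    out = []
    for item in items:
        if item not in seen:
            seen.add(item)
            out.append(item)
    return out

@dataclass
class ColumnLineage:
    """Stand-in for sqlglot_l.ColumnLineageInfo."""
    downstream_column: str
    upstreams: List[str] = field(default_factory=list)

def merge_cll(
    existing: Optional[List[ColumnLineage]],
    incoming: Optional[List[ColumnLineage]],
) -> Optional[List[ColumnLineage]]:
    merged = list(existing or [])
    by_column = {c.downstream_column: c for c in merged}
    for c in incoming or []:
        if c.downstream_column in by_column:
            # Same output column seen again: union its upstream columns in order.
            by_column[c.downstream_column].upstreams = deduplicate_list(
                by_column[c.downstream_column].upstreams + c.upstreams
            )
        else:
            # New output column: append as-is.
            merged.append(c)
    return merged or None  # normalize "no CLL" back to None, as the real code does

a = [ColumnLineage("id", ["t1.id"])]
b = [ColumnLineage("id", ["t2.id"]), ColumnLineage("name", ["t2.name"])]
assert [c.upstreams for c in merge_cll(a, b)] == [["t1.id", "t2.id"], ["t2.name"]]
```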
@@ -161,7 +186,12 @@ def _get_sources_from_query(
             )
             sources.append(source)
 
-        return sources, parsed_result.column_lineage
+        return (
+            sources,
+            parsed_result.column_lineage
+            if self.config.include_view_column_lineage
+            else None,
+        )
 
     def _build_s3_path_from_row(self, filename: str) -> str:
         path = filename.strip()
@@ -208,7 +238,7 @@ def _get_sources(
                     "Only s3 source supported with copy. The source was: {path}."
                 )
                 self.report.num_lineage_dropped_not_support_copy_path += 1
-                return sources, cll
+                return [], None
             path = strip_s3_prefix(self._get_s3_path(path))
             urn = make_dataset_urn_with_platform_instance(
                 platform=platform.value,
@@ -284,7 +314,6 @@ def _populate_lineage_map(
                     ddl=lineage_row.ddl,
                     filename=lineage_row.filename,
                 )
-                target.cll = cll
 
                 target.upstreams.update(
                     self._get_upstream_lineages(
@@ -294,13 +323,13 @@
                         raw_db_name=raw_db_name,
                     )
                 )
+                target.cll = cll
 
-            # Merging downstreams if dataset already exists and has downstreams
+            # Merging upstreams if dataset already exists and has upstreams
             if target.dataset.urn in self._lineage_map:
-                self._lineage_map[target.dataset.urn].upstreams = self._lineage_map[
-                    target.dataset.urn
-                ].upstreams.union(target.upstreams)
-
+                self._lineage_map[target.dataset.urn].merge_lineage(
+                    upstreams=target.upstreams, cll=target.cll
+                )
             else:
                 self._lineage_map[target.dataset.urn] = target
 
@@ -420,23 +449,21 @@ def populate_lineage(
     ) -> None:
         populate_calls: List[Tuple[str, LineageCollectorType]] = []
 
-        if self.config.table_lineage_mode == LineageMode.STL_SCAN_BASED:
+        if self.config.table_lineage_mode in {
+            LineageMode.STL_SCAN_BASED,
+            LineageMode.MIXED,
+        }:
             # Populate table level lineage by getting upstream tables from stl_scan redshift table
             query = RedshiftQuery.stl_scan_based_lineage_query(
                 self.config.database,
                 self.start_time,
                 self.end_time,
             )
             populate_calls.append((query, LineageCollectorType.QUERY_SCAN))
-        elif self.config.table_lineage_mode == LineageMode.SQL_BASED:
-            # Populate table level lineage by parsing table creating sqls
-            query = RedshiftQuery.list_insert_create_queries_sql(
-                db_name=database,
-                start_time=self.start_time,
-                end_time=self.end_time,
-            )
-            populate_calls.append((query, LineageCollectorType.QUERY_SQL_PARSER))
-        elif self.config.table_lineage_mode == LineageMode.MIXED:
+        if self.config.table_lineage_mode in {
+            LineageMode.SQL_BASED,
+            LineageMode.MIXED,
+        }:
             # Populate table level lineage by parsing table creating sqls
             query = RedshiftQuery.list_insert_create_queries_sql(
                 db_name=database,
@@ -445,15 +472,7 @@
             )
             populate_calls.append((query, LineageCollectorType.QUERY_SQL_PARSER))
 
-            # Populate table level lineage by getting upstream tables from stl_scan redshift table
-            query = RedshiftQuery.stl_scan_based_lineage_query(
-                db_name=database,
-                start_time=self.start_time,
-                end_time=self.end_time,
-            )
-            populate_calls.append((query, LineageCollectorType.QUERY_SCAN))
-
-        if self.config.include_views:
+        if self.config.include_views and self.config.include_view_lineage:
             # Populate table level lineage for views
             query = RedshiftQuery.view_lineage_query()
             populate_calls.append((query, LineageCollectorType.VIEW))
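This refactor replaces an if/elif chain, which had to duplicate the stl_scan query construction under LineageMode.MIXED, with two independent membership checks, so MIXED now simply takes both branches. A toy sketch of the resulting control flow (the enum values and collector names here are illustrative stand-ins, not the real query builders):

```python
from enum import Enum

class LineageMode(Enum):
    STL_SCAN_BASED = "stl_scan_based"
    SQL_BASED = "sql_based"
    MIXED = "mixed"

def collectors_for(mode: LineageMode) -> list[str]:
    """MIXED falls into both branches, with no duplicated query-building code."""
    calls: list[str] = []
    if mode in {LineageMode.STL_SCAN_BASED, LineageMode.MIXED}:
        calls.append("stl_scan_based_lineage_query")
    if mode in {LineageMode.SQL_BASED, LineageMode.MIXED}:
        calls.append("list_insert_create_queries_sql")
    return calls

assert collectors_for(LineageMode.MIXED) == [
    "stl_scan_based_lineage_query",
    "list_insert_create_queries_sql",
]
assert collectors_for(LineageMode.SQL_BASED) == ["list_insert_create_queries_sql"]
```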
@@ -540,7 +559,6 @@ def get_lineage(
         dataset_urn: str,
         schema: RedshiftSchema,
     ) -> Optional[Tuple[UpstreamLineageClass, Dict[str, str]]]:
-
         upstream_lineage: List[UpstreamClass] = []
 
         cll_lineage: List[FineGrainedLineage] = []
5 changes: 3 additions & 2 deletions metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
@@ -193,7 +193,7 @@ class _ColumnRef(_FrozenModel):
     column: str
 
 
-class ColumnRef(_ParserBaseModel):
+class ColumnRef(_FrozenModel):
     table: Urn
     column: str
 
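ColumnRef now derives from _FrozenModel rather than _ParserBaseModel, making instances immutable and hashable, which is what allows the merged CLL upstreams above to be de-duplicated by value. Assuming _FrozenModel is a pydantic model configured with frozen=True (the base class itself is not shown in this diff), the effect is:

```python
from pydantic import BaseModel

class FrozenModel(BaseModel):
    """Illustrative stand-in for _FrozenModel (pydantic v1 style config)."""
    class Config:
        frozen = True  # immutable; also generates __hash__ for the model

class ColumnRef(FrozenModel):
    table: str  # the real model types this as a Urn alias
    column: str

a = ColumnRef(table="urn:li:dataset:t1", column="id")
b = ColumnRef(table="urn:li:dataset:t1", column="id")
assert a == b
assert len({a, b}) == 1  # hashable by value, so set/dict de-duplication works
```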
@@ -929,6 +929,7 @@ def _translate_sqlglot_type(
         TypeClass = ArrayTypeClass
     elif sqlglot_type in {
         sqlglot.exp.DataType.Type.UNKNOWN,
+        sqlglot.exp.DataType.Type.NULL,
     }:
         return None
     else:
@@ -1090,7 +1091,7 @@ def _sqlglot_lineage_inner(
         table_schemas_resolved=total_schemas_resolved,
     )
     logger.debug(
-        f"Resolved {len(table_name_schema_mapping)} of {len(tables)} table schemas"
+        f"Resolved {total_schemas_resolved} of {total_tables_discovered} table schemas"
     )
 
     # Simplify the input statement for column-level lineage generation.
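The six remaining changed files, whose paths are not shown on this page, appear to be expected-output ("golden") JSON fixtures; each updates one schema field's nativeDataType from "VARCHAR" to "TEXT".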
@@ -153,7 +153,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "nativeDataType": "VARCHAR",
+        "nativeDataType": "TEXT",
         "recursive": false,
         "isPartOfKey": false
       },
@@ -87,7 +87,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "nativeDataType": "VARCHAR",
+        "nativeDataType": "TEXT",
         "recursive": false,
         "isPartOfKey": false
       },
@@ -117,7 +117,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "nativeDataType": "VARCHAR",
+        "nativeDataType": "TEXT",
         "recursive": false,
         "isPartOfKey": false
       },
@@ -118,7 +118,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "nativeDataType": "VARCHAR",
+        "nativeDataType": "TEXT",
         "recursive": false,
         "isPartOfKey": false
       },
@@ -118,7 +118,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "nativeDataType": "VARCHAR",
+        "nativeDataType": "TEXT",
         "recursive": false,
         "isPartOfKey": false
       },
@@ -118,7 +118,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
          }
         },
-        "nativeDataType": "VARCHAR",
+        "nativeDataType": "TEXT",
         "recursive": false,
         "isPartOfKey": false
       },
