diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py index acd708ee81a5c3..cb2c536bbab20f 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py @@ -80,9 +80,9 @@ def __post_init__(self): ) def generate_ownership_aspect(self): - owners = set([builder.make_user_urn(owner) for owner in self.owners]) | set( - [builder.make_group_urn(owner) for owner in self.group_owners] - ) + owners = {builder.make_user_urn(owner) for owner in self.owners} | { + builder.make_group_urn(owner) for owner in self.group_owners + } ownership = OwnershipClass( owners=[ OwnerClass( diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py index 0ad786d68643de..69cbcc4c3e45b1 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py @@ -70,9 +70,9 @@ def __post_init__(self): ) def generate_ownership_aspect(self) -> Iterable[OwnershipClass]: - owners = set([builder.make_user_urn(owner) for owner in self.owners]) | set( - [builder.make_group_urn(owner) for owner in self.group_owners] - ) + owners = {builder.make_user_urn(owner) for owner in self.owners} | { + builder.make_group_urn(owner) for owner in self.group_owners + } ownership = OwnershipClass( owners=[ OwnerClass( diff --git a/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py b/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py index 61bda90447c624..408d6bc7256c6f 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py +++ b/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py @@ -276,7 +276,7 @@ def from_yaml( cls, file: Path, graph: DataHubGraph, - ) -> "DataProduct": + ) -> DataProduct: with open(file) as fp: yaml = YAML(typ="rt") # default, if not specfied, is 'rt' (round-trip) orig_dictionary = yaml.load(fp) @@ -291,7 +291,7 @@ def from_yaml( return parsed_data_product @classmethod - def from_datahub(cls, graph: DataHubGraph, id: str) -> "DataProduct": + def from_datahub(cls, graph: DataHubGraph, id: str) -> DataProduct: data_product_properties: Optional[ DataProductPropertiesClass ] = graph.get_aspect(id, DataProductPropertiesClass) @@ -384,7 +384,7 @@ def _patch_ownership( patches_drop[i] = o # Figure out what if any are new owners to add - new_owners_to_add = set(o for o in new_owner_type_map) - set(owners_matched) + new_owners_to_add = {o for o in new_owner_type_map} - set(owners_matched) if new_owners_to_add: for new_owner in new_owners_to_add: new_owner_type = new_owner_type_map[new_owner] diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index 4e74a410b5f64e..c71bced38f8aa9 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -242,7 +242,7 @@ def generate_mcp( if self.schema_metadata: if self.schema_metadata.file: - with open(self.schema_metadata.file, "r") as schema_fp: + with open(self.schema_metadata.file) as schema_fp: schema_string = schema_fp.read() schema_metadata = SchemaMetadataClass( schemaName=self.name or self.id or self.urn or "", @@ -377,8 +377,7 @@ def generate_mcp( type="COPY", ) ) - for patch_event in 
patch_builder.build(): - yield patch_event + yield from patch_builder.build() logger.info(f"Created dataset {self.urn}") diff --git a/metadata-ingestion/src/datahub/api/entities/forms/forms.py b/metadata-ingestion/src/datahub/api/entities/forms/forms.py index fd260e3171ed87..5ac08b6e64ed46 100644 --- a/metadata-ingestion/src/datahub/api/entities/forms/forms.py +++ b/metadata-ingestion/src/datahub/api/entities/forms/forms.py @@ -106,7 +106,7 @@ def create(file: str) -> None: emitter: DataHubGraph with get_default_graph() as emitter: - with open(file, "r") as fp: + with open(file) as fp: forms: List[dict] = yaml.safe_load(fp) for form_raw in forms: form = Forms.parse_obj(form_raw) @@ -204,7 +204,7 @@ def validate_prompts(self, emitter: DataHubGraph) -> List[FormPromptClass]: def upload_entities_for_form(self, emitter: DataHubGraph) -> Union[None, Exception]: if self.entities and self.entities.urns: formatted_entity_urns = ", ".join( - ['"{}"'.format(value) for value in self.entities.urns] + [f'"{value}"' for value in self.entities.urns] ) query = UPLOAD_ENTITIES_FOR_FORMS.format( form_urn=self.urn, entity_urns=formatted_entity_urns @@ -281,7 +281,7 @@ def add_owners(self, emitter: DataHubGraph) -> Union[None, Exception]: @staticmethod def format_form_filter(field: str, urns: List[str]) -> str: - formatted_urns = ", ".join(['"{}"'.format(urn) for urn in urns]) + formatted_urns = ", ".join([f'"{urn}"' for urn in urns]) return FIELD_FILTER_TEMPLATE.format(field=field, values=formatted_urns) @staticmethod diff --git a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py index eacbff4b31d935..ed97948de9034c 100644 --- a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py +++ b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py @@ -98,7 +98,7 @@ def create(file: str) -> None: emitter: DataHubGraph with get_default_graph() as emitter: - with open(file, "r") as fp: + with open(file) as fp: structuredproperties: List[dict] = yaml.safe_load(fp) for structuredproperty_raw in structuredproperties: structuredproperty = StructuredProperties.parse_obj( diff --git a/metadata-ingestion/src/datahub/cli/config_utils.py b/metadata-ingestion/src/datahub/cli/config_utils.py index 7877a6bf6df593..8cddc41551038a 100644 --- a/metadata-ingestion/src/datahub/cli/config_utils.py +++ b/metadata-ingestion/src/datahub/cli/config_utils.py @@ -84,7 +84,7 @@ def ensure_datahub_config() -> None: def get_client_config(as_dict: bool = False) -> Union[Optional[DatahubConfig], dict]: - with open(DATAHUB_CONFIG_PATH, "r") as stream: + with open(DATAHUB_CONFIG_PATH) as stream: try: config_json = yaml.safe_load(stream) if as_dict: diff --git a/metadata-ingestion/src/datahub/cli/docker_check.py b/metadata-ingestion/src/datahub/cli/docker_check.py index b80c2f3df01da0..ff3965455d1633 100644 --- a/metadata-ingestion/src/datahub/cli/docker_check.py +++ b/metadata-ingestion/src/datahub/cli/docker_check.py @@ -203,7 +203,7 @@ def check_docker_quickstart() -> QuickstartStatus: all_containers = set() for config_file in config_files: - with open(config_file, "r") as config_file: + with open(config_file) as config_file: all_containers.update( yaml.safe_load(config_file).get("services", {}).keys() ) diff --git a/metadata-ingestion/src/datahub/cli/docker_cli.py b/metadata-ingestion/src/datahub/cli/docker_cli.py index 
e35d4a5c93c2d6..707a9cab076e68 100644 --- a/metadata-ingestion/src/datahub/cli/docker_cli.py +++ b/metadata-ingestion/src/datahub/cli/docker_cli.py @@ -76,7 +76,7 @@ class Architectures(Enum): m2 = "m2" -@functools.lru_cache() +@functools.lru_cache def _docker_subprocess_env() -> Dict[str, str]: # platform.machine() is equivalent to `uname -m`, as per https://stackoverflow.com/a/45124927/5004662 DOCKER_COMPOSE_PLATFORM: str = "linux/" + platform.machine() @@ -316,7 +316,7 @@ def _restore( assert os.path.exists( resolved_restore_file ), f"File {resolved_restore_file} does not exist" - with open(resolved_restore_file, "r") as fp: + with open(resolved_restore_file) as fp: result = subprocess.run( [ "bash", @@ -324,8 +324,7 @@ def _restore( f"docker exec -i {DOCKER_COMPOSE_PROJECT_NAME}-mysql-1 bash -c 'mysql -uroot -pdatahub datahub '", ], stdin=fp, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + capture_output=True, ) if result.returncode != 0: logger.error("Failed to run MySQL restore") @@ -381,7 +380,7 @@ def _restore( ) env_fp.flush() if logger.isEnabledFor(logging.DEBUG): - with open(env_fp.name, "r") as env_fp_reader: + with open(env_fp.name) as env_fp_reader: logger.debug(f"Env file contents: {env_fp_reader.read()}") # continue to issue the restore indices command @@ -401,8 +400,7 @@ def _restore( + "acryldata/datahub-upgrade:${DATAHUB_VERSION:-head}" + " -u RestoreIndices -a clean", ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + capture_output=True, ) logger.info( f"Index restore command finished with status {result.returncode}" diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 2e66b18e481453..453f1d29343728 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -588,6 +588,6 @@ def rollback( for row in unsafe_entities: writer.writerow([row.get("urn")]) - except IOError as e: + except OSError as e: logger.exception(f"Unable to save rollback failure report: {e}") sys.exit(f"Unable to write reports to {report_dir}") diff --git a/metadata-ingestion/src/datahub/cli/quickstart_versioning.py b/metadata-ingestion/src/datahub/cli/quickstart_versioning.py index 493869ac77bb83..9739af5127f4d1 100644 --- a/metadata-ingestion/src/datahub/cli/quickstart_versioning.py +++ b/metadata-ingestion/src/datahub/cli/quickstart_versioning.py @@ -55,7 +55,7 @@ def fetch_quickstart_config(cls) -> "QuickstartVersionMappingConfig": "LOCAL_QUICKSTART_MAPPING_FILE is set, will try to read from local file." 
) path = os.path.expanduser(LOCAL_QUICKSTART_MAPPING_FILE) - with open(path, "r") as f: + with open(path) as f: config_raw = yaml.safe_load(f) return cls.parse_obj(config_raw) @@ -70,7 +70,7 @@ def fetch_quickstart_config(cls) -> "QuickstartVersionMappingConfig": ) try: path = os.path.expanduser(DEFAULT_LOCAL_CONFIG_PATH) - with open(path, "r") as f: + with open(path) as f: config_raw = yaml.safe_load(f) except Exception: logger.debug("Couldn't read from local file either.") diff --git a/metadata-ingestion/src/datahub/configuration/common.py b/metadata-ingestion/src/datahub/configuration/common.py index 7aaa1706a6420a..a5971258bcdaac 100644 --- a/metadata-ingestion/src/datahub/configuration/common.py +++ b/metadata-ingestion/src/datahub/configuration/common.py @@ -2,13 +2,24 @@ import unittest.mock from abc import ABC, abstractmethod from enum import auto -from typing import IO, Any, ClassVar, Dict, List, Optional, Type, TypeVar, Union +from typing import ( + IO, + Any, + ClassVar, + Dict, + List, + Optional, + Type, + TypeVar, + Union, + runtime_checkable, +) import pydantic from cached_property import cached_property from pydantic import BaseModel, Extra, ValidationError from pydantic.fields import Field -from typing_extensions import Protocol, runtime_checkable +from typing_extensions import Protocol from datahub.configuration._config_enum import ConfigEnum from datahub.configuration.pydantic_migration_helpers import PYDANTIC_VERSION_2 diff --git a/metadata-ingestion/src/datahub/configuration/git.py b/metadata-ingestion/src/datahub/configuration/git.py index 3c76c8da0d5717..d237cd9ddd306c 100644 --- a/metadata-ingestion/src/datahub/configuration/git.py +++ b/metadata-ingestion/src/datahub/configuration/git.py @@ -101,7 +101,7 @@ def deploy_key_filled_from_deploy_key_file( if v is None: deploy_key_file = values.get("deploy_key_file") if deploy_key_file is not None: - with open(deploy_key_file, "r") as fp: + with open(deploy_key_file) as fp: deploy_key = SecretStr(fp.read()) return deploy_key return v diff --git a/metadata-ingestion/src/datahub/configuration/source_common.py b/metadata-ingestion/src/datahub/configuration/source_common.py index 4b982db2715c20..a792201f9defe9 100644 --- a/metadata-ingestion/src/datahub/configuration/source_common.py +++ b/metadata-ingestion/src/datahub/configuration/source_common.py @@ -10,9 +10,9 @@ DEFAULT_ENV = FabricTypeClass.PROD # Get all the constants from the FabricTypeClass. It's not an enum, so this is a bit hacky but works. 
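The hunk above for configuration/common.py (and later ones for ingestion/api/report.py, the SageMaker job_classes module, and looker_config.py) moves runtime_checkable, Final, and ClassVar to imports from the standard typing module rather than typing_extensions; runtime_checkable and Final have lived in typing since Python 3.8, so this presumably assumes 3.8 is now the minimum supported version, while Protocol and Literal stay in typing_extensions in this diff. The open(path) rewrites in the same region simply drop the explicit "r", which is already open()'s default text-read mode. A minimal standalone sketch of what a runtime-checkable protocol provides, with illustrative names that are not taken from the codebase:

from typing import Protocol, runtime_checkable  # both in stdlib typing on 3.8+


@runtime_checkable
class SupportsClose(Protocol):
    # Structural type: anything exposing a close() method matches.
    def close(self) -> None:
        ...


class FakeHandle:
    def close(self) -> None:
        print("closed")


# runtime_checkable is what allows isinstance() checks against a Protocol.
assert isinstance(FakeHandle(), SupportsClose)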
-ALL_ENV_TYPES: Set[str] = set( - [value for name, value in vars(FabricTypeClass).items() if not name.startswith("_")] -) +ALL_ENV_TYPES: Set[str] = { + value for name, value in vars(FabricTypeClass).items() if not name.startswith("_") +} class PlatformInstanceConfigMixin(ConfigModel): diff --git a/metadata-ingestion/src/datahub/emitter/request_helper.py b/metadata-ingestion/src/datahub/emitter/request_helper.py index 5263ba1912592a..4e1ec026648b8d 100644 --- a/metadata-ingestion/src/datahub/emitter/request_helper.py +++ b/metadata-ingestion/src/datahub/emitter/request_helper.py @@ -25,4 +25,4 @@ def make_curl_command( ), url, ] - return " ".join(shlex.quote(fragment) for fragment in fragments) + return shlex.join(fragments) diff --git a/metadata-ingestion/src/datahub/ingestion/api/report.py b/metadata-ingestion/src/datahub/ingestion/api/report.py index 08b20d9e856911..4a74d6cbc62682 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/report.py +++ b/metadata-ingestion/src/datahub/ingestion/api/report.py @@ -5,12 +5,12 @@ from dataclasses import dataclass from datetime import datetime, timedelta from enum import Enum -from typing import Any, Optional +from typing import Any, Optional, runtime_checkable import humanfriendly import pydantic from pydantic import BaseModel -from typing_extensions import Literal, Protocol, runtime_checkable +from typing_extensions import Literal, Protocol from datahub.ingestion.api.report_helpers import format_datetime_relative from datahub.utilities.lossy_collections import LossyList diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/json_ref_patch.py b/metadata-ingestion/src/datahub/ingestion/extractor/json_ref_patch.py index daf43bd87ba602..2224a096f53875 100644 --- a/metadata-ingestion/src/datahub/ingestion/extractor/json_ref_patch.py +++ b/metadata-ingestion/src/datahub/ingestion/extractor/json_ref_patch.py @@ -15,7 +15,7 @@ def title_swapping_callback(self: JsonRef) -> dict: try: base_doc = self.loader(uri) except Exception as e: - raise self._error("%s: %s" % (e.__class__.__name__, str(e)), cause=e) from e + raise self._error(f"{e.__class__.__name__}: {str(e)}", cause=e) from e base_doc = _replace_refs( base_doc, **{**self._ref_kwargs, "base_uri": uri, "recursing": False} ) diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py index df0b732833fbe1..d5af4f7a2389c0 100644 --- a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py +++ b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py @@ -436,8 +436,7 @@ def gen_items_from_list_tuple_or_scalar( val: Any, ) -> Iterable[avro.schema.Schema]: if isinstance(val, (list, tuple)): - for i in val: - yield i + yield from val else: yield val diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index c6b2c8aad82e9f..be3aa2e80780a9 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -324,6 +324,7 @@ def get_ownership(self, entity_urn: str) -> Optional[OwnershipClass]: def get_schema_metadata(self, entity_urn: str) -> Optional[SchemaMetadataClass]: return self.get_aspect(entity_urn=entity_urn, aspect_type=SchemaMetadataClass) + @deprecated(reason="Use get_aspect directly.") def get_domain_properties(self, entity_urn: str) -> Optional[DomainPropertiesClass]: return self.get_aspect(entity_urn=entity_urn, 
aspect_type=DomainPropertiesClass) @@ -343,11 +344,9 @@ def get_glossary_terms(self, entity_urn: str) -> Optional[GlossaryTermsClass]: def get_domain(self, entity_urn: str) -> Optional[DomainsClass]: return self.get_aspect(entity_urn=entity_urn, aspect_type=DomainsClass) + @deprecated(reason="Use get_aspect directly.") def get_browse_path(self, entity_urn: str) -> Optional[BrowsePathsClass]: - return self.get_aspect( - entity_urn=entity_urn, - aspect_type=BrowsePathsClass, - ) + return self.get_aspect(entity_urn=entity_urn, aspect_type=BrowsePathsClass) def get_usage_aspects_from_urn( self, entity_urn: str, start_timestamp: int, end_timestamp: int @@ -1095,7 +1094,7 @@ def delete_references_to_urn( related_aspects = response.get("relatedAspects", []) return reference_count, related_aspects - @functools.lru_cache() + @functools.lru_cache def _make_schema_resolver( self, platform: str, diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index 0ac13b256eb030..062ab381d40b77 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -1009,7 +1009,7 @@ def _transform_extraction(self) -> Iterable[MetadataWorkUnit]: # in Glue, it's possible for two buckets to have files of different extensions # if this happens, we append the extension in the URN so the sources can be distinguished # see process_dataflow_node() for details - s3_formats: DefaultDict[str, Set[Optional[str]]] = defaultdict(lambda: set()) + s3_formats: DefaultDict[str, Set[Optional[str]]] = defaultdict(set) for dag in dags.values(): if dag is not None: for s3_name, extension in self.get_dataflow_s3_names(dag): diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/job_classes.py b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/job_classes.py index 442c5eb2e0a8ff..6e0e352db4af71 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/job_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/job_classes.py @@ -1,6 +1,4 @@ -from typing import Dict - -from typing_extensions import Final +from typing import Dict, Final from datahub.metadata.schema_classes import JobStatusClass diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 27ad2008dae007..eecc0f43729690 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -223,7 +223,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource): } def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): - super(BigqueryV2Source, self).__init__(config, ctx) + super().__init__(config, ctx) self.config: BigQueryV2Config = config self.report: BigQueryV2Report = BigQueryV2Report() self.classification_handler = ClassificationHandler(self.config, self.report) @@ -340,7 +340,7 @@ def metadata_read_capability_test( ) -> CapabilityReport: for project_id in project_ids: try: - logger.info((f"Metadata read capability test for project {project_id}")) + logger.info(f"Metadata read capability test for project {project_id}") client: bigquery.Client = config.get_bigquery_client() assert client bigquery_data_dictionary = BigQuerySchemaApi( diff --git 
a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index c8c1e7c893c6c6..c41207ec67f620 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -551,22 +551,20 @@ def lineage_via_catalog_lineage_api( # Only builds lineage map when the table has upstreams logger.debug("Found %d upstreams for table %s", len(upstreams), table) if upstreams: - lineage_map[destination_table_str] = set( - [ - LineageEdge( - table=str( - BigQueryTableRef( - table_identifier=BigqueryTableIdentifier.from_string_name( - source_table - ) + lineage_map[destination_table_str] = { + LineageEdge( + table=str( + BigQueryTableRef( + table_identifier=BigqueryTableIdentifier.from_string_name( + source_table ) - ), - column_mapping=frozenset(), - auditStamp=curr_date, - ) - for source_table in upstreams - ] - ) + ) + ), + column_mapping=frozenset(), + auditStamp=curr_date, + ) + for source_table in upstreams + } return lineage_map except Exception as e: self.error( diff --git a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py index ec3d1715aaecef..d998c37d32ed2a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py +++ b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py @@ -154,9 +154,7 @@ def get_resource_glossary_terms_work_unit( # If we want to overwrite or there are no existing terms, create a new GlossaryTerms object current_terms = GlossaryTermsClass(term_associations, get_audit_stamp()) else: - current_term_urns: Set[str] = set( - [term.urn for term in current_terms.terms] - ) + current_term_urns: Set[str] = {term.urn for term in current_terms.terms} term_associations_filtered: List[GlossaryTermAssociationClass] = [ association for association in term_associations @@ -192,7 +190,7 @@ def get_resource_tags_work_unit( # If we want to overwrite or there are no existing tags, create a new GlobalTags object current_tags = GlobalTagsClass(tag_associations) else: - current_tag_urns: Set[str] = set([tag.tag for tag in current_tags.tags]) + current_tag_urns: Set[str] = {tag.tag for tag in current_tags.tags} tag_associations_filtered: List[TagAssociationClass] = [ association for association in tag_associations @@ -453,9 +451,9 @@ def process_sub_resource_row( field_match = True if has_terms: if field_info.glossaryTerms and not self.should_overwrite: - current_term_urns = set( - [term.urn for term in field_info.glossaryTerms.terms] - ) + current_term_urns = { + term.urn for term in field_info.glossaryTerms.terms + } term_associations_filtered = [ association for association in term_associations @@ -472,9 +470,9 @@ def process_sub_resource_row( if has_tags: if field_info.globalTags and not self.should_overwrite: - current_tag_urns = set( - [tag.tag for tag in field_info.globalTags.tags] - ) + current_tag_urns = { + tag.tag for tag in field_info.globalTags.tags + } tag_associations_filtered = [ association for association in tag_associations @@ -631,9 +629,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: f"Cannot read remote file {self.config.filename}, error:{e}" ) else: - with open( - pathlib.Path(self.config.filename), mode="r", encoding="utf-8-sig" - ) as f: + with open(pathlib.Path(self.config.filename), encoding="utf-8-sig") as f: rows = list(csv.DictReader(f, 
delimiter=self.config.delimiter)) for row in rows: diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py index b04718a9eabbad..5393dd4835d8c1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py @@ -58,8 +58,7 @@ def create_emit_containers( ) self.processed_containers.append(container_key.guid()) logger.debug(f"Creating container with key: {container_key}") - for wu in container_wus: - yield wu + yield from container_wus def gen_folder_key(self, abs_path): return FolderKey( diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index d7a3aba7065ca8..ebba664a811c78 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -1853,7 +1853,7 @@ def get_transformed_tags_by_prefix( entity_urn: str, tags_prefix_filter: str, ) -> List[TagAssociationClass]: - tag_set = set([new_tag.tag for new_tag in new_tags]) + tag_set = {new_tag.tag for new_tag in new_tags} if self.ctx.graph: existing_tags_class = self.ctx.graph.get_tags(entity_urn) @@ -1868,7 +1868,7 @@ def get_transformed_tags_by_prefix( def get_transformed_terms( self, new_terms: List[GlossaryTermAssociation], entity_urn: str ) -> List[GlossaryTermAssociation]: - term_id_set = set([term.urn for term in new_terms]) + term_id_set = {term.urn for term in new_terms} if self.ctx.graph: existing_terms_class = self.ctx.graph.get_glossary_terms(entity_urn) if existing_terms_class and existing_terms_class.terms: diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index 0fc35ddd281c8e..750fee227b97ab 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -481,7 +481,7 @@ def load_file_as_json( ) return json.loads(response["Body"].read().decode("utf-8")) else: - with open(uri, "r") as f: + with open(uri) as f: return json.load(f) def loadManifestAndCatalog( diff --git a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py index 39066b0c265533..4f427aa203c20c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py @@ -151,7 +151,7 @@ def delta_type_to_hive_type(self, field_type: Any) -> str: and create the native datatype """ parsed_struct += ( - "{0}:{1}".format( + "{}:{}".format( field.get("name"), self.delta_type_to_hive_type(field.get("type")), ) @@ -343,8 +343,7 @@ def process_folder(self, path: str) -> Iterable[MetadataWorkUnit]: delta_table = read_delta_table(path, self.storage_options, self.source_config) if delta_table: logger.debug(f"Delta table found at: {path}") - for wu in self.ingest_table(delta_table, path.rstrip("/")): - yield wu + yield from self.ingest_table(delta_table, path.rstrip("/")) else: for folder in self.get_folders(path): yield from self.process_folder(folder) diff --git a/metadata-ingestion/src/datahub/ingestion/source/file.py b/metadata-ingestion/src/datahub/ingestion/source/file.py index 
590aa59f7b5b6e..49cc314426eb55 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/file.py +++ b/metadata-ingestion/src/datahub/ingestion/source/file.py @@ -256,7 +256,7 @@ def _iterate_file(self, path: str) -> Iterable[Tuple[int, Any]]: file_read_mode = self.config.read_mode if file_read_mode == FileReadMode.BATCH: - with open(path, "r") as f: + with open(path) as f: parse_start_time = datetime.datetime.now() obj_list = json.load(f) parse_end_time = datetime.datetime.now() diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index 91b0101c10451b..c8ae779b602b8a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -66,7 +66,7 @@ class FivetranSource(StatefulIngestionSourceBase): platform: str = "fivetran" def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext): - super(FivetranSource, self).__init__(config, ctx) + super().__init__(config, ctx) self.config = config self.report = FivetranSourceReport() diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index d210941bccba16..a9eb59f9297992 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -76,7 +76,7 @@ def _initialize_fivetran_variables( ) def _query(self, query: str) -> List[Dict]: - logger.debug("Query : {}".format(query)) + logger.debug(f"Query : {query}") resp = self.engine.execute(query) return [row for row in resp] diff --git a/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py b/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py index 7e3ff7d4fb84cb..2bd05ca11e234e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py +++ b/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py @@ -263,7 +263,7 @@ def create(cls, config_dict, ctx): return cls(config, ctx) def __init__(self, config: AzureADConfig, ctx: PipelineContext): - super(AzureADSource, self).__init__(config, ctx) + super().__init__(config, ctx) self.config = config self.report = AzureADSourceReport( filtered_tracking=self.config.filtered_tracking @@ -488,7 +488,7 @@ def _get_azure_ad_group_members(self, azure_ad_group: dict) -> Iterable[List]: yield from self._get_azure_ad_data(kind=kind) def _get_azure_ad_data(self, kind: str) -> Iterable[List]: - headers = {"Authorization": "Bearer {}".format(self.token)} + headers = {"Authorization": f"Bearer {self.token}"} # 'ConsistencyLevel': 'eventual'} url = self.config.graph_url + kind while True: diff --git a/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py b/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py index 5c1edce7da6c9a..49b6422902299a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py +++ b/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py @@ -289,7 +289,7 @@ def create(cls, config_dict, ctx): return cls(config, ctx) def __init__(self, config: OktaConfig, ctx: PipelineContext): - super(OktaSource, self).__init__(config, ctx) + super().__init__(config, ctx) self.config = config self.report = OktaSourceReport() self.okta_client = self._create_okta_client() @@ -465,8 +465,7 @@ def _get_okta_groups( "okta_groups", 
f"Failed to fetch Groups from Okta API: {err}" ) if groups: - for group in groups: - yield group + yield from groups if resp and resp.has_next(): sleep(self.config.delay_seconds) try: @@ -504,8 +503,7 @@ def _get_okta_group_users( f"Failed to fetch Users of Group {group.profile.name} from Okta API: {err}", ) if users: - for user in users: - yield user + yield from users if resp and resp.has_next(): sleep(self.config.delay_seconds) try: @@ -542,8 +540,7 @@ def _get_okta_users(self, event_loop: asyncio.AbstractEventLoop) -> Iterable[Use "okta_users", f"Failed to fetch Users from Okta API: {err}" ) if users: - for user in users: - yield user + yield from users if resp and resp.has_next(): sleep(self.config.delay_seconds) try: diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py index 4e5ea9154359bf..cf70eb95762c4b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py @@ -263,12 +263,12 @@ def __init__( KNOWN_NONTOPICROUTING_TRANSFORMS = ( KAFKA_NONTOPICROUTING_TRANSFORMS + [ - "org.apache.kafka.connect.transforms.{}".format(t) + f"org.apache.kafka.connect.transforms.{t}" for t in KAFKA_NONTOPICROUTING_TRANSFORMS ] + CONFLUENT_NONTOPICROUTING_TRANSFORMS + [ - "io.confluent.connect.transforms.{}".format(t) + f"io.confluent.connect.transforms.{t}" for t in CONFLUENT_NONTOPICROUTING_TRANSFORMS ] ) @@ -314,9 +314,9 @@ def get_parser( transform = {"name": name} transforms.append(transform) for key in self.connector_manifest.config.keys(): - if key.startswith("transforms.{}.".format(name)): + if key.startswith(f"transforms.{name}."): transform[ - key.replace("transforms.{}.".format(name), "") + key.replace(f"transforms.{name}.", "") ] = self.connector_manifest.config[key] return self.JdbcParser( @@ -729,7 +729,7 @@ def _extract_lineages(self): source_platform = parser.source_platform server_name = parser.server_name database_name = parser.database_name - topic_naming_pattern = r"({0})\.(\w+\.\w+)".format(server_name) + topic_naming_pattern = rf"({server_name})\.(\w+\.\w+)" if not self.connector_manifest.topic_names: return lineages diff --git a/metadata-ingestion/src/datahub/ingestion/source/ldap.py b/metadata-ingestion/src/datahub/ingestion/source/ldap.py index 72985688273f60..1368a5b83fe6f7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ldap.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ldap.py @@ -205,7 +205,7 @@ class LDAPSource(StatefulIngestionSourceBase): def __init__(self, ctx: PipelineContext, config: LDAPSourceConfig): """Constructor.""" - super(LDAPSource, self).__init__(config, ctx) + super().__init__(config, ctx) self.config = config # ensure prior defaults are in place diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lkml_patched.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lkml_patched.py index 6506682b8ed8d4..a44d7e5215c358 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lkml_patched.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lkml_patched.py @@ -24,5 +24,5 @@ def load_lkml(path: Union[str, pathlib.Path]) -> dict: # Using this method instead of lkml.load directly ensures # that our patches to lkml are applied. 
- with open(path, "r") as file: + with open(path) as file: return lkml.load(file) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py index ec4d8b78b0d065..8de213cfabaf0b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py @@ -1,11 +1,10 @@ import dataclasses import os import re -from typing import Any, Dict, List, Optional, Union, cast +from typing import Any, ClassVar, Dict, List, Optional, Union, cast import pydantic from pydantic import Field, validator -from typing_extensions import ClassVar from datahub.configuration import ConfigModel from datahub.configuration.common import AllowDenyPattern diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py index dfa374fe0d779d..c4ba3146031af4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py @@ -644,9 +644,7 @@ def _make_chart_metadata_events( customProperties={ "upstream_fields": ( ",".join( - sorted( - set(field.name for field in dashboard_element.input_fields) - ) + sorted({field.name for field in dashboard_element.input_fields}) ) if dashboard_element.input_fields else "" @@ -969,8 +967,7 @@ def _make_dashboard_and_chart_mces( dashboard_events = self._make_dashboard_metadata_events( looker_dashboard, list(chart_urns) ) - for dashboard_event in dashboard_events: - yield dashboard_event + yield from dashboard_events def get_ownership( self, looker_dashboard: LookerDashboard diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py index e119e88a24bd7e..c97025d75229b1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py @@ -273,7 +273,7 @@ def _fill_user_stat_aspect( logger.debug("Entering fill user stat aspect") # We first resolve all the users using a threadpool to warm up the cache - user_ids = set([self._get_user_identifier(row) for row in user_wise_rows]) + user_ids = {self._get_user_identifier(row) for row in user_wise_rows} start_time = datetime.datetime.now() with concurrent.futures.ThreadPoolExecutor( max_workers=self.config.max_threads @@ -507,7 +507,7 @@ def append_user_stat( user_urn: Optional[str] = user.get_urn(self.config.strip_user_ids_from_email) if user_urn is None: - logger.warning("user_urn not found for the user {}".format(user)) + logger.warning(f"user_urn not found for the user {user}") return dashboard_stat_aspect.userCounts.append( @@ -614,7 +614,7 @@ def append_user_stat( user_urn: Optional[str] = user.get_urn(self.config.strip_user_ids_from_email) if user_urn is None: - logger.warning("user_urn not found for the user {}".format(user)) + logger.warning(f"user_urn not found for the user {user}") return chart_stat_aspect.userCounts.append( diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py index 9dd276d054de3b..4a872f8b1a025a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py +++ 
b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py @@ -670,7 +670,7 @@ def _load_viewfile( return self.viewfile_cache[path] try: - with open(path, "r") as file: + with open(path) as file: raw_file_content = file.read() except Exception as e: self.reporter.report_failure(path, f"failed to load view file: {e}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py b/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py index 0edc8d97529837..d3c4e2e3cd80e8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py +++ b/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py @@ -520,12 +520,11 @@ def get_workunits_internal( materialize_all_node_urns(glossary_config, self.config.enable_auto_id) path_vs_id = populate_path_vs_id(glossary_config) - for event in auto_workunit( + yield from auto_workunit( get_mces( glossary_config, path_vs_id, ingestion_config=self.config, ctx=self.ctx ) - ): - yield event + ) def get_report(self): return self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py index cd78d1c0309575..af6b44677dffac 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py @@ -71,7 +71,7 @@ # See https://docs.mongodb.com/manual/reference/local-database/ and # https://docs.mongodb.com/manual/reference/config-database/ and # https://stackoverflow.com/a/48273736/5004662. -DENY_DATABASE_LIST = set(["admin", "config", "local"]) +DENY_DATABASE_LIST = {"admin", "config", "local"} class HostingEnvironment(Enum): diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py index 72f9c2167cab96..4d58916c571181 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py @@ -889,7 +889,7 @@ def get_datasource_server( return ( data_access_func_detail.identifier_accessor.items["Name"] if data_access_func_detail.identifier_accessor is not None - else str() + else "" ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py index 607f3143423752..16f174525254dc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py @@ -580,7 +580,7 @@ def tile_custom_properties(tile: powerbi_data_classes.Tile) -> dict: ) # Browse path - browse_path = BrowsePathsClass(paths=["/powerbi/{}".format(workspace.name)]) + browse_path = BrowsePathsClass(paths=[f"/powerbi/{workspace.name}"]) browse_path_mcp = self.new_mcp( entity_type=Constant.CHART, entity_urn=chart_urn, @@ -990,7 +990,7 @@ def to_chart_mcps( ) # Browse path - browse_path = BrowsePathsClass(paths=["/powerbi/{}".format(workspace.name)]) + browse_path = BrowsePathsClass(paths=[f"/powerbi/{workspace.name}"]) browse_path_mcp = self.new_mcp( entity_type=Constant.CHART, entity_urn=chart_urn, @@ -1195,7 +1195,7 @@ class PowerBiDashboardSource(StatefulIngestionSourceBase, TestableSource): platform: str = "powerbi" def __init__(self, config: PowerBiDashboardSourceConfig, ctx: PipelineContext): - super(PowerBiDashboardSource, self).__init__(config, ctx) + 
super().__init__(config, ctx) self.source_config = config self.reporter = PowerBiDashboardSourceReport() self.dataplatform_instance_resolver = create_dataplatform_instance_resolver( diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py index 0d41ab00c66f5e..ce4dd9a7a0c0f6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py @@ -268,7 +268,7 @@ def new_powerbi_dataset(workspace_id: str, raw_instance: dict) -> PowerBIDataset return PowerBIDataset( id=raw_instance["id"], name=raw_instance.get("name"), - description=raw_instance.get("description", str()), + description=raw_instance.get("description", ""), webUrl="{}/details".format(raw_instance.get("webUrl")) if raw_instance.get("webUrl") is not None else None, diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py index 3aeffa60bc28e0..fadd7a48b62f70 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py @@ -63,7 +63,7 @@ def __init__( self.__access_token_expiry_time: Optional[datetime] = None self.__tenant_id = tenant_id # Test connection by generating access token - logger.info("Trying to connect to {}".format(self._get_authority_url())) + logger.info(f"Trying to connect to {self._get_authority_url()}") # Power-Bi Auth (Service Principal Auth) self.__msal_client = msal.ConfidentialClientApplication( client_id, @@ -72,7 +72,7 @@ def __init__( ) self.get_access_token() - logger.info("Connected to {}".format(self._get_authority_url())) + logger.info(f"Connected to {self._get_authority_url()}") self._request_session = requests.Session() # set re-try parameter for request_session self._request_session.mount( @@ -124,7 +124,7 @@ def get_users(self, workspace_id: str, entity: str, entity_id: str) -> List[User pass def _get_authority_url(self): - return "{}{}".format(DataResolverBase.AUTHORITY, self.__tenant_id) + return f"{DataResolverBase.AUTHORITY}{self.__tenant_id}" def get_authorization_header(self): return {Constant.Authorization: self.get_access_token()} @@ -193,7 +193,7 @@ def get_dashboards(self, workspace: Workspace) -> List[Dashboard]: id=instance.get(Constant.ID), isReadOnly=instance.get(Constant.IS_READ_ONLY), displayName=instance.get(Constant.DISPLAY_NAME), - description=instance.get(Constant.DESCRIPTION, str()), + description=instance.get(Constant.DESCRIPTION, ""), embedUrl=instance.get(Constant.EMBED_URL), webUrl=instance.get(Constant.WEB_URL), workspace_id=workspace.id, @@ -276,7 +276,7 @@ def fetch_reports(): name=raw_instance.get(Constant.NAME), webUrl=raw_instance.get(Constant.WEB_URL), embedUrl=raw_instance.get(Constant.EMBED_URL), - description=raw_instance.get(Constant.DESCRIPTION, str()), + description=raw_instance.get(Constant.DESCRIPTION, ""), pages=self._get_pages_by_report( workspace=workspace, report_id=raw_instance[Constant.ID] ), @@ -809,7 +809,7 @@ def get_modified_workspaces(self, modified_since: str) -> List[str]: # Return scan_id of Scan created for the given workspace workspace_ids = [row["id"] for row in res.json()] - logger.debug("modified workspace_ids: 
{}".format(workspace_ids)) + logger.debug(f"modified workspace_ids: {workspace_ids}") return workspace_ids def get_dataset_parameters( diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py index b793929faa6912..d6c7076d49507e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py @@ -143,7 +143,7 @@ class PowerBiReportServerAPI: def __init__(self, config: PowerBiReportServerAPIConfig) -> None: self.__config: PowerBiReportServerAPIConfig = config self.__auth: HttpNtlmAuth = HttpNtlmAuth( - "{}\\{}".format(self.__config.workstation_name, self.__config.username), + f"{self.__config.workstation_name}\\{self.__config.username}", self.__config.password, ) @@ -153,14 +153,14 @@ def get_auth_credentials(self): def requests_get(self, url_http: str, url_https: str, content_type: str) -> Any: try: - LOGGER.info("Request to Report URL={}".format(url_https)) + LOGGER.info(f"Request to Report URL={url_https}") response = requests.get( url=url_https, auth=self.get_auth_credentials, verify=True, ) except ConnectionError: - LOGGER.info("Request to Report URL={}".format(url_http)) + LOGGER.info(f"Request to Report URL={url_http}") response = requests.get( url=url_http, auth=self.get_auth_credentials, @@ -406,7 +406,7 @@ def to_datahub_user(self, user: CorpUser) -> List[MetadataChangeProposalWrapper] """ user_mcps = [] if user: - LOGGER.info("Converting user {} to datahub's user".format(user.username)) + LOGGER.info(f"Converting user {user.username} to datahub's user") # Create an URN for User user_urn = builder.make_user_urn(user.get_urn_part()) @@ -449,7 +449,7 @@ def to_datahub_user(self, user: CorpUser) -> List[MetadataChangeProposalWrapper] def to_datahub_work_units(self, report: Report) -> List[EquableMetadataWorkUnit]: mcps = [] user_mcps = [] - LOGGER.info("Converting Dashboard={} to DataHub Dashboard".format(report.name)) + LOGGER.info(f"Converting Dashboard={report.name} to DataHub Dashboard") # Convert user to CorpUser user_info = report.user_info.owner_to_add if user_info: diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server_domain.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server_domain.py index ee87d93774b3dc..b65ae5cd2994cc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server_domain.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server_domain.py @@ -39,10 +39,10 @@ def validate_diplay_name(cls, value, values): # noqa: N805 return "" def get_urn_part(self): - return "reports.{}".format(self.id) + return f"reports.{self.id}" def get_web_url(self, base_reports_url: str) -> str: - return "{}powerbi{}".format(base_reports_url, self.path) + return f"{base_reports_url}powerbi{self.path}" def get_browse_path( self, base_folder: str, workspace: str, env: str, report_directory: str @@ -57,7 +57,7 @@ class DataSet(CatalogItem): query_execution_time_out: int = Field(alias="QueryExecutionTimeOut") def get_urn_part(self): - return "datasets.{}".format(self.id) + return f"datasets.{self.id}" def __members(self): return (self.id,) @@ -339,7 +339,7 @@ class CorpUser(BaseModel): global_tags: Optional[GlobalTags] = Field(None, alias="globalTags") def get_urn_part(self): - return 
"{}".format(self.username) + return f"{self.username}" def __members(self): return (self.username,) diff --git a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py index 00a49cd897d6fa..7671e239284305 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py +++ b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py @@ -53,7 +53,7 @@ logger = logging.getLogger(__name__) -class PulsarTopic(object): +class PulsarTopic: __slots__ = ["topic_parts", "fullname", "type", "tenant", "namespace", "topic"] def __init__(self, topic): @@ -65,7 +65,7 @@ def __init__(self, topic): self.topic = topic_parts[5] -class PulsarSchema(object): +class PulsarSchema: __slots__ = [ "schema_version", "schema_name", diff --git a/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/qlik_api.py b/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/qlik_api.py index 66a18873d86df3..d7a040ff5f0a03 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/qlik_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/qlik_api.py @@ -36,7 +36,7 @@ def __init__(self, config: QlikSourceConfig) -> None: ) self.rest_api_url = f"https://{self.config.tenant_hostname}/api/v1" # Test connection by fetching list of api keys - logger.info("Trying to connect to {}".format(self.rest_api_url)) + logger.info(f"Trying to connect to {self.rest_api_url}") self.session.get(f"{self.rest_api_url}/api-keys").raise_for_status() def _log_http_error(self, message: str) -> Any: diff --git a/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/qlik_sense.py b/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/qlik_sense.py index a5b9adae0376c9..b9fd2a9c4fe221 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/qlik_sense.py +++ b/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/qlik_sense.py @@ -112,7 +112,7 @@ class QlikSenseSource(StatefulIngestionSourceBase, TestableSource): platform: str = "qlik-sense" def __init__(self, config: QlikSourceConfig, ctx: PipelineContext): - super(QlikSenseSource, self).__init__(config, ctx) + super().__init__(config, ctx) self.config = config self.reporter = QlikSourceReport() try: diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py index 797b309f528cc5..2c7ebb613c57a4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py @@ -94,7 +94,7 @@ def build( db_schemas: Dict[str, Dict[str, RedshiftSchema]], ) -> None: # Assume things not in `all_tables` as temp tables. - self.known_urns = set( + self.known_urns = { DatasetUrn.create_from_ids( self.platform, f"{db}.{schema}.{table.name}", @@ -104,7 +104,7 @@ def build( for db, schemas in all_tables.items() for schema, tables in schemas.items() for table in tables - ) + } self.aggregator.is_temp_table = lambda urn: urn not in self.known_urns # Handle all the temp tables up front. 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py index 55e340e2850d55..921ab275642505 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py @@ -111,9 +111,9 @@ def check_path_specs_and_infer_platform( raise ValueError("path_specs must not be empty") # Check that all path specs have the same platform. - guessed_platforms = set( + guessed_platforms = { "s3" if path_spec.is_s3 else "file" for path_spec in path_specs - ) + } if len(guessed_platforms) > 1: raise ValueError( f"Cannot have multiple platforms in path_specs: {guessed_platforms}" diff --git a/metadata-ingestion/src/datahub/ingestion/source/salesforce.py b/metadata-ingestion/src/datahub/ingestion/source/salesforce.py index b25f67d6f5ef13..946fdcedc571f8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/salesforce.py +++ b/metadata-ingestion/src/datahub/ingestion/source/salesforce.py @@ -353,7 +353,7 @@ def get_custom_object_details(self, sObjectDeveloperName: str) -> dict: self.base_url + "tooling/query/?q=SELECT Description, Language, ManageableState, " + "CreatedDate, CreatedBy.Username, LastModifiedDate, LastModifiedBy.Username " - + "FROM CustomObject where DeveloperName='{0}'".format(sObjectDeveloperName) + + f"FROM CustomObject where DeveloperName='{sObjectDeveloperName}'" ) custom_objects_response = self.sf._call_salesforce("GET", query_url).json() if len(custom_objects_response["records"]) > 0: @@ -656,7 +656,7 @@ def get_schema_metadata_workunit( + "Precision, Scale, Length, Digits ,FieldDefinition.IsIndexed, IsUnique," + "IsCompound, IsComponent, ReferenceTo, FieldDefinition.ComplianceGroup," + "RelationshipName, IsNillable, FieldDefinition.Description, InlineHelpText " - + "FROM EntityParticle WHERE EntityDefinitionId='{0}'".format( + + "FROM EntityParticle WHERE EntityDefinitionId='{}'".format( sObject["DurableId"] ) ) @@ -665,16 +665,14 @@ def get_schema_metadata_workunit( "GET", sObject_fields_query_url ).json() - logger.debug( - "Received Salesforce {sObject} fields response".format(sObject=sObjectName) - ) + logger.debug(f"Received Salesforce {sObjectName} fields response") sObject_custom_fields_query_url = ( self.base_url + "tooling/query?q=SELECT " + "DeveloperName,CreatedDate,CreatedBy.Username,InlineHelpText," + "LastModifiedDate,LastModifiedBy.Username " - + "FROM CustomField WHERE EntityDefinitionId='{0}'".format( + + "FROM CustomField WHERE EntityDefinitionId='{}'".format( sObject["DurableId"] ) ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/schema/json_schema.py b/metadata-ingestion/src/datahub/ingestion/source/schema/json_schema.py index c7e8a15d8dfa48..635e894d18c7e5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/schema/json_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/schema/json_schema.py @@ -212,7 +212,7 @@ def _load_json_schema(filename, loader, use_id_as_base_uri): """Loads the given schema file""" path = Path(filename).resolve() base_path = dirname(str(path)) - base_uri = "file://{}/".format(base_path) + base_uri = f"file://{base_path}/" with open(path) as schema_file: logger.info(f"Opening file {path}") @@ -243,7 +243,7 @@ def stringreplaceloader(match_string, replace_string, uri, **kwargs): return jsonref.jsonloader(uri, **kwargs) def __init__(self, ctx: PipelineContext, config: JsonSchemaSourceConfig): - super(JsonSchemaSource, self).__init__(ctx=ctx, config=config) + 
super().__init__(ctx=ctx, config=config) self.config = config self.report = StaleEntityRemovalSourceReport() diff --git a/metadata-ingestion/src/datahub/ingestion/source/sigma/sigma_api.py b/metadata-ingestion/src/datahub/ingestion/source/sigma/sigma_api.py index c335bee15931db..c2c28419ebcfd3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sigma/sigma_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sigma/sigma_api.py @@ -24,7 +24,7 @@ def __init__(self, config: SigmaSourceConfig) -> None: self.users: Dict[str, str] = {} self.session = requests.Session() # Test connection by generating access token - logger.info("Trying to connect to {}".format(self.config.api_url)) + logger.info(f"Trying to connect to {self.config.api_url}") self._generate_token() def _generate_token(self): diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py index 71e5bae5e9b76d..e8b56a01944ad2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py @@ -332,7 +332,7 @@ def _map_user_counts( and self.config.email_domain and user_count["user_name"] ): - user_email = "{0}@{1}".format( + user_email = "{}@{}".format( user_count["user_name"], self.config.email_domain ).lower() if not user_email or not self.config.user_email_pattern.allowed( diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py index af8d8824a4b172..5708b9f168c51f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py @@ -37,7 +37,7 @@ def get_connection(self) -> SnowflakeConnection: class SnowflakeQueryMixin: def query(self: SnowflakeQueryProtocol, query: str) -> Any: try: - self.logger.debug("Query : {}".format(query)) + self.logger.debug(f"Query : {query}") resp = self.get_connection().cursor(DictCursor).execute(query) return resp diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 9344e030d749f2..25626d434f2ef4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -366,7 +366,7 @@ class SnowflakePrivilege: object_type: str def query(query): - logger.info("Query : {}".format(query)) + logger.info(f"Query : {query}") resp = conn.cursor().execute(query) return resp diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py b/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py index 84c1d3844a7b48..b2c40f914bddc6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py @@ -286,7 +286,7 @@ def get_view_names(self, connection, schema=None, **kw): # when reflecting schema for multiple tables at once. 
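The Salesforce, JSON-schema, Sigma, Snowflake, and ClickHouse hunks above belong to a diff-wide migration from percent formatting and str.format() to f-strings. A short side-by-side sketch with invented values:

user_name = "jdoe"
email_domain = "example.com"
query = "SELECT 1"

old_percent = "%s@%s" % (user_name, email_domain)
old_format = "{}@{}".format(user_name, email_domain).lower()
new_fstring = f"{user_name}@{email_domain}".lower()
debug_line = f"Query : {query}"

assert old_format == new_fstring
print(old_percent, debug_line)

Note that logger.debug("Query : {}".format(query)) already formatted the message eagerly, so switching it to an f-string changes readability, not behavior; the lazy-logging spelling would be logger.debug("Query : %s", query), which neither version uses.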
     @reflection.cache  # type: ignore
     def _get_schema_column_info(self, connection, schema=None, **kw):
-        schema_clause = "database = '{schema}'".format(schema=schema) if schema else "1"
+        schema_clause = f"database = '{schema}'" if schema else "1"
         all_columns = defaultdict(list)
         result = connection.execute(
             text(
@@ -346,7 +346,7 @@ def _get_column_info(self, name, format_type, comment):
     @reflection.cache  # type: ignore
     def get_columns(self, connection, table_name, schema=None, **kw):
         if not schema:
-            query = "DESCRIBE TABLE {}".format(self._quote_table_name(table_name))
+            query = f"DESCRIBE TABLE {self._quote_table_name(table_name)}"
             cols = self._execute(connection, query)
         else:
             cols = self._get_clickhouse_columns(connection, table_name, schema, **kw)
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
index 003732236ba80c..95ce534968df5d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
@@ -74,7 +74,9 @@ def dbapi_get_columns_patched(self, connection, table_name, schema=None, **kw):
             coltype = _type_map[col_type]
         except KeyError:
             util.warn(
-                "Did not recognize type '%s' of column '%s'" % (col_type, col_name)
+                "Did not recognize type '{}' of column '{}'".format(
+                    col_type, col_name
+                )
             )
             coltype = types.NullType  # type: ignore
         result.append(
@@ -112,7 +114,7 @@ def get_view_definition_patched(self, connection, view_name, schema=None, **kw):
         self.identifier_preparer.quote_identifier(schema),
         self.identifier_preparer.quote_identifier(view_name),
     )
-    row = connection.execute("SHOW CREATE TABLE {}".format(full_table)).fetchone()
+    row = connection.execute(f"SHOW CREATE TABLE {full_table}").fetchone()
     return row[0]
 
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py b/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py
index 0a67d6228e6dbc..dcc1340c81d7b7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py
@@ -226,13 +226,13 @@ def get_columns(
             col.default_on_null,
             (
                 SELECT id.generation_type || ',' || id.IDENTITY_OPTIONS
-                FROM DBA_TAB_IDENTITY_COLS%(dblink)s id
+                FROM DBA_TAB_IDENTITY_COLS{dblink} id
                 WHERE col.table_name = id.table_name
                 AND col.column_name = id.column_name
                 AND col.owner = id.owner
-            ) AS identity_options""" % {
-                "dblink": dblink
-            }
+            ) AS identity_options""".format(
+                dblink=dblink
+            )
         else:
             identity_cols = "NULL as default_on_null, NULL as identity_options"
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
index 59819db8b2dc9a..30917915518277 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -326,7 +326,7 @@ class SQLAlchemySource(StatefulIngestionSourceBase, TestableSource):
     """A Base class for all SQL Sources that use SQLAlchemy to extend"""
 
     def __init__(self, config: SQLCommonConfig, ctx: PipelineContext, platform: str):
-        super(SQLAlchemySource, self).__init__(config, ctx)
+        super().__init__(config, ctx)
         self.config = config
         self.platform = platform
         self.report: SQLSourceReport = SQLSourceReport()
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py
index 16655d17482872..f45147223b8881 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py
@@ -210,8 +210,7 @@ def gen_lineage(
             ).as_workunit()
         ]
 
-        for wu in lineage_workunits:
-            yield wu
+        yield from lineage_workunits
 
 
 # downgrade a schema field
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py b/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py
index e1c47acbc4b874..c79af147808748 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py
@@ -86,7 +86,7 @@
 register_custom_type(datatype.JSON, RecordTypeClass)
 
 
-@functools.lru_cache()
+@functools.lru_cache
 def gen_catalog_connector_dict(engine: Engine) -> Dict[str, str]:
     query = dedent(
         """
@@ -473,7 +473,7 @@ def _parse_struct_fields(parts):
         "type": "record",
         "name": "__struct_{}".format(str(uuid.uuid4()).replace("-", "")),
         "fields": fields,
-        "native_data_type": "ROW({})".format(parts),
+        "native_data_type": f"ROW({parts})",
     }
 
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py b/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py
index 738cc7e3217640..7534f1295c5283 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py
@@ -123,7 +123,7 @@ def clean_host_port(cls, v):
 class VerticaSource(SQLAlchemySource):
     def __init__(self, config: VerticaConfig, ctx: PipelineContext):
         # self.platform = platform
-        super(VerticaSource, self).__init__(config, ctx, "vertica")
+        super().__init__(config, ctx, "vertica")
         self.report: SQLSourceReport = VerticaSourceReport()
         self.config: VerticaConfig = config
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
index e0b442387d3b66..1d44fb6122a362 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -924,8 +924,7 @@ def get_connection_objects(
             offset += count
 
-            for obj in connection_objects.get(c.NODES) or []:
-                yield obj
+            yield from connection_objects.get(c.NODES) or []
 
     def emit_workbooks(self) -> Iterable[MetadataWorkUnit]:
         if self.tableau_project_registry:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py
index 140698a6c4b107..c99fe3b09c5bb5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py
@@ -332,7 +332,7 @@ def _get_table_info(self, schema_name: str, table_name: str) -> dict:
                     properties[col_name] = data_type.strip()
                 else:  # col_name == "", data_type is not None
-                    prop_name = "{} {}".format(active_heading, data_type.rstrip())
+                    prop_name = f"{active_heading} {data_type.rstrip()}"
                     properties[prop_name] = value.rstrip()
         except Exception as e:
             self.report.report_warning(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py
index f3aeb34002f3f0..f1f0b5ddb44755 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py
@@ -163,7 +163,7 @@ def get_report(self) -> UnityCatalogReport:
         return self.report
 
     def __init__(self, ctx: PipelineContext, config: UnityCatalogSourceConfig):
-        super(UnityCatalogSource, self).__init__(config, ctx)
+        super().__init__(config, ctx)
         self.config = config
         self.report: UnityCatalogReport = UnityCatalogReport()
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py
index 1e949affd17668..8ef61ab9679e63 100644
--- a/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py
@@ -42,9 +42,9 @@ def _get_current_owner_urns(self, entity_urn: str) -> Set[str]:
         if self.ctx.graph is not None:
             current_ownership = self.ctx.graph.get_ownership(entity_urn=entity_urn)
             if current_ownership is not None:
-                current_owner_urns: Set[str] = set(
-                    [owner.owner for owner in current_ownership.owners]
-                )
+                current_owner_urns: Set[str] = {
+                    owner.owner for owner in current_ownership.owners
+                }
                 return current_owner_urns
             else:
                 return set()
diff --git a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py
index f76d145a870432..94501b0d499b75 100644
--- a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py
+++ b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py
@@ -769,9 +769,7 @@ def make_dataset_urn_from_sqlalchemy_uri(
             )
             return None
         schema_name = (
-            schema_name
-            if exclude_dbname
-            else "{}.{}".format(url_instance.database, schema_name)
+            schema_name if exclude_dbname else f"{url_instance.database}.{schema_name}"
         )
     elif data_platform == "mssql":
         schema_name = schema_name or "dbo"
@@ -781,9 +779,7 @@ def make_dataset_urn_from_sqlalchemy_uri(
             )
             return None
         schema_name = (
-            schema_name
-            if exclude_dbname
-            else "{}.{}".format(url_instance.database, schema_name)
+            schema_name if exclude_dbname else f"{url_instance.database}.{schema_name}"
        )
     elif data_platform in ["trino", "snowflake"]:
         if schema_name is None or url_instance.database is None:
@@ -804,9 +800,7 @@ def make_dataset_urn_from_sqlalchemy_uri(
         if database_name.endswith(f"/{schema_name}"):
             database_name = database_name[: -len(f"/{schema_name}")]
         schema_name = (
-            schema_name
-            if exclude_dbname
-            else "{}.{}".format(database_name, schema_name)
+            schema_name if exclude_dbname else f"{database_name}.{schema_name}"
         )
 
     elif data_platform == "bigquery":
@@ -817,7 +811,7 @@ def make_dataset_urn_from_sqlalchemy_uri(
                 )
             )
             return None
-        schema_name = "{}.{}".format(url_instance.host, url_instance.database)
+        schema_name = f"{url_instance.host}.{url_instance.database}"
 
     schema_name = schema_name or url_instance.database
     if schema_name is None:
@@ -853,7 +847,7 @@ class DecimalEncoder(json.JSONEncoder):
     def default(self, o):
         if isinstance(o, Decimal):
             return str(o)
-        return super(DecimalEncoder, self).default(o)
+        return super().default(o)
 
 
 def convert_to_string(var: Any) -> str:
diff --git a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
index 5e2e510533af14..ae5d83c2dfc943 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
@@ -81,7 +81,7 @@ def includes_temp_tables(self) -> bool:
         return False
 
     def get_urns(self) -> Set[str]:
-        return set(k for k, v in self._schema_cache.items() if v is not None)
+        return {k for k, v in self._schema_cache.items() if v is not None}
 
     def schema_count(self) -> int:
         return int(
diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
index 911ab7136ed10f..c112f5b74ac510 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
@@ -919,8 +919,8 @@ def _sqlglot_lineage_inner(
     # TODO: Can we generate a common WHERE clauses section?
 
     # Convert TableName to urns.
-    in_urns = sorted(set(table_name_urn_mapping[table] for table in tables))
-    out_urns = sorted(set(table_name_urn_mapping[table] for table in modified))
+    in_urns = sorted({table_name_urn_mapping[table] for table in tables})
+    out_urns = sorted({table_name_urn_mapping[table] for table in modified})
     column_lineage_urns = None
     if column_lineage:
         column_lineage_urns = [
diff --git a/metadata-ingestion/src/datahub/telemetry/telemetry.py b/metadata-ingestion/src/datahub/telemetry/telemetry.py
index 08df9e80ecf290..69a790b3d9bc76 100644
--- a/metadata-ingestion/src/datahub/telemetry/telemetry.py
+++ b/metadata-ingestion/src/datahub/telemetry/telemetry.py
@@ -174,7 +174,7 @@ def update_config(self) -> bool:
                     indent=2,
                 )
             return True
-        except IOError as x:
+        except OSError as x:
            if x.errno == errno.ENOENT:
                 logger.debug(
                     f"{CONFIG_FILE} does not exist and could not be created. Please check permissions on the parent folder."
@@ -215,12 +215,12 @@ def load_config(self) -> bool:
         """
 
         try:
-            with open(CONFIG_FILE, "r") as f:
+            with open(CONFIG_FILE) as f:
                 config = json.load(f)
                 self.client_id = config["client_id"]
                 self.enabled = config["enabled"] & ENV_ENABLED
                 return True
-        except IOError as x:
+        except OSError as x:
             if x.errno == errno.ENOENT:
                 logger.debug(
                     f"{CONFIG_FILE} does not exist and could not be created. Please check permissions on the parent folder."
diff --git a/metadata-ingestion/src/datahub/utilities/file_backed_collections.py b/metadata-ingestion/src/datahub/utilities/file_backed_collections.py
index d264a3970fdde4..bb2b827dc06c31 100644
--- a/metadata-ingestion/src/datahub/utilities/file_backed_collections.py
+++ b/metadata-ingestion/src/datahub/utilities/file_backed_collections.py
@@ -15,6 +15,7 @@
     Any,
     Callable,
     Dict,
+    Final,
     Generic,
     Iterator,
     List,
@@ -28,8 +29,6 @@
     Union,
 )
 
-from typing_extensions import Final
-
 from datahub.ingestion.api.closeable import Closeable
 
 logger: logging.Logger = logging.getLogger(__name__)
diff --git a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
index 7c0f26706ebfa6..447587bea8c407 100644
--- a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
+++ b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
@@ -71,10 +71,8 @@ def _parse_datatype_string(
             parts = HiveColumnToAvroConverter._ignore_brackets_split(s[4:-1], ",")
             if len(parts) != 2:
                 raise ValueError(
-                    (
-                        "The map type string format is: 'map<key_type,value_type>', "
-                        + f"but got: {s}"
-                    )
+                    "The map type string format is: 'map<key_type,value_type>', "
+                    + f"but got: {s}"
                 )
 
             kt = HiveColumnToAvroConverter._parse_datatype_string(parts[0])
@@ -126,10 +124,8 @@ def _parse_struct_fields_string(s: str, **kwargs: Any) -> Dict[str, object]:
             )
             if len(name_and_type) != 2:
                 raise ValueError(
-                    (
-                        "The struct field string format is: 'field_name:field_type', "
-                        + f"but got: {part}"
-                    )
+                    "The struct field string format is: 'field_name:field_type', "
+                    + f"but got: {part}"
                 )
 
             field_name = name_and_type[0].strip()
diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py
index e79bbbe995aae0..26511d9e5df1a9 100644
--- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py
+++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py
@@ -55,7 +55,7 @@ def test_bigquery_v2_ingest(
     tmp_path,
 ):
     test_resources_dir = pytestconfig.rootpath / "tests/integration/bigquery_v2"
-    mcp_golden_path = "{}/bigquery_mcp_golden.json".format(test_resources_dir)
+    mcp_golden_path = f"{test_resources_dir}/bigquery_mcp_golden.json"
     mcp_output_path = "{}/{}".format(tmp_path, "bigquery_mcp_output.json")
 
     get_datasets_for_project_id.return_value = [
diff --git a/metadata-ingestion/tests/integration/dbt/test_dbt.py b/metadata-ingestion/tests/integration/dbt/test_dbt.py
index 5f7d65f5b23773..941315fcfa9d5e 100644
--- a/metadata-ingestion/tests/integration/dbt/test_dbt.py
+++ b/metadata-ingestion/tests/integration/dbt/test_dbt.py
@@ -232,13 +232,13 @@ def test_dbt_ingest(
     config: DbtTestConfig = dbt_test_config
     test_resources_dir = pytestconfig.rootpath / "tests/integration/dbt"
 
-    with open(test_resources_dir / "dbt_manifest.json", "r") as f:
+    with open(test_resources_dir / "dbt_manifest.json") as f:
         requests_mock.get("http://some-external-repo/dbt_manifest.json", text=f.read())
 
-    with open(test_resources_dir / "dbt_catalog.json", "r") as f:
+    with open(test_resources_dir / "dbt_catalog.json") as f:
         requests_mock.get("http://some-external-repo/dbt_catalog.json", text=f.read())
 
-    with open(test_resources_dir / "dbt_sources.json", "r") as f:
+    with open(test_resources_dir / "dbt_sources.json") as f:
         requests_mock.get("http://some-external-repo/dbt_sources.json", text=f.read())
 
     config.set_paths(
diff --git a/metadata-ingestion/tests/integration/git/test_git_clone.py b/metadata-ingestion/tests/integration/git/test_git_clone.py
index cf1f649825e0c0..773e84cbf7488b 100644
--- a/metadata-ingestion/tests/integration/git/test_git_clone.py
+++ b/metadata-ingestion/tests/integration/git/test_git_clone.py
@@ -123,15 +123,13 @@ def test_git_clone_private(tmp_path):
         branch="d380a2b777ec6f4653626f39c68dba85893faa74",
     )
     assert checkout_dir.exists()
-    assert set(os.listdir(checkout_dir)) == set(
-        [
-            ".datahub",
-            "models",
-            "README.md",
-            ".github",
-            ".git",
-            "views",
-            "manifest_lock.lkml",
-            "manifest.lkml",
-        ]
-    )
+    assert set(os.listdir(checkout_dir)) == {
+        ".datahub",
+        "models",
+        "README.md",
+        ".github",
+        ".git",
+        "views",
+        "manifest_lock.lkml",
+        "manifest.lkml",
+    }
diff --git a/metadata-ingestion/tests/integration/iceberg/test_iceberg.py b/metadata-ingestion/tests/integration/iceberg/test_iceberg.py
index a9ab43169405de..24a636077bfdde 100644
--- a/metadata-ingestion/tests/integration/iceberg/test_iceberg.py
+++ b/metadata-ingestion/tests/integration/iceberg/test_iceberg.py
@@ -31,9 +31,7 @@ def remove_docker_image():
 def spark_submit(file_path: str, args: str = "") -> None:
     docker = "docker"
     command = f"{docker} exec spark-iceberg spark-submit {file_path} {args}"
-    ret = subprocess.run(
-        command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
-    )
+    ret = subprocess.run(command, shell=True, capture_output=True)
     assert ret.returncode == 0
 
diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
index a2015eb06b5699..26f3d50c1167bc 100644
--- a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
+++ b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
@@ -88,9 +88,7 @@ def test_resources_dir(pytestconfig):
 def loaded_kafka_connect(kafka_connect_runner):
     # # Setup mongo cluster
     command = "docker exec test_mongo mongosh test_db -f /scripts/mongo-init.js"
-    ret = subprocess.run(
-        command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
-    )
+    ret = subprocess.run(command, shell=True, capture_output=True)
     assert ret.returncode == 0
 
     # Creating MySQL source with no transformations , only topic prefix
@@ -298,9 +296,7 @@ def loaded_kafka_connect(kafka_connect_runner):
     assert r.status_code == 201  # Created
 
     command = "docker exec test_mongo mongosh test_db -f /scripts/mongo-populate.js"
-    ret = subprocess.run(
-        command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
-    )
+    ret = subprocess.run(command, shell=True, capture_output=True)
     assert ret.returncode == 0
 
     # Creating S3 Sink source
diff --git a/metadata-ingestion/tests/integration/kafka/test_kafka_state.py b/metadata-ingestion/tests/integration/kafka/test_kafka_state.py
index 6dfc0427f76c15..24e81fbf128b01 100644
--- a/metadata-ingestion/tests/integration/kafka/test_kafka_state.py
+++ b/metadata-ingestion/tests/integration/kafka/test_kafka_state.py
@@ -40,9 +40,9 @@ def create_kafka_topics(self, topics: List[NewTopic]) -> None:
         for topic, f in fs.items():
             try:
                 f.result()  # The result itself is None
-                print("Topic {} created".format(topic))
+                print(f"Topic {topic} created")
             except Exception as e:
-                print("Failed to create topic {}: {}".format(topic, e))
+                print(f"Failed to create topic {topic}: {e}")
                 raise e
 
     def delete_kafka_topics(self, topics: List[str]) -> None:
@@ -60,11 +60,11 @@ def delete_kafka_topics(self, topics: List[str]) -> None:
         for topic, f in fs.items():
             try:
                 f.result()  # The result itself is None
-                print("Topic {} deleted".format(topic))
print("Topic {} deleted".format(topic)) + print(f"Topic {topic} deleted") except Exception as e: # this error should be ignored when we already deleted # the topic within the test code - print("Failed to delete topic {}: {}".format(topic, e)) + print(f"Failed to delete topic {topic}: {e}") def __enter__(self): topics = [ diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index 5e0973a007f3ac..1c1f0fec3eebb9 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -158,7 +158,7 @@ def test_lookml_explore_refinement(pytestconfig, tmp_path, mock_time): {"name": "+book", "extends__all": [["order"]]}, {"name": "+book", "extends__all": [["transaction"]]}, ], - connection=str(), + connection="", resolved_includes=[], includes=[], ) diff --git a/metadata-ingestion/tests/integration/metabase/test_metabase.py b/metadata-ingestion/tests/integration/metabase/test_metabase.py index edb23c1fb7a1ce..b39550f3d048a2 100644 --- a/metadata-ingestion/tests/integration/metabase/test_metabase.py +++ b/metadata-ingestion/tests/integration/metabase/test_metabase.py @@ -68,7 +68,7 @@ def get(self, url): def raise_for_status(self): if self.error_list is not None and self.url in self.error_list: - http_error_msg = "%s Client Error: %s for url: %s" % ( + http_error_msg = "{} Client Error: {} for url: {}".format( 400, "Simulate error", self.url, diff --git a/metadata-ingestion/tests/integration/mode/test_mode.py b/metadata-ingestion/tests/integration/mode/test_mode.py index cfd9751ab9f155..def7277494fe7e 100644 --- a/metadata-ingestion/tests/integration/mode/test_mode.py +++ b/metadata-ingestion/tests/integration/mode/test_mode.py @@ -51,7 +51,7 @@ def get(self, url): def raise_for_status(self): if self.error_list is not None and self.url in self.error_list: - http_error_msg = "%s Client Error: %s for url: %s" % ( + http_error_msg = "{} Client Error: {} for url: {}".format( 400, "Simulate error", self.url, diff --git a/metadata-ingestion/tests/integration/oracle/common.py b/metadata-ingestion/tests/integration/oracle/common.py index c2591bd1d5b0d5..79dbda8c30f896 100644 --- a/metadata-ingestion/tests/integration/oracle/common.py +++ b/metadata-ingestion/tests/integration/oracle/common.py @@ -212,7 +212,7 @@ def get_recipe_sink(self, output_path: str) -> dict: } def get_output_mce_path(self): - return "{}/{}".format(self.tmp_path, self.mces_output_file_name) + return f"{self.tmp_path}/{self.mces_output_file_name}" def get_mock_data_impl(self): return self.default_mock_data diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py index 7b8441a1a81505..30c4b2bec3a049 100644 --- a/metadata-ingestion/tests/integration/snowflake/common.py +++ b/metadata-ingestion/tests/integration/snowflake/common.py @@ -230,7 +230,7 @@ def default_query_results( # noqa: C901 return [ { "TABLE_SCHEMA": "TEST_SCHEMA", - "TABLE_NAME": "TABLE_{}".format(tbl_idx), + "TABLE_NAME": f"TABLE_{tbl_idx}", "TABLE_TYPE": "BASE TABLE", "CREATED": datetime(2021, 6, 8, 0, 0, 0, 0), "LAST_ALTERED": datetime(2021, 6, 8, 0, 0, 0, 0), @@ -245,7 +245,7 @@ def default_query_results( # noqa: C901 return [ { "schema_name": "TEST_SCHEMA", - "name": "VIEW_{}".format(view_idx), + "name": f"VIEW_{view_idx}", "created_on": datetime(2021, 6, 8, 0, 0, 0, 0), "comment": "Comment for View", "text": f"create view view_{view_idx} as select * 
from table_{view_idx}", @@ -257,13 +257,13 @@ def default_query_results( # noqa: C901 elif query in [ *[ SnowflakeQuery.columns_for_table( - "TABLE_{}".format(tbl_idx), "TEST_SCHEMA", "TEST_DB" + f"TABLE_{tbl_idx}", "TEST_SCHEMA", "TEST_DB" ) for tbl_idx in range(1, num_tables + 1) ], *[ SnowflakeQuery.columns_for_table( - "VIEW_{}".format(view_idx), "TEST_SCHEMA", "TEST_DB" + f"VIEW_{view_idx}", "TEST_SCHEMA", "TEST_DB" ) for view_idx in range(1, num_views + 1) ], @@ -273,7 +273,7 @@ def default_query_results( # noqa: C901 # "TABLE_CATALOG": "TEST_DB", # "TABLE_SCHEMA": "TEST_SCHEMA", # "TABLE_NAME": "TABLE_{}".format(tbl_idx), - "COLUMN_NAME": "COL_{}".format(col_idx), + "COLUMN_NAME": f"COL_{col_idx}", "ORDINAL_POSITION": col_idx, "IS_NULLABLE": "NO", "DATA_TYPE": "TEXT" if col_idx > 1 else "NUMBER", @@ -317,7 +317,7 @@ def default_query_results( # noqa: C901 [ { "columns": [ - {"columnId": 0, "columnName": "COL_{}".format(col_idx)} + {"columnId": 0, "columnName": f"COL_{col_idx}"} for col_idx in range(1, num_cols + 1) ], "objectDomain": "Table", @@ -326,7 +326,7 @@ def default_query_results( # noqa: C901 }, { "columns": [ - {"columnId": 0, "columnName": "COL_{}".format(col_idx)} + {"columnId": 0, "columnName": f"COL_{col_idx}"} for col_idx in range(1, num_cols + 1) ], "objectDomain": "Table", @@ -335,7 +335,7 @@ def default_query_results( # noqa: C901 }, { "columns": [ - {"columnId": 0, "columnName": "COL_{}".format(col_idx)} + {"columnId": 0, "columnName": f"COL_{col_idx}"} for col_idx in range(1, num_cols + 1) ], "objectDomain": "Table", @@ -348,7 +348,7 @@ def default_query_results( # noqa: C901 [ { "columns": [ - {"columnId": 0, "columnName": "COL_{}".format(col_idx)} + {"columnId": 0, "columnName": f"COL_{col_idx}"} for col_idx in range(1, num_cols + 1) ], "objectDomain": "Table", @@ -357,7 +357,7 @@ def default_query_results( # noqa: C901 }, { "columns": [ - {"columnId": 0, "columnName": "COL_{}".format(col_idx)} + {"columnId": 0, "columnName": f"COL_{col_idx}"} for col_idx in range(1, num_cols + 1) ], "objectDomain": "Table", @@ -366,7 +366,7 @@ def default_query_results( # noqa: C901 }, { "columns": [ - {"columnId": 0, "columnName": "COL_{}".format(col_idx)} + {"columnId": 0, "columnName": f"COL_{col_idx}"} for col_idx in range(1, num_cols + 1) ], "objectDomain": "Table", @@ -381,10 +381,10 @@ def default_query_results( # noqa: C901 "columns": [ { "columnId": 0, - "columnName": "COL_{}".format(col_idx), + "columnName": f"COL_{col_idx}", "directSources": [ { - "columnName": "COL_{}".format(col_idx), + "columnName": f"COL_{col_idx}", "objectDomain": "Table", "objectId": 0, "objectName": "TEST_DB.TEST_SCHEMA.TABLE_2", @@ -395,7 +395,7 @@ def default_query_results( # noqa: C901 ], "objectDomain": "Table", "objectId": 0, - "objectName": "TEST_DB.TEST_SCHEMA.TABLE_{}".format(op_idx), + "objectName": f"TEST_DB.TEST_SCHEMA.TABLE_{op_idx}", } ] ), @@ -456,11 +456,11 @@ def default_query_results( # noqa: C901 ): return [ { - "DOWNSTREAM_TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_{}".format(op_idx), + "DOWNSTREAM_TABLE_NAME": f"TEST_DB.TEST_SCHEMA.TABLE_{op_idx}", "UPSTREAM_TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_2", "UPSTREAM_TABLE_COLUMNS": json.dumps( [ - {"columnId": 0, "columnName": "COL_{}".format(col_idx)} + {"columnId": 0, "columnName": f"COL_{col_idx}"} for col_idx in range(1, num_cols + 1) ] ), @@ -468,10 +468,10 @@ def default_query_results( # noqa: C901 [ { "columnId": 0, - "columnName": "COL_{}".format(col_idx), + "columnName": f"COL_{col_idx}", "directSources": [ { - 
"columnName": "COL_{}".format(col_idx), + "columnName": f"COL_{col_idx}", "objectDomain": "Table", "objectId": 0, "objectName": "TEST_DB.TEST_SCHEMA.TABLE_2", @@ -519,7 +519,7 @@ def default_query_results( # noqa: C901 return [ { - "DOWNSTREAM_TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_{}".format(op_idx), + "DOWNSTREAM_TABLE_NAME": f"TEST_DB.TEST_SCHEMA.TABLE_{op_idx}", "DOWNSTREAM_TABLE_DOMAIN": "TABLE", "UPSTREAM_TABLES": json.dumps( [ @@ -609,7 +609,7 @@ def default_query_results( # noqa: C901 ): return [ { - "DOWNSTREAM_TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_{}".format(op_idx), + "DOWNSTREAM_TABLE_NAME": f"TEST_DB.TEST_SCHEMA.TABLE_{op_idx}", "DOWNSTREAM_TABLE_DOMAIN": "TABLE", "UPSTREAM_TABLES": json.dumps( [ @@ -690,7 +690,7 @@ def default_query_results( # noqa: C901 "VIEW_DOMAIN": "VIEW", "VIEW_COLUMNS": json.dumps( [ - {"columnId": 0, "columnName": "COL_{}".format(col_idx)} + {"columnId": 0, "columnName": f"COL_{col_idx}"} for col_idx in range(1, num_cols + 1) ] ), @@ -699,10 +699,10 @@ def default_query_results( # noqa: C901 [ { "columnId": 0, - "columnName": "COL_{}".format(col_idx), + "columnName": f"COL_{col_idx}", "directSources": [ { - "columnName": "COL_{}".format(col_idx), + "columnName": f"COL_{col_idx}", "objectDomain": "Table", "objectId": 0, "objectName": "TEST_DB.TEST_SCHEMA.TABLE_2", diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py index 65c259e8acdc3d..9760ea1a9c72ba 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py @@ -169,7 +169,7 @@ def test_snowflake_list_columns_error_causes_pipeline_warning( default_query_results, [ SnowflakeQuery.columns_for_table( - "TABLE_{}".format(tbl_idx), "TEST_SCHEMA", "TEST_DB" + f"TABLE_{tbl_idx}", "TEST_SCHEMA", "TEST_DB" ) for tbl_idx in range(1, NUM_TABLES + 1) ], diff --git a/metadata-ingestion/tests/integration/sql_server/test_sql_server.py b/metadata-ingestion/tests/integration/sql_server/test_sql_server.py index f439a322c26771..4e9b4bee8ce6ba 100644 --- a/metadata-ingestion/tests/integration/sql_server/test_sql_server.py +++ b/metadata-ingestion/tests/integration/sql_server/test_sql_server.py @@ -23,9 +23,7 @@ def mssql_runner(docker_compose_runner, pytestconfig): # Run the setup.sql file to populate the database. 
command = "docker exec testsqlserver /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P 'test!Password' -d master -i /setup/setup.sql" - ret = subprocess.run( - command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) + ret = subprocess.run(command, shell=True, capture_output=True) assert ret.returncode == 0 yield docker_services diff --git a/metadata-ingestion/tests/test_helpers/mce_helpers.py b/metadata-ingestion/tests/test_helpers/mce_helpers.py index 563ccbee03c274..9ee4642bfe6eb3 100644 --- a/metadata-ingestion/tests/test_helpers/mce_helpers.py +++ b/metadata-ingestion/tests/test_helpers/mce_helpers.py @@ -174,20 +174,16 @@ def get_entity_urns(events_file: str) -> Set[str]: def _get_entity_urns(events_list: List[Dict]) -> Set[str]: entity_type = "dataset" # mce urns - mce_urns = set( - [ - _get_element(x, _get_mce_urn_path_spec(entity_type)) - for x in events_list - if _get_filter(mce=True, entity_type=entity_type)(x) - ] - ) - mcp_urns = set( - [ - _get_element(x, _get_mcp_urn_path_spec()) - for x in events_list - if _get_filter(mcp=True, entity_type=entity_type)(x) - ] - ) + mce_urns = { + _get_element(x, _get_mce_urn_path_spec(entity_type)) + for x in events_list + if _get_filter(mce=True, entity_type=entity_type)(x) + } + mcp_urns = { + _get_element(x, _get_mcp_urn_path_spec()) + for x in events_list + if _get_filter(mcp=True, entity_type=entity_type)(x) + } all_urns = mce_urns.union(mcp_urns) return all_urns @@ -268,20 +264,16 @@ def assert_for_each_entity( test_output = load_json_file(file) assert isinstance(test_output, list) # mce urns - mce_urns = set( - [ - _get_element(x, _get_mce_urn_path_spec(entity_type)) - for x in test_output - if _get_filter(mce=True, entity_type=entity_type)(x) - ] - ) - mcp_urns = set( - [ - _get_element(x, _get_mcp_urn_path_spec()) - for x in test_output - if _get_filter(mcp=True, entity_type=entity_type)(x) - ] - ) + mce_urns = { + _get_element(x, _get_mce_urn_path_spec(entity_type)) + for x in test_output + if _get_filter(mce=True, entity_type=entity_type)(x) + } + mcp_urns = { + _get_element(x, _get_mcp_urn_path_spec()) + for x in test_output + if _get_filter(mcp=True, entity_type=entity_type)(x) + } all_urns = mce_urns.union(mcp_urns) # there should not be any None urns assert None not in all_urns @@ -378,20 +370,16 @@ def assert_entity_urn_not_like(entity_type: str, regex_pattern: str, file: str) test_output = load_json_file(file) assert isinstance(test_output, list) # mce urns - mce_urns = set( - [ - _get_element(x, _get_mce_urn_path_spec(entity_type)) - for x in test_output - if _get_filter(mce=True, entity_type=entity_type)(x) - ] - ) - mcp_urns = set( - [ - _get_element(x, _get_mcp_urn_path_spec()) - for x in test_output - if _get_filter(mcp=True, entity_type=entity_type)(x) - ] - ) + mce_urns = { + _get_element(x, _get_mce_urn_path_spec(entity_type)) + for x in test_output + if _get_filter(mce=True, entity_type=entity_type)(x) + } + mcp_urns = { + _get_element(x, _get_mcp_urn_path_spec()) + for x in test_output + if _get_filter(mcp=True, entity_type=entity_type)(x) + } all_urns = mce_urns.union(mcp_urns) print(all_urns) matched_urns = [u for u in all_urns if re.match(regex_pattern, u)] @@ -406,20 +394,16 @@ def assert_entity_urn_like(entity_type: str, regex_pattern: str, file: str) -> i test_output = load_json_file(file) assert isinstance(test_output, list) # mce urns - mce_urns = set( - [ - _get_element(x, _get_mce_urn_path_spec(entity_type)) - for x in test_output - if _get_filter(mce=True, entity_type=entity_type)(x) - ] 
-    )
-    mcp_urns = set(
-        [
-            _get_element(x, _get_mcp_urn_path_spec())
-            for x in test_output
-            if _get_filter(mcp=True, entity_type=entity_type)(x)
-        ]
-    )
+    mce_urns = {
+        _get_element(x, _get_mce_urn_path_spec(entity_type))
+        for x in test_output
+        if _get_filter(mce=True, entity_type=entity_type)(x)
+    }
+    mcp_urns = {
+        _get_element(x, _get_mcp_urn_path_spec())
+        for x in test_output
+        if _get_filter(mcp=True, entity_type=entity_type)(x)
+    }
     all_urns = mce_urns.union(mcp_urns)
     print(all_urns)
     matched_urns = [u for u in all_urns if re.match(regex_pattern, u)]
diff --git a/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py b/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py
index f28c7167ca3198..d995404ad69a53 100644
--- a/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py
+++ b/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py
@@ -201,7 +201,7 @@ def test_auto_browse_path_v2_by_container_hierarchy(telemetry_ping_mock):
     assert paths["i"] == _make_container_browse_path_entries(["one", "a"])
 
     # Check urns emitted on demand -- not all at end
-    for urn in set(wu.get_urn() for wu in new_wus):
+    for urn in {wu.get_urn() for wu in new_wus}:
         try:
             idx = next(
                 i
diff --git a/metadata-ingestion/tests/unit/config/test_config_loader.py b/metadata-ingestion/tests/unit/config/test_config_loader.py
index f9a4076e18363d..25ee289ec4e4e7 100644
--- a/metadata-ingestion/tests/unit/config/test_config_loader.py
+++ b/metadata-ingestion/tests/unit/config/test_config_loader.py
@@ -52,7 +52,7 @@
             "VAR1": "stuff1",
             "VAR2": "stuff2",
         },
-        set(["VAR1", "UNSET_VAR3", "VAR2"]),
+        {"VAR1", "UNSET_VAR3", "VAR2"},
     ),
     (
         "tests/unit/config/complex_variable_expansion.yml",
@@ -107,22 +107,20 @@
             "VAR10": "stuff10",
             "VAR11": "stuff11",
         },
-        set(
-            [
-                "VAR1",
-                "VAR2",
-                "VAR3",
-                "VAR4",
-                "VAR5",
-                "VAR6",
-                "VAR7",
-                "VAR8",
-                "VAR9",
-                "VAR10",
-                # VAR11 is escaped and hence not referenced
-                "VARNONEXISTENT",
-            ]
-        ),
+        {
+            "VAR1",
+            "VAR2",
+            "VAR3",
+            "VAR4",
+            "VAR5",
+            "VAR6",
+            "VAR7",
+            "VAR8",
+            "VAR9",
+            "VAR10",
+            # VAR11 is escaped and hence not referenced
+            "VARNONEXISTENT",
+        },
     ),
 ],
 )
diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py
index 783b0fe18b29ab..50d9b86b3a0171 100644
--- a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py
+++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py
@@ -73,7 +73,7 @@ class DummySource(StatefulIngestionSourceBase):
     reporter: DummySourceReport
 
     def __init__(self, config: DummySourceConfig, ctx: PipelineContext):
-        super(DummySource, self).__init__(config, ctx)
+        super().__init__(config, ctx)
         self.source_config = config
         self.reporter = DummySourceReport()
         # Create and register the stateful ingestion use-case handler.
diff --git a/metadata-ingestion/tests/unit/test_pipeline.py b/metadata-ingestion/tests/unit/test_pipeline.py
index 194a396edb3108..bcc0f73a5c9677 100644
--- a/metadata-ingestion/tests/unit/test_pipeline.py
+++ b/metadata-ingestion/tests/unit/test_pipeline.py
@@ -29,7 +29,7 @@
 pytestmark = pytest.mark.random_order(disabled=True)
 
 
-class TestPipeline(object):
+class TestPipeline:
     @patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits", autospec=True)
     @patch("datahub.ingestion.sink.console.ConsoleSink.close", autospec=True)
     @freeze_time(FROZEN_TIME)
diff --git a/metadata-ingestion/tests/unit/test_snowflake_shares.py b/metadata-ingestion/tests/unit/test_snowflake_shares.py
index 9e33ba6132e069..fc753f99b7e8f6 100644
--- a/metadata-ingestion/tests/unit/test_snowflake_shares.py
+++ b/metadata-ingestion/tests/unit/test_snowflake_shares.py
@@ -284,7 +284,7 @@ def test_snowflake_shares_workunit_outbound_share(
             ]
         entity_urns.add(wu.get_urn())
 
-    assert len((entity_urns)) == 6
+    assert len(entity_urns) == 6
 
 
 def test_snowflake_shares_workunit_inbound_and_outbound_share(
diff --git a/metadata-ingestion/tests/unit/utilities/test_advanced_thread_executor.py b/metadata-ingestion/tests/unit/utilities/test_advanced_thread_executor.py
index ae4616c604a61f..7b51c18a85c5f6 100644
--- a/metadata-ingestion/tests/unit/utilities/test_advanced_thread_executor.py
+++ b/metadata-ingestion/tests/unit/utilities/test_advanced_thread_executor.py
@@ -77,12 +77,12 @@ def test_backpressure_aware_executor_simple():
     def task(i):
         return i
 
-    assert set(
+    assert {
         res.result()
         for res in BackpressureAwareExecutor.map(
             task, ((i,) for i in range(10)), max_workers=2
         )
-    ) == set(range(10))
+    } == set(range(10))
 
 
 def test_backpressure_aware_executor_advanced():
@@ -119,7 +119,7 @@ def task(x, y):
     assert 2 <= len(executed) <= 4
 
     # Finally, consume the rest of the results.
-    assert set(r.result() for r in results) == {
+    assert {r.result() for r in results} == {
         i for i in range(10) if i != first_result.result()
     }
diff --git a/metadata-ingestion/tests/unit/utilities/test_ratelimiter.py b/metadata-ingestion/tests/unit/utilities/test_ratelimiter.py
index 0384e1f9188812..bc915e21389a77 100644
--- a/metadata-ingestion/tests/unit/utilities/test_ratelimiter.py
+++ b/metadata-ingestion/tests/unit/utilities/test_ratelimiter.py
@@ -8,7 +8,7 @@ def test_rate_is_limited():
     MAX_CALLS_PER_SEC = 5
     TOTAL_CALLS = 18
-    actual_calls: Dict[float, int] = defaultdict(lambda: 0)
+    actual_calls: Dict[float, int] = defaultdict(int)
     ratelimiter = RateLimiter(max_calls=MAX_CALLS_PER_SEC, period=1)
 
     for _ in range(TOTAL_CALLS):