chore(ingest): run pyupgrade for python 3.8 (datahub-project#10513)
hsheth2 authored May 16, 2024
1 parent bc9250c commit 3d5735c
Showing 97 changed files with 297 additions and 350 deletions.
@@ -80,9 +80,9 @@ def __post_init__(self):
)

def generate_ownership_aspect(self):
owners = set([builder.make_user_urn(owner) for owner in self.owners]) | set(
[builder.make_group_urn(owner) for owner in self.group_owners]
)
owners = {builder.make_user_urn(owner) for owner in self.owners} | {
builder.make_group_urn(owner) for owner in self.group_owners
}
ownership = OwnershipClass(
owners=[
OwnerClass(
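The hunk above is pyupgrade's rewrite of `set([...])` into a set comprehension. A minimal sketch of the equivalence, using stand-in helpers rather than the real `builder` functions:

# Both expressions build the same set; the comprehension skips the intermediate list.
owners = ["alice", "bob"]
group_owners = ["eng"]

def make_user_urn(name: str) -> str:   # stand-in for builder.make_user_urn
    return f"urn:li:corpuser:{name}"

def make_group_urn(name: str) -> str:  # stand-in for builder.make_group_urn
    return f"urn:li:corpGroup:{name}"

old_style = set([make_user_urn(o) for o in owners]) | set(
    [make_group_urn(o) for o in group_owners]
)
new_style = {make_user_urn(o) for o in owners} | {
    make_group_urn(o) for o in group_owners
}
assert old_style == new_style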
@@ -70,9 +70,9 @@ def __post_init__(self):
)

def generate_ownership_aspect(self) -> Iterable[OwnershipClass]:
owners = set([builder.make_user_urn(owner) for owner in self.owners]) | set(
[builder.make_group_urn(owner) for owner in self.group_owners]
)
owners = {builder.make_user_urn(owner) for owner in self.owners} | {
builder.make_group_urn(owner) for owner in self.group_owners
}
ownership = OwnershipClass(
owners=[
OwnerClass(
@@ -276,7 +276,7 @@ def from_yaml(
cls,
file: Path,
graph: DataHubGraph,
) -> "DataProduct":
) -> DataProduct:
with open(file) as fp:
yaml = YAML(typ="rt") # default, if not specified, is 'rt' (round-trip)
orig_dictionary = yaml.load(fp)
@@ -291,7 +291,7 @@ def from_yaml(
return parsed_data_product

@classmethod
def from_datahub(cls, graph: DataHubGraph, id: str) -> "DataProduct":
def from_datahub(cls, graph: DataHubGraph, id: str) -> DataProduct:
data_product_properties: Optional[
DataProductPropertiesClass
] = graph.get_aspect(id, DataProductPropertiesClass)
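These hunks drop the quotes around the self-referencing return annotations. That rewrite is only safe when the annotation is never evaluated at import time, typically because the module uses deferred annotations (PEP 563); the `from __future__ import annotations` line is assumed here, since it is not shown in the hunk. A small illustrative sketch under that assumption:

from __future__ import annotations  # makes annotations lazy strings, so no quotes needed

from pathlib import Path

class DataProduct:
    @classmethod
    def from_yaml(cls, file: Path) -> DataProduct:  # unquoted self-reference
        # Illustrative stub; the real method parses a YAML file into a DataProduct.
        return cls()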
@@ -384,7 +384,7 @@ def _patch_ownership(
patches_drop[i] = o

# Figure out what if any are new owners to add
new_owners_to_add = set(o for o in new_owner_type_map) - set(owners_matched)
new_owners_to_add = {o for o in new_owner_type_map} - set(owners_matched)
if new_owners_to_add:
for new_owner in new_owners_to_add:
new_owner_type = new_owner_type_map[new_owner]
@@ -242,7 +242,7 @@ def generate_mcp(

if self.schema_metadata:
if self.schema_metadata.file:
with open(self.schema_metadata.file, "r") as schema_fp:
with open(self.schema_metadata.file) as schema_fp:
schema_string = schema_fp.read()
schema_metadata = SchemaMetadataClass(
schemaName=self.name or self.id or self.urn or "",
@@ -377,8 +377,7 @@ def generate_mcp(
type="COPY",
)
)
for patch_event in patch_builder.build():
yield patch_event
yield from patch_builder.build()

logger.info(f"Created dataset {self.urn}")

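Two idioms show up in this file: `open(path)` already defaults to text-mode read, so the explicit "r" is redundant, and `yield from` replaces a loop whose body only re-yields. A self-contained sketch with made-up file contents and event names:

import os
import tempfile
from typing import Iterable, Iterator

def read_schema(path: str) -> str:
    with open(path) as fp:  # open() defaults to mode="r", text
        return fp.read()

def emit_all(events: Iterable[str]) -> Iterator[str]:
    yield from events  # equivalent to: for e in events: yield e

with tempfile.NamedTemporaryFile("w", suffix=".avsc", delete=False) as tmp:
    tmp.write('{"type": "string"}')

print(read_schema(tmp.name))
print(list(emit_all(["patch-event-1", "patch-event-2"])))
os.unlink(tmp.name)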
6 changes: 3 additions & 3 deletions metadata-ingestion/src/datahub/api/entities/forms/forms.py
@@ -106,7 +106,7 @@ def create(file: str) -> None:
emitter: DataHubGraph

with get_default_graph() as emitter:
with open(file, "r") as fp:
with open(file) as fp:
forms: List[dict] = yaml.safe_load(fp)
for form_raw in forms:
form = Forms.parse_obj(form_raw)
@@ -204,7 +204,7 @@ def validate_prompts(self, emitter: DataHubGraph) -> List[FormPromptClass]:
def upload_entities_for_form(self, emitter: DataHubGraph) -> Union[None, Exception]:
if self.entities and self.entities.urns:
formatted_entity_urns = ", ".join(
['"{}"'.format(value) for value in self.entities.urns]
[f'"{value}"' for value in self.entities.urns]
)
query = UPLOAD_ENTITIES_FOR_FORMS.format(
form_urn=self.urn, entity_urns=formatted_entity_urns
@@ -281,7 +281,7 @@ def add_owners(self, emitter: DataHubGraph) -> Union[None, Exception]:

@staticmethod
def format_form_filter(field: str, urns: List[str]) -> str:
formatted_urns = ", ".join(['"{}"'.format(urn) for urn in urns])
formatted_urns = ", ".join([f'"{urn}"' for urn in urns])
return FIELD_FILTER_TEMPLATE.format(field=field, values=formatted_urns)

@staticmethod
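The `"{}".format(...)` to f-string rewrites in this file produce identical output. A tiny sketch with illustrative URN values:

urns = ["urn:li:dataset:one", "urn:li:dataset:two"]  # illustrative values
old = ", ".join(['"{}"'.format(u) for u in urns])
new = ", ".join([f'"{u}"' for u in urns])
assert old == new == '"urn:li:dataset:one", "urn:li:dataset:two"'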
@@ -98,7 +98,7 @@ def create(file: str) -> None:
emitter: DataHubGraph

with get_default_graph() as emitter:
with open(file, "r") as fp:
with open(file) as fp:
structuredproperties: List[dict] = yaml.safe_load(fp)
for structuredproperty_raw in structuredproperties:
structuredproperty = StructuredProperties.parse_obj(
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/config_utils.py
@@ -84,7 +84,7 @@ def ensure_datahub_config() -> None:


def get_client_config(as_dict: bool = False) -> Union[Optional[DatahubConfig], dict]:
with open(DATAHUB_CONFIG_PATH, "r") as stream:
with open(DATAHUB_CONFIG_PATH) as stream:
try:
config_json = yaml.safe_load(stream)
if as_dict:
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/docker_check.py
@@ -203,7 +203,7 @@ def check_docker_quickstart() -> QuickstartStatus:

all_containers = set()
for config_file in config_files:
with open(config_file, "r") as config_file:
with open(config_file) as config_file:
all_containers.update(
yaml.safe_load(config_file).get("services", {}).keys()
)
12 changes: 5 additions & 7 deletions metadata-ingestion/src/datahub/cli/docker_cli.py
@@ -76,7 +76,7 @@ class Architectures(Enum):
m2 = "m2"


@functools.lru_cache()
@functools.lru_cache
def _docker_subprocess_env() -> Dict[str, str]:
# platform.machine() is equivalent to `uname -m`, as per https://stackoverflow.com/a/45124927/5004662
DOCKER_COMPOSE_PLATFORM: str = "linux/" + platform.machine()
@@ -316,16 +316,15 @@ def _restore(
assert os.path.exists(
resolved_restore_file
), f"File {resolved_restore_file} does not exist"
with open(resolved_restore_file, "r") as fp:
with open(resolved_restore_file) as fp:
result = subprocess.run(
[
"bash",
"-c",
f"docker exec -i {DOCKER_COMPOSE_PROJECT_NAME}-mysql-1 bash -c 'mysql -uroot -pdatahub datahub '",
],
stdin=fp,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
capture_output=True,
)
if result.returncode != 0:
logger.error("Failed to run MySQL restore")
@@ -381,7 +380,7 @@ def _restore(
)
env_fp.flush()
if logger.isEnabledFor(logging.DEBUG):
with open(env_fp.name, "r") as env_fp_reader:
with open(env_fp.name) as env_fp_reader:
logger.debug(f"Env file contents: {env_fp_reader.read()}")

# continue to issue the restore indices command
@@ -401,8 +400,7 @@
+ "acryldata/datahub-upgrade:${DATAHUB_VERSION:-head}"
+ " -u RestoreIndices -a clean",
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
capture_output=True,
)
logger.info(
f"Index restore command finished with status {result.returncode}"
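Two Python 3.8-era idioms appear in this file: `functools.lru_cache` works as a bare decorator (no call parentheses) since 3.8, and `subprocess.run(..., capture_output=True)` is shorthand for setting both `stdout` and `stderr` to `subprocess.PIPE`. A small sketch, not the actual docker commands:

import functools
import subprocess
import sys

@functools.lru_cache  # bare decorator is valid on Python 3.8+
def expensive_lookup(key: str) -> str:
    return key.upper()

# capture_output=True is equivalent to stdout=subprocess.PIPE, stderr=subprocess.PIPE
result = subprocess.run(
    [sys.executable, "-c", "print('hello')"],
    capture_output=True,
    text=True,
)
assert result.stdout.strip() == "hello"
assert expensive_lookup("restore") == "RESTORE"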
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/ingest_cli.py
@@ -588,6 +588,6 @@ def rollback(
for row in unsafe_entities:
writer.writerow([row.get("urn")])

except IOError as e:
except OSError as e:
logger.exception(f"Unable to save rollback failure report: {e}")
sys.exit(f"Unable to write reports to {report_dir}")
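Since Python 3.3, `IOError` is just an alias of `OSError`, so catching `OSError` is the modern spelling with identical behavior. A tiny sketch with an illustrative path:

assert IOError is OSError  # alias since Python 3.3

try:
    open("/nonexistent/report.csv", "w")  # illustrative path; directory does not exist
except OSError as e:
    print(f"Unable to save rollback failure report: {e}")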
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/cli/quickstart_versioning.py
@@ -55,7 +55,7 @@ def fetch_quickstart_config(cls) -> "QuickstartVersionMappingConfig":
"LOCAL_QUICKSTART_MAPPING_FILE is set, will try to read from local file."
)
path = os.path.expanduser(LOCAL_QUICKSTART_MAPPING_FILE)
with open(path, "r") as f:
with open(path) as f:
config_raw = yaml.safe_load(f)
return cls.parse_obj(config_raw)

@@ -70,7 +70,7 @@ def fetch_quickstart_config(cls) -> "QuickstartVersionMappingConfig":
)
try:
path = os.path.expanduser(DEFAULT_LOCAL_CONFIG_PATH)
with open(path, "r") as f:
with open(path) as f:
config_raw = yaml.safe_load(f)
except Exception:
logger.debug("Couldn't read from local file either.")
15 changes: 13 additions & 2 deletions metadata-ingestion/src/datahub/configuration/common.py
@@ -2,13 +2,24 @@
import unittest.mock
from abc import ABC, abstractmethod
from enum import auto
from typing import IO, Any, ClassVar, Dict, List, Optional, Type, TypeVar, Union
from typing import (
IO,
Any,
ClassVar,
Dict,
List,
Optional,
Type,
TypeVar,
Union,
runtime_checkable,
)

import pydantic
from cached_property import cached_property
from pydantic import BaseModel, Extra, ValidationError
from pydantic.fields import Field
from typing_extensions import Protocol, runtime_checkable
from typing_extensions import Protocol

from datahub.configuration._config_enum import ConfigEnum
from datahub.configuration.pydantic_migration_helpers import PYDANTIC_VERSION_2
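This hunk moves `runtime_checkable` from `typing_extensions` to `typing`, where it has lived since Python 3.8. A minimal sketch of a runtime-checkable protocol; the protocol and class names are illustrative, not this module's:

from typing import Protocol, runtime_checkable  # standard library since Python 3.8

@runtime_checkable
class SupportsAsObj(Protocol):  # illustrative protocol
    def as_obj(self) -> dict: ...

class Report:
    def as_obj(self) -> dict:
        return {"events": 0}

# runtime_checkable enables structural isinstance() checks against the protocol
assert isinstance(Report(), SupportsAsObj)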
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/configuration/git.py
@@ -101,7 +101,7 @@ def deploy_key_filled_from_deploy_key_file(
if v is None:
deploy_key_file = values.get("deploy_key_file")
if deploy_key_file is not None:
with open(deploy_key_file, "r") as fp:
with open(deploy_key_file) as fp:
deploy_key = SecretStr(fp.read())
return deploy_key
return v
6 changes: 3 additions & 3 deletions metadata-ingestion/src/datahub/configuration/source_common.py
@@ -10,9 +10,9 @@
DEFAULT_ENV = FabricTypeClass.PROD

# Get all the constants from the FabricTypeClass. It's not an enum, so this is a bit hacky but works.
ALL_ENV_TYPES: Set[str] = set(
[value for name, value in vars(FabricTypeClass).items() if not name.startswith("_")]
)
ALL_ENV_TYPES: Set[str] = {
value for name, value in vars(FabricTypeClass).items() if not name.startswith("_")
}


class PlatformInstanceConfigMixin(ConfigModel):
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/emitter/request_helper.py
@@ -25,4 +25,4 @@ def make_curl_command(
),
url,
]
return " ".join(shlex.quote(fragment) for fragment in fragments)
return shlex.join(fragments)
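`shlex.join` was added in Python 3.8 and is equivalent to quoting each fragment and joining with spaces, which is exactly what the removed line did. A small sketch with an illustrative command:

import shlex

fragments = ["curl", "-X", "POST", "--data", '{"key": "a value"}']  # illustrative
old = " ".join(shlex.quote(fragment) for fragment in fragments)
new = shlex.join(fragments)
assert old == new
print(new)  # curl -X POST --data '{"key": "a value"}'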
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/api/report.py
@@ -5,12 +5,12 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional
from typing import Any, Optional, runtime_checkable

import humanfriendly
import pydantic
from pydantic import BaseModel
from typing_extensions import Literal, Protocol, runtime_checkable
from typing_extensions import Literal, Protocol

from datahub.ingestion.api.report_helpers import format_datetime_relative
from datahub.utilities.lossy_collections import LossyList
@@ -15,7 +15,7 @@ def title_swapping_callback(self: JsonRef) -> dict:
try:
base_doc = self.loader(uri)
except Exception as e:
raise self._error("%s: %s" % (e.__class__.__name__, str(e)), cause=e) from e
raise self._error(f"{e.__class__.__name__}: {str(e)}", cause=e) from e
base_doc = _replace_refs(
base_doc, **{**self._ref_kwargs, "base_uri": uri, "recursing": False}
)
@@ -436,8 +436,7 @@ def gen_items_from_list_tuple_or_scalar(
val: Any,
) -> Iterable[avro.schema.Schema]:
if isinstance(val, (list, tuple)):
for i in val:
yield i
yield from val
else:
yield val

9 changes: 4 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -324,6 +324,7 @@ def get_ownership(self, entity_urn: str) -> Optional[OwnershipClass]:
def get_schema_metadata(self, entity_urn: str) -> Optional[SchemaMetadataClass]:
return self.get_aspect(entity_urn=entity_urn, aspect_type=SchemaMetadataClass)

@deprecated(reason="Use get_aspect directly.")
def get_domain_properties(self, entity_urn: str) -> Optional[DomainPropertiesClass]:
return self.get_aspect(entity_urn=entity_urn, aspect_type=DomainPropertiesClass)

@@ -343,11 +344,9 @@ def get_glossary_terms(self, entity_urn: str) -> Optional[GlossaryTermsClass]:
def get_domain(self, entity_urn: str) -> Optional[DomainsClass]:
return self.get_aspect(entity_urn=entity_urn, aspect_type=DomainsClass)

@deprecated(reason="Use get_aspect directly.")
def get_browse_path(self, entity_urn: str) -> Optional[BrowsePathsClass]:
return self.get_aspect(
entity_urn=entity_urn,
aspect_type=BrowsePathsClass,
)
return self.get_aspect(entity_urn=entity_urn, aspect_type=BrowsePathsClass)

def get_usage_aspects_from_urn(
self, entity_urn: str, start_timestamp: int, end_timestamp: int
@@ -1095,7 +1094,7 @@ def delete_references_to_urn(
related_aspects = response.get("relatedAspects", [])
return reference_count, related_aspects

@functools.lru_cache()
@functools.lru_cache
def _make_schema_resolver(
self,
platform: str,
@@ -1009,7 +1009,7 @@ def _transform_extraction(self) -> Iterable[MetadataWorkUnit]:
# in Glue, it's possible for two buckets to have files of different extensions
# if this happens, we append the extension in the URN so the sources can be distinguished
# see process_dataflow_node() for details
s3_formats: DefaultDict[str, Set[Optional[str]]] = defaultdict(lambda: set())
s3_formats: DefaultDict[str, Set[Optional[str]]] = defaultdict(set)
for dag in dags.values():
if dag is not None:
for s3_name, extension in self.get_dataflow_s3_names(dag):
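`defaultdict(set)` passes the `set` constructor directly as the default factory, so the `lambda: set()` wrapper is redundant. A small sketch with illustrative bucket and extension values:

from collections import defaultdict
from typing import DefaultDict, Optional, Set

s3_formats: DefaultDict[str, Set[Optional[str]]] = defaultdict(set)  # no lambda needed
s3_formats["my-bucket"].add("parquet")  # illustrative bucket/extension
s3_formats["my-bucket"].add(None)
assert s3_formats["other-bucket"] == set()  # missing keys get a fresh empty set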
@@ -1,6 +1,4 @@
from typing import Dict

from typing_extensions import Final
from typing import Dict, Final

from datahub.metadata.schema_classes import JobStatusClass

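`Final` has been part of the standard `typing` module since Python 3.8 (PEP 591), so the `typing_extensions` import can be folded into the `typing` one. A tiny sketch with an illustrative mapping, not this module's real constant:

from typing import Dict, Final

JOB_STATE_MAPPING: Final[Dict[str, str]] = {  # illustrative constant
    "DONE": "COMPLETED",
    "ERROR": "FAILED",
}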
@@ -223,7 +223,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
}

def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
super(BigqueryV2Source, self).__init__(config, ctx)
super().__init__(config, ctx)
self.config: BigQueryV2Config = config
self.report: BigQueryV2Report = BigQueryV2Report()
self.classification_handler = ClassificationHandler(self.config, self.report)
@@ -340,7 +340,7 @@ def metadata_read_capability_test(
) -> CapabilityReport:
for project_id in project_ids:
try:
logger.info((f"Metadata read capability test for project {project_id}"))
logger.info(f"Metadata read capability test for project {project_id}")
client: bigquery.Client = config.get_bigquery_client()
assert client
bigquery_data_dictionary = BigQuerySchemaApi(
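Inside a method, zero-argument `super()` resolves to the same thing as `super(CurrentClass, self)`, so the explicit arguments are redundant in Python 3. A minimal sketch with stand-in classes, not the real source implementations:

class StatefulIngestionSourceBase:  # stand-in for the real base class
    def __init__(self, config: dict, ctx: str) -> None:
        self.config = config
        self.ctx = ctx

class BigqueryV2Source(StatefulIngestionSourceBase):
    def __init__(self, ctx: str, config: dict) -> None:
        super().__init__(config, ctx)  # same as super(BigqueryV2Source, self).__init__(...)

src = BigqueryV2Source("pipeline-ctx", {"project": "demo"})
assert src.ctx == "pipeline-ctx"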
@@ -551,22 +551,20 @@ def lineage_via_catalog_lineage_api(
# Only builds lineage map when the table has upstreams
logger.debug("Found %d upstreams for table %s", len(upstreams), table)
if upstreams:
lineage_map[destination_table_str] = set(
[
LineageEdge(
table=str(
BigQueryTableRef(
table_identifier=BigqueryTableIdentifier.from_string_name(
source_table
)
lineage_map[destination_table_str] = {
LineageEdge(
table=str(
BigQueryTableRef(
table_identifier=BigqueryTableIdentifier.from_string_name(
source_table
)
),
column_mapping=frozenset(),
auditStamp=curr_date,
)
for source_table in upstreams
]
)
)
),
column_mapping=frozenset(),
auditStamp=curr_date,
)
for source_table in upstreams
}
return lineage_map
except Exception as e:
self.error(
