From d6bf41bfb0a35731b519313d68bd22f96e9ec3c2 Mon Sep 17 00:00:00 2001 From: Lazlo Westerhof Date: Wed, 11 Dec 2024 09:56:21 +0100 Subject: [PATCH] Fix Optional --- datarequest.py | 16 ++++++++-------- deposit.py | 4 ++-- folder.py | 14 +++++++------- groups.py | 4 ++-- mail.py | 4 ++-- meta.py | 18 +++++++++--------- meta_form.py | 6 +++--- policies.py | 3 +-- resources.py | 4 ++-- revisions.py | 4 ++-- schema.py | 10 +++++----- schema_transformation.py | 6 +++--- schema_transformations.py | 4 ++-- schema_transformations_utils.py | 7 +++---- settings.py | 4 ++-- setup.cfg | 4 ++-- vault.py | 16 ++++++++-------- vault_archive.py | 6 +++--- 18 files changed, 66 insertions(+), 68 deletions(-) diff --git a/datarequest.py b/datarequest.py index ec5b73896..d4f5a16fb 100644 --- a/datarequest.py +++ b/datarequest.py @@ -10,7 +10,7 @@ from collections import OrderedDict from datetime import datetime from enum import Enum -from typing import Dict, List, Optional +from typing import Dict, List import jsonschema from genquery import AS_DICT, AS_LIST, Query, row_iterator @@ -381,7 +381,7 @@ def get_status(stat: str) -> status: return datarequest_action_permitted(ctx, request_id, roles, statuses) -def datarequest_action_permitted(ctx: rule.Context, request_id: str, roles: List, statuses: Optional[List]) -> bool: +def datarequest_action_permitted(ctx: rule.Context, request_id: str, roles: List, statuses: List | None) -> bool: """Check if current user and data request status meet specified restrictions. :param ctx: Combined type of a callback and rei struct @@ -424,7 +424,7 @@ def datarequest_action_permitted(ctx: rule.Context, request_id: str, roles: List @api.make() -def api_datarequest_roles_get(ctx: rule.Context, request_id: Optional[str] = None) -> api.Result: +def api_datarequest_roles_get(ctx: rule.Context, request_id: str | None = None) -> api.Result: """Get roles of invoking user. :param ctx: Combined type of a callback and rei struct @@ -436,7 +436,7 @@ def api_datarequest_roles_get(ctx: rule.Context, request_id: Optional[str] = Non return datarequest_roles_get(ctx, request_id) -def datarequest_roles_get(ctx: rule.Context, request_id: Optional[str] = None) -> List: +def datarequest_roles_get(ctx: rule.Context, request_id: str | None = None) -> List: """Get roles of invoking user. :param ctx: Combined type of a callback and rei struct @@ -472,7 +472,7 @@ def datarequest_is_owner(ctx: rule.Context, request_id: str) -> bool: return datarequest_owner_get(ctx, request_id) == user.name(ctx) -def datarequest_owner_get(ctx: rule.Context, request_id: str) -> Optional[str]: +def datarequest_owner_get(ctx: rule.Context, request_id: str) -> str | None: """Get the account name (i.e. email address) of the owner of a data request. :param ctx: Combined type of a callback and a rei struct @@ -635,7 +635,7 @@ def datarequest_provenance_write(ctx: rule.Context, request_id: str, request_sta return api.Error("write_error", "Could not write timestamp to provenance log: {}.".format(e)) -def datarequest_data_valid(ctx: rule.Context, data: Dict, schema_name: Optional[str] = None, schema: Optional[str] = None) -> bool: +def datarequest_data_valid(ctx: rule.Context, data: Dict, schema_name: str | None = None, schema: str | None = None) -> bool: """Check if form data contains no errors Default mode of operation is to provide schema data and the schema name of the schema against @@ -676,7 +676,7 @@ def datarequest_data_valid(ctx: rule.Context, data: Dict, schema_name: Optional[ return False -def cc_email_addresses_get(contact_object: Dict) -> Optional[str]: +def cc_email_addresses_get(contact_object: Dict) -> str | None: try: cc = contact_object['cc_email_addresses'] return cc.replace(' ', '') @@ -890,7 +890,7 @@ def file_write_and_lock(ctx: rule.Context, coll_path: str, filename: str, data: @api.make() -def api_datarequest_submit(ctx: rule.Context, data: Dict, draft: bool, draft_request_id: Optional[str] = None) -> api.Result: +def api_datarequest_submit(ctx: rule.Context, data: Dict, draft: bool, draft_request_id: str | None = None) -> api.Result: """Persist a data request to disk. :param ctx: Combined type of a callback and rei struct diff --git a/deposit.py b/deposit.py index 0b7f5095f..efbc0aadc 100644 --- a/deposit.py +++ b/deposit.py @@ -5,7 +5,7 @@ import re from collections import OrderedDict -from typing import Dict, Optional +from typing import Dict import genquery from genquery import AS_DICT, Query @@ -99,7 +99,7 @@ def api_deposit_create(ctx: rule.Context, deposit_group: str) -> api.Result: return {"deposit_path": result["deposit_path"]} -def deposit_create(ctx: rule.Context, deposit_group: Optional[str]) -> Dict: +def deposit_create(ctx: rule.Context, deposit_group: str | None) -> Dict: """Create deposit collection. :param ctx: Combined type of a callback and rei struct diff --git a/folder.py b/folder.py index 506baa3d5..ec922b3b8 100644 --- a/folder.py +++ b/folder.py @@ -5,7 +5,7 @@ import time import uuid -from typing import List, Optional, Tuple +from typing import List, Tuple import genquery import irods_types @@ -480,7 +480,7 @@ def set_epic_pid(ctx: rule.Context, target: str) -> bool: return True -def get_cronjob_status(ctx: rule.Context, coll: str) -> Optional[str]: +def get_cronjob_status(ctx: rule.Context, coll: str) -> str | None: """Get the cronjob status of given collection""" iter = genquery.row_iterator( "META_COLL_ATTR_VALUE", @@ -676,7 +676,7 @@ def get_org_metadata(ctx: rule.Context, path: str, object_type: pathutil.ObjectT else " AND COLL_NAME = '{}'".format(path)))] -def get_locks(ctx: rule.Context, path: str, org_metadata: Optional[List[Tuple[str, str]]] = None, object_type: pathutil.ObjectType = pathutil.ObjectType.COLL) -> List[str]: +def get_locks(ctx: rule.Context, path: str, org_metadata: List[Tuple[str, str]] | None = None, object_type: pathutil.ObjectType = pathutil.ObjectType.COLL) -> List[str]: """Return all locks on a collection or data object (includes locks on parents and children).""" if org_metadata is None: org_metadata = get_org_metadata(ctx, path, object_type=object_type) @@ -700,12 +700,12 @@ def api_folder_get_locks(ctx: rule.Context, coll: str) -> api.Result: return locks -def has_locks(ctx: rule.Context, coll: str, org_metadata: Optional[List[Tuple[str, str]]] = None) -> bool: +def has_locks(ctx: rule.Context, coll: str, org_metadata: List[Tuple[str, str]] | None = None) -> bool: """Check whether a lock exists on the given collection, its parents or children.""" return len(get_locks(ctx, coll, org_metadata=org_metadata)) > 0 -def is_locked(ctx: rule.Context, coll: str, org_metadata: Optional[List[Tuple[str, str]]] = None) -> bool: +def is_locked(ctx: rule.Context, coll: str, org_metadata: List[Tuple[str, str]] | None = None) -> bool: """Check whether a lock exists on the given collection itself or a parent collection. Locks on subcollections are not counted. @@ -722,14 +722,14 @@ def is_locked(ctx: rule.Context, coll: str, org_metadata: Optional[List[Tuple[st return len([x for x in locks if coll.startswith(x)]) > 0 -def is_data_locked(ctx: rule.Context, path: str, org_metadata: Optional[List[Tuple[str, str]]] = None) -> bool: +def is_data_locked(ctx: rule.Context, path: str, org_metadata: List[Tuple[str, str]] | None = None) -> bool: """Check whether a lock exists on the given data object.""" locks = get_locks(ctx, path, org_metadata=org_metadata, object_type=pathutil.ObjectType.DATA) return len(locks) > 0 -def get_status(ctx: rule.Context, path: str, org_metadata: Optional[List[Tuple[str, str]]] = None) -> constants.research_package_state: +def get_status(ctx: rule.Context, path: str, org_metadata: List[Tuple[str, str]] | None = None) -> constants.research_package_state: """Get the status of a research folder.""" if org_metadata is None: org_metadata = get_org_metadata(ctx, path) diff --git a/groups.py b/groups.py index 094fe7131..7009dd074 100644 --- a/groups.py +++ b/groups.py @@ -6,7 +6,7 @@ import time from collections import OrderedDict from datetime import datetime -from typing import Any, Dict, Iterable, List, Optional, Tuple +from typing import Any, Dict, Iterable, List, Tuple import genquery import requests @@ -118,7 +118,7 @@ def getGroupsData(ctx: rule.Context) -> Iterable[Any]: return groups.values() -def getGroupData(ctx: rule.Context, name: str) -> Optional[Dict]: +def getGroupData(ctx: rule.Context, name: str) -> Dict | None: """Get data for one group.""" group = None diff --git a/mail.py b/mail.py index 0493906ad..07b0c04fc 100644 --- a/mail.py +++ b/mail.py @@ -7,14 +7,14 @@ import re import smtplib from email.mime.text import MIMEText -from typing import Optional, Tuple +from typing import Tuple from util import * __all__ = ['rule_mail_test'] -def send(ctx: rule.Context, to: str, actor: str, subject: str, body: str, cc: Optional[str] = None) -> api.Result: +def send(ctx: rule.Context, to: str, actor: str, subject: str, body: str, cc: str | None = None) -> api.Result: """Send an e-mail with specified recipient, subject and body. The originating address and mail server credentials are taken from the diff --git a/meta.py b/meta.py index 333114481..0be0620b6 100644 --- a/meta.py +++ b/meta.py @@ -7,7 +7,7 @@ import re from collections import OrderedDict from datetime import datetime -from typing import Dict, List, Optional +from typing import Dict, List import genquery import irods_types @@ -40,7 +40,7 @@ def metadata_get_links(metadata: Dict) -> List: metadata['links'])) -def metadata_get_schema_id(metadata: Dict) -> Optional[str]: +def metadata_get_schema_id(metadata: Dict) -> str | None: desc = list(filter(lambda x: x['rel'] == 'describedby', metadata_get_links(metadata))) if len(desc) > 0: return desc[0]['href'] @@ -58,8 +58,8 @@ def metadata_set_schema_id(metadata: Dict, schema_id: str) -> None: def get_json_metadata_errors(ctx: rule.Context, metadata_path: str, - metadata: Optional[Dict] = None, - schema: Optional[Dict] = None, + metadata: Dict | None = None, + schema: Dict | None = None, ignore_required: bool = False) -> List: """ Validate JSON metadata, and return a list of errors, if any. @@ -153,7 +153,7 @@ def transform_error(e): def is_json_metadata_valid(ctx: rule.Context, metadata_path: str, - metadata: Optional[Dict] = None, + metadata: Dict | None = None, ignore_required: bool = False) -> bool: """Check if json metadata contains no errors. @@ -177,7 +177,7 @@ def is_json_metadata_valid(ctx: rule.Context, return False -def get_collection_metadata_path(ctx: rule.Context, coll: str) -> Optional[str]: +def get_collection_metadata_path(ctx: rule.Context, coll: str) -> str | None: """Check if a collection has a JSON metadata file and provide its path, if any. :param ctx: Combined type of a callback and rei struct @@ -192,7 +192,7 @@ def get_collection_metadata_path(ctx: rule.Context, coll: str) -> Optional[str]: return None -def get_latest_vault_metadata_path(ctx: rule.Context, vault_pkg_coll: str) -> Optional[str]: +def get_latest_vault_metadata_path(ctx: rule.Context, vault_pkg_coll: str) -> str | None: """Get the latest vault metadata JSON file. :param ctx: Combined type of a callback and rei struct @@ -238,7 +238,7 @@ def rule_meta_validate(rule_args, callback, rei): rule_args[2] = 'metadata validated' -def collection_has_cloneable_metadata(ctx: rule.Context, coll: str) -> Optional[str]: +def collection_has_cloneable_metadata(ctx: rule.Context, coll: str) -> str | None: """Check if a collection has metadata, and validate it. This always ignores 'required' schema attributes, since metadata can @@ -786,7 +786,7 @@ def copy_user_metadata(ctx: rule.Context, source: str, target: str) -> None: log.write(ctx, "copy_user_metadata: failed to copy user metadata from <{}> to <{}/original>".format(source, target)) -def vault_metadata_matches_schema(ctx: rule.Context, coll_name: str, schema_cache: Dict, report_name: str, write_stdout: bool) -> Optional[Dict]: +def vault_metadata_matches_schema(ctx: rule.Context, coll_name: str, schema_cache: Dict, report_name: str, write_stdout: bool) -> Dict | None: """Process a single data package to retrieve and validate that its metadata conforms to the schema. :param ctx: Combined type of a callback and rei struct diff --git a/meta_form.py b/meta_form.py index 94f8fa4f0..b4573e9ca 100644 --- a/meta_form.py +++ b/meta_form.py @@ -4,7 +4,7 @@ __license__ = 'GPLv3, see LICENSE' import re -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Tuple import irods_types @@ -20,7 +20,7 @@ 'api_meta_form_save'] -def get_coll_lock(ctx: rule.Context, path: str, org_metadata: Optional[List] = None) -> Tuple[str, str]: +def get_coll_lock(ctx: rule.Context, path: str, org_metadata: List | None = None) -> Tuple[str, str]: """Check for existence of locks on a collection. path -> ((no|here|outoftree|ancestor|descendant), rootcoll) @@ -50,7 +50,7 @@ def get_coll_lock(ctx: rule.Context, path: str, org_metadata: Optional[List] = N return ret -def get_coll_lock_count(ctx: rule.Context, path: str, org_metadata: Optional[List] = None) -> int: +def get_coll_lock_count(ctx: rule.Context, path: str, org_metadata: List | None = None) -> int: """Count locks on a collection. :param ctx: Combined type of a callback and rei struct diff --git a/policies.py b/policies.py index 335ba5a6c..95340ed50 100644 --- a/policies.py +++ b/policies.py @@ -4,7 +4,6 @@ __license__ = 'GPLv3, see LICENSE' import re -from typing import Optional import session_vars @@ -641,7 +640,7 @@ def pep_resource_resolve_hierarchy_pre(ctx: rule.Context, operation: str, host: str, parser: str, - vote: str) -> Optional[str]: + vote: str) -> str | None: if not config.arb_enabled or operation != "CREATE": return None diff --git a/resources.py b/resources.py index c51b3e348..830ef953e 100644 --- a/resources.py +++ b/resources.py @@ -4,7 +4,7 @@ __license__ = 'GPLv3, see LICENSE' from datetime import datetime -from typing import Dict, List, Optional +from typing import Dict, List import genquery @@ -680,7 +680,7 @@ def get_groups_on_category(ctx: rule.Context, category: str) -> List: return groups -def get_group_data_sizes(ctx: rule.Context, group_name: str, ref_period: Optional[str] = None) -> List: +def get_group_data_sizes(ctx: rule.Context, group_name: str, ref_period: str | None = None) -> List: """Get group data sizes and return as a list of values. If no reference period is specified return closest to today. diff --git a/revisions.py b/revisions.py index db05a6b9b..1d533ec07 100644 --- a/revisions.py +++ b/revisions.py @@ -8,7 +8,7 @@ import random import re import time -from typing import Dict, Iterator, List, Optional, Tuple +from typing import Dict, Iterator, List, Tuple import genquery import irods_types @@ -538,7 +538,7 @@ def is_revision_blocked_by_admin(ctx: rule.Context) -> bool: return collection.exists(ctx, path) -def get_revision_store(ctx: rule.Context, group_name: str) -> Optional[str]: +def get_revision_store(ctx: rule.Context, group_name: str) -> str | None: """Get path to revision store for group if the path exists. :param ctx: Combined type of a callback and rei struct diff --git a/schema.py b/schema.py index 043b7f867..bfea19aa2 100644 --- a/schema.py +++ b/schema.py @@ -4,7 +4,7 @@ __license__ = 'GPLv3, see LICENSE' import re -from typing import Dict, Optional, Tuple +from typing import Dict, Tuple import genquery @@ -88,7 +88,7 @@ def get_schema_collection(ctx: rule.Context, rods_zone: str, group_name: str) -> return config.default_yoda_schema -def get_schema_id_from_group(ctx: rule.Context, group_name: str) -> Optional[str]: +def get_schema_id_from_group(ctx: rule.Context, group_name: str) -> str | None: """Returns the schema_id value that has been set on an iRODS group :param ctx: Combined type of a callback and rei struct @@ -184,14 +184,14 @@ def get_active_schema_id(ctx: rule.Context, path: str) -> str: return get_active_schema(ctx, path)['$id'] -def get_schema_id(ctx: rule.Context, metadata_path: str, metadata: Optional[Dict] = None) -> Optional[str]: +def get_schema_id(ctx: rule.Context, metadata_path: str, metadata: Dict | None = None) -> str | None: """Get the current schema id from a path to a metadata json.""" if metadata is None: metadata = jsonutil.read(ctx, metadata_path) return meta.metadata_get_schema_id(metadata) -def get_schema_path_by_id(ctx: rule.Context, path: str, schema_id: str) -> Optional[str]: +def get_schema_path_by_id(ctx: rule.Context, path: str, schema_id: str) -> str | None: """Get a schema path from a schema id.""" _, zone, _2, _3 = pathutil.info(path) @@ -204,7 +204,7 @@ def get_schema_path_by_id(ctx: rule.Context, path: str, schema_id: str) -> Optio return None -def get_schema_by_id(ctx: rule.Context, path: str, schema_id: str) -> Optional[Dict]: +def get_schema_by_id(ctx: rule.Context, path: str, schema_id: str) -> Dict | None: """ Get a schema from a schema id. diff --git a/schema_transformation.py b/schema_transformation.py index fb255cd39..0bcd322f5 100644 --- a/schema_transformation.py +++ b/schema_transformation.py @@ -13,7 +13,7 @@ import os import re import time -from typing import Callable, Dict, Optional +from typing import Callable, Dict import genquery import session_vars @@ -67,7 +67,7 @@ def api_transform_metadata(ctx: rule.Context, coll: str, keep_metadata_backup: b return api.Error('no_metadata', 'No metadata file found') -def get(ctx: rule.Context, metadata_path: str, metadata: Optional[Dict] = None) -> Optional[Callable]: +def get(ctx: rule.Context, metadata_path: str, metadata: Dict | None = None) -> Callable | None: """Find a transformation that can be executed on the given metadata JSON. :param ctx: Combined type of a ctx and rei struct @@ -343,7 +343,7 @@ def transform_orcid(ctx: rule.Context, m: Dict) -> Dict: return {'metadata': m, 'data_changed': data_changed} -def correctify_orcid(org_orcid: str) -> Optional[str]: +def correctify_orcid(org_orcid: str) -> str | None: """Function to correct illformatted ORCIDs. Returns None if value cannot be fixed.""" # Get rid of all spaces. orcid = org_orcid.replace(' ', '') diff --git a/schema_transformations.py b/schema_transformations.py index 079911c1a..82437679d 100644 --- a/schema_transformations.py +++ b/schema_transformations.py @@ -4,7 +4,7 @@ __license__ = 'GPLv3, see LICENSE' import re -from typing import Callable, Dict, Optional +from typing import Callable, Dict from schema_transformations_utils import correctify_isni, correctify_orcid, correctify_researcher_id, correctify_scopus @@ -717,7 +717,7 @@ def _teclab0_teclab1(ctx: rule.Context, m: Dict) -> Dict: # }}} -def get(src_id: str, dst_id: str) -> Optional[Callable]: +def get(src_id: str, dst_id: str) -> Callable | None: """ Get a transformation function that maps metadata from the given src schema id to the dst schema id. diff --git a/schema_transformations_utils.py b/schema_transformations_utils.py index 3f14335dd..d01df318b 100644 --- a/schema_transformations_utils.py +++ b/schema_transformations_utils.py @@ -4,10 +4,9 @@ __license__ = 'GPLv3, see LICENSE' import re -from typing import Optional -def correctify_orcid(org_orcid: str) -> Optional[str]: +def correctify_orcid(org_orcid: str) -> str | None: """Correct illformatted ORCID.""" # Get rid of all spaces. orcid = org_orcid.replace(' ', '') @@ -24,7 +23,7 @@ def correctify_orcid(org_orcid: str) -> Optional[str]: return "https://orcid.org/{}".format(orcs[-1]) -def correctify_scopus(org_scopus: str) -> Optional[str]: +def correctify_scopus(org_scopus: str) -> str | None: """Correct illformatted Scopus.""" # Get rid of all spaces. new_scopus = org_scopus.replace(' ', '') @@ -35,7 +34,7 @@ def correctify_scopus(org_scopus: str) -> Optional[str]: return new_scopus -def correctify_isni(org_isni: str) -> Optional[str]: +def correctify_isni(org_isni: str) -> str | None: """Correct ill-formatted ISNI.""" # Remove all spaces. new_isni = org_isni.replace(' ', '') diff --git a/settings.py b/settings.py index 827b85951..ba133eef3 100644 --- a/settings.py +++ b/settings.py @@ -3,7 +3,7 @@ __copyright__ = 'Copyright (c) 2021-2024, Utrecht University' __license__ = 'GPLv3, see LICENSE' -from typing import Dict, Optional, Sequence +from typing import Dict, Sequence from genquery import Query @@ -23,7 +23,7 @@ SETTINGS_KEY = constants.UUORGMETADATAPREFIX + "settings_" -def load(ctx: rule.Context, setting: str, username: Optional[str] = None) -> Sequence[str]: +def load(ctx: rule.Context, setting: str, username: str | None = None) -> Sequence[str]: """Load user setting. :param ctx: Combined type of a callback and rei struct diff --git a/setup.cfg b/setup.cfg index 08887bf8a..51c0fb4fe 100644 --- a/setup.cfg +++ b/setup.cfg @@ -8,7 +8,7 @@ exclude=__init__.py,tools,tests/env/ application-import-names=avu,conftest,util,api,config,constants,data_access_token,datacite,datarequest,data_object,epic,error,folder,groups,groups_import,json_datacite,json_landing_page,jsonutil,log,mail,meta,meta_form,msi,notifications,schema,schema_transformation,schema_transformations,settings,pathutil,provenance,policies_intake,policies_datamanager,policies_datapackage_status,policies_folder_status,policies_datarequest_status,publication,query,replication,revisions,revision_strategies,revision_utils,rule,user,vault,sram,arb_data_manager,cached_data_manager,resource,yoda_names,policies_utils [mypy] -exclude = tools|unit-tests|util +exclude = tools|unit-tests disable_error_code = arg-type, attr-defined, index, method-assign, misc, no-redef, operator, union-attr, unreachable, var-annotated ignore_missing_imports = True warn_unreachable = True @@ -17,6 +17,6 @@ check_untyped_defs = False disallow_any_generics = False disallow_incomplete_defs = True disallow_untyped_calls = False -disallow_untyped_defs = False +disallow_untyped_defs = True show_error_codes = True show_error_context = True diff --git a/vault.py b/vault.py index 90e7d50ff..d4bb203e0 100644 --- a/vault.py +++ b/vault.py @@ -8,7 +8,7 @@ import subprocess import time from datetime import datetime -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Tuple import genquery from dateutil import parser @@ -48,7 +48,7 @@ @api.make() -def api_vault_submit(ctx: rule.Context, coll: str, previous_version: Optional[str] = None) -> api.Result: +def api_vault_submit(ctx: rule.Context, coll: str, previous_version: str | None = None) -> api.Result: """Submit data package for publication. :param ctx: Combined type of a callback and rei struct @@ -522,7 +522,7 @@ def api_vault_system_metadata(ctx: rule.Context, coll: str) -> api.Result: return system_metadata -def get_coll_vault_status(ctx: rule.Context, path: str, org_metadata: Optional[List] = None) -> constants.vault_package_state: +def get_coll_vault_status(ctx: rule.Context, path: str, org_metadata: List | None = None) -> constants.vault_package_state: """Get the status of a vault folder.""" if org_metadata is None: org_metadata = folder.get_org_metadata(ctx, path) @@ -539,7 +539,7 @@ def get_coll_vault_status(ctx: rule.Context, path: str, org_metadata: Optional[L return constants.vault_package_state.EMPTY -def get_all_published_versions(ctx: rule.Context, path: str) -> Tuple[Optional[str], Optional[str], List]: +def get_all_published_versions(ctx: rule.Context, path: str) -> Tuple[str | None, str | None, List]: """Get all published versions of a data package.""" base_doi = get_doi(ctx, path, 'base') package_doi = get_doi(ctx, path) @@ -1276,7 +1276,7 @@ def vault_process_status_transitions(ctx: rule.Context, coll: str, new_coll_stat return ['Success', ''] -def vault_request_status_transitions(ctx: rule.Context, coll: str, new_vault_status: str, previous_version: Optional[str] = None) -> List: +def vault_request_status_transitions(ctx: rule.Context, coll: str, new_vault_status: str, previous_version: str | None = None) -> List: """Request vault status transition action. :param ctx: Combined type of a callback and rei struct @@ -1395,7 +1395,7 @@ def get_approver(ctx: rule.Context, path: str) -> str: return "" -def get_doi(ctx: rule.Context, path: str, doi: str = 'version') -> Optional[str]: +def get_doi(ctx: rule.Context, path: str, doi: str = 'version') -> str | None: """Get the DOI of a data package in the vault. :param ctx: Combined type of a callback and rei struct @@ -1419,7 +1419,7 @@ def get_doi(ctx: rule.Context, path: str, doi: str = 'version') -> Optional[str] return None -def get_previous_version(ctx: rule.Context, path: str) -> Optional[str]: +def get_previous_version(ctx: rule.Context, path: str) -> str | None: """Get the previous version of a data package in the vault. :param ctx: Combined type of a callback and rei struct @@ -1579,7 +1579,7 @@ def api_vault_get_published_packages(ctx: rule.Context, path: str) -> Dict: return published_packages -def update_archive(ctx: rule.Context, coll: str, attr: Optional[str] = None) -> None: +def update_archive(ctx: rule.Context, coll: str, attr: str | None = None) -> None: """Potentially update archive after metadata changed. :param ctx: Combined type of a callback and rei struct diff --git a/vault_archive.py b/vault_archive.py index 903a45749..3a0705520 100644 --- a/vault_archive.py +++ b/vault_archive.py @@ -5,7 +5,7 @@ import json import time -from typing import Dict, List, Optional +from typing import Dict, List import genquery import irods_types @@ -70,7 +70,7 @@ def key(item: Dict) -> int: return sorted(provenance_log, key=key) -def package_archive_path(ctx: rule.Context, coll: str) -> Optional[str]: +def package_archive_path(ctx: rule.Context, coll: str) -> str | None: for row in genquery.row_iterator("DATA_PATH", "COLL_NAME = '{}' AND DATA_NAME = 'archive.tar'".format(coll), genquery.AS_LIST, @@ -253,7 +253,7 @@ def vault_extract_archive(ctx: rule.Context, coll: str) -> str: return "Failure" -def update(ctx: rule.Context, coll: str, attr: Optional[str]) -> None: +def update(ctx: rule.Context, coll: str, attr: str | None) -> None: if pathutil.info(coll).space == pathutil.Space.VAULT and attr not in (constants.IIARCHIVEATTRNAME, constants.UUPROVENANCELOG) and vault_archival_status(ctx, coll) == "archived": avu.set_on_coll(ctx, coll, constants.IIARCHIVEATTRNAME, "update") ctx.dmget(package_archive_path(ctx, coll), config.data_package_archive_fqdn, "OFL")