Skip to content

Commit

Permalink
Fix Optional
Browse files Browse the repository at this point in the history
  • Loading branch information
lwesterhof committed Dec 11, 2024
1 parent 9ecf5c8 commit d6bf41b
Show file tree
Hide file tree
Showing 18 changed files with 66 additions and 68 deletions.
16 changes: 8 additions & 8 deletions datarequest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from collections import OrderedDict
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional
from typing import Dict, List

import jsonschema
from genquery import AS_DICT, AS_LIST, Query, row_iterator
Expand Down Expand Up @@ -381,7 +381,7 @@ def get_status(stat: str) -> status:
return datarequest_action_permitted(ctx, request_id, roles, statuses)


def datarequest_action_permitted(ctx: rule.Context, request_id: str, roles: List, statuses: Optional[List]) -> bool:
def datarequest_action_permitted(ctx: rule.Context, request_id: str, roles: List, statuses: List | None) -> bool:
"""Check if current user and data request status meet specified restrictions.
:param ctx: Combined type of a callback and rei struct
Expand Down Expand Up @@ -424,7 +424,7 @@ def datarequest_action_permitted(ctx: rule.Context, request_id: str, roles: List


@api.make()
def api_datarequest_roles_get(ctx: rule.Context, request_id: Optional[str] = None) -> api.Result:
def api_datarequest_roles_get(ctx: rule.Context, request_id: str | None = None) -> api.Result:
"""Get roles of invoking user.
:param ctx: Combined type of a callback and rei struct
Expand All @@ -436,7 +436,7 @@ def api_datarequest_roles_get(ctx: rule.Context, request_id: Optional[str] = Non
return datarequest_roles_get(ctx, request_id)


def datarequest_roles_get(ctx: rule.Context, request_id: Optional[str] = None) -> List:
def datarequest_roles_get(ctx: rule.Context, request_id: str | None = None) -> List:
"""Get roles of invoking user.
:param ctx: Combined type of a callback and rei struct
Expand Down Expand Up @@ -472,7 +472,7 @@ def datarequest_is_owner(ctx: rule.Context, request_id: str) -> bool:
return datarequest_owner_get(ctx, request_id) == user.name(ctx)


def datarequest_owner_get(ctx: rule.Context, request_id: str) -> Optional[str]:
def datarequest_owner_get(ctx: rule.Context, request_id: str) -> str | None:
"""Get the account name (i.e. email address) of the owner of a data request.
:param ctx: Combined type of a callback and a rei struct
Expand Down Expand Up @@ -635,7 +635,7 @@ def datarequest_provenance_write(ctx: rule.Context, request_id: str, request_sta
return api.Error("write_error", "Could not write timestamp to provenance log: {}.".format(e))


def datarequest_data_valid(ctx: rule.Context, data: Dict, schema_name: Optional[str] = None, schema: Optional[str] = None) -> bool:
def datarequest_data_valid(ctx: rule.Context, data: Dict, schema_name: str | None = None, schema: str | None = None) -> bool:
"""Check if form data contains no errors
Default mode of operation is to provide schema data and the schema name of the schema against
Expand Down Expand Up @@ -676,7 +676,7 @@ def datarequest_data_valid(ctx: rule.Context, data: Dict, schema_name: Optional[
return False


def cc_email_addresses_get(contact_object: Dict) -> Optional[str]:
def cc_email_addresses_get(contact_object: Dict) -> str | None:
try:
cc = contact_object['cc_email_addresses']
return cc.replace(' ', '')
Expand Down Expand Up @@ -890,7 +890,7 @@ def file_write_and_lock(ctx: rule.Context, coll_path: str, filename: str, data:


@api.make()
def api_datarequest_submit(ctx: rule.Context, data: Dict, draft: bool, draft_request_id: Optional[str] = None) -> api.Result:
def api_datarequest_submit(ctx: rule.Context, data: Dict, draft: bool, draft_request_id: str | None = None) -> api.Result:
"""Persist a data request to disk.
:param ctx: Combined type of a callback and rei struct
Expand Down
4 changes: 2 additions & 2 deletions deposit.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import re
from collections import OrderedDict
from typing import Dict, Optional
from typing import Dict

import genquery
from genquery import AS_DICT, Query
Expand Down Expand Up @@ -99,7 +99,7 @@ def api_deposit_create(ctx: rule.Context, deposit_group: str) -> api.Result:
return {"deposit_path": result["deposit_path"]}


def deposit_create(ctx: rule.Context, deposit_group: Optional[str]) -> Dict:
def deposit_create(ctx: rule.Context, deposit_group: str | None) -> Dict:
"""Create deposit collection.
:param ctx: Combined type of a callback and rei struct
Expand Down
14 changes: 7 additions & 7 deletions folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import time
import uuid
from typing import List, Optional, Tuple
from typing import List, Tuple

import genquery
import irods_types
Expand Down Expand Up @@ -480,7 +480,7 @@ def set_epic_pid(ctx: rule.Context, target: str) -> bool:
return True


def get_cronjob_status(ctx: rule.Context, coll: str) -> Optional[str]:
def get_cronjob_status(ctx: rule.Context, coll: str) -> str | None:
"""Get the cronjob status of given collection"""
iter = genquery.row_iterator(
"META_COLL_ATTR_VALUE",
Expand Down Expand Up @@ -676,7 +676,7 @@ def get_org_metadata(ctx: rule.Context, path: str, object_type: pathutil.ObjectT
else " AND COLL_NAME = '{}'".format(path)))]


def get_locks(ctx: rule.Context, path: str, org_metadata: Optional[List[Tuple[str, str]]] = None, object_type: pathutil.ObjectType = pathutil.ObjectType.COLL) -> List[str]:
def get_locks(ctx: rule.Context, path: str, org_metadata: List[Tuple[str, str]] | None = None, object_type: pathutil.ObjectType = pathutil.ObjectType.COLL) -> List[str]:
"""Return all locks on a collection or data object (includes locks on parents and children)."""
if org_metadata is None:
org_metadata = get_org_metadata(ctx, path, object_type=object_type)
Expand All @@ -700,12 +700,12 @@ def api_folder_get_locks(ctx: rule.Context, coll: str) -> api.Result:
return locks


def has_locks(ctx: rule.Context, coll: str, org_metadata: Optional[List[Tuple[str, str]]] = None) -> bool:
def has_locks(ctx: rule.Context, coll: str, org_metadata: List[Tuple[str, str]] | None = None) -> bool:
"""Check whether a lock exists on the given collection, its parents or children."""
return len(get_locks(ctx, coll, org_metadata=org_metadata)) > 0


def is_locked(ctx: rule.Context, coll: str, org_metadata: Optional[List[Tuple[str, str]]] = None) -> bool:
def is_locked(ctx: rule.Context, coll: str, org_metadata: List[Tuple[str, str]] | None = None) -> bool:
"""Check whether a lock exists on the given collection itself or a parent collection.
Locks on subcollections are not counted.
Expand All @@ -722,14 +722,14 @@ def is_locked(ctx: rule.Context, coll: str, org_metadata: Optional[List[Tuple[st
return len([x for x in locks if coll.startswith(x)]) > 0


def is_data_locked(ctx: rule.Context, path: str, org_metadata: Optional[List[Tuple[str, str]]] = None) -> bool:
def is_data_locked(ctx: rule.Context, path: str, org_metadata: List[Tuple[str, str]] | None = None) -> bool:
"""Check whether a lock exists on the given data object."""
locks = get_locks(ctx, path, org_metadata=org_metadata, object_type=pathutil.ObjectType.DATA)

return len(locks) > 0


def get_status(ctx: rule.Context, path: str, org_metadata: Optional[List[Tuple[str, str]]] = None) -> constants.research_package_state:
def get_status(ctx: rule.Context, path: str, org_metadata: List[Tuple[str, str]] | None = None) -> constants.research_package_state:
"""Get the status of a research folder."""
if org_metadata is None:
org_metadata = get_org_metadata(ctx, path)
Expand Down
4 changes: 2 additions & 2 deletions groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import time
from collections import OrderedDict
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Tuple
from typing import Any, Dict, Iterable, List, Tuple

import genquery
import requests
Expand Down Expand Up @@ -118,7 +118,7 @@ def getGroupsData(ctx: rule.Context) -> Iterable[Any]:
return groups.values()


def getGroupData(ctx: rule.Context, name: str) -> Optional[Dict]:
def getGroupData(ctx: rule.Context, name: str) -> Dict | None:
"""Get data for one group."""
group = None

Expand Down
4 changes: 2 additions & 2 deletions mail.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
import re
import smtplib
from email.mime.text import MIMEText
from typing import Optional, Tuple
from typing import Tuple

from util import *

__all__ = ['rule_mail_test']


def send(ctx: rule.Context, to: str, actor: str, subject: str, body: str, cc: Optional[str] = None) -> api.Result:
def send(ctx: rule.Context, to: str, actor: str, subject: str, body: str, cc: str | None = None) -> api.Result:
"""Send an e-mail with specified recipient, subject and body.
The originating address and mail server credentials are taken from the
Expand Down
18 changes: 9 additions & 9 deletions meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import re
from collections import OrderedDict
from datetime import datetime
from typing import Dict, List, Optional
from typing import Dict, List

import genquery
import irods_types
Expand Down Expand Up @@ -40,7 +40,7 @@ def metadata_get_links(metadata: Dict) -> List:
metadata['links']))


def metadata_get_schema_id(metadata: Dict) -> Optional[str]:
def metadata_get_schema_id(metadata: Dict) -> str | None:
desc = list(filter(lambda x: x['rel'] == 'describedby', metadata_get_links(metadata)))
if len(desc) > 0:
return desc[0]['href']
Expand All @@ -58,8 +58,8 @@ def metadata_set_schema_id(metadata: Dict, schema_id: str) -> None:

def get_json_metadata_errors(ctx: rule.Context,
metadata_path: str,
metadata: Optional[Dict] = None,
schema: Optional[Dict] = None,
metadata: Dict | None = None,
schema: Dict | None = None,
ignore_required: bool = False) -> List:
"""
Validate JSON metadata, and return a list of errors, if any.
Expand Down Expand Up @@ -153,7 +153,7 @@ def transform_error(e):

def is_json_metadata_valid(ctx: rule.Context,
metadata_path: str,
metadata: Optional[Dict] = None,
metadata: Dict | None = None,
ignore_required: bool = False) -> bool:
"""Check if json metadata contains no errors.
Expand All @@ -177,7 +177,7 @@ def is_json_metadata_valid(ctx: rule.Context,
return False


def get_collection_metadata_path(ctx: rule.Context, coll: str) -> Optional[str]:
def get_collection_metadata_path(ctx: rule.Context, coll: str) -> str | None:
"""Check if a collection has a JSON metadata file and provide its path, if any.
:param ctx: Combined type of a callback and rei struct
Expand All @@ -192,7 +192,7 @@ def get_collection_metadata_path(ctx: rule.Context, coll: str) -> Optional[str]:
return None


def get_latest_vault_metadata_path(ctx: rule.Context, vault_pkg_coll: str) -> Optional[str]:
def get_latest_vault_metadata_path(ctx: rule.Context, vault_pkg_coll: str) -> str | None:
"""Get the latest vault metadata JSON file.
:param ctx: Combined type of a callback and rei struct
Expand Down Expand Up @@ -238,7 +238,7 @@ def rule_meta_validate(rule_args, callback, rei):
rule_args[2] = 'metadata validated'


def collection_has_cloneable_metadata(ctx: rule.Context, coll: str) -> Optional[str]:
def collection_has_cloneable_metadata(ctx: rule.Context, coll: str) -> str | None:
"""Check if a collection has metadata, and validate it.
This always ignores 'required' schema attributes, since metadata can
Expand Down Expand Up @@ -786,7 +786,7 @@ def copy_user_metadata(ctx: rule.Context, source: str, target: str) -> None:
log.write(ctx, "copy_user_metadata: failed to copy user metadata from <{}> to <{}/original>".format(source, target))


def vault_metadata_matches_schema(ctx: rule.Context, coll_name: str, schema_cache: Dict, report_name: str, write_stdout: bool) -> Optional[Dict]:
def vault_metadata_matches_schema(ctx: rule.Context, coll_name: str, schema_cache: Dict, report_name: str, write_stdout: bool) -> Dict | None:
"""Process a single data package to retrieve and validate that its metadata conforms to the schema.
:param ctx: Combined type of a callback and rei struct
Expand Down
6 changes: 3 additions & 3 deletions meta_form.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
__license__ = 'GPLv3, see LICENSE'

import re
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Tuple

import irods_types

Expand All @@ -20,7 +20,7 @@
'api_meta_form_save']


def get_coll_lock(ctx: rule.Context, path: str, org_metadata: Optional[List] = None) -> Tuple[str, str]:
def get_coll_lock(ctx: rule.Context, path: str, org_metadata: List | None = None) -> Tuple[str, str]:
"""Check for existence of locks on a collection.
path -> ((no|here|outoftree|ancestor|descendant), rootcoll)
Expand Down Expand Up @@ -50,7 +50,7 @@ def get_coll_lock(ctx: rule.Context, path: str, org_metadata: Optional[List] = N
return ret


def get_coll_lock_count(ctx: rule.Context, path: str, org_metadata: Optional[List] = None) -> int:
def get_coll_lock_count(ctx: rule.Context, path: str, org_metadata: List | None = None) -> int:
"""Count locks on a collection.
:param ctx: Combined type of a callback and rei struct
Expand Down
3 changes: 1 addition & 2 deletions policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
__license__ = 'GPLv3, see LICENSE'

import re
from typing import Optional

import session_vars

Expand Down Expand Up @@ -641,7 +640,7 @@ def pep_resource_resolve_hierarchy_pre(ctx: rule.Context,
operation: str,
host: str,
parser: str,
vote: str) -> Optional[str]:
vote: str) -> str | None:
if not config.arb_enabled or operation != "CREATE":
return None

Expand Down
4 changes: 2 additions & 2 deletions resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
__license__ = 'GPLv3, see LICENSE'

from datetime import datetime
from typing import Dict, List, Optional
from typing import Dict, List

import genquery

Expand Down Expand Up @@ -680,7 +680,7 @@ def get_groups_on_category(ctx: rule.Context, category: str) -> List:
return groups


def get_group_data_sizes(ctx: rule.Context, group_name: str, ref_period: Optional[str] = None) -> List:
def get_group_data_sizes(ctx: rule.Context, group_name: str, ref_period: str | None = None) -> List:
"""Get group data sizes and return as a list of values.
If no reference period is specified return closest to today.
Expand Down
4 changes: 2 additions & 2 deletions revisions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import random
import re
import time
from typing import Dict, Iterator, List, Optional, Tuple
from typing import Dict, Iterator, List, Tuple

import genquery
import irods_types
Expand Down Expand Up @@ -538,7 +538,7 @@ def is_revision_blocked_by_admin(ctx: rule.Context) -> bool:
return collection.exists(ctx, path)


def get_revision_store(ctx: rule.Context, group_name: str) -> Optional[str]:
def get_revision_store(ctx: rule.Context, group_name: str) -> str | None:
"""Get path to revision store for group if the path exists.
:param ctx: Combined type of a callback and rei struct
Expand Down
10 changes: 5 additions & 5 deletions schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
__license__ = 'GPLv3, see LICENSE'

import re
from typing import Dict, Optional, Tuple
from typing import Dict, Tuple

import genquery

Expand Down Expand Up @@ -88,7 +88,7 @@ def get_schema_collection(ctx: rule.Context, rods_zone: str, group_name: str) ->
return config.default_yoda_schema


def get_schema_id_from_group(ctx: rule.Context, group_name: str) -> Optional[str]:
def get_schema_id_from_group(ctx: rule.Context, group_name: str) -> str | None:
"""Returns the schema_id value that has been set on an iRODS group
:param ctx: Combined type of a callback and rei struct
Expand Down Expand Up @@ -184,14 +184,14 @@ def get_active_schema_id(ctx: rule.Context, path: str) -> str:
return get_active_schema(ctx, path)['$id']


def get_schema_id(ctx: rule.Context, metadata_path: str, metadata: Optional[Dict] = None) -> Optional[str]:
def get_schema_id(ctx: rule.Context, metadata_path: str, metadata: Dict | None = None) -> str | None:
"""Get the current schema id from a path to a metadata json."""
if metadata is None:
metadata = jsonutil.read(ctx, metadata_path)
return meta.metadata_get_schema_id(metadata)


def get_schema_path_by_id(ctx: rule.Context, path: str, schema_id: str) -> Optional[str]:
def get_schema_path_by_id(ctx: rule.Context, path: str, schema_id: str) -> str | None:
"""Get a schema path from a schema id."""
_, zone, _2, _3 = pathutil.info(path)

Expand All @@ -204,7 +204,7 @@ def get_schema_path_by_id(ctx: rule.Context, path: str, schema_id: str) -> Optio
return None


def get_schema_by_id(ctx: rule.Context, path: str, schema_id: str) -> Optional[Dict]:
def get_schema_by_id(ctx: rule.Context, path: str, schema_id: str) -> Dict | None:
"""
Get a schema from a schema id.
Expand Down
Loading

0 comments on commit d6bf41b

Please sign in to comment.