From e0f49be016fd2c8a85b788468fc731ee73250b02 Mon Sep 17 00:00:00 2001 From: Kay Robbins <1189050+VisLab@users.noreply.github.com> Date: Mon, 30 Sep 2024 08:13:42 -0500 Subject: [PATCH] Minor spelling corrections --- hed/errors/error_messages.py | 2 +- hed/models/definition_dict.py | 4 +- hed/models/df_util.py | 584 +++++++------- hed/models/hed_string.py | 754 +++++++++--------- hed/models/query_handler.py | 364 ++++----- hed/schema/hed_cache.py | 14 +- hed/schema/hed_schema_io.py | 8 +- hed/schema/schema_io/base2schema.py | 432 +++++----- hed/schema/schema_io/ontology_util.py | 10 +- hed/schema/schema_io/schema2df.py | 4 +- hed/schema/schema_validation_util.py | 2 +- .../schema_validation_util_deprecated.py | 164 ++-- hed/scripts/convert_and_update_schema.py | 2 +- hed/tools/analysis/hed_type.py | 2 +- hed/validator/def_validator.py | 6 +- hed/validator/hed_validator.py | 4 +- hed/validator/onset_validator.py | 160 ++-- hed/validator/sidecar_validator.py | 636 +++++++-------- hed/validator/spreadsheet_validator.py | 372 ++++----- .../summarize_hed_types_rmdl.json | 2 +- tests/models/test_hed_group.py | 2 +- tests/schema/test_hed_schema_io.py | 6 +- tests/tools/bids/test_bids_tabular_file.py | 2 +- 23 files changed, 1768 insertions(+), 1768 deletions(-) diff --git a/hed/errors/error_messages.py b/hed/errors/error_messages.py index e86a1ac48..06c9c6514 100644 --- a/hed/errors/error_messages.py +++ b/hed/errors/error_messages.py @@ -132,7 +132,7 @@ def val_error_invalid_parent(tag, problem_tag, expected_parent_tag): @hed_tag_error(ValidationErrors.NO_VALID_TAG_FOUND, has_sub_tag=True, actual_code=ValidationErrors.TAG_INVALID) def val_error_no_valid_tag(tag, problem_tag): - return f"'{problem_tag}' in {tag} is not a valid base hed tag." + return f"'{problem_tag}' in {tag} is not a valid base HED tag." @hed_tag_error(ValidationErrors.VALUE_INVALID) diff --git a/hed/models/definition_dict.py b/hed/models/definition_dict.py index a013d4446..b7996e84e 100644 --- a/hed/models/definition_dict.py +++ b/hed/models/definition_dict.py @@ -262,7 +262,7 @@ def get_definition_entry(self, def_tag): Does not validate at all. Parameters: - def_tag (HedTag): Source hed tag that may be a Def or Def-expand tag. + def_tag (HedTag): Source HED tag that may be a Def or Def-expand tag. Returns: def_entry(DefinitionEntry or None): The definition entry if it exists @@ -279,7 +279,7 @@ def _get_definition_contents(self, def_tag): Does not validate at all. Parameters: - def_tag (HedTag): Source hed tag that may be a Def or Def-expand tag. + def_tag (HedTag): Source HED tag that may be a Def or Def-expand tag. Returns: def_contents: HedGroup diff --git a/hed/models/df_util.py b/hed/models/df_util.py index daef2fb26..39aa979b3 100644 --- a/hed/models/df_util.py +++ b/hed/models/df_util.py @@ -1,292 +1,292 @@ -""" Utilities for assembly and conversion of HED strings to different forms. """ -import re -from functools import partial -import pandas as pd -from hed.models.hed_string import HedString -from hed.models.model_constants import DefTagNames - - -def convert_to_form(df, hed_schema, tag_form, columns=None): - """ Convert all tags in underlying dataframe to the specified form (in place). - - Parameters: - df (pd.Dataframe or pd.Series): The dataframe or series to modify. - hed_schema (HedSchema): The schema to use to convert tags. - tag_form(str): HedTag property to convert tags to. - columns (list): The columns to modify on the dataframe. 
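# A minimal usage sketch of convert_to_form, assuming the hed-python package
# layout shown in this diff and a retrievable schema; the version "8.3.0" and
# the "long_tag" form name are illustrative assumptions.
import pandas as pd
from hed.schema import load_schema_version
from hed.models.df_util import convert_to_form

schema = load_schema_version("8.3.0")
events = pd.Series(["Visual-presentation, Red"])
convert_to_form(events, schema, "long_tag")  # modifies the Series in place
# each tag is now written out as its full schema path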
- - """ - if isinstance(df, pd.Series): - df[:] = df.apply(partial(_convert_to_form, hed_schema=hed_schema, tag_form=tag_form)) - else: - if columns is None: - columns = df.columns - - for column in columns: - df[column] = df[column].apply(partial(_convert_to_form, hed_schema=hed_schema, tag_form=tag_form)) - - -def shrink_defs(df, hed_schema, columns=None): - """ Shrink (in place) any def-expand tags found in the specified columns in the dataframe. - - Parameters: - df (pd.Dataframe or pd.Series): The dataframe or series to modify. - hed_schema (HedSchema or None): The schema to use to identify defs. - columns (list or None): The columns to modify on the dataframe. - - """ - if isinstance(df, pd.Series): - mask = df.str.contains('Def-expand/', case=False) - df[mask] = df[mask].apply(partial(_shrink_defs, hed_schema=hed_schema)) - else: - if columns is None: - columns = df.columns - - for column in columns: - mask = df[column].str.contains('Def-expand/', case=False) - df[column][mask] = df[column][mask].apply(partial(_shrink_defs, hed_schema=hed_schema)) - - -def expand_defs(df, hed_schema, def_dict, columns=None): - """ Expands any def tags found in the dataframe. - - Converts in place - - Parameters: - df (pd.Dataframe or pd.Series): The dataframe or series to modify. - hed_schema (HedSchema or None): The schema to use to identify defs. - def_dict (DefinitionDict): The definitions to expand. - columns (list or None): The columns to modify on the dataframe. - """ - if isinstance(df, pd.Series): - mask = df.str.contains('Def/', case=False) - df[mask] = df[mask].apply(partial(_expand_defs, hed_schema=hed_schema, def_dict=def_dict)) - else: - if columns is None: - columns = df.columns - - for column in columns: - mask = df[column].str.contains('Def/', case=False) - df.loc[mask, column] = df.loc[mask, column].apply(partial(_expand_defs, - hed_schema=hed_schema, def_dict=def_dict)) - - -def _convert_to_form(hed_string, hed_schema, tag_form): - return str(HedString(hed_string, hed_schema).get_as_form(tag_form)) - - -def _shrink_defs(hed_string, hed_schema): - return str(HedString(hed_string, hed_schema).shrink_defs()) - - -def _expand_defs(hed_string, hed_schema, def_dict): - return str(HedString(hed_string, hed_schema, def_dict).expand_defs()) - - -def process_def_expands(hed_strings, hed_schema, known_defs=None, ambiguous_defs=None): - """ Gather def-expand tags in the strings/compare with known definitions to find any differences. - - Parameters: - hed_strings (list or pd.Series): A list of HED strings to process. - hed_schema (HedSchema): The schema to use. - known_defs (DefinitionDict or list or str or None): - A DefinitionDict or anything its constructor takes. These are the known definitions going in, that must - match perfectly. - ambiguous_defs (dict): A dictionary containing ambiguous definitions. - format TBD. Currently def name key: list of lists of HED tags values - - Returns: - tuple: A tuple containing the DefinitionDict, ambiguous definitions, and errors. - """ - from hed.models.def_expand_gather import DefExpandGatherer - def_gatherer = DefExpandGatherer(hed_schema, known_defs, ambiguous_defs) - return def_gatherer.process_def_expands(hed_strings) - - -def sort_dataframe_by_onsets(df): - """ Gather def-expand tags in the strings/compare with known definitions to find any differences. - - Parameters: - df(pd.Dataframe): Dataframe to sort. - - Returns: - The sorted dataframe, or the original dataframe if it didn't have an onset column. 
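# A hedged sketch of the def helpers above. The definition string is invented,
# and DefinitionDict is assumed to accept a list of definition strings plus a
# schema, as the process_def_expands docstring above suggests.
import pandas as pd
from hed.schema import load_schema_version
from hed.models.definition_dict import DefinitionDict
from hed.models.df_util import expand_defs, shrink_defs

schema = load_schema_version("8.3.0")
defs = DefinitionDict(["(Definition/Fast-response, (Agent-action, Label/Fast))"], schema)
column = pd.Series(["Def/Fast-response, Red"])
expand_defs(column, schema, defs)  # Def/... becomes a (Def-expand/..., (...)) group, in place
shrink_defs(column, schema)        # and back again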
-    """
-    if "onset" in df.columns:
-        # Create a copy and sort by onsets as floats(if needed), but continue to keep the string version.
-        df_copy = df.copy()
-        df_copy['_temp_onset_sort'] = df_copy['onset'].astype(float)
-        df_copy.sort_values(by='_temp_onset_sort', inplace=True)
-        df_copy.drop(columns=['_temp_onset_sort'], inplace=True)
-
-        return df_copy
-    return df
-
-
-def replace_ref(text, oldvalue, newvalue="n/a"):
-    """ Replace column ref in x with y. If it's n/a, delete extra commas/parentheses.
-
-    Parameters:
-        text (str): The input string containing the ref enclosed in curly braces.
-        oldvalue (str): The full tag or ref to replace
-        newvalue (str): The replacement value for the ref.
-
-    Returns:
-        str: The modified string with the ref replaced or removed.
-    """
-    # If it's not n/a, we can just replace directly.
-    if newvalue != "n/a":
-        return text.replace(oldvalue, newvalue)
-
-    def _remover(match):
-        p1 = match.group("p1").count("(")
-        p2 = match.group("p2").count(")")
-        if p1 > p2:  # We have more starting parens than ending. Make sure we don't remove comma before
-            output = match.group("c1") + "(" * (p1 - p2)
-        elif p2 > p1:  # We have more ending parens. Make sure we don't remove comma after
-            output = ")" * (p2 - p1) + match.group("c2")
-        else:
-            c1 = match.group("c1")
-            c2 = match.group("c2")
-            if c1:
-                c1 = ""
-            elif c2:
-                c2 = ""
-            output = c1 + c2
-
-        return output
-
-    # this finds all surrounding commas and parentheses to a reference.
-    # c1/c2 contain the comma(and possibly spaces) separating this ref from other tags
-    # p1/p2 contain the parentheses directly surrounding the tag
-    # All four groups can have spaces.
-    pattern = r'(?P<c1>[\s,]*)(?P<p1>[(\s]*)' + oldvalue + r'(?P<p2>[\s)]*)(?P<c2>[\s,]*)'
-    return re.sub(pattern, _remover, text)
-
-
-def _handle_curly_braces_refs(df, refs, column_names):
-    """ Fills in the refs in the dataframe
-
-    You probably shouldn't call this function directly, but rather use base input.
-
-    Parameters:
-        df(pd.DataFrame): The dataframe to modify
-        refs(list or pd.Series): a list of column refs to replace(without {})
-        column_names(list): the columns we are interested in(should include all ref columns)
-
-    Returns:
-        modified_df(pd.DataFrame): The modified dataframe with refs replaced
-    """
-    # Filter out columns and refs that don't exist.
-    refs = [ref for ref in refs if ref in column_names]
-    remaining_columns = [column for column in column_names if column not in refs]
-
-    new_df = df.copy()
-    # Replace references in the columns we are saving out.
-    saved_columns = new_df[refs]
-    for column_name in remaining_columns:
-        for replacing_name in refs:
-            # If the data has no n/a values, this version is MUCH faster.
-            # column_name_brackets = f"{{{replacing_name}}}"
-            # df[column_name] = pd.Series(x.replace(column_name_brackets, y) for x, y
-            #                             in zip(df[column_name], saved_columns[replacing_name]))
-            new_df[column_name] = pd.Series(replace_ref(x, f"{{{replacing_name}}}", y) for x, y
-                                            in zip(new_df[column_name], saved_columns[replacing_name]))
-    new_df = new_df[remaining_columns]
-
-    return new_df
-
-
-# todo: Consider updating this to be a pure string function(or at least, only instantiating the Duration tags)
-def split_delay_tags(series, hed_schema, onsets):
-    """Sorts the series based on Delay tags, so that the onsets are in order after delay is applied.
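# A small sketch of replace_ref, defined above. Expected outputs are inferred
# from the regex and _remover logic rather than taken from the test suite.
from hed.models.df_util import replace_ref

replace_ref("Red, {response}, Blue", "{response}", "Correct-action")
# -> "Red, Correct-action, Blue"  (plain substitution when newvalue is not n/a)
replace_ref("Red, {response}, Blue", "{response}")
# -> "Red, Blue"  (the ref and one neighboring comma are removed for n/a)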
- - Parameters: - series(pd.Series or None): the series of tags to split/sort - hed_schema(HedSchema): The schema to use to identify tags - onsets(pd.Series or None) - - Returns: - sorted_df(pd.Dataframe or None): If we had onsets, a dataframe with 3 columns - "HED": The hed strings(still str) - "onset": the updated onsets - "original_index": the original source line. Multiple lines can have the same original source line. - - Note: This dataframe may be longer than the original series, but it will never be shorter. - """ - if series is None or onsets is None: - return - split_df = pd.DataFrame({"onset": onsets, "HED": series, "original_index": series.index}) - delay_strings = [(i, HedString(hed_string, hed_schema)) for (i, hed_string) in series.items() if - "delay/" in hed_string.casefold()] - delay_groups = [] - for i, delay_string in delay_strings: - duration_tags = delay_string.find_top_level_tags({DefTagNames.DELAY_KEY}) - to_remove = [] - for tag, group in duration_tags: - onset_mod = tag.value_as_default_unit() + float(onsets[i]) - to_remove.append(group) - insert_index = split_df['original_index'].index.max() + 1 - split_df.loc[insert_index] = {'HED': str(group), 'onset': onset_mod, 'original_index': i} - delay_string.remove(to_remove) - # update the old string with the removals done - split_df.at[i, "HED"] = str(delay_string) - - for i, onset_mod, group in delay_groups: - insert_index = split_df['original_index'].index.max() + 1 - split_df.loc[insert_index] = {'HED': str(group), 'onset': onset_mod, 'original_index': i} - split_df = sort_dataframe_by_onsets(split_df) - split_df.reset_index(drop=True, inplace=True) - - split_df = filter_series_by_onset(split_df, split_df.onset) - return split_df - - -def filter_series_by_onset(series, onsets): - """Return the series, with rows that have the same onset combined. - - Parameters: - series(pd.Series or pd.Dataframe): the series to filter. If dataframe, it filters the "HED" column - onsets(pd.Series): the onset column to filter by - Returns: - Series or Dataframe: the series with rows filtered together. - """ - indexed_dict = _indexed_dict_from_onsets(onsets.astype(float)) - return _filter_by_index_list(series, indexed_dict=indexed_dict) - - -def _indexed_dict_from_onsets(onsets): - """Finds series of consecutive lines with the same(or close enough) onset""" - current_onset = -1000000.0 - tol = 1e-9 - from collections import defaultdict - indexed_dict = defaultdict(list) - for i, onset in enumerate(onsets): - if abs(onset - current_onset) > tol: - current_onset = onset - indexed_dict[current_onset].append(i) - - return indexed_dict - - -def _filter_by_index_list(original_data, indexed_dict): - """Filters a series or dataframe by the indexed_dict, joining lines as indicated""" - if isinstance(original_data, pd.Series): - data_series = original_data - elif isinstance(original_data, pd.DataFrame): - data_series = original_data["HED"] - else: - raise TypeError("Input must be a pandas Series or DataFrame") - - new_series = pd.Series([""] * len(data_series), dtype=str) - for onset, indices in indexed_dict.items(): - if indices: - first_index = indices[0] - new_series[first_index] = ",".join([str(data_series[i]) for i in indices]) - - if isinstance(original_data, pd.Series): - return new_series - else: - result_df = original_data.copy() - result_df["HED"] = new_series - return result_df +""" Utilities for assembly and conversion of HED strings to different forms. 
""" +import re +from functools import partial +import pandas as pd +from hed.models.hed_string import HedString +from hed.models.model_constants import DefTagNames + + +def convert_to_form(df, hed_schema, tag_form, columns=None): + """ Convert all tags in underlying dataframe to the specified form (in place). + + Parameters: + df (pd.Dataframe or pd.Series): The dataframe or series to modify. + hed_schema (HedSchema): The schema to use to convert tags. + tag_form(str): HedTag property to convert tags to. + columns (list): The columns to modify on the dataframe. + + """ + if isinstance(df, pd.Series): + df[:] = df.apply(partial(_convert_to_form, hed_schema=hed_schema, tag_form=tag_form)) + else: + if columns is None: + columns = df.columns + + for column in columns: + df[column] = df[column].apply(partial(_convert_to_form, hed_schema=hed_schema, tag_form=tag_form)) + + +def shrink_defs(df, hed_schema, columns=None): + """ Shrink (in place) any def-expand tags found in the specified columns in the dataframe. + + Parameters: + df (pd.Dataframe or pd.Series): The dataframe or series to modify. + hed_schema (HedSchema or None): The schema to use to identify defs. + columns (list or None): The columns to modify on the dataframe. + + """ + if isinstance(df, pd.Series): + mask = df.str.contains('Def-expand/', case=False) + df[mask] = df[mask].apply(partial(_shrink_defs, hed_schema=hed_schema)) + else: + if columns is None: + columns = df.columns + + for column in columns: + mask = df[column].str.contains('Def-expand/', case=False) + df[column][mask] = df[column][mask].apply(partial(_shrink_defs, hed_schema=hed_schema)) + + +def expand_defs(df, hed_schema, def_dict, columns=None): + """ Expands any def tags found in the dataframe. + + Converts in place + + Parameters: + df (pd.Dataframe or pd.Series): The dataframe or series to modify. + hed_schema (HedSchema or None): The schema to use to identify defs. + def_dict (DefinitionDict): The definitions to expand. + columns (list or None): The columns to modify on the dataframe. + """ + if isinstance(df, pd.Series): + mask = df.str.contains('Def/', case=False) + df[mask] = df[mask].apply(partial(_expand_defs, hed_schema=hed_schema, def_dict=def_dict)) + else: + if columns is None: + columns = df.columns + + for column in columns: + mask = df[column].str.contains('Def/', case=False) + df.loc[mask, column] = df.loc[mask, column].apply(partial(_expand_defs, + hed_schema=hed_schema, def_dict=def_dict)) + + +def _convert_to_form(hed_string, hed_schema, tag_form): + return str(HedString(hed_string, hed_schema).get_as_form(tag_form)) + + +def _shrink_defs(hed_string, hed_schema): + return str(HedString(hed_string, hed_schema).shrink_defs()) + + +def _expand_defs(hed_string, hed_schema, def_dict): + return str(HedString(hed_string, hed_schema, def_dict).expand_defs()) + + +def process_def_expands(hed_strings, hed_schema, known_defs=None, ambiguous_defs=None): + """ Gather def-expand tags in the strings/compare with known definitions to find any differences. + + Parameters: + hed_strings (list or pd.Series): A list of HED strings to process. + hed_schema (HedSchema): The schema to use. + known_defs (DefinitionDict or list or str or None): + A DefinitionDict or anything its constructor takes. These are the known definitions going in, that must + match perfectly. + ambiguous_defs (dict): A dictionary containing ambiguous definitions. + format TBD. 
Currently def name key: list of lists of HED tags values
+
+    Returns:
+        tuple: A tuple containing the DefinitionDict, ambiguous definitions, and errors.
+    """
+    from hed.models.def_expand_gather import DefExpandGatherer
+    def_gatherer = DefExpandGatherer(hed_schema, known_defs, ambiguous_defs)
+    return def_gatherer.process_def_expands(hed_strings)
+
+
+def sort_dataframe_by_onsets(df):
+    """ Sort the dataframe by the onset column as floats, preserving the original onset string values.
+
+    Parameters:
+        df(pd.Dataframe): Dataframe to sort.
+
+    Returns:
+        The sorted dataframe, or the original dataframe if it didn't have an onset column.
+    """
+    if "onset" in df.columns:
+        # Create a copy and sort by onsets as floats(if needed), but continue to keep the string version.
+        df_copy = df.copy()
+        df_copy['_temp_onset_sort'] = df_copy['onset'].astype(float)
+        df_copy.sort_values(by='_temp_onset_sort', inplace=True)
+        df_copy.drop(columns=['_temp_onset_sort'], inplace=True)
+
+        return df_copy
+    return df
+
+
+def replace_ref(text, oldvalue, newvalue="n/a"):
+    """ Replace a column ref (oldvalue) in text with newvalue. If newvalue is n/a, delete extra commas/parentheses.
+
+    Parameters:
+        text (str): The input string containing the ref enclosed in curly braces.
+        oldvalue (str): The full tag or ref to replace.
+        newvalue (str): The replacement value for the ref.
+
+    Returns:
+        str: The modified string with the ref replaced or removed.
+    """
+    # If it's not n/a, we can just replace directly.
+    if newvalue != "n/a":
+        return text.replace(oldvalue, newvalue)
+
+    def _remover(match):
+        p1 = match.group("p1").count("(")
+        p2 = match.group("p2").count(")")
+        if p1 > p2:  # We have more starting parens than ending. Make sure we don't remove comma before
+            output = match.group("c1") + "(" * (p1 - p2)
+        elif p2 > p1:  # We have more ending parens. Make sure we don't remove comma after
+            output = ")" * (p2 - p1) + match.group("c2")
+        else:
+            c1 = match.group("c1")
+            c2 = match.group("c2")
+            if c1:
+                c1 = ""
+            elif c2:
+                c2 = ""
+            output = c1 + c2
+
+        return output
+
+    # this finds all surrounding commas and parentheses to a reference.
+    # c1/c2 contain the comma(and possibly spaces) separating this ref from other tags
+    # p1/p2 contain the parentheses directly surrounding the tag
+    # All four groups can have spaces.
+    pattern = r'(?P<c1>[\s,]*)(?P<p1>[(\s]*)' + oldvalue + r'(?P<p2>[\s)]*)(?P<c2>[\s,]*)'
+    return re.sub(pattern, _remover, text)
+
+
+def _handle_curly_braces_refs(df, refs, column_names):
+    """ Fills in the refs in the dataframe
+
+    You probably shouldn't call this function directly, but rather use BaseInput.
+
+    Parameters:
+        df(pd.DataFrame): The dataframe to modify
+        refs(list or pd.Series): a list of column refs to replace(without {})
+        column_names(list): the columns we are interested in(should include all ref columns)
+
+    Returns:
+        modified_df(pd.DataFrame): The modified dataframe with refs replaced
+    """
+    # Filter out columns and refs that don't exist.
+    refs = [ref for ref in refs if ref in column_names]
+    remaining_columns = [column for column in column_names if column not in refs]
+
+    new_df = df.copy()
+    # Replace references in the columns we are saving out.
+    saved_columns = new_df[refs]
+    for column_name in remaining_columns:
+        for replacing_name in refs:
+            # If the data has no n/a values, this version is MUCH faster.
+ # column_name_brackets = f"{{{replacing_name}}}" + # df[column_name] = pd.Series(x.replace(column_name_brackets, y) for x, y + # in zip(df[column_name], saved_columns[replacing_name])) + new_df[column_name] = pd.Series(replace_ref(x, f"{{{replacing_name}}}", y) for x, y + in zip(new_df[column_name], saved_columns[replacing_name])) + new_df = new_df[remaining_columns] + + return new_df + + +# todo: Consider updating this to be a pure string function(or at least, only instantiating the Duration tags) +def split_delay_tags(series, hed_schema, onsets): + """Sorts the series based on Delay tags, so that the onsets are in order after delay is applied. + + Parameters: + series(pd.Series or None): the series of tags to split/sort + hed_schema(HedSchema): The schema to use to identify tags + onsets(pd.Series or None) + + Returns: + sorted_df(pd.Dataframe or None): If we had onsets, a dataframe with 3 columns + "HED": The HED strings(still str) + "onset": the updated onsets + "original_index": the original source line. Multiple lines can have the same original source line. + + Note: This dataframe may be longer than the original series, but it will never be shorter. + """ + if series is None or onsets is None: + return + split_df = pd.DataFrame({"onset": onsets, "HED": series, "original_index": series.index}) + delay_strings = [(i, HedString(hed_string, hed_schema)) for (i, hed_string) in series.items() if + "delay/" in hed_string.casefold()] + delay_groups = [] + for i, delay_string in delay_strings: + duration_tags = delay_string.find_top_level_tags({DefTagNames.DELAY_KEY}) + to_remove = [] + for tag, group in duration_tags: + onset_mod = tag.value_as_default_unit() + float(onsets[i]) + to_remove.append(group) + insert_index = split_df['original_index'].index.max() + 1 + split_df.loc[insert_index] = {'HED': str(group), 'onset': onset_mod, 'original_index': i} + delay_string.remove(to_remove) + # update the old string with the removals done + split_df.at[i, "HED"] = str(delay_string) + + for i, onset_mod, group in delay_groups: + insert_index = split_df['original_index'].index.max() + 1 + split_df.loc[insert_index] = {'HED': str(group), 'onset': onset_mod, 'original_index': i} + split_df = sort_dataframe_by_onsets(split_df) + split_df.reset_index(drop=True, inplace=True) + + split_df = filter_series_by_onset(split_df, split_df.onset) + return split_df + + +def filter_series_by_onset(series, onsets): + """Return the series, with rows that have the same onset combined. + + Parameters: + series(pd.Series or pd.Dataframe): the series to filter. If dataframe, it filters the "HED" column + onsets(pd.Series): the onset column to filter by + Returns: + Series or Dataframe: the series with rows filtered together. 
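# A sketch of the same-onset merging described above; the expected result is
# read off _indexed_dict_from_onsets and _filter_by_index_list below, not from
# an actual run.
import pandas as pd
from hed.models.df_util import filter_series_by_onset

hed_col = pd.Series(["Red", "Blue", "Green"])
onsets = pd.Series([1.0, 1.0, 2.5])
merged = filter_series_by_onset(hed_col, onsets)
# merged holds ["Red,Blue", "", "Green"]: rows sharing onset 1.0 are joined into
# the first of them, and the now-duplicate row is left empty.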
+ """ + indexed_dict = _indexed_dict_from_onsets(onsets.astype(float)) + return _filter_by_index_list(series, indexed_dict=indexed_dict) + + +def _indexed_dict_from_onsets(onsets): + """Finds series of consecutive lines with the same(or close enough) onset""" + current_onset = -1000000.0 + tol = 1e-9 + from collections import defaultdict + indexed_dict = defaultdict(list) + for i, onset in enumerate(onsets): + if abs(onset - current_onset) > tol: + current_onset = onset + indexed_dict[current_onset].append(i) + + return indexed_dict + + +def _filter_by_index_list(original_data, indexed_dict): + """Filters a series or dataframe by the indexed_dict, joining lines as indicated""" + if isinstance(original_data, pd.Series): + data_series = original_data + elif isinstance(original_data, pd.DataFrame): + data_series = original_data["HED"] + else: + raise TypeError("Input must be a pandas Series or DataFrame") + + new_series = pd.Series([""] * len(data_series), dtype=str) + for onset, indices in indexed_dict.items(): + if indices: + first_index = indices[0] + new_series[first_index] = ",".join([str(data_series[i]) for i in indices]) + + if isinstance(original_data, pd.Series): + return new_series + else: + result_df = original_data.copy() + result_df["HED"] = new_series + return result_df diff --git a/hed/models/hed_string.py b/hed/models/hed_string.py index 32a443f03..9a986b248 100644 --- a/hed/models/hed_string.py +++ b/hed/models/hed_string.py @@ -1,377 +1,377 @@ -""" A HED string with its schema and definitions. """ -import copy -from hed.models.hed_group import HedGroup -from hed.models.hed_tag import HedTag -from hed.models.model_constants import DefTagNames - - -class HedString(HedGroup): - """ A HED string with its schema and definitions. """ - - OPENING_GROUP_CHARACTER = '(' - CLOSING_GROUP_CHARACTER = ')' - - def __init__(self, hed_string, hed_schema, def_dict=None, _contents=None): - """ Constructor for the HedString class. - - Parameters: - hed_string (str): A HED string consisting of tags and tag groups. - hed_schema (HedSchema): The schema to use to identify tags. - def_dict(DefinitionDict or None): The def dict to use to identify def/def expand tags. - _contents ([HedGroup and/or HedTag] or None): Create a HedString from this exact list of children. - Does not make a copy. - Notes: - - The HedString object parses its component tags and groups into a tree-like structure. - - """ - - if _contents is not None: - contents = _contents - else: - try: - contents = self.split_into_groups(hed_string, hed_schema, def_dict) - except ValueError: - contents = [] - super().__init__(hed_string, contents=contents, startpos=0, endpos=len(hed_string)) - self._schema = hed_schema - self._from_strings = None - self._def_dict = def_dict - - @classmethod - def from_hed_strings(cls, hed_strings): - """ Factory for creating HedStrings via combination. - - Parameters: - hed_strings (list or None): A list of HedString objects to combine. - This takes ownership of their children. - - Returns: - new_string(HedString): The newly combined HedString. 
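# A combination sketch for from_hed_strings; the schema version is illustrative
# and the resulting string form is one reading of the factory described above.
from hed.schema import load_schema_version
from hed.models.hed_string import HedString

schema = load_schema_version("8.3.0")
parts = [HedString("Red", schema), HedString("(Blue, Green)", schema)]
combined = HedString.from_hed_strings(parts)  # takes ownership of the children
# combined now behaves like the single parsed string "Red,(Blue, Green)"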
- """ - if not hed_strings: - raise TypeError("Passed an empty list to from_hed_strings") - new_string = HedString.__new__(HedString) - hed_string = ",".join([group._hed_string for group in hed_strings]) - contents = [child for sub_string in hed_strings for child in sub_string.children] - first_schema = hed_strings[0]._schema - first_dict = hed_strings[0]._def_dict - new_string.__init__(hed_string=hed_string, _contents=contents, hed_schema=first_schema, def_dict=first_dict) - new_string._from_strings = hed_strings - return new_string - - @property - def is_group(self): - """ Always False since the underlying string is not a group with parentheses. """ - return False - - def _calculate_to_canonical_forms(self, hed_schema): - """ Identify all tags using the given schema. - - Parameters: - hed_schema (HedSchema, HedSchemaGroup): The schema to use to validate/convert tags. - - Returns: - list: A list of issues found while converting the string. Each issue is a dictionary. - - """ - validation_issues = [] - for tag in self.get_all_tags(): - validation_issues += tag._calculate_to_canonical_forms(hed_schema) - - return validation_issues - - def __deepcopy__(self, memo): - # check if the object has already been copied - if id(self) in memo: - return memo[id(self)] - - # create a new instance of HedString class, and direct copy all parameters - new_string = self.__class__.__new__(self.__class__) - new_string.__dict__.update(self.__dict__) - - # add the new object to the memo dictionary - memo[id(self)] = new_string - - # Deep copy the attributes that need it(most notably, we don't copy schema/schema entry) - new_string._original_children = copy.deepcopy(self._original_children, memo) - new_string._from_strings = copy.deepcopy(self._from_strings, memo) - new_string.children = copy.deepcopy(self.children, memo) - - return new_string - - def copy(self): - """ Return a deep copy of this string. - - Returns: - HedString: The copied group. - - """ - return_copy = copy.deepcopy(self) - return return_copy - - def remove_definitions(self): - """ Remove definition tags and groups from this string. - - This does not validate definitions and will blindly removing invalid ones as well. - """ - definition_groups = self.find_top_level_tags({DefTagNames.DEFINITION_KEY}, include_groups=1) - if definition_groups: - self.remove(definition_groups) - - def shrink_defs(self): - """ Replace def-expand tags with def tags. - - This does not validate them and will blindly shrink invalid ones as well. - - Returns: - self - """ - for def_expand_tag, def_expand_group in self.find_tags({DefTagNames.DEF_EXPAND_KEY}, recursive=True): - expanded_parent = def_expand_group._parent - if expanded_parent: - def_expand_tag.short_base_tag = DefTagNames.DEF_KEY - def_expand_tag._parent = expanded_parent - expanded_parent.replace(def_expand_group, def_expand_tag) - - return self - - def expand_defs(self): - """ Replace def tags with def-expand tags. - - This does very minimal validation. - - Returns: - self - """ - def_tags = self.find_def_tags(recursive=True, include_groups=0) - - replacements = [] - for tag in def_tags: - if tag.expandable and not tag.expanded: - replacements.append((tag, tag.expandable)) - - for tag, group in replacements: - tag_parent = tag._parent - tag_parent.replace(tag, group) - tag._parent = group - tag.short_base_tag = DefTagNames.DEF_EXPAND_KEY - - return self - - def get_as_original(self): - """ Return the original form of this string. - - Returns: - str: The string with all the tags in their original form. 
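# A sketch of the definition-stripping methods above; the str() result is an
# approximation, since the exact rendering is not spelled out here.
from hed.schema import load_schema_version
from hed.models.hed_string import HedString

schema = load_schema_version("8.3.0")
s = HedString("(Definition/Fast-response, (Agent-action)), Red", schema)
s.remove_definitions()  # drops the top-level (Definition/..., ...) group
str(s)                  # roughly "Red"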
- - Notes: - Potentially with some extraneous spaces removed on returned string. - """ - return self.get_as_form("org_tag") - - @staticmethod - def split_into_groups(hed_string, hed_schema, def_dict=None): - """ Split the HED string into a parse tree. - - Parameters: - hed_string (str): A HED string consisting of tags and tag groups to be processed. - hed_schema (HedSchema): HED schema to use to identify tags. - def_dict(DefinitionDict): The definitions to identify. - Returns: - list: A list of HedTag and/or HedGroup. - - :raises ValueError: - - The string is significantly malformed, such as mismatched parentheses. - - Notes: - - The parse tree consists of tag groups, tags, and delimiters. - """ - current_tag_group = [[]] - - input_tags = HedString.split_hed_string(hed_string) - for is_hed_tag, (startpos, endpos) in input_tags: - if is_hed_tag: - new_tag = HedTag(hed_string, hed_schema, (startpos, endpos), def_dict) - current_tag_group[-1].append(new_tag) - else: - string_portion = hed_string[startpos:endpos] - delimiter_index = 0 - for i, char in enumerate(string_portion): - if not char.isspace(): - delimiter_index = i - break - - delimiter_char = string_portion[delimiter_index] - - if delimiter_char is HedString.OPENING_GROUP_CHARACTER: - current_tag_group.append(HedGroup(hed_string, startpos + delimiter_index)) - - if delimiter_char is HedString.CLOSING_GROUP_CHARACTER: - # Terminate existing group, and save it off. - paren_end = startpos + delimiter_index + 1 - - if len(current_tag_group) > 1: - new_group = current_tag_group.pop() - new_group._endpos = paren_end - - current_tag_group[-1].append(new_group) - else: - raise ValueError(f"Closing parentheses in hed string {hed_string}") - - # Comma delimiter issues are ignored and assumed already validated currently. - if len(current_tag_group) != 1: - raise ValueError(f"Unmatched opening parentheses in hed string {hed_string}") - - return current_tag_group[0] - - def _get_org_span(self, tag_or_group): - """ If this tag or group was in the original HED string, find its original span. - - Parameters: - tag_or_group (HedTag or HedGroup): The HED tag to locate in this string. - - Returns: - int or None: Starting position of the given item in the original string. - int or None: Ending position of the given item in the original string. - - Notes: - - If the HED tag or group was not in the original string, returns (None, None). - - """ - if self._from_strings: - return self._get_org_span_from_strings(tag_or_group) - - if self.check_if_in_original(tag_or_group): - return tag_or_group.span - - return None, None - - def _get_org_span_from_strings(self, tag_or_group): - """ A different case of the above, to handle if this was created from HED string objects.""" - found_string = None - string_start_index = 0 - for string in self._from_strings: - if string.check_if_in_original(tag_or_group): - found_string = string - break - # Add 1 for comma - string_start_index += string.span[1] + 1 - - if not found_string: - return None, None - - return tag_or_group.span[0] + string_start_index, tag_or_group.span[1] + string_start_index - - @staticmethod - def split_hed_string(hed_string): - """ Split a HED string into delimiters and tags. - - Parameters: - hed_string (str): The HED string to split. - - Returns: - list: A list of tuples where each tuple is (is_hed_tag, (start_pos, end_pos)). - - Notes: - - The tuple format is as follows - - is_hed_tag (bool): A (possible) HED tag if True, delimiter if not. - - start_pos (int): Index of start of string in hed_string. 
- - end_pos (int): Index of end of string in hed_string. - - - This function does not validate tags or delimiters in any form. - - """ - tag_delimiters = ",()" - current_spacing = 0 - found_symbol = True - result_positions = [] - tag_start_pos = None - last_end_pos = 0 - for i, char in enumerate(hed_string): - if char == " ": - current_spacing += 1 - continue - - if char in tag_delimiters: - if found_symbol: - if last_end_pos != i: - result_positions.append((False, (last_end_pos, i))) - last_end_pos = i - elif not found_symbol: - found_symbol = True - last_end_pos = i - current_spacing - result_positions.append((True, (tag_start_pos, last_end_pos))) - current_spacing = 0 - tag_start_pos = None - continue - - # If we have a current delimiter, end it here. - if found_symbol and last_end_pos is not None: - if last_end_pos != i: - result_positions.append((False, (last_end_pos, i))) - last_end_pos = None - - found_symbol = False - current_spacing = 0 - if tag_start_pos is None: - tag_start_pos = i - - if last_end_pos is not None and len(hed_string) != last_end_pos: - result_positions.append((False, (last_end_pos, len(hed_string)))) - if tag_start_pos is not None: - result_positions.append((True, (tag_start_pos, len(hed_string) - current_spacing))) - if current_spacing: - result_positions.append((False, (len(hed_string) - current_spacing, len(hed_string)))) - - return result_positions - - def validate(self, allow_placeholders=True, error_handler=None): - """ Validate the string using the schema. - - Parameters: - allow_placeholders(bool): Allow placeholders in the string. - error_handler(ErrorHandler or None): The error handler to use, creates a default one if none passed. - Returns: - issues (list of dict): A list of issues for HED string. - """ - from hed.validator import HedValidator - - validator = HedValidator(self._schema, def_dicts=self._def_dict) - return validator.validate(self, allow_placeholders=allow_placeholders, error_handler=error_handler) - - def find_top_level_tags(self, anchor_tags, include_groups=2): - """ Find top level groups with an anchor tag. - - A max of 1 tag located per top level group. - - Parameters: - anchor_tags (container): A list/set/etc. of short_base_tags to find groups by. - include_groups (0, 1 or 2): Parameter indicating what return values to include. - If 0: return only tags. - If 1: return only groups. - If 2 or any other value: return both. - Returns: - list: The returned result depends on include_groups. - """ - anchor_tags = {tag.casefold() for tag in anchor_tags} - top_level_tags = [] - for group in self.groups(): - for tag in group.tags(): - if tag.short_base_tag.casefold() in anchor_tags: - top_level_tags.append((tag, group)) - # Only capture a max of 1 per group. These are implicitly unique. - break - - if include_groups == 0 or include_groups == 1: - return [tag[include_groups] for tag in top_level_tags] - return top_level_tags - - def remove_refs(self): - """ Remove any refs(tags contained entirely inside curly braces) from the string. - - This does NOT validate the contents of the curly braces. This is only relevant when directly - editing sidecar strings. Tools will naturally ignore these. - """ - ref_tags = [tag for tag in self.get_all_tags() if tag.is_column_ref()] - if ref_tags: - self.remove(ref_tags) +""" A HED string with its schema and definitions. 
""" +import copy +from hed.models.hed_group import HedGroup +from hed.models.hed_tag import HedTag +from hed.models.model_constants import DefTagNames + + +class HedString(HedGroup): + """ A HED string with its schema and definitions. """ + + OPENING_GROUP_CHARACTER = '(' + CLOSING_GROUP_CHARACTER = ')' + + def __init__(self, hed_string, hed_schema, def_dict=None, _contents=None): + """ Constructor for the HedString class. + + Parameters: + hed_string (str): A HED string consisting of tags and tag groups. + hed_schema (HedSchema): The schema to use to identify tags. + def_dict(DefinitionDict or None): The def dict to use to identify def/def expand tags. + _contents ([HedGroup and/or HedTag] or None): Create a HedString from this exact list of children. + Does not make a copy. + Notes: + - The HedString object parses its component tags and groups into a tree-like structure. + + """ + + if _contents is not None: + contents = _contents + else: + try: + contents = self.split_into_groups(hed_string, hed_schema, def_dict) + except ValueError: + contents = [] + super().__init__(hed_string, contents=contents, startpos=0, endpos=len(hed_string)) + self._schema = hed_schema + self._from_strings = None + self._def_dict = def_dict + + @classmethod + def from_hed_strings(cls, hed_strings): + """ Factory for creating HedStrings via combination. + + Parameters: + hed_strings (list or None): A list of HedString objects to combine. + This takes ownership of their children. + + Returns: + new_string(HedString): The newly combined HedString. + """ + if not hed_strings: + raise TypeError("Passed an empty list to from_hed_strings") + new_string = HedString.__new__(HedString) + hed_string = ",".join([group._hed_string for group in hed_strings]) + contents = [child for sub_string in hed_strings for child in sub_string.children] + first_schema = hed_strings[0]._schema + first_dict = hed_strings[0]._def_dict + new_string.__init__(hed_string=hed_string, _contents=contents, hed_schema=first_schema, def_dict=first_dict) + new_string._from_strings = hed_strings + return new_string + + @property + def is_group(self): + """ Always False since the underlying string is not a group with parentheses. """ + return False + + def _calculate_to_canonical_forms(self, hed_schema): + """ Identify all tags using the given schema. + + Parameters: + hed_schema (HedSchema, HedSchemaGroup): The schema to use to validate/convert tags. + + Returns: + list: A list of issues found while converting the string. Each issue is a dictionary. + + """ + validation_issues = [] + for tag in self.get_all_tags(): + validation_issues += tag._calculate_to_canonical_forms(hed_schema) + + return validation_issues + + def __deepcopy__(self, memo): + # check if the object has already been copied + if id(self) in memo: + return memo[id(self)] + + # create a new instance of HedString class, and direct copy all parameters + new_string = self.__class__.__new__(self.__class__) + new_string.__dict__.update(self.__dict__) + + # add the new object to the memo dictionary + memo[id(self)] = new_string + + # Deep copy the attributes that need it(most notably, we don't copy schema/schema entry) + new_string._original_children = copy.deepcopy(self._original_children, memo) + new_string._from_strings = copy.deepcopy(self._from_strings, memo) + new_string.children = copy.deepcopy(self.children, memo) + + return new_string + + def copy(self): + """ Return a deep copy of this string. + + Returns: + HedString: The copied group. 
+ + """ + return_copy = copy.deepcopy(self) + return return_copy + + def remove_definitions(self): + """ Remove definition tags and groups from this string. + + This does not validate definitions and will blindly removing invalid ones as well. + """ + definition_groups = self.find_top_level_tags({DefTagNames.DEFINITION_KEY}, include_groups=1) + if definition_groups: + self.remove(definition_groups) + + def shrink_defs(self): + """ Replace def-expand tags with def tags. + + This does not validate them and will blindly shrink invalid ones as well. + + Returns: + self + """ + for def_expand_tag, def_expand_group in self.find_tags({DefTagNames.DEF_EXPAND_KEY}, recursive=True): + expanded_parent = def_expand_group._parent + if expanded_parent: + def_expand_tag.short_base_tag = DefTagNames.DEF_KEY + def_expand_tag._parent = expanded_parent + expanded_parent.replace(def_expand_group, def_expand_tag) + + return self + + def expand_defs(self): + """ Replace def tags with def-expand tags. + + This does very minimal validation. + + Returns: + self + """ + def_tags = self.find_def_tags(recursive=True, include_groups=0) + + replacements = [] + for tag in def_tags: + if tag.expandable and not tag.expanded: + replacements.append((tag, tag.expandable)) + + for tag, group in replacements: + tag_parent = tag._parent + tag_parent.replace(tag, group) + tag._parent = group + tag.short_base_tag = DefTagNames.DEF_EXPAND_KEY + + return self + + def get_as_original(self): + """ Return the original form of this string. + + Returns: + str: The string with all the tags in their original form. + + Notes: + Potentially with some extraneous spaces removed on returned string. + """ + return self.get_as_form("org_tag") + + @staticmethod + def split_into_groups(hed_string, hed_schema, def_dict=None): + """ Split the HED string into a parse tree. + + Parameters: + hed_string (str): A HED string consisting of tags and tag groups to be processed. + hed_schema (HedSchema): HED schema to use to identify tags. + def_dict(DefinitionDict): The definitions to identify. + Returns: + list: A list of HedTag and/or HedGroup. + + :raises ValueError: + - The string is significantly malformed, such as mismatched parentheses. + + Notes: + - The parse tree consists of tag groups, tags, and delimiters. + """ + current_tag_group = [[]] + + input_tags = HedString.split_hed_string(hed_string) + for is_hed_tag, (startpos, endpos) in input_tags: + if is_hed_tag: + new_tag = HedTag(hed_string, hed_schema, (startpos, endpos), def_dict) + current_tag_group[-1].append(new_tag) + else: + string_portion = hed_string[startpos:endpos] + delimiter_index = 0 + for i, char in enumerate(string_portion): + if not char.isspace(): + delimiter_index = i + break + + delimiter_char = string_portion[delimiter_index] + + if delimiter_char is HedString.OPENING_GROUP_CHARACTER: + current_tag_group.append(HedGroup(hed_string, startpos + delimiter_index)) + + if delimiter_char is HedString.CLOSING_GROUP_CHARACTER: + # Terminate existing group, and save it off. + paren_end = startpos + delimiter_index + 1 + + if len(current_tag_group) > 1: + new_group = current_tag_group.pop() + new_group._endpos = paren_end + + current_tag_group[-1].append(new_group) + else: + raise ValueError(f"Closing parentheses in HED string {hed_string}") + + # Comma delimiter issues are ignored and assumed already validated currently. 
+ if len(current_tag_group) != 1: + raise ValueError(f"Unmatched opening parentheses in HED string {hed_string}") + + return current_tag_group[0] + + def _get_org_span(self, tag_or_group): + """ If this tag or group was in the original HED string, find its original span. + + Parameters: + tag_or_group (HedTag or HedGroup): The HED tag to locate in this string. + + Returns: + int or None: Starting position of the given item in the original string. + int or None: Ending position of the given item in the original string. + + Notes: + - If the HED tag or group was not in the original string, returns (None, None). + + """ + if self._from_strings: + return self._get_org_span_from_strings(tag_or_group) + + if self.check_if_in_original(tag_or_group): + return tag_or_group.span + + return None, None + + def _get_org_span_from_strings(self, tag_or_group): + """ A different case of the above, to handle if this was created from HED string objects.""" + found_string = None + string_start_index = 0 + for string in self._from_strings: + if string.check_if_in_original(tag_or_group): + found_string = string + break + # Add 1 for comma + string_start_index += string.span[1] + 1 + + if not found_string: + return None, None + + return tag_or_group.span[0] + string_start_index, tag_or_group.span[1] + string_start_index + + @staticmethod + def split_hed_string(hed_string): + """ Split a HED string into delimiters and tags. + + Parameters: + hed_string (str): The HED string to split. + + Returns: + list: A list of tuples where each tuple is (is_hed_tag, (start_pos, end_pos)). + + Notes: + - The tuple format is as follows + - is_hed_tag (bool): A (possible) HED tag if True, delimiter if not. + - start_pos (int): Index of start of string in hed_string. + - end_pos (int): Index of end of string in hed_string. + + - This function does not validate tags or delimiters in any form. + + """ + tag_delimiters = ",()" + current_spacing = 0 + found_symbol = True + result_positions = [] + tag_start_pos = None + last_end_pos = 0 + for i, char in enumerate(hed_string): + if char == " ": + current_spacing += 1 + continue + + if char in tag_delimiters: + if found_symbol: + if last_end_pos != i: + result_positions.append((False, (last_end_pos, i))) + last_end_pos = i + elif not found_symbol: + found_symbol = True + last_end_pos = i - current_spacing + result_positions.append((True, (tag_start_pos, last_end_pos))) + current_spacing = 0 + tag_start_pos = None + continue + + # If we have a current delimiter, end it here. + if found_symbol and last_end_pos is not None: + if last_end_pos != i: + result_positions.append((False, (last_end_pos, i))) + last_end_pos = None + + found_symbol = False + current_spacing = 0 + if tag_start_pos is None: + tag_start_pos = i + + if last_end_pos is not None and len(hed_string) != last_end_pos: + result_positions.append((False, (last_end_pos, len(hed_string)))) + if tag_start_pos is not None: + result_positions.append((True, (tag_start_pos, len(hed_string) - current_spacing))) + if current_spacing: + result_positions.append((False, (len(hed_string) - current_spacing, len(hed_string)))) + + return result_positions + + def validate(self, allow_placeholders=True, error_handler=None): + """ Validate the string using the schema. + + Parameters: + allow_placeholders(bool): Allow placeholders in the string. + error_handler(ErrorHandler or None): The error handler to use, creates a default one if none passed. + Returns: + issues (list of dict): A list of issues for HED string. 
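# A sketch of validate(); the invalid tag is deliberate, and the exact issue
# codes returned are not asserted here.
from hed.schema import load_schema_version
from hed.models.hed_string import HedString

schema = load_schema_version("8.3.0")
issues = HedString("Red, NotARealTag", schema).validate(allow_placeholders=False)
if issues:
    print(len(issues), "issue(s) found")  # each issue is a dict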
+ """ + from hed.validator import HedValidator + + validator = HedValidator(self._schema, def_dicts=self._def_dict) + return validator.validate(self, allow_placeholders=allow_placeholders, error_handler=error_handler) + + def find_top_level_tags(self, anchor_tags, include_groups=2): + """ Find top level groups with an anchor tag. + + A max of 1 tag located per top level group. + + Parameters: + anchor_tags (container): A list/set/etc. of short_base_tags to find groups by. + include_groups (0, 1 or 2): Parameter indicating what return values to include. + If 0: return only tags. + If 1: return only groups. + If 2 or any other value: return both. + Returns: + list: The returned result depends on include_groups. + """ + anchor_tags = {tag.casefold() for tag in anchor_tags} + top_level_tags = [] + for group in self.groups(): + for tag in group.tags(): + if tag.short_base_tag.casefold() in anchor_tags: + top_level_tags.append((tag, group)) + # Only capture a max of 1 per group. These are implicitly unique. + break + + if include_groups == 0 or include_groups == 1: + return [tag[include_groups] for tag in top_level_tags] + return top_level_tags + + def remove_refs(self): + """ Remove any refs(tags contained entirely inside curly braces) from the string. + + This does NOT validate the contents of the curly braces. This is only relevant when directly + editing sidecar strings. Tools will naturally ignore these. + """ + ref_tags = [tag for tag in self.get_all_tags() if tag.is_column_ref()] + if ref_tags: + self.remove(ref_tags) diff --git a/hed/models/query_handler.py b/hed/models/query_handler.py index e1c65a41f..c5a77f3d0 100644 --- a/hed/models/query_handler.py +++ b/hed/models/query_handler.py @@ -1,182 +1,182 @@ -""" Holder for and manipulation of search results. """ -import re - -from hed.models.query_expressions import Expression, ExpressionAnd, ExpressionWildcardNew, ExpressionOr, \ - ExpressionNegation, ExpressionDescendantGroup, ExpressionExactMatch -from hed.models.query_util import Token - - -class QueryHandler: - """Parse a search expression into a form than can be used to search a HED string.""" - - def __init__(self, expression_string): - """Compiles a QueryHandler for a particular expression, so it can be used to search hed strings. - - Basic Input Examples: - - 'Event' - Finds any strings with Event, or a descendent tag of Event such as Sensory-event. - - 'Event && Action' - Find any strings with Event and Action, including descendant tags. - - 'Event || Action' - Same as above, but it has either. - - '"Event"' - Finds the Event tag, but not any descendent tags. - - `Def/DefName/*` - Find Def/DefName instances with placeholders, regardless of the value of the placeholder. - - 'Eve*' - Find any short tags that begin with Eve*, such as Event, but not Sensory-event. - - '[Event && Action]' - Find a group that contains both Event and Action(at any level). - - '{Event && Action}' - Find a group with Event And Action at the same level. - - '{Event && Action:}' - Find a group with Event And Action at the same level, and nothing else. - - '{Event && Action:Agent}' - Find a group with Event And Action at the same level, and optionally an Agent tag. - - Practical Complex Example: - - {(Onset || Offset), (Def || {Def-expand}): ???} - A group with an onset tag, - a def tag or def-expand group, and an optional wildcard group - - Parameters: - expression_string(str): The query string. 
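# A sketch of the query workflow documented above; the tags are illustrative.
from hed.schema import load_schema_version
from hed.models.hed_string import HedString
from hed.models.query_handler import QueryHandler

schema = load_schema_version("8.3.0")
query = QueryHandler("Sensory-event && Red")
match = query.search(HedString("Sensory-event, (Red, Blue)", schema))
if match:  # per search(), treat the result as a bool
    print("matched")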
- """ - self.tokens = [] - self.at_token = -1 - self.tree = self._parse(expression_string.casefold()) - self._org_string = expression_string - - def search(self, hed_string_obj): - """Returns if a match is found in the given string - - Parameters: - hed_string_obj (HedString): String to search - - Returns: - list(SearchResult): Generally you should just treat this as a bool - True if a match was found. - """ - current_node = self.tree - - result = current_node.handle_expr(hed_string_obj) - return result - - def __str__(self): - return str(self.tree) - - def _get_next_token(self): - """Returns the current token and advances the counter""" - self.at_token += 1 - if self.at_token >= len(self.tokens): - raise ValueError("Parse error in get next token") - return self.tokens[self.at_token] - - def _next_token_is(self, kinds): - """Returns the current token if it matches kinds, and advances the counter""" - if self.at_token + 1 >= len(self.tokens): - return None - if self.tokens[self.at_token + 1].kind in kinds: - return self._get_next_token() - return None - - def _parse(self, expression_string): - """Parse the string and build an expression tree""" - self.tokens = self._tokenize(expression_string) - - expr = self._handle_or_op() - - if self.at_token + 1 != len(self.tokens): - raise ValueError("Parse error in search string") - - return expr - - @staticmethod - def _tokenize(expression_string): - """Tokenize the expression string into a list""" - grouping_re = r"\[\[|\[|\]\]|\]|}|{|:" - paren_re = r"\)|\(|~" - word_re = r"\?+|\&\&|\|\||,|[\"_\-a-zA-Z0-9/.^#\*@]+" - re_string = fr"({grouping_re}|{paren_re}|{word_re})" - token_re = re.compile(re_string) - - tokens = token_re.findall(expression_string) - tokens = [Token(token) for token in tokens] - - return tokens - - def _handle_and_op(self): - expr = self._handle_negation() - next_token = self._next_token_is([Token.And]) - while next_token: - right = self._handle_negation() - if next_token.kind == Token.And: - expr = ExpressionAnd(next_token, expr, right) - next_token = self._next_token_is([Token.And]) - return expr - - def _handle_or_op(self): - expr = self._handle_and_op() - next_token = self._next_token_is([Token.Or]) - while next_token: - right = self._handle_and_op() - if next_token.kind == Token.Or: - expr = ExpressionOr(next_token, expr, right) - next_token = self._next_token_is([Token.Or]) - return expr - - def _handle_negation(self): - next_token = self._next_token_is([Token.LogicalNegation]) - if next_token == Token.LogicalNegation: - interior = self._handle_grouping_op() - if "?" in str(interior): - raise ValueError("Cannot negate wildcards, or expressions that contain wildcards." 
- "Use {required_expression : optional_expression}.") - expr = ExpressionNegation(next_token, right=interior) - return expr - else: - return self._handle_grouping_op() - - def _handle_grouping_op(self): - next_token = self._next_token_is( - [Token.LogicalGroup, Token.DescendantGroup, Token.ExactMatch]) - if next_token == Token.LogicalGroup: - expr = self._handle_or_op() - next_token = self._next_token_is([Token.LogicalGroupEnd]) - if next_token != Token.LogicalGroupEnd: - raise ValueError("Parse error: Missing closing paren") - elif next_token == Token.DescendantGroup: - interior = self._handle_or_op() - expr = ExpressionDescendantGroup(next_token, right=interior) - next_token = self._next_token_is([Token.DescendantGroupEnd]) - if next_token != Token.DescendantGroupEnd: - raise ValueError("Parse error: Missing closing square bracket") - elif next_token == Token.ExactMatch: - interior = self._handle_or_op() - expr = ExpressionExactMatch(next_token, right=interior) - next_token = self._next_token_is([Token.ExactMatchEnd, Token.ExactMatchOptional]) - if next_token == Token.ExactMatchOptional: - # We have an optional portion - this needs to now be an exact match - expr.optional = "none" - next_token = self._next_token_is([Token.ExactMatchEnd]) - if next_token != Token.ExactMatchEnd: - optional_portion = self._handle_or_op() - expr.left = optional_portion - next_token = self._next_token_is([Token.ExactMatchEnd]) - if "~" in str(expr): - raise ValueError("Cannot use negation in exact matching groups," - " as it's not clear what is being matched.\n" - "{thing and ~(expression)} is allowed.") - - if next_token is None: - raise ValueError("Parse error: Missing closing curly bracket") - else: - next_token = self._get_next_token() - if next_token and next_token.kind == Token.Wildcard: - expr = ExpressionWildcardNew(next_token) - elif next_token: - expr = Expression(next_token) - else: - expr = None - - return expr +""" Holder for and manipulation of search results. """ +import re + +from hed.models.query_expressions import Expression, ExpressionAnd, ExpressionWildcardNew, ExpressionOr, \ + ExpressionNegation, ExpressionDescendantGroup, ExpressionExactMatch +from hed.models.query_util import Token + + +class QueryHandler: + """Parse a search expression into a form than can be used to search a HED string.""" + + def __init__(self, expression_string): + """Compiles a QueryHandler for a particular expression, so it can be used to search HED strings. + + Basic Input Examples: + + 'Event' - Finds any strings with Event, or a descendent tag of Event such as Sensory-event. + + 'Event && Action' - Find any strings with Event and Action, including descendant tags. + + 'Event || Action' - Same as above, but it has either. + + '"Event"' - Finds the Event tag, but not any descendent tags. + + `Def/DefName/*` - Find Def/DefName instances with placeholders, regardless of the value of the placeholder. + + 'Eve*' - Find any short tags that begin with Eve*, such as Event, but not Sensory-event. + + '[Event && Action]' - Find a group that contains both Event and Action(at any level). + + '{Event && Action}' - Find a group with Event And Action at the same level. + + '{Event && Action:}' - Find a group with Event And Action at the same level, and nothing else. + + '{Event && Action:Agent}' - Find a group with Event And Action at the same level, and optionally an Agent tag. 
+ + Practical Complex Example: + + {(Onset || Offset), (Def || {Def-expand}): ???} - A group with an onset tag, + a def tag or def-expand group, and an optional wildcard group + + Parameters: + expression_string(str): The query string. + """ + self.tokens = [] + self.at_token = -1 + self.tree = self._parse(expression_string.casefold()) + self._org_string = expression_string + + def search(self, hed_string_obj): + """Returns if a match is found in the given string + + Parameters: + hed_string_obj (HedString): String to search + + Returns: + list(SearchResult): Generally you should just treat this as a bool + True if a match was found. + """ + current_node = self.tree + + result = current_node.handle_expr(hed_string_obj) + return result + + def __str__(self): + return str(self.tree) + + def _get_next_token(self): + """Returns the current token and advances the counter""" + self.at_token += 1 + if self.at_token >= len(self.tokens): + raise ValueError("Parse error in get next token") + return self.tokens[self.at_token] + + def _next_token_is(self, kinds): + """Returns the current token if it matches kinds, and advances the counter""" + if self.at_token + 1 >= len(self.tokens): + return None + if self.tokens[self.at_token + 1].kind in kinds: + return self._get_next_token() + return None + + def _parse(self, expression_string): + """Parse the string and build an expression tree""" + self.tokens = self._tokenize(expression_string) + + expr = self._handle_or_op() + + if self.at_token + 1 != len(self.tokens): + raise ValueError("Parse error in search string") + + return expr + + @staticmethod + def _tokenize(expression_string): + """Tokenize the expression string into a list""" + grouping_re = r"\[\[|\[|\]\]|\]|}|{|:" + paren_re = r"\)|\(|~" + word_re = r"\?+|\&\&|\|\||,|[\"_\-a-zA-Z0-9/.^#\*@]+" + re_string = fr"({grouping_re}|{paren_re}|{word_re})" + token_re = re.compile(re_string) + + tokens = token_re.findall(expression_string) + tokens = [Token(token) for token in tokens] + + return tokens + + def _handle_and_op(self): + expr = self._handle_negation() + next_token = self._next_token_is([Token.And]) + while next_token: + right = self._handle_negation() + if next_token.kind == Token.And: + expr = ExpressionAnd(next_token, expr, right) + next_token = self._next_token_is([Token.And]) + return expr + + def _handle_or_op(self): + expr = self._handle_and_op() + next_token = self._next_token_is([Token.Or]) + while next_token: + right = self._handle_and_op() + if next_token.kind == Token.Or: + expr = ExpressionOr(next_token, expr, right) + next_token = self._next_token_is([Token.Or]) + return expr + + def _handle_negation(self): + next_token = self._next_token_is([Token.LogicalNegation]) + if next_token == Token.LogicalNegation: + interior = self._handle_grouping_op() + if "?" in str(interior): + raise ValueError("Cannot negate wildcards, or expressions that contain wildcards." 
+
+    def _handle_grouping_op(self):
+        next_token = self._next_token_is(
+            [Token.LogicalGroup, Token.DescendantGroup, Token.ExactMatch])
+        if next_token == Token.LogicalGroup:
+            expr = self._handle_or_op()
+            next_token = self._next_token_is([Token.LogicalGroupEnd])
+            if next_token != Token.LogicalGroupEnd:
+                raise ValueError("Parse error: Missing closing paren")
+        elif next_token == Token.DescendantGroup:
+            interior = self._handle_or_op()
+            expr = ExpressionDescendantGroup(next_token, right=interior)
+            next_token = self._next_token_is([Token.DescendantGroupEnd])
+            if next_token != Token.DescendantGroupEnd:
+                raise ValueError("Parse error: Missing closing square bracket")
+        elif next_token == Token.ExactMatch:
+            interior = self._handle_or_op()
+            expr = ExpressionExactMatch(next_token, right=interior)
+            next_token = self._next_token_is([Token.ExactMatchEnd, Token.ExactMatchOptional])
+            if next_token == Token.ExactMatchOptional:
+                # We have an optional portion - this needs to now be an exact match
+                expr.optional = "none"
+                next_token = self._next_token_is([Token.ExactMatchEnd])
+                if next_token != Token.ExactMatchEnd:
+                    optional_portion = self._handle_or_op()
+                    expr.left = optional_portion
+                    next_token = self._next_token_is([Token.ExactMatchEnd])
+                if "~" in str(expr):
+                    raise ValueError("Cannot use negation in exact matching groups,"
+                                     " as it's not clear what is being matched.\n"
+                                     "{thing and ~(expression)} is allowed.")
+
+            if next_token is None:
+                raise ValueError("Parse error: Missing closing curly bracket")
+        else:
+            next_token = self._get_next_token()
+            if next_token and next_token.kind == Token.Wildcard:
+                expr = ExpressionWildcardNew(next_token)
+            elif next_token:
+                expr = Expression(next_token)
+            else:
+                expr = None
+
+        return expr
diff --git a/hed/schema/hed_cache.py b/hed/schema/hed_cache.py
index e25c7da1d..eab0a9085 100644
--- a/hed/schema/hed_cache.py
+++ b/hed/schema/hed_cache.py
@@ -23,7 +23,7 @@
 r"(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?"
 HED_VERSION_P3 = r"(?:\+(?P[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?"
 HED_VERSION = HED_VERSION_P1 + HED_VERSION_P2 + HED_VERSION_P3
-# Actual local hed filename re.
+# Actual local HED filename re.
 HED_VERSION_FINAL = r'^[hH][eE][dD](_([a-z0-9]+)_)?(' + HED_VERSION + r')\.[xX][mM][lL]$'
 
 HED_XML_PREFIX = 'HED'
@@ -48,7 +48,7 @@
 
 
 def set_cache_directory(new_cache_dir):
-    """ Set default global hed cache directory.
+    """ Set default global HED cache directory.
 
     Parameters:
         new_cache_dir (str): Directory to check for versions.
@@ -66,7 +66,7 @@ def get_cache_directory():
 
 
 def get_hed_versions(local_hed_directory=None, library_name=None, check_prerelease=False):
-    """ Get the HED versions in the hed directory.
+    """ Get the HED versions in the HED directory.
 
     Parameters:
         local_hed_directory (str): Directory to check for versions which defaults to hed_cache.
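As a quick illustration of the cache helpers touched here (not part of the patch; the directory path and version output are hypothetical):

    from hed.schema import hed_cache

    hed_cache.set_cache_directory("/tmp/hed_cache")          # hypothetical location
    print(hed_cache.get_hed_versions())                      # e.g. ['8.3.0', '8.2.0']
    print(hed_cache.get_hed_versions(library_name="score"))  # versions of one library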
@@ -126,10 +126,10 @@ def get_hed_version_path(xml_version, library_name=None, local_hed_directory=Non
 
     Parameters:
         library_name (str or None): Optional the schema library name.
         xml_version (str): Returns this version if it exists
-        local_hed_directory (str): Path to local hed directory. Defaults to HED_CACHE_DIRECTORY
+        local_hed_directory (str): Path to local HED directory. Defaults to HED_CACHE_DIRECTORY
         check_prerelease(bool): Also check for prerelease schemas
 
     Returns:
-        str: The path to the latest HED version the hed directory.
+        str: The path to the latest HED version in the HED directory.
 
     """
     if not local_hed_directory:
@@ -143,7 +143,7 @@ def get_hed_version_path(xml_version, library_name=None, local_hed_directory=Non
 
 
 def cache_local_versions(cache_folder):
-    """ Cache all schemas included with the hed installation.
+    """ Cache all schemas included with the HED installation.
 
     Parameters:
         cache_folder (str): The folder holding the cache.
@@ -441,7 +441,7 @@ def _safe_move_tmp_to_folder(temp_hed_xml_file, dest_filename):
 
 
 def _cache_hed_version(version, library_name, version_info, cache_folder):
-    """Cache the given hed version"""
+    """Cache the given HED version"""
     sha_hash, download_url, prerelease = version_info
 
     possible_cache_filename = _create_xml_filename(version, library_name, cache_folder, prerelease)
diff --git a/hed/schema/hed_schema_io.py b/hed/schema/hed_schema_io.py
index 478c51c30..87f0c3e65 100644
--- a/hed/schema/hed_schema_io.py
+++ b/hed/schema/hed_schema_io.py
@@ -67,7 +67,7 @@ def load_schema(hed_path, schema_namespace=None, schema=None, name=None):
             Template: basename.tsv, where files are named basename_Struct.tsv, basename_Tag.tsv, etc.
             Alternatively, you can point to a directory containing the .tsv files.
         schema_namespace (str or None): The name_prefix all tags in this schema will accept.
-        schema(HedSchema or None): A hed schema to merge this new file into
+        schema(HedSchema or None): A HED schema to merge this new file into
             It must be a with-standard schema with the same value.
         name(str or None): User supplied identifier for this schema
 
@@ -117,7 +117,7 @@ def from_string(schema_string, schema_format=".xml", schema_namespace=None, sche
         schema_format (str): The schema format of the source schema string.
             Allowed normal values: .mediawiki, .xml
         schema_namespace (str, None): The name_prefix all tags in this schema will accept.
-        schema(HedSchema or None): A hed schema to merge this new file into
+        schema(HedSchema or None): A HED schema to merge this new file into
             It must be a with-standard schema with the same value.
         name(str or None): User supplied identifier for this schema
 
@@ -207,7 +207,7 @@ def parse_version_list(xml_version_list):
         e.g. ["score", "testlib", "ol:otherlib"]  will return {"": "score, testlib", "ol:": "otherlib"}
 
     Parameters:
-        xml_version_list (list): List of str specifying which hed schemas to use
+        xml_version_list (list): List of str specifying which HED schemas to use
 
     Returns:
         HedSchema or HedSchemaGroup: The schema or schema group extracted.
@@ -297,7 +297,7 @@ def _load_schema_version_sub(xml_version, schema_namespace="", xml_folder=None,
         xml_version (str): HED version format string. Expected format: '[schema_namespace:][library_name_]X.Y.Z'
         schema_namespace(str): Namespace to add this schema to, default none
         xml_folder (str): Path to a folder containing schema.
-        schema(HedSchema or None): A hed schema to merge this new file into
+        schema(HedSchema or None): A HED schema to merge this new file into
             It must be a with-standard schema with the same value.
Returns: diff --git a/hed/schema/schema_io/base2schema.py b/hed/schema/schema_io/base2schema.py index 2aebb055e..87d6bca9a 100644 --- a/hed/schema/schema_io/base2schema.py +++ b/hed/schema/schema_io/base2schema.py @@ -1,216 +1,216 @@ -import copy - -from hed.schema.schema_io import schema_util -from hed.errors.exceptions import HedFileError, HedExceptions - -from hed.schema.hed_schema import HedSchema -from hed.schema import hed_schema_constants as constants -from hed.schema.hed_schema_constants import HedKey -from abc import abstractmethod, ABC -from hed.schema import schema_header_util -from hed.schema import hed_schema_constants - - -class SchemaLoader(ABC): - """ Baseclass for schema loading, to handle basic errors and partnered schemas - - Expected usage is SchemaLoaderXML.load(filename) - - SchemaLoaderXML(filename) will load just the header_attributes - """ - def __init__(self, filename, schema_as_string=None, schema=None, file_format=None, name=""): - """Loads the given schema from one of the two parameters. - - Parameters: - filename(str or None): A valid filepath or None - schema_as_string(str or None): A full schema as text or None - schema(HedSchema or None): A hed schema to merge this new file into - It must be a with-standard schema with the same value. - file_format(str or None): The format of this file if needed(only for owl currently) - name(str or None): Optional user supplied identifier, by default uses filename - """ - if schema_as_string and filename: - raise HedFileError(HedExceptions.BAD_PARAMETERS, "Invalid parameters to schema creation.", - filename) - self.file_format = file_format - self.filename = filename - self.name = name if name else filename - self.schema_as_string = schema_as_string - self.appending_to_schema = False - try: - self.input_data = self._open_file() - except OSError as e: - raise HedFileError(HedExceptions.FILE_NOT_FOUND, e.strerror, self.name) - except TypeError as e: - raise HedFileError(HedExceptions.FILE_NOT_FOUND, str(e), self.name) - except ValueError as e: - raise HedFileError(HedExceptions.FILE_NOT_FOUND, str(e), self.name) - - # self._schema.filename = filename - hed_attributes = self._get_header_attributes(self.input_data) - schema_header_util.validate_attributes(hed_attributes, name=self.name) - - with_standard = hed_attributes.get(hed_schema_constants.WITH_STANDARD_ATTRIBUTE, "") - self.library = hed_attributes.get(hed_schema_constants.LIBRARY_ATTRIBUTE, "") - version_number = hed_attributes.get(hed_schema_constants.VERSION_ATTRIBUTE, "") - if not schema: - self._schema = HedSchema() - else: - self._schema = schema - self.appending_to_schema = True - if not self._schema.with_standard: - raise HedFileError(HedExceptions.SCHEMA_DUPLICATE_PREFIX, - "Loading multiple normal schemas as a merged one with the same namespace. 
" - "Ensure schemas have the withStandard header attribute set", - self.name) - elif with_standard != self._schema.with_standard: - raise HedFileError(HedExceptions.BAD_WITH_STANDARD_MULTIPLE_VALUES, - "Merging schemas requires same withStandard value.", - self.name) - hed_attributes[hed_schema_constants.VERSION_ATTRIBUTE] = self._schema.version_number + f",{version_number}" - hed_attributes[hed_schema_constants.LIBRARY_ATTRIBUTE] = self._schema.library + f",{self.library}" - if name: - self._schema.name = name - self._schema.filename = filename - self._schema.header_attributes = hed_attributes - self._loading_merged = False - self.fatal_errors = [] - - @property - def schema(self): - """ The partially loaded schema if you are after just header attributes.""" - return self._schema - - @classmethod - def load(cls, filename=None, schema_as_string=None, schema=None, file_format=None, name=""): - """ Loads and returns the schema, including partnered schema if applicable. - - Parameters: - filename(str or None): A valid filepath or None - schema_as_string(str or None): A full schema as text or None - schema(HedSchema or None): A hed schema to merge this new file into - It must be a with-standard schema with the same value. - file_format(str or None): If this is an owl file being loaded, this is the format. - Allowed values include: turtle, json-ld, and owl(xml) - name(str or None): Optional user supplied identifier, by default uses filename - Returns: - schema(HedSchema): The new schema - """ - loader = cls(filename, schema_as_string, schema, file_format, name) - return loader._load() - - def _load(self): - """ Parses the previously loaded data, including loading a partnered schema if needed. - - Returns: - schema(HedSchema): The new schema - """ - self._loading_merged = True - # Do a full load of the standard schema if this is a partnered schema - if not self.appending_to_schema and self._schema.with_standard and not self._schema.merged: - from hed.schema.hed_schema_io import load_schema_version - saved_attr = self._schema.header_attributes - saved_format = self._schema.source_format - try: - base_version = load_schema_version(self._schema.with_standard) - except HedFileError as e: - raise HedFileError(HedExceptions.BAD_WITH_STANDARD, - message=f"Cannot load withStandard schema '{self._schema.with_standard}'", - filename=e.filename) - # Copy the non-alterable cached schema - self._schema = copy.deepcopy(base_version) - self._schema.filename = self.filename - self._schema.name = self.name # Manually set name here as we don't want to pass it to load_schema_version - self._schema.header_attributes = saved_attr - self._schema.source_format = saved_format - self._loading_merged = False - - self._parse_data() - self._schema.finalize_dictionaries() - - return self._schema - - @abstractmethod - def _open_file(self): - """Overloaded versions should retrieve the input from filename/schema_as_string""" - pass - - @abstractmethod - def _get_header_attributes(self, input_data): - """Overloaded versions should return the header attributes from the input data.""" - pass - - @abstractmethod - def _parse_data(self): - """Puts the input data into the new schema""" - pass - - def _add_to_dict_base(self, entry, key_class): - if not entry.has_attribute(HedKey.InLibrary) and self.appending_to_schema and self._schema.merged: - return None - - if self.library and ( - not self._schema.with_standard or (not self._schema.merged and self._schema.with_standard)): - # only add it if not already present - This is a rare 
case - if not entry.has_attribute(HedKey.InLibrary): - entry._set_attribute_value(HedKey.InLibrary, self.library) - - return self._schema._add_tag_to_dict(entry.name, entry, key_class) - - @staticmethod - def find_rooted_entry(tag_entry, schema, loading_merged): - """ This semi-validates rooted tags, raising an exception on major errors - - Parameters: - tag_entry(HedTagEntry): the possibly rooted tag - schema(HedSchema): The schema being loaded - loading_merged(bool): If this schema was already merged before loading - - Returns: - rooted_tag(HedTagEntry or None): The base tag entry from the standard schema - Returns None if this tag isn't rooted - - :raises HedFileError: - - A rooted attribute is found in a non-paired schema - - A rooted attribute is not a string - - A rooted attribute was found on a non-root node in an unmerged schema. - - A rooted attribute is found on a root node in a merged schema. - - A rooted attribute indicates a tag that doesn't exist in the base schema. - """ - rooted_tag = tag_entry.has_attribute(constants.HedKey.Rooted, return_value=True) - if rooted_tag is not None: - if not schema.with_standard: - raise HedFileError(HedExceptions.ROOTED_TAG_INVALID, - f"Rooted tag attribute found on '{tag_entry.short_tag_name}' in a standard schema.", - schema.name) - - if not isinstance(rooted_tag, str): - raise HedFileError(HedExceptions.ROOTED_TAG_INVALID, - f'Rooted tag \'{tag_entry.short_tag_name}\' is not a string."', - schema.name) - - if tag_entry.parent_name and not loading_merged: - raise HedFileError(HedExceptions.ROOTED_TAG_INVALID, - f'Found rooted tag \'{tag_entry.short_tag_name}\' as a non root node.', - schema.name) - - if not tag_entry.parent_name and loading_merged: - raise HedFileError(HedExceptions.ROOTED_TAG_INVALID, - f'Found rooted tag \'{tag_entry.short_tag_name}\' as a root node in a merged schema.', - schema.name) - - rooted_entry = schema.tags.get(rooted_tag) - if not rooted_entry or rooted_entry.has_attribute(constants.HedKey.InLibrary): - raise HedFileError(HedExceptions.ROOTED_TAG_DOES_NOT_EXIST, - f"Rooted tag '{tag_entry.short_tag_name}' not found in paired standard schema", - schema.name) - - if loading_merged: - return None - - return rooted_entry - - def _add_fatal_error(self, line_number, line, warning_message="Schema term is empty or the line is malformed", - error_code=HedExceptions.WIKI_DELIMITERS_INVALID): - - self.fatal_errors += schema_util.format_error(line_number, line, warning_message, error_code) +import copy + +from hed.schema.schema_io import schema_util +from hed.errors.exceptions import HedFileError, HedExceptions + +from hed.schema.hed_schema import HedSchema +from hed.schema import hed_schema_constants as constants +from hed.schema.hed_schema_constants import HedKey +from abc import abstractmethod, ABC +from hed.schema import schema_header_util +from hed.schema import hed_schema_constants + + +class SchemaLoader(ABC): + """ Baseclass for schema loading, to handle basic errors and partnered schemas + + Expected usage is SchemaLoaderXML.load(filename) + + SchemaLoaderXML(filename) will load just the header_attributes + """ + def __init__(self, filename, schema_as_string=None, schema=None, file_format=None, name=""): + """Loads the given schema from one of the two parameters. + + Parameters: + filename(str or None): A valid filepath or None + schema_as_string(str or None): A full schema as text or None + schema(HedSchema or None): A HED schema to merge this new file into + It must be a with-standard schema with the same value. 
+            file_format(str or None): The format of this file if needed (only for OWL currently)
+            name(str or None): Optional user-supplied identifier, by default uses filename
+        """
+        if schema_as_string and filename:
+            raise HedFileError(HedExceptions.BAD_PARAMETERS, "Invalid parameters to schema creation.",
+                               filename)
+        self.file_format = file_format
+        self.filename = filename
+        self.name = name if name else filename
+        self.schema_as_string = schema_as_string
+        self.appending_to_schema = False
+        try:
+            self.input_data = self._open_file()
+        except OSError as e:
+            raise HedFileError(HedExceptions.FILE_NOT_FOUND, e.strerror, self.name)
+        except TypeError as e:
+            raise HedFileError(HedExceptions.FILE_NOT_FOUND, str(e), self.name)
+        except ValueError as e:
+            raise HedFileError(HedExceptions.FILE_NOT_FOUND, str(e), self.name)
+
+        # self._schema.filename = filename
+        hed_attributes = self._get_header_attributes(self.input_data)
+        schema_header_util.validate_attributes(hed_attributes, name=self.name)
+
+        with_standard = hed_attributes.get(hed_schema_constants.WITH_STANDARD_ATTRIBUTE, "")
+        self.library = hed_attributes.get(hed_schema_constants.LIBRARY_ATTRIBUTE, "")
+        version_number = hed_attributes.get(hed_schema_constants.VERSION_ATTRIBUTE, "")
+        if not schema:
+            self._schema = HedSchema()
+        else:
+            self._schema = schema
+            self.appending_to_schema = True
+            if not self._schema.with_standard:
+                raise HedFileError(HedExceptions.SCHEMA_DUPLICATE_PREFIX,
+                                   "Loading multiple normal schemas as a merged one with the same namespace. "
+                                   "Ensure schemas have the withStandard header attribute set",
+                                   self.name)
+            elif with_standard != self._schema.with_standard:
+                raise HedFileError(HedExceptions.BAD_WITH_STANDARD_MULTIPLE_VALUES,
+                                   "Merging schemas requires same withStandard value.",
+                                   self.name)
+            hed_attributes[hed_schema_constants.VERSION_ATTRIBUTE] = self._schema.version_number + f",{version_number}"
+            hed_attributes[hed_schema_constants.LIBRARY_ATTRIBUTE] = self._schema.library + f",{self.library}"
+        if name:
+            self._schema.name = name
+        self._schema.filename = filename
+        self._schema.header_attributes = hed_attributes
+        self._loading_merged = False
+        self.fatal_errors = []
+
+    @property
+    def schema(self):
+        """ The partially loaded schema if you are after just header attributes."""
+        return self._schema
+
+    @classmethod
+    def load(cls, filename=None, schema_as_string=None, schema=None, file_format=None, name=""):
+        """ Loads and returns the schema, including partnered schema if applicable.
+
+        Parameters:
+            filename(str or None): A valid filepath or None
+            schema_as_string(str or None): A full schema as text or None
+            schema(HedSchema or None): A HED schema to merge this new file into
+                It must be a with-standard schema with the same value.
+            file_format(str or None): If this is an OWL file being loaded, this is the format.
+                Allowed values include: turtle, json-ld, and owl (xml)
+            name(str or None): Optional user-supplied identifier, by default uses filename
+        Returns:
+            schema(HedSchema): The new schema
+        """
+        loader = cls(filename, schema_as_string, schema, file_format, name)
+        return loader._load()
+
+    def _load(self):
+        """ Parses the previously loaded data, including loading a partnered schema if needed.
+ + Returns: + schema(HedSchema): The new schema + """ + self._loading_merged = True + # Do a full load of the standard schema if this is a partnered schema + if not self.appending_to_schema and self._schema.with_standard and not self._schema.merged: + from hed.schema.hed_schema_io import load_schema_version + saved_attr = self._schema.header_attributes + saved_format = self._schema.source_format + try: + base_version = load_schema_version(self._schema.with_standard) + except HedFileError as e: + raise HedFileError(HedExceptions.BAD_WITH_STANDARD, + message=f"Cannot load withStandard schema '{self._schema.with_standard}'", + filename=e.filename) + # Copy the non-alterable cached schema + self._schema = copy.deepcopy(base_version) + self._schema.filename = self.filename + self._schema.name = self.name # Manually set name here as we don't want to pass it to load_schema_version + self._schema.header_attributes = saved_attr + self._schema.source_format = saved_format + self._loading_merged = False + + self._parse_data() + self._schema.finalize_dictionaries() + + return self._schema + + @abstractmethod + def _open_file(self): + """Overloaded versions should retrieve the input from filename/schema_as_string""" + pass + + @abstractmethod + def _get_header_attributes(self, input_data): + """Overloaded versions should return the header attributes from the input data.""" + pass + + @abstractmethod + def _parse_data(self): + """Puts the input data into the new schema""" + pass + + def _add_to_dict_base(self, entry, key_class): + if not entry.has_attribute(HedKey.InLibrary) and self.appending_to_schema and self._schema.merged: + return None + + if self.library and ( + not self._schema.with_standard or (not self._schema.merged and self._schema.with_standard)): + # only add it if not already present - This is a rare case + if not entry.has_attribute(HedKey.InLibrary): + entry._set_attribute_value(HedKey.InLibrary, self.library) + + return self._schema._add_tag_to_dict(entry.name, entry, key_class) + + @staticmethod + def find_rooted_entry(tag_entry, schema, loading_merged): + """ This semi-validates rooted tags, raising an exception on major errors + + Parameters: + tag_entry(HedTagEntry): the possibly rooted tag + schema(HedSchema): The schema being loaded + loading_merged(bool): If this schema was already merged before loading + + Returns: + rooted_tag(HedTagEntry or None): The base tag entry from the standard schema + Returns None if this tag isn't rooted + + :raises HedFileError: + - A rooted attribute is found in a non-paired schema + - A rooted attribute is not a string + - A rooted attribute was found on a non-root node in an unmerged schema. + - A rooted attribute is found on a root node in a merged schema. + - A rooted attribute indicates a tag that doesn't exist in the base schema. 
+        """
+        rooted_tag = tag_entry.has_attribute(constants.HedKey.Rooted, return_value=True)
+        if rooted_tag is not None:
+            if not schema.with_standard:
+                raise HedFileError(HedExceptions.ROOTED_TAG_INVALID,
+                                   f"Rooted tag attribute found on '{tag_entry.short_tag_name}' in a standard schema.",
+                                   schema.name)
+
+            if not isinstance(rooted_tag, str):
+                raise HedFileError(HedExceptions.ROOTED_TAG_INVALID,
+                                   f'Rooted tag \'{tag_entry.short_tag_name}\' is not a string.',
+                                   schema.name)
+
+            if tag_entry.parent_name and not loading_merged:
+                raise HedFileError(HedExceptions.ROOTED_TAG_INVALID,
+                                   f'Found rooted tag \'{tag_entry.short_tag_name}\' as a non-root node.',
+                                   schema.name)
+
+            if not tag_entry.parent_name and loading_merged:
+                raise HedFileError(HedExceptions.ROOTED_TAG_INVALID,
+                                   f'Found rooted tag \'{tag_entry.short_tag_name}\' as a root node in a merged schema.',
+                                   schema.name)
+
+            rooted_entry = schema.tags.get(rooted_tag)
+            if not rooted_entry or rooted_entry.has_attribute(constants.HedKey.InLibrary):
+                raise HedFileError(HedExceptions.ROOTED_TAG_DOES_NOT_EXIST,
+                                   f"Rooted tag '{tag_entry.short_tag_name}' not found in paired standard schema",
+                                   schema.name)
+
+            if loading_merged:
+                return None
+
+            return rooted_entry
+
+    def _add_fatal_error(self, line_number, line, warning_message="Schema term is empty or the line is malformed",
+                         error_code=HedExceptions.WIKI_DELIMITERS_INVALID):
+
+        self.fatal_errors += schema_util.format_error(line_number, line, warning_message, error_code)
diff --git a/hed/schema/schema_io/ontology_util.py b/hed/schema/schema_io/ontology_util.py
index 28c05d5d2..c5d235afa 100644
--- a/hed/schema/schema_io/ontology_util.py
+++ b/hed/schema/schema_io/ontology_util.py
@@ -60,7 +60,7 @@ def get_all_ids(df):
         df(pd.DataFrame): The dataframe
 
     Returns:
-        numbers(Set or None): None if this has no hed column, otherwise all unique numbers as a set.
+        numbers(Set or None): None if this has no HED column, otherwise all unique numbers as a set.
     """
     if constants.hed_id in df.columns:
         modified_df = df[constants.hed_id].apply(lambda x: remove_prefix(x, "HED_"))
@@ -86,7 +86,7 @@ def update_dataframes_from_schema(dataframes, schema, schema_name="", get_as_ids
     hedid_errors = []
     if not schema_name:
         schema_name = schema.library
-    # 1. Verify existing hed ids don't conflict between schema/dataframes
+    # 1. Verify existing HED ids don't conflict between schema/dataframes
     for df_key, df in dataframes.items():
         section_key = constants.section_mapping_hed_id.get(df_key)
         if not section_key:
@@ -106,13 +106,13 @@ def update_dataframes_from_schema(dataframes, schema, schema_name="", get_as_ids
     output_dfs = Schema2DF(get_as_ids=get_as_ids).process_schema(schema, save_merged=False)
 
     if assign_missing_ids:
-        # 3: Add any hed ID's as needed to these generated dfs
+        # 3: Add any HED IDs as needed to these generated dfs
         for df_key, df in output_dfs.items():
             if df_key == constants.STRUCT_KEY:
                 continue
             unused_tag_ids = _get_hedid_range(schema_name, df_key)
 
-            # If no errors, assign new hed ID's
+            # If no errors, assign new HED IDs
             assign_hed_ids_section(df, unused_tag_ids)
 
     # 4: Merge the dataframes
@@ -184,7 +184,7 @@ def assign_hed_ids_section(df, unused_tag_ids):
 
     Parameters:
         df(pd.DataFrame): The dataframe to add id's to.
-        unused_tag_ids(set of int): The possible hed id's to assign from
+        unused_tag_ids(set of int): The possible HED IDs to assign from
     """
     # Remove already used ids
    unused_tag_ids -= get_all_ids(df)
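The partnered-schema handling in the SchemaLoader changes above is easiest to see from the caller's side. A minimal sketch, not part of this patch (the library and version string are illustrative): a library schema whose header sets withStandard loads "partnered" - the cached standard schema is deep-copied and the library is merged into it, which is what SchemaLoader._load() arranges.

    from hed import load_schema_version

    score = load_schema_version("score_2.0.0")  # illustrative partnered library
    print(score.with_standard)                  # the standard version it is paired with
    print(score.version_number)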
diff --git a/hed/schema/schema_io/schema2df.py b/hed/schema/schema_io/schema2df.py
index 1b8f800c2..e48b4de1a 100644
--- a/hed/schema/schema_io/schema2df.py
+++ b/hed/schema/schema_io/schema2df.py
@@ -36,7 +36,7 @@ def _get_object_name_and_id(self, object_name, include_prefix=False):
     """ Get the adjusted name and ID for the given object type.
 
     Parameters:
-        object_name(str): The name of the base hed object, e.g. HedHeader, HedUnit
+        object_name(str): The name of the base HED object, e.g. HedHeader, HedUnit
         include_prefix(bool): If True, include the "hed:"
     Returns:
         object_name(str): The inherited object name, e.g. StandardHeader
@@ -239,7 +239,7 @@ def _get_header_equivalent_to(self, attributes_string, subclass_of):
             attribute_strings.append(f'({attribute} value "{value}")')
 
         if self._get_as_ids:
-            # we just want the ID for normal hed objects, not schema specific
+            # we just want the ID for normal HED objects, not schema specific
             subclass_of = self._get_object_id(subclass_of, base_id=0, include_prefix=True)
 
         # If they match, we want to leave equivalent_to blank
diff --git a/hed/schema/schema_validation_util.py b/hed/schema/schema_validation_util.py
index e241fd502..bbebc5d4e 100644
--- a/hed/schema/schema_validation_util.py
+++ b/hed/schema/schema_validation_util.py
@@ -78,7 +78,7 @@ def validate_schema_description_new(hed_entry):
 
 
 def schema_version_for_library(hed_schema, library_name):
-    """ Given the library name and hed schema object, return the version
+    """ Given the library name and HED schema object, return the version
 
     Parameters:
         hed_schema (HedSchema): the schema object
diff --git a/hed/schema/schema_validation_util_deprecated.py b/hed/schema/schema_validation_util_deprecated.py
index c8bd3ec7e..0b42ba9f3 100644
--- a/hed/schema/schema_validation_util_deprecated.py
+++ b/hed/schema/schema_validation_util_deprecated.py
@@ -1,82 +1,82 @@
-"""Legacy validation for terms and descriptions prior to 8.3.0."""
-from hed.errors.error_reporter import ErrorHandler
-from hed.errors.error_types import SchemaWarnings
-
-
-ALLOWED_TAG_CHARS = "-"
-ALLOWED_DESC_CHARS = "-_:;,./()+ ^"
-
-
-def validate_schema_tag(hed_entry):
-    """ Check short tag for capitalization and illegal characters.
-
-    Parameters:
-        hed_entry (HedTagEntry): A single hed term.
-
-    Returns:
-        list: A list of all formatting issues found in the term. Each issue is a dictionary.
-
-    """
-    issues_list = []
-    hed_term = hed_entry.short_tag_name
-    # Any # terms will have already been validated as the previous entry.
-    if hed_term == "#":
-        return issues_list
-
-    for i, char in enumerate(hed_term):
-        if i == 0 and not (char.isdigit() or char.isupper()):
-            issues_list += ErrorHandler.format_error(SchemaWarnings.SCHEMA_INVALID_CAPITALIZATION,
-                                                     hed_term, char_index=i, problem_char=char)
-            continue
-        if char in ALLOWED_TAG_CHARS or char.isalnum():
-            continue
-        issues_list += ErrorHandler.format_error(SchemaWarnings.SCHEMA_INVALID_CHARACTERS_IN_TAG,
-                                                 hed_term, char_index=i, problem_char=char)
-    return issues_list
-
-
-def validate_schema_description(hed_entry):
-    """ Check the description of a single schema entry.
-
-    Parameters:
-        hed_entry (HedSchemaEntry): A single schema entry
-
-    Returns:
-        list: A list of all formatting issues found in the description.
- - """ - issues_list = [] - # Blank description is fine - if not hed_entry.description: - return issues_list - for i, char in enumerate(hed_entry.description): - if char.isalnum(): - continue - if char in ALLOWED_DESC_CHARS: - continue - issues_list += ErrorHandler.format_error(SchemaWarnings.SCHEMA_INVALID_CHARACTERS_IN_DESC, - hed_entry.description, hed_entry.name, char_index=i, problem_char=char) - return issues_list - - -def verify_no_brackets(hed_entry): - """ Extremely basic check to block curly braces - - Parameters: - hed_entry (HedSchemaEntry): A single schema entry - - Returns: - list: A list of issues for invalid characters found in the name - """ - hed_term = hed_entry.name - issues_list = [] - indexes = _get_disallowed_character_indexes(hed_term) - for char, index in indexes: - issues_list += ErrorHandler.format_error(SchemaWarnings.SCHEMA_INVALID_CHARACTERS_IN_TAG, - hed_term, char_index=index, problem_char=char) - return issues_list - - -def _get_disallowed_character_indexes(validation_string, index_adj=0, disallowed_chars="{}"): - indexes = [(char, index + index_adj) for index, char in enumerate(validation_string) if char in disallowed_chars] - return indexes +"""Legacy validation for terms and descriptions prior to 8.3.0.""" +from hed.errors.error_reporter import ErrorHandler +from hed.errors.error_types import SchemaWarnings + + +ALLOWED_TAG_CHARS = "-" +ALLOWED_DESC_CHARS = "-_:;,./()+ ^" + + +def validate_schema_tag(hed_entry): + """ Check short tag for capitalization and illegal characters. + + Parameters: + hed_entry (HedTagEntry): A single HED term. + + Returns: + list: A list of all formatting issues found in the term. Each issue is a dictionary. + + """ + issues_list = [] + hed_term = hed_entry.short_tag_name + # Any # terms will have already been validated as the previous entry. + if hed_term == "#": + return issues_list + + for i, char in enumerate(hed_term): + if i == 0 and not (char.isdigit() or char.isupper()): + issues_list += ErrorHandler.format_error(SchemaWarnings.SCHEMA_INVALID_CAPITALIZATION, + hed_term, char_index=i, problem_char=char) + continue + if char in ALLOWED_TAG_CHARS or char.isalnum(): + continue + issues_list += ErrorHandler.format_error(SchemaWarnings.SCHEMA_INVALID_CHARACTERS_IN_TAG, + hed_term, char_index=i, problem_char=char) + return issues_list + + +def validate_schema_description(hed_entry): + """ Check the description of a single schema entry. + + Parameters: + hed_entry (HedSchemaEntry): A single schema entry + + Returns: + list: A list of all formatting issues found in the description. 
+ + """ + issues_list = [] + # Blank description is fine + if not hed_entry.description: + return issues_list + for i, char in enumerate(hed_entry.description): + if char.isalnum(): + continue + if char in ALLOWED_DESC_CHARS: + continue + issues_list += ErrorHandler.format_error(SchemaWarnings.SCHEMA_INVALID_CHARACTERS_IN_DESC, + hed_entry.description, hed_entry.name, char_index=i, problem_char=char) + return issues_list + + +def verify_no_brackets(hed_entry): + """ Extremely basic check to block curly braces + + Parameters: + hed_entry (HedSchemaEntry): A single schema entry + + Returns: + list: A list of issues for invalid characters found in the name + """ + hed_term = hed_entry.name + issues_list = [] + indexes = _get_disallowed_character_indexes(hed_term) + for char, index in indexes: + issues_list += ErrorHandler.format_error(SchemaWarnings.SCHEMA_INVALID_CHARACTERS_IN_TAG, + hed_term, char_index=index, problem_char=char) + return issues_list + + +def _get_disallowed_character_indexes(validation_string, index_adj=0, disallowed_chars="{}"): + indexes = [(char, index + index_adj) for index, char in enumerate(validation_string) if char in disallowed_chars] + return indexes diff --git a/hed/scripts/convert_and_update_schema.py b/hed/scripts/convert_and_update_schema.py index 0debdd488..51032a1f3 100644 --- a/hed/scripts/convert_and_update_schema.py +++ b/hed/scripts/convert_and_update_schema.py @@ -76,7 +76,7 @@ def convert_and_update(filenames, set_ids): def main(): parser = argparse.ArgumentParser(description='Update other schema formats based on the changed one.') parser.add_argument('filenames', nargs='*', help='List of files to process') - parser.add_argument('--set-ids', action='store_true', help='Add missing hed ids') + parser.add_argument('--set-ids', action='store_true', help='Add missing HED ids') args = parser.parse_args() diff --git a/hed/tools/analysis/hed_type.py b/hed/tools/analysis/hed_type.py index 133da324d..d6c64943f 100644 --- a/hed/tools/analysis/hed_type.py +++ b/hed/tools/analysis/hed_type.py @@ -135,7 +135,7 @@ def _update_definition_variables(self, tag, hed_vars, index): Parameters: tag (HedTag): A HedTag that is a Def tag. - hed_vars (list): A list of names of the hed type_variables + hed_vars (list): A list of names of the HED type_variables index (ind): The event number associated with this. Notes: diff --git a/hed/validator/def_validator.py b/hed/validator/def_validator.py index 4c24f7374..26731edc2 100644 --- a/hed/validator/def_validator.py +++ b/hed/validator/def_validator.py @@ -14,7 +14,7 @@ class DefValidator(DefinitionDict): """ def __init__(self, def_dicts=None, hed_schema=None): - """ Initialize for definitions in hed strings. + """ Initialize for definitions in HED strings. Parameters: def_dicts (list or DefinitionDict or str): DefinitionDicts containing the definitions to pass to baseclass @@ -67,7 +67,7 @@ def _validate_def_contents(self, def_tag, def_expand_group, hed_validator): """ Check for issues with expanding a tag from Def to a Def-expand tag group Parameters: - def_tag (HedTag): Source hed tag that may be a Def or Def-expand tag. + def_tag (HedTag): Source HED tag that may be a Def or Def-expand tag. def_expand_group (HedGroup or HedTag): Source group for this def-expand tag. Same as def_tag if this is not a def-expand tag. hed_validator (HedValidator): Used to validate the placeholder replacement. 
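Stepping back to the deprecated per-character checks above: for plain ASCII terms they reduce to one pattern. A hypothetical re-implementation sketch for intuition only (this is not the library's API, and the real checks also accept non-ASCII alphanumerics and special-case "#"):

    import re

    # First character: digit or uppercase; remaining characters: alphanumeric or '-'.
    LEGACY_TERM_RE = re.compile(r"^[0-9A-Z][A-Za-z0-9-]*$")

    assert LEGACY_TERM_RE.match("Sensory-event")        # ok
    assert not LEGACY_TERM_RE.match("sensory-event")    # bad capitalization
    assert not LEGACY_TERM_RE.match("Sensory_event")    # '_' not allowed in terms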
@@ -140,7 +140,7 @@ def validate_onset_offset(self, hed_string_obj):
         """ Validate onset/offset
 
         Parameters:
-            hed_string_obj (HedString): The hed string to check.
+            hed_string_obj (HedString): The HED string to check.
 
         Returns:
             list: A list of issues found in validating onsets (i.e., out of order onsets, unknown def names).
diff --git a/hed/validator/hed_validator.py b/hed/validator/hed_validator.py
index 321accfbe..dffd88fa8 100644
--- a/hed/validator/hed_validator.py
+++ b/hed/validator/hed_validator.py
@@ -50,7 +50,7 @@ def validate(self, hed_string, allow_placeholders, error_handler=None):
             allow_placeholders(bool): allow placeholders in the string
             error_handler(ErrorHandler or None): the error handler to use, creates a default one if none passed
         Returns:
-            issues (list of dict): A list of issues for hed string
+            issues (list of dict): A list of issues for the HED string
         """
         if not error_handler:
             error_handler = error_reporter.ErrorHandler()
@@ -101,7 +101,7 @@ def _run_validate_tag_characters(self, original_tag, allow_placeholders):
         return self._char_validator.check_tag_invalid_chars(original_tag, allow_placeholders)
 
     def _run_hed_string_validators(self, hed_string_obj, allow_placeholders=False):
-        """Basic high level checks of the hed string for illegal characters
+        """Basic high-level checks of the HED string for illegal characters
 
         Catches fully banned characters, out of order parentheses, commas, repeated slashes, etc.
diff --git a/hed/validator/onset_validator.py b/hed/validator/onset_validator.py
index 947328a1d..1983f409d 100644
--- a/hed/validator/onset_validator.py
+++ b/hed/validator/onset_validator.py
@@ -1,80 +1,80 @@
-""" Validates the onset/offset conditions. """
-
-from hed.models.model_constants import DefTagNames
-from hed.errors.error_reporter import ErrorHandler
-from hed.errors.error_types import TemporalErrors
-
-
-class OnsetValidator:
-    """ Validates onset/offset pairs. """
-
-    def __init__(self):
-        self._onsets = {}
-
-    def validate_temporal_relations(self, hed_string_obj):
-        """ Validate onset/offset/inset tag relations
-
-        Parameters:
-            hed_string_obj (HedString): The hed string to check.
-
-        Returns:
-            list: A list of issues found in validating onsets (i.e., out of order onsets, repeated def names).
- """ - onset_issues = [] - used_def_names = set() - for temporal_tag, temporal_group in hed_string_obj.find_top_level_tags(anchor_tags=DefTagNames.TEMPORAL_KEYS): - if not temporal_tag: - return [] - - def_tags = temporal_group.find_def_tags(include_groups=0) - if not def_tags: - continue - - def_tag = def_tags[0] - def_name = def_tag.extension - if def_name.casefold() in used_def_names: - onset_issues += ErrorHandler.format_error(TemporalErrors.ONSET_SAME_DEFS_ONE_ROW, tag=temporal_tag, - def_name=def_name) - continue - - used_def_names.add(def_tag.extension.casefold()) - - # At this point we have either an onset or offset tag and it's name - onset_issues += self._handle_onset_or_offset(def_tag, temporal_tag) - - return onset_issues - - def _handle_onset_or_offset(self, def_tag, onset_offset_tag): - is_onset = onset_offset_tag.short_base_tag == DefTagNames.ONSET_KEY - full_def_name = def_tag.extension - if is_onset: - # onset can never fail as it implies an offset - self._onsets[full_def_name.casefold()] = full_def_name - else: - is_offset = onset_offset_tag.short_base_tag == DefTagNames.OFFSET_KEY - if full_def_name.casefold() not in self._onsets: - if is_offset: - return ErrorHandler.format_error(TemporalErrors.OFFSET_BEFORE_ONSET, tag=def_tag) - else: - return ErrorHandler.format_error(TemporalErrors.INSET_BEFORE_ONSET, tag=def_tag) - elif is_offset: - del self._onsets[full_def_name.casefold()] - - return [] - - @staticmethod - def check_for_banned_tags(hed_string): - """ Returns an issue for every tag found from the banned list - - Parameters: - hed_string(HedString): the string to check - - Returns: - list: The validation issues associated with the characters. Each issue is dictionary. - """ - banned_tag_list = DefTagNames.ALL_TIME_KEYS - issues = [] - for tag in hed_string.get_all_tags(): - if tag.short_base_tag in banned_tag_list: - issues += ErrorHandler.format_error(TemporalErrors.HED_ONSET_WITH_NO_COLUMN, tag) - return issues +""" Validates the onset/offset conditions. """ + +from hed.models.model_constants import DefTagNames +from hed.errors.error_reporter import ErrorHandler +from hed.errors.error_types import TemporalErrors + + +class OnsetValidator: + """ Validates onset/offset pairs. """ + + def __init__(self): + self._onsets = {} + + def validate_temporal_relations(self, hed_string_obj): + """ Validate onset/offset/inset tag relations + + Parameters: + hed_string_obj (HedString): The HED string to check. + + Returns: + list: A list of issues found in validating onsets (i.e., out of order onsets, repeated def names). 
+        """
+        onset_issues = []
+        used_def_names = set()
+        for temporal_tag, temporal_group in hed_string_obj.find_top_level_tags(anchor_tags=DefTagNames.TEMPORAL_KEYS):
+            if not temporal_tag:
+                return []
+
+            def_tags = temporal_group.find_def_tags(include_groups=0)
+            if not def_tags:
+                continue
+
+            def_tag = def_tags[0]
+            def_name = def_tag.extension
+            if def_name.casefold() in used_def_names:
+                onset_issues += ErrorHandler.format_error(TemporalErrors.ONSET_SAME_DEFS_ONE_ROW, tag=temporal_tag,
+                                                          def_name=def_name)
+                continue
+
+            used_def_names.add(def_tag.extension.casefold())
+
+            # At this point we have either an onset or offset tag and its name
+            onset_issues += self._handle_onset_or_offset(def_tag, temporal_tag)
+
+        return onset_issues
+
+    def _handle_onset_or_offset(self, def_tag, onset_offset_tag):
+        is_onset = onset_offset_tag.short_base_tag == DefTagNames.ONSET_KEY
+        full_def_name = def_tag.extension
+        if is_onset:
+            # onset can never fail as it implies an offset
+            self._onsets[full_def_name.casefold()] = full_def_name
+        else:
+            is_offset = onset_offset_tag.short_base_tag == DefTagNames.OFFSET_KEY
+            if full_def_name.casefold() not in self._onsets:
+                if is_offset:
+                    return ErrorHandler.format_error(TemporalErrors.OFFSET_BEFORE_ONSET, tag=def_tag)
+                else:
+                    return ErrorHandler.format_error(TemporalErrors.INSET_BEFORE_ONSET, tag=def_tag)
+            elif is_offset:
+                del self._onsets[full_def_name.casefold()]
+
+        return []
+
+    @staticmethod
+    def check_for_banned_tags(hed_string):
+        """ Returns an issue for every tag found from the banned list
+
+        Parameters:
+            hed_string(HedString): the string to check
+
+        Returns:
+            list: The validation issues associated with the characters. Each issue is a dictionary.
+        """
+        banned_tag_list = DefTagNames.ALL_TIME_KEYS
+        issues = []
+        for tag in hed_string.get_all_tags():
+            if tag.short_base_tag in banned_tag_list:
+                issues += ErrorHandler.format_error(TemporalErrors.HED_ONSET_WITH_NO_COLUMN, tag)
+        return issues
diff --git a/hed/validator/sidecar_validator.py b/hed/validator/sidecar_validator.py
index dfe796ecb..5bbf821f0 100644
--- a/hed/validator/sidecar_validator.py
+++ b/hed/validator/sidecar_validator.py
@@ -1,318 +1,318 @@
-""" Validates sidecars. """
-import copy
-import re
-import itertools
-
-from hed.errors import ErrorHandler, ErrorContext, SidecarErrors, DefinitionErrors, ColumnErrors
-from hed.models.column_mapper import ColumnType
-from hed.models.hed_string import HedString
-from hed.models.column_metadata import ColumnMetadata
-from hed.errors.error_reporter import sort_issues
-from hed.models.model_constants import DefTagNames
-from hed.errors.error_reporter import check_for_any_errors
-from hed.models import df_util
-
-
-# todo: Add/improve validation for definitions being in known columns(right now it just assumes they aren't)
-class SidecarValidator:
-    reserved_column_names = ["HED"]
-    reserved_category_values = ["n/a"]
-
-    def __init__(self, hed_schema):
-        """
-        Constructor for the SidecarValidator class.
-
-        Parameters:
-            hed_schema (HedSchema): HED schema object to use for validation.
-        """
-        self._schema = hed_schema
-
-    def validate(self, sidecar, extra_def_dicts=None, name=None, error_handler=None):
-        """Validate the input data using the schema
-
-        Parameters:
-            sidecar (Sidecar): Input data to be validated.
-            extra_def_dicts(list or DefinitionDict): extra def dicts in addition to sidecar
-            name(str): The name to report this sidecar as
-            error_handler (ErrorHandler): Error context to use. 
Creates a new one if None - Returns: - issues (list of dict): A list of issues associated with each level in the HED string. - """ - from hed.validator import HedValidator - issues = [] - if error_handler is None: - error_handler = ErrorHandler() - - error_handler.push_error_context(ErrorContext.FILE_NAME, name) - issues += self.validate_structure(sidecar, error_handler=error_handler) - issues += self._validate_refs(sidecar, error_handler) - - # only allowed early out, something is very wrong with structure or refs - if check_for_any_errors(issues): - error_handler.pop_error_context() - return issues - sidecar_def_dict = sidecar.get_def_dict(hed_schema=self._schema, extra_def_dicts=extra_def_dicts) - hed_validator = HedValidator(self._schema, def_dicts=sidecar_def_dict, definitions_allowed=True) - - issues += sidecar._extract_definition_issues - issues += sidecar_def_dict.issues - - # todo: Break this function up - all_ref_columns = sidecar.get_column_refs() - definition_checks = {} - for column_data in sidecar: - column_name = column_data.column_name - column_data = column_data._get_unvalidated_data() - hed_strings = column_data.get_hed_strings() - is_ref_column = column_name in all_ref_columns - error_handler.push_error_context(ErrorContext.SIDECAR_COLUMN_NAME, column_name) - for key_name, hed_string in hed_strings.items(): - new_issues = [] - if len(hed_strings) > 1: - error_handler.push_error_context(ErrorContext.SIDECAR_KEY_NAME, key_name) - hed_string_obj = HedString(hed_string, hed_schema=self._schema, def_dict=sidecar_def_dict) - hed_string_obj.remove_refs() - - error_handler.push_error_context(ErrorContext.HED_STRING, hed_string_obj) - new_issues += hed_validator.run_basic_checks(hed_string_obj, allow_placeholders=True) - def_check_list = definition_checks.setdefault(column_name, []) - def_check_list.append(hed_string_obj.find_tags({DefTagNames.DEFINITION_KEY}, recursive=True, - include_groups=0)) - - # Might refine this later - for now just skip checking placeholder counts in definition columns. - if not def_check_list[-1]: - new_issues += self._validate_pound_sign_count(hed_string_obj, column_type=column_data.column_type) - - error_handler.add_context_and_filter(new_issues) - issues += new_issues - error_handler.pop_error_context() # Hed String - - # Only do full string checks on full columns, not partial ref columns. 
- if not is_ref_column: - refs = re.findall("\{([a-z_\-0-9]+)\}", hed_string, re.IGNORECASE) - refs_strings = {data.column_name: data.get_hed_strings() for data in sidecar} - if "HED" not in refs_strings: - refs_strings["HED"] = ["n/a"] - for combination in itertools.product(*[refs_strings[key] for key in refs]): - new_issues = [] - ref_dict = dict(zip(refs, combination)) - modified_string = hed_string - for ref in refs: - modified_string = df_util.replace_ref(modified_string, f"{{{ref}}}", ref_dict[ref]) - hed_string_obj = HedString(modified_string, hed_schema=self._schema, def_dict=sidecar_def_dict) - - error_handler.push_error_context(ErrorContext.HED_STRING, hed_string_obj) - new_issues += hed_validator.run_full_string_checks(hed_string_obj) - error_handler.add_context_and_filter(new_issues) - issues += new_issues - error_handler.pop_error_context() # Hed string - if len(hed_strings) > 1: - error_handler.pop_error_context() # Category key - - error_handler.pop_error_context() # Column Name - issues += self._check_definitions_bad_spot(definition_checks, error_handler) - issues = sort_issues(issues) - - error_handler.pop_error_context() # Filename - - return issues - - def validate_structure(self, sidecar, error_handler): - """ Validate the raw structure of this sidecar. - - Parameters: - sidecar(Sidecar): the sidecar to validate - error_handler(ErrorHandler): The error handler to use for error context - - Returns: - issues(list): A list of issues found with the structure - """ - all_validation_issues = [] - for column_name, dict_for_entry in sidecar.loaded_dict.items(): - error_handler.push_error_context(ErrorContext.SIDECAR_COLUMN_NAME, column_name) - all_validation_issues += self._validate_column_structure(column_name, dict_for_entry, error_handler) - error_handler.pop_error_context() - return all_validation_issues - - def _validate_refs(self, sidecar, error_handler): - possible_column_refs = sidecar.all_hed_columns - - if "HED" not in possible_column_refs: - possible_column_refs.append("HED") - - issues = [] - found_column_references = {} - for column_data in sidecar: - column_name = column_data.column_name - hed_strings = column_data.get_hed_strings() - error_handler.push_error_context(ErrorContext.SIDECAR_COLUMN_NAME, column_name) - matches = [] - for key_name, hed_string in hed_strings.items(): - new_issues = [] - if len(hed_strings) > 1: - error_handler.push_error_context(ErrorContext.SIDECAR_KEY_NAME, key_name) - - error_handler.push_error_context(ErrorContext.HED_STRING, - HedString(hed_string, hed_schema=self._schema)) - invalid_locations = self._find_non_matching_braces(hed_string) - for loc in invalid_locations: - bad_symbol = hed_string[loc] - new_issues += error_handler.format_error_with_context(ColumnErrors.MALFORMED_COLUMN_REF, - column_name, loc, bad_symbol) - - sub_matches = re.findall(r"\{([a-z_\-0-9]+)\}", hed_string, re.IGNORECASE) - matches.append(sub_matches) - for match in sub_matches: - if match not in possible_column_refs: - new_issues += error_handler.format_error_with_context(ColumnErrors.INVALID_COLUMN_REF, match) - - error_handler.pop_error_context() - if len(hed_strings) > 1: - error_handler.pop_error_context() - error_handler.add_context_and_filter(new_issues) - issues += new_issues - error_handler.pop_error_context() - references = [match for sublist in matches for match in sublist] - if references: - found_column_references[column_name] = references - if column_name in references: - issues += 
error_handler.format_error_with_context(ColumnErrors.SELF_COLUMN_REF, column_name) - - for column_name, refs in found_column_references.items(): - for ref in refs: - if ref in found_column_references and ref != column_name: - issues += error_handler.format_error_with_context(ColumnErrors.NESTED_COLUMN_REF, column_name, ref) - - return issues - - @staticmethod - def _find_non_matching_braces(hed_string): - issues = [] - open_brace_index = -1 - - for i, char in enumerate(hed_string): - if char == '{': - if open_brace_index >= 0: # Nested brace detected - issues.append(open_brace_index) - open_brace_index = i - elif char == '}': - if open_brace_index >= 0: - open_brace_index = -1 - else: - issues.append(i) - - if open_brace_index >= 0: - issues.append(open_brace_index) - - return issues - - @staticmethod - def _check_for_key(key, data): - # Probably can be cleaned up more -> Return True if any data or subdata is key - if isinstance(data, dict): - return SidecarValidator._check_dict(key, data) - elif isinstance(data, list): - return SidecarValidator._check_list(key, data) - return False - - @staticmethod - def _check_dict(key, data_dict): - if key in data_dict: - return True - for sub_data in data_dict.values(): - if SidecarValidator._check_for_key(key, sub_data): - return True - return False - - @staticmethod - def _check_list(key, data_list): - for sub_data in data_list: - if SidecarValidator._check_for_key(key, sub_data): - return True - return False - - def _validate_column_structure(self, column_name, dict_for_entry, error_handler): - """ Checks primarily for type errors such as expecting a string and getting a list in a json sidecar. - - Parameters: - error_handler (ErrorHandler) Sets the context for the error reporting. Cannot be None. - - Returns: - list: Issues in performing the operations. Each issue is a dictionary. 
- - """ - val_issues = [] - if column_name in self.reserved_column_names: - val_issues += error_handler.format_error_with_context(SidecarErrors.SIDECAR_HED_USED_COLUMN) - return val_issues - - column_type = ColumnMetadata._detect_column_type(dict_for_entry=dict_for_entry, basic_validation=False) - if column_type is None: - val_issues += error_handler.format_error_with_context(SidecarErrors.UNKNOWN_COLUMN_TYPE, - column_name=column_name) - elif column_type == ColumnType.Ignore: - found_hed = self._check_for_key("HED", dict_for_entry) - if found_hed: - val_issues += error_handler.format_error_with_context(SidecarErrors.SIDECAR_HED_USED) - elif column_type == ColumnType.Categorical: - val_issues += self._validate_categorical_column(column_name, dict_for_entry, error_handler) - - return val_issues - - def _validate_categorical_column(self, column_name, dict_for_entry, error_handler): - """Validates a categorical column in a json sidecar.""" - val_issues = [] - raw_hed_dict = dict_for_entry["HED"] - if not raw_hed_dict: - val_issues += error_handler.format_error_with_context(SidecarErrors.BLANK_HED_STRING) - for key_name, hed_string in raw_hed_dict.items(): - error_handler.push_error_context(ErrorContext.SIDECAR_KEY_NAME, key_name) - if not hed_string: - val_issues += error_handler.format_error_with_context(SidecarErrors.BLANK_HED_STRING) - elif not isinstance(hed_string, str): - val_issues += error_handler.format_error_with_context(SidecarErrors.WRONG_HED_DATA_TYPE, - given_type=type(hed_string), - expected_type="str") - elif key_name in self.reserved_category_values: - val_issues += error_handler.format_error_with_context(SidecarErrors.SIDECAR_NA_USED, column_name) - error_handler.pop_error_context() - return val_issues - - def _validate_pound_sign_count(self, hed_string, column_type): - """ Check if a given hed string in the column has the correct number of pound signs. - - Parameters: - hed_string (str or HedString): HED string to be checked. - - Returns: - list: Issues due to pound sign errors. Each issue is a dictionary. - - Notes: - Normally the number of # should be either 0 or 1, but sometimes will be higher due to the - presence of definition tags. - - """ - # Make a copy without definitions to check placeholder count. - expected_count, error_type = ColumnMetadata.expected_pound_sign_count(column_type) - hed_string_copy = copy.deepcopy(hed_string) - hed_string_copy.remove_definitions() - hed_string_copy.shrink_defs() - - if str(hed_string_copy).count("#") != expected_count: - return ErrorHandler.format_error(error_type, pound_sign_count=str(hed_string_copy).count("#")) - - return [] - - def _check_definitions_bad_spot(self, definition_checks, error_handler): - issues = [] - # This could be simplified now - for col_name, has_def in definition_checks.items(): - error_handler.push_error_context(ErrorContext.SIDECAR_COLUMN_NAME, col_name) - def_check = set(bool(d) for d in has_def) - if len(def_check) != 1: - flat_def_list = [d for defs in has_def for d in defs] - for d in flat_def_list: - issues += error_handler.format_error_with_context(DefinitionErrors.BAD_DEFINITION_LOCATION, d) - error_handler.pop_error_context() - - return issues +""" Validates sidecars. 
""" +import copy +import re +import itertools + +from hed.errors import ErrorHandler, ErrorContext, SidecarErrors, DefinitionErrors, ColumnErrors +from hed.models.column_mapper import ColumnType +from hed.models.hed_string import HedString +from hed.models.column_metadata import ColumnMetadata +from hed.errors.error_reporter import sort_issues +from hed.models.model_constants import DefTagNames +from hed.errors.error_reporter import check_for_any_errors +from hed.models import df_util + + +# todo: Add/improve validation for definitions being in known columns(right now it just assumes they aren't) +class SidecarValidator: + reserved_column_names = ["HED"] + reserved_category_values = ["n/a"] + + def __init__(self, hed_schema): + """ + Constructor for the SidecarValidator class. + + Parameters: + hed_schema (HedSchema): HED schema object to use for validation. + """ + self._schema = hed_schema + + def validate(self, sidecar, extra_def_dicts=None, name=None, error_handler=None): + """Validate the input data using the schema + + Parameters: + sidecar (Sidecar): Input data to be validated. + extra_def_dicts(list or DefinitionDict): extra def dicts in addition to sidecar + name(str): The name to report this sidecar as + error_handler (ErrorHandler): Error context to use. Creates a new one if None + Returns: + issues (list of dict): A list of issues associated with each level in the HED string. + """ + from hed.validator import HedValidator + issues = [] + if error_handler is None: + error_handler = ErrorHandler() + + error_handler.push_error_context(ErrorContext.FILE_NAME, name) + issues += self.validate_structure(sidecar, error_handler=error_handler) + issues += self._validate_refs(sidecar, error_handler) + + # only allowed early out, something is very wrong with structure or refs + if check_for_any_errors(issues): + error_handler.pop_error_context() + return issues + sidecar_def_dict = sidecar.get_def_dict(hed_schema=self._schema, extra_def_dicts=extra_def_dicts) + hed_validator = HedValidator(self._schema, def_dicts=sidecar_def_dict, definitions_allowed=True) + + issues += sidecar._extract_definition_issues + issues += sidecar_def_dict.issues + + # todo: Break this function up + all_ref_columns = sidecar.get_column_refs() + definition_checks = {} + for column_data in sidecar: + column_name = column_data.column_name + column_data = column_data._get_unvalidated_data() + hed_strings = column_data.get_hed_strings() + is_ref_column = column_name in all_ref_columns + error_handler.push_error_context(ErrorContext.SIDECAR_COLUMN_NAME, column_name) + for key_name, hed_string in hed_strings.items(): + new_issues = [] + if len(hed_strings) > 1: + error_handler.push_error_context(ErrorContext.SIDECAR_KEY_NAME, key_name) + hed_string_obj = HedString(hed_string, hed_schema=self._schema, def_dict=sidecar_def_dict) + hed_string_obj.remove_refs() + + error_handler.push_error_context(ErrorContext.HED_STRING, hed_string_obj) + new_issues += hed_validator.run_basic_checks(hed_string_obj, allow_placeholders=True) + def_check_list = definition_checks.setdefault(column_name, []) + def_check_list.append(hed_string_obj.find_tags({DefTagNames.DEFINITION_KEY}, recursive=True, + include_groups=0)) + + # Might refine this later - for now just skip checking placeholder counts in definition columns. 
+ if not def_check_list[-1]: + new_issues += self._validate_pound_sign_count(hed_string_obj, column_type=column_data.column_type) + + error_handler.add_context_and_filter(new_issues) + issues += new_issues + error_handler.pop_error_context() # Hed String + + # Only do full string checks on full columns, not partial ref columns. + if not is_ref_column: + refs = re.findall("\{([a-z_\-0-9]+)\}", hed_string, re.IGNORECASE) + refs_strings = {data.column_name: data.get_hed_strings() for data in sidecar} + if "HED" not in refs_strings: + refs_strings["HED"] = ["n/a"] + for combination in itertools.product(*[refs_strings[key] for key in refs]): + new_issues = [] + ref_dict = dict(zip(refs, combination)) + modified_string = hed_string + for ref in refs: + modified_string = df_util.replace_ref(modified_string, f"{{{ref}}}", ref_dict[ref]) + hed_string_obj = HedString(modified_string, hed_schema=self._schema, def_dict=sidecar_def_dict) + + error_handler.push_error_context(ErrorContext.HED_STRING, hed_string_obj) + new_issues += hed_validator.run_full_string_checks(hed_string_obj) + error_handler.add_context_and_filter(new_issues) + issues += new_issues + error_handler.pop_error_context() # Hed string + if len(hed_strings) > 1: + error_handler.pop_error_context() # Category key + + error_handler.pop_error_context() # Column Name + issues += self._check_definitions_bad_spot(definition_checks, error_handler) + issues = sort_issues(issues) + + error_handler.pop_error_context() # Filename + + return issues + + def validate_structure(self, sidecar, error_handler): + """ Validate the raw structure of this sidecar. + + Parameters: + sidecar(Sidecar): the sidecar to validate + error_handler(ErrorHandler): The error handler to use for error context + + Returns: + issues(list): A list of issues found with the structure + """ + all_validation_issues = [] + for column_name, dict_for_entry in sidecar.loaded_dict.items(): + error_handler.push_error_context(ErrorContext.SIDECAR_COLUMN_NAME, column_name) + all_validation_issues += self._validate_column_structure(column_name, dict_for_entry, error_handler) + error_handler.pop_error_context() + return all_validation_issues + + def _validate_refs(self, sidecar, error_handler): + possible_column_refs = sidecar.all_hed_columns + + if "HED" not in possible_column_refs: + possible_column_refs.append("HED") + + issues = [] + found_column_references = {} + for column_data in sidecar: + column_name = column_data.column_name + hed_strings = column_data.get_hed_strings() + error_handler.push_error_context(ErrorContext.SIDECAR_COLUMN_NAME, column_name) + matches = [] + for key_name, hed_string in hed_strings.items(): + new_issues = [] + if len(hed_strings) > 1: + error_handler.push_error_context(ErrorContext.SIDECAR_KEY_NAME, key_name) + + error_handler.push_error_context(ErrorContext.HED_STRING, + HedString(hed_string, hed_schema=self._schema)) + invalid_locations = self._find_non_matching_braces(hed_string) + for loc in invalid_locations: + bad_symbol = hed_string[loc] + new_issues += error_handler.format_error_with_context(ColumnErrors.MALFORMED_COLUMN_REF, + column_name, loc, bad_symbol) + + sub_matches = re.findall(r"\{([a-z_\-0-9]+)\}", hed_string, re.IGNORECASE) + matches.append(sub_matches) + for match in sub_matches: + if match not in possible_column_refs: + new_issues += error_handler.format_error_with_context(ColumnErrors.INVALID_COLUMN_REF, match) + + error_handler.pop_error_context() + if len(hed_strings) > 1: + error_handler.pop_error_context() + 
error_handler.add_context_and_filter(new_issues) + issues += new_issues + error_handler.pop_error_context() + references = [match for sublist in matches for match in sublist] + if references: + found_column_references[column_name] = references + if column_name in references: + issues += error_handler.format_error_with_context(ColumnErrors.SELF_COLUMN_REF, column_name) + + for column_name, refs in found_column_references.items(): + for ref in refs: + if ref in found_column_references and ref != column_name: + issues += error_handler.format_error_with_context(ColumnErrors.NESTED_COLUMN_REF, column_name, ref) + + return issues + + @staticmethod + def _find_non_matching_braces(hed_string): + issues = [] + open_brace_index = -1 + + for i, char in enumerate(hed_string): + if char == '{': + if open_brace_index >= 0: # Nested brace detected + issues.append(open_brace_index) + open_brace_index = i + elif char == '}': + if open_brace_index >= 0: + open_brace_index = -1 + else: + issues.append(i) + + if open_brace_index >= 0: + issues.append(open_brace_index) + + return issues + + @staticmethod + def _check_for_key(key, data): + # Probably can be cleaned up more -> Return True if any data or subdata is key + if isinstance(data, dict): + return SidecarValidator._check_dict(key, data) + elif isinstance(data, list): + return SidecarValidator._check_list(key, data) + return False + + @staticmethod + def _check_dict(key, data_dict): + if key in data_dict: + return True + for sub_data in data_dict.values(): + if SidecarValidator._check_for_key(key, sub_data): + return True + return False + + @staticmethod + def _check_list(key, data_list): + for sub_data in data_list: + if SidecarValidator._check_for_key(key, sub_data): + return True + return False + + def _validate_column_structure(self, column_name, dict_for_entry, error_handler): + """ Checks primarily for type errors such as expecting a string and getting a list in a json sidecar. + + Parameters: + error_handler (ErrorHandler) Sets the context for the error reporting. Cannot be None. + + Returns: + list: Issues in performing the operations. Each issue is a dictionary. 
+ + """ + val_issues = [] + if column_name in self.reserved_column_names: + val_issues += error_handler.format_error_with_context(SidecarErrors.SIDECAR_HED_USED_COLUMN) + return val_issues + + column_type = ColumnMetadata._detect_column_type(dict_for_entry=dict_for_entry, basic_validation=False) + if column_type is None: + val_issues += error_handler.format_error_with_context(SidecarErrors.UNKNOWN_COLUMN_TYPE, + column_name=column_name) + elif column_type == ColumnType.Ignore: + found_hed = self._check_for_key("HED", dict_for_entry) + if found_hed: + val_issues += error_handler.format_error_with_context(SidecarErrors.SIDECAR_HED_USED) + elif column_type == ColumnType.Categorical: + val_issues += self._validate_categorical_column(column_name, dict_for_entry, error_handler) + + return val_issues + + def _validate_categorical_column(self, column_name, dict_for_entry, error_handler): + """Validates a categorical column in a json sidecar.""" + val_issues = [] + raw_hed_dict = dict_for_entry["HED"] + if not raw_hed_dict: + val_issues += error_handler.format_error_with_context(SidecarErrors.BLANK_HED_STRING) + for key_name, hed_string in raw_hed_dict.items(): + error_handler.push_error_context(ErrorContext.SIDECAR_KEY_NAME, key_name) + if not hed_string: + val_issues += error_handler.format_error_with_context(SidecarErrors.BLANK_HED_STRING) + elif not isinstance(hed_string, str): + val_issues += error_handler.format_error_with_context(SidecarErrors.WRONG_HED_DATA_TYPE, + given_type=type(hed_string), + expected_type="str") + elif key_name in self.reserved_category_values: + val_issues += error_handler.format_error_with_context(SidecarErrors.SIDECAR_NA_USED, column_name) + error_handler.pop_error_context() + return val_issues + + def _validate_pound_sign_count(self, hed_string, column_type): + """ Check if a given HED string in the column has the correct number of pound signs. + + Parameters: + hed_string (str or HedString): HED string to be checked. + + Returns: + list: Issues due to pound sign errors. Each issue is a dictionary. + + Notes: + Normally the number of # should be either 0 or 1, but sometimes will be higher due to the + presence of definition tags. + + """ + # Make a copy without definitions to check placeholder count. + expected_count, error_type = ColumnMetadata.expected_pound_sign_count(column_type) + hed_string_copy = copy.deepcopy(hed_string) + hed_string_copy.remove_definitions() + hed_string_copy.shrink_defs() + + if str(hed_string_copy).count("#") != expected_count: + return ErrorHandler.format_error(error_type, pound_sign_count=str(hed_string_copy).count("#")) + + return [] + + def _check_definitions_bad_spot(self, definition_checks, error_handler): + issues = [] + # This could be simplified now + for col_name, has_def in definition_checks.items(): + error_handler.push_error_context(ErrorContext.SIDECAR_COLUMN_NAME, col_name) + def_check = set(bool(d) for d in has_def) + if len(def_check) != 1: + flat_def_list = [d for defs in has_def for d in defs] + for d in flat_def_list: + issues += error_handler.format_error_with_context(DefinitionErrors.BAD_DEFINITION_LOCATION, d) + error_handler.pop_error_context() + + return issues diff --git a/hed/validator/spreadsheet_validator.py b/hed/validator/spreadsheet_validator.py index d78711715..0e283b2a5 100644 --- a/hed/validator/spreadsheet_validator.py +++ b/hed/validator/spreadsheet_validator.py @@ -1,186 +1,186 @@ -""" Validates spreadsheet tabular data. 
""" -import copy - -from hed.models.base_input import BaseInput -from hed.errors.error_types import ColumnErrors, ErrorContext, ValidationErrors -from hed.errors.error_reporter import ErrorHandler -from hed.models.column_mapper import ColumnType -from hed.models.hed_string import HedString -from hed.errors.error_reporter import sort_issues, check_for_any_errors -from hed.validator.onset_validator import OnsetValidator -from hed.validator.hed_validator import HedValidator -from hed.models import df_util - - -PANDAS_COLUMN_PREFIX_TO_IGNORE = "Unnamed: " - - -class SpreadsheetValidator: - def __init__(self, hed_schema): - """ - Constructor for the SpreadsheetValidator class. - - Parameters: - hed_schema (HedSchema): HED schema object to use for validation. - """ - self._schema = hed_schema - self._hed_validator = None - self._onset_validator = None - self.invalid_original_rows = set() - - def validate(self, data, def_dicts=None, name=None, error_handler=None): - """ - Validate the input data using the schema - - Parameters: - data (BaseInput): Input data to be validated. - def_dicts(list of DefDict or DefDict): all definitions to use for validation - name(str): The name to report errors from this file as - error_handler (ErrorHandler): Error context to use. Creates a new one if None - Returns: - issues (list of dict): A list of issues for hed string - """ - - issues = [] - if error_handler is None: - error_handler = ErrorHandler() - - if not isinstance(data, BaseInput): - raise TypeError("Invalid type passed to spreadsheet validator. Can only validate BaseInput objects.") - - self.invalid_original_rows = set() - - error_handler.push_error_context(ErrorContext.FILE_NAME, name) - # Adjust to account for 1 based - row_adj = 1 - # Adjust to account for column names - if data.has_column_names: - row_adj += 1 - issues += self._validate_column_structure(data, error_handler, row_adj) - - if data.needs_sorting: - data_new = copy.deepcopy(data) - data_new._dataframe = df_util.sort_dataframe_by_onsets(data.dataframe) - issues += error_handler.format_error_with_context(ValidationErrors.ONSETS_OUT_OF_ORDER) - data = data_new - - onsets = df_util.split_delay_tags(data.series_a, self._schema, data.onsets) - df = data.dataframe_a - - self._hed_validator = HedValidator(self._schema, def_dicts=def_dicts) - if data.onsets is not None: - self._onset_validator = OnsetValidator() - else: - self._onset_validator = None - - # Check the rows of the input data - issues += self._run_checks(df, error_handler=error_handler, row_adj=row_adj, - has_onsets=bool(self._onset_validator)) - if self._onset_validator: - issues += self._run_onset_checks(onsets, error_handler=error_handler, row_adj=row_adj) - error_handler.pop_error_context() - - issues = sort_issues(issues) - return issues - - def _run_checks(self, hed_df, error_handler, row_adj, has_onsets): - issues = [] - columns = list(hed_df.columns) - self.invalid_original_rows = set() - for row_number, text_file_row in hed_df.iterrows(): - error_handler.push_error_context(ErrorContext.ROW, row_number + row_adj) - row_strings = [] - new_column_issues = [] - for column_number, cell in enumerate(text_file_row): - if not cell or cell == "n/a": - continue - - error_handler.push_error_context(ErrorContext.COLUMN, columns[column_number]) - - column_hed_string = HedString(cell, self._schema) - row_strings.append(column_hed_string) - error_handler.push_error_context(ErrorContext.HED_STRING, column_hed_string) - new_column_issues = 
self._hed_validator.run_basic_checks(column_hed_string, allow_placeholders=False) - - error_handler.add_context_and_filter(new_column_issues) - error_handler.pop_error_context() # HedString - error_handler.pop_error_context() # column - - issues += new_column_issues - # We want to do full onset checks on the combined and filtered rows - if check_for_any_errors(new_column_issues): - self.invalid_original_rows.add(row_number) - error_handler.pop_error_context() # Row - continue - - if has_onsets or not row_strings: - error_handler.pop_error_context() # Row - continue - - row_string = HedString.from_hed_strings(row_strings) - - if row_string: - error_handler.push_error_context(ErrorContext.HED_STRING, row_string) - new_column_issues = self._hed_validator.run_full_string_checks(row_string) - new_column_issues += OnsetValidator.check_for_banned_tags(row_string) - error_handler.add_context_and_filter(new_column_issues) - error_handler.pop_error_context() # HedString - issues += new_column_issues - error_handler.pop_error_context() # Row - return issues - - def _run_onset_checks(self, onset_filtered, error_handler, row_adj): - issues = [] - for row in onset_filtered[["HED", "original_index"]].itertuples(index=True): - # Skip rows that had issues. - if row.original_index in self.invalid_original_rows: - continue - error_handler.push_error_context(ErrorContext.ROW, row.original_index + row_adj) - row_string = HedString(row.HED, self._schema, self._hed_validator._def_validator) - - if row_string: - error_handler.push_error_context(ErrorContext.HED_STRING, row_string) - new_column_issues = self._hed_validator.run_full_string_checks(row_string) - new_column_issues += self._onset_validator.validate_temporal_relations(row_string) - error_handler.add_context_and_filter(new_column_issues) - error_handler.pop_error_context() # HedString - issues += new_column_issues - error_handler.pop_error_context() # Row - return issues - - def _validate_column_structure(self, base_input, error_handler, row_adj): - """ - Validate that each column in the input data has valid values. - - Parameters: - base_input (BaseInput): The input data to be validated. - error_handler (ErrorHandler): Holds context - row_adj(int): Number to adjust row by for reporting errors - Returns: - List of issues associated with each invalid value. Each issue is a dictionary. - """ - issues = [] - col_issues = base_input._mapper.check_for_mapping_issues(base_input) - error_handler.add_context_and_filter(col_issues) - issues += col_issues - for column in base_input.column_metadata().values(): - if column.column_type == ColumnType.Categorical: - error_handler.push_error_context(ErrorContext.COLUMN, column.column_name) - valid_keys = column.hed_dict.keys() - for row_number, value in enumerate(base_input.dataframe[column.column_name]): - if value != "n/a" and value not in valid_keys: - error_handler.push_error_context(ErrorContext.ROW, row_number + row_adj) - issues += error_handler.format_error_with_context(ValidationErrors.SIDECAR_KEY_MISSING, - invalid_key=value, - category_keys=list(valid_keys)) - error_handler.pop_error_context() - error_handler.pop_error_context() - - column_refs = base_input.get_column_refs() - columns = base_input.columns - for ref in column_refs: - if ref not in columns: - issues += error_handler.format_error_with_context(ColumnErrors.INVALID_COLUMN_REF, - bad_ref=ref) - - return issues +""" Validates spreadsheet tabular data. 
""" +import copy + +from hed.models.base_input import BaseInput +from hed.errors.error_types import ColumnErrors, ErrorContext, ValidationErrors +from hed.errors.error_reporter import ErrorHandler +from hed.models.column_mapper import ColumnType +from hed.models.hed_string import HedString +from hed.errors.error_reporter import sort_issues, check_for_any_errors +from hed.validator.onset_validator import OnsetValidator +from hed.validator.hed_validator import HedValidator +from hed.models import df_util + + +PANDAS_COLUMN_PREFIX_TO_IGNORE = "Unnamed: " + + +class SpreadsheetValidator: + def __init__(self, hed_schema): + """ + Constructor for the SpreadsheetValidator class. + + Parameters: + hed_schema (HedSchema): HED schema object to use for validation. + """ + self._schema = hed_schema + self._hed_validator = None + self._onset_validator = None + self.invalid_original_rows = set() + + def validate(self, data, def_dicts=None, name=None, error_handler=None): + """ + Validate the input data using the schema + + Parameters: + data (BaseInput): Input data to be validated. + def_dicts(list of DefDict or DefDict): all definitions to use for validation + name(str): The name to report errors from this file as + error_handler (ErrorHandler): Error context to use. Creates a new one if None + Returns: + issues (list of dict): A list of issues for HED string + """ + + issues = [] + if error_handler is None: + error_handler = ErrorHandler() + + if not isinstance(data, BaseInput): + raise TypeError("Invalid type passed to spreadsheet validator. Can only validate BaseInput objects.") + + self.invalid_original_rows = set() + + error_handler.push_error_context(ErrorContext.FILE_NAME, name) + # Adjust to account for 1 based + row_adj = 1 + # Adjust to account for column names + if data.has_column_names: + row_adj += 1 + issues += self._validate_column_structure(data, error_handler, row_adj) + + if data.needs_sorting: + data_new = copy.deepcopy(data) + data_new._dataframe = df_util.sort_dataframe_by_onsets(data.dataframe) + issues += error_handler.format_error_with_context(ValidationErrors.ONSETS_OUT_OF_ORDER) + data = data_new + + onsets = df_util.split_delay_tags(data.series_a, self._schema, data.onsets) + df = data.dataframe_a + + self._hed_validator = HedValidator(self._schema, def_dicts=def_dicts) + if data.onsets is not None: + self._onset_validator = OnsetValidator() + else: + self._onset_validator = None + + # Check the rows of the input data + issues += self._run_checks(df, error_handler=error_handler, row_adj=row_adj, + has_onsets=bool(self._onset_validator)) + if self._onset_validator: + issues += self._run_onset_checks(onsets, error_handler=error_handler, row_adj=row_adj) + error_handler.pop_error_context() + + issues = sort_issues(issues) + return issues + + def _run_checks(self, hed_df, error_handler, row_adj, has_onsets): + issues = [] + columns = list(hed_df.columns) + self.invalid_original_rows = set() + for row_number, text_file_row in hed_df.iterrows(): + error_handler.push_error_context(ErrorContext.ROW, row_number + row_adj) + row_strings = [] + new_column_issues = [] + for column_number, cell in enumerate(text_file_row): + if not cell or cell == "n/a": + continue + + error_handler.push_error_context(ErrorContext.COLUMN, columns[column_number]) + + column_hed_string = HedString(cell, self._schema) + row_strings.append(column_hed_string) + error_handler.push_error_context(ErrorContext.HED_STRING, column_hed_string) + new_column_issues = 
self._hed_validator.run_basic_checks(column_hed_string, allow_placeholders=False) + + error_handler.add_context_and_filter(new_column_issues) + error_handler.pop_error_context() # HedString + error_handler.pop_error_context() # column + + issues += new_column_issues + # We want to do full onset checks on the combined and filtered rows + if check_for_any_errors(new_column_issues): + self.invalid_original_rows.add(row_number) + error_handler.pop_error_context() # Row + continue + + if has_onsets or not row_strings: + error_handler.pop_error_context() # Row + continue + + row_string = HedString.from_hed_strings(row_strings) + + if row_string: + error_handler.push_error_context(ErrorContext.HED_STRING, row_string) + new_column_issues = self._hed_validator.run_full_string_checks(row_string) + new_column_issues += OnsetValidator.check_for_banned_tags(row_string) + error_handler.add_context_and_filter(new_column_issues) + error_handler.pop_error_context() # HedString + issues += new_column_issues + error_handler.pop_error_context() # Row + return issues + + def _run_onset_checks(self, onset_filtered, error_handler, row_adj): + issues = [] + for row in onset_filtered[["HED", "original_index"]].itertuples(index=True): + # Skip rows that had issues. + if row.original_index in self.invalid_original_rows: + continue + error_handler.push_error_context(ErrorContext.ROW, row.original_index + row_adj) + row_string = HedString(row.HED, self._schema, self._hed_validator._def_validator) + + if row_string: + error_handler.push_error_context(ErrorContext.HED_STRING, row_string) + new_column_issues = self._hed_validator.run_full_string_checks(row_string) + new_column_issues += self._onset_validator.validate_temporal_relations(row_string) + error_handler.add_context_and_filter(new_column_issues) + error_handler.pop_error_context() # HedString + issues += new_column_issues + error_handler.pop_error_context() # Row + return issues + + def _validate_column_structure(self, base_input, error_handler, row_adj): + """ + Validate that each column in the input data has valid values. + + Parameters: + base_input (BaseInput): The input data to be validated. + error_handler (ErrorHandler): Holds context + row_adj(int): Number to adjust row by for reporting errors + Returns: + List of issues associated with each invalid value. Each issue is a dictionary. 
+
+        """
+        issues = []
+        col_issues = base_input._mapper.check_for_mapping_issues(base_input)
+        error_handler.add_context_and_filter(col_issues)
+        issues += col_issues
+        for column in base_input.column_metadata().values():
+            if column.column_type == ColumnType.Categorical:
+                error_handler.push_error_context(ErrorContext.COLUMN, column.column_name)
+                valid_keys = column.hed_dict.keys()
+                for row_number, value in enumerate(base_input.dataframe[column.column_name]):
+                    if value != "n/a" and value not in valid_keys:
+                        error_handler.push_error_context(ErrorContext.ROW, row_number + row_adj)
+                        issues += error_handler.format_error_with_context(ValidationErrors.SIDECAR_KEY_MISSING,
+                                                                          invalid_key=value,
+                                                                          category_keys=list(valid_keys))
+                        error_handler.pop_error_context()
+                error_handler.pop_error_context()
+
+        column_refs = base_input.get_column_refs()
+        columns = base_input.columns
+        for ref in column_refs:
+            if ref not in columns:
+                issues += error_handler.format_error_with_context(ColumnErrors.INVALID_COLUMN_REF,
+                                                                  bad_ref=ref)
+
+        return issues
diff --git a/tests/data/remodel_tests/summarize_hed_types_rmdl.json b/tests/data/remodel_tests/summarize_hed_types_rmdl.json
index 68b0794d9..a1ef83348 100644
--- a/tests/data/remodel_tests/summarize_hed_types_rmdl.json
+++ b/tests/data/remodel_tests/summarize_hed_types_rmdl.json
@@ -1,7 +1,7 @@
 [
   {
     "operation": "summarize_hed_type",
-    "description": "Summarize a hed type tag such as condition-variable",
+    "description": "Summarize a HED type tag such as condition-variable",
     "parameters": {
       "summary_name": "Hed type summary",
       "summary_filename": "hed_type_summary",
diff --git a/tests/models/test_hed_group.py b/tests/models/test_hed_group.py
index 22d7939c2..3575d706b 100644
--- a/tests/models/test_hed_group.py
+++ b/tests/models/test_hed_group.py
@@ -59,7 +59,7 @@ def test_find_tags_with_term(self):
         # works
         located_tags = basic_hed_string_obj.find_tags_with_term("Object", recursive=True, include_groups=0)
         self.assertEqual(len(located_tags), 5)
-        # located tags now has found all 5 hed tags
+        # located_tags has now found all 5 HED tags
 
         # This will find no tags
         located_tags = basic_hed_string_obj.find_tags_with_term("reject", recursive=True, include_groups=0)
diff --git a/tests/schema/test_hed_schema_io.py b/tests/schema/test_hed_schema_io.py
index 5e05a8a94..5f26d5b16 100644
--- a/tests/schema/test_hed_schema_io.py
+++ b/tests/schema/test_hed_schema_io.py
@@ -182,7 +182,7 @@ def test_load_schema_version_libraries(self):
         ver3 = ["8.0.0", "sc:score_1.0.0"]
         schemas3 = load_schema_version(ver3)
         self.assertIsInstance(schemas3, HedSchemaGroup, "load_schema_version returns HedSchema version+namespace")
-        self.assertIsInstance(schemas3._schemas, dict, "load_schema_version group keeps dictionary of hed versions")
+        self.assertIsInstance(schemas3._schemas, dict, "load_schema_version group keeps dictionary of HED versions")
         self.assertEqual(len(schemas3._schemas), 2, "load_schema_version group dictionary is right length")
         self.assertEqual(schemas3.name, "8.0.0,sc:score_1.0.0")
         s = schemas3._schemas[""]
@@ -192,7 +192,7 @@ def test_load_schema_version_libraries(self):
         formatted_list = schemas3.get_formatted_version()
         schemas4 = load_schema_version(formatted_list)
         self.assertIsInstance(schemas4, HedSchemaGroup, "load_schema_version returns HedSchema version+namespace")
-        self.assertIsInstance(schemas4._schemas, dict, "load_schema_version group keeps dictionary of hed versions")
+        self.assertIsInstance(schemas4._schemas, dict, "load_schema_version group keeps dictionary of HED versions")
         self.assertEqual(len(schemas4._schemas), 2, "load_schema_version group dictionary is right length")
         self.assertEqual(schemas4.get_formatted_version(), '["8.0.0", "sc:score_1.0.0"]',
                          "load_schema_version gives correct version_string with multiple prefixes")
@@ -215,7 +215,7 @@ def test_load_schema_version_libraries(self):
 
 
 class TestHedSchemaUnmerged(unittest.TestCase):
-    # Verify the hed cache can handle loading unmerged with_standard schemas in case they are ever used
+    # Verify the HED cache can handle loading unmerged with_standard schemas in case they are ever used
     @classmethod
     def setUpClass(cls):
         hed_cache_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),
diff --git a/tests/tools/bids/test_bids_tabular_file.py b/tests/tools/bids/test_bids_tabular_file.py
index ce57a1725..4cb75f5de 100644
--- a/tests/tools/bids/test_bids_tabular_file.py
+++ b/tests/tools/bids/test_bids_tabular_file.py
@@ -34,7 +34,7 @@ def test_set_contents_no_sidecar(self):
         events.set_contents()
         self.assertIsInstance(events.contents, TabularInput,
                               "BidsTabularFile should have TabularInput contents after setting.")
-        self.assertFalse(events.has_hed, "set_contents indicate hed if no sidecar and no HED columns.")
+        self.assertFalse(events.has_hed, "set_contents should not indicate HED if no sidecar and no HED columns.")
         events.clear_contents()
         self.assertFalse(events.contents, "BidsTabularFile should have no contents after clearing.")