diff --git a/deploy_hed/Dockerfile b/deploy_hed/Dockerfile index 3fe1bbbb..5d6da1c8 100644 --- a/deploy_hed/Dockerfile +++ b/deploy_hed/Dockerfile @@ -28,7 +28,6 @@ RUN mkdir -p /var/log/hedtools && \ # Copy the application code into the container COPY ./hedtools /root/hedtools/ COPY ./hedtools/hedweb /root/hedtools/hedweb/ -#COPY ./hedtools/hedweb/runserver.py /root/ # Set the PYTHONPATH environment variable to include /root/hedtools ENV PYTHONPATH="/root/hedtools" diff --git a/deploy_hed_dev/Dockerfile b/deploy_hed_dev/Dockerfile index ffbd9fa8..a1509a14 100644 --- a/deploy_hed_dev/Dockerfile +++ b/deploy_hed_dev/Dockerfile @@ -29,7 +29,6 @@ RUN mkdir -p /var/log/hedtools && \ # Copy the application code into the container COPY ./hedtools /root/hedtools/ COPY ./hedtools/hedweb /root/hedtools/hedweb/ -# COPY ./hedtools/hedweb/runserver.py /root/ # Set the PYTHONPATH environment variable to include /root/hedtools ENV PYTHONPATH="/root/hedtools" diff --git a/deploy_hed_dev/deploy.sh b/deploy_hed_dev/deploy.sh index 673bdbf1..9981931c 100644 --- a/deploy_hed_dev/deploy.sh +++ b/deploy_hed_dev/deploy.sh @@ -112,6 +112,6 @@ echo "[INFO] Running new container..." run_new_container echo "[INFO] Cleaning up temporary directories..." -# cleanup_directories +cleanup_directories echo "[INFO] Deployment successful!" 
\ No newline at end of file diff --git a/hedweb/constants/base_constants.py b/hedweb/constants/base_constants.py index 254e0421..c4ad54ff 100644 --- a/hedweb/constants/base_constants.py +++ b/hedweb/constants/base_constants.py @@ -1,3 +1,4 @@ +APPEND_ASSEMBLED = 'append_assembled' CHECK_FOR_WARNINGS = 'check_for_warnings' COLUMN_COUNTS = 'column_counts' diff --git a/hedweb/event_operations.py b/hedweb/event_operations.py index 6989e6d5..e9bf8700 100644 --- a/hedweb/event_operations.py +++ b/hedweb/event_operations.py @@ -1,284 +1,282 @@ -import json - -from hed import schema as hedschema -from hed.errors import get_printable_issue_string, HedFileError, ErrorHandler -from hed.errors.error_reporter import check_for_any_errors -from hed.models.definition_dict import DefinitionDict -from hed.models.tabular_input import TabularInput -from hed.models.query_service import get_query_handlers, search_hed_objs -from hed.tools.remodeling.dispatcher import Dispatcher -from hed.tools.remodeling.remodeler_validator import RemodelerValidator -from hed.tools.analysis.hed_tag_manager import HedTagManager -from hed.tools.analysis.event_manager import EventManager -from hed.tools.analysis.tabular_summary import TabularSummary -from hed.tools.analysis.annotation_util import generate_sidecar_entry -from hedweb.constants import base_constants as bc -from hedweb.base_operations import BaseOperations -from hedweb.web_util import generate_filename, get_schema_versions - - -class EventOperations(BaseOperations): - - def __init__(self, arguments=None): - """ Construct a ProcessEvents object to handle events form requests. 
- - Parameters: - arguments (dict): Dictionary with parameters extracted from form or service - - """ - self.schema = None - self.events = None - self.command = None - self.check_for_warnings = False - self.columns_skip = [] - self.columns_value = [] - self.expand_defs = False - self.include_context = False - self.include_summaries = False - self.queries = None - self.query_names = None - self.remodel_operations = None - self.remove_types_on = False - self.replace_defs = False - self.sidecar = None - if arguments: - self.set_input_from_dict(arguments) - - def process(self): - """ Perform the requested action for the events file and its sidecar. - - Returns: - dict: A dictionary of results in the standard results format. - - Raises: - HedFileError: If the command was not found or the input arguments were not valid. - - """ - if not self.command: - raise HedFileError('MissingCommand', 'Command is missing', '') - elif self.command == bc.COMMAND_GENERATE_SIDECAR or self.command == bc.COMMAND_REMODEL: - pass - elif not self.schema or not \ - isinstance(self.schema, (hedschema.hed_schema.HedSchema, hedschema.hed_schema_group.HedSchemaGroup)): - raise HedFileError('BadHedSchema', "Please provide a valid HedSchema for event processing", "") - - if not self.events or not isinstance(self.events, TabularInput): - raise HedFileError('InvalidEventsFile', "An events file was not given or could not be read", "") - if self.command == bc.COMMAND_VALIDATE: - results = self.validate() - elif self.command == bc.COMMAND_SEARCH: - results = self.search() - elif self.command == bc.COMMAND_ASSEMBLE: - results = self.assemble() - elif self.command == bc.COMMAND_GENERATE_SIDECAR: - results = self.generate_sidecar() - elif self.command == bc.COMMAND_REMODEL: - results = self.remodel() - else: - raise HedFileError('UnknownEventsProcessingMethod', f'Command {self.command} is missing or invalid', '') - return results - - def assemble(self): - """ Create a tabular file with the original 
positions in first column and a HED column. - - - Returns: - dict: A dictionary of results in standard format including either the assembled events string or errors. - - Notes: - options include columns_included and expand_defs. - """ - - self.check_for_warnings = False - results = self.validate() - if results['data']: - return results - hed_objs, definitions = self.get_hed_objs() - hed_strs = [str(obj) if obj is not None else '' for obj in hed_objs] - display_name = self.events.name - file_name = generate_filename(display_name, name_suffix='_expanded', extension='.tsv', append_datetime=True) - return {bc.COMMAND: bc.COMMAND_ASSEMBLE, - bc.COMMAND_TARGET: 'events', - 'data': hed_strs, 'output_display_name': file_name, - 'definitions': DefinitionDict.get_as_strings(definitions), - 'schema_version': self.schema.get_formatted_version(), - 'msg_category': 'success', 'msg': 'Events file successfully expanded'} - - def generate_sidecar(self): - """ Generate a JSON sidecar template from a BIDS-style events file. - - Returns: - dict: A dictionary of results in standard format including either the generated sidecar string or errors. - - Notes: Options are the columns selected. If None, all columns are used. 
- - """ - display_name = self.events.name - if self.columns_skip and self.columns_value: - overlap = set(self.columns_skip).intersection(set(self.columns_value)) - if overlap: - return {bc.COMMAND: bc.COMMAND_GENERATE_SIDECAR, bc.COMMAND_TARGET: 'events', - 'data': f"Skipped and value column names have these names in common: {str(overlap)}", - "output_display_name": generate_filename(display_name, name_suffix='sidecar_generation_issues', - extension='.txt', append_datetime=True), - bc.MSG_CATEGORY: 'warning', - bc.MSG: f"Cannot generate sidecar because skipped and value column names overlap."} - tab_sum = TabularSummary(value_cols=self.columns_value, skip_cols=self.columns_skip) - tab_sum.update(self.events.dataframe) - hed_dict = tab_sum.extract_sidecar_template() - # columns_info = TabularSummary.get_columns_info(self.events.dataframe) - # hed_dict = {} - # for column_name in columns_info: - # if column_name in self.columns_skip: - # continue - # elif column_name in self.columns_value: - # hed_dict[column_name] = generate_sidecar_entry(column_name) - # else: - # hed_dict[column_name] = generate_sidecar_entry(column_name, - # column_values=list(columns_info[column_name].keys())) - file_name = generate_filename(display_name, name_suffix='_generated', extension='.json', append_datetime=True) - return {bc.COMMAND: bc.COMMAND_GENERATE_SIDECAR, - bc.COMMAND_TARGET: 'events', - 'data': json.dumps(hed_dict, indent=4), - 'output_display_name': file_name, 'msg_category': 'success', - 'msg': 'JSON sidecar generation from event file complete'} - - def get_hed_objs(self): - """ Return the assembled objects and applicable definitions. 
""" - definitions = self.events.get_def_dict(self.schema) - event_manager = EventManager(self.events, self.schema) - if self.remove_types_on: - types = ['Condition-variable', 'Task'] - else: - types = [] - tag_manager = HedTagManager(event_manager, types) - hed_objs = tag_manager.get_hed_objs(self.include_context, self.replace_defs) - return hed_objs, definitions - - def remodel(self): - """ Remodel a given events file. - - Returns: - dict: A dictionary pointing to results or errors. - - Notes: The options for this are - - include_summaries (bool): If true and summaries exist, package event file and summaries in a zip file. - - """ - - display_name = self.events.name - if not self.remodel_operations: - raise HedFileError("RemodelingOperationsMissing", "Must supply remodeling operations for remodeling", "") - remodel_name = self.remodel_operations['name'] - operations = self.remodel_operations['operations'] - validator = RemodelerValidator() - errors = validator.validate(operations) - if errors: - issue_str = "\n".join(errors) - file_name = generate_filename(remodel_name, name_suffix='_operation_parse_errors', - extension='.txt', append_datetime=True) - return {bc.COMMAND: bc.COMMAND_REMODEL, - bc.COMMAND_TARGET: 'events', - 'data': issue_str, 'output_display_name': file_name, - 'msg_category': "warning", - 'msg': f"Remodeling operation list for {display_name} had validation issues"} - df = self.events.dataframe - dispatch = Dispatcher(operations, data_root=None, hed_versions=self.schema) - - for operation in dispatch.parsed_ops: - df = dispatch.prep_data(df) - df = operation.do_op(dispatch, df, display_name, sidecar=self.sidecar) - df = dispatch.post_proc_data(df) - data = df.to_csv(None, sep='\t', index=False, header=True, lineterminator='\n') - name_suffix = f"_remodeled_by_{remodel_name}" - file_name = generate_filename(display_name, name_suffix=name_suffix, extension='.tsv', append_datetime=True) - output_name = file_name - response = {bc.COMMAND: 
bc.COMMAND_REMODEL, - bc.COMMAND_TARGET: 'events', 'data': '', "output_display_name": output_name, - bc.SCHEMA_VERSION: get_schema_versions(self.schema), - bc.MSG_CATEGORY: 'success', - bc.MSG: f"Command parsing for {display_name} remodeling was successful"} - if dispatch.summary_dicts and self.include_summaries: - file_list = dispatch.get_summaries() - file_list.append({'file_name': output_name, 'file_format': '.tsv', 'file_type': 'tabular', 'content': data}) - response[bc.FILE_LIST] = file_list - response[bc.ZIP_NAME] = generate_filename(display_name, name_suffix=name_suffix + '_zip', - extension='.zip', append_datetime=True) - else: - response['data'] = data - return response - - def search(self): - """ Create a three-column tsv file with event number, matched string, and assembled strings for matched events. - - Returns: - dict: A dictionary pointing to results or errors. - - Notes: The options for this are - columns_included (list): A list of column names of columns to include. - expand_defs (bool): If True, expand the definitions in the assembled HED. Otherwise, shrink definitions. 
- - """ - - display_name = self.events.name - queries, query_names, issues = get_query_handlers(self.queries, self.query_names) - if issues: - return {bc.COMMAND: bc.COMMAND_SEARCH, bc.COMMAND_TARGET: 'events', - 'data': "Query errors:\n" + "\n".join(issues), - "output_display_name": generate_filename(display_name, name_suffix='query_validation_issues', - extension='.txt', append_datetime=True), - bc.SCHEMA_VERSION: get_schema_versions(self.schema), - bc.MSG_CATEGORY: 'warning', - bc.MSG: f"Queries had validation issues"} - self.check_for_warnings = False - results = self.validate() - if results['data']: - return results - hed_objs, definitions = self.get_hed_objs() - df_factors = search_hed_objs(hed_objs, queries, query_names=query_names) - if self.query_names: - write_header = True - else: - write_header = False - file_name = generate_filename(display_name, name_suffix='_queries', extension='.tsv', append_datetime=True) - return {bc.COMMAND: bc.COMMAND_SEARCH, - bc.COMMAND_TARGET: 'events', - 'data': df_factors.to_csv(None, sep='\t', index=False, header=write_header, lineterminator='\n'), - 'definitions': DefinitionDict.get_as_strings(definitions), - 'output_display_name': file_name, 'schema_version': self.schema.get_formatted_version(), - bc.MSG_CATEGORY: 'success', - bc.MSG: f"Successfully made {len(self.queries)} queries for {display_name}"} - - def validate(self): - """ Validate the events tabular input object and return the results. - - Returns: - dict: A dictionary containing results of validation in standard format. - - Notes: The dictionary of options includes the following. - - check_for_warnings (bool): If true, validation should include warnings. 
(default False) - - """ - display_name = self.events.name - error_handler = ErrorHandler(check_for_warnings=self.check_for_warnings) - issues = [] - if self.sidecar: - issues = self.sidecar.validate(self.schema, name=self.sidecar.name, error_handler=error_handler) - if not check_for_any_errors(issues): - issues += self.events.validate(self.schema, name=self.events.name, error_handler=error_handler) - if issues: - data = get_printable_issue_string(issues, title="Event file errors:") - file_name = generate_filename(display_name, name_suffix='_validation_issues', - extension='.txt', append_datetime=True) - category = 'warning' - msg = f"Events file {display_name} had validation issues" - else: - data = '' - file_name = display_name - category = 'success' - msg = f"Events file {display_name} did not have validation issues" - - return {bc.COMMAND: bc.COMMAND_VALIDATE, bc.COMMAND_TARGET: 'events', - 'data': data, "output_display_name": file_name, - bc.SCHEMA_VERSION: get_schema_versions(self.schema), - bc.MSG_CATEGORY: category, bc.MSG: msg} +import json +from io import StringIO +import pandas as pd + +from hed import schema as hedschema +from hed.errors import get_printable_issue_string, HedFileError, ErrorHandler +from hed.errors.error_reporter import check_for_any_errors +from hed.models.definition_dict import DefinitionDict +from hed.models.tabular_input import TabularInput +from hed.models.query_service import get_query_handlers, search_hed_objs +from hed.tools.remodeling.dispatcher import Dispatcher +from hed.tools.remodeling.remodeler_validator import RemodelerValidator +from hed.tools.analysis.hed_tag_manager import HedTagManager +from hed.tools.analysis.event_manager import EventManager +from hed.tools.analysis.tabular_summary import TabularSummary +from hedweb.constants import base_constants as bc +from hedweb.base_operations import BaseOperations +from hedweb.web_util import generate_filename, get_schema_versions + + +class EventOperations(BaseOperations): + + 
def __init__(self, arguments=None): + """ Construct a ProcessEvents object to handle events form requests. + + Parameters: + arguments (dict): Dictionary with parameters extracted from form or service + + """ + self.schema = None + self.events = None + self.command = None + self.append_assembled = False + self.check_for_warnings = False + self.columns_skip = [] + self.columns_value = [] + self.expand_defs = False + self.include_context = False + self.include_summaries = False + self.queries = None + self.query_names = None + self.remodel_operations = None + self.remove_types_on = False + self.replace_defs = False + self.sidecar = None + if arguments: + self.set_input_from_dict(arguments) + + def process(self): + """ Perform the requested action for the events file and its sidecar. + + Returns: + dict: A dictionary of results in the standard results format. + + Raises: + HedFileError: If the command was not found or the input arguments were not valid. + + """ + if not self.command: + raise HedFileError('MissingCommand', 'Command is missing', '') + elif self.command == bc.COMMAND_GENERATE_SIDECAR or self.command == bc.COMMAND_REMODEL: + pass + elif not self.schema or not \ + isinstance(self.schema, (hedschema.hed_schema.HedSchema, hedschema.hed_schema_group.HedSchemaGroup)): + raise HedFileError('BadHedSchema', "Please provide a valid HedSchema for event processing", "") + + if not self.events or not isinstance(self.events, TabularInput): + raise HedFileError('InvalidEventsFile', "An events file was not given or could not be read", "") + if self.command == bc.COMMAND_VALIDATE: + results = self.validate() + elif self.command == bc.COMMAND_SEARCH: + results = self.search() + elif self.command == bc.COMMAND_ASSEMBLE: + results = self.assemble() + elif self.command == bc.COMMAND_GENERATE_SIDECAR: + results = self.generate_sidecar() + elif self.command == bc.COMMAND_REMODEL: + results = self.remodel() + else: + raise HedFileError('UnknownEventsProcessingMethod', f'Command 
{self.command} is missing or invalid', '') + return results + + def assemble(self): + """ Create a tabular file with the original positions in first column and a HED column. + + + Returns: + dict: A dictionary of results in standard format including either the assembled events string or errors. + + Notes: + options include columns_included and expand_defs. + """ + + self.check_for_warnings = False + results = self.validate() + if results['data']: + return results + hed_objs, definitions = self.get_hed_objs() + data = [str(obj) if obj is not None else '' for obj in hed_objs] + if self.append_assembled: + df = self.events.dataframe + df['HedAssembled'] = data + with StringIO() as output: + df.to_csv(output, sep='\t', index=False, header=True) + data = output.getvalue() # Retrieve the written string + display_name = self.events.name + file_name = generate_filename(display_name, name_suffix='_expanded', extension='.tsv', append_datetime=True) + return {bc.COMMAND: bc.COMMAND_ASSEMBLE, + bc.COMMAND_TARGET: 'events', + 'data': data, 'output_display_name': file_name, + 'definitions': DefinitionDict.get_as_strings(definitions), + 'schema_version': self.schema.get_formatted_version(), + 'msg_category': 'success', 'msg': 'Events file successfully expanded'} + + def generate_sidecar(self): + """ Generate a JSON sidecar template from a BIDS-style events file. + + Returns: + dict: A dictionary of results in standard format including either the generated sidecar string or errors. + + Notes: Options are the columns selected. If None, all columns are used. 
+ + """ + display_name = self.events.name + if self.columns_skip and self.columns_value: + overlap = set(self.columns_skip).intersection(set(self.columns_value)) + if overlap: + return {bc.COMMAND: bc.COMMAND_GENERATE_SIDECAR, bc.COMMAND_TARGET: 'events', + 'data': f"Skipped and value column names have these names in common: {str(overlap)}", + "output_display_name": generate_filename(display_name, name_suffix='sidecar_generation_issues', + extension='.txt', append_datetime=True), + bc.MSG_CATEGORY: 'warning', + bc.MSG: f"Cannot generate sidecar because skipped and value column names overlap."} + tab_sum = TabularSummary(value_cols=self.columns_value, skip_cols=self.columns_skip) + tab_sum.update(self.events.dataframe) + hed_dict = tab_sum.extract_sidecar_template() + file_name = generate_filename(display_name, name_suffix='_generated', extension='.json', append_datetime=True) + return {bc.COMMAND: bc.COMMAND_GENERATE_SIDECAR, + bc.COMMAND_TARGET: 'events', + 'data': json.dumps(hed_dict, indent=4), + 'output_display_name': file_name, 'msg_category': 'success', + 'msg': 'JSON sidecar generation from event file complete'} + + def get_hed_objs(self): + """ Return the assembled objects and applicable definitions. """ + definitions = self.events.get_def_dict(self.schema) + event_manager = EventManager(self.events, self.schema) + if self.remove_types_on: + types = ['Condition-variable', 'Task'] + else: + types = [] + tag_manager = HedTagManager(event_manager, types) + hed_objs = tag_manager.get_hed_objs(self.include_context, self.replace_defs) + return hed_objs, definitions + + def remodel(self): + """ Remodel a given events file. + + Returns: + dict: A dictionary pointing to results or errors. + + Notes: The options for this are + - include_summaries (bool): If true and summaries exist, package event file and summaries in a zip file. 
+ + """ + + display_name = self.events.name + if not self.remodel_operations: + raise HedFileError("RemodelingOperationsMissing", "Must supply remodeling operations for remodeling", "") + remodel_name = self.remodel_operations['name'] + operations = self.remodel_operations['operations'] + validator = RemodelerValidator() + errors = validator.validate(operations) + if errors: + issue_str = "\n".join(errors) + file_name = generate_filename(remodel_name, name_suffix='_operation_parse_errors', + extension='.txt', append_datetime=True) + return {bc.COMMAND: bc.COMMAND_REMODEL, + bc.COMMAND_TARGET: 'events', + 'data': issue_str, 'output_display_name': file_name, + 'msg_category': "warning", + 'msg': f"Remodeling operation list for {display_name} had validation issues"} + df = self.events.dataframe + dispatch = Dispatcher(operations, data_root=None, hed_versions=self.schema) + + for operation in dispatch.parsed_ops: + df = dispatch.prep_data(df) + df = operation.do_op(dispatch, df, display_name, sidecar=self.sidecar) + df = dispatch.post_proc_data(df) + data = df.to_csv(None, sep='\t', index=False, header=True, lineterminator='\n') + name_suffix = f"_remodeled_by_{remodel_name}" + file_name = generate_filename(display_name, name_suffix=name_suffix, extension='.tsv', append_datetime=True) + output_name = file_name + response = {bc.COMMAND: bc.COMMAND_REMODEL, + bc.COMMAND_TARGET: 'events', 'data': '', "output_display_name": output_name, + bc.SCHEMA_VERSION: get_schema_versions(self.schema), + bc.MSG_CATEGORY: 'success', + bc.MSG: f"Command parsing for {display_name} remodeling was successful"} + if dispatch.summary_dicts and self.include_summaries: + file_list = dispatch.get_summaries() + file_list.append({'file_name': output_name, 'file_format': '.tsv', 'file_type': 'tabular', 'content': data}) + response[bc.FILE_LIST] = file_list + response[bc.ZIP_NAME] = generate_filename(display_name, name_suffix=name_suffix + '_zip', + extension='.zip', append_datetime=True) + else: + 
response['data'] = data + return response + + def search(self): + """ Create a three-column tsv file with event number, matched string, and assembled strings for matched events. + + Returns: + dict: A dictionary pointing to results or errors. + + Notes: The options for this are + columns_included (list): A list of column names of columns to include. + expand_defs (bool): If True, expand the definitions in the assembled HED. Otherwise, shrink definitions. + + """ + + display_name = self.events.name + queries, query_names, issues = get_query_handlers(self.queries, self.query_names) + if issues: + return {bc.COMMAND: bc.COMMAND_SEARCH, bc.COMMAND_TARGET: 'events', + 'data': "Query errors:\n" + "\n".join(issues), + "output_display_name": generate_filename(display_name, name_suffix='query_validation_issues', + extension='.txt', append_datetime=True), + bc.SCHEMA_VERSION: get_schema_versions(self.schema), + bc.MSG_CATEGORY: 'warning', + bc.MSG: f"Queries had validation issues"} + self.check_for_warnings = False + results = self.validate() + if results['data']: + return results + hed_objs, definitions = self.get_hed_objs() + df_factors = search_hed_objs(hed_objs, queries, query_names=query_names) + if self.append_assembled: + df = pd.concat([self.events.dataframe, df_factors], axis=1) + df = df.loc[:, ~df.columns.duplicated(keep='last')] + data = df.to_csv(None, sep='\t', index=False, header=True, lineterminator='\n') + else: + data = df_factors.to_csv(None, sep='\t', index=False, header=True, lineterminator='\n') + file_name = generate_filename(display_name, name_suffix='_queries', extension='.tsv', append_datetime=True) + return {bc.COMMAND: bc.COMMAND_SEARCH, bc.COMMAND_TARGET: 'events', 'data': data, + 'definitions': DefinitionDict.get_as_strings(definitions), + 'output_display_name': file_name, 'schema_version': self.schema.get_formatted_version(), + bc.MSG_CATEGORY: 'success', + bc.MSG: f"Successfully made {len(self.queries)} queries for {display_name}"} + + def 
validate(self): + """ Validate the events tabular input object and return the results. + + Returns: + dict: A dictionary containing results of validation in standard format. + + Notes: The dictionary of options includes the following. + - check_for_warnings (bool): If true, validation should include warnings. (default False) + + """ + display_name = self.events.name + error_handler = ErrorHandler(check_for_warnings=self.check_for_warnings) + issues = [] + if self.sidecar: + issues = self.sidecar.validate(self.schema, name=self.sidecar.name, error_handler=error_handler) + if not check_for_any_errors(issues): + issues += self.events.validate(self.schema, name=self.events.name, error_handler=error_handler) + if issues: + data = get_printable_issue_string(issues, title="Event file errors:") + file_name = generate_filename(display_name, name_suffix='_validation_issues', + extension='.txt', append_datetime=True) + category = 'warning' + msg = f"Events file {display_name} had validation issues" + else: + data = '' + file_name = display_name + category = 'success' + msg = f"Events file {display_name} did not have validation issues" + + return {bc.COMMAND: bc.COMMAND_VALIDATE, bc.COMMAND_TARGET: 'events', + 'data': data, "output_display_name": file_name, + bc.SCHEMA_VERSION: get_schema_versions(self.schema), + bc.MSG_CATEGORY: category, bc.MSG: msg} diff --git a/hedweb/process_form.py b/hedweb/process_form.py index b124605f..1d344c1d 100644 --- a/hedweb/process_form.py +++ b/hedweb/process_form.py @@ -1,164 +1,165 @@ -import os -import json -from werkzeug.utils import secure_filename -from werkzeug.datastructures import FileStorage -from hed.schema import load_schema_version, from_string -from hed import HedSchema - -from hed import schema as hedschema -from hed.errors import HedFileError -from hed.models.hed_string import HedString -from hed.models.sidecar import Sidecar -from hed.models.spreadsheet_input import SpreadsheetInput -from hed.models.tabular_input import 
TabularInput -from hedweb.constants import base_constants as bc -from hedweb.constants import file_constants as fc -from hedweb.columns import create_column_selections, get_tag_columns -from hedweb.web_util import form_has_file, form_has_option, form_has_url, get_parsed_name - - -class ProcessForm: - @staticmethod - def get_input_from_form(request): - """ Get a dictionary of input from a service request. - - Parameters: - request (Request): A Request object containing user data for the service request. - - Returns: - dict: A dictionary containing input arguments for calling the service request. - """ - - arguments = { - bc.REQUEST_TYPE: bc.FROM_FORM, - bc.COMMAND: request.form.get(bc.COMMAND_OPTION, ''), - bc.CHECK_FOR_WARNINGS: form_has_option(request.form, bc.CHECK_FOR_WARNINGS, 'on'), - bc.EXPAND_DEFS: form_has_option(request.form, bc.EXPAND_DEFS, 'on'), - bc.INCLUDE_CONTEXT: form_has_option(request.form, bc.INCLUDE_CONTEXT, 'on'), - bc.INCLUDE_DESCRIPTION_TAGS: form_has_option(request.form, bc.INCLUDE_DESCRIPTION_TAGS, 'on'), - bc.INCLUDE_SUMMARIES: form_has_option(request.form, bc.INCLUDE_SUMMARIES, 'on'), - bc.REMOVE_TYPES_ON: form_has_option(request.form, bc.REMOVE_TYPES_ON, 'on'), - bc.REPLACE_DEFS: form_has_option(request.form, bc.REPLACE_DEFS, 'on'), - bc.SPREADSHEET_TYPE: fc.TSV_EXTENSION - } - value, skip = create_column_selections(request.form) - arguments[bc.COLUMNS_SKIP] = skip - arguments[bc.COLUMNS_VALUE] = value - arguments[bc.TAG_COLUMNS] = get_tag_columns(request.form) - ProcessForm.set_schema_from_request(arguments, request) - ProcessForm.set_json_files(arguments, request) - ProcessForm.set_queries(arguments, request) - ProcessForm.set_input_objects(arguments, request) - return arguments - - @staticmethod - def set_input_objects(arguments, request): - if bc.EVENTS_FILE in request.files and request.files[bc.EVENTS_FILE]: - f = request.files[bc.EVENTS_FILE] - arguments[bc.EVENTS] = TabularInput(file=f, sidecar=arguments.get(bc.SIDECAR, None), - 
name=secure_filename(f.filename)) - if bc.STRING_INPUT in request.form and request.form[bc.STRING_INPUT]: - arguments[bc.STRING_LIST] = [HedString(request.form[bc.STRING_INPUT], arguments[bc.SCHEMA])] - if bc.SPREADSHEET_FILE in request.files and request.files[bc.SPREADSHEET_FILE].filename: - arguments[bc.WORKSHEET] = request.form.get(bc.WORKSHEET_NAME, None) - filename = request.files[bc.SPREADSHEET_FILE].filename - file_ext = os.path.splitext(filename)[1] - if file_ext in fc.EXCEL_FILE_EXTENSIONS: - arguments[bc.SPREADSHEET_TYPE] = fc.EXCEL_EXTENSION - arguments[bc.SPREADSHEET] = SpreadsheetInput(file=request.files[bc.SPREADSHEET_FILE], - file_type=fc.EXCEL_EXTENSION, - worksheet_name=arguments[bc.WORKSHEET], - tag_columns=arguments[bc.TAG_COLUMNS], - has_column_names=True, name=filename) - - @staticmethod - def set_json_files(arguments, request): - if bc.SIDECAR_FILE in request.files and request.files[bc.SIDECAR_FILE]: - f = request.files[bc.SIDECAR_FILE] - arguments[bc.SIDECAR] = Sidecar(files=f, name=secure_filename(f.filename)) - if bc.REMODEL_FILE in request.files and request.files[bc.REMODEL_FILE]: - f = request.files[bc.REMODEL_FILE] - name = secure_filename(f.filename) - arguments[bc.REMODEL_OPERATIONS] = {'name': name, 'operations': json.load(f)} - if bc.DEFINITION_FILE in request.files and request.files[bc.DEFINITION_FILE]: - f = request.files[bc.DEFINITION_FILE] - sidecar = Sidecar(files=f, name=secure_filename(f.filename)) - arguments[bc.DEFINITIONS] = sidecar.get_def_dict(arguments[bc.SCHEMA], extra_def_dicts=None) - - @staticmethod - def set_queries(arguments, request): - """ Update arguments with lists of string queries - - Parameters: - arguments (dict): A dictionary with the extracted parameters that are to be processed. - request (Request): A Request object containing form data. 
- """ - arguments[bc.QUERY_NAMES] = None - if bc.QUERY_INPUT in request.form and request.form[bc.QUERY_INPUT]: - arguments[bc.QUERIES] = [request.form[bc.QUERY_INPUT]] - else: - arguments[bc.QUERIES] = None - - @staticmethod - def set_schema_from_request(arguments, request): - """ Create a HedSchema object from form pull-down box. - - Parameters: - arguments (dict): Dictionary of parameters to which the schema will be added. - request (Request): A Request object containing form data. - - Returns: - HedSchema: The HED schema to use. - """ - - if form_has_option(request.form, bc.SCHEMA_VERSION) and \ - request.form[bc.SCHEMA_VERSION] != bc.OTHER_VERSION_OPTION: - arguments[bc.SCHEMA] = load_schema_version(request.form[bc.SCHEMA_VERSION]) - elif form_has_option(request.form, bc.SCHEMA_VERSION) and form_has_file(request.files, bc.SCHEMA_PATH): - f = request.files[bc.SCHEMA_PATH] - arguments[bc.SCHEMA] = \ - from_string(f.read(fc.BYTE_LIMIT).decode('utf-8'), schema_format=secure_filename(f.filename)) - if form_has_option(request.form, bc.SCHEMA_UPLOAD_OPTIONS, bc.SCHEMA_FILE_OPTION) and \ - form_has_file(request.files, bc.SCHEMA_FILE, fc.SCHEMA_EXTENSIONS): - arguments[bc.SCHEMA] = ProcessForm.get_schema(request.files[bc.SCHEMA_FILE]) - elif form_has_option(request.form, bc.SCHEMA_UPLOAD_OPTIONS, bc.SCHEMA_URL_OPTION) and \ - form_has_url(request.form, bc.SCHEMA_URL, fc.SCHEMA_EXTENSIONS): - arguments[bc.SCHEMA] = ProcessForm.get_schema(request.values[bc.SCHEMA_URL]) - if form_has_option(request.form, bc.SECOND_SCHEMA_UPLOAD_OPTIONS, bc.SECOND_SCHEMA_FILE_OPTION) and \ - form_has_file(request.files, bc.SECOND_SCHEMA_FILE, fc.SCHEMA_EXTENSIONS): - arguments[bc.SCHEMA2] = ProcessForm.get_schema(request.files[bc.SECOND_SCHEMA_FILE]) - elif form_has_option(request.form, bc.SECOND_SCHEMA_UPLOAD_OPTIONS, bc.SECOND_SCHEMA_URL_OPTION) and \ - form_has_url(request.form, bc.SECOND_SCHEMA_URL, fc.SCHEMA_EXTENSIONS): - arguments[bc.SCHEMA2] = 
ProcessForm.get_schema(request.values[bc.SECOND_SCHEMA_URL]) - - @staticmethod - def get_schema(schema_input=None, version=None, as_xml_string=None): - """ Return a HedSchema object from the given parameters. - - Parameters: - schema_input (str or FileStorage or None): Input url or file. - version (str or None): A schema version string to load, e.g. "8.2.0" or "score_1.1.0". - as_xml_string (str or None): A schema in xml string format. - - Returns: - HedSchema: Schema - - :raises HedFileError: - - The schema can't be loaded for some reason. - """ - if isinstance(schema_input, FileStorage): - name, extension = get_parsed_name(secure_filename(schema_input.filename)) - hed_schema = hedschema.from_string(schema_input.read(fc.BYTE_LIMIT).decode('utf-8'), - schema_format=extension, - name=name) - elif isinstance(schema_input, str): - name, extension = get_parsed_name(schema_input, is_url=True) - hed_schema = hedschema.load_schema(schema_input, name=name) - elif isinstance(version, str): - return hedschema.load_schema_version(version) - elif isinstance(as_xml_string, str): - return hedschema.from_string(as_xml_string, schema_format=".xml") - else: - raise HedFileError("SCHEMA_NOT_FOUND", "Must provide a loadable schema", "") - - return hed_schema +import os +import json +from werkzeug.utils import secure_filename +from werkzeug.datastructures import FileStorage +from hed.schema import load_schema_version, from_string +from hed import HedSchema + +from hed import schema as hedschema +from hed.errors import HedFileError +from hed.models.hed_string import HedString +from hed.models.sidecar import Sidecar +from hed.models.spreadsheet_input import SpreadsheetInput +from hed.models.tabular_input import TabularInput +from hedweb.constants import base_constants as bc +from hedweb.constants import file_constants as fc +from hedweb.columns import create_column_selections, get_tag_columns +from hedweb.web_util import form_has_file, form_has_option, form_has_url, get_parsed_name + + 
+class ProcessForm:
+    @staticmethod
+    def get_input_from_form(request):
+        """ Get a dictionary of input from a form request.
+
+        Parameters:
+            request (Request): A Request object containing user data for the form request.
+
+        Returns:
+            dict: A dictionary containing input arguments for processing the form request.
+        """
+
+        arguments = {
+            bc.REQUEST_TYPE: bc.FROM_FORM,
+            bc.COMMAND: request.form.get(bc.COMMAND_OPTION, ''),
+            bc.APPEND_ASSEMBLED: form_has_option(request.form, bc.APPEND_ASSEMBLED, 'on'),
+            bc.CHECK_FOR_WARNINGS: form_has_option(request.form, bc.CHECK_FOR_WARNINGS, 'on'),
+            bc.EXPAND_DEFS: form_has_option(request.form, bc.EXPAND_DEFS, 'on'),
+            bc.INCLUDE_CONTEXT: form_has_option(request.form, bc.INCLUDE_CONTEXT, 'on'),
+            bc.INCLUDE_DESCRIPTION_TAGS: form_has_option(request.form, bc.INCLUDE_DESCRIPTION_TAGS, 'on'),
+            bc.INCLUDE_SUMMARIES: form_has_option(request.form, bc.INCLUDE_SUMMARIES, 'on'),
+            bc.REMOVE_TYPES_ON: form_has_option(request.form, bc.REMOVE_TYPES_ON, 'on'),
+            bc.REPLACE_DEFS: form_has_option(request.form, bc.REPLACE_DEFS, 'on'),
+            bc.SPREADSHEET_TYPE: fc.TSV_EXTENSION
+        }
+        value, skip = create_column_selections(request.form)
+        arguments[bc.COLUMNS_SKIP] = skip
+        arguments[bc.COLUMNS_VALUE] = value
+        arguments[bc.TAG_COLUMNS] = get_tag_columns(request.form)
+        ProcessForm.set_schema_from_request(arguments, request)
+        ProcessForm.set_json_files(arguments, request)
+        ProcessForm.set_queries(arguments, request)
+        ProcessForm.set_input_objects(arguments, request)
+        return arguments
+
+    @staticmethod
+    def set_input_objects(arguments, request):
+        if bc.EVENTS_FILE in request.files and request.files[bc.EVENTS_FILE]:
+            f = request.files[bc.EVENTS_FILE]
+            arguments[bc.EVENTS] = TabularInput(file=f, sidecar=arguments.get(bc.SIDECAR, None),
+                                                name=secure_filename(f.filename))
+        if bc.STRING_INPUT in request.form and request.form[bc.STRING_INPUT]:
+            arguments[bc.STRING_LIST] = [HedString(request.form[bc.STRING_INPUT], arguments[bc.SCHEMA])]
+ if bc.SPREADSHEET_FILE in request.files and request.files[bc.SPREADSHEET_FILE].filename: + arguments[bc.WORKSHEET] = request.form.get(bc.WORKSHEET_NAME, None) + filename = request.files[bc.SPREADSHEET_FILE].filename + file_ext = os.path.splitext(filename)[1] + if file_ext in fc.EXCEL_FILE_EXTENSIONS: + arguments[bc.SPREADSHEET_TYPE] = fc.EXCEL_EXTENSION + arguments[bc.SPREADSHEET] = SpreadsheetInput(file=request.files[bc.SPREADSHEET_FILE], + file_type=fc.EXCEL_EXTENSION, + worksheet_name=arguments[bc.WORKSHEET], + tag_columns=arguments[bc.TAG_COLUMNS], + has_column_names=True, name=filename) + + @staticmethod + def set_json_files(arguments, request): + if bc.SIDECAR_FILE in request.files and request.files[bc.SIDECAR_FILE]: + f = request.files[bc.SIDECAR_FILE] + arguments[bc.SIDECAR] = Sidecar(files=f, name=secure_filename(f.filename)) + if bc.REMODEL_FILE in request.files and request.files[bc.REMODEL_FILE]: + f = request.files[bc.REMODEL_FILE] + name = secure_filename(f.filename) + arguments[bc.REMODEL_OPERATIONS] = {'name': name, 'operations': json.load(f)} + if bc.DEFINITION_FILE in request.files and request.files[bc.DEFINITION_FILE]: + f = request.files[bc.DEFINITION_FILE] + sidecar = Sidecar(files=f, name=secure_filename(f.filename)) + arguments[bc.DEFINITIONS] = sidecar.get_def_dict(arguments[bc.SCHEMA], extra_def_dicts=None) + + @staticmethod + def set_queries(arguments, request): + """ Update arguments with lists of string queries + + Parameters: + arguments (dict): A dictionary with the extracted parameters that are to be processed. + request (Request): A Request object containing form data. + """ + arguments[bc.QUERY_NAMES] = None + if bc.QUERY_INPUT in request.form and request.form[bc.QUERY_INPUT]: + arguments[bc.QUERIES] = [request.form[bc.QUERY_INPUT]] + else: + arguments[bc.QUERIES] = None + + @staticmethod + def set_schema_from_request(arguments, request): + """ Create a HedSchema object from form pull-down box. 
+
+        Parameters:
+            arguments (dict): Dictionary of parameters to which the schema will be added.
+            request (Request): A Request object containing form data.
+
+        Notes:
+            The loaded schema is stored in arguments under bc.SCHEMA (and bc.SCHEMA2 for a second schema).
+        """
+
+        if form_has_option(request.form, bc.SCHEMA_VERSION) and \
+                request.form[bc.SCHEMA_VERSION] != bc.OTHER_VERSION_OPTION:
+            arguments[bc.SCHEMA] = load_schema_version(request.form[bc.SCHEMA_VERSION])
+        elif form_has_option(request.form, bc.SCHEMA_VERSION) and form_has_file(request.files, bc.SCHEMA_PATH):
+            f = request.files[bc.SCHEMA_PATH]
+            arguments[bc.SCHEMA] = \
+                from_string(f.read(fc.BYTE_LIMIT).decode('utf-8'), schema_format=secure_filename(f.filename))
+        if form_has_option(request.form, bc.SCHEMA_UPLOAD_OPTIONS, bc.SCHEMA_FILE_OPTION) and \
+                form_has_file(request.files, bc.SCHEMA_FILE, fc.SCHEMA_EXTENSIONS):
+            arguments[bc.SCHEMA] = ProcessForm.get_schema(request.files[bc.SCHEMA_FILE])
+        elif form_has_option(request.form, bc.SCHEMA_UPLOAD_OPTIONS, bc.SCHEMA_URL_OPTION) and \
+                form_has_url(request.form, bc.SCHEMA_URL, fc.SCHEMA_EXTENSIONS):
+            arguments[bc.SCHEMA] = ProcessForm.get_schema(request.values[bc.SCHEMA_URL])
+        if form_has_option(request.form, bc.SECOND_SCHEMA_UPLOAD_OPTIONS, bc.SECOND_SCHEMA_FILE_OPTION) and \
+                form_has_file(request.files, bc.SECOND_SCHEMA_FILE, fc.SCHEMA_EXTENSIONS):
+            arguments[bc.SCHEMA2] = ProcessForm.get_schema(request.files[bc.SECOND_SCHEMA_FILE])
+        elif form_has_option(request.form, bc.SECOND_SCHEMA_UPLOAD_OPTIONS, bc.SECOND_SCHEMA_URL_OPTION) and \
+                form_has_url(request.form, bc.SECOND_SCHEMA_URL, fc.SCHEMA_EXTENSIONS):
+            arguments[bc.SCHEMA2] = ProcessForm.get_schema(request.values[bc.SECOND_SCHEMA_URL])
+
+    @staticmethod
+    def get_schema(schema_input=None, version=None, as_xml_string=None):
+        """ Return a HedSchema object from the given parameters.
+
+        Parameters:
+            schema_input (str or FileStorage or None): Input url or file.
+            version (str or None): A schema version string to load, e.g. "8.2.0" or "score_1.1.0".
+            as_xml_string (str or None): A schema in xml string format.
+
+        Returns:
+            HedSchema: Schema
+
+        :raises HedFileError:
+            - The schema can't be loaded for some reason.
+        """
+        if isinstance(schema_input, FileStorage):
+            name, extension = get_parsed_name(secure_filename(schema_input.filename))
+            hed_schema = hedschema.from_string(schema_input.read(fc.BYTE_LIMIT).decode('utf-8'),
+                                               schema_format=extension,
+                                               name=name)
+        elif isinstance(schema_input, str):
+            name, extension = get_parsed_name(schema_input, is_url=True)
+            hed_schema = hedschema.load_schema(schema_input, name=name)
+        elif isinstance(version, str):
+            return hedschema.load_schema_version(version)
+        elif isinstance(as_xml_string, str):
+            return hedschema.from_string(as_xml_string, schema_format=".xml")
+        else:
+            raise HedFileError("SCHEMA_NOT_FOUND", "Must provide a loadable schema", "")
+
+        return hed_schema
diff --git a/hedweb/templates/events.html b/hedweb/templates/events.html
index abf304de..59bd2c81 100644
--- a/hedweb/templates/events.html
+++ b/hedweb/templates/events.html
@@ -12,7 +12,7 @@