diff --git a/MANIFEST.in b/MANIFEST.in
deleted file mode 100644
index f131701..0000000
--- a/MANIFEST.in
+++ /dev/null
@@ -1,2 +0,0 @@
-include src/scicat_schemas/dataset.schema.json.jinja
-include src/scicat_schemas/origdatablock.schema.json.jinja
diff --git a/README.md b/README.md
index 3b44d0c..4d158ee 100644
--- a/README.md
+++ b/README.md
@@ -87,3 +87,49 @@ copier update
 `tox` controls virtual environment and commands for various purposes.
 Developers and CI actions can use the command.
 For example, `tox -e docs` builds documentation under `./html` directory and `tox -e py310` will run unit tests with python version `3.10`.
+
+## ADR
+(Architecture Decision Records)
+
+### ADR-001: Use ``dataclass`` instead of ``jinja`` or ``dict`` to create dataset/data-block instances.
+We need a dict-like template to create dataset/data-block instances via the SciCat APIs.
+#### Reason for not using ``dict``
+It used to be implemented with ``dict``, but there was no validation layer, so the instances could easily be broken without anyone noticing or any error surfacing in the upstream layers.
+#### Reason for not using ``jinja``
+
+A ``jinja`` template can handle somewhat more complicated logic within the template, e.g. ``for`` loops or ``if`` statements applied to the variables.
+However, the dataset/data-block instances are not complicated enough to need these features of ``jinja``.
+#### Reason for using ``dataclasses.dataclass``
+We first tried ``jinja``, but the dataset/data-block instances are simple enough that we replaced the ``jinja`` templates with ``dataclass``.
+``dataclass`` can verify the name and type of each field (when static type checks are used).
+It can easily be turned into a nested dictionary using the ``dataclasses.asdict`` function.
+
+#### Downside of using ``dataclass`` instead of ``jinja``
+With a ``jinja`` template, certain fields could be skipped based on a variable.
+However, this is not possible with a dataclass, so it needs extra handling after the instance is turned into a dictionary.
+For example, each datafile item can have a ``chk`` field, but this field should not exist if a checksum was not computed.
+With a ``jinja`` template we could handle this like below:
+```jinja
+{
+    "path": "{{ path }}",
+    "size": {{ size }},
+    "time": "{{ time }}",
+    {% if chk %}"chk": "{{ chk }}"{% endif %}
+}
+```
+However, with a dataclass it has to be handled like below:
+```python
+from dataclasses import dataclass, asdict
+@dataclass
+class DataFileItem:
+    path: str
+    size: int
+    time: str
+    chk: str | None = None
+
+data_file_item = {
+    k: v
+    for k, v in asdict(DataFileItem('./', 1, '00:00')).items()
+    if k != 'chk' or v is not None
+}
+```
diff --git a/pyproject.toml b/pyproject.toml
index 4d69d47..8780d37 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -32,7 +32,6 @@ dependencies = [
     "ess-streaming-data-types",
     "graypy",
     "h5py",
-    "jinja2",
     "kafka-python",
     "requests",
     "rich"
@@ -46,8 +45,8 @@ dynamic = ["version"]
 "Source" = "https://github.com/ScicatProject/scicat-filewriter-ingest"
 
 [project.scripts]
-scicat_ingestor = "scicat_ingestor:main"
-background_ingestor = "background_ingestor:main"
+scicat_ingestor = "scicat_online_ingestor:main"
+background_ingestor = "scicat_offline_ingestor:main"
 
 [project.entry-points."scicat_ingestor.metadata_extractor"]
 max = "numpy:max"
diff --git a/resources/base.imsc.json.example b/resources/base.imsc.json.example
index 3d3d00e..d6b443d 100644
--- a/resources/base.imsc.json.example
+++ b/resources/base.imsc.json.example
@@ -3,7 +3,7 @@ {
   "name" : "Generic metadata schema"
   "instrument" : "",
   "selector" : "filename:starts_with:/ess/data",
-  "variables" : { 
+  "variables" : {
     "pid": {
       "source": "NXS",
       "path": "/entry/entry_identifier_uuid",
diff --git a/resources/config.sample.json b/resources/config.sample.json
index 80bc88d..124882e 100644
--- a/resources/config.sample.json
+++ b/resources/config.sample.json
@@ -1,81 +1,71 @@
 {
+  "config_file": "config.json",
+  "id": "",
+  "dataset": {
+    "check_by_job_id": true,
+    "allow_dataset_pid": true,
+    "generate_dataset_pid": false,
+    "dataset_pid_prefix": "20.500.12269",
+    "default_instrument_id": "ID_OF_FALLBACK_INSTRUMENT",
+    "default_proposal_id": "DEFAULT_PROPOSAL_ID",
+    "default_owner_group": "DEFAULT_OWNER_GROUP",
+    "default_access_groups": [
+      "ACCESS_GROUP_1"
+    ]
+  },
+  "ingestion": {
+    "dry_run": false,
+    "offline_ingestor_executable" : "./scicat_offline_ingestor.py",
+    "schemas_directory": "schemas",
+    "file_handling": {
+      "compute_file_stats": true,
+      "compute_file_hash": true,
+      "file_hash_algorithm": "blake2b",
+      "save_file_hash": true,
+      "hash_file_extension": "b2b",
+      "ingestor_files_directory": "../ingestor",
+      "message_to_file": true,
+      "message_file_extension": "message.json"
+    }
+  },
   "kafka": {
-    "topics": ["KAFKA_TOPIC_1", "KAFKA_TOPIC_2"],
+    "topics": [
+      "KAFKA_TOPIC_1",
+      "KAFKA_TOPIC_2"
+    ],
     "group_id": "GROUP_ID",
-    "bootstrap_servers": ["localhost:9093"],
+    "bootstrap_servers": [
+      "localhost:9093"
+    ],
     "sasl_mechanism": "SCRAM-SHA-256",
     "sasl_username": "USERNAME",
     "sasl_password": "PASSWORD",
     "ssl_ca_location": "FULL_PATH_TO_CERTIFICATE_FILE",
     "individual_message_commit": true,
     "enable_auto_commit": true,
-    "auto_offset_reset": "earliest",
-    "message_saving_options": {
-      "message_to_file": true,
-      "message_file_extension": "message.json",
-      "message_output": "SOURCE_FOLDER"
-    }
-  },
-  "user_office": {
-    "host": "https://useroffice.host",
-    "username": "USERNAME",
-    "password": "PASSWORD",
-    "token": "JWT_TOKEN"
-  },
-  "scicat": {
-    "host": "https://scicat.host",
-    "username": "USERNAME",
-    "password": "PASSWORD",
-    "token": "JWT_TOKEN"
-  },
-  "graylog": {"host": "", "port": "", "facility": "scicat.ingestor"},
-  "dataset": {
-    "instrument_id": "ID_OF_FALLBACK_INSTRUMENT",
-    "instrument": "FALLBACK_INSTRUMENT_NAME",
-    "default_proposal_id": "DEFAULT_PROPOSAL_ID",
-    "ownable": {
-      "ownerGroup": "DEFAULT_OWNER_GROUP",
-      "accessGroups": 
["ACCESS_GROUP_1"] - } + "auto_offset_reset": "earliest" }, - "options": { - "config_file": "config.json", + "logging": { "verbose": false, "file_log": false, "file_log_base_name": "scicat_ingestor_log", "file_log_timestamp": false, "logging_level": "INFO", + "log_message_prefix": "SFI", "system_log": false, "system_log_facility": "mail", - "log_message_prefix": "SFI", - "check_by_job_id": true, - "pyscicat": null, - "graylog": false - }, - "ingestion_options": { - "dry_run": false, - "schemas_directory": "schemas", - "retrieve_instrument_from": "default", - "instrument_position_in_file_path": 3, - "file_handling_options": { - "hdf_structure_in_metadata": false, - "hdf_structure_to_file": true, - "hdf_structure_file_extension": ".hdf_structure.json", - "hdf_structure_output": "SOURCE_FOLDER", - "local_output_directory": "data", - "compute_file_stats": true, - "compute_file_hash": true, - "file_hash_algorithm": "blake2b", - "save_file_hash": true, - "hash_file_extension": "b2b", - "ingestor_files_directory": "ingestor" - }, - "dataset_options": { - "force_dataset_pid": true, - "dataset_pid_prefix": "20.500.12269", - "use_job_id_as_dataset_id": true, - "beautify_metadata_keys": false, - "metadata_levels_separator": " " - } + "graylog": false, + "graylog_host": "", + "graylog_port": "", + "graylog_facility": "scicat.ingestor" + }, + "scicat": { + "host": "https://scicat.host", + "token": "JWT_TOKEN", + "headers": {}, + "timeout": 0, + "stream": true, + "verify": false } } + diff --git a/src/background_ingestor.py b/src/background_ingestor.py deleted file mode 100644 index b3fd80f..0000000 --- a/src/background_ingestor.py +++ /dev/null @@ -1,227 +0,0 @@ -# SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) -# import scippnexus as snx -import json -import logging -import pathlib -from urllib.parse import urljoin - -import h5py -import requests -from scicat_configuration import ( - BackgroundIngestorConfig, - build_background_ingestor_arg_parser, - build_scicat_background_ingester_config, -) -from scicat_dataset import ( - build_single_data_file_desc, - convert_to_type, - save_and_build_single_hash_file_desc, -) -from scicat_logging import build_logger -from scicat_metadata import collect_schemas, select_applicable_schema -from system_helpers import exit_at_exceptions - - -def replace_variables_values(url: str, values: dict) -> str: - for key, value in values.items(): - url = url.replace("{" + key + "}", str(value)) - return url - - -def extract_variables_values( - variables: dict, h5file, config: BackgroundIngestorConfig -) -> dict: - values = {} - - # loop on all the variables defined - for variable in variables.keys(): - source = variables[variable]["source"] - value = "" - if source == "NXS": - # extract value from nexus file - # we need to address path entry/user_*/name - value = h5file[variables[variable]["path"]][...] - elif source == "SC": - # build url - url = replace_variables_values( - config[""]["scicat_url"] + variables[variable]["url"], values - ) - # retrieve value from SciCat - response = requests.get( - url, - headers={"token": config[""]["token"]}, - timeout=10, # TODO: decide timeout. Maybe from configuration? 
- ) - # extract value - value = response.json()[variables[variable]["field"]] - elif source == "VALUE": - # the value is the one indicated - # there might be some substitution needed - value = replace_variables_values(variables[variable]["value"], values) - if ( - "operator" in variables[variable].keys() - and variables[variable]["operator"] - ): - operator = variables[variable]["operator"] - if operator == "join_with_space": - value = ", ".join(value) - else: - raise Exception("Invalid variable source configuration") - - values[variable] = convert_to_type(value, variables[variable]["value_type"]) - - return values - - -def prepare_scicat_dataset(metadata_schema, values): - """Prepare scicat dataset as dictionary ready to be ``POST``ed.""" - schema: dict = metadata_schema["schema"] - dataset = {} - scientific_metadata = { - 'ingestor_metadata_schema_id': { - "value": metadata_schema["id"], - "unit": "", - "human_name": "Ingestor Metadata Schema Id", - "type": "string", - } - } - for field in schema.values(): - machine_name = field["machine_name"] - field_type = field["type"] - if field["field_type"] == "high_level": - dataset[machine_name] = convert_to_type( - replace_variables_values(field["value"], values), field_type - ) - elif field["field_type"] == "scientific_metadata": - scientific_metadata[machine_name] = { - "value": convert_to_type( - replace_variables_values(field["value"], values), field_type - ), - "unit": "", - "human_name": field["human_name"] - if field.get("human_name", None) - else machine_name, - "type": field_type, - } - else: - raise Exception("Metadata schema field type invalid") - - dataset["scientific_metadata"] = scientific_metadata - - return dataset - - -def create_scicat_dataset(dataset: str, config: dict, logger: logging.Logger) -> dict: - """ - Execute a POST request to scicat to create a dataset - """ - response = requests.request( - method="POST", - url=urljoin(config["scicat_url"], "datasets"), - json=dataset, - headers=config["scicat_headers"], - timeout=config["timeout_seconds"], - stream=False, - verify=True, - ) - - result = response.json() - if response.ok: - ... - else: - err = result.get("error", {}) - raise Exception(f"Error creating new dataset: {err}") - - logger.info("Dataset create successfully. Dataset pid: %s", result['pid']) - return result - - -def prepare_scicat_origdatablock(files_list, config): ... -def create_scicat_origdatablock( - scicat_dataset_pid, nexus_file=None, done_writing_message_file=None -): ... 
- - -def main() -> None: - """Main entry point of the app.""" - arg_parser = build_background_ingestor_arg_parser() - arg_namespace = arg_parser.parse_args() - config = build_scicat_background_ingester_config(arg_namespace) - ingestion_options = config.ingestion_options - file_handling_options = ingestion_options.file_handling_options - logger = build_logger(config) - - # Log the configuration as dictionary so that it is easier to read from the logs - logger.info( - 'Starting the Scicat background Ingestor with the following configuration:' - ) - logger.info(config.to_dict()) - - # Collect all metadata schema configurations - schemas = collect_schemas(ingestion_options.schema_directory) - - with exit_at_exceptions(logger, daemon=False): - logger.info( - "Nexus file to be ingested : %s", - (nexus_file_path := pathlib.Path(config.single_run_options.nexus_file)), - ) - logger.info( - "Done writing message file linked to nexus file : %s", - ( - done_writing_message_file := pathlib.Path( - config.single_run_options.done_writing_message_file - ) - ), - ) - - # open and read done writing message input file - logger.info(json.load(done_writing_message_file.open())) - - # open nexus file with h5py - with h5py.File(nexus_file_path) as h5file: - # load instrument metadata configuration - metadata_schema = select_applicable_schema(nexus_file_path, h5file, schemas) - - # define variables values - variables_values = extract_variables_values( - metadata_schema['variables'], h5file, config - ) - - # Collect data-file descriptions - data_file_list = [ - build_single_data_file_desc(nexus_file_path, file_handling_options), - build_single_data_file_desc( - done_writing_message_file, file_handling_options - ), - # TODO: Add nexus structure file - ] - # Create hash of all the files if needed - if file_handling_options.save_file_hash: - data_file_list += [ - save_and_build_single_hash_file_desc( - data_file_dict, file_handling_options - ) - for data_file_dict in data_file_list - ] - # Collect all data-files and hash-files descriptions - _ = [json.dumps(file_dict, indent=2) for file_dict in data_file_list] - - # create and populate scicat dataset entry - scicat_dataset = prepare_scicat_dataset(metadata_schema, variables_values) - - # create dataset in scicat - scicat_dataset = create_scicat_dataset(scicat_dataset, config) - scicat_dataset_pid = scicat_dataset["pid"] - - # create and populate scicat origdatablock entry - # with files and hashes previously computed - scicat_origdatablock = create_scicat_origdatablock( - scicat_dataset_pid, nexus_file_path, done_writing_message_file - ) - - # create origdatablock in scicat - scicat_origdatablock_id = create_scicat_origdatablock(scicat_origdatablock) - - # return successful code - return scicat_origdatablock_id diff --git a/src/scicat_communication.py b/src/scicat_communication.py new file mode 100644 index 0000000..d9216c0 --- /dev/null +++ b/src/scicat_communication.py @@ -0,0 +1,99 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) +import logging +from urllib.parse import urljoin + +import requests +from scicat_configuration import SciCatOptions + + +def retrieve_value_from_scicat( + *, + config: SciCatOptions, + variable_url: str, # It should be already rendered from variable_recipe["url"] + field_name: str, # variable_recipe["field"] +) -> str: + url = config.host.removesuffix('/') + variable_url + response: dict = requests.get( + url, headers={"token": config.token}, 
timeout=config.timeout + ).json() + return response[field_name] + + +class ScicatDatasetAPIError(Exception): + pass + + +def _post_to_scicat(*, url: str, posting_obj: dict, headers: dict, timeout: int): + return requests.request( + method="POST", + url=url, + json=posting_obj, + headers=headers, + timeout=timeout, + stream=False, + verify=True, + ) + + +def create_scicat_dataset( + *, dataset: dict, config: SciCatOptions, logger: logging.Logger +) -> dict: + """ + Execute a POST request to scicat to create a dataset + """ + logger.info("Sending POST request to create new dataset") + response = _post_to_scicat( + url=urljoin(config.host, "datasets"), + posting_obj=dataset, + headers={"token": config.token, **config.headers}, + timeout=config.timeout, + ) + result: dict = response.json() + if not response.ok: + logger.error( + "Failed to create new dataset. \nError message from scicat backend: \n%s", + result.get("error", {}), + ) + raise ScicatDatasetAPIError(f"Error creating new dataset: \n{dataset}") + + logger.info( + "Dataset created successfully. Dataset pid: %s", + result.get("pid"), + ) + return result + + +class ScicatOrigDatablockAPIError(Exception): + pass + + +def create_scicat_origdatablock( + *, origdatablock: dict, config: SciCatOptions, logger: logging.Logger +) -> dict: + """ + Execute a POST request to scicat to create a new origdatablock + """ + logger.info("Sending POST request to create new origdatablock") + response = _post_to_scicat( + url=urljoin(config.host, "origdatablocks"), + posting_obj=origdatablock, + headers={"token": config.token, **config.headers}, + timeout=config.timeout, + ) + result: dict = response.json() + if not response.ok: + logger.error( + "Failed to create new origdatablock. " + "Error message from scicat backend: \n%s", + result.get("error", {}), + ) + raise ScicatOrigDatablockAPIError( + f"Error creating new origdatablock: \n{origdatablock}" + ) + + logger.info( + "Origdatablock created successfully. Origdatablock pid: %s", + result['_id'], + ) + return result diff --git a/src/scicat_configuration.py b/src/scicat_configuration.py index 610cfef..6f8557e 100644 --- a/src/scicat_configuration.py +++ b/src/scicat_configuration.py @@ -2,7 +2,7 @@ # Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) import argparse from collections.abc import Mapping -from dataclasses import asdict, dataclass +from dataclasses import asdict, dataclass, field from types import MappingProxyType from typing import Any @@ -20,12 +20,21 @@ def _load_config(config_file: Any) -> dict: return {} -def _merge_run_options(config_dict: dict, input_args_dict: dict) -> dict: +def _merge_config_options( + config_dict: dict, input_args_dict: dict, keys: list[str] | None = None +) -> dict: """Merge configuration from the configuration file and input arguments.""" + if keys is None: + keys = config_dict.keys() + return { **config_dict.setdefault("options", {}), - **{key: value for key, value in input_args_dict.items() if value is not None}, + **{ + key: input_args_dict[key] + for key in keys + if input_args_dict[key] is not None + }, } @@ -52,7 +61,7 @@ def _recursive_deepcopy(obj: Any) -> dict: return copied -def build_main_arg_parser() -> argparse.ArgumentParser: +def build_online_arg_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser() group = parser.add_argument_group("Scicat Ingestor Options") @@ -67,6 +76,14 @@ def build_main_arg_parser() -> argparse.ArgumentParser: help="Configuration file name. 
Default: config.20240405.json", type=str, ) + group.add_argument( + "-d", + "--dry-run", + dest="dry_run", + help="Dry run. Does not produce any output file nor modify entry in SciCat", + action="store_true", + default=False, + ) group.add_argument( "-v", "--verbose", @@ -128,13 +145,6 @@ def build_main_arg_parser() -> argparse.ArgumentParser: action="store_true", default=True, ) - group.add_argument( - "--pyscicat", - dest="pyscicat", - help="Location where a specific version of pyscicat is available", - default=None, - type=str, - ) group.add_argument( "--graylog", dest="graylog", @@ -145,9 +155,9 @@ def build_main_arg_parser() -> argparse.ArgumentParser: return parser -def build_background_ingestor_arg_parser() -> argparse.ArgumentParser: - parser = build_main_arg_parser() - group = parser.add_argument_group('Scicat Background Ingestor Options') +def build_offline_ingestor_arg_parser() -> argparse.ArgumentParser: + parser = build_online_arg_parser() + group = parser.add_argument_group('Scicat Offline Ingestor Options') group.add_argument( '-f', @@ -178,51 +188,33 @@ def build_background_ingestor_arg_parser() -> argparse.ArgumentParser: @dataclass -class GraylogOptions: - host: str = "" - port: str = "" - facility: str = "scicat.ingestor" - - -@dataclass -class RunOptions: - """RunOptions dataclass to store the configuration options. +class LoggingOptions: + """ + LoggingOptions dataclass to store the configuration options. Most of options don't have default values because they are expected to be set by the user either in the configuration file or through command line arguments. """ - config_file: str verbose: bool file_log: bool file_log_base_name: str file_log_timestamp: bool - system_log: bool - log_message_prefix: str logging_level: str - check_by_job_id: bool + log_message_prefix: str + system_log: bool system_log_facility: str | None = None - pyscicat: str | None = None graylog: bool = False - - -@dataclass(frozen=True) -class MessageSavingOptions: - message_to_file: bool = True - """Save messages to a file.""" - message_file_extension: str = "message.json" - """Message file extension.""" - message_output: str = "SOURCE_FOLDER" - """Output directory for messages.""" - - -DEFAULT_MESSAGE_SAVING_OPTIONS = MessageSavingOptions() + graylog_host: str = "" + graylog_port: str = "" + graylog_facility: str = "scicat.ingestor" @dataclass -class kafkaOptions: - """KafkaOptions dataclass to store the configuration options. +class KafkaOptions: + """ + KafkaOptions dataclass to store the configuration options. Default values are provided as they are not expected to be set by command line arguments. 
@@ -248,118 +240,128 @@ class kafkaOptions: """Enable Kafka auto commit.""" auto_offset_reset: str = "earliest" """Kafka auto offset reset.""" - message_saving_options: MessageSavingOptions = DEFAULT_MESSAGE_SAVING_OPTIONS - """Message saving options.""" @classmethod - def from_configurations(cls, config: dict) -> "kafkaOptions": + def from_configurations(cls, config: dict) -> "KafkaOptions": """Create kafkaOptions from a dictionary.""" - return cls( - **{ - **config, - "message_saving_options": MessageSavingOptions( - **config.get("message_saving_options", {}) - ), - }, - ) + return cls(**config) @dataclass class FileHandlingOptions: - hdf_structure_in_metadata: bool = False # Not sure if needed - hdf_structure_to_file: bool = True # Not sure if needed - hdf_structure_file_extension: str = "hdf_structure.json" # Not sure if needed - hdf_structure_output: str = "SOURCE_FOLDER" # Not sure if needed - local_output_directory: str = "data" - compute_file_stats: bool = True - compute_file_hash: bool = True + compute_file_stats: bool = False + compute_file_hash: bool = False file_hash_algorithm: str = "blake2b" - save_file_hash: bool = True + save_file_hash: bool = False hash_file_extension: str = "b2b" - ingestor_files_directory: str = "ingestor" - - -@dataclass -class DatasetOptions: - force_dataset_pid: bool = True # Not sure if needed - dataset_pid_prefix: str = "20.500.12269" - use_job_id_as_dataset_id: bool = True - beautify_metadata_keys: bool = False - metadata_levels_separator: str = " " + ingestor_files_directory: str = "../ingestor" + message_to_file: bool = True + message_file_extension: str = "message.json" @dataclass class IngestionOptions: - file_handling_options: FileHandlingOptions - dataset_options: DatasetOptions - schema_directory: str = "schemas" - retrieve_instrument_from: str = "default" - instrument_position_in_file_path: int = 3 + file_handling: FileHandlingOptions + dry_run: bool = False + schemas_directory: str = "schemas" + offline_ingestor_executable: str = "./scicat_offline_ingestor.py" @classmethod def from_configurations(cls, config: dict) -> "IngestionOptions": """Create IngestionOptions from a dictionary.""" return cls( FileHandlingOptions(**config.get("file_handling_options", {})), - DatasetOptions(**config.get("dataset_options", {})), - schema_directory=config.get("schema_directory", "schemas"), - retrieve_instrument_from=config.get("retrieve_instrument_from", "default"), - instrument_position_in_file_path=config.get( - "instrument_position_in_file_path", 3 - ), + dry_run=config.get("dry_run", False), + schemas_directory=config.get("schemas_directory", "schemas"), ) @dataclass -class IngesterConfig: +class DatasetOptions: + check_by_job_id: bool = (True,) + allow_dataset_pid: bool = (True,) + generate_dataset_pid: bool = (False,) + dataset_pid_prefix: str = ("20.500.12269",) + default_instrument_id: str = ("",) + default_proposal_id: str = ("",) + default_owner_group: str = ("",) + default_access_groups: list[str] = field(default_factory=list) + + @classmethod + def from_configurations(cls, config: dict) -> "DatasetOptions": + """Create DatasetOptions from a dictionary.""" + return cls(**config) + + +@dataclass +class SciCatOptions: + host: str = "" + token: str = "" + headers: dict = field(default_factory=dict) + timeout: int = 0 + stream: bool = True + verify: bool = False + + @classmethod + def from_configurations(cls, config: dict) -> "SciCatOptions": + """Create SciCatOptions from a dictionary.""" + options = cls(**config) + options.headers = 
{"Authorization": f"Bearer {options.token}"} + return options + + +@dataclass +class OnlineIngestorConfig: original_dict: Mapping """Original configuration dictionary in the json file.""" - run_options: RunOptions - """Merged configuration dictionary with command line arguments.""" - kafka_options: kafkaOptions - """Kafka configuration options read from files.""" - graylog_options: GraylogOptions - """Graylog configuration options for streaming logs.""" - ingestion_options: IngestionOptions - """Ingestion configuration options for background ingestor.""" + dataset: DatasetOptions + kafka: KafkaOptions + logging: LoggingOptions + ingestion: IngestionOptions + scicat: SciCatOptions def to_dict(self) -> dict: """Return the configuration as a dictionary.""" return asdict( - IngesterConfig( + OnlineIngestorConfig( _recursive_deepcopy( self.original_dict ), # asdict does not support MappingProxyType - self.run_options, - self.kafka_options, - self.graylog_options, - self.ingestion_options, + self.dataset, + self.kafka, + self.logging, + self.ingestion, + self.scicat, ) ) -def build_scicat_ingester_config(input_args: argparse.Namespace) -> IngesterConfig: +def build_scicat_online_ingestor_config( + input_args: argparse.Namespace, +) -> OnlineIngestorConfig: """Merge configuration from the configuration file and input arguments.""" config_dict = _load_config(input_args.config_file) - run_option_dict = _merge_run_options(config_dict, vars(input_args)) + logging_dict = _merge_config_options( + config_dict.setdefault("logging", {}), vars(input_args) + ) + ingestion_dict = _merge_config_options( + config_dict.setdefault("ingestion", {}), vars(input_args), ["dry-run"] + ) # Wrap configuration in a dataclass - return IngesterConfig( + return OnlineIngestorConfig( original_dict=_freeze_dict_items(config_dict), - run_options=RunOptions(**run_option_dict), - kafka_options=kafkaOptions.from_configurations( - config_dict.setdefault("kafka", {}) - ), - graylog_options=GraylogOptions(**config_dict.setdefault("graylog", {})), - ingestion_options=IngestionOptions.from_configurations( - config_dict.setdefault("ingestion_options", {}) - ), + dataset=DatasetOptions(**config_dict.setdefault("dataset", {})), + ingestion=IngestionOptions.from_configurations(ingestion_dict), + kafka=KafkaOptions(**config_dict.setdefault("kafka", {})), + logging=LoggingOptions(**logging_dict), + scicat=SciCatOptions(**config_dict.setdefault("scicat", {})), ) @dataclass -class SingleRunOptions: +class OfflineRunOptions: nexus_file: str """Full path of the input nexus file to be ingested.""" done_writing_message_file: str @@ -367,47 +369,52 @@ class SingleRunOptions: @dataclass -class BackgroundIngestorConfig(IngesterConfig): - single_run_options: SingleRunOptions +class OfflineIngestorConfig(OnlineIngestorConfig): + offline_run: OfflineRunOptions """Single run configuration options for background ingestor.""" def to_dict(self) -> dict: """Return the configuration as a dictionary.""" return asdict( - BackgroundIngestorConfig( + OfflineIngestorConfig( _recursive_deepcopy( self.original_dict ), # asdict does not support MappingProxyType - self.run_options, - self.kafka_options, - self.graylog_options, - self.ingestion_options, - self.single_run_options, + self.dataset, + self.kafka, + self.logging, + self.ingestion, + self.scicat, + self.offline_run, ) ) -def build_scicat_background_ingester_config( +def build_scicat_offline_ingestor_config( input_args: argparse.Namespace, -) -> BackgroundIngestorConfig: +) -> OfflineIngestorConfig: 
"""Merge configuration from the configuration file and input arguments.""" config_dict = _load_config(input_args.config_file) input_args_dict = vars(input_args) - single_run_option_dict = { + logging_dict = _merge_config_options( + config_dict.setdefault("logging", {}), input_args_dict + ) + ingestion_dict = _merge_config_options( + config_dict.setdefault("ingestion", {}), input_args_dict, ["dry-run"] + ) + offline_run_option_dict = { "nexus_file": input_args_dict.pop("nexus_file"), "done_writing_message_file": input_args_dict.pop("done_writing_message_file"), } - run_option_dict = _merge_run_options(config_dict, input_args_dict) - ingestion_option_dict = config_dict.setdefault("ingestion_options", {}) - kafka_option_dict = config_dict.setdefault("kafka", {}) # Wrap configuration in a dataclass - return BackgroundIngestorConfig( + return OfflineIngestorConfig( original_dict=_freeze_dict_items(config_dict), - run_options=RunOptions(**run_option_dict), - kafka_options=kafkaOptions.from_configurations(kafka_option_dict), - single_run_options=SingleRunOptions(**single_run_option_dict), - graylog_options=GraylogOptions(**config_dict.setdefault("graylog", {})), - ingestion_options=IngestionOptions.from_configurations(ingestion_option_dict), + dataset=DatasetOptions(**config_dict.setdefault("dataset", {})), + ingestion=IngestionOptions.from_configurations(ingestion_dict), + kafka=KafkaOptions(**config_dict.setdefault("kafka", {})), + logging=LoggingOptions(**logging_dict), + scicat=SciCatOptions(**config_dict.setdefault("scicat", {})), + offline_run=OfflineRunOptions(**offline_run_option_dict), ) diff --git a/src/scicat_dataset.py b/src/scicat_dataset.py index 9800b40..ce15d27 100644 --- a/src/scicat_dataset.py +++ b/src/scicat_dataset.py @@ -1,15 +1,22 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) import datetime +import logging import pathlib +import uuid +from collections.abc import Callable, Iterable +from dataclasses import asdict, dataclass, field from types import MappingProxyType from typing import Any -from scicat_configuration import FileHandlingOptions -from scicat_schemas import ( - load_dataset_schema_template, - load_origdatablock_schema_template, - load_single_datafile_template, +import h5py +from scicat_communication import retrieve_value_from_scicat +from scicat_configuration import DatasetOptions, FileHandlingOptions, SciCatOptions +from scicat_metadata import ( + HIGH_LEVEL_METADATA_TYPE, + SCIENTIFIC_METADATA_TYPE, + VALID_METADATA_TYPES, + render_variable_value, ) @@ -44,6 +51,7 @@ def to_date(value: Any) -> str | None: "integer": to_integer, "float": to_float, "date": to_date, + # TODO: Add email converter } ) @@ -57,96 +65,107 @@ def convert_to_type(input_value: Any, dtype_desc: str) -> Any: return converter(input_value) -def build_dataset_instance( - *, - dataset_pid_prefix: str, - nxs_dataset_pid: str, - dataset_name: str, - dataset_description: str, - principal_investigator: str, - facility: str, - environment: str, - scientific_metadata: str, - owner: str, - owner_email: str, - source_folder: str, - contact_email: str, - iso_creation_time: str, - technique_pid: str, - technique_name: str, - instrument_id: str, - sample_id: str, - proposal_id: str, - owner_group: str, - access_groups: list[str], -) -> str: - return load_dataset_schema_template().render( - dataset_pid_prefix=dataset_pid_prefix, - nxs_dataset_pid=nxs_dataset_pid, - dataset_name=dataset_name, - 
dataset_description=dataset_description, - principal_investigator=principal_investigator, - facility=facility, - environment=environment, - scientific_metadata=scientific_metadata, - owner=owner, - owner_email=owner_email, - source_folder=source_folder, - contact_email=contact_email, - iso_creation_time=iso_creation_time, - technique_pid=technique_pid, - technique_name=technique_name, - instrument_id=instrument_id, - sample_id=sample_id, - proposal_id=proposal_id, - owner_group=owner_group, - access_groups=access_groups, - ) - - -def build_single_datafile_instance( - *, - file_absolute_path: str, - file_size: int, - datetime_isoformat: str, - uid: str, - gid: str, - perm: str, - checksum: str = "", -) -> str: - return load_single_datafile_template().render( - file_absolute_path=file_absolute_path, - file_size=file_size, - datetime_isoformat=datetime_isoformat, - checksum=checksum, - uid=uid, - gid=gid, - perm=perm, - ) +_OPERATOR_REGISTRY = MappingProxyType( + { + "DO_NOTHING": lambda value: value, + "join_with_space": lambda value: ", ".join(value), + } +) -def build_origdatablock_instance( - *, - dataset_pid_prefix: str, - nxs_dataset_pid: str, - dataset_size: int, - check_algorithm: str, - data_file_desc_list: list[str], -) -> str: - return load_origdatablock_schema_template().render( - dataset_pid_prefix=dataset_pid_prefix, - nxs_dataset_pid=nxs_dataset_pid, - dataset_size=dataset_size, - check_algorithm=check_algorithm, - data_file_desc_list=data_file_desc_list, - ) +def _get_operator(operator: str | None) -> Callable: + return _OPERATOR_REGISTRY.get(operator or "DO_NOTHING", lambda _: _) -def _calculate_checksum(file_path: pathlib.Path, algorithm_name: str) -> str: +def extract_variables_values( + variables: dict[str, dict], h5file: h5py.File, config: SciCatOptions +) -> dict: + variable_map = {} + for variable_name, variable_recipe in variables.items(): + if (source := variable_recipe["source"]) == "NXS": + value = h5file[variable_recipe["path"]][...] + elif source == "SC": + value = retrieve_value_from_scicat( + config=config, + variable_url=render_variable_value( + variable_recipe["url"], variable_map + ), + field_name=variable_recipe["field"], + ) + elif source == "VALUE": + value = _get_operator(variable_recipe.get("operator"))( + render_variable_value(variable_recipe["value"], variable_map) + ) + else: + raise Exception("Invalid variable source: ", source) + variable_map[variable_name] = convert_to_type( + value, variable_recipe["value_type"] + ) + return variable_map + + +@dataclass(kw_only=True) +class TechniqueDesc: + pid: str + "Technique PID" + name: str + "Technique Name" + + +@dataclass(kw_only=True) +class ScicatDataset: + pid: str | None + size: int + numberOfFiles: int + isPublished: bool = False + datasetName: str + description: str + principalInvestigator: str + creationLocation: str + scientificMetadata: dict + owner: str + ownerEmail: str + sourceFolder: str + contactEmail: str + creationTime: str + type: str = "raw" + sampleId: str + techniques: list[TechniqueDesc] = field(default_factory=list) + instrumentId: str | None = None + proposalId: str | None = None + ownerGroup: str | None = None + accessGroup: list[str] | None = None + + +@dataclass(kw_only=True) +class DataFileListItem: + path: str + "Absolute path to the file." + size: int | None = None + "Size of the single file in bytes." 
+ time: str + chk: str | None = None + uid: str | None = None + gid: str | None = None + perm: str | None = None + + +@dataclass(kw_only=True) +class OrigDataBlockInstance: + datasetId: str + size: int + chkAlg: str + dataFileList: list[DataFileListItem] + + +def _calculate_checksum(file_path: pathlib.Path, algorithm_name: str) -> str | None: """Calculate the checksum of a file.""" import hashlib - if not algorithm_name == "b2blake": + if not file_path.exists(): + return None + + if algorithm_name != "b2blake": raise ValueError( "Only b2blake hash algorithm is supported for now. Got: ", f"{algorithm_name}", @@ -161,72 +180,306 @@ def _calculate_checksum(file_path: pathlib.Path, algorithm_name: str) -> str: return chk.hexdigest() -def build_single_data_file_desc( - file_path: pathlib.Path, config: FileHandlingOptions -) -> dict[str, Any]: - """Build the description of a single data file.""" - import datetime - import json - - from scicat_schemas import load_single_datafile_template - - single_file_template = load_single_datafile_template() - - return json.loads( - single_file_template.render( - file_absolute_path=file_path.absolute(), - file_size=(file_stats := file_path.stat()).st_size, - datetime_isoformat=datetime.datetime.fromtimestamp( +def _create_single_data_file_list_item( + *, + file_path: pathlib.Path, + calculate_checksum: bool, + compute_file_stats: bool, + file_hash_algorithm: str = "", +) -> DataFileListItem: + """``DataFileListItem`` constructing helper.""" + + if file_path.exists() and compute_file_stats: + return DataFileListItem( + path=file_path.absolute().as_posix(), + size=(file_stats := file_path.stat()).st_size, + time=datetime.datetime.fromtimestamp( file_stats.st_ctime, tz=datetime.UTC ).strftime("%Y-%m-%dT%H:%M:%S.000Z"), - chk=_calculate_checksum(file_path, config.file_hash_algorithm), + chk=_calculate_checksum(file_path, file_hash_algorithm) + if calculate_checksum + else None, uid=str(file_stats.st_uid), gid=str(file_stats.st_gid), perm=oct(file_stats.st_mode), ) - ) + else: + return DataFileListItem( + path=file_path.absolute().as_posix(), + time=datetime.datetime.now(tz=datetime.UTC).strftime( + "%Y-%m-%dT%H:%M:%S.000Z" + ), + ) -def _build_hash_file_path( +def _build_hash_path( *, - original_file_path: str, - ingestor_files_directory: str, + original_file_instance: DataFileListItem, + dir_path: pathlib.Path, hash_file_extension: str, ) -> pathlib.Path: - """Build the path for the hash file.""" - original_path = pathlib.Path(original_file_path) - dir_path = pathlib.Path(ingestor_files_directory) - file_name = ".".join([original_path.name, hash_file_extension]) - return dir_path / pathlib.Path(file_name) + "Compose path to the hash file." 
+ file_stem = pathlib.Path(original_file_instance.path).stem + return dir_path / pathlib.Path(".".join([file_stem, hash_file_extension])) -def save_and_build_single_hash_file_desc( - original_file_desciption: dict, config: FileHandlingOptions -) -> dict: - """Save the hash of the file and build the description.""" - import datetime - import json - - from scicat_schemas import load_single_datafile_template - - single_file_template = load_single_datafile_template() - file_hash: str = original_file_desciption["chk"] - hash_path = _build_hash_file_path( - original_file_path=original_file_desciption["path"], - ingestor_files_directory=config.ingestor_files_directory, - hash_file_extension=config.hash_file_extension, +def _save_hash_file( + *, + original_file_instance: DataFileListItem, + hash_path: pathlib.Path, +) -> None: + """Save the hash of the ``original_file_instance``.""" + if original_file_instance.chk is None: + raise ValueError("Checksum is not provided.") + + hash_path.write_text(original_file_instance.chk) + + +def create_data_file_list( + *, + nexus_file: pathlib.Path, + done_writing_message_file: pathlib.Path | None = None, + nexus_structure_file: pathlib.Path | None = None, + ingestor_directory: pathlib.Path, + config: FileHandlingOptions, + logger: logging.Logger, +) -> list[DataFileListItem]: + """ + Create a list of ``DataFileListItem`` instances for the files provided. + + Params + ------ + nexus_file: + Path to the NeXus file. + done_writing_message_file: + Path to the "done writing" message file. + nexus_structure_file: + Path to the NeXus structure file. + ingestor_directory: + Path to the directory where the files will be saved. + config: + Configuration related to the file handling. + logger: + Logger instance. + + """ + from functools import partial + + single_file_constructor = partial( + _create_single_data_file_list_item, + file_hash_algorithm=config.file_hash_algorithm, + compute_file_stats=config.compute_file_stats, ) - hash_path.write_text(file_hash) - return json.loads( - single_file_template.render( - file_absolute_path=hash_path.absolute(), - file_size=(file_stats := hash_path.stat()).st_size, - datetime_isoformat=datetime.datetime.fromtimestamp( - file_stats.st_ctime, tz=datetime.UTC - ).strftime("%Y-%m-%dT%H:%M:%S.000Z"), - uid=str(file_stats.st_uid), - gid=str(file_stats.st_gid), - perm=oct(file_stats.st_mode), + # Collect the files that will be ingested + file_list = [nexus_file] + if done_writing_message_file is not None: + file_list.append(done_writing_message_file) + if nexus_structure_file is not None: + file_list.append(nexus_structure_file) + + # Create the list of the files + data_file_list = [] + for minimum_file_path in file_list: + logger.info("Adding file %s to the datafiles list", minimum_file_path) + new_file_item = single_file_constructor( + file_path=minimum_file_path, + calculate_checksum=config.compute_file_hash, ) + data_file_list.append(new_file_item) + if config.save_file_hash: + logger.info( + "Computing hash of the file(%s) from disk...", minimum_file_path + ) + hash_file_path = _build_hash_path( + original_file_instance=new_file_item, + dir_path=ingestor_directory, + hash_file_extension=config.hash_file_extension, + ) + logger.info("Saving hash into a file ... 
%s", hash_file_path) + if new_file_item.chk is not None: + _save_hash_file( + original_file_instance=new_file_item, hash_path=hash_file_path + ) + data_file_list.append( + single_file_constructor( + file_path=hash_file_path, calculate_checksum=False + ) + ) + else: + logger.warning( + "File instance of (%s) does not have checksum. " + "Probably the file does not exist. " + "Skip saving...", + minimum_file_path, + ) + + return data_file_list + + +def _filter_by_field_type(schemas: Iterable[dict], field_type: str) -> list[dict]: + return [field for field in schemas if field["field_type"] == field_type] + + +def _render_variable_as_type(value: str, variable_map: dict, dtype: str) -> Any: + return convert_to_type(render_variable_value(value, variable_map), dtype) + + +def _create_scientific_metadata( + *, + metadata_schema_id: str, + sm_schemas: list[dict], + variable_map: dict, +) -> dict: + """Create scientific metadata from the metadata schema configuration. + + Params + ------ + metadata_schema_id: + The ID of the metadata schema configuration. + sm_schemas: + The scientific metadata schema configuration. + variable_map: + The variable map to render the scientific metadata values. + + """ + return { + # Default field + "ingestor_metadata_schema_id": { + "value": metadata_schema_id, + "unit": "", + "human_name": "Ingestor metadata schema ID", + "type": "string", + }, + **{ + field["machine_name"]: { + "value": _render_variable_as_type( + field["value"], variable_map, field["type"] + ), + "unit": field.get("unit", ""), + "human_name": field.get("human_name", field["machine_name"]), + "type": field["type"], + } + for field in sm_schemas + }, + } + + +def _validate_metadata_schemas( + metadata_schemas: dict[str, dict], +) -> None: + if any( + invalid_types := [ + field["field_type"] + for field in metadata_schemas.values() + if field["field_type"] not in VALID_METADATA_TYPES + ] + ): + raise ValueError( + "Invalid metadata schema types found. Valid types are: ", + VALID_METADATA_TYPES, + "Got: ", + invalid_types, + ) + + +def create_scicat_dataset_instance( + *, + metadata_schema_id: str, # metadata-schema["id"] + metadata_schemas: dict[str, dict], # metadata-schema["schema"] + variable_map: dict, + data_file_list: list[DataFileListItem], + config: DatasetOptions, + logger: logging.Logger, +) -> ScicatDataset: + """ + Prepare the ``ScicatDataset`` instance. + + Params + ------ + metadata_schema: + Metadata schema. + variables_values: + Variables values. + data_file_list: + List of the data files. + config: + Configuration related to scicat dataset instance. + logger: + Logger instance. 
+ + """ + _validate_metadata_schemas(metadata_schemas) + # Create the dataset instance + scicat_dataset = ScicatDataset( + size=sum([file.size for file in data_file_list if file.size is not None]), + numberOfFiles=len(data_file_list), + isPublished=False, + scientificMetadata=_create_scientific_metadata( + metadata_schema_id=metadata_schema_id, + sm_schemas=_filter_by_field_type( + metadata_schemas.values(), SCIENTIFIC_METADATA_TYPE + ), # Scientific metadata schemas + variable_map=variable_map, + ), + **{ + field["machine_name"]: _render_variable_as_type( + field["value"], variable_map, field["type"] + ) + for field in _filter_by_field_type( + metadata_schemas.values(), HIGH_LEVEL_METADATA_TYPE + ) + # High level schemas + }, ) + + # Auto generate or assign default values if needed + if not config.allow_dataset_pid: + logger.info("PID is not allowed in the dataset by configuration.") + scicat_dataset.pid = None + elif config.generate_dataset_pid: + logger.info("Auto generating PID for the dataset based on the configuration.") + scicat_dataset.pid = uuid.uuid4().hex + if scicat_dataset.instrumentId is None: + scicat_dataset.instrumentId = config.default_instrument_id + logger.info( + "Instrument ID is not provided. Setting to default value. %s", + scicat_dataset.instrumentId, + ) + if scicat_dataset.proposalId is None: + scicat_dataset.proposalId = config.default_proposal_id + logger.info( + "Proposal ID is not provided. Setting to default value. %s", + scicat_dataset.proposalId, + ) + if scicat_dataset.ownerGroup is None: + scicat_dataset.ownerGroup = config.default_owner_group + logger.info( + "Owner group is not provided. Setting to default value. %s", + scicat_dataset.ownerGroup, + ) + if scicat_dataset.accessGroup is None: + scicat_dataset.accessGroup = config.default_access_groups + logger.info( + "Access group is not provided. Setting to default value. %s", + scicat_dataset.accessGroup, + ) + + logger.info("Dataset instance is created successfully. %s", scicat_dataset) + return scicat_dataset + + +def scicat_dataset_to_dict(dataset: ScicatDataset) -> dict: + """ + Convert the ``dataset`` to a dictionary. + + It removes the ``None`` values from the dictionary. + You can add more handlings for specific fields here if needed. + + Params + ------ + dataset: + Scicat dataset instance. 
+ + """ + return {k: v for k, v in asdict(dataset).items() if v is not None} diff --git a/src/scicat_ingestor.py b/src/scicat_ingestor.py deleted file mode 100644 index b42fdb2..0000000 --- a/src/scicat_ingestor.py +++ /dev/null @@ -1,115 +0,0 @@ -# SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 Scicatproject contributors (https://github.com/ScicatProject) -# ruff: noqa: E402, F401 - -import importlib.metadata -import logging -import pathlib - -try: - __version__ = importlib.metadata.version(__package__ or __name__) -except importlib.metadata.PackageNotFoundError: - __version__ = "0.0.0" - -del importlib - -from scicat_configuration import ( - MessageSavingOptions, - build_main_arg_parser, - build_scicat_ingester_config, -) -from scicat_kafka import ( - WritingFinished, - build_consumer, - compose_message_path, - save_message_to_file, - wrdn_messages, -) -from scicat_logging import build_logger -from scicat_path_helpers import select_target_directory -from system_helpers import exit_at_exceptions - - -def dump_message_to_file_if_needed( - *, - logger: logging.Logger, - message_file_path: pathlib.Path, - message_saving_options: MessageSavingOptions, - message: WritingFinished, -) -> None: - """Dump the message to a file according to the configuration.""" - if not message_saving_options.message_to_file: - logger.info("Message saving to file is disabled. Skipping saving message.") - return - elif not message_file_path.parent.exists(): - logger.info("Message file directory not accessible. Skipping saving message.") - return - - logger.info("Message will be saved in %s", message_file_path) - save_message_to_file( - message=message, - message_file_path=message_file_path, - ) - logger.info("Message file saved") - - -def main() -> None: - """Main entry point of the app.""" - arg_parser = build_main_arg_parser() - arg_namespace = arg_parser.parse_args() - config = build_scicat_ingester_config(arg_namespace) - logger = build_logger(config) - - # Log the configuration as dictionary so that it is easier to read from the logs - logger.info('Starting the Scicat online Ingestor with the following configuration:') - logger.info(config.to_dict()) - - # Often used options - message_saving_options = config.kafka_options.message_saving_options - - with exit_at_exceptions(logger): - # Kafka consumer - if (consumer := build_consumer(config.kafka_options, logger)) is None: - raise RuntimeError("Failed to build the Kafka consumer") - - # Receive messages - for message in wrdn_messages(consumer, logger): - logger.info("Processing message: %s", message) - - # Check if we have received a WRDN message. - # ``message: None | WritingFinished`` - if message: - # Extract nexus file path from the message. 
- nexus_file_path = pathlib.Path(message.file_name) - file_saving_dir = select_target_directory( - config.ingestion_options.file_handling_options, nexus_file_path - ) - dump_message_to_file_if_needed( - logger=logger, - message_saving_options=message_saving_options, - message=message, - message_file_path=compose_message_path( - target_dir=file_saving_dir, - nexus_file_path=nexus_file_path, - message_saving_options=message_saving_options, - ), - ) - # instantiate a new process and runs background ingestor - # on the nexus file - # use open process and wait for outcome - """ - background_ingestor - -c configuration_file - -f nexus_filename - -j job_id - -m message_file_path # optional depending on the - # message_saving_options.message_output - """ - - # if background process is successful - # check if we need to commit the individual message - """ - if config.kafka_options.individual_message_commit \ - and background_process is successful: - consumer.commit(message=message) - """ diff --git a/src/scicat_kafka.py b/src/scicat_kafka.py index 8ed99e5..8c47a8a 100644 --- a/src/scicat_kafka.py +++ b/src/scicat_kafka.py @@ -5,7 +5,7 @@ from collections.abc import Generator from confluent_kafka import Consumer -from scicat_configuration import MessageSavingOptions, kafkaOptions +from scicat_configuration import KafkaOptions from streaming_data_types import deserialise_wrdn from streaming_data_types.finished_writing_wrdn import ( FILE_IDENTIFIER as WRDN_FILE_IDENTIFIER, @@ -13,7 +13,7 @@ from streaming_data_types.finished_writing_wrdn import WritingFinished -def collect_consumer_options(options: kafkaOptions) -> dict: +def collect_consumer_options(options: KafkaOptions) -> dict: """Build a Kafka consumer and configure it according to the ``options``.""" from dataclasses import asdict @@ -35,7 +35,7 @@ def collect_consumer_options(options: kafkaOptions) -> dict: return config_dict -def collect_kafka_topics(options: kafkaOptions) -> list[str]: +def collect_kafka_topics(options: KafkaOptions) -> list[str]: """Return the Kafka topics as a list.""" if isinstance(options.topics, str): return options.topics.split(",") @@ -45,7 +45,7 @@ def collect_kafka_topics(options: kafkaOptions) -> list[str]: raise TypeError("The topics must be a list or a comma-separated string.") -def build_consumer(kafka_options: kafkaOptions, logger: logging.Logger) -> Consumer: +def build_consumer(kafka_options: KafkaOptions, logger: logging.Logger) -> Consumer: """Build a Kafka consumer and configure it according to the ``options``.""" consumer_options = collect_consumer_options(kafka_options) logger.info("Connecting to Kafka with the following parameters:") @@ -136,24 +136,24 @@ def wrdn_messages( yield None -def compose_message_path( - *, - target_dir: pathlib.Path, - nexus_file_path: pathlib.Path, - message_saving_options: MessageSavingOptions, -) -> pathlib.Path: - """Compose the message path based on the nexus file path and configuration.""" - - return target_dir / ( - pathlib.Path( - ".".join( - ( - nexus_file_path.stem, - message_saving_options.message_file_extension.removeprefix("."), - ) - ) - ) - ) +# def compose_message_path( +# *, +# target_dir: pathlib.Path, +# nexus_file_path: pathlib.Path, +# message_saving_options: MessageSavingOptions, +# ) -> pathlib.Path: +# """Compose the message path based on the nexus file path and configuration.""" +# +# return target_dir / ( +# pathlib.Path( +# ".".join( +# ( +# nexus_file_path.stem, +# message_saving_options.message_file_extension.removeprefix("."), +# ) +# ) +# ) +# ) 
def save_message_to_file( diff --git a/src/scicat_logging.py b/src/scicat_logging.py index e25453a..d9ff480 100644 --- a/src/scicat_logging.py +++ b/src/scicat_logging.py @@ -5,19 +5,21 @@ import logging.handlers import graypy -from scicat_configuration import IngesterConfig +from scicat_configuration import OfflineIngestorConfig, OnlineIngestorConfig -def build_logger(config: IngesterConfig) -> logging.Logger: +def build_logger( + config: OnlineIngestorConfig | OfflineIngestorConfig, +) -> logging.Logger: """Build a logger and configure it according to the ``config``.""" - run_options = config.run_options + logging_options = config.logging # Build logger and formatter logger = logging.getLogger('esd extract parameters') formatter = logging.Formatter( " - ".join( ( - run_options.log_message_prefix, + logging_options.log_message_prefix, '%(asctime)s', '%(name)s', '%(levelname)s', @@ -27,9 +29,9 @@ def build_logger(config: IngesterConfig) -> logging.Logger: ) # Add FileHandler - if run_options.file_log: - file_name_components = [run_options.file_log_base_name] - if run_options.file_log_timestamp: + if logging_options.file_log: + file_name_components = [logging_options.file_log_base_name] + if logging_options.file_log_timestamp: file_name_components.append( datetime.datetime.now(datetime.UTC).strftime('%Y%m%d%H%M%S%f') ) @@ -40,30 +42,29 @@ def build_logger(config: IngesterConfig) -> logging.Logger: logger.addHandler(file_handler) # Add SysLogHandler - if run_options.system_log: + if logging_options.system_log: logger.addHandler(logging.handlers.SysLogHandler(address='/dev/log')) # Add graylog handler - if run_options.graylog: - graylog_config = config.graylog_options + if logging_options.graylog: graylog_handler = graypy.GELFTCPHandler( - graylog_config.host, - int(graylog_config.port), - facility=graylog_config.facility, + logging_options.graylog_host, + int(logging_options.graylog_port), + facility=logging_options.graylog_facility, ) logger.addHandler(graylog_handler) # Set the level and formatter for all handlers - logger.setLevel(run_options.logging_level) + logger.setLevel(logging_options.logging_level) for handler in logger.handlers: - handler.setLevel(run_options.logging_level) + handler.setLevel(logging_options.logging_level) handler.setFormatter(formatter) # Add StreamHandler # streamer handler is added last since it is using different formatter - if run_options.verbose: + if logging_options.verbose: from rich.logging import RichHandler - logger.addHandler(RichHandler(level=run_options.logging_level)) + logger.addHandler(RichHandler(level=logging_options.logging_level)) return logger diff --git a/src/scicat_metadata.py b/src/scicat_metadata.py index 3367588..572d50a 100644 --- a/src/scicat_metadata.py +++ b/src/scicat_metadata.py @@ -5,6 +5,10 @@ from collections.abc import Callable from importlib.metadata import entry_points +SCIENTIFIC_METADATA_TYPE = "scientific_metadata" +HIGH_LEVEL_METADATA_TYPE = "high_level" +VALID_METADATA_TYPES = (SCIENTIFIC_METADATA_TYPE, HIGH_LEVEL_METADATA_TYPE) + def load_metadata_extractors(extractor_name: str) -> Callable: """Load metadata extractors from the entry points.""" @@ -77,3 +81,13 @@ def select_applicable_schema(nexus_file, nxs, schemas): return schema raise Exception("No applicable metadata schema configuration found!!") + + +def render_variable_value(var_value: str, variable_registry: dict) -> str: + for var_name, var_value in variable_registry.items(): + var_value = var_value.replace("<" + var_name + ">", str(var_value)) + + if "<" in 
var_value and ">" in var_value: + raise Exception(f"Unresolved variable: {var_value}") + + return var_value diff --git a/src/scicat_offline_ingestor.py b/src/scicat_offline_ingestor.py new file mode 100644 index 0000000..87e6b29 --- /dev/null +++ b/src/scicat_offline_ingestor.py @@ -0,0 +1,163 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) +# import scippnexus as snx +import copy +import json +import os +import pathlib + +import h5py +from scicat_communication import create_scicat_dataset, create_scicat_origdatablock +from scicat_configuration import ( + build_offline_ingestor_arg_parser, + build_scicat_offline_ingestor_config, +) +from scicat_dataset import ( + create_data_file_list, + create_scicat_dataset_instance, + extract_variables_values, + scicat_dataset_to_dict, +) +from scicat_logging import build_logger +from scicat_metadata import collect_schemas, select_applicable_schema +from scicat_path_helpers import compose_ingestor_directory +from system_helpers import exit, offline_ingestor_exit_at_exceptions + + +def _prepare_scicat_origdatablock(scicat_dataset, datafilelist, config, logger): + """ + Create local copy of the orig datablock to send to scicat + """ + logger.info( + "_prepare_scicat_origdatablock: Preparing scicat origdatablock structure" + ) + origdatablock = { + "ownerGroup": scicat_dataset["ownerGroup"], + "accessGroups": scicat_dataset["accessGroups"], + "size": sum([item["size"] for item in datafilelist]), + "chkAlg": config.ingestion.file_hash_algorithm, + "dataFileList": datafilelist, + "datasetId": scicat_dataset["pid"], + } + + logger.info( + "_prepare_scicat_origdatablock: Scicat origdatablock: %s", + json.dumps(origdatablock), + ) + return origdatablock + + +def _define_dataset_source_folder(datafilelist) -> pathlib.Path: + """ + Return the dataset source folder, which is the common path + between all the data files associated with the dataset + """ + return pathlib.Path(os.path.commonpath([item["path"] for item in datafilelist])) + + +def _path_to_relative( + datafilelist_item: dict, dataset_source_folder: pathlib.Path +) -> dict: + """ + Copy the datafiles item and transform the path to the relative path + to the dataset source folder + """ + origdatablock_datafilelist_item = copy.deepcopy(datafilelist_item) + origdatablock_datafilelist_item["path"] = str( + datafilelist_item["path"].to_relative(dataset_source_folder) + ) + return origdatablock_datafilelist_item + + +def _prepare_origdatablock_datafilelist( + datafiles_list: list, dataset_source_folder: pathlib.Path +) -> list: + """ + Prepare the datafiles list for the origdatablock entry in scicat + That means that the file paths needs to be relative to the dataset source folder + """ + return [_path_to_relative(item, dataset_source_folder) for item in datafiles_list] + + +def main() -> None: + """Main entry point of the app.""" + arg_parser = build_offline_ingestor_arg_parser() + arg_namespace = arg_parser.parse_args() + config = build_scicat_offline_ingestor_config(arg_namespace) + ingestion_options = config.ingestion + fh_options = ingestion_options.file_handling + logger = build_logger(config) + + # Log the configuration as dictionary so that it is easier to read from the logs + logger.info( + 'Starting the Scicat background Ingestor with the following configuration: %s', + config.to_dict(), + ) + + # Collect all metadata schema configurations + schemas = collect_schemas(ingestion_options.schemas_directory) + + with 
+        nexus_file_path = pathlib.Path(config.offline_run.nexus_file)
+        logger.info(
+            "Nexus file to be ingested : %s",
+            nexus_file_path,
+        )
+
+        # define the directory where the ingestor saves any files it creates
+        ingestor_directory = compose_ingestor_directory(fh_options, nexus_file_path)
+
+        # open nexus file with h5py
+        with h5py.File(nexus_file_path) as h5file:
+            # load instrument metadata configuration
+            metadata_schema = select_applicable_schema(nexus_file_path, h5file, schemas)
+
+            # define variables values
+            variable_map = extract_variables_values(
+                metadata_schema['variables'], h5file, config.scicat
+            )
+
+            # Collect data-file descriptions
+            data_file_list = create_data_file_list(
+                nexus_file=nexus_file_path,
+                ingestor_directory=ingestor_directory,
+                config=fh_options,
+                logger=logger,
+                # TODO: add done_writing_message_file and nexus_structure_file
+            )
+
+            # Create scicat dataset instance (entry)
+            local_dataset = scicat_dataset_to_dict(
+                create_scicat_dataset_instance(
+                    metadata_schema_id=metadata_schema["id"],
+                    metadata_schemas=metadata_schema["schemas"],
+                    variable_map=variable_map,
+                    data_file_list=data_file_list,
+                    config=config.dataset,
+                    logger=logger,
+                )
+            )
+            # create dataset in scicat
+            scicat_dataset = create_scicat_dataset(
+                dataset=local_dataset, config=config.scicat, logger=logger
+            )
+
+            dataset_source_folder = _define_dataset_source_folder(data_file_list)
+
+            origdatablock_datafiles_list = _prepare_origdatablock_datafilelist(
+                data_file_list, dataset_source_folder
+            )
+            # create and populate scicat origdatablock entry
+            # with files and hashes previously computed
+            local_origdatablock = _prepare_scicat_origdatablock(
+                scicat_dataset, origdatablock_datafiles_list, config, logger
+            )
+
+            # create origdatablock in scicat
+            scicat_origdatablock = create_scicat_origdatablock(
+                origdatablock=local_origdatablock, config=config.scicat, logger=logger
+            )
+
+            # check one more time if we successfully created the entries in scicat
+            exit(
+                logger,
+                unexpected=not (bool(scicat_dataset) and bool(scicat_origdatablock)),
+            )
diff --git a/src/scicat_online_ingestor.py b/src/scicat_online_ingestor.py
new file mode 100644
index 0000000..eb04123
--- /dev/null
+++ b/src/scicat_online_ingestor.py
@@ -0,0 +1,153 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2024 Scicatproject contributors (https://github.com/ScicatProject)
+# ruff: noqa: E402, F401
+
+import importlib.metadata
+import logging
+import pathlib
+import subprocess
+
+try:
+    __version__ = importlib.metadata.version(__package__ or __name__)
+except importlib.metadata.PackageNotFoundError:
+    __version__ = "0.0.0"
+
+del importlib
+
+from scicat_configuration import (
+    FileHandlingOptions,
+    build_online_arg_parser,
+    build_scicat_online_ingestor_config,
+)
+from scicat_kafka import (
+    WritingFinished,
+    build_consumer,
+    save_message_to_file,
+    wrdn_messages,
+)
+from scicat_logging import build_logger
+from scicat_path_helpers import (
+    compose_ingestor_directory,
+    compose_ingestor_output_file_path,
+)
+from system_helpers import online_ingestor_exit_at_exceptions
+
+
+def dump_message_to_file_if_needed(
+    *,
+    logger: logging.Logger,
+    message_file_path: pathlib.Path,
+    file_handling_options: FileHandlingOptions,
+    message: WritingFinished,
+) -> None:
+    """Dump the message to a file according to the configuration."""
+    if not file_handling_options.message_to_file:
+        logger.info("Message saving to file is disabled. Skipping saving message.")
+        return
+    elif not message_file_path.parent.exists():
+        logger.info("Message file directory not accessible. Skipping saving message.")
+        return
+
+    logger.info("Message will be saved in %s", message_file_path)
+    save_message_to_file(
+        message=message,
+        message_file_path=message_file_path,
+    )
+    logger.info("Message file saved")
+
+
+def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logger):
+    logger.info("%s offline ingestors running", len(offline_ingestors))
+    # Iterate over a copy of the items so that finished jobs can be removed
+    # from the dictionary while looping.
+    for job_id, job_item in list(offline_ingestors.items()):
+        result = job_item["proc"].poll()
+        if result is not None:
+            logger.info(
+                "Offline ingestor for job id %s ended with result %s", job_id, result
+            )
+            if result == 0:
+                logger.info("Executing commit for message with job id %s", job_id)
+                consumer.commit(message=job_item["message"])
+            logger.info(
+                "Removed ingestor for message with job id %s from queue", job_id
+            )
+            offline_ingestors.pop(job_id)
+
+
+def main() -> None:
+    """Main entry point of the app."""
+    arg_parser = build_online_arg_parser()
+    arg_namespace = arg_parser.parse_args()
+    config = build_scicat_online_ingestor_config(arg_namespace)
+    logger = build_logger(config)
+
+    # Log the configuration as a dictionary so that it is easier to read from the logs
+    logger.info('Starting the Scicat online Ingestor with the following configuration:')
+    logger.info(config.to_dict())
+
+    with online_ingestor_exit_at_exceptions(logger):
+        # Kafka consumer
+        if (consumer := build_consumer(config.kafka, logger)) is None:
+            raise RuntimeError("Failed to build the Kafka consumer")
+
+        # dictionary of the currently running offline ingestor processes, keyed by job id
+        offline_ingestors: dict = {}
+
+        # Receive messages
+        for message in wrdn_messages(consumer, logger):
+            logger.info("Processing message: %s", message)
+
+            # Check if we have received a WRDN message.
+            # ``message: None | WritingFinished``
+            if message:
+                # extract job id
+                job_id = message.job_id
+                # Extract the nexus file path from the message.
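+                # The same path is used below to derive the ingestor working
+                # directory and, when message saving is enabled, the location of
+                # the dumped WRDN message.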
+ nexus_file_path = pathlib.Path(message.file_name) + ingestor_directory = compose_ingestor_directory( + config.ingestion.file_handling, nexus_file_path + ) + done_writing_message_file_path = compose_ingestor_output_file_path( + ingestor_directory=ingestor_directory, + file_name=nexus_file_path.stem, + file_extension=config.ingestion.file_handling.message_file_extension, + ) + dump_message_to_file_if_needed( + logger=logger, + file_handling_options=config.ingestion.file_handling, + message=message, + message_file_path=done_writing_message_file_path, + ) + + # instantiate a new process and runs background ingestor + # on the nexus file + # use open process and wait for outcome + """ + background_ingestor + -c configuration_file + -f nexus_filename + -j job_id + -m message_file_path # optional depending on the + # message_saving_options.message_output + """ + cmd = [ + config.ingestion.offline_ingestor_executable, + "-c", + arg_namespace.config_file, + "-f", + nexus_file_path, + "-j", + job_id, + ] + if config.ingestion.file_handling.message_to_file: + cmd += ["-m", done_writing_message_file_path] + proc = subprocess.Popen(cmd) # noqa: S603 + # save info about the background process + offline_ingestors[job_id] = { + "proc": proc, + "message": message, + } + + # if background process is successful + # check if we need to commit the individual message + if config.kafka.individual_message_commit: + _individual_message_commit(offline_ingestors, consumer, logger) diff --git a/src/scicat_path_helpers.py b/src/scicat_path_helpers.py index 5a164ca..df52782 100644 --- a/src/scicat_path_helpers.py +++ b/src/scicat_path_helpers.py @@ -5,20 +5,37 @@ from scicat_configuration import FileHandlingOptions -def select_target_directory( - fh_options: FileHandlingOptions, file_path: pathlib.Path +def compose_ingestor_directory( + fh_options: FileHandlingOptions, nexus_file_path: str | pathlib.Path ) -> pathlib.Path: - """Select the target directory based on the file path and the options.""" - if fh_options.hdf_structure_output == "SOURCE_FOLDER": - return file_path.parent / pathlib.Path(fh_options.ingestor_files_directory) + """Select the ingestor directory based on the file path and the options.""" + directory = pathlib.Path(fh_options.ingestor_files_directory) + nexus_file_path = ( + pathlib.Path(nexus_file_path) + if isinstance(nexus_file_path, str) + else nexus_file_path + ) + if directory.is_absolute(): + return directory else: - return pathlib.Path(fh_options.local_output_directory) + directory = nexus_file_path.parents[0] / directory + return directory.resolve() -def compose_checksum_file_path( - fh_options: FileHandlingOptions, file_path: pathlib.Path +def compose_ingestor_output_file_path( + ingestor_directory: pathlib.Path, + file_name: str, + file_extension: str, ) -> pathlib.Path: - """Compose the path for the checksum file.""" - return pathlib.Path(fh_options.ingestor_files_directory) / pathlib.Path( - file_path.name + fh_options.hash_file_extension + """Compose the ingestor output file path based on the input provided.""" + + return ingestor_directory / ( + pathlib.Path( + ".".join( + ( + file_name, + file_extension, + ) + ) + ) ) diff --git a/src/scicat_schemas/__init__.py b/src/scicat_schemas/__init__.py deleted file mode 100644 index cf1e37f..0000000 --- a/src/scicat_schemas/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -# SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) -# ruff: noqa: F401 - -from .load_template import ( - 
load_single_datafile_template, - load_dataset_schema_template, - load_origdatablock_schema_template, -) diff --git a/src/scicat_schemas/dataset.schema.json.jinja b/src/scicat_schemas/dataset.schema.json.jinja deleted file mode 100644 index bf24528..0000000 --- a/src/scicat_schemas/dataset.schema.json.jinja +++ /dev/null @@ -1,30 +0,0 @@ -{ - "pid": "{{ dataset_pid }}", - "datasetName": "{{ dataset_name }}", - "description": "{{ dataset_description }}", - "principalInvestigator": "{{ principal_investigator }}", - "creationLocation": "{{ facility }}:{{ environment }}", - "scientificMetadata": { - {{ scientific_metadata }} - }, - "owner": "{{ owner }}", - "ownerEmail": "{{ owner_email }}", - "sourceFolder": "{{ source_folder }}", - "contactEmail": "{{ contact_email }}", - "creationTime": "{{ iso_creation_time }}", - "type": "raw", - "techniques": [ - { - "pid": "{{ technique_pid }}", - "names": "{{ technique_name }}" - } - ], - "instrumentId": "{{ instrument_id }}", - "sampleId": "{{ sample_id }}", - "proposalId": "{{ proposal_id }}", - "ownerGroup": "{{ owner_group }}", - "accessGroups": [ - {% for access_group in access_groups %}"{{ access_group }}"{% if not loop.last %}, - {% endif %}{% endfor %} - ] -} diff --git a/src/scicat_schemas/load_template.py b/src/scicat_schemas/load_template.py deleted file mode 100644 index 070aadc..0000000 --- a/src/scicat_schemas/load_template.py +++ /dev/null @@ -1,27 +0,0 @@ -# SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) -import pathlib - -from jinja2 import Template - -_CUR_DIR = pathlib.Path(__file__).parent -_SINGLE_TEMPLATE_PATH = _CUR_DIR / pathlib.Path("single_datafile.json.jinja") -_DATASET_SCHEMA_TEMPLATE_PATH = _CUR_DIR / pathlib.Path("dataset.schema.json.jinja") -_ORIG_DATABLOCK_SCHEMA_TEMPLATE_PATH = _CUR_DIR / pathlib.Path( - "origdatablock.schema.json.jinja" -) - - -def load_single_datafile_template() -> Template: - """Load the template for the single datafile schema.""" - return Template((_SINGLE_TEMPLATE_PATH).read_text()) - - -def load_dataset_schema_template() -> Template: - """Load the template for the dataset schema.""" - return Template((_CUR_DIR / _DATASET_SCHEMA_TEMPLATE_PATH).read_text()) - - -def load_origdatablock_schema_template() -> Template: - """Load the template for the original data block schema.""" - return Template((_CUR_DIR / _ORIG_DATABLOCK_SCHEMA_TEMPLATE_PATH).read_text()) diff --git a/src/scicat_schemas/origdatablock.schema.json.jinja b/src/scicat_schemas/origdatablock.schema.json.jinja deleted file mode 100644 index 2ab2aa3..0000000 --- a/src/scicat_schemas/origdatablock.schema.json.jinja +++ /dev/null @@ -1,9 +0,0 @@ -{ - "datasetId": "{{ dataset_pid }}", - "size": {{ dataset_size }}, - "chkAlg": "{{ check_algorithm }}", - "dataFileList": [ - {% for data_file_desc in data_file_desc_list %}{{ data_file_desc }}{% if not loop.last %}, - {% endif %}{% endfor %} - ] -} diff --git a/src/scicat_schemas/single_datafile.json.jinja b/src/scicat_schemas/single_datafile.json.jinja deleted file mode 100644 index da87864..0000000 --- a/src/scicat_schemas/single_datafile.json.jinja +++ /dev/null @@ -1,9 +0,0 @@ -{ - "path": "{{ file_absolute_path }}", - "size": {{ file_size }}, - "time": "{{ datetime_isoformat }}",{% if checksum %} - "chk": "{{ checksum }}", - {% endif %}"uid": "{{ uid }}", - "gid": "{{ gid }}", - "perm": "{{ perm }}" -} diff --git a/src/system_helpers.py b/src/system_helpers.py index ed88a33..53ce9b0 100644 --- a/src/system_helpers.py +++ 
b/src/system_helpers.py @@ -3,7 +3,7 @@ from contextlib import contextmanager -def quit(logger: logging.Logger, unexpected: bool = True) -> None: +def exit(logger: logging.Logger, unexpected: bool = True) -> None: """Log the message and exit the program.""" import sys @@ -12,7 +12,7 @@ def quit(logger: logging.Logger, unexpected: bool = True) -> None: @contextmanager -def exit_at_exceptions( +def online_ingestor_exit_at_exceptions( logger: logging.Logger, daemon: bool = True ) -> Generator[None, None, None]: """Exit the program if an exception is raised.""" @@ -20,14 +20,31 @@ def exit_at_exceptions( yield except KeyboardInterrupt: logger.info("Received keyboard interrupt.") - quit(logger, unexpected=False) + exit(logger, unexpected=False) except Exception as e: logger.error("An exception occurred: %s", e) - quit(logger, unexpected=True) + exit(logger, unexpected=True) else: if daemon: logger.error("Loop finished unexpectedly.") - quit(logger, unexpected=True) + exit(logger, unexpected=True) else: logger.info("Finished successfully.") - quit(logger, unexpected=False) + exit(logger, unexpected=False) + + +@contextmanager +def offline_ingestor_exit_at_exceptions( + logger: logging.Logger, +) -> Generator[None, None, None]: + """ + manage exceptions specifically for offline ingestor + """ + try: + yield + except Exception as e: + logger.error("An exception occurred: %s", e) + else: + logger.error("An unexpected error occurred") + + exit(logger, unexpected=True) diff --git a/tests/minimum_test.py b/tests/minimum_test.py index 9b65be5..0935eef 100644 --- a/tests/minimum_test.py +++ b/tests/minimum_test.py @@ -1,2 +1,3 @@ def test_package() -> None: - import scicat_ingestor # noqa: F401 + import scicat_offline_ingestor # noqa: F401 + import scicat_online_ingestor # noqa: F401 diff --git a/tests/test_logging.py b/tests/test_logging.py deleted file mode 100644 index 8e30445..0000000 --- a/tests/test_logging.py +++ /dev/null @@ -1,45 +0,0 @@ -import pathlib - -import pytest -from scicat_configuration import ( - DatasetOptions, - FileHandlingOptions, - GraylogOptions, - IngesterConfig, - IngestionOptions, - RunOptions, - kafkaOptions, -) - - -@pytest.fixture() -def scicat_config(tmp_path: pathlib.Path) -> IngesterConfig: - return IngesterConfig( - original_dict={}, - run_options=RunOptions( - config_file='test', - verbose=True, - file_log=True, - file_log_base_name=(tmp_path / pathlib.Path('test')).as_posix(), - file_log_timestamp=True, - system_log=False, - system_log_facility=None, - log_message_prefix='test', - logging_level='DEBUG', - check_by_job_id=True, - pyscicat='test', - ), - kafka_options=kafkaOptions(), - graylog_options=GraylogOptions(), - ingestion_options=IngestionOptions( - file_handling_options=FileHandlingOptions(), - dataset_options=DatasetOptions(), - ), - ) - - -def test_scicat_logging_build_logger(scicat_config: IngesterConfig) -> None: - from scicat_logging import build_logger - - logger = build_logger(scicat_config) - assert len(logger.handlers) == 2 # FileHandler and StreamHandler diff --git a/tests/test_scicat_configuration.py b/tests/test_scicat_configuration.py deleted file mode 100644 index 5bd8a08..0000000 --- a/tests/test_scicat_configuration.py +++ /dev/null @@ -1,113 +0,0 @@ -# SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) -import argparse - -import pytest -from scicat_configuration import IngesterConfig - - -@pytest.fixture() -def main_arg_parser() -> argparse.ArgumentParser: - """Return the 
namespace of the main argument parser.""" - from scicat_configuration import build_main_arg_parser - - return build_main_arg_parser() - - -def test_scicat_arg_parser_configuration_matches( - main_arg_parser: argparse.ArgumentParser, -) -> None: - """Test if options in the configuration file matches the argument parser.""" - import json - import pathlib - - scicat_namespace = main_arg_parser.parse_args( - ['-c', 'resources/config.sample.json'] - ) - - # Check if the configuration file is the same - assert scicat_namespace.config_file == 'resources/config.sample.json' - config_path = pathlib.Path(scicat_namespace.config_file) - config_from_args: dict = vars(scicat_namespace) - - # Parse the configuration file - assert config_path.exists() - config_from_file: dict = json.loads(config_path.read_text()) - main_options: dict = config_from_file.get('options', {}) - - # Check if all keys matches - all_keys = set(config_from_args.keys()).union(main_options.keys()) - for key in all_keys: - assert key in config_from_args - assert key in main_options - - -def test_build_scicat_config_default(main_arg_parser: argparse.ArgumentParser) -> None: - """Test if the configuration can be built from default arguments.""" - from scicat_configuration import build_scicat_ingester_config - - scicat_namespace = main_arg_parser.parse_args() - scicat_config = build_scicat_ingester_config(scicat_namespace) - assert scicat_config.run_options.config_file == 'config.20240405.json' - - -@pytest.fixture() -def ingester_config(main_arg_parser: argparse.ArgumentParser) -> IngesterConfig: - from scicat_configuration import build_scicat_ingester_config - - scicat_namespace = main_arg_parser.parse_args( - ['-c', 'resources/config.sample.json', '--verbose'] - ) - return build_scicat_ingester_config(scicat_namespace) - - -def test_build_scicat_config(ingester_config: IngesterConfig) -> None: - """Test if the configuration can be built from arguments.""" - assert ingester_config.original_dict['options']['config_file'] == 'config.json' - assert ingester_config.run_options.config_file == 'resources/config.sample.json' - assert not ingester_config.original_dict['options']['verbose'] - assert ingester_config.run_options.verbose - - -def test_scicat_config_original_dict_read_only(ingester_config: IngesterConfig) -> None: - """Test if the original dictionary is read-only.""" - from types import MappingProxyType - - assert isinstance(ingester_config.original_dict, MappingProxyType) - for sub_option in ingester_config.original_dict.values(): - assert isinstance(sub_option, MappingProxyType) - - -def test_scicat_config_kafka_options(ingester_config: IngesterConfig) -> None: - """Test if the Kafka options are correctly read.""" - assert ingester_config.kafka_options.topics == ["KAFKA_TOPIC_1", "KAFKA_TOPIC_2"] - assert ingester_config.kafka_options.enable_auto_commit - - -def test_scicat_background_config_single_run_option() -> None: - """Test if the single run options are correctly read.""" - from scicat_configuration import ( - build_background_ingestor_arg_parser, - build_scicat_background_ingester_config, - ) - - arg_parser = build_background_ingestor_arg_parser() - scicat_namespace = arg_parser.parse_args( - [ - '-c', - 'resources/config.sample.json', - '--verbose', - '--nexus-file', - 'file.nxs', - '--done-writing-message-file', - 'file.json', - ] - ) - background_ingester_config = build_scicat_background_ingester_config( - scicat_namespace - ) - assert background_ingester_config.single_run_options.nexus_file == 'file.nxs' - assert ( - 
background_ingester_config.single_run_options.done_writing_message_file - == 'file.json' - ) diff --git a/tests/test_scicat_schema.py b/tests/test_scicat_schema.py deleted file mode 100644 index 33f23dd..0000000 --- a/tests/test_scicat_schema.py +++ /dev/null @@ -1,226 +0,0 @@ -# SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) - - -def test_single_datafile_template_loading() -> None: - from scicat_schemas.load_template import load_single_datafile_template - - assert load_single_datafile_template() is not None - - -def test_dataset_schema_template_loading() -> None: - from scicat_schemas.load_template import load_dataset_schema_template - - assert load_dataset_schema_template() is not None - - -def test_origdatablock_schema_template_loading() -> None: - from scicat_schemas.load_template import load_origdatablock_schema_template - - assert load_origdatablock_schema_template() is not None - - -_example_scientific_metadata = """"run_number": { - "value": 18856, - "unit": "", - "human_name": "Run Number", - "type": "integer" - }, - "sample_temperature": { - "value": 20.4, - "unit": "C", - "human_name": "Sample Temperature", - "type": "quantity" - }, - "start_time" : { - "value" : "2024-07-16T09:30:12.987Z", - "unit" : "", - "human_name" : "Start Time", - "type" : "date" - }""" - -_example_dataset_schema = ( - """ -{ - "pid": "12.234.34567/e3690b21-ee8c-40d6-9409-6b6fdca776d2", - "datasetName": "this is a dataset", - "description": "this is the description of the dataset", - "principalInvestigator": "Somebodys Name", - "creationLocation": "ESS:CODA", - "scientificMetadata": { - """ - + _example_scientific_metadata - + """ - }, - "owner": "Somebodys Name", - "ownerEmail": "someones_@_email", - "sourceFolder": "/ess/data/coda/2024/616254", - "contactEmail": "someones_@_email", - "creationTime": "2024-07-16T10:00:00.000Z", - "type": "raw", - "techniques": [ - { - "pid": "someprotocol://someones/url/and/id", - "names": "absorption and phase contrast nanotomography" - } - ], - "instrumentId": "12.234.34567/765b3dc3-f658-410e-b371-04dd1adcd520", - "sampleId": "bd31725a-dbfd-4c32-87db-1c1ebe61e5ca", - "proposalId": "616254", - "ownerGroup": "ess_proposal_616254", - "accessGroups": [ - "scientific information management systems group", - "scicat group" - ] -} - -""" -) - - -def test_dataset_schema_rendering() -> None: - import json - - from scicat_dataset import build_dataset_instance - - dataset_schema = build_dataset_description( - dataset_pid_prefix="12.234.34567", - nxs_dataset_pid="e3690b21-ee8c-40d6-9409-6b6fdca776d2", - dataset_name="this is a dataset", - dataset_description="this is the description of the dataset", - principal_investigator="Somebodys Name", - facility="ESS", - environment="CODA", - scientific_metadata=_example_scientific_metadata, - owner="Somebodys Name", - owner_email="someones_@_email", - source_folder="/ess/data/coda/2024/616254", - contact_email="someones_@_email", - iso_creation_time="2024-07-16T10:00:00.000Z", - technique_pid="someprotocol://someones/url/and/id", - technique_name="absorption and phase contrast nanotomography", - instrument_id="12.234.34567/765b3dc3-f658-410e-b371-04dd1adcd520", - sample_id="bd31725a-dbfd-4c32-87db-1c1ebe61e5ca", - proposal_id="616254", - owner_group="ess_proposal_616254", - access_groups=[ - "scientific information management systems group", - "scicat group", - ], - ) - - assert json.loads(dataset_schema) == json.loads(_example_dataset_schema) - - 
-_example_file_description_1 = """ -{ - "path": "/ess/data/coda/2024/616254/0001.nxs", - "size": 1231231, - "time": "2024-07-16T10:00:00.000Z", - "chk": "1234567890abcdef", - "uid": "1004", - "gid": "1005", - "perm": "33188" -} -""" - - -def test_single_file_description_rendering() -> None: - import json - - from scicat_dataset import build_single_datafile_instance - - file_description = build_single_datafile_description( - file_absolute_path="/ess/data/coda/2024/616254/0001.nxs", - file_size=1231231, - datetime_isoformat="2024-07-16T10:00:00.000Z", - checksum="1234567890abcdef", - uid="1004", - gid="1005", - perm="33188", - ) - - assert json.loads(file_description) == json.loads(_example_file_description_1) - - -_example_file_description_2 = """ -{ - "path": "/ess/data/coda/2024/616254/0002.nxs", - "size": 1231231, - "time": "2024-07-16T10:00:00.000Z", - "uid": "1004", - "gid": "1005", - "perm": "33188" -} -""" - - -def test_single_file_description_rendering_no_checksum() -> None: - import json - - from scicat_dataset import build_single_datafile_instance - - file_description = build_single_datafile_description( - file_absolute_path="/ess/data/coda/2024/616254/0002.nxs", - file_size=1231231, - datetime_isoformat="2024-07-16T10:00:00.000Z", - uid="1004", - gid="1005", - perm="33188", - ) - - assert json.loads(file_description) == json.loads(_example_file_description_2) - - -_example_file_description_3 = """ -{ - "path": "/ess/data/coda/2024/616254/0003.nxs", - "size": 1231231, - "time": "2024-07-16T10:00:00.000Z", - "chk": "1234567890abcdef", - "uid": "1004", - "gid": "1005", - "perm": "33188" -} -""" - -_example_orig_datablock = ( - """ -{ - "datasetId": "20.500.12269/53fd2786-3729-11ef-83e5-fa163e9aae0a", - "size": 446630741, - "chkAlg": "blake2b", - "dataFileList": [ - """ - + _example_file_description_1 - + """, - """ - + _example_file_description_2 - + """, - """ - + _example_file_description_3 - + """ - ] -} -""" -) - - -def test_orig_datablock_rendering() -> None: - import json - - from scicat_dataset import build_orig_datablock_instance - - orig_datablock = build_orig_datablock_description( - dataset_pid_prefix="20.500.12269", - nxs_dataset_pid="53fd2786-3729-11ef-83e5-fa163e9aae0a", - dataset_size=446630741, - check_algorithm="blake2b", - data_file_desc_list=[ - _example_file_description_1, - _example_file_description_2, - _example_file_description_3, - ], - ) - - assert json.loads(orig_datablock) == json.loads(_example_orig_datablock)
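The test modules removed above exercised the deleted jinja templates and the old configuration layout, so helpers introduced in this change, such as `compose_ingestor_directory` and `compose_ingestor_output_file_path`, are left without direct coverage here. A minimal sketch of replacement tests is shown below; it assumes `FileHandlingOptions` can be constructed with defaults for everything except `ingestor_files_directory` (mirroring the sample configuration), and the file name `tests/test_path_helpers.py` is only a suggestion.

```python
# tests/test_path_helpers.py -- sketch only, not part of this change set.
import pathlib

from scicat_configuration import FileHandlingOptions
from scicat_path_helpers import (
    compose_ingestor_directory,
    compose_ingestor_output_file_path,
)


def test_ingestor_directory_resolved_next_to_nexus_file() -> None:
    # A relative ``ingestor_files_directory`` is resolved against the folder
    # containing the nexus file (assumes the other option fields have defaults).
    fh_options = FileHandlingOptions(ingestor_files_directory="../ingestor")
    nexus_file = pathlib.Path("/ess/data/coda/2024/616254/0001.nxs")

    directory = compose_ingestor_directory(fh_options, nexus_file)

    assert directory == pathlib.Path("/ess/data/coda/2024/ingestor")


def test_ingestor_output_file_path_appends_extension() -> None:
    # The output file name is ``<file_name>.<file_extension>`` inside the
    # ingestor directory.
    directory = pathlib.Path("/ess/data/coda/2024/ingestor")

    file_path = compose_ingestor_output_file_path(
        ingestor_directory=directory,
        file_name="0001",
        file_extension="message.json",
    )

    assert file_path == directory / "0001.message.json"
```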