diff --git a/intake.py b/intake.py index 084451a96..eb0e06615 100644 --- a/intake.py +++ b/intake.py @@ -6,6 +6,7 @@ import fnmatch import time +import traceback import genquery @@ -407,6 +408,7 @@ def api_intake_scan_for_datasets(ctx, coll): try: _intake_scan_for_datasets(ctx, coll) except Exception: + log.write(ctx, "Intake scan failed with the following exception: " + traceback.format_exc()) return {"proc_status": "NOK", "error_msg": "Error during scanning process"} else: return {"proc_status": "NOK", "error_msg": "No permissions to scan collection"} diff --git a/intake_scan.py b/intake_scan.py index 90b028a54..4703d2b92 100644 --- a/intake_scan.py +++ b/intake_scan.py @@ -9,7 +9,7 @@ import genquery import intake -from intake_utils import intake_extract_tokens_from_name, intake_tokens_identify_dataset +from intake_utils import dataset_parse_id, intake_scan_get_metadata_update from util import * @@ -40,49 +40,23 @@ def intake_scan_collection(ctx, root, scope, in_dataset, found_datasets): # Determene lock state for object (no collectoin locked_state = object_is_locked(ctx, path, False) - if not (locked_state['locked'] or locked_state['frozen']): - remove_dataset_metadata(ctx, path, False) - scan_mark_scanned(ctx, path, False) - if in_dataset: - subscope = scope.copy() - - # Safeguard original data - prev_scope = subscope.copy() - - # Extract tokens of current name - intake_extract_tokens_from_name(ctx, row[1], row[0], False, subscope) - - new_deeper_dataset_toplevel = False - if not (prev_scope['pseudocode'] == subscope['pseudocode'] - and prev_scope['experiment_type'] == subscope['experiment_type'] - and prev_scope['wave'] == subscope['wave']): - prev_scope['directory'] = prev_scope["dataset_directory"] - if 'version' not in prev_scope: - prev_scope['version'] = 'Raw' - - # It is safe to assume that the dataset was collection based without actively checking. - # Otherwise it would mean that each found file on this level could potentially change the dataset properties - # avu.rm_from_coll(ctx, prev_scope['directory'], 'dataset_toplevel', dataset_make_id(prev_scope)) - # If still present (could have been removed in loop earlier) remove dataset_toplevel on prev_scope['directory'] - try: - avu.rmw_from_coll(ctx, prev_scope['directory'], 'dataset_toplevel', "%") - except msi.Error: - pass + if locked_state['locked'] or locked_state['frozen']: + continue - new_deeper_dataset_toplevel = True - apply_dataset_metadata(ctx, path, subscope, False, new_deeper_dataset_toplevel) - else: - subscope = intake_extract_tokens_from_name(ctx, row[1], row[0], False, scope.copy()) + remove_dataset_metadata(ctx, path, False) + scan_mark_scanned(ctx, path, False) - if intake_tokens_identify_dataset(subscope): + parent_in_dataset = in_dataset + metadata_update = intake_scan_get_metadata_update(ctx, path, False, in_dataset, scope) + + if metadata_update["in_dataset"]: + apply_dataset_metadata(ctx, path, metadata_update["new_metadata"], False) + if not parent_in_dataset: # We found a top-level dataset data object. - subscope["dataset_directory"] = row[1] - apply_dataset_metadata(ctx, path, subscope, False, True) - # For reporting purposes collect the subscopes - found_datasets.append(subscope) - else: - apply_partial_metadata(ctx, subscope, path, False) - avu.set_on_data(ctx, path, "unrecognized", "Experiment type, wave or pseudocode missing from path") + found_datasets.append(metadata_update["new_metadata"]) + else: + apply_partial_metadata(ctx, metadata_update["new_metadata"], path, False) + avu.set_on_data(ctx, path, "unrecognized", "Experiment type, wave or pseudocode missing from path") # Scan collections under root iter = genquery.row_iterator( @@ -100,61 +74,28 @@ def intake_scan_collection(ctx, root, scope, in_dataset, found_datasets): # get locked /frozen status locked_state = object_is_locked(ctx, path, True) - if not (locked_state['locked'] or locked_state['frozen']): - remove_dataset_metadata(ctx, path, True) - - subscope = scope.copy() - child_in_dataset = in_dataset - - if in_dataset: # initially is False - # Safeguard original data - prev_scope = subscope.copy() - - # Extract tokens of current name - intake_extract_tokens_from_name(ctx, path, dirname, True, subscope) + if locked_state['locked'] or locked_state['frozen']: + continue - new_deeper_dataset_toplevel = False + remove_dataset_metadata(ctx, path, True) + scan_mark_scanned(ctx, path, True) - # A change in path in relation to the dataset_directory invokes handling of deeper lying dataset. - # This, not necessarily with a dataset id change, but it does change the dataset toplevel - # In fact the same dataset simply lies a level deeper which invokes a dataset_toplevel change - if not (prev_scope['pseudocode'] == subscope['pseudocode'] - and prev_scope['experiment_type'] == subscope['experiment_type'] - and prev_scope['wave'] == subscope['wave'] - and path == prev_scope['dataset_directory']): - # Found a deeper lying dataset with more specific attributes - # Prepwork for being able to create a dataset_id - prev_scope['directory'] = prev_scope["dataset_directory"] - if 'version' not in prev_scope: - prev_scope['version'] = 'Raw' + parent_in_dataset = in_dataset + metadata_update = intake_scan_get_metadata_update(ctx, path, True, in_dataset, scope) - # If still present (could have been removed in loop earlier) remove dataset_toplevel on prev_scope['directory'] - try: - avu.rmw_from_coll(ctx, prev_scope['directory'], 'dataset_toplevel', "%") - except msi.Error: - pass - - # set flag correctly for creating of new toplevel - new_deeper_dataset_toplevel = True - - subscope["dataset_directory"] = path - apply_dataset_metadata(ctx, path, subscope, True, new_deeper_dataset_toplevel) + if metadata_update["in_dataset"]: + apply_dataset_metadata(ctx, path, metadata_update["new_metadata"], True) + if not parent_in_dataset: + # We found a new top-level dataset data object. + found_datasets.append(metadata_update["new_metadata"]) + else: + apply_partial_metadata(ctx, metadata_update["new_metadata"], path, True) - scan_mark_scanned(ctx, path, True) - else: - subscope = intake_extract_tokens_from_name(ctx, path, dirname, True, subscope) - - if intake_tokens_identify_dataset(subscope): - child_in_dataset = True - # We found a top-level dataset collection. - subscope["dataset_directory"] = path - apply_dataset_metadata(ctx, path, subscope, True, True) - # For reporting purposes collect the subscopes - found_datasets.append(subscope) - else: - apply_partial_metadata(ctx, subscope, path, True) - # Go a level deeper - found_datasets = intake_scan_collection(ctx, path, subscope, child_in_dataset, found_datasets) + found_datasets = intake_scan_collection(ctx, + path, + metadata_update["new_metadata"], + parent_in_dataset or metadata_update["in_dataset"], + found_datasets) return found_datasets @@ -272,45 +213,21 @@ def scan_mark_scanned(ctx, path, is_collection): avu.set_on_data(ctx, path, 'scanned', user_and_timestamp) -def apply_dataset_metadata(ctx, path, scope, is_collection, is_top_level): +def apply_dataset_metadata(ctx, path, scope, is_collection): """Apply dataset metadata to an object in a dataset. :param ctx: Combined type of a callback and rei struct :param path: Path to the object :param scope: A scanner scope containing WEPV values :param is_collection: Whether the object is a collection - :param is_top_level: If true, a dataset_toplevel field will be set on the object """ - - if "version" not in scope: - version = "Raw" - else: - version = scope["version"] - - subscope = {"wave": scope["wave"], - "experiment_type": scope["experiment_type"], - "pseudocode": scope["pseudocode"], - "version": version, - "directory": scope["dataset_directory"]} - - subscope["dataset_id"] = dataset_make_id(subscope) - - # add all keys to this to this level - - for key in subscope: + for key in scope: if subscope[key]: if is_collection: avu.set_on_coll(ctx, path, key, subscope[key]) else: avu.set_on_data(ctx, path, key, subscope[key]) - if is_top_level: - # Add dataset_id to dataset_toplevel - if is_collection: - avu.set_on_coll(ctx, path, 'dataset_toplevel', subscope["dataset_id"]) - else: - avu.set_on_data(ctx, path, 'dataset_toplevel', subscope["dataset_id"]) - def apply_partial_metadata(ctx, scope, path, is_collection): """Apply any available id component metadata to the given object. @@ -534,32 +451,3 @@ def get_aggregated_object_error_count(ctx, dataset_id, tl_collection): "COLL_NAME like '" + tl_collection + "%' AND META_DATA_ATTR_NAME = 'error' ", genquery.AS_LIST, ctx ))) - - -def dataset_make_id(scope): - """Construct a dataset based on WEPV and directory. - - :param scope: Create a dataset id - - :returns: Dataset identifier - """ - return scope['wave'] + '\t' + scope['experiment_type'] + '\t' + scope['pseudocode'] + '\t' + scope['version'] + '\t' + scope['directory'] - - -def dataset_parse_id(dataset_id): - """Parse a dataset into its consructive data. - - :param dataset_id: Dataset identifier - - :returns: Dataset as a dict - """ - dataset_parts = dataset_id.split('\t') - dataset = {} - dataset['wave'] = dataset_parts[0] - dataset['experiment_type'] = dataset_parts[1] # Is DIT NOG ERGENS GOED VOOR - dataset['expType'] = dataset_parts[1] - dataset['pseudocode'] = dataset_parts[2] - dataset['version'] = dataset_parts[3] - dataset['directory'] = dataset_parts[4] - - return dataset diff --git a/intake_utils.py b/intake_utils.py index fede32b5d..87170859f 100644 --- a/intake_utils.py +++ b/intake_utils.py @@ -28,16 +28,25 @@ def intake_tokens_identify_dataset(tokens): return (missing == 0) -def intake_extract_tokens_from_name(ctx, path, name, is_collection, scoped_buffer): +def intake_ensure_version_present(ctx, metadata): + """Adds a version attribute with a default value to metadata if it is not yet present. + + :param ctx: Combined type of a callback and rei struct + :param metadata: Dictionary with intake module metadata + """ + if "version" not in metadata: + metadata["version"] = "Raw" + + +def intake_extract_tokens_from_name(ctx, path, scoped_buffer): """Extract one or more tokens from a file / directory name and add dataset information as metadata. :param ctx: Combined type of a callback and rei struct - :param path: Path to object or collection - :param name: Name of object or collection - :param is_collection: Indicates if object or collection + :param path: Full path of the data object or collection :param scoped_buffer: Holds dataset buffer with prefilled keys :returns: Returns extended scope buffer """ - name_without_ext = os.path.splitext(name)[0] + basename = os.path.basename(path) + name_without_ext = os.path.splitext(basename)[0] parts = re.split("[_-]", name_without_ext) for part in parts: scoped_buffer.update(intake_extract_tokens(ctx, part)) @@ -120,3 +129,77 @@ def intake_extract_tokens(ctx, string): foundKVs["experiment_type"] = str_lower return foundKVs + + +def intake_scan_get_metadata_update(ctx, path, is_collection, in_dataset, parent_metadata): + """Determine metadata to be updated for a particular collection or data object, based + on its name and parent metadata. + + This function is separate from the function that actually performs the updates, so + that we can test the logic separately. + + :param ctx: Combined type of a callback and rei struct + :param path: Full path of the data object or collection + :param is_collection: true if it's a collection, false if it's a data object + :param in_dataset: true if the parent already has complete WEP(V) attributes. Otherwise false. + :param parent_metadata: dict containing the intake module metadata of the parent collection ( if any) + + :returns: Returns a dictionary with the following keys / values: + new_metadata: dictionary of new metadata to apply to this data object or collection + in_dataset: true if current object (along with values passed from parents) has complete WEP(V) values. + otherwise false. + """ + + local_metadata = parent_metadata.copy() + + result = {"new_metadata": local_metadata, "in_dataset": in_dataset} + + if in_dataset: + # If we already are in a dataset, we get all the metadata from the parent. We + # cannot override attributes in this case. However we need to remove the top-level + # attribute, because the present object is within in a dataset, and thus not a top-level + # data object. + if "dataset_toplevel" in local_metadata: + del [local_metadata["dataset_toplevel"]] + else: + intake_extract_tokens_from_name(ctx, path, local_metadata) + if intake_tokens_identify_dataset(local_metadata): + intake_ensure_version_present(ctx, local_metadata) + local_metadata["directory"] = path if is_collection else os.path.dirname(path) + local_metadata["dataset_id"] = dataset_make_id(local_metadata) + local_metadata["dataset_toplevel"] = dataset_make_id(local_metadata) + result["in_dataset"] = True + else: + # result["in_dataset"] is already set to false + pass + + return result + + +def dataset_make_id(scope): + """Construct a dataset based on WEPV and directory. + + :param scope: Create a dataset id + + :returns: Dataset identifier + """ + return scope['wave'] + '\t' + scope['experiment_type'] + '\t' + scope['pseudocode'] + '\t' + scope['version'] + '\t' + scope['directory'] + + +def dataset_parse_id(dataset_id): + """Parse a dataset into its consructive data. + + :param dataset_id: Dataset identifier + + :returns: Dataset as a dict + """ + dataset_parts = dataset_id.split('\t') + dataset = {} + dataset['wave'] = dataset_parts[0] + dataset['experiment_type'] = dataset_parts[1] # Is DIT NOG ERGENS GOED VOOR + dataset['expType'] = dataset_parts[1] + dataset['pseudocode'] = dataset_parts[2] + dataset['version'] = dataset_parts[3] + dataset['directory'] = dataset_parts[4] + + return dataset diff --git a/unit-tests/test_intake.py b/unit-tests/test_intake.py index 0fb4ccc5f..43d737dcc 100644 --- a/unit-tests/test_intake.py +++ b/unit-tests/test_intake.py @@ -6,12 +6,13 @@ __copyright__ = 'Copyright (c) 2019-2021, Utrecht University' __license__ = 'GPLv3, see LICENSE' +import os import sys from unittest import TestCase sys.path.append('..') -from intake_utils import intake_extract_tokens, intake_extract_tokens_from_name, intake_tokens_identify_dataset +from intake_utils import dataset_make_id, dataset_parse_id, intake_extract_tokens, intake_extract_tokens_from_name, intake_scan_get_metadata_update, intake_tokens_identify_dataset class IntakeTest(TestCase): @@ -44,9 +45,115 @@ def test_intake_extract_tokens(self): def test_intake_extract_tokens_from_name(self): buffer = dict() - output = intake_extract_tokens_from_name(None, None, "20w_chantigap_B12345_VerABC.txt", None, buffer) + output = intake_extract_tokens_from_name(None, "20w_chantigap_B12345_VerABC.txt", buffer) self.assertEquals(len(output), 4) self.assertEquals(output["wave"], "20w") self.assertEquals(output["experiment_type"], "chantigap") self.assertEquals(output["version"], "ABC") self.assertEquals(output["pseudocode"], "B12345") + + def test_intake_scan_get_metadata_update_coll_in_dataset(self): + parent_path = "/foo/bar/chantigap_10w_B12345" + path = parent_path + "/chantigap_20w_B12346" + complete_metadata = {"wave": "1", + "pseudocode": "2", + "experiment_type": "3", + "version": "Raw", + "directory": parent_path, + "dataset_id": "4", + "dataset_toplevel": "5"} + + output = intake_scan_get_metadata_update(None, path, True, True, complete_metadata) + self.assertEquals(output["in_dataset"], True) + self.assertEquals(len(output["new_metadata"]), 6) + self.assertEquals(output["new_metadata"]["directory"], parent_path) + self.assertEquals(output["new_metadata"]["wave"], "1") + self.assertEquals(output["new_metadata"]["pseudocode"], "2") + self.assertEquals(output["new_metadata"]["experiment_type"], "3") + self.assertEquals(output["new_metadata"]["version"], "Raw") + self.assertEquals(output["new_metadata"]["dataset_id"], "4") + self.assertTrue("dataset_toplevel" not in output["new_metadata"]) + + def test_intake_scan_get_metadata_update_coll_out_dataset_complete(self): + incomplete_metadata = {"wave": "1", "pseudocode": "2"} + path = "/foo/bar/chantigap_10w_B12345/chantigap_B12346" + output = intake_scan_get_metadata_update(None, path, True, False, incomplete_metadata) + self.assertEquals(output["in_dataset"], True) + self.assertEquals(len(output["new_metadata"]), 7) + self.assertEquals(output["new_metadata"]["directory"], path) + self.assertEquals(output["new_metadata"]["dataset_toplevel"], dataset_make_id(output["new_metadata"])) + self.assertEquals(output["new_metadata"]["dataset_id"], dataset_make_id(output["new_metadata"])) + self.assertEquals(output["new_metadata"]["wave"], "1") + self.assertEquals(output["new_metadata"]["version"], "Raw") + self.assertEquals(output["new_metadata"]["pseudocode"], "B12346") + self.assertEquals(output["new_metadata"]["experiment_type"], "chantigap") + + def test_intake_scan_get_metadata_update_coll_out_dataset_incomplete(self): + incomplete_metadata = {"wave": "1"} + path = "/foo/bar/chantigap_10w_B12345/B12346" + output = intake_scan_get_metadata_update(None, path, True, False, incomplete_metadata) + self.assertEquals(output["in_dataset"], False) + self.assertEquals(len(output["new_metadata"]), 2) + self.assertEquals(output["new_metadata"]["wave"], "1") + self.assertEquals(output["new_metadata"]["pseudocode"], "B12346") + + def test_intake_scan_get_metadata_update_do_in_dataset(self): + complete_metadata = {"wave": "1", + "pseudocode": "2", + "experiment_type": "3", + "version": "Raw", + "dataset_id": "4", + "dataset_toplevel": "5", + "directory": "6"} + path = "/foo/bar/chantigap_10w_B12345/chantigap_20w_B12346.txt" + output = intake_scan_get_metadata_update(None, path, False, True, complete_metadata) + self.assertEquals(output["in_dataset"], True) + self.assertEquals(len(output["new_metadata"]), 6) + self.assertEquals(output["new_metadata"]["wave"], "1") + self.assertEquals(output["new_metadata"]["pseudocode"], "2") + self.assertEquals(output["new_metadata"]["experiment_type"], "3") + self.assertEquals(output["new_metadata"]["version"], "Raw") + self.assertEquals(output["new_metadata"]["dataset_id"], "4") + self.assertTrue("dataset_toplevel" not in output["new_metadata"]) + + def test_intake_scan_get_metadata_update_do_out_dataset_complete(self): + incomplete_metadata = {"wave": "1", "pseudocode": "2"} + path = "/foo/bar/chantigap_10w_B12345/chantigap_B12346.txt" + coll = os.path.dirname(path) + output = intake_scan_get_metadata_update(None, path, False, False, incomplete_metadata) + self.assertEquals(output["in_dataset"], True) + self.assertEquals(len(output["new_metadata"]), 7) + self.assertEquals(output["new_metadata"]["directory"], coll) + self.assertEquals(output["new_metadata"]["dataset_id"], dataset_make_id(output["new_metadata"])) + self.assertEquals(output["new_metadata"]["dataset_toplevel"], dataset_make_id(output["new_metadata"])) + self.assertEquals(output["new_metadata"]["wave"], "1") + self.assertEquals(output["new_metadata"]["version"], "Raw") + self.assertEquals(output["new_metadata"]["pseudocode"], "B12346") + self.assertEquals(output["new_metadata"]["experiment_type"], "chantigap") + + def test_intake_scan_get_metadata_update_do_out_dataset_incomplete(self): + incomplete_metadata = {"wave": "1"} + path = "/foo/bar/chantigap_10w_B12345/B12346.txt" + output = intake_scan_get_metadata_update(None, path, False, False, incomplete_metadata) + self.assertEquals(output["in_dataset"], False) + self.assertEquals(len(output["new_metadata"]), 2) + self.assertEquals(output["new_metadata"]["wave"], "1") + self.assertEquals(output["new_metadata"]["pseudocode"], "B12346") + + def test_dataset_make_id(self): + input = {"wave": "20w", + "experiment_type": "echo", + "pseudocode": "B12345", + "version": "Raw", + "directory": "/foo/bar/baz"} + self.assertEquals(dataset_make_id(input), + "20w\techo\tB12345\tRaw\t/foo/bar/baz") + + def test_dataset_parse_id(self): + input = "20w\techo\tB12345\tRaw\t/foo/bar/baz" + output = dataset_parse_id(input) + self.assertEquals(output.get("wave"), "20w") + self.assertEquals(output.get("experiment_type"), "echo") + self.assertEquals(output.get("pseudocode"), "B12345") + self.assertEquals(output.get("version"), "Raw") + self.assertEquals(output.get("directory"), "/foo/bar/baz")