diff --git a/intake_scan.py b/intake_scan.py
index 90b028a54..ff94c1c8f 100644
--- a/intake_scan.py
+++ b/intake_scan.py
@@ -9,7 +9,7 @@
 import genquery
 
 import intake
-from intake_utils import intake_extract_tokens_from_name, intake_tokens_identify_dataset
+from intake_utils import intake_scan_get_metadata_update
 from util import *
 
 
@@ -43,46 +43,18 @@ def intake_scan_collection(ctx, root, scope, in_dataset, found_datasets):
         if not (locked_state['locked'] or locked_state['frozen']):
             remove_dataset_metadata(ctx, path, False)
             scan_mark_scanned(ctx, path, False)
-            if in_dataset:
-                subscope = scope.copy()
-
-                # Safeguard original data
-                prev_scope = subscope.copy()
-
-                # Extract tokens of current name
-                intake_extract_tokens_from_name(ctx, row[1], row[0], False, subscope)
-
-                new_deeper_dataset_toplevel = False
-                if not (prev_scope['pseudocode'] == subscope['pseudocode']
-                        and prev_scope['experiment_type'] == subscope['experiment_type']
-                        and prev_scope['wave'] == subscope['wave']):
-                    prev_scope['directory'] = prev_scope["dataset_directory"]
-                    if 'version' not in prev_scope:
-                        prev_scope['version'] = 'Raw'
-
-                    # It is safe to assume that the dataset was collection based without actively checking.
-                    # Otherwise it would mean that each found file on this level could potentially change the dataset properties
-                    # avu.rm_from_coll(ctx, prev_scope['directory'], 'dataset_toplevel', dataset_make_id(prev_scope))
-                    # If still present (could have been removed in loop earlier) remove dataset_toplevel on prev_scope['directory']
-                    try:
-                        avu.rmw_from_coll(ctx, prev_scope['directory'], 'dataset_toplevel', "%")
-                    except msi.Error:
-                        pass
-                    new_deeper_dataset_toplevel = True
-                apply_dataset_metadata(ctx, path, subscope, False, new_deeper_dataset_toplevel)
-            else:
-                subscope = intake_extract_tokens_from_name(ctx, row[1], row[0], False, scope.copy())
+            parent_in_dataset = in_dataset
+            metadata_update = intake_scan_get_metadata_update(ctx, path, False, in_dataset, scope)
 
-                if intake_tokens_identify_dataset(subscope):
+            if metadata_update["in_dataset"]:
+                apply_dataset_metadata(ctx, path, metadata_update["new_metadata"], False, parent_in_dataset)
+                if not parent_in_dataset:
                     # We found a top-level dataset data object.
- subscope["dataset_directory"] = row[1] - apply_dataset_metadata(ctx, path, subscope, False, True) - # For reporting purposes collect the subscopes - found_datasets.append(subscope) - else: - apply_partial_metadata(ctx, subscope, path, False) - avu.set_on_data(ctx, path, "unrecognized", "Experiment type, wave or pseudocode missing from path") + found_datasets.append(metadata_update["new_metadata"]) + else: + apply_partial_metadata(ctx, subscope, path, False) + avu.set_on_data(ctx, path, "unrecognized", "Experiment type, wave or pseudocode missing from path") # Scan collections under root iter = genquery.row_iterator( @@ -100,60 +72,23 @@ def intake_scan_collection(ctx, root, scope, in_dataset, found_datasets): # get locked /frozen status locked_state = object_is_locked(ctx, path, True) - if not (locked_state['locked'] or locked_state['frozen']): - remove_dataset_metadata(ctx, path, True) - - subscope = scope.copy() - child_in_dataset = in_dataset - - if in_dataset: # initially is False - # Safeguard original data - prev_scope = subscope.copy() + if locked_state['locked'] or locked_state['frozen']: + continue - # Extract tokens of current name - intake_extract_tokens_from_name(ctx, path, dirname, True, subscope) + remove_dataset_metadata(ctx, path, True) - new_deeper_dataset_toplevel = False + parent_in_dataset = in_dataset + metadata_update = intake_scan_get_metadata_update(ctx, path, True, in_dataset, scope) - # A change in path in relation to the dataset_directory invokes handling of deeper lying dataset. - # This, not necessarily with a dataset id change, but it does change the dataset toplevel - # In fact the same dataset simply lies a level deeper which invokes a dataset_toplevel change - if not (prev_scope['pseudocode'] == subscope['pseudocode'] - and prev_scope['experiment_type'] == subscope['experiment_type'] - and prev_scope['wave'] == subscope['wave'] - and path == prev_scope['dataset_directory']): - # Found a deeper lying dataset with more specific attributes - # Prepwork for being able to create a dataset_id - prev_scope['directory'] = prev_scope["dataset_directory"] - if 'version' not in prev_scope: - prev_scope['version'] = 'Raw' - - # If still present (could have been removed in loop earlier) remove dataset_toplevel on prev_scope['directory'] - try: - avu.rmw_from_coll(ctx, prev_scope['directory'], 'dataset_toplevel', "%") - except msi.Error: - pass - - # set flag correctly for creating of new toplevel - new_deeper_dataset_toplevel = True - - subscope["dataset_directory"] = path - apply_dataset_metadata(ctx, path, subscope, True, new_deeper_dataset_toplevel) + if metadata_update["in_dataset"]: + apply_dataset_metadata(ctx, path, metadata_update["new_metadata"], True, parent_in_dataset) + if not parent_in_dataset: + # We found a new top-level dataset data object. + found_datasets.append(metadata_update["new_metadata"]) + else: + apply_partial_metadata(ctx, subscope, path, True) + avu.set_on_data(ctx, path, "unrecognized", "Experiment type, wave or pseudocode missing from path") - scan_mark_scanned(ctx, path, True) - else: - subscope = intake_extract_tokens_from_name(ctx, path, dirname, True, subscope) - - if intake_tokens_identify_dataset(subscope): - child_in_dataset = True - # We found a top-level dataset collection. 
- subscope["dataset_directory"] = path - apply_dataset_metadata(ctx, path, subscope, True, True) - # For reporting purposes collect the subscopes - found_datasets.append(subscope) - else: - apply_partial_metadata(ctx, subscope, path, True) - # Go a level deeper found_datasets = intake_scan_collection(ctx, path, subscope, child_in_dataset, found_datasets) return found_datasets diff --git a/intake_utils.py b/intake_utils.py index fede32b5d..86047651f 100644 --- a/intake_utils.py +++ b/intake_utils.py @@ -28,16 +28,25 @@ def intake_tokens_identify_dataset(tokens): return (missing == 0) -def intake_extract_tokens_from_name(ctx, path, name, is_collection, scoped_buffer): +def intake_ensure_version_present(ctx, metadata): + """Adds a version attribute with a default value to metadata if it is not yet present. + + :param ctx: Combined type of a callback and rei struct + :param metadata: Dictionary with intake module metadata + """ + if "version" not in metadata: + metadata["version"] = "Raw" + + +def intake_extract_tokens_from_name(ctx, path, scoped_buffer): """Extract one or more tokens from a file / directory name and add dataset information as metadata. :param ctx: Combined type of a callback and rei struct - :param path: Path to object or collection - :param name: Name of object or collection - :param is_collection: Indicates if object or collection + :param path: Full path of the data object or collection :param scoped_buffer: Holds dataset buffer with prefilled keys :returns: Returns extended scope buffer """ - name_without_ext = os.path.splitext(name)[0] + basename = os.path.basename(path) + name_without_ext = os.path.splitext(basename)[0] parts = re.split("[_-]", name_without_ext) for part in parts: scoped_buffer.update(intake_extract_tokens(ctx, part)) @@ -120,3 +129,44 @@ def intake_extract_tokens(ctx, string): foundKVs["experiment_type"] = str_lower return foundKVs + + +def intake_scan_get_metadata_update(ctx, path, is_collection, in_dataset, parent_metadata): + """Determine metadata to be updated for a particular collection or data object, based + on its name and parent metadata. + + This function is separate from the function that actually performs the updates, so + that we can test the logic separately. + + :param ctx: Combined type of a callback and rei struct + :param path: Full path of the data object or collection + :param is_collection: true if it's a collection, false if it's a data object + :param in_dataset: true if the parent already has complete WEP(V) attributes. Otherwise false. + :param parent_metadata: dict containing the intake module metadata of the parent collection ( if any) + + :returns: Returns a dictionary with the following keys / values: + new_metadata: dictionary of new metadata to apply to this data object or collection + in_dataset: true if current object (along with values passed from parents) has complete WEP(V) values. + otherwise false. + """ + + local_metadata = parent_metadata.copy() + + result = {"new_metadata": local_metadata, "in_dataset": in_dataset} + + if in_dataset: + # If we already are in a dataset, we get all the metadata from the parent. We + # cannot override attributes in this case. 
+ local_metadata["directory"] = path if is_collection else os.path.dirname(path) + else: + intake_extract_tokens_from_name(ctx, path, local_metadata) + if intake_tokens_identify_dataset(local_metadata): + intake_ensure_version_present(ctx, local_metadata) + local_metadata["directory"] = path if is_collection else os.path.dirname(path) + result["in_dataset"] = True + else: + # result["in_dataset"] is already set false + # No need to set version yet. That will be done when WEP(V) values are complete + pass + + return result diff --git a/unit-tests/test_intake.py b/unit-tests/test_intake.py index 0fb4ccc5f..db8d1e9f9 100644 --- a/unit-tests/test_intake.py +++ b/unit-tests/test_intake.py @@ -6,12 +6,13 @@ __copyright__ = 'Copyright (c) 2019-2021, Utrecht University' __license__ = 'GPLv3, see LICENSE' +import os import sys from unittest import TestCase sys.path.append('..') -from intake_utils import intake_extract_tokens, intake_extract_tokens_from_name, intake_tokens_identify_dataset +from intake_utils import intake_extract_tokens, intake_extract_tokens_from_name, intake_scan_get_metadata_update, intake_tokens_identify_dataset class IntakeTest(TestCase): @@ -44,9 +45,75 @@ def test_intake_extract_tokens(self): def test_intake_extract_tokens_from_name(self): buffer = dict() - output = intake_extract_tokens_from_name(None, None, "20w_chantigap_B12345_VerABC.txt", None, buffer) + output = intake_extract_tokens_from_name(None, "20w_chantigap_B12345_VerABC.txt", buffer) self.assertEquals(len(output), 4) self.assertEquals(output["wave"], "20w") self.assertEquals(output["experiment_type"], "chantigap") self.assertEquals(output["version"], "ABC") self.assertEquals(output["pseudocode"], "B12345") + + def test_intake_scan_get_metadata_update_coll_in_dataset(self): + complete_metadata = {"wave": "1", "pseudocode": "2", "experiment_type": "3"} + path = "/foo/bar/chantigap_10w_B12345/chantigap_20w_B12346" + output = intake_scan_get_metadata_update(None, path, True, True, complete_metadata) + self.assertEquals(output["in_dataset"], True) + self.assertEquals(len(output["new_metadata"]), 4) + self.assertEquals(output["new_metadata"]["directory"], path) + self.assertEquals(output["new_metadata"]["wave"], "1") + self.assertEquals(output["new_metadata"]["pseudocode"], "2") + self.assertEquals(output["new_metadata"]["experiment_type"], "3") + + def test_intake_scan_get_metadata_update_coll_out_dataset_complete(self): + incomplete_metadata = {"wave": "1", "pseudocode": "2"} + path = "/foo/bar/chantigap_10w_B12345/chantigap_B12346" + output = intake_scan_get_metadata_update(None, path, True, False, incomplete_metadata) + self.assertEquals(output["in_dataset"], True) + self.assertEquals(len(output["new_metadata"]), 5) + self.assertEquals(output["new_metadata"]["directory"], path) + self.assertEquals(output["new_metadata"]["wave"], "1") + self.assertEquals(output["new_metadata"]["version"], "Raw") + self.assertEquals(output["new_metadata"]["pseudocode"], "B12346") + self.assertEquals(output["new_metadata"]["experiment_type"], "chantigap") + + def test_intake_scan_get_metadata_update_coll_out_dataset_incomplete(self): + incomplete_metadata = {"wave": "1"} + path = "/foo/bar/chantigap_10w_B12345/B12346" + output = intake_scan_get_metadata_update(None, path, True, False, incomplete_metadata) + self.assertEquals(output["in_dataset"], False) + self.assertEquals(len(output["new_metadata"]), 2) + self.assertEquals(output["new_metadata"]["wave"], "1") + self.assertEquals(output["new_metadata"]["pseudocode"], "B12346") + + 
+    def test_intake_scan_get_metadata_update_do_in_dataset(self):
+        complete_metadata = {"wave": "1", "pseudocode": "2", "experiment_type": "3"}
+        path = "/foo/bar/chantigap_10w_B12345/chantigap_20w_B12346.txt"
+        coll = os.path.dirname(path)
+        output = intake_scan_get_metadata_update(None, path, False, True, complete_metadata)
+        self.assertEquals(output["in_dataset"], True)
+        self.assertEquals(len(output["new_metadata"]), 4)
+        self.assertEquals(output["new_metadata"]["directory"], coll)
+        self.assertEquals(output["new_metadata"]["wave"], "1")
+        self.assertEquals(output["new_metadata"]["pseudocode"], "2")
+        self.assertEquals(output["new_metadata"]["experiment_type"], "3")
+
+    def test_intake_scan_get_metadata_update_do_out_dataset_complete(self):
+        incomplete_metadata = {"wave": "1", "pseudocode": "2"}
+        path = "/foo/bar/chantigap_10w_B12345/chantigap_B12346.txt"
+        coll = os.path.dirname(path)
+        output = intake_scan_get_metadata_update(None, path, False, False, incomplete_metadata)
+        self.assertEquals(output["in_dataset"], True)
+        self.assertEquals(len(output["new_metadata"]), 5)
+        self.assertEquals(output["new_metadata"]["directory"], coll)
+        self.assertEquals(output["new_metadata"]["wave"], "1")
+        self.assertEquals(output["new_metadata"]["version"], "Raw")
+        self.assertEquals(output["new_metadata"]["pseudocode"], "B12346")
+        self.assertEquals(output["new_metadata"]["experiment_type"], "chantigap")
+
+    def test_intake_scan_get_metadata_update_do_out_dataset_incomplete(self):
+        incomplete_metadata = {"wave": "1"}
+        path = "/foo/bar/chantigap_10w_B12345/B12346.txt"
+        output = intake_scan_get_metadata_update(None, path, False, False, incomplete_metadata)
+        self.assertEquals(output["in_dataset"], False)
+        self.assertEquals(len(output["new_metadata"]), 2)
+        self.assertEquals(output["new_metadata"]["wave"], "1")
+        self.assertEquals(output["new_metadata"]["pseudocode"], "B12346")
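Note: these are plain `unittest` cases that stub `ctx` with `None`. Assuming the layout the test module implies (run from `unit-tests/`, with `intake_utils.py` one directory up), the token extraction they rely on can also be checked by hand:

```python
import sys
sys.path.append('..')  # mirror the test module: run from the unit-tests/ directory

from intake_utils import intake_extract_tokens_from_name

buffer = {}
intake_extract_tokens_from_name(None, "20w_chantigap_B12345_VerABC.txt", buffer)
# The name (extension stripped) splits on "_" and "-" into: 20w (wave),
# chantigap (experiment type), B12345 (pseudocode), VerABC (version "ABC").
print(buffer)
```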