Skip to content

Commit

Permalink
YDA-5460 Improve intake collection scan logic
Browse files Browse the repository at this point in the history
- Extract logic for collection scans intake module to separate utility
  function, so that we can test it separately.
- Add error logging for intake scan failures
- Add some basic unit tests
- Ensure that we can't overwrite WEP(V) values once we are in a dataset
  (YDA-5460). WEP(V) values can only be overwritten as long as we don't have a
  complete Wave / Experiment type / Pseudocode set
  • Loading branch information
stsnel committed Oct 4, 2023
1 parent 2418d9b commit 4b9bde3
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 96 deletions.
2 changes: 2 additions & 0 deletions intake.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import fnmatch
import time
import traceback

import genquery

Expand Down Expand Up @@ -407,6 +408,7 @@ def api_intake_scan_for_datasets(ctx, coll):
try:
_intake_scan_for_datasets(ctx, coll)
except Exception:
log.write(ctx, "Intake scan failed with the following exception: " + traceback.format_exc())
return {"proc_status": "NOK", "error_msg": "Error during scanning process"}
else:
return {"proc_status": "NOK", "error_msg": "No permissions to scan collection"}
Expand Down
116 changes: 27 additions & 89 deletions intake_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import genquery

import intake
from intake_utils import intake_extract_tokens_from_name, intake_tokens_identify_dataset
from intake_utils import intake_scan_get_metadata_update
from util import *


Expand Down Expand Up @@ -43,46 +43,18 @@ def intake_scan_collection(ctx, root, scope, in_dataset, found_datasets):
if not (locked_state['locked'] or locked_state['frozen']):
remove_dataset_metadata(ctx, path, False)
scan_mark_scanned(ctx, path, False)
if in_dataset:
subscope = scope.copy()

# Safeguard original data
prev_scope = subscope.copy()

# Extract tokens of current name
intake_extract_tokens_from_name(ctx, row[1], row[0], False, subscope)

new_deeper_dataset_toplevel = False
if not (prev_scope['pseudocode'] == subscope['pseudocode']
and prev_scope['experiment_type'] == subscope['experiment_type']
and prev_scope['wave'] == subscope['wave']):
prev_scope['directory'] = prev_scope["dataset_directory"]
if 'version' not in prev_scope:
prev_scope['version'] = 'Raw'

# It is safe to assume that the dataset was collection based without actively checking.
# Otherwise it would mean that each found file on this level could potentially change the dataset properties
# avu.rm_from_coll(ctx, prev_scope['directory'], 'dataset_toplevel', dataset_make_id(prev_scope))
# If still present (could have been removed in loop earlier) remove dataset_toplevel on prev_scope['directory']
try:
avu.rmw_from_coll(ctx, prev_scope['directory'], 'dataset_toplevel', "%")
except msi.Error:
pass

new_deeper_dataset_toplevel = True
apply_dataset_metadata(ctx, path, subscope, False, new_deeper_dataset_toplevel)
else:
subscope = intake_extract_tokens_from_name(ctx, row[1], row[0], False, scope.copy())
parent_in_dataset = in_dataset
metadata_update = intake_scan_get_metadata_update(ctx, path, False, in_dataset, scope)

if intake_tokens_identify_dataset(subscope):
if metadata_update["in_dataset"]:
apply_dataset_metadata(ctx, path, metadata_update["new_metadata"], False, parent_in_dataset)
if not parent_in_dataset:
# We found a top-level dataset data object.
subscope["dataset_directory"] = row[1]
apply_dataset_metadata(ctx, path, subscope, False, True)
# For reporting purposes collect the subscopes
found_datasets.append(subscope)
else:
apply_partial_metadata(ctx, subscope, path, False)
avu.set_on_data(ctx, path, "unrecognized", "Experiment type, wave or pseudocode missing from path")
found_datasets.append(metadata_update["new_metadata"])
else:
apply_partial_metadata(ctx, subscope, path, False)
avu.set_on_data(ctx, path, "unrecognized", "Experiment type, wave or pseudocode missing from path")

# Scan collections under root
iter = genquery.row_iterator(
Expand All @@ -100,61 +72,27 @@ def intake_scan_collection(ctx, root, scope, in_dataset, found_datasets):
# get locked /frozen status
locked_state = object_is_locked(ctx, path, True)

if not (locked_state['locked'] or locked_state['frozen']):
remove_dataset_metadata(ctx, path, True)

subscope = scope.copy()
child_in_dataset = in_dataset

if in_dataset: # initially is False
# Safeguard original data
prev_scope = subscope.copy()
if locked_state['locked'] or locked_state['frozen']:
continue

# Extract tokens of current name
intake_extract_tokens_from_name(ctx, path, dirname, True, subscope)
remove_dataset_metadata(ctx, path, True)

new_deeper_dataset_toplevel = False
parent_in_dataset = in_dataset
metadata_update = intake_scan_get_metadata_update(ctx, path, True, in_dataset, scope)

# A change in path in relation to the dataset_directory invokes handling of deeper lying dataset.
# This, not necessarily with a dataset id change, but it does change the dataset toplevel
# In fact the same dataset simply lies a level deeper which invokes a dataset_toplevel change
if not (prev_scope['pseudocode'] == subscope['pseudocode']
and prev_scope['experiment_type'] == subscope['experiment_type']
and prev_scope['wave'] == subscope['wave']
and path == prev_scope['dataset_directory']):
# Found a deeper lying dataset with more specific attributes
# Prepwork for being able to create a dataset_id
prev_scope['directory'] = prev_scope["dataset_directory"]
if 'version' not in prev_scope:
prev_scope['version'] = 'Raw'

# If still present (could have been removed in loop earlier) remove dataset_toplevel on prev_scope['directory']
try:
avu.rmw_from_coll(ctx, prev_scope['directory'], 'dataset_toplevel', "%")
except msi.Error:
pass

# set flag correctly for creating of new toplevel
new_deeper_dataset_toplevel = True

subscope["dataset_directory"] = path
apply_dataset_metadata(ctx, path, subscope, True, new_deeper_dataset_toplevel)
if metadata_update["in_dataset"]:
apply_dataset_metadata(ctx, path, metadata_update["new_metadata"], True, parent_in_dataset)
if not parent_in_dataset:
# We found a new top-level dataset data object.
found_datasets.append(metadata_update["new_metadata"])
else:
apply_partial_metadata(ctx, metadata_update["new_metadata"], path, True)

scan_mark_scanned(ctx, path, True)
else:
subscope = intake_extract_tokens_from_name(ctx, path, dirname, True, subscope)

if intake_tokens_identify_dataset(subscope):
child_in_dataset = True
# We found a top-level dataset collection.
subscope["dataset_directory"] = path
apply_dataset_metadata(ctx, path, subscope, True, True)
# For reporting purposes collect the subscopes
found_datasets.append(subscope)
else:
apply_partial_metadata(ctx, subscope, path, True)
# Go a level deeper
found_datasets = intake_scan_collection(ctx, path, subscope, child_in_dataset, found_datasets)
found_datasets = intake_scan_collection(ctx,
path,
metadata_update["new_metadata"],
parent_in_dataset or metadata_update["in_dataset"],
found_datasets)

return found_datasets

Expand Down
61 changes: 56 additions & 5 deletions intake_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,25 @@ def intake_tokens_identify_dataset(tokens):
return (missing == 0)


def intake_extract_tokens_from_name(ctx, path, name, is_collection, scoped_buffer):
def intake_ensure_version_present(ctx, metadata):
"""Adds a version attribute with a default value to metadata if it is not yet present.
:param ctx: Combined type of a callback and rei struct
:param metadata: Dictionary with intake module metadata
"""
if "version" not in metadata:
metadata["version"] = "Raw"


def intake_extract_tokens_from_name(ctx, path, scoped_buffer):
"""Extract one or more tokens from a file / directory name and add dataset information as metadata.
:param ctx: Combined type of a callback and rei struct
:param path: Path to object or collection
:param name: Name of object or collection
:param is_collection: Indicates if object or collection
:param path: Full path of the data object or collection
:param scoped_buffer: Holds dataset buffer with prefilled keys
:returns: Returns extended scope buffer
"""
name_without_ext = os.path.splitext(name)[0]
basename = os.path.basename(path)
name_without_ext = os.path.splitext(basename)[0]
parts = re.split("[_-]", name_without_ext)
for part in parts:
scoped_buffer.update(intake_extract_tokens(ctx, part))
Expand Down Expand Up @@ -120,3 +129,45 @@ def intake_extract_tokens(ctx, string):
foundKVs["experiment_type"] = str_lower

return foundKVs


def intake_scan_get_metadata_update(ctx, path, is_collection, in_dataset, parent_metadata):
"""Determine metadata to be updated for a particular collection or data object, based
on its name and parent metadata.
This function is separate from the function that actually performs the updates, so
that we can test the logic separately.
:param ctx: Combined type of a callback and rei struct
:param path: Full path of the data object or collection
:param is_collection: true if it's a collection, false if it's a data object
:param in_dataset: true if the parent already has complete WEP(V) attributes. Otherwise false.
:param parent_metadata: dict containing the intake module metadata of the parent collection ( if any)
:returns: Returns a dictionary with the following keys / values:
new_metadata: dictionary of new metadata to apply to this data object or collection
in_dataset: true if current object (along with values passed from parents) has complete WEP(V) values.
otherwise false.
"""

local_metadata = parent_metadata.copy()

result = {"new_metadata": local_metadata, "in_dataset": in_dataset}

if in_dataset:
# If we already are in a dataset, we get all the metadata from the parent. We
# cannot override attributes in this case.
local_metadata["directory"] = path if is_collection else os.path.dirname(path)
else:
intake_extract_tokens_from_name(ctx, path, local_metadata)
if intake_tokens_identify_dataset(local_metadata):
intake_ensure_version_present(ctx, local_metadata)
local_metadata["directory"] = path if is_collection else os.path.dirname(path)
local_metadata["dataset_directory"] = path if is_collection else os.path.dirname(path)
result["in_dataset"] = True
else:
# result["in_dataset"] is already set false
# No need to set version yet. That will be done when WEP(V) values are complete
pass

return result
73 changes: 71 additions & 2 deletions unit-tests/test_intake.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
__copyright__ = 'Copyright (c) 2019-2021, Utrecht University'
__license__ = 'GPLv3, see LICENSE'

import os
import sys
from unittest import TestCase

sys.path.append('..')

from intake_utils import intake_extract_tokens, intake_extract_tokens_from_name, intake_tokens_identify_dataset
from intake_utils import intake_extract_tokens, intake_extract_tokens_from_name, intake_scan_get_metadata_update, intake_tokens_identify_dataset


class IntakeTest(TestCase):
Expand Down Expand Up @@ -44,9 +45,77 @@ def test_intake_extract_tokens(self):

def test_intake_extract_tokens_from_name(self):
buffer = dict()
output = intake_extract_tokens_from_name(None, None, "20w_chantigap_B12345_VerABC.txt", None, buffer)
output = intake_extract_tokens_from_name(None, "20w_chantigap_B12345_VerABC.txt", buffer)
self.assertEquals(len(output), 4)
self.assertEquals(output["wave"], "20w")
self.assertEquals(output["experiment_type"], "chantigap")
self.assertEquals(output["version"], "ABC")
self.assertEquals(output["pseudocode"], "B12345")

def test_intake_scan_get_metadata_update_coll_in_dataset(self):
complete_metadata = {"wave": "1", "pseudocode": "2", "experiment_type": "3"}
path = "/foo/bar/chantigap_10w_B12345/chantigap_20w_B12346"
output = intake_scan_get_metadata_update(None, path, True, True, complete_metadata)
self.assertEquals(output["in_dataset"], True)
self.assertEquals(len(output["new_metadata"]), 4)
self.assertEquals(output["new_metadata"]["directory"], path)
self.assertEquals(output["new_metadata"]["wave"], "1")
self.assertEquals(output["new_metadata"]["pseudocode"], "2")
self.assertEquals(output["new_metadata"]["experiment_type"], "3")

def test_intake_scan_get_metadata_update_coll_out_dataset_complete(self):
incomplete_metadata = {"wave": "1", "pseudocode": "2"}
path = "/foo/bar/chantigap_10w_B12345/chantigap_B12346"
output = intake_scan_get_metadata_update(None, path, True, False, incomplete_metadata)
self.assertEquals(output["in_dataset"], True)
self.assertEquals(len(output["new_metadata"]), 6)
self.assertEquals(output["new_metadata"]["directory"], path)
self.assertEquals(output["new_metadata"]["dataset_directory"], path)
self.assertEquals(output["new_metadata"]["wave"], "1")
self.assertEquals(output["new_metadata"]["version"], "Raw")
self.assertEquals(output["new_metadata"]["pseudocode"], "B12346")
self.assertEquals(output["new_metadata"]["experiment_type"], "chantigap")

def test_intake_scan_get_metadata_update_coll_out_dataset_incomplete(self):
incomplete_metadata = {"wave": "1"}
path = "/foo/bar/chantigap_10w_B12345/B12346"
output = intake_scan_get_metadata_update(None, path, True, False, incomplete_metadata)
self.assertEquals(output["in_dataset"], False)
self.assertEquals(len(output["new_metadata"]), 2)
self.assertEquals(output["new_metadata"]["wave"], "1")
self.assertEquals(output["new_metadata"]["pseudocode"], "B12346")

def test_intake_scan_get_metadata_update_do_in_dataset(self):
complete_metadata = {"wave": "1", "pseudocode": "2", "experiment_type": "3"}
path = "/foo/bar/chantigap_10w_B12345/chantigap_20w_B12346.txt"
coll = os.path.dirname(path)
output = intake_scan_get_metadata_update(None, path, False, True, complete_metadata)
self.assertEquals(output["in_dataset"], True)
self.assertEquals(len(output["new_metadata"]), 4)
self.assertEquals(output["new_metadata"]["directory"], coll)
self.assertEquals(output["new_metadata"]["wave"], "1")
self.assertEquals(output["new_metadata"]["pseudocode"], "2")
self.assertEquals(output["new_metadata"]["experiment_type"], "3")

def test_intake_scan_get_metadata_update_do_out_dataset_complete(self):
incomplete_metadata = {"wave": "1", "pseudocode": "2"}
path = "/foo/bar/chantigap_10w_B12345/chantigap_B12346.txt"
coll = os.path.dirname(path)
output = intake_scan_get_metadata_update(None, path, False, False, incomplete_metadata)
self.assertEquals(output["in_dataset"], True)
self.assertEquals(len(output["new_metadata"]), 6)
self.assertEquals(output["new_metadata"]["directory"], coll)
self.assertEquals(output["new_metadata"]["dataset_directory"], coll)
self.assertEquals(output["new_metadata"]["wave"], "1")
self.assertEquals(output["new_metadata"]["version"], "Raw")
self.assertEquals(output["new_metadata"]["pseudocode"], "B12346")
self.assertEquals(output["new_metadata"]["experiment_type"], "chantigap")

def test_intake_scan_get_metadata_update_do_out_dataset_incomplete(self):
incomplete_metadata = {"wave": "1"}
path = "/foo/bar/chantigap_10w_B12345/B12346.txt"
output = intake_scan_get_metadata_update(None, path, False, False, incomplete_metadata)
self.assertEquals(output["in_dataset"], False)
self.assertEquals(len(output["new_metadata"]), 2)
self.assertEquals(output["new_metadata"]["wave"], "1")
self.assertEquals(output["new_metadata"]["pseudocode"], "B12346")

0 comments on commit 4b9bde3

Please sign in to comment.