Skip to content

Commit

Permalink
YDA-5460 Improve intake collection scan logic
Browse files Browse the repository at this point in the history
- Extract logic for collection scans intake module to separate utility
  function, so that we can test it separately.
- Add error logging for intake scan failures
- Add some basic unit tests
- Ensure that we can't overwrite WEP(V) values once we are in a dataset
  (YDA-5460). WEP(V) values can only be overwritten as long as we don't have a
  complete Wave / Experiment type / Pseudocode set
  • Loading branch information
stsnel committed Oct 4, 2023
1 parent 2418d9b commit 7472267
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 154 deletions.
2 changes: 2 additions & 0 deletions intake.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import fnmatch
import time
import traceback

import genquery

Expand Down Expand Up @@ -407,6 +408,7 @@ def api_intake_scan_for_datasets(ctx, coll):
try:
_intake_scan_for_datasets(ctx, coll)
except Exception:
log.write(ctx, "Intake scan failed with the following exception: " + traceback.format_exc())
return {"proc_status": "NOK", "error_msg": "Error during scanning process"}
else:
return {"proc_status": "NOK", "error_msg": "No permissions to scan collection"}
Expand Down
182 changes: 35 additions & 147 deletions intake_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import genquery

import intake
from intake_utils import intake_extract_tokens_from_name, intake_tokens_identify_dataset
from intake_utils import dataset_parse_id, intake_scan_get_metadata_update
from util import *


Expand Down Expand Up @@ -40,49 +40,23 @@ def intake_scan_collection(ctx, root, scope, in_dataset, found_datasets):
# Determene lock state for object (no collectoin
locked_state = object_is_locked(ctx, path, False)

if not (locked_state['locked'] or locked_state['frozen']):
remove_dataset_metadata(ctx, path, False)
scan_mark_scanned(ctx, path, False)
if in_dataset:
subscope = scope.copy()

# Safeguard original data
prev_scope = subscope.copy()

# Extract tokens of current name
intake_extract_tokens_from_name(ctx, row[1], row[0], False, subscope)

new_deeper_dataset_toplevel = False
if not (prev_scope['pseudocode'] == subscope['pseudocode']
and prev_scope['experiment_type'] == subscope['experiment_type']
and prev_scope['wave'] == subscope['wave']):
prev_scope['directory'] = prev_scope["dataset_directory"]
if 'version' not in prev_scope:
prev_scope['version'] = 'Raw'

# It is safe to assume that the dataset was collection based without actively checking.
# Otherwise it would mean that each found file on this level could potentially change the dataset properties
# avu.rm_from_coll(ctx, prev_scope['directory'], 'dataset_toplevel', dataset_make_id(prev_scope))
# If still present (could have been removed in loop earlier) remove dataset_toplevel on prev_scope['directory']
try:
avu.rmw_from_coll(ctx, prev_scope['directory'], 'dataset_toplevel', "%")
except msi.Error:
pass
if locked_state['locked'] or locked_state['frozen']:
continue

new_deeper_dataset_toplevel = True
apply_dataset_metadata(ctx, path, subscope, False, new_deeper_dataset_toplevel)
else:
subscope = intake_extract_tokens_from_name(ctx, row[1], row[0], False, scope.copy())
remove_dataset_metadata(ctx, path, False)
scan_mark_scanned(ctx, path, False)

if intake_tokens_identify_dataset(subscope):
parent_in_dataset = in_dataset
metadata_update = intake_scan_get_metadata_update(ctx, path, False, in_dataset, scope)

if metadata_update["in_dataset"]:
apply_dataset_metadata(ctx, path, metadata_update["new_metadata"], False, parent_in_dataset)
if not parent_in_dataset:
# We found a top-level dataset data object.
subscope["dataset_directory"] = row[1]
apply_dataset_metadata(ctx, path, subscope, False, True)
# For reporting purposes collect the subscopes
found_datasets.append(subscope)
else:
apply_partial_metadata(ctx, subscope, path, False)
avu.set_on_data(ctx, path, "unrecognized", "Experiment type, wave or pseudocode missing from path")
found_datasets.append(metadata_update["new_metadata"])
else:
apply_partial_metadata(ctx, metadata_update["new_metadata"], path, False)
avu.set_on_data(ctx, path, "unrecognized", "Experiment type, wave or pseudocode missing from path")

# Scan collections under root
iter = genquery.row_iterator(
Expand All @@ -100,61 +74,28 @@ def intake_scan_collection(ctx, root, scope, in_dataset, found_datasets):
# get locked /frozen status
locked_state = object_is_locked(ctx, path, True)

if not (locked_state['locked'] or locked_state['frozen']):
remove_dataset_metadata(ctx, path, True)

subscope = scope.copy()
child_in_dataset = in_dataset

if in_dataset: # initially is False
# Safeguard original data
prev_scope = subscope.copy()

# Extract tokens of current name
intake_extract_tokens_from_name(ctx, path, dirname, True, subscope)
if locked_state['locked'] or locked_state['frozen']:
continue

new_deeper_dataset_toplevel = False
remove_dataset_metadata(ctx, path, True)
scan_mark_scanned(ctx, path, True)

# A change in path in relation to the dataset_directory invokes handling of deeper lying dataset.
# This, not necessarily with a dataset id change, but it does change the dataset toplevel
# In fact the same dataset simply lies a level deeper which invokes a dataset_toplevel change
if not (prev_scope['pseudocode'] == subscope['pseudocode']
and prev_scope['experiment_type'] == subscope['experiment_type']
and prev_scope['wave'] == subscope['wave']
and path == prev_scope['dataset_directory']):
# Found a deeper lying dataset with more specific attributes
# Prepwork for being able to create a dataset_id
prev_scope['directory'] = prev_scope["dataset_directory"]
if 'version' not in prev_scope:
prev_scope['version'] = 'Raw'
parent_in_dataset = in_dataset
metadata_update = intake_scan_get_metadata_update(ctx, path, True, in_dataset, scope)

# If still present (could have been removed in loop earlier) remove dataset_toplevel on prev_scope['directory']
try:
avu.rmw_from_coll(ctx, prev_scope['directory'], 'dataset_toplevel', "%")
except msi.Error:
pass

# set flag correctly for creating of new toplevel
new_deeper_dataset_toplevel = True

subscope["dataset_directory"] = path
apply_dataset_metadata(ctx, path, subscope, True, new_deeper_dataset_toplevel)
if metadata_update["in_dataset"]:
apply_dataset_metadata(ctx, path, metadata_update["new_metadata"], True)
if not parent_in_dataset:
# We found a new top-level dataset data object.
found_datasets.append(metadata_update["new_metadata"])
else:
apply_partial_metadata(ctx, metadata_update["new_metadata"], path, True)

scan_mark_scanned(ctx, path, True)
else:
subscope = intake_extract_tokens_from_name(ctx, path, dirname, True, subscope)

if intake_tokens_identify_dataset(subscope):
child_in_dataset = True
# We found a top-level dataset collection.
subscope["dataset_directory"] = path
apply_dataset_metadata(ctx, path, subscope, True, True)
# For reporting purposes collect the subscopes
found_datasets.append(subscope)
else:
apply_partial_metadata(ctx, subscope, path, True)
# Go a level deeper
found_datasets = intake_scan_collection(ctx, path, subscope, child_in_dataset, found_datasets)
found_datasets = intake_scan_collection(ctx,
path,
metadata_update["new_metadata"],
parent_in_dataset or metadata_update["in_dataset"],
found_datasets)

return found_datasets

Expand Down Expand Up @@ -272,45 +213,21 @@ def scan_mark_scanned(ctx, path, is_collection):
avu.set_on_data(ctx, path, 'scanned', user_and_timestamp)


def apply_dataset_metadata(ctx, path, scope, is_collection, is_top_level):
def apply_dataset_metadata(ctx, path, scope, is_collection):
"""Apply dataset metadata to an object in a dataset.
:param ctx: Combined type of a callback and rei struct
:param path: Path to the object
:param scope: A scanner scope containing WEPV values
:param is_collection: Whether the object is a collection
:param is_top_level: If true, a dataset_toplevel field will be set on the object
"""

if "version" not in scope:
version = "Raw"
else:
version = scope["version"]

subscope = {"wave": scope["wave"],
"experiment_type": scope["experiment_type"],
"pseudocode": scope["pseudocode"],
"version": version,
"directory": scope["dataset_directory"]}

subscope["dataset_id"] = dataset_make_id(subscope)

# add all keys to this to this level

for key in subscope:
for key in scope:
if subscope[key]:
if is_collection:
avu.set_on_coll(ctx, path, key, subscope[key])
else:
avu.set_on_data(ctx, path, key, subscope[key])

if is_top_level:
# Add dataset_id to dataset_toplevel
if is_collection:
avu.set_on_coll(ctx, path, 'dataset_toplevel', subscope["dataset_id"])
else:
avu.set_on_data(ctx, path, 'dataset_toplevel', subscope["dataset_id"])


def apply_partial_metadata(ctx, scope, path, is_collection):
"""Apply any available id component metadata to the given object.
Expand Down Expand Up @@ -534,32 +451,3 @@ def get_aggregated_object_error_count(ctx, dataset_id, tl_collection):
"COLL_NAME like '" + tl_collection + "%' AND META_DATA_ATTR_NAME = 'error' ",
genquery.AS_LIST, ctx
)))


def dataset_make_id(scope):
"""Construct a dataset based on WEPV and directory.
:param scope: Create a dataset id
:returns: Dataset identifier
"""
return scope['wave'] + '\t' + scope['experiment_type'] + '\t' + scope['pseudocode'] + '\t' + scope['version'] + '\t' + scope['directory']


def dataset_parse_id(dataset_id):
"""Parse a dataset into its consructive data.
:param dataset_id: Dataset identifier
:returns: Dataset as a dict
"""
dataset_parts = dataset_id.split('\t')
dataset = {}
dataset['wave'] = dataset_parts[0]
dataset['experiment_type'] = dataset_parts[1] # Is DIT NOG ERGENS GOED VOOR
dataset['expType'] = dataset_parts[1]
dataset['pseudocode'] = dataset_parts[2]
dataset['version'] = dataset_parts[3]
dataset['directory'] = dataset_parts[4]

return dataset
93 changes: 88 additions & 5 deletions intake_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,25 @@ def intake_tokens_identify_dataset(tokens):
return (missing == 0)


def intake_extract_tokens_from_name(ctx, path, name, is_collection, scoped_buffer):
def intake_ensure_version_present(ctx, metadata):
"""Adds a version attribute with a default value to metadata if it is not yet present.
:param ctx: Combined type of a callback and rei struct
:param metadata: Dictionary with intake module metadata
"""
if "version" not in metadata:
metadata["version"] = "Raw"


def intake_extract_tokens_from_name(ctx, path, scoped_buffer):
"""Extract one or more tokens from a file / directory name and add dataset information as metadata.
:param ctx: Combined type of a callback and rei struct
:param path: Path to object or collection
:param name: Name of object or collection
:param is_collection: Indicates if object or collection
:param path: Full path of the data object or collection
:param scoped_buffer: Holds dataset buffer with prefilled keys
:returns: Returns extended scope buffer
"""
name_without_ext = os.path.splitext(name)[0]
basename = os.path.basename(path)
name_without_ext = os.path.splitext(basename)[0]
parts = re.split("[_-]", name_without_ext)
for part in parts:
scoped_buffer.update(intake_extract_tokens(ctx, part))
Expand Down Expand Up @@ -120,3 +129,77 @@ def intake_extract_tokens(ctx, string):
foundKVs["experiment_type"] = str_lower

return foundKVs


def intake_scan_get_metadata_update(ctx, path, is_collection, in_dataset, parent_metadata):
"""Determine metadata to be updated for a particular collection or data object, based
on its name and parent metadata.
This function is separate from the function that actually performs the updates, so
that we can test the logic separately.
:param ctx: Combined type of a callback and rei struct
:param path: Full path of the data object or collection
:param is_collection: true if it's a collection, false if it's a data object
:param in_dataset: true if the parent already has complete WEP(V) attributes. Otherwise false.
:param parent_metadata: dict containing the intake module metadata of the parent collection ( if any)
:returns: Returns a dictionary with the following keys / values:
new_metadata: dictionary of new metadata to apply to this data object or collection
in_dataset: true if current object (along with values passed from parents) has complete WEP(V) values.
otherwise false.
"""

local_metadata = parent_metadata.copy()

result = {"new_metadata": local_metadata, "in_dataset": in_dataset}

if in_dataset:
# If we already are in a dataset, we get all the metadata from the parent. We
# cannot override attributes in this case. However we need to remove the top-level
# attribute, because the present object is within in a dataset, and thus not a top-level
# data object.
if "dataset_toplevel" in local_metadata:
del [local_metadata["dataset_toplevel"]]
else:
intake_extract_tokens_from_name(ctx, path, local_metadata)
if intake_tokens_identify_dataset(local_metadata):
intake_ensure_version_present(ctx, local_metadata)
local_metadata["directory"] = path if is_collection else os.path.dirname(path)
local_metadata["dataset_id"] = dataset_make_id(local_metadata)
local_metadata["dataset_toplevel"] = dataset_make_id(local_metadata)
result["in_dataset"] = True
else:
# result["in_dataset"] is already set to false
pass

return result


def dataset_make_id(scope):
"""Construct a dataset based on WEPV and directory.
:param scope: Create a dataset id
:returns: Dataset identifier
"""
return scope['wave'] + '\t' + scope['experiment_type'] + '\t' + scope['pseudocode'] + '\t' + scope['version'] + '\t' + scope['directory']


def dataset_parse_id(dataset_id):
"""Parse a dataset into its consructive data.
:param dataset_id: Dataset identifier
:returns: Dataset as a dict
"""
dataset_parts = dataset_id.split('\t')
dataset = {}
dataset['wave'] = dataset_parts[0]
dataset['experiment_type'] = dataset_parts[1] # Is DIT NOG ERGENS GOED VOOR
dataset['expType'] = dataset_parts[1]
dataset['pseudocode'] = dataset_parts[2]
dataset['version'] = dataset_parts[3]
dataset['directory'] = dataset_parts[4]

return dataset
Loading

0 comments on commit 7472267

Please sign in to comment.