Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new search index and function on release #206

Open
wants to merge 14 commits into
base: 3.2.0
Choose a base branch
from
29 changes: 29 additions & 0 deletions mongo_db_script/updateDataRecordsScript.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Backfill studyID on every dataRecords document by copying it from the
// owning submission (joined via submissionID), then write the result back.
const backfillStudyIdPipeline = [
    // Join each data record with its parent submission document.
    {
        $lookup: {
            from: "submissions",
            localField: "submissionID",
            foreignField: "_id",
            as: "submission_info"
        }
    },
    // Flatten the joined array so the submission is a single sub-document.
    { $unwind: "$submission_info" },
    // Copy the submission's studyID onto the data record.
    { $set: { "studyID": "$submission_info.studyID" } },
    // Write the enriched documents back into dataRecords; documents whose
    // submission lookup produced no match never reach this stage, and merge
    // output rows with no existing target document are discarded.
    {
        $merge: {
            into: "dataRecords",
            whenMatched: "merge",
            whenNotMatched: "discard"
        }
    }
];

db.dataRecords.aggregate(backfillStudyIdPipeline);
59 changes: 59 additions & 0 deletions mongo_db_script/updateReleaseScript.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Backfill studyID on every release document by joining the owning
// submission (via submissionID) and merging the result back into release.
db.release.aggregate([
{
// Lookup to join submissions collection based on submissionID
$lookup: {
from: "submissions",
localField: "submissionID",
foreignField: "_id",
as: "submission_info"
}
},
{
// Unwind to access the submission_info document as a single object
$unwind: "$submission_info"
},
{
// Set the studyID in release based on the studyID from the submissions
$set: {
"studyID": "$submission_info.studyID"
}
},
{
// Merge the updates back into the release collection; output documents
// with no matching release record are discarded rather than inserted
$merge: {
into: "release",
whenMatched: "merge",
whenNotMatched: "discard"
}
}
]);


// Normalize release.entityType from the fine-grained nodeType values.
// One updateMany per entity category, issued in the same order as before.
const entityTypeMappings = [
    { nodeTypes: ["program"], entityType: "Program" },
    { nodeTypes: ["study"], entityType: "Study" },
    { nodeTypes: ["participant", "subject", "case"], entityType: "Participant" },
    { nodeTypes: ["sample", "specimen"], entityType: "Sample" },
    // NOTE(review): "study_personnel" also maps to "Principal Investigator"
    // here — confirm that is the intended classification.
    { nodeTypes: ["principal_investigator", "study_personnel"], entityType: "Principal Investigator" },
    { nodeTypes: ["file", "data_file", "clinical_measure_file", "cytogenomic_file", "radiology_file", "methylation_array_file", "sequencing_file"], entityType: "File" }
];

entityTypeMappings.forEach(function (mapping) {
    db.release.updateMany(
        { "nodeType": { "$in": mapping.nodeTypes } },
        { "$set": { "entityType": mapping.entityType } }
    );
});


1 change: 1 addition & 0 deletions src/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,4 @@
VALIDATION_TYPE_METADATA = "metadata"
VALIDATION_TYPE_FILE= "data file"
BATCH_IDS = "batchIDs"
QC_VALIDATE_DATE = "validatedDate"
63 changes: 41 additions & 22 deletions src/common/mongo_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
VALUE_PROP, ERRORS, WARNINGS, VALIDATED_AT, STATUS_ERROR, STATUS_WARNING, PARENT_ID_NAME, \
SUBMISSION_REL_STATUS, SUBMISSION_REL_STATUS_DELETED, STUDY_ABBREVIATION, SUBMISSION_STATUS, STUDY_ID, \
CROSS_SUBMISSION_VALIDATION_STATUS, ADDITION_ERRORS, VALIDATION_COLLECTION, VALIDATION_ENDED, CONFIG_COLLECTION, \
BATCH_BUCKET, CDE_COLLECTION, CDE_CODE, CDE_VERSION, ENTITY_TYPE, QC_COLLECTION, QC_RESULT_ID
BATCH_BUCKET, CDE_COLLECTION, CDE_CODE, CDE_VERSION, ENTITY_TYPE, QC_COLLECTION, QC_RESULT_ID, SUBMISSION_REL_STATUS_RELEASED
from common.utils import get_exception_msg, current_datetime

MAX_SIZE = 10000
Expand Down Expand Up @@ -601,7 +601,7 @@ def find_submissions(self, query):
"""
set dataRecords search index, 'submissionID_nodeType_nodeID'
"""
def set_search_index_dataRecords(self, submission_index, crdc_index):
def set_search_index_dataRecords(self, submission_index, crdc_index, study_entity_type_index):
db = self.client[self.db_name]
data_collection = db[DATA_COLlECTION]
try:
Expand All @@ -612,6 +612,9 @@ def set_search_index_dataRecords(self, submission_index, crdc_index):
if not index_dict.get(crdc_index):
result = data_collection.create_index([(DATA_COMMON_NAME), (NODE_TYPE),(NODE_ID)], \
name=crdc_index)
if not index_dict.get(study_entity_type_index):
result = data_collection.create_index([(STUDY_ID), (ENTITY_TYPE),(NODE_ID)], \
name=study_entity_type_index)
return True
except errors.PyMongoError as pe:
self.log.exception(pe)
Expand All @@ -625,7 +628,7 @@ def set_search_index_dataRecords(self, submission_index, crdc_index):
"""
set release search index, 'dataCommons_nodeType_nodeID'
"""
def set_search_release_index(self, dataCommon_index, crdcID_index):
def set_search_release_index(self, dataCommon_index, crdcID_index, study_entity_type_index):
db = self.client[self.db_name]
data_collection = db[RELEASE_COLLECTION]
try:
Expand All @@ -636,6 +639,9 @@ def set_search_release_index(self, dataCommon_index, crdcID_index):
if not index_dict or not index_dict.get(crdcID_index):
result = data_collection.create_index([(CRDC_ID)], \
name=crdcID_index)
if not index_dict.get(study_entity_type_index):
result = data_collection.create_index([(STUDY_ID), (ENTITY_TYPE),(NODE_ID)], \
name=study_entity_type_index)
return True
except errors.PyMongoError as pe:
self.log.exception(pe)
Expand Down Expand Up @@ -771,7 +777,7 @@ def search_node(self, data_commons, node_type, node_id):
released_nodes = [node for node in results if node.get(SUBMISSION_REL_STATUS) != SUBMISSION_REL_STATUS_DELETED ]
if len(released_nodes) == 0:
# search dataRecords
deleted_submission_ids = [rel[SUBMISSION_ID] for rel in results if rel.get(SUBMISSION_REL_STATUS) == SUBMISSION_REL_STATUS_DELETED ]
deleted_submission_ids = [rel[SUBMISSION_ID] for rel in results if SUBMISSION_REL_STATUS_DELETED in rel.get(SUBMISSION_REL_STATUS)]
rtn_val = self.search_node_by_index_crdc(data_commons, node_type, node_id, deleted_submission_ids)
else:
rtn_val = released_nodes[0]
Expand All @@ -794,25 +800,12 @@ def search_node_by_study(self, studyID, entity_type, node_id):
:return:
"""
db = self.client[self.db_name]
data_collection = db[RELEASE_COLLECTION]
data_collection = db[DATA_COLlECTION]
try:
submissions = self.find_submissions({STUDY_ID: studyID})
if len(submissions) < 2: #if there is only one submission that's own submission, skip.
return None
submission_id_list = [item[ID] for item in submissions]
results = data_collection.find({ENTITY_TYPE: entity_type, NODE_ID: node_id, SUBMISSION_ID: {"$in": submission_id_list}})
released_nodes = [node for node in results if node.get(SUBMISSION_REL_STATUS) != SUBMISSION_REL_STATUS_DELETED ]
if len(released_nodes) == 0:
deleted_submission_ids = [rel[SUBMISSION_ID] for rel in results if rel.get(SUBMISSION_REL_STATUS) == SUBMISSION_REL_STATUS_DELETED ]
submission_id_list = [item for item in submission_id_list if item not in deleted_submission_ids]
if len(submission_id_list) < 2:
return None
# search dataRecords
data_collection = db[DATA_COLlECTION]
rtn_val = data_collection.find_one({ENTITY_TYPE: entity_type, NODE_ID: node_id, SUBMISSION_ID: {"$in": submission_id_list}})
else:
rtn_val = released_nodes[0]
return rtn_val
existed_node = self.search_released_node_by_study(studyID, entity_type, node_id)
if not existed_node:
existed_node = data_collection.find_one({STUDY_ID: studyID, ENTITY_TYPE: entity_type, NODE_ID: node_id})
return existed_node
except errors.PyMongoError as pe:
self.log.exception(pe)
self.log.exception(f"Failed to search node for study: {get_exception_msg()}")
Expand All @@ -821,6 +814,32 @@ def search_node_by_study(self, studyID, entity_type, node_id):
self.log.exception(e)
self.log.exception(f"Failed to search node for study {get_exception_msg()}")
return None

def search_released_node_by_study(self, studyID, entity_type, node_id):
"""
Search release collection for given node
:param data_commons:
:param node_type:
:param node_id:
:return:
"""
db = self.client[self.db_name]
data_collection = db[RELEASE_COLLECTION]
try:
query = {STUDY_ID: studyID, ENTITY_TYPE: entity_type, NODE_ID: node_id}
results = list(data_collection.find(query))
if not results or len(results) == 0:
return None
released_node = next(node for node in results if SUBMISSION_REL_STATUS_DELETED not in node.get(SUBMISSION_REL_STATUS))
return released_node
except errors.PyMongoError as pe:
self.log.exception(pe)
self.log.exception(f"Failed to find release record for {studyID}/{entity_type}/{node_id}: {get_exception_msg()}")
return None
except Exception as e:
self.log.exception(e)
self.log.exception(f"Failed to find release record for {studyID}/{entity_type}/{node_id}: {get_exception_msg()}")
return None

def search_released_node(self, data_commons, node_type, node_id):
"""
Expand Down
4 changes: 4 additions & 0 deletions src/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ def get_crdc_id(self, exist_node, node_type, node_id, studyID):
if studyID and node_id and entity_type:
result = self.mongo_dao.search_node_by_study(studyID, entity_type, node_id)
crdc_id = result.get(CRDC_ID) if result else None
# if the crdc_id can't be identified from the existing dataset, a new crdc_id should be generated.
if not crdc_id:
crdc_id = get_uuid_str()

else:
crdc_id = exist_node.get(CRDC_ID)
return crdc_id
Expand Down
6 changes: 3 additions & 3 deletions src/file_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from common.constants import ERRORS, WARNINGS, STATUS, S3_FILE_INFO, ID, SIZE, MD5, UPDATED_AT, \
FILE_NAME, SQS_TYPE, SQS_NAME, FILE_ID, STATUS_ERROR, STATUS_WARNING, STATUS_PASSED, SUBMISSION_ID, \
BATCH_BUCKET, SERVICE_TYPE_FILE, LAST_MODIFIED, CREATED_AT, TYPE, SUBMISSION_INTENTION, SUBMISSION_INTENTION_DELETE,\
VALIDATION_ID, VALIDATION_ENDED, QC_RESULT_ID, BATCH_IDS, VALIDATION_TYPE_FILE, QC_SEVERITY
VALIDATION_ID, VALIDATION_ENDED, QC_RESULT_ID, VALIDATION_TYPE_FILE, QC_SEVERITY, QC_VALIDATE_DATE
from common.utils import get_exception_msg, current_datetime, get_s3_file_info, get_s3_file_md5, create_error, get_uuid_str
from service.ecs_agent import set_scale_in_protection
from metadata_validator import get_qc_result
Expand Down Expand Up @@ -406,13 +406,13 @@ def save_qc_result(self, fileRecord, status, error):
qc_result = self.mongo_dao.get_qcRecord(fileRecord[S3_FILE_INFO][QC_RESULT_ID])
if status == STATUS_ERROR or status == STATUS_WARNING:
if not qc_result:
qc_result = get_qc_result(fileRecord, self.submission, VALIDATION_TYPE_FILE, self.mongo_dao)
qc_result = get_qc_result(fileRecord, VALIDATION_TYPE_FILE, self.mongo_dao)
self.set_status(fileRecord, qc_result, status, error)
if status == STATUS_PASSED and qc_result:
self.mongo_dao.delete_qcRecord(qc_result[ID])
qc_result = None
fileRecord[S3_FILE_INFO][QC_RESULT_ID] = None
if qc_result: # save QC result
fileRecord[S3_FILE_INFO][QC_RESULT_ID] = qc_result[ID]
qc_result["validatedDate"] = current_datetime()
qc_result[QC_VALIDATE_DATE] = current_datetime()
self.mongo_dao.save_qc_results([qc_result])
62 changes: 32 additions & 30 deletions src/metadata_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
VALIDATED_AT, SERVICE_TYPE_METADATA, NODE_ID, PROPERTIES, PARENTS, KEY, NODE_ID, PARENT_TYPE, PARENT_ID_NAME, PARENT_ID_VAL, \
SUBMISSION_INTENTION, SUBMISSION_INTENTION_NEW_UPDATE, SUBMISSION_INTENTION_DELETE, TYPE_METADATA_VALIDATE, TYPE_CROSS_SUBMISSION, \
SUBMISSION_REL_STATUS_RELEASED, VALIDATION_ID, VALIDATION_ENDED, CDE_TERM, TERM_CODE, TERM_VERSION, CDE_PERMISSIVE_VALUES, \
QC_RESULT_ID, BATCH_IDS, VALIDATION_TYPE_METADATA, S3_FILE_INFO, VALIDATION_TYPE_FILE, QC_SEVERITY
QC_RESULT_ID, BATCH_IDS, VALIDATION_TYPE_METADATA, S3_FILE_INFO, VALIDATION_TYPE_FILE, QC_SEVERITY, QC_VALIDATE_DATE
from common.utils import current_datetime, get_exception_msg, dump_dict_to_json, create_error, get_uuid_str
from common.model_store import ModelFactory
from common.model_reader import valid_prop_types
Expand Down Expand Up @@ -163,33 +163,33 @@ def validate_nodes(self, data_records):
if record.get(QC_RESULT_ID):
qc_result = self.mongo_dao.get_qcRecord(record[QC_RESULT_ID])
status, errors, warnings = self.validate_node(record)
if errors and len(errors) > 0:
self.isError = True
if not qc_result:
qc_result = get_qc_result(record, self.submission, VALIDATION_TYPE_METADATA, self.mongo_dao)
qc_result[ERRORS] = errors
else:
if qc_result:
qc_result[ERRORS] = []
if warnings and len(warnings)> 0:
self.isWarning = True
if not qc_result:
qc_result = get_qc_result(record, self.submission, VALIDATION_TYPE_METADATA, self.mongo_dao)
qc_result[WARNINGS] = warnings
else:
if qc_result:
qc_result[WARNINGS] = []

if status == STATUS_PASSED:
if qc_result:
self.mongo_dao.delete_qcRecord(qc_result[ID])
qc_result = None
record[QC_RESULT_ID] = None
else:
qc_result[QC_SEVERITY] = STATUS_ERROR if self.isError else STATUS_WARNING
qc_result["validatedDate"] = current_datetime()
if not qc_result:
record[QC_RESULT_ID] = None
qc_result = get_qc_result(record, VALIDATION_TYPE_METADATA, self.mongo_dao)
if errors and len(errors) > 0:
self.isError = True
qc_result[ERRORS] = errors
qc_result[QC_SEVERITY] = STATUS_ERROR
else:
qc_result[ERRORS] = []
if warnings and len(warnings)> 0:
self.isWarning = True
qc_result[WARNINGS] = warnings
if not errors or len(errors) == 0:
qc_result[QC_SEVERITY] = STATUS_WARNING
else:
qc_result[WARNINGS] = []

qc_result[QC_VALIDATE_DATE] = current_datetime()
qc_results.append(qc_result)
record[QC_RESULT_ID] = qc_result[ID]

record[STATUS] = status
record[UPDATED_AT] = record[VALIDATED_AT] = current_datetime()
updated_records.append(record)
Expand Down Expand Up @@ -540,10 +540,13 @@ def validate_prop_value(self, prop_name, value, prop_def, msg_prefix):
def get_permissive_value(self, prop_def):
permissive_vals = prop_def.get("permissible_values")
if prop_def.get(CDE_TERM) and len(prop_def.get(CDE_TERM)) > 0:
# retrieve permissible values from DB
cde_term = prop_def[CDE_TERM][0]
cde_code = cde_term.get(TERM_CODE)
cde_version = cde_term.get(TERM_VERSION)
# retrieve permissible values from DB or cde site
cde_code = None
cde_terms = [ct for ct in prop_def[CDE_TERM] if 'caDSR' in ct.get('Origin', '')]
if cde_terms and len(cde_terms):
cde_code = cde_terms[0].get(TERM_CODE)
cde_version = cde_terms[0].get(TERM_VERSION)

if not cde_code:
return permissive_vals

Expand All @@ -565,8 +568,7 @@ def get_permissive_value(self, prop_def):
permissive_vals = None #escape validation
self.mongo_dao.insert_cde([cde])
return permissive_vals



"""util functions"""
def check_permissive(value, permissive_vals, msg_prefix, prop_name):
result = True,
Expand Down Expand Up @@ -595,18 +597,18 @@ def check_boundary(value, min, max, msg_prefix, prop_name):
"""
get qc result for the node record by qc_id
"""
def get_qc_result(node, submission, validation_type, mongo_dao):
def get_qc_result(node, validation_type, mongo_dao):
qc_id = node.get(QC_RESULT_ID) if validation_type == VALIDATION_TYPE_METADATA else node[S3_FILE_INFO].get(QC_RESULT_ID)
rc_result = None
if not qc_id:
rc_result = create_new_qc_result(node, submission, validation_type)
rc_result = create_new_qc_result(node, validation_type)
else:
rc_result = mongo_dao.get_qcRecord(qc_id)
if not rc_result:
rc_result = create_new_qc_result(node, submission, validation_type)
rc_result = create_new_qc_result(node, validation_type)
return rc_result

def create_new_qc_result(node, submission, validation_type):
def create_new_qc_result(node, validation_type):
qc_result = {
ID: get_uuid_str(),
SUBMISSION_ID: node[SUBMISSION_ID],
Expand Down
6 changes: 4 additions & 2 deletions src/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

DATA_RECORDS_SEARCH_INDEX = "submissionID_nodeType_nodeID"
DATA_RECORDS_CRDC_SEARCH_INDEX = "dataCommons_nodeType_nodeID"
DATA_RECORDS_STUDY_ENTITY_INDEX = 'studyID_entityType_nodeID'
RELEASE_SEARCH_INDEX = "dataCommons_nodeType_nodeID"
CRDCID_SEARCH_INDEX = "CRDC_ID"
CDE_SEARCH_INDEX = 'CDECode_1_CDEVersion_1'

#Set log file prefix for bento logger
if LOG_PREFIX not in os.environ:
os.environ[LOG_PREFIX] = 'Validation Service'
Expand All @@ -47,11 +49,11 @@ def controller():
job_queue = Queue(configs[SQS_NAME])
mongo_dao = MongoDao(configs[MONGO_DB], configs[DB])
# set dataRecord search index
if not mongo_dao.set_search_index_dataRecords(DATA_RECORDS_SEARCH_INDEX, DATA_RECORDS_CRDC_SEARCH_INDEX):
if not mongo_dao.set_search_index_dataRecords(DATA_RECORDS_SEARCH_INDEX, DATA_RECORDS_CRDC_SEARCH_INDEX, DATA_RECORDS_STUDY_ENTITY_INDEX):
log.error("Failed to set dataRecords search index!")
return 1
# set release search index
if not mongo_dao.set_search_release_index(RELEASE_SEARCH_INDEX , CRDCID_SEARCH_INDEX):
if not mongo_dao.set_search_release_index(RELEASE_SEARCH_INDEX, CRDCID_SEARCH_INDEX, DATA_RECORDS_STUDY_ENTITY_INDEX):
log.error("Failed to set release search index!")
return 1

Expand Down