diff --git a/mongo_db_script/updateDataRecordsScript.js b/mongo_db_script/updateDataRecordsScript.js
new file mode 100644
index 0000000..eea25a3
--- /dev/null
+++ b/mongo_db_script/updateDataRecordsScript.js
@@ -0,0 +1,29 @@
+db.dataRecords.aggregate([
+    {
+        // Lookup to join submissions collection based on submissionID
+        $lookup: {
+            from: "submissions",
+            localField: "submissionID",
+            foreignField: "_id",
+            as: "submission_info"
+        }
+    },
+    {
+        // Unwind to access the submission_info document as a single object
+        $unwind: "$submission_info"
+    },
+    {
+        // Set the studyID in dataRecords based on the studyID from the submissions
+        $set: {
+            "studyID": "$submission_info.studyID"
+        }
+    },
+    {
+        // Merge the updates back into the dataRecords collection
+        $merge: {
+            into: "dataRecords",
+            whenMatched: "merge",
+            whenNotMatched: "discard"
+        }
+    }
+]);
diff --git a/mongo_db_script/updateReleaseScript.js b/mongo_db_script/updateReleaseScript.js
new file mode 100644
index 0000000..b3f9c61
--- /dev/null
+++ b/mongo_db_script/updateReleaseScript.js
@@ -0,0 +1,59 @@
+db.release.aggregate([
+    {
+        // Lookup to join submissions collection based on submissionID
+        $lookup: {
+            from: "submissions",
+            localField: "submissionID",
+            foreignField: "_id",
+            as: "submission_info"
+        }
+    },
+    {
+        // Unwind to access the submission_info document as a single object
+        $unwind: "$submission_info"
+    },
+    {
+        // Set the studyID in release based on the studyID from the submissions
+        $set: {
+            "studyID": "$submission_info.studyID"
+        }
+    },
+    {
+        // Merge the updates back into the release collection
+        $merge: {
+            into: "release",
+            whenMatched: "merge",
+            whenNotMatched: "discard"
+        }
+    }
+]);
+
+
+db.release.updateMany({"nodeType": "program"},
+    {"$set": {"entityType": "Program"}}
+);
+
+db.release.updateMany({"nodeType": "study"},
+    {"$set": {"entityType": "Study"}}
+);
+
+db.release.updateMany({"nodeType": {"$in": [ "participant", "subject", "case"]}
+},
+    {"$set": {"entityType": "Participant"}}
+);
+
+db.release.updateMany({"nodeType": {"$in": [ "sample", "specimen"]}
+},
+    {"$set": {"entityType": "Sample"}}
+);
+
+db.release.updateMany({"nodeType": {"$in": [ "principal_investigator", "study_personnel"]}
+},
+    {"$set": {"entityType": "Principal Investigator"}}
+);
+
+db.release.updateMany({"nodeType": {"$in": [ "file", "data_file", "clinical_measure_file", "cytogenomic_file", "radiology_file", "methylation_array_file", "sequencing_file"]}
+}, {"$set": {"entityType": "File"}}
+);
+
+
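The two scripts above are one-off migrations: the $lookup/$unwind/$set/$merge pipeline backfills studyID from the owning submission, and the updateMany calls normalize nodeType values into the coarser entityType labels used by the study-scoped lookups introduced below. A minimal sanity check after running them might look like the sketch that follows; the connection URI and database name are placeholders, and the field names are taken from the scripts above rather than from a verified schema.

from pymongo import MongoClient

# Placeholder connection details; point these at the real deployment.
client = MongoClient("mongodb://localhost:27017")
db = client["crdc-datahub"]  # assumed database name

# Records whose submission was not found are skipped by $unwind and keep no studyID.
print("dataRecords missing studyID:", db.dataRecords.count_documents({"studyID": {"$exists": False}}))
print("release missing studyID:", db.release.count_documents({"studyID": {"$exists": False}}))

# entityType should now contain only the normalized labels set by updateMany.
print("release entityType values:", db.release.distinct("entityType"))
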
diff --git a/src/common/mongo_dao.py b/src/common/mongo_dao.py
index eb81f83..6b66827 100644
--- a/src/common/mongo_dao.py
+++ b/src/common/mongo_dao.py
@@ -7,7 +7,7 @@
     VALUE_PROP, ERRORS, WARNINGS, VALIDATED_AT, STATUS_ERROR, STATUS_WARNING, PARENT_ID_NAME, \
     SUBMISSION_REL_STATUS, SUBMISSION_REL_STATUS_DELETED, STUDY_ABBREVIATION, SUBMISSION_STATUS, STUDY_ID, \
     CROSS_SUBMISSION_VALIDATION_STATUS, ADDITION_ERRORS, VALIDATION_COLLECTION, VALIDATION_ENDED, CONFIG_COLLECTION, \
-    BATCH_BUCKET, CDE_COLLECTION, CDE_CODE, CDE_VERSION, ENTITY_TYPE, QC_COLLECTION, QC_RESULT_ID, CONFIG_TYPE
+    BATCH_BUCKET, CDE_COLLECTION, CDE_CODE, CDE_VERSION, ENTITY_TYPE, QC_COLLECTION, QC_RESULT_ID, CONFIG_TYPE, SUBMISSION_REL_STATUS_RELEASED
 from common.utils import get_exception_msg, current_datetime
 
 MAX_SIZE = 10000
@@ -601,7 +601,7 @@ def find_submissions(self, query):
     """
     set dataRecords search index, 'submissionID_nodeType_nodeID'
     """
-    def set_search_index_dataRecords(self, submission_index, crdc_index):
+    def set_search_index_dataRecords(self, submission_index, crdc_index, study_entity_type_index):
         db = self.client[self.db_name]
         data_collection = db[DATA_COLlECTION]
         try:
@@ -612,6 +612,9 @@ def set_search_index_dataRecords(self, submission_index, crdc_index):
             if not index_dict.get(crdc_index):
                 result = data_collection.create_index([(DATA_COMMON_NAME), (NODE_TYPE),(NODE_ID)], \
                     name=crdc_index)
+            if not index_dict.get(study_entity_type_index):
+                result = data_collection.create_index([(STUDY_ID), (ENTITY_TYPE),(NODE_ID)], \
+                    name=study_entity_type_index)
             return True
         except errors.PyMongoError as pe:
             self.log.exception(pe)
@@ -625,7 +628,7 @@ def set_search_index_dataRecords(self, submission_index, crdc_index):
     """
     set release search index, 'dataCommons_nodeType_nodeID'
     """
-    def set_search_release_index(self, dataCommon_index, crdcID_index):
+    def set_search_release_index(self, dataCommon_index, crdcID_index, study_entity_type_index):
         db = self.client[self.db_name]
         data_collection = db[RELEASE_COLLECTION]
         try:
@@ -636,6 +639,9 @@ def set_search_release_index(self, dataCommon_index, crdcID_index):
             if not index_dict or not index_dict.get(crdcID_index):
                 result = data_collection.create_index([(CRDC_ID)], \
                     name=crdcID_index)
+            if not index_dict.get(study_entity_type_index):
+                result = data_collection.create_index([(STUDY_ID), (ENTITY_TYPE),(NODE_ID)], \
+                    name=study_entity_type_index)
             return True
         except errors.PyMongoError as pe:
             self.log.exception(pe)
@@ -771,7 +777,7 @@ def search_node(self, data_commons, node_type, node_id):
         released_nodes = [node for node in results if node.get(SUBMISSION_REL_STATUS) != SUBMISSION_REL_STATUS_DELETED ]
         if len(released_nodes) == 0:
             # search dataRecords
-            deleted_submission_ids = [rel[SUBMISSION_ID] for rel in results if rel.get(SUBMISSION_REL_STATUS) == SUBMISSION_REL_STATUS_DELETED ]
+            deleted_submission_ids = [rel[SUBMISSION_ID] for rel in results if SUBMISSION_REL_STATUS_DELETED in (rel.get(SUBMISSION_REL_STATUS) or "")]
             rtn_val = self.search_node_by_index_crdc(data_commons, node_type, node_id, deleted_submission_ids)
         else:
             rtn_val = released_nodes[0]
@@ -794,25 +800,12 @@ def search_node_by_study(self, studyID, entity_type, node_id):
         :return:
         """
         db = self.client[self.db_name]
-        data_collection = db[RELEASE_COLLECTION]
+        data_collection = db[DATA_COLlECTION]
         try:
-            submissions = self.find_submissions({STUDY_ID: studyID})
-            if len(submissions) < 2: #if there is only one submission, that's its own submission; skip.
-                return None
-            submission_id_list = [item[ID] for item in submissions]
-            results = data_collection.find({ENTITY_TYPE: entity_type, NODE_ID: node_id, SUBMISSION_ID: {"$in": submission_id_list}})
-            released_nodes = [node for node in results if node.get(SUBMISSION_REL_STATUS) != SUBMISSION_REL_STATUS_DELETED ]
-            if len(released_nodes) == 0:
-                deleted_submission_ids = [rel[SUBMISSION_ID] for rel in results if rel.get(SUBMISSION_REL_STATUS) == SUBMISSION_REL_STATUS_DELETED ]
-                submission_id_list = [item for item in submission_id_list if item not in deleted_submission_ids]
-                if len(submission_id_list) < 2:
-                    return None
-                # search dataRecords
-                data_collection = db[DATA_COLlECTION]
-                rtn_val = data_collection.find_one({ENTITY_TYPE: entity_type, NODE_ID: node_id, SUBMISSION_ID: {"$in": submission_id_list}})
-            else:
-                rtn_val = released_nodes[0]
-            return rtn_val
+            existed_node = self.search_released_node_by_study(studyID, entity_type, node_id)
+            if not existed_node:
+                existed_node = data_collection.find_one({STUDY_ID: studyID, ENTITY_TYPE: entity_type, NODE_ID: node_id})
+            return existed_node
         except errors.PyMongoError as pe:
             self.log.exception(pe)
             self.log.exception(f"Failed to search node for study: {get_exception_msg()}")
@@ -821,6 +814,32 @@ def search_node_by_study(self, studyID, entity_type, node_id):
             self.log.exception(e)
             self.log.exception(f"Failed to search node for study {get_exception_msg()}")
             return None
+
+    def search_released_node_by_study(self, studyID, entity_type, node_id):
+        """
+        Search the release collection for the given node
+        :param studyID:
+        :param entity_type:
+        :param node_id:
+        :return:
+        """
+        db = self.client[self.db_name]
+        data_collection = db[RELEASE_COLLECTION]
+        try:
+            query = {STUDY_ID: studyID, ENTITY_TYPE: entity_type, NODE_ID: node_id}
+            results = list(data_collection.find(query))
+            if not results:
+                return None
+            released_node = next((node for node in results if SUBMISSION_REL_STATUS_DELETED not in (node.get(SUBMISSION_REL_STATUS) or "")), None)
+            return released_node
+        except errors.PyMongoError as pe:
+            self.log.exception(pe)
+            self.log.exception(f"Failed to find release record for {studyID}/{entity_type}/{node_id}: {get_exception_msg()}")
+            return None
+        except Exception as e:
+            self.log.exception(e)
+            self.log.exception(f"Failed to find release record for {studyID}/{entity_type}/{node_id}: {get_exception_msg()}")
+            return None
 
     def search_released_node(self, data_commons, node_type, node_id):
         """
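With the change above, search_node_by_study() no longer walks sibling submissions; it asks search_released_node_by_study() for a non-deleted release record first and only falls back to dataRecords when none exists. The sketch below restates that order with plain pymongo calls; the URI, database name, and literal query values are placeholders, the field names are inferred from the index name studyID_entityType_nodeID, and the real method additionally filters out release rows flagged as deleted.

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder URI
db = client["crdc-datahub"]  # assumed database name

# Same query shape the new studyID_entityType_nodeID index is meant to serve.
query = {"studyID": "STUDY-0001", "entityType": "Participant", "nodeID": "P-0001"}

# Released nodes win; dataRecords is only consulted when no release row exists.
node = db.release.find_one(query) or db.dataRecords.find_one(query)
print(node)
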
diff --git a/src/data_loader.py b/src/data_loader.py
index 59ecdb4..f262644 100644
--- a/src/data_loader.py
+++ b/src/data_loader.py
@@ -188,6 +188,10 @@ def get_crdc_id(self, exist_node, node_type, node_id, studyID):
             if studyID and node_id and entity_type:
                 result = self.mongo_dao.search_node_by_study(studyID, entity_type, node_id)
                 crdc_id = result.get(CRDC_ID) if result else None
+                # if the crdc_id can't be identified from the existing dataset, a new crdc_id should be generated.
+                if not crdc_id:
+                    crdc_id = get_uuid_str()
+
         else:
             crdc_id = exist_node.get(CRDC_ID)
         return crdc_id
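The data_loader change tightens CRDC ID resolution: when the study-scoped lookup finds nothing, the loader now mints a fresh ID instead of leaving crdc_id empty. A simplified, self-contained restatement follows; the real loader uses the project's get_uuid_str() helper and the CRDC_ID constant, and "crdcID" is an assumed field name.

from typing import Optional
import uuid

def resolve_crdc_id(matched_node: Optional[dict]) -> str:
    # Reuse the CRDC ID carried by a previously released or loaded node, if any.
    crdc_id = matched_node.get("crdcID") if matched_node else None
    # No CRDC ID found in the existing dataset: mint a brand-new one.
    return crdc_id or str(uuid.uuid4())

print(resolve_crdc_id(None))                       # prints a fresh UUID
print(resolve_crdc_id({"crdcID": "existing-id"}))  # prints "existing-id"
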
diff --git a/src/file_validator.py b/src/file_validator.py
index 9de032d..27056ae 100644
--- a/src/file_validator.py
+++ b/src/file_validator.py
@@ -8,7 +8,7 @@
 from common.constants import ERRORS, WARNINGS, STATUS, S3_FILE_INFO, ID, SIZE, MD5, UPDATED_AT, \
     FILE_NAME, SQS_TYPE, SQS_NAME, FILE_ID, STATUS_ERROR, STATUS_WARNING, STATUS_PASSED, SUBMISSION_ID, \
     BATCH_BUCKET, SERVICE_TYPE_FILE, LAST_MODIFIED, CREATED_AT, TYPE, SUBMISSION_INTENTION, SUBMISSION_INTENTION_DELETE,\
-    VALIDATION_ID, VALIDATION_ENDED, QC_RESULT_ID, BATCH_IDS, VALIDATION_TYPE_FILE, QC_SEVERITY
+    VALIDATION_ID, VALIDATION_ENDED, QC_RESULT_ID, VALIDATION_TYPE_FILE, QC_SEVERITY, QC_VALIDATE_DATE
 from common.utils import get_exception_msg, current_datetime, get_s3_file_info, get_s3_file_md5, create_error, get_uuid_str
 from service.ecs_agent import set_scale_in_protection
 from metadata_validator import get_qc_result
@@ -414,5 +414,5 @@ def save_qc_result(self, fileRecord, status, error):
         fileRecord[S3_FILE_INFO][QC_RESULT_ID] = None
         if qc_result: # save QC result
             fileRecord[S3_FILE_INFO][QC_RESULT_ID] = qc_result[ID]
-            qc_result["validatedDate"] = current_datetime()
+            qc_result[QC_VALIDATE_DATE] = current_datetime()
             self.mongo_dao.save_qc_results([qc_result])
\ No newline at end of file
diff --git a/src/metadata_validator.py b/src/metadata_validator.py
index 51e3bdd..eae751d 100644
--- a/src/metadata_validator.py
+++ b/src/metadata_validator.py
@@ -13,6 +13,7 @@
     QC_RESULT_ID, BATCH_IDS, VALIDATION_TYPE_METADATA, S3_FILE_INFO, VALIDATION_TYPE_FILE, QC_SEVERITY, QC_VALIDATE_DATE, QC_ORIGIN, \
     QC_ORIGIN_METADATA_VALIDATE_SERVICE, QC_ORIGIN_FILE_VALIDATE_SERVICE, DISPLAY_ID, UPLOADED_DATE, LATEST_BATCH_ID, SUBMITTED_ID, \
     LATEST_BATCH_DISPLAY_ID, QC_VALIDATION_TYPE, DATA_RECORD_ID
+
 from common.utils import current_datetime, get_exception_msg, dump_dict_to_json, create_error, get_uuid_str
 from common.model_store import ModelFactory
 from common.model_reader import valid_prop_types
@@ -186,12 +187,11 @@ def validate_nodes(self, data_records):
             if not errors or len(errors) == 0:
                 qc_result[QC_SEVERITY] = STATUS_WARNING
             else:
-                qc_result[WARNINGS] = [] 
+                qc_result[WARNINGS] = []
             qc_result[QC_VALIDATE_DATE] = current_datetime()
             qc_results.append(qc_result)
             record[QC_RESULT_ID] = qc_result[ID]
 
-            record[STATUS] = status
             record[UPDATED_AT] = record[VALIDATED_AT] = current_datetime()
             updated_records.append(record)
 
@@ -569,8 +569,7 @@ def get_permissive_value(self, prop_def):
             permissive_vals = None #escape validation
             self.mongo_dao.insert_cde([cde])
         return permissive_vals
-
-
+
 """util functions"""
 def check_permissive(value, permissive_vals, msg_prefix, prop_name):
     result = True,
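Both validators now stamp QC results through the shared QC_VALIDATE_DATE constant instead of the literal "validatedDate" previously hard-coded in file_validator.py, so the field name can only change in one place. A toy illustration of the pattern follows; the constant's value is implied by the literal it replaces, not verified against common/constants.py.

from datetime import datetime, timezone

QC_VALIDATE_DATE = "validatedDate"  # implied by the replaced literal

qc_result = {"severity": "Error", "errors": [], "warnings": []}
qc_result[QC_VALIDATE_DATE] = datetime.now(timezone.utc)
print(qc_result)
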
diff --git a/src/validator.py b/src/validator.py
index 4ad9bdd..171cab7 100644
--- a/src/validator.py
+++ b/src/validator.py
@@ -20,9 +20,11 @@
 DATA_RECORDS_SEARCH_INDEX = "submissionID_nodeType_nodeID"
 DATA_RECORDS_CRDC_SEARCH_INDEX = "dataCommons_nodeType_nodeID"
+DATA_RECORDS_STUDY_ENTITY_INDEX = 'studyID_entityType_nodeID'
 RELEASE_SEARCH_INDEX = "dataCommons_nodeType_nodeID"
 CRDCID_SEARCH_INDEX = "CRDC_ID"
 CDE_SEARCH_INDEX = 'CDECode_1_CDEVersion_1'
+
 
 #Set log file prefix for bento logger
 if LOG_PREFIX not in os.environ:
     os.environ[LOG_PREFIX] = 'Validation Service'
@@ -48,11 +50,11 @@ def controller():
     # mongo_dao = MongoDao(configs[MONGO_DB], configs[DB])
     mongo_dao = config.mongodb_dao
     # set dataRecord search index
-    if not mongo_dao.set_search_index_dataRecords(DATA_RECORDS_SEARCH_INDEX, DATA_RECORDS_CRDC_SEARCH_INDEX):
+    if not mongo_dao.set_search_index_dataRecords(DATA_RECORDS_SEARCH_INDEX, DATA_RECORDS_CRDC_SEARCH_INDEX, DATA_RECORDS_STUDY_ENTITY_INDEX):
         log.error("Failed to set dataRecords search index!")
         return 1
     # set release search index
-    if not mongo_dao.set_search_release_index(RELEASE_SEARCH_INDEX , CRDCID_SEARCH_INDEX):
+    if not mongo_dao.set_search_release_index(RELEASE_SEARCH_INDEX, CRDCID_SEARCH_INDEX, DATA_RECORDS_STUDY_ENTITY_INDEX):
         log.error("Failed to set release search index!")
         return 1
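validator.py wires the new DATA_RECORDS_STUDY_ENTITY_INDEX name into both DAO calls, and the same name is reused for the index on the release collection. The DAO methods create an index only when its name is missing from an index_dict built earlier in each method (not shown in this diff). The sketch below shows that guard in isolation with plain pymongo; the connection details, database name, and field names are placeholders inferred from this diff.

from pymongo import ASCENDING, MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder URI
collection = client["crdc-datahub"]["release"]     # assumed database name

# index_information() returns a dict keyed by index name, which mirrors the
# name-based check the DAO uses before creating studyID_entityType_nodeID.
if "studyID_entityType_nodeID" not in collection.index_information():
    collection.create_index(
        [("studyID", ASCENDING), ("entityType", ASCENDING), ("nodeID", ASCENDING)],
        name="studyID_entityType_nodeID",
    )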