Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CRDCDH 2004 and 2005 #207

Open
wants to merge 16 commits into
base: 3.2.0
Choose a base branch
from
29 changes: 29 additions & 0 deletions mongo_db_script/updateDataRecordsScript.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Backfill dataRecords.studyID from the owning submission's studyID.
db.dataRecords.aggregate([
    {
        // Join the submissions collection on submissionID to pull the owning submission
        $lookup: {
            from: "submissions",
            localField: "submissionID",
            foreignField: "_id",
            as: "submission_info"
        }
    },
    {
        // Unwind to access the submission_info document as a single object.
        // NOTE: records whose submissionID has no matching submission are dropped
        // from the pipeline here and therefore left untouched by $merge.
        $unwind: "$submission_info"
    },
    {
        // Set the studyID in dataRecords based on the studyID from the submissions
        $set: {
            "studyID": "$submission_info.studyID"
        }
    },
    {
        // Drop the temporary join field so $merge does not persist it back into
        // dataRecords (whenMatched: "merge" writes every field of the pipeline output)
        $unset: "submission_info"
    },
    {
        // Merge the updates back into the dataRecords collection;
        // only update existing documents, never insert new ones
        $merge: {
            into: "dataRecords",
            whenMatched: "merge",
            whenNotMatched: "discard"
        }
    }
]);
59 changes: 59 additions & 0 deletions mongo_db_script/updateReleaseScript.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Backfill release.studyID from the owning submission's studyID.
db.release.aggregate([
    {
        // Join the submissions collection on submissionID to pull the owning submission
        $lookup: {
            from: "submissions",
            localField: "submissionID",
            foreignField: "_id",
            as: "submission_info"
        }
    },
    {
        // Unwind to access the submission_info document as a single object.
        // NOTE: records whose submissionID has no matching submission are dropped
        // from the pipeline here and therefore left untouched by $merge.
        $unwind: "$submission_info"
    },
    {
        // Set the studyID in release based on the studyID from the submissions
        $set: {
            "studyID": "$submission_info.studyID"
        }
    },
    {
        // Drop the temporary join field so $merge does not persist it back into
        // release (whenMatched: "merge" writes every field of the pipeline output)
        $unset: "submission_info"
    },
    {
        // Merge the updates back into the release collection;
        // only update existing documents, never insert new ones
        $merge: {
            into: "release",
            whenMatched: "merge",
            whenNotMatched: "discard"
        }
    }
]);


// Backfill release.entityType from nodeType, using one mapping table instead of
// six separate updateMany statements. Each entry maps a set of nodeType values
// to the entityType label they should receive.
const entityTypeByNodeTypes = [
    [["program"], "Program"],
    [["study"], "Study"],
    [["participant", "subject", "case"], "Participant"],
    [["sample", "specimen"], "Sample"],
    [["principal_investigator", "study_personnel"], "Principal Investigator"],
    [["file", "data_file", "clinical_measure_file", "cytogenomic_file", "radiology_file", "methylation_array_file", "sequencing_file"], "File"]
];

for (const [nodeTypes, entityType] of entityTypeByNodeTypes) {
    db.release.updateMany(
        {"nodeType": {"$in": nodeTypes}},
        {"$set": {"entityType": entityType}}
    );
}


25 changes: 6 additions & 19 deletions src/common/mongo_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ def find_submissions(self, query):
"""
set dataRecords search index, 'submissionID_nodeType_nodeID'
"""
def set_search_index_dataRecords(self, submission_index, crdc_index):
def set_search_index_dataRecords(self, submission_index, crdc_index, study_entity_type_index):
db = self.client[self.db_name]
data_collection = db[DATA_COLlECTION]
try:
Expand All @@ -612,6 +612,9 @@ def set_search_index_dataRecords(self, submission_index, crdc_index):
if not index_dict.get(crdc_index):
result = data_collection.create_index([(DATA_COMMON_NAME), (NODE_TYPE),(NODE_ID)], \
name=crdc_index)
if not index_dict.get(study_entity_type_index):
result = data_collection.create_index([(STUDY_ID), (ENTITY_TYPE),(NODE_ID)], \
name=study_entity_type_index)
return True
except errors.PyMongoError as pe:
self.log.exception(pe)
Expand Down Expand Up @@ -794,25 +797,9 @@ def search_node_by_study(self, studyID, entity_type, node_id):
:return:
"""
db = self.client[self.db_name]
data_collection = db[RELEASE_COLLECTION]
data_collection = db[DATA_COLlECTION]
try:
submissions = self.find_submissions({STUDY_ID: studyID})
if len(submissions) < 2: #if there is only one submission that's own submission, skip.
return None
submission_id_list = [item[ID] for item in submissions]
results = data_collection.find({ENTITY_TYPE: entity_type, NODE_ID: node_id, SUBMISSION_ID: {"$in": submission_id_list}})
released_nodes = [node for node in results if node.get(SUBMISSION_REL_STATUS) != SUBMISSION_REL_STATUS_DELETED ]
if len(released_nodes) == 0:
deleted_submission_ids = [rel[SUBMISSION_ID] for rel in results if rel.get(SUBMISSION_REL_STATUS) == SUBMISSION_REL_STATUS_DELETED ]
submission_id_list = [item for item in submission_id_list if item not in deleted_submission_ids]
if len(submission_id_list) < 2:
return None
# search dataRecords
data_collection = db[DATA_COLlECTION]
rtn_val = data_collection.find_one({ENTITY_TYPE: entity_type, NODE_ID: node_id, SUBMISSION_ID: {"$in": submission_id_list}})
else:
rtn_val = released_nodes[0]
return rtn_val
return data_collection.find_one({STUDY_ID: studyID, ENTITY_TYPE: entity_type, NODE_ID: node_id})
except errors.PyMongoError as pe:
self.log.exception(pe)
self.log.exception(f"Failed to search node for study: {get_exception_msg()}")
Expand Down
4 changes: 2 additions & 2 deletions src/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ def get_s3_file_md5(bucket_name, key):
"""
create error dict
"""
def create_error(title, msg):
return {"title": title, "description": msg}
def create_error(title, msg, code="", severity="Error", property_name="", property_value=""):
    """
    Create a standardized QC error dict.

    Defaults are provided for the newly added fields so legacy two-argument
    callers (create_error(title, msg)) keep working instead of raising
    TypeError.

    :param title: short human-readable error title
    :param msg: detailed error description
    :param code: error code such as "F001"; empty when unclassified
    :param severity: "Error" or "Warning" (defaults to "Error")
    :param property_name: name of the offending property, if any
    :param property_value: value of the offending property, if any
    :return: dict with keys code, severity, title, offendingProperty,
        offendingValue, description
    """
    return {
        "code": code,
        "severity": severity,
        "title": title,
        "offendingProperty": property_name,
        "offendingValue": property_value,
        "description": msg,
    }

"""
dataframe util to remove tailing empty rows and columns
Expand Down
4 changes: 4 additions & 0 deletions src/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ def get_crdc_id(self, exist_node, node_type, node_id, studyID):
if studyID and node_id and entity_type:
result = self.mongo_dao.search_node_by_study(studyID, entity_type, node_id)
crdc_id = result.get(CRDC_ID) if result else None
# if the crdc_id can't be identified from the existing dataset, a new crdc_id should be generated.
if not crdc_id:
crdc_id = get_uuid_str()

else:
crdc_id = exist_node.get(CRDC_ID)
return crdc_id
Expand Down
69 changes: 37 additions & 32 deletions src/file_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from common.constants import ERRORS, WARNINGS, STATUS, S3_FILE_INFO, ID, SIZE, MD5, UPDATED_AT, \
FILE_NAME, SQS_TYPE, SQS_NAME, FILE_ID, STATUS_ERROR, STATUS_WARNING, STATUS_PASSED, SUBMISSION_ID, \
BATCH_BUCKET, SERVICE_TYPE_FILE, LAST_MODIFIED, CREATED_AT, TYPE, SUBMISSION_INTENTION, SUBMISSION_INTENTION_DELETE,\
VALIDATION_ID, VALIDATION_ENDED, QC_RESULT_ID, BATCH_IDS, VALIDATION_TYPE_FILE, QC_SEVERITY
VALIDATION_ID, VALIDATION_ENDED, QC_RESULT_ID, VALIDATION_TYPE_FILE, QC_SEVERITY, QC_VALIDATE_DATE
from common.utils import get_exception_msg, current_datetime, get_s3_file_info, get_s3_file_md5, create_error, get_uuid_str
from service.ecs_agent import set_scale_in_protection
from metadata_validator import get_qc_result
Expand Down Expand Up @@ -135,6 +135,7 @@ def validate(self, fileRecord):
#check if the file record is valid
if not self.validate_fileRecord(fileRecord):
return STATUS_ERROR
self.get_root_path(fileRecord[SUBMISSION_ID])
#escape file validation if submission intention is Delete
if self.submission.get(SUBMISSION_INTENTION) == SUBMISSION_INTENTION_DELETE:
return STATUS_PASSED
Expand All @@ -146,7 +147,8 @@ def validate(self, fileRecord):
self.log.exception(e)
msg = f"{fileRecord.get(SUBMISSION_ID)}: Failed to validate data file, {fileRecord.get(ID)}! {get_exception_msg()}!"
self.log.exception(msg)
error = create_error("Internal error", "Data file validation failed due to internal errors. Please try again and contact the helpdesk if this error persists.")
error = create_error("Internal error", "Data file validation failed due to internal errors. Please try again and contact the helpdesk if this error persists.",
"F011", "Error", "", "")
self.set_status(fileRecord, STATUS_ERROR, error)
return STATUS_ERROR
finally:
Expand All @@ -159,31 +161,31 @@ def validate_fileRecord(self, fileRecord):
if not fileRecord.get(S3_FILE_INFO):
msg = f'Invalid file object, no s3 file info, {fileRecord[ID]}!'
self.log.error(msg)
error = create_error("Invalid dataRecord", msg)
error = create_error("Invalid dataRecord", msg, "F009", "Error", S3_FILE_INFO, "")
self.set_status(fileRecord, STATUS_ERROR, error)
return False
else:
if not fileRecord[S3_FILE_INFO].get(FILE_NAME) or not fileRecord[S3_FILE_INFO].get(SIZE) \
or not fileRecord[S3_FILE_INFO].get(MD5):
msg = f'Invalid data file object: invalid s3 data file info, {fileRecord[ID]}!'
self.log.error(msg)
error = create_error("Invalid data file info", msg)
error = create_error("Invalid data file info", msg, "F010", FILE_NAME, "")
self.set_status(fileRecord, STATUS_ERROR, error)
return False

if not fileRecord.get(SUBMISSION_ID):
msg = f'Invalid data file object: no submission Id found, {fileRecord[ID]}!'
self.log.error(msg)
error = create_error("Invalid submission Id", msg)
self.set_status(fileRecord, STATUS_ERROR, error)
return False
# following two errors will not happen anymore.
# if not fileRecord.get(SUBMISSION_ID):
# msg = f'Invalid data file object: no submission Id found, {fileRecord[ID]}!'
# self.log.error(msg)
# error = create_error("Invalid submission Id", msg)
# self.set_status(fileRecord, STATUS_ERROR, error, "", "Error", "SUBMISSION_ID", "")
# return False

if not self.get_root_path(fileRecord[SUBMISSION_ID]):
msg = f'Invalid submission object, no rootPath found, {fileRecord[ID]}/{fileRecord[SUBMISSION_ID]}!'
self.log.error(msg)
error = create_error("Invalid submission", msg)
self.set_status(fileRecord, STATUS_ERROR, error)
return False
# if not self.get_root_path(fileRecord[SUBMISSION_ID]):
# msg = f'Invalid submission object, no rootPath found, {fileRecord[ID]}/{fileRecord[SUBMISSION_ID]}!'
# self.log.error(msg)
# error = create_error("Invalid submission", msg)
# self.set_status(fileRecord, STATUS_ERROR, error)
# return False

return True

Expand Down Expand Up @@ -224,7 +226,7 @@ def validate_file(self, fileRecord):
if not self.bucket.file_exists_on_s3(key):
msg = f'Data file “{file_name}” not found.'
self.log.error(msg)
error = create_error("Data file not found", msg)
error = create_error("Data file not found", msg, "F001", "Error", "file", key)
return STATUS_ERROR, error

# 2. check file integrity
Expand All @@ -251,44 +253,44 @@ def validate_file(self, fileRecord):
if int(org_size) != int(size):
msg = f'Data file “{file_name}”: expected size: {org_size}, actual size: {size}.'
self.log.error(msg)
error = create_error("Data file size mismatch", msg)
error = create_error("Data file size mismatch", msg, "F003", "Error", "file size", org_size)
return STATUS_ERROR, error

if org_md5 != md5:
msg = f'Data file “{file_name}”: expected MD5: {org_md5}, actual MD5: {md5}.'
self.log.error(msg)
error = create_error("Data file MD5 mismatch", msg)
error = create_error("Data file MD5 mismatch", msg, "F004", "Error", "md5", org_md5)
return STATUS_ERROR, error

# check duplicates in manifest
manifest_info_list = self.mongo_dao.get_files_by_submission(fileRecord[SUBMISSION_ID])
if not manifest_info_list or len(manifest_info_list) == 0:
msg = f"No data file records found for the submission."
self.log.error(msg)
error = create_error("Data file records not found", msg)
error = create_error("Data file records not found", msg, "F002", "Error", "files", None)
return STATUS_ERROR, error

# 3. check if Same MD5 checksum and same filename
temp_list = [file for file in manifest_info_list if file[S3_FILE_INFO][FILE_NAME] == file_name and file[S3_FILE_INFO][MD5] == org_md5]
if len(temp_list) > 1:
msg = f'Data file “{file_name}”: already exists with the same name and md5 value.'
self.log.warning(msg)
error = create_error("Duplicated data file records detected", msg)
error = create_error("Duplicated data file records detected", msg, "F005", "Warning", "file_name", file_name)
return STATUS_WARNING, error

# 4. check if Same filename but different MD5 checksum
temp_list = [file for file in manifest_info_list if file[S3_FILE_INFO][FILE_NAME] == file_name and file[S3_FILE_INFO][MD5] != org_md5]
if len(temp_list) > 0:
msg = f'Data file “{file_name}”: A data file with the same name but different md5 value was found.'
self.log.warning(msg)
error = create_error("Conflict data file records detected", msg)
error = create_error("Conflict data file records detected", msg, "F006", "Warning", "file_name", file_name)
return STATUS_WARNING, error

# 5. check if Same MD5 checksum but different filename
temp_list = [file for file in manifest_info_list if file[S3_FILE_INFO][FILE_NAME] != file_name and file[S3_FILE_INFO][MD5] == org_md5]
if len(temp_list) > 0:
msg = f'Data file “{file_name}”: another data file with the same MD5 found.'
error = create_error("Duplicated data file content detected", msg)
error = create_error("Duplicated data file content detected", msg, "F007", "Warning", "file_name", file_name)
self.log.warning(msg)
return STATUS_WARNING, error

Expand All @@ -302,11 +304,13 @@ def validate_file(self, fileRecord):
def validate_all_files(self, submission_id):
errors = []
missing_count = 0
if not self.get_root_path(submission_id):
msg = f'Invalid submission object, no rootPath found, {submission_id}!'
self.log.error(msg)
error = create_error("Invalid submission", msg)
return STATUS_ERROR, [error]
# this error will not happen anymore
# if not self.get_root_path(submission_id):
# msg = f'Invalid submission object, no rootPath found, {submission_id}!'
# self.log.error(msg)
# error = create_error("Invalid submission", msg, "", "Error", SUBMISSION_ID, submission_id)
# return STATUS_ERROR, [error]
self.get_root_path(submission_id)
key = os.path.join(os.path.join(self.rootPath, f"file/"))

try:
Expand Down Expand Up @@ -355,7 +359,7 @@ def validate_all_files(self, submission_id):
"severity": "Error",
"uploadedDate": file.last_modified,
"validatedDate": current_datetime(),
"errors": [create_error("Orphaned file found", msg)]
"errors": [create_error("Orphaned file found", msg, "F008", "Error", "displayID", batchID)]
}
errors.append(error)
missing_count += 1
Expand All @@ -377,7 +381,8 @@ def validate_all_files(self, submission_id):
self.log.exception(e)
msg = f"{submission_id}: Failed to validate data files! {get_exception_msg()}!"
self.log.exception(msg)
error = create_error("Internal error", "Data file validation failed due to internal errors. Please try again and contact the helpdesk if this error persists.")
error = create_error("Internal error", "Data file validation failed due to internal errors. Please try again and contact the helpdesk if this error persists.",
"F011", "Error", "", "")
return None, [error]

def set_status(self, record, qc_result, status, error):
Expand Down Expand Up @@ -414,5 +419,5 @@ def save_qc_result(self, fileRecord, status, error):
fileRecord[S3_FILE_INFO][QC_RESULT_ID] = None
if qc_result: # save QC result
fileRecord[S3_FILE_INFO][QC_RESULT_ID] = qc_result[ID]
qc_result["validatedDate"] = current_datetime()
qc_result[QC_VALIDATE_DATE] = current_datetime()
self.mongo_dao.save_qc_results([qc_result])
Loading