Skip to content

Commit

Permalink
Merge pull request #211 from sanger/develop
Browse files Browse the repository at this point in the history
Develop to master
  • Loading branch information
emrojo authored Jan 14, 2021
2 parents 28a27f3 + 51b53b1 commit b53222f
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 50 deletions.
2 changes: 1 addition & 1 deletion crawler/sql_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
"""

SQL_DART_GET_PLATE_BARCODES = """\
SELECT DISTINCT [Labware LIMS BARCODE] FROM dbo.LIMS_test_plate_status WHERE [Labware state] = ?
SELECT DISTINCT [Labware LIMS BARCODE] FROM dbo.view_plate_maps WHERE [Labware state] = ?
"""

SQL_DART_SET_WELL_PROPERTY = "{CALL dbo.plDART_PlateUpdateWell (?,?,?,?)}"
Expand Down
26 changes: 12 additions & 14 deletions migrations/helpers/dart_samples_update_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
FIELD_LH_SOURCE_PLATE_UUID,
FIELD_MONGODB_ID,
FIELD_PLATE_BARCODE,
FIELD_RESULT,
FIELD_ROOT_SAMPLE_ID,
FIELD_UPDATED_AT,
MONGO_DATETIME_FORMAT,
Expand Down Expand Up @@ -43,13 +42,14 @@
# * Do not add samples to DART which have already been cherry-picked
# * Only add positive samples to DART
####
# 1. get samples from mongo between these time ranges that are positive
# 1. get samples from mongo between these time ranges
# 2. of these, find which have been cherry-picked and remove them from the list
# 3. add the UUID fields if not present
# 4. update samples in mongo updated in either of the above two steps (would expect the same set of samples from both
# steps)
# 5. update the MLWH (should be an idempotent operation)
# 6. add all the plates of the positive samples we've selected in step 1 above, to DART
# 6. add all the plates with non-cherrypicked samples (determined in step 2) to DART, as well as any positive samples
# in these plates


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -86,8 +86,8 @@ def migrate_all_dbs(config, s_start_datetime: str = "", s_end_datetime: str = ""

samples_collection = get_mongo_collection(mongo_db, COLLECTION_SAMPLES)

# 1. get samples from mongo between these time ranges that are positive
samples = get_positive_samples(samples_collection, start_datetime, end_datetime)
# 1. get samples from mongo between these time ranges
samples = get_samples(samples_collection, start_datetime, end_datetime)

if not samples:
logger.info("No samples in this time range.")
Expand Down Expand Up @@ -141,7 +141,8 @@ def migrate_all_dbs(config, s_start_datetime: str = "", s_end_datetime: str = ""
# 5. update the MLWH (should be an idempotent operation)
run_mysql_executemany_query(mlwh_conn, SQL_MLWH_MULTIPLE_INSERT, mongo_docs_for_sql)

# 6. add all the plates of the positive samples we've selected in step 1 above, to DART
# 6. add all the plates with non-cherrypicked samples (determined in step 2) to DART, as well as any
# positive samples in these plates
update_dart_fields(config, samples)
else:
logger.info("No documents found for this timestamp range, nothing to insert or update in MLWH or DART")
Expand All @@ -150,16 +151,13 @@ def migrate_all_dbs(config, s_start_datetime: str = "", s_end_datetime: str = ""
logger.exception(e)


def get_positive_samples(
samples_collection: Collection, start_datetime: datetime, end_datetime: datetime
) -> List[Sample]:
logger.debug(f"Selecting positive samples between {start_datetime} and {end_datetime}")
def get_samples(samples_collection: Collection, start_datetime: datetime, end_datetime: datetime) -> List[Sample]:
logger.debug(f"Selecting samples between {start_datetime} and {end_datetime}")

match = {
"$match": {
# 1. First filter by the start and end dates
# Filter by the start and end dates
FIELD_CREATED_AT: {"$gte": start_datetime, "$lte": end_datetime},
FIELD_RESULT: {"$regex": "^positive", "$options": "i"},
}
}

Expand All @@ -175,11 +173,11 @@ def add_sample_uuid_field(samples: List[Sample]) -> List[Sample]:


def update_mongo_fields(mongo_db, samples: List[Sample]) -> bool:
"""Bulk updates sample filtered positive fields in the Mongo database
"""Bulk updates sample uuid fields in the Mongo database
Arguments:
config {ModuleType} -- application config specifying database details
samples {List[Sample]} -- the list of samples whose filtered positive fields should be updated
samples {List[Sample]} -- the list of samples whose uuid fields should be updated
Returns:
bool -- whether the updates completed successfully
Expand Down
27 changes: 15 additions & 12 deletions migrations/helpers/update_filtered_positives_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ def remove_cherrypicked_samples(config: ModuleType, samples: List[Sample]) -> Li
root_sample_ids, plate_barcodes = extract_required_cp_info(samples)
cp_samples_df = get_cherrypicked_samples(config, list(root_sample_ids), list(plate_barcodes))

if cp_samples_df is not None and not cp_samples_df.empty:
if cp_samples_df is None:
raise Exception("Unable to determine cherry-picked samples - potentially error connecting to MySQL")
elif not cp_samples_df.empty:
cp_samples = cp_samples_df[[FIELD_ROOT_SAMPLE_ID, FIELD_PLATE_BARCODE]].to_numpy().tolist()
return remove_cp_samples(samples, cp_samples)
else:
Expand Down Expand Up @@ -243,17 +245,18 @@ def update_dart_fields(config: ModuleType, samples: List[Sample]) -> bool:
)
if plate_state == DART_STATE_PENDING:
for sample in samples_in_plate:
well_index = get_dart_well_index(sample.get(FIELD_COORDINATE, None))
if well_index is not None:
well_props = map_mongo_doc_to_dart_well_props(sample)
set_dart_well_properties(
cursor, plate_barcode, well_props, well_index # type:ignore
)
else:
raise ValueError(
"Unable to determine DART well index for sample "
f"{sample[FIELD_ROOT_SAMPLE_ID]} in plate {plate_barcode}"
)
if sample[FIELD_RESULT] == POSITIVE_RESULT_VALUE:
well_index = get_dart_well_index(sample.get(FIELD_COORDINATE, None))
if well_index is not None:
well_props = map_mongo_doc_to_dart_well_props(sample)
set_dart_well_properties(
cursor, plate_barcode, well_props, well_index # type:ignore
)
else:
raise ValueError(
"Unable to determine DART well index for sample "
f"{sample[FIELD_ROOT_SAMPLE_ID]} in plate {plate_barcode}"
)
cursor.commit()
dart_updated_successfully &= True
except Exception as e:
Expand Down
10 changes: 5 additions & 5 deletions tests/migrations/helpers/test_dart_samples_update_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
)
from migrations.helpers.dart_samples_update_helper import (
add_sample_uuid_field,
get_positive_samples,
get_samples,
migrate_all_dbs,
new_mongo_source_plate,
samples_updated_with_source_plate_uuids,
Expand Down Expand Up @@ -99,7 +99,7 @@ def generate_example_samples(range, start_datetime):
# ----- helper method tests -----


def test_get_positive_samples(mongo_database):
def test_get_samples(mongo_database):
_, mongo_db = mongo_database

start_datetime = datetime(year=2020, month=5, day=10, hour=15, minute=10)
Expand All @@ -110,8 +110,8 @@ def test_get_positive_samples(mongo_database):

assert mongo_db.samples.count_documents({}) == 8

# although 6 samples would be created, test that we are selecting only a subset using dates
assert len(get_positive_samples(mongo_db.samples, start_datetime, (start_datetime + timedelta(days=2)))) == 4
# although 6 samples would be created, test that we are selecting only a subset using dates, including the -ve one
assert len(get_samples(mongo_db.samples, start_datetime, (start_datetime + timedelta(days=2)))) == 5


def test_add_sample_uuid_field():
Expand Down Expand Up @@ -346,7 +346,7 @@ def test_migrate_all_dbs_with_cherry_picked_samples_mocked(

migrate_all_dbs(config, start_datetime.strftime("%y%m%d_%H%M"), end_datetime.strftime("%y%m%d_%H%M"))

samples = get_positive_samples(mongo_db.samples, start_datetime, end_datetime)
samples = get_samples(mongo_db.samples, start_datetime, end_datetime)

mock_remove_cherrypicked_samples.assert_called_once_with(
samples, [[cherry_picked_sample[FIELD_ROOT_SAMPLE_ID][0], cherry_picked_sample[FIELD_PLATE_BARCODE][0]]]
Expand Down
50 changes: 32 additions & 18 deletions tests/migrations/helpers/test_update_filtered_positives_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
FIELD_RNA_ID,
FIELD_ROOT_SAMPLE_ID,
FIELD_SOURCE,
FIELD_RESULT,
MLWH_COORDINATE,
MLWH_FILTERED_POSITIVE,
MLWH_FILTERED_POSITIVE_TIMESTAMP,
Expand Down Expand Up @@ -154,8 +155,8 @@ def test_remove_cherrypicked_samples_throws_for_error_getting_cherrypicked_sampl

def test_remove_cherrypicked_samples_returns_input_samples_with_none_cp_samples_df(config, testing_samples):
with patch("migrations.helpers.update_filtered_positives_helper.get_cherrypicked_samples", return_value=None):
result = remove_cherrypicked_samples(config, testing_samples)
assert result == testing_samples
with pytest.raises(Exception):
remove_cherrypicked_samples(config, testing_samples)


def test_remove_cherrypicked_samples_returns_input_samples_with_empty_cp_samples_df(config, testing_samples):
Expand Down Expand Up @@ -373,28 +374,28 @@ def test_biomek_labclass_by_centre_name(config):
assert labclass_by_name["test centre 2"] == "test class 2"


# ----- test update_dart_filtered_positive_fields method -----
# ----- test update_dart_fields method -----


def test_update_dart_filtered_positive_fields_throws_with_error_connecting_to_dart(config, mock_dart_conn):
def test_update_dart_fields_throws_with_error_connecting_to_dart(config, mock_dart_conn):
mock_dart_conn.side_effect = NotImplementedError("Boom!")
with pytest.raises(Exception):
update_dart_fields(config, [])


def test_update_dart_filtered_positive_fields_throws_no_dart_connection(config, mock_dart_conn):
def test_update_dart_fields_throws_no_dart_connection(config, mock_dart_conn):
mock_dart_conn.return_value = None
with pytest.raises(ValueError):
update_dart_fields(config, [])


def test_update_dart_filtered_positive_fields_returns_false_with_error_creating_cursor(config, mock_dart_conn):
def test_update_dart_fields_returns_false_with_error_creating_cursor(config, mock_dart_conn):
mock_dart_conn().cursor.side_effect = NotImplementedError("Boom!")
result = update_dart_fields(config, [])
assert result is False


def test_update_dart_filtered_positive_fields_returns_false_error_adding_plate(config, mock_dart_conn):
def test_update_dart_fields_returns_false_error_adding_plate(config, mock_dart_conn):
with patch(
"migrations.helpers.update_filtered_positives_helper.add_dart_plate_if_doesnt_exist",
side_effect=Exception("Boom!"),
Expand All @@ -404,7 +405,7 @@ def test_update_dart_filtered_positive_fields_returns_false_error_adding_plate(c
assert result is False


def test_update_dart_filtered_positive_fields_non_pending_plate_does_not_update_wells(config, mock_dart_conn):
def test_update_dart_fields_non_pending_plate_does_not_update_wells(config, mock_dart_conn):
with patch(
"migrations.helpers.update_filtered_positives_helper.add_dart_plate_if_doesnt_exist",
return_value="not pending",
Expand All @@ -420,7 +421,7 @@ def test_update_dart_filtered_positive_fields_non_pending_plate_does_not_update_
assert result is True


def test_update_dart_filtered_positive_fields_returns_false_unable_to_determine_well_index(config, mock_dart_conn):
def test_update_dart_fields_returns_false_unable_to_determine_well_index(config, mock_dart_conn):
with patch(
"migrations.helpers.update_filtered_positives_helper.add_dart_plate_if_doesnt_exist",
return_value=DART_STATE_PENDING,
Expand All @@ -440,7 +441,7 @@ def test_update_dart_filtered_positive_fields_returns_false_unable_to_determine_
assert result is False


def test_update_dart_filtered_positive_fields_returns_false_error_mapping_to_well_props(config, mock_dart_conn):
def test_update_dart_fields_returns_false_error_mapping_to_well_props(config, mock_dart_conn):
with patch(
"migrations.helpers.update_filtered_positives_helper.add_dart_plate_if_doesnt_exist",
return_value=DART_STATE_PENDING,
Expand All @@ -464,7 +465,7 @@ def test_update_dart_filtered_positive_fields_returns_false_error_mapping_to_wel
assert result is False


def test_update_dart_filtered_positive_fields_returns_false_error_adding_well_properties(config, mock_dart_conn):
def test_update_dart_fields_returns_false_error_adding_well_properties(config, mock_dart_conn):
with patch(
"migrations.helpers.update_filtered_positives_helper.add_dart_plate_if_doesnt_exist",
return_value=DART_STATE_PENDING,
Expand All @@ -487,7 +488,7 @@ def test_update_dart_filtered_positive_fields_returns_false_error_adding_well_pr
assert result is False


def test_update_dart_filtered_positive_fields_returns_true_multiple_new_plates(config, mock_dart_conn):
def test_update_dart_fields_returns_true_multiple_new_plates(config, mock_dart_conn):
with patch("migrations.helpers.update_filtered_positives_helper.add_dart_plate_if_doesnt_exist") as mock_add_plate:
mock_add_plate.return_value = DART_STATE_PENDING
with patch("migrations.helpers.update_filtered_positives_helper.get_dart_well_index") as mock_get_well_index:
Expand All @@ -508,16 +509,19 @@ def test_update_dart_filtered_positive_fields_returns_true_multiple_new_plates(c
FIELD_PLATE_BARCODE: "123",
FIELD_SOURCE: test_centre_name,
FIELD_COORDINATE: "A01",
FIELD_RESULT: POSITIVE_RESULT_VALUE,
},
{
FIELD_PLATE_BARCODE: "ABC",
FIELD_SOURCE: test_centre_name,
FIELD_COORDINATE: "B03",
FIELD_RESULT: POSITIVE_RESULT_VALUE,
},
{
FIELD_PLATE_BARCODE: "XYZ",
FIELD_SOURCE: test_centre_name,
FIELD_COORDINATE: "E11",
FIELD_RESULT: POSITIVE_RESULT_VALUE,
},
]

Expand Down Expand Up @@ -546,7 +550,7 @@ def test_update_dart_filtered_positive_fields_returns_true_multiple_new_plates(c
assert result is True


def test_update_dart_filtered_positive_fields_returns_true_single_new_plate_multiple_wells(config, mock_dart_conn):
def test_update_dart_fields_returns_true_single_new_plate_multiple_wells(config, mock_dart_conn):
with patch("migrations.helpers.update_filtered_positives_helper.add_dart_plate_if_doesnt_exist") as mock_add_plate:
mock_add_plate.return_value = DART_STATE_PENDING
with patch("migrations.helpers.update_filtered_positives_helper.get_dart_well_index") as mock_get_well_index:
Expand All @@ -568,16 +572,25 @@ def test_update_dart_filtered_positive_fields_returns_true_single_new_plate_mult
FIELD_PLATE_BARCODE: test_plate_barcode,
FIELD_SOURCE: test_centre_name,
FIELD_COORDINATE: "A01",
FIELD_RESULT: POSITIVE_RESULT_VALUE,
},
{
FIELD_PLATE_BARCODE: test_plate_barcode,
FIELD_SOURCE: test_centre_name,
FIELD_COORDINATE: "B03",
FIELD_RESULT: POSITIVE_RESULT_VALUE,
},
{
FIELD_PLATE_BARCODE: test_plate_barcode,
FIELD_SOURCE: test_centre_name,
FIELD_COORDINATE: "E11",
FIELD_RESULT: POSITIVE_RESULT_VALUE,
},
{
FIELD_PLATE_BARCODE: test_plate_barcode,
FIELD_SOURCE: test_centre_name,
FIELD_COORDINATE: "G07",
FIELD_RESULT: "not positive",
},
]

Expand All @@ -587,11 +600,12 @@ def test_update_dart_filtered_positive_fields_returns_true_single_new_plate_mult
mock_dart_conn().cursor(), test_plate_barcode, test_labware_class
)

num_samples = len(samples)
assert mock_get_well_index.call_count == num_samples
assert mock_map.call_count == num_samples
assert mock_set_well_props.call_count == num_samples
for sample in samples:
pos_samples = samples[:-1]
num_pos_samples = len(pos_samples)
assert mock_get_well_index.call_count == num_pos_samples
assert mock_map.call_count == num_pos_samples
assert mock_set_well_props.call_count == num_pos_samples
for sample in pos_samples:
mock_get_well_index.assert_any_call(sample[FIELD_COORDINATE])
mock_map.assert_any_call(sample)
mock_set_well_props.assert_any_call(
Expand Down

0 comments on commit b53222f

Please sign in to comment.