From daa1f83c84b4ae5bd0059ea6c7f7177e9d9a8b43 Mon Sep 17 00:00:00 2001 From: Evelin Aasna Date: Wed, 26 Jun 2024 16:09:09 +0200 Subject: [PATCH] Constructor for shared_query_record --- include/valik/search/iterate_queries.hpp | 66 +++++++----------------- include/valik/search/query_record.hpp | 33 ++++++++++++ include/valik/search/search_local.hpp | 7 +-- 3 files changed, 55 insertions(+), 51 deletions(-) diff --git a/include/valik/search/iterate_queries.hpp b/include/valik/search/iterate_queries.hpp index 175a5d67..7bfa5fdb 100644 --- a/include/valik/search/iterate_queries.hpp +++ b/include/valik/search/iterate_queries.hpp @@ -46,12 +46,11 @@ void iterate_distributed_queries(search_arguments const & arguments, * @param arguments Command line arguments. * @param queue Shopping cart queue for sending queries over to Stellar search. */ -template -void iterate_all_queries(count_t const ref_seg_count, +template +void iterate_all_queries(size_t const ref_seg_count, search_arguments const & arguments, - cart_queue>> & queue) + cart_queue> & queue) { - using TSequence = seqan2::String; using TId = seqan2::CharString; std::vector> query_records{}; constexpr uint64_t chunk_size = (1ULL << 20) * 10; @@ -73,20 +72,11 @@ void iterate_all_queries(count_t const ref_seg_count, readRecord(id, seq, inSeqs); idsUnique &= stellar::_checkUniqueId(uniqueIds, id); - auto query_ptr = std::make_shared(std::move(seq)); - std::vector seg_vec{}; - for (auto & c : *query_ptr) - { - seqan3::dna4 nuc; - nuc.assign_char(c); - seg_vec.push_back(nuc); - } - - query_records.emplace_back(seqan2::toCString(id), std::move(seg_vec), seqan2::infix(*query_ptr, 0, seqan2::length(*query_ptr)), query_ptr); + query_records.emplace_back(std::move(seq), seqan2::toCString(std::move(id))); if (query_records.size() > chunk_size) { - search_all_parallel>>(ref_seg_count, arguments, query_records, queue); + search_all_parallel>(ref_seg_count, arguments, query_records, queue); query_records.clear(); } } @@ -94,7 +84,7 @@ void iterate_all_queries(count_t const ref_seg_count, if (!idsUnique) std::cerr << "WARNING: Non-unique query ids. Output can be ambiguous.\n"; - search_all_parallel>>(ref_seg_count, arguments, query_records, queue); + search_all_parallel>(ref_seg_count, arguments, query_records, queue); } /** @@ -106,13 +96,12 @@ void iterate_all_queries(count_t const ref_seg_count, * @param thresholder Threshold for number of shared k-mers. * @param queue Shopping cart queue for load balancing between Valik prefiltering and Stellar search. */ -template +template void iterate_short_queries(search_arguments const & arguments, ibf_t const & ibf, raptor::threshold::threshold const & thresholder, - cart_queue>> & queue) + cart_queue> & queue) { - using TSequence = seqan2::String; using TId = seqan2::CharString; std::vector> query_records{}; constexpr uint64_t chunk_size = (1ULL << 20) * 10; @@ -134,21 +123,11 @@ void iterate_short_queries(search_arguments const & arguments, readRecord(id, seq, inSeqs); idsUnique &= stellar::_checkUniqueId(uniqueIds, id); - auto query_ptr = std::make_shared(std::move(seq)); - std::vector seg_vec{}; - for (auto & c : *query_ptr) - { - seqan3::dna4 nuc; - nuc.assign_char(c); - seg_vec.push_back(nuc); - } - - query_records.emplace_back(seqan2::toCString(id), std::move(seg_vec), seqan2::infix(*query_ptr, 0, seqan2::length(*query_ptr)), query_ptr); + query_records.emplace_back(std::move(seq), seqan2::toCString(std::move(id))); if (query_records.size() > chunk_size) { - prefilter_queries_parallel>>(ibf, arguments, query_records, thresholder, queue); - + prefilter_queries_parallel>(ibf, arguments, query_records, thresholder, queue); query_records.clear(); } } @@ -156,7 +135,7 @@ void iterate_short_queries(search_arguments const & arguments, if (!idsUnique) std::cerr << "WARNING: Non-unique query ids. Output can be ambiguous.\n"; - prefilter_queries_parallel>>(ibf, arguments, query_records, thresholder, queue); + prefilter_queries_parallel>(ibf, arguments, query_records, thresholder, queue); } /** @@ -169,14 +148,13 @@ void iterate_short_queries(search_arguments const & arguments, * @param queue Shopping cart queue for load balancing between Valik prefiltering and Stellar search. * @param meta Metadata table for split query segments. */ -template +template void iterate_split_queries(search_arguments const & arguments, ibf_t const & ibf, raptor::threshold::threshold const & thresholder, - cart_queue>> & queue, + cart_queue> & queue, metadata & meta) { - using TSequence = seqan2::String; using TId = seqan2::CharString; std::vector> query_records{}; constexpr uint64_t chunk_size = (1ULL << 20) * 10; @@ -202,21 +180,13 @@ void iterate_split_queries(search_arguments const & arguments, for (auto const & seg : meta.segments_from_ind(seqCount)) { - seqan2::Segment inf = seqan2::infixWithLength(*query_ptr, seg.start, seg.len); - std::vector seg_vec{}; - for (auto & c : inf) - { - seqan3::dna4 nuc; - nuc.assign_char(c); - seg_vec.push_back(nuc); - } - - query_records.emplace_back(seqan2::toCString(id), std::move(seg_vec), inf, query_ptr); + // each split query record contains a copy of the same shared pointer + query_records.emplace_back(seqan2::toCString(std::move(id)), seg, query_ptr); if (query_records.size() > chunk_size) { - prefilter_queries_parallel>>(ibf, arguments, query_records, thresholder, queue); - query_records.clear(); + prefilter_queries_parallel>(ibf, arguments, query_records, thresholder, queue); + query_records.clear(); // shared pointers are erased -> memory is deallocated } } } @@ -224,7 +194,7 @@ void iterate_split_queries(search_arguments const & arguments, if (!idsUnique) std::cerr << "WARNING: Non-unique query ids. Output can be ambiguous.\n"; - prefilter_queries_parallel>>(ibf, arguments, query_records, thresholder, queue); + prefilter_queries_parallel>(ibf, arguments, query_records, thresholder, queue); } } // namespace valik::app diff --git a/include/valik/search/query_record.hpp b/include/valik/search/query_record.hpp index 392724e6..11f60d4f 100644 --- a/include/valik/search/query_record.hpp +++ b/include/valik/search/query_record.hpp @@ -31,6 +31,39 @@ struct shared_query_record std::vector sequence; seqan2::Segment querySegment; std::shared_ptr underlyingData; + + shared_query_record(TSequence seq, std::string id) : sequence_id(std::move(id)) + { + // make_shared returns a newly allocated object + auto query_ptr = std::make_shared(std::move(seq)); + std::vector seg_vec{}; + for (auto & c : *query_ptr) + { + seqan3::dna4 nuc; + nuc.assign_char(c); + seg_vec.push_back(nuc); + } + + sequence = std::move(seg_vec); + querySegment = seqan2::infix(*query_ptr, 0, seqan2::length(*query_ptr)); + underlyingData = query_ptr; + } + + shared_query_record(std::string id, metadata::segment_stats const & seg, std::shared_ptr query_ptr) : sequence_id(std::move(id)) + { + seqan2::Segment inf = seqan2::infixWithLength(*query_ptr, seg.start, seg.len); + std::vector seg_vec{}; + for (auto & c : inf) + { + seqan3::dna4 nuc; + nuc.assign_char(c); + seg_vec.push_back(nuc); + } + + sequence = std::move(seg_vec); + querySegment = inf; + underlyingData = query_ptr; + } }; } // namespace valik diff --git a/include/valik/search/search_local.hpp b/include/valik/search/search_local.hpp index adcfd2cc..3194ba8e 100644 --- a/include/valik/search/search_local.hpp +++ b/include/valik/search/search_local.hpp @@ -400,18 +400,19 @@ bool search_local(search_arguments & arguments, search_time_statistics & time_st // producer threads are created here if constexpr (stellar_only) { - iterate_all_queries(ref_meta.seg_count, arguments, queue); + iterate_all_queries(ref_meta.seg_count, arguments, queue); } else { + using ibf_t = decltype(index.ibf()); raptor::threshold::threshold const thresholder{arguments.make_threshold_parameters()}; if constexpr (is_split) { - iterate_split_queries(arguments, index.ibf(), thresholder, queue, query_meta.value()); + iterate_split_queries(arguments, index.ibf(), thresholder, queue, query_meta.value()); } else { - iterate_short_queries(arguments, index.ibf(), thresholder, queue); + iterate_short_queries(arguments, index.ibf(), thresholder, queue); } }