Skip to content

Commit

Permalink
Constructor for shared_query_record
Browse files Browse the repository at this point in the history
  • Loading branch information
eaasna committed Jun 26, 2024
1 parent 0361b81 commit daa1f83
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 51 deletions.
66 changes: 18 additions & 48 deletions include/valik/search/iterate_queries.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,11 @@ void iterate_distributed_queries(search_arguments const & arguments,
* @param arguments Command line arguments.
* @param queue Shopping cart queue for sending queries over to Stellar search.
*/
template <typename count_t>
void iterate_all_queries(count_t const ref_seg_count,
template <typename TSequence>
void iterate_all_queries(size_t const ref_seg_count,
search_arguments const & arguments,
cart_queue<shared_query_record<seqan2::String<seqan2::Dna>>> & queue)
cart_queue<shared_query_record<TSequence>> & queue)
{
using TSequence = seqan2::String<seqan2::Dna>;
using TId = seqan2::CharString;
std::vector<shared_query_record<TSequence>> query_records{};
constexpr uint64_t chunk_size = (1ULL << 20) * 10;
Expand All @@ -73,28 +72,19 @@ void iterate_all_queries(count_t const ref_seg_count,
readRecord(id, seq, inSeqs);
idsUnique &= stellar::_checkUniqueId(uniqueIds, id);

auto query_ptr = std::make_shared<TSequence>(std::move(seq));
std::vector<seqan3::dna4> seg_vec{};
for (auto & c : *query_ptr)
{
seqan3::dna4 nuc;
nuc.assign_char(c);
seg_vec.push_back(nuc);
}

query_records.emplace_back(seqan2::toCString(id), std::move(seg_vec), seqan2::infix(*query_ptr, 0, seqan2::length(*query_ptr)), query_ptr);
query_records.emplace_back(std::move(seq), seqan2::toCString(std::move(id)));

if (query_records.size() > chunk_size)
{
search_all_parallel<shared_query_record<seqan2::String<seqan2::Dna>>>(ref_seg_count, arguments, query_records, queue);
search_all_parallel<shared_query_record<TSequence>>(ref_seg_count, arguments, query_records, queue);
query_records.clear();
}
}

if (!idsUnique)
std::cerr << "WARNING: Non-unique query ids. Output can be ambiguous.\n";

search_all_parallel<shared_query_record<seqan2::String<seqan2::Dna>>>(ref_seg_count, arguments, query_records, queue);
search_all_parallel<shared_query_record<TSequence>>(ref_seg_count, arguments, query_records, queue);
}

/**
Expand All @@ -106,13 +96,12 @@ void iterate_all_queries(count_t const ref_seg_count,
* @param thresholder Threshold for number of shared k-mers.
* @param queue Shopping cart queue for load balancing between Valik prefiltering and Stellar search.
*/
template <typename ibf_t>
template <typename ibf_t, typename TSequence>
void iterate_short_queries(search_arguments const & arguments,
ibf_t const & ibf,
raptor::threshold::threshold const & thresholder,
cart_queue<shared_query_record<seqan2::String<seqan2::Dna>>> & queue)
cart_queue<shared_query_record<TSequence>> & queue)
{
using TSequence = seqan2::String<seqan2::Dna>;
using TId = seqan2::CharString;
std::vector<shared_query_record<TSequence>> query_records{};
constexpr uint64_t chunk_size = (1ULL << 20) * 10;
Expand All @@ -134,29 +123,19 @@ void iterate_short_queries(search_arguments const & arguments,
readRecord(id, seq, inSeqs);
idsUnique &= stellar::_checkUniqueId(uniqueIds, id);

auto query_ptr = std::make_shared<TSequence>(std::move(seq));
std::vector<seqan3::dna4> seg_vec{};
for (auto & c : *query_ptr)
{
seqan3::dna4 nuc;
nuc.assign_char(c);
seg_vec.push_back(nuc);
}

query_records.emplace_back(seqan2::toCString(id), std::move(seg_vec), seqan2::infix(*query_ptr, 0, seqan2::length(*query_ptr)), query_ptr);
query_records.emplace_back(std::move(seq), seqan2::toCString(std::move(id)));

if (query_records.size() > chunk_size)
{
prefilter_queries_parallel<shared_query_record<seqan2::String<seqan2::Dna>>>(ibf, arguments, query_records, thresholder, queue);

prefilter_queries_parallel<shared_query_record<TSequence>>(ibf, arguments, query_records, thresholder, queue);
query_records.clear();
}
}

if (!idsUnique)
std::cerr << "WARNING: Non-unique query ids. Output can be ambiguous.\n";

prefilter_queries_parallel<shared_query_record<seqan2::String<seqan2::Dna>>>(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel<shared_query_record<TSequence>>(ibf, arguments, query_records, thresholder, queue);
}

/**
Expand All @@ -169,14 +148,13 @@ void iterate_short_queries(search_arguments const & arguments,
* @param queue Shopping cart queue for load balancing between Valik prefiltering and Stellar search.
* @param meta Metadata table for split query segments.
*/
template <typename ibf_t>
template <typename ibf_t, typename TSequence>
void iterate_split_queries(search_arguments const & arguments,
ibf_t const & ibf,
raptor::threshold::threshold const & thresholder,
cart_queue<shared_query_record<seqan2::String<seqan2::Dna>>> & queue,
cart_queue<shared_query_record<TSequence>> & queue,
metadata & meta)
{
using TSequence = seqan2::String<seqan2::Dna>;
using TId = seqan2::CharString;
std::vector<shared_query_record<TSequence>> query_records{};
constexpr uint64_t chunk_size = (1ULL << 20) * 10;
Expand All @@ -202,29 +180,21 @@ void iterate_split_queries(search_arguments const & arguments,

for (auto const & seg : meta.segments_from_ind(seqCount))
{
seqan2::Segment<TSequence const, seqan2::InfixSegment> inf = seqan2::infixWithLength(*query_ptr, seg.start, seg.len);
std::vector<seqan3::dna4> seg_vec{};
for (auto & c : inf)
{
seqan3::dna4 nuc;
nuc.assign_char(c);
seg_vec.push_back(nuc);
}

query_records.emplace_back(seqan2::toCString(id), std::move(seg_vec), inf, query_ptr);
// each split query record contains a copy of the same shared pointer
query_records.emplace_back(seqan2::toCString(std::move(id)), seg, query_ptr);

if (query_records.size() > chunk_size)
{
prefilter_queries_parallel<shared_query_record<seqan2::String<seqan2::Dna>>>(ibf, arguments, query_records, thresholder, queue);
query_records.clear();
prefilter_queries_parallel<shared_query_record<TSequence>>(ibf, arguments, query_records, thresholder, queue);
query_records.clear(); // shared pointers are erased -> memory is deallocated
}
}
}

if (!idsUnique)
std::cerr << "WARNING: Non-unique query ids. Output can be ambiguous.\n";

prefilter_queries_parallel<shared_query_record<seqan2::String<seqan2::Dna>>>(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel<shared_query_record<TSequence>>(ibf, arguments, query_records, thresholder, queue);
}

} // namespace valik::app
33 changes: 33 additions & 0 deletions include/valik/search/query_record.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,39 @@ struct shared_query_record
std::vector<seqan3::dna4> sequence;
seqan2::Segment<TSequence const, seqan2::InfixSegment> querySegment;
std::shared_ptr<TSequence> underlyingData;

/**
 * @brief Construct a record that spans a complete query sequence.
 *
 * The sequence is moved into a newly allocated shared object that this record keeps alive through
 * underlyingData. Each character of the sequence is converted into a seqan3::dna4 and stored in
 * sequence; querySegment is set to the infix covering the whole stored sequence.
 *
 * @param seq Query sequence; taken by value and moved into shared storage.
 * @param id Query identifier; stored as sequence_id.
 */
shared_query_record(TSequence seq, std::string id) : sequence_id(std::move(id))
{
    // make_shared returns a newly allocated object
    auto query_ptr = std::make_shared<TSequence>(std::move(seq));

    // The number of characters is known up front: allocate once instead of growing on every push_back.
    sequence.reserve(seqan2::length(*query_ptr));
    for (auto & c : *query_ptr)
    {
        seqan3::dna4 nuc;
        nuc.assign_char(c);
        sequence.push_back(nuc);
    }

    querySegment = seqan2::infix(*query_ptr, 0, seqan2::length(*query_ptr));
    // Hand the local handle over to the member; the pointee (and thus querySegment's host) is unchanged.
    underlyingData = std::move(query_ptr);
}

shared_query_record(std::string id, metadata::segment_stats const & seg, std::shared_ptr<TSequence> query_ptr) : sequence_id(std::move(id))
{
seqan2::Segment<TSequence const, seqan2::InfixSegment> inf = seqan2::infixWithLength(*query_ptr, seg.start, seg.len);
std::vector<seqan3::dna4> seg_vec{};
for (auto & c : inf)
{
seqan3::dna4 nuc;
nuc.assign_char(c);
seg_vec.push_back(nuc);
}

sequence = std::move(seg_vec);
querySegment = inf;
underlyingData = query_ptr;
}
};

} // namespace valik
7 changes: 4 additions & 3 deletions include/valik/search/search_local.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,18 +400,19 @@ bool search_local(search_arguments & arguments, search_time_statistics & time_st
// producer threads are created here
if constexpr (stellar_only)
{
iterate_all_queries(ref_meta.seg_count, arguments, queue);
iterate_all_queries<TSequence>(ref_meta.seg_count, arguments, queue);
}
else
{
using ibf_t = decltype(index.ibf());
raptor::threshold::threshold const thresholder{arguments.make_threshold_parameters()};
if constexpr (is_split)
{
iterate_split_queries(arguments, index.ibf(), thresholder, queue, query_meta.value());
iterate_split_queries<ibf_t, TSequence>(arguments, index.ibf(), thresholder, queue, query_meta.value());
}
else
{
iterate_short_queries(arguments, index.ibf(), thresholder, queue);
iterate_short_queries<ibf_t, TSequence>(arguments, index.ibf(), thresholder, queue);
}
}

Expand Down

0 comments on commit daa1f83

Please sign in to comment.