Skip to content

Commit

Permalink
Make shared query records over spans
Browse files Browse the repository at this point in the history
  • Loading branch information
eaasna committed Aug 22, 2024
1 parent 0032913 commit b0a2d70
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 141 deletions.
39 changes: 16 additions & 23 deletions include/dream_stellar/query_id_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,23 @@
namespace dream_stellar
{

template <typename alphabet_t, typename sequence_container_t = std::vector<alphabet_t>,
typename sequence_reference_t = std::span<const alphabet_t>>
class QueryIDMap
/**
* @brief Associate a query ID with the corresponding segment sequence.
*/
template <typename alphabet_t>
struct query_id_map
{
std::vector<valik::shared_query_record<sequence_reference_t>> queries;

public:

QueryIDMap(std::vector<valik::shared_query_record<sequence_container_t>> const & records)
{
for (auto & record : records)
queries.emplace_back(record.id, record.get_start(), record.get_len(), record.underlyingData);
}

dream_stellar::StellarQuerySegment<sequence_reference_t> segment_from_id(unsigned const & query_id) const
{
if (query_id >= queries.size())
throw std::runtime_error("Query index " + std::to_string(query_id) + " is out of range [0, "
+ std::to_string(queries.size() - 1) + "]");
valik::shared_query_record<sequence_reference_t> & shared_rec = queries[query_id];

return shared_rec.get_seqan_segment();
}

using rec_t = valik::shared_query_record<alphabet_t>;
std::vector<rec_t> & records;

dream_stellar::StellarQuerySegment<const alphabet_t> segment_from_id(unsigned const & query_id) const
{
if (query_id >= records.size())
throw std::runtime_error("Query index " + std::to_string(query_id) + " is out of range [0, "
+ std::to_string(records.size() - 1) + "]");
rec_t & shared_rec = records[query_id];
return shared_rec.asStellarSegment();
}
};

} // namespace dream_stellar
7 changes: 3 additions & 4 deletions include/dream_stellar/stellar.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ template<typename alphabet_t, typename TTag, typename TIsPatternDisabledFn, type
StellarComputeStatistics
_stellarKernel(jst::contrib::stellar_matcher<std::span<const alphabet_t>> & matcher,
StellarDatabaseSegment<const alphabet_t> & database_segment,
QueryIDMap<alphabet_t> const & queryIDMap,
query_id_map<alphabet_t> const & query_dict,
StellarOptions & localOptions,
SwiftHitVerifier<TTag> & swiftVerifier,
TIsPatternDisabledFn && isPatternDisabled,
Expand All @@ -210,12 +210,11 @@ _stellarKernel(jst::contrib::stellar_matcher<std::span<const alphabet_t>> & matc
statistics.totalLength += database_segment.size();
statistics.maxLength = std::max<size_t>(statistics.maxLength, database_segment.size());

seqan3::debug_stream << "FOUND MATCH\n";

if (!isPatternDisabled(matcher))
{
auto queryID = matcher.curSeqNo();
StellarQuerySegment<alphabet_t> querySegment = queryIDMap.segment_from_id(queryID);
StellarQuerySegment<const alphabet_t> querySegment = query_dict.segment_from_id(queryID);
seqan3::debug_stream << "FOUND MATCH for query\t" << std::to_string(queryID) << '\n';

//!TODO: adjust for alphabet_t
/*
Expand Down
6 changes: 3 additions & 3 deletions include/dream_stellar/stellar_launcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ struct StellarLauncher
static StellarComputeStatistics
search_and_verify(
jst::contrib::stellar_matcher<sequence_reference_t> & matcher,
StellarDatabaseSegment<alphabet_t> database_segment,
StellarDatabaseSegment<const alphabet_t> database_segment,
id_t const & databaseID,
QueryIDMap<alphabet_t> const & queryIDMap,
query_id_map<alphabet_t> const & query_dict,
bool const databaseStrand,
StellarOptions & localOptions, // localOptions.compactThresh is out-param
stellar_kernel_runtime & strand_runtime,
Expand Down Expand Up @@ -106,7 +106,7 @@ struct StellarLauncher
STELLAR_DESIGNATED_INITIALIZER(.verifier_options = , localOptions),
};

return _stellarKernel(matcher, database_segment, queryIDMap, localOptions, swiftVerifier, isPatternDisabled, onAlignmentResult, strand_runtime);
return _stellarKernel(matcher, database_segment, query_dict, localOptions, swiftVerifier, isPatternDisabled, onAlignmentResult, strand_runtime);
});
return statistics;
}
Expand Down
34 changes: 12 additions & 22 deletions include/valik/search/cart_query_io.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <filesystem>
#include <span>

#include <dream_stellar/io/import_sequence.hpp>
#include <utilities/alphabet_wrapper/seqan/alphabet.hpp>
Expand All @@ -14,42 +15,31 @@ namespace valik
{

/**
* \brief Function that creates a seqan2::Segment from a query_record (split or short query)
* \brief Extract the segment sequences from shared query records.
*
* \param records vector containing valik split query segments
* \param seqs set of query segments (in-out)
* \param ids set of query segment ids (in-out)
* \param strOut stream for standard output
* \param strErr stream for error output
* \param hostQueries underlying sequences for query segments
* \param hostQueryIDs set of underlying sequence ids
*/
template <typename rec_vec_t, typename TStream, typename seq_vec_t>
inline bool get_cart_queries(rec_vec_t const & records,
seq_vec_t & seqs,
template <typename alphabet_t, typename str_t>
inline bool get_cart_queries(std::vector<shared_query_record<alphabet_t>> const & records,
std::vector<std::span<alphabet_t const>> & seqs,
std::vector<std::string> & ids,
TStream & strOut,
TStream & strErr)
str_t & str_out)
{

std::set<std::string> uniqueIds; // set of short IDs (cut at first whitespace)
bool idsUnique = true;

size_t seqCount{0};

size_t seq_count{0};
for (auto & record : records)
{
seqs.emplace_back(record.sequence);
ids.emplace_back(record.sequence_id);
seqCount++;
idsUnique &= dream_stellar::_checkUniqueId(uniqueIds, record.sequence_id);
seqs.emplace_back(record.sequence());
ids.emplace_back(record.id());
seq_count++;
}

strOut << "Loaded " << seqCount << " query sequence" << ((seqCount > 1) ? "s " : " ") << "from cart." << std::endl;
if (!idsUnique)
strErr << "WARNING: Non-unique query ids. Output can be ambiguous.\n";
str_out << "Loaded " << seq_count << " query sequence" << ((seq_count > 1) ? "s " : " ") << "from cart." << std::endl;
return true;

}

/**
Expand All @@ -69,7 +59,7 @@ void write_cart_queries(rec_vec_t & records, std::filesystem::path const & cart_

for (auto & record : records)
{
sequence_record_type sequence_record{std::move(record.sequence_id), std::move(record.sequence)};
sequence_record_type sequence_record{std::move(record.id()), std::move(record.sequence())};
fout.push_back(sequence_record);
}
}
Expand Down
94 changes: 51 additions & 43 deletions include/valik/search/iterate_queries.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void iterate_distributed_queries(search_arguments const & arguments,
cart_queue_t & queue)
{
using fields = seqan3::fields<seqan3::field::id, seqan3::field::seq>;
std::vector<query_record> query_records{};
std::vector<query_record<seqan3::dna4>> query_records{};
seqan3::sequence_file_input<dna4_traits, fields> fin{arguments.query_file};
for (auto &&chunked_records : fin | seqan3::views::chunk((1ULL << 20) * 10))
{
Expand All @@ -48,37 +48,39 @@ void iterate_distributed_queries(search_arguments const & arguments,
* @param arguments Command line arguments.
* @param queue Shopping cart queue for sending queries over to Stellar search.
*/
template <typename seq_t>
template <typename alphabet_t>
void iterate_all_queries(size_t const ref_seg_count,
search_arguments const & arguments,
cart_queue<shared_query_record<seq_t>> & queue)
cart_queue<shared_query_record<alphabet_t>> & queue)
{
using TId = std::string;
constexpr uint64_t chunk_size = (1ULL << 20) * 10;

seqan3::sequence_file_input<dna4_adaptor_traits> fin{arguments.query_file};
std::vector<shared_query_record<seq_t>> query_records{};
std::vector<query_record<alphabet_t>> query_records{};
std::vector<shared_query_record<alphabet_t>> query_references{};

std::set<TId> uniqueIds; // set of short IDs (cut at first whitespace)
bool idsUnique = true;
std::set<std::string> unique_ids; // set of short IDs (cut at first whitespace)
bool ids_unique{true};

size_t seqCount{0};
for (auto & record : fin)
{
idsUnique &= dream_stellar::_checkUniqueId(uniqueIds, record.id());
query_records.emplace_back(record.id(), std::make_shared<seq_t>(std::move(record.sequence())));
query_records.emplace_back(std::move(record.id()), std::move(record.sequence()));
auto & added_record = query_records[query_records.size() - 1];
ids_unique &= dream_stellar::_checkUniqueId(unique_ids, added_record.id());
query_references.emplace_back(added_record.id(), std::make_shared<std::vector<alphabet_t>>(added_record.sequence()));

if (query_records.size() > chunk_size)
if (query_references.size() > chunk_size)
{
search_all_parallel<shared_query_record<seq_t>>(ref_seg_count, arguments, query_records, queue);
search_all_parallel<shared_query_record<alphabet_t>>(ref_seg_count, arguments, query_references, queue);
query_references.clear();
query_records.clear();
}
}

if (!idsUnique)
if (!ids_unique)
std::cerr << "WARNING: Non-unique query ids. Output can be ambiguous.\n";

search_all_parallel<shared_query_record<seq_t>>(ref_seg_count, arguments, query_records, queue);
search_all_parallel<shared_query_record<alphabet_t>>(ref_seg_count, arguments, query_references, queue);
}

/**
Expand All @@ -90,38 +92,41 @@ void iterate_all_queries(size_t const ref_seg_count,
* @param thresholder Threshold for number of shared k-mers.
* @param queue Shopping cart queue for load balancing between Valik prefiltering and Stellar search.
*/
template <typename ibf_t, typename seq_t>
template <typename ibf_t, typename alphabet_t>
void iterate_short_queries(search_arguments const & arguments,
ibf_t const & ibf,
raptor::threshold::threshold const & thresholder,
cart_queue<shared_query_record<seq_t>> & queue)
cart_queue<shared_query_record<alphabet_t>> & queue)
{
using TId = std::string;
constexpr uint64_t chunk_size = (1ULL << 20) * 10;

seqan3::sequence_file_input<dna4_adaptor_traits> fin{arguments.query_file};
std::vector<shared_query_record<seq_t>> query_records{};
//!TODO: reserve chunk size?
std::vector<query_record<alphabet_t>> query_records{};
std::vector<shared_query_record<alphabet_t>> query_references{};

std::set<TId> uniqueIds; // set of short IDs (cut at first whitespace)
bool idsUnique = true;
std::set<std::string> unique_ids; // set of short IDs (cut at first whitespace)
bool ids_unique{true};

size_t seqCount{0};
for (auto & record : fin)
{
idsUnique &= dream_stellar::_checkUniqueId(uniqueIds, record.id());
query_records.emplace_back(record.id(), std::make_shared<seq_t>(std::move(record.sequence())));

query_records.emplace_back(std::move(record.id()), std::move(record.sequence()));
query_record<alphabet_t> & added_record = query_records[query_records.size() - 1];
ids_unique &= dream_stellar::_checkUniqueId(unique_ids, added_record.id());
query_references.emplace_back(added_record.id(), std::make_shared<std::vector<alphabet_t>>(added_record.sequence()));

if (query_records.size() > chunk_size)
{
prefilter_queries_parallel<shared_query_record<seq_t>>(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel<shared_query_record<alphabet_t>>(ibf, arguments, query_references, thresholder, queue);
query_references.clear();
query_records.clear();
}
}

if (!idsUnique)
if (!ids_unique)
std::cerr << "WARNING: Non-unique query ids. Output can be ambiguous.\n";

prefilter_queries_parallel<shared_query_record<seq_t>>(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel<shared_query_record<alphabet_t>>(ibf, arguments, query_references, thresholder, queue);
}

/**
Expand All @@ -134,45 +139,48 @@ void iterate_short_queries(search_arguments const & arguments,
* @param queue Shopping cart queue for load balancing between Valik prefiltering and Stellar search.
* @param meta Metadata table for split query segments.
*/
template <typename ibf_t, typename seq_t>
template <typename ibf_t, typename alphabet_t>
void iterate_split_queries(search_arguments const & arguments,
ibf_t const & ibf,
raptor::threshold::threshold const & thresholder,
cart_queue<shared_query_record<seq_t>> & queue,
cart_queue<shared_query_record<alphabet_t>> & queue,
metadata & meta)
{
using TId = std::string;
constexpr uint64_t chunk_size = (1ULL << 20) * 10;

seqan3::sequence_file_input<dna4_adaptor_traits> fin{arguments.query_file};
std::vector<shared_query_record<seq_t>> query_records{};
std::vector<query_record<alphabet_t>> query_records{};
std::vector<shared_query_record<alphabet_t>> query_references{};

std::set<TId> uniqueIds; // set of short IDs (cut at first whitespace)
bool idsUnique = true;
std::set<std::string> unique_ids; // set of short IDs (cut at first whitespace)
bool ids_unique{true};

size_t seqCount{0};
size_t seq_count{0};
for (auto & record : fin)
{
idsUnique &= dream_stellar::_checkUniqueId(uniqueIds, record.id());
auto query_ptr = std::make_shared<seq_t>(std::move(record.sequence()));
for (auto const & seg : meta.segments_from_ind(seqCount))
ids_unique &= dream_stellar::_checkUniqueId(unique_ids, record.id());
query_records.emplace_back(std::move(record.id()), std::move(record.sequence()));
auto & added_record = query_records[query_records.size() - 1];
auto query_ptr = std::make_shared<std::vector<alphabet_t>>(added_record.sequence());
for (auto const & seg : meta.segments_from_ind(seq_count))
{
// each split query record contains a copy of the same shared pointer
query_records.emplace_back(record.id(), seg, query_ptr);
query_references.emplace_back(added_record.id(), seg.start, seg.len, query_ptr);

if (query_records.size() > chunk_size)
{
prefilter_queries_parallel<shared_query_record<seq_t>>(ibf, arguments, query_records, thresholder, queue);
query_records.clear(); // shared pointers are erased -> memory is deallocated
prefilter_queries_parallel<shared_query_record<alphabet_t>>(ibf, arguments, query_references, thresholder, queue);
query_references.clear();
query_records.clear();
}
}
seqCount++;
seq_count++;
}

if (!idsUnique)
if (!ids_unique)
std::cerr << "WARNING: Non-unique query ids. Output can be ambiguous.\n";

prefilter_queries_parallel<shared_query_record<seq_t>>(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel<shared_query_record<alphabet_t>>(ibf, arguments, query_references, thresholder, queue);
}

} // namespace valik::app
2 changes: 1 addition & 1 deletion include/valik/search/local_prefilter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ void local_prefilter(

for (query_t const & record : records)
{
auto const & seq = record.sequence;
auto const & seq = record.sequence();

// debug
//seqan3::debug_stream << "Prefiltering query " << seq << '\n';
Expand Down
Loading

0 comments on commit b0a2d70

Please sign in to comment.