Skip to content

Commit

Permalink
Search abundant queries in bins with the highest entropy
Browse files Browse the repository at this point in the history
  • Loading branch information
eaasna committed Oct 14, 2024
1 parent a33d321 commit 9aeeec2
Show file tree
Hide file tree
Showing 23 changed files with 124 additions and 74 deletions.
6 changes: 3 additions & 3 deletions include/utilities/prepare/parse_bin_paths.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace valik
{

inline auto parse_bin_paths(build_arguments const & arguments)
inline auto parse_bin_paths(build_arguments const & arguments, std::string const & extension = "minimiser")
{
std::vector<std::vector<std::string>> minimiser_files{};
if (arguments.bin_path.size() > 1)
Expand All @@ -16,7 +16,7 @@ inline auto parse_bin_paths(build_arguments const & arguments)
std::vector<std::string> bin_headers;
for (std::filesystem::path file : bin_files)
{
bin_headers.emplace_back(arguments.out_dir / file.stem().replace_extension("minimiser"));
bin_headers.emplace_back(arguments.out_dir / file.stem().replace_extension(extension));
}
minimiser_files.push_back(bin_headers);
}
Expand All @@ -30,7 +30,7 @@ inline auto parse_bin_paths(build_arguments const & arguments)
file /= ref_file.stem();
file += ".";
file += std::to_string(bin);
file += ".minimiser";
file += "." + extension;
minimiser_files.emplace_back(1, file.string());
}
}
Expand Down
24 changes: 24 additions & 0 deletions include/valik/build/index_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class index_factory

valik_index<> index{*arguments};
auto & ibf = index.ibf();
auto & entropy_ranking = index.entropy_ranking();
std::vector<std::pair<size_t, double>> entropy_map{};

using sequence_file_t = seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::seq>>;
auto hash_view = [&] ()
Expand Down Expand Up @@ -66,6 +68,28 @@ class index_factory

std::vector<std::vector<std::string>> file_paths = parse_bin_paths(*arguments);
call_parallel_on_bins(minimiser_worker, file_paths, arguments->threads);

std::vector<std::vector<std::string>> header_paths = parse_bin_paths(*arguments, "header");
std::string shape_string{};
uint64_t window_size{};
size_t count{};
uint64_t bin_size{};
entropy_ranking.reserve(header_paths.size());
for (auto && [file_names, bin_number] : seqan3::views::zip(header_paths, std::views::iota(0u)))
{
//!TODO: disallow multiple files per bin
std::ifstream file_stream{file_names[0]};
file_stream >> shape_string >> window_size >> count >> bin_size;
entropy_map.emplace_back(std::make_pair((size_t) bin_number, (double) count / (double) bin_size));
}

std::ranges::sort(entropy_map.begin(), entropy_map.end(), [](const std::pair<size_t, double> &a, const std::pair<size_t, double> &b)
{
return a.second > b.second;
});

for (auto && [bin_id, entropy] : entropy_map)
entropy_ranking.push_back(bin_id);
}
else if (arguments->bin_path.size() > 1)
{
Expand Down
12 changes: 12 additions & 0 deletions include/valik/index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class valik_index
uint64_t window_size_{};
seqan3::shape shape_{};
std::vector<std::vector<std::string>> bin_path_{};
std::vector<size_t> entropy_ranking_{};
data_t ibf_{};

public:
Expand Down Expand Up @@ -81,6 +82,16 @@ class valik_index
return bin_path_;
}

std::vector<size_t> & entropy_ranking()
{
return entropy_ranking_;
}

std::vector<size_t> const & entropy_ranking() const
{
return entropy_ranking_;
}

data_t & ibf()
{
return ibf_;
Expand Down Expand Up @@ -110,6 +121,7 @@ class valik_index
archive(window_size_);
archive(shape_);
archive(bin_path_);
archive(entropy_ranking_);
archive(ibf_);
}
catch (std::exception const & e)
Expand Down
32 changes: 16 additions & 16 deletions include/valik/search/iterate_queries.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ namespace valik::app
* @brief Function that sends chunks of queries to the prefilter which then writes shopping carts onto disk.
*
* @param arguments Command line arguments.
* @param ibf Interleaved Bloom Filter.
* @param index Valik index of the reference database.
* @param thresholder Threshold for number of shared k-mers.
* @param queue Shopping cart queue for load balancing between prefiltering and Stellar search.
*/
template <typename ibf_t, typename cart_queue_t>
template <typename index_t, typename cart_queue_t>
void iterate_distributed_queries(search_arguments const & arguments,
ibf_t const & ibf,
index_t const & index,
raptor::threshold::threshold const & thresholder,
cart_queue_t & queue)
{
Expand All @@ -34,7 +34,7 @@ void iterate_distributed_queries(search_arguments const & arguments,
for (auto && fasta_record: chunked_records)
query_records.emplace_back(std::move(fasta_record.id()), std::move(fasta_record.sequence()));

prefilter_queries_parallel(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel(index, arguments, query_records, thresholder, queue);
}
}

Expand Down Expand Up @@ -89,15 +89,15 @@ void iterate_all_queries(size_t const ref_seg_count,
/**
* @brief Function that creates short query records from fasta file input and sends them for prefiltering.
*
* @tparam ibf_t Interleaved Bloom Filter type.
* @tparam index_t Valik index type containing Interleaved Bloom Filter.
* @param arguments Command line arguments.
* @param ibf Interleaved Bloom Filter of the reference database.
* @param infex Valik index of the reference database.
* @param thresholder Threshold for number of shared k-mers.
* @param queue Shopping cart queue for load balancing between Valik prefiltering and Stellar search.
*/
template <typename ibf_t, typename TSequence>
template <typename index_t, typename TSequence>
void iterate_short_queries(search_arguments const & arguments,
ibf_t const & ibf,
index_t const & index,
raptor::threshold::threshold const & thresholder,
cart_queue<shared_query_record<TSequence>> & queue)
{
Expand Down Expand Up @@ -126,30 +126,30 @@ void iterate_short_queries(search_arguments const & arguments,

if (query_records.size() > chunk_size)
{
prefilter_queries_parallel<shared_query_record<TSequence>>(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel<shared_query_record<TSequence>>(index, arguments, query_records, thresholder, queue);
query_records.clear();
}
}

if (!idsUnique)
std::cerr << "WARNING: Non-unique query ids. Output can be ambiguous.\n";

prefilter_queries_parallel<shared_query_record<TSequence>>(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel<shared_query_record<TSequence>>(index, arguments, query_records, thresholder, queue);
}

/**
* @brief Function that creates split query records from fasta file input and sends them for prefiltering.
*
* @tparam ibf_t Interleaved Bloom Filter type.
* @tparam index_t Valik index type containing Interleaved Bloom Filter.
* @param arguments Command line arguments.
* @param ibf Interleaved Bloom Filter of the reference database.
* @param index Valik index of the reference database.
* @param thresholder Threshold for number of shared k-mers.
* @param queue Shopping cart queue for load balancing between Valik prefiltering and Stellar search.
* @param meta Metadata table for split query segments.
*/
template <typename ibf_t, typename TSequence>
template <typename index_t, typename TSequence>
void iterate_split_queries(search_arguments const & arguments,
ibf_t const & ibf,
index_t const & index,
raptor::threshold::threshold const & thresholder,
cart_queue<shared_query_record<TSequence>> & queue,
metadata & meta)
Expand Down Expand Up @@ -184,7 +184,7 @@ void iterate_split_queries(search_arguments const & arguments,

if (query_records.size() > chunk_size)
{
prefilter_queries_parallel<shared_query_record<TSequence>>(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel<shared_query_record<TSequence>>(index, arguments, query_records, thresholder, queue);
query_records.clear(); // shared pointers are erased -> memory is deallocated
}
}
Expand All @@ -193,7 +193,7 @@ void iterate_split_queries(search_arguments const & arguments,
if (!idsUnique)
std::cerr << "WARNING: Non-unique query ids. Output can be ambiguous.\n";

prefilter_queries_parallel<shared_query_record<TSequence>>(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel<shared_query_record<TSequence>>(index, arguments, query_records, thresholder, queue);
}

} // namespace valik::app
14 changes: 5 additions & 9 deletions include/valik/search/local_prefilter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ template <typename binning_bitvector_t>
void find_pattern_bins(pattern_bounds const & pattern,
size_t const & bin_count,
binning_bitvector_t const & counting_table,
std::unordered_map<size_t, size_t> & sequence_hits,
uint64_t & pattern_hits)
std::unordered_set<size_t> & sequence_hits)
{
// counting vector for the current pattern
seqan3::counting_vector<uint8_t> total_counts(bin_count, 0);
Expand All @@ -121,8 +120,7 @@ void find_pattern_bins(pattern_bounds const & pattern,
if (count >= pattern.threshold)
{
// the result is a union of results from all patterns of a read
sequence_hits[current_bin]++;
pattern_hits++;
sequence_hits.insert(current_bin);
}
}
}
Expand Down Expand Up @@ -200,16 +198,14 @@ void local_prefilter(

minimiser.clear();

uint64_t pattern_hits{0};
// {bin ID, pattern hit count}
std::unordered_map<size_t, size_t> sequence_hits{};
std::unordered_set<size_t> sequence_hits{};
pattern_begin_positions(seq.size(), arguments.pattern_size, arguments.query_every, [&](size_t const begin)
{
pattern_bounds const pattern = make_pattern_bounds(begin, arguments, window_span_begin, thresholder);
find_pattern_bins(pattern, bin_count, counting_table, sequence_hits, pattern_hits);
find_pattern_bins(pattern, bin_count, counting_table, sequence_hits);
});

result_cb(record, sequence_hits, pattern_hits);
result_cb(record, sequence_hits);
}
}

Expand Down
32 changes: 19 additions & 13 deletions include/valik/search/producer_threads_parallel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace valik::app
/**
* @brief Create parallel prefiltering jobs.
*/
template <typename query_t, seqan3::data_layout ibf_data_layout>
inline void prefilter_queries_parallel(seqan3::interleaved_bloom_filter<ibf_data_layout> const & ibf,
template <typename query_t, typename index_t>
inline void prefilter_queries_parallel(index_t const & index,
search_arguments const & arguments,
std::vector<query_t> const & records,
raptor::threshold::threshold const & thresholder,
Expand All @@ -43,43 +43,49 @@ inline void prefilter_queries_parallel(seqan3::interleaved_bloom_filter<ibf_data

std::span<query_t const> records_slice{&records[start], &records[end]};

auto prefilter_cb = [&queue,&arguments,&verbose_out,&ibf](query_t const & record,
std::unordered_map<size_t, size_t> const & bin_hits,
uint64_t const & total_pattern_hits)
auto prefilter_cb = [&queue,&arguments,&verbose_out,&index](query_t const & record,
std::unordered_set<size_t> const & bin_hits)
{
auto & ibf = index.ibf();
if (bin_hits.size() > std::max((size_t) 4, (size_t) std::round(ibf.bin_count() / 2.0)))
{
if (arguments.verbose)
verbose_out.write_warning(record, bin_hits.size());
if (arguments.keep_best_repeats) // keep bin hits that are supported by the most patterns per query segment
if (arguments.keep_best_repeats) // keep hits for bins with the highest entropy
{
size_t mean_bin_support = std::max((size_t) 2, (size_t) std::round((double) total_pattern_hits / (double) bin_hits.size()));
for (auto const [bin, count] : bin_hits)
auto const & entropy_ranking = index.entropy_ranking();
size_t inserted_bins{0};
for (size_t bin : entropy_ranking)
{
if (count > mean_bin_support)
if (bin_hits.count(bin) > 0)
{
queue.insert(bin, record);
inserted_bins++;
}
if (inserted_bins >= std::max((size_t) 4, (size_t) std::round(ibf.bin_count() * arguments.best_bin_entropy_cutoff)))
return;
}
}
else if (arguments.keep_all_repeats)
{
for (auto const [bin, count] : bin_hits)
for (auto & bin : bin_hits)
{
queue.insert(bin, record);
}
}
return;
}

for (auto const [bin, count] : bin_hits)
for (auto const bin : bin_hits)
{
queue.insert(bin, record);
}
};

// The following calls `local_prefilter(records, ibf, arguments, threshold)` on a thread.
tasks.emplace_back([=, &ibf, &arguments, &thresholder]()
tasks.emplace_back([=, &index, &arguments, &thresholder]()
{
local_prefilter(records_slice, ibf, arguments, thresholder, prefilter_cb);
local_prefilter(records_slice, index.ibf(), arguments, thresholder, prefilter_cb);
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion include/valik/search/search_distributed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ bool search_distributed(search_arguments & arguments, search_time_statistics & t
else
{
raptor::threshold::threshold const thresholder{arguments.make_threshold_parameters()};
iterate_distributed_queries(arguments, index.ibf(), thresholder, queue);
iterate_distributed_queries(arguments, index, thresholder, queue);

}
queue.finish(); // Flush carts that are not empty yet
Expand Down
14 changes: 7 additions & 7 deletions include/valik/search/search_local.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ static inline dream_stellar::StellarOptions make_thread_options(search_arguments
template <bool is_split, bool stellar_only>
bool search_local(search_arguments & arguments, search_time_statistics & time_statistics)
{
if (arguments.bin_path.size() > 1 || arguments.bin_path[0].size() > 1)
throw std::runtime_error("Multiple reference files can not be searched in shared memory mode. "
"Add --distribute argument to launch multiple distributed instances of DREAM-Stellar search.");

using index_structure_t = index_structure::ibf;
auto index = valik_index<index_structure_t>{};

Expand Down Expand Up @@ -170,10 +174,6 @@ bool search_local(search_arguments & arguments, search_time_statistics & time_st
}
}

if (arguments.bin_path.size() > 1 || arguments.bin_path[0].size() > 1)
throw std::runtime_error("Multiple reference files can not be searched in shared memory mode. "
"Add --distribute argument to launch multiple distributed instances of DREAM-Stellar search.");

dream_stellar::stellar_runtime input_databases_time{};
bool const databasesSuccess = input_databases_time.measure_time([&]()
{
Expand Down Expand Up @@ -396,15 +396,15 @@ bool search_local(search_arguments & arguments, search_time_statistics & time_st
}
else
{
using ibf_t = decltype(index.ibf());
using index_t = decltype(index);
raptor::threshold::threshold const thresholder{arguments.make_threshold_parameters()};
if constexpr (is_split)
{
iterate_split_queries<ibf_t, TSequence>(arguments, index.ibf(), thresholder, queue, query_meta.value());
iterate_split_queries<index_t, TSequence>(arguments, index, thresholder, queue, query_meta.value());
}
else
{
iterate_short_queries<ibf_t, TSequence>(arguments, index.ibf(), thresholder, queue);
iterate_short_queries<index_t, TSequence>(arguments, index, thresholder, queue);
}
}

Expand Down
1 change: 1 addition & 0 deletions include/valik/shared.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ struct search_arguments final : public minimiser_threshold_arguments, search_pro
bool fast{false};
bool verbose{false};
bool keep_best_repeats{false};
double best_bin_entropy_cutoff{0.1};
bool keep_all_repeats{false};

size_t cart_max_capacity{1000};
Expand Down
10 changes: 9 additions & 1 deletion src/argument_parsing/search.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,16 @@ void init_search_parser(sharg::parser & parser, search_arguments & arguments)
parser.add_flag(arguments.keep_best_repeats,
sharg::config{.short_id = '\0',
.long_id = "keep-best-repeats",
.description = "Find only highest similarity matches for repeat regions.",
.description = "Consider only high entropy regions for queries with abundant matches.",
.advanced = true});
parser.add_option(arguments.best_bin_entropy_cutoff,
sharg::config{.short_id = '\0',
.long_id = "bin-entropy-cutoff",
.description = "For queries with abundant matches, search only highly varied reference regions. "
"Increase this value to search more of the reference. "
"Use with --keep-best-repeats.",
.advanced = true,
.validator = sharg::arithmetic_range_validator{0.001, 0.99}});
parser.add_flag(arguments.keep_all_repeats,
sharg::config{.short_id = '\0',
.long_id = "keep-all-repeats",
Expand Down
Loading

0 comments on commit 9aeeec2

Please sign in to comment.