Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Repeat filtering: search in bins with highest entropy #121

Merged
merged 8 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions include/utilities/prepare/parse_bin_paths.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace valik
{

inline auto parse_bin_paths(build_arguments const & arguments)
inline auto parse_bin_paths(build_arguments const & arguments, std::string const & extension = "minimiser")
{
std::vector<std::vector<std::string>> minimiser_files{};
if (arguments.bin_path.size() > 1)
Expand All @@ -16,7 +16,7 @@ inline auto parse_bin_paths(build_arguments const & arguments)
std::vector<std::string> bin_headers;
for (std::filesystem::path file : bin_files)
{
bin_headers.emplace_back(arguments.out_dir / file.stem().replace_extension("minimiser"));
bin_headers.emplace_back(arguments.out_dir / file.stem().replace_extension(extension));
}
minimiser_files.push_back(bin_headers);
}
Expand All @@ -30,7 +30,7 @@ inline auto parse_bin_paths(build_arguments const & arguments)
file /= ref_file.stem();
file += ".";
file += std::to_string(bin);
file += ".minimiser";
file += "." + extension;
minimiser_files.emplace_back(1, file.string());
}
}
Expand Down
24 changes: 24 additions & 0 deletions include/valik/build/index_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class index_factory

valik_index<> index{*arguments};
auto & ibf = index.ibf();
auto & entropy_ranking = index.entropy_ranking();
std::vector<std::pair<size_t, double>> entropy_map{};

using sequence_file_t = seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::seq>>;
auto hash_view = [&] ()
Expand Down Expand Up @@ -66,6 +68,28 @@ class index_factory

std::vector<std::vector<std::string>> file_paths = parse_bin_paths(*arguments);
call_parallel_on_bins(minimiser_worker, file_paths, arguments->threads);

std::vector<std::vector<std::string>> header_paths = parse_bin_paths(*arguments, "header");
std::string shape_string{};
uint64_t window_size{};
size_t count{};
uint64_t bin_size{};
entropy_ranking.reserve(header_paths.size());
for (auto && [file_names, bin_number] : seqan3::views::zip(header_paths, std::views::iota(0u)))
{
//!TODO: disallow multiple files per bin
std::ifstream file_stream{file_names[0]};
file_stream >> shape_string >> window_size >> count >> bin_size;
entropy_map.emplace_back(std::make_pair((size_t) bin_number, (double) count / (double) bin_size));
}

std::ranges::sort(entropy_map.begin(), entropy_map.end(), [](const std::pair<size_t, double> &a, const std::pair<size_t, double> &b)
{
return a.second > b.second;
});

for (auto && [bin_id, entropy] : entropy_map)
entropy_ranking.push_back(bin_id);
}
else if (arguments->bin_path.size() > 1)
{
Expand Down
12 changes: 12 additions & 0 deletions include/valik/index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class valik_index
uint64_t window_size_{};
seqan3::shape shape_{};
std::vector<std::vector<std::string>> bin_path_{};
std::vector<size_t> entropy_ranking_{};
data_t ibf_{};

public:
Expand Down Expand Up @@ -81,6 +82,16 @@ class valik_index
return bin_path_;
}

// Mutable access to the bin IDs ordered by descending entropy (count/bin_size),
// as populated by index_factory from the per-bin header files.
std::vector<size_t> & entropy_ranking()
{
return entropy_ranking_;
}

// Read-only access to the bin IDs ordered by descending entropy; used during
// search to prefer high-entropy bins for queries with abundant matches.
std::vector<size_t> const & entropy_ranking() const
{
return entropy_ranking_;
}

data_t & ibf()
{
return ibf_;
Expand Down Expand Up @@ -110,6 +121,7 @@ class valik_index
archive(window_size_);
archive(shape_);
archive(bin_path_);
archive(entropy_ranking_);
archive(ibf_);
}
catch (std::exception const & e)
Expand Down
32 changes: 16 additions & 16 deletions include/valik/search/iterate_queries.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ namespace valik::app
* @brief Function that sends chunks of queries to the prefilter which then writes shopping carts onto disk.
*
* @param arguments Command line arguments.
* @param ibf Interleaved Bloom Filter.
* @param index Valik index of the reference database.
* @param thresholder Threshold for number of shared k-mers.
* @param queue Shopping cart queue for load balancing between prefiltering and Stellar search.
*/
template <typename ibf_t, typename cart_queue_t>
template <typename index_t, typename cart_queue_t>
void iterate_distributed_queries(search_arguments const & arguments,
ibf_t const & ibf,
index_t const & index,
raptor::threshold::threshold const & thresholder,
cart_queue_t & queue)
{
Expand All @@ -34,7 +34,7 @@ void iterate_distributed_queries(search_arguments const & arguments,
for (auto && fasta_record: chunked_records)
query_records.emplace_back(std::move(fasta_record.id()), std::move(fasta_record.sequence()));

prefilter_queries_parallel(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel(index, arguments, query_records, thresholder, queue);
}
}

Expand Down Expand Up @@ -89,15 +89,15 @@ void iterate_all_queries(size_t const ref_seg_count,
/**
* @brief Function that creates short query records from fasta file input and sends them for prefiltering.
*
* @tparam ibf_t Interleaved Bloom Filter type.
* @tparam index_t Valik index type containing Interleaved Bloom Filter.
* @param arguments Command line arguments.
* @param ibf Interleaved Bloom Filter of the reference database.
* @param index Valik index of the reference database.
* @param thresholder Threshold for number of shared k-mers.
* @param queue Shopping cart queue for load balancing between Valik prefiltering and Stellar search.
*/
template <typename ibf_t, typename TSequence>
template <typename index_t, typename TSequence>
void iterate_short_queries(search_arguments const & arguments,
ibf_t const & ibf,
index_t const & index,
raptor::threshold::threshold const & thresholder,
cart_queue<shared_query_record<TSequence>> & queue)
{
Expand Down Expand Up @@ -126,30 +126,30 @@ void iterate_short_queries(search_arguments const & arguments,

if (query_records.size() > chunk_size)
{
prefilter_queries_parallel<shared_query_record<TSequence>>(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel<shared_query_record<TSequence>>(index, arguments, query_records, thresholder, queue);
query_records.clear();
}
}

if (!idsUnique)
std::cerr << "WARNING: Non-unique query ids. Output can be ambiguous.\n";

prefilter_queries_parallel<shared_query_record<TSequence>>(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel<shared_query_record<TSequence>>(index, arguments, query_records, thresholder, queue);
}

/**
* @brief Function that creates split query records from fasta file input and sends them for prefiltering.
*
* @tparam ibf_t Interleaved Bloom Filter type.
* @tparam index_t Valik index type containing Interleaved Bloom Filter.
* @param arguments Command line arguments.
* @param ibf Interleaved Bloom Filter of the reference database.
* @param index Valik index of the reference database.
* @param thresholder Threshold for number of shared k-mers.
* @param queue Shopping cart queue for load balancing between Valik prefiltering and Stellar search.
* @param meta Metadata table for split query segments.
*/
template <typename ibf_t, typename TSequence>
template <typename index_t, typename TSequence>
void iterate_split_queries(search_arguments const & arguments,
ibf_t const & ibf,
index_t const & index,
raptor::threshold::threshold const & thresholder,
cart_queue<shared_query_record<TSequence>> & queue,
metadata & meta)
Expand Down Expand Up @@ -184,7 +184,7 @@ void iterate_split_queries(search_arguments const & arguments,

if (query_records.size() > chunk_size)
{
prefilter_queries_parallel<shared_query_record<TSequence>>(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel<shared_query_record<TSequence>>(index, arguments, query_records, thresholder, queue);
query_records.clear(); // shared pointers are erased -> memory is deallocated
}
}
Expand All @@ -193,7 +193,7 @@ void iterate_split_queries(search_arguments const & arguments,
if (!idsUnique)
std::cerr << "WARNING: Non-unique query ids. Output can be ambiguous.\n";

prefilter_queries_parallel<shared_query_record<TSequence>>(ibf, arguments, query_records, thresholder, queue);
prefilter_queries_parallel<shared_query_record<TSequence>>(index, arguments, query_records, thresholder, queue);
}

} // namespace valik::app
14 changes: 5 additions & 9 deletions include/valik/search/local_prefilter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ template <typename binning_bitvector_t>
void find_pattern_bins(pattern_bounds const & pattern,
size_t const & bin_count,
binning_bitvector_t const & counting_table,
std::unordered_map<size_t, size_t> & sequence_hits,
uint64_t & pattern_hits)
std::unordered_set<size_t> & sequence_hits)
{
// counting vector for the current pattern
seqan3::counting_vector<uint8_t> total_counts(bin_count, 0);
Expand All @@ -121,8 +120,7 @@ void find_pattern_bins(pattern_bounds const & pattern,
if (count >= pattern.threshold)
{
// the result is a union of results from all patterns of a read
sequence_hits[current_bin]++;
pattern_hits++;
sequence_hits.insert(current_bin);
}
}
}
Expand Down Expand Up @@ -200,16 +198,14 @@ void local_prefilter(

minimiser.clear();

uint64_t pattern_hits{0};
// {bin ID, pattern hit count}
std::unordered_map<size_t, size_t> sequence_hits{};
std::unordered_set<size_t> sequence_hits{};
pattern_begin_positions(seq.size(), arguments.pattern_size, arguments.query_every, [&](size_t const begin)
{
pattern_bounds const pattern = make_pattern_bounds(begin, arguments, window_span_begin, thresholder);
find_pattern_bins(pattern, bin_count, counting_table, sequence_hits, pattern_hits);
find_pattern_bins(pattern, bin_count, counting_table, sequence_hits);
});

result_cb(record, sequence_hits, pattern_hits);
result_cb(record, sequence_hits);
}
}

Expand Down
32 changes: 19 additions & 13 deletions include/valik/search/producer_threads_parallel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace valik::app
/**
* @brief Create parallel prefiltering jobs.
*/
template <typename query_t, seqan3::data_layout ibf_data_layout>
inline void prefilter_queries_parallel(seqan3::interleaved_bloom_filter<ibf_data_layout> const & ibf,
template <typename query_t, typename index_t>
inline void prefilter_queries_parallel(index_t const & index,
search_arguments const & arguments,
std::vector<query_t> const & records,
raptor::threshold::threshold const & thresholder,
Expand All @@ -43,43 +43,49 @@ inline void prefilter_queries_parallel(seqan3::interleaved_bloom_filter<ibf_data

std::span<query_t const> records_slice{&records[start], &records[end]};

auto prefilter_cb = [&queue,&arguments,&verbose_out,&ibf](query_t const & record,
std::unordered_map<size_t, size_t> const & bin_hits,
uint64_t const & total_pattern_hits)
auto prefilter_cb = [&queue,&arguments,&verbose_out,&index](query_t const & record,
std::unordered_set<size_t> const & bin_hits)
{
auto & ibf = index.ibf();
if (bin_hits.size() > std::max((size_t) 4, (size_t) std::round(ibf.bin_count() / 2.0)))
{
if (arguments.verbose)
verbose_out.write_warning(record, bin_hits.size());
if (arguments.keep_best_repeats) // keep bin hits that are supported by the most patterns per query segment
if (arguments.keep_best_repeats) // keep hits for bins with the highest entropy
{
size_t mean_bin_support = std::max((size_t) 2, (size_t) std::round((double) total_pattern_hits / (double) bin_hits.size()));
for (auto const [bin, count] : bin_hits)
auto const & entropy_ranking = index.entropy_ranking();
size_t inserted_bins{0};
for (size_t bin : entropy_ranking)
{
if (count > mean_bin_support)
if (bin_hits.count(bin) > 0)
{
queue.insert(bin, record);
inserted_bins++;
}
if (inserted_bins >= std::max((size_t) 4, (size_t) std::round(ibf.bin_count() * arguments.best_bin_entropy_cutoff)))
return;
}
}
else if (arguments.keep_all_repeats)
{
for (auto const [bin, count] : bin_hits)
for (auto & bin : bin_hits)
{
queue.insert(bin, record);
}
}
return;
}

for (auto const [bin, count] : bin_hits)
for (auto const bin : bin_hits)
{
queue.insert(bin, record);
}
};

// The following calls `local_prefilter(records, ibf, arguments, threshold)` on a thread.
tasks.emplace_back([=, &ibf, &arguments, &thresholder]()
tasks.emplace_back([=, &index, &arguments, &thresholder]()
{
local_prefilter(records_slice, ibf, arguments, thresholder, prefilter_cb);
local_prefilter(records_slice, index.ibf(), arguments, thresholder, prefilter_cb);
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion include/valik/search/search_distributed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ bool search_distributed(search_arguments & arguments, search_time_statistics & t
else
{
raptor::threshold::threshold const thresholder{arguments.make_threshold_parameters()};
iterate_distributed_queries(arguments, index.ibf(), thresholder, queue);
iterate_distributed_queries(arguments, index, thresholder, queue);

}
queue.finish(); // Flush carts that are not empty yet
Expand Down
14 changes: 7 additions & 7 deletions include/valik/search/search_local.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ static inline dream_stellar::StellarOptions make_thread_options(search_arguments
template <bool is_split, bool stellar_only>
bool search_local(search_arguments & arguments, search_time_statistics & time_statistics)
{
if (arguments.bin_path.size() > 1 || arguments.bin_path[0].size() > 1)
throw std::runtime_error("Multiple reference files can not be searched in shared memory mode. "
"Add --distribute argument to launch multiple distributed instances of DREAM-Stellar search.");

using index_structure_t = index_structure::ibf;
auto index = valik_index<index_structure_t>{};

Expand Down Expand Up @@ -170,10 +174,6 @@ bool search_local(search_arguments & arguments, search_time_statistics & time_st
}
}

if (arguments.bin_path.size() > 1 || arguments.bin_path[0].size() > 1)
throw std::runtime_error("Multiple reference files can not be searched in shared memory mode. "
"Add --distribute argument to launch multiple distributed instances of DREAM-Stellar search.");

dream_stellar::stellar_runtime input_databases_time{};
bool const databasesSuccess = input_databases_time.measure_time([&]()
{
Expand Down Expand Up @@ -396,15 +396,15 @@ bool search_local(search_arguments & arguments, search_time_statistics & time_st
}
else
{
using ibf_t = decltype(index.ibf());
using index_t = decltype(index);
raptor::threshold::threshold const thresholder{arguments.make_threshold_parameters()};
if constexpr (is_split)
{
iterate_split_queries<ibf_t, TSequence>(arguments, index.ibf(), thresholder, queue, query_meta.value());
iterate_split_queries<index_t, TSequence>(arguments, index, thresholder, queue, query_meta.value());
}
else
{
iterate_short_queries<ibf_t, TSequence>(arguments, index.ibf(), thresholder, queue);
iterate_short_queries<index_t, TSequence>(arguments, index, thresholder, queue);
}
}

Expand Down
1 change: 1 addition & 0 deletions include/valik/shared.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ struct search_arguments final : public minimiser_threshold_arguments, search_pro
bool fast{false};
bool verbose{false};
bool keep_best_repeats{false};
double best_bin_entropy_cutoff{0.25};
bool keep_all_repeats{false};

size_t cart_max_capacity{1000};
Expand Down
10 changes: 9 additions & 1 deletion src/argument_parsing/search.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,16 @@ void init_search_parser(sharg::parser & parser, search_arguments & arguments)
parser.add_flag(arguments.keep_best_repeats,
sharg::config{.short_id = '\0',
.long_id = "keep-best-repeats",
.description = "Find only highest similarity matches for repeat regions.",
.description = "Consider only high entropy regions for queries with abundant matches.",
.advanced = true});
parser.add_option(arguments.best_bin_entropy_cutoff,
sharg::config{.short_id = '\0',
.long_id = "bin-entropy-cutoff",
.description = "For queries with abundant matches, search only highly varied reference regions. "
"Increase this value to search more of the reference. "
"Use with --keep-best-repeats.",
.advanced = true,
.validator = sharg::arithmetic_range_validator{0.0, 1.0}});
parser.add_flag(arguments.keep_all_repeats,
sharg::config{.short_id = '\0',
.long_id = "keep-all-repeats",
Expand Down
Loading
Loading