Skip to content

Commit

Permalink
Count distinct segment k-mers in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
eaasna committed Oct 22, 2024
1 parent 12acbb6 commit d91e28e
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 41 deletions.
13 changes: 0 additions & 13 deletions include/raptor/file_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,6 @@ class file_reader<file_types::sequence>
std::ranges::copy_if(record.sequence() | minimiser_view, target, pred);
}

void on_hash(std::vector<std::string> const & filenames, auto && callback) const
{
for (auto && filename : filenames)
on_hash(filename, callback);
}

void on_hash(std::string const & filename, auto && callback) const
{
sequence_file_t fin{filename};
for (auto && record : fin)
callback(record.sequence() | minimiser_view);
}

void for_each_hash(std::vector<std::string> const & filenames, auto && callback) const
{
for (auto && filename : filenames)
Expand Down
1 change: 0 additions & 1 deletion include/valik/build/index_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ class index_factory
{
for (auto & seg : meta.segments_from_ind(i))
{
//!TODO: avoid reallocating memory for deques in the minimiser view construction
for (auto && value : seq | seqan3::views::slice(seg.start, seg.start + seg.len) | min_view)
ibf.emplace(value, seqan3::bin_index{seg.id});
}
Expand Down
110 changes: 83 additions & 27 deletions src/prepare/compute_bin_size.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ void compute_minimiser(valik::build_arguments const & arguments)
namespace detail
{

size_t kmer_count_from_minimiser_files(std::vector<std::vector<std::string>> const & bin_path, uint8_t const threads)
size_t kmer_count_from_minimiser_files(std::vector<std::vector<std::string>> const & minimiser_bin_path, uint8_t const threads)
{
std::mutex callback_mutex{};
size_t max_filesize{};
Expand Down Expand Up @@ -241,7 +241,7 @@ size_t kmer_count_from_minimiser_files(std::vector<std::vector<std::string>> con
callback(biggest_file, max_filesize);
};

valik::call_parallel_on_bins(worker, bin_path, threads);
valik::call_parallel_on_bins(worker, minimiser_bin_path, threads);

std::string shape_string{};
uint64_t window_size{};
Expand All @@ -254,32 +254,95 @@ size_t kmer_count_from_minimiser_files(std::vector<std::vector<std::string>> con
return max_count;
}

size_t kmer_count_from_sequence_files(std::vector<std::vector<std::string>> const & bin_path,
uint8_t const threads,
seqan3::shape const & shape,
uint32_t const window_size)
size_t kmer_count_from_sequence_files(valik::build_arguments const & arguments)
{
size_t max_count{};
std::mutex callback_mutex{};
file_reader<file_types::sequence> const reader{shape, window_size};

auto callback = [&callback_mutex, &max_count](auto && view)
auto callback = [&callback_mutex, &max_count](size_t const count)
{
auto const count = std::ranges::distance(view);
{
std::lock_guard<std::mutex> guard{callback_mutex};
max_count = std::max<size_t>(max_count, count);
}
std::lock_guard<std::mutex> guard{callback_mutex};
max_count = std::max<size_t>(max_count, count);
};

if (arguments.bin_path.size() > 1)
{
file_reader<file_types::sequence> const reader{arguments.shape, arguments.window_size};

auto worker = [&callback, &reader](auto && zipped_view, auto &&)
auto cluster_worker = [&callback, &reader](auto && zipped_view, auto &&)
{
std::unordered_set<uint64_t> kmers;
auto insert_it = std::inserter(kmers, kmers.end());
for (auto && [file_names, bin_number] : zipped_view)
{
kmers.clear();
reader.hash_into(file_names, insert_it);
callback(kmers.size());
}
};

// callback max_count which is the max number of k-mers in any sequence
valik::call_parallel_on_bins(cluster_worker, arguments.bin_path, arguments.threads);
}
else
{
for (auto && [file_names, bin_number] : zipped_view)
reader.on_hash(file_names, callback);
};
using sequence_t = seqan3::dna4_vector;
using sequence_file_t = seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::seq>>;
std::vector<valik::shared_reference_record<sequence_t>> reference_records;
reference_records.reserve(arguments.threads);
size_t seq_ind{0};
size_t processed_bin_count{0};

// callback max_count which is the max number of k-mers in any sequence
valik::call_parallel_on_bins(worker, bin_path, threads);
auto hash_view = [&] ()
{
return seqan3::views::minimiser_hash(arguments.shape,
seqan3::window_size{arguments.window_size},
seqan3::seed{adjust_seed(arguments.shape.count())});
};

auto segment_worker = [&callback, &hash_view](const auto && zipped_view, auto &&)
{
std::unordered_set<uint64_t> kmers;
auto insert_it = std::inserter(kmers, kmers.end());
for (auto && [shared_record, bin_number] : zipped_view)
{
kmers.clear();
//!TODO: bin number is not used. How to construct view of shared records to fulfill execution handler requirements?
//seqan3::debug_stream << bin_number << '\n';
auto const & seg = shared_record.segment;
std::ranges::copy(*shared_record.underlying_sequence |
seqan3::views::slice(seg.start, seg.start + seg.len) | hash_view(), insert_it);

callback(kmers.size());
}
};

for (auto && [seq] : sequence_file_t{arguments.bin_path[0][0]})
{
valik::metadata meta(arguments.ref_meta_path);
auto shared_seq = std::make_shared<sequence_t>(seq);
for (auto && seg : meta.segments_from_ind(seq_ind))
{
reference_records.emplace_back(shared_seq, std::move(seg));
if (reference_records.size() == arguments.threads)
{
auto chunked_view = seqan3::views::zip(reference_records, std::views::iota(processed_bin_count)) | seqan3::views::chunk(1u);
seqan3::detail::execution_handler_parallel executioner{arguments.threads};
executioner.bulk_execute(std::move(segment_worker), std::move(chunked_view), []() {});
processed_bin_count += reference_records.size();
reference_records.clear();
}
}
seq_ind++;
}

if (!reference_records.empty())
{
seqan3::detail::execution_handler_parallel executioner{reference_records.size()};
auto chunked_view = seqan3::views::zip(reference_records, std::views::iota(processed_bin_count)) | seqan3::views::chunk(1u);
executioner.bulk_execute(std::move(segment_worker), std::move(chunked_view), []() {});
}
}

return max_count;
}
Expand All @@ -297,14 +360,7 @@ size_t compute_bin_size(valik::build_arguments const & arguments)

size_t max_count = arguments.input_is_minimiser
? detail::kmer_count_from_minimiser_files(minimiser_files, arguments.threads)
: detail::kmer_count_from_sequence_files(arguments.bin_path,
arguments.threads,
arguments.shape,
arguments.window_size);

//!TODO: it would be more exact to count the k-mers in the largest segment instead of averaging across the whole sequence
if ((arguments.bin_path.size() == 1) && (!arguments.input_is_minimiser))
max_count /= (arguments.bins * 0.9); // assume the biggest bin is 10% bigger than the average
: detail::kmer_count_from_sequence_files(arguments);

assert(max_count > 0u);

Expand Down

0 comments on commit d91e28e

Please sign in to comment.