Skip to content

Commit

Permalink
Count segment minimisers in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
eaasna committed Oct 7, 2024
1 parent 783bae9 commit 0a5699e
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 64 deletions.
1 change: 1 addition & 0 deletions include/utilities/prepare/compute_bin_size.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <valik/build/call_parallel_on_bins.hpp>
#include <valik/split/metadata.hpp>
#include <utilities/prepare/parse_bin_paths.hpp>
#include <utilities/prepare/reference_record.hpp>

namespace seqan::hibf::build
{
Expand Down
19 changes: 19 additions & 0 deletions include/utilities/prepare/reference_record.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#pragma once

#include <memory>
#include <valik/split/metadata.hpp>

namespace valik
{

template <typename sequence_t>
struct shared_reference_record
{
std::shared_ptr<sequence_t> underlying_sequence;
metadata::segment_stats segment;

shared_reference_record(std::shared_ptr<sequence_t> const & seq_ptr, metadata::segment_stats && seg) :
underlying_sequence(seq_ptr), segment(seg) { }
};

} // namespace valik
151 changes: 87 additions & 64 deletions src/prepare/compute_bin_size.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,72 +33,81 @@ std::filesystem::path get_output_path(std::filesystem::path const & output_dir,

void compute_minimiser(valik::build_arguments const & arguments)
{
file_reader<file_types::sequence> const reader{arguments.shape, arguments.window_size};

auto cluster_worker = [&](auto && zipped_view, auto &&)
seqan3::detail::execution_handler_parallel executioner{arguments.threads};
if (arguments.bin_path.size() > 1)
{
for (auto && [file_names, bin_number] : zipped_view)
file_reader<file_types::sequence> const reader{arguments.shape, arguments.window_size};
auto cluster_worker = [&](auto && zipped_view, auto &&)
{
std::filesystem::path const file_name{file_names[0]};
std::filesystem::path output_path = get_output_path(arguments.out_dir, file_name);
for (auto && [file_names, bin_number] : zipped_view)
{
std::filesystem::path const file_name{file_names[0]};
std::filesystem::path output_path = get_output_path(arguments.out_dir, file_name);

std::filesystem::path const minimiser_file =
std::filesystem::path{output_path}.replace_extension("minimiser");
std::filesystem::path const progress_file =
std::filesystem::path{output_path}.replace_extension("in_progress");
std::filesystem::path const header_file = std::filesystem::path{output_path}.replace_extension("header");
std::filesystem::path const minimiser_file =
std::filesystem::path{output_path}.replace_extension("minimiser");
std::filesystem::path const progress_file =
std::filesystem::path{output_path}.replace_extension("in_progress");
std::filesystem::path const header_file = std::filesystem::path{output_path}.replace_extension("header");

// If we are already done with this file, we can skip it. Otherwise, we create a ".in_progress" file to keep
// track of whether the minimiser computation was successful.
bool const already_done = std::filesystem::exists(header_file) && std::filesystem::exists(header_file) &&
!std::filesystem::exists(progress_file);
// If we are already done with this file, we can skip it. Otherwise, we create a ".in_progress" file to keep
// track of whether the minimiser computation was successful.
bool const already_done = std::filesystem::exists(header_file) && std::filesystem::exists(header_file) &&
!std::filesystem::exists(progress_file);

if (already_done)
continue;
else
std::ofstream outfile{progress_file, std::ios::binary};
if (already_done)
continue;
else
std::ofstream outfile{progress_file, std::ios::binary};

std::unordered_set<uint64_t> distinct_minimisers{};
// The set is (re-)constructed for each file. The alternative is to construct it once for each thread
// and clear+reuse it for every file that a thread works on. However, this dramatically increases
// memory consumption because the map will stay as big as needed for the biggest encountered file.
std::unordered_set<uint64_t> distinct_minimisers{};
// The set is (re-)constructed for each file. The alternative is to construct it once for each thread
// and clear+reuse it for every file that a thread works on. However, this dramatically increases
// memory consumption because the map will stay as big as needed for the biggest encountered file.

reader.for_each_hash(file_names,
[&](auto && hash)
{
distinct_minimisers.insert(hash);
});
reader.for_each_hash(file_names,
[&](auto && hash)
{
distinct_minimisers.insert(hash);
});

uint64_t count{};
uint64_t count{};

{
std::ofstream outfile{minimiser_file, std::ios::binary};
for (auto && hash : distinct_minimisers)
{
outfile.write(reinterpret_cast<const char *>(&hash), sizeof(hash));
++count;
//!TODO: apply k-mer count cutoffs in metagenome search
std::ofstream outfile{minimiser_file, std::ios::binary};
for (auto && hash : distinct_minimisers)
{
outfile.write(reinterpret_cast<const char *>(&hash), sizeof(hash));
++count;
}
}
}

{
std::ofstream headerfile{header_file};
headerfile << arguments.shape.to_string() << '\t' << arguments.window_size << '\t' << count << '\n';
}

std::filesystem::remove(progress_file);
}
};
{
std::ofstream headerfile{header_file};
headerfile << arguments.shape.to_string() << '\t' << arguments.window_size << '\t' << count << '\n';
}

auto segment_worker = [&](auto && meta)
std::filesystem::remove(progress_file);
}
};

size_t const chunk_size =
std::max<size_t>(1, std::floor(arguments.bin_path.size() / static_cast<double>(arguments.threads)));
auto chunked_view = seqan3::views::zip(arguments.bin_path, std::views::iota(0u)) | seqan3::views::chunk(chunk_size);
executioner.bulk_execute(std::move(cluster_worker), std::move(chunked_view), []() {});
}
else
{
valik::metadata meta(arguments.ref_meta_path);
std::filesystem::path const file_name{arguments.bin_path[0][0]};

using sequence_file_t = seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::seq>>;
size_t i{0};
for (auto && [seq] : sequence_file_t{arguments.bin_path[0][0]})
auto segment_worker = [&](const auto && zipped_view, auto &&)
{
for (auto & seg : meta.segments_from_ind(i))
for (auto && [shared_record, bin_number] : zipped_view)
{
auto const & seg = shared_record.segment;

std::filesystem::path output_path = get_output_path(arguments.out_dir, file_name, seg.id);
std::filesystem::path const minimiser_file =
std::filesystem::path{output_path}.replace_extension("minimiser");
Expand Down Expand Up @@ -128,7 +137,7 @@ void compute_minimiser(valik::build_arguments const & arguments)
seqan3::seed{adjust_seed(arguments.shape_weight)});
};

for (auto && value : seq | seqan3::views::slice(seg.start, seg.start + seg.len) | hash_view())
for (auto const value : *shared_record.underlying_sequence | seqan3::views::slice(seg.start, seg.start + seg.len) | hash_view())
{
minimiser_table[value] = std::min<uint8_t>(254u, minimiser_table[value] + 1);
}
Expand All @@ -138,7 +147,7 @@ void compute_minimiser(valik::build_arguments const & arguments)
std::ofstream outfile{minimiser_file, std::ios::binary};
for (auto && [hash, occurrences] : minimiser_table)
{
if (occurrences > arguments.kmer_count_min_cutoff && occurrences < arguments.kmer_count_max_cutoff)
if (occurrences >= arguments.kmer_count_min_cutoff && occurrences <= arguments.kmer_count_max_cutoff)
{
outfile.write(reinterpret_cast<const char *>(&hash), sizeof(hash));
++count;
Expand All @@ -153,22 +162,36 @@ void compute_minimiser(valik::build_arguments const & arguments)

std::filesystem::remove(progress_file);
}
i++;
};

using sequence_t = seqan3::dna4_vector;
using sequence_file_t = seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::seq>>;
std::vector<valik::shared_reference_record<sequence_t>> reference_records;
reference_records.reserve(arguments.threads);
size_t seq_ind{0};
size_t processed_bin_count{0};
for (auto && [seq] : sequence_file_t{arguments.bin_path[0][0]})
{
auto shared_seq = std::make_shared<sequence_t>(seq);
for (auto && seg : meta.segments_from_ind(seq_ind))
{
reference_records.emplace_back(shared_seq, std::move(seg));
if (reference_records.size() == arguments.threads)
{
auto chunked_view = seqan3::views::zip(reference_records, std::views::iota(processed_bin_count)) | seqan3::views::chunk(1u);
processed_bin_count += reference_records.size();
executioner.bulk_execute(std::move(segment_worker), std::move(chunked_view), []() {});
reference_records.clear();
}
}
seq_ind++;
}
};

if (arguments.bin_path.size() > 1)
{
size_t const chunk_size =
std::max<size_t>(1, std::floor(arguments.bin_path.size() / static_cast<double>(arguments.threads)));
auto chunked_view = seqan3::views::zip(arguments.bin_path, std::views::iota(0u)) | seqan3::views::chunk(chunk_size);
seqan3::detail::execution_handler_parallel executioner{arguments.threads};
executioner.bulk_execute(std::move(cluster_worker), std::move(chunked_view), []() {});
}
else
{
valik::metadata meta(arguments.ref_meta_path);
segment_worker(meta);
if (!reference_records.empty())
{
auto chunked_view = seqan3::views::zip(reference_records, std::views::iota(0u)) | seqan3::views::chunk(1u);
executioner.bulk_execute(std::move(segment_worker), std::move(chunked_view), []() {});
}
}
}

Expand Down

0 comments on commit 0a5699e

Please sign in to comment.