diff --git a/include/utilities/prepare/compute_bin_size.hpp b/include/utilities/prepare/compute_bin_size.hpp
index cf7b5496..dfc02a53 100644
--- a/include/utilities/prepare/compute_bin_size.hpp
+++ b/include/utilities/prepare/compute_bin_size.hpp
@@ -22,6 +22,7 @@
 #include
 #include
 #include
+#include
 
 namespace seqan::hibf::build
 {
diff --git a/include/utilities/prepare/reference_record.hpp b/include/utilities/prepare/reference_record.hpp
new file mode 100755
index 00000000..6156e556
--- /dev/null
+++ b/include/utilities/prepare/reference_record.hpp
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <memory>
+#include <valik/split/metadata.hpp>
+
+namespace valik
+{
+
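+// Couples one segment of a reference sequence with shared ownership of the
+// underlying sequence, so per-segment minimiser jobs can run in parallel
+// without copying the sequence or re-reading it from disk.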
+template <typename sequence_t>
+struct shared_reference_record
+{
+    std::shared_ptr<sequence_t> underlying_sequence;
+    metadata::segment_stats segment;
+
+    shared_reference_record(std::shared_ptr<sequence_t> const & seq_ptr, metadata::segment_stats && seg) :
+        underlying_sequence(seq_ptr), segment(std::move(seg)) { }
+};
+
+} // namespace valik
diff --git a/src/prepare/compute_bin_size.cpp b/src/prepare/compute_bin_size.cpp
index fc938139..3fdbe1b7 100644
--- a/src/prepare/compute_bin_size.cpp
+++ b/src/prepare/compute_bin_size.cpp
@@ -33,72 +33,81 @@ std::filesystem::path get_output_path(std::filesystem::path const & output_dir,
 void compute_minimiser(valik::build_arguments const & arguments)
 {
-    file_reader const reader{arguments.shape, arguments.window_size};
-
-    auto cluster_worker = [&](auto && zipped_view, auto &&)
+    seqan3::detail::execution_handler_parallel executioner{arguments.threads};
+    if (arguments.bin_path.size() > 1)
     {
-        for (auto && [file_names, bin_number] : zipped_view)
+        file_reader const reader{arguments.shape, arguments.window_size};
+        auto cluster_worker = [&](auto && zipped_view, auto &&)
         {
-            std::filesystem::path const file_name{file_names[0]};
-            std::filesystem::path output_path = get_output_path(arguments.out_dir, file_name);
+            for (auto && [file_names, bin_number] : zipped_view)
+            {
+                std::filesystem::path const file_name{file_names[0]};
+                std::filesystem::path output_path = get_output_path(arguments.out_dir, file_name);
 
-            std::filesystem::path const minimiser_file =
-                std::filesystem::path{output_path}.replace_extension("minimiser");
-            std::filesystem::path const progress_file =
-                std::filesystem::path{output_path}.replace_extension("in_progress");
-            std::filesystem::path const header_file = std::filesystem::path{output_path}.replace_extension("header");
+                std::filesystem::path const minimiser_file =
+                    std::filesystem::path{output_path}.replace_extension("minimiser");
+                std::filesystem::path const progress_file =
+                    std::filesystem::path{output_path}.replace_extension("in_progress");
+                std::filesystem::path const header_file = std::filesystem::path{output_path}.replace_extension("header");
 
-            // If we are already done with this file, we can skip it. Otherwise, we create a ".in_progress" file to keep
-            // track of whether the minimiser computation was successful.
-            bool const already_done = std::filesystem::exists(header_file) && std::filesystem::exists(header_file) &&
-                                      !std::filesystem::exists(progress_file);
+                // If we are already done with this file, we can skip it. Otherwise, we create a ".in_progress" file to keep
+                // track of whether the minimiser computation was successful.
+                bool const already_done = std::filesystem::exists(minimiser_file) && std::filesystem::exists(header_file) &&
+                                          !std::filesystem::exists(progress_file);
 
-            if (already_done)
-                continue;
-            else
-                std::ofstream outfile{progress_file, std::ios::binary};
+                if (already_done)
+                    continue;
+                else
+                    std::ofstream outfile{progress_file, std::ios::binary};
 
-            std::unordered_set<uint64_t> distinct_minimisers{};
-            // The set is (re-)constructed for each file. The alternative is to construct it once for each thread
-            // and clear+reuse it for every file that a thread works on. However, this dramatically increases
-            // memory consumption because the map will stay as big as needed for the biggest encountered file.
+                std::unordered_set<uint64_t> distinct_minimisers{};
+                // The set is (re-)constructed for each file. The alternative is to construct it once for each thread
+                // and clear+reuse it for every file that a thread works on. However, this dramatically increases
+                // memory consumption because the map will stay as big as needed for the biggest encountered file.
 
-            reader.for_each_hash(file_names,
-                                 [&](auto && hash)
-                                 {
-                                     distinct_minimisers.insert(hash);
-                                 });
+                reader.for_each_hash(file_names,
+                                     [&](auto && hash)
+                                     {
+                                         distinct_minimisers.insert(hash);
+                                     });
 
-            uint64_t count{};
+                uint64_t count{};
 
-            {
-                std::ofstream outfile{minimiser_file, std::ios::binary};
-                for (auto && hash : distinct_minimisers)
                 {
-                    outfile.write(reinterpret_cast<char const *>(&hash), sizeof(hash));
-                    ++count;
+                    //!TODO: apply k-mer count cutoffs in metagenome search
+                    std::ofstream outfile{minimiser_file, std::ios::binary};
+                    for (auto && hash : distinct_minimisers)
+                    {
+                        outfile.write(reinterpret_cast<char const *>(&hash), sizeof(hash));
+                        ++count;
+                    }
                 }
-            }
 
-            {
-                std::ofstream headerfile{header_file};
-                headerfile << arguments.shape.to_string() << '\t' << arguments.window_size << '\t' << count << '\n';
-            }
-
-            std::filesystem::remove(progress_file);
-        }
-    };
+                {
+                    std::ofstream headerfile{header_file};
+                    headerfile << arguments.shape.to_string() << '\t' << arguments.window_size << '\t' << count << '\n';
+                }
 
-    auto segment_worker = [&](auto && meta)
+                std::filesystem::remove(progress_file);
+            }
+        };
+
+        size_t const chunk_size =
+            std::max<size_t>(1, std::floor(arguments.bin_path.size() / static_cast<double>(arguments.threads)));
+        auto chunked_view = seqan3::views::zip(arguments.bin_path, std::views::iota(0u)) | seqan3::views::chunk(chunk_size);
+        executioner.bulk_execute(std::move(cluster_worker), std::move(chunked_view), []() {});
+    }
+    else
     {
+        valik::metadata meta(arguments.ref_meta_path);
         std::filesystem::path const file_name{arguments.bin_path[0][0]};
-        using sequence_file_t = seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::seq>>;
-        size_t i{0};
-        for (auto && [seq] : sequence_file_t{arguments.bin_path[0][0]})
+        auto segment_worker = [&](auto && zipped_view, auto &&)
         {
-            for (auto & seg : meta.segments_from_ind(i))
+            for (auto && [shared_record, bin_number] : zipped_view)
             {
+                auto const & seg = shared_record.segment;
+
                 std::filesystem::path output_path = get_output_path(arguments.out_dir, file_name, seg.id);
                 std::filesystem::path const minimiser_file =
                     std::filesystem::path{output_path}.replace_extension("minimiser");
@@ -128,7 +137,7 @@ void compute_minimiser(valik::build_arguments const & arguments)
                                                            seqan3::seed{adjust_seed(arguments.shape_weight)});
                 };
 
-                for (auto && value : seq | seqan3::views::slice(seg.start, seg.start + seg.len) | hash_view())
+                for (auto const value : *shared_record.underlying_sequence | seqan3::views::slice(seg.start, seg.start + seg.len) | hash_view())
                 {
                     minimiser_table[value] = std::min<uint8_t>(254u, minimiser_table[value] + 1);
                 }
@@ -138,7 +147,7 @@ void compute_minimiser(valik::build_arguments const & arguments)
                 std::ofstream outfile{minimiser_file, std::ios::binary};
                 for (auto && [hash, occurrences] : minimiser_table)
                 {
-                    if (occurrences > arguments.kmer_count_min_cutoff && occurrences < arguments.kmer_count_max_cutoff)
+                    if (occurrences >= arguments.kmer_count_min_cutoff && occurrences <= arguments.kmer_count_max_cutoff)
                     {
                         outfile.write(reinterpret_cast<char const *>(&hash), sizeof(hash));
                         ++count;
@@ -153,22 +162,36 @@ void compute_minimiser(valik::build_arguments const & arguments)
                 std::filesystem::remove(progress_file);
             }
-            i++;
+        };
+
+        using sequence_t = seqan3::dna4_vector;
+        using sequence_file_t = seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::seq>>;
+        std::vector<shared_reference_record<sequence_t>> reference_records;
+        reference_records.reserve(arguments.threads);
+        size_t seq_ind{0};
+        size_t processed_bin_count{0};
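+        // Read the reference once and hand its segments to the workers in batches
+        // of `threads` single-segment chunks. bulk_execute waits for a batch to
+        // finish before returning, so reference_records can be cleared and reused
+        // without invalidating jobs that are still running.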
+        for (auto && [seq] : sequence_file_t{arguments.bin_path[0][0]})
+        {
+            auto shared_seq = std::make_shared<sequence_t>(seq);
+            for (auto && seg : meta.segments_from_ind(seq_ind))
+            {
+                reference_records.emplace_back(shared_seq, std::move(seg));
+                if (reference_records.size() == arguments.threads)
+                {
+                    auto chunked_view = seqan3::views::zip(reference_records, std::views::iota(processed_bin_count)) | seqan3::views::chunk(1u);
+                    processed_bin_count += reference_records.size();
+                    executioner.bulk_execute(segment_worker, std::move(chunked_view), []() {});
+                    reference_records.clear();
+                }
+            }
+            seq_ind++;
         }
-    };
-
-    if (arguments.bin_path.size() > 1)
-    {
-        size_t const chunk_size =
-            std::max<size_t>(1, std::floor(arguments.bin_path.size() / static_cast<double>(arguments.threads)));
-        auto chunked_view = seqan3::views::zip(arguments.bin_path, std::views::iota(0u)) | seqan3::views::chunk(chunk_size);
-        seqan3::detail::execution_handler_parallel executioner{arguments.threads};
-        executioner.bulk_execute(std::move(cluster_worker), std::move(chunked_view), []() {});
-    }
-    else
-    {
-        valik::metadata meta(arguments.ref_meta_path);
-        segment_worker(meta);
+        if (!reference_records.empty())
+        {
+            auto chunked_view = seqan3::views::zip(reference_records, std::views::iota(processed_bin_count)) | seqan3::views::chunk(1u);
+            executioner.bulk_execute(std::move(segment_worker), std::move(chunked_view), []() {});
+        }
     }
 }
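
For reference, the batching scheme in the else-branch above reduces to the following stand-alone sketch. It is illustrative only and not part of the patch: plain std::async stands in for seqan3::detail::execution_handler_parallel, and the sequence, segment, and record types are simplified stand-ins for seqan3::dna4_vector, metadata::segment_stats, and shared_reference_record.

    // Sketch: share one loaded sequence across per-segment jobs and dispatch
    // the jobs in batches that are joined before the batch buffer is reused.
    #include <cstddef>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <utility>
    #include <vector>

    struct segment { std::size_t start; std::size_t len; };

    struct shared_record
    {
        std::shared_ptr<std::string const> sequence; // shared, never copied per segment
        segment seg;
    };

    int main()
    {
        std::size_t const threads = 4;
        std::vector<std::pair<std::string, std::vector<segment>>> const input{
            {"ACGTACGTACGT", {{0, 6}, {6, 6}}},
            {"TTTTGGGGCCCC", {{0, 4}, {4, 4}, {8, 4}}}};

        std::vector<shared_record> batch;
        std::vector<std::future<void>> tasks;

        auto flush = [&]()
        {
            for (auto const & rec : batch)
                tasks.push_back(std::async(std::launch::async,
                                           [rec]() // copies the shared_ptr, not the sequence
                                           {
                                               // Per-segment work; here: print the slice.
                                               std::cout << rec.sequence->substr(rec.seg.start, rec.seg.len) + '\n';
                                           }));
            for (auto & task : tasks)
                task.get(); // block, like bulk_execute, before reusing the buffers
            tasks.clear();
            batch.clear();
        };

        for (auto const & [seq, segments] : input)
        {
            auto shared_seq = std::make_shared<std::string const>(seq); // load once
            for (auto const & seg : segments)
            {
                batch.push_back({shared_seq, seg});
                if (batch.size() == threads) // dispatch a full batch
                    flush();
            }
        }
        if (!batch.empty()) // leftover records
            flush();
    }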