diff --git a/include/utilities/prepare/compute_bin_size.hpp b/include/utilities/prepare/compute_bin_size.hpp
index 0bda51ce..2e0e28cf 100644
--- a/include/utilities/prepare/compute_bin_size.hpp
+++ b/include/utilities/prepare/compute_bin_size.hpp
@@ -20,6 +20,7 @@
 #include
 #include
+#include
 #include
 
 namespace seqan::hibf::build
 {
diff --git a/src/prepare/compute_bin_size.cpp b/src/prepare/compute_bin_size.cpp
index 404b7de5..ff5dc53b 100644
--- a/src/prepare/compute_bin_size.cpp
+++ b/src/prepare/compute_bin_size.cpp
@@ -22,12 +22,23 @@ std::filesystem::path get_output_path(std::filesystem::path const & output_dir,
     return result;
 }
 
+std::filesystem::path get_output_path(std::filesystem::path const & output_dir,
+                                      std::filesystem::path const & file_name,
+                                      size_t const bin_id)
+{
+    std::filesystem::path result{output_dir};
+    bool const is_compressed = raptor::cutoff::file_is_compressed(file_name);
+    result /= is_compressed ? file_name.stem().stem() : file_name.stem();
+    result += ".";
+    result += std::to_string(bin_id);
+    result += ".dummy_extension"; // https://github.com/seqan/raptor/issues/355
+    return result;
+}
+
 void compute_minimiser(valik::build_arguments const & arguments)
 {
     file_reader const reader{arguments.shape, arguments.window_size};
     raptor::cutoff const cutoffs{arguments};
 
-    auto worker = [&](auto && zipped_view, auto &&)
+    auto cluster_worker = [&](auto && zipped_view, auto &&)
     {
         for (auto && [file_names, bin_number] : zipped_view)
         {
@@ -42,57 +53,122 @@ void compute_minimiser(valik::build_arguments const & arguments)
 
             // If we are already done with this file, we can skip it. Otherwise, we create a ".in_progress" file to keep
             // track of whether the minimiser computation was successful.
-            bool const already_done = std::filesystem::exists(header_file) && !std::filesystem::exists(progress_file);
+            bool const already_done = std::filesystem::exists(minimiser_file) && std::filesystem::exists(header_file)
+                                   && !std::filesystem::exists(progress_file);
 
             if (already_done)
                 continue;
             else
                 std::ofstream outfile{progress_file, std::ios::binary};
 
-            // The hash table stores how often a minimiser appears. It does not matter whether a minimiser appears
-            // 50 times or 2000 times, it is stored regardless because the biggest cutoff value is 50. Hence,
-            // the hash table stores only values up to 254 to save memory.
-            std::unordered_map<uint64_t, uint8_t> minimiser_table{};
-            // The map is (re-)constructed for each file. The alternative is to construct it once for each thread
+            std::unordered_set<uint64_t> distinct_minimisers{};
+            // The set is (re-)constructed for each file. The alternative is to construct it once for each thread
             // and clear+reuse it for every file that a thread works on. However, this dramatically increases
-            // memory consumption because the map will stay as big as needed for the biggest encountered file.
+            // memory consumption because the set will stay as big as needed for the biggest encountered file.
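+            // Each hash the reader yields is inserted into the set, so every distinct minimiser is kept
+            // exactly once; occurrence counts (and with them the cutoff filtering) are no longer needed.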
             reader.for_each_hash(file_names,
                                  [&](auto && hash)
                                  {
-                                     minimiser_table[hash] = std::min<uint8_t>(254u, minimiser_table[hash] + 1u);
+                                     distinct_minimisers.insert(hash);
                                  });
 
-            uint8_t const cutoff = cutoffs.get(file_name);
             uint64_t count{};
 
             {
                 std::ofstream outfile{minimiser_file, std::ios::binary};
 
-                for (auto && [hash, occurrences] : minimiser_table)
+                for (auto && hash : distinct_minimisers)
                 {
-                    if (occurrences >= cutoff)
-                    {
-                        outfile.write(reinterpret_cast<char const *>(&hash), sizeof(hash));
-                        ++count;
-                    }
+                    outfile.write(reinterpret_cast<char const *>(&hash), sizeof(hash));
+                    ++count;
                 }
             }
 
             {
                 std::ofstream headerfile{header_file};
-                headerfile << arguments.shape.to_string() << '\t' << arguments.window_size << '\t'
-                           << static_cast<uint16_t>(cutoff) << '\t' << count << '\n';
+                headerfile << arguments.shape.to_string() << '\t' << arguments.window_size << '\t' << count << '\n';
             }
 
             std::filesystem::remove(progress_file);
         }
     };
 
-    size_t const chunk_size =
-        std::max<size_t>(1u, std::floor(arguments.bin_path.size() / static_cast<double>(arguments.threads)));
-    auto chunked_view = seqan3::views::zip(arguments.bin_path, std::views::iota(0u)) | seqan3::views::chunk(chunk_size);
-    seqan3::detail::execution_handler_parallel executioner{arguments.threads};
-    executioner.bulk_execute(std::move(worker), std::move(chunked_view), []() {});
+    auto segment_worker = [&](auto && meta)
+    {
+        std::filesystem::path const file_name{arguments.bin_path[0][0]};
+
+        using sequence_file_t = seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::seq>>;
+        size_t i{0};
+        for (auto && [seq] : sequence_file_t{arguments.bin_path[0][0]})
+        {
+            for (auto & seg : meta.segments_from_ind(i))
+            {
+                std::filesystem::path output_path = get_output_path(arguments.out_dir, file_name, seg.id);
+                std::filesystem::path const minimiser_file =
+                    std::filesystem::path{output_path}.replace_extension("minimiser");
+                std::filesystem::path const progress_file =
+                    std::filesystem::path{output_path}.replace_extension("in_progress");
+                std::filesystem::path const header_file =
+                    std::filesystem::path{output_path}.replace_extension("header");
+
+                // If we are already done with this segment, we can skip it. Otherwise, we create a ".in_progress"
+                // file to keep track of whether the minimiser computation was successful.
+                bool const already_done = std::filesystem::exists(minimiser_file)
+                                       && std::filesystem::exists(header_file)
+                                       && !std::filesystem::exists(progress_file);
+
+                if (already_done)
+                    continue;
+                else
+                    std::ofstream outfile{progress_file, std::ios::binary};
+
+                std::unordered_set<uint64_t> distinct_minimisers{};
+                // The set is (re-)constructed for each segment. The alternative is to construct it once for each
+                // thread and clear+reuse it for every segment that a thread works on. However, this dramatically
+                // increases memory consumption because the set will stay as big as needed for the biggest
+                // encountered segment.
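+                // Only the segment's own slice of the sequence is hashed below, so the resulting
+                // .minimiser/.header files describe a single segment rather than the whole file.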
+
+                auto hash_view = [&]()
+                {
+                    return seqan3::views::minimiser_hash(arguments.shape,
+                                                         seqan3::window_size{arguments.window_size},
+                                                         seqan3::seed{adjust_seed(arguments.shape_weight)});
+                };
+
+                for (auto && value : seq | seqan3::views::slice(seg.start, seg.start + seg.len) | hash_view())
+                    distinct_minimisers.insert(value);
+
+                uint64_t count{};
+                {
+                    std::ofstream outfile{minimiser_file, std::ios::binary};
+                    for (auto && hash : distinct_minimisers)
+                    {
+                        outfile.write(reinterpret_cast<char const *>(&hash), sizeof(hash));
+                        ++count;
+                    }
+                }
+
+                {
+                    std::ofstream headerfile{header_file};
+                    headerfile << arguments.shape.to_string() << '\t' << arguments.window_size << '\t' << count << '\n';
+                }
+
+                std::filesystem::remove(progress_file);
+            }
+            i++;
+        }
+    };
+
+    if (arguments.bin_path.size() > 1)
+    {
+        size_t const chunk_size =
+            std::max<size_t>(1u, std::floor(arguments.bin_path.size() / static_cast<double>(arguments.threads)));
+        auto chunked_view =
+            seqan3::views::zip(arguments.bin_path, std::views::iota(0u)) | seqan3::views::chunk(chunk_size);
+        seqan3::detail::execution_handler_parallel executioner{arguments.threads};
+        executioner.bulk_execute(std::move(cluster_worker), std::move(chunked_view), []() {});
+    }
+    else
+    {
+        //!TODO: avoid loading the metadata twice
+        valik::metadata meta(arguments.ref_meta_path);
+        segment_worker(meta);
+    }
 }
 
 namespace detail
@@ -141,12 +217,11 @@ size_t kmer_count_from_minimiser_files(std::vector<std::vector<std::filesystem::path>> const &
 
     std::string shape_string{};
     uint64_t window_size{};
-    uint16_t cutoff{};
     size_t max_count{};
 
     biggest_file.replace_extension("header");
     std::ifstream file_stream{biggest_file};
-    file_stream >> shape_string >> window_size >> cutoff >> max_count;
+    file_stream >> shape_string >> window_size >> max_count;
 
     return max_count;
 }
@@ -186,14 +261,38 @@ size_t kmer_count_from_sequence_files(std::vector<std::vector<std::filesystem::path>> const &
 
 size_t compute_bin_size(valik::build_arguments const & arguments)
 {
+    std::vector<std::vector<std::filesystem::path>> minimiser_files{};
+    if (arguments.bin_path.size() > 1)
+    {
+        for (std::vector<std::filesystem::path> const & bin_files : arguments.bin_path)
+        {
+            std::vector<std::filesystem::path> bin_headers;
+            for (std::filesystem::path file : bin_files)
+            {
+                bin_headers.push_back(file.replace_extension("header"));
+            }
+            minimiser_files.push_back(bin_headers);
+        }
+    }
+    else
+    {
+        valik::metadata meta(arguments.ref_meta_path);
+        for (size_t bin{0}; bin < arguments.bins; bin++)
+        {
+            std::filesystem::path file = arguments.bin_path[0][0];
+            minimiser_files.emplace_back(
+                std::vector<std::filesystem::path>{file.replace_extension(std::to_string(bin) + ".header")});
+        }
+    }
+
     size_t max_count = arguments.input_is_minimiser
-                           ? detail::kmer_count_from_minimiser_files(arguments.bin_path, arguments.threads)
+                           ? detail::kmer_count_from_minimiser_files(minimiser_files, arguments.threads)
                            : detail::kmer_count_from_sequence_files(arguments.bin_path,
                                                                     arguments.threads,
                                                                     arguments.shape,
                                                                     arguments.window_size);
 
-    if (arguments.bin_path.size() == 1)
+    //!TODO: it would be more exact to count the k-mers in the largest segment instead of averaging across the
+    //       whole sequence
+    if ((arguments.bin_path.size() == 1) && (!arguments.input_is_minimiser))
         max_count /= arguments.bins;
 
     assert(max_count > 0u);
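
Note on the header format change above: with the cutoff column removed, a ".header" file now carries three
tab-separated fields (shape string, window size, distinct-minimiser count). A minimal sketch of a reader for the
new layout, assuming only the field order written by compute_minimiser() above; the header_data and read_header
names are illustrative, not part of valik:

// Sketch only: parses the three-field header written by compute_minimiser() above.
#include <cstdint>
#include <filesystem>
#include <fstream>
#include <string>

struct header_data
{
    std::string shape;    // binary shape string, e.g. "1111111111"
    uint64_t window_size; // window size used during minimiser computation
    uint64_t count;       // number of distinct minimisers in the bin/segment
};

header_data read_header(std::filesystem::path const & path)
{
    header_data h{};
    std::ifstream in{path};
    in >> h.shape >> h.window_size >> h.count; // no cutoff field any more
    return h;
}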