
Commit

Create minimiser files for each split segment
eaasna committed Apr 11, 2024
1 parent 1f6509e commit 238ece5
Showing 2 changed files with 126 additions and 26 deletions.
1 change: 1 addition & 0 deletions include/utilities/prepare/compute_bin_size.hpp
@@ -20,6 +20,7 @@

#include <valik/shared.hpp>
#include <valik/build/call_parallel_on_bins.hpp>
#include <valik/split/metadata.hpp>
#include <utilities/prepare/cutoff.hpp>

namespace seqan::hibf::build
151 changes: 125 additions & 26 deletions src/prepare/compute_bin_size.cpp
@@ -22,12 +22,23 @@ std::filesystem::path get_output_path(std::filesystem::path const & output_dir,
return result;
}

std::filesystem::path get_output_path(std::filesystem::path const & output_dir, std::filesystem::path const & file_name, size_t const bin_id)
{
std::filesystem::path result{output_dir};
bool const is_compressed = raptor::cutoff::file_is_compressed(file_name);
result /= is_compressed ? file_name.stem().stem() : file_name.stem();
result += ".";
result += std::to_string(bin_id);
result += ".dummy_extension"; // https://github.com/seqan/raptor/issues/355
return result;
}
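For illustration only, a small standalone sketch of the file names this overload produces; the output directory, input file and bin id below are made up, and the boolean stands in for raptor::cutoff::file_is_compressed. The double stem() strips both ".fasta" and ".gz" before the segment id and the placeholder extension are appended.

#include <cstddef>
#include <filesystem>
#include <iostream>
#include <string>

int main()
{
    std::filesystem::path const output_dir{"index_dir"};    // hypothetical
    std::filesystem::path const file_name{"ref.fasta.gz"};  // hypothetical compressed input
    std::size_t const bin_id{3};
    bool const is_compressed = true; // stands in for raptor::cutoff::file_is_compressed(file_name)

    std::filesystem::path result{output_dir};
    result /= is_compressed ? file_name.stem().stem() : file_name.stem();
    result += ".";
    result += std::to_string(bin_id);
    result += ".dummy_extension"; // placeholder, swapped out below via replace_extension

    std::cout << result << '\n';                                                       // index_dir/ref.3.dummy_extension
    std::cout << std::filesystem::path{result}.replace_extension("minimiser") << '\n'; // index_dir/ref.3.minimiser
    std::cout << std::filesystem::path{result}.replace_extension("header") << '\n';    // index_dir/ref.3.header
}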

void compute_minimiser(valik::build_arguments const & arguments)
{
file_reader<file_types::sequence> const reader{arguments.shape, arguments.window_size};
raptor::cutoff const cutoffs{arguments};

auto worker = [&](auto && zipped_view, auto &&)
auto cluster_worker = [&](auto && zipped_view, auto &&)
{
for (auto && [file_names, bin_number] : zipped_view)
{
@@ -42,57 +53,122 @@ void compute_minimiser(valik::build_arguments const & arguments)

// If we are already done with this file, we can skip it. Otherwise, we create a ".in_progress" file to keep
// track of whether the minimiser computation was successful.
bool const already_done = std::filesystem::exists(header_file) && !std::filesystem::exists(progress_file);
bool const already_done = std::filesystem::exists(minimiser_file) && std::filesystem::exists(header_file) &&
!std::filesystem::exists(progress_file);

if (already_done)
continue;
else
std::ofstream outfile{progress_file, std::ios::binary};

// The hash table stores how often a minimiser appears. It does not matter whether a minimiser appears
// 50 times or 2000 times, it is stored regardless because the biggest cutoff value is 50. Hence,
// the hash table stores only values up to 254 to save memory.
std::unordered_map<uint64_t, uint8_t> minimiser_table{};
// The map is (re-)constructed for each file. The alternative is to construct it once for each thread
std::unordered_set<uint64_t> distinct_minimisers{};
// The set is (re-)constructed for each file. The alternative is to construct it once for each thread
// and clear+reuse it for every file that a thread works on. However, this dramatically increases
// memory consumption because the map will stay as big as needed for the biggest encountered file.

reader.for_each_hash(file_names,
[&](auto && hash)
{
minimiser_table[hash] = std::min<uint8_t>(254u, minimiser_table[hash] + 1);
distinct_minimisers.insert(hash);
});

uint8_t const cutoff = cutoffs.get(file_name);
uint64_t count{};

{
std::ofstream outfile{minimiser_file, std::ios::binary};
for (auto && [hash, occurrences] : minimiser_table)
for (auto && hash : distinct_minimisers)
{
if (occurrences >= cutoff)
{
outfile.write(reinterpret_cast<const char *>(&hash), sizeof(hash));
++count;
}
outfile.write(reinterpret_cast<const char *>(&hash), sizeof(hash));
++count;
}
}

{
std::ofstream headerfile{header_file};
headerfile << arguments.shape.to_string() << '\t' << arguments.window_size << '\t'
<< static_cast<uint16_t>(cutoff) << '\t' << count << '\n';
headerfile << arguments.shape.to_string() << '\t' << arguments.window_size << '\t' << count << '\n';
}

std::filesystem::remove(progress_file);
}
};
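The ".in_progress" bookkeeping used above (and again in segment_worker below) makes reruns resumable: a bin is only skipped when its results exist and no in-progress marker was left behind. A minimal sketch with made-up file names:

#include <filesystem>
#include <fstream>
#include <iostream>

int main()
{
    std::filesystem::path const header_file{"ref.0.header"};        // hypothetical
    std::filesystem::path const progress_file{"ref.0.in_progress"}; // hypothetical

    bool const already_done = std::filesystem::exists(header_file) && !std::filesystem::exists(progress_file);
    if (already_done)
    {
        std::cout << "results already on disk, skipping\n";
        return 0;
    }

    std::ofstream{progress_file, std::ios::binary}; // mark the computation as started
    // ... compute minimisers, write the .minimiser and .header files ...
    std::filesystem::remove(progress_file);         // mark it as finished
    return 0;
}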

size_t const chunk_size =
std::max<size_t>(1, std::floor(arguments.bin_path.size() / static_cast<double>(arguments.threads)));
auto chunked_view = seqan3::views::zip(arguments.bin_path, std::views::iota(0u)) | seqan3::views::chunk(chunk_size);
seqan3::detail::execution_handler_parallel executioner{arguments.threads};
executioner.bulk_execute(std::move(worker), std::move(chunked_view), []() {});
auto segment_worker = [&](auto && meta)
{
std::filesystem::path const file_name{arguments.bin_path[0][0]};

using sequence_file_t = seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::seq>>;
size_t i{0};
for (auto && [seq] : sequence_file_t{arguments.bin_path[0][0]})
{
for (auto & seg : meta.segments_from_ind(i))
{
std::filesystem::path output_path = get_output_path(arguments.out_dir, file_name, seg.id);
std::filesystem::path const minimiser_file =
std::filesystem::path{output_path}.replace_extension("minimiser");
std::filesystem::path const progress_file =
std::filesystem::path{output_path}.replace_extension("in_progress");
std::filesystem::path const header_file = std::filesystem::path{output_path}.replace_extension("header");

// If we are already done with this file, we can skip it. Otherwise, we create a ".in_progress" file to keep
// track of whether the minimiser computation was successful.
bool const already_done = std::filesystem::exists(minimiser_file) && std::filesystem::exists(header_file) &&
!std::filesystem::exists(progress_file);

if (already_done)
continue;
else
std::ofstream outfile{progress_file, std::ios::binary};

std::unordered_set<uint64_t> distinct_minimisers{};
// The set is (re-)constructed for each segment. The alternative is to construct it once for each thread
// and clear+reuse it for every segment that a thread works on. However, this dramatically increases
// memory consumption because the set will stay as big as needed for the biggest encountered segment.

auto hash_view = [&] ()
{
return seqan3::views::minimiser_hash(arguments.shape,
seqan3::window_size{arguments.window_size},
seqan3::seed{adjust_seed(arguments.shape_weight)});
};

for (auto && value : seq | seqan3::views::slice(seg.start, seg.start + seg.len) | hash_view())
distinct_minimisers.insert(value);

uint64_t count{};
{
std::ofstream outfile{minimiser_file, std::ios::binary};
for (auto && hash : distinct_minimisers)
{
outfile.write(reinterpret_cast<const char *>(&hash), sizeof(hash));
++count;
}
}

{
std::ofstream headerfile{header_file};
headerfile << arguments.shape.to_string() << '\t' << arguments.window_size << '\t' << count << '\n';
}

std::filesystem::remove(progress_file);
}
i++;
}
};
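A self-contained sketch of the per-segment hashing that segment_worker performs, written against plain seqan3 views; the sequence, segment coordinates, shape, window size and seed below are made up (valik derives the actual seed from the shape weight via adjust_seed):

#include <cstdint>
#include <unordered_set>
#include <vector>

#include <seqan3/alphabet/nucleotide/dna4.hpp>
#include <seqan3/search/kmer_index/shape.hpp>
#include <seqan3/search/views/minimiser_hash.hpp>
#include <seqan3/utility/views/slice.hpp>

int main()
{
    using namespace seqan3::literals;

    std::vector<seqan3::dna4> const seq = "ACGTACGTACGTACGTACGTACGT"_dna4; // hypothetical reference
    std::size_t const seg_start{4};  // hypothetical segment coordinates
    std::size_t const seg_len{16};

    auto hash_view = seqan3::views::minimiser_hash(seqan3::shape{seqan3::ungapped{4u}},
                                                   seqan3::window_size{8},
                                                   seqan3::seed{0});

    std::unordered_set<uint64_t> distinct_minimisers{};
    for (auto && value : seq | seqan3::views::slice(seg_start, seg_start + seg_len) | hash_view)
        distinct_minimisers.insert(value);

    return distinct_minimisers.empty() ? 1 : 0; // expect at least one minimiser
}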

if (arguments.bin_path.size() > 1)
{
size_t const chunk_size =
std::max<size_t>(1, std::floor(arguments.bin_path.size() / static_cast<double>(arguments.threads)));
auto chunked_view = seqan3::views::zip(arguments.bin_path, std::views::iota(0u)) | seqan3::views::chunk(chunk_size);
seqan3::detail::execution_handler_parallel executioner{arguments.threads};
executioner.bulk_execute(std::move(cluster_worker), std::move(chunked_view), []() {});
}
else
{
//!TODO: avoid loading twice
valik::metadata meta(arguments.ref_meta_path);
segment_worker(meta);
}
}
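In the multi-file branch above, each thread processes a contiguous chunk of bins. A quick illustration of the chunk-size arithmetic with made-up numbers:

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <iostream>

int main()
{
    std::size_t const num_bins{10}; // stands in for arguments.bin_path.size()
    std::size_t const threads{4};   // stands in for arguments.threads

    std::size_t const chunk_size =
        std::max<std::size_t>(1, std::floor(num_bins / static_cast<double>(threads)));
    std::size_t const num_chunks = (num_bins + chunk_size - 1) / chunk_size;

    std::cout << chunk_size << " bins per chunk, " << num_chunks << " chunks for " << threads << " threads\n";
    // prints: 2 bins per chunk, 5 chunks for 4 threads
}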

namespace detail
@@ -141,12 +217,11 @@ size_t kmer_count_from_minimiser_files(std::vector<std::vector<std::string>> con

std::string shape_string{};
uint64_t window_size{};
uint16_t cutoff{};
size_t max_count{};

biggest_file.replace_extension("header");
std::ifstream file_stream{biggest_file};
file_stream >> shape_string >> window_size >> cutoff >> max_count;
file_stream >> shape_string >> window_size >> max_count;

return max_count;
}
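The header files now carry three tab-separated fields (shape, window size and distinct-minimiser count); the cutoff column was dropped. A minimal write/read roundtrip with a made-up file name and values:

#include <cstddef>
#include <cstdint>
#include <fstream>
#include <iostream>
#include <string>

int main()
{
    {
        std::ofstream headerfile{"ref.3.header"};                // hypothetical file name
        headerfile << "1111" << '\t' << 8 << '\t' << 42 << '\n'; // shape, window size, distinct-minimiser count
    }

    std::string shape_string{};
    uint64_t window_size{};
    std::size_t max_count{};

    std::ifstream file_stream{"ref.3.header"};
    file_stream >> shape_string >> window_size >> max_count;

    std::cout << shape_string << ' ' << window_size << ' ' << max_count << '\n'; // prints: 1111 8 42
}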
@@ -186,14 +261,38 @@ size_t kmer_count_from_sequence_files(std::vector<std::vector<std::string>> cons

size_t compute_bin_size(valik::build_arguments const & arguments)
{
std::vector<std::vector<std::string>> minimiser_files{};
if (arguments.bin_path.size() > 1)
{
for (std::vector<std::string> const & bin_files : arguments.bin_path)
{
std::vector<std::string> bin_headers;
for (std::filesystem::path file : bin_files)
{
bin_headers.push_back(file.replace_extension("header"));
}
minimiser_files.push_back(bin_headers);
}
}
else
{
valik::metadata meta(arguments.ref_meta_path);
for (size_t bin{0}; bin < arguments.bins; bin++)
{
std::filesystem::path file = arguments.bin_path[0][0];
minimiser_files.emplace_back(std::vector<std::string>{file.replace_extension(std::to_string(bin) + ".header")});
}
}

size_t max_count = arguments.input_is_minimiser
? detail::kmer_count_from_minimiser_files(arguments.bin_path, arguments.threads)
? detail::kmer_count_from_minimiser_files(minimiser_files, arguments.threads)
: detail::kmer_count_from_sequence_files(arguments.bin_path,
arguments.threads,
arguments.shape,
arguments.window_size);

if (arguments.bin_path.size() == 1)
//!TODO: it would be more exact to count the k-mers in the largest segment instead of averaging across the whole sequence
if ((arguments.bin_path.size() == 1) && (!arguments.input_is_minimiser))
max_count /= arguments.bins;

assert(max_count > 0u);
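When a single reference is split into bins, the per-bin k-mer count is approximated by dividing the whole-sequence count by the number of bins (as the TODO notes, counting the largest segment directly would be tighter). With made-up numbers:

#include <cstddef>
#include <iostream>

int main()
{
    std::size_t max_count{1'000'000}; // k-mers counted across the whole reference (hypothetical)
    std::size_t const bins{64};       // stands in for arguments.bins

    max_count /= bins;                // average k-mer content per bin
    std::cout << max_count << " k-mers per bin\n"; // prints: 15625 k-mers per bin
}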
