diff --git a/include/builder/build.cpp b/include/builder/build.cpp index 128f1bc..f8e6862 100644 --- a/include/builder/build.cpp +++ b/include/builder/build.cpp @@ -71,14 +71,12 @@ void dictionary::build(std::string const& filename, build_configuration const& b } } - /* step 2: sort minimizers and build MPHF ***/ + /* step 2: merge minimizers and build MPHF ***/ timer.start(); - data.minimizers.sort(); - auto minimizers_filename = data.minimizers.get_minimizers_filename(tmp_dirname); + data.minimizers.merge(); { - data.minimizers.flush(minimizers_filename); - data.minimizers.release(); // release internal memory - mm::file_source input(minimizers_filename, mm::advice::sequential); + mm::file_source input(data.minimizers.get_minimizers_filename(), + mm::advice::sequential); uint64_t num_minimizers = 0; for (minimizers_tuples_iterator it(input.data(), input.data() + input.size()); @@ -121,7 +119,7 @@ void dictionary::build(std::string const& filename, build_configuration const& b if (build_config.verbose) buckets_stats.print(); - std::remove(minimizers_filename.c_str()); + data.minimizers.remove_tmp_file(); } } // namespace sshash diff --git a/include/builder/build_index.cpp b/include/builder/build_index.cpp index 8f2ddf4..075b372 100644 --- a/include/builder/build_index.cpp +++ b/include/builder/build_index.cpp @@ -14,10 +14,8 @@ buckets_statistics build_index(parse_data& data, minimizers const& m_minimizers, std::cout << "bits_per_offset = ceil(log2(" << data.strings.num_bits() / 2 << ")) = " << std::ceil(std::log2(data.strings.num_bits() / 2)) << std::endl; - // TODO: have user input here - std::string tmp_dirname = constants::default_tmp_dirname; - auto minimizers_filename = data.minimizers.get_minimizers_filename(tmp_dirname); - mm::file_source input(minimizers_filename, mm::advice::sequential); + mm::file_source input(data.minimizers.get_minimizers_filename(), + mm::advice::sequential); for (minimizers_tuples_iterator it(input.data(), input.data() + input.size()); it.has_next(); it.next()) { diff --git a/include/builder/build_skew_index.cpp b/include/builder/build_skew_index.cpp index b9aff6e..8316089 100644 --- a/include/builder/build_skew_index.cpp +++ b/include/builder/build_skew_index.cpp @@ -16,10 +16,8 @@ void build_skew_index(skew_index& m_skew_index, parse_data& data, buckets const& std::cout << "log2_max_num_super_kmers_in_bucket " << m_skew_index.log2_max_num_super_kmers_in_bucket << std::endl; - // TODO: have user input here - std::string tmp_dirname = constants::default_tmp_dirname; - auto minimizers_filename = data.minimizers.get_minimizers_filename(tmp_dirname); - mm::file_source input(minimizers_filename, mm::advice::sequential); + mm::file_source input(data.minimizers.get_minimizers_filename(), + mm::advice::sequential); uint64_t num_buckets_in_skew_index = 0; uint64_t num_super_kmers_in_skew_index = 0; diff --git a/include/builder/parse_file.cpp b/include/builder/parse_file.cpp index d017279..db1dfe6 100644 --- a/include/builder/parse_file.cpp +++ b/include/builder/parse_file.cpp @@ -181,6 +181,7 @@ void parse_file(std::istream& is, parse_data& data, build_configuration const& b append_super_kmer(); } + data.minimizers.finalize(); builder.finalize(); builder.build(data.strings); diff --git a/include/builder/util.hpp b/include/builder/util.hpp index 666a3c5..346b777 100644 --- a/include/builder/util.hpp +++ b/include/builder/util.hpp @@ -158,47 +158,149 @@ struct minimizers_tuples_iterator : std::forward_iterator_tag { }; struct minimizers_tuples { - static constexpr uint64_t ram_limit = 1 * essentials::GB; - - minimizers_tuples() : m_run_identifier(pthash::clock_type::now().time_since_epoch().count()) {} - - // void reserve(uint64_t n) { - // m_tuples.reserve(n); - // } + static constexpr uint64_t ram_limit = 0.5 * essentials::GB; + + minimizers_tuples(std::string tmp_dirname = constants::default_tmp_dirname) + : m_buffer_size(0) + , m_num_files_to_merge(0) + , m_run_identifier(pthash::clock_type::now().time_since_epoch().count()) + , m_tmp_dirname(tmp_dirname) { + m_buffer_size = ram_limit / sizeof(minimizer_tuple); + std::cout << "m_buffer_size " << m_buffer_size << std::endl; + } void emplace_back(uint64_t minimizer, uint64_t offset, uint64_t num_kmers_in_super_kmer) { - m_tuples.emplace_back(minimizer, offset, num_kmers_in_super_kmer); + if (m_buffer.size() == m_buffer_size) sort_and_flush(); + m_buffer.emplace_back(minimizer, offset, num_kmers_in_super_kmer); } - minimizer_tuple& back() { return m_tuples.back(); } + minimizer_tuple& back() { return m_buffer.back(); } - void sort() { - std::sort(m_tuples.begin(), m_tuples.end(), + void sort_and_flush() { + std::cout << "sorting buffer..." << std::endl; + std::sort(m_buffer.begin(), m_buffer.end(), [](minimizer_tuple const& x, minimizer_tuple const& y) { return (x.minimizer < y.minimizer) or (x.minimizer == y.minimizer and x.offset < y.offset); }); + auto tmp_output_filename = get_tmp_output_filename(m_num_files_to_merge); + std::cout << "saving to file '" << tmp_output_filename << "'..." << std::endl; + std::ofstream out(tmp_output_filename.c_str(), std::ofstream::binary); + if (!out.is_open()) throw std::runtime_error("cannot open file"); + out.write(reinterpret_cast(m_buffer.data()), + m_buffer.size() * sizeof(minimizer_tuple)); + out.close(); + + m_buffer.clear(); + ++m_num_files_to_merge; } - std::string get_minimizers_filename(std::string const& tmp_dirname) const { + void finalize() { + if (!m_buffer.empty()) sort_and_flush(); + } + + std::string get_minimizers_filename() const { std::stringstream filename; - filename << tmp_dirname << "/sshash.tmp.run_" << m_run_identifier << ".minimizers.bin"; + filename << m_tmp_dirname << "/sshash.tmp.run_" << m_run_identifier << ".minimizers.bin"; return filename.str(); } - void flush(std::string const& filename) { - std::ofstream out(filename.c_str(), std::ofstream::binary); + void merge() { + if (m_num_files_to_merge == 0) return; + + assert(m_num_files_to_merge > 0); + std::cout << "files to merge = " << m_num_files_to_merge << std::endl; + + struct iterator_type { + iterator_type(minimizer_tuple const* b, minimizer_tuple const* e) : begin(b), end(e) {} + minimizer_tuple const* begin; + minimizer_tuple const* end; + }; + std::vector iterators; + std::vector idx_heap; + iterators.reserve(m_num_files_to_merge); + idx_heap.reserve(m_num_files_to_merge); + std::vector> mm_files(m_num_files_to_merge); + + auto heap_idx_comparator = [&](uint32_t i, uint32_t j) { + minimizer_tuple const* begin_i = iterators[i].begin; + minimizer_tuple const* begin_j = iterators[j].begin; + if ((*begin_i).minimizer != (*begin_j).minimizer) { + return (*begin_i).minimizer > (*begin_j).minimizer; + } + return (*begin_i).offset > (*begin_j).offset; + }; + + auto advance_heap_head = [&]() { + uint32_t idx = idx_heap.front(); + iterators[idx].begin += 1; + if (iterators[idx].begin != iterators[idx].end) { // percolate down the head + uint64_t pos = 0; + uint64_t size = idx_heap.size(); + while (2 * pos + 1 < size) { + uint64_t i = 2 * pos + 1; + if (i + 1 < size and heap_idx_comparator(idx_heap[i], idx_heap[i + 1])) ++i; + if (heap_idx_comparator(idx_heap[i], idx_heap[pos])) break; + std::swap(idx_heap[pos], idx_heap[i]); + pos = i; + } + } else { + std::pop_heap(idx_heap.begin(), idx_heap.end(), heap_idx_comparator); + idx_heap.pop_back(); + } + }; + + /* create the input iterators and make the heap */ + for (uint64_t i = 0; i != m_num_files_to_merge; ++i) { + auto tmp_output_filename = get_tmp_output_filename(i); + mm_files[i].open(tmp_output_filename, mm::advice::sequential); + iterators.emplace_back(mm_files[i].data(), mm_files[i].data() + mm_files[i].size()); + idx_heap.push_back(i); + } + std::make_heap(idx_heap.begin(), idx_heap.end(), heap_idx_comparator); + + std::ofstream out(get_minimizers_filename().c_str()); if (!out.is_open()) throw std::runtime_error("cannot open file"); - out.write(reinterpret_cast(m_tuples.data()), - m_tuples.size() * sizeof(minimizer_tuple)); + + uint64_t num_written_tuples = 0; + while (!idx_heap.empty()) { + minimizer_tuple const* begin = iterators[idx_heap.front()].begin; + out.write(reinterpret_cast(begin), sizeof(minimizer_tuple)); + num_written_tuples += 1; + if (num_written_tuples % 100000 == 0) { + std::cout << "num_written_tuples = " << num_written_tuples << std::endl; + } + advance_heap_head(); + } + std::cout << "num_written_tuples = " << num_written_tuples << std::endl; out.close(); + + /* remove tmp files */ + for (uint64_t i = 0; i != m_num_files_to_merge; ++i) { + mm_files[i].close(); + auto tmp_output_filename = get_tmp_output_filename(i); + std::remove(tmp_output_filename.c_str()); + } + + std::vector().swap(m_buffer); + m_num_files_to_merge = 0; // any other call to merge() will do nothing } - void release() { std::vector().swap(m_tuples); } + void remove_tmp_file() { std::remove(get_minimizers_filename().c_str()); } private: + uint64_t m_buffer_size; + uint64_t m_num_files_to_merge; uint64_t m_run_identifier; - std::vector m_tuples; + std::string m_tmp_dirname; + std::vector m_buffer; + + std::string get_tmp_output_filename(uint64_t id) { + std::stringstream filename; + filename << m_tmp_dirname << "/sshash.tmp.run_" << m_run_identifier << ".minimizers." << id + << ".bin"; + return filename.str(); + }; }; } // namespace sshash \ No newline at end of file