diff --git a/README.md b/README.md index e66a087..7daab62 100644 --- a/README.md +++ b/README.md @@ -100,49 +100,52 @@ where the code was compiled (see the section [Compiling the Code](#compiling-the to show the usage of the driver program (reported below for convenience). - Usage: ./build [-h,--help] input_filename k m [-s seed] [-l l] [-c c] [--canonical-parsing] [--weighted] [-o output_filename] [--check] [--bench] [--verbose] - + Usage: ./build [-h,--help] input_filename k m [-s seed] [-l l] [-c c] [--canonical-parsing] [--weighted] [-o output_filename] [-d tmp_dirname] [--check] [--bench] [--verbose] + input_filename Must be a FASTA file (.fa/fasta extension) compressed with gzip (.gz) or not: - without duplicate nor invalid kmers - one DNA sequence per line. For example, it could be the de Bruijn graph topology output by BCALM. - + k K-mer length (must be <= 31). - + m Minimizer length (must be < k). - + [-s seed] Seed for construction (default is 1). - + [-l l] A (integer) constant that controls the space/time trade-off of the dictionary. A reasonable values lies between 2 and 12 (default is 6). - + [-c c] A (floating point) constant that trades construction speed for space effectiveness of minimal perfect hashing. A reasonable value lies between 3.0 and 10.0 (default is 3.000000). - + + [-o output_filename] + Output file name where the data structure will be serialized. + + [-d tmp_dirname] + Temporary directory used for construction in external memory. Default is directory '.'. + [--canonical-parsing] Canonical parsing of k-mers. This option changes the parsing and results in a trade-off between index space and lookup time. - + [--weighted] Also store the weights in compressed format. - - [-o output_filename] - Output file name where the data structure will be serialized. - + [--check] Check correctness after construction. - + [--bench] Run benchmark after construction. - + [--verbose] Verbose output during construction. - + [-h,--help] - Print this help text and silently exits. + Print this help text and silently exits. Examples diff --git a/include/builder/build.cpp b/include/builder/build.cpp index 2089a80..36ef252 100644 --- a/include/builder/build.cpp +++ b/include/builder/build.cpp @@ -68,12 +68,16 @@ void dictionary::build(std::string const& filename, build_configuration const& b } } - /* step 2: sort minimizers and build MPHF ***/ + /* step 2: merge minimizers and build MPHF ***/ timer.start(); - data.minimizers.sort(); - uint64_t num_buckets = 0; - for (auto it = data.minimizers.begin(); it.has_next(); it.next()) ++num_buckets; - m_minimizers.build(data.minimizers.begin(), num_buckets); + data.minimizers.merge(); + { + mm::file_source input(data.minimizers.get_minimizers_filename(), + mm::advice::sequential); + minimizers_tuples_iterator iterator(input.data(), input.data() + input.size()); + m_minimizers.build(iterator, data.minimizers.num_minimizers(), build_config); + input.close(); + } timer.stop(); timings.push_back(timer.elapsed()); print_time(timings.back(), data.num_kmers, "step 2: 'build_minimizers'"); @@ -82,7 +86,7 @@ void dictionary::build(std::string const& filename, build_configuration const& b /* step 3: build index ***/ timer.start(); - auto buckets_stats = build_index(data, m_minimizers, m_buckets); + auto buckets_stats = build_index(data, m_minimizers, m_buckets, build_config); timer.stop(); timings.push_back(timer.elapsed()); print_time(timings.back(), data.num_kmers, "step 3: 'build_index'"); @@ -104,6 +108,8 @@ void dictionary::build(std::string const& filename, build_configuration const& b print_space_breakdown(); if (build_config.verbose) buckets_stats.print(); + + data.minimizers.remove_tmp_file(); } } // namespace sshash diff --git a/include/builder/build_index.cpp b/include/builder/build_index.cpp index b94087d..b44ef3f 100644 --- a/include/builder/build_index.cpp +++ b/include/builder/build_index.cpp @@ -1,40 +1,234 @@ -#include // for std::partial_sum - namespace sshash { -buckets_statistics build_index(parse_data& data, minimizers const& m_minimizers, - buckets& m_buckets) { +#pragma pack(push, 4) +struct bucket_pair { + bucket_pair(uint64_t id, uint32_t size) : id(id), size(size) {} + uint64_t id; + uint32_t size; +}; +#pragma pack(pop) + +struct bucket_pairs_iterator : std::forward_iterator_tag { + bucket_pairs_iterator(bucket_pair const* begin, bucket_pair const* end) + : m_begin(begin) + , m_end(end) + , m_i(0) + , m_val(0) // first returned val is always 0 + {} + + uint64_t operator*() const { return m_val; } + void operator++() { + ++m_i; + if (m_i == (*m_begin).id) { + m_val += (*m_begin).size; + ++m_begin; + assert(m_begin <= m_end); + } + } + +private: + bucket_pair const* m_begin; + bucket_pair const* m_end; + uint64_t m_i; + uint64_t m_val; +}; + +struct bucket_pairs { + static constexpr uint64_t ram_limit = 0.25 * essentials::GB; + + bucket_pairs(std::string const& tmp_dirname) + : m_buffer_size(0) + , m_num_files_to_merge(0) + , m_run_identifier(pthash::clock_type::now().time_since_epoch().count()) + , m_tmp_dirname(tmp_dirname) { + m_buffer_size = ram_limit / sizeof(bucket_pair); + std::cout << "m_buffer_size " << m_buffer_size << std::endl; + } + + void emplace_back(uint64_t id, uint32_t size) { + if (m_buffer.size() == m_buffer_size) sort_and_flush(); + m_buffer.emplace_back(id, size); + } + + void sort_and_flush() { + std::cout << "sorting buffer..." << std::endl; + std::sort(m_buffer.begin(), m_buffer.end(), + [](bucket_pair const& x, bucket_pair const& y) { return x.id < y.id; }); + + auto tmp_output_filename = get_tmp_output_filename(m_num_files_to_merge); + std::cout << "saving to file '" << tmp_output_filename << "'..." << std::endl; + std::ofstream out(tmp_output_filename.c_str(), std::ofstream::binary); + if (!out.is_open()) throw std::runtime_error("cannot open file"); + out.write(reinterpret_cast(m_buffer.data()), + m_buffer.size() * sizeof(bucket_pair)); + out.close(); + + m_buffer.clear(); + ++m_num_files_to_merge; + } + + void finalize() { + if (!m_buffer.empty()) sort_and_flush(); + } + + std::string get_bucket_pairs_filename() const { + if (m_num_files_to_merge == 1) return get_tmp_output_filename(0); + std::stringstream filename; + filename << m_tmp_dirname << "/sshash.tmp.run_" << m_run_identifier << ".bucket_pairs.bin"; + return filename.str(); + } + + void merge() { + std::cout << "files to merge = " << m_num_files_to_merge << std::endl; + if (m_num_files_to_merge <= 1) return; + + assert(m_num_files_to_merge > 1); + + struct iterator_type { + iterator_type(bucket_pair const* b, bucket_pair const* e) : begin(b), end(e) {} + bucket_pair const* begin; + bucket_pair const* end; + }; + std::vector iterators; + std::vector idx_heap; + iterators.reserve(m_num_files_to_merge); + idx_heap.reserve(m_num_files_to_merge); + std::vector> mm_files(m_num_files_to_merge); + + auto heap_idx_comparator = [&](uint32_t i, uint32_t j) { + bucket_pair const* begin_i = iterators[i].begin; + bucket_pair const* begin_j = iterators[j].begin; + return (*begin_i).id > (*begin_j).id; + }; + + auto advance_heap_head = [&]() { + uint32_t idx = idx_heap.front(); + iterators[idx].begin += 1; + if (iterators[idx].begin != iterators[idx].end) { // percolate down the head + uint64_t pos = 0; + uint64_t size = idx_heap.size(); + while (2 * pos + 1 < size) { + uint64_t i = 2 * pos + 1; + if (i + 1 < size and heap_idx_comparator(idx_heap[i], idx_heap[i + 1])) ++i; + if (heap_idx_comparator(idx_heap[i], idx_heap[pos])) break; + std::swap(idx_heap[pos], idx_heap[i]); + pos = i; + } + } else { + std::pop_heap(idx_heap.begin(), idx_heap.end(), heap_idx_comparator); + idx_heap.pop_back(); + } + }; + + /* create the input iterators and make the heap */ + for (uint64_t i = 0; i != m_num_files_to_merge; ++i) { + auto tmp_output_filename = get_tmp_output_filename(i); + mm_files[i].open(tmp_output_filename, mm::advice::sequential); + iterators.emplace_back(mm_files[i].data(), mm_files[i].data() + mm_files[i].size()); + idx_heap.push_back(i); + } + std::make_heap(idx_heap.begin(), idx_heap.end(), heap_idx_comparator); + + std::ofstream out(get_bucket_pairs_filename().c_str()); + if (!out.is_open()) throw std::runtime_error("cannot open file"); + + uint64_t num_written_pairs = 0; + while (!idx_heap.empty()) { + bucket_pair const* begin = iterators[idx_heap.front()].begin; + out.write(reinterpret_cast(begin), sizeof(bucket_pair)); + num_written_pairs += 1; + if (num_written_pairs % 50000000 == 0) { + std::cout << "num_written_pairs = " << num_written_pairs << std::endl; + } + advance_heap_head(); + } + std::cout << "num_written_pairs = " << num_written_pairs << std::endl; + out.close(); + + /* remove tmp files */ + for (uint64_t i = 0; i != m_num_files_to_merge; ++i) { + mm_files[i].close(); + auto tmp_output_filename = get_tmp_output_filename(i); + std::remove(tmp_output_filename.c_str()); + } + + std::vector().swap(m_buffer); + m_num_files_to_merge = 0; // any other call to merge() will do nothing + } + + void remove_tmp_file() { std::remove(get_bucket_pairs_filename().c_str()); } + +private: + uint64_t m_buffer_size; + uint64_t m_num_files_to_merge; + uint64_t m_run_identifier; + std::string m_tmp_dirname; + std::vector m_buffer; + + std::string get_tmp_output_filename(uint64_t id) const { + std::stringstream filename; + filename << m_tmp_dirname << "/sshash.tmp.run_" << m_run_identifier << ".bucket_pairs." + << id << ".bin"; + return filename.str(); + } +}; + +buckets_statistics build_index(parse_data& data, minimizers const& m_minimizers, buckets& m_buckets, + build_configuration const& build_config) { uint64_t num_buckets = m_minimizers.size(); uint64_t num_kmers = data.num_kmers; uint64_t num_super_kmers = data.strings.num_super_kmers(); - std::vector num_super_kmers_before_bucket(num_buckets + 1, 0); + pthash::compact_vector::builder offsets; offsets.resize(num_super_kmers, std::ceil(std::log2(data.strings.num_bits() / 2))); std::cout << "bits_per_offset = ceil(log2(" << data.strings.num_bits() / 2 << ")) = " << std::ceil(std::log2(data.strings.num_bits() / 2)) << std::endl; - for (auto it = data.minimizers.begin(); it.has_next(); it.next()) { - assert(it.list().size() > 0); - if (it.list().size() != 1) { + mm::file_source input(data.minimizers.get_minimizers_filename(), + mm::advice::sequential); + + bucket_pairs bucket_pairs_manager(build_config.tmp_dirname); + uint64_t num_singletons = 0; + for (minimizers_tuples_iterator it(input.data(), input.data() + input.size()); it.has_next(); + it.next()) { + uint32_t list_size = it.list().size(); + assert(list_size > 0); + if (list_size != 1) { uint64_t bucket_id = m_minimizers.lookup(it.minimizer()); - num_super_kmers_before_bucket[bucket_id + 1] = it.list().size() - 1; + bucket_pairs_manager.emplace_back(bucket_id + 1, list_size - 1); + } else { + ++num_singletons; } - // else: it.list().size() = 1 and num_super_kmers_before_bucket[bucket_id + 1] is already 0 } - std::partial_sum(num_super_kmers_before_bucket.begin(), num_super_kmers_before_bucket.end(), - num_super_kmers_before_bucket.begin()); + bucket_pairs_manager.finalize(); + + std::cout << "num_singletons " << num_singletons << "/" << num_buckets << " (" + << (num_singletons * 100.0) / num_buckets << "%)" << std::endl; + + bucket_pairs_manager.merge(); + + { + mm::file_source bucket_pairs_file( + bucket_pairs_manager.get_bucket_pairs_filename(), mm::advice::sequential); + bucket_pairs_iterator iterator(bucket_pairs_file.data(), + bucket_pairs_file.data() + bucket_pairs_file.size()); + m_buckets.num_super_kmers_before_bucket.encode(iterator, num_buckets + 1, + num_super_kmers - num_buckets); + bucket_pairs_file.close(); + } + + bucket_pairs_manager.remove_tmp_file(); buckets_statistics buckets_stats(num_buckets, num_kmers, num_super_kmers); - uint64_t num_singletons = 0; - for (auto it = data.minimizers.begin(); it.has_next(); it.next()) { + for (minimizers_tuples_iterator it(input.data(), input.data() + input.size()); it.has_next(); + it.next()) { uint64_t bucket_id = m_minimizers.lookup(it.minimizer()); - uint64_t base = num_super_kmers_before_bucket[bucket_id] + bucket_id; + uint64_t base = m_buckets.num_super_kmers_before_bucket.access(bucket_id) + bucket_id; uint64_t num_super_kmers_in_bucket = - (num_super_kmers_before_bucket[bucket_id + 1] + bucket_id + 1) - base; + (m_buckets.num_super_kmers_before_bucket.access(bucket_id + 1) + bucket_id + 1) - base; assert(num_super_kmers_in_bucket > 0); - if (num_super_kmers_in_bucket == 1) ++num_singletons; buckets_stats.add_num_super_kmers_in_bucket(num_super_kmers_in_bucket); uint64_t offset_pos = 0; auto list = it.list(); @@ -46,15 +240,13 @@ buckets_statistics build_index(parse_data& data, minimizers const& m_minimizers, assert(offset_pos == num_super_kmers_in_bucket); } - std::cout << "num_singletons " << num_singletons << "/" << num_buckets << " (" - << (num_singletons * 100.0) / num_buckets << "%)" << std::endl; - - m_buckets.pieces.encode(data.strings.pieces.begin(), data.strings.pieces.size()); - m_buckets.num_super_kmers_before_bucket.encode(num_super_kmers_before_bucket.begin(), - num_super_kmers_before_bucket.size()); + m_buckets.pieces.encode(data.strings.pieces.begin(), data.strings.pieces.size(), + data.strings.pieces.back()); offsets.build(m_buckets.offsets); m_buckets.strings.swap(data.strings.strings); + input.close(); + return buckets_stats; } diff --git a/include/builder/build_skew_index.cpp b/include/builder/build_skew_index.cpp index b1c3414..8316089 100644 --- a/include/builder/build_skew_index.cpp +++ b/include/builder/build_skew_index.cpp @@ -16,24 +16,46 @@ void build_skew_index(skew_index& m_skew_index, parse_data& data, buckets const& std::cout << "log2_max_num_super_kmers_in_bucket " << m_skew_index.log2_max_num_super_kmers_in_bucket << std::endl; + mm::file_source input(data.minimizers.get_minimizers_filename(), + mm::advice::sequential); + uint64_t num_buckets_in_skew_index = 0; - for (auto it = data.minimizers.begin(); it.has_next(); it.next()) { - if (it.list().size() > (1ULL << min_log2_size)) ++num_buckets_in_skew_index; + uint64_t num_super_kmers_in_skew_index = 0; + for (minimizers_tuples_iterator it(input.data(), input.data() + input.size()); it.has_next(); + it.next()) { + uint64_t list_size = it.list().size(); + if (list_size > (1ULL << min_log2_size)) { + num_super_kmers_in_skew_index += list_size; + ++num_buckets_in_skew_index; + } } std::cout << "num_buckets_in_skew_index " << num_buckets_in_skew_index << "/" << buckets_stats.num_buckets() << "(" << (num_buckets_in_skew_index * 100.0) / buckets_stats.num_buckets() << "%)" << std::endl; - if (num_buckets_in_skew_index == 0) return; + if (num_buckets_in_skew_index == 0) { + input.close(); + return; + } std::vector lists; lists.reserve(num_buckets_in_skew_index); - for (auto it = data.minimizers.begin(); it.has_next(); it.next()) { + std::vector lists_tuples; // backed memory + lists_tuples.reserve(num_super_kmers_in_skew_index); + for (minimizers_tuples_iterator it(input.data(), input.data() + input.size()); it.has_next(); + it.next()) { auto list = it.list(); - if (list.size() > (1ULL << min_log2_size)) lists.push_back(list); + if (list.size() > (1ULL << min_log2_size)) { + minimizer_tuple const* begin = lists_tuples.data() + lists_tuples.size(); + std::copy(list.begin_ptr(), list.end_ptr(), std::back_inserter(lists_tuples)); + minimizer_tuple const* end = lists_tuples.data() + lists_tuples.size(); + lists.push_back(list_type(begin, end)); + } } assert(lists.size() == num_buckets_in_skew_index); + input.close(); + std::sort(lists.begin(), lists.end(), [](list_type const& x, list_type const& y) { return x.size() < y.size(); }); diff --git a/include/builder/parse_file.cpp b/include/builder/parse_file.cpp index d017279..01f940a 100644 --- a/include/builder/parse_file.cpp +++ b/include/builder/parse_file.cpp @@ -3,7 +3,7 @@ namespace sshash { struct parse_data { - parse_data() : num_kmers(0) {} + parse_data(std::string const& tmp_dirname) : num_kmers(0), minimizers(tmp_dirname) {} uint64_t num_kmers; minimizers_tuples minimizers; compact_string_pool strings; @@ -181,6 +181,7 @@ void parse_file(std::istream& is, parse_data& data, build_configuration const& b append_super_kmer(); } + data.minimizers.finalize(); builder.finalize(); builder.build(data.strings); @@ -204,7 +205,7 @@ parse_data parse_file(std::string const& filename, build_configuration const& bu std::ifstream is(filename.c_str()); if (!is.good()) throw std::runtime_error("error in opening the file '" + filename + "'"); std::cout << "reading file '" << filename << "'..." << std::endl; - parse_data data; + parse_data data(build_config.tmp_dirname); if (util::ends_with(filename, ".gz")) { zip_istream zis(is); parse_file(zis, data, build_config); diff --git a/include/builder/util.hpp b/include/builder/util.hpp index b21a446..442bf14 100644 --- a/include/builder/util.hpp +++ b/include/builder/util.hpp @@ -91,12 +91,11 @@ struct minimizer_tuple { #pragma pack(pop) struct list_type { - list_type(std::vector::iterator begin, - std::vector::iterator end) - : m_begin(begin), m_size(std::distance(begin, end)) {} + list_type(minimizer_tuple const* begin, minimizer_tuple const* end) + : m_begin(begin), m_end(end), m_size(std::distance(begin, end)) {} struct iterator { - iterator(std::vector::iterator begin) : m_begin(begin) {} + iterator(minimizer_tuple const* begin) : m_begin(begin) {} inline std::pair operator*() const { return {(*m_begin).offset, (*m_begin).num_kmers_in_super_kmer}; @@ -107,76 +106,211 @@ struct list_type { bool operator!=(iterator const& other) const { return !(*this == other); } private: - std::vector::iterator m_begin; + minimizer_tuple const* m_begin; }; iterator begin() const { return iterator(m_begin); } - iterator end() const { return iterator(m_begin + m_size); } + iterator end() const { return iterator(m_end); } uint64_t size() const { return m_size; } + minimizer_tuple const* begin_ptr() const { return m_begin; } + minimizer_tuple const* end_ptr() const { return m_end; } + private: - std::vector::iterator m_begin; + minimizer_tuple const* m_begin; + minimizer_tuple const* m_end; uint64_t m_size; }; -struct minimizers_tuples { - minimizers_tuples() {} +struct minimizers_tuples_iterator : std::forward_iterator_tag { + typedef minimizer_tuple value_type; + + minimizers_tuples_iterator(minimizer_tuple const* begin, minimizer_tuple const* end) + : m_list_begin(begin), m_list_end(begin), m_end(end) { + m_list_end = next_begin(); + } + + inline uint64_t minimizer() const { return (*m_list_begin).minimizer; } + inline uint64_t operator*() const { return minimizer(); } + inline void next() { + m_list_begin = m_list_end; + m_list_end = next_begin(); + } + inline void operator++() { next(); } + bool has_next() const { return m_list_begin != m_end; } + list_type list() const { return list_type(m_list_begin, m_list_end); } + +private: + minimizer_tuple const* m_list_begin; + minimizer_tuple const* m_list_end; + minimizer_tuple const* m_end; + + minimizer_tuple const* next_begin() { + minimizer_tuple const* begin = m_list_begin; + uint64_t prev_minimizer = (*begin).minimizer; + while (begin != m_end) { + ++begin; + uint64_t curr_minimizer = (*begin).minimizer; + if (curr_minimizer != prev_minimizer) break; + } + return begin; + } +}; - // void reserve(uint64_t n) { - // tuples.reserve(n); - // } +struct minimizers_tuples { + static constexpr uint64_t ram_limit = 0.5 * essentials::GB; + + minimizers_tuples(std::string const& tmp_dirname) + : m_buffer_size(0) + , m_num_files_to_merge(0) + , m_num_minimizers(0) + , m_run_identifier(pthash::clock_type::now().time_since_epoch().count()) + , m_tmp_dirname(tmp_dirname) { + m_buffer_size = ram_limit / sizeof(minimizer_tuple); + std::cout << "m_buffer_size " << m_buffer_size << std::endl; + } void emplace_back(uint64_t minimizer, uint64_t offset, uint64_t num_kmers_in_super_kmer) { - tuples.emplace_back(minimizer, offset, num_kmers_in_super_kmer); + if (m_buffer.size() == m_buffer_size) sort_and_flush(); + m_buffer.emplace_back(minimizer, offset, num_kmers_in_super_kmer); } - minimizer_tuple& back() { return tuples.back(); } + minimizer_tuple& back() { return m_buffer.back(); } - void sort() { - std::sort(tuples.begin(), tuples.end(), + void sort_and_flush() { + std::cout << "sorting buffer..." << std::endl; + std::sort(m_buffer.begin(), m_buffer.end(), [](minimizer_tuple const& x, minimizer_tuple const& y) { return (x.minimizer < y.minimizer) or (x.minimizer == y.minimizer and x.offset < y.offset); }); + + auto tmp_output_filename = get_tmp_output_filename(m_num_files_to_merge); + std::cout << "saving to file '" << tmp_output_filename << "'..." << std::endl; + std::ofstream out(tmp_output_filename.c_str(), std::ofstream::binary); + if (!out.is_open()) throw std::runtime_error("cannot open file"); + out.write(reinterpret_cast(m_buffer.data()), + m_buffer.size() * sizeof(minimizer_tuple)); + out.close(); + + m_buffer.clear(); + ++m_num_files_to_merge; } - struct iterator { - iterator(std::vector::iterator b, std::vector::iterator e) - : begin(b), end(e) {} - - inline uint64_t minimizer() const { return (*begin).minimizer; } - inline uint64_t operator*() const { return minimizer(); } - list_type list() const { return list_type(begin, next_begin()); } - bool has_next() const { return begin != end; } - inline void next() { begin = next_begin(); } - inline void operator++() { next(); } - - inline iterator operator+(uint64_t /*offset*/) const { - assert(false); - return iterator(end, end); - } + void finalize() { + if (!m_buffer.empty()) sort_and_flush(); + } - private: - std::vector::iterator begin, end; - - std::vector::iterator next_begin() const { - auto i = begin; - uint64_t prev = (*i).minimizer; - while (true) { - ++i; - if (i == end) break; - uint64_t curr = (*i).minimizer; - if (curr != prev) break; + std::string get_minimizers_filename() const { + std::stringstream filename; + filename << m_tmp_dirname << "/sshash.tmp.run_" << m_run_identifier << ".minimizers.bin"; + return filename.str(); + } + + void merge() { + std::cout << "files to merge = " << m_num_files_to_merge << std::endl; + if (m_num_files_to_merge == 0) return; + + assert(m_num_files_to_merge > 0); + + struct iterator_type { + iterator_type(minimizer_tuple const* b, minimizer_tuple const* e) : begin(b), end(e) {} + minimizer_tuple const* begin; + minimizer_tuple const* end; + }; + std::vector iterators; + std::vector idx_heap; + iterators.reserve(m_num_files_to_merge); + idx_heap.reserve(m_num_files_to_merge); + std::vector> mm_files(m_num_files_to_merge); + + auto heap_idx_comparator = [&](uint32_t i, uint32_t j) { + minimizer_tuple const* begin_i = iterators[i].begin; + minimizer_tuple const* begin_j = iterators[j].begin; + if ((*begin_i).minimizer != (*begin_j).minimizer) { + return (*begin_i).minimizer > (*begin_j).minimizer; + } + return (*begin_i).offset > (*begin_j).offset; + }; + + auto advance_heap_head = [&]() { + uint32_t idx = idx_heap.front(); + iterators[idx].begin += 1; + if (iterators[idx].begin != iterators[idx].end) { // percolate down the head + uint64_t pos = 0; + uint64_t size = idx_heap.size(); + while (2 * pos + 1 < size) { + uint64_t i = 2 * pos + 1; + if (i + 1 < size and heap_idx_comparator(idx_heap[i], idx_heap[i + 1])) ++i; + if (heap_idx_comparator(idx_heap[i], idx_heap[pos])) break; + std::swap(idx_heap[pos], idx_heap[i]); + pos = i; + } + } else { + std::pop_heap(idx_heap.begin(), idx_heap.end(), heap_idx_comparator); + idx_heap.pop_back(); } - assert(i > begin); - return i; + }; + + /* create the input iterators and make the heap */ + for (uint64_t i = 0; i != m_num_files_to_merge; ++i) { + auto tmp_output_filename = get_tmp_output_filename(i); + mm_files[i].open(tmp_output_filename, mm::advice::sequential); + iterators.emplace_back(mm_files[i].data(), mm_files[i].data() + mm_files[i].size()); + idx_heap.push_back(i); + } + std::make_heap(idx_heap.begin(), idx_heap.end(), heap_idx_comparator); + + std::ofstream out(get_minimizers_filename().c_str()); + if (!out.is_open()) throw std::runtime_error("cannot open file"); + + uint64_t num_written_tuples = 0; + uint64_t prev_minimizer = constants::invalid; + while (!idx_heap.empty()) { + minimizer_tuple const* begin = iterators[idx_heap.front()].begin; + out.write(reinterpret_cast(begin), sizeof(minimizer_tuple)); + if ((*begin).minimizer != prev_minimizer) { + prev_minimizer = (*begin).minimizer; + ++m_num_minimizers; + } + num_written_tuples += 1; + if (num_written_tuples % 50000000 == 0) { + std::cout << "num_written_tuples = " << num_written_tuples << std::endl; + } + advance_heap_head(); + } + std::cout << "num_written_tuples = " << num_written_tuples << std::endl; + out.close(); + + /* remove tmp files */ + for (uint64_t i = 0; i != m_num_files_to_merge; ++i) { + mm_files[i].close(); + auto tmp_output_filename = get_tmp_output_filename(i); + std::remove(tmp_output_filename.c_str()); } - }; - iterator begin() { return iterator(tuples.begin(), tuples.end()); } + std::vector().swap(m_buffer); + m_num_files_to_merge = 0; // any other call to merge() will do nothing + } + + uint64_t num_minimizers() const { return m_num_minimizers; } + + void remove_tmp_file() { std::remove(get_minimizers_filename().c_str()); } private: - std::vector tuples; + uint64_t m_buffer_size; + uint64_t m_num_files_to_merge; + uint64_t m_num_minimizers; + uint64_t m_run_identifier; + std::string m_tmp_dirname; + std::vector m_buffer; + + std::string get_tmp_output_filename(uint64_t id) const { + std::stringstream filename; + filename << m_tmp_dirname << "/sshash.tmp.run_" << m_run_identifier << ".minimizers." << id + << ".bin"; + return filename.str(); + } }; } // namespace sshash \ No newline at end of file diff --git a/include/ef_sequence.hpp b/include/ef_sequence.hpp index bc5defc..2debf12 100644 --- a/include/ef_sequence.hpp +++ b/include/ef_sequence.hpp @@ -10,10 +10,9 @@ template struct ef_sequence { ef_sequence() : m_universe(0) {} - template - void encode(Iterator begin, uint64_t n) { + template + void encode(ForwardIterator begin, uint64_t n, uint64_t u) { if (n == 0) return; - uint64_t u = *(begin + n - 1); m_universe = u; uint64_t l = uint64_t((n && u / n) ? pthash::util::msb(u / n) : 0); diff --git a/include/minimizers.hpp b/include/minimizers.hpp index 55c859b..b228415 100644 --- a/include/minimizers.hpp +++ b/include/minimizers.hpp @@ -6,7 +6,7 @@ namespace sshash { struct minimizers { template - void build(ForwardIterator begin, uint64_t size) { + void build(ForwardIterator begin, uint64_t size, build_configuration const& build_config) { util::check_hash_collision_probability(size); pthash::build_configuration mphf_config; mphf_config.c = 6.0; @@ -14,12 +14,10 @@ struct minimizers { mphf_config.seed = 1234567890; // my favourite seed mphf_config.minimal_output = true; mphf_config.verbose_output = false; - /* - We use one thread here because the keys iterator is not - a random-access iterator, just forward - */ - mphf_config.num_threads = 1; - m_mphf.build_in_internal_memory(begin, size, mphf_config); + mphf_config.num_threads = std::thread::hardware_concurrency() >= 8 ? 8 : 1; + mphf_config.ram = 2 * essentials::GB; + mphf_config.tmp_dir = build_config.tmp_dirname; + m_mphf.build_in_external_memory(begin, size, mphf_config); } uint64_t lookup(uint64_t uint64_minimizer) const { diff --git a/include/util.hpp b/include/util.hpp index caa5329..fb3a53b 100644 --- a/include/util.hpp +++ b/include/util.hpp @@ -67,7 +67,9 @@ struct build_configuration { , canonical_parsing(false) , weighted(false) - , verbose(true) {} + , verbose(true) + + , tmp_dirname(constants::default_tmp_dirname) {} uint64_t k; // kmer size uint64_t m; // minimizer size @@ -80,6 +82,8 @@ struct build_configuration { bool weighted; bool verbose; + std::string tmp_dirname; + void print() const { std::cout << "k = " << k << ", m = " << m << ", seed = " << seed << ", l = " << l << ", c = " << c diff --git a/include/weights.hpp b/include/weights.hpp index 68aadde..abe9201 100644 --- a/include/weights.hpp +++ b/include/weights.hpp @@ -104,7 +104,8 @@ struct weights { } weight_interval_values.build(index.m_weight_interval_values); index.m_weight_interval_lengths.encode(m_weight_interval_lengths.begin(), - m_weight_interval_lengths.size()); + m_weight_interval_lengths.size(), + m_weight_interval_lengths.back()); m_weight_dictionary_builder.build(index.m_weight_dictionary); } diff --git a/src/build.cpp b/src/build.cpp index 67f258c..15982b8 100644 --- a/src/build.cpp +++ b/src/build.cpp @@ -37,13 +37,18 @@ int main(int argc, char** argv) { "A reasonable value lies between 3.0 and 10.0 (default is " + std::to_string(constants::c) + ").", "-c", false); + parser.add("output_filename", "Output file name where the data structure will be serialized.", + "-o", false); + parser.add( + "tmp_dirname", + "Temporary directory used for construction in external memory. Default is directory '" + + constants::default_tmp_dirname + "'.", + "-d", false); parser.add("canonical_parsing", "Canonical parsing of k-mers. This option changes the parsing and results in a " "trade-off between index space and lookup time.", "--canonical-parsing", true); parser.add("weighted", "Also store the weights in compressed format.", "--weighted", true); - parser.add("output_filename", "Output file name where the data structure will be serialized.", - "-o", false); parser.add("check", "Check correctness after construction.", "--check", true); parser.add("bench", "Run benchmark after construction.", "--bench", true); parser.add("verbose", "Verbose output during construction.", "--verbose", true); @@ -66,6 +71,9 @@ int main(int argc, char** argv) { build_config.canonical_parsing = parser.get("canonical_parsing"); build_config.weighted = parser.get("weighted"); build_config.verbose = parser.get("verbose"); + if (parser.parsed("tmp_dirname")) { + build_config.tmp_dirname = parser.get("tmp_dirname"); + } build_config.print(); dict.build(input_filename, build_config); diff --git a/src/permute.cpp b/src/permute.cpp index 50db1ec..0d50044 100644 --- a/src/permute.cpp +++ b/src/permute.cpp @@ -312,7 +312,7 @@ void permute_and_write(std::istream& is, std::string const& output_filename, uint64_t num_files_to_merge = 0; auto get_tmp_output_filename = [&](uint64_t id) { - return tmp_dirname + "/tmp.run" + run_identifier + "." + std::to_string(id); + return tmp_dirname + "/sshash.tmp.run" + run_identifier + "." + std::to_string(id); }; auto sort_and_flush = [&]() {