Skip to content

Commit

Permalink
write to disk and merge blocks of tuples
Browse files Browse the repository at this point in the history
  • Loading branch information
jermp committed May 25, 2022
1 parent d48432d commit d054366
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 34 deletions.
12 changes: 5 additions & 7 deletions include/builder/build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,12 @@ void dictionary::build(std::string const& filename, build_configuration const& b
}
}

/* step 2: sort minimizers and build MPHF ***/
/* step 2: merge minimizers and build MPHF ***/
timer.start();
data.minimizers.sort();
auto minimizers_filename = data.minimizers.get_minimizers_filename(tmp_dirname);
data.minimizers.merge();
{
data.minimizers.flush(minimizers_filename);
data.minimizers.release(); // release internal memory
mm::file_source<minimizer_tuple> input(minimizers_filename, mm::advice::sequential);
mm::file_source<minimizer_tuple> input(data.minimizers.get_minimizers_filename(),
mm::advice::sequential);

uint64_t num_minimizers = 0;
for (minimizers_tuples_iterator it(input.data(), input.data() + input.size());
Expand Down Expand Up @@ -121,7 +119,7 @@ void dictionary::build(std::string const& filename, build_configuration const& b

if (build_config.verbose) buckets_stats.print();

std::remove(minimizers_filename.c_str());
data.minimizers.remove_tmp_file();
}

} // namespace sshash
6 changes: 2 additions & 4 deletions include/builder/build_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ buckets_statistics build_index(parse_data& data, minimizers const& m_minimizers,
std::cout << "bits_per_offset = ceil(log2(" << data.strings.num_bits() / 2
<< ")) = " << std::ceil(std::log2(data.strings.num_bits() / 2)) << std::endl;

// TODO: have user input here
std::string tmp_dirname = constants::default_tmp_dirname;
auto minimizers_filename = data.minimizers.get_minimizers_filename(tmp_dirname);
mm::file_source<minimizer_tuple> input(minimizers_filename, mm::advice::sequential);
mm::file_source<minimizer_tuple> input(data.minimizers.get_minimizers_filename(),
mm::advice::sequential);

for (minimizers_tuples_iterator it(input.data(), input.data() + input.size()); it.has_next();
it.next()) {
Expand Down
6 changes: 2 additions & 4 deletions include/builder/build_skew_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ void build_skew_index(skew_index& m_skew_index, parse_data& data, buckets const&
std::cout << "log2_max_num_super_kmers_in_bucket "
<< m_skew_index.log2_max_num_super_kmers_in_bucket << std::endl;

// TODO: have user input here
std::string tmp_dirname = constants::default_tmp_dirname;
auto minimizers_filename = data.minimizers.get_minimizers_filename(tmp_dirname);
mm::file_source<minimizer_tuple> input(minimizers_filename, mm::advice::sequential);
mm::file_source<minimizer_tuple> input(data.minimizers.get_minimizers_filename(),
mm::advice::sequential);

uint64_t num_buckets_in_skew_index = 0;
uint64_t num_super_kmers_in_skew_index = 0;
Expand Down
1 change: 1 addition & 0 deletions include/builder/parse_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ void parse_file(std::istream& is, parse_data& data, build_configuration const& b
append_super_kmer();
}

data.minimizers.finalize();
builder.finalize();
builder.build(data.strings);

Expand Down
140 changes: 121 additions & 19 deletions include/builder/util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,47 +158,149 @@ struct minimizers_tuples_iterator : std::forward_iterator_tag {
};

struct minimizers_tuples {
static constexpr uint64_t ram_limit = 1 * essentials::GB;

minimizers_tuples() : m_run_identifier(pthash::clock_type::now().time_since_epoch().count()) {}

// void reserve(uint64_t n) {
// m_tuples.reserve(n);
// }
static constexpr uint64_t ram_limit = 0.5 * essentials::GB;

minimizers_tuples(std::string tmp_dirname = constants::default_tmp_dirname)
: m_buffer_size(0)
, m_num_files_to_merge(0)
, m_run_identifier(pthash::clock_type::now().time_since_epoch().count())
, m_tmp_dirname(tmp_dirname) {
m_buffer_size = ram_limit / sizeof(minimizer_tuple);
std::cout << "m_buffer_size " << m_buffer_size << std::endl;
}

void emplace_back(uint64_t minimizer, uint64_t offset, uint64_t num_kmers_in_super_kmer) {
m_tuples.emplace_back(minimizer, offset, num_kmers_in_super_kmer);
if (m_buffer.size() == m_buffer_size) sort_and_flush();
m_buffer.emplace_back(minimizer, offset, num_kmers_in_super_kmer);
}

minimizer_tuple& back() { return m_tuples.back(); }
minimizer_tuple& back() { return m_buffer.back(); }

void sort() {
std::sort(m_tuples.begin(), m_tuples.end(),
void sort_and_flush() {
std::cout << "sorting buffer..." << std::endl;
std::sort(m_buffer.begin(), m_buffer.end(),
[](minimizer_tuple const& x, minimizer_tuple const& y) {
return (x.minimizer < y.minimizer) or
(x.minimizer == y.minimizer and x.offset < y.offset);
});
auto tmp_output_filename = get_tmp_output_filename(m_num_files_to_merge);
std::cout << "saving to file '" << tmp_output_filename << "'..." << std::endl;
std::ofstream out(tmp_output_filename.c_str(), std::ofstream::binary);
if (!out.is_open()) throw std::runtime_error("cannot open file");
out.write(reinterpret_cast<char const*>(m_buffer.data()),
m_buffer.size() * sizeof(minimizer_tuple));
out.close();

m_buffer.clear();
++m_num_files_to_merge;
}

std::string get_minimizers_filename(std::string const& tmp_dirname) const {
void finalize() {
if (!m_buffer.empty()) sort_and_flush();
}

std::string get_minimizers_filename() const {
std::stringstream filename;
filename << tmp_dirname << "/sshash.tmp.run_" << m_run_identifier << ".minimizers.bin";
filename << m_tmp_dirname << "/sshash.tmp.run_" << m_run_identifier << ".minimizers.bin";
return filename.str();
}

void flush(std::string const& filename) {
std::ofstream out(filename.c_str(), std::ofstream::binary);
void merge() {
if (m_num_files_to_merge == 0) return;

assert(m_num_files_to_merge > 0);
std::cout << "files to merge = " << m_num_files_to_merge << std::endl;

struct iterator_type {
iterator_type(minimizer_tuple const* b, minimizer_tuple const* e) : begin(b), end(e) {}
minimizer_tuple const* begin;
minimizer_tuple const* end;
};
std::vector<iterator_type> iterators;
std::vector<uint32_t> idx_heap;
iterators.reserve(m_num_files_to_merge);
idx_heap.reserve(m_num_files_to_merge);
std::vector<mm::file_source<minimizer_tuple>> mm_files(m_num_files_to_merge);

auto heap_idx_comparator = [&](uint32_t i, uint32_t j) {
minimizer_tuple const* begin_i = iterators[i].begin;
minimizer_tuple const* begin_j = iterators[j].begin;
if ((*begin_i).minimizer != (*begin_j).minimizer) {
return (*begin_i).minimizer > (*begin_j).minimizer;
}
return (*begin_i).offset > (*begin_j).offset;
};

auto advance_heap_head = [&]() {
uint32_t idx = idx_heap.front();
iterators[idx].begin += 1;
if (iterators[idx].begin != iterators[idx].end) { // percolate down the head
uint64_t pos = 0;
uint64_t size = idx_heap.size();
while (2 * pos + 1 < size) {
uint64_t i = 2 * pos + 1;
if (i + 1 < size and heap_idx_comparator(idx_heap[i], idx_heap[i + 1])) ++i;
if (heap_idx_comparator(idx_heap[i], idx_heap[pos])) break;
std::swap(idx_heap[pos], idx_heap[i]);
pos = i;
}
} else {
std::pop_heap(idx_heap.begin(), idx_heap.end(), heap_idx_comparator);
idx_heap.pop_back();
}
};

/* create the input iterators and make the heap */
for (uint64_t i = 0; i != m_num_files_to_merge; ++i) {
auto tmp_output_filename = get_tmp_output_filename(i);
mm_files[i].open(tmp_output_filename, mm::advice::sequential);
iterators.emplace_back(mm_files[i].data(), mm_files[i].data() + mm_files[i].size());
idx_heap.push_back(i);
}
std::make_heap(idx_heap.begin(), idx_heap.end(), heap_idx_comparator);

std::ofstream out(get_minimizers_filename().c_str());
if (!out.is_open()) throw std::runtime_error("cannot open file");
out.write(reinterpret_cast<char const*>(m_tuples.data()),
m_tuples.size() * sizeof(minimizer_tuple));

uint64_t num_written_tuples = 0;
while (!idx_heap.empty()) {
minimizer_tuple const* begin = iterators[idx_heap.front()].begin;
out.write(reinterpret_cast<char const*>(begin), sizeof(minimizer_tuple));
num_written_tuples += 1;
if (num_written_tuples % 100000 == 0) {
std::cout << "num_written_tuples = " << num_written_tuples << std::endl;
}
advance_heap_head();
}
std::cout << "num_written_tuples = " << num_written_tuples << std::endl;
out.close();

/* remove tmp files */
for (uint64_t i = 0; i != m_num_files_to_merge; ++i) {
mm_files[i].close();
auto tmp_output_filename = get_tmp_output_filename(i);
std::remove(tmp_output_filename.c_str());
}

std::vector<minimizer_tuple>().swap(m_buffer);
m_num_files_to_merge = 0; // any other call to merge() will do nothing
}

void release() { std::vector<minimizer_tuple>().swap(m_tuples); }
void remove_tmp_file() { std::remove(get_minimizers_filename().c_str()); }

private:
uint64_t m_buffer_size;
uint64_t m_num_files_to_merge;
uint64_t m_run_identifier;
std::vector<minimizer_tuple> m_tuples;
std::string m_tmp_dirname;
std::vector<minimizer_tuple> m_buffer;

std::string get_tmp_output_filename(uint64_t id) {
std::stringstream filename;
filename << m_tmp_dirname << "/sshash.tmp.run_" << m_run_identifier << ".minimizers." << id
<< ".bin";
return filename.str();
};
};

} // namespace sshash

0 comments on commit d054366

Please sign in to comment.