Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split query database #93

Merged
merged 23 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/utilities/consolidate/consolidate.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include <valik/shared.hpp>
#include <valik/split/reference_segments.hpp>
#include <valik/split/database_segments.hpp>

#include <utilities/consolidate/io.hpp>

Expand Down
4 changes: 2 additions & 2 deletions include/utilities/consolidate/io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

#include <filesystem>

#include <valik/split/reference_metadata.hpp>
#include <valik/split/database_metadata.hpp>

#include <utilities/consolidate/stellar_match.hpp>

namespace valik
{

std::vector<stellar_match> read_stellar_output(std::filesystem::path const & match_path,
reference_metadata const & reference,
database_metadata const & reference,
std::ios_base::openmode const mode = std::ios_base::in);

void write_stellar_output(std::filesystem::path const & out_path,
Expand Down
28 changes: 28 additions & 0 deletions include/utilities/consolidate/merge_processes.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include "iostream"

#include <utilities/external_process.hpp>
#include <valik/search/env_var_pack.hpp>
#include <valik/search/execution_metadata.hpp>
#include <valik/shared.hpp>

namespace valik
{

/**
* @brief Function that merges search results and metadata across threads.
*
* Only the declaration lives in this header; the definition is provided elsewhere.
*
* @param arguments Command line arguments.
* @param time_statistics Run time statistics. IN-OUT parameter.
* @param exec_meta Metadata table across all search threads. Taken by non-const reference,
*                  so the callee is allowed to modify it.
* @param var_pack Environment variables; this function calls the merge executable.
* @return false if merge failed.
*/

bool merge_processes(search_arguments const & arguments,
app::search_time_statistics & time_statistics,
app::execution_metadata & exec_meta,
env_var_pack const & var_pack);

} // namespace valik
4 changes: 2 additions & 2 deletions include/utilities/consolidate/stellar_match.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include <utilities/shared.hpp>
#include <valik/split/reference_metadata.hpp>
#include <valik/split/database_metadata.hpp>

namespace valik
{
Expand All @@ -19,7 +19,7 @@ struct stellar_match
// 1;seq2Range=1280,1378;cigar=97M1D2M;mutations=14A,45G,58T,92C
std::string attributes{};

stellar_match(std::vector<std::string> const match_vec, reference_metadata const & reference)
stellar_match(std::vector<std::string> const match_vec, database_metadata const & reference)
{
dname = match_vec[0];

Expand Down
18 changes: 18 additions & 0 deletions include/utilities/external_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <array>
#include <cassert>
#include <filesystem>
#include <iostream>
#include <memory>
#include <ranges>
#include <sstream>
Expand Down Expand Up @@ -138,3 +139,20 @@ class external_process final {
close(stderrpipe[WRITE_END]);
}
};

/**
 * @brief Checks the exit status of an external process and reports a failure on std::cerr.
 *
 * If proc.status() is non-zero, the function prints a failure notice, the call
 * (every element of proc_args preceded by a single space) and the content of proc.cerr()
 * to std::cerr.
 *
 * @tparam arg_t Type of a single call argument; must be streamable to std::ostream.
 * @param proc_args Arguments of the call; only used to print the diagnostic message.
 * @param proc Process whose status is inspected.
 * @return true if proc.status() is 0, false otherwise.
 */
template <typename arg_t>
bool check_external_process_success(std::vector<arg_t> const & proc_args, external_process const & proc)
{
    if (proc.status() == 0)
        return true;

    std::cerr << "External process failed\n";
    std::cerr << "call:";
    // Bind by const reference: printing does not need a copy of each argument.
    for (auto const & arg : proc_args)
        std::cerr << " " << arg;
    std::cerr << '\n';
    std::cerr << proc.cerr() << '\n';
    return false;
}
12 changes: 6 additions & 6 deletions include/valik/build/index_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

#include <valik/build/call_parallel_on_bins.hpp>
#include <valik/index.hpp>
#include <valik/split/reference_metadata.hpp>
#include <valik/split/reference_segments.hpp>
#include <valik/split/database_metadata.hpp>
#include <valik/split/database_segments.hpp>

#include <seqan3/search/views/minimiser_hash.hpp>

Expand Down Expand Up @@ -65,19 +65,19 @@ class index_factory

if (arguments->from_segments)
{
reference_segments segments(arguments->seg_path);
reference_metadata reference(arguments->ref_meta_path, false);
database_segments segments(arguments->seg_path);
database_metadata reference(arguments->ref_meta_path, false);

auto & ibf = index.ibf();
int i = 0;
for (auto && [seq] : sequence_file_t{arguments->bin_file})
{
// get the relevant segments for each reference
auto ref_seg = [&](reference_segments::segment & seg) {return reference.sequences.at(i).ind == seg.ref_ind;};
auto ref_seg = [&](database_segments::segment & seg) {return reference.sequences.at(i).ind == seg.seq_ind;};
for (auto & seg : segments.members | std::views::filter(ref_seg))
{
for (auto && value : seq | seqan3::views::slice(seg.start, seg.start + seg.len) | hash_view())
ibf.emplace(value, seqan3::bin_index{seg.bin});
ibf.emplace(value, seqan3::bin_index{seg.id});
}
i++;
}
Expand Down
78 changes: 78 additions & 0 deletions include/valik/search/cart_query_io.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#pragma once

#include <filesystem>

#include <valik/search/query_record.hpp>

#include <stellar/stellar_query_segment.hpp>
#include <stellar/io/import_sequence.hpp>

#include <seqan3/io/sequence_file/output.hpp>
#include <seqan3/io/sequence_file/record.hpp>
#include <seqan3/core/debug_stream.hpp>

namespace valik
{

/**
 * \brief Function that collects the query segments of a cart (split or short queries)
 *        into seqan2 string sets.
 *
 * For every record, record.querySegment is appended to seqs and record.sequence_id
 * (converted to seqan2::String<char>) is appended to ids. A summary line is written to strOut;
 * a warning is written to strErr if stellar::_checkUniqueId reports a duplicate id.
 *
 * \param records vector containing valik split query segments
 * \param seqs    set of query segments (in-out)
 * \param ids     set of query segment ids (in-out)
 * \param strOut  stream for standard output
 * \param strErr  stream for error output
 * \return always true; duplicate ids only trigger a warning
 */
template <typename rec_vec_t, typename TAlphabet, typename TId, typename TStream>
inline bool get_cart_queries(rec_vec_t const & records,
                             seqan2::StringSet<seqan2::Segment<seqan2::String<TAlphabet> const, seqan2::InfixSegment>, seqan2::Dependent<>> & seqs,
                             seqan2::StringSet<TId> & ids,
                             TStream & strOut,
                             TStream & strErr)
{
    std::set<TId> uniqueIds; // set of short IDs (cut at first whitespace)
    bool idsUnique = true;

    size_t seqCount{0};
    for (auto const & record : records)
    {
        // Convert the id once and reuse it for both the id set and the uniqueness check.
        seqan2::String<char> query_id = record.sequence_id;
        seqan2::appendValue(seqs, record.querySegment, seqan2::Generous());
        seqan2::appendValue(ids, query_id, seqan2::Generous());
        seqCount++;
        idsUnique &= stellar::_checkUniqueId(uniqueIds, query_id);
    }

    strOut << "Loaded " << seqCount << " query sequence" << ((seqCount > 1) ? "s " : " ") << "from cart." << std::endl;
    if (!idsUnique)
        strErr << "WARNING: Non-unique query ids. Output can be ambiguous.\n";

    return true;
}

/**
* \brief Function that writes out cart queries.
*
* \param records vector containing query segments
* \param cart_queries_path output path
*/
template <typename rec_vec_t>
void write_cart_queries(rec_vec_t & records, std::filesystem::path const & cart_queries_path)
{
using fields = seqan3::fields<seqan3::field::id, seqan3::field::seq>;
using types = seqan3::type_list<std::string, std::vector<seqan3::dna4>>;
using sequence_record_type = seqan3::sequence_record<types, fields>;

seqan3::sequence_file_output fout{cart_queries_path, fields{}};

for (auto & record : records)
{
sequence_record_type sequence_record{std::move(record.sequence_id), std::move(record.sequence)};
fout.push_back(sequence_record);
}
}

} // namespace valik
5 changes: 5 additions & 0 deletions include/valik/search/env_var_pack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
#include <vector>
#include <algorithm>

#include <utilities/external_process.hpp>
#include <valik/search/execution_metadata.hpp>
#include <valik/search/search_time_statistics.hpp>
#include <valik/shared.hpp>

namespace valik
{

Expand Down
45 changes: 45 additions & 0 deletions include/valik/search/execution_metadata.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#pragma once

#include <cstddef>
#include <fstream>
#include "sstream"
#include <string>
#include <unordered_map>
#include <vector>

#include <seqan3/core/debug_stream.hpp>

#include <valik/search/search_time_statistics.hpp>
#include <valik/shared.hpp>

namespace valik::app
{

/**
 * @brief Metadata gathered during a search: one thread_metadata entry per thread plus
 *        the merged list of output files and a per-bin cart counter.
 */
struct execution_metadata
{
    /// Data recorded locally by a single entry of the table.
    struct thread_metadata
    {
        std::vector<std::string> output_files;
        std::stringstream text_out;
        std::vector<double> time_statistics;
    };

    /// Creates a table with `threads` default-constructed thread_metadata entries.
    execution_metadata(size_t const & threads) : table(threads)
    {}

    /**
     * @brief Merges all thread-local data together.
     *
     * Appends the output files of every table entry to output_files, appends their time
     * statistics to time_statistics.cart_processing_times and writes their buffered text,
     * in table order, to the file "<arguments.out_file>.out".
     *
     * @param arguments Search arguments; only out_file is read here.
     * @param time_statistics Run time statistics. IN-OUT parameter.
     */
    void merge(search_arguments const & arguments,
               search_time_statistics & time_statistics) // IN-OUT parameter
    {
        std::ofstream text_out(arguments.out_file.string() + ".out");
        auto & cart_times = time_statistics.cart_processing_times;

        for (thread_metadata const & local : table)
        {
            output_files.insert(output_files.end(), local.output_files.begin(), local.output_files.end());
            cart_times.insert(cart_times.end(), local.time_statistics.begin(), local.time_statistics.end());
            text_out << local.text_out.str();
        }
    }

    std::vector<thread_metadata> table;
    std::vector<std::string> output_files;
    std::unordered_map<size_t, size_t> bin_count; // <bin_id, carts_per_bin>
};

} // namespace valik::app
Loading