Skip to content

Commit

Permalink
Launch distributed stellar search without prefiltering
Browse files Browse the repository at this point in the history
  • Loading branch information
eaasna committed Apr 8, 2024
1 parent 4fa94a8 commit 41cd7d8
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 42 deletions.
29 changes: 29 additions & 0 deletions include/valik/search/producer_threads_parallel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,33 @@ inline void search_all_parallel(size_t const ref_seg_count,
}
}

/**
 * @brief Create a queue of bin indices to send for distributed search. Search each query record
 *        in each reference bin without prefiltering. Query I/O from disk happens in an external
 *        process, so only a dummy record is enqueued per bin.
 *
 * @param ref_seg_count Number of reference bins (segments) to enqueue.
 * @param arguments Search arguments; arguments.threads bounds the number of producer threads.
 * @param queue Cart queue drained by the consumer threads; receives each bin id exactly once.
 */
template <typename query_t>
inline void fill_queue_with_bin_ids(size_t const ref_seg_count,
                                    search_arguments const & arguments,
                                    cart_queue<query_t> & queue)
{
    std::vector<std::jthread> tasks;

    query_t dummy_record{};
    size_t const thread_count = std::max<size_t>(arguments.threads, 1u);
    for (size_t i = 0; i < thread_count; ++i)
    {
        // Stride-partition the bin ids across threads so that each bin is inserted
        // exactly once. (Previously every thread enqueued all bins, producing
        // thread_count redundant carts — and redundant searches — per bin.)
        tasks.emplace_back([=, &queue]()
        {
            for (size_t bin = i; bin < ref_seg_count; bin += thread_count)
            {
                queue.insert(bin, dummy_record);
            }
        });
    }
    // std::jthread joins automatically when `tasks` is destroyed on scope exit.
}

} // namespace valik::app
88 changes: 54 additions & 34 deletions include/valik/search/search_distributed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,46 @@ namespace valik::app
* @param time_statistics Run-time statistics.
* @return false if search failed.
*/
template <bool compressed>
template <bool compressed, bool stellar_only>
bool search_distributed(search_arguments & arguments, search_time_statistics & time_statistics)
{
using index_structure_t = std::conditional_t<compressed, index_structure::ibf_compressed, index_structure::ibf>;
auto index = valik_index<index_structure_t>{};

std::optional<metadata> ref_meta;
if (!arguments.ref_meta_path.empty())
ref_meta = metadata(arguments.ref_meta_path);

size_t bin_count;
if (stellar_only)
{
if (!ref_meta && arguments.bin_path.size() == 1)
throw std::runtime_error("Preprocess reference with valik split and provide --ref-meta.");

bin_count = std::max(ref_meta->seg_count, arguments.bin_path.size());
if (arguments.max_queued_carts == std::numeric_limits<uint32_t>::max()) // if no user input
arguments.max_queued_carts = bin_count;
arguments.cart_max_capacity = 1;
}
else
{
auto start = std::chrono::high_resolution_clock::now();
load_index(index, arguments.index_file);
auto end = std::chrono::high_resolution_clock::now();
time_statistics.index_io_time += std::chrono::duration_cast<std::chrono::duration<double>>(end - start).count();
}

if (arguments.max_queued_carts == std::numeric_limits<uint32_t>::max()) // if no user input
arguments.max_queued_carts = index.ibf().bin_count();

std::optional<metadata> ref_meta;
if (!arguments.ref_meta_path.empty())
ref_meta = metadata(arguments.ref_meta_path);
if (arguments.max_queued_carts == std::numeric_limits<uint32_t>::max()) // if no user input
arguments.max_queued_carts = index.ibf().bin_count();
bin_count = index.ibf().bin_count();
}

env_var_pack var_pack{};
auto queue = cart_queue<query_record>{index.ibf().bin_count(), arguments.cart_max_capacity, arguments.max_queued_carts};
auto queue = cart_queue<query_record>{bin_count, arguments.cart_max_capacity, arguments.max_queued_carts};

std::mutex mutex;
execution_metadata exec_meta(arguments.threads);

bool error_in_search = false; // indicates if an error happen inside this lambda
bool error_in_search = false; // indicates if an error happened inside this lambda
auto consumerThreads = std::vector<std::jthread>{};
for (size_t threadNbr = 0; threadNbr < arguments.threads; ++threadNbr)
{
Expand All @@ -61,45 +74,45 @@ bool search_distributed(search_arguments & arguments, search_time_statistics & t
std::filesystem::path cart_queries_path = var_pack.tmp_path / std::string("query_" + std::to_string(bin_id) +
"_" + std::to_string(exec_meta.bin_count[bin_id]++) + ".fasta");
g.unlock();

std::filesystem::path cart_output_path = cart_queries_path.string() + ".gff";
thread_meta.output_files.push_back(cart_queries_path.string() + ".gff");

write_cart_queries(records, cart_queries_path);
if (stellar_only)
{
// search all queries in all bins
cart_queries_path = arguments.query_file;
}
else
{
write_cart_queries(records, cart_queries_path);
}

std::vector<std::string> process_args{};
process_args.insert(process_args.end(), {var_pack.stellar_exec, "--version-check", "0", "--verbose", "-a", "dna"});

if (ref_meta)
{
if (arguments.bin_path.size() == 1)
{
                        //!TODO: Distribution granularity should be reduced for stellar only search
auto ref_len = ref_meta->total_len;
auto seg = ref_meta->segment_from_bin(bin_id);
if (seg.seq_vec.size() > 1)
throw std::runtime_error("Ambiguous sequence for distributed search.");

if (index.bin_path().size() > 1)
{
// search a bin of a clustered metagenomic database
process_args.insert(process_args.end(), {index.bin_path()[bin_id][0], std::string(cart_queries_path)});

}
else
{
// search segments of a single reference file
process_args.insert(process_args.end(), {index.bin_path()[0][0], std::string(cart_queries_path),
"--referenceLength", std::to_string(ref_len),
"--sequenceOfInterest", std::to_string(seg.seq_vec[0]),
"--segmentBegin", std::to_string(seg.start),
"--segmentEnd", std::to_string(seg.start + seg.len)});
}
// search segments of a single reference file
process_args.insert(process_args.end(), {arguments.bin_path[0][0], std::string(cart_queries_path),
"--referenceLength", std::to_string(ref_len),
"--sequenceOfInterest", std::to_string(seg.seq_vec[0]),
"--segmentBegin", std::to_string(seg.start),
"--segmentEnd", std::to_string(seg.start + seg.len)});
}
else
{
// search a reference database of bin sequence files
if (index.bin_path().size() < (size_t) bin_id) {
if (arguments.bin_path.size() < (size_t) bin_id) {
throw std::runtime_error("Could not find reference file with index " + std::to_string(bin_id) +
". Did you forget to provide metadata to search segments in a single reference file instead?");
}
process_args.insert(process_args.end(), {index.bin_path()[bin_id][0], std::string(cart_queries_path)});
process_args.insert(process_args.end(), {arguments.bin_path[bin_id][0], std::string(cart_queries_path)});
}

if (arguments.write_time)
Expand All @@ -108,7 +121,7 @@ bool search_distributed(search_arguments & arguments, search_time_statistics & t
float numEpsilon = arguments.error_rate;
process_args.insert(process_args.end(), {"-e", std::to_string(numEpsilon),
"-l", std::to_string(arguments.pattern_size),
"-o", std::string(cart_queries_path) + ".gff"});
"-o", cart_output_path});

process_args.insert(process_args.end(), {"--repeatPeriod", std::to_string(arguments.maxRepeatPeriod)});
process_args.insert(process_args.end(), {"--repeatLength", std::to_string(arguments.minRepeatLength)});
Expand Down Expand Up @@ -141,9 +154,16 @@ bool search_distributed(search_arguments & arguments, search_time_statistics & t
}

auto start = std::chrono::high_resolution_clock::now();
raptor::threshold::threshold const thresholder{arguments.make_threshold_parameters()};
iterate_distributed_queries(arguments, index.ibf(), thresholder, queue);
if constexpr (stellar_only)
{
fill_queue_with_bin_ids(bin_count, arguments, queue);
}
else
{
raptor::threshold::threshold const thresholder{arguments.make_threshold_parameters()};
iterate_distributed_queries(arguments, index.ibf(), thresholder, queue);

}
queue.finish(); // Flush carts that are not empty yet
consumerThreads.clear();
auto end = std::chrono::high_resolution_clock::now();
Expand Down
10 changes: 5 additions & 5 deletions src/argument_parsing/build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ void run_build(sharg::parser & parser)
// ==========================================
// Various checks.
// ==========================================
arguments.shape = seqan3::shape{seqan3::ungapped{arguments.kmer_size}};
arguments.shape_weight = arguments.shape.count();

if (parser.is_option_set("kmer-count-cutoff") && parser.is_option_set("use-filesize-dependent-cutoff"))
throw sharg::parser_error{"You cannot use both --kmer-count-cutoff and --use-filesize-dependent-cutoff."};

Expand All @@ -130,14 +127,17 @@ void run_build(sharg::parser & parser)
{
if (arguments.fast)
{
arguments.window_size = arguments.shape.size() + 2;
arguments.window_size = arguments.kmer_size + 2;
raptor::compute_minimiser(arguments);
arguments.input_is_minimiser = true;
}
else
arguments.window_size = arguments.shape.size();
arguments.window_size = arguments.kmer_size;
}

arguments.shape = seqan3::shape{seqan3::ungapped{arguments.kmer_size}};
arguments.shape_weight = arguments.shape.count();

// ==========================================
// Process bin_path:
// if building from clustered sequences each line in input corresponds to a bin
Expand Down
6 changes: 3 additions & 3 deletions src/valik_search.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ void valik_search(search_arguments & arguments)
bool failed;
if (arguments.distribute)
{
runtime_to_compile_time([&]<bool is_compressed>()
runtime_to_compile_time([&]<bool is_compressed, bool stellar_only>()
{
failed = search_distributed<is_compressed>(arguments, time_statistics);
}, arguments.compressed);
failed = search_distributed<is_compressed, stellar_only>(arguments, time_statistics);
}, arguments.compressed, (arguments.search_type == search_kind::STELLAR));
}
// Shared memory execution
else
Expand Down

0 comments on commit 41cd7d8

Please sign in to comment.