diff --git a/include/valik/search/producer_threads_parallel.hpp b/include/valik/search/producer_threads_parallel.hpp index 68133559..eab0d0e6 100644 --- a/include/valik/search/producer_threads_parallel.hpp +++ b/include/valik/search/producer_threads_parallel.hpp @@ -101,4 +101,33 @@ inline void search_all_parallel(size_t const ref_seg_count, } } +/** + * @brief Create a queue of bin indices to send for distributed search. Search each query record in each reference bin without prefiltering. + * Query I/O from disk in external process. +*/ +template +inline void fill_queue_with_bin_ids(size_t const ref_seg_count, + search_arguments const & arguments, + cart_queue & queue) +{ + std::vector tasks; + + query_t dummy_record{}; + for (size_t i = 0; i < arguments.threads; ++i) + { + auto all_cb = [=,&queue,&arguments]() + { + for (size_t bin{0}; bin < ref_seg_count; bin++) + { + queue.insert(bin, dummy_record); + } + }; + + tasks.emplace_back([=]() + { + all_cb(); + }); + } +} + } // namespace valik::app diff --git a/include/valik/search/search_distributed.hpp b/include/valik/search/search_distributed.hpp index fec6c1e2..7aec6499 100644 --- a/include/valik/search/search_distributed.hpp +++ b/include/valik/search/search_distributed.hpp @@ -19,33 +19,46 @@ namespace valik::app * @param time_statistics Run-time statistics. * @return false if search failed. */ -template +template bool search_distributed(search_arguments & arguments, search_time_statistics & time_statistics) { using index_structure_t = std::conditional_t; auto index = valik_index{}; + std::optional ref_meta; + if (!arguments.ref_meta_path.empty()) + ref_meta = metadata(arguments.ref_meta_path); + + size_t bin_count; + if (stellar_only) + { + if (!ref_meta && arguments.bin_path.size() == 1) + throw std::runtime_error("Preprocess reference with valik split and provide --ref-meta."); + + bin_count = std::max(ref_meta->seg_count, arguments.bin_path.size()); + if (arguments.max_queued_carts == std::numeric_limits::max()) // if no user input + arguments.max_queued_carts = bin_count; + arguments.cart_max_capacity = 1; + } + else { auto start = std::chrono::high_resolution_clock::now(); load_index(index, arguments.index_file); auto end = std::chrono::high_resolution_clock::now(); time_statistics.index_io_time += std::chrono::duration_cast>(end - start).count(); - } - - if (arguments.max_queued_carts == std::numeric_limits::max()) // if no user input - arguments.max_queued_carts = index.ibf().bin_count(); - std::optional ref_meta; - if (!arguments.ref_meta_path.empty()) - ref_meta = metadata(arguments.ref_meta_path); + if (arguments.max_queued_carts == std::numeric_limits::max()) // if no user input + arguments.max_queued_carts = index.ibf().bin_count(); + bin_count = index.ibf().bin_count(); + } env_var_pack var_pack{}; - auto queue = cart_queue{index.ibf().bin_count(), arguments.cart_max_capacity, arguments.max_queued_carts}; + auto queue = cart_queue{bin_count, arguments.cart_max_capacity, arguments.max_queued_carts}; std::mutex mutex; execution_metadata exec_meta(arguments.threads); - bool error_in_search = false; // indicates if an error happen inside this lambda + bool error_in_search = false; // indicates if an error happened inside this lambda auto consumerThreads = std::vector{}; for (size_t threadNbr = 0; threadNbr < arguments.threads; ++threadNbr) { @@ -61,45 +74,45 @@ bool search_distributed(search_arguments & arguments, search_time_statistics & t std::filesystem::path cart_queries_path = var_pack.tmp_path / std::string("query_" + std::to_string(bin_id) + "_" + std::to_string(exec_meta.bin_count[bin_id]++) + ".fasta"); g.unlock(); - + std::filesystem::path cart_output_path = cart_queries_path.string() + ".gff"; thread_meta.output_files.push_back(cart_queries_path.string() + ".gff"); - write_cart_queries(records, cart_queries_path); + if (stellar_only) + { + // search all queries in all bins + cart_queries_path = arguments.query_file; + } + else + { + write_cart_queries(records, cart_queries_path); + } std::vector process_args{}; process_args.insert(process_args.end(), {var_pack.stellar_exec, "--version-check", "0", "--verbose", "-a", "dna"}); - if (ref_meta) - { + if (arguments.bin_path.size() == 1) + { + //!TODO: Distibution granularity should be reduced for stellar only search auto ref_len = ref_meta->total_len; auto seg = ref_meta->segment_from_bin(bin_id); if (seg.seq_vec.size() > 1) throw std::runtime_error("Ambiguous sequence for distributed search."); - if (index.bin_path().size() > 1) - { - // search a bin of a clustered metagenomic database - process_args.insert(process_args.end(), {index.bin_path()[bin_id][0], std::string(cart_queries_path)}); - - } - else - { - // search segments of a single reference file - process_args.insert(process_args.end(), {index.bin_path()[0][0], std::string(cart_queries_path), - "--referenceLength", std::to_string(ref_len), - "--sequenceOfInterest", std::to_string(seg.seq_vec[0]), - "--segmentBegin", std::to_string(seg.start), - "--segmentEnd", std::to_string(seg.start + seg.len)}); - } + // search segments of a single reference file + process_args.insert(process_args.end(), {arguments.bin_path[0][0], std::string(cart_queries_path), + "--referenceLength", std::to_string(ref_len), + "--sequenceOfInterest", std::to_string(seg.seq_vec[0]), + "--segmentBegin", std::to_string(seg.start), + "--segmentEnd", std::to_string(seg.start + seg.len)}); } else { // search a reference database of bin sequence files - if (index.bin_path().size() < (size_t) bin_id) { + if (arguments.bin_path.size() < (size_t) bin_id) { throw std::runtime_error("Could not find reference file with index " + std::to_string(bin_id) + ". Did you forget to provide metadata to search segments in a single reference file instead?"); } - process_args.insert(process_args.end(), {index.bin_path()[bin_id][0], std::string(cart_queries_path)}); + process_args.insert(process_args.end(), {arguments.bin_path[bin_id][0], std::string(cart_queries_path)}); } if (arguments.write_time) @@ -108,7 +121,7 @@ bool search_distributed(search_arguments & arguments, search_time_statistics & t float numEpsilon = arguments.error_rate; process_args.insert(process_args.end(), {"-e", std::to_string(numEpsilon), "-l", std::to_string(arguments.pattern_size), - "-o", std::string(cart_queries_path) + ".gff"}); + "-o", cart_output_path}); process_args.insert(process_args.end(), {"--repeatPeriod", std::to_string(arguments.maxRepeatPeriod)}); process_args.insert(process_args.end(), {"--repeatLength", std::to_string(arguments.minRepeatLength)}); @@ -141,9 +154,16 @@ bool search_distributed(search_arguments & arguments, search_time_statistics & t } auto start = std::chrono::high_resolution_clock::now(); - raptor::threshold::threshold const thresholder{arguments.make_threshold_parameters()}; - iterate_distributed_queries(arguments, index.ibf(), thresholder, queue); + if constexpr (stellar_only) + { + fill_queue_with_bin_ids(bin_count, arguments, queue); + } + else + { + raptor::threshold::threshold const thresholder{arguments.make_threshold_parameters()}; + iterate_distributed_queries(arguments, index.ibf(), thresholder, queue); + } queue.finish(); // Flush carts that are not empty yet consumerThreads.clear(); auto end = std::chrono::high_resolution_clock::now(); diff --git a/src/argument_parsing/build.cpp b/src/argument_parsing/build.cpp index fda25c2f..656d8139 100644 --- a/src/argument_parsing/build.cpp +++ b/src/argument_parsing/build.cpp @@ -115,9 +115,6 @@ void run_build(sharg::parser & parser) // ========================================== // Various checks. // ========================================== - arguments.shape = seqan3::shape{seqan3::ungapped{arguments.kmer_size}}; - arguments.shape_weight = arguments.shape.count(); - if (parser.is_option_set("kmer-count-cutoff") && parser.is_option_set("use-filesize-dependent-cutoff")) throw sharg::parser_error{"You cannot use both --kmer-count-cutoff and --use-filesize-dependent-cutoff."}; @@ -130,14 +127,17 @@ void run_build(sharg::parser & parser) { if (arguments.fast) { - arguments.window_size = arguments.shape.size() + 2; + arguments.window_size = arguments.kmer_size + 2; raptor::compute_minimiser(arguments); arguments.input_is_minimiser = true; } else - arguments.window_size = arguments.shape.size(); + arguments.window_size = arguments.kmer_size; } + arguments.shape = seqan3::shape{seqan3::ungapped{arguments.kmer_size}}; + arguments.shape_weight = arguments.shape.count(); + // ========================================== // Process bin_path: // if building from clustered sequences each line in input corresponds to a bin diff --git a/src/valik_search.cpp b/src/valik_search.cpp index 3b2c29ad..e526dd37 100644 --- a/src/valik_search.cpp +++ b/src/valik_search.cpp @@ -44,10 +44,10 @@ void valik_search(search_arguments & arguments) bool failed; if (arguments.distribute) { - runtime_to_compile_time([&]() + runtime_to_compile_time([&]() { - failed = search_distributed(arguments, time_statistics); - }, arguments.compressed); + failed = search_distributed(arguments, time_statistics); + }, arguments.compressed, (arguments.search_type == search_kind::STELLAR)); } // Shared memory execution else