From 068f809d733e5fab88f99aa7ed5ec188cff11a4d Mon Sep 17 00:00:00 2001 From: Evelin Date: Sun, 17 Nov 2024 10:46:52 +0100 Subject: [PATCH] Switch for distributed stellar only search --- .../threshold/search_kmer_profile.hpp | 6 +- include/valik/search/search_local.hpp | 15 +++- include/valik/shared.hpp | 1 + include/valik/split/metadata.hpp | 15 ++-- lib/seqan | 2 +- src/argument_parsing/search.cpp | 79 +++++++++++-------- 6 files changed, 69 insertions(+), 49 deletions(-) diff --git a/include/utilities/threshold/search_kmer_profile.hpp b/include/utilities/threshold/search_kmer_profile.hpp index 265690fc..9228a244 100644 --- a/include/utilities/threshold/search_kmer_profile.hpp +++ b/include/utilities/threshold/search_kmer_profile.hpp @@ -117,8 +117,10 @@ struct search_kmer_profile search_error_profile get_error_profile(uint8_t const errors) { if (error_table.find(errors) == error_table.end()) - throw std::runtime_error("Error count " + std::to_string(errors) + " out of precalculated range"); - + { + auto highest_er = error_table.at(max_errors); + return {highest_er.params, highest_er.pattern, search_kind::STELLAR}; + } return error_table.at(errors); } diff --git a/include/valik/search/search_local.hpp b/include/valik/search/search_local.hpp index d8dc8b9c..49d308e7 100644 --- a/include/valik/search/search_local.hpp +++ b/include/valik/search/search_local.hpp @@ -66,7 +66,7 @@ static inline dream_stellar::StellarOptions make_thread_options(search_arguments template bool search_local(search_arguments & arguments, search_time_statistics & time_statistics) { - if (arguments.bin_path.size() > 1 || arguments.bin_path[0].size() > 1) + if (arguments.bin_path.size() > 1 || (arguments.bin_path.size() > 0 && arguments.bin_path[0].size() > 1)) throw std::runtime_error("Multiple reference files can not be searched in shared memory mode. " "Add --distribute argument to launch multiple distributed instances of DREAM-Stellar search."); @@ -84,10 +84,20 @@ bool search_local(search_arguments & arguments, search_time_statistics & time_st metadata ref_meta = metadata(arguments.ref_meta_path); if (stellar_only) { + arguments.bin_path.clear(); // in case inserted from index + for (auto & f : ref_meta.files) + arguments.bin_path.push_back(std::vector{f.path}); + auto prefilter_bin_count = ref_meta.seg_count; split_arguments stellar_dist_arguments; // distribute stellar search - stellar_dist_arguments.seg_count = std::max(ref_meta.seq_count, (size_t) arguments.threads); + + // for some number of reference sequences split sequences into as many segments as is the next multiple of thread count + if (ref_meta.seq_count % arguments.threads > 0) + stellar_dist_arguments.seg_count = ref_meta.seq_count + (arguments.threads - ref_meta.seq_count % arguments.threads); + else + stellar_dist_arguments.seg_count = ref_meta.seq_count; + stellar_dist_arguments.pattern_size = ref_meta.pattern_size; ref_meta.update_segments_for_distributed_stellar(stellar_dist_arguments); // update cart queue parameters for distributed stellar @@ -208,7 +218,6 @@ bool search_local(search_arguments & arguments, search_time_statistics & time_st for (auto next = queue.dequeue(); next; next = queue.dequeue()) { auto & [bin_id, records] = *next; - std::unique_lock g(mutex); std::filesystem::path cart_queries_path = var_pack.tmp_path / std::string("query_" + std::to_string(bin_id) + "_" + std::to_string(exec_meta.bin_count[bin_id]++) + ".fasta"); diff --git a/include/valik/shared.hpp b/include/valik/shared.hpp index d33c6dc8..4cb54666 100644 --- a/include/valik/shared.hpp +++ b/include/valik/shared.hpp @@ -184,6 +184,7 @@ struct search_arguments final : public minimiser_threshold_arguments, search_pro bool keep_best_repeats{false}; double best_bin_entropy_cutoff{0.25}; bool keep_all_repeats{false}; + bool stellar_only{false}; size_t cart_max_capacity{1000}; size_t max_queued_carts{std::numeric_limits::max()}; diff --git a/include/valik/split/metadata.hpp b/include/valik/split/metadata.hpp index b67da576..5ff218bc 100644 --- a/include/valik/split/metadata.hpp +++ b/include/valik/split/metadata.hpp @@ -308,7 +308,7 @@ struct metadata if (segments.size() != n) { - throw std::runtime_error("Error: Database was split into " + std::to_string(segments.size()) + + throw std::runtime_error("Database was split into " + std::to_string(segments.size()) + " instead of " + std::to_string(n) + " segments."); } } @@ -373,6 +373,7 @@ struct metadata size_t len_lower_bound = default_seg_len / 10; // Check how many sequences are discarded for being too short + //!TODO: gather short sequences auto first_long_seq = std::find_if(sequences.begin(), sequences.end(), [&](auto const & seq){return (seq.len > len_lower_bound);}); size_t discarded_short_sequences = first_long_seq - sequences.begin(); @@ -412,19 +413,15 @@ struct metadata std::to_string(arguments.pattern_size) + "bp.\nDecrease the overlap or the number of segments."); } - size_t len_lower_bound = default_seg_len / 10; - // Check how many sequences are discarded for being too short - auto first_long_seq = std::find_if(sequences.begin(), sequences.end(), [&](auto const & seq){return (seq.len > len_lower_bound);}); - size_t discarded_short_sequences = first_long_seq - sequences.begin(); - - arguments.seg_count = std::max(arguments.seg_count, (uint32_t) (sequences.size() - discarded_short_sequences)); - make_exactly_n_segments(arguments.seg_count, arguments.pattern_size, first_long_seq); + auto seq_begin = sequences.begin(); + make_equal_length_segments(arguments.pattern_size, seq_begin); seg_count = segments.size(); std::stable_sort(sequences.begin(), sequences.end(), fasta_order()); std::stable_sort(segments.begin(), segments.end(), fasta_order()); for (size_t i = 0; i < segments.size(); i++) segments[i].id = i; + } /** @@ -467,7 +464,7 @@ struct metadata scan_database_sequences(arguments); if (arguments.manual_parameters && (segments.size() != arguments.seg_count)) - seqan3::debug_stream << "WARNING: Database was split into " << segments.size() << " instead of " << arguments.seg_count << " segments.\n"; + seqan3::debug_stream << "[Warning] Database was split into " << segments.size() << " instead of " << arguments.seg_count << " segments.\n"; seq_count = sequences.size(); seg_count = segments.size(); diff --git a/lib/seqan b/lib/seqan index 8ce355dd..832288f6 160000 --- a/lib/seqan +++ b/lib/seqan @@ -1 +1 @@ -Subproject commit 8ce355dd960bbf7a5fa0292b49f7342f7e456da6 +Subproject commit 832288f66a3a5f5450a2b75d7729a6e2b16a256c diff --git a/src/argument_parsing/search.cpp b/src/argument_parsing/search.cpp index cba0b693..25763aa1 100644 --- a/src/argument_parsing/search.cpp +++ b/src/argument_parsing/search.cpp @@ -15,7 +15,6 @@ void init_search_parser(sharg::parser & parser, search_arguments & arguments) sharg::config{.short_id = '\0', .long_id = "index", .description = "Provide a valid path to an IBF.", - .required = true, .validator = sharg::input_file_validator{}}); parser.add_option(arguments.query_file, sharg::config{.short_id = '\0', @@ -74,6 +73,11 @@ void init_search_parser(sharg::parser & parser, search_arguments & arguments) ///////////////////////////////////////// // Advanced options ///////////////////////////////////////// + parser.add_flag(arguments.stellar_only, + sharg::config{.short_id = '\0', + .long_id = "stellar-only", + .description = "Do not prefilter before Stellar search.", + .advanced = true}); parser.add_flag(arguments.manual_parameters, sharg::config{.short_id = '\0', .long_id = "without-parameter-tuning", @@ -228,7 +232,7 @@ void run_search(sharg::parser & parser) } else { - //TODO: this can be removed? + //!TODO: can this be removed? if (arguments.split_query && arguments.manual_parameters) { throw std::runtime_error{"Provide the chosen number of query segments with --seg-count " @@ -252,10 +256,16 @@ void run_search(sharg::parser & parser) throw sharg::validation_error{"Invalid parameter values: Please choose numMatches <= sortThresh.\n"}; - // ========================================== - // Read window and kmer size, and the bin paths. - // ========================================== + if (!arguments.stellar_only && !parser.is_option_set("index")) + { + throw sharg::parser_error{"Option --index is required but not set."}; + } + + if (!arguments.stellar_only) { + // ========================================== + // Read window and kmer size, and the bin paths. + // ========================================== std::ifstream is{arguments.index_file.string(),std::ios::binary}; cereal::BinaryInputArchive iarchive{is}; valik_index<> tmp{}; @@ -294,7 +304,7 @@ void run_search(sharg::parser & parser) // ========================================== // Process manual threshold. // ========================================== - if (parser.is_option_set("threshold")) + if (parser.is_option_set("threshold") && !arguments.stellar_only) { if (!arguments.manual_parameters) { @@ -325,11 +335,11 @@ void run_search(sharg::parser & parser) } } - // ========================================== - // Extract data from reference metadata. - // ========================================== if (!arguments.manual_parameters) { + // ========================================== + // Extract data from reference metadata. + // ========================================== if (!parser.is_option_set("ref-meta")) throw sharg::validation_error("Provide --ref-meta to deduce suitable search parameters or set --without-parameter-tuning and --pattern size."); @@ -339,8 +349,6 @@ void run_search(sharg::parser & parser) argument_input_validator(search_profile_file); search_kmer_profile search_profile{search_profile_file}; - - arguments.pattern_size = search_profile.l; arguments.errors = std::ceil(arguments.error_rate * arguments.pattern_size); // update based on pattern size in metadata search_error_profile error_profile = search_profile.get_error_profile(arguments.errors); @@ -361,35 +369,38 @@ void run_search(sharg::parser & parser) throw sharg::validation_error("Threshold can not be larger than the number of k-mers per pattern."); arguments.threshold_percentage = arguments.threshold / (double) (arguments.pattern_size - arguments.shape.size() + 1); arguments.fnr = error_profile.fnr; + + if (arguments.window_size > arguments.shape_size) + arguments.search_type = search_kind::MINIMISER; } - if (arguments.window_size > arguments.shape_size) - arguments.search_type = search_kind::MINIMISER; - } + // ========================================== + // More checks. + // ========================================== + if (parser.is_option_set("query-every")) + { + if (arguments.query_every > arguments.pattern_size) + throw sharg::validation_error("Reduce --query-every so that all positions in the query would be considered at least once."); + } + else + { + if (arguments.fast) + arguments.query_every = 3; + } - // ========================================== - // More checks. - // ========================================== - if (parser.is_option_set("query-every")) - { - if (arguments.query_every > arguments.pattern_size) - throw sharg::validation_error("Reduce --query-every so that all positions in the query would be considered at least once."); - } - else - { - if (arguments.fast) - arguments.query_every = 3; + // ========================================== + // Set strict thresholding parameters for fast mode. + // ========================================== + if (!parser.is_option_set("tau") && arguments.fast) + arguments.tau = 0.99999; + + if (!parser.is_option_set("p_max") && arguments.fast) + arguments.p_max = 0.05; } - // ========================================== - // Set strict thresholding parameters for fast mode. - // ========================================== - if (!parser.is_option_set("tau") && arguments.fast) - arguments.tau = 0.99999; + if (arguments.stellar_only) + arguments.search_type = search_kind::STELLAR; - if (!parser.is_option_set("p_max") && arguments.fast) - arguments.p_max = 0.05; - // ========================================== // Dispatch // ==========================================