Skip to content

Commit

Permalink
Switch for distributed stellar only search
Browse files Browse the repository at this point in the history
  • Loading branch information
eaasna committed Dec 10, 2024
1 parent d5ecaef commit 068f809
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 49 deletions.
6 changes: 4 additions & 2 deletions include/utilities/threshold/search_kmer_profile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,10 @@ struct search_kmer_profile
search_error_profile get_error_profile(uint8_t const errors)
{
if (error_table.find(errors) == error_table.end())
throw std::runtime_error("Error count " + std::to_string(errors) + " out of precalculated range");

{
auto highest_er = error_table.at(max_errors);
return {highest_er.params, highest_er.pattern, search_kind::STELLAR};
}
return error_table.at(errors);
}

Expand Down
15 changes: 12 additions & 3 deletions include/valik/search/search_local.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static inline dream_stellar::StellarOptions make_thread_options(search_arguments
template <bool is_split, bool stellar_only>
bool search_local(search_arguments & arguments, search_time_statistics & time_statistics)
{
if (arguments.bin_path.size() > 1 || arguments.bin_path[0].size() > 1)
if (arguments.bin_path.size() > 1 || (arguments.bin_path.size() > 0 && arguments.bin_path[0].size() > 1))
throw std::runtime_error("Multiple reference files can not be searched in shared memory mode. "
"Add --distribute argument to launch multiple distributed instances of DREAM-Stellar search.");

Expand All @@ -84,10 +84,20 @@ bool search_local(search_arguments & arguments, search_time_statistics & time_st
metadata ref_meta = metadata(arguments.ref_meta_path);
if (stellar_only)
{
arguments.bin_path.clear(); // in case inserted from index
for (auto & f : ref_meta.files)
arguments.bin_path.push_back(std::vector<std::string>{f.path});

auto prefilter_bin_count = ref_meta.seg_count;
split_arguments stellar_dist_arguments;
// distribute stellar search
stellar_dist_arguments.seg_count = std::max(ref_meta.seq_count, (size_t) arguments.threads);

// for some number of reference sequences split sequences into as many segments as is the next multiple of thread count
if (ref_meta.seq_count % arguments.threads > 0)
stellar_dist_arguments.seg_count = ref_meta.seq_count + (arguments.threads - ref_meta.seq_count % arguments.threads);
else
stellar_dist_arguments.seg_count = ref_meta.seq_count;

stellar_dist_arguments.pattern_size = ref_meta.pattern_size;
ref_meta.update_segments_for_distributed_stellar(stellar_dist_arguments);
// update cart queue parameters for distributed stellar
Expand Down Expand Up @@ -208,7 +218,6 @@ bool search_local(search_arguments & arguments, search_time_statistics & time_st
for (auto next = queue.dequeue(); next; next = queue.dequeue())
{
auto & [bin_id, records] = *next;

std::unique_lock g(mutex);
std::filesystem::path cart_queries_path = var_pack.tmp_path / std::string("query_" + std::to_string(bin_id) +
"_" + std::to_string(exec_meta.bin_count[bin_id]++) + ".fasta");
Expand Down
1 change: 1 addition & 0 deletions include/valik/shared.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ struct search_arguments final : public minimiser_threshold_arguments, search_pro
bool keep_best_repeats{false};
double best_bin_entropy_cutoff{0.25};
bool keep_all_repeats{false};
bool stellar_only{false};

size_t cart_max_capacity{1000};
size_t max_queued_carts{std::numeric_limits<size_t>::max()};
Expand Down
15 changes: 6 additions & 9 deletions include/valik/split/metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ struct metadata

if (segments.size() != n)
{
throw std::runtime_error("Error: Database was split into " + std::to_string(segments.size()) +
throw std::runtime_error("Database was split into " + std::to_string(segments.size()) +
" instead of " + std::to_string(n) + " segments.");
}
}
Expand Down Expand Up @@ -373,6 +373,7 @@ struct metadata

size_t len_lower_bound = default_seg_len / 10;
// Check how many sequences are discarded for being too short
//!TODO: gather short sequences
auto first_long_seq = std::find_if(sequences.begin(), sequences.end(), [&](auto const & seq){return (seq.len > len_lower_bound);});
size_t discarded_short_sequences = first_long_seq - sequences.begin();

Expand Down Expand Up @@ -412,19 +413,15 @@ struct metadata
std::to_string(arguments.pattern_size) + "bp.\nDecrease the overlap or the number of segments.");
}

size_t len_lower_bound = default_seg_len / 10;
// Check how many sequences are discarded for being too short
auto first_long_seq = std::find_if(sequences.begin(), sequences.end(), [&](auto const & seq){return (seq.len > len_lower_bound);});
size_t discarded_short_sequences = first_long_seq - sequences.begin();

arguments.seg_count = std::max(arguments.seg_count, (uint32_t) (sequences.size() - discarded_short_sequences));
make_exactly_n_segments(arguments.seg_count, arguments.pattern_size, first_long_seq);
auto seq_begin = sequences.begin();
make_equal_length_segments(arguments.pattern_size, seq_begin);
seg_count = segments.size();

std::stable_sort(sequences.begin(), sequences.end(), fasta_order());
std::stable_sort(segments.begin(), segments.end(), fasta_order());
for (size_t i = 0; i < segments.size(); i++)
segments[i].id = i;

}

/**
Expand Down Expand Up @@ -467,7 +464,7 @@ struct metadata

scan_database_sequences(arguments);
if (arguments.manual_parameters && (segments.size() != arguments.seg_count))
seqan3::debug_stream << "WARNING: Database was split into " << segments.size() << " instead of " << arguments.seg_count << " segments.\n";
seqan3::debug_stream << "[Warning] Database was split into " << segments.size() << " instead of " << arguments.seg_count << " segments.\n";

seq_count = sequences.size();
seg_count = segments.size();
Expand Down
2 changes: 1 addition & 1 deletion lib/seqan
79 changes: 45 additions & 34 deletions src/argument_parsing/search.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ void init_search_parser(sharg::parser & parser, search_arguments & arguments)
sharg::config{.short_id = '\0',
.long_id = "index",
.description = "Provide a valid path to an IBF.",
.required = true,
.validator = sharg::input_file_validator{}});
parser.add_option(arguments.query_file,
sharg::config{.short_id = '\0',
Expand Down Expand Up @@ -74,6 +73,11 @@ void init_search_parser(sharg::parser & parser, search_arguments & arguments)
/////////////////////////////////////////
// Advanced options
/////////////////////////////////////////
parser.add_flag(arguments.stellar_only,
sharg::config{.short_id = '\0',
.long_id = "stellar-only",
.description = "Do not prefilter before Stellar search.",
.advanced = true});
parser.add_flag(arguments.manual_parameters,
sharg::config{.short_id = '\0',
.long_id = "without-parameter-tuning",
Expand Down Expand Up @@ -228,7 +232,7 @@ void run_search(sharg::parser & parser)
}
else
{
//TODO: this can be removed?
//!TODO: can this be removed?
if (arguments.split_query && arguments.manual_parameters)
{
throw std::runtime_error{"Provide the chosen number of query segments with --seg-count "
Expand All @@ -252,10 +256,16 @@ void run_search(sharg::parser & parser)
throw sharg::validation_error{"Invalid parameter values: Please choose numMatches <= sortThresh.\n"};


// ==========================================
// Read window and kmer size, and the bin paths.
// ==========================================
if (!arguments.stellar_only && !parser.is_option_set("index"))
{
throw sharg::parser_error{"Option --index is required but not set."};
}

if (!arguments.stellar_only)
{
// ==========================================
// Read window and kmer size, and the bin paths.
// ==========================================
std::ifstream is{arguments.index_file.string(),std::ios::binary};
cereal::BinaryInputArchive iarchive{is};
valik_index<> tmp{};
Expand Down Expand Up @@ -294,7 +304,7 @@ void run_search(sharg::parser & parser)
// ==========================================
// Process manual threshold.
// ==========================================
if (parser.is_option_set("threshold"))
if (parser.is_option_set("threshold") && !arguments.stellar_only)
{
if (!arguments.manual_parameters)
{
Expand Down Expand Up @@ -325,11 +335,11 @@ void run_search(sharg::parser & parser)
}
}

// ==========================================
// Extract data from reference metadata.
// ==========================================
if (!arguments.manual_parameters)
{
// ==========================================
// Extract data from reference metadata.
// ==========================================
if (!parser.is_option_set("ref-meta"))
throw sharg::validation_error("Provide --ref-meta to deduce suitable search parameters or set --without-parameter-tuning and --pattern size.");

Expand All @@ -339,8 +349,6 @@ void run_search(sharg::parser & parser)
argument_input_validator(search_profile_file);
search_kmer_profile search_profile{search_profile_file};



arguments.pattern_size = search_profile.l;
arguments.errors = std::ceil(arguments.error_rate * arguments.pattern_size); // update based on pattern size in metadata
search_error_profile error_profile = search_profile.get_error_profile(arguments.errors);
Expand All @@ -361,35 +369,38 @@ void run_search(sharg::parser & parser)
throw sharg::validation_error("Threshold can not be larger than the number of k-mers per pattern.");
arguments.threshold_percentage = arguments.threshold / (double) (arguments.pattern_size - arguments.shape.size() + 1);
arguments.fnr = error_profile.fnr;

if (arguments.window_size > arguments.shape_size)
arguments.search_type = search_kind::MINIMISER;
}

if (arguments.window_size > arguments.shape_size)
arguments.search_type = search_kind::MINIMISER;
}
// ==========================================
// More checks.
// ==========================================
if (parser.is_option_set("query-every"))
{
if (arguments.query_every > arguments.pattern_size)
throw sharg::validation_error("Reduce --query-every so that all positions in the query would be considered at least once.");
}
else
{
if (arguments.fast)
arguments.query_every = 3;
}

// ==========================================
// More checks.
// ==========================================
if (parser.is_option_set("query-every"))
{
if (arguments.query_every > arguments.pattern_size)
throw sharg::validation_error("Reduce --query-every so that all positions in the query would be considered at least once.");
}
else
{
if (arguments.fast)
arguments.query_every = 3;
// ==========================================
// Set strict thresholding parameters for fast mode.
// ==========================================
if (!parser.is_option_set("tau") && arguments.fast)
arguments.tau = 0.99999;

if (!parser.is_option_set("p_max") && arguments.fast)
arguments.p_max = 0.05;
}

// ==========================================
// Set strict thresholding parameters for fast mode.
// ==========================================
if (!parser.is_option_set("tau") && arguments.fast)
arguments.tau = 0.99999;
if (arguments.stellar_only)
arguments.search_type = search_kind::STELLAR;

if (!parser.is_option_set("p_max") && arguments.fast)
arguments.p_max = 0.05;

// ==========================================
// Dispatch
// ==========================================
Expand Down

0 comments on commit 068f809

Please sign in to comment.