From a89bd8b0d62ee2606de53f3f2818903c3eaa8d59 Mon Sep 17 00:00:00 2001 From: Kitsu Date: Thu, 24 Dec 2020 21:37:40 +0300 Subject: [PATCH 1/3] Extract serial API --- Cargo.toml | 15 +- src/lib.rs | 5 +- src/search/mod.rs | 315 ++++++----------------------------------- src/search/parallel.rs | 295 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 349 insertions(+), 281 deletions(-) create mode 100644 src/search/parallel.rs diff --git a/Cargo.toml b/Cargo.toml index 2c243d3..1fa7ff9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,16 +25,21 @@ name = "fzyr" name = "fzyr" path = "src/bin/main.rs" doc = false +required-features = ["binary-build"] [dependencies] ndarray = "^0.11.2" -itertools = "^0.7.8" -crossbeam = "^0.4.1" bit-vec = "^0.5.0" -clap = "^2.32.0" -console = "^0.6.1" - +itertools = { version = "^0.7.8", optional = true } +crossbeam = { version = "^0.4.1", optional = true } +clap = { version = "^2.32.0", optional = true } +console = { version = "^0.6.1", optional = true } + +[features] +default = ["binary-build", "parallel"] +binary-build = ["clap", "console"] +parallel = ["itertools", "crossbeam"] [profile.release] opt-level = 3 diff --git a/src/lib.rs b/src/lib.rs index f275a3f..c9502b4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,4 +2,7 @@ mod score; mod search; pub use score::{config, has_match, locate, score, LocateResult, Score, ScoreResult}; -pub use search::{search_locate, search_score, LocateResults, ScoreResults}; +pub use search::{search_serial, locate_serial, LocateResults, ScoreResults}; + +#[cfg(feature = "parallel")] +pub use search::{search_locate, search_score}; diff --git a/src/search/mod.rs b/src/search/mod.rs index fbe527b..9fb5fb3 100644 --- a/src/search/mod.rs +++ b/src/search/mod.rs @@ -1,94 +1,58 @@ -extern crate crossbeam; -extern crate itertools; - use std::cmp::Ordering; -use std::usize; - -use self::crossbeam::channel; -use self::crossbeam::scope as thread_scope; -use self::itertools::kmerge; use score::{has_match, locate_inner, score_inner, LocateResult, ScoreResult}; +#[cfg(feature = "parallel")] +mod parallel; + +#[cfg(feature = "parallel")] +pub use self::parallel::{search_score, search_locate}; + /// Collection of scores and the candidates they apply to pub type ScoreResults = Vec; /// Collection of scores, locations, and the candidates they apply to pub type LocateResults = Vec; -/// Search among a collection of candidates using the given query, returning -/// an ordered collection of results (highest score first) -pub fn search_score( +/// Search serially among a collection of candidates using the given query, returning +/// an ordered collection of results (highest score first). +/// +/// # Example +/// +/// ```rust +/// # use fzyr::search_serial; +/// let items = vec!["this", "is", "kind", "of", "magic"]; +/// let res = search_serial("mgc", items.iter()); +/// assert_eq!("magic", items[res[0].candidate_index]); +/// ``` +pub fn search_serial( query: &str, - candidates: &[&str], - parallelism: usize, + candidates: impl IntoIterator>, ) -> ScoreResults { - search_internal(query, candidates, parallelism, score_inner).collect() + search_worker(candidates, query, 0, score_inner) } -/// Search among a collection of candidates using the given query, returning +/// Search serially among a collection of candidates using the given query, returning /// an ordered collection of results (highest score first) with the locations -/// of the query in each candidate -pub fn search_locate( +/// of the query in each candidate. +/// +/// # Example +/// +/// ```rust +/// # use fzyr::locate_serial; +/// let items = vec!["this", "is", "kind", "of", "magic"]; +/// let res = locate_serial("mgc", items.iter()); +/// assert_eq!("magic", items[res[0].candidate_index]); +/// ``` +pub fn locate_serial( query: &str, - candidates: &[&str], - parallelism: usize, + candidates: impl IntoIterator>, ) -> LocateResults { - search_internal(query, candidates, parallelism, locate_inner).collect() -} - -fn search_internal( - query: &str, - candidates: &[&str], - parallelism: usize, - search_fn: fn(&str, &str, usize) -> T, -) -> Box> -where - T: PartialOrd + Sized + Send + 'static, -{ - let parallelism = calculate_parallelism(candidates.len(), parallelism, query.is_empty()); - let mut candidates = candidates; - let (sender, receiver) = channel::bounded::>(parallelism); - - if parallelism < 2 { - Box::new(search_worker(candidates, query, 0, search_fn).into_iter()) - } else { - thread_scope(|scope| { - let mut remaining_candidates = candidates.len(); - let per_thread_count = ceil_div(remaining_candidates, parallelism); - let mut thread_offset = 0; - - // Create "parallelism" threads - while remaining_candidates > 0 { - // Search in this thread's share - let split = if remaining_candidates >= per_thread_count { - remaining_candidates -= per_thread_count; - per_thread_count - } else { - remaining_candidates = 0; - remaining_candidates - }; - let split = candidates.split_at(split); - let splitted_len = split.0.len(); - let sender = sender.clone(); - scope.spawn(move || { - sender.send(search_worker(split.0, query, thread_offset, search_fn)); - }); - thread_offset += splitted_len; - - // Remove that share from the candidate slice - candidates = split.1; - } - - drop(sender); - }); - - Box::new(kmerge(receiver)) - } + search_worker(candidates, query, 0, locate_inner) } // Search among candidates against a query in a single thread fn search_worker( - candidates: &[&str], + candidates: impl IntoIterator>, query: &str, offset_index: usize, search_fn: fn(&str, &str, usize) -> T @@ -96,8 +60,11 @@ fn search_worker( where T: PartialOrd, { - let mut out = Vec::with_capacity(candidates.len()); - for (index, candidate) in candidates.into_iter().enumerate() { + let candidates = candidates.into_iter(); + let (low, high) = candidates.size_hint(); + let mut out = Vec::with_capacity(high.unwrap_or(low)); + for (index, candidate) in candidates.enumerate() { + let candidate = candidate.as_ref(); if has_match(&query, candidate) { out.push(search_fn(&query, candidate, offset_index + index)); } @@ -107,205 +74,3 @@ where out } -fn calculate_parallelism( - candidate_count: usize, - configured_parallelism: usize, - empty_query: bool, -) -> usize { - if empty_query { - // No need to do much for no query - return 1; - } - - // Use a ramp up to avoid unecessarily starting threads with few candidates - let ramped_parallelism = match candidate_count { - n if n < 17 => ceil_div(n, 4), - n if n > 32 => ceil_div(n, 8), - _ => 4, - }; - - configured_parallelism - .min(ramped_parallelism) - .min(candidate_count) - .max(1) -} - -/// Integer ceiling division -fn ceil_div(a: usize, b: usize) -> usize { - (a + b - 1) / b -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn parallelism_ramp() { - assert_eq!(1, calculate_parallelism(0, 0, false)); - assert_eq!(1, calculate_parallelism(1, 0, false)); - assert_eq!(1, calculate_parallelism(0, 1, false)); - assert_eq!(1, calculate_parallelism(1, 1, false)); - - assert_eq!(1, calculate_parallelism(2, usize::MAX, false)); - assert_eq!(1, calculate_parallelism(3, 4, false)); - assert_eq!(1, calculate_parallelism(4, 2, false)); - - for n in 5..9 { - assert_eq!(2, calculate_parallelism(n, usize::MAX, false)); - assert_eq!(1, calculate_parallelism(n, usize::MAX, true)); - } - - for n in 9..13 { - assert_eq!(3, calculate_parallelism(n, usize::MAX, false)); - assert_eq!(1, calculate_parallelism(n, usize::MAX, true)); - } - - for n in 13..33 { - assert_eq!(4, calculate_parallelism(n, usize::MAX, false)); - assert_eq!(1, calculate_parallelism(n, usize::MAX, true)); - } - - for n in 1..10_000 { - assert!(calculate_parallelism(n, 12, false) <= 12); - assert_eq!(1, calculate_parallelism(n, 12, true)); - } - } - - fn search_empty_with_parallelism(parallelism: usize) { - let rs = search_score("", &[], parallelism); - assert_eq!(0, rs.len()); - - let rs = search_score("test", &[], parallelism); - assert_eq!(0, rs.len()); - } - - fn search_with_parallelism(parallelism: usize) { - search_empty_with_parallelism(parallelism); - - let rs = search_score("", &["tags"], parallelism); - assert_eq!(1, rs.len()); - assert_eq!(0, rs[0].candidate_index); - - let rs = search_score("♺", &["ñîƹ♺à"], parallelism); - assert_eq!(1, rs.len()); - assert_eq!(0, rs[0].candidate_index); - - let cs = &["tags", "test"]; - - let rs = search_score("", cs, parallelism); - assert_eq!(2, rs.len()); - - let rs = search_score("te", cs, parallelism); - assert_eq!(1, rs.len()); - assert_eq!(1, rs[0].candidate_index); - - let rs = search_score("foobar", cs, parallelism); - assert_eq!(0, rs.len()); - - let rs = search_score("ts", cs, parallelism); - assert_eq!(2, rs.len()); - assert_eq!( - vec![1, 0], - rs.iter().map(|r| r.candidate_index).collect::>() - ); - } - - fn search_med_parallelism(parallelism: usize) { - let cs = &[ - "one", - "two", - "three", - "four", - "five", - "six", - "seven", - "eight", - "nine", - "ten", - "eleven", - "twelve", - "thirteen", - "fourteen", - "fifteen", - "sixteen", - "seventeen", - "eighteen", - "nineteen", - "twenty", - ]; - - let rs = search_score("", cs, parallelism); - assert_eq!(cs.len(), rs.len()); - - let rs = search_score("teen", cs, parallelism); - assert_eq!(7, rs.len()); - for r in rs { - assert_eq!( - "neet", - cs[r.candidate_index].chars().rev().take(4).collect::() - ); - } - - let rs = search_score("tee", cs, parallelism); - assert_eq!(9, rs.len()); - assert_eq!( - "neet", - cs[rs[0].candidate_index].chars().rev().take(4).collect::() - ); - - let rs = search_score("six", cs, parallelism); - assert_eq!("six", cs[rs[0].candidate_index]); - } - - fn search_large_parallelism(parallelism: usize) { - let n = 100_000; - let mut candidates = Vec::with_capacity(n); - for i in 0..n { - candidates.push(format!("{}", i)); - } - - let rs = search_score( - "12", - &(candidates.iter().map(|s| &s[..]).collect::>()), - parallelism, - ); - - // This has been precalculated - // e.g. via `$ seq 0 99999 | grep '.*1.*2.*' | wc -l` - assert_eq!(8146, rs.len()); - assert_eq!("12", candidates[rs[0].candidate_index]); - } - - // TODO: test locate - - #[test] - fn search_single() { - search_with_parallelism(0); - search_with_parallelism(1); - search_large_parallelism(1); - } - - #[test] - fn search_double() { - search_with_parallelism(2); - search_large_parallelism(2); - } - - #[test] - fn search_quad() { - search_med_parallelism(4); - search_large_parallelism(4); - } - - #[test] - fn search_quin() { - search_med_parallelism(4); - search_large_parallelism(5); - } - - #[test] - fn search_large() { - search_med_parallelism(4); - search_large_parallelism(16); - } -} diff --git a/src/search/parallel.rs b/src/search/parallel.rs new file mode 100644 index 0000000..8ebccc2 --- /dev/null +++ b/src/search/parallel.rs @@ -0,0 +1,295 @@ +use super::*; + +/// Search among a collection of candidates using the given query, returning +/// an ordered collection of results (highest score first). +/// +/// # Example +/// +/// ```rust +/// # use fzyr::search_score; +/// let items = vec!["this", "is", "kind", "of", "magic"]; +/// let res = search_score("mgc", &items, 1); +/// assert_eq!("magic", items[res[0].candidate_index]); +/// ``` +pub fn search_score( + query: &str, + candidates: &[&str], + parallelism: usize, +) -> ScoreResults { + search_internal(query, candidates, parallelism, score_inner).collect() +} + +/// Search among a collection of candidates using the given query, returning +/// an ordered collection of results (highest score first) with the locations +/// of the query in each candidate. +/// +/// # Example +/// +/// ```rust +/// # use fzyr::search_locate; +/// let items = vec!["this", "is", "kind", "of", "magic"]; +/// let res = search_locate("mgc", &items, 1); +/// assert_eq!("magic", items[res[0].candidate_index]); +/// ``` +pub fn search_locate( + query: &str, + candidates: &[&str], + parallelism: usize, +) -> LocateResults { + search_internal(query, candidates, parallelism, locate_inner).collect() +} + +fn search_internal( + query: &str, + candidates: &[&str], + parallelism: usize, + search_fn: fn(&str, &str, usize) -> T, +) -> Box> +where + T: PartialOrd + Sized + Send + 'static, +{ + let parallelism = calculate_parallelism(candidates.len(), parallelism, query.is_empty()); + let mut candidates = candidates; + let (sender, receiver) = crossbeam::channel::bounded::>(parallelism); + + if parallelism < 2 { + Box::new(search_worker(candidates.iter(), query, 0, search_fn).into_iter()) + } else { + let _ = crossbeam::scope(|scope| { + let mut remaining_candidates = candidates.len(); + let per_thread_count = ceil_div(remaining_candidates, parallelism); + let mut thread_offset = 0; + + // Create "parallelism" threads + while remaining_candidates > 0 { + // Search in this thread's share + let split = if remaining_candidates >= per_thread_count { + remaining_candidates -= per_thread_count; + per_thread_count + } else { + remaining_candidates = 0; + remaining_candidates + }; + let split = candidates.split_at(split); + let splitted_len = split.0.len(); + let sender = sender.clone(); + scope.spawn(move |_| { + let _ = sender.send(search_worker(split.0.iter(), query, thread_offset, search_fn)); + }); + thread_offset += splitted_len; + + // Remove that share from the candidate slice + candidates = split.1; + } + + drop(sender); + }); + + Box::new(itertools::kmerge(receiver)) + } +} + +fn calculate_parallelism( + candidate_count: usize, + configured_parallelism: usize, + empty_query: bool, +) -> usize { + if empty_query { + // No need to do much for no query + return 1; + } + + // Use a ramp up to avoid unecessarily starting threads with few candidates + let ramped_parallelism = match candidate_count { + n if n < 17 => ceil_div(n, 4), + n if n > 32 => ceil_div(n, 8), + _ => 4, + }; + + configured_parallelism + .min(ramped_parallelism) + .min(candidate_count) + .max(1) +} + +/// Integer ceiling division +fn ceil_div(a: usize, b: usize) -> usize { + (a + b - 1) / b +} + +#[cfg(test)] +mod tests { + use std::usize; + + use super::*; + + #[test] + fn parallelism_ramp() { + assert_eq!(1, calculate_parallelism(0, 0, false)); + assert_eq!(1, calculate_parallelism(1, 0, false)); + assert_eq!(1, calculate_parallelism(0, 1, false)); + assert_eq!(1, calculate_parallelism(1, 1, false)); + + assert_eq!(1, calculate_parallelism(2, usize::MAX, false)); + assert_eq!(1, calculate_parallelism(3, 4, false)); + assert_eq!(1, calculate_parallelism(4, 2, false)); + + for n in 5..9 { + assert_eq!(2, calculate_parallelism(n, usize::MAX, false)); + assert_eq!(1, calculate_parallelism(n, usize::MAX, true)); + } + + for n in 9..13 { + assert_eq!(3, calculate_parallelism(n, usize::MAX, false)); + assert_eq!(1, calculate_parallelism(n, usize::MAX, true)); + } + + for n in 13..33 { + assert_eq!(4, calculate_parallelism(n, usize::MAX, false)); + assert_eq!(1, calculate_parallelism(n, usize::MAX, true)); + } + + for n in 1..10_000 { + assert!(calculate_parallelism(n, 12, false) <= 12); + assert_eq!(1, calculate_parallelism(n, 12, true)); + } + } + + fn search_empty_with_parallelism(parallelism: usize) { + let rs = search_score("", &[], parallelism); + assert_eq!(0, rs.len()); + + let rs = search_score("test", &[], parallelism); + assert_eq!(0, rs.len()); + } + + fn search_with_parallelism(parallelism: usize) { + search_empty_with_parallelism(parallelism); + + let rs = search_score("", &["tags"], parallelism); + assert_eq!(1, rs.len()); + assert_eq!(0, rs[0].candidate_index); + + let rs = search_score("♺", &["ñîƹ♺à"], parallelism); + assert_eq!(1, rs.len()); + assert_eq!(0, rs[0].candidate_index); + + let cs = &["tags", "test"]; + + let rs = search_score("", cs, parallelism); + assert_eq!(2, rs.len()); + + let rs = search_score("te", cs, parallelism); + assert_eq!(1, rs.len()); + assert_eq!(1, rs[0].candidate_index); + + let rs = search_score("foobar", cs, parallelism); + assert_eq!(0, rs.len()); + + let rs = search_score("ts", cs, parallelism); + assert_eq!(2, rs.len()); + assert_eq!( + vec![1, 0], + rs.iter().map(|r| r.candidate_index).collect::>() + ); + } + + fn search_med_parallelism(parallelism: usize) { + let cs = &[ + "one", + "two", + "three", + "four", + "five", + "six", + "seven", + "eight", + "nine", + "ten", + "eleven", + "twelve", + "thirteen", + "fourteen", + "fifteen", + "sixteen", + "seventeen", + "eighteen", + "nineteen", + "twenty", + ]; + + let rs = search_score("", cs, parallelism); + assert_eq!(cs.len(), rs.len()); + + let rs = search_score("teen", cs, parallelism); + assert_eq!(7, rs.len()); + for r in rs { + assert_eq!( + "neet", + cs[r.candidate_index].chars().rev().take(4).collect::() + ); + } + + let rs = search_score("tee", cs, parallelism); + assert_eq!(9, rs.len()); + assert_eq!( + "neet", + cs[rs[0].candidate_index].chars().rev().take(4).collect::() + ); + + let rs = search_score("six", cs, parallelism); + assert_eq!("six", cs[rs[0].candidate_index]); + } + + fn search_large_parallelism(parallelism: usize) { + let n = 100_000; + let mut candidates = Vec::with_capacity(n); + for i in 0..n { + candidates.push(format!("{}", i)); + } + + let rs = search_score( + "12", + &(candidates.iter().map(|s| &s[..]).collect::>()), + parallelism, + ); + + // This has been precalculated + // e.g. via `$ seq 0 99999 | grep '.*1.*2.*' | wc -l` + assert_eq!(8146, rs.len()); + assert_eq!("12", candidates[rs[0].candidate_index]); + } + + // TODO: test locate + + #[test] + fn search_single() { + search_with_parallelism(0); + search_with_parallelism(1); + search_large_parallelism(1); + } + + #[test] + fn search_double() { + search_with_parallelism(2); + search_large_parallelism(2); + } + + #[test] + fn search_quad() { + search_med_parallelism(4); + search_large_parallelism(4); + } + + #[test] + fn search_quin() { + search_med_parallelism(4); + search_large_parallelism(5); + } + + #[test] + fn search_large() { + search_med_parallelism(4); + search_large_parallelism(16); + } +} From 5850c62e860e9a431eb5b386d09d7d805b94ba00 Mon Sep 17 00:00:00 2001 From: Kitsu Date: Mon, 2 May 2022 10:59:35 +0300 Subject: [PATCH 2/3] Update dependencies --- Cargo.toml | 12 ++++++------ src/bin/opts.rs | 34 +++++++++++++++++----------------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1fa7ff9..893f6a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,12 +29,12 @@ required-features = ["binary-build"] [dependencies] -ndarray = "^0.11.2" -bit-vec = "^0.5.0" -itertools = { version = "^0.7.8", optional = true } -crossbeam = { version = "^0.4.1", optional = true } -clap = { version = "^2.32.0", optional = true } -console = { version = "^0.6.1", optional = true } +ndarray = "^0.15" +bit-vec = "^0.6.0" +itertools = { version = "^0.10", optional = true } +crossbeam = { version = "^0.8", optional = true } +clap = { version = "^3.1", optional = true } +console = { version = "^0.15", optional = true } [features] default = ["binary-build", "parallel"] diff --git a/src/bin/opts.rs b/src/bin/opts.rs index 9deed2e..d61a5b7 100644 --- a/src/bin/opts.rs +++ b/src/bin/opts.rs @@ -1,6 +1,6 @@ extern crate clap; -use self::clap::{App, Arg}; +use self::clap::{Command, Arg}; pub const NAME: &'static str = env!("CARGO_PKG_NAME"); pub const VERSION: &'static str = env!("CARGO_PKG_VERSION"); @@ -43,65 +43,65 @@ pub fn cmd_parse() -> Options { let long_about: String = format!("{}\n[{}]", DESCRIPTION, WEBSITE); - let matches = App::new(NAME) + let matches = Command::new(NAME) .version(VERSION) .about(DESCRIPTION) .long_about(long_about.as_ref()) .arg( - Arg::with_name("query") - .short("q") + Arg::new("query") + .short('q') .long("query") .value_name("QUERY") .default_value(&deflt_query) .help("Query string to search for"), ) .arg( - Arg::with_name("lines") - .short("l") + Arg::new("lines") + .short('l') .long("lines") .value_name("LINES") .default_value(&deflt_lines) .help("Number of output lines to display"), ) .arg( - Arg::with_name("show-scores") - .short("s") + Arg::new("show-scores") + .short('s') .long("show-scores") .help("Show numerical scores for each match"), ) .arg( - Arg::with_name("parallelism") - .short("j") + Arg::new("parallelism") + .short('j') .long("parallelism") .value_name("THREADS") .default_value(&deflt_parallelism) .help("Maximum number of worker threads to use"), ) .arg( - Arg::with_name("prompt") - .short("p") + Arg::new("prompt") + .short('p') .long("prompt") .value_name("PROMPT") .default_value(&deflt_prompt) .help("Propmt to show when entering queries"), ) .arg( - Arg::with_name("benchmark") - .short("b") + Arg::new("benchmark") + .short('b') .long("benchmark") .value_name("REPEATS") .default_value(&deflt_benchmark) .help("Set to a positive value to run that many repeated searches for benchmarking"), ) .arg( - Arg::with_name("workers") + Arg::new("workers") .long("workers") .value_name("THREADS") .help("Identical to \"--parallelism\""), ) .arg( - Arg::with_name("show-matches") - .short("e") + Arg::new("show-matches") + .short('e') .long("show-matches") .value_name("QUERY") .help("Identical to \"--query\""), From 6452ff9ad46432e3591cd0396d5ae8b092e902d2 Mon Sep 17 00:00:00 2001 From: Kitsu Date: Sat, 28 May 2022 11:47:29 +0300 Subject: [PATCH 3/3] Update version to 0.3.0 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 893f6a5..6c7b2de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fzyr" -version = "0.2.0" +version = "0.3.0" homepage = "https://github.com/jmaargh/fzyr" authors = ["jmaargh "]