From 7da9107522d27c01d2c6559aaef10d42de466489 Mon Sep 17 00:00:00 2001 From: ilslv <47687266+ilslv@users.noreply.github.com> Date: Fri, 7 Jan 2022 12:28:54 +0300 Subject: [PATCH] Optimize `runner::Basic` to not wait the whole batch before executing next `Scenario`s (#195) --- CHANGELOG.md | 14 ++ src/runner/basic.rs | 343 +++++++++++++++++++++++++++----------------- 2 files changed, 224 insertions(+), 133 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 49b63408..0a55d918 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,20 @@ All user visible changes to `cucumber` crate will be documented in this file. Th +## [0.11.1] · 2022-01-?? +[0.11.1]: /../../tree/v0.11.1 + +[Diff](/../../compare/v0.11.0...v0.11.1) | [Milestone](/../../milestone/6) + +### Changed + +- Optimized `runner::Basic` to not wait the whole batch to complete before executing next `Scenario`s. ([#195]) + +[#195]: /../../pull/195 + + + + ## [0.11.0] · 2022-01-03 [0.11.0]: /../../tree/v0.11.0 diff --git a/src/runner/basic.rs b/src/runner/basic.rs index a7f7c707..b2c1e31e 100644 --- a/src/runner/basic.rs +++ b/src/runner/basic.rs @@ -18,7 +18,7 @@ use std::{ panic::{self, AssertUnwindSafe}, path::PathBuf, sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, + atomic::{AtomicBool, Ordering}, Arc, }, }; @@ -464,12 +464,22 @@ async fn insert_features( /// Retrieves [`Feature`]s and executes them. /// +/// # Events +/// +/// - [`Scenario`] events are emitted by [`Executor`]. +/// - If [`Scenario`] was first or last for particular [`Rule`] or [`Feature`], +/// emits starting or finishing events for them. +/// /// [`Feature`]: gherkin::Feature +/// [`Rule`]: gherkin::Rule +/// [`Scenario`]: gherkin::Scenario async fn execute( features: Features, max_concurrent_scenarios: Option, collection: step::Collection, - sender: mpsc::UnboundedSender>>>, + event_sender: mpsc::UnboundedSender< + parser::Result>>, + >, before_hook: Option, after_hook: Option, ) where @@ -500,60 +510,70 @@ async fn execute( let hook = panic::take_hook(); panic::set_hook(Box::new(|_| {})); - let mut executor = - Executor::new(collection, before_hook, after_hook, sender); + let (finished_sender, finished_receiver) = mpsc::unbounded(); + let mut storage = FinishedRulesAndFeatures::new(finished_receiver); + let executor = Executor::new( + collection, + before_hook, + after_hook, + event_sender, + finished_sender, + ); - executor.send(event::Cucumber::Started); + executor.send_event(event::Cucumber::Started); + let mut started_scenarios = max_concurrent_scenarios; + let mut run_scenarios = stream::FuturesUnordered::new(); loop { - let runnable = features.get(max_concurrent_scenarios).await; - if runnable.is_empty() { + let runnable = features.get(started_scenarios).await; + if run_scenarios.is_empty() && runnable.is_empty() { if features.is_finished() { break; } continue; } - let started = executor.start_scenarios(&runnable); - executor.send_all(started); + let started = storage.start_scenarios(&runnable); + executor.send_all_events(started); - drop( - runnable - .into_iter() - .map(|(f, r, s)| executor.run_scenario(f, r, s)) - .collect::>() - .await, - ); + if let Some(sc) = started_scenarios.as_mut() { + *sc -= runnable.len(); + } - executor.cleanup_finished_rules_and_features(); + for (f, r, s) in runnable { + run_scenarios.push(executor.run_scenario(f, r, s)); + } + + if run_scenarios.next().await.is_some() { + if let Some(sc) = started_scenarios.as_mut() { + *sc += 1; + } + } + + while let Ok(Some((feat, rule))) = storage.finished_receiver.try_next() + { + if let Some(r) = rule { + if let Some(f) = + storage.rule_scenario_finished(Arc::clone(&feat), r) + { + executor.send_event(f); + } + } + if let Some(f) = storage.feature_scenario_finished(feat) { + executor.send_event(f); + } + } } - executor.send(event::Cucumber::Finished); + executor.send_event(event::Cucumber::Finished); panic::set_hook(hook); } -/// Stores currently ran [`Feature`]s and notifies about their state of -/// completion. +/// Runs [`Scenario`]s and notifies about their state of completion. /// -/// [`Feature`]: gherkin::Feature. +/// [`Scenario`]: gherkin::Scenario struct Executor { - /// Number of finished [`Scenario`]s of [`Feature`]. - /// - /// [`Feature`]: gherkin::Feature - /// [`Scenario`]: gherkin::Scenario - features_scenarios_count: HashMap, AtomicUsize>, - - /// Number of finished [`Scenario`]s of [`Rule`]. - /// - /// We also store path to `.feature` file so [`Rule`]s with same names and - /// spans in different files will have different hashes. - /// - /// [`Rule`]: gherkin::Rule - /// [`Scenario`]: gherkin::Scenario - rule_scenarios_count: - HashMap<(Option, Arc), AtomicUsize>, - /// [`Step`]s [`Collection`]. /// /// [`Collection`]: step::Collection @@ -574,10 +594,20 @@ struct Executor { /// [`Step`]: gherkin::Step after_hook: Option, - /// Sender for notifying state of [`Feature`]s completion. + /// Sender for [`Scenario`] [events][1]. /// - /// [`Feature`]: gherkin::Feature - sender: mpsc::UnboundedSender>>>, + /// [`Scenario`]: gherkin::Scenario + /// [1]: event::Scenario + event_sender: + mpsc::UnboundedSender>>>, + + /// Sender for notifying of [`Scenario`]s completion. + /// + /// [`Scenario`]: gherkin::Scenario + finished_sender: mpsc::UnboundedSender<( + Arc, + Option>, + )>, } impl Executor @@ -602,17 +632,20 @@ where collection: step::Collection, before_hook: Option, after_hook: Option, - sender: mpsc::UnboundedSender< + event_sender: mpsc::UnboundedSender< parser::Result>>, >, + finished_sender: mpsc::UnboundedSender<( + Arc, + Option>, + )>, ) -> Self { Self { - features_scenarios_count: HashMap::new(), - rule_scenarios_count: HashMap::new(), collection, before_hook, after_hook, - sender, + event_sender, + finished_sender, } } @@ -621,8 +654,6 @@ where /// # Events /// /// - Emits all [`Scenario`] events. - /// - If [`Scenario`] was last for particular [`Rule`] or [`Feature`], also - /// emits finishing events for them. /// /// [`Feature`]: gherkin::Feature /// [`Rule`]: gherkin::Rule @@ -672,7 +703,7 @@ where event::Scenario::step_failed, ); - self.send(event::Cucumber::scenario( + self.send_event(event::Cucumber::scenario( Arc::clone(&feature), rule.clone(), Arc::clone(&scenario), @@ -732,27 +763,21 @@ where .await .map_or((), drop); - self.send(event::Cucumber::scenario( + self.send_event(event::Cucumber::scenario( Arc::clone(&feature), rule.clone(), Arc::clone(&scenario), event::Scenario::Finished, )); - if let Some(r) = rule { - if let Some(f) = - self.rule_scenario_finished(Arc::clone(&feature), r) - { - self.send(f); - } - } - - if let Some(f) = self.feature_scenario_finished(feature) { - self.send(f); - } + self.scenario_finished(feature, rule); } /// Executes [`HookType::Before`], if present. + /// + /// # Events + /// + /// - Emits [`HookType::Before`] event. async fn run_before_hook( &self, feature: &Arc, @@ -776,7 +801,7 @@ where }; if let Some(hook) = self.before_hook.as_ref() { - self.send(event::Cucumber::scenario( + self.send_event(event::Cucumber::scenario( Arc::clone(feature), rule.map(Arc::clone), Arc::clone(scenario), @@ -798,7 +823,7 @@ where match fut.await { Ok(world) => { - self.send(event::Cucumber::scenario( + self.send_event(event::Cucumber::scenario( Arc::clone(feature), rule.map(Arc::clone), Arc::clone(scenario), @@ -807,7 +832,7 @@ where Ok(Some(world)) } Err((info, world)) => { - self.send(event::Cucumber::scenario( + self.send_event(event::Cucumber::scenario( Arc::clone(feature), rule.map(Arc::clone), Arc::clone(scenario), @@ -826,6 +851,10 @@ where } /// Executes [`HookType::After`], if present. + /// + /// # Event + /// + /// - Emits [`HookType::After`] event. async fn run_after_hook( &self, mut world: Option, @@ -834,7 +863,7 @@ where scenario: &Arc, ) -> Result, ()> { if let Some(hook) = self.after_hook.as_ref() { - self.send(event::Cucumber::scenario( + self.send_event(event::Cucumber::scenario( Arc::clone(feature), rule.map(Arc::clone), Arc::clone(scenario), @@ -857,7 +886,7 @@ where #[allow(clippy::shadow_unrelated)] match fut.await { Ok(world) => { - self.send(event::Cucumber::scenario( + self.send_event(event::Cucumber::scenario( Arc::clone(feature), rule.map(Arc::clone), Arc::clone(scenario), @@ -866,7 +895,7 @@ where Ok(world) } Err((info, world)) => { - self.send(event::Cucumber::scenario( + self.send_event(event::Cucumber::scenario( Arc::clone(feature), rule.map(Arc::clone), Arc::clone(scenario), @@ -908,7 +937,7 @@ where event::StepError, ) -> event::Cucumber, { - self.send(started(Arc::clone(&step))); + self.send_event(started(Arc::clone(&step))); let run = async { let (step_fn, captures, ctx) = match self.collection.find(&step) { @@ -953,39 +982,131 @@ where #[allow(clippy::shadow_unrelated)] match run.await { Ok((Some(captures), Some(world))) => { - self.send(passed(step, captures)); + self.send_event(passed(step, captures)); Ok(world) } Ok((_, world)) => { - self.send(skipped(step)); + self.send_event(skipped(step)); Err(world) } Err((err, captures, world)) => { - self.send(failed(step, captures, world.map(Arc::new), err)); + self.send_event(failed( + step, + captures, + world.map(Arc::new), + err, + )); Err(None) } } } + /// Notifies [`FinishedRulesAndFeatures`] about [`Scenario`] being finished. + /// + /// [`Scenario`]: gherkin::Scenario + fn scenario_finished( + &self, + feature: Arc, + rule: Option>, + ) { + // If the receiver end is dropped, then no one listens for events + // so we can just ignore it. + drop(self.finished_sender.unbounded_send((feature, rule))); + } + + /// Notifies with the given [`Cucumber`] event. + /// + /// [`Cucumber`]: event::Cucumber + fn send_event(&self, event: event::Cucumber) { + // If the receiver end is dropped, then no one listens for events + // so we can just ignore it. + drop(self.event_sender.unbounded_send(Ok(Event::new(event)))); + } + + /// Notifies with the given [`Cucumber`] events. + /// + /// [`Cucumber`]: event::Cucumber + fn send_all_events( + &self, + events: impl Iterator>, + ) { + for v in events { + // If the receiver end is dropped, then no one listens for events + // so we can just stop from here. + if self.event_sender.unbounded_send(Ok(Event::new(v))).is_err() { + break; + } + } + } +} + +/// Stores currently running [`Rule`]s and [`Feature`]s and notifies about their +/// state of completion. +/// +/// [`Feature`]: gherkin::Feature +/// [`Rule`]: gherkin::Rule +struct FinishedRulesAndFeatures { + /// Number of finished [`Scenario`]s of [`Feature`]. + /// + /// [`Feature`]: gherkin::Feature + /// [`Scenario`]: gherkin::Scenario + features_scenarios_count: HashMap, usize>, + + /// Number of finished [`Scenario`]s of [`Rule`]. + /// + /// We also store path to `.feature` file so [`Rule`]s with same names and + /// spans in different files will have different hashes. + /// + /// [`Rule`]: gherkin::Rule + /// [`Scenario`]: gherkin::Scenario + rule_scenarios_count: HashMap<(Option, Arc), usize>, + + /// Receiver for notifying state of [`Scenario`]s completion. + /// + /// [`Scenario`]: gherkin::Scenario + finished_receiver: mpsc::UnboundedReceiver<( + Arc, + Option>, + )>, +} + +impl FinishedRulesAndFeatures { + /// Creates a new [`FinishedRulesAndFeatures`] store. + fn new( + finished_receiver: mpsc::UnboundedReceiver<( + Arc, + Option>, + )>, + ) -> Self { + Self { + features_scenarios_count: HashMap::new(), + rule_scenarios_count: HashMap::new(), + finished_receiver, + } + } + /// Marks [`Rule`]'s [`Scenario`] as finished and returns [`Rule::Finished`] /// event if no [`Scenario`]s left. /// /// [`Rule`]: gherkin::Rule /// [`Rule::Finished`]: event::Rule::Finished /// [`Scenario`]: gherkin::Scenario - fn rule_scenario_finished( - &self, + fn rule_scenario_finished( + &mut self, feature: Arc, rule: Arc, ) -> Option> { let finished_scenarios = self .rule_scenarios_count - .get(&(feature.path.clone(), Arc::clone(&rule))) - .unwrap_or_else(|| panic!("No Rule {}", rule.name)) - .fetch_add(1, Ordering::SeqCst) - + 1; - (rule.scenarios.len() == finished_scenarios) - .then(|| event::Cucumber::rule_finished(feature, rule)) + .get_mut(&(feature.path.clone(), Arc::clone(&rule))) + .unwrap_or_else(|| panic!("No Rule {}", rule.name)); + *finished_scenarios += 1; + (rule.scenarios.len() == *finished_scenarios).then(|| { + let _ = self + .rule_scenarios_count + .remove(&(feature.path.clone(), Arc::clone(&rule))); + event::Cucumber::rule_finished(feature, rule) + }) } /// Marks [`Feature`]'s [`Scenario`] as finished and returns @@ -994,19 +1115,20 @@ where /// [`Feature`]: gherkin::Feature /// [`Feature::Finished`]: event::Feature::Finished /// [`Scenario`]: gherkin::Scenario - fn feature_scenario_finished( - &self, + fn feature_scenario_finished( + &mut self, feature: Arc, ) -> Option> { let finished_scenarios = self .features_scenarios_count - .get(&feature) - .unwrap_or_else(|| panic!("No Feature {}", feature.name)) - .fetch_add(1, Ordering::SeqCst) - + 1; + .get_mut(&feature) + .unwrap_or_else(|| panic!("No Feature {}", feature.name)); + *finished_scenarios += 1; let scenarios = feature.count_scenarios(); - (scenarios == finished_scenarios) - .then(|| event::Cucumber::feature_finished(feature)) + (scenarios == *finished_scenarios).then(|| { + let _ = self.features_scenarios_count.remove(&feature); + event::Cucumber::feature_finished(feature) + }) } /// Marks [`Scenario`]s as started and returns [`Rule::Started`] and @@ -1018,7 +1140,7 @@ where /// [`Rule`]: gherkin::Rule /// [`Rule::Started`]: event::Rule::Started /// [`Scenario`]: gherkin::Scenario - fn start_scenarios( + fn start_scenarios( &mut self, runnable: impl AsRef< [( @@ -1037,7 +1159,7 @@ where .entry(Arc::clone(&feature)) .or_insert_with(|| { started_features.push(feature); - 0.into() + 0 }); } @@ -1054,7 +1176,7 @@ where .entry((feat.path.clone(), Arc::clone(&rule))) .or_insert_with(|| { started_rules.push((feat, rule)); - 0.into() + 0 }); } @@ -1067,51 +1189,6 @@ where .map(|(f, r)| event::Cucumber::rule_started(f, r)), ) } - - /// Removes all finished [`Rule`]s and [`Feature`]s as all their events are - /// emitted already. - /// - /// [`Feature`]: gherkin::Feature - /// [`Rule`]: gherkin::Rule - fn cleanup_finished_rules_and_features(&mut self) { - self.features_scenarios_count = self - .features_scenarios_count - .drain() - .filter(|(f, count)| { - f.count_scenarios() != count.load(Ordering::SeqCst) - }) - .collect(); - - self.rule_scenarios_count = self - .rule_scenarios_count - .drain() - .filter(|((_, r), count)| { - r.scenarios.len() != count.load(Ordering::SeqCst) - }) - .collect(); - } - - /// Notifies with the given [`Cucumber`] event. - /// - /// [`Cucumber`]: event::Cucumber - fn send(&self, event: event::Cucumber) { - // If the receiver end is dropped, then no one listens for events - // so we can just ignore it. - drop(self.sender.unbounded_send(Ok(Event::new(event)))); - } - - /// Notifies with the given [`Cucumber`] events. - /// - /// [`Cucumber`]: event::Cucumber - fn send_all(&self, events: impl Iterator>) { - for v in events { - // If the receiver end is dropped, then no one listens for events - // so we can just stop from here. - if self.sender.unbounded_send(Ok(Event::new(v))).is_err() { - break; - } - } - } } /// [`Scenario`]s storage.