diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 09437a85..30969109 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -81,6 +81,7 @@ jobs: - output-json - output-junit - libtest + - tracing runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 diff --git a/CHANGELOG.md b/CHANGELOG.md index 39b386c1..fe8d3ae8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,25 @@ All user visible changes to `cucumber` crate will be documented in this file. Th +## [0.20.0] · 2023-??-?? +[0.20.0]: /../../tree/v0.20.0 + +[Diff](/../../compare/v0.19.1...v0.20.0) | [Milestone](/../../milestone/24) + +### BC Breaks + +- Added `Log` variant to `event::Scenario`. ([#258]) + +### Added + +- [`tracing`] crate integration behind the `tracing` feature flag. ([#213], [#258]) + +[#213]: /../../issues/213 +[#258]: /../../pull/258 + + + + ## [0.19.1] · 2022-12-29 [0.19.1]: /../../tree/v0.19.1 @@ -680,6 +699,7 @@ All user visible changes to `cucumber` crate will be documented in this file. Th [`clap`]: https://docs.rs/clap [`gherkin`]: https://docs.rs/gherkin [`gherkin_rust`]: https://docs.rs/gherkin_rust +[`tracing`]: https://docs.rs/tracing [Cargo feature]: https://doc.rust-lang.org/cargo/reference/features.html [Cucumber Expressions]: https://cucumber.github.io/cucumber-expressions diff --git a/Cargo.toml b/Cargo.toml index b01c278b..992aba6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ repository = "https://github.com/cucumber-rs/cucumber" readme = "README.md" categories = ["asynchronous", "development-tools::testing"] keywords = ["cucumber", "testing", "bdd", "atdd", "async"] -include = ["/src/", "/tests/json.rs", "/tests/junit.rs", "/tests/libtest.rs", "/tests/wait.rs", "/LICENSE-*", "/README.md", "/CHANGELOG.md"] +include = ["/src/", "/tests/json.rs", "/tests/junit.rs", "/tests/libtest.rs", "/tests/tracing.rs", "/tests/wait.rs", "/LICENSE-*", "/README.md", "/CHANGELOG.md"] [package.metadata.docs.rs] all-features = true @@ -37,12 +37,14 @@ output-json = ["dep:Inflector", "dep:serde", "dep:serde_json", "timestamps"] output-junit = ["dep:junit-report", "timestamps"] # Enables timestamps collecting for all events. timestamps = [] +# Enables integraion with `tracing` crate. +tracing = ["dep:crossbeam-utils", "dep:tracing", "dep:tracing-subscriber"] [dependencies] async-trait = "0.1.43" clap = { version = "4.0.27", features = ["derive", "wrap_help"] } console = "0.15" -derive_more = { version = "0.99.17", features = ["as_ref", "deref", "deref_mut", "display", "error", "from", "into"], default_features = false } +derive_more = { version = "0.99.17", features = ["as_ref", "deref", "deref_mut", "display", "error", "from", "from_str", "into"], default_features = false } drain_filter_polyfill = "0.1.2" either = "1.6" futures = "0.3.17" @@ -53,6 +55,7 @@ is-terminal = "0.4.4" itertools = "0.10" linked-hash-map = "0.5.3" once_cell = "1.13" +pin-project = "1.0" regex = "1.5.5" sealed = "0.4" smart-default = "0.6" @@ -71,6 +74,11 @@ Inflector = { version = "0.11", default-features = false, optional = true } # "output-junit" feature dependencies. junit-report = { version = "0.8", optional = true } +# "tracing" feature dependencies. +crossbeam-utils = { version = "0.8.14", optional = true } +tracing = { version = "0.1", optional = true } +tracing-subscriber = { version = "0.3.16", optional = true } + [dev-dependencies] derive_more = "0.99.17" rand = "0.8" @@ -89,6 +97,11 @@ required-features = ["output-junit"] name = "libtest" required-features = ["libtest"] +[[test]] +name = "tracing" +required-features = ["tracing"] +harness = false + [[test]] name = "wait" required-features = ["libtest"] diff --git a/README.md b/README.md index 853b1995..3a4c6054 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,7 @@ For more examples check out the Book ([current][1] | [edge][2]). - `output-json` (implies `timestamps`): Enables support for outputting in [Cucumber JSON format]. - `output-junit` (implies `timestamps`): Enables support for outputting [JUnit XML report]. - `libtest` (implies `timestamps`): Enables compatibility with [Rust `libtest`][4]'s JSON output format. Useful for [IntelliJ Rust plugin integration][3]. +- `tracing`: Enables [integration with `tracing` crate][5]. @@ -129,5 +130,6 @@ at your option. [2]: https://cucumber-rs.github.io/cucumber/main [3]: https://cucumber-rs.github.io/cucumber/main/output/intellij.html [4]: https://doc.rust-lang.org/rustc/tests/index.html +[5]: https://cucumber-rs.github.io/cucumber/main/output/tracing.html [asciicast]:  diff --git a/book/src/SUMMARY.md b/book/src/SUMMARY.md index dd5a3e45..2256670b 100644 --- a/book/src/SUMMARY.md +++ b/book/src/SUMMARY.md @@ -21,6 +21,7 @@ - [JUnit XML report](output/junit.md) - [Cucumber JSON format](output/json.md) - [Multiple outputs](output/multiple.md) + - [`tracing` integration](output/tracing.md) - [IntelliJ Rust integration](output/intellij.md) - [Architecture](architecture/index.md) - [Custom `Parser`](architecture/parser.md) diff --git a/book/src/output/index.md b/book/src/output/index.md index 1d0c3592..8f3cc6a1 100644 --- a/book/src/output/index.md +++ b/book/src/output/index.md @@ -7,4 +7,5 @@ This chapter describes possible way and tweaks of outputting test suite results. 2. [JUnit XML report](junit.md) 3. [Cucumber JSON format](json.md) 4. [Multiple outputs](multiple.md) -5. [IntelliJ Rust integration](intellij.md) +5. [`tracing` integration](tracing.md) +6. [IntelliJ Rust integration](intellij.md) diff --git a/book/src/output/terminal.md b/book/src/output/terminal.md index d1ed8449..7579dcfc 100644 --- a/book/src/output/terminal.md +++ b/book/src/output/terminal.md @@ -213,9 +213,9 @@ async fn main() { -## Manual printing +## Debug printing and/or logging -Though [`cucumber`] crate doesn't capture any manual printing produced in a [step] matching function (such as [`dbg!`] or [`println!`] macros), it may be [quite misleading][#177] to produce and use it for debugging purposes. The reason is simply because [`cucumber`] crate executes [scenario]s concurrently and [normalizes][3] their results before outputting, while any manual print is produced instantly at the moment of its [step] execution. +Though [`cucumber`] crate doesn't capture any manual debug printing produced in a [step] matching function (such as [`dbg!`] or [`println!`] macros), it may be [quite misleading][#177] to produce and use it for debugging purposes. The reason is simply because [`cucumber`] crate executes [scenario]s concurrently and [normalizes][3] their results before outputting, while any manual print is produced instantly at the moment of its [step] execution. > __WARNING:__ Moreover, manual printing will very likely interfere with [default][1] interactive pretty-printing. @@ -348,7 +348,48 @@ async fn main() { ``` ![record](../rec/output_terminal_custom.gif) -> __NOTE__: The custom print is still output before its [step], because is printed during the [step] execution. +> __NOTE__: The custom print is still output before its [step], because is printed during the [step] execution. + +Much better option for debugging would be using [`tracing` crate integration](tracing.md) instead of [`dbg!`]/[`println!`] for doing logs. + +```rust +# extern crate cucumber; +# extern crate tokio; +# extern crate tracing; +# +use std::{ + sync::atomic::{AtomicUsize, Ordering}, + time::Duration, +}; + +use cucumber::{given, then, when, World as _}; +use tokio::time; + +#[derive(cucumber::World, Debug, Default)] +struct World; + +#[given(regex = r"(\d+) secs?")] +#[when(regex = r"(\d+) secs?")] +#[then(regex = r"(\d+) secs?")] +async fn sleep(_: &mut World, secs: u64) { + static ID: AtomicUsize = AtomicUsize::new(0); + + let id = ID.fetch_add(1, Ordering::Relaxed); + + tracing::info!("before {secs}s sleep: {id}"); + time::sleep(Duration::from_secs(secs)).await; + tracing::info!("after {secs}s sleep: {id}"); +} + +#[tokio::main] +async fn main() { + World::cucumber() + .init_tracing() + .run("tests/features/wait") + .await; +} +``` +![record](../rec/tracing_basic_writer.gif) diff --git a/book/src/output/tracing.md b/book/src/output/tracing.md new file mode 100644 index 00000000..54e30eef --- /dev/null +++ b/book/src/output/tracing.md @@ -0,0 +1,119 @@ +`tracing` integration +===================== + +[`Cucumber::init_tracing()`] (enabled by `tracing` feature in `Cargo.toml`) initializes global [`tracing::Subscriber`] that intercepts all the [`tracing` events][1] and transforms them into [`event::Scenario::Log`]s. Each [`Writer`] can handle those [`event::Scenario::Log`]s in its own way. [`writer::Basic`], for example, emits all the [`event::Scenario::Log`]s only whenever [scenario] itself is outputted. + +```rust +# extern crate cucumber; +# extern crate tokio; +# extern crate tracing; +# +use std::{ + sync::atomic::{AtomicUsize, Ordering}, + time::Duration, +}; + +use cucumber::{given, then, when, World as _}; +use tokio::time; + +#[derive(cucumber::World, Debug, Default)] +struct World; + +#[given(regex = r"(\d+) secs?")] +#[when(regex = r"(\d+) secs?")] +#[then(regex = r"(\d+) secs?")] +async fn sleep(_: &mut World, secs: u64) { + static ID: AtomicUsize = AtomicUsize::new(0); + + let id = ID.fetch_add(1, Ordering::Relaxed); + + tracing::info!("before {secs}s sleep: {id}"); + time::sleep(Duration::from_secs(secs)).await; + tracing::info!("after {secs}s sleep: {id}"); +} + +#[tokio::main] +async fn main() { + World::cucumber() + .init_tracing() + .run("tests/features/wait") + .await; +} +``` +![record](../rec/tracing_basic_writer.gif) + + + + +## Loosing [`tracing::Span`] + +[`tracing::Span`] is used to wire emitted [`tracing` events][1] (logs) to concrete [scenario]s: each [scenario] is executed in its own [`tracing::Span`]. In case a [`tracing` event][1] is emitted outside the [`tracing::Span`] of a [scenario], it will be propagated to every running [scenario] at the moment. + +```rust +# extern crate cucumber; +# extern crate tokio; +# extern crate tracing; +# +# use std::{ +# sync::atomic::{AtomicUsize, Ordering}, +# time::Duration, +# }; +# +# use cucumber::{given, then, when, World as _}; +# use tokio::time; +# +# #[derive(cucumber::World, Debug, Default)] +# struct World; +# +# #[given(regex = r"(\d+) secs?")] +# #[when(regex = r"(\d+) secs?")] +# #[then(regex = r"(\d+) secs?")] +# async fn sleep(_: &mut World, secs: u64) { +# static ID: AtomicUsize = AtomicUsize::new(0); +# +# let id = ID.fetch_add(1, Ordering::Relaxed); +# +# tracing::info!("before {secs}s sleep: {id}"); +# time::sleep(Duration::from_secs(secs)).await; +# tracing::info!("after {secs}s sleep: {id}"); +# } +# +#[tokio::main] +async fn main() { + // Background task outside of any scenario. + tokio::spawn(async { + let mut id = 0; + loop { + time::sleep(Duration::from_secs(3)).await; + tracing::info!("Background: {id}"); + id += 1; + } + }); + + World::cucumber() + .init_tracing() + .run("tests/features/wait") + .await; +} +``` +![record](../rec/tracing_outside_span.gif) + +As we see, `Background: 2`/`3`/`4` is shown in multiple [scenario]s, while being emitted only once each. + +> __TIP__: If you're [`spawn`]ing a [`Future`] inside your [step] matching function, consider to [propagate][2] its [`tracing::Span`] into the [`spawn`]ed [`Future`] for outputting its logs properly. + + + + +[`Cucumber::init_tracing()`]: https://docs.rs/cucumber/*/cucumber/struct.Cucumber.html#method.init_tracing +[`event::Scenario::Log`]: https://docs.rs/cucumber/*/cucumber/event/enum.Scenario.html#variant.Log +[`Future`]: https://doc.rust-lang.org/stable/std/future/trait.Future.html +[`spawn`]: https://docs.rs/tokio/*/tokio/fn.spawn.html +[`tracing::Span`]: https://docs.rs/tracing/*/tracing/struct.Span.html +[`tracing::Subscriber`]: https://docs.rs/tracing/*/tracing/trait.Subscriber.html +[`Writer`]: https://docs.rs/cucumber/*/cucumber/writer/trait.Writer.html +[`writer::Basic`]: https://docs.rs/cucumber/*/cucumber/writer/struct.Basic.html +[scenario]: https://cucumber.io/docs/gherkin/reference#example +[step]: https://cucumber.io/docs/gherkin/reference#steps +[1]: https://docs.rs/tracing/*/tracing/index.html#events +[2]: https://docs.rs/tracing/*/tracing/struct.Span.html#method.enter diff --git a/book/src/rec/tracing_basic_writer.gif b/book/src/rec/tracing_basic_writer.gif new file mode 100644 index 00000000..6a61d498 Binary files /dev/null and b/book/src/rec/tracing_basic_writer.gif differ diff --git a/book/src/rec/tracing_outside_span.gif b/book/src/rec/tracing_outside_span.gif new file mode 100644 index 00000000..2f7ba192 Binary files /dev/null and b/book/src/rec/tracing_outside_span.gif differ diff --git a/src/cucumber.rs b/src/cucumber.rs index 074d3e0f..eedc2d7f 100644 --- a/src/cucumber.rs +++ b/src/cucumber.rs @@ -66,7 +66,7 @@ where /// [`Runner`] executing [`Scenario`]s and producing [`event`]s. /// /// [`Scenario`]: gherkin::Scenario - runner: R, + pub(crate) runner: R, /// [`Writer`] outputting [`event`]s to some output. writer: Wr, diff --git a/src/event.rs b/src/event.rs index 568c83de..4ac255b0 100644 --- a/src/event.rs +++ b/src/event.rs @@ -495,6 +495,9 @@ pub enum Scenario { /// [`Step`] event. Step(Arc, Step), + /// [`Scenario`]'s log entry is emitted. + Log(String), + /// [`Scenario`] execution being finished. /// /// [`Scenario`]: gherkin::Scenario @@ -512,6 +515,7 @@ impl Clone for Scenario { Self::Background(Arc::clone(bg), ev.clone()) } Self::Step(st, ev) => Self::Step(Arc::clone(st), ev.clone()), + Self::Log(msg) => Self::Log(msg.clone()), Self::Finished => Self::Finished, } } diff --git a/src/future.rs b/src/future.rs index 68d36ab1..47edcceb 100644 --- a/src/future.rs +++ b/src/future.rs @@ -2,6 +2,12 @@ use std::{future::Future, pin::Pin, task}; +use futures::{ + future::{Either, FusedFuture, Then}, + FutureExt as _, +}; +use pin_project::pin_project; + /// Wakes the current task and returns [`task::Poll::Pending`] once. /// /// This function is useful when we want to cooperatively give time to a task @@ -31,3 +37,121 @@ impl Future for YieldNow { } } } + +/// Return type of a [`FutureExt::then_yield()`] method. +type ThenYield = Then, fn(O) -> YieldThenReturn>; + +/// Extensions of a [`Future`], used inside this crate. +pub(crate) trait FutureExt: Future + Sized { + /// Yields after this [`Future`] is resolved allowing other [`Future`]s + /// making progress. + fn then_yield(self) -> ThenYield { + self.then(YieldThenReturn::new) + } +} + +impl FutureExt for T {} + +/// [`Future`] returning a [`task::Poll::Pending`] once, before returning a +/// contained value. +#[derive(Debug)] +#[pin_project] +pub(crate) struct YieldThenReturn { + /// Value to be returned. + value: Option, + + /// [`YieldNow`] [`Future`]. + r#yield: YieldNow, +} + +impl YieldThenReturn { + /// Creates a new [`YieldThenReturn`] [`Future`]. + const fn new(v: V) -> Self { + Self { + value: Some(v), + r#yield: yield_now(), + } + } +} + +impl Future for YieldThenReturn { + type Output = V; + + fn poll( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll { + let this = self.project(); + task::ready!(this.r#yield.poll_unpin(cx)); + this.value + .take() + .map_or(task::Poll::Pending, task::Poll::Ready) + } +} + +/// [`select`] that always [`poll()`]s the `biased` [`Future`] first, and only +/// if it returns [`task::Poll::Pending`] tries to [`poll()`] the `regular` one. +/// +/// Implementation is exactly the same, as [`select`] at the moment, but +/// documentation has no guarantees about this behaviour, so can be changed. +/// +/// [`poll()`]: Future::poll +/// [`select`]: futures::future::select +pub(crate) const fn select_with_biased_first( + biased: A, + regular: B, +) -> SelectWithBiasedFirst +where + A: Future + Unpin, + B: Future + Unpin, +{ + SelectWithBiasedFirst { + inner: Some((biased, regular)), + } +} + +/// [`Future`] returned by a [`select_with_biased_first()`] function. +pub(crate) struct SelectWithBiasedFirst { + /// Inner [`Future`]s. + inner: Option<(A, B)>, +} + +impl Future for SelectWithBiasedFirst +where + A: Future + Unpin, + B: Future + Unpin, +{ + type Output = Either<(A::Output, B), (B::Output, A)>; + + #[allow(clippy::expect_used)] + fn poll( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll { + let (mut a, mut b) = self + .inner + .take() + .expect("cannot poll `SelectWithBiasedFirst` twice"); + + if let task::Poll::Ready(val) = a.poll_unpin(cx) { + return task::Poll::Ready(Either::Left((val, b))); + } + + if let task::Poll::Ready(val) = b.poll_unpin(cx) { + return task::Poll::Ready(Either::Right((val, a))); + } + + self.inner = Some((a, b)); + task::Poll::Pending + } +} + +impl FusedFuture for SelectWithBiasedFirst +where + A: Future + Unpin, + B: Future + Unpin, +{ + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} diff --git a/src/lib.rs b/src/lib.rs index a998db15..58a0f747 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -143,6 +143,8 @@ pub mod writer; #[cfg(feature = "macros")] pub mod codegen; +#[cfg(feature = "tracing")] +pub mod tracing; // TODO: Remove once tests run without complains about it. #[cfg(test)] diff --git a/src/runner/basic.rs b/src/runner/basic.rs index 800dee02..22fed327 100644 --- a/src/runner/basic.rs +++ b/src/runner/basic.rs @@ -17,13 +17,16 @@ use std::{ ops::ControlFlow, panic::{self, AssertUnwindSafe}, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }, thread, time::{Duration, Instant}, }; +#[cfg(feature = "tracing")] +use crossbeam_utils::atomic::AtomicCell; +use derive_more::{Display, FromStr}; use drain_filter_polyfill::VecExt; use futures::{ channel::{mpsc, oneshot}, @@ -38,10 +41,12 @@ use gherkin::tagexpr::TagOperation; use itertools::Itertools as _; use regex::{CaptureLocations, Regex}; +#[cfg(feature = "tracing")] +use crate::tracing::{Collector as TracingCollector, SpanCloseWaiter}; use crate::{ event::{self, HookType, Info, Retries}, feature::Ext as _, - future::yield_now, + future::{select_with_biased_first, FutureExt as _}, parser, step, tag::Ext as _, Event, Runner, Step, World, @@ -378,8 +383,18 @@ pub struct Basic< /// Indicates whether execution should be stopped after the first failure. fail_fast: bool, + + #[cfg(feature = "tracing")] + /// [`TracingCollector`] for [`event::Scenario::Log`]s forwarding. + pub(crate) logs_collector: Arc>>>, } +#[cfg(feature = "tracing")] +/// Assertion that [`Basic::logs_collector`] [`AtomicCell::is_lock_free`]. +const _: () = { + assert!(AtomicCell::>>::is_lock_free()); +}; + // Implemented manually to omit redundant `World: Clone` trait bound, imposed by // `#[derive(Clone)]`. impl Clone for Basic { @@ -395,6 +410,8 @@ impl Clone for Basic { before_hook: self.before_hook.clone(), after_hook: self.after_hook.clone(), fail_fast: self.fail_fast, + #[cfg(feature = "tracing")] + logs_collector: Arc::clone(&self.logs_collector), } } } @@ -437,6 +454,8 @@ impl Default for Basic { before_hook: None, after_hook: None, fail_fast: false, + #[cfg(feature = "tracing")] + logs_collector: Arc::new(AtomicCell::new(Box::new(None))), } } } @@ -530,6 +549,8 @@ impl Basic { before_hook, after_hook, fail_fast, + #[cfg(feature = "tracing")] + logs_collector, .. } = self; Basic { @@ -543,6 +564,8 @@ impl Basic { before_hook, after_hook, fail_fast, + #[cfg(feature = "tracing")] + logs_collector, } } @@ -592,6 +615,8 @@ impl Basic { retry_options, after_hook, fail_fast, + #[cfg(feature = "tracing")] + logs_collector, .. } = self; Basic { @@ -605,6 +630,8 @@ impl Basic { before_hook: Some(func), after_hook, fail_fast, + #[cfg(feature = "tracing")] + logs_collector, } } @@ -641,6 +668,8 @@ impl Basic { retry_options, before_hook, fail_fast, + #[cfg(feature = "tracing")] + logs_collector, .. } = self; Basic { @@ -654,6 +683,8 @@ impl Basic { before_hook, after_hook: Some(func), fail_fast, + #[cfg(feature = "tracing")] + logs_collector, } } @@ -729,6 +760,8 @@ where where S: Stream> + 'static, { + #[cfg(feature = "tracing")] + let logs_collector = *self.logs_collector.swap(Box::new(None)); let Self { max_concurrent_scenarios, retries, @@ -740,6 +773,7 @@ where before_hook, after_hook, fail_fast, + .. } = self; cli.retry = cli.retry.or(retries); @@ -768,6 +802,8 @@ where before_hook, after_hook, fail_fast, + #[cfg(feature = "tracing")] + logs_collector, ); stream::select( @@ -859,6 +895,7 @@ async fn insert_features( /// [`Feature`]: gherkin::Feature /// [`Rule`]: gherkin::Rule /// [`Scenario`]: gherkin::Scenario +#[allow(clippy::too_many_arguments, clippy::too_many_lines)] async fn execute( features: Features, max_concurrent_scenarios: Option, @@ -869,6 +906,7 @@ async fn execute( before_hook: Option, after_hook: Option, fail_fast: bool, + #[cfg(feature = "tracing")] mut logs_collector: Option, ) where W: World, Before: 'static @@ -918,16 +956,14 @@ async fn execute( ControlFlow::Break(()) => Some(0), }; + #[cfg(feature = "tracing")] + let waiter = logs_collector + .as_ref() + .map(TracingCollector::scenario_span_event_waiter); + let mut started_scenarios = ControlFlow::Continue(max_concurrent_scenarios); let mut run_scenarios = stream::FuturesUnordered::new(); loop { - // We yield once on every iteration, because there is a chance, that - // this function never yields otherwise. In this case event sender won't - // send anything to the `Writer` until the end. This is the case, when - // all the parsing is done, so there is no contention on the `Mutex` - // inside `Features` storage and all `Step` functions don't yield. - yield_now().await; - let (runnable, sleep) = features.get(map_break(started_scenarios)).await; if run_scenarios.is_empty() && runnable.is_empty() { @@ -956,21 +992,63 @@ async fn execute( let started = storage.start_scenarios(&runnable); executor.send_all_events(started); - if let ControlFlow::Continue(Some(sc)) = &mut started_scenarios { - *sc -= runnable.len(); - } - - for (f, r, s, ty, retries) in runnable { - run_scenarios.push(executor.run_scenario(f, r, s, ty, retries)); - } + { + #[cfg(feature = "tracing")] + let forward_logs = { + if let Some(coll) = logs_collector.as_mut() { + coll.start_scenarios(&runnable); + } + async { + loop { + while let Some(logs) = logs_collector + .as_mut() + .and_then(TracingCollector::emitted_logs) + { + executor.send_all_events(logs); + } + future::ready(()).then_yield().await; + } + } + }; + #[cfg(feature = "tracing")] + pin_mut!(forward_logs); + #[cfg(not(feature = "tracing"))] + let forward_logs = future::pending(); - if run_scenarios.next().await.is_some() { if let ControlFlow::Continue(Some(sc)) = &mut started_scenarios { - *sc += 1; + *sc -= runnable.len(); + } + + for (id, f, r, s, ty, retries) in runnable { + run_scenarios.push( + executor + .run_scenario( + id, + f, + r, + s, + ty, + retries, + #[cfg(feature = "tracing")] + waiter.as_ref(), + ) + .then_yield(), + ); + } + + let (finished_scenario, _) = + select_with_biased_first(forward_logs, run_scenarios.next()) + .await + .factor_first(); + if finished_scenario.is_some() { + if let ControlFlow::Continue(Some(sc)) = &mut started_scenarios + { + *sc += 1; + } } } - while let Ok(Some((feat, rule, scenario_failed, retried))) = + while let Ok(Some((id, feat, rule, scenario_failed, retried))) = storage.finished_receiver.try_next() { if let Some(rule) = rule { @@ -985,6 +1063,14 @@ async fn execute( if let Some(f) = storage.feature_scenario_finished(feat, retried) { executor.send_event(f); } + #[cfg(feature = "tracing")] + { + if let Some(coll) = logs_collector.as_mut() { + coll.finish_scenario(id); + } + } + #[cfg(not(feature = "tracing"))] + let _ = id; if fail_fast && scenario_failed && !retried { started_scenarios = ControlFlow::Break(()); @@ -1090,14 +1176,16 @@ where /// [`Feature`]: gherkin::Feature /// [`Rule`]: gherkin::Rule /// [`Scenario`]: gherkin::Scenario - #[allow(clippy::too_many_lines)] + #[allow(clippy::too_many_arguments, clippy::too_many_lines)] async fn run_scenario( &self, + id: ScenarioId, feature: Arc, rule: Option>, scenario: Arc, scenario_ty: ScenarioType, retries: Option, + #[cfg(feature = "tracing")] waiter: Option<&SpanCloseWaiter>, ) { let retry_num = retries.map(|r| r.retries); let ok = |e: fn(_) -> event::Scenario| { @@ -1138,106 +1226,163 @@ where event::Scenario::Started.with_retries(retry_num), )); - let mut result = async { - let before_hook = self - .run_before_hook(&feature, rule.as_ref(), &scenario, retry_num) - .await?; - - let feature_background = feature - .background - .as_ref() - .map(|b| b.steps.iter().map(|s| Arc::new(s.clone()))) - .into_iter() - .flatten(); - - let feature_background = stream::iter(feature_background) - .map(Ok) - .try_fold(before_hook, |world, bg_step| { - self.run_step(world, bg_step, true, into_bg_step_ev) + let is_failed = async { + let mut result = async { + let before_hook = self + .run_before_hook( + &feature, + rule.as_ref(), + &scenario, + retry_num, + id, + #[cfg(feature = "tracing")] + waiter, + ) + .await?; + + let feature_background = feature + .background + .as_ref() + .map(|b| b.steps.iter().map(|s| Arc::new(s.clone()))) + .into_iter() + .flatten(); + + let feature_background = stream::iter(feature_background) + .map(Ok) + .try_fold(before_hook, |world, bg_step| { + self.run_step( + world, + bg_step, + true, + into_bg_step_ev, + id, + #[cfg(feature = "tracing")] + waiter, + ) .map_ok(Some) - }) - .await?; - - let rule_background = rule - .as_ref() - .map(|r| { - r.background - .as_ref() - .map(|b| b.steps.iter().map(|s| Arc::new(s.clone()))) - .into_iter() - .flatten() - }) - .into_iter() - .flatten(); + }) + .await?; + + let rule_background = rule + .as_ref() + .map(|r| { + r.background + .as_ref() + .map(|b| { + b.steps.iter().map(|s| Arc::new(s.clone())) + }) + .into_iter() + .flatten() + }) + .into_iter() + .flatten(); - let rule_background = stream::iter(rule_background) - .map(Ok) - .try_fold(feature_background, |world, bg_step| { - self.run_step(world, bg_step, true, into_bg_step_ev) + let rule_background = stream::iter(rule_background) + .map(Ok) + .try_fold(feature_background, |world, bg_step| { + self.run_step( + world, + bg_step, + true, + into_bg_step_ev, + id, + #[cfg(feature = "tracing")] + waiter, + ) .map_ok(Some) - }) - .await?; + }) + .await?; - stream::iter(scenario.steps.iter().map(|s| Arc::new(s.clone()))) - .map(Ok) - .try_fold(rule_background, |world, step| { - self.run_step(world, step, false, into_step_ev).map_ok(Some) - }) + stream::iter(scenario.steps.iter().map(|s| Arc::new(s.clone()))) + .map(Ok) + .try_fold(rule_background, |world, step| { + self.run_step( + world, + step, + false, + into_step_ev, + id, + #[cfg(feature = "tracing")] + waiter, + ) + .map_ok(Some) + }) + .await + } + .await; + + let (world, scenario_finished_ev) = match &mut result { + Ok(world) => { + (world.take(), event::ScenarioFinished::StepPassed) + } + Err(exec_err) => ( + exec_err.take_world(), + exec_err.get_scenario_finished_event(), + ), + }; + + let (world, after_hook_meta, after_hook_error) = self + .run_after_hook( + world, + &feature, + rule.as_ref(), + &scenario, + scenario_finished_ev, + id, + #[cfg(feature = "tracing")] + waiter, + ) .await - } - .await; + .map_or_else( + |(w, meta, info)| (w.map(Arc::new), Some(meta), Some(info)), + |(w, meta)| (w.map(Arc::new), meta, None), + ); - let (world, scenario_finished_ev) = match &mut result { - Ok(world) => (world.take(), event::ScenarioFinished::StepPassed), - Err(exec_err) => ( - exec_err.take_world(), - exec_err.get_scenario_finished_event(), - ), - }; + let scenario_failed = match &result { + Ok(_) | Err(ExecutionFailure::StepSkipped(_)) => false, + Err( + ExecutionFailure::BeforeHookPanicked { .. } + | ExecutionFailure::StepPanicked { .. }, + ) => true, + }; + let is_failed = scenario_failed || after_hook_error.is_some(); - let (world, after_hook_meta, after_hook_error) = self - .run_after_hook( - world, - &feature, - rule.as_ref(), - &scenario, - scenario_finished_ev, - ) - .await - .map_or_else( - |(w, meta, info)| (w.map(Arc::new), Some(meta), Some(info)), - |(w, meta)| (w.map(Arc::new), meta, None), - ); - - let scenario_failed = match &result { - Ok(_) | Err(ExecutionFailure::StepSkipped(_)) => false, - Err( - ExecutionFailure::BeforeHookPanicked { .. } - | ExecutionFailure::StepPanicked { .. }, - ) => true, - }; - let is_failed = scenario_failed || after_hook_error.is_some(); + if let Some(exec_error) = result.err() { + self.emit_failed_events( + Arc::clone(&feature), + rule.clone(), + Arc::clone(&scenario), + world.clone(), + exec_error, + retry_num, + ); + } - if let Some(exec_error) = result.err() { - self.emit_failed_events( + self.emit_after_hook_events( Arc::clone(&feature), rule.clone(), Arc::clone(&scenario), - world.clone(), - exec_error, + world, + after_hook_meta, + after_hook_error, retry_num, ); - } - self.emit_after_hook_events( - Arc::clone(&feature), - rule.clone(), - Arc::clone(&scenario), - world, - after_hook_meta, - after_hook_error, - retry_num, - ); + is_failed + }; + #[cfg(feature = "tracing")] + let (is_failed, span_id) = { + let span = id.scenario_span(); + let span_id = span.id(); + let is_failed = tracing::Instrument::instrument(is_failed, span); + (is_failed, span_id) + }; + let is_failed = is_failed.then_yield().await; + + #[cfg(feature = "tracing")] + if let Some((waiter, span_id)) = waiter.zip(span_id) { + waiter.wait_for_span_close(span_id).then_yield().await; + } self.send_event(event::Cucumber::scenario( Arc::clone(&feature), @@ -1261,7 +1406,13 @@ where .await; } - self.scenario_finished(feature, rule, is_failed, next_try.is_some()); + self.scenario_finished( + id, + feature, + rule, + is_failed, + next_try.is_some(), + ); } /// Executes [`HookType::Before`], if present. @@ -1278,10 +1429,13 @@ where rule: Option<&Arc>, scenario: &Arc, retries: Option, + scenario_id: ScenarioId, + #[cfg(feature = "tracing")] waiter: Option<&SpanCloseWaiter>, ) -> Result, ExecutionFailure> { let init_world = async { AssertUnwindSafe(async { W::new().await }) .catch_unwind() + .then_yield() .await .map_err(Info::from) .and_then(|r| { @@ -1319,7 +1473,24 @@ where } }); - match fut.await { + #[cfg(feature = "tracing")] + let (fut, span_id) = { + let span = scenario_id.hook_span(HookType::Before); + let span_id = span.id(); + let fut = tracing::Instrument::instrument(fut, span); + (fut, span_id) + }; + #[cfg(not(feature = "tracing"))] + let _ = scenario_id; + + let result = fut.then_yield().await; + + #[cfg(feature = "tracing")] + if let Some((waiter, id)) = waiter.zip(span_id) { + waiter.wait_for_span_close(id).then_yield().await; + } + + match result { Ok(world) => { self.send_event(event::Cucumber::scenario( Arc::clone(feature), @@ -1358,6 +1529,8 @@ where step: Arc, is_background: bool, (started, passed, skipped): (St, Ps, Sk), + scenario_id: ScenarioId, + #[cfg(feature = "tracing")] waiter: Option<&SpanCloseWaiter>, ) -> Result> where St: FnOnce(Arc) -> event::Cucumber, @@ -1386,6 +1559,7 @@ where } else { match AssertUnwindSafe(async { W::new().await }) .catch_unwind() + .then_yield() .await { Ok(Ok(w)) => w, @@ -1414,7 +1588,23 @@ where } }; - match run.await { + #[cfg(feature = "tracing")] + let (run, span_id) = { + let span = scenario_id.step_span(is_background); + let span_id = span.id(); + let run = tracing::Instrument::instrument(run, span); + (run, span_id) + }; + let result = run.then_yield().await; + + #[cfg(feature = "tracing")] + if let Some((waiter, id)) = waiter.zip(span_id) { + waiter.wait_for_span_close(id).then_yield().await; + } + #[cfg(not(feature = "tracing"))] + let _ = scenario_id; + + match result { Ok((Some(captures), loc, Some(world))) => { self.send_event(passed(step, captures, loc)); Ok(world) @@ -1531,6 +1721,7 @@ where /// /// Doesn't emit any events, see [`Self::emit_failed_events()`] for more /// details. + #[allow(clippy::too_many_arguments)] async fn run_after_hook( &self, mut world: Option, @@ -1538,6 +1729,8 @@ where rule: Option<&Arc>, scenario: &Arc, ev: event::ScenarioFinished, + scenario_id: ScenarioId, + #[cfg(feature = "tracing")] waiter: Option<&SpanCloseWaiter>, ) -> Result< (Option, Option), (Option, AfterHookEventsMeta, Info), @@ -1555,7 +1748,25 @@ where }; let started = event::Metadata::new(()); - let res = AssertUnwindSafe(fut).catch_unwind().await; + let fut = AssertUnwindSafe(fut).catch_unwind(); + + #[cfg(feature = "tracing")] + let (fut, span_id) = { + let span = scenario_id.hook_span(HookType::After); + let span_id = span.id(); + let fut = tracing::Instrument::instrument(fut, span); + (fut, span_id) + }; + #[cfg(not(feature = "tracing"))] + let _ = scenario_id; + + let res = fut.then_yield().await; + + #[cfg(feature = "tracing")] + if let Some((waiter, id)) = waiter.zip(span_id) { + waiter.wait_for_span_close(id).then_yield().await; + } + let finished = event::Metadata::new(()); let meta = AfterHookEventsMeta { started, finished }; @@ -1624,6 +1835,7 @@ where /// [`Scenario`]: gherkin::Scenario fn scenario_finished( &self, + id: ScenarioId, feature: Arc, rule: Option>, is_failed: IsFailed, @@ -1633,7 +1845,7 @@ where // so we can just ignore it. drop( self.finished_sender - .unbounded_send((feature, rule, is_failed, is_retried)), + .unbounded_send((id, feature, rule, is_failed, is_retried)), ); } @@ -1665,7 +1877,7 @@ where /// [`Cucumber`]: event::Cucumber fn send_all_events( &self, - events: impl Iterator>, + events: impl IntoIterator>, ) { for v in events { // If the receiver end is dropped, then no one listens for events, @@ -1677,6 +1889,30 @@ where } } +/// ID of a [`Scenario`], uniquely identifying it. +/// +/// **NOTE**: Retried [`Scenario`] has a different ID from a failed one. +/// +/// [`Scenario`]: gherkin::Scenario +#[derive(Clone, Copy, Debug, Display, Eq, FromStr, Hash, PartialEq)] +pub struct ScenarioId(pub(crate) u64); + +impl ScenarioId { + /// Creates a new unique [`ScenarioId`]. + pub fn new() -> Self { + /// [`AtomicU64`] ID. + static ID: AtomicU64 = AtomicU64::new(0); + + Self(ID.fetch_add(1, Ordering::Relaxed)) + } +} + +impl Default for ScenarioId { + fn default() -> Self { + Self::new() + } +} + /// Stores currently running [`Rule`]s and [`Feature`]s and notifies about their /// state of completion. /// @@ -1711,6 +1947,7 @@ struct FinishedRulesAndFeatures { /// /// [`Feature`]: gherkin::Feature type FinishedFeaturesSender = mpsc::UnboundedSender<( + ScenarioId, Arc, Option>, IsFailed, @@ -1722,6 +1959,7 @@ type FinishedFeaturesSender = mpsc::UnboundedSender<( /// /// [`Feature`]: gherkin::Feature type FinishedFeaturesReceiver = mpsc::UnboundedReceiver<( + ScenarioId, Arc, Option>, IsFailed, @@ -1825,6 +2063,7 @@ impl FinishedRulesAndFeatures { &mut self, runnable: impl AsRef< [( + ScenarioId, Arc, Option>, Arc, @@ -1836,7 +2075,7 @@ impl FinishedRulesAndFeatures { let runnable = runnable.as_ref(); let mut started_features = Vec::new(); - for feature in runnable.iter().map(|(f, ..)| Arc::clone(f)).dedup() { + for feature in runnable.iter().map(|(_, f, ..)| Arc::clone(f)).dedup() { let _ = self .features_scenarios_count .entry(Arc::clone(&feature)) @@ -1849,7 +2088,7 @@ impl FinishedRulesAndFeatures { let mut started_rules = Vec::new(); for (feat, rule) in runnable .iter() - .filter_map(|(feat, rule, _, _, _)| { + .filter_map(|(_, feat, rule, _, _, _)| { rule.clone().map(|r| (Arc::clone(feat), r)) }) .dedup() @@ -1880,6 +2119,7 @@ impl FinishedRulesAndFeatures { type Scenarios = HashMap< ScenarioType, Vec<( + ScenarioId, Arc, Option>, Arc, @@ -1891,6 +2131,7 @@ type Scenarios = HashMap< type InsertedScenarios = HashMap< ScenarioType, Vec<( + ScenarioId, Arc, Option>, Arc, @@ -1946,13 +2187,14 @@ impl Features { .map(|(feat, rule, scenario)| { let retries = retry(feat, rule, scenario, cli); ( + ScenarioId::new(), Arc::new(feat.clone()), rule.map(|r| Arc::new(r.clone())), Arc::new(scenario.clone()), retries, ) }) - .into_group_map_by(|(f, r, s, _)| { + .into_group_map_by(|(_, f, r, s, _)| { which_scenario(f, r.as_ref().map(AsRef::as_ref), s) }); @@ -1972,8 +2214,11 @@ impl Features { retries: Option, ) { self.insert_scenarios( - iter::once((scenario_ty, vec![(feature, rule, scenario, retries)])) - .collect(), + iter::once(( + scenario_ty, + vec![(ScenarioId::new(), feature, rule, scenario, retries)], + )) + .collect(), ) .await; } @@ -1987,7 +2232,7 @@ impl Features { let mut with_retries = HashMap::<_, Vec<_>>::new(); let mut without_retries: Scenarios = HashMap::new(); for (which, values) in scenarios { - for (f, r, s, ret) in values { + for (id, f, r, s, ret) in values { match ret { ret @ (None | Some(RetryOptions { @@ -2000,14 +2245,14 @@ impl Features { without_retries .entry(which) .or_default() - .push((f, r, s, ret)); + .push((id, f, r, s, ret)); } Some(ret) => { let ret = ret.with_deadline(now); with_retries .entry(which) .or_default() - .push((f, r, s, ret)); + .push((id, f, r, s, ret)); } } } @@ -2017,8 +2262,8 @@ impl Features { for (which, values) in with_retries { let ty_storage = storage.entry(which).or_default(); - for (f, r, s, ret) in values { - ty_storage.insert(0, (f, r, s, Some(ret))); + for (id, f, r, s, ret) in values { + ty_storage.insert(0, (id, f, r, s, Some(ret))); } } @@ -2050,6 +2295,7 @@ impl Features { max_concurrent_scenarios: Option, ) -> ( Vec<( + ScenarioId, Arc, Option>, Arc, @@ -2058,6 +2304,7 @@ impl Features { )>, Option, ) { + use RetryOptionsWithDeadline as WithDeadline; use ScenarioType::{Concurrent, Serial}; if max_concurrent_scenarios == Some(0) { @@ -2066,14 +2313,14 @@ impl Features { let mut min_dur = None; let mut drain = - |storage: &mut Vec<(_, _, _, Option)>, + |storage: &mut Vec<(_, _, _, _, Option)>, ty, count: Option| { let mut i = 0; // TODO: Replace with `drain_filter`, once stabilized. // https://github.com/rust-lang/rust/issues/43244 let drained = - VecExt::drain_filter(storage, |(_, _, _, ret)| { + VecExt::drain_filter(storage, |(_, _, _, _, ret)| { // Because `drain_filter` runs over entire `Vec` on // `Drop`, we can't just `.take(count)`. if count.filter(|c| i >= *c).is_some() { @@ -2081,9 +2328,7 @@ impl Features { } ret.as_ref() - .and_then( - RetryOptionsWithDeadline::left_until_retry, - ) + .and_then(WithDeadline::left_until_retry) .map_or_else( || { i += 1; @@ -2097,7 +2342,9 @@ impl Features { }, ) }) - .map(|(f, r, s, ret)| (f, r, s, ty, ret.map(Into::into))) + .map(|(id, f, r, s, ret)| { + (id, f, r, s, ty, ret.map(Into::into)) + }) .collect::>(); (!drained.is_empty()).then_some(drained) }; diff --git a/src/tracing.rs b/src/tracing.rs new file mode 100644 index 00000000..7eee99e5 --- /dev/null +++ b/src/tracing.rs @@ -0,0 +1,622 @@ +//! [`tracing`] integration layer. + +use std::{collections::HashMap, fmt, io, iter, sync::Arc}; + +use futures::channel::{mpsc, oneshot}; +use itertools::Either; +use tracing::{ + field::{Field, Visit}, + span, Dispatch, Event, Span, Subscriber, +}; +use tracing_subscriber::{ + field::RecordFields, + filter::LevelFilter, + fmt::{ + format::{self, Format}, + FmtContext, FormatEvent, FormatFields, MakeWriter, + }, + layer::{self, Layer, Layered, SubscriberExt as _}, + registry::LookupSpan, + util::SubscriberInitExt as _, +}; + +use crate::{ + event::{self, HookType}, + runner::{ + self, + basic::{RetryOptions, ScenarioId}, + }, + Cucumber, Parser, Runner, ScenarioType, World, Writer, +}; + +impl + Cucumber, Wr, Cli> +where + W: World, + P: Parser, + runner::Basic: Runner, + Wr: Writer, + Cli: clap::Args, +{ + /// Initializes a global [`tracing::Subscriber`] with a default + /// [`fmt::Layer`] and [`LevelFilter::INFO`]. + /// + /// [`fmt::Layer`]: tracing_subscriber::fmt::Layer + #[must_use] + pub fn init_tracing(self) -> Self { + self.configure_and_init_tracing( + format::DefaultFields::new(), + Format::default(), + |layer| { + tracing_subscriber::registry() + .with(LevelFilter::INFO.and_then(layer)) + }, + ) + } + + /// Configures a [`fmt::Layer`], additionally wraps it (for example, into a + /// [`LevelFilter`]), and initializes as a global [`tracing::Subscriber`]. + /// + /// # Example + /// + /// ```rust + /// # use cucumber::{Cucumber, World as _}; + /// # use tracing_subscriber::{ + /// # filter::LevelFilter, + /// # fmt::format::{self, Format}, + /// # layer::SubscriberExt, + /// # Layer, + /// # }; + /// # + /// # #[derive(Debug, Default, cucumber::World)] + /// # struct World; + /// # + /// # let _ = async { + /// World::cucumber() + /// .configure_and_init_tracing( + /// format::DefaultFields::new(), + /// Format::default(), + /// |fmt_layer| { + /// tracing_subscriber::registry() + /// .with(LevelFilter::INFO.and_then(fmt_layer)) + /// }, + /// ) + /// .run_and_exit("./tests/features/doctests.feature") + /// .await + /// # }; + /// ``` + /// + /// [`fmt::Layer`]: tracing_subscriber::fmt::Layer + #[must_use] + pub fn configure_and_init_tracing( + self, + fmt_fields: Fields, + event_format: Event, + configure: Conf, + ) -> Self + where + Fields: for<'a> FormatFields<'a> + 'static, + Event: FormatEvent> + 'static, + Sub: Subscriber + for<'a> LookupSpan<'a>, + Out: Subscriber + Send + Sync, + // TODO: Replace the inner type with TAIT, once stabilized: + // https://github.com/rust-lang/rust/issues/63063 + Conf: FnOnce( + Layered< + tracing_subscriber::fmt::Layer< + Sub, + SkipScenarioIdSpan, + AppendScenarioMsg, + CollectorWriter, + >, + RecordScenarioId, + Sub, + >, + ) -> Out, + { + let (logs_sender, logs_receiver) = mpsc::unbounded(); + let (span_close_sender, span_close_receiver) = mpsc::unbounded(); + + let layer = RecordScenarioId::new(span_close_sender).and_then( + tracing_subscriber::fmt::layer() + .fmt_fields(SkipScenarioIdSpan(fmt_fields)) + .event_format(AppendScenarioMsg(event_format)) + .with_writer(CollectorWriter::new(logs_sender)), + ); + Dispatch::new(configure(layer)).init(); + + drop( + self.runner + .logs_collector + .swap(Box::new(Some(Collector::new( + logs_receiver, + span_close_receiver, + )))), + ); + + self + } +} + +/// [`HashMap`] from a [`ScenarioId`] to its [`Scenario`] and full path. +/// +/// [`Scenario`]: gherkin::Scenario +type Scenarios = HashMap< + ScenarioId, + ( + Arc, + Option>, + Arc, + Option, + ), +>; + +/// All [`Callback`]s for [`Span`]s closing events with their completion status. +type SpanEventsCallbacks = + HashMap>, IsReceived)>; + +/// Indication whether a [`Span`] closing event was received. +type IsReceived = bool; + +/// Callback for notifying a [`Runner`] about a [`Span`] being closed. +type Callback = oneshot::Sender<()>; + +/// Collector of [`tracing::Event`]s. +#[derive(Debug)] +pub(crate) struct Collector { + /// [`Scenarios`] with their IDs. + scenarios: Scenarios, + + /// Receiver of [`tracing::Event`]s messages with optional corresponding + /// [`ScenarioId`]. + logs_receiver: mpsc::UnboundedReceiver<(Option, String)>, + + /// All [`Callback`]s for [`Span`]s closing events with their completion + /// status. + span_events: SpanEventsCallbacks, + + /// Receiver of a [`Span`] closing event. + span_close_receiver: mpsc::UnboundedReceiver, + + /// Sender for subscribing to a [`Span`] closing event. + wait_span_event_sender: mpsc::UnboundedSender<(span::Id, Callback)>, + + /// Receiver for subscribing to a [`Span`] closing event. + wait_span_event_receiver: mpsc::UnboundedReceiver<(span::Id, Callback)>, +} + +impl Collector { + /// Creates a new [`tracing::Event`]s [`Collector`]. + pub(crate) fn new( + logs_receiver: mpsc::UnboundedReceiver<(Option, String)>, + span_close_receiver: mpsc::UnboundedReceiver, + ) -> Self { + let (sender, receiver) = mpsc::unbounded(); + Self { + scenarios: HashMap::new(), + logs_receiver, + span_events: HashMap::new(), + span_close_receiver, + wait_span_event_sender: sender, + wait_span_event_receiver: receiver, + } + } + + /// Creates a new [`SpanCloseWaiter`]. + pub(crate) fn scenario_span_event_waiter(&self) -> SpanCloseWaiter { + SpanCloseWaiter { + wait_span_event_sender: self.wait_span_event_sender.clone(), + } + } + + /// Starts [`Scenario`]s from the provided `runnable`. + /// + /// [`Scenario`]: gherkin::Scenario + pub(crate) fn start_scenarios( + &mut self, + runnable: impl AsRef< + [( + ScenarioId, + Arc, + Option>, + Arc, + ScenarioType, + Option, + )], + >, + ) { + for (id, f, r, s, _, ret) in runnable.as_ref() { + drop(self.scenarios.insert( + *id, + ( + Arc::clone(f), + r.as_ref().map(Arc::clone), + Arc::clone(s), + *ret, + ), + )); + } + } + + /// Marks a [`Scenario`] as finished, by its ID. + /// + /// [`Scenario`]: gherkin::Scenario + pub(crate) fn finish_scenario(&mut self, id: ScenarioId) { + drop(self.scenarios.remove(&id)); + } + + /// Returns all the emitted [`event::Scenario::Log`]s since this method was + /// last called. + /// + /// In case a received [`tracing::Event`] doesn't contain a [`Scenario`]'s + /// [`Span`], such [`tracing::Event`] will be forwarded to all active + /// [`Scenario`]s. + /// + /// [`Scenario`]: gherkin::Scenario + pub(crate) fn emitted_logs( + &mut self, + ) -> Option>> { + self.notify_about_closing_spans(); + + self.logs_receiver + .try_next() + .ok() + .flatten() + .map(|(id, msg)| { + id.and_then(|k| self.scenarios.get(&k)) + .map_or_else( + || Either::Left(self.scenarios.values()), + |p| Either::Right(iter::once(p)), + ) + .map(|(f, r, s, opt)| { + event::Cucumber::scenario( + Arc::clone(f), + r.as_ref().map(Arc::clone), + Arc::clone(s), + event::RetryableScenario { + event: event::Scenario::Log(msg.clone()), + retries: opt.map(|o| o.retries), + }, + ) + }) + .collect() + }) + } + + /// Notifies all its subscribers about closing [`Span`]s via [`Callback`]s. + fn notify_about_closing_spans(&mut self) { + if let Some(id) = self.span_close_receiver.try_next().ok().flatten() { + self.span_events.entry(id).or_default().1 = true; + } + while let Some((id, callback)) = + self.wait_span_event_receiver.try_next().ok().flatten() + { + self.span_events + .entry(id) + .or_default() + .0 + .get_or_insert(Vec::new()) + .push(callback); + } + self.span_events.retain(|_, (callbacks, is_received)| { + if callbacks.is_some() && *is_received { + for callback in callbacks + .take() + .unwrap_or_else(|| unreachable!("`callbacks.is_some()`")) + { + let _ = callback.send(()).ok(); + } + false + } else { + true + } + }); + } +} + +// We better keep this here, as it's related to `tracing` capabilities only. +#[allow(clippy::multiple_inherent_impl)] +impl ScenarioId { + /// Name of the [`ScenarioId`] [`Span`] field. + const SPAN_FIELD_NAME: &'static str = "__cucumber_scenario_id"; + + /// Creates a new [`Span`] for running a [`Scenario`] with this + /// [`ScenarioId`]. + /// + /// [`Scenario`]: gherkin::Scenario + pub(crate) fn scenario_span(self) -> Span { + // `Level::ERROR` is used to minimize the chance of the user-provided + // filter to skip it. + tracing::error_span!("scenario", __cucumber_scenario_id = self.0) + } + + /// Creates a new [`Span`] for a running [`Step`]. + /// + /// [`Step`]: gherkin::Step + #[allow(clippy::unused_self)] + pub(crate) fn step_span(self, is_background: bool) -> Span { + // `Level::ERROR` is used to minimize the chance of the user-provided + // filter to skip it. + if is_background { + tracing::error_span!("background step") + } else { + tracing::error_span!("step") + } + } + + /// Creates a new [`Span`] for running a [`Hook`]. + /// + /// [`Hook`]: event::Hook + #[allow(clippy::unused_self)] + pub(crate) fn hook_span(self, hook_ty: HookType) -> Span { + // `Level::ERROR` is used to minimize the chance of the user-provided + // filter to skip it. + match hook_ty { + HookType::Before => tracing::error_span!("before hook"), + HookType::After => tracing::error_span!("after hook"), + } + } +} + +/// Waiter for a particular [`Span`] to be closed, wich is required because a +/// [`CollectorWriter`] can notify about an [`event::Scenario::Log`] after a +/// [`Scenario`]/[`Step`] is considered [`Finished`] already, due to +/// implementation details of a [`Subscriber`]. +/// +/// [`Finished`]: event::Scenario::Finished +/// [`Scenario`]: gherkin::Scenario +/// [`Step`]: gherkin::Step +#[derive(Clone, Debug)] +pub(crate) struct SpanCloseWaiter { + /// Sender for subscribing to the [`Span`] closing. + wait_span_event_sender: mpsc::UnboundedSender<(span::Id, Callback)>, +} + +impl SpanCloseWaiter { + /// Waits for the [`Span`] being closed. + pub(crate) async fn wait_for_span_close(&self, id: span::Id) { + let (sender, receiver) = oneshot::channel(); + let _ = self + .wait_span_event_sender + .unbounded_send((id, sender)) + .ok(); + let _ = receiver.await.ok(); + } +} + +/// [`Layer`] recording a [`ScenarioId`] into [`Span`]'s [`Extensions`]. +/// +/// [`Extensions`]: tracing_subscriber::registry::Extensions +#[derive(Debug)] +pub struct RecordScenarioId { + /// Sender for [`Span`] closing events. + span_close_sender: mpsc::UnboundedSender, +} + +impl RecordScenarioId { + /// Creates a new [`RecordScenarioId`] [`Layer`]. + const fn new(span_close_sender: mpsc::UnboundedSender) -> Self { + Self { span_close_sender } + } +} + +impl Layer for RecordScenarioId +where + S: for<'a> LookupSpan<'a> + Subscriber, +{ + fn on_new_span( + &self, + attr: &span::Attributes<'_>, + id: &span::Id, + ctx: layer::Context<'_, S>, + ) { + if let Some(span) = ctx.span(id) { + let mut visitor = GetScenarioId(None); + attr.values().record(&mut visitor); + + if let Some(scenario_id) = visitor.0 { + let mut ext = span.extensions_mut(); + let _ = ext.replace(scenario_id); + } + } + } + + fn on_record( + &self, + id: &span::Id, + values: &span::Record<'_>, + ctx: layer::Context<'_, S>, + ) { + if let Some(span) = ctx.span(id) { + let mut visitor = GetScenarioId(None); + values.record(&mut visitor); + + if let Some(scenario_id) = visitor.0 { + let mut ext = span.extensions_mut(); + let _ = ext.replace(scenario_id); + } + } + } + + fn on_close(&self, id: span::Id, _ctx: layer::Context<'_, S>) { + let _ = self.span_close_sender.unbounded_send(id).ok(); + } +} + +/// [`Visit`]or extracting a [`ScenarioId`] from a +/// [`ScenarioId::SPAN_FIELD_NAME`]d [`Field`], in case it's present. +#[derive(Debug)] +struct GetScenarioId(Option); + +impl Visit for GetScenarioId { + fn record_u64(&mut self, field: &Field, value: u64) { + if field.name() == ScenarioId::SPAN_FIELD_NAME { + self.0 = Some(ScenarioId(value)); + } + } + + fn record_debug(&mut self, _: &Field, _: &dyn fmt::Debug) {} +} + +/// [`FormatFields`] wrapper skipping [`Span`]s with a [`ScenarioId`]. +#[derive(Debug)] +pub struct SkipScenarioIdSpan(pub F); + +impl<'w, F: FormatFields<'w>> FormatFields<'w> for SkipScenarioIdSpan { + fn format_fields( + &self, + writer: format::Writer<'w>, + fields: R, + ) -> fmt::Result { + let mut is_scenario_span = IsScenarioIdSpan(false); + fields.record(&mut is_scenario_span); + if !is_scenario_span.0 { + self.0.format_fields(writer, fields)?; + } + Ok(()) + } +} + +/// [`Visit`]or checking whether a [`Span`] has a [`Field`] with the +/// [`ScenarioId::SPAN_FIELD_NAME`]. +#[derive(Debug)] +struct IsScenarioIdSpan(bool); + +impl Visit for IsScenarioIdSpan { + fn record_debug(&mut self, field: &Field, _: &dyn fmt::Debug) { + if field.name() == ScenarioId::SPAN_FIELD_NAME { + self.0 = true; + } + } +} + +/// [`FormatEvent`] wrapper, appending [`tracing::Event`]s with some markers, +/// to parse them later and retrieve optional [`ScenarioId`]. +/// +/// [`Scenario`]: gherkin::Scenario +#[derive(Debug)] +pub struct AppendScenarioMsg(pub F); + +impl FormatEvent for AppendScenarioMsg +where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> FormatFields<'a> + 'static, + F: FormatEvent, +{ + fn format_event( + &self, + ctx: &FmtContext<'_, S, N>, + mut writer: format::Writer<'_>, + event: &Event<'_>, + ) -> fmt::Result { + self.0.format_event(ctx, writer.by_ref(), event)?; + + if let Some(scenario_id) = ctx.event_scope().and_then(|scope| { + scope + .from_root() + .find_map(|span| span.extensions().get::().copied()) + }) { + writer.write_fmt(format_args!( + "{}{scenario_id}", + suffix::BEFORE_SCENARIO_ID, + ))?; + } else { + writer.write_fmt(format_args!("{}", suffix::NO_SCENARIO_ID))?; + } + writer.write_fmt(format_args!("{}", suffix::END)) + } +} + +mod suffix { + //! [`str`]ings appending [`tracing::Event`]s to separate them later. + //! + //! Every [`tracing::Event`] ends with: + //! + //! ([`BEFORE_SCENARIO_ID`][`ScenarioId`][`END`]|[`NO_SCENARIO_ID`][`END`]) + //! + //! [`ScenarioId`]: super::ScenarioId + + /// End of a [`tracing::Event`] message. + pub(crate) const END: &str = "__cucumber__scenario"; + + /// Separator before a [`ScenarioId`]. + /// + /// [`ScenarioId`]: super::ScenarioId + pub(crate) const BEFORE_SCENARIO_ID: &str = "__"; + + /// Separator in case there is no [`ScenarioId`]. + /// + /// [`ScenarioId`]: super::ScenarioId + pub(crate) const NO_SCENARIO_ID: &str = "__unknown"; +} + +/// [`io::Write`]r sending [`tracing::Event`]s to a `Collector`. +#[derive(Clone, Debug)] +pub struct CollectorWriter { + /// Sender for notifying the [`Collector`] about [`tracing::Event`]s via. + sender: mpsc::UnboundedSender<(Option, String)>, +} + +impl CollectorWriter { + /// Creates a new [`CollectorWriter`]. + const fn new( + sender: mpsc::UnboundedSender<(Option, String)>, + ) -> Self { + Self { sender } + } +} + +impl<'a> MakeWriter<'a> for CollectorWriter { + type Writer = Self; + + fn make_writer(&'a self) -> Self::Writer { + self.clone() + } +} + +impl io::Write for CollectorWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + // Although this is not documented explicitly anywhere, `io::Write`rs + // inside `tracing::fmt::Layer` always receives fully formatted messages + // at once, not by parts. + // Inside docs of `fmt::Layer::with_writer()`, a non-locked `io::stderr` + // is passed as an `io::Writer`. So, if this guarantee fails, parts of + // log messages will be able to interleave each other, making the result + // unreadable. + let msgs = String::from_utf8_lossy(buf); + for msg in msgs.split_terminator(suffix::END) { + if let Some((before, after)) = + msg.rsplit_once(suffix::NO_SCENARIO_ID) + { + if !after.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "wrong separator", + )); + } + let _ = + self.sender.unbounded_send((None, before.to_owned())).ok(); + } else if let Some((before, after)) = + msg.rsplit_once(suffix::BEFORE_SCENARIO_ID) + { + let scenario_id = after.parse().map_err(|e| { + io::Error::new(io::ErrorKind::InvalidData, e) + })?; + let _ = self + .sender + .unbounded_send((Some(scenario_id), before.to_owned())) + .ok(); + } else { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "missing separator", + )); + } + } + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} diff --git a/src/writer/basic.rs b/src/writer/basic.rs index bbfdca17..cf945f6a 100644 --- a/src/writer/basic.rs +++ b/src/writer/basic.rs @@ -123,6 +123,11 @@ pub struct Basic { /// Number of lines to clear. lines_to_clear: usize, + /// Buffer to be re-output after [`clear_last_lines_if_term_present()`][0]. + /// + /// [0]: Self::clear_last_lines_if_term_present + re_output_after_clear: String, + /// [`Verbosity`] of this [`Writer`]. verbosity: Verbosity, } @@ -224,6 +229,7 @@ impl Basic { styles: Styles::new(), indent: 0, lines_to_clear: 0, + re_output_after_clear: String::new(), verbosity: verbosity.into(), }; basic.apply_cli(Cli { @@ -248,6 +254,8 @@ impl Basic { fn clear_last_lines_if_term_present(&mut self) -> io::Result<()> { if self.styles.is_present && self.lines_to_clear > 0 { self.output.clear_last_lines(self.lines_to_clear)?; + self.output.write_str(&self.re_output_after_clear)?; + self.re_output_after_clear.clear(); self.lines_to_clear = 0; } Ok(()) @@ -371,10 +379,18 @@ impl Basic { Scenario::Finished => { self.indent = self.indent.saturating_sub(2); } + Scenario::Log(msg) => self.emit_log(msg)?, } Ok(()) } + /// Outputs the [`event::Scenario::Log`]. + pub(crate) fn emit_log(&mut self, msg: impl AsRef) -> io::Result<()> { + self.lines_to_clear += msg.as_ref().lines().count(); + self.re_output_after_clear.push_str(msg.as_ref()); + self.output.write_str(msg) + } + /// Outputs the [failed] [`Scenario`]'s hook. /// /// [failed]: event::Hook::Failed @@ -534,7 +550,7 @@ impl Basic { .unwrap_or_default(), indent = " ".repeat(self.indent), ); - self.lines_to_clear = output.lines().count(); + self.lines_to_clear += output.lines().count(); self.write_line(&output)?; } Ok(()) @@ -809,7 +825,7 @@ impl Basic { .unwrap_or_default(), indent = " ".repeat(self.indent.saturating_sub(2)), ); - self.lines_to_clear = output.lines().count(); + self.lines_to_clear += output.lines().count(); self.write_line(&output)?; } Ok(()) diff --git a/src/writer/json.rs b/src/writer/json.rs index 341cc8f4..b45cae58 100644 --- a/src/writer/json.rs +++ b/src/writer/json.rs @@ -189,7 +189,8 @@ impl Json { feature, rule, scenario, "scenario", &st, ev, meta, ); } - Scenario::Started | Scenario::Finished => {} + // TODO: Report logs for each `Scenario`. + Scenario::Started | Scenario::Finished | Scenario::Log(_) => {} } } diff --git a/src/writer/junit.rs b/src/writer/junit.rs index 0d6422bc..341c5e6d 100644 --- a/src/writer/junit.rs +++ b/src/writer/junit.rs @@ -311,6 +311,8 @@ impl JUnit { }) .add_testcase(case); } + // TODO: Report logs for each `Scenario`. + Scenario::Log(_) => {} } } @@ -393,6 +395,8 @@ impl JUnit { sc.name, ); } + // TODO: Report logs for each `Scenario`. + Scenario::Log(_) => unreachable!(), }; // We should be passing normalized events here, diff --git a/src/writer/libtest.rs b/src/writer/libtest.rs index f76b098f..204851e4 100644 --- a/src/writer/libtest.rs +++ b/src/writer/libtest.rs @@ -479,6 +479,16 @@ impl Libtest { Scenario::Step(step, ev) => self.expand_step_event( feature, rule, scenario, &step, ev, retries, false, cli, ), + // We do use `print!()` intentionally here to support `libtest` + // output capturing properly, which can only capture output from + // the standard library’s `print!()` macro. + // This is the same as `tracing_subscriber::fmt::TestWriter` does + // (check its documentation for details). + #[allow(clippy::print_stdout)] + Scenario::Log(msg) => { + print!("{msg}"); + vec![] + } } } diff --git a/src/writer/summarize.rs b/src/writer/summarize.rs index 217264fc..6d7c13ca 100644 --- a/src/writer/summarize.rs +++ b/src/writer/summarize.rs @@ -417,7 +417,8 @@ impl Summarize { let ret = ev.retries; match &ev.event { Scenario::Started - | Scenario::Hook(_, Hook::Passed | Hook::Started) => {} + | Scenario::Hook(_, Hook::Passed | Hook::Started) + | Scenario::Log(_) => {} Scenario::Hook(_, Hook::Failed(..)) => { // - If Scenario's last Step failed and then After Hook failed // too, we don't need to track second failure; diff --git a/tests/after_hook.rs b/tests/after_hook.rs index 65550086..31d1ecc3 100644 --- a/tests/after_hook.rs +++ b/tests/after_hook.rs @@ -53,6 +53,7 @@ async fn fires_each_time() { }) .fail_on_skipped() .with_default_cli() + .max_concurrent_scenarios(1) .run_and_exit("tests/features/wait"); let err = AssertUnwindSafe(res) diff --git a/tests/features/tracing/.feature b/tests/features/tracing/.feature new file mode 100644 index 00000000..1fcb5f82 --- /dev/null +++ b/tests/features/tracing/.feature @@ -0,0 +1,15 @@ +Feature: Basic + + @serial + Scenario: deny skipped + Given step 1 + + Scenario Outline: steps + Given step + + Examples: + | step | + | 2 | + | 3 | + | 4 | + | 5 | diff --git a/tests/features/tracing/correct.stdout b/tests/features/tracing/correct.stdout new file mode 100644 index 00000000..c8e97d97 --- /dev/null +++ b/tests/features/tracing/correct.stdout @@ -0,0 +1,25 @@ +Feature: Basic + Scenario: deny skipped + INFO tracing: not in span: 0 + INFO scenario:step: tracing: in span: 1 + ✔ Given step 1 + Scenario Outline: steps + INFO tracing: not in span: 1 + INFO tracing: not in span: 2 + INFO scenario:step: tracing: in span: 2 + ✔ Given step 2 + Scenario Outline: steps + INFO tracing: not in span: 1 + INFO tracing: not in span: 2 + INFO scenario:step: tracing: in span: 3 + ✔ Given step 3 + Scenario Outline: steps + INFO tracing: not in span: 1 + INFO tracing: not in span: 2 + INFO scenario:step: tracing: in span: 4 + ✔ Given step 4 + Scenario Outline: steps + INFO tracing: not in span: 1 + INFO tracing: not in span: 2 + INFO scenario:step: tracing: in span: 5 + ✔ Given step 5 diff --git a/tests/tracing.rs b/tests/tracing.rs new file mode 100644 index 00000000..c24dd5fb --- /dev/null +++ b/tests/tracing.rs @@ -0,0 +1,78 @@ +use std::{fs, io, panic::AssertUnwindSafe, time::Duration}; + +use cucumber::{given, writer, writer::Coloring, World as _, WriterExt as _}; +use derive_more::Display; +use futures::FutureExt as _; +use regex::Regex; +use tokio::{spawn, time}; +use tracing_subscriber::{ + filter::LevelFilter, + fmt::format::{DefaultFields, Format}, + layer::SubscriberExt as _, + Layer, +}; + +#[tokio::main] +async fn main() { + spawn(async { + let mut id = 0; + loop { + time::sleep(Duration::from_millis(600)).await; + tracing::info!("not in span: {id}"); + id += 1; + } + }); + + let mut out = Vec::::new(); + + let res = World::cucumber() + .with_writer( + writer::Basic::raw(&mut out, Coloring::Never, 0) + .discard_stats_writes() + .tee::( + writer::Basic::raw(io::stdout(), Coloring::Never, 0) + .summarized(), + ) + .normalized(), + ) + .fail_on_skipped() + .with_default_cli() + .configure_and_init_tracing( + DefaultFields::new(), + Format::default().with_ansi(false).without_time(), + |layer| { + tracing_subscriber::registry() + .with(LevelFilter::INFO.and_then(layer)) + }, + ) + .run_and_exit("tests/features/tracing"); + + AssertUnwindSafe(res).catch_unwind().await.unwrap(); + + // Required to strip out non-deterministic parts of output, so we could + // compare them well. + let non_deterministic = Regex::new( + " ([^\"\\n\\s]*)[/\\\\]([A-z1-9-_]*)\\.(feature|rs)(:\\d+:\\d+)?\ + |\\s?\n", + ) + .unwrap(); + + assert_eq!( + non_deterministic + .replace_all(String::from_utf8_lossy(&out).as_ref(), ""), + non_deterministic.replace_all( + &fs::read_to_string("tests/features/tracing/correct.stdout") + .unwrap(), + "", + ), + ); +} + +#[given(regex = "step (\\d+)")] +async fn step(_: &mut World, n: usize) { + time::sleep(Duration::from_secs(1)).await; + tracing::info!("in span: {n:?}"); +} + +#[derive(Clone, cucumber::World, Debug, Default, Display)] +struct World;