diff --git a/audio_processor/src/cdcfg.rs b/audio_processor/src/cdcfg.rs index 485a25ed..2c1544f8 100644 --- a/audio_processor/src/cdcfg.rs +++ b/audio_processor/src/cdcfg.rs @@ -165,6 +165,7 @@ fn resolve( .inner_block_size .try_into() .context("wrap_chunk inner_block_size")?, + disallow_hoisting: false, }, Resample(resample) => Processor::Resample { output_frame_rate: resample @@ -334,7 +335,8 @@ mod tests { inner: Box::new(Processor::Resample { output_frame_rate: 24000 }), - inner_block_size: 10 + inner_block_size: 10, + disallow_hoisting: false, }, Processor::Plugin { path: PathBuf::from("foo"), diff --git a/audio_processor/src/config.rs b/audio_processor/src/config.rs index 5036d46e..42bc619e 100644 --- a/audio_processor/src/config.rs +++ b/audio_processor/src/config.rs @@ -40,6 +40,10 @@ pub enum Processor { WrapChunk { inner: Box, inner_block_size: usize, + /// Prevents merging with the outer pipeline even if they have the same block size. + /// Used when the outer pipeline doesn't actually have a stable block size and the + /// ChunkWrapper is used for regulating it. + disallow_hoisting: bool, }, Resample { output_frame_rate: usize, @@ -208,8 +212,9 @@ impl PipelineBuilder { WrapChunk { inner, inner_block_size, + disallow_hoisting, } => { - if self.output_format().block_size == inner_block_size { + if self.output_format().block_size == inner_block_size && !disallow_hoisting { self.add(*inner).context("inner")?; } else { let inner_pipeline = self @@ -336,6 +341,7 @@ mod tests { WrapChunk { inner: Box::new(Negate), inner_block_size: 1, + disallow_hoisting: false, }, WavSink { path: tempdir.path().join("2.wav"), @@ -353,12 +359,14 @@ mod tests { ], }), inner_block_size: 2, + disallow_hoisting: false, }, WrapChunk { inner_block_size: 5, // Same block size, should pass through. inner: Box::new(WavSink { path: tempdir.path().join("5.wav"), }), + disallow_hoisting: false, }, Resample { output_frame_rate: 48000, @@ -725,6 +733,7 @@ mod bazel_tests { ], }), inner_block_size: 4096, + disallow_hoisting: false, }, ], }) diff --git a/cras/server/processor/BUILD.bazel b/cras/server/processor/BUILD.bazel index 247262f9..94a42ca8 100644 --- a/cras/server/processor/BUILD.bazel +++ b/cras/server/processor/BUILD.bazel @@ -61,7 +61,6 @@ cc_test( deps = [ ":cc", "@pkg_config//gtest", - "@pkg_config//gtest_main", ], ) diff --git a/cras/server/processor/cras_processor_test.cc b/cras/server/processor/cras_processor_test.cc index 091592e0..309a54ff 100644 --- a/cras/server/processor/cras_processor_test.cc +++ b/cras/server/processor/cras_processor_test.cc @@ -2,6 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include #include #include "audio_processor/c/plugin_processor.h" @@ -81,6 +82,7 @@ TEST_P(CrasProcessor, Simple) { .block_size = 480, .frame_rate = 48000, .effect = GetParam().effect, + // wrap_mode is implicitly WrapModeNone. }; auto r = cras_processor_create(&cfg, GetParam().apm); @@ -117,3 +119,62 @@ TEST_P(CrasProcessor, Simple) { processor->ops->destroy(processor); } + +TEST_P(CrasProcessor, Negate) { + CrasProcessorConfig cfg = { + .channels = 2, + .block_size = 2, + .frame_rate = 48000, + .effect = GetParam().effect, + .wrap_mode = WrapModeChunk, + }; + const float m = GetParam().expected_output_mult; + auto r = cras_processor_create(&cfg, GetParam().apm); + plugin_processor* processor = r.plugin_processor; + ASSERT_EQ(r.effect, cfg.effect); + ASSERT_THAT(processor, testing::NotNull()); + + { + std::vector ch0 = {1, 2, 3}; + std::vector ch1 = {4, 5, 6}; + multi_slice input = { + .channels = 2, + .num_frames = ch0.size(), + .data = {ch0.data(), ch1.data()}, + }; + multi_slice output = {}; + ASSERT_EQ(processor->ops->run(processor, &input, &output), StatusOk); + ASSERT_EQ(output.channels, 2); + ASSERT_EQ(output.num_frames, 3); + EXPECT_THAT(std::span(output.data[0], output.num_frames), + testing::ElementsAre(m * 0, m * 0, m * 1)); + EXPECT_THAT(std::span(output.data[1], output.num_frames), + testing::ElementsAre(m * 0, m * 0, m * 4)); + } + + { + std::vector ch0 = {7, 8, 9, 10}; + std::vector ch1 = {11, 12, 13, 14}; + multi_slice input = { + .channels = 2, + .num_frames = ch0.size(), + .data = {ch0.data(), ch1.data()}, + }; + multi_slice output = {}; + ASSERT_EQ(processor->ops->run(processor, &input, &output), StatusOk); + ASSERT_EQ(output.channels, 2); + ASSERT_EQ(output.num_frames, 4); + EXPECT_THAT(std::span(output.data[0], output.num_frames), + testing::ElementsAre(m * 2, m * 3, m * 7, m * 8)); + EXPECT_THAT(std::span(output.data[1], output.num_frames), + testing::ElementsAre(m * 5, m * 6, m * 11, m * 12)); + } + + processor->ops->destroy(processor); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + cras_rust_init_logging(); + return RUN_ALL_TESTS(); +} diff --git a/cras/server/processor/processor.h b/cras/server/processor/processor.h index 261de04e..bcd62e0a 100644 --- a/cras/server/processor/processor.h +++ b/cras/server/processor/processor.h @@ -20,6 +20,20 @@ extern "C" { #include "audio_processor/c/plugin_processor.h" #include "cras/common/rust_common.h" +enum CrasProcessorWrapMode { + WrapModeNone, + /** + * Run the processor pipeline in a separate, dedicated thread. + */ + WrapModeDedicatedThread, + /** + * Run the processor pipeline with a ChunkWrapper. + * In this mode, the caller is allowed to run the pipeline with a block + * size that is different from `CrasProcessorConfig::block_size` + */ + WrapModeChunk, +}; + struct CrasProcessorCreateResult { /** * The created processor. @@ -37,7 +51,7 @@ struct CrasProcessorConfig { size_t block_size; size_t frame_rate; enum CrasProcessorEffect effect; - bool dedicated_thread; + enum CrasProcessorWrapMode wrap_mode; bool wav_dump; }; diff --git a/cras/server/processor/src/lib.rs b/cras/server/processor/src/lib.rs index 4e9a8874..d5d604c6 100644 --- a/cras/server/processor/src/lib.rs +++ b/cras/server/processor/src/lib.rs @@ -32,6 +32,18 @@ use cras_s2::global::cras_s2_get_cras_processor_vars; mod processor_override; mod proto; +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub enum CrasProcessorWrapMode { + WrapModeNone, + /// Run the processor pipeline in a separate, dedicated thread. + WrapModeDedicatedThread, + /// Run the processor pipeline with a ChunkWrapper. + /// In this mode, the caller is allowed to run the pipeline with a block + /// size that is different from `CrasProcessorConfig::block_size` + WrapModeChunk, +} + #[repr(C)] #[derive(Clone, Debug)] pub struct CrasProcessorConfig { @@ -42,8 +54,7 @@ pub struct CrasProcessorConfig { effect: CrasProcessorEffect, - // Run the processor pipeline in a separate, dedicated thread. - dedicated_thread: bool, + wrap_mode: CrasProcessorWrapMode, // Enable processing dumps as WAVE files. wav_dump: bool, @@ -226,6 +237,7 @@ impl CrasProcessor { block_size => Processor::WrapChunk { inner_block_size: block_size as usize, inner: Box::new(plugin), + disallow_hoisting: false, }, }); } @@ -249,11 +261,21 @@ impl CrasProcessor { }); } - let decl_debug = format!("{decl:?}"); + let pipeline = if matches!(config.wrap_mode, CrasProcessorWrapMode::WrapModeChunk) { + Processor::WrapChunk { + inner: Box::new(Processor::Pipeline { processors: decl }), + inner_block_size: config.block_size, + disallow_hoisting: true, + } + } else { + Processor::Pipeline { processors: decl } + }; + + let decl_debug = format!("{pipeline:?}"); let pipeline = PipelineBuilder::new(config.format()) // TODO(b/349784210): Use a hardened worker factory. .with_worker_factory(AudioWorkerSubprocessFactory::default().with_set_thread_priority()) - .build(Processor::Pipeline { processors: decl }) + .build(pipeline) .context("failed to build pipeline")?; log::info!("CrasProcessor #{id} created with: {config:?}"); @@ -345,11 +367,7 @@ pub unsafe extern "C" fn cras_processor_create( CrasProcessor::new( CrasProcessorConfig { effect: CrasProcessorEffect::NoEffects, - channels: config.channels, - block_size: config.block_size, - frame_rate: config.frame_rate, - dedicated_thread: config.dedicated_thread, - wav_dump: config.wav_dump, + ..config }, // apm_processor was consumed so create it again. create_apm_processor(&config, apm_plugin_processor).expect("create_apm_processor should not fail given that we created it successfully once"), @@ -359,7 +377,10 @@ pub unsafe extern "C" fn cras_processor_create( }; let effect = processor.config.effect; - let plugin_processor = if config.dedicated_thread { + let plugin_processor = if matches!( + config.wrap_mode, + CrasProcessorWrapMode::WrapModeDedicatedThread + ) { let threaded_processor = ThreadedProcessor::new(processor, 1); export_plugin(threaded_processor) } else { diff --git a/cras/src/server/cras_stream_apm.c b/cras/src/server/cras_stream_apm.c index a8919234..93f753b1 100644 --- a/cras/src/server/cras_stream_apm.c +++ b/cras/src/server/cras_stream_apm.c @@ -782,10 +782,11 @@ struct cras_apm* cras_stream_apm_add(struct cras_stream_apm* stream, .block_size = frame_length, .frame_rate = apm->fmt.frame_rate, .effect = cp_effect, - .dedicated_thread = - cp_effect == NoEffects - ? false - : cras_feature_enabled(CrOSLateBootCrasProcessorDedicatedThread), + .wrap_mode = + cp_effect != NoEffects && + cras_feature_enabled(CrOSLateBootCrasProcessorDedicatedThread) + ? WrapModeDedicatedThread + : WrapModeNone, .wav_dump = cras_feature_enabled(CrOSLateBootCrasProcessorWavDump), }; struct CrasProcessorCreateResult cras_processor_create_result =