From 838cb92811045bb1818dd5cbf78126b7ebaa67c8 Mon Sep 17 00:00:00 2001 From: zhyass Date: Mon, 20 Jan 2025 14:40:20 +0800 Subject: [PATCH] fix test --- .../transforms/transform_compact_builder.rs | 29 ++++++++++++++++++- .../transform_compact_no_split_builder.rs | 1 - .../processors/transforms/transform_dummy.rs | 1 - 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs index e31442bdc657c..6c77659ef3cfd 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs @@ -12,23 +12,31 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use databend_common_exception::Result; use databend_common_expression::BlockThresholds; use databend_common_expression::Column; use databend_common_expression::DataBlock; use databend_common_expression::Value; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_pipeline_core::Pipeline; use crate::processors::AccumulatingTransform; use crate::processors::BlockCompactMeta; use crate::processors::TransformCompactBlock; use crate::processors::TransformPipelineHelper; +use crate::Transform; +use crate::Transformer; pub fn build_compact_block_pipeline( pipeline: &mut Pipeline, thresholds: BlockThresholds, ) -> Result<()> { let output_len = pipeline.output_len(); + pipeline.add_transform(ConvertToFullTransform::create)?; pipeline.try_resize(1)?; pipeline.add_accumulating_transformer(|| BlockCompactBuilder::new(thresholds)); pipeline.try_resize(output_len)?; @@ -36,6 +44,26 @@ pub fn build_compact_block_pipeline( Ok(()) } +pub(crate) struct ConvertToFullTransform; + +impl ConvertToFullTransform { + pub(crate) fn create(input: Arc, output: Arc) -> Result { + Ok(ProcessorPtr::create(Transformer::create( + input, + output, + ConvertToFullTransform {}, + ))) + } +} + +impl Transform for ConvertToFullTransform { + const NAME: &'static str = "ConvertToFullTransform"; + + fn transform(&mut self, data: DataBlock) -> Result { + Ok(data.consume_convert_to_full()) + } +} + pub struct BlockCompactBuilder { thresholds: BlockThresholds, // Holds blocks that are partially accumulated but haven't reached the threshold. @@ -64,7 +92,6 @@ impl AccumulatingTransform for BlockCompactBuilder { const NAME: &'static str = "BlockCompactBuilder"; fn transform(&mut self, data: DataBlock) -> Result> { - let data = data.consume_convert_to_full(); let num_rows = data.num_rows(); let num_bytes = memory_size(&data); diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs index 9773e0375b1be..adcd014acb56f 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs @@ -78,7 +78,6 @@ impl AccumulatingTransform for BlockCompactNoSplitBuilder { const NAME: &'static str = "BlockCompactNoSplitBuilder"; fn transform(&mut self, data: DataBlock) -> Result> { - let data = data.consume_convert_to_full(); self.accumulated_rows += data.num_rows(); self.accumulated_bytes += crate::processors::memory_size(&data); if !self diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_dummy.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_dummy.rs index 1dc6ee7d0d39a..17318b7716801 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_dummy.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_dummy.rs @@ -27,7 +27,6 @@ use crate::processors::transforms::Transformer; pub struct TransformDummy; impl TransformDummy { - #[allow(dead_code)] pub fn create(input: Arc, output: Arc) -> ProcessorPtr { ProcessorPtr::create(Transformer::create(input, output, TransformDummy {})) }