Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Jan 20, 2025
1 parent adc493b commit 838cb92
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,58 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::BlockThresholds;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_expression::Value;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_core::Pipeline;

use crate::processors::AccumulatingTransform;
use crate::processors::BlockCompactMeta;
use crate::processors::TransformCompactBlock;
use crate::processors::TransformPipelineHelper;
use crate::Transform;
use crate::Transformer;

pub fn build_compact_block_pipeline(
pipeline: &mut Pipeline,
thresholds: BlockThresholds,
) -> Result<()> {
let output_len = pipeline.output_len();
pipeline.add_transform(ConvertToFullTransform::create)?;
pipeline.try_resize(1)?;
pipeline.add_accumulating_transformer(|| BlockCompactBuilder::new(thresholds));
pipeline.try_resize(output_len)?;
pipeline.add_block_meta_transformer(TransformCompactBlock::default);
Ok(())
}

pub(crate) struct ConvertToFullTransform;

impl ConvertToFullTransform {
pub(crate) fn create(input: Arc<InputPort>, output: Arc<OutputPort>) -> Result<ProcessorPtr> {
Ok(ProcessorPtr::create(Transformer::create(
input,
output,
ConvertToFullTransform {},
)))
}
}

impl Transform for ConvertToFullTransform {
const NAME: &'static str = "ConvertToFullTransform";

fn transform(&mut self, data: DataBlock) -> Result<DataBlock> {
Ok(data.consume_convert_to_full())
}
}

pub struct BlockCompactBuilder {
thresholds: BlockThresholds,
// Holds blocks that are partially accumulated but haven't reached the threshold.
Expand Down Expand Up @@ -64,7 +92,6 @@ impl AccumulatingTransform for BlockCompactBuilder {
const NAME: &'static str = "BlockCompactBuilder";

fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
let data = data.consume_convert_to_full();
let num_rows = data.num_rows();
let num_bytes = memory_size(&data);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ impl AccumulatingTransform for BlockCompactNoSplitBuilder {
const NAME: &'static str = "BlockCompactNoSplitBuilder";

fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
let data = data.consume_convert_to_full();
self.accumulated_rows += data.num_rows();
self.accumulated_bytes += crate::processors::memory_size(&data);
if !self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use crate::processors::transforms::Transformer;
pub struct TransformDummy;

impl TransformDummy {
#[allow(dead_code)]
pub fn create(input: Arc<InputPort>, output: Arc<OutputPort>) -> ProcessorPtr {
ProcessorPtr::create(Transformer::create(input, output, TransformDummy {}))
}
Expand Down

0 comments on commit 838cb92

Please sign in to comment.