From 0da3ff4aa0ef8cbc7305d64b215cd482da97e399 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 14 Sep 2022 20:54:45 +0800 Subject: [PATCH] support multi partition columns --- .../PartitionColumnFillingTransform.cpp | 84 +++++++++++-------- .../PartitionColumnFillingTransform.h | 15 ++-- .../Parser/SerializedPlanParser.cpp | 33 +++++--- 3 files changed, 79 insertions(+), 53 deletions(-) diff --git a/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp b/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp index d273bee815da..4ff597dacdf1 100644 --- a/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp +++ b/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp @@ -6,12 +6,16 @@ #include #include #include +#include "Processors/Chunk.h" #include #include #include #include #include +#include +#include + using namespace DB; @@ -26,40 +30,37 @@ namespace ErrorCodes namespace local_engine { template -requires( - std::is_same_v || std::is_same_v || std::is_same_v || std::is_same_v || std::is_same_v) - ColumnPtr createIntPartitionColumn(DataTypePtr column_type, std::string partition_value) + requires(std::is_same_v || std::is_same_v || std::is_same_v || std::is_same_v || std::is_same_v) +ColumnPtr createIntPartitionColumn(DataTypePtr column_type, std::string partition_value, size_t rows) { Type value; auto value_buffer = ReadBufferFromString(partition_value); readIntText(value, value_buffer); - return column_type->createColumnConst(1, value); + return column_type->createColumnConst(rows, value); } template -requires(std::is_same_v || std::is_same_v) ColumnPtr - createFloatPartitionColumn(DataTypePtr column_type, std::string partition_value) + requires(std::is_same_v || std::is_same_v) +ColumnPtr createFloatPartitionColumn(DataTypePtr column_type, std::string partition_value, size_t rows) { Type value; auto value_buffer = ReadBufferFromString(partition_value); readFloatText(value, value_buffer); - return 
column_type->createColumnConst(1, value); + return column_type->createColumnConst(rows, value); } -//template <> -//ColumnPtr createFloatPartitionColumn(DataTypePtr column_type, std::string partition_value); -//template <> -//ColumnPtr createFloatPartitionColumn(DataTypePtr column_type, std::string partition_value); PartitionColumnFillingTransform::PartitionColumnFillingTransform( - const DB::Block & input_, const DB::Block & output_, const String & partition_col_name_, const String & partition_col_value_) - : ISimpleTransform(input_, output_, true), partition_col_name(partition_col_name_), partition_col_value(partition_col_value_) + const DB::Block & input_, const DB::Block & output_, const PartitionValues & partition_columns_) + : ISimpleTransform(input_, output_, true), partition_column_values(partition_columns_) { - partition_col_type = output_.getByName(partition_col_name_).type; - partition_column = createPartitionColumn(); + for (const auto & value : partition_column_values) + { + partition_columns[value.first] = value.second; + } } -/// In the case that a partition column is wrapper by nullable and LowCardinality, we need to keep the data type same +/// In the case that a partition column is wrapped by Nullable or LowCardinality, we need to keep the data type the same /// as input. 
ColumnPtr PartitionColumnFillingTransform::tryWrapPartitionColumn(const ColumnPtr & nested_col, DataTypePtr original_data_type) { @@ -71,9 +72,10 @@ ColumnPtr PartitionColumnFillingTransform::tryWrapPartitionColumn(const ColumnPt return result; } -ColumnPtr PartitionColumnFillingTransform::createPartitionColumn() +ColumnPtr PartitionColumnFillingTransform::createPartitionColumn(const String & parition_col, const String & partition_col_value, size_t rows) { ColumnPtr result; + auto partition_col_type = output.getHeader().getByName(parition_col).type; DataTypePtr nested_type = partition_col_type; if (const DataTypeNullable * nullable_type = checkAndGetDataType(partition_col_type.get())) { @@ -86,45 +88,45 @@ ColumnPtr PartitionColumnFillingTransform::createPartitionColumn() WhichDataType which(nested_type); if (which.isInt8()) { - result = createIntPartitionColumn(nested_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value, rows); } else if (which.isInt16()) { - result = createIntPartitionColumn(nested_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value, rows); } else if (which.isInt32()) { - result = createIntPartitionColumn(nested_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value, rows); } else if (which.isInt64()) { - result = createIntPartitionColumn(nested_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value, rows); } else if (which.isFloat32()) { - result = createFloatPartitionColumn(nested_type, partition_col_value); + result = createFloatPartitionColumn(nested_type, partition_col_value, rows); } else if (which.isFloat64()) { - result = createFloatPartitionColumn(nested_type, partition_col_value); + result = createFloatPartitionColumn(nested_type, partition_col_value, rows); } else if (which.isDate()) { DayNum value; auto value_buffer = ReadBufferFromString(partition_col_value); 
readDateText(value, value_buffer); - result = nested_type->createColumnConst(1, value); + result = nested_type->createColumnConst(rows, value); } else if (which.isDate32()) { ExtendedDayNum value; auto value_buffer = ReadBufferFromString(partition_col_value); readDateText(value, value_buffer); - result = nested_type->createColumnConst(1, value.toUnderType()); + result = nested_type->createColumnConst(rows, value.toUnderType()); } else if (which.isString()) { - result = nested_type->createColumnConst(1, partition_col_value); + result = nested_type->createColumnConst(rows, partition_col_value); } else { @@ -136,14 +138,30 @@ ColumnPtr PartitionColumnFillingTransform::createPartitionColumn() void PartitionColumnFillingTransform::transform(DB::Chunk & chunk) { - size_t partition_column_position = output.getHeader().getPositionByName(partition_col_name); - if (partition_column_position == input.getHeader().columns()) - { - chunk.addColumn(partition_column->cloneResized(chunk.getNumRows())); - } - else + auto rows = chunk.getNumRows(); + auto input_cols = chunk.detachColumns(); + Columns result_cols; + auto input_header = input.getHeader(); + for (const auto & output_col : output.getHeader()) { - chunk.addColumn(partition_column_position, partition_column->cloneResized(chunk.getNumRows())); + if (input_header.has(output_col.name)) + { + size_t pos = input_header.getPositionByName(output_col.name); + result_cols.push_back(input_cols[pos]); + } + else + { + // it's a partition column + auto it = partition_columns.find(output_col.name); + if (it == partition_columns.end()) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found column({}) in parition columns", output_col.name); + } + result_cols.emplace_back(createPartitionColumn(it->first, it->second, rows)); + + } + } + chunk = DB::Chunk(std::move(result_cols), rows); } } diff --git a/utils/local-engine/Operator/PartitionColumnFillingTransform.h b/utils/local-engine/Operator/PartitionColumnFillingTransform.h index 
65f7c0c2e0e6..fb3272125145 100644 --- a/utils/local-engine/Operator/PartitionColumnFillingTransform.h +++ b/utils/local-engine/Operator/PartitionColumnFillingTransform.h @@ -1,6 +1,10 @@ #pragma once #include +#include "Common/StringUtils.h" +#include "Columns/IColumn.h" +#include "Core/Block.h" +#include "DataTypes/Serializations/ISerialization.h" namespace local_engine { @@ -10,8 +14,7 @@ class PartitionColumnFillingTransform : public DB::ISimpleTransform PartitionColumnFillingTransform( const DB::Block & input_, const DB::Block & output_, - const String & partition_col_name_, - const String & partition_col_value_); + const PartitionValues & partition_columns_); void transform(DB::Chunk & chunk) override; String getName() const override { @@ -19,13 +22,11 @@ class PartitionColumnFillingTransform : public DB::ISimpleTransform } private: - DB::ColumnPtr createPartitionColumn(); + DB::ColumnPtr createPartitionColumn(const String & parition_col, const String & partition_col_value, size_t row); static DB::ColumnPtr tryWrapPartitionColumn(const DB::ColumnPtr & nested_col, DB::DataTypePtr original_data_type); - DB::DataTypePtr partition_col_type; - String partition_col_name; - String partition_col_value; - DB::ColumnPtr partition_column; + PartitionValues partition_column_values; + std::map partition_columns; }; } diff --git a/utils/local-engine/Parser/SerializedPlanParser.cpp b/utils/local-engine/Parser/SerializedPlanParser.cpp index c64565fdd517..ad5e94b4b622 100644 --- a/utils/local-engine/Parser/SerializedPlanParser.cpp +++ b/utils/local-engine/Parser/SerializedPlanParser.cpp @@ -1,4 +1,5 @@ -#include +#include "SerializedPlanParser.h" +#include #include #include #include @@ -41,7 +42,7 @@ #include #include -#include "SerializedPlanParser.h" +#include namespace DB { @@ -197,19 +198,14 @@ QueryPlanPtr SerializedPlanParser::parseReadRealWithLocalFile(const substrait::R } auto header = parseNameStruct(rel.base_schema()); PartitionValues partition_values = 
StringUtils::parsePartitionTablePath(files_info->files[0]); - if (partition_values.size() > 1) - { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "doesn't support multiple level partition."); - } - ProcessorPtr partition_transform; - if (!partition_values.empty()) + + auto origin_header = header.cloneEmpty(); + for (const auto & partition_value : partition_values) { - auto origin_header = header.cloneEmpty(); - PartitionValue partition_value = partition_values[0]; header.erase(partition_value.first); - partition_transform - = std::make_shared(header, origin_header, partition_value.first, partition_value.second); } + ProcessorPtr partition_transform = std::make_shared(header, origin_header, partition_values); + auto query_plan = std::make_unique(); std::shared_ptr source = std::make_shared(files_info, header, context); auto source_pipe = Pipe(source); @@ -1281,7 +1277,18 @@ QueryPlanPtr SerializedPlanParser::parse(std::string & plan) { auto plan_ptr = std::make_unique(); plan_ptr->ParseFromString(plan); - LOG_DEBUG(&Poco::Logger::get("SerializedPlanParser"), "parse plan \n{}", plan_ptr->DebugString()); + + auto printPlan = [](const std::string & plan_raw){ + substrait::Plan plan; + plan.ParseFromString(plan_raw); + std::string json_ret; + google::protobuf::util::JsonPrintOptions json_opt; + json_opt.add_whitespace = true; + google::protobuf::util::MessageToJsonString(plan, &json_ret, json_opt); + return json_ret; + }; + + LOG_DEBUG(&Poco::Logger::get("SerializedPlanParser"), "parse plan \n{}", printPlan(plan)); return parse(std::move(plan_ptr)); } void SerializedPlanParser::initFunctionEnv()