diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
index d0d03eaf1edb..03d27f33b1e1 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
@@ -903,7 +903,7 @@ class GlutenClickHouseNativeWriteTableSuite
   }

   test("GLUTEN-2584: fix native write and read mismatch about complex types") {
-    def table(format: String): String = s"t_$format"
+    def table(format: String): String = s"t_2584_$format"
     def create(format: String, table_name: Option[String] = None): String =
       s"""CREATE TABLE ${table_name.getOrElse(table(format))}(
          | id INT,
@@ -934,4 +934,63 @@ class GlutenClickHouseNativeWriteTableSuite
       }
     )
   }
+
+  test(
+    "GLUTEN-8021/8022/8032: fix orc read/write mismatch and parquet" +
+      "read exception when written complex column contains null") {
+    def table(format: String): String = s"t_8021_$format"
+    def create(format: String, table_name: Option[String] = None): String =
+      s"""CREATE TABLE ${table_name.getOrElse(table(format))}(
+         |id int,
+         |x int,
+         |y int,
+         |mp map<string, string>,
+         |arr array<int>,
+         |tup struct<x: int, y: int>,
+         |arr_mp array<map<string, string>>,
+         |mp_arr map<string, array<int>>,
+         |tup_arr struct<a: array<int>>,
+         |tup_map struct<m: map<string, string>>
+         |) stored as $format""".stripMargin
+    def insert(format: String, table_name: Option[String] = None): String =
+      s"""INSERT OVERWRITE TABLE ${table_name.getOrElse(table(format))}
+         |SELECT
+         |  id, x, y,
+         |  str_to_map(concat('x:', x, ',y:', y)) AS mp,
+         |  IF(id % 4 = 0, NULL, array(x, y)) AS arr,
+         |  IF(id % 4 = 1, NULL, struct(x, y)) AS tup,
+         |  IF(id % 4 = 2, NULL, array(str_to_map(concat('x:', x, ',y:', y)))) AS arr_mp,
+         |  IF(id % 4 = 3, NULL, map('x', array(x), 'y', array(y))) AS mp_arr,
+         |  IF(id % 4 = 0, NULL, named_struct('a',
+         |  array(x, y))) AS tup_arr,
+         |  IF(id % 4 = 1, NULL, named_struct('m',
+         |  str_to_map(concat('x:', x, ',y:', y)))) AS tup_map
+         |FROM (
+         |  SELECT
+         |    id,
+         |    IF(id % 3 = 1, NULL, id + 1) AS x,
+         |    IF(id % 3 = 1, NULL, id + 2) AS y
+         |  FROM range(100)
+         |) AS data_source;""".stripMargin
+
+    // TODO fix it in spark3.5
+    if (!isSparkVersionGE("3.5")) {
+      nativeWrite2(
+        format => (table(format), create(format), insert(format)),
+        (table_name, format) => {
+          val vanilla_table = s"${table_name}_v"
+          val vanilla_create = create(format, Some(vanilla_table))
+          vanillaWrite {
+            withDestinationTable(vanilla_table, Option(vanilla_create)) {
+              checkInsertQuery(insert(format, Some(vanilla_table)), checkNative = false)
+            }
+          }
+          val rowsFromOriginTable =
+            spark.sql(s"select * from $vanilla_table").collect()
+          val dfFromWriteTable =
+            spark.sql(s"select * from $table_name")
+          checkAnswer(dfFromWriteTable, rowsFromOriginTable)
+        }
+      )
+    }
+  }
 }
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 62a70f06c1e1..003f11113345 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
 CH_BRANCH=rebase_ch/20241118
-CH_COMMIT=7f22fe487c88d3b988ea82a5c34882da23ea6289
+CH_COMMIT=a5944dfb7b3
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index 8fef52e50a68..28c759c0efc3 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -710,6 +710,7 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_
     settings.set("max_parsing_threads", 1);
     settings.set("max_download_threads", 1);
     settings.set("input_format_parquet_enable_row_group_prefetch", false);
+    settings.set("output_format_parquet_use_custom_encoder", false);

     /// update per https://github.com/ClickHouse/ClickHouse/pull/71539
     /// if true, we can't get correct metrics for the query
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
index e9fd4f358a86..e5a2d89f26c8 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
@@ -19,13 +19,134 @@
 #include <...>
 #include <...>
 #include <...>
+#include <Columns/ColumnArray.h>
+#include <Columns/ColumnMap.h>
+#include <Columns/ColumnsCommon.h>

 namespace local_engine
 {
+using namespace DB;
+
 const std::string SubstraitFileSink::NO_PARTITION_ID{"__NO_PARTITION_ID__"};
 const std::string SparkPartitionedBaseSink::DEFAULT_PARTITION_NAME{"__HIVE_DEFAULT_PARTITION__"};

+/// For Nullable(Map(K, V)) or Nullable(Array(T)), if the i-th row is null, we must make sure its nested data is empty.
+/// It is for ORC/Parquet writing compatibility. For more details, refer to
+/// https://github.com/apache/incubator-gluten/issues/8022 and https://github.com/apache/incubator-gluten/issues/8021
+static ColumnPtr truncateNestedDataIfNull(const ColumnPtr & column)
+{
+    if (const auto * col_const = checkAndGetColumn<ColumnConst>(column.get()))
+    {
+        size_t s = col_const->size();
+        auto new_data = truncateNestedDataIfNull(col_const->getDataColumnPtr());
+        return ColumnConst::create(std::move(new_data), s);
+    }
+    else if (const auto * col_array = checkAndGetColumn<ColumnArray>(column.get()))
+    {
+        auto new_data = truncateNestedDataIfNull(col_array->getDataPtr());
+        return ColumnArray::create(std::move(new_data), col_array->getOffsetsPtr());
+    }
+    else if (const auto * col_map = checkAndGetColumn<ColumnMap>(column.get()))
+    {
+        auto new_nested = truncateNestedDataIfNull(col_map->getNestedColumnPtr());
+        return ColumnMap::create(std::move(new_nested));
+    }
+    else if (const auto * col_tuple = checkAndGetColumn<ColumnTuple>(column.get()))
+    {
+        Columns new_columns;
+        for (size_t i = 0; i < col_tuple->tupleSize(); ++i)
+            new_columns.emplace_back(truncateNestedDataIfNull(col_tuple->getColumnPtr(i)));
+        return ColumnTuple::create(std::move(new_columns));
+    }
+    else if (const auto * col_nullable = checkAndGetColumn<ColumnNullable>(column.get()))
+    {
+        const auto & null_map = col_nullable->getNullMapData();
+        auto nested = truncateNestedDataIfNull(col_nullable->getNestedColumnPtr());
+        const auto * nested_array = checkAndGetColumn<ColumnArray>(nested.get());
+        const auto * nested_map = checkAndGetColumn<ColumnMap>(nested.get());
+        const auto * nested_tuple = checkAndGetColumn<ColumnTuple>(nested.get());
+
+        if (!memoryIsZero(null_map.data(), 0, null_map.size()) && (nested_array || nested_map || nested_tuple))
+        {
+            /// Process Nullable(Array) or Nullable(Map)
+            if (nested_array || nested_map)
+            {
+                if (!nested_array)
+                    nested_array = checkAndGetColumn<ColumnArray>(&nested_map->getNestedColumn());
+
+                const auto & offsets = nested_array->getOffsets();
+                size_t total_data_size = 0;
+                for (size_t i = 0; i < null_map.size(); ++i)
+                    total_data_size += (offsets[i] - offsets[i - 1]) * (!null_map[i]);
+
+                auto new_nested_array = nested_array->cloneEmpty();
+                new_nested_array->reserve(nested_array->size());
+                auto & new_nested_array_data = assert_cast<ColumnArray &>(*new_nested_array).getData();
+                new_nested_array_data.reserve(total_data_size);
+
+                for (size_t i = 0; i < null_map.size(); ++i)
+                    if (null_map[i])
+                        new_nested_array->insertDefault();
+                    else
+                        new_nested_array->insertFrom(*nested_array, i);
+
+                if (nested_map)
+                {
+                    auto new_nested_map = ColumnMap::create(std::move(new_nested_array));
+                    return ColumnNullable::create(std::move(new_nested_map), col_nullable->getNullMapColumnPtr());
+                }
+                else
+                {
+                    return ColumnNullable::create(std::move(new_nested_array), col_nullable->getNullMapColumnPtr());
+                }
+            }
+            else
+            {
+                /// Process Nullable(Tuple)
+                const auto & nested_columns = nested_tuple->getColumns();
+                Columns new_nested_columns(nested_columns.size());
+                for (size_t i = 0; i < nested_columns.size(); ++i)
+                {
+                    const auto & nested_column = nested_columns[i];
+                    TypeIndex type_index = nested_column->getDataType();
+                    if (const auto * nullable_nested_column = checkAndGetColumn<ColumnNullable>(nested_column.get()))
+                        type_index = nullable_nested_column->getNestedColumnPtr()->getDataType();
+
+                    bool should_truncate
+                        = type_index == TypeIndex::Array || type_index == TypeIndex::Map || type_index == TypeIndex::Tuple;
+                    if (should_truncate)
+                    {
+                        auto new_nested_column = nested_column->cloneEmpty();
+                        new_nested_column->reserve(nested_column->size());
+                        for (size_t j = 0; j < null_map.size(); ++j)
+                        {
+                            if (null_map[j])
+                                new_nested_column->insertDefault();
+                            else
+                                new_nested_column->insertFrom(*nested_column, j);
+                        }
+                        new_nested_columns[i] = std::move(new_nested_column);
+                    }
+                    else
+                    {
+                        new_nested_columns[i] = nested_column;
+                    }
+                }
+
+                auto new_nested_tuple = ColumnTuple::create(std::move(new_nested_columns));
+                return ColumnNullable::create(std::move(new_nested_tuple), col_nullable->getNullMapColumnPtr());
+            }
+        }
+        else
+        {
+            auto new_nested = truncateNestedDataIfNull(nested);
+            return ColumnNullable::create(std::move(new_nested), col_nullable->getNullMapColumnPtr());
+        }
+    }
+    else
+        return column;
+}
+
 NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_) : file(file_), context(context_)
 {
 }
@@ -51,6 +172,8 @@ void NormalFileWriter::write(DB::Block & block)
             continue;

         const auto & preferred_column = preferred_schema.getByPosition(index++);
+        /// Make sure nested array or map data is empty when the row is null in Nullable(Map(K, V)) or Nullable(Array(T)).
+        column.column = truncateNestedDataIfNull(column.column);
         column.column = DB::castColumn(column, preferred_column.type);
         column.name = preferred_column.name;
         column.type = preferred_column.type;