[GLUTEN-8021][CH] Fix ORC read/write mismatch and parquet read failure when column with complex types contains null #8023

Open · wants to merge 5 commits into main
@@ -903,7 +903,7 @@ class GlutenClickHouseNativeWriteTableSuite
}

test("GLUTEN-2584: fix native write and read mismatch about complex types") {
def table(format: String): String = s"t_$format"
def table(format: String): String = s"t_2584_$format"
def create(format: String, table_name: Option[String] = None): String =
s"""CREATE TABLE ${table_name.getOrElse(table(format))}(
| id INT,
@@ -934,4 +934,62 @@ class GlutenClickHouseNativeWriteTableSuite
}
)
}

test(
"GLUTEN-8021/8022/8032: fix orc read/write mismatch and parquet" +
"read exception when written complex column contains null") {
def table(format: String): String = s"t_8021_$format"
def create(format: String, table_name: Option[String] = None): String =
s"""CREATE TABLE ${table_name.getOrElse(table(format))}(
|id int,
|x int,
|y int,
|mp map<string, string>,
|arr array<int>,
|tup struct<x:int, y:int>,
|arr_mp array<map<string, string>>,
|mp_arr map<string, array<int>>,
|tup_arr struct<a: array<int>>,
|tup_map struct<m: map<string, string>>
|) stored as $format""".stripMargin
def insert(format: String, table_name: Option[String] = None): String =
s"""INSERT OVERWRITE TABLE ${table_name.getOrElse(table(format))}
|with data_source as (
|select
|id,
|if(id % 3 = 1, null, id+1) as x,
|if(id % 3 = 1, null, id+2) as y
|from range(100)
|)
|select
|id, x, y,
|str_to_map(concat('x:', x, ',y:', y)) as mp,
|if(id % 4 = 0, null, array(x, y)) as arr,
|if(id % 4 = 1, null, struct(x, y)) as tup,
|if(id % 4 = 2, null, array(str_to_map(concat('x:', x, ',y:', y)))) as arr_mp,
|if(id % 4 = 3, null, map('x', array(x), 'y', array(y))) as mp_arr,
|if(id % 4 = 0, null, named_struct('a', array(x, y))) as tup_arr,
|if(id % 4 = 1, null, named_struct('m',
|str_to_map(concat('x:', x, ',y:', y)))) as tup_map
|from
|data_source;""".stripMargin

nativeWrite2(
format => (table(format), create(format), insert(format)),
(table_name, format) => {
val vanilla_table = s"${table_name}_v"
val vanilla_create = create(format, Some(vanilla_table))
vanillaWrite {
withDestinationTable(vanilla_table, Option(vanilla_create)) {
checkInsertQuery(insert(format, Some(vanilla_table)), checkNative = false)
}
}
val rowsFromOriginTable =
spark.sql(s"select * from $vanilla_table").collect()
val dfFromWriteTable =
spark.sql(s"select * from $table_name")
checkAnswer(dfFromWriteTable, rowsFromOriginTable)
}
)
}
}
2 changes: 1 addition & 1 deletion cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20241118
CH_COMMIT=7f22fe487c88d3b988ea82a5c34882da23ea6289
CH_COMMIT=a5944dfb7b3
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Common/CHUtil.cpp
@@ -710,6 +710,7 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_
settings.set("max_parsing_threads", 1);
settings.set("max_download_threads", 1);
settings.set("input_format_parquet_enable_row_group_prefetch", false);
settings.set("output_format_parquet_use_custom_encoder", false);

/// update per https://github.com/ClickHouse/ClickHouse/pull/71539
/// if true, we can't get correct metrics for the query
123 changes: 123 additions & 0 deletions cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
@@ -18,13 +18,134 @@

#include <QueryPipeline/QueryPipeline.h>
#include <Poco/URI.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnMap.h>

namespace local_engine
{

using namespace DB;

const std::string SubstraitFileSink::NO_PARTITION_ID{"__NO_PARTITION_ID__"};
const std::string SparkPartitionedBaseSink::DEFAULT_PARTITION_NAME{"__HIVE_DEFAULT_PARTITION__"};

/// For Nullable(Map(K, V)), Nullable(Array(T)) and Nullable(Tuple(...)), if the i-th row is null, we must make sure its nested data is empty.
/// It is required for ORC/Parquet writing compatibility. For more details, refer to
/// https://github.com/apache/incubator-gluten/issues/8022 and https://github.com/apache/incubator-gluten/issues/8021
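/// Illustration (assumed layout for Nullable(Array(Int32)) with logical rows [[1, 2], NULL]):
///   before: data = [1, 2, 3, 4], offsets = [2, 4], null_map = [0, 1]  -- the null row still owns two values
///   after:  data = [1, 2],       offsets = [2, 2], null_map = [0, 1]  -- the null row owns an empty slice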
static ColumnPtr truncateNestedArrayOrMapIfNull(const ColumnPtr & column)
{
if (const auto * col_const = checkAndGetColumn<ColumnConst>(column.get()))
{
size_t s = col_const->size();
auto new_data = truncateNestedArrayOrMapIfNull(col_const->getDataColumnPtr());
return ColumnConst::create(std::move(new_data), s);
}
else if (const auto * col_array = checkAndGetColumn<ColumnArray>(column.get()))
{
auto new_data = truncateNestedArrayOrMapIfNull(col_array->getDataPtr());
return ColumnArray::create(std::move(new_data), col_array->getOffsetsPtr());
}
else if (const auto * col_map = checkAndGetColumn<ColumnMap>(column.get()))
{
auto new_nested = truncateNestedArrayOrMapIfNull(col_map->getNestedColumnPtr());
return ColumnMap::create(std::move(new_nested));
}
else if (const auto * col_tuple = checkAndGetColumn<ColumnTuple>(column.get()))
{
Columns new_columns;
for (size_t i = 0; i < col_tuple->tupleSize(); ++i)
new_columns.emplace_back(truncateNestedArrayOrMapIfNull(col_tuple->getColumnPtr(i)));
return ColumnTuple::create(std::move(new_columns));
}
else if (const auto * col_nullable = checkAndGetColumn<ColumnNullable>(column.get()))
{
const auto & null_map = col_nullable->getNullMapData();
auto nested = truncateNestedArrayOrMapIfNull(col_nullable->getNestedColumnPtr());
const auto * nested_array = checkAndGetColumn<ColumnArray>(nested.get());
const auto * nested_map = checkAndGetColumn<ColumnMap>(nested.get());
const auto * nested_tuple = checkAndGetColumn<ColumnTuple>(nested.get());

if (!memoryIsZero(null_map.data(), 0, null_map.size()) && (nested_array || nested_map || nested_tuple))
{
/// Process Nullable(Array) or Nullable(Map)
if (nested_array || nested_map)
{
if (!nested_array)
nested_array = checkAndGetColumn<ColumnArray>(&nested_map->getNestedColumn());

const auto & offsets = nested_array->getOffsets();
size_t total_data_size = 0;
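            /// Note: offsets[-1] is well-defined and equals 0, since ClickHouse array offsets are stored in a PaddedPODArray.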
for (size_t i = 0; i < null_map.size(); ++i)
total_data_size += (offsets[i] - offsets[i - 1]) * (!null_map[i]);

auto new_nested_array = nested_array->cloneEmpty();
new_nested_array->reserve(nested_array->size());
auto & new_nested_array_data = assert_cast<ColumnArray &>(*new_nested_array).getData();
new_nested_array_data.reserve(total_data_size);

for (size_t i = 0; i < null_map.size(); ++i)
if (null_map[i])
new_nested_array->insertDefault();
else
new_nested_array->insertFrom(*nested_array, i);

if (nested_map)
{
auto new_nested_map = ColumnMap::create(std::move(new_nested_array));
return ColumnNullable::create(std::move(new_nested_map), col_nullable->getNullMapColumnPtr());
}
else
{
return ColumnNullable::create(std::move(new_nested_array), col_nullable->getNullMapColumnPtr());
}
}
else
{
/// Process Nullable(Tuple)
const auto & nested_columns = nested_tuple->getColumns();
Columns new_nested_columns(nested_columns.size());
for (size_t i = 0; i < nested_columns.size(); ++i)
{
const auto & nested_column = nested_columns[i];
TypeIndex type_index = nested_column->getDataType();
if (const auto * nullable_nested_column = checkAndGetColumn<ColumnNullable>(nested_column.get()))
type_index = nullable_nested_column->getNestedColumnPtr()->getDataType();

bool should_truncate = type_index == TypeIndex::Array || type_index == TypeIndex::Map || type_index == TypeIndex::Tuple;
if (should_truncate)
{
auto new_nested_column = nested_column->cloneEmpty();
new_nested_column->reserve(nested_column->size());
for (size_t j = 0; j < null_map.size(); ++j)
{
if (null_map[j])
new_nested_column->insertDefault();
else
new_nested_column->insertFrom(*nested_column, j);
}
new_nested_columns[i] = std::move(new_nested_column);
}
else
{
new_nested_columns[i] = nested_column;
}
}

auto new_nested_tuple = ColumnTuple::create(std::move(new_nested_columns));
return ColumnNullable::create(std::move(new_nested_tuple), col_nullable->getNullMapColumnPtr());
}
}
else
{
auto new_nested = truncateNestedArrayOrMapIfNull(nested);
return ColumnNullable::create(std::move(new_nested), col_nullable->getNullMapColumnPtr());
}
}
else
return column;
}

NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_) : file(file_), context(context_)
{
}
@@ -50,6 +171,8 @@ void NormalFileWriter::write(DB::Block & block)
continue;

const auto & preferred_column = preferred_schema.getByPosition(index++);
/// Make sure nested array/map/tuple data is empty when the row is null in a nullable complex column, e.g. Nullable(Map(K, V)) or Nullable(Array(T)).
column.column = truncateNestedArrayOrMapIfNull(column.column);
column.column = DB::castColumn(column, preferred_column.type);
column.name = preferred_column.name;
column.type = preferred_column.type;
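For reviewers: below is a minimal standalone sketch (not part of this change) of the layout problem that truncateNestedArrayOrMapIfNull addresses. It hand-builds a Nullable(Array(UInt64)) column whose null row still carries physical nested data — exactly the shape that breaks the ORC/Parquet writers — assuming a ClickHouse development tree; the main() scaffolding and the printed message are illustrative only.

#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsNumber.h>
#include <Common/assert_cast.h>
#include <iostream>

using namespace DB;

int main()
{
    /// Logical content: row 0 = [1, 2], row 1 = NULL.
    auto data = ColumnUInt64::create();
    data->insertValue(1);
    data->insertValue(2);
    data->insertValue(3); /// hidden payload of the NULL row
    data->insertValue(4);

    auto offsets = ColumnArray::ColumnOffsets::create();
    offsets->insertValue(2); /// row 0 spans data[0..2)
    offsets->insertValue(4); /// row 1 spans data[2..4) despite being NULL

    auto null_map = ColumnUInt8::create();
    null_map->insertValue(0);
    null_map->insertValue(1); /// row 1 is NULL

    auto column = ColumnNullable::create(
        ColumnArray::create(std::move(data), std::move(offsets)), std::move(null_map));

    /// ORC/Parquet writers derive lengths from the nested data; a non-empty
    /// slice under a NULL row shifts every subsequent value on read, which is
    /// the read/write mismatch reported in GLUTEN-8021/8022.
    const auto & array = assert_cast<const ColumnArray &>(column->getNestedColumn());
    std::cout << "nested size: " << array.getData().size() << " (expected 2 after truncation)\n";
    return 0;
}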