feat: orc support fill missing tuple field (#17247)
* tmp orc speed up

* treat missing field of tuple as null.

* feat: deal with array of tuple with missing fields

* fix: cluster key used wrong index.

* cleanup

* fill missing tuple field only for orc

* add test for cluster by

* fix clippy
youngsofun authored Jan 16, 2025
1 parent e9fc0a1 commit ab0d1a8
Showing 9 changed files with 273 additions and 31 deletions.
Cargo.lock (2 changes: 1 addition & 1 deletion)

Some generated files are not rendered by default.

Cargo.toml (2 changes: 1 addition & 1 deletion)
@@ -639,7 +639,7 @@ deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "3038c145" }
 ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" }
 openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
 openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.7" }
-orc-rust = { git = "https://github.com/datafusion-contrib/orc-rust", rev = "dfb1ede" }
+orc-rust = { git = "https://github.com/youngsofun/orc-rust", rev = "6c5ac57" }
 recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" }
 sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" }
 tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" }
src/query/sql/src/planner/binder/copy_into_table.rs (13 changes: 5 additions & 8 deletions)
@@ -182,15 +182,12 @@ impl Binder {
             files: stmt.files.clone(),
             pattern,
         };
-        let required_values_schema: DataSchemaRef = Arc::new(
-            match &stmt.dst_columns {
-                Some(cols) => self.schema_project(&table.schema(), cols)?,
-                None => self.schema_project(&table.schema(), &[])?,
-            }
-            .into(),
-        );
+        let stage_schema = match &stmt.dst_columns {
+            Some(cols) => self.schema_project(&table.schema(), cols)?,
+            None => self.schema_project(&table.schema(), &[])?,
+        };
 
-        let stage_schema = infer_table_schema(&required_values_schema)?;
+        let required_values_schema: DataSchemaRef = Arc::new(stage_schema.clone().into());
 
         let default_values = if stage_info.file_format_params.need_field_default() {
             Some(
src/query/sql/src/planner/expression_parser.rs (25 changes: 24 additions & 1 deletion)
@@ -395,7 +395,30 @@ pub fn parse_cluster_keys(
     table_meta: Arc<dyn Table>,
     ast_exprs: Vec<AExpr>,
 ) -> Result<Vec<Expr>> {
-    let exprs = parse_ast_exprs(ctx, table_meta, ast_exprs)?;
+    let schema = table_meta.schema();
+    let (mut bind_context, metadata) = bind_table(table_meta)?;
+    let settings = ctx.get_settings();
+    let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
+    let mut type_checker = TypeChecker::try_create(
+        &mut bind_context,
+        ctx,
+        &name_resolution_ctx,
+        metadata,
+        &[],
+        false,
+    )?;
+
+    let exprs: Vec<Expr> = ast_exprs
+        .iter()
+        .map(|ast| {
+            let (scalar, _) = *type_checker.resolve(ast)?;
+            let expr = scalar
+                .as_expr()?
+                .project_column_ref(|col| schema.index_of(&col.column_name).unwrap());
+            Ok(expr)
+        })
+        .collect::<Result<_>>()?;
+
     let mut res = Vec::with_capacity(exprs.len());
     for expr in exprs {
         let inner_type = expr.data_type().remove_nullable();
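A note on the parse_cluster_keys change above: instead of delegating to parse_ast_exprs, each AST expression is resolved with a TypeChecker, and every column reference in the result is remapped through project_column_ref to its offset in the table schema (schema.index_of). This is the "cluster key used wrong index" item from the commit message: cluster-key expressions now index columns by table-schema position. A minimal, self-contained sketch of the remapping idea; MiniExpr and this project_column_ref are illustrative stand-ins, not Databend's real types:

// Illustrative miniature of the remapping (hypothetical types, not Databend's):
// rewrite name-based column refs to table-schema offsets.
#[derive(Debug, PartialEq)]
enum MiniExpr {
    ColumnByName(String),
    ColumnByIndex(usize),
}

fn project_column_ref(e: MiniExpr, schema: &[&str]) -> MiniExpr {
    match e {
        MiniExpr::ColumnByName(name) => {
            // index_of: position of the column in the *table schema*.
            let idx = schema.iter().position(|c| *c == name).expect("column exists");
            MiniExpr::ColumnByIndex(idx)
        }
        other => other,
    }
}

fn main() {
    let schema = ["id", "ts", "payload"];
    // A cluster key over `ts` must resolve to offset 1 in the table schema,
    // regardless of the order in which the binder visited columns.
    let e = project_column_ref(MiniExpr::ColumnByName("ts".into()), &schema);
    assert_eq!(e, MiniExpr::ColumnByIndex(1));
}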
src/query/storages/common/stage/src/read/columnar/projection.rs (193 changes: 173 additions & 20 deletions)
@@ -14,12 +14,18 @@
 
 use databend_common_exception::ErrorCode;
 use databend_common_expression::type_check::check_cast;
+use databend_common_expression::type_check::check_function;
+use databend_common_expression::types::DataType;
+use databend_common_expression::types::NumberScalar;
 use databend_common_expression::Expr;
 use databend_common_expression::RemoteExpr;
+use databend_common_expression::Scalar;
+use databend_common_expression::TableDataType;
 use databend_common_expression::TableField;
 use databend_common_expression::TableSchemaRef;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_meta_app::principal::NullAs;
+use databend_common_meta_app::principal::StageFileFormatType;
 
 use crate::read::cast::load_can_auto_cast_to;
 
@@ -30,10 +36,11 @@ use crate::read::cast::load_can_auto_cast_to;
 pub fn project_columnar(
     input_schema: &TableSchemaRef,
     output_schema: &TableSchemaRef,
-    null_as: &NullAs,
+    missing_as: &NullAs,
     default_values: &Option<Vec<RemoteExpr>>,
     location: &str,
     case_sensitive: bool,
+    fmt: StageFileFormatType,
 ) -> databend_common_exception::Result<(Vec<Expr>, Vec<usize>)> {
     let mut pushdown_columns = vec![];
     let mut output_projection = vec![];
@@ -68,32 +75,96 @@ pub fn project_columnar(
 
                     if from_field.data_type == to_field.data_type {
                         expr
-                    } else {
-                        // note: tuple field name is dropped here, matched by pos here
-                        if load_can_auto_cast_to(
-                            &from_field.data_type().into(),
-                            &to_field.data_type().into(),
-                        ) {
-                            check_cast(
-                                None,
-                                false,
-                                expr,
-                                &to_field.data_type().into(),
-                                &BUILTIN_FUNCTIONS,
-                            )?
-                        } else {
-                            return Err(ErrorCode::BadDataValueType(format!(
-                                "fail to load file {}: Cannot cast column {} from {:?} to {:?}",
-                                location,
-                                field_name,
-                                from_field.data_type(),
-                                to_field.data_type()
-                            )));
-                        }
-                    }
+                    } else if load_can_auto_cast_to(
+                        &from_field.data_type().into(),
+                        &to_field.data_type().into(),
+                    ) {
+                        check_cast(
+                            None,
+                            false,
+                            expr,
+                            &to_field.data_type().into(),
+                            &BUILTIN_FUNCTIONS,
+                        )?
+                    } else if fmt == StageFileFormatType::Orc && !matches!(missing_as, NullAs::Error) {
+                        // special cast for tuple type, fill in default values for the missing fields.
+                        match (
+                            from_field.data_type.remove_nullable(),
+                            to_field.data_type.remove_nullable(),
+                        ) {
+                            (
+                                TableDataType::Array(box TableDataType::Nullable(
+                                    box TableDataType::Tuple {
+                                        fields_name: from_fields_name,
+                                        fields_type: _from_fields_type,
+                                    },
+                                )),
+                                TableDataType::Array(box TableDataType::Nullable(
+                                    box TableDataType::Tuple {
+                                        fields_name: to_fields_name,
+                                        fields_type: _to_fields_type,
+                                    },
+                                )),
+                            ) => {
+                                let mut v = vec![];
+                                for to in to_fields_name.iter() {
+                                    match from_fields_name.iter().position(|k| k == to) {
+                                        Some(p) => v.push(p as i32),
+                                        None => v.push(-1),
+                                    };
+                                }
+                                let name = v
+                                    .iter()
+                                    .map(|v| v.to_string())
+                                    .collect::<Vec<_>>()
+                                    .join(",");
+                                Expr::ColumnRef {
+                                    span: None,
+                                    id: pos,
+                                    data_type: from_field.data_type().into(),
+                                    display_name: format!("#!{name}",),
+                                }
+                            }
+                            (
+                                TableDataType::Tuple {
+                                    fields_name: from_fields_name,
+                                    fields_type: from_fields_type,
+                                },
+                                TableDataType::Tuple {
+                                    fields_name: to_fields_name,
+                                    fields_type: to_fields_type,
+                                },
+                            ) => project_tuple(
+                                expr,
+                                from_field,
+                                to_field,
+                                &from_fields_name,
+                                &from_fields_type,
+                                &to_fields_name,
+                                &to_fields_type,
+                            )?,
+                            (_, _) => {
+                                return Err(ErrorCode::BadDataValueType(format!(
+                                    "fail to load file {}: Cannot cast column {} from {:?} to {:?}",
+                                    location,
+                                    field_name,
+                                    from_field.data_type(),
+                                    to_field.data_type()
+                                )));
+                            }
+                        }
+                    } else {
+                        return Err(ErrorCode::BadDataValueType(format!(
+                            "fail to load file {}: Cannot cast column {} from {:?} to {:?}",
+                            location,
+                            field_name,
+                            from_field.data_type(),
+                            to_field.data_type()
+                        )));
+                    }
                 }
                 0 => {
-                    match null_as {
+                    match missing_as {
                         // default
                         NullAs::Error => {
                             return Err(ErrorCode::BadDataValueType(format!(
@@ -139,3 +210,85 @@ pub fn project_columnar(
     }
     Ok((output_projection, pushdown_columns))
 }
+
+fn project_tuple(
+    expr: Expr,
+    from_field: &TableField,
+    to_field: &TableField,
+    from_fields_name: &[String],
+    from_fields_type: &[TableDataType],
+    to_fields_name: &[String],
+    to_fields_type: &[TableDataType],
+) -> databend_common_exception::Result<Expr> {
+    let mut inner_columns = Vec::with_capacity(to_fields_name.len());
+
+    for (to_field_name, to_field_type) in to_fields_name.iter().zip(to_fields_type.iter()) {
+        let inner_column = match from_fields_name.iter().position(|k| k == to_field_name) {
+            Some(idx) => {
+                let from_field_type = from_fields_type.get(idx).unwrap();
+                let tuple_idx = Scalar::Number(NumberScalar::Int64((idx + 1) as i64));
+                let inner_column = check_function(
+                    None,
+                    "get",
+                    &[tuple_idx],
+                    &[expr.clone()],
+                    &BUILTIN_FUNCTIONS,
+                )?;
+                if from_field_type != to_field_type {
+                    check_cast(
+                        None,
+                        false,
+                        inner_column,
+                        &to_field_type.into(),
+                        &BUILTIN_FUNCTIONS,
+                    )?
+                } else {
+                    inner_column
+                }
+            }
+            None => {
+                // if inner field not exists, fill default value.
+                let data_type: DataType = to_field_type.into();
+                let scalar = Scalar::default_value(&data_type);
+                Expr::Constant {
+                    span: None,
+                    scalar,
+                    data_type,
+                }
+            }
+        };
+        inner_columns.push(inner_column);
+    }
+    let tuple_column = check_function(None, "tuple", &[], &inner_columns, &BUILTIN_FUNCTIONS)?;
+    let tuple_column = if from_field.data_type != to_field.data_type {
+        let dest_ty: DataType = (&to_field.data_type).into();
+        check_cast(None, false, tuple_column, &dest_ty, &BUILTIN_FUNCTIONS)?
+    } else {
+        tuple_column
+    };
+
+    if from_field.data_type.is_nullable() && to_field.data_type.is_nullable() {
+        // add if function to cast null value
+        let is_not_null = check_function(
+            None,
+            "is_not_null",
+            &[],
+            &[expr.clone()],
+            &BUILTIN_FUNCTIONS,
+        )?;
+        let null_scalar = Expr::Constant {
+            span: None,
+            scalar: Scalar::Null,
+            data_type: DataType::Null,
+        };
+        check_function(
+            None,
+            "if",
+            &[],
+            &[is_not_null, tuple_column, null_scalar],
+            &BUILTIN_FUNCTIONS,
+        )
+    } else {
+        Ok(tuple_column)
+    }
+}
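How the two tuple cases above fit together: for a bare tuple, project_tuple (just above) rebuilds the value field by field. Each destination field becomes either get(i) on the matching source field (cast if the types differ) or a constant default when the source has no such field, and the rebuilt tuple is wrapped in if(is_not_null(...), ..., NULL) when both sides are nullable. For an array of tuples there is no scalar expression to apply per element, so project_columnar instead emits a ColumnRef whose display_name carries the destination-to-source field mapping: the "#!" prefix followed by one comma-separated source position per destination field, with -1 marking a missing field. The ORC decoder (decoder.rs below) recognizes the prefix and patches the column data directly. A rough sketch of that encode/decode contract; the helper names are invented, only the "#!" prefix and the -1 convention come from the diff:

// Rough sketch of the position-list contract between project_columnar and
// the ORC decoder (encode_positions/decode_positions are hypothetical names).
fn encode_positions(to_fields: &[String], from_fields: &[String]) -> String {
    let v: Vec<String> = to_fields
        .iter()
        .map(|to| {
            from_fields
                .iter()
                .position(|f| f == to)
                .map(|p| p.to_string())
                .unwrap_or_else(|| "-1".to_string()) // missing in the source file
        })
        .collect();
    format!("#!{}", v.join(","))
}

fn decode_positions(display_name: &str) -> Option<Vec<i32>> {
    display_name
        .strip_prefix("#!")?
        .split(',')
        .map(|s| s.parse::<i32>().ok())
        .collect()
}

fn main() {
    let from = vec!["a".to_string(), "b".to_string()];
    let to = vec!["b".to_string(), "c".to_string()];
    let name = encode_positions(&to, &from);
    assert_eq!(name, "#!1,-1"); // `b` is source position 1; `c` is missing
    assert_eq!(decode_positions(&name), Some(vec![1, -1]));
}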
src/query/storages/orc/src/copy_into_table/processors/decoder.rs (59 changes: 59 additions & 0 deletions)
@@ -22,13 +22,19 @@ use arrow_array::RecordBatch;
 use databend_common_catalog::query_kind::QueryKind;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
+use databend_common_expression::types::ArrayColumn;
+use databend_common_expression::types::DataType;
+use databend_common_expression::types::NullableColumn;
 use databend_common_expression::BlockEntry;
 use databend_common_expression::BlockMetaInfoDowncast;
+use databend_common_expression::Column;
+use databend_common_expression::ColumnBuilder;
 use databend_common_expression::DataBlock;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::Evaluator;
 use databend_common_expression::Expr;
 use databend_common_expression::FunctionContext;
+use databend_common_expression::Value;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_pipeline_core::processors::Event;
 use databend_common_pipeline_core::processors::InputPort;
@@ -96,6 +102,59 @@ impl StripeDecoderForCopy {
         let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS);
         let mut columns = Vec::with_capacity(projection.len());
         for (field, expr) in self.output_schema.fields().iter().zip(projection.iter()) {
+            if let Expr::ColumnRef {
+                display_name, id, ..
+            } = expr
+            {
+                if let Some(display_name) = display_name.strip_prefix("#!") {
+                    let typs = match field.data_type() {
+                        DataType::Nullable(box DataType::Array(box DataType::Nullable(
+                            box DataType::Tuple(v),
+                        ))) => v,
+                        _ => {
+                            log::error!("expect array of tuple, got {:?}", field);
+                            unreachable!("expect value: array of tuple")
+                        }
+                    };
+                    let positions = display_name
+                        .split(',')
+                        .map(|s| s.parse::<i32>().unwrap())
+                        .collect::<Vec<i32>>();
+                    let mut e = block.columns()[*id].clone();
+                    match e.value {
+                        Value::Column(Column::Nullable(box NullableColumn {
+                            column:
+                                Column::Array(box ArrayColumn {
+                                    values:
+                                        Column::Nullable(box NullableColumn {
+                                            column: Column::Tuple(ref mut v),
+                                            ..
+                                        }),
+                                    ..
+                                }),
+                            ..
+                        })) => {
+                            let len = v[0].len();
+                            let mut v2 = vec![];
+                            for (i, p) in positions.iter().enumerate() {
+                                if *p < 0 {
+                                    v2.push(ColumnBuilder::repeat_default(&typs[i], len).build());
+                                } else {
+                                    v2.push(v[*p as usize].clone());
+                                }
+                            }
+                            *v = v2
+                        }
+                        _ => {
+                            log::error!("expect array of tuple, got {:?} {:?}", field, e.value);
+                            unreachable!("expect value: array of tuple")
+                        }
+                    }
+                    let column = BlockEntry::new(field.data_type().clone(), e.value);
+                    columns.push(column);
+                    continue;
+                }
+            }
             let value = evaluator.run(expr)?;
             let column = BlockEntry::new(field.data_type().clone(), value);
             columns.push(column);
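The branch above is the consumer of the "#!" convention emitted by project_columnar: it parses the position list out of display_name, destructures the Nullable(Array(Nullable(Tuple))) column, and rebuilds the tuple's child columns in destination order, substituting a ColumnBuilder::repeat_default column wherever the encoded position is -1. A toy version of that rebuild step, using Vec<Option<i64>> as a stand-in for Databend's Column type (purely illustrative, not the real API):

// Toy sketch: rebuild tuple children per the decoded position list, filling
// defaults (NULL-like values here) for positions encoded as -1.
fn patch_children(
    children: &[Vec<Option<i64>>],
    positions: &[i32],
    len: usize,
) -> Vec<Vec<Option<i64>>> {
    positions
        .iter()
        .map(|&p| {
            if p < 0 {
                vec![None; len] // missing source field: a column of defaults
            } else {
                children[p as usize].clone()
            }
        })
        .collect()
}

fn main() {
    // Source tuple has one field; destination expects two, the second missing.
    let children = vec![vec![Some(1), Some(2)]];
    let patched = patch_children(&children, &[0, -1], 2);
    assert_eq!(patched[0], vec![Some(1), Some(2)]);
    assert_eq!(patched[1], vec![None, None]);
}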