Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: binding struct values when they are parsed as row #2371

Merged
merged 47 commits into from
May 12, 2022
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
6da4c48
feat:bind func
cykbls01 May 4, 2022
c4dda45
fix
cykbls01 May 4, 2022
61e90ab
feat/nullif_expr
cykbls01 May 4, 2022
faf442a
feat: add test
cykbls01 May 4, 2022
b6d7b5d
Merge branch 'main' into feat/nullif_and_coalesce
cykbls01 May 4, 2022
c4c53c5
fix
cykbls01 May 4, 2022
a9f717d
fix
cykbls01 May 4, 2022
00d6aef
fix
cykbls01 May 4, 2022
b789f74
fix
cykbls01 May 4, 2022
1d2c3f4
feat:coalesce
cykbls01 May 5, 2022
7a4d983
add test
cykbls01 May 5, 2022
6493e69
fix
cykbls01 May 5, 2022
7dec8ee
Merge branch 'main' into feat/nullif_and_coalesce
cykbls01 May 5, 2022
030670e
fix
cykbls01 May 5, 2022
e2435b7
fix
cykbls01 May 5, 2022
45f6f19
fix: make case support null
cykbls01 May 5, 2022
17ce870
fix: make case support null
cykbls01 May 5, 2022
c2318ed
fix
cykbls01 May 5, 2022
d71f203
fix
cykbls01 May 5, 2022
98c751b
fix
cykbls01 May 6, 2022
f351919
fix
cykbls01 May 6, 2022
9497e37
fix
cykbls01 May 7, 2022
3595deb
fix
cykbls01 May 7, 2022
ee031dc
fix
cykbls01 May 8, 2022
db2be86
fix
cykbls01 May 8, 2022
6b2dacd
fix
cykbls01 May 8, 2022
b8c3383
fix
cykbls01 May 8, 2022
b253d60
fix
cykbls01 May 8, 2022
fca4aa1
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 8, 2022
d4c3153
fix
cykbls01 May 9, 2022
ee6c873
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 9, 2022
fd23fc5
fix
cykbls01 May 9, 2022
e2e8bc5
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 9, 2022
78c589c
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 9, 2022
9e34ea6
fix
cykbls01 May 9, 2022
c9f76b3
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 10, 2022
cfa1b74
fix
cykbls01 May 10, 2022
fce1836
Merge branch 'feat/insert_struct_data' of github.com:singularity-data…
cykbls01 May 10, 2022
4c827a8
fix
cykbls01 May 10, 2022
d222f47
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 10, 2022
7445b84
Merge branch 'feat/insert_struct_data' of github.com:singularity-data…
cykbls01 May 10, 2022
a29d816
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 10, 2022
924431d
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 11, 2022
40791ca
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 11, 2022
413b973
fix
cykbls01 May 12, 2022
c82805a
fix
cykbls01 May 12, 2022
b1b4902
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions src/batch/src/executor2/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ mod tests {
use std::sync::Arc;

use futures::StreamExt;
use risingwave_common::array::{Array, I64Array};
use risingwave_common::array::{Array, I32Array, I64Array, StructArray};
use risingwave_common::catalog::{schema_test_utils, ColumnDesc, ColumnId};
use risingwave_common::column_nonnull;
use risingwave_common::types::DataType;
Expand All @@ -194,13 +194,19 @@ mod tests {
let source_manager = Arc::new(MemSourceManager::default());
let store = MemoryStateStore::new();

// Make struct field
let struct_field = Field::unnamed(DataType::Struct {
fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into(),
});

// Schema for mock executor.
let schema = schema_test_utils::ii();
let mut schema = schema_test_utils::ii();
schema.fields.push(struct_field.clone());
let mut mock_executor = MockExecutor::new(schema.clone());

// Schema of the table
let schema = schema_test_utils::iii();

let mut schema = schema_test_utils::iii();
schema.fields.push(struct_field);
let table_columns: Vec<_> = schema
.fields
.iter()
Expand All @@ -216,7 +222,19 @@ mod tests {

let col1 = column_nonnull! { I64Array, [1, 3, 5, 7, 9] };
let col2 = column_nonnull! { I64Array, [2, 4, 6, 8, 10] };
let data_chunk: DataChunk = DataChunk::builder().columns(vec![col1, col2]).build();
let array = StructArray::from_slices(
&[true, false, false, false, false],
vec![
array! { I32Array, [Some(1),None,None,None,None] }.into(),
array! { I32Array, [Some(2),None,None,None,None] }.into(),
array! { I32Array, [Some(3),None,None,None,None] }.into(),
],
vec![DataType::Int32, DataType::Int32, DataType::Int32],
)
.map(|x| Arc::new(x.into()))
.unwrap();
let col3 = Column::new(array);
let data_chunk: DataChunk = DataChunk::builder().columns(vec![col1, col2, col3]).build();
mock_executor.add(data_chunk.clone());

// Create the table.
Expand All @@ -227,7 +245,7 @@ mod tests {
let source_desc = source_manager.get_source(&table_id)?;
let source = source_desc.source.as_table_v2().unwrap();
let mut reader = source
.stream_reader(vec![0.into(), 1.into(), 2.into()])
.stream_reader(vec![0.into(), 1.into(), 2.into(), 3.into()])
.await?;

// Insert
Expand Down Expand Up @@ -276,6 +294,19 @@ mod tests {
vec![Some(2), Some(4), Some(6), Some(8), Some(10)]
);

let array: ArrayImpl = StructArray::from_slices(
&[true, false, false, false, false],
vec![
array! { I32Array, [Some(1),None,None,None,None] }.into(),
array! { I32Array, [Some(2),None,None,None,None] }.into(),
array! { I32Array, [Some(3),None,None,None,None] }.into(),
],
vec![DataType::Int32, DataType::Int32, DataType::Int32],
)
.unwrap()
.into();
assert_eq!(*chunk.chunk.columns()[2].array(), array);

// There's nothing in store since `TableSourceV2` has no side effect.
// Data will be materialized in associated streaming task.
let epoch = u64::MAX;
Expand Down
34 changes: 33 additions & 1 deletion src/batch/src/executor2/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,12 @@ impl BoxedExecutor2Builder for ValuesExecutor2 {

#[cfg(test)]
mod tests {

use futures::stream::StreamExt;
use risingwave_common::array;
use risingwave_common::array::{I16Array, I32Array, I64Array};
use risingwave_common::array::{
ArrayImpl, I16Array, I32Array, I64Array, StructArray, StructValue,
};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_expr::expr::{BoxedExpression, LiteralExpression};
Expand All @@ -154,6 +157,11 @@ mod tests {

#[tokio::test]
async fn test_values_executor() {
let value = StructValue::new(vec![
Some(ScalarImpl::Int32(1)),
Some(ScalarImpl::Int32(2)),
Some(ScalarImpl::Int32(3)),
]);
let exprs = vec![
Box::new(LiteralExpression::new(
DataType::Int16,
Expand All @@ -167,6 +175,12 @@ mod tests {
DataType::Int64,
Some(ScalarImpl::Int64(3)),
)),
Box::new(LiteralExpression::new(
DataType::Struct {
fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into(),
},
Some(ScalarImpl::Struct(value)),
)) as BoxedExpression,
];

let fields = exprs
Expand All @@ -185,9 +199,26 @@ mod tests {
assert_eq!(fields[0].data_type, DataType::Int16);
assert_eq!(fields[1].data_type, DataType::Int32);
assert_eq!(fields[2].data_type, DataType::Int64);
assert_eq!(
fields[3].data_type,
DataType::Struct {
fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into()
}
);

let mut stream = values_executor.execute();
let result = stream.next().await.unwrap();
let array: ArrayImpl = StructArray::from_slices(
&[true],
vec![
array! { I32Array, [Some(1)] }.into(),
array! { I32Array, [Some(2)] }.into(),
array! { I32Array, [Some(3)] }.into(),
],
vec![DataType::Int32, DataType::Int32, DataType::Int32],
)
.unwrap()
.into();

if let Ok(result) = result {
assert_eq!(
Expand All @@ -202,6 +233,7 @@ mod tests {
*result.column_at(2).array(),
array! {I64Array, [Some(3)]}.into()
);
assert_eq!(*result.column_at(3).array(), array);
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/common/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ impl ListValue {
/// Builds a `ListValue` that takes ownership of `values` and stores them as its elements.
pub fn new(values: Vec<Datum>) -> Self {
Self { values }
}

/// Borrows the elements stored in this list value as a slice, in their stored order.
pub fn get_values(&self) -> &[Datum] {
    self.values.as_slice()
}
}

#[derive(Copy, Clone)]
Expand Down
8 changes: 6 additions & 2 deletions src/common/src/array/struct_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ pub struct StructValue {
}

impl fmt::Display for StructValue {
fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
Ok(())
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.fields)
cykbls01 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -296,6 +296,10 @@ impl StructValue {
/// Builds a `StructValue` that takes ownership of `fields` and stores them as its field values.
pub fn new(fields: Vec<Datum>) -> Self {
Self { fields }
}

/// Borrows the field values stored in this struct value as a slice, in their stored order.
pub fn get_fields(&self) -> &[Datum] {
    self.fields.as_slice()
}
}

#[derive(Copy, Clone)]
Expand Down
45 changes: 44 additions & 1 deletion src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use bytes::{Buf, BufMut};
use risingwave_pb::data::DataType as ProstDataType;
use serde::{Deserialize, Serialize};

use crate::error::{ErrorCode, Result, RwError};
use crate::error::{internal_error, ErrorCode, Result, RwError};
mod native_type;

mod scalar_impl;
Expand Down Expand Up @@ -142,6 +142,49 @@ impl DataType {
})
}

/// Transfer `datum_type` to `data_type`.
pub fn datum_type_to_data_type(datum: &Datum) -> Result<DataType> {
cykbls01 marked this conversation as resolved.
Show resolved Hide resolved
let scalar = match datum {
Some(scalar) => scalar,
None => {
return Err(internal_error(
"cannot get data type from None Datum".to_string(),
));
}
};
let data_type = match &scalar {
ScalarImpl::Int16(_) => DataType::Int16,
ScalarImpl::Int32(_) => DataType::Int32,
ScalarImpl::Int64(_) => DataType::Int64,
ScalarImpl::Float32(_) => DataType::Float32,
ScalarImpl::Float64(_) => DataType::Float64,
ScalarImpl::Utf8(_) => DataType::Varchar,
ScalarImpl::Bool(_) => DataType::Boolean,
ScalarImpl::Decimal(_) => DataType::Decimal,
ScalarImpl::Interval(_) => DataType::Interval,
ScalarImpl::NaiveDate(_) => DataType::Date,
ScalarImpl::NaiveDateTime(_) => DataType::Timestamp,
ScalarImpl::NaiveTime(_) => DataType::Time,
ScalarImpl::Struct(data) => {
let types = data
.get_fields()
.iter()
.map(DataType::datum_type_to_data_type)
.collect::<Result<Vec<_>>>()?;
DataType::Struct {
fields: types.into(),
}
}
ScalarImpl::List(data) => {
let data = data.get_values().get(0).ok_or_else(|| {
internal_error("cannot get data type from empty list".to_string())
})?;
DataType::datum_type_to_data_type(data)?
}
};
Ok(data_type)
}

fn prost_type_name(&self) -> TypeName {
match self {
DataType::Int16 => TypeName::Int16,
Expand Down
1 change: 1 addition & 0 deletions src/expr/src/expr/expr_literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ fn literal_type_match(return_type: &DataType, literal: Option<&ScalarImpl>) -> b
| (DataType::Timestamp, ScalarImpl::NaiveDateTime(_))
| (DataType::Decimal, ScalarImpl::Decimal(_))
| (DataType::Interval, ScalarImpl::Interval(_))
| (DataType::Struct { .. }, ScalarImpl::Struct(_))
)
}
None => true,
Expand Down
19 changes: 17 additions & 2 deletions src/frontend/src/binder/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@

use itertools::zip_eq;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::types::DataType;
use risingwave_common::types::{DataType, Scalar};
use risingwave_sqlparser::ast::{
BinaryOperator, DataType as AstDataType, DateTimeField, Expr, Query, TrimWhereField,
UnaryOperator,
};

use crate::binder::Binder;
use crate::expr::{Expr as _, ExprImpl, ExprType, FunctionCall, SubqueryKind};
use crate::expr::{Expr as _, ExprImpl, ExprType, FunctionCall, Literal, SubqueryKind};

mod binary_op;
mod column;
Expand Down Expand Up @@ -102,6 +102,21 @@ impl Binder {
list,
negated,
} => self.bind_in_list(*expr, list, negated),
Expr::Row(exprs) => {
let value = self.bind_row(&exprs)?;
let data_type = DataType::Struct {
fields: value
.get_fields()
.iter()
.map(DataType::datum_type_to_data_type)
.collect::<Result<Vec<_>>>()?
.into(),
};
Ok(ExprImpl::Literal(Box::new(Literal::new(
Some(value.to_scalar_value()),
data_type,
))))
}
_ => Err(ErrorCode::NotImplemented(
format!("unsupported expression {:?}", expr),
112.into(),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/expr/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::binder::Binder;
use crate::expr::Literal;

impl Binder {
pub(super) fn bind_value(&mut self, value: Value) -> Result<Literal> {
pub fn bind_value(&mut self, value: Value) -> Result<Literal> {
match value {
Value::Number(s, b) => self.bind_number(s, b),
Value::SingleQuotedString(s) => self.bind_string(s),
Expand Down
24 changes: 22 additions & 2 deletions src/frontend/src/binder/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.

use itertools::Itertools;
use risingwave_common::array::StructValue;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::types::DataType;
use risingwave_sqlparser::ast::Values;
use risingwave_common::types::{DataType, Datum, Scalar};
use risingwave_sqlparser::ast::{Expr, Values};

use super::bind_context::Clause;
use crate::binder::Binder;
Expand Down Expand Up @@ -80,6 +81,25 @@ impl Binder {
schema,
})
}

/// Bind row to `struct_value` for nested column,
/// e.g. Row(1,2,(1,2,3)).
/// Only accept value and row expr in row.
pub fn bind_row(&mut self, exprs: &[Expr]) -> Result<StructValue> {
cykbls01 marked this conversation as resolved.
Show resolved Hide resolved
let fields = exprs
.iter()
.map(|e| match e {
Expr::Value(value) => Ok(self.bind_value(value.clone())?.get_data().clone()),
Expr::Row(expr) => Ok(Some(self.bind_row(expr)?.to_scalar_value())),
_ => Err(ErrorCode::NotImplemented(
format!("unsupported expression {:?}", e),
112.into(),
cykbls01 marked this conversation as resolved.
Show resolved Hide resolved
)
.into()),
})
.collect::<Result<Vec<Datum>>>()?;
Ok(StructValue::new(fields))
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/expr/type_inference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ fn build_type_derive_map() -> HashMap<FuncSign, DataTypeName> {
E::GreaterThanOrEqual,
];
build_binary_cmp_funcs(&mut map, cmp_exprs, &num_types);
build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Struct]);
build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Date, T::Timestamp, T::Timestampz]);
build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Time, T::Interval]);
for e in cmp_exprs {
Expand Down
10 changes: 8 additions & 2 deletions src/frontend/test_runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ pub struct CreateSource {
row_format: String,
name: String,
file: Option<String>,
materialized: Option<bool>,
}

#[serde_with::skip_serializing_none]
Expand Down Expand Up @@ -200,11 +201,16 @@ impl TestCase {
match self.create_source.clone() {
Some(source) => {
if let Some(content) = source.file {
let materialized = if let Some(true) = source.materialized {
"materialized".to_string()
} else {
"".to_string()
};
let sql = format!(
r#"CREATE SOURCE {}
r#"CREATE {} SOURCE {}
WITH ('kafka.topic' = 'abc', 'kafka.servers' = 'localhost:1001')
ROW FORMAT {} MESSAGE '.test.TestRecord' ROW SCHEMA LOCATION 'file://"#,
source.name, source.row_format
materialized, source.name, source.row_format
);
let temp_file = create_proto_file(content.as_str());
self.run_sql(
Expand Down
Loading