Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: binding struct values when they are parsed as row #2371

Merged
merged 47 commits into from
May 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
6da4c48
feat:bind func
cykbls01 May 4, 2022
c4dda45
fix
cykbls01 May 4, 2022
61e90ab
feat/nullif_expr
cykbls01 May 4, 2022
faf442a
feat: add test
cykbls01 May 4, 2022
b6d7b5d
Merge branch 'main' into feat/nullif_and_coalesce
cykbls01 May 4, 2022
c4c53c5
fix
cykbls01 May 4, 2022
a9f717d
fix
cykbls01 May 4, 2022
00d6aef
fix
cykbls01 May 4, 2022
b789f74
fix
cykbls01 May 4, 2022
1d2c3f4
feat:coalesce
cykbls01 May 5, 2022
7a4d983
add test
cykbls01 May 5, 2022
6493e69
fix
cykbls01 May 5, 2022
7dec8ee
Merge branch 'main' into feat/nullif_and_coalesce
cykbls01 May 5, 2022
030670e
fix
cykbls01 May 5, 2022
e2435b7
fix
cykbls01 May 5, 2022
45f6f19
fix: make case support null
cykbls01 May 5, 2022
17ce870
fix: make case support null
cykbls01 May 5, 2022
c2318ed
fix
cykbls01 May 5, 2022
d71f203
fix
cykbls01 May 5, 2022
98c751b
fix
cykbls01 May 6, 2022
f351919
fix
cykbls01 May 6, 2022
9497e37
fix
cykbls01 May 7, 2022
3595deb
fix
cykbls01 May 7, 2022
ee031dc
fix
cykbls01 May 8, 2022
db2be86
fix
cykbls01 May 8, 2022
6b2dacd
fix
cykbls01 May 8, 2022
b8c3383
fix
cykbls01 May 8, 2022
b253d60
fix
cykbls01 May 8, 2022
fca4aa1
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 8, 2022
d4c3153
fix
cykbls01 May 9, 2022
ee6c873
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 9, 2022
fd23fc5
fix
cykbls01 May 9, 2022
e2e8bc5
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 9, 2022
78c589c
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 9, 2022
9e34ea6
fix
cykbls01 May 9, 2022
c9f76b3
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 10, 2022
cfa1b74
fix
cykbls01 May 10, 2022
fce1836
Merge branch 'feat/insert_struct_data' of github.com:singularity-data…
cykbls01 May 10, 2022
4c827a8
fix
cykbls01 May 10, 2022
d222f47
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 10, 2022
7445b84
Merge branch 'feat/insert_struct_data' of github.com:singularity-data…
cykbls01 May 10, 2022
a29d816
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 10, 2022
924431d
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 11, 2022
40791ca
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 11, 2022
413b973
fix
cykbls01 May 12, 2022
c82805a
fix
cykbls01 May 12, 2022
b1b4902
Merge branch 'main' into feat/insert_struct_data
cykbls01 May 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions src/batch/src/executor2/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ mod tests {
use std::sync::Arc;

use futures::StreamExt;
use risingwave_common::array::{Array, I64Array};
use risingwave_common::array::{Array, I32Array, I64Array, StructArray};
use risingwave_common::catalog::{schema_test_utils, ColumnDesc, ColumnId};
use risingwave_common::column_nonnull;
use risingwave_common::types::DataType;
Expand All @@ -194,13 +194,19 @@ mod tests {
let source_manager = Arc::new(MemSourceManager::default());
let store = MemoryStateStore::new();

// Make struct field
let struct_field = Field::unnamed(DataType::Struct {
fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into(),
});

// Schema for mock executor.
let schema = schema_test_utils::ii();
let mut schema = schema_test_utils::ii();
schema.fields.push(struct_field.clone());
let mut mock_executor = MockExecutor::new(schema.clone());

// Schema of the table
let schema = schema_test_utils::iii();

let mut schema = schema_test_utils::iii();
schema.fields.push(struct_field);
let table_columns: Vec<_> = schema
.fields
.iter()
Expand All @@ -216,7 +222,19 @@ mod tests {

let col1 = column_nonnull! { I64Array, [1, 3, 5, 7, 9] };
let col2 = column_nonnull! { I64Array, [2, 4, 6, 8, 10] };
let data_chunk: DataChunk = DataChunk::builder().columns(vec![col1, col2]).build();
let array = StructArray::from_slices(
&[true, false, false, false, false],
vec![
array! { I32Array, [Some(1),None,None,None,None] }.into(),
array! { I32Array, [Some(2),None,None,None,None] }.into(),
array! { I32Array, [Some(3),None,None,None,None] }.into(),
],
vec![DataType::Int32, DataType::Int32, DataType::Int32],
)
.map(|x| Arc::new(x.into()))
.unwrap();
let col3 = Column::new(array);
let data_chunk: DataChunk = DataChunk::builder().columns(vec![col1, col2, col3]).build();
mock_executor.add(data_chunk.clone());

// Create the table.
Expand All @@ -227,7 +245,7 @@ mod tests {
let source_desc = source_manager.get_source(&table_id)?;
let source = source_desc.source.as_table_v2().unwrap();
let mut reader = source
.stream_reader(vec![0.into(), 1.into(), 2.into()])
.stream_reader(vec![0.into(), 1.into(), 2.into(), 3.into()])
.await?;

// Insert
Expand Down Expand Up @@ -276,6 +294,19 @@ mod tests {
vec![Some(2), Some(4), Some(6), Some(8), Some(10)]
);

let array: ArrayImpl = StructArray::from_slices(
&[true, false, false, false, false],
vec![
array! { I32Array, [Some(1),None,None,None,None] }.into(),
array! { I32Array, [Some(2),None,None,None,None] }.into(),
array! { I32Array, [Some(3),None,None,None,None] }.into(),
],
vec![DataType::Int32, DataType::Int32, DataType::Int32],
)
.unwrap()
.into();
assert_eq!(*chunk.chunk.columns()[2].array(), array);

// There's nothing in store since `TableSourceV2` has no side effect.
// Data will be materialized in associated streaming task.
let epoch = u64::MAX;
Expand Down
34 changes: 33 additions & 1 deletion src/batch/src/executor2/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,12 @@ impl BoxedExecutor2Builder for ValuesExecutor2 {

#[cfg(test)]
mod tests {

use futures::stream::StreamExt;
use risingwave_common::array;
use risingwave_common::array::{I16Array, I32Array, I64Array};
use risingwave_common::array::{
ArrayImpl, I16Array, I32Array, I64Array, StructArray, StructValue,
};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_expr::expr::{BoxedExpression, LiteralExpression};
Expand All @@ -154,6 +157,11 @@ mod tests {

#[tokio::test]
async fn test_values_executor() {
let value = StructValue::new(vec![
Some(ScalarImpl::Int32(1)),
Some(ScalarImpl::Int32(2)),
Some(ScalarImpl::Int32(3)),
]);
let exprs = vec![
Box::new(LiteralExpression::new(
DataType::Int16,
Expand All @@ -167,6 +175,12 @@ mod tests {
DataType::Int64,
Some(ScalarImpl::Int64(3)),
)),
Box::new(LiteralExpression::new(
DataType::Struct {
fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into(),
},
Some(ScalarImpl::Struct(value)),
)) as BoxedExpression,
];

let fields = exprs
Expand All @@ -185,9 +199,26 @@ mod tests {
assert_eq!(fields[0].data_type, DataType::Int16);
assert_eq!(fields[1].data_type, DataType::Int32);
assert_eq!(fields[2].data_type, DataType::Int64);
assert_eq!(
fields[3].data_type,
DataType::Struct {
fields: vec![DataType::Int32, DataType::Int32, DataType::Int32].into()
}
);

let mut stream = values_executor.execute();
let result = stream.next().await.unwrap();
let array: ArrayImpl = StructArray::from_slices(
&[true],
vec![
array! { I32Array, [Some(1)] }.into(),
array! { I32Array, [Some(2)] }.into(),
array! { I32Array, [Some(3)] }.into(),
],
vec![DataType::Int32, DataType::Int32, DataType::Int32],
)
.unwrap()
.into();

if let Ok(result) = result {
assert_eq!(
Expand All @@ -202,6 +233,7 @@ mod tests {
*result.column_at(2).array(),
array! {I64Array, [Some(3)]}.into()
);
assert_eq!(*result.column_at(3).array(), array);
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/common/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ impl ListValue {
pub fn new(values: Vec<Datum>) -> Self {
Self { values }
}

/// Borrows the element datums of this list as a slice.
pub fn values(&self) -> &[Datum] {
    self.values.as_slice()
}
}

#[derive(Copy, Clone)]
Expand Down
12 changes: 10 additions & 2 deletions src/common/src/array/struct_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,12 @@ pub struct StructValue {
}

impl fmt::Display for StructValue {
fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
Ok(())
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    // Render each field via its `Debug` form, then wrap the
    // comma-separated list in a pair of braces.
    let rendered: Vec<String> = self
        .fields
        .iter()
        .map(|field| format!("{:?}", field))
        .collect();
    write!(f, "{{{}}}", rendered.join(", "))
}
}

Expand All @@ -296,6 +300,10 @@ impl StructValue {
pub fn new(fields: Vec<Datum>) -> Self {
Self { fields }
}

/// Borrows the field datums of this struct as a slice.
pub fn fields(&self) -> &[Datum] {
    self.fields.as_slice()
}
}

#[derive(Copy, Clone)]
Expand Down
13 changes: 11 additions & 2 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use bytes::{Buf, BufMut};
use risingwave_pb::data::DataType as ProstDataType;
use serde::{Deserialize, Serialize};

use crate::error::{ErrorCode, Result, RwError};

use crate::error::{internal_error, ErrorCode, Result, RwError};
mod native_type;

mod scalar_impl;
Expand Down Expand Up @@ -327,6 +326,16 @@ for_all_scalar_variants! { scalar_impl_enum }
pub type Datum = Option<ScalarImpl>;
pub type DatumRef<'a> = Option<ScalarRefImpl<'a>>;

pub fn get_data_type_from_datum(datum: &Datum) -> Result<DataType> {
match datum {
// TODO: Predicate data type from None Datum
None => Err(internal_error(
Copy link
Contributor

@neverchanje neverchanje May 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the data type of a literal should be unknown until it's inferred by some means, think of such case:

postgres=# create type foo as (v1 int, v2 int);
CREATE TYPE
postgres=# create table t (v1 foo);
CREATE TABLE
postgres=# SELECT * FROM t WHERE v1 > (1, null);
 v1
----
(0 rows)

How can we determine the type of (1, null) until it's inferred from v1? (given the rule that v1 and (1, null) must be the same type, so the null's type must be int).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see; I will fix this in a follow-up issue.

"cannot get data type from None Datum".to_string(),
)),
Some(scalar) => scalar.data_type(),
}
}

/// Convert a [`Datum`] to a [`DatumRef`].
pub fn to_datum_ref(datum: &Datum) -> DatumRef<'_> {
datum.as_ref().map(|d| d.as_scalar_ref_impl())
Expand Down
34 changes: 34 additions & 0 deletions src/common/src/types/scalar_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,40 @@ impl ScalarImpl {
}
for_all_scalar_variants! { impl_all_get_ident, self }
}

pub(crate) fn data_type(&self) -> Result<DataType> {
let data_type = match self {
ScalarImpl::Int16(_) => DataType::Int16,
ScalarImpl::Int32(_) => DataType::Int32,
ScalarImpl::Int64(_) => DataType::Int64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC timestamp with time zone is also stored as ScalarImpl::Int64 but has DataType::Timestampz.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for mentioning it; I will try to fix this in a follow-up PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just bringing it up so that we are aware of this tricky issue in the type system and can see which design is better. I do not see an immediate way to "fix" it — in the current design, the mapping from ScalarImpl to DataType is not unique; alternatively, we may add a chrono wrapper for timestampz.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me see — I found that I actually don't need this function; I refactored this part in #2508.

ScalarImpl::Float32(_) => DataType::Float32,
ScalarImpl::Float64(_) => DataType::Float64,
ScalarImpl::Utf8(_) => DataType::Varchar,
ScalarImpl::Bool(_) => DataType::Boolean,
ScalarImpl::Decimal(_) => DataType::Decimal,
ScalarImpl::Interval(_) => DataType::Interval,
ScalarImpl::NaiveDate(_) => DataType::Date,
ScalarImpl::NaiveDateTime(_) => DataType::Timestamp,
ScalarImpl::NaiveTime(_) => DataType::Time,
ScalarImpl::Struct(data) => {
let types = data
.fields()
.iter()
.map(get_data_type_from_datum)
.collect::<Result<Vec<_>>>()?;
DataType::Struct {
fields: types.into(),
}
}
ScalarImpl::List(data) => {
let data = data.values().get(0).ok_or_else(|| {
internal_error("cannot get data type from empty list".to_string())
})?;
get_data_type_from_datum(data)?
}
};
Ok(data_type)
}
}

impl<'scalar> ScalarRefImpl<'scalar> {
Expand Down
1 change: 1 addition & 0 deletions src/expr/src/expr/expr_literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ fn literal_type_match(return_type: &DataType, literal: Option<&ScalarImpl>) -> b
| (DataType::Timestamp, ScalarImpl::NaiveDateTime(_))
| (DataType::Decimal, ScalarImpl::Decimal(_))
| (DataType::Interval, ScalarImpl::Interval(_))
| (DataType::Struct { .. }, ScalarImpl::Struct(_))
)
}
None => true,
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/binder/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl Binder {
list,
negated,
} => self.bind_in_list(*expr, list, negated),
Expr::Row(exprs) => Ok(ExprImpl::Literal(Box::new(self.bind_row(&exprs)?))),
_ => Err(ErrorCode::NotImplemented(
format!("unsupported expression {:?}", expr),
112.into(),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/expr/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::binder::Binder;
use crate::expr::Literal;

impl Binder {
pub(super) fn bind_value(&mut self, value: Value) -> Result<Literal> {
pub fn bind_value(&mut self, value: Value) -> Result<Literal> {
match value {
Value::Number(s, b) => self.bind_number(s, b),
Value::SingleQuotedString(s) => self.bind_string(s),
Expand Down
37 changes: 33 additions & 4 deletions src/frontend/src/binder/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
// limitations under the License.

use itertools::Itertools;
use risingwave_common::array::StructValue;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::types::DataType;
use risingwave_sqlparser::ast::Values;
use risingwave_common::error::{ErrorCode, Result, TrackingIssue};
use risingwave_common::types::{get_data_type_from_datum, DataType, Datum, Scalar};
use risingwave_sqlparser::ast::{Expr, Values};

use super::bind_context::Clause;
use crate::binder::Binder;
use crate::expr::{align_types, ExprImpl};
use crate::expr::{align_types, ExprImpl, Literal};

#[derive(Debug)]
pub struct BoundValues {
Expand Down Expand Up @@ -80,6 +81,34 @@ impl Binder {
schema,
})
}

/// Bind row to `struct_value` for nested column,
/// e.g. Row(1,2,(1,2,3)).
/// Only accept value and row expr in row.
pub fn bind_row(&mut self, exprs: &[Expr]) -> Result<Literal> {
let datums = exprs
.iter()
.map(|e| match e {
Expr::Value(value) => Ok(self.bind_value(value.clone())?.get_data().clone()),
Expr::Row(expr) => Ok(self.bind_row(expr)?.get_data().clone()),
_ => Err(ErrorCode::NotImplemented(
format!("unsupported expression {:?}", e),
TrackingIssue::none(),
)
.into()),
})
.collect::<Result<Vec<Datum>>>()?;
let value = StructValue::new(datums);
let data_type = DataType::Struct {
fields: value
.fields()
.iter()
.map(get_data_type_from_datum)
.collect::<Result<Vec<_>>>()?
.into(),
};
Ok(Literal::new(Some(value.to_scalar_value()), data_type))
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/expr/type_inference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ fn build_type_derive_map() -> HashMap<FuncSign, DataTypeName> {
E::GreaterThanOrEqual,
];
build_binary_cmp_funcs(&mut map, cmp_exprs, &num_types);
build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Struct]);
build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Date, T::Timestamp, T::Timestampz]);
build_binary_cmp_funcs(&mut map, cmp_exprs, &[T::Time, T::Interval]);
for e in cmp_exprs {
Expand Down
10 changes: 8 additions & 2 deletions src/frontend/test_runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ pub struct CreateSource {
row_format: String,
name: String,
file: Option<String>,
materialized: Option<bool>,
}

#[serde_with::skip_serializing_none]
Expand Down Expand Up @@ -200,11 +201,16 @@ impl TestCase {
match self.create_source.clone() {
Some(source) => {
if let Some(content) = source.file {
let materialized = if let Some(true) = source.materialized {
"materialized".to_string()
} else {
"".to_string()
};
let sql = format!(
r#"CREATE SOURCE {}
r#"CREATE {} SOURCE {}
WITH ('kafka.topic' = 'abc', 'kafka.servers' = 'localhost:1001')
ROW FORMAT {} MESSAGE '.test.TestRecord' ROW SCHEMA LOCATION 'file://"#,
source.name, source.row_format
materialized, source.name, source.row_format
);
let temp_file = create_proto_file(content.as_str());
self.run_sql(
Expand Down
Loading