From e48547dac8c7083a13d9537f9f16e4cc855bc4e7 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Wed, 25 Oct 2023 16:41:16 +0800 Subject: [PATCH 1/3] refactor(type): switch jsonb to flat representation (#12952) Signed-off-by: Runji Wang --- Cargo.lock | 14 ++ src/common/Cargo.toml | 1 + src/common/src/array/jsonb_array.rs | 128 ++++------- src/common/src/array/proto_reader.rs | 4 +- src/common/src/array/value_reader.rs | 15 +- src/common/src/test_utils/rand_array.rs | 2 +- src/common/src/types/jsonb.rs | 261 ++++++++++++++--------- src/common/src/types/mod.rs | 4 +- src/expr/impl/Cargo.toml | 1 + src/expr/impl/benches/expr.rs | 31 ++- src/expr/impl/src/aggregate/jsonb_agg.rs | 150 +++++++++++-- src/expr/impl/src/scalar/jsonb_concat.rs | 28 +-- src/expr/macro/src/gen.rs | 86 +++++--- src/expr/macro/src/lib.rs | 2 + src/expr/macro/src/parse.rs | 30 ++- 15 files changed, 472 insertions(+), 285 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3c9dbe9602508..cf1f1f0e493e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4117,6 +4117,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonbb" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44376417b2ff0cd879b5c84976fa9e0855c316321b4e0502e33e52963bf84f74" +dependencies = [ + "bytes", + "serde", + "serde_json", + "smallvec", +] + [[package]] name = "jsonschema-transpiler" version = "1.10.0" @@ -7073,6 +7085,7 @@ dependencies = [ "hytra", "itertools 0.11.0", "itoa", + "jsonbb", "libc", "lru 0.7.6", "mach2", @@ -7450,6 +7463,7 @@ dependencies = [ "futures-util", "hex", "itertools 0.11.0", + "jsonbb", "madsim-tokio", "md5", "num-traits", diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index ddd1fe5a33cdb..168ba836d4c1b 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -49,6 +49,7 @@ hyper = "0.14" hytra = { workspace = true } itertools = "0.11" itoa = "1.0" +jsonbb = "0.1" lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "cb2d7c7" } memcomparable = { version = "0.2", features = ["decimal"] } num-integer = "0.1" diff --git a/src/common/src/array/jsonb_array.rs b/src/common/src/array/jsonb_array.rs index 0e9ba7c48511d..3c4ca23fff04e 100644 --- a/src/common/src/array/jsonb_array.rs +++ b/src/common/src/array/jsonb_array.rs @@ -12,36 +12,35 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::mem::size_of; - use risingwave_pb::data::{PbArray, PbArrayType}; -use serde_json::Value; -use super::{Array, ArrayBuilder}; +use super::{Array, ArrayBuilder, ArrayImpl, ArrayResult}; use crate::buffer::{Bitmap, BitmapBuilder}; use crate::estimate_size::EstimateSize; -use crate::types::{DataType, JsonbRef, JsonbVal, F32, F64}; -use crate::util::iter_util::ZipEqFast; +use crate::types::{DataType, JsonbRef, JsonbVal, Scalar}; #[derive(Debug)] pub struct JsonbArrayBuilder { bitmap: BitmapBuilder, - data: Vec, + builder: jsonbb::Builder, } #[derive(Debug, Clone, PartialEq, Eq)] pub struct JsonbArray { bitmap: Bitmap, - data: Vec, + /// Elements are stored as a single JSONB array value. 
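+    /// For example, a jsonb column holding the elements `1`, NULL and `"a"` is
+    /// stored as the single value `[1, null, "a"]`: SQL NULL-ness is tracked by
+    /// `bitmap`, and a JSON `null` merely occupies the element slot.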
+ data: jsonbb::Value, } impl ArrayBuilder for JsonbArrayBuilder { type ArrayType = JsonbArray; fn new(capacity: usize) -> Self { + let mut builder = jsonbb::Builder::with_capacity(capacity); + builder.begin_array(); Self { bitmap: BitmapBuilder::with_capacity(capacity), - data: Vec::with_capacity(capacity), + builder, } } @@ -54,13 +53,15 @@ impl ArrayBuilder for JsonbArrayBuilder { match value { Some(x) => { self.bitmap.append_n(n, true); - self.data - .extend(std::iter::repeat(x).take(n).map(|x| x.0.clone())); + for _ in 0..n { + self.builder.add_value(x.0); + } } None => { self.bitmap.append_n(n, false); - self.data - .extend(std::iter::repeat(*JsonbVal::dummy().0).take(n)); + for _ in 0..n { + self.builder.add_null(); + } } } } @@ -69,29 +70,44 @@ impl ArrayBuilder for JsonbArrayBuilder { for bit in other.bitmap.iter() { self.bitmap.append(bit); } - self.data.extend_from_slice(&other.data); + for value in other.data.as_array().unwrap().iter() { + self.builder.add_value(value); + } } fn pop(&mut self) -> Option<()> { - self.data.pop().map(|_| self.bitmap.pop().unwrap()) + self.bitmap.pop()?; + self.builder.pop(); + Some(()) } fn len(&self) -> usize { self.bitmap.len() } - fn finish(self) -> Self::ArrayType { + fn finish(mut self) -> Self::ArrayType { + self.builder.end_array(); Self::ArrayType { bitmap: self.bitmap.finish(), - data: self.data, + data: self.builder.finish(), } } } -impl JsonbArrayBuilder { - pub fn append_move(&mut self, value: JsonbVal) { - self.bitmap.append(true); - self.data.push(*value.0); +impl JsonbArray { + /// Loads a `JsonbArray` from a protobuf array. + /// + /// See also `JsonbArray::to_protobuf`. + pub fn from_protobuf(array: &PbArray) -> ArrayResult { + ensure!( + array.values.len() == 1, + "Must have exactly 1 buffer in a jsonb array" + ); + let arr = JsonbArray { + bitmap: array.get_null_bitmap()?.into(), + data: jsonbb::Value::from_bytes(&array.values[0].body), + }; + Ok(arr.into()) } } @@ -101,52 +117,23 @@ impl Array for JsonbArray { type RefItem<'a> = JsonbRef<'a>; unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> { - JsonbRef(self.data.get_unchecked(idx)) + JsonbRef(self.data.as_array().unwrap().get(idx).unwrap()) } fn len(&self) -> usize { - self.data.len() + self.bitmap.len() } fn to_protobuf(&self) -> PbArray { - // The memory layout contains `serde_json::Value` trees, but in protobuf we transmit this as - // variable length bytes in value encoding. That is, one buffer of length n+1 containing - // start and end offsets into the 2nd buffer containing all value bytes concatenated. 
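+        // With the flat representation, the whole column is already one
+        // contiguous `jsonbb` buffer, so it is shipped below as a single value
+        // buffer (`self.data.as_bytes()`) instead of the old offsets + bytes pair.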
- use risingwave_pb::common::buffer::CompressionType; use risingwave_pb::common::Buffer; - let mut offset_buffer = - Vec::::with_capacity((1 + self.data.len()) * std::mem::size_of::()); - let mut data_buffer = Vec::::with_capacity(self.data.len()); - - let mut offset = 0; - for (v, not_null) in self.data.iter().zip_eq_fast(self.null_bitmap().iter()) { - if !not_null { - continue; - } - let d = JsonbRef(v).value_serialize(); - offset_buffer.extend_from_slice(&(offset as u64).to_be_bytes()); - data_buffer.extend_from_slice(&d); - offset += d.len(); - } - offset_buffer.extend_from_slice(&(offset as u64).to_be_bytes()); - - let values = vec![ - Buffer { - compression: CompressionType::None as i32, - body: offset_buffer, - }, - Buffer { - compression: CompressionType::None as i32, - body: data_buffer, - }, - ]; - - let null_bitmap = self.null_bitmap().to_protobuf(); PbArray { - null_bitmap: Some(null_bitmap), - values, + null_bitmap: Some(self.null_bitmap().to_protobuf()), + values: vec![Buffer { + compression: CompressionType::None as i32, + body: self.data.as_bytes().to_vec(), + }], array_type: PbArrayType::Jsonb as i32, struct_array_data: None, list_array_data: None, @@ -176,7 +163,7 @@ impl FromIterator> for JsonbArray { let mut builder = ::Builder::new(iter.size_hint().0); for i in iter { match i { - Some(x) => builder.append_move(x), + Some(x) => builder.append(Some(x.as_scalar_ref())), None => builder.append(None), } } @@ -190,31 +177,8 @@ impl FromIterator for JsonbArray { } } -// TODO: We need to fix this later. impl EstimateSize for JsonbArray { fn estimated_heap_size(&self) -> usize { - self.bitmap.estimated_heap_size() + self.data.capacity() * size_of::() - } -} - -impl From for Value { - fn from(v: F32) -> Value { - serde_json::Number::from_f64(v.0 as f64) - .expect("todo: convert Inf/NaN to jsonb") - .into() - } -} - -impl From for Value { - fn from(v: F64) -> Value { - serde_json::Number::from_f64(v.0) - .expect("todo: convert Inf/NaN to jsonb") - .into() - } -} - -impl From> for Value { - fn from(v: JsonbRef<'_>) -> Value { - v.0.clone() + self.bitmap.estimated_heap_size() + self.data.capacity() } } diff --git a/src/common/src/array/proto_reader.rs b/src/common/src/array/proto_reader.rs index 55d505343dadd..4ca6bf7b70d05 100644 --- a/src/common/src/array/proto_reader.rs +++ b/src/common/src/array/proto_reader.rs @@ -52,9 +52,7 @@ impl ArrayImpl { PbArrayType::Timestamp => read_timestamp_array(array, cardinality)?, PbArrayType::Timestamptz => read_timestamptz_array(array, cardinality)?, PbArrayType::Interval => read_interval_array(array, cardinality)?, - PbArrayType::Jsonb => { - read_string_array::(array, cardinality)? 
- } + PbArrayType::Jsonb => JsonbArray::from_protobuf(array)?, PbArrayType::Struct => StructArray::from_protobuf(array)?, PbArrayType::List => ListArray::from_protobuf(array)?, PbArrayType::Unspecified => unreachable!(), diff --git a/src/common/src/array/value_reader.rs b/src/common/src/array/value_reader.rs index 96ed7c31b88aa..45db47f23242b 100644 --- a/src/common/src/array/value_reader.rs +++ b/src/common/src/array/value_reader.rs @@ -19,8 +19,7 @@ use byteorder::{BigEndian, ReadBytesExt}; use super::ArrayResult; use crate::array::{ - ArrayBuilder, BytesArrayBuilder, JsonbArrayBuilder, PrimitiveArrayItemType, Serial, - Utf8ArrayBuilder, + ArrayBuilder, BytesArrayBuilder, PrimitiveArrayItemType, Serial, Utf8ArrayBuilder, }; use crate::types::{Decimal, F32, F64}; @@ -89,15 +88,3 @@ impl VarSizedValueReader for BytesValueReader { Ok(()) } } - -pub struct JsonbValueReader; - -impl VarSizedValueReader for JsonbValueReader { - fn read(buf: &[u8], builder: &mut JsonbArrayBuilder) -> ArrayResult<()> { - let Some(v) = super::JsonbVal::value_deserialize(buf) else { - bail!("failed to read jsonb from bytes"); - }; - builder.append_move(v); - Ok(()) - } -} diff --git a/src/common/src/test_utils/rand_array.rs b/src/common/src/test_utils/rand_array.rs index 70d0cb73d4dfa..f2dd8ad42854b 100644 --- a/src/common/src/test_utils/rand_array.rs +++ b/src/common/src/test_utils/rand_array.rs @@ -135,7 +135,7 @@ impl RandValue for Int256 { impl RandValue for JsonbVal { fn rand_value(_rand: &mut R) -> Self { - JsonbVal::dummy() + JsonbVal::null() } } diff --git a/src/common/src/types/jsonb.rs b/src/common/src/types/jsonb.rs index 7f4c002037060..590b693e47891 100644 --- a/src/common/src/types/jsonb.rs +++ b/src/common/src/types/jsonb.rs @@ -15,23 +15,21 @@ use std::fmt; use std::hash::Hash; -use postgres_types::{FromSql as _, ToSql as _, Type}; -use serde_json::Value; +use bytes::Buf; +use jsonbb::{Value, ValueRef}; use crate::estimate_size::EstimateSize; -use crate::types::{Scalar, ScalarRef}; +use crate::types::{Scalar, ScalarRef, F32, F64}; -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct JsonbVal(pub(crate) Box); // The `Box` is just to keep `size_of::` smaller. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct JsonbVal(pub(crate) Value); -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub struct JsonbRef<'a>(pub(crate) &'a Value); +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct JsonbRef<'a>(pub(crate) ValueRef<'a>); impl EstimateSize for JsonbVal { fn estimated_heap_size(&self) -> usize { - // https://github.com/risingwavelabs/risingwave/issues/8957 - // FIXME: correctly handle jsonb size - 0 + self.0.capacity() } } @@ -63,7 +61,7 @@ impl<'a> ScalarRef<'a> for JsonbRef<'a> { type ScalarType = JsonbVal; fn to_owned_scalar(&self) -> Self::ScalarType { - JsonbVal(self.0.clone().into()) + JsonbVal(self.0.into()) } fn hash_scalar(&self, state: &mut H) { @@ -71,22 +69,6 @@ impl<'a> ScalarRef<'a> for JsonbRef<'a> { } } -impl Hash for JsonbRef<'_> { - fn hash(&self, state: &mut H) { - // We do not intend to support hashing `jsonb` type. - // Before #7981 is done, we do not panic but just hash its string representation. - // Note that `serde_json` without feature `preserve_order` uses `BTreeMap` for json object. - // So its string form always have keys sorted. 
-        self.0.to_string().hash(state)
-    }
-}
-
-impl Hash for JsonbVal {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.0.to_string().hash(state)
-    }
-}
-
 impl PartialOrd for JsonbVal {
     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
         Some(self.cmp(other))
@@ -160,9 +142,7 @@ impl crate::types::to_binary::ToBinary for JsonbRef<'_> {
         &self,
         _ty: &crate::types::DataType,
     ) -> crate::error::Result<Option<bytes::Bytes>> {
-        let mut output = bytes::BytesMut::new();
-        self.0.to_sql(&Type::JSONB, &mut output).unwrap();
-        Ok(Some(output.freeze()))
+        Ok(Some(self.value_serialize().into()))
     }
 }
 
@@ -170,43 +150,130 @@ impl std::str::FromStr for JsonbVal {
     type Err = <Value as std::str::FromStr>::Err;
 
     fn from_str(s: &str) -> Result<Self, Self::Err> {
-        let v: Value = s.parse()?;
-        Ok(Self(v.into()))
+        Ok(Self(s.parse()?))
     }
 }
 
 impl JsonbVal {
-    /// Constructs a value without specific meaning. Usually used as a lightweight placeholder.
-    pub fn dummy() -> Self {
-        Self(Value::Null.into())
+    /// Returns a jsonb `null`.
+    pub fn null() -> Self {
+        Self(Value::null())
+    }
+
+    /// Returns an empty array `[]`.
+    pub fn empty_array() -> Self {
+        Self(Value::array([]))
+    }
+
+    /// Returns an empty object `{}`.
+    pub fn empty_object() -> Self {
+        Self(Value::object([]))
     }
 
+    /// Deserialize from a memcomparable encoding.
     pub fn memcmp_deserialize(
         deserializer: &mut memcomparable::Deserializer<impl bytes::Buf>,
     ) -> memcomparable::Result<Self> {
-        let v: Value = <String as serde::Deserialize>::deserialize(deserializer)?
+        let v = <String as serde::Deserialize>::deserialize(deserializer)?
             .parse()
             .map_err(|_| memcomparable::Error::Message("invalid json".into()))?;
-        Ok(Self(v.into()))
+        Ok(Self(v))
+    }
+
+    /// Deserialize from a pgwire "BINARY" encoding.
+    pub fn value_deserialize(mut buf: &[u8]) -> Option<Self> {
+        if buf.is_empty() || buf.get_u8() != 1 {
+            return None;
+        }
+        Value::from_text(buf).ok().map(Self)
+    }
+
+    /// Convert the value to a [`serde_json::Value`].
+    pub fn take(self) -> serde_json::Value {
+        self.0.into()
+    }
+}
+
+impl From<serde_json::Value> for JsonbVal {
+    fn from(v: serde_json::Value) -> Self {
+        Self(v.into())
+    }
+}
+
+impl From<bool> for JsonbVal {
+    fn from(v: bool) -> Self {
+        Self(v.into())
+    }
+}
+
+impl From<i16> for JsonbVal {
+    fn from(v: i16) -> Self {
+        Self(v.into())
+    }
+}
+
+impl From<i32> for JsonbVal {
+    fn from(v: i32) -> Self {
+        Self(v.into())
+    }
+}
+
+impl From<i64> for JsonbVal {
+    fn from(v: i64) -> Self {
+        Self(v.into())
+    }
+}
+
+impl From<F32> for JsonbVal {
+    fn from(v: F32) -> Self {
+        if v.0 == f32::INFINITY {
+            Self("Infinity".into())
+        } else if v.0 == f32::NEG_INFINITY {
+            Self("-Infinity".into())
+        } else if v.0.is_nan() {
+            Self("NaN".into())
+        } else {
+            Self(v.0.into())
+        }
+    }
+}
+
+// NOTE: Infinite or NaN values are not JSON numbers. They are stored as strings in Postgres.
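+// For example, `JsonbVal::from(F64::from(f64::NAN))` becomes the JSON string
+// `"NaN"`, matching what Postgres' `to_jsonb(double precision)` produces for
+// non-finite inputs.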
+impl From<F64> for JsonbVal {
+    fn from(v: F64) -> Self {
+        if v.0 == f64::INFINITY {
+            Self("Infinity".into())
+        } else if v.0 == f64::NEG_INFINITY {
+            Self("-Infinity".into())
+        } else if v.0.is_nan() {
+            Self("NaN".into())
+        } else {
+            Self(v.0.into())
+        }
+    }
+}
+
+impl From<&str> for JsonbVal {
+    fn from(v: &str) -> Self {
+        Self(v.into())
+    }
+}
+
+impl From<JsonbRef<'_>> for JsonbVal {
+    fn from(v: JsonbRef<'_>) -> Self {
+        Self(v.0.to_owned())
+    }
+}
 
 impl From<Value> for JsonbVal {
     fn from(v: Value) -> Self {
-        Self(v.into())
+        Self(v)
+    }
+}
+
+impl<'a> From<JsonbRef<'a>> for ValueRef<'a> {
+    fn from(v: JsonbRef<'a>) -> Self {
+        v.0
     }
 }
 
@@ -221,49 +288,52 @@ impl<'a> JsonbRef<'a> {
         serde::Serialize::serialize(&s, serializer)
     }
 
+    /// Serialize to a pgwire "BINARY" encoding.
     pub fn value_serialize(&self) -> Vec<u8> {
+        use std::io::Write;
         // Reuse the pgwire "BINARY" encoding for jsonb type.
         // It is not truly binary, but one byte of version `1u8` followed by string form.
         // This version number helps us maintain compatibility when we switch to more efficient
         // encoding later.
-        let mut output = bytes::BytesMut::new();
-        self.0.to_sql(&Type::JSONB, &mut output).unwrap();
-        output.freeze().into()
+        let mut buf = Vec::with_capacity(self.0.capacity());
+        buf.push(1);
+        write!(&mut buf, "{}", self.0).unwrap();
+        buf
     }
 
+    /// Returns true if this is a jsonb `null`.
     pub fn is_jsonb_null(&self) -> bool {
-        matches!(self.0, Value::Null)
+        self.0.as_null().is_some()
    }
 
+    /// Returns the type name of this jsonb.
+    ///
+    /// Possible values are: `null`, `boolean`, `number`, `string`, `array`, `object`.
     pub fn type_name(&self) -> &'static str {
         match self.0 {
-            Value::Null => "null",
-            Value::Bool(_) => "boolean",
-            Value::Number(_) => "number",
-            Value::String(_) => "string",
-            Value::Array(_) => "array",
-            Value::Object(_) => "object",
+            ValueRef::Null => "null",
+            ValueRef::Bool(_) => "boolean",
+            ValueRef::Number(_) => "number",
+            ValueRef::String(_) => "string",
+            ValueRef::Array(_) => "array",
+            ValueRef::Object(_) => "object",
         }
     }
 
+    /// Returns the length of this json array.
    pub fn array_len(&self) -> Result<usize, String> {
-        match self.0 {
-            Value::Array(v) => Ok(v.len()),
-            _ => Err(format!(
-                "cannot get array length of a jsonb {}",
-                self.type_name()
-            )),
-        }
+        let array = self
+            .0
+            .as_array()
+            .ok_or_else(|| format!("cannot get array length of a jsonb {}", self.type_name()))?;
+        Ok(array.len())
     }
 
+    /// If the JSON is a boolean, returns the associated bool.
     pub fn as_bool(&self) -> Result<bool, String> {
-        match self.0 {
-            Value::Bool(v) => Ok(*v),
-            _ => Err(format!(
-                "cannot cast jsonb {} to type boolean",
-                self.type_name()
-            )),
-        }
+        self.0
+            .as_bool()
+            .ok_or_else(|| format!("cannot cast jsonb {} to type boolean", self.type_name()))
     }
 
     /// Attempt to read jsonb as a JSON number.
     ///
     /// According to RFC 8259, only number within IEEE 754 binary64 (double precision) has good
     /// interoperability. We do not support arbitrary precision like PostgreSQL `numeric` right now.
     pub fn as_number(&self) -> Result<f64, String> {
-        match self.0 {
-            Value::Number(v) => v.as_f64().ok_or_else(|| "jsonb number out of range".into()),
-            _ => Err(format!(
-                "cannot cast jsonb {} to type number",
-                self.type_name()
-            )),
-        }
+        self.0
+            .as_number()
+            .ok_or_else(|| format!("cannot cast jsonb {} to type number", self.type_name()))?
+            .as_f64()
+            .ok_or_else(|| "jsonb number out of range".into())
     }
 
     /// This is part of the `->>` or `#>>` syntax to access a child as string.
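+    ///
+    /// For example, `'{"a":"b"}'::jsonb ->> 'a'` produces the unquoted text `b`,
+    /// whereas `->` would produce the jsonb string `"b"`.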
@@ -291,9 +359,9 @@ impl<'a> JsonbRef<'a> { /// * Jsonb string is displayed with quotes but treated as its inner value here. pub fn force_str(&self, writer: &mut W) -> std::fmt::Result { match self.0 { - Value::String(v) => writer.write_str(v), - Value::Null => Ok(()), - Value::Bool(_) | Value::Number(_) | Value::Array(_) | Value::Object(_) => { + ValueRef::String(v) => writer.write_str(v), + ValueRef::Null => Ok(()), + ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::Array(_) | ValueRef::Object(_) => { use crate::types::to_text::ToText as _; self.write_with_type(&crate::types::DataType::Jsonb, writer) } @@ -316,38 +384,33 @@ impl<'a> JsonbRef<'a> { /// Returns an iterator over the elements if this is an array. pub fn array_elements(self) -> Result>, String> { - match &self.0 { - Value::Array(array) => Ok(array.iter().map(Self)), - _ => Err(format!( - "cannot extract elements from a jsonb {}", - self.type_name() - )), - } + let array = self + .0 + .as_array() + .ok_or_else(|| format!("cannot extract elements from a jsonb {}", self.type_name()))?; + Ok(array.iter().map(Self)) } /// Returns an iterator over the keys if this is an object. pub fn object_keys(self) -> Result, String> { - match &self.0 { - Value::Object(object) => Ok(object.keys().map(|s| s.as_str())), - _ => Err(format!( + let object = self.0.as_object().ok_or_else(|| { + format!( "cannot call jsonb_object_keys on a jsonb {}", self.type_name() - )), - } + ) + })?; + Ok(object.keys()) } /// Returns an iterator over the key-value pairs if this is an object. pub fn object_key_values( self, ) -> Result)>, String> { - match &self.0 { - Value::Object(object) => Ok(object.iter().map(|(k, v)| (k.as_str(), Self(v)))), - _ => Err(format!("cannot deconstruct a jsonb {}", self.type_name())), - } - } - - pub fn value(&self) -> &'a Value { - self.0 + let object = self + .0 + .as_object() + .ok_or_else(|| format!("cannot deconstruct a jsonb {}", self.type_name()))?; + Ok(object.iter().map(|(k, v)| (k, Self(v)))) } } diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 83d281c5238e6..386f63280a557 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -416,7 +416,7 @@ impl DataType { DataType::Timestamptz => ScalarImpl::Timestamptz(Timestamptz::MIN), DataType::Decimal => ScalarImpl::Decimal(Decimal::NegativeInf), DataType::Interval => ScalarImpl::Interval(Interval::MIN), - DataType::Jsonb => ScalarImpl::Jsonb(JsonbVal::dummy()), // NOT `min` #7981 + DataType::Jsonb => ScalarImpl::Jsonb(JsonbVal::null()), // NOT `min` #7981 DataType::Struct(data_types) => ScalarImpl::Struct(StructValue::new( data_types .types() @@ -1303,7 +1303,7 @@ mod tests { ScalarImpl::Interval(Interval::from_month_day_usec(2, 3, 3333)), DataType::Interval, ), - DataTypeName::Jsonb => (ScalarImpl::Jsonb(JsonbVal::dummy()), DataType::Jsonb), + DataTypeName::Jsonb => (ScalarImpl::Jsonb(JsonbVal::null()), DataType::Jsonb), DataTypeName::Struct => ( ScalarImpl::Struct(StructValue::new(vec![ ScalarImpl::Int64(233).into(), diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index 81cd685c4dc27..cc0229f83ebab 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -29,6 +29,7 @@ futures-async-stream = { workspace = true } futures-util = "0.3" hex = "0.4" itertools = "0.11" +jsonbb = "0.1" md5 = "0.7" num-traits = "0.2" regex = "1" diff --git a/src/expr/impl/benches/expr.rs b/src/expr/impl/benches/expr.rs index 1e84d8d8e4825..010508c8de45e 100644 --- a/src/expr/impl/benches/expr.rs +++ 
b/src/expr/impl/benches/expr.rs @@ -170,10 +170,7 @@ fn bench_expr(c: &mut Criterion) { // 25: serial array SerialArray::from_iter((1..=CHUNK_SIZE).map(|i| Serial::from(i as i64))).into_ref(), // 26: jsonb array - JsonbArray::from_iter( - (1..=CHUNK_SIZE).map(|i| JsonbVal::from(serde_json::Value::Number(i.into()))), - ) - .into_ref(), + JsonbArray::from_iter((1..=CHUNK_SIZE).map(|i| JsonbVal::from(i as i64))).into_ref(), // 27: int256 array Int256Array::from_iter((1..=CHUNK_SIZE).map(|_| Int256::from(1))).into_ref(), // 28: extract field for interval @@ -279,16 +276,16 @@ fn bench_expr(c: &mut Criterion) { 'sig: for sig in sigs { if (sig.inputs_type.iter()) .chain([&sig.ret_type]) - .any(|t| !t.is_exact()) + .any(|t| !t.is_exact() || t.as_exact().is_array()) { - // TODO: support struct and list + // TODO: support struct and array println!("todo: {sig:?}"); continue; } if [ - "date_trunc(varchar, timestamptz) -> timestamptz", - "to_timestamp1(varchar, varchar) -> timestamptz", - "to_char(timestamptz, varchar) -> varchar", + "date_trunc(character varying, timestamp with time zone) -> timestamp with time zone", + "to_timestamp1(character varying, character varying) -> timestamp with time zone", + "to_char(timestamp with time zone, character varying) -> character varying", ] .contains(&format!("{sig:?}").as_str()) { @@ -376,6 +373,13 @@ fn bench_expr(c: &mut Criterion) { args: match sig.inputs_type.as_slice() { [] => AggArgs::None, [t] => AggArgs::Unary(t.as_exact().clone(), input_index_for_type(t.as_exact())), + [t1, t2] => AggArgs::Binary( + [t1.as_exact().clone(), t2.as_exact().clone()], + [ + input_index_for_type(t1.as_exact()), + input_index_for_type(t2.as_exact()), + ], + ), _ => { println!("todo: {sig:?}"); continue; @@ -393,6 +397,15 @@ fn bench_expr(c: &mut Criterion) { continue; } }; + let input = match sig.inputs_type.as_slice() { + [] => input.project(&[]), + [t] => input.project(&[input_index_for_type(t.as_exact())]), + [t1, t2] => input.project(&[ + input_index_for_type(t1.as_exact()), + input_index_for_type(t2.as_exact()), + ]), + _ => unreachable!(), + }; c.bench_function(&format!("{sig:?}"), |bencher| { bencher .to_async(FuturesExecutor) diff --git a/src/expr/impl/src/aggregate/jsonb_agg.rs b/src/expr/impl/src/aggregate/jsonb_agg.rs index 8385e2c6a060b..96f5e50da85e3 100644 --- a/src/expr/impl/src/aggregate/jsonb_agg.rs +++ b/src/expr/impl/src/aggregate/jsonb_agg.rs @@ -12,22 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. 
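The aggregate rewrite below accumulates rows directly into a `jsonbb::Builder`
instead of folding owned `serde_json::Value`s. As a minimal sketch of that
accumulation pattern, using only builder calls that appear elsewhere in this
patch (`default`, `begin_array`, `add_*`, `end_array`, `finish`; the three
sample elements are illustrative):

    let mut builder = jsonbb::Builder::default();
    builder.begin_array();
    builder.add_i64(1);
    builder.add_null(); // a SQL NULL input becomes a JSON null, as in `jsonb_agg`
    builder.add_string("a");
    builder.end_array();
    let value: jsonbb::Value = builder.finish(); // represents [1, null, "a"]
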
-use risingwave_common::types::JsonbVal; +use risingwave_common::estimate_size::EstimateSize; +use risingwave_common::types::{JsonbRef, JsonbVal, ScalarImpl, F32, F64}; +use risingwave_expr::aggregate::AggStateDyn; use risingwave_expr::{aggregate, ExprError, Result}; -use serde_json::Value; #[aggregate("jsonb_agg(boolean) -> jsonb")] #[aggregate("jsonb_agg(*int) -> jsonb")] #[aggregate("jsonb_agg(*float) -> jsonb")] #[aggregate("jsonb_agg(varchar) -> jsonb")] #[aggregate("jsonb_agg(jsonb) -> jsonb")] -fn jsonb_agg(state: Option, input: Option>) -> JsonbVal { - let mut jsonb = state.unwrap_or_else(|| Value::Array(Vec::with_capacity(1)).into()); - match jsonb.as_serde_mut() { - Value::Array(a) => a.push(input.map_or(Value::Null, Into::into)), - _ => unreachable!("invalid jsonb state"), - }; - jsonb +fn jsonb_agg(state: &mut JsonbArrayState, input: Option) { + match input { + Some(input) => input.add_to(&mut state.0), + None => state.0.add_null(), + } } #[aggregate("jsonb_object_agg(varchar, boolean) -> jsonb")] @@ -36,15 +35,130 @@ fn jsonb_agg(state: Option, input: Option>) -> JsonbV #[aggregate("jsonb_object_agg(varchar, varchar) -> jsonb")] #[aggregate("jsonb_object_agg(varchar, jsonb) -> jsonb")] fn jsonb_object_agg( - state: Option, + state: &mut JsonbObjectState, key: Option<&str>, - value: Option>, -) -> Result { + value: Option, +) -> Result<()> { let key = key.ok_or(ExprError::FieldNameNull)?; - let mut jsonb = state.unwrap_or_else(|| Value::Object(Default::default()).into()); - match jsonb.as_serde_mut() { - Value::Object(map) => map.insert(key.into(), value.map_or(Value::Null, Into::into)), - _ => unreachable!("invalid jsonb state"), - }; - Ok(jsonb) + state.0.add_string(key); + match value { + Some(value) => value.add_to(&mut state.0), + None => state.0.add_null(), + } + Ok(()) +} + +#[derive(Debug)] +struct JsonbArrayState(jsonbb::Builder); + +impl EstimateSize for JsonbArrayState { + fn estimated_heap_size(&self) -> usize { + self.0.capacity() + } +} + +impl AggStateDyn for JsonbArrayState {} + +/// Creates an initial state. +impl Default for JsonbArrayState { + fn default() -> Self { + let mut builder = jsonbb::Builder::default(); + builder.begin_array(); + Self(builder) + } +} + +/// Finishes aggregation and returns the result. +impl From<&JsonbArrayState> for ScalarImpl { + fn from(builder: &JsonbArrayState) -> Self { + // TODO: avoid clone + let mut builder = builder.0.clone(); + builder.end_array(); + let jsonb: JsonbVal = builder.finish().into(); + jsonb.into() + } +} + +#[derive(Debug)] +struct JsonbObjectState(jsonbb::Builder); + +impl EstimateSize for JsonbObjectState { + fn estimated_heap_size(&self) -> usize { + self.0.capacity() + } +} + +impl AggStateDyn for JsonbObjectState {} + +/// Creates an initial state. +impl Default for JsonbObjectState { + fn default() -> Self { + let mut builder = jsonbb::Builder::default(); + builder.begin_object(); + Self(builder) + } +} + +/// Finishes aggregation and returns the result. +impl From<&JsonbObjectState> for ScalarImpl { + fn from(builder: &JsonbObjectState) -> Self { + // TODO: avoid clone + let mut builder = builder.0.clone(); + builder.end_object(); + let jsonb: JsonbVal = builder.finish().into(); + jsonb.into() + } +} + +/// Values that can be converted to JSON. 
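+///
+/// For example, `42i64.add_to(&mut builder)` appends the JSON number `42` to
+/// whatever array or object the builder currently has open.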
+trait ToJson { + fn add_to(self, builder: &mut jsonbb::Builder); +} + +impl ToJson for bool { + fn add_to(self, builder: &mut jsonbb::Builder) { + builder.add_bool(self); + } +} + +impl ToJson for i16 { + fn add_to(self, builder: &mut jsonbb::Builder) { + builder.add_i64(self as _); + } +} + +impl ToJson for i32 { + fn add_to(self, builder: &mut jsonbb::Builder) { + builder.add_i64(self as _); + } +} + +impl ToJson for i64 { + fn add_to(self, builder: &mut jsonbb::Builder) { + builder.add_i64(self); + } +} + +impl ToJson for F32 { + fn add_to(self, builder: &mut jsonbb::Builder) { + builder.add_f64(self.0 as f64); + } +} + +impl ToJson for F64 { + fn add_to(self, builder: &mut jsonbb::Builder) { + builder.add_f64(self.0); + } +} + +impl ToJson for &str { + fn add_to(self, builder: &mut jsonbb::Builder) { + builder.add_string(self); + } +} + +impl ToJson for JsonbRef<'_> { + fn add_to(self, builder: &mut jsonbb::Builder) { + builder.add_value(self.into()); + } } diff --git a/src/expr/impl/src/scalar/jsonb_concat.rs b/src/expr/impl/src/scalar/jsonb_concat.rs index 6277db8d5b981..db469457bb135 100644 --- a/src/expr/impl/src/scalar/jsonb_concat.rs +++ b/src/expr/impl/src/scalar/jsonb_concat.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use jsonbb::{Value, ValueRef}; use risingwave_common::types::{JsonbRef, JsonbVal}; use risingwave_expr::function; -use serde_json::{json, Value}; /// Concatenates the two jsonbs. /// @@ -59,43 +59,35 @@ use serde_json::{json, Value}; /// ``` #[function("jsonb_cat(jsonb, jsonb) -> jsonb")] pub fn jsonb_cat(left: JsonbRef<'_>, right: JsonbRef<'_>) -> JsonbVal { - let left_val = left.value().clone(); - let right_val = right.value().clone(); - match (left_val, right_val) { + match (left.into(), right.into()) { // left and right are object based. // This would have left:{'a':1}, right:{'b':2} -> {'a':1,'b':2} - (Value::Object(mut left_map), Value::Object(right_map)) => { - left_map.extend(right_map); - JsonbVal::from(Value::Object(left_map)) + (ValueRef::Object(left), ValueRef::Object(right)) => { + JsonbVal::from(Value::object(left.iter().chain(right.iter()))) } // left and right are array-based. // This would merge both arrays into one array. // This would have left:[1,2], right:[3,4] -> [1,2,3,4] - (Value::Array(mut left_arr), Value::Array(right_arr)) => { - left_arr.extend(right_arr); - JsonbVal::from(Value::Array(left_arr)) + (ValueRef::Array(left), ValueRef::Array(right)) => { + JsonbVal::from(Value::array(left.iter().chain(right.iter()))) } // One operand is an array, and the other is a single element. // This would insert the non-array value as another element into the array // Eg left:[1,2] right: {'a':1} -> [1,2,{'a':1}] - (Value::Array(mut left_arr), single_val) => { - left_arr.push(single_val); - JsonbVal::from(Value::Array(left_arr)) - } + (ValueRef::Array(left), value) => JsonbVal::from(Value::array(left.iter().chain([value]))), // One operand is an array, and the other is a single element. // This would insert the non-array value as another element into the array // Eg left:{'a':1} right:[1,2] -> [{'a':1},1,2] - (single_val, Value::Array(mut right_arr)) => { - right_arr.insert(0, single_val); - JsonbVal::from(Value::Array(right_arr)) + (value, ValueRef::Array(right)) => { + JsonbVal::from(Value::array([value].into_iter().chain(right.iter()))) } // Both are non-array inputs. 
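+        // (This also covers an object paired with a non-array scalar,
+        // e.g. left:{'a':1} right:1 -> [{'a':1},1].)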
// Both elements would be placed together in an array // Eg left:1 right: 2 -> [1,2] - (left, right) => JsonbVal::from(json!([left, right])), + (left, right) => JsonbVal::from(Value::array([left, right])), } } diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs index 9155853df5b7b..454d2a3169137 100644 --- a/src/expr/macro/src/gen.rs +++ b/src/expr/macro/src/gen.rs @@ -579,9 +579,13 @@ impl FunctionAttr { /// Generate build function for aggregate function. fn generate_agg_build_fn(&self, user_fn: &AggregateFnOrImpl) -> Result { - let state_type: TokenStream2 = match &self.state { - Some(state) if state == "ref" => types::ref_type(&self.ret).parse().unwrap(), - Some(state) if state != "ref" => types::owned_type(state).parse().unwrap(), + // If the first argument of the aggregate function is of type `&mut T`, + // we assume it is a user defined state type. + let custom_state = user_fn.accumulate().first_mut_ref_arg.as_ref(); + let state_type: TokenStream2 = match (custom_state, &self.state) { + (Some(s), _) => s.parse().unwrap(), + (_, Some(state)) if state == "ref" => types::ref_type(&self.ret).parse().unwrap(), + (_, Some(state)) if state != "ref" => types::owned_type(state).parse().unwrap(), _ => types::owned_type(&self.ret).parse().unwrap(), }; let let_arrays = self @@ -603,24 +607,37 @@ impl FunctionAttr { quote! { let #v = unsafe { #a.value_at_unchecked(row_id) }; } }) .collect_vec(); - let let_state = match &self.state { - Some(s) if s == "ref" => { - quote! { state0.as_ref().map(|x| x.as_scalar_ref_impl().try_into().unwrap()) } - } - _ => quote! { state0.take().map(|s| s.try_into().unwrap()) }, + let downcast_state = if custom_state.is_some() { + quote! { let mut state: &mut #state_type = state0.downcast_mut(); } + } else if let Some(s) = &self.state && s == "ref" { + quote! { let mut state: Option<#state_type> = state0.as_datum_mut().as_ref().map(|x| x.as_scalar_ref_impl().try_into().unwrap()); } + } else { + quote! { let mut state: Option<#state_type> = state0.as_datum_mut().take().map(|s| s.try_into().unwrap()); } }; - let assign_state = match &self.state { - Some(s) if s == "ref" => quote! { state.map(|x| x.to_owned_scalar().into()) }, - _ => quote! { state.map(|s| s.into()) }, + let restore_state = if custom_state.is_some() { + quote! {} + } else if let Some(s) = &self.state && s == "ref" { + quote! { *state0.as_datum_mut() = state.map(|x| x.to_owned_scalar().into()); } + } else { + quote! { *state0.as_datum_mut() = state.map(|s| s.into()); } }; - let create_state = self.init_state.as_ref().map(|state| { + let create_state = if custom_state.is_some() { + quote! { + fn create_state(&self) -> AggregateState { + AggregateState::Any(Box::<#state_type>::default()) + } + } + } else if let Some(state) = &self.init_state { let state: TokenStream2 = state.parse().unwrap(); quote! { fn create_state(&self) -> AggregateState { AggregateState::Datum(Some(#state.into())) } } - }); + } else { + // by default: `AggregateState::Datum(None)` + quote! {} + }; let args = (0..self.args.len()).map(|i| format_ident!("v{i}")); let args = quote! { #(#args,)* }; let panic_on_retract = { @@ -703,17 +720,23 @@ impl FunctionAttr { _ => todo!("multiple arguments are not supported for non-option function"), } } - let get_result = match user_fn { - AggregateFnOrImpl::Impl(impl_) if impl_.finalize.is_some() => { - quote! 
{ - let state = match state { - Some(s) => s.as_scalar_ref_impl().try_into().unwrap(), - None => return Ok(None), - }; - Ok(Some(self.function.finalize(state).into())) - } + let update_state = if custom_state.is_some() { + quote! { _ = #next_state; } + } else { + quote! { state = #next_state; } + }; + let get_result = if custom_state.is_some() { + quote! { Ok(Some(state.downcast_ref::<#state_type>().into())) } + } else if let AggregateFnOrImpl::Impl(impl_) = user_fn && impl_.finalize.is_some() { + quote! { + let state = match state.as_datum() { + Some(s) => s.as_scalar_ref_impl().try_into().unwrap(), + None => return Ok(None), + }; + Ok(Some(self.function.finalize(state).into())) } - _ => quote! { Ok(state.clone()) }, + } else { + quote! { Ok(state.as_datum().clone()) } }; let function_field = match user_fn { AggregateFnOrImpl::Fn(_) => quote! {}, @@ -768,27 +791,25 @@ impl FunctionAttr { async fn update(&self, state0: &mut AggregateState, input: &StreamChunk) -> Result<()> { #(#let_arrays)* - let state0 = state0.as_datum_mut(); - let mut state: Option<#state_type> = #let_state; + #downcast_state for row_id in input.visibility().iter_ones() { let op = unsafe { *input.ops().get_unchecked(row_id) }; #(#let_values)* - state = #next_state; + #update_state } - *state0 = #assign_state; + #restore_state Ok(()) } async fn update_range(&self, state0: &mut AggregateState, input: &StreamChunk, range: Range) -> Result<()> { assert!(range.end <= input.capacity()); #(#let_arrays)* - let state0 = state0.as_datum_mut(); - let mut state: Option<#state_type> = #let_state; + #downcast_state if input.is_compacted() { for row_id in range { let op = unsafe { *input.ops().get_unchecked(row_id) }; #(#let_values)* - state = #next_state; + #update_state } } else { for row_id in input.visibility().iter_ones() { @@ -799,15 +820,14 @@ impl FunctionAttr { } let op = unsafe { *input.ops().get_unchecked(row_id) }; #(#let_values)* - state = #next_state; + #update_state } } - *state0 = #assign_state; + #restore_state Ok(()) } async fn get_result(&self, state: &AggregateState) -> Result { - let state = state.as_datum(); #get_result } } diff --git a/src/expr/macro/src/lib.rs b/src/expr/macro/src/lib.rs index 363fc958b557d..50a99cf3fda22 100644 --- a/src/expr/macro/src/lib.rs +++ b/src/expr/macro/src/lib.rs @@ -522,6 +522,8 @@ struct UserFunctionAttr { retract: bool, /// The argument type are `Option`s. arg_option: bool, + /// If the first argument type is `&mut T`, then `Some(T)`. + first_mut_ref_arg: Option, /// The return type kind. return_type_kind: ReturnTypeKind, /// The kind of inner type `T` in `impl Iterator` diff --git a/src/expr/macro/src/parse.rs b/src/expr/macro/src/parse.rs index 24cc6942afcee..8e2e8c6d0b2f1 100644 --- a/src/expr/macro/src/parse.rs +++ b/src/expr/macro/src/parse.rs @@ -123,6 +123,7 @@ impl From<&syn::Signature> for UserFunctionAttr { context: sig.inputs.iter().any(arg_is_context), retract: last_arg_is_retract(sig), arg_option: args_contain_option(sig), + first_mut_ref_arg: first_mut_ref_arg(sig), return_type_kind, iterator_item_kind, core_return_type, @@ -223,18 +224,15 @@ fn last_arg_is_retract(sig: &syn::Signature) -> bool { /// Check if any argument is `Option`. 
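+/// For example, `fn foo(a: i32, b: Option<i32>)` contains an `Option` argument,
+/// while `fn foo(a: i32, b: i32)` does not.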
fn args_contain_option(sig: &syn::Signature) -> bool { - if sig.inputs.is_empty() { - return false; - } for arg in &sig.inputs { let syn::FnArg::Typed(arg) = arg else { - return false; + continue; }; let syn::Type::Path(path) = arg.ty.as_ref() else { - return false; + continue; }; let Some(seg) = path.path.segments.last() else { - return false; + continue; }; if seg.ident == "Option" { return true; @@ -243,6 +241,26 @@ fn args_contain_option(sig: &syn::Signature) -> bool { false } +/// Returns `T` if the first argument (except `self`) is `&mut T`. +fn first_mut_ref_arg(sig: &syn::Signature) -> Option { + let arg = match sig.inputs.first()? { + syn::FnArg::Typed(arg) => arg, + syn::FnArg::Receiver(_) => match sig.inputs.iter().nth(1)? { + syn::FnArg::Typed(arg) => arg, + _ => return None, + }, + }; + let syn::Type::Reference(syn::TypeReference { + elem, + mutability: Some(_), + .. + }) = arg.ty.as_ref() + else { + return None; + }; + Some(elem.to_token_stream().to_string()) +} + /// Check the return type. fn check_type(ty: &syn::Type) -> (ReturnTypeKind, &syn::Type) { if let Some(inner) = strip_outer_type(ty, "Result") { From 09a67abbefb5c7a0c53596e0a5cf557a2fae3664 Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Wed, 25 Oct 2023 16:49:08 +0800 Subject: [PATCH 2/3] fix: `WAIT` should return error if timeout (#13045) --- src/meta/service/src/ddl_service.rs | 2 +- src/meta/src/rpc/ddl_controller.rs | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 061ff93589163..fac8f89e17b11 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -734,7 +734,7 @@ impl DdlService for DdlServiceImpl { } async fn wait(&self, _request: Request) -> Result, Status> { - self.ddl_controller.wait().await; + self.ddl_controller.wait().await?; Ok(Response::new(WaitResponse {})) } } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 36615bd93b757..9886121bc03c2 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1097,7 +1097,7 @@ impl DdlController { } } - pub async fn wait(&self) { + pub async fn wait(&self) -> MetaResult<()> { for _ in 0..30 * 60 { if self .catalog_manager @@ -1105,9 +1105,10 @@ impl DdlController { .await .is_empty() { - break; + return Ok(()); } sleep(Duration::from_secs(1)).await; } + Err(MetaError::cancelled("timeout".into())) } } From 7f82929b38131f43b1290e51b0722c972fa3b6c2 Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Wed, 25 Oct 2023 16:57:45 +0800 Subject: [PATCH 3/3] fix(meta): persist internal tables of `CREATE TABLE` (#13039) --- src/meta/src/manager/catalog/mod.rs | 33 ++++++++++++----------------- src/meta/src/rpc/ddl_controller.rs | 1 + 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index f988646428aac..15e74e4c2ac9e 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -647,7 +647,8 @@ impl CatalogManager { self.start_create_table_procedure_with_source(source, table) .await } else { - self.start_create_table_procedure(table, vec![]).await + self.start_create_table_procedure(table, internal_tables) + .await } } } @@ -765,7 +766,9 @@ impl CatalogManager { /// 2. Not belonging to a background stream job. /// Clean up these hanging tables by the id. 
 pub async fn clean_dirty_tables(&self, fragment_manager: FragmentManagerRef) -> MetaResult<()> {
-        let creating_tables: Vec<Table> = self.list_persisted_creating_tables().await;
+        let core = &mut *self.core.lock().await;
+        let database_core = &mut core.database;
+        let creating_tables: Vec<Table>
= database_core.list_persisted_creating_tables(); tracing::debug!( "creating_tables ids: {:#?}", creating_tables.iter().map(|t| t.id).collect_vec() @@ -839,14 +842,13 @@ impl CatalogManager { } } - let core = &mut *self.core.lock().await; - let database_core = &mut core.database; let tables = &mut database_core.tables; let mut tables = BTreeMapTransaction::new(tables); for table in &tables_to_clean { - tracing::debug!("cleaning table_id: {}", table.id); - let table = tables.remove(table.id); - assert!(table.is_some()) + let table_id = table.id; + tracing::debug!("cleaning table_id: {}", table_id); + let table = tables.remove(table_id); + assert!(table.is_some(), "table_id {} missing", table_id) } commit_meta!(self, tables)?; @@ -929,14 +931,8 @@ impl CatalogManager { ); return Ok(()); }; - table - }; - - tracing::trace!("cleanup tables for {}", table.id); - { - let core = &mut self.core.lock().await; - let database_core = &mut core.database; + tracing::trace!("cleanup tables for {}", table.id); let mut table_ids = vec![table.id]; table_ids.extend(internal_table_ids); @@ -944,10 +940,11 @@ impl CatalogManager { let mut tables = BTreeMapTransaction::new(tables); for table_id in table_ids { let res = tables.remove(table_id); - assert!(res.is_some()); + assert!(res.is_some(), "table_id {} missing", table_id); } commit_meta!(self, tables)?; - } + table + }; { let core = &mut self.core.lock().await; @@ -1984,9 +1981,7 @@ impl CatalogManager { let table_key = (table.database_id, table.schema_id, table.name.clone()); assert!( !database_core.sources.contains_key(&source.id) - && !database_core.tables.contains_key(&table.id) - && database_core.has_in_progress_creation(&source_key) - && database_core.has_in_progress_creation(&table_key), + && !database_core.tables.contains_key(&table.id), "table and source must be in creating procedure" ); diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 9886121bc03c2..5f40d9a561f4e 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -431,6 +431,7 @@ impl DdlController { let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap()); + // Persist tables tracing::debug!(id = stream_job.id(), "preparing stream job"); let fragment_graph = self .prepare_stream_job(&mut stream_job, fragment_graph)
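
A note on the `wait` loop in the second commit: 30 * 60 one-second polls bound
the wait at roughly 30 minutes before the `timeout` error is returned. The same
bound could be expressed with `tokio::time::timeout`; a hypothetical sketch, not
part of this patch, where `creating_jobs_empty` stands in for whichever
catalog-manager check the real loop polls:

    use std::time::Duration;

    use tokio::time::{sleep, timeout};

    impl DdlController {
        pub async fn wait(&self) -> MetaResult<()> {
            // Bound the whole polling loop at 30 minutes, mirroring the
            // 30 * 60 iterations of a 1-second sleep above.
            timeout(Duration::from_secs(30 * 60), async {
                while !self.creating_jobs_empty().await {
                    sleep(Duration::from_secs(1)).await;
                }
            })
            .await
            .map_err(|_elapsed| MetaError::cancelled("timeout".into()))
        }
    }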