diff --git a/e2e_test/streaming/on_conflict.slt b/e2e_test/streaming/on_conflict.slt
index 3c62e7f57a425..66327571a8761 100644
--- a/e2e_test/streaming/on_conflict.slt
+++ b/e2e_test/streaming/on_conflict.slt
@@ -1,3 +1,6 @@
+statement ok
+SET RW_IMPLICIT_FLUSH TO true;
+
 statement ok
 create table t1 (v1 int, v2 int, v3 int, primary key(v1)) on conflict ignore;
 
@@ -20,6 +23,9 @@ select v1, v2, v3 from mv1;
 2 3 3
 3 4 5
 
+statement ok
+SET RW_IMPLICIT_FLUSH TO true;
+
 statement ok
 create table t2 (v1 int, v2 int, v3 int, primary key(v1)) on conflict overwrite;
 
@@ -53,3 +59,50 @@ drop table t1;
 
 statement ok
 drop table t2;
+
+
+statement ok
+SET RW_IMPLICIT_FLUSH TO true;
+
+statement ok
+create table t3 (v1 int, v2 int, v3 int, primary key(v1)) on conflict do update if not null;
+
+statement ok
+insert into t3 values (1,null,2), (2,3,null);
+
+statement ok
+insert into t3 values (3,null,5), (3,6,null);
+
+statement ok
+insert into t3 values (1,5,null), (2,null,null);
+
+statement ok
+create materialized view mv3 as select * from t3;
+
+
+query III rowsort
+select v1, v2, v3 from mv3;
+----
+1 5 2
+2 3 NULL
+3 6 5
+
+
+statement ok
+update t3 set v2 = 2 where v1 > 1;
+
+statement ok
+flush;
+
+query III rowsort
+select v1, v2, v3 from mv3;
+----
+1 5 2
+2 2 NULL
+3 2 5
+
+statement ok
+drop materialized view mv3;
+
+statement ok
+drop table t3;
\ No newline at end of file
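As a reading aid for the expected results above: on a primary-key conflict, `do update if not null` keeps the stored value for every NULL column of the incoming row and overwrites with every non-NULL column. The following standalone sketch is not RisingWave code; the `upsert` helper and the `BTreeMap` standing in for table `t3` are illustrative only, and it simply replays the three inserts to reproduce the first expected result set of `mv3` under that rule.

```rust
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;

/// Illustrative model of `ON CONFLICT DO UPDATE IF NOT NULL`:
/// non-NULL incoming columns overwrite, NULL incoming columns keep the old value.
fn upsert(table: &mut BTreeMap<i32, Vec<Option<i32>>>, pk: i32, row: Vec<Option<i32>>) {
    match table.entry(pk) {
        Entry::Occupied(mut e) => {
            for (old_col, new_col) in e.get_mut().iter_mut().zip(row) {
                if new_col.is_some() {
                    *old_col = new_col;
                }
            }
        }
        Entry::Vacant(e) => {
            e.insert(row);
        }
    }
}

fn main() {
    let mut t3 = BTreeMap::new();
    // insert into t3 values (1,null,2), (2,3,null);
    upsert(&mut t3, 1, vec![None, Some(2)]);
    upsert(&mut t3, 2, vec![Some(3), None]);
    // insert into t3 values (3,null,5), (3,6,null);
    upsert(&mut t3, 3, vec![None, Some(5)]);
    upsert(&mut t3, 3, vec![Some(6), None]);
    // insert into t3 values (1,5,null), (2,null,null);
    upsert(&mut t3, 1, vec![Some(5), None]);
    upsert(&mut t3, 2, vec![None, None]);

    // Matches the first expected result set of mv3: (1,5,2), (2,3,NULL), (3,6,5).
    assert_eq!(t3[&1], vec![Some(5), Some(2)]);
    assert_eq!(t3[&2], vec![Some(3), None]);
    assert_eq!(t3[&3], vec![Some(6), Some(5)]);
}
```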
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index f49ec127cadad..5eae565aac0f8 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -2823,7 +2823,7 @@ impl Parser {
             Keyword::NOT,
             Keyword::NULL,
         ]) {
-            return parser_err!("On conflict behavior do update if not null is not supported yet.");
+            Ok(Some(OnConflict::DoUpdateIfNotNull))
         } else {
             Ok(None)
         }
diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml
index 31e7a9c8a06c1..e932eb60a2ee6 100644
--- a/src/sqlparser/tests/testdata/create.yaml
+++ b/src/sqlparser/tests/testdata/create.yaml
@@ -57,7 +57,7 @@
- input: CREATE TABLE T ("FULL" INT) ON CONFLICT IGNORE
  formatted_sql: CREATE TABLE T ("FULL" INT) ON CONFLICT IGNORE
- input: CREATE TABLE T ("FULL" INT) ON CONFLICT DO UPDATE IF NOT NULL
-  error_msg: 'sql parser error: On conflict behavior do update if not null is not supported yet.'
+  formatted_sql: CREATE TABLE T ("FULL" INT) ON CONFLICT DO UPDATE IF NOT NULL
- input: CREATE USER user WITH SUPERUSER CREATEDB PASSWORD 'password'
  formatted_sql: CREATE USER user WITH SUPERUSER CREATEDB PASSWORD 'password'
- input: CREATE SINK snk
diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs
index c7912e4334898..e24e48884e47e 100644
--- a/src/stream/src/executor/mview/materialize.rs
+++ b/src/stream/src/executor/mview/materialize.rs
@@ -25,15 +25,15 @@ use itertools::Itertools;
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Schema, TableId};
-use risingwave_common::row::{CompactedRow, RowDeserializer};
-use risingwave_common::types::DataType;
+use risingwave_common::row::{CompactedRow, OwnedRow, RowDeserializer};
+use risingwave_common::types::{DataType, ScalarImpl};
 use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
 use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
 use risingwave_common::util::sort_util::ColumnOrder;
-use risingwave_common::util::value_encoding::BasicSerde;
+use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
 use risingwave_pb::catalog::Table;
 use risingwave_storage::mem_table::KeyOp;
-use risingwave_storage::row_serde::value_serde::ValueRowSerde;
+use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
 use risingwave_storage::StateStore;
 
 use crate::cache::{new_unbounded, ManagedLruCache};
@@ -82,6 +82,17 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
         conflict_behavior: ConflictBehavior,
         metrics: Arc<StreamingMetrics>,
     ) -> Self {
+        let table_columns: Vec<ColumnDesc> = table_catalog
+            .columns
+            .iter()
+            .map(|col| col.column_desc.as_ref().unwrap().into())
+            .collect();
+
+        let row_serde: BasicSerde = BasicSerde::new(
+            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
+            Arc::from(table_columns.into_boxed_slice()),
+        );
+
         let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
 
         // Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle.
@@ -103,7 +114,7 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
             state_table,
             arrange_key_indices,
             actor_context,
-            materialize_cache: MaterializeCache::new(watermark_epoch, metrics_info),
+            materialize_cache: MaterializeCache::new(watermark_epoch, metrics_info, row_serde),
             conflict_behavior,
         }
     }
@@ -139,7 +150,9 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
                         .inc_by(chunk.cardinality() as u64);
 
                     match self.conflict_behavior {
-                        ConflictBehavior::Overwrite | ConflictBehavior::IgnoreConflict
+                        ConflictBehavior::Overwrite
+                        | ConflictBehavior::IgnoreConflict
+                        | ConflictBehavior::DoUpdateIfNotNull
                             if self.state_table.is_consistent_op() =>
                         {
                             if chunk.cardinality() == 0 {
@@ -193,12 +206,13 @@
                             }
                         }
                         ConflictBehavior::IgnoreConflict => unreachable!(),
-                        ConflictBehavior::NoCheck | ConflictBehavior::Overwrite => {
+                        ConflictBehavior::NoCheck
+                        | ConflictBehavior::Overwrite
+                        | ConflictBehavior::DoUpdateIfNotNull => {
                             self.state_table.write_chunk(chunk.clone());
                             self.state_table.try_flush().await?;
                             Message::Chunk(chunk)
-                        }
-                        ConflictBehavior::DoUpdateIfNotNull => unimplemented!(),
+                        }
                     }
                 }
                 Message::Barrier(b) => {
@@ -276,12 +290,16 @@
         let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
         let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
         let schema = input.schema().clone();
-        let columns = column_ids
+        let columns: Vec<ColumnDesc> = column_ids
             .into_iter()
             .zip_eq_fast(schema.fields.iter())
             .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
             .collect_vec();
+        let row_serde = BasicSerde::new(
+            Arc::from((0..columns.len()).collect_vec()),
+            Arc::from(columns.clone().into_boxed_slice()),
+        );
 
         let state_table = StateTableInner::new_without_distribution(
             store,
             table_id,
@@ -297,7 +315,11 @@
             state_table,
             arrange_key_indices: arrange_columns.clone(),
             actor_context: ActorContext::for_test(0),
-            materialize_cache: MaterializeCache::new(watermark_epoch, MetricsInfo::for_test()),
+            materialize_cache: MaterializeCache::new(
+                watermark_epoch,
+                MetricsInfo::for_test(),
+                row_serde,
+            ),
             conflict_behavior,
         }
     }
@@ -443,18 +465,24 @@ impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
     data: ManagedLruCache<Vec<u8>, CacheValue>,
     metrics_info: MetricsInfo,
+    row_serde: BasicSerde,
     _serde: PhantomData<SD>,
 }
 
 type CacheValue = Option<CompactedRow>;
 
 impl<SD: ValueRowSerde> MaterializeCache<SD> {
-    pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
+    pub fn new(
+        watermark_epoch: AtomicU64Ref,
+        metrics_info: MetricsInfo,
+        row_serde: BasicSerde,
+    ) -> Self {
         let cache: ManagedLruCache<Vec<u8>, CacheValue> =
             new_unbounded(watermark_epoch, metrics_info.clone());
         Self {
             data: cache,
             metrics_info,
+            row_serde,
             _serde: PhantomData,
         }
     }
@@ -472,8 +500,8 @@ impl<SD: ValueRowSerde> MaterializeCache<SD> {
         self.fetch_keys(key_set.iter().map(|v| v.deref()), table, conflict_behavior)
             .await?;
-
         let mut fixed_changes = MaterializeBuffer::new();
+        let row_serde = self.row_serde.clone();
         for (op, key, value) in row_ops {
             let mut update_cache = false;
             let fixed_changes = &mut fixed_changes;
@@ -503,12 +531,55 @@
                                 }
                             };
                         }
+
+                        ConflictBehavior::DoUpdateIfNotNull => {
+                            match self.force_get(&key) {
+                                Some(old_row) => {
+                                    // Compare the new row with the old row column by column and apply the `DoUpdateIfNotNull` replacement.
+                                    // TODO(wcy-fdu): find a way to output the resulting new row directly to the downstream chunk, avoiding an additional deserialization step.
+                                    let mut old_row_deserialized_vec = row_serde
+                                        .deserializer
+                                        .deserialize(old_row.row.clone())?
+                                        .into_inner()
+                                        .into_vec();
+                                    let new_row_deserialized =
+                                        row_serde.deserializer.deserialize(value.clone())?;
+
+                                    execute_do_update_if_not_null_replacement(
+                                        &mut old_row_deserialized_vec,
+                                        new_row_deserialized,
+                                    );
+                                    let new_row = OwnedRow::new(old_row_deserialized_vec);
+                                    let new_row_bytes =
+                                        Bytes::from(row_serde.serializer.serialize(new_row));
+                                    fixed_changes().update(
+                                        key.clone(),
+                                        old_row.row.clone(),
+                                        new_row_bytes.clone(),
+                                    );
+                                    // Since `DoUpdateIfNotNull` may produce a row that differs from both
+                                    // the new row and the old row, the cache entry must be refreshed here.
+                                    self.data.push(
+                                        key.clone(),
+                                        Some(CompactedRow { row: new_row_bytes }),
+                                    );
+
+                                    update_cache = false;
+                                }
+                                None => {
+                                    fixed_changes().insert(key.clone(), value.clone());
+                                    update_cache = true;
+                                }
+                            };
+                        }
                         _ => unreachable!(),
                     };
 
                     if update_cache {
                         match conflict_behavior {
-                            ConflictBehavior::Overwrite | ConflictBehavior::IgnoreConflict => {
+                            ConflictBehavior::Overwrite
+                            | ConflictBehavior::IgnoreConflict
+                            | ConflictBehavior::DoUpdateIfNotNull => {
                                 self.data.push(key, Some(CompactedRow { row: value }));
                             }
@@ -519,7 +590,9 @@
                 Op::Delete | Op::UpdateDelete => {
                     match conflict_behavior {
-                        ConflictBehavior::Overwrite | ConflictBehavior::IgnoreConflict => {
+                        ConflictBehavior::Overwrite
+                        | ConflictBehavior::IgnoreConflict
+                        | ConflictBehavior::DoUpdateIfNotNull => {
                             match self.force_get(&key) {
                                 Some(old_row) => {
                                     fixed_changes().delete(key.clone(), old_row.row.clone());
@@ -527,7 +600,6 @@
                                 None => (), // delete a nonexistent value
                             };
                         }
-
                         _ => unreachable!(),
                     };
@@ -571,7 +643,9 @@ impl<SD: ValueRowSerde> MaterializeCache<SD> {
         while let Some(result) = buffered.next().await {
             let (key, value) = result;
             match conflict_behavior {
-                ConflictBehavior::Overwrite => self.data.push(key, value?),
+                ConflictBehavior::Overwrite | ConflictBehavior::DoUpdateIfNotNull => {
+                    self.data.push(key, value?)
+                }
                 ConflictBehavior::IgnoreConflict => self.data.push(key, value?),
                 _ => unreachable!(),
             };
@@ -594,6 +668,33 @@
     }
 }
 
+/// Generates a new row with the `do update if not null` behavior: each non-null column of the
+/// new row replaces the corresponding column of the old row, while null columns of the new row
+/// keep the old values.
+///
+/// # Arguments
+///
+/// * `old_row` - The old row containing the original values.
+/// * `new_row` - The new row containing the replacement values.
+///
+/// # Example
+///
+/// ```no_run
+/// let mut old_row = vec![Some(1), None, Some(3)];
+/// let new_row = vec![Some(10), Some(20), None];
+///
+/// // After `execute_do_update_if_not_null_replacement(&mut old_row, new_row)`,
+/// // `old_row` is [Some(10), Some(20), Some(3)].
+/// ```
+fn execute_do_update_if_not_null_replacement(
+    old_row: &mut Vec<Option<ScalarImpl>>,
+    new_row: OwnedRow,
+) {
+    for (old_col, new_col) in old_row.iter_mut().zip_eq_fast(new_row) {
+        if let Some(new_value) = new_col {
+            *old_col = Some(new_value);
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
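To summarize the cache-side handling above outside the executor: on `Insert`/`UpdateInsert`, a key missing from the cache becomes a plain insert, while an existing key is merged column by column and both the emitted change and the cache entry carry the merged row. The sketch below is illustrative only; `Change`, `merge_if_not_null`, and `handle_upsert` are hypothetical names, plain `Option<i32>` columns stand in for encoded `ScalarImpl` rows, and a `HashMap` stands in for the managed LRU cache.

```rust
use std::collections::HashMap;

/// Illustrative stand-in for the buffered changes kept by the executor.
#[derive(Clone, Debug, PartialEq)]
enum Change {
    Insert(Vec<Option<i32>>),
    Update { old: Vec<Option<i32>>, new: Vec<Option<i32>> },
}

/// Column-wise rule used by `DoUpdateIfNotNull`: NULL columns keep the old value.
fn merge_if_not_null(old: &[Option<i32>], new: &[Option<i32>]) -> Vec<Option<i32>> {
    old.iter()
        .zip(new)
        .map(|(o, n)| if n.is_some() { *n } else { *o })
        .collect()
}

/// Handle one upsert against a toy cache: emit the change and refresh the cache,
/// mirroring the insert/update split of the match arm above.
fn handle_upsert(
    cache: &mut HashMap<i32, Vec<Option<i32>>>,
    pk: i32,
    new_row: Vec<Option<i32>>,
) -> Change {
    if let Some(old_row) = cache.get(&pk).cloned() {
        // Conflict: merge, then refresh the cache with the merged row.
        let merged = merge_if_not_null(&old_row, &new_row);
        cache.insert(pk, merged.clone());
        Change::Update { old: old_row, new: merged }
    } else {
        // No conflict: plain insert, and the cache learns the new row.
        cache.insert(pk, new_row.clone());
        Change::Insert(new_row)
    }
}

fn main() {
    let mut cache = HashMap::new();
    assert_eq!(
        handle_upsert(&mut cache, 1, vec![None, Some(2)]),
        Change::Insert(vec![None, Some(2)])
    );
    assert_eq!(
        handle_upsert(&mut cache, 1, vec![Some(5), None]),
        Change::Update { old: vec![None, Some(2)], new: vec![Some(5), Some(2)] }
    );
}
```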
@@ -1511,6 +1612,200 @@
         }
     }
 
+    #[tokio::test]
+    async fn test_do_update_if_not_null_conflict() {
+        // Prepare storage and memtable.
+        let memory_state_store = MemoryStateStore::new();
+        let table_id = TableId::new(1);
+        // Two columns of int32 type, the first column is PK.
+        let schema = Schema::new(vec![
+            Field::unnamed(DataType::Int32),
+            Field::unnamed(DataType::Int32),
+        ]);
+        let column_ids = vec![0.into(), 1.into()];
+
+        // should get (8, 2)
+        let chunk1 = StreamChunk::from_pretty(
+            " i i
+            + 1 4
+            + 2 .
+            + 3 6
+            U- 8 .
+            U+ 8 2
+            + 8 .",
+        );
+
+        // should not get (3, x), should not get (5, 0)
+        let chunk2 = StreamChunk::from_pretty(
+            " i i
+            + 7 8
+            - 3 4
+            - 5 0",
+        );
+
+        // should get (2, None), (7, 8), and (9, 1)
+        let chunk3 = StreamChunk::from_pretty(
+            " i i
+            + 1 5
+            + 7 .
+            U- 2 4
+            U+ 2 .
+            U- 9 0
+            U+ 9 1",
+        );
+
+        // Prepare stream executors.
+        let source = MockSource::with_messages(vec![
+            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
+            Message::Chunk(chunk1),
+            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
+            Message::Chunk(chunk2),
+            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
+            Message::Chunk(chunk3),
+            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
+        ])
+        .into_executor(schema.clone(), PkIndices::new());
+
+        let order_types = vec![OrderType::ascending()];
+        let column_descs = vec![
+            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
+            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
+        ];
+
+        let table = StorageTable::for_test(
+            memory_state_store.clone(),
+            table_id,
+            column_descs,
+            order_types,
+            vec![0],
+            vec![0, 1],
+        );
+
+        let mut materialize_executor = MaterializeExecutor::for_test(
+            source,
+            memory_state_store,
+            table_id,
+            vec![ColumnOrder::new(0, OrderType::ascending())],
+            column_ids,
+            Arc::new(AtomicU64::new(0)),
+            ConflictBehavior::DoUpdateIfNotNull,
+        )
+        .await
+        .boxed()
+        .execute();
+        materialize_executor.next().await.transpose().unwrap();
+
+        materialize_executor.next().await.transpose().unwrap();
+
+        // First stream chunk. We check that (8) -> (8, 2) and (2) -> (2, NULL).
+        match materialize_executor.next().await.transpose().unwrap() {
+            Some(Message::Barrier(_)) => {
+                let row = table
+                    .get_row(
+                        &OwnedRow::new(vec![Some(8_i32.into())]),
+                        HummockReadEpoch::NoWait(u64::MAX),
+                    )
+                    .await
+                    .unwrap();
+                assert_eq!(
+                    row,
+                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
+                );
+
+                let row = table
+                    .get_row(
+                        &OwnedRow::new(vec![Some(2_i32.into())]),
+                        HummockReadEpoch::NoWait(u64::MAX),
+                    )
+                    .await
+                    .unwrap();
+                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
+            }
+            _ => unreachable!(),
+        }
+        materialize_executor.next().await.transpose().unwrap();
+
+        // Second stream chunk. We check that (7) -> (7, 8) and that the two deletes were handled.
+        match materialize_executor.next().await.transpose().unwrap() {
+            Some(Message::Barrier(_)) => {
+                let row = table
+                    .get_row(
+                        &OwnedRow::new(vec![Some(7_i32.into())]),
+                        HummockReadEpoch::NoWait(u64::MAX),
+                    )
+                    .await
+                    .unwrap();
+                assert_eq!(
+                    row,
+                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
+                );
+
+                // check delete with a wrong old value
+                let row = table
+                    .get_row(
+                        &OwnedRow::new(vec![Some(3_i32.into())]),
+                        HummockReadEpoch::NoWait(u64::MAX),
+                    )
+                    .await
+                    .unwrap();
+                assert_eq!(row, None);
+
+                // check delete with a wrong pk
+                let row = table
+                    .get_row(
+                        &OwnedRow::new(vec![Some(5_i32.into())]),
+                        HummockReadEpoch::NoWait(u64::MAX),
+                    )
+                    .await
+                    .unwrap();
+                assert_eq!(row, None);
+            }
+            _ => unreachable!(),
+        }
+
+        materialize_executor.next().await.transpose().unwrap();
+        // Third stream chunk. We check that (7) -> (7, 8) is unchanged, that (2) keeps (2, NULL),
+        // and that the update on the nonexistent pk 9 becomes an insert of (9, 1).
+        match materialize_executor.next().await.transpose().unwrap() {
+            Some(Message::Barrier(_)) => {
+                let row = table
+                    .get_row(
+                        &OwnedRow::new(vec![Some(7_i32.into())]),
+                        HummockReadEpoch::NoWait(u64::MAX),
+                    )
+                    .await
+                    .unwrap();
+                assert_eq!(
+                    row,
+                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
+                );
+
+                // check update with a wrong old value: the row stays (2, NULL)
+                let row = table
+                    .get_row(
+                        &OwnedRow::new(vec![Some(2_i32.into())]),
+                        HummockReadEpoch::NoWait(u64::MAX),
+                    )
+                    .await
+                    .unwrap();
+                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
+
+                // check update with a wrong pk: it should become an insert
+                let row = table
+                    .get_row(
+                        &OwnedRow::new(vec![Some(9_i32.into())]),
+                        HummockReadEpoch::NoWait(u64::MAX),
+                    )
+                    .await
+                    .unwrap();
+                assert_eq!(
+                    row,
+                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
+                );
+            }
+            _ => unreachable!(),
+        }
+    }
+
     fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
         const KN: u32 = 4;
         const SEED: u64 = 998244353;