diff --git a/e2e_test/batch/transaction/same_session.slt b/e2e_test/batch/transaction/same_session.slt
new file mode 100644
index 000000000000..2593c4d338d0
--- /dev/null
+++ b/e2e_test/batch/transaction/same_session.slt
@@ -0,0 +1,27 @@
+statement ok
+create table t (id int primary key);
+
+statement ok
+insert into t select i from generate_series(1, 100, 1) i;
+
+statement ok
+flush
+
+# No flush between delete and insert, so we can test whether delete and insert from the same session overlap.
+statement ok
+delete from t;
+
+statement ok
+insert into t select i from generate_series(1, 100, 1) i;
+
+statement ok
+flush
+
+# Should be no overlap
+query I
+select count(*) from t;
+----
+100
+
+statement ok
+drop table t;
\ No newline at end of file
diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto
index bf7fab1ae37f..f6164f12226b 100644
--- a/proto/batch_plan.proto
+++ b/proto/batch_plan.proto
@@ -88,6 +88,9 @@ message InsertNode {
   // be filled in streaming.
   optional uint32 row_id_index = 3;
   bool returning = 4;
+
+  // Session id is used to ensure that dml data from the same session is sent to a fixed worker node and channel.
+  uint32 session_id = 7;
 }
 
 message DeleteNode {
@@ -96,6 +99,9 @@ message DeleteNode {
   // Version of the table.
   uint64 table_version_id = 3;
   bool returning = 2;
+
+  // Session id is used to ensure that dml data from the same session is sent to a fixed worker node and channel.
+  uint32 session_id = 4;
 }
 
 message UpdateNode {
@@ -107,6 +113,9 @@ message UpdateNode {
   bool returning = 3;
   // The columns indices in the input schema, representing the columns need to send to streamDML exeuctor.
   repeated uint32 update_column_indices = 5;
+
+  // Session id is used to ensure that dml data from the same session is sent to a fixed worker node and channel.
+  uint32 session_id = 6;
 }
 
 message ValuesNode {
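Note: the new `session_id` fields above let the frontend tell the batch DML executors which session issued the statement; downstream, the worker node and the DML channel are then chosen as a pure function of that id. A minimal sketch of the intended invariant (illustrative only, not code from this PR; `num_channels` is a hypothetical stand-in for the actual channel count):

    /// Illustrative only: every DML statement from one session maps to the same
    /// channel index, so its chunks cannot be interleaved across channels.
    fn choose_channel(session_id: u32, num_channels: u32) -> u32 {
        session_id % num_channels
    }

As long as `num_channels` stays fixed (i.e. the table is not rescaled), two statements from the same session always land on the same channel, which is what lets the delete-then-insert test above avoid overlap without an intermediate flush.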
diff --git a/src/batch/src/executor/delete.rs b/src/batch/src/executor/delete.rs
index b0a7499ae161..c5d7d06c4233 100644
--- a/src/batch/src/executor/delete.rs
+++ b/src/batch/src/executor/delete.rs
@@ -44,6 +44,7 @@ pub struct DeleteExecutor {
     identity: String,
     returning: bool,
     txn_id: TxnId,
+    session_id: u32,
 }
 
 impl DeleteExecutor {
@@ -55,6 +56,7 @@ impl DeleteExecutor {
         chunk_size: usize,
         identity: String,
         returning: bool,
+        session_id: u32,
     ) -> Self {
         let table_schema = child.schema().clone();
         let txn_id = dml_manager.gen_txn_id();
@@ -74,6 +76,7 @@ impl DeleteExecutor {
             identity,
             returning,
             txn_id,
+            session_id,
         }
     }
 }
@@ -110,7 +113,7 @@ impl DeleteExecutor {
             self.child.schema().data_types(),
             "bad delete schema"
         );
-        let mut write_handle = table_dml_handle.write_handle(self.txn_id)?;
+        let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;
 
         write_handle.begin()?;
 
@@ -182,6 +185,7 @@ impl BoxedExecutorBuilder for DeleteExecutor {
             source.context.get_config().developer.chunk_size,
             source.plan_node().get_identity().clone(),
             delete_node.returning,
+            delete_node.session_id,
         )))
     }
 }
@@ -247,6 +251,7 @@ mod tests {
             1024,
             "DeleteExecutor".to_string(),
            false,
+            0,
         ));
 
         let handle = tokio::spawn(async move {
diff --git a/src/batch/src/executor/insert.rs b/src/batch/src/executor/insert.rs
index 7536f160be32..d236a2556102 100644
--- a/src/batch/src/executor/insert.rs
+++ b/src/batch/src/executor/insert.rs
@@ -52,6 +52,7 @@ pub struct InsertExecutor {
     row_id_index: Option<usize>,
     returning: bool,
     txn_id: TxnId,
+    session_id: u32,
 }
 
 impl InsertExecutor {
@@ -67,6 +68,7 @@ impl InsertExecutor {
         sorted_default_columns: Vec<(usize, BoxedExpression)>,
         row_id_index: Option<usize>,
         returning: bool,
+        session_id: u32,
     ) -> Self {
         let table_schema = child.schema().clone();
         let txn_id = dml_manager.gen_txn_id();
@@ -89,6 +91,7 @@ impl InsertExecutor {
             row_id_index,
             returning,
             txn_id,
+            session_id,
         }
     }
 }
@@ -116,7 +119,7 @@ impl InsertExecutor {
         let table_dml_handle = self
             .dml_manager
             .table_dml_handle(self.table_id, self.table_version_id)?;
-        let mut write_handle = table_dml_handle.write_handle(self.txn_id)?;
+        let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;
 
         write_handle.begin()?;
 
@@ -253,6 +256,7 @@ impl BoxedExecutorBuilder for InsertExecutor {
             sorted_default_columns,
             insert_node.row_id_index.as_ref().map(|index| *index as _),
             insert_node.returning,
+            insert_node.session_id,
         )))
     }
 }
@@ -348,6 +352,7 @@ mod tests {
             vec![],
             row_id_index,
             false,
+            0,
         ));
         let handle = tokio::spawn(async move {
             let mut stream = insert_executor.execute();
diff --git a/src/batch/src/executor/update.rs b/src/batch/src/executor/update.rs
index b0e1b4750cfc..1706a5f5cba7 100644
--- a/src/batch/src/executor/update.rs
+++ b/src/batch/src/executor/update.rs
@@ -49,6 +49,7 @@ pub struct UpdateExecutor {
     returning: bool,
     txn_id: TxnId,
     update_column_indices: Vec<usize>,
+    session_id: u32,
 }
 
 impl UpdateExecutor {
@@ -63,6 +64,7 @@ impl UpdateExecutor {
         identity: String,
         returning: bool,
         update_column_indices: Vec<usize>,
+        session_id: u32,
     ) -> Self {
         let chunk_size = chunk_size.next_multiple_of(2);
         let table_schema = child.schema().clone();
@@ -86,6 +88,7 @@
             returning,
             txn_id,
             update_column_indices,
+            session_id,
         }
     }
 }
@@ -134,7 +137,7 @@ impl UpdateExecutor {
         let mut builder = DataChunkBuilder::new(data_types, self.chunk_size);
 
         let mut write_handle: risingwave_source::WriteHandle =
-            table_dml_handle.write_handle(self.txn_id)?;
+            table_dml_handle.write_handle(self.session_id, self.txn_id)?;
         write_handle.begin()?;
 
         // Transform the data chunk to a stream chunk, then write to the source.
@@ -246,6 +249,7 @@ impl BoxedExecutorBuilder for UpdateExecutor {
             source.plan_node().get_identity().clone(),
             update_node.returning,
             update_column_indices,
+            update_node.session_id,
         )))
     }
 }
@@ -321,6 +325,7 @@ mod tests {
             "UpdateExecutor".to_string(),
             false,
             vec![0, 1],
+            0,
         ));
 
         let handle = tokio::spawn(async move {
diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs
index 60cc689616c6..490e90d17401 100644
--- a/src/compute/tests/integration_tests.rs
+++ b/src/compute/tests/integration_tests.rs
@@ -245,6 +245,7 @@ async fn test_table_materialize() -> StreamResult<()> {
         vec![],
         Some(row_id_index),
         false,
+        0,
     ));
 
     let value_indices = (0..column_descs.len()).collect_vec();
@@ -366,6 +367,7 @@
         1024,
         "DeleteExecutor".to_string(),
         false,
+        0,
     ));
 
     curr_epoch += 1;
diff --git a/src/frontend/src/optimizer/plan_node/batch_delete.rs b/src/frontend/src/optimizer/plan_node/batch_delete.rs
index c960eb1d83c9..d1fc6f1947d1 100644
--- a/src/frontend/src/optimizer/plan_node/batch_delete.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_delete.rs
@@ -71,6 +71,7 @@ impl ToBatchPb for BatchDelete {
             table_id: self.core.table_id.table_id(),
             table_version_id: self.core.table_version_id,
             returning: self.core.returning,
+            session_id: self.base.ctx().session_ctx().session_id().0 as u32,
         })
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/batch_insert.rs b/src/frontend/src/optimizer/plan_node/batch_insert.rs
index 4a280471fe19..caf7c449358b 100644
--- a/src/frontend/src/optimizer/plan_node/batch_insert.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_insert.rs
@@ -101,6 +101,7 @@ impl ToBatchPb for BatchInsert {
             },
             row_id_index: self.core.row_id_index.map(|index| index as _),
             returning: self.core.returning,
+            session_id: self.base.ctx().session_ctx().session_id().0 as u32,
         })
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/batch_update.rs b/src/frontend/src/optimizer/plan_node/batch_update.rs
index 5b3a6a8739fc..b2e7e1913fb3 100644
--- a/src/frontend/src/optimizer/plan_node/batch_update.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_update.rs
@@ -84,6 +84,7 @@ impl ToBatchPb for BatchUpdate {
             table_version_id: self.core.table_version_id,
             returning: self.core.returning,
             update_column_indices,
+            session_id: self.base.ctx().session_ctx().session_id().0 as u32,
         })
     }
 }
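A note on the `session_id().0 as u32` expression used in the three plan nodes above: pgwire's `SessionId` is, to the best of my reading, a pair of `i32`s (process id and secret key, assuming it mirrors `pgwire::pg_server::SessionId`), and only the first component is carried in the proto field. A hedged sketch of that conversion, with the type alias declared locally as an assumption:

    // Illustrative only: assumes SessionId is a (process_id, secret_key) pair of i32s,
    // as in pgwire::pg_server. Only the process id is sent to the batch executors.
    type SessionId = (i32, i32);

    fn to_proto_session_id(id: SessionId) -> u32 {
        // Reinterpret the i32 process id as the u32 proto field.
        id.0 as u32
    }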
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index cc86d55f2b98..2d0df049da3f 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -27,7 +27,6 @@ use futures::stream::Fuse;
 use futures::{stream, StreamExt, TryStreamExt};
 use futures_async_stream::for_await;
 use itertools::Itertools;
-use rand::seq::SliceRandom;
 use risingwave_batch::executor::ExecutorBuilder;
 use risingwave_batch::task::{ShutdownMsg, ShutdownSender, ShutdownToken, TaskId as TaskIdBatch};
 use risingwave_common::array::DataChunk;
@@ -698,51 +697,51 @@
         dml_table_id: Option<TableId>,
     ) -> SchedulerResult<Option<WorkerNode>> {
         let plan_node = plan_fragment.root.as_ref().expect("fail to get plan node");
-        let vnode_mapping = match dml_table_id {
-            Some(table_id) => Some(self.get_table_dml_vnode_mapping(&table_id)?),
-            None => {
-                if let Some(distributed_lookup_join_node) =
-                    Self::find_distributed_lookup_join_node(plan_node)
-                {
-                    let fragment_id = self.get_fragment_id(
-                        &distributed_lookup_join_node
-                            .inner_side_table_desc
-                            .as_ref()
-                            .unwrap()
-                            .table_id
-                            .into(),
-                    )?;
-                    let id2pu_vec = self
-                        .worker_node_manager
-                        .fragment_mapping(fragment_id)?
-                        .iter_unique()
-                        .collect_vec();
-
-                    let pu = id2pu_vec[task_id as usize];
-                    let candidates = self
-                        .worker_node_manager
-                        .manager
-                        .get_workers_by_parallel_unit_ids(&[pu])?;
-                    return Ok(Some(candidates[0].clone()));
-                } else {
-                    None
-                }
-            }
-        };
-        let worker_node = match vnode_mapping {
-            Some(mapping) => {
-                let parallel_unit_ids = mapping.iter_unique().collect_vec();
-                let candidates = self
-                    .worker_node_manager
-                    .manager
-                    .get_workers_by_parallel_unit_ids(&parallel_unit_ids)?;
-                Some(candidates.choose(&mut rand::thread_rng()).unwrap().clone())
+        if let Some(table_id) = dml_table_id {
+            let vnode_mapping = self.get_table_dml_vnode_mapping(&table_id)?;
+            let parallel_unit_ids = vnode_mapping.iter_unique().collect_vec();
+            let candidates = self
+                .worker_node_manager
+                .manager
+                .get_workers_by_parallel_unit_ids(&parallel_unit_ids)?;
+            if candidates.is_empty() {
+                return Err(SchedulerError::EmptyWorkerNodes);
             }
-            None => None,
+            return Ok(Some(
+                candidates[self.stage.session_id.0 as usize % candidates.len()].clone(),
+            ));
         };
 
-        Ok(worker_node)
+        if let Some(distributed_lookup_join_node) =
+            Self::find_distributed_lookup_join_node(plan_node)
+        {
+            let fragment_id = self.get_fragment_id(
+                &distributed_lookup_join_node
+                    .inner_side_table_desc
+                    .as_ref()
+                    .unwrap()
+                    .table_id
+                    .into(),
+            )?;
+            let id2pu_vec = self
+                .worker_node_manager
+                .fragment_mapping(fragment_id)?
+                .iter_unique()
+                .collect_vec();
+
+            let pu = id2pu_vec[task_id as usize];
+            let candidates = self
+                .worker_node_manager
+                .manager
+                .get_workers_by_parallel_unit_ids(&[pu])?;
+            if candidates.is_empty() {
+                return Err(SchedulerError::EmptyWorkerNodes);
+            }
+            Ok(Some(candidates[0].clone()))
+        } else {
+            Ok(None)
+        }
     }
 
     fn find_distributed_lookup_join_node(
diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs
index 63ebe1b443bb..95b36e50b497 100644
--- a/src/frontend/src/scheduler/local.rs
+++ b/src/frontend/src/scheduler/local.rs
@@ -24,7 +24,6 @@ use futures::StreamExt;
 use futures_async_stream::try_stream;
 use itertools::Itertools;
 use pgwire::pg_server::BoxedError;
-use rand::seq::SliceRandom;
 use risingwave_batch::executor::ExecutorBuilder;
 use risingwave_batch::task::{ShutdownToken, TaskId};
 use risingwave_common::array::DataChunk;
@@ -581,7 +580,10 @@
                 .worker_node_manager
                 .manager
                 .get_workers_by_parallel_unit_ids(&parallel_unit_ids)?;
-            candidates.choose(&mut rand::thread_rng()).unwrap().clone()
+            if candidates.is_empty() {
+                return Err(SchedulerError::EmptyWorkerNodes);
+            }
+            candidates[stage.session_id.0 as usize % candidates.len()].clone()
         };
         Ok(vec![worker_node])
     } else {
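The scheduler changes above replace the random `candidates.choose(&mut rand::thread_rng())` pick with a deterministic index derived from the session id, so repeated DML from one session is always scheduled on the same worker. A standalone sketch of that selection logic (illustrative only; the real code returns `SchedulerError::EmptyWorkerNodes` instead of a string error):

    // Illustrative only: deterministic worker selection by session id.
    fn pick_worker<T: Clone>(candidates: &[T], session_id: u32) -> Result<T, &'static str> {
        if candidates.is_empty() {
            return Err("empty worker nodes");
        }
        // Same session id and same candidate list => same worker every time.
        Ok(candidates[session_id as usize % candidates.len()].clone())
    }

The explicit empty-candidates check also removes the `unwrap()` that the old random choice relied on.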
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 09453b9cfe44..e40282cbacf8 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -22,6 +22,7 @@ use anyhow::anyhow;
 use async_recursion::async_recursion;
 use enum_as_inner::EnumAsInner;
 use itertools::Itertools;
+use pgwire::pg_server::SessionId;
 use risingwave_common::buffer::{Bitmap, BitmapBuilder};
 use risingwave_common::catalog::TableDesc;
 use risingwave_common::error::RwError;
@@ -364,6 +365,7 @@ pub struct QueryStage {
     pub source_info: Option<SourceScanInfo>,
     pub has_lookup_join: bool,
     pub dml_table_id: Option<TableId>,
+    pub session_id: SessionId,
 
     /// Used to generate exchange information when complete source scan information.
     children_exchange_distribution: Option<HashMap<StageId, Distribution>>,
@@ -395,6 +397,7 @@
                 source_info: self.source_info.clone(),
                 has_lookup_join: self.has_lookup_join,
                 dml_table_id: self.dml_table_id,
+                session_id: self.session_id,
                 children_exchange_distribution: self.children_exchange_distribution.clone(),
             };
         }
@@ -423,6 +426,7 @@
             source_info: Some(source_info),
             has_lookup_join: self.has_lookup_join,
             dml_table_id: self.dml_table_id,
+            session_id: self.session_id,
             children_exchange_distribution: None,
         }
     }
@@ -467,6 +471,7 @@ struct QueryStageBuilder {
     source_info: Option<SourceScanInfo>,
     has_lookup_join: bool,
     dml_table_id: Option<TableId>,
+    session_id: SessionId,
 
     children_exchange_distribution: HashMap<StageId, Distribution>,
 }
@@ -482,6 +487,7 @@ impl QueryStageBuilder {
        source_info: Option<SourceScanInfo>,
        has_lookup_join: bool,
        dml_table_id: Option<TableId>,
+       session_id: SessionId,
     ) -> Self {
         Self {
             query_id,
@@ -494,6 +500,7 @@
             source_info,
             has_lookup_join,
             dml_table_id,
+            session_id,
             children_exchange_distribution: HashMap::new(),
         }
     }
@@ -514,6 +521,7 @@
             source_info: self.source_info,
             has_lookup_join: self.has_lookup_join,
             dml_table_id: self.dml_table_id,
+            session_id: self.session_id,
             children_exchange_distribution,
         });
 
@@ -809,6 +817,7 @@ impl BatchPlanFragmenter {
             source_info,
             has_lookup_join,
             dml_table_id,
+            root.ctx().session_ctx().session_id(),
         );
 
         self.visit_node(root, &mut builder, None)?;
diff --git a/src/source/src/dml_manager.rs b/src/source/src/dml_manager.rs
index af34003f4bde..b4b03f9798c5 100644
--- a/src/source/src/dml_manager.rs
+++ b/src/source/src/dml_manager.rs
@@ -180,6 +180,7 @@ mod tests {
     use super::*;
 
     const TEST_TRANSACTION_ID: TxnId = 0;
+    const TEST_SESSION_ID: u32 = 0;
 
     #[tokio::test]
     async fn test_register_and_drop() {
@@ -206,7 +207,9 @@
         let table_dml_handle = dml_manager
             .table_dml_handle(table_id, table_version_id)
             .unwrap();
-        let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap();
+        let mut write_handle = table_dml_handle
+            .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID)
+            .unwrap();
         write_handle.begin().unwrap();
 
         // Should be able to write to the table.
@@ -219,7 +222,9 @@
         write_handle.write_chunk(chunk()).await.unwrap_err();
 
         // Unless we create a new write handle.
-        let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap();
+        let mut write_handle = table_dml_handle
+            .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID)
+            .unwrap();
         write_handle.begin().unwrap();
 
         write_handle.write_chunk(chunk()).await.unwrap();
@@ -254,7 +259,9 @@
         let table_dml_handle = dml_manager
             .table_dml_handle(table_id, old_version_id)
             .unwrap();
-        let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap();
+        let mut write_handle = table_dml_handle
+            .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID)
+            .unwrap();
         write_handle.begin().unwrap();
 
         // Should be able to write to the table.
@@ -278,7 +285,9 @@ mod tests {
         let table_dml_handle = dml_manager
             .table_dml_handle(table_id, new_version_id)
             .unwrap();
-        let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap();
+        let mut write_handle = table_dml_handle
+            .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID)
+            .unwrap();
         write_handle.begin().unwrap();
         write_handle.write_chunk(new_chunk()).await.unwrap();
     }
diff --git a/src/source/src/table.rs b/src/source/src/table.rs
index 503807283f46..ba0292e4f6ca 100644
--- a/src/source/src/table.rs
+++ b/src/source/src/table.rs
@@ -78,7 +78,7 @@ impl TableDmlHandle {
         TableStreamReader { rx }
     }
 
-    pub fn write_handle(&self, txn_id: TxnId) -> Result<WriteHandle> {
+    pub fn write_handle(&self, session_id: u32, txn_id: TxnId) -> Result<WriteHandle> {
         // The `changes_txs` should not be empty normally, since we ensured that the channels
         // between the `TableDmlHandle` and the `SourceExecutor`s are ready before we making the
         // table catalog visible to the users. However, when we're recovering, it's possible
@@ -94,9 +94,11 @@
             )));
         }
         let len = guard.changes_txs.len();
+        // Use session id instead of txn_id to choose the channel so that we can preserve transaction order in the same session.
+        // Note: this only holds if there's no scaling on the table.
         let sender = guard
             .changes_txs
-            .get((txn_id % len as u64) as usize)
+            .get((session_id % len as u32) as usize)
             .context("no available table reader in streaming source executors")?
             .clone();
@@ -298,6 +300,7 @@ mod tests {
     use super::*;
 
     const TEST_TRANSACTION_ID: TxnId = 0;
+    const TEST_SESSION_ID: u32 = 0;
 
     fn new_table_dml_handle() -> TableDmlHandle {
         TableDmlHandle::new(
@@ -310,7 +313,9 @@
     async fn test_table_dml_handle() -> Result<()> {
         let table_dml_handle = Arc::new(new_table_dml_handle());
         let mut reader = table_dml_handle.stream_reader().into_stream();
-        let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap();
+        let mut write_handle = table_dml_handle
+            .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID)
+            .unwrap();
 
         write_handle.begin().unwrap();
         assert_matches!(reader.next().await.unwrap()?, TxnMsg::Begin(_));
@@ -354,7 +359,9 @@
     async fn test_write_handle_rollback_on_drop() -> Result<()> {
         let table_dml_handle = Arc::new(new_table_dml_handle());
         let mut reader = table_dml_handle.stream_reader().into_stream();
-        let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap();
+        let mut write_handle = table_dml_handle
+            .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID)
+            .unwrap();
 
         write_handle.begin().unwrap();
         assert_matches!(reader.next().await.unwrap()?, TxnMsg::Begin(_));
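With `write_handle` keyed by session id rather than transaction id, consecutive transactions from one session now share a DML channel and therefore reach the DML executor in submission order; routing by `txn_id` could split them across channels. A small self-contained illustration (not from this PR):

    // Illustrative only: with 2 channels, routing by txn id can put two consecutive
    // transactions of one session on different channels, while routing by session id
    // keeps them on the same one.
    fn main() {
        let num_channels = 2u64;
        let (txn_a, txn_b) = (10u64, 11u64); // e.g. the DELETE and the following INSERT
        let session_id = 7u64;
        assert_ne!(txn_a % num_channels, txn_b % num_channels); // may interleave downstream
        assert_eq!(session_id % num_channels, session_id % num_channels); // always the same channel
    }

As the added comment notes, this only holds while the number of channels stays fixed, i.e. while the table is not being scaled.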
diff --git a/src/stream/src/executor/dml.rs b/src/stream/src/executor/dml.rs
index 435192974bff..7e0b50c51a52 100644
--- a/src/stream/src/executor/dml.rs
+++ b/src/stream/src/executor/dml.rs
@@ -192,6 +192,21 @@ impl DmlExecutor {
                     batch_group.iter().map(|c| c.cardinality()).sum::<usize>();
 
                 if txn_buffer_cardinality >= self.chunk_size {
+                    // txn buffer is too large, so yield batch group first to preserve the transaction order in the same session.
+                    if !batch_group.is_empty() {
+                        let vec = mem::take(&mut batch_group);
+                        for chunk in vec {
+                            for (op, row) in chunk.rows() {
+                                if let Some(chunk) = builder.append_row(op, row) {
+                                    yield Message::Chunk(chunk);
+                                }
+                            }
+                        }
+                        if let Some(chunk) = builder.take() {
+                            yield Message::Chunk(chunk);
+                        }
+                    }
+
                     // txn buffer isn't small, so yield.
                     for chunk in txn_buffer.vec {
                         yield Message::Chunk(chunk);
@@ -202,21 +217,23 @@
                     // txn buffer is small and batch group has space.
                     batch_group.extend(txn_buffer.vec);
                 } else {
-                    // txn buffer is small and batch group has no space, so yield the large one.
-                    if txn_buffer_cardinality < batch_group_cardinality {
-                        mem::swap(&mut txn_buffer.vec, &mut batch_group);
-                    }
-
-                    for chunk in txn_buffer.vec {
-                        for (op, row) in chunk.rows() {
-                            if let Some(chunk) = builder.append_row(op, row) {
-                                yield Message::Chunk(chunk);
+                    // txn buffer is small and batch group has no space, so yield the batch group first to preserve the transaction order in the same session.
+                    if !batch_group.is_empty() {
+                        let vec = mem::take(&mut batch_group);
+                        for chunk in vec {
+                            for (op, row) in chunk.rows() {
+                                if let Some(chunk) = builder.append_row(op, row) {
+                                    yield Message::Chunk(chunk);
+                                }
                             }
                         }
+                        if let Some(chunk) = builder.take() {
+                            yield Message::Chunk(chunk);
+                        }
                     }
-                    if let Some(chunk) = builder.take() {
-                        yield Message::Chunk(chunk);
-                    }
+
+                    // put txn buffer into the batch group
+                    mem::swap(&mut txn_buffer.vec, &mut batch_group);
                 }
             }
             TxnMsg::Rollback(txn_id) => {
@@ -288,6 +305,7 @@
     use crate::executor::test_utils::MockSource;
 
     const TEST_TRANSACTION_ID: TxnId = 0;
+    const TEST_SESSION_ID: u32 = 0;
 
     #[tokio::test]
     async fn test_dml_executor() {
@@ -357,7 +375,9 @@
         let table_dml_handle = dml_manager
             .table_dml_handle(table_id, INITIAL_TABLE_VERSION_ID)
             .unwrap();
-        let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap();
+        let mut write_handle = table_dml_handle
+            .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID)
+            .unwrap();
 
         // Message from batch
         write_handle.begin().unwrap();
diff --git a/src/stream/src/executor/stream_reader.rs b/src/stream/src/executor/stream_reader.rs
index c8d84926bd6a..de490f730dea 100644
--- a/src/stream/src/executor/stream_reader.rs
+++ b/src/stream/src/executor/stream_reader.rs
@@ -152,6 +152,7 @@ mod tests {
 
     const TEST_TRANSACTION_ID1: TxnId = 0;
     const TEST_TRANSACTION_ID2: TxnId = 1;
+    const TEST_SESSION_ID: u32 = 0;
     const TEST_DML_CHANNEL_INIT_PERMITS: usize = 32768;
 
     #[tokio::test]
@@ -162,8 +163,12 @@
 
         let source_stream = table_dml_handle.stream_reader().into_data_stream_for_test();
 
-        let mut write_handle1 = table_dml_handle.write_handle(TEST_TRANSACTION_ID1).unwrap();
-        let mut write_handle2 = table_dml_handle.write_handle(TEST_TRANSACTION_ID2).unwrap();
+        let mut write_handle1 = table_dml_handle
+            .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID1)
+            .unwrap();
+        let mut write_handle2 = table_dml_handle
+            .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID2)
+            .unwrap();
 
         let barrier_stream = barrier_to_message_stream(barrier_rx).boxed();
         let stream =