From 3afd98a276a6b5e66cbb242b17c03b2bb9636f70 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 5 Jan 2024 16:57:11 +0800 Subject: [PATCH] pass session_id to insert/delete/upadte and decide txn channel based on session_id --- e2e_test/batch/transaction/same_session.slt | 20 +++++++++++++++++++ proto/batch_plan.proto | 9 +++++++++ src/batch/src/executor/delete.rs | 7 ++++++- src/batch/src/executor/insert.rs | 7 ++++++- src/batch/src/executor/update.rs | 7 ++++++- src/compute/tests/integration_tests.rs | 2 ++ .../src/optimizer/plan_node/batch_delete.rs | 1 + .../src/optimizer/plan_node/batch_insert.rs | 1 + .../src/optimizer/plan_node/batch_update.rs | 1 + src/source/src/dml_manager.rs | 17 ++++++++++++---- src/source/src/table.rs | 14 +++++++++---- src/stream/src/executor/dml.rs | 5 ++++- src/stream/src/executor/stream_reader.rs | 9 +++++++-- 13 files changed, 86 insertions(+), 14 deletions(-) create mode 100644 e2e_test/batch/transaction/same_session.slt diff --git a/e2e_test/batch/transaction/same_session.slt b/e2e_test/batch/transaction/same_session.slt new file mode 100644 index 000000000000..ea519af897df --- /dev/null +++ b/e2e_test/batch/transaction/same_session.slt @@ -0,0 +1,20 @@ +statement ok +create table t (id int primary key); + +# we don't use flush and test in the same session whether delete and insert overlap. +statement ok +delete from t; + +statement ok +insert into t select i from generate_series(1, 100, 1) i; + +sleep 2s + +# Should be no overlap +query I +select count(*) from t; +---- +100 + +statement ok +drop table t; \ No newline at end of file diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index bf7fab1ae37f..f6164f12226b 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -88,6 +88,9 @@ message InsertNode { // be filled in streaming. optional uint32 row_id_index = 3; bool returning = 4; + + // Session id is used to ensure that dml data from the same session should be sent to a fixed worker node and channel. + uint32 session_id = 7; } message DeleteNode { @@ -96,6 +99,9 @@ message DeleteNode { // Version of the table. uint64 table_version_id = 3; bool returning = 2; + + // Session id is used to ensure that dml data from the same session should be sent to a fixed worker node and channel. + uint32 session_id = 4; } message UpdateNode { @@ -107,6 +113,9 @@ message UpdateNode { bool returning = 3; // The columns indices in the input schema, representing the columns need to send to streamDML exeuctor. repeated uint32 update_column_indices = 5; + + // Session id is used to ensure that dml data from the same session should be sent to a fixed worker node and channel. + uint32 session_id = 6; } message ValuesNode { diff --git a/src/batch/src/executor/delete.rs b/src/batch/src/executor/delete.rs index b0a7499ae161..c5d7d06c4233 100644 --- a/src/batch/src/executor/delete.rs +++ b/src/batch/src/executor/delete.rs @@ -44,6 +44,7 @@ pub struct DeleteExecutor { identity: String, returning: bool, txn_id: TxnId, + session_id: u32, } impl DeleteExecutor { @@ -55,6 +56,7 @@ impl DeleteExecutor { chunk_size: usize, identity: String, returning: bool, + session_id: u32, ) -> Self { let table_schema = child.schema().clone(); let txn_id = dml_manager.gen_txn_id(); @@ -74,6 +76,7 @@ impl DeleteExecutor { identity, returning, txn_id, + session_id, } } } @@ -110,7 +113,7 @@ impl DeleteExecutor { self.child.schema().data_types(), "bad delete schema" ); - let mut write_handle = table_dml_handle.write_handle(self.txn_id)?; + let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?; write_handle.begin()?; @@ -182,6 +185,7 @@ impl BoxedExecutorBuilder for DeleteExecutor { source.context.get_config().developer.chunk_size, source.plan_node().get_identity().clone(), delete_node.returning, + delete_node.session_id, ))) } } @@ -247,6 +251,7 @@ mod tests { 1024, "DeleteExecutor".to_string(), false, + 0, )); let handle = tokio::spawn(async move { diff --git a/src/batch/src/executor/insert.rs b/src/batch/src/executor/insert.rs index 7536f160be32..d236a2556102 100644 --- a/src/batch/src/executor/insert.rs +++ b/src/batch/src/executor/insert.rs @@ -52,6 +52,7 @@ pub struct InsertExecutor { row_id_index: Option, returning: bool, txn_id: TxnId, + session_id: u32, } impl InsertExecutor { @@ -67,6 +68,7 @@ impl InsertExecutor { sorted_default_columns: Vec<(usize, BoxedExpression)>, row_id_index: Option, returning: bool, + session_id: u32, ) -> Self { let table_schema = child.schema().clone(); let txn_id = dml_manager.gen_txn_id(); @@ -89,6 +91,7 @@ impl InsertExecutor { row_id_index, returning, txn_id, + session_id, } } } @@ -116,7 +119,7 @@ impl InsertExecutor { let table_dml_handle = self .dml_manager .table_dml_handle(self.table_id, self.table_version_id)?; - let mut write_handle = table_dml_handle.write_handle(self.txn_id)?; + let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?; write_handle.begin()?; @@ -253,6 +256,7 @@ impl BoxedExecutorBuilder for InsertExecutor { sorted_default_columns, insert_node.row_id_index.as_ref().map(|index| *index as _), insert_node.returning, + insert_node.session_id, ))) } } @@ -348,6 +352,7 @@ mod tests { vec![], row_id_index, false, + 0, )); let handle = tokio::spawn(async move { let mut stream = insert_executor.execute(); diff --git a/src/batch/src/executor/update.rs b/src/batch/src/executor/update.rs index b0e1b4750cfc..1706a5f5cba7 100644 --- a/src/batch/src/executor/update.rs +++ b/src/batch/src/executor/update.rs @@ -49,6 +49,7 @@ pub struct UpdateExecutor { returning: bool, txn_id: TxnId, update_column_indices: Vec, + session_id: u32, } impl UpdateExecutor { @@ -63,6 +64,7 @@ impl UpdateExecutor { identity: String, returning: bool, update_column_indices: Vec, + session_id: u32, ) -> Self { let chunk_size = chunk_size.next_multiple_of(2); let table_schema = child.schema().clone(); @@ -86,6 +88,7 @@ impl UpdateExecutor { returning, txn_id, update_column_indices, + session_id, } } } @@ -134,7 +137,7 @@ impl UpdateExecutor { let mut builder = DataChunkBuilder::new(data_types, self.chunk_size); let mut write_handle: risingwave_source::WriteHandle = - table_dml_handle.write_handle(self.txn_id)?; + table_dml_handle.write_handle(self.session_id, self.txn_id)?; write_handle.begin()?; // Transform the data chunk to a stream chunk, then write to the source. @@ -246,6 +249,7 @@ impl BoxedExecutorBuilder for UpdateExecutor { source.plan_node().get_identity().clone(), update_node.returning, update_column_indices, + update_node.session_id, ))) } } @@ -321,6 +325,7 @@ mod tests { "UpdateExecutor".to_string(), false, vec![0, 1], + 0, )); let handle = tokio::spawn(async move { diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index 60cc689616c6..490e90d17401 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -245,6 +245,7 @@ async fn test_table_materialize() -> StreamResult<()> { vec![], Some(row_id_index), false, + 0, )); let value_indices = (0..column_descs.len()).collect_vec(); @@ -366,6 +367,7 @@ async fn test_table_materialize() -> StreamResult<()> { 1024, "DeleteExecutor".to_string(), false, + 0, )); curr_epoch += 1; diff --git a/src/frontend/src/optimizer/plan_node/batch_delete.rs b/src/frontend/src/optimizer/plan_node/batch_delete.rs index c960eb1d83c9..d1fc6f1947d1 100644 --- a/src/frontend/src/optimizer/plan_node/batch_delete.rs +++ b/src/frontend/src/optimizer/plan_node/batch_delete.rs @@ -71,6 +71,7 @@ impl ToBatchPb for BatchDelete { table_id: self.core.table_id.table_id(), table_version_id: self.core.table_version_id, returning: self.core.returning, + session_id: self.base.ctx().session_ctx().session_id().0 as u32, }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_insert.rs b/src/frontend/src/optimizer/plan_node/batch_insert.rs index 4a280471fe19..caf7c449358b 100644 --- a/src/frontend/src/optimizer/plan_node/batch_insert.rs +++ b/src/frontend/src/optimizer/plan_node/batch_insert.rs @@ -101,6 +101,7 @@ impl ToBatchPb for BatchInsert { }, row_id_index: self.core.row_id_index.map(|index| index as _), returning: self.core.returning, + session_id: self.base.ctx().session_ctx().session_id().0 as u32, }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_update.rs b/src/frontend/src/optimizer/plan_node/batch_update.rs index 5b3a6a8739fc..b2e7e1913fb3 100644 --- a/src/frontend/src/optimizer/plan_node/batch_update.rs +++ b/src/frontend/src/optimizer/plan_node/batch_update.rs @@ -84,6 +84,7 @@ impl ToBatchPb for BatchUpdate { table_version_id: self.core.table_version_id, returning: self.core.returning, update_column_indices, + session_id: self.base.ctx().session_ctx().session_id().0 as u32, }) } } diff --git a/src/source/src/dml_manager.rs b/src/source/src/dml_manager.rs index af34003f4bde..b4b03f9798c5 100644 --- a/src/source/src/dml_manager.rs +++ b/src/source/src/dml_manager.rs @@ -180,6 +180,7 @@ mod tests { use super::*; const TEST_TRANSACTION_ID: TxnId = 0; + const TEST_SESSION_ID: u32 = 0; #[tokio::test] async fn test_register_and_drop() { @@ -206,7 +207,9 @@ mod tests { let table_dml_handle = dml_manager .table_dml_handle(table_id, table_version_id) .unwrap(); - let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap(); + let mut write_handle = table_dml_handle + .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID) + .unwrap(); write_handle.begin().unwrap(); // Should be able to write to the table. @@ -219,7 +222,9 @@ mod tests { write_handle.write_chunk(chunk()).await.unwrap_err(); // Unless we create a new write handle. - let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap(); + let mut write_handle = table_dml_handle + .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID) + .unwrap(); write_handle.begin().unwrap(); write_handle.write_chunk(chunk()).await.unwrap(); @@ -254,7 +259,9 @@ mod tests { let table_dml_handle = dml_manager .table_dml_handle(table_id, old_version_id) .unwrap(); - let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap(); + let mut write_handle = table_dml_handle + .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID) + .unwrap(); write_handle.begin().unwrap(); // Should be able to write to the table. @@ -278,7 +285,9 @@ mod tests { let table_dml_handle = dml_manager .table_dml_handle(table_id, new_version_id) .unwrap(); - let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap(); + let mut write_handle = table_dml_handle + .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID) + .unwrap(); write_handle.begin().unwrap(); write_handle.write_chunk(new_chunk()).await.unwrap(); } diff --git a/src/source/src/table.rs b/src/source/src/table.rs index 503807283f46..972611238cd4 100644 --- a/src/source/src/table.rs +++ b/src/source/src/table.rs @@ -78,7 +78,7 @@ impl TableDmlHandle { TableStreamReader { rx } } - pub fn write_handle(&self, txn_id: TxnId) -> Result { + pub fn write_handle(&self, session_id: u32, txn_id: TxnId) -> Result { // The `changes_txs` should not be empty normally, since we ensured that the channels // between the `TableDmlHandle` and the `SourceExecutor`s are ready before we making the // table catalog visible to the users. However, when we're recovering, it's possible @@ -94,9 +94,10 @@ impl TableDmlHandle { ))); } let len = guard.changes_txs.len(); + // used session id instead of txn_id to choose channel so that we can preserve transaction order in the same session. let sender = guard .changes_txs - .get((txn_id % len as u64) as usize) + .get((session_id % len as u32) as usize) .context("no available table reader in streaming source executors")? .clone(); @@ -298,6 +299,7 @@ mod tests { use super::*; const TEST_TRANSACTION_ID: TxnId = 0; + const TEST_SESSION_ID: u32 = 0; fn new_table_dml_handle() -> TableDmlHandle { TableDmlHandle::new( @@ -310,7 +312,9 @@ mod tests { async fn test_table_dml_handle() -> Result<()> { let table_dml_handle = Arc::new(new_table_dml_handle()); let mut reader = table_dml_handle.stream_reader().into_stream(); - let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap(); + let mut write_handle = table_dml_handle + .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID) + .unwrap(); write_handle.begin().unwrap(); assert_matches!(reader.next().await.unwrap()?, TxnMsg::Begin(_)); @@ -354,7 +358,9 @@ mod tests { async fn test_write_handle_rollback_on_drop() -> Result<()> { let table_dml_handle = Arc::new(new_table_dml_handle()); let mut reader = table_dml_handle.stream_reader().into_stream(); - let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap(); + let mut write_handle = table_dml_handle + .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID) + .unwrap(); write_handle.begin().unwrap(); assert_matches!(reader.next().await.unwrap()?, TxnMsg::Begin(_)); diff --git a/src/stream/src/executor/dml.rs b/src/stream/src/executor/dml.rs index 99f0ccbe67be..7e0b50c51a52 100644 --- a/src/stream/src/executor/dml.rs +++ b/src/stream/src/executor/dml.rs @@ -305,6 +305,7 @@ mod tests { use crate::executor::test_utils::MockSource; const TEST_TRANSACTION_ID: TxnId = 0; + const TEST_SESSION_ID: u32 = 0; #[tokio::test] async fn test_dml_executor() { @@ -374,7 +375,9 @@ mod tests { let table_dml_handle = dml_manager .table_dml_handle(table_id, INITIAL_TABLE_VERSION_ID) .unwrap(); - let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap(); + let mut write_handle = table_dml_handle + .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID) + .unwrap(); // Message from batch write_handle.begin().unwrap(); diff --git a/src/stream/src/executor/stream_reader.rs b/src/stream/src/executor/stream_reader.rs index c8d84926bd6a..de490f730dea 100644 --- a/src/stream/src/executor/stream_reader.rs +++ b/src/stream/src/executor/stream_reader.rs @@ -152,6 +152,7 @@ mod tests { const TEST_TRANSACTION_ID1: TxnId = 0; const TEST_TRANSACTION_ID2: TxnId = 1; + const TEST_SESSION_ID: u32 = 0; const TEST_DML_CHANNEL_INIT_PERMITS: usize = 32768; #[tokio::test] @@ -162,8 +163,12 @@ mod tests { let source_stream = table_dml_handle.stream_reader().into_data_stream_for_test(); - let mut write_handle1 = table_dml_handle.write_handle(TEST_TRANSACTION_ID1).unwrap(); - let mut write_handle2 = table_dml_handle.write_handle(TEST_TRANSACTION_ID2).unwrap(); + let mut write_handle1 = table_dml_handle + .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID1) + .unwrap(); + let mut write_handle2 = table_dml_handle + .write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID2) + .unwrap(); let barrier_stream = barrier_to_message_stream(barrier_rx).boxed(); let stream =