
pass session_id to insert/delete/update and decide txn channel based on session_id
chenzl25 committed Jan 5, 2024
1 parent df86b13 commit 3afd98a
Showing 13 changed files with 86 additions and 14 deletions.
20 changes: 20 additions & 0 deletions e2e_test/batch/transaction/same_session.slt
@@ -0,0 +1,20 @@
statement ok
create table t (id int primary key);

# We deliberately skip `flush` and check, within the same session, that the delete and the insert do not interleave.
statement ok
delete from t;

statement ok
insert into t select i from generate_series(1, 100, 1) i;

sleep 2s

# Should be no overlap
query I
select count(*) from t;
----
100

statement ok
drop table t;
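
The expected count of exactly 100 relies on the routing change this commit introduces: the `delete` and the `insert` come from the same session, so they are sent to the same DML channel and replayed in order by the stream DML executor, even without a `flush` in between. Below is a minimal standalone sketch of that session-keyed routing rule (hypothetical code with made-up ids and channel count, not the repository's API):

```rust
/// Pick a DML channel index for a statement. Keying on the session id maps every
/// statement from one session to the same channel, so their relative order is kept.
fn channel_index(key: u32, num_channels: usize) -> usize {
    (key % num_channels as u32) as usize
}

fn main() {
    let num_channels = 4;
    let session_id = 42u32;

    // The DELETE and the INSERT above share a session, hence a channel.
    assert_eq!(
        channel_index(session_id, num_channels),
        channel_index(session_id, num_channels)
    );

    // Routing by transaction id instead could split them across channels,
    // because each statement runs in its own transaction.
    let (delete_txn, insert_txn) = (7u32, 8u32);
    assert_ne!(
        channel_index(delete_txn, num_channels),
        channel_index(insert_txn, num_channels)
    );
}
```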
9 changes: 9 additions & 0 deletions proto/batch_plan.proto
@@ -88,6 +88,9 @@ message InsertNode {
// be filled in streaming.
optional uint32 row_id_index = 3;
bool returning = 4;

// The session id ensures that DML data from the same session is sent to a fixed worker node and channel.
uint32 session_id = 7;
}

message DeleteNode {
@@ -96,6 +99,9 @@ message DeleteNode {
// Version of the table.
uint64 table_version_id = 3;
bool returning = 2;

// The session id ensures that DML data from the same session is sent to a fixed worker node and channel.
uint32 session_id = 4;
}

message UpdateNode {
@@ -107,6 +113,9 @@ message UpdateNode {
bool returning = 3;
// The column indices in the input schema, representing the columns that need to be sent to the stream DML executor.
repeated uint32 update_column_indices = 5;

// The session id ensures that DML data from the same session is sent to a fixed worker node and channel.
uint32 session_id = 6;
}

message ValuesNode {
7 changes: 6 additions & 1 deletion src/batch/src/executor/delete.rs
@@ -44,6 +44,7 @@ pub struct DeleteExecutor {
identity: String,
returning: bool,
txn_id: TxnId,
session_id: u32,
}

impl DeleteExecutor {
@@ -55,6 +56,7 @@ impl DeleteExecutor {
chunk_size: usize,
identity: String,
returning: bool,
session_id: u32,
) -> Self {
let table_schema = child.schema().clone();
let txn_id = dml_manager.gen_txn_id();
@@ -74,6 +76,7 @@
identity,
returning,
txn_id,
session_id,
}
}
}
@@ -110,7 +113,7 @@ impl DeleteExecutor {
self.child.schema().data_types(),
"bad delete schema"
);
let mut write_handle = table_dml_handle.write_handle(self.txn_id)?;
let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;

write_handle.begin()?;

@@ -182,6 +185,7 @@ impl BoxedExecutorBuilder for DeleteExecutor {
source.context.get_config().developer.chunk_size,
source.plan_node().get_identity().clone(),
delete_node.returning,
delete_node.session_id,
)))
}
}
@@ -247,6 +251,7 @@ mod tests {
1024,
"DeleteExecutor".to_string(),
false,
0,
));

let handle = tokio::spawn(async move {
7 changes: 6 additions & 1 deletion src/batch/src/executor/insert.rs
@@ -52,6 +52,7 @@ pub struct InsertExecutor {
row_id_index: Option<usize>,
returning: bool,
txn_id: TxnId,
session_id: u32,
}

impl InsertExecutor {
@@ -67,6 +68,7 @@ impl InsertExecutor {
sorted_default_columns: Vec<(usize, BoxedExpression)>,
row_id_index: Option<usize>,
returning: bool,
session_id: u32,
) -> Self {
let table_schema = child.schema().clone();
let txn_id = dml_manager.gen_txn_id();
@@ -89,6 +91,7 @@
row_id_index,
returning,
txn_id,
session_id,
}
}
}
@@ -116,7 +119,7 @@ impl InsertExecutor {
let table_dml_handle = self
.dml_manager
.table_dml_handle(self.table_id, self.table_version_id)?;
let mut write_handle = table_dml_handle.write_handle(self.txn_id)?;
let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;

write_handle.begin()?;

@@ -253,6 +256,7 @@ impl BoxedExecutorBuilder for InsertExecutor {
sorted_default_columns,
insert_node.row_id_index.as_ref().map(|index| *index as _),
insert_node.returning,
insert_node.session_id,
)))
}
}
@@ -348,6 +352,7 @@ mod tests {
vec![],
row_id_index,
false,
0,
));
let handle = tokio::spawn(async move {
let mut stream = insert_executor.execute();
7 changes: 6 additions & 1 deletion src/batch/src/executor/update.rs
@@ -49,6 +49,7 @@ pub struct UpdateExecutor {
returning: bool,
txn_id: TxnId,
update_column_indices: Vec<usize>,
session_id: u32,
}

impl UpdateExecutor {
@@ -63,6 +64,7 @@ impl UpdateExecutor {
identity: String,
returning: bool,
update_column_indices: Vec<usize>,
session_id: u32,
) -> Self {
let chunk_size = chunk_size.next_multiple_of(2);
let table_schema = child.schema().clone();
@@ -86,6 +88,7 @@
returning,
txn_id,
update_column_indices,
session_id,
}
}
}
@@ -134,7 +137,7 @@ impl UpdateExecutor {
let mut builder = DataChunkBuilder::new(data_types, self.chunk_size);

let mut write_handle: risingwave_source::WriteHandle =
table_dml_handle.write_handle(self.txn_id)?;
table_dml_handle.write_handle(self.session_id, self.txn_id)?;
write_handle.begin()?;

// Transform the data chunk to a stream chunk, then write to the source.
@@ -246,6 +249,7 @@ impl BoxedExecutorBuilder for UpdateExecutor {
source.plan_node().get_identity().clone(),
update_node.returning,
update_column_indices,
update_node.session_id,
)))
}
}
@@ -321,6 +325,7 @@ mod tests {
"UpdateExecutor".to_string(),
false,
vec![0, 1],
0,
));

let handle = tokio::spawn(async move {
2 changes: 2 additions & 0 deletions src/compute/tests/integration_tests.rs
@@ -245,6 +245,7 @@ async fn test_table_materialize() -> StreamResult<()> {
vec![],
Some(row_id_index),
false,
0,
));

let value_indices = (0..column_descs.len()).collect_vec();
@@ -366,6 +367,7 @@
1024,
"DeleteExecutor".to_string(),
false,
0,
));

curr_epoch += 1;
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_delete.rs
@@ -71,6 +71,7 @@ impl ToBatchPb for BatchDelete {
table_id: self.core.table_id.table_id(),
table_version_id: self.core.table_version_id,
returning: self.core.returning,
session_id: self.base.ctx().session_ctx().session_id().0 as u32,
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_insert.rs
@@ -101,6 +101,7 @@ impl ToBatchPb for BatchInsert {
},
row_id_index: self.core.row_id_index.map(|index| index as _),
returning: self.core.returning,
session_id: self.base.ctx().session_ctx().session_id().0 as u32,
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_update.rs
@@ -84,6 +84,7 @@ impl ToBatchPb for BatchUpdate {
table_version_id: self.core.table_version_id,
returning: self.core.returning,
update_column_indices,
session_id: self.base.ctx().session_ctx().session_id().0 as u32,
})
}
}
17 changes: 13 additions & 4 deletions src/source/src/dml_manager.rs
@@ -180,6 +180,7 @@ mod tests {
use super::*;

const TEST_TRANSACTION_ID: TxnId = 0;
const TEST_SESSION_ID: u32 = 0;

#[tokio::test]
async fn test_register_and_drop() {
@@ -206,7 +207,9 @@
let table_dml_handle = dml_manager
.table_dml_handle(table_id, table_version_id)
.unwrap();
let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap();
let mut write_handle = table_dml_handle
.write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID)
.unwrap();
write_handle.begin().unwrap();

// Should be able to write to the table.
@@ -219,7 +222,9 @@
write_handle.write_chunk(chunk()).await.unwrap_err();

// Unless we create a new write handle.
let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap();
let mut write_handle = table_dml_handle
.write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID)
.unwrap();
write_handle.begin().unwrap();
write_handle.write_chunk(chunk()).await.unwrap();

@@ -254,7 +259,9 @@
let table_dml_handle = dml_manager
.table_dml_handle(table_id, old_version_id)
.unwrap();
let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap();
let mut write_handle = table_dml_handle
.write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID)
.unwrap();
write_handle.begin().unwrap();

// Should be able to write to the table.
@@ -278,7 +285,9 @@
let table_dml_handle = dml_manager
.table_dml_handle(table_id, new_version_id)
.unwrap();
let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap();
let mut write_handle = table_dml_handle
.write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID)
.unwrap();
write_handle.begin().unwrap();
write_handle.write_chunk(new_chunk()).await.unwrap();
}
14 changes: 10 additions & 4 deletions src/source/src/table.rs
@@ -78,7 +78,7 @@ impl TableDmlHandle {
TableStreamReader { rx }
}

pub fn write_handle(&self, txn_id: TxnId) -> Result<WriteHandle> {
pub fn write_handle(&self, session_id: u32, txn_id: TxnId) -> Result<WriteHandle> {
// The `changes_txs` should not be empty normally, since we ensured that the channels
// between the `TableDmlHandle` and the `SourceExecutor`s are ready before we make the
// table catalog visible to the users. However, when we're recovering, it's possible
@@ -94,9 +94,10 @@
)));
}
let len = guard.changes_txs.len();
// Use the session id instead of txn_id to choose the channel, so that transaction order is preserved within the same session.
let sender = guard
.changes_txs
.get((txn_id % len as u64) as usize)
.get((session_id % len as u32) as usize)
.context("no available table reader in streaming source executors")?
.clone();

@@ -298,6 +299,7 @@ mod tests {
use super::*;

const TEST_TRANSACTION_ID: TxnId = 0;
const TEST_SESSION_ID: u32 = 0;

fn new_table_dml_handle() -> TableDmlHandle {
TableDmlHandle::new(
@@ -310,7 +312,9 @@
async fn test_table_dml_handle() -> Result<()> {
let table_dml_handle = Arc::new(new_table_dml_handle());
let mut reader = table_dml_handle.stream_reader().into_stream();
let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap();
let mut write_handle = table_dml_handle
.write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID)
.unwrap();
write_handle.begin().unwrap();

assert_matches!(reader.next().await.unwrap()?, TxnMsg::Begin(_));
@@ -354,7 +358,9 @@
async fn test_write_handle_rollback_on_drop() -> Result<()> {
let table_dml_handle = Arc::new(new_table_dml_handle());
let mut reader = table_dml_handle.stream_reader().into_stream();
let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap();
let mut write_handle = table_dml_handle
.write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID)
.unwrap();
write_handle.begin().unwrap();

assert_matches!(reader.next().await.unwrap()?, TxnMsg::Begin(_));
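
Pinning a session to one of the `changes_txs` senders is sufficient because each sender feeds a single in-order channel: whatever one session writes through it is drained by the DML executor in send order. A toy illustration of that property (assumes a `tokio` dependency; the real executor consumes `TxnMsg` values rather than strings):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // One channel standing in for the session's pinned `changes_txs` slot.
    let (tx, mut rx) = mpsc::channel::<&'static str>(16);

    // The batch side of a session sends its statements through the same sender.
    tx.send("delete from t").await.unwrap();
    tx.send("insert into t ...").await.unwrap();

    // The streaming side receives them in exactly that order.
    assert_eq!(rx.recv().await, Some("delete from t"));
    assert_eq!(rx.recv().await, Some("insert into t ..."));
}
```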
5 changes: 4 additions & 1 deletion src/stream/src/executor/dml.rs
@@ -305,6 +305,7 @@ mod tests {
use crate::executor::test_utils::MockSource;

const TEST_TRANSACTION_ID: TxnId = 0;
const TEST_SESSION_ID: u32 = 0;

#[tokio::test]
async fn test_dml_executor() {
@@ -374,7 +375,9 @@
let table_dml_handle = dml_manager
.table_dml_handle(table_id, INITIAL_TABLE_VERSION_ID)
.unwrap();
let mut write_handle = table_dml_handle.write_handle(TEST_TRANSACTION_ID).unwrap();
let mut write_handle = table_dml_handle
.write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID)
.unwrap();

// Message from batch
write_handle.begin().unwrap();
9 changes: 7 additions & 2 deletions src/stream/src/executor/stream_reader.rs
@@ -152,6 +152,7 @@ mod tests {

const TEST_TRANSACTION_ID1: TxnId = 0;
const TEST_TRANSACTION_ID2: TxnId = 1;
const TEST_SESSION_ID: u32 = 0;
const TEST_DML_CHANNEL_INIT_PERMITS: usize = 32768;

#[tokio::test]
@@ -162,8 +163,12 @@

let source_stream = table_dml_handle.stream_reader().into_data_stream_for_test();

let mut write_handle1 = table_dml_handle.write_handle(TEST_TRANSACTION_ID1).unwrap();
let mut write_handle2 = table_dml_handle.write_handle(TEST_TRANSACTION_ID2).unwrap();
let mut write_handle1 = table_dml_handle
.write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID1)
.unwrap();
let mut write_handle2 = table_dml_handle
.write_handle(TEST_SESSION_ID, TEST_TRANSACTION_ID2)
.unwrap();

let barrier_stream = barrier_to_message_stream(barrier_rx).boxed();
let stream =
