feat(dml): send dml data from the same session to a fixed worker node/channel #14380

Merged (7 commits, Jan 8, 2024)
27 changes: 27 additions & 0 deletions e2e_test/batch/transaction/same_session.slt
@@ -0,0 +1,27 @@
statement ok
create table t (id int primary key);

statement ok
insert into t select i from generate_series(1, 100, 1) i;

statement ok
flush

# We don't flush between the delete and the insert, to test whether a delete and an insert in the same session overlap.
statement ok
delete from t;

statement ok
insert into t select i from generate_series(1, 100, 1) i;

statement ok
flush

# There should be no overlap, so exactly 100 rows remain.
query I
select count(*) from t;
----
100

statement ok
drop table t;
9 changes: 9 additions & 0 deletions proto/batch_plan.proto
@@ -88,6 +88,9 @@ message InsertNode {
// be filled in streaming.
optional uint32 row_id_index = 3;
bool returning = 4;

// Session id is used to ensure that DML data from the same session is sent to a fixed worker node and channel.
uint32 session_id = 7;
}

message DeleteNode {
@@ -96,6 +99,9 @@ message DeleteNode {
// Version of the table.
uint64 table_version_id = 3;
bool returning = 2;

// Session id is used to ensure that DML data from the same session is sent to a fixed worker node and channel.
uint32 session_id = 4;
}

message UpdateNode {
@@ -107,6 +113,9 @@ message UpdateNode {
bool returning = 3;
// The column indices in the input schema, representing the columns that need to be sent to the stream DML executor.
repeated uint32 update_column_indices = 5;

// Session id is used to ensure that DML data from the same session is sent to a fixed worker node and channel.
uint32 session_id = 6;
}

message ValuesNode {
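The new `session_id` field is what lets the scheduler pin all DML from one session to a single worker node: instead of choosing a worker at random, it indexes into the candidate list with the session id (see the `stage.rs` and `local.rs` changes further down). Below is a minimal standalone sketch of that selection rule; the `WorkerNode` struct and `choose_dml_worker` helper are placeholders for illustration, not the real RisingWave types.

```rust
/// Placeholder for the real worker metadata type.
#[derive(Clone, Debug, PartialEq)]
struct WorkerNode {
    id: u32,
}

/// Pick a DML worker deterministically from the session id, mirroring the
/// `session_id % candidates.len()` rule introduced in this PR. Returns `None`
/// when no worker is available instead of panicking.
fn choose_dml_worker(candidates: &[WorkerNode], session_id: u32) -> Option<WorkerNode> {
    if candidates.is_empty() {
        return None;
    }
    candidates
        .get(session_id as usize % candidates.len())
        .cloned()
}

fn main() {
    let workers = vec![
        WorkerNode { id: 1 },
        WorkerNode { id: 2 },
        WorkerNode { id: 3 },
    ];
    // The same session always maps to the same worker...
    assert_eq!(choose_dml_worker(&workers, 7), choose_dml_worker(&workers, 7));
    // ...while different sessions may land on different workers.
    assert_eq!(choose_dml_worker(&workers, 7).unwrap().id, 2);
    assert_eq!(choose_dml_worker(&workers, 9).unwrap().id, 1);
}
```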
7 changes: 6 additions & 1 deletion src/batch/src/executor/delete.rs
@@ -44,6 +44,7 @@ pub struct DeleteExecutor {
identity: String,
returning: bool,
txn_id: TxnId,
session_id: u32,
}

impl DeleteExecutor {
@@ -55,6 +56,7 @@ impl DeleteExecutor {
chunk_size: usize,
identity: String,
returning: bool,
session_id: u32,
) -> Self {
let table_schema = child.schema().clone();
let txn_id = dml_manager.gen_txn_id();
@@ -74,6 +76,7 @@
identity,
returning,
txn_id,
session_id,
}
}
}
@@ -110,7 +113,7 @@ impl DeleteExecutor {
self.child.schema().data_types(),
"bad delete schema"
);
let mut write_handle = table_dml_handle.write_handle(self.txn_id)?;
let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;

write_handle.begin()?;

@@ -182,6 +185,7 @@ impl BoxedExecutorBuilder for DeleteExecutor {
source.context.get_config().developer.chunk_size,
source.plan_node().get_identity().clone(),
delete_node.returning,
delete_node.session_id,
)))
}
}
@@ -247,6 +251,7 @@ mod tests {
1024,
"DeleteExecutor".to_string(),
false,
0,
));

let handle = tokio::spawn(async move {
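Each executor now passes its `session_id` alongside the `txn_id` into `write_handle`. The internals of the table DML handle are not shown in this diff, so the snippet below is only a hypothetical sketch, assuming the handle owns several channels into the stream DML executor and keeps a session on one channel by taking `session_id` modulo the channel count; `FakeDmlHandle` and `pick_channel` are invented names for illustration.

```rust
/// Hypothetical stand-in for a table DML handle that owns several channels
/// into the streaming side (not the real RisingWave type).
struct FakeDmlHandle {
    num_channels: usize,
}

impl FakeDmlHandle {
    /// Route a session to a fixed channel so that all DML statements from that
    /// session keep their order on a single channel.
    fn pick_channel(&self, session_id: u32) -> usize {
        assert!(self.num_channels > 0, "handle must own at least one channel");
        session_id as usize % self.num_channels
    }
}

fn main() {
    let handle = FakeDmlHandle { num_channels: 4 };
    // A delete and an insert issued by the same session land on the same
    // channel, so the insert cannot overtake the delete.
    assert_eq!(handle.pick_channel(42), handle.pick_channel(42));
    assert_eq!(handle.pick_channel(42), 2);
}
```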
7 changes: 6 additions & 1 deletion src/batch/src/executor/insert.rs
@@ -52,6 +52,7 @@ pub struct InsertExecutor {
row_id_index: Option<usize>,
returning: bool,
txn_id: TxnId,
session_id: u32,
}

impl InsertExecutor {
@@ -67,6 +68,7 @@ impl InsertExecutor {
sorted_default_columns: Vec<(usize, BoxedExpression)>,
row_id_index: Option<usize>,
returning: bool,
session_id: u32,
) -> Self {
let table_schema = child.schema().clone();
let txn_id = dml_manager.gen_txn_id();
@@ -89,6 +91,7 @@
row_id_index,
returning,
txn_id,
session_id,
}
}
}
@@ -116,7 +119,7 @@ impl InsertExecutor {
let table_dml_handle = self
.dml_manager
.table_dml_handle(self.table_id, self.table_version_id)?;
let mut write_handle = table_dml_handle.write_handle(self.txn_id)?;
let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;

write_handle.begin()?;

@@ -253,6 +256,7 @@ impl BoxedExecutorBuilder for InsertExecutor {
sorted_default_columns,
insert_node.row_id_index.as_ref().map(|index| *index as _),
insert_node.returning,
insert_node.session_id,
)))
}
}
@@ -348,6 +352,7 @@ mod tests {
vec![],
row_id_index,
false,
0,
));
let handle = tokio::spawn(async move {
let mut stream = insert_executor.execute();
7 changes: 6 additions & 1 deletion src/batch/src/executor/update.rs
@@ -49,6 +49,7 @@ pub struct UpdateExecutor {
returning: bool,
txn_id: TxnId,
update_column_indices: Vec<usize>,
session_id: u32,
}

impl UpdateExecutor {
@@ -63,6 +64,7 @@ impl UpdateExecutor {
identity: String,
returning: bool,
update_column_indices: Vec<usize>,
session_id: u32,
) -> Self {
let chunk_size = chunk_size.next_multiple_of(2);
let table_schema = child.schema().clone();
@@ -86,6 +88,7 @@
returning,
txn_id,
update_column_indices,
session_id,
}
}
}
@@ -134,7 +137,7 @@ impl UpdateExecutor {
let mut builder = DataChunkBuilder::new(data_types, self.chunk_size);

let mut write_handle: risingwave_source::WriteHandle =
table_dml_handle.write_handle(self.txn_id)?;
table_dml_handle.write_handle(self.session_id, self.txn_id)?;
write_handle.begin()?;

// Transform the data chunk to a stream chunk, then write to the source.
@@ -246,6 +249,7 @@ impl BoxedExecutorBuilder for UpdateExecutor {
source.plan_node().get_identity().clone(),
update_node.returning,
update_column_indices,
update_node.session_id,
)))
}
}
@@ -321,6 +325,7 @@ mod tests {
"UpdateExecutor".to_string(),
false,
vec![0, 1],
0,
));

let handle = tokio::spawn(async move {
2 changes: 2 additions & 0 deletions src/compute/tests/integration_tests.rs
@@ -245,6 +245,7 @@ async fn test_table_materialize() -> StreamResult<()> {
vec![],
Some(row_id_index),
false,
0,
));

let value_indices = (0..column_descs.len()).collect_vec();
@@ -366,6 +367,7 @@ async fn test_table_materialize() -> StreamResult<()> {
1024,
"DeleteExecutor".to_string(),
false,
0,
));

curr_epoch += 1;
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_delete.rs
@@ -71,6 +71,7 @@ impl ToBatchPb for BatchDelete {
table_id: self.core.table_id.table_id(),
table_version_id: self.core.table_version_id,
returning: self.core.returning,
session_id: self.base.ctx().session_ctx().session_id().0 as u32,
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_insert.rs
@@ -101,6 +101,7 @@ impl ToBatchPb for BatchInsert {
},
row_id_index: self.core.row_id_index.map(|index| index as _),
returning: self.core.returning,
session_id: self.base.ctx().session_ctx().session_id().0 as u32,
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_update.rs
@@ -84,6 +84,7 @@ impl ToBatchPb for BatchUpdate {
table_version_id: self.core.table_version_id,
returning: self.core.returning,
update_column_indices,
session_id: self.base.ctx().session_ctx().session_id().0 as u32,
})
}
}
88 changes: 45 additions & 43 deletions src/frontend/src/scheduler/distributed/stage.rs
@@ -21,13 +21,12 @@ use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use arc_swap::ArcSwap;
use futures::stream::Fuse;
use futures::{stream, StreamExt, TryStreamExt};
use futures_async_stream::for_await;
use itertools::Itertools;
use rand::seq::SliceRandom;
use risingwave_batch::executor::ExecutorBuilder;
use risingwave_batch::task::{ShutdownMsg, ShutdownSender, ShutdownToken, TaskId as TaskIdBatch};
use risingwave_common::array::DataChunk;
@@ -698,51 +697,54 @@ impl StageRunner {
dml_table_id: Option<TableId>,
) -> SchedulerResult<Option<WorkerNode>> {
let plan_node = plan_fragment.root.as_ref().expect("fail to get plan node");
let vnode_mapping = match dml_table_id {
Some(table_id) => Some(self.get_table_dml_vnode_mapping(&table_id)?),
None => {
if let Some(distributed_lookup_join_node) =
Self::find_distributed_lookup_join_node(plan_node)
{
let fragment_id = self.get_fragment_id(
&distributed_lookup_join_node
.inner_side_table_desc
.as_ref()
.unwrap()
.table_id
.into(),
)?;
let id2pu_vec = self
.worker_node_manager
.fragment_mapping(fragment_id)?
.iter_unique()
.collect_vec();

let pu = id2pu_vec[task_id as usize];
let candidates = self
.worker_node_manager
.manager
.get_workers_by_parallel_unit_ids(&[pu])?;
return Ok(Some(candidates[0].clone()));
} else {
None
}
}
};

let worker_node = match vnode_mapping {
Some(mapping) => {
let parallel_unit_ids = mapping.iter_unique().collect_vec();
let candidates = self
.worker_node_manager
.manager
.get_workers_by_parallel_unit_ids(&parallel_unit_ids)?;
Some(candidates.choose(&mut rand::thread_rng()).unwrap().clone())
if let Some(table_id) = dml_table_id {
let vnode_mapping = self.get_table_dml_vnode_mapping(&table_id)?;
let parallel_unit_ids = vnode_mapping.iter_unique().collect_vec();
let candidates = self
.worker_node_manager
.manager
.get_workers_by_parallel_unit_ids(&parallel_unit_ids)?;
if candidates.is_empty() {
return Err(SchedulerError::EmptyWorkerNodes);
}
None => None,
return Ok(Some(
candidates
.get(self.stage.session_id.0 as usize % candidates.len())
.context("no available worker node for dml")?
.clone(),
));

Review comment (chenzl25, author): Choose a worker based on session id.
};

Ok(worker_node)
if let Some(distributed_lookup_join_node) =
Self::find_distributed_lookup_join_node(plan_node)
{
let fragment_id = self.get_fragment_id(
&distributed_lookup_join_node
.inner_side_table_desc
.as_ref()
.unwrap()
.table_id
.into(),
)?;
let id2pu_vec = self
.worker_node_manager
.fragment_mapping(fragment_id)?
.iter_unique()
.collect_vec();

let pu = id2pu_vec[task_id as usize];
let candidates = self
.worker_node_manager
.manager
.get_workers_by_parallel_unit_ids(&[pu])?;
if candidates.is_empty() {
return Err(SchedulerError::EmptyWorkerNodes);
}
Ok(Some(candidates[0].clone()))
} else {
Ok(None)
}
}

fn find_distributed_lookup_join_node(
11 changes: 8 additions & 3 deletions src/frontend/src/scheduler/local.rs
@@ -18,13 +18,12 @@ use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use futures::stream::BoxStream;
use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use pgwire::pg_server::BoxedError;
use rand::seq::SliceRandom;
use risingwave_batch::executor::ExecutorBuilder;
use risingwave_batch::task::{ShutdownToken, TaskId};
use risingwave_common::array::DataChunk;
@@ -581,7 +580,13 @@ impl LocalQueryExecution {
.worker_node_manager
.manager
.get_workers_by_parallel_unit_ids(&parallel_unit_ids)?;
candidates.choose(&mut rand::thread_rng()).unwrap().clone()
if candidates.is_empty() {
return Err(SchedulerError::EmptyWorkerNodes);
}
candidates
.get(stage.session_id.0 as usize % candidates.len())
.context("no available worker node for dml")?
.clone()
};
Ok(vec![worker_node])
} else {