Skip to content

Commit

Permalink
feat(storage): per table try wait epoch (#18622)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Sep 27, 2024
1 parent d6f8ca2 commit e9c2161
Show file tree
Hide file tree
Showing 45 changed files with 536 additions and 186 deletions.
7 changes: 6 additions & 1 deletion proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,14 @@ message WorkerSlotMapping {
repeated uint64 data = 2;
}

message BatchQueryCommittedEpoch {
uint64 epoch = 1;
uint64 hummock_version_id = 2;
}

message BatchQueryEpoch {
oneof epoch {
uint64 committed = 1;
BatchQueryCommittedEpoch committed = 1;
uint64 current = 2;
uint64 backup = 3;
uint64 time_travel = 4;
Expand Down
1 change: 1 addition & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ message BarrierCompleteResponse {

message WaitEpochCommitRequest {
uint64 epoch = 1;
uint32 table_id = 2;
}

message WaitEpochCommitResponse {
Expand Down
22 changes: 19 additions & 3 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ScalarImpl;
use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
use risingwave_pb::plan_common::StorageTableDesc;
Expand All @@ -50,13 +51,15 @@ pub struct LogRowSeqScanExecutor<S: StateStore> {
table: StorageTable<S>,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
pub fn new(
table: StorageTable<S>,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
chunk_size: usize,
identity: String,
metrics: Option<BatchMetrics>,
Expand All @@ -74,6 +77,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
table,
old_epoch,
new_epoch,
version_id,
}
}
}
Expand Down Expand Up @@ -127,12 +131,18 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
unreachable!("invalid new epoch: {:?}", log_store_seq_scan_node.new_epoch)
};

assert_eq!(old_epoch.hummock_version_id, new_epoch.hummock_version_id);
let version_id = old_epoch.hummock_version_id;
let old_epoch = old_epoch.epoch;
let new_epoch = new_epoch.epoch;

dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
Ok(Box::new(LogRowSeqScanExecutor::new(
table,
*old_epoch,
*new_epoch,
old_epoch,
new_epoch,
HummockVersionId::new(version_id),
chunk_size as usize,
source.plan_node().get_identity().clone(),
metrics,
Expand Down Expand Up @@ -163,6 +173,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
table,
old_epoch,
new_epoch,
version_id,
schema,
..
} = *self;
Expand All @@ -179,6 +190,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
table.clone(),
old_epoch,
new_epoch,
version_id,
chunk_size,
histogram,
Arc::new(schema.clone()),
Expand All @@ -195,13 +207,17 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
table: Arc<StorageTable<S>>,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
chunk_size: usize,
histogram: Option<impl Deref<Target = Histogram>>,
schema: Arc<Schema>,
) {
// Range Scan.
let iter = table
.batch_iter_log_with_pk_bounds(old_epoch, new_epoch)
.batch_iter_log_with_pk_bounds(
old_epoch,
HummockReadEpoch::BatchQueryCommitted(new_epoch, version_id),
)
.await?
.flat_map(|r| {
futures::stream::iter(std::iter::from_coroutine(
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {

#[cfg(test)]
mod tests {
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_hummock_sdk::test_batch_query_epoch;
use risingwave_pb::batch_plan::PlanNode;

use crate::executor::ExecutorBuilder;
Expand All @@ -278,7 +278,7 @@ mod tests {
&plan_node,
task_id,
ComputeNodeContext::for_test(),
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
ShutdownToken::empty(),
);
let child_plan = &PlanNode::default();
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/task/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ impl BatchManager {
tid: &PbTaskId,
plan: PlanFragment,
) -> Result<()> {
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_hummock_sdk::test_batch_query_epoch;

self.fire_task(
tid,
plan,
to_committed_batch_query_epoch(0),
test_batch_query_epoch(),
ComputeNodeContext::for_test(),
StateReporter::new_with_test(),
TracingContext::none(),
Expand Down
11 changes: 9 additions & 2 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use futures::{Stream, StreamExt, TryStreamExt};
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_storage::store::TryWaitEpochOptions;
use risingwave_stream::error::StreamError;
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use tokio::sync::mpsc::unbounded_channel;
Expand Down Expand Up @@ -45,14 +46,20 @@ impl StreamService for StreamServiceImpl {
&self,
request: Request<WaitEpochCommitRequest>,
) -> Result<Response<WaitEpochCommitResponse>, Status> {
let epoch = request.into_inner().epoch;
let request = request.into_inner();
let epoch = request.epoch;

dispatch_state_store!(self.env.state_store(), store, {
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::StateStore;

store
.try_wait_epoch(HummockReadEpoch::Committed(epoch))
.try_wait_epoch(
HummockReadEpoch::Committed(epoch),
TryWaitEpochOptions {
table_id: request.table_id.into(),
},
)
.instrument_await(format!("wait_epoch_commit (epoch {})", epoch))
.await
.map_err(StreamError::from)?;
Expand Down
4 changes: 2 additions & 2 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use risingwave_connector::source::cdc::external::{
};
use risingwave_connector::source::cdc::DebeziumCdcSplit;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_hummock_sdk::test_batch_query_epoch;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_stream::common::table::state_table::StateTable;
Expand Down Expand Up @@ -384,7 +384,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
table.clone(),
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1024,
"RowSeqExecutor2".to_string(),
None,
Expand Down
10 changes: 5 additions & 5 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_hummock_sdk::test_batch_query_epoch;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::PbRowFormatType;
use risingwave_storage::memory::MemoryStateStore;
Expand Down Expand Up @@ -263,7 +263,7 @@ async fn test_table_materialize() -> StreamResult<()> {
table.clone(),
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1024,
"RowSeqExecutor2".to_string(),
None,
Expand Down Expand Up @@ -334,7 +334,7 @@ async fn test_table_materialize() -> StreamResult<()> {
table.clone(),
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1024,
"RowSeqScanExecutor2".to_string(),
None,
Expand Down Expand Up @@ -414,7 +414,7 @@ async fn test_table_materialize() -> StreamResult<()> {
table,
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1024,
"RowSeqScanExecutor2".to_string(),
None,
Expand Down Expand Up @@ -490,7 +490,7 @@ async fn test_row_seq_scan() -> StreamResult<()> {
table,
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1,
"RowSeqScanExecutor2".to_string(),
None,
Expand Down
12 changes: 9 additions & 3 deletions src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::LogRowSeqScanNode;
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch};

use super::batch::prelude::*;
use super::utils::{childless_record, Distill};
Expand Down Expand Up @@ -112,12 +112,18 @@ impl TryToBatchPb for BatchLogSeqScan {
vnode_bitmap: None,
old_epoch: Some(BatchQueryEpoch {
epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
self.core.old_epoch,
BatchQueryCommittedEpoch {
epoch: self.core.old_epoch,
hummock_version_id: 0,
},
)),
}),
new_epoch: Some(BatchQueryEpoch {
epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
self.core.new_epoch,
BatchQueryCommittedEpoch {
epoch: self.core.new_epoch,
hummock_version_id: 0,
},
)),
}),
}))
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/log_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use pretty_xmlish::Pretty;
use risingwave_common::catalog::{Field, Schema, TableDesc};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_hummock_sdk::HummockVersionId;

use crate::catalog::ColumnId;
use crate::optimizer::optimizer_context::OptimizerContextRef;
Expand All @@ -44,6 +45,7 @@ pub struct LogScan {

pub old_epoch: u64,
pub new_epoch: u64,
pub version_id: HummockVersionId,
}

impl LogScan {
Expand Down Expand Up @@ -101,6 +103,7 @@ impl LogScan {
ctx: OptimizerContextRef,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
) -> Self {
Self {
table_name,
Expand All @@ -110,6 +113,7 @@ impl LogScan {
ctx,
old_epoch,
new_epoch,
version_id,
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/scheduler/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::{
FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
};
use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
use risingwave_pb::common::{batch_query_epoch, BatchQueryCommittedEpoch, BatchQueryEpoch};
use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta};
use tokio::sync::watch;

Expand Down Expand Up @@ -55,7 +55,10 @@ impl ReadSnapshot {
Ok(match self {
ReadSnapshot::FrontendPinned { snapshot } => BatchQueryEpoch {
epoch: Some(batch_query_epoch::Epoch::Committed(
snapshot.batch_query_epoch(read_storage_tables)?.0,
BatchQueryCommittedEpoch {
epoch: snapshot.batch_query_epoch(read_storage_tables)?.0,
hummock_version_id: snapshot.value.id.to_u64(),
},
)),
},
ReadSnapshot::ReadUncommitted => BatchQueryEpoch {
Expand Down
16 changes: 16 additions & 0 deletions src/frontend/src/session/cursor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use risingwave_common::catalog::Field;
use risingwave_common::error::BoxedError;
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::DataType;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};

use super::SessionImpl;
Expand Down Expand Up @@ -523,6 +524,18 @@ impl SubscriptionCursor {
let init_query_timer = Instant::now();
let (chunk_stream, fields) = if let Some(rw_timestamp) = rw_timestamp {
let context = OptimizerContext::from_handler_args(handle_args);
let version_id = {
let version = session.env.hummock_snapshot_manager.acquire();
let version = version.version();
if !version
.state_table_info
.info()
.contains_key(dependent_table_id)
{
return Err(anyhow!("table id {dependent_table_id} has been dropped").into());
}
version.id
};
let plan_fragmenter_result = gen_batch_plan_fragmenter(
&session,
Self::create_batch_plan_for_cursor(
Expand All @@ -531,6 +544,7 @@ impl SubscriptionCursor {
context.into(),
rw_timestamp,
rw_timestamp,
version_id,
)?,
)?;
create_chunk_stream_for_cursor(session, plan_fragmenter_result).await?
Expand Down Expand Up @@ -606,6 +620,7 @@ impl SubscriptionCursor {
context: OptimizerContextRef,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
) -> Result<BatchQueryPlanResult> {
let out_col_idx = table_catalog
.columns
Expand All @@ -621,6 +636,7 @@ impl SubscriptionCursor {
context,
old_epoch,
new_epoch,
version_id,
);
let batch_log_seq_scan = BatchLogSeqScan::new(core);
let schema = batch_log_seq_scan
Expand Down
Loading

0 comments on commit e9c2161

Please sign in to comment.