Skip to content

Commit

Permalink
separate wait epoch of batch and streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 25, 2024
1 parent 0df1339 commit 44a3b4a
Show file tree
Hide file tree
Showing 17 changed files with 201 additions and 86 deletions.
7 changes: 6 additions & 1 deletion proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,14 @@ message WorkerSlotMapping {
repeated uint64 data = 2;
}

// Payload of the `committed` case of `BatchQueryEpoch`: an epoch paired with
// a hummock version id.
message BatchQueryCommittedEpoch {
// The epoch carried by the committed batch query epoch.
uint64 epoch = 1;
// Raw id of a hummock version; readers wrap it with `HummockVersionId::new`.
uint64 hummock_version_id = 2;
}

message BatchQueryEpoch {
oneof epoch {
uint64 committed = 1;
BatchQueryCommittedEpoch committed = 1;

Check failure on line 115 in proto/common.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" with name "committed" on message "BatchQueryEpoch" changed type from "uint64" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
uint64 current = 2;
uint64 backup = 3;
uint64 time_travel = 4;
Expand Down
22 changes: 19 additions & 3 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ScalarImpl;
use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
use risingwave_pb::plan_common::StorageTableDesc;
Expand All @@ -50,13 +51,15 @@ pub struct LogRowSeqScanExecutor<S: StateStore> {
table: StorageTable<S>,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
pub fn new(
table: StorageTable<S>,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
chunk_size: usize,
identity: String,
metrics: Option<BatchMetrics>,
Expand All @@ -74,6 +77,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
table,
old_epoch,
new_epoch,
version_id,
}
}
}
Expand Down Expand Up @@ -128,12 +132,18 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
unreachable!("invalid new epoch: {:?}", log_store_seq_scan_node.new_epoch)
};

assert_eq!(old_epoch.hummock_version_id, new_epoch.hummock_version_id);
let version_id = old_epoch.hummock_version_id;
let old_epoch = old_epoch.epoch;
let new_epoch = new_epoch.epoch;

dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
Ok(Box::new(LogRowSeqScanExecutor::new(
table,
*old_epoch,
*new_epoch,
old_epoch,
new_epoch,
HummockVersionId::new(version_id),
chunk_size as usize,
source.plan_node().get_identity().clone(),
metrics,
Expand Down Expand Up @@ -164,6 +174,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
table,
old_epoch,
new_epoch,
version_id,
schema,
..
} = *self;
Expand All @@ -180,6 +191,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
table.clone(),
old_epoch,
new_epoch,
version_id,
chunk_size,
histogram,
Arc::new(schema.clone()),
Expand All @@ -196,13 +208,17 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
table: Arc<StorageTable<S>>,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
chunk_size: usize,
histogram: Option<impl Deref<Target = Histogram>>,
schema: Arc<Schema>,
) {
// Range Scan.
let iter = table
.batch_iter_log_with_pk_bounds(old_epoch, new_epoch)
.batch_iter_log_with_pk_bounds(
old_epoch,
HummockReadEpoch::BatchQueryCommitted(new_epoch, version_id),
)
.await?
.flat_map(|r| {
futures::stream::iter(std::iter::from_coroutine(
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {

#[cfg(test)]
mod tests {
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_hummock_sdk::test_batch_query_epoch;
use risingwave_pb::batch_plan::PlanNode;

use crate::executor::ExecutorBuilder;
Expand All @@ -278,7 +278,7 @@ mod tests {
&plan_node,
task_id,
ComputeNodeContext::for_test(),
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
ShutdownToken::empty(),
);
let child_plan = &PlanNode::default();
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/task/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ impl BatchManager {
tid: &PbTaskId,
plan: PlanFragment,
) -> Result<()> {
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_hummock_sdk::test_batch_query_epoch;

self.fire_task(
tid,
plan,
to_committed_batch_query_epoch(0),
test_batch_query_epoch(),
ComputeNodeContext::for_test(),
StateReporter::new_with_test(),
TracingContext::none(),
Expand Down
4 changes: 2 additions & 2 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use risingwave_connector::source::cdc::external::{
};
use risingwave_connector::source::cdc::DebeziumCdcSplit;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_hummock_sdk::test_batch_query_epoch;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_stream::common::table::state_table::StateTable;
Expand Down Expand Up @@ -384,7 +384,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
table.clone(),
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1024,
"RowSeqExecutor2".to_string(),
None,
Expand Down
10 changes: 5 additions & 5 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_hummock_sdk::test_batch_query_epoch;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::PbRowFormatType;
use risingwave_storage::memory::MemoryStateStore;
Expand Down Expand Up @@ -263,7 +263,7 @@ async fn test_table_materialize() -> StreamResult<()> {
table.clone(),
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1024,
"RowSeqExecutor2".to_string(),
None,
Expand Down Expand Up @@ -334,7 +334,7 @@ async fn test_table_materialize() -> StreamResult<()> {
table.clone(),
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1024,
"RowSeqScanExecutor2".to_string(),
None,
Expand Down Expand Up @@ -414,7 +414,7 @@ async fn test_table_materialize() -> StreamResult<()> {
table,
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1024,
"RowSeqScanExecutor2".to_string(),
None,
Expand Down Expand Up @@ -490,7 +490,7 @@ async fn test_row_seq_scan() -> StreamResult<()> {
table,
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1,
"RowSeqScanExecutor2".to_string(),
None,
Expand Down
12 changes: 9 additions & 3 deletions src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::LogRowSeqScanNode;
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch};

use super::batch::prelude::*;
use super::utils::{childless_record, Distill};
Expand Down Expand Up @@ -112,12 +112,18 @@ impl TryToBatchPb for BatchLogSeqScan {
vnode_bitmap: None,
old_epoch: Some(BatchQueryEpoch {
epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
self.core.old_epoch,
BatchQueryCommittedEpoch {
epoch: self.core.old_epoch,
hummock_version_id: 0,
},
)),
}),
new_epoch: Some(BatchQueryEpoch {
epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
self.core.new_epoch,
BatchQueryCommittedEpoch {
epoch: self.core.new_epoch,
hummock_version_id: 0,
},
)),
}),
}))
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/log_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use pretty_xmlish::Pretty;
use risingwave_common::catalog::{Field, Schema, TableDesc};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_hummock_sdk::HummockVersionId;

use crate::catalog::ColumnId;
use crate::optimizer::optimizer_context::OptimizerContextRef;
Expand All @@ -44,6 +45,7 @@ pub struct LogScan {

pub old_epoch: u64,
pub new_epoch: u64,
pub version_id: HummockVersionId,
}

impl LogScan {
Expand Down Expand Up @@ -101,6 +103,7 @@ impl LogScan {
ctx: OptimizerContextRef,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
) -> Self {
Self {
table_name,
Expand All @@ -110,6 +113,7 @@ impl LogScan {
ctx,
old_epoch,
new_epoch,
version_id,
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/scheduler/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::{
FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
};
use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
use risingwave_pb::common::{batch_query_epoch, BatchQueryCommittedEpoch, BatchQueryEpoch};
use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta};
use tokio::sync::watch;

Expand Down Expand Up @@ -55,7 +55,10 @@ impl ReadSnapshot {
Ok(match self {
ReadSnapshot::FrontendPinned { snapshot } => BatchQueryEpoch {
epoch: Some(batch_query_epoch::Epoch::Committed(
snapshot.batch_query_epoch(read_storage_tables)?.0,
BatchQueryCommittedEpoch {
epoch: snapshot.batch_query_epoch(read_storage_tables)?.0,
hummock_version_id: snapshot.value.id.to_u64(),
},
)),
},
ReadSnapshot::ReadUncommitted => BatchQueryEpoch {
Expand Down
16 changes: 16 additions & 0 deletions src/frontend/src/session/cursor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use risingwave_common::catalog::Field;
use risingwave_common::error::BoxedError;
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::DataType;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};

use super::SessionImpl;
Expand Down Expand Up @@ -523,6 +524,18 @@ impl SubscriptionCursor {
let init_query_timer = Instant::now();
let (chunk_stream, fields) = if let Some(rw_timestamp) = rw_timestamp {
let context = OptimizerContext::from_handler_args(handle_args);
let version_id = {
let version = session.env.hummock_snapshot_manager.acquire();
let version = version.version();
if !version
.state_table_info
.info()
.contains_key(dependent_table_id)
{
return Err(anyhow!("table id {dependent_table_id} has been dropped").into());
}
version.id
};
let plan_fragmenter_result = gen_batch_plan_fragmenter(
&session,
Self::create_batch_plan_for_cursor(
Expand All @@ -531,6 +544,7 @@ impl SubscriptionCursor {
context.into(),
rw_timestamp,
rw_timestamp,
version_id,
)?,
)?;
create_chunk_stream_for_cursor(session, plan_fragmenter_result).await?
Expand Down Expand Up @@ -606,6 +620,7 @@ impl SubscriptionCursor {
context: OptimizerContextRef,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
) -> Result<BatchQueryPlanResult> {
let out_col_idx = table_catalog
.columns
Expand All @@ -621,6 +636,7 @@ impl SubscriptionCursor {
context,
old_epoch,
new_epoch,
version_id,
);
let batch_log_seq_scan = BatchLogSeqScan::new(core);
let schema = batch_log_seq_scan
Expand Down
22 changes: 14 additions & 8 deletions src/storage/hummock_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,10 @@ impl PartialEq for LocalSstableInfo {
/// Packaged read epoch of hummock; it is used for `wait_epoch`
#[derive(Debug, Clone, Copy)]
pub enum HummockReadEpoch {
/// We need to wait the `max_committed_epoch`
/// We need to wait for the `committed_epoch` of the read table
Committed(HummockEpoch),
/// We need to wait for the `committed_epoch` of the read table, and also for the hummock version to reach the given version id
BatchQueryCommitted(HummockEpoch, HummockVersionId),
/// We don't need to wait epoch, we usually do stream reading with it.
NoWait(HummockEpoch),
/// We don't need to wait epoch.
Expand All @@ -220,7 +222,10 @@ pub enum HummockReadEpoch {
impl From<BatchQueryEpoch> for HummockReadEpoch {
fn from(e: BatchQueryEpoch) -> Self {
match e.epoch.unwrap() {
batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::Committed(epoch),
batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::BatchQueryCommitted(
epoch.epoch,
HummockVersionId::new(epoch.hummock_version_id),
),
batch_query_epoch::Epoch::Current(epoch) => {
if epoch != HummockEpoch::MAX {
warn!(
Expand All @@ -236,19 +241,20 @@ impl From<BatchQueryEpoch> for HummockReadEpoch {
}
}

pub fn to_committed_batch_query_epoch(epoch: u64) -> BatchQueryEpoch {
pub fn test_batch_query_epoch() -> BatchQueryEpoch {
BatchQueryEpoch {
epoch: Some(batch_query_epoch::Epoch::Committed(epoch)),
epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
}
}

impl HummockReadEpoch {
pub fn get_epoch(&self) -> HummockEpoch {
*match self {
HummockReadEpoch::Committed(epoch) => epoch,
HummockReadEpoch::NoWait(epoch) => epoch,
HummockReadEpoch::Backup(epoch) => epoch,
HummockReadEpoch::TimeTravel(epoch) => epoch,
HummockReadEpoch::Committed(epoch)
| HummockReadEpoch::BatchQueryCommitted(epoch, _)
| HummockReadEpoch::NoWait(epoch)
| HummockReadEpoch::Backup(epoch)
| HummockReadEpoch::TimeTravel(epoch) => epoch,
}
}
}
Expand Down
Loading

0 comments on commit 44a3b4a

Please sign in to comment.