Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): per table try wait epoch #18622

Merged
merged 44 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
826da0b
feat(storage): notify frontend with more hummock version info
wenym1 Sep 17, 2024
14fdb68
temp save
wenym1 Sep 18, 2024
e4b390c
fix test
wenym1 Sep 18, 2024
9fc23f9
Merge branch 'yiming/more-frontend-hummock-info' into yiming/per-tabl…
wenym1 Sep 18, 2024
3b142d7
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 18, 2024
73674f3
feat(frontend): generate query epoch by committed epoch of involved t…
wenym1 Sep 18, 2024
8fff90b
Merge branch 'main' into yiming/more-frontend-hummock-info
wenym1 Sep 18, 2024
880d15a
Merge branch 'yiming/more-frontend-hummock-info' into yiming/per-tabl…
wenym1 Sep 18, 2024
ab43fbd
rename
wenym1 Sep 18, 2024
d74fe42
fix recursive
wenym1 Sep 18, 2024
04b112c
fix test
wenym1 Sep 19, 2024
c0ee9cc
Merge branch 'main' into yiming/more-frontend-hummock-info
wenym1 Sep 19, 2024
cfd925d
Merge branch 'yiming/more-frontend-hummock-info' into yiming/per-tabl…
wenym1 Sep 20, 2024
9808588
avoid recalculate
wenym1 Sep 20, 2024
201dde3
fix typo
wenym1 Sep 20, 2024
deb0b17
Merge branch 'main' into yiming/more-frontend-hummock-info
wenym1 Sep 20, 2024
23bb7c6
Merge branch 'yiming/more-frontend-hummock-info' into yiming/per-tabl…
wenym1 Sep 20, 2024
f1cf8ec
feat(storage): per table try wait epoch
wenym1 Sep 20, 2024
1836fec
Merge branch 'main' into yiming/per-table-try-wait-epoch
wenym1 Sep 20, 2024
f5f78d4
fix test
wenym1 Sep 20, 2024
902c065
fix test
wenym1 Sep 20, 2024
ff24069
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 21, 2024
a520eba
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 21, 2024
84c8037
fix comment
wenym1 Sep 23, 2024
c88b9df
simplify now
wenym1 Sep 23, 2024
40f8ac6
refine
wenym1 Sep 23, 2024
6e0e2b7
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 23, 2024
4e897e9
address comment
wenym1 Sep 23, 2024
4723dbe
impl ScanTableVisitor
wenym1 Sep 23, 2024
5fb9ab5
Merge branch 'main' into yiming/per-table-try-wait-epoch
wenym1 Sep 23, 2024
65bc6fa
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 23, 2024
0d0cc25
remove QuerySnapshot
wenym1 Sep 23, 2024
6d32389
Merge branch 'yiming/per-table-pin-snapshot' into yiming/per-table-tr…
wenym1 Sep 23, 2024
fee2a0f
use epoch::now
wenym1 Sep 23, 2024
6cb7b8c
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 24, 2024
0df1339
Merge branch 'yiming/per-table-pin-snapshot' into yiming/per-table-tr…
wenym1 Sep 24, 2024
44a3b4a
separate wait epoch of batch and streaming
wenym1 Sep 25, 2024
ab3e10d
refine
wenym1 Sep 25, 2024
3b10660
fix
wenym1 Sep 26, 2024
49c7cde
fix typo
wenym1 Sep 26, 2024
02b0682
fix
wenym1 Sep 26, 2024
c74a62a
fix borrow and update
wenym1 Sep 26, 2024
eba2456
add log
wenym1 Sep 26, 2024
cb0ccd2
remove log
wenym1 Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,14 @@
repeated uint64 data = 2;
}

message BatchQueryCommittedEpoch {
uint64 epoch = 1;
uint64 hummock_version_id = 2;
}

message BatchQueryEpoch {
oneof epoch {
uint64 committed = 1;
BatchQueryCommittedEpoch committed = 1;

Check failure on line 115 in proto/common.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" with name "committed" on message "BatchQueryEpoch" changed type from "uint64" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
uint64 current = 2;
uint64 backup = 3;
uint64 time_travel = 4;
Expand Down
1 change: 1 addition & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ message BarrierCompleteResponse {

message WaitEpochCommitRequest {
uint64 epoch = 1;
uint32 table_id = 2;
}

message WaitEpochCommitResponse {
Expand Down
22 changes: 19 additions & 3 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ScalarImpl;
use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
use risingwave_pb::plan_common::StorageTableDesc;
Expand All @@ -50,13 +51,15 @@ pub struct LogRowSeqScanExecutor<S: StateStore> {
table: StorageTable<S>,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
pub fn new(
table: StorageTable<S>,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
chunk_size: usize,
identity: String,
metrics: Option<BatchMetrics>,
Expand All @@ -74,6 +77,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
table,
old_epoch,
new_epoch,
version_id,
}
}
}
Expand Down Expand Up @@ -128,12 +132,18 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
unreachable!("invalid new epoch: {:?}", log_store_seq_scan_node.new_epoch)
};

assert_eq!(old_epoch.hummock_version_id, new_epoch.hummock_version_id);
let version_id = old_epoch.hummock_version_id;
let old_epoch = old_epoch.epoch;
let new_epoch = new_epoch.epoch;

dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
Ok(Box::new(LogRowSeqScanExecutor::new(
table,
*old_epoch,
*new_epoch,
old_epoch,
new_epoch,
HummockVersionId::new(version_id),
chunk_size as usize,
source.plan_node().get_identity().clone(),
metrics,
Expand Down Expand Up @@ -164,6 +174,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
table,
old_epoch,
new_epoch,
version_id,
schema,
..
} = *self;
Expand All @@ -180,6 +191,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
table.clone(),
old_epoch,
new_epoch,
version_id,
chunk_size,
histogram,
Arc::new(schema.clone()),
Expand All @@ -196,13 +208,17 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
table: Arc<StorageTable<S>>,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
chunk_size: usize,
histogram: Option<impl Deref<Target = Histogram>>,
schema: Arc<Schema>,
) {
// Range Scan.
let iter = table
.batch_iter_log_with_pk_bounds(old_epoch, new_epoch)
.batch_iter_log_with_pk_bounds(
old_epoch,
HummockReadEpoch::BatchQueryCommitted(new_epoch, version_id),
)
.await?
.flat_map(|r| {
futures::stream::iter(std::iter::from_coroutine(
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {

#[cfg(test)]
mod tests {
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_hummock_sdk::test_batch_query_epoch;
use risingwave_pb::batch_plan::PlanNode;

use crate::executor::ExecutorBuilder;
Expand All @@ -278,7 +278,7 @@ mod tests {
&plan_node,
task_id,
ComputeNodeContext::for_test(),
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
ShutdownToken::empty(),
);
let child_plan = &PlanNode::default();
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/task/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ impl BatchManager {
tid: &PbTaskId,
plan: PlanFragment,
) -> Result<()> {
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_hummock_sdk::test_batch_query_epoch;

self.fire_task(
tid,
plan,
to_committed_batch_query_epoch(0),
test_batch_query_epoch(),
ComputeNodeContext::for_test(),
StateReporter::new_with_test(),
TracingContext::none(),
Expand Down
11 changes: 9 additions & 2 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use futures::{Stream, StreamExt, TryStreamExt};
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_storage::store::TryWaitEpochOptions;
use risingwave_stream::error::StreamError;
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use tokio::sync::mpsc::unbounded_channel;
Expand Down Expand Up @@ -45,14 +46,20 @@ impl StreamService for StreamServiceImpl {
&self,
request: Request<WaitEpochCommitRequest>,
) -> Result<Response<WaitEpochCommitResponse>, Status> {
let epoch = request.into_inner().epoch;
let request = request.into_inner();
let epoch = request.epoch;

dispatch_state_store!(self.env.state_store(), store, {
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::StateStore;

store
.try_wait_epoch(HummockReadEpoch::Committed(epoch))
.try_wait_epoch(
HummockReadEpoch::Committed(epoch),
TryWaitEpochOptions {
table_id: request.table_id.into(),
},
)
.instrument_await(format!("wait_epoch_commit (epoch {})", epoch))
.await
.map_err(StreamError::from)?;
Expand Down
4 changes: 2 additions & 2 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use risingwave_connector::source::cdc::external::{
};
use risingwave_connector::source::cdc::DebeziumCdcSplit;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_hummock_sdk::test_batch_query_epoch;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_stream::common::table::state_table::StateTable;
Expand Down Expand Up @@ -384,7 +384,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
table.clone(),
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1024,
"RowSeqExecutor2".to_string(),
None,
Expand Down
10 changes: 5 additions & 5 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_hummock_sdk::test_batch_query_epoch;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::PbRowFormatType;
use risingwave_storage::memory::MemoryStateStore;
Expand Down Expand Up @@ -263,7 +263,7 @@ async fn test_table_materialize() -> StreamResult<()> {
table.clone(),
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1024,
"RowSeqExecutor2".to_string(),
None,
Expand Down Expand Up @@ -334,7 +334,7 @@ async fn test_table_materialize() -> StreamResult<()> {
table.clone(),
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1024,
"RowSeqScanExecutor2".to_string(),
None,
Expand Down Expand Up @@ -414,7 +414,7 @@ async fn test_table_materialize() -> StreamResult<()> {
table,
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1024,
"RowSeqScanExecutor2".to_string(),
None,
Expand Down Expand Up @@ -490,7 +490,7 @@ async fn test_row_seq_scan() -> StreamResult<()> {
table,
vec![ScanRange::full()],
true,
to_committed_batch_query_epoch(u64::MAX),
test_batch_query_epoch(),
1,
"RowSeqScanExecutor2".to_string(),
None,
Expand Down
12 changes: 9 additions & 3 deletions src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::LogRowSeqScanNode;
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch};

use super::batch::prelude::*;
use super::utils::{childless_record, Distill};
Expand Down Expand Up @@ -112,12 +112,18 @@ impl TryToBatchPb for BatchLogSeqScan {
vnode_bitmap: None,
old_epoch: Some(BatchQueryEpoch {
epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
self.core.old_epoch,
BatchQueryCommittedEpoch {
epoch: self.core.old_epoch,
hummock_version_id: 0,
},
)),
}),
new_epoch: Some(BatchQueryEpoch {
epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
self.core.new_epoch,
BatchQueryCommittedEpoch {
epoch: self.core.new_epoch,
hummock_version_id: 0,
},
)),
}),
}))
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/log_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use pretty_xmlish::Pretty;
use risingwave_common::catalog::{Field, Schema, TableDesc};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_hummock_sdk::HummockVersionId;

use crate::catalog::ColumnId;
use crate::optimizer::optimizer_context::OptimizerContextRef;
Expand All @@ -44,6 +45,7 @@ pub struct LogScan {

pub old_epoch: u64,
pub new_epoch: u64,
pub version_id: HummockVersionId,
}

impl LogScan {
Expand Down Expand Up @@ -101,6 +103,7 @@ impl LogScan {
ctx: OptimizerContextRef,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
) -> Self {
Self {
table_name,
Expand All @@ -110,6 +113,7 @@ impl LogScan {
ctx,
old_epoch,
new_epoch,
version_id,
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/scheduler/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::{
FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
};
use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
use risingwave_pb::common::{batch_query_epoch, BatchQueryCommittedEpoch, BatchQueryEpoch};
use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta};
use tokio::sync::watch;

Expand Down Expand Up @@ -55,7 +55,10 @@ impl ReadSnapshot {
Ok(match self {
ReadSnapshot::FrontendPinned { snapshot } => BatchQueryEpoch {
epoch: Some(batch_query_epoch::Epoch::Committed(
snapshot.batch_query_epoch(read_storage_tables)?.0,
BatchQueryCommittedEpoch {
epoch: snapshot.batch_query_epoch(read_storage_tables)?.0,
hummock_version_id: snapshot.value.id.to_u64(),
},
)),
},
ReadSnapshot::ReadUncommitted => BatchQueryEpoch {
Expand Down
16 changes: 16 additions & 0 deletions src/frontend/src/session/cursor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use risingwave_common::catalog::Field;
use risingwave_common::error::BoxedError;
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::DataType;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};

use super::SessionImpl;
Expand Down Expand Up @@ -523,6 +524,18 @@ impl SubscriptionCursor {
let init_query_timer = Instant::now();
let (chunk_stream, fields) = if let Some(rw_timestamp) = rw_timestamp {
let context = OptimizerContext::from_handler_args(handle_args);
let version_id = {
let version = session.env.hummock_snapshot_manager.acquire();
let version = version.version();
if !version
.state_table_info
.info()
.contains_key(dependent_table_id)
{
return Err(anyhow!("table id {dependent_table_id} has been dropped").into());
}
version.id
};
let plan_fragmenter_result = gen_batch_plan_fragmenter(
&session,
Self::create_batch_plan_for_cursor(
Expand All @@ -531,6 +544,7 @@ impl SubscriptionCursor {
context.into(),
rw_timestamp,
rw_timestamp,
version_id,
)?,
)?;
create_chunk_stream_for_cursor(session, plan_fragmenter_result).await?
Expand Down Expand Up @@ -606,6 +620,7 @@ impl SubscriptionCursor {
context: OptimizerContextRef,
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
) -> Result<BatchQueryPlanResult> {
let out_col_idx = table_catalog
.columns
Expand All @@ -621,6 +636,7 @@ impl SubscriptionCursor {
context,
old_epoch,
new_epoch,
version_id,
);
let batch_log_seq_scan = BatchLogSeqScan::new(core);
let schema = batch_log_seq_scan
Expand Down
Loading
Loading