Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): per table try wait epoch #18622

Merged
merged 44 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
826da0b
feat(storage): notify frontend with more hummock version info
wenym1 Sep 17, 2024
14fdb68
temp save
wenym1 Sep 18, 2024
e4b390c
fix test
wenym1 Sep 18, 2024
9fc23f9
Merge branch 'yiming/more-frontend-hummock-info' into yiming/per-tabl…
wenym1 Sep 18, 2024
3b142d7
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 18, 2024
73674f3
feat(frontend): generate query epoch by committed epoch of involved t…
wenym1 Sep 18, 2024
8fff90b
Merge branch 'main' into yiming/more-frontend-hummock-info
wenym1 Sep 18, 2024
880d15a
Merge branch 'yiming/more-frontend-hummock-info' into yiming/per-tabl…
wenym1 Sep 18, 2024
ab43fbd
rename
wenym1 Sep 18, 2024
d74fe42
fix recursive
wenym1 Sep 18, 2024
04b112c
fix test
wenym1 Sep 19, 2024
c0ee9cc
Merge branch 'main' into yiming/more-frontend-hummock-info
wenym1 Sep 19, 2024
cfd925d
Merge branch 'yiming/more-frontend-hummock-info' into yiming/per-tabl…
wenym1 Sep 20, 2024
9808588
avoid recalculate
wenym1 Sep 20, 2024
201dde3
fix typo
wenym1 Sep 20, 2024
deb0b17
Merge branch 'main' into yiming/more-frontend-hummock-info
wenym1 Sep 20, 2024
23bb7c6
Merge branch 'yiming/more-frontend-hummock-info' into yiming/per-tabl…
wenym1 Sep 20, 2024
f1cf8ec
feat(storage): per table try wait epoch
wenym1 Sep 20, 2024
1836fec
Merge branch 'main' into yiming/per-table-try-wait-epoch
wenym1 Sep 20, 2024
f5f78d4
fix test
wenym1 Sep 20, 2024
902c065
fix test
wenym1 Sep 20, 2024
ff24069
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 21, 2024
a520eba
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 21, 2024
84c8037
fix comment
wenym1 Sep 23, 2024
c88b9df
simplify now
wenym1 Sep 23, 2024
40f8ac6
refine
wenym1 Sep 23, 2024
6e0e2b7
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 23, 2024
4e897e9
address comment
wenym1 Sep 23, 2024
4723dbe
impl ScanTableVisitor
wenym1 Sep 23, 2024
5fb9ab5
Merge branch 'main' into yiming/per-table-try-wait-epoch
wenym1 Sep 23, 2024
65bc6fa
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 23, 2024
0d0cc25
remove QuerySnapshot
wenym1 Sep 23, 2024
6d32389
Merge branch 'yiming/per-table-pin-snapshot' into yiming/per-table-tr…
wenym1 Sep 23, 2024
fee2a0f
use epoch::now
wenym1 Sep 23, 2024
6cb7b8c
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 24, 2024
0df1339
Merge branch 'yiming/per-table-pin-snapshot' into yiming/per-table-tr…
wenym1 Sep 24, 2024
44a3b4a
separate wait epoch of batch and streaming
wenym1 Sep 25, 2024
ab3e10d
refine
wenym1 Sep 25, 2024
3b10660
fix
wenym1 Sep 26, 2024
49c7cde
fix typo
wenym1 Sep 26, 2024
02b0682
fix
wenym1 Sep 26, 2024
c74a62a
fix borrow and update
wenym1 Sep 26, 2024
eba2456
add log
wenym1 Sep 26, 2024
cb0ccd2
remove log
wenym1 Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ message BarrierCompleteResponse {

message WaitEpochCommitRequest {
uint64 epoch = 1;
uint32 table_id = 2;
}

message WaitEpochCommitResponse {
Expand Down
11 changes: 9 additions & 2 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use futures::{Stream, StreamExt, TryStreamExt};
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_storage::store::TryWaitEpochOptions;
use risingwave_stream::error::StreamError;
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use tokio::sync::mpsc::unbounded_channel;
Expand Down Expand Up @@ -45,14 +46,20 @@ impl StreamService for StreamServiceImpl {
&self,
request: Request<WaitEpochCommitRequest>,
) -> Result<Response<WaitEpochCommitResponse>, Status> {
let epoch = request.into_inner().epoch;
let request = request.into_inner();
let epoch = request.epoch;

dispatch_state_store!(self.env.state_store(), store, {
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::StateStore;

store
.try_wait_epoch(HummockReadEpoch::Committed(epoch))
.try_wait_epoch(
HummockReadEpoch::Committed(epoch),
TryWaitEpochOptions {
table_id: request.table_id.into(),
},
)
.instrument_await(format!("wait_epoch_commit (epoch {})", epoch))
.await
.map_err(StreamError::from)?;
Expand Down
21 changes: 16 additions & 5 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use risingwave_common::hash::ActorMapping;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::common::PbWorkerNode;
use risingwave_pb::meta::table_fragments::PbActorStatus;
Expand Down Expand Up @@ -442,6 +441,8 @@ pub struct CommandContext {
pub prev_epoch: TracedEpoch,
pub curr_epoch: TracedEpoch,

pub table_ids_to_commit: HashSet<TableId>,

pub current_paused_reason: Option<PausedReason>,

pub command: Command,
Expand Down Expand Up @@ -470,12 +471,12 @@ impl std::fmt::Debug for CommandContext {
}

impl CommandContext {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
node_map: HashMap<WorkerId, PbWorkerNode>,
subscription_info: InflightSubscriptionInfo,
prev_epoch: TracedEpoch,
curr_epoch: TracedEpoch,
table_ids_to_commit: HashSet<TableId>,
current_paused_reason: Option<PausedReason>,
command: Command,
kind: BarrierKind,
Expand All @@ -487,6 +488,7 @@ impl CommandContext {
subscription_info,
prev_epoch,
curr_epoch,
table_ids_to_commit,
current_paused_reason,
command,
kind,
Expand Down Expand Up @@ -945,15 +947,24 @@ impl Command {
}

impl CommandContext {
pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> {
pub async fn wait_epoch_commit(&self) -> MetaResult<()> {
let table_id = self.table_ids_to_commit.iter().next().cloned();
// try wait epoch on an existing random table id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// try wait epoch on an existing random table id
// try wait epoch on an existing random table id,
// this is okay because it will be deprecated once we enable partial checkpoint

...am I understanding correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this PR is not directly related to partial checkpoint yet. The main purpose is to deprecate the max_committed_epoch, and then when calling this wait_epoch_commit, we will need to specify a table_id.

For a CommandContext, its table_ids_to_commit are expected to have a same aligned committed epoch, so it's fine to wait on any one of the table ids, or skip it when there is no table ids to commit, though this is somehow unlikely to happen.

let Some(table_id) = table_id else {
// no need to wait epoch when there is no table id
return Ok(());
};
let futures = self.node_map.values().map(|worker_node| async {
let client = self
.barrier_manager_context
.env
.stream_client_pool()
.get(worker_node)
.await?;
let request = WaitEpochCommitRequest { epoch };
let request = WaitEpochCommitRequest {
epoch: self.prev_epoch.value().0,
table_id: table_id.table_id,
};
client.wait_epoch_commit(request).await
});

Expand All @@ -976,7 +987,7 @@ impl CommandContext {
// storage version with this epoch is synced to all compute nodes before the
// execution of the next command of `Update`, as some newly created operators
// may immediately initialize their states on that barrier.
self.wait_epoch_commit(self.prev_epoch.value().0).await?;
self.wait_epoch_commit().await?;
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1028,11 +1028,14 @@ impl GlobalBarrierManager {
});
span.record("epoch", curr_epoch.value().0);

let table_ids_to_commit: HashSet<_> = pre_applied_graph_info.existing_table_ids().collect();

let command_ctx = Arc::new(CommandContext::new(
self.active_streaming_nodes.current().clone(),
pre_applied_subscription_info,
prev_epoch.clone(),
curr_epoch.clone(),
table_ids_to_commit.clone(),
self.state.paused_reason(),
command,
kind,
Expand All @@ -1042,8 +1045,6 @@ impl GlobalBarrierManager {

send_latency_timer.observe_duration();

let table_ids_to_commit: HashSet<_> = pre_applied_graph_info.existing_table_ids().collect();

let mut jobs_to_wait = HashSet::new();

for (table_id, creating_job) in &mut self.checkpoint_control.creating_streaming_job_controls
Expand Down
7 changes: 7 additions & 0 deletions src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,13 @@ impl HummockVersion {
self.max_committed_epoch
}

pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
self.state_table_info
.info()
.get(&table_id)
.map(|info| info.committed_epoch)
}

pub fn visible_table_committed_epoch(&self) -> u64 {
self.max_committed_epoch
}
Expand Down
10 changes: 7 additions & 3 deletions src/storage/hummock_test/src/bin/replay/replay_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId, SyncResult};
use risingwave_hummock_trace::{
GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore,
ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions,
TracedReadOptions, TracedSealCurrentEpochOptions, TracedSubResp,
TracedReadOptions, TracedSealCurrentEpochOptions, TracedSubResp, TracedTryWaitEpochOptions,
};
use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey};
use risingwave_pb::common::WorkerNode;
Expand Down Expand Up @@ -170,9 +170,13 @@ impl ReplayStateStore for GlobalReplayImpl {
Box::new(LocalReplayImpl(local_storage))
}

async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()> {
async fn try_wait_epoch(
&self,
epoch: HummockReadEpoch,
options: TracedTryWaitEpochOptions,
) -> Result<()> {
self.store
.try_wait_epoch(epoch)
.try_wait_epoch(epoch, options.into())
.await
.map_err(|_| TraceError::TryWaitEpochFailed)?;
Ok(())
Expand Down
13 changes: 10 additions & 3 deletions src/storage/hummock_test/src/failpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use risingwave_storage::hummock::test_utils::{count_stream, default_opts_for_tes
use risingwave_storage::hummock::{CachePolicy, HummockStorage};
use risingwave_storage::storage_value::StorageValue;
use risingwave_storage::store::{
LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, StateStoreRead, WriteOptions,
LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, StateStoreRead,
TryWaitEpochOptions, WriteOptions,
};
use risingwave_storage::StateStore;

Expand Down Expand Up @@ -148,7 +149,10 @@ async fn test_failpoints_state_store_read_upload() {
.unwrap();
meta_client.commit_epoch(1, res, false).await.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(1))
.try_wait_epoch(
HummockReadEpoch::Committed(1),
TryWaitEpochOptions::for_test(local.table_id()),
)
.await
.unwrap();
// clear block cache
Expand Down Expand Up @@ -225,7 +229,10 @@ async fn test_failpoints_state_store_read_upload() {
.unwrap();
meta_client.commit_epoch(3, res, false).await.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(3))
.try_wait_epoch(
HummockReadEpoch::Committed(3),
TryWaitEpochOptions::for_test(local.table_id()),
)
.await
.unwrap();

Expand Down
24 changes: 12 additions & 12 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ async fn test_state_store_sync() {
.commit_epoch(epoch1, res, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch1).await;
test_env.wait_sync_committed_version().await;
{
// after sync 1 epoch
let read_version = hummock_storage.read_version();
Expand Down Expand Up @@ -614,7 +614,7 @@ async fn test_state_store_sync() {
.commit_epoch(epoch2, res, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch2).await;
test_env.wait_sync_committed_version().await;
{
// after sync all epoch
let read_version = hummock_storage.read_version();
Expand Down Expand Up @@ -912,7 +912,7 @@ async fn test_delete_get() {
.commit_epoch(epoch2, res, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch2).await;
test_env.wait_sync_committed_version().await;
assert!(test_env
.storage
.get(
Expand Down Expand Up @@ -1112,7 +1112,7 @@ async fn test_multiple_epoch_sync() {
.commit_epoch(epoch3, sync_result3, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch3).await;
test_env.wait_sync_committed_version().await;
test_get(true).await;
}

Expand Down Expand Up @@ -1286,7 +1286,7 @@ async fn test_iter_with_min_epoch() {
.commit_epoch(epoch2, sync_result2, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch2).await;
test_env.wait_sync_committed_version().await;

{
let iter = test_env
Expand Down Expand Up @@ -1578,7 +1578,7 @@ async fn test_hummock_version_reader() {
.commit_epoch(epoch1, sync_result1, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch1).await;
test_env.wait_sync_committed_version().await;

let sync_result2 = test_env
.storage
Expand All @@ -1590,7 +1590,7 @@ async fn test_hummock_version_reader() {
.commit_epoch(epoch2, sync_result2, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch2).await;
test_env.wait_sync_committed_version().await;

let sync_result3 = test_env
.storage
Expand All @@ -1602,7 +1602,7 @@ async fn test_hummock_version_reader() {
.commit_epoch(epoch3, sync_result3, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch3).await;
test_env.wait_sync_committed_version().await;
{
let (_, read_snapshot) = read_filter_for_version(
epoch1,
Expand Down Expand Up @@ -1996,7 +1996,7 @@ async fn test_get_with_min_epoch() {
.await
.unwrap();

test_env.storage.try_wait_epoch_for_test(epoch2).await;
test_env.wait_sync_committed_version().await;
let k = gen_key(0);
let prefix_hint = {
let mut ret = Vec::with_capacity(TABLE_PREFIX_LEN + k.len());
Expand Down Expand Up @@ -2433,7 +2433,7 @@ async fn test_table_watermark() {
};

test_env.commit_epoch(epoch1).await;
test_env.storage.try_wait_epoch_for_test(epoch1).await;
test_env.wait_sync_committed_version().await;

let (local1, local2) = test_after_epoch2(local1, local2).await;

Expand Down Expand Up @@ -2522,7 +2522,7 @@ async fn test_table_watermark() {
let (local1, local2) = test_after_epoch2(local1, local2).await;

test_env.commit_epoch(epoch2).await;
test_env.storage.try_wait_epoch_for_test(epoch2).await;
test_env.wait_sync_committed_version().await;

test_global_read(test_env.storage.clone(), epoch2).await;

Expand Down Expand Up @@ -2557,7 +2557,7 @@ async fn test_table_watermark() {
let (local1, local2) = test_after_epoch2(local1, local2).await;

test_env.commit_epoch(epoch3).await;
test_env.storage.try_wait_epoch_for_test(epoch3).await;
test_env.wait_sync_committed_version().await;

check_version_table_watermark(test_env.storage.get_pinned_version());

Expand Down
22 changes: 17 additions & 5 deletions src/storage/hummock_test/src/snapshot_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_storage::hummock::{CachePolicy, HummockStorage};
use risingwave_storage::storage_value::StorageValue;
use risingwave_storage::store::{
LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, SealCurrentEpochOptions,
StateStoreRead, WriteOptions,
StateStoreRead, TryWaitEpochOptions, WriteOptions,
};
use risingwave_storage::StateStore;

Expand Down Expand Up @@ -157,7 +157,10 @@ async fn test_snapshot_inner(
.await
.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(epoch1))
.try_wait_epoch(
HummockReadEpoch::Committed(epoch1),
TryWaitEpochOptions::for_test(Default::default()),
)
.await
.unwrap();
}
Expand Down Expand Up @@ -201,7 +204,10 @@ async fn test_snapshot_inner(
.await
.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(epoch2))
.try_wait_epoch(
HummockReadEpoch::Committed(epoch2),
TryWaitEpochOptions::for_test(Default::default()),
)
.await
.unwrap();
}
Expand Down Expand Up @@ -251,7 +257,10 @@ async fn test_snapshot_inner(
.await
.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(epoch3))
.try_wait_epoch(
HummockReadEpoch::Committed(epoch3),
TryWaitEpochOptions::for_test(Default::default()),
)
.await
.unwrap();
}
Expand Down Expand Up @@ -334,7 +343,10 @@ async fn test_snapshot_range_scan_inner(
.await
.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(epoch))
.try_wait_epoch(
HummockReadEpoch::Committed(epoch),
TryWaitEpochOptions::for_test(local.table_id()),
)
.await
.unwrap();
}
Expand Down
Loading
Loading