Skip to content

Commit

Permalink
feat(storage): per table try wait epoch
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 20, 2024
1 parent 4ba84de commit f1cf8ec
Show file tree
Hide file tree
Showing 33 changed files with 327 additions and 121 deletions.
1 change: 1 addition & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ message BarrierCompleteResponse {

message WaitEpochCommitRequest {
uint64 epoch = 1;
uint32 table_id = 2;
}

message WaitEpochCommitResponse {
Expand Down
11 changes: 9 additions & 2 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use futures::{Stream, StreamExt, TryStreamExt};
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_storage::store::TryWaitEpochOptions;
use risingwave_stream::error::StreamError;
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use tokio::sync::mpsc::unbounded_channel;
Expand Down Expand Up @@ -45,14 +46,20 @@ impl StreamService for StreamServiceImpl {
&self,
request: Request<WaitEpochCommitRequest>,
) -> Result<Response<WaitEpochCommitResponse>, Status> {
let epoch = request.into_inner().epoch;
let request = request.into_inner();
let epoch = request.epoch;

dispatch_state_store!(self.env.state_store(), store, {
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::StateStore;

store
.try_wait_epoch(HummockReadEpoch::Committed(epoch))
.try_wait_epoch(
HummockReadEpoch::Committed(epoch),
TryWaitEpochOptions {
table_id: request.table_id.into(),
},
)
.instrument_await(format!("wait_epoch_commit (epoch {})", epoch))
.await
.map_err(StreamError::from)?;
Expand Down
21 changes: 16 additions & 5 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use risingwave_common::hash::ActorMapping;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::common::PbWorkerNode;
use risingwave_pb::meta::table_fragments::PbActorStatus;
Expand Down Expand Up @@ -442,6 +441,8 @@ pub struct CommandContext {
pub prev_epoch: TracedEpoch,
pub curr_epoch: TracedEpoch,

pub table_ids_to_commit: HashSet<TableId>,

pub current_paused_reason: Option<PausedReason>,

pub command: Command,
Expand Down Expand Up @@ -470,12 +471,12 @@ impl std::fmt::Debug for CommandContext {
}

impl CommandContext {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
node_map: HashMap<WorkerId, PbWorkerNode>,
subscription_info: InflightSubscriptionInfo,
prev_epoch: TracedEpoch,
curr_epoch: TracedEpoch,
table_ids_to_commit: HashSet<TableId>,
current_paused_reason: Option<PausedReason>,
command: Command,
kind: BarrierKind,
Expand All @@ -487,6 +488,7 @@ impl CommandContext {
subscription_info,
prev_epoch,
curr_epoch,
table_ids_to_commit,
current_paused_reason,
command,
kind,
Expand Down Expand Up @@ -945,15 +947,24 @@ impl Command {
}

impl CommandContext {
pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> {
pub async fn wait_epoch_commit(&self) -> MetaResult<()> {
let table_id = self.table_ids_to_commit.iter().next().cloned();
// try wait epoch on an existing random table id
let Some(table_id) = table_id else {
// no need to wait epoch when there is no table id
return Ok(());
};
let futures = self.node_map.values().map(|worker_node| async {
let client = self
.barrier_manager_context
.env
.stream_client_pool()
.get(worker_node)
.await?;
let request = WaitEpochCommitRequest { epoch };
let request = WaitEpochCommitRequest {
epoch: self.prev_epoch.value().0,
table_id: table_id.table_id,
};
client.wait_epoch_commit(request).await
});

Expand All @@ -976,7 +987,7 @@ impl CommandContext {
// storage version with this epoch is synced to all compute nodes before the
// execution of the next command of `Update`, as some newly created operators
// may immediately initialize their states on that barrier.
self.wait_epoch_commit(self.prev_epoch.value().0).await?;
self.wait_epoch_commit().await?;
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1029,11 +1029,14 @@ impl GlobalBarrierManager {
});
span.record("epoch", curr_epoch.value().0);

let table_ids_to_commit: HashSet<_> = pre_applied_graph_info.existing_table_ids().collect();

let command_ctx = Arc::new(CommandContext::new(
self.active_streaming_nodes.current().clone(),
pre_applied_subscription_info,
prev_epoch.clone(),
curr_epoch.clone(),
table_ids_to_commit.clone(),
self.state.paused_reason(),
command,
kind,
Expand All @@ -1043,8 +1046,6 @@ impl GlobalBarrierManager {

send_latency_timer.observe_duration();

let table_ids_to_commit: HashSet<_> = pre_applied_graph_info.existing_table_ids().collect();

let mut jobs_to_wait = HashSet::new();

for (table_id, creating_job) in &mut self.checkpoint_control.creating_streaming_job_controls
Expand Down
7 changes: 7 additions & 0 deletions src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,13 @@ impl HummockVersion {
self.max_committed_epoch
}

pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
self.state_table_info
.info()
.get(&table_id)
.map(|info| info.committed_epoch)
}

pub fn visible_table_committed_epoch(&self) -> u64 {
self.max_committed_epoch
}
Expand Down
10 changes: 7 additions & 3 deletions src/storage/hummock_test/src/bin/replay/replay_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId, SyncResult};
use risingwave_hummock_trace::{
GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore,
ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions,
TracedReadOptions, TracedSealCurrentEpochOptions, TracedSubResp,
TracedReadOptions, TracedSealCurrentEpochOptions, TracedSubResp, TracedTryWaitEpochOptions,
};
use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey};
use risingwave_pb::common::WorkerNode;
Expand Down Expand Up @@ -170,9 +170,13 @@ impl ReplayStateStore for GlobalReplayImpl {
Box::new(LocalReplayImpl(local_storage))
}

async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()> {
async fn try_wait_epoch(
&self,
epoch: HummockReadEpoch,
options: TracedTryWaitEpochOptions,
) -> Result<()> {
self.store
.try_wait_epoch(epoch)
.try_wait_epoch(epoch, options.into())
.await
.map_err(|_| TraceError::TryWaitEpochFailed)?;
Ok(())
Expand Down
13 changes: 10 additions & 3 deletions src/storage/hummock_test/src/failpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use risingwave_storage::hummock::test_utils::{count_stream, default_opts_for_tes
use risingwave_storage::hummock::{CachePolicy, HummockStorage};
use risingwave_storage::storage_value::StorageValue;
use risingwave_storage::store::{
LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, StateStoreRead, WriteOptions,
LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, StateStoreRead,
TryWaitEpochOptions, WriteOptions,
};
use risingwave_storage::StateStore;

Expand Down Expand Up @@ -148,7 +149,10 @@ async fn test_failpoints_state_store_read_upload() {
.unwrap();
meta_client.commit_epoch(1, res, false).await.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(1))
.try_wait_epoch(
HummockReadEpoch::Committed(1),
TryWaitEpochOptions::for_test(local.table_id()),
)
.await
.unwrap();
// clear block cache
Expand Down Expand Up @@ -225,7 +229,10 @@ async fn test_failpoints_state_store_read_upload() {
.unwrap();
meta_client.commit_epoch(3, res, false).await.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(3))
.try_wait_epoch(
HummockReadEpoch::Committed(3),
TryWaitEpochOptions::for_test(local.table_id()),
)
.await
.unwrap();

Expand Down
24 changes: 12 additions & 12 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ async fn test_state_store_sync() {
.commit_epoch(epoch1, res, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch1).await;
test_env.wait_sync_committed_version().await;
{
// after sync 1 epoch
let read_version = hummock_storage.read_version();
Expand Down Expand Up @@ -614,7 +614,7 @@ async fn test_state_store_sync() {
.commit_epoch(epoch2, res, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch2).await;
test_env.wait_sync_committed_version().await;
{
// after sync all epoch
let read_version = hummock_storage.read_version();
Expand Down Expand Up @@ -910,7 +910,7 @@ async fn test_delete_get() {
.commit_epoch(epoch2, res, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch2).await;
test_env.wait_sync_committed_version().await;
assert!(test_env
.storage
.get(
Expand Down Expand Up @@ -1096,7 +1096,7 @@ async fn test_multiple_epoch_sync() {
.commit_epoch(epoch3, sync_result3, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch3).await;
test_env.wait_sync_committed_version().await;
test_get().await;
}

Expand Down Expand Up @@ -1270,7 +1270,7 @@ async fn test_iter_with_min_epoch() {
.commit_epoch(epoch2, sync_result2, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch2).await;
test_env.wait_sync_committed_version().await;

{
let iter = test_env
Expand Down Expand Up @@ -1561,7 +1561,7 @@ async fn test_hummock_version_reader() {
.commit_epoch(epoch1, sync_result1, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch1).await;
test_env.wait_sync_committed_version().await;

let sync_result2 = test_env
.storage
Expand All @@ -1573,7 +1573,7 @@ async fn test_hummock_version_reader() {
.commit_epoch(epoch2, sync_result2, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch2).await;
test_env.wait_sync_committed_version().await;

let sync_result3 = test_env
.storage
Expand All @@ -1585,7 +1585,7 @@ async fn test_hummock_version_reader() {
.commit_epoch(epoch3, sync_result3, false)
.await
.unwrap();
test_env.storage.try_wait_epoch_for_test(epoch3).await;
test_env.wait_sync_committed_version().await;
{
let (_, read_snapshot) = read_filter_for_version(
epoch1,
Expand Down Expand Up @@ -1979,7 +1979,7 @@ async fn test_get_with_min_epoch() {
.await
.unwrap();

test_env.storage.try_wait_epoch_for_test(epoch2).await;
test_env.wait_sync_committed_version().await;
let k = gen_key(0);
let prefix_hint = {
let mut ret = Vec::with_capacity(TABLE_PREFIX_LEN + k.len());
Expand Down Expand Up @@ -2409,7 +2409,7 @@ async fn test_table_watermark() {
};

test_env.commit_epoch(epoch1).await;
test_env.storage.try_wait_epoch_for_test(epoch1).await;
test_env.wait_sync_committed_version().await;

let (local1, local2) = test_after_epoch2(local1, local2).await;

Expand Down Expand Up @@ -2498,7 +2498,7 @@ async fn test_table_watermark() {
let (local1, local2) = test_after_epoch2(local1, local2).await;

test_env.commit_epoch(epoch2).await;
test_env.storage.try_wait_epoch_for_test(epoch2).await;
test_env.wait_sync_committed_version().await;

test_global_read(test_env.storage.clone(), epoch2).await;

Expand Down Expand Up @@ -2533,7 +2533,7 @@ async fn test_table_watermark() {
let (local1, local2) = test_after_epoch2(local1, local2).await;

test_env.commit_epoch(epoch3).await;
test_env.storage.try_wait_epoch_for_test(epoch3).await;
test_env.wait_sync_committed_version().await;

check_version_table_watermark(test_env.storage.get_pinned_version());

Expand Down
22 changes: 17 additions & 5 deletions src/storage/hummock_test/src/snapshot_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_storage::hummock::{CachePolicy, HummockStorage};
use risingwave_storage::storage_value::StorageValue;
use risingwave_storage::store::{
LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, SealCurrentEpochOptions,
StateStoreRead, WriteOptions,
StateStoreRead, TryWaitEpochOptions, WriteOptions,
};
use risingwave_storage::StateStore;

Expand Down Expand Up @@ -149,7 +149,10 @@ async fn test_snapshot_inner(
.await
.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(epoch1))
.try_wait_epoch(
HummockReadEpoch::Committed(epoch1),
TryWaitEpochOptions::for_test(Default::default()),
)
.await
.unwrap();
}
Expand Down Expand Up @@ -193,7 +196,10 @@ async fn test_snapshot_inner(
.await
.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(epoch2))
.try_wait_epoch(
HummockReadEpoch::Committed(epoch2),
TryWaitEpochOptions::for_test(Default::default()),
)
.await
.unwrap();
}
Expand Down Expand Up @@ -236,7 +242,10 @@ async fn test_snapshot_inner(
.await
.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(epoch3))
.try_wait_epoch(
HummockReadEpoch::Committed(epoch3),
TryWaitEpochOptions::for_test(Default::default()),
)
.await
.unwrap();
}
Expand Down Expand Up @@ -298,7 +307,10 @@ async fn test_snapshot_range_scan_inner(
.await
.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(epoch))
.try_wait_epoch(
HummockReadEpoch::Committed(epoch),
TryWaitEpochOptions::for_test(local.table_id()),
)
.await
.unwrap();
}
Expand Down
Loading

0 comments on commit f1cf8ec

Please sign in to comment.