Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): wait committed epoch in state table init_epoch #19223

Merged
merged 17 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@
}

message StreamingControlStreamRequest {
message InitRequest {

Check failure on line 66 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "1" with name "version_id" on message "InitRequest" was deleted without reserving the name "version_id".

Check failure on line 66 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "1" with name "version_id" on message "InitRequest" was deleted without reserving the number "1".
uint64 version_id = 1;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions = 2;
}

Expand Down
2 changes: 1 addition & 1 deletion src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ async fn test_row_seq_scan() -> StreamResult<()> {
);

let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
state.init_epoch(epoch);
state.init_epoch(epoch).await?;
state.insert(OwnedRow::new(vec![
Some(1_i32.into()),
Some(4_i32.into()),
Expand Down
10 changes: 8 additions & 2 deletions src/ctl/src/cmd_impl/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Instant;

use anyhow::Result;
use anyhow::{anyhow, Result};
use clap::Subcommand;
use futures::future::try_join_all;
use futures::{pin_mut, Future, StreamExt};
Expand Down Expand Up @@ -97,6 +97,11 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> {
)?)
.await?;
let table = get_table_catalog(meta.clone(), mv_name).await?;
let committed_epoch = hummock
.inner()
.get_pinned_version()
.table_committed_epoch(table.id)
.ok_or_else(|| anyhow!("table id {} not exist", table.id))?;
let mut handlers = vec![];
for i in 0..threads {
let table = table.clone();
Expand All @@ -107,7 +112,8 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> {
tracing::info!(thread = i, "starting scan");
let state_table = {
let mut tb = make_state_table(hummock, &table).await;
tb.init_epoch(EpochPair::new_test_epoch(u64::MAX));
tb.init_epoch(EpochPair::new(u64::MAX, committed_epoch))
.await?;
tb
};
loop {
Expand Down
6 changes: 1 addition & 5 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,16 +474,12 @@ impl GlobalBarrierWorkerContextImpl {
node: &WorkerNode,
subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>,
) -> MetaResult<StreamingControlHandle> {
let initial_version_id = self
.hummock_manager
.on_current_version(|version| version.id)
.await;
let handle = self
.env
.stream_client_pool()
.get(node)
.await?
.start_streaming_control(initial_version_id, subscriptions)
.start_streaming_control(subscriptions)
.await?;
Ok(handle)
}
Expand Down
3 changes: 0 additions & 3 deletions src/rpc_client/src/stream_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use futures::TryStreamExt;
use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE;
use risingwave_common::monitor::{EndpointExt, TcpConfig};
use risingwave_common::util::addr::HostAddr;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
use risingwave_pb::stream_service::stream_service_client::StreamServiceClient;
use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
Expand Down Expand Up @@ -87,13 +86,11 @@ pub type StreamingControlHandle =
impl StreamClient {
pub async fn start_streaming_control(
&self,
version_id: HummockVersionId,
subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>,
) -> Result<StreamingControlHandle> {
let first_request = StreamingControlStreamRequest {
request: Some(streaming_control_stream_request::Request::Init(
InitRequest {
version_id: version_id.to_u64(),
subscriptions: subscriptions.collect(),
},
)),
Expand Down
37 changes: 32 additions & 5 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,11 @@ pub(crate) mod tests {
let val = Bytes::from(b"0"[..].to_vec()); // 1 Byte value

let kv_count = 11;
// let base_epoch = Epoch(0);
let prev_epoch: u64 = hummock_manager_ref
.get_current_version()
.await
.table_committed_epoch(existing_table_id.into())
.unwrap();
let base_epoch = Epoch::now();
let mut epoch: u64 = base_epoch.0;
let millisec_interval_epoch: u64 = (1 << 16) * 100;
Expand All @@ -741,7 +745,10 @@ pub(crate) mod tests {
let next_epoch = epoch + millisec_interval_epoch;
storage.start_epoch(next_epoch, table_id_set.clone());
if i == 0 {
local.init_for_test(epoch).await.unwrap();
local
.init_for_test_with_prev_epoch(epoch, prev_epoch)
.await
.unwrap();
}
epoch_set.insert(epoch);
let mut prefix = BytesMut::default();
Expand Down Expand Up @@ -935,6 +942,11 @@ pub(crate) mod tests {
// 1. add sstables
let val = Bytes::from(b"0"[..].to_vec()); // 1 Byte value
let kv_count = 11;
let prev_epoch: u64 = hummock_manager_ref
.get_current_version()
.await
.table_committed_epoch(existing_table_id.into())
.unwrap();
// let base_epoch = Epoch(0);
let base_epoch = Epoch::now();
let mut epoch: u64 = base_epoch.0;
Expand All @@ -948,7 +960,10 @@ pub(crate) mod tests {
storage.start_epoch(epoch, table_id_set.clone());
for i in 0..kv_count {
if i == 0 {
local.init_for_test(epoch).await.unwrap();
local
.init_for_test_with_prev_epoch(epoch, prev_epoch)
.await
.unwrap();
}
let next_epoch = epoch + millisec_interval_epoch;
storage.start_epoch(next_epoch, table_id_set.clone());
Expand Down Expand Up @@ -1912,11 +1927,23 @@ pub(crate) mod tests {
let table_id_set =
HashSet::from_iter(vec![local_1.0.table_id(), local_2.0.table_id()].into_iter());

let version = hummock_meta_client.get_current_version().await.unwrap();

storage.start_epoch(*epoch, table_id_set.clone());
for i in 0..kv_count {
if i == 0 && *is_init {
local_1.0.init_for_test(*epoch).await.unwrap();
local_2.0.init_for_test(*epoch).await.unwrap();
let prev_epoch_1 = version.table_committed_epoch(local_1.0.table_id()).unwrap();
local_1
.0
.init_for_test_with_prev_epoch(*epoch, prev_epoch_1)
.await
.unwrap();
let prev_epoch_2 = version.table_committed_epoch(local_2.0.table_id()).unwrap();
local_2
.0
.init_for_test_with_prev_epoch(*epoch, prev_epoch_2)
.await
.unwrap();

*is_init = false;
}
Expand Down
52 changes: 46 additions & 6 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ async fn test_state_store_sync() {
.committed()
.table_committed_epoch(TEST_TABLE_ID)
.unwrap();
let epoch1 = test_epoch(base_epoch.next_epoch());
let epoch1 = base_epoch.next_epoch();
test_env
.storage
.start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID]));
Expand Down Expand Up @@ -1133,6 +1133,13 @@ async fn test_iter_with_min_epoch() {
.new_local(NewLocalOptions::for_test(TEST_TABLE_ID))
.await;

let prev_epoch = test_env
.manager
.get_current_version()
.await
.table_committed_epoch(TEST_TABLE_ID)
.unwrap();

let epoch1 = (31 * 1000) << 16;
test_env
.storage
Expand All @@ -1149,7 +1156,10 @@ async fn test_iter_with_min_epoch() {
.map(|index| (gen_key(index), StorageValue::new_put(gen_val(index))))
.collect();

hummock_storage.init_for_test(epoch1).await.unwrap();
hummock_storage
.init_for_test_with_prev_epoch(epoch1, prev_epoch)
.await
.unwrap();

hummock_storage
.ingest_batch(
Expand Down Expand Up @@ -1422,7 +1432,16 @@ async fn test_hummock_version_reader() {
.map(|index| (gen_key(index), StorageValue::new_put(gen_val(index))))
.collect();
{
hummock_storage.init_for_test(epoch1).await.unwrap();
let prev_epoch = test_env
.manager
.get_current_version()
.await
.table_committed_epoch(TEST_TABLE_ID)
.unwrap();
hummock_storage
.init_for_test_with_prev_epoch(epoch1, prev_epoch)
.await
.unwrap();
hummock_storage
.ingest_batch(
batch_epoch1,
Expand Down Expand Up @@ -1852,7 +1871,16 @@ async fn test_get_with_min_epoch() {
test_env
.storage
.start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID]));
hummock_storage.init_for_test(epoch1).await.unwrap();
let prev_epoch = test_env
.manager
.get_current_version()
.await
.table_committed_epoch(TEST_TABLE_ID)
.unwrap();
hummock_storage
.init_for_test_with_prev_epoch(epoch1, prev_epoch)
.await
.unwrap();

let gen_key = |index: usize| -> TableKey<Bytes> {
gen_key_from_str(VirtualNode::ZERO, format!("key_{}", index).as_str())
Expand Down Expand Up @@ -2125,9 +2153,21 @@ async fn test_table_watermark() {
test_env
.storage
.start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID]));
local1.init_for_test(epoch1).await.unwrap();
let prev_epoch = test_env
.manager
.get_current_version()
.await
.table_committed_epoch(TEST_TABLE_ID)
.unwrap();
local1
.init_for_test_with_prev_epoch(epoch1, prev_epoch)
.await
.unwrap();
local1.update_vnode_bitmap(vnode_bitmap1.clone());
local2.init_for_test(epoch1).await.unwrap();
local2
.init_for_test_with_prev_epoch(epoch1, prev_epoch)
.await
.unwrap();
local2.update_vnode_bitmap(vnode_bitmap2.clone());

fn gen_inner_key(index: usize) -> Bytes {
Expand Down
8 changes: 8 additions & 0 deletions src/storage/hummock_test/src/local_state_store_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,13 @@ pub trait LocalStateStoreTestExt: LocalStateStore {
fn init_for_test(&mut self, epoch: u64) -> impl Future<Output = StorageResult<()>> + Send + '_ {
self.init(InitOptions::new(EpochPair::new_test_epoch(epoch)))
}

fn init_for_test_with_prev_epoch(
&mut self,
epoch: u64,
prev_epoch: u64,
) -> impl Future<Output = StorageResult<()>> + Send + '_ {
self.init(InitOptions::new(EpochPair::new(epoch, prev_epoch)))
}
}
impl<T: LocalStateStore> LocalStateStoreTestExt for T {}
7 changes: 3 additions & 4 deletions src/storage/hummock_test/src/state_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1357,9 +1357,7 @@ async fn test_clear_shared_buffer() {

drop(local_hummock_storage);

hummock_storage
.clear_shared_buffer(hummock_storage.get_pinned_version().id())
.await;
hummock_storage.clear_shared_buffer().await;
}

/// Test the following behaviours:
Expand Down Expand Up @@ -1480,7 +1478,8 @@ async fn test_replicated_local_hummock_storage() {
.new_local(NewLocalOptions::for_test(TEST_TABLE_ID))
.await;

local_hummock_storage_2.init_for_test(epoch2).await.unwrap();
local_hummock_storage_2.init_for_test(epoch1).await.unwrap();
local_hummock_storage_2.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());

// ingest 16B batch
let mut batch2 = vec![
Expand Down
Loading
Loading