Merge branch 'main' into yiming/global-barrier-worker-reorg-second
wenym1 committed Nov 12, 2024
2 parents 99ffc35 + fe65509 commit 774aeb6
Showing 63 changed files with 398 additions and 466 deletions.
11 changes: 11 additions & 0 deletions e2e_test/source_inline/fs/posix_fs.slt
@@ -11,8 +11,19 @@ CREATE TABLE diamonds (
connector = 'posix_fs',
match_pattern = 'data*.csv',
posix_fs.root = 'e2e_test/source_inline/fs/data',
source_rate_limit = 0
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');

sleep 1s

# no output due to rate limit
query TTTT rowsort
select * from diamonds;
----

statement ok
ALTER TABLE diamonds SET source_rate_limit TO DEFAULT;

sleep 10s

query TTTT rowsort
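The test above relies on the semantics of source_rate_limit: a limit of 0 pauses the source entirely, and resetting it to DEFAULT removes throttling. A conceptual Rust sketch of that behavior (hypothetical function, not RisingWave's actual implementation, which presumably applies the limit as a rows-per-interval budget inside the source executor):

// Conceptual sketch only: deriving a per-tick row budget from source_rate_limit.
// `None` models DEFAULT (no limit).
fn rows_allowed_this_tick(rate_limit: Option<u32>, pending: u32) -> u32 {
    match rate_limit {
        Some(0) => 0,                      // limit 0: emit nothing, so the query above returns no rows
        Some(limit) => pending.min(limit), // throttled: at most `limit` rows this tick
        None => pending,                   // DEFAULT: no throttling, all pending rows flow
    }
}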
1 change: 0 additions & 1 deletion proto/stream_service.proto
@@ -64,7 +64,6 @@ message WaitEpochCommitResponse {

message StreamingControlStreamRequest {
message InitRequest {
uint64 version_id = 1;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions = 2;
}

2 changes: 1 addition & 1 deletion src/compute/tests/integration_tests.rs
@@ -471,7 +471,7 @@ async fn test_row_seq_scan() -> StreamResult<()> {
);

let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
state.init_epoch(epoch);
state.init_epoch(epoch).await?;
state.insert(OwnedRow::new(vec![
Some(1_i32.into()),
Some(4_i32.into()),
10 changes: 8 additions & 2 deletions src/ctl/src/cmd_impl/bench.rs
@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Instant;

use anyhow::Result;
use anyhow::{anyhow, Result};
use clap::Subcommand;
use futures::future::try_join_all;
use futures::{pin_mut, Future, StreamExt};
@@ -97,6 +97,11 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> {
)?)
.await?;
let table = get_table_catalog(meta.clone(), mv_name).await?;
let committed_epoch = hummock
.inner()
.get_pinned_version()
.table_committed_epoch(table.id)
.ok_or_else(|| anyhow!("table id {} not exist", table.id))?;
let mut handlers = vec![];
for i in 0..threads {
let table = table.clone();
@@ -107,7 +112,8 @@
tracing::info!(thread = i, "starting scan");
let state_table = {
let mut tb = make_state_table(hummock, &table).await;
tb.init_epoch(EpochPair::new_test_epoch(u64::MAX));
tb.init_epoch(EpochPair::new(u64::MAX, committed_epoch))
.await?;
tb
};
loop {
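The bench change above replaces a fabricated test epoch with an EpochPair whose prev is the table's real committed epoch, looked up from the pinned hummock version. A minimal standalone sketch of the pattern (hypothetical simplified EpochPair, mirroring the call above):

// Hypothetical, simplified stand-in for the real EpochPair, for illustration only.
#[derive(Clone, Copy, Debug)]
struct EpochPair {
    curr: u64, // epoch to read/write at
    prev: u64, // last committed epoch for the table
}

impl EpochPair {
    fn new(curr: u64, prev: u64) -> Self {
        assert!(prev < curr, "prev must be an earlier epoch than curr");
        Self { curr, prev }
    }
}

fn main() {
    // Pretend this came from table_committed_epoch(table.id), as in the diff above.
    let committed_epoch: u64 = 42 << 16;
    // curr = u64::MAX means "read the latest data"; prev must be a genuine committed epoch.
    let epoch = EpochPair::new(u64::MAX, committed_epoch);
    println!("scanning at {:?}", epoch);
}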
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
@@ -119,7 +119,7 @@ impl StreamNode for StreamFsFetch {
.map(|c| c.to_protobuf())
.collect_vec(),
with_properties,
rate_limit: self.base.ctx().overwrite_options().source_rate_limit,
rate_limit: source_catalog.rate_limit,
secret_refs,
}
});
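The planner change above reads the fetch node's rate limit from the persisted source catalog rather than from the session's overwrite options, so a limit altered on the source is what the plan node reflects. A tiny illustrative sketch (hypothetical stand-in types):

// Illustration only; these structs stand in for the real catalog/session types.
struct SourceCatalog { rate_limit: Option<u32> }
struct OverwriteOptions { source_rate_limit: Option<u32> }

fn fetch_node_rate_limit(catalog: &SourceCatalog, _session: &OverwriteOptions) -> Option<u32> {
    // Before this change: _session.source_rate_limit; after: the catalog value.
    catalog.rate_limit
}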
6 changes: 1 addition & 5 deletions src/meta/src/barrier/rpc.rs
@@ -474,16 +474,12 @@ impl GlobalBarrierWorkerContextImpl {
node: &WorkerNode,
subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>,
) -> MetaResult<StreamingControlHandle> {
let initial_version_id = self
.hummock_manager()
.on_current_version(|version| version.id)
.await;
let handle = self
.env
.stream_client_pool()
.get(node)
.await?
.start_streaming_control(initial_version_id, subscriptions)
.start_streaming_control(subscriptions)
.await?;
Ok(handle)
}
16 changes: 16 additions & 0 deletions src/meta/src/controller/streaming_job.rs
@@ -21,6 +21,7 @@ use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::WithPropertiesExt;
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::actor_dispatcher::DispatcherType;
use risingwave_meta_model::object::ObjectType;
@@ -1276,6 +1277,7 @@ impl CatalogController {
MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
})?;

let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
let streaming_job_ids: Vec<ObjectId> =
if let Some(table_id) = source.optional_associated_table_id {
vec![table_id]
@@ -1330,6 +1332,20 @@
}
});
}
if is_fs_source && *fragment_type_mask == PbFragmentTypeFlag::FragmentUnspecified as i32
{
// When creating a table with an fs connector, the fragment type is unspecified.
visit_stream_node(stream_node, |node| {
if let PbNodeBody::StreamFsFetch(node) = node {
if let Some(node_inner) = &mut node.node_inner
&& node_inner.source_id == source_id as u32
{
node_inner.rate_limit = rate_limit;
found = true;
}
}
});
}
found
});

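The new branch above handles tables created with an fs connector: their fetch fragment is typed FragmentUnspecified, so the rate-limit update walks the stream plan and patches any StreamFsFetch node whose source_id matches. A self-contained sketch of that traversal (hypothetical node type; the real code visits protobuf stream-plan nodes):

// Hypothetical simplified node type, for illustration only.
enum NodeBody {
    StreamFsFetch { source_id: u32, rate_limit: Option<u32> },
    Other,
}

// Walk all nodes and overwrite the rate limit on fs-fetch nodes owned by `source_id`.
fn apply_source_rate_limit(nodes: &mut [NodeBody], source_id: u32, rate_limit: Option<u32>) -> bool {
    let mut found = false;
    for node in nodes.iter_mut() {
        if let NodeBody::StreamFsFetch { source_id: sid, rate_limit: rl } = node {
            if *sid == source_id {
                *rl = rate_limit;
                found = true;
            }
        }
    }
    found
}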
9 changes: 9 additions & 0 deletions src/meta/src/stream/stream_graph/fragment.rs
@@ -156,6 +156,15 @@ impl BuildingFragment {
dml_node.table_id = job_id;
dml_node.table_version_id = job.table_version_id().unwrap();
}
NodeBody::StreamFsFetch(fs_fetch_node) => {
if let StreamingJob::Table(table_source, _, _) = job {
if let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
&& let Some(source) = table_source
{
node_inner.source_id = source.id;
}
}
}
NodeBody::Source(source_node) => {
match job {
// Note: For table without connector, it has a dummy Source node.
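This complements the rate-limit patching in streaming_job.rs: when a CREATE TABLE job carries a connector source, the fs-fetch node in its fragment is stamped with that source's id so later per-source operations can locate it. A minimal sketch of the fill-in (hypothetical stand-in types):

// Stand-in types for illustration; the real code mutates protobuf plan nodes.
struct PbSource { id: u32 }
struct FsFetchNodeInner { source_id: u32 }

fn fill_fs_fetch_source_id(node_inner: &mut Option<FsFetchNodeInner>, table_source: Option<&PbSource>) {
    if let (Some(inner), Some(source)) = (node_inner.as_mut(), table_source) {
        inner.source_id = source.id; // tie the fetch node to the table's source
    }
}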
3 changes: 0 additions & 3 deletions src/rpc_client/src/stream_client.rs
@@ -21,7 +21,6 @@ use futures::TryStreamExt;
use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE;
use risingwave_common::monitor::{EndpointExt, TcpConfig};
use risingwave_common::util::addr::HostAddr;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
use risingwave_pb::stream_service::stream_service_client::StreamServiceClient;
use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
@@ -87,13 +86,11 @@ pub type StreamingControlHandle =
impl StreamClient {
pub async fn start_streaming_control(
&self,
version_id: HummockVersionId,
subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>,
) -> Result<StreamingControlHandle> {
let first_request = StreamingControlStreamRequest {
request: Some(streaming_control_stream_request::Request::Init(
InitRequest {
version_id: version_id.to_u64(),
subscriptions: subscriptions.collect(),
},
)),
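With version_id gone from InitRequest, the first message on the streaming control stream carries only the subscription list. A minimal sketch of the new handshake payload (hypothetical struct shapes standing in for the generated protobuf types):

// Hypothetical stand-ins for the generated protobuf types, for illustration only.
#[derive(Debug, Default, Clone)]
struct SubscriptionUpstreamInfo; // fields omitted

#[derive(Debug)]
struct InitRequest {
    subscriptions: Vec<SubscriptionUpstreamInfo>,
    // no version_id field anymore: this commit drops it from the init handshake
}

fn build_init_request(subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>) -> InitRequest {
    InitRequest { subscriptions: subscriptions.collect() }
}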
37 changes: 32 additions & 5 deletions src/storage/hummock_test/src/compactor_tests.rs
@@ -724,7 +724,11 @@ pub(crate) mod tests {
let val = Bytes::from(b"0"[..].to_vec()); // 1 Byte value

let kv_count = 11;
// let base_epoch = Epoch(0);
let prev_epoch: u64 = hummock_manager_ref
.get_current_version()
.await
.table_committed_epoch(existing_table_id.into())
.unwrap();
let base_epoch = Epoch::now();
let mut epoch: u64 = base_epoch.0;
let millisec_interval_epoch: u64 = (1 << 16) * 100;
@@ -741,7 +745,10 @@
let next_epoch = epoch + millisec_interval_epoch;
storage.start_epoch(next_epoch, table_id_set.clone());
if i == 0 {
local.init_for_test(epoch).await.unwrap();
local
.init_for_test_with_prev_epoch(epoch, prev_epoch)
.await
.unwrap();
}
epoch_set.insert(epoch);
let mut prefix = BytesMut::default();
@@ -935,6 +942,11 @@
// 1. add sstables
let val = Bytes::from(b"0"[..].to_vec()); // 1 Byte value
let kv_count = 11;
let prev_epoch: u64 = hummock_manager_ref
.get_current_version()
.await
.table_committed_epoch(existing_table_id.into())
.unwrap();
// let base_epoch = Epoch(0);
let base_epoch = Epoch::now();
let mut epoch: u64 = base_epoch.0;
@@ -948,7 +960,10 @@
storage.start_epoch(epoch, table_id_set.clone());
for i in 0..kv_count {
if i == 0 {
local.init_for_test(epoch).await.unwrap();
local
.init_for_test_with_prev_epoch(epoch, prev_epoch)
.await
.unwrap();
}
let next_epoch = epoch + millisec_interval_epoch;
storage.start_epoch(next_epoch, table_id_set.clone());
@@ -1912,11 +1927,23 @@
let table_id_set =
HashSet::from_iter(vec![local_1.0.table_id(), local_2.0.table_id()].into_iter());

let version = hummock_meta_client.get_current_version().await.unwrap();

storage.start_epoch(*epoch, table_id_set.clone());
for i in 0..kv_count {
if i == 0 && *is_init {
local_1.0.init_for_test(*epoch).await.unwrap();
local_2.0.init_for_test(*epoch).await.unwrap();
let prev_epoch_1 = version.table_committed_epoch(local_1.0.table_id()).unwrap();
local_1
.0
.init_for_test_with_prev_epoch(*epoch, prev_epoch_1)
.await
.unwrap();
let prev_epoch_2 = version.table_committed_epoch(local_2.0.table_id()).unwrap();
local_2
.0
.init_for_test_with_prev_epoch(*epoch, prev_epoch_2)
.await
.unwrap();

*is_init = false;
}
52 changes: 46 additions & 6 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
@@ -476,7 +476,7 @@ async fn test_state_store_sync() {
.committed()
.table_committed_epoch(TEST_TABLE_ID)
.unwrap();
let epoch1 = test_epoch(base_epoch.next_epoch());
let epoch1 = base_epoch.next_epoch();
test_env
.storage
.start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID]));
@@ -1133,6 +1133,13 @@ async fn test_iter_with_min_epoch() {
.new_local(NewLocalOptions::for_test(TEST_TABLE_ID))
.await;

let prev_epoch = test_env
.manager
.get_current_version()
.await
.table_committed_epoch(TEST_TABLE_ID)
.unwrap();

let epoch1 = (31 * 1000) << 16;
test_env
.storage
@@ -1149,7 +1156,10 @@
.map(|index| (gen_key(index), StorageValue::new_put(gen_val(index))))
.collect();

hummock_storage.init_for_test(epoch1).await.unwrap();
hummock_storage
.init_for_test_with_prev_epoch(epoch1, prev_epoch)
.await
.unwrap();

hummock_storage
.ingest_batch(
@@ -1422,7 +1432,16 @@ async fn test_hummock_version_reader() {
.map(|index| (gen_key(index), StorageValue::new_put(gen_val(index))))
.collect();
{
hummock_storage.init_for_test(epoch1).await.unwrap();
let prev_epoch = test_env
.manager
.get_current_version()
.await
.table_committed_epoch(TEST_TABLE_ID)
.unwrap();
hummock_storage
.init_for_test_with_prev_epoch(epoch1, prev_epoch)
.await
.unwrap();
hummock_storage
.ingest_batch(
batch_epoch1,
@@ -1852,7 +1871,16 @@ async fn test_get_with_min_epoch() {
test_env
.storage
.start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID]));
hummock_storage.init_for_test(epoch1).await.unwrap();
let prev_epoch = test_env
.manager
.get_current_version()
.await
.table_committed_epoch(TEST_TABLE_ID)
.unwrap();
hummock_storage
.init_for_test_with_prev_epoch(epoch1, prev_epoch)
.await
.unwrap();

let gen_key = |index: usize| -> TableKey<Bytes> {
gen_key_from_str(VirtualNode::ZERO, format!("key_{}", index).as_str())
@@ -2125,9 +2153,21 @@ async fn test_table_watermark() {
test_env
.storage
.start_epoch(epoch1, HashSet::from_iter([TEST_TABLE_ID]));
local1.init_for_test(epoch1).await.unwrap();
let prev_epoch = test_env
.manager
.get_current_version()
.await
.table_committed_epoch(TEST_TABLE_ID)
.unwrap();
local1
.init_for_test_with_prev_epoch(epoch1, prev_epoch)
.await
.unwrap();
local1.update_vnode_bitmap(vnode_bitmap1.clone());
local2.init_for_test(epoch1).await.unwrap();
local2
.init_for_test_with_prev_epoch(epoch1, prev_epoch)
.await
.unwrap();
local2.update_vnode_bitmap(vnode_bitmap2.clone());

fn gen_inner_key(index: usize) -> Bytes {
8 changes: 8 additions & 0 deletions src/storage/hummock_test/src/local_state_store_test_utils.rs
@@ -22,5 +22,13 @@ pub trait LocalStateStoreTestExt: LocalStateStore {
fn init_for_test(&mut self, epoch: u64) -> impl Future<Output = StorageResult<()>> + Send + '_ {
self.init(InitOptions::new(EpochPair::new_test_epoch(epoch)))
}

fn init_for_test_with_prev_epoch(
&mut self,
epoch: u64,
prev_epoch: u64,
) -> impl Future<Output = StorageResult<()>> + Send + '_ {
self.init(InitOptions::new(EpochPair::new(epoch, prev_epoch)))
}
}
impl<T: LocalStateStore> LocalStateStoreTestExt for T {}
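A toy sketch contrasting the two helpers (hypothetical EpochPair, mirroring the trait above): init_for_test fabricates the previous epoch from the current one, while init_for_test_with_prev_epoch lets a test pass the table's real committed epoch, which the tests above fetch from the current hummock version.

// Illustration only: a simplified EpochPair showing what the two helpers construct.
#[derive(Debug, Clone, Copy)]
struct EpochPair { curr: u64, prev: u64 }

const EPOCH_INC: u64 = 1 << 16; // assumed test-epoch step, not the real constant

impl EpochPair {
    fn new(curr: u64, prev: u64) -> Self { Self { curr, prev } }
    // What init_for_test effectively uses: a synthetic previous epoch.
    fn new_test_epoch(curr: u64) -> Self { Self { curr, prev: curr - EPOCH_INC } }
}

fn main() {
    let committed = 10 * EPOCH_INC;                             // pretend: table_committed_epoch(...)
    let synthetic = EpochPair::new_test_epoch(11 * EPOCH_INC);  // init_for_test
    let explicit = EpochPair::new(11 * EPOCH_INC, committed);   // init_for_test_with_prev_epoch
    println!("{:?} vs {:?}", synthetic, explicit);
}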
7 changes: 3 additions & 4 deletions src/storage/hummock_test/src/state_store_tests.rs
@@ -1357,9 +1357,7 @@ async fn test_clear_shared_buffer() {

drop(local_hummock_storage);

hummock_storage
.clear_shared_buffer(hummock_storage.get_pinned_version().id())
.await;
hummock_storage.clear_shared_buffer().await;
}

/// Test the following behaviours:
@@ -1480,7 +1478,8 @@ async fn test_replicated_local_hummock_storage() {
.new_local(NewLocalOptions::for_test(TEST_TABLE_ID))
.await;

local_hummock_storage_2.init_for_test(epoch2).await.unwrap();
local_hummock_storage_2.init_for_test(epoch1).await.unwrap();
local_hummock_storage_2.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());

// ingest 16B batch
let mut batch2 = vec![