feat: call clear shared buffer with latest version id (#17972)
wenym1 authored Aug 12, 2024
1 parent b2b5a33 commit 4a38e0c
Showing 30 changed files with 162 additions and 182 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/run-e2e-test.sh
@@ -77,7 +77,7 @@ mv target/debug/risingwave_e2e_extended_mode_test-"$profile" target/debug/rising
chmod +x ./target/debug/risingwave_e2e_extended_mode_test

echo "--- e2e, $mode, streaming"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_stream::common::table::state_table=warn" \
cluster_start
# Please make sure the regression is expected before increasing the timeout.
sqllogictest -p 4566 -d dev './e2e_test/streaming/**/*.slt' --junit "streaming-${profile}"
2 changes: 1 addition & 1 deletion proto/stream_service.proto
@@ -101,7 +101,7 @@ message WaitEpochCommitResponse {

message StreamingControlStreamRequest {
message InitRequest {
uint64 prev_epoch = 2;
uint64 version_id = 1;
}

message RemovePartialGraphRequest {
7 changes: 5 additions & 2 deletions src/meta/service/src/hummock_service.rs
@@ -88,10 +88,13 @@ impl HummockManagerService for HummockServiceImpl {
&self,
_request: Request<GetCurrentVersionRequest>,
) -> Result<Response<GetCurrentVersionResponse>, Status> {
let current_version = self.hummock_manager.get_current_version().await;
let current_version = self
.hummock_manager
.on_current_version(|version| version.into())
.await;
Ok(Response::new(GetCurrentVersionResponse {
status: None,
current_version: Some(current_version.into()),
current_version: Some(current_version),
}))
}

7 changes: 5 additions & 2 deletions src/meta/service/src/notification_service.rs
@@ -346,13 +346,16 @@ impl NotificationServiceImpl {

async fn hummock_subscribe(&self) -> MetaResult<MetaSnapshot> {
let (tables, catalog_version) = self.get_tables_and_creating_tables_snapshot().await?;
let hummock_version = self.hummock_manager.get_current_version().await;
let hummock_version = self
.hummock_manager
.on_current_version(|version| version.into())
.await;
let hummock_write_limits = self.hummock_manager.write_limits().await;
let meta_backup_manifest_id = self.backup_manager.manifest().manifest_id;

Ok(MetaSnapshot {
tables,
hummock_version: Some(hummock_version.into()),
hummock_version: Some(hummock_version),
version: Some(SnapshotVersion {
catalog_version,
..Default::default()
7 changes: 5 additions & 2 deletions src/meta/src/backup_restore/backup_manager.rs
@@ -346,8 +346,11 @@ impl BackupWorker {
let backup_manager_clone = self.backup_manager.clone();
let job = async move {
let hummock_manager = backup_manager_clone.hummock_manager.clone();
let hummock_version_builder =
async move { hummock_manager.get_current_version().await };
let hummock_version_builder = async move {
hummock_manager
.on_current_version(|version| version.clone())
.await
};
match backup_manager_clone.env.meta_store() {
MetaStoreImpl::Kv(kv) => {
let mut snapshot_builder =
22 changes: 13 additions & 9 deletions src/meta/src/barrier/recovery.rs
@@ -21,6 +21,7 @@ use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model_v2::StreamingParallelism;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
@@ -228,14 +229,6 @@ impl GlobalBarrierManager {
///
/// Returns the new state of the barrier manager after recovery.
pub async fn recovery(&mut self, paused_reason: Option<PausedReason>, err: Option<MetaError>) {
let prev_epoch = TracedEpoch::new(
self.context
.hummock_manager
.latest_snapshot()
.committed_epoch
.into(),
);

// Mark blocked and abort buffered schedules, they might be dirty already.
self.scheduled_barriers
.abort_and_mark_blocked("cluster is under recovering");
@@ -334,11 +327,22 @@ impl GlobalBarrierManager {
.await
.context("purge state table from hummock")?;

let (prev_epoch, version_id) = self
.context
.hummock_manager
.on_current_version(|version| {
(
TracedEpoch::new(Epoch::from(version.max_committed_epoch)),
version.id,
)
})
.await;

let mut control_stream_manager =
ControlStreamManager::new(self.context.clone());

control_stream_manager
.reset(prev_epoch.value().0, active_streaming_nodes.current())
.reset(version_id, active_streaming_nodes.current())
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "reset compute nodes failed");
17 changes: 9 additions & 8 deletions src/meta/src/barrier/rpc.rs
@@ -25,6 +25,7 @@ use futures::{pin_mut, FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::hash::ActorId;
use risingwave_common::util::tracing::TracingContext;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::stream_plan::{Barrier, BarrierMutation};
use risingwave_pb::stream_service::{
@@ -104,11 +105,11 @@ impl ControlStreamManager {
warn!(id = node.id, host = ?node.host, "node already exists");
return;
}
let prev_epoch = self
let version_id = self
.context
.hummock_manager
.latest_snapshot()
.committed_epoch;
.on_current_version(|version| version.id)
.await;
let node_id = node.id;
let node_host = node.host.clone().unwrap();
let mut backoff = ExponentialBackoff::from_millis(100)
@@ -118,7 +119,7 @@
for i in 1..=MAX_RETRY {
match self
.context
.new_control_stream_node(node.clone(), prev_epoch)
.new_control_stream_node(node.clone(), version_id)
.await
{
Ok((stream_node, response_stream)) => {
@@ -142,13 +143,13 @@

pub(super) async fn reset(
&mut self,
prev_epoch: u64,
version_id: HummockVersionId,
nodes: &HashMap<WorkerId, WorkerNode>,
) -> MetaResult<()> {
let nodes = try_join_all(nodes.iter().map(|(worker_id, node)| async {
let node = self
.context
.new_control_stream_node(node.clone(), prev_epoch)
.new_control_stream_node(node.clone(), version_id)
.await?;
Result::<_, MetaError>::Ok((*worker_id, node))
}))
@@ -353,7 +354,7 @@ impl GlobalBarrierManagerContext {
async fn new_control_stream_node(
&self,
node: WorkerNode,
prev_epoch: u64,
initial_version_id: HummockVersionId,
) -> MetaResult<(
ControlStreamNode,
BoxStream<'static, risingwave_rpc_client::error::Result<StreamingControlStreamResponse>>,
@@ -363,7 +364,7 @@
.stream_client_pool()
.get(&node)
.await?
.start_streaming_control(prev_epoch)
.start_streaming_control(initial_version_id)
.await?;
Ok((
ControlStreamNode {
16 changes: 9 additions & 7 deletions src/meta/src/hummock/manager/compaction.rs
@@ -1375,13 +1375,15 @@ impl HummockManager {
_base_version_id: HummockVersionId,
compaction_groups: Vec<CompactionGroupId>,
) -> Result<()> {
let old_version = self.get_current_version().await;
tracing::info!(
"Trigger compaction for version {}, epoch {}, groups {:?}",
old_version.id,
old_version.max_committed_epoch,
compaction_groups
);
self.on_current_version(|old_version| {
tracing::info!(
"Trigger compaction for version {}, epoch {}, groups {:?}",
old_version.id,
old_version.max_committed_epoch,
compaction_groups
);
})
.await;

if compaction_groups.is_empty() {
return Ok(());
11 changes: 4 additions & 7 deletions src/meta/src/hummock/manager/versioning.rs
@@ -149,16 +149,13 @@ impl HummockManager {
/// Should not be called inside [`HummockManager`], because it requests locks internally.
///
/// Note: this method can hurt performance because it will clone a large object.
#[cfg(any(test, feature = "test"))]
pub async fn get_current_version(&self) -> HummockVersion {
self.versioning.read().await.current_version.clone()
self.on_current_version(|version| version.clone()).await
}

pub async fn get_current_max_committed_epoch(&self) -> HummockEpoch {
self.versioning
.read()
.await
.current_version
.max_committed_epoch
pub async fn on_current_version<T>(&self, mut f: impl FnMut(&HummockVersion) -> T) -> T {
f(&self.versioning.read().await.current_version)
}

/// Gets the mapping from table id to compaction group id
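The hunk above is the core API change: the clone-heavy `get_current_version` becomes test-only, and callers switch to the closure-based `on_current_version`, extracting only the fields they need while the read lock is held. Below is a minimal, self-contained sketch of that pattern, assuming tokio with the rt, macros, and sync features; the `Version` and `Manager` types and their fields are illustrative stand-ins, not the actual RisingWave definitions.

use tokio::sync::RwLock;

// Illustrative stand-ins for HummockVersion / HummockManager (not the real types).
struct Version {
    id: u64,
    max_committed_epoch: u64,
    // ...plus a large payload that makes whole-version clones expensive.
}

struct Manager {
    current: RwLock<Version>,
}

impl Manager {
    // Run a closure against the current version while the read lock is held,
    // so callers copy out only what they need instead of cloning everything.
    async fn on_current_version<T>(&self, mut f: impl FnMut(&Version) -> T) -> T {
        f(&*self.current.read().await)
    }
}

#[tokio::main]
async fn main() {
    let mgr = Manager {
        current: RwLock::new(Version {
            id: 42,
            max_committed_epoch: 7,
        }),
    };
    // Pull out small values (id, epoch) without cloning the whole version,
    // mirroring how recovery.rs now fetches (prev_epoch, version_id) in one call.
    let (version_id, epoch) = mgr
        .on_current_version(|v| (v.id, v.max_committed_epoch))
        .await;
    println!("latest version {version_id}, max committed epoch {epoch}");
}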
59 changes: 33 additions & 26 deletions src/meta/src/manager/diagnose.rs
@@ -414,10 +414,8 @@ impl DiagnoseCommand {

#[cfg_attr(coverage, coverage(off))]
async fn write_storage(&self, s: &mut String) {
let version = self.hummock_manger.get_current_version().await;
let mut sst_num = 0;
let mut sst_total_file_size = 0;
let compaction_group_num = version.levels.len();
let back_pressured_compaction_groups = self
.hummock_manger
.write_limits()
@@ -470,32 +468,41 @@

let top_k = 10;
let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
for compaction_group in version.levels.values() {
let mut visit_level = |level: &Level| {
sst_num += level.table_infos.len();
sst_total_file_size += level.table_infos.iter().map(|t| t.file_size).sum::<u64>();
for sst in &level.table_infos {
if sst.total_key_count == 0 {
continue;
}
let tombstone_delete_ratio = sst.stale_key_count * 10000 / sst.total_key_count;
let e = SstableSort {
compaction_group_id: compaction_group.group_id,
sst_id: sst.sst_id,
delete_ratio: tombstone_delete_ratio,
let compaction_group_num = self
.hummock_manger
.on_current_version(|version| {
for compaction_group in version.levels.values() {
let mut visit_level = |level: &Level| {
sst_num += level.table_infos.len();
sst_total_file_size +=
level.table_infos.iter().map(|t| t.file_size).sum::<u64>();
for sst in &level.table_infos {
if sst.total_key_count == 0 {
continue;
}
let tombstone_delete_ratio =
sst.stale_key_count * 10000 / sst.total_key_count;
let e = SstableSort {
compaction_group_id: compaction_group.group_id,
sst_id: sst.sst_id,
delete_ratio: tombstone_delete_ratio,
};
top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
}
};
top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
let l0 = &compaction_group.l0;
// FIXME: why chaining levels iter leads to segmentation fault?
for level in &l0.sub_levels {
visit_level(level);
}
for level in &compaction_group.levels {
visit_level(level);
}
}
};
let l0 = &compaction_group.l0;
// FIXME: why chaining levels iter leads to segmentation fault?
for level in &l0.sub_levels {
visit_level(level);
}
for level in &compaction_group.levels {
visit_level(level);
}
}
version.levels.len()
})
.await;

let _ = writeln!(s, "number of SSTables: {sst_num}");
let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
10 changes: 8 additions & 2 deletions src/rpc_client/src/stream_client.rs
@@ -21,6 +21,7 @@ use futures::TryStreamExt;
use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE;
use risingwave_common::monitor::{EndpointExt, TcpConfig};
use risingwave_common::util::addr::HostAddr;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::stream_service::stream_service_client::StreamServiceClient;
use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse;
@@ -86,10 +87,15 @@ pub type StreamingControlHandle =
UnboundedBidiStreamHandle<StreamingControlStreamRequest, StreamingControlStreamResponse>;

impl StreamClient {
pub async fn start_streaming_control(&self, prev_epoch: u64) -> Result<StreamingControlHandle> {
pub async fn start_streaming_control(
&self,
version_id: HummockVersionId,
) -> Result<StreamingControlHandle> {
let first_request = StreamingControlStreamRequest {
request: Some(streaming_control_stream_request::Request::Init(
InitRequest { prev_epoch },
InitRequest {
version_id: version_id.to_u64(),
},
)),
};
let mut client = self.0.to_owned();
4 changes: 0 additions & 4 deletions src/storage/hummock_test/src/bin/replay/replay_impl.rs
@@ -188,10 +188,6 @@ impl ReplayStateStore for GlobalReplayImpl {
.map_err(|_| TraceError::ValidateReadEpochFailed)?;
Ok(())
}

async fn clear_shared_buffer(&self, prev_epoch: u64) {
self.store.clear_shared_buffer(prev_epoch).await
}
}
pub(crate) struct LocalReplayImpl(LocalHummockStorage);

4 changes: 3 additions & 1 deletion src/storage/hummock_test/src/state_store_tests.rs
@@ -1341,7 +1341,9 @@ async fn test_gc_watermark_and_clear_shared_buffer() {

drop(local_hummock_storage);

hummock_storage.clear_shared_buffer(epoch1).await;
hummock_storage
.clear_shared_buffer(hummock_storage.get_pinned_version().id())
.await;

assert_eq!(
hummock_storage
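A usage note on the convention the commit title refers to, as exercised by the test above: `clear_shared_buffer` is now keyed by the latest pinned Hummock version id rather than by a previous epoch. A hedged sketch of a call site, assuming a `hummock_storage` handle like the one in this test:

// Sketch only: clear the shared buffer using the id of the latest pinned version,
// instead of passing prev_epoch as before this commit.
let latest_version_id = hummock_storage.get_pinned_version().id();
hummock_storage.clear_shared_buffer(latest_version_id).await;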
7 changes: 0 additions & 7 deletions src/storage/hummock_trace/src/collector.rs
@@ -216,13 +216,6 @@ impl TraceSpan {
Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type)
}

pub fn new_clear_shared_buffer_span(prev_epoch: u64) -> MayTraceSpan {
Self::new_global_op(
Operation::ClearSharedBuffer(prev_epoch),
StorageType::Global,
)
}

pub fn new_validate_read_epoch_span(epoch: HummockReadEpoch) -> MayTraceSpan {
Self::new_global_op(
Operation::ValidateReadEpoch(epoch.into()),
3 changes: 0 additions & 3 deletions src/storage/hummock_trace/src/record.rs
@@ -169,9 +169,6 @@ pub enum Operation {
/// Try wait epoch
TryWaitEpoch(TracedHummockReadEpoch),

/// clear shared buffer
ClearSharedBuffer(u64),

/// Seal current epoch
SealCurrentEpoch {
epoch: u64,
2 changes: 0 additions & 2 deletions src/storage/hummock_trace/src/replay/mod.rs
@@ -120,7 +120,6 @@ pub trait ReplayStateStore {
async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64>;
async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>;
async fn clear_shared_buffer(&self, prev_epoch: u64);
fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()>;
}

@@ -152,7 +151,6 @@ mock! {
) -> Result<u64>;
async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>;
async fn clear_shared_buffer(&self, prev_epoch: u64);
fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()>;
}
impl GlobalReplay for GlobalReplayInterface{}
4 changes: 0 additions & 4 deletions src/storage/hummock_trace/src/replay/worker.rs
@@ -327,10 +327,6 @@ impl ReplayWorker {
);
}
}
Operation::ClearSharedBuffer(prev_epoch) => {
assert_eq!(storage_type, StorageType::Global);
replay.clear_shared_buffer(prev_epoch).await;
}
Operation::SealCurrentEpoch { epoch, opts } => {
assert_ne!(storage_type, StorageType::Global);
let local_storage = local_storages.get_mut(&storage_type).unwrap();