Skip to content

Commit

Permalink
refactor(storage): pass sync result in commit epoch in test (#16403)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Apr 19, 2024
1 parent be57d0a commit 47ad484
Show file tree
Hide file tree
Showing 27 changed files with 114 additions and 222 deletions.
55 changes: 19 additions & 36 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::SystemTime;
Expand All @@ -22,13 +21,11 @@ use async_trait::async_trait;
use fail::fail_point;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{
HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo,
SstObjectIdRange,
SstObjectIdRange, SyncResult,
};
use risingwave_pb::common::{HostAddress, WorkerType};
use risingwave_pb::hummock::compact_task::TaskStatus;
Expand Down Expand Up @@ -93,31 +90,6 @@ impl MockHummockMetaClient {
.await
.unwrap_or(None)
}

pub async fn commit_epoch_with_watermark(
&self,
epoch: HummockEpoch,
sstables: Vec<LocalSstableInfo>,
new_table_watermarks: HashMap<TableId, TableWatermarks>,
) -> Result<()> {
let sst_to_worker = sstables
.iter()
.map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), self.context_id))
.collect();
self.hummock_manager
.commit_epoch(
epoch,
CommitEpochInfo::new(
sstables.into_iter().map(Into::into).collect(),
new_table_watermarks,
sst_to_worker,
None,
),
)
.await
.map_err(mock_err)?;
Ok(())
}
}

fn mock_err(error: super::error::Error) -> RpcError {
Expand Down Expand Up @@ -183,17 +155,28 @@ impl HummockMetaClient for MockHummockMetaClient {
})
}

async fn commit_epoch(
&self,
epoch: HummockEpoch,
sstables: Vec<LocalSstableInfo>,
) -> Result<()> {
let sst_to_worker = sstables
async fn commit_epoch(&self, epoch: HummockEpoch, sync_result: SyncResult) -> Result<()> {
let sst_to_worker = sync_result
.uncommitted_ssts
.iter()
.map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), self.context_id))
.collect();
let new_table_watermark = sync_result.table_watermarks;

self.hummock_manager
.commit_epoch(epoch, CommitEpochInfo::for_test(sstables, sst_to_worker))
.commit_epoch(
epoch,
CommitEpochInfo::new(
sync_result
.uncommitted_ssts
.into_iter()
.map(|sst| sst.into())
.collect(),
new_table_watermark,
sst_to_worker,
None,
),
)
.await
.map_err(mock_err)?;
Ok(())
Expand Down
9 changes: 1 addition & 8 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,14 +872,7 @@ mod tests {
let _ = tx.send(Ok(StreamingControlStreamResponse {
response: Some(
streaming_control_stream_response::Response::CompleteBarrier(
BarrierCompleteResponse {
request_id: "".to_string(),
status: None,
create_mview_progress: vec![],
synced_sstables: vec![],
worker_id: 0,
table_watermarks: Default::default(),
},
BarrierCompleteResponse::default(),
),
),
}));
Expand Down
8 changes: 2 additions & 6 deletions src/rpc_client/src/hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use async_trait::async_trait;
use futures::stream::BoxStream;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{
HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, SstObjectIdRange,
HummockEpoch, HummockSstableObjectId, HummockVersionId, SstObjectIdRange, SyncResult,
};
use risingwave_pb::hummock::{
HummockSnapshot, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse, VacuumTask,
Expand All @@ -37,11 +37,7 @@ pub trait HummockMetaClient: Send + Sync + 'static {
async fn get_snapshot(&self) -> Result<HummockSnapshot>;
async fn get_new_sst_ids(&self, number: u32) -> Result<SstObjectIdRange>;
// We keep `commit_epoch` only for test/benchmark.
async fn commit_epoch(
&self,
epoch: HummockEpoch,
sstables: Vec<LocalSstableInfo>,
) -> Result<()>;
async fn commit_epoch(&self, epoch: HummockEpoch, sync_result: SyncResult) -> Result<()>;
async fn update_current_epoch(&self, epoch: HummockEpoch) -> Result<()>;
async fn report_vacuum_task(&self, vacuum_task: VacuumTask) -> Result<()>;
async fn trigger_manual_compaction(
Expand Down
10 changes: 3 additions & 7 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use risingwave_common::RW_VERSION;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo,
SstObjectIdRange,
CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, SstObjectIdRange,
SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
Expand Down Expand Up @@ -1392,11 +1392,7 @@ impl HummockMetaClient for MetaClient {
Ok(SstObjectIdRange::new(resp.start_id, resp.end_id))
}

async fn commit_epoch(
&self,
_epoch: HummockEpoch,
_sstables: Vec<LocalSstableInfo>,
) -> Result<()> {
async fn commit_epoch(&self, _epoch: HummockEpoch, _sync_result: SyncResult) -> Result<()> {
panic!("Only meta service can commit_epoch in production.")
}

Expand Down
14 changes: 14 additions & 0 deletions src/storage/hummock_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

mod key_cmp;
use std::cmp::Ordering;
use std::collections::HashMap;

pub use key_cmp::*;
use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
Expand All @@ -48,6 +49,9 @@ pub mod table_watermark;
pub mod version;

pub use compact::*;
use risingwave_common::catalog::TableId;

use crate::table_watermark::TableWatermarks;

pub type HummockSstableObjectId = u64;
pub type HummockSstableId = u64;
Expand Down Expand Up @@ -89,6 +93,16 @@ macro_rules! info_in_release {
}
}

#[derive(Default, Debug)]
pub struct SyncResult {
/// The size of all synced shared buffers.
pub sync_size: usize,
/// The `sst_info` of sync.
pub uncommitted_ssts: Vec<LocalSstableInfo>,
/// The collected table watermarks written by state tables.
pub table_watermarks: HashMap<TableId, TableWatermarks>,
}

#[derive(Debug, Clone)]
pub struct LocalSstableInfo {
pub compaction_group_id: CompactionGroupId,
Expand Down
4 changes: 2 additions & 2 deletions src/storage/hummock_test/src/bin/replay/replay_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use futures_async_stream::try_stream;
use risingwave_common::util::addr::HostAddr;
use risingwave_common_service::observer_manager::{Channel, NotificationClient, ObserverError};
use risingwave_hummock_sdk::key::TableKey;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult};
use risingwave_hummock_trace::{
GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore,
ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions,
Expand All @@ -33,7 +33,7 @@ use risingwave_pb::meta::{SubscribeResponse, SubscribeType};
use risingwave_storage::hummock::store::LocalHummockStorage;
use risingwave_storage::hummock::HummockStorage;
use risingwave_storage::store::{
to_owned_item, LocalStateStore, StateStoreIterExt, StateStoreRead, SyncResult,
to_owned_item, LocalStateStore, StateStoreIterExt, StateStoreRead,
};
use risingwave_storage::{StateStore, StateStoreIter, StateStoreReadIter};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
Expand Down
40 changes: 10 additions & 30 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,9 @@ pub(crate) mod tests {
} else {
local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
}
let ssts = storage
.seal_and_sync_epoch(epoch)
.await
.unwrap()
.uncommitted_ssts;
let res = storage.seal_and_sync_epoch(epoch).await.unwrap();

hummock_meta_client.commit_epoch(epoch, ssts).await.unwrap();
hummock_meta_client.commit_epoch(epoch, res).await.unwrap();
}
}

Expand Down Expand Up @@ -537,12 +533,8 @@ pub(crate) mod tests {
storage: &HummockStorage,
epoch: u64,
) {
let ssts = storage
.seal_and_sync_epoch(epoch)
.await
.unwrap()
.uncommitted_ssts;
hummock_meta_client.commit_epoch(epoch, ssts).await.unwrap();
let res = storage.seal_and_sync_epoch(epoch).await.unwrap();
hummock_meta_client.commit_epoch(epoch, res).await.unwrap();
}

async fn prepare_data(
Expand Down Expand Up @@ -766,12 +758,8 @@ pub(crate) mod tests {
storage.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());
other.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());

let ssts = global_storage
.seal_and_sync_epoch(epoch)
.await
.unwrap()
.uncommitted_ssts;
hummock_meta_client.commit_epoch(epoch, ssts).await.unwrap();
let res = global_storage.seal_and_sync_epoch(epoch).await.unwrap();
hummock_meta_client.commit_epoch(epoch, res).await.unwrap();
}

// Mimic dropping table
Expand Down Expand Up @@ -959,12 +947,8 @@ pub(crate) mod tests {
local.flush().await.unwrap();
local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());

let ssts = storage
.seal_and_sync_epoch(epoch)
.await
.unwrap()
.uncommitted_ssts;
hummock_meta_client.commit_epoch(epoch, ssts).await.unwrap();
let res = storage.seal_and_sync_epoch(epoch).await.unwrap();
hummock_meta_client.commit_epoch(epoch, res).await.unwrap();
}

let manual_compcation_option = ManualCompactionOption {
Expand Down Expand Up @@ -1158,12 +1142,8 @@ pub(crate) mod tests {
.unwrap();
local.flush().await.unwrap();
local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());
let ssts = storage
.seal_and_sync_epoch(epoch)
.await
.unwrap()
.uncommitted_ssts;
hummock_meta_client.commit_epoch(epoch, ssts).await.unwrap();
let res = storage.seal_and_sync_epoch(epoch).await.unwrap();
hummock_meta_client.commit_epoch(epoch, res).await.unwrap();
}

let manual_compcation_option = ManualCompactionOption {
Expand Down
16 changes: 4 additions & 12 deletions src/storage/hummock_test/src/failpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,8 @@ async fn test_failpoints_state_store_read_upload() {
);

// sync epoch1 test the read_error
let ssts = hummock_storage
.seal_and_sync_epoch(1)
.await
.unwrap()
.uncommitted_ssts;
meta_client.commit_epoch(1, ssts).await.unwrap();
let res = hummock_storage.seal_and_sync_epoch(1).await.unwrap();
meta_client.commit_epoch(1, res).await.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(1))
.await
Expand Down Expand Up @@ -216,12 +212,8 @@ async fn test_failpoints_state_store_read_upload() {
assert!(result.is_err());
fail::remove(mem_upload_err);

let ssts = hummock_storage
.seal_and_sync_epoch(3)
.await
.unwrap()
.uncommitted_ssts;
meta_client.commit_epoch(3, ssts).await.unwrap();
let res = hummock_storage.seal_and_sync_epoch(3).await.unwrap();
meta_client.commit_epoch(3, res).await.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(3))
.await
Expand Down
Loading

0 comments on commit 47ad484

Please sign in to comment.