From 4c953ebc89e1c4a9e501dcf68d9d1e80a348150d Mon Sep 17 00:00:00 2001 From: "Zhanxiang (Patrick) Huang" Date: Tue, 23 Jan 2024 18:52:00 +0800 Subject: [PATCH 1/2] fix: dedup staging sst on storage table read (#14664) --- .../src/hummock_read_version_tests.rs | 88 ++++++++++++++++--- .../compactor/shared_buffer_compact.rs | 20 ++--- .../src/hummock/event_handler/uploader.rs | 2 +- .../shared_buffer/shared_buffer_batch.rs | 45 +++++++--- .../hummock/store/local_hummock_storage.rs | 2 +- src/storage/src/hummock/store/version.rs | 19 ++-- src/storage/src/hummock/test_utils.rs | 7 +- 7 files changed, 133 insertions(+), 50 deletions(-) diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index d7c82d6c41e14..2738e1193c554 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -20,7 +20,7 @@ use itertools::Itertools; use parking_lot::RwLock; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{key_with_epoch, map_table_key_range}; -use risingwave_hummock_sdk::LocalSstableInfo; +use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; use risingwave_meta::hummock::test_utils::setup_compute_env; use risingwave_pb::hummock::{KeyRange, SstableInfo}; use risingwave_storage::hummock::iterator::test_utils::{ @@ -31,7 +31,7 @@ use risingwave_storage::hummock::store::version::{ read_filter_for_batch, read_filter_for_local, HummockReadVersion, StagingData, StagingSstableInfo, VersionUpdate, }; -use risingwave_storage::hummock::test_utils::gen_dummy_batch; +use risingwave_storage::hummock::test_utils::{gen_dummy_batch, gen_dummy_sst_info}; use crate::test_utils::prepare_first_valid_version; @@ -52,15 +52,13 @@ async fn test_read_version_basic() { let kv_pairs = gen_dummy_batch(epoch); let sorted_items = SharedBufferBatch::build_shared_buffer_item_batches(kv_pairs); let size = SharedBufferBatch::measure_batch_size(&sorted_items); - let imm = SharedBufferBatch::build_shared_buffer_batch( + let imm = SharedBufferBatch::build_shared_buffer_batch_for_test( epoch, 0, sorted_items, size, vec![], TableId::from(table_id), - None, - None, ); read_version.update(VersionUpdate::Staging(StagingData::ImmMem(imm))); @@ -91,15 +89,13 @@ async fn test_read_version_basic() { let kv_pairs = gen_dummy_batch(epoch); let sorted_items = SharedBufferBatch::build_shared_buffer_item_batches(kv_pairs); let size = SharedBufferBatch::measure_batch_size(&sorted_items); - let imm = SharedBufferBatch::build_shared_buffer_batch( + let imm = SharedBufferBatch::build_shared_buffer_batch_for_test( epoch, 0, sorted_items, size, vec![], TableId::from(table_id), - None, - None, ); read_version.update(VersionUpdate::Staging(StagingData::ImmMem(imm))); @@ -278,15 +274,13 @@ async fn test_read_filter_basic() { let kv_pairs = gen_dummy_batch(epoch); let sorted_items = SharedBufferBatch::build_shared_buffer_item_batches(kv_pairs); let size = SharedBufferBatch::measure_batch_size(&sorted_items); - let imm = SharedBufferBatch::build_shared_buffer_batch( + let imm = SharedBufferBatch::build_shared_buffer_batch_for_test( epoch, 0, sorted_items, size, vec![], TableId::from(table_id), - None, - None, ); read_version @@ -344,3 +338,75 @@ async fn test_read_filter_basic() { } } } + +#[tokio::test] +async fn test_read_filter_for_batch_issue_14659() { + use std::ops::Bound::Unbounded; + + let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = + setup_compute_env(8080).await; + + let (pinned_version, _, _) = + prepare_first_valid_version(env, hummock_manager_ref, worker_node).await; + + const NUM_SHARDS: u64 = 2; + let table_id = TableId::from(2); + let epoch = 1; + let mut read_version_vec = vec![]; + let mut imms = vec![]; + + // Populate IMMs + for i in 0..NUM_SHARDS { + let read_version = Arc::new(RwLock::new(HummockReadVersion::new( + table_id, + pinned_version.clone(), + ))); + + let items = SharedBufferBatch::build_shared_buffer_item_batches(gen_dummy_batch(i)); + let size = SharedBufferBatch::measure_batch_size(&items); + let imm = SharedBufferBatch::build_shared_buffer_batch_for_test( + epoch, + 0, + items, + size, + vec![], + table_id, + ); + + imms.push(imm.clone()); + + read_version + .write() + .update(VersionUpdate::Staging(StagingData::ImmMem(imm))); + + read_version_vec.push(read_version); + } + + // Update read version via staging SSTs + let sst_id = 233; + let staging_sst = gen_dummy_sst_info(sst_id, imms.clone(), table_id, epoch); + read_version_vec.iter().for_each(|v| { + v.write().update(VersionUpdate::Staging(StagingData::Sst( + StagingSstableInfo::new( + vec![LocalSstableInfo::for_test(staging_sst.clone())], + vec![epoch], + imms.iter().map(|imm| imm.batch_id()).collect_vec(), + imms.iter().map(|imm| imm.size()).sum(), + ), + ))); + }); + + // build for batch with max epoch + let (_, hummock_read_snapshot) = read_filter_for_batch( + HummockEpoch::MAX, + table_id, + (Unbounded, Unbounded), + read_version_vec, + ) + .unwrap(); + + // No imms should be proivided + assert_eq!(0, hummock_read_snapshot.0.len()); + // Only 1 staging sst is provided + assert_eq!(1, hummock_read_snapshot.1.len()); +} diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 8b94364fb2f8a..9f09c5e8542ef 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -619,7 +619,7 @@ mod tests { #[tokio::test] async fn test_generate_splits_in_order() { - let imm1 = ImmutableMemtable::build_shared_buffer_batch( + let imm1 = ImmutableMemtable::build_shared_buffer_batch_for_test( 3, 0, vec![( @@ -629,10 +629,8 @@ mod tests { 1024 * 1024, vec![], TableId::new(1), - None, - None, ); - let imm2 = ImmutableMemtable::build_shared_buffer_batch( + let imm2 = ImmutableMemtable::build_shared_buffer_batch_for_test( 3, 0, vec![( @@ -642,11 +640,9 @@ mod tests { (1024 + 256) * 1024, vec![], TableId::new(1), - None, - None, ); - let imm3 = ImmutableMemtable::build_shared_buffer_batch( + let imm3 = ImmutableMemtable::build_shared_buffer_batch_for_test( 2, 0, vec![( @@ -656,10 +652,8 @@ mod tests { (1024 + 512) * 1024, vec![], TableId::new(1), - None, - None, ); - let imm4 = ImmutableMemtable::build_shared_buffer_batch( + let imm4 = ImmutableMemtable::build_shared_buffer_batch_for_test( 3, 0, vec![( @@ -669,11 +663,9 @@ mod tests { (1024 + 512) * 1024, vec![], TableId::new(1), - None, - None, ); - let imm5 = ImmutableMemtable::build_shared_buffer_batch( + let imm5 = ImmutableMemtable::build_shared_buffer_batch_for_test( 3, 0, vec![( @@ -683,8 +675,6 @@ mod tests { (1024 + 256) * 1024, vec![], TableId::new(2), - None, - None, ); let storage_opts = StorageOpts { diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 12c775165a75b..400c622424a30 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -1222,7 +1222,7 @@ mod tests { size, vec![], TEST_TABLE_ID, - None, + LocalInstanceId::default(), tracker, ) } diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 00facd7bec101..06592e287e556 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -584,7 +584,7 @@ impl SharedBufferBatch { size: usize, delete_ranges: Vec<(Bound, Bound)>, table_id: TableId, - instance_id: Option, + instance_id: LocalInstanceId, tracker: Option, ) -> Self { let inner = SharedBufferBatchInner::new( @@ -599,7 +599,7 @@ impl SharedBufferBatch { SharedBufferBatch { inner: Arc::new(inner), table_id, - instance_id: instance_id.unwrap_or_default(), + instance_id, } } @@ -642,6 +642,31 @@ impl SharedBufferBatch { } vnodes } + + #[cfg(any(test, feature = "test"))] + pub fn build_shared_buffer_batch_for_test( + epoch: HummockEpoch, + spill_offset: u16, + sorted_items: Vec, + size: usize, + delete_ranges: Vec<(Bound, Bound)>, + table_id: TableId, + ) -> Self { + let inner = SharedBufferBatchInner::new( + table_id, + epoch, + spill_offset, + sorted_items, + delete_ranges, + size, + None, + ); + SharedBufferBatch { + inner: Arc::new(inner), + table_id, + instance_id: LocalInstanceId::default(), + } + } } /// Iterate all the items in the shared buffer batch @@ -991,7 +1016,7 @@ mod tests { output.reverse(); assert_eq!(output, shared_buffer_items); - let batch = SharedBufferBatch::build_shared_buffer_batch( + let batch = SharedBufferBatch::build_shared_buffer_batch_for_test( epoch, 0, vec![], @@ -1007,8 +1032,6 @@ mod tests { ), ], TableId::new(0), - None, - None, ); assert_eq!(batch.start_table_key().as_ref(), "a".as_bytes()); assert_eq!( @@ -1174,15 +1197,13 @@ mod tests { Bound::Excluded(Bytes::from(b"eee".to_vec())), ), ]; - let shared_buffer_batch = SharedBufferBatch::build_shared_buffer_batch( + let shared_buffer_batch = SharedBufferBatch::build_shared_buffer_batch_for_test( epoch, 0, vec![], 0, delete_ranges, Default::default(), - None, - None, ); assert_eq!( epoch, @@ -1467,15 +1488,13 @@ mod tests { ]; let sorted_items1 = transform_shared_buffer(shared_buffer_items1); let size = SharedBufferBatch::measure_batch_size(&sorted_items1); - let imm1 = SharedBufferBatch::build_shared_buffer_batch( + let imm1 = SharedBufferBatch::build_shared_buffer_batch_for_test( epoch, 0, sorted_items1, size, delete_ranges, table_id, - None, - None, ); let epoch = 2; @@ -1513,15 +1532,13 @@ mod tests { ]; let sorted_items2 = transform_shared_buffer(shared_buffer_items2); let size = SharedBufferBatch::measure_batch_size(&sorted_items2); - let imm2 = SharedBufferBatch::build_shared_buffer_batch( + let imm2 = SharedBufferBatch::build_shared_buffer_batch_for_test( epoch, 0, sorted_items2, size, delete_ranges, table_id, - None, - None, ); let imms = vec![imm2, imm1]; diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 2bddff818d9fb..0db2b2ea07c57 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -496,7 +496,7 @@ impl LocalHummockStorage { size, delete_ranges, table_id, - Some(instance_id), + instance_id, Some(tracker), ); self.spill_offset += 1; diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 6803a686c9426..59626f4e42cee 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -537,22 +537,29 @@ pub fn read_filter_for_batch( let mut imm_vec = Vec::default(); let mut sst_vec = Vec::default(); + let mut seen_imm_ids = HashSet::new(); + let mut seen_sst_ids = HashSet::new(); // only filter the staging data that epoch greater than max_mce to avoid data duplication let (min_epoch, max_epoch) = (max_mce_version.max_committed_epoch(), epoch); // prune imm and sst with max_mce for (staging_imms, staging_ssts) in staging_vec { - imm_vec.extend( - staging_imms - .into_iter() - .filter(|imm| imm.min_epoch() > min_epoch && imm.min_epoch() <= max_epoch), - ); + imm_vec.extend(staging_imms.into_iter().filter(|imm| { + // There shouldn't be duplicated IMMs because merge imm only operates on a single shard. + assert!(seen_imm_ids.insert(imm.batch_id())); + imm.min_epoch() > min_epoch && imm.min_epoch() <= max_epoch + })); sst_vec.extend(staging_ssts.into_iter().filter(|staging_sst| { assert!( staging_sst.get_max_epoch() <= min_epoch || staging_sst.get_min_epoch() > min_epoch ); - staging_sst.min_epoch > min_epoch + // Dedup staging SSTs in different shard. Duplicates can happen in the following case: + // - Table 1 Shard 1 produces IMM 1 + // - Table 1 Shard 2 produces IMM 2 + // - IMM 1 and IMM 2 are compacted into SST 1 as a Staging SST + // - SST 1 is added to both Shard 1's and Shard 2's read version + staging_sst.min_epoch > min_epoch && seen_sst_ids.insert(staging_sst.object_id) })); } diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index bf413fea0bb5c..d308fd50011a3 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -116,8 +116,10 @@ pub fn gen_dummy_sst_info( right_exclusive: false, }), file_size, - table_ids: vec![], + table_ids: vec![table_id.table_id], uncompressed_file_size: file_size, + min_epoch: epoch, + max_epoch: epoch, ..Default::default() } } @@ -392,6 +394,7 @@ pub fn create_small_table_cache() -> Arc Date: Tue, 23 Jan 2024 19:09:11 +0800 Subject: [PATCH 2/2] feat(object_store): add retry for `Minio` `SlowDown` and `TooManyRequests` errors (#14739) --- Makefile.toml | 5 +++ ci/scripts/backfill-test.sh | 2 +- ci/scripts/run-backfill-tests.sh | 55 +++++++++++++++++++------------ ci/workflows/main-cron.yml | 2 +- ci/workflows/pull-request.yml | 2 +- risedev.yml | 50 ++++++++++++++++++++++++++++ src/common/src/config.rs | 34 ++++++++++++++++--- src/config/ci.toml | 2 +- src/config/example.toml | 4 +++ src/object_store/src/object/s3.rs | 30 +++++++++++++---- 10 files changed, 151 insertions(+), 35 deletions(-) diff --git a/Makefile.toml b/Makefile.toml index 790564671f358..983b304d74e51 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -1305,6 +1305,11 @@ command = "target/${BUILD_MODE_DIR}/risedev-dev" args = ["${@}"] description = "Clean data and start a full RisingWave dev cluster using risedev-dev" +[tasks.ci-kill-no-dump-logs] +category = "RiseDev - CI" +dependencies = ["k", "check-logs", "wait-processes-exit"] +description = "Kill cluster and check logs, do not dump logs" + [tasks.ci-kill] category = "RiseDev - CI" dependencies = ["k", "l", "check-logs", "wait-processes-exit"] diff --git a/ci/scripts/backfill-test.sh b/ci/scripts/backfill-test.sh index 056db77c842a4..4769d7c5d229d 100755 --- a/ci/scripts/backfill-test.sh +++ b/ci/scripts/backfill-test.sh @@ -32,4 +32,4 @@ download_and_prepare_rw "$profile" source ################ TESTS -profile=$profile ./ci/scripts/run-backfill-tests.sh +BUILDKITE=${BUILDKITE:-} profile=$profile ./ci/scripts/run-backfill-tests.sh diff --git a/ci/scripts/run-backfill-tests.sh b/ci/scripts/run-backfill-tests.sh index 0fb97b978e67c..e809053f9f376 100755 --- a/ci/scripts/run-backfill-tests.sh +++ b/ci/scripts/run-backfill-tests.sh @@ -23,12 +23,17 @@ BACKGROUND_DDL_DIR=$TEST_DIR/background_ddl COMMON_DIR=$BACKGROUND_DDL_DIR/common CLUSTER_PROFILE='ci-1cn-1fe-kafka-with-recovery' +echo "--- Configuring cluster profiles" if [[ -n "${BUILDKITE:-}" ]]; then - RUNTIME_CLUSTER_PROFILE='ci-3cn-1fe-with-monitoring' -else + echo "Running in buildkite" RUNTIME_CLUSTER_PROFILE='ci-3cn-1fe' + MINIO_RATE_LIMIT_CLUSTER_PROFILE='ci-3cn-1fe-with-minio-rate-limit' +else + echo "Running locally" + RUNTIME_CLUSTER_PROFILE='ci-3cn-1fe-with-monitoring' + MINIO_RATE_LIMIT_CLUSTER_PROFILE='ci-3cn-1fe-with-monitoring-and-minio-rate-limit' fi -export RUST_LOG="info,risingwave_meta::barrier::progress=debug,risingwave_meta::rpc::ddl_controller=debug" +export RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ run_sql_file() { psql -h localhost -p 4566 -d dev -U root -f "$@" @@ -60,8 +65,8 @@ rename_logs_with_prefix() { } kill_cluster() { - cargo make kill - cargo make wait-processes-exit + cargo make ci-kill-no-dump-logs + wait } restart_cluster() { @@ -150,7 +155,6 @@ test_backfill_tombstone() { ./risedev psql -c "CREATE MATERIALIZED VIEW m1 as select * from tomb;" echo "--- Kill cluster" kill_cluster - cargo make wait-processes-exit wait } @@ -171,9 +175,7 @@ test_replication_with_column_pruning() { run_sql_file "$PARENT_PATH"/sql/backfill/replication_with_column_pruning/select.sql , } impl SystemConfig { @@ -1526,8 +1546,14 @@ pub mod default { DEFAULT_RETRY_MAX_ATTEMPTS } - pub fn retry_unknown_service_error() -> bool { - false + pub mod developer { + pub fn object_store_retry_unknown_service_error() -> bool { + false + } + + pub fn object_store_retryable_service_error_codes() -> Vec { + vec!["SlowDown".into(), "TooManyRequests".into()] + } } } } diff --git a/src/config/ci.toml b/src/config/ci.toml index 51793d3dba698..db207ebf44412 100644 --- a/src/config/ci.toml +++ b/src/config/ci.toml @@ -19,4 +19,4 @@ imm_merge_threshold = 2 [system] barrier_interval_ms = 250 checkpoint_frequency = 5 -max_concurrent_creating_streaming_jobs = 0 +max_concurrent_creating_streaming_jobs = 0 \ No newline at end of file diff --git a/src/config/example.toml b/src/config/example.toml index 413321d6ff3ec..914cfe63889d4 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -181,6 +181,10 @@ object_store_req_retry_max_delay_ms = 10000 object_store_req_retry_max_attempts = 8 retry_unknown_service_error = false +[storage.object_store.s3.developer] +object_store_retry_unknown_service_error = false +object_store_retryable_service_error_codes = ["SlowDown", "TooManyRequests"] + [system] barrier_interval_ms = 1000 checkpoint_frequency = 1 diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index cddb9c0c75e33..349d3b7142322 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -938,12 +938,18 @@ impl From for ObjectError { struct RetryCondition { retry_unknown_service_error: bool, + retryable_service_error_codes: Vec, } impl RetryCondition { fn new(config: &S3ObjectStoreConfig) -> Self { Self { - retry_unknown_service_error: config.retry_unknown_service_error, + retry_unknown_service_error: config.developer.object_store_retry_unknown_service_error + || config.retry_unknown_service_error, + retryable_service_error_codes: config + .developer + .object_store_retryable_service_error_codes + .clone(), } } } @@ -958,12 +964,24 @@ impl tokio_retry::Condition for RetryCondition { return true; } } - SdkError::ServiceError(e) => { - if self.retry_unknown_service_error && e.err().code().is_none() { - tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request."); - return true; + SdkError::ServiceError(e) => match e.err().code() { + None => { + if self.retry_unknown_service_error { + tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request."); + return true; + } } - } + Some(code) => { + if self + .retryable_service_error_codes + .iter() + .any(|s| s.as_str().eq_ignore_ascii_case(code)) + { + tracing::warn!(target: "retryable_service_error", "{e:?} occurs, retry S3 get_object request."); + return true; + } + } + }, _ => {} }, Either::Right(_) => {