diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 567edc4cdd7e9..8bb439cbabff5 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -714,7 +714,7 @@ where let mut drop = false; let epoch = iter_key.epoch; - let value = iter.value(); + let mut value = iter.value(); if is_new_user_key { if !max_key.is_empty() && iter_key >= max_key { break; @@ -797,27 +797,29 @@ where if value.is_delete() { user_key_last_delete_epoch = epoch; } else if earliest_range_delete_which_can_see_iter_key < user_key_last_delete_epoch { - debug_assert!( - iter_key.epoch < earliest_range_delete_which_can_see_iter_key - && earliest_range_delete_which_can_see_iter_key < user_key_last_delete_epoch - ); user_key_last_delete_epoch = earliest_range_delete_which_can_see_iter_key; - // In each SST, since a union set of delete ranges is constructed and thus original - // delete ranges are replaced with the union set and not used in read, we lose exact - // information about whether a key is deleted by a delete range in - // the same SST. Therefore we need to construct a corresponding - // delete key to represent this. - iter_key.epoch = earliest_range_delete_which_can_see_iter_key; - sst_builder - .add_full_key(iter_key, HummockValue::Delete, is_new_user_key) - .verbose_instrument_await("add_full_key_delete") - .await?; - last_table_stats.total_key_count += 1; - last_table_stats.total_key_size += iter_key.encoded_len() as i64; - last_table_stats.total_value_size += 1; - iter_key.epoch = epoch; - is_new_user_key = false; + if iter_key.epoch < earliest_range_delete_which_can_see_iter_key { + // In each SST, since a union set of delete ranges is constructed and thus original + // delete ranges are replaced with the union set and not used in read, we lose exact + // information about whether a key is deleted by a delete range in + // the same SST. Therefore we need to construct a corresponding + // delete key to represent this. + iter_key.epoch = earliest_range_delete_which_can_see_iter_key; + sst_builder + .add_full_key(iter_key, HummockValue::Delete, is_new_user_key) + .verbose_instrument_await("add_full_key_delete") + .await?; + last_table_stats.total_key_count += 1; + last_table_stats.total_key_size += iter_key.encoded_len() as i64; + last_table_stats.total_value_size += 1; + iter_key.epoch = epoch; + is_new_user_key = false; + } else { + // If the range delete comes from the same epoch, convert value to Delete anyway. + debug_assert_eq!(iter_key.epoch, earliest_range_delete_which_can_see_iter_key); + value = HummockValue::Delete; + } } // Don't allow two SSTs to share same user key diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 428361237c0ac..fb2ac8ec5b70b 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -385,7 +385,7 @@ pub async fn merge_imms_in_memory( let mut pivot_last_delete_epoch = HummockEpoch::MAX; - for ((key, value), epoch) in items { + for ((key, mut value), epoch) in items { assert!(key >= pivot, "key should be in ascending order"); let earliest_range_delete_which_can_see_key = if key == pivot { del_iter.earliest_delete_since(epoch) @@ -402,20 +402,21 @@ pub async fn merge_imms_in_memory( if value.is_delete() { pivot_last_delete_epoch = epoch; } else if earliest_range_delete_which_can_see_key < pivot_last_delete_epoch { - debug_assert!( - epoch < earliest_range_delete_which_can_see_key - && earliest_range_delete_which_can_see_key < pivot_last_delete_epoch - ); pivot_last_delete_epoch = earliest_range_delete_which_can_see_key; - // In each merged immutable memtable, since a union set of delete ranges is constructed - // and thus original delete ranges are replaced with the union set and not - // used in read, we lose exact information about whether a key is deleted by - // a delete range in the merged imm which it belongs to. Therefore we need - // to construct a corresponding delete key to represent this. - versions.push(( - earliest_range_delete_which_can_see_key, - HummockValue::Delete, - )); + if epoch < earliest_range_delete_which_can_see_key { + // In each merged immutable memtable, since a union set of delete ranges is constructed + // and thus original delete ranges are replaced with the union set and not + // used in read, we lose exact information about whether a key is deleted by + // a delete range in the merged imm which it belongs to. Therefore we need + // to construct a corresponding delete key to represent this. + versions.push(( + earliest_range_delete_which_can_see_key, + HummockValue::Delete, + )); + } else { + debug_assert_eq!(epoch, earliest_range_delete_which_can_see_key); + value = HummockValue::Delete; + } } versions.push((epoch, value)); } diff --git a/src/storage/src/hummock/sstable/block.rs b/src/storage/src/hummock/sstable/block.rs index 59ea0a6805b35..525377b1a7c0f 100644 --- a/src/storage/src/hummock/sstable/block.rs +++ b/src/storage/src/hummock/sstable/block.rs @@ -473,13 +473,8 @@ impl BlockBuilder { debug_assert_eq!( KeyComparator::compare_encoded_full_key(&self.last_key[..], &key[..]), Ordering::Less, - "epoch: {}, table key: {}", - full_key.epoch, - u64::from_be_bytes( - full_key.user_key.table_key.as_ref()[0..8] - .try_into() - .unwrap() - ), + "key: {:?}", + full_key ); } // Update restart point if needed and calculate diff key. diff --git a/src/tests/simulation/tests/integration_tests/sink/basic.rs b/src/tests/simulation/tests/integration_tests/sink/basic.rs index aa8c8725b3815..bceb45a8a2389 100644 --- a/src/tests/simulation/tests/integration_tests/sink/basic.rs +++ b/src/tests/simulation/tests/integration_tests/sink/basic.rs @@ -291,3 +291,81 @@ async fn test_sink_decouple_basic() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_sink_decouple_blackhole() -> Result<()> { + let config_path = { + let mut file = tempfile::NamedTempFile::new().expect("failed to create temp config file"); + file.write_all(include_bytes!("../../../../../config/ci-sim.toml")) + .expect("failed to write config file"); + file.into_temp_path() + }; + + let mut cluster = Cluster::start(Configuration { + config_path: ConfigPath::Temp(config_path.into()), + frontend_nodes: 1, + compute_nodes: 3, + meta_nodes: 1, + compactor_nodes: 1, + compute_node_cores: 2, + etcd_timeout_rate: 0.0, + etcd_data_path: None, + }) + .await?; + + let source_parallelism = 12; + let mut txs = Vec::new(); + let mut rxs = Vec::new(); + for _ in 0..source_parallelism { + let (tx, rx): (_, UnboundedReceiver) = unbounded_channel(); + txs.push(tx); + rxs.push(Some(rx)); + } + + let _source_guard = registry_test_source(BoxSource::new( + move |_, _| { + Ok((0..source_parallelism) + .map(|i: usize| TestSourceSplit { + id: format!("{}", i).as_str().into(), + properties: Default::default(), + offset: "".to_string(), + }) + .collect_vec()) + }, + move |_, splits, _, _, _| { + select_all(splits.into_iter().map(|split| { + let id: usize = split.id.parse().unwrap(); + let rx = rxs[id].take().unwrap(); + UnboundedReceiverStream::new(rx).map(|chunk| Ok(StreamChunkWithState::from(chunk))) + })) + .boxed() + }, + )); + + let mut session = cluster.start_session(); + + session.run("set streaming_parallelism = 6").await?; + session.run("set sink_decouple = true").await?; + session + .run("create table test_table (id int primary key, name varchar) with (connector = 'test') FORMAT PLAIN ENCODE JSON") + .await?; + session + .run("create sink test_sink from test_table with (connector = 'blackhole')") + .await?; + + let mut count = 0; + let mut id_list = (0..100000).collect_vec(); + id_list.shuffle(&mut rand::thread_rng()); + let flush_freq = 50; + for id in &id_list[0..10000] { + let chunk = build_stream_chunk(once((*id as i32, format!("name-{}", id)))); + txs[id % source_parallelism].send(chunk).unwrap(); + count += 1; + if count % flush_freq == 0 { + sleep(Duration::from_millis(10)).await; + } + } + + session.run("drop sink test_sink").await?; + Ok(()) +}