Skip to content

Commit

Permalink
feat(storage): hash join and dynamic filter support spill anytime via…
Browse files Browse the repository at this point in the history
… gap epoch (#12028)
  • Loading branch information
wcy-fdu authored Nov 7, 2023
1 parent 2906017 commit 887655f
Show file tree
Hide file tree
Showing 79 changed files with 560 additions and 256 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions e2e_test/streaming/bug_fixes/issue_12299.slt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# https://github.com/risingwavelabs/risingwave/issues/12299
# TL;DR: Reproduces the case where the upstream's stream key is not the pk and the stream scan does not contain the whole pk.

statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t1(
id bigint primary key,
Expand Down
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ workspace-hack = { path = "../workspace-hack" }
criterion = { workspace = true, features = ["async_tokio", "async"] }
rand = "0.8"
risingwave_expr_impl = { workspace = true }
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
tempfile = "3"
tikv-jemallocator = { workspace = true }

Expand Down
1 change: 1 addition & 0 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ mod test {
connector_rpc_sink_payload_format: None,
config_path: "src/config/test.toml",
total_memory_bytes: 34359738368,
mem_table_spill_threshold: 4194304,
parallelism: 10,
role: Both,
metrics_level: None,
Expand Down
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,10 @@ pub struct StorageConfig {
pub enable_fast_compaction: bool,
#[serde(default, flatten)]
pub unrecognized: Unrecognized<Self>,

/// The spill threshold for mem table.
#[serde(default = "default::storage::mem_table_spill_threshold")]
pub mem_table_spill_threshold: usize,
}

#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
Expand Down Expand Up @@ -1090,6 +1094,10 @@ pub mod default {
/// Default for the `enable_fast_compaction` setting: always `true` (enabled).
pub fn enable_fast_compaction() -> bool {
true
}

/// Default for `StorageConfig::mem_table_spill_threshold`.
///
/// Evaluates to 4_194_304 (`4 << 20`). NOTE(review): presumably a byte count
/// (4 MiB); the unit is not stated here — confirm against the spill logic.
pub fn mem_table_spill_threshold() -> usize {
    4 * 1024 * 1024
}
}

pub mod streaming {
Expand Down
36 changes: 17 additions & 19 deletions src/common/src/util/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::sync::LazyLock;
use std::time::{Duration, SystemTime};

Expand Down Expand Up @@ -42,24 +41,23 @@ impl Epoch {

#[must_use]
pub fn next(self) -> Self {
let physical_now = Epoch::physical_now();
let mut physical_now = Epoch::physical_now();
let prev_physical_time = self.physical_time();

let next_epoch = match physical_now.cmp(&prev_physical_time) {
Ordering::Greater => Self::from_physical_time(physical_now),
Ordering::Equal => {
tracing::warn!("New generate epoch is too close to the previous one.");
Epoch(self.0 + 1)
loop {
if physical_now > prev_physical_time {
break;
}
Ordering::Less => {
tracing::warn!(
"Clock goes backwards when calling Epoch::next(): prev={}, curr={}",
prev_physical_time,
physical_now
);
Epoch(self.0 + 1)
}
};
physical_now = Epoch::physical_now();

#[cfg(madsim)]
tokio::time::advance(std::time::Duration::from_micros(10));
#[cfg(not(madsim))]
std::hint::spin_loop();
}
// The last 16 bits of the previous epoch ((prev_epoch + 1, prev_epoch + 65536)) will be
// used as the gap epoch when the mem table spill occurs.
let next_epoch = Self::from_physical_time(physical_now);

assert!(next_epoch.0 > self.0);
next_epoch
Expand Down Expand Up @@ -117,7 +115,7 @@ impl Epoch {
}

pub const EPOCH_AVAILABLE_BITS: u64 = 16;
pub const MAX_SPILL_TIMES: u64 = 1 << EPOCH_AVAILABLE_BITS;
pub const MAX_SPILL_TIMES: u16 = ((1 << EPOCH_AVAILABLE_BITS) - 1) as u16;
pub const EPOCH_MASK: u64 = (1 << EPOCH_AVAILABLE_BITS) - 1;
pub const MAX_EPOCH: u64 = u64::MAX & !EPOCH_MASK;
impl From<u64> for Epoch {
Expand Down Expand Up @@ -207,8 +205,8 @@ mod tests {
assert_eq!(risingwave_st, *UNIX_RISINGWAVE_DATE_EPOCH);
}

#[test]
fn test_epoch_generate() {
#[tokio::test]
async fn test_epoch_generate() {
let mut prev_epoch = Epoch::now();
for _ in 0..1000 {
let epoch = prev_epoch.next();
Expand Down
1 change: 1 addition & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ workspace-hack = { path = "../workspace-hack" }
[dev-dependencies]
futures-async-stream = { workspace = true }
rand = "0.8"
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
tempfile = "3"

[lints]
Expand Down
8 changes: 8 additions & 0 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ pub struct ComputeNodeOpts {
#[clap(long, env = "RW_TOTAL_MEMORY_BYTES", default_value_t = default_total_memory_bytes())]
pub total_memory_bytes: usize,

/// Spill threshold for mem table.
#[clap(long, env = "RW_MEM_TABLE_SPILL_THRESHOLD", default_value_t = default_mem_table_spill_threshold())]
pub mem_table_spill_threshold: usize,

/// The parallelism that the compute node will register to the scheduler of the meta service.
#[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())]
#[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
Expand Down Expand Up @@ -230,6 +234,10 @@ fn default_total_memory_bytes() -> usize {
(system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
}

/// Default for the `mem_table_spill_threshold` compute-node option
/// (env `RW_MEM_TABLE_SPILL_THRESHOLD`).
///
/// Returns 4_194_304 (`4 << 20`). The literal is inferred as `usize` from the
/// return type, so no `as` cast is needed.
fn default_mem_table_spill_threshold() -> usize {
    4 << 20
}

/// Default for the `parallelism` option: the value reported by
/// `total_cpu_available()`, rounded up to the next whole number and
/// converted to `usize`.
fn default_parallelism() -> usize {
total_cpu_available().ceil() as usize
}
Expand Down
2 changes: 1 addition & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ compactor_max_sst_key_count = 2097152
compact_iter_recreate_timeout_ms = 600000
compactor_max_sst_size = 536870912
enable_fast_compaction = true

mem_table_spill_threshold = 4194304

[storage.data_file_cache]
dir = ""
Expand Down
3 changes: 3 additions & 0 deletions src/ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,8 @@ uuid = { version = "1", features = ["v4"] }
[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }

[dev-dependencies]
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }

[lints]
workspace = true
5 changes: 3 additions & 2 deletions src/ctl/src/cmd_impl/hummock/list_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use core::ops::Bound::Unbounded;

use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::MAX_EPOCH;
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::store::{PrefetchOptions, ReadOptions, StateStoreReadExt};

Expand All @@ -30,8 +31,8 @@ pub async fn list_kv(
let hummock = context
.hummock_store(HummockServiceOpts::from_env(data_dir)?)
.await?;
if epoch == u64::MAX {
tracing::info!("using u64::MAX as epoch");
if epoch == MAX_EPOCH {
tracing::info!("using MAX_EPOCH as epoch");
}
let scan_result = {
let range = (Unbounded, Unbounded);
Expand Down
4 changes: 3 additions & 1 deletion src/ctl/src/cmd_impl/hummock/list_version_deltas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::util::epoch::MAX_EPOCH;

use crate::CtlContext;

pub async fn list_version_deltas(
Expand All @@ -21,7 +23,7 @@ pub async fn list_version_deltas(
) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
let resp = meta_client
.list_version_deltas(start_id, num_epochs, u64::MAX)
.list_version_deltas(start_id, num_epochs, MAX_EPOCH)
.await?;
println!("{:#?}", resp.version_deltas);
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/hummock/pause_resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::util::epoch::MAX_EPOCH;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt;
use risingwave_hummock_sdk::HummockEpoch;

use crate::CtlContext;

Expand Down Expand Up @@ -62,7 +62,7 @@ pub async fn replay_version(context: &CtlContext) -> anyhow::Result<()> {
let mut current_delta_id = base_version.id + 1;
loop {
let deltas = meta_client
.list_version_deltas(current_delta_id, delta_fetch_size, HummockEpoch::MAX)
.list_version_deltas(current_delta_id, delta_fetch_size, MAX_EPOCH)
.await
.unwrap();
if deltas.version_deltas.is_empty() {
Expand Down
10 changes: 7 additions & 3 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,13 +320,17 @@ fn print_kv_pairs(
let full_val = block_iter.value();
let humm_val = HummockValue::from_slice(full_val)?;

let epoch = Epoch::from(full_key.epoch);
let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
let date_time = DateTime::<Utc>::from(epoch.as_system_time());

println!("\t\t key: {:?}, len={}", full_key, full_key.encoded_len());
println!("\t\t value: {:?}, len={}", humm_val, humm_val.encoded_len());
println!("\t\t epoch: {} ({})", epoch, date_time);

println!(
"\t\t epoch: {} offset = {} ({})",
epoch,
full_key.epoch_with_gap.offset(),
date_time
);
if args.print_table {
print_table_column(full_key, humm_val, table_data)?;
}
Expand Down
3 changes: 2 additions & 1 deletion src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use anyhow::Result;
use clap::{Parser, Subcommand};
use cmd_impl::bench::BenchCommands;
use cmd_impl::hummock::SstDumpArgs;
use risingwave_common::util::epoch::MAX_EPOCH;
use risingwave_meta::backup_restore::RestoreOpts;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;

Expand Down Expand Up @@ -156,7 +157,7 @@ enum HummockCommands {
DisableCommitEpoch,
/// list all Hummock key-value pairs
ListKv {
#[clap(short, long = "epoch", default_value_t = u64::MAX)]
#[clap(short, long = "epoch", default_value_t = MAX_EPOCH)]
epoch: u64,

#[clap(short, long = "table-id")]
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;

use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::epoch::MAX_EPOCH;
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::Table;
use risingwave_pb::ddl_service::DdlProgress;
Expand Down Expand Up @@ -219,7 +220,7 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
// FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
self.0
.list_version_deltas(0, u32::MAX, u64::MAX)
.list_version_deltas(0, u32::MAX, MAX_EPOCH)
.await
.map(|v| v.version_deltas)
}
Expand Down
1 change: 1 addition & 0 deletions src/jni_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tracing = "0.1"
[dev-dependencies]
expect-test = "1"
risingwave_expr = { workspace = true }
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }

[lints]
workspace = true
1 change: 1 addition & 0 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ workspace-hack = { path = "../workspace-hack" }
assert_matches = "1"
maplit = "1.0.2"
rand = "0.8"
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_test_runner = { workspace = true }

[features]
Expand Down
7 changes: 3 additions & 4 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
pub mod compaction_config;
mod overlap_strategy;
use risingwave_common::catalog::TableOption;
use risingwave_common::util::epoch::MAX_EPOCH;
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType};

Expand All @@ -28,9 +29,7 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use picker::{LevelCompactionPicker, TierCompactionPicker};
use risingwave_hummock_sdk::{
can_concat, CompactionGroupId, HummockCompactionTaskId, HummockEpoch,
};
use risingwave_hummock_sdk::{can_concat, CompactionGroupId, HummockCompactionTaskId};
use risingwave_pb::hummock::compaction_config::CompactionMode;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactTask, CompactionConfig, KeyRange, LevelType};
Expand Down Expand Up @@ -131,7 +130,7 @@ impl CompactStatus {
let compact_task = CompactTask {
input_ssts: ret.input.input_levels,
splits: vec![KeyRange::inf()],
watermark: HummockEpoch::MAX,
watermark: MAX_EPOCH,
sorted_output_ssts: vec![],
task_id,
target_level: target_level_id as u32,
Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/hummock/metrics_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ use std::time::{SystemTime, UNIX_EPOCH};

use itertools::{enumerate, Itertools};
use prost::Message;
use risingwave_common::util::epoch::MAX_EPOCH;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
object_size_map, BranchedSstInfo, HummockVersionExt,
};
use risingwave_hummock_sdk::{
CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId,
CompactionGroupId, HummockContextId, HummockSstableObjectId, HummockVersionId,
};
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::write_limits::WriteLimit;
Expand Down Expand Up @@ -344,7 +345,7 @@ pub fn trigger_pin_unpin_snapshot_state(
{
metrics.min_pinned_epoch.set(m as i64);
} else {
metrics.min_pinned_epoch.set(HummockEpoch::MAX as _);
metrics.min_pinned_epoch.set(MAX_EPOCH as _);
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/rpc_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ tower = "0.4"
tracing = "0.1"
url = "2.4.1"

[dev-dependencies]
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }

Expand Down
1 change: 1 addition & 0 deletions src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ workspace-hack = { path = "../workspace-hack" }
[dev-dependencies]
criterion = { workspace = true, features = ["async_futures"] }
moka = { version = "0.12", features = ["future"] }
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_test_runner = { workspace = true }
uuid = { version = "1", features = ["v4"] }

Expand Down
3 changes: 3 additions & 0 deletions src/storage/backup/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,8 @@ serde_json = "1"
thiserror = "1"
twox-hash = "1"

[dev-dependencies]
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }

[lints]
workspace = true
4 changes: 4 additions & 0 deletions src/storage/hummock_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ workspace-hack = { path = "../../workspace-hack" }

[lints]
workspace = true

[features]
enable_test_epoch = []

Loading

0 comments on commit 887655f

Please sign in to comment.