Commit 95ded68

Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu-slot
Shanicky Chen committed May 17, 2024
2 parents f54bb6c + 347e186 commit 95ded68
Showing 32 changed files with 762 additions and 1,398 deletions.
28 changes: 24 additions & 4 deletions backwards-compat-tests/scripts/run_local.sh
@@ -13,8 +13,11 @@ trap on_exit EXIT
source backwards-compat-tests/scripts/utils.sh

configure_rw() {
echo "--- Setting up cluster config"
cat <<EOF > risedev-profiles.user.yml
VERSION="$1"

echo "--- Setting up cluster config"
if version_le "$VERSION" "1.9.0"; then
cat <<EOF > risedev-profiles.user.yml
full-without-monitoring:
steps:
- use: minio
@@ -28,6 +31,23 @@ full-without-monitoring:
address: message_queue
port: 29092
EOF
else
cat <<EOF > risedev-profiles.user.yml
full-without-monitoring:
steps:
- use: minio
- use: etcd
- use: meta-node
meta-backend: etcd
- use: compute-node
- use: frontend
- use: compactor
- use: kafka
user-managed: true
address: message_queue
port: 29092
EOF
fi

cat <<EOF > risedev-components.user.env
RISEDEV_CONFIGURED=false
@@ -58,11 +78,11 @@ main() {
set -euo pipefail
get_rw_versions
setup_old_cluster
configure_rw
configure_rw "$OLD_VERSION"
seed_old_cluster "$OLD_VERSION"

setup_new_cluster
configure_rw
configure_rw "99.99.99"
validate_new_cluster "$NEW_VERSION"
}

16 changes: 15 additions & 1 deletion ci/scripts/backwards-compat-test.sh
@@ -41,7 +41,8 @@ VERSION="$1"
ENABLE_BUILD="$2"

echo "--- Setting up cluster config"
cat <<EOF > risedev-profiles.user.yml
if version_le "$VERSION" "1.9.0"; then
cat <<EOF > risedev-profiles.user.yml
full-without-monitoring:
steps:
- use: minio
@@ -51,6 +52,19 @@ full-without-monitoring:
- use: frontend
- use: compactor
EOF
else
cat <<EOF > risedev-profiles.user.yml
full-without-monitoring:
steps:
- use: minio
- use: etcd
- use: meta-node
meta-backend: etcd
- use: compute-node
- use: frontend
- use: compactor
EOF
fi

cat <<EOF > risedev-components.user.env
RISEDEV_CONFIGURED=true
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
@@ -97,7 +97,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 21
timeout_in_minutes: 23
retry: *auto-retry

- label: "end-to-end test (parallel)"
2 changes: 1 addition & 1 deletion e2e_test/source/cdc/cdc.load.slt
@@ -161,7 +161,7 @@ create table shipments_2 (
# Test user-provided publication
statement ok
create table t1_rw (
v1 int primary key,
"V1" int primary key,
v3 varchar
) with (
connector = 'postgres-cdc',
4 changes: 2 additions & 2 deletions e2e_test/source/cdc/postgres_cdc.sql
@@ -31,8 +31,8 @@ INSERT INTO person VALUES (1001, 'peter white', 'myckhsp@xpmpe.com', '1781 2313
INSERT INTO person VALUES (1002, 'sarah spencer', 'wipvdbm@dkaap.com', '3453 4987 9481 6270', 'los angeles');

create schema abs;
create table abs.t1 (v1 int primary key, v2 double precision, v3 varchar, v4 numeric);
create publication my_publicaton for table abs.t1 (v1, v3);
create table abs.t1 ("V1" int primary key, v2 double precision, v3 varchar, v4 numeric);
create publication my_publicaton for table abs.t1 ("V1", v3);
insert into abs.t1 values (1, 1.1, 'aaa', '5431.1234');


4 changes: 2 additions & 2 deletions integration_tests/kinesis-s3-source/create_mv.sql
@@ -1,4 +1,4 @@
CREATE MATERIALIZED VIEW ad_ctr_mv AS
CREATE MATERIALIZED VIEW ad_ctr AS
SELECT
ad_clicks.ad_id AS ad_id,
ad_clicks.clicks_count :: NUMERIC / ad_impressions.impressions_count AS ctr
@@ -23,7 +23,7 @@ FROM
ai.ad_id
) AS ad_clicks ON ad_impressions.ad_id = ad_clicks.ad_id;

CREATE MATERIALIZED VIEW ad_ctr_5min_mv AS
CREATE MATERIALIZED VIEW ad_ctr_5min AS
SELECT
ac.ad_id AS ad_id,
ac.clicks_count :: NUMERIC / ai.impressions_count AS ctr,
@@ -180,8 +180,7 @@ private void validateTableSchema() throws SQLException {
var pkFields = new HashSet<String>();
while (res.next()) {
var name = res.getString(1);
// RisingWave always use lower case for column name
pkFields.add(name.toLowerCase());
pkFields.add(name);
}

if (!ValidatorUtils.isPrimaryKeyMatch(tableSchema, pkFields)) {
25 changes: 14 additions & 11 deletions src/common/metrics/src/monitor/rwlock.rs
@@ -16,31 +16,34 @@ use prometheus::HistogramVec;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

pub struct MonitoredRwLock<T> {
// labels: [lock_name, lock_type]
metrics: HistogramVec,
inner: RwLock<T>,
lock_name: &'static str,
}

impl<T> MonitoredRwLock<T> {
pub fn new(metrics: HistogramVec, val: T) -> Self {
pub fn new(metrics: HistogramVec, val: T, lock_name: &'static str) -> Self {
Self {
metrics,
inner: RwLock::new(val),
lock_name,
}
}

pub async fn read<'a, 'b>(
&'a self,
label_values: &'b [&'static str],
) -> RwLockReadGuard<'a, T> {
let _timer = self.metrics.with_label_values(label_values).start_timer();
pub async fn read(&self) -> RwLockReadGuard<'_, T> {
let _timer = self
.metrics
.with_label_values(&[self.lock_name, "read"])
.start_timer();
self.inner.read().await
}

pub async fn write<'a, 'b>(
&'a self,
label_values: &'b [&'static str],
) -> RwLockWriteGuard<'a, T> {
let _timer = self.metrics.with_label_values(label_values).start_timer();
pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
let _timer = self
.metrics
.with_label_values(&[self.lock_name, "write"])
.start_timer();
self.inner.write().await
}
}
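
For reference, a minimal usage sketch of the reworked lock (grounded in the hunk above and in the hummock_manager_lock_time metric change later in this diff; the import path, the registry setup, and the stand-in payload type are assumptions, not taken from the commit):

use prometheus::{register_histogram_vec_with_registry, Registry};
// Assumed re-export path for the type changed above.
use risingwave_common_metrics::monitor::rwlock::MonitoredRwLock;

#[derive(Default)]
struct Versioning; // stand-in payload for illustration

async fn lock_usage() {
    let registry = Registry::new();
    // Labels are now [lock_name, lock_type]; the per-call `method` label is gone.
    let metrics = register_histogram_vec_with_registry!(
        "hummock_manager_lock_time",
        "latency for hummock manager to acquire the rwlock",
        &["lock_name", "lock_type"],
        registry
    )
    .unwrap();

    // The lock name is fixed once at construction instead of being passed on every call.
    let versioning = MonitoredRwLock::new(metrics, Versioning::default(), "versioning");

    let read_guard = versioning.read().await; // timed as ["versioning", "read"]
    drop(read_guard);
    let _write_guard = versioning.write().await; // timed as ["versioning", "write"]
}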
1 change: 1 addition & 0 deletions src/common/src/config.rs
@@ -620,6 +620,7 @@ pub struct StorageConfig {

/// The threshold for the number of immutable memtables to merge to a new imm.
#[serde(default = "default::storage::imm_merge_threshold")]
#[deprecated]
pub imm_merge_threshold: usize,

/// Whether to enable write conflict detection
6 changes: 6 additions & 0 deletions src/config/ci.toml
@@ -16,6 +16,12 @@ stream_exchange_concurrent_barriers = 10
[storage]
imm_merge_threshold = 2

[storage.object_store.retry]
streaming_upload_attempt_timeout_ms = 10000
upload_retry_attempts = 5
read_attempt_timeout_ms = 16000
read_retry_attempts = 6

[system]
barrier_interval_ms = 250
checkpoint_frequency = 5
16 changes: 13 additions & 3 deletions src/frontend/src/handler/create_source.rs
@@ -503,7 +503,6 @@ pub(crate) async fn bind_columns_from_source(
fn bind_columns_from_source_for_cdc(
session: &SessionImpl,
source_schema: &ConnectorSchema,
_with_properties: &HashMap<String, String>,
) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
let format_encode_options = WithOptions::try_from(source_schema.row_options())?.into_inner();
let mut format_encode_options_to_consume = format_encode_options.clone();
@@ -1391,7 +1390,18 @@ pub async fn bind_create_source(
}
debug_assert!(is_column_ids_dedup(&columns));

let (mut columns, pk_col_ids, row_id_index) = bind_pk_on_relation(columns, pk_names, true)?;
let must_need_pk = if is_create_source {
with_properties.connector_need_pk()
} else {
// For connectors that do not need to generate a `row_id` column in the source schema, such as iceberg.
// But in that case, we cannot create an mv or table on the source because there is no pk.
assert!(with_properties.connector_need_pk());

true
};

let (mut columns, pk_col_ids, row_id_index) =
bind_pk_on_relation(columns, pk_names, must_need_pk)?;

let watermark_descs =
bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
@@ -1471,7 +1481,7 @@ pub async fn handle_create_source(
|| (with_properties.is_kafka_connector() && session.config().rw_enable_shared_source());

let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)?
bind_columns_from_source_for_cdc(&session, &source_schema)?
} else {
bind_columns_from_source(&session, &source_schema, &with_properties).await?
};
2 changes: 0 additions & 2 deletions src/meta/src/controller/catalog.rs
@@ -2994,8 +2994,6 @@ impl CatalogControllerInner {
async fn list_subscriptions(&self) -> MetaResult<Vec<PbSubscription>> {
let subscription_objs = Subscription::find()
.find_also_related(Object)
.join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
.filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
.all(&self.db)
.await?;

22 changes: 10 additions & 12 deletions src/meta/src/hummock/manager/checkpoint.rs
@@ -17,7 +17,6 @@ use std::ops::Bound::{Excluded, Included};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::Ordering;

use function_name::named;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
object_size_map, summarize_group_deltas,
};
@@ -32,7 +31,6 @@ use tracing::warn;

use crate::hummock::error::Result;
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::manager::{read_lock, write_lock};
use crate::hummock::metrics_utils::{trigger_gc_stat, trigger_split_stat};
use crate::hummock::HummockManager;

@@ -122,11 +120,10 @@ impl HummockManager {
/// Returns the diff between new and old checkpoint id.
/// Note that this method must not be called concurrently, because internally it doesn't hold
/// lock throughout the method.
#[named]
pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result<u64> {
let timer = self.metrics.version_checkpoint_latency.start_timer();
// 1. hold read lock and create new checkpoint
let versioning_guard = read_lock!(self, versioning).await;
let versioning_guard = self.versioning.read().await;
let versioning: &Versioning = versioning_guard.deref();
let current_version: &HummockVersion = &versioning.current_version;
let old_checkpoint: &HummockVersionCheckpoint = &versioning.checkpoint;
@@ -137,9 +134,10 @@ impl HummockManager {
}
if cfg!(test) && new_checkpoint_id == old_checkpoint_id {
drop(versioning_guard);
let mut versioning = write_lock!(self, versioning).await;
versioning.mark_objects_for_deletion();
let min_pinned_version_id = versioning.min_pinned_version_id();
let versioning = self.versioning.read().await;
let context_info = self.context_info.read().await;
versioning.mark_objects_for_deletion(&context_info, &self.delete_object_tracker);
let min_pinned_version_id = context_info.min_pinned_version_id();
trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
return Ok(0);
}
@@ -224,16 +222,17 @@ impl HummockManager {
}
}
// 3. hold write lock and update in memory state
let mut versioning_guard = write_lock!(self, versioning).await;
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
let context_info = self.context_info.read().await;
assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
versioning.checkpoint = new_checkpoint;
// Not delete stale objects when archive is enabled
if !self.env.opts.enable_hummock_data_archive {
versioning.mark_objects_for_deletion();
versioning.mark_objects_for_deletion(&context_info, &self.delete_object_tracker);
}

let min_pinned_version_id = versioning.min_pinned_version_id();
let min_pinned_version_id = context_info.min_pinned_version_id();
trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
trigger_split_stat(&self.metrics, &versioning.current_version);
drop(versioning_guard);
@@ -260,9 +259,8 @@ impl HummockManager {
self.pause_version_checkpoint.load(Ordering::Relaxed)
}

#[named]
pub async fn get_checkpoint_version(&self) -> HummockVersion {
let versioning_guard = read_lock!(self, versioning).await;
let versioning_guard = self.versioning.read().await;
versioning_guard.checkpoint.version.clone()
}
}
20 changes: 6 additions & 14 deletions src/meta/src/hummock/manager/compaction.rs
@@ -15,7 +15,6 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use function_name::named;
use futures::future::Shared;
use itertools::Itertools;
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
@@ -37,7 +36,7 @@ use tokio::sync::oneshot::Receiver as OneShotReceiver;
use crate::hummock::compaction::selector::level_selector::PickerInfo;
use crate::hummock::compaction::selector::DynamicLevelSelectorCore;
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
use crate::hummock::manager::{init_selectors, read_lock};
use crate::hummock::manager::init_selectors;
use crate::hummock::HummockManager;

const MAX_SKIP_TIMES: usize = 8;
@@ -54,17 +53,12 @@ pub struct Compaction {
}

impl HummockManager {
#[named]
pub async fn get_assigned_compact_task_num(&self) -> u64 {
read_lock!(self, compaction)
.await
.compact_task_assignment
.len() as u64
self.compaction.read().await.compact_task_assignment.len() as u64
}

#[named]
pub async fn list_all_tasks_ids(&self) -> Vec<HummockCompactionTaskId> {
let compaction = read_lock!(self, compaction).await;
let compaction = self.compaction.read().await;

compaction
.compaction_statuses
@@ -77,11 +71,10 @@ impl HummockManager {
.collect_vec()
}

#[named]
pub async fn list_compaction_status(
&self,
) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
let compaction = read_lock!(self, compaction).await;
let compaction = self.compaction.read().await;
(
compaction.compaction_statuses.values().map_into().collect(),
compaction
@@ -92,14 +85,13 @@ impl HummockManager {
)
}

#[named]
pub async fn get_compaction_scores(
&self,
compaction_group_id: CompactionGroupId,
) -> Vec<PickerInfo> {
let (status, levels, group) = {
let compaction = read_lock!(self, compaction).await;
let versioning = read_lock!(self, versioning).await;
let compaction = self.compaction.read().await;
let versioning = self.versioning.read().await;
let config_manager = self.compaction_group_manager.read().await;
match (
compaction.compaction_statuses.get(&compaction_group_id),
67 changes: 26 additions & 41 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
@@ -16,7 +16,6 @@ use std::collections::{BTreeMap, HashMap, HashSet};
use std::ops::DerefMut;
use std::sync::Arc;

use function_name::named;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
@@ -31,19 +30,19 @@ use risingwave_pb::hummock::group_delta::DeltaType;
use risingwave_pb::hummock::hummock_version_delta::GroupDeltas;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::ReportTask;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, GroupConstruct,
GroupDelta, GroupDestroy, GroupMetaChange,
};
use thiserror_ext::AsReport;
use tokio::sync::{OnceCell, RwLock};
use tokio::sync::OnceCell;

use super::write_lock;
use crate::hummock::compaction::compaction_config::{
validate_compaction_config, CompactionConfigBuilder,
};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, read_lock, HummockManager};
use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, HummockManager};
use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
@@ -57,7 +56,7 @@ use crate::storage::MetaStore;
impl HummockManager {
pub(super) async fn build_compaction_group_manager(
env: &MetaSrvEnv,
) -> Result<RwLock<CompactionGroupManager>> {
) -> Result<CompactionGroupManager> {
let default_config = match env.opts.compaction_config.as_ref() {
None => CompactionConfigBuilder::new().build(),
Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
@@ -68,21 +67,21 @@ impl HummockManager {
pub(super) async fn build_compaction_group_manager_with_config(
env: &MetaSrvEnv,
default_config: CompactionConfig,
) -> Result<RwLock<CompactionGroupManager>> {
let compaction_group_manager = RwLock::new(CompactionGroupManager {
) -> Result<CompactionGroupManager> {
let mut compaction_group_manager = CompactionGroupManager {
compaction_groups: BTreeMap::new(),
default_config,
write_limit: Default::default(),
meta_store_impl: env.meta_store_ref(),
});
compaction_group_manager.write().await.init().await?;
};
compaction_group_manager.init().await?;
Ok(compaction_group_manager)
}

/// Should not be called inside [`HummockManager`], because it requests locks internally.
/// The implementation acquires `versioning` lock.
#[named]
pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
get_compaction_group_ids(&read_lock!(self, versioning).await.current_version).collect_vec()
get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec()
}

/// The implementation acquires `compaction_group_manager` lock.
@@ -141,10 +140,9 @@ impl HummockManager {
/// Unregisters stale members and groups
/// The caller should ensure `table_fragments_list` remain unchanged during `purge`.
/// Currently `purge` is only called during meta service start ups.
#[named]
pub async fn purge(&self, valid_ids: &[u32]) -> Result<()> {
let registered_members =
get_member_table_ids(&read_lock!(self, versioning).await.current_version);
get_member_table_ids(&self.versioning.read().await.current_version);
let to_unregister = registered_members
.into_iter()
.filter(|table_id| !valid_ids.contains(table_id))
@@ -155,15 +153,14 @@ impl HummockManager {
}

/// The implementation acquires `versioning` lock.
#[named]
pub async fn register_table_ids(
&self,
pairs: &[(StateTableId, CompactionGroupId)],
) -> Result<()> {
if pairs.is_empty() {
return Ok(());
}
let mut versioning_guard = write_lock!(self, versioning).await;
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
let current_version = &versioning.current_version;

@@ -250,12 +247,11 @@ impl HummockManager {
Ok(())
}

#[named]
pub async fn unregister_table_ids(&self, table_ids: &[StateTableId]) -> Result<()> {
if table_ids.is_empty() {
return Ok(());
}
let mut versioning_guard = write_lock!(self, versioning).await;
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
let current_version = &versioning.current_version;
let mut new_version_delta = create_trx_wrapper!(
@@ -383,9 +379,8 @@ impl HummockManager {

/// Gets complete compaction group info.
/// It is the aggregate of `HummockVersion` and `CompactionGroupConfig`
#[named]
pub async fn list_compaction_group(&self) -> Vec<CompactionGroupInfo> {
let mut versioning_guard = write_lock!(self, versioning).await;
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
let current_version = &versioning.current_version;
let mut compaction_groups = vec![];
@@ -431,8 +426,7 @@ impl HummockManager {

/// move some table to another compaction-group. Create a new compaction group if it does not
/// exist.
/// TODO: Move table_to_partition in result to compaction group
#[named]
/// TODO: Move `table_to_partition` in result to compaction group
pub async fn move_state_table_to_compaction_group(
&self,
parent_group_id: CompactionGroupId,
@@ -444,8 +438,8 @@ impl HummockManager {
return Ok((parent_group_id, table_to_partition));
}
let table_ids = table_ids.iter().cloned().unique().collect_vec();
let compaction_guard = write_lock!(self, compaction).await;
let mut versioning_guard = write_lock!(self, versioning).await;
let compaction_guard = self.compaction.write().await;
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
let current_version = &versioning.current_version;
// Validate parameters.
@@ -604,11 +598,10 @@ impl HummockManager {
Ok((target_compaction_group_id, table_to_partition))
}

#[named]
pub async fn calculate_compaction_group_statistic(&self) -> Vec<TableGroupInfo> {
let mut infos = vec![];
{
let versioning_guard = read_lock!(self, versioning).await;
let versioning_guard = self.versioning.read().await;
let version = &versioning_guard.current_version;
for (group_id, group) in &version.levels {
let mut group_info = TableGroupInfo {
@@ -649,6 +642,8 @@ impl HummockManager {
pub(super) struct CompactionGroupManager {
compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
default_config: CompactionConfig,
/// Tables that the write limit is triggered for.
pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
meta_store_impl: MetaStoreImpl,
}

@@ -871,38 +866,30 @@ mod tests {
#[tokio::test]
async fn test_inner() {
let (env, ..) = setup_compute_env(8080).await;
let inner = HummockManager::build_compaction_group_manager(&env)
let mut inner = HummockManager::build_compaction_group_manager(&env)
.await
.unwrap();
assert_eq!(inner.read().await.compaction_groups.len(), 2);
assert_eq!(inner.compaction_groups.len(), 2);
inner
.write()
.await
.update_compaction_config(&[100, 200], &[])
.await
.unwrap_err();
inner
.write()
.await
.get_or_insert_compaction_group_configs(&[100, 200])
.await
.unwrap();
assert_eq!(inner.read().await.compaction_groups.len(), 4);
let inner = HummockManager::build_compaction_group_manager(&env)
assert_eq!(inner.compaction_groups.len(), 4);
let mut inner = HummockManager::build_compaction_group_manager(&env)
.await
.unwrap();
assert_eq!(inner.read().await.compaction_groups.len(), 4);
assert_eq!(inner.compaction_groups.len(), 4);
inner
.write()
.await
.update_compaction_config(&[100, 200], &[MutableConfig::MaxSubCompaction(123)])
.await
.unwrap();
assert_eq!(inner.read().await.compaction_groups.len(), 4);
assert_eq!(inner.compaction_groups.len(), 4);
assert_eq!(
inner
.read()
.await
.try_get_compaction_group_config(100)
.unwrap()
.compaction_config
@@ -911,8 +898,6 @@ mod tests {
);
assert_eq!(
inner
.read()
.await
.try_get_compaction_group_config(200)
.unwrap()
.compaction_config
110 changes: 77 additions & 33 deletions src/meta/src/hummock/manager/context.rs
@@ -12,35 +12,41 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::ops::DerefMut;
use std::collections::{BTreeMap, HashMap, HashSet};

use fail::fail_point;
use function_name::named;
use itertools::Itertools;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{
ExtendedSstableInfo, HummockContextId, HummockEpoch, HummockSstableObjectId,
ExtendedSstableInfo, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId,
};
use risingwave_pb::hummock::ValidationTask;
use risingwave_pb::hummock::{HummockPinnedSnapshot, HummockPinnedVersion, ValidationTask};

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::{
commit_multi_var, create_trx_wrapper, read_lock, start_measure_real_process_timer, write_lock,
commit_multi_var, create_trx_wrapper, start_measure_real_process_timer,
};
use crate::hummock::HummockManager;
use crate::manager::META_NODE_ID;
use crate::manager::{MetaStoreImpl, MetadataManager, META_NODE_ID};
use crate::model::{BTreeMapTransaction, BTreeMapTransactionWrapper, ValTransaction};
use crate::storage::MetaStore;

impl HummockManager {
#[derive(Default)]
pub(super) struct ContextInfo {
pub pinned_versions: BTreeMap<HummockContextId, HummockPinnedVersion>,
pub pinned_snapshots: BTreeMap<HummockContextId, HummockPinnedSnapshot>,
/// `version_safe_points` is similar to `pinned_versions` except for being a transient state.
pub version_safe_points: Vec<HummockVersionId>,
}

impl ContextInfo {
/// Release resources pinned by these contexts, including:
/// - Version
/// - Snapshot
#[named]
pub async fn release_contexts(
&self,
async fn release_contexts(
&mut self,
context_ids: impl AsRef<[HummockContextId]>,
meta_store_ref: MetaStoreImpl,
) -> Result<()> {
fail_point!("release_contexts_metastore_err", |_| Err(Error::MetaStore(
anyhow::anyhow!("failpoint metastore error")
@@ -49,70 +55,102 @@ impl HummockManager {
anyhow::anyhow!("failpoint internal error")
)));

let mut versioning_guard = write_lock!(self, versioning).await;
let versioning = versioning_guard.deref_mut();
let mut pinned_versions = create_trx_wrapper!(
self.meta_store_ref(),
meta_store_ref,
BTreeMapTransactionWrapper,
BTreeMapTransaction::new(&mut versioning.pinned_versions,)
BTreeMapTransaction::new(&mut self.pinned_versions,)
);
let mut pinned_snapshots = create_trx_wrapper!(
self.meta_store_ref(),
meta_store_ref,
BTreeMapTransactionWrapper,
BTreeMapTransaction::new(&mut versioning.pinned_snapshots,)
BTreeMapTransaction::new(&mut self.pinned_snapshots,)
);
for context_id in context_ids.as_ref() {
pinned_versions.remove(*context_id);
pinned_snapshots.remove(*context_id);
}
commit_multi_var!(self.meta_store_ref(), pinned_versions, pinned_snapshots)?;
commit_multi_var!(meta_store_ref, pinned_versions, pinned_snapshots)?;

Ok(())
}
}

impl HummockManager {
pub async fn release_contexts(
&self,
context_ids: impl AsRef<[HummockContextId]>,
) -> Result<()> {
let mut context_info = self.context_info.write().await;
context_info
.release_contexts(context_ids, self.meta_store_ref())
.await?;
#[cfg(test)]
{
drop(versioning_guard);
drop(context_info);
self.check_state_consistency().await;
}

Ok(())
}

/// Checks whether `context_id` is valid.
pub async fn check_context(&self, context_id: HummockContextId) -> Result<bool> {
Ok(self
.metadata_manager()
self.context_info
.read()
.await
.check_context(context_id, &self.metadata_manager)
.await
}
}

impl ContextInfo {
/// Checks whether `context_id` is valid.
///
/// Need `&self` to sync with `release_context`
pub(super) async fn check_context(
&self,
context_id: HummockContextId,
metadata_manager: &MetadataManager,
) -> Result<bool> {
Ok(metadata_manager
.get_worker_by_id(context_id)
.await
.map_err(|err| Error::MetaStore(err.into()))?
.is_some())
}
}

impl HummockManager {
/// Release invalid contexts, aka worker node ids which are no longer valid in `ClusterManager`.
#[named]
pub(super) async fn release_invalid_contexts(&self) -> Result<Vec<HummockContextId>> {
let active_context_ids = {
let compaction_guard = read_lock!(self, compaction).await;
let versioning_guard = read_lock!(self, versioning).await;
let _timer = start_measure_real_process_timer!(self);
let (active_context_ids, mut context_info) = {
let compaction_guard = self.compaction.read().await;
let context_info = self.context_info.write().await;
let _timer = start_measure_real_process_timer!(self, "release_invalid_contexts");
let mut active_context_ids = HashSet::new();
active_context_ids.extend(
compaction_guard
.compact_task_assignment
.values()
.map(|c| c.context_id),
);
active_context_ids.extend(versioning_guard.pinned_versions.keys());
active_context_ids.extend(versioning_guard.pinned_snapshots.keys());
active_context_ids
active_context_ids.extend(context_info.pinned_versions.keys());
active_context_ids.extend(context_info.pinned_snapshots.keys());
(active_context_ids, context_info)
};

let mut invalid_context_ids = vec![];
for active_context_id in &active_context_ids {
if !self.check_context(*active_context_id).await? {
if !context_info
.check_context(*active_context_id, &self.metadata_manager)
.await?
{
invalid_context_ids.push(*active_context_id);
}
}

self.release_contexts(&invalid_context_ids).await?;
context_info
.release_contexts(&invalid_context_ids, self.meta_store_ref())
.await?;

Ok(invalid_context_ids)
}
@@ -133,7 +171,13 @@ impl HummockManager {
continue;
}
}
if !self.check_context(*context_id).await? {
if !self
.context_info
.read()
.await
.check_context(*context_id, &self.metadata_manager)
.await?
{
return Err(Error::InvalidSst(*sst_id));
}
}
69 changes: 45 additions & 24 deletions src/meta/src/hummock/manager/gc.rs
@@ -19,42 +19,64 @@ use std::ops::DerefMut;
use std::time::Duration;

use anyhow::Context;
use function_name::named;
use futures::{stream, StreamExt};
use itertools::Itertools;
use parking_lot::Mutex;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::WorkerType;
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::FullScanTask;

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, read_lock, write_lock};
use crate::hummock::manager::{commit_multi_var, create_trx_wrapper};
use crate::hummock::HummockManager;
use crate::manager::MetadataManager;
use crate::model::{BTreeMapTransaction, BTreeMapTransactionWrapper, ValTransaction};
use crate::storage::MetaStore;

#[derive(Default)]
pub(super) struct DeleteObjectTracker {
/// Objects that waits to be deleted from object store. It comes from either compaction, or
/// full GC (listing object store).
objects_to_delete: Mutex<HashSet<HummockSstableObjectId>>,
}

impl DeleteObjectTracker {
pub(super) fn add(&self, objects: impl Iterator<Item = HummockSstableObjectId>) {
self.objects_to_delete.lock().extend(objects)
}

pub(super) fn current(&self) -> HashSet<HummockSstableObjectId> {
self.objects_to_delete.lock().clone()
}

pub(super) fn clear(&self) {
self.objects_to_delete.lock().clear();
}

pub(super) fn ack<'a>(&self, objects: impl Iterator<Item = &'a HummockSstableObjectId>) {
let mut lock = self.objects_to_delete.lock();
for object in objects {
lock.remove(object);
}
}
}

impl HummockManager {
/// Gets SST objects that is safe to be deleted from object store.
#[named]
pub async fn get_objects_to_delete(&self) -> Vec<HummockSstableObjectId> {
read_lock!(self, versioning)
.await
.objects_to_delete
pub fn get_objects_to_delete(&self) -> Vec<HummockSstableObjectId> {
self.delete_object_tracker
.current()
.iter()
.cloned()
.collect_vec()
}

/// Acknowledges SSTs have been deleted from object store.
#[named]
pub async fn ack_deleted_objects(&self, object_ids: &[HummockSstableObjectId]) -> Result<()> {
let mut versioning_guard = write_lock!(self, versioning).await;
for object_id in object_ids {
versioning_guard.objects_to_delete.remove(object_id);
}
self.delete_object_tracker.ack(object_ids.iter());
let mut versioning_guard = self.versioning.write().await;
for stale_objects in versioning_guard.checkpoint.stale_objects.values_mut() {
stale_objects.id.retain(|id| !object_ids.contains(id));
}
@@ -69,18 +91,18 @@ impl HummockManager {
/// Deletes at most `batch_size` deltas.
///
/// Returns (number of deleted deltas, number of remain `deltas_to_delete`).
#[named]
pub async fn delete_version_deltas(&self, batch_size: usize) -> Result<(usize, usize)> {
let mut versioning_guard = write_lock!(self, versioning).await;
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
let context_info = self.context_info.read().await;
let deltas_to_delete = versioning
.hummock_version_deltas
.range(..=versioning.checkpoint.version.id)
.map(|(k, _)| *k)
.collect_vec();
// If there is any safe point, skip this to ensure meta backup has required delta logs to
// replay version.
if !versioning.version_safe_points.is_empty() {
if !context_info.version_safe_points.is_empty() {
return Ok((0, deltas_to_delete.len()));
}
let mut hummock_version_deltas = create_trx_wrapper!(
@@ -102,6 +124,7 @@ impl HummockManager {
commit_multi_var!(self.meta_store_ref(), hummock_version_deltas)?;
#[cfg(test)]
{
drop(context_info);
drop(versioning_guard);
self.check_state_consistency().await;
}
@@ -110,16 +133,15 @@ impl HummockManager {

/// Extends `objects_to_delete` according to object store full scan result.
/// Caller should ensure `object_ids` doesn't include any SST objects belong to a on-going
/// version write. That's to say, these object_ids won't appear in either `commit_epoch` or
/// version write. That's to say, these `object_ids` won't appear in either `commit_epoch` or
/// `report_compact_task`.
#[named]
pub async fn extend_objects_to_delete_from_scan(
&self,
object_ids: &[HummockSstableObjectId],
) -> usize {
let tracked_object_ids: HashSet<HummockSstableObjectId> = {
let versioning_guard = read_lock!(self, versioning).await;
let versioning: &Versioning = &versioning_guard;
let versioning = self.versioning.read().await;
let context_info = self.context_info.read().await;

// object ids in checkpoint version
let mut tracked_object_ids = versioning.checkpoint.version.get_object_ids();
@@ -131,7 +153,7 @@ impl HummockManager {
tracked_object_ids.extend(delta.newly_added_object_ids());
}
// add stale object ids before the checkpoint version
let min_pinned_version_id = versioning.min_pinned_version_id();
let min_pinned_version_id = context_info.min_pinned_version_id();
tracked_object_ids.extend(
versioning
.checkpoint
@@ -147,9 +169,8 @@ impl HummockManager {
.iter()
.filter(|object_id| !tracked_object_ids.contains(object_id))
.collect_vec();
let mut versioning_guard = write_lock!(self, versioning).await;
versioning_guard.objects_to_delete.extend(to_delete.clone());
drop(versioning_guard);
self.delete_object_tracker
.add(to_delete.iter().map(|id| **id));
to_delete.len()
}
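
A minimal sketch of how the new tracker is meant to be driven, based only on the four methods shown above (the driver function itself is illustrative and not part of the commit; HummockSstableObjectId is treated as the plain u64 alias defined in risingwave_hummock_sdk):

use std::collections::HashSet;

fn drive_tracker(tracker: &DeleteObjectTracker) {
    // Stage objects reported by compaction or by a full object-store scan.
    tracker.add([1u64, 2, 3].into_iter());

    // Vacuum takes a snapshot of the staged set; only the internal
    // parking_lot::Mutex is held, no async lock is needed anymore.
    let staged: HashSet<u64> = tracker.current();

    // Once the object store confirms deletion, acknowledge the ids so they
    // are dropped from the staged set.
    tracker.ack(staged.iter());

    // On a state reset, discard anything still staged.
    tracker.clear();
}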

338 changes: 165 additions & 173 deletions src/meta/src/hummock/manager/mod.rs

Large diffs are not rendered by default.
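
Since this diff is collapsed, the following is an inferred sketch of the HummockManager field layout that the rendered hunks call into (field names are taken from the call sites such as self.versioning, self.compaction, self.context_info, self.delete_object_tracker and self.compaction_group_manager; the exact definition inside the commit may differ):

pub struct HummockManager {
    // Every former read_lock!(self, x) / write_lock!(self, x) macro call becomes a
    // plain self.x.read().await / self.x.write().await; the lock name used for the
    // hummock_manager_lock_time metric is baked into the MonitoredRwLock at construction.
    versioning: MonitoredRwLock<Versioning>,
    compaction: MonitoredRwLock<Compaction>,
    // Pinned versions, pinned snapshots and version safe points move out of
    // Versioning into the new ContextInfo, guarded by its own lock.
    context_info: MonitoredRwLock<ContextInfo>,
    // build_compaction_group_manager() no longer wraps its result in an RwLock;
    // the lock (monitored, or possibly a plain tokio RwLock) now lives here, and
    // the struct also owns write_limit.
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    // objects_to_delete moves out of Versioning into a tracker with an internal
    // parking_lot::Mutex, so reading it no longer requires an async lock.
    delete_object_tracker: DeleteObjectTracker,
    // ...remaining fields (env, metrics, metadata_manager, event_sender, ...) as before.
}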

20 changes: 10 additions & 10 deletions src/meta/src/hummock/manager/tests.rs
@@ -588,7 +588,7 @@ async fn test_hummock_manager_basic() {
);
}
// objects_to_delete is always empty because no compaction is ever invoked.
assert!(hummock_manager.get_objects_to_delete().await.is_empty());
assert!(hummock_manager.get_objects_to_delete().is_empty());
assert_eq!(
hummock_manager
.delete_version_deltas(usize::MAX)
@@ -600,7 +600,7 @@ async fn test_hummock_manager_basic() {
hummock_manager.create_version_checkpoint(1).await.unwrap(),
commit_log_count + register_log_count
);
assert!(hummock_manager.get_objects_to_delete().await.is_empty());
assert!(hummock_manager.get_objects_to_delete().is_empty());
assert_eq!(
hummock_manager
.delete_version_deltas(usize::MAX)
@@ -1125,15 +1125,15 @@ async fn test_extend_objects_to_delete() {
.map(|s| s.get_object_id())
.chain(max_committed_object_id + 1..=max_committed_object_id + orphan_sst_num)
.collect_vec();
assert!(hummock_manager.get_objects_to_delete().await.is_empty());
assert!(hummock_manager.get_objects_to_delete().is_empty());
assert_eq!(
hummock_manager
.extend_objects_to_delete_from_scan(&all_object_ids)
.await,
orphan_sst_num as usize
);
assert_eq!(
hummock_manager.get_objects_to_delete().await.len(),
hummock_manager.get_objects_to_delete().len(),
orphan_sst_num as usize
);

@@ -1143,7 +1143,7 @@ async fn test_extend_objects_to_delete() {
6
);
assert_eq!(
hummock_manager.get_objects_to_delete().await.len(),
hummock_manager.get_objects_to_delete().len(),
orphan_sst_num as usize
);
// since version1 is still pinned, the sst removed in compaction can not be reclaimed.
@@ -1153,10 +1153,10 @@ async fn test_extend_objects_to_delete() {
.await,
orphan_sst_num as usize
);
let objects_to_delete = hummock_manager.get_objects_to_delete().await;
let objects_to_delete = hummock_manager.get_objects_to_delete();
assert_eq!(objects_to_delete.len(), orphan_sst_num as usize);
let pinned_version2: HummockVersion = hummock_manager.pin_version(context_id).await.unwrap();
let objects_to_delete = hummock_manager.get_objects_to_delete().await;
let objects_to_delete = hummock_manager.get_objects_to_delete();
assert_eq!(
objects_to_delete.len(),
orphan_sst_num as usize,
@@ -1167,7 +1167,7 @@ async fn test_extend_objects_to_delete() {
.unpin_version_before(context_id, pinned_version2.id)
.await
.unwrap();
let objects_to_delete = hummock_manager.get_objects_to_delete().await;
let objects_to_delete = hummock_manager.get_objects_to_delete();
assert_eq!(
objects_to_delete.len(),
orphan_sst_num as usize,
@@ -1182,7 +1182,7 @@ async fn test_extend_objects_to_delete() {
.await,
orphan_sst_num as usize
);
let objects_to_delete = hummock_manager.get_objects_to_delete().await;
let objects_to_delete = hummock_manager.get_objects_to_delete();
assert_eq!(objects_to_delete.len(), orphan_sst_num as usize);
let new_epoch = pinned_version2.max_committed_epoch.next_epoch();
hummock_manager
@@ -1206,7 +1206,7 @@ async fn test_extend_objects_to_delete() {
.await,
orphan_sst_num as usize + 3
);
let objects_to_delete = hummock_manager.get_objects_to_delete().await;
let objects_to_delete = hummock_manager.get_objects_to_delete();
assert_eq!(objects_to_delete.len(), orphan_sst_num as usize + 3);
}

105 changes: 49 additions & 56 deletions src/meta/src/hummock/manager/versioning.rs
@@ -15,7 +15,6 @@
use std::cmp;
use std::collections::{BTreeMap, HashMap, HashSet};

use function_name::named;
use itertools::Itertools;
use risingwave_common::util::epoch::INVALID_EPOCH;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
@@ -38,8 +37,10 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation};
use super::check_cg_write_limit;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::gc::DeleteObjectTracker;
use crate::hummock::manager::worker::{HummockManagerEvent, HummockManagerEventSender};
use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, read_lock, write_lock};
use crate::hummock::manager::{commit_multi_var, create_trx_wrapper};
use crate::hummock::metrics_utils::{
trigger_safepoint_stat, trigger_write_stop_stats, LocalTableMetrics,
};
@@ -71,30 +72,21 @@ impl Drop for HummockVersionSafePoint {
#[derive(Default)]
pub struct Versioning {
// Volatile states below
/// Avoide commit epoch epochs
/// Avoid commit epoch epochs
/// Don't persist compaction version delta to meta store
pub disable_commit_epochs: bool,
/// Latest hummock version
pub current_version: HummockVersion,
/// Objects that waits to be deleted from object store. It comes from either compaction, or
/// full GC (listing object store).
pub objects_to_delete: HashSet<HummockSstableObjectId>,
/// `version_safe_points` is similar to `pinned_versions` expect for being a transient state.
pub version_safe_points: Vec<HummockVersionId>,
/// Tables that write limit is trigger for.
pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
pub local_metrics: HashMap<u32, LocalTableMetrics>,

// Persistent states below
pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>,
pub pinned_versions: BTreeMap<HummockContextId, HummockPinnedVersion>,
pub pinned_snapshots: BTreeMap<HummockContextId, HummockPinnedSnapshot>,
/// Stats for latest hummock version.
pub version_stats: HummockVersionStats,
pub checkpoint: HummockVersionCheckpoint,
pub local_metrics: HashMap<u32, LocalTableMetrics>,
}

impl Versioning {
impl ContextInfo {
pub fn min_pinned_version_id(&self) -> HummockVersionId {
let mut min_pinned_version_id = HummockVersionId::MAX;
for id in self
@@ -107,34 +99,40 @@ impl Versioning {
}
min_pinned_version_id
}
}

impl Versioning {
/// Marks all objects <= `min_pinned_version_id` for deletion.
pub(super) fn mark_objects_for_deletion(&mut self) {
let min_pinned_version_id = self.min_pinned_version_id();
self.objects_to_delete.extend(
pub(super) fn mark_objects_for_deletion(
&self,
context_info: &ContextInfo,
delete_object_tracker: &DeleteObjectTracker,
) {
let min_pinned_version_id = context_info.min_pinned_version_id();
delete_object_tracker.add(
self.checkpoint
.stale_objects
.iter()
.filter(|(version_id, _)| **version_id <= min_pinned_version_id)
.flat_map(|(_, stale_objects)| stale_objects.id.clone()),
.flat_map(|(_, stale_objects)| stale_objects.id.iter().cloned()),
);
}
}

impl HummockManager {
#[named]
pub async fn list_pinned_version(&self) -> Vec<HummockPinnedVersion> {
read_lock!(self, versioning)
self.context_info
.read()
.await
.pinned_versions
.values()
.cloned()
.collect_vec()
}

#[named]
pub async fn list_pinned_snapshot(&self) -> Vec<HummockPinnedSnapshot> {
read_lock!(self, versioning)
self.context_info
.read()
.await
.pinned_snapshots
.values()
@@ -159,26 +157,24 @@ impl HummockManager {
Ok(workers)
}

#[named]
pub async fn get_version_stats(&self) -> HummockVersionStats {
read_lock!(self, versioning).await.version_stats.clone()
self.versioning.read().await.version_stats.clone()
}

#[named]
pub async fn register_safe_point(&self) -> HummockVersionSafePoint {
let mut wl = write_lock!(self, versioning).await;
let versioning = self.versioning.read().await;
let mut wl = self.context_info.write().await;
let safe_point = HummockVersionSafePoint {
id: wl.current_version.id,
id: versioning.current_version.id,
event_sender: self.event_sender.clone(),
};
wl.version_safe_points.push(safe_point.id);
trigger_safepoint_stat(&self.metrics, &wl.version_safe_points);
safe_point
}

#[named]
pub async fn unregister_safe_point(&self, safe_point: HummockVersionId) {
let mut wl = write_lock!(self, versioning).await;
let mut wl = self.context_info.write().await;
let version_safe_points = &mut wl.version_safe_points;
if let Some(pos) = version_safe_points.iter().position(|sp| *sp == safe_point) {
version_safe_points.remove(pos);
@@ -189,64 +185,60 @@ impl HummockManager {
/// Updates write limits for `target_groups` and sends notification.
/// Returns true if `write_limit` has been modified.
/// The implementation acquires `versioning` lock and `compaction_group_manager` lock.
#[named]
pub(super) async fn try_update_write_limits(
&self,
target_group_ids: &[CompactionGroupId],
) -> bool {
let mut guard = write_lock!(self, versioning).await;
let config_mgr = self.compaction_group_manager.read().await;
let versioning = self.versioning.read().await;
let mut cg_manager = self.compaction_group_manager.write().await;
let target_group_configs = target_group_ids
.iter()
.filter_map(|id| {
config_mgr
cg_manager
.try_get_compaction_group_config(*id)
.map(|config| (*id, config))
})
.collect();
let mut new_write_limits = calc_new_write_limits(
target_group_configs,
guard.write_limit.clone(),
&guard.current_version,
cg_manager.write_limit.clone(),
&versioning.current_version,
);
let all_group_ids: HashSet<_> =
HashSet::from_iter(get_compaction_group_ids(&guard.current_version));
HashSet::from_iter(get_compaction_group_ids(&versioning.current_version));
new_write_limits.retain(|group_id, _| all_group_ids.contains(group_id));
if new_write_limits == guard.write_limit {
if new_write_limits == cg_manager.write_limit {
return false;
}
tracing::debug!("Hummock stopped write is updated: {:#?}", new_write_limits);
trigger_write_stop_stats(&self.metrics, &new_write_limits);
guard.write_limit = new_write_limits;
cg_manager.write_limit = new_write_limits;
self.env
.notification_manager()
.notify_hummock_without_version(
Operation::Add,
Info::HummockWriteLimits(risingwave_pb::hummock::WriteLimits {
write_limits: guard.write_limit.clone(),
write_limits: cg_manager.write_limit.clone(),
}),
);
true
}

/// Gets write limits.
/// The implementation acquires `versioning` lock.
#[named]
pub async fn write_limits(&self) -> HashMap<CompactionGroupId, WriteLimit> {
let guard = read_lock!(self, versioning).await;
let guard = self.compaction_group_manager.read().await;
guard.write_limit.clone()
}

#[named]
pub async fn list_branched_objects(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
let guard = read_lock!(self, versioning).await;
let guard = self.versioning.read().await;
guard.current_version.build_branched_sst_info()
}

#[named]
pub async fn rebuild_table_stats(&self) -> Result<()> {
use crate::model::ValTransaction;
let mut versioning = write_lock!(self, versioning).await;
let mut versioning = self.versioning.write().await;
let new_stats = rebuild_table_stats(&versioning.current_version);
let mut version_stats = create_trx_wrapper!(
self.meta_store_ref(),
@@ -370,29 +362,30 @@ mod tests {
};

use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::versioning::{
calc_new_write_limits, estimate_table_stats, rebuild_table_stats, Versioning,
calc_new_write_limits, estimate_table_stats, rebuild_table_stats,
};
use crate::hummock::model::CompactionGroup;

#[test]
fn test_min_pinned_version_id() {
let mut versioning = Versioning::default();
assert_eq!(versioning.min_pinned_version_id(), HummockVersionId::MAX);
versioning.pinned_versions.insert(
let mut context_info = ContextInfo::default();
assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX);
context_info.pinned_versions.insert(
1,
HummockPinnedVersion {
context_id: 1,
min_pinned_id: 10,
},
);
assert_eq!(versioning.min_pinned_version_id(), 10);
versioning.version_safe_points.push(5);
assert_eq!(versioning.min_pinned_version_id(), 5);
versioning.version_safe_points.clear();
assert_eq!(versioning.min_pinned_version_id(), 10);
versioning.pinned_versions.clear();
assert_eq!(versioning.min_pinned_version_id(), HummockVersionId::MAX);
assert_eq!(context_info.min_pinned_version_id(), 10);
context_info.version_safe_points.push(5);
assert_eq!(context_info.min_pinned_version_id(), 5);
context_info.version_safe_points.clear();
assert_eq!(context_info.min_pinned_version_id(), 10);
context_info.pinned_versions.clear();
assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX);
}

#[test]
6 changes: 3 additions & 3 deletions src/meta/src/hummock/vacuum.rs
@@ -95,7 +95,7 @@ impl VacuumManager {
pending_object_ids
} else {
// 2. If no pending SST objects, then fetch new ones.
let mut objects_to_delete = self.hummock_manager.get_objects_to_delete().await;
let mut objects_to_delete = self.hummock_manager.get_objects_to_delete();
self.filter_out_pinned_ssts(&mut objects_to_delete).await?;
if objects_to_delete.is_empty() {
return Ok(vec![]);
@@ -237,13 +237,13 @@ mod tests {
assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 6);
assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 0);

assert!(hummock_manager.get_objects_to_delete().await.is_empty());
assert!(hummock_manager.get_objects_to_delete().is_empty());
hummock_manager
.unpin_version_before(context_id, HummockVersionId::MAX)
.await
.unwrap();
hummock_manager.create_version_checkpoint(0).await.unwrap();
assert!(!hummock_manager.get_objects_to_delete().await.is_empty());
assert!(!hummock_manager.get_objects_to_delete().is_empty());
// No SST deletion is scheduled because no available worker.
assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 0);
let _receiver = compactor_manager.add_compactor(context_id);
2 changes: 1 addition & 1 deletion src/meta/src/rpc/metrics.rs
@@ -449,7 +449,7 @@ impl MetaMetrics {
let hummock_manager_lock_time = register_histogram_vec_with_registry!(
"hummock_manager_lock_time",
"latency for hummock manager to acquire the rwlock",
&["method", "lock_name", "lock_type"],
&["lock_name", "lock_type"],
registry
)
.unwrap();
12 changes: 10 additions & 2 deletions src/storage/hummock_test/src/hummock_read_version_tests.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::ops::Bound;
use std::sync::Arc;

@@ -26,6 +27,7 @@ use risingwave_hummock_sdk::key::{key_with_epoch, map_table_key_range};
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_meta::hummock::test_utils::setup_compute_env;
use risingwave_pb::hummock::{KeyRange, SstableInfo};
use risingwave_storage::hummock::event_handler::TEST_LOCAL_INSTANCE_ID;
use risingwave_storage::hummock::iterator::test_utils::{
iterator_test_table_key_of, iterator_test_user_key_of,
};
@@ -48,7 +50,12 @@ async fn test_read_version_basic() {
let mut epoch = test_epoch(1);
let table_id = 0;
let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT));
let mut read_version = HummockReadVersion::new(TableId::from(table_id), pinned_version, vnodes);
let mut read_version = HummockReadVersion::new(
TableId::from(table_id),
TEST_LOCAL_INSTANCE_ID,
pinned_version,
vnodes,
);

{
// single imm
@@ -178,7 +185,7 @@ async fn test_read_version_basic() {
],
vec![],
epoch_id_vec_for_clear,
batch_id_vec_for_clear,
HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, batch_id_vec_for_clear)]),
1,
));

@@ -267,6 +274,7 @@ async fn test_read_filter_basic() {
let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT));
let read_version = Arc::new(RwLock::new(HummockReadVersion::new(
TableId::from(table_id),
TEST_LOCAL_INSTANCE_ID,
pinned_version,
vnodes.clone(),
)));
2 changes: 1 addition & 1 deletion src/storage/src/hummock/compactor/mod.rs
@@ -510,7 +510,7 @@ pub fn start_compactor(
&request_sender,
);

continue 'start_stream;
continue 'consume_stream;
}

running_task_parallelism
352 changes: 102 additions & 250 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/storage/src/hummock/event_handler/mod.rs
@@ -170,6 +170,7 @@ impl std::fmt::Debug for HummockEvent {
}

pub type LocalInstanceId = u64;
pub const TEST_LOCAL_INSTANCE_ID: LocalInstanceId = 233;
pub type HummockReadVersionRef = Arc<RwLock<HummockReadVersion>>;
pub type ReadVersionMappingType = HashMap<TableId, HashMap<LocalInstanceId, HummockReadVersionRef>>;
pub type ReadOnlyReadVersionMapping = ReadOnlyRwLockRef<ReadVersionMappingType>;
698 changes: 113 additions & 585 deletions src/storage/src/hummock/event_handler/uploader.rs

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
@@ -578,10 +578,11 @@ impl SharedBufferBatch {
) -> Self {
let inner =
SharedBufferBatchInner::new(epoch, spill_offset, sorted_items, None, size, None);
use crate::hummock::event_handler::TEST_LOCAL_INSTANCE_ID;
SharedBufferBatch {
inner: Arc::new(inner),
table_id,
instance_id: LocalInstanceId::default(),
instance_id: TEST_LOCAL_INSTANCE_ID,
}
}
}
204 changes: 44 additions & 160 deletions src/storage/src/hummock/store/version.rs
@@ -14,7 +14,7 @@

use std::cmp::Ordering;
use std::collections::vec_deque::VecDeque;
use std::collections::HashSet;
use std::collections::HashMap;
use std::iter::once;
use std::ops::Bound::Included;
use std::sync::Arc;
@@ -39,17 +39,19 @@ use risingwave_hummock_sdk::version::HummockVersionDelta;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::{EpochNewChangeLog, LevelType, SstableInfo};
use sync_point::sync_point;
use tracing::warn;

use super::StagingDataIterator;
use crate::error::StorageResult;
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{ConcatIterator, HummockIteratorUnion, MergeIterator, UserIterator};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::sstable::SstableIteratorReadOptions;
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::utils::{
check_subset_preserve_order, filter_single_sst, prune_nonoverlapping_ssts,
prune_overlapping_ssts, range_overlap, search_sst_idx,
filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
search_sst_idx,
};
use crate::hummock::{
get_from_batch, get_from_sstable_info, hit_sstable_bloom_filter, HummockError, HummockResult,
@@ -77,7 +79,8 @@ pub struct StagingSstableInfo {
/// Epochs whose data are included in the Sstable. The newer epoch comes first.
/// The field must not be empty.
epochs: Vec<HummockEpoch>,
imm_ids: Vec<ImmId>,
// newer data at the front
imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
imm_size: usize,
}

@@ -86,7 +89,7 @@ impl StagingSstableInfo {
sstable_infos: Vec<LocalSstableInfo>,
old_value_sstable_infos: Vec<LocalSstableInfo>,
epochs: Vec<HummockEpoch>,
imm_ids: Vec<ImmId>,
imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
imm_size: usize,
) -> Self {
// the epochs are sorted from higher epoch to lower epoch
@@ -116,15 +119,14 @@ impl StagingSstableInfo {
&self.epochs
}

pub fn imm_ids(&self) -> &Vec<ImmId> {
pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
&self.imm_ids
}
}

#[derive(Clone)]
pub enum StagingData {
ImmMem(ImmutableMemtable),
MergedImmMem(ImmutableMemtable, Vec<ImmId>),
Sst(Arc<StagingSstableInfo>),
}

@@ -208,6 +210,7 @@ impl StagingVersion {
/// A container of information required for reading from hummock.
pub struct HummockReadVersion {
table_id: TableId,
instance_id: LocalInstanceId,

/// Local version for staging data.
staging: StagingVersion,
@@ -231,6 +234,7 @@ pub struct HummockReadVersion {
impl HummockReadVersion {
pub fn new_with_replication_option(
table_id: TableId,
instance_id: LocalInstanceId,
committed_version: CommittedVersion,
is_replicated: bool,
vnodes: Arc<Bitmap>,
@@ -241,6 +245,7 @@ impl HummockReadVersion {
assert!(committed_version.is_valid());
Self {
table_id,
instance_id,
table_watermarks: committed_version
.version()
.table_watermarks
@@ -265,10 +270,11 @@ impl HummockReadVersion {

pub fn new(
table_id: TableId,
instance_id: LocalInstanceId,
committed_version: CommittedVersion,
vnodes: Arc<Bitmap>,
) -> Self {
Self::new_with_replication_option(table_id, committed_version, false, vnodes)
Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
}

pub fn table_id(&self) -> TableId {
@@ -297,77 +303,41 @@ impl HummockReadVersion {

self.staging.imm.push_front(imm)
}
StagingData::MergedImmMem(merged_imm, imm_ids) => {
self.add_merged_imm(merged_imm, imm_ids);
}
StagingData::Sst(staging_sst_ref) => {
// The following properties must be ensured:
// 1) self.staging.imm is sorted by imm id descendingly
// 2) staging_sst.imm_ids preserves the imm id partial
// ordering of the participating read version imms. Example:
// If staging_sst contains two read versions r1: [i1, i3] and r2: [i2, i4],
// then [i2, i1, i3, i4] is valid while [i3, i1, i2, i4] is invalid.
// 3) The intersection between staging_sst.imm_ids and self.staging.imm
// are always the suffix of self.staging.imm

// Check 1)
debug_assert!(self
.staging
.imm
.iter()
.rev()
.is_sorted_by_key(|imm| imm.batch_id()));

// Calculate intersection
let staging_imm_ids_from_imms: HashSet<u64> =
self.staging.imm.iter().map(|imm| imm.batch_id()).collect();

// intersected batch_id order from oldest to newest
let intersect_imm_ids = staging_sst_ref
.imm_ids
.iter()
.rev()
.copied()
.filter(|id| staging_imm_ids_from_imms.contains(id))
.collect_vec();

if !intersect_imm_ids.is_empty() {
// Check 2)
debug_assert!(check_subset_preserve_order(
intersect_imm_ids.iter().copied(),
self.staging.imm.iter().map(|imm| imm.batch_id()).rev(),
));

// Check 3) and replace imms with a staging sst
for imm_id in &intersect_imm_ids {
if let Some(imm) = self.staging.imm.back() {
if *imm_id == imm.batch_id() {
self.staging.imm.pop_back();
}
} else {
let local_imm_ids = self
.staging
.imm
.iter()
.map(|imm| imm.batch_id())
.collect_vec();

unreachable!(
"should not reach here staging_sst.size {},
let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
warn!(
instance_id = self.instance_id,
"no related imm in sst input"
);
return;
};

// old data comes first
for imm_id in imms.iter().rev() {
let valid = match self.staging.imm.pop_back() {
None => false,
Some(prev_imm_id) => prev_imm_id.batch_id() == *imm_id,
};
assert!(
valid,
"should be valid staging_sst.size {},
staging_sst.imm_ids {:?},
staging_sst.epochs {:?},
local_imm_ids {:?},
intersect_imm_ids {:?}",
staging_sst_ref.imm_size,
staging_sst_ref.imm_ids,
staging_sst_ref.epochs,
local_imm_ids,
intersect_imm_ids,
);
}
}
self.staging.sst.push_front(staging_sst_ref);
instance_id {}",
staging_sst_ref.imm_size,
staging_sst_ref.imm_ids,
staging_sst_ref.epochs,
self.staging
.imm
.iter()
.map(|imm| imm.batch_id())
.collect_vec(),
self.instance_id,
);
}

self.staging.sst.push_front(staging_sst_ref);
}
},

@@ -445,92 +415,6 @@ impl HummockReadVersion {
}
}

/// `imm_ids` is the list of imm ids that are merged into this batch
/// This field is immutable. Larger imm id at the front.
pub fn add_merged_imm(&mut self, merged_imm: ImmutableMemtable, imm_ids: Vec<ImmId>) {
assert!(imm_ids.iter().rev().is_sorted());
let min_imm_id = *imm_ids.last().expect("non-empty");

let back = self.staging.imm.back().expect("should not be empty");

// pop and save imms that are written earlier than the oldest imm if there is any
let earlier_imms = if back.batch_id() < min_imm_id {
let mut earlier_imms = VecDeque::with_capacity(self.staging.imm.len());
loop {
let batch_id = self
.staging
.imm
.back()
.expect("should not be empty")
.batch_id();
match batch_id.cmp(&min_imm_id) {
Ordering::Less => {
let imm = self.staging.imm.pop_back().unwrap();
earlier_imms.push_front(imm);
}
Ordering::Equal => {
break;
}
Ordering::Greater => {
let remaining_staging_imm_ids = self
.staging
.imm
.iter()
.map(|imm| imm.batch_id())
.collect_vec();
let earlier_imm_ids =
earlier_imms.iter().map(|imm| imm.batch_id()).collect_vec();

unreachable!(
"must have break in equal: {:?} {:?} {:?}",
remaining_staging_imm_ids, earlier_imm_ids, imm_ids
)
}
}
}
Some(earlier_imms)
} else {
assert_eq!(
back.batch_id(),
min_imm_id,
"{:?} {:?}",
{
self.staging
.imm
.iter()
.map(|imm| imm.batch_id())
.collect_vec()
},
imm_ids
);
None
};

// iter from smaller imm and take the older imm at the back.
for imm_id in imm_ids.iter().rev() {
let imm = self.staging.imm.pop_back().expect("should exist");
assert_eq!(
imm.batch_id(),
*imm_id,
"{:?} {:?} {}",
{
self.staging
.imm
.iter()
.map(|imm| imm.batch_id())
.collect_vec()
},
imm_ids,
imm_id,
);
}

self.staging.imm.push_back(merged_imm);
if let Some(earlier_imms) = earlier_imms {
self.staging.imm.extend(earlier_imms);
}
}

pub fn is_replicated(&self) -> bool {
self.is_replicated
}
3 changes: 0 additions & 3 deletions src/storage/src/opts.rs
@@ -39,8 +39,6 @@ pub struct StorageOpts {
/// The shared buffer will start flushing data to object when the ratio of memory usage to the
/// shared buffer capacity exceed such ratio.
pub shared_buffer_flush_ratio: f32,
/// The threshold for the number of immutable memtables to merge to a new imm.
pub imm_merge_threshold: usize,
/// Remote directory for storing data and metadata objects.
pub data_directory: String,
/// Whether to enable write conflict detection
@@ -160,7 +158,6 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
.share_buffer_compaction_worker_threads_number,
shared_buffer_capacity_mb: s.shared_buffer_capacity_mb,
shared_buffer_flush_ratio: c.storage.shared_buffer_flush_ratio,
imm_merge_threshold: c.storage.imm_merge_threshold,
data_directory: p.data_directory().to_string(),
write_conflict_detection_enabled: c.storage.write_conflict_detection_enabled,
block_cache_capacity_mb: s.block_cache_capacity_mb,
13 changes: 13 additions & 0 deletions src/stream/src/executor/hash_join.rs
@@ -829,6 +829,19 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,

if let Some(rows) = &matched_rows {
join_matched_join_keys.observe(rows.len() as _);
if rows.len() > 10000 {
let join_key_data_types = side_update.ht.join_key_data_types();
let key = key.deserialize(join_key_data_types)?;
tracing::debug!(target: "hash_join_amplification",
matched_rows_len = rows.len(),
update_table_id = side_update.ht.table_id(),
match_table_id = side_match.ht.table_id(),
join_key = ?key,
actor_id = ctx.id,
fragment_id = ctx.fragment_id,
"large rows matched for join key"
);
}
} else {
join_matched_join_keys.observe(0.0)
}
4 changes: 4 additions & 0 deletions src/stream/src/executor/join/hash_join.rs
@@ -626,6 +626,10 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
pub fn table_id(&self) -> u32 {
self.state.table.table_id()
}

pub fn join_key_data_types(&self) -> &[DataType] {
&self.join_key_data_types
}
}

use risingwave_common_estimate_size::KvSize;
