Skip to content

Commit

Permalink
feat: cherry-pick 18882 and 18946 (#19014)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Oct 18, 2024
1 parent 8196550 commit b3b3004
Show file tree
Hide file tree
Showing 19 changed files with 352 additions and 49 deletions.
6 changes: 6 additions & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,15 @@ message StreamingControlStreamResponse {
}
}

message GetMinUncommittedSstIdRequest {}
message GetMinUncommittedSstIdResponse {
uint64 min_uncommitted_sst_id = 1;
}

service StreamService {
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
rpc GetMinUncommittedSstId(GetMinUncommittedSstIdRequest) returns (GetMinUncommittedSstIdResponse);
}

// TODO: Lifecycle management for actors.
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ pub struct MetaConfig {
#[serde(default = "default::meta::full_gc_object_limit")]
pub full_gc_object_limit: u64,

/// Duration in seconds to retain garbage collection history data.
#[serde(default = "default::meta::gc_history_retention_time_sec")]
pub gc_history_retention_time_sec: u64,

/// Max number of inflight time travel query.
#[serde(default = "default::meta::max_inflight_time_travel_query")]
pub max_inflight_time_travel_query: u64,
Expand Down Expand Up @@ -1355,6 +1359,10 @@ pub mod default {
3600 * 3
}

pub fn gc_history_retention_time_sec() -> u64 {
3600 * 6
}

pub fn full_gc_interval_sec() -> u64 {
600
}
Expand Down
19 changes: 19 additions & 0 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use await_tree::InstrumentAwait;
use futures::{Stream, StreamExt, TryStreamExt};
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
Expand Down Expand Up @@ -87,4 +88,22 @@ impl StreamService for StreamServiceImpl {
self.mgr.handle_new_control_stream(tx, stream, init_request);
Ok(Response::new(UnboundedReceiverStream::new(rx)))
}

async fn get_min_uncommitted_sst_id(
&self,
_request: Request<GetMinUncommittedSstIdRequest>,
) -> Result<Response<GetMinUncommittedSstIdResponse>, Status> {
let min_uncommitted_sst_id = if let Some(hummock) = self.mgr.env.state_store().as_hummock()
{
hummock
.min_uncommitted_sst_id()
.await
.unwrap_or(HummockSstableObjectId::MAX)
} else {
HummockSstableObjectId::MAX
};
Ok(Response::new(GetMinUncommittedSstIdResponse {
min_uncommitted_sst_id,
}))
}
}
1 change: 1 addition & 0 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ This page is automatically generated by `./risedev generate-example-config`
| event_log_enabled | | true |
| full_gc_interval_sec | Interval of automatic hummock full GC. | 600 |
| full_gc_object_limit | Max number of object per full GC job can fetch. | 100000 |
| gc_history_retention_time_sec | Duration in seconds to retain garbage collection history data. | 21600 |
| hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 |
| hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 |
| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 |
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dir = "./"
min_sst_retention_time_sec = 10800
full_gc_interval_sec = 600
full_gc_object_limit = 100000
gc_history_retention_time_sec = 21600
max_inflight_time_travel_query = 1000
periodic_compaction_interval_sec = 60
vacuum_interval_sec = 30
Expand Down
58 changes: 30 additions & 28 deletions src/meta/model_v2/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,10 @@ mod m20240726_063833_auto_schema_change;
mod m20240806_143329_add_rate_limit_to_source_catalog;
mod m20240820_081248_add_time_travel_per_table_epoch;
mod m20240911_083152_variable_vnode_count;
mod m20241016_065621_hummock_gc_history;

pub struct Migrator;

#[async_trait::async_trait]
impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![
Box::new(m20230908_072257_init::Migration),
Box::new(m20231008_020431_hummock::Migration),
Box::new(m20240304_074901_subscription::Migration),
Box::new(m20240410_082733_with_version_column_migration::Migration),
Box::new(m20240410_154406_session_params::Migration),
Box::new(m20240417_062305_subscription_internal_table_name::Migration),
Box::new(m20240418_142249_function_runtime::Migration),
Box::new(m20240506_112555_subscription_partial_ckpt::Migration),
Box::new(m20240525_090457_secret::Migration),
Box::new(m20240617_070131_index_column_properties::Migration),
Box::new(m20240617_071625_sink_into_table_column::Migration),
Box::new(m20240618_072634_function_compressed_binary::Migration),
Box::new(m20240630_131430_remove_parallel_unit::Migration),
Box::new(m20240701_060504_hummock_time_travel::Migration),
Box::new(m20240702_080451_system_param_value::Migration),
Box::new(m20240702_084927_unnecessary_fk::Migration),
Box::new(m20240726_063833_auto_schema_change::Migration),
Box::new(m20240806_143329_add_rate_limit_to_source_catalog::Migration),
Box::new(m20240820_081248_add_time_travel_per_table_epoch::Migration),
Box::new(m20240911_083152_variable_vnode_count::Migration),
]
}
}

#[macro_export]
macro_rules! assert_not_has_tables {
($manager:expr, $( $table:ident ),+) => {
Expand Down Expand Up @@ -84,3 +57,32 @@ macro_rules! drop_tables {
)+
};
}

#[async_trait::async_trait]
impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![
Box::new(m20230908_072257_init::Migration),
Box::new(m20231008_020431_hummock::Migration),
Box::new(m20240304_074901_subscription::Migration),
Box::new(m20240410_082733_with_version_column_migration::Migration),
Box::new(m20240410_154406_session_params::Migration),
Box::new(m20240417_062305_subscription_internal_table_name::Migration),
Box::new(m20240418_142249_function_runtime::Migration),
Box::new(m20240506_112555_subscription_partial_ckpt::Migration),
Box::new(m20240525_090457_secret::Migration),
Box::new(m20240617_070131_index_column_properties::Migration),
Box::new(m20240617_071625_sink_into_table_column::Migration),
Box::new(m20240618_072634_function_compressed_binary::Migration),
Box::new(m20240630_131430_remove_parallel_unit::Migration),
Box::new(m20240701_060504_hummock_time_travel::Migration),
Box::new(m20240702_080451_system_param_value::Migration),
Box::new(m20240702_084927_unnecessary_fk::Migration),
Box::new(m20240726_063833_auto_schema_change::Migration),
Box::new(m20240806_143329_add_rate_limit_to_source_catalog::Migration),
Box::new(m20240820_081248_add_time_travel_per_table_epoch::Migration),
Box::new(m20240911_083152_variable_vnode_count::Migration),
Box::new(m20241016_065621_hummock_gc_history::Migration),
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(HummockGcHistory::Table)
.if_not_exists()
.col(
ColumnDef::new(HummockGcHistory::ObjectId)
.big_integer()
.not_null()
.primary_key(),
)
.col(
ColumnDef::new(HummockGcHistory::MarkDeleteAt)
.date_time()
.not_null(),
)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.table(HummockGcHistory::Table)
.name("idx_hummock_gc_history_mark_delete_at")
.col(HummockGcHistory::MarkDeleteAt)
.to_owned(),
)
.await?;
Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
crate::drop_tables!(manager, HummockGcHistory);
Ok(())
}
}

#[derive(DeriveIden)]
enum HummockGcHistory {
Table,
ObjectId,
MarkDeleteAt,
}
31 changes: 31 additions & 0 deletions src/meta/model_v2/src/hummock_gc_history.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use sea_orm::entity::prelude::*;
use sea_orm::{DeriveEntityModel, DeriveRelation, EnumIter};

use crate::HummockSstableObjectId;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)]
#[sea_orm(table_name = "hummock_gc_history")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub object_id: HummockSstableObjectId,
pub mark_delete_at: DateTime,
}

impl ActiveModelBehavior for ActiveModel {}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
1 change: 1 addition & 0 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub mod database;
pub mod fragment;
pub mod function;
pub mod hummock_epoch_to_version;
pub mod hummock_gc_history;
pub mod hummock_pinned_snapshot;
pub mod hummock_pinned_version;
pub mod hummock_sequence;
Expand Down
1 change: 1 addition & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ pub fn start(
min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec,
full_gc_interval_sec: config.meta.full_gc_interval_sec,
full_gc_object_limit: config.meta.full_gc_object_limit,
gc_history_retention_time_sec: config.meta.gc_history_retention_time_sec,
max_inflight_time_travel_query: config.meta.max_inflight_time_travel_query,
enable_committed_sst_sanity_check: config.meta.enable_committed_sst_sanity_check,
periodic_compaction_interval_sec: config.meta.periodic_compaction_interval_sec,
Expand Down
45 changes: 43 additions & 2 deletions src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
use std::collections::{BTreeMap, HashMap, HashSet};

use fail::fail_point;
use futures::{stream, StreamExt};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{
HummockContextId, HummockSstableObjectId, HummockVersionId, LocalSstableInfo,
INVALID_VERSION_ID,
};
use risingwave_meta_model_v2::hummock_gc_history;
use risingwave_pb::hummock::{HummockPinnedVersion, ValidationTask};
use sea_orm::{DatabaseConnection, EntityTrait};

use crate::controller::SqlMetaStore;
use crate::hummock::error::{Error, Result};
Expand Down Expand Up @@ -231,7 +234,8 @@ impl HummockManager {

// HummockManager::now requires a write to the meta store. Thus, it should be avoided whenever feasible.
if !sstables.is_empty() {
// sanity check to ensure SSTs to commit have not been full GCed yet.
// Sanity check to ensure SSTs to commit have not been full GCed yet.
// TODO: since HummockManager::complete_full_gc have already filtered out SSTs by min uncommitted SST id, this sanity check can be removed.
let now = self.now().await?;
check_sst_retention(
now,
Expand All @@ -240,6 +244,10 @@ impl HummockManager {
.iter()
.map(|s| (s.sst_info.object_id, s.created_at)),
)?;
if self.env.opts.gc_history_retention_time_sec != 0 {
let ids = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
check_gc_history(&self.meta_store_ref().conn, ids).await?;
}
}

async {
Expand Down Expand Up @@ -291,7 +299,12 @@ impl HummockManager {
now,
self.env.opts.min_sst_retention_time_sec,
object_timestamps.iter().map(|(k, v)| (*k, *v)),
)
)?;
if self.env.opts.gc_history_retention_time_sec != 0 {
let ids = object_timestamps.iter().map(|(id, _)| *id).collect_vec();
check_gc_history(&self.meta_store_ref().conn, ids).await?;
}
Ok(())
}
}

Expand All @@ -309,6 +322,34 @@ fn check_sst_retention(
Ok(())
}

async fn check_gc_history(
db: &DatabaseConnection,
// need IntoIterator to work around stream's "implementation of `std::iter::Iterator` is not general enough" error.
object_ids: impl IntoIterator<Item = HummockSstableObjectId>,
) -> Result<()> {
let futures = object_ids.into_iter().map(|id| async move {
let id: risingwave_meta_model_v2::HummockSstableObjectId = id.try_into().unwrap();
hummock_gc_history::Entity::find_by_id(id)
.one(db)
.await
.map_err(Error::from)
});
let res: Vec<_> = stream::iter(futures).buffer_unordered(10).collect().await;
let res: Result<Vec<_>> = res.into_iter().collect();
let mut expired_object_ids = res?.into_iter().flatten().peekable();
if expired_object_ids.peek().is_none() {
return Ok(());
}
let expired_object_ids: Vec<_> = expired_object_ids.collect();
tracing::error!(
?expired_object_ids,
"new SSTs are rejected because they have already been GCed"
);
Err(Error::InvalidSst(
expired_object_ids[0].object_id as HummockSstableObjectId,
))
}

// pin and unpin method
impl HummockManager {
/// Pin the current greatest hummock version. The pin belongs to `context_id`
Expand Down
Loading

0 comments on commit b3b3004

Please sign in to comment.