Skip to content

Commit

Permalink
refactor(meta): seq generator for Hummock model v2
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Dec 29, 2023
1 parent f19ae82 commit a8a1456
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
hex = "0.4"
hyper = "0.14"
itertools = "0.12"
maplit = "1.0.2"
memcomparable = { version = "0.2" }
mime_guess = "2"
num-integer = "0.1"
Expand Down Expand Up @@ -95,7 +96,6 @@ workspace-hack = { path = "../workspace-hack" }
[dev-dependencies]
assert_matches = "1"
expect-test = "1.4"
maplit = "1.0.2"
rand = "0.8"
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_test_runner = { workspace = true }
Expand Down
28 changes: 27 additions & 1 deletion src/meta/model_v2/migration/src/m20231008_020431_hummock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,24 @@ impl MigrationTrait for Migration {
.to_owned(),
)
.await?;
manager
.create_table(
Table::create()
.table(HummockSequence::Table)
.col(
ColumnDef::new(HummockSequence::Name)
.string()
.not_null()
.primary_key(),
)
.col(
ColumnDef::new(HummockSequence::Seq)
.big_integer()
.not_null(),
)
.to_owned(),
)
.await?;

Ok(())
}
Expand Down Expand Up @@ -201,7 +219,8 @@ impl MigrationTrait for Migration {
HummockPinnedVersion,
HummockPinnedSnapshot,
HummockVersionDelta,
HummockVersionStats
HummockVersionStats,
HummockSequence
);
Ok(())
}
Expand Down Expand Up @@ -260,3 +279,10 @@ enum HummockVersionStats {
Id,
Stats,
}

#[derive(DeriveIden)]
enum HummockSequence {
Table,
Name,
Seq,
}
28 changes: 28 additions & 0 deletions src/meta/model_v2/src/hummock_sequence.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use sea_orm::entity::prelude::*;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)]
#[sea_orm(table_name = "hummock_sequence")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub name: String,
pub seq: i64,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}
1 change: 1 addition & 0 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod fragment;
pub mod function;
pub mod hummock_pinned_snapshot;
pub mod hummock_pinned_version;
pub mod hummock_sequence;
pub mod hummock_version_delta;
pub mod hummock_version_stats;
pub mod index;
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ pub use versioning::HummockVersionSafePoint;
use versioning::*;
pub(crate) mod checkpoint;
mod compaction;
mod sequence;
mod worker;

use compaction::*;
Expand Down
119 changes: 119 additions & 0 deletions src/meta/src/hummock/manager/sequence.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![allow(dead_code)]

use std::collections::HashMap;
use std::num::NonZeroU32;
use std::sync::LazyLock;

use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_meta_model_v2::hummock_sequence;
use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait};
use tokio::sync::Mutex;

use crate::MetaResult;

const COMPACTION_TASK_ID: &str = "compaction_task";
const COMPACTION_GROUP_ID: &str = "compaction_group";
const SSTABLE_OBJECT_ID: &str = "sstable_object";
const META_BACKUP_ID: &str = "meta_backup";

static SEQ_INIT: LazyLock<HashMap<String, i64>> = LazyLock::new(|| {
maplit::hashmap! {
COMPACTION_TASK_ID.into() => 1,
COMPACTION_GROUP_ID.into() => StaticCompactionGroupId::End as i64 + 1,
SSTABLE_OBJECT_ID.into() => 1,
META_BACKUP_ID.into() => 1,
}
});

pub struct SequenceGenerator {
db: Mutex<DatabaseConnection>,
}

impl SequenceGenerator {
pub fn new(db: DatabaseConnection) -> Self {
Self { db: Mutex::new(db) }
}

/// Returns start, indicates range [start, start + num).
///
/// Despite being a serial function, its infrequent invocation allows for acceptable performance.
pub async fn next_interval(&self, ident: &str, num: NonZeroU32) -> MetaResult<u64> {
let guard = self.db.lock().await;
let txn = guard.begin().await?;
let model: Option<hummock_sequence::Model> =
hummock_sequence::Entity::find_by_id(ident.to_string())
.one(&txn)
.await
.unwrap();
let start_seq = match model {
None => {
let init = SEQ_INIT
.get(ident)
.copied()
.unwrap_or_else(|| panic!("seq {ident} not found"));
let active_model = hummock_sequence::ActiveModel {
name: ActiveValue::set(ident.into()),
seq: ActiveValue::set(init + num.get() as i64),
};
active_model.insert(&txn).await?;
init
}
Some(model) => {
let start_seq = model.seq;
let mut active_model: hummock_sequence::ActiveModel = model.into();
active_model.seq = ActiveValue::set(start_seq + num.get() as i64);
active_model.update(&txn).await?;
start_seq
}
};
txn.commit().await?;
Ok(u64::try_from(start_seq).unwrap_or_else(|_| panic!("seq {ident} overflow")))
}
}

#[cfg(test)]
mod tests {
use std::num::NonZeroU32;

use crate::controller::SqlMetaStore;
use crate::hummock::manager::sequence::{SequenceGenerator, COMPACTION_TASK_ID};

#[tokio::test]
async fn test_seq_gen() {
let store = SqlMetaStore::for_test().await;
let conn = store.conn.clone();
let s = SequenceGenerator::new(conn);
assert_eq!(
1,
s.next_interval(COMPACTION_TASK_ID, NonZeroU32::new(1).unwrap())
.await
.unwrap()
);
assert_eq!(
2,
s.next_interval(COMPACTION_TASK_ID, NonZeroU32::new(10).unwrap())
.await
.unwrap()
);
assert_eq!(
12,
s.next_interval(COMPACTION_TASK_ID, NonZeroU32::new(10).unwrap())
.await
.unwrap()
);
}
}

0 comments on commit a8a1456

Please sign in to comment.