Skip to content

Commit

Permalink
feat: support notification generator in sql backend
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Jan 30, 2024
1 parent c8030fc commit 8cb5a53
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 28 deletions.
37 changes: 37 additions & 0 deletions src/meta/model_v2/src/catalog_version.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use sea_orm::entity::prelude::*;

#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum VersionCategory {
#[sea_orm(string_value = "NOTIFICATION")]
Notification,
#[sea_orm(string_value = "TABLE_REVISION")]
TableRevision,
}

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "catalog_version")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub name: VersionCategory,
pub version: i64,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}
1 change: 1 addition & 0 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod prelude;

pub mod actor;
pub mod actor_dispatcher;
pub mod catalog_version;
pub mod cluster;
pub mod compaction_config;
pub mod compaction_status;
Expand Down
1 change: 1 addition & 0 deletions src/meta/model_v2/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

pub use super::actor::Entity as Actor;
pub use super::actor_dispatcher::Entity as ActorDispatcher;
pub use super::catalog_version::Entity as CatalogVersion;
pub use super::cluster::Entity as Cluster;
pub use super::compaction_config::Entity as CompactionConfig;
pub use super::compaction_status::Entity as CompactionStatus;
Expand Down
10 changes: 4 additions & 6 deletions src/meta/src/controller/system_param.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,16 +250,14 @@ impl SystemParamsController {
}

// Notify workers of parameter change.
// TODO: add system params into snapshot to avoid periodically sync.
async fn notify_workers(&self, params: &PbSystemParams) {
self.notification_manager
.notify_frontend(Operation::Update, Info::SystemParams(params.clone()))
.await;
.notify_frontend_without_version(Operation::Update, Info::SystemParams(params.clone()));
self.notification_manager
.notify_compute(Operation::Update, Info::SystemParams(params.clone()))
.await;
.notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
self.notification_manager
.notify_compactor(Operation::Update, Info::SystemParams(params.clone()))
.await;
.notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ impl MetaSrvEnv {
// change to sync after refactor `IdGeneratorManager::new` sync.
let id_gen_manager = Arc::new(IdGeneratorManager::new(meta_store.clone()).await);
let stream_client_pool = Arc::new(StreamClientPool::default());
let notification_manager = Arc::new(NotificationManager::new(meta_store.clone()).await);
let notification_manager =
Arc::new(NotificationManager::new(meta_store.clone(), meta_store_sql.clone()).await);
let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
let (mut cluster_id, cluster_first_launch) =
if let Some(id) = ClusterId::from_meta_store(&meta_store).await? {
Expand Down Expand Up @@ -455,7 +456,8 @@ impl MetaSrvEnv {
let meta_store_sql = Some(SqlMetaStore::for_test().await);

let id_gen_manager = Arc::new(IdGeneratorManager::new(meta_store.clone()).await);
let notification_manager = Arc::new(NotificationManager::new(meta_store.clone()).await);
let notification_manager =
Arc::new(NotificationManager::new(meta_store.clone(), meta_store_sql.clone()).await);
let stream_client_pool = Arc::new(StreamClientPool::default());
let idle_manager = Arc::new(IdleManager::disabled());
let (cluster_id, cluster_first_launch) = (ClusterId::new(), true);
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod id;
mod idle;
mod metadata;
mod notification;
mod notification_version;
pub mod sink_coordination;
mod streaming_job;
mod system_param;
Expand Down
38 changes: 25 additions & 13 deletions src/meta/src/manager/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ use tokio::sync::mpsc::{self, UnboundedSender};
use tokio::sync::Mutex;
use tonic::Status;

use crate::controller::SqlMetaStore;
use crate::manager::cluster::WorkerKey;
use crate::model::{FragmentId, NotificationVersion as Version};
use crate::manager::notification_version::NotificationVersionGenerator;
use crate::model::FragmentId;
use crate::storage::MetaStoreRef;

pub type MessageStatus = Status;
Expand Down Expand Up @@ -77,17 +79,19 @@ pub struct NotificationManager {
core: Arc<Mutex<NotificationManagerCore>>,
/// Sender used to add a notification into the waiting queue.
task_tx: UnboundedSender<Task>,
/// The current notification version.
current_version: Mutex<Version>,
meta_store: MetaStoreRef,
/// The current notification version generator.
version_generator: Mutex<NotificationVersionGenerator>,
}

impl NotificationManager {
pub async fn new(meta_store: MetaStoreRef) -> Self {
pub async fn new(meta_store: MetaStoreRef, meta_store_sql: Option<SqlMetaStore>) -> Self {
// notification waiting queue.
let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Task>();
let core = Arc::new(Mutex::new(NotificationManagerCore::new()));
let core_clone = core.clone();
let version_generator = NotificationVersionGenerator::new(meta_store, meta_store_sql)
.await
.unwrap();

tokio::spawn(async move {
while let Some(task) = task_rx.recv().await {
Expand All @@ -105,8 +109,7 @@ impl NotificationManager {
Self {
core: core_clone,
task_tx,
current_version: Mutex::new(Version::new(&meta_store).await),
meta_store,
version_generator: Mutex::new(version_generator),
}
}

Expand Down Expand Up @@ -140,9 +143,14 @@ impl NotificationManager {
operation: Operation,
info: Info,
) -> NotificationVersion {
let mut version_guard = self.current_version.lock().await;
version_guard.increase_version(&self.meta_store).await;
let version = version_guard.version();
println!(
"heiheihei: target: {:?}, operation: {:?}, info: {:?}",
target.subscribe_type, operation, info
);

let mut version_guard = self.version_generator.lock().await;
version_guard.increase_version().await;
let version = version_guard.current_version();
self.notify(target, operation, info, Some(version));
version
}
Expand Down Expand Up @@ -252,6 +260,10 @@ impl NotificationManager {
self.notify_without_version(SubscribeType::Hummock.into(), operation, info)
}

pub fn notify_compactor_without_version(&self, operation: Operation, info: Info) {
self.notify_without_version(SubscribeType::Compactor.into(), operation, info)
}

#[cfg(any(test, feature = "test"))]
pub fn notify_hummock_with_version(
&self,
Expand Down Expand Up @@ -320,8 +332,8 @@ impl NotificationManager {
}

pub async fn current_version(&self) -> NotificationVersion {
let version_guard = self.current_version.lock().await;
version_guard.version()
let version_guard = self.version_generator.lock().await;
version_guard.current_version()
}
}

Expand Down Expand Up @@ -411,7 +423,7 @@ mod tests {

#[tokio::test]
async fn test_multiple_subscribers_one_worker() {
let mgr = NotificationManager::new(MemStore::new().into_ref()).await;
let mgr = NotificationManager::new(MemStore::new().into_ref(), None).await;
let worker_key1 = WorkerKey(HostAddress {
host: "a".to_string(),
port: 1,
Expand Down
84 changes: 84 additions & 0 deletions src/meta/src/manager/notification_version.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_meta_model_v2::catalog_version;
use risingwave_meta_model_v2::catalog_version::VersionCategory;
use risingwave_meta_model_v2::prelude::CatalogVersion;
use sea_orm::ActiveValue::Set;
use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait};

use crate::controller::SqlMetaStore;
use crate::model::NotificationVersion as NotificationModelV1;
use crate::storage::MetaStoreRef;
use crate::MetaResult;

pub enum NotificationVersionGenerator {
KvGenerator(NotificationModelV1, MetaStoreRef),
SqlGenerator(u64, DatabaseConnection),
}

// TODO: add pre-allocation if necessary
impl NotificationVersionGenerator {
pub async fn new(
meta_store: MetaStoreRef,
meta_store_sql: Option<SqlMetaStore>,
) -> MetaResult<Self> {
if let Some(sql) = meta_store_sql {
let txn = sql.conn.begin().await?;
let model = CatalogVersion::find_by_id(VersionCategory::Notification)
.one(&txn)
.await?;
let current_version = model.as_ref().map(|m| m.version).unwrap_or(1) as u64;
if model.is_none() {
catalog_version::ActiveModel {
name: Set(VersionCategory::Notification),
version: Set(1),
}
.insert(&txn)
.await?;
txn.commit().await?;
}

Ok(Self::SqlGenerator(current_version, sql.conn))
} else {
let current_version = NotificationModelV1::new(&meta_store).await;
Ok(Self::KvGenerator(current_version, meta_store))
}
}

pub fn current_version(&self) -> u64 {
match self {
NotificationVersionGenerator::KvGenerator(v, _) => v.version(),
NotificationVersionGenerator::SqlGenerator(v, _) => *v,
}
}

pub async fn increase_version(&mut self) {
match self {
NotificationVersionGenerator::KvGenerator(v, meta_store) => {
v.increase_version(meta_store).await
}
NotificationVersionGenerator::SqlGenerator(v, conn) => {
catalog_version::ActiveModel {
name: Set(VersionCategory::Notification),
version: Set((*v + 1) as i64),
}
.update(conn)
.await
.unwrap();
*v += 1;
}
}
}
}
14 changes: 7 additions & 7 deletions src/meta/src/manager/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,16 @@ impl SystemParamsManager {
}

// Notify workers of parameter change.
// TODO: add system params into snapshot to avoid periodically sync.
async fn notify_workers(&self, params: &SystemParams) {
self.notification_manager
.notify_frontend(Operation::Update, Info::SystemParams(params.clone()))
.await;
self.notification_manager
.notify_compute(Operation::Update, Info::SystemParams(params.clone()))
.await;
.notify_frontend_without_version(Operation::Update, Info::SystemParams(params.clone()));
self.notification_manager
.notify_compactor(Operation::Update, Info::SystemParams(params.clone()))
.await;
.notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
self.notification_manager.notify_compactor_without_version(
Operation::Update,
Info::SystemParams(params.clone()),
);
}
}

Expand Down

0 comments on commit 8cb5a53

Please sign in to comment.