diff --git a/.licenserc.yaml b/.licenserc.yaml index c1745a4d1ad74..7b49108b6b2f3 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -17,6 +17,6 @@ header: - "**/*.d.ts" - "src/sqlparser/**/*.rs" - "java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/*.java" - - "src/meta/src/model_v2/migration/**/*.rs" + - "src/meta/model_v2/migration/**/*.rs" comment: on-failure diff --git a/Cargo.lock b/Cargo.lock index cf1f1f0e493e9..19e79820fb5e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4779,15 +4779,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "model_migration" -version = "0.1.0" -dependencies = [ - "async-std", - "sea-orm-migration", - "uuid", -] - [[package]] name = "moka" version = "0.12.0" @@ -7695,7 +7686,6 @@ dependencies = [ "maplit", "memcomparable", "mime_guess", - "model_migration", "num-integer", "num-traits", "parking_lot 0.12.1", @@ -7709,6 +7699,8 @@ dependencies = [ "risingwave_common_heap_profiling", "risingwave_connector", "risingwave_hummock_sdk", + "risingwave_meta_model_migration", + "risingwave_meta_model_v2", "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", @@ -7730,6 +7722,25 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "risingwave_meta_model_migration" +version = "1.3.0-alpha" +dependencies = [ + "async-std", + "sea-orm-migration", + "uuid", +] + +[[package]] +name = "risingwave_meta_model_v2" +version = "1.3.0-alpha" +dependencies = [ + "risingwave_pb", + "sea-orm", + "serde", + "serde_json", +] + [[package]] name = "risingwave_meta_node" version = "1.3.0-alpha" @@ -7742,13 +7753,13 @@ dependencies = [ "madsim-etcd-client", "madsim-tokio", "madsim-tonic", - "model_migration", "prometheus-http-query", "regex", "risingwave_common", "risingwave_common_heap_profiling", "risingwave_common_service", "risingwave_meta", + "risingwave_meta_model_migration", "risingwave_meta_service", "risingwave_pb", "risingwave_rpc_client", @@ -7772,6 +7783,7 @@ dependencies = [ "risingwave_common", "risingwave_connector", "risingwave_meta", + "risingwave_meta_model_v2", "risingwave_pb", "sea-orm", "sync-point", diff --git a/Cargo.toml b/Cargo.toml index f0dd2d0443b9e..ac533e733f7a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,9 +19,10 @@ members = [ "src/java_binding", "src/jni_core", "src/meta", + "src/meta/model_v2", + "src/meta/model_v2/migration", "src/meta/node", "src/meta/service", - "src/meta/src/model_v2/migration", "src/object_store", "src/prost", "src/prost/helpers", @@ -143,6 +144,8 @@ risingwave_hummock_test = { path = "./src/storage/hummock_test" } risingwave_hummock_trace = { path = "./src/storage/hummock_trace" } risingwave_meta = { path = "./src/meta" } risingwave_meta_service = { path = "./src/meta/service" } +risingwave_meta_model_migration = { path = "src/meta/model_v2/migration" } +risingwave_meta_model_v2 = { path = "./src/meta/model_v2" } risingwave_meta_node = { path = "./src/meta/node" } risingwave_object_store = { path = "./src/object_store" } risingwave_pb = { path = "./src/prost" } diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 3e96dfcc7be2f..f37c909546594 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -36,7 +36,6 @@ hyper = "0.14" itertools = "0.11" memcomparable = { version = "0.2" } mime_guess = "2" -model_migration = { path = "src/model_v2/migration" } num-integer = "0.1" num-traits = "0.2" parking_lot = { version = "0.12", features = ["arc_lock"] } @@ -50,6 +49,8 @@ risingwave_common = { workspace = true } risingwave_common_heap_profiling = { workspace = true } risingwave_connector = { workspace = true } risingwave_hummock_sdk = { workspace = true } +risingwave_meta_model_migration = { workspace = true } +risingwave_meta_model_v2 = { workspace = true } risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } diff --git a/src/meta/model_v2/Cargo.toml b/src/meta/model_v2/Cargo.toml new file mode 100644 index 0000000000000..1d9992da8a832 --- /dev/null +++ b/src/meta/model_v2/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "risingwave_meta_model_v2" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +[package.metadata.cargo-machete] +ignored = ["workspace-hack"] + +[package.metadata.cargo-udeps.ignore] +normal = ["workspace-hack"] + +[dependencies] +risingwave_pb = { workspace = true } +sea-orm = { version = "0.12.0", features = [ + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", + "runtime-tokio-native-tls", + "macros", +] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" diff --git a/src/meta/model_v2/migration/Cargo.toml b/src/meta/model_v2/migration/Cargo.toml new file mode 100644 index 0000000000000..4745125140a22 --- /dev/null +++ b/src/meta/model_v2/migration/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "risingwave_meta_model_migration" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +[package.metadata.cargo-machete] +ignored = ["workspace-hack"] + +[package.metadata.cargo-udeps.ignore] +normal = ["workspace-hack"] + +[dependencies] +async-std = { version = "1", features = ["attributes", "tokio1"] } +uuid = { version = "1", features = ["v4"] } + +[dependencies.sea-orm-migration] +version = "0.12.0" +features = ["sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", "runtime-tokio-native-tls", "with-uuid"] diff --git a/src/meta/src/model_v2/migration/README.md b/src/meta/model_v2/migration/README.md similarity index 100% rename from src/meta/src/model_v2/migration/README.md rename to src/meta/model_v2/migration/README.md diff --git a/src/meta/src/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs similarity index 100% rename from src/meta/src/model_v2/migration/src/lib.rs rename to src/meta/model_v2/migration/src/lib.rs diff --git a/src/meta/src/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs similarity index 100% rename from src/meta/src/model_v2/migration/src/m20230908_072257_init.rs rename to src/meta/model_v2/migration/src/m20230908_072257_init.rs diff --git a/src/meta/src/model_v2/migration/src/m20231008_020431_hummock.rs b/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs similarity index 100% rename from src/meta/src/model_v2/migration/src/m20231008_020431_hummock.rs rename to src/meta/model_v2/migration/src/m20231008_020431_hummock.rs diff --git a/src/meta/src/model_v2/migration/src/main.rs b/src/meta/model_v2/migration/src/main.rs similarity index 52% rename from src/meta/src/model_v2/migration/src/main.rs rename to src/meta/model_v2/migration/src/main.rs index 9354e45ecd198..9be884a68a11d 100644 --- a/src/meta/src/model_v2/migration/src/main.rs +++ b/src/meta/model_v2/migration/src/main.rs @@ -2,5 +2,5 @@ use sea_orm_migration::prelude::*; #[async_std::main] async fn main() { - cli::run_cli(model_migration::Migrator).await; + cli::run_cli(risingwave_meta_model_migration::Migrator).await; } diff --git a/src/meta/src/model_v2/README.md b/src/meta/model_v2/src/README.md similarity index 93% rename from src/meta/src/model_v2/README.md rename to src/meta/model_v2/src/README.md index 25c22a4f566e1..48095d3e6d67f 100644 --- a/src/meta/src/model_v2/README.md +++ b/src/meta/model_v2/src/README.md @@ -1,6 +1,6 @@ # How to define changes between versions and generate migration and model files -- Generate a new migration file and apply it to the database, check [migration](./migration/README.md) for more details. Let's take a local PG database as an example(`postgres://postgres:@localhost:5432/postgres`): +- Generate a new migration file and apply it to the database, check [migration](../migration/README.md) for more details. Let's take a local PG database as an example(`postgres://postgres:@localhost:5432/postgres`): ```sh export DATABASE_URL=postgres://postgres:@localhost:5432/postgres; cargo run -- generate MIGRATION_NAME diff --git a/src/meta/src/model_v2/actor.rs b/src/meta/model_v2/src/actor.rs similarity index 97% rename from src/meta/src/model_v2/actor.rs rename to src/meta/model_v2/src/actor.rs index 8fecb3046b1bc..79a70e3f65e95 100644 --- a/src/meta/src/model_v2/actor.rs +++ b/src/meta/model_v2/src/actor.rs @@ -14,7 +14,7 @@ use sea_orm::entity::prelude::*; -use crate::model_v2::I32Array; +use crate::I32Array; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "actor")] diff --git a/src/meta/src/model_v2/cluster.rs b/src/meta/model_v2/src/cluster.rs similarity index 100% rename from src/meta/src/model_v2/cluster.rs rename to src/meta/model_v2/src/cluster.rs diff --git a/src/meta/src/model_v2/compaction_config.rs b/src/meta/model_v2/src/compaction_config.rs similarity index 100% rename from src/meta/src/model_v2/compaction_config.rs rename to src/meta/model_v2/src/compaction_config.rs diff --git a/src/meta/src/model_v2/compaction_status.rs b/src/meta/model_v2/src/compaction_status.rs similarity index 100% rename from src/meta/src/model_v2/compaction_status.rs rename to src/meta/model_v2/src/compaction_status.rs diff --git a/src/meta/src/model_v2/compaction_task.rs b/src/meta/model_v2/src/compaction_task.rs similarity index 100% rename from src/meta/src/model_v2/compaction_task.rs rename to src/meta/model_v2/src/compaction_task.rs diff --git a/src/meta/src/model_v2/connection.rs b/src/meta/model_v2/src/connection.rs similarity index 97% rename from src/meta/src/model_v2/connection.rs rename to src/meta/model_v2/src/connection.rs index 0096603c843a3..8cff6b2a6025b 100644 --- a/src/meta/src/model_v2/connection.rs +++ b/src/meta/model_v2/src/connection.rs @@ -17,7 +17,7 @@ use risingwave_pb::catalog::PbConnection; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue; -use crate::model_v2::{ConnectionId, PrivateLinkService}; +use crate::{ConnectionId, PrivateLinkService}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "connection")] diff --git a/src/meta/src/model_v2/database.rs b/src/meta/model_v2/src/database.rs similarity index 81% rename from src/meta/src/model_v2/database.rs rename to src/meta/model_v2/src/database.rs index 909c12eceac5a..95ff3a8aee8e6 100644 --- a/src/meta/src/model_v2/database.rs +++ b/src/meta/model_v2/src/database.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_pb::catalog::PbDatabase; use sea_orm::entity::prelude::*; +use sea_orm::ActiveValue; -use crate::model_v2::DatabaseId; +use crate::DatabaseId; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "database")] @@ -44,3 +46,12 @@ impl Related<super::object::Entity> for Entity { } impl ActiveModelBehavior for ActiveModel {} + +impl From<PbDatabase> for ActiveModel { + fn from(db: PbDatabase) -> Self { + Self { + database_id: ActiveValue::Set(db.id), + name: ActiveValue::Set(db.name), + } + } +} diff --git a/src/meta/src/model_v2/fragment.rs b/src/meta/model_v2/src/fragment.rs similarity index 98% rename from src/meta/src/model_v2/fragment.rs rename to src/meta/model_v2/src/fragment.rs index 9263dd99eabb8..c590a58da771e 100644 --- a/src/meta/src/model_v2/fragment.rs +++ b/src/meta/model_v2/src/fragment.rs @@ -14,7 +14,7 @@ use sea_orm::entity::prelude::*; -use crate::model_v2::I32Array; +use crate::I32Array; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "fragment")] diff --git a/src/meta/src/model_v2/function.rs b/src/meta/model_v2/src/function.rs similarity index 97% rename from src/meta/src/model_v2/function.rs rename to src/meta/model_v2/src/function.rs index 663f8e2284fd7..4126dddc0f5ee 100644 --- a/src/meta/src/model_v2/function.rs +++ b/src/meta/model_v2/src/function.rs @@ -17,7 +17,7 @@ use risingwave_pb::catalog::PbFunction; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue; -use crate::model_v2::{DataType, DataTypeArray, FunctionId}; +use crate::{DataType, DataTypeArray, FunctionId}; #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] #[sea_orm(rs_type = "String", db_type = "String(None)")] diff --git a/src/meta/src/model_v2/hummock_pinned_snapshot.rs b/src/meta/model_v2/src/hummock_pinned_snapshot.rs similarity index 100% rename from src/meta/src/model_v2/hummock_pinned_snapshot.rs rename to src/meta/model_v2/src/hummock_pinned_snapshot.rs diff --git a/src/meta/src/model_v2/hummock_pinned_version.rs b/src/meta/model_v2/src/hummock_pinned_version.rs similarity index 100% rename from src/meta/src/model_v2/hummock_pinned_version.rs rename to src/meta/model_v2/src/hummock_pinned_version.rs diff --git a/src/meta/src/model_v2/hummock_version_delta.rs b/src/meta/model_v2/src/hummock_version_delta.rs similarity index 100% rename from src/meta/src/model_v2/hummock_version_delta.rs rename to src/meta/model_v2/src/hummock_version_delta.rs diff --git a/src/meta/src/model_v2/hummock_version_stats.rs b/src/meta/model_v2/src/hummock_version_stats.rs similarity index 100% rename from src/meta/src/model_v2/hummock_version_stats.rs rename to src/meta/model_v2/src/hummock_version_stats.rs diff --git a/src/meta/src/model_v2/index.rs b/src/meta/model_v2/src/index.rs similarity index 96% rename from src/meta/src/model_v2/index.rs rename to src/meta/model_v2/src/index.rs index 3b80632e2cfc3..c85a896914240 100644 --- a/src/meta/src/model_v2/index.rs +++ b/src/meta/model_v2/src/index.rs @@ -14,7 +14,7 @@ use sea_orm::entity::prelude::*; -use crate::model_v2::{ExprNodeArray, I32Array, IndexId, JobStatus, TableId}; +use crate::{ExprNodeArray, I32Array, IndexId, JobStatus, TableId}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "index")] diff --git a/src/meta/src/model_v2/mod.rs b/src/meta/model_v2/src/lib.rs similarity index 99% rename from src/meta/src/model_v2/mod.rs rename to src/meta/model_v2/src/lib.rs index 1c2f928063fff..5fe23bcaa280c 100644 --- a/src/meta/src/model_v2/mod.rs +++ b/src/meta/model_v2/src/lib.rs @@ -27,7 +27,6 @@ pub mod compaction_status; pub mod compaction_task; pub mod connection; pub mod database; -pub mod ext; pub mod fragment; pub mod function; pub mod hummock_pinned_snapshot; @@ -42,7 +41,6 @@ pub mod sink; pub mod source; pub mod system_parameter; pub mod table; -pub mod trx; pub mod user; pub mod user_privilege; pub mod view; diff --git a/src/meta/src/model_v2/object.rs b/src/meta/model_v2/src/object.rs similarity index 98% rename from src/meta/src/model_v2/object.rs rename to src/meta/model_v2/src/object.rs index 5048f93a483d9..39506777068a3 100644 --- a/src/meta/src/model_v2/object.rs +++ b/src/meta/model_v2/src/object.rs @@ -14,7 +14,7 @@ use sea_orm::entity::prelude::*; -use crate::model_v2::{DatabaseId, ObjectId, SchemaId, UserId}; +use crate::{DatabaseId, ObjectId, SchemaId, UserId}; #[derive(Clone, Debug, PartialEq, Eq, Copy, EnumIter, DeriveActiveEnum)] #[sea_orm(rs_type = "String", db_type = "String(None)")] diff --git a/src/meta/src/model_v2/object_dependency.rs b/src/meta/model_v2/src/object_dependency.rs similarity index 97% rename from src/meta/src/model_v2/object_dependency.rs rename to src/meta/model_v2/src/object_dependency.rs index 53800112a7370..52ca229c6997a 100644 --- a/src/meta/src/model_v2/object_dependency.rs +++ b/src/meta/model_v2/src/object_dependency.rs @@ -14,7 +14,7 @@ use sea_orm::entity::prelude::*; -use crate::model_v2::{ObjectId, UserId}; +use crate::{ObjectId, UserId}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "object_dependency")] diff --git a/src/meta/src/model_v2/prelude.rs b/src/meta/model_v2/src/prelude.rs similarity index 100% rename from src/meta/src/model_v2/prelude.rs rename to src/meta/model_v2/src/prelude.rs diff --git a/src/meta/src/model_v2/schema.rs b/src/meta/model_v2/src/schema.rs similarity index 81% rename from src/meta/src/model_v2/schema.rs rename to src/meta/model_v2/src/schema.rs index 2c28665fd06f0..0af2d7fc020c9 100644 --- a/src/meta/src/model_v2/schema.rs +++ b/src/meta/model_v2/src/schema.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_pb::catalog::PbSchema; use sea_orm::entity::prelude::*; +use sea_orm::ActiveValue; -use crate::model_v2::SchemaId; +use crate::SchemaId; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "schema")] @@ -43,3 +45,12 @@ impl Related<super::object::Entity> for Entity { } impl ActiveModelBehavior for ActiveModel {} + +impl From<PbSchema> for ActiveModel { + fn from(schema: PbSchema) -> Self { + Self { + schema_id: ActiveValue::Set(schema.id), + name: ActiveValue::Set(schema.name), + } + } +} diff --git a/src/meta/src/model_v2/sink.rs b/src/meta/model_v2/src/sink.rs similarity index 99% rename from src/meta/src/model_v2/sink.rs rename to src/meta/model_v2/src/sink.rs index bef46f1d7195f..21ac172246703 100644 --- a/src/meta/src/model_v2/sink.rs +++ b/src/meta/model_v2/src/sink.rs @@ -15,7 +15,7 @@ use risingwave_pb::catalog::PbSinkType; use sea_orm::entity::prelude::*; -use crate::model_v2::{ +use crate::{ ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, JobStatus, Property, SinkFormatDesc, SinkId, }; diff --git a/src/meta/src/model_v2/source.rs b/src/meta/model_v2/src/source.rs similarity index 99% rename from src/meta/src/model_v2/source.rs rename to src/meta/model_v2/src/source.rs index 2ad1de7914d96..620d002c27b55 100644 --- a/src/meta/src/model_v2/source.rs +++ b/src/meta/model_v2/src/source.rs @@ -14,7 +14,7 @@ use sea_orm::entity::prelude::*; -use crate::model_v2::{ +use crate::{ ColumnCatalogArray, ConnectionId, I32Array, Property, SourceId, StreamSourceInfo, TableId, WatermarkDescArray, }; diff --git a/src/meta/src/model_v2/system_parameter.rs b/src/meta/model_v2/src/system_parameter.rs similarity index 100% rename from src/meta/src/model_v2/system_parameter.rs rename to src/meta/model_v2/src/system_parameter.rs diff --git a/src/meta/src/model_v2/table.rs b/src/meta/model_v2/src/table.rs similarity index 99% rename from src/meta/src/model_v2/table.rs rename to src/meta/model_v2/src/table.rs index 08caee7009f8f..a335f41023442 100644 --- a/src/meta/src/model_v2/table.rs +++ b/src/meta/model_v2/src/table.rs @@ -16,7 +16,7 @@ use risingwave_pb::catalog::table::PbTableType; use risingwave_pb::catalog::PbHandleConflictBehavior; use sea_orm::entity::prelude::*; -use crate::model_v2::{ +use crate::{ Cardinality, ColumnCatalogArray, ColumnOrderArray, CreateType, I32Array, JobStatus, Property, SourceId, TableId, TableVersion, }; diff --git a/src/meta/src/model_v2/user.rs b/src/meta/model_v2/src/user.rs similarity index 97% rename from src/meta/src/model_v2/user.rs rename to src/meta/model_v2/src/user.rs index 0e7ab4dd17876..e9cd36f75fb43 100644 --- a/src/meta/src/model_v2/user.rs +++ b/src/meta/model_v2/src/user.rs @@ -14,7 +14,7 @@ use sea_orm::entity::prelude::*; -use crate::model_v2::UserId; +use crate::UserId; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "user")] diff --git a/src/meta/src/model_v2/user_privilege.rs b/src/meta/model_v2/src/user_privilege.rs similarity index 97% rename from src/meta/src/model_v2/user_privilege.rs rename to src/meta/model_v2/src/user_privilege.rs index 335f716cec1c8..7e12af225ed02 100644 --- a/src/meta/src/model_v2/user_privilege.rs +++ b/src/meta/model_v2/src/user_privilege.rs @@ -14,7 +14,7 @@ use sea_orm::entity::prelude::*; -use crate::model_v2::{ObjectId, UserId}; +use crate::{ObjectId, UserId}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "user_privilege")] diff --git a/src/meta/src/model_v2/view.rs b/src/meta/model_v2/src/view.rs similarity index 97% rename from src/meta/src/model_v2/view.rs rename to src/meta/model_v2/src/view.rs index 8f7d22408d3f2..0de9ea64a616e 100644 --- a/src/meta/src/model_v2/view.rs +++ b/src/meta/model_v2/src/view.rs @@ -16,7 +16,7 @@ use risingwave_pb::catalog::PbView; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue; -use crate::model_v2::{FieldArray, Property, ViewId}; +use crate::{FieldArray, Property, ViewId}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "view")] diff --git a/src/meta/model_v2/src/worker.rs b/src/meta/model_v2/src/worker.rs new file mode 100644 index 0000000000000..d164fba62b41e --- /dev/null +++ b/src/meta/model_v2/src/worker.rs @@ -0,0 +1,128 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::common::worker_node::PbState; +use risingwave_pb::common::{PbWorkerNode, PbWorkerType}; +use sea_orm::entity::prelude::*; +use sea_orm::ActiveValue; + +use crate::{TransactionId, WorkerId}; + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "String", db_type = "String(None)")] +pub enum WorkerType { + #[sea_orm(string_value = "FRONTEND")] + Frontend, + #[sea_orm(string_value = "COMPUTE_NODE")] + ComputeNode, + #[sea_orm(string_value = "RISE_CTL")] + RiseCtl, + #[sea_orm(string_value = "COMPACTOR")] + Compactor, + #[sea_orm(string_value = "META")] + Meta, +} + +impl From<PbWorkerType> for WorkerType { + fn from(worker_type: PbWorkerType) -> Self { + match worker_type { + PbWorkerType::Unspecified => unreachable!("unspecified worker type"), + PbWorkerType::Frontend => Self::Frontend, + PbWorkerType::ComputeNode => Self::ComputeNode, + PbWorkerType::RiseCtl => Self::RiseCtl, + PbWorkerType::Compactor => Self::Compactor, + PbWorkerType::Meta => Self::Meta, + } + } +} + +impl From<WorkerType> for PbWorkerType { + fn from(worker_type: WorkerType) -> Self { + match worker_type { + WorkerType::Frontend => Self::Frontend, + WorkerType::ComputeNode => Self::ComputeNode, + WorkerType::RiseCtl => Self::RiseCtl, + WorkerType::Compactor => Self::Compactor, + WorkerType::Meta => Self::Meta, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "String", db_type = "String(None)")] +pub enum WorkerStatus { + #[sea_orm(string_value = "STARTING")] + Starting, + #[sea_orm(string_value = "RUNNING")] + Running, +} + +impl From<PbState> for WorkerStatus { + fn from(state: PbState) -> Self { + match state { + PbState::Unspecified => unreachable!("unspecified worker status"), + PbState::Starting => Self::Starting, + PbState::Running => Self::Running, + } + } +} + +impl From<WorkerStatus> for PbState { + fn from(status: WorkerStatus) -> Self { + match status { + WorkerStatus::Starting => Self::Starting, + WorkerStatus::Running => Self::Running, + } + } +} + +impl From<&PbWorkerNode> for ActiveModel { + fn from(worker: &PbWorkerNode) -> Self { + let host = worker.host.clone().unwrap(); + Self { + worker_id: ActiveValue::Set(worker.id), + worker_type: ActiveValue::Set(worker.r#type().into()), + host: ActiveValue::Set(host.host), + port: ActiveValue::Set(host.port), + status: ActiveValue::Set(worker.state().into()), + ..Default::default() + } + } +} + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "worker")] +pub struct Model { + #[sea_orm(primary_key)] + pub worker_id: WorkerId, + pub worker_type: WorkerType, + pub host: String, + pub port: i32, + pub status: WorkerStatus, + pub transaction_id: Option<TransactionId>, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm(has_many = "super::worker_property::Entity")] + WorkerProperty, +} + +impl Related<super::worker_property::Entity> for Entity { + fn to() -> RelationDef { + Relation::WorkerProperty.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/src/meta/src/model_v2/worker_property.rs b/src/meta/model_v2/src/worker_property.rs similarity index 97% rename from src/meta/src/model_v2/worker_property.rs rename to src/meta/model_v2/src/worker_property.rs index 8521cbed15ce2..0512ea97e5be3 100644 --- a/src/meta/src/model_v2/worker_property.rs +++ b/src/meta/model_v2/src/worker_property.rs @@ -14,7 +14,7 @@ use sea_orm::entity::prelude::*; -use crate::model_v2::{I32Array, WorkerId}; +use crate::{I32Array, WorkerId}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "worker_property")] diff --git a/src/meta/node/Cargo.toml b/src/meta/node/Cargo.toml index 8c2a5aeadbe41..84793a74591c8 100644 --- a/src/meta/node/Cargo.toml +++ b/src/meta/node/Cargo.toml @@ -20,13 +20,13 @@ either = "1" etcd-client = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } itertools = "0.11" -model_migration = { path = "../src/model_v2/migration" } prometheus-http-query = "0.7" regex = "1" risingwave_common = { workspace = true } risingwave_common_heap_profiling = { workspace = true } risingwave_common_service = { workspace = true } risingwave_meta = { workspace = true } +risingwave_meta_model_migration = { workspace = true } risingwave_meta_service = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index d922f1c37e033..d8d8525aca235 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -19,7 +19,6 @@ use either::Either; use etcd_client::ConnectOptions; use futures::future::join_all; use itertools::Itertools; -use model_migration::{Migrator, MigratorTrait}; use regex::Regex; use risingwave_common::monitor::connection::{RouterExt, TcpConfig}; use risingwave_common::telemetry::manager::TelemetryManager; @@ -28,6 +27,7 @@ use risingwave_common_service::metrics_manager::MetricsManager; use risingwave_common_service::tracing::TracingExtractLayer; use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer; use risingwave_meta::rpc::ElectionClientRef; +use risingwave_meta_model_migration::{Migrator, MigratorTrait}; use risingwave_meta_service::backup_service::BackupServiceImpl; use risingwave_meta_service::cloud_service::CloudServiceImpl; use risingwave_meta_service::cluster_service::ClusterServiceImpl; diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml index 1760ccd56a85a..87b293f64a5e6 100644 --- a/src/meta/service/Cargo.toml +++ b/src/meta/service/Cargo.toml @@ -23,6 +23,7 @@ regex = "1" risingwave_common = { workspace = true } risingwave_connector = { workspace = true } risingwave_meta = { workspace = true } +risingwave_meta_model_v2 = { workspace = true } risingwave_pb = { workspace = true } sea-orm = { version = "0.12.0", features = [ "sqlx-mysql", diff --git a/src/meta/service/src/telemetry_service.rs b/src/meta/service/src/telemetry_service.rs index 7c413406f13e5..42200e10a4eeb 100644 --- a/src/meta/service/src/telemetry_service.rs +++ b/src/meta/service/src/telemetry_service.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_meta_model_v2::prelude::Cluster; use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoService; use risingwave_pb::meta::{GetTelemetryInfoRequest, TelemetryInfoResponse}; use sea_orm::EntityTrait; @@ -19,7 +20,6 @@ use tonic::{Request, Response, Status}; use crate::controller::SqlMetaStore; use crate::model::ClusterId; -use crate::model_v2::prelude::Cluster; use crate::storage::MetaStoreRef; use crate::MetaResult; diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index cb37307384aa2..daaa9b684850c 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -17,6 +17,13 @@ use std::iter; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::catalog::{DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS}; +use risingwave_meta_model_v2::object::ObjectType; +use risingwave_meta_model_v2::prelude::*; +use risingwave_meta_model_v2::{ + connection, database, function, index, object, object_dependency, schema, sink, source, table, + view, ConnectionId, DatabaseId, FunctionId, ObjectId, PrivateLinkService, SchemaId, SourceId, + TableId, UserId, +}; use risingwave_pb::catalog::{ PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, }; @@ -40,13 +47,6 @@ use crate::controller::utils::{ }; use crate::controller::ObjectModel; use crate::manager::{MetaSrvEnv, NotificationVersion}; -use crate::model_v2::object::ObjectType; -use crate::model_v2::prelude::*; -use crate::model_v2::{ - connection, database, function, index, object, object_dependency, schema, sink, source, table, - view, ConnectionId, DatabaseId, FunctionId, ObjectId, PrivateLinkService, SchemaId, SourceId, - TableId, UserId, -}; use crate::rpc::ddl_controller::DropMode; use crate::{MetaError, MetaResult}; diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index ca29380a49fca..392a0def5d53f 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -22,6 +22,9 @@ use std::time::{Duration, SystemTime}; use itertools::Itertools; use risingwave_common::hash::ParallelUnitId; use risingwave_hummock_sdk::HummockSstableObjectId; +use risingwave_meta_model_v2::prelude::{Worker, WorkerProperty}; +use risingwave_meta_model_v2::worker::{WorkerStatus, WorkerType}; +use risingwave_meta_model_v2::{worker, worker_property, I32Array, TransactionId, WorkerId}; use risingwave_pb::common::worker_node::{PbProperty, PbState}; use risingwave_pb::common::{ HostAddress, ParallelUnit, PbHostAddress, PbParallelUnit, PbWorkerNode, PbWorkerType, @@ -39,10 +42,7 @@ use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard}; use tokio::task::JoinHandle; -use crate::manager::prelude::{Worker, WorkerProperty}; use crate::manager::{LocalNotification, MetaSrvEnv, WorkerKey}; -use crate::model_v2::worker::{WorkerStatus, WorkerType}; -use crate::model_v2::{worker, worker_property, I32Array, TransactionId, WorkerId}; use crate::{MetaError, MetaResult}; pub type ClusterControllerRef = Arc<ClusterController>; @@ -89,64 +89,6 @@ impl From<WorkerInfo> for PbWorkerNode { } } -impl From<PbWorkerType> for WorkerType { - fn from(worker_type: PbWorkerType) -> Self { - match worker_type { - PbWorkerType::Unspecified => unreachable!("unspecified worker type"), - PbWorkerType::Frontend => Self::Frontend, - PbWorkerType::ComputeNode => Self::ComputeNode, - PbWorkerType::RiseCtl => Self::RiseCtl, - PbWorkerType::Compactor => Self::Compactor, - PbWorkerType::Meta => Self::Meta, - } - } -} - -impl From<WorkerType> for PbWorkerType { - fn from(worker_type: WorkerType) -> Self { - match worker_type { - WorkerType::Frontend => Self::Frontend, - WorkerType::ComputeNode => Self::ComputeNode, - WorkerType::RiseCtl => Self::RiseCtl, - WorkerType::Compactor => Self::Compactor, - WorkerType::Meta => Self::Meta, - } - } -} - -impl From<PbState> for WorkerStatus { - fn from(state: PbState) -> Self { - match state { - PbState::Unspecified => unreachable!("unspecified worker status"), - PbState::Starting => Self::Starting, - PbState::Running => Self::Running, - } - } -} - -impl From<WorkerStatus> for PbState { - fn from(status: WorkerStatus) -> Self { - match status { - WorkerStatus::Starting => Self::Starting, - WorkerStatus::Running => Self::Running, - } - } -} - -impl From<&PbWorkerNode> for worker::ActiveModel { - fn from(worker: &PbWorkerNode) -> Self { - let host = worker.host.clone().unwrap(); - Self { - worker_id: ActiveValue::Set(worker.id), - worker_type: ActiveValue::Set(worker.r#type().into()), - host: ActiveValue::Set(host.host), - port: ActiveValue::Set(host.port), - status: ActiveValue::Set(worker.state().into()), - ..Default::default() - } - } -} - impl ClusterController { pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> { let meta_store = env diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 07793e30a17fe..d9193acd5591f 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -14,6 +14,9 @@ use anyhow::anyhow; use risingwave_common::util::epoch::Epoch; +use risingwave_meta_model_v2::{ + connection, database, index, object, schema, sink, source, table, view, +}; use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableType}; @@ -21,9 +24,8 @@ use risingwave_pb::catalog::{ PbConnection, PbCreateType, PbDatabase, PbHandleConflictBehavior, PbIndex, PbSchema, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbTable, PbView, }; -use sea_orm::{ActiveValue, DatabaseConnection, ModelTrait}; +use sea_orm::{DatabaseConnection, ModelTrait}; -use crate::model_v2::{connection, database, index, object, schema, sink, source, table, view}; use crate::MetaError; #[allow(dead_code)] @@ -56,7 +58,7 @@ impl SqlMetaStore { #[cfg(any(test, feature = "test"))] #[cfg(not(madsim))] pub async fn for_test() -> Self { - use model_migration::{Migrator, MigratorTrait}; + use risingwave_meta_model_migration::{Migrator, MigratorTrait}; let conn = sea_orm::Database::connect("sqlite::memory:").await.unwrap(); Migrator::up(&conn, None).await.unwrap(); Self { conn } @@ -75,24 +77,6 @@ impl From<ObjectModel<database::Model>> for PbDatabase { } } -impl From<PbDatabase> for database::ActiveModel { - fn from(db: PbDatabase) -> Self { - Self { - database_id: ActiveValue::Set(db.id), - name: ActiveValue::Set(db.name), - } - } -} - -impl From<PbSchema> for schema::ActiveModel { - fn from(schema: PbSchema) -> Self { - Self { - schema_id: ActiveValue::Set(schema.id), - name: ActiveValue::Set(schema.name), - } - } -} - impl From<ObjectModel<schema::Model>> for PbSchema { fn from(value: ObjectModel<schema::Model>) -> Self { Self { diff --git a/src/meta/src/controller/system_param.rs b/src/meta/src/controller/system_param.rs index 0656da5ea9a46..5c9761a9a119d 100644 --- a/src/meta/src/controller/system_param.rs +++ b/src/meta/src/controller/system_param.rs @@ -21,6 +21,8 @@ use risingwave_common::system_param::{ check_missing_params, derive_missing_fields, set_system_param, }; use risingwave_common::{for_all_params, key_of}; +use risingwave_meta_model_v2::prelude::SystemParameter; +use risingwave_meta_model_v2::system_parameter; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::PbSystemParams; use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait}; @@ -31,8 +33,6 @@ use tracing::info; use crate::controller::SqlMetaStore; use crate::manager::{LocalNotification, NotificationManagerRef}; -use crate::model_v2::prelude::SystemParameter; -use crate::model_v2::system_parameter; use crate::{MetaError, MetaResult}; pub type SystemParamsControllerRef = Arc<SystemParamsController>; diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index d36918db3820d..2dbd89ac92423 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -13,7 +13,13 @@ // limitations under the License. use anyhow::anyhow; -use model_migration::WithQuery; +use risingwave_meta_model_migration::WithQuery; +use risingwave_meta_model_v2::object::ObjectType; +use risingwave_meta_model_v2::prelude::*; +use risingwave_meta_model_v2::{ + connection, function, index, object, object_dependency, schema, sink, source, table, view, + DataTypeArray, DatabaseId, ObjectId, SchemaId, UserId, +}; use risingwave_pb::catalog::{PbConnection, PbFunction}; use sea_orm::sea_query::{ Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType, @@ -24,12 +30,6 @@ use sea_orm::{ Order, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, Statement, }; -use crate::model_v2::object::ObjectType; -use crate::model_v2::prelude::*; -use crate::model_v2::{ - connection, function, index, object, object_dependency, schema, sink, source, table, view, - DataTypeArray, DatabaseId, ObjectId, SchemaId, UserId, -}; use crate::{MetaError, MetaResult}; /// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object. diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index f549578f079c6..95b4ce7ead72d 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -42,7 +42,6 @@ pub mod error; pub mod hummock; pub mod manager; pub mod model; -pub mod model_v2; pub mod rpc; pub mod serving; pub mod storage; diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 16a4bcb248b23..28d8200c73ea5 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -16,6 +16,7 @@ use std::ops::Deref; use std::sync::Arc; use risingwave_common::config::{CompactionConfig, DefaultParallelism}; +use risingwave_meta_model_v2::prelude::Cluster; use risingwave_pb::meta::SystemParams; use risingwave_rpc_client::{ConnectorClient, StreamClientPool, StreamClientPoolRef}; use sea_orm::EntityTrait; @@ -28,7 +29,6 @@ use crate::manager::{ NotificationManagerRef, }; use crate::model::ClusterId; -use crate::model_v2::prelude::Cluster; use crate::storage::MetaStoreRef; #[cfg(any(test, feature = "test"))] use crate::storage::{MemStore, MetaStoreBoxExt}; diff --git a/src/meta/src/manager/mod.rs b/src/meta/src/manager/mod.rs index 35642ed0ec143..e7e5208856bc3 100644 --- a/src/meta/src/manager/mod.rs +++ b/src/meta/src/manager/mod.rs @@ -28,7 +28,6 @@ pub use env::{MetaSrvEnv, *}; pub use id::*; pub use idle::*; pub use notification::{LocalNotification, MessageStatus, NotificationManagerRef, *}; +pub use risingwave_meta_model_v2::prelude; pub use streaming_job::*; pub use system_param::*; - -pub use super::model_v2::prelude; diff --git a/src/meta/src/model_v2/ext/hummock.rs b/src/meta/src/model_v2/ext/hummock.rs deleted file mode 100644 index 77111e2e7d202..0000000000000 --- a/src/meta/src/model_v2/ext/hummock.rs +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use risingwave_pb::hummock::HummockPinnedVersion; -use sea_orm::sea_query::OnConflict; -use sea_orm::ActiveValue::{Set, Unchanged}; -use sea_orm::EntityTrait; - -use crate::model::{MetadataModelResult, Transactional}; -use crate::model_v2::hummock_pinned_version; -use crate::model_v2::trx::Transaction; - -#[async_trait::async_trait] -impl Transactional<Transaction> for HummockPinnedVersion { - async fn upsert_in_transaction( - &self, - trx: &mut crate::model_v2::trx::Transaction, - ) -> MetadataModelResult<()> { - // TODO: error type conversion - // TODO: integer type conversion - let m = hummock_pinned_version::ActiveModel { - context_id: Unchanged(self.context_id.try_into().unwrap()), - min_pinned_id: Set(self.min_pinned_id.try_into().unwrap()), - }; - hummock_pinned_version::Entity::insert(m) - .on_conflict( - OnConflict::column(hummock_pinned_version::Column::ContextId) - .update_columns([hummock_pinned_version::Column::MinPinnedId]) - .to_owned(), - ) - .exec(trx) - .await - .unwrap(); - Ok(()) - } - - async fn delete_in_transaction( - &self, - trx: &mut crate::model_v2::trx::Transaction, - ) -> MetadataModelResult<()> { - // TODO: error type conversion - // TODO: integer type conversion - let id: i32 = self.context_id.try_into().unwrap(); - hummock_pinned_version::Entity::delete_by_id(id) - .exec(trx) - .await - .unwrap(); - Ok(()) - } -} diff --git a/src/meta/src/model_v2/ext/mod.rs b/src/meta/src/model_v2/ext/mod.rs deleted file mode 100644 index 47a5ce8623dc4..0000000000000 --- a/src/meta/src/model_v2/ext/mod.rs +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -mod hummock; -pub use hummock::*; diff --git a/src/meta/src/model_v2/migration/Cargo.toml b/src/meta/src/model_v2/migration/Cargo.toml deleted file mode 100644 index d5d51d77da909..0000000000000 --- a/src/meta/src/model_v2/migration/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "model_migration" -version = "0.1.0" -edition = "2021" -publish = false - -[lib] -name = "model_migration" -path = "src/lib.rs" - -[dependencies] -async-std = { version = "1", features = ["attributes", "tokio1"] } -uuid = { version = "1", features = ["v4"] } - -[dependencies.sea-orm-migration] -version = "0.12.0" -features = ["sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", "runtime-tokio-native-tls", "with-uuid"] diff --git a/src/meta/src/model_v2/trx.rs b/src/meta/src/model_v2/trx.rs deleted file mode 100644 index 4bfe6d0261de4..0000000000000 --- a/src/meta/src/model_v2/trx.rs +++ /dev/null @@ -1,276 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub type Transaction = sea_orm::DatabaseTransaction; - -#[cfg(not(madsim))] -#[cfg(test)] -mod tests { - use std::collections::BTreeMap; - - use risingwave_pb::hummock::HummockPinnedVersion; - use sea_orm::{EntityTrait, TransactionTrait}; - - use crate::controller::SqlMetaStore; - use crate::model::{BTreeMapTransaction, ValTransaction, VarTransaction}; - use crate::model_v2::hummock_pinned_version::Model as HummockPinnedVersionModel; - use crate::model_v2::prelude::HummockPinnedVersion as HummockPinnedVersionEntity; - use crate::model_v2::trx::Transaction; - - #[tokio::test] - async fn test_simple_var_transaction_commit() { - let store = SqlMetaStore::for_test().await; - let db = &store.conn; - let mut kv = HummockPinnedVersion { - context_id: 1, - min_pinned_id: 2, - }; - let mut num_txn = VarTransaction::<'_, Transaction, _>::new(&mut kv); - num_txn.min_pinned_id = 3; - assert_eq!(num_txn.min_pinned_id, 3); - let mut txn = db.begin().await.unwrap(); - num_txn.apply_to_txn(&mut txn).await.unwrap(); - txn.commit().await.unwrap(); - let db_val = HummockPinnedVersionEntity::find_by_id(1) - .one(db) - .await - .unwrap() - .unwrap(); - assert_eq!(db_val.min_pinned_id, 3); - num_txn.commit(); - assert_eq!(kv.min_pinned_id, 3); - } - - #[test] - fn test_simple_var_transaction_abort() { - let mut kv = HummockPinnedVersion { - context_id: 1, - min_pinned_id: 11, - }; - let mut num_txn = VarTransaction::<'_, Transaction, _>::new(&mut kv); - num_txn.min_pinned_id = 2; - num_txn.abort(); - assert_eq!(11, kv.min_pinned_id); - } - - #[tokio::test] - async fn test_tree_map_transaction_commit() { - let mut map: BTreeMap<u32, HummockPinnedVersion> = BTreeMap::new(); - // to remove - map.insert( - 1, - HummockPinnedVersion { - context_id: 1, - min_pinned_id: 11, - }, - ); - // to-remove-after-modify - map.insert( - 2, - HummockPinnedVersion { - context_id: 2, - min_pinned_id: 22, - }, - ); - // first - map.insert( - 3, - HummockPinnedVersion { - context_id: 3, - min_pinned_id: 33, - }, - ); - - let mut map_copy = map.clone(); - let mut map_txn = BTreeMapTransaction::new(&mut map); - map_txn.remove(1); - map_txn.insert( - 2, - HummockPinnedVersion { - context_id: 2, - min_pinned_id: 0, - }, - ); - map_txn.remove(2); - // first - map_txn.insert( - 3, - HummockPinnedVersion { - context_id: 3, - min_pinned_id: 333, - }, - ); - // second - map_txn.insert( - 4, - HummockPinnedVersion { - context_id: 4, - min_pinned_id: 44, - }, - ); - assert_eq!( - &HummockPinnedVersion { - context_id: 4, - min_pinned_id: 44 - }, - map_txn.get(&4).unwrap() - ); - // third - map_txn.insert( - 5, - HummockPinnedVersion { - context_id: 5, - min_pinned_id: 55, - }, - ); - assert_eq!( - &HummockPinnedVersion { - context_id: 5, - min_pinned_id: 55 - }, - map_txn.get(&5).unwrap() - ); - - let mut third_entry = map_txn.get_mut(5).unwrap(); - third_entry.min_pinned_id = 555; - assert_eq!( - &HummockPinnedVersion { - context_id: 5, - min_pinned_id: 555 - }, - map_txn.get(&5).unwrap() - ); - - let store = SqlMetaStore::for_test().await; - let db = &store.conn; - let mut txn = db.begin().await.unwrap(); - map_txn.apply_to_txn(&mut txn).await.unwrap(); - txn.commit().await.unwrap(); - - let db_rows: Vec<HummockPinnedVersionModel> = - HummockPinnedVersionEntity::find().all(db).await.unwrap(); - assert_eq!(db_rows.len(), 3); - assert_eq!( - 1, - db_rows - .iter() - .filter(|m| m.context_id == 3 && m.min_pinned_id == 333) - .count() - ); - assert_eq!( - 1, - db_rows - .iter() - .filter(|m| m.context_id == 4 && m.min_pinned_id == 44) - .count() - ); - assert_eq!( - 1, - db_rows - .iter() - .filter(|m| m.context_id == 5 && m.min_pinned_id == 555) - .count() - ); - map_txn.commit(); - - // replay the change to local copy and compare - map_copy.remove(&1).unwrap(); - map_copy.insert( - 2, - HummockPinnedVersion { - context_id: 2, - min_pinned_id: 22, - }, - ); - map_copy.remove(&2).unwrap(); - map_copy.insert( - 3, - HummockPinnedVersion { - context_id: 3, - min_pinned_id: 333, - }, - ); - map_copy.insert( - 4, - HummockPinnedVersion { - context_id: 4, - min_pinned_id: 44, - }, - ); - map_copy.insert( - 5, - HummockPinnedVersion { - context_id: 5, - min_pinned_id: 555, - }, - ); - assert_eq!(map_copy, map); - } - - #[tokio::test] - async fn test_tree_map_entry_update_transaction_commit() { - let mut map: BTreeMap<u32, HummockPinnedVersion> = BTreeMap::new(); - map.insert( - 1, - HummockPinnedVersion { - context_id: 1, - min_pinned_id: 11, - }, - ); - - let mut map_txn = BTreeMapTransaction::new(&mut map); - let mut first_entry_txn = map_txn.new_entry_txn(1).unwrap(); - first_entry_txn.min_pinned_id = 111; - - let store = SqlMetaStore::for_test().await; - let db = &store.conn; - let mut txn = db.begin().await.unwrap(); - first_entry_txn.apply_to_txn(&mut txn).await.unwrap(); - txn.commit().await.unwrap(); - first_entry_txn.commit(); - - let db_rows: Vec<HummockPinnedVersionModel> = - HummockPinnedVersionEntity::find().all(db).await.unwrap(); - assert_eq!(db_rows.len(), 1); - assert_eq!( - 1, - db_rows - .iter() - .filter(|m| m.context_id == 1 && m.min_pinned_id == 111) - .count() - ); - assert_eq!(111, map.get(&1).unwrap().min_pinned_id); - } - - #[tokio::test] - async fn test_tree_map_entry_insert_transaction_commit() { - let mut map: BTreeMap<u32, HummockPinnedVersion> = BTreeMap::new(); - - let mut map_txn = BTreeMapTransaction::new(&mut map); - let first_entry_txn = map_txn.new_entry_insert_txn( - 1, - HummockPinnedVersion { - context_id: 1, - min_pinned_id: 11, - }, - ); - let store = SqlMetaStore::for_test().await; - let db = &store.conn; - let mut txn = db.begin().await.unwrap(); - first_entry_txn.apply_to_txn(&mut txn).await.unwrap(); - txn.commit().await.unwrap(); - first_entry_txn.commit(); - assert_eq!(11, map.get(&1).unwrap().min_pinned_id); - } -} diff --git a/src/meta/src/model_v2/worker.rs b/src/meta/src/model_v2/worker.rs deleted file mode 100644 index 08cdb2be34da1..0000000000000 --- a/src/meta/src/model_v2/worker.rs +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use sea_orm::entity::prelude::*; - -use crate::model_v2::{TransactionId, WorkerId}; - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum)] -#[sea_orm(rs_type = "String", db_type = "String(None)")] -pub enum WorkerType { - #[sea_orm(string_value = "FRONTEND")] - Frontend, - #[sea_orm(string_value = "COMPUTE_NODE")] - ComputeNode, - #[sea_orm(string_value = "RISE_CTL")] - RiseCtl, - #[sea_orm(string_value = "COMPACTOR")] - Compactor, - #[sea_orm(string_value = "META")] - Meta, -} - -#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] -#[sea_orm(rs_type = "String", db_type = "String(None)")] -pub enum WorkerStatus { - #[sea_orm(string_value = "STARTING")] - Starting, - #[sea_orm(string_value = "RUNNING")] - Running, -} - -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] -#[sea_orm(table_name = "worker")] -pub struct Model { - #[sea_orm(primary_key)] - pub worker_id: WorkerId, - pub worker_type: WorkerType, - pub host: String, - pub port: i32, - pub status: WorkerStatus, - pub transaction_id: Option<TransactionId>, -} - -#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] -pub enum Relation { - #[sea_orm(has_many = "super::worker_property::Entity")] - WorkerProperty, -} - -impl Related<super::worker_property::Entity> for Entity { - fn to() -> RelationDef { - Relation::WorkerProperty.def() - } -} - -impl ActiveModelBehavior for ActiveModel {}