diff --git a/.typos.toml b/.typos.toml index 052a051bd1e0..11d80ecb6c7d 100644 --- a/.typos.toml +++ b/.typos.toml @@ -10,6 +10,7 @@ steam = "stream" # You played with Steam games too much. ot = "ot" bui = "bui" mosquitto = "mosquitto" # This is a MQTT broker. +abd = "abd" [default.extend-identifiers] diff --git a/Cargo.lock b/Cargo.lock index 3cb602dd15d7..4fcd17bf1c61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9297,6 +9297,7 @@ dependencies = [ "itertools 0.12.0", "madsim-etcd-client", "madsim-tokio", + "memcomparable", "prost 0.12.1", "regex", "risingwave_common", @@ -9304,11 +9305,14 @@ dependencies = [ "risingwave_frontend", "risingwave_hummock_sdk", "risingwave_meta", + "risingwave_meta_model_migration", + "risingwave_meta_model_v2", "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", "risingwave_storage", "risingwave_stream", + "sea-orm", "serde", "serde_json", "serde_yaml", diff --git a/Cargo.toml b/Cargo.toml index ae63068391aa..b4adc376372b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -160,6 +160,12 @@ opentelemetry = "0.21" opentelemetry-otlp = "0.14" opentelemetry_sdk = { version = "0.21", default-features = false } opentelemetry-semantic-conventions = "0.13" +sea-orm = { version = "0.12.14", features = [ + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", + "runtime-tokio-native-tls", +] } tokio-util = "0.7" tracing-opentelemetry = "0.22" diff --git a/ci/scripts/e2e-sql-migration.sh b/ci/scripts/e2e-sql-migration.sh new file mode 100755 index 000000000000..46502844eada --- /dev/null +++ b/ci/scripts/e2e-sql-migration.sh @@ -0,0 +1,89 @@ +#!/usr/bin/env bash + +# Exits as soon as any line fails. +set -euo pipefail + +export RW_PREFIX=$PWD/.risingwave +export RW_PREFIX_DATA=$RW_PREFIX/data + +source ci/scripts/common.sh + +wait_for_recovery() { + set +e + timeout 20s bash -c ' + while true; do + echo "Polling every 1s to check if the recovery is complete for 20s" + if psql -h localhost -p 4566 -d dev -U root -c "FLUSH;" &2 + exit 1 + ;; + : ) + echo "Invalid option: $OPTARG requires an argument" 1>&2 + ;; + esac +done +shift $((OPTIND -1)) + +download_and_prepare_rw "$profile" common + +echo "--- starting risingwave cluster, ci-1cn-1fe-with-recovery" +RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ +cargo make ci-start ci-1cn-1fe-with-recovery + +echo "--- init cluster with some data & DDL" +sqllogictest -d dev -h localhost -p 4566 './e2e_test/sql_migration/prepare.slt' + +echo "--- kill cluster" +cargo make ci-kill + +echo "--- restart etcd" +cargo make dev ci-meta-etcd-for-migration + +echo "--- run migration" +mkdir -p "${RW_PREFIX_DATA}/sqlite/" +./target/debug/risingwave risectl \ +meta \ +migration \ +--etcd-endpoints localhost:2388 \ +--sql-endpoint sqlite://"${RW_PREFIX_DATA}/sqlite/metadata.db"\?mode=rwc \ +-f + +echo "--- kill etcd" +cargo make ci-kill + +echo "--- starting risingwave cluster, meta-1cn-1fe-sqlite" +cargo make dev meta-1cn-1fe-sqlite + +echo "--- wait for recovery" +wait_for_recovery + +echo "--- run check" +sqllogictest -d dev -h localhost -p 4566 './e2e_test/sql_migration/check.slt' + +echo "--- kill cluster" +cargo make kill + +echo "--- clean data" +cargo make clean-data + diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 02b33df3b513..f9bd1fe6954c 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -715,6 +715,22 @@ steps: timeout_in_minutes: 30 retry: *auto-retry + - label: "e2e sql migration test" + command: "ci/scripts/e2e-sql-migration.sh -p ci-dev" + if: build.pull_request.labels includes "ci/run-e2e-sql-migration-tests" || build.env("CI_STEPS") =~ /(^|,)e2e-sql-migration-tests?(,|$$)/ + depends_on: + - "build" + - "build-other" + - "docslt" + plugins: + - docker-compose#v5.1.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 10 + retry: *auto-retry + # FIXME(kwannoel): Let the github PR labeller label it, if sqlsmith source files has changes. - label: "fuzz test" command: "ci/scripts/pr-fuzz-test.sh -p ci-dev" diff --git a/e2e_test/sql_migration/check.slt b/e2e_test/sql_migration/check.slt new file mode 100644 index 000000000000..b4c97bba50bf --- /dev/null +++ b/e2e_test/sql_migration/check.slt @@ -0,0 +1,105 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +query T rowsort +show databases; +---- +db1 +dev + +query T rowsort +show schemas; +---- +information_schema +pg_catalog +public +rw_catalog +schema1 + +query T +SELECT setting FROM pg_catalog.pg_settings where name = 'max_concurrent_creating_streaming_jobs'; +---- +4 + +query T rowsort +select name, relation_type from rw_relations where relation_type != 'system table' AND relation_type != 'view'; +---- +ddl_subscription_table subscription +idx1 index +m_simple table +mv1 materialized view +mv2 materialized view +s_simple_1 sink +sink sink +src source +t1 table +t_simple table + +query T +show views; +---- +v1 + +query T +select name, type_, provider from rw_connections; +---- +conn0 MOCK PRIVATELINK + +query TTTTT +show functions; +---- +int_42 (empty) integer javascript NULL + +statement ok +insert into t1 select * from generate_series(1, 1000); + +query I +select count(*) from t1; +---- +2000 + +statement ok +create materialized view mv3 as select * from mv2; + +statement ok +REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA schema1 FROM user1; + +statement error Permission denied +drop source src; + +statement ok +drop source src cascade; + +statement ok +drop connection conn0; + +statement ok +drop function int_42; + +statement ok +drop sink s_simple_1; + +statement error Permission denied +drop table t1; + +statement ok +drop table t1 cascade; + +statement ok +drop table t_simple; + +statement ok +drop table m_simple; + +statement ok +drop user user1; + +statement ok +drop schema schema1; + +statement ok +drop database db1; + +query T +select name, relation_type from rw_relations where relation_type != 'system table' AND relation_type != 'view'; +---- diff --git a/e2e_test/sql_migration/prepare.slt b/e2e_test/sql_migration/prepare.slt new file mode 100644 index 000000000000..f0669a4c6b29 --- /dev/null +++ b/e2e_test/sql_migration/prepare.slt @@ -0,0 +1,68 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create database db1; + +statement ok +create schema schema1; + +statement ok +ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 4; + +statement ok +create source src (v int) with ( + connector = 'datagen', + fields.v.kind = 'sequence', + fields.v.start = '1', + fields.v.end = '10', + datagen.rows.per.second='15', + datagen.split.num = '1' +) FORMAT PLAIN ENCODE JSON; + +statement ok +create table t1(v1 int); + +statement ok +create materialized view mv1 as select * from t1; + +statement ok +create materialized view mv2 as select * from src; + +statement ok +create view v1 as select * from mv1; + +statement ok +CREATE SINK sink FROM mv2 WITH (connector='blackhole'); + +statement ok +create user user1; + +statement ok +GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA schema1 TO user1; + +statement ok +CREATE CONNECTION conn0 WITH (type = 'privatelink', provider = 'mock'); + +statement ok +create index idx1 on t1(v1); + +statement ok +create table t_simple (v1 int, v2 int); + +statement ok +create table m_simple (v1 int primary key, v2 int); + +statement ok +create sink s_simple_1 into m_simple as select v1, v2 from t_simple; + +statement ok +create subscription ddl_subscription_table from mv2 with(retention = '1D'); + +statement ok +insert into t1 select * from generate_series(1, 1000); + +statement ok +create function int_42() returns int language javascript as $$ + return 42; +$$; diff --git a/risedev.yml b/risedev.yml index 1af18e20dc0f..53eda3ec3c2c 100644 --- a/risedev.yml +++ b/risedev.yml @@ -898,6 +898,11 @@ profile: - use: etcd - use: minio + ci-meta-etcd-for-migration: + config-path: src/config/ci.toml + steps: + - use: etcd + ci-iceberg-test: config-path: src/config/ci-iceberg-test.toml steps: diff --git a/src/ctl/Cargo.toml b/src/ctl/Cargo.toml index a5c67912f491..20af56afb98e 100644 --- a/src/ctl/Cargo.toml +++ b/src/ctl/Cargo.toml @@ -24,6 +24,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] } hex = "0.4" inquire = "0.7.0" itertools = "0.12" +memcomparable = "0.2" prost = { workspace = true } regex = "1.10.0" risingwave_common = { workspace = true } @@ -31,11 +32,14 @@ risingwave_connector = { workspace = true } risingwave_frontend = { workspace = true } risingwave_hummock_sdk = { workspace = true } risingwave_meta = { workspace = true } +risingwave_meta_model_migration = { workspace = true } +risingwave_meta_model_v2 = { workspace = true } risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_storage = { workspace = true } risingwave_stream = { workspace = true } +sea-orm = { workspace = true } serde = "1" serde_json = "1" serde_yaml = "0.9.25" diff --git a/src/ctl/src/cmd_impl/meta.rs b/src/ctl/src/cmd_impl/meta.rs index 2996553eaafd..8920839c7b0d 100644 --- a/src/ctl/src/cmd_impl/meta.rs +++ b/src/ctl/src/cmd_impl/meta.rs @@ -15,6 +15,7 @@ mod backup_meta; mod cluster_info; mod connection; +mod migration; mod pause_resume; mod reschedule; mod serving; @@ -22,6 +23,7 @@ mod serving; pub use backup_meta::*; pub use cluster_info::*; pub use connection::*; +pub use migration::*; pub use pause_resume::*; pub use reschedule::*; pub use serving::*; diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs new file mode 100644 index 000000000000..5e3806e93466 --- /dev/null +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -0,0 +1,845 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeSet, HashMap}; +use std::time::Duration; + +use anyhow::Context; +use etcd_client::ConnectOptions; +use itertools::Itertools; +use risingwave_common::util::epoch::Epoch; +use risingwave_common::util::stream_graph_visitor::visit_stream_node_tables; +use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; +use risingwave_hummock_sdk::version::HummockVersionDelta; +use risingwave_meta::controller::catalog::CatalogController; +use risingwave_meta::controller::system_param::system_params_to_model; +use risingwave_meta::controller::SqlMetaStore; +use risingwave_meta::hummock::compaction::CompactStatus; +use risingwave_meta::hummock::model::CompactionGroup; +use risingwave_meta::manager::model::SystemParamsModel; +use risingwave_meta::model; +use risingwave_meta::model::{ClusterId, MetadataModel, NotificationVersion, TableParallelism}; +use risingwave_meta::storage::{ + EtcdMetaStore, MetaStore, MetaStoreBoxExt, MetaStoreError, MetaStoreRef, + WrappedEtcdClient as EtcdClient, DEFAULT_COLUMN_FAMILY, +}; +use risingwave_meta::stream::TableRevision; +use risingwave_meta_model_migration::{Migrator, MigratorTrait}; +use risingwave_meta_model_v2::catalog_version::VersionCategory; +use risingwave_meta_model_v2::compaction_status::LevelHandlers; +use risingwave_meta_model_v2::fragment::StreamNode; +use risingwave_meta_model_v2::hummock_sequence::{ + COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID, +}; +use risingwave_meta_model_v2::hummock_version_stats::TableStats; +use risingwave_meta_model_v2::object::ObjectType; +use risingwave_meta_model_v2::prelude::{ + Actor, ActorDispatcher, CatalogVersion, Cluster, Connection, Database, Fragment, Function, + Index, Object, ObjectDependency, Schema, Sink, Source, StreamingJob, Subscription, + SystemParameter, Table, User, UserPrivilege, View, Worker, WorkerProperty, +}; +use risingwave_meta_model_v2::{ + catalog_version, cluster, compaction_config, compaction_status, compaction_task, connection, + database, function, hummock_pinned_snapshot, hummock_pinned_version, hummock_sequence, + hummock_version_delta, hummock_version_stats, index, object, object_dependency, schema, sink, + source, streaming_job, subscription, table, user, user_privilege, view, worker, + worker_property, CreateType, JobStatus, ObjectId, StreamingParallelism, +}; +use risingwave_pb::catalog::table::PbTableType; +use risingwave_pb::catalog::{ + PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbSubscription, + PbTable, PbView, +}; +use risingwave_pb::common::WorkerType; +use risingwave_pb::hummock::{ + CompactTaskAssignment, HummockPinnedSnapshot, HummockPinnedVersion, HummockVersionStats, +}; +use risingwave_pb::meta::table_fragments::State; +use risingwave_pb::meta::PbSystemParams; +use risingwave_pb::user::grant_privilege::PbObject as GrantObject; +use risingwave_pb::user::PbUserInfo; +use sea_orm::prelude::DateTime; +use sea_orm::ActiveValue::Set; +use sea_orm::{ + ColumnTrait, ConnectionTrait, DatabaseBackend, DbBackend, EntityTrait, IntoActiveModel, NotSet, + QueryFilter, QuerySelect, Statement, +}; +use thiserror_ext::AsReport; +use uuid::Uuid; + +pub struct EtcdBackend { + pub(crate) endpoints: Vec, + pub(crate) credentials: Option<(String, String)>, +} + +pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> anyhow::Result<()> { + // 1. init etcd meta store. + let mut options = + ConnectOptions::default().with_keep_alive(Duration::from_secs(3), Duration::from_secs(5)); + if let Some((username, password)) = &from.credentials { + options = options.with_user(username, password) + } + let auth_enabled = from.credentials.is_some(); + let client = EtcdClient::connect(from.endpoints.clone(), Some(options.clone()), auth_enabled) + .await + .context("failed to connect etcd")?; + let meta_store = EtcdMetaStore::new(client).into_ref(); + + // 2. init sql meta store. + let mut options = sea_orm::ConnectOptions::new(target); + options + .max_connections(10) + .connect_timeout(Duration::from_secs(10)) + .idle_timeout(Duration::from_secs(30)); + let conn = sea_orm::Database::connect(options).await?; + let meta_store_sql = SqlMetaStore::new(conn); + + if force_clean { + Migrator::down(&meta_store_sql.conn, None) + .await + .expect("failed to clean sql backend"); + } + Migrator::up(&meta_store_sql.conn, None) + .await + .expect("failed to init sql backend"); + + // cluster Id. + let cluster_id: Uuid = ClusterId::from_meta_store(&meta_store) + .await? + .expect("cluster id not found") + .parse()?; + + let generated_cluster_id: Uuid = Cluster::find() + .select_only() + .column(cluster::Column::ClusterId) + .into_tuple() + .one(&meta_store_sql.conn) + .await? + .expect("cluster id not found"); + + Cluster::update_many() + .col_expr(cluster::Column::ClusterId, cluster_id.into()) + .filter(cluster::Column::ClusterId.eq(generated_cluster_id)) + .exec(&meta_store_sql.conn) + .await?; + println!("cluster id updated to {}", cluster_id); + + // system parameters. + let system_parameters = PbSystemParams::get(&meta_store) + .await? + .expect("system parameters not found"); + SystemParameter::insert_many(system_params_to_model(&system_parameters)?) + .exec(&meta_store_sql.conn) + .await?; + println!("system parameters migrated"); + + // workers. + let workers = model::Worker::list(&meta_store).await?; + for worker in workers { + Worker::insert(worker::ActiveModel::from(&worker.worker_node)) + .exec(&meta_store_sql.conn) + .await?; + if worker.worker_type() == WorkerType::ComputeNode { + let pb_property = worker.worker_node.property.as_ref().unwrap(); + let parallel_unit_ids = worker + .worker_node + .parallel_units + .iter() + .map(|pu| pu.id as i32) + .collect_vec(); + let property = worker_property::ActiveModel { + worker_id: Set(worker.worker_id() as _), + parallel_unit_ids: Set(parallel_unit_ids.into()), + is_streaming: Set(pb_property.is_streaming), + is_serving: Set(pb_property.is_serving), + is_unschedulable: Set(pb_property.is_unschedulable), + }; + WorkerProperty::insert(property) + .exec(&meta_store_sql.conn) + .await?; + } + } + println!("worker nodes migrated"); + + // catalogs. + let databases = PbDatabase::list(&meta_store).await?; + let schemas = PbSchema::list(&meta_store).await?; + let users = PbUserInfo::list(&meta_store).await?; + let tables = PbTable::list(&meta_store).await?; + let sources = PbSource::list(&meta_store).await?; + let sinks = PbSink::list(&meta_store).await?; + let indexes = PbIndex::list(&meta_store).await?; + let views = PbView::list(&meta_store).await?; + let functions = PbFunction::list(&meta_store).await?; + let connections = PbConnection::list(&meta_store).await?; + let subscriptions = PbSubscription::list(&meta_store).await?; + + // inuse object ids. + let mut inuse_obj_ids = tables + .iter() + .map(|t| t.id) + .chain(sources.iter().map(|s| s.id)) + .chain(sinks.iter().map(|s| s.id)) + .chain(indexes.iter().map(|i| i.id)) + .chain(views.iter().map(|v| v.id)) + .chain(subscriptions.iter().map(|s| s.id)) + .collect::>(); + + // Helper function to get next available id. + let mut next_available_id = || -> u32 { + let id = inuse_obj_ids + .iter() + .enumerate() + .find(|(i, id)| i + 1 != **id as usize) + .map(|(i, _)| i + 1) + .unwrap_or(inuse_obj_ids.len() + 1) as u32; + inuse_obj_ids.insert(id); + id + }; + + // simply truncate all objects. + Object::delete_many() + .filter(object::Column::Oid.ne(0)) + .exec(&meta_store_sql.conn) + .await?; + User::delete_many() + .filter(user::Column::UserId.ne(0)) + .exec(&meta_store_sql.conn) + .await?; + + // user + let user_models = users + .iter() + .map(|u| user::ActiveModel::from(u.clone())) + .collect_vec(); + User::insert_many(user_models) + .exec(&meta_store_sql.conn) + .await?; + println!("users migrated"); + + // database + let mut db_rewrite = HashMap::new(); + for mut db in databases { + let id = next_available_id(); + db_rewrite.insert(db.id, id); + db.id = id as _; + + let obj = object::ActiveModel { + oid: Set(id as _), + obj_type: Set(ObjectType::Database), + owner_id: Set(db.owner as _), + ..Default::default() + }; + Object::insert(obj).exec(&meta_store_sql.conn).await?; + Database::insert(database::ActiveModel::from(db)) + .exec(&meta_store_sql.conn) + .await?; + } + println!("databases migrated"); + + // schema + let mut schema_rewrite = HashMap::new(); + for mut schema in schemas { + let id = next_available_id(); + schema_rewrite.insert(schema.id, id); + schema.id = id as _; + + let obj = object::ActiveModel { + oid: Set(id as _), + obj_type: Set(ObjectType::Schema), + owner_id: Set(schema.owner as _), + database_id: Set(Some(*db_rewrite.get(&schema.database_id).unwrap() as _)), + ..Default::default() + }; + Object::insert(obj).exec(&meta_store_sql.conn).await?; + Schema::insert(schema::ActiveModel::from(schema)) + .exec(&meta_store_sql.conn) + .await?; + } + println!("schemas migrated"); + + // function + let mut function_rewrite = HashMap::new(); + for mut function in functions { + let id = next_available_id(); + function_rewrite.insert(function.id, id); + function.id = id as _; + + let obj = object::ActiveModel { + oid: Set(id as _), + obj_type: Set(ObjectType::Function), + owner_id: Set(function.owner as _), + database_id: Set(Some(*db_rewrite.get(&function.database_id).unwrap() as _)), + schema_id: Set(Some(*schema_rewrite.get(&function.schema_id).unwrap() as _)), + ..Default::default() + }; + Object::insert(obj).exec(&meta_store_sql.conn).await?; + Function::insert(function::ActiveModel::from(function)) + .exec(&meta_store_sql.conn) + .await?; + } + println!("functions migrated"); + + // connection mapping + let mut connection_rewrite = HashMap::new(); + for mut connection in connections { + let id = next_available_id(); + connection_rewrite.insert(connection.id, id); + connection.id = id as _; + + let obj = object::ActiveModel { + oid: Set(id as _), + obj_type: Set(ObjectType::Connection), + owner_id: Set(connection.owner as _), + database_id: Set(Some(*db_rewrite.get(&connection.database_id).unwrap() as _)), + schema_id: Set(Some( + *schema_rewrite.get(&connection.schema_id).unwrap() as _ + )), + ..Default::default() + }; + Object::insert(obj).exec(&meta_store_sql.conn).await?; + Connection::insert(connection::ActiveModel::from(connection)) + .exec(&meta_store_sql.conn) + .await?; + } + println!("connections migrated"); + + // add object: table, source, sink, index, view, subscription. + macro_rules! insert_objects { + ($objects:expr, $object_type:expr) => { + for object in &$objects { + let mut obj = object::ActiveModel { + oid: Set(object.id as _), + obj_type: Set($object_type), + owner_id: Set(object.owner as _), + database_id: Set(Some(*db_rewrite.get(&object.database_id).unwrap() as _)), + schema_id: Set(Some(*schema_rewrite.get(&object.schema_id).unwrap() as _)), + initialized_at_cluster_version: Set(object + .initialized_at_cluster_version + .clone()), + created_at_cluster_version: Set(object.created_at_cluster_version.clone()), + ..Default::default() + }; + if let Some(epoch) = object.initialized_at_epoch.map(Epoch::from) { + obj.initialized_at = + Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap()); + } + if let Some(epoch) = object.created_at_epoch.map(Epoch::from) { + obj.created_at = + Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap()); + } + Object::insert(obj).exec(&meta_store_sql.conn).await?; + } + }; + } + insert_objects!(sources, ObjectType::Source); + insert_objects!(sinks, ObjectType::Sink); + insert_objects!(indexes, ObjectType::Index); + insert_objects!(subscriptions, ObjectType::Subscription); + for table in &tables { + if table.table_type() == PbTableType::Index { + // we only store index object. + continue; + } + let mut obj = object::ActiveModel { + oid: Set(table.id as _), + obj_type: Set(ObjectType::Table), + owner_id: Set(table.owner as _), + database_id: Set(Some(*db_rewrite.get(&table.database_id).unwrap() as _)), + schema_id: Set(Some(*schema_rewrite.get(&table.schema_id).unwrap() as _)), + initialized_at_cluster_version: Set(table.initialized_at_cluster_version.clone()), + created_at_cluster_version: Set(table.created_at_cluster_version.clone()), + ..Default::default() + }; + if let Some(epoch) = table.initialized_at_epoch.map(Epoch::from) { + obj.initialized_at = + Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap()); + } + if let Some(epoch) = table.created_at_epoch.map(Epoch::from) { + obj.created_at = + Set(DateTime::from_timestamp_millis(epoch.as_unix_millis() as _).unwrap()); + } + Object::insert(obj).exec(&meta_store_sql.conn).await?; + } + for view in &views { + let obj = object::ActiveModel { + oid: Set(view.id as _), + obj_type: Set(ObjectType::View), + owner_id: Set(view.owner as _), + database_id: Set(Some(*db_rewrite.get(&view.database_id).unwrap() as _)), + schema_id: Set(Some(*schema_rewrite.get(&view.schema_id).unwrap() as _)), + ..Default::default() + }; + Object::insert(obj).exec(&meta_store_sql.conn).await?; + } + + // table fragments. + let table_fragments = model::TableFragments::list(&meta_store).await?; + let mut fragment_job_map = HashMap::new(); + let mut fragments = vec![]; + let mut actors = vec![]; + let mut actor_dispatchers = vec![]; + + for table_fragment in table_fragments { + let streaming_parallelism = match &table_fragment.assigned_parallelism { + TableParallelism::Adaptive => StreamingParallelism::Adaptive, + TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n), + TableParallelism::Custom => StreamingParallelism::Custom, + }; + let status = match table_fragment.state() { + State::Unspecified => unreachable!(), + State::Initial => JobStatus::Initial, + State::Creating => JobStatus::Creating, + State::Created => JobStatus::Created, + }; + StreamingJob::insert(streaming_job::ActiveModel { + job_id: Set(table_fragment.table_id().table_id as _), + job_status: Set(status), + create_type: Set(CreateType::Foreground), + timezone: Set(table_fragment.timezone()), + parallelism: Set(streaming_parallelism), + }) + .exec(&meta_store_sql.conn) + .await?; + + let fragment_actors = CatalogController::extract_fragment_and_actors_from_table_fragments( + table_fragment.to_protobuf(), + ) + .unwrap(); + for (fragment, a, ad) in fragment_actors { + fragment_job_map.insert( + fragment.fragment_id as u32, + table_fragment.table_id().table_id as ObjectId, + ); + fragments.push(fragment); + actors.extend(a); + actor_dispatchers.extend(ad); + } + } + for fragment in fragments { + // rewrite conflict ids. + let mut stream_node = fragment.stream_node.to_protobuf(); + visit_stream_node_tables(&mut stream_node, |table, _| { + table.database_id = *db_rewrite.get(&table.database_id).unwrap(); + table.schema_id = *schema_rewrite.get(&table.schema_id).unwrap(); + }); + let mut fragment = fragment.into_active_model(); + fragment.stream_node = Set(StreamNode::from_protobuf(&stream_node)); + Fragment::insert(fragment) + .exec(&meta_store_sql.conn) + .await?; + } + // Add actors and actor dispatchers. + for actor in actors { + let actor = actor.into_active_model(); + Actor::insert(actor).exec(&meta_store_sql.conn).await?; + } + for (_, actor_dispatchers) in actor_dispatchers { + for actor_dispatcher in actor_dispatchers { + let mut actor_dispatcher = actor_dispatcher.into_active_model(); + actor_dispatcher.id = NotSet; + ActorDispatcher::insert(actor_dispatcher) + .exec(&meta_store_sql.conn) + .await?; + } + } + println!("table fragments migrated"); + + // catalogs. + // source + if !sources.is_empty() { + let source_models: Vec = sources + .into_iter() + .map(|mut src| { + if let Some(id) = src.connection_id.as_mut() { + *id = *connection_rewrite.get(id).unwrap(); + } + src.into() + }) + .collect(); + Source::insert_many(source_models) + .exec(&meta_store_sql.conn) + .await?; + } + println!("sources migrated"); + + let mut object_dependencies = vec![]; + + // table + for table in tables { + let job_id = if table.table_type() == PbTableType::Internal { + Set(Some(*fragment_job_map.get(&table.fragment_id).unwrap())) + } else { + NotSet + }; + object_dependencies.extend(table.dependent_relations.iter().map(|id| { + object_dependency::ActiveModel { + id: NotSet, + oid: Set(*id as _), + used_by: Set(table.id as _), + } + })); + let mut t: table::ActiveModel = table.into(); + t.belongs_to_job_id = job_id; + Table::insert(t).exec(&meta_store_sql.conn).await?; + } + println!("tables migrated"); + + // index + if !indexes.is_empty() { + let index_models: Vec = + indexes.into_iter().map(|i| i.into()).collect_vec(); + Index::insert_many(index_models) + .exec(&meta_store_sql.conn) + .await?; + } + println!("indexes migrated"); + + // sink + if !sinks.is_empty() { + let sink_models: Vec = sinks + .into_iter() + .map(|mut s| { + object_dependencies.extend(s.dependent_relations.iter().map(|id| { + object_dependency::ActiveModel { + id: NotSet, + oid: Set(*id as _), + used_by: Set(s.id as _), + } + })); + if let Some(id) = s.connection_id.as_mut() { + *id = *connection_rewrite.get(id).unwrap(); + } + s.into() + }) + .collect(); + Sink::insert_many(sink_models) + .exec(&meta_store_sql.conn) + .await?; + } + println!("sinks migrated"); + + // view + if !views.is_empty() { + let view_models: Vec = views + .into_iter() + .map(|v| { + object_dependencies.extend(v.dependent_relations.iter().map(|id| { + object_dependency::ActiveModel { + id: NotSet, + oid: Set(*id as _), + used_by: Set(v.id as _), + } + })); + v.into() + }) + .collect(); + View::insert_many(view_models) + .exec(&meta_store_sql.conn) + .await?; + } + println!("views migrated"); + + // subscriptions + if !subscriptions.is_empty() { + let subscription_models: Vec = subscriptions + .into_iter() + .map(|s| { + object_dependencies.extend(s.dependent_relations.iter().map(|id| { + object_dependency::ActiveModel { + id: NotSet, + oid: Set(*id as _), + used_by: Set(s.id as _), + } + })); + s.into() + }) + .collect(); + Subscription::insert_many(subscription_models) + .exec(&meta_store_sql.conn) + .await?; + } + println!("subscriptions migrated"); + + // object_dependency + if !object_dependencies.is_empty() { + ObjectDependency::insert_many(object_dependencies) + .exec(&meta_store_sql.conn) + .await?; + } + println!("object dependencies migrated"); + + // user privilege + let mut privileges = vec![]; + assert!(!users.is_empty()); + let next_user_id = users.iter().map(|u| u.id + 1).max().unwrap(); + for user in users { + for gp in user.grant_privileges { + let id = match gp.get_object()? { + GrantObject::DatabaseId(id) => *db_rewrite.get(id).unwrap(), + GrantObject::SchemaId(id) => *schema_rewrite.get(id).unwrap(), + GrantObject::FunctionId(id) => *function_rewrite.get(id).unwrap(), + GrantObject::TableId(id) + | GrantObject::SourceId(id) + | GrantObject::SinkId(id) + | GrantObject::ViewId(id) + | GrantObject::SubscriptionId(id) => *id, + ty => unreachable!("invalid object type: {:?}", ty), + }; + for action_with_opt in &gp.action_with_opts { + privileges.push(user_privilege::ActiveModel { + user_id: Set(user.id as _), + oid: Set(id as _), + granted_by: Set(action_with_opt.granted_by as _), + action: Set(action_with_opt.get_action()?.into()), + with_grant_option: Set(action_with_opt.with_grant_option), + ..Default::default() + }); + } + } + } + if !privileges.is_empty() { + UserPrivilege::insert_many(privileges) + .exec(&meta_store_sql.conn) + .await?; + } + println!("user privileges migrated"); + + // notification. + let notification_version = NotificationVersion::new(&meta_store).await; + CatalogVersion::insert(catalog_version::ActiveModel { + name: Set(VersionCategory::Notification), + version: Set(notification_version.version() as _), + }) + .exec(&meta_store_sql.conn) + .await?; + println!("notification version migrated"); + + // table revision. + let table_revision = TableRevision::get(&meta_store).await?; + CatalogVersion::insert(catalog_version::ActiveModel { + name: Set(VersionCategory::TableRevision), + version: Set(table_revision.inner() as _), + }) + .exec(&meta_store_sql.conn) + .await?; + println!("table revision migrated"); + + // hummock. + // hummock pinned snapshots + let pinned_snapshots = HummockPinnedSnapshot::list(&meta_store).await?; + if !pinned_snapshots.is_empty() { + hummock_pinned_snapshot::Entity::insert_many( + pinned_snapshots + .into_iter() + .map(|ps| hummock_pinned_snapshot::ActiveModel { + context_id: Set(ps.context_id as _), + min_pinned_snapshot: Set(ps.minimal_pinned_snapshot as _), + }) + .collect_vec(), + ) + .exec(&meta_store_sql.conn) + .await?; + } + println!("hummock pinned snapshots migrated"); + + // hummock pinned version + let pinned_version = HummockPinnedVersion::list(&meta_store).await?; + if !pinned_version.is_empty() { + hummock_pinned_version::Entity::insert_many( + pinned_version + .into_iter() + .map(|pv| hummock_pinned_version::ActiveModel { + context_id: Set(pv.context_id as _), + min_pinned_id: Set(pv.min_pinned_id as _), + }) + .collect_vec(), + ) + .exec(&meta_store_sql.conn) + .await?; + } + println!("hummock pinned version migrated"); + + // hummock version delta + let version_delta = HummockVersionDelta::list(&meta_store).await?; + if !version_delta.is_empty() { + hummock_version_delta::Entity::insert_many( + version_delta + .into_iter() + .map(|vd| hummock_version_delta::ActiveModel { + id: Set(vd.id as _), + prev_id: Set(vd.prev_id as _), + max_committed_epoch: Set(vd.max_committed_epoch as _), + safe_epoch: Set(vd.safe_epoch as _), + trivial_move: Set(vd.trivial_move), + full_version_delta: Set(vd.to_protobuf().into()), + }) + .collect_vec(), + ) + .exec(&meta_store_sql.conn) + .await?; + } + println!("hummock version delta migrated"); + + // hummock version stat + let version_stats = HummockVersionStats::list(&meta_store) + .await? + .into_iter() + .next(); + if let Some(version_stats) = version_stats { + hummock_version_stats::Entity::insert(hummock_version_stats::ActiveModel { + id: Set(version_stats.hummock_version_id as _), + stats: Set(TableStats(version_stats.table_stats)), + }) + .exec(&meta_store_sql.conn) + .await?; + } + println!("hummock version stats migrated"); + + // compaction + // compaction config + let compaction_groups = CompactionGroup::list(&meta_store).await?; + if !compaction_groups.is_empty() { + compaction_config::Entity::insert_many( + compaction_groups + .into_iter() + .map(|cg| compaction_config::ActiveModel { + compaction_group_id: Set(cg.group_id as _), + config: Set((*cg.compaction_config).clone().into()), + }) + .collect_vec(), + ) + .exec(&meta_store_sql.conn) + .await?; + } + println!("compaction config migrated"); + + // compaction status + let compaction_statuses = CompactStatus::list(&meta_store).await?; + if !compaction_statuses.is_empty() { + compaction_status::Entity::insert_many( + compaction_statuses + .into_iter() + .map(|cs| compaction_status::ActiveModel { + compaction_group_id: Set(cs.compaction_group_id as _), + status: Set(LevelHandlers(cs.level_handlers.iter().map_into().collect())), + }) + .collect_vec(), + ) + .exec(&meta_store_sql.conn) + .await?; + } + println!("compaction status migrated"); + + // compaction task + let compaction_tasks = CompactTaskAssignment::list(&meta_store).await?; + if !compaction_tasks.is_empty() { + compaction_task::Entity::insert_many(compaction_tasks.into_iter().map(|ct| { + let context_id = ct.context_id; + let task = ct.compact_task.unwrap(); + compaction_task::ActiveModel { + id: Set(task.task_id as _), + context_id: Set(context_id as _), + task: Set(task.into()), + } + })) + .exec(&meta_store_sql.conn) + .await?; + } + println!("compaction task migrated"); + + // hummock sequence + let sst_obj_id = load_current_id(&meta_store, "hummock_ss_table_id", Some(1)).await; + let compaction_task_id = load_current_id(&meta_store, "hummock_compaction_task", Some(1)).await; + let compaction_group_id = load_current_id( + &meta_store, + "compaction_group", + Some(StaticCompactionGroupId::End as u64 + 1), + ) + .await; + let backup_id = load_current_id(&meta_store, "backup", Some(1)).await; + hummock_sequence::Entity::insert_many(vec![ + hummock_sequence::ActiveModel { + name: Set(SSTABLE_OBJECT_ID.into()), + seq: Set(sst_obj_id as _), + }, + hummock_sequence::ActiveModel { + name: Set(COMPACTION_TASK_ID.into()), + seq: Set(compaction_task_id as _), + }, + hummock_sequence::ActiveModel { + name: Set(COMPACTION_GROUP_ID.into()), + seq: Set(compaction_group_id as _), + }, + hummock_sequence::ActiveModel { + name: Set(META_BACKUP_ID.into()), + seq: Set(backup_id as _), + }, + ]) + .exec(&meta_store_sql.conn) + .await?; + println!("hummock sequence migrated"); + + // Rest sequence for object and user. + match meta_store_sql.conn.get_database_backend() { + DbBackend::MySql => { + let next_object_id = next_available_id(); + meta_store_sql + .conn + .execute(Statement::from_string( + DatabaseBackend::MySql, + format!("ALTER TABLE object AUTO_INCREMENT = {next_object_id};"), + )) + .await?; + meta_store_sql + .conn + .execute(Statement::from_string( + DatabaseBackend::MySql, + format!("ALTER TABLE user AUTO_INCREMENT = {next_user_id};"), + )) + .await?; + } + DbBackend::Postgres => { + meta_store_sql + .conn + .execute(Statement::from_string( + DatabaseBackend::Postgres, + "SELECT setval('object_oid_seq', (SELECT MAX(oid) FROM object) + 1);", + )) + .await?; + meta_store_sql + .conn + .execute(Statement::from_string( + DatabaseBackend::Postgres, + "SELECT setval('user_user_id_seq', (SELECT MAX(user_id) FROM \"user\") + 1);", + )) + .await?; + } + DbBackend::Sqlite => {} + } + + Ok(()) +} + +async fn load_current_id(meta_store: &MetaStoreRef, category: &str, start: Option) -> u64 { + let category_gen_key = format!("{}_id_next_generator", category); + let res = meta_store + .get_cf(DEFAULT_COLUMN_FAMILY, category_gen_key.as_bytes()) + .await; + match res { + Ok(value) => memcomparable::from_slice(&value).unwrap(), + Err(MetaStoreError::ItemNotFound(_)) => start.unwrap_or(0), + Err(e) => panic!("{}", e.as_report()), + } +} diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 9cdc99c0d315..257c5e5cf809 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -19,6 +19,7 @@ use anyhow::Result; use clap::{Args, Parser, Subcommand}; use cmd_impl::bench::BenchCommands; use cmd_impl::hummock::SstDumpArgs; +use itertools::Itertools; use risingwave_hummock_sdk::HummockEpoch; use risingwave_meta::backup_restore::RestoreOpts; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; @@ -27,6 +28,7 @@ use thiserror_ext::AsReport; use crate::cmd_impl::hummock::{ build_compaction_config_vec, list_pinned_snapshots, list_pinned_versions, }; +use crate::cmd_impl::meta::EtcdBackend; use crate::cmd_impl::throttle::apply_throttle; use crate::common::CtlContext; @@ -521,6 +523,29 @@ enum MetaCommands { #[clap(long)] props: String, }, + + /// Migration from etcd meta store to sql backend + Migration { + #[clap( + long, + required = true, + value_delimiter = ',', + value_name = "host:port, ..." + )] + etcd_endpoints: String, + #[clap(long, value_name = "username:password")] + etcd_user_password: Option, + + #[clap( + long, + required = true, + value_name = "postgres://user:password@host:port/dbname or mysql://user:password@host:port/dbname or sqlite://path?mode=rwc" + )] + sql_endpoint: String, + + #[clap(short = 'f', long, default_value_t = false)] + force_clean: bool, + }, } #[derive(Subcommand, Clone, Debug)] @@ -781,6 +806,30 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { Commands::Meta(MetaCommands::ValidateSource { props }) => { cmd_impl::meta::validate_source(context, props).await? } + Commands::Meta(MetaCommands::Migration { + etcd_endpoints, + etcd_user_password, + sql_endpoint, + force_clean, + }) => { + let credentials = match etcd_user_password { + Some(user_pwd) => { + let user_pwd_vec = user_pwd.splitn(2, ':').collect_vec(); + if user_pwd_vec.len() != 2 { + return Err(anyhow::Error::msg(format!( + "invalid etcd user password: {user_pwd}" + ))); + } + Some((user_pwd_vec[0].to_string(), user_pwd_vec[1].to_string())) + } + None => None, + }; + let etcd_backend = EtcdBackend { + endpoints: etcd_endpoints.split(',').map(|s| s.to_string()).collect(), + credentials, + }; + cmd_impl::meta::migrate(etcd_backend, sql_endpoint, force_clean).await? + } Commands::AwaitTree => cmd_impl::await_tree::dump(context).await?, Commands::Profile(ProfileCommands::Cpu { sleep }) => { cmd_impl::profile::cpu_profile(context, sleep).await? diff --git a/src/meta/model_v2/src/hummock_sequence.rs b/src/meta/model_v2/src/hummock_sequence.rs index 5f4ed6cb367a..63cfe6592458 100644 --- a/src/meta/model_v2/src/hummock_sequence.rs +++ b/src/meta/model_v2/src/hummock_sequence.rs @@ -14,6 +14,11 @@ use sea_orm::entity::prelude::*; +pub const COMPACTION_TASK_ID: &str = "compaction_task"; +pub const COMPACTION_GROUP_ID: &str = "compaction_group"; +pub const SSTABLE_OBJECT_ID: &str = "sstable_object"; +pub const META_BACKUP_ID: &str = "meta_backup"; + #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)] #[sea_orm(table_name = "hummock_sequence")] pub struct Model { diff --git a/src/meta/model_v2/src/worker.rs b/src/meta/model_v2/src/worker.rs index 90eb13ba1a07..8694bb1c6d20 100644 --- a/src/meta/model_v2/src/worker.rs +++ b/src/meta/model_v2/src/worker.rs @@ -96,7 +96,7 @@ impl From<&PbWorkerNode> for ActiveModel { host: Set(host.host), port: Set(host.port), status: Set(worker.state().into()), - ..Default::default() + transaction_id: Set(worker.transactional_id.map(|id| id as _)), } } } diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 6b5db0be6b36..dad880bbad77 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -754,7 +754,8 @@ pub fn extract_grant_obj_id(object: &PbObject) -> ObjectId { | PbObject::SourceId(id) | PbObject::SinkId(id) | PbObject::ViewId(id) - | PbObject::FunctionId(id) => *id as _, + | PbObject::FunctionId(id) + | PbObject::SubscriptionId(id) => *id as _, _ => unreachable!("invalid object type: {:?}", object), } } diff --git a/src/meta/src/hummock/manager/sequence.rs b/src/meta/src/hummock/manager/sequence.rs index ab154376404d..ff58a4d1903b 100644 --- a/src/meta/src/hummock/manager/sequence.rs +++ b/src/meta/src/hummock/manager/sequence.rs @@ -18,6 +18,9 @@ use std::sync::LazyLock; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_meta_model_v2::hummock_sequence; +use risingwave_meta_model_v2::hummock_sequence::{ + COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID, +}; use risingwave_meta_model_v2::prelude::HummockSequence; use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait}; use tokio::sync::Mutex; @@ -25,11 +28,6 @@ use tokio::sync::Mutex; use crate::hummock::error::Result; use crate::manager::{IdCategory, MetaSrvEnv}; -const COMPACTION_TASK_ID: &str = "compaction_task"; -const COMPACTION_GROUP_ID: &str = "compaction_group"; -const SSTABLE_OBJECT_ID: &str = "sstable_object"; -const META_BACKUP_ID: &str = "meta_backup"; - static SEQ_INIT: LazyLock> = LazyLock::new(|| { maplit::hashmap! { COMPACTION_TASK_ID.into() => 1,