From 9d205851778b4a85de14176ecf0ef46eec3b119c Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 25 Nov 2024 16:15:45 +0800 Subject: [PATCH 1/5] feat(risedev): support configuring meta store by url from env var Signed-off-by: Bugen Zhao --- Cargo.lock | 3 + Makefile.toml | 1 + src/risedevtool/Cargo.toml | 5 +- src/risedevtool/src/bin/risedev-dev.rs | 6 ++ src/risedevtool/src/service_config.rs | 1 + src/risedevtool/src/task/meta_node_service.rs | 97 ++++++++++++++++++- 6 files changed, 110 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46ab0da13c40c..ad3a78617ec56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10407,6 +10407,7 @@ dependencies = [ "google-cloud-pubsub", "indicatif", "itertools 0.13.0", + "log", "madsim-rdkafka", "madsim-tokio", "redis", @@ -10416,10 +10417,12 @@ dependencies = [ "serde_json", "serde_with 3.8.1", "serde_yaml", + "sqlx", "tempfile", "thiserror-ext", "tracing", "tracing-subscriber", + "url", "workspace-hack", "yaml-rust", ] diff --git a/Makefile.toml b/Makefile.toml index a7c45d363e8fd..f772ba27e8bb1 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -1282,6 +1282,7 @@ echo If you still feel this is not enough, you may copy $(tput setaf 4)risedev$( category = "RiseDev - CI" dependencies = ["clean-data", "pre-start-dev"] command = "target/debug/risedev-dev" # `risedev-dev` is always built in dev profile +env = { RISEDEV_CLEAN_START = true } args = ["${@}"] description = "Clean data and start a full RisingWave dev cluster using risedev-dev" diff --git a/src/risedevtool/Cargo.toml b/src/risedevtool/Cargo.toml index a3854a2ba2726..2c415d9f5da78 100644 --- a/src/risedevtool/Cargo.toml +++ b/src/risedevtool/Cargo.toml @@ -26,6 +26,7 @@ glob = "0.3" google-cloud-pubsub = "0.29" indicatif = "0.17" itertools = { workspace = true } +log = "0.4" rdkafka = { workspace = true } redis = "0.25" regex = "1" @@ -34,6 +35,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" serde_with = "3" serde_yaml = "0.9" +sqlx = { workspace = true, features = ["any"] } tempfile = "3" thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = [ @@ -46,7 +48,8 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "fs", ] } tracing = "0.1" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +url = "2" workspace-hack = { path = "../workspace-hack" } yaml-rust = "0.4" diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs index 1420ec9e6bf72..9cdbe938ecd39 100644 --- a/src/risedevtool/src/bin/risedev-dev.rs +++ b/src/risedevtool/src/bin/risedev-dev.rs @@ -32,6 +32,7 @@ use risedev::{ }; use tempfile::tempdir; use thiserror_ext::AsReport; +use tracing_subscriber::EnvFilter; use yaml_rust::YamlEmitter; #[derive(Default)] @@ -378,6 +379,11 @@ fn main() -> Result<()> { // Backtraces for RisingWave components are enabled in `Task::execute`. std::env::set_var("RUST_BACKTRACE", "0"); + // Init logger from a specific env var. + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_env("RISEDEV_RUST_LOG")) + .init(); + preflight_check()?; let task_name = std::env::args() diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs index 9c47828a61b2b..6d8277c3396f8 100644 --- a/src/risedevtool/src/service_config.rs +++ b/src/risedevtool/src/service_config.rs @@ -51,6 +51,7 @@ pub enum MetaBackend { Sqlite, Postgres, Mysql, + Env, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/src/risedevtool/src/task/meta_node_service.rs b/src/risedevtool/src/task/meta_node_service.rs index 3d80efc5a70de..31350964bc254 100644 --- a/src/risedevtool/src/task/meta_node_service.rs +++ b/src/risedevtool/src/task/meta_node_service.rs @@ -16,16 +16,22 @@ use std::env; use std::path::{Path, PathBuf}; use std::process::Command; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, bail, Context, Result}; use itertools::Itertools; +use sqlx::{ConnectOptions, Database}; +use url::Url; use super::{risingwave_cmd, ExecuteContext, Task}; -use crate::util::{get_program_args, get_program_env_cmd, get_program_name}; +use crate::util::{get_program_args, get_program_env_cmd, get_program_name, is_env_set}; use crate::{ add_hummock_backend, add_tempo_endpoint, Application, HummockInMemoryStrategy, MetaBackend, MetaNodeConfig, }; +fn sql_endpoint_from_env() -> Result { + env::var("RISEDEV_SQL_ENDPOINT").context("env RISEDEV_SQL_ENDPOINT not set") +} + pub struct MetaNodeService { config: MetaNodeConfig, } @@ -139,6 +145,15 @@ impl MetaNodeService { .arg("--sql-database") .arg(&mysql_store_config.database); } + MetaBackend::Env => { + let endpoint = sql_endpoint_from_env()?; + is_persistent_meta_store = true; + + cmd.arg("--backend") + .arg("sql") + .arg("--sql-endpoint") + .arg(endpoint); + } } let provide_minio = config.provide_minio.as_ref().unwrap(); @@ -245,6 +260,18 @@ impl Task for MetaNodeService { cmd.arg("--config-path") .arg(Path::new(&prefix_config).join("risingwave.toml")); + if let MetaBackend::Env = self.config.meta_backend { + if is_env_set("RISEDEV_CLEAN_START") { + ctx.pb.set_message("initializing meta store from env..."); + initialize_meta_store()?; + } + } else if sql_endpoint_from_env().is_ok() { + bail!( + "should specify `meta-backend: env` for meta-node service \ + when env RISEDEV_SQL_ENDPOINT is set" + ); + } + if !self.config.user_managed { ctx.run_command(ctx.tmux_run(cmd)?)?; ctx.pb.set_message("started"); @@ -266,3 +293,69 @@ impl Task for MetaNodeService { self.config.id.clone() } } + +fn initialize_meta_store() -> Result<(), anyhow::Error> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + let endpoint: Url = sql_endpoint_from_env()? + .parse() + .context("invalid url for SQL endpoint")?; + let scheme = endpoint.scheme(); + + let (db, init_url) = if sqlx::Postgres::URL_SCHEMES.contains(&scheme) { + let options = sqlx::postgres::PgConnectOptions::from_url(&endpoint) + .context("invalid database url for Postgres meta backend")?; + + let db = options + .get_database() + .unwrap_or_else(|| options.get_username()) + .to_owned(); + let init_options = options.database("template1"); + let init_url = init_options.to_url_lossy(); + + (db, init_url) + } else if sqlx::MySql::URL_SCHEMES.contains(&scheme) { + let options = sqlx::mysql::MySqlConnectOptions::from_url(&endpoint) + .context("invalid database url for MySQL meta backend")?; + + let db = options + .get_database() + .context("database not specified for MySQL meta backend")? + .to_owned(); + let init_options = options.database(""); + let init_url = init_options.to_url_lossy(); + + (db, init_url) + } else { + // SQLite does not require database creation. + return Ok(()); + }; + + rt.block_on(async move { + use sqlx::any::*; + install_default_drivers(); + + let options = sqlx::any::AnyConnectOptions::from_url(&init_url)? + .log_statements(log::LevelFilter::Debug); + + let mut conn = options + .connect() + .await + .context("failed to connect to a template database for meta store")?; + + // Intentionally not executing in a transaction because Postgres does not allow it. + sqlx::raw_sql(&format!("DROP DATABASE IF EXISTS {};", db)) + .execute(&mut conn) + .await?; + sqlx::raw_sql(&format!("CREATE DATABASE {};", db)) + .execute(&mut conn) + .await?; + + Ok::<_, anyhow::Error>(()) + }) + .context("failed to initialize database for meta store")?; + + Ok(()) +} From ed8cf2c0f4c477e25297eb6406b2e1ea27423186 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 25 Nov 2024 17:25:15 +0800 Subject: [PATCH 2/5] add more docs Signed-off-by: Bugen Zhao --- src/common/src/config.rs | 2 +- src/risedevtool/src/bin/risedev-dev.rs | 2 +- src/risedevtool/src/task/meta_node_service.rs | 9 +++++++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 361f524ce775c..8163cd359ff28 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -173,7 +173,7 @@ serde_with::with_prefix!(batch_prefix "batch_"); pub enum MetaBackend { #[default] Mem, - Sql, // keep for backward compatibility + Sql, // any database url Sqlite, Postgres, Mysql, diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs index 9cdbe938ecd39..e57adf7df2199 100644 --- a/src/risedevtool/src/bin/risedev-dev.rs +++ b/src/risedevtool/src/bin/risedev-dev.rs @@ -379,7 +379,7 @@ fn main() -> Result<()> { // Backtraces for RisingWave components are enabled in `Task::execute`. std::env::set_var("RUST_BACKTRACE", "0"); - // Init logger from a specific env var. + // Init logger from a customized env var. tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_env("RISEDEV_RUST_LOG")) .init(); diff --git a/src/risedevtool/src/task/meta_node_service.rs b/src/risedevtool/src/task/meta_node_service.rs index 31350964bc254..f8f626877839d 100644 --- a/src/risedevtool/src/task/meta_node_service.rs +++ b/src/risedevtool/src/task/meta_node_service.rs @@ -304,14 +304,17 @@ fn initialize_meta_store() -> Result<(), anyhow::Error> { .context("invalid url for SQL endpoint")?; let scheme = endpoint.scheme(); + // Retrieve the database name to use for the meta store. + // Modify the URL to establish a temporary connection to initialize that database. let (db, init_url) = if sqlx::Postgres::URL_SCHEMES.contains(&scheme) { let options = sqlx::postgres::PgConnectOptions::from_url(&endpoint) .context("invalid database url for Postgres meta backend")?; let db = options .get_database() - .unwrap_or_else(|| options.get_username()) + .unwrap_or_else(|| options.get_username()) // PG defaults to username if no database is specified .to_owned(); + // https://www.postgresql.org/docs/current/manage-ag-templatedbs.html let init_options = options.database("template1"); let init_url = init_options.to_url_lossy(); @@ -324,12 +327,14 @@ fn initialize_meta_store() -> Result<(), anyhow::Error> { .get_database() .context("database not specified for MySQL meta backend")? .to_owned(); + // Effectively unset the database field when converting back to URL, meaning connect to no database. let init_options = options.database(""); let init_url = init_options.to_url_lossy(); (db, init_url) } else { - // SQLite does not require database creation. + // SQLite file itself is the database. + // TODO: shall we empty the file? return Ok(()); }; From 314f7cd9251e363e010f41256295dfe68c5c2359 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 26 Nov 2024 12:47:01 +0800 Subject: [PATCH 3/5] empty sqlite file & add docs Signed-off-by: Bugen Zhao --- src/risedevtool/src/task/meta_node_service.rs | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/src/risedevtool/src/task/meta_node_service.rs b/src/risedevtool/src/task/meta_node_service.rs index f8f626877839d..2973919b83429 100644 --- a/src/risedevtool/src/task/meta_node_service.rs +++ b/src/risedevtool/src/task/meta_node_service.rs @@ -28,6 +28,14 @@ use crate::{ MetaNodeConfig, }; +/// URL for connecting to the SQL meta store. +/// +/// # Examples +/// +/// - `mysql://root:my-secret-pw@127.0.0.1:3306/metastore` +/// - `postgresql://localhost:5432/metastore` +/// - `sqlite:///path/to/file.db` +/// - `sqlite::memory:` fn sql_endpoint_from_env() -> Result { env::var("RISEDEV_SQL_ENDPOINT").context("env RISEDEV_SQL_ENDPOINT not set") } @@ -332,10 +340,21 @@ fn initialize_meta_store() -> Result<(), anyhow::Error> { let init_url = init_options.to_url_lossy(); (db, init_url) - } else { - // SQLite file itself is the database. - // TODO: shall we empty the file? + } else if sqlx::Sqlite::URL_SCHEMES.contains(&scheme) { + // For SQLite, simply empty the file. + let options = sqlx::sqlite::SqliteConnectOptions::from_url(&endpoint) + .context("invalid database url for SQLite meta backend")?; + + if endpoint.as_str().contains(":memory:") || endpoint.as_str().contains("mode=memory") { + // SQLite in-memory database does not need initialization. + } else { + let filename = options.get_filename(); + fs_err::write(filename, b"").context("failed to empty SQLite file")?; + } + return Ok(()); + } else { + bail!("unsupported SQL scheme for meta backend: {}", scheme); }; rt.block_on(async move { From b913512be340d785134f2dacea19a6f9302f84a6 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 26 Nov 2024 16:14:28 +0800 Subject: [PATCH 4/5] defaults to sqlite if unset Signed-off-by: Bugen Zhao --- src/risedevtool/src/task/meta_node_service.rs | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/src/risedevtool/src/task/meta_node_service.rs b/src/risedevtool/src/task/meta_node_service.rs index 2973919b83429..752dd87cd0cda 100644 --- a/src/risedevtool/src/task/meta_node_service.rs +++ b/src/risedevtool/src/task/meta_node_service.rs @@ -15,10 +15,12 @@ use std::env; use std::path::{Path, PathBuf}; use std::process::Command; +use std::sync::LazyLock; use anyhow::{anyhow, bail, Context, Result}; use itertools::Itertools; use sqlx::{ConnectOptions, Database}; +use tempfile::NamedTempFile; use url::Url; use super::{risingwave_cmd, ExecuteContext, Task}; @@ -28,7 +30,8 @@ use crate::{ MetaNodeConfig, }; -/// URL for connecting to the SQL meta store. +/// URL for connecting to the SQL meta store, retrieved from the env var `RISEDEV_SQL_ENDPOINT`. +/// If it is not set, a temporary sqlite file is created and used. /// /// # Examples /// @@ -36,8 +39,22 @@ use crate::{ /// - `postgresql://localhost:5432/metastore` /// - `sqlite:///path/to/file.db` /// - `sqlite::memory:` -fn sql_endpoint_from_env() -> Result { - env::var("RISEDEV_SQL_ENDPOINT").context("env RISEDEV_SQL_ENDPOINT not set") +fn sql_endpoint_from_env() -> String { + static SQL_ENDPOINT: LazyLock = LazyLock::new(|| { + if let Ok(endpoint) = env::var("RISEDEV_SQL_ENDPOINT") { + endpoint + } else { + let temp_path = NamedTempFile::with_suffix(".db").unwrap().into_temp_path(); + let temp_sqlite_endpoint = format!("sqlite://{}?mode=rwc", temp_path.to_string_lossy()); + tracing::warn!( + "env RISEDEV_SQL_ENDPOINT not set, use temporary sqlite `{}`", + temp_sqlite_endpoint + ); + temp_sqlite_endpoint + } + }); + + SQL_ENDPOINT.to_owned() } pub struct MetaNodeService { @@ -154,7 +171,7 @@ impl MetaNodeService { .arg(&mysql_store_config.database); } MetaBackend::Env => { - let endpoint = sql_endpoint_from_env()?; + let endpoint = sql_endpoint_from_env(); is_persistent_meta_store = true; cmd.arg("--backend") @@ -273,11 +290,6 @@ impl Task for MetaNodeService { ctx.pb.set_message("initializing meta store from env..."); initialize_meta_store()?; } - } else if sql_endpoint_from_env().is_ok() { - bail!( - "should specify `meta-backend: env` for meta-node service \ - when env RISEDEV_SQL_ENDPOINT is set" - ); } if !self.config.user_managed { @@ -307,7 +319,7 @@ fn initialize_meta_store() -> Result<(), anyhow::Error> { .enable_all() .build()?; - let endpoint: Url = sql_endpoint_from_env()? + let endpoint: Url = sql_endpoint_from_env() .parse() .context("invalid url for SQL endpoint")?; let scheme = endpoint.scheme(); From be023eeae430502762c563389d0425838b620c2a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 26 Nov 2024 16:20:27 +0800 Subject: [PATCH 5/5] default level to warn Signed-off-by: Bugen Zhao --- src/risedevtool/src/bin/risedev-dev.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs index e57adf7df2199..8bf9aa52fe8fc 100644 --- a/src/risedevtool/src/bin/risedev-dev.rs +++ b/src/risedevtool/src/bin/risedev-dev.rs @@ -32,6 +32,7 @@ use risedev::{ }; use tempfile::tempdir; use thiserror_ext::AsReport; +use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; use yaml_rust::YamlEmitter; @@ -381,7 +382,12 @@ fn main() -> Result<()> { // Init logger from a customized env var. tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_env("RISEDEV_RUST_LOG")) + .with_env_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::WARN.into()) + .with_env_var("RISEDEV_RUST_LOG") + .from_env_lossy(), + ) .init(); preflight_check()?;