Skip to content

Commit

Permalink
feat(risedev): support configuring meta store by url from env var (#1…
Browse files Browse the repository at this point in the history
…9560)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Nov 26, 2024
1 parent 5070856 commit 21ea2bc
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 4 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,7 @@ echo If you still feel this is not enough, you may copy $(tput setaf 4)risedev$(
category = "RiseDev - CI"
dependencies = ["clean-data", "pre-start-dev"]
command = "target/debug/risedev-dev" # `risedev-dev` is always built in dev profile
env = { RISEDEV_CLEAN_START = true }
args = ["${@}"]
description = "Clean data and start a full RisingWave dev cluster using risedev-dev"

Expand Down
2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ serde_with::with_prefix!(batch_prefix "batch_");
pub enum MetaBackend {
#[default]
Mem,
Sql, // keep for backward compatibility
Sql, // any database url
Sqlite,
Postgres,
Mysql,
Expand Down
5 changes: 4 additions & 1 deletion src/risedevtool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ glob = "0.3"
google-cloud-pubsub = "0.29"
indicatif = "0.17"
itertools = { workspace = true }
log = "0.4"
rdkafka = { workspace = true }
redis = "0.25"
regex = "1"
Expand All @@ -34,6 +35,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_with = "3"
serde_yaml = "0.9"
sqlx = { workspace = true, features = ["any"] }
tempfile = "3"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
Expand All @@ -46,7 +48,8 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"fs",
] }
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2"
workspace-hack = { path = "../workspace-hack" }
yaml-rust = "0.4"

Expand Down
12 changes: 12 additions & 0 deletions src/risedevtool/src/bin/risedev-dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use risedev::{
};
use tempfile::tempdir;
use thiserror_ext::AsReport;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;
use yaml_rust::YamlEmitter;

#[derive(Default)]
Expand Down Expand Up @@ -378,6 +380,16 @@ fn main() -> Result<()> {
// Backtraces for RisingWave components are enabled in `Task::execute`.
std::env::set_var("RUST_BACKTRACE", "0");

// Init logger from a customized env var.
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::WARN.into())
.with_env_var("RISEDEV_RUST_LOG")
.from_env_lossy(),
)
.init();

preflight_check()?;

let task_name = std::env::args()
Expand Down
1 change: 1 addition & 0 deletions src/risedevtool/src/service_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub enum MetaBackend {
Sqlite,
Postgres,
Mysql,
Env,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand Down
133 changes: 131 additions & 2 deletions src/risedevtool/src/task/meta_node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,48 @@
use std::env;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::LazyLock;

use anyhow::{anyhow, Result};
use anyhow::{anyhow, bail, Context, Result};
use itertools::Itertools;
use sqlx::{ConnectOptions, Database};
use tempfile::NamedTempFile;
use url::Url;

use super::{risingwave_cmd, ExecuteContext, Task};
use crate::util::{get_program_args, get_program_env_cmd, get_program_name};
use crate::util::{get_program_args, get_program_env_cmd, get_program_name, is_env_set};
use crate::{
add_hummock_backend, add_tempo_endpoint, Application, HummockInMemoryStrategy, MetaBackend,
MetaNodeConfig,
};

/// URL for connecting to the SQL meta store, retrieved from the env var `RISEDEV_SQL_ENDPOINT`.
/// If it is not set, a temporary sqlite file is created and used.
///
/// # Examples
///
/// - `mysql://root:[email protected]:3306/metastore`
/// - `postgresql://localhost:5432/metastore`
/// - `sqlite:///path/to/file.db`
/// - `sqlite::memory:`
fn sql_endpoint_from_env() -> String {
static SQL_ENDPOINT: LazyLock<String> = LazyLock::new(|| {
if let Ok(endpoint) = env::var("RISEDEV_SQL_ENDPOINT") {
endpoint
} else {
let temp_path = NamedTempFile::with_suffix(".db").unwrap().into_temp_path();
let temp_sqlite_endpoint = format!("sqlite://{}?mode=rwc", temp_path.to_string_lossy());
tracing::warn!(
"env RISEDEV_SQL_ENDPOINT not set, use temporary sqlite `{}`",
temp_sqlite_endpoint
);
temp_sqlite_endpoint
}
});

SQL_ENDPOINT.to_owned()
}

pub struct MetaNodeService {
config: MetaNodeConfig,
}
Expand Down Expand Up @@ -139,6 +170,15 @@ impl MetaNodeService {
.arg("--sql-database")
.arg(&mysql_store_config.database);
}
MetaBackend::Env => {
let endpoint = sql_endpoint_from_env();
is_persistent_meta_store = true;

cmd.arg("--backend")
.arg("sql")
.arg("--sql-endpoint")
.arg(endpoint);
}
}

let provide_minio = config.provide_minio.as_ref().unwrap();
Expand Down Expand Up @@ -245,6 +285,13 @@ impl Task for MetaNodeService {
cmd.arg("--config-path")
.arg(Path::new(&prefix_config).join("risingwave.toml"));

if let MetaBackend::Env = self.config.meta_backend {
if is_env_set("RISEDEV_CLEAN_START") {
ctx.pb.set_message("initializing meta store from env...");
initialize_meta_store()?;
}
}

if !self.config.user_managed {
ctx.run_command(ctx.tmux_run(cmd)?)?;
ctx.pb.set_message("started");
Expand All @@ -266,3 +313,85 @@ impl Task for MetaNodeService {
self.config.id.clone()
}
}

fn initialize_meta_store() -> Result<(), anyhow::Error> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;

let endpoint: Url = sql_endpoint_from_env()
.parse()
.context("invalid url for SQL endpoint")?;
let scheme = endpoint.scheme();

// Retrieve the database name to use for the meta store.
// Modify the URL to establish a temporary connection to initialize that database.
let (db, init_url) = if sqlx::Postgres::URL_SCHEMES.contains(&scheme) {
let options = sqlx::postgres::PgConnectOptions::from_url(&endpoint)
.context("invalid database url for Postgres meta backend")?;

let db = options
.get_database()
.unwrap_or_else(|| options.get_username()) // PG defaults to username if no database is specified
.to_owned();
// https://www.postgresql.org/docs/current/manage-ag-templatedbs.html
let init_options = options.database("template1");
let init_url = init_options.to_url_lossy();

(db, init_url)
} else if sqlx::MySql::URL_SCHEMES.contains(&scheme) {
let options = sqlx::mysql::MySqlConnectOptions::from_url(&endpoint)
.context("invalid database url for MySQL meta backend")?;

let db = options
.get_database()
.context("database not specified for MySQL meta backend")?
.to_owned();
// Effectively unset the database field when converting back to URL, meaning connect to no database.
let init_options = options.database("");
let init_url = init_options.to_url_lossy();

(db, init_url)
} else if sqlx::Sqlite::URL_SCHEMES.contains(&scheme) {
// For SQLite, simply empty the file.
let options = sqlx::sqlite::SqliteConnectOptions::from_url(&endpoint)
.context("invalid database url for SQLite meta backend")?;

if endpoint.as_str().contains(":memory:") || endpoint.as_str().contains("mode=memory") {
// SQLite in-memory database does not need initialization.
} else {
let filename = options.get_filename();
fs_err::write(filename, b"").context("failed to empty SQLite file")?;
}

return Ok(());
} else {
bail!("unsupported SQL scheme for meta backend: {}", scheme);
};

rt.block_on(async move {
use sqlx::any::*;
install_default_drivers();

let options = sqlx::any::AnyConnectOptions::from_url(&init_url)?
.log_statements(log::LevelFilter::Debug);

let mut conn = options
.connect()
.await
.context("failed to connect to a template database for meta store")?;

// Intentionally not executing in a transaction because Postgres does not allow it.
sqlx::raw_sql(&format!("DROP DATABASE IF EXISTS {};", db))
.execute(&mut conn)
.await?;
sqlx::raw_sql(&format!("CREATE DATABASE {};", db))
.execute(&mut conn)
.await?;

Ok::<_, anyhow::Error>(())
})
.context("failed to initialize database for meta store")?;

Ok(())
}

0 comments on commit 21ea2bc

Please sign in to comment.