Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(risedev): support configuring meta store by url from env var #19560

Merged
merged 5 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,7 @@ echo If you still feel this is not enough, you may copy $(tput setaf 4)risedev$(
category = "RiseDev - CI"
dependencies = ["clean-data", "pre-start-dev"]
command = "target/debug/risedev-dev" # `risedev-dev` is always built in dev profile
env = { RISEDEV_CLEAN_START = true }
args = ["${@}"]
description = "Clean data and start a full RisingWave dev cluster using risedev-dev"

Expand Down
2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ serde_with::with_prefix!(batch_prefix "batch_");
pub enum MetaBackend {
#[default]
Mem,
Sql, // keep for backward compatibility
Sql, // any database url
Sqlite,
Postgres,
Mysql,
Expand Down
5 changes: 4 additions & 1 deletion src/risedevtool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ glob = "0.3"
google-cloud-pubsub = "0.29"
indicatif = "0.17"
itertools = { workspace = true }
log = "0.4"
rdkafka = { workspace = true }
redis = "0.25"
regex = "1"
Expand All @@ -34,6 +35,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_with = "3"
serde_yaml = "0.9"
sqlx = { workspace = true, features = ["any"] }
tempfile = "3"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
Expand All @@ -46,7 +48,8 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"fs",
] }
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2"
workspace-hack = { path = "../workspace-hack" }
yaml-rust = "0.4"

Expand Down
12 changes: 12 additions & 0 deletions src/risedevtool/src/bin/risedev-dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use risedev::{
};
use tempfile::tempdir;
use thiserror_ext::AsReport;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;
use yaml_rust::YamlEmitter;

#[derive(Default)]
Expand Down Expand Up @@ -378,6 +380,16 @@ fn main() -> Result<()> {
// Backtraces for RisingWave components are enabled in `Task::execute`.
std::env::set_var("RUST_BACKTRACE", "0");

// Init logger from a customized env var.
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::WARN.into())
.with_env_var("RISEDEV_RUST_LOG")
.from_env_lossy(),
)
.init();

preflight_check()?;

let task_name = std::env::args()
Expand Down
1 change: 1 addition & 0 deletions src/risedevtool/src/service_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub enum MetaBackend {
Sqlite,
Postgres,
Mysql,
Env,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand Down
133 changes: 131 additions & 2 deletions src/risedevtool/src/task/meta_node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,48 @@
use std::env;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::LazyLock;

use anyhow::{anyhow, Result};
use anyhow::{anyhow, bail, Context, Result};
use itertools::Itertools;
use sqlx::{ConnectOptions, Database};
use tempfile::NamedTempFile;
use url::Url;

use super::{risingwave_cmd, ExecuteContext, Task};
use crate::util::{get_program_args, get_program_env_cmd, get_program_name};
use crate::util::{get_program_args, get_program_env_cmd, get_program_name, is_env_set};
use crate::{
add_hummock_backend, add_tempo_endpoint, Application, HummockInMemoryStrategy, MetaBackend,
MetaNodeConfig,
};

/// URL for connecting to the SQL meta store, retrieved from the env var `RISEDEV_SQL_ENDPOINT`.
/// If it is not set, a temporary sqlite file is created and used.
///
/// # Examples
///
/// - `mysql://root:[email protected]:3306/metastore`
/// - `postgresql://localhost:5432/metastore`
/// - `sqlite:///path/to/file.db`
/// - `sqlite::memory:`
fn sql_endpoint_from_env() -> String {
static SQL_ENDPOINT: LazyLock<String> = LazyLock::new(|| {
if let Ok(endpoint) = env::var("RISEDEV_SQL_ENDPOINT") {
endpoint
} else {
let temp_path = NamedTempFile::with_suffix(".db").unwrap().into_temp_path();
let temp_sqlite_endpoint = format!("sqlite://{}?mode=rwc", temp_path.to_string_lossy());
tracing::warn!(
"env RISEDEV_SQL_ENDPOINT not set, use temporary sqlite `{}`",
temp_sqlite_endpoint
);
temp_sqlite_endpoint
}
});

SQL_ENDPOINT.to_owned()
}

pub struct MetaNodeService {
config: MetaNodeConfig,
}
Expand Down Expand Up @@ -139,6 +170,15 @@ impl MetaNodeService {
.arg("--sql-database")
.arg(&mysql_store_config.database);
}
MetaBackend::Env => {
let endpoint = sql_endpoint_from_env();
is_persistent_meta_store = true;

cmd.arg("--backend")
.arg("sql")
.arg("--sql-endpoint")
.arg(endpoint);
}
}

let provide_minio = config.provide_minio.as_ref().unwrap();
Expand Down Expand Up @@ -245,6 +285,13 @@ impl Task for MetaNodeService {
cmd.arg("--config-path")
.arg(Path::new(&prefix_config).join("risingwave.toml"));

if let MetaBackend::Env = self.config.meta_backend {
if is_env_set("RISEDEV_CLEAN_START") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be a little confusing that RISEDEV_CLEAN_START only works with meta-backend: env

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not meant to be set by user directly but when running risedev ci-start.

The basic idea is that, I don't want to hack the procedure of risedev clean-data to involve such operation, so it's postponed to the next startup. It only applies to ci-start because

ctx.pb.set_message("initializing meta store from env...");
initialize_meta_store()?;
}
}

if !self.config.user_managed {
ctx.run_command(ctx.tmux_run(cmd)?)?;
ctx.pb.set_message("started");
Expand All @@ -266,3 +313,85 @@ impl Task for MetaNodeService {
self.config.id.clone()
}
}

fn initialize_meta_store() -> Result<(), anyhow::Error> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;

let endpoint: Url = sql_endpoint_from_env()
.parse()
.context("invalid url for SQL endpoint")?;
let scheme = endpoint.scheme();

// Retrieve the database name to use for the meta store.
// Modify the URL to establish a temporary connection to initialize that database.
let (db, init_url) = if sqlx::Postgres::URL_SCHEMES.contains(&scheme) {
let options = sqlx::postgres::PgConnectOptions::from_url(&endpoint)
.context("invalid database url for Postgres meta backend")?;

let db = options
.get_database()
.unwrap_or_else(|| options.get_username()) // PG defaults to username if no database is specified
.to_owned();
// https://www.postgresql.org/docs/current/manage-ag-templatedbs.html
let init_options = options.database("template1");
let init_url = init_options.to_url_lossy();

(db, init_url)
} else if sqlx::MySql::URL_SCHEMES.contains(&scheme) {
let options = sqlx::mysql::MySqlConnectOptions::from_url(&endpoint)
.context("invalid database url for MySQL meta backend")?;

let db = options
.get_database()
.context("database not specified for MySQL meta backend")?
.to_owned();
// Effectively unset the database field when converting back to URL, meaning connect to no database.
let init_options = options.database("");
let init_url = init_options.to_url_lossy();

(db, init_url)
} else if sqlx::Sqlite::URL_SCHEMES.contains(&scheme) {
// For SQLite, simply empty the file.
let options = sqlx::sqlite::SqliteConnectOptions::from_url(&endpoint)
.context("invalid database url for SQLite meta backend")?;

if endpoint.as_str().contains(":memory:") || endpoint.as_str().contains("mode=memory") {
// SQLite in-memory database does not need initialization.
} else {
let filename = options.get_filename();
fs_err::write(filename, b"").context("failed to empty SQLite file")?;
}

return Ok(());
} else {
bail!("unsupported SQL scheme for meta backend: {}", scheme);
};

rt.block_on(async move {
use sqlx::any::*;
install_default_drivers();

let options = sqlx::any::AnyConnectOptions::from_url(&init_url)?
.log_statements(log::LevelFilter::Debug);

let mut conn = options
.connect()
.await
.context("failed to connect to a template database for meta store")?;

// Intentionally not executing in a transaction because Postgres does not allow it.
sqlx::raw_sql(&format!("DROP DATABASE IF EXISTS {};", db))
.execute(&mut conn)
.await?;
sqlx::raw_sql(&format!("CREATE DATABASE {};", db))
.execute(&mut conn)
.await?;

Ok::<_, anyhow::Error>(())
})
.context("failed to initialize database for meta store")?;

Ok(())
}