-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(risedev): support configuring meta store by url from env var #19560
Merged
+153
−4
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
9d20585
feat(risedev): support configuring meta store by url from env var
BugenZhao ed8cf2c
add more docs
BugenZhao 314f7cd
empty sqlite file & add docs
BugenZhao b913512
defaults to sqlite if unset
BugenZhao be023ee
default level to warn
BugenZhao File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,17 +15,48 @@ | |
use std::env; | ||
use std::path::{Path, PathBuf}; | ||
use std::process::Command; | ||
use std::sync::LazyLock; | ||
|
||
use anyhow::{anyhow, Result}; | ||
use anyhow::{anyhow, bail, Context, Result}; | ||
use itertools::Itertools; | ||
use sqlx::{ConnectOptions, Database}; | ||
use tempfile::NamedTempFile; | ||
use url::Url; | ||
|
||
use super::{risingwave_cmd, ExecuteContext, Task}; | ||
use crate::util::{get_program_args, get_program_env_cmd, get_program_name}; | ||
use crate::util::{get_program_args, get_program_env_cmd, get_program_name, is_env_set}; | ||
use crate::{ | ||
add_hummock_backend, add_tempo_endpoint, Application, HummockInMemoryStrategy, MetaBackend, | ||
MetaNodeConfig, | ||
}; | ||
|
||
/// URL for connecting to the SQL meta store, retrieved from the env var `RISEDEV_SQL_ENDPOINT`. | ||
/// If it is not set, a temporary sqlite file is created and used. | ||
/// | ||
/// # Examples | ||
/// | ||
/// - `mysql://root:[email protected]:3306/metastore` | ||
/// - `postgresql://localhost:5432/metastore` | ||
/// - `sqlite:///path/to/file.db` | ||
/// - `sqlite::memory:` | ||
fn sql_endpoint_from_env() -> String { | ||
static SQL_ENDPOINT: LazyLock<String> = LazyLock::new(|| { | ||
if let Ok(endpoint) = env::var("RISEDEV_SQL_ENDPOINT") { | ||
endpoint | ||
} else { | ||
let temp_path = NamedTempFile::with_suffix(".db").unwrap().into_temp_path(); | ||
let temp_sqlite_endpoint = format!("sqlite://{}?mode=rwc", temp_path.to_string_lossy()); | ||
tracing::warn!( | ||
"env RISEDEV_SQL_ENDPOINT not set, use temporary sqlite `{}`", | ||
temp_sqlite_endpoint | ||
); | ||
temp_sqlite_endpoint | ||
} | ||
}); | ||
|
||
SQL_ENDPOINT.to_owned() | ||
} | ||
|
||
pub struct MetaNodeService { | ||
config: MetaNodeConfig, | ||
} | ||
|
@@ -139,6 +170,15 @@ impl MetaNodeService { | |
.arg("--sql-database") | ||
.arg(&mysql_store_config.database); | ||
} | ||
MetaBackend::Env => { | ||
let endpoint = sql_endpoint_from_env(); | ||
is_persistent_meta_store = true; | ||
|
||
cmd.arg("--backend") | ||
.arg("sql") | ||
.arg("--sql-endpoint") | ||
.arg(endpoint); | ||
} | ||
} | ||
|
||
let provide_minio = config.provide_minio.as_ref().unwrap(); | ||
|
@@ -245,6 +285,13 @@ impl Task for MetaNodeService { | |
cmd.arg("--config-path") | ||
.arg(Path::new(&prefix_config).join("risingwave.toml")); | ||
|
||
if let MetaBackend::Env = self.config.meta_backend { | ||
if is_env_set("RISEDEV_CLEAN_START") { | ||
ctx.pb.set_message("initializing meta store from env..."); | ||
initialize_meta_store()?; | ||
} | ||
} | ||
|
||
if !self.config.user_managed { | ||
ctx.run_command(ctx.tmux_run(cmd)?)?; | ||
ctx.pb.set_message("started"); | ||
|
@@ -266,3 +313,85 @@ impl Task for MetaNodeService { | |
self.config.id.clone() | ||
} | ||
} | ||
|
||
fn initialize_meta_store() -> Result<(), anyhow::Error> { | ||
let rt = tokio::runtime::Builder::new_current_thread() | ||
.enable_all() | ||
.build()?; | ||
|
||
let endpoint: Url = sql_endpoint_from_env() | ||
.parse() | ||
.context("invalid url for SQL endpoint")?; | ||
let scheme = endpoint.scheme(); | ||
|
||
// Retrieve the database name to use for the meta store. | ||
// Modify the URL to establish a temporary connection to initialize that database. | ||
let (db, init_url) = if sqlx::Postgres::URL_SCHEMES.contains(&scheme) { | ||
let options = sqlx::postgres::PgConnectOptions::from_url(&endpoint) | ||
.context("invalid database url for Postgres meta backend")?; | ||
|
||
let db = options | ||
.get_database() | ||
.unwrap_or_else(|| options.get_username()) // PG defaults to username if no database is specified | ||
.to_owned(); | ||
// https://www.postgresql.org/docs/current/manage-ag-templatedbs.html | ||
let init_options = options.database("template1"); | ||
let init_url = init_options.to_url_lossy(); | ||
|
||
(db, init_url) | ||
} else if sqlx::MySql::URL_SCHEMES.contains(&scheme) { | ||
let options = sqlx::mysql::MySqlConnectOptions::from_url(&endpoint) | ||
.context("invalid database url for MySQL meta backend")?; | ||
|
||
let db = options | ||
.get_database() | ||
.context("database not specified for MySQL meta backend")? | ||
.to_owned(); | ||
// Effectively unset the database field when converting back to URL, meaning connect to no database. | ||
let init_options = options.database(""); | ||
let init_url = init_options.to_url_lossy(); | ||
|
||
(db, init_url) | ||
} else if sqlx::Sqlite::URL_SCHEMES.contains(&scheme) { | ||
// For SQLite, simply empty the file. | ||
let options = sqlx::sqlite::SqliteConnectOptions::from_url(&endpoint) | ||
.context("invalid database url for SQLite meta backend")?; | ||
|
||
if endpoint.as_str().contains(":memory:") || endpoint.as_str().contains("mode=memory") { | ||
// SQLite in-memory database does not need initialization. | ||
} else { | ||
let filename = options.get_filename(); | ||
fs_err::write(filename, b"").context("failed to empty SQLite file")?; | ||
} | ||
|
||
return Ok(()); | ||
} else { | ||
bail!("unsupported SQL scheme for meta backend: {}", scheme); | ||
}; | ||
|
||
rt.block_on(async move { | ||
use sqlx::any::*; | ||
install_default_drivers(); | ||
|
||
let options = sqlx::any::AnyConnectOptions::from_url(&init_url)? | ||
.log_statements(log::LevelFilter::Debug); | ||
|
||
let mut conn = options | ||
.connect() | ||
.await | ||
.context("failed to connect to a template database for meta store")?; | ||
|
||
// Intentionally not executing in a transaction because Postgres does not allow it. | ||
sqlx::raw_sql(&format!("DROP DATABASE IF EXISTS {};", db)) | ||
.execute(&mut conn) | ||
.await?; | ||
sqlx::raw_sql(&format!("CREATE DATABASE {};", db)) | ||
.execute(&mut conn) | ||
.await?; | ||
|
||
Ok::<_, anyhow::Error>(()) | ||
}) | ||
.context("failed to initialize database for meta store")?; | ||
|
||
Ok(()) | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be a little confusing that
RISEDEV_CLEAN_START
only works withmeta-backend: env
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not meant to be set by user directly but when running
risedev ci-start
.The basic idea is that, I don't want to hack the procedure of
risedev clean-data
to involve such operation, so it's postponed to the next startup. It only applies toci-start
becauseci-start
we are sure that we want a fresh cluster