From bba2588bc7d324c9fa9cabf0667fc1ff63b456d5 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Mon, 26 Feb 2024 18:28:15 +0800 Subject: [PATCH] remove `wasm_storage_url` parameter and store wasm binary in meta Signed-off-by: Runji Wang --- Cargo.lock | 5 +- e2e_test/batch/catalog/pg_settings.slt.part | 1 - proto/catalog.proto | 3 + proto/expr.proto | 16 ++-- proto/meta.proto | 2 +- src/common/src/system_param/mod.rs | 2 - src/common/src/system_param/reader.rs | 7 -- src/config/docs.md | 1 - src/config/example.toml | 1 - src/expr/core/Cargo.toml | 5 +- src/expr/core/src/expr/expr_udf.rs | 47 ++++-------- .../core/src/table_function/user_defined.rs | 15 ++-- src/frontend/Cargo.toml | 1 + src/frontend/src/catalog/function_catalog.rs | 2 + src/frontend/src/expr/table_function.rs | 1 + .../src/expr/user_defined_function.rs | 2 + src/frontend/src/handler/create_function.rs | 75 ++++++------------- .../src/handler/create_sql_function.rs | 1 + .../migration/src/m20230908_072257_init.rs | 2 + src/meta/model_v2/src/function.rs | 2 + src/meta/src/backup_restore/restore.rs | 1 - src/meta/src/controller/mod.rs | 1 + src/workspace-hack/Cargo.toml | 1 + 23 files changed, 75 insertions(+), 119 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e711a211887b..2f1a3496df017 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9168,6 +9168,7 @@ dependencies = [ "itertools 0.12.0", "linkme", "madsim-tokio", + "md5", "moka", "num-traits", "openssl", @@ -9175,7 +9176,6 @@ dependencies = [ "paste", "risingwave_common", "risingwave_expr_macro", - "risingwave_object_store", "risingwave_pb", "risingwave_udf", "smallvec", @@ -9184,6 +9184,7 @@ dependencies = [ "thiserror-ext", "tracing", "workspace-hack", + "zstd 0.13.0", ] [[package]] @@ -9314,6 +9315,7 @@ dependencies = [ "tracing", "uuid", "workspace-hack", + "zstd 0.13.0", ] [[package]] @@ -13799,6 +13801,7 @@ dependencies = [ "md-5", "memchr", "mio", + "moka", "nom", "num-bigint", "num-integer", diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index c8e927ba72b9f..9ff41b8dbea45 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -7,7 +7,6 @@ internal data_directory internal parallel_compact_size_mb internal sstable_size_mb internal state_store -internal wasm_storage_url postmaster backup_storage_directory postmaster backup_storage_url postmaster barrier_interval_ms diff --git a/proto/catalog.proto b/proto/catalog.proto index 99fd1b0a69514..452e683e26fcb 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -219,7 +219,10 @@ message Function { string language = 7; optional string link = 8; optional string identifier = 10; + // The source code of the function. optional string body = 14; + // The zstd-compressed binary of the function. + optional bytes compressed_binary = 16; oneof kind { ScalarFunction scalar = 11; diff --git a/proto/expr.proto b/proto/expr.proto index 48bf9b55227ef..00219dc4fe412 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -516,16 +516,17 @@ message UserDefinedFunction { repeated string arg_names = 8; repeated data.DataType arg_types = 3; string language = 4; - // For external UDF: the link to the external function service. - // For WASM UDF: the link to the wasm binary file. + // The link to the external function service. optional string link = 5; - // An unique identifier for the function. - // For external UDF, it's the name of the function in the external function service. - // For WASM UDF, it's the name of the function in the wasm binary file. - // For JavaScript UDF, it's the name of the function. + // An unique identifier to the function. + // - If `link` is not empty, the name of the function in the external function service. + // - If `language` is `rust` or `wasm`, the name of the function in the wasm binary file. + // - If `language` is `javascript`, the name of the function. optional string identifier = 6; - // For JavaScript UDF, it's the body of the function. + // - If `language` is `javascript`, the source code of the function. optional string body = 7; + // - If `language` is `rust` or `wasm`, the zstd-compressed wasm binary. + optional bytes compressed_binary = 9; } // Additional information for user defined table functions. @@ -536,4 +537,5 @@ message UserDefinedTableFunction { optional string link = 5; optional string identifier = 6; optional string body = 7; + optional bytes compressed_binary = 9; } diff --git a/proto/meta.proto b/proto/meta.proto index 01492cc0c4fff..1db290af7b308 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -555,7 +555,7 @@ message SystemParams { optional uint32 parallel_compact_size_mb = 11; optional uint32 max_concurrent_creating_streaming_jobs = 12; optional bool pause_on_next_bootstrap = 13; - optional string wasm_storage_url = 14; + optional string wasm_storage_url = 14 [deprecated = true]; optional bool enable_tracing = 15; } diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 82677e57e9753..19c36baf09c68 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -86,7 +86,6 @@ macro_rules! for_all_params { { backup_storage_directory, String, None, true, "Remote directory for storing snapshots.", }, { max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", }, { pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", }, - { wasm_storage_url, String, Some("fs://.risingwave/data".to_string()), false, "", }, { enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", }, } }; @@ -440,7 +439,6 @@ mod tests { (BACKUP_STORAGE_DIRECTORY_KEY, "a"), (MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"), (PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"), - (WASM_STORAGE_URL_KEY, "a"), (ENABLE_TRACING_KEY, "true"), ("a_deprecated_param", "foo"), ]; diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index cf17c7bb43dd5..3374e72120238 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -160,11 +160,4 @@ where .enable_tracing .unwrap_or_else(default::enable_tracing) } - - fn wasm_storage_url(&self) -> &str { - self.inner() - .wasm_storage_url - .as_ref() - .unwrap_or(&default::WASM_STORAGE_URL) - } } diff --git a/src/config/docs.md b/src/config/docs.md index 36fd40ce2d13a..74e5d393978bb 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -150,4 +150,3 @@ This page is automatically generated by `./risedev generate-example-config` | pause_on_next_bootstrap | Whether to pause all data sources on next bootstrap. | false | | sstable_size_mb | Target size of the Sstable. | 256 | | state_store | | | -| wasm_storage_url | | "fs://.risingwave/data" | diff --git a/src/config/example.toml b/src/config/example.toml index 59c68aff3c7c0..6a6314d7832d2 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -195,5 +195,4 @@ block_size_kb = 64 bloom_false_positive = 0.001 max_concurrent_creating_streaming_jobs = 1 pause_on_next_bootstrap = false -wasm_storage_url = "fs://.risingwave/data" enable_tracing = false diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml index 073b32a3907f3..1236c33d93967 100644 --- a/src/expr/core/Cargo.toml +++ b/src/expr/core/Cargo.toml @@ -38,14 +38,14 @@ futures-async-stream = { workspace = true } futures-util = "0.3" itertools = "0.12" linkme = { version = "0.3", features = ["used_linker"] } -moka = { version = "0.12", features = ["future"] } +md5 = "0.7" +moka = { version = "0.12", features = ["sync"] } num-traits = "0.2" openssl = { version = "0.10", features = ["vendored"] } parse-display = "0.9" paste = "1" risingwave_common = { workspace = true } risingwave_expr_macro = { path = "../macro" } -risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_udf = { workspace = true } smallvec = "1" @@ -57,6 +57,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "macros", ] } tracing = "0.1" +zstd = { version = "0.13", default-features = false } [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../../workspace-hack" } diff --git a/src/expr/core/src/expr/expr_udf.rs b/src/expr/core/src/expr/expr_udf.rs index 246944ae5d9d3..ac7ae6286885e 100644 --- a/src/expr/core/src/expr/expr_udf.rs +++ b/src/expr/core/src/expr/expr_udf.rs @@ -24,13 +24,10 @@ use arrow_udf_js::{CallMode, Runtime as JsRuntime}; use arrow_udf_wasm::Runtime as WasmRuntime; use await_tree::InstrumentAwait; use cfg_or_panic::cfg_or_panic; -use moka::future::Cache; +use moka::sync::Cache; use risingwave_common::array::{ArrayError, ArrayRef, DataChunk}; -use risingwave_common::config::ObjectStoreConfig; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum}; -use risingwave_object_store::object::build_remote_object_store; -use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_pb::expr::ExprNode; use risingwave_udf::ArrowFlightUdfClient; use thiserror_ext::AsReport; @@ -178,12 +175,11 @@ impl Build for UserDefinedFunction { let imp = match udf.language.as_str() { #[cfg(not(madsim))] "wasm" | "rust" => { - let link = udf.get_link()?; - // Use `block_in_place` as an escape hatch to run async code here in sync context. - // Calling `block_on` directly will panic. - UdfImpl::Wasm(tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(get_or_create_wasm_runtime(link)) - })?) + let compressed_wasm_binary = udf.get_compressed_binary()?; + let wasm_binary = zstd::stream::decode_all(compressed_wasm_binary.as_slice()) + .context("failed to decompress wasm binary")?; + let runtime = get_or_create_wasm_runtime(&wasm_binary)?; + UdfImpl::Wasm(runtime) } "javascript" => { let mut rt = JsRuntime::new()?; @@ -260,38 +256,21 @@ pub(crate) fn get_or_create_flight_client(link: &str) -> Result Result> { - static RUNTIMES: LazyLock>> = LazyLock::new(|| { +pub fn get_or_create_wasm_runtime(binary: &[u8]) -> Result> { + static RUNTIMES: LazyLock>> = LazyLock::new(|| { Cache::builder() .time_to_idle(Duration::from_secs(60)) .build() }); - if let Some(runtime) = RUNTIMES.get(link).await { + let md5 = md5::compute(binary); + if let Some(runtime) = RUNTIMES.get(&md5) { return Ok(runtime.clone()); } - // create new runtime - let (wasm_storage_url, object_name) = link - .rsplit_once('/') - .context("invalid link for wasm function")?; - - // load wasm binary from object store - let object_store = build_remote_object_store( - wasm_storage_url, - Arc::new(ObjectStoreMetrics::unused()), - "Wasm Engine", - ObjectStoreConfig::default(), - ) - .await; - let binary = object_store - .read(object_name, ..) - .await - .context("failed to load wasm binary from object storage")?; - - let runtime = Arc::new(arrow_udf_wasm::Runtime::new(&binary)?); - RUNTIMES.insert(link.into(), runtime.clone()).await; + let runtime = Arc::new(arrow_udf_wasm::Runtime::new(binary)?); + RUNTIMES.insert(md5, runtime.clone()); Ok(runtime) } diff --git a/src/expr/core/src/table_function/user_defined.rs b/src/expr/core/src/table_function/user_defined.rs index 06383543ceb7b..ad9ba03943662 100644 --- a/src/expr/core/src/table_function/user_defined.rs +++ b/src/expr/core/src/table_function/user_defined.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use anyhow::Context; use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema, SchemaRef}; use arrow_udf_js::{CallMode, Runtime as JsRuntime}; @@ -188,14 +189,12 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result { - let link = udtf.get_link()?; - // Use `block_in_place` as an escape hatch to run async code here in sync context. - // Calling `block_on` directly will panic. - UdfImpl::Wasm(tokio::task::block_in_place(|| { - tokio::runtime::Handle::current() - .block_on(crate::expr::expr_udf::get_or_create_wasm_runtime(link)) - })?) + "wasm" | "rust" => { + let compressed_wasm_binary = udtf.get_compressed_binary()?; + let wasm_binary = zstd::stream::decode_all(compressed_wasm_binary.as_slice()) + .context("failed to decompress wasm binary")?; + let runtime = crate::expr::expr_udf::get_or_create_wasm_runtime(&wasm_binary)?; + UdfImpl::Wasm(runtime) } "javascript" => { let mut rt = JsRuntime::new()?; diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 5b9a6afcc6106..f2525a9274111 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -92,6 +92,7 @@ tokio-stream = "0.1" tonic = { workspace = true } tracing = "0.1" uuid = "1" +zstd = { version = "0.13", default-features = false } [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/frontend/src/catalog/function_catalog.rs b/src/frontend/src/catalog/function_catalog.rs index 96dbbe77c2a12..e18bcaa150344 100644 --- a/src/frontend/src/catalog/function_catalog.rs +++ b/src/frontend/src/catalog/function_catalog.rs @@ -33,6 +33,7 @@ pub struct FunctionCatalog { pub identifier: Option, pub body: Option, pub link: Option, + pub compressed_binary: Option>, } #[derive(Clone, Display, PartialEq, Eq, Hash, Debug)] @@ -68,6 +69,7 @@ impl From<&PbFunction> for FunctionCatalog { identifier: prost.identifier.clone(), body: prost.body.clone(), link: prost.link.clone(), + compressed_binary: prost.compressed_binary.clone(), } } } diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index e3000d0c245ab..c72c207c53783 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -79,6 +79,7 @@ impl TableFunction { link: c.link.clone(), identifier: c.identifier.clone(), body: c.body.clone(), + compressed_binary: c.compressed_binary.clone(), }), } } diff --git a/src/frontend/src/expr/user_defined_function.rs b/src/frontend/src/expr/user_defined_function.rs index 323d74b04be08..72f8c4031bb11 100644 --- a/src/frontend/src/expr/user_defined_function.rs +++ b/src/frontend/src/expr/user_defined_function.rs @@ -58,6 +58,7 @@ impl UserDefinedFunction { identifier: udf.identifier.clone(), body: udf.body.clone(), link: udf.link.clone(), + compressed_binary: udf.compressed_binary.clone(), }; Ok(Self { @@ -92,6 +93,7 @@ impl Expr for UserDefinedFunction { identifier: self.catalog.identifier.clone(), link: self.catalog.link.clone(), body: self.catalog.body.clone(), + compressed_binary: self.catalog.compressed_binary.clone(), })), } } diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs index 0a8329e54be08..2f6a376748007 100644 --- a/src/frontend/src/handler/create_function.rs +++ b/src/frontend/src/handler/create_function.rs @@ -18,7 +18,6 @@ use bytes::Bytes; use itertools::Itertools; use pgwire::pg_response::StatementType; use risingwave_common::catalog::FunctionId; -use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::types::DataType; use risingwave_expr::expr::get_or_create_wasm_runtime; use risingwave_object_store::object::{build_remote_object_store, ObjectStoreConfig}; @@ -123,6 +122,7 @@ pub async fn handle_create_function( let identifier; let mut link = None; let mut body = None; + let mut compressed_binary = None; match language.as_str() { "python" | "java" | "" => { @@ -204,23 +204,10 @@ pub async fn handle_create_function( .context("failed to build rust function")?; // below is the same as `wasm` language - let runtime = arrow_udf_wasm::Runtime::new(&wasm_binary)?; + let runtime = get_or_create_wasm_runtime(&wasm_binary)?; check_wasm_function(&runtime, &identifier)?; - let system_params = session.env().meta_client().get_system_params().await?; - let object_name = format!("{:?}.wasm", md5::compute(&wasm_binary)); - upload_wasm_binary( - system_params.wasm_storage_url(), - &object_name, - wasm_binary.into(), - ) - .await?; - - link = Some(format!( - "{}/{}", - system_params.wasm_storage_url(), - object_name - )); + compressed_binary = Some(zstd::stream::encode_all(wasm_binary.as_slice(), 0)?); } "wasm" => { identifier = wasm_identifier( @@ -235,38 +222,21 @@ pub async fn handle_create_function( ) .into()); }; - link = match using { - CreateFunctionUsing::Link(link) => { - let runtime = get_or_create_wasm_runtime(&link).await?; - check_wasm_function(&runtime, &identifier)?; - Some(link) - } + let wasm_binary = match using { + CreateFunctionUsing::Link(link) => download_binary_from_link(&link).await?, CreateFunctionUsing::Base64(encoded) => { // decode wasm binary from base64 use base64::prelude::{Engine, BASE64_STANDARD}; - let wasm_binary = BASE64_STANDARD + BASE64_STANDARD .decode(encoded) - .context("invalid base64 encoding")?; - - let runtime = arrow_udf_wasm::Runtime::new(&wasm_binary)?; - check_wasm_function(&runtime, &identifier)?; - - let system_params = session.env().meta_client().get_system_params().await?; - let object_name = format!("{:?}.wasm", md5::compute(&wasm_binary)); - upload_wasm_binary( - system_params.wasm_storage_url(), - &object_name, - wasm_binary.into(), - ) - .await?; - - Some(format!( - "{}/{}", - system_params.wasm_storage_url(), - object_name - )) + .context("invalid base64 encoding")? + .into() } }; + let runtime = get_or_create_wasm_runtime(&wasm_binary)?; + check_wasm_function(&runtime, &identifier)?; + + compressed_binary = Some(zstd::stream::encode_all(wasm_binary.as_ref(), 0)?); } _ => unreachable!("invalid language: {language}"), }; @@ -284,6 +254,7 @@ pub async fn handle_create_function( identifier: Some(identifier), link, body, + compressed_binary, owner: session.user_id(), }; @@ -293,13 +264,12 @@ pub async fn handle_create_function( Ok(PgResponse::empty_result(StatementType::CREATE_FUNCTION)) } -/// Upload wasm binary to object store. -async fn upload_wasm_binary( - wasm_storage_url: &str, - object_name: &str, - wasm_binary: Bytes, -) -> Result<()> { - // Note: it will panic if the url is invalid. We did a validation on meta startup. +/// Download wasm binary from a link to object store. +async fn download_binary_from_link(link: &str) -> Result { + let (wasm_storage_url, object_name) = link + .rsplit_once('/') + .context("invalid link for wasm function")?; + let object_store = build_remote_object_store( wasm_storage_url, Arc::new(ObjectStoreMetrics::unused()), @@ -307,11 +277,10 @@ async fn upload_wasm_binary( ObjectStoreConfig::default(), ) .await; - object_store - .upload(object_name, wasm_binary) + Ok(object_store + .read(object_name, ..) .await - .context("failed to upload wasm binary to object store")?; - Ok(()) + .context("failed to download wasm binary")?) } /// Check if the function exists in the wasm binary. diff --git a/src/frontend/src/handler/create_sql_function.rs b/src/frontend/src/handler/create_sql_function.rs index 311664735603f..e803d067c70df 100644 --- a/src/frontend/src/handler/create_sql_function.rs +++ b/src/frontend/src/handler/create_sql_function.rs @@ -234,6 +234,7 @@ pub async fn handle_create_sql_function( language, identifier: None, body: Some(body), + compressed_binary: None, link: None, owner: session.user_id(), }; diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index 444433d84c31e..7c6b61cc8af3b 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -723,6 +723,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Function::Link).string()) .col(ColumnDef::new(Function::Identifier).string()) .col(ColumnDef::new(Function::Body).string()) + .col(ColumnDef::new(Function::CompressedBinary).string()) .col(ColumnDef::new(Function::Kind).string().not_null()) .foreign_key( &mut ForeignKey::create() @@ -1123,6 +1124,7 @@ enum Function { Link, Identifier, Body, + CompressedBinary, Kind, } diff --git a/src/meta/model_v2/src/function.rs b/src/meta/model_v2/src/function.rs index ae68782a50fd1..56de18e344836 100644 --- a/src/meta/model_v2/src/function.rs +++ b/src/meta/model_v2/src/function.rs @@ -44,6 +44,7 @@ pub struct Model { pub link: Option, pub identifier: Option, pub body: Option, + pub compressed_binary: Option>, pub kind: FunctionKind, } @@ -99,6 +100,7 @@ impl From for ActiveModel { link: Set(function.link), identifier: Set(function.identifier), body: Set(function.body), + compressed_binary: Set(function.compressed_binary), kind: Set(function.kind.unwrap().into()), } } diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index cc544a0f589aa..1afe17b15bebc 100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -244,7 +244,6 @@ mod tests { data_directory: Some("data_directory".into()), backup_storage_url: Some("backup_storage_url".into()), backup_storage_directory: Some("backup_storage_directory".into()), - wasm_storage_url: Some("wasm_storage_url".into()), ..SystemConfig::default().into_init_system_params() } } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 7fd70318a1ff3..6054a1030c683 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -284,6 +284,7 @@ impl From> for PbFunction { link: value.0.link, identifier: value.0.identifier, body: value.0.body, + compressed_binary: value.0.compressed_binary, kind: Some(value.0.kind.into()), } } diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 9c640926f14f6..7ea2606e39d92 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -83,6 +83,7 @@ madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "i md-5 = { version = "0.10" } memchr = { version = "2" } mio = { version = "0.8", features = ["net", "os-ext"] } +moka = { version = "0.12", features = ["future", "sync"] } nom = { version = "7" } num-bigint = { version = "0.4" } num-integer = { version = "0.1", features = ["i128"] }