Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit b52a004
Author: xxchan <[email protected]>
Date:   Thu Oct 26 14:25:18 2023 +0800

    update arrow-ipc

commit e94feeb
Author: xxchan <[email protected]>
Date:   Thu Oct 26 06:21:34 2023 +0000

    Fix "cargo-hakari"

commit 08a5601
Merge: 56e6fc4 942e99d
Author: xxchan <[email protected]>
Date:   Thu Oct 26 14:19:34 2023 +0800

    Merge branch 'main' into xxchan/wasm-udf

commit 942e99d
Author: Yufan Song <[email protected]>
Date:   Wed Oct 25 22:10:31 2023 -0700

    fix(nats-connector): change stream into optional string, add replace stream name logic (#13024)

commit 90fb4a3
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Thu Oct 26 04:25:11 2023 +0000

    chore(deps): Bump comfy-table from 7.0.1 to 7.1.0 (#13049)

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

commit b724be7
Author: jinser <[email protected]>
Date:   Thu Oct 26 00:26:15 2023 +0800

    feat: add `comment on` clause support (#12849)

    Co-authored-by: Richard Chien <[email protected]>
    Co-authored-by: August <[email protected]>

commit 7f791d6
Author: August <[email protected]>
Date:   Wed Oct 25 20:29:16 2023 +0800

    feat: move model_v2 and model_migration into a separate crates (#13058)

commit 7f82929
Author: Noel Kwan <[email protected]>
Date:   Wed Oct 25 16:57:45 2023 +0800

    fix(meta): persist internal tables of `CREATE TABLE` (#13039)

commit 09a67ab
Author: Noel Kwan <[email protected]>
Date:   Wed Oct 25 16:49:08 2023 +0800

    fix: `WAIT` should return error if timeout (#13045)

commit e48547d
Author: Runji Wang <[email protected]>
Date:   Wed Oct 25 16:41:16 2023 +0800

    refactor(type): switch jsonb to flat representation (#12952)

    Signed-off-by: Runji Wang <[email protected]>

commit 56e6fc4
Author: xxchan <[email protected]>
Date:   Wed Oct 25 15:33:36 2023 +0800

    fix merge issue

commit c644361
Merge: fcd6992 2d428b1
Author: xxchan <[email protected]>
Date:   Wed Oct 25 15:23:44 2023 +0800

    Merge remote-tracking branch 'origin/main' into xxchan/wasm-udf

commit fcd6992
Author: xxchan <[email protected]>
Date:   Wed Oct 25 14:28:53 2023 +0800

    fix s3 stuck

commit 21e9740
Author: xxchan <[email protected]>
Date:   Wed Oct 25 12:47:24 2023 +0800

    Revert "fix s3 stuck (why?)"

    This reverts commit f19a6b4.

commit f19a6b4
Author: xxchan <[email protected]>
Date:   Wed Sep 13 14:32:28 2023 +0800

    fix s3 stuck (why?)

commit 019f309
Author: xxchan <[email protected]>
Date:   Tue Sep 12 15:29:52 2023 +0800

    ON_ERROR_STOP=1

commit 6e4ee3c
Author: xxchan <[email protected]>
Date:   Tue Sep 12 15:09:58 2023 +0800

    generate-config

commit b63a1c3
Merge: 2b0cc96 53611bf
Author: xxchan <[email protected]>
Date:   Tue Sep 12 14:53:10 2023 +0800

    Merge remote-tracking branch 'origin/main' into xxchan/wasm-udf

commit 2b0cc96
Author: xxchan <[email protected]>
Date:   Sat Sep 9 23:49:43 2023 +0800

    fix conflicts

commit 6b13fe3
Author: xxchan <[email protected]>
Date:   Sat Sep 9 23:35:50 2023 +0800

    update system param default

commit a273943
Merge: cc34bfe f649aa6
Author: xxchan <[email protected]>
Date:   Sat Sep 9 23:33:38 2023 +0800

    Merge remote-tracking branch 'origin/main' into xxchan/wasm-udf

commit cc34bfe
Author: xxchan <[email protected]>
Date:   Tue Aug 1 17:47:42 2023 +0200

    use count_char as the example

commit f913f63
Merge: 53bf8e0 2637dbd
Author: xxchan <[email protected]>
Date:   Tue Aug 1 17:22:13 2023 +0200

    Merge branch 'main' into xxchan/wasm-udf

commit 53bf8e0
Author: xxchan <[email protected]>
Date:   Mon Jul 31 14:20:07 2023 +0200

    minor update

commit 70cee42
Author: xxchan <[email protected]>
Date:   Mon Jul 17 14:53:29 2023 +0200

    fix arrow_schema into -> try_into

commit a7d172d
Author: xxchan <[email protected]>
Date:   Fri Jul 14 16:31:20 2023 +0200

    buf format

commit 43a3290
Author: xxchan <[email protected]>
Date:   Thu Jul 13 23:04:16 2023 +0200

    add tinygo example & turn on wasi support

commit 61a4998
Author: xxchan <[email protected]>
Date:   Wed Jul 12 11:40:56 2023 +0200

    cleanup

commit 165d4d9
Author: xxchan <[email protected]>
Date:   Wed Jul 12 11:02:44 2023 +0200

    use object store to store wasm

commit 88979e4
Author: xxchan <[email protected]>
Date:   Tue Jul 11 15:32:52 2023 +0200

    add wasm_storage_url system param

commit a897320
Author: xxchan <[email protected]>
Date:   Thu Jul 6 20:04:45 2023 +0200

    Load compiled wasm module in expr 🚀🚀🚀

commit 63b3523
Author: xxchan <[email protected]>
Date:   Sun Jul 2 19:27:22 2023 +0200

    it works (although very slow)
  • Loading branch information
xxchan committed Oct 26, 2023
1 parent 71851d6 commit 26eaf7a
Show file tree
Hide file tree
Showing 42 changed files with 2,173 additions and 85 deletions.
1,075 changes: 1,064 additions & 11 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ arrow-buffer = "48"
arrow-flight = "48"
arrow-select = "48"
arrow-ord = "48"
arrow-ipc = "48"
tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }
tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [
"profiling",
Expand Down
6 changes: 6 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ message Function {
repeated data.DataType arg_types = 5;
data.DataType return_type = 6;
string language = 7;
// external function service only
string link = 8;
string identifier = 10;

Expand All @@ -204,6 +205,11 @@ message Function {
message ScalarFunction {}
message TableFunction {}
message AggregateFunction {}

oneof extra {
expr.ExternalUdfExtra external = 14;
expr.WasmUdfExtra wasm = 15;
}
}

// See `TableCatalog` struct in frontend crate for more information.
Expand Down
25 changes: 25 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ message ExprNode {
COL_DESCRIPTION = 2100;
CAST_REGCLASS = 2101;
}
// Only use this field for function call. For other types of expression, it should be UNSPECIFIED.
Type function_type = 1;
data.DataType return_type = 3;
oneof rex_node {
Expand Down Expand Up @@ -417,15 +418,39 @@ message WindowFunction {
WindowFrame frame = 5;
}

// Note: due to historic reasons, UserDefinedFunction is a oneof variant parallel to FunctionCall,
// while UserDefinedTableFunction is embedded as a field in TableFunction.

message UserDefinedFunction {
repeated ExprNode children = 1;
string name = 2;
repeated data.DataType arg_types = 3;
string language = 4;
// external function service only
string link = 5;
// An unique identifier for the function. Different kinds of UDF may handle this field differently.
// - For external UDF, it's the name of the function in the external function service.
// It doesn't need to be unique across different external function servers.
// - For wasm UDF, it's the name of the function stored in remote object store, and needs to be globally unique.
string identifier = 6;

oneof extra {
ExternalUdfExtra external = 7;
WasmUdfExtra wasm = 8;
}
}

// extra information for external functions
message ExternalUdfExtra {}

// extra information for wasm functions
message WasmUdfExtra {
// We store the url of the remote object store here, as it's inconvenient to pass it to build expr.
// Maybe we can deprecate it in the future.
string wasm_storage_url = 1;
}

// Additional information for user defined table functions.
message UserDefinedTableFunction {
repeated data.DataType arg_types = 3;
string language = 4;
Expand Down
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ message SystemParams {
optional uint32 parallel_compact_size_mb = 11;
optional uint32 max_concurrent_creating_streaming_jobs = 12;
optional bool pause_on_next_bootstrap = 13;
optional string wasm_storage_url = 14;
}

message GetSystemParamsRequest {}
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,9 @@ pub struct SystemConfig {
/// Whether to pause all data sources on next bootstrap.
#[serde(default = "default::system::pause_on_next_bootstrap")]
pub pause_on_next_bootstrap: Option<bool>,

#[serde(default = "default::system::wasm_storage_url")]
pub wasm_storage_url: Option<String>,
}

impl SystemConfig {
Expand All @@ -863,6 +866,7 @@ impl SystemConfig {
max_concurrent_creating_streaming_jobs: self.max_concurrent_creating_streaming_jobs,
pause_on_next_bootstrap: self.pause_on_next_bootstrap,
telemetry_enabled: None, // deprecated
wasm_storage_url: self.wasm_storage_url,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ macro_rules! for_all_params {
{ backup_storage_directory, String, Some("backup".to_string()), true },
{ max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true },
{ pause_on_next_bootstrap, bool, Some(false), true },
{ wasm_storage_url, String, Some("fs://@/tmp/risingwave".to_string()), false },
}
};
}
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/system_param/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ impl SystemParamsReader {
self.prost.pause_on_next_bootstrap.unwrap_or(false)
}

pub fn wasm_storage_url(&self) -> &str {
self.prost.wasm_storage_url.as_ref().unwrap()
}

pub fn to_kv(&self) -> Vec<(String, String)> {
system_params_to_kv(&self.prost).unwrap()
}
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,4 @@ backup_storage_url = "memory"
backup_storage_directory = "backup"
max_concurrent_creating_streaming_jobs = 1
pause_on_next_bootstrap = false
wasm_storage_url = "fs://@/tmp/risingwave"
3 changes: 3 additions & 0 deletions src/expr/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ risingwave_udf = { workspace = true }
smallvec = "1"
static_assertions = "1"
thiserror = "1"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt-multi-thread",
] }
tracing = "0.1"

[target.'cfg(not(madsim))'.dependencies]
Expand Down
6 changes: 4 additions & 2 deletions src/expr/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ pub enum ExprError {
Internal(#[from] anyhow::Error),

#[error("UDF error: {0}")]
Udf(#[from] risingwave_udf::Error),
ExternalUdf(#[from] risingwave_udf::Error),
#[error("UDF error: {0}")]
WasmUdf(#[from] risingwave_udf::wasm::WasmUdfError),

#[error("not a constant")]
NotConstant,
Expand All @@ -98,7 +100,7 @@ pub enum ExprError {
InvalidState(String),
}

static_assertions::const_assert_eq!(std::mem::size_of::<ExprError>(), 40);
static_assertions::const_assert_eq!(std::mem::size_of::<ExprError>(), 48);

impl From<ExprError> for RwError {
fn from(s: ExprError) -> Self {
Expand Down
81 changes: 67 additions & 14 deletions src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ use cfg_or_panic::cfg_or_panic;
use risingwave_common::array::{ArrayRef, DataChunk};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
use risingwave_pb::expr::ExprNode;
use risingwave_pb::expr::user_defined_function::PbExtra;
use risingwave_pb::expr::{ExprNode, PbExternalUdfExtra, PbWasmUdfExtra};
use risingwave_udf::wasm::{InstantiatedComponent, WasmEngine};
use risingwave_udf::ArrowFlightUdfClient;
use tracing::Instrument;

use super::{BoxedExpression, Build};
use crate::expr::Expression;
Expand All @@ -35,11 +38,36 @@ pub struct UdfExpression {
arg_types: Vec<DataType>,
return_type: DataType,
arg_schema: Arc<Schema>,
client: Arc<ArrowFlightUdfClient>,
identifier: String,
imp: UdfImpl,
span: await_tree::Span,
}

enum UdfImpl {
External {
client: Arc<ArrowFlightUdfClient>,
identifier: String,
},
Wasm {
component: InstantiatedComponent,
},
}

impl std::fmt::Debug for UdfImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::External { client, identifier } => f
.debug_struct("External")
.field("client", client)
.field("identifier", identifier)
.finish(),
Self::Wasm { component: _ } => f
.debug_struct("Wasm")
// .field("component", component)
.finish(),
}
}
}

#[async_trait::async_trait]
impl Expression for UdfExpression {
fn return_type(&self) -> DataType {
Expand Down Expand Up @@ -98,19 +126,27 @@ impl UdfExpression {
)
.expect("failed to build record batch");

let output = self
.client
.call(&self.identifier, input)
.instrument_await(self.span.clone())
.await?;
let output: arrow_array::RecordBatch = match &self.imp {
UdfImpl::Wasm { component } => {
component
.eval(input)
.instrument_await(self.span.clone())
.await?
}
UdfImpl::External { client, identifier } => {
client
.call(identifier, input)
.instrument_await(self.span.clone())
.await?
}
};
if output.num_rows() != vis.count_ones() {
bail!(
"UDF returned {} rows, but expected {}",
output.num_rows(),
vis.len(),
);
}

let data_chunk =
DataChunk::try_from(&output).expect("failed to convert UDF output to DataChunk");
let output = data_chunk.uncompact(vis.clone());
Expand Down Expand Up @@ -139,8 +175,26 @@ impl Build for UdfExpression {
let return_type = DataType::from(prost.get_return_type().unwrap());
let udf = prost.get_rex_node().unwrap().as_udf().unwrap();

// connect to UDF service
let client = get_or_create_client(&udf.link)?;
let imp = match &udf.extra {
None | Some(PbExtra::External(PbExternalUdfExtra {})) => UdfImpl::External {
client: get_or_create_flight_client(&udf.link)?,
identifier: udf.identifier.clone(),
},
Some(PbExtra::Wasm(PbWasmUdfExtra { wasm_storage_url })) => {
let wasm_engine = WasmEngine::get_or_create();
// Use `block_in_place` as an escape hatch to run async code here in sync context.
// Calling `block_on` directly will panic.
let component = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on({
wasm_engine
.load_component(wasm_storage_url, &udf.identifier)
.instrument(tracing::info_span!("load_component", %udf.identifier))
})
})?;

UdfImpl::Wasm { component }
}
};

let arg_schema = Arc::new(Schema::new(
udf.arg_types
Expand All @@ -162,8 +216,7 @@ impl Build for UdfExpression {
arg_types: udf.arg_types.iter().map(|t| t.into()).collect(),
return_type,
arg_schema,
client,
identifier: udf.identifier.clone(),
imp,
span: format!("expr_udf_call ({})", udf.identifier).into(),
})
}
Expand All @@ -173,7 +226,7 @@ impl Build for UdfExpression {
/// Get or create a client for the given UDF service.
///
/// There is a global cache for clients, so that we can reuse the same client for the same service.
pub(crate) fn get_or_create_client(link: &str) -> Result<Arc<ArrowFlightUdfClient>> {
pub(crate) fn get_or_create_flight_client(link: &str) -> Result<Arc<ArrowFlightUdfClient>> {
static CLIENTS: LazyLock<Mutex<HashMap<String, Weak<ArrowFlightUdfClient>>>> =
LazyLock::new(Default::default);
let mut clients = CLIENTS.lock().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/expr/core/src/table_function/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result<Bo
.try_collect::<_, Fields, _>()?,
));
// connect to UDF service
let client = crate::expr::expr_udf::get_or_create_client(&udtf.link)?;
let client = crate::expr::expr_udf::get_or_create_flight_client(&udtf.link)?;

Ok(UserDefinedTableFunction {
children: prost.args.iter().map(expr_build_from_prost).try_collect()?,
Expand Down
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ arrow-schema = { workspace = true }
async-recursion = "1.0.5"
async-trait = "0.1"
auto_enums = { version = "0.8", features = ["futures03"] }
base64 = "0.21"
bk-tree = "0.5.0"
bytes = "1"
clap = { version = "4", features = ["derive"] }
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/catalog/function_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use risingwave_common::catalog::FunctionId;
use risingwave_common::types::DataType;
use risingwave_pb::catalog::function::PbKind;
use risingwave_pb::catalog::PbFunction;
use risingwave_pb::expr::user_defined_function::PbExtra;

use crate::catalog::OwnedByUserCatalog;

Expand All @@ -31,6 +32,8 @@ pub struct FunctionCatalog {
pub language: String,
pub identifier: String,
pub link: String,
// for backward compatibility, newly added fields should be optional
pub extra: Option<PbExtra>,
}

#[derive(Clone, Display, PartialEq, Eq, Hash, Debug)]
Expand Down Expand Up @@ -64,6 +67,7 @@ impl From<&PbFunction> for FunctionCatalog {
language: prost.language.clone(),
identifier: prost.identifier.clone(),
link: prost.link.clone(),
extra: prost.extra.clone().map(Into::into),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/expr/user_defined_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl UserDefinedFunction {
language: udf.get_language().clone(),
identifier: udf.get_identifier().clone(),
link: udf.get_link().clone(),
extra: udf.extra.clone(),
};

Ok(Self {
Expand Down Expand Up @@ -88,6 +89,8 @@ impl Expr for UserDefinedFunction {
language: self.catalog.language.clone(),
identifier: self.catalog.identifier.clone(),
link: self.catalog.link.clone(),

extra: self.catalog.extra.clone(),
})),
}
}
Expand Down
Loading

0 comments on commit 26eaf7a

Please sign in to comment.