Skip to content

Commit

Permalink
fix error ui slt
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 committed May 10, 2024
1 parent 2970441 commit a35ac1c
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 14 deletions.
6 changes: 4 additions & 2 deletions e2e_test/error_ui/simple/main.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ create function int_42() returns int as int_42 using link '555.0.0.1:8815';
----
db error: ERROR: Failed to run the query

Caused by:
Flight service error: invalid address: 555.0.0.1:8815, err: failed to parse address: http://555.0.0.1:8815: invalid IPv4 address
Caused by these errors (recent errors listed first):
1: Expr error
2: UDF error
3: Flight service error: invalid address: 555.0.0.1:8815, err: failed to parse address: http://555.0.0.1:8815: invalid IPv4 address


statement error
Expand Down
14 changes: 7 additions & 7 deletions src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ impl Build for UserDefinedFunction {
#[cfg(not(madsim))]
_ => {
let link = udf.get_link()?;
let client = crate::expr::expr_udf::get_or_create_flight_client(link)?;
let client = get_or_create_flight_client(link)?;
// backward compatibility
// see <https://github.com/risingwavelabs/risingwave/pull/16619> for details
if client.protocol_version() == 1 {
Expand Down Expand Up @@ -456,11 +456,11 @@ impl Build for UserDefinedFunction {
}
}

#[cfg(not(madsim))]
#[cfg_or_panic(not(madsim))]
/// Get or create a client for the given UDF service.
///
/// There is a global cache for clients, so that we can reuse the same client for the same service.
pub(crate) fn get_or_create_flight_client(link: &str) -> Result<Arc<FlightClient>> {
pub fn get_or_create_flight_client(link: &str) -> Result<Arc<FlightClient>> {
static CLIENTS: LazyLock<std::sync::Mutex<HashMap<String, Weak<FlightClient>>>> =
LazyLock::new(Default::default);
let mut clients = CLIENTS.lock().unwrap();
Expand Down Expand Up @@ -489,11 +489,11 @@ async fn connect_tonic(mut addr: &str) -> Result<tonic::transport::Channel> {
const REQUEST_TIMEOUT_SECS: u64 = 5;
const CONNECT_TIMEOUT_SECS: u64 = 5;

if addr.starts_with("http://") {
addr = addr.strip_prefix("http://").unwrap();
if let Some(s) = addr.strip_prefix("http://") {
addr = s;
}
if addr.starts_with("https://") {
addr = addr.strip_prefix("https://").unwrap();
if let Some(s) = addr.strip_prefix("https://") {
addr = s;
}
let host_addr = addr.parse::<HostAddr>().map_err(|e| {
arrow_udf_flight::Error::Service(format!(
Expand Down
2 changes: 1 addition & 1 deletion src/expr/core/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use risingwave_common::types::{DataType, Datum};
pub use self::build::*;
pub use self::expr_input_ref::InputRefExpression;
pub use self::expr_literal::LiteralExpression;
pub use self::expr_udf::get_or_create_wasm_runtime;
pub use self::expr_udf::{get_or_create_flight_client, get_or_create_wasm_runtime};
pub use self::value::{ValueImpl, ValueRef};
pub use self::wrapper::*;
pub use super::{ExprError, Result};
Expand Down
7 changes: 3 additions & 4 deletions src/frontend/src/handler/create_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::{anyhow, Context};
use anyhow::Context;
use arrow_schema::Fields;
use arrow_udf_flight::Client as FlightClient;
use bytes::Bytes;
use itertools::Itertools;
use pgwire::pg_response::StatementType;
use risingwave_common::array::arrow::{ToArrow, UdfArrowConvert};
use risingwave_common::catalog::FunctionId;
use risingwave_common::types::DataType;
use risingwave_expr::expr::get_or_create_wasm_runtime;
use risingwave_expr::expr::{get_or_create_flight_client, get_or_create_wasm_runtime};
use risingwave_pb::catalog::function::{Kind, ScalarFunction, TableFunction};
use risingwave_pb::catalog::Function;
use risingwave_sqlparser::ast::{CreateFunctionBody, ObjectName, OperateFunctionArg};
Expand Down Expand Up @@ -167,7 +166,7 @@ pub async fn handle_create_function(

// check UDF server
{
let client = FlightClient::connect(&l).await.map_err(|e| anyhow!(e))?;
let client = get_or_create_flight_client(&l)?;
let convert = UdfArrowConvert {
legacy: client.protocol_version() == 1,
};
Expand Down

0 comments on commit a35ac1c

Please sign in to comment.