Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin' into tab/connection
Browse files Browse the repository at this point in the history
  • Loading branch information
tabversion committed Nov 18, 2024
2 parents 1d2cb3d + a4d96ec commit 45a9b8d
Show file tree
Hide file tree
Showing 28 changed files with 280 additions and 187 deletions.
19 changes: 19 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,29 @@ updates:
google-cloud:
patterns:
- "google-cloud*"

# Don't update these directories
- package-ecosystem: cargo
directory: /integration_tests/feature-store
schedule:
interval: "daily"
ignore:
- dependency-name: "*"

- package-ecosystem: maven
directory: /java
schedule:
interval: "weekly"
open-pull-requests-limit: 5
# Disable auto rebase to reduce cost. Use `@dependabot rebase` manually instead.
rebase-strategy: "disabled"
ignore:
# Do not bump Debezium because we have hacked its source code e.g. #18760
- dependency-name: "io.debezium:*"
update-types:
["version-update:semver-minor", "version-update:semver-major"]
groups:
# Group all dependenies together because Java libraries are quite stable
all:
patterns:
- "*"
22 changes: 22 additions & 0 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,28 @@ steps:
timeout_in_minutes: 120
retry: *auto-retry

- label: "end-to-end test (madsim, random vnode count)"
key: "e2e-test-deterministic-random-vnode-count"
command: "TEST_NUM=32 RW_SIM_RANDOM_VNODE_COUNT=true timeout 120m ci/scripts/deterministic-e2e-test.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-test-deterministic-simulation"
|| build.env("CI_STEPS") =~ /(^|,)e2e-tests?-deterministic-simulation(,|$$)/
depends_on: "build-simulation"
plugins:
- seek-oss/aws-sm#v2.3.1:
env:
GITHUB_TOKEN: github-token
- docker-compose#v5.1.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
environment:
- GITHUB_TOKEN
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 120
retry: *auto-retry

- label: "recovery test (madsim)"
key: "recovery-test-deterministic"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 timeout 65m ci/scripts/deterministic-recovery-test.sh"
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/source_legacy/cdc_inline/postgres_create_drop.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ control substitution on
system ok
psql -c "
DROP TABLE IF EXISTS tt1;
CREATE TABLE tt1 (v1 int primary key, v2 timestamp);
CREATE TABLE tt1 (v1 int primary key, v2 timestamptz);
INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00');"

statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ private boolean isDataTypeCompatible(String mysqlDataType, Data.DataType.TypeNam
&& val <= Data.DataType.TypeName.INT64_VALUE;
case "bigint":
return val == Data.DataType.TypeName.INT64_VALUE;

case "float":
case "real":
return val == Data.DataType.TypeName.FLOAT_VALUE
Expand All @@ -303,6 +302,12 @@ private boolean isDataTypeCompatible(String mysqlDataType, Data.DataType.TypeNam
return val == Data.DataType.TypeName.DECIMAL_VALUE;
case "varchar":
return val == Data.DataType.TypeName.VARCHAR_VALUE;
case "date":
return val == Data.DataType.TypeName.DATE_VALUE;
case "time":
return val == Data.DataType.TypeName.TIME_VALUE;
case "datetime":
return val == Data.DataType.TypeName.TIMESTAMP_VALUE;
case "timestamp":
return val == Data.DataType.TypeName.TIMESTAMPTZ_VALUE;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,25 @@ private boolean isDataTypeCompatible(String pgDataType, Data.DataType.TypeName t
|| val == Data.DataType.TypeName.VARCHAR_VALUE;
case "varchar":
case "character varying":
case "uuid":
case "enum":
return val == Data.DataType.TypeName.VARCHAR_VALUE;
case "bytea":
return val == Data.DataType.TypeName.BYTEA_VALUE;
case "date":
return val == Data.DataType.TypeName.DATE_VALUE;
case "time":
return val == Data.DataType.TypeName.TIME_VALUE;
case "timestamp":
case "timestamp without time zone":
return val == Data.DataType.TypeName.TIMESTAMP_VALUE;
case "timestamptz":
case "timestamp with time zone":
return val == Data.DataType.TypeName.TIMESTAMPTZ_VALUE;
case "interval":
return val == Data.DataType.TypeName.INTERVAL_VALUE;
case "jsonb":
return val == Data.DataType.TypeName.JSONB_VALUE;
default:
return true; // true for other uncovered types
}
Expand Down
6 changes: 3 additions & 3 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,10 @@ message Function {
// The zstd-compressed binary of the function.
optional bytes compressed_binary = 17;
bool always_retry_on_network_error = 16;
// The runtime used when javascript is used as the language. Could be "quickjs" or "deno".
// The runtime selected when multiple runtimes are available for the language. Now is not used.
optional string runtime = 18;
// The function type, which is used to execute the function. Could be "sync", "async", "generator" or "async_generator"
optional string function_type = 19;
reserved 19;
reserved "function_type";

oneof kind {
ScalarFunction scalar = 11;
Expand Down
9 changes: 5 additions & 4 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -610,10 +610,10 @@ message UserDefinedFunction {
// - If `language` is `rust` or `wasm`, the zstd-compressed wasm binary.
optional bytes compressed_binary = 10;
bool always_retry_on_network_error = 9;
// The runtime used when javascript is used as the language. Could be "quickjs" or "deno".
// The runtime selected when multiple runtimes are available for the language. Now is not used.
optional string runtime = 11;
// The function type, which is used to execute the function. Could be "sync", "async", "generator" or "async_generator"
optional string function_type = 12;
reserved 12;
reserved "function_type";
}

// Additional information for user defined table/aggregate functions.
Expand All @@ -627,5 +627,6 @@ message UserDefinedFunctionMetadata {
optional string body = 7;
optional bytes compressed_binary = 10;
optional string runtime = 11;
optional string function_type = 12;
reserved 12;
reserved "function_type";
}
1 change: 0 additions & 1 deletion src/expr/core/src/aggregate/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ pub fn new_user_defined(
arg_names: &udf.arg_names,
return_type,
always_retry_on_network_error: false,
function_type: udf.function_type.as_deref(),
})
.context("failed to build UDF runtime")?;

Expand Down
1 change: 0 additions & 1 deletion src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ impl Build for UserDefinedFunction {
arg_names: &udf.arg_names,
return_type: &return_type,
always_retry_on_network_error: udf.always_retry_on_network_error,
function_type: udf.function_type.as_deref(),
})
.context("failed to build UDF runtime")?;

Expand Down
1 change: 0 additions & 1 deletion src/expr/core/src/sig/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ pub struct UdfOptions<'a> {
pub arg_names: &'a [String],
pub return_type: &'a DataType,
pub always_retry_on_network_error: bool,
pub function_type: Option<&'a str>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
Expand Down
1 change: 0 additions & 1 deletion src/expr/core/src/table_function/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result<Bo
arg_names: &udf.arg_names,
return_type: &return_type,
always_retry_on_network_error: false,
function_type: udf.function_type.as_deref(),
})
.context("failed to build UDF runtime")?;

Expand Down
9 changes: 3 additions & 6 deletions src/frontend/src/catalog/function_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@ pub struct FunctionCatalog {
pub arg_types: Vec<DataType>,
pub return_type: DataType,
pub language: String,
pub runtime: Option<String>,
pub identifier: Option<String>,
pub body: Option<String>,
pub link: Option<String>,
pub compressed_binary: Option<Vec<u8>>,
pub always_retry_on_network_error: bool,
pub function_type: Option<String>,
pub runtime: Option<String>,
}

#[derive(Clone, Display, PartialEq, Eq, Hash, Debug, EnumAsInner)]
Expand Down Expand Up @@ -71,13 +70,12 @@ impl From<&PbFunction> for FunctionCatalog {
arg_types: prost.arg_types.iter().map(|arg| arg.into()).collect(),
return_type: prost.return_type.as_ref().expect("no return type").into(),
language: prost.language.clone(),
runtime: prost.runtime.clone(),
identifier: prost.identifier.clone(),
body: prost.body.clone(),
link: prost.link.clone(),
compressed_binary: prost.compressed_binary.clone(),
always_retry_on_network_error: prost.always_retry_on_network_error,
function_type: prost.function_type.clone(),
runtime: prost.runtime.clone(),
}
}
}
Expand All @@ -89,12 +87,11 @@ impl From<&FunctionCatalog> for PbUserDefinedFunctionMetadata {
arg_types: c.arg_types.iter().map(|t| t.to_protobuf()).collect(),
return_type: Some(c.return_type.to_protobuf()),
language: c.language.clone(),
runtime: c.runtime.clone(),
link: c.link.clone(),
identifier: c.identifier.clone(),
body: c.body.clone(),
compressed_binary: c.compressed_binary.clone(),
function_type: c.function_type.clone(),
runtime: c.runtime.clone(),
}
}
}
Expand Down
10 changes: 4 additions & 6 deletions src/frontend/src/expr/user_defined_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,20 @@ impl UserDefinedFunction {
let catalog = FunctionCatalog {
// FIXME(yuhao): function id is not in udf proto.
id: FunctionId::placeholder(),
name: udf.get_name().clone(),
name: udf.name.clone(),
// FIXME(yuhao): owner is not in udf proto.
owner: u32::MAX - 1,
kind: FunctionKind::Scalar,
arg_names: udf.arg_names.clone(),
arg_types,
return_type,
language: udf.get_language().clone(),
language: udf.language.clone(),
runtime: udf.runtime.clone(),
identifier: udf.identifier.clone(),
body: udf.body.clone(),
link: udf.link.clone(),
compressed_binary: udf.compressed_binary.clone(),
always_retry_on_network_error: udf.always_retry_on_network_error,
function_type: udf.function_type.clone(),
runtime: udf.runtime.clone(),
};

Ok(Self {
Expand Down Expand Up @@ -93,13 +92,12 @@ impl Expr for UserDefinedFunction {
.map(|t| t.to_protobuf())
.collect(),
language: self.catalog.language.clone(),
runtime: self.catalog.runtime.clone(),
identifier: self.catalog.identifier.clone(),
link: self.catalog.link.clone(),
body: self.catalog.body.clone(),
compressed_binary: self.catalog.compressed_binary.clone(),
always_retry_on_network_error: self.catalog.always_retry_on_network_error,
function_type: self.catalog.function_type.clone(),
runtime: self.catalog.runtime.clone(),
})),
}
}
Expand Down
20 changes: 11 additions & 9 deletions src/frontend/src/handler/create_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ pub async fn handle_create_aggregate(
None => return Err(ErrorCode::InvalidParameterValue("no language".into()).into()),
};

let runtime = match params.runtime {
Some(_) => {
return Err(ErrorCode::InvalidParameterValue(
"runtime selection is currently not supported".to_string(),
)
.into());
}
None => None,
};

let return_type = bind_data_type(&returns)?;

let mut arg_names = vec![];
Expand Down Expand Up @@ -94,13 +104,6 @@ pub async fn handle_create_aggregate(
}
_ => None,
};
let function_type = match params.function_type {
Some(CreateFunctionType::Sync) => Some("sync".to_string()),
Some(CreateFunctionType::Async) => Some("async".to_string()),
Some(CreateFunctionType::Generator) => Some("generator".to_string()),
Some(CreateFunctionType::AsyncGenerator) => Some("async_generator".to_string()),
None => None,
};

let create_fn = risingwave_expr::sig::find_udf_impl(&language, None, link)?.create_fn;
let output = create_fn(CreateFunctionOptions {
Expand All @@ -124,14 +127,13 @@ pub async fn handle_create_aggregate(
arg_types: arg_types.into_iter().map(|t| t.into()).collect(),
return_type: Some(return_type.into()),
language,
runtime,
identifier: Some(output.identifier),
link: link.map(|s| s.to_string()),
body: output.body,
compressed_binary: output.compressed_binary,
owner: session.user_id(),
always_retry_on_network_error: false,
runtime: None,
function_type,
};

let catalog_writer = session.catalog_writer()?;
Expand Down
24 changes: 6 additions & 18 deletions src/frontend/src/handler/create_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,11 @@ pub async fn handle_create_function(
};

let runtime = match params.runtime {
Some(runtime) => {
if language == "javascript" {
Some(runtime.real_value())
} else {
return Err(ErrorCode::InvalidParameterValue(
"runtime is only supported for javascript".to_string(),
)
.into());
}
Some(_) => {
return Err(ErrorCode::InvalidParameterValue(
"runtime selection is currently not supported".to_string(),
)
.into());
}
None => None,
};
Expand Down Expand Up @@ -141,13 +137,6 @@ pub async fn handle_create_function(
}
_ => None,
};
let function_type = match params.function_type {
Some(CreateFunctionType::Sync) => Some("sync".to_string()),
Some(CreateFunctionType::Async) => Some("async".to_string()),
Some(CreateFunctionType::Generator) => Some("generator".to_string()),
Some(CreateFunctionType::AsyncGenerator) => Some("async_generator".to_string()),
None => None,
};

let create_fn =
risingwave_expr::sig::find_udf_impl(&language, runtime.as_deref(), link)?.create_fn;
Expand Down Expand Up @@ -176,6 +165,7 @@ pub async fn handle_create_function(
arg_types: arg_types.into_iter().map(|t| t.into()).collect(),
return_type: Some(return_type.into()),
language,
runtime,
identifier: Some(output.identifier),
link: link.map(|s| s.to_string()),
body: output.body,
Expand All @@ -184,8 +174,6 @@ pub async fn handle_create_function(
always_retry_on_network_error: with_options
.always_retry_on_network_error
.unwrap_or_default(),
runtime,
function_type,
};

let catalog_writer = session.catalog_writer()?;
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/handler/create_sql_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,14 +336,13 @@ pub async fn handle_create_sql_function(
arg_types: arg_types.into_iter().map(|t| t.into()).collect(),
return_type: Some(return_type.into()),
language,
runtime: None,
identifier: None,
body: Some(body),
compressed_binary: None,
link: None,
owner: session.user_id(),
always_retry_on_network_error: false,
runtime: None,
function_type: None,
};

let catalog_writer = session.catalog_writer()?;
Expand Down
Loading

0 comments on commit 45a9b8d

Please sign in to comment.