diff --git a/README.md b/README.md
index 07d12e99223e..d438d4365c2e 100644
--- a/README.md
+++ b/README.md
@@ -9,28 +9,20 @@
-### 🌊 Reimagine stream processing.
+### 🌊 Reimagine real-time data engineering.
-
@@ -56,16 +48,9 @@
RisingWave is a Postgres-compatible SQL engine engineered to provide the simplest and most cost-efficient approach for processing, analyzing, and managing real-time event streaming data.
-![RisingWave](./docs/dev/src/images/architecture_20240814.png)
-
-## When to use RisingWave?
-RisingWave can ingest millions of events per second, continuously join live data streams with historical tables, and serve ad-hoc queries in real-time. Typical use cases include, but are not limited to:
-
-* **Streaming analytics**: Perform streaming analytics and build live dashboards with data freshness under one second, ideal for stock trading, sports betting, IoT monitoring, and more.
-* **Event-driven applications**: Develop monitoring and alerting applications for fraud detection, anomaly detection, and more.
-* **Real-time ETL pipelines**: Ingest data from different sources, perform enrichment queries, and deliver results to downstream systems.
-* **Feature stores**: Transform both batch and streaming data into ML features using the same codebase.
+RisingWave can ingest millions of events per second, seamlessly join and analyze live data streams with historical tables, serve ad-hoc queries in real-time, and deliver fresh, consistent results.
+![RisingWave](./docs/dev/src/images/architecture_20240814.png)
## Try it out in 60 seconds
@@ -74,64 +59,33 @@ Install RisingWave standalone mode:
curl https://risingwave.com/sh | sh
```
-Then follow the prompts to start and connect to RisingWave.
-
To learn about other installation options, such as using a Docker image, see [Quick Start](https://docs.risingwave.com/docs/current/get-started/).
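Once the standalone instance is running, you can connect with any PostgreSQL client and run a quick end-to-end check. A minimal sketch, assuming the standalone defaults documented in the Quick Start (host `localhost`, port `4566`, database `dev`, user `root`); adjust these if your setup differs:

```sql
-- Connect first, e.g. `psql -h localhost -p 4566 -d dev -U root` (defaults assumed).
-- Create a table and an incrementally maintained materialized view.
CREATE TABLE clicks (user_id INT, ts TIMESTAMPTZ);

CREATE MATERIALIZED VIEW clicks_per_user AS
SELECT user_id, COUNT(*) AS cnt
FROM clicks
GROUP BY user_id;

INSERT INTO clicks VALUES (1, '2024-08-14 00:00:00+00:00'), (2, '2024-08-14 00:00:01+00:00');

-- The view is maintained automatically as data arrives; no manual refresh is needed.
SELECT * FROM clicks_per_user;
```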
-> Please note: RisingWave uses [Scarf](https://scarf.sh/) to collect anonymized installation analytics. These analytics help support us understand and improve the distribution of our package.
-> The privacy policy of Scarf is available at [https://about.scarf.sh/privacy-policy](https://about.scarf.sh/privacy-policy).
-
-## Production deployments
-
-[**RisingWave Cloud**](https://cloud.risingwave.com) offers the easiest way to run RisingWave in production, with a _forever-free_ developer tier.
-
-For **Docker deployment**, please refer to [Docker Compose](https://docs.risingwave.com/docs/current/risingwave-docker-compose/).
-
-For **Kubernetes deployment**, please refer to [Kubernetes with Helm](https://docs.risingwave.com/docs/current/risingwave-k8s-helm/) or [Kubernetes with Operator](https://docs.risingwave.com/docs/current/risingwave-kubernetes/).
-
-## Why RisingWave for real-time materialized views?
-
-RisingWave specializes in providing **incrementally updated, consistent materialized views** — a persistent data structure that represents the results of event stream processing. Compared to materialized views, dynamic tables, and live tables in other database and data warehouse systems, RisingWave's materialized view stands out in several key aspects:
-* Highly cost-efficient - up to 95% cost savings compared to state-of-the-art solutions
-* Synchronous refresh without compromising consistency
-* Extensive SQL support including joins, deletes, and updates
-* High concurrency in query serving
-* Instant fault tolerance
-* Transparent dynamic scaling
-* Speedy bootstrapping and backfilling
-
-RisingWave's extensive CDC support further enables users to seamlessly offload event-driven workloads such as materialized views and triggers from operational databases (e.g., [PostgreSQL](https://docs.risingwave.com/docs/current/ingest-from-postgres-cdc/)) to RisingWave.
+## When is RisingWave the perfect fit?
+RisingWave is the ideal solution for building event-driven applications. Choose RisingWave when you want to:
+* Ingest data from real-time sources like Kafka streams, database CDC, and more.
+* Perform complex queries (such as joins, aggregations, and time windowing) on the fly.
+* Interactively and concurrently explore consistent, up-to-the-moment results.
+* Seamlessly send results to downstream systems.
+* Process streaming and batch data using the same codebase.
-## Why RisingWave for stream processing?
-RisingWave provides users with a comprehensive set of frequently used stream processing features, including exactly-once consistency, [time window functions](https://docs.risingwave.com/docs/current/sql-function-time-window/), [watermarks](https://docs.risingwave.com/docs/current/watermarks/), and more. RisingWave significantly reduces the complexity of building stream processing applications by allowing developers to express intricate stream processing logic through cascaded materialized views. Furthermore, it allows users to persist data directly within the system, eliminating the need to deliver results to external databases for storage and query serving.
+## In what use cases does RisingWave excel?
+RisingWave is particularly effective for the following use cases:
-![Real-time Data Pipelines without or with RisingWave](https://github.com/risingwavelabs/risingwave/assets/100685635/414afbb7-5187-410f-9ba4-9a640c8c6306)
+* **Streaming analytics**: Achieve sub-second data freshness in live dashboards, ideal for high-stakes scenarios like stock trading, sports betting, and IoT monitoring.
+* **Event-driven applications**: Develop sophisticated monitoring and alerting systems for critical applications such as fraud and anomaly detection.
+* **Real-time data enrichment**: Continuously ingest data from diverse sources, conduct real-time data enrichment, and efficiently deliver the results to downstream systems.
+* **Feature engineering**: Transform batch and streaming data into features for your machine learning models using a unified codebase, ensuring seamless integration and consistency.
-Compared to existing stream processing systems like [Apache Flink](https://flink.apache.org/), [Apache Spark Streaming](https://spark.apache.org/docs/latest/streaming-programming-guide.html), and [ksqlDB](https://ksqldb.io/), RisingWave stands out in two primary dimensions: **Ease-of-use** and **cost efficiency**, thanks to its **[PostgreSQL](https://www.postgresql.org/)-style interaction experience** and **[Snowflake](https://snowflake.com/)-like architectural design** (i.e., decoupled storage and compute).
-
-| | RisingWave 🌊 | Traditional stream processing systems |
-| :---: | :---: | :---: |
-| Learning curve 🎢 | PostgreSQL-style experience | System-specific concepts |
-| Integration 🔗 | PostgreSQL ecosystem | System-specific ecosystem |
-| Complex queries (e.g., joins) 💡 | Highly efficient | Inefficient |
-| Failure recovery 🚨 | Instant | Minutes or even hours |
-| Dynamic scaling 🚀 | Transparent | Stop-the-world |
-| Bootstrapping and Backfilling ⏪ | Accelerated via dynamic scaling | Slow |
+## Production deployments
+[**RisingWave Cloud**](https://cloud.risingwave.com) offers the easiest way to run RisingWave in production.
-### RisingWave as a database
-RisingWave is fundamentally a database that **extends beyond basic streaming data processing capabilities**. It excels in **the effective management of streaming data**, making it a trusted choice for data persistence and powering online applications. RisingWave offers an extensive range of database capabilities, which include:
+For **Docker deployment**, please refer to [Docker Compose](https://docs.risingwave.com/docs/current/risingwave-docker-compose/).
-* High availability
-* Serving highly concurrent queries
-* Role-based access control (RBAC)
-* Integration with data modeling tools, such as [dbt](https://docs.risingwave.com/docs/current/use-dbt/)
-* Integration with database management tools, such as [Dbeaver](https://docs.risingwave.com/docs/current/dbeaver-integration/)
-* Integration with BI tools, such as [Grafana](https://docs.risingwave.com/docs/current/grafana-integration/)
-* Schema change
-* Processing of semi-structured data
+For **Kubernetes deployment**, please refer to [Kubernetes with Helm](https://docs.risingwave.com/docs/current/risingwave-k8s-helm/) or [Kubernetes with Operator](https://docs.risingwave.com/docs/current/risingwave-kubernetes/).
## Community
@@ -139,7 +93,10 @@ Looking for help, discussions, collaboration opportunities, or a casual afternoo
## Notes on telemetry
-RisingWave collects anonymous usage statistics to better understand how the community is using RisingWave. The sole intention of this exercise is to help improve the product. Users may opt out easily at any time. Please refer to the [user documentation](https://docs.risingwave.com/docs/current/telemetry/) for more details.
+
+RisingWave uses [Scarf](https://scarf.sh/) to collect anonymized installation analytics. These analytics help us understand and improve the distribution of our package. The privacy policy of Scarf is available at [https://about.scarf.sh/privacy-policy](https://about.scarf.sh/privacy-policy).
+
+RisingWave also collects anonymous usage statistics to better understand how the community is using RisingWave. The sole intention of this exercise is to help improve the product. Users may opt out easily at any time. Please refer to the [user documentation](https://docs.risingwave.com/docs/current/telemetry/) for more details.
## License
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 411396b98a99..ff45460e86ef 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -21,7 +21,7 @@ ENV PATH $NVM_DIR/versions/node/v$NODE_VERSION/bin:$PATH
SHELL ["/bin/bash", "-c"]
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --no-modify-path --default-toolchain none -y
-ENV PATH /root/.cargo/bin/:$PATH
+ENV PATH /root/.cargo/bin:$PATH
ENV CARGO_INCREMENTAL=0
COPY rust-toolchain rust-toolchain
@@ -34,6 +34,8 @@ RUN rustup self update \
&& rustup target add wasm32-wasi
RUN cargo install flamegraph
+RUN cargo install addr2line --features bin --bin addr2line
+
# TODO: cargo-chef doesn't work well now, because we update Cargo.lock very often.
# We may consider sccache instead.
@@ -137,6 +139,7 @@ COPY --from=rust-builder /risingwave/bin/risingwave.dwp /risingwave/bin/risingwa
COPY --from=java-builder /risingwave/bin/connector-node /risingwave/bin/connector-node
COPY --from=rust-builder /risingwave/bin/jeprof /usr/local/bin/jeprof
COPY --from=rust-base /root/.cargo/bin/flamegraph /usr/local/bin/flamegraph
+COPY --from=rust-base /root/.cargo/bin/addr2line /usr/local/bin/addr2line
# Set default playground mode to docker-playground profile
ENV PLAYGROUND_PROFILE docker-playground
diff --git a/e2e_test/source/basic/temporary_kafka_batch.slt b/e2e_test/source/basic/temporary_kafka_batch.slt
new file mode 100644
index 000000000000..578cc76b47d5
--- /dev/null
+++ b/e2e_test/source/basic/temporary_kafka_batch.slt
@@ -0,0 +1,82 @@
+statement ok
+create temporary source s1 (v1 int, v2 varchar) with (
+ connector = 'kafka',
+ topic = 'kafka_1_partition_topic',
+ properties.bootstrap.server = 'message_queue:29092',
+ scan.startup.mode = 'earliest'
+) FORMAT PLAIN ENCODE JSON
+
+query IT rowsort
+select * from s1
+----
+1 1
+2 22
+3 333
+4 4444
+
+query IT rowsort
+select * from s1 where _rw_kafka_timestamp > '1977-01-01 00:00:00+00:00'
+----
+1 1
+2 22
+3 333
+4 4444
+
+query IT rowsort
+select * from s1 where _rw_kafka_timestamp > '1977-01-01 00:00:00'
+----
+1 1
+2 22
+3 333
+4 4444
+
+query IT rowsort
+select * from s1 where _rw_kafka_timestamp > TO_TIMESTAMP('1977-01-01 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')
+----
+1 1
+2 22
+3 333
+4 4444
+
+statement error expected format
+select * from s1 where _rw_kafka_timestamp > 'abc'
+
+statement error out of range
+select * from s1 where _rw_kafka_timestamp < TO_TIMESTAMP(2147483647 + 1)
+
+query IT
+select * from s1 where _rw_kafka_timestamp > '2045-01-01 0:00:00+00:00'
+----
+
+query B
+select _rw_kafka_timestamp > '1977-01-01 00:00:00+00:00' from s1
+----
+t
+t
+t
+t
+
+query B
+select _rw_kafka_timestamp < now() from s1
+----
+t
+t
+t
+t
+
+query B
+select _rw_kafka_timestamp < now() - interval '1 day' from s1
+----
+f
+f
+f
+f
+
+query IT rowsort
+select * from s1 limit 2
+----
+1 1
+2 22
+
+statement ok
+drop source s1
diff --git a/e2e_test/udf/bug_fixes/17560_udaf_as_win_func.slt b/e2e_test/udf/bug_fixes/17560_udaf_as_win_func.slt
new file mode 100644
index 000000000000..3d8e3279a8b4
--- /dev/null
+++ b/e2e_test/udf/bug_fixes/17560_udaf_as_win_func.slt
@@ -0,0 +1,19 @@
+# https://github.com/risingwavelabs/risingwave/issues/17560
+
+statement ok
+create aggregate sum00(value int) returns int language python as $$
+def create_state():
+ return 0
+def accumulate(state, value):
+ return state + value
+def retract(state, value):
+ return state - value
+def finish(state):
+ return state
+$$;
+
+query ii
+select t.value, sum00(weight) OVER (PARTITION BY value) from (values (1, 1), (null, 2), (3, 3)) as t(value, weight);
+----
+1 1
+3 3
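For contrast with the window-function call exercised above, the same UDAF also works as an ordinary aggregate; a hedged sketch (not part of the committed test) of the equivalent `GROUP BY` form:

```sql
-- Illustrative only: `sum00` used as a plain aggregate rather than a window function.
select t.value, sum00(weight)
from (values (1, 1), (null, 2), (3, 3)) as t(value, weight)
group by t.value;
```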
diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index b774c6394c4e..88ea110869b7 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -1165,7 +1165,7 @@ pub struct S3ObjectStoreDeveloperConfig {
)]
pub retryable_service_error_codes: Vec<String>,
- // TODO: the following field will be deprecated after opendal is stablized
+ // TODO: deprecate this config when we completely deprecate the AWS SDK.
#[serde(default = "default::object_store_config::s3::developer::use_opendal")]
pub use_opendal: bool,
}
@@ -2174,10 +2174,6 @@ pub mod default {
}
pub mod developer {
- use crate::util::env_var::env_var_is_true_or;
-
- const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3";
-
pub fn retry_unknown_service_error() -> bool {
false
}
@@ -2187,11 +2183,7 @@ pub mod default {
}
pub fn use_opendal() -> bool {
- // TODO: deprecate this config when we are completely switch from aws sdk to opendal.
- // The reason why we use !env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false) here is
- // 1. Maintain compatibility so that there is no behavior change in cluster with RW_USE_OPENDAL_FOR_S3 set.
- // 2. Change the default behavior to use opendal for s3 if RW_USE_OPENDAL_FOR_S3 is not set.
- env_var_is_true_or(RW_USE_OPENDAL_FOR_S3, false)
+ true
}
}
}
diff --git a/src/common/src/util/env_var.rs b/src/common/src/util/env_var.rs
index 1b7655dc73b1..ae5870644751 100644
--- a/src/common/src/util/env_var.rs
+++ b/src/common/src/util/env_var.rs
@@ -34,16 +34,3 @@ pub fn env_var_is_true_or(key: impl AsRef<OsStr>, default: bool) -> bool {
})
.unwrap_or(default)
}
-
-/// Checks whether the environment variable `key` is set to `false` or `f` or `0`.
-///
-/// Returns `default` if the environment variable is not set, or contains invalid characters.
-pub fn env_var_is_false_or(key: impl AsRef<OsStr>, default: bool) -> bool {
- env::var(key)
- .map(|value| {
- ["0", "f", "false"]
- .iter()
- .any(|&s| value.eq_ignore_ascii_case(s))
- })
- .unwrap_or(default)
-}
diff --git a/src/config/example.toml b/src/config/example.toml
index e9076bf9bf08..c81b35163eaf 100644
--- a/src/config/example.toml
+++ b/src/config/example.toml
@@ -232,7 +232,7 @@ identity_resolution_timeout_s = 5
[storage.object_store.s3.developer]
retry_unknown_service_error = false
retryable_service_error_codes = ["SlowDown", "TooManyRequests"]
-use_opendal = false
+use_opendal = true
[system]
barrier_interval_ms = 1000
diff --git a/src/expr/core/src/aggregate/def.rs b/src/expr/core/src/aggregate/def.rs
index b050f8039e1c..59e02af81fa9 100644
--- a/src/expr/core/src/aggregate/def.rs
+++ b/src/expr/core/src/aggregate/def.rs
@@ -239,6 +239,7 @@ impl Display for AggKind {
}
}
+/// `FromStr` for builtin aggregate functions.
impl FromStr for AggKind {
type Err = ();
diff --git a/src/expr/core/src/window_function/kind.rs b/src/expr/core/src/window_function/kind.rs
index 04b320f8ce9f..3042facb5cff 100644
--- a/src/expr/core/src/window_function/kind.rs
+++ b/src/expr/core/src/window_function/kind.rs
@@ -19,7 +19,7 @@ use crate::aggregate::AggKind;
use crate::Result;
/// Kind of window functions.
-#[derive(Debug, Display, FromStr, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Display, FromStr /* for builtin */, Clone, PartialEq, Eq, Hash)]
#[display(style = "snake_case")]
pub enum WindowFuncKind {
// General-purpose window functions.
diff --git a/src/frontend/planner_test/tests/testdata/output/agg.yaml b/src/frontend/planner_test/tests/testdata/output/agg.yaml
index 6a8a00eaa970..3c6f0d613361 100644
--- a/src/frontend/planner_test/tests/testdata/output/agg.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/agg.yaml
@@ -801,7 +801,7 @@
Failed to bind expression: abs(a) FILTER (WHERE a > 0)
Caused by:
- Invalid input syntax: DISTINCT, ORDER BY or FILTER is only allowed in aggregation functions, but `abs` is not an aggregation function
+ Invalid input syntax: `FILTER` is not allowed in scalar/table function call
- name: prune column before filter
sql: |
create table t(v1 int, v2 int);
diff --git a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml
index fe98be90cc66..6ab86ec9e20f 100644
--- a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml
@@ -62,7 +62,7 @@
Failed to bind expression: lag(x)
Caused by:
- Invalid input syntax: Window function `lag` must have OVER clause
+ function lag(integer) does not exist, do you mean log
- id: lag with empty over clause
sql: |
create table t(x int);
diff --git a/src/frontend/src/binder/expr/function/aggregate.rs b/src/frontend/src/binder/expr/function/aggregate.rs
index d6410616c1d9..a9067205f77b 100644
--- a/src/frontend/src/binder/expr/function/aggregate.rs
+++ b/src/frontend/src/binder/expr/function/aggregate.rs
@@ -16,7 +16,7 @@ use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_expr::aggregate::{agg_kinds, AggKind, PbAggKind};
-use risingwave_sqlparser::ast::{Function, FunctionArgExpr};
+use risingwave_sqlparser::ast::{self, FunctionArgExpr};
use crate::binder::Clause;
use crate::error::{ErrorCode, Result};
@@ -48,21 +48,22 @@ impl Binder {
pub(super) fn bind_aggregate_function(
&mut self,
- f: Function,
kind: AggKind,
+ distinct: bool,
+ args: Vec,
+ order_by: Vec,
+ within_group: Option>,
+ filter: Option>,
) -> Result {
self.ensure_aggregate_allowed()?;
- let distinct = f.arg_list.distinct;
- let filter_expr = f.filter.clone();
-
let (direct_args, args, order_by) = if matches!(kind, agg_kinds::ordered_set!()) {
- self.bind_ordered_set_agg(f, kind.clone())?
+ self.bind_ordered_set_agg(&kind, distinct, args, order_by, within_group)?
} else {
- self.bind_normal_agg(f, kind.clone())?
+ self.bind_normal_agg(&kind, distinct, args, order_by, within_group)?
};
- let filter = match filter_expr {
+ let filter = match filter {
Some(filter) => {
let mut clause = Some(Clause::Filter);
std::mem::swap(&mut self.context.clause, &mut clause);
@@ -96,8 +97,11 @@ impl Binder {
fn bind_ordered_set_agg(
&mut self,
- f: Function,
- kind: AggKind,
+ kind: &AggKind,
+ distinct: bool,
+ args: Vec,
+ order_by: Vec,
+ within_group: Option>,
) -> Result<(Vec, Vec, OrderBy)> {
// Syntax:
// aggregate_name ( [ expression [ , ... ] ] ) WITHIN GROUP ( order_by_clause ) [ FILTER
@@ -105,44 +109,38 @@ impl Binder {
assert!(matches!(kind, agg_kinds::ordered_set!()));
- if !f.arg_list.order_by.is_empty() {
+ if !order_by.is_empty() {
return Err(ErrorCode::InvalidInputSyntax(format!(
- "ORDER BY is not allowed for ordered-set aggregation `{}`",
+ "`ORDER BY` is not allowed for ordered-set aggregation `{}`",
kind
))
.into());
}
- if f.arg_list.distinct {
+ if distinct {
return Err(ErrorCode::InvalidInputSyntax(format!(
- "DISTINCT is not allowed for ordered-set aggregation `{}`",
+ "`DISTINCT` is not allowed for ordered-set aggregation `{}`",
kind
))
.into());
}
- let within_group = *f.within_group.ok_or_else(|| {
+ let within_group = *within_group.ok_or_else(|| {
ErrorCode::InvalidInputSyntax(format!(
- "WITHIN GROUP is expected for ordered-set aggregation `{}`",
+ "`WITHIN GROUP` is expected for ordered-set aggregation `{}`",
kind
))
})?;
- let mut direct_args: Vec<_> = f
- .arg_list
- .args
- .into_iter()
- .map(|arg| self.bind_function_arg(arg))
- .flatten_ok()
- .try_collect()?;
+ let mut direct_args = args;
let mut args =
self.bind_function_expr_arg(FunctionArgExpr::Expr(within_group.expr.clone()))?;
let order_by = OrderBy::new(vec![self.bind_order_by_expr(within_group)?]);
// check signature and do implicit cast
- match (&kind, direct_args.len(), args.as_mut_slice()) {
+ match (kind, direct_args.len(), args.as_mut_slice()) {
(AggKind::Builtin(PbAggKind::PercentileCont | PbAggKind::PercentileDisc), 1, [arg]) => {
let fraction = &mut direct_args[0];
- decimal_to_float64(fraction, &kind)?;
+ decimal_to_float64(fraction, kind)?;
if matches!(&kind, AggKind::Builtin(PbAggKind::PercentileCont)) {
arg.cast_implicit_mut(DataType::Float64).map_err(|_| {
ErrorCode::InvalidInputSyntax(format!(
@@ -155,11 +153,11 @@ impl Binder {
(AggKind::Builtin(PbAggKind::Mode), 0, [_arg]) => {}
(AggKind::Builtin(PbAggKind::ApproxPercentile), 1..=2, [_percentile_col]) => {
let percentile = &mut direct_args[0];
- decimal_to_float64(percentile, &kind)?;
+ decimal_to_float64(percentile, kind)?;
match direct_args.len() {
2 => {
let relative_error = &mut direct_args[1];
- decimal_to_float64(relative_error, &kind)?;
+ decimal_to_float64(relative_error, kind)?;
}
1 => {
let relative_error: ExprImpl = Literal::new(
@@ -198,8 +196,11 @@ impl Binder {
fn bind_normal_agg(
&mut self,
- f: Function,
- kind: AggKind,
+ kind: &AggKind,
+ distinct: bool,
+ args: Vec,
+ order_by: Vec,
+ within_group: Option>,
) -> Result<(Vec, Vec, OrderBy)> {
// Syntax:
// aggregate_name (expression [ , ... ] [ order_by_clause ] ) [ FILTER ( WHERE
@@ -212,30 +213,22 @@ impl Binder {
assert!(!matches!(kind, agg_kinds::ordered_set!()));
- if f.within_group.is_some() {
+ if within_group.is_some() {
return Err(ErrorCode::InvalidInputSyntax(format!(
- "WITHIN GROUP is not allowed for non-ordered-set aggregation `{}`",
+ "`WITHIN GROUP` is not allowed for non-ordered-set aggregation `{}`",
kind
))
.into());
}
- let args: Vec<_> = f
- .arg_list
- .args
- .iter()
- .map(|arg| self.bind_function_arg(arg.clone()))
- .flatten_ok()
- .try_collect()?;
let order_by = OrderBy::new(
- f.arg_list
- .order_by
+ order_by
.into_iter()
.map(|e| self.bind_order_by_expr(e))
.try_collect()?,
);
- if f.arg_list.distinct {
+ if distinct {
if matches!(
kind,
AggKind::Builtin(PbAggKind::ApproxCountDistinct)
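The ordered-set syntax spelled out in the comments above (`WITHIN GROUP`, optional `FILTER`) corresponds to standard SQL calls such as the following; a hedged sketch of what the reworked binder accepts and rejects (table and column names are illustrative):

```sql
-- Accepted: an ordered-set aggregate with WITHIN GROUP.
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY score) FROM exams;

-- Rejected: `DISTINCT` is not allowed for ordered-set aggregations, per the check above.
SELECT percentile_cont(DISTINCT 0.5) WITHIN GROUP (ORDER BY score) FROM exams;
```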
diff --git a/src/frontend/src/binder/expr/function/mod.rs b/src/frontend/src/binder/expr/function/mod.rs
index 898a98c57dea..2793ee54b85d 100644
--- a/src/frontend/src/binder/expr/function/mod.rs
+++ b/src/frontend/src/binder/expr/function/mod.rs
@@ -27,6 +27,7 @@ use risingwave_sqlparser::parser::ParserError;
use crate::binder::bind_context::Clause;
use crate::binder::{Binder, UdfContext};
+use crate::catalog::function_catalog::FunctionCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{
Expr, ExprImpl, ExprType, FunctionCallWithLambda, InputRef, TableFunction, TableFunctionType,
@@ -60,13 +61,27 @@ pub(super) fn is_sys_function_without_args(ident: &Ident) -> bool {
/// stack is set to `16`.
const SQL_UDF_MAX_CALLING_DEPTH: u32 = 16;
-impl Binder {
- pub(in crate::binder) fn bind_function(&mut self, f: Function) -> Result {
- if f.arg_list.ignore_nulls {
- bail_not_implemented!("IGNORE NULLS is not supported yet");
+macro_rules! reject_syntax {
+ ($pred:expr, $msg:expr) => {
+ if $pred {
+ return Err(ErrorCode::InvalidInputSyntax($msg.to_string()).into());
}
+ };
+}
- let function_name = match f.name.0.as_slice() {
+impl Binder {
+ pub(in crate::binder) fn bind_function(
+ &mut self,
+ Function {
+ scalar_as_agg,
+ name,
+ arg_list,
+ within_group,
+ filter,
+ over,
+ }: Function,
+ ) -> Result {
+ let func_name = match name.0.as_slice() {
[name] => name.real_value(),
[schema, name] => {
let schema_name = schema.real_value();
@@ -95,7 +110,7 @@ impl Binder {
);
}
}
- _ => bail_not_implemented!(issue = 112, "qualified function {}", f.name),
+ _ => bail_not_implemented!(issue = 112, "qualified function {}", name),
};
// FIXME: This is a hack to support [Bytebase queries](https://github.com/TennyZhuang/bytebase/blob/4a26f7c62b80e86e58ad2f77063138dc2f420623/backend/plugin/db/pg/sync.go#L549).
@@ -104,25 +119,44 @@ impl Binder {
// retrieve object comment, however we don't support casting a non-literal expression to
// regclass. We just hack the `obj_description` and `col_description` here, to disable it to
// bind its arguments.
- if function_name == "obj_description" || function_name == "col_description" {
+ if func_name == "obj_description" || func_name == "col_description" {
return Ok(ExprImpl::literal_varchar("".to_string()));
}
- if function_name == "array_transform" {
+
+ // special binding logic for `array_transform`
+ if func_name == "array_transform" {
// For type inference, we need to bind the array type first.
- return self.bind_array_transform(f);
+ reject_syntax!(
+ scalar_as_agg,
+ "`AGGREGATE:` prefix is not allowed for `array_transform`"
+ );
+ reject_syntax!(!arg_list.is_args_only(), "keywords like `DISTINCT`, `ORDER BY` are not allowed in `array_transform` argument list");
+ reject_syntax!(
+ within_group.is_some(),
+ "`WITHIN GROUP` is not allowed in `array_transform` call"
+ );
+ reject_syntax!(
+ filter.is_some(),
+ "`FILTER` is not allowed in `array_transform` call"
+ );
+ reject_syntax!(
+ over.is_some(),
+ "`OVER` is not allowed in `array_transform` call"
+ );
+ return self.bind_array_transform(arg_list.args);
}
- let mut inputs: Vec<_> = f
- .arg_list
+ let mut args: Vec<_> = arg_list
.args
.iter()
.map(|arg| self.bind_function_arg(arg.clone()))
.flatten_ok()
.try_collect()?;
- // `aggregate:` on a scalar function
- if f.scalar_as_agg {
- let mut scalar_inputs = inputs
+ let wrapped_agg_kind = if scalar_as_agg {
+ // Let's first try to apply the `AGGREGATE:` prefix.
+ // We will reject functions that cannot be wrapped as an aggregate function.
+ let mut array_args = args
.iter()
.enumerate()
.map(|(i, expr)| {
@@ -130,203 +164,201 @@ impl Binder {
})
.collect_vec();
let scalar: ExprImpl = if let Ok(schema) = self.first_valid_schema()
- && let Some(func) =
- schema.get_function_by_name_inputs(&function_name, &mut scalar_inputs)
+ && let Some(func) = schema.get_function_by_name_inputs(&func_name, &mut array_args)
{
if !func.kind.is_scalar() {
return Err(ErrorCode::InvalidInputSyntax(
- "expect a scalar function after `aggregate:`".to_string(),
+ "expect a scalar function after `AGGREGATE:`".to_string(),
)
.into());
}
- UserDefinedFunction::new(func.clone(), scalar_inputs).into()
+ UserDefinedFunction::new(func.clone(), array_args).into()
} else {
- self.bind_builtin_scalar_function(
- &function_name,
- scalar_inputs,
- f.arg_list.variadic,
- )?
+ self.bind_builtin_scalar_function(&func_name, array_args, arg_list.variadic)?
};
- return self.bind_aggregate_function(f, AggKind::WrapScalar(scalar.to_expr_proto()));
- }
- // user defined function
- // TODO: resolve schema name https://github.com/risingwavelabs/risingwave/issues/12422
- if let Ok(schema) = self.first_valid_schema()
- && let Some(func) = schema.get_function_by_name_inputs(&function_name, &mut inputs)
- {
- use crate::catalog::function_catalog::FunctionKind::*;
+ // now this is either an aggregate or a window function call
+ Some(AggKind::WrapScalar(scalar.to_expr_proto()))
+ } else {
+ None
+ };
+ let udf = if wrapped_agg_kind.is_none()
+ && let Ok(schema) = self.first_valid_schema()
+ && let Some(func) = schema
+ .get_function_by_name_inputs(&func_name, &mut args)
+ .cloned()
+ {
if func.language == "sql" {
- if func.body.is_none() {
- return Err(ErrorCode::InvalidInputSyntax(
- "`body` must exist for sql udf".to_string(),
- )
- .into());
- }
-
- // This represents the current user defined function is `language sql`
- let parse_result = risingwave_sqlparser::parser::Parser::parse_sql(
- func.body.as_ref().unwrap().as_str(),
+ let name = format!("SQL user-defined function `{}`", func.name);
+ reject_syntax!(
+ scalar_as_agg,
+ format!("`AGGREGATE:` prefix is not allowed for {}", name)
);
- if let Err(ParserError::ParserError(err)) | Err(ParserError::TokenizerError(err)) =
- parse_result
- {
- // Here we just return the original parse error message
- return Err(ErrorCode::InvalidInputSyntax(err).into());
- }
-
- debug_assert!(parse_result.is_ok());
-
- // We can safely unwrap here
- let ast = parse_result.unwrap();
-
- // Stash the current `udf_context`
- // Note that the `udf_context` may be empty,
- // if the current binding is the root (top-most) sql udf.
- // In this case the empty context will be stashed
- // and restored later, no need to maintain other flags.
- let stashed_udf_context = self.udf_context.get_context();
-
- // The actual inline logic for sql udf
- // Note that we will always create new udf context for each sql udf
- let Ok(context) =
- UdfContext::create_udf_context(&f.arg_list.args, &Arc::clone(func))
- else {
- return Err(ErrorCode::InvalidInputSyntax(
- "failed to create the `udf_context`, please recheck your function definition and syntax".to_string()
+ reject_syntax!(
+ !arg_list.is_args_only(),
+ format!(
+ "keywords like `DISTINCT`, `ORDER BY` are not allowed in {} argument list",
+ name
)
- .into());
- };
-
- let mut udf_context = HashMap::new();
- for (c, e) in context {
- // Note that we need to bind the args before actual delve in the function body
- // This will update the context in the subsequent inner calling function
- // e.g.,
- // - create function print(INT) returns int language sql as 'select $1';
- // - create function print_add_one(INT) returns int language sql as 'select print($1 + 1)';
- // - select print_add_one(1); # The result should be 2 instead of 1.
- // Without the pre-binding here, the ($1 + 1) will not be correctly populated,
- // causing the result to always be 1.
- let Ok(e) = self.bind_expr(e) else {
- return Err(ErrorCode::BindError(
- "failed to bind the argument, please recheck the syntax".to_string(),
- )
- .into());
- };
- udf_context.insert(c, e);
- }
- self.udf_context.update_context(udf_context);
-
- // Check for potential recursive calling
- if self.udf_context.global_count() >= SQL_UDF_MAX_CALLING_DEPTH {
- return Err(ErrorCode::BindError(format!(
- "function {} calling stack depth limit exceeded",
- &function_name
- ))
- .into());
- } else {
- // Update the status for the global counter
- self.udf_context.incr_global_count();
- }
+ );
+ reject_syntax!(
+ within_group.is_some(),
+ format!("`WITHIN GROUP` is not allowed in {} call", name)
+ );
+ reject_syntax!(
+ filter.is_some(),
+ format!("`FILTER` is not allowed in {} call", name)
+ );
+ reject_syntax!(
+ over.is_some(),
+ format!("`OVER` is not allowed in {} call", name)
+ );
+ return self.bind_sql_udf(func, arg_list.args);
+ }
- if let Ok(expr) = UdfContext::extract_udf_expression(ast) {
- let bind_result = self.bind_expr(expr);
+ // now `func` is a non-SQL user-defined scalar/aggregate/table function
+ Some(func)
+ } else {
+ None
+ };
- // We should properly decrement global count after a successful binding
- // Since the subsequent probe operation in `bind_column` or
- // `bind_parameter` relies on global counting
- self.udf_context.decr_global_count();
+ let agg_kind = if let Some(wrapped_agg_kind) = wrapped_agg_kind {
+ Some(wrapped_agg_kind)
+ } else if let Some(ref udf) = udf
+ && udf.kind.is_aggregate()
+ {
+ Some(AggKind::UserDefined(udf.as_ref().into()))
+ } else if let Ok(kind) = AggKind::from_str(&func_name) {
+ Some(kind)
+ } else {
+ None
+ };
- // Restore context information for subsequent binding
- self.udf_context.update_context(stashed_udf_context);
+ // try to bind it as a window function call
+ if let Some(over) = over {
+ reject_syntax!(
+ arg_list.distinct,
+ "`DISTINCT` is not allowed in window function call"
+ );
+ reject_syntax!(
+ arg_list.variadic,
+ "`VARIADIC` is not allowed in window function call"
+ );
+ reject_syntax!(
+ !arg_list.order_by.is_empty(),
+ "`ORDER BY` is not allowed in window function call argument list"
+ );
+ reject_syntax!(
+ within_group.is_some(),
+ "`WITHIN GROUP` is not allowed in window function call"
+ );
- return bind_result;
- } else {
- return Err(ErrorCode::InvalidInputSyntax(
- "failed to parse the input query and extract the udf expression,
- please recheck the syntax"
- .to_string(),
- )
- .into());
- }
+ let kind = if let Some(agg_kind) = agg_kind {
+ // aggregate as window function
+ WindowFuncKind::Aggregate(agg_kind)
+ } else if let Ok(kind) = WindowFuncKind::from_str(&func_name) {
+ kind
} else {
- match &func.kind {
- Scalar { .. } => {
- return Ok(UserDefinedFunction::new(func.clone(), inputs).into())
- }
- Table { .. } => {
- self.ensure_table_function_allowed()?;
- return Ok(TableFunction::new_user_defined(func.clone(), inputs).into());
- }
- Aggregate => {
- return self.bind_aggregate_function(
- f,
- AggKind::UserDefined(func.as_ref().into()),
- );
- }
- }
- }
- }
-
- // agg calls
- if f.over.is_none()
- && let Ok(kind) = function_name.parse()
- {
- return self.bind_aggregate_function(f, AggKind::Builtin(kind));
+ bail_not_implemented!(issue = 8961, "Unrecognized window function: {}", func_name);
+ };
+ return self.bind_window_function(kind, args, arg_list.ignore_nulls, filter, over);
}
- if f.arg_list.distinct || !f.arg_list.order_by.is_empty() || f.filter.is_some() {
- return Err(ErrorCode::InvalidInputSyntax(format!(
- "DISTINCT, ORDER BY or FILTER is only allowed in aggregation functions, but `{}` is not an aggregation function", function_name
- )
- )
- .into());
+ // now it's an aggregate/scalar/table function call
+ reject_syntax!(
+ arg_list.ignore_nulls,
+ "`IGNORE NULLS` is not allowed in aggregate/scalar/table function call"
+ );
+
+ // try to bind it as an aggregate function call
+ if let Some(agg_kind) = agg_kind {
+ reject_syntax!(
+ arg_list.variadic,
+ "`VARIADIC` is not allowed in aggregate function call"
+ );
+ return self.bind_aggregate_function(
+ agg_kind,
+ arg_list.distinct,
+ args,
+ arg_list.order_by,
+ within_group,
+ filter,
+ );
}
- // window function
- let window_func_kind = WindowFuncKind::from_str(function_name.as_str());
- if let Ok(kind) = window_func_kind {
- if let Some(window_spec) = f.over {
- return self.bind_window_function(kind, inputs, window_spec);
+ // now it's a scalar/table function call
+ reject_syntax!(
+ arg_list.distinct,
+ "`DISTINCT` is not allowed in scalar/table function call"
+ );
+ reject_syntax!(
+ !arg_list.order_by.is_empty(),
+ "`ORDER BY` is not allowed in scalar/table function call"
+ );
+ reject_syntax!(
+ within_group.is_some(),
+ "`WITHIN GROUP` is not allowed in scalar/table function call"
+ );
+ reject_syntax!(
+ filter.is_some(),
+ "`FILTER` is not allowed in scalar/table function call"
+ );
+
+ // try to bind it as a table function call
+ {
+ // `file_scan` table function
+ if func_name.eq_ignore_ascii_case("file_scan") {
+ reject_syntax!(
+ arg_list.variadic,
+ "`VARIADIC` is not allowed in table function call"
+ );
+ self.ensure_table_function_allowed()?;
+ return Ok(TableFunction::new_file_scan(args)?.into());
+ }
+ // UDTF
+ if let Some(ref udf) = udf
+ && udf.kind.is_table()
+ {
+ reject_syntax!(
+ arg_list.variadic,
+ "`VARIADIC` is not allowed in table function call"
+ );
+ self.ensure_table_function_allowed()?;
+ return Ok(TableFunction::new_user_defined(udf.clone(), args).into());
+ }
+ // builtin table function
+ if let Ok(function_type) = TableFunctionType::from_str(&func_name) {
+ reject_syntax!(
+ arg_list.variadic,
+ "`VARIADIC` is not allowed in table function call"
+ );
+ self.ensure_table_function_allowed()?;
+ return Ok(TableFunction::new(function_type, args)?.into());
}
- return Err(ErrorCode::InvalidInputSyntax(format!(
- "Window function `{}` must have OVER clause",
- function_name
- ))
- .into());
- } else if f.over.is_some() {
- bail_not_implemented!(
- issue = 8961,
- "Unrecognized window function: {}",
- function_name
- );
}
- // file_scan table function
- if function_name.eq_ignore_ascii_case("file_scan") {
- self.ensure_table_function_allowed()?;
- return Ok(TableFunction::new_file_scan(inputs)?.into());
- }
- // table function
- if let Ok(function_type) = TableFunctionType::from_str(function_name.as_str()) {
- self.ensure_table_function_allowed()?;
- return Ok(TableFunction::new(function_type, inputs)?.into());
+ // try to bind it as a scalar function call
+ if let Some(ref udf) = udf {
+ assert!(udf.kind.is_scalar());
+ reject_syntax!(
+ arg_list.variadic,
+ "`VARIADIC` is not allowed in user-defined function call"
+ );
+ return Ok(UserDefinedFunction::new(udf.clone(), args).into());
}
- self.bind_builtin_scalar_function(function_name.as_str(), inputs, f.arg_list.variadic)
+ self.bind_builtin_scalar_function(&func_name, args, arg_list.variadic)
}
- fn bind_array_transform(&mut self, f: Function) -> Result {
- let [array, lambda] =
- <[FunctionArg; 2]>::try_from(f.arg_list.args).map_err(|args| -> RwError {
- ErrorCode::BindError(format!(
- "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
- args.len()
- ))
- .into()
- })?;
+ fn bind_array_transform(&mut self, args: Vec) -> Result {
+ let [array, lambda] = <[FunctionArg; 2]>::try_from(args).map_err(|args| -> RwError {
+ ErrorCode::BindError(format!(
+ "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
+ args.len()
+ ))
+ .into()
+ })?;
let bound_array = self.bind_function_arg(array)?;
let [bound_array] = <[ExprImpl; 1]>::try_from(bound_array).map_err(|bound_array| -> RwError {
@@ -416,6 +448,102 @@ impl Binder {
Ok(())
}
+ fn bind_sql_udf(
+ &mut self,
+ func: Arc,
+ args: Vec,
+ ) -> Result {
+ if func.body.is_none() {
+ return Err(
+ ErrorCode::InvalidInputSyntax("`body` must exist for sql udf".to_string()).into(),
+ );
+ }
+
+ // This means the current user-defined function is `language sql`
+ let parse_result =
+ risingwave_sqlparser::parser::Parser::parse_sql(func.body.as_ref().unwrap().as_str());
+ if let Err(ParserError::ParserError(err)) | Err(ParserError::TokenizerError(err)) =
+ parse_result
+ {
+ // Here we just return the original parse error message
+ return Err(ErrorCode::InvalidInputSyntax(err).into());
+ }
+
+ debug_assert!(parse_result.is_ok());
+
+ // We can safely unwrap here
+ let ast = parse_result.unwrap();
+
+ // Stash the current `udf_context`
+ // Note that the `udf_context` may be empty,
+ // if the current binding is the root (top-most) sql udf.
+ // In this case the empty context will be stashed
+ // and restored later, no need to maintain other flags.
+ let stashed_udf_context = self.udf_context.get_context();
+
+ // The actual inline logic for sql udf
+ // Note that we will always create a new udf context for each sql udf
+ let Ok(context) = UdfContext::create_udf_context(&args, &func) else {
+ return Err(ErrorCode::InvalidInputSyntax(
+ "failed to create the `udf_context`, please recheck your function definition and syntax".to_string()
+ )
+ .into());
+ };
+
+ let mut udf_context = HashMap::new();
+ for (c, e) in context {
+ // Note that we need to bind the args before actually delving into the function body
+ // This will update the context in the subsequent inner calling function
+ // e.g.,
+ // - create function print(INT) returns int language sql as 'select $1';
+ // - create function print_add_one(INT) returns int language sql as 'select print($1 + 1)';
+ // - select print_add_one(1); # The result should be 2 instead of 1.
+ // Without the pre-binding here, the ($1 + 1) will not be correctly populated,
+ // causing the result to always be 1.
+ let Ok(e) = self.bind_expr(e) else {
+ return Err(ErrorCode::BindError(
+ "failed to bind the argument, please recheck the syntax".to_string(),
+ )
+ .into());
+ };
+ udf_context.insert(c, e);
+ }
+ self.udf_context.update_context(udf_context);
+
+ // Check for potential recursive calling
+ if self.udf_context.global_count() >= SQL_UDF_MAX_CALLING_DEPTH {
+ return Err(ErrorCode::BindError(format!(
+ "function {} calling stack depth limit exceeded",
+ func.name
+ ))
+ .into());
+ } else {
+ // Update the status for the global counter
+ self.udf_context.incr_global_count();
+ }
+
+ if let Ok(expr) = UdfContext::extract_udf_expression(ast) {
+ let bind_result = self.bind_expr(expr);
+
+ // We should properly decrement global count after a successful binding
+ // Since the subsequent probe operation in `bind_column` or
+ // `bind_parameter` relies on global counting
+ self.udf_context.decr_global_count();
+
+ // Restore context information for subsequent binding
+ self.udf_context.update_context(stashed_udf_context);
+
+ return bind_result;
+ }
+
+ Err(ErrorCode::InvalidInputSyntax(
+ "failed to parse the input query and extract the udf expression,
+ please recheck the syntax"
+ .to_string(),
+ )
+ .into())
+ }
+
pub(in crate::binder) fn bind_function_expr_arg(
&mut self,
arg_expr: FunctionArgExpr,
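The pre-binding behaviour described in the comments inside `bind_sql_udf` can be reproduced end to end with the SQL UDFs from that inline example; a sketch of the corresponding session (statements taken from the comment itself):

```sql
-- SQL UDFs are inlined at bind time; arguments are bound before the body is
-- expanded, so a nested call like `print($1 + 1)` sees the already-bound expression.
create function print(INT) returns int language sql as 'select $1';
create function print_add_one(INT) returns int language sql as 'select print($1 + 1)';

select print_add_one(1);  -- expected to return 2, not 1
```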
diff --git a/src/frontend/src/binder/expr/function/window.rs b/src/frontend/src/binder/expr/function/window.rs
index 3124d4717dd8..03288cbf9e24 100644
--- a/src/frontend/src/binder/expr/function/window.rs
+++ b/src/frontend/src/binder/expr/function/window.rs
@@ -57,7 +57,9 @@ impl Binder {
pub(super) fn bind_window_function(
&mut self,
kind: WindowFuncKind,
- inputs: Vec,
+ args: Vec,
+ ignore_nulls: bool,
+ filter: Option>,
WindowSpec {
partition_by,
order_by,
@@ -65,6 +67,15 @@ impl Binder {
}: WindowSpec,
) -> Result {
self.ensure_window_function_allowed()?;
+
+ if ignore_nulls {
+ bail_not_implemented!("`IGNORE NULLS` is not supported yet");
+ }
+
+ if filter.is_some() {
+ bail_not_implemented!("`FILTER` is not supported yet");
+ }
+
let partition_by = partition_by
.into_iter()
.map(|arg| self.bind_expr_inner(arg))
@@ -181,7 +192,7 @@ impl Binder {
} else {
None
};
- Ok(WindowFunction::new(kind, partition_by, order_by, inputs, frame)?.into())
+ Ok(WindowFunction::new(kind, partition_by, order_by, args, frame)?.into())
}
fn bind_window_frame_usize_bounds(
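To make the two new bind-time checks concrete: window-function calls carrying an `IGNORE NULLS` or `FILTER` clause are now rejected with an explicit "not supported yet" error. A hedged sketch of a query that would hit the new `FILTER` check (standard PostgreSQL syntax; table `t` is illustrative):

```sql
-- Expected to fail to bind: `FILTER` on a window function call is not supported yet.
SELECT count(x) FILTER (WHERE x > 0) OVER (ORDER BY x) FROM t;
```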
diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs
index ebbf965d55e7..96ab6251fa3f 100644
--- a/src/frontend/src/binder/mod.rs
+++ b/src/frontend/src/binder/mod.rs
@@ -68,7 +68,7 @@ use crate::catalog::schema_catalog::SchemaCatalog;
use crate::catalog::{CatalogResult, TableId, ViewId};
use crate::error::ErrorCode;
use crate::expr::ExprImpl;
-use crate::session::{AuthContext, SessionImpl};
+use crate::session::{AuthContext, SessionImpl, TemporarySourceManager};
pub type ShareId = usize;
@@ -127,6 +127,9 @@ pub struct Binder {
/// The sql udf context that will be used during binding phase
udf_context: UdfContext,
+
+ /// The temporary sources that will be used during binding phase
+ temporary_source_manager: TemporarySourceManager,
}
#[derive(Clone, Debug, Default)]
@@ -360,6 +363,7 @@ impl Binder {
included_relations: HashSet::new(),
param_types: ParameterTypes::new(param_types),
udf_context: UdfContext::new(),
+ temporary_source_manager: session.temporary_source_manager(),
}
}
diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs
index c0ff372581b1..c2688175be0a 100644
--- a/src/frontend/src/binder/relation/table_or_source.rs
+++ b/src/frontend/src/binder/relation/table_or_source.rs
@@ -120,6 +120,11 @@ impl Binder {
table_name
);
}
+ } else if let Some(source_catalog) =
+ self.temporary_source_manager.get_source(table_name)
+ // don't care about the database and schema
+ {
+ self.resolve_source_relation(&source_catalog.clone(), as_of)
} else if let Ok((table_catalog, schema_name)) = self
.catalog
.get_created_table_by_name(&self.db_name, schema_path, table_name)
@@ -163,7 +168,13 @@ impl Binder {
if let Ok(schema) =
self.catalog.get_schema_by_name(&self.db_name, schema_name)
{
- if let Some(table_catalog) =
+ if let Some(source_catalog) =
+ self.temporary_source_manager.get_source(table_name)
+ // don't care about the database and schema
+ {
+ return Ok(self
+ .resolve_source_relation(&source_catalog.clone(), as_of));
+ } else if let Some(table_catalog) =
schema.get_created_table_by_name(table_name)
{
return self.resolve_table_relation(
diff --git a/src/frontend/src/expr/window_function.rs b/src/frontend/src/expr/window_function.rs
index 70f6a79866fb..8f2e6c66728d 100644
--- a/src/frontend/src/expr/window_function.rs
+++ b/src/frontend/src/expr/window_function.rs
@@ -16,11 +16,11 @@ use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::types::DataType;
use risingwave_expr::aggregate::AggKind;
-use risingwave_expr::sig::FUNCTION_REGISTRY;
use risingwave_expr::window_function::{Frame, WindowFuncKind};
use super::{Expr, ExprImpl, OrderBy, RwResult};
use crate::error::{ErrorCode, RwError};
+use crate::expr::infer_type;
/// A window function performs a calculation across a set of table rows that are somehow related to
/// the current row, according to the window spec `OVER (PARTITION BY .. ORDER BY ..)`.
@@ -45,10 +45,10 @@ impl WindowFunction {
kind: WindowFuncKind,
partition_by: Vec,
order_by: OrderBy,
- args: Vec,
+ mut args: Vec,
frame: Option,
) -> RwResult {
- let return_type = Self::infer_return_type(&kind, &args)?;
+ let return_type = Self::infer_return_type(&kind, &mut args)?;
Ok(Self {
kind,
args,
@@ -59,7 +59,7 @@ impl WindowFunction {
})
}
- fn infer_return_type(kind: &WindowFuncKind, args: &[ExprImpl]) -> RwResult {
+ fn infer_return_type(kind: &WindowFuncKind, args: &mut [ExprImpl]) -> RwResult {
use WindowFuncKind::*;
match (kind, args) {
(RowNumber, []) => Ok(DataType::Int64),
@@ -87,13 +87,13 @@ impl WindowFunction {
);
}
- (Aggregate(AggKind::Builtin(agg_kind)), args) => {
- let arg_types = args.iter().map(ExprImpl::return_type).collect::>();
- let return_type = FUNCTION_REGISTRY.get_return_type(*agg_kind, &arg_types)?;
- Ok(return_type)
- }
+ (Aggregate(agg_kind), args) => Ok(match agg_kind {
+ AggKind::Builtin(kind) => infer_type((*kind).into(), args)?,
+ AggKind::UserDefined(udf) => udf.return_type.as_ref().unwrap().into(),
+ AggKind::WrapScalar(expr) => expr.return_type.as_ref().unwrap().into(),
+ }),
- _ => {
+ (_, args) => {
let args = args
.iter()
.map(|e| format!("{}", e.return_type()))
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 09d5b8232e38..0be418aecd9d 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -73,7 +73,7 @@ use thiserror_ext::AsReport;
use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::source_catalog::SourceCatalog;
-use crate::catalog::{DatabaseId, SchemaId};
+use crate::catalog::{CatalogError, DatabaseId, SchemaId};
use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
use crate::error::{Result, RwError};
use crate::expr::Expr;
@@ -1657,6 +1657,15 @@ pub async fn handle_create_source(
)
.await?;
+ // If it is a temporary source, put it into SessionImpl.
+ if stmt.temporary {
+ if session.get_temporary_source(&source_catalog.name).is_some() {
+ return Err(CatalogError::Duplicated("source", source_catalog.name.clone()).into());
+ }
+ session.create_temporary_source(source_catalog);
+ return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
+ }
+
let source = source_catalog.to_prost(schema_id, database_id);
let catalog_writer = session.catalog_writer()?;
diff --git a/src/frontend/src/handler/drop_source.rs b/src/frontend/src/handler/drop_source.rs
index fa5c5d7b9b1f..711cf2d7caec 100644
--- a/src/frontend/src/handler/drop_source.rs
+++ b/src/frontend/src/handler/drop_source.rs
@@ -33,6 +33,12 @@ pub async fn handle_drop_source(
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
+ // Check if a temporary source with this name exists; if so, drop it.
+ if let Some(_source) = session.get_temporary_source(&source_name) {
+ session.drop_temporary_source(&source_name);
+ return Ok(PgResponse::empty_result(StatementType::DROP_SOURCE));
+ }
+
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
let (source, schema_name) = {
diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs
index 6c0bc2c7e61e..176dee0dd1e8 100644
--- a/src/frontend/src/handler/show.rs
+++ b/src/frontend/src/handler/show.rs
@@ -319,6 +319,7 @@ pub async fn handle_show_object(
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_source()
.map(|t| t.name.clone())
+ .chain(session.temporary_source_manager().keys())
.collect(),
ShowObject::Sink { schema } => catalog_reader
.read_guard()
diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs
index 7f61a81d90f4..16f0c7226be2 100644
--- a/src/frontend/src/session.rs
+++ b/src/frontend/src/session.rs
@@ -89,6 +89,7 @@ use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWrite
use crate::catalog::connection_catalog::ConnectionCatalog;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::secret_catalog::SecretCatalog;
+use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::subscription_catalog::SubscriptionCatalog;
use crate::catalog::{
check_schema_writable, CatalogError, DatabaseId, OwnedByUserCatalog, SchemaId, TableId,
@@ -652,6 +653,46 @@ pub struct SessionImpl {
last_idle_instant: Arc>>,
cursor_manager: Arc,
+
+ /// temporary sources for the current session
+ temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,
+}
+
+/// If TEMPORARY or TEMP is specified, the source is created as a temporary source.
+/// Temporary sources are automatically dropped at the end of a session.
+/// Temporary sources are expected to be selected by batch queries, not streaming queries.
+/// Temporary sources are currently only used by the cloud portal to preview data during table and
+/// source creation, so this is an internal feature and is not exposed to users.
+/// The current PR supports temporary sources with minimal effort, so we only care about the
+/// source name, not the database or schema name.
+/// Temporary sources can only be shown via the `show sources` command, not in other system tables.
+#[derive(Default, Clone)]
+pub struct TemporarySourceManager {
+ sources: HashMap<String, SourceCatalog>,
+}
+
+impl TemporarySourceManager {
+ pub fn new() -> Self {
+ Self {
+ sources: HashMap::new(),
+ }
+ }
+
+ pub fn create_source(&mut self, name: String, source: SourceCatalog) {
+ self.sources.insert(name, source);
+ }
+
+ pub fn drop_source(&mut self, name: &str) {
+ self.sources.remove(name);
+ }
+
+ pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
+ self.sources.get(name)
+ }
+
+ pub fn keys(&self) -> Vec<String> {
+ self.sources.keys().cloned().collect()
+ }
}
#[derive(Error, Debug)]
@@ -693,6 +734,7 @@ impl SessionImpl {
exec_context: Mutex::new(None),
last_idle_instant: Default::default(),
cursor_manager: Arc::new(CursorManager::default()),
+ temporary_source_manager: Default::default(),
}
}
@@ -720,6 +762,7 @@ impl SessionImpl {
.into(),
last_idle_instant: Default::default(),
cursor_manager: Arc::new(CursorManager::default()),
+ temporary_source_manager: Default::default(),
}
}
@@ -1130,6 +1173,27 @@ impl SessionImpl {
Duration::from_secs(self.config().statement_timeout() as u64)
}
}
+
+ pub fn create_temporary_source(&self, source: SourceCatalog) {
+ self.temporary_source_manager
+ .lock()
+ .create_source(source.name.to_string(), source);
+ }
+
+ pub fn get_temporary_source(&self, name: &str) -> Option {
+ self.temporary_source_manager
+ .lock()
+ .get_source(name)
+ .cloned()
+ }
+
+ pub fn drop_temporary_source(&self, name: &str) {
+ self.temporary_source_manager.lock().drop_source(name);
+ }
+
+ pub fn temporary_source_manager(&self) -> TemporarySourceManager {
+ self.temporary_source_manager.lock().clone()
+ }
}
pub static SESSION_MANAGER: std::sync::OnceLock> =
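Putting the session-side pieces together, the lifecycle backed by `TemporarySourceManager` looks roughly like this from SQL (a sketch modelled on the handlers above and the new e2e test; connector options are illustrative):

```sql
-- Lives only in the current session; never persisted through the catalog writer.
create temporary source page_views (user_id int, url varchar) with (
    connector = 'kafka',
    topic = 'page_views',
    properties.bootstrap.server = 'localhost:9092',
    scan.startup.mode = 'earliest'
) format plain encode json;

show sources;                         -- listed via TemporarySourceManager::keys()
select * from page_views limit 10;    -- resolved through the temporary source manager
drop source page_views;               -- dropped in-session, no catalog RPC involved
```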
diff --git a/src/object_store/src/object/error.rs b/src/object_store/src/object/error.rs
index aa79f53c4c06..11cda88468c4 100644
--- a/src/object_store/src/object/error.rs
+++ b/src/object_store/src/object/error.rs
@@ -101,14 +101,18 @@ impl ObjectError {
false
}
- pub fn should_retry(&self) -> bool {
+ pub fn should_retry(&self, retry_opendal_s3_unknown_error: bool) -> bool {
match self.inner() {
ObjectErrorInner::S3 {
inner: _,
should_retry,
} => *should_retry,
- ObjectErrorInner::Opendal(e) => e.is_temporary(),
+ ObjectErrorInner::Opendal(e) => {
+ e.is_temporary()
+ || (retry_opendal_s3_unknown_error
+ && e.kind() == opendal::ErrorKind::Unexpected)
+ }
ObjectErrorInner::Timeout(_) => true,
diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs
index 8ee8dc078fe1..aff197263f8f 100644
--- a/src/object_store/src/object/mod.rs
+++ b/src/object_store/src/object/mod.rs
@@ -567,6 +567,7 @@ impl MonitoredObjectStore {
pub async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
let operation_type = OperationType::Upload;
let operation_type_str = operation_type.as_str();
+ let media_type = self.media_type();
self.object_store_metrics
.write_bytes
@@ -578,7 +579,7 @@ impl MonitoredObjectStore {
let _timer = self
.object_store_metrics
.operation_latency
- .with_label_values(&[self.media_type(), operation_type_str])
+ .with_label_values(&[media_type, operation_type_str])
.start_timer();
let builder = || async {
@@ -593,6 +594,7 @@ impl MonitoredObjectStore {
&self.config,
operation_type,
self.object_store_metrics.clone(),
+ media_type,
)
.await;
@@ -630,10 +632,12 @@ impl MonitoredObjectStore {
pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult {
let operation_type = OperationType::Read;
let operation_type_str = operation_type.as_str();
+ let media_type = self.media_type();
+
let _timer = self
.object_store_metrics
.operation_latency
- .with_label_values(&[self.media_type(), operation_type_str])
+ .with_label_values(&[media_type, operation_type_str])
.start_timer();
let builder = || async {
@@ -648,6 +652,7 @@ impl MonitoredObjectStore {
&self.config,
operation_type,
self.object_store_metrics.clone(),
+ media_type,
)
.await;
@@ -701,6 +706,7 @@ impl MonitoredObjectStore {
&self.config,
operation_type,
self.object_store_metrics.clone(),
+ media_type,
)
.await;
@@ -719,10 +725,11 @@ impl MonitoredObjectStore {
pub async fn metadata(&self, path: &str) -> ObjectResult {
let operation_type = OperationType::Metadata;
let operation_type_str = operation_type.as_str();
+ let media_type = self.media_type();
let _timer = self
.object_store_metrics
.operation_latency
- .with_label_values(&[self.media_type(), operation_type_str])
+ .with_label_values(&[media_type, operation_type_str])
.start_timer();
let builder = || async {
@@ -737,6 +744,7 @@ impl MonitoredObjectStore {
&self.config,
operation_type,
self.object_store_metrics.clone(),
+ media_type,
)
.await;
@@ -747,10 +755,12 @@ impl MonitoredObjectStore {
pub async fn delete(&self, path: &str) -> ObjectResult<()> {
let operation_type = OperationType::Delete;
let operation_type_str = operation_type.as_str();
+ let media_type = self.media_type();
+
let _timer = self
.object_store_metrics
.operation_latency
- .with_label_values(&[self.media_type(), operation_type_str])
+ .with_label_values(&[media_type, operation_type_str])
.start_timer();
let builder = || async {
@@ -765,6 +775,7 @@ impl MonitoredObjectStore {
&self.config,
operation_type,
self.object_store_metrics.clone(),
+ media_type,
)
.await;
@@ -775,6 +786,8 @@ impl MonitoredObjectStore {
async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
let operation_type = OperationType::DeleteObjects;
let operation_type_str = operation_type.as_str();
+ let media_type = self.media_type();
+
let _timer = self
.object_store_metrics
.operation_latency
@@ -793,6 +806,7 @@ impl MonitoredObjectStore {
&self.config,
operation_type,
self.object_store_metrics.clone(),
+ media_type,
)
.await;
@@ -803,11 +817,12 @@ impl MonitoredObjectStore {
pub async fn list(&self, prefix: &str) -> ObjectResult {
let operation_type = OperationType::List;
let operation_type_str = operation_type.as_str();
+ let media_type = self.media_type();
let _timer = self
.object_store_metrics
.operation_latency
- .with_label_values(&[self.media_type(), operation_type_str])
+ .with_label_values(&[media_type, operation_type_str])
.start_timer();
let builder = || async {
@@ -822,6 +837,7 @@ impl MonitoredObjectStore {
&self.config,
operation_type,
self.object_store_metrics.clone(),
+ media_type,
)
.await;
@@ -1101,20 +1117,26 @@ struct RetryCondition {
operation_type: OperationType,
retry_count: usize,
metrics: Arc<ObjectStoreMetrics>,
+ retry_opendal_s3_unknown_error: bool,
}
impl RetryCondition {
- fn new(operation_type: OperationType, metrics: Arc) -> Self {
+ fn new(
+ operation_type: OperationType,
+ metrics: Arc<ObjectStoreMetrics>,
+ retry_opendal_s3_unknown_error: bool,
+ ) -> Self {
Self {
operation_type,
retry_count: 0,
metrics,
+ retry_opendal_s3_unknown_error,
}
}
#[inline(always)]
fn should_retry_inner(&mut self, err: &ObjectError) -> bool {
- let should_retry = err.should_retry();
+ let should_retry = err.should_retry(self.retry_opendal_s3_unknown_error);
if should_retry {
self.retry_count += 1;
}
@@ -1145,6 +1167,7 @@ async fn retry_request(
config: &ObjectStoreConfig,
operation_type: OperationType,
object_store_metrics: Arc<ObjectStoreMetrics>,
+ media_type: &'static str,
) -> ObjectResult<T>
where
B: Fn() -> F,
@@ -1155,7 +1178,13 @@ where
Duration::from_millis(get_attempt_timeout_by_type(config, operation_type));
let operation_type_str = operation_type.as_str();
- let retry_condition = RetryCondition::new(operation_type, object_store_metrics);
+ let retry_condition = RetryCondition::new(
+ operation_type,
+ object_store_metrics,
+ (config.s3.developer.retry_unknown_service_error || config.s3.retry_unknown_service_error)
+ && (media_type == opendal_engine::MediaType::S3.as_str()
+ || media_type == opendal_engine::MediaType::Minio.as_str()),
+ );
let f = || async {
let future = builder();
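
Context on the hunk above: `retry_request` now receives the store's media type so the retry policy treats opendal S3 "unknown service error" responses as retryable only when the operator has opted in and the backend really is S3 or MinIO. A minimal sketch of that gate; the helper name and the standalone bool parameters are illustrative, only the two config flags and the "S3"/"Minio" labels come from this diff:

```rust
/// Sketch of the condition built where `RetryCondition::new` is called above.
fn should_retry_unknown_s3_error(
    developer_flag: bool, // config.s3.developer.retry_unknown_service_error
    stable_flag: bool,    // config.s3.retry_unknown_service_error
    media_type: &str,     // MediaType::as_str() of the backing store
) -> bool {
    (developer_flag || stable_flag) && (media_type == "S3" || media_type == "Minio")
}

fn main() {
    // Flag enabled + S3-compatible backend => unknown service errors are retried.
    assert!(should_retry_unknown_s3_error(true, false, "S3"));
    // Flag enabled but a non-S3 backend (e.g. Gcs) => never retried.
    assert!(!should_retry_unknown_s3_error(true, false, "Gcs"));
}
```
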
diff --git a/src/object_store/src/object/opendal_engine/azblob.rs b/src/object_store/src/object/opendal_engine/azblob.rs
index e584e59aafe8..24ccacb3c649 100644
--- a/src/object_store/src/object/opendal_engine/azblob.rs
+++ b/src/object_store/src/object/opendal_engine/azblob.rs
@@ -19,7 +19,7 @@ use opendal::services::Azblob;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;
-use super::{EngineType, OpendalObjectStore};
+use super::{MediaType, OpendalObjectStore};
use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::ObjectResult;
@@ -47,7 +47,7 @@ impl OpendalObjectStore {
.finish();
Ok(Self {
op,
- engine_type: EngineType::Azblob,
+ media_type: MediaType::Azblob,
config,
metrics,
})
diff --git a/src/object_store/src/object/opendal_engine/fs.rs b/src/object_store/src/object/opendal_engine/fs.rs
index 2edaaa44d6bb..3792151ff474 100644
--- a/src/object_store/src/object/opendal_engine/fs.rs
+++ b/src/object_store/src/object/opendal_engine/fs.rs
@@ -19,7 +19,7 @@ use opendal::services::Fs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;
-use super::{EngineType, OpendalObjectStore};
+use super::{MediaType, OpendalObjectStore};
use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::opendal_engine::ATOMIC_WRITE_DIR;
use crate::object::ObjectResult;
@@ -44,7 +44,7 @@ impl OpendalObjectStore {
.finish();
Ok(Self {
op,
- engine_type: EngineType::Fs,
+ media_type: MediaType::Fs,
config,
metrics,
})
diff --git a/src/object_store/src/object/opendal_engine/gcs.rs b/src/object_store/src/object/opendal_engine/gcs.rs
index a3876b30ef56..ee0d155058dd 100644
--- a/src/object_store/src/object/opendal_engine/gcs.rs
+++ b/src/object_store/src/object/opendal_engine/gcs.rs
@@ -19,7 +19,7 @@ use opendal::services::Gcs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;
-use super::{EngineType, OpendalObjectStore};
+use super::{MediaType, OpendalObjectStore};
use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::ObjectResult;
@@ -49,7 +49,7 @@ impl OpendalObjectStore {
.finish();
Ok(Self {
op,
- engine_type: EngineType::Gcs,
+ media_type: MediaType::Gcs,
config,
metrics,
})
diff --git a/src/object_store/src/object/opendal_engine/hdfs.rs b/src/object_store/src/object/opendal_engine/hdfs.rs
index b7b28ef08a05..d7b3655c0fc3 100644
--- a/src/object_store/src/object/opendal_engine/hdfs.rs
+++ b/src/object_store/src/object/opendal_engine/hdfs.rs
@@ -19,7 +19,7 @@ use opendal::services::Hdfs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;
-use super::{EngineType, OpendalObjectStore};
+use super::{MediaType, OpendalObjectStore};
// use crate::object::opendal_engine::ATOMIC_WRITE_DIR;
use crate::object::ObjectResult;
@@ -53,7 +53,7 @@ impl OpendalObjectStore {
.finish();
Ok(Self {
op,
- engine_type: EngineType::Hdfs,
+ media_type: MediaType::Hdfs,
config,
metrics,
})
diff --git a/src/object_store/src/object/opendal_engine/obs.rs b/src/object_store/src/object/opendal_engine/obs.rs
index 03919ec57d37..31c86109c820 100644
--- a/src/object_store/src/object/opendal_engine/obs.rs
+++ b/src/object_store/src/object/opendal_engine/obs.rs
@@ -19,7 +19,7 @@ use opendal::services::Obs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;
-use super::{EngineType, OpendalObjectStore};
+use super::{MediaType, OpendalObjectStore};
use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::ObjectResult;
@@ -55,7 +55,7 @@ impl OpendalObjectStore {
.finish();
Ok(Self {
op,
- engine_type: EngineType::Obs,
+ media_type: MediaType::Obs,
config,
metrics,
})
diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
index 6ea0cbb6fe8f..6855ae951956 100644
--- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -37,14 +37,14 @@ use crate::object::{
#[derive(Clone)]
pub struct OpendalObjectStore {
pub(crate) op: Operator,
- pub(crate) engine_type: EngineType,
+ pub(crate) media_type: MediaType,
pub(crate) config: Arc<ObjectStoreConfig>,
pub(crate) metrics: Arc<ObjectStoreMetrics>,
}
#[derive(Clone)]
-pub enum EngineType {
+pub enum MediaType {
Memory,
Hdfs,
Gcs,
@@ -57,6 +57,23 @@ pub enum EngineType {
Fs,
}
+impl MediaType {
+ pub fn as_str(&self) -> &'static str {
+ match self {
+ MediaType::Memory => "Memory",
+ MediaType::Hdfs => "Hdfs",
+ MediaType::Gcs => "Gcs",
+ MediaType::Minio => "Minio",
+ MediaType::S3 => "S3",
+ MediaType::Obs => "Obs",
+ MediaType::Oss => "Oss",
+ MediaType::Webhdfs => "Webhdfs",
+ MediaType::Azblob => "Azblob",
+ MediaType::Fs => "Fs",
+ }
+ }
+}
+
impl OpendalObjectStore {
/// create opendal memory engine, used for unit tests.
pub fn test_new_memory_engine() -> ObjectResult<Self> {
@@ -65,7 +82,7 @@ impl OpendalObjectStore {
let op: Operator = Operator::new(builder)?.finish();
Ok(Self {
op,
- engine_type: EngineType::Memory,
+ media_type: MediaType::Memory,
config: Arc::new(ObjectStoreConfig::default()),
metrics: Arc::new(ObjectStoreMetrics::unused()),
})
@@ -77,17 +94,17 @@ impl ObjectStore for OpendalObjectStore {
type StreamingUploader = OpendalStreamingUploader;
fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
- match self.engine_type {
- EngineType::S3 => prefix::s3::get_object_prefix(obj_id),
- EngineType::Minio => prefix::s3::get_object_prefix(obj_id),
- EngineType::Memory => String::default(),
- EngineType::Hdfs
- | EngineType::Gcs
- | EngineType::Obs
- | EngineType::Oss
- | EngineType::Webhdfs
- | EngineType::Azblob
- | EngineType::Fs => {
+ match self.media_type {
+ MediaType::S3 => prefix::s3::get_object_prefix(obj_id),
+ MediaType::Minio => prefix::s3::get_object_prefix(obj_id),
+ MediaType::Memory => String::default(),
+ MediaType::Hdfs
+ | MediaType::Gcs
+ | MediaType::Obs
+ | MediaType::Oss
+ | MediaType::Webhdfs
+ | MediaType::Azblob
+ | MediaType::Fs => {
prefix::opendal_engine::get_object_prefix(obj_id, use_new_object_prefix_strategy)
}
}
@@ -248,18 +265,7 @@ impl ObjectStore for OpendalObjectStore {
}
fn store_media_type(&self) -> &'static str {
- match self.engine_type {
- EngineType::Memory => "Memory",
- EngineType::Hdfs => "Hdfs",
- EngineType::Minio => "Minio",
- EngineType::S3 => "S3",
- EngineType::Gcs => "Gcs",
- EngineType::Obs => "Obs",
- EngineType::Oss => "Oss",
- EngineType::Webhdfs => "Webhdfs",
- EngineType::Azblob => "Azblob",
- EngineType::Fs => "Fs",
- }
+ self.media_type.as_str()
}
fn support_streaming_upload(&self) -> bool {
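
With the rename to `MediaType` and the new `as_str` helper, `store_media_type` reduces to a delegation, and each monitored operation can compute the backend label once and reuse it both as the Prometheus `media_type` label and as the value handed to `retry_request`. A rough sketch of that calling pattern; everything apart from the names shown in this diff is illustrative:

```rust
// Illustrative only: the struct, println!, and retry helper stand in for the
// real prometheus/retry plumbing in MonitoredObjectStore.
struct MonitoredStore {
    media_type: &'static str, // e.g. MediaType::S3.as_str() == "S3"
}

impl MonitoredStore {
    fn read(&self, _path: &str) {
        // Compute the label once per call ...
        let media_type = self.media_type;
        let operation = "read";
        // ... use it for the operation latency metric label ...
        println!("operation_latency{{media_type={media_type}, type={operation}}}");
        // ... and pass the same value into the retry layer so it can decide
        // whether unknown opendal S3 errors are retryable for this backend.
        retry_with_media_type(media_type);
    }
}

fn retry_with_media_type(media_type: &'static str) {
    let _ = media_type; // placeholder for retry_request(..., media_type)
}

fn main() {
    MonitoredStore { media_type: "S3" }.read("obj/1.sst");
}
```
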
diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs
index 183496d08673..43629bbf5157 100644
--- a/src/object_store/src/object/opendal_engine/opendal_s3.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs
@@ -21,7 +21,7 @@ use opendal::services::S3;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;
-use super::{EngineType, OpendalObjectStore};
+use super::{MediaType, OpendalObjectStore};
use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::ObjectResult;
@@ -53,7 +53,7 @@ impl OpendalObjectStore {
Ok(Self {
op,
- engine_type: EngineType::S3,
+ media_type: MediaType::S3,
config,
metrics,
})
@@ -97,7 +97,7 @@ impl OpendalObjectStore {
Ok(Self {
op,
- engine_type: EngineType::Minio,
+ media_type: MediaType::Minio,
config,
metrics,
})
@@ -146,7 +146,7 @@ impl OpendalObjectStore {
Ok(Self {
op,
- engine_type: EngineType::S3,
+ media_type: MediaType::S3,
config,
metrics,
})
diff --git a/src/object_store/src/object/opendal_engine/oss.rs b/src/object_store/src/object/opendal_engine/oss.rs
index c4fc5d500b11..cc0ecdfb7a94 100644
--- a/src/object_store/src/object/opendal_engine/oss.rs
+++ b/src/object_store/src/object/opendal_engine/oss.rs
@@ -19,7 +19,7 @@ use opendal::services::Oss;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;
-use super::{EngineType, OpendalObjectStore};
+use super::{MediaType, OpendalObjectStore};
use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::ObjectResult;
@@ -55,7 +55,7 @@ impl OpendalObjectStore {
.finish();
Ok(Self {
op,
- engine_type: EngineType::Oss,
+ media_type: MediaType::Oss,
config,
metrics,
})
diff --git a/src/object_store/src/object/opendal_engine/webhdfs.rs b/src/object_store/src/object/opendal_engine/webhdfs.rs
index f083102a3ed2..b214bcfad2cc 100644
--- a/src/object_store/src/object/opendal_engine/webhdfs.rs
+++ b/src/object_store/src/object/opendal_engine/webhdfs.rs
@@ -19,7 +19,7 @@ use opendal::services::Webhdfs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;
-use super::{EngineType, OpendalObjectStore};
+use super::{MediaType, OpendalObjectStore};
use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::opendal_engine::ATOMIC_WRITE_DIR;
use crate::object::ObjectResult;
@@ -47,7 +47,7 @@ impl OpendalObjectStore {
.finish();
Ok(Self {
op,
- engine_type: EngineType::Webhdfs,
+ media_type: MediaType::Webhdfs,
config,
metrics,
})
diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs
index 001eb8128a5b..7ea687559ab3 100644
--- a/src/object_store/src/object/s3.rs
+++ b/src/object_store/src/object/s3.rs
@@ -103,6 +103,8 @@ pub struct S3StreamingUploader {
}
impl S3StreamingUploader {
+ const MEDIA_TYPE: &'static str = "s3";
+
pub fn new(
client: Client,
bucket: String,
@@ -159,6 +161,7 @@ impl S3StreamingUploader {
&self.config,
OperationType::StreamingUploadInit,
self.metrics.clone(),
+ Self::MEDIA_TYPE,
)
.await;
@@ -221,7 +224,14 @@ impl S3StreamingUploader {
})
};
- let res = retry_request(builder, &config, operation_type, metrics.clone()).await;
+ let res = retry_request(
+ builder,
+ &config,
+ operation_type,
+ metrics.clone(),
+ Self::MEDIA_TYPE,
+ )
+ .await;
try_update_failure_metric(&metrics, &res, operation_type_str);
Ok((part_id, res?))
}));
@@ -280,7 +290,14 @@ impl S3StreamingUploader {
})
};
- let res = retry_request(builder, &self.config, operation_type, self.metrics.clone()).await;
+ let res = retry_request(
+ builder,
+ &self.config,
+ operation_type,
+ self.metrics.clone(),
+ Self::MEDIA_TYPE,
+ )
+ .await;
try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
let _res = res?;
@@ -353,9 +370,14 @@ impl StreamingUploader for S3StreamingUploader {
})
};
- let res =
- retry_request(builder, &self.config, operation_type, self.metrics.clone())
- .await;
+ let res = retry_request(
+ builder,
+ &self.config,
+ operation_type,
+ self.metrics.clone(),
+ Self::MEDIA_TYPE,
+ )
+ .await;
try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
res?;
Ok(())
diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs
index 3df7b753147c..80410f767fb6 100644
--- a/src/sqlparser/src/ast/mod.rs
+++ b/src/sqlparser/src/ast/mod.rs
@@ -2552,6 +2552,10 @@ impl FunctionArgList {
}
}
+ pub fn is_args_only(&self) -> bool {
+ !self.distinct && !self.variadic && self.order_by.is_empty() && !self.ignore_nulls
+ }
+
pub fn for_agg(distinct: bool, args: Vec<FunctionArg>, order_by: Vec<OrderByExpr>) -> Self {
Self {
distinct,
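
The new `is_args_only` helper reports whether a call's argument list is a plain positional list, i.e. carries no DISTINCT, VARIADIC, ORDER BY, or IGNORE NULLS modifiers. A hedged sketch of the predicate on a simplified stand-in struct (the real `order_by` is likely `Vec<OrderByExpr>`):

```rust
// Simplified stand-in for FunctionArgList; only the four fields checked by
// `is_args_only` in the diff are modeled here.
struct FunctionArgList {
    distinct: bool,
    variadic: bool,
    order_by: Vec<String>,
    ignore_nulls: bool,
}

impl FunctionArgList {
    fn is_args_only(&self) -> bool {
        !self.distinct && !self.variadic && self.order_by.is_empty() && !self.ignore_nulls
    }
}

fn main() {
    let plain = FunctionArgList {
        distinct: false,
        variadic: false,
        order_by: vec![],
        ignore_nulls: false,
    };
    // e.g. `foo(a, b)` qualifies; `foo(DISTINCT a ORDER BY b)` would not.
    assert!(plain.is_args_only());
}
```
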
diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs
index db314fc8d6f6..c97c524b937b 100644
--- a/src/sqlparser/src/ast/statement.rs
+++ b/src/sqlparser/src/ast/statement.rs
@@ -28,7 +28,7 @@ use crate::ast::{
display_comma_separated, display_separated, ColumnDef, ObjectName, SqlOption, TableConstraint,
};
use crate::keywords::Keyword;
-use crate::parser::{IncludeOption, IsOptional, Parser, UPSTREAM_SOURCE_KEY};
+use crate::parser::{IncludeOption, IsOptional, Parser};
use crate::parser_err;
use crate::parser_v2::literal_u32;
use crate::tokenizer::Token;
@@ -82,6 +82,7 @@ macro_rules! impl_fmt_display {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CreateSourceStatement {
+ pub temporary: bool,
pub if_not_exists: bool,
pub columns: Vec<ColumnDef>,
// The wildchar position in columns defined in sql. Only exist when using external schema.
@@ -399,44 +400,6 @@ impl fmt::Display for ConnectorSchema {
}
}
-impl ParseTo for CreateSourceStatement {
- fn parse_to(p: &mut Parser<'_>) -> PResult<Self> {
- impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
- impl_parse_to!(source_name: ObjectName, p);
-
- // parse columns
- let (columns, constraints, source_watermarks, wildcard_idx) =
- p.parse_columns_with_watermark()?;
- let include_options = p.parse_include_options()?;
-
- let with_options = p.parse_with_properties()?;
- let option = with_options
- .iter()
- .find(|&opt| opt.name.real_value() == UPSTREAM_SOURCE_KEY);
- let connector: String = option.map(|opt| opt.value.to_string()).unwrap_or_default();
- let cdc_source_job = connector.contains("-cdc");
- if cdc_source_job && (!columns.is_empty() || !constraints.is_empty()) {
- parser_err!("CDC source cannot define columns and constraints");
- }
-
- // row format for nexmark source must be native
- // default row format for datagen source is native
- let source_schema = p.parse_source_schema_with_connector(&connector, cdc_source_job)?;
-
- Ok(Self {
- if_not_exists,
- columns,
- wildcard_idx,
- constraints,
- source_name,
- with_properties: WithProperties(with_options),
- source_schema,
- source_watermarks,
- include_column_options: include_options,
- })
- }
-}
-
pub(super) fn fmt_create_items(
columns: &[ColumnDef],
constraints: &[TableConstraint],
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index 5e7df3c2fc53..872ff61919a9 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -29,12 +29,12 @@ use winnow::{PResult, Parser as _};
use crate::ast::*;
use crate::keywords::{self, Keyword};
-use crate::parser_v2;
use crate::parser_v2::{
dollar_quoted_string, keyword, literal_i64, literal_uint, single_quoted_string, token_number,
ParserExt as _,
};
use crate::tokenizer::*;
+use crate::{impl_parse_to, parser_v2};
pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector";
@@ -2008,7 +2008,7 @@ impl Parser<'_> {
} else if self.parse_keywords(&[Keyword::MATERIALIZED, Keyword::SOURCE]) {
parser_err!("CREATE MATERIALIZED SOURCE has been deprecated, use CREATE TABLE instead")
} else if self.parse_keyword(Keyword::SOURCE) {
- self.parse_create_source(or_replace)
+ self.parse_create_source(or_replace, temporary)
} else if self.parse_keyword(Keyword::SINK) {
self.parse_create_sink(or_replace)
} else if self.parse_keyword(Keyword::SUBSCRIPTION) {
@@ -2102,17 +2102,54 @@ impl Parser<'_> {
}
// CREATE [OR REPLACE]?
- // [MATERIALIZED] SOURCE
+ // [TEMPORARY] SOURCE
// [IF NOT EXISTS]?
//
// [COLUMNS]?
// [WITH (properties)]?
// ROW FORMAT
// [ROW SCHEMA LOCATION ]?
- pub fn parse_create_source(&mut self, _or_replace: bool) -> PResult<Statement> {
- Ok(Statement::CreateSource {
- stmt: CreateSourceStatement::parse_to(self)?,
- })
+ pub fn parse_create_source(
+ &mut self,
+ _or_replace: bool,
+ temporary: bool,
+ ) -> PResult<Statement> {
+ impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], self);
+ impl_parse_to!(source_name: ObjectName, self);
+
+ // parse columns
+ let (columns, constraints, source_watermarks, wildcard_idx) =
+ self.parse_columns_with_watermark()?;
+ let include_options = self.parse_include_options()?;
+
+ let with_options = self.parse_with_properties()?;
+ let option = with_options
+ .iter()
+ .find(|&opt| opt.name.real_value() == UPSTREAM_SOURCE_KEY);
+ let connector: String = option.map(|opt| opt.value.to_string()).unwrap_or_default();
+ let cdc_source_job = connector.contains("-cdc");
+ if cdc_source_job && (!columns.is_empty() || !constraints.is_empty()) {
+ parser_err!("CDC source cannot define columns and constraints");
+ }
+
+ // row format for nexmark source must be native
+ // default row format for datagen source is native
+ let source_schema = self.parse_source_schema_with_connector(&connector, cdc_source_job)?;
+
+ let stmt = CreateSourceStatement {
+ temporary,
+ if_not_exists,
+ columns,
+ wildcard_idx,
+ constraints,
+ source_name,
+ with_properties: WithProperties(with_options),
+ source_schema,
+ source_watermarks,
+ include_column_options: include_options,
+ };
+
+ Ok(Statement::CreateSource { stmt })
}
// CREATE [OR REPLACE]?
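
With the old `ParseTo` impl inlined into `parse_create_source`, the `temporary` flag already parsed by the CREATE dispatcher is now recorded on `CreateSourceStatement`. A hedged end-to-end check of that behavior, reusing the kafka case from the `create.yaml` changes that follow and assuming the crate's usual `Parser::parse_sql` entry point:

```rust
use risingwave_sqlparser::ast::Statement;
use risingwave_sqlparser::parser::Parser;

fn main() {
    // SQL mirrors the existing create.yaml test case, with TEMPORARY added.
    let sql = "CREATE TEMPORARY SOURCE IF NOT EXISTS src \
               WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') \
               FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://')";
    let stmts = Parser::parse_sql(sql).expect("should parse");
    match &stmts[0] {
        Statement::CreateSource { stmt } => assert!(stmt.temporary && stmt.if_not_exists),
        other => panic!("unexpected statement: {other:?}"),
    }
}
```
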
diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml
index 13f5e2dad5bf..bcd94d53f1ed 100644
--- a/src/sqlparser/tests/testdata/create.yaml
+++ b/src/sqlparser/tests/testdata/create.yaml
@@ -38,13 +38,13 @@
^
- input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://')
formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://')
- formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "location", quote_style: None }]), value: SingleQuotedString("file://") }], key_encode: None }), source_watermarks: [], include_column_options: [] } }'
+ formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "location", quote_style: None }]), value: SingleQuotedString("file://") }], key_encode: None }), source_watermarks: [], include_column_options: [] } }'
- input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://')
formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.brokers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://')
- formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [], include_column_options: [] } }'
+ formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: true, columns: [], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "brokers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }], key_encode: None }), source_watermarks: [], include_column_options: [] } }'
- input: CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, WATERMARK FOR auction AS auction - 1, "date_time" TIMESTAMP) with (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0')
formatted_sql: CREATE SOURCE bid (auction INT, bidder INT, price INT, "date_time" TIMESTAMP, WATERMARK FOR auction AS auction - 1) WITH (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') FORMAT NATIVE ENCODE NATIVE
- formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: V2(ConnectorSchema { format: Native, row_encode: Native, row_options: [], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }], include_column_options: [] } }'
+ formatted_ast: 'CreateSource { stmt: CreateSourceStatement { temporary: false, if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], wildcard_idx: None, constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: V2(ConnectorSchema { format: Native, row_encode: Native, row_options: [], key_encode: None }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }], include_column_options: [] } }'
- input: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT>)
formatted_sql: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 INT>)
- input: CREATE TABLE T (v1 INT, v2 STRUCT<v1 INT, v2 STRUCT<v3 INT>>)