Merge branch 'main' of https://github.com/risingwavelabs/risingwave into li0k/storage_split_with_size
Li0k committed Aug 23, 2024
2 parents 196e218 + 9587945 commit ec000b3
Showing 37 changed files with 777 additions and 444 deletions.
97 changes: 27 additions & 70 deletions README.md
@@ -9,28 +9,20 @@

<div align="center">

### 🌊 Reimagine stream processing.
### 🌊 Reimagine real-time data engineering.

</div>

<p align="center">
<p align="center">&nbsp;&nbsp;&nbsp;📚&nbsp;
<a
href="https://docs.risingwave.com/"
target="_blank"
><b>Documentation</b></a>&nbsp;&nbsp;&nbsp;📑&nbsp;&nbsp;&nbsp;
<a
href="https://tutorials.risingwave.com/"
target="_blank"
><b>Hands-on Tutorials</b></a>&nbsp;&nbsp;&nbsp;🎯&nbsp;&nbsp;&nbsp;
<a
href="https://cloud.risingwave.com/"
target="_blank"
><b>RisingWave Cloud</b></a>&nbsp;&nbsp;&nbsp;🚀&nbsp;&nbsp;&nbsp;
><b>Documentation</b></a>&nbsp;&nbsp;&nbsp;🚀&nbsp;
<a
href="https://risingwave.com/slack"
target="_blank"
>
<b>Get Instant Help</b>
<b>Slack Community</b>
</a>
</p>
<div align="center">
@@ -56,16 +48,9 @@
RisingWave is a Postgres-compatible SQL engine engineered to provide the <i><b>simplest</b></i> and <i><b>most cost-efficient</b></i> approach for <b>processing</b>, <b>analyzing</b>, and <b>managing</b> real-time event streaming data.

![RisingWave](./docs/dev/src/images/architecture_20240814.png)

## When to use RisingWave?
RisingWave can ingest millions of events per second, continuously join live data streams with historical tables, and serve ad-hoc queries in real-time. Typical use cases include, but are not limited to:

* **Streaming analytics**: Perform streaming analytics and build live dashboards with data freshness under one second, ideal for stock trading, sports betting, IoT monitoring, and more.
* **Event-driven applications**: Develop monitoring and alerting applications for fraud detection, anomaly detection, and more.
* **Real-time ETL pipelines**: Ingest data from different sources, perform enrichment queries, and deliver results to downstream systems.
* **Feature stores**: Transform both batch and streaming data into ML features using the same codebase.
RisingWave can <b>ingest</b> millions of events per second, seamlessly <b>join and analyze</b> live data streams with historical tables, <b>serve</b> ad-hoc queries in real-time, and <b>deliver</b> fresh, consistent results.

![RisingWave](./docs/dev/src/images/architecture_20240814.png)

## Try it out in 60 seconds

@@ -74,72 +59,44 @@ Install RisingWave standalone mode:
curl https://risingwave.com/sh | sh
```

Then follow the prompts to start and connect to RisingWave.

To learn about other installation options, such as using a Docker image, see [Quick Start](https://docs.risingwave.com/docs/current/get-started/).

> Please note: RisingWave uses [Scarf](https://scarf.sh/) to collect anonymized installation analytics. These analytics help us understand and improve the distribution of our package.
> The privacy policy of Scarf is available at [https://about.scarf.sh/privacy-policy](https://about.scarf.sh/privacy-policy).
## Production deployments

[**RisingWave Cloud**](https://cloud.risingwave.com) offers the easiest way to run RisingWave in production, with a _forever-free_ developer tier.

For **Docker deployment**, please refer to [Docker Compose](https://docs.risingwave.com/docs/current/risingwave-docker-compose/).

For **Kubernetes deployment**, please refer to [Kubernetes with Helm](https://docs.risingwave.com/docs/current/risingwave-k8s-helm/) or [Kubernetes with Operator](https://docs.risingwave.com/docs/current/risingwave-kubernetes/).

## Why RisingWave for real-time materialized views?

RisingWave specializes in providing **incrementally updated, consistent materialized views** — a persistent data structure that represents the results of event stream processing. Compared to materialized views, dynamic tables, and live tables in other database and data warehouse systems, RisingWave's materialized view stands out in several key aspects:
* Highly cost-efficient - up to 95% cost savings compared to state-of-the-art solutions
* Synchronous refresh without compromising consistency
* Extensive SQL support including joins, deletes, and updates
* High concurrency in query serving
* Instant fault tolerance
* Transparent dynamic scaling
* Speedy bootstrapping and backfilling

RisingWave's extensive CDC support further enables users to seamlessly offload event-driven workloads such as materialized views and triggers from operational databases (e.g., [PostgreSQL](https://docs.risingwave.com/docs/current/ingest-from-postgres-cdc/)) to RisingWave.
## When is RisingWave the perfect fit?
RisingWave is the ideal solution for building event-driven applications. Choose RisingWave when you want to:

* Ingest data from real-time sources like Kafka streams, database CDC, and more.
* Perform complex queries (such as joins, aggregations, and time windowing) on the fly.
* Interactively and concurrently explore consistent, up-to-the-moment results.
* Seamlessly send results to downstream systems.
* Process streaming and batch data using the same codebase.

## Why RisingWave for stream processing?

RisingWave provides users with a comprehensive set of frequently used stream processing features, including exactly-once consistency, [time window functions](https://docs.risingwave.com/docs/current/sql-function-time-window/), [watermarks](https://docs.risingwave.com/docs/current/watermarks/), and more. RisingWave significantly reduces the complexity of building stream processing applications by allowing developers to express intricate stream processing logic through cascaded materialized views. Furthermore, it allows users to persist data directly within the system, eliminating the need to deliver results to external databases for storage and query serving.
## In what use cases does RisingWave excel?
RisingWave is particularly effective for the following use cases:

![Real-time Data Pipelines without or with RisingWave](https://github.com/risingwavelabs/risingwave/assets/100685635/414afbb7-5187-410f-9ba4-9a640c8c6306)
* **Streaming analytics**: Achieve sub-second data freshness in live dashboards, ideal for high-stakes scenarios like stock trading, sports betting, and IoT monitoring.
* **Event-driven applications**: Develop sophisticated monitoring and alerting systems for critical applications such as fraud and anomaly detection.
* **Real-time data enrichment**: Continuously ingest data from diverse sources, conduct real-time data enrichment, and efficiently deliver the results to downstream systems.
* **Feature engineering**: Transform batch and streaming data into features in your machine learning models using a unified codebase, ensuring seamless integration and consistency.

Compared to existing stream processing systems like [Apache Flink](https://flink.apache.org/), [Apache Spark Streaming](https://spark.apache.org/docs/latest/streaming-programming-guide.html), and [ksqlDB](https://ksqldb.io/), RisingWave stands out in two primary dimensions: **Ease-of-use** and **cost efficiency**, thanks to its **[PostgreSQL](https://www.postgresql.org/)-style interaction experience** and **[Snowflake](https://snowflake.com/)-like architectural design** (i.e., decoupled storage and compute).

| | RisingWave 🌊 | Traditional stream processing systems |
| :---: | :---: | :---: |
| Learning curve 🎢 | PostgreSQL-style experience | System-specific concepts |
| Integration 🔗 | PostgreSQL ecosystem | System-specific ecosystem |
| Complex queries (e.g., joins) 💡 | Highly efficient | Inefficient |
| Failure recovery 🚨 | Instant | Minutes or even hours |
| Dynamic scaling 🚀 | Transparent | Stop-the-world |
| Bootstrapping and Backfilling ⏪ | Accelerated via dynamic scaling | Slow |
## Production deployments

[**RisingWave Cloud**](https://cloud.risingwave.com) offers the easiest way to run RisingWave in production.

### RisingWave as a database
RisingWave is fundamentally a database that **extends beyond basic streaming data processing capabilities**. It excels in **the effective management of streaming data**, making it a trusted choice for data persistence and powering online applications. RisingWave offers an extensive range of database capabilities, which include:
For **Docker deployment**, please refer to [Docker Compose](https://docs.risingwave.com/docs/current/risingwave-docker-compose/).

* High availability
* Serving highly concurrent queries
* Role-based access control (RBAC)
* Integration with data modeling tools, such as [dbt](https://docs.risingwave.com/docs/current/use-dbt/)
* Integration with database management tools, such as [DBeaver](https://docs.risingwave.com/docs/current/dbeaver-integration/)
* Integration with BI tools, such as [Grafana](https://docs.risingwave.com/docs/current/grafana-integration/)
* Schema change
* Processing of semi-structured data
For **Kubernetes deployment**, please refer to [Kubernetes with Helm](https://docs.risingwave.com/docs/current/risingwave-k8s-helm/) or [Kubernetes with Operator](https://docs.risingwave.com/docs/current/risingwave-kubernetes/).

## Community

Looking for help, discussions, collaboration opportunities, or a casual afternoon chat with our fellow engineers and community members? Join our [Slack workspace](https://risingwave.com/slack)!

## Notes on telemetry

RisingWave collects anonymous usage statistics to better understand how the community is using RisingWave. The sole intention of this exercise is to help improve the product. Users may opt out easily at any time. Please refer to the [user documentation](https://docs.risingwave.com/docs/current/telemetry/) for more details.

RisingWave uses [Scarf](https://scarf.sh/) to collect anonymized installation analytics. These analytics help us understand and improve the distribution of our package. The privacy policy of Scarf is available at [https://about.scarf.sh/privacy-policy](https://about.scarf.sh/privacy-policy).

RisingWave also collects anonymous usage statistics to better understand how the community is using RisingWave. The sole intention of this exercise is to help improve the product. Users may opt out easily at any time. Please refer to the [user documentation](https://docs.risingwave.com/docs/current/telemetry/) for more details.

## License

5 changes: 4 additions & 1 deletion docker/Dockerfile
@@ -21,7 +21,7 @@ ENV PATH $NVM_DIR/versions/node/v$NODE_VERSION/bin:$PATH
SHELL ["/bin/bash", "-c"]

RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --no-modify-path --default-toolchain none -y
ENV PATH /root/.cargo/bin/:$PATH
ENV PATH /root/.cargo/bin:$PATH
ENV CARGO_INCREMENTAL=0

COPY rust-toolchain rust-toolchain
@@ -34,6 +34,8 @@ RUN rustup self update \
&& rustup target add wasm32-wasi

RUN cargo install flamegraph
RUN cargo install addr2line --features bin --bin addr2line

# TODO: cargo-chef doesn't work well now, because we update Cargo.lock very often.
# We may consider sccache instead.

@@ -137,6 +139,7 @@ COPY --from=rust-builder /risingwave/bin/risingwave.dwp /risingwave/bin/risingwa
COPY --from=java-builder /risingwave/bin/connector-node /risingwave/bin/connector-node
COPY --from=rust-builder /risingwave/bin/jeprof /usr/local/bin/jeprof
COPY --from=rust-base /root/.cargo/bin/flamegraph /usr/local/bin/flamegraph
COPY --from=rust-base /root/.cargo/bin/addr2line /usr/local/bin/addr2line

# Set default playground mode to docker-playground profile
ENV PLAYGROUND_PROFILE docker-playground
82 changes: 82 additions & 0 deletions e2e_test/source/basic/temporary_kafka_batch.slt
@@ -0,0 +1,82 @@
statement ok
create temporary source s1 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON

query IT rowsort
select * from s1
----
1 1
2 22
3 333
4 4444

query IT rowsort
select * from s1 where _rw_kafka_timestamp > '1977-01-01 00:00:00+00:00'
----
1 1
2 22
3 333
4 4444

query IT rowsort
select * from s1 where _rw_kafka_timestamp > '1977-01-01 00:00:00'
----
1 1
2 22
3 333
4 4444

query IT rowsort
select * from s1 where _rw_kafka_timestamp > TO_TIMESTAMP('1977-01-01 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')
----
1 1
2 22
3 333
4 4444

statement error expected format
select * from s1 where _rw_kafka_timestamp > 'abc'

statement error out of range
select * from s1 where _rw_kafka_timestamp < TO_TIMESTAMP(2147483647 + 1)

query IT
select * from s1 where _rw_kafka_timestamp > '2045-01-01 0:00:00+00:00'
----

query B
select _rw_kafka_timestamp > '1977-01-01 00:00:00+00:00' from s1
----
t
t
t
t

query B
select _rw_kafka_timestamp < now() from s1
----
t
t
t
t

query B
select _rw_kafka_timestamp < now() - interval '1 day' from s1
----
f
f
f
f

query IT rowsort
select * from s1 limit 2
----
1 1
2 22

statement ok
drop source s1
19 changes: 19 additions & 0 deletions e2e_test/udf/bug_fixes/17560_udaf_as_win_func.slt
@@ -0,0 +1,19 @@
# https://github.com/risingwavelabs/risingwave/issues/17560

statement ok
create aggregate sum00(value int) returns int language python as $$
def create_state():
return 0
def accumulate(state, value):
return state + value
def retract(state, value):
return state - value
def finish(state):
return state
$$;

query ii
select t.value, sum00(weight) OVER (PARTITION BY value) from (values (1, 1), (null, 2), (3, 3)) as t(value, weight);
----
1 1
3 3
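The retractable aggregate defined above can also be exercised outside the database with a small plain-Python driver. The driver loop here is illustrative only (RisingWave itself manages state, retraction, and window frames); the four callbacks are copied from the `sum00` definition in the test:

```python
# The four callbacks from the sum00 UDAF above.
def create_state():
    return 0

def accumulate(state, value):
    return state + value

def retract(state, value):
    return state - value

def finish(state):
    return state

def run_over_partition(values):
    """Illustrative driver: feed every row of a partition, then finish."""
    state = create_state()
    for v in values:
        state = accumulate(state, v)
    return finish(state)

# Each non-null value in the test data forms its own partition,
# so the windowed sum is just that row's weight.
print(run_over_partition([1]))  # 1
print(run_over_partition([3]))  # 3

# retract undoes a previous accumulate, which is what makes the
# aggregate usable over sliding window frames.
s = accumulate(accumulate(create_state(), 5), 7)
print(finish(retract(s, 5)))  # 7
```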
12 changes: 2 additions & 10 deletions src/common/src/config.rs
@@ -1165,7 +1165,7 @@ pub struct S3ObjectStoreDeveloperConfig {
)]
pub retryable_service_error_codes: Vec<String>,

// TODO: the following field will be deprecated after opendal is stabilized
// TODO: deprecate this config when we completely deprecate the aws sdk.
#[serde(default = "default::object_store_config::s3::developer::use_opendal")]
pub use_opendal: bool,
}
@@ -2174,10 +2174,6 @@ pub mod default {
}

pub mod developer {
use crate::util::env_var::env_var_is_true_or;

const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3";

pub fn retry_unknown_service_error() -> bool {
false
}
@@ -2187,11 +2183,7 @@ pub mod default {
}

pub fn use_opendal() -> bool {
// TODO: deprecate this config when we completely switch from the aws sdk to opendal.
// The reasons why we use !env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false) here are:
// 1. Maintain compatibility so that there is no behavior change in clusters with RW_USE_OPENDAL_FOR_S3 set.
// 2. Change the default behavior to use opendal for s3 if RW_USE_OPENDAL_FOR_S3 is not set.
env_var_is_true_or(RW_USE_OPENDAL_FOR_S3, false)
true
}
}
}
13 changes: 0 additions & 13 deletions src/common/src/util/env_var.rs
@@ -34,16 +34,3 @@ pub fn env_var_is_true_or(key: impl AsRef<OsStr>, default: bool) -> bool {
})
.unwrap_or(default)
}

/// Checks whether the environment variable `key` is set to `false` or `f` or `0`.
///
/// Returns `default` if the environment variable is not set, or contains invalid characters.
pub fn env_var_is_false_or(key: impl AsRef<OsStr>, default: bool) -> bool {
env::var(key)
.map(|value| {
["0", "f", "false"]
.iter()
.any(|&s| value.eq_ignore_ascii_case(s))
})
.unwrap_or(default)
}
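For reference, the semantics of these two helpers can be mirrored in a short Python sketch. The accepted spellings for the `true` variant (`1`, `t`, `true`) are assumed by symmetry with the `["0", "f", "false"]` list in the removed `env_var_is_false_or`, and `RW_DEMO_FLAG` is a hypothetical variable used only for demonstration:

```python
import os

# Illustrative Python mirror of the Rust env-var helpers.
# The "true" spellings are assumed by symmetry with the removed
# env_var_is_false_or; matching is case-insensitive, and an unset
# variable falls back to the given default.

def env_var_is_true_or(key: str, default: bool) -> bool:
    value = os.environ.get(key)
    if value is None:
        return default
    return value.lower() in ("1", "t", "true")

def env_var_is_false_or(key: str, default: bool) -> bool:
    value = os.environ.get(key)
    if value is None:
        return default
    return value.lower() in ("0", "f", "false")

# RW_DEMO_FLAG is a hypothetical variable, not one read by RisingWave.
os.environ["RW_DEMO_FLAG"] = "True"
print(env_var_is_true_or("RW_DEMO_FLAG", False))   # True
print(env_var_is_false_or("RW_DEMO_FLAG", False))  # False

del os.environ["RW_DEMO_FLAG"]
print(env_var_is_true_or("RW_DEMO_FLAG", True))    # True (falls back to default)
```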
2 changes: 1 addition & 1 deletion src/config/example.toml
@@ -232,7 +232,7 @@ identity_resolution_timeout_s = 5
[storage.object_store.s3.developer]
retry_unknown_service_error = false
retryable_service_error_codes = ["SlowDown", "TooManyRequests"]
use_opendal = false
use_opendal = true

[system]
barrier_interval_ms = 1000
1 change: 1 addition & 0 deletions src/expr/core/src/aggregate/def.rs
@@ -239,6 +239,7 @@ impl Display for AggKind {
}
}

/// `FromStr` for builtin aggregate functions.
impl FromStr for AggKind {
type Err = ();

2 changes: 1 addition & 1 deletion src/expr/core/src/window_function/kind.rs
@@ -19,7 +19,7 @@ use crate::aggregate::AggKind;
use crate::Result;

/// Kind of window functions.
#[derive(Debug, Display, FromStr, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Display, FromStr /* for builtin */, Clone, PartialEq, Eq, Hash)]
#[display(style = "snake_case")]
pub enum WindowFuncKind {
// General-purpose window functions.
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/output/agg.yaml
@@ -801,7 +801,7 @@
Failed to bind expression: abs(a) FILTER (WHERE a > 0)
Caused by:
Invalid input syntax: DISTINCT, ORDER BY or FILTER is only allowed in aggregation functions, but `abs` is not an aggregation function
Invalid input syntax: `FILTER` is not allowed in scalar/table function call
- name: prune column before filter
sql: |
create table t(v1 int, v2 int);
@@ -62,7 +62,7 @@
Failed to bind expression: lag(x)
Caused by:
Invalid input syntax: Window function `lag` must have OVER clause
function lag(integer) does not exist, do you mean log
- id: lag with empty over clause
sql: |
create table t(x int);