Merge branch 'main' into kwannoel/migrate-backfill-test
kwannoel authored Oct 30, 2023
2 parents 8584b95 + e392db0 commit a6e07be
Showing 31 changed files with 885 additions and 447 deletions.
24 changes: 17 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default.

60 changes: 33 additions & 27 deletions README.md
@@ -54,52 +54,62 @@
</a>
</div>
RisingWave is a distributed SQL streaming database that enables <b>simple</b>, <b>efficient</b>, and <b>reliable</b> processing of streaming data.
RisingWave is a distributed SQL streaming database that enables <b>cost-efficient</b> and <b>reliable</b> processing of streaming data.

![RisingWave](https://github.com/risingwavelabs/risingwave-docs/blob/0f7e1302b22493ba3c1c48e78810750ce9a5ff42/docs/images/archi_simple.png)

## How to install
**Ubuntu**
```
wget https://github.com/risingwavelabs/risingwave/releases/download/v1.3.0/risingwave-v1.3.0-x86_64-unknown-linux.tar.gz
tar xvf risingwave-v1.3.0-x86_64-unknown-linux.tar.gz
./risingwave playground
```
## Try it out in 5 minutes
**Mac**
```
brew tap risingwavelabs/risingwave
brew install risingwave
risingwave playground
```
**Ubuntu**
```
wget https://github.com/risingwavelabs/risingwave/releases/download/v1.3.0/risingwave-v1.3.0-x86_64-unknown-linux.tar.gz
tar xvf risingwave-v1.3.0-x86_64-unknown-linux.tar.gz
./risingwave playground
```
Now connect to RisingWave using `psql`:
```
psql -h localhost -p 4566 -d dev -U root
```

Learn more at [Quick Start](https://docs.risingwave.com/docs/current/get-started/).

## Production deployments
For **single-node Docker deployments**, please refer to [Docker Compose](https://docs.risingwave.com/docs/current/risingwave-trial/?method=docker-compose).

For **Kubernetes deployments**, please refer to [Kubernetes with Helm](https://docs.risingwave.com/docs/current/risingwave-k8s-helm/) or [Kubernetes with Operator](https://docs.risingwave.com/docs/current/risingwave-kubernetes/).

**RisingWave Cloud** is the easiest way to run a fully-fledged RisingWave cluster. Try it out for free at: [cloud.risingwave.com](https://cloud.risingwave.com).


## Why RisingWave for stream processing?
RisingWave adeptly tackles some of the most challenging problems in stream processing. Compared to existing stream processing systems, RisingWave shines through with the following key features:
* **Easy to learn**
RisingWave adeptly addresses some of the most challenging problems in stream processing. Compared to existing stream processing systems like [Apache Flink](https://flink.apache.org/), [Apache Spark Streaming](https://spark.apache.org/docs/latest/streaming-programming-guide.html), and [KsqlDB](https://ksqldb.io/), RisingWave stands out in two primary dimensions: **ease of use** and **efficiency**, thanks to its **[PostgreSQL](https://www.postgresql.org/)-style interaction experience** and **[Snowflake](https://snowflake.com/)-like architectural design** (i.e., compute-storage decoupling).
### Ease-of-use
* **Simple to learn**
* RisingWave speaks PostgreSQL-style SQL, enabling users to dive into stream processing in much the same way as operating a PostgreSQL database.
* **Highly efficient in multi-stream joins**
* RisingWave has made significant optimizations for multiple stream join scenarios. Users can easily join 10-20 streams (or more) efficiently in a production environment.
* **Simple to verify correctness**
* RisingWave persists results in materialized views and allows users to break down complex stream computation programs into stacked materialized views, simplifying program development and result verification.
* **Simple to maintain and operate**
* RisingWave abstracts away unnecessary low-level details, allowing users to concentrate solely on SQL code-level issues.
* **Simple to integrate**
* With integrations to a diverse range of cloud systems and the PostgreSQL ecosystem, RisingWave boasts a rich and expansive ecosystem, making it straightforward to incorporate into existing infrastructures.

### Efficiency
* **High resource utilization**
* Queries in RisingWave leverage shared computational resources, eliminating the need for users to manually allocate resources for each query.
* **No compromise on large state management**
* The decoupled compute-storage architecture of RisingWave ensures remote persistence of internal states, and users never need to worry about the size of internal states when handling complex queries.
* **Highly efficient in multi-stream joins**
* RisingWave has made significant optimizations for multiple stream join scenarios. Users can easily join 10-20 streams (or more) efficiently in a production environment.
* **Transparent dynamic scaling**
* RisingWave supports near-instantaneous dynamic scaling without any service interruptions.
* **Instant failure recovery**
* RisingWave's state management mechanism allows it to recover from failure in seconds, not minutes or hours.
* **Easy to verify correctness**
* RisingWave persists results in materialized views and allows users to break down complex stream computation programs into stacked materialized views, simplifying program development and result verification.
* **Simplified data stack**
* RisingWave's ability to store data and serve queries eliminates the need for separate maintenance of stream processors and databases. Users can effortlessly connect RisingWave to their preferred BI tools or through client libraries.
* **Simple to maintain and operate**
* RisingWave abstracts away unnecessary low-level details, allowing users to concentrate solely on SQL code-level issues.
* **Rich ecosystem**
* With integrations to a diverse range of cloud systems and the PostgreSQL ecosystem, RisingWave boasts a rich and expansive ecosystem.

## RisingWave's limitations
RisingWave isn’t a panacea for all data engineering hurdles. It has its own set of limitations:
@@ -111,21 +121,17 @@
* RisingWave's row store design is tailored for optimal stream processing performance rather than interactive analytical workloads. Hence, it's not a suitable replacement for OLAP databases. Yet, a reliable integration with many OLAP databases exists, and a collaborative use of RisingWave and OLAP databases is a common practice among many users.


## RisingWave Cloud

RisingWave Cloud is a fully-managed and scalable stream processing platform powered by the open-source RisingWave project. Try it out for free at: [cloud.risingwave.com](https://cloud.risingwave.com).

## Notes on telemetry

RisingWave collects anonymous usage statistics to better understand how the community is using RisingWave. The sole intention of this exercise is to help improve the product. Users may opt out easily at any time. Please refer to the [user documentation](https://docs.risingwave.com/docs/current/telemetry/) for more details.

## In-production use cases
Like other stream processing systems, the primary use cases of RisingWave include monitoring, alerting, real-time dashboard reporting, streaming ETL (Extract, Transform, Load), machine learning feature engineering, and more. It has already been adopted in fields such as financial trading, manufacturing, new media, logistics, gaming, and more. Check out [customer stories](https://www.risingwave.com/use-cases/).

## Community

Looking for help, discussions, collaboration opportunities, or a casual afternoon chat with our fellow engineers and community members? Join our [Slack workspace](https://risingwave.com/slack)!

## Notes on telemetry

RisingWave collects anonymous usage statistics to better understand how the community is using RisingWave. The sole intention of this exercise is to help improve the product. Users may opt out easily at any time. Please refer to the [user documentation](https://docs.risingwave.com/docs/current/telemetry/) for more details.

## License

RisingWave is distributed under the Apache License (Version 2.0). Please refer to [LICENSE](LICENSE) for more information.
6 changes: 6 additions & 0 deletions e2e_test/batch/types/jsonb.slt.part
@@ -33,6 +33,12 @@
null
true
NULL

# typed string
query TT
select jsonb 'true', JsonB '{}';
----
true {}

query T
select 'true'::jsonb::bool;
----
20 changes: 17 additions & 3 deletions proto/expr.proto
@@ -212,16 +212,30 @@ message ExprNode {

// Jsonb functions

// jsonb -> int, jsonb -> text, jsonb #> text[] that returns jsonb
JSONB_ACCESS_INNER = 600;
// jsonb ->> int, jsonb ->> text, jsonb #>> text[] that returns text
// jsonb -> int, jsonb -> text that returns jsonb
JSONB_ACCESS = 600;
// jsonb ->> int, jsonb ->> text that returns text
JSONB_ACCESS_STR = 601;
// jsonb #> text[] -> jsonb
JSONB_ACCESS_MULTI = 613;
// jsonb #>> text[] -> text
JSONB_ACCESS_MULTI_STR = 614;
JSONB_TYPEOF = 602;
JSONB_ARRAY_LENGTH = 603;
IS_JSON = 604;
JSONB_CAT = 605;
JSONB_OBJECT = 606;
JSONB_PRETTY = 607;
// jsonb @> jsonb
JSONB_CONTAINS = 608;
// jsonb <@ jsonb
JSONB_CONTAINED = 609;
// jsonb ? text
JSONB_EXISTS = 610;
// jsonb ?| text[]
JSONB_EXISTS_ANY = 611;
// jsonb ?& text[]
JSONB_EXISTS_ALL = 612;

// Non-pure functions below (> 1000)
// ------------------------
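The renumbering above splits single-key access (`->`, `->>`) from path access (`#>`, `#>>`), one function per PostgreSQL operator. A standalone Rust sketch of that operator-to-function mapping (the enum mirrors the proto variants; the dispatch function is illustrative, not the actual frontend binder):

```rust
/// Variant names mirror the proto enum above (illustrative only).
#[derive(Debug, PartialEq)]
enum JsonbAccessFn {
    Access,         // jsonb -> int, jsonb -> text      => jsonb
    AccessStr,      // jsonb ->> int, jsonb ->> text    => text
    AccessMulti,    // jsonb #> text[]                  => jsonb
    AccessMultiStr, // jsonb #>> text[]                 => text
}

/// Map an access operator token to its function; other jsonb
/// operators (`@>`, `?`, ...) are handled elsewhere, so return None.
fn resolve(op: &str) -> Option<JsonbAccessFn> {
    Some(match op {
        "->" => JsonbAccessFn::Access,
        "->>" => JsonbAccessFn::AccessStr,
        "#>" => JsonbAccessFn::AccessMulti,
        "#>>" => JsonbAccessFn::AccessMultiStr,
        _ => return None,
    })
}

fn main() {
    assert_eq!(resolve("#>"), Some(JsonbAccessFn::AccessMulti));
    assert_eq!(resolve("->>"), Some(JsonbAccessFn::AccessStr));
    assert_eq!(resolve("@>"), None);
}
```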
10 changes: 10 additions & 0 deletions src/common/src/types/jsonb.rs
@@ -293,6 +293,16 @@ impl<'a> JsonbRef<'a> {
self.0.as_null().is_some()
}

/// Returns true if this is a jsonb array.
pub fn is_array(&self) -> bool {
matches!(self.0, ValueRef::Array(_))
}

/// Returns true if this is a jsonb object.
pub fn is_object(&self) -> bool {
matches!(self.0, ValueRef::Object(_))
}

/// Returns the type name of this jsonb.
///
/// Possible values are: `null`, `boolean`, `number`, `string`, `array`, `object`.
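The two new predicates simply pattern-match the underlying value. A self-contained sketch using a stand-in `ValueRef` (the real type is the jsonb value representation in RisingWave, not this mock enum):

```rust
// Stand-in for the real jsonb value type, for illustration only.
#[derive(Debug)]
enum ValueRef<'a> {
    Null,
    Bool(bool),
    Str(&'a str),
    Array(Vec<ValueRef<'a>>),
    Object(Vec<(&'a str, ValueRef<'a>)>),
}

struct JsonbRef<'a>(ValueRef<'a>);

impl<'a> JsonbRef<'a> {
    /// Returns true if this is a jsonb array.
    fn is_array(&self) -> bool {
        matches!(self.0, ValueRef::Array(_))
    }

    /// Returns true if this is a jsonb object.
    fn is_object(&self) -> bool {
        matches!(self.0, ValueRef::Object(_))
    }
}

fn main() {
    let arr = JsonbRef(ValueRef::Array(vec![ValueRef::Bool(true)]));
    let obj = JsonbRef(ValueRef::Object(vec![("k", ValueRef::Str("v"))]));
    assert!(arr.is_array() && !arr.is_object());
    assert!(obj.is_object() && !obj.is_array());
}
```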
4 changes: 2 additions & 2 deletions src/connector/src/source/pulsar/mod.rs
@@ -49,8 +49,8 @@ pub struct PulsarProperties {
pub common: PulsarCommon,

#[serde(rename = "iceberg.enabled")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub iceberg_loader_enabled: bool,
#[serde_as(as = "DisplayFromStr")]
pub iceberg_loader_enabled: Option<bool>,

#[serde(rename = "iceberg.bucket", default)]
pub iceberg_bucket: Option<String>,
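The fix moves the `Option` out of the serde_with adapter and into the field type, so an absent `iceberg.enabled` key deserializes to `None` while a present key is still parsed from its string form. A stdlib-only sketch of that semantics (the map-based parsing stands in for serde_with; the function name is illustrative):

```rust
use std::collections::HashMap;

/// Parse "iceberg.enabled" from string-typed WITH options:
/// absent key => Ok(None); present key => must parse as bool.
fn parse_iceberg_enabled(props: &HashMap<&str, &str>) -> Result<Option<bool>, String> {
    match props.get("iceberg.enabled") {
        None => Ok(None), // key omitted: field stays None
        Some(s) => s
            .parse::<bool>()
            .map(Some)
            .map_err(|e| format!("iceberg.enabled: {e}")),
    }
}

fn main() {
    let mut props = HashMap::new();
    assert_eq!(parse_iceberg_enabled(&props), Ok(None));
    props.insert("iceberg.enabled", "true");
    assert_eq!(parse_iceberg_enabled(&props), Ok(Some(true)));
}
```

On the read side, the `reader.rs` change below treats the absent key as disabled via `iceberg_loader_enabled.unwrap_or(false)`.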
2 changes: 1 addition & 1 deletion src/connector/src/source/pulsar/source/reader.rs
@@ -65,7 +65,7 @@ impl SplitReader for PulsarSplitReader {

tracing::debug!("creating consumer for pulsar split topic {}", topic,);

if props.iceberg_loader_enabled
if props.iceberg_loader_enabled.unwrap_or(false)
&& matches!(split.start_offset, PulsarEnumeratorOffset::Earliest)
&& !topic.starts_with("non-persistent://")
{
14 changes: 9 additions & 5 deletions src/expr/impl/src/scalar/cast.rs
@@ -37,14 +37,14 @@ use risingwave_pb::expr::expr_node::PbType;
#[function("cast(varchar) -> timestamp")]
#[function("cast(varchar) -> interval")]
#[function("cast(varchar) -> jsonb")]
pub fn str_parse<T>(elem: &str) -> Result<T>
pub fn str_parse<T>(elem: &str, ctx: &Context) -> Result<T>
where
T: FromStr,
<T as FromStr>::Err: std::fmt::Display,
{
elem.trim()
.parse()
.map_err(|err: <T as FromStr>::Err| ExprError::Parse(err.to_string().into()))
elem.trim().parse().map_err(|err: <T as FromStr>::Err| {
ExprError::Parse(format!("{} {}", ctx.return_type, err).into())
})
}

// TODO: introduce `FromBinary` and support all types
@@ -521,7 +521,11 @@ mod tests {
async fn test_unary() {
test_unary_bool::<BoolArray, _>(|x| !x, PbType::Not).await;
test_unary_date::<TimestampArray, _>(|x| try_cast(x).unwrap(), PbType::Cast).await;
test_str_to_int16::<I16Array, _>(|x| str_parse(x).unwrap()).await;
let ctx_str_to_int16 = Context {
arg_types: vec![DataType::Varchar],
return_type: DataType::Int16,
};
test_str_to_int16::<I16Array, _>(|x| str_parse(x, &ctx_str_to_int16).unwrap()).await;
}

#[tokio::test]
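The reworked `str_parse` threads the expression `Context` through so parse failures name the expected type. A minimal standalone sketch (a `&str` type name stands in for `ctx.return_type`, which is a `DataType` in the real code, and a plain `String` error stands in for `ExprError::Parse`):

```rust
use std::fmt::Display;
use std::str::FromStr;

/// Parse a trimmed string into T, prefixing any error with the
/// expected return type, as in the cast.rs change above.
fn str_parse<T>(elem: &str, ret_type: &str) -> Result<T, String>
where
    T: FromStr,
    <T as FromStr>::Err: Display,
{
    elem.trim()
        .parse()
        .map_err(|err: <T as FromStr>::Err| format!("{} {}", ret_type, err))
}

fn main() {
    assert_eq!(str_parse::<i16>(" 42 ", "smallint"), Ok(42));
    let err = str_parse::<i16>("abc", "smallint").unwrap_err();
    assert!(err.starts_with("smallint "));
}
```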
