
Commit

Merge branch 'main' of https://github.com/risingwavelabs/risingwave into li0k/storage_adjust_base_compaction
Li0k committed Aug 15, 2024
2 parents 1a44234 + a1872f3 commit 52a3566
Showing 24 changed files with 457 additions and 79 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -56,7 +56,7 @@
RisingWave is a Postgres-compatible SQL engine engineered to provide the <i><b>simplest</b></i> and <i><b>most cost-efficient</b></i> approach for <b>processing</b>, <b>analyzing</b>, and <b>managing</b> real-time event streaming data.

-![RisingWave](https://github.com/risingwavelabs/risingwave/assets/41638002/10c44404-f78b-43ce-bbd9-3646690acc59)
+![RisingWave](./docs/dev/src/images/architecture_20240814.png)

## When to use RisingWave?
RisingWave can ingest millions of events per second, continuously join live data streams with historical tables, and serve ad-hoc queries in real-time. Typical use cases include, but are not limited to:
4 changes: 1 addition & 3 deletions ci/scripts/run-e2e-test.sh
@@ -90,9 +90,7 @@ echo "--- e2e, $mode, batch"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cluster_start
sqllogictest -p 4566 -d dev './e2e_test/ddl/**/*.slt' --junit "batch-ddl-${profile}" --label "can-use-recover"
if [[ "$mode" != "single-node" ]]; then
sqllogictest -p 4566 -d dev './e2e_test/background_ddl/basic.slt' --junit "batch-ddl-${profile}"
fi
sqllogictest -p 4566 -d dev './e2e_test/background_ddl/basic.slt' --junit "batch-ddl-${profile}"

if [[ $mode != "single-node" ]]; then
sqllogictest -p 4566 -d dev './e2e_test/visibility_mode/*.slt' --junit "batch-${profile}"
Binary file added docs/dev/src/images/architecture_20240814.png
195 changes: 195 additions & 0 deletions e2e_test/sink/license.slt
@@ -0,0 +1,195 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
ALTER SYSTEM SET license_key TO '';

statement ok
CREATE TABLE t (k INT);

statement error
CREATE SINK dynamodb_sink
FROM
t
WITH
(
connector = 'dynamodb',
table = 'xx',
primary_key = 'k',
region = 'xx',
access_key = 'xx',
secret_key = 'xx'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: Internal error
4: feature DynamoDbSink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement error
CREATE SINK snowflake_sink
FROM t
WITH (
connector = 'snowflake',
type = 'append-only',
force_append_only = 'true',
s3.bucket_name = 'xx',
s3.credentials.access = 'xx',
s3.credentials.secret = 'xx',
s3.region_name = 'xx',
s3.path = 'xx',
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: Internal error
4: feature SnowflakeSink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement error
CREATE SINK opensearch_sink
FROM t
WITH (
connector = 'opensearch',
url = 'xx',
username = 'xx',
password = 'xx',
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: feature OpenSearchSink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement error
CREATE SINK bigquery_sink
FROM
t
WITH
(
connector = 'bigquery',
type = 'append-only',
force_append_only='true',
bigquery.local.path= 'xx',
bigquery.project= 'xx',
bigquery.dataset= 'xx',
bigquery.table= 'xx'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: Internal error
4: feature BigQuerySink is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.


statement ok
ALTER SYSTEM SET license_key TO DEFAULT;

statement ok
flush;

statement error
CREATE SINK dynamodb_sink
FROM
t
WITH
(
connector = 'dynamodb',
table = 'xx',
primary_key = 'xx',
region = 'xx',
access_key = 'xx',
secret_key = 'xx'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Sink error
2: Sink primary key column not found: xx. Please use ',' as the delimiter for different primary key columns.


statement ok
CREATE SINK snowflake_sink
FROM t
WITH (
connector = 'snowflake',
type = 'append-only',
force_append_only = 'true',
s3.bucket_name = 'xx',
s3.credentials.access = 'xx',
s3.credentials.secret = 'xx',
s3.region_name = 'xx',
s3.path = 'xx',
);


statement error
CREATE SINK opensearch_sink
FROM t
WITH (
connector = 'opensearch',
url = 'xx',
username = 'xx',
password = 'xx',
index = 'xx',
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: sink cannot pass validation: INTERNAL: Connection is closed


statement error
CREATE SINK bigquery_sink
FROM
t
WITH
(
connector = 'bigquery',
type = 'append-only',
force_append_only='true',
bigquery.local.path= 'xx',
bigquery.project= 'xx',
bigquery.dataset= 'xx',
bigquery.table= 'xx'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: BigQuery error
4: No such file or directory (os error 2)


statement ok
DROP SINK snowflake_sink;

statement ok
DROP TABLE t;
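
Note the structure of this test: with license_key set to the empty string, all four paid-tier sinks fail at the license gate; after ALTER SYSTEM SET license_key TO DEFAULT (which in this test environment evidently resolves to a paid tier), the same statements pass the gate and fail, if at all, on ordinary connector validation instead. The gate itself is the validate() hunk added to big_query.rs and dynamodb.rs below; a minimal sketch of the pattern, with the surrounding impl and imports elided:

// Minimal sketch of the paid-tier gate exercised by license.slt,
// mirroring the validate() hunks added later in this commit.
// Feature::check_available() errors when the current license tier
// does not include the feature.
async fn validate(&self) -> Result<()> {
    risingwave_common::license::Feature::DynamoDbSink
        .check_available()
        .map_err(|e| anyhow::anyhow!(e))?;
    // connector-specific validation (client setup, schema checks)
    // continues after the license gate ...
    Ok(())
}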
@@ -0,0 +1,80 @@
# Single phase approx percentile
statement ok
create table t(p_col double, grp_col int);

statement ok
insert into t select a, 1 from generate_series(-1000, 1000) t(a);

statement ok
flush;

query I
select
percentile_cont(0.01) within group (order by p_col) as p01,
min(p_col),
percentile_cont(0.5) within group (order by p_col) as p50,
count(*),
percentile_cont(0.99) within group (order by p_col) as p99
from t;
----
-980 -1000 0 2001 980
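
These exact values follow from percentile_cont's linear interpolation over the N = 2001 sorted values -1000, ..., 1000; a worked check, assuming the standard PostgreSQL definition with zero-based row index i = p(N-1):

\[
i_{0.01} = 0.01 \times 2000 = 20 \Rightarrow p_{01} = -1000 + 20 = -980, \qquad
i_{0.5} = 1000 \Rightarrow p_{50} = 0, \qquad
i_{0.99} = 1980 \Rightarrow p_{99} = 980.
\]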

statement ok
create materialized view m1 as
select
approx_percentile(0.01, 0.01) within group (order by p_col) as p01,
min(p_col),
approx_percentile(0.5, 0.01) within group (order by p_col) as p50,
count(*),
approx_percentile(0.99, 0.01) within group (order by p_col) as p99
from t;

query I
select * from m1;
----
-982.5779489474152 -1000 0 2001 982.5779489474152
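
The second argument of approx_percentile is the relative error bound (0.01 here), so the result only needs to fall within 1% of the exact percentile. Reading the output against the percentile_cont values above (my interpretation of the guarantee, not a documented proof), the bound holds with room to spare:

\[
\frac{\lvert -982.578 - (-980) \rvert}{\lvert -980 \rvert} \approx 0.0026 \le 0.01.
\]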

# Test state encode / decode
onlyif can-use-recover
statement ok
recover;

onlyif can-use-recover
sleep 10s

query I
select * from m1;
----
-982.5779489474152 -1000 0 2001 982.5779489474152

# Test 0<x<1 values
statement ok
insert into t select 0.001, 1 from generate_series(1, 500);

statement ok
insert into t select 0.0001, 1 from generate_series(1, 501);

statement ok
flush;

query I
select * from m1;
----
-963.1209598593477 -1000 0.00009999833511933609 3002 963.1209598593477

query I
select
percentile_cont(0.01) within group (order by p_col) as p01,
min(p_col),
percentile_cont(0.5) within group (order by p_col) as p50,
count(*),
percentile_cont(0.99) within group (order by p_col) as p99
from t;
----
-969.99 -1000 0.0001 3002 969.9899999999998
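
A worked consistency check for the updated exact values, under the same interpolation convention as above: with N = 3002, the sorted data places -1000..-1 at positions 0..999, the single 0 at position 1000, and the 501 copies of 0.0001 at positions 1001..1501, so

\[
i_{0.5} = 0.5 \times 3001 = 1500.5 \in [1001, 1501] \Rightarrow p_{50} = 0.0001, \qquad
i_{0.01} = 30.01 \Rightarrow -970 + 0.01 = -969.99.
\]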

statement ok
drop materialized view m1;

statement ok
drop table t;
@@ -47,19 +47,6 @@ select * from m1;
----
-982.5779489474152 0 0 2001 982.5779489474152

-# Test state encode / decode
-onlyif can-use-recover
-statement ok
-recover;
-
-onlyif can-use-recover
-sleep 10s
-
-query I
-select * from m1;
-----
--982.5779489474152 0 0 2001 982.5779489474152
-
# Test 0<x<1 values
statement ok
insert into t select 0.001, 1 from generate_series(1, 500);
3 changes: 3 additions & 0 deletions e2e_test/streaming/union.slt
@@ -152,6 +152,9 @@ Caused by:
Invalid input syntax: When CORRESPONDING is specified, at least one column of the left side shall have a column name that is the column name of some column of the right side in a UNION operation. Left side query column list: ("v1", "v2", "v4"). Right side query column list: ("vxx").


+statement ok
+drop table txx;

statement ok
drop table t1;

Expand Down
4 changes: 2 additions & 2 deletions src/common/src/config.rs
@@ -1842,11 +1842,11 @@ pub mod default {
}

pub fn memory_controller_threshold_graceful() -> f64 {
-0.8
+0.81
}

pub fn memory_controller_threshold_stable() -> f64 {
-0.7
+0.72
}

pub fn memory_controller_eviction_factor_aggressive() -> f64 {
4 changes: 2 additions & 2 deletions src/config/example.toml
@@ -113,8 +113,8 @@ stream_exchange_concurrent_dispatchers = 0
stream_dml_channel_initial_permits = 32768
stream_hash_agg_max_dirty_groups_heap_size = 67108864
stream_memory_controller_threshold_aggressive = 0.9
-stream_memory_controller_threshold_graceful = 0.8
-stream_memory_controller_threshold_stable = 0.7
+stream_memory_controller_threshold_graceful = 0.81
+stream_memory_controller_threshold_stable = 0.72
stream_memory_controller_eviction_factor_aggressive = 2.0
stream_memory_controller_eviction_factor_graceful = 1.5
stream_memory_controller_eviction_factor_stable = 1.0
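
Both hunks above nudge the graceful and stable watermarks off round numbers (0.8 → 0.81, 0.7 → 0.72) while leaving the aggressive threshold at 0.9. A hypothetical sketch of how tiered watermarks like these could map a memory usage ratio to an eviction factor — the thresholds and factors are the defaults from this diff, but the control flow is illustrative, not RisingWave's actual memory controller:

// Hypothetical illustration only: maps a usage ratio to an eviction
// factor using the tiered defaults from this diff.
fn eviction_factor(usage_ratio: f64) -> Option<f64> {
    const AGGRESSIVE: f64 = 0.9;
    const GRACEFUL: f64 = 0.81; // was 0.8
    const STABLE: f64 = 0.72; // was 0.7

    if usage_ratio >= AGGRESSIVE {
        Some(2.0) // stream_memory_controller_eviction_factor_aggressive
    } else if usage_ratio >= GRACEFUL {
        Some(1.5) // stream_memory_controller_eviction_factor_graceful
    } else if usage_ratio >= STABLE {
        Some(1.0) // stream_memory_controller_eviction_factor_stable
    } else {
        None // below the stable watermark: no eviction pressure
    }
}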
3 changes: 3 additions & 0 deletions src/connector/src/sink/big_query.rs
@@ -358,6 +358,9 @@ impl Sink for BigQuerySink {
}

async fn validate(&self) -> Result<()> {
+risingwave_common::license::Feature::BigQuerySink
+    .check_available()
+    .map_err(|e| anyhow::anyhow!(e))?;
if !self.is_append_only && self.pk_indices.is_empty() {
return Err(SinkError::Config(anyhow!(
"Primary key not defined for upsert bigquery sink (please define in `primary_key` field)")));
3 changes: 3 additions & 0 deletions src/connector/src/sink/dynamodb.rs
@@ -88,6 +88,9 @@ impl Sink for DynamoDbSink {
const SINK_NAME: &'static str = DYNAMO_DB_SINK;

async fn validate(&self) -> Result<()> {
+risingwave_common::license::Feature::DynamoDbSink
+    .check_available()
+    .map_err(|e| anyhow::anyhow!(e))?;
let client = (self.config.build_client().await)
.context("validate DynamoDB sink error")
.map_err(SinkError::DynamoDb)?;
4 changes: 2 additions & 2 deletions src/connector/src/sink/elasticsearch.rs
@@ -24,7 +24,7 @@ use risingwave_common::types::{JsonbVal, Scalar, ToText};
use serde_json::Value;

use super::encoder::{JsonEncoder, RowEncoder};
-use super::remote::{ElasticSearchSink, OpensearchSink};
+use super::remote::{ElasticSearchSink, OpenSearchSink};
use crate::sink::{Result, Sink};
pub const ES_OPTION_DELIMITER: &str = "delimiter";
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
@@ -172,5 +172,5 @@ impl EsStreamChunkConverter {
}

pub fn is_es_sink(sink_name: &str) -> bool {
-sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME
+sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpenSearchSink::SINK_NAME
}
2 changes: 1 addition & 1 deletion src/connector/src/sink/mod.rs
@@ -94,7 +94,7 @@ macro_rules! for_all_sinks {
{ Nats, $crate::sink::nats::NatsSink },
{ Jdbc, $crate::sink::remote::JdbcSink },
{ ElasticSearch, $crate::sink::remote::ElasticSearchSink },
-{ Opensearch, $crate::sink::remote::OpensearchSink },
+{ Opensearch, $crate::sink::remote::OpenSearchSink },
{ Cassandra, $crate::sink::remote::CassandraSink },
{ HttpJava, $crate::sink::remote::HttpJavaSink },
{ Doris, $crate::sink::doris::DorisSink },
