Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into peng/remove-pu-all
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Jun 26, 2024
2 parents 7701da4 + 7cbb3a9 commit d5e05bb
Show file tree
Hide file tree
Showing 25 changed files with 350 additions and 389 deletions.
6 changes: 3 additions & 3 deletions ci/scripts/e2e-test-parallel-for-opendal.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ host_args=(-h localhost -p 4565 -h localhost -p 4566 -h localhost -p 4567)
echo "--- e2e, ci-3cn-3fe-opendal-fs-backend, streaming"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-3cn-3fe-opendal-fs-backend
sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-opendal-fs-backend-${profile}"
sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-opendal-fs-backend-${profile}" --label "parallel"

echo "--- Kill cluster Streaming"
risedev ci-kill
Expand All @@ -41,8 +41,8 @@ rm -rf /tmp/rw_ci
echo "--- e2e, ci-3cn-3fe-opendal-fs-backend, batch"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-3cn-3fe-opendal-fs-backend
sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-opendal-fs-backend-ddl-${profile}"
sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-opendal-fs-backend-batch-${profile}"
sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-opendal-fs-backend-ddl-${profile}" --label "parallel"
sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-opendal-fs-backend-batch-${profile}" --label "parallel"

echo "--- Kill cluster Batch"
risedev ci-kill
Expand Down
6 changes: 3 additions & 3 deletions ci/scripts/e2e-test-parallel-in-memory.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ host_args=(-h localhost -p 4565 -h localhost -p 4566 -h localhost -p 4567)
echo "--- e2e, ci-3cn-3fe-in-memory, streaming"
risedev ci-start ci-3cn-3fe-in-memory
sqllogictest --version
sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-in-memory-streaming-${profile}" --label in-memory
sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-in-memory-streaming-${profile}" --label "in-memory" --label "parallel"

echo "--- Kill cluster"
risedev ci-kill

echo "--- e2e, ci-3cn-3fe-in-memory, batch"
risedev ci-start ci-3cn-3fe-in-memory
sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-in-memory-batch-ddl-${profile}" --label in-memory
sqllogictest "${host_args[@]}" -d dev './e2e_test/batch/**/*.slt' -j 16 --junit "parallel-in-memory-batch-${profile}" --label in-memory
sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-in-memory-batch-ddl-${profile}" --label "in-memory" --label "parallel"
sqllogictest "${host_args[@]}" -d dev './e2e_test/batch/**/*.slt' -j 16 --junit "parallel-in-memory-batch-${profile}" --label "in-memory" --label "parallel"

echo "--- Kill cluster"
risedev ci-kill
3 changes: 3 additions & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=i
cluster_start
# Please make sure the regression is expected before increasing the timeout.
sqllogictest -p 4566 -d dev './e2e_test/streaming/**/*.slt' --junit "streaming-${profile}"
if [[ "$mode" != "single-node" ]]; then
sqllogictest -p 4566 -d dev './e2e_test/streaming_now/**/*.slt' --junit "streaming-${profile}"
fi
sqllogictest -p 4566 -d dev './e2e_test/backfill/sink/different_pk_and_dist_key.slt'

echo "--- Kill cluster"
Expand Down
38 changes: 15 additions & 23 deletions e2e_test/source_inline/kafka/include_key_as.slt
Original file line number Diff line number Diff line change
Expand Up @@ -184,23 +184,11 @@ rpk topic create 'test_additional_columns'

# Note: `sh` doesn't have {..} brace expansion
system ok
bash -c 'for i in {0..100}; do echo "key\$i:{\\"a\\": \$i}" | rpk topic produce test_additional_columns -f "%k:%v\\n" -H "header1=v1" -H "header2=v2"; done'
bash -c 'for i in {0..10}; do echo "key\$i:{\\"a\\": \$i}" | rpk topic produce test_additional_columns -f "%k:%v\\n" -H "header1=v1" -H "header2=v2"; done'

statement ok
create table additional_columns (a int)
include key as key_col
include partition as partition_col
include offset as offset_col
include timestamp as timestamp_col
include header as header_col
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_additional_columns')
FORMAT PLAIN ENCODE JSON

# header with varchar type & non-exist header key
statement error
create table additional_columns_1 (a int)
create table additional_columns (a int)
include key as key_col
include partition as partition_col
include offset as offset_col
Expand All @@ -220,13 +208,13 @@ Caused by:
Protocol error: Only header column can have inner field, but got "timestamp"


# header with varchar type & non-exist header key
statement ok
create table additional_columns_1 (a int)
create table additional_columns (a int)
include key as key_col
include partition as partition_col
include offset as offset_col
include timestamp as timestamp_col
include header as header_col_combined
include header 'header1' as header_col_1
include header 'header2' as header_col_2
include header 'header2' varchar as header_col_3
Expand All @@ -236,7 +224,6 @@ WITH (
topic = 'test_additional_columns')
FORMAT PLAIN ENCODE JSON


# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 3s

Expand Down Expand Up @@ -284,19 +271,21 @@ WHERE key_col IS NOT NULL
AND partition_col IS NOT NULL
AND offset_col IS NOT NULL
AND timestamp_col IS NOT NULL
AND header_col IS NOT NULL
AND header_col_combined IS NOT NULL
----
101
11


query ??
SELECT (header_col[1]).key AS key, (header_col[1]).value::text AS value
FROM additional_columns limit 1;
WITH arr AS (SELECT header_col_combined FROM additional_columns),
unnested AS (SELECT unnest(header_col_combined) FROM arr)
select *, count(*) from unnested group by 1 order by 1;
----
header1 \x7631
(header1,"\\x7631") 11
(header2,"\\x7632") 11

query ????
select header_col_1, header_col_2, header_col_3, header_col_4 from additional_columns_1 limit 1
select header_col_1, header_col_2, header_col_3, header_col_4 from additional_columns limit 1
----
\x7631 \x7632 v2 NULL

Expand All @@ -305,3 +294,6 @@ drop table upsert_students

statement ok
drop table plain_students

statement ok
drop table additional_columns
48 changes: 0 additions & 48 deletions e2e_test/streaming/now.slt

This file was deleted.

1 change: 1 addition & 0 deletions e2e_test/streaming_now/README
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Because currently the e2e test for `generate_series(..., now(), ...)` relies on external command `./risedev psql`, it cannot be tested in parallel, madsim and single-node mode. Let's just place it in this special folder and manually run it in `run-e2e-test.sh` to avoid complexity. Later if we introduce a new `now()`-like function that returns the time of statement execution, we'll be able to get rid of the external command and direcly create MV instead. We will move this to `streaming` folder at that time.
30 changes: 30 additions & 0 deletions e2e_test/streaming_now/now.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
system ok
./risedev psql -c "
create materialized view mv as
select * from generate_series(
to_timestamp($(date +%s)) - interval '10 second',
now(),
interval '1 second'
);
"

statement ok
flush;

query I
select count(*) >= 10 from mv;
----
t

sleep 2s

statement ok
flush;

query I
select count(*) >= 12 from mv;
----
t

statement ok
drop materialized view mv;
18 changes: 12 additions & 6 deletions grafana/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# RisingWave Grafana Dashboard

The Grafana dashboard is generated with grafanalib. You'll need
The Grafana dashboard is generated with `grafanalib`. You'll need

- Python
- grafanalib
Expand Down Expand Up @@ -29,18 +29,25 @@ And don't forget to include the generated `risingwave-<xxx>-dashboard.json` in t
./update.sh
```

## Advanced Usage
## Multi-cluster Deployment

We can specify the source uid, dashboard uid, dashboard version, enable namespace filter and enable risingwave_name filter(used in multi-cluster deployment) via env variables.
The `generate.sh` supports multi-cluster deployment. The following environment variables are helpful:

For example, we can use the following query to generate dashboard json used in our benchmark cluster:
- `DASHBOARD_NAMESPACE_FILTER_ENABLED`: When set to `true`, a drop-down list will be added to the Grafana dashboard, and all Prometheus queries will be filtered by the selected namespace.
- `DASHBOARD_RISINGWAVE_NAME_FILTER_ENABLED`: When set to `true`, a drop-down list will be added to the Grafana dashboard, and all Prometheus queries will be filtered by the selected RisingWave name. This is useful when you have multiple RisingWave instances in the same namespace.
- `DASHBOARD_SOURCE_UID`: Set to the UID of your Prometheus source.
- `DASHBOARD_DYNAMIC_SOURCE`: Alternative to `DASHBOARD_SOURCE_UID`. When set to `true`, a drop-down list will be added to the Grafana dashboard to pick any one of the Prometheus sources.
- `DASHBOARD_UID`: Set to the UID of your Grafana dashboard.

See more details in the `common.py` file.

Examples:

```bash
DASHBOARD_NAMESPACE_FILTER_ENABLED=true \
DASHBOARD_RISINGWAVE_NAME_FILTER_ENABLED=true \
DASHBOARD_SOURCE_UID=<source_uid> \
DASHBOARD_UID=<dashboard_uid> \
DASHBOARD_VERSION=<version> \
./generate.sh
```

Expand All @@ -51,6 +58,5 @@ DASHBOARD_NAMESPACE_FILTER_ENABLED=true \
DASHBOARD_RISINGWAVE_NAME_FILTER_ENABLED=true \
DASHBOARD_DYNAMIC_SOURCE=true \
DASHBOARD_UID=<dashboard_uid> \
DASHBOARD_VERSION=<version> \
./generate.sh
```
1 change: 0 additions & 1 deletion src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ impl MockDatagenSource {
.unwrap();
let parser_config = ParserConfig {
specific: SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Native,
protocol_config: ProtocolProperties::Native,
},
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/parser/bytes_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ mod tests {
async fn test_bytes_parser(get_payload: fn() -> Vec<Vec<u8>>) {
let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())];
let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Bytes(BytesProperties { column_name: None }),
protocol_config: ProtocolProperties::Plain,
};
Expand Down
8 changes: 2 additions & 6 deletions src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use risingwave_common::bail;
use super::simd_json_parser::DebeziumJsonAccessBuilder;
use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig};
use crate::error::ConnectorResult;
use crate::extract_key_config;
use crate::parser::unified::debezium::DebeziumChangeEvent;
use crate::parser::unified::json::TimestamptzHandling;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
Expand Down Expand Up @@ -89,8 +88,8 @@ impl DebeziumParser {
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
) -> ConnectorResult<Self> {
let (key_config, key_type) = extract_key_config!(props);
let key_builder = build_accessor_builder(key_config, key_type).await?;
let key_builder =
build_accessor_builder(props.encoding_config.clone(), EncodingType::Key).await?;
let payload_builder =
build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
let debezium_props = if let ProtocolProperties::Debezium(props) = props.protocol_config {
Expand All @@ -114,7 +113,6 @@ impl DebeziumParser {
use crate::parser::JsonProperties;

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down Expand Up @@ -225,7 +223,6 @@ mod tests {
.collect::<Vec<_>>();

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down Expand Up @@ -298,7 +295,6 @@ mod tests {
.collect::<Vec<_>>();

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ mod tests {

async fn build_parser(rw_columns: Vec<SourceColumnDesc>) -> DebeziumParser {
let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/parser/maxwell/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ mod tests {
];

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down
3 changes: 0 additions & 3 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1049,15 +1049,13 @@ pub struct CommonParserConfig {

#[derive(Debug, Clone, Default)]
pub struct SpecificParserConfig {
pub key_encoding_config: Option<EncodingProperties>,
pub encoding_config: EncodingProperties,
pub protocol_config: ProtocolProperties,
}

impl SpecificParserConfig {
// for test only
pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down Expand Up @@ -1278,7 +1276,6 @@ impl SpecificParserConfig {
}
};
Ok(Self {
key_encoding_config: None,
encoding_config,
protocol_config,
})
Expand Down
16 changes: 0 additions & 16 deletions src/connector/src/parser/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,6 @@ macro_rules! only_parse_payload {
};
}

// Extract encoding config and encoding type from ParserProperties
// for message key.
//
// Suppose (A, B) is the combination of key/payload combination:
// For (None, B), key should be the the key setting from B
// For (A, B), key should be the value setting from A
#[macro_export]
macro_rules! extract_key_config {
($props:ident) => {
match $props.key_encoding_config {
Some(config) => (config, EncodingType::Value),
None => ($props.encoding_config.clone(), EncodingType::Key),
}
};
}

/// Load raw bytes from:
/// * local file, for on-premise or testing.
/// * http/https, for common usage.
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/source/datagen/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ mod tests {
use_schema_registry: false,
timestamptz_handling: None,
}),
key_encoding_config: None,
},
data_types,
rows_per_second,
Expand Down
Loading

0 comments on commit d5e05bb

Please sign in to comment.