Merge branch 'yiming/remove-sync-finish-event' into yiming/dag-uploader
wenym1 committed Jun 26, 2024
2 parents eaac3f9 + 00e9d00 · commit b0aad6d
Showing 52 changed files with 1,005 additions and 291 deletions.
6 changes: 3 additions & 3 deletions ci/scripts/e2e-test-parallel-for-opendal.sh
@@ -31,7 +31,7 @@ host_args=(-h localhost -p 4565 -h localhost -p 4566 -h localhost -p 4567)
echo "--- e2e, ci-3cn-3fe-opendal-fs-backend, streaming"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-3cn-3fe-opendal-fs-backend
sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-opendal-fs-backend-${profile}"
sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-opendal-fs-backend-${profile}" --label "parallel"

echo "--- Kill cluster Streaming"
risedev ci-kill
@@ -41,8 +41,8 @@ rm -rf /tmp/rw_ci
echo "--- e2e, ci-3cn-3fe-opendal-fs-backend, batch"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-3cn-3fe-opendal-fs-backend
sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-opendal-fs-backend-ddl-${profile}"
sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-opendal-fs-backend-batch-${profile}"
sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-opendal-fs-backend-ddl-${profile}" --label "parallel"
sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-opendal-fs-backend-batch-${profile}" --label "parallel"

echo "--- Kill cluster Batch"
risedev ci-kill
6 changes: 3 additions & 3 deletions ci/scripts/e2e-test-parallel-in-memory.sh
@@ -28,15 +28,15 @@ host_args=(-h localhost -p 4565 -h localhost -p 4566 -h localhost -p 4567)
echo "--- e2e, ci-3cn-3fe-in-memory, streaming"
risedev ci-start ci-3cn-3fe-in-memory
sqllogictest --version
sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-in-memory-streaming-${profile}" --label in-memory
sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-in-memory-streaming-${profile}" --label "in-memory" --label "parallel"

echo "--- Kill cluster"
risedev ci-kill

echo "--- e2e, ci-3cn-3fe-in-memory, batch"
risedev ci-start ci-3cn-3fe-in-memory
sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-in-memory-batch-ddl-${profile}" --label in-memory
sqllogictest "${host_args[@]}" -d dev './e2e_test/batch/**/*.slt' -j 16 --junit "parallel-in-memory-batch-${profile}" --label in-memory
sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-in-memory-batch-ddl-${profile}" --label "in-memory" --label "parallel"
sqllogictest "${host_args[@]}" -d dev './e2e_test/batch/**/*.slt' -j 16 --junit "parallel-in-memory-batch-${profile}" --label "in-memory" --label "parallel"

echo "--- Kill cluster"
risedev ci-kill
8 changes: 4 additions & 4 deletions ci/scripts/e2e-test-parallel.sh
@@ -38,21 +38,21 @@ RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=i
echo "--- e2e, ci-3streaming-2serving-3fe, streaming"
RUST_LOG=$RUST_LOG \
risedev ci-start ci-3streaming-2serving-3fe
sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}"
sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}" --label "parallel"

kill_cluster

echo "--- e2e, ci-3streaming-2serving-3fe, batch"
RUST_LOG=$RUST_LOG \
risedev ci-start ci-3streaming-2serving-3fe
sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}"
sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}"
sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}" --label "parallel"
sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" --label "parallel"

kill_cluster

echo "--- e2e, ci-3streaming-2serving-3fe, generated"
RUST_LOG=$RUST_LOG \
risedev ci-start ci-3streaming-2serving-3fe
sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}"
sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}" --label "parallel"

kill_cluster
3 changes: 3 additions & 0 deletions ci/scripts/run-e2e-test.sh
@@ -80,6 +80,9 @@ RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=i
cluster_start
# Please make sure the regression is expected before increasing the timeout.
sqllogictest -p 4566 -d dev './e2e_test/streaming/**/*.slt' --junit "streaming-${profile}"
if [[ "$mode" != "single-node" ]]; then
sqllogictest -p 4566 -d dev './e2e_test/streaming_now/**/*.slt' --junit "streaming-${profile}"
fi
sqllogictest -p 4566 -d dev './e2e_test/backfill/sink/different_pk_and_dist_key.slt'

echo "--- Kill cluster"
29 changes: 29 additions & 0 deletions e2e_test/batch/subquery/lateral_subquery.slt.part
@@ -82,3 +82,32 @@ drop table all_sales;
statement ok
drop table salesperson;

statement ok
CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int);

statement ok
INSERT INTO r VALUES
('2024-06-20T19:00:22Z'::TIMESTAMPTZ, 2, 2),
('2024-06-20T19:00:22Z'::TIMESTAMPTZ, 1, 3),
('2024-06-20T19:00:23Z'::TIMESTAMPTZ, 1, 2),
('2024-06-20T19:00:24Z'::TIMESTAMPTZ, 2, 1),
('2024-06-20T19:00:24Z'::TIMESTAMPTZ, 1, 2),
('2024-06-20T19:00:25Z'::TIMESTAMPTZ, 2, 1);

query TII rowsort
SELECT e.ts AS e_ts, d.*
FROM (
SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e
JOIN LATERAL
(
SELECT DISTINCT ON(src_id, dev_id) *
FROM r
WHERE r.src_id = e.src_id AND r.ts <= e.ts
ORDER BY src_id, dev_id, ts DESC
)d on true;
----
2024-06-20 19:01:00+00:00 2024-06-20 19:00:22+00:00 1 3
2024-06-20 19:01:00+00:00 2024-06-20 19:00:24+00:00 1 2

statement ok
DROP TABLE r CASCADE;
38 changes: 15 additions & 23 deletions e2e_test/source_inline/kafka/include_key_as.slt
@@ -184,23 +184,11 @@ rpk topic create 'test_additional_columns'

# Note: `sh` doesn't have {..} brace expansion
system ok
bash -c 'for i in {0..100}; do echo "key\$i:{\\"a\\": \$i}" | rpk topic produce test_additional_columns -f "%k:%v\\n" -H "header1=v1" -H "header2=v2"; done'
bash -c 'for i in {0..10}; do echo "key\$i:{\\"a\\": \$i}" | rpk topic produce test_additional_columns -f "%k:%v\\n" -H "header1=v1" -H "header2=v2"; done'

statement ok
create table additional_columns (a int)
include key as key_col
include partition as partition_col
include offset as offset_col
include timestamp as timestamp_col
include header as header_col
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_additional_columns')
FORMAT PLAIN ENCODE JSON

# header with varchar type & non-exist header key
statement error
create table additional_columns_1 (a int)
create table additional_columns (a int)
include key as key_col
include partition as partition_col
include offset as offset_col
@@ -220,13 +208,13 @@
Protocol error: Only header column can have inner field, but got "timestamp"


# header with varchar type & non-exist header key
statement ok
create table additional_columns_1 (a int)
create table additional_columns (a int)
include key as key_col
include partition as partition_col
include offset as offset_col
include timestamp as timestamp_col
include header as header_col_combined
include header 'header1' as header_col_1
include header 'header2' as header_col_2
include header 'header2' varchar as header_col_3
@@ -236,7 +224,6 @@ WITH (
topic = 'test_additional_columns')
FORMAT PLAIN ENCODE JSON


# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 3s

@@ -284,19 +271,21 @@ WHERE key_col IS NOT NULL
AND partition_col IS NOT NULL
AND offset_col IS NOT NULL
AND timestamp_col IS NOT NULL
AND header_col IS NOT NULL
AND header_col_combined IS NOT NULL
----
101
11


query ??
SELECT (header_col[1]).key AS key, (header_col[1]).value::text AS value
FROM additional_columns limit 1;
WITH arr AS (SELECT header_col_combined FROM additional_columns),
unnested AS (SELECT unnest(header_col_combined) FROM arr)
select *, count(*) from unnested group by 1 order by 1;
----
header1 \x7631
(header1,"\\x7631") 11
(header2,"\\x7632") 11

query ????
select header_col_1, header_col_2, header_col_3, header_col_4 from additional_columns_1 limit 1
select header_col_1, header_col_2, header_col_3, header_col_4 from additional_columns limit 1
----
\x7631 \x7632 v2 NULL

@@ -305,3 +294,6 @@ drop table upsert_students

statement ok
drop table plain_students

statement ok
drop table additional_columns
1 change: 1 addition & 0 deletions e2e_test/streaming_now/README
@@ -0,0 +1 @@
Because the e2e test for `generate_series(..., now(), ...)` currently relies on the external command `./risedev psql`, it cannot be tested in parallel, madsim, or single-node mode. Let's just place it in this special folder and manually run it in `run-e2e-test.sh` to avoid complexity. Later, if we introduce a new `now()`-like function that returns the time of statement execution, we'll be able to get rid of the external command and directly create the MV instead. We will move this to the `streaming` folder at that time.
30 changes: 30 additions & 0 deletions e2e_test/streaming_now/now.slt
@@ -0,0 +1,30 @@
system ok
./risedev psql -c "
create materialized view mv as
select * from generate_series(
to_timestamp($(date +%s)) - interval '10 second',
now(),
interval '1 second'
);
"

statement ok
flush;

query I
select count(*) >= 10 from mv;
----
t

sleep 2s

statement ok
flush;

query I
select count(*) >= 12 from mv;
----
t

statement ok
drop materialized view mv;
18 changes: 12 additions & 6 deletions grafana/README.md
@@ -1,6 +1,6 @@
# RisingWave Grafana Dashboard

The Grafana dashboard is generated with grafanalib. You'll need
The Grafana dashboard is generated with `grafanalib`. You'll need

- Python
- grafanalib
@@ -29,18 +29,25 @@ And don't forget to include the generated `risingwave-<xxx>-dashboard.json` in t
./update.sh
```

## Advanced Usage
## Multi-cluster Deployment

We can specify the source uid, dashboard uid, dashboard version, enable namespace filter and enable risingwave_name filter(used in multi-cluster deployment) via env variables.
The `generate.sh` script supports multi-cluster deployment. The following environment variables are helpful:

For example, we can use the following query to generate dashboard json used in our benchmark cluster:
- `DASHBOARD_NAMESPACE_FILTER_ENABLED`: When set to `true`, a drop-down list will be added to the Grafana dashboard, and all Prometheus queries will be filtered by the selected namespace.
- `DASHBOARD_RISINGWAVE_NAME_FILTER_ENABLED`: When set to `true`, a drop-down list will be added to the Grafana dashboard, and all Prometheus queries will be filtered by the selected RisingWave name. This is useful when you have multiple RisingWave instances in the same namespace.
- `DASHBOARD_SOURCE_UID`: Set to the UID of your Prometheus source.
- `DASHBOARD_DYNAMIC_SOURCE`: Alternative to `DASHBOARD_SOURCE_UID`. When set to `true`, a drop-down list will be added to the Grafana dashboard to pick any one of the Prometheus sources.
- `DASHBOARD_UID`: Set to the UID of your Grafana dashboard.

See more details in the `common.py` file.

Examples:

```bash
DASHBOARD_NAMESPACE_FILTER_ENABLED=true \
DASHBOARD_RISINGWAVE_NAME_FILTER_ENABLED=true \
DASHBOARD_SOURCE_UID=<source_uid> \
DASHBOARD_UID=<dashboard_uid> \
DASHBOARD_VERSION=<version> \
./generate.sh
```

@@ -51,6 +58,5 @@ DASHBOARD_NAMESPACE_FILTER_ENABLED=true \
DASHBOARD_RISINGWAVE_NAME_FILTER_ENABLED=true \
DASHBOARD_DYNAMIC_SOURCE=true \
DASHBOARD_UID=<dashboard_uid> \
DASHBOARD_VERSION=<version> \
./generate.sh
```
12 changes: 12 additions & 0 deletions proto/stream_plan.proto
@@ -725,9 +725,21 @@ message RowIdGenNode {
uint64 row_id_index = 1;
}

message NowModeUpdateCurrent {}

message NowModeGenerateSeries {
data.Datum start_timestamp = 1;
data.Datum interval = 2;
}

message NowNode {
// Persists emitted 'now'.
catalog.Table state_table = 1;

oneof mode {
NowModeUpdateCurrent update_current = 101;
NowModeGenerateSeries generate_series = 102;
}
}

message ValuesNode {
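The new `mode` oneof is the crux of this commit: `NowModeUpdateCurrent` keeps the classic `now()` behavior, while `NowModeGenerateSeries` backs the `generate_series(..., now(), ...)` case exercised by the new `streaming_now` test. A minimal sketch of how an executor might branch on it at each barrier — the types below are simplified stand-ins for the prost-generated ones, not RisingWave's actual executor code:

```rust
/// Simplified stand-ins for the prost-generated `NowNode` mode variants.
enum Mode {
    /// Emit a single, monotonically advancing `now()` timestamp.
    UpdateCurrent,
    /// Emit every tick of `generate_series(start, now(), interval)`.
    GenerateSeries { start_ms: i64, interval_ms: i64 },
}

/// On each barrier, compute the rows to emit (timestamps in epoch millis).
fn rows_to_emit(mode: &Mode, last_emitted_ms: Option<i64>, barrier_ms: i64) -> Vec<i64> {
    match mode {
        Mode::UpdateCurrent => vec![barrier_ms],
        Mode::GenerateSeries { start_ms, interval_ms } => {
            // Resume from the tick after the last emitted one, or from `start_ms`.
            let mut ts = match last_emitted_ms {
                Some(last) => last + interval_ms,
                None => *start_ms,
            };
            let mut out = Vec::new();
            while ts <= barrier_ms {
                out.push(ts);
                ts += interval_ms;
            }
            out
        }
    }
}

fn main() {
    let mode = Mode::GenerateSeries { start_ms: 0, interval_ms: 1_000 };
    // First barrier at t=2.5s emits ticks 0, 1000, 2000.
    assert_eq!(rows_to_emit(&mode, None, 2_500), vec![0, 1_000, 2_000]);
    // Next barrier at t=4s resumes after 2000 and emits 3000, 4000.
    assert_eq!(rows_to_emit(&mode, Some(2_000), 4_000), vec![3_000, 4_000]);
}
```

Persisting the last emitted value (the `state_table` above, which "persists emitted 'now'") is what would let such a series resume across recovery without re-emitting rows.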
1 change: 0 additions & 1 deletion src/bench/sink_bench/main.rs
@@ -209,7 +209,6 @@ impl MockDatagenSource {
.unwrap();
let parser_config = ParserConfig {
specific: SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Native,
protocol_config: ProtocolProperties::Native,
},
5 changes: 5 additions & 0 deletions src/common/src/array/stream_chunk_builder.rs
@@ -104,6 +104,11 @@ impl StreamChunkBuilder {
}
}

/// Get the current number of rows in the builder.
pub fn size(&self) -> usize {
self.size
}

/// Append an iterator of output index and datum to the builder, return a chunk if the builder
/// is full.
///
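The new `size()` accessor lets callers observe how many rows are currently buffered, e.g. to decide whether to flush before the builder fills up. A standalone toy sketch of the pattern — not the real `StreamChunkBuilder`, which buffers columnar datums with ops, but the `size()` contract is the same:

```rust
/// Toy stand-in for `StreamChunkBuilder`: buffers rows up to a capacity.
struct ChunkBuilder {
    rows: Vec<i64>,
    capacity: usize,
}

impl ChunkBuilder {
    fn new(capacity: usize) -> Self {
        Self { rows: Vec::new(), capacity }
    }

    /// Current number of rows in the builder (the accessor added here).
    fn size(&self) -> usize {
        self.rows.len()
    }

    /// Append a row; return a full chunk once capacity is reached.
    fn append(&mut self, row: i64) -> Option<Vec<i64>> {
        self.rows.push(row);
        (self.rows.len() >= self.capacity).then(|| std::mem::take(&mut self.rows))
    }
}

fn main() {
    let mut b = ChunkBuilder::new(3);
    assert_eq!(b.size(), 0);
    assert!(b.append(1).is_none());
    assert!(b.append(2).is_none());
    assert_eq!(b.size(), 2); // a caller could decide to flush early here
    let chunk = b.append(3).expect("full at capacity");
    assert_eq!(chunk, vec![1, 2, 3]);
    assert_eq!(b.size(), 0); // drained once the chunk is taken
}
```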
7 changes: 4 additions & 3 deletions src/common/src/util/epoch.rs
@@ -174,10 +174,11 @@ impl EpochPair {
Self::new(curr, curr - EPOCH_INC_MIN_STEP_FOR_TEST)
}
}
/// As most unit tests initializ a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0.

/// Most unit tests initialize a new epoch from a random value (e.g. 1, 2, 233, etc.), but a correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0.
/// This method turns such a random epoch into a well-shifted value.
pub const fn test_epoch(value: u64) -> u64 {
value << EPOCH_AVAILABLE_BITS
pub const fn test_epoch(value_millis: u64) -> u64 {
value_millis << EPOCH_AVAILABLE_BITS
}

/// There are numerous operations in our system's unit tests that involve incrementing or decrementing the epoch.
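A worked example of what `test_epoch` does in concrete numbers, assuming `EPOCH_AVAILABLE_BITS` is 16 as elsewhere in this module:

```rust
// Assumption: EPOCH_AVAILABLE_BITS = 16, as in src/common/src/util/epoch.rs.
const EPOCH_AVAILABLE_BITS: u64 = 16;

const fn test_epoch(value_millis: u64) -> u64 {
    value_millis << EPOCH_AVAILABLE_BITS
}

fn main() {
    // 233 is shifted to 233 * 2^16 = 15_269_888; the low 16 bits are zero,
    // which is what the rest of the system expects of a well-formed epoch.
    assert_eq!(test_epoch(233), 233 << 16);
    assert_eq!(test_epoch(233) % (1 << EPOCH_AVAILABLE_BITS), 0);
    println!("test_epoch(233) = {}", test_epoch(233));
}
```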
1 change: 0 additions & 1 deletion src/connector/src/parser/bytes_parser.rs
@@ -63,7 +63,6 @@ mod tests {
async fn test_bytes_parser(get_payload: fn() -> Vec<Vec<u8>>) {
let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())];
let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Bytes(BytesProperties { column_name: None }),
protocol_config: ProtocolProperties::Plain,
};
8 changes: 2 additions & 6 deletions src/connector/src/parser/debezium/debezium_parser.rs
@@ -19,7 +19,6 @@ use risingwave_common::bail;
use super::simd_json_parser::DebeziumJsonAccessBuilder;
use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig};
use crate::error::ConnectorResult;
use crate::extract_key_config;
use crate::parser::unified::debezium::DebeziumChangeEvent;
use crate::parser::unified::json::TimestamptzHandling;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
@@ -89,8 +88,8 @@ impl DebeziumParser {
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
) -> ConnectorResult<Self> {
let (key_config, key_type) = extract_key_config!(props);
let key_builder = build_accessor_builder(key_config, key_type).await?;
let key_builder =
build_accessor_builder(props.encoding_config.clone(), EncodingType::Key).await?;
let payload_builder =
build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
let debezium_props = if let ProtocolProperties::Debezium(props) = props.protocol_config {
@@ -114,7 +113,6 @@
use crate::parser::JsonProperties;

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
@@ -225,7 +223,6 @@ mod tests {
.collect::<Vec<_>>();

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
@@ -298,7 +295,6 @@ mod tests {
.collect::<Vec<_>>();

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
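The `key_encoding_config: None` deletions repeated across these parser files follow from the refactor above: `SpecificParserConfig` no longer carries a separate key encoding, and the key accessor is now derived from the single `encoding_config` plus an `EncodingType` discriminator. A simplified stand-in showing the shape of the change — these are not the real RisingWave types, which live in `src/connector/src/parser`:

```rust
/// Simplified stand-ins for the connector's encoding enums.
#[allow(dead_code)]
#[derive(Clone, Debug)]
enum EncodingProperties {
    Json,
    Native,
}

#[derive(Debug)]
enum EncodingType {
    Key,
    Value,
}

struct SpecificParserConfig {
    // Before this commit there was an additional
    // `key_encoding_config: Option<EncodingProperties>` field here.
    encoding_config: EncodingProperties,
}

/// Stand-in for the real (async) `build_accessor_builder`.
fn build_accessor_builder(enc: EncodingProperties, ty: EncodingType) -> String {
    format!("{ty:?} accessor over {enc:?} encoding")
}

fn main() {
    let props = SpecificParserConfig {
        encoding_config: EncodingProperties::Json,
    };
    // As in the new `DebeziumParser::new`: clone the one encoding for the key,
    // then consume it for the value.
    let key = build_accessor_builder(props.encoding_config.clone(), EncodingType::Key);
    let value = build_accessor_builder(props.encoding_config, EncodingType::Value);
    println!("{key}\n{value}");
}
```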
1 change: 0 additions & 1 deletion src/connector/src/parser/debezium/simd_json_parser.rs
@@ -130,7 +130,6 @@ mod tests {

async fn build_parser(rw_columns: Vec<SourceColumnDesc>) -> DebeziumParser {
let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
1 change: 0 additions & 1 deletion src/connector/src/parser/maxwell/simd_json_parser.rs
@@ -35,7 +35,6 @@ mod tests {
];

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
(The remaining changed files are not shown here.)
