Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into xx/foyer-block-cache
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx committed Mar 13, 2024
2 parents 9294764 + a37d538 commit e0f4eae
Show file tree
Hide file tree
Showing 78 changed files with 1,820 additions and 1,395 deletions.
1 change: 0 additions & 1 deletion ci/scripts/cron-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,4 @@ set -euo pipefail

source ci/scripts/common.sh
export RUN_COMPACTION=0;
export RUN_META_BACKUP=1;
source ci/scripts/run-e2e-test.sh
4 changes: 1 addition & 3 deletions ci/scripts/pr.env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@
set -euo pipefail

# Don't run e2e compaction test in PR build
export RUN_COMPACTION=0;
# Don't run meta store backup/recovery test
export RUN_META_BACKUP=0;
export RUN_COMPACTION=0;
14 changes: 0 additions & 14 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -192,20 +192,6 @@ if [[ "$RUN_COMPACTION" -eq "1" ]]; then
cluster_stop
fi

if [[ "$RUN_META_BACKUP" -eq "1" ]]; then
echo "--- e2e, ci-meta-backup-test"
test_root="src/storage/backup/integration_tests"
BACKUP_TEST_MCLI=".risingwave/bin/mcli" \
BACKUP_TEST_MCLI_CONFIG=".risingwave/config/mcli" \
BACKUP_TEST_RW_ALL_IN_ONE="target/debug/risingwave" \
RW_HUMMOCK_URL="hummock+minio://hummockadmin:[email protected]:9301/hummock001" \
RW_META_ADDR="http://127.0.0.1:5690" \
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
bash "${test_root}/run_all.sh"
echo "--- Kill cluster"
cargo make kill
fi

if [[ "$mode" == "standalone" ]]; then
run_sql() {
psql -h localhost -p 4566 -d dev -U root -c "$@"
Expand Down
80 changes: 80 additions & 0 deletions ci/scripts/run-meta-backup-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#!/usr/bin/env bash

# Exits as soon as any line fails.
set -euo pipefail

source ci/scripts/common.sh

while getopts 'p:m:' opt; do
case ${opt} in
p )
profile=$OPTARG
;;
m )
mode=$OPTARG
;;
\? )
echo "Invalid Option: -$OPTARG" 1>&2
exit 1
;;
: )
echo "Invalid option: $OPTARG requires an argument" 1>&2
;;
esac
done
shift $((OPTIND -1))

if [[ $mode == "standalone" ]]; then
source ci/scripts/standalone-utils.sh
fi

if [[ $mode == "single-node" ]]; then
source ci/scripts/single-node-utils.sh
fi

cluster_start() {
if [[ $mode == "standalone" ]]; then
mkdir -p "$PREFIX_LOG"
cargo make clean-data
cargo make pre-start-dev
start_standalone "$PREFIX_LOG"/standalone.log &
cargo make dev standalone-minio-etcd
elif [[ $mode == "single-node" ]]; then
mkdir -p "$PREFIX_LOG"
cargo make clean-data
cargo make pre-start-dev
start_single_node "$PREFIX_LOG"/single-node.log &
# Give it a while to make sure the single-node is ready.
sleep 3
else
cargo make ci-start "$mode"
fi
}

cluster_stop() {
if [[ $mode == "standalone" ]]
then
stop_standalone
# Don't check standalone logs, they will exceed the limit.
cargo make kill
elif [[ $mode == "single-node" ]]
then
stop_single_node
else
cargo make ci-kill
fi
}

download_and_prepare_rw "$profile" common

echo "--- e2e, ci-meta-backup-test"
test_root="src/storage/backup/integration_tests"
BACKUP_TEST_MCLI=".risingwave/bin/mcli" \
BACKUP_TEST_MCLI_CONFIG=".risingwave/config/mcli" \
BACKUP_TEST_RW_ALL_IN_ONE="target/debug/risingwave" \
RW_HUMMOCK_URL="hummock+minio://hummockadmin:[email protected]:9301/hummock001" \
RW_META_ADDR="http://127.0.0.1:5690" \
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
bash "${test_root}/run_all.sh"
echo "--- Kill cluster"
cargo make kill
22 changes: 21 additions & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,27 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 65
timeout_in_minutes: 15
retry: *auto-retry

- label: "meta backup test (release)"
key: "e2e-meta-backup-test-release"
command: "ci/scripts/run-meta-backup-test.sh -p ci-release -m ci-3streaming-2serving-3fe"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-meta-backup-test"
|| build.env("CI_STEPS") =~ /(^|,)e2e-tests?(,|$$)/
depends_on:
- "build"
- "build-other"
- "docslt"
plugins:
- docker-compose#v5.1.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 45
retry: *auto-retry

- label: "end-to-end test (parallel) (release)"
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-distributed.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.7.1}
services:
compactor-0:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-azblob.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.7.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-gcs.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.7.1}
services:
risingwave-standalone:
<<: *image
Expand Down
6 changes: 3 additions & 3 deletions docker/docker-compose-with-hdfs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ services:
reservations:
memory: 1G
compute-node-0:
image: "ghcr.io/risingwavelabs/risingwave:RisingWave_1.6.1_HDFS_2.7-x86_64"
image: "ghcr.io/risingwavelabs/risingwave:RisingWave_1.7.1_HDFS_2.7-x86_64"
command:
- compute-node
- "--listen-addr"
Expand Down Expand Up @@ -132,7 +132,7 @@ services:
retries: 5
restart: always
frontend-node-0:
image: "ghcr.io/risingwavelabs/risingwave:RisingWave_1.6.1_HDFS_2.7-x86_64"
image: "ghcr.io/risingwavelabs/risingwave:RisingWave_1.7.1_HDFS_2.7-x86_64"
command:
- frontend-node
- "--listen-addr"
Expand Down Expand Up @@ -195,7 +195,7 @@ services:
retries: 5
restart: always
meta-node-0:
image: "ghcr.io/risingwavelabs/risingwave:RisingWave_1.6.1_HDFS_2.7-x86_64"
image: "ghcr.io/risingwavelabs/risingwave:RisingWave_1.7.1_HDFS_2.7-x86_64"
command:
- meta-node
- "--listen-addr"
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-obs.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.7.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-oss.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.7.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-s3.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.7.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.7.1}
services:
risingwave-standalone:
<<: *image
Expand Down
44 changes: 21 additions & 23 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,13 @@ message DropActorsResponse {
common.Status status = 2;
}

message ForceStopActorsRequest {
string request_id = 1;
uint64 prev_epoch = 2;
}

message ForceStopActorsResponse {
string request_id = 1;
common.Status status = 2;
}

message InjectBarrierRequest {
string request_id = 1;
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_send = 3;
repeated uint32 actor_ids_to_collect = 4;
}

message InjectBarrierResponse {
string request_id = 1;
common.Status status = 2;
}

message BarrierCompleteRequest {
string request_id = 1;
uint64 prev_epoch = 2;
map<string, string> tracing_context = 3;
}
message BarrierCompleteResponse {
message CreateMviewProgress {
uint32 backfill_actor_id = 1;
Expand Down Expand Up @@ -104,15 +84,33 @@ message WaitEpochCommitResponse {
common.Status status = 1;
}

message StreamingControlStreamRequest {
message InitRequest {
uint64 prev_epoch = 2;
}

oneof request {
InitRequest init = 1;
InjectBarrierRequest inject_barrier = 2;
}
}

message StreamingControlStreamResponse {
message InitResponse {}

oneof response {
InitResponse init = 1;
BarrierCompleteResponse complete_barrier = 2;
}
}

service StreamService {
rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse);
rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse);
rpc BroadcastActorInfoTable(BroadcastActorInfoTableRequest) returns (BroadcastActorInfoTableResponse);
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc ForceStopActors(ForceStopActorsRequest) returns (ForceStopActorsResponse);
rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse);
rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
}

// TODO: Lifecycle management for actors.
37 changes: 18 additions & 19 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{collect_data_chunk, TableDistribution};
use risingwave_storage::{dispatch_state_store, StateStore};
use rw_futures_util::select_all;

use crate::error::{BatchError, Result};
use crate::executor::{
Expand Down Expand Up @@ -319,28 +318,28 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
}

// Range Scan
let range_scans = select_all(range_scans.into_iter().map(|range_scan| {
let table = table.clone();
let histogram = histogram.clone();
Box::pin(Self::execute_range(
table,
range_scan,
// WARN: DO NOT use `select` to execute range scans concurrently
// it can consume too much memory if there're too many ranges.
for range in range_scans {
let stream = Self::execute_range(
table.clone(),
range,
ordered,
epoch.clone(),
chunk_size,
limit,
histogram,
))
}));
#[for_await]
for chunk in range_scans {
let chunk = chunk?;
returned += chunk.cardinality() as u64;
yield chunk;
if let Some(limit) = &limit
&& returned >= *limit
{
return Ok(());
histogram.clone(),
);
#[for_await]
for chunk in stream {
let chunk = chunk?;
returned += chunk.cardinality() as u64;
yield chunk;
if let Some(limit) = &limit
&& returned >= *limit
{
return Ok(());
}
}
}
}
Expand Down
Loading

0 comments on commit e0f4eae

Please sign in to comment.