Skip to content

Commit

Permalink
Merge branch 'main' into yiming/remove-separate-mutation-subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 6, 2024
2 parents 2679248 + e2c89f4 commit c1a44ab
Show file tree
Hide file tree
Showing 107 changed files with 1,920 additions and 1,141 deletions.
1 change: 1 addition & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ extend-exclude = [
# We don't want to fix "fals" here, but may want in other places.
# Ideally, we should just ignore that line: https://github.com/crate-ci/typos/issues/316
"src/common/src/cast/mod.rs",
"src/tests/simulation/tests/integration_tests/scale/shared_source.rs",
]
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ echo "> inserted new rows into postgres"

# start cluster w/o clean-data
unset RISINGWAVE_CI
export RUST_LOG="risingwave_stream=debug,risingwave_batch=info,risingwave_storage=info" \
export RUST_LOG="risingwave_stream=debug,risingwave_batch=info,risingwave_storage=info"

risedev dev ci-1cn-1fe-with-recovery
echo "> wait for cluster recovery finish"
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/backup_restore/tpch_snapshot_create.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
include ../tpch/create_tables.slt.part

statement ok
CREATE SECRET secret1 WITH (backend = 'meta') AS 'demo-secret'

# First, insert the data into the tables
include ../tpch/insert_customer.slt.part
include ../tpch/insert_lineitem.slt.part
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/backup_restore/tpch_snapshot_drop.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
DROP SECRET secret1;

statement ok
drop materialized view tpch_q7;

Expand Down
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ user backfill_rate_limit
user background_ddl
user batch_enable_distributed_dml
user batch_parallelism
user bypass_cluster_limits
user bytea_output
user cdc_source_wait_streaming_start_timeout
user client_encoding
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/commands/risectl
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env bash
# Thin wrapper around the risectl binary under .risingwave/bin/risingwave/.
# Runs it with RUST_LOG set to "error" for this invocation only and forwards
# every command-line argument unchanged.

RUST_LOG="error" .risingwave/bin/risingwave/risectl "$@"
81 changes: 81 additions & 0 deletions e2e_test/source_inline/kafka/shared_source.slt
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,87 @@ internal_table.mjs --name s0 --type source
3,"{""split_info"": {""partition"": 3, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


# # Note: the parallelism depends on the risedev profile.
# # So scale tests below are commented out.

# query ???
# select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;
# ----
# mv_1 {MVIEW,SOURCE_SCAN} 5
# mv_2 {MVIEW,SOURCE_SCAN} 5
# s0 {SOURCE} 5


# system ok
# risectl meta source-split-info --ignore-id
# ----
# Table
# Fragment (Source)
# Actor (1 splits): [0]
# Actor (1 splits): [2]
# Actor (1 splits): [3]
# Actor (1 splits): [1]
# Actor (0 splits): []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []


# # scale down
# statement ok
# ALTER MATERIALIZED VIEW mv_1 SET PARALLELISM TO 2;

# # should have no effect, because of NoShuffle
# # TODO: support ALTER SOURCE SET PARALLELISM, then we can test scaling here
# query ???
# select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;
# ----
# mv_1 {MVIEW,SOURCE_SCAN} 5
# mv_2 {MVIEW,SOURCE_SCAN} 5
# s0 {SOURCE} 5

# system ok
# risectl meta source-split-info --ignore-id
# ----
# Table
# Fragment (Source)
# Actor (1 splits): [0]
# Actor (1 splits): [2]
# Actor (1 splits): [3]
# Actor (1 splits): [1]
# Actor (0 splits): []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []


# # Manual test: change the parallelism of the compute node, kill and restart, and check
# # risedev ctl meta source-split-info --ignore-id
# # risedev psql -c "select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;"


statement ok
drop source s0 cascade;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
.collect(Collectors.toList());

LOG.info(
"schema = {}, table = {}, tableSchema = {}, columnSqlTypes = {}, pkIndices = {}",
"schema = {}, table = {}, tableSchema = {}, columnSqlTypes = {}, pkIndices = {}, queryTimeout = {}",
config.getSchemaName(),
config.getTableName(),
tableSchema,
columnSqlTypes,
pkIndices);
pkIndices,
config.getQueryTimeout());

if (factory.isPresent()) {
this.jdbcDialect = factory.get().create(columnSqlTypes, pkIndices);
Expand All @@ -92,7 +93,7 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
// Commit the `getTransactionIsolation`
conn.commit();

jdbcStatements = new JdbcStatements(conn);
jdbcStatements = new JdbcStatements(conn, config.getQueryTimeout());
} catch (SQLException e) {
throw Status.INTERNAL
.withDescription(
Expand Down Expand Up @@ -173,7 +174,7 @@ public boolean write(Iterable<SinkRow> rows) {
conn = JdbcUtils.getConnection(config.getJdbcUrl());
// reset the flag since we will retry to prepare the batch again
updateFlag = false;
jdbcStatements = new JdbcStatements(conn);
jdbcStatements = new JdbcStatements(conn, config.getQueryTimeout());
} else {
throw io.grpc.Status.INTERNAL
.withDescription(
Expand Down Expand Up @@ -206,13 +207,15 @@ public boolean write(Iterable<SinkRow> rows) {
* across multiple batches as long as the JDBC connection is valid.
*/
class JdbcStatements implements AutoCloseable {
private final int queryTimeoutSecs;
private PreparedStatement deleteStatement;
private PreparedStatement upsertStatement;
private PreparedStatement insertStatement;

private final Connection conn;

public JdbcStatements(Connection conn) throws SQLException {
public JdbcStatements(Connection conn, int queryTimeoutSecs) throws SQLException {
this.queryTimeoutSecs = queryTimeoutSecs;
this.conn = conn;
var schemaTableName =
jdbcDialect.createSchemaTableName(
Expand Down Expand Up @@ -339,6 +342,9 @@ private void executeStatement(PreparedStatement stmt) throws SQLException {
if (stmt == null) {
return;
}
// If a timeout occurs, a SQLTimeoutException is thrown
// and the write of the stream chunk is retried in `JDBCSink.write`.
stmt.setQueryTimeout(queryTimeoutSecs);
LOG.debug("Executing statement: {}", stmt);
stmt.executeBatch();
stmt.clearParameters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class JDBCSinkConfig extends CommonSinkConfig {
@JsonProperty(value = "schema.name")
private String schemaName;

@JsonProperty(value = "jdbc.query.timeout")
private int queryTimeoutSeconds = 600;

@JsonCreator
public JDBCSinkConfig(
@JsonProperty(value = "jdbc.url") String jdbcUrl,
Expand Down Expand Up @@ -62,4 +65,8 @@ public String getSinkType() {
public boolean isUpsertSink() {
return this.isUpsertSink;
}

/**
 * Returns the JDBC query timeout in seconds, i.e. the value of the
 * {@code queryTimeoutSeconds} field (annotated as {@code jdbc.query.timeout},
 * initialized to 600).
 *
 * @return the query timeout in seconds
 */
public int getQueryTimeout() {
return queryTimeoutSeconds;
}
}
27 changes: 27 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -791,3 +791,30 @@ message RelationIdInfos {
// relation_id -> FragmentIdToActorIdMap
map<uint32, FragmentIdToActorIdMap> map = 1;
}

// Per-worker actor counts together with a hard and a soft limit.
// NOTE(review): going by the message name, the limits presumably bound the
// number of actors per unit of parallelism — confirm against the meta service.
message ActorCountPerParallelism {
// Actor count and parallelism recorded for a single worker.
message WorkerActorCount {
uint64 actor_count = 1;
uint64 parallelism = 2;
}
// Keyed by worker id.
map<uint32, WorkerActorCount> worker_id_to_actor_count = 1;
uint64 hard_limit = 2;
uint64 soft_limit = 3;
}

// A single cluster limit. `limit` is a oneof whose only variant so far is
// `actor_count`.
message ClusterLimit {
oneof limit {
ActorCountPerParallelism actor_count = 1;
// TODO: limit DDL using compaction pending bytes
}
}

// Request of the `GetClusterLimits` rpc; it carries no fields.
message GetClusterLimitsRequest {}

// Response of the `GetClusterLimits` rpc.
message GetClusterLimitsResponse {
// Zero or more cluster limits.
// NOTE(review): "active" presumably means limits currently in effect — confirm.
repeated ClusterLimit active_limits = 1;
}

// Service for querying cluster limits.
service ClusterLimitService {
// Unary rpc returning the `active_limits` list.
rpc GetClusterLimits(GetClusterLimitsRequest) returns (GetClusterLimitsResponse);
}
11 changes: 0 additions & 11 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,6 @@ message BuildActorInfo {
map<uint32, SubscriptionIds> related_subscriptions = 2;
}

message DropActorsRequest {
string request_id = 1;
repeated uint32 actor_ids = 2;
}

message DropActorsResponse {
string request_id = 1;
common.Status status = 2;
}

message InjectBarrierRequest {

Check failure on line 20 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "7" with name "actor_ids_to_pre_sync_barrier_mutation" on message "InjectBarrierRequest" was deleted without reserving the name "actor_ids_to_pre_sync_barrier_mutation".

Check failure on line 20 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "7" with name "actor_ids_to_pre_sync_barrier_mutation" on message "InjectBarrierRequest" was deleted without reserving the number "7".
string request_id = 1;
stream_plan.Barrier barrier = 2;
Expand Down Expand Up @@ -99,7 +89,6 @@ message StreamingControlStreamResponse {
}

service StreamService {
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
}
Expand Down
8 changes: 5 additions & 3 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ use std::mem::swap;

use futures::pin_mut;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::hash::{HashKey, HashKeyDispatcher};
use risingwave_common::hash::{HashKey, HashKeyDispatcher, VirtualNode};
use risingwave_common::memory::MemoryContext;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
Expand All @@ -30,7 +31,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{TableDistribution, TableIter};
use risingwave_storage::table::TableIter;
use risingwave_storage::{dispatch_state_store, StateStore};

use crate::error::Result;
Expand Down Expand Up @@ -194,7 +195,8 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
.collect();

// Lookup Join always contains distribution key, so we don't need vnode bitmap
let vnodes = Some(TableDistribution::all_vnodes());
// TODO(var-vnode): use vnode count from table desc
let vnodes = Some(Bitmap::ones(VirtualNode::COUNT).into());
dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
let inner_side_builder = InnerSideExecutorBuilder::new(
Expand Down
9 changes: 4 additions & 5 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::marker::PhantomData;

use anyhow::Context;
use itertools::Itertools;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{
Expand Down Expand Up @@ -408,12 +408,11 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
})
.collect();

// TODO(var-vnode): use vnode count from table desc
let vnodes = Some(Bitmap::ones(VirtualNode::COUNT).into());
let inner_side_builder = InnerSideExecutorBuilder {
table_desc: table_desc.clone(),
table_distribution: TableDistribution::new_from_storage_table_desc(
Some(TableDistribution::all_vnodes()),
table_desc,
),
table_distribution: TableDistribution::new_from_storage_table_desc(vnodes, table_desc),
vnode_mapping,
outer_side_key_types,
inner_side_schema,
Expand Down
6 changes: 4 additions & 2 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use prometheus::Histogram;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ScalarImpl;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{collect_data_chunk, TableDistribution};
use risingwave_storage::table::collect_data_chunk;
use risingwave_storage::{dispatch_state_store, StateStore};

use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
Expand Down Expand Up @@ -106,7 +107,8 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
Some(vnodes) => Some(Bitmap::from(vnodes).into()),
// This is possible for dml. vnode_bitmap is not filled by scheduler.
// Or it's single distribution, e.g., distinct agg. We scan in a single executor.
None => Some(TableDistribution::all_vnodes()),
// TODO(var-vnode): use vnode count from table desc
None => Some(Bitmap::ones(VirtualNode::COUNT).into()),
};

let chunk_size = source.context.get_config().developer.chunk_size as u32;
Expand Down
Loading

0 comments on commit c1a44ab

Please sign in to comment.