Skip to content

Commit

Permalink
Merge branch 'main' into patrick/sink-rate-limit.pr
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCroxx authored Dec 23, 2024
2 parents 7189a7e + 6d4f966 commit 1c0aac7
Show file tree
Hide file tree
Showing 32 changed files with 232 additions and 99 deletions.
25 changes: 24 additions & 1 deletion ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,27 @@ services:
timeout: 5s
retries: 5

# TODO: reuse the same mysql instance for connector test and meta store
# after https://github.com/risingwavelabs/risingwave/issues/19783 addressed
mysql-meta:
image: mysql:8.0
command: --character-set-server=utf8 --collation-server=utf8_general_ci
ports:
- 3306
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
healthcheck:
test:
[
"CMD-SHELL",
"mysqladmin ping -h 127.0.0.1 -u root -p123456"
]
interval: 5s
timeout: 5s
retries: 5

message_queue:
image: "redpandadata/redpanda:latest"
command:
Expand Down Expand Up @@ -72,6 +93,7 @@ services:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20241213
depends_on:
- mysql
- mysql-meta
- sqlserver-server
- db
- message_queue
Expand All @@ -87,6 +109,7 @@ services:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20241213
depends_on:
- mysql
- mysql-meta
- db
- message_queue
- schemaregistry
Expand Down Expand Up @@ -114,7 +137,7 @@ services:
ci-standard-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20241213
depends_on:
- mysql
- mysql-meta
- db
volumes:
- ..:/risingwave
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ cluster_start() {
risedev pre-start-dev
start_single_node "$PREFIX_LOG"/single-node.log &
# Give it a while to make sure the single-node is ready.
sleep 10
sleep 15
else
risedev ci-start "$mode"
fi
Expand Down
9 changes: 4 additions & 5 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ sql-backend: &sql-backend
backend: "postgres"
# PGPASSWORD=postgres psql -h db -p 5432 -U postgres -d rwmeta
endpoint: "postgres://postgres:postgres@db:5432/rwmeta"
# Temporarily disable tests for mysql backend as there are unresolved issues.
# - with:
# backend: "mysql"
# # mysql -h mysql-meta -P 3306 -u root -p123456 -D rwmeta
# endpoint: "mysql://root:123456@mysql-meta:3306/rwmeta"
- with:
backend: "mysql"
# mysql -h mysql-meta -P 3306 -u root -p123456 -D rwmeta
endpoint: "mysql://root:123456@mysql-meta:3306/rwmeta"
env:
RISEDEV_SQL_ENDPOINT: "{{matrix.endpoint}}"

Expand Down
9 changes: 4 additions & 5 deletions ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ other-sql-backend: &other-sql-backend
label: "postgres"
# PGPASSWORD=postgres psql -h db -p 5432 -U postgres -d rwmeta
endpoint: "postgres://postgres:postgres@db:5432/rwmeta"
# Temporarily disable tests for mysql backend as there are unresolved issues.
# - with:
# label: "mysql"
# # mysql -h mysql-meta -P 3306 -u root -p123456 -D rwmeta
# endpoint: "mysql://root:123456@mysql-meta:3306/rwmeta"
- with:
label: "mysql"
# mysql -h mysql-meta -P 3306 -u root -p123456 -D rwmeta
endpoint: "mysql://root:123456@mysql-meta:3306/rwmeta"
env:
RISEDEV_SQL_ENDPOINT: "{{matrix.endpoint}}"

Expand Down
22 changes: 22 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_engine.slt
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,26 @@ statement ok
DROP TABLE full_type_t;


# test connector with commit_checkpoint_interval
statement ok
create table nexmark_t (
id BIGINT,
item_name VARCHAR,
description VARCHAR,
initial_bid BIGINT,
reserve BIGINT,
date_time TIMESTAMP,
expires TIMESTAMP,
seller BIGINT,
category BIGINT,
extra VARCHAR)
with (
connector = 'nexmark',
nexmark.table.type = 'Auction',
nexmark.split.num = '2',
nexmark.min.event.gap.in.ns = '500000',
commit_checkpoint_interval = 1
) engine = iceberg;

statement ok
DROP TABLE nexmark_t
2 changes: 1 addition & 1 deletion src/batch/executors/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
.iter()
.flat_map(|worker| {
(0..(worker.parallelism()))
(0..(worker.compute_node_parallelism()))
.map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
})
.collect();
Expand Down
5 changes: 4 additions & 1 deletion src/batch/src/worker_manager/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,10 @@ impl WorkerNodeSelector {
} else {
self.apply_worker_node_mask(self.manager.list_serving_worker_nodes())
};
worker_nodes.iter().map(|node| node.parallelism()).sum()
worker_nodes
.iter()
.map(|node| node.compute_node_parallelism())
.sum()
}

pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result<WorkerSlotMapping> {
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/vnode_mapping/vnode_placement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub fn place_vnode(
.iter()
.filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving))
.sorted_by_key(|w| w.id)
.map(|w| (0..w.parallelism()).map(|idx| WorkerSlotId::new(w.id, idx)))
.map(|w| (0..w.compute_node_parallelism()).map(|idx| WorkerSlotId::new(w.id, idx)))
.collect();

// Set serving parallelism to the minimum of total number of worker slots, specified
Expand Down
2 changes: 2 additions & 0 deletions src/ctl/src/cmd_impl/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
// limitations under the License.

mod backup_meta;
mod check;
mod cluster_info;
mod connection;
mod pause_resume;
mod reschedule;
mod serving;

pub use backup_meta::*;
pub use check::*;
pub use cluster_info::*;
pub use connection::*;
pub use pause_resume::*;
Expand Down
33 changes: 33 additions & 0 deletions src/ctl/src/cmd_impl/meta/check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::process::exit;

use risingwave_meta::controller::catalog::CatalogController;
use sea_orm::TransactionTrait;

pub async fn graph_check(endpoint: String) -> anyhow::Result<()> {
let conn = sea_orm::Database::connect(sea_orm::ConnectOptions::new(endpoint)).await?;
let txn = conn.begin().await?;
match CatalogController::graph_check(&txn).await {
Ok(_) => {
println!("all integrity check passed!");
exit(0);
}
Err(_) => {
println!("integrity check failed!");
exit(1);
}
}
}
11 changes: 11 additions & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,14 @@ enum MetaCommands {
#[clap(long)]
props: String,
},

/// Performing graph check for scaling.
#[clap(verbatim_doc_comment)]
GraphCheck {
/// SQL endpoint
#[clap(long, required = true)]
endpoint: String,
},
}

#[derive(Subcommand, Clone, Debug)]
Expand Down Expand Up @@ -817,6 +825,9 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Meta(MetaCommands::ValidateSource { props }) => {
cmd_impl::meta::validate_source(context, props).await?
}
Commands::Meta(MetaCommands::GraphCheck { endpoint }) => {
cmd_impl::meta::graph_check(endpoint).await?
}
Commands::AwaitTree => cmd_impl::await_tree::dump(context).await?,
Commands::Profile(ProfileCommands::Cpu { sleep }) => {
cmd_impl::profile::cpu_profile(context, sleep).await?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct RwWorkerNode {
port: Option<String>,
r#type: String,
state: String,
parallelism: i32,
parallelism: Option<i32>,
is_streaming: Option<bool>,
is_serving: Option<bool>,
is_unschedulable: Option<bool>,
Expand Down Expand Up @@ -59,11 +59,7 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result<Vec<
port: host.map(|h| h.port.to_string()),
r#type: worker.get_type().unwrap().as_str_name().into(),
state: worker.get_state().unwrap().as_str_name().into(),
parallelism: if is_compute {
worker.parallelism() as i32
} else {
0
},
parallelism: worker.parallelism().map(|parallelism| parallelism as _),
is_streaming: if is_compute {
property.map(|p| p.is_streaming)
} else {
Expand Down
12 changes: 9 additions & 3 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::jvm_runtime::JVM;
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
ExternalTableConfig, ExternalTableImpl, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
Expand Down Expand Up @@ -1381,7 +1382,7 @@ pub async fn handle_create_table(
pub async fn create_iceberg_engine_table(
session: Arc<SessionImpl>,
handler_args: HandlerArgs,
source: Option<PbSource>,
mut source: Option<PbSource>,
table: PbTable,
graph: StreamFragmentGraph,
job_type: TableJobType,
Expand Down Expand Up @@ -1621,7 +1622,7 @@ pub async fn create_iceberg_engine_table(
with.insert("table.name".to_owned(), iceberg_table_name.clone());
let commit_checkpoint_interval = handler_args
.with_options
.get("commit_checkpoint_interval")
.get(COMMIT_CHECKPOINT_INTERVAL)
.map(|v| v.to_owned())
.unwrap_or_else(|| "60".to_owned());
let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
Expand All @@ -1635,13 +1636,18 @@ pub async fn create_iceberg_engine_table(
bail!("commit_checkpoint_interval must be a positive integer: 0");
}

// remove commit_checkpoint_interval from source options, otherwise it will be considered as an unknown field.
source
.as_mut()
.map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));

let sink_decouple = session.config().sink_decouple();
if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
bail!("config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled")
}

with.insert(
"commit_checkpoint_interval".to_owned(),
COMMIT_CHECKPOINT_INTERVAL.to_owned(),
commit_checkpoint_interval.to_string(),
);
with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ struct ShowClusterRow {
addr: String,
r#type: String,
state: String,
parallelism: i32,
parallelism: Option<i32>,
is_streaming: Option<bool>,
is_serving: Option<bool>,
is_unschedulable: Option<bool>,
Expand Down Expand Up @@ -483,7 +483,7 @@ pub async fn handle_show_object(
addr: addr.to_string(),
r#type: worker.get_type().unwrap().as_str_name().into(),
state: worker.get_state().unwrap().as_str_name().to_owned(),
parallelism: worker.parallelism() as _,
parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
is_streaming: property.map(|p| p.is_streaming),
is_serving: property.map(|p| p.is_serving),
is_unschedulable: property.map(|p| p.is_unschedulable),
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#![feature(iterator_try_collect)]
#![feature(used_with_arg)]
#![feature(try_trait_v2)]
#![feature(cell_update)]
#![recursion_limit = "256"]

#[cfg(test)]
Expand Down
Loading

0 comments on commit 1c0aac7

Please sign in to comment.