Skip to content

Commit

Permalink
Merge branch 'main' into rc/fix-flaky-udf-test
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Aug 20, 2024
2 parents 1f12741 + 281a696 commit 1294db7
Show file tree
Hide file tree
Showing 80 changed files with 1,173 additions and 153 deletions.
9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,15 @@ incremental = true
split-debuginfo = "packed"
lto = "off"

# Patch profile for production clusters.
# It trades off LTO (disabled below) for a faster build time.
[profile.patch-production]
inherits = "production"
debug = "full"
incremental = false
split-debuginfo = "packed"
lto = "off"

[profile.production]
inherits = "release"
incremental = false
Expand Down
6 changes: 5 additions & 1 deletion ci/scripts/docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ set -euo pipefail
ghcraddr="ghcr.io/risingwavelabs/risingwave"
dockerhubaddr="risingwavelabs/risingwave"
arch="$(uname -m)"
CARGO_PROFILE=${CARGO_PROFILE:-production}

echo "--- ghcr login"
echo "$GHCR_TOKEN" | docker login ghcr.io -u "$GHCR_USERNAME" --password-stdin
Expand All @@ -23,12 +24,15 @@ fi

# Build RisingWave docker image ${BUILDKITE_COMMIT}-${arch}
echo "--- docker build and tag"
echo "CARGO_PROFILE is set to ${CARGO_PROFILE}"
docker buildx create \
--name container \
--driver=docker-container

docker buildx build -f docker/Dockerfile \
--build-arg "GIT_SHA=${BUILDKITE_COMMIT}" -t "${ghcraddr}:${BUILDKITE_COMMIT}-${arch}" \
--build-arg "GIT_SHA=${BUILDKITE_COMMIT}" \
--build-arg "CARGO_PROFILE=${CARGO_PROFILE}" \
-t "${ghcraddr}:${BUILDKITE_COMMIT}-${arch}" \
--progress plain \
--builder=container \
--load \
Expand Down
2 changes: 2 additions & 0 deletions ci/scripts/multi-arch-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ if [ "${SKIP_TARGET_AARCH64:-false}" != "true" ]; then
arches+=("aarch64")
fi

echo "--- arches: ${arches[*]}"

# push images to ghcr
function pushGchr() {
GHCRTAG="${ghcraddr}:$1"
Expand Down
33 changes: 33 additions & 0 deletions ci/workflows/docker-arm-fast.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Fast ARM-only Docker build pipeline: builds just the aarch64 image using the
# `patch-production` Cargo profile (LTO off) to reduce build time.
# Shared retry policy, referenced below via the `*auto-retry` alias.
auto-retry: &auto-retry
  automatic:
    # Agent terminated because the AWS EC2 spot instance was killed by AWS.
    - signal_reason: agent_stop
      limit: 3

steps:
  - label: "docker-build-push: aarch64"
    if: build.env("SKIP_TARGET_AARCH64") != "true"
    # CARGO_PROFILE=patch-production selects the faster non-LTO build profile.
    command: "CARGO_PROFILE=patch-production ci/scripts/docker.sh"
    key: "build-aarch64"
    plugins:
      - seek-oss/aws-sm#v2.3.1:
          env:
            GHCR_USERNAME: ghcr-username
            GHCR_TOKEN: ghcr-token
            DOCKER_TOKEN: docker-token
            GITHUB_TOKEN: github-token
    agents:
      queue: "linux-arm64"
    retry: *auto-retry

  - label: "multi-arch-image-create-push"
    # Only the aarch64 image is built above, so skip the amd64 half of the
    # multi-arch manifest.
    command: "SKIP_TARGET_AMD64=true ci/scripts/multi-arch-docker.sh"
    depends_on:
      - "build-aarch64"
    plugins:
      - seek-oss/aws-sm#v2.3.1:
          env:
            GHCR_USERNAME: ghcr-username
            GHCR_TOKEN: ghcr-token
            DOCKER_TOKEN: docker-token
    retry: *auto-retry
11 changes: 7 additions & 4 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,21 @@ FROM rust-base AS rust-builder
ARG GIT_SHA
ENV GIT_SHA=$GIT_SHA

ARG CARGO_PROFILE
ENV CARGO_PROFILE=$CARGO_PROFILE

COPY ./ /risingwave
WORKDIR /risingwave

ENV ENABLE_BUILD_DASHBOARD=1
ENV OPENSSL_STATIC=1

RUN cargo fetch && \
cargo build -p risingwave_cmd_all --profile production --features "rw-static-link" --features all-udf && \
cargo build -p risingwave_cmd_all --profile ${CARGO_PROFILE} --features "rw-static-link" --features all-udf && \
mkdir -p /risingwave/bin && \
mv /risingwave/target/production/risingwave /risingwave/bin/ && \
mv /risingwave/target/production/risingwave.dwp /risingwave/bin/ && \
cp ./target/production/build/tikv-jemalloc-sys-*/out/build/bin/jeprof /risingwave/bin/ && \
mv /risingwave/target/${CARGO_PROFILE}/risingwave /risingwave/bin/ && \
mv /risingwave/target/${CARGO_PROFILE}/risingwave.dwp /risingwave/bin/ && \
cp ./target/${CARGO_PROFILE}/build/tikv-jemalloc-sys-*/out/build/bin/jeprof /risingwave/bin/ && \
chmod +x /risingwave/bin/jeprof && \
mkdir -p /risingwave/lib && cargo clean

Expand Down
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

69 changes: 69 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_change_mysql.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
control substitution on

system ok
mysql -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;"

system ok
mysql -e "
USE mytest;
DROP TABLE IF EXISTS customers;
CREATE TABLE customers(
id BIGINT PRIMARY KEY,
modified DATETIME,
custinfo JSON
);
ALTER TABLE customers ADD INDEX zipsa( (CAST(custinfo->'zipcode' AS UNSIGNED ARRAY)) );
"

statement ok
create source mysql_source with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'root',
password = '${MYSQL_PWD:}',
database.name = 'mytest',
server.id = '5701',
auto.schema.change = 'true'
);

statement ok
create table rw_customers (id bigint, modified timestamp, custinfo jsonb, primary key (id)) from mysql_source table 'mytest.customers';

# Name, Type, Is Hidden, Description
query TTTT
describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
custinfo jsonb false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL


system ok
mysql -e "
USE mytest;
ALTER TABLE customers ADD COLUMN v1 VARCHAR(255);
ALTER TABLE customers ADD COLUMN v2 double(5,2);
"

sleep 3s

# Name, Type, Is Hidden, Description
query TTTT
describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
custinfo jsonb false NULL
v1 character varying false NULL
v2 double precision false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL


statement ok
drop source mysql_source cascade;
21 changes: 11 additions & 10 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,14 +570,15 @@ def section_compaction(outer_panels):

def section_object_storage(outer_panels):
panels = outer_panels.sub_panel()
operation_rate_blacklist = (
"type!~'streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read'"
)
operation_duration_blacklist = "type!~'streaming_upload_write_bytes|streaming_read'"
write_op_filter = "type=~'upload|delete'"
read_op_filter = "type=~'read|readv|list|metadata'"
s3_request_cost_op1 = "type=~'read|streaming_read_start|streaming_read_init'"
s3_request_cost_op2 = "type=~'upload|streaming_upload|streaming_upload_start|s3_upload_part|streaming_upload_finish|list'"
operation_rate_blacklist = "type!~'streaming_read'"
operation_duration_blacklist = "type!~'streaming_read'"
put_op_types = ["upload", "streaming_upload"]
post_op_types = ["streaming_upload_init", "streaming_upload_finish", "delete_objects"]
list_op_types = ["list"]
delete_op_types = ["delete"]
read_op_types = ["read", "streaming_read_init", "metadata"]
write_op_filter = f"type=~'({'|'.join(put_op_types + post_op_types + list_op_types + delete_op_types)})'"
read_op_filter = f"type=~'({'|'.join(read_op_types)})'"
return [
outer_panels.row_collapsed(
"Object Storage",
Expand Down Expand Up @@ -685,11 +686,11 @@ def section_object_storage(outer_panels):
True,
),
panels.target(
f"sum({metric('object_store_operation_latency_count', s3_request_cost_op1)}) * 0.0004 / 1000",
f"sum({metric('object_store_operation_latency_count', read_op_filter)}) * 0.0004 / 1000",
"GET, SELECT, and all other Requests Cost",
),
panels.target(
f"sum({metric('object_store_operation_latency_count', s3_request_cost_op2)}) * 0.005 / 1000",
f"sum({metric('object_store_operation_latency_count', write_op_filter)}) * 0.005 / 1000",
"PUT, COPY, POST, LIST Requests Cost",
),
],
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ table.include.list=${database.name}.${table.name:-*}
schema.history.internal.store.only.captured.tables.ddl=true
schema.history.internal.store.only.captured.databases.ddl=true
# default to disable schema change events
include.schema.changes=${debezium.include.schema.changes:-false}
include.schema.changes=${auto.schema.change:-false}
database.server.id=${server.id}
# default to use unencrypted connection
database.ssl.mode=${ssl.mode:-disabled}
Expand Down
5 changes: 5 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,11 @@ message Table {
// conflict" operations.
optional uint32 version_column_index = 38;

// The unique identifier of the upstream table if it is a CDC table.
// It will be used in auto schema change to get the table that is mapped to
// the upstream table.
optional string cdc_table_id = 39;

// Per-table catalog version, used by schema change. `None` for internal
// tables and tests. Not to be confused with the global catalog version for
// notification service.
Expand Down
2 changes: 2 additions & 0 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ message WorkerNode {
bool is_streaming = 1;
bool is_serving = 2;
bool is_unschedulable = 3;
// This is used for the frontend node to register its RPC address.
string internal_rpc_host_addr = 4;
}
message Resource {
string rw_version = 1;
Expand Down
9 changes: 8 additions & 1 deletion proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -454,14 +454,20 @@ message TableSchemaChange {
}

TableChangeType change_type = 1;
string cdc_table_name = 2;
string cdc_table_id = 2;
repeated plan_common.ColumnCatalog columns = 3;
}

message SchemaChangeEnvelope {
repeated TableSchemaChange table_changes = 1;
}

message AutoSchemaChangeRequest {
SchemaChangeEnvelope schema_change = 1;
}

message AutoSchemaChangeResponse {}

service DdlService {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
rpc DropDatabase(DropDatabaseRequest) returns (DropDatabaseResponse);
Expand Down Expand Up @@ -500,4 +506,5 @@ service DdlService {
rpc GetTables(GetTablesRequest) returns (GetTablesResponse);
rpc Wait(WaitRequest) returns (WaitResponse);
rpc CommentOn(CommentOnRequest) returns (CommentOnResponse);
rpc AutoSchemaChange(AutoSchemaChangeRequest) returns (AutoSchemaChangeResponse);
}
23 changes: 23 additions & 0 deletions proto/frontend_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// RPC service exposed by frontend nodes, allowing the meta node to ask a
// frontend to plan DDL work on its behalf (used by CDC auto schema change).
syntax = "proto3";

package frontend_service;

import "ddl_service.proto";

option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

// Asks a frontend node to generate a plan that replaces an existing table
// after an upstream (CDC) schema change.
message GetTableReplacePlanRequest {
  uint32 database_id = 1;
  // Owner (user) id of the table being replaced.
  uint32 owner = 2;
  string table_name = 3;
  // The schema change captured from the upstream table.
  ddl_service.TableSchemaChange table_change = 4;
}

message GetTableReplacePlanResponse {
  // The plan the caller can submit to apply the replacement.
  ddl_service.ReplaceTablePlan replace_plan = 1;
}

service FrontendService {
  rpc GetTableReplacePlan(GetTableReplacePlanRequest) returns (GetTableReplacePlanResponse);
}
2 changes: 2 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ message AddWorkerNodeRequest {
bool is_streaming = 2;
bool is_serving = 3;
bool is_unschedulable = 4;
// This is used for the frontend node to register its RPC address.
string internal_rpc_host_addr = 5;
}
common.WorkerType worker_type = 1;
common.HostAddress host = 2;
Expand Down
1 change: 1 addition & 0 deletions src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ impl SourceExecutor {
rate_limit: None,
},
ConnectorProperties::default(),
None,
));
let stream = self
.source
Expand Down
2 changes: 2 additions & 0 deletions src/batch/src/worker_manager/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: true,
internal_rpc_host_addr: "".to_string(),
}),
transactional_id: Some(1),
..Default::default()
Expand All @@ -444,6 +445,7 @@ mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: false,
internal_rpc_host_addr: "".to_string(),
}),
transactional_id: Some(2),
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ mod test {
],
),
prometheus_listener_addr: "127.0.0.1:1234",
health_check_listener_addr: "127.0.0.1:6786",
frontend_rpc_listener_addr: "127.0.0.1:6786",
config_path: "src/config/test.toml",
metrics_level: None,
enable_barrier_read: None,
Expand Down
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,10 @@ pub struct StreamingDeveloperConfig {
/// If not specified, the value of `server.connection_pool_size` will be used.
#[serde(default = "default::developer::stream_exchange_connection_pool_size")]
pub exchange_connection_pool_size: Option<u16>,

/// A flag to allow disabling the auto schema change handling
#[serde(default = "default::developer::stream_enable_auto_schema_change")]
pub enable_auto_schema_change: bool,
}

/// The subsections `[batch.developer]`.
Expand Down Expand Up @@ -1903,6 +1907,10 @@ pub mod default {
pub fn enable_actor_tokio_metrics() -> bool {
false
}

pub fn stream_enable_auto_schema_change() -> bool {
true
}
}

pub use crate::system_param::default as system;
Expand Down
7 changes: 4 additions & 3 deletions src/common/src/vnode_mapping/vnode_placement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: false,
internal_rpc_host_addr: "".to_string(),
};

let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| {
Expand All @@ -231,7 +232,7 @@ mod tests {
let worker_1 = WorkerNode {
id: 1,
parallelism: 1,
property: Some(serving_property),
property: Some(serving_property.clone()),
..Default::default()
};

Expand All @@ -246,7 +247,7 @@ mod tests {
let worker_2 = WorkerNode {
id: 2,
parallelism: 50,
property: Some(serving_property),
property: Some(serving_property.clone()),
..Default::default()
};

Expand All @@ -265,7 +266,7 @@ mod tests {
let worker_3 = WorkerNode {
id: 3,
parallelism: 60,
property: Some(serving_property),
property: Some(serving_property.clone()),
..Default::default()
};
let re_pu_mapping_2 = place_vnode(
Expand Down
Loading

0 comments on commit 1294db7

Please sign in to comment.