Skip to content

Commit

Permalink
Merge branch 'main' into tab/create-secret
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion committed May 6, 2024
2 parents 157f156 + 3a1e259 commit 1a75e35
Show file tree
Hide file tree
Showing 77 changed files with 1,417 additions and 721 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion dashboard/pages/await_tree.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ export default function AwaitTreeDump() {
.entries()
.map(([k, v]) => `[Barrier ${k}]\n${v}`)
.join("\n")
const barrierWorkerState = _(response.barrierWorkerState)
.entries()
.map(([k, v]) => `[BarrierWorkerState ${k}]\n${v}`)
.join("\n")

result = `${title}\n\n${actorTraces}\n${rpcTraces}\n${compactionTraces}\n${barrierTraces}`
result = `${title}\n\n${actorTraces}\n${rpcTraces}\n${compactionTraces}\n${barrierTraces}\n${barrierWorkerState}`
} catch (e: any) {
result = `${title}\n\nERROR: ${e.message}\n${e.cause}`
}
Expand Down
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions e2e_test/source/cdc/cdc.validate.postgres.slt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# CDC Postgres source validate test
control substitution on

# invalid username
statement error
Expand Down Expand Up @@ -230,11 +231,11 @@ create table shipments (
PRIMARY KEY (shipment_id)
) with (
connector = 'postgres-cdc',
hostname = 'db',
port = '5432',
username = 'postgres',
password = 'postgres',
database.name = 'cdc_test',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'shipments',
slot.name = 'shipments'
Expand Down
126 changes: 63 additions & 63 deletions grafana/risingwave-dev-dashboard.dashboard.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions grafana/risingwave-user-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def section_overview(panels):
)
+ [
panels.target(
f"rate({metric('meta_barrier_duration_seconds_sum')}[$__rate_interval]) / rate({metric('meta_barrier_duration_seconds_count')}[$__rate_interval])",
f"rate({metric('meta_barrier_duration_seconds_sum')}[$__rate_interval]) / rate({metric('meta_barrier_duration_seconds_count')}[$__rate_interval]) > 0",
"barrier_latency_avg",
),
],
Expand Down Expand Up @@ -409,35 +409,35 @@ def section_memory(outer_panels):
"",
[
panels.target(
f"(sum(rate({metric('stream_join_lookup_miss_count')}[$__rate_interval])) by (side, join_table_id, degree_table_id, actor_id) ) / (sum(rate({metric('stream_join_lookup_total_count')}[$__rate_interval])) by (side, join_table_id, degree_table_id, actor_id))",
f"(sum(rate({metric('stream_join_lookup_miss_count')}[$__rate_interval])) by (side, join_table_id, degree_table_id, actor_id) ) / (sum(rate({metric('stream_join_lookup_total_count')}[$__rate_interval])) by (side, join_table_id, degree_table_id, actor_id)) >=0 ",
"join executor cache miss ratio - - {{side}} side, join_table_id {{join_table_id}} degree_table_id {{degree_table_id}} actor {{actor_id}}",
),
panels.target(
f"(sum(rate({metric('stream_agg_lookup_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_agg_lookup_total_count')}[$__rate_interval])) by (table_id, actor_id))",
f"(sum(rate({metric('stream_agg_lookup_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_agg_lookup_total_count')}[$__rate_interval])) by (table_id, actor_id)) >=0 ",
"Agg cache miss ratio - table {{table_id}} actor {{actor_id}} ",
),
panels.target(
f"(sum(rate({metric('stream_agg_distinct_cache_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_agg_distinct_total_cache_count')}[$__rate_interval])) by (table_id, actor_id))",
f"(sum(rate({metric('stream_agg_distinct_cache_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_agg_distinct_total_cache_count')}[$__rate_interval])) by (table_id, actor_id)) >=0",
"Distinct agg cache miss ratio - table {{table_id}} actor {{actor_id}} ",
),
panels.target(
f"(sum(rate({metric('stream_group_top_n_cache_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_group_top_n_total_query_cache_count')}[$__rate_interval])) by (table_id, actor_id))",
f"(sum(rate({metric('stream_group_top_n_cache_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_group_top_n_total_query_cache_count')}[$__rate_interval])) by (table_id, actor_id)) >=0",
"Stream group top n cache miss ratio - table {{table_id}} actor {{actor_id}} ",
),
panels.target(
f"(sum(rate({metric('stream_group_top_n_appendonly_cache_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_group_top_n_appendonly_total_query_cache_count')}[$__rate_interval])) by (table_id, actor_id))",
f"(sum(rate({metric('stream_group_top_n_appendonly_cache_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_group_top_n_appendonly_total_query_cache_count')}[$__rate_interval])) by (table_id, actor_id)) >=0",
"Stream group top n appendonly cache miss ratio - table {{table_id}} actor {{actor_id}} ",
),
panels.target(
f"(sum(rate({metric('stream_lookup_cache_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_lookup_total_query_cache_count')}[$__rate_interval])) by (table_id, actor_id))",
f"(sum(rate({metric('stream_lookup_cache_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_lookup_total_query_cache_count')}[$__rate_interval])) by (table_id, actor_id)) >=0",
"Stream lookup cache miss ratio - table {{table_id}} actor {{actor_id}} ",
),
panels.target(
f"(sum(rate({metric('stream_temporal_join_cache_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_temporal_join_total_query_cache_count')}[$__rate_interval])) by (table_id, actor_id))",
f"(sum(rate({metric('stream_temporal_join_cache_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_temporal_join_total_query_cache_count')}[$__rate_interval])) by (table_id, actor_id)) >=0",
"Stream temporal join cache miss ratio - table {{table_id}} actor {{actor_id}} ",
),
panels.target(
f"1 - (sum(rate({metric('stream_materialize_cache_hit_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_materialize_cache_total_count')}[$__rate_interval])) by (table_id, actor_id))",
f"1 - (sum(rate({metric('stream_materialize_cache_hit_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_materialize_cache_total_count')}[$__rate_interval])) by (table_id, actor_id)) >=0",
"materialize executor cache miss ratio - table {{table_id}} - actor {{actor_id}} {{%s}}"
% NODE_LABEL,
),
Expand Down Expand Up @@ -648,7 +648,7 @@ def section_storage(outer_panels):
[50, 99],
),
panels.target(
f"sum by(le, {COMPONENT_LABEL}) (rate({metric('state_store_sync_size_sum')}[$__rate_interval])) / sum by(le, {COMPONENT_LABEL}) (rate({metric('state_store_sync_size_count')}[$__rate_interval]))",
f"sum by(le, {COMPONENT_LABEL}) (rate({metric('state_store_sync_size_sum')}[$__rate_interval])) / sum by(le, {COMPONENT_LABEL}) (rate({metric('state_store_sync_size_count')}[$__rate_interval])) > 0",
"avg - {{%s}}" % COMPONENT_LABEL,
),
],
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions integration_tests/kinesis-s3-source/create_mv.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE MATERIALIZED VIEW ad_ctr AS
CREATE MATERIALIZED VIEW ad_ctr_mv AS
SELECT
ad_clicks.ad_id AS ad_id,
ad_clicks.clicks_count :: NUMERIC / ad_impressions.impressions_count AS ctr
Expand All @@ -23,7 +23,7 @@ FROM
ai.ad_id
) AS ad_clicks ON ad_impressions.ad_id = ad_clicks.ad_id;

CREATE MATERIALIZED VIEW ad_ctr_5min AS
CREATE MATERIALIZED VIEW ad_ctr_5min_mv AS
SELECT
ac.ad_id AS ad_id,
ac.clicks_count :: NUMERIC / ai.impressions_count AS ctr,
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/kinesis-s3-source/data_check
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ad_impression,ad_click,ad_ctr,ad_ctr_5min
ad_ctr,ad_ctr_5min
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class DbzConnectorConfig {

/* MySQL configs */
public static final String MYSQL_SERVER_ID = "server.id";
public static final String MYSQL_SSL_MODE = "ssl.mode";

/* Postgres configs */
public static final String PG_SLOT_NAME = "slot.name";
Expand Down Expand Up @@ -231,8 +232,8 @@ public DbzConnectorConfig(
ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
}

var mongodbUrl = userProps.get("mongodb.url");
var collection = userProps.get("collection.name");
var mongodbUrl = userProps.get(MongoDb.MONGO_URL);
var collection = userProps.get(MongoDb.MONGO_COLLECTION_NAME);
var connectionStr = new ConnectionString(mongodbUrl);
var connectorName =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.*;

public class MySqlValidator extends DatabaseValidator implements AutoCloseable {
private final Map<String, String> userProps;
Expand Down Expand Up @@ -51,9 +48,14 @@ public MySqlValidator(
var dbName = userProps.get(DbzConnectorConfig.DB_NAME);
var jdbcUrl = ValidatorUtils.getJdbcUrl(SourceTypeE.MYSQL, dbHost, dbPort, dbName);

var user = userProps.get(DbzConnectorConfig.USER);
var password = userProps.get(DbzConnectorConfig.PASSWORD);
this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password);
var properties = new Properties();
properties.setProperty("user", userProps.get(DbzConnectorConfig.USER));
properties.setProperty("password", userProps.get(DbzConnectorConfig.PASSWORD));
properties.setProperty(
"sslMode", userProps.getOrDefault(DbzConnectorConfig.MYSQL_SSL_MODE, "DISABLED"));
properties.setProperty("allowPublicKeyRetrieval", "true");

this.jdbcConnection = DriverManager.getConnection(jdbcUrl, properties);
this.isCdcSourceJob = isCdcSourceJob;
this.isBackfillTable = isBackfillTable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ schema.history.internal.store.only.captured.databases.ddl=true
# default to disable schema change events
include.schema.changes=${debezium.include.schema.changes:-false}
database.server.id=${server.id}
# default to use unencrypted connection
database.ssl.mode=${ssl.mode:-disabled}
# default heartbeat interval 60 seconds
heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-60000}
# In sharing cdc mode, we will subscribe to multiple tables in the given database,
Expand Down
4 changes: 2 additions & 2 deletions java/connector-node/risingwave-sink-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>org.xerial</groupId>
Expand Down
4 changes: 2 additions & 2 deletions java/connector-node/risingwave-sink-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>

</dependencies>
Expand Down
4 changes: 2 additions & 2 deletions java/connector-node/risingwave-source-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@

<!-- database dependencies -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>com.zendesk</groupId>
Expand Down
11 changes: 3 additions & 8 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
<flink.version>1.18.0</flink.version>
<testcontainers.version>1.17.6</testcontainers.version>
<postgresql.version>42.5.5</postgresql.version>
<mysql.connector.java.version>8.0.28</mysql.connector.java.version>
<mysql.connector.java.version>8.0.33</mysql.connector.java.version>
<mongodb.driver.sync.version>4.11.1</mongodb.driver.sync.version>
<sqlite.version>3.45.0.0</sqlite.version>
<aws.version>2.21.42</aws.version>
Expand Down Expand Up @@ -178,8 +178,8 @@
<version>${postgresql.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>${mysql.connector.java.version}</version>
</dependency>
<dependency>
Expand Down Expand Up @@ -360,11 +360,6 @@
<artifactId>apache-client</artifactId>
<version>${aws.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
Expand Down
1 change: 1 addition & 0 deletions proto/monitor_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ message StackTraceResponse {
map<string, string> rpc_traces = 2;
map<string, string> compaction_task_traces = 3;
map<uint64, string> inflight_barrier_traces = 4;
map<uint32, string> barrier_worker_state = 5;
}

// CPU profiling
Expand Down
14 changes: 14 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ message CombinedMutation {
repeated BarrierMutation mutations = 1;
}

message CreateSubscriptionMutation {
uint32 subscription_id = 1;
uint32 upstream_mv_table_id = 2;
}

message DropSubscriptionMutation {
uint32 subscription_id = 1;
uint32 upstream_mv_table_id = 2;
}

message BarrierMutation {
oneof mutation {
// Add new dispatchers to some actors, used for creating materialized views.
Expand All @@ -111,6 +121,10 @@ message BarrierMutation {
ResumeMutation resume = 8;
// Throttle specific source exec or chain exec.
ThrottleMutation throttle = 10;
// Create subscription on mv
CreateSubscriptionMutation create_subscription = 11;
// Drop subscription on mv
DropSubscriptionMutation drop_subscription = 12;
// Combined mutation.
CombinedMutation combined = 100;
}
Expand Down
10 changes: 9 additions & 1 deletion proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,18 @@ import "stream_plan.proto";
option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

message BuildActorInfo {
stream_plan.StreamActor actor = 1;
message SubscriptionIds {
repeated uint32 subscription_ids = 1;
}
map<uint32, SubscriptionIds> related_subscriptions = 2;
}

// Describe the fragments which will be running on this node
message UpdateActorsRequest {
string request_id = 1;
repeated stream_plan.StreamActor actors = 2;
repeated BuildActorInfo actors = 2;
}

message UpdateActorsResponse {
Expand Down
Loading

0 comments on commit 1a75e35

Please sign in to comment.