Skip to content

Commit

Permalink
Merge branch 'main' into yiming/create-sub-via-add-mutation
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Aug 5, 2024
2 parents 94d4ce4 + c4adffc commit 025e727
Show file tree
Hide file tree
Showing 117 changed files with 1,109 additions and 447 deletions.
24 changes: 16 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,16 @@ rw_iter_util = { path = "src/utils/iter_util" }
[workspace.lints.rust]
# `forbid` will also prevent the misuse of `#[allow(unused)]`
unused_must_use = "forbid"
future_incompatible = "warn"
nonstandard_style = "warn"
rust_2018_idioms = "warn"
future_incompatible = { level = "warn", priority = -1 }
nonstandard_style = { level = "warn", priority = -1 }
rust_2018_idioms = { level = "warn", priority = -1 }
# Backward compatibility is not important for an application.
async_fn_in_trait = "allow"
unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(madsim)',
'cfg(coverage)',
'cfg(dashboard_built)',
] }

[workspace.lints.clippy]
uninlined_format_args = "allow"
Expand Down
2 changes: 1 addition & 1 deletion Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ ln -s "$(pwd)/target/${RISEDEV_BUILD_TARGET_DIR}${BUILD_MODE_DIR}/risingwave" "$

[tasks.post-build-risingwave]
category = "RiseDev - Build"
description = "Copy RisngWave binaries to bin"
description = "Copy RisingWave binaries to bin"
condition = { env_set = ["ENABLE_BUILD_RUST"] }
dependencies = [
"link-all-in-one-binaries",
Expand Down
2 changes: 1 addition & 1 deletion ci/build-ci-image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ cat ../rust-toolchain
# shellcheck disable=SC2155

# REMEMBER TO ALSO UPDATE ci/docker-compose.yml
export BUILD_ENV_VERSION=v20240729
export BUILD_ENV_VERSION=v20240731

export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}"

Expand Down
10 changes: 5 additions & 5 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ services:
retries: 5

source-test-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731
depends_on:
- mysql
- db
Expand All @@ -84,7 +84,7 @@ services:
- ..:/risingwave

sink-test-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731
depends_on:
- mysql
- db
Expand All @@ -107,12 +107,12 @@ services:


rw-build-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731
volumes:
- ..:/risingwave

ci-flamegraph-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731
# NOTE(kwannoel): This is used in order to permit
# syscalls for `nperf` (perf_event_open),
# so it can do CPU profiling.
Expand All @@ -123,7 +123,7 @@ services:
- ..:/risingwave

regress-test-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731
depends_on:
db:
condition: service_healthy
Expand Down
2 changes: 1 addition & 1 deletion ci/rust-toolchain
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
# 3. (optional) **follow the instructions in lints/README.md** to update the toolchain and dependencies for lints

[toolchain]
channel = "nightly-2024-03-12"
channel = "nightly-2024-06-06"
2 changes: 1 addition & 1 deletion ci/scripts/build-other.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ set -euo pipefail

source ci/scripts/common.sh


echo "--- Build Rust UDF"
cd e2e_test/udf/wasm
rustup target add wasm32-wasi
cargo build --release
cd ../../..

Expand Down
3 changes: 3 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ doc-valid-idents = [
avoid-breaking-exported-api = false
upper-case-acronyms-aggressive = true
too-many-arguments-threshold = 10
ignore-interior-mutability = [
"risingwave_frontend::expr::ExprImpl" # XXX: Where does ExprImpl have interior mutability?
]
2 changes: 1 addition & 1 deletion e2e_test/source/basic/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ create source s (
properties.bootstrap.server = 'message_queue:29092'
) FORMAT PLAIN ENCODE JSON;

statement error properties `scan_startup_mode` only support earliest and latest or leave it empty
statement error properties `scan_startup_mode` only supports earliest and latest or leaving it empty
create source invalid_startup_mode (
column1 varchar
) with (
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/source/basic/old_row_format_syntax/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ create source s (
properties.bootstrap.server = 'message_queue:29092'
) ROW FORMAT JSON;

statement error properties `scan_startup_mode` only support earliest and latest or leave it empty
statement error properties `scan_startup_mode` only supports earliest and latest or leaving it empty
create source invalid_startup_mode (
column1 varchar
) with (
Expand Down
19 changes: 19 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_map_mysql.slt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ mysql --protocol=tcp -u root mytest -e "
INSERT INTO mysql_types_test VALUES ( True, 1, -128, -32767, -8388608, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'a', 'b', '', '', '1001-01-01', '00:00:00', '1998-01-01 00:00:00.000000', '1970-01-01 00:00:01', 'sad', '[3,4]');
"

statement ok
ALTER SYSTEM SET license_key TO '';

statement ok
create source mysql_source with (
connector = 'mysql-cdc',
Expand All @@ -51,6 +54,22 @@ create source mysql_source with (
server.id = '5601'
);

statement error
create table rw_customers (*) from mysql_source table 'mytest.customers';
----
db error: ERROR: Failed to run the query

Caused by:
Not supported: feature CdcTableSchemaMap is only available for tier Paid and above, while the current tier is Free

Hint: You may want to set a license key with `ALTER SYSTEM SET license_key = '...';` command.
HINT: Please define the schema manually



statement ok
ALTER SYSTEM SET license_key TO DEFAULT;

statement ok
create table rw_customers (*) from mysql_source table 'mytest.customers';

Expand Down
4 changes: 2 additions & 2 deletions e2e_test/source_inline/pubsub/prepare-data.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env -S cargo -Zscript
```cargo
---cargo
[dependencies]
anyhow = "1"
google-cloud-googleapis = { version = "0.13", features = ["pubsub"] }
Expand All @@ -13,7 +13,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"signal",
"fs",
] }
```
---

use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::client::{Client, ClientConfig};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public DbzCdcEngine(
sourceId,
heartbeatTopicPrefix,
transactionTopic,
topicPrefix,
new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY));

// Builds a debezium engine but not start it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.connector.cdc.debezium.internal.DebeziumOffset;
import com.risingwave.connector.cdc.debezium.internal.DebeziumOffsetSerializer;
import com.risingwave.connector.source.common.CdcConnectorException;
import com.risingwave.proto.ConnectorServiceProto.CdcMessage;
import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
import io.debezium.connector.postgresql.PostgresOffsetContext;
Expand All @@ -43,6 +44,7 @@ enum EventType {
HEARTBEAT,
TRANSACTION,
DATA,
SCHEMA_CHANGE,
}

public class DbzChangeEventConsumer
Expand All @@ -57,6 +59,7 @@ public class DbzChangeEventConsumer
private final JsonConverter keyConverter;
private final String heartbeatTopicPrefix;
private final String transactionTopic;
private final String schemaChangeTopic;

private volatile DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>>
currentRecordCommitter;
Expand All @@ -66,12 +69,14 @@ public class DbzChangeEventConsumer
long sourceId,
String heartbeatTopicPrefix,
String transactionTopic,
String schemaChangeTopic,
BlockingQueue<GetEventStreamResponse> queue) {
this.connector = connector;
this.sourceId = sourceId;
this.outputChannel = queue;
this.heartbeatTopicPrefix = heartbeatTopicPrefix;
this.transactionTopic = transactionTopic;
this.schemaChangeTopic = schemaChangeTopic;
LOG.info("heartbeat topic: {}, trnx topic: {}", heartbeatTopicPrefix, transactionTopic);

// The default JSON converter will output the schema field in the JSON which is unnecessary
Expand Down Expand Up @@ -105,6 +110,8 @@ private EventType getEventType(SourceRecord record) {
return EventType.HEARTBEAT;
} else if (isTransactionMetaEvent(record)) {
return EventType.TRANSACTION;
} else if (isSchemaChangeEvent(record)) {
return EventType.SCHEMA_CHANGE;
} else {
return EventType.DATA;
}
Expand All @@ -122,6 +129,11 @@ private boolean isTransactionMetaEvent(SourceRecord record) {
return topic != null && topic.equals(transactionTopic);
}

private boolean isSchemaChangeEvent(SourceRecord record) {
String topic = record.topic();
return topic != null && topic.equals(schemaChangeTopic);
}

@Override
public void handleBatch(
List<ChangeEvent<SourceRecord, SourceRecord>> events,
Expand Down Expand Up @@ -155,7 +167,8 @@ var record = event.value();
switch (eventType) {
case HEARTBEAT:
{
var message = msgBuilder.build();
var message =
msgBuilder.setMsgType(CdcMessage.CdcMessageType.HEARTBEAT).build();
LOG.debug("heartbeat => {}", message.getOffset());
respBuilder.addEvents(message);
break;
Expand All @@ -168,14 +181,54 @@ var record = event.value();
record.topic(), record.valueSchema(), record.value());
var message =
msgBuilder
.setIsTransactionMeta(true)
.setMsgType(CdcMessage.CdcMessageType.TRANSACTION_META)
.setPayload(new String(payload, StandardCharsets.UTF_8))
.setSourceTsMs(trxTs)
.build();
LOG.debug("transaction => {}", message);
respBuilder.addEvents(message);
break;
}

case SCHEMA_CHANGE:
{
var sourceStruct = ((Struct) record.value()).getStruct("source");
if (sourceStruct == null) {
throw new CdcConnectorException(
"source field is missing in schema change event");
}

// upstream event time
long sourceTsMs = sourceStruct.getInt64("ts_ms");
byte[] payload =
payloadConverter.fromConnectData(
record.topic(), record.valueSchema(), record.value());

// We intentionally don't set the fullTableName for schema change event,
// since it doesn't need to be routed to a specific cdc table
var message =
msgBuilder
.setMsgType(CdcMessage.CdcMessageType.SCHEMA_CHANGE)
.setPayload(new String(payload, StandardCharsets.UTF_8))
.setSourceTsMs(sourceTsMs)
.build();
LOG.debug(
"offset => {}, key => {}, payload => {}",
message.getOffset(),
message.getKey(),
message.getPayload());
respBuilder.addEvents(message);

// emit the schema change event as a single response
respBuilder.setSourceId(sourceId);
var response = respBuilder.build();
outputChannel.put(response);

// reset the response builder
respBuilder = GetEventStreamResponse.newBuilder();
break;
}

case DATA:
{
// Topic naming conventions
Expand All @@ -192,10 +245,11 @@ var record = event.value();
}
// get upstream event time from the "source" field
var sourceStruct = ((Struct) record.value()).getStruct("source");
long sourceTsMs =
sourceStruct == null
? System.currentTimeMillis()
: sourceStruct.getInt64("ts_ms");
if (sourceStruct == null) {
throw new CdcConnectorException(
"source field is missing in data change event");
}
long sourceTsMs = sourceStruct.getInt64("ts_ms");
byte[] payload =
payloadConverter.fromConnectData(
record.topic(), record.valueSchema(), record.value());
Expand All @@ -208,6 +262,7 @@ var record = event.value();
String msgKey = key == null ? "" : new String(key, StandardCharsets.UTF_8);
var message =
msgBuilder
.setMsgType(CdcMessage.CdcMessageType.DATA)
.setFullTableName(fullTableName)
.setPayload(msgPayload)
.setKey(msgKey)
Expand Down
Loading

0 comments on commit 025e727

Please sign in to comment.