Skip to content

Commit

Permalink
Merge branch 'main' into tab/fix-csv
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion authored Jul 26, 2023
2 parents 302cefa + a0ebede commit fc63733
Show file tree
Hide file tree
Showing 123 changed files with 2,200 additions and 1,461 deletions.
54 changes: 41 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ echo "--- Kill cluster"
cargo make ci-kill
pkill -f connector-node

echo "--- e2e, ci-1cn-1fe, protobuf schema registry"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cargo make ci-start ci-1cn-1fe
python3 -m pip install requests protobuf confluent-kafka
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 20
sqllogictest -p 4566 -d dev './e2e_test/schema_registry/pb.slt'

echo "--- Kill cluster"
cargo make ci-kill

echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cargo make ci-start ci-pubsub
Expand Down
1 change: 1 addition & 0 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ steps:
timeout_in_minutes: 60

- label: "upload micro-benchmark"
if: build.branch == "main" || build.pull_request.labels includes "ci/upload-micro-benchmark"
command:
- "BUILDKITE_BUILD_NUMBER=$BUILDKITE_BUILD_NUMBER ci/scripts/upload-micro-bench-results.sh"
depends_on: "run-micro-benchmarks"
Expand Down
66 changes: 66 additions & 0 deletions e2e_test/schema_registry/pb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from protobuf import user_pb2
import sys
from confluent_kafka import Producer
from confluent_kafka.serialization import (
SerializationContext,
MessageField,
)
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer


def delivery_report(err, msg):
if err is not None:
print("Delivery failed for User record {}: {}".format(msg.value(), err))


def get_user(i):
return user_pb2.User(
id=i,
name="User_{}".format(i),
address="Address_{}".format(i),
city="City_{}".format(i),
gender=user_pb2.MALE if i % 2 == 0 else user_pb2.FEMALE,
)


def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records):
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
serializer = ProtobufSerializer(
user_pb2.User,
schema_registry_client,
{"use.deprecated.format": False},
)

producer = Producer(producer_conf)
for i in range(num_records):
user = get_user(i)

producer.produce(
topic=topic,
partition=0,
value=serializer(user, SerializationContext(topic, MessageField.VALUE)),
on_delivery=delivery_report,
)
producer.flush()
print("Send {} records to kafka\n".format(num_records))


if __name__ == "__main__":
if len(sys.argv) < 4:
print("pb.py <brokerlist> <schema-registry-url> <topic> <num-records>")
exit(1)

broker_list = sys.argv[1]
schema_registry_url = sys.argv[2]
topic = sys.argv[3]
num_records = int(sys.argv[4])

schema_registry_conf = {"url": schema_registry_url}
producer_conf = {"bootstrap.servers": broker_list}

try:
send_to_kafka(producer_conf, schema_registry_conf, topic, num_records)
except Exception as e:
print("Send Protobuf data to schema registry and kafka failed {}", e)
exit(1)
36 changes: 36 additions & 0 deletions e2e_test/schema_registry/pb.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Before running this test, seed data into kafka:
# python3 e2e_test/schema_registry/pb.py

# Create a table.
statement ok
create table sr_pb_test with (
connector = 'kafka',
topic = 'sr_pb_test',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
message = 'test.User')
FORMAT plain ENCODE protobuf(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
);

# Wait for source
sleep 10s

# Flush into storage
statement ok
flush;

query I
select count(*) from sr_pb_test;
----
20

query II
select min(id), max(id) from sr_pb_test;
----
0 19


statement ok
drop table sr_pb_test;
16 changes: 16 additions & 0 deletions e2e_test/schema_registry/protobuf/user.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";

package test;

message User {
int32 id = 1;
string name = 2;
string address = 3;
string city = 4;
Gender gender = 5;
}

enum Gender {
MALE = 0;
FEMALE = 1;
}
28 changes: 28 additions & 0 deletions e2e_test/schema_registry/protobuf/user_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ message Source {
}
string definition = 13;
optional uint32 connection_id = 14;

optional uint64 initialized_at_epoch = 15;
optional uint64 created_at_epoch = 16;
}

enum SinkType {
Expand Down Expand Up @@ -94,6 +97,8 @@ message Sink {
map<string, string> properties = 12;
string definition = 13;
optional uint32 connection_id = 14;
optional uint64 initialized_at_epoch = 15;
optional uint64 created_at_epoch = 16;
}

message Connection {
Expand Down Expand Up @@ -132,6 +137,9 @@ message Index {
// The index of `InputRef` is the column index of the primary table.
repeated expr.ExprNode index_item = 8;
repeated int32 original_columns = 9;

optional uint64 initialized_at_epoch = 10;
optional uint64 created_at_epoch = 11;
}

message Function {
Expand Down Expand Up @@ -216,6 +224,10 @@ message Table {
// The range of row count of the table.
// This field is not always present due to backward compatibility. Use `Cardinality::unknown` in this case.
plan_common.Cardinality cardinality = 27;

optional uint64 initialized_at_epoch = 28;
optional uint64 created_at_epoch = 29;

// Per-table catalog version, used by schema change. `None` for internal tables and tests.
// Not to be confused with the global catalog version for notification service.
TableVersion version = 100;
Expand Down
2 changes: 1 addition & 1 deletion src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ serde = { version = "1", features = ["derive"] }
serde_bytes = "0.11"
serde_default = "0.1"
serde_json = "1"
serde_with = "2"
serde_with = "3"
smallbitset = "0.6.1"
speedate = "0.7.0"
static_assertions = "1"
Expand Down
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ rust_decimal = "1"
serde = { version = "1", features = ["derive", "rc"] }
serde_derive = "1"
serde_json = "1"
serde_with = { version = "2", features = ["json"] }
serde_with = { version = "3", features = ["json"] }
simd-json = "0.9.1"
tempfile = "3"
thiserror = "1"
Expand Down
7 changes: 6 additions & 1 deletion src/connector/src/aws_auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,14 @@ impl AwsAuthProps {
let credentials_provider = self
.with_role_provider(self.build_credential_provider())
.await?;
let config_loader = aws_config::from_env()
let mut config_loader = aws_config::from_env()
.region(region)
.credentials_provider(credentials_provider);

if let Some(endpoint) = self.endpoint.as_ref() {
config_loader = config_loader.endpoint_url(endpoint);
}

Ok(config_loader.load().await)
}
}
1 change: 1 addition & 0 deletions src/connector/src/aws_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ pub fn s3_client(
s3_config::Builder::from(&sdk_config.clone())
.retry_config(retry_conf)
.timeout_config(timeout_conf)
.force_path_style(true)
.build()
} else {
s3_config::Config::new(sdk_config)
Expand Down
Loading

0 comments on commit fc63733

Please sign in to comment.