Skip to content

Commit

Permalink
Merge branch 'main' into yiming/batch-query-time-travel
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Sep 11, 2024
2 parents 2ddd228 + 23410f0 commit 05f9efc
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 144 deletions.
97 changes: 97 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/recover.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
control substitution on

system ok
rpk topic create 'test-pb-struct'


system ok
jq -sR '{"schema":.,"schemaType":"PROTOBUF"}' << EOF | curl -X POST -H 'content-type: application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/test-pb-struct-value/versions"
syntax = "proto3";
package test;
message User {
int32 id = 1;
Name name = 2;
}
message Name {
string first_name = 1;
string last_name = 2;
}
EOF


# create a source with v1 schema
statement ok
create source s with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test-pb-struct')
format plain encode protobuf (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User');


# register a v2 schema
system ok
jq -sR '{"schema":.,"schemaType":"PROTOBUF"}' << EOF | curl -X POST -H 'content-type: application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/test-pb-struct-value/versions"
syntax = "proto3";
package test;
message User {
int32 id = 1;
Name name = 2;
}
message Name {
string first_name = 1;
string last_name = 2;
string middle_name = 3;
}
EOF


# trigger recovery
statement ok
recover;


sleep 2s


# produce a v2 message
statement ok
create sink sk as select
1 as id,
row('Alan', 'Turing', 'Mathison')::struct<first_name varchar, last_name varchar, middle_name varchar> as name
with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test-pb-struct')
format plain encode protobuf (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User');


sleep 1s


# reading as v1 shall not panic
query IT
select * from s;
----
1 (Alan,Turing)


statement ok
drop sink sk;


statement ok
drop source s;


system ok
curl -X DELETE "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/test-pb-struct-value"


system ok
curl -X DELETE "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/test-pb-struct-value?permanent=true"


system ok
rpk topic delete 'test-pb-struct'
3 changes: 2 additions & 1 deletion integration_tests/twitter-pulsar/pb/create_source.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
CREATE SOURCE twitter WITH (
connector = 'pulsar',
pulsar.topic = 'twitter',
pulsar.service.url = 'pulsar://message_queue:6650'
pulsar.service.url = 'pulsar://message_queue:6650',
subscription.name.prefix = 'custom_prefix'
) ROW FORMAT PROTOBUF MESSAGE 'twitter.schema.Event' ROW SCHEMA LOCATION 'http://file_server:8080/schema';
Loading

0 comments on commit 05f9efc

Please sign in to comment.