Merge branch 'main' of https://github.com/risingwavelabs/risingwave into li0k/storage_split_with_size

Li0k committed Aug 22, 2024
2 parents e395628 + d8c718b commit f2af33a
Showing 4 changed files with 65 additions and 30 deletions.
13 changes: 0 additions & 13 deletions ci/scripts/e2e-kafka-sink-test.sh
@@ -150,20 +150,7 @@ echo "preparing confluent schema registry"
python3 -m pip install --break-system-packages requests confluent-kafka

echo "testing protobuf"
cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
rpk topic create test-rw-sink-append-only-protobuf
rpk topic create test-rw-sink-append-only-protobuf-csr-a
rpk topic create test-rw-sink-append-only-protobuf-csr-hi
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
rpk topic delete test-rw-sink-append-only-protobuf
rpk topic delete test-rw-sink-append-only-protobuf-csr-a
rpk topic delete test-rw-sink-append-only-protobuf-csr-hi

echo "testing avro"
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
rpk topic create test-rw-sink-upsert-avro
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt'
rpk topic delete test-rw-sink-upsert-avro
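These fixtures move into the slt files below, so each test creates and deletes its own topics and schemas. The contents of e2e_test/sink/kafka/register_schema.py are not part of this diff; as a rough sketch under that caveat, a helper registering a schema with a Confluent-compatible registry via the requests package installed above might look like the following. The argument handling and schema-type guess are illustrative assumptions, and the real script also takes an optional key-field list (used above for the Avro key subject) that this sketch omits.

    #!/usr/bin/env python3
    # Illustrative sketch only -- not the actual contents of
    # e2e_test/sink/kafka/register_schema.py, which this diff does not show.
    import sys

    import requests


    def register_schema(registry_url: str, subject: str, schema_path: str) -> int:
        """Register one schema version under `subject`; return its global id."""
        with open(schema_path) as f:
            schema = f.read()
        # The Confluent registry defaults schemaType to AVRO; Protobuf schemas
        # must be declared explicitly.
        schema_type = "PROTOBUF" if schema_path.endswith(".proto") else "AVRO"
        resp = requests.post(
            f"{registry_url}/subjects/{subject}/versions",
            json={"schema": schema, "schemaType": schema_type},
            timeout=10,
        )
        resp.raise_for_status()
        return resp.json()["id"]


    if __name__ == "__main__":
        registry, subject, schema_file = sys.argv[1:4]
        print(register_schema(registry, subject, schema_file))

The subjects follow the <topic>-value / <topic>-key naming that the registry's default TopicNameStrategy expects, which is why each registration pairs with a topic of the same name.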
12 changes: 12 additions & 0 deletions e2e_test/sink/kafka/avro.slt
@@ -1,6 +1,15 @@
statement ok
set sink_decouple = false;

system ok
rpk topic create test-rw-sink-upsert-avro

system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc

system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'

statement ok
create table from_kafka ( *, gen_i32_field int as int32_field + 2, primary key (some_key) )
include key as some_key
@@ -232,3 +241,6 @@ drop table into_kafka;

statement ok
drop table from_kafka;

system ok
rpk topic delete test-rw-sink-upsert-avro
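The create sink statement that actually writes to test-rw-sink-upsert-avro sits in the folded part of the file and is not shown in these hunks. As a hedged sketch of its typical shape only — the sink name, source table, and broker address are assumptions; the key columns mirror the -key subject registered above — an upsert Avro sink in RisingWave looks roughly like:

    create sink into_kafka_sink from into_kafka with (
        connector = 'kafka',
        properties.bootstrap.server = 'message_queue:29092',
        topic = 'test-rw-sink-upsert-avro',
        primary_key = 'string_field,int32_field')
    format upsert encode avro (
        schema.registry = 'http://schemaregistry:8082');

Registering the key subject with just string_field,int32_field lets the sink encode those two columns as the Kafka message key, while the -value subject carries the full row schema.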
27 changes: 27 additions & 0 deletions e2e_test/sink/kafka/protobuf.slt
@@ -1,6 +1,12 @@
statement ok
set sink_decouple = false;

system ok
rpk topic create test-rw-sink-append-only-protobuf

system ok
cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive

statement ok
create table from_kafka with (
connector = 'kafka',
@@ -10,6 +16,12 @@ format plain encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');

system ok
rpk topic create test-rw-sink-append-only-protobuf-csr-a

system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto

statement ok
create table from_kafka_csr_trivial with (
connector = 'kafka',
@@ -19,6 +31,12 @@ format plain encode protobuf (
schema.registry = 'http://schemaregistry:8082',
message = 'test.package.MessageA');

system ok
rpk topic create test-rw-sink-append-only-protobuf-csr-hi

system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto

statement ok
create table from_kafka_csr_nested with (
connector = 'kafka',
@@ -215,5 +233,14 @@ drop table from_kafka_raw cascade;
statement ok
drop table into_kafka cascade;

system ok
rpk topic delete test-rw-sink-append-only-protobuf

system ok
rpk topic delete test-rw-sink-append-only-protobuf-csr-a

system ok
rpk topic delete test-rw-sink-append-only-protobuf-csr-hi

system ok
rpk topic delete test-rw-sink-upsert-protobuf
43 changes: 26 additions & 17 deletions src/connector/src/source/kinesis/source/reader.rs
@@ -126,7 +126,6 @@ impl KinesisSplitReader {
#[try_stream(ok = Vec < SourceMessage >, error = crate::error::ConnectorError)]
async fn into_data_stream(mut self) {
self.new_shard_iter().await?;
let mut finish_flag = false;
loop {
if self.shard_iter.is_none() {
tracing::warn!(
@@ -139,22 +138,39 @@ }
}
match self.get_records().await {
Ok(resp) => {
if resp.millis_behind_latest.is_none()
&& let Some(shard) = &resp.child_shards
tracing::trace!(?self.shard_id, ?resp);
self.shard_iter = resp.next_shard_iterator().map(String::from);
let chunk = (resp.records().iter())
.map(|r| from_kinesis_record(r, self.split_id.clone()))
.collect::<Vec<SourceMessage>>();
if let Some(shard) = &resp.child_shards
&& !shard.is_empty()
{
// according to the doc https://docs.rs/aws-sdk-kinesis/latest/aws_sdk_kinesis/operation/get_records/struct.GetRecordsOutput.html
//
// > The list of the current shard's child shards, returned in the GetRecords API's response only when the end of the current shard is reached.
//
// It means the current shard is finished, ie. inactive, and we should stop reading it. Checking `millis_behind_latest` is a double check.

// The response will be like:
// {
// "Records": [], // empty
// "MillisBehindLatest": 2665000, // non-zero
// "ChildShards": [...] // non-empty
// // no NextShardIterator
// }

// Other executors are going to read the child shards.
finish_flag = true;

if !chunk.is_empty() {
// This should not happen. But be extra safe here.
yield chunk;
}

tracing::info!(
"shard {:?} reaches the end and is inactive, stop reading",
self.shard_id
);
break;
}
self.shard_iter = resp.next_shard_iterator().map(String::from);
let chunk = (resp.records().iter())
.map(|r| from_kinesis_record(r, self.split_id.clone()))
.collect::<Vec<SourceMessage>>();
if chunk.is_empty() {
tokio::time::sleep(Duration::from_millis(200)).await;
continue;
@@ -166,13 +182,6 @@ impl KinesisSplitReader {
self.latest_offset
);
yield chunk;
if finish_flag {
tracing::info!(
"shard {:?} reaches the end and is inactive, stop reading",
self.shard_id
);
break;
}
}
Err(SdkError::ServiceError(e)) if e.err().is_resource_not_found_exception() => {
tracing::warn!("shard {:?} is closed, stop reading", self.shard_id);
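Net effect of the change above: rather than raising finish_flag early and breaking only after the yield at the bottom of the loop, the reader now advances the iterator, builds the chunk, and stops immediately when a response carries non-empty child_shards, flushing any records that arrived alongside them. Distilled into a standalone predicate — a plain slice stands in for the aws_sdk_kinesis GetRecordsOutput field; the real check is the if let on resp.child_shards in the diff — a minimal sketch:

    // Sketch of the end-of-shard test, with Option<&[String]> standing in
    // for the `child_shards` field of GetRecordsOutput used in the diff.
    fn shard_is_closed(child_shards: Option<&[String]>) -> bool {
        // Per the AWS doc quoted in the comments: ChildShards is returned
        // only when the end of the current shard is reached, so a non-empty
        // list means stop reading this shard.
        matches!(child_shards, Some(shards) if !shards.is_empty())
    }

    fn main() {
        // Mid-stream response: no child shards yet, keep polling.
        assert!(!shard_is_closed(None));
        assert!(!shard_is_closed(Some(&[])));
        // End-of-shard response: child shards listed, break and let other
        // executors pick up the children.
        assert!(shard_is_closed(Some(&["shardId-000000000002".to_owned()])));
    }

Because ChildShards only appears in the response that ends a shard, the commit drops the old millis_behind_latest.is_none() condition, which the original comment already called a double check.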
