Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rework pubsub source to support parallel read and at-least-once #16733

Merged
merged 16 commits into from
May 21, 2024
Merged
89 changes: 38 additions & 51 deletions e2e_test/source_inline/pubsub/prepare-data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,65 +25,52 @@ const SUBSCRIPTION_COUNT: usize = 50;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args: Vec<String> = std::env::args().collect();
let command = args[1].as_str();

std::env::set_var("PUBSUB_EMULATOR_HOST", "127.0.0.1:5980");

let client = Client::new(Default::default()).await?;

// delete and create "test-topic"
let topic = client.topic(TOPIC);
for subscription in topic.subscriptions(None).await? {
subscription.delete(None).await?;
}

let _ = topic.delete(None).await;
topic.create(Some(Default::default()), None).await?;
for i in 0..SUBSCRIPTION_COUNT {
let _ = client
.create_subscription(
format!("test-subscription-{}", i).as_str(),
TOPIC,
SubscriptionConfig {
retain_acked_messages: true,
..Default::default()
},
None,
)
.await?;
}
if command == "create" {
// delete and create "test-topic"
for subscription in topic.subscriptions(None).await? {
subscription.delete(None).await?;
}

let publisher = topic.new_publisher(Default::default());
for line in DATA.lines() {
let a = publisher
.publish(PubsubMessage {
data: line.to_string().into_bytes(),
..Default::default()
})
.await;
a.get().await?;
println!("published {}", line);
let _ = topic.delete(None).await;
topic.create(Some(Default::default()), None).await?;
for i in 0..SUBSCRIPTION_COUNT {
let _ = client
.create_subscription(
format!("test-subscription-{}", i).as_str(),
TOPIC,
SubscriptionConfig {
retain_acked_messages: false,
..Default::default()
},
None,
)
.await?;
}
} else if command == "publish" {
let publisher = topic.new_publisher(Default::default());
for i in 0..10 {
let data = format!("{{\"v1\":{i},\"v2\":\"name{i}\"}}");
let a = publisher
.publish(PubsubMessage {
data: data.to_string().into_bytes(),
..Default::default()
})
.await;
a.get().await?;
println!("published {}", data);
}
} else {
panic!("unknown command {command}");
}

Ok(())
}

const DATA: &str = r#"{"v1":1,"v2":"name0"}
{"v1":2,"v2":"name0"}
{"v1":6,"v2":"name3"}
{"v1":0,"v2":"name5"}
{"v1":5,"v2":"name8"}
{"v1":6,"v2":"name4"}
{"v1":8,"v2":"name9"}
{"v1":9,"v2":"name2"}
{"v1":4,"v2":"name6"}
{"v1":5,"v2":"name3"}
{"v1":8,"v2":"name8"}
{"v1":9,"v2":"name2"}
{"v1":2,"v2":"name3"}
{"v1":4,"v2":"name7"}
{"v1":7,"v2":"name0"}
{"v1":0,"v2":"name9"}
{"v1":3,"v2":"name2"}
{"v1":7,"v2":"name5"}
{"v1":1,"v2":"name7"}
{"v1":3,"v2":"name9"}
"#;
167 changes: 121 additions & 46 deletions e2e_test/source_inline/pubsub/pubsub.slt
Original file line number Diff line number Diff line change
@@ -1,80 +1,155 @@
control substitution on

system ok
e2e_test/source_inline/pubsub/prepare-data.rs
e2e_test/source_inline/pubsub/prepare-data.rs create

statement error missing field `pubsub.subscription`
CREATE SOURCE s (v1 int, v2 varchar) WITH (
connector = 'google_pubsub'
) FORMAT PLAIN ENCODE JSON;

statement error credentials must be set if not using the pubsub emulator
CREATE SOURCE s (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-1'
) FORMAT PLAIN ENCODE JSON;

# fail with invalid emulator_host
statement error failed to lookup address information
CREATE TABLE s1 (v1 int, v2 varchar) WITH (
CREATE TABLE s (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-1',
pubsub.emulator_host = 'invalid_host:5981'
) FORMAT PLAIN ENCODE JSON;

statement error subscription test-subscription-not-exist does not exist
CREATE TABLE s (v1 int, v2 varchar) WITH (
${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
pubsub.subscription = 'test-subscription-not-exist',
) FORMAT PLAIN ENCODE JSON;

# fail if both start_offset and start_snapshot are provided
statement error specify at most one of start_offset or start_snapshot
CREATE TABLE s (v1 int, v2 varchar) WITH (
${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
pubsub.subscription = 'test-subscription-3',
pubsub.start_offset.nanos = '121212',
pubsub.start_snapshot = 'snapshot-that-doesnt-exist'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE s1 (v1 int, v2 varchar) WITH (
${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
pubsub.subscription = 'test-subscription-1',
pubsub.parallelism = 5
) FORMAT PLAIN ENCODE JSON;


# We want to test Pub/Sub ack messages on checkpoint, so disable checkpointing here.
# Note that DDL & flush will trigger checkpoint.
statement ok
SELECT * FROM s1;
ALTER SYSTEM SET checkpoint_frequency TO 114514;

# We publish data after the tables are created, because DDL will trigger checkpoint.
system ok
e2e_test/source_inline/pubsub/prepare-data.rs publish

# default ack timeout is 10s. Let it redeliver once.
sleep 15s

# visibility mode is checkpoint
query IT rowsort
select count(*) from s1;
----
0

statement ok
DROP TABLE s1;
set visibility_mode = 'all';

query IT rowsort
select v1, v2, count(*) FROM s1 group by v1, v2;
----
0 name0 2
1 name1 2
2 name2 2
3 name3 2
4 name4 2
5 name5 2
6 name6 2
7 name7 2
8 name8 2
9 name9 2

statement error subscription test-subscription-not-exist does not exist
CREATE TABLE s2 (v1 int, v2 varchar) WITH (
${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
pubsub.subscription = 'test-subscription-not-exist',
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE s2 (v1 int, v2 varchar) WITH (
${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
pubsub.subscription = 'test-subscription-2',
) FORMAT PLAIN ENCODE JSON;
RECOVER;

# fail if both start_offset and start_snapshot are provided
statement error specify at most one of start_offset or start_snapshot
CREATE TABLE s3 (v1 int, v2 varchar) WITH (
${RISEDEV_PUBSUB_WITH_OPTIONS_COMMON},
pubsub.subscription = 'test-subscription-3',
pubsub.start_offset.nanos = '121212',
pubsub.start_snapshot = 'snapshot-that-doesnt-exist'
) FORMAT PLAIN ENCODE JSON;
sleep 1s

# wait for source
statement ok
flush;

# After recovery, uncheckpointed data in RisingWave is lost.
query IT rowsort
select count(*) from s1;
----
0

# Wait for another redelivery
sleep 10s

# flush data into storage
query IT rowsort
select v1, v2, count(*) FROM s1 group by v1, v2;
----
0 name0 1
1 name1 1
2 name2 1
3 name3 1
4 name4 1
5 name5 1
6 name6 1
7 name7 1
8 name8 1
9 name9 1

# flush will force a checkpoint (and ack Pub/Sub messages on checkpoint)
statement ok
flush;

query IT rowsort
select v1, v2 FROM s2;
select v1, v2, count(*) FROM s1 group by v1, v2;
----
0 name5
0 name9
1 name0
1 name7
2 name0
2 name3
3 name2
3 name9
4 name6
4 name7
5 name3
5 name8
6 name3
6 name4
7 name0
7 name5
8 name8
8 name9
9 name2
9 name2
0 name0 1
1 name1 1
2 name2 1
3 name3 1
4 name4 1
5 name5 1
6 name6 1
7 name7 1
8 name8 1
9 name9 1

sleep 15s

# no redelivery any more
query IT rowsort
select v1, v2, count(*) FROM s1 group by v1, v2;
----
0 name0 1
1 name1 1
2 name2 1
3 name3 1
4 name4 1
5 name5 1
6 name6 1
7 name7 1
8 name8 1
9 name9 1


statement ok
DROP TABLE s2;
DROP TABLE s1;

# Restore to the value in src/config/ci-recovery.toml
statement ok
ALTER SYSTEM SET checkpoint_frequency TO 5;
2 changes: 1 addition & 1 deletion risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ profile:
port: 29092

ci-inline-source-test:
config-path: src/config/ci.toml
config-path: src/config/ci-recovery.toml
steps:
- use: minio
- use: etcd
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl SourceExecutor {
));
let stream = self
.source
.to_stream(Some(self.split_list), self.column_ids, source_ctx)
.build_stream(Some(self.split_list), self.column_ids, source_ctx)
.await?;

#[for_await]
Expand Down
1 change: 1 addition & 0 deletions src/config/ci-recovery.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ imm_merge_threshold = 2

[system]
barrier_interval_ms = 250
# If this is changed, remember to also change e2e_test/source_inline/pubsub/pubsub.slt
checkpoint_frequency = 5
max_concurrent_creating_streaming_jobs = 0
Loading
Loading