Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Apr 20, 2024
1 parent fba663f commit afe5279
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,27 @@ control substitution on
statement ok
SET rw_enable_shared_source TO true;

system ok
rpk topic create shared_source -p 4

system ok
cat << EOF | rpk topic produce shared_source -f "%p %v\\n" -p 0
0 {"v1": 1, "v2": "1"}
1 {"v1": 2, "v2": "22"}
2 {"v1": 3, "v2": "333"}
3 {"v1": 4, "v2": "4444"}

statement ok
create source s0 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_4_partition_topic',
properties.bootstrap.server = '${KAFKA_BOOTSTRAP_SERVER:message_queue:29092}',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'shared_source',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
create materialized view mv_1 as select * from s0;

# This does not the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor.
statement ok
SET rw_enable_shared_source TO false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
# We use with_config_map to control the config when CREATE SOURCE, and use another SET statement to change the config for CREATE MV
#
# batch: All 4 plans should be the same.
# stream: StreamSourceScan (with backfill) should be used only for the last 1. All other 3 use StreamSource.
# stream: StreamSourceScan (with backfill) should be used only for the last 2. The first 2 use StreamSource. rw_enable_shared_source changes the behavior of CREATE SOURCE, but not CREATE MATERIALIZED VIEW
- with_config_map:
rw_enable_shared_source: false
before:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
StreamMaterialize { columns: [x, y, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [x, y, _row_id] }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
with_config_map:
rw_enable_shared_source: 'true'
- before:
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,9 @@ impl ToStream for LogicalSource {
}
SourceNodeKind::CreateMViewOrBatch => {
// Create MV on source.
let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared())
&& self.ctx().session_ctx().config().rw_enable_shared_source();
// We only check rw_enable_shared_source is true when `CREATE SOURCE`.
// The value does not affect the behavior of `CREATE MATERIALIZED VIEW` here.
let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared());
if use_shared_source {
plan = StreamSourceScan::new(self.core.clone()).into();
} else {
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ impl<S: StateStore> SourceExecutor<S> {
// - For shared source, pause until there's a MV.
// - If the first barrier requires us to pause on startup, pause the stream.
if (self.is_shared && is_uninitialized) || barrier.is_pause_on_startup() {
tracing::info!(is_shared=%self.is_shared, is_uninitialized=%is_uninitialized, "source paused on startup");
stream.pause_stream();
}

Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
.collect();

let state_table_handler = SourceStateTableHandler::from_table_catalog(
source.state_table.as_ref.unwrap(),
source.state_table.as_ref().unwrap(),
store.clone(),
)
.await;
Expand Down

0 comments on commit afe5279

Please sign in to comment.