diff --git a/e2e_test/source_inline/kafka/issue_19563.slt b/e2e_test/source_inline/kafka/issue_19563.slt
new file mode 100644
index 000000000000..874dbaa98ea7
--- /dev/null
+++ b/e2e_test/source_inline/kafka/issue_19563.slt
@@ -0,0 +1,65 @@
+control substitution on
+
+system ok
+rpk topic create test-topic-19563
+
+statement ok
+CREATE SOURCE kafkasource (
+    v1 timestamp with time zone
+)
+WITH (
+    ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+    topic = 'test-topic-19563',
+    scan.startup.mode = 'earliest'
+) FORMAT PLAIN ENCODE JSON (
+    timestamptz.handling.mode = 'utc_without_suffix'
+);
+
+# Note that StreamSourceScan is in the StreamDynamicFilter fragment, which has 2 upstream fragments.
+query T
+explain create materialized view mv1 as select v1 from kafkasource where v1 between now() and now() + interval '1 day' * 365 * 2000;
+----
+StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
+└─StreamDynamicFilter { predicate: (v1 <= $expr1), output: [v1, _row_id], cleaned_by_watermark: true }
+  ├─StreamProject { exprs: [v1, _row_id], output_watermarks: [v1] }
+  │ └─StreamDynamicFilter { predicate: (v1 >= now), output_watermarks: [v1], output: [v1, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset], cleaned_by_watermark: true }
+  │   ├─StreamRowIdGen { row_id_index: 2 }
+  │   │ └─StreamSourceScan { columns: [v1, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
+  │   └─StreamExchange { dist: Broadcast }
+  │     └─StreamNow
+  └─StreamExchange { dist: Broadcast }
+    └─StreamProject { exprs: [AddWithTimeZone(now, '730000 days':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
+      └─StreamNow
+
+# The following test is adapted from `temporal_filter.slt`.
+
+# This statement should be correct for the next ~1000 years.
+# We cannot have a variable interval for now, so we use 2000 years' worth of days as the upper bound.
+statement ok
+create materialized view mv1 as select v1 from kafkasource where v1 between now() and now() + interval '1 day' * 365 * 2000;
+
+system ok
+cat <<EOF | rpk topic produce test-topic-19563
+{"v1": "2024-11-26 16:00:01"}