add test
Signed-off-by: xxchan <[email protected]>
xxchan committed Nov 25, 2024
1 parent ea31308 commit 860e859
Showing 2 changed files with 68 additions and 3 deletions.
65 changes: 65 additions & 0 deletions e2e_test/source_inline/kafka/issue_19563.slt
@@ -0,0 +1,65 @@
control substitution on

system ok
rpk topic create test-topic-19563

statement ok
CREATE SOURCE kafkasource (
v1 timestamp with time zone
)
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test-topic-19563',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON (
timestamptz.handling.mode = 'utc_without_suffix'
);

# Note that StreamSourceScan is in the StreamDynamicFilter fragment, which has 2 upstream fragments.
query T
explain create materialized view mv1 as select v1 from kafkasource where v1 between now() and now() + interval '1 day' * 365 * 2000;
----
StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamDynamicFilter { predicate: (v1 <= $expr1), output: [v1, _row_id], cleaned_by_watermark: true }
├─StreamProject { exprs: [v1, _row_id], output_watermarks: [v1] }
│ └─StreamDynamicFilter { predicate: (v1 >= now), output_watermarks: [v1], output: [v1, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset], cleaned_by_watermark: true }
│ ├─StreamRowIdGen { row_id_index: 2 }
│ │ └─StreamSourceScan { columns: [v1, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamNow
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [AddWithTimeZone(now, '730000 days':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
└─StreamNow

# The following test is adapted from `temporal_filter.slt`.

# This statement should be correct for the next ~1000 years.
# We cannot have a variable interval for now, so we use 2000 years' worth of days (365 * 2000 = 730000, the '730000 days' interval in the plan above) as the upper bound.
statement ok
create materialized view mv1 as select v1 from kafkasource where v1 between now() and now() + interval '1 day' * 365 * 2000;

system ok
cat <<EOF | rpk topic produce test-topic-19563
{"v1": "3031-01-01 19:00:00"}
{"v1": "3031-01-01 20:00:00"}
{"v1": "3031-01-01 21:00:00"}
{"v1": "5031-01-01 21:00:00"}
{"v1": "0001-01-01 21:00:00"}
EOF

sleep 3s

# Rows below the lower bound or above the upper bound are excluded
query I
select * from mv1 order by v1;
----
3031-01-01 19:00:00+00:00
3031-01-01 20:00:00+00:00
3031-01-01 21:00:00+00:00
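
The inputs above carry no timezone suffix, yet the rows come back with a +00:00 offset: timestamptz.handling.mode = 'utc_without_suffix' makes the source read such naive timestamps as UTC. A minimal chrono-based sketch of that interpretation (an illustration of the assumed semantics, not RisingWave's actual decoding code):

use chrono::{NaiveDateTime, TimeZone, Utc};

fn main() {
    // Payload value from the topic: no offset or 'Z' suffix.
    let raw = "3031-01-01 19:00:00";
    let naive = NaiveDateTime::parse_from_str(raw, "%Y-%m-%d %H:%M:%S").unwrap();
    // 'utc_without_suffix': treat the naive timestamp as already being UTC.
    let v1 = Utc.from_utc_datetime(&naive);
    // Matches the `3031-01-01 19:00:00+00:00` row in the query result above.
    assert_eq!(v1.to_string(), "3031-01-01 19:00:00 UTC");
}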


statement ok
DROP SOURCE kafkasource CASCADE;

system ok
rpk topic delete test-topic-19563
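
The predicate v1 between now() and now() + interval '1 day' * 365 * 2000 keeps only rows in a 730000-day window starting at the current time, which is why the year-0001 and year-5031 rows are absent from the result above. A rough sketch of the window check (a chrono-based illustration of the semantics, not the engine's dynamic-filter implementation):

use chrono::{DateTime, Duration, Utc};

// now() <= v1 <= now() + interval '1 day' * 365 * 2000
fn in_window(v1: DateTime<Utc>, now: DateTime<Utc>) -> bool {
    let upper = now + Duration::days(365 * 2000); // 730000 days, as in the plan
    now <= v1 && v1 <= upper
}

fn main() {
    let now = Utc::now();
    for ts in ["3031-01-01T19:00:00Z", "5031-01-01T21:00:00Z", "0001-01-01T21:00:00Z"] {
        let v1: DateTime<Utc> = ts.parse().unwrap();
        println!("{ts}: kept = {}", in_window(v1, now));
    }
}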
6 changes: 3 additions & 3 deletions src/meta/src/model/stream.rs
@@ -426,14 +426,14 @@ impl StreamJobFragments {
     pub fn source_backfill_fragments(
         &self,
     ) -> MetadataModelResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
-        let mut source_fragments = HashMap::new();
+        let mut source_backfill_fragments = HashMap::new();
 
         for fragment in self.fragments() {
             for actor in &fragment.actors {
                 if let Some((source_id, upstream_source_fragment_id)) =
                     actor.nodes.as_ref().unwrap().find_source_backfill()
                 {
-                    source_fragments
+                    source_backfill_fragments
                         .entry(source_id as SourceId)
                         .or_insert(BTreeSet::new())
                         .insert((fragment.fragment_id, upstream_source_fragment_id));
@@ -442,7 +442,7 @@ impl StreamJobFragments {
                 }
             }
         }
-        Ok(source_fragments)
+        Ok(source_backfill_fragments)
     }
 
     /// Resolve dependent table
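
The rename makes the map's meaning explicit: each source id maps to the set of (backfill fragment id, upstream source fragment id) pairs. As the new test's plan shows, the backfill scan can live in a fragment with two upstream fragments, which is presumably why the pairing is taken from the actor's plan node via find_source_backfill() rather than inferred from fragment edges. A standalone sketch of the returned shape (the type aliases are assumptions for illustration):

use std::collections::{BTreeSet, HashMap};

// Assumed aliases; the real types live in the meta model.
type SourceId = u32;
type FragmentId = u32;

fn main() {
    let mut source_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> =
        HashMap::new();

    // Hypothetical entry: backfill fragment 42 reads from source fragment 7.
    source_backfill_fragments
        .entry(1)
        .or_insert_with(BTreeSet::new)
        .insert((42, 7));

    for (source, pairs) in &source_backfill_fragments {
        for (backfill, upstream) in pairs {
            println!("source {source}: backfill fragment {backfill} <- source fragment {upstream}");
        }
    }
}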
