diff --git a/e2e_test/nexmark/create_sources.slt.part b/e2e_test/nexmark/create_sources.slt.part index 6c651822bc613..16bd0dc1b44ba 100644 --- a/e2e_test/nexmark/create_sources.slt.part +++ b/e2e_test/nexmark/create_sources.slt.part @@ -1,3 +1,37 @@ +statement ok +CREATE SOURCE nexmark ( + event_type BIGINT, + person STRUCT<"id" BIGINT, + "name" VARCHAR, + "email_address" VARCHAR, + "credit_card" VARCHAR, + "city" VARCHAR, + "state" VARCHAR, + "date_time" TIMESTAMP, + "extra" VARCHAR>, + auction STRUCT<"id" BIGINT, + "item_name" VARCHAR, + "description" VARCHAR, + "initial_bid" BIGINT, + "reserve" BIGINT, + "date_time" TIMESTAMP, + "expires" TIMESTAMP, + "seller" BIGINT, + "category" BIGINT, + "extra" VARCHAR>, + bid STRUCT<"auction" BIGINT, + "bidder" BIGINT, + "price" BIGINT, + "channel" VARCHAR, + "url" VARCHAR, + "date_time" TIMESTAMP, + "extra" VARCHAR> +) WITH ( + connector = 'nexmark', + nexmark.split.num = '8', + nexmark.min.event.gap.in.ns = '1000000' +) ROW FORMAT JSON; + statement ok CREATE MATERIALIZED SOURCE person ( "id" BIGINT, diff --git a/e2e_test/nexmark/drop_sources.slt.part b/e2e_test/nexmark/drop_sources.slt.part index c27f833c1522b..716ca30b7fa2b 100644 --- a/e2e_test/nexmark/drop_sources.slt.part +++ b/e2e_test/nexmark/drop_sources.slt.part @@ -1,3 +1,6 @@ +statement ok +DROP SOURCE nexmark; + statement ok DROP SOURCE person; diff --git a/e2e_test/source/basic/nexmark_endless.slt b/e2e_test/source/basic/nexmark_endless.slt index 4b07999c10f26..062c366da0075 100644 --- a/e2e_test/source/basic/nexmark_endless.slt +++ b/e2e_test/source/basic/nexmark_endless.slt @@ -5,11 +5,19 @@ include ../../nexmark/create_sources.slt.part include ../../streaming/nexmark/views/q7.slt.part include ../../streaming/nexmark/views/q5.slt.part +statement ok +create materialized view nexmark_mv as select * from nexmark; + sleep 15s statement ok flush; +query I +select count(*) > 0 as has_results from nexmark_mv; +---- +t + query I select count(*) > 0 as has_results from nexmark_q7; ---- @@ -26,4 +34,7 @@ drop materialized view nexmark_q7; statement ok drop materialized view nexmark_q5; +statement ok +drop materialized view nexmark_mv; + include ../nexmark/drop_sources.slt.part diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 6bacc2138afff..027ff87a421cc 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -282,7 +282,7 @@ mod tests { let props = ConnectorProperties::extract(props).unwrap(); if let ConnectorProperties::Nexmark(props) = props { - assert_eq!(props.table_type, EventType::Person); + assert_eq!(props.table_type, Some(EventType::Person)); assert_eq!(props.split_num, 1); } else { panic!("extract nexmark config failed"); diff --git a/src/connector/src/source/nexmark/mod.rs b/src/connector/src/source/nexmark/mod.rs index fc1e6f4eb61df..ab19b65695bcb 100644 --- a/src/connector/src/source/nexmark/mod.rs +++ b/src/connector/src/source/nexmark/mod.rs @@ -53,8 +53,8 @@ pub struct NexmarkPropertiesInner { #[serde(rename = "nexmark.event.num", default = "default_event_num")] pub event_num: u64, - #[serde(rename = "nexmark.table.type", default = "default_event_type")] - pub table_type: EventType, + #[serde(rename = "nexmark.table.type", default = "none")] + pub table_type: Option, #[serde_as(as = "DisplayFromStr")] #[serde(rename = "nexmark.max.chunk.size", default = "identity_u64::<1024>")] @@ -221,10 +221,6 @@ fn default_event_num() -> u64 { u64::MAX } -fn default_event_type() -> EventType { - EventType::Person -} - impl Default for NexmarkPropertiesInner { fn default() -> Self { let v = serde_json::to_value(HashMap::::new()).unwrap(); @@ -237,10 +233,16 @@ impl From<&NexmarkPropertiesInner> for NexmarkConfig { // 2015-07-15 00:00:00 pub const BASE_TIME: u64 = 1_436_918_400_000; - let mut cfg = NexmarkConfig { - base_time: BASE_TIME, - ..Default::default() + let mut cfg = match value.table_type { + // This is the old way + Some(_) => NexmarkConfig { + base_time: BASE_TIME, + ..Default::default() + }, + // By using default, it will choose the default proportion of three different events. + None => NexmarkConfig::default(), }; + macro_rules! set { ($name:ident) => { set!($name, $name); diff --git a/src/connector/src/source/nexmark/source/combined_event.rs b/src/connector/src/source/nexmark/source/combined_event.rs new file mode 100644 index 0000000000000..031a34fa086dc --- /dev/null +++ b/src/connector/src/source/nexmark/source/combined_event.rs @@ -0,0 +1,55 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use nexmark::event::{Auction, Bid, Person}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub struct CombinedEvent { + event_type: u64, + /// The Person event + person: Option, + /// The Auction event. + auction: Option, + /// The Bid event. + bid: Option, +} + +impl CombinedEvent { + fn new( + event_type: u64, + person: Option, + auction: Option, + bid: Option, + ) -> Self { + Self { + event_type, + person, + auction, + bid, + } + } + + pub fn person(person: Person) -> Self { + Self::new(0, Some(person), None, None) + } + + pub fn auction(auction: Auction) -> Self { + Self::new(1, None, Some(auction), None) + } + + pub fn bid(bid: Bid) -> Self { + Self::new(2, None, None, Some(bid)) + } +} diff --git a/src/connector/src/source/nexmark/source/message.rs b/src/connector/src/source/nexmark/source/message.rs index dd2213a3534d0..f407ddaa5ea5a 100644 --- a/src/connector/src/source/nexmark/source/message.rs +++ b/src/connector/src/source/nexmark/source/message.rs @@ -15,6 +15,7 @@ use bytes::Bytes; use nexmark::event::Event; +use crate::source::nexmark::source::combined_event::CombinedEvent; use crate::source::{SourceMessage, SplitId}; #[derive(Clone, Debug)] @@ -35,7 +36,7 @@ impl From for SourceMessage { } impl NexmarkMessage { - pub fn new(split_id: SplitId, offset: u64, event: Event) -> Self { + pub fn new_single_event(split_id: SplitId, offset: u64, event: Event) -> Self { NexmarkMessage { split_id, sequence_number: offset.to_string(), @@ -48,4 +49,18 @@ impl NexmarkMessage { .into(), } } + + pub fn new_combined_event(split_id: SplitId, offset: u64, event: Event) -> Self { + let combined_event = match event { + Event::Person(p) => CombinedEvent::person(p), + Event::Auction(a) => CombinedEvent::auction(a), + Event::Bid(b) => CombinedEvent::bid(b), + }; + let combined_event = serde_json::to_string(&combined_event).unwrap(); + NexmarkMessage { + split_id, + sequence_number: offset.to_string(), + payload: combined_event.into(), + } + } } diff --git a/src/connector/src/source/nexmark/source/mod.rs b/src/connector/src/source/nexmark/source/mod.rs index 0c3835c786eff..0fbf995e74f92 100644 --- a/src/connector/src/source/nexmark/source/mod.rs +++ b/src/connector/src/source/nexmark/source/mod.rs @@ -12,5 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod combined_event; pub mod message; pub mod reader; diff --git a/src/connector/src/source/nexmark/source/reader.rs b/src/connector/src/source/nexmark/source/reader.rs index cf6e25a97866d..0aa4e06c9315f 100644 --- a/src/connector/src/source/nexmark/source/reader.rs +++ b/src/connector/src/source/nexmark/source/reader.rs @@ -37,7 +37,7 @@ pub struct NexmarkSplitReader { assigned_split: NexmarkSplit, split_id: SplitId, event_num: u64, - event_type: EventType, + event_type: Option, use_real_time: bool, min_event_gap_in_ns: u64, max_chunk_size: u64, @@ -70,11 +70,17 @@ impl SplitReader for NexmarkSplitReader { assigned_split = split; } + let mut generator = EventGenerator::new(NexmarkConfig::from(&*properties)) + .with_offset(offset) + .with_step(split_num); + // If the user doesn't specify the event type in the source definition, then the user + // intends to use unified source(three different types of events together). + if let Some(event_type) = properties.table_type.as_ref() { + generator = generator.with_type_filter(*event_type); + } + Ok(NexmarkSplitReader { - generator: EventGenerator::new(NexmarkConfig::from(&*properties)) - .with_offset(offset) - .with_step(split_num) - .with_type_filter(properties.table_type), + generator, assigned_split, split_id, max_chunk_size: properties.max_chunk_size, @@ -105,8 +111,18 @@ impl NexmarkSplitReader { break; } let event = self.generator.next().unwrap(); - let event = - NexmarkMessage::new(self.split_id.clone(), self.generator.offset(), event); + let event = match self.event_type { + Some(_) => NexmarkMessage::new_single_event( + self.split_id.clone(), + self.generator.offset(), + event, + ), + None => NexmarkMessage::new_combined_event( + self.split_id.clone(), + self.generator.offset(), + event, + ), + }; msgs.push(event.into()); } if msgs.is_empty() { @@ -147,7 +163,7 @@ mod tests { let props = Box::new(NexmarkPropertiesInner { split_num: 2, min_event_gap_in_ns: 0, - table_type: EventType::Bid, + table_type: Some(EventType::Bid), max_chunk_size: 5, ..Default::default() });