Skip to content

Commit

Permalink
feat(nexmark): add unified source while keeping the old single type s…
Browse files Browse the repository at this point in the history
…ource
  • Loading branch information
lmatz committed Dec 9, 2022
1 parent 353837d commit eaba5d0
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 19 deletions.
34 changes: 34 additions & 0 deletions e2e_test/nexmark/create_sources.slt.part
Original file line number Diff line number Diff line change
@@ -1,3 +1,37 @@
statement ok
CREATE SOURCE nexmark (
event_type BIGINT,
person STRUCT<"id" BIGINT,
"name" VARCHAR,
"email_address" VARCHAR,
"credit_card" VARCHAR,
"city" VARCHAR,
"state" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR>,
auction STRUCT<"id" BIGINT,
"item_name" VARCHAR,
"description" VARCHAR,
"initial_bid" BIGINT,
"reserve" BIGINT,
"date_time" TIMESTAMP,
"expires" TIMESTAMP,
"seller" BIGINT,
"category" BIGINT,
"extra" VARCHAR>,
bid STRUCT<"auction" BIGINT,
"bidder" BIGINT,
"price" BIGINT,
"channel" VARCHAR,
"url" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR>
) WITH (
connector = 'nexmark',
nexmark.split.num = '8',
nexmark.min.event.gap.in.ns = '1000000'
) ROW FORMAT JSON;

statement ok
CREATE MATERIALIZED SOURCE person (
"id" BIGINT,
Expand Down
8 changes: 8 additions & 0 deletions e2e_test/source/basic/nexmark_endless.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,19 @@ include ../../nexmark/create_sources.slt.part
include ../../streaming/nexmark/views/q7.slt.part
include ../../streaming/nexmark/views/q5.slt.part

statement ok
create materialized view nexmark_mv as select * from nexmark;

sleep 15s

statement ok
flush;

query I
select count(*) > 0 as has_results from nexmark_mv;
----
t

query I
select count(*) > 0 as has_results from nexmark_q7;
----
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ mod tests {
let props = ConnectorProperties::extract(props).unwrap();

if let ConnectorProperties::Nexmark(props) = props {
assert_eq!(props.table_type, EventType::Person);
assert_eq!(props.table_type, Some(EventType::Person));
assert_eq!(props.split_num, 1);
} else {
panic!("extract nexmark config failed");
Expand Down
20 changes: 11 additions & 9 deletions src/connector/src/source/nexmark/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ pub struct NexmarkPropertiesInner {
#[serde(rename = "nexmark.event.num", default = "default_event_num")]
pub event_num: u64,

#[serde(rename = "nexmark.table.type", default = "default_event_type")]
pub table_type: EventType,
#[serde(rename = "nexmark.table.type", default = "none")]
pub table_type: Option<EventType>,

#[serde_as(as = "DisplayFromStr")]
#[serde(rename = "nexmark.max.chunk.size", default = "identity_u64::<1024>")]
Expand Down Expand Up @@ -221,10 +221,6 @@ fn default_event_num() -> u64 {
u64::MAX
}

fn default_event_type() -> EventType {
EventType::Person
}

impl Default for NexmarkPropertiesInner {
fn default() -> Self {
let v = serde_json::to_value(HashMap::<String, String>::new()).unwrap();
Expand All @@ -237,10 +233,16 @@ impl From<&NexmarkPropertiesInner> for NexmarkConfig {
// 2015-07-15 00:00:00
pub const BASE_TIME: u64 = 1_436_918_400_000;

let mut cfg = NexmarkConfig {
base_time: BASE_TIME,
..Default::default()
let mut cfg = match value.table_type {
// This is the old way
Some(_) => NexmarkConfig {
base_time: BASE_TIME,
..Default::default()
},
// By using default, it will choose the default proportion of three different events.
None => NexmarkConfig::default(),
};

macro_rules! set {
($name:ident) => {
set!($name, $name);
Expand Down
55 changes: 55 additions & 0 deletions src/connector/src/source/nexmark/source/combined_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use nexmark::event::{Auction, Bid, Person};
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct CombinedEvent {
event_type: u64,
/// The Person event
person: Option<Person>,
/// The Auction event.
auction: Option<Auction>,
/// The Bid event.
bid: Option<Bid>,
}

impl CombinedEvent {
fn new(
event_type: u64,
person: Option<Person>,
auction: Option<Auction>,
bid: Option<Bid>,
) -> Self {
Self {
event_type,
person,
auction,
bid,
}
}

pub fn person(person: Person) -> Self {
Self::new(0, Some(person), None, None)
}

pub fn auction(auction: Auction) -> Self {
Self::new(1, None, Some(auction), None)
}

pub fn bid(bid: Bid) -> Self {
Self::new(2, None, None, Some(bid))
}
}
17 changes: 16 additions & 1 deletion src/connector/src/source/nexmark/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use bytes::Bytes;
use nexmark::event::Event;

use crate::source::nexmark::source::combined_event::CombinedEvent;
use crate::source::{SourceMessage, SplitId};

#[derive(Clone, Debug)]
Expand All @@ -35,7 +36,7 @@ impl From<NexmarkMessage> for SourceMessage {
}

impl NexmarkMessage {
pub fn new(split_id: SplitId, offset: u64, event: Event) -> Self {
pub fn new_single_event(split_id: SplitId, offset: u64, event: Event) -> Self {
NexmarkMessage {
split_id,
sequence_number: offset.to_string(),
Expand All @@ -48,4 +49,18 @@ impl NexmarkMessage {
.into(),
}
}

pub fn new_combined_event(split_id: SplitId, offset: u64, event: Event) -> Self {
let combined_event = match event {
Event::Person(p) => CombinedEvent::person(p),
Event::Auction(a) => CombinedEvent::auction(a),
Event::Bid(b) => CombinedEvent::bid(b),
};
let combined_event = serde_json::to_string(&combined_event).unwrap();
NexmarkMessage {
split_id,
sequence_number: offset.to_string(),
payload: combined_event.into(),
}
}
}
1 change: 1 addition & 0 deletions src/connector/src/source/nexmark/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod combined_event;
pub mod message;
pub mod reader;
32 changes: 24 additions & 8 deletions src/connector/src/source/nexmark/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct NexmarkSplitReader {
assigned_split: NexmarkSplit,
split_id: SplitId,
event_num: u64,
event_type: EventType,
event_type: Option<EventType>,
use_real_time: bool,
min_event_gap_in_ns: u64,
max_chunk_size: u64,
Expand Down Expand Up @@ -70,11 +70,17 @@ impl SplitReader for NexmarkSplitReader {
assigned_split = split;
}

let mut generator = EventGenerator::new(NexmarkConfig::from(&*properties))
.with_offset(offset)
.with_step(split_num);
// If the user doesn't specify the event type in the source definition, then the user
// intends to use unified source(three different types of events together).
if let Some(event_type) = properties.table_type.as_ref() {
generator = generator.with_type_filter(event_type.clone());
}

Ok(NexmarkSplitReader {
generator: EventGenerator::new(NexmarkConfig::from(&*properties))
.with_offset(offset)
.with_step(split_num)
.with_type_filter(properties.table_type),
generator: generator,
assigned_split,
split_id,
max_chunk_size: properties.max_chunk_size,
Expand Down Expand Up @@ -105,8 +111,18 @@ impl NexmarkSplitReader {
break;
}
let event = self.generator.next().unwrap();
let event =
NexmarkMessage::new(self.split_id.clone(), self.generator.offset(), event);
let event = match self.event_type {
Some(_) => NexmarkMessage::new_single_event(
self.split_id.clone(),
self.generator.offset(),
event,
),
None => NexmarkMessage::new_combined_event(
self.split_id.clone(),
self.generator.offset(),
event,
),
};
msgs.push(event.into());
}
if msgs.is_empty() {
Expand Down Expand Up @@ -147,7 +163,7 @@ mod tests {
let props = Box::new(NexmarkPropertiesInner {
split_num: 2,
min_event_gap_in_ns: 0,
table_type: EventType::Bid,
table_type: Some(EventType::Bid),
max_chunk_size: 5,
..Default::default()
});
Expand Down

0 comments on commit eaba5d0

Please sign in to comment.