Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(nexmark): unify three sources into one #6800

Merged
merged 5 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions e2e_test/nexmark/create_sources.slt.part
Original file line number Diff line number Diff line change
@@ -1,3 +1,37 @@
statement ok
CREATE SOURCE nexmark (
event_type BIGINT,
person STRUCT<"id" BIGINT,
"name" VARCHAR,
"email_address" VARCHAR,
"credit_card" VARCHAR,
"city" VARCHAR,
"state" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR>,
auction STRUCT<"id" BIGINT,
"item_name" VARCHAR,
"description" VARCHAR,
"initial_bid" BIGINT,
"reserve" BIGINT,
"date_time" TIMESTAMP,
"expires" TIMESTAMP,
"seller" BIGINT,
"category" BIGINT,
"extra" VARCHAR>,
bid STRUCT<"auction" BIGINT,
"bidder" BIGINT,
"price" BIGINT,
"channel" VARCHAR,
"url" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR>
) WITH (
connector = 'nexmark',
nexmark.split.num = '8',
nexmark.min.event.gap.in.ns = '1000000'
) ROW FORMAT JSON;

statement ok
CREATE MATERIALIZED SOURCE person (
"id" BIGINT,
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/nexmark/drop_sources.slt.part
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
DROP SOURCE nexmark;

statement ok
DROP SOURCE person;

Expand Down
11 changes: 11 additions & 0 deletions e2e_test/source/basic/nexmark_endless.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,19 @@ include ../../nexmark/create_sources.slt.part
include ../../streaming/nexmark/views/q7.slt.part
include ../../streaming/nexmark/views/q5.slt.part

statement ok
create materialized view nexmark_mv as select * from nexmark;

sleep 15s

statement ok
flush;

query I
select count(*) > 0 as has_results from nexmark_mv;
----
t

query I
select count(*) > 0 as has_results from nexmark_q7;
----
Expand All @@ -26,4 +34,7 @@ drop materialized view nexmark_q7;
statement ok
drop materialized view nexmark_q5;

statement ok
drop materialized view nexmark_mv;

include ../nexmark/drop_sources.slt.part
2 changes: 1 addition & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ mod tests {
let props = ConnectorProperties::extract(props).unwrap();

if let ConnectorProperties::Nexmark(props) = props {
assert_eq!(props.table_type, EventType::Person);
assert_eq!(props.table_type, Some(EventType::Person));
assert_eq!(props.split_num, 1);
} else {
panic!("extract nexmark config failed");
Expand Down
20 changes: 11 additions & 9 deletions src/connector/src/source/nexmark/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ pub struct NexmarkPropertiesInner {
#[serde(rename = "nexmark.event.num", default = "default_event_num")]
pub event_num: u64,

#[serde(rename = "nexmark.table.type", default = "default_event_type")]
pub table_type: EventType,
#[serde(rename = "nexmark.table.type", default = "none")]
pub table_type: Option<EventType>,

#[serde_as(as = "DisplayFromStr")]
#[serde(rename = "nexmark.max.chunk.size", default = "identity_u64::<1024>")]
Expand Down Expand Up @@ -221,10 +221,6 @@ fn default_event_num() -> u64 {
u64::MAX
}

fn default_event_type() -> EventType {
EventType::Person
}

impl Default for NexmarkPropertiesInner {
fn default() -> Self {
let v = serde_json::to_value(HashMap::<String, String>::new()).unwrap();
Expand All @@ -237,10 +233,16 @@ impl From<&NexmarkPropertiesInner> for NexmarkConfig {
// 2015-07-15 00:00:00
pub const BASE_TIME: u64 = 1_436_918_400_000;

let mut cfg = NexmarkConfig {
base_time: BASE_TIME,
..Default::default()
let mut cfg = match value.table_type {
// This is the old way
Some(_) => NexmarkConfig {
base_time: BASE_TIME,
..Default::default()
},
// By using default, it will choose the default proportion of three different events.
None => NexmarkConfig::default(),
};

macro_rules! set {
($name:ident) => {
set!($name, $name);
Expand Down
55 changes: 55 additions & 0 deletions src/connector/src/source/nexmark/source/combined_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use nexmark::event::{Auction, Bid, Person};
lmatz marked this conversation as resolved.
Show resolved Hide resolved
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct CombinedEvent {
lmatz marked this conversation as resolved.
Show resolved Hide resolved
event_type: u64,
/// The Person event
person: Option<Person>,
/// The Auction event.
auction: Option<Auction>,
/// The Bid event.
bid: Option<Bid>,
}

impl CombinedEvent {
fn new(
event_type: u64,
person: Option<Person>,
auction: Option<Auction>,
bid: Option<Bid>,
) -> Self {
Self {
event_type,
person,
auction,
bid,
}
}

pub fn person(person: Person) -> Self {
Self::new(0, Some(person), None, None)
}

pub fn auction(auction: Auction) -> Self {
Self::new(1, None, Some(auction), None)
}

pub fn bid(bid: Bid) -> Self {
Self::new(2, None, None, Some(bid))
}
}
17 changes: 16 additions & 1 deletion src/connector/src/source/nexmark/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use bytes::Bytes;
use nexmark::event::Event;

use crate::source::nexmark::source::combined_event::CombinedEvent;
use crate::source::{SourceMessage, SplitId};

#[derive(Clone, Debug)]
Expand All @@ -35,7 +36,7 @@ impl From<NexmarkMessage> for SourceMessage {
}

impl NexmarkMessage {
pub fn new(split_id: SplitId, offset: u64, event: Event) -> Self {
pub fn new_single_event(split_id: SplitId, offset: u64, event: Event) -> Self {
NexmarkMessage {
split_id,
sequence_number: offset.to_string(),
Expand All @@ -48,4 +49,18 @@ impl NexmarkMessage {
.into(),
}
}

pub fn new_combined_event(split_id: SplitId, offset: u64, event: Event) -> Self {
let combined_event = match event {
Event::Person(p) => CombinedEvent::person(p),
Event::Auction(a) => CombinedEvent::auction(a),
Event::Bid(b) => CombinedEvent::bid(b),
};
let combined_event = serde_json::to_string(&combined_event).unwrap();
NexmarkMessage {
split_id,
sequence_number: offset.to_string(),
payload: combined_event.into(),
}
}
}
1 change: 1 addition & 0 deletions src/connector/src/source/nexmark/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod combined_event;
pub mod message;
pub mod reader;
32 changes: 24 additions & 8 deletions src/connector/src/source/nexmark/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct NexmarkSplitReader {
assigned_split: NexmarkSplit,
split_id: SplitId,
event_num: u64,
event_type: EventType,
event_type: Option<EventType>,
use_real_time: bool,
min_event_gap_in_ns: u64,
max_chunk_size: u64,
Expand Down Expand Up @@ -70,11 +70,17 @@ impl SplitReader for NexmarkSplitReader {
assigned_split = split;
}

let mut generator = EventGenerator::new(NexmarkConfig::from(&*properties))
.with_offset(offset)
.with_step(split_num);
// If the user doesn't specify the event type in the source definition, then the user
// intends to use unified source(three different types of events together).
if let Some(event_type) = properties.table_type.as_ref() {
generator = generator.with_type_filter(*event_type);
}

Ok(NexmarkSplitReader {
generator: EventGenerator::new(NexmarkConfig::from(&*properties))
.with_offset(offset)
.with_step(split_num)
.with_type_filter(properties.table_type),
lmatz marked this conversation as resolved.
Show resolved Hide resolved
generator,
assigned_split,
split_id,
max_chunk_size: properties.max_chunk_size,
Expand Down Expand Up @@ -105,8 +111,18 @@ impl NexmarkSplitReader {
break;
}
let event = self.generator.next().unwrap();
let event =
NexmarkMessage::new(self.split_id.clone(), self.generator.offset(), event);
let event = match self.event_type {
Some(_) => NexmarkMessage::new_single_event(
self.split_id.clone(),
self.generator.offset(),
event,
),
None => NexmarkMessage::new_combined_event(
self.split_id.clone(),
self.generator.offset(),
event,
),
};
msgs.push(event.into());
}
if msgs.is_empty() {
Expand Down Expand Up @@ -147,7 +163,7 @@ mod tests {
let props = Box::new(NexmarkPropertiesInner {
split_num: 2,
min_event_gap_in_ns: 0,
table_type: EventType::Bid,
table_type: Some(EventType::Bid),
max_chunk_size: 5,
..Default::default()
});
Expand Down