diff --git a/src/connector/src/source/nexmark/source/combined_event.rs b/src/connector/src/source/nexmark/source/combined_event.rs new file mode 100644 index 0000000000000..468606ab4f86d --- /dev/null +++ b/src/connector/src/source/nexmark/source/combined_event.rs @@ -0,0 +1,41 @@ +use nexmark::event::{Auction, Bid, Person}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub struct CombinedEvent { + event_type: u64, + /// The Person event + person: Option, + /// The Auction event. + auction: Option, + /// The Bid event. + bid: Option, +} + +impl CombinedEvent { + fn new( + event_type: u64, + person: Option, + auction: Option, + bid: Option, + ) -> Self { + Self { + event_type, + person, + auction, + bid, + } + } + + pub fn person(person: Person) -> Self { + Self::new(0, Some(person), None, None) + } + + pub fn auction(auction: Auction) -> Self { + Self::new(1, None, Some(auction), None) + } + + pub fn bid(bid: Bid) -> Self { + Self::new(2, None, None, Some(bid)) + } +} diff --git a/src/connector/src/source/nexmark/source/message.rs b/src/connector/src/source/nexmark/source/message.rs index bc1fc3f207119..3483fe2186f5e 100644 --- a/src/connector/src/source/nexmark/source/message.rs +++ b/src/connector/src/source/nexmark/source/message.rs @@ -13,9 +13,9 @@ // limitations under the License. use bytes::Bytes; -use nexmark::event::{Auction, Bid, Event, Person}; -use serde::{Deserialize, Serialize}; +use nexmark::event::Event; +use crate::source::nexmark::source::combined_event::CombinedEvent; use crate::source::{SourceMessage, SplitId}; #[derive(Clone, Debug)] @@ -35,45 +35,6 @@ impl From for SourceMessage { } } -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -struct CombinedEvent { - event_type: u64, - /// The Person event - person: Option, - /// The Auction event. - auction: Option, - /// The Bid event. - bid: Option, -} - -impl CombinedEvent { - fn new( - event_type: u64, - person: Option, - auction: Option, - bid: Option, - ) -> Self { - Self { - event_type, - person, - auction, - bid, - } - } - - fn person(person: Person) -> Self { - Self::new(0, Some(person), None, None) - } - - fn auction(auction: Auction) -> Self { - Self::new(1, None, Some(auction), None) - } - - fn bid(bid: Bid) -> Self { - Self::new(2, None, None, Some(bid)) - } -} - impl NexmarkMessage { pub fn new(split_id: SplitId, offset: u64, event: Event) -> Self { let combined_event = match event { diff --git a/src/connector/src/source/nexmark/source/mod.rs b/src/connector/src/source/nexmark/source/mod.rs index 0c3835c786eff..0fbf995e74f92 100644 --- a/src/connector/src/source/nexmark/source/mod.rs +++ b/src/connector/src/source/nexmark/source/mod.rs @@ -12,5 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod combined_event; pub mod message; pub mod reader;