Skip to content

Commit

Permalink
refactor(source): remove unnecessary Box for NexmarkProperties
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Dec 25, 2023
1 parent 4f354b2 commit 2b16c12
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 11 deletions.
12 changes: 5 additions & 7 deletions src/connector/src/source/nexmark/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,9 @@ const fn none<T>() -> Option<T> {
None
}

pub type NexmarkProperties = Box<NexmarkPropertiesInner>;

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct NexmarkPropertiesInner {
pub struct NexmarkProperties {
#[serde_as(as = "DisplayFromStr")]
#[serde(rename = "nexmark.split.num", default = "identity_i32::<1>")]
pub split_num: i32,
Expand Down Expand Up @@ -233,15 +231,15 @@ fn default_event_num() -> u64 {
u64::MAX
}

impl Default for NexmarkPropertiesInner {
impl Default for NexmarkProperties {
fn default() -> Self {
let v = serde_json::to_value(HashMap::<String, String>::new()).unwrap();
NexmarkPropertiesInner::deserialize(v).unwrap()
NexmarkProperties::deserialize(v).unwrap()
}
}

impl From<&NexmarkPropertiesInner> for NexmarkConfig {
fn from(value: &NexmarkPropertiesInner) -> Self {
impl From<&NexmarkProperties> for NexmarkConfig {
fn from(value: &NexmarkProperties) -> Self {
// 2015-07-15 00:00:00
pub const BASE_TIME: u64 = 1_436_918_400_000;

Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/source/nexmark/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl SplitReader for NexmarkSplitReader {
let offset = split.start_offset.unwrap_or(split_index);
let assigned_split = split;

let mut generator = EventGenerator::new(NexmarkConfig::from(&*properties))
let mut generator = EventGenerator::new(NexmarkConfig::from(&properties))
.with_offset(offset)
.with_step(split_num);
// If the user doesn't specify the event type in the source definition, then the user
Expand Down Expand Up @@ -188,18 +188,18 @@ mod tests {
use anyhow::Result;

use super::*;
use crate::source::nexmark::{NexmarkPropertiesInner, NexmarkSplitEnumerator};
use crate::source::nexmark::{NexmarkProperties, NexmarkSplitEnumerator};
use crate::source::{SourceEnumeratorContext, SplitEnumerator};

#[tokio::test]
async fn test_nexmark_split_reader() -> Result<()> {
let props = Box::new(NexmarkPropertiesInner {
let props = NexmarkProperties {
split_num: 2,
min_event_gap_in_ns: 0,
table_type: Some(EventType::Bid),
max_chunk_size: 5,
..Default::default()
});
};

let mut enumerator =
NexmarkSplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into())
Expand Down

0 comments on commit 2b16c12

Please sign in to comment.