Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Dec 27, 2023
1 parent 1879fee commit 18b6561
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 15 deletions.
10 changes: 3 additions & 7 deletions src/connector/with_options/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use quote::quote;
use syn::{parse_macro_input, DeriveInput};

/// Annotates that the struct represents the `WITH` properties for a connector.
/// This is a dummy marker that has no effect on the generated code. It's used to generate the `with_option_*.yaml` files.
/// This implements a marker trait `WithOptions`.
/// It's also used to generate the `with_option_*.yaml` files.
///
/// ## Notes about how to define property structs
///
Expand All @@ -28,11 +29,6 @@ use syn::{parse_macro_input, DeriveInput};
///
/// The only exception now is CDC, which needs to pass a lot of options as-is to Debezium.
///
/// ### `serde` attributes
///
/// Always add `#[serde(deny_unknown_fields)]` to the top-level struct.
/// However, do not add it if there's a `HashMap` field with `#[serde(flatten)]`.
///
/// ### Common struct
///
/// When there are some fields can be grouped together, and/or can be shared by source and sink,
Expand All @@ -42,7 +38,7 @@ use syn::{parse_macro_input, DeriveInput};
/// Add `#[derive(WithOptions)]` to both the outer and the inner struct.
///
/// Avoid using nested `#[serde(flatten)]` field in the common struct,
/// because this doesn't work with `#[serde(deny_unknown_fields)]`.
/// because this will lead to unexpected serde behaviors.
/// Put all flatten fields in the top-level struct instead.
#[proc_macro_derive(WithOptions, attributes(with_option))]
pub fn derive_helper_attr(input: TokenStream) -> TokenStream {
Expand Down
17 changes: 9 additions & 8 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ struct ConnectorSourceWorker<P: SourceProperties> {
source_is_up: LabelGuardedIntGauge<2>,
}

fn extract_prop_from_source(
source: &Source,
deny_unknown_fields: bool,
) -> MetaResult<ConnectorProperties> {
let mut properties =
ConnectorProperties::extract(source.with_properties.clone(), deny_unknown_fields)?;
fn extract_prop_from_existing_source(source: &Source) -> MetaResult<ConnectorProperties> {
let mut properties = ConnectorProperties::extract(source.with_properties.clone(), false)?;
properties.init_from_pb_source(source);
Ok(properties)
}
fn extract_prop_from_new_source(source: &Source) -> MetaResult<ConnectorProperties> {
let mut properties = ConnectorProperties::extract(source.with_properties.clone(), true)?;
properties.init_from_pb_source(source);
Ok(properties)
}
Expand Down Expand Up @@ -719,7 +720,7 @@ impl SourceManager {
let current_splits_ref = splits.clone();
let source_id = source.id;

let connector_properties = extract_prop_from_source(&source, false)?;
let connector_properties = extract_prop_from_existing_source(&source)?;
let enable_scale_in = connector_properties.enable_split_scale_in();
let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
let handle = tokio::spawn(async move {
Expand Down Expand Up @@ -779,7 +780,7 @@ impl SourceManager {
let current_splits_ref = splits.clone();
let source_id = source.id;

let connector_properties = extract_prop_from_source(source, true)?;
let connector_properties = extract_prop_from_new_source(source)?;
let enable_scale_in = connector_properties.enable_split_scale_in();
let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
let handle = dispatch_source_prop!(connector_properties, prop, {
Expand Down

0 comments on commit 18b6561

Please sign in to comment.