Skip to content

Commit

Permalink
refine derive impl. Add missing derives
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Dec 25, 2023
1 parent 30f499f commit 15281a4
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 15 deletions.
3 changes: 3 additions & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ pub trait TryFromHashmap: Sized {
fn try_from_hashmap(props: HashMap<String, String>) -> Result<Self>;
}

/// Represents `WITH` options for sources.
///
/// Each instance should add a `#[derive(with_options::WithOptions)]` marker.
pub trait SourceProperties: TryFromHashmap + Clone + WithOptions {
const SOURCE_NAME: &'static str;
type Split: SplitMetaData + TryFrom<SplitImpl, Error = anyhow::Error> + Into<SplitImpl>;
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/datagen/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::source::SourceProperties;
pub const DATAGEN_CONNECTOR: &str = "datagen";

#[serde_as]
#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct DatagenProperties {
/// split_num means data source partition
#[serde(rename = "datagen.split.num")]
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl OpendalSource for OpendalGcs {
}
}

#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct OpendalS3Properties {
pub s3_properties: S3Properties,
#[serde(rename = "s3.assume_role", default)]
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/filesystem/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::source::SourceProperties;

pub const S3_CONNECTOR: &str = "s3";

#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct S3Properties {
#[serde(rename = "s3.region_name")]
pub region_name: String,
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/test_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use async_trait::async_trait;
use parking_lot::Mutex;
use risingwave_common::types::JsonbVal;
use serde_derive::{Deserialize, Serialize};
use with_options::WithOptions;

use crate::parser::ParserConfig;
use crate::source::{
Expand Down Expand Up @@ -114,7 +115,7 @@ pub fn registry_test_source(box_source: BoxSource) -> TestSourceRegistryGuard {

pub const TEST_CONNECTOR: &str = "test";

#[derive(Clone, Debug)]
#[derive(Clone, Debug, WithOptions)]
pub struct TestSourceProperties {
properties: HashMap<String, String>,
}
Expand Down
17 changes: 16 additions & 1 deletion src/connector/src/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::HashMap;

/// Dummy trait for `WITH` options. Only for `#[derive(WithOptions)]`, should not be used manually.
/// Marker trait for `WITH` options. Only for `#[derive(WithOptions)]`, should not be used manually.
///
/// This is used to ensure the `WITH` options types have reasonable structure.
///
Expand All @@ -26,6 +26,21 @@ pub trait WithOptions {
fn assert_receiver_is_with_options(&self) {}
}

// Currently CDC properties are handled specially.
// - It simply passes HashMap to Java DBZ.
// - It's not handled by serde.
// - It contains fields other than WITH options.
// TODO: remove the workaround here. And also use #[derive] for it.

impl<T: crate::source::cdc::CdcSourceTypeTrait> WithOptions
for crate::source::cdc::CdcProperties<T>
{
}

// We might want to box the struct if it has too many fields.

impl<T: WithOptions> WithOptions for Box<T> {}

// impl the trait for value types

impl<T: WithOptions> WithOptions for Option<T> {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,12 @@ fn has_with_options_attribute(attrs: &[Attribute]) -> bool {
if let Ok(Meta::List(meta_list)) = attr.parse_meta() {
return meta_list.path.is_ident("derive")
&& meta_list.nested.iter().any(|nested| match nested {
syn::NestedMeta::Meta(Meta::Path(path)) => path.is_ident("WithOptions"),
syn::NestedMeta::Meta(Meta::Path(path)) => {
// Check if the path contains WithOptions
path.segments
.iter()
.any(|segment| segment.ident == "WithOptions")
}
_ => false,
});
}
Expand Down
29 changes: 20 additions & 9 deletions src/connector/with_options/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,37 @@ pub fn derive_helper_attr(input: TokenStream) -> TokenStream {
},
_ => return quote! { compile_error!("WithOptions can only be derived for structs"); }.into(),
};
let struct_name = input.ident;

let mut assert_impls = vec![];

for field in fields {
let field_name = field.ident.as_ref().unwrap();

assert_impls.push(quote!(
self.#field_name.assert_receiver_is_with_options();
crate::with_options::WithOptions::assert_receiver_is_with_options(&self.#field_name);
))
}

let struct_name = input.ident;
// This macro is only be expected to used in risingwave_connector. This trait is also defined there.
let code = quote! {
impl crate::with_options::WithOptions for #struct_name {
fn assert_receiver_is_with_options(&self) {
#(#assert_impls)*
if input.generics.params.is_empty() {
quote! {
impl crate::with_options::WithOptions for #struct_name {
fn assert_receiver_is_with_options(&self) {
#(#assert_impls)*
}
}
}
};

code.into()
.into()
} else {
// Note: CDC properties have generics.
let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
quote! {
impl #impl_generics crate::with_options::WithOptions for #struct_name #ty_generics #where_clause {
fn assert_receiver_is_with_options(&self) {
#(#assert_impls)*
}
}
}.into()
}
}
63 changes: 63 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,36 @@
# THIS FILE IS AUTO_GENERATED. DO NOT EDIT

DatagenProperties:
fields:
- name: datagen.split.num
field_type: String
comments: split_num means data source partition
required: false
- name: datagen.rows.per.second
field_type: u64
comments: default_rows_per_second =10 when the split_num = 3 and default_rows_per_second =10 there will be three readers that generate respectively 4,3,3 message per second
required: false
default: '10'
- name: fields
field_type: HashMap<String,String>
comments: 'Some connector options of the datagen source''s fields for example: create datagen source with column v1 int, v2 float ''fields.v1.kind''=''sequence'', ''fields.v1.start''=''1'', ''fields.v1.end''=''1000'', ''fields.v2.kind''=''random'', datagen will create v1 by self-incrementing from 1 to 1000 datagen will create v2 by randomly generating from default_min to default_max'
required: false
GcsProperties:
fields:
- name: gcs.bucket_name
field_type: String
required: true
- name: gcs.credential
field_type: String
required: false
- name: gcs.service_account
field_type: String
required: false
default: Default::default
- name: match_pattern
field_type: String
required: false
default: Default::default
KafkaProperties:
fields:
- name: bytes.per.second
Expand Down Expand Up @@ -397,6 +428,33 @@ NexmarkProperties:
field_type: usize
required: false
default: None
OpendalS3Properties:
fields:
- name: s3.region_name
field_type: String
required: true
- name: s3.bucket_name
field_type: String
required: true
- name: match_pattern
field_type: String
required: false
default: Default::default
- name: s3.credentials.access
field_type: String
required: false
default: Default::default
- name: s3.credentials.secret
field_type: String
required: false
default: Default::default
- name: s3.endpoint_url
field_type: String
required: false
- name: s3.assume_role
field_type: String
required: false
default: Default::default
PubsubProperties:
fields:
- name: pubsub.split_count
Expand Down Expand Up @@ -488,3 +546,8 @@ PulsarProperties:
field_type: String
required: false
default: Default::default
TestSourceProperties:
fields:
- name: properties
field_type: HashMap<String,String>
required: false

0 comments on commit 15281a4

Please sign in to comment.