Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(connector): Add a dummy trait WithOptions #14175

Merged
merged 6 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ members = [
"src/common/heap_profiling",
"src/compute",
"src/connector",
"src/connector/with_options",
"src/ctl",
"src/error",
"src/expr/core",
Expand Down Expand Up @@ -55,7 +56,6 @@ members = [
"src/utils/runtime",
"src/utils/sync-point",
"src/utils/variables",
"src/utils/with_options",
"src/utils/workspace-config",
"src/workspace-hack",
]
Expand Down
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ tracing = "0.1"
tracing-futures = { version = "0.2", features = ["futures-03"] }
url = "2"
urlencoding = "2"
with_options = { path = "../utils/with_options" }
with_options = { path = "./with_options" }
yup-oauth2 = "8.3"

[target.'cfg(not(madsim))'.dependencies]
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub mod common;

pub use paste::paste;

mod with_options;

#[cfg(test)]
mod with_options_test;

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ const fn _default_max_in_flight_requests_per_connection() -> usize {

#[derive(Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
enum CompressionCodec {
pub enum CompressionCodec {
None,
Gzip,
Snappy,
Expand Down
6 changes: 5 additions & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::parser::ParserConfig;
pub(crate) use crate::source::common::CommonSplitReader;
use crate::source::filesystem::FsPageItem;
use crate::source::monitor::EnumeratorMetrics;
use crate::with_options::WithOptions;
use crate::{
dispatch_source_prop, dispatch_split_impl, for_all_sources, impl_connector_properties,
impl_split, match_source_name_str,
Expand All @@ -60,7 +61,10 @@ pub trait TryFromHashmap: Sized {
fn try_from_hashmap(props: HashMap<String, String>) -> Result<Self>;
}

pub trait SourceProperties: TryFromHashmap + Clone {
/// Represents `WITH` options for sources.
///
/// Each instance should add a `#[derive(with_options::WithOptions)]` marker.
pub trait SourceProperties: TryFromHashmap + Clone + WithOptions {
const SOURCE_NAME: &'static str;
type Split: SplitMetaData + TryFrom<SplitImpl, Error = anyhow::Error> + Into<SplitImpl>;
type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/datagen/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::source::SourceProperties;
pub const DATAGEN_CONNECTOR: &str = "datagen";

#[serde_as]
#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
pub struct DatagenProperties {
/// split_num means data source partition
#[serde(rename = "datagen.split.num")]
Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod gcs_source;
pub mod s3_source;

use serde::Deserialize;
use with_options::WithOptions;
pub mod opendal_enumerator;
pub mod opendal_reader;

Expand All @@ -30,7 +31,7 @@ pub const GCS_CONNECTOR: &str = "gcs";
// The new s3_v2 will use opendal.
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";

#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct GcsProperties {
#[serde(rename = "gcs.bucket_name")]
pub bucket_name: String,
Expand Down Expand Up @@ -78,7 +79,7 @@ impl OpendalSource for OpendalGcs {
}
}

#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct OpendalS3Properties {
#[serde(flatten)]
pub s3_properties: S3PropertiesCommon,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/source/filesystem/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::source::SourceProperties;
pub const S3_CONNECTOR: &str = "s3";

/// These are supported by both `s3` and `s3_v2` (opendal) sources.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct S3PropertiesCommon {
#[serde(rename = "s3.region_name")]
pub region_name: String,
Expand All @@ -41,7 +41,7 @@ pub struct S3PropertiesCommon {
pub endpoint_url: Option<String>,
}

#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct S3Properties {
#[serde(flatten)]
pub common: S3PropertiesCommon,
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/test_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use async_trait::async_trait;
use parking_lot::Mutex;
use risingwave_common::types::JsonbVal;
use serde_derive::{Deserialize, Serialize};
use with_options::WithOptions;

use crate::parser::ParserConfig;
use crate::source::{
Expand Down Expand Up @@ -114,7 +115,7 @@ pub fn registry_test_source(box_source: BoxSource) -> TestSourceRegistryGuard {

pub const TEST_CONNECTOR: &str = "test";

#[derive(Clone, Debug)]
#[derive(Clone, Debug, WithOptions)]
pub struct TestSourceProperties {
properties: HashMap<String, String>,
}
Expand Down
57 changes: 57 additions & 0 deletions src/connector/src/with_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

/// Marker trait for types that may appear in connector `WITH` options.
///
/// Implement it through `#[derive(WithOptions)]` only; it is not meant to be implemented or
/// called by hand. Its purpose is to ensure that `WITH` options types have a reasonable
/// structure.
///
/// TODO: add this bound for sinks. There's a `SourceProperties` trait for sources, but nothing
/// similar for sinks.
pub trait WithOptions {
    /// Compile-time hook with no runtime effect: the default body is empty.
    ///
    /// The derive macro overrides it to call the same method on every field, which is what
    /// forces each field type to implement `WithOptions` too.
    #[doc(hidden)]
    #[inline(always)]
    fn assert_receiver_is_with_options(&self) {
        // Intentionally empty.
    }
}

// Currently CDC properties are handled specially.
// - It simply passes HashMap to Java DBZ.
// - It's not handled by serde.
// - It contains fields other than WITH options.
// TODO: remove the workaround here. And also use #[derive] for it.
Comment on lines +17 to +33
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Highlight TODOs


// Hand-written impl (no derive): the empty body keeps the trait's default, empty
// `assert_receiver_is_with_options`, so the fields of `CdcProperties` are not checked.
impl<T> WithOptions for crate::source::cdc::CdcProperties<T>
where
    T: crate::source::cdc::CdcSourceTypeTrait,
{
}

// Implement the trait for value types. Every impl below keeps the trait's default (empty)
// `assert_receiver_is_with_options`.

// Scalars and other standard-library values.
impl WithOptions for bool {}
impl WithOptions for i32 {}
impl WithOptions for i64 {}
impl WithOptions for u32 {}
impl WithOptions for u64 {}
impl WithOptions for usize {}
impl WithOptions for f64 {}
impl WithOptions for String {}
impl WithOptions for std::time::Duration {}

// Containers.
impl<T> WithOptions for Option<T> where T: WithOptions {}
impl WithOptions for Vec<String> {}
impl WithOptions for HashMap<String, String> {}

// Connector-specific value types.
impl WithOptions for crate::sink::kafka::CompressionCodec {}
impl WithOptions for nexmark::config::RateShape {}
impl WithOptions for nexmark::event::EventType {}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ pub fn generate_with_options_yaml_sink() -> String {

/// Collect all structs with `#[derive(WithOptions)]` in the `.rs` files in `path` (plus `common.rs`),
/// and generate a YAML file.
///
/// Note: here we assume the struct is parsed by `serde`. If it's not the case,
/// the generated `yaml` might be inconsistent with the actual parsing logic.
/// TODO: improve the test to check whether serde is used.
///
/// - For sources, the parsing logic is in `TryFromHashmap`.
/// - For sinks, the parsing logic is in `TryFrom<SinkParam>`.
Comment on lines +51 to +57
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Highlight Note

fn generate_with_options_yaml_inner(path: &Path) -> String {
let mut structs = vec![];
let mut functions = BTreeMap::<String, FunctionInfo>::new();
Expand Down Expand Up @@ -194,7 +201,12 @@ fn has_with_options_attribute(attrs: &[Attribute]) -> bool {
if let Ok(Meta::List(meta_list)) = attr.parse_meta() {
return meta_list.path.is_ident("derive")
&& meta_list.nested.iter().any(|nested| match nested {
syn::NestedMeta::Meta(Meta::Path(path)) => path.is_ident("WithOptions"),
syn::NestedMeta::Meta(Meta::Path(path)) => {
// Check if the path contains WithOptions
path.segments
.iter()
.any(|segment| segment.ident == "WithOptions")
}
_ => false,
});
}
Expand Down Expand Up @@ -273,6 +285,9 @@ fn extract_serde_properties(field: &Field) -> SerdeProperties {
/// // ...
/// }
/// ```
///
/// Note: here we assume `#[serde(flatten)]` is used for struct fields. If it's not the case,
/// the generated `yaml` might be inconsistent with the actual parsing logic.
fn flatten_nested_options(options: BTreeMap<String, StructInfo>) -> BTreeMap<String, StructInfo> {
let mut deleted_keys = HashSet::new();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ repository = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
proc-macro2 = "1"
quote = "1"
syn = "2"

[lib]
proc-macro = true
Expand Down
64 changes: 64 additions & 0 deletions src/connector/with_options/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, DeriveInput};

/// Annotates that the struct represents the WITH properties for a connector.
#[proc_macro_derive(WithOptions, attributes(with_option))]
pub fn derive_helper_attr(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);

let fields = match input.data {
syn::Data::Struct(ref data) => match data.fields {
syn::Fields::Named(ref fields) => &fields.named,
_ => return quote! { compile_error!("WithOptions can only be derived for structs with named fields"); }.into(),
},
_ => return quote! { compile_error!("WithOptions can only be derived for structs"); }.into(),
};

let mut assert_impls = vec![];

for field in fields {
let field_name = field.ident.as_ref().unwrap();

assert_impls.push(quote!(
crate::with_options::WithOptions::assert_receiver_is_with_options(&self.#field_name);
))
}

let struct_name = input.ident;
// This macro is only be expected to used in risingwave_connector. This trait is also defined there.
if input.generics.params.is_empty() {
quote! {
impl crate::with_options::WithOptions for #struct_name {
fn assert_receiver_is_with_options(&self) {
#(#assert_impls)*
}
}
}
.into()
} else {
// Note: CDC properties have generics.
let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
quote! {
impl #impl_generics crate::with_options::WithOptions for #struct_name #ty_generics #where_clause {
fn assert_receiver_is_with_options(&self) {
#(#assert_impls)*
}
}
}.into()
}
}
Loading
Loading