Skip to content

Commit

Permalink
chore: split with_options.yaml to source and sink (#13500)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Nov 20, 2023
1 parent 5a9d119 commit 16e4d2a
Show file tree
Hide file tree
Showing 7 changed files with 924 additions and 869 deletions.
2 changes: 1 addition & 1 deletion Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1438,7 +1438,7 @@ private = true
category = "Misc"
description = "This command extracts WITH options from the Rust code and maintains them in a single source of truth `with_options.yaml`"
script = '''
UPDATE_EXPECT=1 cargo nextest run risingwave_connector tests::test_with_options_yaml_up_to_date
UPDATE_EXPECT=1 cargo test -p risingwave_connector tests::test_with_options_yaml_up_to_date
'''

[tasks.backwards-compat-test]
Expand Down
9 changes: 5 additions & 4 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,17 @@ where
mod tests {
use expect_test::expect_file;

use crate::with_options_test::update_with_options_yaml;
use crate::with_options_test::{
generate_with_options_yaml_sink, generate_with_options_yaml_source,
};

/// This test ensures that `src/connector/with_options.yaml` is up-to-date with the default values specified
/// in this file. Developer should run `./risedev generate-with-options` to update it if this
/// test fails.
#[test]
fn test_with_options_yaml_up_to_date() {
let expected = expect_file!("../with_options.yaml");
let actual = update_with_options_yaml();
expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());

expected.assert_eq(&actual);
expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());
}
}
86 changes: 65 additions & 21 deletions src/connector/src/with_options_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,50 @@
// limitations under the License.

use std::collections::{BTreeMap, HashSet};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::{env, fs};

use itertools::Itertools;
use quote::ToTokens;
use serde::Serialize;
use syn::{parse_file, Attribute, Field, Item, ItemFn, Lit, Meta, MetaNameValue, NestedMeta, Type};

fn connector_crate_path() -> Option<PathBuf> {
let mut current_dir = env::current_dir().ok()?;
loop {
if current_dir.join("Cargo.lock").exists() {
let connector_path: PathBuf = current_dir.join("./src/connector");
return Some(connector_path);
}
if !current_dir.pop() {
break;
}
}
None
fn connector_crate_path() -> PathBuf {
let connector_crate_path = env::var("CARGO_MANIFEST_DIR").unwrap();
Path::new(&connector_crate_path).to_path_buf()
}

fn source_mod_path() -> PathBuf {
connector_crate_path().join("src").join("source")
}

fn sink_mod_path() -> PathBuf {
connector_crate_path().join("src").join("sink")
}

pub fn update_with_options_yaml() -> String {
fn common_mod_path() -> PathBuf {
connector_crate_path().join("src").join("common.rs")
}

pub fn generate_with_options_yaml_source() -> String {
generate_with_options_yaml_inner(&source_mod_path())
}

pub fn generate_with_options_yaml_sink() -> String {
generate_with_options_yaml_inner(&sink_mod_path())
}

/// Collect all structs with `#[derive(WithOptions)]` in the `.rs` files in `path` (plus `common.rs`),
/// and generate a YAML file.
fn generate_with_options_yaml_inner(path: &Path) -> String {
let mut structs = vec![];
let mut functions = BTreeMap::<String, FunctionInfo>::new();

// Recursively list all the .rs files
for entry in walkdir::WalkDir::new(connector_crate_path().unwrap().join("src")) {
for entry in walkdir::WalkDir::new(path)
.into_iter()
.chain(walkdir::WalkDir::new(common_mod_path()))
{
let entry = entry.expect("Failed to read directory entry");
if entry.path().extension() == Some("rs".as_ref()) {
// Parse the content of the .rs file
Expand Down Expand Up @@ -70,19 +88,27 @@ pub fn update_with_options_yaml() -> String {
for field in struct_item.fields {
// Process each field
if let Some(field_name) = &field.ident {
let serde_props = extract_serde_properties(&field);
let SerdeProperties {
default_func,
rename,
alias,
} = extract_serde_properties(&field);

let field_type = field.ty;
let mut required = match extract_type_name(&field_type).as_str() {
// Fields of type Option<T> or HashMap<K, V> are always considered optional.
"HashMap" | "Option" => false,
_ => true,
};
let field_type = quote::quote!(#field_type).to_string();
let mut field_type = quote::quote!(#field_type)
.to_token_stream()
.into_iter()
.join("");
// Option<T> -> T
if field_type.starts_with("Option") {
field_type = field_type[7..field_type.len() - 1].to_string();
}
let comments = extract_comments(&field.attrs);
let alias = serde_props.alias;
let rename = serde_props.rename;
let default_func = serde_props.default_func;

// Replace the function name with the function body.
let mut default = default_func.clone();
Expand All @@ -105,9 +131,16 @@ pub fn update_with_options_yaml() -> String {
default,
alias,
});
} else {
panic!("Unexpected tuple struct: {}", struct_name);
}
}
struct_infos.insert(struct_name, struct_info);
if struct_infos
.insert(struct_name.clone(), struct_info)
.is_some()
{
panic!("Duplicate struct: {}", struct_name);
};
}

// Flatten the nested options.
Expand Down Expand Up @@ -155,6 +188,7 @@ struct FunctionInfo {
body: String,
}

/// Has `#[derive(WithOptions)]`
fn has_with_options_attribute(attrs: &[Attribute]) -> bool {
attrs.iter().any(|attr| {
if let Ok(Meta::List(meta_list)) = attr.parse_meta() {
Expand Down Expand Up @@ -229,6 +263,16 @@ fn extract_serde_properties(field: &Field) -> SerdeProperties {
SerdeProperties::default()
}

/// Flatten the nested options, e.g.,
/// ```ignore
/// pub struct KafkaConfig {
/// #[serde(flatten)]
/// pub common: KafkaCommon,
/// #[serde(flatten)]
/// pub rdkafka_properties: RdKafkaPropertiesProducer,
/// // ...
/// }
/// ```
fn flatten_nested_options(options: BTreeMap<String, StructInfo>) -> BTreeMap<String, StructInfo> {
let mut deleted_keys = HashSet::new();

Expand Down
Loading

0 comments on commit 16e4d2a

Please sign in to comment.