diff --git a/src/connector/src/parser/config.rs b/src/connector/src/parser/config.rs index 52355ae058635..161606ce2f7cc 100644 --- a/src/connector/src/parser/config.rs +++ b/src/connector/src/parser/config.rs @@ -102,8 +102,12 @@ impl SpecificParserConfig { let format_encode_options_with_secret = LocalSecretManager::global() .fill_secrets(info.format_encode_options, info.format_encode_secret_refs)?; let (options, secret_refs) = with_properties.clone().into_parts(); + // Make sure `with_properties` is no longer used by accident. + // All reads shall go to `options_with_secret` instead. + #[expect(unused_variables)] + let with_properties = (); let options_with_secret = - LocalSecretManager::global().fill_secrets(options.clone(), secret_refs.clone())?; + LocalSecretManager::global().fill_secrets(options, secret_refs)?; let format = source_struct.format; let encode = source_struct.encode; // this transformation is needed since there may be config for the protocol @@ -165,7 +169,7 @@ impl SpecificParserConfig { client_config: SchemaRegistryAuth::from(&format_encode_options_with_secret), name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy) .unwrap(), - topic: get_kafka_topic(with_properties)?.clone(), + topic: get_kafka_topic(&options_with_secret)?.clone(), } } else { SchemaLocation::File { @@ -228,7 +232,7 @@ impl SpecificParserConfig { client_config: SchemaRegistryAuth::from(&format_encode_options_with_secret), name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy) .unwrap(), - topic: get_kafka_topic(with_properties).unwrap().clone(), + topic: get_kafka_topic(&options_with_secret).unwrap().clone(), }, ..Default::default() })