Skip to content

Commit

Permalink
conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su committed Jul 12, 2024
1 parent 5224b1a commit 2e6096a
Showing 1 changed file with 14 additions and 20 deletions.
34 changes: 14 additions & 20 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1180,11 +1180,10 @@ impl SpecificParserConfig {
info: &StreamSourceInfo,
with_properties: &WithOptionsSecResolved,
) -> ConnectorResult<Self> {
let source_struct = extract_source_struct(info)?;
let format_encode_options_with_secret = LocalSecretManager::global().fill_secrets(
info.format_encode_options.clone(),
info.format_encode_secret_refs.clone(),
)?;
let info = info.clone();
let source_struct = extract_source_struct(&info)?;
let format_encode_options_with_secret = LocalSecretManager::global()
.fill_secrets(info.format_encode_options, info.format_encode_secret_refs)?;
let (options, secret_refs) = with_properties.clone().into_parts();
let options_with_secret =
LocalSecretManager::global().fill_secrets(options.clone(), secret_refs.clone())?;
Expand All @@ -1196,7 +1195,7 @@ impl SpecificParserConfig {
SourceFormat::Native => ProtocolProperties::Native,
SourceFormat::None => ProtocolProperties::None,
SourceFormat::Debezium => {
let debezium_props = DebeziumProps::from(&info.format_encode_options);
let debezium_props = DebeziumProps::from(&format_encode_options_with_secret);
ProtocolProperties::Debezium(debezium_props)
}
SourceFormat::DebeziumMongo => ProtocolProperties::DebeziumMongo,
Expand Down Expand Up @@ -1229,24 +1228,24 @@ impl SpecificParserConfig {
config.enable_upsert = true;
}
config.schema_location = if let Some(schema_arn) =
info.format_encode_options.get(AWS_GLUE_SCHEMA_ARN_KEY)
format_encode_options_with_secret.get(AWS_GLUE_SCHEMA_ARN_KEY)
{
SchemaLocation::Glue {
schema_arn: schema_arn.clone(),
aws_auth_props: serde_json::from_value::<AwsAuthProps>(
serde_json::to_value(info.format_encode_options.clone()).unwrap(),
serde_json::to_value(format_encode_options_with_secret.clone())
.unwrap(),
)
.map_err(|e| anyhow::anyhow!(e))?,
// The option `mock_config` is not public and we can break compatibility.
mock_config: info
.format_encode_options
mock_config: format_encode_options_with_secret
.get("aws.glue.mock_config")
.cloned(),
}
} else if info.use_schema_registry {
SchemaLocation::Confluent {
urls: info.row_schema_location.clone(),
client_config: SchemaRegistryAuth::from(&info.format_encode_options),
client_config: SchemaRegistryAuth::from(&format_encode_options_with_secret),
name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
.unwrap(),
topic: get_kafka_topic(with_properties)?.clone(),
Expand All @@ -1256,7 +1255,8 @@ impl SpecificParserConfig {
url: info.row_schema_location.clone(),
aws_auth_props: Some(
serde_json::from_value::<AwsAuthProps>(
serde_json::to_value(info.format_encode_options.clone()).unwrap(),
serde_json::to_value(format_encode_options_with_secret.clone())
.unwrap(),
)
.map_err(|e| anyhow::anyhow!(e))?,
),
Expand Down Expand Up @@ -1306,19 +1306,13 @@ impl SpecificParserConfig {
Some(info.proto_message_name.clone())
},
key_record_name: info.key_message_name.clone(),
<<<<<<< HEAD
row_schema_location: info.row_schema_location.clone(),
topic: get_kafka_topic(&options_with_secret).unwrap().clone(),
client_config: SchemaRegistryAuth::from(&format_encode_options_with_secret),
=======
schema_location: SchemaLocation::Confluent {
urls: info.row_schema_location.clone(),
client_config: SchemaRegistryAuth::from(&info.format_encode_options),
client_config: SchemaRegistryAuth::from(&format_encode_options_with_secret),
name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
.unwrap(),
topic: get_kafka_topic(with_properties).unwrap().clone(),
},
>>>>>>> 5c52c7d752804e5e0d66b370b7da70a5cd2e8d88
..Default::default()
})
}
Expand All @@ -1332,7 +1326,7 @@ impl SpecificParserConfig {
) => EncodingProperties::Json(JsonProperties {
use_schema_registry: info.use_schema_registry,
timestamptz_handling: TimestamptzHandling::from_options(
&info.format_encode_options,
&format_encode_options_with_secret,
)?,
}),
(SourceFormat::DebeziumMongo, SourceEncode::Json) => {
Expand Down

0 comments on commit 2e6096a

Please sign in to comment.