Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Jun 2, 2024
1 parent 56d4bf6 commit 92e63f6
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 11 deletions.
10 changes: 9 additions & 1 deletion e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ SELECT id,name,description FROM rw.products_test order by id limit 3
103 12-pack drill bits 12-pack of drill bits with sizes ranging from #40 to #3

query TT
select database_name, tabl_name from rw.products_test limit 3;
select database_name, table_name from rw.products_test limit 3;
----
mytest products
mytest products
Expand Down Expand Up @@ -291,6 +291,14 @@ SELECT * from person_new_cnt
----
6

query TTT
SELECT database_name,schema_name,table_name from person_new limit 3;
----
cdc_test public person
cdc_test public person
cdc_test public person


query ITTTT
SELECT id,name,email_address,credit_card,city from person_new order by id;
----
Expand Down
8 changes: 1 addition & 7 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,6 @@ pub const SCHEMA_NAME_KEY: &str = "schema.name";
pub const DATABASE_NAME_KEY: &str = "database.name";

impl SchemaTableName {
pub fn new(schema_name: String, table_name: String) -> Self {
Self {
schema_name,
table_name,
}
}

pub fn from_properties(properties: &HashMap<String, String>) -> Self {
let table_type = CdcTableType::from_properties(properties);
let table_name = properties.get(TABLE_NAME_KEY).cloned().unwrap_or_default();
Expand Down Expand Up @@ -385,6 +378,7 @@ impl MySqlExternalTableReader {
)
};

tracing::debug!("snapshot sql: {}", sql);
let mut conn = self.conn.lock().await;

// Set session timezone to UTC
Expand Down
5 changes: 4 additions & 1 deletion src/connector/src/source/cdc/external/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ impl PostgresExternalTableReader {
.map(|i| rw_schema.fields[*i].name.clone())
.collect_vec();

let table_name = SchemaTableName::new(config.schema.clone(), config.table.clone());
let table_name = SchemaTableName {
schema_name: config.schema.clone(),
table_name: config.table.clone(),
};
let order_key = primary_keys.iter().join(",");
let scan_sql = format!(
"SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {scan_limit}",
Expand Down
3 changes: 1 addition & 2 deletions src/stream/src/from_proto/stream_cdc_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,11 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
.map(Into::into)
.collect();

let schema_table_name = SchemaTableName::from_properties(&properties);
let table_config = serde_json::from_value::<ExternalTableConfig>(
serde_json::to_value(properties).unwrap(),
)
.map_err(|e| anyhow!("failed to parse external table config").context(e))?;
let schema_table_name =
SchemaTableName::new(table_config.schema.clone(), table_config.table.clone());
let database_name = table_config.database.clone();
let table_reader = table_type
.create_table_reader(
Expand Down

0 comments on commit 92e63f6

Please sign in to comment.