From 36087b24e9ec2313396b3ab8a718c7d6b3d3306d Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 30 May 2024 19:03:14 +0800 Subject: [PATCH] fix mysql named param --- .../source/common/MySqlValidator.java | 7 +++--- .../src/source/cdc/external/mysql.rs | 22 ++++++++++++++----- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index a130c7107499..a2f63a28bbd7 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -195,10 +195,9 @@ private void validateTableSchema() throws SQLException { var field = res.getString(1); var dataType = res.getString(2); var key = res.getString(3); - schema.put(field.toLowerCase(), dataType); + schema.put(field, dataType); if (key.equalsIgnoreCase("PRI")) { - // RisingWave always use lower case for column name - pkFields.add(field.toLowerCase()); + pkFields.add(field); } } @@ -208,7 +207,7 @@ private void validateTableSchema() throws SQLException { if (e.getKey().startsWith(ValidatorUtils.INTERNAL_COLUMN_PREFIX)) { continue; } - var dataType = schema.get(e.getKey().toLowerCase()); + var dataType = schema.get(e.getKey()); if (dataType == null) { throw ValidatorUtils.invalidArgument( "Column '" + e.getKey() + "' not found in the upstream database"); diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index d1009f24ebd1..5bf3f3d34464 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -324,6 +324,8 @@ impl MySqlExternalTableReader { ) }; + tracing::info!("snapshot read sql: {}", &sql); + let mut conn = self.conn.lock().await; // Set session timezone to UTC @@ -371,13 +373,15 @@ impl MySqlExternalTableReader { DataType::Timestamp => Value::from(value.into_timestamp().0), _ => bail!("unsupported primary key data type: {}", ty), }; - ConnectorResult::Ok((pk.clone(), val)) + ConnectorResult::Ok((pk.to_lowercase(), val)) } else { bail!("primary key {} cannot be null", pk); } }) .try_collect::<_, _, ConnectorError>()?; + tracing::info!("snapshot read params: {:?}", ¶ms); + let rs_stream = sql .with(Params::from(params)) .stream::(&mut *conn) @@ -401,7 +405,7 @@ impl MySqlExternalTableReader { // mysql cannot leverage the given key to narrow down the range of scan, // we need to rewrite the comparison conditions by our own. // (a, b) > (x, y) => (`a` > x) OR ((`a` = x) AND (`b` > y)) - pub(crate) fn filter_expression(columns: &[String]) -> String { + fn filter_expression(columns: &[String]) -> String { let mut conditions = vec![]; // push the first condition conditions.push(format!( @@ -414,16 +418,24 @@ impl MySqlExternalTableReader { let mut condition = String::new(); for (j, col) in columns.iter().enumerate().take(i - 1) { if j == 0 { - condition.push_str(&format!("({} = :{})", Self::quote_column(col), col)); + condition.push_str(&format!( + "({} = :{})", + Self::quote_column(col), + col.to_lowercase() + )); } else { - condition.push_str(&format!(" AND ({} = :{})", Self::quote_column(col), col)); + condition.push_str(&format!( + " AND ({} = :{})", + Self::quote_column(col), + col.to_lowercase() + )); } } // '>' condition condition.push_str(&format!( " AND ({} > :{})", Self::quote_column(&columns[i - 1]), - columns[i - 1] + columns[i - 1].to_lowercase() )); conditions.push(format!("({})", condition)); }