Skip to content

Commit

Permalink
fix mysql named param
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed May 30, 2024
1 parent a025420 commit 3117fac
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,9 @@ private void validateTableSchema() throws SQLException {
var field = res.getString(1);
var dataType = res.getString(2);
var key = res.getString(3);
schema.put(field.toLowerCase(), dataType);
schema.put(field, dataType);
if (key.equalsIgnoreCase("PRI")) {
// RisingWave always use lower case for column name
pkFields.add(field.toLowerCase());
pkFields.add(field);
}
}

Expand All @@ -208,7 +207,7 @@ private void validateTableSchema() throws SQLException {
if (e.getKey().startsWith(ValidatorUtils.INTERNAL_COLUMN_PREFIX)) {
continue;
}
var dataType = schema.get(e.getKey().toLowerCase());
var dataType = schema.get(e.getKey());
if (dataType == null) {
throw ValidatorUtils.invalidArgument(
"Column '" + e.getKey() + "' not found in the upstream database");
Expand Down
21 changes: 15 additions & 6 deletions src/connector/src/source/cdc/external/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,13 +371,14 @@ impl MySqlExternalTableReader {
DataType::Timestamp => Value::from(value.into_timestamp().0),
_ => bail!("unsupported primary key data type: {}", ty),
};
ConnectorResult::Ok((pk.clone(), val))
ConnectorResult::Ok((pk.to_lowercase(), val))
} else {
bail!("primary key {} cannot be null", pk);
}
})
.try_collect::<_, _, ConnectorError>()?;

tracing::debug!("snapshot read params: {:?}", &params);
let rs_stream = sql
.with(Params::from(params))
.stream::<mysql_async::Row, _>(&mut *conn)
Expand All @@ -401,29 +402,37 @@ impl MySqlExternalTableReader {
// mysql cannot leverage the given key to narrow down the range of scan,
// we need to rewrite the comparison conditions by our own.
// (a, b) > (x, y) => (`a` > x) OR ((`a` = x) AND (`b` > y))
pub(crate) fn filter_expression(columns: &[String]) -> String {
fn filter_expression(columns: &[String]) -> String {
let mut conditions = vec![];
// push the first condition
conditions.push(format!(
"({} > :{})",
Self::quote_column(&columns[0]),
columns[0]
columns[0].to_lowercase()
));
for i in 2..=columns.len() {
// '=' condition
let mut condition = String::new();
for (j, col) in columns.iter().enumerate().take(i - 1) {
if j == 0 {
condition.push_str(&format!("({} = :{})", Self::quote_column(col), col));
condition.push_str(&format!(
"({} = :{})",
Self::quote_column(col),
col.to_lowercase()
));
} else {
condition.push_str(&format!(" AND ({} = :{})", Self::quote_column(col), col));
condition.push_str(&format!(
" AND ({} = :{})",
Self::quote_column(col),
col.to_lowercase()
));
}
}
// '>' condition
condition.push_str(&format!(
" AND ({} > :{})",
Self::quote_column(&columns[i - 1]),
columns[i - 1]
columns[i - 1].to_lowercase()
));
conditions.push(format!("({})", condition));
}
Expand Down

0 comments on commit 3117fac

Please sign in to comment.