Skip to content

Commit

Permalink
Add sql federation column type converter spi
Browse files Browse the repository at this point in the history
  • Loading branch information
zihaoAK47 committed Mar 17, 2024
1 parent 9913546 commit 60bca19
Show file tree
Hide file tree
Showing 11 changed files with 320 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.sqlfederation.spi;

import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;

import java.util.Optional;

/**
* SQL federation column type convert.
*/
public interface SQLFederationColumnTypeConverter extends DatabaseTypedSPI {

/**
* Transforming the column results of a federated query.
*
* @param columnValue column value
* @return convert column value result
*/
Optional<Object> convertColumnValue(Object columnValue);

/**
* Converting the column types of a federated query.
*
* @param columnType column type
* @return convert column type result
*/
Optional<Integer> convertColumnType(Integer columnType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.apache.calcite.schema.Schema;
import org.apache.shardingsphere.infra.binder.context.segment.select.projection.Projection;
import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtils;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import org.apache.shardingsphere.sqlfederation.spi.SQLFederationColumnTypeConverter;

import java.io.InputStream;
import java.io.Reader;
Expand Down Expand Up @@ -71,7 +71,7 @@ public final class SQLFederationResultSet extends AbstractUnsupportedOperationRe

private final SQLFederationResultSetMetaData resultSetMetaData;

private final DatabaseType databaseType;
private final SQLFederationColumnTypeConverter sqlFederationColumnTypeConverter;

private Object[] currentRows;

Expand All @@ -82,7 +82,7 @@ public final class SQLFederationResultSet extends AbstractUnsupportedOperationRe
public SQLFederationResultSet(final Enumerator<Object> enumerator, final ShardingSphereSchema schema, final Schema sqlFederationSchema,
final SelectStatementContext selectStatementContext, final RelDataType resultColumnType) {
this.enumerator = enumerator;
this.databaseType = selectStatementContext.getDatabaseType();
this.sqlFederationColumnTypeConverter = DatabaseTypedSPILoader.getService(SQLFederationColumnTypeConverter.class, selectStatementContext.getDatabaseType());
columnLabelAndIndexes = new CaseInsensitiveMap<>(selectStatementContext.getProjectionsContext().getExpandProjections().size(), 1F);
Map<Integer, String> indexAndColumnLabels = new CaseInsensitiveMap<>(selectStatementContext.getProjectionsContext().getExpandProjections().size(), 1F);
handleColumnLabelAndIndex(columnLabelAndIndexes, indexAndColumnLabels, selectStatementContext);
Expand Down Expand Up @@ -469,18 +469,7 @@ private Object getValue(final int columnIndex, final Class<?> type) throws SQLEx
ShardingSpherePreconditions.checkState(!INVALID_FEDERATION_TYPES.contains(type), () -> new SQLFeatureNotSupportedException(String.format("Get value from `%s`", type.getName())));
Object result = currentRows[columnIndex - 1];
wasNull = null == result;
return convertValue(result);
}

private Object convertValue(final Object data) {
return databaseType instanceof MySQLDatabaseType ? convertMysqlValue(data) : data;
}

private Object convertMysqlValue(final Object data) {
if (data instanceof Boolean) {
return ((Boolean) data) ? 1 : 0;
}
return data;
return sqlFederationColumnTypeConverter.convertColumnValue(result).orElse(result);
}

private Object getCalendarValue(final int columnIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
import org.apache.shardingsphere.infra.binder.context.segment.select.projection.impl.ColumnProjection;
import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.database.core.DefaultDatabase;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import org.apache.shardingsphere.sqlfederation.spi.SQLFederationColumnTypeConverter;

import java.sql.ResultSetMetaData;
import java.util.List;
Expand All @@ -52,6 +53,8 @@ public final class SQLFederationResultSetMetaData extends WrapperAdapter impleme

private final Map<Integer, String> indexAndColumnLabels;

private final SQLFederationColumnTypeConverter sqlFederationColumnTypeConverter;

public SQLFederationResultSetMetaData(final ShardingSphereSchema schema, final Schema sqlFederationSchema,
final SelectStatementContext selectStatementContext, final RelDataType resultColumnType, final Map<Integer, String> indexAndColumnLabels) {
this.schema = schema;
Expand All @@ -60,6 +63,7 @@ public SQLFederationResultSetMetaData(final ShardingSphereSchema schema, final S
this.selectStatementContext = selectStatementContext;
this.resultColumnType = resultColumnType;
this.indexAndColumnLabels = indexAndColumnLabels;
this.sqlFederationColumnTypeConverter = DatabaseTypedSPILoader.getService(SQLFederationColumnTypeConverter.class, selectStatementContext.getDatabaseType());
}

@Override
Expand Down Expand Up @@ -149,23 +153,16 @@ public String getCatalogName(final int column) {

@Override
public int getColumnType(final int column) {
return convertSqlType(resultColumnType.getFieldList().get(column - 1).getType().getSqlTypeName()).getJdbcOrdinal();
int jdbcType = resultColumnType.getFieldList().get(column - 1).getType().getSqlTypeName().getJdbcOrdinal();
return sqlFederationColumnTypeConverter.convertColumnType(jdbcType).orElse(jdbcType);
}

@Override
public String getColumnTypeName(final int column) {
return convertSqlType(resultColumnType.getFieldList().get(column - 1).getType().getSqlTypeName()).getName();
}

private SqlTypeName convertSqlType(final SqlTypeName sqlTypeName) {
return selectStatementContext.getDatabaseType() instanceof MySQLDatabaseType ? convertMysqlSqlType(sqlTypeName) : sqlTypeName;
}

private SqlTypeName convertMysqlSqlType(final SqlTypeName sqlTypeName) {
if (SqlTypeName.BOOLEAN.getName().equalsIgnoreCase(sqlTypeName.getName())) {
return SqlTypeName.BIGINT;
}
return sqlTypeName;
SqlTypeName originalSqlTypeName = resultColumnType.getFieldList().get(column - 1).getType().getSqlTypeName();
SqlTypeName convertSqlTypeName =
SqlTypeName.getNameForJdbcType(sqlFederationColumnTypeConverter.convertColumnType(originalSqlTypeName.getJdbcOrdinal()).orElse(originalSqlTypeName.getJdbcOrdinal()));
return null == convertSqlTypeName ? originalSqlTypeName.getName() : convertSqlTypeName.getName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.sqlfederation.resultset.converter;

import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.shardingsphere.sqlfederation.spi.SQLFederationColumnTypeConverter;

import java.util.Optional;

/**
* MySQL column type converter.
*/
public final class MySQLColumnTypeConverter implements SQLFederationColumnTypeConverter {

@Override
public Optional<Object> convertColumnValue(final Object columnValue) {
if (columnValue instanceof Boolean) {
return Optional.of((Boolean) columnValue ? 1 : 0);
}
return Optional.empty();
}

@Override
public Optional<Integer> convertColumnType(final Integer columnType) {
if (SqlTypeName.BOOLEAN.getJdbcOrdinal() == columnType) {
return Optional.of(SqlTypeName.BIGINT.getJdbcOrdinal());
}
return Optional.empty();
}

@Override
public String getDatabaseType() {
return "MySQL";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.sqlfederation.resultset.converter;

import org.apache.shardingsphere.sqlfederation.spi.SQLFederationColumnTypeConverter;

import java.util.Optional;

/**
* OpenGauss column type converter.
*/
public final class OpenGaussColumnTypeConverter implements SQLFederationColumnTypeConverter {

@Override
public Optional<Object> convertColumnValue(final Object columnValue) {
return Optional.empty();
}

@Override
public Optional<Integer> convertColumnType(final Integer columnType) {
return Optional.empty();
}

@Override
public String getDatabaseType() {
return "openGauss";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.sqlfederation.resultset.converter;

import org.apache.shardingsphere.sqlfederation.spi.SQLFederationColumnTypeConverter;

import java.util.Optional;

/**
* Oracle column type converter.
*/
public final class OracleColumnTypeConverter implements SQLFederationColumnTypeConverter {

@Override
public Optional<Object> convertColumnValue(final Object columnValue) {
return Optional.empty();
}

@Override
public Optional<Integer> convertColumnType(final Integer columnType) {
return Optional.empty();
}

@Override
public String getDatabaseType() {
return "Oracle";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.sqlfederation.resultset.converter;

import org.apache.shardingsphere.sqlfederation.spi.SQLFederationColumnTypeConverter;

import java.util.Optional;

/**
* PostgreSQL column type converter.
*/
public final class PostgreSQLColumnTypeConverter implements SQLFederationColumnTypeConverter {

@Override
public Optional<Object> convertColumnValue(final Object columnValue) {
return Optional.empty();
}

@Override
public Optional<Integer> convertColumnType(final Integer columnType) {
return Optional.empty();
}

@Override
public String getDatabaseType() {
return "PostgreSQL";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.shardingsphere.sqlfederation.resultset.converter.MySQLColumnTypeConverter
org.apache.shardingsphere.sqlfederation.resultset.converter.OracleColumnTypeConverter
org.apache.shardingsphere.sqlfederation.resultset.converter.OpenGaussColumnTypeConverter
org.apache.shardingsphere.sqlfederation.resultset.converter.PostgreSQLColumnTypeConverter
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.SQLFederationSchema;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -80,6 +81,7 @@ private SelectStatementContext createSelectStatementContext() {
TablesContext tablesContext = mock(TablesContext.class);
when(tablesContext.getTableNames()).thenReturn(Collections.emptyList());
when(result.getTablesContext()).thenReturn(tablesContext);
when(result.getDatabaseType()).thenReturn(TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
return result;
}

Expand Down
Loading

0 comments on commit 60bca19

Please sign in to comment.