diff --git a/pom.xml b/pom.xml index faf1661..cc09bd1 100644 --- a/pom.xml +++ b/pom.xml @@ -36,9 +36,9 @@ under the License. 11 - 1.14.6 + 1.16.3 2.12.1 - 2.11 + 2.12 2.10.0 ${java.version} ${java.version} @@ -86,14 +86,14 @@ under the License. org.apache.flink - flink-table-api-java-bridge_${scala.binary.version} + flink-table-api-java-bridge ${flink.version} provided org.apache.flink - flink-connector-jdbc_${scala.binary.version} + flink-connector-jdbc ${flink.version} provided @@ -135,7 +135,7 @@ under the License. org.apache.flink - flink-clients_${scala.binary.version} + flink-clients ${flink.version} test diff --git a/src/main/java/pl/touk/flink/ignite/converter/IgniteRowConverter.java b/src/main/java/pl/touk/flink/ignite/converter/IgniteRowConverter.java index 7ee8952..bcfa39d 100644 --- a/src/main/java/pl/touk/flink/ignite/converter/IgniteRowConverter.java +++ b/src/main/java/pl/touk/flink/ignite/converter/IgniteRowConverter.java @@ -1,6 +1,6 @@ package pl.touk.flink.ignite.converter; -import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter; +import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter; import org.apache.flink.table.types.logical.RowType; public class IgniteRowConverter extends AbstractJdbcRowConverter { diff --git a/src/main/java/pl/touk/flink/ignite/dialect/IgniteDialect.java b/src/main/java/pl/touk/flink/ignite/dialect/IgniteDialect.java index b8c9aba..3e3ab63 100644 --- a/src/main/java/pl/touk/flink/ignite/dialect/IgniteDialect.java +++ b/src/main/java/pl/touk/flink/ignite/dialect/IgniteDialect.java @@ -1,26 +1,31 @@ package pl.touk.flink.ignite.dialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.dialect.AbstractDialect; +import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; import pl.touk.flink.ignite.converter.IgniteRowConverter; +import java.util.EnumSet; import java.util.Optional; +import java.util.Set; -public class IgniteDialect implements JdbcDialect { - +// TODO_PAWEL jest problem, jak sie zamieni wszystkie metody na throw runtime exception oprocz quoteIdentifier i getRowConverter to +// testy dalej przechodza +public class IgniteDialect extends AbstractDialect { private static final long serialVersionUID = 1L; + // value chosen based on https://ignite.apache.org/docs/latest/sql-reference/data-types#timestamp + private static final int MAX_TIMESTAMP_PRECISION = 9; + private static final int MIN_TIMESTAMP_PRECISION = 1; + private static final int MAX_DECIMAL_PRECISION = 1000; + private static final int MIN_DECIMAL_PRECISION = 1; + @Override public String dialectName() { return "Ignite"; } - @Override - public boolean canHandle(String url) { - return url.startsWith("jdbc:ignite:thin"); - } - @Override public JdbcRowConverter getRowConverter(RowType rowType) { return new IgniteRowConverter(rowType); @@ -31,9 +36,51 @@ public String getLimitClause(long l) { return "LIMIT " + l; } + @Override + public String quoteIdentifier(String identifier) { + return "\"" + identifier + "\""; + } + @Override public Optional defaultDriverName() { return Optional.of("org.apache.ignite.IgniteJdbcThinDriver"); } + // not supported for now, probably merge can be used https://ignite.apache.org/docs/latest/sql-reference/dml#merge + @Override + public Optional getUpsertStatement( + String tableName, String[] fieldNames, String[] uniqueKeyFields) { + return Optional.empty(); + } + + @Override + public Optional decimalPrecisionRange() { + return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION)); + } + + @Override + public Optional timestampPrecisionRange() { + return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION)); + } + + @Override + public Set supportedTypes() { + return EnumSet.of( + LogicalTypeRoot.CHAR, + LogicalTypeRoot.VARCHAR, + LogicalTypeRoot.BOOLEAN, + LogicalTypeRoot.VARBINARY, + LogicalTypeRoot.DECIMAL, + LogicalTypeRoot.TINYINT, + LogicalTypeRoot.SMALLINT, + LogicalTypeRoot.INTEGER, + LogicalTypeRoot.BIGINT, + LogicalTypeRoot.FLOAT, + LogicalTypeRoot.DOUBLE, + LogicalTypeRoot.DATE, + LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, + LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, + LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + LogicalTypeRoot.ARRAY); + } } diff --git a/src/test/java/pl/touk/flink/ignite/StreamTableEnvironmentUtil.java b/src/test/java/pl/touk/flink/ignite/StreamTableEnvironmentUtil.java index b7cf074..4d4e004 100644 --- a/src/test/java/pl/touk/flink/ignite/StreamTableEnvironmentUtil.java +++ b/src/test/java/pl/touk/flink/ignite/StreamTableEnvironmentUtil.java @@ -1,10 +1,8 @@ package pl.touk.flink.ignite; import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; @@ -13,11 +11,9 @@ public class StreamTableEnvironmentUtil { public static StreamTableEnvironment create() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); - TableConfig tableConfig = new TableConfig(); - tableConfig.getConfiguration().set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); - return StreamTableEnvironmentImpl.create(env, settings, tableConfig); + return StreamTableEnvironmentImpl.create(env, settings); } }