Bump flink to 1.16 (#10)
* set flink version to 1.16

* dependency renames

* set scala version to 2.12

* fix

* fix

* fix

* nothing is changed

* now it works

* used abstract base class

* pr fixes

* comment added

* MAX_TIMESTAMP_PRECISION changed

* removed 'copied from PostgresDialect' comments

---------

Co-authored-by: Pawel Czajka <[email protected]>
paw787878 and Pawel Czajka authored Dec 15, 2023
1 parent c4f255f commit b4b16d6
Showing 4 changed files with 63 additions and 20 deletions.
10 changes: 5 additions & 5 deletions pom.xml
@@ -36,9 +36,9 @@ under the License.

<properties>
<java.version>11</java.version>
<flink.version>1.14.6</flink.version>
<flink.version>1.16.3</flink.version>
<log4j.version>2.12.1</log4j.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.binary.version>2.12</scala.binary.version>
<ignite.version>2.10.0</ignite.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
@@ -86,14 +86,14 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -135,7 +135,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
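Note on the artifact renames above: starting with Flink 1.15, the Java Table API and connector artifacts are published without Scala-version suffixes, so flink-table-api-java-bridge, flink-connector-jdbc, and flink-clients drop the _${scala.binary.version} part of their artifactIds. The scala.binary.version bump to 2.12 follows from the same release line: Flink 1.15+ supports only Scala 2.12.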
2 changes: 1 addition & 1 deletion src/main/java/pl/touk/flink/ignite/converter/IgniteRowConverter.java
@@ -1,6 +1,6 @@
package pl.touk.flink.ignite.converter;

import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

public class IgniteRowConverter extends AbstractJdbcRowConverter {
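The only change in IgniteRowConverter is the import: the 1.16 connector exposes AbstractJdbcRowConverter from the public converter package rather than internal.converter. The rest of the class is not shown in this diff; below is a minimal sketch of what it plausibly contains, assuming the 1.16 base-class contract (a RowType constructor plus an abstract converterName()).

package pl.touk.flink.ignite.converter;

import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

// Minimal sketch, not the repository's full source: the base class already
// provides per-type JDBC <-> RowData conversion, so only the constructor and
// the converter name are needed here.
public class IgniteRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    public IgniteRowConverter(RowType rowType) {
        super(rowType);
    }

    @Override
    public String converterName() {
        return "Ignite";
    }
}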
65 changes: 56 additions & 9 deletions src/main/java/pl/touk/flink/ignite/dialect/IgniteDialect.java
@@ -1,26 +1,31 @@
package pl.touk.flink.ignite.dialect;

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import pl.touk.flink.ignite.converter.IgniteRowConverter;

import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;

public class IgniteDialect implements JdbcDialect {

// TODO_PAWEL there is a problem: if every method except quoteIdentifier and getRowConverter is changed to throw a runtime exception,
// the tests still pass
public class IgniteDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;

// value chosen based on https://ignite.apache.org/docs/latest/sql-reference/data-types#timestamp
private static final int MAX_TIMESTAMP_PRECISION = 9;
private static final int MIN_TIMESTAMP_PRECISION = 1;
private static final int MAX_DECIMAL_PRECISION = 1000;
private static final int MIN_DECIMAL_PRECISION = 1;

@Override
public String dialectName() {
return "Ignite";
}

@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:ignite:thin");
}

@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new IgniteRowConverter(rowType);
@@ -31,9 +36,51 @@ public String getLimitClause(long l) {
return "LIMIT " + l;
}

@Override
public String quoteIdentifier(String identifier) {
return "\"" + identifier + "\"";
}

@Override
public Optional<String> defaultDriverName() {
return Optional.of("org.apache.ignite.IgniteJdbcThinDriver");
}

// Not supported for now; MERGE could probably be used: https://ignite.apache.org/docs/latest/sql-reference/dml#merge
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}

@Override
public Optional<Range> decimalPrecisionRange() {
return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
}

@Override
public Optional<Range> timestampPrecisionRange() {
return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
}

@Override
public Set<LogicalTypeRoot> supportedTypes() {
return EnumSet.of(
LogicalTypeRoot.CHAR,
LogicalTypeRoot.VARCHAR,
LogicalTypeRoot.BOOLEAN,
LogicalTypeRoot.VARBINARY,
LogicalTypeRoot.DECIMAL,
LogicalTypeRoot.TINYINT,
LogicalTypeRoot.SMALLINT,
LogicalTypeRoot.INTEGER,
LogicalTypeRoot.BIGINT,
LogicalTypeRoot.FLOAT,
LogicalTypeRoot.DOUBLE,
LogicalTypeRoot.DATE,
LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.ARRAY);
}
}
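The new precision constants and supportedTypes() are not decorative: AbstractDialect, the public base class introduced in the 1.16 connector, validates table schemas against decimalPrecisionRange(), timestampPrecisionRange(), and supportedTypes(), which is why extending it beats re-implementing JdbcDialect by hand.

Because getUpsertStatement() returns Optional.empty(), the JDBC connector falls back to its separate insert/update write path. A hypothetical MERGE-based variant is sketched below; it assumes the connector's ":field" named-placeholder convention and Ignite's MERGE INTO ... VALUES syntax, and the class and method names are illustrative, not code from this commit.

import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical sketch of what getUpsertStatement() could return if MERGE
// support were added to IgniteDialect. Ignite resolves the target row via
// the table's primary key, so the key fields need not be listed explicitly.
final class IgniteMergeSketch {

    static Optional<String> mergeStatement(String tableName, String[] fieldNames) {
        // Quote identifiers the same way as IgniteDialect.quoteIdentifier above.
        String columns = Arrays.stream(fieldNames)
                .map(f -> "\"" + f + "\"")
                .collect(Collectors.joining(", "));
        // ":field" placeholders, as used by the connector's dialect statements.
        String placeholders = Arrays.stream(fieldNames)
                .map(f -> ":" + f)
                .collect(Collectors.joining(", "));
        // e.g. MERGE INTO "t" ("id", "name") VALUES (:id, :name)
        return Optional.of(
                "MERGE INTO \"" + tableName + "\" ("
                        + columns + ") VALUES (" + placeholders + ")");
    }
}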
6 changes: 1 addition & 5 deletions src/test/java/pl/touk/flink/ignite/StreamTableEnvironmentUtil.java
@@ -1,10 +1,8 @@
package pl.touk.flink.ignite;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;

@@ -13,11 +11,9 @@ public class StreamTableEnvironmentUtil {
public static StreamTableEnvironment create() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
TableConfig tableConfig = new TableConfig();
tableConfig.getConfiguration().set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();

return StreamTableEnvironmentImpl.create(env, settings, tableConfig);
return StreamTableEnvironmentImpl.create(env, settings);
}

}
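The simplification above follows from the 1.16 API: StreamTableEnvironmentImpl.create no longer takes a separate TableConfig, and EnvironmentSettings.newInstance().inBatchMode() already carries the runtime mode that was previously forced through ExecutionOptions.RUNTIME_MODE. For comparison, here is a sketch of the same setup through the public factory, extended with a hypothetical Ignite-backed table; the URL and table name are illustrative, and the DDL assumes the Ignite dialect is discoverable by the plain jdbc connector, which this diff does not show.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IgniteBatchExample {

    public static void main(String[] args) {
        // Same batch setup as StreamTableEnvironmentUtil, via the public API
        // instead of the internal StreamTableEnvironmentImpl.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        StreamTableEnvironment tEnv =
                StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());

        // Hypothetical Ignite-backed table registration; URL and table name
        // are placeholders.
        tEnv.executeSql(
                "CREATE TABLE ignite_source ("
                        + "  id INT,"
                        + "  name VARCHAR,"
                        + "  created TIMESTAMP(9)"
                        + ") WITH ("
                        + "  'connector' = 'jdbc',"
                        + "  'url' = 'jdbc:ignite:thin://localhost:10800',"
                        + "  'table-name' = 'IGNITE_SOURCE'"
                        + ")");
    }
}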
