From 3cc20e23af753024bbaa8be6e5795cf4459d47af Mon Sep 17 00:00:00 2001 From: Konstantin Knauf Date: Fri, 27 Jan 2023 10:02:16 +0100 Subject: [PATCH] fix bugs related to the usage of BIGINT and DECIMAL datatype --- .../github/knaufk/flink/faker/FakerUtils.java | 12 ++--- .../faker/FlinkFakerIntegrationTest.java | 50 +++++++++++++++++++ 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/github/knaufk/flink/faker/FakerUtils.java b/src/main/java/com/github/knaufk/flink/faker/FakerUtils.java index ad01d11..ff2654e 100644 --- a/src/main/java/com/github/knaufk/flink/faker/FakerUtils.java +++ b/src/main/java/com/github/knaufk/flink/faker/FakerUtils.java @@ -1,7 +1,6 @@ package com.github.knaufk.flink.faker; import java.math.BigDecimal; -import java.math.BigInteger; import java.sql.Date; import java.time.Instant; import java.time.ZoneId; @@ -17,11 +16,7 @@ import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; -import org.apache.flink.table.types.logical.ArrayType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.MapType; -import org.apache.flink.table.types.logical.MultisetType; -import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.*; public class FakerUtils { @@ -42,8 +37,9 @@ static Object stringValueToType(String[] stringArray, LogicalType logicalType) { case BOOLEAN: return Boolean.parseBoolean(value); case DECIMAL: + DecimalType decimalType = (DecimalType) logicalType; BigDecimal bd = new BigDecimal(value); - return DecimalData.fromBigDecimal(bd, bd.precision(), bd.scale()); + return DecimalData.fromBigDecimal(bd, decimalType.getPrecision(), decimalType.getScale()); case TINYINT: return Byte.parseByte(value); case SMALLINT: @@ -51,7 +47,7 @@ static Object stringValueToType(String[] stringArray, LogicalType logicalType) { case INTEGER: return Integer.parseInt(value); case BIGINT: - return new BigInteger(value); + return Long.valueOf(value); case FLOAT: return Float.parseFloat(value); case DOUBLE: diff --git a/src/test/java/com/github/knaufk/flink/faker/FlinkFakerIntegrationTest.java b/src/test/java/com/github/knaufk/flink/faker/FlinkFakerIntegrationTest.java index 15c9747..a6f4ed1 100644 --- a/src/test/java/com/github/knaufk/flink/faker/FlinkFakerIntegrationTest.java +++ b/src/test/java/com/github/knaufk/flink/faker/FlinkFakerIntegrationTest.java @@ -49,6 +49,56 @@ public void testRandomResultsWithoutParallelism() { assertThat(row1).isNotEqualTo(row2); } + @Test + public void testAllPrimitiveDataTypes() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(8); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + tEnv.executeSql( + "CREATE TABLE all_types (" + + "f0 TINYINT, \n" + + "f1 SMALLINT, \n" + + "f2 INT, \n" + + "f3 BIGINT, \n" + + "f4 DOUBLE, \n" + + "f5 FLOAT, \n" + + "f6 DECIMAL(6,3), \n" + + "f7 CHAR(10), \n" + + "f8 VARCHAR(255), \n" + + "f9 STRING, \n" + + "f10 BOOLEAN \n" + + ") WITH ( \n" + + "'connector' = 'faker', \n" + + "'number-of-rows' = '" + + NUM_ROWS + + "', \n" + + "'fields.f0.expression' = '#{number.numberBetween ''-128'',''127''}', \n" + + "'fields.f1.expression' = '#{number.numberBetween ''-32768'',''32767''}', \n" + + "'fields.f2.expression' = '#{number.numberBetween ''-2147483648'',''2147483647''}', \n" + + "'fields.f3.expression' = '#{number.randomNumber ''12'',''false''}', \n" + + "'fields.f4.expression' = '#{number.randomDouble ''3'',''-999'',''999''}', \n" + + "'fields.f5.expression' = '#{number.randomDouble ''3'',''-999'',''999''}', \n" + + "'fields.f6.expression' = '#{number.randomDouble ''3'',''-999'',''999''}', \n" + + "'fields.f7.expression' = '#{Lorem.characters ''10''}', \n" + + "'fields.f8.expression' = '#{Lorem.characters ''255''}', \n" + + "'fields.f9.expression' = '#{Lorem.sentence}', \n" + + "'fields.f10.expression' = '#{regexify ''(true|false){1}''}' \n" + + ");"); + + TableResult tableResult = tEnv.executeSql("SELECT * FROM all_types"); + + CloseableIterator collect = tableResult.collect(); + + int numRows = 0; + while (collect.hasNext()) { + collect.next(); + numRows++; + } + + assertThat(numRows).isEqualTo(NUM_ROWS); + } + @Test public void testWithComputedColumn() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();