Skip to content

Commit

Permalink
fix bugs related to the usage of BIGINT and DECIMAL datatype
Browse files Browse the repository at this point in the history
  • Loading branch information
knaufk committed Jan 27, 2023
1 parent dbc0476 commit 3cc20e2
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 8 deletions.
12 changes: 4 additions & 8 deletions src/main/java/com/github/knaufk/flink/faker/FakerUtils.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.github.knaufk.flink.faker;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.time.Instant;
import java.time.ZoneId;
Expand All @@ -17,11 +16,7 @@
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.*;

public class FakerUtils {

Expand All @@ -42,16 +37,17 @@ static Object stringValueToType(String[] stringArray, LogicalType logicalType) {
case BOOLEAN:
return Boolean.parseBoolean(value);
case DECIMAL:
DecimalType decimalType = (DecimalType) logicalType;
BigDecimal bd = new BigDecimal(value);
return DecimalData.fromBigDecimal(bd, bd.precision(), bd.scale());
return DecimalData.fromBigDecimal(bd, decimalType.getPrecision(), decimalType.getScale());
case TINYINT:
return Byte.parseByte(value);
case SMALLINT:
return Short.parseShort(value);
case INTEGER:
return Integer.parseInt(value);
case BIGINT:
return new BigInteger(value);
return Long.valueOf(value);
case FLOAT:
return Float.parseFloat(value);
case DOUBLE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,56 @@ public void testRandomResultsWithoutParallelism() {
assertThat(row1).isNotEqualTo(row2);
}

@Test
public void testAllPrimitiveDataTypes() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.executeSql(
"CREATE TABLE all_types ("
+ "f0 TINYINT, \n"
+ "f1 SMALLINT, \n"
+ "f2 INT, \n"
+ "f3 BIGINT, \n"
+ "f4 DOUBLE, \n"
+ "f5 FLOAT, \n"
+ "f6 DECIMAL(6,3), \n"
+ "f7 CHAR(10), \n"
+ "f8 VARCHAR(255), \n"
+ "f9 STRING, \n"
+ "f10 BOOLEAN \n"
+ ") WITH ( \n"
+ "'connector' = 'faker', \n"
+ "'number-of-rows' = '"
+ NUM_ROWS
+ "', \n"
+ "'fields.f0.expression' = '#{number.numberBetween ''-128'',''127''}', \n"
+ "'fields.f1.expression' = '#{number.numberBetween ''-32768'',''32767''}', \n"
+ "'fields.f2.expression' = '#{number.numberBetween ''-2147483648'',''2147483647''}', \n"
+ "'fields.f3.expression' = '#{number.randomNumber ''12'',''false''}', \n"
+ "'fields.f4.expression' = '#{number.randomDouble ''3'',''-999'',''999''}', \n"
+ "'fields.f5.expression' = '#{number.randomDouble ''3'',''-999'',''999''}', \n"
+ "'fields.f6.expression' = '#{number.randomDouble ''3'',''-999'',''999''}', \n"
+ "'fields.f7.expression' = '#{Lorem.characters ''10''}', \n"
+ "'fields.f8.expression' = '#{Lorem.characters ''255''}', \n"
+ "'fields.f9.expression' = '#{Lorem.sentence}', \n"
+ "'fields.f10.expression' = '#{regexify ''(true|false){1}''}' \n"
+ ");");

TableResult tableResult = tEnv.executeSql("SELECT * FROM all_types");

CloseableIterator<Row> collect = tableResult.collect();

int numRows = 0;
while (collect.hasNext()) {
collect.next();
numRows++;
}

assertThat(numRows).isEqualTo(NUM_ROWS);
}

@Test
public void testWithComputedColumn() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down

0 comments on commit 3cc20e2

Please sign in to comment.