diff --git a/ci/scripts/build-other.sh b/ci/scripts/build-other.sh index f5b6692c6cbf9..9cd44dc78c95a 100755 --- a/ci/scripts/build-other.sh +++ b/ci/scripts/build-other.sh @@ -11,6 +11,7 @@ cd java mvn -B package -Dmaven.test.skip=true mvn -B install -Dmaven.test.skip=true --pl java-binding-integration-test --am mvn dependency:copy-dependencies --no-transfer-progress --pl java-binding-integration-test +mvn -B test --pl udf cd .. echo "--- Build rust binary for java binding integration test" diff --git a/e2e_test/udf/test.py b/e2e_test/udf/test.py index 6f1aa115bf953..725f6cff43b37 100644 --- a/e2e_test/udf/test.py +++ b/e2e_test/udf/test.py @@ -84,6 +84,11 @@ def hex_to_dec(hex: Optional[str]) -> Optional[Decimal]: return dec +@udf(input_types=["DECIMAL", "DECIMAL"], result_type="DECIMAL") +def decimal_add(a: Decimal, b: Decimal) -> Decimal: + return a + b + + @udf(input_types=["VARCHAR[]", "INT"], result_type="VARCHAR") def array_access(list: List[str], idx: int) -> Optional[str]: if idx == 0 or idx > len(list): @@ -212,6 +217,7 @@ def return_all_arrays( server.add_function(split) server.add_function(extract_tcp_info) server.add_function(hex_to_dec) + server.add_function(decimal_add) server.add_function(array_access) server.add_function(jsonb_access) server.add_function(jsonb_concat) diff --git a/e2e_test/udf/udf.slt b/e2e_test/udf/udf.slt index f5ab290a69a8a..4f5ba147a84a6 100644 --- a/e2e_test/udf/udf.slt +++ b/e2e_test/udf/udf.slt @@ -42,6 +42,9 @@ create function split(varchar) returns table (word varchar, length int) as split statement ok create function hex_to_dec(varchar) returns decimal as hex_to_dec using link 'http://localhost:8815'; +statement ok +create function decimal_add(decimal, decimal) returns decimal as decimal_add using link 'http://localhost:8815'; + statement ok create function array_access(varchar[], int) returns varchar as array_access using link 'http://localhost:8815'; @@ -72,6 +75,7 @@ query TTTTT rowsort show functions ---- array_access character varying[], integer character varying (empty) http://localhost:8815 +decimal_add numeric, numeric numeric (empty) http://localhost:8815 extract_tcp_info bytea struct (empty) http://localhost:8815 gcd integer, integer integer (empty) http://localhost:8815 gcd integer, integer, integer integer (empty) http://localhost:8815 @@ -106,6 +110,11 @@ select hex_to_dec('000000000000000000000000000000000000000000c0f6346334241a61f90 ---- 233276425899864771438119478 +query R +select decimal_add(1.11, 2.22); +---- +3.33 + query T select array_access(ARRAY['a', 'b', 'c'], 2); ---- @@ -142,7 +151,7 @@ select (return_all( 1 ::bigint, 1 ::float4, 1 ::float8, - 1234567890123456789012345678 ::decimal, + 12345678901234567890.12345678 ::decimal, date '2023-06-01', time '01:02:03.456789', timestamp '2023-06-01 01:02:03.456789', @@ -153,7 +162,7 @@ select (return_all( row(1, 2)::struct )).*; ---- -t 1 1 1 1 1 1234567890123456789012345678 2023-06-01 01:02:03.456789 2023-06-01 01:02:03.456789 1 mon 2 days 00:00:03 string \x6279746573 {"key": 1} (1,2) +t 1 1 1 1 1 12345678901234567890.12345678 2023-06-01 01:02:03.456789 2023-06-01 01:02:03.456789 1 mon 2 days 00:00:03 string \x6279746573 {"key": 1} (1,2) query T select (return_all_arrays( @@ -163,7 +172,7 @@ select (return_all_arrays( array[null, 1 ::bigint], array[null, 1 ::float4], array[null, 1 ::float8], - array[null, 1234567890123456789012345678 ::decimal], + array[null, 12345678901234567890.12345678 ::decimal], array[null, date '2023-06-01'], array[null, time '01:02:03.456789'], array[null, timestamp '2023-06-01 01:02:03.456789'], @@ -174,7 +183,7 @@ select (return_all_arrays( array[null, row(1, 2)::struct] )).*; ---- -{NULL,t} {NULL,1} {NULL,1} {NULL,1} {NULL,1} {NULL,1} {NULL,1234567890123456789012345678} {NULL,2023-06-01} {NULL,01:02:03.456789} {NULL,"2023-06-01 01:02:03.456789"} {NULL,"1 mon 2 days 00:00:03"} {NULL,string} {NULL,"\\x6279746573"} {NULL,"{\"key\": 1}"} {NULL,"(1,2)"} +{NULL,t} {NULL,1} {NULL,1} {NULL,1} {NULL,1} {NULL,1} {NULL,12345678901234567890.12345678} {NULL,2023-06-01} {NULL,01:02:03.456789} {NULL,"2023-06-01 01:02:03.456789"} {NULL,"1 mon 2 days 00:00:03"} {NULL,string} {NULL,"\\x6279746573"} {NULL,"{\"key\": 1}"} {NULL,"(1,2)"} query I select series(5); @@ -315,6 +324,9 @@ drop function split; statement ok drop function hex_to_dec; +statement ok +drop function decimal_add; + statement ok drop function array_access; diff --git a/java/udf-example/src/main/java/com/example/UdfExample.java b/java/udf-example/src/main/java/com/example/UdfExample.java index ef673e61c1d91..4cdbaebb93294 100644 --- a/java/udf-example/src/main/java/com/example/UdfExample.java +++ b/java/udf-example/src/main/java/com/example/UdfExample.java @@ -40,6 +40,7 @@ public static void main(String[] args) throws IOException { server.addFunction("gcd3", new Gcd3()); server.addFunction("extract_tcp_info", new ExtractTcpInfo()); server.addFunction("hex_to_dec", new HexToDec()); + server.addFunction("decimal_add", new DecimalAdd()); server.addFunction("array_access", new ArrayAccess()); server.addFunction("jsonb_access", new JsonbAccess()); server.addFunction("jsonb_concat", new JsonbConcat()); @@ -126,6 +127,12 @@ public BigDecimal eval(String hex) { } } + public static class DecimalAdd implements ScalarFunction { + public BigDecimal eval(BigDecimal a, BigDecimal b) { + return a.add(b); + } + } + public static class ArrayAccess implements ScalarFunction { public String eval(String[] array, int index) { return array[index - 1]; diff --git a/java/udf/CHANGELOG.md b/java/udf/CHANGELOG.md index 5f46b7ae339b5..48f2b014271a7 100644 --- a/java/udf/CHANGELOG.md +++ b/java/udf/CHANGELOG.md @@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -## [0.1.1] - 2023-11-28 +## [0.1.1] - 2023-12-01 ### Added @@ -17,6 +17,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Bump Arrow version to 14. +### Fixed + +- Fix unconstrained decimal type. + ## [0.1.0] - 2023-09-01 - Initial release. \ No newline at end of file diff --git a/java/udf/src/main/java/com/risingwave/functions/TypeUtils.java b/java/udf/src/main/java/com/risingwave/functions/TypeUtils.java index aedba8abf9823..5a70a7ed5973b 100644 --- a/java/udf/src/main/java/com/risingwave/functions/TypeUtils.java +++ b/java/udf/src/main/java/com/risingwave/functions/TypeUtils.java @@ -53,8 +53,8 @@ static Field stringToField(String typeStr, String name) { return Field.nullable(name, new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)); } else if (typeStr.equals("FLOAT8") || typeStr.equals("DOUBLE PRECISION")) { return Field.nullable(name, new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)); - } else if (typeStr.startsWith("DECIMAL") || typeStr.startsWith("NUMERIC")) { - return Field.nullable(name, new ArrowType.Decimal(38, 0, 128)); + } else if (typeStr.equals("DECIMAL") || typeStr.equals("NUMERIC")) { + return Field.nullable(name, new ArrowType.LargeBinary()); } else if (typeStr.equals("DATE")) { return Field.nullable(name, new ArrowType.Date(DateUnit.DAY)); } else if (typeStr.equals("TIME") || typeStr.equals("TIME WITHOUT TIME ZONE")) { @@ -110,7 +110,7 @@ static Field classToField(Class param, DataTypeHint hint, String name) { } else if (param == Double.class || param == double.class) { return Field.nullable(name, new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)); } else if (param == BigDecimal.class) { - return Field.nullable(name, new ArrowType.Decimal(38, 0, 128)); + return Field.nullable(name, new ArrowType.LargeBinary()); } else if (param == LocalDate.class) { return Field.nullable(name, new ArrowType.Date(DateUnit.DAY)); } else if (param == LocalTime.class) { @@ -240,12 +240,12 @@ static void fillVector(FieldVector fieldVector, Object[] values) { vector.set(i, (double) values[i]); } } - } else if (fieldVector instanceof DecimalVector) { - var vector = (DecimalVector) fieldVector; + } else if (fieldVector instanceof LargeVarBinaryVector) { + var vector = (LargeVarBinaryVector) fieldVector; vector.allocateNew(values.length); for (int i = 0; i < values.length; i++) { if (values[i] != null) { - vector.set(i, (BigDecimal) values[i]); + vector.set(i, ((BigDecimal) values[i]).toString().getBytes()); } } } else if (fieldVector instanceof DateDayVector) { @@ -329,9 +329,9 @@ static void fillVector(FieldVector fieldVector, Object[] values) { } else if (vector.getDataVector() instanceof Float8Vector) { TypeUtils.fillListVector( vector, values, (vec, i, val) -> vec.set(i, val)); - } else if (vector.getDataVector() instanceof DecimalVector) { - TypeUtils.fillListVector( - vector, values, (vec, i, val) -> vec.set(i, val)); + } else if (vector.getDataVector() instanceof LargeVarBinaryVector) { + TypeUtils.fillListVector( + vector, values, (vec, i, val) -> vec.set(i, val.toString().getBytes())); } else if (vector.getDataVector() instanceof DateDayVector) { TypeUtils.fillListVector( vector, values, (vec, i, val) -> vec.set(i, (int) val.toEpochDay())); @@ -476,6 +476,10 @@ static Function processFunc0(Field field, Class targetClass) } else if (field.getType() instanceof ArrowType.LargeUtf8 && targetClass == String.class) { // object is org.apache.arrow.vector.util.Text return obj -> obj.toString(); + } else if (field.getType() instanceof ArrowType.LargeBinary + && targetClass == BigDecimal.class) { + // object is byte[] + return obj -> new BigDecimal(new String((byte[]) obj)); } else if (field.getType() instanceof ArrowType.Date && targetClass == LocalDate.class) { // object is Integer return obj -> LocalDate.ofEpochDay((int) obj); @@ -504,7 +508,7 @@ static Function processFunc0(Field field, Class targetClass) } else if (subfield.getType() .equals(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))) { return obj -> ((List) obj).stream().map(subfunc).toArray(Double[]::new); - } else if (subfield.getType() instanceof ArrowType.Decimal) { + } else if (subfield.getType() instanceof ArrowType.LargeBinary) { return obj -> ((List) obj).stream().map(subfunc).toArray(BigDecimal[]::new); } else if (subfield.getType() instanceof ArrowType.Date) { return obj -> ((List) obj).stream().map(subfunc).toArray(LocalDate[]::new); diff --git a/java/udf/src/test/java/com/risingwave/functions/TestUdfServer.java b/java/udf/src/test/java/com/risingwave/functions/TestUdfServer.java index c72774c641aed..25977ec15e45c 100644 --- a/java/udf/src/test/java/com/risingwave/functions/TestUdfServer.java +++ b/java/udf/src/test/java/com/risingwave/functions/TestUdfServer.java @@ -186,9 +186,9 @@ public void all_types() throws Exception { c5.set(0, 1); c5.setValueCount(2); - var c6 = new DecimalVector("", allocator, 38, 0); + var c6 = new LargeVarBinaryVector("", allocator); c6.allocateNew(2); - c6.set(0, BigDecimal.valueOf(10).pow(37)); + c6.set(0, "1.234".getBytes()); c6.setValueCount(2); var c7 = new DateDayVector("", allocator); @@ -255,7 +255,7 @@ public void all_types() throws Exception { var output = stream.getRoot(); assertTrue(stream.next()); assertEquals( - "{\"bool\":true,\"i16\":1,\"i32\":1,\"i64\":1,\"f32\":1.0,\"f64\":1.0,\"decimal\":10000000000000000000000000000000000000,\"date\":19358,\"time\":3723000000,\"timestamp\":[2023,1,1,1,2,3],\"interval\":{\"period\":\"P1000M2000D\",\"duration\":0.000003000},\"str\":\"string\",\"bytes\":\"Ynl0ZXM=\",\"jsonb\":\"{ key: 1 }\",\"struct\":{\"f1\":1,\"f2\":2}}\n{}", + "{\"bool\":true,\"i16\":1,\"i32\":1,\"i64\":1,\"f32\":1.0,\"f64\":1.0,\"decimal\":\"MS4yMzQ=\",\"date\":19358,\"time\":3723000000,\"timestamp\":[2023,1,1,1,2,3],\"interval\":{\"period\":\"P1000M2000D\",\"duration\":0.000003000},\"str\":\"string\",\"bytes\":\"Ynl0ZXM=\",\"jsonb\":\"{ key: 1 }\",\"struct\":{\"f1\":1,\"f2\":2}}\n{}", output.contentToTSVString().trim()); } } diff --git a/src/common/src/array/arrow.rs b/src/common/src/array/arrow.rs index a04584fbce005..7fc6d277bce9e 100644 --- a/src/common/src/array/arrow.rs +++ b/src/common/src/array/arrow.rs @@ -143,7 +143,10 @@ converts_generic! { { arrow_array::Float64Array, Float64, ArrayImpl::Float64 }, { arrow_array::StringArray, Utf8, ArrayImpl::Utf8 }, { arrow_array::BooleanArray, Boolean, ArrayImpl::Bool }, - { arrow_array::Decimal128Array, Decimal128(_, _), ArrayImpl::Decimal }, + // Arrow doesn't have a data type to represent unconstrained numeric (`DECIMAL` in RisingWave and + // Postgres). So we pick a special type `LargeBinary` for it. + // Values stored in the array are the string representation of the decimal. e.g. b"1.234", b"+inf" + { arrow_array::LargeBinaryArray, LargeBinary, ArrayImpl::Decimal }, { arrow_array::Decimal256Array, Decimal256(_, _), ArrayImpl::Int256 }, { arrow_array::Date32Array, Date32, ArrayImpl::Date }, { arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, None), ArrayImpl::Timestamp }, @@ -169,7 +172,7 @@ impl From<&arrow_schema::DataType> for DataType { Int64 => Self::Int64, Float32 => Self::Float32, Float64 => Self::Float64, - Decimal128(_, _) => Self::Decimal, + LargeBinary => Self::Decimal, Decimal256(_, _) => Self::Int256, Date32 => Self::Date, Time64(Microsecond) => Self::Time, @@ -237,7 +240,7 @@ impl TryFrom<&DataType> for arrow_schema::DataType { DataType::Varchar => Ok(Self::Utf8), DataType::Jsonb => Ok(Self::LargeUtf8), DataType::Bytea => Ok(Self::Binary), - DataType::Decimal => Ok(Self::Decimal128(38, 0)), // arrow precision can not be 0 + DataType::Decimal => Ok(Self::LargeBinary), DataType::Struct(struct_type) => Ok(Self::Struct( struct_type .iter() @@ -462,53 +465,33 @@ impl FromIntoArrow for Interval { } } -// RisingWave Decimal type is self-contained, but Arrow is not. -// In Arrow DecimalArray, the scale is stored in data type as metadata, and the mantissa is stored -// as i128 in the array. -impl From<&DecimalArray> for arrow_array::Decimal128Array { +impl From<&DecimalArray> for arrow_array::LargeBinaryArray { fn from(array: &DecimalArray) -> Self { - let max_scale = array - .iter() - .filter_map(|o| o.map(|v| v.scale().unwrap_or(0))) - .max() - .unwrap_or(0) as u32; - let mut builder = arrow_array::builder::Decimal128Builder::with_capacity(array.len()) - .with_data_type(arrow_schema::DataType::Decimal128(38, max_scale as i8)); + let mut builder = + arrow_array::builder::LargeBinaryBuilder::with_capacity(array.len(), array.len() * 8); for value in array.iter() { - builder.append_option(value.map(|d| decimal_to_i128(d, max_scale))); + builder.append_option(value.map(|d| d.to_string())); } builder.finish() } } -fn decimal_to_i128(value: Decimal, scale: u32) -> i128 { - match value { - Decimal::Normalized(mut d) => { - d.rescale(scale); - d.mantissa() - } - Decimal::NaN => i128::MIN + 1, - Decimal::PositiveInf => i128::MAX, - Decimal::NegativeInf => i128::MIN, - } -} +impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray { + type Error = ArrayError; -impl From<&arrow_array::Decimal128Array> for DecimalArray { - fn from(array: &arrow_array::Decimal128Array) -> Self { - assert!(array.scale() >= 0, "todo: support negative scale"); - let from_arrow = |value| { - const NAN: i128 = i128::MIN + 1; - match value { - NAN => Decimal::NaN, - i128::MAX => Decimal::PositiveInf, - i128::MIN => Decimal::NegativeInf, - _ => Decimal::Normalized(rust_decimal::Decimal::from_i128_with_scale( - value, - array.scale() as u32, - )), - } - }; - array.iter().map(|o| o.map(from_arrow)).collect() + fn try_from(array: &arrow_array::LargeBinaryArray) -> Result { + array + .iter() + .map(|o| { + o.map(|s| { + let s = std::str::from_utf8(s) + .map_err(|_| ArrayError::FromArrow(format!("invalid decimal: {s:?}")))?; + s.parse() + .map_err(|_| ArrayError::FromArrow(format!("invalid decimal: {s:?}"))) + }) + .transpose() + }) + .try_collect() } } @@ -631,20 +614,12 @@ impl TryFrom<&ListArray> for arrow_array::ListArray { b.append_option(v) }) } - ArrayImpl::Decimal(a) => { - let max_scale = a - .iter() - .filter_map(|o| o.map(|v| v.scale().unwrap_or(0))) - .max() - .unwrap_or(0) as u32; - build( - array, - a, - Decimal128Builder::with_capacity(a.len()) - .with_data_type(arrow_schema::DataType::Decimal128(38, max_scale as i8)), - |b, v| b.append_option(v.map(|d| decimal_to_i128(d, max_scale))), - ) - } + ArrayImpl::Decimal(a) => build( + array, + a, + LargeBinaryBuilder::with_capacity(a.len(), a.len() * 8), + |b, v| b.append_option(v.map(|d| d.to_string())), + ), ArrayImpl::Interval(a) => build( array, a, @@ -842,8 +817,8 @@ mod tests { Some(Decimal::Normalized("123.4".parse().unwrap())), Some(Decimal::Normalized("123.456".parse().unwrap())), ]); - let arrow = arrow_array::Decimal128Array::from(&array); - assert_eq!(DecimalArray::from(&arrow), array); + let arrow = arrow_array::LargeBinaryArray::from(&array); + assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array); } #[test] diff --git a/src/udf/python/CHANGELOG.md b/src/udf/python/CHANGELOG.md index e035aab4ebb9e..9255d727c1e1d 100644 --- a/src/udf/python/CHANGELOG.md +++ b/src/udf/python/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.1.0] - 2023-12-01 + +### Fixed + +- Fix unconstrained decimal type. + ## [0.0.12] - 2023-11-28 ### Changed diff --git a/src/udf/python/pyproject.toml b/src/udf/python/pyproject.toml index 67d17db55dadc..97003bc29157a 100644 --- a/src/udf/python/pyproject.toml +++ b/src/udf/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "risingwave" -version = "0.0.12" +version = "0.1.0" authors = [{ name = "RisingWave Labs" }] description = "RisingWave Python API" readme = "README.md" diff --git a/src/udf/python/risingwave/test_udf.py b/src/udf/python/risingwave/test_udf.py index e331e12f3a761..595695d6a07a6 100644 --- a/src/udf/python/risingwave/test_udf.py +++ b/src/udf/python/risingwave/test_udf.py @@ -199,7 +199,7 @@ def test_all_types(): pa.array([1], type=pa.int64()), pa.array([1], type=pa.float32()), pa.array([1], type=pa.float64()), - pa.array([10**37], type=pa.decimal128(38)), + pa.array(["12345678901234567890.1234567890"], type=pa.large_binary()), pa.array([datetime.date(2023, 6, 1)], type=pa.date32()), pa.array([datetime.time(1, 2, 3, 456789)], type=pa.time64("us")), pa.array( @@ -229,7 +229,7 @@ def test_all_types(): 1, 1.0, 1.0, - 10**37, + b"12345678901234567890.1234567890", datetime.date(2023, 6, 1), datetime.time(1, 2, 3, 456789), datetime.datetime(2023, 6, 1, 1, 2, 3, 456789), diff --git a/src/udf/python/risingwave/udf.py b/src/udf/python/risingwave/udf.py index 6443034c9147a..fc8166bcd83b8 100644 --- a/src/udf/python/risingwave/udf.py +++ b/src/udf/python/risingwave/udf.py @@ -21,6 +21,7 @@ import json from concurrent.futures import ThreadPoolExecutor import concurrent +from decimal import Decimal import signal @@ -96,6 +97,7 @@ def _process_func(type: pa.DataType, output: bool) -> Callable: if pa.types.is_list(type): func = _process_func(type.value_type, output) return lambda array: [(func(v) if v is not None else None) for v in array] + if pa.types.is_struct(type): funcs = [_process_func(field.type, output) for field in type] if output: @@ -109,11 +111,19 @@ def _process_func(type: pa.DataType, output: bool) -> Callable: (func(v) if v is not None else None) for v, func in zip(map.values(), funcs) ) - if pa.types.is_large_string(type): + + if type.equals(JSONB): if output: return lambda v: json.dumps(v) else: return lambda v: json.loads(v) + + if type.equals(UNCONSTRAINED_DECIMAL): + if output: + return lambda v: str(v).encode("utf-8") + else: + return lambda v: Decimal(v.decode("utf-8")) + return lambda v: v @@ -416,6 +426,11 @@ def _to_data_type(t: Union[str, pa.DataType]) -> pa.DataType: return t +# we use `large_binary` to represent unconstrained decimal type +UNCONSTRAINED_DECIMAL = pa.large_binary() +JSONB = pa.large_string() + + def _string_to_data_type(type_str: str): """ Convert a SQL data type string to `pyarrow.DataType`. @@ -437,7 +452,7 @@ def _string_to_data_type(type_str: str): return pa.float64() elif type_str.startswith("DECIMAL") or type_str.startswith("NUMERIC"): if type_str == "DECIMAL" or type_str == "NUMERIC": - return pa.decimal128(38) + return UNCONSTRAINED_DECIMAL rest = type_str[8:-1] # remove "DECIMAL(" and ")" if "," in rest: precision, scale = rest.split(",") @@ -455,7 +470,7 @@ def _string_to_data_type(type_str: str): elif type_str in ("VARCHAR"): return pa.string() elif type_str in ("JSONB"): - return pa.large_string() + return JSONB elif type_str in ("BYTEA"): return pa.binary() elif type_str.startswith("STRUCT"): @@ -499,8 +514,10 @@ def _data_type_to_string(t: pa.DataType) -> str: return "FLOAT4" elif t.equals(pa.float64()): return "FLOAT8" - elif t.equals(pa.decimal128(38)): + elif t.equals(UNCONSTRAINED_DECIMAL): return "DECIMAL" + elif pa.types.is_decimal(t): + return f"DECIMAL({t.precision},{t.scale})" elif t.equals(pa.date32()): return "DATE" elif t.equals(pa.time64("us")): @@ -511,7 +528,7 @@ def _data_type_to_string(t: pa.DataType) -> str: return "INTERVAL" elif t.equals(pa.string()): return "VARCHAR" - elif t.equals(pa.large_string()): + elif t.equals(JSONB): return "JSONB" elif t.equals(pa.binary()): return "BYTEA"