From 0ca8d0a23520ed5aeec8ae68fd8e99ad14b26712 Mon Sep 17 00:00:00 2001 From: AFine-gs <69924417+AFine-gs@users.noreply.github.com> Date: Thu, 19 Oct 2023 18:54:19 -0400 Subject: [PATCH] update arrow stream writer (#2388) --- .../format/arrow/ArrowDataWriter.java | 33 +++++++++-------- .../format/arrow/ArrowRuntimeExtension.java | 2 +- .../src/test/java/TestArrowNodeExecutor.java | 11 +++--- .../src/test/java/TestArrowQueries.java | 5 ++- .../src/test/resources/arrowService.json | 34 +++--------------- .../resources/expectedArrowServiceData.arrow | Bin 880 -> 696 bytes 6 files changed, 32 insertions(+), 53 deletions(-) diff --git a/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/main/java/org/finos/legend/engine/external/format/arrow/ArrowDataWriter.java b/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/main/java/org/finos/legend/engine/external/format/arrow/ArrowDataWriter.java index 330d78225c6..381604cbea7 100644 --- a/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/main/java/org/finos/legend/engine/external/format/arrow/ArrowDataWriter.java +++ b/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/main/java/org/finos/legend/engine/external/format/arrow/ArrowDataWriter.java @@ -16,11 +16,9 @@ import java.nio.charset.Charset; import java.util.Calendar; -import java.util.HashMap; -import java.util.Locale; +import java.util.GregorianCalendar; import java.util.TimeZone; -import org.apache.arrow.adapter.jdbc.JdbcFieldInfo; import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig; import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder; import org.apache.arrow.adapter.jdbc.LegendArrowVectorIterator; @@ -32,39 +30,40 @@ import java.io.IOException; import java.io.OutputStream; -import java.sql.ResultSet; import java.sql.SQLException; +import org.finos.legend.engine.plan.execution.stores.relational.result.RelationalResult; public class ArrowDataWriter extends ExternalFormatWriter implements AutoCloseable { private final LegendArrowVectorIterator iterator; private final BufferAllocator allocator; - public ArrowDataWriter(ResultSet resultSet) throws SQLException + public ArrowDataWriter(RelationalResult resultSet) throws SQLException { - HashMap map = new HashMap(); - this.allocator = new RootAllocator(); - JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator, Calendar.getInstance(TimeZone.getDefault(), Locale.ROOT)).build(); - this.iterator = LegendArrowVectorIterator.create(resultSet, config); + Calendar calendar = resultSet.getRelationalDatabaseTimeZone() == null ? + new GregorianCalendar(TimeZone.getTimeZone("GMT")) : + new GregorianCalendar(TimeZone.getTimeZone(resultSet.getRelationalDatabaseTimeZone())); + JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator, calendar).setReuseVectorSchemaRoot(true).build(); + this.iterator = LegendArrowVectorIterator.create(resultSet.getResultSet(), config); } @Override public void writeData(OutputStream outputStream) throws IOException { - try + try (VectorSchemaRoot vector = iterator.next(); + ArrowStreamWriter writer = new ArrowStreamWriter(vector, null, outputStream); + ) { + writer.start(); + writer.writeBatch(); while (this.iterator.hasNext()) { - try (VectorSchemaRoot vector = iterator.next(); - ArrowStreamWriter writer = new ArrowStreamWriter(vector, null, outputStream) - ) - { - writer.start(); - writer.writeBatch(); - } + iterator.next(); + writer.writeBatch(); + } } catch (Exception e) diff --git a/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/main/java/org/finos/legend/engine/external/format/arrow/ArrowRuntimeExtension.java b/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/main/java/org/finos/legend/engine/external/format/arrow/ArrowRuntimeExtension.java index 372a03f85ac..326a6053071 100644 --- a/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/main/java/org/finos/legend/engine/external/format/arrow/ArrowRuntimeExtension.java +++ b/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/main/java/org/finos/legend/engine/external/format/arrow/ArrowRuntimeExtension.java @@ -65,7 +65,7 @@ public Result executeExternalizeTDSExecutionNode(ExternalFormatExternalizeTDSExe private Result streamArrowFromRelational(RelationalResult relationalResult) throws SQLException, IOException { - return new ExternalFormatSerializeResult(new ArrowDataWriter(relationalResult.getResultSet()), relationalResult, CONTENT_TYPE); + return new ExternalFormatSerializeResult(new ArrowDataWriter(relationalResult), relationalResult, CONTENT_TYPE); } diff --git a/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/test/java/TestArrowNodeExecutor.java b/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/test/java/TestArrowNodeExecutor.java index 0c388a95af5..6db2d4232d4 100644 --- a/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/test/java/TestArrowNodeExecutor.java +++ b/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/test/java/TestArrowNodeExecutor.java @@ -14,6 +14,7 @@ import java.io.FileOutputStream; import java.io.IOException; +import java.util.TimeZone; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; @@ -31,6 +32,7 @@ import org.finos.legend.engine.protocol.pure.v1.model.packageableElement.store.relational.model.result.SQLResultColumn; import org.finos.legend.engine.shared.core.api.request.RequestContext; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -45,7 +47,6 @@ public class TestArrowNodeExecutor { - @Test public void testExternalize() throws Exception { @@ -57,7 +58,7 @@ public void testExternalize() throws Exception mockExecutionNode.connection = mockDatabaseConnection; Mockito.when(mockDatabaseConnection.accept(any())).thenReturn(false); - try (Connection conn = DriverManager.getConnection("jdbc:h2:~/test", "sa", ""); + try (Connection conn = DriverManager.getConnection("jdbc:h2:~/test;TIME ZONE=America/New_York", "sa", ""); ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { //setup table @@ -70,7 +71,7 @@ public void testExternalize() throws Exception conn.createStatement().execute("INSERT INTO testtable (testInt, testString, testDate, testBool) VALUES(1,'A', '2020-01-01 00:00:00-05:00',true),( 2,null, '2020-01-01 00:00:00-02:00',false ),( 3,'B', '2020-01-01 00:00:00-05:00',false )"); conn.createStatement().execute("INSERT INTO testtableJoin (testIntR, testStringR) VALUES(6,'A'), (1,'B')"); - RelationalResult result = new RelationalResult(FastList.newListWith(new RelationalExecutionActivity("SELECT * FROM testtable left join testtableJoin on testtable.testInt=testtableJoin.testIntR", null)), mockExecutionNode, FastList.newListWith(new SQLResultColumn("testInt", "INTEGER"), new SQLResultColumn("testStringR", "VARCHAR"), new SQLResultColumn("testString", "VARCHAR"), new SQLResultColumn("testDate", "TIMESTAMP"), new SQLResultColumn("testBool", "TIMESTAMP")), null, "GMT", conn, null, null, null, new RequestContext()); + RelationalResult result = new RelationalResult(FastList.newListWith(new RelationalExecutionActivity("SELECT * FROM testtable left join testtableJoin on testtable.testInt=testtableJoin.testIntR", null)), mockExecutionNode, FastList.newListWith(new SQLResultColumn("testInt", "INTEGER"), new SQLResultColumn("testStringR", "VARCHAR"), new SQLResultColumn("testString", "VARCHAR"), new SQLResultColumn("testDate", "TIMESTAMP"), new SQLResultColumn("testBool", "TIMESTAMP")), null, "America/New_York", conn, null, null, null, new RequestContext()); ExternalFormatSerializeResult nodeExecute = (ExternalFormatSerializeResult) extension.executeExternalizeTDSExecutionNode(node, result, null, null); @@ -97,7 +98,7 @@ public void testExternalizeAsString() throws Exception mockExecutionNode.connection = mockDatabaseConnection; Mockito.when(mockDatabaseConnection.accept(any())).thenReturn(false); - try (Connection conn = DriverManager.getConnection("jdbc:h2:~/test", "sa", ""); + try (Connection conn = DriverManager.getConnection("jdbc:h2:~/test;TIME ZONE=America/New_York", "sa", ""); ) { @@ -106,7 +107,7 @@ public void testExternalizeAsString() throws Exception conn.createStatement().execute("Create Table testtable (testInt INTEGER, testString VARCHAR(255), testDate TIMESTAMP, testBool BOOLEAN)"); conn.createStatement().execute("INSERT INTO testtable (testInt, testString, testDate, testBool) VALUES(1,'A', '2020-01-01 00:00:00-05:00',true),( 2,'B', '2020-01-01 00:00:00-02:00',false ),( 3,'B', '2020-01-01 00:00:00-05:00',false )"); - RelationalResult result = new RelationalResult(FastList.newListWith(new RelationalExecutionActivity("SELECT * FROM testtable", null)), mockExecutionNode, FastList.newListWith(new SQLResultColumn("testInt", "INTEGER"), new SQLResultColumn("testString", "VARCHAR"), new SQLResultColumn("testDate", "TIMESTAMP"), new SQLResultColumn("testBool", "TIMESTAMP")), null, "GMT", conn, null, null, null, new RequestContext()); + RelationalResult result = new RelationalResult(FastList.newListWith(new RelationalExecutionActivity("SELECT * FROM testtable", null)), mockExecutionNode, FastList.newListWith(new SQLResultColumn("testInt", "INTEGER"), new SQLResultColumn("testString", "VARCHAR"), new SQLResultColumn("testDate", "TIMESTAMP"), new SQLResultColumn("testBool", "TIMESTAMP")), null, "America/New_York", conn, null, null, null, new RequestContext()); ExternalFormatSerializeResult nodeExecute = (ExternalFormatSerializeResult) extension.executeExternalizeTDSExecutionNode(node, result, null, null); diff --git a/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/test/java/TestArrowQueries.java b/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/test/java/TestArrowQueries.java index 2f0dfbb49c7..529873fee2a 100644 --- a/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/test/java/TestArrowQueries.java +++ b/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/test/java/TestArrowQueries.java @@ -55,6 +55,7 @@ public class TestArrowQueries { + @Test public void runTest() { @@ -62,6 +63,7 @@ public void runTest() ByteArrayOutputStream baos = new ByteArrayOutputStream(); ) { + ObjectMapper objectMapper = ObjectMapperFactory.getNewStandardObjectMapperWithPureProtocolExtensionSupports(); ExecuteInput input = objectMapper.readValue(getClass().getClassLoader().getResource("arrowService.json"), ExecuteInput.class); @@ -81,7 +83,8 @@ public void runTest() .build(); StreamingResult streamingResult = (StreamingResult) executor.executeWithArgs(executeArgs); streamingResult.stream(baos, SerializationFormat.DEFAULT); - assertAndValidateArrow(new ByteArrayInputStream(baos.toByteArray()), "expectedArrowServiceData.arrow"); + assertAndValidateArrow(new ByteArrayInputStream(baos.toByteArray()), "expectedArrowServiceData.arrow"); + } catch (Exception e) { diff --git a/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/test/resources/arrowService.json b/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/test/resources/arrowService.json index 89e3581953c..59ccedc3265 100644 --- a/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/test/resources/arrowService.json +++ b/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/test/resources/arrowService.json @@ -24,8 +24,8 @@ { "_type": "collection", "multiplicity": { - "lowerBound": 5, - "upperBound": 5 + "lowerBound": 4, + "upperBound": 4 }, "values": [ { @@ -91,27 +91,6 @@ } ] }, - { - "_type": "lambda", - "body": [ - { - "_type": "property", - "parameters": [ - { - "_type": "var", - "name": "x" - } - ], - "property": "settlementDateTime" - } - ], - "parameters": [ - { - "_type": "var", - "name": "x" - } - ] - }, { "_type": "lambda", "body": [ @@ -138,8 +117,8 @@ { "_type": "collection", "multiplicity": { - "lowerBound": 5, - "upperBound": 5 + "lowerBound": 4, + "upperBound": 4 }, "values": [ { @@ -154,10 +133,6 @@ "_type": "string", "value": "Quantity" }, - { - "_type": "string", - "value": "Settlement Date Time" - }, { "_type": "string", "value": "Trade Date" @@ -447,6 +422,7 @@ { "connection": { "_type": "RelationalDatabaseConnection", + "timeZone" : "America/New_York", "authenticationStrategy": { "_type": "h2Default" }, diff --git a/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/test/resources/expectedArrowServiceData.arrow b/legend-engine-xts-arrow/legend-engine-xt-arrow-runtime/src/test/resources/expectedArrowServiceData.arrow index e556d7a94a0da7f8fe45dd9e15edd9a1370e356c..c03b1b8aa2d33cd273ea2a4644ae9d1dfa410cdc 100644 GIT binary patch delta 164 zcmeyswu6=RKM;6KWVIG!VPIgm0mLmp>;S|dzSsYWQF3C(fJ%TI1}-2DDN0O9Rd7ix zNuBuZm?B8C1S1232uK7DAZaMzm@LXv&C3Df a0%?}XdzqXk3ou&<9Pwaa0AY~HAO!%J+9js| delta 317 zcmdnN`hku0KM?Gg$Z9Rc%D}+j!pOkz0Z6X_Vh12*0pdshCWgt?#{rcAISgDt98#2+ zlB(d6Sdz-XAO#c`0g8dJ3=jiVFj|0V2*JU?1JuL9zyaifFarZ4g8-0p%uOxIOitAI zOD&I&%rD9Y$$-oVPAw_PNzF~oD*;=e5R#b-w1r_}_fb`l5|C>>fQ|$y1pyugF$O*c zQJ|45KsEzI#N