From aeaf7cf8131014107e783da7d9ce2ef80923e728 Mon Sep 17 00:00:00 2001 From: Abhishek Sharma Date: Fri, 12 Jul 2024 16:29:16 -0400 Subject: [PATCH 01/24] Upgrade hadoop version to 3.4.0 (#13582) --- pinot-plugins/pinot-input-format/pinot-orc/pom.xml | 4 ++++ pom.xml | 8 +++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/pinot-plugins/pinot-input-format/pinot-orc/pom.xml b/pinot-plugins/pinot-input-format/pinot-orc/pom.xml index 1a92bf7cd4d..1fb737299ab 100644 --- a/pinot-plugins/pinot-input-format/pinot-orc/pom.xml +++ b/pinot-plugins/pinot-input-format/pinot-orc/pom.xml @@ -74,5 +74,9 @@ org.apache.orc orc-core + + com.google.protobuf + protobuf-java + diff --git a/pom.xml b/pom.xml index 82160540eda..c3942be8b6b 100644 --- a/pom.xml +++ b/pom.xml @@ -149,7 +149,7 @@ 3.30.2-GA 1.6.14 5.17.14 - 3.3.6 + 3.4.0 2.9.0 2.5.1 2.3.2 @@ -233,6 +233,7 @@ 2.12 + 24.1.0 2.0.0 3.9.0 2.0.3 @@ -1490,6 +1491,11 @@ kotlin-stdlib-common ${kotlin.stdlib.version} + + org.jetbrains + annotations + ${jetbrains.annotations.version} + com.squareup.okio okio From 1d788196d31793fe7209506c52b7c4424a43c0b3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 12 Jul 2024 13:33:30 -0700 Subject: [PATCH 02/24] Bump io.grpc:grpc-bom from 1.65.0 to 1.65.1 (#13592) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index c3942be8b6b..a88472ef91f 100644 --- a/pom.xml +++ b/pom.xml @@ -221,7 +221,7 @@ 3.25.3 - 1.65.0 + 1.65.1 26.43.0 1.1.1 2.28.0 From abbe10c474cfa19dba02cdb14f61c69410f90a2f Mon Sep 17 00:00:00 2001 From: Pratik Tibrewal Date: Tue, 16 Jul 2024 00:02:30 +0530 Subject: [PATCH 03/24] Remove preloading-type nature of enableSnapshot config (#13579) * Remove preloading-type nature of enableSnapshot config * Remove snapshot when adding segment if snapshot is not enabled --- .../BasePartitionUpsertMetadataManager.java | 23 +++---------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index d483a65c223..f03c9e86292 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -432,30 +432,13 @@ protected void doAddSegment(ImmutableSegmentImpl segment) { String segmentName = segment.getSegmentName(); _logger.info("Adding segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys()); long startTimeMs = System.currentTimeMillis(); - - MutableRoaringBitmap validDocIds; - if (_enableSnapshot) { - validDocIds = segment.loadValidDocIdsFromSnapshot(); - if (validDocIds != null && validDocIds.isEmpty()) { - _logger.info("Skip adding segment: {} without valid doc, current primary key count: {}", - segment.getSegmentName(), getNumPrimaryKeys()); - segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null); - return; - } - } else { - validDocIds = null; + if (!_enableSnapshot) { segment.deleteValidDocIdsSnapshot(); } - try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) { - Iterator recordInfoIterator; - if (validDocIds != null) { - recordInfoIterator = 
UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds); - } else { - recordInfoIterator = - UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs()); - } + Iterator recordInfoIterator = + UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs()); addSegment(segment, null, null, recordInfoIterator); } catch (Exception e) { throw new RuntimeException( From ec044c6e2c230c4268a830917fa6ff71e5ae37a3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 15 Jul 2024 12:16:27 -0700 Subject: [PATCH 04/24] Bump com.microsoft.azure:msal4j from 1.16.0 to 1.16.1 (#13599) --- pinot-plugins/pinot-file-system/pinot-adls/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-plugins/pinot-file-system/pinot-adls/pom.xml b/pinot-plugins/pinot-file-system/pinot-adls/pom.xml index 0c1b1cf4729..09d9683ce86 100644 --- a/pinot-plugins/pinot-file-system/pinot-adls/pom.xml +++ b/pinot-plugins/pinot-file-system/pinot-adls/pom.xml @@ -67,7 +67,7 @@ com.microsoft.azure msal4j - 1.16.0 + 1.16.1 From 9889cfd17820f928d0167f9c8d4ab34945d45c9a Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 15 Jul 2024 12:16:46 -0700 Subject: [PATCH 05/24] Bump org.codehaus.mojo:versions-maven-plugin from 2.17.0 to 2.17.1 (#13600) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index a88472ef91f..dcb095e47d0 100644 --- a/pom.xml +++ b/pom.xml @@ -2277,7 +2277,7 @@ org.codehaus.mojo versions-maven-plugin - 2.17.0 + 2.17.1 maven-shade-plugin From 02f554df9bd3e612ee6f6f27955a4306f8831c62 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 15 Jul 2024 12:17:01 -0700 Subject: [PATCH 06/24] Bump software.amazon.awssdk:bom from 2.26.19 to 2.26.20 (#13601) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index dcb095e47d0..c4bb4e73d11 100644 --- a/pom.xml +++ b/pom.xml @@ -173,7 +173,7 @@ 0.15.0 0.4.4 4.2.2 - 2.26.19 + 2.26.20 2.12.7 3.1.12 7.10.2 From a64011cc450cd3267c8e85115e1d1978fa94af9b Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 15 Jul 2024 12:17:13 -0700 Subject: [PATCH 07/24] Bump org.apache.maven.plugins:maven-release-plugin from 3.1.0 to 3.1.1 (#13602) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index c4bb4e73d11..0d6442658b6 100644 --- a/pom.xml +++ b/pom.xml @@ -1790,7 +1790,7 @@ org.apache.maven.plugins maven-release-plugin - 3.1.0 + 3.1.1 org.apache.maven.scm From 429525d9c4bd70e7cedbc14a6d814847dcf023a5 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 15 Jul 2024 12:17:34 -0700 Subject: [PATCH 08/24] Bump org.apache.maven.plugins:maven-project-info-reports-plugin (#13603) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 0d6442658b6..6b80de668a0 100644 --- a/pom.xml +++ b/pom.xml @@ -2386,7 +2386,7 @@ org.apache.maven.plugins maven-project-info-reports-plugin - 3.6.1 + 3.6.2 false From a182a0d4061fa529f918ee832ec4f2f549829bf0 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" <17555551+Jackie-Jiang@users.noreply.github.com> Date: Mon, 15 Jul 2024 12:18:07 -0700 Subject: [PATCH 09/24] Remove unused BaseDataTable 
(#13596) --- .../pinot/common/datatable/BaseDataTable.java | 288 ------------------ .../pinot/common/datatable/DataTable.java | 2 - .../scheduler/PrioritySchedulerTest.java | 3 +- 3 files changed, 1 insertion(+), 292 deletions(-) delete mode 100644 pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java deleted file mode 100644 index ab0a227a110..00000000000 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java +++ /dev/null @@ -1,288 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.common.datatable; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import javax.annotation.Nullable; -import org.apache.pinot.common.CustomObject; -import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.common.utils.HashUtil; -import org.apache.pinot.spi.trace.Tracing; -import org.apache.pinot.spi.utils.BigDecimalUtils; -import org.apache.pinot.spi.utils.ByteArray; -import org.apache.pinot.spi.utils.BytesUtils; -import org.roaringbitmap.RoaringBitmap; - -import static java.nio.charset.StandardCharsets.UTF_8; - - -/** - * Base implementation of the DataTable interface. - */ -public abstract class BaseDataTable implements DataTable { - protected int _numRows; - protected int _numColumns; - protected DataSchema _dataSchema; - protected int[] _columnOffsets; - protected int _rowSizeInBytes; - protected Map> _dictionaryMap; - protected byte[] _fixedSizeDataBytes; - protected ByteBuffer _fixedSizeData; - protected byte[] _variableSizeDataBytes; - protected ByteBuffer _variableSizeData; - protected Map _metadata; - - public BaseDataTable(int numRows, DataSchema dataSchema, Map> dictionaryMap, - byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) { - _numRows = numRows; - _numColumns = dataSchema.size(); - _dataSchema = dataSchema; - _columnOffsets = new int[_numColumns]; - _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets, getVersion()); - _dictionaryMap = dictionaryMap; - _fixedSizeDataBytes = fixedSizeDataBytes; - _fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes); - _variableSizeDataBytes = variableSizeDataBytes; - _variableSizeData = ByteBuffer.wrap(variableSizeDataBytes); - _metadata = new HashMap<>(); - } - - /** - * Construct empty data table. 
(Server side) - */ - public BaseDataTable() { - _numRows = 0; - _numColumns = 0; - _dataSchema = null; - _columnOffsets = null; - _rowSizeInBytes = 0; - _dictionaryMap = null; - _fixedSizeDataBytes = null; - _fixedSizeData = null; - _variableSizeDataBytes = null; - _variableSizeData = null; - _metadata = new HashMap<>(); - } - - /** - * Helper method to serialize dictionary map. - */ - protected byte[] serializeDictionaryMap() - throws IOException { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); - - dataOutputStream.writeInt(_dictionaryMap.size()); - int numEntriesAdded = 0; - for (Map.Entry> dictionaryMapEntry : _dictionaryMap.entrySet()) { - Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numEntriesAdded); - String columnName = dictionaryMapEntry.getKey(); - Map dictionary = dictionaryMapEntry.getValue(); - byte[] bytes = columnName.getBytes(UTF_8); - dataOutputStream.writeInt(bytes.length); - dataOutputStream.write(bytes); - dataOutputStream.writeInt(dictionary.size()); - - for (Map.Entry dictionaryEntry : dictionary.entrySet()) { - dataOutputStream.writeInt(dictionaryEntry.getKey()); - byte[] valueBytes = dictionaryEntry.getValue().getBytes(UTF_8); - dataOutputStream.writeInt(valueBytes.length); - dataOutputStream.write(valueBytes); - } - numEntriesAdded++; - } - - return byteArrayOutputStream.toByteArray(); - } - - /** - * Helper method to deserialize dictionary map. - */ - protected Map> deserializeDictionaryMap(ByteBuffer buffer) - throws IOException { - int numDictionaries = buffer.getInt(); - Map> dictionaryMap = new HashMap<>(HashUtil.getHashMapCapacity(numDictionaries)); - - for (int i = 0; i < numDictionaries; i++) { - String column = DataTableUtils.decodeString(buffer); - int dictionarySize = buffer.getInt(); - Map dictionary = new HashMap<>(HashUtil.getHashMapCapacity(dictionarySize)); - for (int j = 0; j < dictionarySize; j++) { - int key = buffer.getInt(); - String value = DataTableUtils.decodeString(buffer); - dictionary.put(key, value); - } - dictionaryMap.put(column, dictionary); - } - - return dictionaryMap; - } - - @Override - public Map getMetadata() { - return _metadata; - } - - @Override - public DataSchema getDataSchema() { - return _dataSchema; - } - - @Override - public int getNumberOfRows() { - return _numRows; - } - - @Override - public int getInt(int rowId, int colId) { - return _fixedSizeData.getInt(rowId * _rowSizeInBytes + _columnOffsets[colId]); - } - - @Override - public long getLong(int rowId, int colId) { - return _fixedSizeData.getLong(rowId * _rowSizeInBytes + _columnOffsets[colId]); - } - - @Override - public float getFloat(int rowId, int colId) { - return _fixedSizeData.getFloat(rowId * _rowSizeInBytes + _columnOffsets[colId]); - } - - @Override - public double getDouble(int rowId, int colId) { - return _fixedSizeData.getDouble(rowId * _rowSizeInBytes + _columnOffsets[colId]); - } - - @Override - public BigDecimal getBigDecimal(int rowId, int colId) { - int size = positionCursorInVariableBuffer(rowId, colId); - ByteBuffer byteBuffer = _variableSizeData.slice(); - byteBuffer.limit(size); - return BigDecimalUtils.deserialize(byteBuffer); - } - - @Override - public String getString(int rowId, int colId) { - int dictId = _fixedSizeData.getInt(rowId * _rowSizeInBytes + _columnOffsets[colId]); - return _dictionaryMap.get(_dataSchema.getColumnName(colId)).get(dictId); - } - - @Override - public ByteArray getBytes(int rowId, 
int colId) { - // NOTE: DataTable V2/V3 uses String to store BYTES value - return BytesUtils.toByteArray(getString(rowId, colId)); - } - - @Override - public int[] getIntArray(int rowId, int colId) { - int length = positionCursorInVariableBuffer(rowId, colId); - int[] ints = new int[length]; - for (int i = 0; i < length; i++) { - ints[i] = _variableSizeData.getInt(); - } - return ints; - } - - @Override - public long[] getLongArray(int rowId, int colId) { - int length = positionCursorInVariableBuffer(rowId, colId); - long[] longs = new long[length]; - for (int i = 0; i < length; i++) { - longs[i] = _variableSizeData.getLong(); - } - return longs; - } - - @Override - public float[] getFloatArray(int rowId, int colId) { - int length = positionCursorInVariableBuffer(rowId, colId); - float[] floats = new float[length]; - for (int i = 0; i < length; i++) { - floats[i] = _variableSizeData.getFloat(); - } - return floats; - } - - @Override - public double[] getDoubleArray(int rowId, int colId) { - int length = positionCursorInVariableBuffer(rowId, colId); - double[] doubles = new double[length]; - for (int i = 0; i < length; i++) { - doubles[i] = _variableSizeData.getDouble(); - } - return doubles; - } - - @Override - public String[] getStringArray(int rowId, int colId) { - int length = positionCursorInVariableBuffer(rowId, colId); - String[] strings = new String[length]; - Map dictionary = _dictionaryMap.get(_dataSchema.getColumnName(colId)); - for (int i = 0; i < length; i++) { - strings[i] = dictionary.get(_variableSizeData.getInt()); - } - return strings; - } - - @Nullable - @Override - public CustomObject getCustomObject(int rowId, int colId) { - int size = positionCursorInVariableBuffer(rowId, colId); - int type = _variableSizeData.getInt(); - if (size == 0) { - assert type == CustomObject.NULL_TYPE_VALUE; - return null; - } - ByteBuffer buffer = _variableSizeData.slice(); - buffer.limit(size); - return new CustomObject(type, buffer); - } - - @Nullable - @Override - public RoaringBitmap getNullRowIds(int colId) { - return null; - } - - private int positionCursorInVariableBuffer(int rowId, int colId) { - int offset = rowId * _rowSizeInBytes + _columnOffsets[colId]; - _variableSizeData.position(_fixedSizeData.getInt(offset)); - return _fixedSizeData.getInt(offset + 4); - } - - @Override - public String toString() { - if (_dataSchema == null) { - return _metadata.toString(); - } else { - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append("resultSchema:").append('\n'); - stringBuilder.append(_dataSchema).append('\n'); - stringBuilder.append("numRows: ").append(_numRows).append('\n'); - stringBuilder.append("metadata: ").append(_metadata.toString()).append('\n'); - return stringBuilder.toString(); - } - } -} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java index 88490cf7818..890e2963494 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java @@ -35,8 +35,6 @@ * Data table is used to transfer data from server to broker. */ public interface DataTable { - // TODO: remove this when we stop supporting DataTable V2. 
- String EXCEPTION_METADATA_KEY = "Exception"; void addException(ProcessingException processingException); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java index caeb453c553..5f92dd5a25a 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java @@ -213,8 +213,7 @@ public void testOutOfCapacityResponse() group.addLast(createQueryRequest("1", METRICS)); results.add(scheduler.submit(createServerQueryRequest("1", METRICS))); DataTable dataTable = DataTableFactory.getDataTable(results.get(1).get()); - assertTrue(dataTable.getMetadata() - .containsKey(DataTable.EXCEPTION_METADATA_KEY + QueryException.SERVER_OUT_OF_CAPACITY_ERROR.getErrorCode())); + assertTrue(dataTable.getExceptions().containsKey(QueryException.SERVER_OUT_OF_CAPACITY_ERROR.getErrorCode())); scheduler.stop(); } From 1cc3cef28a0bda95dc152f08764d2cda45d7eb59 Mon Sep 17 00:00:00 2001 From: Abhishek Sharma Date: Mon, 15 Jul 2024 16:48:29 -0400 Subject: [PATCH 10/24] Remove deprecated hasDataAccess method. (#13574) * Remove deprecated lang3 RandomUtils usage. * Changes as per the PR comment. * Remove deprecated method hasDataAccess. * Changes as per the PR Comments. * Updated the endpointUrl to QUERY. * Fix linting issue. --- .../controller/api/access/AccessControl.java | 21 ++----------------- .../api/access/AllowAllAccessFactory.java | 2 +- .../access/BasicAuthAccessControlFactory.java | 5 ----- .../ZkBasicAuthAccessControlFactory.java | 5 ----- .../api/resources/PinotQueryResource.java | 3 ++- ...tSegmentUploadDownloadRestletResource.java | 2 +- 6 files changed, 6 insertions(+), 32 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AccessControl.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AccessControl.java index fe9018ebd08..b2834e2e25f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AccessControl.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AccessControl.java @@ -31,21 +31,6 @@ public interface AccessControl extends FineGrainedAccessControl { String WORKFLOW_NONE = "NONE"; String WORKFLOW_BASIC = "BASIC"; - /** - * Return whether the client has data access to the given table. - * - * Note: This method is only used fore read access. It's being deprecated and its usage will be replaced by - * `hasAccess` method with AccessType.READ. 
- * - * @param httpHeaders HTTP headers containing requester identity - * @param tableName Name of the table to be accessed - * @return Whether the client has data access to the table - */ - @Deprecated - default boolean hasDataAccess(HttpHeaders httpHeaders, String tableName) { - return hasAccess(tableName, AccessType.READ, httpHeaders, null); - } - /** * Return whether the client has permission to the given table * @@ -55,10 +40,8 @@ default boolean hasDataAccess(HttpHeaders httpHeaders, String tableName) { * @param endpointUrl the request url for which this access control is called * @return whether the client has permission */ - default boolean hasAccess(@Nullable String tableName, AccessType accessType, HttpHeaders httpHeaders, - @Nullable String endpointUrl) { - return true; - } + boolean hasAccess(@Nullable String tableName, AccessType accessType, HttpHeaders httpHeaders, + @Nullable String endpointUrl); /** * Return whether the client has permission to access the endpoints with are not table level diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AllowAllAccessFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AllowAllAccessFactory.java index eca054a87b6..7d37c5fdb03 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AllowAllAccessFactory.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/AllowAllAccessFactory.java @@ -24,7 +24,7 @@ public class AllowAllAccessFactory implements AccessControlFactory { private static final AccessControl ALLOW_ALL_ACCESS = new AccessControl() { @Override - public boolean hasDataAccess(HttpHeaders httpHeaders, String tableName) { + public boolean hasAccess(String tableName, AccessType accessType, HttpHeaders httpHeaders, String endpointUrl) { return true; } }; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java index 7e08d289e6d..8147ac36755 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/BasicAuthAccessControlFactory.java @@ -76,11 +76,6 @@ public boolean protectAnnotatedOnly() { return false; } - @Override - public boolean hasDataAccess(HttpHeaders httpHeaders, String tableName) { - return getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName)).isPresent(); - } - @Override public boolean hasAccess(String tableName, AccessType accessType, HttpHeaders httpHeaders, String endpointUrl) { return getPrincipal(httpHeaders) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/ZkBasicAuthAccessControlFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/ZkBasicAuthAccessControlFactory.java index 6553ff931df..8d109837cd5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/ZkBasicAuthAccessControlFactory.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/access/ZkBasicAuthAccessControlFactory.java @@ -78,11 +78,6 @@ public boolean protectAnnotatedOnly() { return false; } - @Override - public boolean hasDataAccess(HttpHeaders httpHeaders, String tableName) { - return getPrincipal(httpHeaders).filter(p -> p.hasTable(tableName)).isPresent(); - } - @Override public boolean hasAccess(String tableName, 
AccessType accessType, HttpHeaders httpHeaders, String endpointUrl) { return getPrincipal(httpHeaders).filter( diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java index 8bcbf96b48f..d86dc7faed7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java @@ -66,6 +66,7 @@ import org.apache.pinot.controller.api.access.AccessControlFactory; import org.apache.pinot.controller.api.access.AccessType; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.core.auth.Actions; import org.apache.pinot.core.auth.ManualAuthorization; import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor; import org.apache.pinot.query.QueryEnvironment; @@ -292,7 +293,7 @@ private String getQueryResponse(String query, @Nullable SqlNode sqlNode, String // Validate data access AccessControl accessControl = _accessControlFactory.create(); - if (!accessControl.hasDataAccess(httpHeaders, rawTableName)) { + if (!accessControl.hasAccess(rawTableName, AccessType.READ, httpHeaders, Actions.Table.QUERY)) { return QueryException.ACCESS_DENIED_ERROR.toString(); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index 20b00a3cc3b..156a3e90950 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -159,7 +159,7 @@ public Response downloadSegment( boolean hasDataAccess; try { AccessControl accessControl = _accessControlFactory.create(); - hasDataAccess = accessControl.hasDataAccess(httpHeaders, tableName); + hasDataAccess = accessControl.hasAccess(tableName, AccessType.READ, httpHeaders, Actions.Table.DOWNLOAD_SEGMENT); } catch (Exception e) { throw new ControllerApplicationException(LOGGER, "Caught exception while validating access to table: " + tableName, Response.Status.INTERNAL_SERVER_ERROR, e); From eb7a3dbf43c5f17356ac763fa65c02cb549145b9 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" <17555551+Jackie-Jiang@users.noreply.github.com> Date: Mon, 15 Jul 2024 14:49:36 -0700 Subject: [PATCH 11/24] Prevent multi-stage operator from returning empty TransferableBlock (#13591) --- .../runtime/blocks/TransferableBlock.java | 3 + .../runtime/operator/FilterOperator.java | 27 ++-- .../runtime/operator/HashJoinOperator.java | 121 +++++++++--------- .../operator/LiteralValueOperator.java | 16 +-- .../operator/MailboxReceiveOperator.java | 2 +- .../runtime/operator/MultiStageOperator.java | 5 + .../query/runtime/operator/SetOperator.java | 49 +++---- .../SortedMailboxReceiveOperator.java | 2 +- .../operator/WindowAggregateOperator.java | 4 +- .../operator/AggregateOperatorTest.java | 8 +- .../runtime/operator/FilterOperatorTest.java | 24 ++-- .../operator/HashJoinOperatorTest.java | 3 +- .../operator/MailboxSendOperatorTest.java | 18 +-- .../operator/WindowAggregateOperatorTest.java | 16 +-- 14 files changed, 159 insertions(+), 139 deletions(-) diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java index f42fed4eccb..527510937d8 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java @@ -38,6 +38,7 @@ import org.apache.pinot.core.util.DataBlockExtractUtils; import org.apache.pinot.query.runtime.plan.MultiStageQueryStats; + /** * A {@code TransferableBlock} is a wrapper around {@link DataBlock} for transferring data using * {@link org.apache.pinot.common.proto.Mailbox}. @@ -61,6 +62,8 @@ public TransferableBlock(List container, DataSchema dataSchema, DataBl "Container cannot be used to construct block of type: %s", type); _type = type; _numRows = _container.size(); + // NOTE: Use assert to avoid breaking production code. + assert _numRows > 0 : "Container should not be empty"; _errCodeToExceptionMap = new HashMap<>(); _queryStats = null; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java index e8a9a7e0259..fb764c22011 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java @@ -95,21 +95,28 @@ public String toExplainString() { @Override protected TransferableBlock getNextBlock() { - TransferableBlock block = _input.nextBlock(); - if (block.isEndOfStreamBlock()) { + // Keep reading the input blocks until we find a match row or all blocks are processed. + // TODO: Consider batching the rows to improve performance. + while (true) { + TransferableBlock block = _input.nextBlock(); if (block.isErrorBlock()) { return block; } - return updateEosBlock(block, _statMap); - } - List resultRows = new ArrayList<>(); - for (Object[] row : block.getContainer()) { - Object filterResult = _filterOperand.apply(row); - if (BooleanUtils.isTrueInternalValue(filterResult)) { - resultRows.add(row); + if (block.isSuccessfulEndOfStreamBlock()) { + return updateEosBlock(block, _statMap); + } + assert block.isDataBlock(); + List rows = new ArrayList<>(); + for (Object[] row : block.getContainer()) { + Object filterResult = _filterOperand.apply(row); + if (BooleanUtils.isTrueInternalValue(filterResult)) { + rows.add(row); + } + } + if (!rows.isEmpty()) { + return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW); } } - return new TransferableBlock(resultRows, _dataSchema, DataBlock.Type.ROW); } public enum StatKey implements StatMap.Key { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java index f224a69aa5b..dd60ff523f4 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java @@ -51,6 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * This basic {@code BroadcastJoinOperator} implement a basic broadcast join algorithm. * This algorithm assumes that the broadcast table has to fit in memory since we are not supporting any spilling. 
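// A minimal sketch of the broadcast hash join described above, assuming a single-column
// equi-join key; rightRows, leftRows, keyIdx and emit() are hypothetical placeholders
// (the operator itself works on KeySelector keys and TransferableBlock batches):
//
//   Map<Object, List<Object[]>> hashTable = new HashMap<>();
//   for (Object[] rightRow : rightRows) {                      // build side: must fit in memory
//     hashTable.computeIfAbsent(rightRow[keyIdx], k -> new ArrayList<>()).add(rightRow);
//   }
//   for (Object[] leftRow : leftRows) {                        // probe side: streamed block by block
//     for (Object[] match : hashTable.getOrDefault(leftRow[keyIdx], List.of())) {
//       emit(leftRow, match);                                  // INNER join semantics
//     }
//   }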
@@ -85,21 +86,14 @@ public class HashJoinOperator extends MultiStageOperator { private final MultiStageOperator _leftInput; private final MultiStageOperator _rightInput; private final JoinRelType _joinType; + private final KeySelector _leftKeySelector; + private final KeySelector _rightKeySelector; private final DataSchema _resultSchema; private final int _leftColumnSize; private final int _resultColumnSize; private final List _nonEquiEvaluators; - private boolean _isHashTableBuilt; private final StatMap _statMap = new StatMap<>(StatKey.class); - // Used by non-inner join. - // Needed to indicate we have finished processing all results after returning last block. - // TODO: Remove this special handling by fixing data block EOS abstraction or operator's invariant. - private boolean _isTerminated; - private TransferableBlock _upstreamErrorBlock; - private final KeySelector _leftKeySelector; - private final KeySelector _rightKeySelector; - // Below are specific parameters to protect the hash table from growing too large. // Once the hash table reaches the limit, we will throw exception or break the right table build process. /** @@ -113,9 +107,14 @@ public class HashJoinOperator extends MultiStageOperator { */ private final JoinOverFlowMode _joinOverflowMode; + private boolean _isHashTableBuilt; private int _currentRowsInHashTable; + private TransferableBlock _upstreamErrorBlock; private MultiStageQueryStats _leftSideStats; private MultiStageQueryStats _rightSideStats; + // Used by non-inner join. + // Needed to indicate we have finished processing all results after returning last block. + private boolean _isTerminated; public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator leftInput, DataSchema leftSchema, MultiStageOperator rightInput, JoinNode node) { @@ -138,7 +137,6 @@ public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator left for (RexExpression nonEquiCondition : nonEquiConditions) { _nonEquiEvaluators.add(TransformOperandFactory.getTransformOperand(nonEquiCondition, _resultSchema)); } - _isHashTableBuilt = false; _broadcastRightTable = new HashMap<>(); if (needUnmatchedRightRows()) { _matchedRightRows = new HashMap<>(); @@ -209,10 +207,6 @@ public String toExplainString() { @Override protected TransferableBlock getNextBlock() throws ProcessingException { - if (_isTerminated) { - assert _leftSideStats != null; - return TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats); - } if (!_isHashTableBuilt) { // Build JOIN hash table buildBroadcastHashTable(); @@ -220,9 +214,7 @@ protected TransferableBlock getNextBlock() if (_upstreamErrorBlock != null) { return _upstreamErrorBlock; } - TransferableBlock leftBlock = _leftInput.nextBlock(); - // JOIN each left block with the constructed right hash table. 
- return buildJoinedDataBlock(leftBlock); + return buildJoinedDataBlock(); } private void buildBroadcastHashTable() @@ -280,58 +272,51 @@ private void buildBroadcastHashTable() _statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS, System.currentTimeMillis() - startTime); } - private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock) { - if (leftBlock.isErrorBlock()) { - _upstreamErrorBlock = leftBlock; - return _upstreamErrorBlock; - } - if (leftBlock.isSuccessfulEndOfStreamBlock()) { - assert _rightSideStats != null; - _leftSideStats = leftBlock.getQueryStats(); + private TransferableBlock buildJoinedDataBlock() { + if (_isTerminated) { assert _leftSideStats != null; - _leftSideStats.mergeInOrder(_rightSideStats, getOperatorType(), _statMap); + return TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats); + } - if (!needUnmatchedRightRows()) { - return TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats); + // Keep reading the input blocks until we find a match row or all blocks are processed. + // TODO: Consider batching the rows to improve performance. + while (true) { + TransferableBlock leftBlock = _leftInput.nextBlock(); + if (leftBlock.isErrorBlock()) { + return leftBlock; } - // TODO: Moved to a different function. - // Return remaining non-matched rows for non-inner join. - List returnRows = new ArrayList<>(); - for (Map.Entry> entry : _broadcastRightTable.entrySet()) { - List rightRows = entry.getValue(); - BitSet matchedIndices = _matchedRightRows.get(entry.getKey()); - if (matchedIndices == null) { - for (Object[] rightRow : rightRows) { - returnRows.add(joinRow(null, rightRow)); - } - } else { - int numRightRows = rightRows.size(); - int unmatchedIndex = 0; - while ((unmatchedIndex = matchedIndices.nextClearBit(unmatchedIndex)) < numRightRows) { - returnRows.add(joinRow(null, rightRows.get(unmatchedIndex++))); + if (leftBlock.isSuccessfulEndOfStreamBlock()) { + assert _rightSideStats != null; + _leftSideStats = leftBlock.getQueryStats(); + assert _leftSideStats != null; + _leftSideStats.mergeInOrder(_rightSideStats, getOperatorType(), _statMap); + if (needUnmatchedRightRows()) { + List rows = buildNonMatchRightRows(); + if (!rows.isEmpty()) { + _isTerminated = true; + return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW); } } + return TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats); + } + assert leftBlock.isDataBlock(); + List rows = buildJoinedRows(leftBlock); + if (!rows.isEmpty()) { + return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW); } - _isTerminated = true; - return new TransferableBlock(returnRows, _resultSchema, DataBlock.Type.ROW); } - List rows; + } + + private List buildJoinedRows(TransferableBlock leftBlock) { switch (_joinType) { - case SEMI: { - rows = buildJoinedDataBlockSemi(leftBlock); - break; - } - case ANTI: { - rows = buildJoinedDataBlockAnti(leftBlock); - break; - } + case SEMI: + return buildJoinedDataBlockSemi(leftBlock); + case ANTI: + return buildJoinedDataBlockAnti(leftBlock); default: { // INNER, LEFT, RIGHT, FULL - rows = buildJoinedDataBlockDefault(leftBlock); - break; + return buildJoinedDataBlockDefault(leftBlock); } } - // TODO: Rows can be empty here. Consider fetching another left block instead of returning empty block. 
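// The loop added below addresses exactly this TODO: rather than returning a possibly-empty
// block, the operator keeps polling its input until it has rows, an error, or EOS. A minimal
// sketch of the shared pattern, assuming a hypothetical per-operator process() hook:
//
//   while (true) {
//     TransferableBlock block = _input.nextBlock();
//     if (block.isErrorBlock() || block.isSuccessfulEndOfStreamBlock()) {
//       return block;
//     }
//     List<Object[]> rows = process(block);                    // filter / join / set-op the rows
//     if (!rows.isEmpty()) {
//       return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
//     }
//   }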
- return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW); } private List buildJoinedDataBlockSemi(TransferableBlock leftBlock) { @@ -401,6 +386,26 @@ private List buildJoinedDataBlockAnti(TransferableBlock leftBlock) { return rows; } + private List buildNonMatchRightRows() { + List rows = new ArrayList<>(); + for (Map.Entry> entry : _broadcastRightTable.entrySet()) { + List rightRows = entry.getValue(); + BitSet matchedIndices = _matchedRightRows.get(entry.getKey()); + if (matchedIndices == null) { + for (Object[] rightRow : rightRows) { + rows.add(joinRow(null, rightRow)); + } + } else { + int numRightRows = rightRows.size(); + int unmatchedIndex = 0; + while ((unmatchedIndex = matchedIndices.nextClearBit(unmatchedIndex)) < numRightRows) { + rows.add(joinRow(null, rightRows.get(unmatchedIndex++))); + } + } + } + return rows; + } + private Object[] joinRow(@Nullable Object[] leftRow, @Nullable Object[] rightRow) { Object[] resultRow = new Object[_resultColumnSize]; int idx = 0; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java index e8824ca5e34..fc9b5b51695 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java @@ -38,15 +38,15 @@ public class LiteralValueOperator extends MultiStageOperator { private static final Logger LOGGER = LoggerFactory.getLogger(LiteralValueOperator.class); private final DataSchema _dataSchema; - private final TransferableBlock _rexLiteralBlock; + private final List> _literalRows; private boolean _isLiteralBlockReturned; private final StatMap _statMap = new StatMap<>(StatKey.class); public LiteralValueOperator(OpChainExecutionContext context, ValueNode node) { super(context); _dataSchema = node.getDataSchema(); - _rexLiteralBlock = constructBlock(node.getLiteralRows()); - // only return a single literal block when it is the 1st virtual server. otherwise, result will be duplicated. + _literalRows = node.getLiteralRows(); + // Only return a single literal block when it is the 1st virtual server. Otherwise, result will be duplicated. 
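// (In other words, workers with a non-zero virtual id start in the "already returned"
// state and skip straight to the EOS block in getNextBlock() below.)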
_isLiteralBlockReturned = context.getId().getVirtualServerId() != 0; } @@ -73,9 +73,9 @@ public String toExplainString() { @Override protected TransferableBlock getNextBlock() { - if (!_isLiteralBlockReturned && !_isEarlyTerminated) { + if (!_isLiteralBlockReturned && !_isEarlyTerminated && !_literalRows.isEmpty()) { _isLiteralBlockReturned = true; - return _rexLiteralBlock; + return constructBlock(); } else { return createEosBlock(); } @@ -91,9 +91,9 @@ public Type getOperatorType() { return Type.LITERAL; } - private TransferableBlock constructBlock(List> literalRows) { - List blockContent = new ArrayList<>(); - for (List row : literalRows) { + private TransferableBlock constructBlock() { + List blockContent = new ArrayList<>(_literalRows.size()); + for (List row : _literalRows) { Object[] values = new Object[_dataSchema.size()]; for (int i = 0; i < _dataSchema.size(); i++) { values[i] = row.get(i).getValue(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java index e804b4b4081..c1c7647a1e5 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java @@ -28,7 +28,7 @@ /** * This {@code MailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serve it out from the - * {@link MultiStageOperator#getNextBlock()} API. + * {@link #nextBlock()} API. */ public class MailboxReceiveOperator extends BaseMailboxReceiveOperator { private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java index 50e68a47a69..0321bedc1b7 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java @@ -66,6 +66,11 @@ public MultiStageOperator(OpChainExecutionContext context) { public abstract void registerExecution(long time, int numRows); + /** + * Returns the next block from the operator. It should return non-empty data blocks followed by an end-of-stream (EOS) + * block when all the data is processed, or an error block if an error occurred. After it returns EOS or error block, + * no more call should be made. + */ @Override public TransferableBlock nextBlock() { if (Tracing.ThreadAccountantOps.isInterrupted()) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java index fb8ef505e53..ea5cf046d7f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java @@ -108,9 +108,7 @@ protected TransferableBlock getNextBlock() { if (_upstreamErrorBlock != null) { return _upstreamErrorBlock; } - // UNION each left block with the constructed right block set. 
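// SetOperator adopts the same non-empty-block contract documented in MultiStageOperator
// above. A minimal consumer-side sketch of that contract, assuming a hypothetical
// consume() callback:
//
//   TransferableBlock block = operator.nextBlock();
//   while (block.isDataBlock()) {
//     consume(block.getContainer());                           // every data block has rows
//     block = operator.nextBlock();
//   }
//   // block is now an EOS or error block; nextBlock() must not be called again.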
- TransferableBlock leftBlock = _leftChildOperator.nextBlock(); - return constructResultBlockSet(leftBlock); + return constructResultBlockSet(); } protected void constructRightBlockSet() { @@ -132,28 +130,33 @@ protected void constructRightBlockSet() { } } - protected TransferableBlock constructResultBlockSet(TransferableBlock leftBlock) { - List rows = new ArrayList<>(); - // TODO: Other operators keep the first erroneous block, while this keep the last. - // We should decide what is what we want to do and be consistent with that. - if (_upstreamErrorBlock != null || leftBlock.isErrorBlock()) { - _upstreamErrorBlock = leftBlock; - return _upstreamErrorBlock; - } - if (leftBlock.isSuccessfulEndOfStreamBlock()) { - assert _rightQueryStats != null; - MultiStageQueryStats leftQueryStats = leftBlock.getQueryStats(); - assert leftQueryStats != null; - _rightQueryStats.mergeInOrder(leftQueryStats, getOperatorType(), _statMap); - _rightQueryStats.getCurrentStats().concat(leftQueryStats.getCurrentStats()); - return TransferableBlockUtils.getEndOfStreamTransferableBlock(_rightQueryStats); - } - for (Object[] row : leftBlock.getContainer()) { - if (handleRowMatched(row)) { - rows.add(row); + protected TransferableBlock constructResultBlockSet() { + // Keep reading the input blocks until we find a match row or all blocks are processed. + // TODO: Consider batching the rows to improve performance. + while (true) { + TransferableBlock leftBlock = _leftChildOperator.nextBlock(); + if (leftBlock.isErrorBlock()) { + return leftBlock; + } + if (leftBlock.isSuccessfulEndOfStreamBlock()) { + assert _rightQueryStats != null; + MultiStageQueryStats leftQueryStats = leftBlock.getQueryStats(); + assert leftQueryStats != null; + _rightQueryStats.mergeInOrder(leftQueryStats, getOperatorType(), _statMap); + _rightQueryStats.getCurrentStats().concat(leftQueryStats.getCurrentStats()); + return TransferableBlockUtils.getEndOfStreamTransferableBlock(_rightQueryStats); + } + assert leftBlock.isDataBlock(); + List rows = new ArrayList<>(); + for (Object[] row : leftBlock.getContainer()) { + if (handleRowMatched(row)) { + rows.add(row); + } + } + if (!rows.isEmpty()) { + return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW); } } - return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW); } /** diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java index 95e6cf2d4fa..35e0ac85039 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java @@ -36,7 +36,7 @@ /** * This {@code SortedMailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serve it out from the - * {@link MultiStageOperator#getNextBlock()}()} API in a sorted manner. + * {@link #nextBlock()} API in a sorted manner. * * TODO: Once sorting on the {@code MailboxSendOperator} is available, modify this to use a k-way merge instead of * resorting via the PriorityQueue. 
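// A minimal k-way merge sketch for the TODO above, assuming every mailbox already yields
// rows sorted by the same rowComparator; Entry, sortedSources and emit() are hypothetical:
//
//   record Entry(Object[] row, Iterator<Object[]> source) {}
//   PriorityQueue<Entry> pq =
//       new PriorityQueue<>((a, b) -> rowComparator.compare(a.row(), b.row()));
//   for (Iterator<Object[]> source : sortedSources) {
//     if (source.hasNext()) {
//       pq.add(new Entry(source.next(), source));              // seed with each stream's head
//     }
//   }
//   while (!pq.isEmpty()) {
//     Entry e = pq.poll();
//     emit(e.row());                                           // rows emerge in global sorted order
//     if (e.source().hasNext()) {
//       pq.add(new Entry(e.source().next(), e.source()));      // refill from the same stream
//     }
//   }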
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java index 4272873a790..27001778d4c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java @@ -40,7 +40,6 @@ import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.WindowNode; import org.apache.pinot.query.runtime.blocks.TransferableBlock; -import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.utils.AggregationUtils; import org.apache.pinot.query.runtime.operator.utils.TypeUtils; import org.apache.pinot.query.runtime.operator.window.WindowFunction; @@ -244,7 +243,7 @@ private boolean isPartitionByOnlyQuery(int[] keys, List colla private TransferableBlock computeBlocks() throws ProcessingException { TransferableBlock block = _input.nextBlock(); - while (!TransferableBlockUtils.isEndOfStream(block)) { + while (block.isDataBlock()) { List container = block.getContainer(); int containerSize = container.size(); if (_numRows + containerSize > _maxRowsInWindowCache) { @@ -276,6 +275,7 @@ private TransferableBlock computeBlocks() if (block.isErrorBlock()) { return block; } + assert block.isSuccessfulEndOfStreamBlock(); _eosBlock = updateEosBlock(block, _statMap); ColumnDataType[] resultStoredTypes = _resultSchema.getStoredColumnDataTypes(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java index 03c5cc0e95e..1c7edcae6ca 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java @@ -118,7 +118,7 @@ public void testAggregateSingleInputBlock() { AggregateOperator operator = getOperator(resultSchema, aggCalls, filterArgs, groupKeys); // When: - List resultRows = operator.getNextBlock().getContainer(); + List resultRows = operator.nextBlock().getContainer(); // Then: assertEquals(resultRows.size(), 1); @@ -141,7 +141,7 @@ public void testAggregateMultipleInputBlocks() { AggregateOperator operator = getOperator(resultSchema, aggCalls, filterArgs, groupKeys); // When: - List resultRows = operator.getNextBlock().getContainer(); + List resultRows = operator.nextBlock().getContainer(); // Then: assertEquals(resultRows.size(), 1); @@ -168,7 +168,7 @@ public void testAggregateWithFilter() { AggregateOperator operator = getOperator(resultSchema, aggCalls, filterArgs, groupKeys); // When: - List resultRows = operator.getNextBlock().getContainer(); + List resultRows = operator.nextBlock().getContainer(); // Then: assertEquals(resultRows.size(), 1); @@ -188,7 +188,7 @@ public void testGroupByAggregateWithHashCollision() { DataSchema resultSchema = new DataSchema(new String[]{"group", "sum"}, new ColumnDataType[]{STRING, DOUBLE}); AggregateOperator operator = getOperator(resultSchema, aggCalls, filterArgs, groupKeys); - List resultRows = operator.getNextBlock().getContainer(); + List resultRows = operator.nextBlock().getContainer(); assertEquals(resultRows.size(), 2); if 
(resultRows.get(0)[0].equals("Aa")) { assertEquals(resultRows.get(0), new Object[]{"Aa", 1.0}); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java index 462cd2ddbfa..1fb29c79f07 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java @@ -63,7 +63,7 @@ public void shouldPropagateUpstreamErrorBlock() { ColumnDataType.BOOLEAN }); FilterOperator operator = getOperator(inputSchema, RexExpression.Literal.TRUE); - TransferableBlock block = operator.getNextBlock(); + TransferableBlock block = operator.nextBlock(); assertTrue(block.isErrorBlock()); assertTrue(block.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("filterError")); } @@ -75,7 +75,7 @@ public void shouldPropagateUpstreamEOS() { }); when(_input.nextBlock()).thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)); FilterOperator operator = getOperator(inputSchema, RexExpression.Literal.TRUE); - TransferableBlock block = operator.getNextBlock(); + TransferableBlock block = operator.nextBlock(); assertTrue(block.isEndOfStreamBlock()); } @@ -87,7 +87,7 @@ public void shouldHandleTrueBooleanLiteralFilter() { when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{0}, new Object[]{1})) .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)); FilterOperator operator = getOperator(inputSchema, RexExpression.Literal.TRUE); - List resultRows = operator.getNextBlock().getContainer(); + List resultRows = operator.nextBlock().getContainer(); assertEquals(resultRows.size(), 2); assertEquals(resultRows.get(0), new Object[]{0}); assertEquals(resultRows.get(1), new Object[]{1}); @@ -98,10 +98,10 @@ public void shouldHandleFalseBooleanLiteralFilter() { DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new ColumnDataType[]{ ColumnDataType.INT }); - when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2})); + when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2})) + .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)); FilterOperator operator = getOperator(inputSchema, RexExpression.Literal.FALSE); - List resultRows = operator.getNextBlock().getContainer(); - assertTrue(resultRows.isEmpty()); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock()); } @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "Filter operand must " @@ -134,7 +134,7 @@ public void shouldHandleBooleanInputRef() { }); when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, 1}, new Object[]{2, 0})); FilterOperator operator = getOperator(inputSchema, ref1); - List resultRows = operator.getNextBlock().getContainer(); + List resultRows = operator.nextBlock().getContainer(); assertEquals(resultRows.size(), 1); assertEquals(resultRows.get(0), new Object[]{1, 1}); } @@ -149,7 +149,7 @@ public void shouldHandleAndFilter() { RexExpression.FunctionCall andCall = new RexExpression.FunctionCall(ColumnDataType.BOOLEAN, SqlKind.AND.name(), List.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1))); FilterOperator operator = getOperator(inputSchema, andCall); - List resultRows = 
operator.getNextBlock().getContainer(); + List resultRows = operator.nextBlock().getContainer(); assertEquals(resultRows.size(), 1); assertEquals(resultRows.get(0), new Object[]{1, 1}); } @@ -164,7 +164,7 @@ public void shouldHandleOrFilter() { RexExpression.FunctionCall orCall = new RexExpression.FunctionCall(ColumnDataType.BOOLEAN, SqlKind.OR.name(), List.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1))); FilterOperator operator = getOperator(inputSchema, orCall); - List resultRows = operator.getNextBlock().getContainer(); + List resultRows = operator.nextBlock().getContainer(); assertEquals(resultRows.size(), 2); assertEquals(resultRows.get(0), new Object[]{1, 1}); assertEquals(resultRows.get(1), new Object[]{1, 0}); @@ -180,7 +180,7 @@ public void shouldHandleNotFilter() { RexExpression.FunctionCall notCall = new RexExpression.FunctionCall(ColumnDataType.BOOLEAN, SqlKind.NOT.name(), List.of(new RexExpression.InputRef(0))); FilterOperator operator = getOperator(inputSchema, notCall); - List resultRows = operator.getNextBlock().getContainer(); + List resultRows = operator.nextBlock().getContainer(); assertEquals(resultRows.size(), 1); assertEquals(resultRows.get(0)[0], 0); assertEquals(resultRows.get(0)[1], 0); @@ -197,7 +197,7 @@ public void shouldHandleGreaterThanFilter() { new RexExpression.FunctionCall(ColumnDataType.BOOLEAN, SqlKind.GREATER_THAN.name(), List.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1))); FilterOperator operator = getOperator(inputSchema, greaterThan); - List resultRows = operator.getNextBlock().getContainer(); + List resultRows = operator.nextBlock().getContainer(); assertEquals(resultRows.size(), 1); assertEquals(resultRows.get(0), new Object[]{3, 2}); } @@ -213,7 +213,7 @@ public void shouldHandleBooleanFunction() { new RexExpression.FunctionCall(ColumnDataType.BOOLEAN, SqlKind.STARTS_WITH.name(), List.of(new RexExpression.InputRef(0), new RexExpression.Literal(ColumnDataType.STRING, "star"))); FilterOperator operator = getOperator(inputSchema, startsWith); - List resultRows = operator.getNextBlock().getContainer(); + List resultRows = operator.nextBlock().getContainer(); assertEquals(resultRows.size(), 1); assertEquals(resultRows.get(0), new Object[]{"starTree"}); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java index e03a2f86f80..f204c04c98d 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java @@ -242,8 +242,7 @@ public void shouldPassRightTableEOS() { }); HashJoinOperator operator = getOperator(leftSchema, resultSchema, JoinRelType.INNER, List.of(0), List.of(0), List.of()); - List resultRows = operator.nextBlock().getContainer(); - assertTrue(resultRows.isEmpty()); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock()); } @Test diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java index 56108f8af40..cc92873c944 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java +++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.TimeoutException; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.WorkerMetadata; @@ -104,8 +105,7 @@ public void shouldSendErrorBlockWhenInputThrows() public void shouldNotSendErrorBlockWhenTimedOut() throws Exception { // Given: - TransferableBlock dataBlock = - OperatorTestUtil.block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{})); + TransferableBlock dataBlock = getDummyDataBlock(); when(_input.nextBlock()).thenReturn(dataBlock); doThrow(new TimeoutException()).when(_exchange).send(any()); @@ -141,10 +141,8 @@ public void shouldSendEosBlock() public void shouldSendDataBlock() throws Exception { // Given: - TransferableBlock dataBlock1 = - OperatorTestUtil.block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{})); - TransferableBlock dataBlock2 = - OperatorTestUtil.block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{})); + TransferableBlock dataBlock1 = getDummyDataBlock(); + TransferableBlock dataBlock2 = getDummyDataBlock(); TransferableBlock eosBlock = TransferableBlockUtils.getEndOfStreamTransferableBlock(MultiStageQueryStats.emptyStats(SENDER_STAGE_ID)); when(_input.nextBlock()).thenReturn(dataBlock1, dataBlock2, eosBlock); @@ -183,8 +181,7 @@ public void shouldSendDataBlock() public void shouldEarlyTerminateWhenUpstreamWhenIndicated() throws Exception { // Given: - TransferableBlock dataBlock = - OperatorTestUtil.block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{})); + TransferableBlock dataBlock = getDummyDataBlock(); when(_input.nextBlock()).thenReturn(dataBlock); doReturn(true).when(_exchange).send(any()); @@ -203,4 +200,9 @@ private MailboxSendOperator getOperator() { null); return new MailboxSendOperator(context, _input, statMap -> _exchange); } + + private static TransferableBlock getDummyDataBlock() { + return OperatorTestUtil.block(new DataSchema(new String[]{"intCol"}, new ColumnDataType[]{ColumnDataType.INT}), + new Object[]{1}); + } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java index cd3ca26b6cf..ec5d667254b 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java @@ -27,7 +27,6 @@ import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.common.datatable.StatMap; import org.apache.pinot.common.exception.QueryException; -import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.data.table.Key; @@ -270,8 +269,7 @@ public void testShouldThrowOnUnknownRankAggFunction() { } @Test - public void testRankDenseRankRankingFunctions() - throws ProcessingException { + public void testRankDenseRankRankingFunctions() { // Given: DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, STRING}); // Input should be in sorted order on the order by key as SortExchange will handle pre-sorting the data @@ -294,7 +292,7 @@ public void testRankDenseRankRankingFunctions() Integer.MIN_VALUE, 0); // When: - List resultRows = operator.getNextBlock().getContainer(); + List resultRows = operator.nextBlock().getContainer(); // Then: verifyResultRows(resultRows, keys, @@ -308,8 +306,7 @@ public void testRankDenseRankRankingFunctions() } @Test - public void testRowNumberRankingFunction() - throws ProcessingException { + public void testRowNumberRankingFunction() { // Given: DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, STRING}); // Input should be in sorted order on the order by key as SortExchange will handle pre-sorting the data @@ -330,7 +327,7 @@ public void testRowNumberRankingFunction() Integer.MIN_VALUE, 0); // When: - List resultRows = operator.getNextBlock().getContainer(); + List resultRows = operator.nextBlock().getContainer(); // Then: verifyResultRows(resultRows, keys, Map.of(1, List.of(new Object[]{1, "foo", 1L}), 2, @@ -340,8 +337,7 @@ public void testRowNumberRankingFunction() } @Test - public void testNonEmptyOrderByKeysNotMatchingPartitionByKeys() - throws ProcessingException { + public void testNonEmptyOrderByKeysNotMatchingPartitionByKeys() { // Given: DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, STRING}); // Input should be in sorted order on the order by key as SortExchange will handle pre-sorting the data @@ -360,7 +356,7 @@ public void testNonEmptyOrderByKeysNotMatchingPartitionByKeys() Integer.MIN_VALUE, Integer.MAX_VALUE); // When: - List resultRows = operator.getNextBlock().getContainer(); + List resultRows = operator.nextBlock().getContainer(); // Then: verifyResultRows(resultRows, keys, Map.of(1, List.of(new Object[]{1, "foo", 1.0}), 2, From f74e8564afc6635f6644907b62fb8daa57744e76 Mon Sep 17 00:00:00 2001 From: sullis Date: Mon, 15 Jul 2024 15:04:36 -0700 Subject: [PATCH 12/24] add assertion for S3PinotFS delete operation (#13607) --- .../apache/pinot/plugin/filesystem/S3PinotFSTest.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java index 339155643ae..226254e7694 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java @@ -256,7 +256,10 @@ public void testDeleteFile() } } - _s3PinotFS.delete(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileToDelete)), false); + boolean deleteResult = _s3PinotFS.delete( + URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileToDelete)), false); + + Assert.assertTrue(deleteResult); ListObjectsV2Response listObjectsV2Response = _s3Client.listObjectsV2(S3TestUtils.getListObjectRequest(BUCKET, "", true)); @@ -278,7 +281,10 @@ public void testDeleteFolder() createEmptyFile(folderName, fileName); } - _s3PinotFS.delete(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folderName)), true); + boolean deleteResult = _s3PinotFS.delete( + URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folderName)), true); + + Assert.assertTrue(deleteResult); ListObjectsV2Response 
listObjectsV2Response = _s3Client.listObjectsV2(S3TestUtils.getListObjectRequest(BUCKET, "", true)); From 147e05d7a8c3593fd418a84ecac10aee99b72a90 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" <17555551+Jackie-Jiang@users.noreply.github.com> Date: Mon, 15 Jul 2024 15:09:48 -0700 Subject: [PATCH 13/24] Fix null handling for window aggregate (#13611) --- .../operator/utils/AggregationUtils.java | 32 +++++++------------ .../aggregate/AggregateWindowFunction.java | 11 +++++-- .../test/resources/queries/NullHandling.json | 4 +++ 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AggregationUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AggregationUtils.java index ed24af5a3c1..ee2441fe707 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AggregationUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AggregationUtils.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.query.runtime.operator.utils; -import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -170,25 +169,18 @@ default Object init(@Nullable Object value, ColumnDataType dataType) { */ public static class Accumulator { //@formatter:off - public static final Map> MERGERS = - ImmutableMap.>builder() - .put("SUM", cdt -> AggregationUtils::mergeSum) - .put("$SUM", cdt -> AggregationUtils::mergeSum) - .put("$SUM0", cdt -> AggregationUtils::mergeSum) - .put("MIN", cdt -> AggregationUtils::mergeMin) - .put("$MIN", cdt -> AggregationUtils::mergeMin) - .put("$MIN0", cdt -> AggregationUtils::mergeMin) - .put("MAX", cdt -> AggregationUtils::mergeMax) - .put("$MAX", cdt -> AggregationUtils::mergeMax) - .put("$MAX0", cdt -> AggregationUtils::mergeMax) - .put("COUNT", cdt -> new AggregationUtils.MergeCounts()) - .put("BOOL_AND", cdt -> AggregationUtils::mergeBoolAnd) - .put("$BOOL_AND", cdt -> AggregationUtils::mergeBoolAnd) - .put("$BOOL_AND0", cdt -> AggregationUtils::mergeBoolAnd) - .put("BOOL_OR", cdt -> AggregationUtils::mergeBoolOr) - .put("$BOOL_OR", cdt -> AggregationUtils::mergeBoolOr) - .put("$BOOL_OR0", cdt -> AggregationUtils::mergeBoolOr) - .build(); + public static final Map> MERGERS = Map.of( + "SUM", cdt -> AggregationUtils::mergeSum, + // NOTE: Keep both 'SUM0' and '$SUM0' for backward compatibility where 'SUM0' is SqlKind and '$SUM0' is function + // name. 
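+        // Each entry maps an aggregate function name to a factory that, given the column data type, returns the
+        // Merger used to combine values for that function.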
+ "SUM0", cdt -> AggregationUtils::mergeSum, + "$SUM0", cdt -> AggregationUtils::mergeSum, + "MIN", cdt -> AggregationUtils::mergeMin, + "MAX", cdt -> AggregationUtils::mergeMax, + "COUNT", cdt -> new AggregationUtils.MergeCounts(), + "BOOL_AND", cdt -> AggregationUtils::mergeBoolAnd, + "BOOL_OR", cdt -> AggregationUtils::mergeBoolOr + ); //@formatter:on protected final int _inputRef; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/AggregateWindowFunction.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/AggregateWindowFunction.java index fbf0afed773..6763542bd0d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/AggregateWindowFunction.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/AggregateWindowFunction.java @@ -18,26 +18,33 @@ */ package org.apache.pinot.query.runtime.operator.window.aggregate; +import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import org.apache.calcite.rel.RelFieldCollation; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.data.table.Key; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.operator.utils.AggregationUtils; +import org.apache.pinot.query.runtime.operator.utils.AggregationUtils.Merger; import org.apache.pinot.query.runtime.operator.window.WindowFunction; public class AggregateWindowFunction extends WindowFunction { - private final AggregationUtils.Merger _merger; + private final Merger _merger; public AggregateWindowFunction(RexExpression.FunctionCall aggCall, DataSchema inputSchema, List collations, boolean partitionByOnly) { super(aggCall, inputSchema, collations, partitionByOnly); - _merger = AggregationUtils.Accumulator.MERGERS.get(aggCall.getFunctionName()).apply(_dataType); + String functionName = aggCall.getFunctionName(); + Function mergerCreator = AggregationUtils.Accumulator.MERGERS.get(functionName); + Preconditions.checkArgument(mergerCreator != null, "Unsupported aggregate function: %s", functionName); + _merger = mergerCreator.apply(_dataType); } @Override diff --git a/pinot-query-runtime/src/test/resources/queries/NullHandling.json b/pinot-query-runtime/src/test/resources/queries/NullHandling.json index ee15c88f345..95e6b6c0c50 100644 --- a/pinot-query-runtime/src/test/resources/queries/NullHandling.json +++ b/pinot-query-runtime/src/test/resources/queries/NullHandling.json @@ -309,6 +309,10 @@ "sql": "SET enableNullHandling=true; SELECT strCol1, intCol1, nIntCol1, nnIntCol1, strCol2, nStrCol2, nnStrCol2 FROM {tbl1} WHERE nStrCol2 IS NULL AND nIntCol1 IS NOT NULL", "h2Sql": "SELECT strCol1, intCol1, nIntCol1, nnIntCol1, strCol2, nStrCol2, 'null' FROM {tbl1} WHERE nStrCol2 IS NULL AND nIntCol1 IS NOT NULL" }, + { + "description": "window function with NULL handling", + "sql": "SET enableNullHandling=true; SELECT SUM(intCol1) OVER() FROM {tbl1}" + }, { "description": "Leaf stages should not return nulls", From 3424a51be9e345bbaab45994d9a2970dae0d005d Mon Sep 17 00:00:00 2001 From: sullis Date: Mon, 15 Jul 2024 16:10:47 -0700 Subject: [PATCH 14/24] add unit test for S3PinotFS move operation (#13608) --- 
.../plugin/filesystem/S3PinotFSTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java index 226254e7694..a41391034d6 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java @@ -418,6 +418,37 @@ public void testMkdir() Assert.assertTrue(headObjectResponse.sdkHttpResponse().isSuccessful()); } + @Test + public void testMoveFile() + throws Exception { + + String fileName = "file-to-move"; + int fileSize = 5000; + + File file = new File(TEMP_FILE, fileName); + + try { + createDummyFile(file, fileSize); + URI sourceUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)); + + _s3PinotFS.copyFromLocalFile(file, sourceUri); + + URI targetUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, "move-target")); + + boolean moveResult = _s3PinotFS.move(sourceUri, targetUri, false); + Assert.assertTrue(moveResult); + + Assert.assertFalse(_s3PinotFS.exists(sourceUri)); + Assert.assertTrue(_s3PinotFS.exists(targetUri)); + + HeadObjectResponse headObjectResponse = + _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, "move-target")); + Assert.assertEquals(headObjectResponse.contentLength(), fileSize); + } finally { + FileUtils.deleteQuietly(file); + } + } + private static void createDummyFile(File file, int size) throws IOException { FileUtils.deleteQuietly(file); From 3015b2ec7783a6a14e6ed51e6b95bc8197dbffcb Mon Sep 17 00:00:00 2001 From: Aadil Khalifa <49197409+aadilkhalifa@users.noreply.github.com> Date: Tue, 16 Jul 2024 04:50:48 +0530 Subject: [PATCH 15/24] Added GENERATE_ARRAY function support (#12293) --- .../function/scalar/ArrayFunctions.java | 44 ++ .../FunctionDefinitionRegistryTest.java | 4 +- .../GenerateArrayTransformFunction.java | 410 ++++++++++++++++++ .../function/TransformFunctionFactory.java | 7 + .../GenerateArrayTransformFunctionTest.java | 159 +++++++ .../integration/tests/custom/ArrayTest.java | 199 +++++++++ 6 files changed, 821 insertions(+), 2 deletions(-) create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GenerateArrayTransformFunction.java create mode 100644 pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GenerateArrayTransformFunctionTest.java diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java index 52997d09269..13f77b2c5a3 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java @@ -305,4 +305,48 @@ public static Object arrayValueConstructor(Object... 
arr) { } return arr; } + + @ScalarFunction + public static int[] generateIntArray(int start, int end, int inc) { + int size = (end - start) / inc + 1; + int[] arr = new int[size]; + + for (int i = 0, value = start; i < size; i++, value += inc) { + arr[i] = value; + } + return arr; + } + + @ScalarFunction + public static long[] generateLongArray(long start, long end, long inc) { + int size = (int) ((end - start) / inc + 1); + long[] arr = new long[size]; + + for (int i = 0; i < size; i++, start += inc) { + arr[i] = start; + } + return arr; + } + + @ScalarFunction + public static float[] generateFloatArray(float start, float end, float inc) { + int size = (int) ((end - start) / inc + 1); + float[] arr = new float[size]; + + for (int i = 0; i < size; i++, start += inc) { + arr[i] = start; + } + return arr; + } + + @ScalarFunction + public static double[] generateDoubleArray(double start, double end, double inc) { + int size = (int) ((end - start) / inc + 1); + double[] arr = new double[size]; + + for (int i = 0; i < size; i++, start += inc) { + arr[i] = start; + } + return arr; + } } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java b/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java index d2771bc626f..600016fd8f9 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java @@ -42,8 +42,8 @@ public class FunctionDefinitionRegistryTest { private static final List IGNORED_FUNCTION_NAMES = ImmutableList.of( // Geo functions are defined in pinot-core "geotoh3", - // ArrayToMV and ArrayValueConstructor are placeholder functions without implementation - "arraytomv", "arrayvalueconstructor", + // ArrayToMV, ArrayValueConstructor and GenerateArray are placeholder functions without implementation + "arraytomv", "arrayvalueconstructor", "generatearray", // Scalar function "scalar", // Functions without scalar function counterpart as of now diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GenerateArrayTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GenerateArrayTransformFunction.java new file mode 100644 index 00000000000..44632867ac3 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GenerateArrayTransformFunction.java @@ -0,0 +1,410 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.transform.function; + +import com.google.common.base.Preconditions; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.operator.ColumnContext; +import org.apache.pinot.core.operator.blocks.ValueBlock; +import org.apache.pinot.core.operator.transform.TransformResultMetadata; +import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.roaringbitmap.RoaringBitmap; + +public class GenerateArrayTransformFunction implements TransformFunction { + public static final String FUNCTION_NAME = "generateArray"; + + private final DataType _dataType; + + private final int[] _intArrayLiteral; + private final long[] _longArrayLiteral; + private final float[] _floatArrayLiteral; + private final double[] _doubleArrayLiteral; + private int[][] _intArrayResult; + private long[][] _longArrayResult; + private float[][] _floatArrayResult; + private double[][] _doubleArrayResult; + + public GenerateArrayTransformFunction(List literalContexts) { + Preconditions.checkNotNull(literalContexts); + if (literalContexts.isEmpty()) { + _dataType = DataType.UNKNOWN; + _intArrayLiteral = new int[0]; + _longArrayLiteral = new long[0]; + _floatArrayLiteral = new float[0]; + _doubleArrayLiteral = new double[0]; + return; + } + Preconditions.checkState(literalContexts.size() == 2 || literalContexts.size() == 3, + "GenerateArrayTransformFunction takes only 2 or 3 arguments, found: %s", literalContexts.size()); + for (ExpressionContext literalContext : literalContexts) { + Preconditions.checkState(literalContext.getType() == ExpressionContext.Type.LITERAL, + "GenerateArrayTransformFunction only takes literals as arguments, found: %s", literalContext); + } + // Get the type of the first member in the literalContext and generate an array + _dataType = literalContexts.get(0).getLiteral().getType(); + + switch (_dataType) { + case INT: + int startInt = literalContexts.get(0).getLiteral().getIntValue(); + int endInt = literalContexts.get(1).getLiteral().getIntValue(); + int incInt; + if (literalContexts.size() == 3) { + incInt = literalContexts.get(2).getLiteral().getIntValue(); + } else { + incInt = 1; + } + Preconditions.checkState((endInt > startInt && incInt > 0) || (startInt > endInt + && incInt < 0), "Incorrect Step value."); + int size = (endInt - startInt) / incInt + 1; + _intArrayLiteral = new int[size]; + for (int i = 0, value = startInt; i < size; i++, value += incInt) { + _intArrayLiteral[i] = value; + } + _longArrayLiteral = null; + _floatArrayLiteral = null; + _doubleArrayLiteral = null; + break; + case LONG: + long startLong = Long.parseLong(literalContexts.get(0).getLiteral().getStringValue()); + long endLong = Long.parseLong(literalContexts.get(1).getLiteral().getStringValue()); + long incLong; + if (literalContexts.size() == 3) { + incLong = Long.parseLong(literalContexts.get(2).getLiteral().getStringValue()); + } else { + incLong = 1L; + } + Preconditions.checkState((endLong > startLong && incLong > 0) || (startLong > endLong + && incLong < 0), "Incorrect Step value."); + size = (int) ((endLong - startLong) / incLong + 1); + _longArrayLiteral = new long[size]; + for (int i = 0; i < size; i++, startLong += incLong) { + _longArrayLiteral[i] = startLong; + } + _intArrayLiteral = null; + _floatArrayLiteral = null; + 
_doubleArrayLiteral = null; + break; + case FLOAT: + float startFloat = Float.parseFloat(literalContexts.get(0).getLiteral().getStringValue()); + float endFloat = Float.parseFloat(literalContexts.get(1).getLiteral().getStringValue()); + float incFloat; + if (literalContexts.size() == 3) { + incFloat = Float.parseFloat(literalContexts.get(2).getLiteral().getStringValue()); + } else { + incFloat = 1; + } + Preconditions.checkState((endFloat > startFloat && incFloat > 0) || (startFloat > endFloat + && incFloat < 0), "Incorrect Step value."); + size = (int) ((endFloat - startFloat) / incFloat + 1); + _floatArrayLiteral = new float[size]; + for (int i = 0; i < size; i++, startFloat += incFloat) { + _floatArrayLiteral[i] = startFloat; + } + _intArrayLiteral = null; + _longArrayLiteral = null; + _doubleArrayLiteral = null; + break; + case DOUBLE: + double startDouble = Double.parseDouble(literalContexts.get(0).getLiteral().getStringValue()); + double endDouble = Double.parseDouble(literalContexts.get(1).getLiteral().getStringValue()); + double incDouble; + if (literalContexts.size() == 3) { + incDouble = Double.parseDouble(literalContexts.get(2).getLiteral().getStringValue()); + } else { + incDouble = 1.0; + } + Preconditions.checkState((endDouble > startDouble && incDouble > 0) || (startDouble > endDouble + && incDouble < 0), "Incorrect Step value."); + size = (int) ((endDouble - startDouble) / incDouble + 1); + _doubleArrayLiteral = new double[size]; + for (int i = 0; i < size; i++, startDouble += incDouble) { + _doubleArrayLiteral[i] = startDouble; + } + _intArrayLiteral = null; + _longArrayLiteral = null; + _floatArrayLiteral = null; + break; + default: + throw new IllegalStateException( + "Illegal data type for GenerateArrayTransformFunction: " + _dataType + ", literal contexts: " + + Arrays.toString(literalContexts.toArray())); + } + } + + public int[] getIntArrayLiteral() { + return _intArrayLiteral; + } + + public long[] getLongArrayLiteral() { + return _longArrayLiteral; + } + + public float[] getFloatArrayLiteral() { + return _floatArrayLiteral; + } + + public double[] getDoubleArrayLiteral() { + return _doubleArrayLiteral; + } + + @Override + public String getName() { + return FUNCTION_NAME; + } + + @Override + public void init(List arguments, Map columnContextMap) { + if (arguments.size() < 2) { + throw new IllegalArgumentException("At least 2 arguments are required for generateArray function"); + } + } + + @Override + public TransformResultMetadata getResultMetadata() { + return new TransformResultMetadata(_dataType, false, false); + } + + @Nullable + @Override + public Dictionary getDictionary() { + return null; + } + + @Override + public int[] transformToDictIdsSV(ValueBlock valueBlock) { + throw new UnsupportedOperationException(); + } + + @Override + public int[][] transformToDictIdsMV(ValueBlock valueBlock) { + throw new UnsupportedOperationException(); + } + + @Override + public int[] transformToIntValuesSV(ValueBlock valueBlock) { + throw new UnsupportedOperationException(); + } + + @Override + public long[] transformToLongValuesSV(ValueBlock valueBlock) { + throw new UnsupportedOperationException(); + } + + @Override + public float[] transformToFloatValuesSV(ValueBlock valueBlock) { + throw new UnsupportedOperationException(); + } + + @Override + public double[] transformToDoubleValuesSV(ValueBlock valueBlock) { + throw new UnsupportedOperationException(); + } + + @Override + public BigDecimal[] transformToBigDecimalValuesSV(ValueBlock valueBlock) { + throw new 
UnsupportedOperationException(); + } + + @Override + public String[] transformToStringValuesSV(ValueBlock valueBlock) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[][] transformToBytesValuesSV(ValueBlock valueBlock) { + throw new UnsupportedOperationException(); + } + + @Override + public int[][] transformToIntValuesMV(ValueBlock valueBlock) { + int numDocs = valueBlock.getNumDocs(); + int[][] intArrayResult = _intArrayResult; + if (intArrayResult == null || intArrayResult.length < numDocs) { + intArrayResult = new int[numDocs][]; + int[] intArrayLiteral = _intArrayLiteral; + if (intArrayLiteral == null) { + switch (_dataType) { + case LONG: + intArrayLiteral = new int[_longArrayLiteral.length]; + for (int i = 0; i < _longArrayLiteral.length; i++) { + intArrayLiteral[i] = (int) _longArrayLiteral[i]; + } + break; + case FLOAT: + intArrayLiteral = new int[_floatArrayLiteral.length]; + for (int i = 0; i < _floatArrayLiteral.length; i++) { + intArrayLiteral[i] = (int) _floatArrayLiteral[i]; + } + break; + case DOUBLE: + intArrayLiteral = new int[_doubleArrayLiteral.length]; + for (int i = 0; i < _doubleArrayLiteral.length; i++) { + intArrayLiteral[i] = (int) _doubleArrayLiteral[i]; + } + break; + default: + throw new IllegalStateException("Unable to convert data type: " + _dataType + " to int array"); + } + } + Arrays.fill(intArrayResult, intArrayLiteral); + _intArrayResult = intArrayResult; + } + return intArrayResult; + } + + @Override + public long[][] transformToLongValuesMV(ValueBlock valueBlock) { + int numDocs = valueBlock.getNumDocs(); + long[][] longArrayResult = _longArrayResult; + if (longArrayResult == null || longArrayResult.length < numDocs) { + longArrayResult = new long[numDocs][]; + long[] longArrayLiteral = _longArrayLiteral; + if (longArrayLiteral == null) { + switch (_dataType) { + case INT: + longArrayLiteral = new long[_intArrayLiteral.length]; + for (int i = 0; i < _intArrayLiteral.length; i++) { + longArrayLiteral[i] = _intArrayLiteral[i]; + } + break; + case FLOAT: + longArrayLiteral = new long[_floatArrayLiteral.length]; + for (int i = 0; i < _floatArrayLiteral.length; i++) { + longArrayLiteral[i] = (long) _floatArrayLiteral[i]; + } + break; + case DOUBLE: + longArrayLiteral = new long[_doubleArrayLiteral.length]; + for (int i = 0; i < _doubleArrayLiteral.length; i++) { + longArrayLiteral[i] = (long) _doubleArrayLiteral[i]; + } + break; + default: + throw new IllegalStateException("Unable to convert data type: " + _dataType + " to long array"); + } + } + Arrays.fill(longArrayResult, longArrayLiteral); + _longArrayResult = longArrayResult; + } + return longArrayResult; + } + + @Override + public float[][] transformToFloatValuesMV(ValueBlock valueBlock) { + int numDocs = valueBlock.getNumDocs(); + float[][] floatArrayResult = _floatArrayResult; + if (floatArrayResult == null || floatArrayResult.length < numDocs) { + floatArrayResult = new float[numDocs][]; + float[] floatArrayLiteral = _floatArrayLiteral; + if (floatArrayLiteral == null) { + switch (_dataType) { + case INT: + floatArrayLiteral = new float[_intArrayLiteral.length]; + for (int i = 0; i < _intArrayLiteral.length; i++) { + floatArrayLiteral[i] = _intArrayLiteral[i]; + } + break; + case LONG: + floatArrayLiteral = new float[_longArrayLiteral.length]; + for (int i = 0; i < _longArrayLiteral.length; i++) { + floatArrayLiteral[i] = _longArrayLiteral[i]; + } + break; + case DOUBLE: + floatArrayLiteral = new float[_doubleArrayLiteral.length]; + for (int i = 0; i < 
_doubleArrayLiteral.length; i++) { + floatArrayLiteral[i] = (float) _doubleArrayLiteral[i]; + } + break; + default: + throw new IllegalStateException("Unable to convert data type: " + _dataType + " to float array"); + } + } + Arrays.fill(floatArrayResult, floatArrayLiteral); + _floatArrayResult = floatArrayResult; + } + return floatArrayResult; + } + + @Override + public double[][] transformToDoubleValuesMV(ValueBlock valueBlock) { + int numDocs = valueBlock.getNumDocs(); + double[][] doubleArrayResult = _doubleArrayResult; + if (doubleArrayResult == null || doubleArrayResult.length < numDocs) { + doubleArrayResult = new double[numDocs][]; + double[] doubleArrayLiteral = _doubleArrayLiteral; + if (doubleArrayLiteral == null) { + switch (_dataType) { + case INT: + doubleArrayLiteral = new double[_intArrayLiteral.length]; + for (int i = 0; i < _intArrayLiteral.length; i++) { + doubleArrayLiteral[i] = _intArrayLiteral[i]; + } + break; + case LONG: + doubleArrayLiteral = new double[_longArrayLiteral.length]; + for (int i = 0; i < _longArrayLiteral.length; i++) { + doubleArrayLiteral[i] = _longArrayLiteral[i]; + } + break; + case FLOAT: + doubleArrayLiteral = new double[_floatArrayLiteral.length]; + for (int i = 0; i < _floatArrayLiteral.length; i++) { + doubleArrayLiteral[i] = _floatArrayLiteral[i]; + } + break; + default: + throw new IllegalStateException("Unable to convert data type: " + _dataType + " to double array"); + } + } + Arrays.fill(doubleArrayResult, doubleArrayLiteral); + _doubleArrayResult = doubleArrayResult; + } + return doubleArrayResult; + } + + @Override + public String[][] transformToStringValuesMV(ValueBlock valueBlock) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[][][] transformToBytesValuesMV(ValueBlock valueBlock) { + throw new UnsupportedOperationException(); + } + + @Nullable + @Override + public RoaringBitmap getNullBitmap(ValueBlock block) { + // Treat all unknown type values as null regardless of the value. + if (_dataType != DataType.UNKNOWN) { + return null; + } + int length = block.getNumDocs(); + RoaringBitmap bitmap = new RoaringBitmap(); + bitmap.add(0L, length); + return bitmap; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java index de7668ca26a..7643959d113 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java @@ -299,6 +299,13 @@ public static TransformFunction get(ExpressionContext expression, Map transformFunctionClass = TRANSFORM_FUNCTION_MAP.get(functionName); if (transformFunctionClass != null) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GenerateArrayTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GenerateArrayTransformFunctionTest.java new file mode 100644 index 00000000000..72bd48f7ff1 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GenerateArrayTransformFunctionTest.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.transform.function; + +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.when; + +public class GenerateArrayTransformFunctionTest { + private static final int NUM_DOCS = 100; + private AutoCloseable _mocks; + + @Mock + private ProjectionBlock _projectionBlock; + + @BeforeMethod + public void setUp() { + _mocks = MockitoAnnotations.openMocks(this); + when(_projectionBlock.getNumDocs()).thenReturn(NUM_DOCS); + } + + @AfterMethod + public void tearDown() + throws Exception { + _mocks.close(); + } + @Test + public void testGenerateIntArrayTransformFunction() { + List arrayExpressions = new ArrayList<>(); + int[] inputArray = {0, 10, 1}; + for (int j : inputArray) { + arrayExpressions.add(ExpressionContext.forLiteral(DataType.INT, j)); + } + + GenerateArrayTransformFunction intArray = new GenerateArrayTransformFunction(arrayExpressions); + Assert.assertEquals(intArray.getResultMetadata().getDataType(), DataType.INT); + Assert.assertEquals(intArray.getIntArrayLiteral(), new int[]{ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 + }); + } + + @Test + public void testGenerateLongArrayTransformFunction() { + List arrayExpressions = new ArrayList<>(); + int[] inputArray = {0, 10, 1}; + for (int j : inputArray) { + arrayExpressions.add(ExpressionContext.forLiteral(DataType.LONG, (long) j)); + } + + GenerateArrayTransformFunction longArray = new GenerateArrayTransformFunction(arrayExpressions); + Assert.assertEquals(longArray.getResultMetadata().getDataType(), DataType.LONG); + Assert.assertEquals(longArray.getLongArrayLiteral(), new long[]{ + 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L + }); + } + + @Test + public void testGenerateFloatArrayTransformFunction() { + List arrayExpressions = new ArrayList<>(); + int[] inputArray = {0, 10, 1}; + for (int j : inputArray) { + arrayExpressions.add(ExpressionContext.forLiteral(DataType.FLOAT, (float) j)); + } + + GenerateArrayTransformFunction floatArray = new GenerateArrayTransformFunction(arrayExpressions); + Assert.assertEquals(floatArray.getResultMetadata().getDataType(), DataType.FLOAT); + Assert.assertEquals(floatArray.getFloatArrayLiteral(), new float[]{ + 0f, 1f, 2f, 3f, 4f, 5f, 6f, 7f, 8f, 9f, 10f + }); + } + + @Test + public void testGenerateDoubleArrayTransformFunction() { + List arrayExpressions = new ArrayList<>(); + int[] inputArray = {0, 10, 1}; + for (int j : inputArray) { + arrayExpressions.add(ExpressionContext.forLiteral(DataType.DOUBLE, (double) j)); + } + + GenerateArrayTransformFunction 
doubleArray = new GenerateArrayTransformFunction(arrayExpressions); + Assert.assertEquals(doubleArray.getResultMetadata().getDataType(), DataType.DOUBLE); + Assert.assertEquals(doubleArray.getDoubleArrayLiteral(), new double[]{ + 0d, 1d, 2d, 3d, 4d, 5d, 6d, 7d, 8d, 9d, 10d + }); + } + @Test + public void testGenerateEmptyArrayTransformFunction() { + List arrayExpressions = new ArrayList<>(); + GenerateArrayTransformFunction emptyLiteral = new GenerateArrayTransformFunction(arrayExpressions); + Assert.assertEquals(emptyLiteral.getIntArrayLiteral(), new int[0]); + Assert.assertEquals(emptyLiteral.getLongArrayLiteral(), new long[0]); + Assert.assertEquals(emptyLiteral.getFloatArrayLiteral(), new float[0]); + Assert.assertEquals(emptyLiteral.getDoubleArrayLiteral(), new double[0]); + + int[][] ints = emptyLiteral.transformToIntValuesMV(_projectionBlock); + Assert.assertEquals(ints.length, NUM_DOCS); + for (int i = 0; i < NUM_DOCS; i++) { + Assert.assertEquals(ints[i].length, 0); + } + + long[][] longs = emptyLiteral.transformToLongValuesMV(_projectionBlock); + Assert.assertEquals(longs.length, NUM_DOCS); + for (int i = 0; i < NUM_DOCS; i++) { + Assert.assertEquals(longs[i].length, 0); + } + + float[][] floats = emptyLiteral.transformToFloatValuesMV(_projectionBlock); + Assert.assertEquals(floats.length, NUM_DOCS); + for (int i = 0; i < NUM_DOCS; i++) { + Assert.assertEquals(floats[i].length, 0); + } + + double[][] doubles = emptyLiteral.transformToDoubleValuesMV(_projectionBlock); + Assert.assertEquals(doubles.length, NUM_DOCS); + for (int i = 0; i < NUM_DOCS; i++) { + Assert.assertEquals(doubles[i].length, 0); + } + } + @Test + public void testGenerateIntArrayTransformFunctionWithIncorrectStepValue() { + List arrayExpressions = new ArrayList<>(); + int[] inputArray = {0, 10, -1}; + for (int j : inputArray) { + arrayExpressions.add(ExpressionContext.forLiteral(DataType.INT, j)); + } + + try { + GenerateArrayTransformFunction intArray = new GenerateArrayTransformFunction(arrayExpressions); + Assert.fail(); + } catch (IllegalStateException ignored) { + } + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java index cc9d6eb4203..cf72b636d3c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java @@ -581,6 +581,205 @@ public void testStringArrayLiteral(boolean useMultiStageQueryEngine) } } + @Test(dataProvider = "useV1QueryEngine") + public void testGenerateIntArray(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "GENERATE_ARRAY(1, 3, 1) " + + "FROM %s LIMIT 1", getTableName()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 1); + JsonNode row = rows.get(0); + assertEquals(row.size(), 1); + assertEquals(row.get(0).size(), 3); + assertEquals(row.get(0).get(0).asInt(), 1); + assertEquals(row.get(0).get(1).asInt(), 2); + assertEquals(row.get(0).get(2).asInt(), 3); + } + + @Test(dataProvider = "useV1QueryEngine") + public void testGenerateIntArrayWithoutStepValue(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + 
String.format("SELECT " + + "GENERATE_ARRAY(1, 3) " + + "FROM %s LIMIT 1", getTableName()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 1); + JsonNode row = rows.get(0); + assertEquals(row.size(), 1); + assertEquals(row.get(0).size(), 3); + assertEquals(row.get(0).get(0).asInt(), 1); + assertEquals(row.get(0).get(1).asInt(), 2); + assertEquals(row.get(0).get(2).asInt(), 3); + } + + @Test(dataProvider = "useV1QueryEngine") + public void testGenerateIntArrayWithIncorrectStepValue(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "GENERATE_ARRAY(1, 3, -1) " + + "FROM %s LIMIT 1", getTableName()); + JsonNode jsonNode = postQuery(query); + assertEquals(jsonNode.get("exceptions").size(), 1); + } + + @Test(dataProvider = "useV1QueryEngine") + public void testGenerateLongArray(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "GENERATE_ARRAY(2147483648, 2147483650, 2) " + + "FROM %s LIMIT 1", getTableName()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 1); + JsonNode row = rows.get(0); + assertEquals(row.size(), 1); + assertEquals(row.get(0).size(), 2); + assertEquals(row.get(0).get(0).asLong(), 2147483648L); + assertEquals(row.get(0).get(1).asLong(), 2147483650L); + } + + @Test(dataProvider = "useV1QueryEngine") + public void testGenerateLongArrayWithoutStepValue(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "GENERATE_ARRAY(2147483648, 2147483650) " + + "FROM %s LIMIT 1", getTableName()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 1); + JsonNode row = rows.get(0); + assertEquals(row.size(), 1); + assertEquals(row.get(0).size(), 3); + assertEquals(row.get(0).get(0).asLong(), 2147483648L); + assertEquals(row.get(0).get(1).asLong(), 2147483649L); + assertEquals(row.get(0).get(2).asLong(), 2147483650L); + } + + @Test(dataProvider = "useV1QueryEngine") + public void testGenerateLongArrayWithIncorrectStepValue(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "GENERATE_ARRAY(2147483648, 2147483650, -1) " + + "FROM %s LIMIT 1", getTableName()); + JsonNode jsonNode = postQuery(query); + assertEquals(jsonNode.get("exceptions").size(), 1); + } + + @Test(dataProvider = "useV1QueryEngine") + public void testGenerateFloatArray(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "GENERATE_ARRAY(0.1, 0.3, 0.1) " + + "FROM %s LIMIT 1", getTableName()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 1); + JsonNode row = rows.get(0); + assertEquals(row.size(), 1); + assertEquals(row.get(0).size(), 3); + assertEquals(row.get(0).get(0).asDouble(), 0.1); + assertEquals(row.get(0).get(1).asDouble(), 0.1 + 0.1 * 1); + assertEquals(row.get(0).get(2).asDouble(), 0.1 + 0.1 * 2); + } + + @Test(dataProvider = "useV1QueryEngine") + public void 
testGenerateFloatArrayWithoutStepValue(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "GENERATE_ARRAY(0.3, 3.1) " + + "FROM %s LIMIT 1", getTableName()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 1); + JsonNode row = rows.get(0); + assertEquals(row.size(), 1); + assertEquals(row.get(0).size(), 3); + assertEquals(row.get(0).get(0).asDouble(), 0.3); + assertEquals(row.get(0).get(1).asDouble(), 1.3); + assertEquals(row.get(0).get(2).asDouble(), 2.3); + } + + @Test(dataProvider = "useV1QueryEngine") + public void testGenerateFloatArrayWithIncorrectStepValue(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "GENERATE_ARRAY(0.3, 0.1, 1.1) " + + "FROM %s LIMIT 1", getTableName()); + JsonNode jsonNode = postQuery(query); + assertEquals(jsonNode.get("exceptions").size(), 1); + } + + @Test(dataProvider = "useV1QueryEngine") + public void testGenerateDoubleArray(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "GENERATE_ARRAY(CAST(0.1 AS DOUBLE), CAST(0.3 AS DOUBLE), CAST(0.1 AS DOUBLE)) " + + "FROM %s LIMIT 1", getTableName()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 1); + JsonNode row = rows.get(0); + assertEquals(row.size(), 1); + assertEquals(row.get(0).size(), 3); + assertEquals(row.get(0).get(0).asDouble(), 0.1); + assertEquals(row.get(0).get(1).asDouble(), 0.1 + 0.1 * 1); + assertEquals(row.get(0).get(2).asDouble(), 0.1 + 0.1 * 2); + } + + @Test(dataProvider = "useV1QueryEngine") + public void testGenerateDoubleArrayWithoutStepValue(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "GENERATE_ARRAY(CAST(0.3 AS DOUBLE), CAST(3.1 AS DOUBLE)) " + + "FROM %s LIMIT 1", getTableName()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 1); + JsonNode row = rows.get(0); + assertEquals(row.size(), 1); + assertEquals(row.get(0).size(), 3); + assertEquals(row.get(0).get(0).asDouble(), 0.3); + assertEquals(row.get(0).get(1).asDouble(), 1.3); + assertEquals(row.get(0).get(2).asDouble(), 2.3); + } + + @Test(dataProvider = "useV1QueryEngine") + public void testGenerateDoubleArrayWithIncorrectStepValue(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "GENERATE_ARRAY(CAST(0.3 AS DOUBLE), CAST(0.1 AS DOUBLE), CAST(1.1 AS DOUBLE)) " + + "FROM %s LIMIT 1", getTableName()); + JsonNode jsonNode = postQuery(query); + assertEquals(jsonNode.get("exceptions").size(), 1); + } + @Override public String getTableName() { return DEFAULT_TABLE_NAME; From 3db93ccc210991ad91b6a5306b3b610d8f5e0507 Mon Sep 17 00:00:00 2001 From: Xiaobing <61892277+klsince@users.noreply.github.com> Date: Mon, 15 Jul 2024 18:29:05 -0700 Subject: [PATCH 16/24] missed one util method that should derive partition id from uploaded segment name (#13612) * add one util method to derive partition id from uploaded segment names to avoid missing the segment name checks * 
add tests --- .../pinot/common/utils/SegmentUtils.java | 32 +++++++++++-------- .../pinot/common/utils/SegmentUtilsTest.java | 9 ++++++ 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java index 8b89a2b1a5b..26d231651de 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java @@ -39,17 +39,10 @@ private SegmentUtils() { @Nullable public static Integer getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName, HelixManager helixManager, @Nullable String partitionColumn) { - // A fast path if the segmentName is an LLC segment name: get the partition id from the name directly - LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); - if (llcSegmentName != null) { - return llcSegmentName.getPartitionGroupId(); + Integer partitionId = getPartitionIdFromRealtimeSegmentName(segmentName); + if (partitionId != null) { + return partitionId; } - - UploadedRealtimeSegmentName uploadedRealtimeSegmentName = UploadedRealtimeSegmentName.of(segmentName); - if (uploadedRealtimeSegmentName != null) { - return uploadedRealtimeSegmentName.getPartitionId(); - } - // Otherwise, retrieve the partition id from the segment zk metadata. SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), realtimeTableName, segmentName); @@ -61,13 +54,26 @@ public static Integer getRealtimeSegmentPartitionId(String segmentName, String r @Nullable public static Integer getRealtimeSegmentPartitionId(String segmentName, SegmentZKMetadata segmentZKMetadata, @Nullable String partitionColumn) { - // A fast path if the segmentName is an LLC segment name: get the partition id from the name directly + Integer partitionId = getPartitionIdFromRealtimeSegmentName(segmentName); + if (partitionId != null) { + return partitionId; + } + // Otherwise, retrieve the partition id from the segment zk metadata. + return getRealtimeSegmentPartitionId(segmentZKMetadata, partitionColumn); + } + + @Nullable + private static Integer getPartitionIdFromRealtimeSegmentName(String segmentName) { + // A fast path to get partition id if the segmentName is in a known format like LLC. LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); if (llcSegmentName != null) { return llcSegmentName.getPartitionGroupId(); } - // Otherwise, retrieve the partition id from the segment zk metadata. 
- return getRealtimeSegmentPartitionId(segmentZKMetadata, partitionColumn); + UploadedRealtimeSegmentName uploadedRealtimeSegmentName = UploadedRealtimeSegmentName.of(segmentName); + if (uploadedRealtimeSegmentName != null) { + return uploadedRealtimeSegmentName.getPartitionId(); + } + return null; } @Nullable diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java index 1257bc04989..812c6c9959a 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java @@ -88,11 +88,20 @@ void testGetRealtimeSegmentPartitionIdForUploadedRealtimeSegment() { String segmentName = "uploaded__table_name__3__100__1716185755000"; try { + // Check the util method that gets segmentZKMetadata via HelixManager for partition id. Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, "realtimeTableName", null, "partitionColumn"); assertEquals(partitionId, 3); } catch (Exception e) { fail("Exception should not be thrown"); } + + try { + // Check the util method that has segmentZKMetadata passed in directly for partition id. + Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, null, "partitionColumn"); + assertEquals(partitionId, 3); + } catch (Exception e) { + fail("Exception should not be thrown"); + } } } From 4422e9a1717fc8ede0c5d8bb7faf9f4715ae5413 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" <17555551+Jackie-Jiang@users.noreply.github.com> Date: Tue, 16 Jul 2024 11:41:56 -0700 Subject: [PATCH 17/24] Apply SEARCH expand rule in the end (#13614) --- .../pinot/calcite/rel/rules/PinotQueryRuleSets.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java index 6c2498c70bd..e75dadb2681 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java @@ -121,9 +121,7 @@ private PinotQueryRuleSets() { public static final Collection PINOT_POST_RULES = ImmutableList.of( // Evaluate the Literal filter nodes CoreRules.FILTER_REDUCE_EXPRESSIONS, - // Expand all SEARCH nodes to simplified filter nodes. SEARCH nodes get created for queries with range - // predicates, in-clauses, etc. - PinotFilterExpandSearchRule.INSTANCE, + // TODO: Merge the following 2 rules into a single rule // add an extra exchange for sort PinotSortExchangeNodeInsertRule.INSTANCE, // copy exchanges down, this must be done after SortExchangeNodeInsertRule @@ -139,6 +137,13 @@ private PinotQueryRuleSets() { PinotJoinToDynamicBroadcastRule.INSTANCE, // remove exchanges when there's duplicates - PinotExchangeEliminationRule.INSTANCE + PinotExchangeEliminationRule.INSTANCE, + + // Expand all SEARCH nodes to simplified filter nodes. SEARCH nodes get created for queries with range predicates, + // in-clauses, etc. + // NOTE: Keep this rule at the end because it can potentially create a lot of predicates joined by OR/AND for IN/ + // NOT IN clause, which can be expensive to process in other rules. + // TODO: Consider removing this rule and directly handle SEARCH in RexExpressionUtils. 
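+      // For example, a SEARCH node over "col IN (1, 2, 3)" expands into "col = 1 OR col = 2 OR col = 3".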
+ PinotFilterExpandSearchRule.INSTANCE ); } From 65907b78d633faa29de6e05f955afd7f3d94343f Mon Sep 17 00:00:00 2001 From: Gonzalo Ortiz Jaureguizar Date: Tue, 16 Jul 2024 21:01:00 +0200 Subject: [PATCH 18/24] Make ARRAY_TO_MV a non-deterministic function (#13618) --- .../function/TransformFunctionType.java | 21 ++++++++++++++++++- .../MultiStageEngineIntegrationTest.java | 20 ++++++++++++++++++ .../sql/PinotSqlTransformFunction.java | 9 +++++++- .../calcite/sql/fun/PinotOperatorTable.java | 2 +- 4 files changed, 49 insertions(+), 3 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java index d2c5c8127c4..cb0683fd8bf 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java @@ -106,7 +106,22 @@ public enum TransformFunctionType { // object type ARRAY_TO_MV("arrayToMV", ReturnTypes.cascade(opBinding -> positionalComponentReturnType(opBinding, 0), SqlTypeTransforms.FORCE_NULLABLE), - OperandTypes.family(SqlTypeFamily.ARRAY), "array_to_mv"), + OperandTypes.family(SqlTypeFamily.ARRAY), "array_to_mv") { + + @Override + public boolean isDeterministic() { + // ARRAY_TO_MV is not deterministic. In fact, it has no implementation. + // We must explicitly mark it as non-deterministic so that Calcite does not optimize expressions like + // `ARRAY_TO_MV(RandomAirports) = 'MFR' and ARRAY_TO_MV(RandomAirports) = 'GTR'` down to `false`. + // If the function were deterministic, its value could never be MFR and GTR at the same time, and Calcite is + // smart enough to know that no single value satisfies the condition. + // In fact, all ARRAY_TO_MV does is trick the Calcite type system; what the leaf stage executor + // receives is `RandomAirports = 'MFR' and RandomAirports = 'GTR'`, which in the V1 semantics means: + // true if and only if RandomAirports contains a value equal to 'MFR' and RandomAirports contains a value equal + // to 'GTR'. + return false; + } + }, // string functions JSON_EXTRACT_SCALAR("jsonExtractScalar", @@ -358,6 +373,10 @@ public SqlFunctionCategory getSqlFunctionCategory() { return _sqlFunctionCategory; } + public boolean isDeterministic() { + return true; + } + /** Returns the optional explicit returning type specification. 
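* (For example, jsonExtractScalar supplies its result type as a string literal argument, which this helper reads.)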
*/ private static RelDataType positionalReturnTypeInferenceFromStringLiteral(SqlOperatorBinding opBinding, int pos) { return positionalReturnTypeInferenceFromStringLiteral(opBinding, pos, SqlTypeName.ANY); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java index 811e957fe14..602979fc57b 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java @@ -704,6 +704,26 @@ public void testVariadicFunction() throws Exception { assertEquals(jsonNode.get("numRowsResultSet").asInt(), 3); } + @Test + public void skipArrayToMvOptimization() + throws Exception { + String sqlQuery = "SELECT 1 " + + "FROM mytable " + + "WHERE ARRAY_TO_MV(RandomAirports) = 'MFR' and ARRAY_TO_MV(RandomAirports) = 'GTR'"; + + JsonNode jsonNode = postQuery("Explain plan for " + sqlQuery); + JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1); + + Pattern pattern = Pattern.compile("LogicalValues\\(tuples=\\[\\[]]\\)"); + String planAsText = plan.asText(); + boolean matches = pattern.matcher(planAsText).find(); + Assert.assertFalse(matches, "Plan should not contain a LogicalValues node but plan is \n" + + planAsText); + + jsonNode = postQuery(sqlQuery); + Assert.assertNotEquals(jsonNode.get("resultTable").get("rows").size(), 0); + } + @Test public void testMultiValueColumnGroupByOrderBy() throws Exception { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/PinotSqlTransformFunction.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/PinotSqlTransformFunction.java index 7c97fbf7ae3..ffd5b49667e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/PinotSqlTransformFunction.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/PinotSqlTransformFunction.java @@ -31,10 +31,17 @@ * Pinot SqlFunction class to register the Pinot transform functions with the Calcite operator table. 
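+ * <p>Carries an explicit determinism flag so that functions such as ARRAY_TO_MV can report isDeterministic() = false and opt out of Calcite constant folding.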
*/ public class PinotSqlTransformFunction extends SqlFunction { + private final boolean _isDeterministic; public PinotSqlTransformFunction(String name, SqlKind kind, @Nullable SqlReturnTypeInference returnTypeInference, @Nullable SqlOperandTypeInference operandTypeInference, @Nullable SqlOperandTypeChecker operandTypeChecker, - SqlFunctionCategory category) { + SqlFunctionCategory category, boolean isDeterministic) { super(name, kind, returnTypeInference, operandTypeInference, operandTypeChecker, category); + _isDeterministic = isDeterministic; + } + + @Override + public boolean isDeterministic() { + return _isDeterministic; } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java index 1eb1890304a..68dbce88f36 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java @@ -149,7 +149,7 @@ private void registerTransformFunction(String functionName, TransformFunctionTyp PinotSqlTransformFunction sqlTransformFunction = new PinotSqlTransformFunction(functionName.toUpperCase(Locale.ROOT), functionType.getSqlKind(), functionType.getReturnTypeInference(), null, functionType.getOperandTypeChecker(), - functionType.getSqlFunctionCategory()); + functionType.getSqlFunctionCategory(), functionType.isDeterministic()); if (notRegistered(sqlTransformFunction)) { register(sqlTransformFunction); } From ce79f1a56fb134576637e602cf08f3d2a73ac591 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 16 Jul 2024 12:04:14 -0700 Subject: [PATCH 19/24] Bump com.h2database:h2 from 2.2.224 to 2.3.230 (#13622) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6b80de668a0..2b608b5718b 100644 --- a/pom.xml +++ b/pom.xml @@ -1344,7 +1344,7 @@ com.h2database h2 - 2.2.224 + 2.3.230 com.github.jnr From 0520e7ae96befad1bdf88c272731c60998b12c3d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 16 Jul 2024 12:05:00 -0700 Subject: [PATCH 20/24] Bump com.azure:azure-core from 1.49.1 to 1.50.0 (#13624) --- pinot-plugins/pinot-file-system/pinot-adls/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-plugins/pinot-file-system/pinot-adls/pom.xml b/pinot-plugins/pinot-file-system/pinot-adls/pom.xml index 09d9683ce86..a8a86c9c323 100644 --- a/pinot-plugins/pinot-file-system/pinot-adls/pom.xml +++ b/pinot-plugins/pinot-file-system/pinot-adls/pom.xml @@ -57,7 +57,7 @@ com.azure azure-core - 1.49.1 + 1.50.0 io.projectreactor From e2f2f41c8b1064aba50e5f846171fead76d4ea32 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 16 Jul 2024 12:05:14 -0700 Subject: [PATCH 21/24] Bump commons-codec:commons-codec from 1.17.0 to 1.17.1 (#13623) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 2b608b5718b..e9202601694 100644 --- a/pom.xml +++ b/pom.xml @@ -193,7 +193,7 @@ 1.11.0 2.11.0 2.16.1 - 1.17.0 + 1.17.1 1.8.0 3.11.1 1.9.0 From e4497f853a0b032a44ba02f503d36eba911b032f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 16 Jul 2024 12:05:28 -0700 Subject: [PATCH 22/24] Bump 
com.azure:azure-core-http-netty from 1.15.1 to 1.15.2 (#13621) --- pinot-plugins/pinot-file-system/pinot-adls/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-plugins/pinot-file-system/pinot-adls/pom.xml b/pinot-plugins/pinot-file-system/pinot-adls/pom.xml index a8a86c9c323..23f61cab703 100644 --- a/pinot-plugins/pinot-file-system/pinot-adls/pom.xml +++ b/pinot-plugins/pinot-file-system/pinot-adls/pom.xml @@ -52,7 +52,7 @@ com.azure azure-core-http-netty - 1.15.1 + 1.15.2 com.azure From 93eccdc7aa415d5c7a22cc4d8ccca093c6f98c39 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 16 Jul 2024 12:06:01 -0700 Subject: [PATCH 23/24] Bump confluent.version from 7.6.1 to 7.6.2 (#13620) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e9202601694..73fa4d10cd7 100644 --- a/pom.xml +++ b/pom.xml @@ -180,7 +180,7 @@ 7.0.0 8.3.6 - 7.6.1 + 7.6.2 3.3.0 1.19.1 From 0e5ad8ede19a28a6ce6b1c9595dadebbe49b4a60 Mon Sep 17 00:00:00 2001 From: sullis Date: Tue, 16 Jul 2024 17:23:03 -0700 Subject: [PATCH 24/24] add assertions for S3PinotFS move (#13631) --- .../plugin/filesystem/S3PinotFSTest.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java index a41391034d6..48224a359db 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java @@ -433,6 +433,9 @@ public void testMoveFile() _s3PinotFS.copyFromLocalFile(file, sourceUri); + HeadObjectResponse sourceHeadObjectResponse = + _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName)); + URI targetUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, "move-target")); boolean moveResult = _s3PinotFS.move(sourceUri, targetUri, false); @@ -441,9 +444,24 @@ public void testMoveFile() Assert.assertFalse(_s3PinotFS.exists(sourceUri)); Assert.assertTrue(_s3PinotFS.exists(targetUri)); - HeadObjectResponse headObjectResponse = + HeadObjectResponse targetHeadObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, "move-target")); - Assert.assertEquals(headObjectResponse.contentLength(), fileSize); + Assert.assertEquals(targetHeadObjectResponse.contentLength(), + fileSize); + Assert.assertEquals(targetHeadObjectResponse.storageClass(), + sourceHeadObjectResponse.storageClass()); + Assert.assertEquals(targetHeadObjectResponse.archiveStatus(), + sourceHeadObjectResponse.archiveStatus()); + Assert.assertEquals(targetHeadObjectResponse.contentType(), + sourceHeadObjectResponse.contentType()); + Assert.assertEquals(targetHeadObjectResponse.expiresString(), + sourceHeadObjectResponse.expiresString()); + Assert.assertEquals(targetHeadObjectResponse.eTag(), + sourceHeadObjectResponse.eTag()); + Assert.assertEquals(targetHeadObjectResponse.replicationStatusAsString(), + sourceHeadObjectResponse.replicationStatusAsString()); + Assert.assertEquals(targetHeadObjectResponse.lastModified(), + sourceHeadObjectResponse.lastModified()); } finally { FileUtils.deleteQuietly(file); }