diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index 6236b6545258..010bbff2a270 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -246,6 +246,7 @@ The following table lists the context parameters for the MSQ task engine:
| `durableShuffleStorage` | SELECT, INSERT, REPLACE<br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`. If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. | `false` |
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br />Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
| `selectDestination` | SELECT<br /><br />Controls where the final result of the select query is written. Use `taskReport` (the default) to write select results to the task report. This is not scalable, since the task report size grows very large for large result sets. Use `durableStorage` to write results to a durable storage location; this is recommended for large result sets. To configure durable storage, see the [durable storage](#durable-storage) section. | `taskReport` |
+| `waitTillSegmentsLoad` | INSERT, REPLACE<br /><br />If set, the ingest query waits for the generated segments to be loaded before exiting; otherwise, the ingest query exits without waiting. When this flag is set, the task and live reports contain information about the segment loading status. This ensures that any query run after the ingestion exits includes the ingested results. The drawback is that the controller task stalls until the segments are loaded. | `false` |
## Joins
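As a rough illustration of how a client might attach this flag, the following sketch builds a query context map with `waitTillSegmentsLoad` enabled before submitting an MSQ ingest query; the map is illustrative only and is not tied to any particular client API in this patch.

```java
import java.util.HashMap;
import java.util.Map;

public class WaitTillSegmentsLoadContextExample
{
  public static void main(String[] args)
  {
    // Context for an MSQ INSERT/REPLACE query. With waitTillSegmentsLoad=true the controller
    // task keeps running after publishing and only exits once the generated segments are
    // loaded, so queries issued after the ingestion completes see the new data.
    Map<String, Object> context = new HashMap<>();
    context.put("waitTillSegmentsLoad", true);
    System.out.println(context);
  }
}
```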
diff --git a/docs/querying/sql.md b/docs/querying/sql.md
index 378bf302872b..13259bdf4044 100644
--- a/docs/querying/sql.md
+++ b/docs/querying/sql.md
@@ -57,7 +57,7 @@ Druid SQL supports SELECT queries with the following structure:
[ WITH tableName [ ( column1, column2, ... ) ] AS ( query ) ]
SELECT [ ALL | DISTINCT ] { * | exprs }
FROM { <table> | (<subquery>) | <o1> [ INNER | LEFT ] JOIN <o2> ON condition }
-[, UNNEST(source_expression) as table_alias_name(column_alias_name) ]
+[ CROSS JOIN UNNEST(source_expression) as table_alias_name(column_alias_name) ]
[ WHERE expr ]
[ GROUP BY [ exprs | GROUPING SETS ( (exprs), ... ) | ROLLUP (exprs) | CUBE (exprs) ] ]
[ HAVING expr ]
@@ -97,7 +97,7 @@ The UNNEST clause unnests array values. It's the SQL equivalent to the [unnest d
The following is the general syntax for UNNEST, specifically a query that returns the column that gets unnested:
```sql
-SELECT column_alias_name FROM datasource, UNNEST(source_expression1) AS table_alias_name1(column_alias_name1), UNNEST(source_expression2) AS table_alias_name2(column_alias_name2), ...
+SELECT column_alias_name FROM datasource CROSS JOIN UNNEST(source_expression1) AS table_alias_name1(column_alias_name1) CROSS JOIN UNNEST(source_expression2) AS table_alias_name2(column_alias_name2) ...
```
* The `datasource` for UNNEST can be any Druid datasource, such as the following:
@@ -112,7 +112,7 @@ Keep the following things in mind when writing your query:
- You must include the context parameter `"enableUnnest": true`.
- You can unnest multiple source expressions in a single query.
-- Notice the comma between the datasource and the UNNEST function. This is needed in most cases of the UNNEST function. Specifically, it is not needed when you're unnesting an inline array since the array itself is the datasource.
+- Notice the CROSS JOIN between the datasource and the UNNEST function. This is needed in most cases of the UNNEST function. Specifically, it is not needed when you're unnesting an inline array since the array itself is the datasource.
- If you view the native explanation of a SQL UNNEST, you'll notice that Druid uses `j0.unnest` as a virtual column to perform the unnest. An underscore is added for each unnest, so you may notice virtual columns named `_j0.unnest` or `__j0.unnest`.
- UNNEST preserves the ordering of the source array that is being unnested.
diff --git a/docs/tutorials/tutorial-unnest-arrays.md b/docs/tutorials/tutorial-unnest-arrays.md
index 1f8c530f8d01..49fdfe98af25 100644
--- a/docs/tutorials/tutorial-unnest-arrays.md
+++ b/docs/tutorials/tutorial-unnest-arrays.md
@@ -163,7 +163,7 @@ In the results, notice that the column named `dim3` has nested values like `["a"
The following is the general syntax for UNNEST:
```sql
-SELECT column_alias_name FROM datasource, UNNEST(source_expression) AS table_alias_name(column_alias_name)
+SELECT column_alias_name FROM datasource CROSS JOIN UNNEST(source_expression) AS table_alias_name(column_alias_name)
```
In addition, you must supply the following context parameter:
@@ -179,7 +179,7 @@ For more information about the syntax, see [UNNEST](../querying/sql.md#unnest).
The following query returns a column called `d3` from the table `nested_data`. `d3` contains the unnested values from the source column `dim3`:
```sql
-SELECT d3 FROM "nested_data", UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3)
+SELECT d3 FROM "nested_data" CROSS JOIN UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3)
```
Notice the MV_TO_ARRAY helper function, which converts the multi-value records in `dim3` to arrays. It is required since `dim3` is a multi-value string dimension.
@@ -191,7 +191,7 @@ If the column you are unnesting is not a string dimension, then you do not need
You can unnest into a virtual column (multiple columns treated as one). The following query returns the two source columns and a third virtual column containing the unnested data:
```sql
-SELECT dim4,dim5,d45 FROM nested_data, UNNEST(ARRAY[dim4,dim5]) AS example_table(d45)
+SELECT dim4,dim5,d45 FROM nested_data CROSS JOIN UNNEST(ARRAY[dim4,dim5]) AS example_table(d45)
```
The virtual column `d45` is the product of the two source columns. Notice how the total number of rows has grown. The table `nested_data` had only seven rows originally.
@@ -199,7 +199,7 @@ The virtual column `d45` is the product of the two source columns. Notice how th
Another way to unnest a virtual column is to concatenate them with ARRAY_CONCAT:
```sql
-SELECT dim4,dim5,d45 FROM nested_data, UNNEST(ARRAY_CONCAT(dim4,dim5)) AS example_table(d45)
+SELECT dim4,dim5,d45 FROM nested_data CROSS JOIN UNNEST(ARRAY_CONCAT(dim4,dim5)) AS example_table(d45)
```
Decide which method to use based on what your goals are.
@@ -221,7 +221,7 @@ The example query returns the following from the `nested_data` datasource:
- an unnested virtual column composed of `dim4` and `dim5` aliased to `d45`
```sql
-SELECT dim3,dim4,dim5,d3,d45 FROM "nested_data", UNNEST(MV_TO_ARRAY("dim3")) AS foo1(d3), UNNEST(ARRAY[dim4,dim5]) AS foo2(d45)
+SELECT dim3,dim4,dim5,d3,d45 FROM "nested_data" CROSS JOIN UNNEST(MV_TO_ARRAY("dim3")) AS foo1(d3) CROSS JOIN UNNEST(ARRAY[dim4,dim5]) AS foo2(d45)
```
@@ -230,7 +230,7 @@ SELECT dim3,dim4,dim5,d3,d45 FROM "nested_data", UNNEST(MV_TO_ARRAY("dim3")) AS
The following query uses only three columns from the `nested_data` table as the datasource. From that subset, it unnests the column `dim3` into `d3` and returns `d3`.
```sql
-SELECT d3 FROM (SELECT dim1, dim2, dim3 FROM "nested_data"), UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3)
+SELECT d3 FROM (SELECT dim1, dim2, dim3 FROM "nested_data") CROSS JOIN UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3)
```
### Unnest with a filter
@@ -242,7 +242,7 @@ You can specify which rows to unnest by including a filter in your query. The fo
* Returns the records for the unnested `d3` that have a `dim2` record that matches the filter
```sql
-SELECT d3 FROM (SELECT * FROM nested_data WHERE dim2 IN ('abc')), UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3)
+SELECT d3 FROM (SELECT * FROM nested_data WHERE dim2 IN ('abc')) CROSS JOIN UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3)
```
You can also filter the results of an UNNEST clause. The following example unnests the inline array `[1,2,3]` but only returns the rows that match the filter:
@@ -257,7 +257,7 @@ This means that you can run a query like the following where Druid only return r
- The value of `m1` is less than 2.
```sql
-SELECT * FROM nested_data, UNNEST(MV_TO_ARRAY("dim3")) AS foo(d3) WHERE d3 IN ('b', 'd') and m1 < 2
+SELECT * FROM nested_data CROSS JOIN UNNEST(MV_TO_ARRAY("dim3")) AS foo(d3) WHERE d3 IN ('b', 'd') and m1 < 2
```
The query only returns a single row since only one row meets the conditions. You can see the results change if you modify the filter.
@@ -267,7 +267,7 @@ The query only returns a single row since only one row meets the conditions. You
The following query unnests `dim3` and then performs a GROUP BY on the output `d3`.
```sql
-SELECT d3 FROM nested_data, UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3) GROUP BY d3
+SELECT d3 FROM nested_data CROSS JOIN UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3) GROUP BY d3
```
You can further transform your results by including clauses like `ORDER BY d3 DESC` or LIMIT.
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index c423b959eccf..c7b10f245c1d 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -463,14 +463,18 @@ public TaskStatus runTask(final Closer closer)
}
}
+ boolean shouldWaitForSegmentLoad = MultiStageQueryContext.shouldWaitForSegmentLoad(task.getQuerySpec().getQuery().context());
try {
releaseTaskLocks();
cleanUpDurableStorageIfNeeded();
if (queryKernel != null && queryKernel.isSuccess()) {
- if (segmentLoadWaiter != null) {
- // If successful and there are segments created, segmentLoadWaiter should wait for them to become available.
+ if (shouldWaitForSegmentLoad && segmentLoadWaiter != null) {
+ // If the query succeeded, segments were created, and segment load waiting is enabled, segmentLoadWaiter
+ // should wait for them to become available.
+ log.info("Controller will now wait for segments to be loaded. The query has already finished executing,"
+ + " and results will be included once the segments are loaded, even if this query is cancelled now.");
segmentLoadWaiter.waitForSegmentsToLoad();
}
}
@@ -1363,31 +1367,35 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOException
}
} else {
Set<String> versionsToAwait = segmentsWithTombstones.stream().map(DataSegment::getVersion).collect(Collectors.toSet());
+ if (MultiStageQueryContext.shouldWaitForSegmentLoad(task.getQuerySpec().getQuery().context())) {
+ segmentLoadWaiter = new SegmentLoadStatusFetcher(
+ context.injector().getInstance(BrokerClient.class),
+ context.jsonMapper(),
+ task.getId(),
+ task.getDataSource(),
+ versionsToAwait,
+ segmentsWithTombstones.size(),
+ true
+ );
+ }
+ performSegmentPublish(
+ context.taskActionClient(),
+ SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones)
+ );
+ }
+ } else if (!segments.isEmpty()) {
+ Set<String> versionsToAwait = segments.stream().map(DataSegment::getVersion).collect(Collectors.toSet());
+ if (MultiStageQueryContext.shouldWaitForSegmentLoad(task.getQuerySpec().getQuery().context())) {
segmentLoadWaiter = new SegmentLoadStatusFetcher(
context.injector().getInstance(BrokerClient.class),
context.jsonMapper(),
task.getId(),
task.getDataSource(),
versionsToAwait,
- segmentsWithTombstones.size(),
+ segments.size(),
true
);
- performSegmentPublish(
- context.taskActionClient(),
- SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones)
- );
}
- } else if (!segments.isEmpty()) {
- Set<String> versionsToAwait = segments.stream().map(DataSegment::getVersion).collect(Collectors.toSet());
- segmentLoadWaiter = new SegmentLoadStatusFetcher(
- context.injector().getInstance(BrokerClient.class),
- context.jsonMapper(),
- task.getId(),
- task.getDataSource(),
- versionsToAwait,
- segments.size(),
- true
- );
// Append mode.
performSegmentPublish(
context.taskActionClient(),
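A minimal sketch of the control flow this hunk introduces, with stand-in types rather than the real `ControllerImpl` and `SegmentLoadStatusFetcher` classes: the waiter is built only when the context flag is set, and is consulted only after a successful publish.

```java
public class PublishThenWaitSketch
{
  interface SegmentLoadWaiter
  {
    void waitForSegmentsToLoad() throws InterruptedException;
  }

  // Publish the segments, then block on the waiter only if the flag is enabled and a waiter was created.
  static void publishAndMaybeWait(boolean shouldWaitForSegmentLoad, SegmentLoadWaiter segmentLoadWaiter)
      throws InterruptedException
  {
    publishSegments();
    if (shouldWaitForSegmentLoad && segmentLoadWaiter != null) {
      // The query itself has finished; waiting only delays task exit until the data is queryable.
      segmentLoadWaiter.waitForSegmentsToLoad();
    }
  }

  static void publishSegments()
  {
    System.out.println("segments published");
  }

  public static void main(String[] args) throws InterruptedException
  {
    publishAndMaybeWait(true, () -> System.out.println("waiting for generated segments to load"));
  }
}
```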
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java
index 478c632a7491..17f46bad23a2 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java
@@ -41,13 +41,10 @@
import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
/**
* Class that periodically checks with the broker if all the segments generated are loaded by querying the sys table
@@ -84,14 +81,14 @@ public class SegmentLoadStatusFetcher implements AutoCloseable
+ "COUNT(*) FILTER (WHERE is_available = 0 AND is_published = 1 AND replication_factor != 0) AS pendingSegments,\n"
+ "COUNT(*) FILTER (WHERE replication_factor = -1) AS unknownSegments\n"
+ "FROM sys.segments\n"
- + "WHERE datasource = '%s' AND is_overshadowed = 0 AND version = '%s'";
+ + "WHERE datasource = '%s' AND is_overshadowed = 0 AND version in (%s)";
private final BrokerClient brokerClient;
private final ObjectMapper objectMapper;
- // Map of version vs latest load status.
+ // Latest aggregate load status across all the awaited versions.
- private final Map<String, VersionLoadStatus> versionToLoadStatusMap;
+ private final AtomicReference<VersionLoadStatus> versionLoadStatusReference;
private final String datasource;
- private final Set<String> versionsToAwait;
+ private final String versionsInClauseString;
private final int totalSegmentsGenerated;
private final boolean doWait;
// since live reports fetch the value in another thread, we need to use AtomicReference
@@ -112,8 +109,11 @@ public SegmentLoadStatusFetcher(
this.brokerClient = brokerClient;
this.objectMapper = objectMapper;
this.datasource = datasource;
- this.versionsToAwait = new TreeSet<>(versionsToAwait);
- this.versionToLoadStatusMap = new HashMap<>();
+ this.versionsInClauseString = String.join(
+ ",",
+ versionsToAwait.stream().map(s -> StringUtils.format("'%s'", s)).collect(Collectors.toSet())
+ );
+ this.versionLoadStatusReference = new AtomicReference<>(new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated));
this.totalSegmentsGenerated = totalSegmentsGenerated;
this.status = new AtomicReference<>(new SegmentLoadWaiterStatus(
State.INIT,
@@ -145,8 +145,9 @@ public void waitForSegmentsToLoad()
final AtomicReference<Boolean> hasAnySegmentBeenLoaded = new AtomicReference<>(false);
try {
FutureUtils.getUnchecked(executorService.submit(() -> {
+ long lastLogMillis = -TimeUnit.MINUTES.toMillis(1);
try {
- while (!versionsToAwait.isEmpty()) {
+ while (!(hasAnySegmentBeenLoaded.get() && versionLoadStatusReference.get().isLoadingComplete())) {
// Check the timeout and exit if exceeded.
long runningMillis = new Interval(startTime, DateTimes.nowUtc()).toDurationMillis();
if (runningMillis > TIMEOUT_DURATION_MILLIS) {
@@ -159,29 +160,21 @@ public void waitForSegmentsToLoad()
return;
}
- Iterator<String> iterator = versionsToAwait.iterator();
- log.info(
- "Fetching segment load status for datasource[%s] from broker for segment versions[%s]",
- datasource,
- versionsToAwait
- );
-
- // Query the broker for all pending versions
- while (iterator.hasNext()) {
- String version = iterator.next();
-
- // Fetch the load status for this version from the broker
- VersionLoadStatus loadStatus = fetchLoadStatusForVersion(version);
- versionToLoadStatusMap.put(version, loadStatus);
- hasAnySegmentBeenLoaded.set(hasAnySegmentBeenLoaded.get() || loadStatus.getUsedSegments() > 0);
-
- // If loading is done for this stage, remove it from future loops.
- if (hasAnySegmentBeenLoaded.get() && loadStatus.isLoadingComplete()) {
- iterator.remove();
- }
+ if (runningMillis - lastLogMillis >= TimeUnit.MINUTES.toMillis(1)) {
+ lastLogMillis = runningMillis;
+ log.info(
+ "Fetching segment load status for datasource[%s] from broker for segment versions[%s]",
+ datasource,
+ versionsInClauseString
+ );
}
- if (!versionsToAwait.isEmpty()) {
+ // Fetch the load status from the broker
+ VersionLoadStatus loadStatus = fetchLoadStatusFromBroker();
+ versionLoadStatusReference.set(loadStatus);
+ hasAnySegmentBeenLoaded.set(hasAnySegmentBeenLoaded.get() || loadStatus.getUsedSegments() > 0);
+
+ if (!(hasAnySegmentBeenLoaded.get() && versionLoadStatusReference.get().isLoadingComplete())) {
// Update the status.
updateStatus(State.WAITING, startTime);
// Sleep for a bit before checking again.
@@ -216,50 +209,45 @@ private void waitIfNeeded(long waitTimeMillis) throws Exception
}
/**
- * Updates the {@link #status} with the latest details based on {@link #versionToLoadStatusMap}
+ * Updates the {@link #status} with the latest details based on {@link #versionLoadStatusReference}
*/
private void updateStatus(State state, DateTime startTime)
{
- int pendingSegmentCount = 0, usedSegmentsCount = 0, precachedSegmentCount = 0, onDemandSegmentCount = 0, unknownSegmentCount = 0;
- for (Map.Entry<String, VersionLoadStatus> entry : versionToLoadStatusMap.entrySet()) {
- usedSegmentsCount += entry.getValue().getUsedSegments();
- precachedSegmentCount += entry.getValue().getPrecachedSegments();
- onDemandSegmentCount += entry.getValue().getOnDemandSegments();
- unknownSegmentCount += entry.getValue().getUnknownSegments();
- pendingSegmentCount += entry.getValue().getPendingSegments();
- }
-
long runningMillis = new Interval(startTime, DateTimes.nowUtc()).toDurationMillis();
+ VersionLoadStatus versionLoadStatus = versionLoadStatusReference.get();
status.set(
new SegmentLoadWaiterStatus(
state,
startTime,
runningMillis,
totalSegmentsGenerated,
- usedSegmentsCount,
- precachedSegmentCount,
- onDemandSegmentCount,
- pendingSegmentCount,
- unknownSegmentCount
+ versionLoadStatus.getUsedSegments(),
+ versionLoadStatus.getPrecachedSegments(),
+ versionLoadStatus.getOnDemandSegments(),
+ versionLoadStatus.getPendingSegments(),
+ versionLoadStatus.getUnknownSegments()
)
);
}
/**
- * Uses {@link #brokerClient} to fetch latest load status for a given version. Converts the response into a
+ * Uses {@link #brokerClient} to fetch latest load status for a given set of versions. Converts the response into a
* {@link VersionLoadStatus} and returns it.
*/
- private VersionLoadStatus fetchLoadStatusForVersion(String version) throws Exception
+ private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception
{
Request request = brokerClient.makeRequest(HttpMethod.POST, "/druid/v2/sql/");
- SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, datasource, version),
+ SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, datasource, versionsInClauseString),
ResultFormat.OBJECTLINES,
false, false, false, null, null
);
request.setContent(MediaType.APPLICATION_JSON, objectMapper.writeValueAsBytes(sqlQuery));
String response = brokerClient.sendQuery(request);
- if (response.trim().isEmpty()) {
+ if (response == null) {
+ // Unable to query the broker; report all generated segments as unknown.
+ return new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated);
+ } else if (response.trim().isEmpty()) {
// If no segments are returned for a version, all segments have been dropped by a drop rule.
return new VersionLoadStatus(0, 0, 0, 0, 0);
} else {
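To make the `version in (%s)` change concrete, here is a small sketch (using plain JDK classes rather than the actual `StringUtils` and `BrokerClient` helpers, and with a made-up datasource name and versions) of how a set of segment versions collapses into a single quoted IN clause and one `sys.segments` query.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class VersionsInClauseSketch
{
  public static void main(String[] args)
  {
    // Versions produced by one ingestion, quoted and joined so the broker is polled once for
    // all of them instead of once per version.
    String versionsInClauseString = Arrays.asList("2024-01-01T00:00:00.000Z", "2024-01-01T01:00:00.000Z")
        .stream()
        .map(v -> "'" + v + "'")
        .collect(Collectors.joining(","));

    String sql = String.format(
        "SELECT COUNT(*) AS totalSegments FROM sys.segments\n"
        + "WHERE datasource = '%s' AND is_overshadowed = 0 AND version IN (%s)",
        "wikipedia",
        versionsInClauseString
    );
    System.out.println(sql);
  }
}
```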
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 265f5eae0fe1..98dcd471d0fe 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -97,6 +97,8 @@ public class MultiStageQueryContext
public static final String CTX_FAULT_TOLERANCE = "faultTolerance";
public static final boolean DEFAULT_FAULT_TOLERANCE = false;
+ public static final String CTX_SEGMENT_LOAD_WAIT = "waitTillSegmentsLoad";
+ public static final boolean DEFAULT_SEGMENT_LOAD_WAIT = false;
public static final String CTX_MAX_INPUT_BYTES_PER_WORKER = "maxInputBytesPerWorker";
public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = "clusterStatisticsMergeMode";
@@ -148,6 +150,14 @@ public static boolean isFaultToleranceEnabled(final QueryContext queryContext)
);
}
+ public static boolean shouldWaitForSegmentLoad(final QueryContext queryContext)
+ {
+ return queryContext.getBoolean(
+ CTX_SEGMENT_LOAD_WAIT,
+ DEFAULT_SEGMENT_LOAD_WAIT
+ );
+ }
+
public static boolean isReindex(final QueryContext queryContext)
{
return queryContext.getBoolean(
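A simplified stand-in for the new accessor, showing the intended read path: a plain boolean lookup against the context with the documented default of `false`. The real method goes through Druid's `QueryContext`, which is not reproduced here.

```java
import java.util.Collections;
import java.util.Map;

public class ShouldWaitForSegmentLoadSketch
{
  static final String CTX_SEGMENT_LOAD_WAIT = "waitTillSegmentsLoad";
  static final boolean DEFAULT_SEGMENT_LOAD_WAIT = false;

  // Mirrors the intent of MultiStageQueryContext.shouldWaitForSegmentLoad: a missing or
  // non-boolean value falls back to the default, so existing queries keep their behavior.
  static boolean shouldWaitForSegmentLoad(Map<String, Object> queryContext)
  {
    Object value = queryContext.get(CTX_SEGMENT_LOAD_WAIT);
    return value instanceof Boolean ? (Boolean) value : DEFAULT_SEGMENT_LOAD_WAIT;
  }

  public static void main(String[] args)
  {
    System.out.println(shouldWaitForSegmentLoad(Collections.emptyMap()));                                 // false
    System.out.println(shouldWaitForSegmentLoad(Collections.singletonMap(CTX_SEGMENT_LOAD_WAIT, true))); // true
  }
}
```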
diff --git a/processing/src/main/java/org/apache/druid/java/util/common/DefineClassUtils.java b/processing/src/main/java/org/apache/druid/java/util/common/DefineClassUtils.java
index 604ef987394e..d79c517ae3d1 100644
--- a/processing/src/main/java/org/apache/druid/java/util/common/DefineClassUtils.java
+++ b/processing/src/main/java/org/apache/druid/java/util/common/DefineClassUtils.java
@@ -95,7 +95,7 @@ private static MethodHandle defineClassJava9(MethodHandles.Lookup lookup) throws
}
/**
- * "Compile" a MethodHandle that is equilavent to:
+ * "Compile" a MethodHandle that is equivalent to:
*
* Class<?> defineClass(Class targetClass, byte[] byteCode, String className) {
* return Unsafe.defineClass(
@@ -147,7 +147,7 @@ private static MethodHandle defineClassJava8(MethodHandles.Lookup lookup) throws
// defineClass(className, byteCode, 0, length, targetClass)
defineClass = MethodHandles.insertArguments(defineClass, 2, (int) 0);
- // JDK8 does not implement MethodHandles.arrayLength so we have to roll our own
+ // JDK8 does not implement MethodHandles.arrayLength, so we have to roll our own
MethodHandle arrayLength = lookup.findStatic(
lookup.lookupClass(),
"getArrayLength",
@@ -171,6 +171,16 @@ private static MethodHandle defineClassJava8(MethodHandles.Lookup lookup) throws
return defineClass;
}
+ /**
+ * This method is referenced in Java 8 via a method handle, so it is not actually unused and should not be
+ * removed (as long as Java 8 is supported).
+ */
+ @SuppressWarnings("unused") // method is referenced and used in defineClassJava8
+ static int getArrayLength(byte[] bytes)
+ {
+ return bytes.length;
+ }
+
public static Class<?> defineClass(
Class<?> targetClass,
byte[] byteCode,
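The reason `getArrayLength` looks unused is that it is resolved reflectively. The sketch below shows the same pattern in isolation, under the assumption that `MethodHandles.Lookup.findStatic` binds a handle to a static method that no code calls directly.

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

public class ArrayLengthHandleSketch
{
  // Only ever invoked through the MethodHandle below, so static analysis reports it as unused.
  @SuppressWarnings("unused")
  static int getArrayLength(byte[] bytes)
  {
    return bytes.length;
  }

  public static void main(String[] args) throws Throwable
  {
    MethodHandles.Lookup lookup = MethodHandles.lookup();
    MethodHandle arrayLength = lookup.findStatic(
        ArrayLengthHandleSketch.class,
        "getArrayLength",
        MethodType.methodType(int.class, byte[].class)
    );
    // Prints 3: the handle stands in for MethodHandles.arrayLength, which JDK 8 lacks.
    System.out.println((int) arrayLength.invokeExact(new byte[]{1, 2, 3}));
  }
}
```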
diff --git a/processing/src/main/java/org/apache/druid/query/IterableRowsCursorHelper.java b/processing/src/main/java/org/apache/druid/query/IterableRowsCursorHelper.java
index c5bb271213e8..b4d06edc77cf 100644
--- a/processing/src/main/java/org/apache/druid/query/IterableRowsCursorHelper.java
+++ b/processing/src/main/java/org/apache/druid/query/IterableRowsCursorHelper.java
@@ -20,18 +20,18 @@
package org.apache.druid.query;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
-import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.RowBasedCursor;
import org.apache.druid.segment.RowWalker;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.RowSignature;
-import java.util.Iterator;
+import java.io.Closeable;
/**
* Helper methods to create cursor from iterable of rows
@@ -43,7 +43,18 @@ public class IterableRowsCursorHelper
* Creates a cursor that iterates over all the rows generated by the iterable. Presence of __time column is not a
* necessity
*/
- public static RowBasedCursor