Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
AmatyaAvadhanula committed Dec 29, 2023
2 parents 0780954 + cce5394 commit 6605362
Show file tree
Hide file tree
Showing 104 changed files with 3,669 additions and 2,124 deletions.
68 changes: 46 additions & 22 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,47 +21,71 @@
# Pull Request Labeler GitHub Action Configuration: https://github.com/marketplace/actions/labeler

'Area - Batch Ingestion':
- 'indexing-hadoop/**'
- 'extensions-core/multi-stage-query/**'
- changed-files:
- any-glob-to-any-file:
- 'indexing-hadoop/**'
- 'extensions-core/multi-stage-query/**'

'Area - Dependencies':
- '**/pom.xml'
- 'licenses.yaml'
- changed-files:
- any-glob-to-any-file:
- '**/pom.xml'
- 'licenses.yaml'

'Area - Documentation':
- 'docs/**/*'
- 'website/**'
- 'examples/quickstart/jupyter-notebooks/**'
- changed-files:
- any-glob-to-any-file:
- 'docs/**/*'
- 'website/**'
- 'examples/quickstart/jupyter-notebooks/**'

'Area - Ingestion':
- 'indexing-service/**'
- changed-files:
- any-glob-to-any-file:
- 'indexing-service/**'

'Area - Lookups':
- 'extensions-core/lookups-cached-global/**'
- 'extensions-core/lookups-cached-single/**'
- 'extensions-core/kafka-extraction-namespace/**'
- changed-files:
- any-glob-to-any-file:
- 'extensions-core/lookups-cached-global/**'
- 'extensions-core/lookups-cached-single/**'
- 'extensions-core/kafka-extraction-namespace/**'

'Area - Metrics/Event Emitting':
- 'processing/src/main/java/org/apache/druid/java/util/metrics/**'
- 'processing/src/main/java/org/apache/druid/java/util/emitter/**'
- 'extensions-contrib/*-emitter/**'
- changed-files:
- any-glob-to-any-file:
- 'processing/src/main/java/org/apache/druid/java/util/metrics/**'
- 'processing/src/main/java/org/apache/druid/java/util/emitter/**'
- 'extensions-contrib/*-emitter/**'

'Area - MSQ':
- 'extensions-core/multi-stage-query/**'
- changed-files:
- any-glob-to-any-file:
- 'extensions-core/multi-stage-query/**'

'Area - Querying':
- 'sql/**'
- 'extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/**'
- changed-files:
- any-glob-to-any-file:
- 'sql/**'
- 'extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/**'

'Area - Segment Format and Ser/De':
- 'processing/src/main/java/org/apache/druid/segment/**'
- changed-files:
- any-glob-to-any-file:
- 'processing/src/main/java/org/apache/druid/segment/**'

'Area - Streaming Ingestion':
- 'extensions-core/kafka-indexing-service/**'
- 'extensions-core/kinesis-indexing-service/**'
- changed-files:
- any-glob-to-any-file:
- 'extensions-core/kafka-indexing-service/**'
- 'extensions-core/kinesis-indexing-service/**'

'Area - Web Console':
- 'web-console/**'
- changed-files:
- any-glob-to-any-file:
- 'web-console/**'

'Kubernetes':
- 'extensions-contrib/kubernetes-overlord-extensions/**'
- changed-files:
- any-glob-to-any-file:
- 'extensions-contrib/kubernetes-overlord-extensions/**'
2 changes: 1 addition & 1 deletion .github/workflows/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ jobs:
pull-requests: write
runs-on: ubuntu-latest
steps:
- uses: actions/labeler@v4
- uses: actions/labeler@v5
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.AndDimFilter;
import org.apache.druid.query.filter.DimFilter;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class ExpressionFilterBenchmark
{
static {
NullHandling.initializeForTests();
ExpressionProcessing.initializeForTests();
}

@Param({"1000000"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.extraction.StrlenExtractionFn;
import org.apache.druid.query.extraction.TimeFormatExtractionFn;
Expand Down Expand Up @@ -59,21 +61,21 @@
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.util.BitSet;
import java.util.List;
import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 15)
@Measurement(iterations = 30)
@Warmup(iterations = 3, time = 3)
@Measurement(iterations = 10, time = 3)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class ExpressionSelectorBenchmark
{
static {
NullHandling.initializeForTests();
ExpressionProcessing.initializeForTests();
}

@Param({"1000000"})
Expand Down Expand Up @@ -451,6 +453,154 @@ public void stringConcatAndCompareOnLong(Blackhole blackhole)
blackhole.consume(results);
}

@Benchmark
public void caseSearched1(Blackhole blackhole)
{
final Sequence<Cursor> cursors = new QueryableIndexStorageAdapter(index).makeCursors(
null,
index.getDataInterval(),
VirtualColumns.create(
ImmutableList.of(
new ExpressionVirtualColumn(
"v",
"case_searched(s == 'asd' || isnull(s) || s == 'xxx', 1, s == 'foo' || s == 'bar', 2, 3)",
ColumnType.LONG,
TestExprMacroTable.INSTANCE
)
)
),
Granularities.ALL,
false,
null
);

final List<?> results = cursors
.map(cursor -> {
final ColumnValueSelector selector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v");
consumeLong(cursor, selector, blackhole);
return null;
})
.toList();

blackhole.consume(results);
}

@Benchmark
public void caseSearched2(Blackhole blackhole)
{
final Sequence<Cursor> cursors = new QueryableIndexStorageAdapter(index).makeCursors(
null,
index.getDataInterval(),
VirtualColumns.create(
ImmutableList.of(
new ExpressionVirtualColumn(
"v",
"case_searched(s == 'asd' || isnull(s) || n == 1, 1, n == 2, 2, 3)",
ColumnType.LONG,
TestExprMacroTable.INSTANCE
)
)
),
Granularities.ALL,
false,
null
);

final List<?> results = cursors
.map(cursor -> {
final ColumnValueSelector selector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v");
consumeLong(cursor, selector, blackhole);
return null;
})
.toList();

blackhole.consume(results);
}

@Benchmark
public void caseSearchedWithLookup(Blackhole blackhole)
{
final Sequence<Cursor> cursors = new QueryableIndexStorageAdapter(index).makeCursors(
null,
index.getDataInterval(),
VirtualColumns.create(
ImmutableList.of(
new ExpressionVirtualColumn(
"v",
"case_searched(n == 1001, -1, "
+ "lookup(s, 'lookyloo') == 'asd1', 1, "
+ "lookup(s, 'lookyloo') == 'asd2', 2, "
+ "lookup(s, 'lookyloo') == 'asd3', 3, "
+ "lookup(s, 'lookyloo') == 'asd4', 4, "
+ "lookup(s, 'lookyloo') == 'asd5', 5, "
+ "-2)",
ColumnType.LONG,
LookupEnabledTestExprMacroTable.INSTANCE
)
)
),
Granularities.ALL,
false,
null
);

final List<?> results = cursors
.map(cursor -> {
final ColumnValueSelector selector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v");
consumeLong(cursor, selector, blackhole);
return null;
})
.toList();

blackhole.consume(results);
}

@Benchmark
public void caseSearchedWithLookup2(Blackhole blackhole)
{
final Sequence<Cursor> cursors = new QueryableIndexStorageAdapter(index).makeCursors(
null,
index.getDataInterval(),
VirtualColumns.create(
ImmutableList.of(
new ExpressionVirtualColumn(
"ll",
"lookup(s, 'lookyloo')",
ColumnType.STRING,
LookupEnabledTestExprMacroTable.INSTANCE
),
new ExpressionVirtualColumn(
"v",
"case_searched(n == 1001, -1, "
+ "ll == 'asd1', 1, "
+ "ll == 'asd2', 2, "
+ "ll == 'asd3', 3, "
+ "ll == 'asd4', 4, "
+ "ll == 'asd5', 5, "
+ "-2)",
ColumnType.LONG,
LookupEnabledTestExprMacroTable.INSTANCE
)
)
),
Granularities.ALL,
false,
null
);

final List<?> results = cursors
.map(cursor -> {
final ColumnValueSelector selector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v");
consumeLong(cursor, selector, blackhole);
return null;
})
.toList();

blackhole.consume(results);
}



private void consumeDimension(final Cursor cursor, final DimensionSelector selector, final Blackhole blackhole)
{
if (selector.getValueCardinality() >= 0) {
Expand Down
25 changes: 25 additions & 0 deletions examples/bin/router.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash -eu
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

usage="Usage: router.sh (start|stop|status)"

if [ $# -lt 1 ]; then
echo $usage
exit 1
fi

cd $(dirname $0)/../
sh ./bin/node.sh router $1
15 changes: 15 additions & 0 deletions extensions-contrib/cassandra-storage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@
<relativePath>../../pom.xml</relativePath>
</parent>

<dependencyManagement>
<dependencies>
<!-- snakeyaml explicitly pinned to version 1.33 as it is
a transitive dependency of:
com.netflix.astyanax:astyanax:jar -> g.apache.cassandra:cassandra-all:jar
please remove this pin after the update of astyanax, see comment below
-->
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.33</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ protected Producer<String, String> setKafkaProducer()
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES);
props.putAll(config.getKafkaProducerConfig());
props.putAll(config.getKafkaProducerSecrets().getConfig());

return new KafkaProducer<>(props);
}
Expand Down Expand Up @@ -181,6 +182,10 @@ private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<Stri
}
}
catch (Throwable e) {
if (e instanceof InterruptedException && e.getMessage() == null) {
log.info("Normal exit.");
return;
}
log.warn(e, "Exception while getting record from queue or producer send, Events would not be emitted anymore.");
}
}
Expand Down
Loading

0 comments on commit 6605362

Please sign in to comment.