diff --git a/.github/labeler.yml b/.github/labeler.yml index a9bfc45a86ec..22895a82d8b7 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -21,47 +21,71 @@ # Pull Request Labeler GitHub Action Configuration: https://github.com/marketplace/actions/labeler 'Area - Batch Ingestion': - - 'indexing-hadoop/**' - - 'extensions-core/multi-stage-query/**' + - changed-files: + - any-glob-to-any-file: + - 'indexing-hadoop/**' + - 'extensions-core/multi-stage-query/**' 'Area - Dependencies': - - '**/pom.xml' - - 'licenses.yaml' + - changed-files: + - any-glob-to-any-file: + - '**/pom.xml' + - 'licenses.yaml' 'Area - Documentation': - - 'docs/**/*' - - 'website/**' - - 'examples/quickstart/jupyter-notebooks/**' + - changed-files: + - any-glob-to-any-file: + - 'docs/**/*' + - 'website/**' + - 'examples/quickstart/jupyter-notebooks/**' 'Area - Ingestion': - - 'indexing-service/**' + - changed-files: + - any-glob-to-any-file: + - 'indexing-service/**' 'Area - Lookups': - - 'extensions-core/lookups-cached-global/**' - - 'extensions-core/lookups-cached-single/**' - - 'extensions-core/kafka-extraction-namespace/**' + - changed-files: + - any-glob-to-any-file: + - 'extensions-core/lookups-cached-global/**' + - 'extensions-core/lookups-cached-single/**' + - 'extensions-core/kafka-extraction-namespace/**' 'Area - Metrics/Event Emitting': - - 'processing/src/main/java/org/apache/druid/java/util/metrics/**' - - 'processing/src/main/java/org/apache/druid/java/util/emitter/**' - - 'extensions-contrib/*-emitter/**' + - changed-files: + - any-glob-to-any-file: + - 'processing/src/main/java/org/apache/druid/java/util/metrics/**' + - 'processing/src/main/java/org/apache/druid/java/util/emitter/**' + - 'extensions-contrib/*-emitter/**' 'Area - MSQ': - - 'extensions-core/multi-stage-query/**' + - changed-files: + - any-glob-to-any-file: + - 'extensions-core/multi-stage-query/**' 'Area - Querying': - - 'sql/**' - - 'extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/**' + - changed-files: + - any-glob-to-any-file: + - 'sql/**' + - 'extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/**' 'Area - Segment Format and Ser/De': - - 'processing/src/main/java/org/apache/druid/segment/**' + - changed-files: + - any-glob-to-any-file: + - 'processing/src/main/java/org/apache/druid/segment/**' 'Area - Streaming Ingestion': - - 'extensions-core/kafka-indexing-service/**' - - 'extensions-core/kinesis-indexing-service/**' + - changed-files: + - any-glob-to-any-file: + - 'extensions-core/kafka-indexing-service/**' + - 'extensions-core/kinesis-indexing-service/**' 'Area - Web Console': - - 'web-console/**' + - changed-files: + - any-glob-to-any-file: + - 'web-console/**' 'Kubernetes': - - 'extensions-contrib/kubernetes-overlord-extensions/**' + - changed-files: + - any-glob-to-any-file: + - 'extensions-contrib/kubernetes-overlord-extensions/**' diff --git a/.github/workflows/labeler.yml b/.github/workflows/labeler.yml index 5de3d3bcc5a1..4c6c1867583d 100644 --- a/.github/workflows/labeler.yml +++ b/.github/workflows/labeler.yml @@ -32,6 +32,6 @@ jobs: pull-requests: write runs-on: ubuntu-latest steps: - - uses: actions/labeler@v4 + - uses: actions/labeler@v5 with: repo-token: "${{ secrets.GITHUB_TOKEN }}" \ No newline at end of file diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java index eea1804292ab..e7d8ad00bc5e 100644 --- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.math.expr.ExpressionProcessing; import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.filter.AndDimFilter; import org.apache.druid.query.filter.DimFilter; @@ -70,6 +71,7 @@ public class ExpressionFilterBenchmark { static { NullHandling.initializeForTests(); + ExpressionProcessing.initializeForTests(); } @Param({"1000000"}) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java index fcc4cf24794a..d8e1dfc76464 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java @@ -25,8 +25,10 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.math.expr.ExpressionProcessing; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.ExtractionDimensionSpec; +import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.extraction.StrlenExtractionFn; import org.apache.druid.query.extraction.TimeFormatExtractionFn; @@ -59,21 +61,21 @@ import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; - import java.util.BitSet; import java.util.List; import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) @Fork(value = 1) -@Warmup(iterations = 15) -@Measurement(iterations = 30) +@Warmup(iterations = 3, time = 3) +@Measurement(iterations = 10, time = 3) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class ExpressionSelectorBenchmark { static { NullHandling.initializeForTests(); + ExpressionProcessing.initializeForTests(); } @Param({"1000000"}) @@ -451,6 +453,154 @@ public void stringConcatAndCompareOnLong(Blackhole blackhole) blackhole.consume(results); } + @Benchmark + public void caseSearched1(Blackhole blackhole) + { + final Sequence cursors = new QueryableIndexStorageAdapter(index).makeCursors( + null, + index.getDataInterval(), + VirtualColumns.create( + ImmutableList.of( + new ExpressionVirtualColumn( + "v", + "case_searched(s == 'asd' || isnull(s) || s == 'xxx', 1, s == 'foo' || s == 'bar', 2, 3)", + ColumnType.LONG, + TestExprMacroTable.INSTANCE + ) + ) + ), + Granularities.ALL, + false, + null + ); + + final List results = cursors + .map(cursor -> { + final ColumnValueSelector selector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v"); + consumeLong(cursor, selector, blackhole); + return null; + }) + .toList(); + + blackhole.consume(results); + } + + @Benchmark + public void caseSearched2(Blackhole blackhole) + { + final Sequence cursors = new QueryableIndexStorageAdapter(index).makeCursors( + null, + index.getDataInterval(), + VirtualColumns.create( + ImmutableList.of( + new ExpressionVirtualColumn( + "v", + 
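// Like caseSearched1 above, but the searched conditions mix the string
+            // column with numeric comparisons on the long column.
+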
"case_searched(s == 'asd' || isnull(s) || n == 1, 1, n == 2, 2, 3)", + ColumnType.LONG, + TestExprMacroTable.INSTANCE + ) + ) + ), + Granularities.ALL, + false, + null + ); + + final List results = cursors + .map(cursor -> { + final ColumnValueSelector selector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v"); + consumeLong(cursor, selector, blackhole); + return null; + }) + .toList(); + + blackhole.consume(results); + } + + @Benchmark + public void caseSearchedWithLookup(Blackhole blackhole) + { + final Sequence cursors = new QueryableIndexStorageAdapter(index).makeCursors( + null, + index.getDataInterval(), + VirtualColumns.create( + ImmutableList.of( + new ExpressionVirtualColumn( + "v", + "case_searched(n == 1001, -1, " + + "lookup(s, 'lookyloo') == 'asd1', 1, " + + "lookup(s, 'lookyloo') == 'asd2', 2, " + + "lookup(s, 'lookyloo') == 'asd3', 3, " + + "lookup(s, 'lookyloo') == 'asd4', 4, " + + "lookup(s, 'lookyloo') == 'asd5', 5, " + + "-2)", + ColumnType.LONG, + LookupEnabledTestExprMacroTable.INSTANCE + ) + ) + ), + Granularities.ALL, + false, + null + ); + + final List results = cursors + .map(cursor -> { + final ColumnValueSelector selector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v"); + consumeLong(cursor, selector, blackhole); + return null; + }) + .toList(); + + blackhole.consume(results); + } + + @Benchmark + public void caseSearchedWithLookup2(Blackhole blackhole) + { + final Sequence cursors = new QueryableIndexStorageAdapter(index).makeCursors( + null, + index.getDataInterval(), + VirtualColumns.create( + ImmutableList.of( + new ExpressionVirtualColumn( + "ll", + "lookup(s, 'lookyloo')", + ColumnType.STRING, + LookupEnabledTestExprMacroTable.INSTANCE + ), + new ExpressionVirtualColumn( + "v", + "case_searched(n == 1001, -1, " + + "ll == 'asd1', 1, " + + "ll == 'asd2', 2, " + + "ll == 'asd3', 3, " + + "ll == 'asd4', 4, " + + "ll == 'asd5', 5, " + + "-2)", + ColumnType.LONG, + LookupEnabledTestExprMacroTable.INSTANCE + ) + ) + ), + Granularities.ALL, + false, + null + ); + + final List results = cursors + .map(cursor -> { + final ColumnValueSelector selector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v"); + consumeLong(cursor, selector, blackhole); + return null; + }) + .toList(); + + blackhole.consume(results); + } + + + private void consumeDimension(final Cursor cursor, final DimensionSelector selector, final Blackhole blackhole) { if (selector.getValueCardinality() >= 0) { diff --git a/examples/bin/router.sh b/examples/bin/router.sh new file mode 100644 index 000000000000..5adc0c8623a4 --- /dev/null +++ b/examples/bin/router.sh @@ -0,0 +1,25 @@ +#!/bin/bash -eu +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+usage="Usage: router.sh (start|stop|status)"
+
+if [ $# -lt 1 ]; then
+  echo $usage
+  exit 1
+fi
+
+cd $(dirname $0)/../
+sh ./bin/node.sh router $1
diff --git a/extensions-contrib/cassandra-storage/pom.xml b/extensions-contrib/cassandra-storage/pom.xml
index 9dc2faefcec4..0760ed3cd3d4 100644
--- a/extensions-contrib/cassandra-storage/pom.xml
+++ b/extensions-contrib/cassandra-storage/pom.xml
@@ -33,6 +33,21 @@
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.yaml</groupId>
+                <artifactId>snakeyaml</artifactId>
+                <version>1.33</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.druid</groupId>
diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index 4a58464f7308..7485cbaab6de 100644
--- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -115,6 +115,7 @@ protected Producer<String, String> setKafkaProducer()
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
     props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES);
     props.putAll(config.getKafkaProducerConfig());
+    props.putAll(config.getKafkaProducerSecrets().getConfig());
     return new KafkaProducer<>(props);
   }
@@ -181,6 +182,10 @@ private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue kafkaProducerConfig;
+  @JsonProperty("producer.hiddenProperties")
+  private final DynamicConfigProvider kafkaProducerSecrets;
 
   @JsonCreator
   public KafkaEmitterConfig(
@@ -83,7 +87,8 @@ public KafkaEmitterConfig(
       @Nullable @JsonProperty("request.topic") String requestTopic,
       @Nullable @JsonProperty("segmentMetadata.topic") String segmentMetadataTopic,
       @JsonProperty("clusterName") String clusterName,
-      @JsonProperty("producer.config") @Nullable Map<String, String> kafkaProducerConfig
+      @JsonProperty("producer.config") @Nullable Map<String, String> kafkaProducerConfig,
+      @JsonProperty("producer.hiddenProperties") @Nullable DynamicConfigProvider kafkaProducerSecrets
   )
   {
     this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "druid.emitter.kafka.bootstrap.servers can not be null");
@@ -94,6 +99,7 @@ public KafkaEmitterConfig(
     this.segmentMetadataTopic = this.eventTypes.contains(EventType.SEGMENT_METADATA) ? Preconditions.checkNotNull(segmentMetadataTopic, "druid.emitter.kafka.segmentMetadata.topic can not be null") : null;
     this.clusterName = clusterName;
     this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig;
+    this.kafkaProducerSecrets = kafkaProducerSecrets == null ? new MapStringDynamicConfigProvider(ImmutableMap.of()) : kafkaProducerSecrets;
   }
 
   private Set<EventType> maybeUpdateEventTypes(Set<EventType> eventTypes, String requestTopic)
@@ -159,6 +165,12 @@ public Map<String, String> getKafkaProducerConfig()
     return kafkaProducerConfig;
   }
 
+  @JsonProperty
+  public DynamicConfigProvider getKafkaProducerSecrets()
+  {
+    return kafkaProducerSecrets;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -198,7 +210,10 @@ public boolean equals(Object o)
     if (getClusterName() != null ?
!getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) { return false; } - return getKafkaProducerConfig().equals(that.getKafkaProducerConfig()); + if (!getKafkaProducerConfig().equals(that.getKafkaProducerConfig())) { + return false; + } + return getKafkaProducerSecrets().getConfig().equals(that.getKafkaProducerSecrets().getConfig()); } @Override @@ -212,6 +227,7 @@ public int hashCode() result = 31 * result + (getSegmentMetadataTopic() != null ? getSegmentMetadataTopic().hashCode() : 0); result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0); result = 31 * result + getKafkaProducerConfig().hashCode(); + result = 31 * result + getKafkaProducerSecrets().getConfig().hashCode(); return result; } @@ -226,7 +242,8 @@ public String toString() ", request.topic='" + requestTopic + '\'' + ", segmentMetadata.topic='" + segmentMetadataTopic + '\'' + ", clusterName='" + clusterName + '\'' + - ", Producer.config=" + kafkaProducerConfig + + ", producer.config=" + kafkaProducerConfig + '\'' + + ", producer.hiddenProperties=" + kafkaProducerSecrets + '}'; } } diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java index c4d5811bcb53..603c8e6701bf 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -25,48 +25,67 @@ import com.fasterxml.jackson.databind.exc.ValueInstantiationException; import com.google.common.collect.ImmutableMap; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.metadata.DynamicConfigProvider; +import org.apache.druid.metadata.MapStringDynamicConfigProvider; import org.junit.Assert; import org.junit.Before; import org.junit.Test; + import java.io.IOException; import java.util.HashSet; import java.util.Set; public class KafkaEmitterConfigTest { - private ObjectMapper mapper = new DefaultObjectMapper(); + private static final DynamicConfigProvider DEFAULT_PRODUCER_SECRETS = new MapStringDynamicConfigProvider( + ImmutableMap.of("testSecretKey", "testSecretValue")); + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); @Before public void setUp() { - mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())); + MAPPER.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())); } @Test public void testSerDeserKafkaEmitterConfig() throws IOException { - KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", null, "metricTest", - "alertTest", "requestTest", "metadataTest", - "clusterNameTest", ImmutableMap.builder() - .put("testKey", "testValue").build() + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig( + "hostname", + null, + "metricTest", + "alertTest", + "requestTest", + "metadataTest", + "clusterNameTest", + ImmutableMap.builder() + .put("testKey", "testValue").build(), + DEFAULT_PRODUCER_SECRETS ); - String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); - KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) - .readValue(kafkaEmitterConfigString); + String kafkaEmitterConfigString = MAPPER.writeValueAsString(kafkaEmitterConfig); + KafkaEmitterConfig 
kafkaEmitterConfigExpected = MAPPER.readerFor(KafkaEmitterConfig.class) + .readValue(kafkaEmitterConfigString); Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); } @Test public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException { - KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", null, "metricTest", - "alertTest", null, "metadataTest", - "clusterNameTest", ImmutableMap.builder() - .put("testKey", "testValue").build() + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig( + "hostname", + null, + "metricTest", + "alertTest", + null, + "metadataTest", + "clusterNameTest", + ImmutableMap.builder() + .put("testKey", "testValue").build(), + DEFAULT_PRODUCER_SECRETS ); - String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); - KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) - .readValue(kafkaEmitterConfigString); + String kafkaEmitterConfigString = MAPPER.writeValueAsString(kafkaEmitterConfig); + KafkaEmitterConfig kafkaEmitterConfigExpected = MAPPER.readerFor(KafkaEmitterConfig.class) + .readValue(kafkaEmitterConfigString); Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); } @@ -75,27 +94,34 @@ public void testSerDeserKafkaEmitterConfigNullMetricsTopic() throws IOException { Set eventTypeSet = new HashSet(); eventTypeSet.add(KafkaEmitterConfig.EventType.SEGMENT_METADATA); - KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", eventTypeSet, null, - null, null, "metadataTest", - "clusterNameTest", ImmutableMap.builder() - .put("testKey", "testValue").build() + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig( + "hostname", + eventTypeSet, + null, + null, + null, + "metadataTest", + "clusterNameTest", + ImmutableMap.builder() + .put("testKey", "testValue").build(), + DEFAULT_PRODUCER_SECRETS ); - String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); - KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) - .readValue(kafkaEmitterConfigString); + String kafkaEmitterConfigString = MAPPER.writeValueAsString(kafkaEmitterConfig); + KafkaEmitterConfig kafkaEmitterConfigExpected = MAPPER.readerFor(KafkaEmitterConfig.class) + .readValue(kafkaEmitterConfigString); Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); } @Test - public void testSerDeNotRequiredKafkaProducerConfig() + public void testSerDeNotRequiredKafkaProducerConfigOrKafkaSecretProducer() { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", null, "metricTest", - "alertTest", null, "metadataTest", - "clusterNameTest", null + "alertTest", null, "metadataTest", + "clusterNameTest", null, null ); try { @SuppressWarnings("unused") - KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, mapper); + KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, MAPPER); } catch (NullPointerException e) { Assert.fail(); @@ -105,9 +131,18 @@ public void testSerDeNotRequiredKafkaProducerConfig() @Test public void testDeserializeEventTypesWithDifferentCase() throws JsonProcessingException { - Assert.assertEquals(KafkaEmitterConfig.EventType.SEGMENT_METADATA, mapper.readValue("\"segment_metadata\"", KafkaEmitterConfig.EventType.class)); - Assert.assertEquals(KafkaEmitterConfig.EventType.ALERTS, mapper.readValue("\"alerts\"", KafkaEmitterConfig.EventType.class)); - Assert.assertThrows(ValueInstantiationException.class, () -> 
mapper.readValue("\"segmentMetadata\"", KafkaEmitterConfig.EventType.class));
+    Assert.assertEquals(
+        KafkaEmitterConfig.EventType.SEGMENT_METADATA,
+        MAPPER.readValue("\"segment_metadata\"", KafkaEmitterConfig.EventType.class)
+    );
+    Assert.assertEquals(
+        KafkaEmitterConfig.EventType.ALERTS,
+        MAPPER.readValue("\"alerts\"", KafkaEmitterConfig.EventType.class)
+    );
+    Assert.assertThrows(
+        ValueInstantiationException.class,
+        () -> MAPPER.readValue("\"segmentMetadata\"", KafkaEmitterConfig.EventType.class)
+    );
   }
 
   @Test
diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
index 9e6846a5d8b8..1c30bee12fa2 100644
--- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
+++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -106,7 +106,7 @@ public void testKafkaEmitter() throws InterruptedException
     ObjectMapper mapper = new ObjectMapper();
     mapper.registerModule(new JodaModule());
     final KafkaEmitter kafkaEmitter = new KafkaEmitter(
-        new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", null),
+        new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", null, null),
         mapper
     )
     {
@@ -140,6 +140,8 @@ protected Producer<String, String> setKafkaProducer()
     }
     countDownSentEvents.await();
 
+    kafkaEmitter.close();
+
     Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
     Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
     Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount());
diff --git a/extensions-contrib/kubernetes-overlord-extensions/pom.xml b/extensions-contrib/kubernetes-overlord-extensions/pom.xml
index 346cc3a00a9c..84770340f240 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/pom.xml
+++ b/extensions-contrib/kubernetes-overlord-extensions/pom.xml
@@ -34,6 +34,18 @@
         <relativePath>../../pom.xml</relativePath>
     </parent>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.yaml</groupId>
+                <artifactId>snakeyaml</artifactId>
+                <version>1.33</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/endpoint/BasicAuthenticatorResource.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/endpoint/BasicAuthenticatorResource.java
index 23d8316771c4..fd0b029530c0 100644
--- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/endpoint/BasicAuthenticatorResource.java
+++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/endpoint/BasicAuthenticatorResource.java
@@ -22,10 +22,14 @@
 import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
 import com.google.inject.Inject;
 import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.audit.AuditEntry;
+import org.apache.druid.audit.AuditManager;
 import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.security.basic.BasicSecurityResourceFilter;
 import org.apache.druid.security.basic.authentication.entity.BasicAuthenticatorCredentialUpdate;
 import org.apache.druid.server.security.AuthValidator;
+import org.apache.druid.server.security.AuthorizationUtils;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
@@ -45,15 +49,18 @@
 public class BasicAuthenticatorResource
 {
   private final
BasicAuthenticatorResourceHandler handler; private final AuthValidator authValidator; + private final AuditManager auditManager; @Inject public BasicAuthenticatorResource( BasicAuthenticatorResourceHandler handler, - AuthValidator authValidator + AuthValidator authValidator, + AuditManager auditManager ) { this.handler = handler; this.authValidator = authValidator; + this.auditManager = auditManager; } /** @@ -151,7 +158,11 @@ public Response createUser( ) { authValidator.validateAuthenticatorName(authenticatorName); - return handler.createUser(authenticatorName, userName); + + final Response response = handler.createUser(authenticatorName, userName); + performAuditIfSuccess(authenticatorName, req, response, "Create user[%s]", userName); + + return response; } /** @@ -174,7 +185,10 @@ public Response deleteUser( ) { authValidator.validateAuthenticatorName(authenticatorName); - return handler.deleteUser(authenticatorName, userName); + final Response response = handler.deleteUser(authenticatorName, userName); + performAuditIfSuccess(authenticatorName, req, response, "Delete user[%s]", userName); + + return response; } /** @@ -198,7 +212,10 @@ public Response updateUserCredentials( ) { authValidator.validateAuthenticatorName(authenticatorName); - return handler.updateUserCredentials(authenticatorName, userName, update); + final Response response = handler.updateUserCredentials(authenticatorName, userName, update); + performAuditIfSuccess(authenticatorName, req, response, "Update credentials for user[%s]", userName); + + return response; } /** @@ -237,4 +254,35 @@ public Response authenticatorUpdateListener( authValidator.validateAuthenticatorName(authenticatorName); return handler.authenticatorUserUpdateListener(authenticatorName, serializedUserMap); } + + private boolean isSuccess(Response response) + { + if (response == null) { + return false; + } + + int responseCode = response.getStatus(); + return responseCode >= 200 && responseCode < 300; + } + + private void performAuditIfSuccess( + String authenticatorName, + HttpServletRequest request, + Response response, + String payloadFormat, + Object... 
payloadArgs + ) + { + if (isSuccess(response)) { + auditManager.doAudit( + AuditEntry.builder() + .key(authenticatorName) + .type("basic.authenticator") + .auditInfo(AuthorizationUtils.buildAuditInfo(request)) + .request(AuthorizationUtils.buildRequestInfo("coordinator", request)) + .payload(StringUtils.format(payloadFormat, payloadArgs)) + .build() + ); + } + } } diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/endpoint/BasicAuthorizerResource.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/endpoint/BasicAuthorizerResource.java index cb8a9fa2a240..700d86b3b040 100644 --- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/endpoint/BasicAuthorizerResource.java +++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/endpoint/BasicAuthorizerResource.java @@ -22,10 +22,14 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; +import org.apache.druid.audit.AuditEntry; +import org.apache.druid.audit.AuditManager; import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.security.basic.BasicSecurityResourceFilter; import org.apache.druid.security.basic.authorization.entity.BasicAuthorizerGroupMapping; import org.apache.druid.server.security.AuthValidator; +import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.ResourceAction; import javax.servlet.http.HttpServletRequest; @@ -48,15 +52,18 @@ public class BasicAuthorizerResource { private final BasicAuthorizerResourceHandler resourceHandler; private final AuthValidator authValidator; + private final AuditManager auditManager; @Inject public BasicAuthorizerResource( BasicAuthorizerResourceHandler resourceHandler, - AuthValidator authValidator + AuthValidator authValidator, + AuditManager auditManager ) { this.resourceHandler = resourceHandler; this.authValidator = authValidator; + this.auditManager = auditManager; } /** @@ -198,7 +205,11 @@ public Response createUser( ) { authValidator.validateAuthorizerName(authorizerName); - return resourceHandler.createUser(authorizerName, userName); + + final Response response = resourceHandler.createUser(authorizerName, userName); + performAuditIfSuccess(authorizerName, req, response, "Create user[%s]", userName); + + return response; } /** @@ -221,7 +232,11 @@ public Response deleteUser( ) { authValidator.validateAuthorizerName(authorizerName); - return resourceHandler.deleteUser(authorizerName, userName); + + final Response response = resourceHandler.deleteUser(authorizerName, userName); + performAuditIfSuccess(authorizerName, req, response, "Delete user[%s]", userName); + + return response; } /** @@ -245,10 +260,21 @@ public Response createGroupMapping( ) { authValidator.validateAuthorizerName(authorizerName); - return resourceHandler.createGroupMapping( + final Response response = resourceHandler.createGroupMapping( authorizerName, new BasicAuthorizerGroupMapping(groupMappingName, groupMapping.getGroupPattern(), groupMapping.getRoles()) ); + performAuditIfSuccess( + authorizerName, + req, + response, + "Create groupMapping[%s] with pattern[%s], roles[%s]", + groupMappingName, + groupMapping.getGroupPattern(), + groupMapping.getRoles() + ); + + return response; } /** @@ -271,7 +297,11 @@ public 
Response deleteGroupMapping( ) { authValidator.validateAuthorizerName(authorizerName); - return resourceHandler.deleteGroupMapping(authorizerName, groupMappingName); + + final Response response = resourceHandler.deleteGroupMapping(authorizerName, groupMappingName); + performAuditIfSuccess(authorizerName, req, response, "Delete groupMapping[%s]", groupMappingName); + + return response; } /** @@ -338,7 +368,11 @@ public Response createRole( ) { authValidator.validateAuthorizerName(authorizerName); - return resourceHandler.createRole(authorizerName, roleName); + + final Response response = resourceHandler.createRole(authorizerName, roleName); + performAuditIfSuccess(authorizerName, req, response, "Create role[%s]", roleName); + + return response; } /** @@ -361,7 +395,11 @@ public Response deleteRole( ) { authValidator.validateAuthorizerName(authorizerName); - return resourceHandler.deleteRole(authorizerName, roleName); + + final Response response = resourceHandler.deleteRole(authorizerName, roleName); + performAuditIfSuccess(authorizerName, req, response, "Delete role[%s]", roleName); + + return response; } /** @@ -386,7 +424,11 @@ public Response assignRoleToUser( ) { authValidator.validateAuthorizerName(authorizerName); - return resourceHandler.assignRoleToUser(authorizerName, userName, roleName); + + final Response response = resourceHandler.assignRoleToUser(authorizerName, userName, roleName); + performAuditIfSuccess(authorizerName, req, response, "Assign role[%s] to user[%s]", roleName, userName); + + return response; } /** @@ -411,7 +453,11 @@ public Response unassignRoleFromUser( ) { authValidator.validateAuthorizerName(authorizerName); - return resourceHandler.unassignRoleFromUser(authorizerName, userName, roleName); + + final Response response = resourceHandler.unassignRoleFromUser(authorizerName, userName, roleName); + performAuditIfSuccess(authorizerName, req, response, "Unassign role[%s] from user[%s]", roleName, userName); + + return response; } /** @@ -436,7 +482,12 @@ public Response assignRoleToGroupMapping( ) { authValidator.validateAuthorizerName(authorizerName); - return resourceHandler.assignRoleToGroupMapping(authorizerName, groupMappingName, roleName); + final Response response = resourceHandler.assignRoleToGroupMapping(authorizerName, groupMappingName, roleName); + + String msgFormat = "Assign role[%s] to groupMapping[%s]"; + performAuditIfSuccess(authorizerName, req, response, msgFormat, roleName, groupMappingName); + + return response; } /** @@ -461,7 +512,12 @@ public Response unassignRoleFromGroupMapping( ) { authValidator.validateAuthorizerName(authorizerName); - return resourceHandler.unassignRoleFromGroupMapping(authorizerName, groupMappingName, roleName); + + final Response response = resourceHandler.unassignRoleFromGroupMapping(authorizerName, groupMappingName, roleName); + String msgFormat = "Unassign role[%s] from groupMapping[%s]"; + performAuditIfSuccess(authorizerName, req, response, msgFormat, roleName, groupMappingName); + + return response; } /** @@ -486,7 +542,11 @@ public Response setRolePermissions( ) { authValidator.validateAuthorizerName(authorizerName); - return resourceHandler.setRolePermissions(authorizerName, roleName, permissions); + + final Response response = resourceHandler.setRolePermissions(authorizerName, roleName, permissions); + performAuditIfSuccess(authorizerName, req, response, "Set permissions[%s] for role[%s]", permissions, roleName); + + return response; } /** @@ -607,4 +667,35 @@ public Response 
authorizerGroupMappingUpdateListener( authValidator.validateAuthorizerName(authorizerName); return resourceHandler.authorizerGroupMappingUpdateListener(authorizerName, serializedGroupMappingAndRoleMap); } + + private boolean isSuccess(Response response) + { + if (response == null) { + return false; + } + + int responseCode = response.getStatus(); + return responseCode >= 200 && responseCode < 300; + } + + private void performAuditIfSuccess( + String authorizerName, + HttpServletRequest request, + Response response, + String msgFormat, + Object... args + ) + { + if (isSuccess(response)) { + auditManager.doAudit( + AuditEntry.builder() + .key(authorizerName) + .type("basic.authorizer") + .auditInfo(AuthorizationUtils.buildAuditInfo(request)) + .request(AuthorizationUtils.buildRequestInfo("coordinator", request)) + .payload(StringUtils.format(msgFormat, args)) + .build() + ); + } + } } diff --git a/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/authentication/CoordinatorBasicAuthenticatorResourceTest.java b/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/authentication/CoordinatorBasicAuthenticatorResourceTest.java index b383a6d3d92f..5dec8646573b 100644 --- a/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/authentication/CoordinatorBasicAuthenticatorResourceTest.java +++ b/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/authentication/CoordinatorBasicAuthenticatorResourceTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.audit.AuditManager; import org.apache.druid.metadata.DefaultPasswordProvider; import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.TestDerbyConnector; @@ -35,7 +36,9 @@ import org.apache.druid.security.basic.authentication.entity.BasicAuthenticatorCredentialUpdate; import org.apache.druid.security.basic.authentication.entity.BasicAuthenticatorCredentials; import org.apache.druid.security.basic.authentication.entity.BasicAuthenticatorUser; +import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthValidator; +import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.AuthenticatorMapper; import org.easymock.EasyMock; import org.junit.After; @@ -50,6 +53,7 @@ import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; +import java.util.Collections; import java.util.Map; import java.util.Set; @@ -68,6 +72,9 @@ public class CoordinatorBasicAuthenticatorResourceTest @Mock private AuthValidator authValidator; + + @Mock + private AuditManager auditManager; private BasicAuthenticatorResource resource; private CoordinatorBasicAuthenticatorMetadataStorageUpdater storageUpdater; private HttpServletRequest req; @@ -77,6 +84,13 @@ public class CoordinatorBasicAuthenticatorResourceTest public void setUp() { req = EasyMock.createStrictMock(HttpServletRequest.class); + EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn("author").anyTimes(); + EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn("comment").anyTimes(); + EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn( + new AuthenticationResult("id", "authorizer", "authBy", Collections.emptyMap()) + ).anyTimes(); + 
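// Building an audit entry reads the author/comment headers, the
+    // authentication result, and the remote address, so the strict mock
+    // must stub each of those request reads.
+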
EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes(); + EasyMock.replay(req); objectMapper = new ObjectMapper(new SmileFactory()); TestDerbyConnector connector = derbyConnectorRule.getConnector(); @@ -145,7 +159,8 @@ public void setUp() authenticatorMapper, objectMapper ), - authValidator + authValidator, + auditManager ); storageUpdater.start(); @@ -155,12 +170,15 @@ public void setUp() public void tearDown() { storageUpdater.stop(); + if (req != null) { + EasyMock.verify(req); + } } @Test public void testInvalidAuthenticator() { - Response response = resource.getAllUsers(req, "invalidName"); + Response response = resource.getAllUsers(mockHttpRequestNoAudit(), "invalidName"); Assert.assertEquals(400, response.getStatus()); Assert.assertEquals( errorMapWithMsg("Basic authenticator with name [invalidName] does not exist."), @@ -171,13 +189,13 @@ public void testInvalidAuthenticator() @Test public void testGetAllUsers() { - Response response = resource.getAllUsers(req, AUTHENTICATOR_NAME); + Response response = resource.getAllUsers(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableSet.of(BasicAuthUtils.ADMIN_NAME, BasicAuthUtils.INTERNAL_USER_NAME), response.getEntity()); - resource.createUser(req, AUTHENTICATOR_NAME, "druid"); - resource.createUser(req, AUTHENTICATOR_NAME, "druid2"); - resource.createUser(req, AUTHENTICATOR_NAME, "druid3"); + resource.createUser(mockHttpRequest(), AUTHENTICATOR_NAME, "druid"); + resource.createUser(mockHttpRequest(), AUTHENTICATOR_NAME, "druid2"); + resource.createUser(mockHttpRequest(), AUTHENTICATOR_NAME, "druid3"); Set expectedUsers = ImmutableSet.of( BasicAuthUtils.ADMIN_NAME, @@ -187,12 +205,12 @@ public void testGetAllUsers() "druid3" ); - response = resource.getAllUsers(req, AUTHENTICATOR_NAME); + response = resource.getAllUsers(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(expectedUsers, response.getEntity()); // Verify cached user map is also getting updated - response = resource.getCachedSerializedUserMap(req, AUTHENTICATOR_NAME); + response = resource.getCachedSerializedUserMap(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME); Assert.assertEquals(200, response.getStatus()); Assert.assertTrue(response.getEntity() instanceof byte[]); Map cachedUserMap = BasicAuthUtils.deserializeAuthenticatorUserMap(objectMapper, (byte[]) response.getEntity()); @@ -211,17 +229,17 @@ public void testGetAllUsers() @Test public void testGetAllUsersSeparateDatabaseTables() { - Response response = resource.getAllUsers(req, AUTHENTICATOR_NAME); + Response response = resource.getAllUsers(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableSet.of(BasicAuthUtils.ADMIN_NAME, BasicAuthUtils.INTERNAL_USER_NAME), response.getEntity()); - resource.createUser(req, AUTHENTICATOR_NAME, "druid"); - resource.createUser(req, AUTHENTICATOR_NAME, "druid2"); - resource.createUser(req, AUTHENTICATOR_NAME, "druid3"); + resource.createUser(mockHttpRequest(), AUTHENTICATOR_NAME, "druid"); + resource.createUser(mockHttpRequest(), AUTHENTICATOR_NAME, "druid2"); + resource.createUser(mockHttpRequest(), AUTHENTICATOR_NAME, "druid3"); - resource.createUser(req, AUTHENTICATOR_NAME2, "druid4"); - resource.createUser(req, AUTHENTICATOR_NAME2, "druid5"); - resource.createUser(req, AUTHENTICATOR_NAME2, "druid6"); + resource.createUser(mockHttpRequest(), AUTHENTICATOR_NAME2, 
"druid4"); + resource.createUser(mockHttpRequest(), AUTHENTICATOR_NAME2, "druid5"); + resource.createUser(mockHttpRequest(), AUTHENTICATOR_NAME2, "druid6"); Set expectedUsers = ImmutableSet.of( BasicAuthUtils.ADMIN_NAME, @@ -239,12 +257,12 @@ public void testGetAllUsersSeparateDatabaseTables() "druid6" ); - response = resource.getAllUsers(req, AUTHENTICATOR_NAME); + response = resource.getAllUsers(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(expectedUsers, response.getEntity()); // Verify cached user map for AUTHENTICATOR_NAME authenticator is also getting updated - response = resource.getCachedSerializedUserMap(req, AUTHENTICATOR_NAME); + response = resource.getCachedSerializedUserMap(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME); Assert.assertEquals(200, response.getStatus()); Assert.assertTrue(response.getEntity() instanceof byte[]); @@ -260,12 +278,12 @@ public void testGetAllUsersSeparateDatabaseTables() Assert.assertNotNull(cachedUserMap.get("druid3")); Assert.assertEquals(cachedUserMap.get("druid3").getName(), "druid3"); - response = resource.getAllUsers(req, AUTHENTICATOR_NAME2); + response = resource.getAllUsers(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME2); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(expectedUsers2, response.getEntity()); // Verify cached user map for each AUTHENTICATOR_NAME2 is also getting updated - response = resource.getCachedSerializedUserMap(req, AUTHENTICATOR_NAME2); + response = resource.getCachedSerializedUserMap(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME2); Assert.assertEquals(200, response.getStatus()); Assert.assertTrue(response.getEntity() instanceof byte[]); @@ -285,29 +303,29 @@ public void testGetAllUsersSeparateDatabaseTables() @Test public void testCreateDeleteUser() { - Response response = resource.createUser(req, AUTHENTICATOR_NAME, "druid"); + Response response = resource.createUser(mockHttpRequest(), AUTHENTICATOR_NAME, "druid"); Assert.assertEquals(200, response.getStatus()); - response = resource.getUser(req, AUTHENTICATOR_NAME, "druid"); + response = resource.getUser(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME, "druid"); Assert.assertEquals(200, response.getStatus()); BasicAuthenticatorUser expectedUser = new BasicAuthenticatorUser("druid", null); Assert.assertEquals(expectedUser, response.getEntity()); - response = resource.deleteUser(req, AUTHENTICATOR_NAME, "druid"); + response = resource.deleteUser(mockHttpRequest(), AUTHENTICATOR_NAME, "druid"); Assert.assertEquals(200, response.getStatus()); - response = resource.getCachedSerializedUserMap(req, AUTHENTICATOR_NAME); + response = resource.getCachedSerializedUserMap(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME); Assert.assertEquals(200, response.getStatus()); Assert.assertTrue(response.getEntity() instanceof byte[]); Map cachedUserMap = BasicAuthUtils.deserializeAuthenticatorUserMap(objectMapper, (byte[]) response.getEntity()); Assert.assertNotNull(cachedUserMap); Assert.assertNull(cachedUserMap.get("druid")); - response = resource.deleteUser(req, AUTHENTICATOR_NAME, "druid"); + response = resource.deleteUser(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME, "druid"); Assert.assertEquals(400, response.getStatus()); Assert.assertEquals(errorMapWithMsg("User [druid] does not exist."), response.getEntity()); - response = resource.getUser(req, AUTHENTICATOR_NAME, "druid"); + response = resource.getUser(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME, "druid"); Assert.assertEquals(400, 
response.getStatus()); Assert.assertEquals(errorMapWithMsg("User [druid] does not exist."), response.getEntity()); } @@ -315,18 +333,18 @@ public void testCreateDeleteUser() @Test public void testUserCredentials() { - Response response = resource.createUser(req, AUTHENTICATOR_NAME, "druid"); + Response response = resource.createUser(mockHttpRequest(), AUTHENTICATOR_NAME, "druid"); Assert.assertEquals(200, response.getStatus()); response = resource.updateUserCredentials( - req, + mockHttpRequest(), AUTHENTICATOR_NAME, "druid", new BasicAuthenticatorCredentialUpdate("helloworld", null) ); Assert.assertEquals(200, response.getStatus()); - response = resource.getUser(req, AUTHENTICATOR_NAME, "druid"); + response = resource.getUser(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME, "druid"); Assert.assertEquals(200, response.getStatus()); BasicAuthenticatorUser actualUser = (BasicAuthenticatorUser) response.getEntity(); Assert.assertEquals("druid", actualUser.getName()); @@ -346,7 +364,7 @@ public void testUserCredentials() ); Assert.assertArrayEquals(recalculatedHash, hash); - response = resource.getCachedSerializedUserMap(req, AUTHENTICATOR_NAME); + response = resource.getCachedSerializedUserMap(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME); Assert.assertEquals(200, response.getStatus()); Assert.assertTrue(response.getEntity() instanceof byte[]); Map cachedUserMap = BasicAuthUtils.deserializeAuthenticatorUserMap(objectMapper, (byte[]) response.getEntity()); @@ -369,15 +387,15 @@ public void testUserCredentials() ); Assert.assertArrayEquals(recalculatedHash, hash); - response = resource.deleteUser(req, AUTHENTICATOR_NAME, "druid"); + response = resource.deleteUser(mockHttpRequest(), AUTHENTICATOR_NAME, "druid"); Assert.assertEquals(200, response.getStatus()); - response = resource.getUser(req, AUTHENTICATOR_NAME, "druid"); + response = resource.getUser(mockHttpRequestNoAudit(), AUTHENTICATOR_NAME, "druid"); Assert.assertEquals(400, response.getStatus()); Assert.assertEquals(errorMapWithMsg("User [druid] does not exist."), response.getEntity()); response = resource.updateUserCredentials( - req, + mockHttpRequestNoAudit(), AUTHENTICATOR_NAME, "druid", new BasicAuthenticatorCredentialUpdate("helloworld", null) @@ -386,6 +404,36 @@ public void testUserCredentials() Assert.assertEquals(errorMapWithMsg("User [druid] does not exist."), response.getEntity()); } + private HttpServletRequest mockHttpRequestNoAudit() + { + if (req != null) { + EasyMock.verify(req); + } + req = EasyMock.createStrictMock(HttpServletRequest.class); + EasyMock.replay(req); + return req; + } + + private HttpServletRequest mockHttpRequest() + { + if (req != null) { + EasyMock.verify(req); + } + req = EasyMock.createStrictMock(HttpServletRequest.class); + EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn("author").once(); + EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn("comment").once(); + EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn( + new AuthenticationResult("id", "authorizer", "authBy", Collections.emptyMap()) + ).once(); + EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").once(); + EasyMock.expect(req.getMethod()).andReturn("GET").once(); + EasyMock.expect(req.getRequestURI()).andReturn("uri").once(); + EasyMock.expect(req.getQueryString()).andReturn("a=b").once(); + EasyMock.replay(req); + + return req; + } + private static Map errorMapWithMsg(String errorMsg) { return ImmutableMap.of("error", errorMsg); diff --git 
a/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/authorization/CoordinatorBasicAuthorizerResourceTest.java b/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/authorization/CoordinatorBasicAuthorizerResourceTest.java index 746d3339f974..225424daf4e2 100644 --- a/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/authorization/CoordinatorBasicAuthorizerResourceTest.java +++ b/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/authorization/CoordinatorBasicAuthorizerResourceTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import org.apache.druid.audit.AuditManager; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.metadata.MetadataStorageTablesConfig; @@ -55,7 +56,6 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @@ -78,15 +78,14 @@ public class CoordinatorBasicAuthorizerResourceTest private static final String AUTHORIZER_NAME2 = "test2"; private static final String AUTHORIZER_NAME3 = "test3"; - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Rule public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); @Mock private AuthValidator authValidator; @Mock private HttpServletRequest req; + @Mock + private AuditManager auditManager; private TestDerbyConnector connector; private MetadataStorageTablesConfig tablesConfig; @@ -154,7 +153,8 @@ public void setUp() authorizerMapper, new ObjectMapper(new SmileFactory()) ), - authValidator + authValidator, + auditManager ); storageUpdater.start(); diff --git a/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/basic/authentication/endpoint/BasicAuthenticatorResourceTest.java b/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/basic/authentication/endpoint/BasicAuthenticatorResourceTest.java index fc403b08ca30..6545b880a065 100644 --- a/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/basic/authentication/endpoint/BasicAuthenticatorResourceTest.java +++ b/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/basic/authentication/endpoint/BasicAuthenticatorResourceTest.java @@ -58,7 +58,7 @@ public void setUp() .when(authValidator) .validateAuthenticatorName(INVALID_AUTHENTICATOR_NAME); - target = new BasicAuthenticatorResource(handler, authValidator); + target = new BasicAuthenticatorResource(handler, authValidator, null); } @Test diff --git a/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/basic/authorization/endpoint/BasicAuthorizerResourceTest.java b/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/basic/authorization/endpoint/BasicAuthorizerResourceTest.java index de0bf1db4e32..6350d3f91399 100644 --- a/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/basic/authorization/endpoint/BasicAuthorizerResourceTest.java +++ b/extensions-core/druid-basic-security/src/test/java/org/apache/druid/security/basic/authorization/endpoint/BasicAuthorizerResourceTest.java @@ -68,7 +68,7 @@ public void setUp() 
.when(authValidator) .validateAuthorizerName(INVALID_AUTHORIZER_NAME); - target = new BasicAuthorizerResource(resourceHandler, authValidator); + target = new BasicAuthorizerResource(resourceHandler, authValidator, null); } @Test diff --git a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModuleTest.java b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModuleTest.java index 0e2d6c359b5b..b1c4922ea643 100644 --- a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModuleTest.java +++ b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModuleTest.java @@ -32,6 +32,7 @@ import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.MetadataConfigModule; import org.apache.druid.guice.annotations.Json; +import org.apache.druid.guice.security.EscalatorModule; import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.junit.Assert; @@ -111,6 +112,7 @@ private Injector createInjector() MySQLMetadataStorageModule module = new MySQLMetadataStorageModule(); Injector injector = GuiceInjectors.makeStartupInjectorWithModules( ImmutableList.of( + new EscalatorModule(), new MetadataConfigModule(), new LifecycleModule(), module, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 01af55eadb31..7afaaa905264 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -24,12 +24,12 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.audit.AuditEntry; -import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.common.config.ConfigManager.SetResult; @@ -37,6 +37,7 @@ import org.apache.druid.error.DruidException; import org.apache.druid.error.ErrorResponse; import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskIdentifier; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -93,7 +94,6 @@ import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; -import javax.ws.rs.HeaderParam; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -139,6 +139,8 @@ public class OverlordResource private AtomicReference workerConfigRef = null; private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete"); + private static final Set AUDITED_TASK_TYPES + = ImmutableSet.of("index", "index_parallel", "compact", "index_hadoop", "kill"); private enum TaskStateLookup { @@ -193,7 +195,10 @@ public OverlordResource( @Path("/task") @Consumes(MediaType.APPLICATION_JSON) 
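  // Successful submissions of audited task types (AUDITED_TASK_TYPES)
  // are recorded via the AuditManager once the task enters the queue.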
@Produces(MediaType.APPLICATION_JSON) - public Response taskPost(final Task task, @Context final HttpServletRequest req) + public Response taskPost( + final Task task, + @Context final HttpServletRequest req + ) { final Set resourceActions; try { @@ -201,13 +206,8 @@ public Response taskPost(final Task task, @Context final HttpServletRequest req) } catch (UOE e) { return Response.status(Response.Status.BAD_REQUEST) - .entity( - ImmutableMap.of( - "error", - e.getMessage() - ) - ) - .build(); + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); } Access authResult = AuthorizationUtils.authorizeAllResourceActions( @@ -225,18 +225,31 @@ public Response taskPost(final Task task, @Context final HttpServletRequest req) taskQueue -> { try { taskQueue.add(task); + + if (AUDITED_TASK_TYPES.contains(task.getType())) { + auditManager.doAudit( + AuditEntry.builder() + .key(task.getDataSource()) + .type("task") + .request(AuthorizationUtils.buildRequestInfo("overlord", req)) + .payload(new TaskIdentifier(task.getId(), task.getGroupId(), task.getType())) + .auditInfo(AuthorizationUtils.buildAuditInfo(req)) + .build() + ); + } + return Response.ok(ImmutableMap.of("task", task.getId())).build(); } catch (DruidException e) { return Response - .status(e.getStatusCode()) - .entity(new ErrorResponse(e)) - .build(); + .status(e.getStatusCode()) + .entity(new ErrorResponse(e)) + .build(); } catch (org.apache.druid.common.exception.DruidException e) { return Response.status(e.getResponseCode()) - .entity(ImmutableMap.of("error", e.getMessage())) - .build(); + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); } } ); @@ -541,15 +554,13 @@ public Response getTotalWorkerCapacity() @ResourceFilters(ConfigResourceFilter.class) public Response setWorkerConfig( final WorkerBehaviorConfig workerBehaviorConfig, - @HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String author, - @HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final String comment, @Context final HttpServletRequest req ) { final SetResult setResult = configManager.set( WorkerBehaviorConfig.CONFIG_KEY, workerBehaviorConfig, - new AuditInfo(author, comment, req.getRemoteAddr()) + AuthorizationUtils.buildAuditInfo(req) ); if (setResult.isOk()) { log.info("Updating Worker configs: %s", workerBehaviorConfig); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index 13c499e47e2a..4ccb87077506 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -55,11 +55,11 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -70,9 +70,6 @@ @RunWith(Parameterized.class) public class SegmentAllocateActionTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Rule public TaskActionTestKit taskActionTestKit = new TaskActionTestKit(); @@ -403,6 +400,72 @@ public void testResumeSequence() assertSameIdentifier(id2, id7); } + @Test + public void testSegmentIsAllocatedForLatestUsedSegmentVersion() throws 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
index 13c499e47e2a..4ccb87077506 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -55,11 +55,11 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -70,9 +70,6 @@
 @RunWith(Parameterized.class)
 public class SegmentAllocateActionTest
 {
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
   @Rule
   public TaskActionTestKit taskActionTestKit = new TaskActionTestKit();
 
@@ -403,6 +400,72 @@ public void testResumeSequence()
     assertSameIdentifier(id2, id7);
   }
 
+  @Test
+  public void testSegmentIsAllocatedForLatestUsedSegmentVersion() throws IOException
+  {
+    final Task task = NoopTask.create();
+    taskActionTestKit.getTaskLockbox().add(task);
+
+    final String sequenceName = "sequence_1";
+
+    // Allocate segments when there are no committed segments
+    final SegmentIdWithShardSpec pendingSegmentV01 =
+        allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null);
+    final SegmentIdWithShardSpec pendingSegmentV02 =
+        allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null);
+
+    assertSameIdentifier(pendingSegmentV01, pendingSegmentV02);
+
+    // Commit a segment for version V1
+    final DataSegment segmentV1
+        = DataSegment.builder()
+                     .dataSource(DATA_SOURCE)
+                     .interval(Granularities.HOUR.bucket(PARTY_TIME))
+                     .version(PARTY_TIME.plusDays(1).toString())
+                     .shardSpec(new LinearShardSpec(0))
+                     .size(100)
+                     .build();
+    taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
+        Collections.singleton(segmentV1)
+    );
+
+    // Verify that new allocations use version V1
+    final SegmentIdWithShardSpec pendingSegmentV11 =
+        allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null);
+    final SegmentIdWithShardSpec pendingSegmentV12 =
+        allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null);
+
+    assertSameIdentifier(pendingSegmentV11, pendingSegmentV12);
+    Assert.assertEquals(segmentV1.getVersion(), pendingSegmentV11.getVersion());
+
+    Assert.assertNotEquals(pendingSegmentV01, pendingSegmentV11);
+
+    // Commit a segment for version V2 to overshadow V1
+    final DataSegment segmentV2
+        = DataSegment.builder()
+                     .dataSource(DATA_SOURCE)
+                     .interval(Granularities.HOUR.bucket(PARTY_TIME))
+                     .version(PARTY_TIME.plusDays(2).toString())
+                     .shardSpec(new LinearShardSpec(0))
+                     .size(100)
+                     .build();
+    taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
+        Collections.singleton(segmentV2)
+    );
+    Assert.assertTrue(segmentV2.getVersion().compareTo(segmentV1.getVersion()) > 0);
+
+    // Verify that new segment allocations use version V2
+    final SegmentIdWithShardSpec pendingSegmentV21 =
+        allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null);
+    final SegmentIdWithShardSpec pendingSegmentV22 =
+        allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null);
+    assertSameIdentifier(pendingSegmentV21, pendingSegmentV22);
+    Assert.assertEquals(segmentV2.getVersion(), pendingSegmentV21.getVersion());
+
+    Assert.assertNotEquals(pendingSegmentV21, pendingSegmentV01);
+    Assert.assertNotEquals(pendingSegmentV21, pendingSegmentV11);
+  }
+
   @Test
   public void testMultipleSequences()
   {
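The new test leans on the fact that segment versions are timestamp strings whose lexicographic order matches chronological order, which is why PARTY_TIME.plusDays(2).toString() overshadows PARTY_TIME.plusDays(1).toString() and why the test can assert compareTo(...) > 0 directly. A minimal standalone illustration (dates arbitrary):

    // Lexicographic comparison of ISO-8601 strings tracks chronological order.
    String v1 = "2024-01-02T00:00:00.000Z";   // stands in for PARTY_TIME.plusDays(1)
    String v2 = "2024-01-03T00:00:00.000Z";   // stands in for PARTY_TIME.plusDays(2)
    assert v2.compareTo(v1) > 0;               // V2 sorts after V1, so V2 overshadows V1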
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index 6eed9e32df38..561e8dd9b199 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -25,6 +25,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.audit.AuditEntry;
+import org.apache.druid.audit.AuditManager;
 import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskInfo;
@@ -32,6 +34,7 @@
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
@@ -67,6 +70,7 @@
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.server.security.ResourceType;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
@@ -104,7 +108,9 @@ public class OverlordResourceTest
   private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
   private HttpServletRequest req;
   private TaskRunner taskRunner;
+  private TaskQueue taskQueue;
   private WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter;
+  private AuditManager auditManager;
 
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
@@ -113,6 +119,7 @@ public class OverlordResourceTest
   public void setUp()
   {
     taskRunner = EasyMock.createMock(TaskRunner.class);
+    taskQueue = EasyMock.createMock(TaskQueue.class);
     configManager = EasyMock.createMock(JacksonConfigManager.class);
     provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
     authConfig = EasyMock.createMock(AuthConfig.class);
@@ -121,6 +128,7 @@ public void setUp()
     indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class);
     req = EasyMock.createStrictMock(HttpServletRequest.class);
     workerTaskRunnerQueryAdapter = EasyMock.createStrictMock(WorkerTaskRunnerQueryAdapter.class);
+    auditManager = EasyMock.createMock(AuditManager.class);
 
     EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
         Optional.of(taskRunner)
@@ -165,7 +173,7 @@ public Access authorize(AuthenticationResult authenticationResult, Resource reso
         indexerMetadataStorageAdapter,
         null,
         configManager,
-        null,
+        auditManager,
         authMapper,
         workerTaskRunnerQueryAdapter,
         provisioningStrategy,
@@ -880,6 +888,53 @@ public void testSecuredTaskPost()
     overlordResource.taskPost(task, req);
   }
 
+  @Test
+  public void testKillTaskIsAudited()
+  {
+    EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false);
+
+    final String username = Users.DRUID;
+    expectAuthorizationTokenCheck(username);
+    EasyMock.expect(req.getMethod()).andReturn("POST").once();
+    EasyMock.expect(req.getRequestURI()).andReturn("/indexer/v2/task").once();
+    EasyMock.expect(req.getQueryString()).andReturn("").once();
+    EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(username).once();
+    EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn("killing segments").once();
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+            .andReturn(new AuthenticationResult(username, "druid", null, null))
+            .once();
+    EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").once();
+
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).once();
+
+    final Capture<AuditEntry> auditEntryCapture = EasyMock.newCapture();
+    auditManager.doAudit(EasyMock.capture(auditEntryCapture));
+    EasyMock.expectLastCall().once();
+
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskQueue,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter,
+        authConfig,
+        auditManager
+    );
+
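testKillTaskIsAudited uses EasyMock's capture facility to grab the AuditEntry passed to the (void) doAudit call so the test can assert on it afterwards. The generic pattern, for reference:

    // Capture the argument of a void mock method, then assert on it later.
    Capture<AuditEntry> captured = EasyMock.newCapture();
    auditManager.doAudit(EasyMock.capture(captured));
    EasyMock.expectLastCall().once();
    EasyMock.replay(auditManager);

    // ... invoke the code under test ...

    Assert.assertTrue(captured.hasCaptured());
    AuditEntry entry = captured.getValue();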
+    Task task = new KillUnusedSegmentsTask("kill_all", "allow", Intervals.ETERNITY, null, false, 10, null);
+    overlordResource.taskPost(task, req);
+
+    Assert.assertTrue(auditEntryCapture.hasCaptured());
+    AuditEntry auditEntry = auditEntryCapture.getValue();
+    Assert.assertEquals(username, auditEntry.getAuditInfo().getAuthor());
+    Assert.assertEquals("killing segments", auditEntry.getAuditInfo().getComment());
+    Assert.assertEquals("druid", auditEntry.getAuditInfo().getIdentity());
+    Assert.assertEquals("127.0.0.1", auditEntry.getAuditInfo().getIp());
+  }
+
   @Test
   public void testTaskPostDeniesDatasourceReadUser()
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index f9ce36df1815..b0df15ce4ef5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -31,6 +31,7 @@
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
+import org.apache.druid.audit.AuditManager;
 import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
 import org.apache.druid.curator.discovery.LatchableServiceAnnouncer;
 import org.apache.druid.discovery.DruidLeaderSelector;
@@ -148,11 +149,18 @@ private void tearDownServerAndCurator()
   public void setUp() throws Exception
   {
     req = EasyMock.createMock(HttpServletRequest.class);
-    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
-    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).anyTimes();
+    EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn("author").once();
+    EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn("comment").once();
+    EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").once();
     EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
         new AuthenticationResult("druid", "druid", null, null)
     ).anyTimes();
+    EasyMock.expect(req.getMethod()).andReturn("GET").anyTimes();
+    EasyMock.expect(req.getRequestURI()).andReturn("/request/uri").anyTimes();
+    EasyMock.expect(req.getQueryString()).andReturn("query=string").anyTimes();
+
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).anyTimes();
     req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
     EasyMock.expectLastCall().anyTimes();
     supervisorManager = EasyMock.createMock(SupervisorManager.class);
@@ -260,13 +268,14 @@ public void testOverlordRun() throws Exception
     final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster);
     final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null);
 
     // Test Overlord resource stuff
+    AuditManager auditManager = EasyMock.createNiceMock(AuditManager.class);
     overlordResource = new OverlordResource(
         taskMaster,
         taskStorageQueryAdapter,
         new IndexerMetadataStorageAdapter(taskStorageQueryAdapter, null),
         null,
         null,
-        null,
+        auditManager,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         workerTaskRunnerQueryAdapter,
         null,
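OverlordTest passes a nice mock rather than a strict one, and that matters here: a nice mock silently absorbs the doAudit calls the new code path makes, while an ordinary mock fails on any unscripted invocation. Roughly:

    AuditManager strictMock = EasyMock.createMock(AuditManager.class);    // unexpected calls throw AssertionError
    AuditManager niceMock = EasyMock.createNiceMock(AuditManager.class);  // unexpected calls return defaults
    EasyMock.replay(strictMock, niceMock);

    niceMock.doAudit(null);    // fine: void call on a nice mock is a no-op
    strictMock.doAudit(null);  // fails: "unexpected method call"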
diff --git a/integration-tests/docker/environment-configs/common b/integration-tests/docker/environment-configs/common
index 4d2d308b2425..e4bc11b7ce44 100644
--- a/integration-tests/docker/environment-configs/common
+++ b/integration-tests/docker/environment-configs/common
@@ -77,6 +77,7 @@ druid_coordinator_kill_supervisor_period=PT10S
 druid_coordinator_kill_supervisor_durationToRetain=PT0M
 druid_coordinator_period_metadataStoreManagementPeriod=PT10S
 druid_sql_planner_authorizeSystemTablesDirectly=true
+druid_audit_manager_type=log
 
 # Testing the legacy config from https://github.com/apache/druid/pull/10267
 # Can remove this when the flag is no longer needed
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 1b81d653fa9b..2193dafd8552 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -43,6 +43,19 @@ org.apache.hadoop.fs.s3a.S3AFileSystem
+        <!-- [surrounding lines lost in extraction; dependency element reconstructed] -->
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>1.33</version>
+        </dependency>
         <dependency>
             <groupId>com.amazonaws</groupId>
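The docker environment files rely on the harness convention (an assumption about the test scripts, not something this patch changes) that druid_-prefixed variables become runtime properties with underscores swapped for dots, so the new line enables druid.audit.manager.type=log on every service in the integration-test cluster. A one-line sketch of that mapping:

    // "druid_audit_manager_type" -> "druid.audit.manager.type"
    String property = "druid_audit_manager_type".replace('_', '.');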
"/druid-ext/basic-security/authorization/db/basic/users/druid" + i, - null - ); - - LOG.info("Finished creating user druid" + i); + final String username = "druid" + i; + postAsAdmin(null, "/authentication/db/basic/users/%s", username); + postAsAdmin(null, "/authorization/db/basic/users/%s", username); + LOG.info("Created user[%s]", username); } // setup the last of 100 users and check that it works - HttpUtil.makeRequest( - getHttpClient(User.ADMIN), - HttpMethod.POST, - config.getCoordinatorUrl() + "/druid-ext/basic-security/authentication/db/basic/users/druid99/credentials", - jsonMapper.writeValueAsBytes(new BasicAuthenticatorCredentialUpdate("helloworld", 5000)) - ); - - HttpUtil.makeRequest( - getHttpClient(User.ADMIN), - HttpMethod.POST, - config.getCoordinatorUrl() + "/druid-ext/basic-security/authorization/db/basic/users/druid99/roles/druidrole", - null + postAsAdmin( + new BasicAuthenticatorCredentialUpdate("helloworld", 5000), + "/authentication/db/basic/users/druid99/credentials" ); + postAsAdmin(null, "/authorization/db/basic/users/druid99/roles/druidrole"); druid99 = new CredentialedHttpClient( new BasicCredentials("druid99", "helloworld"), @@ -231,74 +207,41 @@ protected Properties getAvaticaConnectionPropertiesForUser(User user) } private void createUserAndRoleWithPermissions( - HttpClient adminClient, String user, String password, String role, List permissions ) throws Exception { - HttpUtil.makeRequest( - adminClient, - HttpMethod.POST, - StringUtils.format( - "%s/druid-ext/basic-security/authentication/db/basic/users/%s", - config.getCoordinatorUrl(), - user - ), - null - ); - HttpUtil.makeRequest( - adminClient, - HttpMethod.POST, - StringUtils.format( - "%s/druid-ext/basic-security/authentication/db/basic/users/%s/credentials", - config.getCoordinatorUrl(), - user - ), - jsonMapper.writeValueAsBytes(new BasicAuthenticatorCredentialUpdate(password, 5000)) - ); - HttpUtil.makeRequest( - adminClient, - HttpMethod.POST, - StringUtils.format( - "%s/druid-ext/basic-security/authorization/db/basic/users/%s", - config.getCoordinatorUrl(), - user - ), - null - ); - HttpUtil.makeRequest( - adminClient, - HttpMethod.POST, - StringUtils.format( - "%s/druid-ext/basic-security/authorization/db/basic/roles/%s", - config.getCoordinatorUrl(), - role - ), - null - ); - HttpUtil.makeRequest( - adminClient, - HttpMethod.POST, - StringUtils.format( - "%s/druid-ext/basic-security/authorization/db/basic/users/%s/roles/%s", - config.getCoordinatorUrl(), - user, - role - ), - null - ); - byte[] permissionsBytes = jsonMapper.writeValueAsBytes(permissions); - HttpUtil.makeRequest( - adminClient, - HttpMethod.POST, - StringUtils.format( - "%s/druid-ext/basic-security/authorization/db/basic/roles/%s/permissions", - config.getCoordinatorUrl(), - role - ), - permissionsBytes - ); + // Setup authentication by creating user and password + postAsAdmin(null, "/authentication/db/basic/users/%s", user); + + final BasicAuthenticatorCredentialUpdate credentials + = new BasicAuthenticatorCredentialUpdate(password, 5000); + postAsAdmin(credentials, "/authentication/db/basic/users/%s/credentials", user); + + // Setup authorization by assigning a role to the user + postAsAdmin(null, "/authorization/db/basic/users/%s", user); + postAsAdmin(null, "/authorization/db/basic/roles/%s", role); + postAsAdmin(null, "/authorization/db/basic/users/%s/roles/%s", user, role); + postAsAdmin(permissions, "/authorization/db/basic/roles/%s/permissions", role); + } + + private void postAsAdmin( + Object payload, + String 
+  private void postAsAdmin(
+      Object payload,
+      String pathFormat,
+      Object... pathParams
+  ) throws IOException
+  {
+    HttpClient adminClient = getHttpClient(User.ADMIN);
+
+    byte[] payloadBytes = payload == null ? null : jsonMapper.writeValueAsBytes(payload);
+    String url = getBaseUrl() + StringUtils.format(pathFormat, pathParams);
+    HttpUtil.makeRequest(adminClient, HttpMethod.POST, url, payloadBytes);
+  }
+
+  private String getBaseUrl()
+  {
+    return config.getCoordinatorUrl() + "/druid-ext/basic-security";
   }
 }
diff --git a/licenses.yaml b/licenses.yaml
index 4367878cbd07..0e96c2ffb1be 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -1022,7 +1022,7 @@ name: org.yaml snakeyaml
 license_category: binary
 module: extensions/druid-kubernetes-extensions
 license_name: Apache License version 2.0
-version: 1.33
+version: 2.2
 libraries:
   - org.yaml: snakeyaml
 
@@ -2885,6 +2885,18 @@ libraries:
   - io.confluent: kafka-schema-registry-client
   - io.confluent: common-utils
 
+---
+
+name: org.yaml snakeyaml
+license_category: binary
+module: extensions/druid-protobuf-extensions
+license_name: Apache License version 2.0
+version: 2.0
+libraries:
+  - org.yaml: snakeyaml
+
+
+
 ---
 
 name: Confluent Kafka Client
diff --git a/owasp-dependency-check-suppressions.xml b/owasp-dependency-check-suppressions.xml
index 199087e4497b..320a9bcfbc11 100644
--- a/owasp-dependency-check-suppressions.xml
+++ b/owasp-dependency-check-suppressions.xml
@@ -275,12 +275,23 @@
-    <suppress>
-       <!-- [suppress entry details lost in extraction] -->
-    </suppress>
+    <suppress>
+       <!-- [suppress entry details lost in extraction] -->
+       <cve>CVE-2022-1471</cve>
+       <cve>CVE-2023-2251</cve>
+    </suppress>
diff --git a/pom.xml b/pom.xml
index 59d8bacb6396..81cb00bb0cf5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -364,11 +364,6 @@
                 <artifactId>json-smart</artifactId>
                 <version>2.4.11</version>
             </dependency>
-            <dependency>
-                <groupId>org.yaml</groupId>
-                <artifactId>snakeyaml</artifactId>
-                <version>1.33</version>
-            </dependency>
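The license and suppression updates track the snakeyaml upgrade. CVE-2022-1471 is the unsafe-deserialization issue in snakeyaml's default Constructor; it does not apply when documents are parsed with SafeConstructor, which only instantiates standard types. A minimal sketch of the safe pattern (compiles against both the pinned 1.33 line and 2.x):

    import org.yaml.snakeyaml.LoaderOptions;
    import org.yaml.snakeyaml.Yaml;
    import org.yaml.snakeyaml.constructor.SafeConstructor;

    // SafeConstructor refuses to build arbitrary application objects,
    // so untrusted YAML cannot trigger CVE-2022-1471-style gadget chains.
    Yaml yaml = new Yaml(new SafeConstructor(new LoaderOptions()));
    Object parsed = yaml.load("key: value");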