From feeb4f0fb03fce90e523c7e1c10e71a19478400c Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 14 Dec 2023 16:18:39 +0530 Subject: [PATCH 01/14] Allocate pending segments at latest committed version (#15459) The segment allocation algorithm reuses an already allocated pending segment if the new allocation request is made for the same parameters: datasource sequence name same interval same value of skipSegmentLineageCheck (false for batch append, true for streaming append) same previous segment id (used only when skipSegmentLineageCheck = false) The above parameters can thus uniquely identify a pending segment (enforced by the UNIQUE constraint on the sequence_name_prev_id_sha1 column in druid_pendingSegments metadata table). This reuse is done in order to allow replica tasks (in case of streaming ingestion) to use the same set of segment IDs. allow re-run of a failed batch task to use the same segment ID and prevent unnecessary allocations --- .../actions/SegmentAllocateActionTest.java | 71 ++- .../IndexerSQLMetadataStorageCoordinator.java | 522 ++++++++++-------- ...exerSQLMetadataStorageCoordinatorTest.java | 2 - 3 files changed, 365 insertions(+), 230 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index 13c499e47e2a..4ccb87077506 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -55,11 +55,11 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -70,9 +70,6 @@ @RunWith(Parameterized.class) public class SegmentAllocateActionTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Rule public TaskActionTestKit taskActionTestKit = new TaskActionTestKit(); @@ -403,6 +400,72 @@ public void testResumeSequence() assertSameIdentifier(id2, id7); } + @Test + public void testSegmentIsAllocatedForLatestUsedSegmentVersion() throws IOException + { + final Task task = NoopTask.create(); + taskActionTestKit.getTaskLockbox().add(task); + + final String sequenceName = "sequence_1"; + + // Allocate segments when there are no committed segments + final SegmentIdWithShardSpec pendingSegmentV01 = + allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null); + final SegmentIdWithShardSpec pendingSegmentV02 = + allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null); + + assertSameIdentifier(pendingSegmentV01, pendingSegmentV02); + + // Commit a segment for version V1 + final DataSegment segmentV1 + = DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularities.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.plusDays(1).toString()) + .shardSpec(new LinearShardSpec(0)) + .size(100) + .build(); + taskActionTestKit.getMetadataStorageCoordinator().commitSegments( + Collections.singleton(segmentV1) + ); + + // Verify that new allocations use version V1 + final SegmentIdWithShardSpec pendingSegmentV11 = + allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null); + 
final SegmentIdWithShardSpec pendingSegmentV12 = + allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null); + + assertSameIdentifier(pendingSegmentV11, pendingSegmentV12); + Assert.assertEquals(segmentV1.getVersion(), pendingSegmentV11.getVersion()); + + Assert.assertNotEquals(pendingSegmentV01, pendingSegmentV11); + + // Commit a segment for version V2 to overshadow V1 + final DataSegment segmentV2 + = DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularities.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.plusDays(2).toString()) + .shardSpec(new LinearShardSpec(0)) + .size(100) + .build(); + taskActionTestKit.getMetadataStorageCoordinator().commitSegments( + Collections.singleton(segmentV2) + ); + Assert.assertTrue(segmentV2.getVersion().compareTo(segmentV1.getVersion()) > 0); + + // Verify that new segment allocations use version V2 + final SegmentIdWithShardSpec pendingSegmentV21 = + allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null); + final SegmentIdWithShardSpec pendingSegmentV22 = + allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null); + assertSameIdentifier(pendingSegmentV21, pendingSegmentV22); + Assert.assertEquals(segmentV2.getVersion(), pendingSegmentV21.getVersion()); + + Assert.assertNotEquals(pendingSegmentV21, pendingSegmentV01); + Assert.assertNotEquals(pendingSegmentV21, pendingSegmentV11); + } + @Test public void testMultipleSequences() { diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index c62e59c0b25e..9e4fc578eda9 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -645,10 +645,23 @@ public SegmentIdWithShardSpec allocatePendingSegment( Preconditions.checkNotNull(sequenceName, "sequenceName"); Preconditions.checkNotNull(interval, "interval"); Preconditions.checkNotNull(maxVersion, "version"); - Interval allocateInterval = interval.withChronology(ISOChronology.getInstanceUTC()); + final Interval allocateInterval = interval.withChronology(ISOChronology.getInstanceUTC()); return connector.retryWithHandle( handle -> { + // Get the time chunk and associated data segments for the given interval, if any + final List> existingChunks = + getTimelineForIntervalsWithHandle(handle, dataSource, ImmutableList.of(interval)) + .lookup(interval); + if (existingChunks.size() > 1) { + // Not possible to expand more than one chunk with a single segment. 
+ log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s] as it already has [%,d] versions.", + dataSource, interval, existingChunks.size() + ); + return null; + } + if (skipSegmentLineageCheck) { return allocatePendingSegment( handle, @@ -656,7 +669,8 @@ public SegmentIdWithShardSpec allocatePendingSegment( sequenceName, allocateInterval, partialShardSpec, - maxVersion + maxVersion, + existingChunks ); } else { return allocatePendingSegmentWithSegmentLineageCheck( @@ -666,7 +680,8 @@ public SegmentIdWithShardSpec allocatePendingSegment( previousSegmentId, allocateInterval, partialShardSpec, - maxVersion + maxVersion, + existingChunks ); } } @@ -803,26 +818,32 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( @Nullable final String previousSegmentId, final Interval interval, final PartialShardSpec partialShardSpec, - final String maxVersion + final String maxVersion, + final List> existingChunks ) throws IOException { final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId; - final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId( - handle.createQuery( - StringUtils.format( - "SELECT payload FROM %s WHERE " - + "dataSource = :dataSource AND " - + "sequence_name = :sequence_name AND " - + "sequence_prev_id = :sequence_prev_id", - dbTables.getPendingSegmentsTable() - ) - ), + + final String sql = StringUtils.format( + "SELECT payload FROM %s WHERE " + + "dataSource = :dataSource AND " + + "sequence_name = :sequence_name AND " + + "sequence_prev_id = :sequence_prev_id", + dbTables.getPendingSegmentsTable() + ); + final Query> query + = handle.createQuery(sql) + .bind("dataSource", dataSource) + .bind("sequence_name", sequenceName) + .bind("sequence_prev_id", previousSegmentIdNotNull); + + final String usedSegmentVersion = existingChunks.isEmpty() ? null : existingChunks.get(0).getVersion(); + final CheckExistingSegmentIdResult result = findExistingPendingSegment( + query, interval, sequenceName, previousSegmentIdNotNull, - Pair.of("dataSource", dataSource), - Pair.of("sequence_name", sequenceName), - Pair.of("sequence_prev_id", previousSegmentIdNotNull) + usedSegmentVersion ); if (result.found) { @@ -835,7 +856,8 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( dataSource, interval, partialShardSpec, - maxVersion + maxVersion, + existingChunks ); if (newIdentifier == null) { return null; @@ -854,6 +876,8 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( .putBytes(StringUtils.toUtf8(sequenceName)) .putByte((byte) 0xff) .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull)) + .putByte((byte) 0xff) + .putBytes(StringUtils.toUtf8(newIdentifier.getVersion())) .hash() .asBytes() ); @@ -878,11 +902,26 @@ private Map allocatePendingSegment final List requests ) throws IOException { + // Get the time chunk and associated data segments for the given interval, if any + final List> existingChunks = + getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval)) + .lookup(interval); + if (existingChunks.size() > 1) { + log.warn( + "Cannot allocate new segments for dataSource[%s], interval[%s] as interval already has [%,d] chunks.", + dataSource, interval, existingChunks.size() + ); + return Collections.emptyMap(); + } + + final String existingVersion = existingChunks.isEmpty() ? 
null : existingChunks.get(0).getVersion(); final Map existingSegmentIds; if (skipSegmentLineageCheck) { - existingSegmentIds = getExistingSegmentIdsSkipLineageCheck(handle, dataSource, interval, requests); + existingSegmentIds = + getExistingSegmentIdsSkipLineageCheck(handle, dataSource, interval, existingVersion, requests); } else { - existingSegmentIds = getExistingSegmentIdsWithLineageCheck(handle, dataSource, interval, requests); + existingSegmentIds = + getExistingSegmentIdsWithLineageCheck(handle, dataSource, interval, existingVersion, requests); } // For every request see if a segment id already exists @@ -901,8 +940,14 @@ private Map allocatePendingSegment } // For each of the remaining requests, create a new segment - final Map createdSegments = - createNewSegments(handle, dataSource, interval, skipSegmentLineageCheck, requestsForNewSegments); + final Map createdSegments = createNewSegments( + handle, + dataSource, + interval, + skipSegmentLineageCheck, + existingChunks, + requestsForNewSegments + ); // SELECT -> INSERT can fail due to races; callers must be prepared to retry. // Avoiding ON DUPLICATE KEY since it's not portable. @@ -925,14 +970,16 @@ private Map allocatePendingSegment @SuppressWarnings("UnstableApiUsage") private String getSequenceNameAndPrevIdSha( SegmentCreateRequest request, - Interval interval, + SegmentIdWithShardSpec pendingSegmentId, boolean skipSegmentLineageCheck ) { final Hasher hasher = Hashing.sha1().newHasher() .putBytes(StringUtils.toUtf8(request.getSequenceName())) .putByte((byte) 0xff); + if (skipSegmentLineageCheck) { + final Interval interval = pendingSegmentId.getInterval(); hasher .putLong(interval.getStartMillis()) .putLong(interval.getEndMillis()); @@ -941,6 +988,9 @@ private String getSequenceNameAndPrevIdSha( .putBytes(StringUtils.toUtf8(request.getPreviousSegmentId())); } + hasher.putByte((byte) 0xff); + hasher.putBytes(StringUtils.toUtf8(pendingSegmentId.getVersion())); + return BaseEncoding.base16().encode(hasher.hash().asBytes()); } @@ -951,28 +1001,32 @@ private SegmentIdWithShardSpec allocatePendingSegment( final String sequenceName, final Interval interval, final PartialShardSpec partialShardSpec, - final String maxVersion + final String maxVersion, + final List> existingChunks ) throws IOException { - final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId( - handle.createQuery( - StringUtils.format( - "SELECT payload FROM %s WHERE " - + "dataSource = :dataSource AND " - + "sequence_name = :sequence_name AND " - + "start = :start AND " - + "%2$send%2$s = :end", - dbTables.getPendingSegmentsTable(), - connector.getQuoteString() - ) - ), + final String sql = StringUtils.format( + "SELECT payload FROM %s WHERE " + + "dataSource = :dataSource AND " + + "sequence_name = :sequence_name AND " + + "start = :start AND " + + "%2$send%2$s = :end", + dbTables.getPendingSegmentsTable(), + connector.getQuoteString() + ); + final Query> query + = handle.createQuery(sql) + .bind("dataSource", dataSource) + .bind("sequence_name", sequenceName) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()); + + final CheckExistingSegmentIdResult result = findExistingPendingSegment( + query, interval, sequenceName, null, - Pair.of("dataSource", dataSource), - Pair.of("sequence_name", sequenceName), - Pair.of("start", interval.getStart().toString()), - Pair.of("end", interval.getEnd().toString()) + existingChunks.isEmpty() ? 
null : existingChunks.get(0).getVersion() ); if (result.found) { @@ -984,7 +1038,8 @@ private SegmentIdWithShardSpec allocatePendingSegment( dataSource, interval, partialShardSpec, - maxVersion + maxVersion, + existingChunks ); if (newIdentifier == null) { return null; @@ -1004,6 +1059,8 @@ private SegmentIdWithShardSpec allocatePendingSegment( .putByte((byte) 0xff) .putLong(interval.getStartMillis()) .putLong(interval.getEndMillis()) + .putByte((byte) 0xff) + .putBytes(StringUtils.toUtf8(newIdentifier.getVersion())) .hash() .asBytes() ); @@ -1011,7 +1068,10 @@ private SegmentIdWithShardSpec allocatePendingSegment( // always insert empty previous sequence id insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1); - log.info("Allocated pending segment [%s] for sequence[%s] in DB", newIdentifier, sequenceName); + log.info( + "Created new pending segment[%s] for datasource[%s], sequence[%s], interval[%s].", + newIdentifier, dataSource, sequenceName, interval + ); return newIdentifier; } @@ -1023,6 +1083,7 @@ private Map getExistingSegme Handle handle, String dataSource, Interval interval, + String usedSegmentVersion, List requests ) throws IOException { @@ -1052,7 +1113,11 @@ private Map getExistingSegme final PendingSegmentsRecord record = dbSegments.next(); final SegmentIdWithShardSpec segmentId = jsonMapper.readValue(record.getPayload(), SegmentIdWithShardSpec.class); - sequenceToSegmentId.put(record.getSequenceName(), segmentId); + + // Consider only the pending segments allocated for the latest used segment version + if (usedSegmentVersion == null || segmentId.getVersion().equals(usedSegmentVersion)) { + sequenceToSegmentId.put(record.getSequenceName(), segmentId); + } } final Map requestToResult = new HashMap<>(); @@ -1071,6 +1136,7 @@ private Map getExistingSegme Handle handle, String dataSource, Interval interval, + String usedSegmentVersion, List requests ) throws IOException { @@ -1090,14 +1156,15 @@ private Map getExistingSegme final Map requestToResult = new HashMap<>(); for (SegmentCreateRequest request : requests) { - CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId( + CheckExistingSegmentIdResult result = findExistingPendingSegment( handle.createQuery(sql) .bind("dataSource", dataSource) .bind("sequence_name", request.getSequenceName()) .bind("sequence_prev_id", request.getPreviousSegmentId()), interval, request.getSequenceName(), - request.getPreviousSegmentId() + request.getPreviousSegmentId(), + usedSegmentVersion ); requestToResult.put(request, result); } @@ -1105,50 +1172,43 @@ private Map getExistingSegme return requestToResult; } - private CheckExistingSegmentIdResult checkAndGetExistingSegmentId( + private CheckExistingSegmentIdResult findExistingPendingSegment( final Query> query, final Interval interval, final String sequenceName, final @Nullable String previousSegmentId, - final Pair... 
queryVars + final @Nullable String usedSegmentVersion ) throws IOException { - Query> boundQuery = query; - for (Pair var : queryVars) { - boundQuery = boundQuery.bind(var.lhs, var.rhs); - } - final List existingBytes = boundQuery.map(ByteArrayMapper.FIRST).list(); - - if (existingBytes.isEmpty()) { + final List records = query.map(ByteArrayMapper.FIRST).list(); + if (records.isEmpty()) { return new CheckExistingSegmentIdResult(false, null); - } else { - final SegmentIdWithShardSpec existingIdentifier = jsonMapper.readValue( - Iterables.getOnlyElement(existingBytes), - SegmentIdWithShardSpec.class - ); - - if (existingIdentifier.getInterval().isEqual(interval)) { - log.info( - "Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", - existingIdentifier, - sequenceName, - previousSegmentId - ); + } - return new CheckExistingSegmentIdResult(true, existingIdentifier); - } else { - log.warn( - "Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, " - + "does not match requested interval[%s]", - existingIdentifier, - sequenceName, - previousSegmentId, - interval - ); + for (byte[] record : records) { + final SegmentIdWithShardSpec pendingSegment + = jsonMapper.readValue(record, SegmentIdWithShardSpec.class); - return new CheckExistingSegmentIdResult(true, null); + // Consider only pending segments matching the expected version + if (usedSegmentVersion == null || pendingSegment.getVersion().equals(usedSegmentVersion)) { + if (pendingSegment.getInterval().isEqual(interval)) { + log.info( + "Found existing pending segment[%s] for sequence[%s], previous segment[%s], version[%s] in DB", + pendingSegment, sequenceName, previousSegmentId, usedSegmentVersion + ); + return new CheckExistingSegmentIdResult(true, pendingSegment); + } else { + log.warn( + "Cannot use existing pending segment [%s] for sequence[%s], previous segment[%s] in DB" + + " as it does not match requested interval[%s], version[%s].", + pendingSegment, sequenceName, previousSegmentId, interval, usedSegmentVersion + ); + return new CheckExistingSegmentIdResult(true, null); + } } } + + return new CheckExistingSegmentIdResult(false, null); } private static class CheckExistingSegmentIdResult @@ -1164,6 +1224,52 @@ private static class CheckExistingSegmentIdResult } } + private static class UniqueAllocateRequest + { + private final Interval interval; + private final String previousSegmentId; + private final String sequenceName; + private final boolean skipSegmentLineageCheck; + + private final int hashCode; + + public UniqueAllocateRequest( + Interval interval, + SegmentCreateRequest request, + boolean skipSegmentLineageCheck + ) + { + this.interval = interval; + this.sequenceName = request.getSequenceName(); + this.previousSegmentId = request.getPreviousSegmentId(); + this.skipSegmentLineageCheck = skipSegmentLineageCheck; + + this.hashCode = Objects.hash(interval, sequenceName, previousSegmentId, skipSegmentLineageCheck); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UniqueAllocateRequest that = (UniqueAllocateRequest) o; + return skipSegmentLineageCheck == that.skipSegmentLineageCheck + && Objects.equals(interval, that.interval) + && Objects.equals(sequenceName, that.sequenceName) + && Objects.equals(previousSegmentId, that.previousSegmentId); + } + + @Override + public int hashCode() + { + return hashCode; + } + } + private SegmentPublishResult 
commitAppendSegmentsAndMetadataInTransaction( Set appendSegments, Map appendSegmentToReplaceLock, @@ -1264,7 +1370,7 @@ private int insertPendingSegmentsIntoMetastore( .bind("sequence_prev_id", request.getPreviousSegmentId()) .bind( "sequence_name_prev_id_sha1", - getSequenceNameAndPrevIdSha(request, interval, skipSegmentLineageCheck) + getSequenceNameAndPrevIdSha(request, segmentId, skipSegmentLineageCheck) ) .bind("payload", jsonMapper.writeValueAsBytes(segmentId)); } @@ -1480,6 +1586,7 @@ private Map createNewSegments( String dataSource, Interval interval, boolean skipSegmentLineageCheck, + List> existingChunks, List requests ) throws IOException { @@ -1487,22 +1594,6 @@ private Map createNewSegments( return Collections.emptyMap(); } - // Get the time chunk and associated data segments for the given interval, if any - final List> existingChunks = - getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval)) - .lookup(interval); - - if (existingChunks.size() > 1) { - // Not possible to expand more than one chunk with a single segment. - log.warn( - "Cannot allocate new segments for dataSource[%s], interval[%s]: already have [%,d] chunks.", - dataSource, - interval, - existingChunks.size() - ); - return Collections.emptyMap(); - } - // Shard spec of any of the requests (as they are all compatible) can be used to // identify existing shard specs that share partition space with the requested ones. final PartialShardSpec partialShardSpec = requests.get(0).getPartialShardSpec(); @@ -1542,15 +1633,16 @@ private Map createNewSegments( new HashSet<>(getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet()); final Map createdSegments = new HashMap<>(); - final Map sequenceHashToSegment = new HashMap<>(); + final Map uniqueRequestToSegment = new HashMap<>(); for (SegmentCreateRequest request : requests) { // Check if the required segment has already been created in this batch - final String sequenceHash = getSequenceNameAndPrevIdSha(request, interval, skipSegmentLineageCheck); + final UniqueAllocateRequest uniqueRequest = + new UniqueAllocateRequest(interval, request, skipSegmentLineageCheck); final SegmentIdWithShardSpec createdSegment; - if (sequenceHashToSegment.containsKey(sequenceHash)) { - createdSegment = sequenceHashToSegment.get(sequenceHash); + if (uniqueRequestToSegment.containsKey(uniqueRequest)) { + createdSegment = uniqueRequestToSegment.get(uniqueRequest); } else { createdSegment = createNewSegment( request, @@ -1564,8 +1656,8 @@ private Map createNewSegments( // Add to pendingSegments to consider for partitionId if (createdSegment != null) { pendingSegments.add(createdSegment); - sequenceHashToSegment.put(sequenceHash, createdSegment); - log.info("Created new segment [%s]", createdSegment); + uniqueRequestToSegment.put(uniqueRequest, createdSegment); + log.info("Created new segment[%s]", createdSegment); } } @@ -1574,7 +1666,7 @@ private Map createNewSegments( } } - log.info("Created [%d] new segments for [%d] allocate requests.", sequenceHashToSegment.size(), requests.size()); + log.info("Created [%d] new segments for [%d] allocate requests.", uniqueRequestToSegment.size(), requests.size()); return createdSegments; } @@ -1694,140 +1786,122 @@ private SegmentIdWithShardSpec createNewSegment( final String dataSource, final Interval interval, final PartialShardSpec partialShardSpec, - final String existingVersion + final String existingVersion, + final List> existingChunks ) throws IOException { - // Get the time chunk and 
associated data segments for the given interval, if any - final List> existingChunks = getTimelineForIntervalsWithHandle( - handle, - dataSource, - ImmutableList.of(interval) - ).lookup(interval); - - if (existingChunks.size() > 1) { - // Not possible to expand more than one chunk with a single segment. - log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s]: already have [%,d] chunks.", - dataSource, - interval, - existingChunks.size() - ); - return null; + // max partitionId of published data segments which share the same partition space. + SegmentIdWithShardSpec committedMaxId = null; + @Nullable + final String versionOfExistingChunk; + if (existingChunks.isEmpty()) { + versionOfExistingChunk = null; } else { - // max partitionId of published data segments which share the same partition space. - SegmentIdWithShardSpec committedMaxId = null; + TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); + versionOfExistingChunk = existingHolder.getVersion(); - @Nullable - final String versionOfExistingChunk; - if (existingChunks.isEmpty()) { - versionOfExistingChunk = null; - } else { - TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); - versionOfExistingChunk = existingHolder.getVersion(); - - // Don't use the stream API for performance. - for (DataSegment segment : FluentIterable - .from(existingHolder.getObject()) - .transform(PartitionChunk::getObject) - // Here we check only the segments of the shardSpec which shares the same partition space with the given - // partialShardSpec. Note that OverwriteShardSpec doesn't share the partition space with others. - // See PartitionIds. - .filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) { - if (committedMaxId == null - || committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) { - committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment); - } + // Don't use the stream API for performance. + for (DataSegment segment : FluentIterable + .from(existingHolder.getObject()) + .transform(PartitionChunk::getObject) + // Here we check only the segments of the shardSpec which shares the same partition space with the given + // partialShardSpec. Note that OverwriteShardSpec doesn't share the partition space with others. + // See PartitionIds. + .filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) { + if (committedMaxId == null + || committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) { + committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment); } } + } - // Fetch the pending segments for this interval to determine max partitionId - // across all shard specs (published + pending). - // A pending segment having a higher partitionId must also be considered - // to avoid clashes when inserting the pending segment created here. - final Set pendings = new HashSet<>( - getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet() - ); - if (committedMaxId != null) { - pendings.add(committedMaxId); - } + // Fetch the pending segments for this interval to determine max partitionId + // across all shard specs (published + pending). + // A pending segment having a higher partitionId must also be considered + // to avoid clashes when inserting the pending segment created here. 
+ final Set pendings = new HashSet<>( + getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet() + ); + if (committedMaxId != null) { + pendings.add(committedMaxId); + } - // If there is an existing chunk, find the max id with the same version as the existing chunk. - // There may still be a pending segment with a higher version (but no corresponding used segments) - // which may generate a clash with an existing segment once the new id is generated - final SegmentIdWithShardSpec overallMaxId; - overallMaxId = pendings.stream() - .filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec)) - .filter(id -> versionOfExistingChunk == null - || id.getVersion().equals(versionOfExistingChunk)) - .max(Comparator.comparing(SegmentIdWithShardSpec::getVersion) - .thenComparing(id -> id.getShardSpec().getPartitionNum())) - .orElse(null); - - - // Determine the version of the new segment - final String newSegmentVersion; - if (versionOfExistingChunk != null) { - newSegmentVersion = versionOfExistingChunk; - } else if (overallMaxId != null) { - newSegmentVersion = overallMaxId.getVersion(); - } else { - // this is the first segment for this interval - newSegmentVersion = null; - } + // If there is an existing chunk, find the max id with the same version as the existing chunk. + // There may still be a pending segment with a higher version (but no corresponding used segments) + // which may generate a clash with an existing segment once the new id is generated + final SegmentIdWithShardSpec overallMaxId; + overallMaxId = pendings.stream() + .filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec)) + .filter(id -> versionOfExistingChunk == null + || id.getVersion().equals(versionOfExistingChunk)) + .max(Comparator.comparing(SegmentIdWithShardSpec::getVersion) + .thenComparing(id -> id.getShardSpec().getPartitionNum())) + .orElse(null); - if (overallMaxId == null) { - // When appending segments, null overallMaxId means that we are allocating the very initial - // segment for this time chunk. - // This code is executed when the Overlord coordinates segment allocation, which is either you append segments - // or you use segment lock. Since the core partitions set is not determined for appended segments, we set - // it 0. When you use segment lock, the core partitions set doesn't work with it. We simply set it 0 so that the - // OvershadowableManager handles the atomic segment update. - final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace() - ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID - : PartitionIds.ROOT_GEN_START_PARTITION_ID; - String version = newSegmentVersion == null ? 
existingVersion : newSegmentVersion; - return new SegmentIdWithShardSpec( - dataSource, - interval, - version, - partialShardSpec.complete(jsonMapper, newPartitionId, 0) - ); - } else if (!overallMaxId.getInterval().equals(interval)) { - log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].", - dataSource, - interval, - existingVersion, - overallMaxId - ); - return null; - } else if (committedMaxId != null - && committedMaxId.getShardSpec().getNumCorePartitions() - == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) { - log.warn( - "Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]", - committedMaxId, - committedMaxId.getShardSpec() - ); - return null; - } else { - // The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline. - // When the core partitions have been dropped, using pending segments may lead to an incorrect state - // where the chunk is believed to have core partitions and queries results are incorrect. - return new SegmentIdWithShardSpec( - dataSource, - interval, - Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"), - partialShardSpec.complete( - jsonMapper, - overallMaxId.getShardSpec().getPartitionNum() + 1, - committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions() - ) - ); - } + // Determine the version of the new segment + final String newSegmentVersion; + if (versionOfExistingChunk != null) { + newSegmentVersion = versionOfExistingChunk; + } else if (overallMaxId != null) { + newSegmentVersion = overallMaxId.getVersion(); + } else { + // this is the first segment for this interval + newSegmentVersion = null; + } + + if (overallMaxId == null) { + // When appending segments, null overallMaxId means that we are allocating the very initial + // segment for this time chunk. + // This code is executed when the Overlord coordinates segment allocation, which is either you append segments + // or you use segment lock. Since the core partitions set is not determined for appended segments, we set + // it 0. When you use segment lock, the core partitions set doesn't work with it. We simply set it 0 so that the + // OvershadowableManager handles the atomic segment update. + final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace() + ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + : PartitionIds.ROOT_GEN_START_PARTITION_ID; + String version = newSegmentVersion == null ? existingVersion : newSegmentVersion; + return new SegmentIdWithShardSpec( + dataSource, + interval, + version, + partialShardSpec.complete(jsonMapper, newPartitionId, 0) + ); + } else if (!overallMaxId.getInterval().equals(interval)) { + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].", + dataSource, + interval, + existingVersion, + overallMaxId + ); + return null; + } else if (committedMaxId != null + && committedMaxId.getShardSpec().getNumCorePartitions() + == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) { + log.warn( + "Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]", + committedMaxId, + committedMaxId.getShardSpec() + ); + return null; + } else { + // The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline. 
+ // When the core partitions have been dropped, using pending segments may lead to an incorrect state + // where the chunk is believed to have core partitions and queries results are incorrect. + + return new SegmentIdWithShardSpec( + dataSource, + interval, + Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"), + partialShardSpec.complete( + jsonMapper, + overallMaxId.getShardSpec().getPartitionNum() + 1, + committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions() + ) + ); } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 9e977dec3e8c..4ee72e74f927 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -2078,7 +2078,6 @@ public void testAllocatePendingSegment() * - verify that the id for segment5 is correct * - Later, after the above was dropped, another segment on same interval was created by the stream but this * time there was an integrity violation in the pending segments table because the - * {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, String, Interval, PartialShardSpec, String)} * method returned a segment id that already existed in the pending segments table */ @Test @@ -2178,7 +2177,6 @@ public void testAllocatePendingSegmentAfterDroppingExistingSegment() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2", identifier4.toString()); // Since all core partitions have been dropped Assert.assertEquals(0, identifier4.getShardSpec().getNumCorePartitions()); - } /** From 7fa987dae9240803dd371ebea14142dddf7f84f7 Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Thu, 14 Dec 2023 12:10:26 -0500 Subject: [PATCH 02/14] Update labeler to v5 that includes fix where bot doesn't remove labels added by maintainers. (#15558) --- .github/workflows/labeler.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/labeler.yml b/.github/workflows/labeler.yml index 5de3d3bcc5a1..4c6c1867583d 100644 --- a/.github/workflows/labeler.yml +++ b/.github/workflows/labeler.yml @@ -32,6 +32,6 @@ jobs: pull-requests: write runs-on: ubuntu-latest steps: - - uses: actions/labeler@v4 + - uses: actions/labeler@v5 with: repo-token: "${{ secrets.GITHUB_TOKEN }}" \ No newline at end of file From 9deeb288c5af8c082a67e31331e375714a8ce7f0 Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Thu, 14 Dec 2023 14:00:21 -0500 Subject: [PATCH 03/14] Update labeler config per v5 spec. 
(#15564) --- .github/labeler.yml | 68 ++++++++++++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 22 deletions(-) diff --git a/.github/labeler.yml b/.github/labeler.yml index a9bfc45a86ec..22895a82d8b7 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -21,47 +21,71 @@ # Pull Request Labeler GitHub Action Configuration: https://github.com/marketplace/actions/labeler 'Area - Batch Ingestion': - - 'indexing-hadoop/**' - - 'extensions-core/multi-stage-query/**' + - changed-files: + - any-glob-to-any-file: + - 'indexing-hadoop/**' + - 'extensions-core/multi-stage-query/**' 'Area - Dependencies': - - '**/pom.xml' - - 'licenses.yaml' + - changed-files: + - any-glob-to-any-file: + - '**/pom.xml' + - 'licenses.yaml' 'Area - Documentation': - - 'docs/**/*' - - 'website/**' - - 'examples/quickstart/jupyter-notebooks/**' + - changed-files: + - any-glob-to-any-file: + - 'docs/**/*' + - 'website/**' + - 'examples/quickstart/jupyter-notebooks/**' 'Area - Ingestion': - - 'indexing-service/**' + - changed-files: + - any-glob-to-any-file: + - 'indexing-service/**' 'Area - Lookups': - - 'extensions-core/lookups-cached-global/**' - - 'extensions-core/lookups-cached-single/**' - - 'extensions-core/kafka-extraction-namespace/**' + - changed-files: + - any-glob-to-any-file: + - 'extensions-core/lookups-cached-global/**' + - 'extensions-core/lookups-cached-single/**' + - 'extensions-core/kafka-extraction-namespace/**' 'Area - Metrics/Event Emitting': - - 'processing/src/main/java/org/apache/druid/java/util/metrics/**' - - 'processing/src/main/java/org/apache/druid/java/util/emitter/**' - - 'extensions-contrib/*-emitter/**' + - changed-files: + - any-glob-to-any-file: + - 'processing/src/main/java/org/apache/druid/java/util/metrics/**' + - 'processing/src/main/java/org/apache/druid/java/util/emitter/**' + - 'extensions-contrib/*-emitter/**' 'Area - MSQ': - - 'extensions-core/multi-stage-query/**' + - changed-files: + - any-glob-to-any-file: + - 'extensions-core/multi-stage-query/**' 'Area - Querying': - - 'sql/**' - - 'extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/**' + - changed-files: + - any-glob-to-any-file: + - 'sql/**' + - 'extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/**' 'Area - Segment Format and Ser/De': - - 'processing/src/main/java/org/apache/druid/segment/**' + - changed-files: + - any-glob-to-any-file: + - 'processing/src/main/java/org/apache/druid/segment/**' 'Area - Streaming Ingestion': - - 'extensions-core/kafka-indexing-service/**' - - 'extensions-core/kinesis-indexing-service/**' + - changed-files: + - any-glob-to-any-file: + - 'extensions-core/kafka-indexing-service/**' + - 'extensions-core/kinesis-indexing-service/**' 'Area - Web Console': - - 'web-console/**' + - changed-files: + - any-glob-to-any-file: + - 'web-console/**' 'Kubernetes': - - 'extensions-contrib/kubernetes-overlord-extensions/**' + - changed-files: + - any-glob-to-any-file: + - 'extensions-contrib/kubernetes-overlord-extensions/**' From c9be1cb4e84f15724a3d881fccc80e3b2432c0d5 Mon Sep 17 00:00:00 2001 From: sensor Date: Fri, 15 Dec 2023 11:18:53 +0800 Subject: [PATCH 04/14] Clean useless InterruptedException warn in ingestion task log (#15519) * Clean useless InterruptedException warn in ingestion task log * test coverage for the code change, manually close the scheduler thread to trigger Interrupt signal --------- Co-authored-by: Qiong Chen --- .../java/org/apache/druid/emitter/kafka/KafkaEmitter.java | 4 ++++ 
.../java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java | 2 ++
 2 files changed, 6 insertions(+)

diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index 4a58464f7308..c776e3f2f8b2 100644
--- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -181,6 +181,10 @@ private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue setKafkaProducer() } countDownSentEvents.await(); + kafkaEmitter.close(); + Assert.assertEquals(0, kafkaEmitter.getMetricLostCount()); Assert.assertEquals(0, kafkaEmitter.getAlertLostCount()); Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount());

From 7552dc49fb7937b4de2ef26e9857b8d26003e4cc Mon Sep 17 00:00:00 2001
From: Zoltan Haindrich
Date: Fri, 15 Dec 2023 11:41:59 +0100
Subject: [PATCH 05/14] Reduce amount of expression objects created during evaluations (#15552)

I was looking into a query that was performing a bit poorly because case_searched was touching more than one column (if only one column is involved, there is a cache-based evaluator). While doing that, I noticed a few simple things that could help a bit:

* use static TRUE/FALSE instances instead of creating a new object every time
* create the ExprEval eagerly for ConstantExpr-s (except the BigInteger one, which seems to have an odd contract)
* return early from type autodetection

These changes mostly reduce the amount of garbage the query creates during case_searched evaluation. ExpressionSelectorBenchmark shows improvements of ~15%, and my manual trials on the taxi dataset with 60M rows showed larger gains, probably because these changes mainly reduce GC pressure.
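To illustrate the first two points, the gist of the change is to build the evaluation result once and hand back the same instance on every call. This is a simplified sketch of the pattern; the real code is in the ConstantExpr and ExprEval diffs below:

    // shared evaluations for boolean results instead of a new ExprEval per call
    private static final LongExprEval TRUE = new LongExprEval(Evals.asLong(true));
    private static final LongExprEval FALSE = new LongExprEval(Evals.asLong(false));

    public static ExprEval ofLongBoolean(boolean value)
    {
      return value ? TRUE : FALSE;
    }

    // constant expressions pre-compute their ExprEval in the constructor...
    LongExpr(Long value)
    {
      super(ExpressionType.LONG, Preconditions.checkNotNull(value, "value"));
      expr = ExprEval.ofLong(value);
    }

    @Override
    public ExprEval eval(ObjectBinding bindings)
    {
      return expr; // ...and return the cached instance on every evaluation
    }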
--- .../benchmark/ExpressionFilterBenchmark.java | 2 + .../ExpressionSelectorBenchmark.java | 156 +++++++++++++++++- .../apache/druid/math/expr/ConstantExpr.java | 20 ++- .../org/apache/druid/math/expr/ExprEval.java | 6 +- .../math/expr/ExpressionTypeConversion.java | 3 + .../org/apache/druid/math/expr/ExprTest.java | 7 +- .../druid/math/expr/OutputTypeTest.java | 11 ++ 7 files changed, 193 insertions(+), 12 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java index eea1804292ab..e7d8ad00bc5e 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.math.expr.ExpressionProcessing; import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.filter.AndDimFilter; import org.apache.druid.query.filter.DimFilter; @@ -70,6 +71,7 @@ public class ExpressionFilterBenchmark { static { NullHandling.initializeForTests(); + ExpressionProcessing.initializeForTests(); } @Param({"1000000"}) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java index fcc4cf24794a..d8e1dfc76464 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java @@ -25,8 +25,10 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.math.expr.ExpressionProcessing; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.ExtractionDimensionSpec; +import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.extraction.StrlenExtractionFn; import org.apache.druid.query.extraction.TimeFormatExtractionFn; @@ -59,21 +61,21 @@ import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; - import java.util.BitSet; import java.util.List; import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) @Fork(value = 1) -@Warmup(iterations = 15) -@Measurement(iterations = 30) +@Warmup(iterations = 3, time = 3) +@Measurement(iterations = 10, time = 3) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class ExpressionSelectorBenchmark { static { NullHandling.initializeForTests(); + ExpressionProcessing.initializeForTests(); } @Param({"1000000"}) @@ -451,6 +453,154 @@ public void stringConcatAndCompareOnLong(Blackhole blackhole) blackhole.consume(results); } + @Benchmark + public void caseSearched1(Blackhole blackhole) + { + final Sequence cursors = new QueryableIndexStorageAdapter(index).makeCursors( + null, + index.getDataInterval(), + VirtualColumns.create( + ImmutableList.of( + new ExpressionVirtualColumn( + "v", + "case_searched(s == 'asd' || isnull(s) || s == 'xxx', 1, s == 'foo' || s == 'bar', 
2, 3)", + ColumnType.LONG, + TestExprMacroTable.INSTANCE + ) + ) + ), + Granularities.ALL, + false, + null + ); + + final List results = cursors + .map(cursor -> { + final ColumnValueSelector selector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v"); + consumeLong(cursor, selector, blackhole); + return null; + }) + .toList(); + + blackhole.consume(results); + } + + @Benchmark + public void caseSearched2(Blackhole blackhole) + { + final Sequence cursors = new QueryableIndexStorageAdapter(index).makeCursors( + null, + index.getDataInterval(), + VirtualColumns.create( + ImmutableList.of( + new ExpressionVirtualColumn( + "v", + "case_searched(s == 'asd' || isnull(s) || n == 1, 1, n == 2, 2, 3)", + ColumnType.LONG, + TestExprMacroTable.INSTANCE + ) + ) + ), + Granularities.ALL, + false, + null + ); + + final List results = cursors + .map(cursor -> { + final ColumnValueSelector selector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v"); + consumeLong(cursor, selector, blackhole); + return null; + }) + .toList(); + + blackhole.consume(results); + } + + @Benchmark + public void caseSearchedWithLookup(Blackhole blackhole) + { + final Sequence cursors = new QueryableIndexStorageAdapter(index).makeCursors( + null, + index.getDataInterval(), + VirtualColumns.create( + ImmutableList.of( + new ExpressionVirtualColumn( + "v", + "case_searched(n == 1001, -1, " + + "lookup(s, 'lookyloo') == 'asd1', 1, " + + "lookup(s, 'lookyloo') == 'asd2', 2, " + + "lookup(s, 'lookyloo') == 'asd3', 3, " + + "lookup(s, 'lookyloo') == 'asd4', 4, " + + "lookup(s, 'lookyloo') == 'asd5', 5, " + + "-2)", + ColumnType.LONG, + LookupEnabledTestExprMacroTable.INSTANCE + ) + ) + ), + Granularities.ALL, + false, + null + ); + + final List results = cursors + .map(cursor -> { + final ColumnValueSelector selector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v"); + consumeLong(cursor, selector, blackhole); + return null; + }) + .toList(); + + blackhole.consume(results); + } + + @Benchmark + public void caseSearchedWithLookup2(Blackhole blackhole) + { + final Sequence cursors = new QueryableIndexStorageAdapter(index).makeCursors( + null, + index.getDataInterval(), + VirtualColumns.create( + ImmutableList.of( + new ExpressionVirtualColumn( + "ll", + "lookup(s, 'lookyloo')", + ColumnType.STRING, + LookupEnabledTestExprMacroTable.INSTANCE + ), + new ExpressionVirtualColumn( + "v", + "case_searched(n == 1001, -1, " + + "ll == 'asd1', 1, " + + "ll == 'asd2', 2, " + + "ll == 'asd3', 3, " + + "ll == 'asd4', 4, " + + "ll == 'asd5', 5, " + + "-2)", + ColumnType.LONG, + LookupEnabledTestExprMacroTable.INSTANCE + ) + ) + ), + Granularities.ALL, + false, + null + ); + + final List results = cursors + .map(cursor -> { + final ColumnValueSelector selector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v"); + consumeLong(cursor, selector, blackhole); + return null; + }) + .toList(); + + blackhole.consume(results); + } + + + private void consumeDimension(final Cursor cursor, final DimensionSelector selector, final Blackhole blackhole) { if (selector.getValueCardinality() >= 0) { diff --git a/processing/src/main/java/org/apache/druid/math/expr/ConstantExpr.java b/processing/src/main/java/org/apache/druid/math/expr/ConstantExpr.java index fdf6f080ee9c..8790b2ed6145 100644 --- a/processing/src/main/java/org/apache/druid/math/expr/ConstantExpr.java +++ b/processing/src/main/java/org/apache/druid/math/expr/ConstantExpr.java @@ -157,9 +157,12 @@ public int hashCode() class LongExpr extends 
ConstantExpr { + private final ExprEval expr; + LongExpr(Long value) { super(ExpressionType.LONG, Preconditions.checkNotNull(value, "value")); + expr = ExprEval.ofLong(value); } @Override @@ -171,7 +174,7 @@ public String toString() @Override public ExprEval eval(ObjectBinding bindings) { - return ExprEval.ofLong(value); + return expr; } @Override @@ -240,9 +243,12 @@ public String toString() class DoubleExpr extends ConstantExpr { + private final ExprEval expr; + DoubleExpr(Double value) { super(ExpressionType.DOUBLE, Preconditions.checkNotNull(value, "value")); + expr = ExprEval.ofDouble(value); } @Override @@ -254,7 +260,7 @@ public String toString() @Override public ExprEval eval(ObjectBinding bindings) { - return ExprEval.ofDouble(value); + return expr; } @Override @@ -323,9 +329,12 @@ public String toString() class StringExpr extends ConstantExpr { + private final ExprEval expr; + StringExpr(@Nullable String value) { super(ExpressionType.STRING, NullHandling.emptyToNullIfNeeded(value)); + expr = ExprEval.of(value); } @Override @@ -337,7 +346,7 @@ public String toString() @Override public ExprEval eval(ObjectBinding bindings) { - return ExprEval.of(value); + return expr; } @Override @@ -464,15 +473,18 @@ public String toString() class ComplexExpr extends ConstantExpr { + private final ExprEval expr; + protected ComplexExpr(ExpressionType outputType, @Nullable Object value) { super(outputType, value); + expr = ExprEval.ofComplex(outputType, value); } @Override public ExprEval eval(ObjectBinding bindings) { - return ExprEval.ofComplex(outputType, value); + return expr; } @Override diff --git a/processing/src/main/java/org/apache/druid/math/expr/ExprEval.java b/processing/src/main/java/org/apache/druid/math/expr/ExprEval.java index 35f59e5ebe75..e62021623bda 100644 --- a/processing/src/main/java/org/apache/druid/math/expr/ExprEval.java +++ b/processing/src/main/java/org/apache/druid/math/expr/ExprEval.java @@ -363,7 +363,7 @@ public static ExprEval ofBoolean(boolean value, ExprType type) case DOUBLE: return ExprEval.of(Evals.asDouble(value)); case LONG: - return ExprEval.of(Evals.asLong(value)); + return ofLongBoolean(value); case STRING: return ExprEval.of(String.valueOf(value)); default: @@ -376,7 +376,7 @@ public static ExprEval ofBoolean(boolean value, ExprType type) */ public static ExprEval ofLongBoolean(boolean value) { - return ExprEval.of(Evals.asLong(value)); + return value ? 
LongExprEval.TRUE : LongExprEval.FALSE; } public static ExprEval ofComplex(ExpressionType outputType, @Nullable Object value) @@ -922,6 +922,8 @@ public Expr toExpr() private static class LongExprEval extends NumericExprEval { + private static final LongExprEval TRUE = new LongExprEval(Evals.asLong(true)); + private static final LongExprEval FALSE = new LongExprEval(Evals.asLong(false)); private static final LongExprEval OF_NULL = new LongExprEval(null); private LongExprEval(@Nullable Number value) diff --git a/processing/src/main/java/org/apache/druid/math/expr/ExpressionTypeConversion.java b/processing/src/main/java/org/apache/druid/math/expr/ExpressionTypeConversion.java index ba576afa0c15..efe0328a8375 100644 --- a/processing/src/main/java/org/apache/druid/math/expr/ExpressionTypeConversion.java +++ b/processing/src/main/java/org/apache/druid/math/expr/ExpressionTypeConversion.java @@ -55,6 +55,9 @@ public static ExpressionType autoDetect(ExprEval eval, ExprEval otherEval) { ExpressionType type = eval.type(); ExpressionType otherType = otherEval.type(); + if (type == otherType && type.getType().isPrimitive()) { + return type; + } if (Types.is(type, ExprType.STRING) && Types.is(otherType, ExprType.STRING)) { return ExpressionType.STRING; } diff --git a/processing/src/test/java/org/apache/druid/math/expr/ExprTest.java b/processing/src/test/java/org/apache/druid/math/expr/ExprTest.java index 3c089ac5b6af..f8d222e08383 100644 --- a/processing/src/test/java/org/apache/druid/math/expr/ExprTest.java +++ b/processing/src/test/java/org/apache/druid/math/expr/ExprTest.java @@ -139,7 +139,7 @@ public void testEqualsContractForUnaryMinusExpr() public void testEqualsContractForStringExpr() { EqualsVerifier.forClass(StringExpr.class) - .withIgnoredFields("outputType") + .withIgnoredFields("outputType", "expr") .withPrefabValues(ExpressionType.class, ExpressionType.STRING, ExpressionType.DOUBLE) .usingGetClass() .verify(); @@ -149,7 +149,7 @@ public void testEqualsContractForStringExpr() public void testEqualsContractForDoubleExpr() { EqualsVerifier.forClass(DoubleExpr.class) - .withIgnoredFields("outputType") + .withIgnoredFields("outputType", "expr") .withPrefabValues(ExpressionType.class, ExpressionType.DOUBLE, ExpressionType.LONG) .usingGetClass() .verify(); @@ -159,7 +159,7 @@ public void testEqualsContractForDoubleExpr() public void testEqualsContractForLongExpr() { EqualsVerifier.forClass(LongExpr.class) - .withIgnoredFields("outputType") + .withIgnoredFields("outputType", "expr") .withPrefabValues(ExpressionType.class, ExpressionType.LONG, ExpressionType.STRING) .usingGetClass() .verify(); @@ -187,6 +187,7 @@ public void testEqualsContractForComplexExpr() ExpressionTypeFactory.getInstance().ofComplex("bar") ) .withNonnullFields("outputType") + .withIgnoredFields("expr") .usingGetClass() .verify(); } diff --git a/processing/src/test/java/org/apache/druid/math/expr/OutputTypeTest.java b/processing/src/test/java/org/apache/druid/math/expr/OutputTypeTest.java index f8d663abc70b..d313749fef8b 100644 --- a/processing/src/test/java/org/apache/druid/math/expr/OutputTypeTest.java +++ b/processing/src/test/java/org/apache/druid/math/expr/OutputTypeTest.java @@ -453,6 +453,8 @@ public void testEvalAutoConversion() final ExprEval longEval = ExprEval.of(1L); final ExprEval doubleEval = ExprEval.of(1.0); final ExprEval arrayEval = ExprEval.ofLongArray(new Long[]{1L, 2L, 3L}); + final ExprEval complexEval = ExprEval.ofComplex(ExpressionType.UNKNOWN_COMPLEX, new Object()); + final ExprEval complexEval2 = 
ExprEval.ofComplex(new ExpressionType(ExprType.COMPLEX, null, null), new Object()); // only long stays long Assert.assertEquals(ExpressionType.LONG, ExpressionTypeConversion.autoDetect(longEval, longEval)); @@ -479,6 +481,15 @@ public void testEvalAutoConversion() Assert.assertEquals(ExpressionType.DOUBLE, ExpressionTypeConversion.autoDetect(nullStringEval, arrayEval)); Assert.assertEquals(ExpressionType.DOUBLE, ExpressionTypeConversion.autoDetect(doubleEval, arrayEval)); Assert.assertEquals(ExpressionType.DOUBLE, ExpressionTypeConversion.autoDetect(longEval, arrayEval)); + + Assert.assertEquals(ExpressionType.DOUBLE, ExpressionTypeConversion.autoDetect(longEval, complexEval)); + Assert.assertEquals(ExpressionType.DOUBLE, ExpressionTypeConversion.autoDetect(doubleEval, complexEval)); + Assert.assertEquals(ExpressionType.DOUBLE, ExpressionTypeConversion.autoDetect(arrayEval, complexEval)); + Assert.assertEquals(ExpressionType.DOUBLE, ExpressionTypeConversion.autoDetect(complexEval, complexEval)); + Assert.assertEquals( + ExpressionTypeConversion.autoDetect(complexEval, complexEval), + ExpressionTypeConversion.autoDetect(complexEval2, complexEval) + ); } @Test From da6b3cbc5117632dd736f66b815f39488e86ddd1 Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Fri, 15 Dec 2023 12:12:03 -0500 Subject: [PATCH 06/14] Detect EXPLAIN PLAN queries in web-console (#15570) --- web-console/src/utils/sql.spec.ts | 232 ++++++++++++++++++++++++++++++ web-console/src/utils/sql.ts | 2 +- 2 files changed, 233 insertions(+), 1 deletion(-) diff --git a/web-console/src/utils/sql.spec.ts b/web-console/src/utils/sql.spec.ts index 2e4fe5ee718a..9f107b1c45fc 100644 --- a/web-console/src/utils/sql.spec.ts +++ b/web-console/src/utils/sql.spec.ts @@ -351,5 +351,237 @@ describe('sql', () => { ] `); }); + + it('works with explain plan query', () => { + const text = sane` + EXPLAIN PLAN FOR + INSERT INTO "wikipedia" + WITH "ext" AS ( + SELECT * + FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz"]}', + '{"type":"json"}' + ) + ) EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR) + ) + SELECT + TIME_PARSE("timestamp") AS "__time", + "isRobot", + "channel" + FROM "ext" + PARTITIONED BY DAY + CLUSTERED BY "channel" + `; + + const found = findAllSqlQueriesInText(text); + + expect(found).toMatchInlineSnapshot(` + Array [ + Object { + "endOffset": 404, + "endRowColumn": Object { + "column": 22, + "row": 17, + }, + "sql": "EXPLAIN PLAN FOR + INSERT INTO \\"wikipedia\\" + WITH \\"ext\\" AS ( + SELECT * + FROM TABLE( + EXTERN( + '{\\"type\\":\\"http\\",\\"uris\\":[\\"https://druid.apache.org/data/wikipedia.json.gz\\"]}', + '{\\"type\\":\\"json\\"}' + ) + ) EXTEND (\\"isRobot\\" VARCHAR, \\"channel\\" VARCHAR, \\"timestamp\\" VARCHAR) + ) + SELECT + TIME_PARSE(\\"timestamp\\") AS \\"__time\\", + \\"isRobot\\", + \\"channel\\" + FROM \\"ext\\" + PARTITIONED BY DAY + CLUSTERED BY \\"channel\\"", + "startOffset": 0, + "startRowColumn": Object { + "column": 0, + "row": 0, + }, + }, + Object { + "endOffset": 404, + "endRowColumn": Object { + "column": 22, + "row": 17, + }, + "sql": "INSERT INTO \\"wikipedia\\" + WITH \\"ext\\" AS ( + SELECT * + FROM TABLE( + EXTERN( + '{\\"type\\":\\"http\\",\\"uris\\":[\\"https://druid.apache.org/data/wikipedia.json.gz\\"]}', + '{\\"type\\":\\"json\\"}' + ) + ) EXTEND (\\"isRobot\\" VARCHAR, \\"channel\\" VARCHAR, \\"timestamp\\" VARCHAR) + ) + SELECT + TIME_PARSE(\\"timestamp\\") AS \\"__time\\", + \\"isRobot\\", + \\"channel\\" + 
FROM \\"ext\\" + PARTITIONED BY DAY + CLUSTERED BY \\"channel\\"", + "startOffset": 17, + "startRowColumn": Object { + "column": 0, + "row": 1, + }, + }, + Object { + "endOffset": 362, + "endRowColumn": Object { + "column": 10, + "row": 15, + }, + "sql": "WITH \\"ext\\" AS ( + SELECT * + FROM TABLE( + EXTERN( + '{\\"type\\":\\"http\\",\\"uris\\":[\\"https://druid.apache.org/data/wikipedia.json.gz\\"]}', + '{\\"type\\":\\"json\\"}' + ) + ) EXTEND (\\"isRobot\\" VARCHAR, \\"channel\\" VARCHAR, \\"timestamp\\" VARCHAR) + ) + SELECT + TIME_PARSE(\\"timestamp\\") AS \\"__time\\", + \\"isRobot\\", + \\"channel\\" + FROM \\"ext\\"", + "startOffset": 41, + "startRowColumn": Object { + "column": 0, + "row": 2, + }, + }, + Object { + "endOffset": 278, + "endRowColumn": Object { + "column": 70, + "row": 9, + }, + "sql": "SELECT * + FROM TABLE( + EXTERN( + '{\\"type\\":\\"http\\",\\"uris\\":[\\"https://druid.apache.org/data/wikipedia.json.gz\\"]}', + '{\\"type\\":\\"json\\"}' + ) + ) EXTEND (\\"isRobot\\" VARCHAR, \\"channel\\" VARCHAR, \\"timestamp\\" VARCHAR)", + "startOffset": 59, + "startRowColumn": Object { + "column": 2, + "row": 3, + }, + }, + Object { + "endOffset": 362, + "endRowColumn": Object { + "column": 10, + "row": 15, + }, + "sql": "SELECT + TIME_PARSE(\\"timestamp\\") AS \\"__time\\", + \\"isRobot\\", + \\"channel\\" + FROM \\"ext\\"", + "startOffset": 281, + "startRowColumn": Object { + "column": 0, + "row": 11, + }, + }, + ] + `); + }); + + it('works with multiple explain plan queries', () => { + const text = sane` + EXPLAIN PLAN FOR + SELECT * + FROM wikipedia + + EXPLAIN PLAN FOR + SELECT * + FROM w2 + LIMIT 5 + + `; + + const found = findAllSqlQueriesInText(text); + + expect(found).toMatchInlineSnapshot(` + Array [ + Object { + "endOffset": 40, + "endRowColumn": Object { + "column": 14, + "row": 2, + }, + "sql": "EXPLAIN PLAN FOR + SELECT * + FROM wikipedia", + "startOffset": 0, + "startRowColumn": Object { + "column": 0, + "row": 0, + }, + }, + Object { + "endOffset": 40, + "endRowColumn": Object { + "column": 14, + "row": 2, + }, + "sql": "SELECT * + FROM wikipedia", + "startOffset": 17, + "startRowColumn": Object { + "column": 0, + "row": 1, + }, + }, + Object { + "endOffset": 83, + "endRowColumn": Object { + "column": 7, + "row": 7, + }, + "sql": "EXPLAIN PLAN FOR + SELECT * + FROM w2 + LIMIT 5", + "startOffset": 42, + "startRowColumn": Object { + "column": 0, + "row": 4, + }, + }, + Object { + "endOffset": 83, + "endRowColumn": Object { + "column": 7, + "row": 7, + }, + "sql": "SELECT * + FROM w2 + LIMIT 5", + "startOffset": 59, + "startRowColumn": Object { + "column": 0, + "row": 5, + }, + }, + ] + `); + }); }); }); diff --git a/web-console/src/utils/sql.ts b/web-console/src/utils/sql.ts index 61cd4c7ed41e..b80c5ea457d5 100644 --- a/web-console/src/utils/sql.ts +++ b/web-console/src/utils/sql.ts @@ -123,7 +123,7 @@ export function findAllSqlQueriesInText(text: string): QuerySlice[] { let offset = 0; let m: RegExpExecArray | null = null; do { - m = /SELECT|WITH|INSERT|REPLACE/i.exec(remainingText); + m = /SELECT|WITH|INSERT|REPLACE|EXPLAIN/i.exec(remainingText); if (m) { const sql = findSqlQueryPrefix(remainingText.slice(m.index)); const advanceBy = m.index + m[0].length; // Skip the initial word From 901ebbb744501252726e5d6c86541f1c6d46c131 Mon Sep 17 00:00:00 2001 From: Tom Date: Fri, 15 Dec 2023 09:21:21 -0800 Subject: [PATCH 07/14] Allow for kafka emitter producer secrets to be masked in logs (#15485) * Allow for kafka emitter producer secrets to be masked in logs 
instead of being visible This change will allow for kafka producer config values that should be secrets to not show up in the logs. This will enhance the security of the people who use the kafka emitter to use this if they want to. This is opt in and will not affect prior configs for this emitter * fix checkstyle issue * change property name --- .../druid/emitter/kafka/KafkaEmitter.java | 1 + .../emitter/kafka/KafkaEmitterConfig.java | 23 ++++- .../emitter/kafka/KafkaEmitterConfigTest.java | 95 +++++++++++++------ .../druid/emitter/kafka/KafkaEmitterTest.java | 2 +- .../utils/DynamicConfigProviderUtils.java | 8 +- 5 files changed, 89 insertions(+), 40 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index c776e3f2f8b2..7485cbaab6de 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -115,6 +115,7 @@ protected Producer setKafkaProducer() props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES); props.putAll(config.getKafkaProducerConfig()); + props.putAll(config.getKafkaProducerSecrets().getConfig()); return new KafkaProducer<>(props); } diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java index 019edd095ea4..d6d823c0a88e 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java @@ -26,6 +26,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.metadata.DynamicConfigProvider; +import org.apache.druid.metadata.MapStringDynamicConfigProvider; import org.apache.kafka.clients.producer.ProducerConfig; import javax.annotation.Nullable; @@ -73,6 +75,8 @@ public static EventType fromString(String name) private final String clusterName; @JsonProperty("producer.config") private final Map kafkaProducerConfig; + @JsonProperty("producer.hiddenProperties") + private final DynamicConfigProvider kafkaProducerSecrets; @JsonCreator public KafkaEmitterConfig( @@ -83,7 +87,8 @@ public KafkaEmitterConfig( @Nullable @JsonProperty("request.topic") String requestTopic, @Nullable @JsonProperty("segmentMetadata.topic") String segmentMetadataTopic, @JsonProperty("clusterName") String clusterName, - @JsonProperty("producer.config") @Nullable Map kafkaProducerConfig + @JsonProperty("producer.config") @Nullable Map kafkaProducerConfig, + @JsonProperty("producer.hiddenProperties") @Nullable DynamicConfigProvider kafkaProducerSecrets ) { this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "druid.emitter.kafka.bootstrap.servers can not be null"); @@ -94,6 +99,7 @@ public KafkaEmitterConfig( this.segmentMetadataTopic = this.eventTypes.contains(EventType.SEGMENT_METADATA) ? 
Preconditions.checkNotNull(segmentMetadataTopic, "druid.emitter.kafka.segmentMetadata.topic can not be null") : null; this.clusterName = clusterName; this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig; + this.kafkaProducerSecrets = kafkaProducerSecrets == null ? new MapStringDynamicConfigProvider(ImmutableMap.of()) : kafkaProducerSecrets; } private Set maybeUpdateEventTypes(Set eventTypes, String requestTopic) @@ -159,6 +165,12 @@ public Map getKafkaProducerConfig() return kafkaProducerConfig; } + @JsonProperty + public DynamicConfigProvider getKafkaProducerSecrets() + { + return kafkaProducerSecrets; + } + @Override public boolean equals(Object o) { @@ -198,7 +210,10 @@ public boolean equals(Object o) if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) { return false; } - return getKafkaProducerConfig().equals(that.getKafkaProducerConfig()); + if (!getKafkaProducerConfig().equals(that.getKafkaProducerConfig())) { + return false; + } + return getKafkaProducerSecrets().getConfig().equals(that.getKafkaProducerSecrets().getConfig()); } @Override @@ -212,6 +227,7 @@ public int hashCode() result = 31 * result + (getSegmentMetadataTopic() != null ? getSegmentMetadataTopic().hashCode() : 0); result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0); result = 31 * result + getKafkaProducerConfig().hashCode(); + result = 31 * result + getKafkaProducerSecrets().getConfig().hashCode(); return result; } @@ -226,7 +242,8 @@ public String toString() ", request.topic='" + requestTopic + '\'' + ", segmentMetadata.topic='" + segmentMetadataTopic + '\'' + ", clusterName='" + clusterName + '\'' + - ", Producer.config=" + kafkaProducerConfig + + ", producer.config=" + kafkaProducerConfig + '\'' + + ", producer.hiddenProperties=" + kafkaProducerSecrets + '}'; } } diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java index c4d5811bcb53..603c8e6701bf 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -25,48 +25,67 @@ import com.fasterxml.jackson.databind.exc.ValueInstantiationException; import com.google.common.collect.ImmutableMap; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.metadata.DynamicConfigProvider; +import org.apache.druid.metadata.MapStringDynamicConfigProvider; import org.junit.Assert; import org.junit.Before; import org.junit.Test; + import java.io.IOException; import java.util.HashSet; import java.util.Set; public class KafkaEmitterConfigTest { - private ObjectMapper mapper = new DefaultObjectMapper(); + private static final DynamicConfigProvider DEFAULT_PRODUCER_SECRETS = new MapStringDynamicConfigProvider( + ImmutableMap.of("testSecretKey", "testSecretValue")); + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); @Before public void setUp() { - mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())); + MAPPER.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())); } @Test public void testSerDeserKafkaEmitterConfig() throws IOException { - KafkaEmitterConfig 
kafkaEmitterConfig = new KafkaEmitterConfig("hostname", null, "metricTest", - "alertTest", "requestTest", "metadataTest", - "clusterNameTest", ImmutableMap.builder() - .put("testKey", "testValue").build() + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig( + "hostname", + null, + "metricTest", + "alertTest", + "requestTest", + "metadataTest", + "clusterNameTest", + ImmutableMap.builder() + .put("testKey", "testValue").build(), + DEFAULT_PRODUCER_SECRETS ); - String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); - KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) - .readValue(kafkaEmitterConfigString); + String kafkaEmitterConfigString = MAPPER.writeValueAsString(kafkaEmitterConfig); + KafkaEmitterConfig kafkaEmitterConfigExpected = MAPPER.readerFor(KafkaEmitterConfig.class) + .readValue(kafkaEmitterConfigString); Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); } @Test public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException { - KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", null, "metricTest", - "alertTest", null, "metadataTest", - "clusterNameTest", ImmutableMap.builder() - .put("testKey", "testValue").build() + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig( + "hostname", + null, + "metricTest", + "alertTest", + null, + "metadataTest", + "clusterNameTest", + ImmutableMap.builder() + .put("testKey", "testValue").build(), + DEFAULT_PRODUCER_SECRETS ); - String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); - KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) - .readValue(kafkaEmitterConfigString); + String kafkaEmitterConfigString = MAPPER.writeValueAsString(kafkaEmitterConfig); + KafkaEmitterConfig kafkaEmitterConfigExpected = MAPPER.readerFor(KafkaEmitterConfig.class) + .readValue(kafkaEmitterConfigString); Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); } @@ -75,27 +94,34 @@ public void testSerDeserKafkaEmitterConfigNullMetricsTopic() throws IOException { Set eventTypeSet = new HashSet(); eventTypeSet.add(KafkaEmitterConfig.EventType.SEGMENT_METADATA); - KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", eventTypeSet, null, - null, null, "metadataTest", - "clusterNameTest", ImmutableMap.builder() - .put("testKey", "testValue").build() + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig( + "hostname", + eventTypeSet, + null, + null, + null, + "metadataTest", + "clusterNameTest", + ImmutableMap.builder() + .put("testKey", "testValue").build(), + DEFAULT_PRODUCER_SECRETS ); - String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); - KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) - .readValue(kafkaEmitterConfigString); + String kafkaEmitterConfigString = MAPPER.writeValueAsString(kafkaEmitterConfig); + KafkaEmitterConfig kafkaEmitterConfigExpected = MAPPER.readerFor(KafkaEmitterConfig.class) + .readValue(kafkaEmitterConfigString); Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); } @Test - public void testSerDeNotRequiredKafkaProducerConfig() + public void testSerDeNotRequiredKafkaProducerConfigOrKafkaSecretProducer() { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", null, "metricTest", - "alertTest", null, "metadataTest", - "clusterNameTest", null + "alertTest", null, 
"metadataTest", + "clusterNameTest", null, null ); try { @SuppressWarnings("unused") - KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, mapper); + KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, MAPPER); } catch (NullPointerException e) { Assert.fail(); @@ -105,9 +131,18 @@ public void testSerDeNotRequiredKafkaProducerConfig() @Test public void testDeserializeEventTypesWithDifferentCase() throws JsonProcessingException { - Assert.assertEquals(KafkaEmitterConfig.EventType.SEGMENT_METADATA, mapper.readValue("\"segment_metadata\"", KafkaEmitterConfig.EventType.class)); - Assert.assertEquals(KafkaEmitterConfig.EventType.ALERTS, mapper.readValue("\"alerts\"", KafkaEmitterConfig.EventType.class)); - Assert.assertThrows(ValueInstantiationException.class, () -> mapper.readValue("\"segmentMetadata\"", KafkaEmitterConfig.EventType.class)); + Assert.assertEquals( + KafkaEmitterConfig.EventType.SEGMENT_METADATA, + MAPPER.readValue("\"segment_metadata\"", KafkaEmitterConfig.EventType.class) + ); + Assert.assertEquals( + KafkaEmitterConfig.EventType.ALERTS, + MAPPER.readValue("\"alerts\"", KafkaEmitterConfig.EventType.class) + ); + Assert.assertThrows( + ValueInstantiationException.class, + () -> MAPPER.readValue("\"segmentMetadata\"", KafkaEmitterConfig.EventType.class) + ); } @Test diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java index f39e8e826063..1c30bee12fa2 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java @@ -106,7 +106,7 @@ public void testKafkaEmitter() throws InterruptedException ObjectMapper mapper = new ObjectMapper(); mapper.registerModule(new JodaModule()); final KafkaEmitter kafkaEmitter = new KafkaEmitter( - new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", null), + new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", null, null), mapper ) { diff --git a/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java b/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java index 4c45262aba0d..0b47116f0e7b 100644 --- a/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java +++ b/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java @@ -38,9 +38,7 @@ public static Map extraConfigAndSetStringMap(Map } } Map dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper); - for (Map.Entry entry : dynamicConfig.entrySet()) { - newConfig.put(entry.getKey(), entry.getValue()); - } + newConfig.putAll(dynamicConfig); } return newConfig; } @@ -55,9 +53,7 @@ public static Map extraConfigAndSetObjectMap(Map } } Map dynamicConfig = extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper); - for (Map.Entry entry : dynamicConfig.entrySet()) { - newConfig.put(entry.getKey(), entry.getValue()); - } + newConfig.putAll(dynamicConfig); } return newConfig; } From fa2c8edb5d08c1b77c502bef532bd3433118570a Mon Sep 17 00:00:00 2001 From: Jan Werner <105367074+janjwerner-confluent@users.noreply.github.com> Date: Fri, 15 Dec 2023 13:33:14 -0500 Subject: [PATCH 08/14] unpin snakeyaml, add suppressions and licenses (#15549) * unpin 
snakeyaml globally, add suppressions and licenses * pin snakeyaml in the specific modules that require version 1.x, update licenses and owasp suppression This removes the pin of the Snakeyaml introduced in: https://github.com/apache/druid/pull/14519 After the updates of io.kubernetes.java-client and io.confluent.kafka-clients, the only uses of the Snakeyaml 1.x are: - in test scope, transitive dependency of jackson-dataformat-yaml:jar:2.12.7 - in compile scope in contrib extension druid-cassandra-storage - in compile scope in it-tests. With the dependency version un-pinned, io.kubernetes.java-client and io.confluent.kafka-clients bring Snakeyaml versions 2.0 and 2.2, consequently allowing to build a Druid distribution without the contrib-extension and free of vulnerable Snakeyaml versions. --- extensions-contrib/cassandra-storage/pom.xml | 15 +++++++++++++++ .../kubernetes-overlord-extensions/pom.xml | 12 ++++++++++++ integration-tests/pom.xml | 13 +++++++++++++ licenses.yaml | 14 +++++++++++++- owasp-dependency-check-suppressions.xml | 19 +++++++++++++++---- pom.xml | 5 ----- 6 files changed, 68 insertions(+), 10 deletions(-) diff --git a/extensions-contrib/cassandra-storage/pom.xml b/extensions-contrib/cassandra-storage/pom.xml index 9dc2faefcec4..0760ed3cd3d4 100644 --- a/extensions-contrib/cassandra-storage/pom.xml +++ b/extensions-contrib/cassandra-storage/pom.xml @@ -33,6 +33,21 @@ ../../pom.xml + + + + + org.yaml + snakeyaml + 1.33 + + + + org.apache.druid diff --git a/extensions-contrib/kubernetes-overlord-extensions/pom.xml b/extensions-contrib/kubernetes-overlord-extensions/pom.xml index 346cc3a00a9c..84770340f240 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/pom.xml +++ b/extensions-contrib/kubernetes-overlord-extensions/pom.xml @@ -34,6 +34,18 @@ ../../pom.xml + + + + + org.yaml + snakeyaml + 1.33 + + + diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 1b81d653fa9b..2193dafd8552 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -43,6 +43,19 @@ org.apache.hadoop.fs.s3a.S3AFileSystem + + + + + org.yaml + snakeyaml + 1.33 + + + + com.amazonaws diff --git a/licenses.yaml b/licenses.yaml index 4367878cbd07..0e96c2ffb1be 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -1022,7 +1022,7 @@ name: org.yaml snakeyaml license_category: binary module: extensions/druid-kubernetes-extensions license_name: Apache License version 2.0 -version: 1.33 +version: 2.2 libraries: - org.yaml: snakeyaml @@ -2885,6 +2885,18 @@ libraries: - io.confluent: kafka-schema-registry-client - io.confluent: common-utils +--- + +name: org.yaml snakeyaml +license_category: binary +module: extensions/druid-protobuf-extensions +license_name: Apache License version 2.0 +version: 2.0 +libraries: + - org.yaml: snakeyaml + + + --- name: Confluent Kafka Client diff --git a/owasp-dependency-check-suppressions.xml b/owasp-dependency-check-suppressions.xml index 199087e4497b..320a9bcfbc11 100644 --- a/owasp-dependency-check-suppressions.xml +++ b/owasp-dependency-check-suppressions.xml @@ -275,12 +275,23 @@ - - - + + + CVE-2022-1471 CVE-2023-2251 diff --git a/pom.xml b/pom.xml index 59d8bacb6396..81cb00bb0cf5 100644 --- a/pom.xml +++ b/pom.xml @@ -364,11 +364,6 @@ json-smart 2.4.11 - - org.yaml - snakeyaml - 1.33 -
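
To make the secret-handling pattern from the Kafka emitter change (patch 07) concrete, the following is a minimal, self-contained Java sketch. It assumes nothing about Druid's actual classes: SecretsProvider, EmitterConfig and buildProducerProps are hypothetical stand-ins for DynamicConfigProvider, KafkaEmitterConfig and the producer setup in KafkaEmitter, not the real API. The idea it illustrates is the one the patch describes: values supplied under producer.hiddenProperties are resolved only when the producer is built, so they are merged into the producer properties at runtime but never appear in the config object's toString() and therefore never reach the logs.

import java.util.HashMap;
import java.util.Map;

public class KafkaSecretsSketch
{
  // Hypothetical stand-in for a dynamic config provider: secrets are resolved on demand.
  interface SecretsProvider
  {
    Map<String, String> getConfig();
  }

  // Hypothetical stand-in for the emitter config: loggable properties plus hidden ones.
  static class EmitterConfig
  {
    final Map<String, String> producerConfig;  // safe to log
    final SecretsProvider producerSecrets;     // resolved only when the producer is built

    EmitterConfig(Map<String, String> producerConfig, SecretsProvider producerSecrets)
    {
      this.producerConfig = producerConfig;
      this.producerSecrets = producerSecrets;
    }

    @Override
    public String toString()
    {
      // Only the non-secret properties are printed; the provider is never expanded here.
      return "EmitterConfig{producer.config=" + producerConfig
             + ", producer.hiddenProperties=<masked>}";
    }
  }

  // Merge plain and secret properties immediately before handing them to the producer.
  static Map<String, Object> buildProducerProps(EmitterConfig config)
  {
    final Map<String, Object> props = new HashMap<>();
    props.putAll(config.producerConfig);
    props.putAll(config.producerSecrets.getConfig());
    return props;
  }

  public static void main(String[] args)
  {
    final EmitterConfig config = new EmitterConfig(
        Map.of("bootstrap.servers", "localhost:9092"),
        () -> Map.of("sasl.jaas.config", "example-secret-value")
    );
    System.out.println(config);                               // the secret value is not printed
    System.out.println(buildProducerProps(config).keySet());  // but the key is available to the producer
  }
}

Keeping producer.config and producer.hiddenProperties as separate properties is what makes the feature opt-in: existing emitter configurations keep working unchanged, and only values placed under the new property are masked.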