diff --git a/integration-tests/k8s/tiny-cluster.yaml b/integration-tests/k8s/tiny-cluster.yaml index 6db4dbf421b6..0d44522c8ef8 100644 --- a/integration-tests/k8s/tiny-cluster.yaml +++ b/integration-tests/k8s/tiny-cluster.yaml @@ -73,7 +73,6 @@ spec: druid.discovery.type=k8s druid.discovery.k8s.clusterIdentifier=druid-it druid.serverview.type=http - druid.coordinator.loadqueuepeon.type=http druid.indexer.runner.type=httpRemote # Metadata Store diff --git a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java new file mode 100644 index 000000000000..06625409ca71 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Response of a {@link DataSegmentChangeRequest}. Contains the request itself + * and the result {@link SegmentChangeStatus}. + */ +public class DataSegmentChangeResponse +{ + private final DataSegmentChangeRequest request; + private final SegmentChangeStatus status; + + @JsonCreator + public DataSegmentChangeResponse( + @JsonProperty("request") DataSegmentChangeRequest request, + @JsonProperty("status") SegmentChangeStatus status + ) + { + this.request = request; + this.status = status; + } + + @JsonProperty + public DataSegmentChangeRequest getRequest() + { + return request; + } + + @JsonProperty + public SegmentChangeStatus getStatus() + { + return status; + } + + @Override + public String toString() + { + return "DataSegmentChangeResponse{" + + "request=" + request + + ", status=" + status + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeStatus.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeStatus.java new file mode 100644 index 000000000000..c19d1e7914a9 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeStatus.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import javax.annotation.Nullable; + +/** + * Contains {@link State} of a {@link DataSegmentChangeRequest} and failure + * message, if any. + */ +public class SegmentChangeStatus +{ + public enum State + { + SUCCESS, FAILED, PENDING + } + + private final State state; + @Nullable + private final String failureCause; + + public static final SegmentChangeStatus SUCCESS = new SegmentChangeStatus(State.SUCCESS, null); + public static final SegmentChangeStatus PENDING = new SegmentChangeStatus(State.PENDING, null); + + public static SegmentChangeStatus failed(String cause) + { + return new SegmentChangeStatus(State.FAILED, cause); + } + + @JsonCreator + private SegmentChangeStatus( + @JsonProperty("state") State state, + @JsonProperty("failureCause") @Nullable String failureCause + ) + { + this.state = Preconditions.checkNotNull(state, "state must be non-null"); + this.failureCause = failureCause; + } + + @JsonProperty + public State getState() + { + return state; + } + + @Nullable + @JsonProperty + public String getFailureCause() + { + return failureCause; + } + + @Override + public String toString() + { + return "SegmentChangeStatus{" + + "state=" + state + + ", failureCause='" + failureCause + '\'' + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 636894d214fc..791de9b55da6 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -19,11 +19,8 @@ package org.apache.druid.server.coordination; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -98,7 +95,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler // Keep history of load/drop request status in a LRU cache to maintain idempotency if same request shows up // again and to return status of a completed request. Maximum size of this cache must be significantly greater // than number of pending load/drop requests. so that history is not lost too quickly. 
- private final Cache<DataSegmentChangeRequest, AtomicReference<Status>> requestStatuses; + private final Cache<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>> requestStatuses; private final Object requestStatusesLock = new Object(); // This is the list of unresolved futures returned to callers of processBatch(List<DataSegmentChangeRequest>) @@ -320,7 +317,7 @@ public Map getRowCountDistributionPerDataso @Override public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) { - Status result = null; + SegmentChangeStatus result = null; try { log.info("Loading segment %s", segment.getId()); /* @@ -349,13 +346,13 @@ each time when addSegment() is called, it has to wait for the lock in order to m throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getId()); } - result = Status.SUCCESS; + result = SegmentChangeStatus.SUCCESS; } catch (Throwable e) { log.makeAlert(e, "Failed to load segment for dataSource") .addData("segment", segment) .emit(); - result = Status.failed(e.toString()); + result = SegmentChangeStatus.failed(e.toString()); } finally { updateRequestStatus(new SegmentChangeRequestLoad(segment), result); @@ -466,7 +463,7 @@ void removeSegment( final boolean scheduleDrop ) { - Status result = null; + SegmentChangeStatus result = null; try { announcer.unannounceSegment(segment); segmentsToDelete.add(segment); @@ -506,13 +503,13 @@ void removeSegment( runnable.run(); } - result = Status.SUCCESS; + result = SegmentChangeStatus.SUCCESS; } catch (Exception e) { log.makeAlert(e, "Failed to remove segment") .addData("segment", segment) .emit(); - result = Status.failed(e.getMessage()); + result = SegmentChangeStatus.failed(e.getMessage()); } finally { updateRequestStatus(new SegmentChangeRequestDrop(segment), result); @@ -522,20 +519,20 @@ void removeSegment( } } - public Collection<DataSegment> getPendingDeleteSnapshot() + public Collection<DataSegment> getSegmentsToDelete() { return ImmutableList.copyOf(segmentsToDelete); } - public ListenableFuture<List<DataSegmentChangeRequestAndStatus>> processBatch(List<DataSegmentChangeRequest> changeRequests) + public ListenableFuture<List<DataSegmentChangeResponse>> processBatch(List<DataSegmentChangeRequest> changeRequests) { boolean isAnyRequestDone = false; - Map<DataSegmentChangeRequest, AtomicReference<Status>> statuses = Maps.newHashMapWithExpectedSize(changeRequests.size()); + Map<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>> statuses = Maps.newHashMapWithExpectedSize(changeRequests.size()); for (DataSegmentChangeRequest cr : changeRequests) { - AtomicReference<Status> status = processRequest(cr); - if (status.get().getState() != Status.STATE.PENDING) { + AtomicReference<SegmentChangeStatus> status = processRequest(cr); + if (status.get().getState() != SegmentChangeStatus.State.PENDING) { isAnyRequestDone = true; } statuses.put(cr, status); @@ -554,20 +551,20 @@ public ListenableFuture<List<DataSegmentChangeRequestAndStatus>> processBatch(Li return future; } - private AtomicReference<Status> processRequest(DataSegmentChangeRequest changeRequest) + private AtomicReference<SegmentChangeStatus> processRequest(DataSegmentChangeRequest changeRequest) { synchronized (requestStatusesLock) { - AtomicReference<Status> status = requestStatuses.getIfPresent(changeRequest); + AtomicReference<SegmentChangeStatus> status = requestStatuses.getIfPresent(changeRequest); // If last load/drop request status is failed, here can try that again - if (status == null || status.get().getState() == Status.STATE.FAILED) { + if (status == null || status.get().getState() == SegmentChangeStatus.State.FAILED) { changeRequest.go( new DataSegmentChangeHandler() { @Override public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) { - requestStatuses.put(changeRequest, new AtomicReference<>(Status.PENDING)); + requestStatuses.put(changeRequest, new AtomicReference<>(SegmentChangeStatus.PENDING)); exec.submit( () -> SegmentLoadDropHandler.this.addSegment( ((SegmentChangeRequestLoad)
changeRequest).getSegment(), @@ -579,7 +576,7 @@ public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) @Override public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) { - requestStatuses.put(changeRequest, new AtomicReference<>(Status.PENDING)); + requestStatuses.put(changeRequest, new AtomicReference<>(SegmentChangeStatus.PENDING)); SegmentLoadDropHandler.this.removeSegment( ((SegmentChangeRequestDrop) changeRequest).getSegment(), () -> resolveWaitingFutures(), @@ -589,7 +586,7 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallbac }, this::resolveWaitingFutures ); - } else if (status.get().getState() == Status.STATE.SUCCESS) { + } else if (status.get().getState() == SegmentChangeStatus.State.SUCCESS) { // SUCCESS case, we'll clear up the cached success while serving it to this client // Not doing this can lead to an incorrect response to upcoming clients for a reload requestStatuses.invalidate(changeRequest); @@ -599,13 +596,13 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallbac } } - private void updateRequestStatus(DataSegmentChangeRequest changeRequest, Status result) + private void updateRequestStatus(DataSegmentChangeRequest changeRequest, SegmentChangeStatus result) { if (result == null) { - result = Status.failed("Unknown reason. Check server logs."); + result = SegmentChangeStatus.failed("Unknown reason. Check server logs."); } synchronized (requestStatusesLock) { - AtomicReference<Status> statusRef = requestStatuses.getIfPresent(changeRequest); + AtomicReference<SegmentChangeStatus> statusRef = requestStatuses.getIfPresent(changeRequest); if (statusRef != null) { statusRef.set(result); } @@ -773,14 +770,14 @@ public void close() } // Future with cancel() implementation to remove it from "waitingFutures" list - private class CustomSettableFuture extends AbstractFuture<List<DataSegmentChangeRequestAndStatus>> + private class CustomSettableFuture extends AbstractFuture<List<DataSegmentChangeResponse>> { private final LinkedHashSet<CustomSettableFuture> waitingFutures; - private final Map<DataSegmentChangeRequest, AtomicReference<Status>> statusRefs; + private final Map<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>> statusRefs; private CustomSettableFuture( LinkedHashSet<CustomSettableFuture> waitingFutures, - Map<DataSegmentChangeRequest, AtomicReference<Status>> statusRefs + Map<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>> statusRefs ) { this.waitingFutures = waitingFutures; @@ -794,14 +791,14 @@ public void resolve() return; } - final List<DataSegmentChangeRequestAndStatus> result = new ArrayList<>(statusRefs.size()); + final List<DataSegmentChangeResponse> result = new ArrayList<>(statusRefs.size()); statusRefs.forEach((request, statusRef) -> { // Remove complete statuses from the cache - final Status status = statusRef.get(); - if (status != null && status.getState() != Status.STATE.PENDING) { + final SegmentChangeStatus status = statusRef.get(); + if (status != null && status.getState() != SegmentChangeStatus.State.PENDING) { requestStatuses.invalidate(request); } - result.add(new DataSegmentChangeRequestAndStatus(request, status)); + result.add(new DataSegmentChangeResponse(request, status)); }); set(result); @@ -818,94 +815,5 @@ public boolean cancel(boolean interruptIfRunning) } } - public static class Status - { - public enum STATE - { - SUCCESS, FAILED, PENDING - } - - private final STATE state; - @Nullable - private final String failureCause; - - public static final Status SUCCESS = new Status(STATE.SUCCESS, null); - public static final Status PENDING = new Status(STATE.PENDING, null); - - @JsonCreator - Status( - @JsonProperty("state") STATE state, - @JsonProperty("failureCause") @Nullable String failureCause - ) - { - Preconditions.checkNotNull(state, "state must be non-null"); - this.state = state; - this.failureCause = failureCause; - } - -
public static Status failed(String cause) - { - return new Status(STATE.FAILED, cause); - } - - @JsonProperty - public STATE getState() - { - return state; - } - - @Nullable - @JsonProperty - public String getFailureCause() - { - return failureCause; - } - - @Override - public String toString() - { - return "Status{" + - "state=" + state + - ", failureCause='" + failureCause + '\'' + - '}'; - } - } - - public static class DataSegmentChangeRequestAndStatus - { - private final DataSegmentChangeRequest request; - private final Status status; - - @JsonCreator - public DataSegmentChangeRequestAndStatus( - @JsonProperty("request") DataSegmentChangeRequest request, - @JsonProperty("status") Status status - ) - { - this.request = request; - this.status = status; - } - - @JsonProperty - public DataSegmentChangeRequest getRequest() - { - return request; - } - - @JsonProperty - public Status getStatus() - { - return status; - } - - @Override - public String toString() - { - return "DataSegmentChangeRequestAndStatus{" + - "request=" + request + - ", status=" + status + - '}'; - } - } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java index a508251ab58e..52b012a81d46 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java @@ -47,6 +47,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** * Use {@link HttpLoadQueuePeon} instead. @@ -75,7 +76,7 @@ public class CuratorLoadQueuePeon implements LoadQueuePeon private final Duration loadTimeout; private final AtomicLong queuedSize = new AtomicLong(0); - private final CoordinatorRunStats stats = new CoordinatorRunStats(); + private final AtomicReference<CoordinatorRunStats> stats = new AtomicReference<>(new CoordinatorRunStats()); /** * Needs to be thread safe since it can be concurrently accessed via @@ -172,7 +173,7 @@ public long getSizeOfSegmentsToLoad() @Override public CoordinatorRunStats getAndResetStats() { - return stats.getSnapshotAndReset(); + return stats.getAndSet(new CoordinatorRunStats()); } @Override @@ -360,7 +361,7 @@ public void stop() timedOutSegments.clear(); queuedSize.set(0L); - stats.clear(); + stats.get().clear(); } private void onZkNodeDeleted(SegmentHolder segmentHolder, String path) @@ -388,7 +389,7 @@ private void failAssign(SegmentHolder segmentHolder, boolean handleTimeout, Exce if (e != null) { log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, segmentHolder); } - stats.add(Stats.SegmentQueue.FAILED_ACTIONS, 1); + stats.get().add(Stats.SegmentQueue.FAILED_ACTIONS, 1); if (handleTimeout) { // Avoid removing the segment entry from the load/drop list in case config.getLoadTimeoutDelay() expires.
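The change above (applied again in HttpLoadQueuePeon below) replaces the removed CoordinatorRunStats.getSnapshotAndReset() with a wholesale swap of the stats object: getAndResetStats() becomes a single atomic pointer exchange rather than a key-by-key copy-and-remove. A minimal sketch of that pattern, where RunStats is a hypothetical stand-in for the real CoordinatorRunStats:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class StatsSwapSketch
{
  // Hypothetical stand-in for CoordinatorRunStats: a concurrent map of named counters.
  static class RunStats
  {
    private final Map<String, Long> counters = new ConcurrentHashMap<>();

    void add(String key, long value)
    {
      counters.merge(key, value, Long::sum);
    }

    Map<String, Long> snapshot()
    {
      return Map.copyOf(counters);
    }
  }

  private final AtomicReference<RunStats> stats = new AtomicReference<>(new RunStats());

  // Equivalent of the peon's getAndResetStats(): the swap is one atomic operation.
  // Writers still holding the old instance may record a few late updates into the
  // returned snapshot, which is the same weak guarantee getSnapshotAndReset() gave.
  public RunStats getAndResetStats()
  {
    return stats.getAndSet(new RunStats());
  }

  public void recordFailedAction()
  {
    stats.get().add("segment/failedActions", 1);
  }

  public static void main(String[] args)
  {
    StatsSwapSketch peon = new StatsSwapSketch();
    peon.recordFailedAction();
    peon.recordFailedAction();
    System.out.println(peon.getAndResetStats().snapshot()); // {segment/failedActions=2}
    System.out.println(peon.getAndResetStats().snapshot()); // {}
  }
}
```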
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java index 4ec6f2427a33..cb32f95516b4 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java @@ -35,7 +35,8 @@ import org.apache.druid.server.coordination.DataSegmentChangeCallback; import org.apache.druid.server.coordination.DataSegmentChangeHandler; import org.apache.druid.server.coordination.DataSegmentChangeRequest; -import org.apache.druid.server.coordination.SegmentLoadDropHandler; +import org.apache.druid.server.coordination.DataSegmentChangeResponse; +import org.apache.druid.server.coordination.SegmentChangeStatus; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; @@ -66,6 +67,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** * @@ -77,15 +79,15 @@ public class HttpLoadQueuePeon implements LoadQueuePeon { }; - public static final TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>> RESPONSE_ENTITY_TYPE_REF = new TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>() + public static final TypeReference<List<DataSegmentChangeResponse>> RESPONSE_ENTITY_TYPE_REF = new TypeReference<List<DataSegmentChangeResponse>>() { }; private static final EmittingLogger log = new EmittingLogger(HttpLoadQueuePeon.class); private final AtomicLong queuedSize = new AtomicLong(0); - private final CoordinatorRunStats stats = new CoordinatorRunStats(); + private final AtomicReference<CoordinatorRunStats> stats = new AtomicReference<>(new CoordinatorRunStats()); private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentHashMap<>(); private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentHashMap<>(); @@ -191,10 +193,7 @@ private void doSegmentManagement() if (newRequests.size() == 0) { log.trace( "[%s]Found no load/drop requests.
SegmentsToLoad[%d], SegmentsToDrop[%d], batchSize[%d].", - serverId, - segmentsToLoad.size(), - segmentsToDrop.size(), - config.getBatchSize() + serverId, segmentsToLoad.size(), segmentsToDrop.size(), config.getBatchSize() ); mainLoopInProgress.set(false); return; @@ -225,7 +224,7 @@ public void onSuccess(InputStream result) log.trace("Received NO CONTENT reseponse from [%s]", serverId); } else if (HttpServletResponse.SC_OK == responseHandler.getStatus()) { try { - List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = + List<DataSegmentChangeResponse> statuses = jsonMapper.readValue(result, RESPONSE_ENTITY_TYPE_REF); log.trace("Server[%s] returned status response [%s].", serverId, statuses); synchronized (lock) { @@ -235,7 +234,7 @@ public void onSuccess(InputStream result) return; } - for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : statuses) { + for (DataSegmentChangeResponse e : statuses) { switch (e.getStatus().getState()) { case SUCCESS: case FAILED: @@ -300,7 +299,7 @@ private void logRequestFailure(Throwable t) } } - private void handleResponseStatus(DataSegmentChangeRequest changeRequest, SegmentLoadDropHandler.Status status) + private void handleResponseStatus(DataSegmentChangeRequest changeRequest, SegmentChangeStatus status) { changeRequest.go( new DataSegmentChangeHandler() { @@ -317,7 +316,7 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallbac updateSuccessOrFailureInHolder(segmentsToDrop.remove(segment), status); } - private void updateSuccessOrFailureInHolder(SegmentHolder holder, SegmentLoadDropHandler.Status status) + private void updateSuccessOrFailureInHolder(SegmentHolder holder, SegmentChangeStatus status) { if (holder == null) { return; @@ -325,7 +324,7 @@ private void updateSuccessOrFailureInHolder(SegmentHolder holder, SegmentLoadDro queuedSegments.remove(holder); activeRequestSegments.remove(holder.getSegment()); - if (status.getState() == SegmentLoadDropHandler.Status.STATE.FAILED) { + if (status.getState() == SegmentChangeStatus.State.FAILED) { onRequestFailed(holder, status.getFailureCause()); } else { onRequestCompleted(holder, RequestStatus.SUCCESS); @@ -380,7 +379,7 @@ public void stop() queuedSegments.clear(); activeRequestSegments.clear(); queuedSize.set(0L); - stats.clear(); + stats.get().clear(); } } @@ -485,7 +484,7 @@ public long getSizeOfSegmentsToLoad() @Override public CoordinatorRunStats getAndResetStats() { - return stats.getSnapshotAndReset(); + return stats.getAndSet(new CoordinatorRunStats()); } @Override @@ -547,7 +546,7 @@ private void incrementStat(SegmentHolder holder, RequestStatus status) { RowKey rowKey = RowKey.with(Dimension.DATASOURCE, holder.getSegment().getDataSource()) .and(Dimension.DESCRIPTION, holder.getAction().name()); - stats.add(status.datasourceStat, rowKey, 1); + stats.get().add(status.datasourceStat, rowKey, 1); } private void executeCallbacks(SegmentHolder holder, boolean success) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java index 3ab6d30faf56..49e5f9a7c087 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java @@ -25,8 +25,7 @@ import java.util.Set; /** - * This interface exists only to support configurable load queue management via curator or http. Once HttpLoadQueuePeon - * has been verified enough in production, CuratorLoadQueuePeon and this interface would be removed.
+ * Supports load queue management. */ @Deprecated public interface LoadQueuePeon diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java index cbee9744524b..5412230c2cc0 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java @@ -26,10 +26,8 @@ import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -223,31 +221,6 @@ public void updateMax(CoordinatorStat stat, RowKey rowKey, long value) .mergeLong(stat, value, Math::max); } - /** - * Creates a new {@code CoordinatorRunStats} which represents the snapshot of - * the stats collected so far in this instance. - *
- * While this method is in progress, any updates made to the stats of this - * instance by another thread are not guaranteed to be present in the snapshot. - * But the snapshots are consistent, i.e. stats present in the snapshot created - * in one invocation of this method are permanently removed from this instance - * and will not be present in subsequent snapshots. - * - * @return Snapshot of the current state of this {@code CoordinatorRunStats}. - */ - public CoordinatorRunStats getSnapshotAndReset() - { - final CoordinatorRunStats snapshot = new CoordinatorRunStats(debugDimensions); - - // Get a snapshot of all the keys, remove and copy each of them atomically - final Set<RowKey> keys = new HashSet<>(allStats.keySet()); - for (RowKey key : keys) { - snapshot.allStats.put(key, allStats.remove(key)); - } - - return snapshot; - } - /** * Checks if the given rowKey has any of the debug dimensions. */ diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java index 1b6f8326773c..1486355a7768 100644 --- a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java +++ b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java @@ -35,6 +35,7 @@ import org.apache.druid.server.coordination.ChangeRequestHistory; import org.apache.druid.server.coordination.ChangeRequestsSnapshot; import org.apache.druid.server.coordination.DataSegmentChangeRequest; +import org.apache.druid.server.coordination.DataSegmentChangeResponse; import org.apache.druid.server.coordination.SegmentLoadDropHandler; import org.apache.druid.server.coordinator.loading.HttpLoadQueuePeon; import org.apache.druid.server.http.security.StateResourceFilter; @@ -250,7 +251,7 @@ public void applyDataSegmentChangeRequests( } final ResponseContext context = createContext(req.getHeader("Accept")); - final ListenableFuture<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>> future = + final ListenableFuture<List<DataSegmentChangeResponse>> future = loadDropRequestHandler.processBatch(changeRequestList); final AsyncContext asyncContext = req.startAsync(); @@ -286,10 +287,10 @@ public void onStartAsync(AsyncEvent event) Futures.addCallback( future, - new FutureCallback<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>() + new FutureCallback<List<DataSegmentChangeResponse>>() { @Override - public void onSuccess(List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> result) + public void onSuccess(List<DataSegmentChangeResponse> result) { try { HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); diff --git a/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java index ce21a2cef910..e7d6a74275b1 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java @@ -58,7 +58,7 @@ public boolean doMonitor(ServiceEmitter emitter) final Object2LongOpenHashMap<String> pendingDeleteSizes = new Object2LongOpenHashMap<>(); - for (DataSegment segment : segmentLoadDropMgr.getPendingDeleteSnapshot()) { + for (DataSegment segment : segmentLoadDropMgr.getSegmentsToDelete()) { pendingDeleteSizes.addTo(segment.getDataSource(), segment.getSize()); } diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index 818c6b77a532..d7464e25a6ce 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -116,7 +116,6 @@ import org.apache.druid.query.topn.TopNResultValue; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.QueryScheduler; -import org.apache.druid.server.ServerTestHelper; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; @@ -1753,7 +1752,7 @@ private ServerSelector makeMockHashBasedSelector( partitions, partitionDimensions, partitionFunction, - ServerTestHelper.MAPPER + TestHelper.makeJsonMapper() ), null, 9, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CustomVersioningPolicyTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CustomVersioningPolicyTest.java index 2def644cf5ac..f145b0e1d79c 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CustomVersioningPolicyTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CustomVersioningPolicyTest.java @@ -19,7 +19,8 @@ package org.apache.druid.segment.realtime.plumber; -import org.apache.druid.server.ServerTestHelper; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.segment.TestHelper; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.Interval; @@ -37,8 +38,9 @@ public void testSerialization() throws Exception CustomVersioningPolicy policy = new CustomVersioningPolicy(version); - CustomVersioningPolicy serialized = ServerTestHelper.MAPPER.readValue( - ServerTestHelper.MAPPER.writeValueAsBytes(policy), + final ObjectMapper mapper = TestHelper.makeJsonMapper(); + CustomVersioningPolicy serialized = mapper.readValue( + mapper.writeValueAsBytes(policy), CustomVersioningPolicy.class ); diff --git a/server/src/test/java/org/apache/druid/server/ServerTestHelper.java b/server/src/test/java/org/apache/druid/server/ServerTestHelper.java deleted file mode 100644 index 784b79a76657..000000000000 --- a/server/src/test/java/org/apache/druid/server/ServerTestHelper.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.server; - -import com.fasterxml.jackson.databind.InjectableValues; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; - -public class ServerTestHelper -{ - public static final ObjectMapper MAPPER = new DefaultObjectMapper(); - - static { - MAPPER.setInjectableValues( - new InjectableValues.Std() - .addValue(ObjectMapper.class.getName(), MAPPER) - .addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT) - ); - } -} diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 7000dd544801..5e3fac5c44ae 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -39,8 +39,7 @@ import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.appenderator.SegmentSchemas; import org.apache.druid.server.SegmentManager; -import org.apache.druid.server.coordination.SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus; -import org.apache.druid.server.coordination.SegmentLoadDropHandler.Status.STATE; +import org.apache.druid.server.coordination.SegmentChangeStatus.State; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -503,14 +502,14 @@ public void testProcessBatch() throws Exception new SegmentChangeRequestDrop(segment2) ); - ListenableFuture> future = segmentLoadDropHandler + ListenableFuture> future = segmentLoadDropHandler .processBatch(batch); - Map expectedStatusMap = new HashMap<>(); - expectedStatusMap.put(batch.get(0), SegmentLoadDropHandler.Status.PENDING); - expectedStatusMap.put(batch.get(1), SegmentLoadDropHandler.Status.SUCCESS); - List result = future.get(); - for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus requestAndStatus : result) { + Map expectedStatusMap = new HashMap<>(); + expectedStatusMap.put(batch.get(0), SegmentChangeStatus.PENDING); + expectedStatusMap.put(batch.get(1), SegmentChangeStatus.SUCCESS); + List result = future.get(); + for (DataSegmentChangeResponse requestAndStatus : result) { Assert.assertEquals(expectedStatusMap.get(requestAndStatus.getRequest()), requestAndStatus.getStatus()); } @@ -519,7 +518,7 @@ public void testProcessBatch() throws Exception } result = segmentLoadDropHandler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1))).get(); - Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(0).getStatus()); + Assert.assertEquals(SegmentChangeStatus.SUCCESS, result.get(0).getStatus()); segmentLoadDropHandler.stop(); } @@ -549,21 +548,21 @@ public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequ List batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); - ListenableFuture> future = segmentLoadDropHandler + ListenableFuture> future = segmentLoadDropHandler .processBatch(batch); for (Runnable runnable : scheduledRunnable) { runnable.run(); } - List result = future.get(); - Assert.assertEquals(STATE.FAILED, result.get(0).getStatus().getState()); + List result = future.get(); + Assert.assertEquals(State.FAILED, result.get(0).getStatus().getState()); future = 
segmentLoadDropHandler.processBatch(batch); for (Runnable runnable : scheduledRunnable) { runnable.run(); } result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState()); segmentLoadDropHandler.stop(); } @@ -597,13 +596,13 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); // Request 1: Load the segment - ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler + ListenableFuture<List<DataSegmentChangeResponse>> future = segmentLoadDropHandler .processBatch(batch); for (Runnable runnable : scheduledRunnable) { runnable.run(); } - List<DataSegmentChangeRequestAndStatus> result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + List<DataSegmentChangeResponse> result = future.get(); + Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState()); scheduledRunnable.clear(); // Request 2: Drop the segment @@ -613,7 +612,7 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio runnable.run(); } result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState()); scheduledRunnable.clear(); // check invocations after a load-drop sequence @@ -633,7 +632,7 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio runnable.run(); } result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState()); scheduledRunnable.clear(); // check invocations - 1 more load has happened @@ -653,7 +652,7 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio runnable.run(); } result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState()); scheduledRunnable.clear(); // check invocations - the load segment counter should bump up diff --git a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java index 8356a26d33ac..fd0547517b84 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java @@ -28,11 +28,11 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; -import org.apache.druid.server.ServerTestHelper; import org.apache.druid.server.initialization.ZkPathsConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; @@ -59,7 +59,7 @@ public class ZkCoordinatorTest extends CuratorTestBase { private static final Logger log = new Logger(ZkCoordinatorTest.class); - private final ObjectMapper jsonMapper = ServerTestHelper.MAPPER; + private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); private final DruidServerMetadata me = new DruidServerMetadata( "dummyServer", "dummyHost", @@ -135,7 +135,7 
@@ public void testLoadDrop() throws Exception CountDownLatch dropLatch = new CountDownLatch(1); SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler( - ServerTestHelper.MAPPER, + jsonMapper, new SegmentLoaderConfig() { @Override public File getInfoDir() diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java index 6b4a16f14027..6b9dc8443960 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java @@ -74,37 +74,6 @@ public void testAddForRowKey() Assert.assertEquals(1, stats.get(Stat.INFO_1, Key.TIER_1)); } - @Test - public void testGetSnapshotAndReset() - { - stats.add(Stat.ERROR_1, 1); - stats.add(Stat.INFO_1, 3); - stats.add(Stat.ERROR_1, Key.TIER_1, 5); - stats.add(Stat.ERROR_1, Key.DUTY_1, 7); - - final CoordinatorRunStats firstFlush = stats.getSnapshotAndReset(); - Assert.assertEquals(1, firstFlush.get(Stat.ERROR_1)); - Assert.assertEquals(3, firstFlush.get(Stat.INFO_1)); - Assert.assertEquals(5, firstFlush.get(Stat.ERROR_1, Key.TIER_1)); - Assert.assertEquals(7, firstFlush.get(Stat.ERROR_1, Key.DUTY_1)); - - Assert.assertEquals(0, stats.rowCount()); - - stats.add(Stat.ERROR_1, 7); - stats.add(Stat.ERROR_1, Key.TIER_1, 5); - stats.add(Stat.INFO_1, Key.DUTY_1, 3); - stats.add(Stat.INFO_2, Key.TIER_1, 1); - - final CoordinatorRunStats secondFlush = stats.getSnapshotAndReset(); - - Assert.assertEquals(7, secondFlush.get(Stat.ERROR_1)); - Assert.assertEquals(5, secondFlush.get(Stat.ERROR_1, Key.TIER_1)); - Assert.assertEquals(3, secondFlush.get(Stat.INFO_1, Key.DUTY_1)); - Assert.assertEquals(1, secondFlush.get(Stat.INFO_2, Key.TIER_1)); - - Assert.assertEquals(0, stats.rowCount()); - } - @Test public void testUpdateMax() { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java similarity index 99% rename from server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java rename to server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java index 3e1c816716c9..e22b037707ae 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java @@ -53,13 +53,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -public class LoadQueuePeonTest extends CuratorTestBase +public class CuratorLoadQueuePeonTest extends CuratorTestBase { private static final String LOAD_QUEUE_PATH = "/druid/loadqueue/localhost:1234"; private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - private LoadQueuePeon loadQueuePeon; + private CuratorLoadQueuePeon loadQueuePeon; private PathChildrenCache loadQueueCache; @Before diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java index de1801d7ad7a..16251130e77e 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java @@ -20,6 
+20,7 @@ package org.apache.druid.server.coordinator.loading; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.java.util.common.RE; @@ -27,11 +28,12 @@ import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.HttpResponseHandler; -import org.apache.druid.server.ServerTestHelper; +import org.apache.druid.segment.TestHelper; import org.apache.druid.server.coordination.DataSegmentChangeCallback; import org.apache.druid.server.coordination.DataSegmentChangeHandler; import org.apache.druid.server.coordination.DataSegmentChangeRequest; -import org.apache.druid.server.coordination.SegmentLoadDropHandler; +import org.apache.druid.server.coordination.DataSegmentChangeResponse; +import org.apache.druid.server.coordination.SegmentChangeStatus; import org.apache.druid.server.coordinator.CreateDataSegments; import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig; import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; @@ -63,6 +65,7 @@ */ public class HttpLoadQueuePeonTest { + private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper(); private final List<DataSegment> segments = CreateDataSegments.ofDatasource("test") .forIntervals(1, Granularities.DAY) @@ -87,7 +90,7 @@ public void setUp() httpLoadQueuePeon = new HttpLoadQueuePeon( "http://dummy:4000", - ServerTestHelper.MAPPER, + MAPPER, httpClient, new HttpLoadQueuePeonConfig(null, null, 10), new WrappingScheduledExecutorService("HttpLoadQueuePeonTest-%s", processingExecutor, true), @@ -313,23 +316,22 @@ public <Intermediate, Final> ListenableFuture<Final> go( httpResponse.setContent(ChannelBuffers.buffer(0)); httpResponseHandler.handleResponse(httpResponse, null); try { - List<DataSegmentChangeRequest> changeRequests = ServerTestHelper.MAPPER.readValue( + List<DataSegmentChangeRequest> changeRequests = MAPPER.readValue( request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>() { } ); - List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = new ArrayList<>(changeRequests.size()); + List<DataSegmentChangeResponse> statuses = new ArrayList<>(changeRequests.size()); for (DataSegmentChangeRequest cr : changeRequests) { cr.go(this, null); - statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus( - cr, - SegmentLoadDropHandler.Status.SUCCESS - )); + statuses.add( + new DataSegmentChangeResponse(cr, SegmentChangeStatus.SUCCESS) + ); } - return (ListenableFuture) Futures.immediateFuture( + return (ListenableFuture) Futures.immediateFuture( new ByteArrayInputStream( - ServerTestHelper.MAPPER + MAPPER .writerWithType(HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF) .writeValueAsBytes(statuses) ) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java index 0b91e7009026..f7007bfd833d 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java @@ -30,7 +30,8 @@ import org.apache.druid.server.coordination.DataSegmentChangeCallback; import org.apache.druid.server.coordination.DataSegmentChangeHandler; import org.apache.druid.server.coordination.DataSegmentChangeRequest; -import org.apache.druid.server.coordination.SegmentLoadDropHandler; +import 
org.apache.druid.server.coordination.DataSegmentChangeResponse; +import org.apache.druid.server.coordination.SegmentChangeStatus; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -126,7 +127,7 @@ private Final processRequest( /** * Processes all the changes in the request. */ - private List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> processRequest( + private List<DataSegmentChangeResponse> processRequest( Request request, DataSegmentChangeHandler changeHandler ) throws IOException @@ -147,21 +148,20 @@ private List processRe /** * Processes each DataSegmentChangeRequest using the handler. */ - private SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus processRequest( + private DataSegmentChangeResponse processRequest( DataSegmentChangeRequest request, DataSegmentChangeHandler handler ) { - SegmentLoadDropHandler.Status status; + SegmentChangeStatus status; try { request.go(handler, NOOP_CALLBACK); - status = SegmentLoadDropHandler.Status.SUCCESS; + status = SegmentChangeStatus.SUCCESS; } catch (Exception e) { - status = SegmentLoadDropHandler.Status.failed(e.getMessage()); + status = SegmentChangeStatus.failed(e.getMessage()); } - return new SegmentLoadDropHandler - .DataSegmentChangeRequestAndStatus(request, status); + return new DataSegmentChangeResponse(request, status); } } diff --git a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java index 1806903a9baf..92c2a1064e77 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java @@ -74,7 +74,7 @@ public void testSimple() final String tier = "tier"; EasyMock.expect(druidServerConfig.getMaxSize()).andReturn(maxSize).once(); - EasyMock.expect(segmentLoadDropMgr.getPendingDeleteSnapshot()).andReturn(ImmutableList.of(dataSegment)).once(); + EasyMock.expect(segmentLoadDropMgr.getSegmentsToDelete()).andReturn(ImmutableList.of(dataSegment)).once(); EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once(); EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once(); EasyMock.expect(segmentManager.getDataSourceSizes()).andReturn(ImmutableMap.of(dataSource, size));
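For context on the wire format these changes standardize on: DataSegmentChangeResponse and SegmentChangeStatus (added in full above) are plain Jackson beans, serialized under the @JsonProperty names request, status, state, and failureCause. A small illustrative round trip, assuming a Druid test classpath; the mapper choice mirrors the updated tests:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.SegmentChangeStatus;

public class SegmentChangeStatusWireSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = TestHelper.makeJsonMapper();

    // Serializes via the @JsonProperty getters,
    // e.g. {"state":"FAILED","failureCause":"disk full"}.
    final String json = mapper.writeValueAsString(SegmentChangeStatus.failed("disk full"));
    System.out.println(json);

    // Jackson can invoke non-public @JsonCreator constructors, so the
    // private constructor on SegmentChangeStatus still deserializes.
    final SegmentChangeStatus status = mapper.readValue(json, SegmentChangeStatus.class);
    System.out.println(status.getState()); // FAILED
  }
}
```

Since SegmentChangeStatus does not override equals(), the sketch compares getState() rather than the objects themselves; this is also why the updated tests assert against the shared SUCCESS and PENDING constants or the State enum.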