diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/LocalOverlordClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/LocalOverlordClient.java index eeaf138c1065..dcce2f4615ae 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/LocalOverlordClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/LocalOverlordClient.java @@ -45,7 +45,8 @@ import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.rpc.ServiceRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; -import org.apache.druid.server.coordinator.AutoCompactionSnapshot; +import org.apache.druid.server.compaction.CompactionProgressResponse; +import org.apache.druid.server.compaction.CompactionStatusResponse; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -238,13 +239,13 @@ public ListenableFuture> getWorkers() } @Override - public ListenableFuture> getCompactionSnapshots(@Nullable String dataSource) + public ListenableFuture getCompactionSnapshots(@Nullable String dataSource) { throw new UnsupportedOperationException(); } @Override - public ListenableFuture getBytesAwaitingCompaction(String dataSource) + public ListenableFuture getBytesAwaitingCompaction(String dataSource) { throw new UnsupportedOperationException(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java index 9f21f2ec4c19..425dfc9f35f6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java @@ -19,13 +19,18 @@ package org.apache.druid.indexing.overlord.http; -import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.error.NotFound; import org.apache.druid.indexing.compact.CompactionScheduler; +import org.apache.druid.server.compaction.CompactionProgressResponse; +import org.apache.druid.server.compaction.CompactionStatusResponse; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.CompactionSupervisorConfig; +import org.apache.druid.server.http.ServletResourceUtils; import org.apache.druid.server.http.security.StateResourceFilter; import javax.ws.rs.Consumes; @@ -40,8 +45,8 @@ import java.util.Collections; /** - * Contains the same logic as {@code CompactionResource} but the APIs are served - * by {@link CompactionScheduler} instead of {@code DruidCoordinator}. + * Contains the same logic as {@code CoordinatorCompactionResource} but the APIs + * are served by {@link CompactionScheduler} instead of {@code DruidCoordinator}. 
*/ @Path("/druid/indexer/v1/compaction") public class OverlordCompactionResource @@ -81,18 +86,14 @@ public Response getCompactionProgress( } if (dataSource == null || dataSource.isEmpty()) { - return Response.status(Response.Status.BAD_REQUEST) - .entity(Collections.singletonMap("error", "No DataSource specified")) - .build(); + return ServletResourceUtils.buildErrorResponseFrom(InvalidInput.exception("No DataSource specified")); } final AutoCompactionSnapshot snapshot = scheduler.getCompactionSnapshot(dataSource); if (snapshot == null) { - return Response.status(Response.Status.NOT_FOUND) - .entity(Collections.singletonMap("error", "Unknown DataSource")) - .build(); + return ServletResourceUtils.buildErrorResponseFrom(NotFound.exception("Unknown DataSource")); } else { - return Response.ok(Collections.singletonMap("remainingSegmentSize", snapshot.getBytesAwaitingCompaction())) + return Response.ok(new CompactionProgressResponse(snapshot.getBytesAwaitingCompaction())) .build(); } } @@ -115,13 +116,11 @@ public Response getCompactionSnapshots( } else { AutoCompactionSnapshot autoCompactionSnapshot = scheduler.getCompactionSnapshot(dataSource); if (autoCompactionSnapshot == null) { - return Response.status(Response.Status.NOT_FOUND) - .entity(Collections.singletonMap("error", "Unknown DataSource")) - .build(); + return ServletResourceUtils.buildErrorResponseFrom(NotFound.exception("Unknown DataSource")); } snapshots = Collections.singleton(autoCompactionSnapshot); } - return Response.ok(Collections.singletonMap("latestStatus", snapshots)).build(); + return Response.ok(new CompactionStatusResponse(snapshots)).build(); } @POST @@ -139,12 +138,12 @@ public Response simulateRunWithConfigUpdate( private Response buildErrorResponseIfSchedulerDisabled() { - return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity( - ImmutableMap.of( - "error", - "Compaction Supervisors are disabled on the Overlord." - + " Use Coordinator APIs to fetch compaction status." - ) - ).build(); + final String msg = "Compaction Supervisors are disabled on the Overlord." + + " Use Coordinator APIs to fetch compaction status."; + return ServletResourceUtils.buildErrorResponseFrom( + DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.UNSUPPORTED) + .build(msg) + ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java new file mode 100644 index 000000000000..b93e6e7c1ac2 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.http; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidExceptionMatcher; +import org.apache.druid.error.ErrorResponse; +import org.apache.druid.indexing.compact.CompactionScheduler; +import org.apache.druid.segment.TestDataSource; +import org.apache.druid.server.compaction.CompactionProgressResponse; +import org.apache.druid.server.compaction.CompactionStatistics; +import org.apache.druid.server.compaction.CompactionStatusResponse; +import org.apache.druid.server.coordinator.AutoCompactionSnapshot; +import org.apache.druid.server.coordinator.CompactionSupervisorConfig; +import org.easymock.EasyMock; +import org.hamcrest.MatcherAssert; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.ws.rs.core.Response; +import java.util.Collections; +import java.util.Map; + +public class OverlordCompactionResourceTest +{ + private static final CompactionSupervisorConfig SUPERVISOR_ENABLED + = new CompactionSupervisorConfig(true); + private static final CompactionSupervisorConfig SUPERVISOR_DISABLED + = new CompactionSupervisorConfig(false); + + private CompactionScheduler scheduler; + + @Before + public void setUp() + { + scheduler = EasyMock.createStrictMock(CompactionScheduler.class); + } + + @After + public void tearDown() + { + EasyMock.verify(scheduler); + } + + @Test + public void testGetCompactionSnapshotWithEmptyDatasource() + { + final Map allSnapshots = ImmutableMap.of( + TestDataSource.WIKI, + AutoCompactionSnapshot.builder(TestDataSource.WIKI).build() + ); + + EasyMock.expect(scheduler.getAllCompactionSnapshots()) + .andReturn(allSnapshots).once(); + EasyMock.replay(scheduler); + + final Response response = new OverlordCompactionResource(SUPERVISOR_ENABLED, scheduler) + .getCompactionSnapshots(""); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals( + new CompactionStatusResponse(allSnapshots.values()), + response.getEntity() + ); + } + + @Test + public void testGetCompactionSnapshotWithNullDatasource() + { + final Map allSnapshots = ImmutableMap.of( + TestDataSource.WIKI, + AutoCompactionSnapshot.builder(TestDataSource.WIKI).build() + ); + + EasyMock.expect(scheduler.getAllCompactionSnapshots()) + .andReturn(allSnapshots).once(); + EasyMock.replay(scheduler); + + final Response response = new OverlordCompactionResource(SUPERVISOR_ENABLED, scheduler) + .getCompactionSnapshots(null); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals( + new CompactionStatusResponse(allSnapshots.values()), + response.getEntity() + ); + } + + @Test + public void testGetCompactionSnapshotWithValidDatasource() + { + final AutoCompactionSnapshot snapshot = AutoCompactionSnapshot.builder(TestDataSource.WIKI).build(); + + EasyMock.expect(scheduler.getCompactionSnapshot(TestDataSource.WIKI)) + .andReturn(snapshot).once(); + EasyMock.replay(scheduler); + + final Response response = new OverlordCompactionResource(SUPERVISOR_ENABLED, scheduler) + .getCompactionSnapshots(TestDataSource.WIKI); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals( + new CompactionStatusResponse(Collections.singleton(snapshot)), + response.getEntity() + ); + } + + @Test + public void testGetCompactionSnapshotWithInvalidDatasource() + { + 
EasyMock.expect(scheduler.getCompactionSnapshot(TestDataSource.KOALA)) + .andReturn(null).once(); + EasyMock.replay(scheduler); + + final Response response = new OverlordCompactionResource(SUPERVISOR_ENABLED, scheduler) + .getCompactionSnapshots(TestDataSource.KOALA); + Assert.assertEquals(404, response.getStatus()); + } + + @Test + public void testGetProgressForValidDatasource() + { + final AutoCompactionSnapshot.Builder snapshotBuilder + = AutoCompactionSnapshot.builder(TestDataSource.WIKI); + snapshotBuilder.incrementWaitingStats(CompactionStatistics.create(100L, 10L, 1L)); + final AutoCompactionSnapshot snapshot = snapshotBuilder.build(); + + EasyMock.expect(scheduler.getCompactionSnapshot(TestDataSource.WIKI)) + .andReturn(snapshot).once(); + EasyMock.replay(scheduler); + + final Response response = new OverlordCompactionResource(SUPERVISOR_ENABLED, scheduler) + .getCompactionProgress(TestDataSource.WIKI); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(new CompactionProgressResponse(100L), response.getEntity()); + } + + @Test + public void testGetProgressForNullDatasourceReturnsBadRequest() + { + EasyMock.replay(scheduler); + + final Response response = new OverlordCompactionResource(SUPERVISOR_ENABLED, scheduler) + .getCompactionProgress(null); + Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + + final Object responseEntity = response.getEntity(); + Assert.assertTrue(responseEntity instanceof ErrorResponse); + + MatcherAssert.assertThat( + ((ErrorResponse) responseEntity).getUnderlyingException(), + DruidExceptionMatcher.invalidInput().expectMessageIs("No DataSource specified") + ); + } + + @Test + public void testGetProgressForInvalidDatasourceReturnsNotFound() + { + EasyMock.expect(scheduler.getCompactionSnapshot(TestDataSource.KOALA)) + .andReturn(null).once(); + EasyMock.replay(scheduler); + + final Response response = new OverlordCompactionResource(SUPERVISOR_ENABLED, scheduler) + .getCompactionProgress(TestDataSource.KOALA); + Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus()); + + final Object responseEntity = response.getEntity(); + Assert.assertTrue(responseEntity instanceof ErrorResponse); + + MatcherAssert.assertThat( + ((ErrorResponse) responseEntity).getUnderlyingException(), + DruidExceptionMatcher.notFound().expectMessageIs("Unknown DataSource") + ); + } + + @Test + public void testGetProgressReturnsUnsupportedWhenSupervisorDisabled() + { + EasyMock.replay(scheduler); + verifyResponseWhenSupervisorDisabled( + new OverlordCompactionResource(SUPERVISOR_DISABLED, scheduler) + .getCompactionProgress(TestDataSource.WIKI) + ); + } + + @Test + public void testGetSnapshotReturnsUnsupportedWhenSupervisorDisabled() + { + EasyMock.replay(scheduler); + verifyResponseWhenSupervisorDisabled( + new OverlordCompactionResource(SUPERVISOR_DISABLED, scheduler) + .getCompactionSnapshots(TestDataSource.WIKI) + ); + } + + private void verifyResponseWhenSupervisorDisabled(Response response) + { + Assert.assertEquals(501, response.getStatus()); + + final Object responseEntity = response.getEntity(); + Assert.assertTrue(responseEntity instanceof ErrorResponse); + + MatcherAssert.assertThat( + ((ErrorResponse) responseEntity).getUnderlyingException(), + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.UNSUPPORTED, + "general" + ).expectMessageIs( + "Compaction Supervisors are disabled on the Overlord." + + " Use Coordinator APIs to fetch compaction status." 
+        )
+    );
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java b/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
index 442cbaa48d9e..64a24d53626b 100644
--- a/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
+++ b/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java
@@ -40,6 +40,15 @@ public static DruidExceptionMatcher invalidInput()
     );
   }
 
+  public static DruidExceptionMatcher notFound()
+  {
+    return new DruidExceptionMatcher(
+        DruidException.Persona.USER,
+        DruidException.Category.NOT_FOUND,
+        "notFound"
+    );
+  }
+
   public static DruidExceptionMatcher invalidSqlInput()
   {
     return invalidInput().expectContext("sourceType", "sql");
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
index 6797a00b432b..fcc93d39c8de 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
@@ -35,7 +35,8 @@
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.rpc.ServiceRetryPolicy;
-import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.compaction.CompactionProgressResponse;
+import org.apache.druid.server.compaction.CompactionStatusResponse;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -226,7 +227,7 @@ ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(
   *
   * API: {@code /druid/indexer/v1/compaction/progress}
   */
-  ListenableFuture<Long> getBytesAwaitingCompaction(String dataSource);
+  ListenableFuture<CompactionProgressResponse> getBytesAwaitingCompaction(String dataSource);
 
   /**
   * Gets the latest compaction snapshots of one or all datasources.
   *
@@ -236,7 +237,7 @@ ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(
   * @param dataSource If passed as non-null, then the returned list contains only
   *                   the snapshot for this datasource.
   */
-  ListenableFuture<List<AutoCompactionSnapshot>> getCompactionSnapshots(@Nullable String dataSource);
+  ListenableFuture<CompactionStatusResponse> getCompactionSnapshots(@Nullable String dataSource);
 
   /**
   * Returns a copy of this client with a different retry policy.
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
index 2a48e0ed6928..1653962e1940 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
@@ -45,7 +45,8 @@
 import org.apache.druid.rpc.RequestBuilder;
 import org.apache.druid.rpc.ServiceClient;
 import org.apache.druid.rpc.ServiceRetryPolicy;
-import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.compaction.CompactionProgressResponse;
+import org.apache.druid.server.compaction.CompactionStatusResponse;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.Interval;
@@ -314,7 +315,7 @@ public ListenableFuture<TaskPayloadResponse> taskPayload(final String taskId)
   }
 
   @Override
-  public ListenableFuture<List<AutoCompactionSnapshot>> getCompactionSnapshots(@Nullable String dataSource)
+  public ListenableFuture<CompactionStatusResponse> getCompactionSnapshots(@Nullable String dataSource)
   {
     final StringBuilder pathBuilder = new StringBuilder("/druid/indexer/v1/compaction/status");
     if (dataSource != null && !dataSource.isEmpty()) {
@@ -329,13 +330,13 @@ public ListenableFuture<List<AutoCompactionSnapshot>> getCompactionSnapshots(@Nu
         holder -> JacksonUtils.readValue(
             jsonMapper,
             holder.getContent(),
-            new TypeReference<List<AutoCompactionSnapshot>>() {}
+            CompactionStatusResponse.class
         )
     );
   }
 
   @Override
-  public ListenableFuture<Long> getBytesAwaitingCompaction(String dataSource)
+  public ListenableFuture<CompactionProgressResponse> getBytesAwaitingCompaction(String dataSource)
   {
     final String path = "/druid/indexer/v1/compaction/progress?dataSource=" + dataSource;
     return FutureUtils.transform(
@@ -343,7 +344,7 @@ public ListenableFuture<Long> getBytesAwaitingCompaction(String dataSource)
             new RequestBuilder(HttpMethod.GET, path),
             new BytesFullResponseHandler()
         ),
-        holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), Long.class)
+        holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), CompactionProgressResponse.class)
     );
   }
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionProgressResponse.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionProgressResponse.java
new file mode 100644
index 000000000000..b91e3d27e7c6
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionProgressResponse.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.compaction; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Response of {@code /compaction/progress} API exposed by Coordinator and + * Overlord (when compaction supervisors are enabled). + */ +public class CompactionProgressResponse +{ + private final long remainingSegmentSize; + + @JsonCreator + public CompactionProgressResponse( + @JsonProperty("remainingSegmentSize") long remainingSegmentSize + ) + { + this.remainingSegmentSize = remainingSegmentSize; + } + + @JsonProperty + public long getRemainingSegmentSize() + { + return remainingSegmentSize; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompactionProgressResponse that = (CompactionProgressResponse) o; + return remainingSegmentSize == that.remainingSegmentSize; + } + + @Override + public int hashCode() + { + return Objects.hashCode(remainingSegmentSize); + } +} diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java index 6e8ec6b4b116..b51777c9e3b5 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java @@ -35,7 +35,6 @@ import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.rpc.ServiceRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; -import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DruidCompactionConfig; @@ -109,7 +108,9 @@ public void onCompactionStatusComputed( ) { final CompactionStatus status = candidateSegments.getCurrentStatus(); - if (status.getState() == CompactionStatus.State.COMPLETE) { + if (status == null) { + // do nothing + } else if (status.getState() == CompactionStatus.State.COMPLETE) { compactedIntervals.addRow( createRow(candidateSegments, null, null) ); @@ -130,7 +131,7 @@ public void onTaskSubmitted(ClientCompactionTaskQuery taskPayload, CompactionCan // Add a row for each task in order of submission final CompactionStatus status = candidateSegments.getCurrentStatus(); queuedIntervals.addRow( - createRow(candidateSegments, taskPayload.getTuningConfig(), status.getReason()) + createRow(candidateSegments, taskPayload.getTuningConfig(), status == null ? 
"" : status.getReason()) ); } }; @@ -285,13 +286,13 @@ public ListenableFuture> getWorkers() } @Override - public ListenableFuture> getCompactionSnapshots(@Nullable String dataSource) + public ListenableFuture getCompactionSnapshots(@Nullable String dataSource) { throw new UnsupportedOperationException(); } @Override - public ListenableFuture getBytesAwaitingCompaction(String dataSource) + public ListenableFuture getBytesAwaitingCompaction(String dataSource) { throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusResponse.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusResponse.java new file mode 100644 index 000000000000..756da7aa9c4b --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusResponse.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.compaction; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.server.coordinator.AutoCompactionSnapshot; + +import java.util.Collection; +import java.util.Objects; + +/** + * Response of {@code /compaction/status} API exposed by Coordinator and + * Overlord (when compaction supervisors are enabled). 
+ */ +public class CompactionStatusResponse +{ + private final Collection latestStatus; + + @JsonCreator + public CompactionStatusResponse( + @JsonProperty("latestStatus") Collection latestStatus + ) + { + this.latestStatus = latestStatus; + } + + @JsonProperty + public Collection getLatestStatus() + { + return latestStatus; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompactionStatusResponse that = (CompactionStatusResponse) o; + return Objects.equals(latestStatus, that.latestStatus); + } + + @Override + public int hashCode() + { + return Objects.hashCode(latestStatus); + } +} diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java index 015a27435cf8..cb146b7ed955 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java @@ -21,13 +21,17 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.error.InternalServerError; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.error.NotFound; import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.server.compaction.CompactionProgressResponse; +import org.apache.druid.server.compaction.CompactionStatusResponse; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.DruidCoordinator; @@ -82,9 +86,7 @@ public Response getCompactionProgress( ) { if (dataSource == null || dataSource.isEmpty()) { - return Response.status(Response.Status.BAD_REQUEST) - .entity(ImmutableMap.of("error", "No DataSource specified")) - .build(); + return ServletResourceUtils.buildErrorResponseFrom(InvalidInput.exception("No DataSource specified")); } if (isCompactionSupervisorEnabled()) { @@ -93,9 +95,9 @@ public Response getCompactionProgress( final AutoCompactionSnapshot snapshot = coordinator.getAutoCompactionSnapshotForDataSource(dataSource); if (snapshot == null) { - return Response.status(Response.Status.NOT_FOUND).entity(ImmutableMap.of("error", "Unknown DataSource")).build(); + return ServletResourceUtils.buildErrorResponseFrom(NotFound.exception("Unknown DataSource")); } else { - return Response.ok(ImmutableMap.of("remainingSegmentSize", snapshot.getBytesAwaitingCompaction())).build(); + return Response.ok(new CompactionProgressResponse(snapshot.getBytesAwaitingCompaction())).build(); } } @@ -117,11 +119,11 @@ public Response getCompactionSnapshotForDataSource( } else { AutoCompactionSnapshot autoCompactionSnapshot = coordinator.getAutoCompactionSnapshotForDataSource(dataSource); if (autoCompactionSnapshot == null) { - return Response.status(Response.Status.NOT_FOUND).entity(ImmutableMap.of("error", "Unknown DataSource")).build(); + return ServletResourceUtils.buildErrorResponseFrom(NotFound.exception("Unknown DataSource")); } snapshots = 
ImmutableList.of(autoCompactionSnapshot); } - return Response.ok(ImmutableMap.of("latestStatus", snapshots)).build(); + return Response.ok(new CompactionStatusResponse(snapshots)).build(); } @POST @@ -149,9 +151,9 @@ private Response buildResponse(ListenableFuture future) .entity(cause.getResponse().getContent()) .build(); } else { - return Response.status(Response.Status.INTERNAL_SERVER_ERROR) - .entity(ImmutableMap.of("error", e.getMessage())) - .build(); + return ServletResourceUtils.buildErrorResponseFrom( + InternalServerError.exception(e.getMessage()) + ); } } } diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java index c73ff1ca0599..8ccc32e435ce 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java @@ -28,7 +28,8 @@ import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.rpc.ServiceRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; -import org.apache.druid.server.coordinator.AutoCompactionSnapshot; +import org.apache.druid.server.compaction.CompactionProgressResponse; +import org.apache.druid.server.compaction.CompactionStatusResponse; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -124,13 +125,13 @@ public ListenableFuture getTotalWorkerCapacity( } @Override - public ListenableFuture> getCompactionSnapshots(@Nullable String dataSource) + public ListenableFuture getCompactionSnapshots(@Nullable String dataSource) { throw new UnsupportedOperationException(); } @Override - public ListenableFuture getBytesAwaitingCompaction(String dataSource) + public ListenableFuture getBytesAwaitingCompaction(String dataSource) { throw new UnsupportedOperationException(); } diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java index 18e7e2769500..1167d10502d7 100644 --- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -47,6 +47,8 @@ import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.MockServiceClient; import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.server.compaction.CompactionProgressResponse; +import org.apache.druid.server.compaction.CompactionStatusResponse; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; @@ -477,18 +479,18 @@ public void test_getCompactionSnapshots_nullDataSource() .withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING) .build(), AutoCompactionSnapshot.builder("ds2") - .withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING) + .withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED) .build() ); serviceClient.expectAndRespond( new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/compaction/status"), HttpResponseStatus.OK, Collections.emptyMap(), - DefaultObjectMapper.INSTANCE.writeValueAsBytes(compactionSnapshots) + DefaultObjectMapper.INSTANCE.writeValueAsBytes(new CompactionStatusResponse(compactionSnapshots)) ); Assert.assertEquals( - compactionSnapshots, + new CompactionStatusResponse(compactionSnapshots), overlordClient.getCompactionSnapshots(null).get() ); } @@ 
-498,19 +500,17 @@ public void test_getCompactionSnapshots_nonNullDataSource() throws JsonProcessingException, ExecutionException, InterruptedException { final List compactionSnapshots = Collections.singletonList( - AutoCompactionSnapshot.builder("ds1") - .withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING) - .build() + AutoCompactionSnapshot.builder("ds1").build() ); serviceClient.expectAndRespond( new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/compaction/status?dataSource=ds1"), HttpResponseStatus.OK, Collections.emptyMap(), - DefaultObjectMapper.INSTANCE.writeValueAsBytes(compactionSnapshots) + DefaultObjectMapper.INSTANCE.writeValueAsBytes(new CompactionStatusResponse(compactionSnapshots)) ); Assert.assertEquals( - compactionSnapshots, + new CompactionStatusResponse(compactionSnapshots), overlordClient.getCompactionSnapshots("ds1").get() ); } @@ -523,12 +523,12 @@ public void test_getBytesAwaitingCompaction() new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/compaction/progress?dataSource=ds1"), HttpResponseStatus.OK, Collections.emptyMap(), - DefaultObjectMapper.INSTANCE.writeValueAsBytes(100_000L) + DefaultObjectMapper.INSTANCE.writeValueAsBytes(new CompactionProgressResponse(100_000L)) ); Assert.assertEquals( - 100_000L, - overlordClient.getBytesAwaitingCompaction("ds1").get().longValue() + new CompactionProgressResponse(100_000L), + overlordClient.getBytesAwaitingCompaction("ds1").get() ); } } diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java index 8c6c747d3a8b..4a73047d1955 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java @@ -19,16 +19,19 @@ package org.apache.druid.server.http; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.error.DruidExceptionMatcher; +import org.apache.druid.error.ErrorResponse; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.compaction.CompactionStatistics; +import org.apache.druid.server.compaction.CompactionStatusResponse; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.DruidCoordinator; import org.easymock.EasyMock; +import org.hamcrest.MatcherAssert; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -37,7 +40,6 @@ import javax.annotation.Nullable; import javax.ws.rs.core.Response; import java.util.Collections; -import java.util.List; import java.util.Map; public class CoordinatorCompactionResourceTest @@ -92,7 +94,7 @@ public void testGetCompactionSnapshotForDataSourceWithEmptyQueryParameter() final Response response = new CoordinatorCompactionResource(mock, overlordClient) .getCompactionSnapshotForDataSource(""); - Assert.assertEquals(ImmutableMap.of("latestStatus", expected.values()), response.getEntity()); + Assert.assertEquals(new CompactionStatusResponse(expected.values()), response.getEntity()); Assert.assertEquals(200, response.getStatus()); } @@ -110,7 +112,7 @@ public void testGetCompactionSnapshotForDataSourceWithNullQueryParameter() final Response response = new 
CoordinatorCompactionResource(mock, overlordClient) .getCompactionSnapshotForDataSource(null); - Assert.assertEquals(ImmutableMap.of("latestStatus", expected.values()), response.getEntity()); + Assert.assertEquals(new CompactionStatusResponse(expected.values()), response.getEntity()); Assert.assertEquals(200, response.getStatus()); } @@ -119,12 +121,16 @@ public void testGetCompactionSnapshotForDataSourceWithValidQueryParameter() { String dataSourceName = "datasource_1"; - EasyMock.expect(mock.getAutoCompactionSnapshotForDataSource(dataSourceName)).andReturn(expectedSnapshot).once(); + EasyMock.expect(mock.getAutoCompactionSnapshotForDataSource(dataSourceName)) + .andReturn(expectedSnapshot).once(); EasyMock.replay(mock); final Response response = new CoordinatorCompactionResource(mock, overlordClient) .getCompactionSnapshotForDataSource(dataSourceName); - Assert.assertEquals(ImmutableMap.of("latestStatus", ImmutableList.of(expectedSnapshot)), response.getEntity()); + Assert.assertEquals( + new CompactionStatusResponse(Collections.singletonList(expectedSnapshot)), + response.getEntity() + ); Assert.assertEquals(200, response.getStatus()); } @@ -133,7 +139,8 @@ public void testGetCompactionSnapshotForDataSourceWithInvalidQueryParameter() { String dataSourceName = "invalid_datasource"; - EasyMock.expect(mock.getAutoCompactionSnapshotForDataSource(dataSourceName)).andReturn(null).once(); + EasyMock.expect(mock.getAutoCompactionSnapshotForDataSource(dataSourceName)) + .andReturn(null).once(); EasyMock.replay(mock); final Response response = new CoordinatorCompactionResource(mock, overlordClient) @@ -149,14 +156,18 @@ public void testGetProgressForNullDatasourceReturnsBadRequest() final Response response = new CoordinatorCompactionResource(mock, overlordClient) .getCompactionProgress(null); Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - Assert.assertEquals( - ImmutableMap.of("error", "No DataSource specified"), - response.getEntity() + + final Object responseEntity = response.getEntity(); + Assert.assertTrue(responseEntity instanceof ErrorResponse); + + MatcherAssert.assertThat( + ((ErrorResponse) responseEntity).getUnderlyingException(), + DruidExceptionMatcher.invalidInput().expectMessageIs("No DataSource specified") ); } @Test - public void testGetSnapshotWhenCompactionSupervisorIsEnabled() + public void testGetSnapshotRedirectsToOverlordWhenSupervisorIsEnabled() { EasyMock.replay(mock); @@ -172,9 +183,11 @@ public ListenableFuture isCompactionSupervisorEnabled() } @Override - public ListenableFuture> getCompactionSnapshots(@Nullable String dataSource) + public ListenableFuture getCompactionSnapshots(@Nullable String dataSource) { - return Futures.immediateFuture(Collections.singletonList(snapshotFromOverlord)); + return Futures.immediateFuture( + new CompactionStatusResponse(Collections.singletonList(snapshotFromOverlord)) + ); } }; @@ -182,7 +195,7 @@ public ListenableFuture> getCompactionSnapshots(@Nu .getCompactionSnapshotForDataSource(dataSourceName); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals( - Collections.singletonList(snapshotFromOverlord), + new CompactionStatusResponse(Collections.singletonList(snapshotFromOverlord)), response.getEntity() ); } diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 68923f21f1c1..ba7bd43ef8d8 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ 
b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -51,11 +51,10 @@ import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.QueryableModule; +import org.apache.druid.guice.SupervisorCleanupModule; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Global; import org.apache.druid.guice.http.JettyHttpClientModule; -import org.apache.druid.indexing.overlord.TaskMaster; -import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -89,7 +88,6 @@ import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest; import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory; import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; -import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache; import org.apache.druid.segment.metadata.SegmentMetadataCacheConfig; @@ -276,16 +274,6 @@ public void configure(Binder binder) LifecycleModule.register(binder, Server.class); LifecycleModule.register(binder, DataSourcesResource.class); - if (properties.containsKey("druid.coordinator.merge.on")) { - throw new UnsupportedOperationException( - "'druid.coordinator.merge.on' is not supported anymore. " - + "Please consider using Coordinator's automatic compaction instead. " - + "See https://druid.apache.org/docs/latest/operations/segment-optimization.html and " - + "https://druid.apache.org/docs/latest/api-reference/api-reference.html#compaction-configuration " - + "for more details about compaction." - ); - } - bindAnnouncer( binder, Coordinator.class, @@ -296,10 +284,6 @@ public void configure(Binder binder) LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class)); if (!beOverlord) { - // These are needed to deserialize SupervisorSpec for Supervisor Auto Cleanup - binder.bind(TaskStorage.class).toProvider(Providers.of(null)); - binder.bind(TaskMaster.class).toProvider(Providers.of(null)); - binder.bind(RowIngestionMetersFactory.class).toProvider(Providers.of(null)); // Bind HeartbeatSupplier only when the service operates independently of Overlord. binder.bind(new TypeLiteral>>() {}) .annotatedWith(Names.named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING)) @@ -342,6 +326,7 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster( // Only add LookupSerdeModule if !beOverlord, since CliOverlord includes it, and having two copies causes // the injector to get confused due to having multiple bindings for the same classes. modules.add(new LookupSerdeModule()); + modules.add(new SupervisorCleanupModule()); } return modules; diff --git a/services/src/main/java/org/apache/druid/guice/SupervisorCleanupModule.java b/services/src/main/java/org/apache/druid/guice/SupervisorCleanupModule.java new file mode 100644 index 000000000000..c60442a06f75 --- /dev/null +++ b/services/src/main/java/org/apache/druid/guice/SupervisorCleanupModule.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice; + +import com.fasterxml.jackson.databind.Module; +import com.google.inject.Binder; +import com.google.inject.util.Providers; +import org.apache.druid.indexing.compact.CompactionScheduler; +import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.segment.incremental.RowIngestionMetersFactory; + +import java.util.List; + +/** + * Contains bindings necessary for Coordinator to perform supervisor cleanup + * when Coordinator and Overlord are running as separate processes. + */ +public class SupervisorCleanupModule implements DruidModule +{ + @Override + public void configure(Binder binder) + { + // These are needed to deserialize SupervisorSpec for Supervisor Auto Cleanup + binder.bind(TaskStorage.class).toProvider(Providers.of(null)); + binder.bind(TaskMaster.class).toProvider(Providers.of(null)); + binder.bind(RowIngestionMetersFactory.class).toProvider(Providers.of(null)); + binder.bind(CompactionScheduler.class).toProvider(Providers.of(null)); + } + + @Override + public List getJacksonModules() + { + return new SupervisorModule().getJacksonModules(); + } +}
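
For illustration only (not part of the patch above): the new wrapper types keep the existing wire format of the compaction APIs but give callers a typed payload instead of a bare map or long. A minimal sketch of how a client might deserialize the two payloads, assuming a plain Jackson ObjectMapper and illustrative JSON literals; the real client path is OverlordClientImpl, which goes through JacksonUtils.readValue as shown in the diff.

// Illustrative sketch only; uses a plain Jackson ObjectMapper rather than Druid's injected mapper.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.server.compaction.CompactionProgressResponse;
import org.apache.druid.server.compaction.CompactionStatusResponse;

public class CompactionResponseExample
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    // "latestStatus" wraps the collection of AutoCompactionSnapshot entries
    // returned by /druid/indexer/v1/compaction/status.
    final CompactionStatusResponse status = mapper.readValue(
        "{\"latestStatus\": []}",
        CompactionStatusResponse.class
    );
    System.out.println("snapshots returned: " + status.getLatestStatus().size());

    // "remainingSegmentSize" is the byte count still awaiting compaction,
    // returned by /druid/indexer/v1/compaction/progress.
    final CompactionProgressResponse progress = mapper.readValue(
        "{\"remainingSegmentSize\": 100000}",
        CompactionProgressResponse.class
    );
    System.out.println("bytes awaiting compaction: " + progress.getRemainingSegmentSize());
  }
}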