From 83299e9882991f8e0d4f648c07d027c4662a807f Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Tue, 24 Sep 2024 13:06:23 -0700 Subject: [PATCH] Miscellaneous cleanup in the supervisor API flow. (#17144) Extracting a few miscellaneous non-functional changes from the batch supervisor branch: - Replace anonymous inner classes with lambda expressions in the SQL supervisor manager layer - Add explicit @Nullable annotations in DynamicConfigProviderUtils to make IDE happy - Small variable renames (copy-paste error perhaps) and fix typos - Add table name for this exception message: Delete the supervisor from the table[%s] in the database... - Prefer CollectionUtils.isEmptyOrNull() over list == null || list.size() > 0. We can change the Precondition checks to throwing DruidException separately for a batch of APIs at a time. --- .../supervisor/SupervisorResource.java | 7 +- .../utils/DynamicConfigProviderUtils.java | 7 +- .../SQLMetadataSupervisorManager.java | 271 +++++++----------- .../SQLMetadataSupervisorManagerTest.java | 15 +- 4 files changed, 110 insertions(+), 190 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 0cf58d385122..130f617d59d1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -50,6 +50,7 @@ import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; +import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -123,7 +124,7 @@ public Response specPost(final SupervisorSpec spec, @Context final HttpServletRe return asLeaderWithSupervisorManager( manager -> { Preconditions.checkArgument( - spec.getDataSources() != null && spec.getDataSources().size() > 0, + !CollectionUtils.isNullOrEmpty(spec.getDataSources()), "No dataSources found to perform authorization checks" ); final Set resourceActions; @@ -412,7 +413,7 @@ public Response shutdown(@PathParam("id") final String id) public Response handoffTaskGroups(@PathParam("id") final String id, @Nonnull final HandoffTaskGroupsRequest handoffTaskGroupsRequest) { List taskGroupIds = handoffTaskGroupsRequest.getTaskGroupIds(); - if (taskGroupIds == null || taskGroupIds.isEmpty()) { + if (CollectionUtils.isNullOrEmpty(taskGroupIds)) { return Response.status(Response.Status.BAD_REQUEST) .entity(ImmutableMap.of("error", "List of task groups to handoff can't be empty")) .build(); @@ -533,7 +534,7 @@ public Response specGetHistory( authorizerMapper ) ); - if (authorizedHistoryForId.size() > 0) { + if (!authorizedHistoryForId.isEmpty()) { return Response.ok(authorizedHistoryForId).build(); } } diff --git a/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java b/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java index 0b47116f0e7b..38e227987a64 100644 --- a/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java +++ b/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java @@ -22,13 +22,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.metadata.DynamicConfigProvider; +import javax.annotation.Nullable; import java.util.Collections; import java.util.HashMap; import java.util.Map; public class DynamicConfigProviderUtils { - public static Map extraConfigAndSetStringMap(Map config, String dynamicConfigProviderKey, ObjectMapper mapper) + public static Map extraConfigAndSetStringMap(@Nullable Map config, String dynamicConfigProviderKey, ObjectMapper mapper) { HashMap newConfig = new HashMap<>(); if (config != null) { @@ -43,7 +44,7 @@ public static Map extraConfigAndSetStringMap(Map return newConfig; } - public static Map extraConfigAndSetObjectMap(Map config, String dynamicConfigProviderKey, ObjectMapper mapper) + public static Map extraConfigAndSetObjectMap(@Nullable Map config, String dynamicConfigProviderKey, ObjectMapper mapper) { HashMap newConfig = new HashMap<>(); if (config != null) { @@ -58,7 +59,7 @@ public static Map extraConfigAndSetObjectMap(Map return newConfig; } - private static Map extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper) + private static Map extraConfigFromProvider(@Nullable Object dynamicConfigProviderJson, ObjectMapper mapper) { if (dynamicConfigProviderJson != null) { DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java index 91d718197210..5564d715792e 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java @@ -20,7 +20,6 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; @@ -38,9 +37,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.logger.Logger; import org.joda.time.DateTime; -import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; -import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.PreparedBatch; import org.skife.jdbi.v2.StatementContext; @@ -91,24 +88,19 @@ public void start() public void insert(final String id, final SupervisorSpec spec) { dbi.withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - handle.createStatement( - StringUtils.format( - "INSERT INTO %s (spec_id, created_date, payload) VALUES (:spec_id, :created_date, :payload)", - getSupervisorsTable() - ) - ) - .bind("spec_id", id) - .bind("created_date", DateTimes.nowUtc().toString()) - .bind("payload", jsonMapper.writeValueAsBytes(spec)) - .execute(); + handle -> { + handle.createStatement( + StringUtils.format( + "INSERT INTO %s (spec_id, created_date, payload) VALUES (:spec_id, :created_date, :payload)", + getSupervisorsTable() + ) + ) + .bind("spec_id", id) + .bind("created_date", DateTimes.nowUtc().toString()) + .bind("payload", jsonMapper.writeValueAsBytes(spec)) + .execute(); - return null; - } + return null; } ); } @@ -118,54 +110,29 @@ public Map> getAll() { return ImmutableMap.copyOf( dbi.withHandle( - new HandleCallback>>() - { - @Override - public Map> withHandle(Handle handle) - { - return handle.createQuery( - StringUtils.format( - "SELECT id, spec_id, created_date, payload FROM %1$s ORDER BY id DESC", - getSupervisorsTable() - ) - ).map( - new ResultSetMapper>() - { - @Override - public Pair map(int index, ResultSet r, StatementContext ctx) - throws SQLException - { - return Pair.of( - r.getString("spec_id"), - createVersionSupervisorSpecFromResponse(r) - ); - } - } - ).fold( - new HashMap<>(), - new Folder3>, Pair>() - { - @Override - public Map> fold( - Map> retVal, - Pair pair, - FoldController foldController, - StatementContext statementContext - ) - { - try { - String specId = pair.lhs; - retVal.computeIfAbsent(specId, sId -> new ArrayList<>()).add(pair.rhs); - return retVal; - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - } - ); - } - } + (HandleCallback>>) handle -> handle.createQuery( + StringUtils.format( + "SELECT id, spec_id, created_date, payload FROM %1$s ORDER BY id DESC", + getSupervisorsTable() + ) + ).map( + (index, r, ctx) -> Pair.of( + r.getString("spec_id"), + createVersionSupervisorSpecFromResponse(r) + ) + ).fold( + new HashMap<>(), + (Folder3>, Pair>) (retVal, pair, foldController, statementContext) -> { + try { + String specId = pair.lhs; + retVal.computeIfAbsent(specId, sId -> new ArrayList<>()).add(pair.rhs); + return retVal; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + ) ) ); } @@ -175,30 +142,15 @@ public List getAllForId(String id) { return ImmutableList.copyOf( dbi.withHandle( - new HandleCallback>() - { - @Override - public List withHandle(Handle handle) - { - return handle.createQuery( - StringUtils.format( - "SELECT id, spec_id, created_date, payload FROM %1$s WHERE spec_id = :spec_id ORDER BY id DESC", - getSupervisorsTable() - ) - ).bind("spec_id", id - ).map( - new ResultSetMapper() - { - @Override - public VersionedSupervisorSpec map(int index, ResultSet r, StatementContext ctx) - throws SQLException - { - return createVersionSupervisorSpecFromResponse(r); - } - } - ).list(); - } - } + (HandleCallback>) handle -> handle.createQuery( + StringUtils.format( + "SELECT id, spec_id, created_date, payload FROM %1$s WHERE spec_id = :spec_id ORDER BY id DESC", + getSupervisorsTable() + ) + ) + .bind("spec_id", id) + .map((index, r, ctx) -> createVersionSupervisorSpecFromResponse(r)) + .list() ) ); } @@ -207,12 +159,7 @@ private VersionedSupervisorSpec createVersionSupervisorSpecFromResponse(ResultSe { SupervisorSpec payload; try { - payload = jsonMapper.readValue( - r.getBytes("payload"), - new TypeReference() - { - } - ); + payload = jsonMapper.readValue(r.getBytes("payload"), SupervisorSpec.class); } catch (JsonParseException | JsonMappingException e) { log.warn("Failed to deserialize payload for spec_id[%s]", r.getString("spec_id")); @@ -229,74 +176,54 @@ public Map getLatest() { return ImmutableMap.copyOf( dbi.withHandle( - new HandleCallback>() - { - @Override - public Map withHandle(Handle handle) - { - return handle.createQuery( - StringUtils.format( - "SELECT r.spec_id, r.payload " - + "FROM %1$s r " - + "INNER JOIN(SELECT spec_id, max(id) as id FROM %1$s GROUP BY spec_id) latest " - + "ON r.id = latest.id", - getSupervisorsTable() - ) - ).map( - new ResultSetMapper>() - { - @Nullable - @Override - public Pair map(int index, ResultSet r, StatementContext ctx) - throws SQLException - { - try { - return Pair.of( - r.getString("spec_id"), - jsonMapper.readValue( - r.getBytes("payload"), new TypeReference() - { - } - ) - ); - } - catch (IOException e) { - String exceptionMessage = StringUtils.format( - "Could not map json payload to a SupervisorSpec for spec_id: [%s]." - + " Delete the supervisor from the database and re-submit it to the overlord.", - r.getString("spec_id") - ); - log.error(e, exceptionMessage); - return null; - } - } + (HandleCallback>) handle -> handle.createQuery( + StringUtils.format( + "SELECT r.spec_id, r.payload " + + "FROM %1$s r " + + "INNER JOIN(SELECT spec_id, max(id) as id FROM %1$s GROUP BY spec_id) latest " + + "ON r.id = latest.id", + getSupervisorsTable() + ) + ).map( + new ResultSetMapper>() + { + @Nullable + @Override + public Pair map(int index, ResultSet r, StatementContext ctx) + throws SQLException + { + try { + return Pair.of( + r.getString("spec_id"), + jsonMapper.readValue(r.getBytes("payload"), SupervisorSpec.class) + ); + } + catch (IOException e) { + String exceptionMessage = StringUtils.format( + "Could not map json payload to a SupervisorSpec for spec_id: [%s]." + + " Delete the supervisor from the table[%s] in the database and re-submit it to the overlord.", + r.getString("spec_id"), + getSupervisorsTable() + ); + log.error(e, exceptionMessage); + return null; } - ).fold( - new HashMap<>(), - new Folder3, Pair>() - { - @Override - public Map fold( - Map retVal, - Pair stringObjectMap, - FoldController foldController, - StatementContext statementContext - ) - { - try { - if (null != stringObjectMap) { - retVal.put(stringObjectMap.lhs, stringObjectMap.rhs); - } - return retVal; - } - catch (Exception e) { - throw new RuntimeException(e); - } - } + } + } + ).fold( + new HashMap<>(), + (Folder3, Pair>) (retVal, stringObjectMap, foldController, statementContext) -> { + try { + if (null != stringObjectMap) { + retVal.put(stringObjectMap.lhs, stringObjectMap.rhs); } - ); - } - } + return retVal; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + ) ) ); } @@ -304,10 +231,10 @@ public Map fold( @Override public Map getLatestActiveOnly() { - Map supervisors = getLatest(); - Map activeSupervisors = new HashMap<>(); + final Map supervisors = getLatest(); + final Map activeSupervisors = new HashMap<>(); for (Map.Entry entry : supervisors.entrySet()) { - // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec + // Terminated supervisor will have its latest supervisorSpec as NoopSupervisorSpec // (NoopSupervisorSpec is used as a tombstone marker) if (!(entry.getValue() instanceof NoopSupervisorSpec)) { activeSupervisors.put(entry.getKey(), entry.getValue()); @@ -319,16 +246,16 @@ public Map getLatestActiveOnly() @Override public Map getLatestTerminatedOnly() { - Map supervisors = getLatest(); - Map activeSupervisors = new HashMap<>(); + final Map supervisors = getLatest(); + final Map terminatedSupervisors = new HashMap<>(); for (Map.Entry entry : supervisors.entrySet()) { - // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec + // Terminated supervisor will have its latest supervisorSpec as NoopSupervisorSpec // (NoopSupervisorSpec is used as a tombstone marker) if (entry.getValue() instanceof NoopSupervisorSpec) { - activeSupervisors.put(entry.getKey(), entry.getValue()); + terminatedSupervisors.put(entry.getKey(), entry.getValue()); } } - return ImmutableMap.copyOf(activeSupervisors); + return ImmutableMap.copyOf(terminatedSupervisors); } @Override diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java index b43958fc746c..ae9316974769 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java @@ -37,8 +37,6 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; -import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.tweak.HandleCallback; import java.util.Collections; import java.util.List; @@ -65,16 +63,9 @@ public static void setupStatic() public void cleanup() { connector.getDBI().withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) - { - handle.createStatement(StringUtils.format("DROP TABLE %s", tablesConfig.getSupervisorTable())) - .execute(); - return null; - } - } + handle -> handle.createStatement(StringUtils.format("DROP TABLE %s", tablesConfig.getSupervisorTable())) + .execute() + ); }