Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run compaction as a supervisor on Overlord #16768

Merged
merged 35 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
d89802a
Add Compaction Scheduler
kfaraz Jul 22, 2024
bea1078
Fix checkstyle, forbidden API
kfaraz Jul 22, 2024
9455547
Add some tests
kfaraz Jul 22, 2024
26d4bdc
Add CompactionDutySimulator
kfaraz Jul 24, 2024
caefb27
Fix tests and checkstyle
kfaraz Jul 24, 2024
bbc2eb6
More changes for simulator
kfaraz Jul 27, 2024
8be913e
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Jul 27, 2024
23899b8
Use compaction simulator on both coordinator and overlord
kfaraz Jul 30, 2024
9c06504
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Jul 30, 2024
f972ea2
Handle master changes
kfaraz Jul 30, 2024
fdcfdf1
Add CompactionSupervisor
kfaraz Aug 8, 2024
4b81779
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 8, 2024
4942a08
Add test for OverlordCompactionScheduler
kfaraz Aug 9, 2024
b2ae560
Add more tests
kfaraz Aug 9, 2024
ad98383
Fix flow
kfaraz Aug 10, 2024
2e676be
Remove extra sout
kfaraz Aug 10, 2024
e793f44
Fix CompactSegmentsTest
kfaraz Aug 10, 2024
8ece3f0
Do not call taskStatus API with empty taskIds
kfaraz Aug 10, 2024
2f152e2
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 23, 2024
382bfdf
Add javadocs, cleanup, etc.
kfaraz Aug 23, 2024
e111d71
Fix checkstyle, add tests
kfaraz Aug 23, 2024
93284b3
Handle concurrency, validation, cleanup stale entries
kfaraz Aug 28, 2024
273822b
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 28, 2024
117b0ab
Fix test, avoid metadata store calls
kfaraz Aug 28, 2024
0a584d4
Redirect compaction progress APIs to Overlord
kfaraz Aug 30, 2024
bfc9ceb
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 30, 2024
ab711ed
Bind CompactionSupervisorConfig in SupervisorModule
kfaraz Aug 30, 2024
e3012f3
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 30, 2024
bca9810
Minor cleanup and tests
kfaraz Aug 31, 2024
6d84927
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 31, 2024
b439935
Remove unused methods, cleanup tests
kfaraz Sep 1, 2024
517f9ad
Rename SegmentsToCompact to CompactionCandidate
kfaraz Sep 1, 2024
eceb767
Fix tests
kfaraz Sep 1, 2024
9d04d9b
Fix checkstyle
kfaraz Sep 1, 2024
3d2e58f
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Sep 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.server.compaction.CompactionSegmentIterator;
import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
import org.apache.druid.server.compaction.CompactionStatusTracker;
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
import org.apache.druid.server.compaction.PriorityBasedCompactionSegmentIterator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
Expand Down Expand Up @@ -61,7 +63,7 @@ public class NewestSegmentFirstPolicyBenchmark
{
private static final String DATA_SOURCE_PREFIX = "dataSource_";

private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(new DefaultObjectMapper());
private final CompactionCandidateSearchPolicy policy = new NewestSegmentFirstPolicy(null);

@Param("100")
private int numDataSources;
Expand Down Expand Up @@ -132,7 +134,13 @@ public void setup()
@Benchmark
public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
{
final CompactionSegmentIterator iterator = policy.createIterator(compactionConfigs, dataSources, Collections.emptyMap());
final CompactionSegmentIterator iterator = new PriorityBasedCompactionSegmentIterator(
policy,
compactionConfigs,
dataSources,
Collections.emptyMap(),
new CompactionStatusTracker(new DefaultObjectMapper())
);
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
blackhole.consume(iterator.next());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.segment.TestDataSource;
import org.apache.druid.server.security.AuthorizerMapper;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -33,8 +34,6 @@

public class ControllerChatHandlerTest
{
private static final String DATASOURCE = "wiki";

@Test
public void testHttpGetLiveReports()
{
Expand All @@ -47,7 +46,8 @@ public void testHttpGetLiveReports()
.thenReturn(reportMap);

final AuthorizerMapper authorizerMapper = new AuthorizerMapper(null);
ControllerChatHandler chatHandler = new ControllerChatHandler(controller, DATASOURCE, authorizerMapper);
ControllerChatHandler chatHandler
= new ControllerChatHandler(controller, TestDataSource.WIKI, authorizerMapper);

HttpServletRequest httpRequest = Mockito.mock(HttpServletRequest.class);
Mockito.when(httpRequest.getAttribute(ArgumentMatchers.anyString()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package org.apache.druid.guice;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.server.coordinator.CompactionSupervisorConfig;

import java.util.List;

public class SupervisorModule implements DruidModule
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.supervisor", SupervisorStateManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.supervisor.compaction", CompactionSupervisorConfig.class);
}

@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule(getClass().getSimpleName())
.registerSubtypes(
new NamedType(CompactionSupervisorSpec.class, CompactionSupervisorSpec.TYPE)
)
);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this interface?

Copy link
Contributor Author

@kfaraz kfaraz Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, I had intended to use this interface in DruidCoordinator too but that seemed unnecessary.

Right now, the purpose of this interface is to hide implementation details in the OverlordCompactionScheduler and cleaner guice bindings. It can also be useful for unit testing classes that depend on the OverlordCompactionScheduler.

Let me know if it seems unnecessary.

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.compact;

import org.apache.druid.server.compaction.CompactionSimulateResult;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.CompactionSupervisorConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;

import java.util.Map;

/**
* Compaction scheduler that runs on the Overlord if {@link CompactionSupervisorConfig}
* is enabled.
* <p>
* Usage:
* <ul>
* <li>When an active {@link CompactionSupervisor} starts, it should register
* itself by calling {@link #startCompaction}.</li>
* <li>When a suspended {@link CompactionSupervisor} starts, it should stop
* compaction by calling {@link #stopCompaction}.</li>
* <li>When stopping, any {@link CompactionSupervisor} (active or suspended)
* should call {@link #stopCompaction}.</li>
* </ul>
*/
public interface CompactionScheduler
kfaraz marked this conversation as resolved.
Show resolved Hide resolved
{
void start();

void stop();

boolean isRunning();

CompactionConfigValidationResult validateCompactionConfig(DataSourceCompactionConfig compactionConfig);

void startCompaction(String dataSourceName, DataSourceCompactionConfig compactionConfig);

void stopCompaction(String dataSourceName);

Map<String, AutoCompactionSnapshot> getAllCompactionSnapshots();

AutoCompactionSnapshot getCompactionSnapshot(String dataSource);

CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionConfig updateRequest);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.compact;

import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;

/**
* Supervisor for compaction of a single datasource.
*/
public class CompactionSupervisor implements Supervisor
{
private static final Logger log = new Logger(CompactionSupervisor.class);

private final String dataSource;
private final CompactionScheduler scheduler;
private final CompactionSupervisorSpec supervisorSpec;

public CompactionSupervisor(
CompactionSupervisorSpec supervisorSpec,
CompactionScheduler scheduler
)
{
this.supervisorSpec = supervisorSpec;
this.scheduler = scheduler;
this.dataSource = supervisorSpec.getSpec().getDataSource();
}

@Override
public void start()
{
if (supervisorSpec.isSuspended()) {
log.info("Suspending compaction for dataSource[%s].", dataSource);
scheduler.stopCompaction(dataSource);
Comment on lines +55 to +57
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is this case encountered, i.e. the supervisor for the datasource is starting but the compaction for the same datasource needs to be stopped?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because of the way the supervisor framework works. This code is triggered when start() is called for a suspended supervisor.

} else {
log.info("Starting compaction for dataSource[%s].", dataSource);
scheduler.startCompaction(dataSource, supervisorSpec.getSpec());
}
}

@Override
public void stop(boolean stopGracefully)
{
log.info("Stopping compaction for dataSource[%s].", dataSource);
scheduler.stopCompaction(dataSource);
}

@Override
public SupervisorReport<AutoCompactionSnapshot> getStatus()
{
final AutoCompactionSnapshot snapshot;
if (supervisorSpec.isSuspended()) {
snapshot = AutoCompactionSnapshot.builder(dataSource)
.withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
.build();
} else {
snapshot = scheduler.getCompactionSnapshot(dataSource);
}

return new SupervisorReport<>(supervisorSpec.getId(), DateTimes.nowUtc(), snapshot);
}

@Override
public SupervisorStateManager.State getState()
{
if (!scheduler.isRunning()) {
return State.SCHEDULER_STOPPED;
} else if (supervisorSpec.isSuspended()) {
return State.SUSPENDED;
} else {
abhishekagarwal87 marked this conversation as resolved.
Show resolved Hide resolved
return State.RUNNING;
}
}

// Un-implemented methods used only by streaming supervisors

@Override
public void reset(DataSourceMetadata dataSourceMetadata)
{
throw new UnsupportedOperationException("Resetting not supported for 'autocompact' supervisors.");
}

@Override
public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
{
throw new UnsupportedOperationException("Resetting offsets not supported for 'autocompact' supervisors.");
}

@Override
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{
throw new UnsupportedOperationException("Checkpointing not supported for 'autocompact' supervisors.");
}

@Override
public LagStats computeLagStats()
{
throw new UnsupportedOperationException("Lag stats not supported for 'autocompact' supervisors.");
}

@Override
public int getActiveTaskGroupsCount()
{
throw new UnsupportedOperationException("Task groups not supported for 'autocompact' supervisors.");
}

public enum State implements SupervisorStateManager.State
Fixed Show fixed Hide fixed

Check notice

Code scanning / CodeQL

Class has same name as super class Note

State has the same name as its supertype
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager$State
.
{
SCHEDULER_STOPPED(true),
RUNNING(true),
SUSPENDED(true),
UNHEALTHY(false);

private final boolean healthy;

State(boolean healthy)
{
this.healthy = healthy;
}

@Override
public boolean isFirstRunOnly()
{
return false;
}

@Override
public boolean isHealthy()
{
return healthy;
}
}
}
Loading
Loading