Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run compaction as a supervisor on Overlord #16768

Merged
merged 35 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
d89802a
Add Compaction Scheduler
kfaraz Jul 22, 2024
bea1078
Fix checkstyle, forbidden API
kfaraz Jul 22, 2024
9455547
Add some tests
kfaraz Jul 22, 2024
26d4bdc
Add CompactionDutySimulator
kfaraz Jul 24, 2024
caefb27
Fix tests and checkstyle
kfaraz Jul 24, 2024
bbc2eb6
More changes for simulator
kfaraz Jul 27, 2024
8be913e
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Jul 27, 2024
23899b8
Use compaction simulator on both coordinator and overlord
kfaraz Jul 30, 2024
9c06504
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Jul 30, 2024
f972ea2
Handle master changes
kfaraz Jul 30, 2024
fdcfdf1
Add CompactionSupervisor
kfaraz Aug 8, 2024
4b81779
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 8, 2024
4942a08
Add test for OverlordCompactionScheduler
kfaraz Aug 9, 2024
b2ae560
Add more tests
kfaraz Aug 9, 2024
ad98383
Fix flow
kfaraz Aug 10, 2024
2e676be
Remove extra sout
kfaraz Aug 10, 2024
e793f44
Fix CompactSegmentsTest
kfaraz Aug 10, 2024
8ece3f0
Do not call taskStatus API with empty taskIds
kfaraz Aug 10, 2024
2f152e2
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 23, 2024
382bfdf
Add javadocs, cleanup, etc.
kfaraz Aug 23, 2024
e111d71
Fix checkstyle, add tests
kfaraz Aug 23, 2024
93284b3
Handle concurrency, validation, cleanup stale entries
kfaraz Aug 28, 2024
273822b
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 28, 2024
117b0ab
Fix test, avoid metadata store calls
kfaraz Aug 28, 2024
0a584d4
Redirect compaction progress APIs to Overlord
kfaraz Aug 30, 2024
bfc9ceb
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 30, 2024
ab711ed
Bind CompactionSupervisorConfig in SupervisorModule
kfaraz Aug 30, 2024
e3012f3
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 30, 2024
bca9810
Minor cleanup and tests
kfaraz Aug 31, 2024
6d84927
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 31, 2024
b439935
Remove unused methods, cleanup tests
kfaraz Sep 1, 2024
517f9ad
Rename SegmentsToCompact to CompactionCandidate
kfaraz Sep 1, 2024
eceb767
Fix tests
kfaraz Sep 1, 2024
9d04d9b
Fix checkstyle
kfaraz Sep 1, 2024
3d2e58f
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Sep 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.server.compaction.CompactionSegmentIterator;
import org.apache.druid.server.compaction.CompactionSegmentSearchPolicy;
import org.apache.druid.server.compaction.CompactionStatusTracker;
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
Expand Down Expand Up @@ -61,7 +62,7 @@ public class NewestSegmentFirstPolicyBenchmark
{
private static final String DATA_SOURCE_PREFIX = "dataSource_";

private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(new DefaultObjectMapper());
private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(null);

@Param("100")
private int numDataSources;
Expand Down Expand Up @@ -132,7 +133,12 @@ public void setup()
@Benchmark
public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
{
final CompactionSegmentIterator iterator = policy.createIterator(compactionConfigs, dataSources, Collections.emptyMap());
final CompactionSegmentIterator iterator = policy.createIterator(
compactionConfigs,
dataSources,
Collections.emptyMap(),
new CompactionStatusTracker(new DefaultObjectMapper())
);
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
blackhole.consume(iterator.next());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package org.apache.druid.guice;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.initialization.DruidModule;

import java.util.List;

public class SupervisorModule implements DruidModule
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.supervisor", SupervisorStateManagerConfig.class);
}

@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule(getClass().getSimpleName())
.registerSubtypes(
new NamedType(CompactionSupervisorSpec.class, CompactionSupervisorSpec.TYPE)
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,12 @@
* under the License.
*/

package org.apache.druid.indexing.compact;

package org.apache.druid.indexing.overlord.supervisor;

import com.google.inject.Binder;
import com.google.inject.Module;
import org.apache.druid.guice.JsonConfigProvider;

public class SupervisorModule implements Module
/**
* This can contain stats and progress and stuff.
kfaraz marked this conversation as resolved.
Show resolved Hide resolved
* For simulation, we will still have the separate API.
*/
public class CompactionStatusReport
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.supervisor", SupervisorStateManagerConfig.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.compact;

import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.compaction.CompactionScheduler;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;

/**
* Supervisor for compaction of a single datasource.
*/
public class CompactionSupervisor implements Supervisor
{
private static final Logger log = new Logger(CompactionSupervisor.class);

private final CompactionScheduler scheduler;
private final CompactionSupervisorSpec supervisorSpec;

public CompactionSupervisor(
CompactionSupervisorSpec supervisorSpec,
CompactionScheduler scheduler
)
{
this.supervisorSpec = supervisorSpec;
this.scheduler = scheduler;
}

@Override
public void start()
{
final String dataSource = getDataSource();
kfaraz marked this conversation as resolved.
Show resolved Hide resolved
if (supervisorSpec.isSuspended()) {
log.info("Suspending compaction for dataSource[%s].", dataSource);
scheduler.stopCompaction(dataSource);
Comment on lines +55 to +57
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is this case encountered, i.e. the supervisor for the datasource is starting but the compaction for the same datasource needs to be stopped?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because of the way the supervisor framework works. This code is triggered when start() is called for a suspended supervisor.

} else {
log.info("Starting compaction for dataSource[%s].", dataSource);
scheduler.startCompaction(dataSource, supervisorSpec.getSpec());
}
}

@Override
public void stop(boolean stopGracefully)
{
final String dataSource = getDataSource();
log.info("Stopping compaction for dataSource[%s].", dataSource);
scheduler.stopCompaction(dataSource);
}

@Override
public SupervisorReport<AutoCompactionSnapshot> getStatus()
{
final AutoCompactionSnapshot snapshot;
if (supervisorSpec.isSuspended()) {
snapshot = AutoCompactionSnapshot.builder(getDataSource())
.withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
.build();
} else {
snapshot = scheduler.getCompactionSnapshot(getDataSource());
}

return new SupervisorReport<>(supervisorSpec.getId(), DateTimes.nowUtc(), snapshot);
}

@Override
public SupervisorStateManager.State getState()
{
if (!scheduler.isRunning()) {
return State.SCHEDULER_DISABLED;
} else if (supervisorSpec.isSuspended()) {
return State.SUSPENDED;
} else {
return State.RUNNING;
}
}

private String getDataSource()
{
return supervisorSpec.getSpec().getDataSource();
}

// Un-implemented methods used only by streaming supervisors

@Override
public void reset(DataSourceMetadata dataSourceMetadata)
{
// Do nothing
}

@Override
public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
{
// Do nothing
}

@Override
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{
// Do nothing
}

@Override
public LagStats computeLagStats()
{
return new LagStats(0L, 0L, 0L);
}

@Override
public int getActiveTaskGroupsCount()
{
return 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment as to why it is 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Task groups" are a concept relevant only to streaming supervisors.

As you suggested, I can throw an unsupported exception here.

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth moving to a BatchJobSupervisor abstract class. Also, shouldn't these thrown an exception so we know these are not used at all since they are not supposed to be used at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and MaterializedViewSupervisor should also extend BatchJobSupervisor?


public enum State implements SupervisorStateManager.State
Fixed Show fixed Hide fixed

Check notice

Code scanning / CodeQL

Class has same name as super class Note

State has the same name as its supertype
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager$State
.
{
SCHEDULER_DISABLED(true),
RUNNING(true),
SUSPENDED(true),
UNHEALTHY(false);

private final boolean healthy;

State(boolean healthy)
{
this.healthy = healthy;
}

@Override
public boolean isFirstRunOnly()
{
return false;
}

@Override
public boolean isHealthy()
{
return healthy;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.compact;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.server.compaction.CompactionScheduler;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;

public class CompactionSupervisorSpec implements SupervisorSpec
{
public static final String TYPE = "autocompact";
public static final String ID_PREFIX = "autocompact__";

private final boolean suspended;
private final DataSourceCompactionConfig spec;
private final CompactionScheduler scheduler;

@JsonCreator
public CompactionSupervisorSpec(
@JsonProperty("spec") DataSourceCompactionConfig spec,
@JsonProperty("suspended") @Nullable Boolean suspended,
@JacksonInject CompactionScheduler scheduler
)
{
if (spec == null) {
throw InvalidInput.exception("'spec' must be specified for a compaction supervisor.");
}

this.spec = spec;
this.suspended = Configs.valueOrDefault(suspended, false);
this.scheduler = scheduler;
}

@JsonProperty
public DataSourceCompactionConfig getSpec()
{
return spec;
}

@Override
@JsonProperty
public boolean isSuspended()
{
return suspended;
}

@Override
public String getId()
{
return ID_PREFIX + spec.getDataSource();
}

@Override
public Supervisor createSupervisor()
{
return new CompactionSupervisor(this, scheduler);
}

@Override
public List<String> getDataSources()
{
return Collections.singletonList(spec.getDataSource());
}

@Override
public SupervisorSpec createSuspendedSpec()
{
return new CompactionSupervisorSpec(spec, true, scheduler);
}

@Override
public SupervisorSpec createRunningSpec()
{
return new CompactionSupervisorSpec(spec, false, scheduler);
}

@Override
public String getType()
{
return TYPE;
}

@Override
public String getSource()
{
return "";
}
}
Loading
Loading