Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup InstanceOperations API for active scans/compactions #5102

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;

import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.security.Authorizations;
Expand Down Expand Up @@ -96,4 +97,11 @@ public abstract class ActiveScan {
* @since 1.5.0
*/
public abstract long getIdleTime();

/**
* Return the host where the compaction is running.
*
* @since 4.0.0
*/
public abstract ServerId getHost();
cshannon marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,22 +259,22 @@ Set<ServerId> getServers(ServerId.Type type, Predicate<String> resourceGroupPred
* @param tserver The tablet server address. This should be of the form
* {@code <ip address>:<port>}
* @return A list of active scans on tablet server.
* @deprecated see {@link #getActiveScans(ServerId)}
* @deprecated see {@link #getActiveScans(Collection)}
*/
@Deprecated(since = "4.0.0")
List<ActiveScan> getActiveScans(String tserver)
throws AccumuloException, AccumuloSecurityException;

/**
* List the active scans on a server.
* List the active scans on a collection of servers.
*
* @param server server type and address
* @return A stream of active scans on server.
* @param servers Collection of server types and addresses
* @return A stream of active scans on the given servers.
* @throws IllegalArgumentException when the type of the server is not TABLET_SERVER or
* SCAN_SERVER
* @since 4.0.0
*/
List<ActiveScan> getActiveScans(ServerId server)
List<ActiveScan> getActiveScans(Collection<ServerId> servers)
throws AccumuloException, AccumuloSecurityException;

/**
Expand All @@ -286,32 +286,20 @@ List<ActiveScan> getActiveScans(ServerId server)
* @param tserver The server address. This should be of the form {@code <ip address>:<port>}
* @return the list of active compactions
* @since 1.5.0
* @deprecated see {@link #getActiveCompactions(ServerId server)}
* @deprecated see {@link #getActiveCompactions(Collection)}
*/
@Deprecated(since = "4.0.0")
List<ActiveCompaction> getActiveCompactions(String tserver)
throws AccumuloException, AccumuloSecurityException;

/**
* List the active compaction running on a TabletServer or Compactor. The server address can be
* retrieved using {@link #getCompactors()} or {@link #getTabletServers()}. Use
* {@link #getActiveCompactions()} to get a list of all compactions running on tservers and
* compactors.
*
* @param server The ServerId object
* @return the list of active compactions
* @throws IllegalArgumentException when the type of the server is not TABLET_SERVER or COMPACTOR
* @since 4.0.0
*/
List<ActiveCompaction> getActiveCompactions(ServerId server)
throws AccumuloException, AccumuloSecurityException;

/**
* List all internal and external compactions running in Accumulo.
*
* @return the list of active compactions
* @since 2.1.0
* @deprecated see {@link #getActiveCompactions(Collection)}
*/
@Deprecated(since = "4.0.0")
List<ActiveCompaction> getActiveCompactions() throws AccumuloException, AccumuloSecurityException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.ActiveScan;
import org.apache.accumulo.core.client.admin.ScanState;
import org.apache.accumulo.core.client.admin.ScanType;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.TabletId;
Expand All @@ -44,20 +46,21 @@ public class ActiveScanImpl extends ActiveScan {

private final long scanId;
private final String client;
private String tableName;
private final String tableName;
private final long age;
private final long idle;
private ScanType type;
private ScanState state;
private KeyExtent extent;
private List<Column> columns;
private List<String> ssiList;
private Map<String,Map<String,String>> ssio;
private final ScanType type;
keith-turner marked this conversation as resolved.
Show resolved Hide resolved
private final ScanState state;
private final KeyExtent extent;
private final List<Column> columns;
private final List<String> ssiList;
private final Map<String,Map<String,String>> ssio;
private final String user;
private Authorizations authorizations;
private final Authorizations authorizations;
private final ServerId server;

ActiveScanImpl(ClientContext context,
org.apache.accumulo.core.tabletscan.thrift.ActiveScan activeScan)
org.apache.accumulo.core.tabletscan.thrift.ActiveScan activeScan, ServerId server)
throws TableNotFoundException {
this.scanId = activeScan.scanId;
this.client = activeScan.client;
Expand All @@ -81,6 +84,7 @@ public class ActiveScanImpl extends ActiveScan {
this.ssiList.add(ii.iterName + "=" + ii.priority + "," + ii.className);
}
this.ssio = activeScan.ssio;
this.server = Objects.requireNonNull(server);
}

@Override
Expand Down Expand Up @@ -152,4 +156,9 @@ public Authorizations getAuthorizations() {
public long getIdleTime() {
return idle;
}

@Override
public ServerId getHost() {
return server;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.accumulo.core.rpc.ThriftUtil.getClient;
import static org.apache.accumulo.core.rpc.ThriftUtil.returnClient;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS_COMPACTIONS_FINDER_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS_SCANS_FINDER_POOL;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -38,6 +39,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
Expand Down Expand Up @@ -72,12 +74,14 @@
import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.core.util.threads.ThreadPoolNames;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.MoreExecutors;

/**
* Provides a class for administering the accumulo instance
Expand Down Expand Up @@ -265,33 +269,17 @@ public List<String> getTabletServers() {
@Deprecated(since = "4.0.0")
public List<ActiveScan> getActiveScans(String tserver)
throws AccumuloException, AccumuloSecurityException {
final var parsedTserver = HostAndPort.fromString(tserver);
TabletScanClientService.Client client = null;
try {
client = getClient(ThriftClientTypes.TABLET_SCAN, parsedTserver, context);

List<ActiveScan> as = new ArrayList<>();
for (var activeScan : client.getActiveScans(TraceUtil.traceInfo(), context.rpcCreds())) {
try {
as.add(new ActiveScanImpl(context, activeScan));
} catch (TableNotFoundException e) {
throw new AccumuloException(e);
}
}
return as;
} catch (ThriftSecurityException e) {
throw new AccumuloSecurityException(e.user, e.code, e);
} catch (TException e) {
throw new AccumuloException(e);
} finally {
if (client != null) {
returnClient(client, context);
}
}
ServerId si = getServerId(tserver);
cshannon marked this conversation as resolved.
Show resolved Hide resolved
return si == null ? List.of() : getActiveScans(si);
}

@Override
public List<ActiveScan> getActiveScans(ServerId server)
public List<ActiveScan> getActiveScans(Collection<ServerId> servers)
throws AccumuloException, AccumuloSecurityException {
return queryServers(servers, this::getActiveScans, INSTANCE_OPS_SCANS_FINDER_POOL);
}

private List<ActiveScan> getActiveScans(ServerId server)
throws AccumuloException, AccumuloSecurityException {

Objects.requireNonNull(server);
Expand All @@ -309,7 +297,7 @@ public List<ActiveScan> getActiveScans(ServerId server)
List<ActiveScan> as = new ArrayList<>();
for (var activeScan : rpcClient.getActiveScans(TraceUtil.traceInfo(), context.rpcCreds())) {
try {
as.add(new ActiveScanImpl(context, activeScan));
as.add(new ActiveScanImpl(context, activeScan, server));
} catch (TableNotFoundException e) {
throw new AccumuloException(e);
}
Expand Down Expand Up @@ -337,21 +325,11 @@ public boolean testClassLoad(final String className, final String asTypeName)
@Deprecated
public List<ActiveCompaction> getActiveCompactions(String server)
throws AccumuloException, AccumuloSecurityException {

HostAndPort hp = HostAndPort.fromString(server);

ServerId si = getServer(ServerId.Type.COMPACTOR, null, hp.getHost(), hp.getPort());
if (si == null) {
si = getServer(ServerId.Type.TABLET_SERVER, null, hp.getHost(), hp.getPort());
}
if (si == null) {
return List.of();
}
return getActiveCompactions(si);
ServerId si = getServerId(server);
return si == null ? List.of() : getActiveCompactions(si);
}

@Override
public List<ActiveCompaction> getActiveCompactions(ServerId server)
private List<ActiveCompaction> getActiveCompactions(ServerId server)
throws AccumuloException, AccumuloSecurityException {

Objects.requireNonNull(server);
Expand Down Expand Up @@ -391,6 +369,7 @@ public List<ActiveCompaction> getActiveCompactions(ServerId server)
}

@Override
@Deprecated
public List<ActiveCompaction> getActiveCompactions()
throws AccumuloException, AccumuloSecurityException {

Expand All @@ -404,19 +383,34 @@ public List<ActiveCompaction> getActiveCompactions()
@Override
public List<ActiveCompaction> getActiveCompactions(Collection<ServerId> compactionServers)
throws AccumuloException, AccumuloSecurityException {
return queryServers(compactionServers, this::getActiveCompactions,
INSTANCE_OPS_COMPACTIONS_FINDER_POOL);
}

private <T> List<T> queryServers(Collection<ServerId> servers, ServerQuery<List<T>> serverQuery,
ThreadPoolNames pool) throws AccumuloException, AccumuloSecurityException {

final ExecutorService executorService;
// If size 0 or 1 there's no need to create a thread pool
if (servers.isEmpty()) {
return List.of();
} else if (servers.size() == 1) {
executorService = MoreExecutors.newDirectExecutorService();
} else {
int numThreads = Math.max(4, Math.min((servers.size()) / 10, 256));
executorService =
context.threadPools().getPoolBuilder(pool).numCoreThreads(numThreads).build();
}

int numThreads = Math.max(4, Math.min((compactionServers.size()) / 10, 256));
var executorService = context.threadPools().getPoolBuilder(INSTANCE_OPS_COMPACTIONS_FINDER_POOL)
.numCoreThreads(numThreads).build();
try {
List<Future<List<ActiveCompaction>>> futures = new ArrayList<>();
List<Future<List<T>>> futures = new ArrayList<>();

for (ServerId server : compactionServers) {
futures.add(executorService.submit(() -> getActiveCompactions(server)));
for (ServerId server : servers) {
futures.add(executorService.submit(() -> serverQuery.execute(server)));
}

List<ActiveCompaction> ret = new ArrayList<>();
for (Future<List<ActiveCompaction>> future : futures) {
List<T> ret = new ArrayList<>();
for (Future<List<T>> future : futures) {
try {
ret.addAll(future.get());
} catch (InterruptedException | ExecutionException e) {
Expand Down Expand Up @@ -635,4 +629,18 @@ private ServerId createServerId(ServerId.Type type, ServiceLockPath slp) {
return new ServerId(type, resourceGroup, host, port);
}

private ServerId getServerId(String server) {
HostAndPort hp = HostAndPort.fromString(server);

ServerId si = getServer(ServerId.Type.COMPACTOR, null, hp.getHost(), hp.getPort());
if (si == null) {
si = getServer(ServerId.Type.TABLET_SERVER, null, hp.getHost(), hp.getPort());
}

return si;
}

interface ServerQuery<T> {
T execute(ServerId server) throws AccumuloException, AccumuloSecurityException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public enum ThreadPoolNames {
SERVICE_LOCK_POOL("accumulo.pool.service.lock"),
IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"),
INSTANCE_OPS_COMPACTIONS_FINDER_POOL("accumulo.pool.instance.ops.active.compactions.finder"),
INSTANCE_OPS_SCANS_FINDER_POOL("accumulo.pool.instance.ops.active.scans.finder"),
MANAGER_FATE_POOL("accumulo.pool.manager.fate"),
MANAGER_STATUS_POOL("accumulo.pool.manager.status"),
MANAGER_UPGRADE_COORDINATOR_METADATA_POOL("accumulo.pool.manager.upgrade.metadata"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ public static Stream<String> activeCompactionsForServer(String tserver,
return Stream.of();
} else {
try {
return instanceOps.getActiveCompactions(server).stream().sorted(COMPACTION_AGE_DESCENDING)
return instanceOps.getActiveCompactions(List.of(server)).stream()
.sorted(COMPACTION_AGE_DESCENDING)
.map(ActiveCompactionHelper::formatActiveCompactionLine);
} catch (Exception e) {
LOG.debug("Failed to list active compactions for server {}", tserver, e);
Expand Down Expand Up @@ -160,7 +161,10 @@ public static Stream<String> activeCompactions(InstanceOperations instanceOps,

public static Stream<String> activeCompactions(InstanceOperations instanceOps) {
try {
return sortActiveCompactions(instanceOps.getActiveCompactions());
Set<ServerId> compactionServers = new HashSet<>();
compactionServers.addAll(instanceOps.getServers(ServerId.Type.COMPACTOR));
compactionServers.addAll(instanceOps.getServers(ServerId.Type.TABLET_SERVER));
return sortActiveCompactions(instanceOps.getActiveCompactions(compactionServers));
} catch (AccumuloException | AccumuloSecurityException e) {
return Stream.of("ERROR " + e.getMessage());
}
Expand Down
Loading