Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a custom executor service if config is set #73

Open
wants to merge 3 commits into
base: branch-3.4.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import com.amazonaws.services.glue.model.UserDefinedFunctionInput;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
Expand All @@ -73,7 +72,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -110,29 +108,23 @@ public class DefaultAWSGlueMetastore implements AWSGlueMetastore {
*/
public static final String SKIP_AWS_GLUE_ARCHIVE = "skipAWSGlueArchive";

private static final int NUM_EXECUTOR_THREADS = 5;
static final String GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT = "glue-metastore-delegate-%d";
private static final ExecutorService GLUE_METASTORE_DELEGATE_THREAD_POOL = Executors.newFixedThreadPool(
NUM_EXECUTOR_THREADS,
new ThreadFactoryBuilder()
.setNameFormat(GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT)
.setDaemon(true).build()
);

private final Configuration conf;
private final AWSGlue glueClient;
private final String catalogId;
private final ExecutorService executorService;
private final int numPartitionSegments;
private static ExecutorService sharedExecutorService;

protected ExecutorService getExecutorService(Configuration conf) {
Class<? extends ExecutorServiceFactory> executorFactoryClass = conf
protected synchronized ExecutorService getExecutorService() {
if (sharedExecutorService == null) {
Class<? extends ExecutorServiceFactory> executorFactoryClass = this.conf
.getClass(CUSTOM_EXECUTOR_FACTORY_CONF,
DefaultExecutorServiceFactory.class).asSubclass(
ExecutorServiceFactory.class);
ExecutorServiceFactory factory = ReflectionUtils.newInstance(
DefaultExecutorServiceFactory.class).asSubclass(
ExecutorServiceFactory.class);
ExecutorServiceFactory factory = ReflectionUtils.newInstance(
executorFactoryClass, conf);
return factory.getExecutorService(conf);
sharedExecutorService = factory.getExecutorService(conf);
}
return sharedExecutorService;
}

public DefaultAWSGlueMetastore(Configuration conf, AWSGlue glueClient) {
Expand All @@ -144,7 +136,6 @@ public DefaultAWSGlueMetastore(Configuration conf, AWSGlue glueClient) {
this.conf = conf;
this.glueClient = glueClient;
this.catalogId = MetastoreClientUtils.getCatalogId(conf);
this.executorService = getExecutorService(conf);
}

// ======================= Database =======================
Expand Down Expand Up @@ -275,7 +266,7 @@ public List<Partition> getPartitionsByNames(String dbName, String tableName,
.withTableName(tableName)
.withPartitionsToGet(batch)
.withCatalogId(catalogId);
batchGetPartitionFutures.add(this.executorService.submit(new Callable<BatchGetPartitionResult>() {
batchGetPartitionFutures.add(getExecutorService().submit(new Callable<BatchGetPartitionResult>() {
@Override
public BatchGetPartitionResult call() throws Exception {
return glueClient.batchGetPartition(request);
Expand Down Expand Up @@ -327,7 +318,7 @@ private List<Partition> getPartitionsParallel(
// We could convert this into a parallelStream after upgrading to JDK 8 compiler base.
List<Future<List<Partition>>> futures = Lists.newArrayList();
for (final Segment segment : segments) {
futures.add(this.executorService.submit(new Callable<List<Partition>>() {
futures.add(getExecutorService().submit(new Callable<List<Partition>>() {
@Override
public List<Partition> call() throws Exception {
return getCatalogPartitions(databaseName, tableName, expression, max, segment);
Expand Down Expand Up @@ -508,7 +499,7 @@ public Map<String, List<ColumnStatistics>> getPartitionColumnStatistics(String d
.withTableName(tableName)
.withPartitionValues(partValues)
.withColumnNames(cols);
pagedResult.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(new Callable<GetColumnStatisticsForPartitionResult>() {
pagedResult.add(getExecutorService().submit(new Callable<GetColumnStatisticsForPartitionResult>() {
@Override
public GetColumnStatisticsForPartitionResult call() throws Exception {
return glueClient.getColumnStatisticsForPartition(request);
Expand Down Expand Up @@ -543,7 +534,7 @@ public List<ColumnStatistics> getTableColumnStatistics(String dbName, String tab
.withDatabaseName(dbName)
.withTableName(tableName)
.withColumnNames(cols);
pagedResult.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(new Callable<GetColumnStatisticsForTableResult>() {
pagedResult.add(getExecutorService().submit(new Callable<GetColumnStatisticsForTableResult>() {
@Override
public GetColumnStatisticsForTableResult call() throws Exception {
return glueClient.getColumnStatisticsForTable(request);
Expand Down Expand Up @@ -581,7 +572,7 @@ public List<ColumnStatisticsError> updatePartitionColumnStatistics(
.withTableName(tableName)
.withPartitionValues(partitionValues)
.withColumnStatisticsList(statList);
pagedResult.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(new Callable<UpdateColumnStatisticsForPartitionResult>() {
pagedResult.add(getExecutorService().submit(new Callable<UpdateColumnStatisticsForPartitionResult>() {
@Override
public UpdateColumnStatisticsForPartitionResult call() throws Exception {
return glueClient.updateColumnStatisticsForPartition(request);
Expand Down Expand Up @@ -617,7 +608,7 @@ public List<ColumnStatisticsError> updateTableColumnStatistics(
.withDatabaseName(dbName)
.withTableName(tableName)
.withColumnStatisticsList(statList);
pagedResult.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(new Callable<UpdateColumnStatisticsForTableResult>() {
pagedResult.add(getExecutorService().submit(new Callable<UpdateColumnStatisticsForTableResult>() {
@Override
public UpdateColumnStatisticsForTableResult call() throws Exception {
return glueClient.updateColumnStatisticsForTable(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.amazonaws.services.glue.model.UserDefinedFunctionInput;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -79,6 +78,7 @@
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;

Expand All @@ -93,7 +93,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -125,25 +124,32 @@ public class GlueMetastoreClientDelegate {
public static final String MATCH_ALL = ".*";
private static final int BATCH_CREATE_PARTITIONS_MAX_REQUEST_SIZE = 100;

private static final int NUM_EXECUTOR_THREADS = 5;
static final String GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT = "glue-metastore-delegate-%d";
private static final ExecutorService GLUE_METASTORE_DELEGATE_THREAD_POOL = Executors.newFixedThreadPool(
NUM_EXECUTOR_THREADS,
new ThreadFactoryBuilder()
.setNameFormat(GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT)
.setDaemon(true).build()
);
public static final String CUSTOM_EXECUTOR_FACTORY_CONF = "hive.metastore.executorservice.factory.class";

private final AWSGlueMetastore glueMetastore;
private final Configuration conf;
private final Warehouse wh;
private final AwsGlueHiveShims hiveShims = ShimsLoader.getHiveShims();
private final CatalogToHiveConverter catalogToHiveConverter;
private final String catalogId;
private static ExecutorService sharedExecutorService;

public static final String CATALOG_ID_CONF = "hive.metastore.glue.catalogid";
public static final String NUM_PARTITION_SEGMENTS_CONF = "aws.glue.partition.num.segments";

/**
 * Lazily resolves the executor used for parallel metastore operations
 * (batch partition creation, parallel column alteration). The factory class
 * is read from CUSTOM_EXECUTOR_FACTORY_CONF, defaulting to
 * DefaultExecutorServiceFactory.
 *
 * @return the JVM-wide shared ExecutorService, created on first call
 */
protected ExecutorService getExecutorService() {
    // Guard with the class monitor: sharedExecutorService is static, so an
    // instance-level "synchronized" modifier would let two different
    // GlueMetastoreClientDelegate instances race and create two pools.
    synchronized (GlueMetastoreClientDelegate.class) {
        if (sharedExecutorService == null) {
            Class<? extends ExecutorServiceFactory> executorFactoryClass = this.conf
                .getClass(CUSTOM_EXECUTOR_FACTORY_CONF,
                    DefaultExecutorServiceFactory.class).asSubclass(
                        ExecutorServiceFactory.class);
            ExecutorServiceFactory factory = ReflectionUtils.newInstance(
                executorFactoryClass, conf);
            // NOTE(review): the first delegate's Configuration wins for the
            // whole JVM; later delegates with a different factory setting
            // silently reuse this pool — confirm that is intended.
            sharedExecutorService = factory.getExecutorService(conf);
        }
        return sharedExecutorService;
    }
}

public GlueMetastoreClientDelegate(Configuration conf, AWSGlueMetastore glueMetastore,
Warehouse wh) throws MetaException {
checkNotNull(conf, "Hive Config cannot be null");
Expand Down Expand Up @@ -665,7 +671,7 @@ private List<Partition> batchCreatePartitions(
int j = Math.min(i + BATCH_CREATE_PARTITIONS_MAX_REQUEST_SIZE, catalogPartitions.size());
final List<Partition> partitionsOnePage = catalogPartitions.subList(i, j);

batchCreatePartitionsFutures.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(new Callable<BatchCreatePartitionsHelper>() {
batchCreatePartitionsFutures.add(getExecutorService().submit(new Callable<BatchCreatePartitionsHelper>() {
@Override
public BatchCreatePartitionsHelper call() throws Exception {
return new BatchCreatePartitionsHelper(glueMetastore, dbName, tableName, catalogId, partitionsOnePage, ifNotExists)
Expand Down Expand Up @@ -865,7 +871,7 @@ private void alterPartitionsColumnsParallel(
List<Column> newCols) throws TException {
List<Pair<Partition, Future>> partitionFuturePairs = Collections.synchronizedList(Lists.newArrayList());
partitions.parallelStream().forEach(partition -> partitionFuturePairs.add(Pair.of(partition,
(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(
(getExecutorService().submit(
() -> alterPartitionColumns(databaseName, tableName, partition, newCols))))));

List<List<String>> failedPartitionValues = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.amazonaws.glue.catalog.converters.GlueInputConverter;
import com.amazonaws.glue.catalog.converters.HiveToCatalogConverter;
import com.amazonaws.glue.catalog.util.TestObjects;
import com.amazonaws.glue.catalog.util.TestExecutorServiceFactory;
import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.model.AlreadyExistsException;
import com.amazonaws.services.glue.model.BatchCreatePartitionRequest;
Expand Down Expand Up @@ -175,6 +176,23 @@ private void setupMockWarehouseForPath(Path path, boolean isDir, boolean mkDir)
when(wh.mkdirs(path)).thenReturn(mkDir);
}

// ===================== Thread Executor =====================

@Test
public void testExecutorService() throws Exception {
    // With no custom factory configured, both the delegate and the metastore
    // must resolve to the executor supplied by DefaultExecutorServiceFactory.
    Object defaultExecutorService = new DefaultExecutorServiceFactory().getExecutorService(conf);
    // assertEquals takes (message, expected, actual): expected value first.
    assertEquals("Default executor service should be used", defaultExecutorService, metastoreClientDelegate.getExecutorService());
    assertEquals("Default executor service should be used", defaultExecutorService, (new DefaultAWSGlueMetastore(conf, glueClient)).getExecutorService());

    HiveConf customConf = new HiveConf();
    customConf.set(GlueMetastoreClientDelegate.CATALOG_ID_CONF, CATALOG_ID);
    customConf.setClass(GlueMetastoreClientDelegate.CUSTOM_EXECUTOR_FACTORY_CONF, TestExecutorServiceFactory.class, ExecutorServiceFactory.class);
    GlueMetastoreClientDelegate customDelegate = new GlueMetastoreClientDelegate(customConf, mock(AWSGlue.class), mock(Warehouse.class));
    Object customExecutorService = new TestExecutorServiceFactory().getExecutorService(customConf);

    // NOTE(review): getExecutorService caches its result in a static field,
    // so these assertions are order-dependent — if an earlier test in this
    // JVM already initialized the pool with the default configuration, the
    // custom factory will never be consulted. Confirm test isolation.
    assertEquals("Custom executor service should be used", customExecutorService, customDelegate.getExecutorService());
    assertEquals("Custom executor service should be used", customExecutorService, (new DefaultAWSGlueMetastore(customConf, glueClient)).getExecutorService());
}

// ===================== Database =====================

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.amazonaws.glue.catalog.util;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

/**
 * Minimal {@link ScheduledThreadPoolExecutor} subclass used by the tests to
 * verify that a custom executor configured through the executor-service
 * factory setting is actually picked up. It adds no behavior of its own —
 * the distinct runtime type / instance identity is all the tests check for.
 */
public class TestExecutorService extends ScheduledThreadPoolExecutor {

    // Delegates straight to the superclass constructor.
    public TestExecutorService(int corePoolSize, ThreadFactory factory) {
        super(corePoolSize, factory);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.amazonaws.glue.catalog.util;

import com.amazonaws.glue.catalog.metastore.ExecutorServiceFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hive.conf.HiveConf;

import java.util.concurrent.ExecutorService;

/**
 * Test-only {@link ExecutorServiceFactory} that always hands out a single
 * shared {@link TestExecutorService}, letting tests assert (by identity)
 * that the custom factory configured via the executor-service factory
 * setting is actually used.
 */
public class TestExecutorServiceFactory implements ExecutorServiceFactory {
    // final: the singleton must never be reassigned, otherwise the identity
    // assertions in the tests become order-dependent and unreliable.
    private static final ExecutorService EXEC_SERVICE =
        new TestExecutorService(1, new ThreadFactoryBuilder().build());

    @Override
    public ExecutorService getExecutorService(HiveConf conf) {
        return EXEC_SERVICE;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -124,6 +123,7 @@
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.ObjectPair;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;

Expand All @@ -136,7 +136,6 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.regex.Pattern;

Expand All @@ -158,17 +157,27 @@ public class AWSCatalogMetastoreClient implements IMetaStoreClient {
private final CatalogToHiveConverter catalogToHiveConverter;

private static final int BATCH_DELETE_PARTITIONS_PAGE_SIZE = 25;
private static final int BATCH_DELETE_PARTITIONS_THREADS_COUNT = 5;

public static final String CUSTOM_EXECUTOR_FACTORY_CONF = "hive.metastore.executorservice.factory.class";

static final String BATCH_DELETE_PARTITIONS_THREAD_POOL_NAME_FORMAT = "batch-delete-partitions-%d";
private static final ExecutorService BATCH_DELETE_PARTITIONS_THREAD_POOL = Executors.newFixedThreadPool(
BATCH_DELETE_PARTITIONS_THREADS_COUNT,
new ThreadFactoryBuilder()
.setNameFormat(BATCH_DELETE_PARTITIONS_THREAD_POOL_NAME_FORMAT)
.setDaemon(true).build()
);

private Map<String, String> currentMetaVars;
private final AwsGlueHiveShims hiveShims = ShimsLoader.getHiveShims();
private static ExecutorService sharedExecutorService;

/**
 * Lazily resolves the executor used for parallel batch partition deletes.
 * The factory class is read from CUSTOM_EXECUTOR_FACTORY_CONF, defaulting
 * to DefaultExecutorServiceFactory.
 *
 * @return the JVM-wide shared ExecutorService, created on first call
 */
protected ExecutorService getExecutorService() {
    // Guard with the class monitor: sharedExecutorService is static, so an
    // instance-level "synchronized" modifier would let two different
    // AWSCatalogMetastoreClient instances race and create two pools.
    synchronized (AWSCatalogMetastoreClient.class) {
        if (sharedExecutorService == null) {
            Class<? extends ExecutorServiceFactory> executorFactoryClass = this.conf
                .getClass(CUSTOM_EXECUTOR_FACTORY_CONF,
                    DefaultExecutorServiceFactory.class).asSubclass(
                        ExecutorServiceFactory.class);
            ExecutorServiceFactory factory = ReflectionUtils.newInstance(
                executorFactoryClass, conf);
            // NOTE(review): the first client's Configuration wins for the
            // whole JVM; later clients with a different factory setting
            // silently reuse this pool — confirm that is intended.
            sharedExecutorService = factory.getExecutorService(conf);
        }
        return sharedExecutorService;
    }
}

public AWSCatalogMetastoreClient(Configuration conf, HiveMetaHookLoader hook) throws MetaException {
this.conf = conf;
Expand Down Expand Up @@ -921,7 +930,7 @@ private List<org.apache.hadoop.hive.metastore.api.Partition> batchDeletePartitio
int j = Math.min(i + BATCH_DELETE_PARTITIONS_PAGE_SIZE, numOfPartitionsToDelete);
final List<Partition> partitionsOnePage = partitionsToDelete.subList(i, j);

batchDeletePartitionsFutures.add(BATCH_DELETE_PARTITIONS_THREAD_POOL.submit(new Callable<BatchDeletePartitionsHelper>() {
batchDeletePartitionsFutures.add(getExecutorService().submit(new Callable<BatchDeletePartitionsHelper>() {
@Override
public BatchDeletePartitionsHelper call() throws Exception {
return new BatchDeletePartitionsHelper(glueClient, dbName, tableName, catalogId, partitionsOnePage).deletePartitions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.amazonaws.glue.catalog.converters.HiveToCatalogConverter;
import com.amazonaws.glue.catalog.util.ExprBuilder;
import com.amazonaws.glue.catalog.util.ExpressionHelper;
import com.amazonaws.glue.catalog.util.TestExecutorServiceFactory;
import com.amazonaws.glue.catalog.util.TestObjects;
import com.amazonaws.glue.shims.AwsGlueHiveShims;
import com.amazonaws.glue.shims.ShimsLoader;
Expand Down Expand Up @@ -56,6 +57,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
Expand Down Expand Up @@ -1327,4 +1329,19 @@ private void assertDaemonThreadPools() {
}
}

// ===================== Thread Executor =====================

@Test
public void testExecutorService() throws Exception {
    // With no custom factory configured, the client must resolve to the
    // executor supplied by DefaultExecutorServiceFactory.
    Object defaultExecutorService = new DefaultExecutorServiceFactory().getExecutorService(conf);
    // assertEquals takes (message, expected, actual): expected value first.
    assertEquals("Default executor service should be used", defaultExecutorService, metastoreClient.getExecutorService());

    HiveConf customConf = new HiveConf();
    customConf.setClass(GlueMetastoreClientDelegate.CUSTOM_EXECUTOR_FACTORY_CONF, TestExecutorServiceFactory.class, ExecutorServiceFactory.class);
    AWSCatalogMetastoreClient customClient = new AWSCatalogMetastoreClient.Builder().withClientFactory(clientFactory)
        .withMetastoreFactory(metastoreFactory).withWarehouse(wh).createDefaults(false).withConf(customConf).build();
    Object customExecutorService = new TestExecutorServiceFactory().getExecutorService(customConf);

    // NOTE(review): getExecutorService caches its result in a static field,
    // so this assertion is order-dependent — an earlier call in this JVM
    // with the default configuration would pin the default pool. Confirm
    // test isolation.
    assertEquals("Custom executor service should be used", customExecutorService, customClient.getExecutorService());
}

}
Loading