diff --git a/presto-main/src/main/java/com/facebook/presto/server/GracefulShutdownHandler.java b/presto-main/src/main/java/com/facebook/presto/server/GracefulShutdownHandler.java index c4e5daa37df9..e73b745d3251 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/GracefulShutdownHandler.java +++ b/presto-main/src/main/java/com/facebook/presto/server/GracefulShutdownHandler.java @@ -20,6 +20,7 @@ import com.facebook.presto.execution.TaskManager; import io.airlift.units.Duration; +import javax.annotation.PreDestroy; import javax.annotation.concurrent.GuardedBy; import javax.inject.Inject; @@ -32,6 +33,7 @@ import java.util.concurrent.TimeoutException; import static com.facebook.airlift.concurrent.Threads.threadsNamed; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.lang.Thread.currentThread; @@ -55,6 +57,8 @@ public class GracefulShutdownHandler private final boolean isResourceManager; private final ShutdownAction shutdownAction; private final Duration gracePeriod; + private final NodeStatusNotificationManager nodeStatusNotificationManager; + private boolean isLoadNodeStatusNotification; @GuardedBy("this") private boolean shutdownRequested; @@ -65,7 +69,8 @@ public GracefulShutdownHandler( ServerConfig serverConfig, ShutdownAction shutdownAction, LifeCycleManager lifeCycleManager, - QueryManager queryManager) + QueryManager queryManager, + NodeStatusNotificationManager nodeStatusNotificationManager) { this.sqlTaskManager = requireNonNull(sqlTaskManager, "sqlTaskManager is null"); this.shutdownAction = requireNonNull(shutdownAction, "shutdownAction is null"); @@ -74,6 +79,21 @@ public GracefulShutdownHandler( this.isResourceManager = serverConfig.isResourceManager(); this.gracePeriod = serverConfig.getGracePeriod(); this.queryManager = requireNonNull(queryManager, "queryManager is null"); + this.nodeStatusNotificationManager = requireNonNull(nodeStatusNotificationManager, "nodeStatusNotificationManager is null"); + } + + public void loadNodeStatusNotification() + { + log.debug("Loading node status notification"); + checkState(!isLoadNodeStatusNotification, "Node status notification can be registered only once"); + this.nodeStatusNotificationManager.getNotificationProvider().registerGracefulShutdownEventListener(this::initiateShutdown); + isLoadNodeStatusNotification = true; + } + + private void initiateShutdown() + { + log.info("Trigger shutdown from status notification"); + requestShutdown(); } public synchronized void requestShutdown() @@ -85,6 +105,7 @@ public synchronized void requestShutdown() } if (isShutdownRequested()) { + log.info("Shutdown already requested"); return; } @@ -202,4 +223,10 @@ public synchronized boolean isShutdownRequested() { return shutdownRequested; } + + @PreDestroy + public synchronized void destroy() + { + this.nodeStatusNotificationManager.getNotificationProvider().removeGracefulShutdownEventListener(this::initiateShutdown); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/NodeStatusNotificationManager.java b/presto-main/src/main/java/com/facebook/presto/server/NodeStatusNotificationManager.java new file mode 100644 index 000000000000..b91a09bb210c --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/server/NodeStatusNotificationManager.java @@ -0,0 +1,65 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server; + +import com.facebook.presto.spi.nodestatus.NoOpNodeStatusNotificationProvider; +import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProvider; +import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProviderFactory; +import com.google.common.collect.ImmutableMap; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import static com.facebook.presto.util.PropertiesUtil.loadProperties; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +public class NodeStatusNotificationManager +{ + private static final File NODE_STATUS_NOTIFICATION_CONFIG = new File("etc/node-status-notification.properties"); + private NodeStatusNotificationProviderFactory notificationProviderFactory; + private NodeStatusNotificationProvider notificationProvider = new NoOpNodeStatusNotificationProvider(); + private boolean isNotificationProviderAdded; + + public void addNodeStatusNotificationProviderFactory(NodeStatusNotificationProviderFactory notificationProviderFactory) + { + this.notificationProviderFactory = requireNonNull(notificationProviderFactory, "notificationProviderFactory is null"); + } + + public void loadNodeStatusNotificationProvider() + throws IOException + { + if (this.notificationProviderFactory == null) { + return; + } + checkState(!isNotificationProviderAdded, "NotificationProvider can only be set once"); + this.notificationProvider = this.notificationProviderFactory.create(getConfig()); + this.isNotificationProviderAdded = true; + } + + private Map getConfig() + throws IOException + { + if (NODE_STATUS_NOTIFICATION_CONFIG.exists()) { + return loadProperties(NODE_STATUS_NOTIFICATION_CONFIG); + } + return ImmutableMap.of(); + } + + public NodeStatusNotificationProvider getNotificationProvider() + { + return this.notificationProvider; + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java b/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java index dd7abc8f2d85..0f72c608bb3b 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java @@ -33,6 +33,7 @@ import com.facebook.presto.spi.connector.ConnectorFactory; import com.facebook.presto.spi.eventlistener.EventListenerFactory; import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory; +import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProviderFactory; import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory; import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory; import com.facebook.presto.spi.security.PasswordAuthenticatorFactory; @@ -123,6 +124,7 @@ public class PluginManager private final HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager; private final TracerProviderManager tracerProviderManager; private final AnalyzerProviderManager analyzerProviderManager; + private final NodeStatusNotificationManager nodeStatusNotificationManager; @Inject public PluginManager( @@ -142,7 +144,8 @@ public PluginManager( NodeTtlFetcherManager nodeTtlFetcherManager, ClusterTtlProviderManager clusterTtlProviderManager, HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager, - TracerProviderManager tracerProviderManager) + TracerProviderManager tracerProviderManager, + NodeStatusNotificationManager nodeStatusNotificationManager) { requireNonNull(nodeInfo, "nodeInfo is null"); requireNonNull(config, "config is null"); @@ -172,6 +175,7 @@ public PluginManager( this.historyBasedPlanStatisticsManager = requireNonNull(historyBasedPlanStatisticsManager, "historyBasedPlanStatisticsManager is null"); this.tracerProviderManager = requireNonNull(tracerProviderManager, "tracerProviderManager is null"); this.analyzerProviderManager = requireNonNull(analyzerProviderManager, "analyzerProviderManager is null"); + this.nodeStatusNotificationManager = requireNonNull(nodeStatusNotificationManager, "nodeStatusNotificationManager is null"); } public void loadPlugins() @@ -317,6 +321,11 @@ public void installPlugin(Plugin plugin) log.info("Registering analyzer provider %s", analyzerProvider.getType()); analyzerProviderManager.addAnalyzerProvider(analyzerProvider); } + + for (NodeStatusNotificationProviderFactory nodeStatusNotificationProviderFactory : plugin.getNodeStatusNotificationProviderFactory()) { + log.info("Registering node status notification provider %s", nodeStatusNotificationProviderFactory.getName()); + nodeStatusNotificationManager.addNodeStatusNotificationProviderFactory(nodeStatusNotificationProviderFactory); + } } private URLClassLoader buildClassLoader(String plugin) diff --git a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java index cc52d8427ebe..8b3aaa3009cc 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java @@ -175,7 +175,8 @@ public void run() injector.getInstance(NodeTtlFetcherManager.class).loadNodeTtlFetcher(); injector.getInstance(ClusterTtlProviderManager.class).loadClusterTtlProvider(); injector.getInstance(TracerProviderManager.class).loadTracerProvider(); - + injector.getInstance(NodeStatusNotificationManager.class).loadNodeStatusNotificationProvider(); + injector.getInstance(GracefulShutdownHandler.class).loadNodeStatusNotification(); startAssociatedProcesses(injector); injector.getInstance(Announcer.class).start(); diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index 68e27f78dd09..15d5402149eb 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -769,6 +769,7 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon //Optional Status Detector newOptionalBinder(binder, NodeStatusService.class); + binder.bind(NodeStatusNotificationManager.class).in(Scopes.SINGLETON); } @Provides diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index 51963e8e8ac7..1a143f6535b9 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -119,6 +119,7 @@ import com.facebook.presto.operator.TaskContext; import com.facebook.presto.operator.index.IndexJoinLookupStats; import com.facebook.presto.server.ConnectorMetadataUpdateHandleJsonSerde; +import com.facebook.presto.server.NodeStatusNotificationManager; import com.facebook.presto.server.PluginManager; import com.facebook.presto.server.PluginManagerConfig; import com.facebook.presto.server.SessionPropertyDefaults; @@ -504,7 +505,8 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, new ThrowingNodeTtlFetcherManager(), new ThrowingClusterTtlProviderManager(), historyBasedPlanStatisticsManager, - new TracerProviderManager(new TracingConfig())); + new TracerProviderManager(new TracingConfig()), + new NodeStatusNotificationManager()); connectorManager.addConnectorFactory(globalSystemConnectorFactory); connectorManager.createConnection(GlobalSystemConnector.NAME, GlobalSystemConnector.NAME, ImmutableMap.of()); diff --git a/presto-main/src/test/java/com/facebook/presto/server/TestThriftServerInfoIntegration.java b/presto-main/src/test/java/com/facebook/presto/server/TestThriftServerInfoIntegration.java index 74ee36cb0550..53b7f43adcaa 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestThriftServerInfoIntegration.java +++ b/presto-main/src/test/java/com/facebook/presto/server/TestThriftServerInfoIntegration.java @@ -136,6 +136,7 @@ public void configure(Binder binder) configBinder(binder).bindConfig(ServerConfig.class); //Bind noop QueryManager similar to the binding done for TaskManager here binder.bind(QueryManager.class).to(NoOpQueryManager.class).in(Scopes.SINGLETON); + binder.bind(NodeStatusNotificationManager.class).in(Scopes.SINGLETON); binder.bind(GracefulShutdownHandler.class).in(Scopes.SINGLETON); binder.bind(ShutdownAction.class).to(TestingPrestoServer.TestShutdownAction.class).in(Scopes.SINGLETON); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java index 9339824c8b9f..8b8a61d8ffd8 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java @@ -96,6 +96,7 @@ import com.facebook.presto.resourcemanager.ResourceGroupService; import com.facebook.presto.server.ConnectorMetadataUpdateHandleJsonSerde; import com.facebook.presto.server.ForJsonMetadataUpdateHandle; +import com.facebook.presto.server.NodeStatusNotificationManager; import com.facebook.presto.server.PluginManager; import com.facebook.presto.server.PluginManagerConfig; import com.facebook.presto.server.QuerySessionSupplier; @@ -501,6 +502,7 @@ protected void setup(Binder binder) binder.bind(ResourceGroupService.class).to(NoopResourceGroupService.class).in(Scopes.SINGLETON); binder.bind(NodeTtlFetcherManager.class).to(ThrowingNodeTtlFetcherManager.class).in(Scopes.SINGLETON); binder.bind(ClusterTtlProviderManager.class).to(ThrowingClusterTtlProviderManager.class).in(Scopes.SINGLETON); + binder.bind(NodeStatusNotificationManager.class).in(Scopes.SINGLETON); // TODO: Decouple and remove: required by SessionPropertyDefaults, PluginManager, InternalResourceGroupManager, ConnectorManager configBinder(binder).bindConfig(NodeConfig.class); diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java b/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java index d1a8043ef750..64d58edba0db 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java @@ -20,6 +20,7 @@ import com.facebook.presto.spi.connector.ConnectorFactory; import com.facebook.presto.spi.eventlistener.EventListenerFactory; import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory; +import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProviderFactory; import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory; import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory; import com.facebook.presto.spi.security.PasswordAuthenticatorFactory; @@ -130,4 +131,9 @@ default Iterable getAnalyzerProviders() { return emptyList(); } + + default Iterable getNodeStatusNotificationProviderFactory() + { + return emptyList(); + } } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/nodestatus/GracefulShutdownEventListener.java b/presto-spi/src/main/java/com/facebook/presto/spi/nodestatus/GracefulShutdownEventListener.java new file mode 100644 index 000000000000..f325dcddfa98 --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/nodestatus/GracefulShutdownEventListener.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.nodestatus; + +@FunctionalInterface +public interface GracefulShutdownEventListener +{ + void onNodeShuttingDown(); +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/nodestatus/NoOpNodeStatusNotificationProvider.java b/presto-spi/src/main/java/com/facebook/presto/spi/nodestatus/NoOpNodeStatusNotificationProvider.java new file mode 100644 index 000000000000..74d41999964d --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/nodestatus/NoOpNodeStatusNotificationProvider.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.nodestatus; + +public class NoOpNodeStatusNotificationProvider + implements NodeStatusNotificationProvider +{ + @Override + public void registerGracefulShutdownEventListener(GracefulShutdownEventListener listener) + { + } + + @Override + public void removeGracefulShutdownEventListener(GracefulShutdownEventListener listener) + { + } +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/nodestatus/NodeStatusNotificationProvider.java b/presto-spi/src/main/java/com/facebook/presto/spi/nodestatus/NodeStatusNotificationProvider.java new file mode 100644 index 000000000000..ab2baeacdaa8 --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/nodestatus/NodeStatusNotificationProvider.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.nodestatus; + +/** + * The {@code NodeStatusNotificationProvider} interface provides a registry for node status listeners. + * Implementations of this interface can listen to node status events and notify all registered listeners, + * especially when a node goes down. + * + *

It is essential for implementations to ensure proper synchronization if the registry is accessed + * by multiple threads.

+ */ +public interface NodeStatusNotificationProvider +{ + void registerGracefulShutdownEventListener(GracefulShutdownEventListener listener); + + void removeGracefulShutdownEventListener(GracefulShutdownEventListener listener); +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/nodestatus/NodeStatusNotificationProviderFactory.java b/presto-spi/src/main/java/com/facebook/presto/spi/nodestatus/NodeStatusNotificationProviderFactory.java new file mode 100644 index 000000000000..630262ba2b18 --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/nodestatus/NodeStatusNotificationProviderFactory.java @@ -0,0 +1,23 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.nodestatus; + +import java.util.Map; + +public interface NodeStatusNotificationProviderFactory +{ + String getName(); + + NodeStatusNotificationProvider create(Map config); +}