From 8ca35bf184d5532292456f81c239d728c8b43601 Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Thu, 7 Nov 2024 14:51:00 +0900 Subject: [PATCH] Provide a way to run a `Plugin` in a zone leader replica (#1044) Motivation: When Central Dogma works on multiple IDC, we may need to run a `Plugin` in a specific zone due to network latency, firewall policy, or load balancing. Modifications: - Add `PluginTarget.ZONE_LEADER_ONLY` to run a `Plugin` in each zone leader. - Add `ZoneProvider` interface to dynamically resolve a zone name. - Allow setting `zone` via `dogma.json` and `CentralDogmaBuilder`. - If the value of `zone` starts with `classpath:`, the class is loaded and cast to `ZoneProvider`. - If the value starts with `$`, the value is read from the environment variable. - Add `onTakeZoneLeadership` and `onReleaseZoneLeadership` to the methods and fields where `onTakeLeadership` and `onReleaseLeadership` were defined before. - `/leader/{zone}` is used to take the zone leadership. Result: You can now run a `Plugin` in a zone leader with the following: - `dogma.json` ```json { "zone": "my-zone" } ``` - `Plugin` ```java class ZoneLeaderPlugin implements Plugin { @Override public PluginTarget target() { return PluginTarget.ZONE_LEADER_ONLY; } } ``` --- dependencies.toml | 6 + it/server/build.gradle | 7 + .../it/ConfigValueConverterTest.java | 9 + it/zone-leader-plugin/build.gradle | 4 + .../it/zoneleader/ZoneLeaderPluginTest.java | 142 ++++++++++++++++ .../server/DynamicZoneResolverTest.java | 30 ++++ .../centraldogma/server/MyZoneProvider.java | 34 ++++ ...p.centraldogma.server.ConfigValueConverter | 1 + .../centraldogma/server/CentralDogma.java | 76 +++++++-- .../server/CentralDogmaBuilder.java | 34 +++- .../server/CentralDogmaConfig.java | 18 +- .../server/DefaultConfigValueConverter.java | 5 +- .../centraldogma/server/PluginGroup.java | 14 +- .../command/AbstractCommandExecutor.java | 26 ++- .../command/StandaloneCommandExecutor.java | 36 +++- .../replication/ZooKeeperCommandExecutor.java | 157 +++++++++++++++--- .../server/plugin/PluginTarget.java | 10 +- .../internal/api/auth/PermissionTest.java | 2 +- .../server/internal/replication/Replica.java | 15 +- settings.gradle | 3 +- site/src/sphinx/setup-configuration.rst | 13 +- testing-internal/build.gradle | 1 + .../CentralDogmaReplicationExtension.java | 7 + .../internal/ProjectManagerExtension.java | 2 +- 24 files changed, 583 insertions(+), 69 deletions(-) create mode 100644 it/zone-leader-plugin/build.gradle create mode 100644 it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/it/zoneleader/ZoneLeaderPluginTest.java create mode 100644 it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/server/DynamicZoneResolverTest.java create mode 100644 it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/server/MyZoneProvider.java create mode 100644 it/zone-leader-plugin/src/test/resources/META-INF/services/com.linecorp.centraldogma.server.ConfigValueConverter diff --git a/dependencies.toml b/dependencies.toml index 2f027437d9..ed70c06ef2 100644 --- a/dependencies.toml +++ b/dependencies.toml @@ -42,6 +42,8 @@ jgit = "5.13.3.202401111512-r" jgit6 = "6.10.0.202406032230-r" junit4 = "4.13.2" junit5 = "5.11.0" +# Don't upgrade junit-pioneer to 2.x.x that requires Java 11 +junit-pioneer = "1.9.1" jsch = "0.1.55" # Don't update `json-path` version json-path = "2.2.0" @@ -299,6 +301,10 @@ module = "org.junit.platform:junit-platform-commons" [libraries.junit5-platform-launcher] module = "org.junit.platform:junit-platform-launcher" +[libraries.junit-pioneer] +module = "org.junit-pioneer:junit-pioneer" +version.ref = "junit-pioneer" + [libraries.kubernetes-client-api] module = "io.fabric8:kubernetes-client-api" version.ref = "kubernetes-client" diff --git a/it/server/build.gradle b/it/server/build.gradle index ce239317ea..b0629bd242 100644 --- a/it/server/build.gradle +++ b/it/server/build.gradle @@ -4,3 +4,10 @@ dependencies { testImplementation libs.curator.test } + +// To use @SetEnvironmentVariable +if (project.ext.testJavaVersion >= 16) { + tasks.withType(Test) { + jvmArgs '--add-opens=java.base/java.lang=ALL-UNNAMED', '--add-opens=java.base/java.util=ALL-UNNAMED' + } +} diff --git a/it/server/src/test/java/com/linecorp/centraldogma/it/ConfigValueConverterTest.java b/it/server/src/test/java/com/linecorp/centraldogma/it/ConfigValueConverterTest.java index 3fd31ac5ef..4382af1f24 100644 --- a/it/server/src/test/java/com/linecorp/centraldogma/it/ConfigValueConverterTest.java +++ b/it/server/src/test/java/com/linecorp/centraldogma/it/ConfigValueConverterTest.java @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; import org.junit.jupiter.api.Test; +import org.junitpioneer.jupiter.SetEnvironmentVariable; import com.linecorp.centraldogma.server.CentralDogmaConfig; @@ -30,4 +31,12 @@ void convert() { assertThat(CentralDogmaConfig.convertValue("valid_prefix:value", "property")) .isEqualTo("valid_value"); } + + @SetEnvironmentVariable(key = "ZONE", value = "ZONE_A") + @SetEnvironmentVariable(key = "MY_ZONE", value = "ZONE_B") + @Test + void environmentVariable() { + assertThat(CentralDogmaConfig.convertValue("env:ZONE", "zone")).isEqualTo("ZONE_A"); + assertThat(CentralDogmaConfig.convertValue("env:MY_ZONE", "zone")).isEqualTo("ZONE_B"); + } } diff --git a/it/zone-leader-plugin/build.gradle b/it/zone-leader-plugin/build.gradle new file mode 100644 index 0000000000..995a3fb019 --- /dev/null +++ b/it/zone-leader-plugin/build.gradle @@ -0,0 +1,4 @@ +dependencies { + testImplementation project(':server') + testImplementation libs.curator.test +} diff --git a/it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/it/zoneleader/ZoneLeaderPluginTest.java b/it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/it/zoneleader/ZoneLeaderPluginTest.java new file mode 100644 index 0000000000..a8374bfceb --- /dev/null +++ b/it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/it/zoneleader/ZoneLeaderPluginTest.java @@ -0,0 +1,142 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.it.zoneleader; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletionStage; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.linecorp.armeria.common.util.UnmodifiableFuture; +import com.linecorp.centraldogma.server.CentralDogmaBuilder; +import com.linecorp.centraldogma.server.plugin.Plugin; +import com.linecorp.centraldogma.server.plugin.PluginContext; +import com.linecorp.centraldogma.server.plugin.PluginTarget; +import com.linecorp.centraldogma.testing.internal.CentralDogmaReplicationExtension; + +class ZoneLeaderPluginTest { + + private static final List plugins = new ArrayList<>(); + private static final int NUM_REPLICAS = 9; + + @RegisterExtension + static CentralDogmaReplicationExtension cluster = new CentralDogmaReplicationExtension(NUM_REPLICAS) { + @Override + protected void configureEach(int serverId, CentralDogmaBuilder builder) { + if (serverId <= 3) { + builder.zone("zone1"); + } else if (serverId <= 6) { + builder.zone("zone2"); + } else { + builder.zone("zone3"); + } + final ZoneLeaderTestPlugin plugin = new ZoneLeaderTestPlugin(serverId); + plugins.add(plugin); + builder.plugins(plugin); + } + }; + + @AfterAll + static void afterAll() { + plugins.clear(); + } + + @Test + void shouldSelectZoneLeaderOnly() throws InterruptedException { + assertZoneLeaderSelection(); + final List zone1 = plugins.subList(0, 3); + final int zone1LeaderId = zoneLeaderId(zone1); + // Zone leadership should be released when the leader goes down. + cluster.servers().get(zone1LeaderId - 1).stopAsync().join(); + // Wait for the new zone leader to be selected. + Thread.sleep(500); + assertZoneLeaderSelection(); + final int newZone1LeaderId = zoneLeaderId(zone1); + assertThat(newZone1LeaderId).isNotEqualTo(zone1LeaderId); + + final List zone2 = plugins.subList(3, 6); + final int zone2LeaderId = zoneLeaderId(zone2); + cluster.servers().get(zone2LeaderId - 1).stopAsync().join(); + // Wait for the new zone leader to be selected. + Thread.sleep(500); + assertZoneLeaderSelection(); + final int newZone2LeaderId = zoneLeaderId(zone2); + assertThat(newZone2LeaderId).isNotEqualTo(zone2LeaderId); + } + + private static int zoneLeaderId(List plugins) { + return plugins.stream() + .filter(ZoneLeaderTestPlugin::isStarted) + .mapToInt(p -> p.serverId) + .findFirst() + .getAsInt(); + } + + /** + * Make sure that only zone leaders start {@link ZoneLeaderTestPlugin}. + */ + private static void assertZoneLeaderSelection() { + for (int i = 0; i < NUM_REPLICAS; i += 3) { + final List zonePlugins = plugins.subList(i, i + 3); + await().untilAsserted(() -> { + assertThat(zonePlugins.stream().filter(ZoneLeaderTestPlugin::isStarted)).hasSize(1); + }); + } + } + + private static final class ZoneLeaderTestPlugin implements Plugin { + + private final int serverId; + private boolean started; + + private ZoneLeaderTestPlugin(int serverId) { + this.serverId = serverId; + } + + @Override + public PluginTarget target() { + return PluginTarget.ZONE_LEADER_ONLY; + } + + boolean isStarted() { + return started; + } + + @Override + public CompletionStage start(PluginContext context) { + started = true; + return UnmodifiableFuture.completedFuture(null); + } + + @Override + public CompletionStage stop(PluginContext context) { + started = false; + return UnmodifiableFuture.completedFuture(null); + } + + @Override + public Class configType() { + return getClass(); + } + } +} diff --git a/it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/server/DynamicZoneResolverTest.java b/it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/server/DynamicZoneResolverTest.java new file mode 100644 index 0000000000..9fd0b0b27c --- /dev/null +++ b/it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/server/DynamicZoneResolverTest.java @@ -0,0 +1,30 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.server; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +class DynamicZoneResolverTest { + + @Test + void shouldUseCustomConfigValueConverter() { + final String zone = CentralDogmaConfig.convertValue("zone:dummy", "zone"); + assertThat(zone).isEqualTo("ZONE_C"); + } +} diff --git a/it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/server/MyZoneProvider.java b/it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/server/MyZoneProvider.java new file mode 100644 index 0000000000..2228f5a940 --- /dev/null +++ b/it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/server/MyZoneProvider.java @@ -0,0 +1,34 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.server; + +import java.util.List; + +import com.google.common.collect.ImmutableList; + +public final class MyZoneProvider implements ConfigValueConverter { + + @Override + public List supportedPrefixes() { + return ImmutableList.of("zone"); + } + + @Override + public String convert(String prefix, String value) { + return "ZONE_C"; + } +} diff --git a/it/zone-leader-plugin/src/test/resources/META-INF/services/com.linecorp.centraldogma.server.ConfigValueConverter b/it/zone-leader-plugin/src/test/resources/META-INF/services/com.linecorp.centraldogma.server.ConfigValueConverter new file mode 100644 index 0000000000..0f1543b96e --- /dev/null +++ b/it/zone-leader-plugin/src/test/resources/META-INF/services/com.linecorp.centraldogma.server.ConfigValueConverter @@ -0,0 +1 @@ +com.linecorp.centraldogma.server.MyZoneProvider diff --git a/server/src/main/java/com/linecorp/centraldogma/server/CentralDogma.java b/server/src/main/java/com/linecorp/centraldogma/server/CentralDogma.java index f6fd3c6d1c..02e23070b4 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/CentralDogma.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/CentralDogma.java @@ -218,7 +218,7 @@ public class CentralDogma implements AutoCloseable { */ public static CentralDogma forConfig(File configFile) throws IOException { requireNonNull(configFile, "configFile"); - return new CentralDogma(CentralDogmaConfig.load(configFile), Flags.meterRegistry()); + return new CentralDogma(CentralDogmaConfig.load(configFile), Flags.meterRegistry(), ImmutableList.of()); } private final SettableHealthChecker serverHealth = new SettableHealthChecker(false); @@ -230,6 +230,8 @@ public static CentralDogma forConfig(File configFile) throws IOException { private final PluginGroup pluginsForAllReplicas; @Nullable private final PluginGroup pluginsForLeaderOnly; + @Nullable + private final PluginGroup pluginsForZoneLeaderOnly; private final CentralDogmaConfig cfg; @Nullable @@ -254,12 +256,18 @@ public static CentralDogma forConfig(File configFile) throws IOException { @Nullable private volatile MirrorRunner mirrorRunner; - CentralDogma(CentralDogmaConfig cfg, MeterRegistry meterRegistry) { + CentralDogma(CentralDogmaConfig cfg, MeterRegistry meterRegistry, List plugins) { this.cfg = requireNonNull(cfg, "cfg"); pluginsForAllReplicas = PluginGroup.loadPlugins( - CentralDogma.class.getClassLoader(), PluginTarget.ALL_REPLICAS, cfg); + CentralDogma.class.getClassLoader(), PluginTarget.ALL_REPLICAS, cfg, plugins); pluginsForLeaderOnly = PluginGroup.loadPlugins( - CentralDogma.class.getClassLoader(), PluginTarget.LEADER_ONLY, cfg); + CentralDogma.class.getClassLoader(), PluginTarget.LEADER_ONLY, cfg, plugins); + pluginsForZoneLeaderOnly = PluginGroup.loadPlugins( + CentralDogma.class.getClassLoader(), PluginTarget.ZONE_LEADER_ONLY, cfg, plugins); + if (pluginsForZoneLeaderOnly != null) { + checkState(!isNullOrEmpty(cfg.zone()), + "zone must be specified when zone leader plugins are enabled."); + } startStop = new CentralDogmaStartStop(pluginsForAllReplicas); this.meterRegistry = meterRegistry; } @@ -335,6 +343,9 @@ public List plugins(PluginTarget target) { case ALL_REPLICAS: return pluginsForAllReplicas != null ? ImmutableList.copyOf(pluginsForAllReplicas.plugins()) : ImmutableList.of(); + case ZONE_LEADER_ONLY: + return pluginsForZoneLeaderOnly != null ? + ImmutableList.copyOf(pluginsForZoneLeaderOnly.plugins()) : ImmutableList.of(); default: // Should not reach here. throw new Error("Unknown plugin target: " + target); @@ -483,6 +494,42 @@ private CommandExecutor startCommandExecutor( } }; + Consumer onTakeZoneLeadership = null; + Consumer onReleaseZoneLeadership = null; + // TODO(ikhoon): Deduplicate + if (pluginsForZoneLeaderOnly != null) { + final String zone = cfg.zone(); + onTakeZoneLeadership = exec -> { + logger.info("Starting plugins on the {} zone leader replica ..", zone); + pluginsForZoneLeaderOnly + .start(cfg, pm, exec, meterRegistry, purgeWorker, projectInitializer) + .handle((unused, cause) -> { + if (cause == null) { + logger.info("Started plugins on the {} zone leader replica.", zone); + } else { + logger.error("Failed to start plugins on the {} zone leader replica..", + zone, cause); + } + return null; + }); + }; + onReleaseZoneLeadership = exec -> { + logger.info("Stopping plugins on the {} zone leader replica ..", zone); + pluginsForZoneLeaderOnly.stop(cfg, pm, exec, meterRegistry, purgeWorker, projectInitializer) + .handle((unused, cause) -> { + if (cause == null) { + logger.info("Stopped plugins on the {} zone leader replica.", + zone); + } else { + logger.error( + "Failed to stop plugins on the {} zone leader replica.", + zone, cause); + } + return null; + }); + }; + } + statusManager = new ServerStatusManager(cfg.dataDir()); logger.info("Startup mode: {}", statusManager.serverStatus()); final CommandExecutor executor; @@ -490,13 +537,15 @@ private CommandExecutor startCommandExecutor( switch (replicationMethod) { case ZOOKEEPER: executor = newZooKeeperCommandExecutor(pm, repositoryWorker, statusManager, meterRegistry, - sessionManager, onTakeLeadership, onReleaseLeadership); + sessionManager, onTakeLeadership, onReleaseLeadership, + onTakeZoneLeadership, onReleaseZoneLeadership); break; case NONE: logger.info("No replication mechanism specified; entering standalone"); executor = new StandaloneCommandExecutor(pm, repositoryWorker, statusManager, sessionManager, cfg.writeQuotaPerRepository(), - onTakeLeadership, onReleaseLeadership); + onTakeLeadership, onReleaseLeadership, + onTakeZoneLeadership, onReleaseZoneLeadership); break; default: throw new Error("unknown replication method: " + replicationMethod); @@ -714,7 +763,9 @@ private CommandExecutor newZooKeeperCommandExecutor( MeterRegistry meterRegistry, @Nullable SessionManager sessionManager, @Nullable Consumer onTakeLeadership, - @Nullable Consumer onReleaseLeadership) { + @Nullable Consumer onReleaseLeadership, + @Nullable Consumer onTakeZoneLeadership, + @Nullable Consumer onReleaseZoneLeadership) { final ZooKeeperReplicationConfig zkCfg = (ZooKeeperReplicationConfig) cfg.replicationConfig(); // Delete the old UUID replica ID which is not used anymore. @@ -726,8 +777,11 @@ private CommandExecutor newZooKeeperCommandExecutor( return new ZooKeeperCommandExecutor( zkCfg, dataDir, new StandaloneCommandExecutor(pm, repositoryWorker, serverStatusManager, sessionManager, - /* onTakeLeadership */ null, /* onReleaseLeadership */ null), - meterRegistry, pm, config().writeQuotaPerRepository(), onTakeLeadership, onReleaseLeadership); + /* onTakeLeadership */ null, /* onReleaseLeadership */ null, + /* onTakeZoneLeadership */ null, /* onReleaseZoneLeadership */ null), + meterRegistry, pm, config().writeQuotaPerRepository(), config().zone(), + onTakeLeadership, onReleaseLeadership, + onTakeZoneLeadership, onReleaseZoneLeadership); } private void configureThriftService(ServerBuilder sb, ProjectApiManager projectApiManager, @@ -1026,7 +1080,6 @@ private void doStop() { this.repositoryWorker = null; this.sessionManager = null; this.mirrorRunner = null; - projectInitializer = null; if (meterRegistryToBeClosed != null) { assert meterRegistry instanceof CompositeMeterRegistry; ((CompositeMeterRegistry) meterRegistry).remove(meterRegistryToBeClosed); @@ -1040,6 +1093,9 @@ private void doStop() { } else { logger.info("Stopped the Central Dogma successfully."); } + + // Should be nullified after stopping the command executor because the command executor may access it. + projectInitializer = null; } private static boolean doStop( diff --git a/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaBuilder.java b/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaBuilder.java index a93f2c062b..27401e1158 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaBuilder.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaBuilder.java @@ -49,6 +49,7 @@ import com.linecorp.centraldogma.server.auth.AuthProvider; import com.linecorp.centraldogma.server.auth.AuthProviderFactory; import com.linecorp.centraldogma.server.auth.Session; +import com.linecorp.centraldogma.server.plugin.Plugin; import com.linecorp.centraldogma.server.plugin.PluginConfig; import com.linecorp.centraldogma.server.storage.repository.Repository; @@ -132,8 +133,11 @@ public final class CentralDogmaBuilder { private CorsConfig corsConfig; private final List pluginConfigs = new ArrayList<>(); + private final List plugins = new ArrayList<>(); @Nullable private ManagementConfig managementConfig; + @Nullable + private String zone; /** * Creates a new builder with the specified data directory. @@ -531,6 +535,23 @@ public CentralDogmaBuilder pluginConfigs(PluginConfig... pluginConfigs) { return this; } + /** + * Adds the {@link Plugin}s. + */ + public CentralDogmaBuilder plugins(Plugin... plugins) { + requireNonNull(plugins, "plugins"); + return plugins(ImmutableList.copyOf(plugins)); + } + + /** + * Adds the {@link Plugin}s. + */ + public CentralDogmaBuilder plugins(Iterable plugins) { + requireNonNull(plugins, "plugins"); + this.plugins.addAll(ImmutableList.copyOf(plugins)); + return this; + } + /** * Enables a management service with the specified {@link ManagementConfig}. */ @@ -540,11 +561,20 @@ public CentralDogmaBuilder management(ManagementConfig managementConfig) { return this; } + /** + * Specifies the zone of the server. + */ + public CentralDogmaBuilder zone(String zone) { + requireNonNull(zone, "zone"); + this.zone = zone; + return this; + } + /** * Returns a newly-created {@link CentralDogma} server. */ public CentralDogma build() { - return new CentralDogma(buildConfig(), meterRegistry); + return new CentralDogma(buildConfig(), meterRegistry, ImmutableList.copyOf(plugins)); } private CentralDogmaConfig buildConfig() { @@ -573,6 +603,6 @@ private CentralDogmaConfig buildConfig() { maxRemovedRepositoryAgeMillis, gracefulShutdownTimeout, webAppEnabled, webAppTitle,replicationConfig, null, accessLogFormat, authCfg, quotaConfig, - corsConfig, pluginConfigs, managementConfig); + corsConfig, pluginConfigs, managementConfig, zone); } } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaConfig.java b/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaConfig.java index f914d37f4e..0a3c76ea6d 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaConfig.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaConfig.java @@ -82,6 +82,7 @@ import com.linecorp.centraldogma.server.auth.AuthConfig; import com.linecorp.centraldogma.server.plugin.PluginConfig; import com.linecorp.centraldogma.server.plugin.PluginConfigDeserializer; +import com.linecorp.centraldogma.server.plugin.PluginTarget; import com.linecorp.centraldogma.server.storage.repository.Repository; import io.netty.util.NetUtil; @@ -269,6 +270,9 @@ public static CentralDogmaConfig load(String json) throws JsonMappingException, @Nullable private final ManagementConfig managementConfig; + @Nullable + private final String zone; + CentralDogmaConfig( @JsonProperty(value = "dataDir", required = true) File dataDir, @JsonProperty(value = "ports", required = true) @@ -295,7 +299,8 @@ public static CentralDogmaConfig load(String json) throws JsonMappingException, @JsonProperty("writeQuotaPerRepository") @Nullable QuotaConfig writeQuotaPerRepository, @JsonProperty("cors") @Nullable CorsConfig corsConfig, @JsonProperty("pluginConfigs") @Nullable List pluginConfigs, - @JsonProperty("management") @Nullable ManagementConfig managementConfig) { + @JsonProperty("management") @Nullable ManagementConfig managementConfig, + @JsonProperty("zone") @Nullable String zone) { this.dataDir = requireNonNull(dataDir, "dataDir"); this.ports = ImmutableList.copyOf(requireNonNull(ports, "ports")); @@ -344,6 +349,7 @@ public static CentralDogmaConfig load(String json) throws JsonMappingException, pluginConfigMap = this.pluginConfigs.stream().collect( toImmutableMap(PluginConfig::getClass, Function.identity())); this.managementConfig = managementConfig; + this.zone = convertValue(zone, "zone"); } /** @@ -582,6 +588,16 @@ public ManagementConfig managementConfig() { return managementConfig; } + /** + * Returns the zone of the server. + * Note that the zone must be specified to use the {@link PluginTarget#ZONE_LEADER_ONLY} target. + */ + @Nullable + @JsonProperty("zone") + public String zone() { + return zone; + } + @Override public String toString() { try { diff --git a/server/src/main/java/com/linecorp/centraldogma/server/DefaultConfigValueConverter.java b/server/src/main/java/com/linecorp/centraldogma/server/DefaultConfigValueConverter.java index 6aad2907c3..f0b4b692aa 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/DefaultConfigValueConverter.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/DefaultConfigValueConverter.java @@ -30,9 +30,10 @@ enum DefaultConfigValueConverter implements ConfigValueConverter { private static final String PLAINTEXT = "plaintext"; private static final String FILE = "file"; private static final String BASE64 = "base64"; + private static final String ENV = "env"; // TODO(minwoox): Add more prefixes such as classpath, url, etc. - private static final List SUPPORTED_PREFIXES = ImmutableList.of(PLAINTEXT, FILE, BASE64); + private static final List SUPPORTED_PREFIXES = ImmutableList.of(PLAINTEXT, FILE, BASE64, ENV); @Override public List supportedPrefixes() { @@ -52,6 +53,8 @@ public String convert(String prefix, String value) { } case BASE64: return new String(Base64.getDecoder().decode(value), StandardCharsets.UTF_8).trim(); + case ENV: + return System.getenv(value); default: // Should never reach here. throw new Error(); diff --git a/server/src/main/java/com/linecorp/centraldogma/server/PluginGroup.java b/server/src/main/java/com/linecorp/centraldogma/server/PluginGroup.java index cfdb3bca63..0962d26732 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/PluginGroup.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/PluginGroup.java @@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.spotify.futures.CompletableFutures; import com.linecorp.armeria.common.util.StartStopSupport; @@ -64,7 +65,7 @@ final class PluginGroup { */ @Nullable static PluginGroup loadPlugins(PluginTarget target, CentralDogmaConfig config) { - return loadPlugins(PluginGroup.class.getClassLoader(), target, config); + return loadPlugins(PluginGroup.class.getClassLoader(), target, config, ImmutableList.of()); } /** @@ -76,21 +77,22 @@ static PluginGroup loadPlugins(PluginTarget target, CentralDogmaConfig config) { * @param target the {@link PluginTarget} which would be loaded */ @Nullable - static PluginGroup loadPlugins(ClassLoader classLoader, PluginTarget target, CentralDogmaConfig config) { + static PluginGroup loadPlugins(ClassLoader classLoader, PluginTarget target, CentralDogmaConfig config, + List plugins) { requireNonNull(classLoader, "classLoader"); requireNonNull(target, "target"); requireNonNull(config, "config"); final ServiceLoader loader = ServiceLoader.load(Plugin.class, classLoader); - final ImmutableMap.Builder, Plugin> plugins = new ImmutableMap.Builder<>(); - for (Plugin plugin : loader) { + final ImmutableMap.Builder, Plugin> allPlugins = new ImmutableMap.Builder<>(); + for (Plugin plugin : Iterables.concat(plugins, loader)) { if (target == plugin.target() && plugin.isEnabled(config)) { - plugins.put(plugin.configType(), plugin); + allPlugins.put(plugin.configType(), plugin); } } // IllegalArgumentException is thrown if there are duplicate keys. - final Map, Plugin> pluginMap = plugins.build(); + final Map, Plugin> pluginMap = allPlugins.build(); if (pluginMap.isEmpty()) { return null; } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/command/AbstractCommandExecutor.java b/server/src/main/java/com/linecorp/centraldogma/server/command/AbstractCommandExecutor.java index 567343bcba..66356e8d0f 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/command/AbstractCommandExecutor.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/command/AbstractCommandExecutor.java @@ -46,6 +46,10 @@ public abstract class AbstractCommandExecutor implements CommandExecutor { private final Consumer onTakeLeadership; @Nullable private final Consumer onReleaseLeadership; + @Nullable + private final Consumer onTakeZoneLeadership; + @Nullable + private final Consumer onReleaseZoneLeadership; private final CommandExecutorStartStop startStop = new CommandExecutorStartStop(); private volatile boolean started; @@ -58,11 +62,17 @@ public abstract class AbstractCommandExecutor implements CommandExecutor { * * @param onTakeLeadership the callback to be invoked after the replica has taken the leadership * @param onReleaseLeadership the callback to be invoked before the replica releases the leadership + * @param onTakeZoneLeadership the callback to be invoked after the replica has taken the zone leadership + * @param onReleaseZoneLeadership the callback to be invoked before the replica releases the zone leadership */ protected AbstractCommandExecutor(@Nullable Consumer onTakeLeadership, - @Nullable Consumer onReleaseLeadership) { + @Nullable Consumer onReleaseLeadership, + @Nullable Consumer onTakeZoneLeadership, + @Nullable Consumer onReleaseZoneLeadership) { this.onTakeLeadership = onTakeLeadership; this.onReleaseLeadership = onReleaseLeadership; + this.onTakeZoneLeadership = onTakeZoneLeadership; + this.onReleaseZoneLeadership = onReleaseZoneLeadership; statusManager = new CommandExecutorStatusManager(this); } @@ -86,7 +96,9 @@ public final CompletableFuture start() { } protected abstract void doStart(@Nullable Runnable onTakeLeadership, - @Nullable Runnable onReleaseLeadership) throws Exception; + @Nullable Runnable onReleaseLeadership, + @Nullable Runnable onTakeZoneLeadership, + @Nullable Runnable onReleaseZoneLeadership) throws Exception; @Override public final CompletableFuture stop() { @@ -95,7 +107,8 @@ public final CompletableFuture stop() { return startStop.stop().thenRun(numPendingStopRequests::decrementAndGet); } - protected abstract void doStop(@Nullable Runnable onReleaseLeadership) throws Exception; + protected abstract void doStop(@Nullable Runnable onReleaseLeadership, + @Nullable Runnable onReleaseZoneLeadership) throws Exception; @Override public final boolean isWritable() { @@ -155,7 +168,9 @@ protected CompletionStage doStart(@Nullable Void unused) throws Exception return execute("command-executor", () -> { try { AbstractCommandExecutor.this.doStart(toRunnable(onTakeLeadership), - toRunnable(onReleaseLeadership)); + toRunnable(onReleaseLeadership), + toRunnable(onTakeZoneLeadership), + toRunnable(onReleaseZoneLeadership)); } catch (Exception e) { Exceptions.throwUnsafely(e); } @@ -166,7 +181,8 @@ protected CompletionStage doStart(@Nullable Void unused) throws Exception protected CompletionStage doStop(@Nullable Void unused) throws Exception { return execute("command-executor-shutdown", () -> { try { - AbstractCommandExecutor.this.doStop(toRunnable(onReleaseLeadership)); + AbstractCommandExecutor.this.doStop(toRunnable(onReleaseLeadership), + toRunnable(onReleaseZoneLeadership)); } catch (Exception e) { Exceptions.throwUnsafely(e); } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/command/StandaloneCommandExecutor.java b/server/src/main/java/com/linecorp/centraldogma/server/command/StandaloneCommandExecutor.java index 6022d8ef8f..337a5a12f8 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/command/StandaloneCommandExecutor.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/command/StandaloneCommandExecutor.java @@ -74,6 +74,8 @@ public class StandaloneCommandExecutor extends AbstractCommandExecutor { * @param sessionManager the session manager for creating/removing a session * @param onTakeLeadership the callback to be invoked after the replica has taken the leadership * @param onReleaseLeadership the callback to be invoked before the replica releases the leadership + * @param onTakeZoneLeadership the callback to be invoked after the replica has taken the zone leadership + * @param onReleaseZoneLeadership the callback to be invoked before the replica releases the zone leadership */ public StandaloneCommandExecutor(ProjectManager projectManager, Executor repositoryWorker, @@ -81,10 +83,12 @@ public StandaloneCommandExecutor(ProjectManager projectManager, @Nullable SessionManager sessionManager, @Nullable QuotaConfig writeQuota, @Nullable Consumer onTakeLeadership, - @Nullable Consumer onReleaseLeadership) { + @Nullable Consumer onReleaseLeadership, + @Nullable Consumer onTakeZoneLeadership, + @Nullable Consumer onReleaseZoneLeadership) { this(projectManager, repositoryWorker, serverStatusManager, sessionManager, writeQuota != null ? writeQuota.permitsPerSecond() : 0, - onTakeLeadership, onReleaseLeadership); + onTakeLeadership, onReleaseLeadership, onTakeZoneLeadership, onReleaseZoneLeadership); } /** @@ -95,15 +99,19 @@ public StandaloneCommandExecutor(ProjectManager projectManager, * @param sessionManager the session manager for creating/removing a session * @param onTakeLeadership the callback to be invoked after the replica has taken the leadership * @param onReleaseLeadership the callback to be invoked before the replica releases the leadership + * @param onTakeZoneLeadership the callback to be invoked after the replica has taken the zone leadership + * @param onReleaseZoneLeadership the callback to be invoked before the replica releases the zone leadership */ public StandaloneCommandExecutor(ProjectManager projectManager, Executor repositoryWorker, ServerStatusManager serverStatusManager, @Nullable SessionManager sessionManager, @Nullable Consumer onTakeLeadership, - @Nullable Consumer onReleaseLeadership) { - this(projectManager, repositoryWorker, serverStatusManager, sessionManager, -1, onTakeLeadership, - onReleaseLeadership); + @Nullable Consumer onReleaseLeadership, + @Nullable Consumer onTakeZoneLeadership, + @Nullable Consumer onReleaseZoneLeadership) { + this(projectManager, repositoryWorker, serverStatusManager, sessionManager, -1, + onTakeLeadership, onReleaseLeadership, onTakeZoneLeadership, onReleaseZoneLeadership); } private StandaloneCommandExecutor(ProjectManager projectManager, @@ -112,8 +120,10 @@ private StandaloneCommandExecutor(ProjectManager projectManager, @Nullable SessionManager sessionManager, double permitsPerSecond, @Nullable Consumer onTakeLeadership, - @Nullable Consumer onReleaseLeadership) { - super(onTakeLeadership, onReleaseLeadership); + @Nullable Consumer onReleaseLeadership, + @Nullable Consumer onTakeZoneLeadership, + @Nullable Consumer onReleaseZoneLeadership) { + super(onTakeLeadership, onReleaseLeadership, onTakeZoneLeadership, onReleaseZoneLeadership); this.projectManager = requireNonNull(projectManager, "projectManager"); this.repositoryWorker = requireNonNull(repositoryWorker, "repositoryWorker"); this.serverStatusManager = requireNonNull(serverStatusManager, "serverStatusManager"); @@ -130,17 +140,25 @@ public int replicaId() { @Override protected void doStart(@Nullable Runnable onTakeLeadership, - @Nullable Runnable onReleaseLeadership) { + @Nullable Runnable onReleaseLeadership, + @Nullable Runnable onTakeZoneLeadership, + @Nullable Runnable onReleaseZoneLeadership) { if (onTakeLeadership != null) { onTakeLeadership.run(); } + if (onTakeZoneLeadership != null) { + onTakeZoneLeadership.run(); + } } @Override - protected void doStop(@Nullable Runnable onReleaseLeadership) { + protected void doStop(@Nullable Runnable onReleaseLeadership, @Nullable Runnable onReleaseZoneLeadership) { if (onReleaseLeadership != null) { onReleaseLeadership.run(); } + if (onReleaseZoneLeadership != null) { + onReleaseZoneLeadership.run(); + } } @Override diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutor.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutor.java index 57d8a14580..d73c7a9bc2 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutor.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutor.java @@ -169,6 +169,8 @@ public final class ZooKeeperCommandExecutor @Nullable private final QuotaConfig writeQuota; + @Nullable + private final String zone; private MetadataService metadataService; @@ -184,6 +186,12 @@ public final class ZooKeeperCommandExecutor private volatile OldLogRemover oldLogRemover; private volatile ExecutorService leaderSelectorExecutor; private volatile LeaderSelector leaderSelector; + @Nullable + private volatile ZoneLeaderPluginsRunner zonePluginsRunner; + @Nullable + private volatile ExecutorService zoneLeaderSelectorExecutor; + @Nullable + private volatile LeaderSelector zoneLeaderSelector; private volatile ScheduledExecutorService quotaExecutor; private volatile boolean createdParentNodes; private volatile boolean canReplicate; @@ -323,17 +331,73 @@ private void deleteLogBlock(String logPath, LogMeta logMeta, String blockPath, } } + private class ZoneLeaderPluginsRunner implements LeaderSelectorListener { + volatile boolean hasLeadership; + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + //ignore + } + + @Override + public void takeLeadership(CuratorFramework client) throws Exception { + final ListenerInfo listenerInfo = ZooKeeperCommandExecutor.this.listenerInfo; + if (listenerInfo == null) { + // Stopped. + return; + } + + logger.info("Taking the {} zone leadership: {}", zone, replicaId()); + try { + hasLeadership = true; + if (listenerInfo.onTakeZoneLeadership != null) { + listenerInfo.onTakeZoneLeadership.run(); + } + + synchronized (this) { + // Wait until the zone leadership is lost. + wait(); + } + } catch (InterruptedException e) { + // Leader selector has been closed. + } catch (Exception e) { + logger.error("Leader stopped due to an unexpected exception:", e); + } finally { + hasLeadership = false; + logger.info("Releasing the zone {} leadership: {}", zone, replicaId()); + if (listenerInfo.onReleaseZoneLeadership != null) { + listenerInfo.onReleaseZoneLeadership.run(); + } + + if (ZooKeeperCommandExecutor.this.listenerInfo != null) { + // Requeue only when the executor is not stopped. + zoneLeaderSelector.requeue(); + } + } + } + } + private static final class ListenerInfo { long lastReplayedRevision; + @Nullable final Runnable onTakeLeadership; + @Nullable final Runnable onReleaseLeadership; + @Nullable + final Runnable onTakeZoneLeadership; + @Nullable + final Runnable onReleaseZoneLeadership; ListenerInfo(long lastReplayedRevision, - @Nullable Runnable onTakeLeadership, @Nullable Runnable onReleaseLeadership) { + @Nullable Runnable onTakeLeadership, @Nullable Runnable onReleaseLeadership, + @Nullable Runnable onTakeZoneLeadership, @Nullable Runnable onReleaseZoneLeadership + ) { this.lastReplayedRevision = lastReplayedRevision; this.onReleaseLeadership = onReleaseLeadership; this.onTakeLeadership = onTakeLeadership; + this.onTakeZoneLeadership = onTakeZoneLeadership; + this.onReleaseZoneLeadership = onReleaseZoneLeadership; } } @@ -344,9 +408,12 @@ public ZooKeeperCommandExecutor(ZooKeeperReplicationConfig cfg, MeterRegistry meterRegistry, ProjectManager projectManager, @Nullable QuotaConfig writeQuota, + @Nullable String zone, @Nullable Consumer onTakeLeadership, - @Nullable Consumer onReleaseLeadership) { - super(onTakeLeadership, onReleaseLeadership); + @Nullable Consumer onReleaseLeadership, + @Nullable Consumer onTakeZoneLeadership, + @Nullable Consumer onReleaseZoneLeadership) { + super(onTakeLeadership, onReleaseLeadership, onTakeZoneLeadership, onReleaseZoneLeadership); this.cfg = requireNonNull(cfg, "cfg"); requireNonNull(dataDir, "dataDir"); @@ -362,6 +429,7 @@ public ZooKeeperCommandExecutor(ZooKeeperReplicationConfig cfg, this.meterRegistry = requireNonNull(meterRegistry, "meterRegistry"); this.writeQuota = writeQuota; metadataService = new MetadataService(projectManager, this); + this.zone = zone; // Register the metrics which are accessible even before started. Gauge.builder("replica.id", this, self -> replicaId()).register(meterRegistry); @@ -377,6 +445,15 @@ public ZooKeeperCommandExecutor(ZooKeeperReplicationConfig cfg, return remover != null && remover.hasLeadership ? 1 : 0; }) .register(meterRegistry); + if (onTakeZoneLeadership != null) { + Gauge.builder("replica.has.zone.leadership", this, + self -> { + final ZoneLeaderPluginsRunner zoneRunner = self.zonePluginsRunner; + return zoneRunner != null && zoneRunner.hasLeadership ? 1 : 0; + }) + .tag("zone", zone) + .register(meterRegistry); + } Gauge.builder("replica.last.replayed.revision", this, self -> { final ListenerInfo info = self.listenerInfo; @@ -395,13 +472,16 @@ public int replicaId() { @Override protected void doStart(@Nullable Runnable onTakeLeadership, - @Nullable Runnable onReleaseLeadership) throws Exception { + @Nullable Runnable onReleaseLeadership, + @Nullable Runnable onTakeZoneLeadership, + @Nullable Runnable onReleaseZoneLeadership) throws Exception { try { // Get the last replayed revision. final long lastReplayedRevision; try { lastReplayedRevision = getLastReplayedRevision(); - listenerInfo = new ListenerInfo(lastReplayedRevision, onTakeLeadership, onReleaseLeadership); + listenerInfo = new ListenerInfo(lastReplayedRevision, onTakeLeadership, onReleaseLeadership, + onTakeZoneLeadership, onReleaseZoneLeadership); } catch (Exception e) { throw new ReplicationException("failed to read " + revisionFile, e); } @@ -443,6 +523,21 @@ protected void doStart(@Nullable Runnable onTakeLeadership, leaderSelectorExecutor, oldLogRemover); leaderSelector.start(); + if (onTakeZoneLeadership != null) { + // Start the zone leader selection. + zonePluginsRunner = new ZoneLeaderPluginsRunner(); + zoneLeaderSelectorExecutor = ExecutorServiceMetrics.monitor( + meterRegistry, + Executors.newSingleThreadExecutor( + new DefaultThreadFactory("zookeeper-zone-leader-selector", true)), + "zkZoneLeaderSelector"); + + assert zone != null; + zoneLeaderSelector = new LeaderSelector(curator, absolutePath(LEADER_PATH, zone), + zoneLeaderSelectorExecutor, zonePluginsRunner); + zoneLeaderSelector.start(); + } + // Start the delegate. // The delegate is StandaloneCommandExecutor, which will be quite fast to start. delegate.start().get(); @@ -635,7 +730,8 @@ private void stopLater() { } @Override - protected void doStop(@Nullable Runnable onReleaseLeadership) throws Exception { + protected void doStop(@Nullable Runnable onReleaseLeadership, + @Nullable Runnable onReleaseZoneLeadership) throws Exception { canReplicate = false; listenerInfo = null; logger.info("Stopping the worker threads"); @@ -666,36 +762,47 @@ protected void doStop(@Nullable Runnable onReleaseLeadership) throws Exception { logger.warn("Failed to close the leader selector: {}", e.getMessage(), e); } finally { try { - if (logWatcher != null) { - logger.info("Closing the log watcher"); - logWatcher.close(); - interrupted |= shutdown(logWatcherExecutor); - logger.info("Closed the log watcher"); + if (zoneLeaderSelector != null) { + logger.info("Closing the zone {} leader selector", zone); + zoneLeaderSelector.close(); + interrupted |= shutdown(zoneLeaderSelectorExecutor); + logger.info("Closed the zone {} leader selector", zone); } } catch (Exception e) { - logger.warn("Failed to close the log watcher: {}", e.getMessage(), e); + logger.warn("Failed to close the zone {} leader selector: {}", zone, e.getMessage(), e); } finally { try { - if (curator != null) { - logger.info("Closing the Curator framework"); - curator.close(); - logger.info("Closed the Curator framework"); + if (logWatcher != null) { + logger.info("Closing the log watcher"); + logWatcher.close(); + interrupted |= shutdown(logWatcherExecutor); + logger.info("Closed the log watcher"); } } catch (Exception e) { - logger.warn("Failed to close the Curator framework: {}", e.getMessage(), e); + logger.warn("Failed to close the log watcher: {}", e.getMessage(), e); } finally { try { - if (quorumPeer != null) { - final long peerId = quorumPeer.getId(); - logger.info("Shutting down the ZooKeeper peer ({})", peerId); - quorumPeer.shutdown(); - logger.info("Shut down the ZooKeeper peer ({})", peerId); + if (curator != null) { + logger.info("Closing the Curator framework"); + curator.close(); + logger.info("Closed the Curator framework"); } } catch (Exception e) { - logger.warn("Failed to shut down the ZooKeeper peer: {}", e.getMessage(), e); + logger.warn("Failed to close the Curator framework: {}", e.getMessage(), e); } finally { - if (interrupted) { - Thread.currentThread().interrupt(); + try { + if (quorumPeer != null) { + final long peerId = quorumPeer.getId(); + logger.info("Shutting down the ZooKeeper peer ({})", peerId); + quorumPeer.shutdown(); + logger.info("Shut down the ZooKeeper peer ({})", peerId); + } + } catch (Exception e) { + logger.warn("Failed to shut down the ZooKeeper peer: {}", e.getMessage(), e); + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } } } } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/plugin/PluginTarget.java b/server/src/main/java/com/linecorp/centraldogma/server/plugin/PluginTarget.java index 530d60500d..b28b5e26ec 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/plugin/PluginTarget.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/plugin/PluginTarget.java @@ -15,6 +15,8 @@ */ package com.linecorp.centraldogma.server.plugin; +import com.linecorp.centraldogma.server.CentralDogmaConfig; + /** * Targets that a {@link Plugin} is applied to which replica. */ @@ -28,5 +30,11 @@ public enum PluginTarget { * Run the {@link Plugin} on the leader replica. It would be started after the replica has taken * the leadership and would be stopped when the replica is about to release the leadership. */ - LEADER_ONLY + LEADER_ONLY, + /** + * Run the {@link Plugin} on the leader replica for a zone specified in {@link CentralDogmaConfig#zone()}. + * It would be started after the replica has taken the zone leadership and would be stopped when + * the replica is about to release the zone leadership. + */ + ZONE_LEADER_ONLY } diff --git a/server/src/test/java/com/linecorp/centraldogma/server/internal/api/auth/PermissionTest.java b/server/src/test/java/com/linecorp/centraldogma/server/internal/api/auth/PermissionTest.java index d6b9e9cf35..11c4961cf6 100644 --- a/server/src/test/java/com/linecorp/centraldogma/server/internal/api/auth/PermissionTest.java +++ b/server/src/test/java/com/linecorp/centraldogma/server/internal/api/auth/PermissionTest.java @@ -96,7 +96,7 @@ protected void configure(ServerBuilder sb) throws Exception { MoreExecutors.directExecutor(), NoopMeterRegistry.get(), null); final ServerStatusManager statusManager = new ServerStatusManager(dataDir); final CommandExecutor executor = new StandaloneCommandExecutor( - pm, ForkJoinPool.commonPool(), statusManager, null, null, null); + pm, ForkJoinPool.commonPool(), statusManager, null, null, null, null, null); executor.start().join(); new InternalProjectInitializer(executor, pm).initialize(); diff --git a/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/Replica.java b/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/Replica.java index 1a2ddd17d5..cf39759e66 100644 --- a/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/Replica.java +++ b/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/Replica.java @@ -68,7 +68,8 @@ final class Replica { final int id = spec.getServerId(); final ZooKeeperReplicationConfig zkCfg = new ZooKeeperReplicationConfig(id, servers); - commandExecutor = new ZooKeeperCommandExecutor(zkCfg, dataDir, new AbstractCommandExecutor(null, null) { + commandExecutor = new ZooKeeperCommandExecutor( + zkCfg, dataDir, new AbstractCommandExecutor(null, null, null, null) { @Override public int replicaId() { return id; @@ -78,18 +79,22 @@ public int replicaId() { public void setWriteQuota(String projectName, String repoName, QuotaConfig writeQuota) {} @Override - protected void doStart(@Nullable Runnable onTakeLeadership, - @Nullable Runnable onReleaseLeadership) {} + protected void doStart( + @Nullable Runnable onTakeLeadership, + @Nullable Runnable onReleaseLeadership, + @Nullable Runnable onTakeZoneLeadership, + @Nullable Runnable onReleaseZoneLeadership) {} @Override - protected void doStop(@Nullable Runnable onReleaseLeadership) {} + protected void doStop(@Nullable Runnable onReleaseLeadership, + @Nullable Runnable onReleaseZoneLeadership) {} @Override @SuppressWarnings("unchecked") protected CompletableFuture doExecute(Command command) { return (CompletableFuture) delegate.apply(command); } - }, meterRegistry, mock(ProjectManager.class), writeQuota, null, null); + }, meterRegistry, mock(ProjectManager.class), writeQuota, null, null, null, null, null); commandExecutor.setMetadataService(mockMetaService()); commandExecutor.setLockTimeoutMillis(10000); diff --git a/settings.gradle b/settings.gradle index 30f63381bf..2b35fe3a9b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -35,7 +35,8 @@ project(':it:it-mirror').projectDir = file('it/mirror') includeWithFlags ':it:it-server', 'java', 'relocate' // Set correct directory names project(':it:it-server').projectDir = file('it/server') -includeWithFlags ':testing-internal', 'java', 'relocate' +includeWithFlags ':it:zone-leader-plugin', 'java', 'relocate' +includeWithFlags ':testing-internal', 'java', 'relocate' // Unpublished projects includeWithFlags ':webapp', 'java11' diff --git a/site/src/sphinx/setup-configuration.rst b/site/src/sphinx/setup-configuration.rst index 88faedf428..53a1ef209a 100644 --- a/site/src/sphinx/setup-configuration.rst +++ b/site/src/sphinx/setup-configuration.rst @@ -64,7 +64,8 @@ defaults: "port": 36463, "protocol": null, "path": null - } + }, + "zone": null } Core properties @@ -262,6 +263,16 @@ Core properties - the path of the management service. If not specified, the management service is mounted at ``/internal/management``. +- ``zone`` (string) + + - the zone name of the server. If not specified, ``PluginTarget.ZONE_LEADER_ONLY`` can't be used. + + - If the value starts with ``env:``, the environment variable is used as the zone name. + For example, if the value is ``env:ZONE_NAME``, the environment variable named ``ZONE_NAME`` is used as the + zone name. + + - You can also dynamically load a zone name by implementing :api:`com.linecorp.centraldogma.server.ConfigValueConverter`. + .. _replication: Configuring replication diff --git a/testing-internal/build.gradle b/testing-internal/build.gradle index bd73b3a9fe..5e8df88e87 100644 --- a/testing-internal/build.gradle +++ b/testing-internal/build.gradle @@ -4,5 +4,6 @@ dependencies { api libs.assertj api libs.json.unit.fluent + api libs.junit.pioneer api libs.logback12 } diff --git a/testing-internal/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaReplicationExtension.java b/testing-internal/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaReplicationExtension.java index a6ae4c9024..76d52fcff7 100644 --- a/testing-internal/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaReplicationExtension.java +++ b/testing-internal/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaReplicationExtension.java @@ -119,6 +119,7 @@ protected void configure(CentralDogmaBuilder builder) { .pluginConfigs(new MirroringServicePluginConfig(false)) .gracefulShutdownTimeout(new GracefulShutdownTimeout(0, 0)) .replication(new ZooKeeperReplicationConfig(serverId, zooKeeperServers)); + configureEach(serverId, builder); } @Override @@ -195,6 +196,12 @@ protected void after(ExtensionContext context) throws Exception { } } + /** + * Override this method to configure each server of the Central Dogma cluster. + * @param serverId the ID of the server that starts from {@code 1} to {@link #numReplicas} + */ + protected void configureEach(int serverId, CentralDogmaBuilder builder) {} + public void start() throws InterruptedException { if (dogmaCluster == null) { throw new IllegalStateException("Central Dogma cluster is not created yet"); diff --git a/testing-internal/src/main/java/com/linecorp/centraldogma/testing/internal/ProjectManagerExtension.java b/testing-internal/src/main/java/com/linecorp/centraldogma/testing/internal/ProjectManagerExtension.java index 48cebbe3ef..8c352a16ec 100644 --- a/testing-internal/src/main/java/com/linecorp/centraldogma/testing/internal/ProjectManagerExtension.java +++ b/testing-internal/src/main/java/com/linecorp/centraldogma/testing/internal/ProjectManagerExtension.java @@ -146,6 +146,6 @@ protected ProjectManager newProjectManager(Executor repositoryWorker, Executor p */ protected CommandExecutor newCommandExecutor(ProjectManager projectManager, Executor worker, File dataDir) { return new StandaloneCommandExecutor(projectManager, worker, new ServerStatusManager(dataDir), null, - null, null, null); + null, null, null, null, null); } }