Skip to content

Commit

Permalink
Provide a way to run a Plugin in a zone leader replica (#1044)
Browse files Browse the repository at this point in the history
Motivation:

When Central Dogma works on multiple IDC, we may need to run a `Plugin`
in a specific zone due to network latency, firewall policy, or load
balancing.

Modifications:

- Add `PluginTarget.ZONE_LEADER_ONLY` to run a `Plugin` in each zone
leader.
- Add `ZoneProvider` interface to dynamically resolve a zone name.
- Allow setting `zone` via `dogma.json` and `CentralDogmaBuilder`.
- If the value of `zone` starts with `classpath:`, the class is loaded
and cast to `ZoneProvider`.
- If the value starts with `$`, the value is read from the environment
variable.
- Add `onTakeZoneLeadership` and `onReleaseZoneLeadership` to the
methods and fields where `onTakeLeadership` and `onReleaseLeadership`
were defined before.
  - `/leader/{zone}` is used to take the zone leadership.

Result:

You can now run a `Plugin` in a zone leader with the following:

- `dogma.json`
  ```json
  {
     "zone": "my-zone" 
  }
  ```
- `Plugin`
  ```java
  class ZoneLeaderPlugin implements Plugin {

    @OverRide
    public PluginTarget target() {
      return PluginTarget.ZONE_LEADER_ONLY;
    }
  }
  ```
  • Loading branch information
ikhoon authored Nov 7, 2024
1 parent 755b3b4 commit 8ca35bf
Show file tree
Hide file tree
Showing 24 changed files with 583 additions and 69 deletions.
6 changes: 6 additions & 0 deletions dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ jgit = "5.13.3.202401111512-r"
jgit6 = "6.10.0.202406032230-r"
junit4 = "4.13.2"
junit5 = "5.11.0"
# Don't upgrade junit-pioneer to 2.x.x that requires Java 11
junit-pioneer = "1.9.1"
jsch = "0.1.55"
# Don't update `json-path` version
json-path = "2.2.0"
Expand Down Expand Up @@ -299,6 +301,10 @@ module = "org.junit.platform:junit-platform-commons"
[libraries.junit5-platform-launcher]
module = "org.junit.platform:junit-platform-launcher"

[libraries.junit-pioneer]
module = "org.junit-pioneer:junit-pioneer"
version.ref = "junit-pioneer"

[libraries.kubernetes-client-api]
module = "io.fabric8:kubernetes-client-api"
version.ref = "kubernetes-client"
Expand Down
7 changes: 7 additions & 0 deletions it/server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,10 @@ dependencies {

testImplementation libs.curator.test
}

// To use @SetEnvironmentVariable
if (project.ext.testJavaVersion >= 16) {
tasks.withType(Test) {
jvmArgs '--add-opens=java.base/java.lang=ALL-UNNAMED', '--add-opens=java.base/java.util=ALL-UNNAMED'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.SetEnvironmentVariable;

import com.linecorp.centraldogma.server.CentralDogmaConfig;

Expand All @@ -30,4 +31,12 @@ void convert() {
assertThat(CentralDogmaConfig.convertValue("valid_prefix:value", "property"))
.isEqualTo("valid_value");
}

@SetEnvironmentVariable(key = "ZONE", value = "ZONE_A")
@SetEnvironmentVariable(key = "MY_ZONE", value = "ZONE_B")
@Test
void environmentVariable() {
assertThat(CentralDogmaConfig.convertValue("env:ZONE", "zone")).isEqualTo("ZONE_A");
assertThat(CentralDogmaConfig.convertValue("env:MY_ZONE", "zone")).isEqualTo("ZONE_B");
}
}
4 changes: 4 additions & 0 deletions it/zone-leader-plugin/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dependencies {
testImplementation project(':server')
testImplementation libs.curator.test
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.it.zoneleader;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.linecorp.armeria.common.util.UnmodifiableFuture;
import com.linecorp.centraldogma.server.CentralDogmaBuilder;
import com.linecorp.centraldogma.server.plugin.Plugin;
import com.linecorp.centraldogma.server.plugin.PluginContext;
import com.linecorp.centraldogma.server.plugin.PluginTarget;
import com.linecorp.centraldogma.testing.internal.CentralDogmaReplicationExtension;

class ZoneLeaderPluginTest {

private static final List<ZoneLeaderTestPlugin> plugins = new ArrayList<>();
private static final int NUM_REPLICAS = 9;

@RegisterExtension
static CentralDogmaReplicationExtension cluster = new CentralDogmaReplicationExtension(NUM_REPLICAS) {
@Override
protected void configureEach(int serverId, CentralDogmaBuilder builder) {
if (serverId <= 3) {
builder.zone("zone1");
} else if (serverId <= 6) {
builder.zone("zone2");
} else {
builder.zone("zone3");
}
final ZoneLeaderTestPlugin plugin = new ZoneLeaderTestPlugin(serverId);
plugins.add(plugin);
builder.plugins(plugin);
}
};

@AfterAll
static void afterAll() {
plugins.clear();
}

@Test
void shouldSelectZoneLeaderOnly() throws InterruptedException {
assertZoneLeaderSelection();
final List<ZoneLeaderTestPlugin> zone1 = plugins.subList(0, 3);
final int zone1LeaderId = zoneLeaderId(zone1);
// Zone leadership should be released when the leader goes down.
cluster.servers().get(zone1LeaderId - 1).stopAsync().join();
// Wait for the new zone leader to be selected.
Thread.sleep(500);
assertZoneLeaderSelection();
final int newZone1LeaderId = zoneLeaderId(zone1);
assertThat(newZone1LeaderId).isNotEqualTo(zone1LeaderId);

final List<ZoneLeaderTestPlugin> zone2 = plugins.subList(3, 6);
final int zone2LeaderId = zoneLeaderId(zone2);
cluster.servers().get(zone2LeaderId - 1).stopAsync().join();
// Wait for the new zone leader to be selected.
Thread.sleep(500);
assertZoneLeaderSelection();
final int newZone2LeaderId = zoneLeaderId(zone2);
assertThat(newZone2LeaderId).isNotEqualTo(zone2LeaderId);
}

private static int zoneLeaderId(List<ZoneLeaderTestPlugin> plugins) {
return plugins.stream()
.filter(ZoneLeaderTestPlugin::isStarted)
.mapToInt(p -> p.serverId)
.findFirst()
.getAsInt();
}

/**
* Make sure that only zone leaders start {@link ZoneLeaderTestPlugin}.
*/
private static void assertZoneLeaderSelection() {
for (int i = 0; i < NUM_REPLICAS; i += 3) {
final List<ZoneLeaderTestPlugin> zonePlugins = plugins.subList(i, i + 3);
await().untilAsserted(() -> {
assertThat(zonePlugins.stream().filter(ZoneLeaderTestPlugin::isStarted)).hasSize(1);
});
}
}

private static final class ZoneLeaderTestPlugin implements Plugin {

private final int serverId;
private boolean started;

private ZoneLeaderTestPlugin(int serverId) {
this.serverId = serverId;
}

@Override
public PluginTarget target() {
return PluginTarget.ZONE_LEADER_ONLY;
}

boolean isStarted() {
return started;
}

@Override
public CompletionStage<Void> start(PluginContext context) {
started = true;
return UnmodifiableFuture.completedFuture(null);
}

@Override
public CompletionStage<Void> stop(PluginContext context) {
started = false;
return UnmodifiableFuture.completedFuture(null);
}

@Override
public Class<?> configType() {
return getClass();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.server;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

class DynamicZoneResolverTest {

@Test
void shouldUseCustomConfigValueConverter() {
final String zone = CentralDogmaConfig.convertValue("zone:dummy", "zone");
assertThat(zone).isEqualTo("ZONE_C");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.server;

import java.util.List;

import com.google.common.collect.ImmutableList;

public final class MyZoneProvider implements ConfigValueConverter {

@Override
public List<String> supportedPrefixes() {
return ImmutableList.of("zone");
}

@Override
public String convert(String prefix, String value) {
return "ZONE_C";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.linecorp.centraldogma.server.MyZoneProvider
Loading

0 comments on commit 8ca35bf

Please sign in to comment.