Skip to content

Commit

Permalink
Metaclient updater retry logic (#2805)
Browse files Browse the repository at this point in the history
Add retry logic to MetaClient Updater
  • Loading branch information
GrantPSpencer authored Jun 21, 2024
1 parent e1bf949 commit 3055f26
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,26 @@ public Stat (EntryMode mode, int version, long ctime, long mtime, long etime) {

/**
* Update existing data of a given key using an updater. This method will issue a read to get
* current data and apply updater upon the current data.
* current data and apply updater upon the current data. This method will NOT retry applying updated
* data upon failure.
* @param key key to identify the entry
* @param updater An updater that modifies the entry value.
* @return the updated value.
*/
T update(final String key, DataUpdater<T> updater);


/**
* Update existing data of a given key using an updater. This method will issue a read to get
* current data, apply the updater to that data, and write the result back conditioned on the
* version that was read.
* @param key key to identify the entry
* @param updater An updater that modifies the entry value. NOTE(review): when the entry is absent
*                and createIfAbsent is true, the ZooKeeper-backed implementation invokes the updater
*                with a null current value - confirm other implementations follow the same contract.
* @param retryOnFailure If true, the read-update-write cycle is re-attempted when the write fails
*                       because the entry was modified concurrently. If false, no retry is made.
* @param createIfAbsent If true, create the entry if it does not exist.
* @return the updated value.
*/
T update(final String key, DataUpdater<T> updater, boolean retryOnFailure, boolean createIfAbsent);

/**
* Check if there is an entry for the given key.
* @param key key to identify the entry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* under the License.
*/

import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -39,6 +40,7 @@
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.exception.MetaClientBadVersionException;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
Expand Down Expand Up @@ -207,16 +209,61 @@ public void set(String key, T data, int version) {

@Override
public T update(String key, DataUpdater<T> updater) {
org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
// TODO: add retry logic for ZkBadVersionException.
try {
T oldData = _zkClient.readData(key, stat);
T newData = updater.update(oldData);
set(key, newData, stat.getVersion());
return newData;
} catch (ZkException e) {
throw translateZkExceptionToMetaclientException(e);
}
return update(key, updater, false, false);
}

/**
 * Read-modify-write update of the entry at {@code key}, optionally retrying on concurrent
 * modification and optionally creating the entry when it is missing.
 *
 * <p>Each attempt reads the current data and stat, applies {@code updater}, and writes the result
 * back conditioned on the version that was read. At most 3 attempts are made in total.
 *
 * @param key key to identify the entry
 * @param updater computes the new value from the current one; it is called with {@code null} when
 *                the entry does not exist and {@code createIfAbsent} is true
 * @param retryOnFailure if true, a bad-version write (or a create that lost a race to another
 *                       creator) triggers another attempt; if false, that failure is thrown to the
 *                       caller on the first occurrence instead of being swallowed
 * @param createIfAbsent if true, a missing entry is created with the value returned by
 *                       {@code updater.update(null)}
 * @return the value that was written, or {@code null} when the entry was absent,
 *         {@code createIfAbsent} was true and the updater returned {@code null} (nothing is created)
 * @throws MetaClientBadVersionException if the conditional write fails and retries are disabled or
 *                                       exhausted
 * @throws MetaClientNoNodeException if the entry does not exist and {@code createIfAbsent} is false,
 *                                   or if creating the entry fails because its parent is missing
 * @throws ConcurrentModificationException if the entry keeps being created concurrently and retries
 *                                         are disabled or exhausted
 */
@Override
public T update(String key, DataUpdater<T> updater, boolean retryOnFailure, boolean createIfAbsent) {
  final int MAX_RETRY_ATTEMPTS = 3;
  int retryAttempts = 0;
  boolean retry;
  T updatedData = null;
  do {
    retry = false;
    retryAttempts++;
    try {
      ImmutablePair<T, Stat> tup = getDataAndStat(key);
      Stat stat = tup.right;
      T oldData = tup.left;
      T newData = updater.update(oldData);
      // Conditional write: fails with bad version if the entry changed since the read above.
      set(key, newData, stat.getVersion());
      updatedData = newData;
    } catch (MetaClientBadVersionException badVersionException) {
      // Surface the failure when retries are disabled or exhausted. Without the retryOnFailure
      // check the loop would exit normally and the caller would get null with no error.
      if (!retryOnFailure || retryAttempts >= MAX_RETRY_ATTEMPTS) {
        LOG.error("Failed to update node at {} after {} attempts.", key, retryAttempts);
        throw badVersionException;
      }
      // Retry on bad version
      retry = true;
    } catch (MetaClientNoNodeException noNodeException) {
      if (!createIfAbsent) {
        LOG.error("Failed to update node at {} as node does not exist. createIfAbsent was {}.", key, createIfAbsent);
        throw noNodeException;
      }
      // If node does not exist, attempt to create it - pass null to updater
      T newData = updater.update(null);
      if (newData != null) {
        try {
          // If the parent node for key does not exist, create() throws a NoNodeException that is
          // not caught here, so the update fails immediately.
          create(key, newData);
          updatedData = newData;
        } catch (MetaClientNodeExistsException nodeExistsException) {
          // Another writer created the node between our read and our create. Surface this as a
          // ConcurrentModificationException when retries are disabled or exhausted.
          if (!retryOnFailure || retryAttempts >= MAX_RETRY_ATTEMPTS) {
            LOG.error("Failed to update node at {} after {} attempts.", key, retryAttempts);
            throw new ConcurrentModificationException("Failed to update node at " + key + " after " +
                retryAttempts + " attempts.", nodeExistsException);
          }
          // If node now exists, then retry update
          retry = true;
        } catch (ZkException e) {
          throw translateZkExceptionToMetaclientException(e);
        }
      }
    }
    // retry is only ever set when retryOnFailure is true and attempts remain.
  } while (retry);
  return updatedData;
}

//TODO: Get Expiry Time in Stat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.DataUpdater;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.exception.MetaClientBadVersionException;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.api.DirectChildChangeListener;

import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -222,33 +221,168 @@ public void testUpdate() {
try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) {
zkMetaClient.connect();
int initValue = 3;
DataUpdater<Integer> updater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
return currentData != null ? currentData + 1 : initValue;
}
};

zkMetaClient.create(key, initValue);
MetaClientInterface.Stat entryStat = zkMetaClient.exists(key);
Assert.assertEquals(entryStat.getVersion(), 0);

// test update() and validate entry value and version
Integer newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
// Test updater basic success
for (int i = 1; i < 3; i++) {
Integer newData = zkMetaClient.update(key, updater);
Assert.assertEquals((int) newData, initValue + i);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), i);
}

// Cleanup
zkMetaClient.delete(key);
}
}

@Test
public void testUpdateWithRetry() throws InterruptedException {
final boolean RETRY_ON_FAILURE = true;
final boolean CREATE_IF_ABSENT = true;
final String key = "/TestZkMetaClient_testUpdateWithRetry";
ZkMetaClientConfig config =
new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) {
zkMetaClient.connect();
int initValue = 3;
// Basic updater that increments node value by 1, starting at initValue
DataUpdater<Integer> basicUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
return currentData + 1;
return currentData != null ? currentData + 1 : initValue;
}
});
Assert.assertEquals((int) newData, (int) initValue + 1);
};

entryStat = zkMetaClient.exists(key);
Assert.assertEquals(entryStat.getVersion(), 1);
// Test updater fails create node if it doesn't exist when createIfAbsent is false
try {
zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, false);
Assert.fail("Updater should have thrown error");
} catch (MetaClientNoNodeException e) {
Assert.assertFalse(zkMetaClient.exists(key) != null);
}

newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
// Test updater fails when parent path does not exist
try {
zkMetaClient.update(key + "/child", basicUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.fail("Updater should have thrown error");
} catch (MetaClientNoNodeException e) {
Assert.assertFalse(zkMetaClient.exists(key + "/child") != null);
}

// Test updater creates node if it doesn't exist when createIfAbsent is true
Integer newData = zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.assertEquals((int) newData, initValue);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 0);

// Cleanup
zkMetaClient.delete(key);

AtomicBoolean latch = new AtomicBoolean();

// Increments znode version and sets latch value to true
DataUpdater<Integer> versionIncrementUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
return currentData + 1;
latch.set(true);
return currentData;
}
});
};

entryStat = zkMetaClient.exists(key);
Assert.assertEquals(entryStat.getVersion(), 2);
Assert.assertEquals((int) newData, (int) initValue + 2);
// Reads znode, calls versionIncrementUpdater, fails to update due to bad version, then retries and should succeed
DataUpdater<Integer> failsOnceUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
try {
while (!latch.get()) {
zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
}
return currentData != null ? currentData + 1 : initValue;
} catch (MetaClientException e) {
return -1;
}
}
};

// Always fails to update due to bad version
DataUpdater<Integer> alwaysFailLatchedUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
try {
latch.set(false);
while (!latch.get()) {
zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
}
return currentData != null ? currentData + 1 : initValue;
} catch (MetaClientException e) {
return -1;
}
}
};

// Updater reads znode, sees it does not exist and attempts to create it, but should fail as znode already created
// due to create() call in updater. Should then retry and successfully update the node.
DataUpdater<Integer> failOnFirstCreateLatchedUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
try {
if (!latch.get()) {
zkMetaClient.create(key, initValue);
latch.set(true);
}
return currentData != null ? currentData + 1 : initValue;
} catch (MetaClientException e) {
return -1;
}
}
};

// Throws error when update called
DataUpdater<Integer> errorUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
throw new RuntimeException("IGNORABLE: Test dataUpdater correctly throws exception");
}
};

// Reset latch
latch.set(false);
// Test updater retries on bad version
// Latched updater should read znode at version 0, but attempt to write to version 1 which fails. Should retry
// and increment version to 2
zkMetaClient.create(key, initValue);
zkMetaClient.update(key, failsOnceUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 2);

// Reset latch
latch.set(false);
// Test updater fails on retries exceeded
try {
zkMetaClient.update(key, alwaysFailLatchedUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.fail("Updater should have thrown error");
} catch (MetaClientBadVersionException e) {}


// Test updater throws error
try {
zkMetaClient.update(key, errorUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.fail("DataUpdater should have thrown error");
} catch (RuntimeException e) {}

// Reset latch and cleanup old node
latch.set(false);
zkMetaClient.delete(key);
// Test updater retries update if node does not exist on read, but then exists when updater attempts to create it
zkMetaClient.update(key, failOnFirstCreateLatchedUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 1);
zkMetaClient.delete(key);
}
}
Expand Down

0 comments on commit 3055f26

Please sign in to comment.