Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metaclient updater retry logic #2805

Merged
merged 4 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,26 @@ public Stat (EntryMode mode, int version, long ctime, long mtime, long etime) {

/**
* Update existing data of a given key using an updater. This method will issue a read to get
* current data and apply updater upon the current data.
* current data and apply updater upon the current data. This method will NOT retry applying updated
* data upon failure.
* @param key key to identify the entry
* @param updater An updater that modifies the entry value.
* @return the updated value.
*/
T update(final String key, DataUpdater<T> updater);


/**
* Update existing data of a given key using an updater. This method will issue a read to get
* current data and apply updater upon the current data.
* @param key key to identify the entry
* @param updater An updater that modifies the entry value.
* @param retryOnFailure If true, updater should retry applying updated data upon failure.
* @param createIfAbsent If true, create the entry if it does not exist.
* @return the updated value.
*/
T update(final String key, DataUpdater<T> updater, boolean retryOnFailure, boolean createIfAbsent);

/**
* Check if there is an entry for the given key.
* @param key key to identify the entry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* under the License.
*/

import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -39,6 +40,7 @@
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.exception.MetaClientBadVersionException;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
Expand Down Expand Up @@ -207,16 +209,61 @@ public void set(String key, T data, int version) {

@Override
public T update(String key, DataUpdater<T> updater) {
org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
// TODO: add retry logic for ZkBadVersionException.
try {
T oldData = _zkClient.readData(key, stat);
T newData = updater.update(oldData);
set(key, newData, stat.getVersion());
return newData;
} catch (ZkException e) {
throw translateZkExceptionToMetaclientException(e);
}
return update(key, updater, false, false);
}

@Override
public T update(String key, DataUpdater<T> updater, boolean retryOnFailure, boolean createIfAbsent) {
final int MAX_RETRY_ATTEMPTS = 3;
int retryAttempts = 0;
boolean retry;
T updatedData = null;
do {
retry = false;
retryAttempts++;
try {
ImmutablePair<T, Stat> tup = getDataAndStat(key);
Stat stat = tup.right;
T oldData = tup.left;
T newData = updater.update(oldData);
set(key, newData, stat.getVersion());
updatedData = newData;
} catch (MetaClientBadVersionException badVersionException) {
// If exceeded max retry attempts, re-throw exception
if (retryAttempts >= MAX_RETRY_ATTEMPTS) {
LOG.error("Failed to update node at {} after {} attempts.", key, MAX_RETRY_ATTEMPTS);
throw badVersionException;
}
// Retry on bad version
retry = true;
} catch (MetaClientNoNodeException noNodeException) {
if (!createIfAbsent) {
LOG.error("Failed to update node at {} as node does not exist. createIfAbsent was {}.", key, createIfAbsent);
throw noNodeException;
}
// If node does not exist, attempt to create it - pass null to updater
T newData = updater.update(null);
if (newData != null) {
try {
create(key, newData);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for creating the PR!
Question about the retry logic here.
For example, when user try to update node "/a/b/c", and there is no node "/a/b", set in line 228 with throw NoNodeException, and we reach line 243. Since there is no node "/a/b", create will also throw NoNodeException, and we keep retry until expire. Should we do recursive create instead? Or we just let it expire and forward the NoNodeException?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really good question. I am leaning towards not recursively creating, because that is the same behavior as helix zk client. However, if this API's contract with customers is "give me a node path, and I will make sure that it has the data you want" then I think there's a good argument for making our create attempt recursive rather than just on the leaf node. cc @junkaixue do you have thoughts on this?

And to clarify, this is the current flow if you tried to update path /a/b/c when no nodes in that path exist yet. (It should fail before retrying update)
try to update /a/b/c --> MetaClientNoNodeException is caught
try to create /a/b/ --> MetaClientNoNodeException this is not caught and so immediately fails
we do not retry update as method failed due to above exception

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am also thinking of no creation. If this is update, it supposed to be updating something was there. It has the possibility that the node just be deleted recursively before update. Then update should be an invalidate operation and fail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would second no creation as well. Say if we try to update "/a/b/c" and get no node exception, we just fail immediately.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spoke with @xyuanlu offline. Added createIfAbsent parameter. Base functionality is to not create the node, but will attempt to create leaf node if it is not present. It will not recursively create and will fail on first create() call if parent path does not exist.

Added relevant test cases to cover this
cc @junkaixue

updatedData = newData;
// If parent node for key does not exist, then updater will immediately fail due to uncaught NoNodeException
} catch (MetaClientNodeExistsException nodeExistsException) {
// If exceeded max retry attempts, cast to ConcurrentModification exception and re-throw.
if (retryAttempts >= MAX_RETRY_ATTEMPTS) {
LOG.error("Failed to update node at {} after {} attempts.", key, MAX_RETRY_ATTEMPTS);
throw new ConcurrentModificationException("Failed to update node at " + key + " after " +
MAX_RETRY_ATTEMPTS + " attempts.", nodeExistsException);
}
// If node now exists, then retry update
retry = true;
} catch (ZkException e) {
throw translateZkExceptionToMetaclientException(e);
}
}
}
} while (retryOnFailure && retry);
return updatedData;
}

//TODO: Get Expiry Time in Stat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.DataUpdater;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.exception.MetaClientBadVersionException;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.api.DirectChildChangeListener;

import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -222,33 +221,168 @@ public void testUpdate() {
try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) {
zkMetaClient.connect();
int initValue = 3;
DataUpdater<Integer> updater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
return currentData != null ? currentData + 1 : initValue;
}
};

zkMetaClient.create(key, initValue);
MetaClientInterface.Stat entryStat = zkMetaClient.exists(key);
Assert.assertEquals(entryStat.getVersion(), 0);

// test update() and validate entry value and version
Integer newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
// Test updater basic success
for (int i = 1; i < 3; i++) {
Integer newData = zkMetaClient.update(key, updater);
Assert.assertEquals((int) newData, initValue + i);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), i);
}

// Cleanup
zkMetaClient.delete(key);
}
}

@Test
public void testUpdateWithRetry() throws InterruptedException {
final boolean RETRY_ON_FAILURE = true;
final boolean CREATE_IF_ABSENT = true;
final String key = "/TestZkMetaClient_testUpdateWithRetry";
ZkMetaClientConfig config =
new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) {
zkMetaClient.connect();
int initValue = 3;
// Basic updater that increments node value by 1, starting at initValue
DataUpdater<Integer> basicUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
return currentData + 1;
return currentData != null ? currentData + 1 : initValue;
}
});
Assert.assertEquals((int) newData, (int) initValue + 1);
};

entryStat = zkMetaClient.exists(key);
Assert.assertEquals(entryStat.getVersion(), 1);
// Test updater fails create node if it doesn't exist when createIfAbsent is false
try {
zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, false);
Assert.fail("Updater should have thrown error");
} catch (MetaClientNoNodeException e) {
Assert.assertFalse(zkMetaClient.exists(key) != null);
}

newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
// Test updater fails when parent path does not exist
try {
zkMetaClient.update(key + "/child", basicUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.fail("Updater should have thrown error");
} catch (MetaClientNoNodeException e) {
Assert.assertFalse(zkMetaClient.exists(key + "/child") != null);
}

// Test updater creates node if it doesn't exist when createIfAbsent is true
Integer newData = zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.assertEquals((int) newData, initValue);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 0);

// Cleanup
zkMetaClient.delete(key);

AtomicBoolean latch = new AtomicBoolean();

// Increments znode version and sets latch value to true
DataUpdater<Integer> versionIncrementUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
return currentData + 1;
latch.set(true);
return currentData;
}
});
};

entryStat = zkMetaClient.exists(key);
Assert.assertEquals(entryStat.getVersion(), 2);
Assert.assertEquals((int) newData, (int) initValue + 2);
// Reads znode, calls versionIncrementUpdater, fails to update due to bad version, then retries and should succeed
DataUpdater<Integer> failsOnceUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
try {
while (!latch.get()) {
zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
}
return currentData != null ? currentData + 1 : initValue;
} catch (MetaClientException e) {
return -1;
}
}
};

// Always fails to update due to bad version
DataUpdater<Integer> alwaysFailLatchedUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
try {
latch.set(false);
while (!latch.get()) {
zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
}
return currentData != null ? currentData + 1 : initValue;
} catch (MetaClientException e) {
return -1;
}
}
};

// Updater reads znode, sees it does not exist and attempts to create it, but should fail as znode already created
// due to create() call in updater. Should then retry and successfully update the node.
DataUpdater<Integer> failOnFirstCreateLatchedUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
try {
if (!latch.get()) {
zkMetaClient.create(key, initValue);
latch.set(true);
}
return currentData != null ? currentData + 1 : initValue;
} catch (MetaClientException e) {
return -1;
}
}
};

// Throws error when update called
DataUpdater<Integer> errorUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
throw new RuntimeException("IGNORABLE: Test dataUpdater correctly throws exception");
}
};

// Reset latch
latch.set(false);
// Test updater retries on bad version
// Latched updater should read znode at version 0, but attempt to write to version 1 which fails. Should retry
// and increment version to 2
zkMetaClient.create(key, initValue);
zkMetaClient.update(key, failsOnceUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 2);

// Reset latch
latch.set(false);
// Test updater fails on retries exceeded
try {
zkMetaClient.update(key, alwaysFailLatchedUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.fail("Updater should have thrown error");
} catch (MetaClientBadVersionException e) {}


// Test updater throws error
try {
zkMetaClient.update(key, errorUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.fail("DataUpdater should have thrown error");
} catch (RuntimeException e) {}

// Reset latch and cleanup old node
latch.set(false);
zkMetaClient.delete(key);
// Test updater retries update if node does not exist on read, but then exists when updater attempts to create it
zkMetaClient.update(key, failOnFirstCreateLatchedUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1);
Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 1);
zkMetaClient.delete(key);
}
}
Expand Down
Loading