From 3055f26aa4f040f549ca91581fc6b2ff6b275f33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grant=20Pal=C3=A1u=20Spencer?= Date: Fri, 21 Jun 2024 16:01:09 -0700 Subject: [PATCH] Metaclient updater retry logic (#2805) Add retry logic to MetaClient Updater --- .../metaclient/api/MetaClientInterface.java | 15 +- .../metaclient/impl/zk/ZkMetaClient.java | 67 +++++-- .../metaclient/impl/zk/TestZkMetaClient.java | 168 ++++++++++++++++-- 3 files changed, 222 insertions(+), 28 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java index 5403e7ca44..c8a1d0d36d 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java @@ -196,13 +196,26 @@ public Stat (EntryMode mode, int version, long ctime, long mtime, long etime) { /** * Update existing data of a given key using an updater. This method will issue a read to get - * current data and apply updater upon the current data. + * current data and apply updater upon the current data. This method will NOT retry applying updated + * data upon failure. * @param key key to identify the entry * @param updater An updater that modifies the entry value. * @return the updated value. */ T update(final String key, DataUpdater updater); + + /** + * Update existing data of a given key using an updater. This method will issue a read to get + * current data and apply updater upon the current data. + * @param key key to identify the entry + * @param updater An updater that modifies the entry value. + * @param retryOnFailure If true, updater should retry applying updated data upon failure. + * @param createIfAbsent If true, create the entry if it does not exist. + * @return the updated value. 
+ */ + T update(final String key, DataUpdater updater, boolean retryOnFailure, boolean createIfAbsent); + /** * Check if there is an entry for the given key. * @param key key to identify the entry diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 16d28c6d7a..f4594d5d6f 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -19,6 +19,7 @@ * under the License. */ +import java.util.ConcurrentModificationException; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -39,6 +40,7 @@ import org.apache.helix.metaclient.api.MetaClientInterface; import org.apache.helix.metaclient.api.Op; import org.apache.helix.metaclient.api.OpResult; +import org.apache.helix.metaclient.exception.MetaClientBadVersionException; import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.exception.MetaClientNoNodeException; import org.apache.helix.metaclient.exception.MetaClientNodeExistsException; @@ -207,16 +209,61 @@ public void set(String key, T data, int version) { @Override public T update(String key, DataUpdater updater) { - org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat(); - // TODO: add retry logic for ZkBadVersionException. 
- try { - T oldData = _zkClient.readData(key, stat); - T newData = updater.update(oldData); - set(key, newData, stat.getVersion()); - return newData; - } catch (ZkException e) { - throw translateZkExceptionToMetaclientException(e); - } + return update(key, updater, false, false); + } + + @Override + public T update(String key, DataUpdater updater, boolean retryOnFailure, boolean createIfAbsent) { + final int MAX_RETRY_ATTEMPTS = 3; + int retryAttempts = 0; + boolean retry; + T updatedData = null; + do { + retry = false; + retryAttempts++; + try { + ImmutablePair tup = getDataAndStat(key); + Stat stat = tup.right; + T oldData = tup.left; + T newData = updater.update(oldData); + set(key, newData, stat.getVersion()); + updatedData = newData; + } catch (MetaClientBadVersionException badVersionException) { + // If retries are disabled or max retry attempts exceeded, re-throw instead of silently returning null + if (!retryOnFailure || retryAttempts >= MAX_RETRY_ATTEMPTS) { + LOG.error("Failed to update node at {} after {} attempts.", key, retryAttempts); + throw badVersionException; + } + // Retry on bad version + retry = true; + } catch (MetaClientNoNodeException noNodeException) { + if (!createIfAbsent) { + LOG.error("Failed to update node at {} as node does not exist. createIfAbsent was {}.", key, createIfAbsent); + throw noNodeException; + } + // If node does not exist, attempt to create it - pass null to updater + T newData = updater.update(null); + if (newData != null) { + try { + create(key, newData); + updatedData = newData; + // If parent node for key does not exist, then updater will immediately fail due to uncaught NoNodeException + } catch (MetaClientNodeExistsException nodeExistsException) { + // If retries are disabled or max retry attempts exceeded, wrap in ConcurrentModificationException and re-throw.
+ if (!retryOnFailure || retryAttempts >= MAX_RETRY_ATTEMPTS) { + LOG.error("Failed to update node at {} after {} attempts.", key, retryAttempts); + throw new ConcurrentModificationException("Failed to update node at " + key + " after " + + retryAttempts + " attempts.", nodeExistsException); + } + // If node now exists, then retry update + retry = true; + } catch (ZkException e) { + throw translateZkExceptionToMetaclientException(e); + } + } + } + } while (retryOnFailure && retry); + return updatedData; } //TODO: Get Expiry Time in Stat diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index a5da69f2f1..2919893ff8 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -23,11 +23,10 @@ import org.apache.helix.metaclient.api.ChildChangeListener; import org.apache.helix.metaclient.api.DataUpdater; import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.helix.metaclient.exception.MetaClientBadVersionException; import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.api.DirectChildChangeListener; -import java.io.StringWriter; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -222,33 +221,168 @@ public void testUpdate() { try (ZkMetaClient zkMetaClient = new ZkMetaClient<>(config)) { zkMetaClient.connect(); int initValue = 3; + DataUpdater updater = new DataUpdater() { + @Override + public Integer update(Integer currentData) { + return currentData != null ? 
currentData + 1 : initValue; + } + }; + zkMetaClient.create(key, initValue); - MetaClientInterface.Stat entryStat = zkMetaClient.exists(key); - Assert.assertEquals(entryStat.getVersion(), 0); - // test update() and validate entry value and version - Integer newData = zkMetaClient.update(key, new DataUpdater() { + // Test updater basic success: each update increments both the value and the znode version + for (int i = 1; i < 3; i++) { + Integer newData = zkMetaClient.update(key, updater); + Assert.assertEquals((int) newData, initValue + i); + Assert.assertEquals(zkMetaClient.exists(key).getVersion(), i); + } + + // Cleanup + zkMetaClient.delete(key); + } + } + + @Test + public void testUpdateWithRetry() throws InterruptedException { + final boolean RETRY_ON_FAILURE = true; + final boolean CREATE_IF_ABSENT = true; + final String key = "/TestZkMetaClient_testUpdateWithRetry"; + ZkMetaClientConfig config = + new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build(); + try (ZkMetaClient zkMetaClient = new ZkMetaClient<>(config)) { + zkMetaClient.connect(); + int initValue = 3; + // Basic updater that increments node value by 1, starting at initValue + DataUpdater basicUpdater = new DataUpdater() { @Override public Integer update(Integer currentData) { - return currentData + 1; + return currentData != null ? 
currentData + 1 : initValue; + } - }); - Assert.assertEquals((int) newData, (int) initValue + 1); + }; - entryStat = zkMetaClient.exists(key); - Assert.assertEquals(entryStat.getVersion(), 1); + // Test updater throws and does not create the node if it doesn't exist when createIfAbsent is false + try { + zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, false); + Assert.fail("Updater should have thrown error"); + } catch (MetaClientNoNodeException e) { + Assert.assertFalse(zkMetaClient.exists(key) != null); + } - newData = zkMetaClient.update(key, new DataUpdater() { + // Test updater fails when parent path does not exist + try { + zkMetaClient.update(key + "/child", basicUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); + Assert.fail("Updater should have thrown error"); + } catch (MetaClientNoNodeException e) { + Assert.assertFalse(zkMetaClient.exists(key + "/child") != null); + } + // Test updater creates node if it doesn't exist when createIfAbsent is true + Integer newData = zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); + Assert.assertEquals((int) newData, initValue); + Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 0); + + // Cleanup + zkMetaClient.delete(key); + + AtomicBoolean latch = new AtomicBoolean(); + + // Writes the current value back unchanged (bumping the znode version) and sets latch value to true + DataUpdater versionIncrementUpdater = new DataUpdater() { @Override public Integer update(Integer currentData) { - return currentData + 1; + latch.set(true); + return currentData; } - }); + }; - entryStat = zkMetaClient.exists(key); - Assert.assertEquals(entryStat.getVersion(), 2); - Assert.assertEquals((int) newData, (int) initValue + 2); + // Reads znode, calls versionIncrementUpdater, fails to update due to bad version, then retries and should succeed + DataUpdater failsOnceUpdater = new DataUpdater() { + @Override + public Integer update(Integer currentData) { + try { + while (!latch.get()) { + zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE, 
CREATE_IF_ABSENT); + } + return currentData != null ? currentData + 1 : initValue; + } catch (MetaClientException e) { + return -1; + } + } + }; + + // Resets the latch on every call, so each attempt bumps the znode version first and always fails due to bad version + DataUpdater alwaysFailLatchedUpdater = new DataUpdater() { + @Override + public Integer update(Integer currentData) { + try { + latch.set(false); + while (!latch.get()) { + zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); + } + return currentData != null ? currentData + 1 : initValue; + } catch (MetaClientException e) { + return -1; + } + } + }; + + // Updater reads znode, sees it does not exist and attempts to create it, but should fail as znode already created + // due to create() call in updater. Should then retry and successfully update the node. + DataUpdater failOnFirstCreateLatchedUpdater = new DataUpdater() { + @Override + public Integer update(Integer currentData) { + try { + if (!latch.get()) { + zkMetaClient.create(key, initValue); + latch.set(true); + } + return currentData != null ? currentData + 1 : initValue; + } catch (MetaClientException e) { + return -1; + } + } + }; + + // Throws error when update called + DataUpdater errorUpdater = new DataUpdater() { + @Override + public Integer update(Integer currentData) { + throw new RuntimeException("IGNORABLE: Test dataUpdater correctly throws exception"); + } + }; + + // Reset latch + latch.set(false); + // Test updater retries on bad version + // Latched updater should read znode at version 0, but the nested update bumps it to version 1, so the write fails.
Should retry + // and increment version to 2 + zkMetaClient.create(key, initValue); + zkMetaClient.update(key, failsOnceUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); + Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1); + Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 2); + + // Reset latch + latch.set(false); + // Test updater re-throws the bad version exception once retries are exhausted + try { + zkMetaClient.update(key, alwaysFailLatchedUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); + Assert.fail("Updater should have thrown error"); + } catch (MetaClientBadVersionException e) {} + + + // Test exception thrown by the DataUpdater propagates to the caller + try { + zkMetaClient.update(key, errorUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); + Assert.fail("DataUpdater should have thrown error"); + } catch (RuntimeException e) {} + + // Reset latch and cleanup old node + latch.set(false); + zkMetaClient.delete(key); + // Test updater retries update if node does not exist on read, but then exists when updater attempts to create it + zkMetaClient.update(key, failOnFirstCreateLatchedUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); + Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1); + Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 1); zkMetaClient.delete(key); } }