From 118f2b6c8349a361d46802230a7a409665c12d3e Mon Sep 17 00:00:00 2001 From: Grant Palau Spencer Date: Thu, 16 May 2024 22:08:36 -0700 Subject: [PATCH 1/4] add metaclient updater retry logic --- .../metaclient/impl/zk/ZkMetaClient.java | 42 ++++++--- .../metaclient/impl/zk/TestZkMetaClient.java | 87 +++++++++++++++---- 2 files changed, 102 insertions(+), 27 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 16d28c6d7a..d422fddfcd 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -39,6 +39,7 @@ import org.apache.helix.metaclient.api.MetaClientInterface; import org.apache.helix.metaclient.api.Op; import org.apache.helix.metaclient.api.OpResult; +import org.apache.helix.metaclient.exception.MetaClientBadVersionException; import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.exception.MetaClientNoNodeException; import org.apache.helix.metaclient.exception.MetaClientNodeExistsException; @@ -58,6 +59,7 @@ import org.apache.helix.zookeeper.zkclient.IZkStateListener; import org.apache.helix.zookeeper.zkclient.ZkConnection; import org.apache.helix.zookeeper.zkclient.exception.ZkException; +import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; @@ -207,16 +209,36 @@ public void set(String key, T data, int version) { @Override public T update(String key, DataUpdater updater) { - org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat(); - // TODO: add retry logic for ZkBadVersionException. - try { - T oldData = _zkClient.readData(key, stat); - T newData = updater.update(oldData); - set(key, newData, stat.getVersion()); - return newData; - } catch (ZkException e) { - throw translateZkExceptionToMetaclientException(e); - } + boolean retry; + T updatedData = null; + do { + retry = false; + try { + org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat(); + T oldData = _zkClient.readData(key, stat); + T newData = updater.update(oldData); + set(key, newData, stat.getVersion()); + updatedData = newData; + } catch (MetaClientBadVersionException badVersionException) { + // Retry on bad version + retry = true; + } catch (ZkNoNodeException noNodeException) { + // If node does not exist, attempt to create it - pass null to updater + T newData = updater.update(null); + if (newData != null) { + try { + create(key, newData); + updatedData = newData; + } catch (MetaClientNodeExistsException nodeExistsException) { + // If node now exists, then retry update + retry = true; + } catch (ZkException e) { + throw translateZkExceptionToMetaclientException(e); + } + } + } + } while (retry); + return updatedData; } //TODO: Get Expiry Time in Stat diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index a5da69f2f1..9302385141 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -215,40 +215,93 @@ public void testSet() { } @Test - public void testUpdate() { + public void testUpdate() throws InterruptedException { + int testIterationCount = 2; final String key = "/TestZkMetaClient_testUpdate"; ZkMetaClientConfig config = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build(); try (ZkMetaClient zkMetaClient = new ZkMetaClient<>(config)) { zkMetaClient.connect(); int initValue = 3; - zkMetaClient.create(key, initValue); - MetaClientInterface.Stat entryStat = zkMetaClient.exists(key); - Assert.assertEquals(entryStat.getVersion(), 0); + DataUpdater updater = new DataUpdater() { + @Override + public Integer update(Integer currentData) { + return currentData != null ? currentData + 1 : initValue; + } + }; + + // Test updater basic success + for (int i = 0; i < testIterationCount; i++) { + Integer newData = zkMetaClient.update(key, updater); + Assert.assertEquals((int) newData, initValue + i); + Assert.assertEquals(zkMetaClient.exists(key).getVersion(), i); + } - // test update() and validate entry value and version - Integer newData = zkMetaClient.update(key, new DataUpdater() { + zkMetaClient.delete(key); + + AtomicBoolean latch = new AtomicBoolean(); + DataUpdater noOpUpdater = new DataUpdater() { @Override public Integer update(Integer currentData) { - return currentData + 1; + latch.set(true); + return currentData; } - }); - Assert.assertEquals((int) newData, (int) initValue + 1); + }; - entryStat = zkMetaClient.exists(key); - Assert.assertEquals(entryStat.getVersion(), 1); + DataUpdater latchedUpdater = new DataUpdater() { + @Override + public Integer update(Integer currentData) { + try { + while (!latch.get()) { + Thread.sleep(200); + } + return currentData != null ? currentData + 1 : initValue; + } catch (InterruptedException e) { + return -1; + } + } + }; - newData = zkMetaClient.update(key, new DataUpdater() { + // Test updater retries on bad version + zkMetaClient.create(key, initValue); + for (int i = 0; i < testIterationCount; i++) { + Thread thread = new Thread(() -> { + zkMetaClient.update(key, latchedUpdater); + }); + thread.start(); + zkMetaClient.update(key, noOpUpdater); + thread.join(); + latch.set(false); + Assert.assertEquals((int) zkMetaClient.get(key), initValue + i + 1); + Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 2 + (i*2)); + } + DataUpdater errorUpdater = new DataUpdater() { @Override public Integer update(Integer currentData) { - return currentData + 1; + throw new RuntimeException("IGNORABLE: Test dataUpdater correctly throws exception"); } - }); + }; - entryStat = zkMetaClient.exists(key); - Assert.assertEquals(entryStat.getVersion(), 2); - Assert.assertEquals((int) newData, (int) initValue + 2); + // Test updater throws error + try { + zkMetaClient.update(key, errorUpdater); + Assert.fail("DataUpdater should have thrown error"); + } catch (RuntimeException e) {} + + zkMetaClient.delete(key); + + // Test updater retries update if node now exists when attempting to create it + latch.set(false); + Thread thread = new Thread(() -> { + zkMetaClient.update(key, latchedUpdater); + }); + thread.start(); + zkMetaClient.create(key, initValue); + latch.set(true); + thread.join(); + Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1); + Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 1); zkMetaClient.delete(key); } } From 089b78ff5d24d1e657cdb859109f7a60c82f7f3e Mon Sep 17 00:00:00 2001 From: Grant Palau Spencer Date: Thu, 30 May 2024 21:35:22 -0700 Subject: [PATCH 2/4] change get stat method --- .../org/apache/helix/metaclient/impl/zk/ZkMetaClient.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index d422fddfcd..97d677ed5d 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -59,7 +59,6 @@ import org.apache.helix.zookeeper.zkclient.IZkStateListener; import org.apache.helix.zookeeper.zkclient.ZkConnection; import org.apache.helix.zookeeper.zkclient.exception.ZkException; -import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; @@ -214,15 +213,16 @@ public T update(String key, DataUpdater updater) { do { retry = false; try { - org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat(); - T oldData = _zkClient.readData(key, stat); + ImmutablePair tup = getDataAndStat(key); + Stat stat = tup.right; + T oldData = tup.left; T newData = updater.update(oldData); set(key, newData, stat.getVersion()); updatedData = newData; } catch (MetaClientBadVersionException badVersionException) { // Retry on bad version retry = true; - } catch (ZkNoNodeException noNodeException) { + } catch (MetaClientNoNodeException noNodeException) { // If node does not exist, attempt to create it - pass null to updater T newData = updater.update(null); if (newData != null) { From 11bf5624ed2a3a71d6586d6d3017893e47d128c4 Mon Sep 17 00:00:00 2001 From: Grant Palau Spencer Date: Wed, 5 Jun 2024 15:37:12 -0700 Subject: [PATCH 3/4] add retryOnFailure param to updater, limit retries, refactor metaclient updater tests --- .../metaclient/api/MetaClientInterface.java | 14 +- .../metaclient/impl/zk/ZkMetaClient.java | 20 ++- .../metaclient/impl/zk/TestZkMetaClient.java | 131 +++++++++++++----- 3 files changed, 130 insertions(+), 35 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java index 5403e7ca44..df0bb40400 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java @@ -196,13 +196,25 @@ public Stat (EntryMode mode, int version, long ctime, long mtime, long etime) { /** * Update existing data of a given key using an updater. This method will issue a read to get - * current data and apply updater upon the current data. + * current data and apply updater upon the current data. This method will NOT retry applying updated + * data upon failure. * @param key key to identify the entry * @param updater An updater that modifies the entry value. * @return the updated value. */ T update(final String key, DataUpdater updater); + + /** + * Update existing data of a given key using an updater. This method will issue a read to get + * current data and apply updater upon the current data. + * @param key key to identify the entry + * @param updater An updater that modifies the entry value. + * @param retryOnFailure If true, updater should retry applying updated data upon failure. + * @return the updated value. + */ + T update(final String key, DataUpdater updater, boolean retryOnFailure); + /** * Check if there is an entry for the given key. * @param key key to identify the entry diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 97d677ed5d..d02430df8e 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -208,10 +208,18 @@ public void set(String key, T data, int version) { @Override public T update(String key, DataUpdater updater) { + return update(key, updater, false); + } + + @Override + public T update(String key, DataUpdater updater, boolean retryOnFailure) { + final int MAX_RETRY_ATTEMPTS = 3; + int retryAttempts = 0; boolean retry; T updatedData = null; do { retry = false; + retryAttempts++; try { ImmutablePair tup = getDataAndStat(key); Stat stat = tup.right; @@ -220,6 +228,11 @@ public T update(String key, DataUpdater updater) { set(key, newData, stat.getVersion()); updatedData = newData; } catch (MetaClientBadVersionException badVersionException) { + // If exceeded max retry attempts, re-throw exception + if (retryAttempts >= MAX_RETRY_ATTEMPTS) { + LOG.error("Failed to update node at " + key + " after " + MAX_RETRY_ATTEMPTS + " attempts."); + throw badVersionException; + } // Retry on bad version retry = true; } catch (MetaClientNoNodeException noNodeException) { @@ -230,6 +243,11 @@ public T update(String key, DataUpdater updater) { create(key, newData); updatedData = newData; } catch (MetaClientNodeExistsException nodeExistsException) { + // If exceeded max retry attempts, re-throw exception + if (retryAttempts >= MAX_RETRY_ATTEMPTS) { + LOG.error("Failed to update node at " + key + " after " + MAX_RETRY_ATTEMPTS + " attempts."); + throw nodeExistsException; + } // If node now exists, then retry update retry = true; } catch (ZkException e) { @@ -237,7 +255,7 @@ public T update(String key, DataUpdater updater) { } } } - } while (retry); + } while (retryOnFailure && retry); return updatedData; } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index 9302385141..18697b9197 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -23,11 +23,10 @@ import org.apache.helix.metaclient.api.ChildChangeListener; import org.apache.helix.metaclient.api.DataUpdater; import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.helix.metaclient.exception.MetaClientBadVersionException; import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.api.DirectChildChangeListener; -import java.io.StringWriter; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -215,8 +214,7 @@ public void testSet() { } @Test - public void testUpdate() throws InterruptedException { - int testIterationCount = 2; + public void testUpdate() { final String key = "/TestZkMetaClient_testUpdate"; ZkMetaClientConfig config = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build(); @@ -230,17 +228,50 @@ public Integer update(Integer currentData) { } }; + zkMetaClient.create(key, initValue); + // Test updater basic success - for (int i = 0; i < testIterationCount; i++) { + for (int i = 1; i < 3; i++) { Integer newData = zkMetaClient.update(key, updater); Assert.assertEquals((int) newData, initValue + i); Assert.assertEquals(zkMetaClient.exists(key).getVersion(), i); } + // Cleanup + zkMetaClient.delete(key); + } + } + + @Test + public void testUpdateWithRetry() throws InterruptedException { + final boolean RETRY_ON_FAILURE = true; + int testIterationCount = 2; + final String key = "/TestZkMetaClient_testUpdateWithRetry"; + ZkMetaClientConfig config = + new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build(); + try (ZkMetaClient zkMetaClient = new ZkMetaClient<>(config)) { + zkMetaClient.connect(); + int initValue = 3; + DataUpdater updater = new DataUpdater() { + @Override + public Integer update(Integer currentData) { + return currentData != null ? currentData + 1 : initValue; + } + }; + + // Test updater creates node if it doesn't exist + Integer newData = zkMetaClient.update(key, updater, RETRY_ON_FAILURE); + Assert.assertEquals((int) newData, initValue); + Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 0); + + + // Cleanup zkMetaClient.delete(key); AtomicBoolean latch = new AtomicBoolean(); - DataUpdater noOpUpdater = new DataUpdater() { + + // Increments znode version and sets latch value to true + DataUpdater versionIncrementUpdater = new DataUpdater() { @Override public Integer update(Integer currentData) { latch.set(true); @@ -248,34 +279,55 @@ public Integer update(Integer currentData) { } }; - DataUpdater latchedUpdater = new DataUpdater() { + // Reads znode, calls versionIncrementUpdater, fails to update due to bad version, then retries and should succeed + DataUpdater failsOnceUpdater = new DataUpdater() { @Override public Integer update(Integer currentData) { try { while (!latch.get()) { - Thread.sleep(200); + zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE); } return currentData != null ? currentData + 1 : initValue; - } catch (InterruptedException e) { + } catch (MetaClientException e) { return -1; } } }; - // Test updater retries on bad version - zkMetaClient.create(key, initValue); - for (int i = 0; i < testIterationCount; i++) { - Thread thread = new Thread(() -> { - zkMetaClient.update(key, latchedUpdater); - }); - thread.start(); - zkMetaClient.update(key, noOpUpdater); - thread.join(); - latch.set(false); - Assert.assertEquals((int) zkMetaClient.get(key), initValue + i + 1); - Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 2 + (i*2)); - } + // Always fails to update due to bad version + DataUpdater alwaysFailLatchedUpdater = new DataUpdater() { + @Override + public Integer update(Integer currentData) { + try { + latch.set(false); + while (!latch.get()) { + zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE); + } + return currentData != null ? currentData + 1 : initValue; + } catch (MetaClientException e) { + return -1; + } + } + }; + + // Updater reads znode, sees it does not exist and attempts to create it, but should fail as znode already created + // due to create() call in updater. Should then retry and successfully update the node. + DataUpdater failOnFirstCreateLatchedUpdater = new DataUpdater() { + @Override + public Integer update(Integer currentData) { + try { + if (!latch.get()) { + zkMetaClient.create(key, initValue); + latch.set(true); + } + return currentData != null ? currentData + 1 : initValue; + } catch (MetaClientException e) { + return -1; + } + } + }; + // Throws error when update called DataUpdater errorUpdater = new DataUpdater() { @Override public Integer update(Integer currentData) { @@ -283,23 +335,36 @@ public Integer update(Integer currentData) { } }; + // Reset latch + latch.set(false); + // Test updater retries on bad version + // Latched updater should read znode at version 0, but attempt to write to version 1 which fails. Should retry + // and increment version to 2 + zkMetaClient.create(key, initValue); + zkMetaClient.update(key, failsOnceUpdater, RETRY_ON_FAILURE); + Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1); + Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 2); + + // Reset latch + latch.set(false); + // Test updater fails on retries exceeded + try { + zkMetaClient.update(key, alwaysFailLatchedUpdater, RETRY_ON_FAILURE); + Assert.fail("Updater should have thrown error"); + } catch (MetaClientBadVersionException e) {} + + // Test updater throws error try { - zkMetaClient.update(key, errorUpdater); + zkMetaClient.update(key, errorUpdater, RETRY_ON_FAILURE); Assert.fail("DataUpdater should have thrown error"); } catch (RuntimeException e) {} - zkMetaClient.delete(key); - - // Test updater retries update if node now exists when attempting to create it + // Reset latch and cleanup old node latch.set(false); - Thread thread = new Thread(() -> { - zkMetaClient.update(key, latchedUpdater); - }); - thread.start(); - zkMetaClient.create(key, initValue); - latch.set(true); - thread.join(); + zkMetaClient.delete(key); + // Test updater retries update if node does not exist on read, but then exists when updater attempts to create it + zkMetaClient.update(key, failOnFirstCreateLatchedUpdater, RETRY_ON_FAILURE); Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1); Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 1); zkMetaClient.delete(key); From 02fa9d936d6abd8b76f6df76fc21b8a72c3bbde3 Mon Sep 17 00:00:00 2001 From: Grant Palau Spencer Date: Mon, 10 Jun 2024 14:54:12 -0700 Subject: [PATCH 4/4] only create node if createIfAbsent is true --- .../metaclient/api/MetaClientInterface.java | 3 +- .../metaclient/impl/zk/ZkMetaClient.java | 19 +++++++--- .../metaclient/impl/zk/TestZkMetaClient.java | 38 +++++++++++++------ 3 files changed, 42 insertions(+), 18 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java index df0bb40400..c8a1d0d36d 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java @@ -211,9 +211,10 @@ public Stat (EntryMode mode, int version, long ctime, long mtime, long etime) { * @param key key to identify the entry * @param updater An updater that modifies the entry value. * @param retryOnFailure If true, updater should retry applying updated data upon failure. + * @param createIfAbsent If true, create the entry if it does not exist. * @return the updated value. */ - T update(final String key, DataUpdater updater, boolean retryOnFailure); + T update(final String key, DataUpdater updater, boolean retryOnFailure, boolean createIfAbsent); /** * Check if there is an entry for the given key. diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index d02430df8e..f4594d5d6f 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -19,6 +19,7 @@ * under the License. */ +import java.util.ConcurrentModificationException; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -208,11 +209,11 @@ public void set(String key, T data, int version) { @Override public T update(String key, DataUpdater updater) { - return update(key, updater, false); + return update(key, updater, false, false); } @Override - public T update(String key, DataUpdater updater, boolean retryOnFailure) { + public T update(String key, DataUpdater updater, boolean retryOnFailure, boolean createIfAbsent) { final int MAX_RETRY_ATTEMPTS = 3; int retryAttempts = 0; boolean retry; @@ -230,23 +231,29 @@ public T update(String key, DataUpdater updater, boolean retryOnFailure) { } catch (MetaClientBadVersionException badVersionException) { // If exceeded max retry attempts, re-throw exception if (retryAttempts >= MAX_RETRY_ATTEMPTS) { - LOG.error("Failed to update node at " + key + " after " + MAX_RETRY_ATTEMPTS + " attempts."); + LOG.error("Failed to update node at {} after {} attempts.", key, MAX_RETRY_ATTEMPTS); throw badVersionException; } // Retry on bad version retry = true; } catch (MetaClientNoNodeException noNodeException) { + if (!createIfAbsent) { + LOG.error("Failed to update node at {} as node does not exist. createIfAbsent was {}.", key, createIfAbsent); + throw noNodeException; + } // If node does not exist, attempt to create it - pass null to updater T newData = updater.update(null); if (newData != null) { try { create(key, newData); updatedData = newData; + // If parent node for key does not exist, then updater will immediately fail due to uncaught NoNodeException } catch (MetaClientNodeExistsException nodeExistsException) { - // If exceeded max retry attempts, re-throw exception + // If exceeded max retry attempts, cast to ConcurrentModification exception and re-throw. if (retryAttempts >= MAX_RETRY_ATTEMPTS) { - LOG.error("Failed to update node at " + key + " after " + MAX_RETRY_ATTEMPTS + " attempts."); - throw nodeExistsException; + LOG.error("Failed to update node at {} after {} attempts.", key, MAX_RETRY_ATTEMPTS); + throw new ConcurrentModificationException("Failed to update node at " + key + " after " + + MAX_RETRY_ATTEMPTS + " attempts.", nodeExistsException); } // If node now exists, then retry update retry = true; diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index 18697b9197..2919893ff8 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -245,26 +245,42 @@ public Integer update(Integer currentData) { @Test public void testUpdateWithRetry() throws InterruptedException { final boolean RETRY_ON_FAILURE = true; - int testIterationCount = 2; + final boolean CREATE_IF_ABSENT = true; final String key = "/TestZkMetaClient_testUpdateWithRetry"; ZkMetaClientConfig config = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build(); try (ZkMetaClient zkMetaClient = new ZkMetaClient<>(config)) { zkMetaClient.connect(); int initValue = 3; - DataUpdater updater = new DataUpdater() { + // Basic updater that increments node value by 1, starting at initValue + DataUpdater basicUpdater = new DataUpdater() { @Override public Integer update(Integer currentData) { return currentData != null ? currentData + 1 : initValue; } }; - // Test updater creates node if it doesn't exist - Integer newData = zkMetaClient.update(key, updater, RETRY_ON_FAILURE); + // Test updater fails create node if it doesn't exist when createIfAbsent is false + try { + zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, false); + Assert.fail("Updater should have thrown error"); + } catch (MetaClientNoNodeException e) { + Assert.assertFalse(zkMetaClient.exists(key) != null); + } + + // Test updater fails when parent path does not exist + try { + zkMetaClient.update(key + "/child", basicUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); + Assert.fail("Updater should have thrown error"); + } catch (MetaClientNoNodeException e) { + Assert.assertFalse(zkMetaClient.exists(key + "/child") != null); + } + + // Test updater creates node if it doesn't exist when createIfAbsent is true + Integer newData = zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); Assert.assertEquals((int) newData, initValue); Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 0); - // Cleanup zkMetaClient.delete(key); @@ -285,7 +301,7 @@ public Integer update(Integer currentData) { public Integer update(Integer currentData) { try { while (!latch.get()) { - zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE); + zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); } return currentData != null ? currentData + 1 : initValue; } catch (MetaClientException e) { @@ -301,7 +317,7 @@ public Integer update(Integer currentData) { try { latch.set(false); while (!latch.get()) { - zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE); + zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); } return currentData != null ? currentData + 1 : initValue; } catch (MetaClientException e) { @@ -341,7 +357,7 @@ public Integer update(Integer currentData) { // Latched updater should read znode at version 0, but attempt to write to version 1 which fails. Should retry // and increment version to 2 zkMetaClient.create(key, initValue); - zkMetaClient.update(key, failsOnceUpdater, RETRY_ON_FAILURE); + zkMetaClient.update(key, failsOnceUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1); Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 2); @@ -349,14 +365,14 @@ public Integer update(Integer currentData) { latch.set(false); // Test updater fails on retries exceeded try { - zkMetaClient.update(key, alwaysFailLatchedUpdater, RETRY_ON_FAILURE); + zkMetaClient.update(key, alwaysFailLatchedUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); Assert.fail("Updater should have thrown error"); } catch (MetaClientBadVersionException e) {} // Test updater throws error try { - zkMetaClient.update(key, errorUpdater, RETRY_ON_FAILURE); + zkMetaClient.update(key, errorUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); Assert.fail("DataUpdater should have thrown error"); } catch (RuntimeException e) {} @@ -364,7 +380,7 @@ public Integer update(Integer currentData) { latch.set(false); zkMetaClient.delete(key); // Test updater retries update if node does not exist on read, but then exists when updater attempts to create it - zkMetaClient.update(key, failOnFirstCreateLatchedUpdater, RETRY_ON_FAILURE); + zkMetaClient.update(key, failOnFirstCreateLatchedUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT); Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1); Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 1); zkMetaClient.delete(key);