Skip to content

Commit

Permalink
Lattice LockClient LockInfoSerializer Implementation(#2580)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: mapeng <[email protected]>
  • Loading branch information
Marcosrico and mapeng authored Aug 2, 2023
1 parent c6890fa commit 0d4e2f5
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ public LockClient(MetaClientConfig config) {
if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
setConnectionAddress(config.getConnectionAddress())
// Currently only support ZNRecordSerializer. TODO: make this configurable
.setZkSerializer((new ZNRecordSerializer()))
.setZkSerializer((new LockInfoSerializer()))
.build();
_metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
_metaClient.connect();
Expand Down Expand Up @@ -110,11 +109,7 @@ public LockInfo retrieveLock(String key) {
if (stat == null) {
return null;
}
//Create a new DataRecord from underlying record
DataRecord dataRecord = new DataRecord(_metaClient.get(key));
//Create a new LockInfo from DataRecord
LockInfo lockInfo = new LockInfo(dataRecord, stat);
return lockInfo;
return new LockInfo(_metaClient.get(key), stat);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public class LockInfo extends DataRecord {
public static final long DEFAULT_LAST_RENEWED_AT_LONG = -1L;
public static final long DEFAULT_TIMEOUT_DURATION = -1L;
private static final String DEFAULT_LOCK_INFO = "lockInfo.";
private DataRecord _dataRecord;

/**
* The keys to lock information
Expand All @@ -59,7 +58,6 @@ public enum LockInfoAttribute {
*/
public LockInfo() {
super(DEFAULT_LOCK_INFO);
_dataRecord = new DataRecord(DEFAULT_LOCK_INFO);
setLockInfoFields(DEFAULT_LOCK_ID_TEXT, DEFAULT_OWNER_ID_TEXT, DEFAULT_CLIENT_ID_TEXT, DEFAULT_CLIENT_DATA, DEFAULT_GRANTED_AT_LONG,
DEFAULT_LAST_RENEWED_AT_LONG, DEFAULT_TIMEOUT_DURATION);
}
Expand Down Expand Up @@ -140,7 +138,7 @@ private void setLockInfoFields(String lockId, String ownerId, String clientId, S
* It is created by the lockClient and a new one is created for each time the lock is acquired.
*/
public void setLockId(String lockId) {
_dataRecord.setSimpleField(LockInfoAttribute.LOCK_ID.name(), lockId == null ? DEFAULT_LOCK_ID_TEXT : lockId);
setSimpleField(LockInfoAttribute.LOCK_ID.name(), lockId == null ? DEFAULT_LOCK_ID_TEXT : lockId);
}

/**
Expand All @@ -150,39 +148,39 @@ public void setLockId(String lockId) {
* by the same owner.
*/
public void setOwnerId(String ownerId) {
_dataRecord.setSimpleField(LockInfoAttribute.OWNER_ID.name(), ownerId == null ? DEFAULT_OWNER_ID_TEXT : ownerId);
setSimpleField(LockInfoAttribute.OWNER_ID.name(), ownerId == null ? DEFAULT_OWNER_ID_TEXT : ownerId);
}

/**
* Get the value for CLIENT_ID attribute of the lock
* @param clientId Unique identifier that represents who will get the lock (the client).
*/
public void setClientId(String clientId) {
_dataRecord.setSimpleField(LockInfoAttribute.CLIENT_ID.name(), clientId == null ? DEFAULT_CLIENT_ID_TEXT : clientId);
setSimpleField(LockInfoAttribute.CLIENT_ID.name(), clientId == null ? DEFAULT_CLIENT_ID_TEXT : clientId);
}

/**
* Get the value for CLIENT_DATA attribute of the lock
* @param clientData String representing the serialized data object
*/
public void setClientData(String clientData) {
_dataRecord.setSimpleField(LockInfoAttribute.CLIENT_DATA.name(), clientData == null ? DEFAULT_CLIENT_DATA : clientData);
setSimpleField(LockInfoAttribute.CLIENT_DATA.name(), clientData == null ? DEFAULT_CLIENT_DATA : clientData);
}

/**
* Get the value for GRANTED_AT attribute of the lock
* @param grantTime Long representing the time at which the lock was granted
*/
public void setGrantedAt(Long grantTime) {
_dataRecord.setLongField(LockInfoAttribute.GRANTED_AT.name(), grantTime);
setLongField(LockInfoAttribute.GRANTED_AT.name(), grantTime);
}

/**
* Get the value for LAST_RENEWED_AT attribute of the lock
* @param lastRenewalTime Long representing the time at which the lock was last renewed
*/
public void setLastRenewedAt(Long lastRenewalTime) {
_dataRecord.setLongField(LockInfoAttribute.LAST_RENEWED_AT.name(), lastRenewalTime);
setLongField(LockInfoAttribute.LAST_RENEWED_AT.name(), lastRenewalTime);
}

/**
Expand All @@ -191,31 +189,31 @@ public void setLastRenewedAt(Long lastRenewalTime) {
*/
public void setTimeout(long timeout) {
// Always store the timeout value in milliseconds for the sake of simplicity
_dataRecord.setLongField(LockInfoAttribute.TIMEOUT.name(), timeout);
setLongField(LockInfoAttribute.TIMEOUT.name(), timeout);
}

/**
* Get the value for OWNER_ID attribute of the lock
* @return the owner id of the lock, {@link #DEFAULT_OWNER_ID_TEXT} if there is no owner id set
*/
public String getOwnerId() {
return _dataRecord.getStringField(LockInfoAttribute.OWNER_ID.name(), DEFAULT_OWNER_ID_TEXT);
return getStringField(LockInfoAttribute.OWNER_ID.name(), DEFAULT_OWNER_ID_TEXT);
}

/**
* Get the value for CLIENT_ID attribute of the lock
* @return the client id of the lock, {@link #DEFAULT_CLIENT_ID_TEXT} if there is no client id set
*/
public String getClientId() {
return _dataRecord.getStringField(LockInfoAttribute.CLIENT_ID.name(), DEFAULT_CLIENT_ID_TEXT);
return getStringField(LockInfoAttribute.CLIENT_ID.name(), DEFAULT_CLIENT_ID_TEXT);
}

/**
* Get the value for LOCK_ID attribute of the lock
* @return the id of the lock, {@link #DEFAULT_LOCK_ID_TEXT} if there is no lock id set
*/
public String getLockId() {
return _dataRecord.getStringField(LockInfoAttribute.LOCK_ID.name(), DEFAULT_LOCK_ID_TEXT);
return getStringField(LockInfoAttribute.LOCK_ID.name(), DEFAULT_LOCK_ID_TEXT);
}

/**
Expand All @@ -224,7 +222,7 @@ public String getLockId() {
* if there is no client data set.
*/
public String getClientData() {
return _dataRecord.getStringField(LockInfoAttribute.CLIENT_DATA.name(), DEFAULT_CLIENT_DATA);
return getStringField(LockInfoAttribute.CLIENT_DATA.name(), DEFAULT_CLIENT_DATA);
}

/**
Expand All @@ -233,7 +231,7 @@ public String getClientData() {
* if there is no grant time set
*/
public Long getGrantedAt() {
return _dataRecord.getLongField(LockInfoAttribute.GRANTED_AT.name(), DEFAULT_GRANTED_AT_LONG);
return getLongField(LockInfoAttribute.GRANTED_AT.name(), DEFAULT_GRANTED_AT_LONG);
}

/**
Expand All @@ -242,15 +240,15 @@ public Long getGrantedAt() {
* if there is no renewal time set
*/
public Long getLastRenewedAt() {
return _dataRecord.getLongField(LockInfoAttribute.LAST_RENEWED_AT.name(), DEFAULT_LAST_RENEWED_AT_LONG);
return getLongField(LockInfoAttribute.LAST_RENEWED_AT.name(), DEFAULT_LAST_RENEWED_AT_LONG);
}

/**
* Get the value for TIMEOUT attribute of the lock
* @return the expiring time of the lock, {@link #DEFAULT_TIMEOUT_DURATION} if there is no timeout set
*/
public long getTimeout() {
return _dataRecord.getLongField(LockInfoAttribute.TIMEOUT.name(), DEFAULT_TIMEOUT_DURATION);
return getLongField(LockInfoAttribute.TIMEOUT.name(), DEFAULT_TIMEOUT_DURATION);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.apache.helix.metaclient.recipes.lock;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.apache.helix.metaclient.datamodel.DataRecord;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LockInfoSerializer extends ZNRecordSerializer {
private static final Logger LOG = LoggerFactory.getLogger(LockInfoSerializer.class);
@Override
public Object deserialize(byte[] bytes) {
try {
return new LockInfo(new DataRecord((ZNRecord) super.deserialize(bytes)));
} catch (Exception e) {
LOG.error("Exception during deserialization of bytes: {}", new String(bytes), e);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@
public class LockClientTest extends ZkMetaClientTestBase {

private static final String TEST_INVALID_PATH = "/_invalid/a/b/c";
private static final String OWNER_ID = "urn:li:principal:UNKNOWN";
private static final String CLIENT_ID = "test_client_id";
private static final String CLIENT_DATA = "client_data";
private static final String LOCK_ID = "794c8a4c-c14b-4c23-b83f-4e1147fc6978";
private static final long GRANT_TIME = System.currentTimeMillis();
private static final long LAST_RENEWAL_TIME = System.currentTimeMillis();
private static final long TIMEOUT = 100000;

public LockClient createLockClient() {

Expand All @@ -37,11 +44,23 @@ public LockClient createLockClient() {
return new LockClient(config);
}

private LockInfo createLockInfo() {
LockInfo lockInfo = new LockInfo();
lockInfo.setOwnerId(OWNER_ID);
lockInfo.setClientId(CLIENT_ID);
lockInfo.setClientData(CLIENT_DATA);
lockInfo.setLockId(LOCK_ID);
lockInfo.setGrantedAt(GRANT_TIME);
lockInfo.setLastRenewedAt(LAST_RENEWAL_TIME);
lockInfo.setTimeout(TIMEOUT);
return lockInfo;
}

@Test
public void testAcquireLock() {
final String key = "/TestLockClient_testAcquireLock";
LockClient lockClient = createLockClient();
LockInfo lockInfo = new LockInfo();
LockInfo lockInfo = createLockInfo();
lockClient.acquireLock(key, lockInfo, MetaClientInterface.EntryMode.PERSISTENT);
Assert.assertNotNull(lockClient.retrieveLock(key));
try {
Expand All @@ -56,7 +75,7 @@ public void testAcquireLock() {
public void testReleaseLock() {
final String key = "/TestLockClient_testReleaseLock";
LockClient lockClient = createLockClient();
LockInfo lockInfo = new LockInfo();
LockInfo lockInfo = createLockInfo();
lockClient.acquireLock(key, lockInfo, MetaClientInterface.EntryMode.PERSISTENT);
Assert.assertNotNull(lockClient.retrieveLock(key));

Expand All @@ -69,7 +88,7 @@ public void testReleaseLock() {
public void testAcquireTTLLock() {
final String key = "/TestLockClient_testAcquireTTLLock";
LockClient lockClient = createLockClient();
LockInfo lockInfo = new LockInfo();
LockInfo lockInfo = createLockInfo();
lockClient.acquireLockWithTTL(key, lockInfo, 1L);
Assert.assertNotNull(lockClient.retrieveLock(key));
try {
Expand All @@ -84,22 +103,27 @@ public void testAcquireTTLLock() {
public void testRetrieveLock() {
final String key = "/TestLockClient_testRetrieveLock";
LockClient lockClient = createLockClient();
LockInfo lockInfo = new LockInfo();
LockInfo lockInfo = createLockInfo();
lockClient.acquireLock(key, lockInfo, MetaClientInterface.EntryMode.PERSISTENT);
Assert.assertNotNull(lockClient.retrieveLock(key));
Assert.assertEquals(lockClient.retrieveLock(key).getOwnerId(), OWNER_ID);
Assert.assertEquals(lockClient.retrieveLock(key).getClientId(), CLIENT_ID);
Assert.assertEquals(lockClient.retrieveLock(key).getClientData(), CLIENT_DATA);
Assert.assertEquals(lockClient.retrieveLock(key).getLockId(), LOCK_ID);
Assert.assertEquals(lockClient.retrieveLock(key).getTimeout(), TIMEOUT);
Assert.assertNull(lockClient.retrieveLock(TEST_INVALID_PATH));
}

@Test
public void testRenewTTLLock() {
final String key = "/TestLockClient_testRenewTTLLock";
LockClient lockClient = createLockClient();
LockInfo lockInfo = new LockInfo();
LockInfo lockInfo = createLockInfo();
lockClient.acquireLockWithTTL(key, lockInfo, 1L);
Assert.assertNotNull(lockClient.retrieveLock(key));

lockClient.renewTTLLock(key);
Assert.assertNotSame(lockClient.retrieveLock(key).getGrantedAt(), lockInfo.getLastRenewedAt());
Assert.assertNotSame(lockClient.retrieveLock(key).getGrantedAt(), lockClient.retrieveLock(key).getLastRenewedAt());
try {
lockClient.renewTTLLock(TEST_INVALID_PATH);
Assert.fail("Should not be able to renew lock for key: " + key);
Expand Down

0 comments on commit 0d4e2f5

Please sign in to comment.