From a191b8d14589bb8104372c7c26609c5f580baeb3 Mon Sep 17 00:00:00 2001 From: groot Date: Thu, 8 Aug 2024 16:00:14 +0800 Subject: [PATCH] Fix bug of describeCollection V2 (#1030) Signed-off-by: yhmo --- .../java/io/milvus/client/MilvusClient.java | 2 +- src/main/java/io/milvus/pool/ClientPool.java | 49 ++++++++++++++++++- .../io/milvus/v2/client/MilvusClientV2.java | 18 +++++-- .../service/collection/CollectionService.java | 21 +------- .../response/DescribeCollectionResp.java | 5 ++ .../v2/service/vector/VectorService.java | 6 +-- .../java/io/milvus/v2/utils/ClientUtils.java | 2 +- .../java/io/milvus/v2/utils/ConvertUtils.java | 24 +++++++++ .../v2/client/MilvusClientV2DockerTest.java | 2 +- 9 files changed, 99 insertions(+), 30 deletions(-) diff --git a/src/main/java/io/milvus/client/MilvusClient.java b/src/main/java/io/milvus/client/MilvusClient.java index 3f85057b2..6d3fde667 100644 --- a/src/main/java/io/milvus/client/MilvusClient.java +++ b/src/main/java/io/milvus/client/MilvusClient.java @@ -97,7 +97,7 @@ public interface MilvusClient { void setLogLevel(LogLevel level); /** - * Disconnects from a Milvus server with timeout of 1 minute + * Disconnects from a Milvus server with timeout of 1 second */ default void close() { try { diff --git a/src/main/java/io/milvus/pool/ClientPool.java b/src/main/java/io/milvus/pool/ClientPool.java index 1f559ff11..f97b6b5ff 100644 --- a/src/main/java/io/milvus/pool/ClientPool.java +++ b/src/main/java/io/milvus/pool/ClientPool.java @@ -35,6 +35,15 @@ protected ClientPool(PoolConfig config, PoolClientFactory clientFactory) { this.clientPool = new GenericKeyedObjectPool(clientFactory, poolConfig); } + /** + * Get a client object which is idle from the pool. + * Once the client is hold by the caller, it will be marked as active state and cannot be fetched by other caller. + * If the number of clients hits the MaxTotalPerKey value, this method will be blocked for MaxBlockWaitDuration. + * If no idle client available after MaxBlockWaitDuration, this method will return a null object to caller. + * + * @param key the key of a group where the client belong + * @return MilvusClient or MilvusClientV2 + */ public T getClient(String key) { try { return clientPool.borrowObject(key); @@ -44,7 +53,14 @@ public T getClient(String key) { } } - + /** + * Return a client object. Once a client is returned, it becomes idle state and wait the next caller. + * The caller should ensure the client is returned. Otherwise, the client will keep in active state and cannot be used by the next caller. + * Throw exceptions if the key doesn't exist or the client is not belong to this key group. + * + * @param key the key of a group where the client belong + * @param grpcClient the client object to return + */ public void returnClient(String key, T grpcClient) { try { clientPool.returnObject(key, grpcClient); @@ -54,6 +70,10 @@ public void returnClient(String key, T grpcClient) { } } + /** + * Release/disconnect all clients of all key groups, close the pool. + * + */ public void close() { if (clientPool != null && !clientPool.isClosed()) { clientPool.close(); @@ -61,30 +81,57 @@ public void close() { } } + /** + * Release/disconnect idle clients of all key groups. + * + */ public void clear() { if (clientPool != null && !clientPool.isClosed()) { clientPool.clear(); } } + /** + * Release/disconnect idle clients of a key group. + * + * @param key the key of a group + */ public void clear(String key) { if (clientPool != null && !clientPool.isClosed()) { clientPool.clear(key); } } + /** + * Return the number of idle clients of a key group + * + * @param key the key of a group + */ public int getIdleClientNumber(String key) { return clientPool.getNumIdle(key); } + /** + * Return the number of active clients of a key group + * + * @param key the key of a group + */ public int getActiveClientNumber(String key) { return clientPool.getNumActive(key); } + /** + * Return the number of idle clients of all key group + * + */ public int getTotalIdleClientNumber() { return clientPool.getNumIdle(); } + /** + * Return the number of active clients of all key group + * + */ public int getTotalActiveClientNumber() { return clientPool.getNumActive(); } diff --git a/src/main/java/io/milvus/v2/client/MilvusClientV2.java b/src/main/java/io/milvus/v2/client/MilvusClientV2.java index f934d131e..404506645 100644 --- a/src/main/java/io/milvus/v2/client/MilvusClientV2.java +++ b/src/main/java/io/milvus/v2/client/MilvusClientV2.java @@ -693,12 +693,12 @@ public DescribeAliasResp describeAlias(DescribeAliasReq request) { * * @return String */ - public String getVersion() { - return retry(()->clientUtils.getVersion(this.blockingStub)); + public String getServerVersion() { + return retry(()->clientUtils.getServerVersion(this.blockingStub)); } /** - * close client + * Disconnects from a Milvus server with configurable timeout * * @param maxWaitSeconds max wait seconds * @throws InterruptedException if the client failed to close connection @@ -710,6 +710,18 @@ public void close(long maxWaitSeconds) throws InterruptedException { } } + /** + * Disconnects from a Milvus server with timeout of 1 second + * + */ + public void close() { + try { + close(TimeUnit.MINUTES.toSeconds(1)); + } catch (InterruptedException e) { + System.out.println("Interrupted during shutdown Milvus client!"); + } + } + public boolean clientIsReady() { return channel != null && !channel.isShutdown() && !channel.isTerminated(); } diff --git a/src/main/java/io/milvus/v2/service/collection/CollectionService.java b/src/main/java/io/milvus/v2/service/collection/CollectionService.java index 3f24c213d..ccf6fad7a 100644 --- a/src/main/java/io/milvus/v2/service/collection/CollectionService.java +++ b/src/main/java/io/milvus/v2/service/collection/CollectionService.java @@ -189,26 +189,7 @@ public DescribeCollectionResp describeCollection(MilvusServiceGrpc.MilvusService .build(); DescribeCollectionResponse response = blockingStub.describeCollection(describeCollectionRequest); rpcUtils.handleResponse(title, response.getStatus()); - return convertDescCollectionResp(response); - } - - public static DescribeCollectionResp convertDescCollectionResp(DescribeCollectionResponse response) { - DescribeCollectionResp describeCollectionResp = DescribeCollectionResp.builder() - .collectionName(response.getCollectionName()) - .databaseName(response.getDbName()) - .description(response.getSchema().getDescription()) - .numOfPartitions(response.getNumPartitions()) - .collectionSchema(SchemaUtils.convertFromGrpcCollectionSchema(response.getSchema())) - .autoID(response.getSchema().getFieldsList().stream().anyMatch(FieldSchema::getAutoID)) - .enableDynamicField(response.getSchema().getEnableDynamicField()) - .fieldNames(response.getSchema().getFieldsList().stream().map(FieldSchema::getName).collect(java.util.stream.Collectors.toList())) - .vectorFieldNames(response.getSchema().getFieldsList().stream().filter(fieldSchema -> ParamUtils.isVectorDataType(fieldSchema.getDataType())).map(FieldSchema::getName).collect(java.util.stream.Collectors.toList())) - .primaryFieldName(response.getSchema().getFieldsList().stream().filter(FieldSchema::getIsPrimaryKey).map(FieldSchema::getName).collect(java.util.stream.Collectors.toList()).get(0)) - .createTime(response.getCreatedTimestamp()) - .consistencyLevel(io.milvus.v2.common.ConsistencyLevel.valueOf(response.getConsistencyLevel().name().toUpperCase())) - .build(); - - return describeCollectionResp; + return convertUtils.convertDescCollectionResp(response); } public Void renameCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, RenameCollectionReq request) { diff --git a/src/main/java/io/milvus/v2/service/collection/response/DescribeCollectionResp.java b/src/main/java/io/milvus/v2/service/collection/response/DescribeCollectionResp.java index 0fe45efcd..2f0ea8478 100644 --- a/src/main/java/io/milvus/v2/service/collection/response/DescribeCollectionResp.java +++ b/src/main/java/io/milvus/v2/service/collection/response/DescribeCollectionResp.java @@ -21,10 +21,13 @@ import io.milvus.v2.common.ConsistencyLevel; import io.milvus.v2.service.collection.request.CreateCollectionReq; +import lombok.Builder; import lombok.Data; import lombok.experimental.SuperBuilder; +import java.util.HashMap; import java.util.List; +import java.util.Map; @Data @SuperBuilder @@ -43,4 +46,6 @@ public class DescribeCollectionResp { private CreateCollectionReq.CollectionSchema collectionSchema; private Long createTime; private ConsistencyLevel consistencyLevel; + @Builder.Default + private final Map properties = new HashMap<>(); } diff --git a/src/main/java/io/milvus/v2/service/vector/VectorService.java b/src/main/java/io/milvus/v2/service/vector/VectorService.java index 2dea19620..477a70b70 100644 --- a/src/main/java/io/milvus/v2/service/vector/VectorService.java +++ b/src/main/java/io/milvus/v2/service/vector/VectorService.java @@ -168,7 +168,7 @@ public SearchResp search(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu public QueryIterator queryIterator(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, QueryIteratorReq request) { DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName()); - DescribeCollectionResp respR = CollectionService.convertDescCollectionResp(descResp); + DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp); CreateCollectionReq.FieldSchema pkField = respR.getCollectionSchema().getField(respR.getPrimaryFieldName()); return new QueryIterator(request, blockingStub, pkField); } @@ -176,7 +176,7 @@ public QueryIterator queryIterator(MilvusServiceGrpc.MilvusServiceBlockingStub b public SearchIterator searchIterator(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, SearchIteratorReq request) { DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName()); - DescribeCollectionResp respR = CollectionService.convertDescCollectionResp(descResp); + DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp); CreateCollectionReq.FieldSchema pkField = respR.getCollectionSchema().getField(respR.getPrimaryFieldName()); return new SearchIterator(request, blockingStub, pkField); } @@ -189,7 +189,7 @@ public DeleteResp delete(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu } DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName()); - DescribeCollectionResp respR = CollectionService.convertDescCollectionResp(descResp); + DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp); if (request.getFilter() == null) { request.setFilter(vectorUtils.getExprById(respR.getPrimaryFieldName(), request.getIds())); } diff --git a/src/main/java/io/milvus/v2/utils/ClientUtils.java b/src/main/java/io/milvus/v2/utils/ClientUtils.java index 46a29946e..41ca6b0fa 100644 --- a/src/main/java/io/milvus/v2/utils/ClientUtils.java +++ b/src/main/java/io/milvus/v2/utils/ClientUtils.java @@ -127,7 +127,7 @@ public void checkDatabaseExist(MilvusServiceGrpc.MilvusServiceBlockingStub block throw new IllegalArgumentException("Database " + dbName + " not exist"); } } - public String getVersion(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub) { + public String getServerVersion(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub) { GetVersionResponse response = blockingStub.getVersion(GetVersionRequest.newBuilder().build()); rpcUtils.handleResponse("Get server version", response.getStatus()); return response.getVersion(); diff --git a/src/main/java/io/milvus/v2/utils/ConvertUtils.java b/src/main/java/io/milvus/v2/utils/ConvertUtils.java index be50d8248..35f02d44e 100644 --- a/src/main/java/io/milvus/v2/utils/ConvertUtils.java +++ b/src/main/java/io/milvus/v2/utils/ConvertUtils.java @@ -21,10 +21,12 @@ import io.milvus.grpc.*; import io.milvus.param.Constant; +import io.milvus.param.ParamUtils; import io.milvus.response.QueryResultsWrapper; import io.milvus.response.SearchResultsWrapper; import io.milvus.v2.common.IndexBuildState; import io.milvus.v2.common.IndexParam; +import io.milvus.v2.service.collection.response.DescribeCollectionResp; import io.milvus.v2.service.index.response.DescribeIndexResp; import io.milvus.v2.service.vector.response.QueryResp; import io.milvus.v2.service.vector.response.SearchResp; @@ -112,4 +114,26 @@ public DescribeIndexResp convertToDescribeIndexResp(List respo return DescribeIndexResp.builder().indexDescriptions(descs).build(); } + + public DescribeCollectionResp convertDescCollectionResp(DescribeCollectionResponse response) { + Map properties = new HashMap<>(); + response.getPropertiesList().forEach(prop->properties.put(prop.getKey(), prop.getValue())); + + DescribeCollectionResp describeCollectionResp = DescribeCollectionResp.builder() + .collectionName(response.getCollectionName()) + .databaseName(response.getDbName()) + .description(response.getSchema().getDescription()) + .numOfPartitions(response.getNumPartitions()) + .collectionSchema(SchemaUtils.convertFromGrpcCollectionSchema(response.getSchema())) + .autoID(response.getSchema().getFieldsList().stream().anyMatch(FieldSchema::getAutoID)) + .enableDynamicField(response.getSchema().getEnableDynamicField()) + .fieldNames(response.getSchema().getFieldsList().stream().map(FieldSchema::getName).collect(java.util.stream.Collectors.toList())) + .vectorFieldNames(response.getSchema().getFieldsList().stream().filter(fieldSchema -> ParamUtils.isVectorDataType(fieldSchema.getDataType())).map(FieldSchema::getName).collect(java.util.stream.Collectors.toList())) + .primaryFieldName(response.getSchema().getFieldsList().stream().filter(FieldSchema::getIsPrimaryKey).map(FieldSchema::getName).collect(java.util.stream.Collectors.toList()).get(0)) + .createTime(response.getCreatedTimestamp()) + .consistencyLevel(io.milvus.v2.common.ConsistencyLevel.valueOf(response.getConsistencyLevel().name().toUpperCase())) + .properties(properties) + .build(); + return describeCollectionResp; + } } diff --git a/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java b/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java index 3d2869a2f..893f96845 100644 --- a/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java +++ b/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java @@ -1161,7 +1161,7 @@ void testClientPool() { Thread t = new Thread(() -> { for (int i = 0; i < requestPerThread; i++) { MilvusClientV2 client = pool.getClient(key); - String version = client.getVersion(); + String version = client.getServerVersion(); // System.out.printf("%d, %s%n", i, version); System.out.printf("idle %d, active %d%n", pool.getIdleClientNumber(key), pool.getActiveClientNumber(key)); pool.returnClient(key, client);