diff --git a/.github/actions/common/plugin-change-check/action.yml b/.github/actions/common/plugin-change-check/action.yml index 34db4ca71a..0e353f72eb 100644 --- a/.github/actions/common/plugin-change-check/action.yml +++ b/.github/actions/common/plugin-change-check/action.yml @@ -91,6 +91,14 @@ runs: shell: bash run: | echo "sermantServiceRemovalChanged=${{ steps.changed-sermant-removal.outputs.changed }}" >> $GITHUB_ENV + - uses: marceloprado/has-changed-path@v1.0.1 + id: changed-sermant-tag-transmission + with: + paths: sermant-plugins/sermant-tag-transmission + - name: env sermant-sermant-tag-transmission + shell: bash + run: | + echo "sermantTagTransmissionChanged=${{ steps.changed-sermant-tag-transmission.outputs.changed }}" >> $GITHUB_ENV - uses: marceloprado/has-changed-path@v1.0.1 id: changed-workflow-or-test with: @@ -179,6 +187,12 @@ runs: echo "enableRemoval=true" >> $GITHUB_ENV fi + # *****************tagtransmission.yml***************** + # ==========tag transmission is needed to test?========== + if [ ${{ env.sermantAgentCoreChanged }} == 'true' -o ${{ env.sermantTagTransmissionChanged }} == 'true';then + echo "enableTagTransmission=true" >> $GITHUB_ENV + fi + # all workflow will trigger while workflow changed if [ ${{ steps.changed-workflow-or-test.outputs.changed }} == 'true' -o ${{ env.triggerPushEvent }} == 'true' ];then echo "enableDubboRouter=true" >> $GITHUB_ENV @@ -193,5 +207,6 @@ runs: echo "enableSpringLane=true" >> $GITHUB_ENV echo "enableDubboLane=true" >> $GITHUB_ENV echo "enableRemoval=true" >> $GITHUB_ENV + echo "enableTagTransmission=true" >> $GITHUB_ENV fi diff --git a/sermant-integration-tests/scripts/tryDownloadMidware.sh b/sermant-integration-tests/scripts/tryDownloadMidware.sh index 5c4c4cb0bb..628a4e439c 100644 --- a/sermant-integration-tests/scripts/tryDownloadMidware.sh +++ b/sermant-integration-tests/scripts/tryDownloadMidware.sh @@ -34,6 +34,12 @@ NACOS_FILE_NAME_210=nacos-server-2.1.0.tar.gz #======================================Service Center配置====================================== SERVICE_CENTER_ADDRESS=https://github.com/apache/servicecomb-service-center/releases/download/v2.1.0/apache-servicecomb-service-center-2.1.0-linux-amd64.tar.gz SERVICE_CENTER_FILE_NAME=apache-servicecomb-service-center-2.1.0-linux-amd64.tar.gz +#======================================rocketmq配置====================================== +ROCKETMQ_ADDRESS=https://archive.apache.org/dist/rocketmq/4.8.0/rocketmq-all-4.8.0-source-release.zip +ROCKETMQ_FILE_NAME=rocketmq-all-4.8.0-source-release.zip +#======================================rocketmq配置====================================== +KAFKA_ADDRESS=https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz +KAFKA_FILE_NAME=kafka_2.13-2.7.0.tgz #重试次数 TRY_TIMES=3 @@ -77,6 +83,12 @@ elif [ $midleware == "nacos210" ]; then elif [ $midleware == "sc" ]; then ADDRESS=$SERVICE_CENTER_ADDRESS FILE_NAME=$SERVICE_CENTER_FILE_NAME +elif [ $midleware == "rocketmq" ]; then + ADDRESS=$ROCKETMQ_ADDRESS + FILE_NAME=$ROCKETMQ_FILE_NAME +elif [ $midleware == "kafka" ]; then + ADDRESS=$KAFKA_ADDRESS + FILE_NAME=$KAFKA_FILE_NAME else ADDRESS=$CSE_ADDRESS FILE_NAME=$CSE_FILE_NAME diff --git a/sermant-integration-tests/tag-transmission-test/crossthread-demo/pom.xml b/sermant-integration-tests/tag-transmission-test/crossthread-demo/pom.xml new file mode 100644 index 0000000000..09bfe41d33 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/crossthread-demo/pom.xml @@ -0,0 +1,39 @@ + + + + tag-transmission-test + com.huaweicloud.sermant.tagtransmission + 1.0.0 + + 4.0.0 + + crossthread-demo + + + 8 + 8 + + + + + org.springframework.boot + spring-boot-starter-web + + + com.huaweicloud.sermant.tagtransmission + tag-transmission-util-demo + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/crossthread-demo/src/main/java/com/huaweicloud/demo/tagtransmission/crossthread/CrossThreadApplication.java b/sermant-integration-tests/tag-transmission-test/crossthread-demo/src/main/java/com/huaweicloud/demo/tagtransmission/crossthread/CrossThreadApplication.java new file mode 100644 index 0000000000..fe9d642472 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/crossthread-demo/src/main/java/com/huaweicloud/demo/tagtransmission/crossthread/CrossThreadApplication.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.demo.tagtransmission.crossthread; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * springboot 启动类 + * + * @author daizhenyu + * @since 2023-09-07 + **/ +@SpringBootApplication +public class CrossThreadApplication { + /** + * 启动类 + * + * @param args 进程启动入参 + */ + public static void main(String[] args) { + SpringApplication.run(CrossThreadApplication.class, args); + } +} \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/crossthread-demo/src/main/java/com/huaweicloud/demo/tagtransmission/crossthread/controller/CrossThreadController.java b/sermant-integration-tests/tag-transmission-test/crossthread-demo/src/main/java/com/huaweicloud/demo/tagtransmission/crossthread/controller/CrossThreadController.java new file mode 100644 index 0000000000..03bd0d4a1a --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/crossthread-demo/src/main/java/com/huaweicloud/demo/tagtransmission/crossthread/controller/CrossThreadController.java @@ -0,0 +1,183 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.demo.tagtransmission.crossthread.controller; + +import com.huaweicloud.demo.tagtransmission.util.HttpClientUtils; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 验证流量标签传递 跨线程场景 + * + * @author daizhenyu + * @since 2023-09-11 + **/ +@RestController +@RequestMapping(value = "thread") +public class CrossThreadController { + /** + * 存储消费者调用http服务端返回的流量标签 + */ + public static final Map THREAD_TAG_MAP = new HashMap<>(); + + private static final int CORE_POOL_SIZE = 2; + + private static final int MAX_POOL_SIZE = 2; + + private static final int KEEP_ALIVE_TIME = 10; + + private static final int QUEUE_CAPACITY = 20; + + private static final int INITIAL_DELAY_TIME = 1; + + private static final int DELAY_TIME = 10; + + private static final int SLEEP_TIME = 5000; + + private static final String THREAD_TAG = "threadTag"; + + private final ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, + TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_CAPACITY)); + + private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(CORE_POOL_SIZE); + + @Value("${common.server.url}") + private String commonServerUrl; + + /** + * 新建线程 + * + * @return 透传标签值 + * @throws ExecutionException + * @throws InterruptedException + */ + @RequestMapping(value = "testNewThread", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE) + public String testNewThread() throws ExecutionException, InterruptedException { + FutureTask futureTask = new FutureTask<>(() -> HttpClientUtils.doHttpUrlConnectionGet(commonServerUrl)); + Thread thread = new Thread(futureTask); + thread.start(); + return futureTask.get(); + } + + /** + * 普通线程池执行executor方法提交线程任务 + * + * @return 透传标签值 + * @throws InterruptedException + */ + @RequestMapping(value = "testExecutor", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE) + public String testExecutor() throws InterruptedException { + String trafficTag = null; + executor.execute(() -> THREAD_TAG_MAP.put(THREAD_TAG, HttpClientUtils.doHttpUrlConnectionGet(commonServerUrl))); + Thread.sleep(SLEEP_TIME); + trafficTag = THREAD_TAG_MAP.get(THREAD_TAG); + + // 删除流量标签,以免干扰下一次测试查询 + THREAD_TAG_MAP.remove(THREAD_TAG); + return trafficTag; + } + + /** + * 普通线程池执行submit方法提交线程任务 + * + * @return 透传标签值 + * @throws ExecutionException + * @throws InterruptedException + */ + @RequestMapping(value = "testSubmit", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE) + public String testSubmit() throws ExecutionException, InterruptedException { + FutureTask futureTask = new FutureTask<>(() -> HttpClientUtils.doHttpUrlConnectionGet(commonServerUrl)); + executor.submit(futureTask); + return futureTask.get(); + } + + /** + * 定时线程池执行schedule方法提交线程任务 + * + * @return 透传标签值 + * @throws ExecutionException + * @throws InterruptedException + */ + @RequestMapping(value = "testSchedule", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE) + public String testSchedule() throws ExecutionException, InterruptedException { + FutureTask futureTask = new FutureTask<>(() -> HttpClientUtils.doHttpUrlConnectionGet(commonServerUrl)); + scheduledExecutor.schedule(futureTask, INITIAL_DELAY_TIME, TimeUnit.SECONDS); + return futureTask.get(); + } + + /** + * 定时线程池执行scheduleAtFixedRate方法提交线程任务 + * + * @return 透传标签值 + * @throws ExecutionException + * @throws InterruptedException + */ + @RequestMapping(value = "testScheduleAtFixedRate", method = RequestMethod.GET, + produces = MediaType.TEXT_PLAIN_VALUE) + public String testScheduleAtFixedRate() throws ExecutionException, InterruptedException { + FutureTask futureTask = new FutureTask<>(() -> HttpClientUtils.doHttpUrlConnectionGet(commonServerUrl)); + scheduledExecutor.scheduleAtFixedRate(futureTask, INITIAL_DELAY_TIME, DELAY_TIME, TimeUnit.SECONDS); + return futureTask.get(); + } + + /** + * 定时线程池执行scheduleWithFixedDelay方法提交线程任务 + * + * @return 透传标签值 + * @throws ExecutionException + * @throws InterruptedException + */ + @RequestMapping(value = "testScheduleWithFixedDelay", method = RequestMethod.GET, + produces = MediaType.TEXT_PLAIN_VALUE) + public String testScheduleWithFixedDelay() throws ExecutionException, InterruptedException { + FutureTask futureTask = new FutureTask<>(() -> HttpClientUtils.doHttpUrlConnectionGet(commonServerUrl)); + scheduledExecutor.scheduleWithFixedDelay(futureTask, INITIAL_DELAY_TIME, DELAY_TIME, TimeUnit.SECONDS); + return futureTask.get(); + } + + /** + * 关闭线程池 + * + * @throws InterruptedException + */ + @RequestMapping(value = "shutdown", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE) + public void shutdownThreadPool() throws InterruptedException { + // 延迟五秒关闭线程池,以防后续线程任务执行 + Thread.sleep(SLEEP_TIME); + + if (executor != null) { + executor.shutdown(); + } + if (scheduledExecutor != null) { + scheduledExecutor.shutdown(); + } + } +} \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/crossthread-demo/src/main/resources/application.properties b/sermant-integration-tests/tag-transmission-test/crossthread-demo/src/main/resources/application.properties new file mode 100644 index 0000000000..af7831d32f --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/crossthread-demo/src/main/resources/application.properties @@ -0,0 +1,2 @@ +common.server.url=http://127.0.0.1:9040/common/httpServer +server.port=9045 \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/grpc-api-demo/pom.xml b/sermant-integration-tests/tag-transmission-test/grpc-api-demo/pom.xml new file mode 100644 index 0000000000..36fd4ce3e4 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/grpc-api-demo/pom.xml @@ -0,0 +1,77 @@ + + + + tag-transmission-test + com.huaweicloud.sermant.tagtransmission + 1.0.0 + + 4.0.0 + + grpc-api-demo + + + 8 + 8 + 1.6.2 + 0.6.1 + + + + + io.grpc + grpc-netty-shaded + ${grpc.version} + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + io.grpc + grpc-protobuf + ${grpc.version} + + + + + + + kr.motd.maven + os-maven-plugin + ${os.maven.version} + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + ${protobuf.maven.version} + + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + src/main/proto + src/main/java + false + + + + + compile + compile-custom + + + + + + + + \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/grpc-api-demo/src/main/proto/grpctest.proto b/sermant-integration-tests/tag-transmission-test/grpc-api-demo/src/main/proto/grpctest.proto new file mode 100644 index 0000000000..73d4d32993 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/grpc-api-demo/src/main/proto/grpctest.proto @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "com.huaweicloud.demo.tagtransmission.grpc.api.service"; +option java_outer_classname = "GrpcTagTransmissionServiceProto"; + +package grpc; + +service TagTransmissionService { + rpc transmitTag (EmptyRequest) returns (TrafficTag); +} + +message EmptyRequest { +} + +message TrafficTag { + string tag = 1; +} \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/grpc-client-demo/pom.xml b/sermant-integration-tests/tag-transmission-test/grpc-client-demo/pom.xml new file mode 100644 index 0000000000..44e19b9bd1 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/grpc-client-demo/pom.xml @@ -0,0 +1,46 @@ + + + + tag-transmission-test + com.huaweicloud.sermant.tagtransmission + 1.0.0 + + 4.0.0 + + grpc-client-demo + + + 8 + 8 + 1.0.1 + + + + + com.huaweicloud.sermant.tagtransmission + grpc-api-demo + + + org.springframework.boot + spring-boot-starter-web + + + com.google.guava + failureaccess + ${failureaccess.version} + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + + \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/grpc-client-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/client/GrpcClientApplication.java b/sermant-integration-tests/tag-transmission-test/grpc-client-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/client/GrpcClientApplication.java new file mode 100644 index 0000000000..a0648b62c1 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/grpc-client-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/client/GrpcClientApplication.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.demo.tagtransmission.grpc.client; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * springboot 启动类 + * + * @author daizhenyu + * @since 2023-10-13 + **/ +@SpringBootApplication +public class GrpcClientApplication { + /** + * 启动类 + * + * @param args 进程启动入参 + */ + public static void main(String[] args) { + SpringApplication.run(GrpcClientApplication.class, args); + } +} \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/grpc-client-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/client/controller/GrpcClientController.java b/sermant-integration-tests/tag-transmission-test/grpc-client-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/client/controller/GrpcClientController.java new file mode 100644 index 0000000000..a474d85c1e --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/grpc-client-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/client/controller/GrpcClientController.java @@ -0,0 +1,151 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.demo.tagtransmission.grpc.client.controller; + +import com.huaweicloud.demo.tagtransmission.grpc.api.service.EmptyRequest; +import com.huaweicloud.demo.tagtransmission.grpc.api.service.GrpcTagTransmissionServiceProto; +import com.huaweicloud.demo.tagtransmission.grpc.api.service.TagTransmissionServiceGrpc; +import com.huaweicloud.demo.tagtransmission.grpc.api.service.TrafficTag; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; + +import io.grpc.CallOptions; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.MethodDescriptor; +import io.grpc.protobuf.ProtoUtils; +import io.grpc.stub.ClientCalls; +import io.grpc.stub.StreamObserver; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * grpc client端 + * + * @author daizhenyu + * @since 2023-10-13 + **/ +@RestController +@RequestMapping(value = "grpc") +public class GrpcClientController { + @Value("${grpc.server.port}") + private int grpcServerPort; + + /** + * 验证grpc透传流量标签,使用stub方式调用服务端 + * + * @return 流量标签值 + */ + @RequestMapping(value = "testGrpcByStub", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE) + public String testGrpcByStub() { + ManagedChannel originChannel = ManagedChannelBuilder.forAddress("localhost", grpcServerPort) + .usePlaintext() + .build(); + TagTransmissionServiceGrpc.TagTransmissionServiceBlockingStub stub = TagTransmissionServiceGrpc + .newBlockingStub(originChannel); + TrafficTag trafficTag = stub.transmitTag(EmptyRequest.newBuilder().build()); + originChannel.shutdown(); + return trafficTag.getTag(); + } + + /** + * 验证grpc透传流量标签,使用dynamic message方式调用服务端 + * + * @return 流量标签值 + * @throws ExecutionException + * @throws InterruptedException + */ + @RequestMapping(value = "testGrpcByDynamicMessage", method = RequestMethod.GET, + produces = MediaType.TEXT_PLAIN_VALUE) + public String testGrpcByDynamicMessage() throws ExecutionException, InterruptedException { + ManagedChannel channel = + ManagedChannelBuilder.forAddress("localhost", grpcServerPort).usePlaintext().build(); + + Descriptors.MethodDescriptor originMethodDescriptor = generateProtobufMethodDescriptor(); + MethodDescriptor methodDescriptor = generateGrpcMethodDescriptor( + originMethodDescriptor); + + // 创建动态消息 + DynamicMessage request = DynamicMessage.newBuilder(originMethodDescriptor.getInputType()).build(); + + // 使用 CompletableFuture 处理异步响应 + CallOptions callOptions = CallOptions.DEFAULT; + CompletableFuture responseFuture = new CompletableFuture<>(); + ClientCalls.asyncUnaryCall(channel.newCall(methodDescriptor, callOptions), request, + new StreamObserver() { + @Override + public void onNext(DynamicMessage value) { + responseFuture.complete(value); + } + + @Override + public void onError(Throwable t) { + responseFuture.completeExceptionally(t); + } + + @Override + public void onCompleted() { + } + }); + + // 等待异步响应完成 + DynamicMessage response = responseFuture.get(); + channel.shutdown(); + return (String) response.getField(originMethodDescriptor.getOutputType().findFieldByName("tag")); + } + + private MethodDescriptor + generateGrpcMethodDescriptor(Descriptors.MethodDescriptor originMethodDescriptor) { + // 生成方法全名 + String fullMethodName = MethodDescriptor + .generateFullMethodName(originMethodDescriptor.getService().getFullName(), + originMethodDescriptor.getName()); + + // 请求和响应类型 + MethodDescriptor.Marshaller inputTypeMarshaller = ProtoUtils + .marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getInputType()) + .buildPartial()); + MethodDescriptor.Marshaller outputTypeMarshaller = ProtoUtils + .marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getOutputType()) + .buildPartial()); + + // 生成方法描述 + return MethodDescriptor.newBuilder() + .setFullMethodName(fullMethodName) + .setRequestMarshaller(inputTypeMarshaller) + .setResponseMarshaller(outputTypeMarshaller) + // 使用 UNKNOWN,自动修改 + .setType(MethodDescriptor.MethodType.UNKNOWN) + .build(); + } + + private Descriptors.MethodDescriptor generateProtobufMethodDescriptor() { + // 构建服务存根 + Descriptors.FileDescriptor serviceFileDescriptor = GrpcTagTransmissionServiceProto.getDescriptor().getFile(); + Descriptors.ServiceDescriptor serviceDescriptor = serviceFileDescriptor + .findServiceByName("TagTransmissionService"); + return serviceDescriptor.getMethods().get(0); + } +} \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/grpc-client-demo/src/main/resources/application.properties b/sermant-integration-tests/tag-transmission-test/grpc-client-demo/src/main/resources/application.properties new file mode 100644 index 0000000000..26b27691ee --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/grpc-client-demo/src/main/resources/application.properties @@ -0,0 +1,2 @@ +grpc.server.port=12000 +server.port=9046 \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/grpc-server-demo/pom.xml b/sermant-integration-tests/tag-transmission-test/grpc-server-demo/pom.xml new file mode 100644 index 0000000000..070e0c6873 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/grpc-server-demo/pom.xml @@ -0,0 +1,50 @@ + + + + tag-transmission-test + com.huaweicloud.sermant.tagtransmission + 1.0.0 + + 4.0.0 + + grpc-server-demo + + + 8 + 8 + 1.0.1 + + + + + com.huaweicloud.sermant.tagtransmission + grpc-api-demo + + + org.springframework.boot + spring-boot-starter-web + + + com.huaweicloud.sermant.tagtransmission + tag-transmission-util-demo + + + com.google.guava + failureaccess + ${failureaccess.version} + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + + \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/grpc-server-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/server/GrpcServer.java b/sermant-integration-tests/tag-transmission-test/grpc-server-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/server/GrpcServer.java new file mode 100644 index 0000000000..ea70c50a17 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/grpc-server-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/server/GrpcServer.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.demo.tagtransmission.grpc.server; + +import com.huaweicloud.demo.tagtransmission.grpc.server.serviceimpl.GrpcTagTransmissionServiceImpl; + +import io.grpc.Server; +import io.grpc.ServerBuilder; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * grpc server端 + * + * @author daizhenyu + * @since 2023-10-08 + **/ +@Component +public class GrpcServer implements CommandLineRunner { + @Value("${grpc.server.port}") + private int grpcPort; + + @Autowired + private GrpcTagTransmissionServiceImpl tagTransmissionService; + + private Server server; + + @Override + public void run(String[] args) throws InterruptedException, IOException { + start(); + blockUntilShutdown(); + } + + private void start() throws IOException { + server = ServerBuilder.forPort(grpcPort) + .addService(tagTransmissionService) + .build() + .start(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + GrpcServer.this.stop(); + })); + } + + private void stop() { + if (server != null) { + server.shutdown(); + } + } + + private void blockUntilShutdown() throws InterruptedException { + if (server != null) { + server.awaitTermination(); + } + } +} \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/grpc-server-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/server/GrpcServerApplication.java b/sermant-integration-tests/tag-transmission-test/grpc-server-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/server/GrpcServerApplication.java new file mode 100644 index 0000000000..7b8a406378 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/grpc-server-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/server/GrpcServerApplication.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.demo.tagtransmission.grpc.server; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * springboot 启动类 + * + * @author daizhenyu + * @since 2023-10-13 + **/ +@SpringBootApplication +public class GrpcServerApplication { + /** + * 启动类 + * + * @param args 进程启动入参 + */ + public static void main(String[] args) { + SpringApplication.run(GrpcServerApplication.class, args); + } +} \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/grpc-server-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/server/serviceimpl/GrpcTagTransmissionServiceImpl.java b/sermant-integration-tests/tag-transmission-test/grpc-server-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/server/serviceimpl/GrpcTagTransmissionServiceImpl.java new file mode 100644 index 0000000000..87aaebc949 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/grpc-server-demo/src/main/java/com/huaweicloud/demo/tagtransmission/grpc/server/serviceimpl/GrpcTagTransmissionServiceImpl.java @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.demo.tagtransmission.grpc.server.serviceimpl; + +import com.huaweicloud.demo.tagtransmission.grpc.api.service.EmptyRequest; +import com.huaweicloud.demo.tagtransmission.grpc.api.service.TagTransmissionServiceGrpc; +import com.huaweicloud.demo.tagtransmission.grpc.api.service.TrafficTag; +import com.huaweicloud.demo.tagtransmission.util.HttpClientUtils; + +import io.grpc.stub.StreamObserver; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * grpc服务的实现类 + * + * @author daizhenyu + * @since 2023-10-08 + **/ +@Component +public class GrpcTagTransmissionServiceImpl extends TagTransmissionServiceGrpc.TagTransmissionServiceImplBase { + @Value("${common.server.url}") + private String commonServerUrl; + + @Override + public void transmitTag(EmptyRequest request, StreamObserver responseObserver) { + responseObserver.onNext(TrafficTag.newBuilder() + .setTag(HttpClientUtils.doHttpUrlConnectionGet(commonServerUrl)) + .build()); + responseObserver.onCompleted(); + } +} \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/grpc-server-demo/src/main/resources/application.properties b/sermant-integration-tests/tag-transmission-test/grpc-server-demo/src/main/resources/application.properties new file mode 100644 index 0000000000..aaea4894f9 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/grpc-server-demo/src/main/resources/application.properties @@ -0,0 +1,3 @@ +common.server.url=http://127.0.0.1:9040/common/httpServer +grpc.server.port=12000 +server.port=9047 \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/tag-transmission-util-demo/pom.xml b/sermant-integration-tests/tag-transmission-test/tag-transmission-util-demo/pom.xml new file mode 100644 index 0000000000..b6568842e2 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/tag-transmission-util-demo/pom.xml @@ -0,0 +1,35 @@ + + + + + + tag-transmission-test + com.huaweicloud.sermant.tagtransmission + 1.0.0 + + 4.0.0 + + tag-transmission-util-demo + + + 8 + 8 + + + \ No newline at end of file diff --git a/sermant-integration-tests/tag-transmission-test/tag-transmission-util-demo/src/main/java/com/huaweicloud/demo/tagtransmission/util/HttpClientUtils.java b/sermant-integration-tests/tag-transmission-test/tag-transmission-util-demo/src/main/java/com/huaweicloud/demo/tagtransmission/util/HttpClientUtils.java new file mode 100644 index 0000000000..a7ad6225a6 --- /dev/null +++ b/sermant-integration-tests/tag-transmission-test/tag-transmission-util-demo/src/main/java/com/huaweicloud/demo/tagtransmission/util/HttpClientUtils.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.demo.tagtransmission.util; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; + +/** + * httpclient 工具类 + * + * @author daizhenyu + * @since 2023-10-12 + **/ +public class HttpClientUtils { + private HttpClientUtils() { + } + + /** + * jdkhttp get方法工具类 + * + * @param url + * @return http请求的response + */ + public static String doHttpUrlConnectionGet(String url) { + String responseContext = null; + BufferedReader in = null; + HttpURLConnection connection = null; + try { + URL serverUrl = new URL(url); + connection = (HttpURLConnection) serverUrl.openConnection(); + connection.setRequestMethod("GET"); + + // 读取响应数据 + in = new BufferedReader(new InputStreamReader(connection.getInputStream())); + String inputLine; + StringBuilder content = new StringBuilder(); + while ((inputLine = in.readLine()) != null) { + content.append(inputLine); + } + responseContext = content.toString(); + } catch (IOException e) { + // ignore + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + // ignore + } + } + if (connection != null) { + connection.disconnect(); + } + } + return responseContext; + } +} \ No newline at end of file