Skip to content

Commit

Permalink
流量标签透传插件集成测试demo:grpc demo、crossthread demo、util demo
Browse files Browse the repository at this point in the history
  • Loading branch information
daizhenyu committed Oct 25, 2023
1 parent 3b96fc4 commit d67b6f9
Show file tree
Hide file tree
Showing 19 changed files with 958 additions and 0 deletions.
15 changes: 15 additions & 0 deletions .github/actions/common/plugin-change-check/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ runs:
shell: bash
run: |
echo "sermantServiceRemovalChanged=${{ steps.changed-sermant-removal.outputs.changed }}" >> $GITHUB_ENV
- uses: marceloprado/[email protected]
id: changed-sermant-tag-transmission
with:
paths: sermant-plugins/sermant-tag-transmission
- name: env sermant-sermant-tag-transmission
shell: bash
run: |
echo "sermantTagTransmissionChanged=${{ steps.changed-sermant-tag-transmission.outputs.changed }}" >> $GITHUB_ENV
- uses: marceloprado/[email protected]
id: changed-workflow-or-test
with:
Expand Down Expand Up @@ -179,6 +187,12 @@ runs:
echo "enableRemoval=true" >> $GITHUB_ENV
fi
# *****************tagtransmission.yml*****************
# ==========tag transmission is needed to test?==========
if [ ${{ env.sermantAgentCoreChanged }} == 'true' -o ${{ env.sermantTagTransmissionChanged }} == 'true';then
echo "enableTagTransmission=true" >> $GITHUB_ENV
fi
# all workflow will trigger while workflow changed
if [ ${{ steps.changed-workflow-or-test.outputs.changed }} == 'true' -o ${{ env.triggerPushEvent }} == 'true' ];then
echo "enableDubboRouter=true" >> $GITHUB_ENV
Expand All @@ -193,5 +207,6 @@ runs:
echo "enableSpringLane=true" >> $GITHUB_ENV
echo "enableDubboLane=true" >> $GITHUB_ENV
echo "enableRemoval=true" >> $GITHUB_ENV
echo "enableTagTransmission=true" >> $GITHUB_ENV
fi
12 changes: 12 additions & 0 deletions sermant-integration-tests/scripts/tryDownloadMidware.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ NACOS_FILE_NAME_210=nacos-server-2.1.0.tar.gz
#======================================Service Center配置======================================
SERVICE_CENTER_ADDRESS=https://github.com/apache/servicecomb-service-center/releases/download/v2.1.0/apache-servicecomb-service-center-2.1.0-linux-amd64.tar.gz
SERVICE_CENTER_FILE_NAME=apache-servicecomb-service-center-2.1.0-linux-amd64.tar.gz
#======================================rocketmq配置======================================
ROCKETMQ_ADDRESS=https://archive.apache.org/dist/rocketmq/4.8.0/rocketmq-all-4.8.0-source-release.zip
ROCKETMQ_FILE_NAME=rocketmq-all-4.8.0-source-release.zip
#======================================rocketmq配置======================================
KAFKA_ADDRESS=https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
KAFKA_FILE_NAME=kafka_2.13-2.7.0.tgz

#重试次数
TRY_TIMES=3
Expand Down Expand Up @@ -77,6 +83,12 @@ elif [ $midleware == "nacos210" ]; then
elif [ $midleware == "sc" ]; then
ADDRESS=$SERVICE_CENTER_ADDRESS
FILE_NAME=$SERVICE_CENTER_FILE_NAME
elif [ $midleware == "rocketmq" ]; then
ADDRESS=$ROCKETMQ_ADDRESS
FILE_NAME=$ROCKETMQ_FILE_NAME
elif [ $midleware == "kafka" ]; then
ADDRESS=$KAFKA_ADDRESS
FILE_NAME=$KAFKA_FILE_NAME
else
ADDRESS=$CSE_ADDRESS
FILE_NAME=$CSE_FILE_NAME
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>tag-transmission-test</artifactId>
<groupId>com.huaweicloud.sermant.tagtransmission</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>crossthread-demo</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.huaweicloud.sermant.tagtransmission</groupId>
<artifactId>tag-transmission-util-demo</artifactId>
</dependency>
</dependencies>

<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.demo.tagtransmission.crossthread;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* springboot 启动类
*
* @author daizhenyu
* @since 2023-09-07
**/
@SpringBootApplication
public class CrossThreadApplication {
/**
* 启动类
*
* @param args 进程启动入参
*/
public static void main(String[] args) {
SpringApplication.run(CrossThreadApplication.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.demo.tagtransmission.crossthread.controller;

import com.huaweicloud.demo.tagtransmission.util.HttpClientUtils;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 验证流量标签传递 跨线程场景
*
* @author daizhenyu
* @since 2023-09-11
**/
@RestController
@RequestMapping(value = "thread")
public class CrossThreadController {
/**
* 存储消费者调用http服务端返回的流量标签
*/
public static final Map<String, String> THREAD_TAG_MAP = new HashMap<>();

private static final int CORE_POOL_SIZE = 2;

private static final int MAX_POOL_SIZE = 2;

private static final int KEEP_ALIVE_TIME = 10;

private static final int QUEUE_CAPACITY = 20;

private static final int INITIAL_DELAY_TIME = 1;

private static final int DELAY_TIME = 10;

private static final int SLEEP_TIME = 5000;

private static final String THREAD_TAG = "threadTag";

private final ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_CAPACITY));

private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(CORE_POOL_SIZE);

@Value("${common.server.url}")
private String commonServerUrl;

/**
* 新建线程
*
* @return 透传标签值
* @throws ExecutionException
* @throws InterruptedException
*/
@RequestMapping(value = "testNewThread", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE)
public String testNewThread() throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(() -> HttpClientUtils.doHttpUrlConnectionGet(commonServerUrl));
Thread thread = new Thread(futureTask);
thread.start();
return futureTask.get();
}

/**
* 普通线程池执行executor方法提交线程任务
*
* @return 透传标签值
* @throws InterruptedException
*/
@RequestMapping(value = "testExecutor", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE)
public String testExecutor() throws InterruptedException {
String trafficTag = null;
executor.execute(() -> THREAD_TAG_MAP.put(THREAD_TAG, HttpClientUtils.doHttpUrlConnectionGet(commonServerUrl)));
Thread.sleep(SLEEP_TIME);
trafficTag = THREAD_TAG_MAP.get(THREAD_TAG);

// 删除流量标签,以免干扰下一次测试查询
THREAD_TAG_MAP.remove(THREAD_TAG);
return trafficTag;
}

/**
* 普通线程池执行submit方法提交线程任务
*
* @return 透传标签值
* @throws ExecutionException
* @throws InterruptedException
*/
@RequestMapping(value = "testSubmit", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE)
public String testSubmit() throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(() -> HttpClientUtils.doHttpUrlConnectionGet(commonServerUrl));
executor.submit(futureTask);
return futureTask.get();
}

/**
* 定时线程池执行schedule方法提交线程任务
*
* @return 透传标签值
* @throws ExecutionException
* @throws InterruptedException
*/
@RequestMapping(value = "testSchedule", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE)
public String testSchedule() throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(() -> HttpClientUtils.doHttpUrlConnectionGet(commonServerUrl));
scheduledExecutor.schedule(futureTask, INITIAL_DELAY_TIME, TimeUnit.SECONDS);
return futureTask.get();
}

/**
* 定时线程池执行scheduleAtFixedRate方法提交线程任务
*
* @return 透传标签值
* @throws ExecutionException
* @throws InterruptedException
*/
@RequestMapping(value = "testScheduleAtFixedRate", method = RequestMethod.GET,
produces = MediaType.TEXT_PLAIN_VALUE)
public String testScheduleAtFixedRate() throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(() -> HttpClientUtils.doHttpUrlConnectionGet(commonServerUrl));
scheduledExecutor.scheduleAtFixedRate(futureTask, INITIAL_DELAY_TIME, DELAY_TIME, TimeUnit.SECONDS);
return futureTask.get();
}

/**
* 定时线程池执行scheduleWithFixedDelay方法提交线程任务
*
* @return 透传标签值
* @throws ExecutionException
* @throws InterruptedException
*/
@RequestMapping(value = "testScheduleWithFixedDelay", method = RequestMethod.GET,
produces = MediaType.TEXT_PLAIN_VALUE)
public String testScheduleWithFixedDelay() throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(() -> HttpClientUtils.doHttpUrlConnectionGet(commonServerUrl));
scheduledExecutor.scheduleWithFixedDelay(futureTask, INITIAL_DELAY_TIME, DELAY_TIME, TimeUnit.SECONDS);
return futureTask.get();
}

/**
* 关闭线程池
*
* @throws InterruptedException
*/
@RequestMapping(value = "shutdown", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE)
public void shutdownThreadPool() throws InterruptedException {
// 延迟五秒关闭线程池,以防后续线程任务执行
Thread.sleep(SLEEP_TIME);

if (executor != null) {
executor.shutdown();
}
if (scheduledExecutor != null) {
scheduledExecutor.shutdown();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
common.server.url=http://127.0.0.1:9040/common/httpServer
server.port=9045
Loading

0 comments on commit d67b6f9

Please sign in to comment.