From 5c338d0a67affb5bbfa752d3e06d976505985ed3 Mon Sep 17 00:00:00 2001 From: chengyouling Date: Thu, 7 Nov 2024 10:58:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0rocketmq=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=E6=B5=81=E6=B0=B4=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: chengyouling --- .../common/mq-grayscale/rocketmq/action.yml | 59 ++++ .../common/plugin-change-check/action.yml | 23 ++ .../mq-grayscale/rocketmq/action.yml | 131 +++++++++ .../message_gray_integration_test.yml | 120 +++++++++ .../grayscale-rocketmq-consumer-demo/pom.xml | 49 ++++ .../consumer/RocketMqConsumerApplication.java | 38 +++ .../consumer/RocketMqConsumerController.java | 119 +++++++++ .../consumer/RocketMqMessageUtils.java | 73 +++++ .../src/main/resources/application.yaml | 10 + .../pom.xml | 40 +++ .../integration/GrayscaleRocketmqTest.java | 189 +++++++++++++ .../integration/support/KieClient.java | 251 ++++++++++++++++++ .../support/entity/KieConfigEntity.java | 94 +++++++ .../support/entity/KieResponse.java | 37 +++ .../support/utils/LabelGroupUtils.java | 170 ++++++++++++ .../grayscale-rocketmq-producer-demo/pom.xml | 49 ++++ .../producer/MqProducerController.java | 76 ++++++ .../producer/RocketMqProducerApplication.java | 38 +++ .../src/main/resources/application.yaml | 10 + .../mq-grayscale-rocketmq-test/pom.xml | 61 +++++ sermant-integration-tests/pom.xml | 1 + 21 files changed, 1638 insertions(+) create mode 100644 .github/actions/common/mq-grayscale/rocketmq/action.yml create mode 100644 .github/actions/scenarios/mq-grayscale/rocketmq/action.yml create mode 100644 .github/workflows/message_gray_integration_test.yml create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/pom.xml create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerApplication.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerController.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqMessageUtils.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/resources/application.yaml create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/pom.xml create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/GrayscaleRocketmqTest.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/KieClient.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieConfigEntity.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieResponse.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/utils/LabelGroupUtils.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/pom.xml create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/MqProducerController.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/RocketMqProducerApplication.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/resources/application.yaml create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/pom.xml diff --git a/.github/actions/common/mq-grayscale/rocketmq/action.yml b/.github/actions/common/mq-grayscale/rocketmq/action.yml new file mode 100644 index 0000000000..ba95760a63 --- /dev/null +++ b/.github/actions/common/mq-grayscale/rocketmq/action.yml @@ -0,0 +1,59 @@ +name: "Rocketmq Message Gray Common operations" +description: "Do something common for rocketmq message gray" +runs: + using: "composite" + steps: + - name: Set up JDK ${{ env.javaVersion }} + uses: actions/setup-java@v3 + with: + java-version: ${{ env.javaVersion }} + distribution: 'adopt' + cache: maven + - name: download agent + uses: actions/cache@v3 + with: + path: sermant-agent-*/ + key: ${{ runner.os }}-agent-${{ github.run_id }} + - name: get cse from cache + uses: actions/cache@v3 + with: + path: Local-CSE-2.1.3-linux-amd64.zip + key: ${{ runner.os }}-local-cse + restore-keys: | + ${{ runner.os }}-local-cse + - name: start cse + shell: bash + run: | + export ROOT_PATH=$(pwd) + bash ./sermant-integration-tests/scripts/startCse.sh + - name: get rocketmq from cache + uses: actions/cache@v3 + with: + path: rocketmq-all-5.1.4-bin-release.zip + key: ${{ runner.os }}-rocketmq-all-5.1.4-bin-release + restore-keys: | + ${{ runner.os }}-rocketmq-all-5.1.4-bin-release + - name: start rocketmq server + shell: bash + run: | + unzip rocketmq-all-5.1.4-bin-release.zip + sed -i 's/if \[\[ "$JAVA_MAJOR_VERSION" -lt "9" \]\]/if [ "$JAVA_MAJOR_VERSION" -lt "9" ]/g' rocketmq-all-5.1.4-bin-release/bin/runserver.sh + sed -i 's/-Xms4g -Xmx4g -Xmn2g/-Xms1g -Xmx1g -Xmn1g/g' rocketmq-all-5.1.4-bin-release/bin/runserver.sh + sed -i '22i enablePropertyFilter = true' rocketmq-all-5.1.4-bin-release/conf/broker.conf + nohup bash rocketmq-all-5.1.4-bin-release/bin/mqnamesrv & + - name: start rocketmq broker + shell: bash + run: | + sed -i 's/-Xms8g -Xmx8g -Xmn4g/-Xms1g -Xmx1g -Xmn1g/g' rocketmq-all-5.1.4-bin-release/bin/runbroker.sh + nohup bash rocketmq-all-5.1.4-bin-release/bin/mqbroker -n localhost:9876 -c rocketmq-all-5.1.4-bin-release/conf/broker.conf & + - name: cache dependencies + uses: actions/cache@v3 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-maven- + - name: entry + uses: ./.github/actions/common/entry + with: + log-dir: ./logs/rocketmq-grayscale/common diff --git a/.github/actions/common/plugin-change-check/action.yml b/.github/actions/common/plugin-change-check/action.yml index a5075e5cd5..10c99b3208 100644 --- a/.github/actions/common/plugin-change-check/action.yml +++ b/.github/actions/common/plugin-change-check/action.yml @@ -494,6 +494,22 @@ runs: shell: bash run: | echo "crossthreadTagTransmissionChanged=${{ steps.changed-tag-transmission-crossthread.outputs.changed }}" >> $GITHUB_ENV + - uses: marceloprado/has-changed-path@v1.0.1 + id: changed-mq-grayscale-rocketmq + with: + paths: sermant-plugins/sermant-mq-grayscale/mq-config-common + sermant-plugins/sermant-mq-grayscale/mq-config-service + sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test + ./.github/workflows/message_gray_integration_test.yml + ./.github/actions/common/mq-grayscale/rocketmq + ./.github/actions/scenarios/mq-grayscale/rocketmq + - name: env mq-grayscale-rocketmq + shell: bash + run: | + echo "mqGrayscaleRocketMqChanged=${{ steps.changed-mq-grayscale-rocketmq.outputs.changed }}" >> $GITHUB_ENV - uses: marceloprado/has-changed-path@v1.0.1 id: changed-dubbo-router-action with: @@ -924,3 +940,10 @@ runs: ${{ steps.changed-common-action.outputs.changed }} == 'true' -o ${{ env.triggerPushEvent }} == 'true' ];then echo "enableXdsServicAction=true" >> $GITHUB_ENV fi + + # ==========mq grayscale rocketmq is needed to test?========== + if [ ${{ env.mqGrayscaleRocketMqChanged }} == 'true' -o \ + ${{ env.sermantAgentCoreChanged }} == 'true' -o \ + ${{ steps.changed-common-action.outputs.changed }} == 'true' -o ${{ env.triggerPushEvent }} == 'true' ];then + echo "enableMqGrayscaleRocketMqAction=true" >> $GITHUB_ENV + fi diff --git a/.github/actions/scenarios/mq-grayscale/rocketmq/action.yml b/.github/actions/scenarios/mq-grayscale/rocketmq/action.yml new file mode 100644 index 0000000000..6ac9ed9e00 --- /dev/null +++ b/.github/actions/scenarios/mq-grayscale/rocketmq/action.yml @@ -0,0 +1,131 @@ +name: "RocketMq Grayscale Test" +description: "Auto test for rocketMq grayscale" +runs: + using: "composite" + steps: + - name: package rocketmq grayscale tests + shell: bash + run: mvn package -Drocketmq-client.version=${{ matrix.rocketMqClientVersion }} -DskipTests --file sermant-integration-tests/mq-grayscale-rocketmq-test/pom.xml + - name: echo test model + shell: bash + run: | + echo "=======test-model======"-${{ matrix.test-model }} + - name: start base producer service for plugin disabled + shell: bash + if: matrix.test-model == 'PLUGIN_ENABLED_FALSE' + env: + dynamic.config.dynamicConfigType: KIE + dynamic.config.serverAddress: 127.0.0.1:30110 + SERVICE_META_ENVIRONMENT: development + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-producer -jar \ + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/target/grayscale-rocketmq-producer-demo.jar > ${{ env.logDir }}/rocketmq-producer-base.log 2>&1 & + - name: start gray producer service for plugin disabled + shell: bash + if: matrix.test-model == 'PLUGIN_ENABLED_FALSE' + env: + dynamic.config.dynamicConfigType: KIE + dynamic.config.serverAddress: 127.0.0.1:30110 + SERVICE_META_ENVIRONMENT: development + SERVICE_META_VERSION: 1.0.1 + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-producer -Dserver.port=9040 -jar \ + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/target/grayscale-rocketmq-producer-demo.jar > ${{ env.logDir }}/rocketmq-producer-gray.log 2>&1 & + - name: start base producer service for plugin enabled + shell: bash + if: matrix.test-model != 'PLUGIN_ENABLED_FALSE' + env: + dynamic.config.dynamicConfigType: KIE + dynamic.config.serverAddress: 127.0.0.1:30110 + SERVICE_META_ENVIRONMENT: development + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-producer -Dgrayscale.mq.config.enabled=true -jar \ + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/target/grayscale-rocketmq-producer-demo.jar > ${{ env.logDir }}/rocketmq-producer-base.log 2>&1 & + - name: start gray producer service for plugin enabled + shell: bash + if: matrix.test-model != 'PLUGIN_ENABLED_FALSE' + env: + dynamic.config.dynamicConfigType: KIE + dynamic.config.serverAddress: 127.0.0.1:30110 + SERVICE_META_ENVIRONMENT: development + SERVICE_META_VERSION: 1.0.1 + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-producer -Dgrayscale.mq.config.enabled=true -Dserver.port=9040 -jar \ + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/target/grayscale-rocketmq-producer-demo.jar > ${{ env.logDir }}/rocketmq-producer-gray.log 2>&1 & + - name: start base consumer service for plugin disabled + shell: bash + if: matrix.test-model == 'PLUGIN_ENABLED_FALSE' + env: + dynamic.config.dynamicConfigType: KIE + dynamic.config.serverAddress: 127.0.0.1:30110 + SERVICE_META_ENVIRONMENT: development + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-consumer -jar \ + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/target/grayscale-rocketmq-consumer-demo.jar > ${{ env.logDir }}/rocketmq-consumer-base.log 2>&1 & + - name: start base consumer service for plugin enabled + shell: bash + if: matrix.test-model != 'PLUGIN_ENABLED_FALSE' + env: + dynamic.config.dynamicConfigType: KIE + dynamic.config.serverAddress: 127.0.0.1:30110 + SERVICE_META_ENVIRONMENT: development + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-consumer -Dgrayscale.mq.config.enabled=true -jar \ + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/target/grayscale-rocketmq-consumer-demo.jar > ${{ env.logDir }}/rocketmq-consumer-base.log 2>&1 & + - name: start gray consumer service for plugin disabled + shell: bash + if: matrix.test-model == 'PLUGIN_ENABLED_FALSE' + env: + dynamic.config.dynamicConfigType: KIE + dynamic.config.serverAddress: 127.0.0.1:30110 + SERVICE_META_ENVIRONMENT: development + SERVICE_META_VERSION: 1.0.1 + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-consumer -Dserver.port=9010 -jar \ + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/target/grayscale-rocketmq-consumer-demo.jar > ${{ env.logDir }}/rocketmq-consumer-gray.log 2>&1 & + - name: start gray consumer service for plugin enabled + shell: bash + if: matrix.test-model == 'CONSUMER_BASE_GRAY' + env: + dynamic.config.dynamicConfigType: KIE + dynamic.config.serverAddress: 127.0.0.1:30110 + SERVICE_META_ENVIRONMENT: development + SERVICE_META_VERSION: 1.0.1 + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-consumer -Dgrayscale.mq.config.enabled=true -Dserver.port=9010 -jar \ + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/target/grayscale-rocketmq-consumer-demo.jar > ${{ env.logDir }}/rocketmq-consumer-gray.log 2>&1 & + - name: waiting for service start for only base + shell: bash + if: matrix.test-model == 'CONSUMER_BASE_ONLY' + run: | + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9030/actuator/health 200 + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9040/actuator/health 200 + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9000/actuator/health 200 + - name: waiting for service start for base and gray + shell: bash + if: matrix.test-model != 'CONSUMER_BASE_ONLY' + run: | + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9030/actuator/health 200 + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9040/actuator/health 200 + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9000/actuator/health 200 + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9010/actuator/health 200 + - name: test message grayscale rocketmq + shell: bash + run: | + mvn test -Dgrayscale.rocketmq.integration.test.type=${{ matrix.test-model }} --file \ + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/pom.xml + - name: exit + if: always() + uses: ./.github/actions/common/exit + with: + processor-keyword: grayscale-rocketmq + - name: if failure then upload error log + uses: actions/upload-artifact@v3 + if: ${{ failure() || cancelled() }} + with: + name: (test-for-grayscale-rocketmq)-(${{ matrix.test-model }}})-logs + path: | + ./*.log + ./logs/**/*.log + if-no-files-found: warn + retention-days: 2 diff --git a/.github/workflows/message_gray_integration_test.yml b/.github/workflows/message_gray_integration_test.yml new file mode 100644 index 0000000000..6507ea9017 --- /dev/null +++ b/.github/workflows/message_gray_integration_test.yml @@ -0,0 +1,120 @@ +name: Message gray integration test +env: + sermantVersion: 1.0.0 +on: + push: + pull_request: + branches: + - '*' + paths: + - 'sermant-agentcore/**' + - 'sermant-integration-tests/mq-grayscale-rocketmq-test/**' + - 'sermant-plugins/sermant-mq-grayscale/**' + - '.github/workflows/message_gray_integration_test.yml' + - '.github/actions/common/mq-grayscale/rocketmq/**' + - '.github/actions/scenarios/mq-grayscale/rocketmq/**' + - '.github/actions/common/plugin-change-check/action.yml' + - '.github/actions/common/entry/action.yml' + - '.github/actions/common/exit/action.yml' +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}-${{ github.head_ref }} + cancel-in-progress: true +jobs: + set-execution-conditions: + name: set-execution-conditions + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 100 + - name: plugin-change-check + id: plugin-change-check + uses: ./.github/actions/common/plugin-change-check + - name: set-outputs + id: set-outputs + run: | + echo "enableMqGrayscaleRocketMqAction=${{env.enableMqGrayscaleRocketMqAction}}" >> $GITHUB_OUTPUT + outputs: + enableMqGrayscaleRocketMqAction: ${{ steps.set-outputs.outputs.enableMqGrayscaleRocketMqAction }} + download-midwares-and-cache: + name: download midwares and cache + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: cache local cse + uses: actions/cache@v3 + with: + path: Local-CSE-2.1.3-linux-amd64.zip + key: ${{ runner.os }}-local-cse + restore-keys: | + ${{ runner.os }}-local-cse + - name: download cse + run: | + export ROOT_PATH=$(pwd) + bash ./sermant-integration-tests/scripts/tryDownloadMidware.sh cse + - name: cache rocketmq + uses: actions/cache@v3 + with: + path: rocketmq-all-5.1.4-bin-release.zip + key: ${{ runner.os }}-rocketmq-all-5.1.4-bin-release + restore-keys: | + ${{ runner.os }}-rocketmq-all-5.1.4-bin-release + - name: download rocketmq + run: | + export ROOT_PATH=$(pwd) + bash ./sermant-integration-tests/scripts/tryDownloadMidware.sh rocketmq514 + build-agent-and-cache: + name: build agent and cache + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up JDK 8 + uses: actions/setup-java@v3 + with: + java-version: '8' + distribution: 'adopt' + cache: maven + - name: cache agent + uses: actions/cache@v3 + with: + path: sermant-agent-*/ + key: ${{ runner.os }}-agent-${{ github.run_id }} + - name: package agent + run: | + sed -i '/sermant-backend/d' pom.xml + sed -i '/sermant-integration-tests/d' pom.xml + sed -i '/sermant-injector/d' pom.xml + sed -i '/sermant-flowcontrol/d' sermant-plugins/pom.xml + sed -i '/sermant-database-write-prohibition/d' sermant-plugins/pom.xml + sed -i '/sermant-spring-beans-deal/d' sermant-plugins/pom.xml + sed -i '/sermant-service-removal/d' sermant-plugins/pom.xml + sed -i '/sermant-service-visibility/d' sermant-plugins/pom.xml + sed -i '/sermant-monitor/d' sermant-plugins/pom.xml + sed -i '/sermant-mq-consume-prohibition/d' sermant-plugins/pom.xml + sed -i '/sermant-springboot-registry/d' sermant-plugins/pom.xml + sed -i '/sermant-service-registry/d' sermant-plugins/pom.xml + sed -i '/sermant-dynamic-config/d' sermant-plugins/pom.xml + sed -i '/sermant-router/d' sermant-plugins/pom.xml + sed -i '/sermant-loadbalancer/d' sermant-plugins/pom.xml + mvn package -DskipTests -Ptest --file pom.xml + test-for-grayscale-rocketmq: + name: Test for grayscale rocketmq + runs-on: ubuntu-latest + if: needs.set-execution-conditions.outputs.enableMqGrayscaleRocketMqAction=='true' + needs: [set-execution-conditions, build-agent-and-cache, download-midwares-and-cache] + strategy: + matrix: + test-model: [ 'PLUGIN_ENABLED_FALSE','CONSUMER_BASE_ONLY','CONSUMER_BASE_GRAY' ] + rocketMqClientVersion: ["4.8.0", "4.9.0", "4.9.1", "4.9.2", "4.9.3", "4.9.4", "4.9.5", "4.9.6", "4.9.7", "4.9.8", "5.0.0"] + fail-fast: false + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 100 + - name: set java version to environment + run: | + echo "javaVersion=8" >> $GITHUB_ENV + - name: common operations + uses: ./.github/actions/common/mq-grayscale/rocketmq + - name: message gray test for test-model=${{ matrix.test-model }} rocketMqClientVersion=${{ matrix.rocketMqClientVersion }} + uses: ./.github/actions/scenarios/mq-grayscale/rocketmq diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/pom.xml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/pom.xml new file mode 100644 index 0000000000..0e92b41e45 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/pom.xml @@ -0,0 +1,49 @@ + + + + mq-grayscale-rocketmq-test + io.sermant.integration + 1.0.0 + + 4.0.0 + + grayscale-rocketmq-consumer-demo + 1.0.0 + + + 8 + 8 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.apache.rocketmq + rocketmq-client + + + org.apache.rocketmq + rocketmq-common + + + org.springframework.boot + spring-boot-starter-actuator + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerApplication.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerApplication.java new file mode 100644 index 0000000000..76e0bd3678 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerApplication.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.consumer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * springboot starter + * + * @author chengyouling + * @since 2024-10-30 + **/ +@SpringBootApplication +public class RocketMqConsumerApplication { + /** + * 启动类 + * + * @param args 进程启动入参 + */ + public static void main(String[] args) { + SpringApplication.run(RocketMqConsumerApplication.class, args); + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerController.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerController.java new file mode 100644 index 0000000000..6e0c922c7a --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerController.java @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.consumer; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.PullResultExt; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * consumer controller + * + * @author chengyouling + * @since 2024-10-30 + **/ +@RestController +public class RocketMqConsumerController { + private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqConsumerController.class); + + private DefaultMQPullConsumer pullConsumer; + + private Map offSetTable = new HashMap<>(); + + @Value("${rocketmq.address}") + private String nameServer; + + @Value("${rocketmq.topic}") + private String topic; + + private final int maxReconsumeTimes = 10000; + + private final int maxNums = 32; + + /** + * clear cache count info + * + * @return is success + */ + @GetMapping("/clearMessageCount") + public String clearMessageCount() { + RocketMqMessageUtils.clearMessageCount(); + return "success"; + } + + /** + * pull message + * + * @return message count + */ + @GetMapping("/getMessageCount") + public Map getMessageCount() { + try { + if (pullConsumer == null) { + pullConsumer = new DefaultMQPullConsumer("default"); + pullConsumer.setNamesrvAddr(nameServer); + pullConsumer.setMaxReconsumeTimes(maxReconsumeTimes); + pullConsumer.start(); + } + Set messageQueues = pullConsumer.fetchSubscribeMessageQueues(topic); + if (messageQueues.isEmpty()) { + return new HashMap<>(); + } + for (MessageQueue mq : messageQueues) { + PullResultExt pullResult = (PullResultExt) pullConsumer.pullBlockIfNotFound(mq, null, + getMessageQueueOffSet(mq), maxNums); + putMessageQueueOffSet(mq, pullResult); + if (pullResult.getPullStatus() == PullStatus.FOUND) { + List messageExts = pullResult.getMsgFoundList(); + for (MessageExt messageExt: messageExts) { + RocketMqMessageUtils.convertMessageCount(messageExt); + } + } + } + return RocketMqMessageUtils.getMessageCount(); + } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { + LOGGER.error("consumePullMessage error!", e); + return new HashMap<>(); + } + } + + private void putMessageQueueOffSet(MessageQueue mq, PullResultExt pullResult) { + offSetTable.put(mq, pullResult.getNextBeginOffset()); + } + + private long getMessageQueueOffSet(MessageQueue mq) { + if (offSetTable.get(mq) != null) { + return offSetTable.get(mq); + } + return 0; + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqMessageUtils.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqMessageUtils.java new file mode 100644 index 0000000000..13bc301e54 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqMessageUtils.java @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.consumer; + +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.HashMap; +import java.util.Map; + +/** + * message count utils + * + * @author chengyouling + * @since 2024-10-30 + */ +public class RocketMqMessageUtils { + private static final String GRAY = "gray"; + + private static int grayMessageCount = 0; + + private static int baseMessageCount = 0; + + private RocketMqMessageUtils() { + } + + /** + * convert base/gray message + * + * @param messageExt MessageExt + */ + public static void convertMessageCount(MessageExt messageExt) { + if (messageExt.getProperties() != null && GRAY.equals(messageExt.getProperties().get( + "x_lane_canary"))) { + grayMessageCount++; + return; + } + baseMessageCount++; + } + + /** + * get base/gray message count + * + * @return message count + */ + public static Map getMessageCount() { + Map countMap = new HashMap<>(); + countMap.put("baseMessageCount", baseMessageCount); + countMap.put("grayMessageCount", grayMessageCount); + return countMap; + } + + /** + * clear cache count info + */ + public static void clearMessageCount() { + baseMessageCount = 0; + grayMessageCount = 0; + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/resources/application.yaml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/resources/application.yaml new file mode 100644 index 0000000000..2c81926171 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/resources/application.yaml @@ -0,0 +1,10 @@ +server: + port: 9000 +rocketmq: + address: 127.0.0.1:9876 + topic: MESSAGE-GRAY +management: + endpoints: + web: + exposure: + include: "*" diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/pom.xml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/pom.xml new file mode 100644 index 0000000000..17cca01e74 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/pom.xml @@ -0,0 +1,40 @@ + + + + mq-grayscale-rocketmq-test + io.sermant.integration + 1.0.0 + + 4.0.0 + + grayscale-rocketmq-integration-test + 1.0.0 + + + 8 + 8 + + + + + org.junit.jupiter + junit-jupiter + test + + + org.springframework + spring-web + 5.3.20 + test + + + com.alibaba + fastjson + 2.0.9 + test + + + + diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/GrayscaleRocketmqTest.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/GrayscaleRocketmqTest.java new file mode 100644 index 0000000000..b80ba69fef --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/GrayscaleRocketmqTest.java @@ -0,0 +1,189 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.integration; + +import io.sermant.demo.grayscale.rocketmq.integration.support.KieClient; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.springframework.web.client.RestTemplate; + +import java.util.concurrent.TimeUnit; + +import com.alibaba.fastjson.JSON; + +/** + * test method class + * + * @author chengyouling + * @since 2024-10-30 + */ +public class GrayscaleRocketmqTest { + private static final RestTemplate restTemplate = new RestTemplate(); + + private static final KieClient kieClient = new KieClient(restTemplate); + + private static final String CONFIG_KEY = "grayscale.mq.config"; + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "PLUGIN_ENABLED_FALSE") + public void testPluginEnabledFalseConsumeMessage() { + producerMessage(); + int baseBaseCount = (int) JSON.parseObject(getBaseResult()).get("baseMessageCount"); + int grayBaseCount = (int) JSON.parseObject(getGrayResult()).get("baseMessageCount"); + Assertions.assertTrue(baseBaseCount == 20 && grayBaseCount == 20); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_ONLY") + public void testAutoOnlyBaseConsumeMessage() { + createGrayscaleConfig("AUTO", ""); + triggerAndClearCacheCount(false); + producerMessage(); + String baseResult = getBaseResult(); + int baseBaseCount = (int) JSON.parseObject(baseResult).get("baseMessageCount"); + int baseGrayCount = (int) JSON.parseObject(baseResult).get("grayMessageCount"); + Assertions.assertTrue(baseBaseCount == 10 && baseGrayCount == 10); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_ONLY") + public void testAutoBaseExcConsumeMessage() { + createGrayscaleConfig("AUTO", "gray"); + triggerAndClearCacheCount(false); + producerMessage(); + String baseResult = getBaseResult(); + int baseBaseCount = (int) JSON.parseObject(baseResult).get("baseMessageCount"); + int baseGrayCount = (int) JSON.parseObject(baseResult).get("grayMessageCount"); + Assertions.assertTrue(baseBaseCount == 10 && baseGrayCount == 0); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_GRAY") + public void testAutoBaseGrayConsumeMessage() throws InterruptedException { + createGrayscaleConfig("AUTO", ""); + triggerAndClearCacheCount(true); + producerMessage(); + Thread.sleep(30000); + int baseBaseCount = (int) JSON.parseObject(getBaseResult()).get("baseMessageCount"); + int grayGrayCount = (int) JSON.parseObject(getGrayResult()).get("grayMessageCount"); + Assertions.assertTrue(baseBaseCount == 10 && grayGrayCount == 10); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_ONLY") + public void testBaseBaseOnlyConsumeMessage() { + createGrayscaleConfig("BASE", ""); + triggerAndClearCacheCount(false); + producerMessage(); + String baseResult = getBaseResult(); + int baseBaseCount = (int) JSON.parseObject(baseResult).get("baseMessageCount"); + int baseGrayCount = (int) JSON.parseObject(baseResult).get("grayMessageCount"); + Assertions.assertTrue( baseBaseCount == 10 && baseGrayCount == 10); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_ONLY") + public void testBaseBaseExcGrayConsumeMessage() { + createGrayscaleConfig("BASE", "gray"); + triggerAndClearCacheCount(false); + producerMessage(); + String baseResult = getBaseResult(); + int baseBaseCount = (int) JSON.parseObject(baseResult).get("baseMessageCount"); + int baseGrayCount = (int) JSON.parseObject(baseResult).get("grayMessageCount"); + Assertions.assertTrue(baseBaseCount == 10 && baseGrayCount == 0); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_GRAY") + public void testBaseBaseGrayConsumeMessage() { + createGrayscaleConfig("BASE", ""); + triggerAndClearCacheCount(true); + producerMessage(); + int baseBaseCount = (int) JSON.parseObject(getBaseResult()).get("baseMessageCount"); + int grayGrayCount = (int) JSON.parseObject(getGrayResult()).get("grayMessageCount"); + Assertions.assertTrue(baseBaseCount == 10 && grayGrayCount <= 10); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_GRAY") + public void testBaseBaseGrayExcGrayConsumeMessage() { + createGrayscaleConfig("BASE", "gray"); + triggerAndClearCacheCount(true); + producerMessage(); + int baseBaseCount = (int) JSON.parseObject(getBaseResult()).get("baseMessageCount"); + int grayGrayCount = (int) JSON.parseObject(getGrayResult()).get("grayMessageCount"); + Assertions.assertTrue(baseBaseCount == 10 && grayGrayCount == 10); + } + + private String getBaseResult() { + return restTemplate.getForObject("http://127.0.0.1:9000/getMessageCount", String.class); + } + + private String getGrayResult() { + return restTemplate.getForObject("http://127.0.0.1:9010/getMessageCount", String.class); + } + + private void producerMessage() { + for (int i = 0; i < 10; i++) { + restTemplate.getForObject("http://127.0.0.1:9030/producerMessage?message={1}", + String.class, "message"); + restTemplate.getForObject("http://127.0.0.1:9040/producerMessage?message={1}", + String.class, "gray-message"); + } + } + + private void triggerAndClearCacheCount(boolean isGrayInstanceInit) { + if (isGrayInstanceInit) { + // Trigger start gray consumer. + restTemplate.getForObject("http://127.0.0.1:9010/getMessageCount", String.class); + restTemplate.getForObject("http://127.0.0.1:9010/clearMessageCount", String.class); + } + + // Trigger start base consumer. + restTemplate.getForObject("http://127.0.0.1:9000/getMessageCount", String.class); + restTemplate.getForObject("http://127.0.0.1:9000/clearMessageCount", String.class); + } + + private void createGrayscaleConfig(String consumeMode, String excludeTag) { + String CONTENT = "enabled: true\n" + + "grayscale:\n" + + " - consumerGroupTag: gray\n" + + " serviceMeta:\n" + + " version: 1.0.1\n" + + " trafficTag:\n" + + " x_lane_canary: gray\n" + + "base:\n" + + " autoCheckDelayTime: 10\n" + + " consumeMode: " + consumeMode + "\n" + + " excludeGroupTags: ["+ excludeTag + "]\n"; + + Assertions.assertTrue(kieClient.publishConfig(CONFIG_KEY, CONTENT)); + } + + @AfterAll + public static void deleteGrayscaleConfig() { + kieClient.deleteKey(CONFIG_KEY); + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/KieClient.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/KieClient.java new file mode 100644 index 0000000000..76ea118248 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/KieClient.java @@ -0,0 +1,251 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.integration.support; + +import com.alibaba.fastjson.JSONObject; + +import io.sermant.demo.grayscale.rocketmq.integration.support.entity.KieConfigEntity; +import io.sermant.demo.grayscale.rocketmq.integration.support.entity.KieResponse; +import io.sermant.demo.grayscale.rocketmq.integration.support.utils.LabelGroupUtils; + +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.HttpClientErrorException; +import org.springframework.web.client.RestTemplate; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Kie response client + * + * @author chengyouling + * @since 2024-10-30 + */ +public class KieClient { + private final RestTemplate restTemplate; + + private final String url; + + private final Map labels; + + /** + * 构造函数 + * + * @param restTemplate 请求器 + */ + public KieClient(RestTemplate restTemplate) { + this(restTemplate, null); + } + + /** + * 构造函数 + * + * @param restTemplate 请求器 + * @param url 地址 + */ + public KieClient(RestTemplate restTemplate, String url) { + this(restTemplate, url, null); + } + + /** + * 构造函数 + * + * @param restTemplate 请求器 + * @param url 地址 + * @param labels 标签 + */ + public KieClient(RestTemplate restTemplate, String url, Map labels) { + this.restTemplate = restTemplate; + this.url = url == null ? "http://127.0.0.1:30110/v1/default/kie/kv" : url; + this.labels = labels; + } + + /** + * 发布配置 + * + * @param key 键 + * @param value 值 + * @return 是否发布成功 + */ + public boolean publishConfig(String key, String value) { + final KieConfigEntity configEntity = queryTargetKeyId(key); + if (configEntity != null) { + // 更新操作 + return updateKey(configEntity, value); + } else { + // 新增操作 + return addKey(key, value); + } + } + + /** + * 更新配置 + * + * @param configEntity 查询的config信息 + * @param value 新的配置内容 + * @return 是否更新成功 + */ + public boolean updateKey(KieConfigEntity configEntity, String value) { + HttpHeaders headers = new HttpHeaders(); + MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8"); + headers.setContentType(type); + headers.add("Accept", MediaType.APPLICATION_JSON.toString()); + + HttpEntity entity = new HttpEntity<>(JSONObject.toJSONString(new KieRequest(configEntity.getKey(), + value, labels)), + headers); + String address = this.url + "/" + configEntity.getId(); + restTemplate.put(address, entity); + return true; + } + + /** + * 根据key删除配置 + * + * @param key 键 + * @return 是否删除成功 + */ + public boolean deleteKey(String key) { + final KieConfigEntity configEntity = this.queryTargetKeyId(key); + if (configEntity == null) { + return false; + } + String address = this.url + "/" + configEntity.getId(); + restTemplate.delete(address); + return true; + } + + /** + * 添加新配置 + * + * @param key 键 + * @param value 值 + * @return 是否添加成功 + */ + public boolean addKey(String key, String value) { + HttpHeaders headers = new HttpHeaders(); + MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8"); + headers.setContentType(type); + headers.add("Accept", MediaType.APPLICATION_JSON.toString()); + + HttpEntity entity = new HttpEntity<>(JSONObject.toJSONString(new KieRequest(key, value, labels)), headers); + + try { + ResponseEntity responseEntity = restTemplate.postForEntity(url, entity, String.class); + return responseEntity.getStatusCode() == HttpStatus.OK; + } catch (HttpClientErrorException ex) { + // ignored + } + return false; + } + + private KieConfigEntity queryTargetKeyId(String key) { + final List query = query(null); + return query.stream().filter(kieConfigEntity -> kieConfigEntity.getKey().equals(key)) + .findFirst().orElse(null); + } + + /** + * 根据标签查询kie配置 + * + * @param labels 新增标签 + * @return kie响应 + */ + public List query(Map labels) { + Map curLabels = labels; + if (labels == null) { + curLabels = this.labels; + } + if (curLabels == null) { + curLabels = new HashMap<>(); + curLabels.put("app", "default"); + curLabels.put("environment", "development"); + } + final String labelGroup = LabelGroupUtils.createLabelGroup(curLabels); + final String labelCondition = LabelGroupUtils.getLabelCondition(labelGroup); + String address = this.url + "?" + labelCondition + "&match=exact&revision="; + final ResponseEntity configs = restTemplate.getForEntity(address, String.class); + JSONObject result = JSONObject.parseObject(configs.getBody()); + if (configs.getStatusCode().value() == 200) { + final KieResponse kieResponse = result.toJavaObject(KieResponse.class); + return kieResponse.getData(); + } + return Collections.emptyList(); + } + + /** + * kie请求 + * + * @since 2022-07-12 + */ + static class KieRequest implements Serializable { + private String key; + private String value; + private Map labels; + private String status = "enabled"; + + public KieRequest(String key, String value, Map labels) { + this.key = key; + this.value = value; + this.labels = labels; + if (this.labels == null) { + this.labels = new HashMap<>(); + this.labels.put("app", "default"); + this.labels.put("environment", "development"); + } + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public Map getLabels() { + return labels; + } + + public void setLabels(Map labels) { + this.labels = labels; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieConfigEntity.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieConfigEntity.java new file mode 100644 index 0000000000..d5035f3df2 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieConfigEntity.java @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.sermant.demo.grayscale.rocketmq.integration.support.entity; + +import java.util.HashMap; +import java.util.Map; + +/** + * Kie response entity + * + * @author chengyouling + * @since 2024-10-30 + */ +public class KieConfigEntity { + private String id; + private String key; + private Map labels = new HashMap(); + private String value; + private String valueType; + private String status; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public Map getLabels() { + return labels; + } + + public void setLabels(Map labels) { + this.labels = labels; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public String getValueType() { + return valueType; + } + + public void setValueType(String valueType) { + this.valueType = valueType; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + @Override + public String toString() { + return "KieConfigEntity{" + + "id='" + id + '\'' + + ", key='" + key + '\'' + + ", labels=" + labels + + ", value='" + value + '\'' + + ", valueType='" + valueType + '\'' + + ", status='" + status + '\'' + + '}'; + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieResponse.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieResponse.java new file mode 100644 index 0000000000..47f437c15c --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieResponse.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.integration.support.entity; + +import java.util.List; + +/** + * Kie response + * + * @author chengyouling + * @since 2024-10-30 + */ +public class KieResponse { + private List data; + + public List getData() { + return data; + } + + public void setData(List data) { + this.data = data; + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/utils/LabelGroupUtils.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/utils/LabelGroupUtils.java new file mode 100644 index 0000000000..daabdfd9b2 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/utils/LabelGroupUtils.java @@ -0,0 +1,170 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.integration.support.utils; + +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * group生成工具 + * + * @author zhouss + * @since 2021-11-23 + */ +public class LabelGroupUtils { + private static final String GROUP_SEPARATOR = "&"; + + private static final String KV_SEPARATOR = "="; + + /** + * 查询时使用的kv分隔符 + */ + private static final String LABEL_QUERY_SEPARATOR = ":"; + + /** + * 查询标签前缀 + */ + private static final String LABEL_PREFIX = "label="; + + /** + * 键值对长度 + */ + private static final int KV_LEN = 2; + + /** + * 默认组 + */ + private static final String DEFAULT_GROUP_KEY = "GROUP"; + + private LabelGroupUtils() { + } + + /** + * 创建标签组 + * + * @param labels 标签组 + * @return labelGroup 例如: app=sc&service=helloService + */ + public static String createLabelGroup(Map labels) { + if (labels == null || labels.isEmpty()) { + return ""; + } + final StringBuilder group = new StringBuilder(); + final List keys = new ArrayList<>(labels.keySet()); + + // 防止相同map因排序不同而导致最后的label不一致 + Collections.sort(keys); + for (String key : keys) { + String value = labels.get(key); + if (key == null || value == null) { + continue; + } + group.append(key).append(KV_SEPARATOR).append(value).append(GROUP_SEPARATOR); + } + if (group.length() == 0) { + return ""; + } + return group.deleteCharAt(group.length() - 1).toString(); + } + + /** + * 重组group, 防止因多个标签因顺序问题而导致group不同 + * + * @param group 标签组 + * @return group + */ + public static String rebuildGroup(String group) { + if (isLabelGroup(group)) { + return createLabelGroup(resolveGroupLabels(group)); + } + return LabelGroupUtils.createLabelGroup(Collections.singletonMap(DEFAULT_GROUP_KEY, group)); + } + + /** + * 是否为标签组key + * + * @param group 监听键 + * @return 是否为标签组 + */ + public static boolean isLabelGroup(String group) { + return group != null && group.contains(KV_SEPARATOR); + } + + /** + * 解析标签为map + * + * @param group 标签组 app=sc&service=helloService + * @return 标签键值对, 返回键值将会是有序的 + */ + public static Map resolveGroupLabels(String group) { + final Map result = new LinkedHashMap<>(); + if (group == null) { + return result; + } + String curGroup = group; + if (!isLabelGroup(curGroup)) { + // 如果非group标签(ZK配置中心场景适配),则为该group创建标签 + curGroup = LabelGroupUtils.createLabelGroup(Collections.singletonMap(DEFAULT_GROUP_KEY, curGroup)); + } + try { + final String decode = URLDecoder.decode(curGroup, "UTF-8"); + final String[] labels = decode.split("&"); + for (String label : labels) { + final String[] labelKv = label.split("="); + if (labelKv.length == KV_LEN) { + result.put(labelKv[0], labelKv[1]); + } else if (labelKv.length == 1) { + // 仅配置了KEY的情况, 使用空串代替 + result.put(labelKv[0], ""); + } + } + } catch (UnsupportedEncodingException ignored) { + // ignored + } + return result; + } + + /** + * 获取标签信息 + * + * @param group 分组 app=sc&service=helloService转换label=app:sc&label=service:helloService + * @return 标签组条件 + */ + public static String getLabelCondition(String group) { + if (group == null || "".equals(group)) { + return group; + } + String curGroup = rebuildGroup(group); + final Map labels = resolveGroupLabels(curGroup); + final StringBuilder finalGroup = new StringBuilder(); + for (Map.Entry entry : labels.entrySet()) { + finalGroup.append(LABEL_PREFIX) + .append(buildSingleLabel(entry.getKey(), entry.getValue())) + .append(GROUP_SEPARATOR); + } + return finalGroup.deleteCharAt(finalGroup.length() - 1).toString(); + } + + private static String buildSingleLabel(String key, String value) { + return key + LABEL_QUERY_SEPARATOR + value; + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/pom.xml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/pom.xml new file mode 100644 index 0000000000..98a7d47520 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/pom.xml @@ -0,0 +1,49 @@ + + + + mq-grayscale-rocketmq-test + io.sermant.integration + 1.0.0 + + 4.0.0 + + grayscale-rocketmq-producer-demo + 1.0.0 + + + 8 + 8 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.apache.rocketmq + rocketmq-client + + + org.apache.rocketmq + rocketmq-common + + + org.springframework.boot + spring-boot-starter-actuator + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/MqProducerController.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/MqProducerController.java new file mode 100644 index 0000000000..8e1a194e56 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/MqProducerController.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.producer; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.nio.charset.StandardCharsets; + +/** + * producer controller + * + * @author chengyouling + * @since 2024-10-30 + */ +@RestController +public class MqProducerController { + private static final Logger LOGGER = LoggerFactory.getLogger(MqProducerController.class); + + @Value("${rocketmq.address}") + private String mqAddress; + + @Value("${rocketmq.topic}") + private String mqTopic; + + private DefaultMQProducer defaultMqProducer; + + private final int sendMsgTimeout = 60000; + + /** + * producer message + * + * @param message message + * @return is success + */ + @GetMapping("/producerMessage") + public String producerMessage(@RequestParam("message") String message) { + try { + if (defaultMqProducer == null) { + defaultMqProducer = new DefaultMQProducer("default"); + defaultMqProducer.setNamesrvAddr(mqAddress); + defaultMqProducer.setSendMsgTimeout(sendMsgTimeout); + defaultMqProducer.start(); + } + Message mqMessage = new Message(mqTopic, message.getBytes(StandardCharsets.UTF_8)); + defaultMqProducer.send(mqMessage); + } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { + LOGGER.error("send message error, address={}, message={}", mqAddress, message, e); + return "error"; + } + return "success"; + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/RocketMqProducerApplication.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/RocketMqProducerApplication.java new file mode 100644 index 0000000000..4c9d33b13d --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/RocketMqProducerApplication.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.producer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * springboot starer + * + * @author chengyouling + * @since 2024-10-30 + **/ +@SpringBootApplication +public class RocketMqProducerApplication { + /** + * 启动类 + * + * @param args 进程启动入参 + */ + public static void main(String[] args) { + SpringApplication.run(RocketMqProducerApplication.class, args); + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/resources/application.yaml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/resources/application.yaml new file mode 100644 index 0000000000..5351a02721 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/resources/application.yaml @@ -0,0 +1,10 @@ +server: + port: 9030 +rocketmq: + address: 127.0.0.1:9876 + topic: MESSAGE-GRAY +management: + endpoints: + web: + exposure: + include: "*" diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/pom.xml b/sermant-integration-tests/mq-grayscale-rocketmq-test/pom.xml new file mode 100644 index 0000000000..6f304c1094 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/pom.xml @@ -0,0 +1,61 @@ + + + + org.springframework.boot + spring-boot-starter-parent + 2.7.15 + + + 4.0.0 + + io.sermant.integration + mq-grayscale-rocketmq-test + 1.0.0 + pom + + + 8 + 8 + 2.7.15 + 2021.0.9 + 5.0.0 + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + org.apache.rocketmq + rocketmq-client + ${rocketmq-client.version} + + + org.apache.rocketmq + rocketmq-common + ${rocketmq-client.version} + + + + + + grayscale-rocketmq-integration-test + grayscale-rocketmq-producer-demo + grayscale-rocketmq-consumer-demo + + + diff --git a/sermant-integration-tests/pom.xml b/sermant-integration-tests/pom.xml index 2883cd1e41..d426e3f621 100644 --- a/sermant-integration-tests/pom.xml +++ b/sermant-integration-tests/pom.xml @@ -21,5 +21,6 @@ mq-consume-prohibition-test database-write-prohibition-test xds-service-test + mq-grayscale-rocketmq-test