From 7f1ea19d3e3d78c385947f28b0223546d933883f Mon Sep 17 00:00:00 2001 From: chengyouling Date: Thu, 7 Nov 2024 10:58:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0rocketmq=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=E6=B5=81=E6=B0=B4=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: chengyouling --- .../common/mq-grayscale/rocketmq/action.yml | 59 ++ .../common/plugin-change-check/action.yml | 23 + .../mq-grayscale/rocketmq/action.yml | 86 +++ .../message_gray_integration_test.yml | 288 +++++++++ .../grayscale-rocketmq-consumer-demo/pom.xml | 49 ++ .../consumer/RocketMqConsumerApplication.java | 38 ++ .../consumer/RocketMqConsumerController.java | 126 ++++ .../consumer/RocketMqLitePullConsumer.java | 116 ++++ .../consumer/RocketMqMessageUtils.java | 73 +++ .../consumer/RocketMqPullConsumer.java | 153 +++++ .../consumer/RocketMqPushConsumer.java | 92 +++ .../src/main/resources/application.yaml | 10 + .../pom.xml | 40 ++ .../integration/GrayscaleRocketmqTest.java | 552 ++++++++++++++++++ .../integration/support/KieClient.java | 251 ++++++++ .../support/entity/KieConfigEntity.java | 94 +++ .../support/entity/KieResponse.java | 37 ++ .../support/utils/LabelGroupUtils.java | 170 ++++++ .../grayscale-rocketmq-producer-demo/pom.xml | 49 ++ .../producer/MqProducerController.java | 139 +++++ .../producer/RocketMqProducerApplication.java | 38 ++ .../src/main/resources/application.yaml | 10 + .../mq-grayscale-rocketmq-test/pom.xml | 61 ++ sermant-integration-tests/pom.xml | 1 + 24 files changed, 2555 insertions(+) create mode 100644 .github/actions/common/mq-grayscale/rocketmq/action.yml create mode 100644 .github/actions/scenarios/mq-grayscale/rocketmq/action.yml create mode 100644 .github/workflows/message_gray_integration_test.yml create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/pom.xml create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerApplication.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerController.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqLitePullConsumer.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqMessageUtils.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqPullConsumer.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqPushConsumer.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/resources/application.yaml create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/pom.xml create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/GrayscaleRocketmqTest.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/KieClient.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieConfigEntity.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieResponse.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/utils/LabelGroupUtils.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/pom.xml create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/MqProducerController.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/RocketMqProducerApplication.java create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/resources/application.yaml create mode 100644 sermant-integration-tests/mq-grayscale-rocketmq-test/pom.xml diff --git a/.github/actions/common/mq-grayscale/rocketmq/action.yml b/.github/actions/common/mq-grayscale/rocketmq/action.yml new file mode 100644 index 0000000000..122d83f558 --- /dev/null +++ b/.github/actions/common/mq-grayscale/rocketmq/action.yml @@ -0,0 +1,59 @@ +name: "Rocketmq Message Gray Common operations" +description: "Do something common for rocketmq message gray" +runs: + using: "composite" + steps: + - name: Set up JDK ${{ env.javaVersion }} + uses: actions/setup-java@v4 + with: + java-version: ${{ env.javaVersion }} + distribution: 'adopt' + cache: maven + - name: download agent + uses: actions/cache@v4 + with: + path: sermant-agent-*/ + key: ${{ runner.os }}-agent-${{ github.run_id }} + - name: get cse from cache + uses: actions/cache@v4 + with: + path: Local-CSE-2.1.3-linux-amd64.zip + key: ${{ runner.os }}-local-cse + restore-keys: | + ${{ runner.os }}-local-cse + - name: start cse + shell: bash + run: | + export ROOT_PATH=$(pwd) + bash ./sermant-integration-tests/scripts/startCse.sh + - name: get rocketmq from cache + uses: actions/cache@v4 + with: + path: rocketmq-all-5.1.4-bin-release.zip + key: ${{ runner.os }}-rocketmq-all-5.1.4-bin-release + restore-keys: | + ${{ runner.os }}-rocketmq-all-5.1.4-bin-release + - name: start rocketmq server + shell: bash + run: | + unzip rocketmq-all-5.1.4-bin-release.zip + sed -i 's/if \[\[ "$JAVA_MAJOR_VERSION" -lt "9" \]\]/if [ "$JAVA_MAJOR_VERSION" -lt "9" ]/g' rocketmq-all-5.1.4-bin-release/bin/runserver.sh + sed -i 's/-Xms4g -Xmx4g -Xmn2g/-Xms1g -Xmx1g -Xmn1g/g' rocketmq-all-5.1.4-bin-release/bin/runserver.sh + sed -i '22i enablePropertyFilter = true' rocketmq-all-5.1.4-bin-release/conf/broker.conf + nohup bash rocketmq-all-5.1.4-bin-release/bin/mqnamesrv & + - name: start rocketmq broker + shell: bash + run: | + sed -i 's/-Xms8g -Xmx8g -Xmn4g/-Xms1g -Xmx1g -Xmn1g/g' rocketmq-all-5.1.4-bin-release/bin/runbroker.sh + nohup bash rocketmq-all-5.1.4-bin-release/bin/mqbroker -n localhost:9876 -c rocketmq-all-5.1.4-bin-release/conf/broker.conf & + - name: cache dependencies + uses: actions/cache@v4 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-maven- + - name: entry + uses: ./.github/actions/common/entry + with: + log-dir: ./logs/rocketmq-grayscale/common diff --git a/.github/actions/common/plugin-change-check/action.yml b/.github/actions/common/plugin-change-check/action.yml index 39cd81b572..cf88e6842f 100644 --- a/.github/actions/common/plugin-change-check/action.yml +++ b/.github/actions/common/plugin-change-check/action.yml @@ -494,6 +494,22 @@ runs: shell: bash run: | echo "crossthreadTagTransmissionChanged=${{ steps.changed-tag-transmission-crossthread.outputs.changed }}" >> $GITHUB_ENV + - uses: ktamas77/has-changed-path@v1.0.3 + id: changed-mq-grayscale-rocketmq + with: + paths: sermant-plugins/sermant-mq-grayscale/mq-config-common + sermant-plugins/sermant-mq-grayscale/mq-config-service + sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test + ./.github/workflows/message_gray_integration_test.yml + ./.github/actions/common/mq-grayscale/rocketmq + ./.github/actions/scenarios/mq-grayscale/rocketmq + - name: env mq-grayscale-rocketmq + shell: bash + run: | + echo "mqGrayscaleRocketMqChanged=${{ steps.changed-mq-grayscale-rocketmq.outputs.changed }}" >> $GITHUB_ENV - uses: ktamas77/has-changed-path@v1.0.3 id: changed-dubbo-router-action with: @@ -924,3 +940,10 @@ runs: ${{ steps.changed-common-action.outputs.changed }} == 'true' -o ${{ env.triggerPushEvent }} == 'true' ];then echo "enableXdsServicAction=true" >> $GITHUB_ENV fi + + # ==========mq grayscale rocketmq is needed to test?========== + if [ ${{ env.mqGrayscaleRocketMqChanged }} == 'true' -o \ + ${{ env.sermantAgentCoreChanged }} == 'true' -o \ + ${{ steps.changed-common-action.outputs.changed }} == 'true' -o ${{ env.triggerPushEvent }} == 'true' ];then + echo "enableMqGrayscaleRocketMqAction=true" >> $GITHUB_ENV + fi diff --git a/.github/actions/scenarios/mq-grayscale/rocketmq/action.yml b/.github/actions/scenarios/mq-grayscale/rocketmq/action.yml new file mode 100644 index 0000000000..5d2aafcef7 --- /dev/null +++ b/.github/actions/scenarios/mq-grayscale/rocketmq/action.yml @@ -0,0 +1,86 @@ +name: "RocketMq Grayscale Test" +description: "Auto test for rocketMq grayscale" +runs: + using: "composite" + steps: + - name: package rocketmq grayscale tests + shell: bash + run: mvn package -Drocketmq-client.version=${{ matrix.rocketMqClientVersion }} -DskipTests --file sermant-integration-tests/mq-grayscale-rocketmq-test/pom.xml + - name: echo test model + shell: bash + run: | + echo "=======test-model======"-${{ matrix.test-model }} + - name: start base producer service + shell: bash + env: + dynamic.config.dynamicConfigType: KIE + dynamic.config.serverAddress: 127.0.0.1:30110 + SERVICE_META_ENVIRONMENT: development + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-producer -jar \ + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/target/grayscale-rocketmq-producer-demo.jar > ${{ env.logDir }}/${{ matrix.rocketMqClientVersion }}-rocketmq-producer-base.log 2>&1 & + - name: start gray producer service + shell: bash + env: + dynamic.config.dynamicConfigType: KIE + dynamic.config.serverAddress: 127.0.0.1:30110 + SERVICE_META_ENVIRONMENT: development + SERVICE_META_VERSION: 1.0.1 + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-producer -Dserver.port=9040 -jar \ + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/target/grayscale-rocketmq-producer-demo.jar > ${{ env.logDir }}/${{ matrix.rocketMqClientVersion }}-rocketmq-producer-gray.log 2>&1 & + - name: start base consumer service + shell: bash + env: + dynamic.config.dynamicConfigType: KIE + dynamic.config.serverAddress: 127.0.0.1:30110 + SERVICE_META_ENVIRONMENT: development + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-consumer -jar \ + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/target/grayscale-rocketmq-consumer-demo.jar > ${{ env.logDir }}/${{ matrix.rocketMqClientVersion }}-rocketmq-consumer-base.log 2>&1 & + - name: start gray consumer service + shell: bash + if: matrix.test-model == 'AUTO_BASE_GRAY' || matrix.test-model == 'AUTO_EXC_BASE_GRAY' || matrix.test-model == 'BASE_BASE_GRAY' || matrix.test-model == 'BASE_EXC_BASE_GRAY' + env: + dynamic.config.dynamicConfigType: KIE + dynamic.config.serverAddress: 127.0.0.1:30110 + SERVICE_META_ENVIRONMENT: development + SERVICE_META_VERSION: 1.0.1 + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-consumer -Dserver.port=9010 -jar \ + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/target/grayscale-rocketmq-consumer-demo.jar > ${{ env.logDir }}/${{ matrix.rocketMqClientVersion }}-rocketmq-consumer-gray.log 2>&1 & + - name: waiting for service start for only base + shell: bash + if: matrix.test-model != 'AUTO_BASE_GRAY' && matrix.test-model != 'AUTO_EXC_BASE_GRAY' && matrix.test-model != 'BASE_BASE_GRAY' && matrix.test-model != 'BASE_EXC_BASE_GRAY' + run: | + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9030/actuator/health 200 + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9040/actuator/health 200 + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9000/actuator/health 200 + - name: waiting for service start for base and gray + shell: bash + if: matrix.test-model == 'AUTO_BASE_GRAY' || matrix.test-model == 'AUTO_EXC_BASE_GRAY' || matrix.test-model == 'BASE_BASE_GRAY' || matrix.test-model == 'BASE_EXC_BASE_GRAY' + run: | + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9030/actuator/health 200 + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9040/actuator/health 200 + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9000/actuator/health 200 + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9010/actuator/health 200 + - name: test message grayscale rocketmq + shell: bash + run: | + mvn test -Dgrayscale.rocketmq.integration.test.type=${{ matrix.test-model }} --file \ + sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/pom.xml + - name: exit + if: always() + uses: ./.github/actions/common/exit + with: + processor-keyword: grayscale-rocketmq + - name: if failure then upload error log + uses: actions/upload-artifact@v3 + if: ${{ failure() || cancelled() }} + with: + name: (test-for-grayscale-rocketmq)-(${{ matrix.test-model }}})-logs + path: | + ./*.log + ./logs/**/*.log + if-no-files-found: warn + retention-days: 2 diff --git a/.github/workflows/message_gray_integration_test.yml b/.github/workflows/message_gray_integration_test.yml new file mode 100644 index 0000000000..38a855f969 --- /dev/null +++ b/.github/workflows/message_gray_integration_test.yml @@ -0,0 +1,288 @@ +name: Message gray integration test +env: + sermantVersion: 1.0.0 +on: + push: + pull_request: + branches: + - '*' + paths: + - 'sermant-agentcore/**' + - 'sermant-integration-tests/mq-grayscale-rocketmq-test/**' + - 'sermant-plugins/sermant-mq-grayscale/**' + - '.github/workflows/message_gray_integration_test.yml' + - '.github/actions/common/mq-grayscale/rocketmq/**' + - '.github/actions/scenarios/mq-grayscale/rocketmq/**' + - '.github/actions/common/plugin-change-check/action.yml' + - '.github/actions/common/entry/action.yml' + - '.github/actions/common/exit/action.yml' +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}-${{ github.head_ref }} + cancel-in-progress: true +jobs: + set-execution-conditions: + name: set-execution-conditions + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 100 + - name: plugin-change-check + id: plugin-change-check + uses: ./.github/actions/common/plugin-change-check + - name: set-outputs + id: set-outputs + run: | + echo "enableMqGrayscaleRocketMqAction=${{env.enableMqGrayscaleRocketMqAction}}" >> $GITHUB_OUTPUT + outputs: + enableMqGrayscaleRocketMqAction: ${{ steps.set-outputs.outputs.enableMqGrayscaleRocketMqAction }} + download-midwares-and-cache: + name: download midwares and cache + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: cache local cse + uses: actions/cache@v3 + with: + path: Local-CSE-2.1.3-linux-amd64.zip + key: ${{ runner.os }}-local-cse + restore-keys: | + ${{ runner.os }}-local-cse + - name: download cse + run: | + export ROOT_PATH=$(pwd) + bash ./sermant-integration-tests/scripts/tryDownloadMidware.sh cse + - name: cache rocketmq + uses: actions/cache@v3 + with: + path: rocketmq-all-5.1.4-bin-release.zip + key: ${{ runner.os }}-rocketmq-all-5.1.4-bin-release + restore-keys: | + ${{ runner.os }}-rocketmq-all-5.1.4-bin-release + - name: download rocketmq + run: | + export ROOT_PATH=$(pwd) + bash ./sermant-integration-tests/scripts/tryDownloadMidware.sh rocketmq514 + build-agent-and-cache: + name: build agent and cache + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up JDK 8 + uses: actions/setup-java@v3 + with: + java-version: '8' + distribution: 'adopt' + cache: maven + - name: cache agent + uses: actions/cache@v3 + with: + path: sermant-agent-*/ + key: ${{ runner.os }}-agent-${{ github.run_id }} + - name: package agent + run: | + sed -i '/sermant-backend/d' pom.xml + sed -i '/sermant-integration-tests/d' pom.xml + sed -i '/sermant-injector/d' pom.xml + sed -i '/sermant-flowcontrol/d' sermant-plugins/pom.xml + sed -i '/sermant-database-write-prohibition/d' sermant-plugins/pom.xml + sed -i '/sermant-spring-beans-deal/d' sermant-plugins/pom.xml + sed -i '/sermant-service-removal/d' sermant-plugins/pom.xml + sed -i '/sermant-service-visibility/d' sermant-plugins/pom.xml + sed -i '/sermant-monitor/d' sermant-plugins/pom.xml + sed -i '/sermant-mq-consume-prohibition/d' sermant-plugins/pom.xml + sed -i '/sermant-springboot-registry/d' sermant-plugins/pom.xml + sed -i '/sermant-service-registry/d' sermant-plugins/pom.xml + sed -i '/sermant-dynamic-config/d' sermant-plugins/pom.xml + sed -i '/sermant-router/d' sermant-plugins/pom.xml + sed -i '/sermant-loadbalancer/d' sermant-plugins/pom.xml + mvn package -DskipTests -Ptest --file pom.xml + test-for-plugin-false: + name: Test for grayscale rocketmq pull plugin false + runs-on: ubuntu-latest + if: needs.set-execution-conditions.outputs.enableMqGrayscaleRocketMqAction=='true' + needs: [set-execution-conditions, build-agent-and-cache, download-midwares-and-cache] + strategy: + matrix: + test-model: ['PLUGIN_ENABLED_FALSE'] + rocketMqClientVersion: ["4.8.0", "4.9.8", "5.1.4", "5.2.0", "5.3.1"] + fail-fast: false + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 100 + - name: set java version to environment + run: | + echo "javaVersion=8" >> $GITHUB_ENV + - name: common operations + uses: ./.github/actions/common/mq-grayscale/rocketmq + - name: message gray test for test-model=${{ matrix.test-model }} rocketMqClientVersion=${{ matrix.rocketMqClientVersion }} + uses: ./.github/actions/scenarios/mq-grayscale/rocketmq + test-for-auto-only-base: + name: Test for pull base only for auto rule + runs-on: ubuntu-latest + if: needs.set-execution-conditions.outputs.enableMqGrayscaleRocketMqAction=='true' + needs: [set-execution-conditions, build-agent-and-cache, download-midwares-and-cache, test-for-plugin-false] + strategy: + matrix: + test-model: ['AUTO_ONLY_BASE'] + rocketMqClientVersion: ["4.8.0", "4.9.8", "5.1.4", "5.2.0", "5.3.1"] + fail-fast: false + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 100 + - name: set java version to environment + run: | + echo "javaVersion=8" >> $GITHUB_ENV + - name: common operations + uses: ./.github/actions/common/mq-grayscale/rocketmq + - name: message gray test for test-model=${{ matrix.test-model }} rocketMqClientVersion=${{ matrix.rocketMqClientVersion }} + uses: ./.github/actions/scenarios/mq-grayscale/rocketmq + test-for-auto-exc-only-base: + name: Test for pull base only for auto exclude rule + runs-on: ubuntu-latest + if: needs.set-execution-conditions.outputs.enableMqGrayscaleRocketMqAction=='true' + needs: [set-execution-conditions, build-agent-and-cache, download-midwares-and-cache, test-for-auto-only-base] + strategy: + matrix: + test-model: ['AUTO_EXC_ONLY_BASE'] + rocketMqClientVersion: ["4.8.0", "4.9.8", "5.1.4", "5.2.0", "5.3.1"] + fail-fast: false + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 100 + - name: set java version to environment + run: | + echo "javaVersion=8" >> $GITHUB_ENV + - name: common operations + uses: ./.github/actions/common/mq-grayscale/rocketmq + - name: message gray test for test-model=${{ matrix.test-model }} rocketMqClientVersion=${{ matrix.rocketMqClientVersion }} + uses: ./.github/actions/scenarios/mq-grayscale/rocketmq + test-for-auto-base-gray: + name: Test for pull base gray for auto rule + runs-on: ubuntu-latest + if: needs.set-execution-conditions.outputs.enableMqGrayscaleRocketMqAction=='true' + needs: [set-execution-conditions, build-agent-and-cache, download-midwares-and-cache, test-for-auto-exc-only-base] + strategy: + matrix: + test-model: ['AUTO_BASE_GRAY'] + rocketMqClientVersion: ["4.8.0", "4.9.8", "5.1.4", "5.2.0", "5.3.1"] + fail-fast: false + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 100 + - name: set java version to environment + run: | + echo "javaVersion=8" >> $GITHUB_ENV + - name: common operations + uses: ./.github/actions/common/mq-grayscale/rocketmq + - name: message gray test for test-model=${{ matrix.test-model }} rocketMqClientVersion=${{ matrix.rocketMqClientVersion }} + uses: ./.github/actions/scenarios/mq-grayscale/rocketmq + test-for-auto-exc-base-gray: + name: Test for pull base gray for auto exclude rule + runs-on: ubuntu-latest + if: needs.set-execution-conditions.outputs.enableMqGrayscaleRocketMqAction=='true' + needs: [set-execution-conditions, build-agent-and-cache, download-midwares-and-cache, test-for-auto-base-gray] + strategy: + matrix: + test-model: ['AUTO_EXC_BASE_GRAY'] + rocketMqClientVersion: ["4.8.0", "4.9.8", "5.1.4", "5.2.0", "5.3.1"] + fail-fast: false + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 100 + - name: set java version to environment + run: | + echo "javaVersion=8" >> $GITHUB_ENV + - name: common operations + uses: ./.github/actions/common/mq-grayscale/rocketmq + - name: message gray test for test-model=${{ matrix.test-model }} rocketMqClientVersion=${{ matrix.rocketMqClientVersion }} + uses: ./.github/actions/scenarios/mq-grayscale/rocketmq + test-for-base-only-base: + name: Test for pull base only for base rule + runs-on: ubuntu-latest + if: needs.set-execution-conditions.outputs.enableMqGrayscaleRocketMqAction=='true' + needs: [set-execution-conditions, build-agent-and-cache, download-midwares-and-cache, test-for-auto-exc-base-gray] + strategy: + matrix: + test-model: ['BASE_ONLY_BASE'] + rocketMqClientVersion: ["4.8.0", "4.9.8", "5.1.4", "5.2.0", "5.3.1"] + fail-fast: false + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 100 + - name: set java version to environment + run: | + echo "javaVersion=8" >> $GITHUB_ENV + - name: common operations + uses: ./.github/actions/common/mq-grayscale/rocketmq + - name: message gray test for test-model=${{ matrix.test-model }} rocketMqClientVersion=${{ matrix.rocketMqClientVersion }} + uses: ./.github/actions/scenarios/mq-grayscale/rocketmq + test-for-base-exc-only-base: + name: Test for pull base only for base exclude rule + runs-on: ubuntu-latest + if: needs.set-execution-conditions.outputs.enableMqGrayscaleRocketMqAction=='true' + needs: [set-execution-conditions, build-agent-and-cache, download-midwares-and-cache, test-for-base-only-base] + strategy: + matrix: + test-model: ['BASE_EXC_ONLY_BASE'] + rocketMqClientVersion: ["4.8.0", "4.9.8", "5.1.4", "5.2.0", "5.3.1"] + fail-fast: false + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 100 + - name: set java version to environment + run: | + echo "javaVersion=8" >> $GITHUB_ENV + - name: common operations + uses: ./.github/actions/common/mq-grayscale/rocketmq + - name: message gray test for test-model=${{ matrix.test-model }} rocketMqClientVersion=${{ matrix.rocketMqClientVersion }} + uses: ./.github/actions/scenarios/mq-grayscale/rocketmq + test-for-base-base-gray: + name: Test for pull base gray for base rule + runs-on: ubuntu-latest + if: needs.set-execution-conditions.outputs.enableMqGrayscaleRocketMqAction=='true' + needs: [set-execution-conditions, build-agent-and-cache, download-midwares-and-cache, test-for-base-exc-only-base] + strategy: + matrix: + test-model: ['BASE_BASE_GRAY'] + rocketMqClientVersion: ["4.8.0", "4.9.8", "5.1.4", "5.2.0", "5.3.1"] + fail-fast: false + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 100 + - name: set java version to environment + run: | + echo "javaVersion=8" >> $GITHUB_ENV + - name: common operations + uses: ./.github/actions/common/mq-grayscale/rocketmq + - name: message gray test for test-model=${{ matrix.test-model }} rocketMqClientVersion=${{ matrix.rocketMqClientVersion }} + uses: ./.github/actions/scenarios/mq-grayscale/rocketmq + test-for-base-exc-base-gray: + name: Test for pull base gray for base exclude rule + runs-on: ubuntu-latest + if: needs.set-execution-conditions.outputs.enableMqGrayscaleRocketMqAction=='true' + needs: [set-execution-conditions, build-agent-and-cache, download-midwares-and-cache, test-for-base-base-gray] + strategy: + matrix: + test-model: ['BASE_EXC_BASE_GRAY'] + rocketMqClientVersion: ["4.8.0", "4.9.8", "5.1.4", "5.2.0", "5.3.1"] + fail-fast: false + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 100 + - name: set java version to environment + run: | + echo "javaVersion=8" >> $GITHUB_ENV + - name: common operations + uses: ./.github/actions/common/mq-grayscale/rocketmq + - name: message gray test for test-model=${{ matrix.test-model }} rocketMqClientVersion=${{ matrix.rocketMqClientVersion }} + uses: ./.github/actions/scenarios/mq-grayscale/rocketmq diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/pom.xml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/pom.xml new file mode 100644 index 0000000000..0e92b41e45 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/pom.xml @@ -0,0 +1,49 @@ + + + + mq-grayscale-rocketmq-test + io.sermant.integration + 1.0.0 + + 4.0.0 + + grayscale-rocketmq-consumer-demo + 1.0.0 + + + 8 + 8 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.apache.rocketmq + rocketmq-client + + + org.apache.rocketmq + rocketmq-common + + + org.springframework.boot + spring-boot-starter-actuator + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerApplication.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerApplication.java new file mode 100644 index 0000000000..db4b9ae9a9 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerApplication.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.consumer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * springboot starter + * + * @author chengyouling + * @since 2024-10-30 + **/ +@SpringBootApplication +public class RocketMqConsumerApplication { + /** + * main + * + * @param args parameters + */ + public static void main(String[] args) { + SpringApplication.run(RocketMqConsumerApplication.class, args); + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerController.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerController.java new file mode 100644 index 0000000000..35de7388b0 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerController.java @@ -0,0 +1,126 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.consumer; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Map; + +/** + * consumer controller + * + * @author chengyouling + * @since 2024-10-30 + **/ +@RestController +public class RocketMqConsumerController { + @Value("${rocketmq.address}") + private String nameServer; + + @Value("${rocketmq.topic}") + private String topic; + + @Autowired + private RocketMqPullConsumer pullConsumer; + + @Autowired + private RocketMqPushConsumer pushConsumer; + + @Autowired + private RocketMqLitePullConsumer litePullConsumer; + + /** + * clear cache count info + * + * @return is success + */ + @GetMapping("/clearMessageCount") + public String clearMessageCount() { + RocketMqMessageUtils.clearMessageCount(); + return "success"; + } + + /** + * init consumer + * + * @param consumerType consumerType + * @return init status + */ + @GetMapping("/initConsumer") + public String initConsumer(@RequestParam("consumerType") String consumerType) { + if ("PUSH".equals(consumerType)) { + pushConsumer.initPushConsumer(topic + "-PUSH", nameServer); + } else if ("PULL".equals(consumerType)) { + pullConsumer.initPullConsumer(nameServer, topic + "-PULL"); + } else { + litePullConsumer.initLitePullConsumer(topic + "-LITE-PULL", nameServer); + } + return "success"; + } + + /** + * shutdown consumer + * + * @param consumerType consumerType + * @return status + */ + @GetMapping("/shutdownConsumer") + public String shutdownConsumer(@RequestParam("consumerType") String consumerType) { + if ("PUSH".equals(consumerType)) { + pushConsumer.shutdownPushConsumer(); + } else if ("PULL".equals(consumerType)) { + pullConsumer.shutdownPullConsumer(); + } else { + litePullConsumer.shutdownLitePullConsumer(); + } + return "ok"; + } + + /** + * pull message + * + * @return message count + */ + @GetMapping("/getPullConsumerMessageCount") + public Map getPullConsumerMessageCount() { + return pullConsumer.getMessageCount(); + } + + /** + * get push consumer message count + * + * @return message count + */ + @GetMapping("/getPushConsumerMessageCount") + public Map getPushConsumerMessageCount() { + return pushConsumer.getMessageCount(topic + "-PUSH", nameServer); + } + + /** + * get lite pull consumer message count + * + * @return message count + */ + @GetMapping("/getLitePullConsumerMessageCount") + public Map getLitePullConsumerMessageCount() { + return litePullConsumer.getMessageCount(topic + "-LITE-PULL", nameServer); + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqLitePullConsumer.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqLitePullConsumer.java new file mode 100644 index 0000000000..f361fdc2b5 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqLitePullConsumer.java @@ -0,0 +1,116 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.consumer; + +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * litePull consumer + * + * @author chengyouling + * @since 2024-11-30 + **/ +@Component +public class RocketMqLitePullConsumer { + private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqLitePullConsumer.class); + + private DefaultLitePullConsumer litePullConsumer; + + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + private final long delay = 2L; + + private final long initialDelay = 0L; + + /** + * init lite pull consumer + * + * @param mqTopic topic + * @param mqAddress address + */ + public void initLitePullConsumer(String mqTopic, String mqAddress) { + if (litePullConsumer == null) { + try { + litePullConsumer = new DefaultLitePullConsumer("default"); + litePullConsumer.setNamesrvAddr(mqAddress); + litePullConsumer.subscribe(mqTopic, "*"); + litePullConsumer.start(); + executorService.scheduleWithFixedDelay(new LitePullRunnable(litePullConsumer), initialDelay, delay, + TimeUnit.SECONDS); + } catch (MQClientException e) { + LOGGER.error("init lite pull consumer error!", e); + } + } + } + + /** + * get lite pull message count + * + * @param mqTopic topic + * @param mqAddress address + * @return message count + */ + public Map getMessageCount(String mqTopic, String mqAddress) { + if (litePullConsumer == null) { + initLitePullConsumer(mqTopic, mqAddress); + } + return RocketMqMessageUtils.getMessageCount(); + } + + /** + * shutdown consumer + */ + public void shutdownLitePullConsumer() { + executorService.shutdown(); + if (litePullConsumer != null) { + litePullConsumer.shutdown(); + } + } + + /** + * lite pull consumer runnable + * + * @author chengyouling + * @since 2024-11-30 + **/ + static class LitePullRunnable implements Runnable { + private final DefaultLitePullConsumer litePullConsumer; + + private final long pullTimeout = 30000L; + + LitePullRunnable(DefaultLitePullConsumer litePullConsumer) { + this.litePullConsumer = litePullConsumer; + } + + @Override + public void run() { + List messageExts = litePullConsumer.poll(pullTimeout); + messageExts.forEach(RocketMqMessageUtils::convertMessageCount); + } + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqMessageUtils.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqMessageUtils.java new file mode 100644 index 0000000000..13bc301e54 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqMessageUtils.java @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.consumer; + +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.HashMap; +import java.util.Map; + +/** + * message count utils + * + * @author chengyouling + * @since 2024-10-30 + */ +public class RocketMqMessageUtils { + private static final String GRAY = "gray"; + + private static int grayMessageCount = 0; + + private static int baseMessageCount = 0; + + private RocketMqMessageUtils() { + } + + /** + * convert base/gray message + * + * @param messageExt MessageExt + */ + public static void convertMessageCount(MessageExt messageExt) { + if (messageExt.getProperties() != null && GRAY.equals(messageExt.getProperties().get( + "x_lane_canary"))) { + grayMessageCount++; + return; + } + baseMessageCount++; + } + + /** + * get base/gray message count + * + * @return message count + */ + public static Map getMessageCount() { + Map countMap = new HashMap<>(); + countMap.put("baseMessageCount", baseMessageCount); + countMap.put("grayMessageCount", grayMessageCount); + return countMap; + } + + /** + * clear cache count info + */ + public static void clearMessageCount() { + baseMessageCount = 0; + grayMessageCount = 0; + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqPullConsumer.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqPullConsumer.java new file mode 100644 index 0000000000..100a0ea483 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqPullConsumer.java @@ -0,0 +1,153 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.consumer; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.PullResultExt; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * pull consumer + * + * @author chengyouling + * @since 2024-11-30 + **/ +@Component +public class RocketMqPullConsumer { + private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqPullConsumer.class); + + private static final Map OFF_SET_TABLE = new HashMap<>(); + + private final int maxReconsumeTimes = 10000; + + private final long pullTimeout = 300000L; + + private final long delay = 2L; + + private final long initialDelay = 0L; + + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + private DefaultMQPullConsumer pullConsumer; + + /** + * init pull consumer + * + * @param mqAddress address + * @param topic topic + */ + public void initPullConsumer(String mqAddress, String topic) { + try { + if (pullConsumer == null) { + pullConsumer = new DefaultMQPullConsumer("default"); + pullConsumer.setNamesrvAddr(mqAddress); + pullConsumer.setMaxReconsumeTimes(maxReconsumeTimes); + pullConsumer.setConsumerPullTimeoutMillis(pullTimeout); + pullConsumer.start(); + executorService.scheduleWithFixedDelay(new PullRunnable(pullConsumer, topic), initialDelay, delay, + TimeUnit.SECONDS); + } + } catch (MQClientException e) { + LOGGER.error("init pull consumer error!", e); + } + } + + /** + * get pull message count + * + * @return message count + */ + public Map getMessageCount() { + return RocketMqMessageUtils.getMessageCount(); + } + + private static void putMessageQueueOffSet(MessageQueue mq, PullResultExt pullResult) { + OFF_SET_TABLE.put(mq, pullResult.getNextBeginOffset()); + } + + private static long getMessageQueueOffSet(MessageQueue mq) { + return OFF_SET_TABLE.getOrDefault(mq, 0L); + } + + /** + * shutdown consumer + */ + public void shutdownPullConsumer() { + executorService.shutdown(); + if (pullConsumer != null) { + pullConsumer.shutdown(); + } + } + + /** + * pull consumer runnable + * + * @author chengyouling + * @since 2024-11-30 + **/ + static class PullRunnable implements Runnable { + private final DefaultMQPullConsumer pullConsumer; + + private final String topic; + + private final int maxNums = 32; + + PullRunnable(DefaultMQPullConsumer pullConsumer, String topic) { + this.pullConsumer = pullConsumer; + this.topic = topic; + } + + @Override + public void run() { + try { + Set messageQueues = pullConsumer.fetchSubscribeMessageQueues(topic); + if (messageQueues.isEmpty()) { + return; + } + for (MessageQueue mq : messageQueues) { + PullResultExt pullResult = (PullResultExt) pullConsumer.pullBlockIfNotFound(mq, null, + getMessageQueueOffSet(mq), maxNums); + putMessageQueueOffSet(mq, pullResult); + if (pullResult.getPullStatus() == PullStatus.FOUND) { + List messageExts = pullResult.getMsgFoundList(); + for (MessageExt messageExt: messageExts) { + RocketMqMessageUtils.convertMessageCount(messageExt); + } + } + } + } catch (MQClientException | MQBrokerException | RemotingException | InterruptedException e) { + LOGGER.error("get pull message error!", e); + } + } + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqPushConsumer.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqPushConsumer.java new file mode 100644 index 0000000000..358f1bb463 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqPushConsumer.java @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.consumer; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * push consumer + * + * @author chengyouling + * @since 2024-11-30 + **/ +@Component +public class RocketMqPushConsumer { + private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqPushConsumer.class); + + private DefaultMQPushConsumer pushConsumer; + + private final long pushTimeout = 300000L; + + /** + * init push consumer + * + * @param mqTopic topic + * @param mqAddress address + */ + public void initPushConsumer(String mqTopic, String mqAddress) { + try { + if (pushConsumer == null) { + pushConsumer = new DefaultMQPushConsumer("default"); + pushConsumer.setNamesrvAddr(mqAddress); + pushConsumer.subscribe(mqTopic, "*"); + pushConsumer.setConsumeTimeout(pushTimeout); + pushConsumer.registerMessageListener((MessageListenerConcurrently)(messages, context) -> { + for (MessageExt messageExt : messages) { + RocketMqMessageUtils.convertMessageCount(messageExt); + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + pushConsumer.start(); + } + } catch (MQClientException e) { + LOGGER.error("init push consumer error!", e); + } + } + + /** + * get push message count + * + * @param mqTopic topic + * @param mqAddress address + * @return message count + */ + public Map getMessageCount(String mqTopic, String mqAddress) { + if (pushConsumer == null) { + initPushConsumer(mqTopic, mqAddress); + } + return RocketMqMessageUtils.getMessageCount(); + } + + /** + * shutdown consumer + */ + public void shutdownPushConsumer() { + if (pushConsumer != null) { + pushConsumer.shutdown(); + } + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/resources/application.yaml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/resources/application.yaml new file mode 100644 index 0000000000..2c81926171 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/resources/application.yaml @@ -0,0 +1,10 @@ +server: + port: 9000 +rocketmq: + address: 127.0.0.1:9876 + topic: MESSAGE-GRAY +management: + endpoints: + web: + exposure: + include: "*" diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/pom.xml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/pom.xml new file mode 100644 index 0000000000..17cca01e74 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/pom.xml @@ -0,0 +1,40 @@ + + + + mq-grayscale-rocketmq-test + io.sermant.integration + 1.0.0 + + 4.0.0 + + grayscale-rocketmq-integration-test + 1.0.0 + + + 8 + 8 + + + + + org.junit.jupiter + junit-jupiter + test + + + org.springframework + spring-web + 5.3.20 + test + + + com.alibaba + fastjson + 2.0.9 + test + + + + diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/GrayscaleRocketmqTest.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/GrayscaleRocketmqTest.java new file mode 100644 index 0000000000..c82ec13d83 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/GrayscaleRocketmqTest.java @@ -0,0 +1,552 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.integration; + +import io.sermant.demo.grayscale.rocketmq.integration.support.KieClient; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.springframework.web.client.RestTemplate; + +import java.util.concurrent.TimeUnit; + +import com.alibaba.fastjson.JSON; + +/** + * test method class + * + * @author chengyouling + * @since 2024-10-30 + */ +public class GrayscaleRocketmqTest { + private static final RestTemplate restTemplate = new RestTemplate(); + + private static final KieClient kieClient = new KieClient(restTemplate); + + private static final String CONFIG_KEY = "grayscale.mq.config"; + + private static final String CONSUMER_TYPE_PULL = "PULL"; + + private static final String CONSUMER_TYPE_LITE_PULL = "LITE-PULL"; + + private static final String CONSUMER_TYPE_PUSH = "PUSH"; + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "PLUGIN_ENABLED_FALSE") + public void testPluginEnabledFalseConsumeMessage() throws InterruptedException { + testPluginEnabledFalsePull(); + Thread.sleep(10000); + testPluginEnabledFalseLitePull(); + Thread.sleep(10000); + testPluginEnabledFalsePush(); + } + + private void testPluginEnabledFalsePull() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_PULL); + Thread.sleep(50000); + String baseResult = getPullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue(baseBaseCount == 2 && baseGrayCount == 0); + shutdownConsumer(false, CONSUMER_TYPE_PULL); + } + + private void testPluginEnabledFalseLitePull() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_LITE_PULL); + Thread.sleep(180000); + String baseResult = getLitePullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue(baseBaseCount == 2 && baseGrayCount == 0); + shutdownConsumer(false, CONSUMER_TYPE_LITE_PULL); + } + + private void testPluginEnabledFalsePush() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_PUSH); + Thread.sleep(180000); + String baseResult = getPushBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue(baseBaseCount == 2 && baseGrayCount == 0); + shutdownConsumer(false, CONSUMER_TYPE_PUSH); + } + + private int parseBaseMessageCount(String messageCount) { + return (int) JSON.parseObject(messageCount).get("baseMessageCount"); + } + + private int parseGrayMessageCount(String messageCount) { + return (int) JSON.parseObject(messageCount).get("grayMessageCount"); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "AUTO_ONLY_BASE") + public void testAutoOnlyBaseConsumeMessage() throws InterruptedException { + createGrayscaleConfig("AUTO", ""); + testAutoOnlyBasePull(); + Thread.sleep(10000); + testAutoOnlyBaseLitePull(); + Thread.sleep(10000); + testAutoOnlyBasePush(); + } + + private void testAutoOnlyBasePull() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_PULL); + Thread.sleep(50000); + String baseResult = getPullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 1); + shutdownConsumer(false, CONSUMER_TYPE_PULL); + } + + private void testAutoOnlyBaseLitePull() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_LITE_PULL); + Thread.sleep(180000); + String baseResult = getLitePullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 1); + shutdownConsumer(false, CONSUMER_TYPE_LITE_PULL); + } + + private void testAutoOnlyBasePush() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_PUSH); + Thread.sleep(180000); + String baseResult = getPushBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 1); + shutdownConsumer(false, CONSUMER_TYPE_PUSH); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "AUTO_EXC_ONLY_BASE") + public void testAutoExcOnlyBaseConsumeMessage() throws InterruptedException { + createGrayscaleConfig("AUTO", "gray"); + testAutoExcOnlyBasePull(); + Thread.sleep(10000); + testAutoExcOnlyBaseLitePull(); + Thread.sleep(10000); + testAutoExcOnlyBasePush(); + } + + private void testAutoExcOnlyBasePull() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_PULL); + Thread.sleep(50000); + String baseResult = getPullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 0); + shutdownConsumer(false, CONSUMER_TYPE_PULL); + } + + private void testAutoExcOnlyBaseLitePull() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_LITE_PULL); + Thread.sleep(180000); + String baseResult = getLitePullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 0); + shutdownConsumer(false, CONSUMER_TYPE_LITE_PULL); + } + + private void testAutoExcOnlyBasePush() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_PUSH); + Thread.sleep(180000); + String baseResult = getPushBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 0); + shutdownConsumer(false, CONSUMER_TYPE_PUSH); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "AUTO_BASE_GRAY") + public void testAutoBaseGrayConsumeMessage() throws InterruptedException { + createGrayscaleConfig("AUTO", ""); + testAutoBaseGrayPull(); + Thread.sleep(10000); + testAutoBaseGrayLitePull(); + Thread.sleep(10000); + testAutoBaseGrayPush(); + } + + private void testAutoBaseGrayPull() throws InterruptedException { + initAndProduceMessage(true, CONSUMER_TYPE_PULL); + Thread.sleep(50000); + String grayResult = getPullGrayResult(); + int grayGrayCount = parseGrayMessageCount(grayResult); + String baseResult = getPullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + Assertions.assertTrue(baseBaseCount == 1 && grayGrayCount == 1); + shutdownConsumer(true, CONSUMER_TYPE_PULL); + } + + private void testAutoBaseGrayLitePull() throws InterruptedException { + initAndProduceMessage(true, CONSUMER_TYPE_LITE_PULL); + Thread.sleep(180000); + String grayResult = getLitePullGrayResult(); + int grayBaseCount = parseBaseMessageCount(grayResult); + int grayGrayCount = parseGrayMessageCount(grayResult); + String baseResult = getLitePullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 0 && grayBaseCount == 0 && grayGrayCount == 1); + shutdownConsumer(true, CONSUMER_TYPE_LITE_PULL); + } + + private void testAutoBaseGrayPush() throws InterruptedException { + initAndProduceMessage(true, CONSUMER_TYPE_PUSH); + Thread.sleep(180000); + String baseResult = getPushBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + String grayResult = getPushGrayResult(); + int grayBaseCount = parseBaseMessageCount(grayResult); + int grayGrayCount = parseGrayMessageCount(grayResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 0 && grayBaseCount == 0 && grayGrayCount == 1); + shutdownConsumer(true, CONSUMER_TYPE_PUSH); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "AUTO_EXC_BASE_GRAY") + public void testAutoExcBaseGrayConsumeMessage() throws InterruptedException { + createGrayscaleConfig("AUTO", "gray"); + testAutoExcBaseGrayPull(); + Thread.sleep(10000); + testAutoExcBaseGrayLitePull(); + Thread.sleep(10000); + testAutoExcBaseGrayPush(); + } + + private void testAutoExcBaseGrayPull() throws InterruptedException { + initAndProduceMessage(true, CONSUMER_TYPE_PULL); + Thread.sleep(50000); + String grayResult = getPullGrayResult(); + int grayBaseCount = parseBaseMessageCount(grayResult); + int grayGrayCount = parseGrayMessageCount(grayResult); + String baseResult = getPullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 0 && grayBaseCount == 0 && grayGrayCount == 1); + shutdownConsumer(true, CONSUMER_TYPE_PULL); + } + + private void testAutoExcBaseGrayLitePull() throws InterruptedException { + initAndProduceMessage(true, CONSUMER_TYPE_LITE_PULL); + Thread.sleep(180000); + String baseResult = getLitePullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + String grayResult = getLitePullGrayResult(); + int grayBaseCount = parseBaseMessageCount(grayResult); + int grayGrayCount = parseGrayMessageCount(grayResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 0 && grayBaseCount == 0 && grayGrayCount == 1); + shutdownConsumer(true, CONSUMER_TYPE_LITE_PULL); + } + + private void testAutoExcBaseGrayPush() throws InterruptedException { + initAndProduceMessage(true, CONSUMER_TYPE_PUSH); + Thread.sleep(180000); + String baseResult = getPushBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + String grayResult = getPushGrayResult(); + int grayBaseCount = parseBaseMessageCount(grayResult); + int grayGrayCount = parseGrayMessageCount(grayResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 0 && grayBaseCount == 0 && grayGrayCount == 1); + shutdownConsumer(true, CONSUMER_TYPE_PUSH); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "BASE_ONLY_BASE") + public void testBaseOnlyBaseConsumeMessage() throws InterruptedException { + createGrayscaleConfig("BASE", ""); + testBaseOnlyBasePull(); + Thread.sleep(10000); + testBaseOnlyBaseLitePull(); + Thread.sleep(10000); + testBaseOnlyBasePush(); + } + + private void testBaseOnlyBasePull() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_PULL); + Thread.sleep(50000); + String baseResult = getPullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue( baseBaseCount == 1 && baseGrayCount == 1); + shutdownConsumer(false, CONSUMER_TYPE_PULL); + } + + private void testBaseOnlyBaseLitePull() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_LITE_PULL); + Thread.sleep(180000); + String baseResult = getLitePullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue( baseBaseCount == 1 && baseGrayCount == 1); + shutdownConsumer(false, CONSUMER_TYPE_LITE_PULL); + } + + private void testBaseOnlyBasePush() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_PUSH); + Thread.sleep(180000); + String baseResult = getPushBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue( baseBaseCount == 1 && baseGrayCount == 1); + shutdownConsumer(false, CONSUMER_TYPE_PUSH); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "BASE_EXC_ONLY_BASE") + public void testBaseExcOnlyBaseConsumeMessage() throws InterruptedException { + createGrayscaleConfig("BASE", "gray"); + testBaseExcOnlyBasePull(); + Thread.sleep(10000); + testBaseExcOnlyBaseLitePull(); + Thread.sleep(10000); + testBaseExcOnlyBasePush(); + } + + private void testBaseExcOnlyBasePull() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_PULL); + Thread.sleep(50000); + String baseResult = getPullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue( baseBaseCount == 1 && baseGrayCount == 0); + shutdownConsumer(false, CONSUMER_TYPE_PULL); + } + + private void testBaseExcOnlyBaseLitePull() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_LITE_PULL); + Thread.sleep(180000); + String baseResult = getLitePullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue( baseBaseCount == 1 && baseGrayCount == 0); + shutdownConsumer(false, CONSUMER_TYPE_LITE_PULL); + } + + private void testBaseExcOnlyBasePush() throws InterruptedException { + initAndProduceMessage(false, CONSUMER_TYPE_PUSH); + Thread.sleep(180000); + String baseResult = getPushBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + Assertions.assertTrue( baseBaseCount == 1 && baseGrayCount == 0); + shutdownConsumer(false, CONSUMER_TYPE_PUSH); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "BASE_BASE_GRAY") + public void testBaseBaseGrayConsumeMessage() throws InterruptedException { + createGrayscaleConfig("BASE", ""); + testBaseBaseGrayPull(); + Thread.sleep(10000); + testBaseBaseGrayLitePull(); + Thread.sleep(10000); + testBaseBaseGrayPush(); + } + + private void testBaseBaseGrayPull() throws InterruptedException { + initAndProduceMessage(true, CONSUMER_TYPE_PULL); + Thread.sleep(50000); + String baseResult = getPullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + String grayResult = getPullGrayResult(); + int grayGrayCount = parseGrayMessageCount(grayResult); + int grayBaseCount = parseBaseMessageCount(grayResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 1 && grayGrayCount == 1 && grayBaseCount == 0); + shutdownConsumer(true, CONSUMER_TYPE_PULL); + } + + private void testBaseBaseGrayLitePull() throws InterruptedException { + initAndProduceMessage(true, CONSUMER_TYPE_LITE_PULL); + Thread.sleep(180000); + String baseResult = getLitePullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + String grayResult = getLitePullGrayResult(); + int grayGrayCount = parseGrayMessageCount(grayResult); + int grayBaseCount = parseBaseMessageCount(grayResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 1 && grayGrayCount == 1 && grayBaseCount == 0); + shutdownConsumer(true, CONSUMER_TYPE_LITE_PULL); + } + + private void testBaseBaseGrayPush() throws InterruptedException { + initAndProduceMessage(true, CONSUMER_TYPE_PUSH); + Thread.sleep(180000); + String baseResult = getPushBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + String grayResult = getPushGrayResult(); + int grayGrayCount = parseGrayMessageCount(grayResult); + int grayBaseCount = parseBaseMessageCount(grayResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 1 && grayGrayCount == 1 && grayBaseCount == 0); + shutdownConsumer(true, CONSUMER_TYPE_PUSH); + } + + @Test + @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "BASE_EXC_BASE_GRAY") + public void testBaseExcBaseGrayConsumeMessage() throws InterruptedException { + createGrayscaleConfig("BASE", "gray"); + testBaseExcBaseGrayPull(); + Thread.sleep(10000); + testBaseExcBaseGrayLitePull(); + Thread.sleep(10000); + testBaseExcBaseGrayPush(); + } + + private void testBaseExcBaseGrayPull() throws InterruptedException { + initAndProduceMessage(true, CONSUMER_TYPE_PULL); + Thread.sleep(50000); + String baseResult = getPullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + String grayResult = getPullGrayResult(); + int grayBaseCount = parseBaseMessageCount(grayResult); + int grayGrayCount = parseGrayMessageCount(grayResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 0 && grayBaseCount == 0 && grayGrayCount == 1); + shutdownConsumer(true, CONSUMER_TYPE_PULL); + } + + private void testBaseExcBaseGrayLitePull() throws InterruptedException { + initAndProduceMessage(true, CONSUMER_TYPE_LITE_PULL); + Thread.sleep(180000); + String baseResult = getLitePullBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + String grayResult = getLitePullGrayResult(); + int grayBaseCount = parseBaseMessageCount(grayResult); + int grayGrayCount = parseGrayMessageCount(grayResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 0 && grayBaseCount == 0 && grayGrayCount == 1); + shutdownConsumer(true, CONSUMER_TYPE_LITE_PULL); + } + + private void testBaseExcBaseGrayPush() throws InterruptedException { + initAndProduceMessage(true, CONSUMER_TYPE_PUSH); + Thread.sleep(180000); + String baseResult = getPushBaseResult(); + int baseBaseCount = parseBaseMessageCount(baseResult); + int baseGrayCount = parseGrayMessageCount(baseResult); + String grayResult = getPushGrayResult(); + int grayBaseCount = parseBaseMessageCount(grayResult); + int grayGrayCount = parseGrayMessageCount(grayResult); + Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 0 && grayBaseCount == 0 && grayGrayCount == 1); + shutdownConsumer(true, CONSUMER_TYPE_PUSH); + } + + private void initAndProduceMessage(boolean isGrayInstanceInit, String consumerType) + throws InterruptedException { + clearCacheCount(isGrayInstanceInit); + initProducer(); + produceMessage(consumerType); + if (isGrayInstanceInit) { + // Trigger start gray consumer. + restTemplate.getForObject("http://127.0.0.1:9010/initConsumer?consumerType={1}", String.class, + consumerType); + Thread.sleep(50000); + } + + // Trigger start base consumer. + restTemplate.getForObject("http://127.0.0.1:9000/initConsumer?consumerType={1}", String.class, consumerType); + } + + private void shutdownConsumer(boolean isGrayInstanceInit, String consumerType) { + if (isGrayInstanceInit) { + restTemplate.getForObject("http://127.0.0.1:9010/shutdownConsumer?consumerType={1}", String.class, + consumerType); + } + restTemplate.getForObject("http://127.0.0.1:9000/shutdownConsumer?consumerType={1}", String.class, + consumerType); + } + + private String getPullBaseResult() { + return restTemplate.getForObject("http://127.0.0.1:9000/getPullConsumerMessageCount", String.class); + } + + private String getPushBaseResult() { + return restTemplate.getForObject("http://127.0.0.1:9000/getPushConsumerMessageCount", String.class); + } + + private String getLitePullBaseResult() { + return restTemplate.getForObject("http://127.0.0.1:9000/getLitePullConsumerMessageCount", String.class); + } + + private String getPullGrayResult() { + return restTemplate.getForObject("http://127.0.0.1:9010/getPullConsumerMessageCount", String.class); + } + + private String getPushGrayResult() { + return restTemplate.getForObject("http://127.0.0.1:9010/getPushConsumerMessageCount", String.class); + } + + private String getLitePullGrayResult() { + return restTemplate.getForObject("http://127.0.0.1:9010/getLitePullConsumerMessageCount", String.class); + } + + private void initProducer() { + restTemplate.getForObject("http://127.0.0.1:9030/initProducer", String.class); + restTemplate.getForObject("http://127.0.0.1:9040/initProducer", String.class); + } + + private void produceMessage(String consumerType) { + String path = consumerType.equals(CONSUMER_TYPE_PULL) ? "producePullMessage" : + consumerType.equals(CONSUMER_TYPE_LITE_PULL) ? "produceLitePullMessage" : "producePushMessage"; + restTemplate.getForObject("http://127.0.0.1:9030/" + path + "?message={1}", String.class, "message"); + restTemplate.getForObject("http://127.0.0.1:9040/" + path + "?message={1}", String.class, "gray-message"); + } + + private void clearCacheCount(boolean isGrayInstanceInit) { + if (isGrayInstanceInit) { + restTemplate.getForObject("http://127.0.0.1:9010/clearMessageCount", String.class); + } + restTemplate.getForObject("http://127.0.0.1:9000/clearMessageCount", String.class); + } + + private void createGrayscaleConfig(String consumeMode, String excludeTag) { + String CONTENT = "enabled: true\n" + + "grayscale:\n" + + " - consumerGroupTag: gray\n" + + " serviceMeta:\n" + + " version: 1.0.1\n" + + " trafficTag:\n" + + " x_lane_canary: gray\n" + + "base:\n" + + " autoCheckDelayTime: 5\n" + + " consumeMode: " + consumeMode + "\n" + + " excludeGroupTags: [\""+ excludeTag + "\"]\n"; + Assertions.assertTrue(kieClient.publishConfig(CONFIG_KEY, CONTENT)); + } + + @AfterEach + public void deleteGrayscaleConfig() { + try { + kieClient.deleteKey(CONFIG_KEY); + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/KieClient.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/KieClient.java new file mode 100644 index 0000000000..76ea118248 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/KieClient.java @@ -0,0 +1,251 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.integration.support; + +import com.alibaba.fastjson.JSONObject; + +import io.sermant.demo.grayscale.rocketmq.integration.support.entity.KieConfigEntity; +import io.sermant.demo.grayscale.rocketmq.integration.support.entity.KieResponse; +import io.sermant.demo.grayscale.rocketmq.integration.support.utils.LabelGroupUtils; + +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.HttpClientErrorException; +import org.springframework.web.client.RestTemplate; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Kie response client + * + * @author chengyouling + * @since 2024-10-30 + */ +public class KieClient { + private final RestTemplate restTemplate; + + private final String url; + + private final Map labels; + + /** + * 构造函数 + * + * @param restTemplate 请求器 + */ + public KieClient(RestTemplate restTemplate) { + this(restTemplate, null); + } + + /** + * 构造函数 + * + * @param restTemplate 请求器 + * @param url 地址 + */ + public KieClient(RestTemplate restTemplate, String url) { + this(restTemplate, url, null); + } + + /** + * 构造函数 + * + * @param restTemplate 请求器 + * @param url 地址 + * @param labels 标签 + */ + public KieClient(RestTemplate restTemplate, String url, Map labels) { + this.restTemplate = restTemplate; + this.url = url == null ? "http://127.0.0.1:30110/v1/default/kie/kv" : url; + this.labels = labels; + } + + /** + * 发布配置 + * + * @param key 键 + * @param value 值 + * @return 是否发布成功 + */ + public boolean publishConfig(String key, String value) { + final KieConfigEntity configEntity = queryTargetKeyId(key); + if (configEntity != null) { + // 更新操作 + return updateKey(configEntity, value); + } else { + // 新增操作 + return addKey(key, value); + } + } + + /** + * 更新配置 + * + * @param configEntity 查询的config信息 + * @param value 新的配置内容 + * @return 是否更新成功 + */ + public boolean updateKey(KieConfigEntity configEntity, String value) { + HttpHeaders headers = new HttpHeaders(); + MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8"); + headers.setContentType(type); + headers.add("Accept", MediaType.APPLICATION_JSON.toString()); + + HttpEntity entity = new HttpEntity<>(JSONObject.toJSONString(new KieRequest(configEntity.getKey(), + value, labels)), + headers); + String address = this.url + "/" + configEntity.getId(); + restTemplate.put(address, entity); + return true; + } + + /** + * 根据key删除配置 + * + * @param key 键 + * @return 是否删除成功 + */ + public boolean deleteKey(String key) { + final KieConfigEntity configEntity = this.queryTargetKeyId(key); + if (configEntity == null) { + return false; + } + String address = this.url + "/" + configEntity.getId(); + restTemplate.delete(address); + return true; + } + + /** + * 添加新配置 + * + * @param key 键 + * @param value 值 + * @return 是否添加成功 + */ + public boolean addKey(String key, String value) { + HttpHeaders headers = new HttpHeaders(); + MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8"); + headers.setContentType(type); + headers.add("Accept", MediaType.APPLICATION_JSON.toString()); + + HttpEntity entity = new HttpEntity<>(JSONObject.toJSONString(new KieRequest(key, value, labels)), headers); + + try { + ResponseEntity responseEntity = restTemplate.postForEntity(url, entity, String.class); + return responseEntity.getStatusCode() == HttpStatus.OK; + } catch (HttpClientErrorException ex) { + // ignored + } + return false; + } + + private KieConfigEntity queryTargetKeyId(String key) { + final List query = query(null); + return query.stream().filter(kieConfigEntity -> kieConfigEntity.getKey().equals(key)) + .findFirst().orElse(null); + } + + /** + * 根据标签查询kie配置 + * + * @param labels 新增标签 + * @return kie响应 + */ + public List query(Map labels) { + Map curLabels = labels; + if (labels == null) { + curLabels = this.labels; + } + if (curLabels == null) { + curLabels = new HashMap<>(); + curLabels.put("app", "default"); + curLabels.put("environment", "development"); + } + final String labelGroup = LabelGroupUtils.createLabelGroup(curLabels); + final String labelCondition = LabelGroupUtils.getLabelCondition(labelGroup); + String address = this.url + "?" + labelCondition + "&match=exact&revision="; + final ResponseEntity configs = restTemplate.getForEntity(address, String.class); + JSONObject result = JSONObject.parseObject(configs.getBody()); + if (configs.getStatusCode().value() == 200) { + final KieResponse kieResponse = result.toJavaObject(KieResponse.class); + return kieResponse.getData(); + } + return Collections.emptyList(); + } + + /** + * kie请求 + * + * @since 2022-07-12 + */ + static class KieRequest implements Serializable { + private String key; + private String value; + private Map labels; + private String status = "enabled"; + + public KieRequest(String key, String value, Map labels) { + this.key = key; + this.value = value; + this.labels = labels; + if (this.labels == null) { + this.labels = new HashMap<>(); + this.labels.put("app", "default"); + this.labels.put("environment", "development"); + } + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public Map getLabels() { + return labels; + } + + public void setLabels(Map labels) { + this.labels = labels; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieConfigEntity.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieConfigEntity.java new file mode 100644 index 0000000000..d5035f3df2 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieConfigEntity.java @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.sermant.demo.grayscale.rocketmq.integration.support.entity; + +import java.util.HashMap; +import java.util.Map; + +/** + * Kie response entity + * + * @author chengyouling + * @since 2024-10-30 + */ +public class KieConfigEntity { + private String id; + private String key; + private Map labels = new HashMap(); + private String value; + private String valueType; + private String status; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public Map getLabels() { + return labels; + } + + public void setLabels(Map labels) { + this.labels = labels; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public String getValueType() { + return valueType; + } + + public void setValueType(String valueType) { + this.valueType = valueType; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + @Override + public String toString() { + return "KieConfigEntity{" + + "id='" + id + '\'' + + ", key='" + key + '\'' + + ", labels=" + labels + + ", value='" + value + '\'' + + ", valueType='" + valueType + '\'' + + ", status='" + status + '\'' + + '}'; + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieResponse.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieResponse.java new file mode 100644 index 0000000000..47f437c15c --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieResponse.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.integration.support.entity; + +import java.util.List; + +/** + * Kie response + * + * @author chengyouling + * @since 2024-10-30 + */ +public class KieResponse { + private List data; + + public List getData() { + return data; + } + + public void setData(List data) { + this.data = data; + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/utils/LabelGroupUtils.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/utils/LabelGroupUtils.java new file mode 100644 index 0000000000..daabdfd9b2 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/utils/LabelGroupUtils.java @@ -0,0 +1,170 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.integration.support.utils; + +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * group生成工具 + * + * @author zhouss + * @since 2021-11-23 + */ +public class LabelGroupUtils { + private static final String GROUP_SEPARATOR = "&"; + + private static final String KV_SEPARATOR = "="; + + /** + * 查询时使用的kv分隔符 + */ + private static final String LABEL_QUERY_SEPARATOR = ":"; + + /** + * 查询标签前缀 + */ + private static final String LABEL_PREFIX = "label="; + + /** + * 键值对长度 + */ + private static final int KV_LEN = 2; + + /** + * 默认组 + */ + private static final String DEFAULT_GROUP_KEY = "GROUP"; + + private LabelGroupUtils() { + } + + /** + * 创建标签组 + * + * @param labels 标签组 + * @return labelGroup 例如: app=sc&service=helloService + */ + public static String createLabelGroup(Map labels) { + if (labels == null || labels.isEmpty()) { + return ""; + } + final StringBuilder group = new StringBuilder(); + final List keys = new ArrayList<>(labels.keySet()); + + // 防止相同map因排序不同而导致最后的label不一致 + Collections.sort(keys); + for (String key : keys) { + String value = labels.get(key); + if (key == null || value == null) { + continue; + } + group.append(key).append(KV_SEPARATOR).append(value).append(GROUP_SEPARATOR); + } + if (group.length() == 0) { + return ""; + } + return group.deleteCharAt(group.length() - 1).toString(); + } + + /** + * 重组group, 防止因多个标签因顺序问题而导致group不同 + * + * @param group 标签组 + * @return group + */ + public static String rebuildGroup(String group) { + if (isLabelGroup(group)) { + return createLabelGroup(resolveGroupLabels(group)); + } + return LabelGroupUtils.createLabelGroup(Collections.singletonMap(DEFAULT_GROUP_KEY, group)); + } + + /** + * 是否为标签组key + * + * @param group 监听键 + * @return 是否为标签组 + */ + public static boolean isLabelGroup(String group) { + return group != null && group.contains(KV_SEPARATOR); + } + + /** + * 解析标签为map + * + * @param group 标签组 app=sc&service=helloService + * @return 标签键值对, 返回键值将会是有序的 + */ + public static Map resolveGroupLabels(String group) { + final Map result = new LinkedHashMap<>(); + if (group == null) { + return result; + } + String curGroup = group; + if (!isLabelGroup(curGroup)) { + // 如果非group标签(ZK配置中心场景适配),则为该group创建标签 + curGroup = LabelGroupUtils.createLabelGroup(Collections.singletonMap(DEFAULT_GROUP_KEY, curGroup)); + } + try { + final String decode = URLDecoder.decode(curGroup, "UTF-8"); + final String[] labels = decode.split("&"); + for (String label : labels) { + final String[] labelKv = label.split("="); + if (labelKv.length == KV_LEN) { + result.put(labelKv[0], labelKv[1]); + } else if (labelKv.length == 1) { + // 仅配置了KEY的情况, 使用空串代替 + result.put(labelKv[0], ""); + } + } + } catch (UnsupportedEncodingException ignored) { + // ignored + } + return result; + } + + /** + * 获取标签信息 + * + * @param group 分组 app=sc&service=helloService转换label=app:sc&label=service:helloService + * @return 标签组条件 + */ + public static String getLabelCondition(String group) { + if (group == null || "".equals(group)) { + return group; + } + String curGroup = rebuildGroup(group); + final Map labels = resolveGroupLabels(curGroup); + final StringBuilder finalGroup = new StringBuilder(); + for (Map.Entry entry : labels.entrySet()) { + finalGroup.append(LABEL_PREFIX) + .append(buildSingleLabel(entry.getKey(), entry.getValue())) + .append(GROUP_SEPARATOR); + } + return finalGroup.deleteCharAt(finalGroup.length() - 1).toString(); + } + + private static String buildSingleLabel(String key, String value) { + return key + LABEL_QUERY_SEPARATOR + value; + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/pom.xml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/pom.xml new file mode 100644 index 0000000000..98a7d47520 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/pom.xml @@ -0,0 +1,49 @@ + + + + mq-grayscale-rocketmq-test + io.sermant.integration + 1.0.0 + + 4.0.0 + + grayscale-rocketmq-producer-demo + 1.0.0 + + + 8 + 8 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.apache.rocketmq + rocketmq-client + + + org.apache.rocketmq + rocketmq-common + + + org.springframework.boot + spring-boot-starter-actuator + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/MqProducerController.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/MqProducerController.java new file mode 100644 index 0000000000..d355fa90e2 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/MqProducerController.java @@ -0,0 +1,139 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.producer; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.nio.charset.StandardCharsets; + +/** + * producer controller + * + * @author chengyouling + * @since 2024-10-30 + */ +@RestController +public class MqProducerController { + private static final Logger LOGGER = LoggerFactory.getLogger(MqProducerController.class); + + @Value("${rocketmq.address}") + private String mqAddress; + + @Value("${rocketmq.topic}") + private String mqTopic; + + private DefaultMQProducer producer; + + private final String errorMessage = "error"; + + private final String successMessage = "success"; + + private final int sendMsgTimeout = 60000; + + /** + * init push producer + * + * @return is success + */ + @GetMapping("/initProducer") + public String initProducer() { + try { + if (producer == null) { + producer = new DefaultMQProducer("default"); + producer.setNamesrvAddr(mqAddress); + producer.setSendMsgTimeout(sendMsgTimeout); + producer.start(); + } + } catch (MQClientException e) { + LOGGER.error("init pull producer error!", e); + } + return successMessage; + } + + /** + * pull producer produce message + * + * @param message message + * @return is success + */ + @GetMapping("/producePullMessage") + public String producePullMessage(@RequestParam("message") String message) { + try { + if (producer == null) { + initProducer(); + } + Message mqMessage = new Message(mqTopic + "-PULL", message.getBytes(StandardCharsets.UTF_8)); + producer.send(mqMessage); + } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { + LOGGER.error("send pull message error, address={}, message={}", mqAddress, message, e); + return errorMessage; + } + return successMessage; + } + + /** + * lite pull producer produce message + * + * @param message message + * @return is success + */ + @GetMapping("/produceLitePullMessage") + public String produceLitePullMessage(@RequestParam("message") String message) { + try { + if (producer == null) { + initProducer(); + } + Message mqMessage = new Message(mqTopic + "-LITE-PULL", message.getBytes(StandardCharsets.UTF_8)); + producer.send(mqMessage); + } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { + LOGGER.error("send lite pull message error, address={}, message={}", mqAddress, message, e); + return errorMessage; + } + return successMessage; + } + + /** + * push producer produce message + * + * @param message message + * @return is success + */ + @GetMapping("/producePushMessage") + public String producePushMessage(@RequestParam("message") String message) { + try { + if (producer == null) { + initProducer(); + } + Message mqMessage = new Message(mqTopic + "-PUSH", message.getBytes(StandardCharsets.UTF_8)); + producer.send(mqMessage); + } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { + LOGGER.error("send push message error, address={}, message={}", mqAddress, message, e); + return errorMessage; + } + return successMessage; + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/RocketMqProducerApplication.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/RocketMqProducerApplication.java new file mode 100644 index 0000000000..4c9d33b13d --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/RocketMqProducerApplication.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.demo.grayscale.rocketmq.producer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * springboot starer + * + * @author chengyouling + * @since 2024-10-30 + **/ +@SpringBootApplication +public class RocketMqProducerApplication { + /** + * 启动类 + * + * @param args 进程启动入参 + */ + public static void main(String[] args) { + SpringApplication.run(RocketMqProducerApplication.class, args); + } +} diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/resources/application.yaml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/resources/application.yaml new file mode 100644 index 0000000000..5351a02721 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/resources/application.yaml @@ -0,0 +1,10 @@ +server: + port: 9030 +rocketmq: + address: 127.0.0.1:9876 + topic: MESSAGE-GRAY +management: + endpoints: + web: + exposure: + include: "*" diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/pom.xml b/sermant-integration-tests/mq-grayscale-rocketmq-test/pom.xml new file mode 100644 index 0000000000..6f304c1094 --- /dev/null +++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/pom.xml @@ -0,0 +1,61 @@ + + + + org.springframework.boot + spring-boot-starter-parent + 2.7.15 + + + 4.0.0 + + io.sermant.integration + mq-grayscale-rocketmq-test + 1.0.0 + pom + + + 8 + 8 + 2.7.15 + 2021.0.9 + 5.0.0 + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + org.apache.rocketmq + rocketmq-client + ${rocketmq-client.version} + + + org.apache.rocketmq + rocketmq-common + ${rocketmq-client.version} + + + + + + grayscale-rocketmq-integration-test + grayscale-rocketmq-producer-demo + grayscale-rocketmq-consumer-demo + + + diff --git a/sermant-integration-tests/pom.xml b/sermant-integration-tests/pom.xml index 2883cd1e41..d426e3f621 100644 --- a/sermant-integration-tests/pom.xml +++ b/sermant-integration-tests/pom.xml @@ -21,5 +21,6 @@ mq-consume-prohibition-test database-write-prohibition-test xds-service-test + mq-grayscale-rocketmq-test