diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 7c07d3dab6d..8f720d95039 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -284,7 +284,7 @@ jobs: max_attempts: 3 retry_on: error command: | - ./mvnw -B install -DskipTests -D"maven.test.skip"=true -D"maven.javadoc.skip"=true -D"license.skipAddThirdParty" + ./mvnw -B install -DskipTests -D"maven.test.skip"=true -D"maven.javadoc.skip"=true -D"license.skipAddThirdParty" -D"skip.ui"=true - name: Check Dependencies Licenses run: tools/dependencies/checkLicense.sh @@ -326,22 +326,18 @@ jobs: steps: - name: Checkout PR uses: actions/checkout@v3 - - uses: actions/setup-node@v2 with: node-version: 20.x - - name: Install Dependencies and Check Code Style run: | cd seatunnel-engine/seatunnel-engine-ui/ npm install npm run lint - - name: Run unit tests run: | cd seatunnel-engine/seatunnel-engine-ui/ npm run test:unit - - name: Build SeaTunnel UI run: | cd seatunnel-engine/seatunnel-engine-ui/ @@ -366,7 +362,7 @@ jobs: cache: 'maven' - name: run all modules unit test run: | - ./mvnw -B -T 1 clean verify -DskipUT=false -DskipIT=true -D"license.skipAddThirdParty"=true --no-snapshot-updates + ./mvnw -B -T 1 clean verify -DskipUT=false -DskipIT=true -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates env: MAVEN_OPTS: -Xmx4096m @@ -393,8 +389,8 @@ jobs: run: | sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 0` if [ ! -z $sub_modules ]; then - echo $sub_modules - ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci + echo $sub_modules + ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $sub_modules -am -Pci else echo "sub modules is empty, skipping" fi @@ -425,7 +421,7 @@ jobs: sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 1` if [ ! -z $sub_modules ]; then echo $sub_modules - ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci + ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $sub_modules -am -Pci else echo "sub modules is empty, skipping" fi @@ -456,7 +452,7 @@ jobs: sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 2` if [ ! -z $sub_modules ]; then echo $sub_modules - ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci + ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $sub_modules -am -Pci else echo "sub modules is empty, skipping" fi @@ -487,7 +483,7 @@ jobs: sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 3` if [ ! -z $sub_modules ]; then echo $sub_modules - ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci + ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $sub_modules -am -Pci else echo "sub modules is empty, skipping" fi @@ -517,7 +513,7 @@ jobs: sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 4` if [ ! -z $sub_modules ]; then echo $sub_modules - ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci + ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $sub_modules -am -Pci else echo "sub modules is empty, skipping" fi @@ -547,7 +543,7 @@ jobs: sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 5` if [ ! -z $sub_modules ]; then echo $sub_modules - ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci + ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $sub_modules -am -Pci else echo "sub modules is empty, skipping" fi @@ -577,7 +573,7 @@ jobs: sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 6` if [ ! -z $sub_modules ]; then echo $sub_modules - ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci + ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $sub_modules -am -Pci else echo "sub modules is empty, skipping" fi @@ -608,7 +604,7 @@ jobs: sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 7` if [ ! -z $sub_modules ]; then echo $sub_modules - ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci + ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $sub_modules -am -Pci else echo "sub modules is empty, skipping" fi @@ -639,6 +635,7 @@ jobs: ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-seatunnel-e2e-base,:connector-console-seatunnel-e2e -am -Pci env: MAVEN_OPTS: -Xmx4096m + engine-k8s-it: needs: [ changes, sanity-check ] if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'seatunnel-engine-k8s-e2e') @@ -668,10 +665,11 @@ jobs: cache: 'maven' - name: run seatunnel zeta on k8s test run: | - ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :seatunnel-engine-k8s-e2e -am -Pci + ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :seatunnel-engine-k8s-e2e -am -Pci env: MAVEN_OPTS: -Xmx4096m KUBECONFIG: /etc/rancher/k3s/k3s.yaml + transform-v2-it-part-1: needs: [ changes, sanity-check ] if: needs.changes.outputs.api == 'true' || needs.changes.outputs.engine == 'true' @@ -696,7 +694,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run transform-v2 integration test (part-1) run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :seatunnel-transforms-v2-e2e-part-1 -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :seatunnel-transforms-v2-e2e-part-1 -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -724,7 +722,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run transform-v2 integration test (part-2) run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :seatunnel-transforms-v2-e2e-part-2 -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :seatunnel-transforms-v2-e2e-part-2 -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -755,7 +753,7 @@ jobs: ./mvnw help:evaluate -Dexpression=project.modules -q -DforceStdout -pl :seatunnel-connector-v2-e2e >> /tmp/sub_module.txt sub_modules=`python tools/update_modules_check/update_modules_check.py sub /tmp/sub_module.txt` run_it_modules=`python tools/update_modules_check/update_modules_check.py sub_it_module "$sub_modules" 7 0` - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $run_it_modules -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $run_it_modules -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -786,7 +784,7 @@ jobs: ./mvnw help:evaluate -Dexpression=project.modules -q -DforceStdout -pl :seatunnel-connector-v2-e2e >> /tmp/sub_module.txt sub_modules=`python tools/update_modules_check/update_modules_check.py sub /tmp/sub_module.txt` run_it_modules=`python tools/update_modules_check/update_modules_check.py sub_it_module "$sub_modules" 7 1` - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $run_it_modules -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $run_it_modules -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -817,7 +815,7 @@ jobs: ./mvnw help:evaluate -Dexpression=project.modules -q -DforceStdout -pl :seatunnel-connector-v2-e2e >> /tmp/sub_module.txt sub_modules=`python tools/update_modules_check/update_modules_check.py sub /tmp/sub_module.txt` run_it_modules=`python tools/update_modules_check/update_modules_check.py sub_it_module "$sub_modules" 7 2` - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $run_it_modules -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $run_it_modules -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -848,7 +846,7 @@ jobs: ./mvnw help:evaluate -Dexpression=project.modules -q -DforceStdout -pl :seatunnel-connector-v2-e2e >> /tmp/sub_module.txt sub_modules=`python tools/update_modules_check/update_modules_check.py sub /tmp/sub_module.txt` run_it_modules=`python tools/update_modules_check/update_modules_check.py sub_it_module "$sub_modules" 7 3` - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $run_it_modules -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $run_it_modules -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -879,7 +877,7 @@ jobs: ./mvnw help:evaluate -Dexpression=project.modules -q -DforceStdout -pl :seatunnel-connector-v2-e2e >> /tmp/sub_module.txt sub_modules=`python tools/update_modules_check/update_modules_check.py sub /tmp/sub_module.txt` run_it_modules=`python tools/update_modules_check/update_modules_check.py sub_it_module "$sub_modules" 7 4` - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $run_it_modules -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $run_it_modules -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -910,7 +908,7 @@ jobs: ./mvnw help:evaluate -Dexpression=project.modules -q -DforceStdout -pl :seatunnel-connector-v2-e2e >> /tmp/sub_module.txt sub_modules=`python tools/update_modules_check/update_modules_check.py sub /tmp/sub_module.txt` run_it_modules=`python tools/update_modules_check/update_modules_check.py sub_it_module "$sub_modules" 7 5` - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $run_it_modules -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $run_it_modules -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -941,7 +939,7 @@ jobs: ./mvnw help:evaluate -Dexpression=project.modules -q -DforceStdout -pl :seatunnel-connector-v2-e2e >> /tmp/sub_module.txt sub_modules=`python tools/update_modules_check/update_modules_check.py sub /tmp/sub_module.txt` run_it_modules=`python tools/update_modules_check/update_modules_check.py sub_it_module "$sub_modules" 7 6` - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $run_it_modules -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $run_it_modules -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -969,7 +967,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run jdbc connectors integration test (part-1) run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-1 -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-1 -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -997,7 +995,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run jdbc connectors integration test (part-2) run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-2 -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-2 -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -1025,7 +1023,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run jdbc connectors integration test (part-3) run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-3 -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-3 -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -1053,7 +1051,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run jdbc connectors integration test (part-4) run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-4 -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-4 -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -1081,7 +1079,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run jdbc connectors integration test (part-5) run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-5 -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-5 -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -1109,7 +1107,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run jdbc connectors integration test (part-6) run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-6 -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-6 -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -1137,7 +1135,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run jdbc connectors integration test (part-7) run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-7 -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-7 -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -1162,7 +1160,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run kudu connector integration test run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kudu-e2e -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-kudu-e2e -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -1187,7 +1185,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run amazonsqs connector integration test run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-amazonsqs-e2e -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-amazonsqs-e2e -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -1212,7 +1210,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run kafka connector integration test run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kafka-e2e -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-kafka-e2e -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -1237,7 +1235,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run rocket connector integration test run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-rocketmq-e2e -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-rocketmq-e2e -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -1263,11 +1261,10 @@ jobs: run: tools/github/free_disk_space.sh - name: run doris connector integration test run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-doris-e2e -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-doris-e2e -am -Pci env: MAVEN_OPTS: -Xmx4096m - paimon-connector-it: needs: [ changes, sanity-check ] if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-paimon-e2e') @@ -1289,7 +1286,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run paimon connector integration test run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-paimon-e2e -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-paimon-e2e -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -1344,7 +1341,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run file local connector integration test run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-file-local-e2e -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-file-local-e2e -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -1369,7 +1366,7 @@ jobs: run: tools/github/free_disk_space.sh - name: run file sftp connector integration test run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-file-sftp-e2e -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-file-sftp-e2e -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -1394,6 +1391,6 @@ jobs: run: tools/github/free_disk_space.sh - name: run redis connector integration test run: | - ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-redis-e2e -am -Pci + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-redis-e2e -am -Pci env: MAVEN_OPTS: -Xmx4096m diff --git a/docs/en/seatunnel-engine/rest-api-v2.md b/docs/en/seatunnel-engine/rest-api-v2.md index bc85d51befc..57908ce27e2 100644 --- a/docs/en/seatunnel-engine/rest-api-v2.md +++ b/docs/en/seatunnel-engine/rest-api-v2.md @@ -21,7 +21,7 @@ seatunnel: http: enable-http: true port: 8080 - enable-dynamic-port: false + enable-dynamic-port: true port-range: 100 ``` diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md b/docs/zh/seatunnel-engine/rest-api-v2.md index f15ef3188ae..a3d5f67ecf5 100644 --- a/docs/zh/seatunnel-engine/rest-api-v2.md +++ b/docs/zh/seatunnel-engine/rest-api-v2.md @@ -19,7 +19,7 @@ seatunnel: http: enable-http: true port: 8080 - enable-dynamic-port: false + enable-dynamic-port: true port-range: 100 ``` diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ChineseCharacterCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ChineseCharacterCheckTest.java new file mode 100644 index 00000000000..d0b2b838a1a --- /dev/null +++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ChineseCharacterCheckTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import com.github.javaparser.JavaParser; +import com.github.javaparser.ParseResult; +import com.github.javaparser.ast.CompilationUnit; +import com.github.javaparser.ast.comments.Comment; +import com.github.javaparser.ast.visitor.VoidVisitorAdapter; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.nio.file.FileVisitOption; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Stream; + +import static org.apache.seatunnel.api.ImportShadeClassCheckTest.isWindows; + +@Slf4j +public class ChineseCharacterCheckTest { + + private final JavaParser JAVA_PARSER = new JavaParser(); + + private static final Pattern CHINESE_PATTERN = Pattern.compile("[\\u4e00-\\u9fa5]"); + + /** Defines what content should be checked for Chinese characters */ + public enum CheckScope { + /** Check both comments and code */ + ALL, + /** Check only comments */ + COMMENTS_ONLY, + /** Check only code (string literals) */ + CODE_ONLY + } + + @Disabled("Currently only checking comments") + @Test + public void checkChineseCharactersInAll() { + checkChineseCharacters(CheckScope.ALL); + } + + @Test + public void checkChineseCharactersInCommentsOnly() { + checkChineseCharacters(CheckScope.COMMENTS_ONLY); + } + + @Disabled("Currently only checking comments") + @Test + public void checkChineseCharactersInCodeOnly() { + checkChineseCharacters(CheckScope.CODE_ONLY); + } + + private void checkChineseCharacters(CheckScope scope) { + // Define path fragments for source and test Java files + String mainPathFragment = isWindows ? "src\\main\\java" : "src/main/java"; + String testPathFragment2 = isWindows ? "src\\test\\java" : "src/test/java"; + + try (Stream paths = Files.walk(Paths.get(".."), FileVisitOption.FOLLOW_LINKS)) { + List filesWithChinese = new ArrayList<>(); + + // Filter Java files in the specified directories + paths.filter( + path -> { + String pathString = path.toString(); + return pathString.endsWith(".java") + && (pathString.contains(mainPathFragment) + || pathString.contains(testPathFragment2)); + }) + .forEach( + path -> { + try { + // Parse the Java file + ParseResult parseResult = + JAVA_PARSER.parse(Files.newInputStream(path)); + + parseResult + .getResult() + .ifPresent( + cu -> { + // Check for Chinese characters in comments + // if needed + if (scope != CheckScope.CODE_ONLY) { + List comments = + cu.getAllContainedComments(); + for (Comment comment : comments) { + if (CHINESE_PATTERN + .matcher( + comment + .getContent()) + .find()) { + filesWithChinese.add( + String.format( + "Found Chinese characters in comment at %s: %s", + path + .toAbsolutePath(), + comment.getContent() + .trim())); + } + } + } + + // Check for Chinese characters in code if + // needed + if (scope != CheckScope.COMMENTS_ONLY) { + ChineseCharacterVisitor visitor = + new ChineseCharacterVisitor( + path, filesWithChinese); + visitor.visit(cu, null); + } + }); + + } catch (Exception e) { + log.error("Error parsing file: {}", path, e); + } + }); + + // Assert that no files contain Chinese characters + Assertions.assertEquals( + 0, + filesWithChinese.size(), + () -> + String.format( + "Found Chinese characters in following files (Scope: %s):\n%s", + scope, String.join("\n", filesWithChinese))); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static class ChineseCharacterVisitor extends VoidVisitorAdapter { + private final Path filePath; + private final List filesWithChinese; + + public ChineseCharacterVisitor(Path filePath, List filesWithChinese) { + this.filePath = filePath; + this.filesWithChinese = filesWithChinese; + } + + @Override + public void visit(CompilationUnit cu, Void arg) { + // Check for Chinese characters in string literals + cu.findAll(com.github.javaparser.ast.expr.StringLiteralExpr.class) + .forEach( + str -> { + if (CHINESE_PATTERN.matcher(str.getValue()).find()) { + filesWithChinese.add( + String.format( + "Found Chinese characters in string literal at %s: %s", + filePath.toAbsolutePath(), str.getValue())); + } + }); + super.visit(cu, arg); + } + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java index 029890918da..963d18c7037 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java @@ -342,6 +342,7 @@ private boolean exists(FTPClient client, Path file) throws IOException { try { return getFileStatus(client, file) != null; } catch (FileNotFoundException fnfe) { + LOG.debug("File does not exist: " + file, fnfe); return false; } } @@ -557,12 +558,18 @@ private boolean mkdirs(FTPClient client, Path file, FsPermission permission) if (created) { String parentDir = parent.toUri().getPath(); client.changeWorkingDirectory(parentDir); - created = created && client.makeDirectory(pathName); + LOG.debug("Creating directory " + pathName); + created = client.makeDirectory(pathName); } } else if (isFile(client, absolute)) { throw new ParentNotDirectoryException( String.format( "Can't make directory for path %s since it is a file.", absolute)); + } else { + LOG.debug("Skipping creation of existing directory " + file); + } + if (!created) { + LOG.debug("Failed to create " + file); } return created; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java index f49145bc4cd..83fccdeb3c4 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java @@ -147,6 +147,7 @@ private boolean exists(ChannelSftp channel, Path file) throws IOException { getFileStatus(channel, file); return true; } catch (FileNotFoundException fnfe) { + LOG.debug("File does not exist: " + file, fnfe); return false; } catch (IOException ioe) { throw new IOException(E_FILE_STATUS, ioe); @@ -284,6 +285,7 @@ private boolean mkdirs(ChannelSftp client, Path file, FsPermission permission) try { final String previousCwd = client.pwd(); client.cd(parentDir); + LOG.debug("Creating directory " + pathName); client.mkdir(pathName); client.cd(previousCwd); } catch (SftpException e) { @@ -293,6 +295,11 @@ private boolean mkdirs(ChannelSftp client, Path file, FsPermission permission) } } else if (isFile(client, absolute)) { throw new IOException(String.format(E_DIR_CREATE_FROMFILE, absolute)); + } else { + LOG.debug("Skipping creation of existing directory " + file); + } + if (!created) { + LOG.debug("Failed to create " + file); } return created; } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java index 13f48823b2e..891ef1ae58c 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java @@ -219,6 +219,7 @@ private HadoopConf createHadoopConf(ReadonlyConfig readonlyConfig) { .getOptional(HiveOptions.HDFS_SITE_PATH) .ifPresent(hadoopConf::setHdfsSitePath); readonlyConfig.getOptional(HiveOptions.REMOTE_USER).ifPresent(hadoopConf::setRemoteUser); + readonlyConfig.getOptional(HiveOptions.KRB5_PATH).ifPresent(hadoopConf::setKrb5Path); readonlyConfig .getOptional(HiveOptions.KERBEROS_PRINCIPAL) .ifPresent(hadoopConf::setKerberosPrincipal); diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java index dd3d9478c13..25f95c2c4e5 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java @@ -182,6 +182,9 @@ private HadoopConf parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, Table ta readonlyConfig .getOptional(HdfsSourceConfigOptions.HDFS_SITE_PATH) .ifPresent(hadoopConf::setHdfsSitePath); + readonlyConfig + .getOptional(HdfsSourceConfigOptions.KRB5_PATH) + .ifPresent(hadoopConf::setKrb5Path); readonlyConfig .getOptional(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL) .ifPresent(hadoopConf::setKerberosPrincipal); diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java index 0d238c193d8..63a02ff7fb0 100644 --- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java @@ -100,7 +100,9 @@ public void open() throws CatalogException { @Override public void close() throws CatalogException { try { - fs.close(); + if (fs != null) { + fs.close(); + } } catch (Exception e) { log.info("Hudi catalog close error.", e); } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java index 71739b57897..b634462fbf4 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java @@ -39,6 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; public class RedisSinkWriter extends AbstractSinkWriter implements SupportMultiTableSinkWriter { @@ -78,8 +79,7 @@ public void write(SeaTunnelRow element) throws IOException { String value = getValue(element, fields); valueBuffer.add(value); if (keyBuffer.size() >= batchSize) { - doBatchWrite(); - clearBuffer(); + flush(); } } @@ -221,6 +221,16 @@ private void doBatchWrite() { @Override public void close() throws IOException { + flush(); + } + + @Override + public Optional prepareCommit() { + flush(); + return Optional.empty(); + } + + private synchronized void flush() { if (!keyBuffer.isEmpty()) { doBatchWrite(); clearBuffer(); diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/FastLogDeserializationContent.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/FastLogDeserializationContent.java index 27bd35bff23..86851ad89a8 100644 --- a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/FastLogDeserializationContent.java +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/FastLogDeserializationContent.java @@ -90,7 +90,8 @@ private SeaTunnelRow convertFastLogContent(FastLog log) { .append("\":\"") .append(content.getValue()) .append("\",")); - jsonStringBuilder.deleteCharAt(jsonStringBuilder.length() - 1); // 删除最后一个逗号 + // Remove the last comma + jsonStringBuilder.deleteCharAt(jsonStringBuilder.length() - 1); jsonStringBuilder.append("}"); // content field transformedRow.add(jsonStringBuilder.toString()); diff --git a/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/serialize/source/DefaultSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/serialize/source/DefaultSeaTunnelRowDeserializer.java index 762506d4980..08dcd85f7ff 100644 --- a/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/serialize/source/DefaultSeaTunnelRowDeserializer.java +++ b/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/serialize/source/DefaultSeaTunnelRowDeserializer.java @@ -115,10 +115,10 @@ SeaTunnelRow convert(TypesenseRecord rowRecord) { try { for (int i = 0; i < rowTypeInfo.getTotalFields(); i++) { fieldName = rowTypeInfo.getFieldName(i); - value = doc.get(fieldName); // 字段值 + value = doc.get(fieldName); if (value != null) { - seaTunnelDataType = - rowTypeInfo.getFieldType(i); // seaTunnelDataType 为SeaTunnel类型 + // seaTunnelDataType is the SeaTunnel type + seaTunnelDataType = rowTypeInfo.getFieldType(i); seaTunnelFields[i] = convertValue(seaTunnelDataType, value); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCWithSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCWithSchemaChangeIT.java index 3502b547579..a6a1ed920d9 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCWithSchemaChangeIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCWithSchemaChangeIT.java @@ -37,6 +37,7 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.JobIdGenerator; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -65,6 +66,7 @@ import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.with; import static org.awaitility.Durations.TWO_SECONDS; +import static org.testcontainers.shaded.org.awaitility.Awaitility.given; @Slf4j @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @@ -192,17 +194,24 @@ public void testOracleCdc2MysqlWithSchemaEvolutionCase(TestContainer container) dropTable(ORACLE_CONTAINER.getJdbcUrl(), SCEHMA_NAME, SOURCE_TABLE1); dropTable(ORACLE_CONTAINER.getJdbcUrl(), SCEHMA_NAME, SOURCE_TABLE1 + "_SINK"); createAndInitialize("full_types", ADMIN_USER, ADMIN_PWD); + String jobId = String.valueOf(JobIdGenerator.newJobId()); CompletableFuture.runAsync( () -> { try { - container.executeJob("/oraclecdc_to_mysql_with_schema_change.conf"); + container.executeJob("/oraclecdc_to_mysql_with_schema_change.conf", jobId); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); } }); - Thread.sleep(10000L); + given().pollDelay(10, TimeUnit.SECONDS) + .await() + .pollDelay(5000L, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertEquals("RUNNING", container.getJobStatus(jobId)); + }); assertSchemaEvolution( ORACLE_CONTAINER.getJdbcUrl(), diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java index 3d9b0572db4..178ed0ffba5 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java @@ -738,7 +738,7 @@ public void close() throws SQLException { } public void getErrorUrl(String message) { - // 使用正则表达式匹配URL + // Using regular expressions to match URLs Pattern pattern = Pattern.compile("http://[\\w./?=&-_]+"); Matcher matcher = pattern.matcher(message); String urlString = null; @@ -754,12 +754,12 @@ public void getErrorUrl(String message) { URL url = new URL(urlString); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - // 设置请求方法 + // Set the request method connection.setRequestMethod("GET"); - // 设置连接超时时间 + // Set the connection timeout connection.setConnectTimeout(5000); - // 设置读取超时时间 + // Set the read timeout connection.setReadTimeout(5000); int responseCode = connection.getResponseCode(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisMultiReadIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisMultiReadIT.java index dd604b57145..cbdbfc4ae41 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisMultiReadIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisMultiReadIT.java @@ -493,7 +493,7 @@ public void close() throws SQLException { } public void getErrorUrl(String message) { - // 使用正则表达式匹配URL + // Using regular expressions to match URLs Pattern pattern = Pattern.compile("http://[\\w./?=&-_]+"); Matcher matcher = pattern.matcher(message); String urlString = null; @@ -509,12 +509,12 @@ public void getErrorUrl(String message) { URL url = new URL(urlString); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - // 设置请求方法 + // Set the request method connection.setRequestMethod("GET"); - // 设置连接超时时间 + // Set the connection timeout connection.setConnectTimeout(5000); - // 设置读取超时时间 + // Set the read timeout connection.setReadTimeout(5000); int responseCode = connection.getResponseCode(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml index 7fe8cc8523e..ac49a5bec23 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml @@ -59,11 +59,16 @@ org.apache.seatunnel - connector-seatunnel-e2e-base + seatunnel-hadoop3-3.1.4-uber ${project.version} - tests - test-jar + optional test + + + org.apache.avro + avro + + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSeatunnelS3MultiTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSeatunnelS3MultiTableIT.java index 237fd100d26..64451bb2786 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSeatunnelS3MultiTableIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSeatunnelS3MultiTableIT.java @@ -18,8 +18,8 @@ package org.apache.seatunnel.e2e.connector.hudi; import org.apache.seatunnel.common.utils.FileUtils; +import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer; import org.apache.seatunnel.e2e.common.util.ContainerUtil; -import org.apache.seatunnel.engine.e2e.SeaTunnelContainer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalFileSystem; @@ -32,16 +32,13 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; import org.testcontainers.containers.Container; import org.testcontainers.containers.MinIOContainer; import io.minio.BucketExistsArgs; -import io.minio.DownloadObjectArgs; -import io.minio.ListObjectsArgs; import io.minio.MakeBucketArgs; import io.minio.MinioClient; -import io.minio.Result; -import io.minio.messages.Item; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -51,6 +48,7 @@ import static org.awaitility.Awaitility.given; @Slf4j +@TestInstance(TestInstance.Lifecycle.PER_CLASS) public class HudiSeatunnelS3MultiTableIT extends SeaTunnelContainer { private static final String MINIO_DOCKER_IMAGE = "minio/minio:RELEASE.2024-06-13T22-53-53Z"; @@ -88,7 +86,6 @@ public void startUp() throws Exception { String s3URL = container.getS3URL(); - // configuringClient minioClient = MinioClient.builder() .endpoint(s3URL) @@ -111,11 +108,13 @@ protected String[] buildStartCommand() { "wget -P " + SEATUNNEL_HOME + "lib " + + " --timeout=180 " + AWS_SDK_DOWNLOAD + " &&" + "wget -P " + SEATUNNEL_HOME + "lib " + + " --timeout=180 " + HADOOP_AWS_DOWNLOAD + " &&" + ContainerUtil.adaptPathForWin( @@ -132,90 +131,81 @@ public void tearDown() throws Exception { } } + @Override + protected boolean isIssueWeAlreadyKnow(String threadName) { + return super.isIssueWeAlreadyKnow(threadName) + // hudi with s3 + || threadName.startsWith("s3a-transfer"); + } + @Test public void testS3MultiWrite() throws IOException, InterruptedException { copyFileToContainer("/hudi/core-site.xml", "/tmp/seatunnel/config/core-site.xml"); - Container.ExecResult textWriteResult = executeSeaTunnelJob("/hudi/s3_fake_to_hudi.conf"); + Container.ExecResult textWriteResult = executeJob("/hudi/s3_fake_to_hudi.conf"); Assertions.assertEquals(0, textWriteResult.getExitCode()); Configuration configuration = new Configuration(); configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS); - given().ignoreExceptions() + given().pollDelay(10, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) .await() - .atMost(60000, TimeUnit.MILLISECONDS) + .atMost(300, TimeUnit.SECONDS) .untilAsserted( () -> { // copy hudi to local - Path inputPath1 = - downloadNewestCommitFile(DATABASE_1 + "/" + TABLE_NAME_1 + "/"); - Path inputPath2 = - downloadNewestCommitFile(DATABASE_2 + "/" + TABLE_NAME_2 + "/"); - ParquetReader reader1 = - ParquetReader.builder(new GroupReadSupport(), inputPath1) - .withConf(configuration) - .build(); - ParquetReader reader2 = - ParquetReader.builder(new GroupReadSupport(), inputPath2) - .withConf(configuration) - .build(); - - long rowCount1 = 0; - long rowCount2 = 0; - // Read data and count rows - while (reader1.read() != null) { - rowCount1++; - } - // Read data and count rows - while (reader2.read() != null) { - rowCount2++; + Path inputPath1 = null; + Path inputPath2 = null; + try { + inputPath1 = + new Path( + MinIoUtils.downloadNewestCommitFile( + minioClient, + BUCKET, + String.format( + "%s/%s/", DATABASE_1, TABLE_NAME_1), + DOWNLOAD_PATH)); + log.info( + "download from s3 success, the parquet file is at: {}", + inputPath1); + inputPath2 = + new Path( + MinIoUtils.downloadNewestCommitFile( + minioClient, + BUCKET, + String.format( + "%s/%s/", DATABASE_2, TABLE_NAME_2), + DOWNLOAD_PATH)); + log.info( + "download from s3 success, the parquet file is at: {}", + inputPath2); + ParquetReader reader1 = + ParquetReader.builder(new GroupReadSupport(), inputPath1) + .withConf(configuration) + .build(); + ParquetReader reader2 = + ParquetReader.builder(new GroupReadSupport(), inputPath2) + .withConf(configuration) + .build(); + + long rowCount1 = 0; + long rowCount2 = 0; + // Read data and count rows + while (reader1.read() != null) { + rowCount1++; + } + // Read data and count rows + while (reader2.read() != null) { + rowCount2++; + } + Assertions.assertEquals(100, rowCount1); + Assertions.assertEquals(240, rowCount2); + } finally { + if (inputPath1 != null) { + FileUtils.deleteFile(inputPath1.toUri().getPath()); + } + if (inputPath2 != null) { + FileUtils.deleteFile(inputPath2.toUri().getPath()); + } } - FileUtils.deleteFile(inputPath1.toUri().getPath()); - FileUtils.deleteFile(inputPath2.toUri().getPath()); - Assertions.assertEquals(100, rowCount1); - Assertions.assertEquals(240, rowCount2); }); } - - public Path downloadNewestCommitFile(String pathPrefix) throws IOException { - Iterable> listObjects = - minioClient.listObjects( - ListObjectsArgs.builder().bucket(BUCKET).prefix(pathPrefix).build()); - String newestCommitFileabsolutePath = ""; - String newestCommitFileName = ""; - long newestCommitTime = 0L; - for (Result listObject : listObjects) { - Item item; - try { - item = listObject.get(); - } catch (Exception e) { - throw new IOException("List minio file error.", e); - } - if (item.isDir() || !item.objectName().endsWith(".parquet")) { - continue; - } - long fileCommitTime = - Long.parseLong( - item.objectName() - .substring( - item.objectName().lastIndexOf("_") + 1, - item.objectName().lastIndexOf(".parquet"))); - if (fileCommitTime > newestCommitTime) { - newestCommitFileabsolutePath = item.objectName(); - newestCommitFileName = - newestCommitFileabsolutePath.substring( - item.objectName().lastIndexOf("/") + 1); - newestCommitTime = fileCommitTime; - } - } - try { - minioClient.downloadObject( - DownloadObjectArgs.builder() - .bucket(BUCKET) - .object(newestCommitFileabsolutePath) - .filename(DOWNLOAD_PATH + newestCommitFileName) - .build()); - } catch (Exception e) { - log.error("Download file from minio error."); - } - return new Path(DOWNLOAD_PATH + newestCommitFileName); - } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSparkS3MultiTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSparkS3MultiTableIT.java index f91f340f3c3..8b598d443b3 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSparkS3MultiTableIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSparkS3MultiTableIT.java @@ -32,21 +32,18 @@ import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.example.GroupReadSupport; -import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; import org.testcontainers.containers.MinIOContainer; import io.minio.BucketExistsArgs; -import io.minio.DownloadObjectArgs; -import io.minio.ListObjectsArgs; import io.minio.MakeBucketArgs; import io.minio.MinioClient; -import io.minio.Result; -import io.minio.messages.Item; import lombok.extern.slf4j.Slf4j; +import okhttp3.OkHttpClient; import java.io.IOException; import java.util.concurrent.TimeUnit; @@ -72,7 +69,7 @@ public class HudiSparkS3MultiTableIT extends TestSuiteBase implements TestResour private static final String TABLE_NAME_2 = "st_test_2"; private static final String DOWNLOAD_PATH = "/tmp/seatunnel/"; - @BeforeEach + @BeforeAll @Override public void startUp() throws Exception { container = @@ -86,11 +83,15 @@ public void startUp() throws Exception { String s3URL = container.getS3URL(); + OkHttpClient.Builder builder = new OkHttpClient.Builder(); + builder.connectTimeout(10, TimeUnit.SECONDS); + builder.readTimeout(10, TimeUnit.SECONDS); // configuringClient minioClient = MinioClient.builder() .endpoint(s3URL) .credentials(container.getUserName(), container.getPassword()) + .httpClient(builder.build()) .build(); // create bucket @@ -100,7 +101,7 @@ public void startUp() throws Exception { Assertions.assertTrue(minioClient.bucketExists(existsArgs)); } - @AfterEach + @AfterAll @Override public void tearDown() throws Exception { if (container != null) { @@ -121,82 +122,68 @@ public void testS3MultiWrite(TestContainer container) throws IOException, Interr Configuration configuration = new Configuration(); configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS); given().ignoreExceptions() + .pollDelay(5, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) .await() - .atMost(60000, TimeUnit.MILLISECONDS) + .atMost(300, TimeUnit.SECONDS) .untilAsserted( () -> { // copy hudi to local - Path inputPath1 = - downloadNewestCommitFile(DATABASE_1 + "/" + TABLE_NAME_1 + "/"); - Path inputPath2 = - downloadNewestCommitFile(DATABASE_2 + "/" + TABLE_NAME_2 + "/"); - ParquetReader reader1 = - ParquetReader.builder(new GroupReadSupport(), inputPath1) - .withConf(configuration) - .build(); - ParquetReader reader2 = - ParquetReader.builder(new GroupReadSupport(), inputPath2) - .withConf(configuration) - .build(); - - long rowCount1 = 0; - long rowCount2 = 0; - // Read data and count rows - while (reader1.read() != null) { - rowCount1++; - } - // Read data and count rows - while (reader2.read() != null) { - rowCount2++; + // copy hudi to local + Path inputPath1 = null; + Path inputPath2 = null; + try { + inputPath1 = + new Path( + MinIoUtils.downloadNewestCommitFile( + minioClient, + BUCKET, + String.format( + "%s/%s/", DATABASE_1, TABLE_NAME_1), + DOWNLOAD_PATH)); + log.info( + "download from s3 success, the parquet file is at: {}", + inputPath1); + inputPath2 = + new Path( + MinIoUtils.downloadNewestCommitFile( + minioClient, + BUCKET, + String.format( + "%s/%s/", DATABASE_2, TABLE_NAME_2), + DOWNLOAD_PATH)); + log.info( + "download from s3 success, the parquet file is at: {}", + inputPath2); + ParquetReader reader1 = + ParquetReader.builder(new GroupReadSupport(), inputPath1) + .withConf(configuration) + .build(); + ParquetReader reader2 = + ParquetReader.builder(new GroupReadSupport(), inputPath2) + .withConf(configuration) + .build(); + + long rowCount1 = 0; + long rowCount2 = 0; + // Read data and count rows + while (reader1.read() != null) { + rowCount1++; + } + // Read data and count rows + while (reader2.read() != null) { + rowCount2++; + } + Assertions.assertEquals(100, rowCount1); + Assertions.assertEquals(240, rowCount2); + } finally { + if (inputPath1 != null) { + FileUtils.deleteFile(inputPath1.toUri().getPath()); + } + if (inputPath2 != null) { + FileUtils.deleteFile(inputPath2.toUri().getPath()); + } } - FileUtils.deleteFile(inputPath1.toUri().getPath()); - FileUtils.deleteFile(inputPath2.toUri().getPath()); - Assertions.assertEquals(100, rowCount1); - Assertions.assertEquals(240, rowCount2); }); } - - public Path downloadNewestCommitFile(String pathPrefix) throws IOException { - Iterable> listObjects = - minioClient.listObjects( - ListObjectsArgs.builder().bucket(BUCKET).prefix(pathPrefix).build()); - String newestCommitFileabsolutePath = ""; - String newestCommitFileName = ""; - long newestCommitTime = 0L; - for (Result listObject : listObjects) { - Item item; - try { - item = listObject.get(); - } catch (Exception e) { - throw new IOException("List minio file error.", e); - } - if (item.isDir() || !item.objectName().endsWith(".parquet")) { - continue; - } - long fileCommitTime = - Long.parseLong( - item.objectName() - .substring( - item.objectName().lastIndexOf("_") + 1, - item.objectName().lastIndexOf(".parquet"))); - if (fileCommitTime > newestCommitTime) { - newestCommitFileabsolutePath = item.objectName(); - newestCommitFileName = - newestCommitFileabsolutePath.substring( - item.objectName().lastIndexOf("/") + 1); - newestCommitTime = fileCommitTime; - } - } - try { - minioClient.downloadObject( - DownloadObjectArgs.builder() - .bucket(BUCKET) - .object(newestCommitFileabsolutePath) - .filename(DOWNLOAD_PATH + newestCommitFileName) - .build()); - } catch (Exception e) { - log.error("Download file from minio error."); - } - return new Path(DOWNLOAD_PATH + newestCommitFileName); - } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/MinIoUtils.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/MinIoUtils.java new file mode 100644 index 00000000000..9cdfdc531e0 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/MinIoUtils.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.hudi; + +import io.minio.GetObjectArgs; +import io.minio.ListObjectsArgs; +import io.minio.MinioClient; +import io.minio.Result; +import io.minio.messages.Item; +import lombok.extern.slf4j.Slf4j; + +import java.io.FilterInputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +@Slf4j +public class MinIoUtils { + + public static String downloadNewestCommitFile( + MinioClient minioClient, String bucketName, String pathPrefix, String downloadPath) { + Iterable> listObjects = + minioClient.listObjects( + ListObjectsArgs.builder().bucket(bucketName).prefix(pathPrefix).build()); + long newestCommitTime = 0L; + String objectPath = null; + for (Result listObject : listObjects) { + Item item = null; + try { + item = listObject.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + if (item.isDir() || !item.objectName().endsWith(".parquet")) { + continue; + } + long fileCommitTime = + Long.parseLong( + item.objectName() + .substring( + item.objectName().lastIndexOf("_") + 1, + item.objectName().lastIndexOf(".parquet"))); + if (fileCommitTime > newestCommitTime) { + objectPath = item.objectName(); + } + } + log.info("download object path: {}", objectPath); + assert objectPath != null; + Path path = + Paths.get( + createDir(downloadPath) + + objectPath.substring(objectPath.lastIndexOf("/") + 1)); + try (FilterInputStream inputStream = + minioClient.getObject( + GetObjectArgs.builder() + .bucket(bucketName) + .object(objectPath) + .build()); + OutputStream outputStream = Files.newOutputStream(path)) { + byte[] buffer = new byte[1024]; + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + } catch (Exception e) { + log.error("download error \n", e); + throw new RuntimeException(e); + } + log.info("download success path: {}", path); + return path.toFile().getAbsolutePath(); + } + + private static String createDir(String downloadPath) { + Path path = Paths.get(downloadPath); + if (!Files.exists(path)) { + try { + Files.createDirectories(path); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return downloadPath; + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkIT.java index 20c1b02914e..0f5e0dfe0b5 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkIT.java @@ -108,7 +108,7 @@ private void extractFiles() { "sh", "-c", "cd " + CATALOG_DIR + " && tar -zxvf " + NAMESPACE_TAR); try { Process process = processBuilder.start(); - // 等待命令执行完成 + // Wait for the command to complete int exitCode = process.waitFor(); if (exitCode == 0) { log.info("Extract files successful."); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml index c1ce438be3a..3fc619d2255 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml @@ -57,14 +57,6 @@ test - - org.apache.seatunnel - connector-seatunnel-e2e-base - ${project.version} - tests - test-jar - test - org.apache.seatunnel seatunnel-hadoop3-3.1.4-uber diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java index 2df1a5e49b2..a618aad8b34 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java @@ -17,13 +17,14 @@ package org.apache.seatunnel.e2e.connector.paimon; +import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer; import org.apache.seatunnel.e2e.common.util.ContainerUtil; -import org.apache.seatunnel.engine.e2e.SeaTunnelContainer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; import org.testcontainers.containers.Container; import org.testcontainers.containers.MinIOContainer; @@ -34,6 +35,7 @@ import java.nio.file.Paths; import java.util.Map; +@TestInstance(TestInstance.Lifecycle.PER_CLASS) public class PaimonWithS3IT extends SeaTunnelContainer { private static final String MINIO_DOCKER_IMAGE = "minio/minio:RELEASE.2024-06-13T22-53-53Z"; @@ -121,10 +123,10 @@ protected boolean isIssueWeAlreadyKnow(String threadName) { @Test public void testFaceCDCSinkPaimonWithS3Filesystem() throws Exception { - Container.ExecResult execResult = executeSeaTunnelJob("/fake_to_paimon_with_s3.conf"); + Container.ExecResult execResult = executeJob("/fake_to_paimon_with_s3.conf"); Assertions.assertEquals(0, execResult.getExitCode()); - Container.ExecResult readResult = executeSeaTunnelJob("/paimon_with_s3_to_assert.conf"); + Container.ExecResult readResult = executeJob("/paimon_with_s3_to_assert.conf"); Assertions.assertEquals(0, readResult.getExitCode()); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java index 96ac20cbe6e..bdc66016db6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java @@ -16,6 +16,8 @@ */ package org.apache.seatunnel.e2e.connector.redis; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode; + import org.apache.seatunnel.api.table.type.ArrayType; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.DecimalType; @@ -25,15 +27,20 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.format.json.JsonSerializationSchema; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -52,13 +59,21 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.awaitility.Awaitility.await; + @Slf4j public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements TestResource { @@ -492,7 +507,7 @@ public void testFakeToRedisDeleteSetTest(TestContainer container) } @TestTemplate - public void testMysqlCdcToRedisDeleteZSetTest(TestContainer container) + public void testFakeToToRedisDeleteZSetTest(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = container.executeJob("/fake-to-redis-test-delete-zset.conf"); @@ -501,6 +516,69 @@ public void testMysqlCdcToRedisDeleteZSetTest(TestContainer container) jedis.del("zset_check"); } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Only support for seatunnel") + @DisabledOnOs(OS.WINDOWS) + public void testFakeToRedisInRealTimeTest(TestContainer container) + throws IOException, InterruptedException { + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob("/fake-to-redis-test-in-real-time.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertEquals(3, jedis.llen("list_check")); + }); + jedis.del("list_check"); + // Get the task id + Container.ExecResult execResult = container.executeBaseCommand(new String[] {"-l"}); + String regex = "(\\d+)\\s+"; + Pattern pattern = Pattern.compile(regex); + List runningJobId = + Arrays.stream(execResult.getStdout().toString().split("\n")) + .filter(s -> s.contains("fake-to-redis-test-in-real-time")) + .map( + s -> { + Matcher matcher = pattern.matcher(s); + return matcher.find() ? matcher.group(1) : null; + }) + .filter(jobId -> jobId != null) + .collect(Collectors.toList()); + Assertions.assertEquals(1, runningJobId.size()); + // Verify that the status is Running + for (String jobId : runningJobId) { + Container.ExecResult execResult1 = + container.executeBaseCommand(new String[] {"-j", jobId}); + String stdout = execResult1.getStdout(); + ObjectNode jsonNodes = JsonUtils.parseObject(stdout); + Assertions.assertEquals(jsonNodes.get("jobStatus").asText(), "RUNNING"); + } + // Execute cancellation task + String[] batchCancelCommand = + Stream.concat(Arrays.stream(new String[] {"-can"}), runningJobId.stream()) + .toArray(String[]::new); + Assertions.assertEquals(0, container.executeBaseCommand(batchCancelCommand).getExitCode()); + + // Verify whether the cancellation is successful + for (String jobId : runningJobId) { + Container.ExecResult execResult1 = + container.executeBaseCommand(new String[] {"-j", jobId}); + String stdout = execResult1.getStdout(); + ObjectNode jsonNodes = JsonUtils.parseObject(stdout); + Assertions.assertEquals(jsonNodes.get("jobStatus").asText(), "CANCELED"); + } + } + @TestTemplate public void testFakeToRedisNormalKeyIsNullTest(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf new file mode 100644 index 00000000000..9e8829a3913 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 10000 + shade.identifier = "base64" +} + +source { + FakeSource { + schema = { + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "list_check" + data_type = list + batch_size = 33 + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml index 05829ea893d..7f11b08d8b2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml @@ -55,12 +55,6 @@ test-jar test - - org.apache.seatunnel - connector-cdc-mysql-e2e - ${project.version} - test - org.apache.seatunnel connector-assert diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java index 832be6fc18a..c63982cbbf5 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java @@ -17,7 +17,7 @@ package org.apache.seatunnel.engine.e2e.console; -import org.apache.seatunnel.engine.e2e.SeaTunnelContainer; +import org.apache.seatunnel.engine.e2e.SeaTunnelEngineContainer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -25,7 +25,7 @@ import java.io.IOException; -public class FakeSourceToConsoleIT extends SeaTunnelContainer { +public class FakeSourceToConsoleIT extends SeaTunnelEngineContainer { @Test public void testFakeSourceToConsoleSink() throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java index 79a4dbe1f81..e23cadfe419 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java @@ -21,7 +21,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.seatunnel.api.event.EventType; -import org.apache.seatunnel.engine.e2e.SeaTunnelContainer; +import org.apache.seatunnel.engine.e2e.SeaTunnelEngineContainer; import org.apache.seatunnel.engine.server.event.JobEventHttpReportHandler; import org.junit.jupiter.api.AfterAll; @@ -52,7 +52,7 @@ import static org.awaitility.Awaitility.given; @Slf4j -public class FakeSourceToConsoleWithEventReportIT extends SeaTunnelContainer { +public class FakeSourceToConsoleWithEventReportIT extends SeaTunnelEngineContainer { private static final int MOCK_SERVER_PORT = 1024; private MockWebServer mockWebServer; diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java similarity index 99% rename from seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java index 6bdf1c24153..28b8adb41fc 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java @@ -53,7 +53,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.in; -public class ClusterSeaTunnelContainer extends SeaTunnelContainer { +public class ClusterSeaTunnelEngineContainer extends SeaTunnelEngineContainer { private GenericContainer secondServer; diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java index c44161f2a49..febdaa332d4 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java @@ -28,7 +28,7 @@ import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH; -public class JobClientJobProxyIT extends SeaTunnelContainer { +public class JobClientJobProxyIT extends SeaTunnelEngineContainer { @Override @BeforeAll diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobRestoreIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobRestoreIT.java index f251d9019fc..9a0e94f89e0 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobRestoreIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobRestoreIT.java @@ -26,7 +26,7 @@ import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH; -public class JobRestoreIT extends SeaTunnelContainer { +public class JobRestoreIT extends SeaTunnelEngineContainer { @Override @BeforeAll diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelEngineContainer.java similarity index 91% rename from seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelEngineContainer.java index 0b96193f0ca..1d736904939 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelEngineContainer.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.engine.e2e; +import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer; + import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestInstance; @@ -28,8 +30,7 @@ @Slf4j @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public abstract class SeaTunnelContainer - extends org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer { +public abstract class SeaTunnelEngineContainer extends SeaTunnelContainer { @Override @BeforeAll diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SinkPlaceholderIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SinkPlaceholderIT.java index eee3705452f..a577b2b6bd1 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SinkPlaceholderIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SinkPlaceholderIT.java @@ -23,7 +23,7 @@ import java.io.IOException; -public class SinkPlaceholderIT extends SeaTunnelContainer { +public class SinkPlaceholderIT extends SeaTunnelEngineContainer { @Test public void testSinkPlaceholder() throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java index 0a4e090598f..a8234bd59e1 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java @@ -18,7 +18,7 @@ package org.apache.seatunnel.engine.e2e.classloader; import org.apache.seatunnel.e2e.common.util.ContainerUtil; -import org.apache.seatunnel.engine.e2e.SeaTunnelContainer; +import org.apache.seatunnel.engine.e2e.SeaTunnelEngineContainer; import org.apache.seatunnel.engine.server.rest.RestConstant; import org.awaitility.Awaitility; @@ -41,7 +41,7 @@ import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH; import static org.hamcrest.Matchers.equalTo; -public abstract class ClassLoaderITBase extends SeaTunnelContainer { +public abstract class ClassLoaderITBase extends SeaTunnelEngineContainer { private static final String CONF_FILE = "/classloader/fake_to_inmemory.conf"; diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java index 18d08b7506e..37310b9fe09 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java @@ -19,7 +19,7 @@ import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.e2e.common.util.ContainerUtil; -import org.apache.seatunnel.engine.e2e.SeaTunnelContainer; +import org.apache.seatunnel.engine.e2e.SeaTunnelEngineContainer; import org.apache.seatunnel.engine.server.rest.RestConstant; import org.awaitility.Awaitility; @@ -53,7 +53,7 @@ import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH; import static org.hamcrest.Matchers.equalTo; -public class JobLogIT extends SeaTunnelContainer { +public class JobLogIT extends SeaTunnelEngineContainer { private static final String CUSTOM_JOB_NAME = "test-job-log-file"; private static final String CUSTOM_JOB_NAME2 = "test-job-log-file2"; diff --git a/seatunnel-engine/seatunnel-engine-ui/pom.xml b/seatunnel-engine/seatunnel-engine-ui/pom.xml index 5244e73c5b2..0ac5f9954b4 100644 --- a/seatunnel-engine/seatunnel-engine-ui/pom.xml +++ b/seatunnel-engine/seatunnel-engine-ui/pom.xml @@ -35,6 +35,7 @@ node_modules ../seatunnel-engine-server/src/main/resources/ui .deployed + false @@ -70,6 +71,7 @@ ${build.node.version} ${build.npm.version} + ${skip.ui} diff --git a/tools/update_modules_check/update_modules_check.py b/tools/update_modules_check/update_modules_check.py index cef49ad4c35..7b3fddfab10 100644 --- a/tools/update_modules_check/update_modules_check.py +++ b/tools/update_modules_check/update_modules_check.py @@ -154,6 +154,10 @@ def get_sub_it_modules(modules, total_num, current_num): modules_arr.remove("connector-file-local-e2e") modules_arr.remove("connector-file-sftp-e2e") modules_arr.remove("connector-redis-e2e") + if "connector-seatunnel-e2e-base" in modules_arr: + modules_arr.remove("connector-seatunnel-e2e-base") + if "connector-console-seatunnel-e2e" in modules_arr: + modules_arr.remove("connector-console-seatunnel-e2e") output = "" for i, module in enumerate(modules_arr): if len(module) > 0 and i % int(total_num) == int(current_num): @@ -195,6 +199,10 @@ def get_sub_update_it_modules(modules, total_num, current_num): module_list.remove("connector-file-sftp-e2e") if "connector-redis-e2e" in module_list: module_list.remove("connector-redis-e2e") + if "connector-seatunnel-e2e-base" in module_list: + module_list.remove("connector-seatunnel-e2e-base") + if "connector-console-seatunnel-e2e" in module_list: + module_list.remove("connector-console-seatunnel-e2e") for i, module in enumerate(module_list): if len(module) > 0 and i % int(total_num) == int(current_num): final_modules.append(":" + module)