From 393d41471199ec9c1b33b5b387d195454876ba46 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Fri, 5 Jan 2024 17:17:54 -0500 Subject: [PATCH] fix gcs-source --- ci/scripts/notify.py | 1 + .../s3-source-test-for-opendal-fs-engine.sh | 2 +- ci/workflows/main-cron.yml | 26 ++++++++++++++++++- .../filesystem/opendal_source/gcs_source.rs | 8 ++++-- .../source/filesystem/opendal_source/mod.rs | 11 +++++--- 5 files changed, 41 insertions(+), 7 deletions(-) diff --git a/ci/scripts/notify.py b/ci/scripts/notify.py index 5266998b0045f..9160d01675832 100755 --- a/ci/scripts/notify.py +++ b/ci/scripts/notify.py @@ -98,6 +98,7 @@ def get_mock_test_status(test): "e2e-clickhouse-sink-tests": "hard_failed", "e2e-pulsar-sink-tests": "", "s3-source-test-for-opendal-fs-engine": "", + "s3-source-tests": "", "pulsar-source-tests": "", "connector-node-integration-test": "" } diff --git a/ci/scripts/s3-source-test-for-opendal-fs-engine.sh b/ci/scripts/s3-source-test-for-opendal-fs-engine.sh index 6fbbdb35e0e45..355489acf2512 100755 --- a/ci/scripts/s3-source-test-for-opendal-fs-engine.sh +++ b/ci/scripts/s3-source-test-for-opendal-fs-engine.sh @@ -30,7 +30,7 @@ cargo make ci-start ci-3cn-3fe-opendal-fs-backend echo "--- Run test" python3 -m pip install minio psycopg2-binary -python3 e2e_test/s3/$script.py +python3 e2e_test/s3/$script echo "--- Kill cluster" rm -rf /tmp/rw_ci diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 653578e4688e2..d931c3af16660 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -490,6 +490,7 @@ steps: retry: *auto-retry - label: "PosixFs source on OpenDAL fs engine (csv parser)" + key: "s3-source-test-for-opendal-fs-engine" command: "ci/scripts/s3-source-test.sh -p ci-release -s 'posix_fs_source.py csv_without_header'" if: | !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null @@ -507,7 +508,7 @@ steps: - label: "S3 source on OpenDAL fs engine" key: 
"s3-source-test-for-opendal-fs-engine" - command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run" + command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run.csv" if: | !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-s3-source-tests" @@ -527,6 +528,29 @@ steps: timeout_in_minutes: 20 retry: *auto-retry + # TODO(Kexiang): Enable this test after we have a GCS_SOURCE_TEST_CONF. + # - label: "GCS source on OpenDAL fs engine" + # key: "s3-source-test-for-opendal-fs-engine" + # command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s gcs.csv" + # if: | + # !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null + # || build.pull_request.labels includes "ci/run-s3-source-tests" + # || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/ + # depends_on: build + # plugins: + # - seek-oss/aws-sm#v2.3.1: + # env: + # S3_SOURCE_TEST_CONF: ci_s3_source_test_aws + # - docker-compose#v4.9.0: + # run: rw-build-env + # config: ci/docker-compose.yml + # mount-buildkite-agent: true + # environment: + # - S3_SOURCE_TEST_CONF + # - ./ci/plugins/upload-failure-logs + # timeout_in_minutes: 20 + # retry: *auto-retry + - label: "pulsar source check" key: "pulsar-source-tests" command: "ci/scripts/pulsar-source-test.sh -p ci-release" diff --git a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs index 7d9c2bec4429b..d6f7b44bff591 100644 --- a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs @@ -32,9 +32,13 @@ impl OpendalEnumerator { builder.bucket(&gcs_properties.bucket_name); // if credential env is set, use it. Otherwise, ADC will be used. 
- let cred = gcs_properties.credential; - if let Some(cred) = cred { + if let Some(cred) = gcs_properties.credential { builder.credential(&cred); + } else { + let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS"); + if let Ok(cred) = cred { + builder.credential(&cred); + } } if let Some(service_account) = gcs_properties.service_account { diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index d6223c467d08b..e0c5a22f1fd90 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -38,10 +38,15 @@ pub const POSIX_FS_CONNECTOR: &str = "posix_fs"; pub struct GcsProperties { #[serde(rename = "gcs.bucket_name")] pub bucket_name: String, + + /// The base64 encoded credential key. If not set, ADC will be used. #[serde(rename = "gcs.credential")] pub credential: Option, + + /// If credential/ADC is not set, the service account can be used to provide the credential info. #[serde(rename = "gcs.service_account", default)] pub service_account: Option, + #[serde(rename = "match_pattern", default)] pub match_pattern: Option, @@ -107,7 +112,7 @@ pub struct OpendalS3Properties { #[serde(flatten)] pub s3_properties: S3PropertiesCommon, - // The following are only supported by s3_v2 (opendal) source. + /// The following are only supported by s3_v2 (opendal) source. #[serde(rename = "s3.assume_role", default)] pub assume_role: Option, @@ -131,11 +136,11 @@ impl SourceProperties for OpendalS3Properties { #[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)] pub struct PosixFsProperties { - // The root directly of the files to search. The files will be searched recursively. + /// The root directory of the files to search. The files will be searched recursively. #[serde(rename = "posix_fs.root")] pub root: String, - // The regex pattern to match files under root directory.
+ /// The regex pattern to match files under root directory. #[serde(rename = "match_pattern", default)] pub match_pattern: Option,