diff --git a/ci/scripts/notify.py b/ci/scripts/notify.py index 5266998b0045f..9160d01675832 100755 --- a/ci/scripts/notify.py +++ b/ci/scripts/notify.py @@ -98,6 +98,7 @@ def get_mock_test_status(test): "e2e-clickhouse-sink-tests": "hard_failed", "e2e-pulsar-sink-tests": "", "s3-source-test-for-opendal-fs-engine": "", + "s3-source-tests": "", "pulsar-source-tests": "", "connector-node-integration-test": "" } diff --git a/ci/scripts/s3-source-test-for-opendal-fs-engine.sh b/ci/scripts/s3-source-test-for-opendal-fs-engine.sh index 6fbbdb35e0e45..355489acf2512 100755 --- a/ci/scripts/s3-source-test-for-opendal-fs-engine.sh +++ b/ci/scripts/s3-source-test-for-opendal-fs-engine.sh @@ -30,7 +30,7 @@ cargo make ci-start ci-3cn-3fe-opendal-fs-backend echo "--- Run test" python3 -m pip install minio psycopg2-binary -python3 e2e_test/s3/$script.py +python3 e2e_test/s3/$script echo "--- Kill cluster" rm -rf /tmp/rw_ci diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 653578e4688e2..d931c3af16660 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -490,6 +490,7 @@ steps: retry: *auto-retry - label: "PosixFs source on OpenDAL fs engine (csv parser)" + key: "s3-source-test-for-opendal-fs-engine" command: "ci/scripts/s3-source-test.sh -p ci-release -s 'posix_fs_source.py csv_without_header'" if: | !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null @@ -507,7 +508,7 @@ steps: - label: "S3 source on OpenDAL fs engine" key: "s3-source-test-for-opendal-fs-engine" - command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run" + command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run.csv" if: | !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-s3-source-tests" @@ -527,6 +528,29 @@ steps: timeout_in_minutes: 20 retry: *auto-retry + # TODO(Kexiang): Enable this 
test after we have a GCS_SOURCE_TEST_CONF. + # - label: "GCS source on OpenDAL fs engine" + # key: "s3-source-test-for-opendal-fs-engine" + # command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s gcs.csv" + # if: | + # !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null + # || build.pull_request.labels includes "ci/run-s3-source-tests" + # || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/ + # depends_on: build + # plugins: + # - seek-oss/aws-sm#v2.3.1: + # env: + # S3_SOURCE_TEST_CONF: ci_s3_source_test_aws + # - docker-compose#v4.9.0: + # run: rw-build-env + # config: ci/docker-compose.yml + # mount-buildkite-agent: true + # environment: + # - S3_SOURCE_TEST_CONF + # - ./ci/plugins/upload-failure-logs + # timeout_in_minutes: 20 + # retry: *auto-retry + - label: "pulsar source check" key: "pulsar-source-tests" command: "ci/scripts/pulsar-source-test.sh -p ci-release" diff --git a/e2e_test/s3/gcs_source.py b/e2e_test/s3/gcs_source.py index c917f2c2d33fd..5e1144266fb23 100644 --- a/e2e_test/s3/gcs_source.py +++ b/e2e_test/s3/gcs_source.py @@ -57,7 +57,7 @@ def _encode(): connector = 'gcs', match_pattern = '{prefix}*.{fmt}', gcs.bucket_name = '{config['GCS_BUCKET']}', - gcs.credentials = '{credential}', + gcs.credential = '{credential}', ) FORMAT PLAIN ENCODE {_encode()};''') total_rows = file_num * item_num_per_file diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index b6093a351783b..0c334bfee1de9 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -42,7 +42,7 @@ use super::google_pubsub::GooglePubsubMeta; use super::kafka::KafkaMeta; use super::monitor::SourceMetrics; use super::nexmark::source::message::NexmarkMeta; -use super::{OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR}; +use super::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR}; use crate::parser::ParserConfig; pub(crate) use 
crate::source::common::CommonSplitReader; use crate::source::filesystem::FsPageItem; @@ -389,6 +389,7 @@ impl ConnectorProperties { .map(|s| { s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR) || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR) + || s.eq_ignore_ascii_case(GCS_CONNECTOR) }) .unwrap_or(false) } @@ -399,6 +400,7 @@ impl ConnectorProperties { .map(|s| { s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR) || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR) + || s.eq_ignore_ascii_case(GCS_CONNECTOR) }) .unwrap_or(false) } diff --git a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs index 7d9c2bec4429b..d6f7b44bff591 100644 --- a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs @@ -32,9 +32,13 @@ impl OpendalEnumerator { builder.bucket(&gcs_properties.bucket_name); // if credential env is set, use it. Otherwise, ADC will be used. - let cred = gcs_properties.credential; - if let Some(cred) = cred { + if let Some(cred) = gcs_properties.credential { builder.credential(&cred); + } else { + let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS"); + if let Ok(cred) = cred { + builder.credential(&cred); + } } if let Some(service_account) = gcs_properties.service_account { diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index d6223c467d08b..e0c5a22f1fd90 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -38,10 +38,15 @@ pub const POSIX_FS_CONNECTOR: &str = "posix_fs"; pub struct GcsProperties { #[serde(rename = "gcs.bucket_name")] pub bucket_name: String, + + /// The base64 encoded credential key. If not set, ADC will be used. #[serde(rename = "gcs.credential")] pub credential: Option<String>, + + /// If credential/ADC is not set.
The service account can be used to provide the credential info. #[serde(rename = "gcs.service_account", default)] pub service_account: Option<String>, + #[serde(rename = "match_pattern", default)] pub match_pattern: Option<String>, @@ -107,7 +112,7 @@ pub struct OpendalS3Properties { #[serde(flatten)] pub s3_properties: S3PropertiesCommon, - // The following are only supported by s3_v2 (opendal) source. + /// The following are only supported by s3_v2 (opendal) source. #[serde(rename = "s3.assume_role", default)] pub assume_role: Option<String>, @@ -131,11 +136,11 @@ impl SourceProperties for OpendalS3Properties { #[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)] pub struct PosixFsProperties { - // The root directly of the files to search. The files will be searched recursively. + /// The root directory of the files to search. The files will be searched recursively. #[serde(rename = "posix_fs.root")] pub root: String, - // The regex pattern to match files under root directory. + /// The regex pattern to match files under root directory. #[serde(rename = "match_pattern", default)] pub match_pattern: Option<String>, diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 98a45599b56f2..187780cd23826 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -22,9 +22,11 @@ GcsProperties: required: true - name: gcs.credential field_type: String + comments: The base64 encoded credential key. If not set, ADC will be used. required: false - name: gcs.service_account field_type: String + comments: If credential/ADC is not set. The service account can be used to provide the credential info. required: false default: Default::default - name: match_pattern @@ -453,15 +455,18 @@ OpendalS3Properties: required: false - name: s3.assume_role field_type: String + comments: The following are only supported by s3_v2 (opendal) source.
required: false default: Default::default PosixFsProperties: fields: - name: posix_fs.root field_type: String + comments: The root directory of the files to search. The files will be searched recursively. required: true - name: match_pattern field_type: String + comments: The regex pattern to match files under root directory. required: false default: Default::default PubsubProperties: