From 34f8705881feb04a15e4fc94a1298d7252125eb4 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Mon, 8 Jan 2024 22:05:37 -0500 Subject: [PATCH] fix(connector): fix gcs source connector(#14373) (#14425) --- ci/scripts/notify.py | 1 + .../s3-source-test-for-opendal-fs-engine.sh | 2 +- ci/workflows/main-cron.yml | 25 ++++++++++++++++++- e2e_test/s3/gcs_source.py | 2 +- src/connector/src/source/base.rs | 12 ++++++--- .../filesystem/opendal_source/gcs_source.rs | 8 ++++-- .../source/filesystem/opendal_source/mod.rs | 7 +++++- src/connector/with_options_source.yaml | 3 +++ 8 files changed, 51 insertions(+), 9 deletions(-) diff --git a/ci/scripts/notify.py b/ci/scripts/notify.py index 818dfce72143a..26db597a59b36 100755 --- a/ci/scripts/notify.py +++ b/ci/scripts/notify.py @@ -97,6 +97,7 @@ def get_mock_test_status(test): "e2e-clickhouse-sink-tests": "hard_failed", "e2e-pulsar-sink-tests": "", "s3-source-test-for-opendal-fs-engine": "", + "s3-source-tests": "", "pulsar-source-tests": "", "connector-node-integration-test": "" } diff --git a/ci/scripts/s3-source-test-for-opendal-fs-engine.sh b/ci/scripts/s3-source-test-for-opendal-fs-engine.sh index 6fbbdb35e0e45..355489acf2512 100755 --- a/ci/scripts/s3-source-test-for-opendal-fs-engine.sh +++ b/ci/scripts/s3-source-test-for-opendal-fs-engine.sh @@ -30,7 +30,7 @@ cargo make ci-start ci-3cn-3fe-opendal-fs-backend echo "--- Run test" python3 -m pip install minio psycopg2-binary -python3 e2e_test/s3/$script.py +python3 e2e_test/s3/$script echo "--- Kill cluster" rm -rf /tmp/rw_ci diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 75f58eadf2492..8e16f5b6eb978 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -491,7 +491,7 @@ steps: - label: "S3 source on OpenDAL fs engine" key: "s3-source-test-for-opendal-fs-engine" - command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run" + command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run.csv" if: | !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-s3-source-tests" @@ -511,6 +511,29 @@ steps: timeout_in_minutes: 20 retry: *auto-retry + # TODO(Kexiang): Enable this test after we have a GCS_SOURCE_TEST_CONF. + # - label: "GCS source on OpenDAL fs engine" + # key: "s3-source-test-for-opendal-fs-engine" + # command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s gcs.csv" + # if: | + # !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null + # || build.pull_request.labels includes "ci/run-s3-source-tests" + # || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/ + # depends_on: build + # plugins: + # - seek-oss/aws-sm#v2.3.1: + # env: + # S3_SOURCE_TEST_CONF: ci_s3_source_test_aws + # - docker-compose#v4.9.0: + # run: rw-build-env + # config: ci/docker-compose.yml + # mount-buildkite-agent: true + # environment: + # - S3_SOURCE_TEST_CONF + # - ./ci/plugins/upload-failure-logs + # timeout_in_minutes: 20 + # retry: *auto-retry + - label: "pulsar source check" key: "pulsar-source-tests" command: "ci/scripts/pulsar-source-test.sh -p ci-release" diff --git a/e2e_test/s3/gcs_source.py b/e2e_test/s3/gcs_source.py index c917f2c2d33fd..5e1144266fb23 100644 --- a/e2e_test/s3/gcs_source.py +++ b/e2e_test/s3/gcs_source.py @@ -57,7 +57,7 @@ def _encode(): connector = 'gcs', match_pattern = '{prefix}*.{fmt}', gcs.bucket_name = '{config['GCS_BUCKET']}', - gcs.credentials = '{credential}', + gcs.credential = '{credential}', ) FORMAT PLAIN ENCODE {_encode()};''') total_rows = file_num * item_num_per_file diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 1a4a594d44285..1ee532e4c5d51 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -42,7 +42,7 @@ use super::google_pubsub::GooglePubsubMeta; use super::kafka::KafkaMeta; use super::monitor::SourceMetrics; use super::nexmark::source::message::NexmarkMeta; -use super::OPENDAL_S3_CONNECTOR; +use super::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR}; use crate::parser::ParserConfig; pub(crate) use crate::source::common::CommonSplitReader; use crate::source::filesystem::FsPageItem; @@ -386,14 +386,20 @@ impl ConnectorProperties { pub fn is_new_fs_connector_b_tree_map(with_properties: &BTreeMap) -> bool { with_properties .get(UPSTREAM_SOURCE_KEY) - .map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)) + .map(|s| { + s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR) + || s.eq_ignore_ascii_case(GCS_CONNECTOR) + }) .unwrap_or(false) } pub fn is_new_fs_connector_hash_map(with_properties: &HashMap) -> bool { with_properties .get(UPSTREAM_SOURCE_KEY) - .map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)) + .map(|s| { + s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR) + || s.eq_ignore_ascii_case(GCS_CONNECTOR) + }) .unwrap_or(false) } } diff --git a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs index 7d9c2bec4429b..d6f7b44bff591 100644 --- a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs @@ -32,9 +32,13 @@ impl OpendalEnumerator { builder.bucket(&gcs_properties.bucket_name); // if credential env is set, use it. Otherwise, ADC will be used. - let cred = gcs_properties.credential; - if let Some(cred) = cred { + if let Some(cred) = gcs_properties.credential { builder.credential(&cred); + } else { + let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS"); + if let Ok(cred) = cred { + builder.credential(&cred); + } } if let Some(service_account) = gcs_properties.service_account { diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index 93e09be83ebf1..b5e02d1ad3095 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -36,10 +36,15 @@ pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2"; pub struct GcsProperties { #[serde(rename = "gcs.bucket_name")] pub bucket_name: String, + + /// The base64 encoded credential key. If not set, ADC will be used. #[serde(rename = "gcs.credential")] pub credential: Option, + + /// If credential/ADC is not set. The service account can be used to provide the credential info. #[serde(rename = "gcs.service_account", default)] pub service_account: Option, + #[serde(rename = "match_pattern", default)] pub match_pattern: Option, @@ -94,7 +99,7 @@ pub struct OpendalS3Properties { #[serde(flatten)] pub s3_properties: S3PropertiesCommon, - // The following are only supported by s3_v2 (opendal) source. + /// The following are only supported by s3_v2 (opendal) source. #[serde(rename = "s3.assume_role", default)] pub assume_role: Option, diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 6ee69ccd2ab18..3dd6731b22f1e 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -22,9 +22,11 @@ GcsProperties: required: true - name: gcs.credential field_type: String + comments: The base64 encoded credential key. If not set, ADC will be used. required: false - name: gcs.service_account field_type: String + comments: If credential/ADC is not set. The service account can be used to provide the credential info. required: false default: Default::default - name: match_pattern @@ -453,6 +455,7 @@ OpendalS3Properties: required: false - name: s3.assume_role field_type: String + comments: The following are only supported by s3_v2 (opendal) source. required: false default: Default::default PubsubProperties: