Skip to content

Commit

Permalink
fix(connector): fix gcs source connector(#14373) (#14425)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Jan 9, 2024
1 parent b069959 commit 34f8705
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 9 deletions.
1 change: 1 addition & 0 deletions ci/scripts/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def get_mock_test_status(test):
"e2e-clickhouse-sink-tests": "hard_failed",
"e2e-pulsar-sink-tests": "",
"s3-source-test-for-opendal-fs-engine": "",
"s3-source-tests": "",
"pulsar-source-tests": "",
"connector-node-integration-test": ""
}
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/s3-source-test-for-opendal-fs-engine.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ cargo make ci-start ci-3cn-3fe-opendal-fs-backend

echo "--- Run test"
python3 -m pip install minio psycopg2-binary
python3 e2e_test/s3/$script.py
python3 e2e_test/s3/$script

echo "--- Kill cluster"
rm -rf /tmp/rw_ci
Expand Down
25 changes: 24 additions & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ steps:

- label: "S3 source on OpenDAL fs engine"
key: "s3-source-test-for-opendal-fs-engine"
command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run"
command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run.csv"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
Expand All @@ -511,6 +511,29 @@ steps:
timeout_in_minutes: 20
retry: *auto-retry

# TODO(Kexiang): Enable this test after we have a GCS_SOURCE_TEST_CONF.
# - label: "GCS source on OpenDAL fs engine"
# key: "s3-source-test-for-opendal-fs-engine"
# command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s gcs.csv"
# if: |
# !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
# || build.pull_request.labels includes "ci/run-s3-source-tests"
# || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
# depends_on: build
# plugins:
# - seek-oss/aws-sm#v2.3.1:
# env:
# S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
# - docker-compose#v4.9.0:
# run: rw-build-env
# config: ci/docker-compose.yml
# mount-buildkite-agent: true
# environment:
# - S3_SOURCE_TEST_CONF
# - ./ci/plugins/upload-failure-logs
# timeout_in_minutes: 20
# retry: *auto-retry

- label: "pulsar source check"
key: "pulsar-source-tests"
command: "ci/scripts/pulsar-source-test.sh -p ci-release"
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/s3/gcs_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def _encode():
connector = 'gcs',
match_pattern = '{prefix}*.{fmt}',
gcs.bucket_name = '{config['GCS_BUCKET']}',
gcs.credentials = '{credential}',
gcs.credential = '{credential}',
) FORMAT PLAIN ENCODE {_encode()};''')

total_rows = file_num * item_num_per_file
Expand Down
12 changes: 9 additions & 3 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::monitor::SourceMetrics;
use super::nexmark::source::message::NexmarkMeta;
use super::OPENDAL_S3_CONNECTOR;
use super::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR};
use crate::parser::ParserConfig;
pub(crate) use crate::source::common::CommonSplitReader;
use crate::source::filesystem::FsPageItem;
Expand Down Expand Up @@ -386,14 +386,20 @@ impl ConnectorProperties {
pub fn is_new_fs_connector_b_tree_map(with_properties: &BTreeMap<String, String>) -> bool {
with_properties
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR))
.map(|s| {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(GCS_CONNECTOR)
})
.unwrap_or(false)
}

pub fn is_new_fs_connector_hash_map(with_properties: &HashMap<String, String>) -> bool {
with_properties
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR))
.map(|s| {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(GCS_CONNECTOR)
})
.unwrap_or(false)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ impl<Src: OpendalSource> OpendalEnumerator<Src> {
builder.bucket(&gcs_properties.bucket_name);

// if credential env is set, use it. Otherwise, ADC will be used.
let cred = gcs_properties.credential;
if let Some(cred) = cred {
if let Some(cred) = gcs_properties.credential {
builder.credential(&cred);
} else {
let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS");
if let Ok(cred) = cred {
builder.credential(&cred);
}
}

if let Some(service_account) = gcs_properties.service_account {
Expand Down
7 changes: 6 additions & 1 deletion src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,15 @@ pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
pub struct GcsProperties {
#[serde(rename = "gcs.bucket_name")]
pub bucket_name: String,

/// The base64 encoded credential key. If not set, ADC will be used.
#[serde(rename = "gcs.credential")]
pub credential: Option<String>,

/// If credential/ADC is not set. The service account can be used to provide the credential info.
#[serde(rename = "gcs.service_account", default)]
pub service_account: Option<String>,

#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

Expand Down Expand Up @@ -94,7 +99,7 @@ pub struct OpendalS3Properties {
#[serde(flatten)]
pub s3_properties: S3PropertiesCommon,

// The following are only supported by s3_v2 (opendal) source.
/// The following are only supported by s3_v2 (opendal) source.
#[serde(rename = "s3.assume_role", default)]
pub assume_role: Option<String>,

Expand Down
3 changes: 3 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ GcsProperties:
required: true
- name: gcs.credential
field_type: String
comments: The base64 encoded credential key. If not set, ADC will be used.
required: false
- name: gcs.service_account
field_type: String
comments: If credential/ADC is not set. The service account can be used to provide the credential info.
required: false
default: Default::default
- name: match_pattern
Expand Down Expand Up @@ -453,6 +455,7 @@ OpendalS3Properties:
required: false
- name: s3.assume_role
field_type: String
comments: The following are only supported by s3_v2 (opendal) source.
required: false
default: Default::default
PubsubProperties:
Expand Down

0 comments on commit 34f8705

Please sign in to comment.