Skip to content

Commit

Permalink
fix(connector): fix gcs source connector (#14373)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Jan 8, 2024
1 parent 10f7b77 commit ba1e2e1
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 9 deletions.
1 change: 1 addition & 0 deletions ci/scripts/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def get_mock_test_status(test):
"e2e-clickhouse-sink-tests": "hard_failed",
"e2e-pulsar-sink-tests": "",
"s3-source-test-for-opendal-fs-engine": "",
"s3-source-tests": "",
"pulsar-source-tests": "",
"connector-node-integration-test": ""
}
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/s3-source-test-for-opendal-fs-engine.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ cargo make ci-start ci-3cn-3fe-opendal-fs-backend

echo "--- Run test"
python3 -m pip install minio psycopg2-binary
python3 e2e_test/s3/$script.py
python3 e2e_test/s3/$script

echo "--- Kill cluster"
rm -rf /tmp/rw_ci
Expand Down
26 changes: 25 additions & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ steps:
retry: *auto-retry

- label: "PosixFs source on OpenDAL fs engine (csv parser)"
key: "s3-source-test-for-opendal-fs-engine"
command: "ci/scripts/s3-source-test.sh -p ci-release -s 'posix_fs_source.py csv_without_header'"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
Expand All @@ -507,7 +508,7 @@ steps:

- label: "S3 source on OpenDAL fs engine"
key: "s3-source-test-for-opendal-fs-engine"
command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run"
command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run.csv"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
Expand All @@ -527,6 +528,29 @@ steps:
timeout_in_minutes: 20
retry: *auto-retry

# TODO(Kexiang): Enable this test after we have a GCS_SOURCE_TEST_CONF.
# - label: "GCS source on OpenDAL fs engine"
# key: "s3-source-test-for-opendal-fs-engine"
# command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s gcs.csv"
# if: |
# !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
# || build.pull_request.labels includes "ci/run-s3-source-tests"
# || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
# depends_on: build
# plugins:
# - seek-oss/aws-sm#v2.3.1:
# env:
# S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
# - docker-compose#v4.9.0:
# run: rw-build-env
# config: ci/docker-compose.yml
# mount-buildkite-agent: true
# environment:
# - S3_SOURCE_TEST_CONF
# - ./ci/plugins/upload-failure-logs
# timeout_in_minutes: 20
# retry: *auto-retry

- label: "pulsar source check"
key: "pulsar-source-tests"
command: "ci/scripts/pulsar-source-test.sh -p ci-release"
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/s3/gcs_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def _encode():
connector = 'gcs',
match_pattern = '{prefix}*.{fmt}',
gcs.bucket_name = '{config['GCS_BUCKET']}',
gcs.credentials = '{credential}',
gcs.credential = '{credential}',
) FORMAT PLAIN ENCODE {_encode()};''')

total_rows = file_num * item_num_per_file
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::monitor::SourceMetrics;
use super::nexmark::source::message::NexmarkMeta;
use super::{OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use super::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use crate::parser::ParserConfig;
pub(crate) use crate::source::common::CommonSplitReader;
use crate::source::filesystem::FsPageItem;
Expand Down Expand Up @@ -400,6 +400,7 @@ impl ConnectorProperties {
.map(|s| {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
|| s.eq_ignore_ascii_case(GCS_CONNECTOR)
})
.unwrap_or(false)
}
Expand All @@ -410,6 +411,7 @@ impl ConnectorProperties {
.map(|s| {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
|| s.eq_ignore_ascii_case(GCS_CONNECTOR)
})
.unwrap_or(false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ impl<Src: OpendalSource> OpendalEnumerator<Src> {
builder.bucket(&gcs_properties.bucket_name);

// If the `gcs.credential` property is set, use it. Otherwise, fall back to the `GOOGLE_APPLICATION_CREDENTIALS` env var; if neither is set, ADC will be used.
let cred = gcs_properties.credential;
if let Some(cred) = cred {
if let Some(cred) = gcs_properties.credential {
builder.credential(&cred);
} else {
let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS");
if let Ok(cred) = cred {
builder.credential(&cred);
}
}

if let Some(service_account) = gcs_properties.service_account {
Expand Down
11 changes: 8 additions & 3 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ pub const POSIX_FS_CONNECTOR: &str = "posix_fs";
pub struct GcsProperties {
#[serde(rename = "gcs.bucket_name")]
pub bucket_name: String,

/// The base64 encoded credential key. If not set, ADC will be used.
#[serde(rename = "gcs.credential")]
pub credential: Option<String>,

/// If credential/ADC is not set, the service account can be used to provide the credential info.
#[serde(rename = "gcs.service_account", default)]
pub service_account: Option<String>,

#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

Expand Down Expand Up @@ -107,7 +112,7 @@ pub struct OpendalS3Properties {
#[serde(flatten)]
pub s3_properties: S3PropertiesCommon,

// The following are only supported by s3_v2 (opendal) source.
/// The following are only supported by s3_v2 (opendal) source.
#[serde(rename = "s3.assume_role", default)]
pub assume_role: Option<String>,

Expand All @@ -131,11 +136,11 @@ impl SourceProperties for OpendalS3Properties {

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct PosixFsProperties {
// The root directly of the files to search. The files will be searched recursively.
/// The root directory of the files to search. The files will be searched recursively.
#[serde(rename = "posix_fs.root")]
pub root: String,

// The regex pattern to match files under root directory.
/// The regex pattern to match files under root directory.
#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

Expand Down
5 changes: 5 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ GcsProperties:
required: true
- name: gcs.credential
field_type: String
comments: The base64 encoded credential key. If not set, ADC will be used.
required: false
- name: gcs.service_account
field_type: String
comments: If credential/ADC is not set, the service account can be used to provide the credential info.
required: false
default: Default::default
- name: match_pattern
Expand Down Expand Up @@ -453,15 +455,18 @@ OpendalS3Properties:
required: false
- name: s3.assume_role
field_type: String
comments: The following are only supported by s3_v2 (opendal) source.
required: false
default: Default::default
PosixFsProperties:
fields:
- name: posix_fs.root
field_type: String
comments: The root directory of the files to search. The files will be searched recursively.
required: true
- name: match_pattern
field_type: String
comments: The regex pattern to match files under root directory.
required: false
default: Default::default
PubsubProperties:
Expand Down

0 comments on commit ba1e2e1

Please sign in to comment.