Skip to content

Commit

Permalink
fix gcs-source
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Jan 5, 2024
1 parent a353b93 commit 393d414
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 7 deletions.
1 change: 1 addition & 0 deletions ci/scripts/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def get_mock_test_status(test):
"e2e-clickhouse-sink-tests": "hard_failed",
"e2e-pulsar-sink-tests": "",
"s3-source-test-for-opendal-fs-engine": "",
"s3-source-tests": "",
"pulsar-source-tests": "",
"connector-node-integration-test": ""
}
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/s3-source-test-for-opendal-fs-engine.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ cargo make ci-start ci-3cn-3fe-opendal-fs-backend

echo "--- Run test"
python3 -m pip install minio psycopg2-binary
python3 e2e_test/s3/$script.py
python3 e2e_test/s3/$script

echo "--- Kill cluster"
rm -rf /tmp/rw_ci
Expand Down
26 changes: 25 additions & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ steps:
retry: *auto-retry

- label: "PosixFs source on OpenDAL fs engine (csv parser)"
key: "s3-source-test-for-opendal-fs-engine"
command: "ci/scripts/s3-source-test.sh -p ci-release -s 'posix_fs_source.py csv_without_header'"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
Expand All @@ -507,7 +508,7 @@ steps:

- label: "S3 source on OpenDAL fs engine"
key: "s3-source-test-for-opendal-fs-engine"
command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run"
command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run.csv"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
Expand All @@ -527,6 +528,29 @@ steps:
timeout_in_minutes: 20
retry: *auto-retry

# TODO(Kexiang): Enable this test after we have a GCS_SOURCE_TEST_CONF.
# - label: "GCS source on OpenDAL fs engine"
# key: "s3-source-test-for-opendal-fs-engine"
# command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s gcs.csv"
# if: |
# !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
# || build.pull_request.labels includes "ci/run-s3-source-tests"
# || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
# depends_on: build
# plugins:
# - seek-oss/aws-sm#v2.3.1:
# env:
# S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
# - docker-compose#v4.9.0:
# run: rw-build-env
# config: ci/docker-compose.yml
# mount-buildkite-agent: true
# environment:
# - S3_SOURCE_TEST_CONF
# - ./ci/plugins/upload-failure-logs
# timeout_in_minutes: 20
# retry: *auto-retry

- label: "pulsar source check"
key: "pulsar-source-tests"
command: "ci/scripts/pulsar-source-test.sh -p ci-release"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ impl<Src: OpendalSource> OpendalEnumerator<Src> {
builder.bucket(&gcs_properties.bucket_name);

// if credential env is set, use it. Otherwise, ADC will be used.
let cred = gcs_properties.credential;
if let Some(cred) = cred {
if let Some(cred) = gcs_properties.credential {
builder.credential(&cred);
} else {
let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS");
if let Ok(cred) = cred {
builder.credential(&cred);
}
}

if let Some(service_account) = gcs_properties.service_account {
Expand Down
11 changes: 8 additions & 3 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ pub const POSIX_FS_CONNECTOR: &str = "posix_fs";
pub struct GcsProperties {
#[serde(rename = "gcs.bucket_name")]
pub bucket_name: String,

/// The base64 encoded credential key. If not set, ADC will be used.
#[serde(rename = "gcs.credential")]
pub credential: Option<String>,

/// If credential/ADC is not set. The service account can be used to provide the credential info.
#[serde(rename = "gcs.service_account", default)]
pub service_account: Option<String>,

#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

Expand Down Expand Up @@ -107,7 +112,7 @@ pub struct OpendalS3Properties {
#[serde(flatten)]
pub s3_properties: S3PropertiesCommon,

// The following are only supported by s3_v2 (opendal) source.
/// The following are only supported by s3_v2 (opendal) source.
#[serde(rename = "s3.assume_role", default)]
pub assume_role: Option<String>,

Expand All @@ -131,11 +136,11 @@ impl SourceProperties for OpendalS3Properties {

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct PosixFsProperties {
// The root directly of the files to search. The files will be searched recursively.
/// The root directly of the files to search. The files will be searched recursively.
#[serde(rename = "posix_fs.root")]
pub root: String,

// The regex pattern to match files under root directory.
/// The regex pattern to match files under root directory.
#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

Expand Down

0 comments on commit 393d414

Please sign in to comment.