Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(connector): fix gcs source connector #14373

Merged
merged 3 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/scripts/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def get_mock_test_status(test):
"e2e-clickhouse-sink-tests": "hard_failed",
"e2e-pulsar-sink-tests": "",
"s3-source-test-for-opendal-fs-engine": "",
"s3-source-tests": "",
"pulsar-source-tests": "",
"connector-node-integration-test": ""
}
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/s3-source-test-for-opendal-fs-engine.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ cargo make ci-start ci-3cn-3fe-opendal-fs-backend

echo "--- Run test"
python3 -m pip install minio psycopg2-binary
python3 e2e_test/s3/$script.py
python3 e2e_test/s3/$script

echo "--- Kill cluster"
rm -rf /tmp/rw_ci
Expand Down
26 changes: 25 additions & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ steps:
retry: *auto-retry

- label: "PosixFs source on OpenDAL fs engine (csv parser)"
key: "s3-source-test-for-opendal-fs-engine"
command: "ci/scripts/s3-source-test.sh -p ci-release -s 'posix_fs_source.py csv_without_header'"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
Expand All @@ -507,7 +508,7 @@ steps:

- label: "S3 source on OpenDAL fs engine"
key: "s3-source-test-for-opendal-fs-engine"
command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run"
command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run.csv"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
Expand All @@ -527,6 +528,29 @@ steps:
timeout_in_minutes: 20
retry: *auto-retry

# TODO(Kexiang): Enable this test after we have a GCS_SOURCE_TEST_CONF.
KeXiangWang marked this conversation as resolved.
Show resolved Hide resolved
# - label: "GCS source on OpenDAL fs engine"
# key: "s3-source-test-for-opendal-fs-engine"
# command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s gcs.csv"
# if: |
# !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
# || build.pull_request.labels includes "ci/run-s3-source-tests"
# || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
# depends_on: build
# plugins:
# - seek-oss/aws-sm#v2.3.1:
# env:
# S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
# - docker-compose#v4.9.0:
# run: rw-build-env
# config: ci/docker-compose.yml
# mount-buildkite-agent: true
# environment:
# - S3_SOURCE_TEST_CONF
# - ./ci/plugins/upload-failure-logs
# timeout_in_minutes: 20
# retry: *auto-retry

- label: "pulsar source check"
key: "pulsar-source-tests"
command: "ci/scripts/pulsar-source-test.sh -p ci-release"
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/s3/gcs_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def _encode():
connector = 'gcs',
match_pattern = '{prefix}*.{fmt}',
gcs.bucket_name = '{config['GCS_BUCKET']}',
gcs.credentials = '{credential}',
gcs.credential = '{credential}',
) FORMAT PLAIN ENCODE {_encode()};''')

total_rows = file_num * item_num_per_file
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::monitor::SourceMetrics;
use super::nexmark::source::message::NexmarkMeta;
use super::{OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use super::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use crate::parser::ParserConfig;
pub(crate) use crate::source::common::CommonSplitReader;
use crate::source::filesystem::FsPageItem;
Expand Down Expand Up @@ -389,6 +389,7 @@ impl ConnectorProperties {
.map(|s| {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
|| s.eq_ignore_ascii_case(GCS_CONNECTOR)
})
.unwrap_or(false)
}
Expand All @@ -399,6 +400,7 @@ impl ConnectorProperties {
.map(|s| {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
|| s.eq_ignore_ascii_case(GCS_CONNECTOR)
})
.unwrap_or(false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ impl<Src: OpendalSource> OpendalEnumerator<Src> {
builder.bucket(&gcs_properties.bucket_name);

// if credential env is set, use it. Otherwise, ADC will be used.
let cred = gcs_properties.credential;
if let Some(cred) = cred {
if let Some(cred) = gcs_properties.credential {
builder.credential(&cred);
} else {
let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS");
if let Ok(cred) = cred {
builder.credential(&cred);
}
}

if let Some(service_account) = gcs_properties.service_account {
Expand Down
11 changes: 8 additions & 3 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ pub const POSIX_FS_CONNECTOR: &str = "posix_fs";
pub struct GcsProperties {
#[serde(rename = "gcs.bucket_name")]
pub bucket_name: String,

/// The base64 encoded credential key. If not set, ADC will be used.
KeXiangWang marked this conversation as resolved.
Show resolved Hide resolved
#[serde(rename = "gcs.credential")]
pub credential: Option<String>,

/// If credential/ADC is not set. The service account can be used to provide the credential info.
#[serde(rename = "gcs.service_account", default)]
pub service_account: Option<String>,

#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

Expand Down Expand Up @@ -107,7 +112,7 @@ pub struct OpendalS3Properties {
#[serde(flatten)]
pub s3_properties: S3PropertiesCommon,

// The following are only supported by s3_v2 (opendal) source.
/// The following are only supported by s3_v2 (opendal) source.
#[serde(rename = "s3.assume_role", default)]
pub assume_role: Option<String>,

Expand All @@ -131,11 +136,11 @@ impl SourceProperties for OpendalS3Properties {

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct PosixFsProperties {
// The root directly of the files to search. The files will be searched recursively.
/// The root directly of the files to search. The files will be searched recursively.
#[serde(rename = "posix_fs.root")]
pub root: String,

// The regex pattern to match files under root directory.
/// The regex pattern to match files under root directory.
#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

Expand Down
Loading