Skip to content

Commit

Permalink
extract a common struct for s3 sources
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Dec 26, 2023
1 parent be65e58 commit ecba13b
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 11 deletions.
7 changes: 5 additions & 2 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ pub mod opendal_reader;

use self::opendal_enumerator::OpendalEnumerator;
use self::opendal_reader::OpendalReader;
use super::{OpendalFsSplit, S3Properties};
use super::s3::S3PropertiesCommon;
use super::OpendalFsSplit;
use crate::source::SourceProperties;

pub const GCS_CONNECTOR: &str = "gcs";
Expand Down Expand Up @@ -81,7 +82,9 @@ impl OpendalSource for OpendalGcs {
#[derive(Clone, Debug, Deserialize, PartialEq)]
pub struct OpendalS3Properties {
#[serde(flatten)]
pub s3_properties: S3Properties,
pub s3_properties: S3PropertiesCommon,

// The following are only supported by s3_v2 (opendal) source.
#[serde(rename = "s3.assume_role", default)]
pub assume_role: Option<String>,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ use opendal::Operator;
use super::opendal_enumerator::OpendalEnumerator;
use super::OpendalSource;
use crate::source::filesystem::s3::enumerator::get_prefix;
use crate::source::filesystem::S3Properties;
use crate::source::filesystem::s3::S3PropertiesCommon;

impl<Src: OpendalSource> OpendalEnumerator<Src> {
/// Creates an `OpendalEnumerator` for an S3 source.
pub fn new_s3_source(
s3_properties: S3Properties,
s3_properties: S3PropertiesCommon,
assume_role: Option<String>,
) -> anyhow::Result<Self> {
// Create s3 builder.
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/source/filesystem/s3/enumerator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl SplitEnumerator for S3SplitEnumerator {
let config = AwsAuthProps::from(&properties);
let sdk_config = config.build_config().await?;
let s3_client = s3_client(&sdk_config, Some(default_conn_config()));
let properties = properties.common;
let (prefix, matcher) = if let Some(pattern) = properties.match_pattern.as_ref() {
let prefix = get_prefix(pattern);
let matcher = glob::Pattern::new(pattern)
Expand Down Expand Up @@ -125,11 +126,12 @@ mod tests {
}

use super::*;
use crate::source::filesystem::s3::S3PropertiesCommon;
use crate::source::SourceEnumeratorContext;
#[tokio::test]
#[ignore]
async fn test_s3_split_enumerator() {
let props = S3Properties {
let props = S3PropertiesCommon {
region_name: "ap-southeast-1".to_owned(),
bucket_name: "mingchao-s3-source".to_owned(),
match_pattern: Some("happy[0-9].csv".to_owned()),
Expand All @@ -138,7 +140,7 @@ mod tests {
endpoint_url: None,
};
let mut enumerator =
S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into())
S3SplitEnumerator::new(props.into(), SourceEnumeratorContext::default().into())
.await
.unwrap();
let splits = enumerator.list_splits().await.unwrap();
Expand Down
16 changes: 15 additions & 1 deletion src/connector/src/source/filesystem/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ use crate::source::SourceProperties;

pub const S3_CONNECTOR: &str = "s3";

/// These are supported by both `s3` and `s3_v2` (opendal) sources.
#[derive(Clone, Debug, Deserialize, PartialEq)]
pub struct S3Properties {
pub struct S3PropertiesCommon {
#[serde(rename = "s3.region_name")]
pub region_name: String,
#[serde(rename = "s3.bucket_name")]
Expand All @@ -40,6 +41,18 @@ pub struct S3Properties {
pub endpoint_url: Option<String>,
}

/// Properties of the `s3` source.
///
/// Currently this only wraps the shared [`S3PropertiesCommon`] fields.
/// `#[serde(flatten)]` makes those fields deserialize from the same level as
/// this struct, rather than from a nested `common` key.
#[derive(Clone, Debug, Deserialize, PartialEq)]
pub struct S3Properties {
/// Options shared with the other S3-based source (see [`S3PropertiesCommon`]).
#[serde(flatten)]
pub common: S3PropertiesCommon,
}

/// Wraps a set of shared S3 properties into an [`S3Properties`], so that a
/// `S3PropertiesCommon` value can be converted with `.into()`.
impl From<S3PropertiesCommon> for S3Properties {
/// Builds an `S3Properties` whose `common` field is the given value.
fn from(common: S3PropertiesCommon) -> Self {
// `common` is the only field of `S3Properties`, so nothing else needs to be filled in.
Self { common }
}
}

impl SourceProperties for S3Properties {
type Split = FsSplit;
type SplitEnumerator = S3SplitEnumerator;
Expand All @@ -50,6 +63,7 @@ impl SourceProperties for S3Properties {

impl From<&S3Properties> for AwsAuthProps {
fn from(props: &S3Properties) -> Self {
let props = &props.common;
Self {
region: Some(props.region_name.clone()),
endpoint: props.endpoint_url.clone(),
Expand Down
10 changes: 6 additions & 4 deletions src/connector/src/source/filesystem/s3/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl SplitReader for S3FileReader {

let sdk_config = config.build_config().await?;

let bucket_name = props.bucket_name;
let bucket_name = props.common.bucket_name;
let s3_client = s3_client(&sdk_config, Some(default_conn_config()));

let s3_file_reader = S3FileReader {
Expand Down Expand Up @@ -253,20 +253,22 @@ mod tests {
CommonParserConfig, CsvProperties, EncodingProperties, ProtocolProperties,
SpecificParserConfig,
};
use crate::source::filesystem::{S3Properties, S3SplitEnumerator};
use crate::source::filesystem::s3::S3PropertiesCommon;
use crate::source::filesystem::S3SplitEnumerator;
use crate::source::{SourceColumnDesc, SourceEnumeratorContext, SplitEnumerator};

#[tokio::test]
#[ignore]
async fn test_s3_split_reader() {
let props = S3Properties {
let props: S3Properties = S3PropertiesCommon {
region_name: "ap-southeast-1".to_owned(),
bucket_name: "mingchao-s3-source".to_owned(),
match_pattern: None,
access: None,
secret: None,
endpoint_url: None,
};
}
.into();
let mut enumerator =
S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into())
.await
Expand Down

0 comments on commit ecba13b

Please sign in to comment.