Skip to content

Commit

Permalink
revert box in FileSink struct
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu committed Aug 21, 2024
1 parent fdd1472 commit 807c869
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 28 deletions.
14 changes: 7 additions & 7 deletions src/connector/src/sink/file_sink/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl UnknownFields for FsConfig {
pub const FS_SINK: &str = "fs";

impl<S: OpendalSinkBackend> FileSink<S> {
pub fn new_fs_sink(config: FsConfig) -> Result<Box<Operator>> {
pub fn new_fs_sink(config: FsConfig) -> Result<Operator> {
// Create fs builder.
let mut builder = Fs::default();
// Create fs backend builder.
Expand All @@ -62,7 +62,7 @@ impl<S: OpendalSinkBackend> FileSink<S> {
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
.finish();
Ok(Box::new(operator))
Ok(operator)
}
}

Expand All @@ -74,7 +74,7 @@ impl OpendalSinkBackend for FsSink {

const SINK_NAME: &'static str = FS_SINK;

fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Box<Self::Properties>> {
fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
let config = serde_json::from_value::<FsConfig>(serde_json::to_value(btree_map).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;
if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
Expand All @@ -85,15 +85,15 @@ impl OpendalSinkBackend for FsSink {
SINK_TYPE_UPSERT
)));
}
Ok(Box::new(config))
Ok(config)
}

fn new_operator(properties: FsConfig) -> Result<Box<Operator>> {
fn new_operator(properties: FsConfig) -> Result<Operator> {
FileSink::<FsSink>::new_fs_sink(properties)
}

fn get_path(properties: Self::Properties) -> Box<str> {
properties.common.path.into_boxed_str()
fn get_path(properties: Self::Properties) -> String {
properties.common.path
}

fn get_engine_type() -> super::opendal_sink::EngineType {
Expand Down
14 changes: 7 additions & 7 deletions src/connector/src/sink/file_sink/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl UnknownFields for GcsConfig {
pub const GCS_SINK: &str = "gcs";

impl<S: OpendalSinkBackend> FileSink<S> {
pub fn new_gcs_sink(config: GcsConfig) -> Result<Box<Operator>> {
pub fn new_gcs_sink(config: GcsConfig) -> Result<Operator> {
// Create gcs builder.
let mut builder = Gcs::default();

Expand All @@ -79,7 +79,7 @@ impl<S: OpendalSinkBackend> FileSink<S> {
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
.finish();
Ok(Box::new(operator))
Ok(operator)
}
}

Expand All @@ -91,7 +91,7 @@ impl OpendalSinkBackend for GcsSink {

const SINK_NAME: &'static str = GCS_SINK;

fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Box<Self::Properties>> {
fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
let config = serde_json::from_value::<GcsConfig>(serde_json::to_value(btree_map).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;
if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
Expand All @@ -102,15 +102,15 @@ impl OpendalSinkBackend for GcsSink {
SINK_TYPE_UPSERT
)));
}
Ok(Box::new(config))
Ok(config)
}

fn new_operator(properties: GcsConfig) -> Result<Box<Operator>> {
fn new_operator(properties: GcsConfig) -> Result<Operator> {
FileSink::<GcsSink>::new_gcs_sink(properties)
}

fn get_path(properties: Self::Properties) -> Box<str> {
properties.common.path.into_boxed_str()
fn get_path(properties: Self::Properties) -> String {
properties.common.path
}

fn get_engine_type() -> super::opendal_sink::EngineType {
Expand Down
14 changes: 7 additions & 7 deletions src/connector/src/sink/file_sink/opendal_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::with_options::WithOptions;
/// - S: The type parameter S represents the concrete implementation of the `OpendalSinkBackend` trait used by this file sink.
#[derive(Debug, Clone)]
pub struct FileSink<S: OpendalSinkBackend> {
pub(crate) op: Box<Operator>,
pub(crate) op: Operator,
/// The path to the file where the sink writes data.
pub(crate) path: String,
/// The schema describing the structure of the data being written to the file sink.
Expand Down Expand Up @@ -75,9 +75,9 @@ pub trait OpendalSinkBackend: Send + Sync + 'static + Clone + PartialEq {
type Properties: TryFromBTreeMap + Send + Sync + Clone + WithOptions;
const SINK_NAME: &'static str;

fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Box<Self::Properties>>;
fn new_operator(properties: Self::Properties) -> Result<Box<Operator>>;
fn get_path(properties: Self::Properties) -> Box<str>;
fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties>;
fn new_operator(properties: Self::Properties) -> Result<Operator>;
fn get_path(properties: Self::Properties) -> String;
fn get_engine_type() -> EngineType;
}

Expand Down Expand Up @@ -133,7 +133,7 @@ impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {

fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
let schema = param.schema();
let config = *S::from_btreemap(param.properties)?;
let config = S::from_btreemap(param.properties)?;
let path = S::get_path(config.clone()).to_string();
let op = S::new_operator(config.clone())?;
let engine_type = S::get_engine_type();
Expand All @@ -153,7 +153,7 @@ impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {

pub struct OpenDalSinkWriter {
schema: SchemaRef,
operator: Box<Operator>,
operator: Operator,
sink_writer: Option<FileWriterEnum>,
is_append_only: bool,
write_path: String,
Expand Down Expand Up @@ -219,7 +219,7 @@ impl SinkWriter for OpenDalSinkWriter {

impl OpenDalSinkWriter {
pub fn new(
operator: Box<Operator>,
operator: Operator,
write_path: &str,
rw_schema: Schema,
is_append_only: bool,
Expand Down
14 changes: 7 additions & 7 deletions src/connector/src/sink/file_sink/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct S3Config {
pub const S3_SINK: &str = "s3";

impl<S: OpendalSinkBackend> FileSink<S> {
pub fn new_s3_sink(config: S3Config) -> Result<Box<Operator>> {
pub fn new_s3_sink(config: S3Config) -> Result<Operator> {
// Create s3 builder.
let mut builder = S3::default();
builder.bucket(&config.common.bucket_name);
Expand Down Expand Up @@ -96,7 +96,7 @@ impl<S: OpendalSinkBackend> FileSink<S> {
.layer(RetryLayer::default())
.finish();

Ok(Box::new(operator))
Ok(operator)
}
}

Expand All @@ -114,7 +114,7 @@ impl OpendalSinkBackend for S3Sink {

const SINK_NAME: &'static str = S3_SINK;

fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Box<Self::Properties>> {
fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
let config = serde_json::from_value::<S3Config>(serde_json::to_value(btree_map).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;
if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
Expand All @@ -125,15 +125,15 @@ impl OpendalSinkBackend for S3Sink {
SINK_TYPE_UPSERT
)));
}
Ok(Box::new(config))
Ok(config)
}

fn new_operator(properties: S3Config) -> Result<Box<Operator>> {
fn new_operator(properties: S3Config) -> Result<Operator> {
FileSink::<S3Sink>::new_s3_sink(properties)
}

fn get_path(properties: Self::Properties) -> Box<str> {
properties.common.path.into_boxed_str()
fn get_path(properties: Self::Properties) -> String {
properties.common.path
}

fn get_engine_type() -> super::opendal_sink::EngineType {
Expand Down

0 comments on commit 807c869

Please sign in to comment.