diff --git a/e2e_test/s3/file_sink.py b/e2e_test/s3/file_sink.py index 04042eb7817af..a3113b430960a 100644 --- a/e2e_test/s3/file_sink.py +++ b/e2e_test/s3/file_sink.py @@ -141,7 +141,6 @@ def _table(): s3.credentials.secret = 'hummockadmin', s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', s3.path = 'test_parquet_sink/', - s3.file_type = 'parquet', type = 'append-only', force_append_only='true' ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''') @@ -185,6 +184,7 @@ def _table(): stmt = f'select count(*), sum(id) from test_parquet_sink_table' print(f'Execute reading sink files: {stmt}') + print(f'Create snowflake s3 sink ') # Execute a SELECT statement cur.execute(f'''CREATE sink test_file_sink_json as select id, @@ -201,15 +201,14 @@ def _table(): test_timestamp, test_timestamptz from {_table()} WITH ( - connector = 's3', + connector = 'snowflake', match_pattern = '*.parquet', - s3.region_name = 'custom', - s3.bucket_name = 'hummock001', - s3.credentials.access = 'hummockadmin', - s3.credentials.secret = 'hummockadmin', + snowflake.aws_region = 'custom', + snowflake.s3_bucket = 'hummock001', + snowflake.aws_access_key_id = 'hummockadmin', + snowflake.aws_secret_access_key = 'hummockadmin', s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', s3.path = 'test_json_sink/', - s3.file_type = 'json', type = 'append-only', force_append_only='true' ) FORMAT PLAIN ENCODE JSON(force_append_only='true');''') @@ -297,7 +296,6 @@ def test_file_sink_batching(): s3.credentials.secret = 'hummockadmin', s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', s3.path = 'test_file_sink_batching/', - s3.file_type = 'parquet', type = 'append-only', rollover_seconds = 5, max_row_count = 5, diff --git a/e2e_test/sink/append_only_sink.slt b/e2e_test/sink/append_only_sink.slt index 486c5af6f94a0..2136b577c3cf8 100644 --- a/e2e_test/sink/append_only_sink.slt +++ b/e2e_test/sink/append_only_sink.slt @@ -38,7 +38,6 @@ WITH s3.region_name = 'test', s3.bucket_name = 'test', s3.path = '', - s3.file_type = 'parquet', type = 'append-only', force_append_only='true' ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true'); diff --git a/e2e_test/sink/license.slt b/e2e_test/sink/license.slt index 6e65b3653a536..1ba7d6ee38114 100644 --- a/e2e_test/sink/license.slt +++ b/e2e_test/sink/license.slt @@ -119,7 +119,6 @@ WITH s3.region_name = 'us-east-1', s3.bucket_name = 'test', s3.path = '', - s3.file_type = 'parquet', type = 'append-only', force_append_only='true' ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true'); diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 23f34eab97417..fec6e34ee752b 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -223,6 +223,18 @@ impl SinkFormatDesc { secret_refs: self.secret_refs.clone(), } } + + // This function is for compatibility purposes. It provides a default `SinkFormatDesc` + // when no FORMAT ... ENCODE ... clause is given; it is used for the Snowflake sink only.
+ pub fn plain_json_for_snowflake_only() -> Self { + Self { + format: SinkFormat::AppendOnly, + encode: SinkEncode::Json, + options: Default::default(), + secret_refs: Default::default(), + key_encode: None, + } + } } impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc { diff --git a/src/connector/src/sink/file_sink/opendal_sink.rs b/src/connector/src/sink/file_sink/opendal_sink.rs index 2a491e7765fbe..9583a6e97ff31 100644 --- a/src/connector/src/sink/file_sink/opendal_sink.rs +++ b/src/connector/src/sink/file_sink/opendal_sink.rs @@ -110,6 +110,7 @@ pub enum EngineType { Fs, Azblob, Webhdfs, + Snowflake, } impl<S: OpendalSinkBackend> Sink for FileSink<S> { @@ -169,16 +170,23 @@ impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> { let op = S::new_operator(config.clone())?; let batching_strategy = S::get_batching_strategy(config.clone()); let engine_type = S::get_engine_type(); - + let format_desc = match param.format_desc { + Some(desc) => desc, + None => { + if let EngineType::Snowflake = engine_type { + SinkFormatDesc::plain_json_for_snowflake_only() + } else { + return Err(SinkError::Config(anyhow!("missing FORMAT ... ENCODE ..."))); + } + } + }; Ok(Self { op, path, schema, is_append_only: param.sink_type.is_append_only(), batching_strategy, - format_desc: param - .format_desc - .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?, + format_desc, engine_type, _marker: PhantomData, }) @@ -390,28 +398,24 @@ impl OpenDalSinkWriter { // 1. A subdirectory is defined based on `path_partition_prefix` (e.g., by day, hour or month or none.). // 2. The file name includes the `executor_id` and the creation time in seconds since the UNIX epoch. // If the engine type is `Fs`, the path is automatically handled, and the filename does not include a path prefix. - let object_name = match self.engine_type { - // For the local fs sink, the data will be automatically written to the defined path. - // Therefore, there is no need to specify the path in the file name. - EngineType::Fs => { - format!( - "{}{}_{}.{}", - self.path_partition_prefix(&create_time), - self.executor_id, - create_time.as_secs(), - suffix - ) - } - _ => format!( - "{}/{}{}_{}.{}", - self.write_path, + // 3. For the Snowflake Sink, the `write_path` parameter can be empty. + // When the `write_path` is not specified, the data will be written to the root of the specified bucket. + let object_name = { + let base_path = match self.engine_type { + EngineType::Fs => "".to_string(), + EngineType::Snowflake if self.write_path.is_empty() => "".to_string(), + _ => format!("{}/", self.write_path), + }; + + format!( + "{}{}{}_{}.{}", + base_path, self.path_partition_prefix(&create_time), self.executor_id, create_time.as_secs(), suffix, - ), + ) }; - Ok(self .operator .writer_with(&object_name) diff --git a/src/connector/src/sink/file_sink/s3.rs b/src/connector/src/sink/file_sink/s3.rs index 0e11f125097f1..86eaafbcac4bb 100644 --- a/src/connector/src/sink/file_sink/s3.rs +++ b/src/connector/src/sink/file_sink/s3.rs @@ -27,16 +27,24 @@ use crate::sink::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SI use crate::source::UnknownFields; #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct S3Common { - #[serde(rename = "s3.region_name")] + #[serde(rename = "s3.region_name", alias = "snowflake.aws_region")] pub region_name: String, - #[serde(rename = "s3.bucket_name")] + #[serde(rename = "s3.bucket_name", alias = "snowflake.s3_bucket")] pub bucket_name: String, /// The directory where the sink file is located.
- #[serde(rename = "s3.path")] - pub path: String, - #[serde(rename = "s3.credentials.access", default)] + #[serde(rename = "s3.path", alias = "snowflake.s3_path", default)] + pub path: Option<String>, + #[serde( + rename = "s3.credentials.access", + alias = "snowflake.aws_access_key_id", + default + )] pub access: Option<String>, - #[serde(rename = "s3.credentials.secret", default)] + #[serde( + rename = "s3.credentials.secret", + alias = "snowflake.aws_secret_access_key", + default + )] pub secret: Option<String>, #[serde(rename = "s3.endpoint_url")] pub endpoint_url: Option<String>, @@ -136,7 +144,7 @@ impl OpendalSinkBackend for S3Sink { } fn get_path(properties: Self::Properties) -> String { - properties.common.path + properties.common.path.unwrap_or_default() } fn get_engine_type() -> super::opendal_sink::EngineType { @@ -151,3 +159,48 @@ impl OpendalSinkBackend for S3Sink { } } } + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct SnowflakeSink; + +pub const SNOWFLAKE_SINK: &str = "snowflake"; + +impl OpendalSinkBackend for SnowflakeSink { + type Properties = S3Config; + + const SINK_NAME: &'static str = SNOWFLAKE_SINK; + + fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> { + let config = serde_json::from_value::<S3Config>(serde_json::to_value(btree_map).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { + return Err(SinkError::Config(anyhow!( + "`{}` must be {}, or {}", + SINK_TYPE_OPTION, + SINK_TYPE_APPEND_ONLY, + SINK_TYPE_UPSERT + ))); + } + Ok(config) + } + + fn new_operator(properties: S3Config) -> Result<Operator> { + FileSink::<SnowflakeSink>::new_s3_sink(properties) + } + + fn get_path(properties: Self::Properties) -> String { + properties.common.path.unwrap_or_default() + } + + fn get_engine_type() -> super::opendal_sink::EngineType { + super::opendal_sink::EngineType::Snowflake + } + + fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy { + BatchingStrategy { + max_row_count: properties.batching_strategy.max_row_count, + rollover_seconds: properties.batching_strategy.rollover_seconds, + path_partition_prefix: properties.batching_strategy.path_partition_prefix, + } + } +} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 2111b33e29ef7..03b2fc68476ce 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -37,7 +37,6 @@ pub mod nats; pub mod pulsar; pub mod redis; pub mod remote; -pub mod snowflake; pub mod sqlserver; pub mod starrocks; pub mod test_sink; @@ -128,7 +127,7 @@ macro_rules!
for_all_sinks { { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>}, { Fs, $crate::sink::file_sink::opendal_sink::FileSink }, - { Snowflake, $crate::sink::snowflake::SnowflakeSink }, + { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>}, { DeltaLake, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, { DynamoDb, $crate::sink::dynamodb::DynamoDbSink }, @@ -837,12 +836,6 @@ pub enum SinkError { Starrocks(String), #[error("File error: {0}")] File(String), - #[error("Snowflake error: {0}")] - Snowflake( - #[source] - #[backtrace] - anyhow::Error, - ), #[error("Pulsar error: {0}")] Pulsar( #[source] diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs deleted file mode 100644 index 9e65ba6f4ea02..0000000000000 --- a/src/connector/src/sink/snowflake.rs +++ /dev/null @@ -1,401 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::BTreeMap; -use std::fmt::Write; -use std::sync::Arc; - -use anyhow::{anyhow, Context}; -use async_trait::async_trait; -use bytes::{Bytes, BytesMut}; -use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::bitmap::Bitmap; -use risingwave_common::catalog::Schema; -use risingwave_common::config::ObjectStoreConfig; -use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS; -use risingwave_object_store::object::{ - ObjectStore, OpendalObjectStore, OpendalStreamingUploader, StreamingUploader, -}; -use serde::Deserialize; -use serde_json::Value; -use serde_with::serde_as; -use uuid::Uuid; -use with_options::WithOptions; - -use super::encoder::{ - JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode, - TimestamptzHandlingMode, -}; -use super::writer::LogSinkerOf; -use super::{SinkError, SinkParam, SinkWriterMetrics}; -use crate::sink::writer::SinkWriterExt; -use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; - -pub const SNOWFLAKE_SINK: &str = "snowflake"; -const S3_INTERMEDIATE_FILE_NAME: &str = "RW_SNOWFLAKE_S3_SINK_FILE"; - -#[derive(Debug, Clone, Deserialize, WithOptions)] -pub struct SnowflakeCommon { - /// The s3 bucket where intermediate sink files will be stored - #[serde(rename = "snowflake.s3_bucket", alias = "s3.bucket_name")] - pub s3_bucket: String, - - /// The optional s3 path to be specified - /// the actual file location would be `s3:////` - /// if this field is specified by user(s) - /// otherwise it would be `s3:///` - #[serde(rename = "snowflake.s3_path", alias = "s3.path")] - pub s3_path: Option, - - /// s3 credentials - #[serde( - rename = "snowflake.aws_access_key_id", - alias = "s3.credentials.access" - )] - pub aws_access_key_id: String, - - /// s3 credentials - #[serde( - rename = "snowflake.aws_secret_access_key", - alias = "s3.credentials.secret" - )] - pub aws_secret_access_key: String, - 
- /// The s3 region, e.g., us-east-2 - #[serde(rename = "snowflake.aws_region", alias = "s3.region_name")] - pub aws_region: String, -} - -#[serde_as] -#[derive(Clone, Debug, Deserialize, WithOptions)] -pub struct SnowflakeConfig { - #[serde(flatten)] - pub common: SnowflakeCommon, -} - -impl SnowflakeConfig { - pub fn from_btreemap(properties: BTreeMap) -> Result { - let config = - serde_json::from_value::(serde_json::to_value(properties).unwrap()) - .map_err(|e| SinkError::Config(anyhow!(e)))?; - Ok(config) - } -} - -#[derive(Debug)] -pub struct SnowflakeSink { - pub config: SnowflakeConfig, - schema: Schema, - pk_indices: Vec, - is_append_only: bool, -} - -impl Sink for SnowflakeSink { - type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; - - const SINK_NAME: &'static str = SNOWFLAKE_SINK; - - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - Ok(SnowflakeSinkWriter::new( - self.config.clone(), - self.schema.clone(), - self.pk_indices.clone(), - self.is_append_only, - )? - .into_log_sinker(SinkWriterMetrics::new(&writer_param))) - } - - async fn validate(&self) -> Result<()> { - risingwave_common::license::Feature::SnowflakeSink - .check_available() - .map_err(|e| anyhow::anyhow!(e))?; - if !self.is_append_only { - return Err(SinkError::Config( - anyhow!("SnowflakeSink only supports append-only mode at present, please change the query to append-only, or use `force_append_only = 'true'`") - )); - } - Ok(()) - } -} - -impl TryFrom for SnowflakeSink { - type Error = SinkError; - - fn try_from(param: SinkParam) -> std::result::Result { - let schema = param.schema(); - let config = SnowflakeConfig::from_btreemap(param.properties)?; - Ok(SnowflakeSink { - config, - schema, - pk_indices: param.downstream_pk, - is_append_only: param.sink_type.is_append_only(), - }) - } -} - -pub struct SnowflakeSinkWriter { - #[expect(dead_code)] - config: SnowflakeConfig, - #[expect(dead_code)] - schema: Schema, - #[expect(dead_code)] - pk_indices: Vec, - #[expect(dead_code)] - is_append_only: bool, - /// the client to insert file to external storage (i.e., s3) - s3_client: SnowflakeS3Client, - row_encoder: JsonEncoder, - /// The current epoch, used in naming the sink files - /// mainly used for debugging purpose - epoch: u64, - /// streaming uploader to upload data to the intermediate (s3) storage. - /// this also contains the file suffix *unique* to the *local* sink writer per epoch. - /// i.e., opendal s3 engine and the file suffix for intermediate s3 file. - /// note: the option here *implicitly* indicates whether we have at - /// least call `streaming_upload` once during this epoch, - /// which is mainly used to prevent uploading empty data. 
- streaming_uploader: Option, -} - -impl SnowflakeSinkWriter { - pub fn new( - config: SnowflakeConfig, - schema: Schema, - pk_indices: Vec, - is_append_only: bool, - ) -> Result { - let s3_client = SnowflakeS3Client::new( - config.common.s3_bucket.clone(), - config.common.s3_path.clone(), - config.common.aws_access_key_id.clone(), - config.common.aws_secret_access_key.clone(), - config.common.aws_region.clone(), - )?; - - Ok(Self { - config, - schema: schema.clone(), - pk_indices, - is_append_only, - s3_client, - row_encoder: JsonEncoder::new( - schema, - None, - super::encoder::DateHandlingMode::String, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::String, - JsonbHandlingMode::String, - ), - // initial value of `epoch` will be set to 0 - epoch: 0, - // will be (lazily) initialized after the begin of each epoch - // when some data is ready to be upload - streaming_uploader: None, - }) - } - - /// return a brand new the streaming uploader as well as the file suffix. - /// note: should *only* be called iff after a new epoch begins, - /// and `streaming_upload` being called the first time. - /// i.e., lazily initialization of the internal `streaming_uploader`. - /// plus, this function is *pure*, the `&mut self` here is to make rustc (and tokio) happy. - async fn new_streaming_uploader(&mut self) -> Result { - let file_suffix = self.file_suffix(); - let path = generate_s3_file_name(self.s3_client.s3_path(), &file_suffix); - let uploader = self - .s3_client - .opendal_s3_engine - .streaming_upload(&path) - .await - .with_context(|| { - format!( - "failed to create the streaming uploader of opendal s3 engine for epoch {}", - self.epoch - ) - }) - .map_err(SinkError::Snowflake)?; - Ok(uploader) - } - - /// write data to the current streaming uploader for this epoch. - async fn streaming_upload(&mut self, data: Bytes) -> Result<()> { - let uploader = match self.streaming_uploader.as_mut() { - Some(s) => s, - None => { - assert!( - self.streaming_uploader.is_none(), - "expect `streaming_uploader` to be None" - ); - let uploader = self.new_streaming_uploader().await?; - self.streaming_uploader.insert(uploader) - } - }; - uploader - .write_bytes(data) - .await - .context("failed to write bytes when streaming uploading to s3") - .map_err(SinkError::Snowflake)?; - Ok(()) - } - - /// finalize streaming upload for this epoch. - /// ensure all the data has been properly uploaded to intermediate s3. - async fn finish_streaming_upload(&mut self) -> Result<()> { - let uploader = std::mem::take(&mut self.streaming_uploader); - let Some(uploader) = uploader else { - // there is no data to be uploaded for this epoch - return Ok(()); - }; - uploader - .finish() - .await - .context("failed to finish streaming upload to s3") - .map_err(SinkError::Snowflake)?; - Ok(()) - } - - async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { - let mut chunk_buf = BytesMut::new(); - - // write the json representations of the row(s) in current chunk to `chunk_buf` - for (op, row) in chunk.rows() { - assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`"); - // to prevent temporary string allocation, - // so we directly write to `chunk_buf` implicitly via `write_fmt`. - write!( - chunk_buf, - "{}", - Value::Object(self.row_encoder.encode(row)?) 
- ) - .unwrap(); // write to a `BytesMut` should never fail - } - - // streaming upload in a chunk-by-chunk manner - self.streaming_upload(chunk_buf.freeze()).await?; - Ok(()) - } - - fn update_epoch(&mut self, epoch: u64) { - self.epoch = epoch; - } - - /// generate a *global unique* uuid, - /// which is the key to the uniqueness of file suffix. - fn gen_uuid() -> Uuid { - Uuid::new_v4() - } - - /// construct the *global unique* file suffix for the sink. - /// note: this is unique even across multiple parallel writer(s). - fn file_suffix(&self) -> String { - // the format of suffix will be _ - format!("{}_{}", self.epoch, Self::gen_uuid()) - } - - /// sink `payload` to s3, then trigger corresponding `insertFiles` post request - /// to snowflake, to finish the overall sinking pipeline. - async fn commit(&mut self) -> Result<()> { - // note that after `finish_streaming_upload`, do *not* interact with - // `streaming_uploader` until new data comes in at next epoch, - // since the ownership has been taken in this method, and `None` will be left. - self.finish_streaming_upload().await - } -} - -#[async_trait] -impl SinkWriter for SnowflakeSinkWriter { - async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { - self.update_epoch(epoch); - Ok(()) - } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - - async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc) -> Result<()> { - Ok(()) - } - - async fn barrier(&mut self, is_checkpoint: bool) -> Result { - if is_checkpoint { - // finalize current streaming upload, plus notify snowflake to sink - // the corresponding data to snowflake pipe. - // note: if no data needs to be committed, then `commit` is simply a no-op. - self.commit().await?; - } - Ok(()) - } - - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - self.append_only(chunk).await?; - Ok(()) - } -} - -/// The helper function to generate the *global unique* s3 file name. 
-pub(crate) fn generate_s3_file_name(s3_path: Option<&str>, suffix: &str) -> String { - match s3_path { - Some(path) => format!("{}/{}_{}", path, S3_INTERMEDIATE_FILE_NAME, suffix), - None => format!("{}_{}", S3_INTERMEDIATE_FILE_NAME, suffix), - } -} - -/// todo: refactor this part after s3 sink is available -pub struct SnowflakeS3Client { - #[expect(dead_code)] - s3_bucket: String, - s3_path: Option, - pub opendal_s3_engine: OpendalObjectStore, -} - -impl SnowflakeS3Client { - pub fn new( - s3_bucket: String, - s3_path: Option, - aws_access_key_id: String, - aws_secret_access_key: String, - aws_region: String, - ) -> Result { - // FIXME: we should use the `ObjectStoreConfig` instead of default - // just use default configuration here for opendal s3 engine - let config = ObjectStoreConfig::default(); - - let metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone()); - - // create the s3 engine for streaming upload to the intermediate s3 bucket - let opendal_s3_engine = OpendalObjectStore::new_s3_engine_with_credentials( - &s3_bucket, - Arc::new(config), - metrics, - &aws_access_key_id, - &aws_secret_access_key, - &aws_region, - ) - .context("failed to create opendal s3 engine") - .map_err(SinkError::Snowflake)?; - - Ok(Self { - s3_bucket, - s3_path, - opendal_s3_engine, - }) - } - - pub fn s3_path(&self) -> Option<&str> { - self.s3_path.as_deref() - } -} diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index dd2dd7098f99f..2c286695b312c 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -977,21 +977,32 @@ S3Config: - name: s3.region_name field_type: String required: true + alias: + - snowflake.aws_region - name: s3.bucket_name field_type: String required: true + alias: + - snowflake.s3_bucket - name: s3.path field_type: String comments: The directory where the sink file is located. 
- required: true + required: false + default: Default::default + alias: + - snowflake.s3_path - name: s3.credentials.access field_type: String required: false default: Default::default + alias: + - snowflake.aws_access_key_id - name: s3.credentials.secret field_type: String required: false default: Default::default + alias: + - snowflake.aws_secret_access_key - name: s3.endpoint_url field_type: String required: false @@ -1014,42 +1025,6 @@ S3Config: - name: r#type field_type: String required: true -SnowflakeConfig: - fields: - - name: snowflake.s3_bucket - field_type: String - comments: The s3 bucket where intermediate sink files will be stored - required: true - alias: - - s3.bucket_name - - name: snowflake.s3_path - field_type: String - comments: |- - The optional s3 path to be specified - the actual file location would be `s3://<s3_bucket>/<s3_path>/<file_name>` - if this field is specified by user(s) - otherwise it would be `s3://<s3_bucket>/<file_name>` - required: false - alias: - - s3.path - - name: snowflake.aws_access_key_id - field_type: String - comments: s3 credentials - required: true - alias: - - s3.credentials.access - - name: snowflake.aws_secret_access_key - field_type: String - comments: s3 credentials - required: true - alias: - - s3.credentials.secret - - name: snowflake.aws_region - field_type: String - comments: The s3 region, e.g., us-east-2 - required: true - alias: - - s3.region_name SqlServerConfig: fields: - name: sqlserver.host diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 24a7e96fe15d7..5385e2b4957c1 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -871,7 +871,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> = FileSink::<S3Sink>::SINK_NAME => hashmap!( Format::Plain => vec![Encode::Parquet, Encode::Json], ), + FileSink::<SnowflakeSink>::SINK_NAME => hashmap!( + Format::Plain => vec![Encode::Parquet, Encode::Json], + ), FileSink::<GcsSink>::SINK_NAME => hashmap!( Format::Plain => vec![Encode::Parquet, Encode::Json], ),
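Note (illustration only, not part of the patch): a minimal sketch of the DDL this change enables, mirroring the e2e test above. The sink name `snowflake_sink`, the source table `t`, and the column list are placeholders; the bucket, credentials, endpoint, and path are the test's local object-store values.

    CREATE SINK snowflake_sink AS SELECT * FROM t
    WITH (
        connector = 'snowflake',
        snowflake.aws_region = 'custom',
        snowflake.s3_bucket = 'hummock001',
        snowflake.aws_access_key_id = 'hummockadmin',
        snowflake.aws_secret_access_key = 'hummockadmin',
        s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
        s3.path = 'test_json_sink/',
        type = 'append-only',
        force_append_only = 'true'
    ) FORMAT PLAIN ENCODE JSON (force_append_only = 'true');

With this refactor the Snowflake sink reuses the S3 file-sink path, so the `snowflake.*` options are aliases of the corresponding `s3.*` options; per the `TryFrom` change in opendal_sink.rs, the FORMAT ... ENCODE ... clause may also be omitted for this connector, in which case `SinkFormatDesc::plain_json_for_snowflake_only()` supplies append-only JSON.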