Skip to content

Commit

Permalink
basic structure for snowflake sink
Browse files: browse the repository at this point in the history
  • Loading branch information
xzhseh committed Mar 4, 2024
1 parent 56af4dd commit 82c03f9
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod starrocks;
pub mod snowflake;
pub mod test_sink;
pub mod trivial;
pub mod utils;
Expand Down Expand Up @@ -88,6 +89,7 @@ macro_rules! for_all_sinks {
{ HttpJava, $crate::sink::remote::HttpJavaSink },
{ Doris, $crate::sink::doris::DorisSink },
{ Starrocks, $crate::sink::starrocks::StarrocksSink },
{ Snowflake, $crate::sink::snowflake::SnowflakeSink },
{ DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
{ BigQuery, $crate::sink::big_query::BigQuerySink },
{ Test, $crate::sink::test_sink::TestSink },
Expand Down Expand Up @@ -515,6 +517,8 @@ pub enum SinkError {
),
#[error("Starrocks error: {0}")]
Starrocks(String),
#[error("Snowflake error: {0}")]
Snowflake(String),
#[error("Pulsar error: {0}")]
Pulsar(
#[source]
Expand Down
145 changes: 145 additions & 0 deletions src/connector/src/sink/snowflake.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashMap, sync::Arc};

use anyhow::anyhow;
use serde::Deserialize;
use serde_derive::Serialize;
use serde_json::Value;
use serde_with::serde_as;
use async_trait::async_trait;
use with_options::WithOptions;
use risingwave_common::{array::StreamChunk, buffer::Bitmap, catalog::Schema};

use super::{encoder::JsonEncoder, writer::LogSinkerOf, SinkError, SinkParam};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};

/// Connector name of the Snowflake sink; this is the value exposed as
/// `SINK_NAME` by the `Sink` implementation for `SnowflakeSink`.
pub const SNOWFLAKE_SINK: &str = "snowflake";

/// Connection options of the Snowflake sink.
///
/// Each field is deserialized from the `snowflake.*` key given in its
/// `#[serde(rename = ...)]` attribute. Every key is mandatory except
/// `snowflake.private.key.passphrase`, which maps to an `Option`.
///
/// NOTE(review): `Debug` is derived here, so `{:?}` prints `private_key` and
/// `private_key_passphrase` verbatim — confirm this struct is never logged.
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct SnowflakeCommon {
    /// Value of the `snowflake.url` option (presumably the account endpoint — TODO confirm).
    #[serde(rename = "snowflake.url")]
    pub url: String,

    /// Value of the `snowflake.database` option.
    #[serde(rename = "snowflake.database")]
    pub database: String,

    /// Value of the `snowflake.user` option.
    #[serde(rename = "snowflake.user")]
    pub user: String,

    /// Value of the `snowflake.private.key` option
    /// (expected format is not visible here — TODO document).
    #[serde(rename = "snowflake.private.key")]
    pub private_key: String,

    /// Value of the optional `snowflake.private.key.passphrase` option;
    /// `None` when the key is absent from the option map.
    #[serde(rename = "snowflake.private.key.passphrase")]
    pub private_key_passphrase: Option<String>,

    /// Value of the `snowflake.role` option.
    #[serde(rename = "snowflake.role")]
    pub role: String,
}

/// Complete option set accepted by the Snowflake sink.
///
/// At the moment it only wraps [`SnowflakeCommon`]; because `common` is
/// flattened, its `snowflake.*` keys are read directly from the top-level
/// option map rather than from a nested object.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct SnowflakeConfig {
    /// Connection options, flattened into this struct's key space.
    #[serde(flatten)]
    pub common: SnowflakeCommon,
}

impl SnowflakeConfig {
    /// Parses a [`SnowflakeConfig`] out of a string-to-string option map.
    ///
    /// The map is first converted into a `serde_json::Value` and then
    /// deserialized, so the `serde` attributes on the config structs decide
    /// which keys are read.
    ///
    /// # Errors
    ///
    /// Returns [`SinkError::Config`] wrapping the `serde_json` error when the
    /// value cannot be deserialized into the config.
    ///
    /// # Panics
    ///
    /// Panics if converting `properties` into a JSON value fails.
    pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
        let raw_options = serde_json::to_value(properties).unwrap();
        serde_json::from_value::<Self>(raw_options).map_err(|err| SinkError::Config(anyhow!(err)))
    }
}

/// Snowflake sink definition: the parsed options together with the table
/// metadata taken from the `SinkParam` it was built from.
#[derive(Debug)]
pub struct SnowflakeSink {
    /// Options parsed by `SnowflakeConfig::from_hashmap`.
    pub config: SnowflakeConfig,
    /// Schema obtained from `SinkParam::schema()`.
    schema: Schema,
    /// Copied from the `downstream_pk` field of the originating `SinkParam`.
    pk_indices: Vec<usize>,
    /// Result of `sink_type.is_append_only()` on the originating `SinkParam`.
    is_append_only: bool,
}

/// `Sink` wiring for Snowflake. Only the associated items are in place; both
/// methods are still stubs.
impl Sink for SnowflakeSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = LogSinkerOf<SnowflakeSinkWriter>;

    const SINK_NAME: &'static str = SNOWFLAKE_SINK;

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        todo!()
    }

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    async fn validate(&self) -> Result<()> {
        todo!()
    }
}

/// Writer state for the Snowflake sink.
///
/// NOTE(review): `config` is typed as `SnowflakeSink`, which already carries
/// its own `schema`, `pk_indices` and `is_append_only`; `SnowflakeConfig` was
/// possibly intended — confirm before the writer gets a constructor.
pub struct SnowflakeSinkWriter {
    /// The sink definition (see the review note above about its type).
    pub config: SnowflakeSink,
    /// Schema handled by this writer.
    schema: Schema,
    /// Primary-key column indices (presumably downstream — TODO confirm).
    pk_indices: Vec<usize>,
    /// Append-only flag for this writer.
    is_append_only: bool,
    /// Optional client handle; when it is `None` is not defined in this file yet.
    client: Option<SnowflakeClient>,
    /// JSON encoder held by this writer (presumably used to encode rows — TODO confirm).
    row_encoder: JsonEncoder,
}

/// Builds a [`SnowflakeSink`] from the generic sink parameters.
impl TryFrom<SinkParam> for SnowflakeSink {
    type Error = SinkError;

    /// Reads the schema from `param`, parses `param.properties` into a
    /// [`SnowflakeConfig`], and carries over the downstream primary key and
    /// the append-only flag.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `SnowflakeConfig::from_hashmap`.
    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        // `schema()` borrows `param`, so it has to run before fields are moved out.
        let schema = param.schema();
        let config = SnowflakeConfig::from_hashmap(param.properties)?;
        let pk_indices = param.downstream_pk;
        let is_append_only = param.sink_type.is_append_only();

        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only,
        })
    }
}

/// `SinkWriter` skeleton for Snowflake: the data path is still stubbed out,
/// while `abort` and `update_vnode_bitmap` are no-ops.
#[async_trait]
impl SinkWriter for SnowflakeSinkWriter {
    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        todo!()
    }

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    async fn write_batch(&mut self, _chunk: StreamChunk) -> Result<()> {
        todo!()
    }

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    async fn barrier(&mut self, _is_checkpoint: bool) -> Result<Self::CommitMetadata> {
        todo!()
    }

    /// Does nothing and reports success.
    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    /// Ignores the supplied bitmap and reports success.
    async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc<Bitmap>) -> Result<()> {
        Ok(())
    }
}

/// Placeholder for the Snowflake client; it has no fields yet.
pub struct SnowflakeClient {}

/// Placeholder impl; no methods are defined yet.
impl SnowflakeClient {}

0 comments on commit 82c03f9

Please sign in to comment.