From 58818f964b7f2d614c5450c55c3db0d7064ea5e8 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Mon, 30 Oct 2023 11:19:42 +0800
Subject: [PATCH] add s3 file path

---
 Cargo.lock                                        |  1 +
 integration_tests/big-query-sink/README.md        |  1 +
 .../append-only-sql/create_sink.sql               | 20 ++++++-
 src/connector/Cargo.toml                          |  1 +
 src/connector/src/sink/big_query.rs               | 54 +++++++++++++++----
 5 files changed, 67 insertions(+), 10 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 50519aeccab88..204fb01f4223a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7378,6 +7378,7 @@ dependencies = [
  "url",
  "urlencoding",
  "workspace-hack",
+ "yup-oauth2",
 ]
 
 [[package]]
diff --git a/integration_tests/big-query-sink/README.md b/integration_tests/big-query-sink/README.md
index dd957eb4e9bb4..42d4fdc793266 100644
--- a/integration_tests/big-query-sink/README.md
+++ b/integration_tests/big-query-sink/README.md
@@ -28,6 +28,7 @@ CREATE table '${project_id}'.'${dataset_id}'.'${table_id}'(
 
 1. We need to obtain the JSON file for Google Cloud service accounts, which can be configured here: https://console.cloud.google.com/iam-admin/serviceaccounts.
 2. Because BigQuery has limited support for updates and deletes, we currently only support 'append only'
+3. The service account JSON file path can be either an S3 path or a local file path; the corresponding SQL statements are in 'create_sink.sql'.
 
 Run the following query
 ```sql
diff --git a/integration_tests/big-query-sink/append-only-sql/create_sink.sql b/integration_tests/big-query-sink/append-only-sql/create_sink.sql
index 4cc9b377a861e..c5dd9d9d48725 100644
--- a/integration_tests/big-query-sink/append-only-sql/create_sink.sql
+++ b/integration_tests/big-query-sink/append-only-sql/create_sink.sql
@@ -1,11 +1,29 @@
+-- create sink with a local file
 CREATE SINK bhv_big_query_sink
 FROM
     bhv_mv WITH (
     connector = 'bigquery',
     type = 'append-only',
-    bigquery.path= '${bigquery_service_account_json_path}',
+    bigquery.local.path= '${bigquery_service_account_json_path}',
     bigquery.project= '${project_id}',
     bigquery.dataset= '${dataset_id}',
     bigquery.table= '${table_id}',
     force_append_only='true'
 );
+
+
+-- create sink with an s3 file
+CREATE SINK bhv_big_query_sink
+FROM
+    bhv_mv WITH (
+    connector = 'bigquery',
+    type = 'append-only',
+    bigquery.s3.path= '${s3_service_account_json_path}',
+    bigquery.project= '${project_id}',
+    bigquery.dataset= '${dataset_id}',
+    bigquery.table= '${table_id}',
+    access_key = '${aws_access_key}',
+    secret_access = '${aws_secret_access}',
+    region = '${aws_region}',
+    force_append_only='true'
+);
\ No newline at end of file
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index d8ba8f7c6d4a7..e9d29b7f2752c 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -132,6 +132,7 @@ tracing = "0.1"
 tracing-futures = { version = "0.2", features = ["futures-03"] }
 url = "2"
 urlencoding = "2"
+yup-oauth2 = "8.3"
 
 [target.'cfg(not(madsim))'.dependencies]
 workspace-hack = { path = "../workspace-hack" }
diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs
index 9b62ddda37588..4c540b2954233 100644
--- a/src/connector/src/sink/big_query.rs
+++ b/src/connector/src/sink/big_query.rs
@@ -29,10 +29,14 @@ use risingwave_common::types::DataType;
 use serde_derive::{Deserialize, Serialize};
 use serde_json::Value;
 use serde_with::serde_as;
+use url::Url;
+use yup_oauth2::ServiceAccountKey;
 
 use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
 use super::writer::LogSinkerOf;
 use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
+use crate::aws_auth::AwsAuthProps;
+use crate::aws_utils::load_file_descriptor_from_s3;
 use crate::sink::writer::SinkWriterExt;
 use crate::sink::{
     DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam,
@@ -43,14 +47,51 @@ const BIGQUERY_INSERT_MAX_NUMS: usize = 1024;
 
 #[derive(Deserialize, Serialize, Debug, Clone)]
 pub struct BigQueryCommon {
-    #[serde(rename = "bigquery.path")]
-    pub file_path: String,
+    #[serde(rename = "bigquery.local.path")]
+    pub local_path: Option<String>,
+    #[serde(rename = "bigquery.s3.path")]
+    pub s3_path: Option<String>,
     #[serde(rename = "bigquery.project")]
     pub project: String,
     #[serde(rename = "bigquery.dataset")]
     pub dataset: String,
     #[serde(rename = "bigquery.table")]
     pub table: String,
+    /// For the required keys, refer to [`crate::aws_utils::AWS_DEFAULT_CONFIG`].
+    #[serde(flatten)]
+    pub s3_credentials: HashMap<String, String>,
+}
+
+impl BigQueryCommon {
+    pub(crate) async fn build_client(&self) -> Result<Client> {
+        let service_account = if let Some(local_path) = &self.local_path {
+            let auth_json = std::fs::read_to_string(local_path)
+                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
+            serde_json::from_str::<ServiceAccountKey>(&auth_json)
+                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?
+        } else if let Some(s3_path) = &self.s3_path {
+            let url =
+                Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
+            let auth_json = load_file_descriptor_from_s3(
+                &url,
+                &AwsAuthProps::from_pairs(
+                    self.s3_credentials
+                        .iter()
+                        .map(|(k, v)| (k.as_str(), v.as_str())),
+                ),
+            )
+            .await
+            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
+            serde_json::from_slice::<ServiceAccountKey>(&auth_json)
+                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?
+        } else {
+            return Err(SinkError::BigQuery(anyhow::anyhow!("At least one of `bigquery.local.path` and `bigquery.s3.path` must be set.")));
+        };
+        let client: Client = Client::from_service_account_key(service_account, false)
+            .await
+            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
+        Ok(client)
+    }
 }
 
 #[serde_as]
@@ -200,9 +241,7 @@ impl Sink for BigQuerySink {
             )));
         }
 
-        let client = Client::from_service_account_key_file(&self.config.common.file_path)
-            .await
-            .map_err(|e| SinkError::BigQuery(e.into()))?;
+        let client = self.config.common.build_client().await?;
         let mut rs = client
             .job()
             .query(
@@ -266,10 +305,7 @@ impl BigQuerySinkWriter {
         pk_indices: Vec<usize>,
         is_append_only: bool,
     ) -> Result<Self> {
-        let client = Client::from_service_account_key_file(&config.common.file_path)
-            .await
-            .map_err(|e| SinkError::BigQuery(e.into()))
-            .unwrap();
+        let client = config.common.build_client().await?;
        Ok(Self {
            config,
            schema: schema.clone(),
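
A note on the `#[serde(flatten)]` field this patch adds to `BigQueryCommon`: any WITH option that does not match a named field (here `access_key`, `secret_access`, `region`) is collected into the `s3_credentials` map and later forwarded to `AwsAuthProps::from_pairs`. Below is a minimal, self-contained sketch of that deserialization mechanism; the struct name `BigQueryCommonSketch` and the literal option values are invented for illustration and are not part of the patch.

```rust
// Minimal sketch (not part of the patch) of the `#[serde(flatten)]` pattern used
// by `BigQueryCommon`: options that do not match a named field are collected
// into a HashMap<String, String>. Requires `serde` (with the `derive` feature)
// and `serde_json`. `BigQueryCommonSketch` and the literal values are made up.
use std::collections::HashMap;

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct BigQueryCommonSketch {
    #[serde(rename = "bigquery.s3.path")]
    s3_path: Option<String>,
    #[serde(rename = "bigquery.project")]
    project: String,
    /// Every key not matched by the fields above lands here
    /// (e.g. `access_key`, `secret_access`, `region`).
    #[serde(flatten)]
    s3_credentials: HashMap<String, String>,
}

fn main() {
    // Stand-in for the WITH options of the CREATE SINK statement.
    let options = r#"{
        "bigquery.s3.path": "s3://my-bucket/service_account.json",
        "bigquery.project": "my-project",
        "access_key": "AKIAEXAMPLE",
        "secret_access": "secret-example",
        "region": "us-east-1"
    }"#;

    let common: BigQueryCommonSketch =
        serde_json::from_str(options).expect("options should deserialize");

    // The named fields stay strongly typed...
    println!("project = {}, s3_path = {:?}", common.project, common.s3_path);
    // ...while the AWS settings end up in the flattened map, ready to be
    // forwarded (in the real code, to `AwsAuthProps::from_pairs`).
    println!("aws options = {:?}", common.s3_credentials);
}
```

Flattening into a map keeps the BigQuery-specific options strongly typed while still letting the sink forward arbitrary AWS settings without enumerating each one as a struct field.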