add s3 file path
xxhZs committed Oct 30, 2023
1 parent 946f2a7 commit 58818f9
Showing 5 changed files with 67 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions integration_tests/big-query-sink/README.md
@@ -28,6 +28,7 @@ CREATE table '${project_id}'.'${dataset_id}'.'${table_id}'(

1. We need to obtain the JSON key file for a Google Cloud service account, which can be created at https://console.cloud.google.com/iam-admin/serviceaccounts.
2. Because BigQuery has limited support for updates and deletes, we currently only support 'append-only' sinks.
3. The service account JSON file can be loaded either from a local path or from S3; the corresponding SQL statements are in 'create_sink.sql'.

Run the following query:
```sql
-- (query omitted in this diff excerpt)
```
20 changes: 19 additions & 1 deletion integration_tests/big-query-sink/append-only-sql/create_sink.sql
@@ -1,11 +1,29 @@
-- create sink, loading the service account JSON from a local file
CREATE SINK bhv_big_query_sink
FROM
bhv_mv WITH (
connector = 'bigquery',
type = 'append-only',
bigquery.path= '${bigquery_service_account_json_path}',
bigquery.local.path= '${bigquery_service_account_json_path}',
bigquery.project= '${project_id}',
bigquery.dataset= '${dataset_id}',
bigquery.table= '${table_id}',
force_append_only='true'
);


-- create sink, loading the service account JSON from S3
CREATE SINK bhv_big_query_sink
FROM
bhv_mv WITH (
connector = 'bigquery',
type = 'append-only',
bigquery.s3.path= '${s3_service_account_json_path}',
bigquery.project= '${project_id}',
bigquery.dataset= '${dataset_id}',
bigquery.table= '${table_id}',
access_key = '${aws_access_key}',
secret_access = '${aws_secret_access}',
region = '${aws_region}',
force_append_only='true'
);
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -132,6 +132,7 @@ tracing = "0.1"
tracing-futures = { version = "0.2", features = ["futures-03"] }
url = "2"
urlencoding = "2"
yup-oauth2 = "8.3"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }
54 changes: 45 additions & 9 deletions src/connector/src/sink/big_query.rs
@@ -29,10 +29,14 @@ use risingwave_common::types::DataType;
use serde_derive::{Deserialize, Serialize};
use serde_json::Value;
use serde_with::serde_as;
use url::Url;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
use super::writer::LogSinkerOf;
use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::aws_auth::AwsAuthProps;
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::sink::writer::SinkWriterExt;
use crate::sink::{
DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam,
@@ -43,14 +43,51 @@ const BIGQUERY_INSERT_MAX_NUMS: usize = 1024;

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct BigQueryCommon {
#[serde(rename = "bigquery.path")]
pub file_path: String,
#[serde(rename = "bigquery.local.path")]
pub local_path: Option<String>,
#[serde(rename = "bigquery.s3.path")]
pub s3_path: Option<String>,
#[serde(rename = "bigquery.project")]
pub project: String,
#[serde(rename = "bigquery.dataset")]
pub dataset: String,
#[serde(rename = "bigquery.table")]
pub table: String,
/// Required keys refer to [`crate::aws_utils::AWS_DEFAULT_CONFIG`].
#[serde(flatten)]
pub s3_credentials: HashMap<String, String>,
}
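
As an aside, here is a minimal, self-contained sketch of how WITH options like those in `create_sink.sql` land on a struct shaped like this one. It uses a hypothetical mirror type (`BigQueryOptionsMirror`) rather than the crate's own `BigQueryCommon`, and placeholder values: the renamed keys map onto the two optional path fields, and leftover options such as `access_key` or `region` are collected by the flattened map.

```rust
// Hypothetical mirror of `BigQueryCommon`, for illustration only; the real
// options are parsed by the connector itself.
use std::collections::HashMap;

use serde::Deserialize;
use serde_json::json;

#[derive(Deserialize, Debug)]
struct BigQueryOptionsMirror {
    #[serde(rename = "bigquery.local.path")]
    local_path: Option<String>,
    #[serde(rename = "bigquery.s3.path")]
    s3_path: Option<String>,
    #[serde(rename = "bigquery.project")]
    project: String,
    #[serde(rename = "bigquery.dataset")]
    dataset: String,
    #[serde(rename = "bigquery.table")]
    table: String,
    // Any remaining options (e.g. AWS credentials) end up here.
    #[serde(flatten)]
    s3_credentials: HashMap<String, String>,
}

fn main() {
    let opts: BigQueryOptionsMirror = serde_json::from_value(json!({
        "bigquery.s3.path": "s3://my-bucket/service_account.json",
        "bigquery.project": "my_project",
        "bigquery.dataset": "my_dataset",
        "bigquery.table": "my_table",
        "access_key": "my_access_key",
        "secret_access": "my_secret",
        "region": "us-east-1"
    }))
    .expect("options should deserialize");

    // The S3 path is set, the local path is absent, and the AWS options
    // were swept into the flattened credential map.
    assert!(opts.local_path.is_none());
    assert_eq!(
        opts.s3_path.as_deref(),
        Some("s3://my-bucket/service_account.json")
    );
    assert_eq!(
        opts.s3_credentials.get("region").map(String::as_str),
        Some("us-east-1")
    );
    println!("{opts:?}");
}
```

With `bigquery.local.path` supplied instead, `s3_path` would be `None` and the credential map could stay empty.
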

impl BigQueryCommon {
pub(crate) async fn build_client(&self) -> Result<Client> {
// Prefer the local service account file when it is configured; otherwise
// fall back to loading the key from S3.
let service_account = if let Some(local_path) = &self.local_path {
let auth_json = std::fs::read_to_string(local_path)
.map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
serde_json::from_str::<ServiceAccountKey>(&auth_json)
.map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?
} else if let Some(s3_path) = &self.s3_path {
let url =
Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
let auth_json = load_file_descriptor_from_s3(
&url,
&AwsAuthProps::from_pairs(
self.s3_credentials
.iter()
.map(|(k, v)| (k.as_str(), v.as_str())),
),
)
.await
.map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
serde_json::from_slice::<ServiceAccountKey>(&auth_json)
.map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?
} else {
return Err(SinkError::BigQuery(anyhow::anyhow!("At least one of `bigquery.local.path` or `bigquery.s3.path` must be set.")));
};
let client: Client = Client::from_service_account_key(service_account, false)
.await
.map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
Ok(client)
}
}
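
For completeness, one way the fallback in `build_client` could be exercised without touching GCP or S3 is sketched below. This is a hedged test sketch, not part of the commit: it assumes it sits inside this module so `BigQueryCommon` is in scope, and that `tokio`'s test macro is available; the module name is made up.

```rust
#[cfg(test)]
mod big_query_common_path_tests {
    use std::collections::HashMap;

    use super::BigQueryCommon;

    // With neither `bigquery.local.path` nor `bigquery.s3.path` configured,
    // `build_client` should return an error immediately, before any file or
    // network access is attempted.
    #[tokio::test]
    async fn build_client_requires_a_credential_path() {
        let common = BigQueryCommon {
            local_path: None,
            s3_path: None,
            project: "my_project".to_string(),
            dataset: "my_dataset".to_string(),
            table: "my_table".to_string(),
            s3_credentials: HashMap::new(),
        };
        assert!(common.build_client().await.is_err());
    }
}
```

Only the early-return branch is checked here; the local and S3 branches require real credentials.
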

#[serde_as]
@@ -200,9 +241,7 @@ impl Sink for BigQuerySink {
)));
}

let client = Client::from_service_account_key_file(&self.config.common.file_path)
.await
.map_err(|e| SinkError::BigQuery(e.into()))?;
let client = self.config.common.build_client().await?;
let mut rs = client
.job()
.query(
@@ -266,10 +305,7 @@ impl BigQuerySinkWriter {
pk_indices: Vec<usize>,
is_append_only: bool,
) -> Result<Self> {
let client = Client::from_service_account_key_file(&config.common.file_path)
.await
.map_err(|e| SinkError::BigQuery(e.into()))
.unwrap();
let client = config.common.build_client().await?;
Ok(Self {
config,
schema: schema.clone(),
