From c90d67e3f62762e8cb0550a0fa02511d7d26aeeb Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Sun, 17 Sep 2023 21:20:18 +0800 Subject: [PATCH] style check by risedev --- src/connector/src/common.rs | 10 +-- src/connector/src/sink/mod.rs | 2 +- src/connector/src/sink/pulsar.rs | 84 +++++++++++++------------- src/connector/src/source/pulsar/mod.rs | 2 +- 4 files changed, 49 insertions(+), 49 deletions(-) diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 27f6918d0494c..eeffcb7c3903f 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -14,24 +14,24 @@ use std::borrow::Cow; use std::collections::HashMap; +use std::io::Write; use std::time::Duration; -use url::Url; -use std::io::Write; use anyhow::{anyhow, Ok}; -use tempfile::NamedTempFile; use async_nats::jetstream::consumer::DeliverPolicy; use async_nats::jetstream::{self}; -use pulsar::{Pulsar, Authentication, TokioExecutor}; -use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params}; use aws_sdk_kinesis::Client as KinesisClient; use clickhouse::Client; +use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params}; +use pulsar::{Authentication, Pulsar, TokioExecutor}; use rdkafka::ClientConfig; use risingwave_common::error::ErrorCode::InvalidParameterValue; use risingwave_common::error::{anyhow_error, RwError}; use serde_derive::{Deserialize, Serialize}; use serde_with::json::JsonString; use serde_with::{serde_as, DisplayFromStr}; +use tempfile::NamedTempFile; +use url::Url; use crate::aws_auth::AwsAuthProps; use crate::aws_utils::load_file_descriptor_from_s3; diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 73caa66f1c373..4681303b5d2b3 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -21,9 +21,9 @@ pub mod iceberg; pub mod kafka; pub mod kinesis; pub mod nats; +pub mod pulsar; pub mod redis; pub mod remote; -pub mod pulsar; #[cfg(any(test, madsim))] pub mod test_sink; pub mod utils; diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs index c20d3c521f068..7776258919cd7 100644 --- a/src/connector/src/sink/pulsar.rs +++ b/src/connector/src/sink/pulsar.rs @@ -17,29 +17,29 @@ use std::fmt::Debug; use std::time::Duration; use anyhow::anyhow; +use async_trait::async_trait; use futures::future::try_join_all; use futures::TryFutureExt; -use serde::Deserialize; -use async_trait::async_trait; use futures_async_stream::for_await; -use pulsar::{Pulsar, TokioExecutor, Producer}; -use pulsar::producer::{SendFuture, Message}; -use risingwave_common::catalog::Schema; +use pulsar::producer::{Message, SendFuture}; +use pulsar::{Producer, Pulsar, TokioExecutor}; use risingwave_common::array::StreamChunk; +use risingwave_common::catalog::Schema; use risingwave_rpc_client::ConnectorClient; +use serde::Deserialize; +use super::encoder::{JsonEncoder, TimestampHandlingMode}; use super::{ - Sink, SinkError, SinkWriterParam, SinkWriter, SinkParam, - SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY, SINK_TYPE_UPSERT, + Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, + SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; -use super::encoder::{JsonEncoder, TimestampHandlingMode}; -use crate::{deserialize_u32_from_string, deserialize_duration_from_string}; -use crate::sink::{Result, DummySinkCommitCoordinator}; +use crate::common::PulsarCommon; use crate::sink::utils::{ - gen_append_only_message_stream, gen_upsert_message_stream, - AppendOnlyAdapterOpts, UpsertAdapterOpts, + gen_append_only_message_stream, gen_upsert_message_stream, AppendOnlyAdapterOpts, + UpsertAdapterOpts, }; -use crate::common::PulsarCommon; +use crate::sink::{DummySinkCommitCoordinator, Result}; +use crate::{deserialize_duration_from_string, deserialize_u32_from_string}; pub const PULSAR_SINK: &str = "pulsar"; @@ -82,9 +82,8 @@ impl PulsarConfig { pub fn from_hashmap(values: HashMap) -> Result { let config = serde_json::from_value::(serde_json::to_value(values).unwrap()) .map_err(|e| SinkError::Config(anyhow!(e)))?; - - if config.r#type != SINK_TYPE_APPEND_ONLY - && config.r#type != SINK_TYPE_UPSERT { + + if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { return Err(SinkError::Config(anyhow!( "`{}` must be {}, or {}", SINK_TYPE_OPTION, @@ -119,13 +118,15 @@ impl PulsarSink { impl Sink for PulsarSink { type Coordinator = DummySinkCommitCoordinator; type Writer = PulsarSinkWriter; + async fn new_writer(&self, _writer_param: SinkWriterParam) -> Result { PulsarSinkWriter::new( - self.config.clone(), - self.schema.clone(), + self.config.clone(), + self.schema.clone(), self.downstream_pk.clone(), self.is_append_only, - ).await + ) + .await } async fn validate(&self, _client: Option) -> Result<()> { @@ -155,10 +156,10 @@ pub struct PulsarSinkWriter { impl PulsarSinkWriter { pub async fn new( - config: PulsarConfig, - schema: Schema, - downstream_pk: Vec, - is_append_only: bool + config: PulsarConfig, + schema: Schema, + downstream_pk: Vec, + is_append_only: bool, ) -> Result { let pulsar = config.common.build_client().await?; let producer = pulsar @@ -185,7 +186,7 @@ impl PulsarSinkWriter { for _ in 0..self.config.max_retry_num { match self.producer.send(message.clone()).await { // If the message is sent successfully, - // a SendFuture holding the message receipt + // a SendFuture holding the message receipt // or error after sending is returned Ok(send_future) => { // Check if send_future_buffer is greater than the preset limit @@ -196,22 +197,20 @@ impl PulsarSinkWriter { .map_err(|e| SinkError::Pulsar(anyhow!(e))) .await?; } - + success_flag = true; self.send_future_buffer.push_back(send_future); break; } // error upon sending - Err(e) => { - match e { - pulsar::Error::Connection(e) => { - connection_err = Some(e); - tokio::time::sleep(self.config.retry_interval).await; - continue; - }, - _ => return Err(SinkError::Pulsar(anyhow!(e))) + Err(e) => match e { + pulsar::Error::Connection(e) => { + connection_err = Some(e); + tokio::time::sleep(self.config.retry_interval).await; + continue; } - } + _ => return Err(SinkError::Pulsar(anyhow!(e))), + }, } } @@ -256,7 +255,8 @@ impl PulsarSinkWriter { #[for_await] for msg in append_only_stream { let (event_key_object, event_object) = msg?; - self.write_json_objects(event_key_object, event_object).await?; + self.write_json_objects(event_key_object, event_object) + .await?; } Ok(()) } @@ -270,7 +270,7 @@ impl PulsarSinkWriter { let val_encoder = JsonEncoder::new(&schema, None, TimestampHandlingMode::Milli); let upsert_stream = gen_upsert_message_stream( - chunk, + chunk, UpsertAdapterOpts::default(), key_encoder, val_encoder, @@ -279,7 +279,8 @@ impl PulsarSinkWriter { #[for_await] for msg in upsert_stream { let (event_key_object, event_object) = msg?; - self.write_json_objects(event_key_object, event_object).await?; + self.write_json_objects(event_key_object, event_object) + .await?; } Ok(()) } @@ -288,10 +289,9 @@ impl PulsarSinkWriter { try_join_all( self.send_future_buffer .drain(..) - .map(|send_future| send_future - .map_err(|e| SinkError::Pulsar(anyhow!(e))) - ), - ).await?; + .map(|send_future| send_future.map_err(|e| SinkError::Pulsar(anyhow!(e)))), + ) + .await?; Ok(()) } @@ -318,4 +318,4 @@ impl SinkWriter for PulsarSinkWriter { Ok(()) } -} \ No newline at end of file +} diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs index 6be45ace41e13..544d1b7fb3ed3 100644 --- a/src/connector/src/source/pulsar/mod.rs +++ b/src/connector/src/source/pulsar/mod.rs @@ -22,8 +22,8 @@ use serde::Deserialize; pub use split::*; use crate::common::PulsarCommon; -use crate::source::SourceProperties; use crate::source::pulsar::source::reader::PulsarSplitReader; +use crate::source::SourceProperties; pub const PULSAR_CONNECTOR: &str = "pulsar";