Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: deprecate SinkPayloadFormat #16723

Merged
merged 11 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,12 @@ message SinkParam {
string sink_name = 8;
}

enum SinkPayloadFormat {
FORMAT_UNSPECIFIED = 0;
JSON = 1;
STREAM_CHUNK = 2;
}

message SinkWriterStreamRequest {
message StartSink {
SinkParam sink_param = 1;
SinkPayloadFormat format = 2;
// Deprecated: SinkPayloadFormat format = 2;
reserved "format";
reserved 2;
TableSchema payload_schema = 3;
}

Expand Down
4 changes: 1 addition & 3 deletions src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use risingwave_connector::source::datagen::{
use risingwave_connector::source::{
Column, DataType, SourceContext, SourceEnumeratorContext, SplitEnumerator, SplitReader,
};
use risingwave_pb::connector_service::SinkPayloadFormat;
use risingwave_stream::executor::test_utils::prelude::ColumnDesc;
use risingwave_stream::executor::{Barrier, Message, MessageStreamItem, StreamExecutorError};
use serde::{Deserialize, Deserializer};
Expand Down Expand Up @@ -472,9 +471,8 @@ async fn main() {
sink_from_name: "not_need_set".to_string(),
};
let sink = build_sink(sink_param).unwrap();
let mut sink_writer_param = SinkWriterParam::for_test();
let sink_writer_param = SinkWriterParam::for_test();
println!("Start Sink Bench!, Wait {:?}s", BENCH_TIME);
sink_writer_param.connector_params.sink_payload_format = SinkPayloadFormat::StreamChunk;
tokio::spawn(async move {
dispatch_sink!(sink, sink, {
consume_log_stream(sink, mock_range_log_reader, sink_writer_param).boxed()
Expand Down
4 changes: 0 additions & 4 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ pub struct ComputeNodeOpts {
#[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
pub meta_address: MetaAddressStrategy,

/// Payload format of connector sink rpc
#[clap(long, env = "RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT")]
pub connector_rpc_sink_payload_format: Option<String>,

/// The path of `risingwave.toml` configuration file.
///
/// If empty, default configuration values will be used.
Expand Down
20 changes: 0 additions & 20 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_pb::common::WorkerType;
use risingwave_pb::compute::config_service_server::ConfigServiceServer;
use risingwave_pb::connector_service::SinkPayloadFormat;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::meta::add_worker_node_request::Property;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
Expand Down Expand Up @@ -334,28 +333,9 @@ pub async fn compute_node_serve(
config.server.metrics_level,
);

info!(
"connector param: payload_format={:?}",
opts.connector_rpc_sink_payload_format
);

let connector_params = risingwave_connector::ConnectorParams {
sink_payload_format: match opts.connector_rpc_sink_payload_format.as_deref() {
None | Some("stream_chunk") => SinkPayloadFormat::StreamChunk,
Some("json") => SinkPayloadFormat::Json,
_ => {
unreachable!(
"invalid sink payload format: {:?}. Should be either json or stream_chunk",
opts.connector_rpc_sink_payload_format
)
}
},
};

// Initialize the streaming environment.
let stream_env = StreamEnvironment::new(
advertise_addr.clone(),
connector_params,
stream_config,
worker_id,
state_store,
Expand Down
14 changes: 0 additions & 14 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
use std::time::Duration;

use duration_str::parse_std;
use risingwave_pb::connector_service::SinkPayloadFormat;
use serde::de;

pub mod aws_utils;
Expand All @@ -64,19 +63,6 @@ pub use with_options::WithPropertiesExt;
#[cfg(test)]
mod with_options_test;

#[derive(Clone, Debug, Default)]
pub struct ConnectorParams {
pub sink_payload_format: SinkPayloadFormat,
}

impl ConnectorParams {
pub fn new(sink_payload_format: SinkPayloadFormat) -> Self {
Self {
sink_payload_format,
}
}
}

pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
where
D: de::Deserializer<'de>,
Expand Down
3 changes: 0 additions & 3 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::writer::SinkWriter;
use crate::ConnectorParams;

const BOUNDED_CHANNEL_SIZE: usize = 16;
#[macro_export]
Expand Down Expand Up @@ -288,7 +287,6 @@ impl SinkMetrics {

#[derive(Clone)]
pub struct SinkWriterParam {
pub connector_params: ConnectorParams,
pub executor_id: u64,
pub vnode_bitmap: Option<Bitmap>,
pub meta_client: Option<SinkMetaClient>,
Expand Down Expand Up @@ -326,7 +324,6 @@ impl SinkMetaClient {
impl SinkWriterParam {
pub fn for_test() -> Self {
SinkWriterParam {
connector_params: Default::default(),
executor_id: Default::default(),
vnode_bitmap: Default::default(),
meta_client: Default::default(),
Expand Down
30 changes: 7 additions & 23 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use risingwave_pb::connector_service::sink_writer_stream_request::{
use risingwave_pb::connector_service::{
sink_coordinator_stream_request, sink_coordinator_stream_response, sink_writer_stream_response,
PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata,
SinkPayloadFormat, SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema,
ValidateSinkRequest, ValidateSinkResponse,
SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest,
ValidateSinkResponse,
};
use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::{
Expand All @@ -68,7 +68,6 @@ use crate::sink::{
DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError,
SinkLogReader, SinkMetrics, SinkParam, SinkWriterParam,
};
use crate::ConnectorParams;

macro_rules! def_remote_sink {
() => {
Expand All @@ -82,7 +81,6 @@ macro_rules! def_remote_sink {
{ HttpJava, HttpJavaSink, "http" }
}
};
() => {};
({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr }) => {
#[derive(Debug)]
pub struct $variant_name;
Expand Down Expand Up @@ -283,7 +281,7 @@ impl RemoteLogSinker {
request_sender,
response_stream,
} = EmbeddedConnectorClient::new()?
.start_sink_writer_stream(payload_schema, sink_proto, SinkPayloadFormat::StreamChunk)
.start_sink_writer_stream(payload_schema, sink_proto)
.await?;

let sink_metrics = writer_param.sink_metrics;
Expand Down Expand Up @@ -547,12 +545,8 @@ impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
"sink needs coordination should not have singleton input"
))
})?,
CoordinatedRemoteSinkWriter::new(
self.param.clone(),
writer_param.connector_params,
writer_param.sink_metrics.clone(),
)
.await?,
CoordinatedRemoteSinkWriter::new(self.param.clone(), writer_param.sink_metrics.clone())
.await?,
)
.await?
.into_log_sinker(writer_param.sink_metrics))
Expand All @@ -572,18 +566,10 @@ pub struct CoordinatedRemoteSinkWriter {
}

impl CoordinatedRemoteSinkWriter {
pub async fn new(
param: SinkParam,
connector_params: ConnectorParams,
sink_metrics: SinkMetrics,
) -> Result<Self> {
pub async fn new(param: SinkParam, sink_metrics: SinkMetrics) -> Result<Self> {
let sink_proto = param.to_proto();
let stream_handle = EmbeddedConnectorClient::new()?
.start_sink_writer_stream(
sink_proto.table_schema.clone(),
sink_proto,
connector_params.sink_payload_format,
)
.start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto)
.await?;

Ok(Self {
Expand Down Expand Up @@ -717,13 +703,11 @@ impl EmbeddedConnectorClient {
&self,
payload_schema: Option<TableSchema>,
sink_proto: PbSinkParam,
sink_payload_format: SinkPayloadFormat,
) -> Result<SinkWriterStreamHandle<JniSinkWriterStreamRequest>> {
let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
SinkWriterStreamRequest {
request: Some(SinkRequest::Start(StartSink {
sink_param: Some(sink_proto),
format: sink_payload_format as i32,
payload_schema,
})),
},
Expand Down
5 changes: 0 additions & 5 deletions src/connector/src/source/reader/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::parser::additional_columns::add_partition_offset_cols;
use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
use crate::source::monitor::SourceMetrics;
use crate::source::{SourceColumnDesc, SourceColumnType, UPSTREAM_SOURCE_KEY};
use crate::ConnectorParams;

pub const DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE: usize = 16;

Expand Down Expand Up @@ -59,7 +58,6 @@ pub struct SourceDescBuilder {
row_id_index: Option<usize>,
with_properties: HashMap<String, String>,
source_info: PbStreamSourceInfo,
connector_params: ConnectorParams,
connector_message_buffer_size: usize,
pk_indices: Vec<usize>,
}
Expand All @@ -72,7 +70,6 @@ impl SourceDescBuilder {
row_id_index: Option<usize>,
with_properties: HashMap<String, String>,
source_info: PbStreamSourceInfo,
connector_params: ConnectorParams,
connector_message_buffer_size: usize,
pk_indices: Vec<usize>,
) -> Self {
Expand All @@ -82,7 +79,6 @@ impl SourceDescBuilder {
row_id_index,
with_properties,
source_info,
connector_params,
connector_message_buffer_size,
pk_indices,
}
Expand Down Expand Up @@ -223,7 +219,6 @@ pub mod test_utils {
row_id_index,
with_properties,
source_info,
connector_params: Default::default(),
connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE,
pk_indices,
}
Expand Down
2 changes: 0 additions & 2 deletions src/rpc_client/src/connector_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,13 @@ impl ConnectorClient {
&self,
payload_schema: Option<TableSchema>,
sink_proto: PbSinkParam,
sink_payload_format: SinkPayloadFormat,
) -> Result<SinkWriterStreamHandle> {
let mut rpc_client = self.rpc_client.clone();
let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
SinkWriterStreamRequest {
request: Some(SinkRequest::Start(StartSink {
payload_schema,
sink_param: Some(sink_proto),
format: sink_payload_format as i32,
})),
},
|rx| async move {
Expand Down
1 change: 0 additions & 1 deletion src/stream/src/from_proto/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ impl ExecutorBuilder for SinkExecutorBuilder {
);

let sink_write_param = SinkWriterParam {
connector_params: params.env.connector_params(),
executor_id: params.executor_id,
vnode_bitmap: params.vnode_bitmap.clone(),
meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient),
Expand Down
1 change: 0 additions & 1 deletion src/stream/src/from_proto/source/fs_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
source.row_id_index.map(|x| x as _),
source.with_properties.clone(),
source_info.clone(),
params.env.connector_params(),
params.env.config().developer.connector_message_buffer_size,
params.info.pk_indices.clone(),
);
Expand Down
1 change: 0 additions & 1 deletion src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ pub fn create_source_desc_builder(
row_id_index.map(|x| x as _),
with_properties,
source_info,
params.env.connector_params(),
params.env.config().developer.connector_message_buffer_size,
// `pk_indices` is used to ensure that a message will be skipped instead of parsed
// with null pk when the pk column is missing.
Expand Down
12 changes: 0 additions & 12 deletions src/stream/src/task/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use risingwave_common::config::StreamingConfig;
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common::util::addr::HostAddr;
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_connector::ConnectorParams;
use risingwave_dml::dml_manager::DmlManagerRef;
use risingwave_rpc_client::MetaClient;
use risingwave_storage::StateStoreImpl;
Expand All @@ -33,9 +32,6 @@ pub struct StreamEnvironment {
/// Endpoint the stream manager listens on.
server_addr: HostAddr,

/// Parameters used by connector nodes.
connector_params: ConnectorParams,

/// Streaming related configurations.
config: Arc<StreamingConfig>,

Expand Down Expand Up @@ -65,7 +61,6 @@ impl StreamEnvironment {
#[allow(clippy::too_many_arguments)]
pub fn new(
server_addr: HostAddr,
connector_params: ConnectorParams,
config: Arc<StreamingConfig>,
worker_id: WorkerNodeId,
state_store: StateStoreImpl,
Expand All @@ -76,7 +71,6 @@ impl StreamEnvironment {
) -> Self {
StreamEnvironment {
server_addr,
connector_params,
config,
worker_id,
state_store,
Expand All @@ -93,11 +87,9 @@ impl StreamEnvironment {
pub fn for_test() -> Self {
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_pb::connector_service::SinkPayloadFormat;
use risingwave_storage::monitor::MonitoredStorageMetrics;
StreamEnvironment {
server_addr: "127.0.0.1:5688".parse().unwrap(),
connector_params: ConnectorParams::new(SinkPayloadFormat::Json),
config: Arc::new(StreamingConfig::default()),
worker_id: WorkerNodeId::default(),
state_store: StateStoreImpl::shared_in_memory_store(Arc::new(
Expand Down Expand Up @@ -127,10 +119,6 @@ impl StreamEnvironment {
self.state_store.clone()
}

pub fn connector_params(&self) -> ConnectorParams {
self.connector_params.clone()
}

pub fn dml_manager_ref(&self) -> DmlManagerRef {
self.dml_manager.clone()
}
Expand Down
Loading