Skip to content

Commit

Permalink
refactor(sink): unify log write path and distinguish table sink from …
Browse files Browse the repository at this point in the history
…external sink
  • Loading branch information
wenym1 committed Dec 27, 2023
1 parent 94c4c91 commit 89cd4b2
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 204 deletions.
1 change: 0 additions & 1 deletion proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ message SinkParam {
string db_name = 5;
string sink_from_name = 6;
catalog.SinkFormatDesc format_desc = 7;
optional uint32 target_table = 8;
}

enum SinkPayloadFormat {
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/sink/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ pub trait LogReader: Send + Sized + 'static {
) -> impl Future<Output = LogStoreResult<(bool, Option<Bitmap>)>> + Send + '_;
}

pub trait LogStoreFactory: 'static {
type Reader: LogReader + Send + 'static;
type Writer: LogWriter + Send + 'static;
pub trait LogStoreFactory: Send + 'static {
type Reader: LogReader;
type Writer: LogWriter;

fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
}
Expand Down
22 changes: 6 additions & 16 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod starrocks;
pub mod table;
pub mod test_sink;
pub mod utils;
pub mod writer;
Expand All @@ -46,7 +45,7 @@ use ::redis::RedisError;
use anyhow::anyhow;
use async_trait::async_trait;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableId};
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::error::{anyhow_error, ErrorCode, RwError};
use risingwave_common::metrics::{
LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedIntGauge,
Expand All @@ -62,7 +61,6 @@ use self::catalog::{SinkFormatDesc, SinkType};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::table::TABLE_SINK;
use crate::sink::writer::SinkWriter;
use crate::ConnectorParams;

Expand All @@ -89,8 +87,7 @@ macro_rules! for_all_sinks {
{ Starrocks, $crate::sink::starrocks::StarrocksSink },
{ DeltaLakeRust, $crate::sink::deltalake::DeltaLakeSink },
{ BigQuery, $crate::sink::big_query::BigQuerySink },
{ Test, $crate::sink::test_sink::TestSink },
{ Table, $crate::sink::table::TableSink }
{ Test, $crate::sink::test_sink::TestSink }
}
$(,$arg)*
}
Expand Down Expand Up @@ -151,7 +148,6 @@ pub struct SinkParam {
pub format_desc: Option<SinkFormatDesc>,
pub db_name: String,
pub sink_from_name: String,
pub target_table: Option<TableId>,
}

impl SinkParam {
Expand Down Expand Up @@ -183,7 +179,6 @@ impl SinkParam {
format_desc,
db_name: pb_param.db_name,
sink_from_name: pb_param.sink_from_name,
target_table: pb_param.target_table.map(TableId::new),
}
}

Expand All @@ -199,7 +194,6 @@ impl SinkParam {
format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
db_name: self.db_name.clone(),
sink_from_name: self.sink_from_name.clone(),
target_table: self.target_table.map(|table_id| table_id.table_id()),
}
}

Expand All @@ -225,7 +219,6 @@ impl From<SinkCatalog> for SinkParam {
format_desc: sink_catalog.format_desc,
db_name: sink_catalog.db_name,
sink_from_name: sink_catalog.sink_from_name,
target_table: sink_catalog.target_table,
}
}
}
Expand Down Expand Up @@ -373,13 +366,10 @@ impl SinkImpl {
param.properties.remove(PRIVATE_LINK_TARGET_KEY);
param.properties.remove(CONNECTION_NAME_KEY);

let sink_type = if param.target_table.is_some() {
TABLE_SINK
} else {
param.properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| {
SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY))
})?
};
let sink_type = param
.properties
.get(CONNECTOR_TYPE_KEY)
.ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;

match_sink_name_str!(
sink_type.to_lowercase().as_str(),
Expand Down
76 changes: 0 additions & 76 deletions src/connector/src/sink/table.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/connector/src/source/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,6 @@ mod tests {
format_desc: None,
db_name: "db".into(),
sink_from_name: "table".into(),
target_table: None,
};

let rw_schema = param.schema();
Expand Down
5 changes: 0 additions & 5 deletions src/meta/src/manager/sink_coordination/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,6 @@ mod tests {
format_desc: None,
db_name: "test".into(),
sink_from_name: "test".into(),
target_table: None,
};

let epoch1 = 233;
Expand Down Expand Up @@ -578,7 +577,6 @@ mod tests {
format_desc: None,
db_name: "test".into(),
sink_from_name: "test".into(),
target_table: None,
};

let epoch1 = 233;
Expand Down Expand Up @@ -698,7 +696,6 @@ mod tests {
format_desc: None,
db_name: "test".into(),
sink_from_name: "test".into(),
target_table: None,
};

let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker();
Expand Down Expand Up @@ -738,7 +735,6 @@ mod tests {
format_desc: None,
db_name: "test".into(),
sink_from_name: "test".into(),
target_table: None,
};

let epoch = 233;
Expand Down Expand Up @@ -818,7 +814,6 @@ mod tests {
format_desc: None,
db_name: "test".into(),
sink_from_name: "test".into(),
target_table: None,
};

let epoch = 233;
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use prometheus::{
exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
register_histogram_with_registry, register_int_counter_vec_with_registry,
register_int_counter_with_registry, register_int_gauge_vec_with_registry,
register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, Registry,
register_int_gauge_with_registry, Histogram, IntCounter, IntCounterVec, IntGauge, Registry,
};
use risingwave_common::config::MetricLevel;
use risingwave_common::metrics::{
Expand Down Expand Up @@ -68,8 +68,8 @@ pub struct StreamingMetrics {
pub source_split_change_count: GenericCounterVec<AtomicU64>,

// Sink & materialized view
pub sink_input_row_count: GenericCounterVec<AtomicU64>,
pub mview_input_row_count: GenericCounterVec<AtomicU64>,
pub sink_input_row_count: LabelGuardedIntCounterVec<3>,
pub mview_input_row_count: IntCounterVec,

// Exchange (see also `compute::ExchangeServiceMetrics`)
pub exchange_frag_recv_size: GenericCounterVec<AtomicU64>,
Expand Down Expand Up @@ -232,7 +232,7 @@ impl StreamingMetrics {
)
.unwrap();

let sink_input_row_count = register_int_counter_vec_with_registry!(
let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
"stream_sink_input_row_count",
"Total number of rows streamed into sink executors",
&["sink_id", "actor_id", "fragment_id"],
Expand Down
Loading

0 comments on commit 89cd4b2

Please sign in to comment.