Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Dec 12, 2023
1 parent b7c156a commit 8add493
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 60 deletions.
3 changes: 3 additions & 0 deletions e2e_test/iceberg/test_case/cdc/load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ CREATE SINK s1 AS select * from products WITH (
primary_key = 'id'
);

statement ok
flush;

query I
select count(*) from products;
----
Expand Down
20 changes: 10 additions & 10 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ use serde_derive::Deserialize;
use url::Url;
use with_options::WithOptions;

use self::prometheus::base_file_metrics_writer::BaseFileWriterWithMetricsBuilder;
use self::prometheus::partition_metrics_writer::FanoutPartitionedWriterWithMetricsBuilder;
use self::prometheus::position_delete_metrics_writer::PositionDeleteWriterWithMetricsBuilder;
use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder;
use self::prometheus::monitored_partition_writer::MonitoredFanoutPartitionedWriterBuilder;
use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
use super::{
Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
Expand Down Expand Up @@ -394,15 +394,15 @@ impl IcebergWriter {
pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
let builder_helper = table.builder_helper()?;

let data_file_builder = DataFileWriterBuilder::new(BaseFileWriterWithMetricsBuilder::new(
let data_file_builder = DataFileWriterBuilder::new(MonitoredBaseFileWriterBuilder::new(
builder_helper
.rolling_writer_builder(builder_helper.parquet_writer_builder(0, None)?)?,
writer_param
.sink_metrics
.iceberg_rolling_unflushed_data_file
.clone(),
));
let partition_data_file_builder = FanoutPartitionedWriterWithMetricsBuilder::new(
let partition_data_file_builder = MonitoredFanoutPartitionedWriterBuilder::new(
builder_helper.fanout_partition_writer_builder(data_file_builder.clone())?,
writer_param.sink_metrics.iceberg_partition_num.clone(),
);
Expand Down Expand Up @@ -434,30 +434,30 @@ impl IcebergWriter {
writer_param: &SinkWriterParam,
) -> Result<Self> {
let builder_helper = table.builder_helper()?;
let data_file_builder = DataFileWriterBuilder::new(BaseFileWriterWithMetricsBuilder::new(
let data_file_builder = DataFileWriterBuilder::new(MonitoredBaseFileWriterBuilder::new(
builder_helper
.rolling_writer_builder(builder_helper.parquet_writer_builder(0, None)?)?,
writer_param
.sink_metrics
.iceberg_rolling_unflushed_data_file
.clone(),
));
let position_delete_builder = PositionDeleteWriterWithMetricsBuilder::new(
let position_delete_builder = MonitoredPositionDeleteWriterBuilder::new(
builder_helper.position_delete_writer_builder(0, 1024)?,
writer_param
.sink_metrics
.iceberg_position_delete_cache_num
.clone(),
);
let euality_delete_builder =
let equality_delete_builder =
builder_helper.equality_delete_writer_builder(unique_column_ids.clone(), 0)?;
let delta_builder = EqualityDeltaWriterBuilder::new(
data_file_builder,
position_delete_builder,
euality_delete_builder,
equality_delete_builder,
unique_column_ids,
);
let partition_delta_builder = FanoutPartitionedWriterWithMetricsBuilder::new(
let partition_delta_builder = MonitoredFanoutPartitionedWriterBuilder::new(
builder_helper.fanout_partition_writer_builder(delta_builder.clone())?,
writer_param.sink_metrics.iceberg_partition_num.clone(),
);
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/sink/iceberg/prometheus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod base_file_metrics_writer;
pub mod partition_metrics_writer;
pub mod position_delete_metrics_writer;
pub mod write_metrics_writer;
pub mod monitored_base_file_writer;
pub mod monitored_partition_writer;
pub mod monitored_position_delete_writer;
pub mod monitored_write_writer;
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ use icelake::Result;
use risingwave_common::metrics::LabelGuardedIntGauge;

#[derive(Clone)]
pub struct BaseFileWriterWithMetricsBuilder<B: FileWriterBuilder> {
pub struct MonitoredBaseFileWriterBuilder<B: FileWriterBuilder> {
inner: BaseFileWriterBuilder<B>,
// metrics
unflush_data_file: LabelGuardedIntGauge<2>,
}

impl<B: FileWriterBuilder> BaseFileWriterWithMetricsBuilder<B> {
impl<B: FileWriterBuilder> MonitoredBaseFileWriterBuilder<B> {
pub fn new(
inner: BaseFileWriterBuilder<B>,
unflush_data_file: LabelGuardedIntGauge<2>,
Expand All @@ -41,11 +41,11 @@ impl<B: FileWriterBuilder> BaseFileWriterWithMetricsBuilder<B> {
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> FileWriterBuilder for BaseFileWriterWithMetricsBuilder<B> {
type R = BaseFileWriterWithMetrics<B>;
impl<B: FileWriterBuilder> FileWriterBuilder for MonitoredBaseFileWriterBuilder<B> {
type R = MonitoredBaseFileWriter<B>;

async fn build(self, schema: &SchemaRef) -> Result<Self::R> {
Ok(BaseFileWriterWithMetrics {
Ok(MonitoredBaseFileWriter {
inner: self.inner.build(schema).await?,
unflush_data_file: self.unflush_data_file,
cur_metrics: BaseFileWriterMetrics {
Expand All @@ -55,7 +55,7 @@ impl<B: FileWriterBuilder> FileWriterBuilder for BaseFileWriterWithMetricsBuilde
}
}

pub struct BaseFileWriterWithMetrics<B: FileWriterBuilder> {
pub struct MonitoredBaseFileWriter<B: FileWriterBuilder> {
inner: BaseFileWriter<B>,

// metrics
Expand All @@ -65,7 +65,7 @@ pub struct BaseFileWriterWithMetrics<B: FileWriterBuilder> {
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> FileWriter for BaseFileWriterWithMetrics<B> {
impl<B: FileWriterBuilder> FileWriter for MonitoredBaseFileWriter<B> {
type R = <<B as FileWriterBuilder>::R as FileWriter>::R;

/// Write a record batch. The `DataFileWriter` will create a new file when the current row num is greater than `target_file_row_num`.
Expand All @@ -91,7 +91,7 @@ impl<B: FileWriterBuilder> FileWriter for BaseFileWriterWithMetrics<B> {
}
}

impl<B: FileWriterBuilder> CurrentFileStatus for BaseFileWriterWithMetrics<B> {
impl<B: FileWriterBuilder> CurrentFileStatus for MonitoredBaseFileWriter<B> {
fn current_file_path(&self) -> String {
self.inner.current_file_path()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use icelake::Result;
use risingwave_common::metrics::LabelGuardedIntGauge;

#[derive(Clone)]
pub struct FanoutPartitionedWriterWithMetricsBuilder<B: IcebergWriterBuilder> {
pub struct MonitoredFanoutPartitionedWriterBuilder<B: IcebergWriterBuilder> {
inner: FanoutPartitionedWriterBuilder<B>,
partition_num: LabelGuardedIntGauge<2>,
}

impl<B: IcebergWriterBuilder> FanoutPartitionedWriterWithMetricsBuilder<B> {
impl<B: IcebergWriterBuilder> MonitoredFanoutPartitionedWriterBuilder<B> {
pub fn new(
inner: FanoutPartitionedWriterBuilder<B>,
partition_num: LabelGuardedIntGauge<2>,
Expand All @@ -39,35 +39,26 @@ impl<B: IcebergWriterBuilder> FanoutPartitionedWriterWithMetricsBuilder<B> {
}

#[async_trait::async_trait]
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for FanoutPartitionedWriterWithMetricsBuilder<B>
where
B::R: IcebergWriter,
{
type R = FanoutPartitionedWriterWithMetrics<B>;
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for MonitoredFanoutPartitionedWriterBuilder<B> {
type R = MonitoredFanoutPartitionedWriter<B>;

async fn build(self, schema: &SchemaRef) -> Result<Self::R> {
let writer = self.inner.build(schema).await?;
Ok(FanoutPartitionedWriterWithMetrics {
Ok(MonitoredFanoutPartitionedWriter {
inner: writer,
partition_num: self.partition_num,
current_metrics: FanoutPartitionedWriterMetrics { partition_num: 0 },
})
}
}

pub struct FanoutPartitionedWriterWithMetrics<B: IcebergWriterBuilder>
where
B::R: IcebergWriter,
{
pub struct MonitoredFanoutPartitionedWriter<B: IcebergWriterBuilder> {
inner: FanoutPartitionedWriter<B>,
partition_num: LabelGuardedIntGauge<2>,
current_metrics: FanoutPartitionedWriterMetrics,
}

impl<B: IcebergWriterBuilder> FanoutPartitionedWriterWithMetrics<B>
where
B::R: IcebergWriter,
{
impl<B: IcebergWriterBuilder> MonitoredFanoutPartitionedWriter<B> {
pub fn update_metrics(&mut self) -> Result<()> {
let last_metrics = std::mem::replace(&mut self.current_metrics, self.inner.metrics());
{
Expand All @@ -80,10 +71,7 @@ where
}

#[async_trait::async_trait]
impl<B: IcebergWriterBuilder> IcebergWriter for FanoutPartitionedWriterWithMetrics<B>
where
B::R: IcebergWriter,
{
impl<B: IcebergWriterBuilder> IcebergWriter for MonitoredFanoutPartitionedWriter<B> {
type R = <FanoutPartitionedWriter<B> as IcebergWriter>::R;

async fn write(&mut self, batch: arrow_array::RecordBatch) -> Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ use icelake::Result;
use risingwave_common::metrics::LabelGuardedIntGauge;

#[derive(Clone)]
pub struct PositionDeleteWriterWithMetricsBuilder<B: FileWriterBuilder> {
pub struct MonitoredPositionDeleteWriterBuilder<B: FileWriterBuilder> {
current_cache_number: LabelGuardedIntGauge<2>,
inner: PositionDeleteWriterBuilder<B>,
}

impl<B: FileWriterBuilder> PositionDeleteWriterWithMetricsBuilder<B> {
impl<B: FileWriterBuilder> MonitoredPositionDeleteWriterBuilder<B> {
pub fn new(
inner: PositionDeleteWriterBuilder<B>,
current_cache_number: LabelGuardedIntGauge<2>,
Expand All @@ -39,13 +39,13 @@ impl<B: FileWriterBuilder> PositionDeleteWriterWithMetricsBuilder<B> {

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder<PositionDeleteInput>
for PositionDeleteWriterWithMetricsBuilder<B>
for MonitoredPositionDeleteWriterBuilder<B>
{
type R = PositionDeleteWriterWithMetrics<B>;
type R = MonitoredPositionDeleteWriter<B>;

async fn build(self, schema: &arrow_schema::SchemaRef) -> Result<Self::R> {
let writer = self.inner.build(schema).await?;
Ok(PositionDeleteWriterWithMetrics {
Ok(MonitoredPositionDeleteWriter {
writer,
cache_number: self.current_cache_number,
current_metrics: PositionDeleteMetrics {
Expand All @@ -55,15 +55,15 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder<PositionDeleteInput>
}
}

pub struct PositionDeleteWriterWithMetrics<B: FileWriterBuilder> {
pub struct MonitoredPositionDeleteWriter<B: FileWriterBuilder> {
writer: PositionDeleteWriter<B>,

// metrics
cache_number: LabelGuardedIntGauge<2>,
current_metrics: PositionDeleteMetrics,
}

impl<B: FileWriterBuilder> PositionDeleteWriterWithMetrics<B> {
impl<B: FileWriterBuilder> MonitoredPositionDeleteWriter<B> {
fn update_metrics(&mut self) -> Result<()> {
let last_metrics = std::mem::replace(&mut self.current_metrics, self.writer.metrics());
{
Expand All @@ -76,9 +76,7 @@ impl<B: FileWriterBuilder> PositionDeleteWriterWithMetrics<B> {
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriter<PositionDeleteInput>
for PositionDeleteWriterWithMetrics<B>
{
impl<B: FileWriterBuilder> IcebergWriter<PositionDeleteInput> for MonitoredPositionDeleteWriter<B> {
type R = <PositionDeleteWriter<B> as IcebergWriter<PositionDeleteInput>>::R;

async fn write(&mut self, input: PositionDeleteInput) -> Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ use icelake::Result;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};

#[derive(Clone)]
pub struct WriteMetricsWriterBuilder<B: IcebergWriterBuilder> {
pub struct MonitoredWriteWriterBuilder<B: IcebergWriterBuilder> {
inner: B,
write_qps: LabelGuardedIntCounter<2>,
write_latency: LabelGuardedHistogram<2>,
}

impl<B: IcebergWriterBuilder> WriteMetricsWriterBuilder<B> {
impl<B: IcebergWriterBuilder> MonitoredWriteWriterBuilder<B> {
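/// Create a builder that wraps `inner` and carries the write-QPS counter and write-latency histogram passed on to the writer it builds.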
pub fn new(
inner: B,
Expand All @@ -42,32 +42,32 @@ impl<B: IcebergWriterBuilder> WriteMetricsWriterBuilder<B> {
}

#[async_trait::async_trait]
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for WriteMetricsWriterBuilder<B> {
type R = WriteMetricsWriter<B::R>;
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for MonitoredWriteWriterBuilder<B> {
type R = MonitoredWriteWriter<B::R>;

async fn build(self, schema: &SchemaRef) -> Result<Self::R> {
let appender = self.inner.build(schema).await?;
Ok(WriteMetricsWriter {
Ok(MonitoredWriteWriter {
appender,
write_qps: self.write_qps,
write_latency: self.write_latency,
})
}
}

pub struct WriteMetricsWriter<F: IcebergWriter> {
pub struct MonitoredWriteWriter<F: IcebergWriter> {
appender: F,
write_qps: LabelGuardedIntCounter<2>,
write_latency: LabelGuardedHistogram<2>,
}

#[async_trait]
impl<F: IcebergWriter> IcebergWriter for WriteMetricsWriter<F> {
impl<F: IcebergWriter> IcebergWriter for MonitoredWriteWriter<F> {
type R = F::R;

async fn write(&mut self, record: RecordBatch) -> Result<()> {
self.write_qps.inc();
let _ = self.write_latency.start_timer();
let _timer = self.write_latency.start_timer();
self.appender.write(record).await
}

Expand Down

0 comments on commit 8add493

Please sign in to comment.