diff --git a/proto/meta.proto b/proto/meta.proto index d51e22d444502..aa006a3400b1e 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -769,6 +769,12 @@ message EventLog { string cdc_table_id = 3; string upstream_ddl = 4; } + message EventSinkFail { + uint32 sink_id = 1; + string sink_name = 2; + string connector = 3; + string error = 4; + } // Event logs identifier, which should be populated by event log service. optional string unique_id = 1; // Processing time, which should be populated by event log service. @@ -782,6 +788,7 @@ message EventLog { EventCollectBarrierFail collect_barrier_fail = 8; EventLog.EventWorkerNodePanic worker_node_panic = 9; EventLog.EventAutoSchemaChangeFail auto_schema_change_fail = 10; + EventLog.EventSinkFail sink_fail = 11; } } @@ -795,6 +802,7 @@ message AddEventLogRequest { // A subset of EventLog.event that can be added by non meta node. oneof event { EventLog.EventWorkerNodePanic worker_node_panic = 1; + EventLog.EventSinkFail sink_fail = 2; } } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 56147cbc91755..2bbbb95582447 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -534,6 +534,29 @@ impl SinkMetaClient { } } } + + pub async fn add_sink_fail_evet_log( + &self, + sink_id: u32, + sink_name: String, + connector: String, + error: String, + ) { + match self { + SinkMetaClient::MetaClient(meta_client) => { + match meta_client + .add_sink_fail_evet(sink_id, sink_name, connector, error) + .await + { + Ok(_) => {} + Err(e) => { + tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Fialed to add sink fail event to event log."); + } + } + } + SinkMetaClient::MockMetaClient(_) => {} + } + } } impl SinkWriterParam { diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs index 1b0acfece2d01..ca5ad7efa3d53 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs @@ -63,6 +63,7 @@ fn event_type(e: &Event) -> String { Event::CollectBarrierFail(_) => "COLLECT_BARRIER_FAIL", Event::WorkerNodePanic(_) => "WORKER_NODE_PANIC", Event::AutoSchemaChangeFail(_) => "AUTO_SCHEMA_CHANGE_FAIL", + Event::SinkFail(_) => "SINK_FAIL", } .into() } diff --git a/src/meta/service/src/event_log_service.rs b/src/meta/service/src/event_log_service.rs index c7b148e28a58f..a118d6e9ca267 100644 --- a/src/meta/service/src/event_log_service.rs +++ b/src/meta/service/src/event_log_service.rs @@ -50,6 +50,9 @@ impl EventLogService for EventLogServiceImpl { risingwave_pb::meta::add_event_log_request::Event::WorkerNodePanic(e) => { risingwave_pb::meta::event_log::Event::WorkerNodePanic(e) } + risingwave_pb::meta::add_event_log_request::Event::SinkFail(e) => { + risingwave_pb::meta::event_log::Event::SinkFail(e) + } }; self.event_log_manager.add_event_logs(vec![e]); Ok(Response::new(AddEventLogResponse {})) diff --git a/src/meta/src/manager/event_log.rs b/src/meta/src/manager/event_log.rs index a1fdc356502d9..f45437b6a7d03 100644 --- a/src/meta/src/manager/event_log.rs +++ b/src/meta/src/manager/event_log.rs @@ -172,6 +172,7 @@ impl From<&EventLog> for ChannelId { Event::CollectBarrierFail(_) => 6, Event::WorkerNodePanic(_) => 7, Event::AutoSchemaChangeFail(_) => 8, + Event::SinkFail(_) => 9, } } } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index e8b46ded0ed19..6e5dd813a240b 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1423,6 +1423,26 @@ impl MetaClient { .join(); } + pub async fn add_sink_fail_evet( + &self, + sink_id: u32, + sink_name: String, + connector: String, + error: String, + ) -> Result<()> { + let event = event_log::EventSinkFail { + sink_id, + sink_name, + connector, + error, + }; + let req = AddEventLogRequest { + event: Some(add_event_log_request::Event::SinkFail(event)), + }; + self.inner.add_event_log(req).await?; + Ok(()) + } + pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result { let req = CancelCompactTaskRequest { task_id, diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 91f5ed876155e..3847e36f9a95c 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -517,6 +517,17 @@ impl SinkExecutor { actor_context.fragment_id.to_string(), ]); + if let Some(meta_client) = sink_writer_param.meta_client.as_ref() { + meta_client + .add_sink_fail_evet_log( + sink_writer_param.sink_id.sink_id, + sink_writer_param.sink_name.clone(), + sink_writer_param.connector.clone(), + e.to_report_string(), + ) + .await; + } + match log_reader.rewind().await { Ok((true, curr_vnode_bitmap)) => { warn!( diff --git a/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs b/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs index bb153f0799961..124653946b872 100644 --- a/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs +++ b/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs @@ -64,3 +64,42 @@ async fn test_sink_decouple_err_isolation() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_sink_error_event_logs() -> Result<()> { + let mut cluster = start_sink_test_cluster().await?; + + let source_parallelism = 6; + + let test_sink = SimulationTestSink::register_new(); + test_sink.set_err_rate(1.0); + let test_source = SimulationTestSource::register_new(source_parallelism, 0..100000, 0.2, 20); + + let mut session = cluster.start_session(); + + session.run("set streaming_parallelism = 6").await?; + session.run("set sink_decouple = true").await?; + session.run(CREATE_SOURCE).await?; + session.run(CREATE_SINK).await?; + assert_eq!(6, test_sink.parallelism_counter.load(Relaxed)); + + test_sink.store.wait_for_err(1).await?; + + session.run(DROP_SINK).await?; + session.run(DROP_SOURCE).await?; + + // Due to sink failure isolation, source stream should not be recreated + assert_eq!( + source_parallelism, + test_source.create_stream_count.load(Relaxed) + ); + + // Sink error should be recorded in rw_event_logs + let result = session + .run("select * from rw_event_logs where event_type = 'SINK_FAIL'") + .await?; + assert!(!result.is_empty()); + println!("Sink fail event logs: {:?}", result); + + Ok(()) +} diff --git a/src/tests/simulation/tests/integration_tests/sink/utils.rs b/src/tests/simulation/tests/integration_tests/sink/utils.rs index a134ab1a265ff..bef5bdfa35d0b 100644 --- a/src/tests/simulation/tests/integration_tests/sink/utils.rs +++ b/src/tests/simulation/tests/integration_tests/sink/utils.rs @@ -52,6 +52,7 @@ pub struct TestSinkStoreInner { pub id_name: HashMap>, pub epochs: Vec, pub checkpoint_count: usize, + pub err_count: usize, } #[derive(Clone)] @@ -66,6 +67,7 @@ impl TestSinkStore { id_name: HashMap::new(), epochs: Vec::new(), checkpoint_count: 0, + err_count: 0, })), } } @@ -99,6 +101,10 @@ impl TestSinkStore { self.inner().id_name.len() } + pub fn err_count(&self) -> usize { + self.inner().err_count + } + pub async fn wait_for_count(&self, count: usize) -> anyhow::Result<()> { let mut prev_count = 0; let mut has_printed = false; @@ -122,6 +128,16 @@ impl TestSinkStore { } Ok(()) } + + pub async fn wait_for_err(&self, count: usize) -> anyhow::Result<()> { + loop { + sleep(Duration::from_secs(1)).await; + if self.err_count() >= count { + break; + } + } + Ok(()) + } } pub struct TestWriter { @@ -154,6 +170,7 @@ impl SinkWriter for TestWriter { async fn write_batch(&mut self, chunk: StreamChunk) -> risingwave_connector::sink::Result<()> { if thread_rng().gen_ratio(self.err_rate.load(Relaxed), u32::MAX) { println!("write with err"); + self.store.inner().err_count += 1; return Err(SinkError::Internal(anyhow::anyhow!("fail to write"))); } for (op, row) in chunk.rows() {