Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: report sink failure event to event log (#18958) #18997

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,12 @@ message EventLog {
string cdc_table_id = 3;
string upstream_ddl = 4;
}
message EventSinkFail {
uint32 sink_id = 1;
string sink_name = 2;
string connector = 3;
string error = 4;
}
// Event logs identifier, which should be populated by event log service.
optional string unique_id = 1;
// Processing time, which should be populated by event log service.
Expand All @@ -782,6 +788,7 @@ message EventLog {
EventCollectBarrierFail collect_barrier_fail = 8;
EventLog.EventWorkerNodePanic worker_node_panic = 9;
EventLog.EventAutoSchemaChangeFail auto_schema_change_fail = 10;
EventLog.EventSinkFail sink_fail = 11;
}
}

Expand All @@ -795,6 +802,7 @@ message AddEventLogRequest {
// A subset of EventLog.event that can be added by non meta node.
oneof event {
EventLog.EventWorkerNodePanic worker_node_panic = 1;
EventLog.EventSinkFail sink_fail = 2;
}
}

Expand Down
23 changes: 23 additions & 0 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,29 @@ impl SinkMetaClient {
}
}
}

pub async fn add_sink_fail_evet_log(
&self,
sink_id: u32,
sink_name: String,
connector: String,
error: String,
) {
match self {
SinkMetaClient::MetaClient(meta_client) => {
match meta_client
.add_sink_fail_evet(sink_id, sink_name, connector, error)
.await
{
Ok(_) => {}
Err(e) => {
tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Fialed to add sink fail event to event log.");
}
}
}
SinkMetaClient::MockMetaClient(_) => {}
}
}
}

impl SinkWriterParam {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ fn event_type(e: &Event) -> String {
Event::CollectBarrierFail(_) => "COLLECT_BARRIER_FAIL",
Event::WorkerNodePanic(_) => "WORKER_NODE_PANIC",
Event::AutoSchemaChangeFail(_) => "AUTO_SCHEMA_CHANGE_FAIL",
Event::SinkFail(_) => "SINK_FAIL",
}
.into()
}
3 changes: 3 additions & 0 deletions src/meta/service/src/event_log_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ impl EventLogService for EventLogServiceImpl {
risingwave_pb::meta::add_event_log_request::Event::WorkerNodePanic(e) => {
risingwave_pb::meta::event_log::Event::WorkerNodePanic(e)
}
risingwave_pb::meta::add_event_log_request::Event::SinkFail(e) => {
risingwave_pb::meta::event_log::Event::SinkFail(e)
}
};
self.event_log_manager.add_event_logs(vec![e]);
Ok(Response::new(AddEventLogResponse {}))
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/manager/event_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl From<&EventLog> for ChannelId {
Event::CollectBarrierFail(_) => 6,
Event::WorkerNodePanic(_) => 7,
Event::AutoSchemaChangeFail(_) => 8,
Event::SinkFail(_) => 9,
}
}
}
20 changes: 20 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,26 @@ impl MetaClient {
.join();
}

pub async fn add_sink_fail_evet(
&self,
sink_id: u32,
sink_name: String,
connector: String,
error: String,
) -> Result<()> {
let event = event_log::EventSinkFail {
sink_id,
sink_name,
connector,
error,
};
let req = AddEventLogRequest {
event: Some(add_event_log_request::Event::SinkFail(event)),
};
self.inner.add_event_log(req).await?;
Ok(())
}

pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
let req = CancelCompactTaskRequest {
task_id,
Expand Down
11 changes: 11 additions & 0 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,17 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
actor_context.fragment_id.to_string(),
]);

if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
meta_client
.add_sink_fail_evet_log(
sink_writer_param.sink_id.sink_id,
sink_writer_param.sink_name.clone(),
sink_writer_param.connector.clone(),
e.to_report_string(),
)
.await;
}

match log_reader.rewind().await {
Ok((true, curr_vnode_bitmap)) => {
warn!(
Expand Down
39 changes: 39 additions & 0 deletions src/tests/simulation/tests/integration_tests/sink/err_isolation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,42 @@ async fn test_sink_decouple_err_isolation() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_sink_error_event_logs() -> Result<()> {
let mut cluster = start_sink_test_cluster().await?;

let source_parallelism = 6;

let test_sink = SimulationTestSink::register_new();
test_sink.set_err_rate(1.0);
let test_source = SimulationTestSource::register_new(source_parallelism, 0..100000, 0.2, 20);

let mut session = cluster.start_session();

session.run("set streaming_parallelism = 6").await?;
session.run("set sink_decouple = true").await?;
session.run(CREATE_SOURCE).await?;
session.run(CREATE_SINK).await?;
assert_eq!(6, test_sink.parallelism_counter.load(Relaxed));

test_sink.store.wait_for_err(1).await?;

session.run(DROP_SINK).await?;
session.run(DROP_SOURCE).await?;

// Due to sink failure isolation, source stream should not be recreated
assert_eq!(
source_parallelism,
test_source.create_stream_count.load(Relaxed)
);

// Sink error should be recorded in rw_event_logs
let result = session
.run("select * from rw_event_logs where event_type = 'SINK_FAIL'")
.await?;
assert!(!result.is_empty());
println!("Sink fail event logs: {:?}", result);

Ok(())
}
17 changes: 17 additions & 0 deletions src/tests/simulation/tests/integration_tests/sink/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct TestSinkStoreInner {
pub id_name: HashMap<i32, Vec<String>>,
pub epochs: Vec<u64>,
pub checkpoint_count: usize,
pub err_count: usize,
}

#[derive(Clone)]
Expand All @@ -66,6 +67,7 @@ impl TestSinkStore {
id_name: HashMap::new(),
epochs: Vec::new(),
checkpoint_count: 0,
err_count: 0,
})),
}
}
Expand Down Expand Up @@ -99,6 +101,10 @@ impl TestSinkStore {
self.inner().id_name.len()
}

pub fn err_count(&self) -> usize {
self.inner().err_count
}

pub async fn wait_for_count(&self, count: usize) -> anyhow::Result<()> {
let mut prev_count = 0;
let mut has_printed = false;
Expand All @@ -122,6 +128,16 @@ impl TestSinkStore {
}
Ok(())
}

pub async fn wait_for_err(&self, count: usize) -> anyhow::Result<()> {
loop {
sleep(Duration::from_secs(1)).await;
if self.err_count() >= count {
break;
}
}
Ok(())
}
}

pub struct TestWriter {
Expand Down Expand Up @@ -154,6 +170,7 @@ impl SinkWriter for TestWriter {
async fn write_batch(&mut self, chunk: StreamChunk) -> risingwave_connector::sink::Result<()> {
if thread_rng().gen_ratio(self.err_rate.load(Relaxed), u32::MAX) {
println!("write with err");
self.store.inner().err_count += 1;
return Err(SinkError::Internal(anyhow::anyhow!("fail to write")));
}
for (op, row) in chunk.rows() {
Expand Down
Loading