Skip to content

Commit

Permalink
feat: report the error causes cdc connector exit to grafana (#14611)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Jan 24, 2024
1 parent bf777a0 commit 9382565
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 21 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,16 @@ def section_streaming_cdc(outer_panels):
),
],
),
panels.timeseries_count(
"CDC Source Errors",
"",
[
panels.target(
f"sum({metric('cdc_source_error')}) by (connector_name, source_id, error_msg)",
"{{connector_name}}: {{error_msg}} ({{source_id}})",
),
],
),
],
),
]
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.risingwave.connector.api.source.*;
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.connector.source.common.DbzSourceUtils;
import com.risingwave.java.binding.Binding;
import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
import io.debezium.config.CommonConnectorConfig;
import io.grpc.stub.StreamObserver;
Expand All @@ -32,7 +33,7 @@ public class DbzCdcEngineRunner implements CdcEngineRunner {

private final ExecutorService executor;
private final AtomicBoolean running = new AtomicBoolean(false);
private final CdcEngine engine;
private CdcEngine engine;
private final DbzConnectorConfig config;

public static CdcEngineRunner newCdcEngineRunner(
Expand All @@ -58,17 +59,19 @@ public static CdcEngineRunner newCdcEngineRunner(
}
});

runner = new DbzCdcEngineRunner(engine, config);
runner = new DbzCdcEngineRunner(config);
runner.withEngine(engine);
} catch (Exception e) {
LOG.error("failed to create the CDC engine", e);
}
return runner;
}

public static CdcEngineRunner create(DbzConnectorConfig config) {
DbzCdcEngineRunner runner = null;
public static CdcEngineRunner create(DbzConnectorConfig config, long channelPtr) {
DbzCdcEngineRunner runner = new DbzCdcEngineRunner(config);
try {
var sourceId = config.getSourceId();
final DbzCdcEngineRunner finalRunner = runner;
var engine =
new DbzCdcEngine(
config.getSourceId(),
Expand All @@ -80,27 +83,46 @@ public static CdcEngineRunner create(DbzConnectorConfig config) {
sourceId,
message,
error);
String errorMsg =
(error != null ? error.getMessage() : message);
if (!Binding.sendCdcSourceErrorToChannel(
channelPtr, errorMsg)) {
LOG.warn(
"engine#{} unable to send error message: {}",
sourceId,
errorMsg);
}
// We need to stop the engine runner on debezium engine failure
try {
finalRunner.stop();
} catch (Exception e) {
LOG.warn("failed to stop the engine#{}", sourceId, e);
}
} else {
LOG.info("engine#{} stopped normally. {}", sourceId, message);
}
});

runner = new DbzCdcEngineRunner(engine, config);
runner.withEngine(engine);
} catch (Exception e) {
LOG.error("failed to create the CDC engine", e);
runner = null;
}
return runner;
}

// private constructor
private DbzCdcEngineRunner(CdcEngine engine, DbzConnectorConfig config) {
private DbzCdcEngineRunner(DbzConnectorConfig config) {
this.executor =
Executors.newSingleThreadExecutor(
r -> new Thread(r, "rw-dbz-engine-runner-" + engine.getId()));
this.engine = engine;
r -> new Thread(r, "rw-dbz-engine-runner-" + config.getSourceId()));
this.config = config;
}

/**
 * Attaches the CDC engine to this runner by assigning it to the {@code engine} field.
 *
 * <p>The factory methods construct the runner first and call this afterwards. In
 * {@code create}, the engine's completion callback keeps a reference to the runner so that
 * it can call {@code stop()} when the engine fails, which means the runner has to exist
 * before the engine is built.
 *
 * @param engine the engine this runner should hold
 */
private void withEngine(CdcEngine engine) {
this.engine = engine;
}

/** Start to run the cdc engine */
public boolean start() throws InterruptedException {
if (isRunning()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long
}

public void start(long channelPtr) {
var runner = DbzCdcEngineRunner.create(config);
var runner = DbzCdcEngineRunner.create(config, channelPtr);
if (runner == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ private void logIfEnabled(int level, String format, Object arg1, Object arg2) {

private void logIfEnabled(int level, String format, Object... arguments) {
if (TracingSlf4jImpl.isEnabled(level)) {
TracingSlf4jImpl.event(
name, level, new ParameterizedMessage(format, arguments).getFormattedMessage());
var pm = new ParameterizedMessage(format, arguments);
if (null != pm.getThrowable()) {
logIfEnabled(level, pm.getFormattedMessage(), pm.getThrowable());
} else {
TracingSlf4jImpl.event(name, level, pm.getFormattedMessage());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public static native void tracingSlf4jEvent(

public static native boolean sendCdcSourceMsgToChannel(long channelPtr, byte[] msg);

public static native boolean sendCdcSourceErrorToChannel(long channelPtr, String errorMsg);

public static native com.risingwave.java.binding.JniSinkWriterStreamRequest
recvSinkWriterRequestFromChannel(long channelPtr);

Expand Down
9 changes: 9 additions & 0 deletions src/common/src/metrics/error_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub struct ErrorMetrics {
pub user_compute_error: ErrorMetricRef<4>,
pub user_source_reader_error: ErrorMetricRef<5>,
pub user_source_error: ErrorMetricRef<5>,
pub cdc_source_error: ErrorMetricRef<3>,
}

impl ErrorMetrics {
Expand Down Expand Up @@ -123,6 +124,12 @@ impl ErrorMetrics {
"table_id",
],
)),
// cdc source is singleton, so we use source_id to identify the connector
cdc_source_error: Arc::new(ErrorMetric::new(
"cdc_source_error",
"CDC source errors in the system, queryable by tags",
&["connector_name", "source_id", "error_msg"],
)),
}
}

Expand All @@ -132,6 +139,7 @@ impl ErrorMetrics {
&self.user_compute_error.desc,
&self.user_source_reader_error.desc,
&self.user_source_error.desc,
&self.cdc_source_error.desc,
]
}

Expand All @@ -141,6 +149,7 @@ impl ErrorMetrics {
self.user_compute_error.collect(),
self.user_source_reader_error.collect(),
self.user_source_error.collect(),
self.cdc_source_error.collect(),
]
}
}
Expand Down
28 changes: 20 additions & 8 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use async_trait::async_trait;
use futures_async_stream::try_stream;
use itertools::Itertools;
use prost::Message;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::util::addr::HostAddr;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_jni_core::{call_static_method, JniReceiverType, JniSenderType};
Expand Down Expand Up @@ -202,14 +203,25 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
let metrics = self.source_ctx.metrics.clone();

while let Some(result) = rx.recv().await {
let GetEventStreamResponse { events, .. } = result?;
tracing::trace!("receive {} cdc events ", events.len());
metrics
.connector_source_rows_received
.with_label_values(&[source_type.as_str_name(), &source_id])
.inc_by(events.len() as u64);
let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
yield msgs;
match result {
Ok(GetEventStreamResponse { events, .. }) => {
tracing::trace!("receive {} cdc events ", events.len());
metrics
.connector_source_rows_received
.with_label_values(&[source_type.as_str_name(), &source_id])
.inc_by(events.len() as u64);
let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
yield msgs;
}
Err(e) => {
GLOBAL_ERROR_METRICS.cdc_source_error.report([
source_type.as_str_name().into(),
source_id.clone(),
e.to_string(),
]);
Err(e)?;
}
}
}

Err(anyhow!("all senders are dropped"))?;
Expand Down
22 changes: 22 additions & 0 deletions src/jni_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,28 @@ extern "system" fn Java_com_risingwave_java_binding_Binding_sendCdcSourceMsgToCh
})
}

/// JNI entry point behind `Binding.sendCdcSourceErrorToChannel(long, String)`.
///
/// Reads the Java string `msg`, wraps it in an `anyhow` error and pushes it as an
/// `Err` item into the sender that `channel` points to, using a blocking send.
///
/// Returns `JNI_TRUE` when the send succeeds. When the send fails, the failure is
/// logged through `tracing::error!` and `JNI_FALSE` is returned.
///
/// Panics (via `expect`) if `msg` cannot be read as a Java string.
// NOTE(review): a null `msg` from the Java side would presumably make `get_string`
// fail and trigger the `expect` below; confirm callers never pass null.
#[no_mangle]
extern "system" fn Java_com_risingwave_java_binding_Binding_sendCdcSourceErrorToChannel<'a>(
    env: EnvParam<'a>,
    channel: Pointer<'a, JniSenderType<GetEventStreamResponse>>,
    msg: JString<'a>,
) -> jboolean {
    execute_and_catch(env, move |env| {
        // Copy the message out of the JVM into an owned Rust string.
        let java_str = env
            .get_string(&msg)
            .expect("source error message should be a java string");
        let err_msg = String::from(java_str);

        // Forward the error to the receiving side; report failure to the caller
        // through the boolean result instead of raising.
        if let Err(e) = channel.as_ref().blocking_send(Err(anyhow!(err_msg))) {
            tracing::error!(error = ?e.as_report(), "send error");
            return Ok(JNI_FALSE);
        }

        Ok(JNI_TRUE)
    })
}

pub enum JniSinkWriterStreamRequest {
PbRequest(SinkWriterStreamRequest),
Chunk {
Expand Down
3 changes: 3 additions & 0 deletions src/jni_core/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,8 @@ macro_rules! for_all_plain_native_methods {

public static native boolean sendCdcSourceMsgToChannel(long channelPtr, byte[] msg);

public static native boolean sendCdcSourceErrorToChannel(long channelPtr, String errorMsg);

public static native com.risingwave.java.binding.JniSinkWriterStreamRequest
recvSinkWriterRequestFromChannel(long channelPtr);

Expand Down Expand Up @@ -916,6 +918,7 @@ mod tests {
iteratorGetByteaValue (JI)[B,
iteratorGetArrayValue (JILjava/lang/Class;)Ljava/lang/Object;,
sendCdcSourceMsgToChannel (J[B)Z,
sendCdcSourceErrorToChannel (JLjava/lang/String;)Z,
recvSinkWriterRequestFromChannel (J)Lcom/risingwave/java/binding/JniSinkWriterStreamRequest;,
sendSinkWriterResponseToChannel (J[B)Z,
sendSinkWriterErrorToChannel (JLjava/lang/String;)Z,
Expand Down

0 comments on commit 9382565

Please sign in to comment.