diff --git a/Cargo.lock b/Cargo.lock
index c08c420fbd7de..a3709bc50a332 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7308,6 +7308,8 @@ dependencies = [
  "tokio-util",
  "tonic 0.9.2",
  "tracing",
+ "tracing-futures",
+ "tracing-test",
  "url",
  "urlencoding",
  "workspace-hack",
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index b322d1b6eb4b6..707fb24651995 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -129,6 +129,7 @@ tokio-util = { version = "0.7", features = ["codec", "io"] }
 tonic = { workspace = true }
 tonic_0_9 = { package = "tonic", version = "0.9" }
 tracing = "0.1"
+tracing-futures = { version = "0.2", features = ["futures-03"] }
 url = "2"
 urlencoding = "2"
 
@@ -140,6 +141,7 @@ criterion = { workspace = true, features = ["async_tokio", "async"] }
 prost-types = "0.12"
 rand = "0.8"
 tempfile = "3"
+tracing-test = "0.2"
 
 [build-dependencies]
 prost-build = "0.12"
diff --git a/src/connector/src/parser/canal/simd_json_parser.rs b/src/connector/src/parser/canal/simd_json_parser.rs
index e9d15368cbf7c..f6ddbac07ecab 100644
--- a/src/connector/src/parser/canal/simd_json_parser.rs
+++ b/src/connector/src/parser/canal/simd_json_parser.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use itertools::Itertools;
 use risingwave_common::error::ErrorCode::{self, ProtocolError};
 use risingwave_common::error::{Result, RwError};
 use simd_json::{BorrowedValue, Mutable, ValueAccess};
@@ -89,24 +90,23 @@ impl CanalJsonParser {
                     "'data' is missing for creating event".to_string(),
                 ))
             })?;
+
         let mut errors = Vec::new();
-        let mut guard = None;
         for event in events.drain(..) {
             let accessor = JsonAccess::new_with_options(event, &JsonParseOptions::CANAL);
             match apply_row_operation_on_stream_chunk_writer((op, accessor), &mut writer) {
-                Ok(this_guard) => guard = Some(this_guard),
+                Ok(_) => {}
                 Err(err) => errors.push(err),
             }
         }
-        if let Some(guard) = guard {
-            if !errors.is_empty() {
-                tracing::error!(?errors, "failed to parse some columns");
-            }
-            Ok(guard)
+
+        if errors.is_empty() {
+            Ok(())
         } else {
             Err(RwError::from(ErrorCode::InternalError(format!(
-                "failed to parse all columns: {:?}",
-                errors
+                "failed to parse {} row(s) in a single canal json message: {}",
+                errors.len(),
+                errors.iter().join(", ")
             ))))
         }
     }
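Note the changed contract: the loop keeps going after a bad row so the remaining rows in the same Canal message still get written, and the collected errors are surfaced once at the end instead of being merely logged. The shape of the pattern, as a standalone sketch with simplified types:

    /// Applies `f` to every item, deferring all failures into a single summary error.
    fn process_all<T>(
        items: Vec<T>,
        mut f: impl FnMut(T) -> Result<(), String>,
    ) -> Result<(), String> {
        let mut errors = Vec::new();
        for item in items {
            if let Err(e) = f(item) {
                // Keep going: later rows may still be written successfully.
                errors.push(e);
            }
        }
        if errors.is_empty() {
            Ok(())
        } else {
            Err(format!("{} failure(s): {}", errors.len(), errors.join(", ")))
        }
    }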
diff --git a/src/connector/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs
index 238595f82c1f1..da8882eaf1ed6 100644
--- a/src/connector/src/parser/csv_parser.rs
+++ b/src/connector/src/parser/csv_parser.rs
@@ -12,22 +12,23 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::str::FromStr;
-
-use anyhow::anyhow;
-use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
+use risingwave_common::error::ErrorCode::ProtocolError;
 use risingwave_common::error::{ErrorCode, Result, RwError};
-use risingwave_common::types::{Date, Datum, Decimal, ScalarImpl, Time, Timestamp, Timestamptz};
+use risingwave_common::types::{Date, Decimal, Time, Timestamp, Timestamptz};
 
+use super::unified::{AccessError, AccessResult};
 use super::{ByteStreamSourceParser, CsvProperties};
 use crate::only_parse_payload;
 use crate::parser::SourceStreamChunkRowWriter;
 use crate::source::{DataType, SourceColumnDesc, SourceContext, SourceContextRef};
 
-macro_rules! to_rust_type {
+macro_rules! parse {
     ($v:ident, $t:ty) => {
-        $v.parse::<$t>()
-            .map_err(|_| anyhow!("failed parse {} from {}", stringify!($t), $v))?
+        $v.parse::<$t>().map_err(|_| AccessError::TypeError {
+            expected: stringify!($t).to_owned(),
+            got: "string".to_owned(),
+            value: $v.to_string(),
+        })
     };
 }
 
@@ -74,29 +75,26 @@ impl CsvParser {
     }
 
     #[inline]
-    fn parse_string(dtype: &DataType, v: String) -> Result<Datum> {
+    fn parse_string(dtype: &DataType, v: String) -> AccessResult {
         let v = match dtype {
             // mysql use tinyint to represent boolean
-            DataType::Boolean => ScalarImpl::Bool(to_rust_type!(v, i16) != 0),
-            DataType::Int16 => ScalarImpl::Int16(to_rust_type!(v, i16)),
-            DataType::Int32 => ScalarImpl::Int32(to_rust_type!(v, i32)),
-            DataType::Int64 => ScalarImpl::Int64(to_rust_type!(v, i64)),
-            DataType::Float32 => ScalarImpl::Float32(to_rust_type!(v, f32).into()),
-            DataType::Float64 => ScalarImpl::Float64(to_rust_type!(v, f64).into()),
+            DataType::Boolean => (parse!(v, i16)? != 0).into(),
+            DataType::Int16 => parse!(v, i16)?.into(),
+            DataType::Int32 => parse!(v, i32)?.into(),
+            DataType::Int64 => parse!(v, i64)?.into(),
+            DataType::Float32 => parse!(v, f32)?.into(),
+            DataType::Float64 => parse!(v, f64)?.into(),
             // FIXME: decimal should have more precision than f64
-            DataType::Decimal => Decimal::from_str(v.as_str())
-                .map_err(|_| anyhow!("parse decimal from string err {}", v))?
-                .into(),
+            DataType::Decimal => parse!(v, Decimal)?.into(),
             DataType::Varchar => v.into(),
-            DataType::Date => ScalarImpl::Date(to_rust_type!(v, Date)),
-            DataType::Time => ScalarImpl::Time(to_rust_type!(v, Time)),
-            DataType::Timestamp => ScalarImpl::Timestamp(to_rust_type!(v, Timestamp)),
-            DataType::Timestamptz => ScalarImpl::Timestamptz(to_rust_type!(v, Timestamptz)),
+            DataType::Date => parse!(v, Date)?.into(),
+            DataType::Time => parse!(v, Time)?.into(),
+            DataType::Timestamp => parse!(v, Timestamp)?.into(),
+            DataType::Timestamptz => parse!(v, Timestamptz)?.into(),
             _ => {
-                return Err(RwError::from(InternalError(format!(
-                    "CSV data source not support type {}",
-                    dtype
-                ))))
+                return Err(AccessError::UnsupportedType {
+                    ty: dtype.to_string(),
+                })
             }
         };
         Ok(Some(v))
@@ -109,12 +107,12 @@ impl CsvParser {
         mut writer: SourceStreamChunkRowWriter<'_>,
     ) -> Result<()> {
         let mut fields = self.read_row(&payload)?;
+
         if let Some(headers) = &mut self.headers {
             if headers.is_empty() {
                 *headers = fields;
-                // Here we want a row, but got nothing. So it's an error for the `parse_inner` but
-                // has no bad impact on the system.
-                return Err(RwError::from(ProtocolError("This message indicates a header, no row will be inserted. However, internal parser state was updated.".to_string())));
+                // A header line carries no data row, so we return early without writing one.
+                return Ok(());
             }
             writer.insert(|desc| {
                 if let Some(i) = headers.iter().position(|name| name == &desc.name) {
@@ -126,7 +124,7 @@ impl CsvParser {
                 } else {
                     Ok(None)
                 }
-            })
+            })?;
         } else {
             fields.reverse();
             writer.insert(|desc| {
@@ -138,8 +136,10 @@ impl CsvParser {
                 } else {
                     Ok(None)
                 }
-            })
+            })?;
         }
+
+        Ok(())
     }
 }
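The `parse!` macro above leans on `FromStr` for every listed type, just as the removed `to_rust_type!` did, but maps failures into the unified `AccessError::TypeError` instead of an ad-hoc `anyhow` error. Hand-expanding one arm (illustrative, not generated code):

    // `parse!(v, i32)` expands to roughly:
    let parsed: Result<i32, AccessError> = v.parse::<i32>().map_err(|_| AccessError::TypeError {
        expected: "i32".to_owned(), // stringify!(i32)
        got: "string".to_owned(),
        value: v.to_string(),
    });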
 
diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs
index e98a57650d308..0fab32bc1adb9 100644
--- a/src/connector/src/parser/debezium/debezium_parser.rs
+++ b/src/connector/src/parser/debezium/debezium_parser.rs
@@ -111,7 +111,7 @@ impl DebeziumParser {
                 if let Ok(transaction_control) = row_op.transaction_control() {
                     Ok(ParseResult::TransactionControl(transaction_control))
                 } else {
-                    Err(err)
+                    Err(err)?
                 }
             }
         }
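The only change in this file is `Err(err)` becoming `Err(err)?`: the inner call now yields an `AccessError` while this function still returns an `RwError`-based `Result`, so `?` performs the `From<AccessError> for RwError` conversion (the impl kept at the bottom of `unified/util.rs`). Sibling parsers spell the same conversion as `.map_err(Into::into)`. In miniature, with stand-in types:

    #[derive(Debug)]
    struct AccessError;
    #[derive(Debug)]
    struct RwError;

    impl From<AccessError> for RwError {
        fn from(_: AccessError) -> Self {
            RwError
        }
    }

    fn inner() -> Result<(), AccessError> {
        Err(AccessError)
    }

    fn outer() -> Result<(), RwError> {
        // `?` converts the error through `From`, equivalent to `.map_err(Into::into)`.
        Ok(inner()?)
    }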
diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs
index 5d8738a3fa865..cd10fb2dc21f5 100644
--- a/src/connector/src/parser/debezium/mongo_json_parser.rs
+++ b/src/connector/src/parser/debezium/mongo_json_parser.rs
@@ -99,7 +99,7 @@ impl DebeziumMongoJsonParser {
 
         let row_op = DebeziumChangeEvent::with_value(MongoProjection::new(accessor));
 
-        apply_row_operation_on_stream_chunk_writer(row_op, &mut writer)
+        apply_row_operation_on_stream_chunk_writer(row_op, &mut writer).map_err(Into::into)
     }
 }
 
diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs
index 40befaf0f60e7..ca4313c43edcb 100644
--- a/src/connector/src/parser/debezium/simd_json_parser.rs
+++ b/src/connector/src/parser/debezium/simd_json_parser.rs
@@ -447,7 +447,9 @@ mod tests {
             assert_json_eq(&row[12], "{\"k1\": \"v1_updated\", \"k2\": 33}");
         }
 
+        #[cfg(not(madsim))] // Traced tests do not work with madsim
         #[tokio::test]
+        #[tracing_test::traced_test]
         async fn test2_debezium_json_parser_overflow() {
             let columns = vec![
                 SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
@@ -478,23 +480,19 @@ mod tests {
                     r#"{{"payload":{{"before":null,"after":{{"O_KEY":{},"O_BOOL":{},"O_TINY":{},"O_INT":{},"O_REAL":{},"O_DOUBLE":{}}},"source":{{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null}},"op":"c","ts_ms":1678158055464,"transaction":null}}}}"#,
                     values[0], values[1], values[2], values[3], values[4], values[5]
                 ).as_bytes().to_vec();
-                let e = parser
+
+                let res = parser
                     .parse_inner(None, Some(data), builder.row_writer())
-                    .await
-                    .unwrap_err();
-                println!("{}", e);
+                    .await;
                 if i < 5 {
                     // For other overflow, the parsing succeeds but the type conversion fails
-                    assert!(
-                        e.to_string().contains("AccessError: TypeError"),
-                        "i={i}, actual error: {e}"
-                    );
+                    // The errors are ignored and logged.
+                    res.unwrap();
+                    assert!(logs_contain("Expected type"), "{i}");
                 } else {
                     // For f64 overflow, the parsing fails
-                    assert!(
-                        e.to_string().contains("InvalidNumber"),
-                        "i={i}, actual error: {e}"
-                    );
+                    let e = res.unwrap_err();
+                    assert!(e.to_string().contains("InvalidNumber"), "{i}: {e}");
                 }
             }
         }
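The rewritten assertions depend on the new `tracing-test` dev-dependency: `#[traced_test]` installs a log-capturing subscriber for the test, and the `logs_contain` helper it brings into scope does substring matching on the captured output, which is how the test can observe errors that are now logged instead of returned. The general shape, with an illustrative event:

    #[tokio::test]
    #[tracing_test::traced_test]
    async fn warnings_are_observable() {
        tracing::warn!("failed to parse non-pk column, padding with `NULL`");
        // `logs_contain` is provided by `#[traced_test]`.
        assert!(logs_contain("padding with `NULL`"));
    }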
diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs
index b3e108a242ab7..c173599f7d501 100644
--- a/src/connector/src/parser/json_parser.rs
+++ b/src/connector/src/parser/json_parser.rs
@@ -15,6 +15,7 @@
 use std::collections::HashMap;
 
 use apache_avro::Schema;
+use itertools::{Either, Itertools};
 use jst::{convert_avro, Context};
 use risingwave_common::error::ErrorCode::{self, InternalError, ProtocolError};
 use risingwave_common::error::{Result, RwError};
@@ -111,29 +112,27 @@ impl JsonParser {
         let value = simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..])
             .map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
         let values = if let simd_json::BorrowedValue::Array(arr) = value {
-            arr
+            Either::Left(arr.into_iter())
         } else {
-            vec![value]
+            Either::Right(std::iter::once(value))
         };
+
         let mut errors = Vec::new();
-        let mut guard = None;
         for value in values {
             let accessor = JsonAccess::new(value);
             match apply_row_accessor_on_stream_chunk_writer(accessor, &mut writer) {
-                Ok(this_guard) => guard = Some(this_guard),
+                Ok(_) => {}
                 Err(err) => errors.push(err),
             }
         }
 
-        if let Some(guard) = guard {
-            if !errors.is_empty() {
-                tracing::error!(?errors, "failed to parse some columns");
-            }
-            Ok(guard)
+        if errors.is_empty() {
+            Ok(())
         } else {
             Err(RwError::from(ErrorCode::InternalError(format!(
-                "failed to parse all columns: {:?}",
-                errors
+                "failed to parse {} row(s) in a single json message: {}",
+                errors.len(),
+                errors.iter().join(", ")
             ))))
         }
     }
diff --git a/src/connector/src/parser/maxwell/maxwell_parser.rs b/src/connector/src/parser/maxwell/maxwell_parser.rs
index 0980472021bff..8bee0bbfef1e5 100644
--- a/src/connector/src/parser/maxwell/maxwell_parser.rs
+++ b/src/connector/src/parser/maxwell/maxwell_parser.rs
@@ -61,7 +61,7 @@ impl MaxwellParser {
         let payload_accessor = self.payload_builder.generate_accessor(payload).await?;
         let row_op = MaxwellChangeEvent::new(payload_accessor);
 
-        apply_row_operation_on_stream_chunk_writer(row_op, &mut writer)
+        apply_row_operation_on_stream_chunk_writer(row_op, &mut writer).map_err(Into::into)
     }
 }
 
diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index 7446f76a600b8..45c62609f5bf0 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -34,13 +34,14 @@ use risingwave_pb::catalog::{
     SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy, StreamSourceInfo,
 };
 pub use schema_registry::name_strategy_from_str;
+use tracing_futures::Instrument;
 
 use self::avro::AvroAccessBuilder;
 use self::bytes_parser::BytesAccessBuilder;
 pub use self::mysql::mysql_row_to_datums;
 use self::plain_parser::PlainParser;
 use self::simd_json_parser::DebeziumJsonAccessBuilder;
-use self::unified::AccessImpl;
+use self::unified::{AccessImpl, AccessResult};
 use self::upsert_parser::UpsertParser;
 use self::util::get_kafka_topic;
 use crate::aws_auth::AwsAuthProps;
@@ -128,6 +129,13 @@ impl SourceStreamChunkBuilder {
 
 /// `SourceStreamChunkRowWriter` is responsible to write one or more records to the [`StreamChunk`],
 /// where each contains either one row (Insert/Delete) or two rows (Update) that can be written atomically.
+///
+/// Callers are supposed to call one of the `insert`, `delete`, or `update` methods to write a record,
+/// providing a closure that produces one or two [`Datum`]s from the corresponding [`SourceColumnDesc`].
+/// Specifically,
+/// - only columns with [`SourceColumnType::Normal`] need to be handled;
+/// - errors for non-primary-key columns are ignored and the field is padded with `NULL` instead;
+/// - other errors are propagated.
 pub struct SourceStreamChunkRowWriter<'a> {
     descs: &'a [SourceColumnDesc],
     builders: &'a mut [ArrayBuilderImpl],
@@ -282,15 +290,33 @@ impl SourceStreamChunkRowWriter<'_> {
 impl SourceStreamChunkRowWriter<'_> {
     fn do_action<A: OpAction>(
         &mut self,
-        mut f: impl FnMut(&SourceColumnDesc) -> Result<A::Output>,
-    ) -> Result<()> {
-        let mut f_with_meta = |desc: &SourceColumnDesc| {
+        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output>,
+    ) -> AccessResult<()> {
+        let mut wrapped_f = |desc: &SourceColumnDesc| {
             if let Some(meta_value) =
                 (self.row_meta.as_ref()).and_then(|row_meta| row_meta.value_for_column(desc))
             {
+                // For meta columns, fill in the metadata.
                 Ok(A::output_for(meta_value))
             } else {
-                f(desc)
+                // For normal columns, call the user-provided closure.
+                match f(desc) {
+                    Ok(output) => Ok(output),
+
+                    // Propagate the error if a primary-key column fails to parse.
+                    Err(e) if desc.is_pk => Err(e),
+                    // Ignore errors on other columns and fill in `NULL` instead.
+                    Err(error) => {
+                        // TODO: figure out a way to fill in the not-null default value if the user specifies one
+                        // TODO: decide which errors should not be ignored (e.g., the message not even being valid Debezium)
+                        tracing::warn!(
+                            %error,
+                            column = desc.name,
+                            "failed to parse non-pk column, padding with `NULL`"
+                        );
+                        Ok(A::output_for(Datum::None))
+                    }
+                }
             }
         };
 
@@ -300,7 +326,7 @@ impl SourceStreamChunkRowWriter<'_> {
         let result = (self.descs.iter())
             .zip_eq_fast(self.builders.iter_mut())
             .try_for_each(|(desc, builder)| {
-                f_with_meta(desc).map(|output| {
+                wrapped_f(desc).map(|output| {
                     A::apply(builder, output);
                     applied_columns.push(builder);
                 })
@@ -312,7 +338,6 @@ impl SourceStreamChunkRowWriter<'_> {
                 Ok(())
             }
             Err(e) => {
-                tracing::warn!("failed to parse source data: {}", e);
                 for builder in applied_columns {
                     A::rollback(builder);
                 }
@@ -321,36 +346,36 @@ impl SourceStreamChunkRowWriter<'_> {
         }
     }
 
-    /// Write an `Insert` record to the [`StreamChunk`].
-    ///
-    /// # Arguments
+    /// Write an `Insert` record to the [`StreamChunk`], with the given fallible closure that
+    /// produces one [`Datum`] from the corresponding [`SourceColumnDesc`].
     ///
-    /// * `f`: A failable closure that produced one [`Datum`] by corresponding [`SourceColumnDesc`].
-    ///   Callers only need to handle columns with the type [`SourceColumnType::Normal`].
-    pub fn insert(&mut self, f: impl FnMut(&SourceColumnDesc) -> Result<Datum>) -> Result<()> {
+    /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
+    pub fn insert(
+        &mut self,
+        f: impl FnMut(&SourceColumnDesc) -> AccessResult<Datum>,
+    ) -> AccessResult<()> {
         self.do_action::<OpActionInsert>(f)
     }
 
-    /// Write a `Delete` record to the [`StreamChunk`].
-    ///
-    /// # Arguments
+    /// Write a `Delete` record to the [`StreamChunk`], with the given fallible closure that
+    /// produces one [`Datum`] from the corresponding [`SourceColumnDesc`].
     ///
-    /// * `f`: A failable closure that produced one [`Datum`] by corresponding [`SourceColumnDesc`].
-    ///   Callers only need to handle columns with the type [`SourceColumnType::Normal`].
-    pub fn delete(&mut self, f: impl FnMut(&SourceColumnDesc) -> Result<Datum>) -> Result<()> {
+    /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
+    pub fn delete(
+        &mut self,
+        f: impl FnMut(&SourceColumnDesc) -> AccessResult<Datum>,
+    ) -> AccessResult<()> {
         self.do_action::<OpActionDelete>(f)
     }
 
-    /// Write a `Update` record to the [`StreamChunk`].
+    /// Write an `Update` record to the [`StreamChunk`], with the given fallible closure that
+    /// produces two [`Datum`]s as the old and new values from the corresponding [`SourceColumnDesc`].
     ///
-    /// # Arguments
-    ///
-    /// * `f`: A failable closure that produced two [`Datum`]s as old and new value by corresponding
-    ///   [`SourceColumnDesc`]. Callers only need to handle columns with the type [`SourceColumnType::Normal`].
+    /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
     pub fn update(
         &mut self,
-        f: impl FnMut(&SourceColumnDesc) -> Result<(Datum, Datum)>,
-    ) -> Result<()> {
+        f: impl FnMut(&SourceColumnDesc) -> AccessResult<(Datum, Datum)>,
+    ) -> AccessResult<()> {
         self.do_action::<OpActionUpdate>(f)
     }
 }
@@ -380,7 +405,9 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
     /// The source context, used to report parsing error.
     fn source_ctx(&self) -> &SourceContext;
 
-    /// Parse one record from the given `payload` and write it to the `writer`.
+    /// Parse one record from the given `payload` and write rows to the `writer`.
+    ///
+    /// Returns an error if **any** of the rows in the message fail to parse.
     fn parse_one<'a>(
         &'a mut self,
         key: Option<Vec<u8>>,
@@ -388,11 +415,13 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
         writer: SourceStreamChunkRowWriter<'a>,
     ) -> impl Future<Output = Result<()>> + Send + 'a;
 
-    /// Parse one record from the given `payload`, either write it to the `writer` or interpret it
+    /// Parse one record from the given `payload`, either write rows to the `writer` or interpret it
     /// as a transaction control message.
     ///
     /// The default implementation forwards to [`ByteStreamSourceParser::parse_one`] for
     /// non-transactional sources.
+    ///
+    /// Returns an error if **any** of the rows in the message fail to parse.
     fn parse_one_with_txn<'a>(
         &'a mut self,
         key: Option<Vec<u8>>,
@@ -402,7 +431,10 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
         self.parse_one(key, payload, writer)
             .map_ok(|_| ParseResult::Rows)
     }
+}
 
+#[easy_ext::ext(SourceParserIntoStreamExt)]
+impl<P: ByteStreamSourceParser> P {
     /// Parse a data stream of one source split into a stream of [`StreamChunk`].
     ///
     /// # Arguments
@@ -411,9 +443,17 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
     ///
     /// # Returns
     ///
-    /// A [`crate::source::BoxSourceWithStateStream`] which is a stream of parsed msgs.
-    fn into_stream(self, data_stream: BoxSourceStream) -> impl SourceWithStateStream {
-        into_chunk_stream(self, data_stream)
+    /// A [`SourceWithStateStream`] which is a stream of parsed messages.
+    pub fn into_stream(self, data_stream: BoxSourceStream) -> impl SourceWithStateStream {
+        // Enable tracing to provide more information for parsing failures.
+        let source_info = &self.source_ctx().source_info;
+        let span = tracing::info_span!(
+            "source_parser",
+            actor_id = source_info.actor_id,
+            source_id = source_info.source_id.table_id()
+        );
+
+        into_chunk_stream(self, data_stream).instrument(span)
     }
 }
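Moving `into_stream` from the trait body into an `#[easy_ext::ext]` block turns it into a blanket extension method that individual parsers can no longer override. The macro expands, roughly, to an extension trait plus a blanket impl; a self-contained sketch of the same mechanism with made-up names:

    use easy_ext::ext;

    trait Parser {
        fn parse(&self) -> u32;
    }

    // Expands to `trait ParserExt` with a blanket impl for every `P: Parser`,
    // so `into_double` has exactly one implementation.
    #[ext(ParserExt)]
    impl<P: Parser> P {
        fn into_double(self) -> u32 {
            self.parse() * 2
        }
    }

    struct MyParser;
    impl Parser for MyParser {
        fn parse(&self) -> u32 {
            21
        }
    }

    // `MyParser.into_double()` now returns 42 via the generated extension trait.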
 
@@ -474,6 +514,11 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
             if msg.key.is_none() && msg.payload.is_none() {
                 continue;
             }
+            let parse_span = tracing::info_span!(
+                "parse_one",
+                split_id = msg.split_id.as_ref(),
+                offset = msg.offset
+            );
 
             split_offset_mapping.insert(msg.split_id, msg.offset.clone());
 
@@ -487,9 +532,12 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
                         offset: msg.offset,
                     }),
                 )
+                .instrument(parse_span.clone())
                 .await
             {
-                Ok(ParseResult::Rows) => {
+                // It's possible that parsing multiple rows in a single message PARTIALLY failed.
+                // We still have to maintain the row count of the current transaction in this case.
+                res @ (Ok(ParseResult::Rows) | Err(_)) => {
                     // The number of rows added to the builder.
                     let num = builder.op_num() - old_op_num;
 
@@ -497,20 +545,25 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
                     if let Some(Transaction { len, .. }) = &mut current_transaction {
                         *len += num;
                     }
+
+                    if let Err(error) = res {
+                        tracing::error!(parent: &parse_span, %error, "failed to parse message, skipping");
+                        parser.source_ctx().report_user_source_error(error);
+                    }
                 }
 
                 Ok(ParseResult::TransactionControl(txn_ctl)) => {
                     match txn_ctl {
                         TransactionControl::Begin { id } => {
                             if let Some(Transaction { id: current_id, .. }) = &current_transaction {
-                                tracing::warn!(current_id, id, "already in transaction");
+                                tracing::warn!(parent: &parse_span, current_id, id, "already in transaction");
                             }
                             current_transaction = Some(Transaction { id, len: 0 });
                         }
                         TransactionControl::Commit { id } => {
                             let current_id = current_transaction.as_ref().map(|t| &t.id);
                             if current_id != Some(&id) {
-                                tracing::warn!(?current_id, id, "transaction id mismatch");
+                                tracing::warn!(parent: &parse_span, ?current_id, id, "transaction id mismatch");
                             }
                             current_transaction = None;
                         }
@@ -526,13 +579,6 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
                         };
                     }
                 }
-
-                Err(error) => {
-                    tracing::warn!(%error, "message parsing failed, skipping");
-                    // Skip for batch
-                    parser.source_ctx().report_user_source_error(error);
-                    continue;
-                }
             }
         }
 
@@ -614,11 +660,11 @@ pub enum ByteStreamSourceParserImpl {
     CanalJson(CanalJsonParser),
 }
 
-pub type ParserStream = impl SourceWithStateStream + Unpin;
+pub type ParsedStreamImpl = impl SourceWithStateStream + Unpin;
 
 impl ByteStreamSourceParserImpl {
     /// Converts this parser into a stream of [`StreamChunk`].
-    pub fn into_stream(self, msg_stream: BoxSourceStream) -> ParserStream {
+    pub fn into_stream(self, msg_stream: BoxSourceStream) -> ParsedStreamImpl {
         #[auto_enum(futures03::Stream)]
         let stream = match self {
             Self::Csv(parser) => parser.into_stream(msg_stream),
diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs
index 53f87660de031..0b42b1e66ab7c 100644
--- a/src/connector/src/parser/plain_parser.rs
+++ b/src/connector/src/parser/plain_parser.rs
@@ -62,7 +62,7 @@ impl PlainParser {
     ) -> Result<()> {
         let accessor = self.payload_builder.generate_accessor(payload).await?;
 
-        apply_row_accessor_on_stream_chunk_writer(accessor, &mut writer)
+        apply_row_accessor_on_stream_chunk_writer(accessor, &mut writer).map_err(Into::into)
     }
 }
 
diff --git a/src/connector/src/parser/unified/mod.rs b/src/connector/src/parser/unified/mod.rs
index 39ea6acfb3c7d..b76f22e59d58d 100644
--- a/src/connector/src/parser/unified/mod.rs
+++ b/src/connector/src/parser/unified/mod.rs
@@ -32,7 +32,7 @@ pub mod protobuf;
 pub mod upsert;
 pub mod util;
 
-pub type AccessResult = std::result::Result<Datum, AccessError>;
+pub type AccessResult<T = Datum> = std::result::Result<T, AccessError>;
 
 /// Access a certain field in an object according to the path
 pub trait Access {
@@ -87,14 +87,16 @@ where
 
 #[derive(Error, Debug)]
 pub enum AccessError {
-    #[error("Undefined {name} at {path}")]
+    #[error("Undefined field `{name}` at `{path}`")]
     Undefined { name: String, path: String },
-    #[error("TypeError {expected} expected, got {got} {value}")]
+    #[error("Expected type `{expected}` but got `{got}` for `{value}`")]
     TypeError {
         expected: String,
         got: String,
         value: String,
     },
+    #[error("Unsupported data type `{ty}`")]
+    UnsupportedType { ty: String },
     #[error(transparent)]
     Other(#[from] anyhow::Error),
 }
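The default type parameter keeps every existing use of the bare `AccessResult` alias (meaning `Result<Datum, AccessError>`) compiling unchanged, while letting the row-writer methods spell `AccessResult<()>` or `AccessResult<A::Output>`. For instance, reusing this module's `Datum` and `AccessError`:

    type AccessResult<T = Datum> = std::result::Result<T, AccessError>;

    fn access_field() -> AccessResult {
        // Still `Result<Datum, AccessError>`, exactly as before the change.
        Ok(None)
    }

    fn write_row() -> AccessResult<()> {
        Ok(())
    }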
diff --git a/src/connector/src/parser/unified/util.rs b/src/connector/src/parser/unified/util.rs
index 38b83a3fc00d2..92cf5da3ac81c 100644
--- a/src/connector/src/parser/unified/util.rs
+++ b/src/connector/src/parser/unified/util.rs
@@ -13,73 +13,28 @@
 // limitations under the License.
 
 use risingwave_common::error::{ErrorCode, RwError};
-use risingwave_common::types::Datum;
 
-use super::{Access, AccessError, ChangeEvent};
+use super::{Access, AccessError, AccessResult, ChangeEvent};
 use crate::parser::unified::ChangeEventOperation;
 use crate::parser::SourceStreamChunkRowWriter;
-
-pub fn apply_delete_on_stream_chunk_writer(
-    row_op: impl ChangeEvent,
-    writer: &mut SourceStreamChunkRowWriter<'_>,
-) -> std::result::Result<(), RwError> {
-    writer.delete(|column| {
-        let res = row_op.access_field(&column.name, &column.data_type);
-        match res {
-            Ok(datum) => Ok(datum),
-            Err(e) => {
-                tracing::error!(name=column.name, data_type=%column.data_type, err=%e, "delete column error");
-                if column.is_pk {
-                    // It should be an error when pk column is missing in the message
-                    Err(e)?
-                } else {
-                    Ok(None)
-                }
-            }
-        }
-    })
-}
-
-pub fn apply_upsert_on_stream_chunk_writer(
-    row_op: impl ChangeEvent,
-    writer: &mut SourceStreamChunkRowWriter<'_>,
-) -> std::result::Result<(), RwError> {
-    writer.insert(|column| {
-        let res = match row_op.access_field(&column.name, &column.data_type) {
-            Ok(o) => Ok(o),
-            Err(AccessError::Undefined { name, .. }) if !column.is_pk && name == column.name => {
-                // Fill in null value for non-pk column
-                // TODO: figure out a way to fill in not-null default value if user specifies one
-                Ok(None)
-            }
-            Err(e) => Err(e),
-        };
-        tracing::trace!(
-            "inserted {:?} {:?} is_pk:{:?} {:?} ",
-            &column.name,
-            &column.data_type,
-            &column.is_pk,
-            res
-        );
-        Ok(res?)
-    })
-}
+use crate::source::SourceColumnDesc;
 
 pub fn apply_row_operation_on_stream_chunk_writer_with_op(
     row_op: impl ChangeEvent,
     writer: &mut SourceStreamChunkRowWriter<'_>,
     op: ChangeEventOperation,
-) -> std::result::Result<(), RwError> {
+) -> AccessResult<()> {
+    let f = |column: &SourceColumnDesc| row_op.access_field(&column.name, &column.data_type);
     match op {
-        ChangeEventOperation::Upsert => apply_upsert_on_stream_chunk_writer(row_op, writer),
-        ChangeEventOperation::Delete => apply_delete_on_stream_chunk_writer(row_op, writer),
+        ChangeEventOperation::Upsert => writer.insert(f),
+        ChangeEventOperation::Delete => writer.delete(f),
     }
 }
 
 pub fn apply_row_operation_on_stream_chunk_writer(
     row_op: impl ChangeEvent,
     writer: &mut SourceStreamChunkRowWriter<'_>,
-) -> std::result::Result<(), RwError> {
+) -> AccessResult<()> {
     let op = row_op.op()?;
     apply_row_operation_on_stream_chunk_writer_with_op(row_op, writer, op)
 }
@@ -87,37 +42,8 @@ pub fn apply_row_operation_on_stream_chunk_writer(
 pub fn apply_row_accessor_on_stream_chunk_writer(
     accessor: impl Access,
     writer: &mut SourceStreamChunkRowWriter<'_>,
-) -> Result<(), RwError> {
-    writer.insert(|column| {
-        let res: Result<Datum, RwError> = match accessor
-            .access(&[&column.name], Some(&column.data_type))
-        {
-            Ok(o) => Ok(o),
-            Err(AccessError::Undefined { name, .. }) if !column.is_pk && name == column.name => {
-                // Fill in null value for non-pk column
-                // TODO: figure out a way to fill in not-null default value if user specifies one
-                Ok(None)
-            }
-            Err(e) => {
-                // if want to discard the row, return Err(e) here
-                tracing::warn!(
-                    "access error when fetch {} with type {}, err {:?}. Fill None instead.",
-                    &column.name,
-                    &column.data_type,
-                    e
-                );
-                Ok(None)
-            }
-        };
-        tracing::trace!(
-            "inserted {:?} {:?} is_pk:{:?} {:?} ",
-            &column.name,
-            &column.data_type,
-            &column.is_pk,
-            res
-        );
-        res
-    })
+) -> AccessResult<()> {
+    writer.insert(|column| accessor.access(&[&column.name], Some(&column.data_type)))
 }
 
 impl From<AccessError> for RwError {
diff --git a/src/connector/src/parser/upsert_parser.rs b/src/connector/src/parser/upsert_parser.rs
index 381e429565d11..edc250947d754 100644
--- a/src/connector/src/parser/upsert_parser.rs
+++ b/src/connector/src/parser/upsert_parser.rs
@@ -117,6 +117,7 @@ impl UpsertParser {
         }
 
         apply_row_operation_on_stream_chunk_writer_with_op(row_op, &mut writer, change_event_op)
+            .map_err(Into::into)
     }
 }
 
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index cd3ee0360009f..e59225c8de510 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -688,9 +688,7 @@ impl GlobalBarrierManager {
         };
 
         // Tracing related stuff
-        prev_epoch.span().in_scope(|| {
-            tracing::info!(target: "rw_tracing", epoch = curr_epoch.value().0, "new barrier enqueued");
-        });
+        tracing::info!(target: "rw_tracing", parent: prev_epoch.span(), epoch = curr_epoch.value().0, "new barrier enqueued");
         span.record("epoch", curr_epoch.value().0);
 
         let command_ctx = Arc::new(CommandContext::new(
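The `in_scope` closure is replaced by the `parent:` directive that `tracing`'s event macros accept natively: the event is recorded under the given span without entering it, so no guard or closure is needed. Equivalently, with an illustrative span:

    let span = tracing::info_span!("prev_epoch");
    // Records the event under `span` without entering it.
    tracing::info!(parent: &span, epoch = 42u64, "new barrier enqueued");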