diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index be0bf9b8d6ca8..0c6469361dfbd 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -21,7 +21,7 @@ pub use avro::AvroParserConfig; pub use canal::*; use csv_parser::CsvParser; pub use debezium::*; -use futures::{Future, TryFutureExt, TryStreamExt}; +use futures::{Future, TryFutureExt}; use futures_async_stream::try_stream; pub use json_parser::*; pub use protobuf::*; @@ -572,15 +572,13 @@ impl P { // The parser stream will be long-lived. We use `instrument_with` here to create // a new span for the polling of each chunk. - into_chunk_stream(self, data_stream) - .instrument_with(move || { - tracing::info_span!( - "source_parse_chunk", - actor_id = source_info.actor_id, - source_id = source_info.source_id.table_id() - ) - }) - .map_err(Into::into) // TODO(eh): remove this + into_chunk_stream(self, data_stream).instrument_with(move || { + tracing::info_span!( + "source_parse_chunk", + actor_id = source_info.actor_id, + source_id = source_info.source_id.table_id() + ) + }) } } @@ -730,7 +728,6 @@ async fn into_chunk_stream(mut parser: P, data_stream } pub trait AccessBuilder { - // TODO(eh) async fn generate_accessor(&mut self, payload: Vec) -> anyhow::Result>; } diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index 0f6f2106da543..d62ac0f9a1cdf 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -341,7 +341,6 @@ fn recursive_parse_json( serde_json::Value::Object(ret) } -// TODO(eh): should use `AccessError` pub fn from_protobuf_value( field_desc: &FieldDescriptor, value: &Value,