From 6500c1edd93b8cd1bcc949a59d38bfce7f38d669 Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Wed, 20 Sep 2023 18:56:56 +0800 Subject: [PATCH] fix(sink): handle visibility in remote sink (#12463) --- src/common/src/array/arrow.rs | 6 ++++-- src/common/src/array/stream_chunk.rs | 3 +++ src/connector/src/sink/clickhouse.rs | 4 ++++ src/connector/src/sink/remote.rs | 2 +- src/connector/src/sink/utils.rs | 2 +- 5 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/common/src/array/arrow.rs b/src/common/src/array/arrow.rs index 0f89e6b4f53f4..00d418af0010f 100644 --- a/src/common/src/array/arrow.rs +++ b/src/common/src/array/arrow.rs @@ -26,12 +26,14 @@ use crate::types::{Int256, StructType}; use crate::util::iter_util::ZipEqDebug; // Implement bi-directional `From` between `DataChunk` and `arrow_array::RecordBatch`. - -// note: DataChunk -> arrow RecordBatch will IGNORE the visibilities. impl TryFrom<&DataChunk> for arrow_array::RecordBatch { type Error = ArrayError; fn try_from(chunk: &DataChunk) -> Result<Self, Self::Error> { + if !chunk.is_compacted() { + let c = chunk.clone(); + return Self::try_from(&c.compact()); + } let columns: Vec<_> = chunk .columns() .iter() diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs index d072c65ad1826..897bc95ec9992 100644 --- a/src/common/src/array/stream_chunk.rs +++ b/src/common/src/array/stream_chunk.rs @@ -189,6 +189,9 @@ impl StreamChunk { } pub fn to_protobuf(&self) -> PbStreamChunk { + if !self.is_compacted() { + return self.clone().compact().to_protobuf(); + } PbStreamChunk { cardinality: self.cardinality() as u32, ops: self.ops.iter().map(|op| op.to_protobuf() as i32).collect(), diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 187b87397dbf4..841b7fd0c6340 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -301,6 +301,10 @@ impl ClickHouseSinkWriter { )?; for
(op, row) in chunk.rows() { if op != Op::Insert { + tracing::warn!( + "append only click house sink receive an {:?} which will be ignored.", + op + ); continue; } let mut clickhouse_filed_vec = vec![]; diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 851c81b8916d0..83ec62e0d7649 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -350,7 +350,7 @@ where async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { let payload = match self.payload_format { SinkPayloadFormat::Json => { - let mut row_ops = vec![]; + let mut row_ops = Vec::with_capacity(chunk.cardinality()); let enc = JsonEncoder::new(&self.schema, None, TimestampHandlingMode::String); for (op, row_ref) in chunk.rows() { let map = enc.encode(row_ref)?; diff --git a/src/connector/src/sink/utils.rs b/src/connector/src/sink/utils.rs index b68164beea206..fd25b24f8a7da 100644 --- a/src/connector/src/sink/utils.rs +++ b/src/connector/src/sink/utils.rs @@ -21,7 +21,7 @@ use crate::sink::Result; pub fn chunk_to_json(chunk: StreamChunk, schema: &Schema) -> Result<Vec<String>> { let encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli); - let mut records: Vec<String> = Vec::with_capacity(chunk.capacity()); + let mut records: Vec<String> = Vec::with_capacity(chunk.cardinality()); for (_, row) in chunk.rows() { let record = Value::Object(encoder.encode(row)?); records.push(record.to_string());