Skip to content

Commit

Permalink
refactor(connector): remove JsonParser from production code (#17016)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored May 31, 2024
1 parent 0c8b036 commit 9edfd72
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 224 deletions.
82 changes: 81 additions & 1 deletion src/connector/benches/json_vs_plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,90 @@ mod json_common;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use futures::executor::block_on;
use json_common::*;
use old_json_parser::JsonParser;
use risingwave_connector::parser::plain_parser::PlainParser;
use risingwave_connector::parser::{JsonParser, SourceStreamChunkBuilder, SpecificParserConfig};
use risingwave_connector::parser::{SourceStreamChunkBuilder, SpecificParserConfig};
use risingwave_connector::source::SourceContext;

// The original implementation used to parse JSON prior to #13707.
mod old_json_parser {
use anyhow::Context as _;
use itertools::{Either, Itertools as _};
use risingwave_common::{bail, try_match_expand};
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::parser::{
Access as _, EncodingProperties, JsonAccess, SourceStreamChunkRowWriter,
};
use risingwave_connector::source::{SourceColumnDesc, SourceContextRef};

use super::*;

/// Parser for JSON format
#[derive(Debug)]
pub struct JsonParser {
_rw_columns: Vec<SourceColumnDesc>,
_source_ctx: SourceContextRef,
// If schema registry is used, the starting index of payload is 5.
payload_start_idx: usize,
}

impl JsonParser {
pub fn new(
props: SpecificParserConfig,
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
) -> ConnectorResult<Self> {
let json_config = try_match_expand!(props.encoding_config, EncodingProperties::Json)?;
let payload_start_idx = if json_config.use_schema_registry {
5
} else {
0
};
Ok(Self {
_rw_columns: rw_columns,
_source_ctx: source_ctx,
payload_start_idx,
})
}

#[allow(clippy::unused_async)]
pub async fn parse_inner(
&self,
mut payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> ConnectorResult<()> {
let value = simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..])
.context("failed to parse json payload")?;
let values = if let simd_json::BorrowedValue::Array(arr) = value {
Either::Left(arr.into_iter())
} else {
Either::Right(std::iter::once(value))
};

let mut errors = Vec::new();
for value in values {
let accessor = JsonAccess::new(value);
match writer
.insert(|column| accessor.access(&[&column.name], Some(&column.data_type)))
{
Ok(_) => {}
Err(err) => errors.push(err),
}
}

if errors.is_empty() {
Ok(())
} else {
bail!(
"failed to parse {} row(s) in a single json message: {}",
errors.len(),
errors.iter().format(", ")
);
}
}
}
}

fn generate_json_rows() -> Vec<Vec<u8>> {
let mut rng = rand::thread_rng();
let mut records = Vec::with_capacity(NUM_RECORDS);
Expand Down
Loading

0 comments on commit 9edfd72

Please sign in to comment.