refactor(connector): remove JsonParser from production code #17016

Merged · 2 commits · May 31, 2024
src/connector/benches/json_vs_plain_parser.rs (82 changes: 81 additions, 1 deletion)
@@ -20,10 +20,90 @@ mod json_common;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use futures::executor::block_on;
use json_common::*;
use old_json_parser::JsonParser;
use risingwave_connector::parser::plain_parser::PlainParser;
-use risingwave_connector::parser::{JsonParser, SourceStreamChunkBuilder, SpecificParserConfig};
+use risingwave_connector::parser::{SourceStreamChunkBuilder, SpecificParserConfig};
use risingwave_connector::source::SourceContext;

// The original implementation used to parse JSON prior to #13707.
mod old_json_parser {
    use anyhow::Context as _;
    use itertools::{Either, Itertools as _};
    use risingwave_common::{bail, try_match_expand};
    use risingwave_connector::error::ConnectorResult;
    use risingwave_connector::parser::{
        Access as _, EncodingProperties, JsonAccess, SourceStreamChunkRowWriter,
    };
    use risingwave_connector::source::{SourceColumnDesc, SourceContextRef};

    use super::*;

    /// Parser for JSON format
    #[derive(Debug)]
    pub struct JsonParser {
        _rw_columns: Vec<SourceColumnDesc>,
        _source_ctx: SourceContextRef,
        // If schema registry is used, the starting index of payload is 5.
        payload_start_idx: usize,
    }

    impl JsonParser {
        pub fn new(
            props: SpecificParserConfig,
            rw_columns: Vec<SourceColumnDesc>,
            source_ctx: SourceContextRef,
        ) -> ConnectorResult<Self> {
            let json_config = try_match_expand!(props.encoding_config, EncodingProperties::Json)?;
            let payload_start_idx = if json_config.use_schema_registry {
                5
            } else {
                0
            };
            Ok(Self {
                _rw_columns: rw_columns,
                _source_ctx: source_ctx,
                payload_start_idx,
            })
        }

        #[allow(clippy::unused_async)]
        pub async fn parse_inner(
            &self,
            mut payload: Vec<u8>,
            mut writer: SourceStreamChunkRowWriter<'_>,
        ) -> ConnectorResult<()> {
            let value = simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..])
                .context("failed to parse json payload")?;
            let values = if let simd_json::BorrowedValue::Array(arr) = value {
                Either::Left(arr.into_iter())
            } else {
                Either::Right(std::iter::once(value))
            };

            let mut errors = Vec::new();
            for value in values {
                let accessor = JsonAccess::new(value);
                match writer
                    .insert(|column| accessor.access(&[&column.name], Some(&column.data_type)))
                {
                    Ok(_) => {}
                    Err(err) => errors.push(err),
                }
            }

            if errors.is_empty() {
                Ok(())
            } else {
                bail!(
                    "failed to parse {} row(s) in a single json message: {}",
                    errors.len(),
                    errors.iter().format(", ")
                );
            }
        }
    }
}

fn generate_json_rows() -> Vec<Vec<u8>> {
    let mut rng = rand::thread_rng();
    let mut records = Vec::with_capacity(NUM_RECORDS);
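
A note on the hard-coded offset in the inlined parser: payload_start_idx = 5 corresponds to the schema-registry wire format (presumably the Confluent convention), in which each message begins with a one-byte magic value and a four-byte big-endian schema id, followed by the actual JSON document; the old parser simply skips those five bytes. Below is a minimal sketch of that framing, for illustration only; the constant 5 and the use_schema_registry flag are the only details taken from the diff above, while the function name and example values are made up.

// Sketch of the assumed 5-byte schema-registry header: byte 0 is a magic value
// (0x00), bytes 1..5 hold a big-endian schema id, and the payload starts at byte 5.
fn split_schema_registry_header(message: &[u8]) -> Option<(u32, &[u8])> {
    if message.len() < 5 || message[0] != 0 {
        return None; // not framed with a schema-registry header
    }
    let schema_id = u32::from_be_bytes([message[1], message[2], message[3], message[4]]);
    Some((schema_id, &message[5..]))
}

fn main() {
    // 0x00 magic + schema id 42, followed by a tiny JSON document.
    let mut message = vec![0x00, 0, 0, 0, 42];
    message.extend_from_slice(br#"{"id": 1}"#);

    let (schema_id, json) = split_schema_registry_header(&message).unwrap();
    assert_eq!(schema_id, 42);
    assert_eq!(json, &br#"{"id": 1}"#[..]);
    // The inlined JsonParser above does not validate this header; when
    // use_schema_registry is set it unconditionally starts parsing at byte 5.
}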