Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: support to flatten json object in greptime_identity pipeline #5358

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/pipeline/src/etl/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,22 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Reached max nested levels when flattening JSON object: {max_nested_levels}"
))]
ReachedMaxNestedLevels {
max_nested_levels: usize,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize to json: {}", input))]
SerializeToJson {
input: String,
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down
151 changes: 146 additions & 5 deletions src/pipeline/src/etl/transform/transformer/greptime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, Semant
use coerce::{coerce_columns, coerce_value};
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use serde_json::{Map, Number};
use serde_json::{Map, Number, Value as JsonValue};
use snafu::ResultExt;

use crate::etl::error::{
IdentifyPipelineColumnTypeMismatchSnafu, Result, TransformColumnNameMustBeUniqueSnafu,
TransformEmptySnafu, TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu,
IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result,
SerializeToJsonSnafu, TransformColumnNameMustBeUniqueSnafu, TransformEmptySnafu,
TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu,
UnsupportedNumberTypeSnafu,
};
use crate::etl::field::{InputFieldInfo, OneInputOneOutputField};
Expand All @@ -38,6 +40,7 @@ use crate::etl::transform::{Transform, Transformer, Transforms};
use crate::etl::value::{Timestamp, Value};

const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;

/// fields not in the columns will be discarded
/// to prevent automatic column creation in GreptimeDB
Expand Down Expand Up @@ -376,7 +379,10 @@ fn identity_pipeline_inner<'a>(
let mut schema_info = SchemaInfo::default();
for value in array {
if let serde_json::Value::Object(map) = value {
let row = json_value_to_row(&mut schema_info, map)?;
let row = json_value_to_row(
&mut schema_info,
flatten_json_object(map, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)?,
)?;
rows.push(row);
}
}
Expand Down Expand Up @@ -437,11 +443,72 @@ pub fn identity_pipeline(
}
}

/// Takes ownership of a JSON object and flattens it into a single-level object.
///
/// Keys of nested objects are joined to their parent key with a `.` separator, and
/// arrays are stored as their serialized JSON string.
///
/// `max_nested_levels` limits how deeply the input may be nested; an error is returned
/// as soon as an object nested deeper than that limit is encountered. An empty input
/// object short-circuits to an empty result without running the depth check.
pub fn flatten_json_object(
    object: Map<String, JsonValue>,
    max_nested_levels: usize,
) -> Result<Map<String, JsonValue>> {
    let mut flattened = Map::new();

    // Nothing to flatten: hand back the empty map right away.
    if object.is_empty() {
        return Ok(flattened);
    }

    // The top-level object counts as nesting level 1; the helper recurses from there.
    do_flatten_json_object(&mut flattened, None, object, 1, max_nested_levels)?;
    Ok(flattened)
}

/// Recursively flattens `object` into `dest`, joining nested keys with `.`.
///
/// - `dest`: the single-level map that receives every flattened entry.
/// - `base`: key prefix accumulated from the enclosing objects; `None` at the top level.
/// - `object`: the (possibly nested) object to flatten; it is consumed.
/// - `current_level`: nesting level of `object`; `flatten_json_object` starts it at 1.
/// - `max_nested_levels`: a `ReachedMaxNestedLevels` error is returned once
///   `current_level` exceeds this value.
///
/// Nested objects are recursed into, arrays are serialized to a JSON string (failing
/// with `SerializeToJson` if serialization fails), and every other value is inserted
/// unchanged.
fn do_flatten_json_object(
dest: &mut Map<String, JsonValue>,
base: Option<&str>,
object: Map<String, JsonValue>,
current_level: usize,
max_nested_levels: usize,
) -> Result<()> {
// For safety, we do not allow the nesting depth to be greater than `max_nested_levels`.
if current_level > max_nested_levels {
return ReachedMaxNestedLevelsSnafu { max_nested_levels }.fail();
}

for (key, value) in object {
// Prefix the key with the parent path when there is one, e.g. `a` + `b` -> `a.b`.
let new_key = base.map_or_else(|| key.clone(), |base_key| format!("{base_key}.{key}"));

match value {
JsonValue::Object(object) => {
do_flatten_json_object(
dest,
Some(&new_key),
object,
current_level + 1,
max_nested_levels,
)?;
}
// To simplify log-collection scenarios, arrays are converted into a string that can be easily handled by full-text search.
// NOTE(review): the `input` string below is built eagerly, i.e. also when serialization succeeds.
JsonValue::Array(array) => {
let array_str = serde_json::to_string(&array).context(SerializeToJsonSnafu {
input: format!("{array:?}"),
Copy link
Preview

Copilot AI Jan 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using format! with {:?} for large arrays could lead to performance issues. Consider using a more efficient serialization method.

Suggested change
input: format!("{array:?}"),
input: serde_json::to_string(&array).unwrap_or_else(|_| "<serialization error>".to_string()),

Copilot is powered by AI, so mistakes are possible. Review output carefully before use.

Positive Feedback
Negative Feedback

Provide additional feedback

Please help us improve GitHub Copilot by sharing more details about this comment.

Please select one or more of the options
})?;
dest.insert(new_key, JsonValue::String(array_str));
}
// Scalars (and null) are stored as-is under the flattened key.
_ => {
dest.insert(new_key, value);
}
}
}

Ok(())
}

#[cfg(test)]
mod tests {
use api::v1::SemanticType;

use crate::etl::transform::transformer::greptime::identity_pipeline_inner;
use crate::etl::transform::transformer::greptime::{
flatten_json_object, identity_pipeline_inner,
};
use crate::identity_pipeline;

#[test]
Expand Down Expand Up @@ -585,4 +652,78 @@ mod tests {
);
}
}

#[test]
fn test_flatten() {
    // Each case is `(input, max nested levels, expected)`, where `expected` is `None`
    // when flattening is supposed to fail.
    let test_cases = vec![
        // Basic case.
        (
            serde_json::json!({
                "a": {"b": {"c": [1, 2, 3]}},
                "d": ["foo", "bar"],
                "e": {
                    "f": [7, 8, 9],
                    "g": {"h": 123, "i": "hello", "j": {"k": true}}
                }
            }),
            10,
            Some(serde_json::json!({
                "a.b.c": "[1,2,3]",
                "d": "[\"foo\",\"bar\"]",
                "e.f": "[7,8,9]",
                "e.g.h": 123,
                "e.g.i": "hello",
                "e.g.j.k": true
            })),
        ),
        // Test the case where the object has more than 3 nested levels.
        (
            serde_json::json!({
                "a": {"b": {"c": {"d": [1, 2, 3]}}},
                "e": ["foo", "bar"]
            }),
            3,
            None,
        ),
    ];

    for (input, max_depth, expected) in test_cases {
        let object = input.as_object().unwrap().clone();
        match flatten_json_object(object, max_depth) {
            // Success must match the expected flattened object exactly.
            Ok(actual) => assert_eq!(&actual, expected.unwrap().as_object().unwrap()),
            // Failure is only acceptable when no output was expected.
            Err(_) => assert_eq!(None, expected),
        }
    }
}
}
Loading