Skip to content

Commit

Permalink
chore: add more info for pipeline dryrun API (#5232)
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian authored Dec 26, 2024
1 parent 00ad27d commit 0cf44e1
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 5 deletions.
1 change: 1 addition & 0 deletions src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ pub enum Error {

#[snafu(display("Pipeline management api error"))]
Pipeline {
#[snafu(source)]
source: pipeline::error::Error,
#[snafu(implicit)]
location: Location,
Expand Down
31 changes: 26 additions & 5 deletions src/servers/src/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use axum::http::{Request, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{async_trait, BoxError, Extension, Json, TypedHeader};
use bytes::Bytes;
use common_error::ext::ErrorExt;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
Expand All @@ -41,13 +42,13 @@ use pipeline::util::to_pipeline_version;
use pipeline::{GreptimeTransformer, PipelineVersion};
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_json::{Deserializer, Map, Value};
use serde_json::{json, Deserializer, Map, Value};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::{
CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu,
ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
status_code_to_http_status, CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu,
ParseJson5Snafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
};
use crate::http::extractor::LogTableName;
use crate::http::header::CONTENT_TYPE_PROTOBUF_STR;
Expand Down Expand Up @@ -404,6 +405,14 @@ fn check_data_valid(data_len: usize) -> Result<()> {
Ok(())
}

fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response {
let body = Json(json!({
"error": format!("{}: {}", step_msg,e.output_msg()),
}));

(status_code_to_http_status(&e.status_code()), body).into_response()
}

#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
State(log_state): State<LogState>,
Expand Down Expand Up @@ -431,8 +440,20 @@ pub async fn pipeline_dryrun(
dryrun_pipeline_inner(data, &pipeline)
}
Some(pipeline) => {
let pipeline = handler.build_pipeline(&pipeline)?;
dryrun_pipeline_inner(data, &pipeline)
let pipeline = handler.build_pipeline(&pipeline);
match pipeline {
Ok(pipeline) => match dryrun_pipeline_inner(data, &pipeline) {
Ok(response) => Ok(response),
Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
"Failed to exec pipeline",
e,
)),
},
Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
"Failed to build pipeline",
e,
)),
}
}
}
}
Expand Down

0 comments on commit 0cf44e1

Please sign in to comment.