Skip to content

Commit

Permalink
chore: add pipeline lib
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed May 28, 2024
1 parent fbc66ec commit cd4d83d
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 63 deletions.
31 changes: 8 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ tower = { workspace = true, features = ["full"] }
tower-http = { version = "0.4", features = ["full"] }
urlencoding = "2.1"
zstd.workspace = true
pipeline = { git = "ssh://[email protected]/GreptimeTeam/pipeline.git", rev = "d3b700c226abc10b5030b38f2a20cc30c1983970" }
#pipeline = { git = "ssh://[email protected]/GreptimeTeam/pipeline.git", rev = "6b88c3c627da9e20f8fd160071e9c69b3ebd4e6a" }
pipeline = { path = "../../../pipeline" }
[target.'cfg(not(windows))'.dependencies]
tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] }

Expand Down
70 changes: 31 additions & 39 deletions src/servers/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::env;
use std::time::Instant;

use aide::transform::TransformOperation;
use api::v1::{RowInsertRequest, RowInsertRequests};
use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
use axum::extract::{Json, Query, State};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Form};
Expand Down Expand Up @@ -73,18 +73,19 @@ pub struct SqlQuery {
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct LogIngesterQueryParams {
pub table_name: String,
pub db: Option<String>,
}

fn validate_log_ingester_payload(payload: &Value) -> Result<(), String> {
if !payload.is_object() {
return Err("payload must be an object".to_string());
}

if payload["processors"].as_str().is_none() {
if payload["pipeline_model"].as_str().is_none() {
return Err("processors field is required".to_string());
}

if payload["data"].as_array().is_none() {
if payload["data"].as_array().is_none() && payload["data"].as_object().is_none() {
return Err("data field is required".to_string());
}

Expand All @@ -97,7 +98,7 @@ pub async fn log_ingester(
State(_state): State<LogHandlerRef>,
Query(_query_params): Query<LogIngesterQueryParams>,
Extension(_query_ctx): Extension<QueryContextRef>,
Json(_payload): Json<Value>,
Json(mut _payload): Json<Value>,
) -> HttpResponse {
// TODO (qtang): implement log ingester
// transform payload to RowInsertRequest
Expand All @@ -113,16 +114,17 @@ pub async fn log_ingester(
))
}
};
let processors_ = _payload["processors"].as_str().unwrap();
let data = _payload["data"];
let yaml_content = Content::Yaml(processors_.into());
let processors_ = _payload["pipeline_model"].take();
let processors = processors_.as_str().unwrap();
let data = _payload["data"].take();
let yaml_content = Content::Yaml(processors.into());
let pipeline_: Result<Pipeline<GreptimeTransformer>, String> = parse(&yaml_content);
match pipeline_ {
Ok(pipeline) => {
let pipeline_data = PipelineValue::try_from(data);
match pipeline_data {
Ok(pipeline_data) => {
let transformed_data = pipeline.exec(pipeline_data);
let transformed_data: Result<Rows, String> = pipeline.exec(pipeline_data);
match transformed_data {
Ok(rows) => {
let insert_request = RowInsertRequest {
Expand All @@ -135,43 +137,33 @@ pub async fn log_ingester(
let insert_result =
_state.insert_log(insert_requests, _query_ctx).await;
match insert_result {
Ok(_) => {
return HttpResponse::GreptimedbV1(GreptimedbV1Response {
output: vec![],
execution_time_ms: 0,
resp_metrics: HashMap::new(),
})
}
Err(e) => {
return HttpResponse::Error(ErrorResponse::from_error_message(
StatusCode::InvalidArguments,
e.to_string(),
));
}
Ok(_) => HttpResponse::GreptimedbV1(GreptimedbV1Response {
output: vec![],
execution_time_ms: 0,
resp_metrics: HashMap::new(),
}),
Err(e) => HttpResponse::Error(ErrorResponse::from_error_message(
StatusCode::InvalidArguments,
e.to_string(),
)),
}
}
Err(e) => {
return HttpResponse::Error(ErrorResponse::from_error_message(
StatusCode::InvalidArguments,
e,
))
}
Err(e) => HttpResponse::Error(ErrorResponse::from_error_message(
StatusCode::InvalidArguments,
e,
)),
}
}
Err(e) => {
return HttpResponse::Error(ErrorResponse::from_error_message(
StatusCode::InvalidArguments,
e,
))
}
Err(e) => HttpResponse::Error(ErrorResponse::from_error_message(
StatusCode::InvalidArguments,
e,
)),
}
}
Err(e) => {
return HttpResponse::Error(ErrorResponse::from_error_message(
StatusCode::InvalidArguments,
e,
))
}
Err(e) => HttpResponse::Error(ErrorResponse::from_error_message(
StatusCode::InvalidArguments,
e,
)),
}
}

Expand Down

0 comments on commit cd4d83d

Please sign in to comment.