From d0128ee5efcf0d7b97242b0dcf08115a3738f498 Mon Sep 17 00:00:00 2001 From: paomian Date: Mon, 27 May 2024 17:34:31 +0800 Subject: [PATCH] chore: add pipeline lib --- Cargo.lock | 31 ++++------------ src/servers/Cargo.toml | 3 +- src/servers/src/http/handler.rs | 65 ++++++++++++++------------------- 3 files changed, 38 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7db551ab64d8..3b0ee56188dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -222,7 +222,7 @@ dependencies = [ "common-macro", "common-time", "datatypes", - "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=902f75fdd170c572e90b1f640161d90995f20218)", + "greptime-proto", "paste", "prost 0.12.4", "snafu 0.8.2", @@ -2113,7 +2113,7 @@ dependencies = [ "backtrace", "common-error", "console-subscriber", - "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=902f75fdd170c572e90b1f640161d90995f20218)", + "greptime-proto", "lazy_static", "once_cell", "opentelemetry 0.21.0", @@ -3846,7 +3846,7 @@ dependencies = [ "enum-as-inner", "enum_dispatch", "futures", - "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=902f75fdd170c572e90b1f640161d90995f20218)", + "greptime-proto", "hydroflow", "itertools 0.10.5", "minstant", @@ -4228,20 +4228,6 @@ dependencies = [ "tonic-build 0.11.0", ] -[[package]] -name = "greptime-proto" -version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git#31dd060fc3c84836d0b4997b11c22da122084dae" -dependencies = [ - "prost 0.12.4", - "serde", - "serde_json", - "strum 0.25.0", - "strum_macros 0.25.3", - "tonic 0.11.0", - "tonic-build 0.11.0", -] - [[package]] name = "h2" version = "0.3.26" @@ -4733,7 +4719,7 @@ dependencies = [ "common-telemetry", "fst", "futures", - "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=902f75fdd170c572e90b1f640161d90995f20218)", + "greptime-proto", "mockall", "pin-project", "prost 0.12.4", @@ -7181,12 +7167,11 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pipeline" version = "0.1.0" -source = "git+ssh://git@github.com/GreptimeTeam/pipeline.git?rev=d3b700c226abc10b5030b38f2a20cc30c1983970#d3b700c226abc10b5030b38f2a20cc30c1983970" dependencies = [ "chrono", "chrono-tz 0.9.0", "csv", - "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git)", + "greptime-proto", "itertools 0.12.1", "serde_json", "urlencoding", @@ -7585,7 +7570,7 @@ dependencies = [ "datafusion-functions 37.0.0", "datatypes", "futures", - "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=902f75fdd170c572e90b1f640161d90995f20218)", + "greptime-proto", "itertools 0.10.5", "lazy_static", "prometheus", @@ -7926,7 +7911,7 @@ dependencies = [ "format_num", "futures", "futures-util", - "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=902f75fdd170c572e90b1f640161d90995f20218)", + "greptime-proto", "humantime", "lazy_static", "meter-core", @@ -10562,7 +10547,7 @@ dependencies = [ "datatypes", "derive_builder 0.12.0", "futures", - "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=902f75fdd170c572e90b1f640161d90995f20218)", + "greptime-proto", "humantime", "humantime-serde", "parquet", diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 7421aa01f630..791eac7a1825 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -104,7 +104,8 @@ tower = { workspace = true, features = ["full"] } tower-http = { version = "0.4", features = ["full"] } urlencoding = "2.1" zstd.workspace = true -pipeline = { git = "ssh://git@github.com/GreptimeTeam/pipeline.git", rev = "d3b700c226abc10b5030b38f2a20cc30c1983970" } +#pipeline = { git = "ssh://git@github.com/GreptimeTeam/pipeline.git", rev = "6b88c3c627da9e20f8fd160071e9c69b3ebd4e6a" } +pipeline = { path = "../../../pipeline" } [target.'cfg(not(windows))'.dependencies] tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] } diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 265e315fbc39..0bec89428fd4 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -17,7 +17,7 @@ use std::env; use std::time::Instant; use aide::transform::TransformOperation; -use api::v1::{RowInsertRequest, RowInsertRequests}; +use api::v1::{RowInsertRequest, RowInsertRequests, Rows}; use axum::extract::{Json, Query, State}; use axum::response::{IntoResponse, Response}; use axum::{Extension, Form}; @@ -97,7 +97,7 @@ pub async fn log_ingester( State(_state): State, Query(_query_params): Query, Extension(_query_ctx): Extension, - Json(_payload): Json, + Json(mut _payload): Json, ) -> HttpResponse { // TODO (qtang): implement log ingester // transform payload to RowInsertRequest @@ -113,16 +113,17 @@ pub async fn log_ingester( )) } }; - let processors_ = _payload["processors"].as_str().unwrap(); - let data = _payload["data"]; - let yaml_content = Content::Yaml(processors_.into()); + let processors_ = _payload["processors"].take(); + let processors = processors_.as_str().unwrap(); + let data = _payload["data"].take(); + let yaml_content = Content::Yaml(processors.into()); let pipeline_: Result, String> = parse(&yaml_content); match pipeline_ { Ok(pipeline) => { let pipeline_data = PipelineValue::try_from(data); match pipeline_data { Ok(pipeline_data) => { - let transformed_data = pipeline.exec(pipeline_data); + let transformed_data: Result = pipeline.exec(pipeline_data); match transformed_data { Ok(rows) => { let insert_request = RowInsertRequest { @@ -135,43 +136,33 @@ pub async fn log_ingester( let insert_result = _state.insert_log(insert_requests, _query_ctx).await; match insert_result { - Ok(_) => { - return HttpResponse::GreptimedbV1(GreptimedbV1Response { - output: vec![], - execution_time_ms: 0, - resp_metrics: HashMap::new(), - }) - } - Err(e) => { - return HttpResponse::Error(ErrorResponse::from_error_message( - StatusCode::InvalidArguments, - e.to_string(), - )); - } + Ok(_) => HttpResponse::GreptimedbV1(GreptimedbV1Response { + output: vec![], + execution_time_ms: 0, + resp_metrics: HashMap::new(), + }), + Err(e) => HttpResponse::Error(ErrorResponse::from_error_message( + StatusCode::InvalidArguments, + e.to_string(), + )), } } - Err(e) => { - return HttpResponse::Error(ErrorResponse::from_error_message( - StatusCode::InvalidArguments, - e, - )) - } + Err(e) => HttpResponse::Error(ErrorResponse::from_error_message( + StatusCode::InvalidArguments, + e, + )), } } - Err(e) => { - return HttpResponse::Error(ErrorResponse::from_error_message( - StatusCode::InvalidArguments, - e, - )) - } + Err(e) => HttpResponse::Error(ErrorResponse::from_error_message( + StatusCode::InvalidArguments, + e, + )), } } - Err(e) => { - return HttpResponse::Error(ErrorResponse::from_error_message( - StatusCode::InvalidArguments, - e, - )) - } + Err(e) => HttpResponse::Error(ErrorResponse::from_error_message( + StatusCode::InvalidArguments, + e, + )), } }