Skip to content

Commit

Permalink
feat: delete pipeline (#4156)
Browse files Browse the repository at this point in the history
* feat: add delete for pipeline

* chore: remove unused code

* refactor: delete pipeline

* chore: add pipeline management api metrics

* chore: minor cr issues

* chore: add unit test

* chore: fix cr issue

* fix: test

* chore: add `GreptimedbManageResponse`

* fix: typo

* fix: typo
  • Loading branch information
shuiyisong authored Jul 5, 2024
1 parent c21e969 commit 849e0b9
Show file tree
Hide file tree
Showing 28 changed files with 739 additions and 173 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 12 additions & 9 deletions src/frontend/src/instance/log_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use pipeline::table::{PipelineInfo, PipelineVersion};
use pipeline::{GreptimeTransformer, Pipeline};
use servers::error::{
AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
UnsupportedDeletePipelineSnafu,
};
use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
use servers::error::{AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult};
use servers::query_handler::LogHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
Expand Down Expand Up @@ -72,9 +68,16 @@ impl LogHandler for Instance {
.context(PipelineSnafu)
}

async fn delete_pipeline(&self, _name: &str, _query_ctx: QueryContextRef) -> ServerResult<()> {
// TODO(qtang): impl delete
Err(UnsupportedDeletePipelineSnafu {}.build())
async fn delete_pipeline(
&self,
name: &str,
version: PipelineVersion,
ctx: QueryContextRef,
) -> ServerResult<Option<()>> {
self.pipeline_operator
.delete_pipeline(name, version, ctx)
.await
.context(PipelineSnafu)
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
use servers::http::event::LogValidatorRef;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
Expand Down Expand Up @@ -89,7 +90,8 @@ where
Some(self.instance.clone()),
);

builder = builder.with_log_ingest_handler(self.instance.clone());
builder = builder
.with_log_ingest_handler(self.instance.clone(), self.plugins.get::<LogValidatorRef>());

if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
builder = builder.with_user_provider(user_provider);
Expand Down
1 change: 1 addition & 0 deletions src/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ common-telemetry.workspace = true
common-time.workspace = true
crossbeam-utils.workspace = true
csv = "1.3.0"
dashmap.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
6 changes: 5 additions & 1 deletion src/pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@

mod etl;
mod manager;
mod metrics;

pub use etl::transform::GreptimeTransformer;
pub use etl::value::Value;
pub use etl::{parse, Content, Pipeline};
pub use manager::{error, pipeline_operator, table};
pub use manager::{
error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef,
PipelineVersion,
};
38 changes: 38 additions & 0 deletions src/pipeline/src/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_time::Timestamp;
use datatypes::timestamp::TimestampNanosecond;

use crate::table::PipelineTable;
use crate::{GreptimeTransformer, Pipeline};

pub mod error;
pub mod pipeline_operator;
pub mod table;
pub mod util;

/// Pipeline version. An optional timestamp with nanosecond precision.
/// If the version is None, it means the latest version of the pipeline.
/// User can specify the version by providing a timestamp string formatted as iso8601.
/// When it used in cache key, it will be converted to i64 meaning the number of nanoseconds since the epoch.
pub type PipelineVersion = Option<TimestampNanosecond>;

/// Pipeline info. A tuple of timestamp and pipeline reference.
pub type PipelineInfo = (Timestamp, PipelineRef);

pub type PipelineTableRef = Arc<PipelineTable>;
pub type PipelineRef = Arc<Pipeline<GreptimeTransformer>>;
14 changes: 11 additions & 3 deletions src/pipeline/src/manager/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid pipeline version format: {}", version))]
InvalidPipelineVersion {
version: String,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -113,9 +120,10 @@ impl ErrorExt for Error {
PipelineTableNotFound { .. } => StatusCode::TableNotFound,
InsertPipeline { source, .. } => source.status_code(),
CollectRecords { source, .. } => source.status_code(),
PipelineNotFound { .. } | CompilePipeline { .. } | PipelineTransform { .. } => {
StatusCode::InvalidArguments
}
PipelineNotFound { .. }
| CompilePipeline { .. }
| PipelineTransform { .. }
| InvalidPipelineVersion { .. } => StatusCode::InvalidArguments,
BuildDfLogicalPlan { .. } => StatusCode::Internal,
ExecuteInternalStatement { source, .. } => source.status_code(),
Catalog { source, .. } => source.status_code(),
Expand Down
17 changes: 0 additions & 17 deletions src/pipeline/src/manager/mod.rs

This file was deleted.

67 changes: 48 additions & 19 deletions src/pipeline/src/manager/pipeline_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Instant;

use api::v1::CreateTableExpr;
use catalog::{CatalogManagerRef, RegisterSystemTableRequest};
use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
use common_telemetry::info;
use futures::FutureExt;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::QueryEngineRef;
Expand All @@ -27,11 +29,14 @@ use snafu::{OptionExt, ResultExt};
use table::TableRef;

use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result};
use crate::table::{PipelineInfo, PipelineTable, PipelineTableRef, PipelineVersion};
use crate::manager::{PipelineInfo, PipelineTableRef, PipelineVersion};
use crate::metrics::{
METRIC_PIPELINE_CREATE_HISTOGRAM, METRIC_PIPELINE_DELETE_HISTOGRAM,
METRIC_PIPELINE_RETRIEVE_HISTOGRAM,
};
use crate::table::{PipelineTable, PIPELINE_TABLE_NAME};
use crate::{GreptimeTransformer, Pipeline};

pub const PIPELINE_TABLE_NAME: &str = "pipelines";

/// PipelineOperator is responsible for managing pipelines.
/// It provides the ability to:
/// - Create a pipeline table if it does not exist
Expand All @@ -50,7 +55,7 @@ pub struct PipelineOperator {

impl PipelineOperator {
/// Create a table request for the pipeline table.
pub fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest {
fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest {
let (time_index, primary_keys, column_defs) = PipelineTable::build_pipeline_schema();

let create_table_expr = CreateTableExpr {
Expand Down Expand Up @@ -146,20 +151,6 @@ impl PipelineOperator {
pub fn get_pipeline_table_from_cache(&self, catalog: &str) -> Option<PipelineTableRef> {
self.tables.read().unwrap().get(catalog).cloned()
}

async fn insert_and_compile(
&self,
ctx: QueryContextRef,
name: &str,
content_type: &str,
pipeline: &str,
) -> Result<PipelineInfo> {
let schema = ctx.current_schema();
self.get_pipeline_table_from_cache(ctx.current_catalog())
.context(PipelineTableNotFoundSnafu)?
.insert_and_compile(&schema, name, content_type, pipeline)
.await
}
}

impl PipelineOperator {
Expand Down Expand Up @@ -189,9 +180,16 @@ impl PipelineOperator {
let schema = query_ctx.current_schema();
self.create_pipeline_table_if_not_exists(query_ctx.clone())
.await?;

let timer = Instant::now();
self.get_pipeline_table_from_cache(query_ctx.current_catalog())
.context(PipelineTableNotFoundSnafu)?
.get_pipeline(&schema, name, version)
.inspect(|re| {
METRIC_PIPELINE_RETRIEVE_HISTOGRAM
.with_label_values(&[&re.is_ok().to_string()])
.observe(timer.elapsed().as_secs_f64())
})
.await
}

Expand All @@ -206,7 +204,38 @@ impl PipelineOperator {
self.create_pipeline_table_if_not_exists(query_ctx.clone())
.await?;

self.insert_and_compile(query_ctx, name, content_type, pipeline)
let timer = Instant::now();
self.get_pipeline_table_from_cache(query_ctx.current_catalog())
.context(PipelineTableNotFoundSnafu)?
.insert_and_compile(&query_ctx.current_schema(), name, content_type, pipeline)
.inspect(|re| {
METRIC_PIPELINE_CREATE_HISTOGRAM
.with_label_values(&[&re.is_ok().to_string()])
.observe(timer.elapsed().as_secs_f64())
})
.await
}

/// Delete a pipeline by name from pipeline table.
pub async fn delete_pipeline(
&self,
name: &str,
version: PipelineVersion,
query_ctx: QueryContextRef,
) -> Result<Option<()>> {
// trigger load pipeline table
self.create_pipeline_table_if_not_exists(query_ctx.clone())
.await?;

let timer = Instant::now();
self.get_pipeline_table_from_cache(query_ctx.current_catalog())
.context(PipelineTableNotFoundSnafu)?
.delete_pipeline(&query_ctx.current_schema(), name, version)
.inspect(|re| {
METRIC_PIPELINE_DELETE_HISTOGRAM
.with_label_values(&[&re.is_ok().to_string()])
.observe(timer.elapsed().as_secs_f64())
})
.await
}
}
Loading

0 comments on commit 849e0b9

Please sign in to comment.