Skip to content

Commit

Permalink
refactor: add interceptor after Influxdb lines are converted to grpc …
Browse files Browse the repository at this point in the history
…row insert (#4225)

* fix: make Influxdb lines able to be inserted into last created tables

* Update src/servers/src/influxdb.rs

* add an option to control the time index alignment behavior

* fix ci

* refactor: use interceptor to handle timestamp align

* Apply suggestions from code review

Co-authored-by: dennis zhuang <[email protected]>

---------

Co-authored-by: tison <[email protected]>
Co-authored-by: dennis zhuang <[email protected]>
  • Loading branch information
3 people authored Jul 1, 2024
1 parent 2665616 commit 6276e00
Show file tree
Hide file tree
Showing 13 changed files with 68 additions and 57 deletions.
5 changes: 3 additions & 2 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ impl StartCommand {
let client = NodeClients::new(channel_config);

let mut instance = FrontendBuilder::new(
opts.clone(),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
catalog_manager,
Expand All @@ -350,12 +351,12 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

let servers = Services::new(opts.clone(), Arc::new(instance.clone()), plugins)
let servers = Services::new(opts, Arc::new(instance.clone()), plugins)
.build()
.await
.context(StartFrontendSnafu)?;
instance
.build_servers(opts, servers)
.build_servers(servers)
.context(StartFrontendSnafu)?;

Ok(Instance::new(instance, guard))
Expand Down
5 changes: 3 additions & 2 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ impl StartCommand {
.await?;

let mut frontend = FrontendBuilder::new(
fe_opts.clone(),
kv_backend,
layered_cache_registry,
catalog_manager,
Expand All @@ -529,12 +530,12 @@ impl StartCommand {
// TODO(discord9): unify with adding `start` and `shutdown` method to flownode too.
let _handle = flow_worker_manager.run_background();

let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
let servers = Services::new(fe_opts, Arc::new(frontend.clone()), fe_plugins)
.build()
.await
.context(StartFrontendSnafu)?;
frontend
.build_servers(fe_opts, servers)
.build_servers(servers)
.context(StartFrontendSnafu)?;

Ok(Instance {
Expand Down
12 changes: 4 additions & 8 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::CatalogManagerRef;
use client::OutputData;
use common_base::Plugins;
use common_config::{Configurable, KvBackendConfig};
use common_config::KvBackendConfig;
use common_error::ext::{BoxedError, ErrorExt};
use common_frontend::handler::FrontendInvoker;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
Expand Down Expand Up @@ -114,6 +114,7 @@ pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;

#[derive(Clone)]
pub struct Instance {
options: FrontendOptions,
catalog_manager: CatalogManagerRef,
script_executor: Arc<ScriptExecutor>,
pipeline_operator: Arc<PipelineOperator>,
Expand Down Expand Up @@ -189,14 +190,9 @@ impl Instance {
Ok((kv_backend, procedure_manager))
}

pub fn build_servers(
&mut self,
opts: impl Into<FrontendOptions> + Configurable,
servers: ServerHandlers,
) -> Result<()> {
let opts: FrontendOptions = opts.into();
pub fn build_servers(&mut self, servers: ServerHandlers) -> Result<()> {
self.export_metrics_task =
ExportMetricsTask::try_new(&opts.export_metrics, Some(&self.plugins))
ExportMetricsTask::try_new(&self.options.export_metrics, Some(&self.plugins))
.context(StartServerSnafu)?;

self.servers = servers;
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/instance/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ use servers::server::ServerHandlers;
use snafu::OptionExt;

use crate::error::{self, Result};
use crate::frontend::FrontendOptions;
use crate::heartbeat::HeartbeatTask;
use crate::instance::region_query::FrontendRegionQueryHandler;
use crate::instance::Instance;
use crate::script::ScriptExecutor;

/// The frontend [`Instance`] builder.
pub struct FrontendBuilder {
options: FrontendOptions,
kv_backend: KvBackendRef,
layered_cache_registry: LayeredCacheRegistryRef,
local_cache_invalidator: Option<CacheInvalidatorRef>,
Expand All @@ -55,13 +57,15 @@ pub struct FrontendBuilder {

impl FrontendBuilder {
pub fn new(
options: FrontendOptions,
kv_backend: KvBackendRef,
layered_cache_registry: LayeredCacheRegistryRef,
catalog_manager: CatalogManagerRef,
node_manager: NodeManagerRef,
procedure_executor: ProcedureExecutorRef,
) -> Self {
Self {
options,
kv_backend,
layered_cache_registry,
local_cache_invalidator: None,
Expand Down Expand Up @@ -183,6 +187,7 @@ impl FrontendBuilder {
plugins.insert::<StatementExecutorRef>(statement_executor.clone());

Ok(Instance {
options: self.options,
catalog_manager: self.catalog_manager,
script_executor,
pipeline_operator,
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/instance/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ impl InfluxdbLineProtocolHandler for Instance {
interceptor_ref.pre_execute(&request.lines, ctx.clone())?;

let requests = request.try_into()?;

let requests = interceptor_ref
.post_lines_conversion(requests, ctx.clone())
.await?;

self.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
Expand Down
3 changes: 1 addition & 2 deletions src/servers/src/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,8 @@ fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
#[cfg(test)]
mod tests {
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, Rows, SemanticType};
use api::v1::{ColumnDataType, RowInsertRequests, Rows, SemanticType};

use super::*;
use crate::influxdb::InfluxdbRequest;

#[test]
Expand Down
28 changes: 28 additions & 0 deletions src/servers/src/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::sync::Arc;

use api::prom_store::remote::{ReadRequest, WriteRequest};
use api::v1::greptime_request::Request;
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_query::Output;
use query::parser::PromQuery;
Expand Down Expand Up @@ -275,17 +277,31 @@ impl<E: ErrorExt> ScriptInterceptor for Option<ScriptInterceptorRef<E>> {

/// LineProtocolInterceptor can track life cycle of a line protocol request
/// and customize or abort its execution at given point.
#[async_trait]
pub trait LineProtocolInterceptor {
type Error: ErrorExt;

fn pre_execute(&self, _line: &str, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
Ok(())
}

/// Called after the lines are converted to the [RowInsertRequests].
/// We can then modify the resulting requests if needed.
/// Typically used in some backward compatibility situation.
async fn post_lines_conversion(
&self,
requests: RowInsertRequests,
query_context: QueryContextRef,
) -> Result<RowInsertRequests, Self::Error> {
let _ = query_context;
Ok(requests)
}
}

pub type LineProtocolInterceptorRef<E> =
Arc<dyn LineProtocolInterceptor<Error = E> + Send + Sync + 'static>;

#[async_trait]
impl<E: ErrorExt> LineProtocolInterceptor for Option<LineProtocolInterceptorRef<E>> {
type Error = E;

Expand All @@ -296,6 +312,18 @@ impl<E: ErrorExt> LineProtocolInterceptor for Option<LineProtocolInterceptorRef<
Ok(())
}
}

async fn post_lines_conversion(
&self,
requests: RowInsertRequests,
query_context: QueryContextRef,
) -> Result<RowInsertRequests, Self::Error> {
if let Some(this) = self {
this.post_lines_conversion(requests, query_context).await
} else {
Ok(requests)
}
}
}

/// OpenTelemetryProtocolInterceptor can track life cycle of an open telemetry protocol request
Expand Down
1 change: 1 addition & 0 deletions src/servers/src/row_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ fn write_ts_to(
ValueData::TimestampNanosecondValue(ts),
),
};

let index = column_indexes.get(&name);
if let Some(index) = index {
check_schema(datatype, SemanticType::Timestamp, &schema[*index])?;
Expand Down
4 changes: 3 additions & 1 deletion tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,14 +376,16 @@ impl GreptimeDbClusterBuilder {
Arc::new(InvalidateTableCacheHandler::new(cache_registry.clone())),
]);

let options = FrontendOptions::default();
let heartbeat_task = HeartbeatTask::new(
&FrontendOptions::default(),
&options,
meta_client.clone(),
HeartbeatOptions::default(),
Arc::new(handlers_executor),
);

let instance = FrontendBuilder::new(
options,
cached_meta_backend.clone(),
cache_registry.clone(),
catalog_manager,
Expand Down
50 changes: 10 additions & 40 deletions tests-integration/src/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,19 @@ mod test {

use client::OutputData;
use common_recordbatch::RecordBatches;
use frontend::instance::Instance;
use rstest::rstest;
use rstest_reuse::apply;
use servers::influxdb::InfluxdbRequest;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::InfluxdbLineProtocolHandler;
use session::context::QueryContext;

use crate::standalone::GreptimeDbStandaloneBuilder;
use crate::tests;
use crate::tests::test_util::{both_instances_cases, distributed, standalone, MockInstance};

#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_put_influxdb_lines() {
let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_put_influxdb_lines")
.build()
.await;
let instance = &standalone.instance;

test_put_influxdb_lines(instance).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_put_influxdb_lines() {
let instance =
tests::create_distributed_instance("test_distributed_put_influxdb_lines").await;
test_put_influxdb_lines(&instance.frontend()).await;
}
#[apply(both_instances_cases)]
async fn test_put_influxdb_lines_without_time_column(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();

#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_put_influxdb_lines_without_time_column() {
let standalone = GreptimeDbStandaloneBuilder::new(
"test_standalone_put_influxdb_lines_without_time_column",
)
.build()
.await;
test_put_influxdb_lines_without_time_column(&standalone.instance).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_put_influxdb_lines_without_time_column() {
let instance = tests::create_distributed_instance(
"test_distributed_put_influxdb_lines_without_time_column",
)
.await;
test_put_influxdb_lines_without_time_column(&instance.frontend()).await;
}

async fn test_put_influxdb_lines_without_time_column(instance: &Arc<Instance>) {
let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024
monitor1,host=host2 memory=1027";
Expand Down Expand Up @@ -92,7 +59,10 @@ monitor1,host=host2 memory=1027";
assert_eq!(total, 2);
}

async fn test_put_influxdb_lines(instance: &Arc<Instance>) {
#[apply(both_instances_cases)]
async fn test_put_influxdb_lines(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();

let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001";
Expand Down
1 change: 1 addition & 0 deletions tests-integration/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl GreptimeDbStandaloneBuilder {
);

let instance = FrontendBuilder::new(
opts.frontend_options(),
kv_backend.clone(),
cache_registry,
catalog_manager,
Expand Down
2 changes: 1 addition & 1 deletion tests-integration/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
mod instance_kafka_wal_test;
mod instance_test;
mod promql_test;
mod test_util;
pub(crate) mod test_util;

use std::collections::HashMap;
use std::sync::Arc;
Expand Down
4 changes: 3 additions & 1 deletion tests-integration/src/tests/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::env;
use std::sync::Arc;

use async_trait::async_trait;
use client::OutputData;
use common_query::Output;
use common_recordbatch::util;
Expand All @@ -36,7 +37,8 @@ pub(crate) trait RebuildableMockInstance: MockInstance {
async fn rebuild(&mut self);
}

pub(crate) trait MockInstance: Sync + Send {
#[async_trait]
pub trait MockInstance: Sync + Send {
fn frontend(&self) -> Arc<Instance>;

fn is_distributed_mode(&self) -> bool;
Expand Down

0 comments on commit 6276e00

Please sign in to comment.