Skip to content

Commit

Permalink
feat: introduct Limiter to limit in-flight write bytes size in frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
zyy17 committed Dec 25, 2024
1 parent d51b65a commit b2f1017
Show file tree
Hide file tree
Showing 17 changed files with 398 additions and 10 deletions.
2 changes: 2 additions & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
Expand Down Expand Up @@ -195,6 +196,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
Expand Down
4 changes: 4 additions & 0 deletions config/frontend.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
## @toml2docs:none-default
default_timezone = "UTC"

## The maximum in-flight write bytes.
## @toml2docs:none-default
#+ max_in_flight_write_bytes = "500MB"

## The runtime options.
#+ [runtime]
## The number of threads to execute the runtime for global read operations.
Expand Down
4 changes: 4 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ max_concurrent_queries = 0
## Enable telemetry to collect anonymous usage data. Enabled by default.
#+ enable_telemetry = true

## The maximum in-flight write bytes.
## @toml2docs:none-default
#+ max_in_flight_write_bytes = "500MB"

## The runtime options.
#+ [runtime]
## The number of threads to execute the runtime for global read operations.
Expand Down
4 changes: 4 additions & 0 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use catalog::information_schema::InformationExtension;
use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use client::api::v1::meta::RegionRole;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
Expand Down Expand Up @@ -152,6 +153,7 @@ pub struct StandaloneOptions {
pub tracing: TracingOptions,
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
pub max_in_flight_write_bytes: Option<ReadableSize>,
}

impl Default for StandaloneOptions {
Expand Down Expand Up @@ -181,6 +183,7 @@ impl Default for StandaloneOptions {
tracing: TracingOptions::default(),
init_regions_in_background: false,
init_regions_parallelism: 16,
max_in_flight_write_bytes: None,
}
}
}
Expand Down Expand Up @@ -218,6 +221,7 @@ impl StandaloneOptions {
user_provider: cloned_opts.user_provider,
// Handle the export metrics task run by standalone to frontend for execution
export_metrics: cloned_opts.export_metrics,
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
..Default::default()
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ pub enum Error {
location: Location,
source: BoxedError,
},

#[snafu(display("In-flight write bytes exceeded the maximum limit"))]
InFlightWriteBytesExceeded {
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -392,6 +398,8 @@ impl ErrorExt for Error {
Error::StartScriptManager { source, .. } => source.status_code(),

Error::TableOperation { source, .. } => source.status_code(),

Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_base::readable_size::ReadableSize;
use common_config::config::Configurable;
use common_options::datanode::DatanodeClientOptions;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
Expand Down Expand Up @@ -46,6 +47,7 @@ pub struct FrontendOptions {
pub user_provider: Option<String>,
pub export_metrics: ExportMetricsOption,
pub tracing: TracingOptions,
pub max_in_flight_write_bytes: Option<ReadableSize>,
}

impl Default for FrontendOptions {
Expand All @@ -68,6 +70,7 @@ impl Default for FrontendOptions {
user_provider: None,
export_metrics: ExportMetricsOption::default(),
tracing: TracingOptions::default(),
max_in_flight_write_bytes: None,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ use crate::error::{
};
use crate::frontend::FrontendOptions;
use crate::heartbeat::HeartbeatTask;
use crate::limiter::LimiterRef;
use crate::script::ScriptExecutor;

#[async_trait]
Expand Down Expand Up @@ -124,6 +125,7 @@ pub struct Instance {
export_metrics_task: Option<ExportMetricsTask>,
table_metadata_manager: TableMetadataManagerRef,
stats: StatementStatistics,
limiter: Option<LimiterRef>,
}

impl Instance {
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/src/instance/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::frontend::FrontendOptions;
use crate::heartbeat::HeartbeatTask;
use crate::instance::region_query::FrontendRegionQueryHandler;
use crate::instance::Instance;
use crate::limiter::Limiter;
use crate::script::ScriptExecutor;

/// The frontend [`Instance`] builder.
Expand Down Expand Up @@ -196,6 +197,14 @@ impl FrontendBuilder {

plugins.insert::<StatementExecutorRef>(statement_executor.clone());

// Create the limiter if the max_in_flight_write_bytes is set.
let limiter = self
.options
.max_in_flight_write_bytes
.map(|max_in_flight_write_bytes| {
Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes()))
});

Ok(Instance {
options: self.options,
catalog_manager: self.catalog_manager,
Expand All @@ -211,6 +220,7 @@ impl FrontendBuilder {
export_metrics_task: None,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
stats: self.stats,
limiter,
})
}
}
10 changes: 8 additions & 2 deletions src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use snafu::{ensure, OptionExt, ResultExt};
use table::table_name::TableName;

use crate::error::{
Error, IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, Result,
TableOperationSnafu,
Error, InFlightWriteBytesExceededSnafu, IncompleteGrpcRequestSnafu, NotSupportedSnafu,
PermissionSnafu, Result, TableOperationSnafu,
};
use crate::instance::{attach_timer, Instance};
use crate::metrics::{GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED};
Expand All @@ -50,6 +50,12 @@ impl GrpcQueryHandler for Instance {
.check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
.context(PermissionSnafu)?;

let _guard = self
.limiter
.as_ref()
.and_then(|l| l.limit_request(&request))
.context(InFlightWriteBytesExceededSnafu)?;

let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
Expand Down
10 changes: 8 additions & 2 deletions src/frontend/src/instance/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use servers::error::{AuthSnafu, Error};
use servers::error::{AuthSnafu, Error, InFlightWriteBytesExceededSnafu};
use servers::influxdb::InfluxdbRequest;
use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
use servers::query_handler::InfluxdbLineProtocolHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};

use crate::instance::Instance;

Expand All @@ -46,6 +46,12 @@ impl InfluxdbLineProtocolHandler for Instance {
.post_lines_conversion(requests, ctx.clone())
.await?;

let _guard = self
.limiter
.as_ref()
.and_then(|l| l.limit_row_inserts(&requests))
.context(InFlightWriteBytesExceededSnafu)?;

self.handle_influx_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
Expand Down
11 changes: 9 additions & 2 deletions src/frontend/src/instance/log_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ use common_error::ext::BoxedError;
use pipeline::pipeline_operator::PipelineOperator;
use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
use servers::error::{
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, InFlightWriteBytesExceededSnafu,
PipelineSnafu, Result as ServerResult,
};
use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use servers::query_handler::PipelineHandler;
use session::context::{QueryContext, QueryContextRef};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use table::Table;

use crate::instance::Instance;
Expand Down Expand Up @@ -110,6 +111,12 @@ impl Instance {
log: RowInsertRequests,
ctx: QueryContextRef,
) -> ServerResult<Output> {
let _guard = self
.limiter
.as_ref()
.and_then(|l| l.limit_row_inserts(&log))
.context(InFlightWriteBytesExceededSnafu)?;

self.inserter
.handle_log_inserts(log, ctx, self.statement_executor.as_ref())
.await
Expand Down
9 changes: 8 additions & 1 deletion src/frontend/src/instance/opentsdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use servers::error as server_error;
use servers::error::AuthSnafu;
use servers::error::{AuthSnafu, InFlightWriteBytesExceededSnafu};
use servers::opentsdb::codec::DataPoint;
use servers::opentsdb::data_point_to_grpc_row_insert_requests;
use servers::query_handler::OpentsdbProtocolHandler;
Expand All @@ -41,6 +41,13 @@ impl OpentsdbProtocolHandler for Instance {
.context(AuthSnafu)?;

let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;

let _guard = self
.limiter
.as_ref()
.and_then(|l| l.limit_row_inserts(&requests))
.context(InFlightWriteBytesExceededSnafu)?;

let output = self
.handle_row_inserts(requests, ctx)
.await
Expand Down
23 changes: 21 additions & 2 deletions src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::PipelineWay;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
use servers::query_handler::OpenTelemetryProtocolHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};

use crate::instance::Instance;
use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
Expand All @@ -53,6 +53,12 @@ impl OpenTelemetryProtocolHandler for Instance {
let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
OTLP_METRICS_ROWS.inc_by(rows as u64);

let _guard = self
.limiter
.as_ref()
.and_then(|l| l.limit_row_inserts(&requests))
.context(InFlightWriteBytesExceededSnafu)?;

self.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
Expand Down Expand Up @@ -83,6 +89,12 @@ impl OpenTelemetryProtocolHandler for Instance {

OTLP_TRACES_ROWS.inc_by(rows as u64);

let _guard = self
.limiter
.as_ref()
.and_then(|l| l.limit_row_inserts(&requests))
.context(InFlightWriteBytesExceededSnafu)?;

self.handle_log_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
Expand All @@ -109,6 +121,13 @@ impl OpenTelemetryProtocolHandler for Instance {
interceptor_ref.pre_execute(ctx.clone())?;

let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;

let _guard = self
.limiter
.as_ref()
.and_then(|l| l.limit_row_inserts(&requests))
.context(InFlightWriteBytesExceededSnafu)?;

self.handle_log_inserts(requests, ctx)
.await
.inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
Expand Down
8 changes: 7 additions & 1 deletion src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use common_telemetry::{debug, tracing};
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
Expand Down Expand Up @@ -175,6 +175,12 @@ impl PromStoreProtocolHandler for Instance {
.get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_write(&request, ctx.clone())?;

let _guard = self
.limiter
.as_ref()
.and_then(|l| l.limit_row_inserts(&request))
.context(InFlightWriteBytesExceededSnafu)?;

let output = if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod error;
pub mod frontend;
pub mod heartbeat;
pub mod instance;
pub(crate) mod limiter;
pub(crate) mod metrics;
mod script;
pub mod server;
Expand Down
Loading

0 comments on commit b2f1017

Please sign in to comment.