Skip to content

Commit

Permalink
feat: hint options for gRPC insert (#4454)
Browse files Browse the repository at this point in the history
* feat: hint options for gRPC insert

* chore: unit test for extract_hints

* feat: add integration test for grpc hint

* test: add integration test for hints
  • Loading branch information
fengjiachun authored Aug 1, 2024
1 parent 90301a6 commit 291d9d5
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 14 deletions.
35 changes: 34 additions & 1 deletion src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ use common_telemetry::tracing_context::W3cTrace;
use futures_util::StreamExt;
use prost::Message;
use snafu::{ensure, ResultExt};
use tonic::metadata::AsciiMetadataKey;
use tonic::transport::Channel;

use crate::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu};
use crate::error::{
ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, InvalidAsciiSnafu, ServerSnafu,
};
use crate::{from_grpc_response, Client, Result};

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -130,6 +133,36 @@ impl Database {
self.handle(Request::Inserts(requests)).await
}

/// Sends `requests` as an insert while forwarding every `(name, value)` pair in
/// `hints` to the server as an `x-greptime-hint-<name>` gRPC metadata entry.
///
/// Returns whatever `from_grpc_response` extracts from the server reply
/// (presumably the number of affected rows; confirm against `from_grpc_response`).
///
/// # Errors
///
/// Fails with `InvalidAscii` when a hint name does not form a valid ASCII metadata
/// key, or when a hint value cannot be parsed as a metadata value. Errors from
/// building the client and from the RPC call itself are propagated unchanged.
pub async fn insert_with_hints(
    &self,
    requests: InsertRequests,
    hints: &[(&str, &str)],
) -> Result<u32> {
    let mut client = make_database_client(&self.client)?.inner;
    let mut request = tonic::Request::new(self.to_rpc_request(Request::Inserts(requests)));

    for (hint_name, hint_value) in hints {
        // The header name is the fixed hint prefix followed by the caller's hint name.
        let header_name = format!("x-greptime-hint-{}", hint_name);
        let header_key = match AsciiMetadataKey::from_bytes(header_name.as_bytes()) {
            Ok(header_key) => header_key,
            Err(_) => {
                return Err(InvalidAsciiSnafu {
                    value: hint_name.to_string(),
                }
                .build());
            }
        };
        // The target type of `parse` is inferred from `insert` below.
        let header_value = match hint_value.parse() {
            Ok(header_value) => header_value,
            Err(_) => {
                return Err(InvalidAsciiSnafu {
                    value: hint_value.to_string(),
                }
                .build());
            }
        };
        request.metadata_mut().insert(header_key, header_value);
    }

    let response = client.handle(request).await?.into_inner();
    from_grpc_response(response)
}

async fn handle(&self, request: Request) -> Result<u32> {
let mut client = make_database_client(&self.client)?.inner;
let request = self.to_rpc_request(request);
Expand Down
9 changes: 9 additions & 0 deletions src/client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to parse ascii string: {}", value))]
InvalidAscii {
value: String,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -143,6 +150,8 @@ impl ErrorExt for Error {
| Error::ConvertFlightData { source, .. }
| Error::CreateTlsChannel { source, .. } => source.status_code(),
Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected,

Error::InvalidAscii { .. } => StatusCode::InvalidArguments,
}
}

Expand Down
11 changes: 10 additions & 1 deletion src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,9 +649,18 @@ impl Inserter {
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
) -> Result<TableRef> {
let mut hint_options = vec![];
let options: &[(&str, &str)] = match create_type {
AutoCreateTableType::Logical(_) => unreachable!(),
AutoCreateTableType::Physical => &[],
AutoCreateTableType::Physical => {
if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
hint_options.push((APPEND_MODE_KEY, append_mode));
}
if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
hint_options.push((MERGE_MODE_KEY, merge_mode));
}
hint_options.as_slice()
}
// Set append_mode to true for log table.
// because log tables should keep rows with the same ts and tags.
AutoCreateTableType::Log => &[(APPEND_MODE_KEY, "true")],
Expand Down
74 changes: 71 additions & 3 deletions src/servers/src/grpc/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader};
use async_trait::async_trait;
use common_error::status_code::StatusCode;
use common_query::OutputData;
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use futures::StreamExt;
use tonic::metadata::{KeyAndValueRef, MetadataMap};
use tonic::{Request, Response, Status, Streaming};

use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::grpc::{cancellation, TonicResult};

/// Prefix of the gRPC metadata keys that carry request hints.
/// The part of the key after this prefix is used as the hint name.
pub const GREPTIME_DB_HEADER_HINT_PREFIX: &str = "x-greptime-hint-";

pub(crate) struct DatabaseService {
handler: GreptimeRequestHandler,
}
Expand All @@ -42,10 +45,15 @@ impl GreptimeDatabase for DatabaseService {
request: Request<GreptimeRequest>,
) -> TonicResult<Response<GreptimeResponse>> {
let remote_addr = request.remote_addr();
let hints = extract_hints(request.metadata());
debug!(
"GreptimeDatabase::Handle: request from {:?} with hints: {:?}",
remote_addr, hints
);
let handler = self.handler.clone();
let request_future = async move {
let request = request.into_inner();
let output = handler.handle_request(request).await?;
let output = handler.handle_request(request, hints).await?;
let message = match output.data {
OutputData::AffectedRows(rows) => GreptimeResponse {
header: Some(ResponseHeader {
Expand Down Expand Up @@ -83,14 +91,19 @@ impl GreptimeDatabase for DatabaseService {
request: Request<Streaming<GreptimeRequest>>,
) -> Result<Response<GreptimeResponse>, Status> {
let remote_addr = request.remote_addr();
let hints = extract_hints(request.metadata());
debug!(
"GreptimeDatabase::HandleRequests: request from {:?} with hints: {:?}",
remote_addr, hints
);
let handler = self.handler.clone();
let request_future = async move {
let mut affected_rows = 0;

let mut stream = request.into_inner();
while let Some(request) = stream.next().await {
let request = request?;
let output = handler.handle_request(request).await?;
let output = handler.handle_request(request, hints.clone()).await?;
match output.data {
OutputData::AffectedRows(rows) => affected_rows += rows,
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
Expand Down Expand Up @@ -129,3 +142,58 @@ impl GreptimeDatabase for DatabaseService {
cancellation::with_cancellation_handler(request_future, cancellation_future).await
}
}

/// Collects the request hints carried in gRPC `metadata`.
///
/// A metadata entry is treated as a hint when it is an ASCII entry whose key starts
/// with [`GREPTIME_DB_HEADER_HINT_PREFIX`]. Each hint is returned as a
/// `(name, value)` pair, where `name` is the key with the prefix removed and
/// `value` is the header value with surrounding whitespace trimmed.
///
/// Entries that are not ASCII, lack the prefix, or whose value cannot be viewed
/// as a string are silently skipped.
fn extract_hints(metadata: &MetadataMap) -> Vec<(String, String)> {
    metadata
        .iter()
        .filter_map(|kv| {
            let KeyAndValueRef::Ascii(key, value) = kv else {
                return None;
            };
            // `strip_prefix` both tests for the prefix and yields the hint name,
            // so no separate `starts_with` check or `unwrap` is needed.
            let hint_name = key.as_str().strip_prefix(GREPTIME_DB_HEADER_HINT_PREFIX)?;
            // Skip values that cannot be represented as a string.
            let value = value.to_str().ok()?;
            Some((hint_name.to_string(), value.trim().to_string()))
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use tonic::metadata::MetadataValue;

    use super::*;

    /// An ASCII entry with the hint prefix is returned with the prefix stripped.
    #[test]
    fn test_extract_hints() {
        let mut headers = MetadataMap::new();
        let replaced = headers.insert(
            "x-greptime-hint-append_mode",
            MetadataValue::from_static("true"),
        );
        // The map was empty, so nothing can have been replaced.
        assert!(replaced.is_none());

        let extracted = extract_hints(&headers);
        let expected = vec![("append_mode".to_string(), "true".to_string())];
        assert_eq!(extracted, expected);
    }

    /// A binary entry is ignored even though its key carries the hint prefix.
    #[test]
    fn extract_hints_ignores_non_ascii_metadata() {
        let mut headers = MetadataMap::new();
        headers.insert_bin(
            "x-greptime-hint-merge_mode-bin",
            MetadataValue::from_bytes(b"last_non_null"),
        );

        assert!(extract_hints(&headers).is_empty());
    }
}
2 changes: 1 addition & 1 deletion src/servers/src/grpc/flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl FlightCraft for GreptimeRequestHandler {
request_type = get_request_type(&request)
);
async {
let output = self.handle_request(request).await?;
let output = self.handle_request(request, Default::default()).await?;
let stream: Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync>> =
to_flight_data_stream(output, TracingContext::from_current_span());
Ok(Response::new(stream))
Expand Down
23 changes: 16 additions & 7 deletions src/servers/src/grpc/greptime_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,17 @@ impl GreptimeRequestHandler {
}

#[tracing::instrument(skip_all, fields(protocol = "grpc", request_type = get_request_type(&request)))]
pub(crate) async fn handle_request(&self, request: GreptimeRequest) -> Result<Output> {
pub(crate) async fn handle_request(
&self,
request: GreptimeRequest,
hints: Vec<(String, String)>,
) -> Result<Output> {
let query = request.request.context(InvalidQuerySnafu {
reason: "Expecting non-empty GreptimeRequest.",
})?;

let header = request.header.as_ref();
let query_ctx = create_query_context(header);
let query_ctx = create_query_context(header, hints);
let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?;
query_ctx.set_current_user(user_info);

Expand Down Expand Up @@ -164,7 +168,10 @@ pub(crate) async fn auth(
})
}

pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryContextRef {
pub(crate) fn create_query_context(
header: Option<&RequestHeader>,
extensions: Vec<(String, String)>,
) -> QueryContextRef {
let (catalog, schema) = header
.map(|header| {
// We provide dbname field in newer versions of protos/sdks
Expand Down Expand Up @@ -193,12 +200,14 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte
)
});
let timezone = parse_timezone(header.map(|h| h.timezone.as_str()));
QueryContextBuilder::default()
let mut ctx_builder = QueryContextBuilder::default()
.current_catalog(catalog)
.current_schema(schema)
.timezone(timezone)
.build()
.into()
.timezone(timezone);
for (key, value) in extensions {
ctx_builder = ctx_builder.set_extension(key, value);
}
ctx_builder.build().into()
}

/// Histogram timer for handling gRPC request.
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/grpc/prom_query_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl PrometheusGateway for PrometheusGatewayService {
};

let header = inner.header.as_ref();
let query_ctx = create_query_context(header);
let query_ctx = create_query_context(header, Default::default());
let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?;
query_ctx.set_current_user(user_info);

Expand Down
68 changes: 68 additions & 0 deletions tests-integration/tests/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ macro_rules! grpc_tests {

test_invalid_dbname,
test_auto_create_table,
test_auto_create_table_with_hints,
test_insert_and_select,
test_dbname,
test_grpc_message_size_ok,
Expand Down Expand Up @@ -277,6 +278,17 @@ pub async fn test_auto_create_table(store_type: StorageType) {
guard.remove_all().await;
}

/// Starts a gRPC server for `store_type`, runs the hinted-insert scenario in
/// `insert_with_hints_and_assert` against it, then shuts the server down and
/// removes everything held by the setup guard.
pub async fn test_auto_create_table_with_hints(store_type: StorageType) {
    let (server_addr, mut guard, frontend_server) =
        setup_grpc_server(store_type, "auto_create_table_with_hints").await;

    let db = Database::new(
        DEFAULT_CATALOG_NAME,
        DEFAULT_SCHEMA_NAME,
        Client::with_urls(vec![server_addr]),
    );
    insert_with_hints_and_assert(&db).await;

    // The shutdown result is deliberately ignored.
    let _ = frontend_server.shutdown().await;
    guard.remove_all().await;
}

fn expect_data() -> (Column, Column, Column, Column) {
// testing data:
let expected_host_col = Column {
Expand Down Expand Up @@ -377,6 +389,62 @@ pub async fn test_insert_and_select(store_type: StorageType) {
guard.remove_all().await;
}

/// Inserts the rows from `expect_data` into table `demo` through
/// `Database::insert_with_hints`, passing the `append_mode = true` hint, and
/// asserts that `SHOW CREATE TABLE demo` reports `append_mode = 'true'`.
///
/// Panics (via `unwrap` / `assert_eq!`) if the insert fails, does not report 4,
/// or the rendered table definition differs from the expected text.
async fn insert_with_hints_and_assert(db: &Database) {
// testing data:
let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data();

// A single insert request for table `demo` carrying all four columns.
let request = InsertRequest {
table_name: "demo".to_string(),
columns: vec![
expected_host_col.clone(),
expected_cpu_col.clone(),
expected_mem_col.clone(),
expected_ts_col.clone(),
],
row_count: 4,
};
// Send the insert together with the `append_mode` hint.
let result = db
.insert_with_hints(
InsertRequests {
inserts: vec![request],
},
&[("append_mode", "true")],
)
.await;
// The reply is expected to equal the request's `row_count`.
assert_eq!(result.unwrap(), 4);

// show table
let output = db.sql("SHOW CREATE TABLE demo;").await.unwrap();

// Normalize the output into record batches; an affected-rows reply is not expected here.
let record_batches = match output.data {
OutputData::RecordBatches(record_batches) => record_batches,
OutputData::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(),
OutputData::AffectedRows(_) => unreachable!(),
};

// Compare the pretty-printed definition with the expected text, which must
// include the `append_mode = 'true'` table option.
let pretty = record_batches.pretty_print().unwrap();
let expected = "\
+-------+-------------------------------------+
| Table | Create Table |
+-------+-------------------------------------+
| demo | CREATE TABLE IF NOT EXISTS \"demo\" ( |
| | \"host\" STRING NULL, |
| | \"cpu\" DOUBLE NULL, |
| | \"memory\" DOUBLE NULL, |
| | \"ts\" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX (\"ts\"), |
| | PRIMARY KEY (\"host\") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | append_mode = 'true' |
| | ) |
+-------+-------------------------------------+\
";
assert_eq!(pretty, expected);
}

async fn insert_and_assert(db: &Database) {
// testing data:
let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data();
Expand Down

0 comments on commit 291d9d5

Please sign in to comment.