Skip to content

Commit

Permalink
chore: try add headers to record_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
shuiyisong committed Dec 24, 2023
1 parent 06fd7fd commit bb61e78
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a31ea166fc015ea7ff111ac94e26c3a5d64364d2" }
greptime-proto = { git = "https://github.com/shuiyisong/greptime-proto.git", rev = "4faf6225df6f4d08c0e1421921a5e9a1c55e4038" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
6 changes: 5 additions & 1 deletion src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::RecordBatchStreamWrapper;
use common_telemetry::logging;
use common_telemetry::tracing_context::W3cTrace;
use common_telemetry::{logging, warn};
use futures_util::StreamExt;
use prost::Message;
use snafu::{ensure, ResultExt};
Expand Down Expand Up @@ -322,6 +322,10 @@ impl Database {
};
Ok(Output::Stream(Box::pin(record_batch_stream)))
}
FlightMessage::Metrics(s) => {
warn!("[DEBUG]receive metrics in database: {:?}", s);
Ok(Output::AffectedRows(0))
}
}
}
}
Expand Down
19 changes: 13 additions & 6 deletions src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use common_meta::datanode_manager::{AffectedRows, Datanode};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::error;
use common_telemetry::{error, warn};
use prost::Message;
use snafu::{location, Location, OptionExt, ResultExt};
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -124,16 +124,23 @@ impl RegionRequester {
let flight_message = flight_message
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let FlightMessage::Recordbatch(record_batch) = flight_message else {
yield IllegalFlightMessagesSnafu {

match flight_message {
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
FlightMessage::Metrics(s) => {
warn!("[DEBUG]receive metrics in region: {:?}", s);
break;
}
_ => {
yield IllegalFlightMessagesSnafu {
reason: "A Schema message must be succeeded exclusively by a set of RecordBatch messages"
}
.fail()
.map_err(BoxedError::new)
.context(ExternalSnafu);
break;
};
yield Ok(record_batch);
break;
}
}
}
}));
let record_batch_stream = RecordBatchStreamWrapper {
Expand Down
20 changes: 19 additions & 1 deletion src/common/grpc/src/flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::{AffectedRows, FlightMetadata};
use api::v1::{AffectedRows, FlightMetadata, Metrics};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightData, SchemaAsIpc};
use common_base::bytes::Bytes;
Expand All @@ -39,6 +39,7 @@ pub enum FlightMessage {
Schema(SchemaRef),
Recordbatch(RecordBatch),
AffectedRows(usize),
Metrics(String),
}

pub struct FlightEncoder {
Expand Down Expand Up @@ -85,6 +86,20 @@ impl FlightEncoder {
FlightMessage::AffectedRows(rows) => {
let metadata = FlightMetadata {
affected_rows: Some(AffectedRows { value: rows as _ }),
metrics: None,
}
.encode_to_vec();
FlightData {
flight_descriptor: None,
data_header: build_none_flight_msg().into(),
app_metadata: metadata.into(),
data_body: ProstBytes::default(),
}
}
FlightMessage::Metrics(s) => {
let metadata = FlightMetadata {
affected_rows: None,
metrics: Some(Metrics { metrics: s }),
}
.encode_to_vec();
FlightData {
Expand Down Expand Up @@ -119,6 +134,9 @@ impl FlightDecoder {
if let Some(AffectedRows { value }) = metadata.affected_rows {
return Ok(FlightMessage::AffectedRows(value as _));
}
if let Some(Metrics { metrics }) = metadata.metrics {
return Ok(FlightMessage::Metrics(metrics));
}
InvalidFlightDataSnafu {
reason: "Expecting FlightMetadata have some meaningful content.",
}
Expand Down
4 changes: 4 additions & 0 deletions src/common/recordbatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
fn output_ordering(&self) -> Option<&[OrderOption]> {
None
}

fn metrics(&self) -> Option<String> {
None
}
}

pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
Expand Down
1 change: 1 addition & 0 deletions src/servers/src/grpc/flight/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl FlightRecordBatchStream {
}
}
}
// make last package to pass metrics
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/table/src/table/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ impl MemoryUsageMetrics {
self.end_time.record()
}
}

pub fn mem_used(&self) -> usize {
self.mem_used.value()
}
}

impl Drop for MemoryUsageMetrics {
Expand Down
4 changes: 4 additions & 0 deletions src/table/src/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ impl RecordBatchStream for StreamWithMetricWrapper {
fn schema(&self) -> SchemaRef {
self.stream.schema()
}

fn metrics(&self) -> Option<String> {
Some(self.metric.mem_used().to_string())
}
}

#[cfg(test)]
Expand Down

0 comments on commit bb61e78

Please sign in to comment.