Skip to content

Commit

Permalink
fix: Remove possible panic in metrics clean up. (#13004)
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 authored Oct 27, 2023
1 parent 03f285b commit 31d6dc0
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 365 deletions.
15 changes: 9 additions & 6 deletions src/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,15 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExec
// create the collector
let source_id = source.get_task_id();
let counter = metrics.as_ref().map(|metrics| {
metrics.create_collector_for_exchange_recv_row_number(vec![
identity,
source_id.query_id,
source_id.stage_id.to_string(),
source_id.task_id.to_string(),
])
metrics
.executor_metrics()
.exchange_recv_row_number
.with_label_values(&[
source_id.query_id.as_str(),
format!("{}", source_id.stage_id).as_str(),
format!("{}", source_id.task_id).as_str(),
identity.as_str(),
])
});

loop {
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/executor/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ impl<K: HashKey> GroupTopNExecutor<K> {
#[cfg(test)]
mod tests {
use futures::stream::StreamExt;
use prometheus::IntGauge;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
Expand All @@ -254,7 +254,7 @@ mod tests {

#[tokio::test]
async fn test_group_top_n_executor() {
let parent_mem = MemoryContext::root(IntGauge::new("root_memory_usage", " ").unwrap());
let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());
{
let schema = Schema {
fields: vec![
Expand Down Expand Up @@ -290,7 +290,7 @@ mod tests {
];
let mem_ctx = MemoryContext::new(
Some(parent_mem.clone()),
IntGauge::new("memory_usage", " ").unwrap(),
LabelGuardedIntGauge::<4>::test_int_gauge(),
);
let top_n_executor = (GroupTopNExecutorBuilder {
child: Box::new(mock_executor),
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ mod tests {
use std::sync::Arc;

use futures_async_stream::for_await;
use prometheus::IntGauge;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::PbDataType;
Expand All @@ -323,7 +323,7 @@ mod tests {

#[tokio::test]
async fn execute_int32_grouped() {
let parent_mem = MemoryContext::root(IntGauge::new("root_memory_usage", " ").unwrap());
let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());
{
let src_exec = Box::new(MockExecutor::with_chunk(
DataChunk::from_pretty(
Expand Down Expand Up @@ -370,7 +370,7 @@ mod tests {

let mem_context = MemoryContext::new(
Some(parent_mem.clone()),
IntGauge::new("memory_usage", " ").unwrap(),
LabelGuardedIntGauge::<4>::test_int_gauge(),
);
let actual_exec = HashAggExecutorBuilder::deserialize(
&agg_prost,
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1933,12 +1933,12 @@ impl<K> HashJoinExecutor<K> {
mod tests {
use futures::StreamExt;
use futures_async_stream::for_await;
use prometheus::IntGauge;
use risingwave_common::array::{ArrayBuilderImpl, DataChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::Result;
use risingwave_common::hash::Key32;
use risingwave_common::memory::MemoryContext;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
Expand Down Expand Up @@ -2157,7 +2157,7 @@ mod tests {
};

let mem_ctx =
MemoryContext::new(parent_mem_ctx, IntGauge::new("memory_usage", " ").unwrap());
MemoryContext::new(parent_mem_ctx, LabelGuardedIntGauge::<4>::test_int_gauge());
Box::new(HashJoinExecutor::<Key32>::new(
join_type,
output_indices,
Expand Down Expand Up @@ -2198,7 +2198,7 @@ mod tests {
right_executor: BoxedExecutor,
) {
let parent_mem_context =
MemoryContext::root(IntGauge::new("total_memory_usage", " ").unwrap());
MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());

{
let join_executor = self.create_join_executor_with_chunk_size_and_executors(
Expand Down
18 changes: 10 additions & 8 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::{Bound, RangeBounds};
use std::ops::{Bound, Deref, RangeBounds};
use std::sync::Arc;

use futures::{pin_mut, StreamExt};
Expand Down Expand Up @@ -310,9 +310,12 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
let table = Arc::new(table);

// Create collector.
let histogram = metrics
.as_ref()
.map(|metrics| metrics.create_collector_for_row_seq_scan_next_duration(vec![identity]));
let histogram = metrics.as_ref().map(|metrics| {
metrics
.executor_metrics()
.row_seq_scan_next_duration
.with_label_values(&metrics.executor_labels(&identity))
});

if ordered {
// Currently we execute range-scans concurrently so the order is not guaranteed if
Expand All @@ -329,9 +332,8 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
// Point Get
for point_get in point_gets {
let table = table.clone();
let histogram = histogram.clone();
if let Some(row) =
Self::execute_point_get(table, point_get, epoch.clone(), histogram).await?
Self::execute_point_get(table, point_get, epoch.clone(), histogram.clone()).await?
{
if let Some(chunk) = data_chunk_builder.append_one_row(row) {
yield chunk;
Expand Down Expand Up @@ -365,7 +367,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
table: Arc<StorageTable<S>>,
scan_range: ScanRange,
epoch: BatchQueryEpoch,
histogram: Option<Histogram>,
histogram: Option<impl Deref<Target = Histogram>>,
) -> Result<Option<OwnedRow>> {
let pk_prefix = scan_range.pk_prefix;
assert!(pk_prefix.len() == table.pk_indices().len());
Expand All @@ -389,7 +391,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
ordered: bool,
epoch: BatchQueryEpoch,
chunk_size: usize,
histogram: Option<Histogram>,
histogram: Option<impl Deref<Target = Histogram>>,
) {
let ScanRange {
pk_prefix,
Expand Down
1 change: 1 addition & 0 deletions src/batch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#![feature(result_option_inspect)]
#![feature(assert_matches)]
#![feature(lazy_cell)]
#![feature(array_methods)]

mod error;
pub mod exchange_source;
Expand Down
Loading

0 comments on commit 31d6dc0

Please sign in to comment.