Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Remove possible panic in metrics clean up. #13004

Merged
merged 2 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions src/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,15 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExec
// create the collector
let source_id = source.get_task_id();
let counter = metrics.as_ref().map(|metrics| {
metrics.create_collector_for_exchange_recv_row_number(vec![
identity,
source_id.query_id,
source_id.stage_id.to_string(),
source_id.task_id.to_string(),
])
metrics
.executor_metrics()
.exchange_recv_row_number
.with_label_values(&[
source_id.query_id.as_str(),
format!("{}", source_id.stage_id).as_str(),
format!("{}", source_id.task_id).as_str(),
identity.as_str(),
])
});

loop {
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/executor/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ impl<K: HashKey> GroupTopNExecutor<K> {
#[cfg(test)]
mod tests {
use futures::stream::StreamExt;
use prometheus::IntGauge;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
Expand All @@ -254,7 +254,7 @@ mod tests {

#[tokio::test]
async fn test_group_top_n_executor() {
let parent_mem = MemoryContext::root(IntGauge::new("root_memory_usage", " ").unwrap());
let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());
{
let schema = Schema {
fields: vec![
Expand Down Expand Up @@ -290,7 +290,7 @@ mod tests {
];
let mem_ctx = MemoryContext::new(
Some(parent_mem.clone()),
IntGauge::new("memory_usage", " ").unwrap(),
LabelGuardedIntGauge::<4>::test_int_gauge(),
);
let top_n_executor = (GroupTopNExecutorBuilder {
child: Box::new(mock_executor),
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ mod tests {
use std::sync::Arc;

use futures_async_stream::for_await;
use prometheus::IntGauge;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::PbDataType;
Expand All @@ -323,7 +323,7 @@ mod tests {

#[tokio::test]
async fn execute_int32_grouped() {
let parent_mem = MemoryContext::root(IntGauge::new("root_memory_usage", " ").unwrap());
let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());
{
let src_exec = Box::new(MockExecutor::with_chunk(
DataChunk::from_pretty(
Expand Down Expand Up @@ -370,7 +370,7 @@ mod tests {

let mem_context = MemoryContext::new(
Some(parent_mem.clone()),
IntGauge::new("memory_usage", " ").unwrap(),
LabelGuardedIntGauge::<4>::test_int_gauge(),
);
let actual_exec = HashAggExecutorBuilder::deserialize(
&agg_prost,
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1933,12 +1933,12 @@ impl<K> HashJoinExecutor<K> {
mod tests {
use futures::StreamExt;
use futures_async_stream::for_await;
use prometheus::IntGauge;
use risingwave_common::array::{ArrayBuilderImpl, DataChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::Result;
use risingwave_common::hash::Key32;
use risingwave_common::memory::MemoryContext;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
Expand Down Expand Up @@ -2157,7 +2157,7 @@ mod tests {
};

let mem_ctx =
MemoryContext::new(parent_mem_ctx, IntGauge::new("memory_usage", " ").unwrap());
MemoryContext::new(parent_mem_ctx, LabelGuardedIntGauge::<4>::test_int_gauge());
Box::new(HashJoinExecutor::<Key32>::new(
join_type,
output_indices,
Expand Down Expand Up @@ -2198,7 +2198,7 @@ mod tests {
right_executor: BoxedExecutor,
) {
let parent_mem_context =
MemoryContext::root(IntGauge::new("total_memory_usage", " ").unwrap());
MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());

{
let join_executor = self.create_join_executor_with_chunk_size_and_executors(
Expand Down
18 changes: 10 additions & 8 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::{Bound, RangeBounds};
use std::ops::{Bound, Deref, RangeBounds};
use std::sync::Arc;

use futures::{pin_mut, StreamExt};
Expand Down Expand Up @@ -310,9 +310,12 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
let table = Arc::new(table);

// Create collector.
let histogram = metrics
.as_ref()
.map(|metrics| metrics.create_collector_for_row_seq_scan_next_duration(vec![identity]));
let histogram = metrics.as_ref().map(|metrics| {
metrics
.executor_metrics()
.row_seq_scan_next_duration
.with_label_values(&metrics.executor_labels(&identity))
});

if ordered {
// Currently we execute range-scans concurrently so the order is not guaranteed if
Expand All @@ -329,9 +332,8 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
// Point Get
for point_get in point_gets {
let table = table.clone();
let histogram = histogram.clone();
if let Some(row) =
Self::execute_point_get(table, point_get, epoch.clone(), histogram).await?
Self::execute_point_get(table, point_get, epoch.clone(), histogram.clone()).await?
{
if let Some(chunk) = data_chunk_builder.append_one_row(row) {
yield chunk;
Expand Down Expand Up @@ -365,7 +367,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
table: Arc<StorageTable<S>>,
scan_range: ScanRange,
epoch: BatchQueryEpoch,
histogram: Option<Histogram>,
histogram: Option<impl Deref<Target = Histogram>>,
) -> Result<Option<OwnedRow>> {
let pk_prefix = scan_range.pk_prefix;
assert!(pk_prefix.len() == table.pk_indices().len());
Expand All @@ -389,7 +391,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
ordered: bool,
epoch: BatchQueryEpoch,
chunk_size: usize,
histogram: Option<Histogram>,
histogram: Option<impl Deref<Target = Histogram>>,
) {
let ScanRange {
pk_prefix,
Expand Down
1 change: 1 addition & 0 deletions src/batch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#![feature(result_option_inspect)]
#![feature(assert_matches)]
#![feature(lazy_cell)]
#![feature(array_methods)]

mod error;
pub mod exchange_source;
Expand Down
Loading
Loading