Skip to content

Commit

Permalink
fix: Remove possible panic in metrics clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Oct 26, 2023
1 parent 9d08bb2 commit 00f4d14
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 314 deletions.
15 changes: 9 additions & 6 deletions src/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,15 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExec
// create the collector
let source_id = source.get_task_id();
let counter = metrics.as_ref().map(|metrics| {
metrics.create_collector_for_exchange_recv_row_number(vec![
identity,
source_id.query_id,
source_id.stage_id.to_string(),
source_id.task_id.to_string(),
])
metrics
.executor_metrics()
.exchange_recv_row_number
.with_label_values(&[
source_id.query_id.as_str(),
format!("{}", source_id.stage_id).as_str(),
format!("{}", source_id.task_id).as_str(),
identity.as_str(),
])
});

loop {
Expand Down
18 changes: 10 additions & 8 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::{Bound, RangeBounds};
use std::ops::{Bound, Deref, RangeBounds};
use std::sync::Arc;

use futures::{pin_mut, StreamExt};
Expand Down Expand Up @@ -310,9 +310,12 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
let table = Arc::new(table);

// Create collector.
let histogram = metrics
.as_ref()
.map(|metrics| metrics.create_collector_for_row_seq_scan_next_duration(vec![identity]));
let histogram = metrics.as_ref().map(|metrics| {
metrics
.executor_metrics()
.row_seq_scan_next_duration
.with_label_values(&metrics.executor_labels(&identity))
});

if ordered {
// Currently we execute range-scans concurrently so the order is not guaranteed if
Expand All @@ -329,9 +332,8 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
// Point Get
for point_get in point_gets {
let table = table.clone();
let histogram = histogram.clone();
if let Some(row) =
Self::execute_point_get(table, point_get, epoch.clone(), histogram).await?
Self::execute_point_get(table, point_get, epoch.clone(), histogram.clone()).await?
{
if let Some(chunk) = data_chunk_builder.append_one_row(row) {
yield chunk;
Expand Down Expand Up @@ -365,7 +367,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
table: Arc<StorageTable<S>>,
scan_range: ScanRange,
epoch: BatchQueryEpoch,
histogram: Option<Histogram>,
histogram: Option<impl Deref<Target = Histogram>>,
) -> Result<Option<OwnedRow>> {
let pk_prefix = scan_range.pk_prefix;
assert!(pk_prefix.len() == table.pk_indices().len());
Expand All @@ -389,7 +391,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
ordered: bool,
epoch: BatchQueryEpoch,
chunk_size: usize,
histogram: Option<Histogram>,
histogram: Option<impl Deref<Target = Histogram>>,
) {
let ScanRange {
pk_prefix,
Expand Down
1 change: 1 addition & 0 deletions src/batch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#![feature(result_option_inspect)]
#![feature(assert_matches)]
#![feature(lazy_cell)]
#![feature(array_methods)]

mod error;
pub mod exchange_source;
Expand Down
Loading

0 comments on commit 00f4d14

Please sign in to comment.