Skip to content

Commit

Permalink
Partial
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Oct 25, 2023
1 parent f556b28 commit e371975
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 244 deletions.
15 changes: 9 additions & 6 deletions src/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,15 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExec
// create the collector
let source_id = source.get_task_id();
let counter = metrics.as_ref().map(|metrics| {
metrics.create_collector_for_exchange_recv_row_number(vec![
identity,
source_id.query_id,
source_id.stage_id.to_string(),
source_id.task_id.to_string(),
])
metrics
.executor_metrics()
.exchange_recv_row_number
.with_label_values(&[
source_id.query_id.as_str(),
format!("{}", source_id.stage_id).as_str(),
format!("{}", source_id.task_id).as_str(),
identity.as_str(),
])
});

loop {
Expand Down
10 changes: 7 additions & 3 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::{Bound, RangeBounds};
use std::process::id;
use std::sync::Arc;

use futures::{pin_mut, StreamExt};
Expand Down Expand Up @@ -310,9 +311,12 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
let table = Arc::new(table);

// Create collector.
let histogram = metrics
.as_ref()
.map(|metrics| metrics.create_collector_for_row_seq_scan_next_duration(vec![identity]));
let histogram = metrics.as_ref().map(|metrics| {
metrics
.executor_metrics()
.row_seq_scan_next_duration
.with_label_values(&metrics.executor_labels(&identity))
});

if ordered {
// Currently we execute range-scans concurrently so the order is not guaranteed if
Expand Down
Loading

0 comments on commit e371975

Please sign in to comment.