Skip to content

Commit

Permalink
chore: add metrics for COPY TO
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 5, 2023
1 parent f9e7762 commit c07d9e4
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 7 deletions.
6 changes: 6 additions & 0 deletions src/operator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,10 @@ lazy_static! {
register_int_counter!("table_operator_ingest_rows", "table operator ingest rows").unwrap();
pub static ref DIST_DELETE_ROW_COUNT: IntCounter =
register_int_counter!("table_operator_delete_rows", "table operator delete rows").unwrap();
/// The elapsed time of COPY TO statement.
pub static ref COPY_TO_ELAPSED: HistogramVec = register_histogram_vec!(
"table_operator_copy_to_elapsed",
"copy to elapsed",
&["stage", "file_format", "table"]
).unwrap();
}
42 changes: 35 additions & 7 deletions src/operator/src/statement/copy_table_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use table::requests::{CopyTableRequest, InsertRequest};
use tokio::io::BufReader;

use crate::error::{self, IntoVectorsSnafu, Result};
use crate::metrics;
use crate::statement::StatementExecutor;

const DEFAULT_BATCH_SIZE: usize = 8192;
Expand Down Expand Up @@ -252,7 +253,16 @@ impl StatementExecutor {

let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;

let (object_store, entries) = self.list_copy_from_entries(&req).await?;
let file_format = format.suffix();
let table_name = table_ref.to_string();

let (object_store, entries) = {
let _timer = metrics::COPY_TO_ELAPSED
.with_label_values(&["list_entries", file_format, &table_name])
.start_timer();

self.list_copy_from_entries(&req).await?
};

let mut files = Vec::with_capacity(entries.len());
let table_schema = table.schema().arrow_schema().clone();
Expand All @@ -262,9 +272,14 @@ impl StatementExecutor {
continue;
}
let path = entry.path();
let file_schema = self
.infer_schema(&format, object_store.clone(), path)
.await?;
let file_schema = {
let _timer = metrics::COPY_TO_ELAPSED
.with_label_values(&["infer_schema", file_format, &table_name])
.start_timer();

self.infer_schema(&format, object_store.clone(), path)
.await?
};
let (file_schema_projection, table_schema_projection, compat_schema) =
generated_schema_projection_and_compatible_file_schema(&file_schema, &table_schema);

Expand All @@ -291,15 +306,20 @@ impl StatementExecutor {

let mut rows_inserted = 0;
for (schema, file_schema_projection, projected_table_schema, path) in files {
let mut stream = self
.build_read_stream(
let mut stream = {
let _timer = metrics::COPY_TO_ELAPSED
.with_label_values(&["build_read_stream", file_format, &table_name])
.start_timer();

self.build_read_stream(
&format,
object_store.clone(),
path,
schema,
file_schema_projection,
)
.await?;
.await?
};

let fields = projected_table_schema
.fields()
Expand All @@ -313,6 +333,10 @@ impl StatementExecutor {
let mut pending = vec![];

while let Some(r) = stream.next().await {
let _timer = metrics::COPY_TO_ELAPSED
.with_label_values(&["read", file_format, &table_name])
.start_timer();

let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
let vectors =
Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;
Expand Down Expand Up @@ -341,6 +365,10 @@ impl StatementExecutor {
}

if !pending.is_empty() {
let _timer = metrics::COPY_TO_ELAPSED
.with_label_values(&["batch_insert", file_format, &table_name])
.start_timer();

rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
}
}
Expand Down

0 comments on commit c07d9e4

Please sign in to comment.