diff --git a/src/operator/src/metrics.rs b/src/operator/src/metrics.rs
index 577d843cc069..b37750c047ca 100644
--- a/src/operator/src/metrics.rs
+++ b/src/operator/src/metrics.rs
@@ -22,4 +22,10 @@ lazy_static! {
         register_int_counter!("table_operator_ingest_rows", "table operator ingest rows").unwrap();
     pub static ref DIST_DELETE_ROW_COUNT: IntCounter =
         register_int_counter!("table_operator_delete_rows", "table operator delete rows").unwrap();
+    /// The elapsed time of COPY FROM statement.
+    pub static ref COPY_FROM_ELAPSED: HistogramVec = register_histogram_vec!(
+        "table_operator_copy_from_elapsed",
+        "copy from elapsed",
+        &["stage", "file_format", "table"]
+    ).unwrap();
 }
diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs
index 2474ee3483de..b7e1f796f168 100644
--- a/src/operator/src/statement/copy_table_from.rs
+++ b/src/operator/src/statement/copy_table_from.rs
@@ -48,6 +48,7 @@ use table::requests::{CopyTableRequest, InsertRequest};
 use tokio::io::BufReader;
 
 use crate::error::{self, IntoVectorsSnafu, Result};
+use crate::metrics;
 use crate::statement::StatementExecutor;
 
 const DEFAULT_BATCH_SIZE: usize = 8192;
@@ -252,7 +253,16 @@ impl StatementExecutor {
 
         let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
 
-        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
+        let file_format = format.suffix();
+        let table_name = table_ref.to_string();
+
+        let (object_store, entries) = {
+            let _timer = metrics::COPY_FROM_ELAPSED
+                .with_label_values(&["list_entries", file_format, &table_name])
+                .start_timer();
+
+            self.list_copy_from_entries(&req).await?
+        };
 
         let mut files = Vec::with_capacity(entries.len());
         let table_schema = table.schema().arrow_schema().clone();
@@ -262,9 +272,14 @@
                 continue;
             }
             let path = entry.path();
-            let file_schema = self
-                .infer_schema(&format, object_store.clone(), path)
-                .await?;
+            let file_schema = {
+                let _timer = metrics::COPY_FROM_ELAPSED
+                    .with_label_values(&["infer_schema", file_format, &table_name])
+                    .start_timer();
+
+                self.infer_schema(&format, object_store.clone(), path)
+                    .await?
+            };
 
             let (file_schema_projection, table_schema_projection, compat_schema) =
                 generated_schema_projection_and_compatible_file_schema(&file_schema, &table_schema);
@@ -291,15 +306,20 @@
 
         let mut rows_inserted = 0;
         for (schema, file_schema_projection, projected_table_schema, path) in files {
-            let mut stream = self
-                .build_read_stream(
+            let mut stream = {
+                let _timer = metrics::COPY_FROM_ELAPSED
+                    .with_label_values(&["build_read_stream", file_format, &table_name])
+                    .start_timer();
+
+                self.build_read_stream(
                     &format,
                     object_store.clone(),
                     path,
                     schema,
                     file_schema_projection,
                 )
-                .await?;
+                .await?
+            };
 
             let fields = projected_table_schema
                 .fields()
@@ -313,6 +333,10 @@
             let mut pending = vec![];
 
             while let Some(r) = stream.next().await {
+                let _timer = metrics::COPY_FROM_ELAPSED
+                    .with_label_values(&["read", file_format, &table_name])
+                    .start_timer();
+
                 let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                 let vectors =
                     Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;
@@ -341,6 +365,10 @@
             }
 
             if !pending.is_empty() {
+                let _timer = metrics::COPY_FROM_ELAPSED
+                    .with_label_values(&["batch_insert", file_format, &table_name])
+                    .start_timer();
+
                 rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
             }
         }
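For context, every stage timer added above uses the prometheus crate's scoped-timer idiom: `with_label_values(...).start_timer()` returns a `HistogramTimer` that records the elapsed time when it is dropped at the end of the enclosing block. A minimal standalone sketch of that idiom follows; the `DEMO_ELAPSED` metric, its name, and its single `stage` label are hypothetical stand-ins for the patch's `COPY_FROM_ELAPSED`, not part of the change itself.

```rust
use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};

lazy_static! {
    // Hypothetical demo metric; the patch itself registers
    // `table_operator_copy_from_elapsed` with ["stage", "file_format", "table"].
    static ref DEMO_ELAPSED: HistogramVec = register_histogram_vec!(
        "demo_stage_elapsed",
        "elapsed time per stage",
        &["stage"]
    )
    .unwrap();
}

fn list_entries_stage() {
    // The timer starts here and observes the elapsed seconds into the
    // histogram when `_timer` is dropped at the end of this scope.
    let _timer = DEMO_ELAPSED.with_label_values(&["list_entries"]).start_timer();
    // ... the actual work for this stage would run here ...
}

fn main() {
    list_entries_stage();
}
```

Because the timer observes on drop, wrapping an `.await` in a block (as the patch does) also counts time the future spends suspended, not just CPU time.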