diff --git a/src/operator/src/metrics.rs b/src/operator/src/metrics.rs
index 577d843cc069..d1a92b961ecc 100644
--- a/src/operator/src/metrics.rs
+++ b/src/operator/src/metrics.rs
@@ -22,4 +22,16 @@ lazy_static! {
         register_int_counter!("table_operator_ingest_rows", "table operator ingest rows").unwrap();
     pub static ref DIST_DELETE_ROW_COUNT: IntCounter =
         register_int_counter!("table_operator_delete_rows", "table operator delete rows").unwrap();
+    /// Elapsed time of each COPY FROM stage, labeled by stage, file format and table.
+    pub static ref COPY_FROM_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
+        "table_operator_copy_from_stage_elapsed",
+        "copy from stage elapsed",
+        &["stage", "file_format", "table"]
+    ).unwrap();
+    /// Total elapsed time of a COPY FROM statement, labeled by file format and table.
+    pub static ref COPY_FROM_ELAPSED: HistogramVec = register_histogram_vec!(
+        "table_operator_copy_from_elapsed",
+        "copy from elapsed",
+        &["file_format", "table"]
+    ).unwrap();
 }
diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs
index 2474ee3483de..f04ac4d6c5c7 100644
--- a/src/operator/src/statement/copy_table_from.rs
+++ b/src/operator/src/statement/copy_table_from.rs
@@ -48,6 +48,7 @@ use table::requests::{CopyTableRequest, InsertRequest};
 use tokio::io::BufReader;
 
 use crate::error::{self, IntoVectorsSnafu, Result};
+use crate::metrics;
 use crate::statement::StatementExecutor;
 
 const DEFAULT_BATCH_SIZE: usize = 8192;
@@ -206,11 +207,10 @@ impl StatementExecutor {
                     .await
                     .context(error::ReadObjectSnafu { path })?;
 
-                let buf_reader = BufReader::new(reader.compat());
-
-                let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
+                let builder = ParquetRecordBatchStreamBuilder::new(reader.compat())
                     .await
-                    .context(error::ReadParquetSnafu)?;
+                    .context(error::ReadParquetSnafu)?
+                    .with_batch_size(DEFAULT_BATCH_SIZE);
 
                 let upstream = builder
                     .build()
@@ -252,19 +252,40 @@ impl StatementExecutor {
 
         let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
 
-        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
+        let file_format = format.suffix();
+        let table_name = table_ref.to_string();
+
+        let _timer = metrics::COPY_FROM_ELAPSED
+            .with_label_values(&[file_format, &table_name])
+            .start_timer();
+
+        let (object_store, entries) = {
+            let _timer = metrics::COPY_FROM_STAGE_ELAPSED
+                .with_label_values(&["list_entries", file_format, &table_name])
+                .start_timer();
+
+            self.list_copy_from_entries(&req).await?
+        };
 
         let mut files = Vec::with_capacity(entries.len());
         let table_schema = table.schema().arrow_schema().clone();
 
+        let prepare_timer = metrics::COPY_FROM_STAGE_ELAPSED
+            .with_label_values(&["prepare_entries", file_format, &table_name])
+            .start_timer();
         for entry in entries.iter() {
             if entry.metadata().mode() != EntryMode::FILE {
                 continue;
             }
             let path = entry.path();
-            let file_schema = self
-                .infer_schema(&format, object_store.clone(), path)
-                .await?;
+            let file_schema = {
+                let _timer = metrics::COPY_FROM_STAGE_ELAPSED
+                    .with_label_values(&["infer_schema", file_format, &table_name])
+                    .start_timer();
+
+                self.infer_schema(&format, object_store.clone(), path)
+                    .await?
+            };
             let (file_schema_projection, table_schema_projection, compat_schema) =
                 generated_schema_projection_and_compatible_file_schema(&file_schema, &table_schema);
 
@@ -288,18 +309,28 @@ impl StatementExecutor {
                 path,
             ))
         }
+        prepare_timer.observe_duration();
 
         let mut rows_inserted = 0;
+
+        let read_insert_timer = metrics::COPY_FROM_STAGE_ELAPSED
+            .with_label_values(&["read_and_insert", file_format, &table_name])
+            .start_timer();
         for (schema, file_schema_projection, projected_table_schema, path) in files {
-            let mut stream = self
-                .build_read_stream(
+            let mut stream = {
+                let _timer = metrics::COPY_FROM_STAGE_ELAPSED
+                    .with_label_values(&["build_stream", file_format, &table_name])
+                    .start_timer();
+
+                self.build_read_stream(
                     &format,
                     object_store.clone(),
                     path,
                     schema,
                     file_schema_projection,
                 )
-                .await?;
+                .await?
+            };
 
             let fields = projected_table_schema
                 .fields()
@@ -312,7 +343,13 @@ impl StatementExecutor {
             let mut pending_mem_size = 0;
             let mut pending = vec![];
 
-            while let Some(r) = stream.next().await {
+            while let Some(r) = {
+                let _timer = metrics::COPY_FROM_STAGE_ELAPSED
+                    .with_label_values(&["stream_read", file_format, &table_name])
+                    .start_timer();
+
+                stream.next().await
+            } {
                 let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                 let vectors =
                     Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;
@@ -336,14 +373,22 @@ impl StatementExecutor {
                 ));
 
                 if pending_mem_size as u64 >= pending_mem_threshold {
+                    let _timer = metrics::COPY_FROM_STAGE_ELAPSED
+                        .with_label_values(&["batch_insert", file_format, &table_name])
+                        .start_timer();
                     rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
                 }
             }
 
             if !pending.is_empty() {
+                let _timer = metrics::COPY_FROM_STAGE_ELAPSED
+                    .with_label_values(&["batch_insert", file_format, &table_name])
+                    .start_timer();
+
                 rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
             }
         }
+        read_insert_timer.observe_duration();
 
         Ok(rows_inserted)
     }