chore: add metrics for COPY FROM
WenyXu committed Dec 6, 2023
1 parent f9e7762 commit 1cccc80
Showing 2 changed files with 69 additions and 12 deletions.
12 changes: 12 additions & 0 deletions src/operator/src/metrics.rs
@@ -22,4 +22,16 @@ lazy_static! {
register_int_counter!("table_operator_ingest_rows", "table operator ingest rows").unwrap();
pub static ref DIST_DELETE_ROW_COUNT: IntCounter =
register_int_counter!("table_operator_delete_rows", "table operator delete rows").unwrap();
/// The elapsed time of COPY FROM statement stages.
pub static ref COPY_FROM_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
"table_operator_copy_from_stage_elapsed",
"copy to stage elapsed",
&["stage", "file_format", "table"]
).unwrap();
/// The elapsed time of COPY FROM statement.
pub static ref COPY_FROM_ELAPSED: HistogramVec = register_histogram_vec!(
"table_operator_copy_from_elapsed",
"copy to elapsed",
&["file_format", "table"]
).unwrap();
}
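
The instrumentation below relies on the timer pattern of the prometheus crate: start_timer() returns a guard that records the elapsed seconds into the histogram when it is dropped, or earlier if observe_duration() is called explicitly. A minimal sketch of the pattern, using a hypothetical EXAMPLE_STAGE_ELAPSED histogram in place of the metrics above:

use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};

lazy_static! {
    /// Hypothetical stand-in for COPY_FROM_STAGE_ELAPSED above.
    static ref EXAMPLE_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
        "example_stage_elapsed",
        "example stage elapsed",
        &["stage"]
    ).unwrap();
}

fn timed_stage() {
    // The guard records the elapsed seconds into the histogram when it is
    // dropped at the end of this scope.
    let _timer = EXAMPLE_STAGE_ELAPSED
        .with_label_values(&["list_entries"])
        .start_timer();

    // ... stage work goes here ...

    // Alternatively, consume the guard to record at an exact point:
    // _timer.observe_duration();
}

This is why the call sites below mostly bind the guard to _timer and let scope exit do the recording; the two explicit observe_duration() calls cut a stage short before the enclosing scope ends.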
69 changes: 57 additions & 12 deletions src/operator/src/statement/copy_table_from.rs
@@ -48,6 +48,7 @@
 use table::requests::{CopyTableRequest, InsertRequest};
 use tokio::io::BufReader;
 
 use crate::error::{self, IntoVectorsSnafu, Result};
+use crate::metrics;
 use crate::statement::StatementExecutor;
 
 const DEFAULT_BATCH_SIZE: usize = 8192;

@@ -206,11 +207,10 @@ impl StatementExecutor {
             .await
             .context(error::ReadObjectSnafu { path })?;
 
-        let buf_reader = BufReader::new(reader.compat());
-
-        let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
+        let builder = ParquetRecordBatchStreamBuilder::new(reader.compat())
             .await
-            .context(error::ReadParquetSnafu)?;
+            .context(error::ReadParquetSnafu)?
+            .with_batch_size(8192);
 
         let upstream = builder
             .build()
@@ -252,19 +252,40 @@ impl StatementExecutor {
 
         let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
 
-        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
+        let file_format = format.suffix();
+        let table_name = table_ref.to_string();
+
+        let _timer = metrics::COPY_FROM_ELAPSED
+            .with_label_values(&[file_format, &table_name])
+            .start_timer();
+
+        let (object_store, entries) = {
+            let _timer = metrics::COPY_FROM_STAGE_ELAPSED
+                .with_label_values(&["list_entries", file_format, &table_name])
+                .start_timer();
+
+            self.list_copy_from_entries(&req).await?
+        };
 
         let mut files = Vec::with_capacity(entries.len());
         let table_schema = table.schema().arrow_schema().clone();
 
+        let _timer = metrics::COPY_FROM_STAGE_ELAPSED
+            .with_label_values(&["prepare_entries", file_format, &table_name])
+            .start_timer();
         for entry in entries.iter() {
             if entry.metadata().mode() != EntryMode::FILE {
                 continue;
             }
             let path = entry.path();
-            let file_schema = self
-                .infer_schema(&format, object_store.clone(), path)
-                .await?;
+            let file_schema = {
+                let _timer = metrics::COPY_FROM_STAGE_ELAPSED
+                    .with_label_values(&["infer_schema", file_format, &table_name])
+                    .start_timer();
+
+                self.infer_schema(&format, object_store.clone(), path)
+                    .await?
+            };
             let (file_schema_projection, table_schema_projection, compat_schema) =
                 generated_schema_projection_and_compatible_file_schema(&file_schema, &table_schema);
 
@@ -288,18 +309,28 @@
                 path,
             ))
         }
+        _timer.observe_duration();
 
         let mut rows_inserted = 0;
+
+        let _timer = metrics::COPY_FROM_STAGE_ELAPSED
+            .with_label_values(&["read_and_insert", file_format, &table_name])
+            .start_timer();
         for (schema, file_schema_projection, projected_table_schema, path) in files {
-            let mut stream = self
-                .build_read_stream(
+            let mut stream = {
+                let _timer = metrics::COPY_FROM_STAGE_ELAPSED
+                    .with_label_values(&["build_stream", file_format, &table_name])
+                    .start_timer();
+
+                self.build_read_stream(
                     &format,
                     object_store.clone(),
                     path,
                     schema,
                     file_schema_projection,
                 )
-                .await?;
+                .await?
+            };
 
             let fields = projected_table_schema
                 .fields()
@@ -312,7 +343,13 @@
             let mut pending_mem_size = 0;
             let mut pending = vec![];
 
-            while let Some(r) = stream.next().await {
+            while let Some(r) = {
+                let _timer = metrics::COPY_FROM_STAGE_ELAPSED
+                    .with_label_values(&["stream_read", file_format, &table_name])
+                    .start_timer();
+
+                stream.next().await
+            } {
                 let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                 let vectors =
                     Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;
@@ -336,14 +373,22 @@
                 ));
 
                 if pending_mem_size as u64 >= pending_mem_threshold {
+                    let _timer = metrics::COPY_FROM_STAGE_ELAPSED
+                        .with_label_values(&["batch_insert", file_format, &table_name])
+                        .start_timer();
                     rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
                 }
             }
 
             if !pending.is_empty() {
+                let _timer = metrics::COPY_FROM_STAGE_ELAPSED
+                    .with_label_values(&["batch_insert", file_format, &table_name])
+                    .start_timer();
+
                 rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
             }
         }
+        _timer.observe_duration();
 
         Ok(rows_inserted)
     }
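
A usage note, not part of this commit: register_histogram_vec! registers against the default Prometheus registry, so the new histograms surface wherever that registry is scraped. A minimal sketch of rendering them in text exposition format (the render_metrics helper is hypothetical):

use prometheus::{Encoder, TextEncoder};

fn render_metrics() -> String {
    // Gather every metric registered with the default registry, including
    // the COPY FROM histograms added above.
    let metric_families = prometheus::gather();
    let mut buffer = Vec::new();
    TextEncoder::new()
        .encode(&metric_families, &mut buffer)
        .expect("text encoding should not fail");
    // Each histogram is exposed as <name>_bucket, <name>_sum, and
    // <name>_count series carrying the stage/file_format/table labels.
    String::from_utf8(buffer).expect("exposition output is valid UTF-8")
}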
