Skip to content

Commit

Permalink
chore: use instrument instead of trace
Browse files Browse the repository at this point in the history
  • Loading branch information
Taylor-lagrange committed Nov 17, 2023
1 parent 56d2ec1 commit 96a019a
Show file tree
Hide file tree
Showing 17 changed files with 64 additions and 116 deletions.
10 changes: 4 additions & 6 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ impl DdlManager {
})
}

#[tracing::instrument(skip_all)]
pub async fn submit_alter_table_task(
&self,
cluster_id: u64,
Expand All @@ -157,6 +158,7 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}

#[tracing::instrument(skip_all)]
pub async fn submit_create_table_task(
&self,
cluster_id: u64,
Expand All @@ -173,6 +175,7 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}

#[tracing::instrument(skip_all)]
pub async fn submit_drop_table_task(
&self,
cluster_id: u64,
Expand All @@ -195,6 +198,7 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}

#[tracing::instrument(skip_all)]
pub async fn submit_truncate_table_task(
&self,
cluster_id: u64,
Expand Down Expand Up @@ -262,9 +266,6 @@ async fn handle_truncate_table_task(
table_info_value,
table_route,
)
.trace(tracing::info_span!(
"DdlManager::submit_truncate_table_task"
))
.await?;

info!("Table: {table_id} is truncated via procedure_id {id:?}");
Expand Down Expand Up @@ -307,7 +308,6 @@ async fn handle_alter_table_task(

let id = ddl_manager
.submit_alter_table_task(cluster_id, alter_table_task, table_info_value)
.trace(tracing::info_span!("DdlManager::submit_alter_table_task"))
.await?;

info!("Table: {table_id} is altered via procedure_id {id:?}");
Expand Down Expand Up @@ -345,7 +345,6 @@ async fn handle_drop_table_task(
table_info_value,
table_route_value,
)
.trace(tracing::info_span!("DdlManager::submit_drop_table_task"))
.await?;

info!("Table: {table_id} is dropped via procedure_id {id:?}");
Expand All @@ -372,7 +371,6 @@ async fn handle_create_table_task(

let id = ddl_manager
.submit_create_table_task(cluster_id, create_table_task, region_routes)
.trace(tracing::info_span!("DdlManager::submit_create_table_task"))
.await?;

info!("Table: {table_id:?} is created via procedure_id {id:?}");
Expand Down
3 changes: 2 additions & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::{DfPhysicalPlan, Output};
use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use common_telemetry::tracing::info_span;
use common_telemetry::tracing::{self, info_span};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, warn};
use dashmap::DashMap;
Expand Down Expand Up @@ -101,6 +101,7 @@ impl RegionServer {
self.inner.handle_request(region_id, request).await
}

#[tracing::instrument(skip_all)]
pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
self.inner.handle_read(request).await
}
Expand Down
4 changes: 1 addition & 3 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_telemetry::error;
use common_telemetry::logging::info;
use common_telemetry::tracing_context::FutureExt;
use common_telemetry::{error, tracing};
use datanode::region_server::RegionServer;
use log_store::raft_engine::RaftEngineBackend;
use meta_client::client::{MetaClient, MetaClientBuilder};
Expand Down Expand Up @@ -414,7 +413,6 @@ impl Instance {
let stmt = QueryStatement::Sql(stmt);
self.statement_executor
.execute_stmt(stmt, query_ctx)
.trace(tracing::info_span!("Instance::query_statement"))
.await
.context(TableOperationSnafu)
}
Expand Down
69 changes: 13 additions & 56 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::tracing;
use common_telemetry::tracing_context::FutureExt;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
Expand Down Expand Up @@ -90,6 +89,7 @@ impl StatementExecutor {
}
}

#[tracing::instrument(skip_all)]
pub async fn execute_stmt(
&self,
stmt: QueryStatement,
Expand All @@ -104,103 +104,62 @@ impl StatementExecutor {
pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
match stmt {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
self.plan_exec(QueryStatement::Sql(stmt), query_ctx)
.trace(tracing::info_span!("StatementExecutor::plan_exec"))
.await
self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
}

Statement::Insert(insert) => {
self.insert(insert, query_ctx)
.trace(tracing::info_span!("StatementExecutor::insert"))
.await
}
Statement::Insert(insert) => self.insert(insert, query_ctx).await,

Statement::Tql(tql) => {
self.execute_tql(tql, query_ctx)
.trace(tracing::info_span!("StatementExecutor::execute_tql"))
.await
}
Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,

Statement::DescribeTable(stmt) => {
self.describe_table(stmt, query_ctx)
.trace(tracing::info_span!("StatementExecutor::describe_table"))
.await
}
Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,

Statement::ShowDatabases(stmt) => {
self.show_databases(stmt, query_ctx)
.trace(tracing::info_span!("StatementExecutor::show_databases"))
.await
}
Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,

Statement::ShowTables(stmt) => {
self.show_tables(stmt, query_ctx)
.trace(tracing::info_span!("StatementExecutor::show_tables"))
.await
}
Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,

Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
let req = to_copy_table_request(stmt, query_ctx.clone())?;
match req.direction {
CopyDirection::Export => self
.copy_table_to(req, query_ctx)
.trace(tracing::info_span!("StatementExecutor::copy_table_export"))
.await
.map(Output::AffectedRows),
CopyDirection::Import => self
.copy_table_from(req, query_ctx)
.trace(tracing::info_span!("StatementExecutor::copy_table_import"))
.await
.map(Output::AffectedRows),
}
}

Statement::Copy(sql::statements::copy::Copy::CopyDatabase(arg)) => {
self.copy_database(to_copy_database_request(arg, &query_ctx)?)
.trace(tracing::info_span!("StatementExecutor::copy_database"))
.await
}

Statement::CreateTable(stmt) => {
let _ = self
.create_table(stmt, query_ctx)
.trace(tracing::info_span!("StatementExecutor::create_table"))
.await?;
let _ = self.create_table(stmt, query_ctx).await?;
Ok(Output::AffectedRows(0))
}
Statement::CreateExternalTable(stmt) => {
let _ = self
.create_external_table(stmt, query_ctx)
.trace(tracing::info_span!(
"StatementExecutor::create_external_table"
))
.await?;
let _ = self.create_external_table(stmt, query_ctx).await?;
Ok(Output::AffectedRows(0))
}
Statement::Alter(alter_table) => {
self.alter_table(alter_table, query_ctx)
.trace(tracing::info_span!("StatementExecutor::alter_table"))
.await
}
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
Statement::DropTable(stmt) => {
let (catalog, schema, table) =
table_idents_to_full_name(stmt.table_name(), query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let table_name = TableName::new(catalog, schema, table);
self.drop_table(table_name)
.trace(tracing::info_span!("StatementExecutor::drop_table"))
.await
self.drop_table(table_name).await
}
Statement::TruncateTable(stmt) => {
let (catalog, schema, table) =
table_idents_to_full_name(stmt.table_name(), query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let table_name = TableName::new(catalog, schema, table);
self.truncate_table(table_name)
.trace(tracing::info_span!("StatementExecutor::truncate_table"))
.await
self.truncate_table(table_name).await
}

Statement::CreateDatabase(stmt) => {
Expand All @@ -209,7 +168,6 @@ impl StatementExecutor {
&format_raw_object_name(&stmt.name),
stmt.if_not_exists,
)
.trace(tracing::info_span!("StatementExecutor::create_database"))
.await
}

Expand All @@ -228,7 +186,6 @@ impl StatementExecutor {
let table_name = TableName::new(catalog, schema, table);

self.show_create_table(table_name, table_ref, query_ctx)
.trace(tracing::info_span!("StatementExecutor::show_create_table"))
.await
}
}
Expand All @@ -242,11 +199,11 @@ impl StatementExecutor {
self.query_engine
.planner()
.plan(stmt, query_ctx)
.trace(tracing::info_span!("StatementExecutor::plan"))
.await
.context(PlanStatementSnafu)
}

#[tracing::instrument(skip_all)]
async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
let plan = self.plan(stmt, query_ctx.clone()).await?;
self.query_engine
Expand Down
3 changes: 2 additions & 1 deletion src/operator/src/statement/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use common_datasource::file_format::Format;
use common_query::Output;
use common_telemetry::info;
use common_telemetry::{info, tracing};
use session::context::QueryContextBuilder;
use snafu::{ensure, ResultExt};
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
Expand All @@ -27,6 +27,7 @@ pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time";
pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time";

impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub(crate) async fn copy_database(&self, req: CopyDatabaseRequest) -> error::Result<Output> {
// location must end with / so that every table is exported to a file.
ensure!(
Expand Down
3 changes: 2 additions & 1 deletion src/operator/src/statement/copy_table_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_recordbatch::adapter::ParquetRecordBatchStreamAdapter;
use common_recordbatch::DfSendableRecordBatchStream;
use common_telemetry::debug;
use common_telemetry::{debug, tracing};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream};
Expand Down Expand Up @@ -237,6 +237,7 @@ impl StatementExecutor {
}
}

#[tracing::instrument(skip_all)]
pub async fn copy_table_from(
&self,
req: CopyTableRequest,
Expand Down
3 changes: 2 additions & 1 deletion src/operator/src/statement/copy_table_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use common_telemetry::{debug, tracing};
use datafusion::datasource::DefaultTableSource;
use datafusion_common::TableReference as DfTableReference;
use datafusion_expr::LogicalPlanBuilder;
Expand Down Expand Up @@ -84,6 +84,7 @@ impl StatementExecutor {
}
}

#[tracing::instrument(skip_all)]
pub(crate) async fn copy_table_to(
&self,
req: CopyTableRequest,
Expand Down
8 changes: 7 additions & 1 deletion src/operator/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::info;
use common_telemetry::{info, tracing};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use partition::partition::{PartitionBound, PartitionDef};
Expand Down Expand Up @@ -58,11 +58,13 @@ impl StatementExecutor {
self.catalog_manager.clone()
}

#[tracing::instrument(skip_all)]
pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
let create_expr = &mut expr_factory::create_to_expr(&stmt, ctx)?;
self.create_table_inner(create_expr, stmt.partitions).await
}

#[tracing::instrument(skip_all)]
pub async fn create_external_table(
&self,
create_expr: CreateExternalTable,
Expand Down Expand Up @@ -151,6 +153,7 @@ impl StatementExecutor {
Ok(table)
}

#[tracing::instrument(skip_all)]
pub async fn drop_table(&self, table_name: TableName) -> Result<Output> {
let table = self
.catalog_manager
Expand Down Expand Up @@ -181,6 +184,7 @@ impl StatementExecutor {
Ok(Output::AffectedRows(0))
}

#[tracing::instrument(skip_all)]
pub async fn truncate_table(&self, table_name: TableName) -> Result<Output> {
let table = self
.catalog_manager
Expand Down Expand Up @@ -221,6 +225,7 @@ impl StatementExecutor {
Ok(())
}

#[tracing::instrument(skip_all)]
pub async fn alter_table(
&self,
alter_table: AlterTable,
Expand Down Expand Up @@ -347,6 +352,7 @@ impl StatementExecutor {
.context(error::ExecuteDdlSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn create_database(
&self,
catalog: &str,
Expand Down
2 changes: 2 additions & 0 deletions src/operator/src/statement/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use common_error::ext::BoxedError;
use common_query::Output;
use common_telemetry::tracing;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use sql::statements::describe::DescribeTable;
Expand All @@ -26,6 +27,7 @@ use crate::statement::StatementExecutor;
use crate::table::table_idents_to_full_name;

impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub(super) async fn describe_table(
&self,
stmt: DescribeTable,
Expand Down
2 changes: 2 additions & 0 deletions src/operator/src/statement/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use common_query::Output;
use common_telemetry::tracing;
use query::parser::QueryStatement;
use session::context::QueryContextRef;
use sql::statements::insert::Insert;
Expand All @@ -22,6 +23,7 @@ use super::StatementExecutor;
use crate::error::Result;

impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub async fn insert(&self, insert: Box<Insert>, query_ctx: QueryContextRef) -> Result<Output> {
if insert.can_extract_values() {
// Fast path: plain insert ("insert with literal values") is executed directly
Expand Down
Loading

0 comments on commit 96a019a

Please sign in to comment.