Skip to content

Commit

Permalink
Change some debug to info.
Browse files Browse the repository at this point in the history
  • Loading branch information
plypaul committed Sep 24, 2024
1 parent 76692f6 commit 70dcf64
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
from dbt_common.exceptions.base import DbtDatabaseError
from dbt_semantic_interfaces.enum_extension import assert_values_exhausted
from metricflow_semantics.errors.error_classes import SqlBindParametersNotSupportedError
from metricflow_semantics.mf_logging.formatting import indent
from metricflow_semantics.mf_logging.lazy_formattable import LazyFormat
from metricflow_semantics.mf_logging.pretty_print import mf_pformat
from metricflow_semantics.random_id import random_id
from metricflow_semantics.sql.sql_bind_parameters import SqlBindParameters

Expand Down Expand Up @@ -142,16 +140,14 @@ def query(
request_id = SqlRequestId(f"mf_rid__{random_id()}")
if sql_bind_parameters.param_dict:
raise SqlBindParametersNotSupportedError(
f"Invalid execute statement - we do not support queries with bind parameters through dbt adapters! "
f"Invalid query statement - we do not support queries with bind parameters through dbt adapters! "
f"Bind params: {sql_bind_parameters.param_dict}"
)
logger.debug(
LazyFormat(lambda: AdapterBackedSqlClient._format_run_query_log_message(stmt, sql_bind_parameters))
)
logger.info(LazyFormat("Running query() statement", statement=stmt, param_dict=sql_bind_parameters.param_dict))
with self._adapter.connection_named(f"MetricFlow_request_{request_id}"):
# returns a Tuple[AdapterResponse, agate.Table] but the decorator converts it to Any
result = self._adapter.execute(sql=stmt, auto_begin=True, fetch=True)
logger.debug(LazyFormat(lambda: f"Query returned from dbt Adapter with response {result[0]}"))
logger.info(LazyFormat(lambda: f"query() returned from dbt Adapter with response {result[0]}"))

agate_data = result[1]
rows = [row.values() for row in agate_data.rows]
Expand All @@ -160,9 +156,10 @@ def query(
rows=rows,
)
stop = time.time()
logger.debug(

logger.info(
LazyFormat(
lambda: f"Finished running the query in {stop - start:.2f}s with {data_table.row_count} row(s) returned"
"Finished running query()", runtime=f"{stop - start:.2f}s", returned_row_count=data_table.row_count
)
)
return data_table
Expand All @@ -186,16 +183,17 @@ def execute(
)
start = time.time()
request_id = SqlRequestId(f"mf_rid__{random_id()}")
logger.debug(
LazyFormat(lambda: AdapterBackedSqlClient._format_run_query_log_message(stmt, sql_bind_parameters))
logger.info(
LazyFormat("Running execute() statement", statement=stmt, param_dict=sql_bind_parameters.param_dict)
)
with self._adapter.connection_named(f"MetricFlow_request_{request_id}"):
result = self._adapter.execute(stmt, auto_begin=True, fetch=False)
# Calls to execute often involve some amount of DDL so we commit here
self._adapter.commit_if_has_connection()
logger.debug(LazyFormat(lambda: f"Query executed via dbt Adapter with response {result[0]}"))
logger.debug(LazyFormat(lambda: f"execute() returned from dbt Adapter with response {result[0]}"))
stop = time.time()
logger.debug(LazyFormat(lambda: f"Finished running the query in {stop - start:.2f}s"))
logger.info(LazyFormat("Finished execute()", runtime=f"{stop - start:.2f}s"))

return None

def dry_run(
Expand All @@ -213,17 +211,7 @@ def dry_run(
concrete values for SQL query parameters.
"""
start = time.time()
logger.debug(
LazyFormat(
lambda: f"Running dry_run of:"
f"\n\n{indent(stmt)}\n"
+ (
f"\nwith parameters: {dict(sql_bind_parameters.param_dict)}"
if sql_bind_parameters.param_dict
else ""
)
)
)
logger.info(LazyFormat("Running dry run", statement=stmt, param_dict=sql_bind_parameters.param_dict))
request_id = SqlRequestId(f"mf_rid__{random_id()}")
connection_name = f"MetricFlow_dry_run_request_{request_id}"
# TODO - consolidate to self._adapter.validate_sql() when all implementations will work from within MetricFlow
Expand Down Expand Up @@ -259,7 +247,7 @@ def dry_run(
raise DbtDatabaseError(f"Encountered error in Databricks dry run. Full output: {plan_output_str}")

stop = time.time()
logger.debug(LazyFormat(lambda: f"Finished running the dry_run in {stop - start:.2f}s"))
logger.info(LazyFormat("Finished running the dry run", runtime=f"{stop - start:.2f}s"))
return

def close(self) -> None: # noqa: D102
Expand All @@ -270,11 +258,3 @@ def render_bind_parameter_key(self, bind_parameter_key: str) -> str:
raise SqlBindParametersNotSupportedError(
"We do not support queries with bind parameters through dbt adapters, so we do not have rendering enabled!"
)

@staticmethod
def _format_run_query_log_message(statement: str, sql_bind_parameters: SqlBindParameters) -> str:
"""Helper for creating nicely formatted query logging."""
message = f"Running query:\n\n{indent(statement)}"
if len(sql_bind_parameters.param_dict) > 0:
message += f"\n\nwith parameters:\n\n{indent(mf_pformat(sql_bind_parameters.param_dict))}"
return message
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ def _inner(*args: ParametersType.args, **kwargs: ParametersType.kwargs) -> Retur
# __qualname__ includes the path like MyClass.my_function
function_name = f"{wrapped_function.__qualname__}()"
start_time = time.time()
logger.debug(LazyFormat(lambda: f"Starting {function_name}"))
logger.info(LazyFormat(lambda: f"Starting {function_name}"))

try:
result = wrapped_function(*args, **kwargs)
finally:
runtime = time.time() - start_time
logger.debug(LazyFormat(lambda: f"Finished {function_name} in {runtime:.1f}s"))
logger.info(LazyFormat(lambda: f"Finished {function_name} in {runtime:.1f}s"))
if runtime > runtime_warning_threshold:
logger.warning(LazyFormat(lambda: f"{function_name} is slow with a runtime of {runtime:.1f}s"))

Expand All @@ -53,11 +53,11 @@ def log_block_runtime(code_block_name: str, runtime_warning_threshold: float = 5
"""Logs the runtime of the enclosed code block."""
start_time = time.time()
description = f"code_block_name={repr(code_block_name)}"
logger.debug(LazyFormat(lambda: f"Starting {description}"))
logger.info(LazyFormat(lambda: f"Starting {description}"))

yield

runtime = time.time() - start_time
logger.debug(LazyFormat(lambda: f"Finished {description} in {runtime:.1f}s"))
logger.info(LazyFormat(lambda: f"Finished {description} in {runtime:.1f}s"))
if runtime > runtime_warning_threshold:
logger.warning(LazyFormat(lambda: f"{description} is slow with a runtime of {runtime:.1f}s"))
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ def _parse_and_validate_query(

query_resolution = query_resolver.resolve_query(resolver_input_for_query)

logger.debug(LazyFormat(lambda: "Query resolution is:\n" + indent(mf_pformat(query_resolution))))
logger.debug(LazyFormat("Resolved query", query_resolution=query_resolution))

self._raise_exception_if_there_are_errors(
input_to_issue_set=query_resolution.input_to_issue_set.merge(
Expand Down
27 changes: 19 additions & 8 deletions metricflow/engine/metricflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
from metricflow_semantics.dag.sequential_id import SequentialIdGenerator
from metricflow_semantics.errors.error_classes import ExecutionException
from metricflow_semantics.filters.time_constraint import TimeRangeConstraint
from metricflow_semantics.mf_logging.formatting import indent
from metricflow_semantics.mf_logging.lazy_formattable import LazyFormat
from metricflow_semantics.mf_logging.pretty_print import mf_pformat
from metricflow_semantics.mf_logging.runtime import log_block_runtime
from metricflow_semantics.model.linkable_element_property import LinkableElementProperty
from metricflow_semantics.model.semantic_manifest_lookup import SemanticManifestLookup
from metricflow_semantics.model.semantics.linkable_element import LinkableDimension
Expand Down Expand Up @@ -419,7 +418,7 @@ def __init__(

@log_call(module_name=__name__, telemetry_reporter=_telemetry_reporter)
def query(self, mf_request: MetricFlowQueryRequest) -> MetricFlowQueryResult: # noqa: D102
logger.debug(LazyFormat(lambda: f"Starting query request:\n{indent(mf_pformat(mf_request))}"))
logger.info(LazyFormat("Starting query request", mf_request=mf_request))
explain_result = self._create_execution_plan(mf_request)
execution_plan = explain_result.convert_to_execution_plan_result.execution_plan

Expand All @@ -439,7 +438,7 @@ def query(self, mf_request: MetricFlowQueryRequest) -> MetricFlowQueryResult: #

assert task_execution_result.sql, "Task execution should have returned SQL that was run"

logger.debug(LazyFormat(lambda: f"Finished query request: {mf_request.request_id}"))
logger.info(LazyFormat("Finished query request", request_id=mf_request.request_id))
return MetricFlowQueryResult(
query_spec=explain_result.query_spec,
dataflow_plan=explain_result.dataflow_plan,
Expand Down Expand Up @@ -494,7 +493,7 @@ def _create_execution_plan(self, mf_query_request: MetricFlowQueryRequest) -> Me
order_by=mf_query_request.order_by,
min_max_only=mf_query_request.min_max_only,
).query_spec
logger.debug(LazyFormat(lambda: f"Query spec is:\n{mf_pformat(query_spec)}"))
logger.info(LazyFormat("Parsed query", query_spec=query_spec))

output_selection_specs: Optional[InstanceSpecSet] = None
if mf_query_request.query_type == MetricFlowQueryType.DIMENSION_VALUES:
Expand All @@ -505,14 +504,25 @@ def _create_execution_plan(self, mf_query_request: MetricFlowQueryRequest) -> Me
dimension_specs=query_spec.dimension_specs,
time_dimension_specs=query_spec.time_dimension_specs,
)

if query_spec.metric_specs:
logger.info(
LazyFormat(
"Building dataflow plan", dataflow_plan_optimizations=mf_query_request.dataflow_plan_optimizations
)
)
dataflow_plan = self._dataflow_plan_builder.build_plan(
query_spec=query_spec,
output_selection_specs=output_selection_specs,
optimizations=mf_query_request.dataflow_plan_optimizations,
)
else:
logger.info(
LazyFormat(
"Building dataflow plan for distinct values",
dataflow_plan_optimizations=mf_query_request.dataflow_plan_optimizations,
)
)

dataflow_plan = self._dataflow_plan_builder.build_plan_for_distinct_values(
query_spec=query_spec, optimizations=mf_query_request.dataflow_plan_optimizations
)
Expand All @@ -523,8 +533,8 @@ def _create_execution_plan(self, mf_query_request: MetricFlowQueryRequest) -> Me
f"Got tasks: {dataflow_plan.sink_nodes}"
)

logger.info(LazyFormat("Building execution plan"))
convert_to_execution_plan_result = self._to_execution_plan_converter.convert_to_execution_plan(dataflow_plan)

return MetricFlowExplainResult(
query_spec=query_spec,
dataflow_plan=dataflow_plan,
Expand All @@ -533,7 +543,8 @@ def _create_execution_plan(self, mf_query_request: MetricFlowQueryRequest) -> Me

@log_call(module_name=__name__, telemetry_reporter=_telemetry_reporter)
def explain(self, mf_request: MetricFlowQueryRequest) -> MetricFlowExplainResult: # noqa: D102
return self._create_execution_plan(mf_request)
with log_block_runtime("explain"):
return self._create_execution_plan(mf_request)

def get_measures_for_metrics(self, metric_names: List[str]) -> List[Measure]: # noqa: D102
metrics = self._semantic_manifest_lookup.metric_lookup.get_metrics(
Expand Down

0 comments on commit 70dcf64

Please sign in to comment.