Skip to content

Commit

Permalink
Merge pull request #87 from bqbooster/combine-audit-logs-information-…
Browse files Browse the repository at this point in the history
…schema

Allow to combine audit logs and information schema jobs
  • Loading branch information
Kayrnt authored Dec 13, 2024
2 parents 1a59629 + 9b8dc01 commit 5266257
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 33 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241212-022522.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Allow combining the audit logs and information schema jobs tables to get the best of both worlds
time: 2024-12-12T02:25:22.048577+01:00
custom:
Author: Kayrnt
Issue: ""
27 changes: 25 additions & 2 deletions .github/workflows/pr_run_models.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,34 @@ jobs:
echo ${{ secrets.DBT_ENV_SECRET_BIGQUERY_TEST_SERVICE_ACCOUNT }} | base64 -d > ./integration_tests/keyfile.json
fi
- name: Run All models on BigQuery
- name: Setup dbt dependencies
run: |
cd integration_tests
dbt deps
- name: Setup dbt fixtures
run: |
cd integration_tests
dbt run -s tag:fixtures
- name: Run all models once
run: |
cd integration_tests
dbt build -s dbt_bigquery_monitoring --full-refresh --empty
- name: Run all models again to test incremental
run: |
cd integration_tests
dbt build -s dbt_bigquery_monitoring --empty
DBT_BQ_MONITORING_GCP_BIGQUERY_AUDIT_LOGS=True dbt run -s jobs_from_audit_logs+ --full-refresh --empty
- name: Run all models again with cloud audit logs
run: |
cd integration_tests
DBT_BQ_MONITORING_SHOULD_COMBINE_AUDIT_LOGS_AND_INFORMATION_SCHEMA=true DBT_BQ_MONITORING_GCP_BIGQUERY_AUDIT_LOGS=true \
dbt run -s jobs_from_audit_logs+ --full-refresh --empty
- name: Run all models again to test incremental with cloud audit logs
run: |
cd integration_tests
DBT_BQ_MONITORING_SHOULD_COMBINE_AUDIT_LOGS_AND_INFORMATION_SCHEMA=true DBT_BQ_MONITORING_GCP_BIGQUERY_AUDIT_LOGS=true \
dbt run -s jobs_from_audit_logs+ --empty
2 changes: 1 addition & 1 deletion .sqlfluff
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[sqlfluff]
exclude_rules = ST07, AM04, CV03, LT05, ST06, RF04, AM06, ST05, LT02, CP02, LT07
exclude_rules = ST07, AM04, CV03, LT05, ST06, RF04, AM06, ST05, LT02, CP02, LT07, LT14, RF01
dialect = bigquery
templater = dbt

Expand Down
1 change: 1 addition & 0 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,4 @@ vars:
gcp_bigquery_audit_logs_storage_project: "{{ env_var('DBT_BQ_MONITORING_GCP_BIGQUERY_AUDIT_LOGS_STORAGE_PROJECT', 'placeholder') if var('enable_gcp_bigquery_audit_logs') | as_bool else None }}"
gcp_bigquery_audit_logs_dataset: "{{ env_var('DBT_BQ_MONITORING_GCP_BIGQUERY_AUDIT_LOGS_DATASET', 'placeholder') if var('enable_gcp_bigquery_audit_logs') | as_bool else None }}"
gcp_bigquery_audit_logs_table: "{{ env_var('DBT_BQ_MONITORING_GCP_BIGQUERY_AUDIT_LOGS_TABLE', 'placeholder') if var('enable_gcp_bigquery_audit_logs') | as_bool else None }}"
should_combine_audit_logs_and_information_schema: "{{ env_var('DBT_BQ_MONITORING_SHOULD_COMBINE_AUDIT_LOGS_AND_INFORMATION_SCHEMA', false) }}"
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,4 @@ See [GCP BigQuery Audit logs](#bigquery-audit-logs-mode) for more information.
| `gcp_bigquery_audit_logs_storage_project` | `DBT_BQ_MONITORING_GCP_BIGQUERY_AUDIT_LOGS_STORAGE_PROJECT` | The GCP project where BigQuery Audit logs data is stored | `'placeholder'` if enabled, `None` otherwise |
| `gcp_bigquery_audit_logs_dataset` | `DBT_BQ_MONITORING_GCP_BIGQUERY_AUDIT_LOGS_DATASET` | The dataset for BigQuery Audit logs data | `'placeholder'` if enabled, `None` otherwise |
| `gcp_bigquery_audit_logs_table` | `DBT_BQ_MONITORING_GCP_BIGQUERY_AUDIT_LOGS_TABLE` | The table for BigQuery Audit logs data | `'placeholder'` if enabled, `None` otherwise |
| `should_combine_audit_logs_and_information_schema` | `DBT_BQ_MONITORING_SHOULD_COMBINE_AUDIT_LOGS_AND_INFORMATION_SCHEMA` | Whether to combine the audit logs and information schema data | `false` |
2 changes: 1 addition & 1 deletion macros/enable_gcp_bigquery_audit_logs.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% macro enable_gcp_bigquery_audit_logs() -%}
{% if var('enable_gcp_bigquery_audit_logs') == 'True' -%}
{% if var('enable_gcp_bigquery_audit_logs') | lower == 'true' -%}
{{ return(true) }}
{%- else -%}
{{ return(false) }}
Expand Down
2 changes: 1 addition & 1 deletion macros/enable_gcp_billing_export.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% macro enable_gcp_billing_export() -%}
{% if var('enable_gcp_billing_export') == 'True' -%}
{% if var('enable_gcp_billing_export') | lower == 'true' -%}
{{ return(true) }}
{%- else -%}
{{ return(false) }}
Expand Down
2 changes: 1 addition & 1 deletion macros/jobs_done_incremental_hourly.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
{% macro jobs_done_incremental_hourly() -%}
(SELECT *
FROM
{{ ref('jobs_by_project_with_cost') }}
{{ ref('jobs_with_cost') }}
WHERE
{% if is_incremental() %}
creation_time >= TIMESTAMP_TRUNC(_dbt_max_partition, HOUR)
Expand Down
7 changes: 7 additions & 0 deletions macros/should_combine_audit_logs_and_information_schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{% macro should_combine_audit_logs_and_information_schema() -%}
{#- Feature flag: true when the `should_combine_audit_logs_and_information_schema`
    project var is set (case-insensitively, as a string) to 'true'; false otherwise.
    The var is fed from DBT_BQ_MONITORING_SHOULD_COMBINE_AUDIT_LOGS_AND_INFORMATION_SCHEMA. -#}
{{ return(var('should_combine_audit_logs_and_information_schema') | lower == 'true') }}
{%- endmacro %}
58 changes: 58 additions & 0 deletions models/base/combined_jobs_inputs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
{#- combined_jobs_inputs: merges job records extracted from the BigQuery audit
    logs (alias `a`, driving side) with rows from INFORMATION_SCHEMA.JOBS
    (alias `j`) so downstream models get the union of the fields each source
    provides. Enabled only when both audit-log ingestion and the combine flag
    are turned on. -#}
{{
config(
materialized = "ephemeral",
enabled = enable_gcp_bigquery_audit_logs() and should_combine_audit_logs_and_information_schema()
)
}}

SELECT
-- Hourly bucket for the job, preferring the audit-log event timestamp.
COALESCE(TIMESTAMP_TRUNC(a.timestamp, HOUR), TIMESTAMP_TRUNC(j.creation_time, HOUR)) AS hour,
-- For fields present in both sources, the audit-log value wins; the
-- information-schema value is the fallback for unmatched/late rows.
COALESCE(a.bi_engine_statistics, j.bi_engine_statistics) AS bi_engine_statistics,
COALESCE(a.cache_hit, j.cache_hit) AS cache_hit,
a.caller_supplied_user_agent AS caller_supplied_user_agent, -- this field is only available in the audit logs
COALESCE(a.creation_time, j.creation_time) AS creation_time,
COALESCE(a.destination_table, j.destination_table) AS destination_table,
COALESCE(a.end_time, j.end_time) AS end_time,
COALESCE(a.error_result, j.error_result) AS error_result,
COALESCE(a.job_id, j.job_id) AS job_id,
j.job_stages AS job_stages, -- this field is only available in the information schema
COALESCE(a.job_type, j.job_type) AS job_type,
COALESCE(a.labels, j.labels) AS labels,
j.parent_job_id AS parent_job_id, -- this field is only available in information schema
COALESCE(a.priority, j.priority) AS priority,
COALESCE(a.project_id, j.project_id) AS project_id,
COALESCE(a.project_number, j.project_number) AS project_number,
COALESCE(a.query, j.query) AS query,
COALESCE(a.referenced_tables, j.referenced_tables) AS referenced_tables,
COALESCE(a.reservation_id, j.reservation_id) AS reservation_id,
COALESCE(a.start_time, j.start_time) AS start_time,
COALESCE(a.state, j.state) AS state,
COALESCE(a.statement_type, j.statement_type) AS statement_type,
j.timeline AS timeline, -- this field is only available in information schema
j.total_bytes_billed AS total_bytes_billed, -- this field is only available in information schema
j.total_bytes_processed AS total_bytes_processed, -- this field is only available in information schema
COALESCE(a.total_modified_partitions, j.total_modified_partitions) AS total_modified_partitions,
COALESCE(a.total_slot_ms, j.total_slot_ms) AS total_slot_ms,
j.transaction_id AS transaction_id, -- this field is only available in information schema
COALESCE(a.user_email, j.user_email) AS user_email,
j.query_info AS query_info, -- this field is only available in information schema
j.transferred_bytes AS transferred_bytes, -- this field is only available in information schema
j.materialized_view_statistics AS materialized_view_statistics -- this field is only available in information schema
FROM {{ ref('jobs_from_audit_logs') }} AS a
-- Join window on j.creation_time: restricts the information-schema scan
-- (partition pruning) with a 6-hour cushion so jobs created shortly before
-- the processed window can still match their audit-log record.
-- NOTE(review): this model is materialized as "ephemeral"; is_incremental()
-- is evaluated against this model's own materialization, so the incremental
-- branches below may never be taken even when inlined into an incremental
-- parent — confirm intended behavior.
LEFT JOIN {{ ref('information_schema_jobs') }} AS j ON
{% if is_incremental() %}
j.creation_time BETWEEN TIMESTAMP_SUB(TIMESTAMP_TRUNC(_dbt_max_partition, HOUR), INTERVAL 6 HOUR) AND TIMESTAMP_TRUNC(_dbt_max_partition, HOUR)
{% else %}
j.creation_time >= TIMESTAMP_SUB(TIMESTAMP_TRUNC(TIMESTAMP_SUB(
TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), HOUR),
INTERVAL {{ var('lookback_window_days') }} DAY), HOUR), INTERVAL 6 HOUR)
{% endif %}
-- Match key: a job is identified by (project_id, job_id).
AND (a.project_id = j.project_id AND a.job_id = j.job_id)
WHERE
-- Filter the driving audit-log side to the processed window (incremental:
-- since the last loaded partition; full refresh: the lookback window).
{% if is_incremental() %}
a.timestamp >= TIMESTAMP_TRUNC(_dbt_max_partition, HOUR)
{% else %}
a.timestamp >= TIMESTAMP_SUB(
TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), HOUR),
INTERVAL {{ var('lookback_window_days') }} DAY)
{% endif %}
39 changes: 18 additions & 21 deletions models/base/jobs_from_audit_logs.sql
Original file line number Diff line number Diff line change
@@ -1,29 +1,39 @@
{{
config(
materialized = "table",
materialized = "view",
enabled = enable_gcp_bigquery_audit_logs()
)
}}
SELECT
-- bi_engine_statistics is not available in the audit logs, so we default to NULL
CAST(NULL AS STRUCT<bi_engine_mode STRING, acceleration_mode STRING, bi_engine_reasons STRUCT<code STRING, message STRING>>) AS bi_engine_statistics,
CAST(NULL AS STRUCT<bi_engine_mode STRING, acceleration_mode STRING, bi_engine_reasons ARRAY<STRUCT<code STRING, message STRING>>>) AS bi_engine_statistics,
CAST(JSON_VALUE(protopayload_auditlog.metadataJson,
'$.jobChange.job.jobStats.queryStats.cacheHit') AS BOOL) AS cache_hit,
TIMESTAMP(JSON_VALUE(protopayload_auditlog.metadataJson,
'$.jobChange.job.jobStats.createTime')) AS creation_time,
COALESCE(JSON_VALUE(protopayload_auditlog.metadataJson, '$.jobChange.job.jobConfig.queryConfig.destinationTable'),
JSON_VALUE(protopayload_auditlog.metadataJson, '$.jobChange.job.jobConfig.loadConfig.destinationTable')
) AS destination_table,
-- destination table is like "projects/<project>/datasets/<dataset>/tables/<table>" in either $.jobChange.job.jobConfig.queryConfig.destinationTable or $.jobChange.job.jobConfig.loadConfig.destinationTable from protopayload_auditlog.metadataJson
-- the expected return type is STRUCT<project_id STRING, dataset_id STRING, table_id STRING>
(SELECT AS STRUCT
SPLIT(table_ref, '/')[OFFSET(1)] as project_id,
SPLIT(table_ref, '/')[OFFSET(3)] as dataset_id,
SPLIT(table_ref, '/')[OFFSET(5)] as table_id
FROM (
SELECT
COALESCE(
JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, '$.jobChange.job.jobConfig.queryConfig.destinationTable'),
JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, '$.jobChange.job.jobConfig.loadConfig.destinationTable')
) table_ref)) AS destination_table,
TIMESTAMP(JSON_VALUE(protopayload_auditlog.metadataJson,
'$.jobChange.job.jobStats.endTime')) AS end_time,
STRUCT(
JSON_VALUE(protopayload_auditlog.metadataJson, '$.jobChange.job.jobStatus.errorResult.code') AS code,
JSON_VALUE(protopayload_auditlog.metadataJson, '$.jobChange.job.jobStatus.errorResult.code') AS reason,
CAST(NULL AS STRING) AS location,
CAST(NULL AS STRING) AS debug_info,
JSON_VALUE(protopayload_auditlog.metadataJson, '$.jobChange.job.jobStatus.errorResult.message') AS message
) as error_result,
SPLIT(
JSON_VALUE(protopayload_auditlog.metadataJson, '$.jobChange.job.jobName'), '/'
)[SAFE_OFFSET(3)] AS job_id,
JSON_QUERY(protopayload_auditlog.metadataJson, '$.jobChange.job.jobStats.stages') AS job_stages,
JSON_VALUE(protopayload_auditlog.metadataJson, '$.jobChange.job.jobConfig.type') AS job_type,
ARRAY(
SELECT
Expand All @@ -43,8 +53,7 @@ SELECT
)
))
) AS labels,
-- parent_job_id is not available in the audit logs, so we default to NULL
NULL AS parent_job_id,

JSON_VALUE(protopayload_auditlog.metadataJson,
'$.jobChange.job.jobConfig.queryConfig.priority') AS priority,
resource.labels.project_id,
Expand Down Expand Up @@ -74,22 +83,10 @@ SELECT
'$.jobChange.job.jobStats.startTime')) AS start_time,
JSON_VALUE(protopayload_auditlog.metadataJson, '$.jobChange.job.jobStatus.jobState') AS state,
JSON_VALUE(protopayload_auditlog.metadataJson, '$.jobChange.job.jobConfig.queryConfig.statementType') AS statement_type,
JSON_QUERY(protopayload_auditlog.metadataJson, '$.jobChange.job.jobStats.timeline') AS timeline,
-- total_bytes_billed is not available in the audit logs, so we default to NULL
NULL AS total_bytes_billed,
JSON_VALUE(protopayload_auditlog.metadataJson, '$.jobChange.job.jobStats.queryStats.totalBytesProcessed') AS total_bytes_processed,
-- total_modified_partitions is not available in the audit logs, so we default to NULL
NULL AS total_modified_partitions,
CAST(JSON_VALUE(protopayload_auditlog.metadataJson, '$.jobChange.job.jobStats.totalSlotMs') AS INT64) AS total_slot_ms,
-- transaction_id is not available in the audit logs, so we default to NULL
NULL AS transaction_id,
protopayload_auditlog.authenticationInfo.principalEmail AS user_email,
-- query_info is not available in the audit logs, so we default to NULL
NULL AS query_info,
-- transferred_bytes is not available in the audit logs, so we default to NULL
NULL AS transferred_bytes,
-- materialized_view_statistics is not available in the audit logs, so we default to NULL
NULL AS materialized_view_statistics,

protopayload_auditlog.requestMetadata.callerIp AS caller_ip_address,
protopayload_auditlog.requestMetadata.callerSuppliedUserAgent AS caller_supplied_user_agent,
Expand Down
17 changes: 15 additions & 2 deletions models/base/jobs_with_cost.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,18 @@
WITH
source AS (
SELECT *,
{%- if enable_gcp_bigquery_audit_logs() %}
{%- if enable_gcp_bigquery_audit_logs() and should_combine_audit_logs_and_information_schema() %}
FROM {{ ref('combined_jobs_inputs') }}
{%- elif enable_gcp_bigquery_audit_logs() %}
NULL AS job_stages,
NULL AS parent_job_id,
NULL AS timeline,
NULL AS total_bytes_billed,
NULL AS total_bytes_processed,
NULL AS transaction_id,
NULL AS query_info,
NULL AS transferred_bytes,
NULL AS materialized_view_statistics,
FROM {{ ref('jobs_from_audit_logs') }}
{%- else %}
NULL AS caller_supplied_user_agent,
Expand All @@ -21,7 +32,9 @@ SELECT
cache_hit,
caller_supplied_user_agent,
creation_time,
{%- if enable_gcp_bigquery_audit_logs() %}
{%- if enable_gcp_bigquery_audit_logs() and should_combine_audit_logs_and_information_schema() %}
hour
{%- elif enable_gcp_bigquery_audit_logs() %}
TIMESTAMP_TRUNC(timestamp, HOUR)
{%- else %}
TIMESTAMP_TRUNC(creation_time, HOUR)
Expand Down
12 changes: 8 additions & 4 deletions models/global/datamart/daily_spend.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
WITH compute_cost AS (
{#- use the billing export when available, otherwise fall back to the estimated compute cost #}
{%- if enable_gcp_billing_export() %}
SELECT day,
SELECT TIMESTAMP_TRUNC(hour, DAY) AS day,
'compute' AS cost_category,
SUM(compute_cost) AS cost
FROM {{ ref('compute_billing_per_hour') }}
Expand All @@ -37,7 +37,7 @@ WITH compute_cost AS (
,
storage_cost AS (
SELECT
day,
TIMESTAMP_TRUNC(HOUR, DAY) AS day,
'storage' AS cost_category,
SUM(storage_cost) AS cost
FROM {{ ref('storage_billing_per_hour') }}
Expand All @@ -52,10 +52,14 @@ SELECT
cost_category,
SUM(cost) AS cost
FROM (
SELECT * FROM compute_cost
SELECT
day,
cost_category,
cost
FROM compute_cost
{%- if enable_gcp_billing_export() %}
UNION ALL
SELECT * FROM storage_cost
SELECT day, cost_category, cost FROM storage_cost AS s
{%- endif %}
)
GROUP BY ALL

0 comments on commit 5266257

Please sign in to comment.