Skip to content

Commit

Permalink
MC-1608 Add job for New Tab engagement by corpus_item_id
Browse files Browse the repository at this point in the history
This job is similar to bqetl_merino_newtab_extract_to_gcs, but it aggregates
on corpus_item_id instead of scheduled_corpus_item_id. The latter (as the name implies)
identifies a scheduled item. We want to start aggregating on the item itself
because we will stop scheduling them for specific dates as part of an experiment
in Q1.
  • Loading branch information
mmiermans committed Jan 2, 2025
1 parent 67fbf88 commit 79804ce
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 2 deletions.
23 changes: 22 additions & 1 deletion dags.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1804,7 +1804,28 @@ bqetl_merino_newtab_extract_to_gcs:
retry_delay: 5m
start_date: '2024-08-14'
description: |
Aggregates Newtab engagement data that lands in a GCS bucket for Merino recommendations.
Aggregates Newtab engagement data by scheduled_corpus_item_id that lands in a GCS bucket for Merino recommendations.
repo: bigquery-etl
schedule_interval: "*/20 * * * *"
tags:
- repo/bigquery-etl
- impact/tier_1

# DAG that builds and exports New Tab engagement aggregated by corpus_item_id
# to GCS for Merino (MC-1608). Mirrors bqetl_merino_newtab_extract_to_gcs,
# which aggregates by scheduled_corpus_item_id instead.
bqetl_merino_newtab_corpus_items_to_gcs:
  default_args:
    depends_on_past: false
    email:
    - [email protected]
    - [email protected]
    email_on_failure: true
    email_on_retry: false
    end_date: null
    owner: [email protected]
    retries: 2
    retry_delay: 5m
    # NOTE(review): start_date matches the original extract DAG it was modeled
    # on; confirm this is intentional rather than copied (DAG added Jan 2025).
    start_date: '2024-08-14'
  description: |
    Aggregates Newtab engagement data by corpus_item_id that lands in a GCS bucket for Merino recommendations.
  repo: bigquery-etl
  # Runs every 20 minutes; each run rebuilds a rolling one-day aggregate.
  schedule_interval: "*/20 * * * *"
tags:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
friendly_name: Newtab Merino BigQuery to Google Cloud Storage (GCS) aggregated on corpus_item_id
description: |-
  Newtab engagements aggregated on corpus_item_id are exported to a GCS bucket for Merino to consume.
  The table rebuilds every 20 minutes and aggregates one day of data.
owners:
- [email protected]
- [email protected]
labels:
  incremental: false
  owner1: mmiermans
scheduling:
  dag_name: bqetl_merino_newtab_corpus_items_to_gcs
  # Arguments passed to the shared newtab_merino GCS export script.
  arguments:
  - --source-project=moz-fx-data-shared-prod
  - --source-dataset=telemetry_derived
  - --source-table=newtab_merino_corpus_items_v1
  - --destination-bucket=merino-airflow-data-prodpy
  - --destination-prefix=newtab-merino-exports/corpus-item-engagement
  # Exported objects older than 30 days are cleaned up from the bucket.
  - --deletion-days-old=30
  referenced_tables:
  - ['moz-fx-data-shared-prod', 'telemetry_derived', 'newtab_merino_corpus_items_v1']
bigquery: null
references: {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Extract New Tab engagement query results and write the combined JSON to a single file."""

from bigquery_etl.newtab_merino import export_newtab_merino_table_to_gcs

if __name__ == "__main__":
    # Delegates to the shared newtab_merino helper, which reads its source
    # table and destination from CLI arguments (see metadata.yaml arguments).
    export_newtab_merino_table_to_gcs()
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- macro checks
-- Data-quality checks for newtab_merino_corpus_items_v1.
-- NOTE: this table aggregates on corpus_item_id (see schema.yaml / query.sql);
-- the original check referenced scheduled_corpus_item_id, a column that does
-- not exist in this table, copied from the scheduled_corpus_item_id extract.

#fail
{{ not_null(["corpus_item_id"]) }}

#fail
{{ not_null(["impression_count"]) }}

#fail
{{ not_null(["click_count"]) }}

#fail
{{ min_row_count(1) }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Metadata for telemetry_derived.newtab_merino_corpus_items_v1.
friendly_name: Newtab Merino Corpus Items Extract
description: |-
  Aggregated Newtab events by corpus_item_id for Merino recommendations.
  See https://mozilla-hub.atlassian.net/browse/MC-1608
owners:
- [email protected]
- [email protected]
labels:
  incremental: false
  owner: mmiermans
bigquery:
  # Full rebuild every run; no partitioning needed for a rolling 1-day window.
  time_partitioning: null
scheduling:
  # Schedule in the DAG added for this table (MC-1608). The original value,
  # bqetl_merino_newtab_extract_to_gcs, was copied from the
  # scheduled_corpus_item_id extract; the GCS export for this table runs in
  # bqetl_merino_newtab_corpus_items_to_gcs and depends on this query.
  dag_name: bqetl_merino_newtab_corpus_items_to_gcs
  date_partition_parameter: null
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
-- Aggregates the last day of New Tab engagement (Pocket impressions and
-- clicks) by corpus_item_id, emitting one global row per item (region NULL)
-- plus per-country rows for countries that share a feed. Exported to GCS for
-- Merino consumption (MC-1608).

-- Keep only the most recent ping per (submission date, document_id) to
-- deduplicate retransmitted pings.
WITH deduplicated_pings AS (
  SELECT
    submission_timestamp,
    document_id,
    events,
    normalized_country_code,
  FROM
    `moz-fx-data-shared-prod.firefox_desktop_live.newtab_v1`
  WHERE
    submission_timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
  QUALIFY
    ROW_NUMBER() OVER (
      PARTITION BY
        DATE(submission_timestamp),
        document_id
      ORDER BY
        submission_timestamp DESC
    ) = 1
),
-- One row per Pocket impression/click event, with corpus_item_id and
-- recommended_at pulled out of the event extras map.
flattened_newtab_events AS (
  SELECT
    document_id,
    submission_timestamp,
    normalized_country_code,
    unnested_events.name AS event_name,
    mozfun.map.get_key(unnested_events.extra, 'corpus_item_id') AS corpus_item_id,
    -- recommended_at arrives as epoch milliseconds in a string extra;
    -- SAFE_CAST yields NULL (filtered below) for unparseable values.
    TIMESTAMP_MILLIS(
      SAFE_CAST(mozfun.map.get_key(unnested_events.extra, 'recommended_at') AS INT64)
    ) AS recommended_at
  FROM
    deduplicated_pings,
    UNNEST(events) AS unnested_events
  --filter to Pocket events
  WHERE
    unnested_events.category = 'pocket'
    AND unnested_events.name IN ('impression', 'click')
    --keep only data with a non-null corpus item ID and a parseable recommended_at
    AND (mozfun.map.get_key(unnested_events.extra, 'corpus_item_id') IS NOT NULL)
    AND SAFE_CAST(mozfun.map.get_key(unnested_events.extra, 'recommended_at') AS INT64) IS NOT NULL
),
-- Impression/click counts per (corpus_item_id, country), restricted to items
-- recommended within the last day.
aggregated_events AS (
  SELECT
    corpus_item_id,
    normalized_country_code,
    SUM(CASE WHEN event_name = 'impression' THEN 1 ELSE 0 END) AS impression_count,
    SUM(CASE WHEN event_name = 'click' THEN 1 ELSE 0 END) AS click_count
  FROM
    flattened_newtab_events
  WHERE
    recommended_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
  GROUP BY
    corpus_item_id,
    normalized_country_code
),
-- Totals across all countries; region is NULL for these rows.
global_aggregates AS (
  SELECT
    corpus_item_id,
    CAST(NULL AS STRING) AS region,
    SUM(impression_count) AS impression_count,
    SUM(click_count) AS click_count
  FROM
    aggregated_events
  GROUP BY
    corpus_item_id
),
-- Country-level engagement for the subset of countries below.
country_aggregates AS (
  SELECT
    corpus_item_id,
    normalized_country_code AS region,
    impression_count,
    click_count
  FROM
    aggregated_events
  WHERE
    -- Gather country (a.k.a. region) specific engagement for all countries that share a feed.
    -- https://mozilla-hub.atlassian.net/wiki/x/JY3LB
    normalized_country_code IN ('US', 'CA', 'DE', 'CH', 'AT', 'BE', 'GB', 'IE')
)
-- Global rows and country rows are disjoint (region NULL vs non-NULL), so
-- UNION ALL is correct and avoids an unnecessary dedup.
SELECT
  *
FROM
  global_aggregates
UNION ALL
SELECT
  *
FROM
  country_aggregates;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
fields:
- mode: NULLABLE
name: corpus_item_id
type: STRING
- mode: NULLABLE
name: impression_count
type: INTEGER
- mode: NULLABLE
name: click_count
type: INTEGER
- mode: NULLABLE
name: region
type: STRING
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
friendly_name: Newtab Merino BigQuery Extract to Google Cloud Storage (GCS)
friendly_name: Newtab Merino BigQuery Extract to Google Cloud Storage (GCS) aggregated on scheduled_corpus_item_id
description: |-
Newtab engagements aggregated on scheduled_corpus_item_id are exported to a GCS bucket for Merino to consume.
The table rebuilds every 20 minutes and aggregates one day of data.
Expand Down

0 comments on commit 79804ce

Please sign in to comment.