Data products that are streams are typically based on Cloud Pub/Sub. For more details, review this section of the Data Mesh guide.
To illustrate how data streams are produced and consumed, we will use two Dataflow pipelines: one to generate the input for the product and another to produce curated data to be consumed.
Build the base infrastructure of a data mesh organization as described in the parent README. This creates all the necessary resources and sets up the Terraform variables needed by the scripts in this folder.
Ensure you have a Java 11 or newer compiler installed; it is needed to build the Dataflow ingest and consumption pipelines.
In this step a new Pub/Sub topic is created where the source payloads will be published. Also, a Cloud Storage bucket is created to store Dataflow deployment artifacts.
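For orientation, the resources this step creates might look roughly like the following Terraform sketch. The resource and variable names here are hypothetical, not the actual module contents:

```hcl
# Hypothetical sketch of the ingest infrastructure; names are illustrative.
resource "google_pubsub_topic" "ingest_input" {
  project = var.base_data_project_id # assumed variable name
  name    = "ingest-input"
}

resource "google_storage_bucket" "dataflow_artifacts" {
  project                     = var.base_data_project_id
  name                        = "${var.base_data_project_id}-dataflow-artifacts"
  location                    = var.region
  uniform_bucket_level_access = true
}
```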
cd ingest-infrastructure
terraform init
terraform apply
cd ..
Once the infrastructure is created, store the resource IDs of the newly created artifacts:
source get-ingest-infra-ids.sh
This step uses the Google-provided Streaming Data Generator Dataflow template. It generates Pub/Sub messages with a JSON payload based on event-generator-template.json. These messages are written to an input topic in the domain's base data project.
./start-event-generation.sh <messages-per-second>
Selecting a messages-per-second value in the low hundreds is a good option for this demo.
./start-product-pipeline.sh
Once both the event generation and product pipelines are running, it can take several minutes before data appears in the destination tables and topics.
Navigate to the pipeline monitoring URL printed to the console by the start-product-pipeline.sh script. The pipeline graph will be similar to the one below.
This pipeline performs several steps typical to ingestion pipelines:
- Validation (Parse Payload, Process Invalid Messages)
- Enrichment (Enrich Clicks)
- Persistence of the enriched data to the analytical data store (Persist Clicks to BQ)
- Streaming analytics (Generate Stats, Publish Stats)
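Per the product description later in this guide ("15 minute web click counts per region generated every 5 minutes"), the Generate Stats step is a sliding-window count. Below is a self-contained Python sketch of that aggregation logic, purely for illustration; the real pipeline does this with Dataflow windowing:

```python
from collections import Counter
from datetime import datetime, timedelta

def sliding_window_counts(events, window=timedelta(minutes=15),
                          period=timedelta(minutes=5)):
    """Count events per (country, region) key in sliding windows.

    events: iterable of (timestamp, country, region) tuples.
    Yields (window_start, window_end, Counter) for each non-empty window.
    """
    events = sorted(events)
    if not events:
        return
    first, last = events[0][0], events[-1][0]
    # Start with the earliest period-aligned window that contains the first event.
    start = first - window + period
    while start <= last:
        end = start + window
        counts = Counter((c, r) for ts, c, r in events if start <= ts < end)
        if counts:
            yield start, end, counts
        start += period

# Sample run with three click events.
t0 = datetime(2022, 10, 13, 20, 0)
events = [(t0, "USA", "CA"),
          (t0 + timedelta(minutes=6), "USA", "CA"),
          (t0 + timedelta(minutes=6), "Canada", "AB")]
windows = list(sliding_window_counts(events))
```

Because consecutive windows overlap, each event is counted in up to three of the 15-minute windows emitted every 5 minutes.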
The streaming analytics branch of the pipeline generates the data that is exposed as a data product: the generated statistics are published to a Pub/Sub topic, which is tagged with the product tag template.
Potential customers can find this data stream product by using this command from the root directory of this repo:
bin/search-products.sh "web traffic"
The output will be similar to this:
[
{
"integratedSystem": "CLOUD_PUBSUB",
"linkedResource": "//pubsub.googleapis.com/projects/your-data-mesh-domain-product/topics/web-traffic-stats-v1",
"modifyTime": "2022-10-07T02:34:55Z",
"relativeResourceName": "projects/your-data-mesh-domain-product/locations/global/entryGroups/@pubsub/entries/cHJ...djE",
"searchResultSubtype": "entry.data_stream.topic",
"searchResultType": "ENTRY"
}
]
This means that a Pub/Sub topic matches the search query. To get the details of the topic, copy the value of relativeResourceName from the search result and run:
bin/show-entry-details.sh <relativeResourceName>
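Instead of copying the value manually, relativeResourceName can be pulled out of the search output with a short script. Here is a sketch assuming the JSON shape shown above; the sample data is abbreviated and hypothetical:

```python
import json

# Abbreviated, hypothetical search output in the shape shown above.
search_output = """[
  {"integratedSystem": "CLOUD_PUBSUB",
   "linkedResource": "//pubsub.googleapis.com/projects/p/topics/t",
   "relativeResourceName": "projects/p/locations/global/entryGroups/@pubsub/entries/abc",
   "searchResultSubtype": "entry.data_stream.topic",
   "searchResultType": "ENTRY"}
]"""

# Keep only Pub/Sub data-stream entries and collect their resource names.
entries = json.loads(search_output)
names = [e["relativeResourceName"] for e in entries
         if e.get("searchResultSubtype") == "entry.data_stream.topic"]
```

Each collected name can then be passed to bin/show-entry-details.sh.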
The show-entry-details.sh script will print the content of all the tags on this resource:
[
{
"fields": {
"access_request_link": {
"displayName": "Access request link",
"order": 2,
"stringValue": "https://streaming-network-events.wiki.corp/v1/access"
},
"business_owner": {
"displayName": "Business owner",
"order": 7,
"stringValue": "[email protected]"
},
"data_domain": {
"displayName": "Data domain",
"enumValue": {
"displayName": "Operations"
},
"order": 11
},
"data_product_description": {
"displayName": "Data product description",
"order": 8,
"stringValue": "15 minute web click counts per region generated every 5 minutes"
},
"data_product_name": {
"displayName": "Data product name",
"order": 9,
"stringValue": "Web traffic statistics v1"
},
"data_product_status": {
"displayName": "Data product status",
"enumValue": {
"displayName": "RELEASED"
},
"order": 1
},
...
"documentation_link": {
"displayName": "Documentation Link",
"order": 3,
"stringValue": "https://streaming-network-events.wiki.corp/v1/overview"
},
"technical_owner": {
"displayName": "Technical owner",
"order": 6,
"stringValue": "[email protected]"
}
},
"name": "projects/your-data-mesh-domain-product/locations/global/entryGroups/@pubsub/entries/cHJvamVjdHMvZ2NwLWRhdGEtbWVzaC1kb21haW4tcHJvZHVjdC90b3BpY3Mvd2ViLXRyYWZmaWMtc3RhdHMtdjE/tags/CcwopuLMtY1T",
"template": "projects/your-data-mesh-central-catalog/locations/us-central1/tagTemplates/data_product",
"templateDisplayName": "Data Product"
},
{
"fields": {
"description": {
"displayName": "Description",
"stringValue": "Web traffic by location statistics v1"
},
"schema_ref": {
"displayName": "Enforced PubSub topic schema",
"stringValue": "https://console.cloud.google.com/cloudpubsub/schema/detail/web_traffic_stats_v1?project=your-data-mesh-domain-product"
}
},
"name": "projects/your-data-mesh-domain-product/locations/global/entryGroups/@pubsub/entries/cHJ...jE/tags/CU...D",
"template": "projects/your-data-mesh-central-catalog/locations/us-central1/tagTemplates/pubsub_topic_details",
"templateDisplayName": "PubSub Topic Details"
}
]
Note: typically, an organization will build a custom UI to present the Data Catalog tags to end users.
Consumers create subscriptions to the product topic in their own projects (once they are granted permission to do so):
cd streaming-product/consumption-infrastructure
terraform init
terraform apply
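Under the hood, the consumption module presumably creates something like the following subscription in the consumer project. This is a hypothetical sketch: the topic path matches the search result shown earlier, while the other names are illustrative:

```hcl
resource "google_pubsub_subscription" "web_traffic_stats" {
  project = var.consumer_project_id # assumed variable name
  name    = "web-traffic-stats-v1-sub"
  # Cross-project subscription to the product topic found in the catalog.
  topic   = "projects/your-data-mesh-domain-product/topics/web-traffic-stats-v1"
}
```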
At this point, data should be arriving in the subscription. You can preview the data in the subscription:
./pull-messages-from-consumer-subscription.sh <number-of-messages>
You should see the content of the Pub/Sub payloads:
{"country":"USA","region":"CA","start_ts":"2022-10-13T20:10:00.000Z","end_ts":"2022-10-13T20:25:00.000Z","count":21}
{"country":"Canada","region":"AB","start_ts":"2022-10-13T20:10:00.000Z","end_ts":"2022-10-13T20:25:00.000Z","count":15}
{"country":"USA","region":"AZ","start_ts":"2022-10-13T20:10:00.000Z","end_ts":"2022-10-13T20:25:00.000Z","count":21}
{"country":"Mexico","region":"JA","start_ts":"2022-10-13T20:10:00.000Z","end_ts":"2022-10-13T20:25:00.000Z","count":9}
{"country":"USA","region":"NV","start_ts":"2022-10-13T20:10:00.000Z","end_ts":"2022-10-13T20:25:00.000Z","count":11}
Note: sometimes fewer messages than requested are retrieved. This is expected behaviour of the gcloud command.
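The payloads above follow the enforced topic schema (country, region, start_ts, end_ts, count). Below is a consumer-side sketch that parses and sanity-checks one message; the field names are taken from the sample output, while the validation logic itself is illustrative:

```python
import json
from datetime import datetime

# Expected fields and their JSON types, per the sample payloads above.
REQUIRED = {"country": str, "region": str, "start_ts": str,
            "end_ts": str, "count": int}

def parse_stats_message(raw: bytes) -> dict:
    """Parse one Pub/Sub payload and check it against the expected fields."""
    msg = json.loads(raw)
    for field, expected_type in REQUIRED.items():
        if not isinstance(msg.get(field), expected_type):
            raise ValueError(f"bad or missing field: {field}")
    # Timestamps are ISO 8601 with a trailing 'Z' (UTC).
    for ts_field in ("start_ts", "end_ts"):
        datetime.fromisoformat(msg[ts_field].replace("Z", "+00:00"))
    return msg

sample = (b'{"country":"USA","region":"CA",'
          b'"start_ts":"2022-10-13T20:10:00.000Z",'
          b'"end_ts":"2022-10-13T20:25:00.000Z","count":21}')
```

A production consumer would rely on the enforced Pub/Sub topic schema rather than re-validating every field, but a check like this is useful in tests.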
When you are finished, stop the pipelines and destroy the infrastructure:
./stop-event-generation.sh
./stop-product-pipeline.sh
cd consumption-infrastructure
terraform destroy
cd ..
cd ingest-infrastructure
terraform destroy
cd ..