Merge pull request #45 from mwvgroup/development
Development
troyraen authored Feb 11, 2021
2 parents 58af663 + 888feee commit aea85f0
Showing 23 changed files with 399 additions and 473 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -1,6 +1,11 @@
# ZTF alert data
broker/ztf_archive/data/

# Authentication keys
GCPauth.json
krb5.conf
pitt-reader.user.keytab

# OS files
*.DS_Store
.AppleDouble
41 changes: 27 additions & 14 deletions broker/alert_ingestion/consume.py
@@ -51,6 +51,7 @@
"""

import logging
import google.cloud.logging
import os
import re
from pathlib import Path
@@ -67,7 +68,10 @@
from google.cloud import pubsub, storage
from google.cloud.pubsub_v1.publisher.futures import Future

log = logging.getLogger(__name__)
log = logging.getLogger(__name__)  # Python root logger
client = google.cloud.logging.Client()  # Google Cloud Logging client
client.get_default_handler()
client.setup_logging()  # attach the Cloud Logging handler to the root logger

DEFAULT_ZTF_CONFIG = {
'bootstrap.servers': 'public2.alerts.ztf.uw.edu:9094',
@@ -111,7 +115,7 @@ def seekable(self): # necessary so that fastavro can write to the file


def _set_config_defaults(kafka_config: dict) -> dict:
"""Set default values for a Kafka configuration dictionary
"""Set default values for a Kafka configuration dictionaryk
Default values:
enable.auto.commit: False,
@@ -168,7 +172,7 @@ def __init__(
self.pubsub_alert_data_topic = pubsub_alert_data_topic
self.pubsub_in_GCS_topic = pubsub_in_GCS_topic
self.kafka_server = kafka_config["bootstrap.servers"]
log.info(f'Initializing consumer: {self.__repr__()}')
log.debug(f'Initializing consumer: {self.__repr__()}')

# Connect to Kafka stream
# Enforce NO auto commit, correct log handling
@@ -178,12 +182,12 @@
# Connect to Google Cloud Storage
self.storage_client = storage.Client()
self.bucket = self.storage_client.get_bucket(bucket_name)
log.info(f'Connected to bucket: {self.bucket.name}')
log.debug(f'Connected to bucket: {self.bucket.name}')

def close(self) -> None:
"""Close down and terminate the Kafka Consumer"""

log.info(f'Closing consumer: {self.__repr__()}')
log.debug(f'Closing consumer: {self.__repr__()}')
super().close()

@staticmethod
@@ -219,9 +223,9 @@ def fix_schema(temp_file: TempAlertFile, survey: str, version: str) -> None:
temp_file.truncate() # removes leftover data
temp_file.seek(0)

log.debug(f'Schema header reformatted for {survey} version {version}')
log.info(f'Schema header reformatted for {survey} version {version}')

def upload_bytes_to_bucket(self, data: bytes, destination_name: str) -> None:
def upload_bytes_to_bucket(self, data: bytes, destination_name: str) -> bytes:
"""Uploads bytes data to a GCP storage bucket. Prior to storage,
corrects the schema header to be compliant with BigQuery's strict
validation standards if the alert is from a survey version with an
@@ -230,15 +234,19 @@ def upload_bytes_to_bucket(self, data: bytes, destination_name: str) -> None:
Args:
data: Data to upload
destination_name: Name of the file to be created
Returns:
data with a corrected schema header (if one is necessary)
"""

log.debug(f'Uploading {destination_name} to {self.bucket.name}')
log.info(f'Uploading {destination_name} to {self.bucket.name}')
blob = self.bucket.blob(destination_name)

# Get the survey name and version
survey = guess_schema_survey(data)
version = guess_schema_version(data)

# Correct the message schema, upload to GCS, and return it
# By default, spool data in memory to avoid IO unless data is too big
# LSST alerts are anticipated at 80 kB, so 150 kB should be plenty
max_alert_packet_size = 150000
@@ -247,15 +255,19 @@
temp_file.seek(0)
self.fix_schema(temp_file, survey, version)
blob.upload_from_file(temp_file)
temp_file.seek(0)
return temp_file.read()

def run(self) -> None:
"""Ingest kafka Messages to GCS and PubSub"""

log.info('Starting consumer.run ...')
log.debug('Starting consumer.run ...')
try:
while True:
msg = self.consume(num_messages=1, timeout=5)[0]
# msg = self.consume(num_messages=1, timeout=1)
msg = self.poll(timeout=1)
if msg is None:
log.info('msg is None')
continue

if msg.error():
Expand All @@ -266,12 +278,13 @@ def run(self) -> None:

else:
timestamp_kind, timestamp = msg.timestamp()
file_name = f'{timestamp}.avro'
file_name = f'{msg.topic()}_{timestamp}.avro'

log.debug(f'Ingesting {file_name}')
log.info(f'Ingesting {file_name}')
msg_schema_fixed = self.upload_bytes_to_bucket(msg.value(), file_name)
# returns msg.value() bytes object with schema corrected
publish_pubsub(self.pubsub_alert_data_topic, msg_schema_fixed)
publish_pubsub(self.pubsub_in_GCS_topic, file_name.encode('UTF-8'))
publish_pubsub(self.pubsub_alert_data_topic, msg.value())
self.upload_bytes_to_bucket(msg.value(), file_name)

if not self._debug:
self.commit()
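The reworked `upload_bytes_to_bucket` spools each alert in memory (up to the 150 kB `max_alert_packet_size`), fixes the schema header, uploads the file to Cloud Storage, then rewinds and returns the corrected bytes so that `run()` can publish them to the `ztf_alert_data` Pub/Sub topic. Below is a minimal standalone sketch of that spool, upload, and return pattern; the bucket and function names are illustrative and the schema fix is reduced to a placeholder comment, so this is not the repository's `GCSKafkaConsumer` code.

```python
import tempfile

from google.cloud import storage

MAX_ALERT_PACKET_SIZE = 150_000  # bytes; anything larger spills from memory to disk


def upload_and_return(data: bytes, bucket_name: str, destination_name: str) -> bytes:
    """Upload alert bytes to a GCS bucket and return the (corrected) bytes."""
    bucket = storage.Client().get_bucket(bucket_name)
    blob = bucket.blob(destination_name)

    with tempfile.SpooledTemporaryFile(max_size=MAX_ALERT_PACKET_SIZE, mode='w+b') as temp_file:
        temp_file.write(data)
        temp_file.seek(0)
        # fix_schema(temp_file, survey, version) would rewrite the header here
        blob.upload_from_file(temp_file)
        temp_file.seek(0)
        return temp_file.read()
```

In the broker itself the temporary file is the `TempAlertFile` subclass shown above, which adds the `seekable` (and related) methods that fastavro needs when rewriting the header.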
@@ -11,7 +11,7 @@
###

# NOT SURE OF THE RIGHT WAY TO GET INTO THIS DIRECTORY:
cd broker/alert_ingestion/GCS_to_BQ
cd broker/cloud_functions/GCS_to_BQ

# deploy stream_GCS_to_BQ() to listen to the ztf_alert_avro_bucket
bucket="${GOOGLE_CLOUD_PROJECT}_ztf_alert_avro_bucket"
13 changes: 13 additions & 0 deletions broker/cloud_functions/README.md
@@ -0,0 +1,13 @@
# Cloud Functions

This directory contains cloud functions used by the Pitt-Google Broker.
Source code for each function is stored in a dedicated directory
and is accompanied by a bash script that deploys the cloud function
to the Google Cloud Platform.

For more information on cloud functions, see: https://cloud.google.com/functions

| Function | Description |
|---|---|
| `GCS_to_BQ` | Load the contents of Avro files from Google Cloud Storage (GCS) into BigQuery (BQ) |
| `scheduleinstance` | Deploys and schedules functions that start and stop the virtual machines that ingest ZTF data into BQ |
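For orientation, a function of the `GCS_to_BQ` type is typically a small background handler triggered whenever a new object is finalized in the Avro bucket, which then loads that file into BigQuery. The sketch below only illustrates that pattern; it is not the repository's `GCS_to_BQ` source, and the table ID is a placeholder.

```python
from google.cloud import bigquery

BQ_TABLE = '<project>.<dataset>.<table>'  # placeholder; point this at the broker's alert table


def stream_GCS_to_BQ(data: dict, context) -> None:
    """Background Cloud Function: load a newly finalized Avro file into BigQuery."""
    uri = f"gs://{data['bucket']}/{data['name']}"

    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.AVRO)

    load_job = client.load_table_from_uri(uri, BQ_TABLE, job_config=job_config)
    load_job.result()  # block so that load errors surface in the function's logs
```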
38 changes: 38 additions & 0 deletions broker/cloud_functions/scheduleinstance.sh
@@ -0,0 +1,38 @@
#!/bin/sh

echo "WARNING: Make sure you have updated the values of [IMAGE NAME] and [VERSION] with the values of the current Docker image using this script.\n"
# version number should be most recent commit used to build the image
# git log -1 --format=format:"%H"

# Configure gcloud as a Docker credential helper
gcloud auth configure-docker gcr.io/ardent-cycling-243415/consumeztf

# Create 2 instances of the consumer
gcloud compute instances create-with-container consume-ztf-1 --zone=us-central1-a --machine-type=f1-micro --image-project=cos-cloud --container-image=gcr.io/ardent-cycling-243415/consumeztf:7019f8aa86ffe16dcb36fa791dc2fb7e56bb687f --labels=env=consume-ztf-1 --image=cos-stable-81-12871-1190-0 --service-account=591409139500-compute@developer.gserviceaccount.com --scopes=cloud-platform
gcloud compute instances create-with-container consume-ztf-2 --zone=us-central1-a --machine-type=f1-micro --image-project=cos-cloud --container-image=gcr.io/ardent-cycling-243415/consumeztf:7019f8aa86ffe16dcb36fa791dc2fb7e56bb687f --labels=env=consume-ztf-2 --image=cos-stable-81-12871-1190-0 --service-account=591409139500-compute@developer.gserviceaccount.com --scopes=cloud-platform


# Create the Pub/Sub topics to trigger starting and stopping the instance
gcloud pubsub topics create start-instance-event
gcloud pubsub topics create stop-instance-event


# Create the cloud functions to publish to PubSub

cd scheduleinstance/

gcloud functions deploy startInstancePubSub --trigger-topic start-instance-event --runtime nodejs8

gcloud functions deploy stopInstancePubSub --trigger-topic stop-instance-event --runtime nodejs8

# Finally, schedule the PubSub messages that trigger the cloud functions.

# Reset consume-ztf-1 on odd days
gcloud scheduler jobs create pubsub stop-consume-ztf-1 --schedule '0 9 1-31/2 * *' --topic stop-instance-event --message-body '{"zone":"us-west1-b", "label":"env=consume-ztf-1"}' --time-zone 'America/Los_Angeles'

gcloud scheduler jobs create pubsub start-consume-ztf-1 --schedule '0 17 1-31/2 * *' --topic start-instance-event --message-body '{"zone":"us-west1-b", "label":"env=consume-ztf-1"}' --time-zone 'America/Los_Angeles'

# Reset consume-ztf-2 on even days
gcloud scheduler jobs create pubsub stop-consume-ztf-2 --schedule '0 0 2-30/2 * *' --topic stop-instance-event --message-body '{"zone":"us-west1-b", "label":"env=consume-ztf-2"}' --time-zone 'America/Los_Angeles'

gcloud scheduler jobs create pubsub start-consume-ztf-2 --schedule '0 0 2-30/2 * *' --topic start-instance-event --message-body '{"zone":"us-west1-b", "label":"env=consume-ztf-2"}' --time-zone 'America/Los_Angeles'
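The scheduler jobs above publish a small JSON payload (a `zone` plus an instance `label`) to the start/stop topics, and the `startInstancePubSub`/`stopInstancePubSub` functions deployed earlier in the script (Google's Node.js samples, `--runtime nodejs8`) act on that payload. Purely for illustration, a Python sketch of how a Pub/Sub-triggered handler would decode the same payload:

```python
import base64
import json


def start_instance_pubsub(event: dict, context) -> None:
    """Sketch: decode the scheduler's Pub/Sub payload in a start/stop handler."""
    payload = json.loads(base64.b64decode(event['data']).decode('utf-8'))

    zone = payload['zone']  # e.g. 'us-west1-b'
    label_key, label_value = payload['label'].split('=', 1)  # e.g. 'env', 'consume-ztf-1'
    print(f'Targeting instances in {zone} with label {label_key}={label_value}')

    # The deployed Node.js functions then list the Compute Engine instances
    # carrying that label in that zone and start (or stop) each of them.
```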
2 changes: 1 addition & 1 deletion broker/pub_sub_client/message_service.py
@@ -21,7 +21,7 @@ def publish_pubsub(topic_name, message):

topic_path = publisher.topic_path(project_id, topic_name)

topic = publisher.get_topic(topic_path)
# topic = publisher.get_topic(topic_path)
log.info(f'Connected to PubSub: {topic_path}')

future = publisher.publish(topic_path, data=message)
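This change drops the `publisher.get_topic()` round trip before publishing; the function now relies on the future returned by `publish()` to surface any errors. A self-contained sketch of the publish flow, assuming the project ID comes from the `GOOGLE_CLOUD_PROJECT` environment variable (it mirrors, but is not, the full `message_service.py`):

```python
import os

from google.cloud import pubsub_v1


def publish_pubsub(topic_name: str, message: bytes) -> str:
    """Publish a bytes message to a Pub/Sub topic and return its message ID."""
    project_id = os.environ['GOOGLE_CLOUD_PROJECT']

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)

    future = publisher.publish(topic_path, data=message)
    return future.result()  # raises if the publish failed
```

The consumer's `run()` loop calls this helper twice per alert: once with the schema-corrected alert bytes on the `ztf_alert_data` topic, and once with the Avro file name on `ztf_alert_avro_in_bucket`.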
58 changes: 38 additions & 20 deletions docker_files/consume_ztf.Dockerfile
@@ -1,23 +1,41 @@
FROM python:3.7
# Slim used to reduce image size
FROM python:3.7-slim

# Configure Environment variables
ENV PYTHONPATH "Pitt-Google-Broker/:${PYTHONPATH}"
ENV GOOGLE_CLOUD_PROJECT "ardent-cycling-243415"
ENV ztf_server "public2.alerts.ztf.uw.edu:9094"
ENV ztf_principle "[email protected]"
ENV ztf_keytab_path "pitt-reader.user.keytab"
ENV PATH="/root/miniconda3/bin:${PATH}"
ARG PATH="/root/miniconda3/bin:${PATH}"

# Copy credentials and runtime files
COPY docker_files/consume_ztf.py docker_files/consume_ztf.py
COPY krb5.conf /etc/krb5.conf
COPY pitt-reader.user.keytab pitt-reader.user.keytab

# Install utils for fetching remote source code
RUN apt-get update && \
apt-get install -y git wget python-dev gcc krb5-user && \
rm -rf /var/lib/apt/lists/* && \
apt-get clean

RUN wget \
https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh \
&& mkdir /root/.conda \
&& bash Miniconda3-latest-Linux-x86_64.sh -b \
&& rm -f Miniconda3-latest-Linux-x86_64.sh

RUN conda install -c conda-forge kafka-python -y
RUN conda install -c conda-forge python-confluent-kafka -y
RUN conda install -c stuarteberg -c conda-forge librdkafka -y

# Get broker source code and install dependencies
RUN git clone --single-branch --branch master --depth 1 https://github.com/mwvgroup/Pitt-Google-Broker && \
rm -rf Pitt-Google-Broker/.git

MAINTAINER Daniel Perrefort "[email protected]"

COPY consume_ztf.py consume_ztf.py

# Install git
RUN apt-get update
RUN apt-get install -y git

# Get broker source code and add to path
RUN git clone https://github.com/mwvgroup/Pitt-Google-Broker

# Install dependencies
# Some dependency installs may fail without numpy, so we install it first
RUN pip install numpy
RUN pip install -r Pitt-Google-Broker/requirements.txt

# Configure Python Environment
ENV PYTHONPATH="Pitt-Google-Broker/:${PYTHONPATH}"


CMD [ "python", "./consume_ztf.py" ]
# Launch the ZTF consumer
CMD [ "python", "docker_files/consume_ztf.py" ]
9 changes: 5 additions & 4 deletions docker_files/consume_ztf.py
@@ -6,14 +6,14 @@
import os
from datetime import datetime

from broker.consumer import GCSKafkaConsumer
from broker.alert_ingestion.consume import GCSKafkaConsumer

# Define connection configuration using default values as a starting point
config = {
'bootstrap.servers': os.environ['ztf_server'],
'group.id': 'group',
'session.timeout.ms': 6000,
'enable.auto.commit': 'FALSE',
'enable.auto.commit': 'False',
'sasl.kerberos.kinit.cmd': 'kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}',
'sasl.kerberos.service.name': 'kafka',
'security.protocol': 'SASL_PLAINTEXT',
Expand All @@ -30,9 +30,10 @@
# Create a consumer
c = GCSKafkaConsumer(
kafka_config=config,
bucket_name='ardent-cycling-243415-ztf-avro-files',
bucket_name='ardent-cycling-243415_ztf_alert_avro_bucket',
kafka_topic=ztf_topic,
pubsub_topic='ztf-avro-status'
pubsub_alert_data_topic='ztf_alert_data',
pubsub_in_GCS_topic='ztf_alert_avro_in_bucket'
)

if __name__ == '__main__':
63 changes: 63 additions & 0 deletions docs/source/deployment/cloud_configuration.rst
@@ -0,0 +1,63 @@
Cloud Configuration
===================

Before deploying a broker instance to the cloud, you will need to create and
authenticate a new cloud project. This project, and its unique ID, will be
used to organize the various resources used by the deployed system. For
information on creating a new GCP project, see
`https://cloud.google.com/resource-manager/docs/creating-managing-projects <https://cloud.google.com/resource-manager/docs/creating-managing-projects>`_.

Once your project has been created, take note of the unique project ID as it
will be required at multiple points throughout the deployment process.

Authenticate CLI Tools
----------------------

You will need to authenticate the ``gcloud`` command-line tools so that they
can access your Google Cloud project. This is accomplished using the project
ID noted earlier:

.. code-block:: bash

   gcloud auth login                        # Login to GCP
   gcloud config set project [PROJECT-ID]   # Configure the project ID
   gcloud auth configure-docker             # Allow access for deploying docker images

Setting up GCP
--------------

You will need to set up a handful of tools in GCP. This includes enabling
various APIs for use in your GCP project:

.. code-block:: bash

   gcloud services enable containerregistry.googleapis.com

With the APIs enabled, the broker package provides an automated setup tool
that creates various resources required for the broker to run.

.. code-block:: python
   :linenos:

   from broker.gcp_setup import auto_setup

   # See a list of changes that will be made to your GCP project
   help(auto_setup)

   # Setup your GCP project
   auto_setup()

Deploying the ``stream_GCS_to_BQ`` Cloud Function
-------------------------------------------------

The ``stream_GCS_to_BQ`` function must be deployed from the command line as a
Google Cloud Function so that it listens to the appropriate bucket(s) for new
alert Avro files and appends the data to a BigQuery table. The Google Cloud SDK
must be installed first (see :ref:`gcloud`). The following script automates the
deployment. Note that it may take a couple of minutes to complete.

.. code-block:: bash
   :linenos:

   ./broker/cloud_functions/GCS_to_BQ.sh