Pure Python pipeline #210

Open
wants to merge 18 commits into base: master
44 changes: 17 additions & 27 deletions README.md
@@ -19,14 +19,12 @@ reporting easier.

Operational overview:

1. Python script `load_data.py`:
* downloads a zip of clinical trials registry data from ClinicalTrials.gov
* converts the XML to JSON
* uploads it to BigQuery
* runs SQL to transform it to a tabular format, including fields to
identify ACTs and their lateness
* downloads the SQL output as a CSV file

1. A CSV file of ACTs and pACTs is generated from a full zip archive
published daily by ClinicalTrials.gov. This is done via a
transformation process maintained in a [separate
repo](https://github.com/ebmdatalab/clinicaltrials-act-converter), and
triggered via the `load_data.py` management command, which, following
CSV conversion, goes on to call the...
2. Django management command `process_data` (sketched after this list):
* imports the CSV file into Django models
* precomputes aggregate statistics and turns these into rankings
@@ -36,25 +34,17 @@ Operational overview:
(specifically, trials which have been submitted but are under a QA
process).
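
Roughly, the new `load_data` command chains these two steps together as
follows (a minimal sketch based on the new `load_data.py` in this PR;
error handling and the Slack notification are omitted):

```python
# Minimal sketch of the new pure-Python pipeline, mirroring load_data.py
# in this PR; error handling and Slack notifications are omitted.
from ctconvert import create_instance
from frontend.management.commands.process_data import Command as ProcessCommand

# Step 1: run the XML -> CSV conversion on a dedicated compute instance.
# This blocks until the instance's startup script finishes.
create_instance.main("ebmdatalab", "europe-west2-a", "ctgov-converter", wait=True)

# Step 2: import the resulting CSV into the Django models.
ProcessCommand().handle(
    input_csv=("https://storage.googleapis.com/ebmdatalab/clinicaltrials/"
               "clinical_trials.csv"))
```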

These two commands are run daily via a `fab` script, and the results
are loaded into a staging database / website.

A separate command copies new data from staging to production
(following moderation).

Much complex logic has been expressed in SQL, which makes it hard to read
and test. This is a legacy of splitting the development between
academics with the domain expertise (who could use SQL to prototype)
and software engineers. Now that the project has been running
for a while and new development iterations are less frequent, a useful
project would be to move as much of this logic as possible to Python.

Similarly, the only reason step (1) exists is to create a CSV which
can be imported into the database. That CSV is useful in its own right
for QA by our academics, but the XML and JSON artefacts are just
intermediate formats that could legitimately be dropped in a
refactored solution (and the CSV could be generated directly from the
database).
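
If step (1) were dropped, that CSV could instead be produced straight
from the Django models. A minimal sketch, assuming a `Trial` model (as
imported by `process_data`) and purely illustrative field names:

```python
# Hypothetical sketch only: export the QA CSV directly from the database.
# The import path and field names below are assumptions, not the real schema.
import csv

from frontend.models import Trial  # assumed import path


def export_trials_csv(path):
    fields = ["registry_id", "title", "is_act", "days_late"]  # illustrative
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()
        for row in Trial.objects.values(*fields).iterator():
            writer.writerow(row)
```
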
`load_data` is run daily by a cron job
([job](https://github.com/ebmdatalab/clinicaltrials-act-tracker/blob/master/deploy/crontab-fdaaa-update),
[script](https://github.com/ebmdatalab/clinicaltrials-act-tracker/blob/master/deploy/fab_scripts/kickoff_background_data_load.sh))
in a staging environment, where the latest data is reviewed by a team
member.
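
Outside cron, the same load can be kicked off by hand through Django's
management API. A sketch, assuming a configured settings module:

```python
# Sketch: trigger the daily load manually (e.g. for testing).
# Assumes DJANGO_SETTINGS_MODULE=frontend.settings and the project on the
# PYTHONPATH; in production this is driven by the cron job and fab scripts
# linked above.
import django
from django.core.management import call_command

django.setup()
call_command("load_data")  # conversion, CSV import, then a Slack notification
```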

A [separate
command](https://github.com/ebmdatalab/clinicaltrials-act-tracker/blob/master/deploy/fab_scripts/copy_staging_to_live.sh)
copies new data from staging to production (following moderation).
These commands can also be triggered via fab, and via `ebmbot`
chatops.

The historical reason for the XML -> JSON route is that BigQuery
includes a number of useful JSON functions which can be manipulated by
Binary file not shown.
203 changes: 17 additions & 186 deletions clinicaltrials/frontend/management/commands/load_data.py
@@ -1,86 +1,10 @@
# -*- coding: utf-8 -*-
import logging
import sys
import traceback

from bigquery import Client
from bigquery import StorageClient
from bigquery import TableExporter
from bigquery import wait_for_job
from bigquery import gen_job_name
import xmltodict
import os
import subprocess
import json
import glob
import datetime
import tempfile
import shutil
import requests
import contextlib
import re
from google.cloud.exceptions import NotFound
from xml.parsers.expat import ExpatError

import traceback
from frontend.management.commands.process_data import Command as ProcessCommand
from django.core.management.base import BaseCommand
from django.conf import settings


logger = logging.getLogger(__name__)


def raw_json_name():
date = datetime.datetime.now().strftime('%Y-%m-%d')
return "raw_clincialtrials_json_{}.csv".format(date)


def postprocessor(path, key, value):
"""Convert key names to something bigquery compatible
"""
if key.startswith('#') or key.startswith('@'):
key = key[1:]
if key == 'clinical_results':
# Arbitrarily long field that we don't need, see #179
value = {'truncated_by_postprocessor': True}
return key, value


def wget_file(target, url):
subprocess.check_call(["wget", "-q", "-O", target, url])


def download_and_extract():
"""Clean up from past runs, then download into a temp location and move the
result into place.
"""
logger.info("Downloading. This takes at least 30 mins on a fast connection!")
url = 'https://clinicaltrials.gov/AllPublicXML.zip'

# download and extract
container = tempfile.mkdtemp(
prefix=settings.STORAGE_PREFIX.rstrip(os.sep), dir=settings.WORKING_VOLUME)
try:
data_file = os.path.join(container, "data.zip")
wget_file(data_file, url)
# Can't "wget|unzip" in a pipe because zipfiles have index at end of file.
with contextlib.suppress(OSError):
shutil.rmtree(settings.WORKING_DIR)
subprocess.check_call(["unzip", "-q", "-o", "-d", settings.WORKING_DIR, data_file])
finally:
shutil.rmtree(container)


def upload_to_cloud():
# XXX we should periodically delete old ones of these
logger.info("Uploading to cloud")
client = StorageClient()
bucket = client.get_bucket()
blob = bucket.blob(
"{}{}".format(settings.STORAGE_PREFIX, raw_json_name()),
chunk_size=1024*1024
)
with open(os.path.join(settings.WORKING_DIR, raw_json_name()), 'rb') as f:
blob.upload_from_file(f)
from ctconvert import create_instance


def notify_slack(message):
@@ -100,123 +24,30 @@ def notify_slack(message):
)


def convert_to_json():
logger.info("Converting to JSON...")
dpath = os.path.join(settings.WORKING_DIR, 'NCT*/')
files = [x for x in sorted(glob.glob(dpath + '*.xml'))]
start = datetime.datetime.now()
completed = 0
with open(os.path.join(settings.WORKING_DIR, raw_json_name()), 'w') as f2:
for source in files:
logger.info("Converting %s", source)
with open(source, 'rb') as f:
try:
f2.write(
json.dumps(
xmltodict.parse(
f,
item_depth=0,
postprocessor=postprocessor)
) + "\n")
except ExpatError:
logger.warn("Unable to parse %s", source)

completed += 1
if completed % 100 == 0:
elapsed = datetime.datetime.now() - start
per_file = elapsed.seconds / completed
remaining = int(per_file * (len(files) - completed) / 60.0)
logger.info("%s minutes remaining", remaining)



def convert_and_download():
logger.info("Executing SQL in cloud and downloading results...")
storage_path = os.path.join(settings.STORAGE_PREFIX, raw_json_name())
schema = [
{'name': 'json', 'type': 'string'},
]
client = Client('clinicaltrials')
tmp_client = Client('tmp_eu')
table_name = settings.PROCESSING_STORAGE_TABLE_NAME
tmp_table = tmp_client.dataset.table("clincialtrials_tmp_{}".format(gen_job_name()))
with contextlib.suppress(NotFound):
table = client.get_table(table_name)
table.gcbq_table.delete()

table = client.create_storage_backed_table(
table_name,
schema,
storage_path
)
sql_path = os.path.join(
settings.BASE_DIR, 'frontend/view.sql')
with open(sql_path, 'r') as sql_file:
job = table.gcbq_client.run_async_query(
gen_job_name(), sql_file.read().format(table_name=table_name))
job.destination = tmp_table
job.use_legacy_sql = False
job.write_disposition = 'WRITE_TRUNCATE'
job.begin()

# The call to .run_async_query() might return before results are actually ready.
# See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#timeoutMs
wait_for_job(job)


t1_exporter = TableExporter(tmp_table, settings.STORAGE_PREFIX + 'test_table-')
t1_exporter.export_to_storage()

with open(settings.INTERMEDIATE_CSV_PATH, 'w') as f:
t1_exporter.download_from_storage_and_unzip(f)


def get_env(path):
env = os.environ.copy()
with open(path) as e:
for k, v in re.findall(r"^export ([A-Z][A-Z0-9_]*)=(\S*)", e.read(), re.MULTILINE):
env[k] = v
return env
def convert_data():
# This blocks until the compute instance stops running, and raises
# an exception if its startup script finished in an error or
# unknown state
create_instance.main(
"ebmdatalab", "europe-west2-a", "ctgov-converter", wait=True)


def process_data():
# TODO no need to call via shell any more (now we are also a command)
try:
subprocess.check_output(
[
"{}python".format(settings.PROCESSING_VENV_BIN),
"{}/manage.py".format(settings.BASE_DIR),
"process_data",
"--input-csv={}".format(settings.INTERMEDIATE_CSV_PATH),
"--settings=frontend.settings"
],
stderr=subprocess.STDOUT,
env=get_env(settings.PROCESSING_ENV_PATH))
notify_slack("Today's data uploaded to FDAAA staging: "
"https://staging-fdaaa.ebmdatalab.net. "
"If this looks good, tell ebmbot to "
"'@ebmbot fdaaa deploy'""")
except subprocess.CalledProcessError as e:
notify_slack("Error in FDAAA import: command `{}` "
"failed with error code {} "
"and output {}".format(
e.cmd, e.returncode, e.output))
sys.exit(1)
cmd = ProcessCommand()
cmd.handle(
input_csv=('https://storage.googleapis.com/ebmdatalab/clinicaltrials/'
'clinical_trials.csv'))


class Command(BaseCommand):
help = '''Generate a CSV that can be consumed by the `process_data` command, and run that command
'''
help = ''' Generate a CSV that can be consumed by the `process_data` command,
and run that command '''

def handle(self, *args, **options):
with contextlib.suppress(OSError):
os.remove(settings.INTERMEDIATE_CSV_PATH)
try:
download_and_extract()
convert_to_json()
upload_to_cloud()
convert_and_download()
convert_data()
process_data()
notify_slack("Successful FDAAA import")
except:
notify_slack("Error in FDAAA import: {}".format(traceback.format_exc()))
raise
10 changes: 9 additions & 1 deletion clinicaltrials/frontend/management/commands/process_data.py
@@ -1,3 +1,4 @@
import io
from datetime import date
from lxml.etree import tostring
import csv
@@ -182,13 +183,20 @@ class Command(BaseCommand):

Each import updates existing Trials and Sponsors in-place with new data.
'''

def add_arguments(self, parser):
parser.add_argument(
'--input-csv',
help="Path or URL to CSV containing the data",
type=str)
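# Example invocations (sketch); --input-csv accepts a local path or a URL,
# which is fetched with requests and wrapped as a file-like object in handle():
#   python manage.py process_data --input-csv=/path/to/clinical_trials.csv
#   python manage.py process_data --input-csv=https://.../clinical_trials.csv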

def handle(self, *args, **options):
f = open(options['input_csv'])
if '://' in options['input_csv']:
resp = requests.get(options['input_csv'], stream=True)
f = io.StringIO(resp.text)
else:
f = open(options['input_csv'])
# if it's in GCS, download it
logger.info("Creating new trials and sponsors from %s", options['input_csv'])
with transaction.atomic():
# We don't use auto_now on models for `today`, purely so
Binary file removed clinicaltrials/frontend/tests/fixtures/data.zip
Binary file not shown.

This file was deleted.
