
Port to use volumes (much faster)
ekzhang committed Oct 4, 2023
1 parent b107582 commit 428d1c2
Showing 1 changed file with 62 additions and 68 deletions.
130 changes: 62 additions & 68 deletions 10_integrations/covid_datasette.py
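
This commit swaps the example's persistence layer: the old version stored the dataset and the SQLite file on a NetworkFileSystem, serialized writers with flufl.lock, and built the database in a tempfile before copying it into place; the new version attaches a persisted Volume, writes to it directly, and synchronizes state explicitly with commit() and reload(), which is presumably where the "much faster" in the commit title comes from. In the diff below, lines prefixed with - were removed and lines prefixed with + were added. As an orientation aid, here is a minimal sketch of the read/write pattern the new code relies on; it uses only API calls that appear in the diff itself (Stub, Volume.persisted, volumes={...}, commit(), reload()), while the app name, volume name, and file path are illustrative.

    # Minimal sketch of the Volume pattern used after this commit. Only API calls
    # that appear in the diff are used; the names here are made up for illustration.
    import pathlib

    from modal import Stub, Volume

    stub = Stub("volume-pattern-sketch")
    stub.volume = Volume.persisted("sketch-vol")

    MOUNT_DIR = "/data"
    GREETING = pathlib.Path(MOUNT_DIR, "greeting.txt")


    @stub.function(volumes={MOUNT_DIR: stub.volume})
    def write_greeting():
        GREETING.write_text("hello")
        stub.volume.commit()  # make the write visible to other containers


    @stub.function(volumes={MOUNT_DIR: stub.volume})
    def read_greeting() -> str:
        stub.volume.reload()  # pick up the latest committed state first
        return GREETING.read_text()
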
@@ -11,9 +11,9 @@
# Try it out for yourself at [modal-labs-example-covid-datasette-app.modal.run/covid-19](https://modal-labs-example-covid-datasette-app.modal.run/covid-19/johns_hopkins_csse_daily_reports).
#
# Some Modal features it uses:
- # * Network file systems: a persisted volume lets us store and grow the published dataset over time
- # * Scheduled functions: the underlying dataset is refreshed daily, so we schedule a function to run daily
- # * Webhooks: exposes the Datasette application for web browser interaction and API requests.
+ # * Volumes: a persisted volume lets us store and grow the published dataset over time.
+ # * Scheduled functions: the underlying dataset is refreshed daily, so we schedule a function to run daily.
+ # * Web endpoints: exposes the Datasette application for web browser interaction and API requests.
#
# ## Basic setup
#
@@ -26,34 +26,28 @@
import shutil
import subprocess
import tempfile
- from datetime import datetime, timedelta
+ from datetime import datetime
from urllib.request import urlretrieve

- from modal import Image, NetworkFileSystem, Period, Stub, asgi_app
+ from modal import Image, Period, Stub, Volume, asgi_app

stub = Stub("example-covid-datasette")
datasette_image = (
Image.debian_slim()
- .pip_install(
- "datasette~=0.63.2",
- "flufl.lock",
- "sqlite-utils",
- )
+ .pip_install("datasette~=0.63.2", "sqlite-utils")
.apt_install("unzip")
)

# ## Persistent dataset storage
#
# To separate database creation and maintenance from serving, we'll need the underlying
- # database file to be stored persistently. To achieve this we use a [`NetworkFileSystem`](/docs/guide/shared-volumes),
- # a writable volume that can be attached to Modal functions and persisted across function runs.
+ # database file to be stored persistently. To achieve this we use a [`Volume`](/docs/guide/volumes).

- volume = NetworkFileSystem.persisted("covid-dataset-cache-vol")
+ stub.volume = Volume.persisted("example-covid-datasette-cache-vol")

- CACHE_DIR = "/cache"
- LOCK_FILE = str(pathlib.Path(CACHE_DIR, "lock-reports"))
- REPORTS_DIR = pathlib.Path(CACHE_DIR, "COVID-19")
- DB_PATH = pathlib.Path(CACHE_DIR, "covid-19.db")
+ VOLUME_DIR = "/cache-vol"
+ REPORTS_DIR = pathlib.Path(VOLUME_DIR, "COVID-19")
+ DB_PATH = pathlib.Path(VOLUME_DIR, "covid-19.db")

# ## Getting a dataset
#
@@ -66,46 +60,50 @@

@stub.function(
image=datasette_image,
- network_file_systems={CACHE_DIR: volume},
+ volumes={VOLUME_DIR: stub.volume},
retries=2,
)
def download_dataset(cache=True):
- from flufl.lock import Lock
-
if REPORTS_DIR.exists() and cache:
print(f"Dataset already present and {cache=}. Skipping download.")
return
elif REPORTS_DIR.exists():
- print(
- "Acquiring lock before deleting dataset, which may be in use by other runs."
- )
- with Lock(LOCK_FILE, default_timeout=timedelta(hours=1)):
- shutil.rmtree(REPORTS_DIR)
- print("Cleaned dataset before re-downloading.")
+ print("Cleaning dataset before re-downloading...")
+ shutil.rmtree(REPORTS_DIR)

print("Downloading dataset...")
urlretrieve("https://github.com/CSSEGISandData/COVID-19/archive/refs/heads/master.zip", "/tmp/covid-19.zip")
urlretrieve(
"https://github.com/CSSEGISandData/COVID-19/archive/refs/heads/master.zip",
"/tmp/covid-19.zip",
)

print("Unpacking archive")
print("Unpacking archive...")
prefix = "COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports"
subprocess.run(f"unzip /tmp/covid-19.zip {prefix}/* -d {REPORTS_DIR}", shell=True)
subprocess.run(
f"unzip /tmp/covid-19.zip {prefix}/* -d {REPORTS_DIR}", shell=True
)
subprocess.run(f"mv {REPORTS_DIR / prefix}/* {REPORTS_DIR}", shell=True)

print("Committing the volume...")
stub.volume.commit()

print("Finished downloading dataset.")


# ## Data munging
#
# This dataset is no swamp, but a bit of data cleaning is still in order. The following two
- # functions are used to read a handful of `.csv` files from the git repository and cleaning the
- # rows data before inserting into SQLite. You can see that the daily reports are somewhat inconsistent
- # in their column names.
+ # functions read a handful of `.csv` files and clean the data, before inserting it into
+ # SQLite.


def load_daily_reports():
+ stub.volume.reload()
daily_reports = list(REPORTS_DIR.glob("*.csv"))
if not daily_reports:
raise RuntimeError(f"Could not find any daily reports in {REPORTS_DIR}.")
raise RuntimeError(
f"Could not find any daily reports in {REPORTS_DIR}."
)
for filepath in daily_reports:
yield from load_report(filepath)
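
The new stub.volume.reload() call at the top of load_daily_reports() is the read-side half of the pattern sketched earlier: it pulls in whatever download_dataset() last committed before the CSV files are globbed. The cleaning itself happens in load_report(), which is collapsed in this diff; purely as an illustration of the "inconsistent column names" the removed comment alluded to, a loader along these lines has to accept both the older and newer JHU CSSE spellings. The raw column names below are my assumption about the source data, not the example's actual code; the output keys mirror the indexes created later in the diff.

    # Illustration only -- not the example's load_report(). The JHU CSSE daily
    # reports renamed columns over time (e.g. "Province/State" vs "Province_State"),
    # so a loader has to map both spellings onto one schema.
    import csv
    import pathlib


    def normalize_row(row: dict) -> dict:
        return {
            "province_or_state": row.get("Province_State") or row.get("Province/State"),
            "country_or_region": row.get("Country_Region") or row.get("Country/Region"),
        }


    def load_report_sketch(filepath: pathlib.Path):
        with open(filepath, newline="", encoding="utf-8-sig") as f:
            for row in csv.DictReader(f):
                yield normalize_row(row)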

@@ -146,12 +144,12 @@ def load_report(filepath):
# ## Inserting into SQLite
#
# With the CSV processing out of the way, we're ready to create an SQLite DB and feed data into it.
- # Importantly, the `prep_db` function mounts the same network file system used by `download_dataset()`, and
+ # Importantly, the `prep_db` function mounts the same volume used by `download_dataset()`, and
# rows are batch inserted with progress logged after each batch, as the full COVID-19 has millions
# of rows and does take some time to be fully inserted.
#
# A more sophisticated implementation would only load new data instead of performing a full refresh,
- # but for this example things are kept simple.
+ # but we're keeping things simple for this example!


def chunks(it, size):
@@ -162,44 +160,40 @@ def chunks(it, size):

@stub.function(
image=datasette_image,
- network_file_systems={CACHE_DIR: volume},
+ volumes={VOLUME_DIR: stub.volume},
timeout=900,
)
def prep_db():
import sqlite_utils
- from flufl.lock import Lock

print("Loading daily reports...")
records = load_daily_reports()

- with Lock(
- LOCK_FILE,
- lifetime=timedelta(minutes=2),
- default_timeout=timedelta(hours=1),
- ) as lck, tempfile.NamedTemporaryFile() as tmp:
- db = sqlite_utils.Database(tmp.name)
- table = db["johns_hopkins_csse_daily_reports"]
+ DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+ db = sqlite_utils.Database(DB_PATH)
+ table = db["johns_hopkins_csse_daily_reports"]

+ batch_size = 100_000
+ for i, batch in enumerate(chunks(records, size=batch_size)):
+ truncate = True if i == 0 else False
+ table.insert_all(batch, batch_size=batch_size, truncate=truncate)
+ print(f"Inserted {len(batch)} rows into DB.")

- batch_size = 100_000
- for i, batch in enumerate(chunks(records, size=batch_size)):
- truncate = True if i == 0 else False
- table.insert_all(batch, batch_size=batch_size, truncate=truncate)
- lck.refresh()
- print(f"Inserted {len(batch)} rows into DB.")
+ table.create_index(["day"], if_not_exists=True)
+ table.create_index(["province_or_state"], if_not_exists=True)
+ table.create_index(["country_or_region"], if_not_exists=True)

table.create_index(["day"], if_not_exists=True)
table.create_index(["province_or_state"], if_not_exists=True)
table.create_index(["country_or_region"], if_not_exists=True)
db.close()

print("Syncing DB with network volume.")
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(tmp.name, DB_PATH)
print("Syncing DB with volume.")
stub.volume.commit()


- # ## Keeping fresh
+ # ## Keep it fresh
#
- # Johns Hopkins commits new data to the dataset repository every day, so we
- # setup a [scheduled](/docs/guide/cron) Modal function to run automatically once every 24 hours.
+ # Johns Hopkins commits new data to the dataset repository every day, so we set up
+ # a [scheduled](/docs/guide/cron) function to automatically refresh the database
+ # every 24 hours.


@stub.function(schedule=Period(hours=24), timeout=1000)
@@ -209,22 +203,22 @@ def refresh_db():
prep_db.remote()


- # ## Webhook
+ # ## Web endpoint
#
# Hooking up the SQLite database to a Modal webhook is as simple as it gets.
- # The Modal `@stub.asgi_app` decorator wraps a few lines of code: one `import` and a few
- # lines to instantiate the `Datasette` instance and return a reference to its ASGI app object.
+ # The Modal `@asgi_app` decorator wraps a few lines of code: one `import` and a few
+ # lines to instantiate the `Datasette` instance and return its app server.


@stub.function(
image=datasette_image,
- network_file_systems={CACHE_DIR: volume},
+ volumes={VOLUME_DIR: stub.volume},
)
@asgi_app()
def app():
from datasette.app import Datasette

- ds = Datasette(files=[DB_PATH])
+ ds = Datasette(files=[DB_PATH], settings={"sql_time_limit_ms": 10000})
asyncio.run(ds.invoke_startup())
return ds.app()
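
Besides the volume mount, the functional change in this hunk is the settings={"sql_time_limit_ms": 10000} argument, which raises Datasette's per-query time limit from its 1-second default so heavier queries against the multi-million-row table aren't cut off. If you want to poke at the same database outside Modal, a local sketch along these lines should work; uvicorn and the local file path are my additions here, not something the example uses.

    # Local sketch: serve the same covid-19.db with Datasette outside Modal.
    # Assumes the .db file sits in the working directory and uvicorn is installed
    # (uvicorn is an assumption of this sketch, not part of the example).
    import asyncio

    import uvicorn
    from datasette.app import Datasette

    ds = Datasette(files=["covid-19.db"], settings={"sql_time_limit_ms": 10000})
    asyncio.run(ds.invoke_startup())  # same startup hook the Modal app runs
    uvicorn.run(ds.app(), host="127.0.0.1", port=8001)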

@@ -233,11 +227,11 @@ def app():
#
# Run this script using `modal run covid_datasette.py` and it will create the database.
#
- # You can run this script using `modal serve covid_datasette.py` and it will create a
- # short-lived web URL that exists until you terminate the script.
+ # You can then use `modal serve covid_datasette.py` to create a short-lived web URL
+ # that exists until you terminate the script.
#
# When publishing the interactive Datasette app you'll want to create a persistent URL.
- # This is achieved by deploying the script with `modal deploy covid_datasette.py`.
+ # Just run `modal deploy covid_datasette.py`.


@stub.local_entrypoint()
@@ -248,4 +242,4 @@ def run():
prep_db.remote()


- # You can go explore the data over at [modal-labs-covid-datasette-app.modal.run/covid-19/](https://modal-labs-example-covid-datasette-app.modal.run/covid-19/johns_hopkins_csse_daily_reports).
+ # You can explore the data at the [deployed web endpoint](https://modal-labs--example-covid-datasette-app.modal.run/covid-19).
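
A deployed Datasette also exposes a JSON API alongside the HTML interface, so the published table can be queried programmatically. A small sketch, assuming the deployed URL from the closing comment above and standard Datasette query parameters (.json, _size, _shape); the printed column names come from the indexes created in prep_db().

    # Sketch: pull a few rows from the published table via Datasette's JSON API.
    # The base URL is the deployed endpoint mentioned above; .json, _size, and
    # _shape=array are standard Datasette query options.
    import json
    from urllib.request import urlopen

    url = (
        "https://modal-labs--example-covid-datasette-app.modal.run"
        "/covid-19/johns_hopkins_csse_daily_reports.json?_size=5&_shape=array"
    )
    with urlopen(url) as resp:
        rows = json.load(resp)

    for row in rows:
        print(row.get("day"), row.get("country_or_region"), row.get("province_or_state"))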
