Port datasette to use volumes and make it much faster (#460)
* Fix datasette example and improve logging

* Port to use volumes (much faster)

* Remove tempfile import
ekzhang authored and gongy committed Jan 5, 2024
1 parent 823adff commit d06a879
Showing 1 changed file with 75 additions and 77 deletions.
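
The crux of the port is replacing the `NetworkFileSystem` with a `Volume`, which the commit reports as much faster, but which has stricter semantics: writes become visible to other functions only after an explicit `commit()`, and an already-running container must `reload()` to see newly committed data. That is why `download_dataset()` calls `stub.volume.commit()` and `load_daily_reports()` calls `stub.volume.reload()` in the diff below. A minimal sketch of the pattern, with illustrative names that are not part of this commit:

from modal import Stub, Volume

stub = Stub("volume-pattern-demo")  # hypothetical app name
stub.volume = Volume.persisted("demo-cache-vol")  # hypothetical volume name


@stub.function(volumes={"/cache-vol": stub.volume})
def writer():
    # Write into the mounted volume, then commit so other functions can see it.
    with open("/cache-vol/data.txt", "w") as f:
        f.write("hello")
    stub.volume.commit()


@stub.function(volumes={"/cache-vol": stub.volume})
def reader() -> str:
    # Reload to pick up commits made after this container started.
    stub.volume.reload()
    return open("/cache-vol/data.txt").read()

The diff applies exactly this pattern to the downloaded reports and the generated SQLite database.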
152 changes: 75 additions & 77 deletions 10_integrations/covid_datasette.py
@@ -11,9 +11,9 @@
# Try it out for yourself at [modal-labs-example-covid-datasette-app.modal.run/covid-19](https://modal-labs-example-covid-datasette-app.modal.run/covid-19/johns_hopkins_csse_daily_reports).
#
# Some Modal features it uses:
-# * Network file systems: a persisted volume lets us store and grow the published dataset over time
-# * Scheduled functions: the underlying dataset is refreshed daily, so we schedule a function to run daily
-# * Webhooks: exposes the Datasette application for web browser interaction and API requests.
+# * Volumes: a persisted volume lets us store and grow the published dataset over time.
+# * Scheduled functions: the underlying dataset is refreshed daily, so we schedule a function to run daily.
+# * Web endpoints: exposes the Datasette application for web browser interaction and API requests.
#
# ## Basic setup
#
@@ -24,35 +24,29 @@
import asyncio
import pathlib
import shutil
-import tempfile
-from datetime import datetime, timedelta
+import subprocess
+from datetime import datetime
from urllib.request import urlretrieve

-from modal import Image, NetworkFileSystem, Period, Stub, asgi_app
+from modal import Image, Period, Stub, Volume, asgi_app

stub = Stub("example-covid-datasette")
datasette_image = (
    Image.debian_slim()
-    .pip_install(
-        "datasette~=0.63.2",
-        "flufl.lock",
-        "GitPython",
-        "sqlite-utils",
-    )
-    .apt_install("git")
+    .pip_install("datasette~=0.63.2", "sqlite-utils")
+    .apt_install("unzip")
)

# ## Persistent dataset storage
#
# To separate database creation and maintenance from serving, we'll need the underlying
-# database file to be stored persistently. To achieve this we use a [`NetworkFileSystem`](/docs/guide/shared-volumes),
-# a writable volume that can be attached to Modal functions and persisted across function runs.
+# database file to be stored persistently. To achieve this we use a [`Volume`](/docs/guide/volumes).

-volume = NetworkFileSystem.persisted("covid-dataset-cache-vol")
+stub.volume = Volume.persisted("example-covid-datasette-cache-vol")

-CACHE_DIR = "/cache"
-LOCK_FILE = str(pathlib.Path(CACHE_DIR, "lock-reports"))
-REPO_DIR = pathlib.Path(CACHE_DIR, "COVID-19")
-DB_PATH = pathlib.Path(CACHE_DIR, "covid-19.db")
+VOLUME_DIR = "/cache-vol"
+REPORTS_DIR = pathlib.Path(VOLUME_DIR, "COVID-19")
+DB_PATH = pathlib.Path(VOLUME_DIR, "covid-19.db")

# ## Getting a dataset
#
@@ -65,42 +59,50 @@

@stub.function(
    image=datasette_image,
-    network_file_systems={CACHE_DIR: volume},
+    volumes={VOLUME_DIR: stub.volume},
    retries=2,
)
def download_dataset(cache=True):
-    import git
-    from flufl.lock import Lock

-    if REPO_DIR.exists() and cache:
+    if REPORTS_DIR.exists() and cache:
        print(f"Dataset already present and {cache=}. Skipping download.")
        return
-    elif REPO_DIR.exists():
-        print(
-            "Acquiring lock before deleting dataset, which may be in use by other runs."
-        )
-        with Lock(LOCK_FILE, default_timeout=timedelta(hours=1)):
-            shutil.rmtree(REPO_DIR)
-        print("Cleaned dataset before re-downloading.")
+    elif REPORTS_DIR.exists():
+        print("Cleaning dataset before re-downloading...")
+        shutil.rmtree(REPORTS_DIR)

print("Downloading dataset...")
urlretrieve(
"https://github.com/CSSEGISandData/COVID-19/archive/refs/heads/master.zip",
"/tmp/covid-19.zip",
)

print("Unpacking archive...")
prefix = "COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports"
subprocess.run(
f"unzip /tmp/covid-19.zip {prefix}/* -d {REPORTS_DIR}", shell=True
)
subprocess.run(f"mv {REPORTS_DIR / prefix}/* {REPORTS_DIR}", shell=True)

print("Committing the volume...")
stub.volume.commit()

-    git_url = "https://github.com/CSSEGISandData/COVID-19"
-    git.Repo.clone_from(git_url, REPO_DIR, depth=1)
    print("Finished downloading dataset.")


# ## Data munging
#
# This dataset is no swamp, but a bit of data cleaning is still in order. The following two
-# functions are used to read a handful of `.csv` files from the git repository and cleaning the
-# rows data before inserting into SQLite. You can see that the daily reports are somewhat inconsistent
-# in their column names.
+# functions read a handful of `.csv` files and clean the data, before inserting it into
+# SQLite.


def load_daily_reports():
-    jhu_csse_base = REPO_DIR
-    reports_path = (
-        jhu_csse_base / "csse_covid_19_data" / "csse_covid_19_daily_reports"
-    )
-    daily_reports = list(reports_path.glob("*.csv"))
+    stub.volume.reload()
+    daily_reports = list(REPORTS_DIR.glob("*.csv"))
+    if not daily_reports:
+        raise RuntimeError(
+            f"Could not find any daily reports in {REPORTS_DIR}."
+        )
    for filepath in daily_reports:
        yield from load_report(filepath)

@@ -141,12 +143,12 @@ def load_report(filepath):
# ## Inserting into SQLite
#
# With the CSV processing out of the way, we're ready to create an SQLite DB and feed data into it.
-# Importantly, the `prep_db` function mounts the same network file system used by `download_dataset()`, and
+# Importantly, the `prep_db` function mounts the same volume used by `download_dataset()`, and
# rows are batch inserted with progress logged after each batch, as the full COVID-19 dataset has millions
# of rows and does take some time to be fully inserted.
#
# A more sophisticated implementation would only load new data instead of performing a full refresh,
-# but for this example things are kept simple.
+# but we're keeping things simple for this example!


def chunks(it, size):
@@ -157,44 +159,40 @@ def chunks(it, size):

@stub.function(
    image=datasette_image,
-    network_file_systems={CACHE_DIR: volume},
+    volumes={VOLUME_DIR: stub.volume},
    timeout=900,
)
def prep_db():
    import sqlite_utils
-    from flufl.lock import Lock

    print("Loading daily reports...")
    records = load_daily_reports()

-    with Lock(
-        LOCK_FILE,
-        lifetime=timedelta(minutes=2),
-        default_timeout=timedelta(hours=1),
-    ) as lck, tempfile.NamedTemporaryFile() as tmp:
-        db = sqlite_utils.Database(tmp.name)
-        table = db["johns_hopkins_csse_daily_reports"]
+    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+    db = sqlite_utils.Database(DB_PATH)
+    table = db["johns_hopkins_csse_daily_reports"]

+    batch_size = 100_000
+    for i, batch in enumerate(chunks(records, size=batch_size)):
+        truncate = True if i == 0 else False
+        table.insert_all(batch, batch_size=batch_size, truncate=truncate)
+        print(f"Inserted {len(batch)} rows into DB.")

-        batch_size = 100_000
-        for i, batch in enumerate(chunks(records, size=batch_size)):
-            truncate = True if i == 0 else False
-            table.insert_all(batch, batch_size=batch_size, truncate=truncate)
-            lck.refresh()
-            print(f"Inserted {len(batch)} rows into DB.")
+    table.create_index(["day"], if_not_exists=True)
+    table.create_index(["province_or_state"], if_not_exists=True)
+    table.create_index(["country_or_region"], if_not_exists=True)

table.create_index(["day"], if_not_exists=True)
table.create_index(["province_or_state"], if_not_exists=True)
table.create_index(["country_or_region"], if_not_exists=True)
db.close()

print("Syncing DB with network volume.")
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(tmp.name, DB_PATH)
print("Syncing DB with volume.")
stub.volume.commit()


-# ## Keeping fresh
+# ## Keep it fresh
#
-# Johns Hopkins commits new data to the dataset repository every day, so we
-# setup a [scheduled](/docs/guide/cron) Modal function to run automatically once every 24 hours.
+# Johns Hopkins commits new data to the dataset repository every day, so we set up
+# a [scheduled](/docs/guide/cron) function to automatically refresh the database
+# every 24 hours.


@stub.function(schedule=Period(hours=24), timeout=1000)
@@ -204,22 +202,22 @@ def refresh_db():
    prep_db.remote()


-# ## Webhook
+# ## Web endpoint
#
# Hooking up the SQLite database to a Modal webhook is as simple as it gets.
-# The Modal `@stub.asgi_app` decorator wraps a few lines of code: one `import` and a few
-# lines to instantiate the `Datasette` instance and return a reference to its ASGI app object.
+# The Modal `@asgi_app` decorator wraps a few lines of code: one `import` and a few
+# lines to instantiate the `Datasette` instance and return its app server.


@stub.function(
    image=datasette_image,
-    network_file_systems={CACHE_DIR: volume},
+    volumes={VOLUME_DIR: stub.volume},
)
@asgi_app()
def app():
    from datasette.app import Datasette

-    ds = Datasette(files=[DB_PATH])
+    ds = Datasette(files=[DB_PATH], settings={"sql_time_limit_ms": 10000})
    asyncio.run(ds.invoke_startup())
    return ds.app()

@@ -228,11 +226,11 @@ def app():
#
# Run this script using `modal run covid_datasette.py` and it will create the database.
#
-# You can run this script using `modal serve covid_datasette.py` and it will create a
-# short-lived web URL that exists until you terminate the script.
+# You can then use `modal serve covid_datasette.py` to create a short-lived web URL
+# that exists until you terminate the script.
#
# When publishing the interactive Datasette app you'll want to create a persistent URL.
-# This is achieved by deploying the script with `modal deploy covid_datasette.py`.
+# Just run `modal deploy covid_datasette.py`.


@stub.local_entrypoint()
@@ -243,4 +241,4 @@ def run():
    prep_db.remote()


-# You can go explore the data over at [modal-labs-covid-datasette-app.modal.run/covid-19](https://modal-labs-example-covid-datasette-app.modal.run/covid-19/johns_hopkins_csse_daily_reports).
+# You can explore the data at the [deployed web endpoint](https://modal-labs--example-covid-datasette-app.modal.run/covid-19).
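
Datasette also serves a JSON API alongside the HTML pages, so the deployed endpoint can be queried programmatically. A sketch, assuming the URL above and Datasette's standard `.json` suffix with `_shape`/`_size` query parameters (the column names are taken from the indexes created in `prep_db()`):

import json
from urllib.request import urlopen

# The hostname depends on your Modal workspace; this is the example deployment's URL.
url = (
    "https://modal-labs--example-covid-datasette-app.modal.run"
    "/covid-19/johns_hopkins_csse_daily_reports.json?_shape=array&_size=5"
)
with urlopen(url) as resp:
    rows = json.load(resp)  # with _shape=array, a plain list of row objects

for row in rows:
    print(row["day"], row["country_or_region"])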
