From d06a879a824c8298ac55e0673f967ea4532305f4 Mon Sep 17 00:00:00 2001 From: Eric Zhang Date: Wed, 4 Oct 2023 11:36:56 -0400 Subject: [PATCH] Port datasette to use volumes and make it much faster (#460) * Fix datasette example and improve logging * Port to use volumes (much faster) * Remove tempfile impmort --- 10_integrations/covid_datasette.py | 152 ++++++++++++++--------------- 1 file changed, 75 insertions(+), 77 deletions(-) diff --git a/10_integrations/covid_datasette.py b/10_integrations/covid_datasette.py index e4cc26ebb..08e429bb9 100644 --- a/10_integrations/covid_datasette.py +++ b/10_integrations/covid_datasette.py @@ -11,9 +11,9 @@ # Try it out for yourself at [modal-labs-example-covid-datasette-app.modal.run/covid-19](https://modal-labs-example-covid-datasette-app.modal.run/covid-19/johns_hopkins_csse_daily_reports). # # Some Modal features it uses: -# * Network file systems: a persisted volume lets us store and grow the published dataset over time -# * Scheduled functions: the underlying dataset is refreshed daily, so we schedule a function to run daily -# * Webhooks: exposes the Datasette application for web browser interaction and API requests. +# * Volumes: a persisted volume lets us store and grow the published dataset over time. +# * Scheduled functions: the underlying dataset is refreshed daily, so we schedule a function to run daily. +# * Web endpoints: exposes the Datasette application for web browser interaction and API requests. # # ## Basic setup # @@ -24,35 +24,29 @@ import asyncio import pathlib import shutil -import tempfile -from datetime import datetime, timedelta +import subprocess +from datetime import datetime +from urllib.request import urlretrieve -from modal import Image, NetworkFileSystem, Period, Stub, asgi_app +from modal import Image, Period, Stub, Volume, asgi_app stub = Stub("example-covid-datasette") datasette_image = ( Image.debian_slim() - .pip_install( - "datasette~=0.63.2", - "flufl.lock", - "GitPython", - "sqlite-utils", - ) - .apt_install("git") + .pip_install("datasette~=0.63.2", "sqlite-utils") + .apt_install("unzip") ) # ## Persistent dataset storage # # To separate database creation and maintenance from serving, we'll need the underlying -# database file to be stored persistently. To achieve this we use a [`NetworkFileSystem`](/docs/guide/shared-volumes), -# a writable volume that can be attached to Modal functions and persisted across function runs. +# database file to be stored persistently. To achieve this we use a [`Volume`](/docs/guide/volumes). -volume = NetworkFileSystem.persisted("covid-dataset-cache-vol") +stub.volume = Volume.persisted("example-covid-datasette-cache-vol") -CACHE_DIR = "/cache" -LOCK_FILE = str(pathlib.Path(CACHE_DIR, "lock-reports")) -REPO_DIR = pathlib.Path(CACHE_DIR, "COVID-19") -DB_PATH = pathlib.Path(CACHE_DIR, "covid-19.db") +VOLUME_DIR = "/cache-vol" +REPORTS_DIR = pathlib.Path(VOLUME_DIR, "COVID-19") +DB_PATH = pathlib.Path(VOLUME_DIR, "covid-19.db") # ## Getting a dataset # @@ -65,42 +59,50 @@ @stub.function( image=datasette_image, - network_file_systems={CACHE_DIR: volume}, + volumes={VOLUME_DIR: stub.volume}, retries=2, ) def download_dataset(cache=True): - import git - from flufl.lock import Lock - - if REPO_DIR.exists() and cache: + if REPORTS_DIR.exists() and cache: print(f"Dataset already present and {cache=}. Skipping download.") return - elif REPO_DIR.exists(): - print( - "Acquiring lock before deleting dataset, which may be in use by other runs." - ) - with Lock(LOCK_FILE, default_timeout=timedelta(hours=1)): - shutil.rmtree(REPO_DIR) - print("Cleaned dataset before re-downloading.") + elif REPORTS_DIR.exists(): + print("Cleaning dataset before re-downloading...") + shutil.rmtree(REPORTS_DIR) + + print("Downloading dataset...") + urlretrieve( + "https://github.com/CSSEGISandData/COVID-19/archive/refs/heads/master.zip", + "/tmp/covid-19.zip", + ) + + print("Unpacking archive...") + prefix = "COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports" + subprocess.run( + f"unzip /tmp/covid-19.zip {prefix}/* -d {REPORTS_DIR}", shell=True + ) + subprocess.run(f"mv {REPORTS_DIR / prefix}/* {REPORTS_DIR}", shell=True) + + print("Committing the volume...") + stub.volume.commit() - git_url = "https://github.com/CSSEGISandData/COVID-19" - git.Repo.clone_from(git_url, REPO_DIR, depth=1) + print("Finished downloading dataset.") # ## Data munging # # This dataset is no swamp, but a bit of data cleaning is still in order. The following two -# functions are used to read a handful of `.csv` files from the git repository and cleaning the -# rows data before inserting into SQLite. You can see that the daily reports are somewhat inconsistent -# in their column names. +# functions read a handful of `.csv` files and clean the data, before inserting it into +# SQLite. def load_daily_reports(): - jhu_csse_base = REPO_DIR - reports_path = ( - jhu_csse_base / "csse_covid_19_data" / "csse_covid_19_daily_reports" - ) - daily_reports = list(reports_path.glob("*.csv")) + stub.volume.reload() + daily_reports = list(REPORTS_DIR.glob("*.csv")) + if not daily_reports: + raise RuntimeError( + f"Could not find any daily reports in {REPORTS_DIR}." + ) for filepath in daily_reports: yield from load_report(filepath) @@ -141,12 +143,12 @@ def load_report(filepath): # ## Inserting into SQLite # # With the CSV processing out of the way, we're ready to create an SQLite DB and feed data into it. -# Importantly, the `prep_db` function mounts the same network file system used by `download_dataset()`, and +# Importantly, the `prep_db` function mounts the same volume used by `download_dataset()`, and # rows are batch inserted with progress logged after each batch, as the full COVID-19 has millions # of rows and does take some time to be fully inserted. # # A more sophisticated implementation would only load new data instead of performing a full refresh, -# but for this example things are kept simple. +# but we're keeping things simple for this example! def chunks(it, size): @@ -157,44 +159,40 @@ def chunks(it, size): @stub.function( image=datasette_image, - network_file_systems={CACHE_DIR: volume}, + volumes={VOLUME_DIR: stub.volume}, timeout=900, ) def prep_db(): import sqlite_utils - from flufl.lock import Lock print("Loading daily reports...") records = load_daily_reports() - with Lock( - LOCK_FILE, - lifetime=timedelta(minutes=2), - default_timeout=timedelta(hours=1), - ) as lck, tempfile.NamedTemporaryFile() as tmp: - db = sqlite_utils.Database(tmp.name) - table = db["johns_hopkins_csse_daily_reports"] + DB_PATH.parent.mkdir(parents=True, exist_ok=True) + db = sqlite_utils.Database(DB_PATH) + table = db["johns_hopkins_csse_daily_reports"] + + batch_size = 100_000 + for i, batch in enumerate(chunks(records, size=batch_size)): + truncate = True if i == 0 else False + table.insert_all(batch, batch_size=batch_size, truncate=truncate) + print(f"Inserted {len(batch)} rows into DB.") - batch_size = 100_000 - for i, batch in enumerate(chunks(records, size=batch_size)): - truncate = True if i == 0 else False - table.insert_all(batch, batch_size=batch_size, truncate=truncate) - lck.refresh() - print(f"Inserted {len(batch)} rows into DB.") + table.create_index(["day"], if_not_exists=True) + table.create_index(["province_or_state"], if_not_exists=True) + table.create_index(["country_or_region"], if_not_exists=True) - table.create_index(["day"], if_not_exists=True) - table.create_index(["province_or_state"], if_not_exists=True) - table.create_index(["country_or_region"], if_not_exists=True) + db.close() - print("Syncing DB with network volume.") - DB_PATH.parent.mkdir(parents=True, exist_ok=True) - shutil.copyfile(tmp.name, DB_PATH) + print("Syncing DB with volume.") + stub.volume.commit() -# ## Keeping fresh +# ## Keep it fresh # -# Johns Hopkins commits new data to the dataset repository every day, so we -# setup a [scheduled](/docs/guide/cron) Modal function to run automatically once every 24 hours. +# Johns Hopkins commits new data to the dataset repository every day, so we set up +# a [scheduled](/docs/guide/cron) function to automatically refresh the database +# every 24 hours. @stub.function(schedule=Period(hours=24), timeout=1000) @@ -204,22 +202,22 @@ def refresh_db(): prep_db.remote() -# ## Webhook +# ## Web endpoint # # Hooking up the SQLite database to a Modal webhook is as simple as it gets. -# The Modal `@stub.asgi_app` decorator wraps a few lines of code: one `import` and a few -# lines to instantiate the `Datasette` instance and return a reference to its ASGI app object. +# The Modal `@asgi_app` decorator wraps a few lines of code: one `import` and a few +# lines to instantiate the `Datasette` instance and return its app server. @stub.function( image=datasette_image, - network_file_systems={CACHE_DIR: volume}, + volumes={VOLUME_DIR: stub.volume}, ) @asgi_app() def app(): from datasette.app import Datasette - ds = Datasette(files=[DB_PATH]) + ds = Datasette(files=[DB_PATH], settings={"sql_time_limit_ms": 10000}) asyncio.run(ds.invoke_startup()) return ds.app() @@ -228,11 +226,11 @@ def app(): # # Run this script using `modal run covid_datasette.py` and it will create the database. # -# You can run this script using `modal serve covid_datasette.py` and it will create a -# short-lived web URL that exists until you terminate the script. +# You can then use `modal serve covid_datasette.py` to create a short-lived web URL +# that exists until you terminate the script. # # When publishing the interactive Datasette app you'll want to create a persistent URL. -# This is achieved by deploying the script with `modal deploy covid_datasette.py`. +# Just run `modal deploy covid_datasette.py`. @stub.local_entrypoint() @@ -243,4 +241,4 @@ def run(): prep_db.remote() -# You can go explore the data over at [modal-labs-covid-datasette-app.modal.run/covid-19/](https://modal-labs-example-covid-datasette-app.modal.run/covid-19/johns_hopkins_csse_daily_reports). +# You can explore the data at the [deployed web endpoint](https://modal-labs--example-covid-datasette-app.modal.run/covid-19).