
feat: Improve flights.* dataset reproducibility #645

Merged
merged 29 commits into from Dec 20, 2024

Changes from 1 commit (29 commits in total)
ad1b862
feat(DRAFT): Improve `flights.*` dataset reproducibility
dangotbanned Dec 10, 2024
fb3ccc6
build(DRAFT): Generate ISO datetime comparison
dangotbanned Dec 11, 2024
2b1be70
refactor(ruff): Adjust config and fix warnings
dangotbanned Dec 12, 2024
aede7f6
feat(perf): Async requests, use `gzip` instead of `.zip`
dangotbanned Dec 12, 2024
402c2b0
refactor: Reorganize, add `_write_rezip_async` doc
dangotbanned Dec 15, 2024
b57c02f
docs(DRAFT): Improve docs
dangotbanned Dec 15, 2024
efa417c
Merge remote-tracking branch 'upstream/main' into flights-repro
dangotbanned Dec 15, 2024
f646155
refactor: Tidy up, improve doc for `Flights.download_sources`
dangotbanned Dec 16, 2024
bfff9c4
docs: Add/amend some simple docs
dangotbanned Dec 16, 2024
0a19bae
refactor: move `"flights-"` to `Spec._name_prefix`
dangotbanned Dec 16, 2024
4a05b51
docs: fill out more docs
dangotbanned Dec 16, 2024
49205dd
fix: replace `app` with `self`
dangotbanned Dec 16, 2024
3418c5a
refactor: Replace `DateTimeFormat`, `DTF_TO_FMT`, `_transform_temporal`
dangotbanned Dec 16, 2024
0c11ec4
refactor: move global scoped code into `main`
dangotbanned Dec 16, 2024
c668fd7
feat(perf): Store `.parquet` instead of `.csv.gz`
dangotbanned Dec 17, 2024
a002c39
refactor: rename, re-doc `_clean_source` -> `SourceMap.clean`
dangotbanned Dec 17, 2024
2d47fdd
docs: finish `DateRange` doc
dangotbanned Dec 17, 2024
99169b6
docs: add `Flights.scan_sources` doc
dangotbanned Dec 17, 2024
7c5eed4
docs: finish `Flights` doc
dangotbanned Dec 17, 2024
859975e
refactor: reorganize, finish docs for `SourceMap`
dangotbanned Dec 18, 2024
f4bbda8
docs: add module-level doc
dangotbanned Dec 18, 2024
951fe8c
refactor(typing): extend `DateTimeFormat` to include `None`
dangotbanned Dec 18, 2024
a48eb8f
refactor: remove unused `PlScanCsv`
dangotbanned Dec 18, 2024
8481618
Merge branch 'main' into flights-repro
dangotbanned Dec 18, 2024
8ec3adf
feat: improves `Rows` validation
dangotbanned Dec 19, 2024
1962fea
chore: replace `flights.py`
dangotbanned Dec 19, 2024
05707d9
chore: remove `flights.js`
dangotbanned Dec 19, 2024
7c49683
fix: regen with fixed random seed
dangotbanned Dec 19, 2024
cd68193
revert: remove `flights-1k.csv`
dangotbanned Dec 19, 2024
110 changes: 67 additions & 43 deletions scripts/flights2.py
@@ -9,7 +9,6 @@

import asyncio
import datetime as dt
import gzip
import io
import logging
import tomllib
@@ -183,8 +182,8 @@ def is_datetime_format(s: Any) -> TypeIs[DateTimeFormat]:
"On_Time_Reporting_Carrier_On_Time_Performance_1987_present_"
)
ZIP: Literal[".zip"] = ".zip"
GZIP: Literal[".csv.gz"] = ".csv.gz"
PATTERN_GZIP: LiteralString = f"*{REPORTING_PREFIX}*{GZIP}"
PARQUET: Literal[".parquet"] = ".parquet"
PATTERN_PARQUET: LiteralString = f"*{REPORTING_PREFIX}*{PARQUET}"

COLUMNS_DEFAULT: Sequence[Column] = (
"date",
@@ -304,6 +303,9 @@ def file_stems(self) -> Sequence[str]:
.to_list()
)

def paths(self, input_dir: Path, /) -> list[Path]:
return [input_dir / f"{stem}{PARQUET}" for stem in self.file_stems]

def __eq__(self, other: Any, /) -> bool:
"""Two ``DateRange``s are equivalent if they would require the same files."""
return isinstance(other, DateRange) and self.file_stems == other.file_stems
@@ -481,10 +483,20 @@ def __init__(self, input_dir: Path, /) -> None:
self._frames: dict[DateRange, pl.LazyFrame] = {}

def add_dependency(self, spec: Spec, /) -> None:
"""Adds a spec, detecting any shared resources."""
"""
Adds a spec, detecting and loading any shared resources.

Required files for each unique ``DateRange`` are lazily read into a single table.

Parameters
----------
spec
Describes a target output file.
"""
d_range: DateRange = spec.range
if d_range not in self._mapping:
self._frames[d_range] = self._scan(d_range).pipe(_clean_source)
paths = d_range.paths(self.input_dir)
self._frames[d_range] = pl.scan_parquet(paths).pipe(_clean_source)
self._mapping[d_range].append(spec)

def iter_tasks(self) -> Iterator[tuple[Spec, pl.LazyFrame]]:
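
For reviewers, a rough sketch of what `add_dependency` amounts to: the monthly files for a range become a single `pl.scan_parquet` call, and any later spec whose range compares equal (same file stems) reuses that lazy frame instead of re-scanning. Everything below (`clean`, the stem strings, the dict-keyed cache) is an illustrative stand-in, not this module's actual API:

```python
import tempfile
from pathlib import Path

import polars as pl


def clean(ldf: pl.LazyFrame) -> pl.LazyFrame:
    return ldf  # stand-in for the real `_clean_source`


input_dir = Path(tempfile.mkdtemp())
for stem in ("prefix_2001_1", "prefix_2001_2"):
    pl.DataFrame({"delay": [1, 2]}).write_parquet(input_dir / f"{stem}.parquet")

frames: dict[tuple[str, ...], pl.LazyFrame] = {}


def add_dependency(stems: tuple[str, ...]) -> pl.LazyFrame:
    # One lazy scan per unique set of monthly files; later specs sharing
    # the same stems reuse the cached frame instead of re-scanning.
    if stems not in frames:
        paths = [input_dir / f"{stem}.parquet" for stem in stems]
        frames[stems] = pl.scan_parquet(paths).pipe(clean)
    return frames[stems]


first = add_dependency(("prefix_2001_1", "prefix_2001_2"))
second = add_dependency(("prefix_2001_1", "prefix_2001_2"))
assert first is second  # shared resource detected, scanned only once
```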
@@ -499,27 +511,6 @@ def iter_tasks(self) -> Iterator[tuple[Spec, pl.LazyFrame]]:
for spec in self._mapping[d_range]:
yield spec, frame

def _scan(self, d_range: DateRange, /) -> pl.LazyFrame:
"""
Lazily read all required files into a single table.

Parameters
----------
d_range
Target time period, spanning multiple monthly files.

Notes
-----
- Only the subset of columns defined in ``SCAN_SCHEMA`` are preserved.
- Some of the unused columns contain invalid utf8 values.
"""
return pl.scan_csv(
[self.input_dir / f"{stem}{GZIP}" for stem in d_range.file_stems],
try_parse_dates=True,
schema_overrides=SCAN_SCHEMA,
encoding="utf8-lossy",
).select(SCAN_SCHEMA.names())

def __len__(self) -> int:
return len(self._frames)

@@ -533,7 +524,7 @@ class Flights:
specs
Target dataset definitions.
input_dir
Directory to store zip files.
Directory to store monthly input files.
output_dir
Directory to write realised specs to.

@@ -608,7 +599,8 @@ def _required_stems(self) -> set[str]:

@property
def _existing_stems(self) -> set[str]:
return {_without_suffixes(fp.name) for fp in self.input_dir.glob(PATTERN_GZIP)}
it = self.input_dir.glob(PATTERN_PARQUET)
return {_without_suffixes(fp.name) for fp in it}

@property
def missing_stems(self) -> set[str]:
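
The existing/missing split above boils down to a glob plus a set difference; a minimal sketch, with the suffix-stripping helper and the glob pattern reduced to naive stand-ins for `_without_suffixes` and `PATTERN_PARQUET`:

```python
from pathlib import Path


def without_suffixes(name: str) -> str:
    # naive stand-in: drop everything after the first "."
    return name.split(".", 1)[0]


def missing_stems(input_dir: Path, required: set[str]) -> set[str]:
    # Stems already materialised as monthly parquet files on disk...
    existing = {without_suffixes(fp.name) for fp in input_dir.glob("*.parquet")}
    # ...versus stems the specs still need downloaded.
    return required - existing
```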
@@ -627,7 +619,7 @@ async def _download_sources_async(self, names: Iterable[str], /) -> list[Path]:
session = niquests.AsyncSession(base_url=ROUTE_ZIP)
aws = (_request_async(session, name) for name in names)
buffers = await asyncio.gather(*aws)
writes = (_write_rezip_async(self.input_dir, buf) for buf in buffers)
writes = (_write_zip_to_parquet_async(self.input_dir, buf) for buf in buffers)
return await asyncio.gather(*writes)

def download_sources(self) -> None:
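
Shape of the download path above, reduced to a runnable toy: all requests are awaited concurrently, then every resulting buffer is handed to the writer. `fetch` and `write_async` are stand-ins for `_request_async` and `_write_zip_to_parquet_async`, with sleeps in place of real network and disk work:

```python
import asyncio
import io
from pathlib import Path


async def fetch(name: str) -> io.BytesIO:
    await asyncio.sleep(0.1)  # stand-in for the HTTP GET of one monthly .zip
    return io.BytesIO(name.encode())


async def write_async(input_dir: Path, buf: io.BytesIO) -> Path:
    await asyncio.sleep(0.1)  # stand-in for the zip -> parquet re-encode
    return input_dir / f"{buf.getvalue().decode()}.parquet"


async def download_sources_async(input_dir: Path, names: list[str]) -> list[Path]:
    # First all requests in flight at once, then all writes.
    buffers = await asyncio.gather(*(fetch(n) for n in names))
    writes = (write_async(input_dir, b) for b in buffers)
    return await asyncio.gather(*writes)


print(asyncio.run(download_sources_async(Path("input"), ["1987_10", "1987_11"])))
```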
@@ -679,35 +671,67 @@ async def _request_async(session: niquests.AsyncSession, name: str, /) -> io.BytesIO:
raise NotImplementedError(msg)


def _write_rezip(input_dir: Path, buf: io.BytesIO, /) -> Path:
def _write_zip_to_parquet(input_dir: Path, buf: io.BytesIO, /) -> Path:
"""
Extract inner csv from a zip file, writing to a gzipped csv of the same name.
Extract inner ``.csv`` from ``.zip``, write to ``.parquet`` of the same name.

Parameters
----------
input_dir
Directory to store monthly input files.
buf
Buffer containing the zipped response.

Notes
-----
- ``.read_bytes()`` is the only expensive op here
- End result (gzip, single file) can be scanned in parallel by ``polars``
- And slightly smaller than zipped directory
- We pay the *decompress*->*compress* cost only **once** per-download
- Only the subset of columns defined in ``SCAN_SCHEMA`` are preserved
- Further reduces file size
- Also, some unused columns contain invalid utf8 values

Original file:

On_Time_Reporting_Carrier_On_Time_Performance_1987_present_YYYY_M.zip
├──On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_YYYY_M.csv
└──readme.html

Result file:

On_Time_Reporting_Carrier_On_Time_Performance_1987_present_YYYY_M.parquet

Size comparison:

| format | min (KB) | max (KB) |
| -------- | -------- | --------- |
| .parquet | 1_800 | 3_000 |
| .zip | 15_000 | 30_000 |
| .csv | 200_000 | 250_000 |
"""
zip_csv = next(zipfile.Path(zipfile.ZipFile(buf)).glob("*.csv"))
stem = zip_csv.at.replace("(", "").replace(")", "")
gzipped: Path = (input_dir / stem).with_suffix(".csv.gz")
gzipped.touch()
msg = f"Writing {gzipped.as_posix()!r}"
output = (input_dir / stem).with_suffix(".parquet")
output.touch()
msg = f"Writing {output.as_posix()!r}"
logger.debug(msg)
with gzip.GzipFile(gzipped, mode="wb", mtime=0) as f:
f.write(zip_csv.read_bytes())
return gzipped
with zip_csv.open("rb") as strm:
ldf = pl.scan_csv(
strm,
try_parse_dates=True,
schema_overrides=SCAN_SCHEMA,
encoding="utf8-lossy",
).select(SCAN_SCHEMA.names())
ldf.collect().write_parquet(output, compression="zstd", compression_level=17)
return output

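
For anyone reading the new write path outside the diff, the whole re-encode fits in a few lines of standalone polars. The column pruning via `SCAN_SCHEMA` is omitted here and the file naming is simplified; this is a sketch of the technique, not the module's exact implementation:

```python
import io
import zipfile
from pathlib import Path

import polars as pl


def zip_csv_to_parquet(buf: io.BytesIO, out_dir: Path) -> Path:
    """Re-encode the single csv inside a BTS-style zip as zstd parquet."""
    with zipfile.ZipFile(buf) as zf:
        # The real archives hold one csv plus a readme.html
        csv_name = next(n for n in zf.namelist() if n.endswith(".csv"))
        raw = zf.read(csv_name)
    stem = Path(csv_name).stem.replace("(", "").replace(")", "")
    out = out_dir / f"{stem}.parquet"
    df = pl.read_csv(raw, try_parse_dates=True, encoding="utf8-lossy")
    # High-level zstd: pay compression once per download, read cheaply ever after
    df.write_parquet(out, compression="zstd", compression_level=17)
    return out


# Tiny in-memory stand-in for a downloaded archive
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("Performance_(1987_present)_1987_10.csv", "FlightDate,DepDelay\n1987-10-01,5\n")
print(zip_csv_to_parquet(buf, Path(".")))
```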

async def _write_rezip_async(input_dir: Path, buf: io.BytesIO, /) -> Path:
async def _write_zip_to_parquet_async(input_dir: Path, buf: io.BytesIO, /) -> Path:
"""
Wraps ``_write_rezip`` to run in a separate thread.
Wraps ``_write_zip_to_parquet`` to run in a separate thread.

- **Greatly** reduces the cost of the decompress > compress operations
- During testing, each write would block for ~10s
"""
return await asyncio.to_thread(_write_rezip, input_dir, buf)
return await asyncio.to_thread(_write_zip_to_parquet, input_dir, buf)


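
The `asyncio.to_thread` wrapper is what keeps those ~10s blocking writes from serialising; a minimal standalone illustration, with a short sleep standing in for one re-encode:

```python
import asyncio
import time


def blocking_write(n: int) -> int:
    time.sleep(0.5)  # stand-in for one ~10s zip -> parquet re-encode
    return n


async def main() -> None:
    start = time.perf_counter()
    results = await asyncio.gather(*(asyncio.to_thread(blocking_write, i) for i in range(4)))
    # ~0.5s wall time for four 0.5s writes; calling blocking_write inline
    # would take ~2s and stall any in-flight downloads on the event loop.
    print(results, f"{time.perf_counter() - start:.2f}s")


asyncio.run(main())
```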
def _file_stem_source[T: (str, pl.Expr)](year: T, month: T, /) -> pl.Expr: