Skip to content

Commit

Permalink
refactor: reorganize, finish docs for SourceMap
Browse files Browse the repository at this point in the history
- move `Flights.scan_sources` -> `SourceMap.from_specs`
- rename `SourceMap.add_dependency` -> `SourceMap.add_spec`

#645 (comment)
  • Loading branch information
dangotbanned committed Dec 18, 2024
1 parent 7c5eed4 commit 859975e
Showing 1 changed file with 43 additions and 30 deletions.
73 changes: 43 additions & 30 deletions scripts/flights2.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,16 +477,46 @@ def _transform_temporal(self, ldf: pl.LazyFrame, /) -> pl.LazyFrame:


class SourceMap:
"""Handles resource sharing and reading."""
"""
Group specs by common data, scanning a `pl.LazyFrame`_ per-group.
Parameters
----------
input_dir
Directory containing monthly input files.
.. _pl.LazyFrame:
https://docs.pola.rs/api/python/stable/reference/lazyframe/index.html
"""

def __init__(self, input_dir: Path, /) -> None:
self.input_dir: Path = input_dir
self._mapping = defaultdict[DateRange, deque[Spec]](deque)
self._frames: dict[DateRange, pl.LazyFrame] = {}

def add_dependency(self, spec: Spec, /) -> None:
@classmethod
def from_specs(cls, specs: Iterable[Spec], input_dir: Path, /) -> SourceMap:
"""
Construct with all dependent data grouped and loaded.
Parameters
----------
specs
Target dataset definitions.
input_dir
Directory containing monthly input files.
"""
obj = cls(input_dir)
logger.info("Scanning dependencies ...")
for spec in specs:
obj.add_spec(spec)
msg = f"Finished scanning {len(obj)!r} date ranges."
logger.info(msg)
return obj

def add_spec(self, spec: Spec, /) -> None:
"""
Adds a spec, detecting and loading any shared resources.
Adds a spec dependency, detecting and loading any shared resources.
Required files for each unique ``DateRange`` are lazily read into a single table.
Expand All @@ -506,7 +536,7 @@ def iter_tasks(self) -> Iterator[tuple[Spec, pl.LazyFrame]]:
if not len(self):
msg = (
"Dependent specs have not yet been added.\n\n"
f"Try calling {self.add_dependency.__qualname__}(...) first."
f"Try calling {self.add_spec.__qualname__}(...) first."
)
raise TypeError(msg)
for d_range, frame in self._frames.items():
Expand Down Expand Up @@ -644,19 +674,19 @@ class Flights:
>>> decl.run() # doctest: +SKIP
"""

input_dir: Path
output_dir: Path
specs: Sequence[Spec]
sources: SourceMap

def __init__(
self,
specs: Sequence[Spec],
input_dir: str | Path,
output_dir: str | Path,
self, specs: Sequence[Spec], input_dir: str | Path, output_dir: str | Path
) -> None:
self.input_dir = Path(input_dir)
self.output_dir = Path(output_dir)
self.input_dir.mkdir(exist_ok=True)
self.output_dir.mkdir(exist_ok=True)

self.specs: Sequence[Spec] = specs
self.sources: SourceMap = SourceMap(self.input_dir)
self.specs = specs

@classmethod
def from_toml(
Expand Down Expand Up @@ -745,29 +775,12 @@ def download_sources(self) -> None:
else:
logger.info("Sources already downloaded.")

def scan_sources(self) -> SourceMap:
"""
Group specs by common data, scanning a `pl.LazyFrame`_ per-group.
See Also
--------
``SourceMap``
.. _pl.LazyFrame:
https://docs.pola.rs/api/python/stable/reference/lazyframe/index.html
"""
logger.info("Scanning dependencies ...")
for spec in self:
self.sources.add_dependency(spec)
msg = f"Finished scanning {len(self.sources)!r} date ranges."
logger.info(msg)
return self.sources

def run(self) -> None:
"""Top-level command providing fully managed data collection, transformation and export."""
logger.info("Starting job ...")
self.download_sources()
for spec, frame in self.scan_sources().iter_tasks():
self.sources = SourceMap.from_specs(self, self.input_dir)
for spec, frame in self.sources.iter_tasks():
result = spec.transform(frame)
spec.write(result, self.output_dir)
logger.info("Finished job.")
Expand Down

0 comments on commit 859975e

Please sign in to comment.