diff --git a/scripts/flights2.py b/scripts/flights2.py index 5f27b33..2b83c76 100644 --- a/scripts/flights2.py +++ b/scripts/flights2.py @@ -477,16 +477,46 @@ def _transform_temporal(self, ldf: pl.LazyFrame, /) -> pl.LazyFrame: class SourceMap: - """Handles resource sharing and reading.""" + """ + Group specs by common data, scanning a `pl.LazyFrame`_ per-group. + + Parameters + ---------- + input_dir + Directory containing monthly input files. + + .. _pl.LazyFrame: + https://docs.pola.rs/api/python/stable/reference/lazyframe/index.html + """ def __init__(self, input_dir: Path, /) -> None: self.input_dir: Path = input_dir self._mapping = defaultdict[DateRange, deque[Spec]](deque) self._frames: dict[DateRange, pl.LazyFrame] = {} - def add_dependency(self, spec: Spec, /) -> None: + @classmethod + def from_specs(cls, specs: Iterable[Spec], input_dir: Path, /) -> SourceMap: + """ + Construct with all dependent data grouped and loaded. + + Parameters + ---------- + specs + Target dataset definitions. + input_dir + Directory containing monthly input files. + """ + obj = cls(input_dir) + logger.info("Scanning dependencies ...") + for spec in specs: + obj.add_spec(spec) + msg = f"Finished scanning {len(obj)!r} date ranges." + logger.info(msg) + return obj + + def add_spec(self, spec: Spec, /) -> None: """ - Adds a spec, detecting and loading any shared resources. + Adds a spec dependency, detecting and loading any shared resources. Required files for each unique ``DateRange`` are lazily read into a single table. @@ -506,7 +536,7 @@ def iter_tasks(self) -> Iterator[tuple[Spec, pl.LazyFrame]]: if not len(self): msg = ( "Dependent specs have not yet been added.\n\n" - f"Try calling {self.add_dependency.__qualname__}(...) first." + f"Try calling {self.add_spec.__qualname__}(...) first." ) raise TypeError(msg) for d_range, frame in self._frames.items(): @@ -644,19 +674,19 @@ class Flights: >>> decl.run() # doctest: +SKIP """ + input_dir: Path + output_dir: Path + specs: Sequence[Spec] + sources: SourceMap + def __init__( - self, - specs: Sequence[Spec], - input_dir: str | Path, - output_dir: str | Path, + self, specs: Sequence[Spec], input_dir: str | Path, output_dir: str | Path ) -> None: self.input_dir = Path(input_dir) self.output_dir = Path(output_dir) self.input_dir.mkdir(exist_ok=True) self.output_dir.mkdir(exist_ok=True) - - self.specs: Sequence[Spec] = specs - self.sources: SourceMap = SourceMap(self.input_dir) + self.specs = specs @classmethod def from_toml( @@ -745,29 +775,12 @@ def download_sources(self) -> None: else: logger.info("Sources already downloaded.") - def scan_sources(self) -> SourceMap: - """ - Group specs by common data, scanning a `pl.LazyFrame`_ per-group. - - See Also - -------- - ``SourceMap`` - - .. _pl.LazyFrame: - https://docs.pola.rs/api/python/stable/reference/lazyframe/index.html - """ - logger.info("Scanning dependencies ...") - for spec in self: - self.sources.add_dependency(spec) - msg = f"Finished scanning {len(self.sources)!r} date ranges." - logger.info(msg) - return self.sources - def run(self) -> None: """Top-level command providing fully managed data collection, transformation and export.""" logger.info("Starting job ...") self.download_sources() - for spec, frame in self.scan_sources().iter_tasks(): + self.sources = SourceMap.from_specs(self, self.input_dir) + for spec, frame in self.sources.iter_tasks(): result = spec.transform(frame) spec.write(result, self.output_dir) logger.info("Finished job.")