From 7383318d6b4499180218860bae6a2d6e975cdd99 Mon Sep 17 00:00:00 2001 From: Reuben Frankel Date: Thu, 26 Oct 2023 15:41:58 +0100 Subject: [PATCH] Add all streams --- tap_f1/client.py | 124 +---------- tap_f1/pagination.py | 12 + tap_f1/streams.py | 515 +++++++++++++++++++++++++++++++++++++++---- tap_f1/tap.py | 59 ++--- 4 files changed, 513 insertions(+), 197 deletions(-) create mode 100644 tap_f1/pagination.py diff --git a/tap_f1/client.py b/tap_f1/client.py index a887ef5..3e7fd85 100644 --- a/tap_f1/client.py +++ b/tap_f1/client.py @@ -1,128 +1,24 @@ """REST client handling, including F1Stream base class.""" -from __future__ import annotations - -from pathlib import Path -from typing import Any, Callable, Iterable - -import requests -from singer_sdk.helpers.jsonpath import extract_jsonpath -from singer_sdk.pagination import BaseAPIPaginator # noqa: TCH002 from singer_sdk.streams import RESTStream -_Auth = Callable[[requests.PreparedRequest], requests.PreparedRequest] -SCHEMAS_DIR = Path(__file__).parent / Path("./schemas") +from tap_f1.pagination import F1Paginator class F1Stream(RESTStream): """F1 stream class.""" - @property - def url_base(self) -> str: - """Return the API URL root, configurable via tap settings.""" - # TODO: hardcode a value here, or retrieve it from self.config - return "https://api.mysample.com" - - records_jsonpath = "$[*]" # Or override `parse_response`. - - # Set this value or override `get_new_paginator`. - next_page_token_jsonpath = "$.next_page" # noqa: S105 - - @property - def http_headers(self) -> dict: - """Return the http headers needed. - - Returns: - A dictionary of HTTP headers. - """ - headers = {} - if "user_agent" in self.config: - headers["User-Agent"] = self.config.get("user_agent") - # If not using an authenticator, you may also provide inline auth headers: - # headers["Private-Token"] = self.config.get("auth_token") # noqa: ERA001 - return headers + url_base = "https://ergast.com/api/f1" + _limit = 1000 - def get_new_paginator(self) -> BaseAPIPaginator: - """Create a new pagination helper instance. + def get_new_paginator(self): + return F1Paginator(0, self._limit) - If the source API can make use of the `next_page_token_jsonpath` - attribute, or it contains a `X-Next-Page` header in the response - then you can remove this method. + def get_url_params(self, context, next_page_token): + params = super().get_url_params(context, next_page_token) + params["limit"] = self._limit - If you need custom pagination that uses page numbers, "next" links, or - other approaches, please read the guide: https://sdk.meltano.com/en/v0.25.0/guides/pagination-classes.html. - - Returns: - A pagination helper instance. - """ - return super().get_new_paginator() - - def get_url_params( - self, - context: dict | None, # noqa: ARG002 - next_page_token: Any | None, # noqa: ANN401 - ) -> dict[str, Any]: - """Return a dictionary of values to be used in URL parameterization. - - Args: - context: The stream context. - next_page_token: The next page index or value. - - Returns: - A dictionary of URL query parameters. - """ - params: dict = {} if next_page_token: - params["page"] = next_page_token - if self.replication_key: - params["sort"] = "asc" - params["order_by"] = self.replication_key - return params + params["offset"] = next_page_token - def prepare_request_payload( - self, - context: dict | None, # noqa: ARG002 - next_page_token: Any | None, # noqa: ARG002, ANN401 - ) -> dict | None: - """Prepare the data payload for the REST API request. - - By default, no payload will be sent (return None). - - Args: - context: The stream context. - next_page_token: The next page index or value. - - Returns: - A dictionary with the JSON body for a POST requests. - """ - # TODO: Delete this method if no payload is required. (Most REST APIs.) - return None - - def parse_response(self, response: requests.Response) -> Iterable[dict]: - """Parse the response and return an iterator of result records. - - Args: - response: The HTTP ``requests.Response`` object. - - Yields: - Each record from the source. - """ - # TODO: Parse response body and return a set of records. - yield from extract_jsonpath(self.records_jsonpath, input=response.json()) - - def post_process( - self, - row: dict, - context: dict | None = None, # noqa: ARG002 - ) -> dict | None: - """As needed, append or transform raw data to match expected structure. - - Args: - row: An individual record from the stream. - context: The stream context. - - Returns: - The updated record dictionary, or ``None`` to skip the record. - """ - # TODO: Delete this method if not needed. - return row + return params diff --git a/tap_f1/pagination.py b/tap_f1/pagination.py new file mode 100644 index 0000000..bb65d31 --- /dev/null +++ b/tap_f1/pagination.py @@ -0,0 +1,12 @@ +from singer_sdk.pagination import BaseOffsetPaginator + + +class F1Paginator(BaseOffsetPaginator): + def has_more(self, response): + data = response.json()["MRData"] + + limit = int(data["limit"]) + offset = int(data["offset"]) + total = int(data["total"]) + + return (limit + offset) < total diff --git a/tap_f1/streams.py b/tap_f1/streams.py index 30e9e7b..b9be90a 100644 --- a/tap_f1/streams.py +++ b/tap_f1/streams.py @@ -1,66 +1,499 @@ """Stream type classes for tap-f1.""" -from __future__ import annotations -import typing as t -from pathlib import Path - -from singer_sdk import typing as th # JSON Schema typing helpers +from singer_sdk import typing as th from tap_f1.client import F1Stream -# TODO: Delete this is if not using json files for schema definition -SCHEMAS_DIR = Path(__file__).parent / Path("./schemas") -# TODO: - Override `UsersStream` and `GroupsStream` with your own stream definition. -# - Copy-paste as many times as needed to create multiple stream types. + +class SpeedUnitType(th.StringType): + @th.DefaultInstanceProperty + def type_dict(self): + return { + **super().type_dict, + "enum": [ + "kph", + "mph", + ], + } + + +class SeasonsStream(F1Stream): + """Define seasons stream.""" + + name = "seasons" + primary_keys = ["season"] + path = "/seasons.json" + records_jsonpath = "MRData.SeasonTable.Seasons[*]" + + schema = th.PropertiesList( + th.Property("season", th.StringType), + th.Property("url", th.URIType), + ).to_dict() + + def get_child_context(self, record, context): + return {"season": record["season"]} -class UsersStream(F1Stream): - """Define custom stream.""" +class CircuitsStream(F1Stream): + """Define circuits stream.""" + + parent_stream_type = SeasonsStream + name = "circuits" + primary_keys = ["circuitId"] + path = "/{season}/circuits.json" + records_jsonpath = "MRData.CircuitTable.Circuits[*]" - name = "users" - path = "/users" - primary_keys: t.ClassVar[list[str]] = ["id"] - replication_key = None - # Optionally, you may also use `schema_filepath` in place of `schema`: - # schema_filepath = SCHEMAS_DIR / "users.json" # noqa: ERA001 schema = th.PropertiesList( + th.Property("circuitId", th.StringType), + th.Property("url", th.URIType), + th.Property("circuitName", th.StringType), + th.Property( + "Location", + th.ObjectType( + th.Property("lat", th.StringType), + th.Property("long", th.StringType), + th.Property("locality", th.StringType), + th.Property("country", th.StringType), + ), + ), + ).to_dict() + + +class DriversStream(F1Stream): + """Define drivers stream.""" + + parent_stream_type = SeasonsStream + context_key = "Driver" + name = "drivers" + primary_keys = ["driverId"] + path = "/{season}/drivers.json" + records_jsonpath = "MRData.DriverTable.Drivers[*]" + + schema = th.PropertiesList( + th.Property("driverId", th.StringType), + th.Property("permanentNumber", th.StringType), + th.Property("code", th.StringType), + th.Property("url", th.URIType), + th.Property("givenName", th.StringType), + th.Property("familyName", th.StringType), + th.Property("dateOfBirth", th.DateType), + th.Property("nationality", th.StringType), + ).to_dict() + + +class ConstructorsStream(F1Stream): + """Define constructors stream.""" + + parent_stream_type = SeasonsStream + context_key = "Constructor" + name = "constructors" + primary_keys = ["constructorId"] + path = "/{season}/constructors.json" + records_jsonpath = "MRData.ConstructorTable.Constructors[*]" + + schema = th.PropertiesList( + th.Property("constructorId", th.StringType), + th.Property("url", th.URIType), th.Property("name", th.StringType), + th.Property("nationality", th.StringType), + ).to_dict() + + +class RacesStream(F1Stream): + """Define races stream.""" + + parent_stream_type = SeasonsStream + name = "races" + primary_keys = ["season", "round"] + path = "/{season}.json" + records_jsonpath = "MRData.RaceTable.Races[*]" + + schema = th.PropertiesList( + th.Property("season", th.StringType), + th.Property("round", th.StringType), + th.Property("url", th.URIType), + th.Property("raceName", th.StringType), + th.Property( + "Circuit", + th.ObjectType( + th.Property("circuitId", th.StringType), + th.Property("url", th.URIType), + th.Property("circuitName", th.StringType), + th.Property( + "Location", + th.ObjectType( + th.Property("lat", th.StringType), + th.Property("long", th.StringType), + th.Property("locality", th.StringType), + th.Property("country", th.StringType), + ), + ), + ), + ), + th.Property("date", th.DateType), + th.Property("time", th.StringType), + th.Property( + "FirstPractice", + th.ObjectType( + th.Property("date", th.DateType), + th.Property("time", th.StringType), + ), + ), th.Property( - "id", - th.StringType, - description="The user's system ID", + "SecondPractice", + th.ObjectType( + th.Property("date", th.DateType), + th.Property("time", th.StringType), + ), ), th.Property( - "age", - th.IntegerType, - description="The user's age in years", + "ThirdPractice", + th.ObjectType( + th.Property("date", th.DateType), + th.Property("time", th.StringType), + ), ), th.Property( - "email", - th.StringType, - description="The user's email address", + "Qualifying", + th.ObjectType( + th.Property("date", th.DateType), + th.Property("time", th.StringType), + ), ), - th.Property("street", th.StringType), - th.Property("city", th.StringType), th.Property( - "state", - th.StringType, - description="State name in ISO 3166-2 format", + "Sprint", + th.ObjectType( + th.Property("date", th.DateType), + th.Property("time", th.StringType), + ), ), - th.Property("zip", th.StringType), ).to_dict() + def get_child_context(self, record, context): + return { + "season": record["season"], + "round": record["round"], + } -class GroupsStream(F1Stream): - """Define custom stream.""" - name = "groups" - path = "/groups" - primary_keys: t.ClassVar[list[str]] = ["id"] - replication_key = "modified" +class QualifyingResultsStream(F1Stream): + """Define qualifying results stream.""" + + parent_stream_type = RacesStream + name = "qualifying_results" + primary_keys = ["season", "round", "number"] + path = "/{season}/{round}/qualifying.json" + records_jsonpath = "MRData.RaceTable.Races[*].QualifyingResults[*]" + schema = th.PropertiesList( - th.Property("name", th.StringType), - th.Property("id", th.StringType), - th.Property("modified", th.DateTimeType), + th.Property("season", th.StringType), + th.Property("round", th.StringType), + th.Property("number", th.StringType), + th.Property("position", th.StringType), + th.Property( + "Driver", + th.ObjectType( + th.Property("driverId", th.StringType), + th.Property("permanentNumber", th.StringType), + th.Property("code", th.StringType), + th.Property("url", th.URIType), + th.Property("givenName", th.StringType), + th.Property("familyName", th.StringType), + th.Property("dateOfBirth", th.DateType), + th.Property("nationality", th.StringType), + ), + ), + th.Property( + "Constructor", + th.ObjectType( + th.Property("constructorId", th.StringType), + th.Property("url", th.URIType), + th.Property("name", th.StringType), + th.Property("nationality", th.StringType), + ), + ), + th.Property("Q1", th.StringType), + th.Property("Q2", th.StringType), + th.Property("Q3", th.StringType), + ).to_dict() + + +class SprintResultsStream(F1Stream): + """Define sprint results stream.""" + + parent_stream_type = RacesStream + name = "sprints_results" + primary_keys = ["season", "round", "number"] + path = "/{season}/{round}/sprint.json" + records_jsonpath = "MRData.RaceTable.Races[*].SprintResults[*]" + + schema = th.PropertiesList( + th.Property("season", th.StringType), + th.Property("round", th.StringType), + th.Property("number", th.StringType), + th.Property("position", th.StringType), + th.Property("positionText", th.StringType), + th.Property("points", th.StringType), + th.Property( + "Driver", + th.ObjectType( + th.Property("driverId", th.StringType), + th.Property("permanentNumber", th.StringType), + th.Property("code", th.StringType), + th.Property("url", th.URIType), + th.Property("givenName", th.StringType), + th.Property("familyName", th.StringType), + th.Property("dateOfBirth", th.DateType), + th.Property("nationality", th.StringType), + ), + ), + th.Property( + "Constructor", + th.ObjectType( + th.Property("constructorId", th.StringType), + th.Property("url", th.URIType), + th.Property("name", th.StringType), + th.Property("nationality", th.StringType), + ), + ), + th.Property("grid", th.StringType), + th.Property("laps", th.StringType), + th.Property("status", th.StringType), + th.Property( + "Time", + th.ObjectType( + th.Property("millis", th.StringType), + th.Property("time", th.StringType), + ), + ), + th.Property( + "FastestLap", + th.ObjectType( + th.Property("rank", th.StringType), + th.Property("lap", th.StringType), + th.Property( + "Time", + th.ObjectType( + th.Property("time", th.StringType), + ), + ), + th.Property( + "AverageSpeed", + th.ObjectType( + th.Property("units", SpeedUnitType), + th.Property("speed", th.StringType), + ), + ), + ), + ), ).to_dict() + + +class RaceResultsStream(F1Stream): + """Define race results stream.""" + + parent_stream_type = RacesStream + name = "race_results" + primary_keys = ["season", "round", "number"] + path = "/{season}/{round}/results.json" + records_jsonpath = "MRData.RaceTable.Races[*].Results[*]" + + schema = th.PropertiesList( + th.Property("season", th.StringType), + th.Property("round", th.StringType), + th.Property("number", th.StringType), + th.Property("position", th.StringType), + th.Property("positionText", th.StringType), + th.Property("points", th.StringType), + th.Property( + "Driver", + th.ObjectType( + th.Property("driverId", th.StringType), + th.Property("permanentNumber", th.StringType), + th.Property("code", th.StringType), + th.Property("url", th.URIType), + th.Property("givenName", th.StringType), + th.Property("familyName", th.StringType), + th.Property("dateOfBirth", th.DateType), + th.Property("nationality", th.StringType), + ), + ), + th.Property( + "Constructor", + th.ObjectType( + th.Property("constructorId", th.StringType), + th.Property("url", th.URIType), + th.Property("name", th.StringType), + th.Property("nationality", th.StringType), + ), + ), + th.Property("grid", th.StringType), + th.Property("laps", th.StringType), + th.Property("status", th.StringType), + th.Property( + "Time", + th.ObjectType( + th.Property("millis", th.StringType), + th.Property("time", th.StringType), + ), + ), + th.Property( + "FastestLap", + th.ObjectType( + th.Property("rank", th.StringType), + th.Property("lap", th.StringType), + th.Property( + "Time", + th.ObjectType( + th.Property("time", th.StringType), + ), + ), + th.Property( + "AverageSpeed", + th.ObjectType( + th.Property("units", SpeedUnitType), + th.Property("speed", th.StringType), + ), + ), + ), + ), + ).to_dict() + + def get_child_context(self, record, context): + return { + **super().get_child_context(record, context), + "driverId": record["Driver"]["driverId"], + } + + +class LapsStream(F1Stream): + """Define laps stream.""" + + parent_stream_type = RaceResultsStream + name = "laps" + primary_keys = ["season", "round", "driverId", "number"] + path = "/{season}/{round}/drivers/{driverId}/laps.json" + records_jsonpath = "MRData.RaceTable.Races[*].Laps[*]" + + schema = th.PropertiesList( + th.Property("season", th.StringType), + th.Property("round", th.StringType), + th.Property("driverId", th.StringType), + th.Property("number", th.StringType), + th.Property( + "Timings", + th.ArrayType( + th.ObjectType( + th.Property("driverId", th.StringType), + th.Property("position", th.StringType), + th.Property("time", th.StringType), + ) + ), + ), + ).to_dict() + + +class PitStopsStream(F1Stream): + """Define pit stops stream.""" + + parent_stream_type = RacesStream + name = "pit_stops" + primary_keys = ["season", "round", "driverId", "stop"] + path = "/{season}/{round}/pitstops.json" + records_jsonpath = "MRData.RaceTable.Races[*].PitStops[*]" + + schema = th.PropertiesList( + th.Property("season", th.StringType), + th.Property("round", th.StringType), + th.Property("driverId", th.StringType), + th.Property("lap", th.StringType), + th.Property("stop", th.StringType), + th.Property("time", th.TimeType), + th.Property("duration", th.StringType), + ).to_dict() + + +class DriverStandingsStream(F1Stream): + """Define driver standings stream.""" + + parent_stream_type = RacesStream + name = "driver_standings" + primary_keys = ["season", "round", "driverId", "position"] + path = "/{season}/{round}/driverStandings.json" + records_jsonpath = "MRData.StandingsTable.StandingsLists[*].DriverStandings[*]" + + schema = th.PropertiesList( + th.Property("season", th.StringType), + th.Property("round", th.StringType), + th.Property("driverId", th.StringType), + th.Property("position", th.StringType), + th.Property("positionText", th.StringType), + th.Property("points", th.StringType), + th.Property("wins", th.StringType), + th.Property( + "Driver", + th.ObjectType( + th.Property("driverId", th.StringType), + th.Property("permanentNumber", th.StringType), + th.Property("code", th.StringType), + th.Property("url", th.URIType), + th.Property("givenName", th.StringType), + th.Property("familyName", th.StringType), + th.Property("dateOfBirth", th.DateType), + th.Property("nationality", th.StringType), + ), + ), + th.Property( + "Constructors", + th.ArrayType( + th.ObjectType( + th.Property("constructorId", th.StringType), + th.Property("url", th.URIType), + th.Property("name", th.StringType), + th.Property("nationality", th.StringType), + ) + ), + ), + ).to_dict() + + def post_process(self, row, context): + # driverId forms part of primary key + row["driverId"] = row["Driver"]["driverId"] + + return row + + +class ConstructorStandingsStream(F1Stream): + """Define constructor standings stream.""" + + parent_stream_type = RacesStream + name = "constructor_standings" + primary_keys = ["season", "round", "constructorId", "position"] + path = "/{season}/{round}/constructorStandings.json" + records_jsonpath = "MRData.StandingsTable.StandingsLists[*].ConstructorStandings[*]" + + schema = th.PropertiesList( + th.Property("season", th.StringType), + th.Property("round", th.StringType), + th.Property("constructorId", th.StringType), + th.Property("position", th.StringType), + th.Property("positionText", th.StringType), + th.Property("points", th.StringType), + th.Property("wins", th.StringType), + th.Property( + "Constructor", + th.ObjectType( + th.Property("constructorId", th.StringType), + th.Property("url", th.URIType), + th.Property("name", th.StringType), + th.Property("nationality", th.StringType), + ), + ), + ).to_dict() + + def post_process(self, row, context): + # constructorId forms part of primary key + row["constructorId"] = row["Constructor"]["constructorId"] + + return row diff --git a/tap_f1/tap.py b/tap_f1/tap.py index 2e072d1..c77f4c5 100644 --- a/tap_f1/tap.py +++ b/tap_f1/tap.py @@ -1,57 +1,32 @@ """F1 tap class.""" -from __future__ import annotations - from singer_sdk import Tap -from singer_sdk import typing as th # JSON schema typing helpers -# TODO: Import your custom stream types here: from tap_f1 import streams +STREAM_TYPES = [ + streams.SeasonsStream, + streams.RacesStream, + streams.CircuitsStream, + streams.QualifyingResultsStream, + streams.SprintResultsStream, + streams.RaceResultsStream, + streams.DriversStream, + streams.ConstructorsStream, + streams.LapsStream, + streams.PitStopsStream, + streams.DriverStandingsStream, + streams.ConstructorStandingsStream, +] + class TapF1(Tap): """F1 tap class.""" name = "tap-f1" - # TODO: Update this section with the actual config values you expect: - config_jsonschema = th.PropertiesList( - th.Property( - "auth_token", - th.StringType, - required=True, - secret=True, # Flag config as protected. - description="The token to authenticate against the API service", - ), - th.Property( - "project_ids", - th.ArrayType(th.StringType), - required=True, - description="Project IDs to replicate", - ), - th.Property( - "start_date", - th.DateTimeType, - description="The earliest record date to sync", - ), - th.Property( - "api_url", - th.StringType, - default="https://api.mysample.com", - description="The url for the API service", - ), - ).to_dict() - - def discover_streams(self) -> list[streams.F1Stream]: - """Return a list of discovered streams. - - Returns: - A list of discovered streams. - """ - return [ - streams.GroupsStream(self), - streams.UsersStream(self), - ] + def discover_streams(self): + return [stream_type(self) for stream_type in STREAM_TYPES] if __name__ == "__main__":