Skip to content

Commit

Permalink
Add all streams
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenFrankel committed Nov 24, 2023
1 parent 0f3c20d commit 7383318
Show file tree
Hide file tree
Showing 4 changed files with 513 additions and 197 deletions.
124 changes: 10 additions & 114 deletions tap_f1/client.py
Original file line number Diff line number Diff line change
@@ -1,128 +1,24 @@
"""REST client handling, including F1Stream base class."""

from __future__ import annotations

from pathlib import Path
from typing import Any, Callable, Iterable

import requests
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import BaseAPIPaginator # noqa: TCH002
from singer_sdk.streams import RESTStream

_Auth = Callable[[requests.PreparedRequest], requests.PreparedRequest]
SCHEMAS_DIR = Path(__file__).parent / Path("./schemas")
from tap_f1.pagination import F1Paginator


class F1Stream(RESTStream):
"""F1 stream class."""

@property
def url_base(self) -> str:
"""Return the API URL root, configurable via tap settings."""
# TODO: hardcode a value here, or retrieve it from self.config
return "https://api.mysample.com"

records_jsonpath = "$[*]" # Or override `parse_response`.

# Set this value or override `get_new_paginator`.
next_page_token_jsonpath = "$.next_page" # noqa: S105

@property
def http_headers(self) -> dict:
"""Return the http headers needed.
Returns:
A dictionary of HTTP headers.
"""
headers = {}
if "user_agent" in self.config:
headers["User-Agent"] = self.config.get("user_agent")
# If not using an authenticator, you may also provide inline auth headers:
# headers["Private-Token"] = self.config.get("auth_token") # noqa: ERA001
return headers
url_base = "https://ergast.com/api/f1"
_limit = 1000

def get_new_paginator(self) -> BaseAPIPaginator:
"""Create a new pagination helper instance.
def get_new_paginator(self):
return F1Paginator(0, self._limit)

If the source API can make use of the `next_page_token_jsonpath`
attribute, or it contains a `X-Next-Page` header in the response
then you can remove this method.
def get_url_params(self, context, next_page_token):
params = super().get_url_params(context, next_page_token)
params["limit"] = self._limit

If you need custom pagination that uses page numbers, "next" links, or
other approaches, please read the guide: https://sdk.meltano.com/en/v0.25.0/guides/pagination-classes.html.
Returns:
A pagination helper instance.
"""
return super().get_new_paginator()

def get_url_params(
self,
context: dict | None, # noqa: ARG002
next_page_token: Any | None, # noqa: ANN401
) -> dict[str, Any]:
"""Return a dictionary of values to be used in URL parameterization.
Args:
context: The stream context.
next_page_token: The next page index or value.
Returns:
A dictionary of URL query parameters.
"""
params: dict = {}
if next_page_token:
params["page"] = next_page_token
if self.replication_key:
params["sort"] = "asc"
params["order_by"] = self.replication_key
return params
params["offset"] = next_page_token

def prepare_request_payload(
self,
context: dict | None, # noqa: ARG002
next_page_token: Any | None, # noqa: ARG002, ANN401
) -> dict | None:
"""Prepare the data payload for the REST API request.
By default, no payload will be sent (return None).
Args:
context: The stream context.
next_page_token: The next page index or value.
Returns:
A dictionary with the JSON body for a POST requests.
"""
# TODO: Delete this method if no payload is required. (Most REST APIs.)
return None

def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result records.
Args:
response: The HTTP ``requests.Response`` object.
Yields:
Each record from the source.
"""
# TODO: Parse response body and return a set of records.
yield from extract_jsonpath(self.records_jsonpath, input=response.json())

def post_process(
self,
row: dict,
context: dict | None = None, # noqa: ARG002
) -> dict | None:
"""As needed, append or transform raw data to match expected structure.
Args:
row: An individual record from the stream.
context: The stream context.
Returns:
The updated record dictionary, or ``None`` to skip the record.
"""
# TODO: Delete this method if not needed.
return row
return params
12 changes: 12 additions & 0 deletions tap_f1/pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from singer_sdk.pagination import BaseOffsetPaginator


class F1Paginator(BaseOffsetPaginator):
def has_more(self, response):
data = response.json()["MRData"]

limit = int(data["limit"])
offset = int(data["offset"])
total = int(data["total"])

return (limit + offset) < total
Loading

0 comments on commit 7383318

Please sign in to comment.