forked from hotgluexyz/tap-canvas-catalog
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.py
134 lines (104 loc) · 3.92 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""REST client handling, including CanvasCatalogStream base class."""
from __future__ import annotations
from pathlib import Path
from typing import Any, Callable, Iterable
import requests
import dateutil.parser
import backoff
from singer_sdk.authenticators import APIKeyAuthenticator
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.streams import RESTStream
# Type alias for a callable that takes a ``requests.PreparedRequest`` and
# returns one (the call shape of a ``requests`` auth hook).
_Auth = Callable[[requests.PreparedRequest], requests.PreparedRequest]
# Directory named "schemas" next to this module; presumably holds the
# streams' schema files -- confirm against the stream definitions.
SCHEMAS_DIR = Path(__file__).parent.joinpath("schemas")
class CanvasCatalogStream(RESTStream):
    """CanvasCatalog stream class.

    Shared base for Canvas Catalog REST streams. It supplies the configurable
    base URL, ``Authorization: Token token="..."`` authentication,
    page-number pagination and retry back-off settings.
    """

    # Each element of the top-level JSON array in a response is one record.
    # Subclasses may override this (or ``parse_response``) for other shapes.
    records_jsonpath = "$[*]"

    @property
    def url_base(self) -> str:
        """Return the API URL root, configurable via tap settings.

        Returns:
            The ``api_url`` value from the tap configuration.

        Raises:
            ValueError: If ``api_url`` is missing or empty.
        """
        api_url = self.config.get("api_url", "")
        if not api_url:
            raise ValueError("api_url is not set in the configuration")
        return api_url

    @property
    def authenticator(self) -> APIKeyAuthenticator:
        """Return a new authenticator object.

        Requests carry an ``Authorization`` header whose value is
        ``Token token="<api_key>"``.

        Returns:
            An authenticator instance.

        Raises:
            ValueError: If ``api_key`` is missing or empty.
        """
        api_key = self.config.get("api_key", "")
        if not api_key:
            raise ValueError("api_key is not set in the configuration")
        return APIKeyAuthenticator.create_for_stream(
            self,
            key="Authorization",
            value=f'Token token="{api_key}"',
            location="header",
        )

    @property
    def http_headers(self) -> dict:
        """Return the http headers needed.

        Only ``User-Agent`` is set, and only when ``user_agent`` is present
        in the configuration. Authentication headers come from
        ``authenticator``.

        Returns:
            A dictionary of HTTP headers.
        """
        headers = {}
        if "user_agent" in self.config:
            headers["User-Agent"] = self.config.get("user_agent")
        return headers

    def get_next_page_token(
        self,
        response: requests.Response,
        previous_token: Any | None,
    ) -> Any | None:
        """Return a token for identifying next page or None if no more pages.

        Pagination is by page number. The first request is sent without a
        page token (``previous_token`` is ``None``), so it counts as page 1.
        While a page yields at least one record, the next page number is
        returned. The first page with no records ends pagination.

        Args:
            response: The HTTP ``requests.Response`` object.
            previous_token: The previous page token value.

        Returns:
            The next page number, or ``None`` if the response held no records.
        """
        current_page = previous_token or 1
        matches = extract_jsonpath(self.records_jsonpath, response.json())
        # We only need to know whether *any* record matched, so stop at the
        # first one instead of copying the whole page into a list.
        has_records = any(True for _ in matches)
        return current_page + 1 if has_records else None

    def get_url_params(
        self,
        context: dict | None,
        next_page_token: Any | None,
    ) -> dict[str, Any]:
        """Return a dictionary of values to be used in URL parameterization.

        Args:
            context: The stream context (not used here).
            next_page_token: The next page index or value.

        Returns:
            A dictionary of URL query parameters. It holds ``page`` only when
            a truthy page token is given, so the first request omits it.
        """
        params: dict = {}
        if next_page_token:
            params["page"] = next_page_token
        return params

    def backoff_wait_generator(self):
        """Return the wait generator used between request retries.

        ``backoff.expo(base=2, factor=5)`` produces waits of
        ``5 * 2 ** n`` seconds (5, 10, 20, 40, ...). The retry decorator may
        randomize these further with jitter.
        """
        return backoff.expo(base=2, factor=5)

    def backoff_max_tries(self) -> int:
        """Return the maximum number of request attempts before giving up."""
        return 7

    def post_process(self, row: dict, context: dict | None = None) -> dict | None:
        """As needed, append or transform raw data to match expected structure.

        This base class passes every record through unchanged and never
        drops one. Subclasses may override it to transform or filter rows.

        Args:
            row: An individual record from the stream.
            context: The stream context.

        Returns:
            The updated record dictionary, or ``None`` to skip the record.
        """
        return row