-
Notifications
You must be signed in to change notification settings - Fork 70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Safely ignore certain API response codes #1754
Conversation
Codecov Report
@@ Coverage Diff @@
## main #1754 +/- ##
==========================================
- Coverage 86.42% 86.28% -0.15%
==========================================
Files 59 59
Lines 4996 5008 +12
Branches 816 819 +3
==========================================
+ Hits 4318 4321 +3
- Misses 479 486 +7
- Partials 199 201 +2
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
@edgarrmondragon , wondering if I should just call it |
Yeah, that makes sense 👍 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @rawwar! Thanks for your continued contributions 😄
Naming
I feel like the purpose of this feature is to allow users to skip a stream or partition if it's safe to do so. With that in mind, what do you think naming this something like EndOfStreamError
?
That feels closer to how Python iterators use the StopIteration
exception to signal they're consumed.
Iterator example
class Iterator:
def __init__(self, initial, max=10):
self.current = initial
self.max = max
def __iter__(self):
return self
def __next__(self):
if self.current < self.max:
self.current += 1
return self.current
raise StopIteration
for i in Iterator(1):
print(i)
Make it agnostic
The status code feels too specific to REST streams, so wdyt about building the error message with the status code instead of passsing it as an argument.
Skip partitions too?
The current implementation doesn't allow developers to skip a partition. This use case has come up a few times when a child stream partition should not be synced based on the parent record (#1597).
Testing
With the above consideration in mind, we could use a snapshot test to validate that the partition/stream is skipped:
Unit test
import io
from contextlib import redirect_stdout
import pytest
from freezegun import freeze_time
from singer_sdk.exceptions import IgnorableException
from singer_sdk.streams.core import Stream
from singer_sdk.tap_base import Tap
@freeze_time("2022-01-01T00:00:00Z")
@pytest.mark.snapshot()
def test_end_of_stream_error(snapshot):
"""Test end of stream error is raised."""
snapshot_name = "end_of_stream_error.jsonl"
class CustomTap(Tap):
name = "custom_tap"
def discover_streams(self):
return [StreamWithPartitions(self)]
class StreamWithPartitions(Stream):
name = "partitions"
partitions = [
{"partition": "x"},
{"partition": "y"},
{"partition": "z"},
]
schema = {
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
"partition": {"type": "string"},
},
}
def get_records(self, context):
partition = context["partition"]
if partition == "y":
msg = "End of stream"
raise IgnorableException(msg)
yield {
"id": 1,
"name": "foo",
"partition": partition,
}
yield {
"id": 2,
"name": "bar",
"partition": partition,
}
tap = CustomTap(config=None)
buf = io.StringIO()
with redirect_stdout(buf):
tap.sync_all()
buf.seek(0)
snapshot.assert_match(buf.read(), snapshot_name)
@edgarrmondragon , After thinking about what you mentioned, here are few doubts I have..
If we make this generic, how will developer consider ignoring only specific types of error codes? For example, If I choose to ignore 5xx errors and not any 4xx, Do I depend on catching the exception and checking its status code or some instance variable to decide how to move forward?
Sure. But, is it better to have levels of Exceptions or one general exception? Like, Having a base |
@rawwar No, the tap developer would rather control when the exception is raised. Rather than catching it and deciding what do with it, the SDK handles and logs the exception then continues to the next stream or partition. A user may want to do the following mapping:
The RESTStream class knows to retry the first and raise the second. With the changes here, the stream and tap would know to handle the third one gracefully so the program doesn't crash.
Yeah, we could totally do that 👍 |
Implements #1689
📚 Documentation preview 📚: https://meltano-sdk--1754.org.readthedocs.build/en/1754/