Release Buildflow 0.3.1 #335

Merged: 3 commits, Dec 21, 2023
2 changes: 1 addition & 1 deletion buildflow/core/app/flow.py
@@ -935,7 +935,7 @@
print("Confirmation disabled.")
if progress is not None:
progress.start()
apply_task = progress.add_task("Destryoing resources...", total=1)
apply_task = progress.add_task("Destroying resources...", total=1)

Codecov warning: added line buildflow/core/app/flow.py#L938 is not covered by tests.
await self._get_infra_actor().destroy(pulumi_program=self._pulumi_program)
if progress is not None:
progress.advance(apply_task)
1 change: 0 additions & 1 deletion buildflow/core/types/duckdb_types.py
@@ -2,4 +2,3 @@

DuckDBDatabase = str
DuckDBTableID = str
MotherDuckToken = str
2 changes: 2 additions & 0 deletions buildflow/io/aws/s3_file_change_stream.py
@@ -25,6 +25,7 @@ class S3FileChangeStream(AWSPrimtive):
event_types: Iterable[S3ChangeStreamEventType] = (
S3ChangeStreamEventType.OBJECT_CREATED_ALL,
)
filter_test_events: bool = True

# The sqs queue is always managed by the S3FileChangeStream and
# is setup in __post_init__ based on the bucket configuration.
@@ -53,6 +54,7 @@ def source(self, credentials: AWSCredentials) -> SourceStrategy:
credentials=credentials,
sqs_source=self.sqs_queue.source(credentials),
aws_region=self.s3_bucket.aws_region,
filter_test_events=self.filter_test_events,
)

def pulumi_resources(
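For reference, a minimal sketch of how the new filter_test_events flag might be used when declaring the primitive. The import paths, S3Bucket constructor, and bucket name below are illustrative assumptions, not part of this diff:

# Hypothetical usage sketch -- import paths and names are assumptions.
from buildflow.io.aws.s3 import S3Bucket
from buildflow.io.aws.s3_file_change_stream import S3FileChangeStream

# With the default (filter_test_events=True), the one-time s3:TestEvent
# notification is acked and dropped. Set it to False to receive that event
# as a normal payload with no file_path.
stream = S3FileChangeStream(
    s3_bucket=S3Bucket(bucket_name="my-example-bucket"),
    filter_test_events=False,
)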
@@ -16,13 +16,15 @@ def __init__(
credentials: AWSCredentials,
sqs_source: SQSSource,
aws_region: str,
filter_test_events: bool = True,
):
super().__init__(
credentials=credentials, strategy_id="aws-s3-filestream-source"
)
self.sqs_queue_source = sqs_source
aws_clients = AWSClients(credentials=credentials, region=aws_region)
self._s3_client = aws_clients.s3_client()
self._filter_test_events = filter_test_events

async def pull(self) -> PullResponse:
sqs_response = await self.sqs_queue_source.pull()
@@ -44,8 +46,13 @@ async def pull(self) -> PullResponse:
)
)
else:
# This can happen for the initial test notification that is sent
# on the queue.
if metadata.get("Event") == "s3:TestEvent" and self._filter_test_events:
# Filter out test events from S3 if requested; we ack the event so it
# won't be delivered again.
if len(sqs_response.payload) == 1:
await self.sqs_queue_source.ack(sqs_response.ack_info, True)
continue
# Otherwise, we don't know what this is, so we'll just pass it along
parsed_payloads.append(
S3FileChangeEvent(
s3_client=self._s3_client,
@@ -28,7 +28,7 @@ def setUp(self) -> None:
self.sqs_client = boto3.client("sqs", region_name=self.region)
self.creds = AWSCredentials(CredentialsOptions.default())

async def test_s3_file_change_stream_test_event(self):
async def test_s3_file_change_stream_test_event_keep(self):
with mock_sqs():
with mock_sts():
self.queue_url = self._create_queue(self.queue_name, self.region)
@@ -69,7 +69,10 @@ async def test_s3_file_change_stream_test_event(self):
aws_account_id=None,
)
s3_stream = S3FileChangeStreamSource(
sqs_source=source, aws_region=self.region, credentials=self.creds
sqs_source=source,
aws_region=self.region,
credentials=self.creds,
filter_test_events=False,
)

backlog = await s3_stream.backlog()
@@ -84,6 +87,53 @@ async def test_s3_file_change_stream_test_event(self):
self.assertEqual(payload.metadata, contents)
self.assertEqual(payload.file_path, None)

async def test_s3_file_change_stream_test_event_filter(self):
with mock_sqs():
with mock_sts():
self.queue_url = self._create_queue(self.queue_name, self.region)
sink = SQSSink(
credentials=self.creds,
queue_name=self.queue_name,
aws_region=self.region,
aws_account_id=None,
)
await sink.push(
[
json.dumps(
{
"Service": "Amazon S3",
"Event": "s3:TestEvent",
"Time": "2023-07-28T16:34:05.246Z",
"Bucket": "test-bucket",
"RequestId": "request-id",
"HostId": "host-id",
}
)
]
)

source = SQSSource(
credentials=self.creds,
queue_name=self.queue_name,
aws_region=self.region,
aws_account_id=None,
)
s3_stream = S3FileChangeStreamSource(
sqs_source=source,
aws_region=self.region,
credentials=self.creds,
filter_test_events=True,
)

backlog = await s3_stream.backlog()
self.assertEqual(backlog, 1)

pull_response = await s3_stream.pull()
self.assertEqual(len(pull_response.payload), 0)

backlog = await s3_stream.backlog()
self.assertEqual(backlog, 0)

async def test_s3_file_change_stream_create(self):
with mock_sts():
with mock_sqs():
@@ -113,9 +163,23 @@ async def test_s3_file_change_stream_create(self):
},
"eventName": "ObjectRemoved:Delete",
},
]
],
}
await sink.push([json.dumps(contents)])
await sink.push(
[
json.dumps(contents),
json.dumps(
{
"Service": "Amazon S3",
"Event": "s3:TestEvent",
"Time": "2023-07-28T16:34:05.246Z",
"Bucket": "test-bucket",
"RequestId": "request-id",
"HostId": "host-id",
}
),
]
)

source = SQSSource(
credentials=self.creds,
Expand All @@ -128,7 +192,7 @@ async def test_s3_file_change_stream_create(self):
)

backlog = await s3_stream.backlog()
self.assertEqual(backlog, 1)
self.assertEqual(backlog, 2)

pull_response = await s3_stream.pull()
self.assertEqual(len(pull_response.payload), 2)
@@ -149,6 +213,9 @@ async def test_s3_file_change_stream_create(self):
self.assertEqual(delete.metadata, contents["Records"][1])
self.assertEqual(delete.file_path, want_delete_path)

backlog = await s3_stream.backlog()
self.assertEqual(backlog, 0)


if __name__ == "__main__":
unittest.main()
21 changes: 11 additions & 10 deletions buildflow/io/duckdb/duckdb.py
@@ -1,14 +1,10 @@
import dataclasses
import logging
import os
from typing import Optional

from buildflow.config.cloud_provider_config import LocalOptions
from buildflow.core.credentials.empty_credentials import EmptyCredentials
from buildflow.core.types.duckdb_types import (
DuckDBDatabase,
DuckDBTableID,
MotherDuckToken,
)
from buildflow.core.types.duckdb_types import DuckDBDatabase, DuckDBTableID
from buildflow.io.duckdb.strategies.duckdb_strategies import DuckDBSink
from buildflow.io.primitive import LocalPrimtive

@@ -17,16 +13,21 @@
class DuckDBTable(LocalPrimtive):
database: DuckDBDatabase
table: DuckDBTableID
motherduck_token: Optional[MotherDuckToken] = None

def __post_init__(self):
if not self.database.startswith("md:") and not self.database.startswith("/"):
self.database = os.path.join(os.getcwd(), self.database)
if self.motherduck_token is not None:
self.database = f"{self.database}?{self.motherduck_token}"
if self.database.startswith("md:") and "MOTHERDUCK_TOKEN" not in os.environ:
logging.warning(

Codecov warning: added line buildflow/io/duckdb/duckdb.py#L21 is not covered by tests.
"MOTHERDUCK_TOKEN not found in environment variables, "
"but you are writing to a database starting with `md:`."
)

def primitive_id(self):
return f"{self.database}:{self.table}"
db = self.database
if "?" in self.database:
db = self.database.split("?")[0]

Codecov warning: added line buildflow/io/duckdb/duckdb.py#L29 is not covered by tests.
return f"{db}:{self.table}"

@classmethod
def from_local_options(
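Rough sketch of the new MotherDuck usage after this change: the token is read from the MOTHERDUCK_TOKEN environment variable instead of being passed to the primitive. The database/table names and placeholder token are illustrative; the import uses the module path shown in this diff:

# Sketch only -- database/table names and the token value are placeholders.
import os

from buildflow.io.duckdb.duckdb import DuckDBTable

# Previously: DuckDBTable(database="md:my_db", table="events", motherduck_token=...)
# Now: export MOTHERDUCK_TOKEN; a warning is logged if it is missing and the
# database string starts with "md:".
os.environ["MOTHERDUCK_TOKEN"] = "<your-motherduck-token>"

table = DuckDBTable(database="md:my_db", table="events")
print(table.primitive_id())  # "md:my_db:events" (anything after "?" is stripped)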
6 changes: 5 additions & 1 deletion buildflow/types/aws.py
@@ -1,6 +1,7 @@
import dataclasses
import enum
from typing import Any
from urllib.parse import unquote_plus

from buildflow.core.types.aws_types import S3BucketName
from buildflow.types.portable import FileChangeEvent, PortableFileChangeEventType
@@ -63,5 +64,8 @@

@property
def blob(self) -> bytes:
data = self.s3_client.get_object(Bucket=self.bucket_name, Key=self.file_path)
# S3 gives us a URL-encoded path, so we need to decode it in order to read
# the file from s3.
file_path = unquote_plus(self.file_path)
data = self.s3_client.get_object(Bucket=self.bucket_name, Key=file_path)

Codecov warning: added lines buildflow/types/aws.py#L69-L70 are not covered by tests.
return data["Body"].read()
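The object key in an S3 event notification is URL-encoded (spaces arrive as "+", special characters as "%XX"), which is why the key is decoded before get_object. A quick standalone illustration of the decoding used above; the example key is hypothetical:

from urllib.parse import unquote_plus

# S3 event notifications URL-encode object keys; decode before calling
# get_object on the bucket.
encoded_key = "reports/2023+q4/summary%28final%29.csv"
print(unquote_plus(encoded_key))  # reports/2023 q4/summary(final).csv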
9 changes: 4 additions & 5 deletions pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "buildflow"
version = "0.3.0"
version = "0.3.1"
authors = [
{ name = "Caleb Van Dyke", email = "[email protected]" },
{ name = "Josh Tanke", email = "[email protected]" },
@@ -42,15 +42,14 @@ dependencies = [
# We can remove this once duckdb has released version 0.8.2
"pandas",
"pg8000",
"pulumi==3.35.3",
"pulumi==3.98.0",
"pulumi_aws",
"pulumi_gcp",
"pulumi_snowflake",
"pyarrow",
"pydantic<2.0.2",
# Avoid issue with cython 3.0.0 and pyyaml:
# https://github.com/yaml/pyyaml/issues/724
"pyyaml<5.4.0,<6.0.0",
# Add constraints to ensure pyyaml works with cython 3.
"pyyaml!=5.4.0,!=6.0.0,!=5.4.1",
"s3fs",
"sqlalchemy[asyncio]",
"snowflake-ingest",