support full streaming mode in all posix-like connectors (#7768)
GitOrigin-RevId: 043a9ef72ed29a007810b11471043267b3d3d43e
zxqfd555-pw authored and Manul from Pathway committed Dec 9, 2024
1 parent 53fae8e commit 833dc27
Showing 37 changed files with 1,071 additions and 798 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]

### Changed
- `pw.io.s3.read` now monitors object deletions and modifications in the S3 source when run in streaming mode. When an object is deleted in S3, it is also removed from the engine; similarly, when an object is modified, the engine updates its state to reflect those changes.
- `pw.io.s3.read` now supports the `with_metadata` flag, which makes it possible to attach the metadata of the source object to the table entries (see the usage sketch below).
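
A minimal usage sketch of the new behavior. The bucket path, output file, and the `_metadata` lookup are illustrative, and S3 credentials are assumed to be resolvable through the optional `aws_s3_settings` parameter or the environment:

```python
import pathway as pw

# Read every object under the prefix. In streaming mode the connector now also
# picks up deletions and modifications of source objects and updates the table.
table = pw.io.s3.read(
    "s3://my-bucket/logs/",  # hypothetical bucket and prefix
    format="plaintext",
    mode="streaming",
    with_metadata=True,  # attaches the JSON `_metadata` column
)

# Keep the source object path from the attached metadata next to the data.
enriched = table.select(pw.this.data, path=pw.this._metadata["path"])

pw.io.jsonlines.write(enriched, "output.jsonl")
pw.run()
```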

### Fixed
- `pw.xpacks.llm.document_store.DocumentStore` no longer requires `_metadata` column in the input table.

37 changes: 35 additions & 2 deletions integration_tests/s3/base.py
@@ -17,7 +17,7 @@ def put_aws_object(path, contents):
aws_secret_access_key=os.environ["AWS_S3_SECRET_ACCESS_KEY"],
)
s3_client.put_object(
Bucket="aws-integrationtest",
Bucket=S3_BUCKET_NAME,
Key=path,
Body=contents,
)
@@ -37,6 +37,31 @@ def put_minio_object(path, contents):
)


def delete_aws_object(path):
s3_client = boto3.client(
"s3",
aws_access_key_id=os.environ["AWS_S3_ACCESS_KEY"],
aws_secret_access_key=os.environ["AWS_S3_SECRET_ACCESS_KEY"],
)
s3_client.delete_objects(
Bucket=S3_BUCKET_NAME,
Delete={"Objects": [{"Key": path}]},
)


def delete_minio_object(path):
s3_client = boto3.client(
"s3",
aws_access_key_id=os.environ["MINIO_S3_ACCESS_KEY"],
aws_secret_access_key=os.environ["MINIO_S3_SECRET_ACCESS_KEY"],
endpoint_url="https://minio-api.deploys.pathway.com",
)
s3_client.delete_objects(
Bucket=MINIO_BUCKET_NAME,
Delete={"Objects": [{"Key": path}]},
)


def put_object_into_storage(storage, path, contents):
put_object_methods = {
"s3": put_aws_object,
@@ -45,8 +70,16 @@ def put_object_into_storage(storage, path, contents):
return put_object_methods[storage](path, contents)


def delete_object_from_storage(storage, path):
delete_object_methods = {
"s3": delete_aws_object,
"minio": delete_minio_object,
}
return delete_object_methods[storage](path)


def create_jsonlines(input_dicts):
return "\n".join([json.dumps(value) for value in input_dicts])
return "\n".join([json.dumps(value) for value in input_dicts]) + "\n"


def read_jsonlines_fields(path, keys_to_extract):
23 changes: 20 additions & 3 deletions integration_tests/s3/test_s3_formats.py
@@ -1,4 +1,5 @@
import json
import time

import pytest

@@ -10,16 +11,27 @@

@pytest.mark.parametrize("storage_type", ["s3", "minio"])
@pytest.mark.parametrize("format", ["binary", "plaintext", "plaintext_by_object"])
def test_formats_without_parsing(storage_type, format, tmp_path, s3_path):
@pytest.mark.parametrize("with_metadata", [True, False])
def test_formats_without_parsing(
storage_type, format, with_metadata, tmp_path, s3_path
):
input_path = f"{s3_path}/input.txt"
input_full_contents = "abc\n\ndef\nghi\njkl"
output_path = tmp_path / "output.json"
uploaded_at = int(time.time())

put_object_into_storage(storage_type, input_path, input_full_contents)
table = create_table_for_storage(storage_type, input_path, format)
table = create_table_for_storage(
storage_type, input_path, format, with_metadata=with_metadata
)
pw.io.jsonlines.write(table, output_path)
pw.run()

def check_metadata(metadata):
assert uploaded_at <= metadata["modified_at"] <= uploaded_at + 10
assert metadata["path"] == input_path
assert metadata["size"] == len(input_full_contents)

if format in ("binary", "plaintext_by_object"):
expected_output = (
[ord(c) for c in input_full_contents]
@@ -29,11 +41,16 @@ def test_formats_without_parsing(storage_type, format, tmp_path, s3_path):
with open(output_path) as f:
result = json.load(f)
assert result["data"] == expected_output
if with_metadata:
check_metadata(result["_metadata"])
else:
lines = []
with open(output_path, "r") as f:
for row in f:
lines.append(json.loads(row)["data"])
result = json.loads(row)
lines.append(result["data"])
if with_metadata:
check_metadata(result["_metadata"])
lines.sort()
target = input_full_contents.split("\n")
target.sort()
4 changes: 2 additions & 2 deletions integration_tests/s3/test_s3_generic.py
@@ -42,7 +42,7 @@ def test_s3_backfilling(snapshot_access, tmp_path: pathlib.Path, s3_path: str):
)
G.clear()

input_contents = "key,value\n1,Hello\n2,World\n3,Bonjour\n4,Monde"
input_contents = "key,value\n1,Hello\n2,World\n3,Bonjour\n4,Monde\n"
put_aws_object(s3_input_path, input_contents)
table = pw.io.s3_csv.read(
s3_path,
@@ -62,7 +62,7 @@ def test_s3_backfilling(snapshot_access, tmp_path: pathlib.Path, s3_path: str):
)
G.clear()

input_contents = "key,value\n1,Hello\n2,World\n3,Bonjour\n4,Monde\n5,Hola"
input_contents = "key,value\n1,Hello\n2,World\n3,Bonjour\n4,Monde\n5,Hola\n"
s3_input_path_2 = f"{s3_path}/input_2.csv"
input_contents_2 = "key,value\n6,Mundo"
output_path = tmp_path / "output_final.csv"
91 changes: 91 additions & 0 deletions integration_tests/s3/test_s3_streaming.py
@@ -0,0 +1,91 @@
import pytest

import pathway as pw
from pathway.tests.utils import (
ExceptionAwareThread,
FileLinesNumberChecker,
wait_result_with_checker,
)

from .base import (
create_table_for_storage,
delete_object_from_storage,
put_object_into_storage,
)


@pytest.mark.parametrize("storage_type", ["s3", "minio"])
def test_object_modified(tmp_path, s3_path, storage_type):
input_path = f"{s3_path}/input.txt"
contents = ["one", "two", "three"]
put_object_into_storage(storage_type, input_path, "\n".join(contents) + "\n")

output_path = tmp_path / "output.json"

def stream_data():
wait_result_with_checker(
FileLinesNumberChecker(output_path, 3), 30, target=None
)

contents.append("four")
put_object_into_storage(storage_type, input_path, "\n".join(contents) + "\n")
wait_result_with_checker(
FileLinesNumberChecker(output_path, 4), 30, target=None
)

contents.append("five")
contents.append("six")
put_object_into_storage(storage_type, input_path, "\n".join(contents) + "\n")
wait_result_with_checker(
FileLinesNumberChecker(output_path, 6), 30, target=None
)

table = create_table_for_storage(
storage_type, s3_path, "plaintext", mode="streaming"
)
pw.io.jsonlines.write(table, output_path)

t = ExceptionAwareThread(target=stream_data)
t.start()
wait_result_with_checker(FileLinesNumberChecker(output_path, 6), 90)
t.join()


@pytest.mark.parametrize("storage_type", ["s3", "minio"])
def test_object_deleted(tmp_path, s3_path, storage_type):
input_path_1 = f"{s3_path}/input_1.txt"
input_path_2 = f"{s3_path}/input_2.txt"
input_path_3 = f"{s3_path}/input_3.txt"
put_object_into_storage(storage_type, input_path_1, "one")

output_path = tmp_path / "output.json"

def stream_data():
wait_result_with_checker(
FileLinesNumberChecker(output_path, 1), 30, target=None
)

put_object_into_storage(storage_type, input_path_2, "two")
wait_result_with_checker(
FileLinesNumberChecker(output_path, 2), 30, target=None
)

delete_object_from_storage(storage_type, input_path_1)
delete_object_from_storage(storage_type, input_path_2)
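# Each deleted object shows up as a retraction row (diff == -1) in the
# jsonlines output, so the line count is expected to grow from 2 to 4.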
wait_result_with_checker(
FileLinesNumberChecker(output_path, 4), 30, target=None
)

put_object_into_storage(storage_type, input_path_1, "four")
put_object_into_storage(storage_type, input_path_3, "three")
wait_result_with_checker(
FileLinesNumberChecker(output_path, 6), 30, target=None
)

table = create_table_for_storage(storage_type, s3_path, "binary", mode="streaming")
pw.io.jsonlines.write(table, output_path)

t = ExceptionAwareThread(target=stream_data)
t.start()
wait_result_with_checker(FileLinesNumberChecker(output_path, 6), 90)
t.join()
19 changes: 1 addition & 18 deletions integration_tests/webserver/test_rest_connector.py
@@ -12,29 +12,12 @@
from pathway.internals.udfs.caches import InMemoryCache
from pathway.tests.utils import (
CsvLinesNumberChecker,
ExceptionAwareThread,
expect_csv_checker,
wait_result_with_checker,
)


class ExceptionAwareThread(threading.Thread):
def run(self):
self._exception = None
try:
if self._target is not None: # type: ignore
self._result = self._target(*self._args, **self._kwargs) # type: ignore
except Exception as e:
self._exception = e
finally:
del self._target, self._args, self._kwargs # type: ignore

def join(self, timeout=None):
super().join(timeout)
if self._exception:
raise self._exception
return self._result


def _test_server_basic(tmp_path: pathlib.Path, port: int | str) -> None:
output_path = tmp_path / "output.csv"

3 changes: 2 additions & 1 deletion integration_tests/wordcount/base.py
@@ -399,7 +399,8 @@ def generate_input(file_name, input_size, commit_frequency):
dataset_line = json.dumps(dataset_line_dict)
fw.write(dataset_line + "\n")
if (seq_line_id + 1) % commit_frequency == 0:
fw.write(COMMIT_LINE)
# fw.write(COMMIT_LINE)
pass


def generate_next_input(inputs_path):
8 changes: 8 additions & 0 deletions python/pathway/io/minio/__init__.py
@@ -63,6 +63,7 @@ def read(
*,
schema: type[Schema] | None = None,
mode: str = "streaming",
with_metadata: bool = False,
csv_settings: CsvParserSettings | None = None,
json_field_paths: dict[str, str] | None = None,
downloader_threads_count: int | None = None,
@@ -92,6 +93,12 @@ def read(
mode: If set to ``streaming``, the engine waits for new objects under the
given path prefix. If set to ``static``, it only considers the available
data and ingests all of it. Default value is ``streaming``.
with_metadata: When set to ``True``, the connector adds an additional column
named ``_metadata`` to the table. This column holds a JSON value with an
optional ``modified_at`` field, an optional ``owner`` field containing the
ID of the object owner, and a ``path`` field with the full path to the
object within the bucket from which the row was read.
csv_settings: Settings for the CSV parser. This parameter is used only in case
the specified format is "csv".
json_field_paths: If the format is "json", this field allows to map field names
@@ -149,6 +156,7 @@ def read(
schema=schema,
csv_settings=csv_settings,
mode=mode,
with_metadata=with_metadata,
autocommit_duration_ms=autocommit_duration_ms,
persistent_id=persistent_id,
json_field_paths=json_field_paths,
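Before the S3 connector changes below, here is a sketch of the same option on the MinIO connector documented above. The endpoint, bucket, credentials, and the settings-object and parameter names (`MinioSettings`, `minio_settings`) are assumptions for illustration, not values taken from this commit:

```python
import pathway as pw

# Placeholder connection settings; substitute a real endpoint, bucket, and keys.
settings = pw.io.minio.MinioSettings(
    endpoint="minio.example.com",
    bucket_name="my-bucket",
    access_key="ACCESS_KEY",
    secret_access_key="SECRET_KEY",
)

table = pw.io.minio.read(
    "logs/",
    minio_settings=settings,
    format="plaintext",
    mode="streaming",
    with_metadata=True,
)

# The `_metadata` JSON column exposes the fields described in the docstring:
# `path`, plus optional `modified_at` and `owner`.
files = table.select(
    path=pw.this._metadata["path"],
    modified_at=pw.this._metadata["modified_at"],
)

pw.io.jsonlines.write(files, "objects_metadata.jsonl")
pw.run()
```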
24 changes: 24 additions & 0 deletions python/pathway/io/s3/__init__.py
@@ -98,6 +98,7 @@ def read(
aws_s3_settings: AwsS3Settings | None = None,
schema: type[Schema] | None = None,
mode: str = "streaming",
with_metadata: bool = False,
csv_settings: CsvParserSettings | None = None,
json_field_paths: dict[str, str] | None = None,
downloader_threads_count: int | None = None,
@@ -129,6 +130,12 @@ def read(
mode: If set to ``streaming``, the engine waits for new objects under the
given path prefix. If set to ``static``, it only considers the available
data and ingests all of it. Default value is ``streaming``.
with_metadata: When set to ``True``, the connector adds an additional column
named ``_metadata`` to the table. This column holds a JSON value with an
optional ``modified_at`` field, an optional ``owner`` field containing the
ID of the object owner, and a ``path`` field with the full path to the
object within the bucket from which the row was read.
csv_settings: Settings for the CSV parser. This parameter is used only in case
the specified format is ``csv``.
json_field_paths: If the format is ``json``, this field allows to map field names
@@ -274,6 +281,7 @@ def read(
schema=schema,
csv_settings=csv_settings,
json_field_paths=json_field_paths,
with_metadata=with_metadata,
_stacklevel=5,
)
data_source_options = datasource.DataSourceOptions(
@@ -300,6 +308,7 @@ def read_from_digital_ocean(
*,
schema: type[Schema] | None = None,
mode: str = "streaming",
with_metadata: bool = False,
csv_settings: CsvParserSettings | None = None,
json_field_paths: dict[str, str] | None = None,
downloader_threads_count: int | None = None,
@@ -330,6 +339,12 @@ def read_from_digital_ocean(
mode: If set to ``streaming``, the engine waits for new objects under the
given path prefix. If set to ``static``, it only considers the available
data and ingests all of it. Default value is ``streaming``.
with_metadata: When set to ``True``, the connector adds an additional column
named ``_metadata`` to the table. This column holds a JSON value with an
optional ``modified_at`` field, an optional ``owner`` field containing the
ID of the object owner, and a ``path`` field with the full path to the
object within the bucket from which the row was read.
csv_settings: Settings for the CSV parser. This parameter is used only in case
the specified format is "csv".
json_field_paths: If the format is "json", this field allows to map field names
@@ -397,6 +412,7 @@ def read_from_digital_ocean(
schema=schema,
csv_settings=csv_settings,
json_field_paths=json_field_paths,
with_metadata=with_metadata,
_stacklevel=5,
)
datasource_options = datasource.DataSourceOptions(
@@ -423,6 +439,7 @@ def read_from_wasabi(
*,
schema: type[Schema] | None = None,
mode: str = "streaming",
with_metadata: bool = False,
csv_settings: CsvParserSettings | None = None,
json_field_paths: dict[str, str] | None = None,
downloader_threads_count: int | None = None,
@@ -452,6 +469,12 @@ def read_from_wasabi(
mode: If set to ``streaming``, the engine waits for new objects under the
given path prefix. If set to ``static``, it only considers the available
data and ingests all of it. Default value is ``streaming``.
with_metadata: When set to ``True``, the connector adds an additional column
named ``_metadata`` to the table. This column holds a JSON value with an
optional ``modified_at`` field, an optional ``owner`` field containing the
ID of the object owner, and a ``path`` field with the full path to the
object within the bucket from which the row was read.
csv_settings: Settings for the CSV parser. This parameter is used only in case
the specified format is "csv".
json_field_paths: If the format is "json", this field allows to map field names
@@ -518,6 +541,7 @@ def read_from_wasabi(
schema=schema,
csv_settings=csv_settings,
json_field_paths=json_field_paths,
with_metadata=with_metadata,
_stacklevel=5,
)
datasource_options = datasource.DataSourceOptions(
2 changes: 2 additions & 0 deletions python/pathway/tests/test_py_object_wrapper.py
@@ -265,6 +265,8 @@ class Checker:
error: AssertionError | None = None

def __call__(self) -> bool:
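# The output file may not exist yet when the checker first runs in streaming
# mode; report 'not ready' instead of raising.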
if not output_path.exists():
return False
try:
G.clear()
result = pw.io.csv.read(output_path, schema=OutputSchema, mode="static")